Compare commits
No commits in common. 'next' and 'caches' have entirely different histories.
83 changed files with 4964 additions and 10420 deletions
@@ -1,309 +1,28 @@

stages:
  - build
  - build docker image
  - test
  - upload artifacts
image: "rust:latest"

variables:
  GIT_SUBMODULE_STRATEGY: recursive
  FF_USE_FASTZIP: 1
  CACHE_COMPRESSION_LEVEL: fastest

# --------------------------------------------------------------------- #
#        Cargo: Compiling for different architectures                    #
# --------------------------------------------------------------------- #

.build-cargo-shared-settings:
  stage: "build"
  needs: []
  rules:
    - if: '$CI_COMMIT_BRANCH == "master"'
    - if: '$CI_COMMIT_BRANCH == "next"'
    - if: "$CI_COMMIT_TAG"
  interruptible: true
  image: "rust:latest"
  tags: ["docker"]
  cache:
    paths:
      - cargohome
      - target/
    key: "build_cache--$TARGET--$CI_COMMIT_BRANCH--release"
  variables:
    CARGO_PROFILE_RELEASE_LTO: "true"
    CARGO_PROFILE_RELEASE_CODEGEN_UNITS: "1"
  before_script:
    - 'echo "Building for target $TARGET"'
    - 'mkdir -p cargohome && CARGOHOME="cargohome"'
    - "rustc --version && cargo --version && rustup show" # Print version info for debugging
    - "rustup target add $TARGET"
  script:
    - time cargo build --target $TARGET --release
    - 'cp "target/$TARGET/release/conduit" "conduit-$TARGET"'
  artifacts:
    expire_in: never

build:release:cargo:x86_64-unknown-linux-musl-with-debug:
  extends: .build-cargo-shared-settings
  image: messense/rust-musl-cross:x86_64-musl
  variables:
    CARGO_PROFILE_RELEASE_DEBUG: 2 # Enable debug info for flamegraph profiling
    TARGET: "x86_64-unknown-linux-musl"
  after_script:
    - "mv ./conduit-x86_64-unknown-linux-musl ./conduit-x86_64-unknown-linux-musl-with-debug"
  artifacts:
    name: "conduit-x86_64-unknown-linux-musl-with-debug"
    paths:
      - "conduit-x86_64-unknown-linux-musl-with-debug"
    expose_as: "Conduit for x86_64-unknown-linux-musl-with-debug"
default:
  tags: [docker]

build:release:cargo:x86_64-unknown-linux-musl:
  extends: .build-cargo-shared-settings
  image: messense/rust-musl-cross:x86_64-musl
  variables:
    TARGET: "x86_64-unknown-linux-musl"
  artifacts:
    name: "conduit-x86_64-unknown-linux-musl"
    paths:
      - "conduit-x86_64-unknown-linux-musl"
    expose_as: "Conduit for x86_64-unknown-linux-musl"
cache:
  paths:
    - target
    - cargohome

build:release:cargo:arm-unknown-linux-musleabihf:
  extends: .build-cargo-shared-settings
  image: messense/rust-musl-cross:arm-musleabihf
  variables:
    TARGET: "arm-unknown-linux-musleabihf"
  artifacts:
    name: "conduit-arm-unknown-linux-musleabihf"
    paths:
      - "conduit-arm-unknown-linux-musleabihf"
    expose_as: "Conduit for arm-unknown-linux-musleabihf"

build:release:cargo:armv7-unknown-linux-musleabihf:
  extends: .build-cargo-shared-settings
  image: messense/rust-musl-cross:armv7-musleabihf
  variables:
    TARGET: "armv7-unknown-linux-musleabihf"
  artifacts:
    name: "conduit-armv7-unknown-linux-musleabihf"
    paths:
      - "conduit-armv7-unknown-linux-musleabihf"
    expose_as: "Conduit for armv7-unknown-linux-musleabihf"

build:release:cargo:aarch64-unknown-linux-musl:
  extends: .build-cargo-shared-settings
  image: messense/rust-musl-cross:aarch64-musl
  variables:
    TARGET: "aarch64-unknown-linux-musl"
  artifacts:
    name: "conduit-aarch64-unknown-linux-musl"
    paths:
      - "conduit-aarch64-unknown-linux-musl"
    expose_as: "Conduit for aarch64-unknown-linux-musl"

.cargo-debug-shared-settings:
  extends: ".build-cargo-shared-settings"
  rules:
    - if: '$CI_COMMIT_BRANCH != "master"'
  cache:
    key: "build_cache--$TARGET--$CI_COMMIT_BRANCH--debug"
  script:
    - "time cargo build --target $TARGET"
    - 'mv "target/$TARGET/debug/conduit" "conduit-debug-$TARGET"'
  artifacts:
    expire_in: 4 weeks

build:debug:cargo:x86_64-unknown-linux-musl:
  extends: ".cargo-debug-shared-settings"
  image: messense/rust-musl-cross:x86_64-musl
  variables:
    TARGET: "x86_64-unknown-linux-musl"
  artifacts:
    name: "conduit-debug-x86_64-unknown-linux-musl"
    paths:
      - "conduit-debug-x86_64-unknown-linux-musl"
    expose_as: "Conduit DEBUG for x86_64-unknown-linux-musl"

# --------------------------------------------------------------------- #
#            Create and publish docker image                             #
# --------------------------------------------------------------------- #

.docker-shared-settings:
  stage: "build docker image"
  image: jdrouet/docker-with-buildx:stable
  tags: ["docker"]
  services:
    - docker:dind
  needs:
    - "build:release:cargo:x86_64-unknown-linux-musl"
    - "build:release:cargo:arm-unknown-linux-musleabihf"
    - "build:release:cargo:armv7-unknown-linux-musleabihf"
    - "build:release:cargo:aarch64-unknown-linux-musl"
  variables:
    DOCKER_HOST: tcp://docker:2375/
    DOCKER_TLS_CERTDIR: ""
    DOCKER_DRIVER: overlay2
    PLATFORMS: "linux/arm/v6,linux/arm/v7,linux/arm64,linux/amd64"
    DOCKER_FILE: "docker/ci-binaries-packaging.Dockerfile"
  cache:
    paths:
      - docker_cache
    key: "$CI_JOB_NAME"
  before_script:
    - docker login -u "$CI_REGISTRY_USER" -p "$CI_REGISTRY_PASSWORD" $CI_REGISTRY
    # Only log in to Dockerhub if the credentials are given:
    - if [ -n "${DOCKER_HUB}" ]; then docker login -u "$DOCKER_HUB_USER" -p "$DOCKER_HUB_PASSWORD" "$DOCKER_HUB"; fi
  script:
    # Prepare buildx to build multiarch stuff:
    - docker context create 'ci-context'
    - docker buildx create --name 'multiarch-builder' --use 'ci-context'
    # Copy binaries to their docker arch path
    - mkdir -p linux/ && mv ./conduit-x86_64-unknown-linux-musl linux/amd64
    - mkdir -p linux/arm/ && mv ./conduit-arm-unknown-linux-musleabihf linux/arm/v6
    - mkdir -p linux/arm/ && mv ./conduit-armv7-unknown-linux-musleabihf linux/arm/v7
    - mv ./conduit-aarch64-unknown-linux-musl linux/arm64
    - 'export CREATED=$(date -u +''%Y-%m-%dT%H:%M:%SZ'') && echo "Docker image creation date: $CREATED"'
    # Build and push image:
    - >
      docker buildx build
      --pull
      --push
      --cache-from=type=local,src=$CI_PROJECT_DIR/docker_cache
      --cache-to=type=local,dest=$CI_PROJECT_DIR/docker_cache
      --build-arg CREATED=$CREATED
      --build-arg VERSION=$(grep -m1 -o '[0-9].[0-9].[0-9]' Cargo.toml)
      --build-arg "GIT_REF=$CI_COMMIT_SHORT_SHA"
      --platform "$PLATFORMS"
      --tag "$TAG"
      --tag "$TAG-alpine"
      --tag "$TAG-commit-$CI_COMMIT_SHORT_SHA"
      --file "$DOCKER_FILE" .

docker:next:gitlab:
  extends: .docker-shared-settings
  rules:
    - if: '$CI_COMMIT_BRANCH == "next"'
  variables:
    TAG: "$CI_REGISTRY_IMAGE/matrix-conduit:next"

docker:next:dockerhub:
  extends: .docker-shared-settings
  rules:
    - if: '$CI_COMMIT_BRANCH == "next" && $DOCKER_HUB'
  variables:
    TAG: "$DOCKER_HUB_IMAGE/matrixconduit/matrix-conduit:next"

docker:master:gitlab:
  extends: .docker-shared-settings
  rules:
    - if: '$CI_COMMIT_BRANCH == "master"'
  variables:
    TAG: "$CI_REGISTRY_IMAGE/matrix-conduit:latest"

docker:master:dockerhub:
  extends: .docker-shared-settings
  rules:
    - if: '$CI_COMMIT_BRANCH == "master" && $DOCKER_HUB'
  variables:
    TAG: "$DOCKER_HUB_IMAGE/matrixconduit/matrix-conduit:latest"
variables:
  GIT_SUBMODULE_STRATEGY: recursive
  CARGO_HOME: "cargohome"
  FF_USE_FASTZIP: 1

# --------------------------------------------------------------------- #
#                           Run tests                                    #
# --------------------------------------------------------------------- #
before_script:
  - mkdir -p $CARGO_HOME && echo "using $CARGO_HOME to cache cargo deps"
  - apt-get update -yqq
  - apt-get install -yqq --no-install-recommends build-essential libssl-dev pkg-config
  - rustup component add clippy rustfmt

test:cargo:
  stage: "test"
  needs: []
  image: "rust:latest"
  tags: ["docker"]
  variables:
    CARGO_HOME: "cargohome"
  cache:
    paths:
      - target
      - cargohome
    key: test_cache
  interruptible: true
  before_script:
    - mkdir -p $CARGO_HOME && echo "using $CARGO_HOME to cache cargo deps"
    - apt-get update -yqq
    - apt-get install -yqq --no-install-recommends build-essential libssl-dev pkg-config wget
    - rustup component add clippy rustfmt
    - wget "https://faulty-storage.de/gitlab-report"
    - chmod +x ./gitlab-report
  script:
    - rustc --version && cargo --version # Print version info for debugging
    - rustc --version && cargo --version # Print version info for debugging
    - cargo test --workspace --verbose --locked
    - cargo fmt --all -- --check
    - "cargo test --color always --workspace --verbose --locked --no-fail-fast -- -Z unstable-options --format json | ./gitlab-report -p test > $CI_PROJECT_DIR/report.xml"
    - "cargo clippy --color always --verbose --message-format=json | ./gitlab-report -p clippy > $CI_PROJECT_DIR/gl-code-quality-report.json"
  artifacts:
    when: always
    reports:
      junit: report.xml
      codequality: gl-code-quality-report.json

test:sytest:
  stage: "test"
  allow_failure: true
  needs:
    - "build:debug:cargo:x86_64-unknown-linux-musl"
  image:
    name: "valkum/sytest-conduit:latest"
    entrypoint: [""]
  tags: ["docker"]
  variables:
    PLUGINS: "https://github.com/valkum/sytest_conduit/archive/master.tar.gz"
  before_script:
    - "mkdir -p /app"
    - "cp ./conduit-debug-x86_64-unknown-linux-musl /app/conduit"
    - "chmod +x /app/conduit"
    - "rm -rf /src && ln -s $CI_PROJECT_DIR/ /src"
    - "mkdir -p /work/server-0/database/ && mkdir -p /work/server-1/database/ && mkdir -p /work/server-2/database/"
    - "cd /"
  script:
    - "SYTEST_EXIT_CODE=0"
    - "/bootstrap.sh conduit || SYTEST_EXIT_CODE=1"
    - 'perl /sytest/tap-to-junit-xml.pl --puretap --input /logs/results.tap --output $CI_PROJECT_DIR/sytest.xml "Sytest" && cp /logs/results.tap $CI_PROJECT_DIR/results.tap'
    - "exit $SYTEST_EXIT_CODE"
  artifacts:
    when: always
    paths:
      - "$CI_PROJECT_DIR/sytest.xml"
      - "$CI_PROJECT_DIR/results.tap"
    reports:
      junit: "$CI_PROJECT_DIR/sytest.xml"

# --------------------------------------------------------------------- #
#        Store binaries as package so they have download urls            #
# --------------------------------------------------------------------- #

publish:package:
  stage: "upload artifacts"
  needs:
    - "build:release:cargo:x86_64-unknown-linux-musl"
    - "build:release:cargo:arm-unknown-linux-musleabihf"
    - "build:release:cargo:armv7-unknown-linux-musleabihf"
    - "build:release:cargo:aarch64-unknown-linux-musl"
    # - "build:cargo-deb:x86_64-unknown-linux-gnu"
  rules:
    - if: '$CI_COMMIT_BRANCH == "master"'
    - if: '$CI_COMMIT_BRANCH == "next"'
    - if: "$CI_COMMIT_TAG"
  image: curlimages/curl:latest
  tags: ["docker"]
  variables:
    GIT_STRATEGY: "none" # Don't need a clean copy of the code, we just operate on artifacts
  script:
    - 'BASE_URL="${CI_API_V4_URL}/projects/${CI_PROJECT_ID}/packages/generic/conduit-${CI_COMMIT_REF_SLUG}/build-${CI_PIPELINE_ID}"'
    - 'curl --header "JOB-TOKEN: $CI_JOB_TOKEN" --upload-file conduit-x86_64-unknown-linux-musl "${BASE_URL}/conduit-x86_64-unknown-linux-musl"'
    - 'curl --header "JOB-TOKEN: $CI_JOB_TOKEN" --upload-file conduit-arm-unknown-linux-musleabihf "${BASE_URL}/conduit-arm-unknown-linux-musleabihf"'
    - 'curl --header "JOB-TOKEN: $CI_JOB_TOKEN" --upload-file conduit-armv7-unknown-linux-musleabihf "${BASE_URL}/conduit-armv7-unknown-linux-musleabihf"'
    - 'curl --header "JOB-TOKEN: $CI_JOB_TOKEN" --upload-file conduit-aarch64-unknown-linux-musl "${BASE_URL}/conduit-aarch64-unknown-linux-musl"'

# Avoid duplicate pipelines
# See: https://docs.gitlab.com/ee/ci/yaml/workflow.html#switch-between-branch-pipelines-and-merge-request-pipelines
workflow:
  rules:
    - if: '$CI_PIPELINE_SOURCE == "merge_request_event"'
    - if: "$CI_COMMIT_BRANCH && $CI_OPEN_MERGE_REQUESTS"
      when: never
    - if: "$CI_COMMIT_BRANCH"
    - cargo clippy

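Note that the buildx step derives VERSION by grepping Cargo.toml with the pattern '[0-9].[0-9].[0-9]', where the unescaped dots match any character and only single-digit components are matched. A hedged Rust sketch of a stricter extraction follows; the helper name and parsing approach are illustrative, not part of the repo:

// Extract the first `version = "x.y.z"` value from Cargo.toml contents.
// Illustrative helper, not Conduit code; a real build would use a TOML parser.
fn crate_version(cargo_toml: &str) -> Option<String> {
    cargo_toml
        .lines()
        .find_map(|line| line.trim().strip_prefix("version"))
        .and_then(|rest| rest.split('"').nth(1)) // text between the first pair of quotes
        .map(str::to_owned)
}

fn main() {
    let toml = "[package]\nname = \"conduit\"\nversion = \"0.2.0\"\n";
    assert_eq!(crate_version(toml).as_deref(), Some("0.2.0"));
}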
@@ -1,19 +0,0 @@

<!--
If you're requesting a new feature that isn't part of this project yet,
then please consider filling out a "Feature Request" instead!

If you need a hand setting up your conduit server, feel free to ask for help in the
Conduit Matrix Chat: https://matrix.to/#/#conduit:fachschaften.org.
-->

### Description
<!-- What did you do and what happened? Why is that bad? -->

### System Configuration
<!-- Other data that might help us debug this issue, like os, conduit version, database backend -->

Conduit Version:
Database backend (default is sqlite): sqlite


/label ~conduit
@@ -1,17 +0,0 @@

<!--
If you want to report a bug or an error,
then please consider filling out a "Bug Report" instead!
-->


### Is your feature request related to a problem? Please describe.

<!-- E.g., I'm always frustrated when [...] -->


### Describe the solution you'd like




/label ~conduit
@@ -0,0 +1,15 @@

# Headline

### Description










/label ~conduit
@@ -1,8 +0,0 @@


<!-- Please describe your changes here -->

-----------------------------------------------------------------------------

- [ ] I ran `cargo fmt` and `cargo test`
- [ ] I agree to release my code and all other changes of this MR under the Apache-2.0 license

@@ -1,78 +0,0 @@

# syntax=docker/dockerfile:1
# ---------------------------------------------------------------------------------------------------------
# This Dockerfile is intended to be built as part of Conduit's CI pipeline.
# It does not build Conduit in Docker, but just copies the matching build artifact from the build jobs.
#
# It is mostly based on the normal Conduit Dockerfile, but adjusted in a few places to maximise caching.
# Credit for the original Dockerfile goes to Weasy666.
# ---------------------------------------------------------------------------------------------------------

FROM docker.io/alpine:3.15.0 AS runner

# Standard port on which Conduit launches.
# You still need to map the port when using the docker command or docker-compose.
EXPOSE 6167

# Note from @jfowl: I would like to remove this in the future and just have the Docker version be configured with envs.
ENV CONDUIT_CONFIG="/srv/conduit/conduit.toml"

# Conduit needs:
#   ca-certificates: for https
#   libgcc: Apparently this is needed, even if I (@jfowl) don't know exactly why. But whatever, it's not that big.
RUN apk add --no-cache \
    ca-certificates \
    libgcc


ARG CREATED
ARG VERSION
ARG GIT_REF
# Labels according to https://github.com/opencontainers/image-spec/blob/master/annotations.md
# including a custom label specifying the build command
LABEL org.opencontainers.image.created=${CREATED} \
      org.opencontainers.image.authors="Conduit Contributors" \
      org.opencontainers.image.title="Conduit" \
      org.opencontainers.image.version=${VERSION} \
      org.opencontainers.image.vendor="Conduit Contributors" \
      org.opencontainers.image.description="A Matrix homeserver written in Rust" \
      org.opencontainers.image.url="https://conduit.rs/" \
      org.opencontainers.image.revision=${GIT_REF} \
      org.opencontainers.image.source="https://gitlab.com/famedly/conduit.git" \
      org.opencontainers.image.licenses="Apache-2.0" \
      org.opencontainers.image.documentation="https://gitlab.com/famedly/conduit" \
      org.opencontainers.image.ref.name=""

# Create the directory for the database and media files
RUN mkdir -p /srv/conduit/.local/share/conduit

# Test if Conduit is still alive; uses the same endpoint as Element
COPY ./docker/healthcheck.sh /srv/conduit/healthcheck.sh
HEALTHCHECK --start-period=5s --interval=5s CMD ./healthcheck.sh


# Depending on the target platform (e.g. "linux/arm/v7", "linux/arm64/v8", or "linux/amd64")
# copy the matching binary into this docker image
ARG TARGETPLATFORM
COPY ./$TARGETPLATFORM /srv/conduit/conduit


# Improve security: Don't run stuff as root that does not need to run as root:
# Add www-data user and group with UID 82, as used by alpine
# https://git.alpinelinux.org/aports/tree/main/nginx/nginx.pre-install
RUN set -x ; \
    addgroup -Sg 82 www-data 2>/dev/null ; \
    adduser -S -D -H -h /srv/conduit -G www-data -g www-data www-data 2>/dev/null ; \
    addgroup www-data www-data 2>/dev/null && exit 0 ; exit 1

# Change ownership of Conduit files to the www-data user and group
RUN chown -cR www-data:www-data /srv/conduit
RUN chmod +x /srv/conduit/healthcheck.sh

# Change user to www-data
USER www-data
# Set container home directory
WORKDIR /srv/conduit

# Run Conduit and print backtraces on panics
ENV RUST_BACKTRACE=1
ENTRYPOINT [ "/srv/conduit/conduit" ]
@@ -1,13 +0,0 @@

#!/bin/sh

# If the port is not specified as an env var, take it from the config file
if [ -z "${CONDUIT_PORT}" ]; then
    CONDUIT_PORT=$(grep -m1 -o 'port\s=\s[0-9]*' conduit.toml | grep -m1 -o '[0-9]*')
fi

# The actual health check.
# We first try to get a response over HTTP; when that fails we try HTTPS, and when that also fails, we exit with code 1.
# TODO: Change this to a single wget call. Do we have a config value that we can check for that?
wget --no-verbose --tries=1 --spider "http://localhost:${CONDUIT_PORT}/_matrix/client/versions" || \
    wget --no-verbose --tries=1 --spider "https://localhost:${CONDUIT_PORT}/_matrix/client/versions" || \
    exit 1
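For reference, the same liveness probe can be expressed in Rust for testing the endpoint outside the container. This is a minimal sketch using the reqwest blocking API (with the "blocking" feature enabled); the URL mirrors the script above and the fallback order is the same:

use std::process::exit;

fn main() {
    let port = std::env::var("CONDUIT_PORT").unwrap_or_else(|_| "6167".to_owned());
    // Try HTTP first, then HTTPS, mirroring the shell script's fallback.
    for scheme in ["http", "https"] {
        let url = format!("{}://localhost:{}/_matrix/client/versions", scheme, port);
        if reqwest::blocking::get(&url).map(|r| r.status().is_success()).unwrap_or(false) {
            return; // healthy
        }
    }
    exit(1); // both probes failed
}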
File diff suppressed because it is too large
@@ -1,63 +1,42 @@

use std::sync::Arc;

use crate::{database::DatabaseGuard, pdu::PduBuilder, ConduitResult, Ruma};
use super::State;
use crate::{pdu::PduBuilder, ConduitResult, Database, Ruma};
use ruma::{
    api::client::r0::redact::redact_event,
    events::{room::redaction::RoomRedactionEventContent, EventType},
    events::{room::redaction, EventType},
};
use std::sync::Arc;

#[cfg(feature = "conduit_bin")]
use rocket::put;
use serde_json::value::to_raw_value;

/// # `PUT /_matrix/client/r0/rooms/{roomId}/redact/{eventId}/{txnId}`
///
/// Tries to send a redaction event into the room.
///
/// - TODO: Handle txn id
#[cfg_attr(
    feature = "conduit_bin",
    put("/_matrix/client/r0/rooms/<_>/redact/<_>/<_>", data = "<body>")
)]
#[tracing::instrument(skip(db, body))]
pub async fn redact_event_route(
    db: DatabaseGuard,
    db: State<'_, Arc<Database>>,
    body: Ruma<redact_event::Request<'_>>,
) -> ConduitResult<redact_event::Response> {
    let sender_user = body.sender_user.as_ref().expect("user is authenticated");
    let body = body.body;

    let mutex_state = Arc::clone(
        db.globals
            .roomid_mutex_state
            .write()
            .unwrap()
            .entry(body.room_id.clone())
            .or_default(),
    );
    let state_lock = mutex_state.lock().await;

    let event_id = db.rooms.build_and_append_pdu(
        PduBuilder {
            event_type: EventType::RoomRedaction,
            content: to_raw_value(&RoomRedactionEventContent {
            content: serde_json::to_value(redaction::RedactionEventContent {
                reason: body.reason.clone(),
            })
            .expect("event is valid, we just created it"),
            unsigned: None,
            state_key: None,
            redacts: Some(body.event_id.into()),
            redacts: Some(body.event_id.clone()),
        },
        sender_user,
        &sender_user,
        &body.room_id,
        &db,
        &state_lock,
    )?;

    drop(state_lock);

    db.flush()?;
    db.flush().await?;

    let event_id = (*event_id).to_owned();
    Ok(redact_event::Response { event_id }.into())
}

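The roomid_mutex_state lookup above serializes PDU appends per room: the map guard is dropped before the async lock is awaited. A minimal sketch of that per-room locking pattern, assuming tokio and a simplified RoomId type; the names here are illustrative, not Conduit's actual API:

use std::{collections::HashMap, sync::{Arc, RwLock}};
use tokio::sync::Mutex;

type RoomId = String; // stand-in for ruma::RoomId

#[derive(Default)]
struct Globals {
    // One async mutex per room, created lazily on first use.
    roomid_mutex_state: RwLock<HashMap<RoomId, Arc<Mutex<()>>>>,
}

impl Globals {
    async fn with_room_lock(&self, room_id: &RoomId) {
        // Clone the Arc out of the map so the RwLock guard is not
        // held across the .await below.
        let mutex = Arc::clone(
            self.roomid_mutex_state
                .write()
                .unwrap()
                .entry(room_id.clone())
                .or_default(),
        );
        let _state_lock = mutex.lock().await;
        // ... append the PDU while holding the lock ...
    }
}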
@@ -1,84 +0,0 @@

use crate::{
    database::{admin::AdminCommand, DatabaseGuard},
    ConduitResult, Error, Ruma,
};
use ruma::{
    api::client::{error::ErrorKind, r0::room::report_content},
    events::room::message,
    int,
};

#[cfg(feature = "conduit_bin")]
use rocket::{http::RawStr, post};

/// # `POST /_matrix/client/r0/rooms/{roomId}/report/{eventId}`
///
/// Reports an inappropriate event to homeserver admins
///
#[cfg_attr(
    feature = "conduit_bin",
    post("/_matrix/client/r0/rooms/<_>/report/<_>", data = "<body>")
)]
#[tracing::instrument(skip(db, body))]
pub async fn report_event_route(
    db: DatabaseGuard,
    body: Ruma<report_content::Request<'_>>,
) -> ConduitResult<report_content::Response> {
    let sender_user = body.sender_user.as_ref().expect("user is authenticated");

    let pdu = match db.rooms.get_pdu(&body.event_id)? {
        Some(pdu) => pdu,
        _ => {
            return Err(Error::BadRequest(
                ErrorKind::InvalidParam,
                "Invalid Event ID",
            ))
        }
    };

    if body.score > int!(0) || body.score < int!(-100) {
        return Err(Error::BadRequest(
            ErrorKind::InvalidParam,
            "Invalid score, must be within 0 to -100",
        ));
    };

    if body.reason.chars().count() > 250 {
        return Err(Error::BadRequest(
            ErrorKind::InvalidParam,
            "Reason too long, should be 250 characters or fewer",
        ));
    };

    db.admin.send(AdminCommand::SendMessage(
        message::RoomMessageEventContent::text_html(
            format!(
                "Report received from: {}\n\n\
                Event ID: {}\n\
                Room ID: {}\n\
                Sent By: {}\n\n\
                Report Score: {}\n\
                Report Reason: {}",
                sender_user, pdu.event_id, pdu.room_id, pdu.sender, body.score, body.reason
            ),
            format!(
                "<details><summary>Report received from: <a href=\"https://matrix.to/#/{0}\">{0}\
                </a></summary><ul><li>Event Info<ul><li>Event ID: <code>{1}</code>\
                <a href=\"https://matrix.to/#/{2}/{1}\">🔗</a></li><li>Room ID: <code>{2}</code>\
                </li><li>Sent By: <a href=\"https://matrix.to/#/{3}\">{3}</a></li></ul></li><li>\
                Report Info<ul><li>Report Score: {4}</li><li>Report Reason: {5}</li></ul></li>\
                </ul></details>",
                sender_user,
                pdu.event_id,
                pdu.room_id,
                pdu.sender,
                body.score,
                RawStr::new(&body.reason).html_escape()
            ),
        ),
    ));

    db.flush()?;

    Ok(report_content::Response {}.into())
}
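The score check above encodes the Matrix convention that a report score runs from 0 (least offensive) down to -100 (most offensive). A standalone sketch of that validation, with int! replaced by plain i64 for illustration:

/// Returns true if `score` is a valid Matrix report score.
/// Valid scores run from -100 (most offensive) to 0 (least offensive).
fn is_valid_report_score(score: i64) -> bool {
    (-100..=0).contains(&score)
}

fn main() {
    assert!(is_valid_report_score(0));
    assert!(is_valid_report_score(-100));
    assert!(!is_valid_report_score(1));
    assert!(!is_valid_report_score(-101));
}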
@@ -1,58 +1,18 @@

use crate::{database::DatabaseGuard, ConduitResult, Ruma};
use hmac::{Hmac, Mac, NewMac};
use crate::ConduitResult;
use ruma::api::client::r0::voip::get_turn_server_info;
use ruma::SecondsSinceUnixEpoch;
use sha1::Sha1;
use std::time::{Duration, SystemTime};

type HmacSha1 = Hmac<Sha1>;
use std::time::Duration;

#[cfg(feature = "conduit_bin")]
use rocket::get;

/// # `GET /_matrix/client/r0/voip/turnServer`
///
/// TODO: Returns information about the recommended turn server.
#[cfg_attr(
    feature = "conduit_bin",
    get("/_matrix/client/r0/voip/turnServer", data = "<body>")
)]
#[tracing::instrument(skip(body, db))]
pub async fn turn_server_route(
    body: Ruma<get_turn_server_info::Request>,
    db: DatabaseGuard,
) -> ConduitResult<get_turn_server_info::Response> {
    let sender_user = body.sender_user.as_ref().expect("user is authenticated");

    let turn_secret = db.globals.turn_secret();

    let (username, password) = if !turn_secret.is_empty() {
        let expiry = SecondsSinceUnixEpoch::from_system_time(
            SystemTime::now() + Duration::from_secs(db.globals.turn_ttl()),
        )
        .expect("time is valid");

        let username: String = format!("{}:{}", expiry.get(), sender_user);

        let mut mac = HmacSha1::new_from_slice(turn_secret.as_bytes())
            .expect("HMAC can take key of any size");
        mac.update(username.as_bytes());

        let password: String = base64::encode_config(mac.finalize().into_bytes(), base64::STANDARD);

        (username, password)
    } else {
        (
            db.globals.turn_username().clone(),
            db.globals.turn_password().clone(),
        )
    };

#[cfg_attr(feature = "conduit_bin", get("/_matrix/client/r0/voip/turnServer"))]
#[tracing::instrument]
pub async fn turn_server_route() -> ConduitResult<get_turn_server_info::Response> {
    Ok(get_turn_server_info::Response {
        username,
        password,
        uris: db.globals.turn_uris().to_vec(),
        ttl: Duration::from_secs(db.globals.turn_ttl()),
        username: "".to_owned(),
        password: "".to_owned(),
        uris: Vec::new(),
        ttl: Duration::from_secs(60 * 60 * 24),
    }
    .into())
}

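The longer branch above implements the TURN REST API credential scheme: the username is "<unix-expiry>:<user-id>" and the password is the base64-encoded HMAC-SHA1 of that username under the shared secret the TURN server also knows. A self-contained sketch of the same computation, using the hmac/sha-1/base64 crate calls that appear in the diff (the helper name and signature are illustrative):

use hmac::{Hmac, Mac, NewMac};
use sha1::Sha1;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

type HmacSha1 = Hmac<Sha1>;

/// Derive ephemeral TURN credentials for `user_id`, valid for `ttl` seconds.
fn turn_credentials(turn_secret: &[u8], user_id: &str, ttl: u64) -> (String, String) {
    let expiry = SystemTime::now() + Duration::from_secs(ttl);
    let expiry_secs = expiry
        .duration_since(UNIX_EPOCH)
        .expect("time is after the epoch")
        .as_secs();

    // The username encodes the expiry so the TURN server can reject stale credentials.
    let username = format!("{}:{}", expiry_secs, user_id);

    let mut mac = HmacSha1::new_from_slice(turn_secret).expect("HMAC can take key of any size");
    mac.update(username.as_bytes());
    let password = base64::encode_config(mac.finalize().into_bytes(), base64::STANDARD);

    (username, password)
}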
@@ -1,240 +0,0 @@

use super::super::Config;
use crossbeam::channel::{bounded, Sender as ChannelSender};
use threadpool::ThreadPool;

use crate::{Error, Result};
use std::{
    collections::HashMap,
    future::Future,
    pin::Pin,
    sync::{Arc, Mutex, RwLock},
};
use tokio::sync::oneshot::Sender;

use super::{DatabaseEngine, Tree};

type TupleOfBytes = (Vec<u8>, Vec<u8>);

pub struct Engine {
    env: heed::Env,
    iter_pool: Mutex<ThreadPool>,
}

pub struct EngineTree {
    engine: Arc<Engine>,
    tree: Arc<heed::UntypedDatabase>,
    watchers: RwLock<HashMap<Vec<u8>, Vec<Sender<()>>>>,
}

fn convert_error(error: heed::Error) -> Error {
    Error::HeedError {
        error: error.to_string(),
    }
}

impl DatabaseEngine for Engine {
    fn open(config: &Config) -> Result<Arc<Self>> {
        let mut env_builder = heed::EnvOpenOptions::new();
        env_builder.map_size(1024 * 1024 * 1024 * 1024); // 1 Terabyte
        env_builder.max_readers(126);
        env_builder.max_dbs(128);
        unsafe {
            env_builder.flag(heed::flags::Flags::MdbWriteMap);
            env_builder.flag(heed::flags::Flags::MdbMapAsync);
        }

        Ok(Arc::new(Engine {
            env: env_builder
                .open(&config.database_path)
                .map_err(convert_error)?,
            iter_pool: Mutex::new(ThreadPool::new(10)),
        }))
    }

    fn open_tree(self: &Arc<Self>, name: &'static str) -> Result<Arc<dyn Tree>> {
        // Creates the db if it doesn't exist already
        Ok(Arc::new(EngineTree {
            engine: Arc::clone(self),
            tree: Arc::new(
                self.env
                    .create_database(Some(name))
                    .map_err(convert_error)?,
            ),
            watchers: RwLock::new(HashMap::new()),
        }))
    }

    fn flush(self: &Arc<Self>) -> Result<()> {
        self.env.force_sync().map_err(convert_error)?;
        Ok(())
    }
}

impl EngineTree {
    #[tracing::instrument(skip(self, tree, from, backwards))]
    fn iter_from_thread(
        &self,
        tree: Arc<heed::UntypedDatabase>,
        from: Vec<u8>,
        backwards: bool,
    ) -> Box<dyn Iterator<Item = TupleOfBytes> + Send + Sync> {
        let (s, r) = bounded::<TupleOfBytes>(100);
        let engine = Arc::clone(&self.engine);

        let lock = self.engine.iter_pool.lock().await;
        if lock.active_count() < lock.max_count() {
            lock.execute(move || {
                iter_from_thread_work(tree, &engine.env.read_txn().unwrap(), from, backwards, &s);
            });
        } else {
            std::thread::spawn(move || {
                iter_from_thread_work(tree, &engine.env.read_txn().unwrap(), from, backwards, &s);
            });
        }

        Box::new(r.into_iter())
    }
}

#[tracing::instrument(skip(tree, txn, from, backwards))]
fn iter_from_thread_work(
    tree: Arc<heed::UntypedDatabase>,
    txn: &heed::RoTxn<'_>,
    from: Vec<u8>,
    backwards: bool,
    s: &ChannelSender<(Vec<u8>, Vec<u8>)>,
) {
    if backwards {
        for (k, v) in tree.rev_range(txn, ..=&*from).unwrap().map(|r| r.unwrap()) {
            if s.send((k.to_vec(), v.to_vec())).is_err() {
                return;
            }
        }
    } else {
        if from.is_empty() {
            for (k, v) in tree.iter(txn).unwrap().map(|r| r.unwrap()) {
                if s.send((k.to_vec(), v.to_vec())).is_err() {
                    return;
                }
            }
        } else {
            for (k, v) in tree.range(txn, &*from..).unwrap().map(|r| r.unwrap()) {
                if s.send((k.to_vec(), v.to_vec())).is_err() {
                    return;
                }
            }
        }
    }
}

impl Tree for EngineTree {
    #[tracing::instrument(skip(self, key))]
    fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
        let txn = self.engine.env.read_txn().map_err(convert_error)?;
        Ok(self
            .tree
            .get(&txn, &key)
            .map_err(convert_error)?
            .map(|s| s.to_vec()))
    }

    #[tracing::instrument(skip(self, key, value))]
    fn insert(&self, key: &[u8], value: &[u8]) -> Result<()> {
        let mut txn = self.engine.env.write_txn().map_err(convert_error)?;
        self.tree
            .put(&mut txn, &key, &value)
            .map_err(convert_error)?;
        txn.commit().map_err(convert_error)?;

        let watchers = self.watchers.read().unwrap();
        let mut triggered = Vec::new();

        for length in 0..=key.len() {
            if watchers.contains_key(&key[..length]) {
                triggered.push(&key[..length]);
            }
        }

        drop(watchers);

        if !triggered.is_empty() {
            let mut watchers = self.watchers.write().unwrap();
            for prefix in triggered {
                if let Some(txs) = watchers.remove(prefix) {
                    for tx in txs {
                        let _ = tx.send(());
                    }
                }
            }
        };

        Ok(())
    }

    #[tracing::instrument(skip(self, key))]
    fn remove(&self, key: &[u8]) -> Result<()> {
        let mut txn = self.engine.env.write_txn().map_err(convert_error)?;
        self.tree.delete(&mut txn, &key).map_err(convert_error)?;
        txn.commit().map_err(convert_error)?;
        Ok(())
    }

    #[tracing::instrument(skip(self))]
    fn iter<'a>(&'a self) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + Send + 'a> {
        self.iter_from(&[], false)
    }

    #[tracing::instrument(skip(self, from, backwards))]
    fn iter_from(
        &self,
        from: &[u8],
        backwards: bool,
    ) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + Send> {
        self.iter_from_thread(Arc::clone(&self.tree), from.to_vec(), backwards)
    }

    #[tracing::instrument(skip(self, key))]
    fn increment(&self, key: &[u8]) -> Result<Vec<u8>> {
        let mut txn = self.engine.env.write_txn().map_err(convert_error)?;

        let old = self.tree.get(&txn, &key).map_err(convert_error)?;
        let new =
            crate::utils::increment(old.as_deref()).expect("utils::increment always returns Some");

        self.tree
            .put(&mut txn, &key, &&*new)
            .map_err(convert_error)?;

        txn.commit().map_err(convert_error)?;

        Ok(new)
    }

    #[tracing::instrument(skip(self, prefix))]
    fn scan_prefix<'a>(
        &'a self,
        prefix: Vec<u8>,
    ) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + Send + 'a> {
        Box::new(
            self.iter_from(&prefix, false)
                .take_while(move |(key, _)| key.starts_with(&prefix)),
        )
    }

    #[tracing::instrument(skip(self, prefix))]
    fn watch_prefix<'a>(&'a self, prefix: &[u8]) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
        let (tx, rx) = tokio::sync::oneshot::channel();

        self.watchers
            .write()
            .unwrap()
            .entry(prefix.to_vec())
            .or_default()
            .push(tx);

        Box::pin(async move {
            // Tx is never destroyed
            rx.await.unwrap();
        })
    }
}
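The engines above all implement watch_prefix the same way: on every write, the writer looks up each prefix of the key it just inserted and fires the channels registered for it. A minimal sketch of that pattern with tokio oneshot channels, simplified from the EngineTree code rather than copied from Conduit's API:

use std::collections::HashMap;
use std::sync::RwLock;
use tokio::sync::oneshot;

#[derive(Default)]
struct Watchers {
    registry: RwLock<HashMap<Vec<u8>, Vec<oneshot::Sender<()>>>>,
}

impl Watchers {
    /// Register interest in keys starting with `prefix`.
    fn watch(&self, prefix: &[u8]) -> oneshot::Receiver<()> {
        let (tx, rx) = oneshot::channel();
        self.registry
            .write()
            .unwrap()
            .entry(prefix.to_vec())
            .or_default()
            .push(tx);
        rx
    }

    /// Called after a write: wake every watcher whose prefix matches `key`.
    fn notify(&self, key: &[u8]) {
        let mut registry = self.registry.write().unwrap();
        // Every prefix of `key`, including the empty prefix and `key` itself.
        for length in 0..=key.len() {
            if let Some(senders) = registry.remove(&key[..length]) {
                for tx in senders {
                    let _ = tx.send(()); // receiver may already be gone
                }
            }
        }
    }
}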
@@ -1,128 +0,0 @@

use super::super::Config;
use crate::{utils, Result};
use std::{future::Future, pin::Pin, sync::Arc};
use tracing::warn;

use super::{DatabaseEngine, Tree};

pub struct Engine(sled::Db);

pub struct SledEngineTree(sled::Tree);

impl DatabaseEngine for Engine {
    fn open(config: &Config) -> Result<Arc<Self>> {
        Ok(Arc::new(Engine(
            sled::Config::default()
                .path(&config.database_path)
                .cache_capacity((config.db_cache_capacity_mb * 1024.0 * 1024.0) as u64)
                .use_compression(true)
                .open()?,
        )))
    }

    fn open_tree(self: &Arc<Self>, name: &'static str) -> Result<Arc<dyn Tree>> {
        Ok(Arc::new(SledEngineTree(self.0.open_tree(name)?)))
    }

    fn flush(self: &Arc<Self>) -> Result<()> {
        Ok(()) // noop
    }
}

impl Tree for SledEngineTree {
    fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
        Ok(self.0.get(key)?.map(|v| v.to_vec()))
    }

    fn insert(&self, key: &[u8], value: &[u8]) -> Result<()> {
        self.0.insert(key, value)?;
        Ok(())
    }

    #[tracing::instrument(skip(self, iter))]
    fn insert_batch<'a>(&self, iter: &mut dyn Iterator<Item = (Vec<u8>, Vec<u8>)>) -> Result<()> {
        for (key, value) in iter {
            self.0.insert(key, value)?;
        }

        Ok(())
    }

    fn remove(&self, key: &[u8]) -> Result<()> {
        self.0.remove(key)?;
        Ok(())
    }

    fn iter<'a>(&'a self) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> {
        Box::new(
            self.0
                .iter()
                .filter_map(|r| {
                    if let Err(e) = &r {
                        warn!("Error: {}", e);
                    }
                    r.ok()
                })
                .map(|(k, v)| (k.to_vec().into(), v.to_vec().into())),
        )
    }

    fn iter_from(
        &self,
        from: &[u8],
        backwards: bool,
    ) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)>> {
        let iter = if backwards {
            self.0.range(..=from)
        } else {
            self.0.range(from..)
        };

        let iter = iter
            .filter_map(|r| {
                if let Err(e) = &r {
                    warn!("Error: {}", e);
                }
                r.ok()
            })
            .map(|(k, v)| (k.to_vec().into(), v.to_vec().into()));

        if backwards {
            Box::new(iter.rev())
        } else {
            Box::new(iter)
        }
    }

    fn increment(&self, key: &[u8]) -> Result<Vec<u8>> {
        Ok(self
            .0
            .update_and_fetch(key, utils::increment)
            .map(|o| o.expect("increment always sets a value").to_vec())?)
    }

    fn scan_prefix<'a>(
        &'a self,
        prefix: Vec<u8>,
    ) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> {
        let iter = self
            .0
            .scan_prefix(prefix)
            .filter_map(|r| {
                if let Err(e) = &r {
                    warn!("Error: {}", e);
                }
                r.ok()
            })
            .map(|(k, v)| (k.to_vec().into(), v.to_vec().into()));

        Box::new(iter)
    }

    fn watch_prefix<'a>(&'a self, prefix: &[u8]) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
        let prefix = prefix.to_vec();
        Box::pin(async move {
            self.0.watch_prefix(prefix).await;
        })
    }
}
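Several methods above lean on crate::utils::increment, which treats a stored value as a u64 counter in its byte encoding. A sketch of what such a helper can look like, assuming a big-endian encoding (the real Conduit helper may differ in details):

/// Interpret `old` as a u64 counter and return the incremented value,
/// starting from 1 when the key is missing or malformed.
fn increment(old: Option<&[u8]>) -> Option<Vec<u8>> {
    let number = match old.map(|bytes| bytes.try_into()) {
        Some(Ok(bytes)) => u64::from_be_bytes(bytes) + 1,
        _ => 1, // start at 1 if there was no previous valid value
    };
    Some(number.to_be_bytes().to_vec())
}

fn main() {
    assert_eq!(increment(None), Some(1u64.to_be_bytes().to_vec()));
    let two = increment(Some(&1u64.to_be_bytes())).unwrap();
    assert_eq!(u64::from_be_bytes(two.try_into().unwrap()), 2);
}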
@@ -1,392 +0,0 @@

use super::{DatabaseEngine, Tree};
use crate::{database::Config, Result};
use parking_lot::{Mutex, MutexGuard, RwLock};
use rusqlite::{Connection, DatabaseName::Main, OptionalExtension};
use std::{
    cell::RefCell,
    collections::{hash_map, HashMap},
    future::Future,
    path::{Path, PathBuf},
    pin::Pin,
    sync::Arc,
};
use thread_local::ThreadLocal;
use tokio::sync::watch;
use tracing::debug;

thread_local! {
    static READ_CONNECTION: RefCell<Option<&'static Connection>> = RefCell::new(None);
    static READ_CONNECTION_ITERATOR: RefCell<Option<&'static Connection>> = RefCell::new(None);
}

struct PreparedStatementIterator<'a> {
    pub iterator: Box<dyn Iterator<Item = TupleOfBytes> + 'a>,
    pub statement_ref: NonAliasingBox<rusqlite::Statement<'a>>,
}

impl Iterator for PreparedStatementIterator<'_> {
    type Item = TupleOfBytes;

    fn next(&mut self) -> Option<Self::Item> {
        self.iterator.next()
    }
}

struct NonAliasingBox<T>(*mut T);
impl<T> Drop for NonAliasingBox<T> {
    fn drop(&mut self) {
        unsafe { Box::from_raw(self.0) };
    }
}

pub struct Engine {
    writer: Mutex<Connection>,
    read_conn_tls: ThreadLocal<Connection>,
    read_iterator_conn_tls: ThreadLocal<Connection>,

    path: PathBuf,
    cache_size_per_thread: u32,
}

impl Engine {
    fn prepare_conn(path: &Path, cache_size_kb: u32) -> Result<Connection> {
        let conn = Connection::open(&path)?;

        conn.pragma_update(Some(Main), "page_size", &2048)?;
        conn.pragma_update(Some(Main), "journal_mode", &"WAL")?;
        conn.pragma_update(Some(Main), "synchronous", &"NORMAL")?;
        conn.pragma_update(Some(Main), "cache_size", &(-i64::from(cache_size_kb)))?;
        conn.pragma_update(Some(Main), "wal_autocheckpoint", &0)?;

        Ok(conn)
    }

    fn write_lock(&self) -> MutexGuard<'_, Connection> {
        self.writer.lock()
    }

    fn read_lock(&self) -> &Connection {
        self.read_conn_tls
            .get_or(|| Self::prepare_conn(&self.path, self.cache_size_per_thread).unwrap())
    }

    fn read_lock_iterator(&self) -> &Connection {
        self.read_iterator_conn_tls
            .get_or(|| Self::prepare_conn(&self.path, self.cache_size_per_thread).unwrap())
    }

    pub fn flush_wal(self: &Arc<Self>) -> Result<()> {
        self.write_lock()
            .pragma_update(Some(Main), "wal_checkpoint", &"RESTART")?;
        Ok(())
    }
}

impl DatabaseEngine for Engine {
    fn open(config: &Config) -> Result<Arc<Self>> {
        let path = Path::new(&config.database_path).join("conduit.db");

        // calculates cache-size per permanent connection
        // 1. convert MB to KiB
        // 2. divide by permanent connections + permanent iter connections + write connection
        // 3. round down to nearest integer
        let cache_size_per_thread: u32 = ((config.db_cache_capacity_mb * 1024.0)
            / ((num_cpus::get().max(1) * 2) + 1) as f64)
            as u32;

        let writer = Mutex::new(Self::prepare_conn(&path, cache_size_per_thread)?);

        let arc = Arc::new(Engine {
            writer,
            read_conn_tls: ThreadLocal::new(),
            read_iterator_conn_tls: ThreadLocal::new(),
            path,
            cache_size_per_thread,
        });

        Ok(arc)
    }

    fn open_tree(self: &Arc<Self>, name: &str) -> Result<Arc<dyn Tree>> {
        self.write_lock().execute(&format!("CREATE TABLE IF NOT EXISTS {} ( \"key\" BLOB PRIMARY KEY, \"value\" BLOB NOT NULL )", name), [])?;

        Ok(Arc::new(SqliteTable {
            engine: Arc::clone(self),
            name: name.to_owned(),
            watchers: RwLock::new(HashMap::new()),
        }))
    }

    fn flush(self: &Arc<Self>) -> Result<()> {
        // we enabled PRAGMA synchronous=normal, so this should not be necessary
        Ok(())
    }
}

pub struct SqliteTable {
    engine: Arc<Engine>,
    name: String,
    watchers: RwLock<HashMap<Vec<u8>, (watch::Sender<()>, watch::Receiver<()>)>>,
}

type TupleOfBytes = (Vec<u8>, Vec<u8>);

impl SqliteTable {
    #[tracing::instrument(skip(self, guard, key))]
    fn get_with_guard(&self, guard: &Connection, key: &[u8]) -> Result<Option<Vec<u8>>> {
        //dbg!(&self.name);
        Ok(guard
            .prepare(format!("SELECT value FROM {} WHERE key = ?", self.name).as_str())?
            .query_row([key], |row| row.get(0))
            .optional()?)
    }

    #[tracing::instrument(skip(self, guard, key, value))]
    fn insert_with_guard(&self, guard: &Connection, key: &[u8], value: &[u8]) -> Result<()> {
        //dbg!(&self.name);
        guard.execute(
            format!(
                "INSERT OR REPLACE INTO {} (key, value) VALUES (?, ?)",
                self.name
            )
            .as_str(),
            [key, value],
        )?;
        Ok(())
    }

    pub fn iter_with_guard<'a>(
        &'a self,
        guard: &'a Connection,
    ) -> Box<dyn Iterator<Item = TupleOfBytes> + 'a> {
        let statement = Box::leak(Box::new(
            guard
                .prepare(&format!(
                    "SELECT key, value FROM {} ORDER BY key ASC",
                    &self.name
                ))
                .unwrap(),
        ));

        let statement_ref = NonAliasingBox(statement);

        //let name = self.name.clone();

        let iterator = Box::new(
            statement
                .query_map([], |row| Ok((row.get_unwrap(0), row.get_unwrap(1))))
                .unwrap()
                .map(move |r| {
                    //dbg!(&name);
                    r.unwrap()
                }),
        );

        Box::new(PreparedStatementIterator {
            iterator,
            statement_ref,
        })
    }
}

impl Tree for SqliteTable {
    #[tracing::instrument(skip(self, key))]
    fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
        self.get_with_guard(self.engine.read_lock(), key)
    }

    #[tracing::instrument(skip(self, key, value))]
    fn insert(&self, key: &[u8], value: &[u8]) -> Result<()> {
        let guard = self.engine.write_lock();
        self.insert_with_guard(&guard, key, value)?;
        drop(guard);

        let watchers = self.watchers.read();
        let mut triggered = Vec::new();

        for length in 0..=key.len() {
            if watchers.contains_key(&key[..length]) {
                triggered.push(&key[..length]);
            }
        }

        drop(watchers);

        if !triggered.is_empty() {
            let mut watchers = self.watchers.write();
            for prefix in triggered {
                if let Some(tx) = watchers.remove(prefix) {
                    let _ = tx.0.send(());
                }
            }
        };

        Ok(())
    }

    #[tracing::instrument(skip(self, iter))]
    fn insert_batch<'a>(&self, iter: &mut dyn Iterator<Item = (Vec<u8>, Vec<u8>)>) -> Result<()> {
        let guard = self.engine.write_lock();

        guard.execute("BEGIN", [])?;
        for (key, value) in iter {
            self.insert_with_guard(&guard, &key, &value)?;
        }
        guard.execute("COMMIT", [])?;

        drop(guard);

        Ok(())
    }

    #[tracing::instrument(skip(self, iter))]
    fn increment_batch<'a>(&self, iter: &mut dyn Iterator<Item = Vec<u8>>) -> Result<()> {
        let guard = self.engine.write_lock();

        guard.execute("BEGIN", [])?;
        for key in iter {
            let old = self.get_with_guard(&guard, &key)?;
            let new = crate::utils::increment(old.as_deref())
                .expect("utils::increment always returns Some");
            self.insert_with_guard(&guard, &key, &new)?;
        }
        guard.execute("COMMIT", [])?;

        drop(guard);

        Ok(())
    }

    #[tracing::instrument(skip(self, key))]
    fn remove(&self, key: &[u8]) -> Result<()> {
        let guard = self.engine.write_lock();

        guard.execute(
            format!("DELETE FROM {} WHERE key = ?", self.name).as_str(),
            [key],
        )?;

        Ok(())
    }

    #[tracing::instrument(skip(self))]
    fn iter<'a>(&'a self) -> Box<dyn Iterator<Item = TupleOfBytes> + 'a> {
        let guard = self.engine.read_lock_iterator();

        self.iter_with_guard(guard)
    }

    #[tracing::instrument(skip(self, from, backwards))]
    fn iter_from<'a>(
        &'a self,
        from: &[u8],
        backwards: bool,
    ) -> Box<dyn Iterator<Item = TupleOfBytes> + 'a> {
        let guard = self.engine.read_lock_iterator();
        let from = from.to_vec(); // TODO change interface?

        //let name = self.name.clone();

        if backwards {
            let statement = Box::leak(Box::new(
                guard
                    .prepare(&format!(
                        "SELECT key, value FROM {} WHERE key <= ? ORDER BY key DESC",
                        &self.name
                    ))
                    .unwrap(),
            ));

            let statement_ref = NonAliasingBox(statement);

            let iterator = Box::new(
                statement
                    .query_map([from], |row| Ok((row.get_unwrap(0), row.get_unwrap(1))))
                    .unwrap()
                    .map(move |r| {
                        //dbg!(&name);
                        r.unwrap()
                    }),
            );
            Box::new(PreparedStatementIterator {
                iterator,
                statement_ref,
            })
        } else {
            let statement = Box::leak(Box::new(
                guard
                    .prepare(&format!(
                        "SELECT key, value FROM {} WHERE key >= ? ORDER BY key ASC",
                        &self.name
                    ))
                    .unwrap(),
            ));

            let statement_ref = NonAliasingBox(statement);

            let iterator = Box::new(
                statement
                    .query_map([from], |row| Ok((row.get_unwrap(0), row.get_unwrap(1))))
                    .unwrap()
                    .map(move |r| {
                        //dbg!(&name);
                        r.unwrap()
                    }),
            );

            Box::new(PreparedStatementIterator {
                iterator,
                statement_ref,
            })
        }
    }

    #[tracing::instrument(skip(self, key))]
    fn increment(&self, key: &[u8]) -> Result<Vec<u8>> {
        let guard = self.engine.write_lock();

        let old = self.get_with_guard(&guard, key)?;

        let new =
            crate::utils::increment(old.as_deref()).expect("utils::increment always returns Some");

        self.insert_with_guard(&guard, key, &new)?;

        Ok(new)
    }

    #[tracing::instrument(skip(self, prefix))]
    fn scan_prefix<'a>(&'a self, prefix: Vec<u8>) -> Box<dyn Iterator<Item = TupleOfBytes> + 'a> {
        Box::new(
            self.iter_from(&prefix, false)
                .take_while(move |(key, _)| key.starts_with(&prefix)),
        )
    }

    #[tracing::instrument(skip(self, prefix))]
    fn watch_prefix<'a>(&'a self, prefix: &[u8]) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
        let mut rx = match self.watchers.write().entry(prefix.to_vec()) {
            hash_map::Entry::Occupied(o) => o.get().1.clone(),
            hash_map::Entry::Vacant(v) => {
                let (tx, rx) = tokio::sync::watch::channel(());
                v.insert((tx, rx.clone()));
                rx
            }
        };

        Box::pin(async move {
            // Tx is never destroyed
            rx.changed().await.unwrap();
        })
    }

    #[tracing::instrument(skip(self))]
    fn clear(&self) -> Result<()> {
        debug!("clear: running");
        self.engine
            .write_lock()
            .execute(format!("DELETE FROM {}", self.name).as_str(), [])?;
        debug!("clear: ran");
        Ok(())
    }
}
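The cache_size_per_thread computation above turns the configured cache budget (in MB) into KiB per connection, where the connection count is two per CPU (one read and one iterator connection) plus the single writer. A worked version of that arithmetic, with an example value; the function name is illustrative:

/// KiB of SQLite page cache each connection may use.
fn cache_size_per_thread(db_cache_capacity_mb: f64, cpus: usize) -> u32 {
    // 1. convert MB to KiB
    // 2. divide by (read conns + iterator conns + 1 writer)
    // 3. truncate to an integer
    ((db_cache_capacity_mb * 1024.0) / ((cpus.max(1) * 2) + 1) as f64) as u32
}

fn main() {
    // 300 MB budget on a 4-core machine: 9 connections share the cache.
    assert_eq!(cache_size_per_thread(300.0, 4), 34133); // 307200 / 9
}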
@@ -1,146 +0,0 @@

use reqwest::{Proxy, Url};
use serde::Deserialize;

use crate::Result;

/// ## Examples:
/// - No proxy (default):
/// ```toml
/// proxy = "none"
/// ```
/// - Global proxy
/// ```toml
/// [proxy]
/// global = { url = "socks5h://localhost:9050" }
/// ```
/// - Proxy some domains
/// ```toml
/// [proxy]
/// [[proxy.by_domain]]
/// url = "socks5h://localhost:9050"
/// include = ["*.onion", "matrix.myspecial.onion"]
/// exclude = ["*.myspecial.onion"]
/// ```
/// ## Include vs. Exclude
/// If include is an empty list, it is assumed to be `["*"]`.
///
/// If a domain matches both the exclude and include list, the proxy will only be used if it was
/// included because of a more specific rule than it was excluded. In the above example, the proxy
/// would be used for `ordinary.onion` and `matrix.myspecial.onion`, but not `hello.myspecial.onion`.
#[derive(Clone, Debug, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ProxyConfig {
    None,
    Global {
        #[serde(deserialize_with = "crate::utils::deserialize_from_str")]
        url: Url,
    },
    ByDomain(Vec<PartialProxyConfig>),
}
impl ProxyConfig {
    pub fn to_proxy(&self) -> Result<Option<Proxy>> {
        Ok(match self.clone() {
            ProxyConfig::None => None,
            ProxyConfig::Global { url } => Some(Proxy::all(url)?),
            ProxyConfig::ByDomain(proxies) => Some(Proxy::custom(move |url| {
                proxies.iter().find_map(|proxy| proxy.for_url(url)).cloned() // first matching proxy
            })),
        })
    }
}
impl Default for ProxyConfig {
    fn default() -> Self {
        ProxyConfig::None
    }
}

#[derive(Clone, Debug, Deserialize)]
pub struct PartialProxyConfig {
    #[serde(deserialize_with = "crate::utils::deserialize_from_str")]
    url: Url,
    #[serde(default)]
    include: Vec<WildCardedDomain>,
    #[serde(default)]
    exclude: Vec<WildCardedDomain>,
}
impl PartialProxyConfig {
    pub fn for_url(&self, url: &Url) -> Option<&Url> {
        let domain = url.domain()?;
        let mut included_because = None; // most specific reason it was included
        let mut excluded_because = None; // most specific reason it was excluded
        if self.include.is_empty() {
            // treat empty include list as `*`
            included_because = Some(&WildCardedDomain::WildCard)
        }
        for wc_domain in &self.include {
            if wc_domain.matches(domain) {
                match included_because {
                    Some(prev) if !wc_domain.more_specific_than(prev) => (),
                    _ => included_because = Some(wc_domain),
                }
            }
        }
        for wc_domain in &self.exclude {
            if wc_domain.matches(domain) {
                match excluded_because {
                    Some(prev) if !wc_domain.more_specific_than(prev) => (),
                    _ => excluded_because = Some(wc_domain),
                }
            }
        }
        match (included_because, excluded_because) {
            (Some(a), Some(b)) if a.more_specific_than(b) => Some(&self.url), // included for a more specific reason than excluded
            (Some(_), None) => Some(&self.url),
            _ => None,
        }
    }
}

/// A domain name that optionally allows a * as its first subdomain.
#[derive(Clone, Debug)]
pub enum WildCardedDomain {
    WildCard,
    WildCarded(String),
    Exact(String),
}
impl WildCardedDomain {
    pub fn matches(&self, domain: &str) -> bool {
        match self {
            WildCardedDomain::WildCard => true,
            WildCardedDomain::WildCarded(d) => domain.ends_with(d),
            WildCardedDomain::Exact(d) => domain == d,
        }
    }
    pub fn more_specific_than(&self, other: &Self) -> bool {
        match (self, other) {
            (WildCardedDomain::WildCard, WildCardedDomain::WildCard) => false,
            (_, WildCardedDomain::WildCard) => true,
            (WildCardedDomain::Exact(a), WildCardedDomain::WildCarded(_)) => other.matches(a),
            (WildCardedDomain::WildCarded(a), WildCardedDomain::WildCarded(b)) => {
                a != b && a.ends_with(b)
            }
            _ => false,
        }
    }
}
impl std::str::FromStr for WildCardedDomain {
    type Err = std::convert::Infallible;
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        // maybe do some domain validation?
        Ok(if s.starts_with("*.") {
            WildCardedDomain::WildCarded(s[1..].to_owned())
        } else if s == "*" {
            WildCardedDomain::WildCarded("".to_owned())
        } else {
            WildCardedDomain::Exact(s.to_owned())
        })
    }
}
impl<'de> Deserialize<'de> for WildCardedDomain {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::de::Deserializer<'de>,
    {
        crate::utils::deserialize_from_str(deserializer)
    }
}
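The include/exclude resolution above can be exercised with the domains from the doc comment. A small standalone sketch of the matching rules, restated outside the crate (it mirrors WildCardedDomain but is not the exact Conduit code):

#[derive(Clone, Debug, PartialEq)]
enum WildCardedDomain {
    WildCard,           // "*"
    WildCarded(String), // "*.example.com", stored as ".example.com"
    Exact(String),
}

impl WildCardedDomain {
    fn matches(&self, domain: &str) -> bool {
        match self {
            WildCardedDomain::WildCard => true,
            WildCardedDomain::WildCarded(d) => domain.ends_with(d),
            WildCardedDomain::Exact(d) => domain == d,
        }
    }
}

fn main() {
    let onion = WildCardedDomain::WildCarded(".onion".to_owned());
    let special = WildCardedDomain::WildCarded(".myspecial.onion".to_owned());
    let exact = WildCardedDomain::Exact("matrix.myspecial.onion".to_owned());

    // "*.onion" includes ordinary.onion and nothing excludes it -> proxied.
    assert!(onion.matches("ordinary.onion"));
    // matrix.myspecial.onion is excluded by "*.myspecial.onion", but the
    // exact include rule is more specific -> still proxied.
    assert!(exact.matches("matrix.myspecial.onion") && special.matches("matrix.myspecial.onion"));
    // hello.myspecial.onion is matched only by the wildcard rules, and the
    // exclude rule is more specific than the "*.onion" include -> not proxied.
    assert!(special.matches("hello.myspecial.onion"));
}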
@@ -1,15 +0,0 @@

[global]

# Server runs in the same container as the tests do, so localhost is fine
server_name = "localhost"

# With a bit of luck /tmp is a RAM disk, so that the file system does not become the bottleneck while testing
database_path = "/tmp"

# All the other settings are left at their defaults:
port = 6167
max_request_size = 20_000_000
allow_registration = true
trusted_servers = ["matrix.org"]
address = "127.0.0.1"
proxy = "none"