Decompose the job scheduler (#520)

* make job scheduler more modular

* up

* Move to traits

* up

* add redis feature

* add basic scheduling test

* use minio run by github actions

* update cargo-dist

* update openidconnect

* changes

* remove slow test

* up

* I'm crying

* new snapshot
Aumetra Weisman, 2024-04-29 20:35:47 +02:00, committed by GitHub
parent a247eb01ad
commit f42d425463
77 changed files with 2582 additions and 1698 deletions
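In short: the concrete `JobQueue<CR>` struct becomes a `JobQueue` trait with a Redis-backed implementation (`RedisJobQueue`, gated behind a new `redis` feature in `athena`), and callers now hold a type-erased trait object. A minimal sketch of the new wiring, using names exactly as they appear in the diffs below (the helper function itself is illustrative):

```rust
use athena::{JobQueue, RedisJobQueue};
use kitsune_jobs::KitsuneContextRepo;
use std::sync::Arc;

type RedisPool = multiplex_pool::Pool<redis::aio::ConnectionManager>;

// Build the concrete Redis-backed queue, then erase its type so downstream
// services (JobService, the job dispatcher, ...) depend only on the trait.
fn make_queue(
    context_repo: KitsuneContextRepo,
    redis_pool: RedisPool,
) -> Arc<dyn JobQueue<ContextRepository = KitsuneContextRepo>> {
    let queue = RedisJobQueue::builder()
        .context_repository(context_repo)
        .queue_name("kitsune-jobs")
        .redis_pool(redis_pool)
        .build();
    Arc::new(queue)
}
```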


@ -12,6 +12,13 @@ jobs:
name: Test coverage
runs-on: ubuntu-latest
services:
minio:
image: bitnami/minio
env:
MINIO_ROOT_USER: minioadmin
MINIO_ROOT_PASSWORD: minioadmin
ports:
- 9000:9000
postgres:
image: postgres
ports:
@ -46,6 +53,7 @@ jobs:
run: cargo llvm-cov nextest --all-features --workspace --lcov --output-path lcov.info
env:
DATABASE_URL: "postgres://postgres:postgres@localhost/test_db"
MINIO_URL: "http://127.0.0.1:9000"
REDIS_URL: "redis://localhost"
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v4
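(Exporting MINIO_URL and REDIS_URL here lets the test suite's resource helper pick up the CI-managed services instead of starting its own containers; see the `get_resource!` sketch further below.)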


@ -1,4 +1,4 @@
# Copyright 2022-2023, axodotdev
# Copyright 2022-2024, axodotdev
# SPDX-License-Identifier: MIT or Apache-2.0
#
# CI that:
@ -6,9 +6,9 @@
# * checks for a Git Tag that looks like a release
# * builds artifacts with cargo-dist (archives, installers, hashes)
# * uploads those artifacts to temporary workflow zip
# * on success, uploads the artifacts to a Github Release
# * on success, uploads the artifacts to a GitHub Release
#
# Note that the Github Release will be created with a generated
# Note that the GitHub Release will be created with a generated
# title/body based on your changelogs.
name: Release
@ -31,7 +31,7 @@ permissions:
# packages versioned/released in lockstep).
#
# If you push multiple tags at once, separate instances of this workflow will
# spin up, creating an independent announcement for each one. However Github
# spin up, creating an independent announcement for each one. However, GitHub
# will hard limit this to 3 tags per commit, as it will assume more tags is a
# mistake.
#
@ -62,7 +62,7 @@ jobs:
# we specify bash to get pipefail; it guards against the `curl` command
# failing. otherwise `sh` won't catch that `curl` returned non-0
shell: bash
run: "curl --proto '=https' --tlsv1.2 -LsSf https://github.com/axodotdev/cargo-dist/releases/download/v0.12.0/cargo-dist-installer.sh | sh"
run: "curl --proto '=https' --tlsv1.2 -LsSf https://github.com/axodotdev/cargo-dist/releases/download/v0.13.2/cargo-dist-installer.sh | sh"
# sure would be cool if github gave us proper conditionals...
# so here's a doubly-nested ternary-via-truthiness to try to provide the best possible
# functionality based on whether this is a pull_request, and whether it's from a fork.
@ -109,6 +109,8 @@ jobs:
with:
submodules: recursive
- uses: swatinem/rust-cache@v2
with:
key: ${{ join(matrix.targets, '-') }}
- name: Install cargo-dist
run: ${{ matrix.install_dist }}
# Get the dist-manifest
@ -135,7 +137,7 @@ jobs:
run: |
# Parse out what we just built and upload it to scratch storage
echo "paths<<EOF" >> "$GITHUB_OUTPUT"
jq --raw-output ".artifacts[]?.path | select( . != null )" dist-manifest.json >> "$GITHUB_OUTPUT"
jq --raw-output ".upload_files[]" dist-manifest.json >> "$GITHUB_OUTPUT"
echo "EOF" >> "$GITHUB_OUTPUT"
cp dist-manifest.json "$BUILD_MANIFEST_NAME"
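(The updated dist-manifest evidently exposes the upload list directly as `upload_files`, replacing the null-filtering `jq` query; the same substitution repeats in the hunks below.)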
@ -162,7 +164,7 @@ jobs:
submodules: recursive
- name: Install cargo-dist
shell: bash
run: "curl --proto '=https' --tlsv1.2 -LsSf https://github.com/axodotdev/cargo-dist/releases/download/v0.12.0/cargo-dist-installer.sh | sh"
run: "curl --proto '=https' --tlsv1.2 -LsSf https://github.com/axodotdev/cargo-dist/releases/download/v0.13.2/cargo-dist-installer.sh | sh"
# Get all the local artifacts for the global tasks to use (for e.g. checksums)
- name: Fetch local artifacts
uses: actions/download-artifact@v4
@ -178,7 +180,7 @@ jobs:
# Parse out what we just built and upload it to scratch storage
echo "paths<<EOF" >> "$GITHUB_OUTPUT"
jq --raw-output ".artifacts[]?.path | select( . != null )" dist-manifest.json >> "$GITHUB_OUTPUT"
jq --raw-output ".upload_files[]" dist-manifest.json >> "$GITHUB_OUTPUT"
echo "EOF" >> "$GITHUB_OUTPUT"
cp dist-manifest.json "$BUILD_MANIFEST_NAME"
@ -207,7 +209,7 @@ jobs:
with:
submodules: recursive
- name: Install cargo-dist
run: "curl --proto '=https' --tlsv1.2 -LsSf https://github.com/axodotdev/cargo-dist/releases/download/v0.12.0/cargo-dist-installer.sh | sh"
run: "curl --proto '=https' --tlsv1.2 -LsSf https://github.com/axodotdev/cargo-dist/releases/download/v0.13.2/cargo-dist-installer.sh | sh"
# Fetch artifacts from scratch-storage
- name: Fetch artifacts
uses: actions/download-artifact@v4
@ -215,7 +217,7 @@ jobs:
pattern: artifacts-*
path: target/distrib/
merge-multiple: true
# This is a harmless no-op for Github Releases, hosting for that happens in "announce"
# This is a harmless no-op for GitHub Releases, hosting for that happens in "announce"
- id: host
shell: bash
run: |
@ -230,7 +232,7 @@ jobs:
name: artifacts-dist-manifest
path: dist-manifest.json
# Create a Github Release while uploading all files to it
# Create a GitHub Release while uploading all files to it
announce:
needs:
- plan
@ -246,7 +248,7 @@ jobs:
- uses: actions/checkout@v4
with:
submodules: recursive
- name: "Download Github Artifacts"
- name: "Download GitHub Artifacts"
uses: actions/download-artifact@v4
with:
pattern: artifacts-*
@ -256,7 +258,7 @@ jobs:
run: |
# Remove the granular manifests
rm -f artifacts/*-dist-manifest.json
- name: Create Github Release
- name: Create GitHub Release
uses: ncipollo/release-action@v1
with:
tag: ${{ needs.plan.outputs.tag }}


@ -39,6 +39,13 @@ jobs:
name: Test
runs-on: ubuntu-latest
services:
minio:
image: bitnami/minio
env:
MINIO_ROOT_USER: minioadmin
MINIO_ROOT_PASSWORD: minioadmin
ports:
- 9000:9000
postgres:
image: postgres
ports:
@ -71,4 +78,5 @@ jobs:
- run: nix develop --impure --command bash -c "unset LD_LIBRARY_PATH && cargo nextest run --all-features"
env:
DATABASE_URL: "postgres://postgres:postgres@localhost/test_db"
MINIO_URL: "http://127.0.0.1:9000"
REDIS_URL: "redis://localhost"

Cargo.lock (generated, 1381 changed lines): diff suppressed because it is too large.


@ -1,6 +1,4 @@
[profile.dev.package]
backtrace = { opt-level = 3 }
num-bigint-dig = { opt-level = 3 }
taplo = { debug-assertions = false } # A debug assertion will make the xtask panic with too long trailing comments
# The profile that 'cargo dist' will build with
@ -100,7 +98,7 @@ license = "AGPL-3.0-or-later"
# Whether to pass --all-features to cargo build
all-features = true
# The preferred cargo-dist version to use in CI (Cargo.toml SemVer syntax)
cargo-dist-version = "0.12.0"
cargo-dist-version = "0.13.2"
# CI backends to support
ci = ["github"]
# The installers to generate for each app
@ -118,4 +116,4 @@ pr-run-mode = "plan"
install-updater = true
[patch.crates-io]
diesel-async = { git = "https://github.com/weiznich/diesel_async.git", rev = "017ebe2fb7a2709ab5db92148dea5ce812a35e09" }
diesel-async = { git = "https://github.com/weiznich/diesel_async.git", rev = "d02798c67065d763154d7272dd0c09b39757d0f2" }


@ -8,6 +8,7 @@
![GitHub Workflow Status](https://img.shields.io/github/actions/workflow/status/kitsune-soc/kitsune/rust.yml?style=for-the-badge)
[![dependency status](https://deps.rs/repo/github/kitsune-soc/kitsune/status.svg?style=for-the-badge)](https://deps.rs/repo/github/kitsune-soc/kitsune)
[![Maintenance: Experimental](https://img.shields.io/badge/maintainance-experimental-blue?style=for-the-badge)](https://gist.github.com/taiki-e/ad73eaea17e2e0372efb76ef6b38f17b)
</div>


@ -6,10 +6,10 @@ version.workspace = true
license.workspace = true
[dependencies]
async-trait = "0.1.79"
async-trait = "0.1.80"
autometrics = { version = "1.0.1", default-features = false }
base64-simd = "0.8.0"
diesel = "2.1.5"
diesel = "2.1.6"
diesel-async = "0.4.1"
futures-util = "0.3.30"
headers = "0.4.0"
@ -32,12 +32,12 @@ kitsune-util = { path = "../kitsune-util" }
kitsune-wasm-mrf = { path = "../kitsune-wasm-mrf" }
mime = "0.3.17"
mime_guess = { version = "2.0.4", default-features = false }
serde = "1.0.197"
serde = "1.0.199"
sha2 = "0.10.8"
simd-json = "0.13.9"
simd-json = "0.13.10"
speedy-uuid = { path = "../../lib/speedy-uuid" }
tracing = "0.1.40"
typed-builder = "0.18.1"
typed-builder = "0.18.2"
url = "2.5.0"
[target.'cfg(not(target_env = "msvc"))'.dependencies]
@ -45,7 +45,7 @@ sha2 = { version = "0.10.8", features = ["asm"] }
[dev-dependencies]
http-body-util = "0.1.1"
hyper = "1.2.0"
hyper = "1.3.1"
kitsune-config = { path = "../kitsune-config" }
kitsune-test = { path = "../kitsune-test" }
kitsune-webfinger = { path = "../kitsune-webfinger" }


@ -8,16 +8,16 @@ license.workspace = true
[dependencies]
enum_dispatch = "0.3.13"
kitsune-error = { path = "../kitsune-error" }
moka = { version = "0.12.5", features = ["future"] }
moka = { version = "0.12.7", features = ["future"] }
multiplex-pool = { path = "../../lib/multiplex-pool" }
redis = { version = "0.25.3", default-features = false, features = [
"connection-manager",
"tokio-comp",
] }
serde = "1.0.197"
simd-json = "0.13.9"
serde = "1.0.199"
simd-json = "0.13.10"
tracing = "0.1.40"
typed-builder = "0.18.1"
typed-builder = "0.18.2"
[dev-dependencies]
tokio = { version = "1.37.0", features = ["macros", "rt"] }


@ -10,11 +10,11 @@ enum_dispatch = "0.3.13"
http = "1.1.0"
kitsune-error = { path = "../kitsune-error" }
kitsune-http-client = { path = "../kitsune-http-client" }
serde = { version = "1.0.197", features = ["derive"] }
serde = { version = "1.0.199", features = ["derive"] }
serde_urlencoded = "0.7.1"
simd-json = "0.13.9"
simd-json = "0.13.10"
strum = { version = "0.26.2", features = ["derive"] }
typed-builder = "0.18.1"
typed-builder = "0.18.2"
[lints]
workspace = true


@ -9,7 +9,7 @@ license.workspace = true
eyre = "0.6.12"
human-size = { version = "0.4.3", features = ["serde"] }
isolang = { version = "2.4.0", features = ["serde"] }
serde = { version = "1.0.197", features = ["derive"] }
serde = { version = "1.0.199", features = ["derive"] }
smol_str = { version = "0.2.1", features = ["serde"] }
tokio = { version = "1.37.0", features = ["fs"] }
toml = { version = "0.8.12", default-features = false, features = ["parse"] }


@ -7,12 +7,12 @@ license.workspace = true
build = "build.rs"
[dependencies]
async-trait = "0.1.79"
async-trait = "0.1.80"
const_format = "0.2.32"
kitsune-db = { path = "../kitsune-db" }
kitsune-error = { path = "../kitsune-error" }
serde = { version = "1.0.197", features = ["derive"] }
typed-builder = "0.18.1"
serde = { version = "1.0.199", features = ["derive"] }
typed-builder = "0.18.2"
[build-dependencies]
vergen = { version = "8.3.1", features = ["build", "git", "gitcl"] }


@ -8,7 +8,7 @@ build = "build.rs"
[dependencies]
blowocking = { path = "../../lib/blowocking" }
diesel = { version = "2.1.5", features = ["uuid"] }
diesel = { version = "2.1.6", features = ["uuid"] }
diesel-async = { version = "0.4.1", features = [
"async-connection-wrapper",
"bb8",
@ -34,8 +34,8 @@ rustls = { version = "0.23.5", default-features = false, features = [
"tls12",
] }
rustls-native-certs = "0.7.0"
serde = { version = "1.0.197", features = ["derive"] }
simd-json = "0.13.9"
serde = { version = "1.0.199", features = ["derive"] }
simd-json = "0.13.10"
speedy-uuid = { path = "../../lib/speedy-uuid", features = ["diesel"] }
tokio = { version = "1.37.0", features = ["rt"] }
tokio-postgres = "0.7.10"
@ -43,7 +43,7 @@ tokio-postgres-rustls = "0.12.0"
tracing = "0.1.40"
tracing-log = "0.2.0"
trials = { path = "../../lib/trials" }
typed-builder = "0.18.1"
typed-builder = "0.18.2"
[dev-dependencies]
kitsune-test = { path = "../kitsune-test" }


@ -11,12 +11,12 @@ ignored = ["askama_axum"] # See reason below.
[dependencies]
askama = "0.12.1"
askama_axum = "0.4.0" # Damn it, cargo. Because "kitsune" uses "askama" with the axum feature, we have to have the crate available here as well.
diesel = "2.1.5"
diesel = "2.1.6"
diesel-async = "0.4.1"
kitsune-db = { path = "../kitsune-db" }
kitsune-error = { path = "../kitsune-error" }
kitsune-url = { path = "../kitsune-url" }
lettre = { version = "0.11.6", default-features = false, features = [
lettre = { version = "0.11.7", default-features = false, features = [
"builder",
"hostname",
"pool",
@ -25,13 +25,13 @@ lettre = { version = "0.11.6", default-features = false, features = [
"tokio1-rustls-tls",
"tracing",
] }
mrml = { version = "3.1.4", default-features = false, features = [
mrml = { version = "3.1.5", default-features = false, features = [
"orderedmap",
"parse",
"render",
] }
speedy-uuid = { path = "../../lib/speedy-uuid" }
typed-builder = "0.18.1"
typed-builder = "0.18.2"
[lints]
workspace = true


@ -6,9 +6,9 @@ version.workspace = true
license.workspace = true
[dependencies]
diesel = "2.1.5"
diesel = "2.1.6"
diesel-async = "0.4.1"
embed-sdk = { git = "https://github.com/Lantern-chat/embed-service.git", rev = "0d43394bb2514f57edc402a83f69b171705c3650" }
embed-sdk = { git = "https://github.com/Lantern-chat/embed-service.git", rev = "f46ea95ca89775a6c35d0d5cef9172cbc931258a" }
http = "1.1.0"
iso8601-timestamp = "0.2.17"
kitsune-db = { path = "../kitsune-db" }
@ -17,7 +17,7 @@ kitsune-http-client = { path = "../kitsune-http-client" }
once_cell = "1.19.0"
scraper = { version = "0.19.0", default-features = false }
smol_str = "0.2.1"
typed-builder = "0.18.1"
typed-builder = "0.18.2"
[lints]
workspace = true


@ -10,8 +10,8 @@ axum-core = "0.4.3"
eyre = "0.6.12"
garde = { version = "0.18.0", default-features = false, features = ["serde"] }
http = "1.1.0"
simd-json = "0.13.9"
sync_wrapper = "1.0.0"
simd-json = "0.13.10"
sync_wrapper = "1.0.1"
tracing = "0.1.40"
[lints]


@ -18,7 +18,7 @@ kitsune-service = { path = "../kitsune-service" }
kitsune-url = { path = "../kitsune-url" }
kitsune-wasm-mrf = { path = "../kitsune-wasm-mrf" }
kitsune-webfinger = { path = "../kitsune-webfinger" }
typed-builder = "0.18.1"
typed-builder = "0.18.2"
[lints]
workspace = true


@ -14,14 +14,14 @@ futures-util = { version = "0.3.30", default-features = false, features = [
http-body = "1.0.0"
http-body-util = "0.1.1"
http-signatures = { path = "../../lib/http-signatures" }
hyper = "1.2.0"
hyper = "1.3.1"
hyper-util = { version = "0.1.3", features = [
"client-legacy",
"http1",
"http2",
"tokio",
] }
hyper-rustls = { version = "0.27.0", default-features = false, features = [
hyper-rustls = { version = "0.27.1", default-features = false, features = [
"http1",
"http2",
"logging",
@ -31,9 +31,9 @@ hyper-rustls = { version = "0.27.0", default-features = false, features = [
] }
kitsune-type = { path = "../kitsune-type" }
pin-project = "1.1.5"
serde = "1.0.197"
serde = "1.0.199"
simdutf8 = { version = "0.1.4", features = ["aarch64_neon"] }
simd-json = "0.13.9"
simd-json = "0.13.10"
tower = { version = "0.4.13", features = ["util"] }
tower-http = { version = "0.5.2", features = [
# Explicitly exclude `zstd`


@ -45,7 +45,8 @@ pub struct Error {
impl Error {
#[inline]
pub(crate) fn new<E>(inner: E) -> Self
#[doc(hidden)]
pub fn new<E>(inner: E) -> Self
where
E: Into<BoxError>,
{


@ -8,17 +8,17 @@ license.workspace = true
[dependencies]
athena = { path = "../../lib/athena" }
derive_more = { version = "1.0.0-beta.6", features = ["from"] }
diesel = "2.1.5"
diesel = "2.1.6"
diesel-async = "0.4.1"
futures-util = "0.3.30"
kitsune-core = { path = "../kitsune-core" }
kitsune-db = { path = "../kitsune-db" }
kitsune-email = { path = "../kitsune-email" }
kitsune-error = { path = "../kitsune-error" }
serde = { version = "1.0.197", features = ["derive"] }
serde = { version = "1.0.199", features = ["derive"] }
speedy-uuid = { path = "../../lib/speedy-uuid" }
tracing = "0.1.40"
typed-builder = "0.18.1"
typed-builder = "0.18.2"
[lints]
workspace = true


@ -6,7 +6,7 @@ version.workspace = true
license.workspace = true
[dependencies]
diesel = "2.1.5"
diesel = "2.1.6"
diesel-async = { version = "0.4.1", features = ["postgres"] }
kitsune-config = { path = "../kitsune-config" }
isolang = { version = "2.4.0", features = [


@ -6,7 +6,7 @@ version.workspace = true
license.workspace = true
[dependencies]
diesel = "2.1.5"
diesel = "2.1.6"
diesel-async = "0.4.1"
futures-util = "0.3.30"
iso8601-timestamp = "0.2.17"
@ -19,12 +19,12 @@ kitsune-type = { path = "../kitsune-type" }
kitsune-url = { path = "../kitsune-url" }
kitsune-util = { path = "../kitsune-util" }
mime = "0.3.17"
serde = "1.0.197"
simd-json = "0.13.9"
serde = "1.0.199"
simd-json = "0.13.10"
smol_str = "0.2.1"
speedy-uuid = { path = "../../lib/speedy-uuid" }
tracing = "0.1.40"
typed-builder = "0.18.1"
typed-builder = "0.18.2"
[lints]
workspace = true


@ -6,14 +6,14 @@ version.workspace = true
license.workspace = true
[dependencies]
async-trait = "0.1.79"
async-trait = "0.1.80"
eyre = "0.6.12"
http-body-util = "0.1.1"
http-compat = { path = "../../lib/http-compat" }
hyper = { version = "1.2.0", default-features = false }
hyper = { version = "1.3.1", default-features = false }
kitsune-config = { path = "../kitsune-config" }
kitsune-http-client = { path = "../kitsune-http-client" }
metrics = "=0.22.0"
metrics = "0.22.3"
metrics-opentelemetry = { git = "https://github.com/aumetra/metrics-opentelemetry.git", rev = "95537b16370e595981e195be52f98ea5983a7a8e" }
metrics-tracing-context = "0.15.0"
metrics-util = "0.16.3"


@ -8,14 +8,15 @@ license.workspace = true
[dependencies]
enum_dispatch = "0.3.13"
http = "1.1.0"
http-compat = { path = "../../lib/http-compat" }
http-body-util = "0.1.1"
kitsune-config = { path = "../kitsune-config" }
kitsune-error = { path = "../kitsune-error" }
kitsune-http-client = { path = "../kitsune-http-client" }
moka = { version = "0.12.5", features = ["future"] }
moka = { version = "0.12.7", features = ["future"] }
multiplex-pool = { path = "../../lib/multiplex-pool" }
oauth2 = { version = "5.0.0-alpha.4", default-features = false }
once_cell = "1.19.0"
openidconnect = { version = "3.5.0", default-features = false, features = [
openidconnect = { version = "4.0.0-alpha.1", default-features = false, features = [
# Accept these two cases, which are invalid per the specification, to increase compatibility
"accept-rfc3339-timestamps",
"accept-string-booleans",
@ -24,8 +25,8 @@ redis = { version = "0.25.3", default-features = false, features = [
"connection-manager",
"tokio-comp",
] }
serde = { version = "1.0.197", features = ["derive"] }
simd-json = "0.13.9"
serde = { version = "1.0.199", features = ["derive"] }
simd-json = "0.13.10"
speedy-uuid = { path = "../../lib/speedy-uuid", features = ["serde"] }
url = "2.5.0"


@ -1,22 +1,20 @@
use http::Request;
use http_compat::Compat;
use http_body_util::BodyExt;
use kitsune_http_client::Client as HttpClient;
use once_cell::sync::Lazy;
use openidconnect::{HttpRequest, HttpResponse};
static HTTP_CLIENT: Lazy<HttpClient> = Lazy::new(HttpClient::default);
#[inline]
pub async fn async_client(req: HttpRequest) -> Result<HttpResponse, kitsune_http_client::Error> {
let mut request = Request::builder()
.method(req.method.compat())
.uri(req.url.as_str());
*request.headers_mut().unwrap() = req.headers.compat();
let request = request.body(req.body.into()).unwrap();
let response = HTTP_CLIENT.execute(request).await?;
let response = HTTP_CLIENT.execute(req.map(Into::into)).await?;
Ok(HttpResponse {
status_code: response.status().compat(),
headers: response.headers().clone().compat(),
body: response.bytes().await?.to_vec(),
})
let (parts, body) = response.into_inner().into_parts();
let body = body
.collect()
.await
.map_err(kitsune_http_client::Error::new)?
.to_bytes();
Ok(HttpResponse::from_parts(parts, body.to_vec()))
}
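(The rewrite leans on the openidconnect 4.0 alpha exchanging plain `http::Request`/`http::Response` values with the client function, which is why the `http-compat` conversion shims disappear here and `http-body-util` replaces that dependency in the Cargo.toml above.)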


@ -13,6 +13,38 @@ use openidconnect::{
use speedy_uuid::Uuid;
use url::Url;
type OidcClient = openidconnect::Client<
openidconnect::EmptyAdditionalClaims,
openidconnect::core::CoreAuthDisplay,
openidconnect::core::CoreGenderClaim,
openidconnect::core::CoreJweContentEncryptionAlgorithm,
openidconnect::core::CoreJsonWebKey,
openidconnect::core::CoreAuthPrompt,
openidconnect::StandardErrorResponse<oauth2::basic::BasicErrorResponseType>,
openidconnect::StandardTokenResponse<
openidconnect::IdTokenFields<
openidconnect::EmptyAdditionalClaims,
openidconnect::EmptyExtraTokenFields,
openidconnect::core::CoreGenderClaim,
openidconnect::core::CoreJweContentEncryptionAlgorithm,
openidconnect::core::CoreJwsSigningAlgorithm,
>,
oauth2::basic::BasicTokenType,
>,
openidconnect::StandardTokenIntrospectionResponse<
openidconnect::EmptyExtraTokenFields,
oauth2::basic::BasicTokenType,
>,
oauth2::StandardRevocableToken,
openidconnect::StandardErrorResponse<openidconnect::RevocationErrorResponseType>,
openidconnect::EndpointSet,
openidconnect::EndpointNotSet,
openidconnect::EndpointNotSet,
openidconnect::EndpointNotSet,
openidconnect::EndpointMaybeSet,
openidconnect::EndpointMaybeSet,
>;
mod state;
pub mod http;
@ -36,7 +68,7 @@ pub struct UserInfo {
#[derive(Clone)]
pub struct OidcService {
client: CoreClient,
client: OidcClient,
login_state_store: self::state::AnyStore,
}
@ -45,7 +77,7 @@ impl OidcService {
pub async fn initialise(config: &Configuration, redirect_uri: String) -> Result<Self> {
let provider_metadata = CoreProviderMetadata::discover_async(
IssuerUrl::new(config.server_url.to_string())?,
self::http::async_client,
&self::http::async_client,
)
.await?;
@ -125,20 +157,22 @@ impl OidcService {
let token_response = self
.client
.exchange_code(AuthorizationCode::new(authorization_code))
.exchange_code(AuthorizationCode::new(authorization_code))?
.set_pkce_verifier(pkce_verifier)
.request_async(self::http::async_client)
.request_async(&self::http::async_client)
.await?;
let id_token = token_response
.id_token()
.ok_or_else(|| kitsune_error!("missing id token"))?;
let id_token_verifier = self.client.id_token_verifier();
let claims = id_token.claims(&self.client.id_token_verifier(), &nonce)?;
if let Some(expected_hash) = claims.access_token_hash() {
let actual_hash = AccessTokenHash::from_token(
token_response.access_token(),
&id_token.signing_alg()?,
id_token.signing_alg()?,
id_token.signing_key(&id_token_verifier)?,
)?;
if actual_hash != *expected_hash {


@ -13,8 +13,8 @@ kitsune-error = { path = "../kitsune-error" }
kitsune-http-client = { path = "../kitsune-http-client" }
quick-xml = { version = "0.31.0", features = ["serialize"] }
rusty-s3 = "0.5.0"
serde = { version = "1.0.197", features = ["derive"] }
typed-builder = "0.18.1"
serde = { version = "1.0.199", features = ["derive"] }
typed-builder = "0.18.2"
[dev-dependencies]
kitsune-test = { path = "../kitsune-test" }


@ -6,7 +6,7 @@ version.workspace = true
license.workspace = true
[dependencies]
anyhow = "1.0.81"
anyhow = "1.0.82"
glob = "0.3.1"
rsass = "0.28.8"
tracing = { version = "0.1.40", default-features = false }


@ -9,7 +9,7 @@ license.workspace = true
ignored = ["isahc"] # To make `meilisearch` builds static
[dependencies]
diesel = "2.1.5"
diesel = "2.1.6"
diesel-async = "0.4.1"
diesel_full_text_search = { version = "2.1.1", default-features = false }
enum_dispatch = "0.3.13"
@ -18,11 +18,11 @@ kitsune-config = { path = "../kitsune-config" }
kitsune-db = { path = "../kitsune-db" }
kitsune-error = { path = "../kitsune-error" }
kitsune-language = { path = "../kitsune-language" }
serde = { version = "1.0.197", features = ["derive"] }
serde = { version = "1.0.199", features = ["derive"] }
speedy-uuid = { path = "../../lib/speedy-uuid" }
strum = { version = "0.26.2", features = ["derive"] }
tracing = "0.1.40"
typed-builder = "0.18.1"
typed-builder = "0.18.2"
# "meilisearch" feature
isahc = { version = "1.7.2", default-features = false, features = [


@ -13,7 +13,7 @@ athena = { path = "../../lib/athena" }
blowocking = { path = "../../lib/blowocking" }
bytes = "1.6.0"
derive_builder = "0.20.0"
diesel = "2.1.5"
diesel = "2.1.6"
diesel-async = "0.4.1"
eyre = "0.6.12"
futures-util = "0.3.30"
@ -54,12 +54,12 @@ redis = { version = "0.25.3", default-features = false, features = [
] }
rsa = "0.9.6"
rusty-s3 = { version = "0.5.0", default-features = false }
serde = "1.0.197"
serde = "1.0.199"
smol_str = "0.2.1"
speedy-uuid = { path = "../../lib/speedy-uuid" }
tokio = { version = "1.37.0", features = ["macros", "sync"] }
tracing = "0.1.40"
typed-builder = "0.18.1"
typed-builder = "0.18.2"
url = "2.5.0"
zxcvbn = { version = "2.2.2", default-features = false }
@ -69,7 +69,7 @@ meilisearch = ["kitsune-search/meilisearch"]
[dev-dependencies]
hex-simd = "0.8.0"
http-body-util = "0.1.1"
hyper = "1.2.0"
hyper = "1.3.1"
kitsune-activitypub = { path = "../kitsune-activitypub" }
kitsune-config = { path = "../kitsune-config" }
kitsune-federation-filter = { path = "../kitsune-federation-filter" }


@ -2,6 +2,7 @@ use athena::{JobDetails, JobQueue};
use iso8601_timestamp::Timestamp;
use kitsune_error::Result;
use kitsune_jobs::{Job, KitsuneContextRepo};
use std::sync::Arc;
use typed_builder::TypedBuilder;
#[derive(TypedBuilder)]
@ -13,7 +14,7 @@ pub struct Enqueue<T> {
#[derive(Clone, TypedBuilder)]
pub struct JobService {
job_queue: JobQueue<KitsuneContextRepo>,
job_queue: Arc<dyn JobQueue<ContextRepository = KitsuneContextRepo>>,
}
impl JobService {


@ -111,7 +111,7 @@ mod test {
account::AccountService, attachment::AttachmentService, custom_emoji::CustomEmojiService,
job::JobService,
};
use athena::JobQueue;
use athena::RedisJobQueue;
use core::convert::Infallible;
use diesel::{QueryDsl, SelectableHelper};
use diesel_async::RunQueryDsl;
@ -184,13 +184,13 @@ mod test {
.build();
let context_repo = KitsuneContextRepo::builder().db_pool(db_pool.clone()).build();
let job_queue = JobQueue::builder()
let job_queue = RedisJobQueue::builder()
.context_repository(context_repo)
.queue_name("parse_mentions_test")
.redis_pool(redis_pool)
.build();
let job_service = JobService::builder().job_queue(job_queue).build();
let job_service = JobService::builder().job_queue(Arc::new(job_queue)).build();
let url_service = UrlService::builder()
.domain("example.com")


@ -23,8 +23,8 @@ redis = { version = "0.25.3", default-features = false, features = [
"tokio-rustls-comp",
] }
rusty-s3 = { version = "0.5.0", default-features = false }
testcontainers = "0.15.0"
testcontainers-modules = { version = "0.3.7", features = [
testcontainers = "0.16.3"
testcontainers-modules = { version = "0.4.0", features = [
"minio",
"postgres",
"redis",


@ -1,53 +1,56 @@
use testcontainers::{clients::Cli as CliClient, Container, RunnableImage};
use testcontainers::{core::ContainerAsync, runners::AsyncRunner, RunnableImage};
use testcontainers_modules::{minio::MinIO, postgres::Postgres, redis::Redis};
pub trait Service {
const PORT: u16;
fn url(&self) -> String;
async fn url(&self) -> String;
}
impl Service for Container<'_, MinIO> {
impl Service for ContainerAsync<MinIO> {
const PORT: u16 = 9000;
fn url(&self) -> String {
let port = self.get_host_port_ipv4(Self::PORT);
async fn url(&self) -> String {
let port = self.get_host_port_ipv4(Self::PORT).await;
format!("http://127.0.0.1:{port}")
}
}
impl Service for Container<'_, Postgres> {
impl Service for ContainerAsync<Postgres> {
const PORT: u16 = 5432;
fn url(&self) -> String {
let port = self.get_host_port_ipv4(Self::PORT);
async fn url(&self) -> String {
let port = self.get_host_port_ipv4(Self::PORT).await;
format!("postgres://postgres:postgres@127.0.0.1:{port}/test_db")
}
}
impl Service for Container<'_, Redis> {
impl Service for ContainerAsync<Redis> {
const PORT: u16 = 6379;
fn url(&self) -> String {
let port = self.get_host_port_ipv4(Self::PORT);
async fn url(&self) -> String {
let port = self.get_host_port_ipv4(Self::PORT).await;
format!("redis://127.0.0.1:{port}")
}
}
pub fn minio(client: &CliClient) -> impl Service + '_ {
client.run(MinIO::default())
pub async fn minio() -> impl Service {
MinIO::default().start().await
}
pub fn postgres(client: &CliClient) -> impl Service + '_ {
pub async fn postgres() -> impl Service {
let base = Postgres::default()
.with_user("postgres")
.with_password("postgres")
.with_db_name("test_db");
client.run(RunnableImage::from(base).with_tag("15-alpine"))
RunnableImage::from(base)
.with_tag("15-alpine")
.start()
.await
}
pub fn redis(client: &CliClient) -> impl Service + '_ {
pub async fn redis() -> impl Service {
#[allow(clippy::default_constructed_unit_structs)]
client.run(Redis::default())
Redis::default().start().await
}
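A hypothetical test showing the call shape these helpers now imply (testcontainers 0.16 drops the shared blocking `Cli` client; everything runs on the async runtime):

```rust
use crate::container::Service;

// Assumes a tokio dev-dependency; `redis()` is the helper defined above.
#[tokio::test]
async fn redis_comes_up() {
    let redis = crate::container::redis().await;
    let url = redis.url().await; // e.g. "redis://127.0.0.1:<mapped port>"
    assert!(url.starts_with("redis://"));
}
```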


@ -38,7 +38,7 @@ where
Fut: Future,
{
let resource_handle = get_resource!("DATABASE_URL", self::container::postgres);
let mut url = Url::parse(&resource_handle.url()).unwrap();
let mut url = Url::parse(&resource_handle.url().await).unwrap();
// Create a new separate database for this test
let id = Uuid::new_v4().as_simple().to_string();
@ -85,7 +85,7 @@ where
Fut: Future,
{
let resource_handle = get_resource!("MINIO_URL", self::container::minio);
let endpoint = resource_handle.url().parse().unwrap();
let endpoint = resource_handle.url().await.parse().unwrap();
// Create a new bucket with a random ID
let bucket_id = Uuid::new_v4().as_simple().to_string();
@ -117,7 +117,7 @@ where
Fut: Future,
{
let resource_handle = get_resource!("REDIS_URL", self::container::redis);
let client = ::redis::Client::open(resource_handle.url().as_ref()).unwrap();
let client = ::redis::Client::open(resource_handle.url().await.as_ref()).unwrap();
// Connect to a random Redis database
let db_id = self::redis::find_unused_database(&client).await;


@ -1,22 +1,15 @@
use crate::{catch_panic::CatchPanic, container::Service};
use std::{borrow::Cow, future::Future, panic, sync::OnceLock};
pub static CONTAINER_CLIENT: OnceLock<testcontainers::clients::Cli> = OnceLock::new();
use std::{borrow::Cow, future::Future, panic};
#[macro_export]
macro_rules! get_resource {
($env_name:literal, $container_fn:path) => {
::std::env::var($env_name).map_or_else(
|_| {
// Only initialize client if we actually need it
let client = $crate::resource::CONTAINER_CLIENT.get_or_init(|| {
::testcontainers::clients::Cli::new::<::testcontainers::core::env::Os>()
});
$crate::resource::ResourceHandle::Container($container_fn(client))
},
$crate::resource::ResourceHandle::Url,
)
if let Ok(url) = ::std::env::var($env_name) {
$crate::resource::ResourceHandle::Url(url)
} else {
let container = $container_fn().await;
$crate::resource::ResourceHandle::Container(container)
}
};
}
@ -32,9 +25,9 @@ impl<S> ResourceHandle<S>
where
S: Service,
{
pub fn url(&self) -> Cow<'_, str> {
pub async fn url(&self) -> Cow<'_, str> {
match self {
Self::Container(container) => Cow::Owned(container.url()),
Self::Container(container) => Cow::Owned(container.url().await),
Self::Url(ref url) => Cow::Borrowed(url),
}
}
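A minimal sketch of the consumer side, assuming code inside `kitsune-test` (this mirrors the call sites in the previous file):

```rust
// Prefer an externally provided service (e.g. the REDIS_URL exported by CI);
// otherwise the macro starts a throwaway container on first use.
async fn redis_url_for_test() -> String {
    let handle = get_resource!("REDIS_URL", crate::container::redis);
    handle.url().await.into_owned()
}
```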


@ -7,8 +7,8 @@ license.workspace = true
[dependencies]
iso8601-timestamp = "0.2.17"
serde = { version = "1.0.197", features = ["derive"] }
simd-json = "0.13.9"
serde = { version = "1.0.199", features = ["derive"] }
simd-json = "0.13.10"
smol_str = { version = "0.2.1", features = ["serde"] }
speedy-uuid = { path = "../../lib/speedy-uuid", features = ["serde"] }
strum = { version = "0.26.2", features = ["derive"] }


@ -8,7 +8,7 @@ license.workspace = true
[dependencies]
smol_str = "0.2.1"
speedy-uuid = { path = "../../lib/speedy-uuid" }
typed-builder = "0.18.1"
typed-builder = "0.18.2"
[lints]
workspace = true


@ -6,11 +6,11 @@ version.workspace = true
license.workspace = true
[dependencies]
bubble-bath = "0.1.2"
bubble-bath = "0.1.3"
iso8601-timestamp = "0.2.17"
kitsune-type = { path = "../kitsune-type" }
once_cell = "1.19.0"
pulldown-cmark = { version = "0.10.2", default-features = false, features = [
pulldown-cmark = { version = "0.10.3", default-features = false, features = [
"html",
"simd",
] }


@ -7,7 +7,7 @@ license.workspace = true
build = "build.rs"
[dependencies]
async-trait = "0.1.79"
async-trait = "0.1.80"
color-eyre = "0.6.3"
derive_more = { version = "1.0.0-beta.6", features = ["from"] }
enum_dispatch = "0.3.13"
@ -23,15 +23,15 @@ redis = { version = "0.25.3", default-features = false, features = [
"connection-manager",
"tokio-rustls-comp",
] }
simd-json = "0.13.9"
simd-json = "0.13.10"
slab = "0.4.9"
sled = "0.34.7"
smol_str = "0.2.1"
tokio = { version = "1.37.0", features = ["fs"] }
tracing = "0.1.40"
typed-builder = "0.18.1"
typed-builder = "0.18.2"
walkdir = "2.5.0"
wasmtime = { version = "19.0.1", default-features = false, features = [
wasmtime = { version = "20.0.0", default-features = false, features = [
"addr2line",
"async",
"component-model",
@ -40,7 +40,7 @@ wasmtime = { version = "19.0.1", default-features = false, features = [
"pooling-allocator",
"runtime",
] }
wasmtime-wasi = { version = "19.0.1", default-features = false }
wasmtime-wasi = { version = "20.0.0", default-features = false }
[dev-dependencies]
tempfile = "3.10.1"


@ -111,7 +111,7 @@ impl MrfService {
let mut linker = Linker::<Context>::new(&engine);
mrf_wit::v1::Mrf::add_to_linker(&mut linker, |ctx| ctx).map_err(eyre::Report::msg)?;
wasmtime_wasi::command::add_to_linker(&mut linker).map_err(eyre::Report::msg)?;
wasmtime_wasi::add_to_linker_async(&mut linker).map_err(eyre::Report::msg)?;
Ok(Self {
engine,


@ -6,7 +6,7 @@ version.workspace = true
license.workspace = true
[dependencies]
async-trait = "0.1.79"
async-trait = "0.1.80"
autometrics = { version = "1.0.1", default-features = false }
futures-util = "0.3.30"
http = "1.1.0"
@ -26,9 +26,9 @@ urlencoding = "2.1.3"
[dev-dependencies]
http-body-util = "0.1.1"
hyper = "1.2.0"
hyper = "1.3.1"
pretty_assertions = "1.4.0"
simd-json = "0.13.9"
simd-json = "0.13.10"
tokio = { version = "1.37.0", features = ["macros"] }
tower = { version = "0.4.13", default-features = false, features = ["util"] }


@ -15,14 +15,14 @@ eula = false
[dependencies]
clap = { version = "4.5.4", features = ["derive", "wrap_help"] }
color-eyre = "0.6.3"
diesel = "2.1.5"
diesel = "2.1.6"
diesel-async = "0.4.1"
dotenvy = "0.15.7"
envy = "0.4.2"
kitsune-config = { path = "../crates/kitsune-config" }
kitsune-db = { path = "../crates/kitsune-db" }
kitsune-error = { path = "../crates/kitsune-error" }
serde = { version = "1.0.197", features = ["derive"] }
serde = { version = "1.0.199", features = ["derive"] }
speedy-uuid = { path = "../lib/speedy-uuid" }
tokio = { version = "1.37.0", features = ["full"] }
tracing-subscriber = "0.3.18"


@ -1,20 +0,0 @@
{
"root": true,
"env": {
"browser": true,
"es2021": true,
"node": true
},
"extends": [
"plugin:vue/vue3-recommended",
"eslint:recommended",
"@vue/typescript/recommended",
"@vue/prettier"
],
"ignorePatterns": ["src/graphql/types/**"],
"parserOptions": {
"ecmaVersion": 2021
},
"plugins": [],
"rules": {}
}


@ -0,0 +1,20 @@
import { FlatCompat } from '@eslint/eslintrc';
import js from '@eslint/js';
import eslintVue from 'eslint-plugin-vue';
import path from 'node:path';
import { fileURLToPath } from 'node:url';
const compat = new FlatCompat({
baseDirectory: path.dirname(fileURLToPath(import.meta.url)),
recommendedConfig: js.configs.recommended,
});
export default [
...eslintVue.configs['flat/recommended'],
...compat.extends('@vue/eslint-config-prettier/skip-formatting'),
...compat.extends('@vue/eslint-config-typescript/recommended'),
{
ignores: ['.gitignore', 'src/graphql/types/**'],
},
];


@ -8,64 +8,64 @@
"dev": "vite",
"build": "vue-tsc && vite build",
"preview": "vite preview",
"lint": "eslint --ext .js,.vue,.ts --ignore-path .gitignore src",
"lint": "eslint src",
"format": "prettier . --write"
},
"dependencies": {
"@fluent/bundle": "^0.18.0",
"@formkit/core": "^1.6.0",
"@formkit/validation": "^1.6.0",
"@formkit/vue": "^1.6.0",
"@fortawesome/fontawesome-svg-core": "^6.5.1",
"@fortawesome/free-solid-svg-icons": "^6.5.1",
"@formkit/core": "^1.6.2",
"@formkit/validation": "^1.6.2",
"@formkit/vue": "^1.6.2",
"@fortawesome/fontawesome-svg-core": "^6.5.2",
"@fortawesome/free-solid-svg-icons": "^6.5.2",
"@fortawesome/vue-fontawesome": "^3.0.6",
"@hcaptcha/vue3-hcaptcha": "^1.3.0",
"@mcaptcha/vanilla-glue": "^0.1.0-alpha-3",
"@tiptap/pm": "^2.2.4",
"@tiptap/starter-kit": "^2.2.4",
"@tiptap/vue-3": "^2.2.4",
"@urql/exchange-graphcache": "^6.5.0",
"@urql/vue": "^1.1.2",
"@tiptap/pm": "^2.3.0",
"@tiptap/starter-kit": "^2.3.0",
"@tiptap/vue-3": "^2.3.0",
"@urql/exchange-graphcache": "^7.0.1",
"@urql/vue": "^1.1.3",
"@vueuse/core": "^10.9.0",
"@zxcvbn-ts/core": "^3.0.4",
"@zxcvbn-ts/language-common": "^3.0.4",
"@zxcvbn-ts/language-en": "^3.0.2",
"floating-vue": "^5.2.2",
"fluent-vue": "^3.5.0",
"fluent-vue": "^3.5.2",
"graphql": "^16.8.1",
"lodash": "^4.17.21",
"pinia": "^2.1.7",
"pinia-plugin-persistedstate": "^3.2.1",
"rollup": "npm:@rollup/wasm-node",
"tiptap-markdown": "^0.8.10",
"unhead": "^1.8.20",
"vue": "^3.4.21",
"unhead": "^1.9.7",
"vue": "^3.4.23",
"vue-powerglitch": "^1.0.0",
"vue-router": "^4.3.0",
"vue-router": "^4.3.2",
"vue-virtual-scroller": "^2.0.0-beta.8"
},
"devDependencies": {
"@graphql-codegen/cli": "^5.0.2",
"@graphql-codegen/client-preset": "^4.2.4",
"@graphql-codegen/client-preset": "^4.2.5",
"@parcel/watcher": "^2.4.1",
"@trivago/prettier-plugin-sort-imports": "^4.3.0",
"@types/lodash": "^4.17.0",
"@typescript-eslint/eslint-plugin": "^7.3.1",
"@typescript-eslint/parser": "^7.3.1",
"@typescript-eslint/eslint-plugin": "^7.7.0",
"@typescript-eslint/parser": "^7.7.0",
"@vitejs/plugin-vue": "^5.0.4",
"@vue/eslint-config-prettier": "^9.0.0",
"@vue/eslint-config-typescript": "^13.0.0",
"eslint": "^8.57.0",
"eslint": "^9.1.0",
"eslint-config-prettier": "^9.1.0",
"eslint-plugin-prettier": "^5.1.3",
"eslint-plugin-vue": "^9.23.0",
"eslint-plugin-vue": "^9.25.0",
"prettier": "^3.2.5",
"prettier-plugin-css-order": "^2.0.1",
"sass": "^1.72.0",
"typescript": "^5.4.3",
"unplugin-fluent-vue": "^1.2.0",
"vite": "^5.2.6",
"vue-tsc": "^2.0.7"
"prettier-plugin-css-order": "^2.1.2",
"sass": "^1.75.0",
"typescript": "^5.4.5",
"unplugin-fluent-vue": "^1.3.0",
"vite": "^5.2.10",
"vue-tsc": "^2.0.13"
},
"resolutions": {
"rollup": "npm:@rollup/wasm-node"

File diff suppressed because it is too large.


@ -12,7 +12,7 @@ license = false
eula = false
[dependencies]
athena = { path = "../lib/athena" }
athena = { path = "../lib/athena", features = ["redis"] }
clap = { version = "4.5.4", features = ["derive", "wrap_help"] }
color-eyre = "0.6.3"
just-retry = { path = "../lib/just-retry" }
@ -28,7 +28,7 @@ kitsune-observability = { path = "../crates/kitsune-observability" }
kitsune-service = { path = "../crates/kitsune-service" }
kitsune-url = { path = "../crates/kitsune-url" }
kitsune-wasm-mrf = { path = "../crates/kitsune-wasm-mrf" }
mimalloc = "0.1.39"
mimalloc = "0.1.41"
multiplex-pool = { path = "../lib/multiplex-pool" }
redis = { version = "0.25.3", default-features = false, features = [
"aio",
@ -37,7 +37,7 @@ redis = { version = "0.25.3", default-features = false, features = [
] }
tokio = { version = "1.37.0", features = ["full"] }
tracing = "0.1.40"
typed-builder = "0.18.1"
typed-builder = "0.18.2"
[features]


@ -1,7 +1,7 @@
#[macro_use]
extern crate tracing;
use athena::{JobQueue, TaskTracker};
use athena::{JobQueue, RedisJobQueue, TaskTracker};
use just_retry::RetryExt;
use kitsune_config::job_queue::Configuration;
use kitsune_db::PgPool;
@ -37,7 +37,7 @@ pub struct JobDispatcherState {
pub async fn prepare_job_queue(
db_pool: PgPool,
config: &Configuration,
) -> RedisResult<JobQueue<KitsuneContextRepo>> {
) -> RedisResult<Arc<dyn JobQueue<ContextRepository = KitsuneContextRepo>>> {
let context_repo = KitsuneContextRepo::builder().db_pool(db_pool).build();
let client = redis::Client::open(config.redis_url.as_str())?;
@ -48,18 +48,18 @@ pub async fn prepare_job_queue(
)
.await?;
let queue = JobQueue::builder()
let queue = RedisJobQueue::builder()
.context_repository(context_repo)
.queue_name("kitsune-jobs")
.redis_pool(redis_pool)
.build();
Ok(queue)
Ok(Arc::new(queue))
}
#[instrument(skip(job_queue, state))]
pub async fn run_dispatcher(
job_queue: JobQueue<KitsuneContextRepo>,
job_queue: Arc<dyn JobQueue<ContextRepository = KitsuneContextRepo>>,
state: JobDispatcherState,
num_job_workers: usize,
) {
@ -88,7 +88,6 @@ pub async fn run_dispatcher(
},
});
let job_queue = Arc::new(job_queue);
let job_tracker = TaskTracker::new();
job_tracker.close();
@ -99,13 +98,13 @@ pub async fn run_dispatcher(
let job_tracker = job_tracker.clone();
async move {
job_queue
.spawn_jobs(
num_job_workers - job_tracker.len(),
Arc::clone(&ctx),
&job_tracker,
)
.await
athena::spawn_jobs(
&job_queue,
num_job_workers - job_tracker.len(),
Arc::clone(&ctx),
&job_tracker,
)
.await
}
})
.retry(just_retry::backoff_policy())


@ -15,13 +15,13 @@ license = false
eula = false
[dependencies]
athena = { path = "../lib/athena" }
athena = { path = "../lib/athena", features = ["redis"] }
argon2 = { version = "0.5.3", features = ["std"] }
askama = { version = "0.12.1", features = [
"with-axum",
], default-features = false }
askama_axum = "0.4.0"
async-trait = "0.1.79"
async-trait = "0.1.80"
axum = { version = "0.7.5", features = ["macros", "multipart"] }
axum-extra = { version = "0.9.3", features = [
"cookie",
@ -32,11 +32,11 @@ axum-extra = { version = "0.9.3", features = [
axum-flash = "0.8.0"
blowocking = { path = "../lib/blowocking" }
bytes = "1.6.0"
chrono = { version = "0.4.37", default-features = false }
chrono = { version = "0.4.38", default-features = false }
clap = { version = "4.5.4", features = ["derive", "wrap_help"] }
color-eyre = "0.6.3"
cursiv = { path = "../lib/cursiv", features = ["axum"] }
diesel = "2.1.5"
diesel = "2.1.6"
diesel-async = "0.4.1"
futures-util = "0.3.30"
headers = "0.4.0"
@ -68,8 +68,8 @@ kitsune-url = { path = "../crates/kitsune-url" }
kitsune-util = { path = "../crates/kitsune-util" }
kitsune-wasm-mrf = { path = "../crates/kitsune-wasm-mrf" }
kitsune-webfinger = { path = "../crates/kitsune-webfinger" }
metrics = "=0.22.0"
mimalloc = "0.1.39"
metrics = "0.22.3"
mimalloc = "0.1.41"
mime = "0.3.17"
mime_guess = { version = "2.0.4", default-features = false }
oxide-auth = "0.5.4"
@ -80,14 +80,14 @@ redis = { version = "0.25.3", default-features = false, features = [
] }
rust-embed = { version = "8.3.0", features = ["include-exclude"] }
scoped-futures = "0.1.3"
serde = { version = "1.0.197", features = ["derive"] }
serde = { version = "1.0.199", features = ["derive"] }
serde_urlencoded = "0.7.1"
simd-json = "0.13.9"
simd-json = "0.13.10"
simdutf8 = { version = "0.1.4", features = ["aarch64_neon"] }
speedy-uuid = { path = "../lib/speedy-uuid" }
strum = { version = "0.26.2", features = ["derive", "phf"] }
tempfile = "3.10.1"
time = "0.3.34"
time = "0.3.36"
tokio = { version = "1.37.0", features = ["full"] }
tokio-util = { version = "0.7.10", features = ["compat"] }
tower = { version = "0.4.13", features = ["util"] }
@ -104,7 +104,7 @@ tower-http = { version = "0.5.2", features = [
tower-http-digest = { path = "../lib/tower-http-digest" }
tracing = "0.1.40"
trials = { path = "../lib/trials" }
typed-builder = "0.18.1"
typed-builder = "0.18.2"
url = "2.5.0"
utoipa = { version = "4.2.0", features = ["axum_extras", "uuid"] }
utoipa-swagger-ui = { version = "6.0.0", features = ["axum"] }


@ -67,7 +67,7 @@ pub fn routes() -> Router<Zustand> {
#[cfg(test)]
mod tests {
use super::{get, WebfingerQuery};
use athena::JobQueue;
use athena::RedisJobQueue;
use axum::{
extract::{Query, State},
Json,
@ -148,13 +148,13 @@ mod tests {
let context_repo = KitsuneContextRepo::builder()
.db_pool(db_pool.clone())
.build();
let job_queue = JobQueue::builder()
let job_queue = RedisJobQueue::builder()
.context_repository(context_repo)
.queue_name("webfinger_test")
.redis_pool(redis_pool)
.build();
let job_service = JobService::builder().job_queue(job_queue).build();
let job_service = JobService::builder().job_queue(Arc::new(job_queue)).build();
AccountService::builder()
.attachment_service(attachment_service)


@ -41,6 +41,7 @@ use kitsune_service::{
};
use kitsune_url::UrlService;
use kitsune_wasm_mrf::MrfService;
use std::sync::Arc;
#[cfg(feature = "oidc")]
use {futures_util::future::OptionFuture, kitsune_oidc::OidcService};
@ -49,7 +50,7 @@ use {futures_util::future::OptionFuture, kitsune_oidc::OidcService};
pub async fn initialise_state(
config: &Configuration,
db_pool: PgPool,
job_queue: JobQueue<KitsuneContextRepo>,
job_queue: Arc<dyn JobQueue<ContextRepository = KitsuneContextRepo>>,
) -> eyre::Result<Zustand> {
let url_service = UrlService::builder()
.domain(config.url.domain.clone())


@ -5,33 +5,50 @@ edition.workspace = true
version.workspace = true
license = "MIT OR Apache-2.0"
[[example]]
name = "basic_queue"
required-features = ["redis"]
[dependencies]
ahash = "0.8.11"
either = { version = "1.10.0", default-features = false }
async-trait = "0.1.80"
either = { version = "1.11.0", default-features = false, optional = true }
futures-util = { version = "0.3.30", default-features = false }
iso8601-timestamp = { version = "0.2.17", features = ["diesel-pg"] }
iso8601-timestamp = "0.2.17"
just-retry = { path = "../just-retry" }
multiplex-pool = { path = "../multiplex-pool" }
once_cell = "1.19.0"
rand = "0.8.5"
multiplex-pool = { path = "../multiplex-pool", optional = true }
once_cell = { version = "1.19.0", optional = true }
rand = { version = "0.8.5", optional = true }
redis = { version = "0.25.3", default-features = false, features = [
"ahash",
"connection-manager",
"script",
"streams",
"tokio-comp",
] }
serde = { version = "1.0.197", features = ["derive"] }
simd-json = "0.13.9"
], optional = true }
serde = { version = "1.0.199", features = ["derive"] }
simd-json = { version = "0.13.10", optional = true }
smol_str = "0.2.1"
speedy-uuid = { path = "../speedy-uuid", features = ["redis", "serde"] }
thiserror = "1.0.58"
thiserror = "1.0.59"
tokio = { version = "1.37.0", features = ["macros", "rt", "sync"] }
tokio-util = { version = "0.7.10", features = ["rt"] }
tracing = "0.1.40"
typed-builder = "0.18.1"
typed-builder = "0.18.2"
typetag = "0.2.16"
[features]
redis = [
"dep:either",
"dep:multiplex-pool",
"dep:once_cell",
"dep:rand",
"dep:redis",
"dep:simd-json",
]
[dev-dependencies]
kitsune-test = { path = "../../crates/kitsune-test" }
redis = { version = "0.25.3", features = ["connection-manager"] }
tracing-subscriber = "0.3.18"
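(With the backend feature-gated, the example only compiles when the feature is on, e.g. `cargo run -p athena --example basic_queue --features redis`.)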


@ -1,4 +1,4 @@
use athena::{JobContextRepository, JobDetails, JobQueue, Runnable};
use athena::{JobContextRepository, JobDetails, JobQueue, RedisJobQueue, Runnable};
use futures_util::{
stream::{self, BoxStream},
StreamExt,
@ -71,7 +71,7 @@ async fn main() {
.await
.unwrap();
let queue = JobQueue::builder()
let queue = RedisJobQueue::builder()
.context_repository(ContextRepo)
.queue_name("test_queue")
.redis_pool(pool)
@ -102,7 +102,7 @@ async fn main() {
loop {
if tokio::time::timeout(
Duration::from_secs(5),
queue.spawn_jobs(20, Arc::new(()), &jobs),
athena::spawn_jobs(&queue, 20, Arc::new(()), &jobs),
)
.await
.is_err()

lib/athena/src/common.rs (new file, 100 lines)

@ -0,0 +1,100 @@
use crate::{
consts::MIN_IDLE_TIME,
error::{Error, Result},
JobContextRepository, JobData, JobQueue, JobResult, Outcome, Runnable,
};
use ahash::AHashMap;
use futures_util::TryStreamExt;
use just_retry::RetryExt;
use speedy_uuid::Uuid;
use std::{sync::Arc, time::Duration};
use tokio::time::Instant;
use tokio_util::task::TaskTracker;
type ContextFor<Queue> =
<<<Queue as JobQueue>::ContextRepository as JobContextRepository>::JobContext as Runnable>::Context;
pub async fn spawn_jobs<Q>(
queue: &Q,
max_jobs: usize,
run_ctx: Arc<ContextFor<Q>>,
task_tracker: &TaskTracker,
) -> Result<()>
where
Q: JobQueue + Clone,
{
let job_data = queue.fetch_job_data(max_jobs).await?;
let job_ids: Vec<Uuid> = job_data.iter().map(|data| data.job_id).collect();
let context_stream = queue
.context_repository()
.fetch_context(job_ids.into_iter())
.await
.map_err(|err| Error::ContextRepository(err.into()))?;
tokio::pin!(context_stream);
// Collect all the job data into a hashmap indexed by the job ID
// This is because we don't enforce an ordering with the batch fetching
let job_data = job_data
.into_iter()
.map(|data| (data.job_id, data))
.collect::<AHashMap<Uuid, JobData>>();
let job_data = Arc::new(job_data);
while let Some((job_id, job_ctx)) = context_stream
.try_next()
.await
.map_err(|err| Error::ContextRepository(err.into()))?
{
let queue = queue.clone();
let job_data = Arc::clone(&job_data);
let run_ctx = Arc::clone(&run_ctx);
task_tracker.spawn(async move {
let job_data = &job_data[&job_id];
let run_fut = job_ctx.run(&run_ctx);
tokio::pin!(run_fut);
let tick_period = MIN_IDLE_TIME - Duration::from_secs(2 * 60);
let mut tick_interval =
tokio::time::interval_at(Instant::now() + tick_period, tick_period);
let result = loop {
tokio::select! {
result = &mut run_fut => break result,
_ = tick_interval.tick() => {
(|| queue.reclaim_job(job_data))
.retry(just_retry::backoff_policy())
.await
.expect("Failed to reclaim job");
}
}
};
let job_state = if let Err(error) = result {
error!(error = ?error.into(), "Failed to run job");
JobResult {
outcome: Outcome::Fail {
fail_count: job_data.fail_count,
},
job_id,
ctx: &job_data.ctx,
}
} else {
JobResult {
outcome: Outcome::Success,
job_id,
ctx: &job_data.ctx,
}
};
(|| queue.complete_job(&job_state))
.retry(just_retry::backoff_policy())
.await
.expect("Failed to mark job as completed");
});
}
Ok(())
}
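(The constants in `consts.rs` below make the reclaim cadence concrete: with `MIN_IDLE_TIME` at 10 minutes, the tick period is 10 min minus 2 min = 8 minutes, so a long-running job refreshes its claim 2 minutes before it would sit idle long enough to be auto-claimed by a competing worker.)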

lib/athena/src/consts.rs (new file, 5 lines)

@ -0,0 +1,5 @@
use std::time::Duration;
pub const BLOCK_TIME: Duration = Duration::from_secs(2);
pub const MAX_RETRIES: u32 = 10;
pub const MIN_IDLE_TIME: Duration = Duration::from_secs(10 * 60);


@ -9,9 +9,11 @@ pub enum Error {
#[error(transparent)]
ContextRepository(BoxError),
#[cfg(feature = "redis")]
#[error(transparent)]
Redis(#[from] redis::RedisError),
#[cfg(feature = "redis")]
#[error(transparent)]
SimdJson(#[from] simd_json::Error),


@ -2,20 +2,170 @@
extern crate tracing;
use self::error::{BoxError, Result};
use async_trait::async_trait;
use futures_util::{Future, Stream};
use iso8601_timestamp::Timestamp;
use serde::{Deserialize, Serialize};
use speedy_uuid::Uuid;
pub use self::{
error::Error,
queue::{JobDetails, JobQueue},
use std::{
any::{Any, TypeId},
sync::Arc,
};
use typed_builder::TypedBuilder;
pub use self::error::Error;
pub use tokio_util::task::TaskTracker;
pub use self::common::spawn_jobs;
#[cfg(feature = "redis")]
pub use self::redis::JobQueue as RedisJobQueue;
mod common;
mod error;
mod macros;
mod queue;
#[cfg(feature = "redis")]
mod redis;
type RedisPool = multiplex_pool::Pool<redis::aio::ConnectionManager>;
pub mod consts;
#[derive(TypedBuilder)]
#[non_exhaustive]
pub struct JobDetails<C> {
#[builder(setter(into))]
pub context: C,
#[builder(default)]
pub fail_count: u32,
#[builder(default = Uuid::now_v7(), setter(into))]
pub job_id: Uuid,
#[builder(default, setter(into))]
pub run_at: Option<Timestamp>,
}
#[typetag::serde]
pub trait Keepable: Any + Send + Sync + 'static {}
// Hack around <https://github.com/rust-lang/rust/issues/65991> because it's not stable yet.
// So I had to implement trait downcasting myself.
//
// TODO: Remove this once <https://github.com/rust-lang/rust/issues/65991> is stabilized.
#[inline]
fn downcast_to<T>(obj: &dyn Keepable) -> Option<&T>
where
T: 'static,
{
if obj.type_id() == TypeId::of::<T>() {
#[allow(unsafe_code)]
// SAFETY: the `TypeId` equality check ensures this type cast is correct
Some(unsafe { &*(obj as *const dyn Keepable).cast::<T>() })
} else {
None
}
}
#[typetag::serde]
impl Keepable for String {}
#[derive(Deserialize, Serialize)]
#[serde(transparent)]
pub struct KeeperOfTheSecrets {
inner: Option<Box<dyn Keepable>>,
}
impl KeeperOfTheSecrets {
#[inline]
#[must_use]
pub fn empty() -> Self {
Self { inner: None }
}
#[inline]
pub fn new<T>(inner: T) -> Self
where
T: Keepable,
{
Self {
inner: Some(Box::new(inner)),
}
}
#[inline]
#[must_use]
pub fn get<T>(&self) -> Option<&T>
where
T: 'static,
{
self.inner
.as_ref()
.and_then(|item| downcast_to(item.as_ref()))
}
}
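A quick round-trip, mirroring how the Redis backend below stashes its stream ID as a `String`:

```rust
use athena::KeeperOfTheSecrets;

fn round_trip() {
    let ctx = KeeperOfTheSecrets::new("1714400000000-0".to_string());
    assert_eq!(ctx.get::<String>().map(String::as_str), Some("1714400000000-0"));
    assert!(ctx.get::<u64>().is_none()); // wrong concrete type: downcast yields None
}
```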
pub enum Outcome {
Success,
Fail { fail_count: u32 },
}
pub struct JobResult<'a> {
pub outcome: Outcome,
pub job_id: Uuid,
pub ctx: &'a KeeperOfTheSecrets,
}
#[derive(Deserialize, Serialize)]
pub struct JobData {
pub job_id: Uuid,
pub fail_count: u32,
pub ctx: KeeperOfTheSecrets,
}
#[async_trait]
pub trait JobQueue: Send + Sync + 'static {
type ContextRepository: JobContextRepository + 'static;
fn context_repository(&self) -> &Self::ContextRepository;
async fn enqueue(
&self,
job_details: JobDetails<<Self::ContextRepository as JobContextRepository>::JobContext>,
) -> Result<()>;
async fn fetch_job_data(&self, max_jobs: usize) -> Result<Vec<JobData>>;
async fn reclaim_job(&self, job_data: &JobData) -> Result<()>;
async fn complete_job(&self, state: &JobResult<'_>) -> Result<()>;
}
#[async_trait]
impl<CR> JobQueue for Arc<dyn JobQueue<ContextRepository = CR> + '_>
where
CR: JobContextRepository + 'static,
{
type ContextRepository = CR;
fn context_repository(&self) -> &Self::ContextRepository {
(**self).context_repository()
}
async fn enqueue(
&self,
job_details: JobDetails<<Self::ContextRepository as JobContextRepository>::JobContext>,
) -> Result<()> {
(**self).enqueue(job_details).await
}
async fn fetch_job_data(&self, max_jobs: usize) -> Result<Vec<JobData>> {
(**self).fetch_job_data(max_jobs).await
}
async fn reclaim_job(&self, job_data: &JobData) -> Result<()> {
(**self).reclaim_job(job_data).await
}
async fn complete_job(&self, state: &JobResult<'_>) -> Result<()> {
(**self).complete_job(state).await
}
}
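The payoff of this blanket impl: generic call sites keep an ordinary bound, and the type-erased `Arc` built in `prepare_job_queue` still satisfies it. A sketch (the `drain` helper is illustrative):

```rust
// Works for any Q: JobQueue, including Arc<dyn JobQueue<ContextRepository = CR>>.
async fn drain<Q>(queue: &Q) -> Result<(), athena::Error>
where
    Q: athena::JobQueue,
{
    let jobs = queue.fetch_job_data(10).await?;
    tracing::debug!(pending = jobs.len(), "fetched job batch");
    Ok(())
}
```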
pub trait Runnable {
/// User-defined context that is getting passed to the job when run
@ -35,7 +185,7 @@ pub trait JobContextRepository {
/// To support multiple job types per repository, consider using the enum dispatch technique
type JobContext: Runnable + Send + Sync + 'static;
type Error: Into<BoxError>;
type Stream: Stream<Item = Result<(Uuid, Self::JobContext), Self::Error>>;
type Stream: Stream<Item = Result<(Uuid, Self::JobContext), Self::Error>> + Send;
/// Batch fetch job contexts
///


@ -1,90 +1,31 @@
use self::{scheduled::ScheduledJobActor, util::StreamAutoClaimReply};
use crate::{error::Result, impl_to_redis_args, Error, JobContextRepository, RedisPool, Runnable};
use ahash::AHashMap;
use crate::{
consts::{BLOCK_TIME, MAX_RETRIES, MIN_IDLE_TIME},
error::Result,
Error, JobContextRepository, JobData, JobDetails, JobResult, KeeperOfTheSecrets, Outcome,
};
use async_trait::async_trait;
use either::Either;
use futures_util::StreamExt;
use iso8601_timestamp::Timestamp;
use just_retry::{
retry_policies::{policies::ExponentialBackoff, Jitter},
JustRetryPolicy, RetryExt, StartTime,
JustRetryPolicy, StartTime,
};
use redis::{
aio::ConnectionLike,
streams::{StreamReadOptions, StreamReadReply},
AsyncCommands, RedisResult,
};
use serde::{Deserialize, Serialize};
use smol_str::SmolStr;
use speedy_uuid::Uuid;
use std::{
ops::ControlFlow,
pin::pin,
str::FromStr,
sync::Arc,
time::{Duration, SystemTime},
};
use tokio::{sync::OnceCell, time::Instant};
use tokio_util::task::TaskTracker;
use std::{ops::ControlFlow, str::FromStr, sync::Arc, time::SystemTime};
use tokio::sync::OnceCell;
use typed_builder::TypedBuilder;
mod scheduled;
mod util;
const BLOCK_TIME: Duration = Duration::from_secs(2);
const MIN_IDLE_TIME: Duration = Duration::from_secs(10 * 60);
const MAX_RETRIES: u32 = 10;
enum JobState<'a> {
Succeeded {
job_id: Uuid,
stream_id: &'a str,
},
Failed {
fail_count: u32,
job_id: Uuid,
stream_id: &'a str,
},
}
impl JobState<'_> {
fn job_id(&self) -> Uuid {
match self {
Self::Succeeded { job_id, .. } | Self::Failed { job_id, .. } => *job_id,
}
}
fn stream_id(&self) -> &str {
match self {
Self::Succeeded { stream_id, .. } | Self::Failed { stream_id, .. } => stream_id,
}
}
}
#[derive(TypedBuilder)]
pub struct JobDetails<C> {
#[builder(setter(into))]
context: C,
#[builder(default)]
fail_count: u32,
#[builder(default = Uuid::now_v7(), setter(into))]
job_id: Uuid,
#[builder(default, setter(into))]
run_at: Option<Timestamp>,
}
struct JobData {
stream_id: String,
meta: JobMeta,
}
impl_to_redis_args! {
#[derive(Deserialize, Serialize)]
struct JobMeta {
job_id: Uuid,
fail_count: u32,
}
}
type Pool = multiplex_pool::Pool<redis::aio::ConnectionManager>;
#[derive(TypedBuilder)]
pub struct JobQueue<CR> {
@ -98,7 +39,7 @@ pub struct JobQueue<CR> {
max_retries: u32,
#[builder(setter(into))]
queue_name: SmolStr,
redis_pool: RedisPool,
redis_pool: Pool,
#[builder(default = SmolStr::from(format!("{queue_name}:scheduled")))]
scheduled_queue_name: SmolStr,
@ -150,7 +91,7 @@ where
fn enqueue_redis_cmd(
&self,
job_meta: &JobMeta,
job_meta: &JobData,
run_at: Option<Timestamp>,
) -> Result<redis::Cmd> {
let cmd = if let Some(run_at) = run_at {
@@ -162,36 +103,53 @@ where
)
} else {
let mut cmd = redis::cmd("XADD");
cmd.arg(self.queue_name.as_str()).arg("*").arg(job_meta);
cmd.arg(self.queue_name.as_str())
.arg("*")
.arg("job_id")
.arg(job_meta.job_id)
.arg("fail_count")
.arg(job_meta.fail_count);
cmd
};
Ok(cmd)
}
}
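Spelling the two fields out replaces the removed `ToRedisArgs` derive on `JobMeta`. For orientation, the immediate-path command this builds has the following shape (values illustrative):

```rust
// XADD <queue_name> * job_id <uuid-v7> fail_count <n>
//
// `*` lets Redis assign the stream entry ID; `job_id` and `fail_count`
// land as field/value pairs on the entry and are parsed back out in
// `fetch_job_data`.
```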
pub async fn enqueue(&self, job_details: JobDetails<CR::JobContext>) -> Result<()> {
let job_meta = JobMeta {
#[async_trait]
impl<CR> crate::JobQueue for JobQueue<CR>
where
CR: JobContextRepository + Send + Sync + 'static,
{
type ContextRepository = CR;
#[inline]
fn context_repository(&self) -> &Self::ContextRepository {
&self.context_repository
}
async fn enqueue(&self, job_details: JobDetails<CR::JobContext>) -> Result<()> {
let job_data = JobData {
job_id: job_details.job_id,
fail_count: job_details.fail_count,
ctx: KeeperOfTheSecrets::empty(),
};
self.context_repository
.store_context(job_meta.job_id, job_details.context)
.store_context(job_data.job_id, job_details.context)
.await
.map_err(|err| Error::ContextRepository(err.into()))?;
let mut redis_conn = self.redis_pool.get();
self.enqueue_redis_cmd(&job_meta, job_details.run_at)?
self.enqueue_redis_cmd(&job_data, job_details.run_at)?
.query_async(&mut redis_conn)
.await?;
Ok(())
}
async fn fetch_job_data(
&self,
max_jobs: usize,
) -> Result<impl Iterator<Item = JobData> + Clone> {
async fn fetch_job_data(&self, max_jobs: usize) -> Result<Vec<JobData>> {
let mut redis_conn = self.redis_pool.get();
self.initialise_group(&mut redis_conn).await?;
@@ -231,23 +189,31 @@ where
Either::Right(claimed_ids.into_iter().chain(read_ids))
};
let job_data_iterator = claimed_ids.map(|id| {
let job_id: String =
redis::from_redis_value(&id.map["job_id"]).expect("[Bug] Malformed Job ID");
let job_id = Uuid::from_str(&job_id).expect("[Bug] Job ID is not a UUID");
let fail_count: u32 =
redis::from_redis_value(&id.map["fail_count"]).expect("[Bug] Malformed fail count");
let job_data = claimed_ids
.map(|id| {
let job_id: String =
redis::from_redis_value(&id.map["job_id"]).expect("[Bug] Malformed Job ID");
let job_id = Uuid::from_str(&job_id).expect("[Bug] Job ID is not a UUID");
let fail_count: u32 = redis::from_redis_value(&id.map["fail_count"])
.expect("[Bug] Malformed fail count");
JobData {
stream_id: id.id,
meta: JobMeta { job_id, fail_count },
}
});
JobData {
ctx: KeeperOfTheSecrets::new(id.id),
job_id,
fail_count,
}
})
.collect();
Ok(job_data_iterator)
Ok(job_data)
}
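`fetch_job_data` merges two sources, as the `Either` chaining above suggests: entries reclaimed from consumers that went idle, and fresh entries read through the consumer group. A hedged sketch of the two underlying Redis reads; the option values are assumptions, and the real code parses the first reply through its own `util::StreamAutoClaimReply`:

```rust
// 1. Take over pending entries whose consumer has been idle too long
//    (XAUTOCLAIM scans the group's pending list from "0-0").
let claimed: StreamAutoClaimReply = redis::cmd("XAUTOCLAIM")
    .arg("queue").arg("group").arg("consumer")
    .arg(MIN_IDLE_TIME.as_millis() as u64) // only entries idle >= this
    .arg("0-0")
    .arg("COUNT").arg(max_jobs)
    .query_async(&mut conn)
    .await?;

// 2. Then read never-delivered entries for this consumer (">" cursor).
let opts = StreamReadOptions::default()
    .group("group", "consumer")
    .count(max_jobs)
    .block(BLOCK_TIME.as_millis() as usize);
let fresh: StreamReadReply = conn.xread_options(&["queue"], &[">"], &opts).await?;
```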
async fn complete_job(&self, state: &JobState<'_>) -> Result<()> {
async fn complete_job(&self, state: &JobResult<'_>) -> Result<()> {
let stream_id = state
.ctx
.get::<String>()
.expect("[Bug] Not a string in the context");
let mut pipeline = redis::pipe();
pipeline
.atomic()
@@ -255,28 +221,27 @@ where
.xack(
self.queue_name.as_str(),
self.consumer_group.as_str(),
&[state.stream_id()],
&[stream_id],
)
.xdel(self.queue_name.as_str(), &[state.stream_id()]);
.xdel(self.queue_name.as_str(), &[stream_id]);
let remove_context = match state {
JobState::Failed {
fail_count, job_id, ..
} => {
let remove_context = match state.outcome {
Outcome::Fail { fail_count } => {
let backoff = ExponentialBackoff::builder()
.jitter(Jitter::Bounded)
.build_with_max_retries(self.max_retries);
if let ControlFlow::Continue(delta) =
backoff.should_retry(StartTime::Irrelevant, *fail_count)
backoff.should_retry(StartTime::Irrelevant, fail_count)
{
let job_meta = JobMeta {
job_id: *job_id,
let job_data = JobData {
job_id: state.job_id,
fail_count: fail_count + 1,
ctx: KeeperOfTheSecrets::empty(),
};
let backoff_timestamp = Timestamp::from(SystemTime::now() + delta);
let enqueue_cmd = self.enqueue_redis_cmd(&job_meta, Some(backoff_timestamp))?;
let enqueue_cmd = self.enqueue_redis_cmd(&job_data, Some(backoff_timestamp))?;
pipeline.add_command(enqueue_cmd);
@@ -285,7 +250,7 @@ where
true // We hit the maximum number of retries; we won't re-enqueue the job, so we can just remove the context
}
}
JobState::Succeeded { .. } => true, // Execution succeeded, we don't need the context anymore
Outcome::Success => true, // Execution succeeded, we don't need the context anymore
};
{
@@ -295,7 +260,7 @@ where
if remove_context {
self.context_repository
.remove_context(state.job_id())
.remove_context(state.job_id)
.await
.map_err(|err| Error::ContextRepository(err.into()))?;
}
@@ -304,94 +269,23 @@ where
}
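The failure branch above is the whole retry story: each failed attempt is re-enqueued as a scheduled job with an exponentially growing, jittered delay, until `max_retries` is exhausted. Condensed from the code above (`should_retry` comes from `just_retry`; the commented calls stand in for the surrounding pipeline):

```rust
let backoff = ExponentialBackoff::builder()
    .jitter(Jitter::Bounded)
    .build_with_max_retries(max_retries);

match backoff.should_retry(StartTime::Irrelevant, fail_count) {
    // Attempts left: re-enqueue with fail_count + 1, delayed by `delta`,
    // and keep the stored context around for the retry.
    ControlFlow::Continue(delta) => {
        let run_at = Timestamp::from(SystemTime::now() + delta);
        // -> enqueue_redis_cmd(&job_data, Some(run_at)) joins the pipeline
    }
    // Budget exhausted: the job is dropped, so its context can go too.
    ControlFlow::Break(..) => { /* remove_context(job_id) */ }
}
```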
async fn reclaim_job(&self, job_data: &JobData) -> Result<()> {
let stream_id = job_data
.ctx
.get::<String>()
.expect("[Bug] Not a string in the context");
let mut conn = self.redis_pool.get();
conn.xclaim(
self.queue_name.as_str(),
self.consumer_group.as_str(),
self.consumer_name.as_str(),
0,
&[job_data.stream_id.as_str()],
&[stream_id],
)
.await?;
Ok(())
}
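The new `reclaim_job` claims by the stream ID carried in the job's context rather than a dedicated `stream_id` field. It leans on two XCLAIM properties (a hedged reading of the Redis documentation):

```rust
// XCLAIM <queue> <group> <consumer> 0 <stream-id>
//
// min-idle-time = 0  -> the claim always succeeds, even on our own entry
// successful claim   -> the entry's idle time resets to zero
//
// Repeated periodically this acts as a heartbeat: XAUTOCLAIM in
// `fetch_job_data` only steals entries idle longer than MIN_IDLE_TIME,
// i.e. jobs whose worker actually died, never merely long-running ones.
```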
pub async fn spawn_jobs(
&self,
max_jobs: usize,
run_ctx: Arc<<CR::JobContext as Runnable>::Context>,
join_set: &TaskTracker,
) -> Result<()> {
let job_data = self.fetch_job_data(max_jobs).await?;
let context_stream = self
.context_repository
.fetch_context(job_data.clone().map(|data| data.meta.job_id))
.await
.map_err(|err| Error::ContextRepository(err.into()))?;
tokio::pin!(context_stream);
// Collect all the job data into a hashmap indexed by the job ID
// This is because we don't enforce an ordering with the batch fetching
let job_data = job_data
.map(|data| (data.meta.job_id, data))
.collect::<AHashMap<Uuid, JobData>>();
let job_data = Arc::new(job_data);
while let Some((job_id, job_ctx)) = context_stream
.next()
.await
.transpose()
.map_err(|err| Error::ContextRepository(err.into()))?
{
let this = self.clone();
let job_data = Arc::clone(&job_data);
let run_ctx = Arc::clone(&run_ctx);
join_set.spawn(async move {
let job_data = &job_data[&job_id];
let mut run_fut = pin!(job_ctx.run(&run_ctx));
let tick_period = MIN_IDLE_TIME - Duration::from_secs(2 * 60);
let mut tick_interval =
tokio::time::interval_at(Instant::now() + tick_period, tick_period);
let result = loop {
tokio::select! {
result = &mut run_fut => break result,
_ = tick_interval.tick() => {
(|| this.reclaim_job(job_data))
.retry(just_retry::backoff_policy())
.await
.expect("Failed to reclaim job");
}
}
};
let job_state = if let Err(error) = result {
error!(error = ?error.into(), "Failed to run job");
JobState::Failed {
fail_count: job_data.meta.fail_count,
job_id,
stream_id: &job_data.stream_id,
}
} else {
JobState::Succeeded {
job_id,
stream_id: &job_data.stream_id,
}
};
(|| this.complete_job(&job_state))
.retry(just_retry::backoff_policy())
.await
.expect("Failed to mark job as completed");
});
}
Ok(())
}
}
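Distilled from the (now removed) `spawn_jobs` body above: each spawned worker races the job future against a ticker, and every tick re-claims the stream entry so its idle time never reaches `MIN_IDLE_TIME` while the job is still making progress. Imports are as in the original; the retry wrapper around the reclaim is elided:

```rust
let mut run_fut = pin!(job_ctx.run(&run_ctx));

// Tick two minutes before the auto-claim threshold would be reached.
let tick_period = MIN_IDLE_TIME - Duration::from_secs(2 * 60);
let mut ticker = tokio::time::interval_at(Instant::now() + tick_period, tick_period);

let result = loop {
    tokio::select! {
        result = &mut run_fut => break result, // job finished (Ok or Err)
        _ = ticker.tick() => {
            // Heartbeat: re-claiming resets the entry's idle time so
            // XAUTOCLAIM never sees it as abandoned while we still run.
            queue.reclaim_job(&job_data).await?;
        }
    }
};
```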
impl<CR> Clone for JobQueue<CR> {

View File

@@ -1,4 +1,5 @@
use crate::{error::Result, RedisPool};
use super::Pool;
use crate::error::Result;
use once_cell::sync::Lazy;
use rand::Rng;
use redis::Script;
@@ -15,7 +16,7 @@ static SCHEDULE_SCRIPT: Lazy<Script> =
#[derive(TypedBuilder)]
pub struct ScheduledJobActor {
redis_pool: RedisPool,
redis_pool: Pool,
scheduled_queue_name: SmolStr,
queue_name: SmolStr,
}

lib/athena/tests/redis.rs Normal file
View File

@@ -0,0 +1,89 @@
#![cfg(feature = "redis")]
use athena::{JobContextRepository, JobDetails, JobQueue, RedisJobQueue, Runnable};
use futures_util::{
stream::{self, BoxStream},
StreamExt,
};
use kitsune_test::redis_test;
use speedy_uuid::Uuid;
use std::{
convert::Infallible,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
};
use tokio_util::task::TaskTracker;
static DID_RUN: AtomicBool = AtomicBool::new(false);
#[derive(Clone)]
struct JobCtx;
impl Runnable for JobCtx {
type Context = ();
type Error = Infallible;
async fn run(&self, _ctx: &Self::Context) -> Result<(), Self::Error> {
DID_RUN.store(true, Ordering::Release);
Ok(())
}
}
struct ContextRepo;
impl JobContextRepository for ContextRepo {
type JobContext = JobCtx;
type Error = Infallible;
type Stream = BoxStream<'static, Result<(Uuid, Self::JobContext), Self::Error>>;
async fn fetch_context<I>(&self, job_ids: I) -> Result<Self::Stream, Self::Error>
where
I: Iterator<Item = Uuid> + Send,
{
let vec: Vec<_> = job_ids.collect();
let stream = stream::iter(vec).map(|id| Ok((id, JobCtx)));
Ok(stream.boxed())
}
async fn remove_context(&self, _job_id: Uuid) -> Result<(), Self::Error> {
Ok(())
}
async fn store_context(
&self,
_job_id: Uuid,
_context: Self::JobContext,
) -> Result<(), Self::Error> {
Ok(())
}
}
#[tokio::test]
async fn basic_schedule() {
redis_test(|pool| async move {
let queue = RedisJobQueue::builder()
.context_repository(ContextRepo)
.queue_name("test_queue")
.redis_pool(pool)
.build();
queue
.enqueue(JobDetails::builder().context(JobCtx).build())
.await
.unwrap();
let jobs = TaskTracker::new();
jobs.close();
athena::spawn_jobs(&queue, 1, Arc::new(()), &jobs)
.await
.unwrap();
jobs.wait().await;
assert!(DID_RUN.load(Ordering::Acquire));
})
.await;
}
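A note on the `TaskTracker` choreography in this test: `close()` does not prevent further `spawn`s; it only marks the tracker so that `wait()` can resolve. `wait().await` completes once the tracker is both closed and empty, which is why the test can safely close before `spawn_jobs` runs and still observe `DID_RUN` after the await.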

View File

@@ -8,7 +8,7 @@ license = "MIT OR Apache-2.0"
[dependencies]
once_cell = "1.19.0"
rayon = "1.10.0"
thiserror = "1.0.58"
thiserror = "1.0.59"
tokio = { version = "1.37.0", features = ["sync"] }
tracing = "0.1.40"

View File

@@ -17,7 +17,7 @@ tower = { version = "0.4.13", default-features = false }
zeroize = { version = "1.7.0", features = ["derive"] }
# `axum` feature
async-trait = { version = "0.1.79", optional = true }
async-trait = { version = "0.1.80", optional = true }
axum-core = { version = "0.4.3", optional = true }
[dev-dependencies]

View File

@@ -6,19 +6,19 @@ version.workspace = true
license = "MIT OR Apache-2.0"
[dependencies]
async-trait = "0.1.79"
hickory-resolver = { version = "0.24.0", features = ["dns-over-rustls"] }
async-trait = "0.1.80"
hickory-resolver = { version = "0.24.1", features = ["dns-over-rustls"] }
rand = "0.8.5"
serde = { version = "1.0.197", features = ["derive"] }
serde = { version = "1.0.199", features = ["derive"] }
simdutf8 = { version = "0.1.4", features = ["aarch64_neon"] }
thiserror = "1.0.58"
thiserror = "1.0.59"
tracing = "0.1.40"
typed-builder = "0.18.1"
typed-builder = "0.18.2"
[dev-dependencies]
insta = { version = "1.38.0", features = ["json"] }
rand_xorshift = "0.3.0"
serde_json = "1.0.115"
serde_json = "1.0.116"
tokio = { version = "1.37.0", features = ["macros", "rt"] }
[lints]

View File

@@ -31,13 +31,12 @@ miette = "7.2.0"
pkcs8 = { version = "0.10.2", features = ["pem", "std"] }
ring = { version = "0.17.8", features = ["std"] }
scoped-futures = { version = "0.1.3", default-features = false }
thiserror = "1.0.58"
thiserror = "1.0.59"
tick-tock-mock = { path = "../tick-tock-mock" }
tracing = { version = "0.1.40", default-features = false, optional = true }
[dev-dependencies]
criterion = "0.5.1"
proptest = { version = "1.4.0", default-features = false, features = ["std"] }
tokio = { version = "1.37.0", features = ["macros", "rt"] }
[features]

View File

@@ -116,23 +116,3 @@ where
Ok(())
}
#[cfg(test)]
mod test {
use super::is_subset;
use proptest::{prop_assert_eq, proptest};
use std::collections::HashSet;
proptest! {
#[test]
fn subset_behaves_equal(left: HashSet<String>, right: HashSet<String>) {
let vec_left = left.iter().collect::<Vec<_>>();
let vec_right = right.iter().collect::<Vec<_>>();
let slice_subset = is_subset(&vec_left, &vec_right);
let set_subset = left.is_subset(&right);
prop_assert_eq!(slice_subset, set_subset);
}
}
}

View File

@@ -6,7 +6,7 @@ version.workspace = true
license = "MIT OR Apache-2.0"
[dependencies]
chrono = { version = "0.4.37", default-features = false, features = ["std"] }
chrono = { version = "0.4.38", default-features = false, features = ["std"] }
retry-policies = "0.3.0"
tokio = { version = "1.37.0", features = ["time"] }
tracing = "0.1.40"

View File

@@ -22,7 +22,7 @@ std = []
[dev-dependencies]
criterion = "0.5.1"
time = "0.3.34"
time = "0.3.36"
uuid = { version = "1.8.0", features = ["v7"] }
[lints]

View File

@@ -10,16 +10,16 @@ leb128 = { version = "0.2.5", optional = true }
olpc-cjson = { version = "0.1.3", optional = true }
schemars = { version = "0.8.16", features = ["impl_json_schema", "semver"] }
semver = { version = "1.0.22", features = ["serde"] }
serde = { version = "1.0.197", features = ["derive"] }
serde_json = { version = "1.0.115", optional = true }
thiserror = { version = "1.0.58", optional = true }
wasm-encoder = { version = "0.202.0", optional = true }
wasmparser = { version = "0.202.0", optional = true }
serde = { version = "1.0.199", features = ["derive"] }
serde_json = { version = "1.0.116", optional = true }
thiserror = { version = "1.0.59", optional = true }
wasm-encoder = { version = "0.206.0", optional = true }
wasmparser = { version = "0.206.0", optional = true }
[dev-dependencies]
serde_json = "1.0.115"
serde_json = "1.0.116"
insta = { version = "1.38.0", default-features = false, features = ["json"] }
wat = "1.202.0"
wat = "1.206.0"
[features]
decode = ["dep:leb128", "dep:serde_json", "dep:thiserror", "dep:wasmparser"]

View File

@@ -3,7 +3,7 @@
const MANIFEST: &str = include_str!("test-manifest.json");
#[test]
fn encode_works() {
fn serialise_works() {
let manifest = serde_json::from_str(MANIFEST).unwrap();
let encoded_manifest = mrf_manifest::serialise(&manifest).unwrap();
let encoded_manifest_str = String::from_utf8(encoded_manifest).unwrap();

View File

@@ -14,8 +14,8 @@ mrf-manifest = { path = "../mrf-manifest", features = [
"encode",
"serialise",
] }
serde_json = "1.0.115"
wasmparser = "0.202.0"
serde_json = "1.0.116"
wasmparser = "0.206.0"
[lints]
workspace = true

View File

@@ -7,13 +7,13 @@ license = "MIT OR Apache-2.0"
[dependencies]
async-graphql = { version = "7.0.3", default-features = false, optional = true }
diesel = { version = "2.1.5", features = [
diesel = { version = "2.1.6", features = [
"postgres_backend",
"uuid",
], optional = true }
redis = { version = "0.25.3", default-features = false, optional = true }
serde = { version = "1.0.197", optional = true }
thiserror = "1.0.58"
serde = { version = "1.0.199", optional = true }
thiserror = "1.0.59"
uuid = { version = "1.8.0", features = ["fast-rng", "v7"] }
uuid-simd = { version = "0.8.0", features = ["uuid"] }

View File

@@ -8,7 +8,7 @@ license = "MIT OR Apache-2.0"
[dependencies]
base64-simd = "0.8.0"
bytes = "1.6.0"
either = { version = "1.10.0", default-features = false }
either = { version = "1.11.0", default-features = false }
http = "1.1.0"
http-body = "1.0.0"
memchr = "2.7.2"

View File

@@ -6,7 +6,7 @@ version.workspace = true
license = "MIT OR Apache-2.0"
[dependencies]
either = "1.10.0"
either = "1.11.0"
http = "1.1.0"
once_cell = "1.19.0"
regex = "1.10.4"

View File

@@ -9,9 +9,9 @@ license = "MIT OR Apache-2.0"
proc-macro = true
[dependencies]
proc-macro2 = "1.0.79"
quote = "1.0.35"
syn = { version = "2.0.58", features = ["full", "visit-mut"] }
proc-macro2 = "1.0.81"
quote = "1.0.36"
syn = { version = "2.0.60", features = ["full", "visit-mut"] }
[lints]
workspace = true

View File

@@ -6,7 +6,7 @@ license.workspace = true
publish = false
[dependencies]
anyhow = "1.0.81"
anyhow = "1.0.82"
argh = "0.1.12"
glob = "0.3.1"
taplo = { version = "0.13.0", default-features = false }