Rework error representation (#517)

* start error repr rework

* add kitsune-error crate

* progress

* rm unused dependencies

* progress

* progress

* progress

* progress

* finish

* fix checks

* fix tests

* enable serde for garde

* Add codecov token

* remove some more error enums

* remove another enum

* remove httperror

* improve somewhat

* make it possible to return responses as the 'body'

* use try-block polyfills everywhere
This commit is contained in:
Aumetra Weisman 2024-04-08 00:59:17 +02:00
parent dc2101927d
commit 225e81b80a
192 changed files with 1389 additions and 2430 deletions

View File

@ -48,7 +48,8 @@ jobs:
DATABASE_URL: "postgres://postgres:postgres@localhost/test_db"
REDIS_URL: "redis://localhost"
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v3
uses: codecov/codecov-action@v4
with:
files: lcov.info
fail_ci_if_error: true
token: ${{ secrets.CODECOV_TOKEN }}

361
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -24,13 +24,13 @@ members = [
"crates/kitsune-db",
"crates/kitsune-email",
"crates/kitsune-embed",
"crates/kitsune-error",
"crates/kitsune-federation",
"crates/kitsune-federation-filter",
"crates/kitsune-http-client",
"crates/kitsune-jobs",
"crates/kitsune-language",
"crates/kitsune-mastodon",
"crates/kitsune-messaging",
"crates/kitsune-observability",
"crates/kitsune-oidc",
"crates/kitsune-s3",
@ -65,6 +65,8 @@ members = [
"lib/tower-http-digest",
"lib/tower-stop-using-brave",
"lib/tower-x-clacks-overhead",
"lib/trials",
"lib/trials/macros",
"xtask",
]
resolver = "2"
@ -117,4 +119,3 @@ install-updater = true
[patch.crates-io]
diesel-async = { git = "https://github.com/weiznich/diesel_async.git", rev = "017ebe2fb7a2709ab5db92148dea5ce812a35e09" }
tokio-postgres-rustls = { git = "https://github.com/jbg/tokio-postgres-rustls.git", rev = "b3b59ac2fa1b5823f2426fef78a0fb74c004ec38" }

View File

@ -11,7 +11,6 @@ autometrics = { version = "1.0.1", default-features = false }
base64-simd = "0.8.0"
diesel = "2.1.5"
diesel-async = "0.4.1"
eyre = "0.6.12"
futures-util = "0.3.30"
headers = "0.4.0"
http = "1.1.0"
@ -21,6 +20,7 @@ kitsune-config = { path = "../kitsune-config" }
kitsune-core = { path = "../kitsune-core" }
kitsune-db = { path = "../kitsune-db" }
kitsune-embed = { path = "../kitsune-embed" }
kitsune-error = { path = "../kitsune-error" }
kitsune-federation-filter = { path = "../kitsune-federation-filter" }
kitsune-http-client = { path = "../kitsune-http-client" }
kitsune-language = { path = "../kitsune-language" }
@ -32,12 +32,10 @@ kitsune-util = { path = "../kitsune-util" }
kitsune-wasm-mrf = { path = "../kitsune-wasm-mrf" }
mime = "0.3.17"
mime_guess = { version = "2.0.4", default-features = false }
rsa = "0.9.6"
serde = "1.0.197"
sha2 = "0.10.8"
simd-json = "0.13.9"
speedy-uuid = { path = "../../lib/speedy-uuid" }
thiserror = "1.0.58"
tracing = "0.1.40"
typed-builder = "0.18.1"
url = "2.5.0"

View File

@ -1,9 +1,9 @@
use crate::error::{Error, Result};
use autometrics::autometrics;
use futures_util::{stream::FuturesUnordered, Stream, StreamExt};
use http::{Method, Request};
use kitsune_core::consts::USER_AGENT;
use kitsune_db::model::{account::Account, user::User};
use kitsune_error::{Error, Result};
use kitsune_federation_filter::FederationFilter;
use kitsune_http_client::Client;
use kitsune_type::ap::Activity;

View File

@ -1,5 +1,4 @@
use crate::{
error::Result,
mapping::{self, IntoActivity},
InboxResolver,
};
@ -17,6 +16,7 @@ use kitsune_db::{
schema::{accounts, posts, users},
with_connection, PgPool,
};
use kitsune_error::Result;
use kitsune_service::attachment::AttachmentService;
use kitsune_type::ap::{ap_context, Activity, ActivityType, ObjectField};
use kitsune_url::UrlService;
@ -390,7 +390,7 @@ impl Deliverer {
#[async_trait]
impl DelivererTrait for Deliverer {
async fn deliver(&self, action: Action) -> eyre::Result<()> {
async fn deliver(&self, action: Action) -> Result<()> {
match action {
Action::AcceptFollow(follow) => self.accept_follow(follow).await,
Action::Create(post) | Action::Repost(post) => self.create_or_repost(post).await,

View File

@ -1,87 +0,0 @@
use diesel_async::pooled_connection::bb8;
use rsa::pkcs8::der;
use std::{convert::Infallible, fmt::Debug};
use thiserror::Error;
pub type Result<T, E = Error> = std::result::Result<T, E>;
#[derive(Debug, Error)]
pub enum Error {
#[error("Instance is blocked")]
BlockedInstance,
#[error(transparent)]
Cache(#[from] kitsune_cache::Error),
#[error(transparent)]
DatabasePool(#[from] bb8::RunError),
#[error(transparent)]
Der(#[from] der::Error),
#[error(transparent)]
Diesel(#[from] diesel::result::Error),
#[error(transparent)]
Embed(#[from] kitsune_embed::Error),
#[error(transparent)]
FederationFilter(#[from] kitsune_federation_filter::error::Error),
#[error(transparent)]
FetchAccount(eyre::Report),
#[error(transparent)]
FetchEmoji(eyre::Report),
#[error(transparent)]
FetchPost(eyre::Report),
#[error(transparent)]
Http(#[from] http::Error),
#[error(transparent)]
HttpClient(#[from] kitsune_http_client::Error),
#[error("Invalid ActivityPub document")]
InvalidDocument,
#[error("Invalid ActivityPub response")]
InvalidResponse,
#[error(transparent)]
InvalidUri(#[from] http::uri::InvalidUri),
#[error("Missing host")]
MissingHost,
#[error(transparent)]
Mrf(#[from] kitsune_wasm_mrf::Error),
#[error("Not found")]
NotFound,
#[error(transparent)]
Resolver(eyre::Report),
#[error(transparent)]
Search(#[from] kitsune_search::Error),
#[error(transparent)]
Service(#[from] kitsune_service::error::Error),
#[error(transparent)]
SimdJson(#[from] simd_json::Error),
#[error("Unsupported media type")]
UnsupportedMediaType,
#[error(transparent)]
UrlParse(#[from] url::ParseError),
}
impl From<Infallible> for Error {
fn from(err: Infallible) -> Self {
match err {}
}
}

View File

@ -1,8 +1,5 @@
use super::Fetcher;
use crate::{
error::{Error, Result},
process_attachments,
};
use crate::process_attachments;
use autometrics::autometrics;
use diesel::{ExpressionMethods, OptionalExtension, QueryDsl, SelectableHelper};
use diesel_async::RunQueryDsl;
@ -13,6 +10,7 @@ use kitsune_db::{
schema::accounts,
with_connection, with_transaction,
};
use kitsune_error::{kitsune_error, Error, Result};
use kitsune_search::SearchBackend;
use kitsune_type::ap::actor::Actor;
use kitsune_util::{convert::timestamp_to_uuid, sanitize::CleanHtmlExt};
@ -55,7 +53,10 @@ impl Fetcher {
return Ok(None);
};
let mut domain = url.host_str().ok_or(Error::MissingHost)?;
let mut domain = url
.host_str()
.ok_or_else(|| kitsune_error!("missing host component"))?;
let domain_buf;
let try_resolver = opts
.acct
@ -65,8 +66,7 @@ impl Fetcher {
match self
.resolver
.resolve_account(&actor.preferred_username, domain)
.await
.map_err(Error::Resolver)?
.await?
{
Some(resource) if resource.uri == actor.id => {
actor.preferred_username = resource.username;
@ -85,7 +85,9 @@ impl Fetcher {
if !used_resolver && actor.id != url.as_str() {
url = Url::parse(&actor.id)?;
domain = url.host_str().ok_or(Error::MissingHost)?;
domain = url
.host_str()
.ok_or_else(|| kitsune_error!("missing host component"))?;
}
actor.clean_html();

View File

@ -1,5 +1,4 @@
use super::Fetcher;
use crate::error::{Error, Result};
use diesel::{ExpressionMethods, OptionalExtension, QueryDsl, SelectableHelper};
use diesel_async::RunQueryDsl;
use iso8601_timestamp::Timestamp;
@ -11,6 +10,7 @@ use kitsune_db::{
schema::{custom_emojis, media_attachments},
with_connection, with_transaction,
};
use kitsune_error::{kitsune_error, Error, Result};
use kitsune_type::ap::emoji::Emoji;
use speedy_uuid::Uuid;
use url::Url;
@ -35,11 +35,15 @@ impl Fetcher {
return Ok(None);
};
let mut domain = url.host_str().ok_or(Error::MissingHost)?;
let mut domain = url
.host_str()
.ok_or_else(|| kitsune_error!("missing host component"))?;
if emoji.id != url.as_str() {
url = Url::parse(&emoji.id)?;
domain = url.host_str().ok_or(Error::MissingHost)?;
domain = url
.host_str()
.ok_or_else(|| kitsune_error!("missing host component"))?;
}
let content_type = emoji
@ -47,7 +51,7 @@ impl Fetcher {
.media_type
.as_deref()
.or_else(|| mime_guess::from_path(&emoji.icon.url).first_raw())
.ok_or(Error::InvalidDocument)?;
.ok_or_else(|| kitsune_error!("failed to guess content-type"))?;
let name_pure = emoji.name.replace(':', "");

View File

@ -1,4 +1,3 @@
use crate::error::{Error, Result};
use async_trait::async_trait;
use headers::{ContentType, HeaderMapExt};
use http::HeaderValue;
@ -16,6 +15,7 @@ use kitsune_db::{
PgPool,
};
use kitsune_embed::Client as EmbedClient;
use kitsune_error::{bail, Error, Result};
use kitsune_federation_filter::FederationFilter;
use kitsune_http_client::Client;
use kitsune_type::jsonld::RdfNode;
@ -70,7 +70,7 @@ impl Fetcher {
{
let url = url.try_into()?;
if !self.federation_filter.is_url_allowed(&url)? {
return Err(Error::BlockedInstance);
bail!("instance is blocked");
}
let response = self.client.get(url.as_str()).await?;
@ -84,7 +84,7 @@ impl Fetcher {
.typed_get::<ContentType>()
.map(Mime::from)
else {
return Err(Error::InvalidResponse);
bail!("invalid content-type header in response");
};
let is_json_ld_activitystreams = || {
@ -108,7 +108,7 @@ impl Fetcher {
};
if !is_json_ld_activitystreams() && !is_activity_json() {
return Err(Error::InvalidResponse);
bail!("invalid content-type: isnt either ld+json or activity+json");
}
let response = response.jsonld().await?;
@ -123,15 +123,15 @@ impl FetcherTrait for Fetcher {
Arc::new(self.resolver.clone())
}
async fn fetch_account(&self, opts: AccountFetchOptions<'_>) -> eyre::Result<Option<Account>> {
async fn fetch_account(&self, opts: AccountFetchOptions<'_>) -> Result<Option<Account>> {
Ok(self.fetch_actor(opts).await?)
}
async fn fetch_emoji(&self, url: &str) -> eyre::Result<Option<CustomEmoji>> {
async fn fetch_emoji(&self, url: &str) -> Result<Option<CustomEmoji>> {
Ok(self.fetch_emoji(url).await?)
}
async fn fetch_post(&self, opts: PostFetchOptions<'_>) -> eyre::Result<Option<Post>> {
async fn fetch_post(&self, opts: PostFetchOptions<'_>) -> Result<Option<Post>> {
Ok(self.fetch_object(opts.url, opts.call_depth).await?)
}
}

View File

@ -1,10 +1,11 @@
use super::Fetcher;
use crate::{error::Result, process_new_object, ProcessNewObject};
use crate::{process_new_object, ProcessNewObject};
use autometrics::autometrics;
use diesel::{ExpressionMethods, OptionalExtension, QueryDsl, SelectableHelper};
use diesel_async::RunQueryDsl;
use kitsune_cache::CacheBackend;
use kitsune_db::{model::post::Post, schema::posts, with_connection};
use kitsune_error::Result;
// Maximum call depth of fetching new posts. Prevents unbounded recursion.
// Setting this to >=40 would cause the `fetch_infinitely_long_reply_chain` test to run into stack overflow

View File

@ -1,4 +1,3 @@
use crate::error::{Error, Result};
use diesel::{
result::Error as DieselError, BelongingToDsl, BoolExpressionMethods, ExpressionMethods,
JoinOnDsl, QueryDsl, SelectableHelper,
@ -15,6 +14,7 @@ use kitsune_db::{
schema::{accounts, accounts_follows},
with_connection, PgPool,
};
use kitsune_error::{Error, Result};
pub struct InboxResolver {
db_pool: PgPool,

View File

@ -1,7 +1,6 @@
#[macro_use]
extern crate tracing;
use crate::error::{Error, Result};
use diesel::{ExpressionMethods, SelectableHelper};
use diesel_async::{AsyncPgConnection, RunQueryDsl};
use futures_util::{stream, StreamExt, TryStreamExt};
@ -23,6 +22,7 @@ use kitsune_db::{
with_transaction, PgPool,
};
use kitsune_embed::Client as EmbedClient;
use kitsune_error::{kitsune_error, Error, Result};
use kitsune_language::Language;
use kitsune_search::{AnySearchBackend, SearchBackend};
use kitsune_type::ap::{object::MediaAttachment, Object, Tag, TagType};
@ -31,7 +31,6 @@ use speedy_uuid::Uuid;
use typed_builder::TypedBuilder;
pub mod deliverer;
pub mod error;
pub mod fetcher;
pub mod inbox_resolver;
pub mod mapping;
@ -104,8 +103,7 @@ async fn handle_custom_emojis(
emoji_text: emoji_tag.name.clone(),
})
.try_collect::<Vec<PostCustomEmoji>>()
.await
.map_err(Error::FetchEmoji)?;
.await?;
diesel::insert_into(posts_custom_emojis::table)
.values(emojis)
@ -206,15 +204,14 @@ async fn preprocess_object(
if Uri::try_from(&object.attributed_to)?.authority()
!= Uri::try_from(&object.id)?.authority()
{
return Err(Error::InvalidDocument);
return Err(kitsune_error!("invalid document"));
}
let Some(author) = fetcher
.fetch_account(object.attributed_to.as_str().into())
.await
.map_err(Error::FetchAccount)?
.await?
else {
return Err(Error::NotFound);
return Err(kitsune_error!("account not found"));
};
CowBox::boxed(author)
@ -229,8 +226,7 @@ async fn preprocess_object(
.call_depth(call_depth + 1)
.build(),
)
.await
.map_err(Error::FetchPost)?
.await?
.map(|post| post.id)
} else {
None

View File

@ -1,5 +1,4 @@
use super::{IntoObject, State};
use crate::error::Result;
use diesel::QueryDsl;
use diesel_async::RunQueryDsl;
use iso8601_timestamp::Timestamp;
@ -8,6 +7,7 @@ use kitsune_db::{
schema::{accounts, posts},
with_connection,
};
use kitsune_error::Result;
use kitsune_type::ap::{ap_context, Activity, ActivityType, ObjectField};
use kitsune_util::try_join;
use std::future::Future;

View File

@ -1,5 +1,4 @@
use super::{util::BaseToCc, State};
use crate::error::{Error, Result};
use diesel::{BelongingToDsl, ExpressionMethods, QueryDsl, SelectableHelper};
use diesel_async::RunQueryDsl;
use futures_util::{future::OptionFuture, FutureExt, TryFutureExt, TryStreamExt};
@ -14,6 +13,7 @@ use kitsune_db::{
schema::{accounts, custom_emojis, media_attachments, posts, posts_custom_emojis},
with_connection,
};
use kitsune_error::{bail, kitsune_error, Error, ErrorType, Result};
use kitsune_type::ap::{
actor::{Actor, PublicKey},
ap_context,
@ -35,12 +35,19 @@ impl IntoObject for DbMediaAttachment {
type Output = MediaAttachment;
async fn into_object(self, state: State<'_>) -> Result<Self::Output> {
let mime = Mime::from_str(&self.content_type).map_err(|_| Error::UnsupportedMediaType)?;
let mime = Mime::from_str(&self.content_type).map_err(
|_| kitsune_error!(type = ErrorType::UnsupportedMediaType, "unsupported media type"),
)?;
let r#type = match mime.type_() {
mime::AUDIO => MediaAttachmentType::Audio,
mime::IMAGE => MediaAttachmentType::Image,
mime::VIDEO => MediaAttachmentType::Video,
_ => return Err(Error::UnsupportedMediaType),
_ => {
return Err(
kitsune_error!(type = ErrorType::UnsupportedMediaType, "unsupported media type"),
)
}
};
let url = state.service.attachment.get_url(self.id).await?;
@ -98,7 +105,7 @@ impl IntoObject for Post {
// Therefore it's also not an object
// We just return en error here
if self.reposted_post_id.is_some() {
return Err(Error::NotFound);
bail!("post not found");
}
let (account, in_reply_to, mentions, emojis, attachment_stream) =
@ -255,7 +262,7 @@ impl IntoObject for CustomEmoji {
// Let's pretend we're not home and do not answer
let name = match self.domain {
None => Ok(format!(":{}:", self.shortcode)),
Some(_) => Err(Error::NotFound),
Some(_) => Err(kitsune_error!("custom emoji not found")),
}?;
let icon = with_connection!(state.db_pool, |db_conn| {

View File

@ -1,18 +1,24 @@
use super::handle::handle;
use http_body_util::Empty;
use hyper::{body::Bytes, Request, Response};
use kitsune_activitypub::{error::Error, Fetcher};
use kitsune_activitypub::Fetcher;
use kitsune_cache::NoopCache;
use kitsune_config::instance::FederationFilterConfiguration;
use kitsune_core::traits::Fetcher as _;
use kitsune_federation_filter::FederationFilter;
use kitsune_http_client::Client;
use kitsune_search::NoopSearchService;
use kitsune_test::{database_test, language_detection_config};
use kitsune_test::{assert_display_eq, database_test, language_detection_config};
use kitsune_webfinger::Webfinger;
use std::{convert::Infallible, sync::Arc};
use tower::service_fn;
macro_rules! assert_blocked {
($error:expr) => {
assert_display_eq!($error, "instance is blocked")
};
}
#[tokio::test]
async fn federation_allow() {
database_test(|db_pool| async move {
@ -46,25 +52,15 @@ async fn federation_allow() {
)))
.build();
assert!(matches!(
*fetcher
.fetch_post("https://example.com/fakeobject".into())
.await
.unwrap_err()
.downcast_ref()
.unwrap(),
Error::BlockedInstance
));
assert_blocked!(fetcher
.fetch_post("https://example.com/fakeobject".into())
.await
.unwrap_err());
assert!(matches!(
*fetcher
.fetch_post("https://other.badstuff.com/otherfake".into())
.await
.unwrap_err()
.downcast_ref()
.unwrap(),
Error::BlockedInstance
));
assert_blocked!(fetcher
.fetch_post("https://other.badstuff.com/otherfake".into())
.await
.unwrap_err());
let client = Client::builder().service(service_fn(handle));
let fetcher = builder
@ -118,24 +114,15 @@ async fn federation_deny() {
.post_cache(Arc::new(NoopCache.into()))
.build();
assert!(matches!(
fetcher
.fetch_post("https://example.com/fakeobject".into())
.await
.unwrap_err()
.downcast_ref()
.unwrap(),
Error::BlockedInstance
));
assert!(matches!(
*fetcher
.fetch_post("https://other.badstuff.com/otherfake".into())
.await
.unwrap_err()
.downcast_ref()
.unwrap(),
Error::BlockedInstance
));
assert_blocked!(fetcher
.fetch_post("https://example.com/fakeobject".into())
.await
.unwrap_err());
assert_blocked!(fetcher
.fetch_post("https://other.badstuff.com/otherfake".into())
.await
.unwrap_err());
})
.await;
}

View File

@ -1,14 +1,14 @@
use super::handle::handle;
use http::{header::CONTENT_TYPE, uri::PathAndQuery};
use hyper::Request;
use kitsune_activitypub::{error::Error, Fetcher};
use kitsune_activitypub::Fetcher;
use kitsune_cache::NoopCache;
use kitsune_config::instance::FederationFilterConfiguration;
use kitsune_core::traits::Fetcher as _;
use kitsune_federation_filter::FederationFilter;
use kitsune_http_client::Client;
use kitsune_search::NoopSearchService;
use kitsune_test::{database_test, language_detection_config};
use kitsune_test::{assert_display_eq, database_test, language_detection_config};
use kitsune_webfinger::Webfinger;
use std::{convert::Infallible, sync::Arc};
use tower::service_fn;
@ -108,15 +108,13 @@ async fn check_ap_content_type() {
.post_cache(Arc::new(NoopCache.into()))
.build();
assert!(matches!(
*fetcher
assert_display_eq!(
fetcher
.fetch_post("https://corteximplant.com/users/0x0".into())
.await
.unwrap_err()
.downcast_ref()
.unwrap(),
Error::InvalidResponse
));
.unwrap_err(),
"invalid content-type header in response"
);
})
.await;
}

View File

@ -7,15 +7,15 @@ license.workspace = true
[dependencies]
enum_dispatch = "0.3.13"
kitsune-error = { path = "../kitsune-error" }
moka = { version = "0.12.5", features = ["future"] }
multiplex-pool = { path = "../../lib/multiplex-pool" }
redis = { version = "0.25.2", default-features = false, features = [
redis = { version = "0.25.3", default-features = false, features = [
"connection-manager",
"tokio-comp",
] }
serde = "1.0.197"
simd-json = "0.13.9"
thiserror = "1.0.58"
tracing = "0.1.40"
typed-builder = "0.18.1"

View File

@ -1,11 +0,0 @@
use redis::RedisError;
use thiserror::Error;
#[derive(Debug, Error)]
pub enum Error {
#[error(transparent)]
Redis(#[from] RedisError),
#[error(transparent)]
SimdJson(#[from] simd_json::Error),
}

View File

@ -1,4 +1,5 @@
use crate::{CacheBackend, CacheResult};
use crate::CacheBackend;
use kitsune_error::Result;
use moka::future::Cache;
use std::{fmt::Display, marker::PhantomData, time::Duration};
@ -34,16 +35,16 @@ where
K: Display + Send + Sync + ?Sized,
V: Clone + Send + Sync + 'static,
{
async fn delete(&self, key: &K) -> CacheResult<()> {
async fn delete(&self, key: &K) -> Result<()> {
self.inner.remove(&key.to_string()).await;
Ok(())
}
async fn get(&self, key: &K) -> CacheResult<Option<V>> {
async fn get(&self, key: &K) -> Result<Option<V>> {
Ok(self.inner.get(&key.to_string()).await)
}
async fn set(&self, key: &K, value: &V) -> CacheResult<()> {
async fn set(&self, key: &K, value: &V) -> Result<()> {
self.inner.insert(key.to_string(), value.clone()).await;
Ok(())
}

View File

@ -2,19 +2,16 @@
extern crate tracing;
use enum_dispatch::enum_dispatch;
use kitsune_error::Result;
use serde::{de::DeserializeOwned, Serialize};
use std::{fmt::Display, sync::Arc};
pub use self::error::Error;
pub use self::in_memory::InMemory as InMemoryCache;
pub use self::redis::Redis as RedisCache;
mod error;
mod in_memory;
mod redis;
type CacheResult<T, E = Error> = Result<T, E>;
pub type ArcCache<K, V> = Arc<AnyCache<K, V>>;
#[enum_dispatch(CacheBackend<K, V>)]
@ -34,9 +31,9 @@ pub trait CacheBackend<K, V>: Send + Sync
where
K: ?Sized,
{
async fn delete(&self, key: &K) -> CacheResult<()>;
async fn get(&self, key: &K) -> CacheResult<Option<V>>;
async fn set(&self, key: &K, value: &V) -> CacheResult<()>;
async fn delete(&self, key: &K) -> Result<()>;
async fn get(&self, key: &K) -> Result<Option<V>>;
async fn set(&self, key: &K, value: &V) -> Result<()>;
}
#[derive(Clone)]
@ -47,15 +44,15 @@ where
K: Send + Sync + ?Sized,
V: Send + Sync,
{
async fn delete(&self, _key: &K) -> CacheResult<()> {
async fn delete(&self, _key: &K) -> Result<()> {
Ok(())
}
async fn get(&self, _key: &K) -> CacheResult<Option<V>> {
async fn get(&self, _key: &K) -> Result<Option<V>> {
Ok(None)
}
async fn set(&self, _key: &K, _value: &V) -> CacheResult<()> {
async fn set(&self, _key: &K, _value: &V) -> Result<()> {
Ok(())
}
}

View File

@ -1,4 +1,5 @@
use super::{CacheBackend, CacheResult};
use super::CacheBackend;
use kitsune_error::Result;
use redis::{aio::ConnectionManager, AsyncCommands};
use serde::{de::DeserializeOwned, Serialize};
use std::{fmt::Display, marker::PhantomData, time::Duration};
@ -53,7 +54,7 @@ where
V: Serialize + DeserializeOwned + Send + Sync,
{
#[instrument(skip_all, fields(%key))]
async fn delete(&self, key: &K) -> CacheResult<()> {
async fn delete(&self, key: &K) -> Result<()> {
let mut conn = self.redis_conn.get();
let key = self.compute_key(key);
@ -64,7 +65,7 @@ where
}
#[instrument(skip_all, fields(%key))]
async fn get(&self, key: &K) -> CacheResult<Option<V>> {
async fn get(&self, key: &K) -> Result<Option<V>> {
let mut conn = self.redis_conn.get();
let key = self.compute_key(key);
@ -79,7 +80,7 @@ where
}
#[instrument(skip_all, fields(%key))]
async fn set(&self, key: &K, value: &V) -> CacheResult<()> {
async fn set(&self, key: &K, value: &V) -> Result<()> {
let mut conn = self.redis_conn.get();
let key = self.compute_key(key);
let serialised = simd_json::to_string(value)?;

View File

@ -8,12 +8,12 @@ license.workspace = true
[dependencies]
enum_dispatch = "0.3.13"
http = "1.1.0"
kitsune-error = { path = "../kitsune-error" }
kitsune-http-client = { path = "../kitsune-http-client" }
serde = { version = "1.0.197", features = ["derive"] }
serde_urlencoded = "0.7.1"
simd-json = "0.13.9"
strum = { version = "0.26.2", features = ["derive"] }
thiserror = "1.0.58"
typed-builder = "0.18.1"
[lints]

View File

@ -1,8 +1,7 @@
use serde::{Deserialize, Serialize};
use strum::{Display, EnumString};
use thiserror::Error;
#[derive(Debug, PartialEq, Display, Serialize, Deserialize, EnumString, Error)]
#[derive(Debug, PartialEq, Display, Serialize, Deserialize, EnumString)]
pub enum CaptchaVerification {
#[strum(serialize = "missing-input-secret")]
MissingInputSecret,
@ -28,21 +27,3 @@ pub enum CaptchaVerification {
#[strum(default)]
Other(String),
}
#[derive(Debug, Error)]
pub enum Error {
#[error(transparent)]
CaptchaVerification(#[from] CaptchaVerification),
#[error(transparent)]
SimdJson(#[from] simd_json::Error),
#[error(transparent)]
Http(#[from] http::Error),
#[error(transparent)]
HttpClient(#[from] kitsune_http_client::Error),
#[error(transparent)]
HttpForm(#[from] serde_urlencoded::ser::Error),
}

View File

@ -2,16 +2,12 @@
use self::error::CaptchaVerification;
use enum_dispatch::enum_dispatch;
use kitsune_error::Result;
pub mod error;
pub mod hcaptcha;
pub mod mcaptcha;
pub use self::error::Error;
/// Result alias where the error defaults to [`BoxError`]
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Status of the captcha challenge verification
#[derive(PartialEq)]
pub enum ChallengeStatus {

View File

@ -9,13 +9,9 @@ build = "build.rs"
[dependencies]
async-trait = "0.1.79"
const_format = "0.2.32"
eyre = "0.6.12"
http = "1.1.0"
kitsune-db = { path = "../kitsune-db" }
kitsune-messaging = { path = "../kitsune-messaging" }
kitsune-error = { path = "../kitsune-error" }
serde = { version = "1.0.197", features = ["derive"] }
speedy-uuid = { path = "../../lib/speedy-uuid", features = ["diesel"] }
thiserror = "1.0.58"
typed-builder = "0.18.1"
[build-dependencies]

View File

@ -1,48 +0,0 @@
use http::StatusCode;
use std::borrow::Cow;
use thiserror::Error;
macro_rules! http_error {
($($variant_name:ident => $status_code:path),*$(,)?) => {
#[derive(Debug, Error)]
pub enum HttpError {
$(
#[doc = stringify!($variant_name)]
#[error("{}", self.as_str())]
$variant_name,
)*
}
impl HttpError {
#[inline]
pub fn as_str(&self) -> Cow<'static, str> {
let status_code = self.status_code();
status_code
.canonical_reason()
.map_or_else(
|| Cow::Owned(status_code.as_str().to_string()),
Cow::Borrowed,
)
}
#[inline]
#[must_use]
pub fn status_code(&self) -> ::http::StatusCode {
match self {
$(
Self::$variant_name => $status_code,
)*
}
}
}
}
}
http_error! {
BadRequest => StatusCode::NOT_FOUND,
InternalServerError => StatusCode::INTERNAL_SERVER_ERROR,
NotFound => StatusCode::NOT_FOUND,
Unauthorised => StatusCode::UNAUTHORIZED,
UnsupportedMediaType => StatusCode::UNSUPPORTED_MEDIA_TYPE,
}

View File

@ -1,8 +0,0 @@
use kitsune_messaging::{MessageConsumer, MessageEmitter};
pub use self::post::PostEvent;
pub mod post;
pub type PostEventConsumer = MessageConsumer<PostEvent>;
pub type PostEventEmitter = MessageEmitter<PostEvent>;

View File

@ -1,16 +0,0 @@
use serde::{Deserialize, Serialize};
use speedy_uuid::Uuid;
#[derive(Clone, Copy, Deserialize, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum EventType {
Create,
Delete,
Update,
}
#[derive(Clone, Copy, Deserialize, Serialize)]
pub struct PostEvent {
pub r#type: EventType,
pub post_id: Uuid,
}

View File

@ -1,4 +1,2 @@
pub mod consts;
pub mod error;
pub mod event;
pub mod traits;

View File

@ -1,6 +1,6 @@
use async_trait::async_trait;
use eyre::Result;
use kitsune_db::model::{account::Account, favourite::Favourite, follower::Follow, post::Post};
use kitsune_error::Result;
use serde::{Deserialize, Serialize};
use std::sync::Arc;

View File

@ -1,7 +1,7 @@
use super::Resolver;
use async_trait::async_trait;
use eyre::Result;
use kitsune_db::model::{account::Account, custom_emoji::CustomEmoji, post::Post};
use kitsune_error::Result;
use std::sync::Arc;
use typed_builder::TypedBuilder;

View File

@ -1,5 +1,5 @@
use async_trait::async_trait;
use eyre::Result;
use kitsune_error::Result;
use serde::{Deserialize, Serialize};
use std::sync::Arc;

View File

@ -22,6 +22,7 @@ futures-util = { version = "0.3.30", default-features = false, features = [
] }
iso8601-timestamp = { version = "0.2.17", features = ["diesel-pg"] }
kitsune-config = { path = "../kitsune-config" }
kitsune-error = { path = "../kitsune-error" }
kitsune-language = { path = "../kitsune-language" }
kitsune-type = { path = "../kitsune-type" }
num-derive = "0.4.2"
@ -36,12 +37,12 @@ rustls-native-certs = "0.7.0"
serde = { version = "1.0.197", features = ["derive"] }
simd-json = "0.13.9"
speedy-uuid = { path = "../../lib/speedy-uuid", features = ["diesel"] }
thiserror = "1.0.58"
tokio = { version = "1.37.0", features = ["rt"] }
tokio-postgres = "0.7.10"
tokio-postgres-rustls = "0.11.1"
tokio-postgres-rustls = "0.12.0"
tracing = "0.1.40"
tracing-log = "0.2.0"
trials = { path = "../../lib/trials" }
typed-builder = "0.18.1"
[dev-dependencies]

View File

@ -1,10 +1,5 @@
use core::fmt;
use diesel_async::pooled_connection::bb8;
use std::error::Error as StdError;
use thiserror::Error;
pub type BoxError = Box<dyn StdError + Send + Sync>;
pub type Result<T, E = Error> = std::result::Result<T, E>;
#[derive(Debug)]
pub struct EnumConversionError(pub i32);
@ -35,21 +30,3 @@ impl fmt::Display for IsoCodeConversionError {
}
impl StdError for IsoCodeConversionError {}
#[derive(Debug, Error)]
pub enum Error {
#[error(transparent)]
Blocking(#[from] blowocking::Error),
#[error(transparent)]
Diesel(#[from] diesel::result::Error),
#[error(transparent)]
DieselConnection(#[from] diesel::result::ConnectionError),
#[error(transparent)]
Migration(BoxError),
#[error(transparent)]
Pool(#[from] bb8::RunError),
}

View File

@ -9,13 +9,13 @@ use diesel_async::{
};
use diesel_migrations::{embed_migrations, EmbeddedMigrations, MigrationHarness};
use kitsune_config::database::Configuration as DatabaseConfig;
use kitsune_error::{Error, Result};
use tracing_log::LogTracer;
pub type PgPool = Pool<AsyncPgConnection>;
pub use crate::error::{Error, Result};
#[doc(hidden)]
pub use diesel_async;
pub use {diesel_async, trials};
mod error;
mod pool;
@ -45,7 +45,7 @@ pub async fn connect(config: &DatabaseConfig) -> Result<PgPool> {
migration_conn
.run_pending_migrations(MIGRATIONS)
.map_err(Error::Migration)?;
.map_err(Error::msg)?;
Ok::<_, Error>(())
}

View File

@ -7,20 +7,13 @@ macro_rules! with_connection {
}};
}
#[macro_export]
macro_rules! catch_error {
($($tt:tt)*) => {{
let result: ::std::result::Result<_, ::diesel_async::pooled_connection::bb8::RunError> = async {
Ok({ $($tt)* })
}.await;
result
}};
}
#[macro_export]
macro_rules! with_connection_panicky {
($pool:expr, $($other:tt)*) => {{
$crate::catch_error!($crate::with_connection!($pool, $($other)*)).unwrap()
let result: ::std::result::Result<_, $crate::diesel_async::pooled_connection::bb8::RunError> = $crate::trials::attempt! { async
$crate::with_connection!($pool, $($other)*)
};
result.unwrap()
}};
}

View File

@ -14,6 +14,7 @@ askama_axum = "0.4.0" # Damn it, cargo. Because "kitsune" uses "askama" with the
diesel = "2.1.5"
diesel-async = "0.4.1"
kitsune-db = { path = "../kitsune-db" }
kitsune-error = { path = "../kitsune-error" }
kitsune-url = { path = "../kitsune-url" }
lettre = { version = "0.11.6", default-features = false, features = [
"builder",
@ -24,13 +25,12 @@ lettre = { version = "0.11.6", default-features = false, features = [
"tokio1-rustls-tls",
"tracing",
] }
mrml = { version = "3.1.3", default-features = false, features = [
mrml = { version = "3.1.4", default-features = false, features = [
"orderedmap",
"parse",
"render",
] }
speedy-uuid = { path = "../../lib/speedy-uuid" }
thiserror = "1.0.58"
typed-builder = "0.18.1"
[lints]

View File

@ -1,33 +0,0 @@
use diesel_async::pooled_connection::bb8::RunError as DatabasePoolError;
use std::{error::Error as StdError, fmt::Debug};
use thiserror::Error;
pub type BoxError = Box<dyn StdError + Send + Sync>;
pub type Result<T, E = Error> = std::result::Result<T, E>;
#[derive(Debug, Error)]
pub enum Error {
#[error(transparent)]
Address(#[from] lettre::address::AddressError),
#[error(transparent)]
DatabasePool(#[from] DatabasePoolError),
#[error(transparent)]
Diesel(#[from] diesel::result::Error),
#[error(transparent)]
Lettre(#[from] lettre::error::Error),
#[error(transparent)]
Templating(#[from] askama::Error),
#[error(transparent)]
Transport(BoxError),
#[error(transparent)]
RenderParsing(#[from] mrml::prelude::parser::Error),
#[error(transparent)]
Rendering(#[from] mrml::prelude::render::Error),
}

View File

@ -1,7 +1,5 @@
use crate::{
error::{BoxError, Error, Result},
traits::RenderableEmail,
};
use crate::traits::RenderableEmail;
use kitsune_error::{Error, Result};
use lettre::{
message::{Mailbox, MultiPart},
AsyncTransport, Message,
@ -12,7 +10,6 @@ use typed_builder::TypedBuilder;
pub use self::service::Mailing as MailingService;
pub use lettre;
pub mod error;
pub mod mails;
pub mod service;
pub mod traits;
@ -27,7 +24,7 @@ pub struct MailSender<B> {
impl<B> MailSender<B>
where
B: AsyncTransport + Sync,
<B as AsyncTransport>::Error: Into<BoxError>,
Error: From<<B as AsyncTransport>::Error>,
{
pub async fn send<'a, I, M>(&self, mailboxes: I, email: &M) -> Result<()>
where
@ -46,10 +43,7 @@ where
rendered_email.body.clone(),
))?;
self.backend
.send(message)
.await
.map_err(|err| Error::Transport(err.into()))?;
self.backend.send(message).await?;
}
Ok(())

View File

@ -1,8 +1,6 @@
use crate::{
error::Result,
traits::{RenderableEmail, RenderedEmail},
};
use crate::traits::{RenderableEmail, RenderedEmail};
use askama::Template;
use kitsune_error::Result;
use mrml::{mjml::Mjml, prelude::render::RenderOptions};
use typed_builder::TypedBuilder;

View File

@ -1,7 +1,8 @@
use crate::{error::Result, mails::confirm_account::ConfirmAccount, MailSender};
use crate::{mails::confirm_account::ConfirmAccount, MailSender};
use diesel::{ExpressionMethods, NullableExpressionMethods, QueryDsl};
use diesel_async::RunQueryDsl;
use kitsune_db::{function::now, model::user::User, schema::users, with_connection, PgPool};
use kitsune_error::Result;
use kitsune_url::UrlService;
use lettre::{AsyncSmtpTransport, Tokio1Executor};
use speedy_uuid::Uuid;

View File

@ -1,4 +1,4 @@
use crate::error::Result;
use kitsune_error::Result;
#[derive(Debug)]
pub struct RenderedEmail {

View File

@ -12,11 +12,11 @@ embed-sdk = { git = "https://github.com/Lantern-chat/embed-service.git", rev = "
http = "1.1.0"
iso8601-timestamp = "0.2.17"
kitsune-db = { path = "../kitsune-db" }
kitsune-error = { path = "../kitsune-error" }
kitsune-http-client = { path = "../kitsune-http-client" }
once_cell = "1.19.0"
scraper = { version = "0.19.0", default-features = false }
smol_str = "0.2.1"
thiserror = "1.0.58"
typed-builder = "0.18.1"
[lints]

View File

@ -1,5 +1,5 @@
use diesel::{OptionalExtension, QueryDsl};
use diesel_async::{pooled_connection::bb8, RunQueryDsl};
use diesel_async::RunQueryDsl;
use embed_sdk::EmbedWithExpire;
use http::{Method, Request};
use iso8601_timestamp::Timestamp;
@ -9,18 +9,16 @@ use kitsune_db::{
schema::link_previews,
with_connection, PgPool,
};
use kitsune_error::Result;
use kitsune_http_client::Client as HttpClient;
use once_cell::sync::Lazy;
use scraper::{Html, Selector};
use smol_str::SmolStr;
use std::fmt::Debug;
use typed_builder::TypedBuilder;
pub use embed_sdk;
pub use embed_sdk::Embed;
type Result<T, E = Error> = std::result::Result<T, E>;
static LINK_SELECTOR: Lazy<Selector> = Lazy::new(|| {
Selector::parse("a:not(.mention, .hashtag)").expect("[Bug] Failed to parse link HTML selector")
});
@ -34,18 +32,6 @@ fn first_link_from_fragment(fragment: &str) -> Option<String> {
.and_then(|element| element.value().attr("href").map(ToString::to_string))
}
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error(transparent)]
Diesel(#[from] diesel::result::Error),
#[error(transparent)]
Http(#[from] kitsune_http_client::Error),
#[error(transparent)]
Pool(#[from] bb8::RunError),
}
#[derive(Clone, TypedBuilder)]
pub struct Client {
db_pool: PgPool,

View File

@ -0,0 +1,18 @@
[package]
name = "kitsune-error"
authors.workspace = true
edition.workspace = true
version.workspace = true
license.workspace = true
[dependencies]
axum-core = "0.4.3"
eyre = "0.6.12"
garde = { version = "0.18.0", default-features = false, features = ["serde"] }
http = "1.1.0"
simd-json = "0.13.9"
sync_wrapper = "1.0.0"
tracing = "0.1.40"
[lints]
workspace = true

View File

@ -0,0 +1,48 @@
use crate::{Error, ErrorType};
use axum_core::response::{IntoResponse, Response};
use http::StatusCode;
#[inline]
fn to_response<B>(status_code: StatusCode, maybe_body: Option<B>) -> Response
where
B: IntoResponse,
{
maybe_body.map_or_else(
|| status_code.into_response(),
|body| (status_code, body).into_response(),
)
}
impl From<Error> for Response {
#[inline]
fn from(value: Error) -> Self {
value.into_response()
}
}
impl IntoResponse for Error {
fn into_response(self) -> Response {
debug!(error = ?self.inner);
if let Some(garde_report) = self.inner.downcast_ref::<garde::Report>() {
let body = match simd_json::to_string(&garde_report) {
Ok(body) => body,
Err(error) => return Error::from(error).into_response(),
};
return to_response(StatusCode::BAD_REQUEST, Some(body));
}
let maybe_body = self.ctx.body.into_inner();
match self.ctx.ty {
ErrorType::BadRequest => to_response(StatusCode::BAD_REQUEST, maybe_body),
ErrorType::Forbidden => to_response(StatusCode::FORBIDDEN, maybe_body),
ErrorType::NotFound => to_response(StatusCode::NOT_FOUND, maybe_body),
ErrorType::Unauthorized => to_response(StatusCode::UNAUTHORIZED, maybe_body),
ErrorType::UnsupportedMediaType => {
to_response(StatusCode::UNSUPPORTED_MEDIA_TYPE, maybe_body)
}
ErrorType::Other => to_response(StatusCode::INTERNAL_SERVER_ERROR, maybe_body),
}
}
}

View File

@ -0,0 +1,21 @@
use crate::{Error, ErrorContext};
mod sealed {
pub trait Sealed {}
impl<T, E> Sealed for Result<T, E> {}
}
pub trait ResultExt<T>: sealed::Sealed {
fn with_error_context(self, ty: ErrorContext) -> Result<T, Error>;
}
impl<T, E> ResultExt<T> for Result<T, E>
where
E: Into<Error>,
{
#[inline]
fn with_error_context(self, ctx: ErrorContext) -> Result<T, Error> {
self.map_err(|err| err.into().with_context(ctx))
}
}

View File

@ -0,0 +1,139 @@
#[macro_use]
extern crate tracing;
use axum_core::response::{IntoResponse, Response};
use std::fmt::{self, Debug, Display};
use sync_wrapper::SyncWrapper;
pub use self::ext::ResultExt;
mod axum;
mod ext;
pub type BoxError = Box<dyn std::error::Error + Send + Sync>;
pub type Result<T, E = Error> = std::result::Result<T, E>;
#[macro_export]
macro_rules! bail {
($(type = $type:expr,)? $msg:expr) => {
return Err($crate::kitsune_error!($(type = $type,)? $msg).into());
};
}
#[macro_export]
macro_rules! kitsune_error {
(type = $type:expr, $msg:expr) => {
$crate::Error::msg($msg).with_context({ $type }.into())
};
($msg:expr) => {
$crate::kitsune_error!(type = $crate::ErrorType::Other, $msg)
};
}
#[derive(Clone, Debug)]
pub enum ErrorType {
BadRequest,
Forbidden,
NotFound,
Unauthorized,
UnsupportedMediaType,
Other,
}
impl ErrorType {
#[must_use]
pub fn with_body<B>(self, body: B) -> ErrorContext
where
B: IntoResponse,
{
ErrorContext {
ty: self,
body: Some(body.into_response()).into(),
}
}
}
impl From<ErrorType> for ErrorContext {
fn from(value: ErrorType) -> Self {
Self {
ty: value,
body: SyncWrapper::new(None),
}
}
}
#[derive(Debug)]
pub struct ErrorContext {
ty: ErrorType,
body: SyncWrapper<Option<Response>>,
}
#[derive(Debug)]
pub struct Error {
ctx: ErrorContext,
inner: eyre::Report,
}
impl Error {
#[inline]
pub fn new<E>(ctx: ErrorContext, err: E) -> Self
where
E: Into<eyre::Report>,
{
Self {
ctx,
inner: err.into(),
}
}
#[inline]
pub fn msg<M>(msg: M) -> Self
where
M: Debug + Display + Send + Sync + 'static,
{
eyre::Report::msg(msg).into()
}
#[must_use]
pub fn context(&self) -> &ErrorContext {
&self.ctx
}
pub fn error(&self) -> &eyre::Report {
&self.inner
}
pub fn into_error(self) -> eyre::Report {
self.inner
}
#[must_use]
pub fn with_context(self, ctx: ErrorContext) -> Self {
Self { ctx, ..self }
}
}
impl<T> From<T> for Error
where
T: Into<eyre::Report>,
{
fn from(value: T) -> Self {
Self {
ctx: ErrorType::Other.into(),
inner: value.into(),
}
}
}
impl From<Error> for BoxError {
fn from(value: Error) -> Self {
BoxError::from(value.inner)
}
}
impl fmt::Display for Error {
#[inline]
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
<eyre::Report as fmt::Display>::fmt(&self.inner, f)
}
}

View File

@ -8,8 +8,8 @@ license.workspace = true
[dependencies]
globset = "0.4.14"
kitsune-config = { path = "../kitsune-config" }
kitsune-error = { path = "../kitsune-error" }
kitsune-type = { path = "../kitsune-type" }
thiserror = "1.0.58"
url = "2.5.0"
[lints]

View File

@ -1,15 +0,0 @@
use thiserror::Error;
pub type Result<T, E = Error> = std::result::Result<T, E>;
#[derive(Debug, Error)]
pub enum Error {
#[error(transparent)]
Glob(#[from] globset::Error),
#[error("Host missing from URL")]
HostMissing,
#[error(transparent)]
UrlParse(#[from] url::ParseError),
}

View File

@ -1,12 +1,10 @@
use crate::error::{Error, Result};
use globset::{Glob, GlobSet, GlobSetBuilder};
use kitsune_config::instance::FederationFilterConfiguration;
use kitsune_error::{kitsune_error, Result};
use kitsune_type::ap::{actor::Actor, Activity, Object};
use std::sync::Arc;
use url::Url;
pub mod error;
pub trait Entity {
fn id(&self) -> &str;
}
@ -58,7 +56,9 @@ impl FederationFilter {
}
pub fn is_url_allowed(&self, url: &Url) -> Result<bool> {
let host = url.host_str().ok_or(Error::HostMissing)?;
let host = url
.host_str()
.ok_or_else(|| kitsune_error!("missing host component"))?;
let allowed = match self.filter {
FilterMode::Allow { .. } => self.domains.is_match(host),

View File

@ -10,11 +10,11 @@ athena = { path = "../../lib/athena" }
derive_more = { version = "1.0.0-beta.6", features = ["from"] }
diesel = "2.1.5"
diesel-async = "0.4.1"
eyre = "0.6.12"
futures-util = "0.3.30"
kitsune-core = { path = "../kitsune-core" }
kitsune-db = { path = "../kitsune-db" }
kitsune-email = { path = "../kitsune-email" }
kitsune-error = { path = "../kitsune-error" }
serde = { version = "1.0.197", features = ["derive"] }
speedy-uuid = { path = "../../lib/speedy-uuid" }
tracing = "0.1.40"

View File

@ -13,7 +13,7 @@ pub struct DeliverAccept {
impl Runnable for DeliverAccept {
type Context = JobRunnerContext;
type Error = eyre::Report;
type Error = kitsune_error::Error;
#[instrument(skip_all, fields(follow_id = %self.follow_id))]
async fn run(&self, ctx: &Self::Context) -> Result<(), Self::Error> {

View File

@ -14,7 +14,7 @@ pub struct DeliverCreate {
impl Runnable for DeliverCreate {
type Context = JobRunnerContext;
type Error = eyre::Report;
type Error = kitsune_error::Error;
#[instrument(skip_all, fields(post_id = %self.post_id))]
async fn run(&self, ctx: &Self::Context) -> Result<(), Self::Error> {

View File

@ -14,7 +14,7 @@ pub struct DeliverDelete {
impl Runnable for DeliverDelete {
type Context = JobRunnerContext;
type Error = eyre::Report;
type Error = kitsune_error::Error;
#[instrument(skip_all, fields(post_id = %self.post_id))]
async fn run(&self, ctx: &Self::Context) -> Result<(), Self::Error> {

View File

@ -14,7 +14,7 @@ pub struct DeliverFavourite {
impl Runnable for DeliverFavourite {
type Context = JobRunnerContext;
type Error = eyre::Report;
type Error = kitsune_error::Error;
#[instrument(skip_all, fields(favourite_id = %self.favourite_id))]
async fn run(&self, ctx: &Self::Context) -> Result<(), Self::Error> {

View File

@ -14,7 +14,7 @@ pub struct DeliverFollow {
impl Runnable for DeliverFollow {
type Context = JobRunnerContext;
type Error = eyre::Report;
type Error = kitsune_error::Error;
#[instrument(skip_all, fields(follow_id = %self.follow_id))]
async fn run(&self, ctx: &Self::Context) -> Result<(), Self::Error> {

View File

@ -13,7 +13,7 @@ pub struct DeliverReject {
impl Runnable for DeliverReject {
type Context = JobRunnerContext;
type Error = eyre::Report;
type Error = kitsune_error::Error;
#[instrument(skip_all, fields(follow_id = %self.follow_id))]
async fn run(&self, ctx: &Self::Context) -> Result<(), Self::Error> {

View File

@ -14,7 +14,7 @@ pub struct DeliverUnfavourite {
impl Runnable for DeliverUnfavourite {
type Context = JobRunnerContext;
type Error = eyre::Report;
type Error = kitsune_error::Error;
#[instrument(skip_all, fields(favourite_id = %self.favourite_id))]
async fn run(&self, ctx: &Self::Context) -> Result<(), Self::Error> {

View File

@ -14,7 +14,7 @@ pub struct DeliverUnfollow {
impl Runnable for DeliverUnfollow {
type Context = JobRunnerContext;
type Error = eyre::Report;
type Error = kitsune_error::Error;
async fn run(&self, ctx: &Self::Context) -> Result<(), Self::Error> {
let follow = with_connection!(ctx.db_pool, |db_conn| {

View File

@ -25,7 +25,7 @@ pub struct DeliverUpdate {
impl Runnable for DeliverUpdate {
type Context = JobRunnerContext;
type Error = eyre::Report;
type Error = kitsune_error::Error;
async fn run(&self, ctx: &Self::Context) -> Result<(), Self::Error> {
let action = match self.entity {

View File

@ -55,7 +55,7 @@ pub enum Job {
impl Runnable for Job {
type Context = JobRunnerContext;
type Error = eyre::Report;
type Error = kitsune_error::Error;
async fn run(&self, ctx: &Self::Context) -> Result<(), Self::Error> {
match self {
@ -87,7 +87,7 @@ impl KitsuneContextRepo {
impl JobContextRepository for KitsuneContextRepo {
type JobContext = Job;
type Error = eyre::Report;
type Error = kitsune_error::Error;
type Stream = BoxStream<'static, Result<(Uuid, Self::JobContext), Self::Error>>;
async fn fetch_context<I>(&self, job_ids: I) -> Result<Self::Stream, Self::Error>
@ -103,7 +103,7 @@ impl JobContextRepository for KitsuneContextRepo {
Ok(stream
.map_ok(|ctx| (ctx.id, ctx.context.0))
.map_err(eyre::Report::from)
.map_err(kitsune_error::Error::from)
.boxed())
}

View File

@ -13,7 +13,7 @@ pub struct SendConfirmationMail {
impl Runnable for SendConfirmationMail {
type Context = JobRunnerContext;
type Error = eyre::Report;
type Error = kitsune_error::Error;
async fn run(&self, ctx: &Self::Context) -> Result<(), Self::Error> {
let mailing_service = &ctx.service.mailing;

View File

@ -6,15 +6,14 @@ version.workspace = true
license.workspace = true
[dependencies]
derive_builder = "0.20.0"
diesel = "2.1.5"
diesel-async = "0.4.1"
futures-util = "0.3.30"
iso8601-timestamp = "0.2.17"
kitsune-cache = { path = "../kitsune-cache" }
kitsune-core = { path = "../kitsune-core" }
kitsune-db = { path = "../kitsune-db" }
kitsune-embed = { path = "../kitsune-embed" }
kitsune-error = { path = "../kitsune-error" }
kitsune-service = { path = "../kitsune-service" }
kitsune-type = { path = "../kitsune-type" }
kitsune-url = { path = "../kitsune-url" }
@ -24,8 +23,6 @@ serde = "1.0.197"
simd-json = "0.13.9"
smol_str = "0.2.1"
speedy-uuid = { path = "../../lib/speedy-uuid" }
thiserror = "1.0.58"
tokio = { version = "1.37.0", features = ["rt"] }
tracing = "0.1.40"
typed-builder = "0.18.1"

View File

@ -1,22 +0,0 @@
use diesel_async::pooled_connection::bb8;
use std::fmt::Debug;
pub type Result<T, E = Error> = std::result::Result<T, E>;
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error(transparent)]
Cache(#[from] kitsune_cache::Error),
#[error(transparent)]
DatabasePool(#[from] bb8::RunError),
#[error(transparent)]
Diesel(#[from] diesel::result::Error),
#[error(transparent)]
Embed(#[from] kitsune_embed::Error),
#[error(transparent)]
Service(#[from] kitsune_service::error::Error),
}

View File

@ -2,13 +2,10 @@
extern crate tracing;
use self::sealed::{IntoMastodon, MapperState};
use crate::error::Result;
use derive_builder::Builder;
use futures_util::StreamExt;
use kitsune_cache::{ArcCache, CacheBackend};
use kitsune_core::event::{post::EventType, PostEventConsumer};
use kitsune_db::PgPool;
use kitsune_embed::Client as EmbedClient;
use kitsune_error::Result;
use kitsune_service::attachment::AttachmentService;
use kitsune_url::UrlService;
use serde::Deserialize;
@ -18,71 +15,12 @@ use typed_builder::TypedBuilder;
mod sealed;
pub mod error;
pub trait MapperMarker: IntoMastodon {}
impl<T> MapperMarker for T where T: IntoMastodon {}
#[derive(TypedBuilder)]
struct CacheInvalidationActor {
cache: ArcCache<Uuid, OwnedValue>,
event_consumer: PostEventConsumer,
}
impl CacheInvalidationActor {
async fn run(mut self) {
loop {
while let Some(event) = self.event_consumer.next().await {
let event = match event {
Ok(event) => event,
Err(err) => {
error!(error = %err, "Failed to receive status event");
continue;
}
};
if matches!(event.r#type, EventType::Delete | EventType::Update) {
if let Err(err) = self.cache.delete(&event.post_id).await {
error!(error = %err, "Failed to remove entry from cache");
}
}
}
if let Err(err) = self.event_consumer.reconnect().await {
error!(error = %err, "Failed to reconnect to event source");
}
}
}
pub fn spawn(self) {
tokio::spawn(self.run());
}
}
#[derive(Builder, Clone)]
#[builder(pattern = "owned")]
#[allow(clippy::used_underscore_binding)]
#[derive(Clone, TypedBuilder)]
pub struct MastodonMapper {
#[builder(
field(
ty = "Option<PostEventConsumer>",
build = "CacheInvalidationActor::builder()
.cache(
self.mastodon_cache
.clone()
.ok_or(MastodonMapperBuilderError::UninitializedField(\"mastodon_cache\"))?
)
.event_consumer(
self._cache_invalidator
.ok_or(MastodonMapperBuilderError::UninitializedField(\"cache_invalidator\"))?
)
.build()
.spawn();",
),
setter(name = "cache_invalidator", strip_option)
)]
_cache_invalidator: (),
attachment_service: AttachmentService,
db_pool: PgPool,
embed_client: Option<EmbedClient>,
@ -91,11 +29,6 @@ pub struct MastodonMapper {
}
impl MastodonMapper {
#[must_use]
pub fn builder() -> MastodonMapperBuilder {
MastodonMapperBuilder::default()
}
/// Return a reference to a mapper state
///
/// Passed down to the concrete mapping implementations

View File

@ -1,4 +1,3 @@
use crate::error::{Error, Result};
use diesel::{
BelongingToDsl, BoolExpressionMethods, ExpressionMethods, JoinOnDsl, NullableExpressionMethods,
OptionalExtension, QueryDsl, SelectableHelper,
@ -28,6 +27,7 @@ use kitsune_db::{
};
use kitsune_embed::Client as EmbedClient;
use kitsune_embed::{embed_sdk::EmbedType, Embed};
use kitsune_error::{Error, Result};
use kitsune_service::attachment::AttachmentService;
use kitsune_type::mastodon::{
account::Source,

View File

@ -1,22 +0,0 @@
[package]
name = "kitsune-messaging"
version.workspace = true
authors.workspace = true
edition.workspace = true
license.workspace = true
[dependencies]
ahash = "0.8.11"
derive_more = { version = "1.0.0-beta.6", features = ["from"] }
futures-util = "0.3.30"
just-retry = { path = "../../lib/just-retry" }
pin-project-lite = "0.2.14"
redis = { version = "0.25.2", features = ["connection-manager", "tokio-comp"] }
serde = "1.0.197"
simd-json = "0.13.9"
tokio = { version = "1.37.0", features = ["macros", "rt", "sync"] }
tokio-stream = { version = "0.1.15", features = ["sync"] }
tracing = "0.1.40"
[lints]
workspace = true

View File

@ -1,7 +0,0 @@
# kitsune-messaging
Messaging backend abstraction for Kitsune
## About
Some common infrastructure built on a trait that abstracts over message delivery backends such as Redis PubSub, RabbitMQ, etc.

View File

@ -1,48 +0,0 @@
use futures_util::StreamExt;
use kitsune_messaging::{redis::RedisMessagingBackend, MessagingHub};
use simd_json::{json, OwnedValue};
use std::time::Duration;
#[tokio::main(flavor = "current_thread")]
async fn main() {
let redis_backend = RedisMessagingBackend::new("redis://localhost")
.await
.unwrap();
let hub = MessagingHub::new(redis_backend);
let consumer = hub.consumer::<OwnedValue>("test".into()).await.unwrap();
tokio::spawn(consumer.for_each(|msg| async move { println!("Consumer 1: {msg:?}]") }));
let consumer = hub.consumer::<OwnedValue>("test".into()).await.unwrap();
tokio::spawn(consumer.for_each(|msg| async move { println!("Consumer 2: {msg:?}]") }));
let consumer = hub.consumer::<OwnedValue>("test2".into()).await.unwrap();
tokio::spawn(consumer.for_each(|msg| async move { println!("Consumer 3: {msg:?}]") }));
let consumer = hub.consumer::<OwnedValue>("test2".into()).await.unwrap();
tokio::spawn(consumer.for_each(|msg| async move { println!("Consumer 4: {msg:?}]") }));
for i in 1..=3 {
let emitter = hub.emitter("test".into());
emitter
.emit(json!({
"hello": "world",
"who": ["are", "you", "?"],
"message": i,
}))
.await
.unwrap();
let emitter = hub.emitter("test2".into());
emitter
.emit(json!({
"hello": "world",
"who": ["are", "you", "?"],
"message": i,
}))
.await
.unwrap();
}
tokio::time::sleep(Duration::from_secs(1)).await;
}

View File

@ -1,46 +0,0 @@
use futures_util::StreamExt;
use kitsune_messaging::{tokio_broadcast::TokioBroadcastMessagingBackend, MessagingHub};
use simd_json::{json, OwnedValue};
use std::time::Duration;
#[tokio::main(flavor = "current_thread")]
async fn main() {
let tokio_broadcast_backend = TokioBroadcastMessagingBackend::default();
let hub = MessagingHub::new(tokio_broadcast_backend);
let consumer = hub.consumer::<OwnedValue>("test".into()).await.unwrap();
tokio::spawn(consumer.for_each(|msg| async move { println!("Consumer 1: {msg:?}]") }));
let consumer = hub.consumer::<OwnedValue>("test".into()).await.unwrap();
tokio::spawn(consumer.for_each(|msg| async move { println!("Consumer 2: {msg:?}]") }));
let consumer = hub.consumer::<OwnedValue>("test2".into()).await.unwrap();
tokio::spawn(consumer.for_each(|msg| async move { println!("Consumer 3: {msg:?}]") }));
let consumer = hub.consumer::<OwnedValue>("test2".into()).await.unwrap();
tokio::spawn(consumer.for_each(|msg| async move { println!("Consumer 4: {msg:?}]") }));
for i in 1..=3 {
let emitter = hub.emitter("test".into());
emitter
.emit(json!({
"hello": "world",
"who": ["are", "you", "?"],
"message": i,
}))
.await
.unwrap();
let emitter = hub.emitter("test2".into());
emitter
.emit(json!({
"hello": "world",
"who": ["are", "you", "?"],
"message": i,
}))
.await
.unwrap();
}
tokio::time::sleep(Duration::from_secs(1)).await;
}

View File

@ -1,257 +0,0 @@
#![doc = include_str!("../README.md")]
#![forbid(missing_docs)]
#[macro_use]
extern crate tracing;
use derive_more::From;
use futures_util::{stream::BoxStream, Stream, StreamExt};
use pin_project_lite::pin_project;
use serde::{de::DeserializeOwned, Serialize};
use std::{
error::Error,
future::Future,
marker::PhantomData,
pin::Pin,
sync::Arc,
task::{self, ready, Poll},
};
/// Boxed error
pub type BoxError = Box<dyn Error + Send + Sync>;
/// Type alias for Result, defaulting to [`BoxError`] on the error branch
pub type Result<T, E = BoxError> = std::result::Result<T, E>;
pub mod redis;
pub mod tokio_broadcast;
/// Enum dispatch over all supported backends
#[derive(From)]
pub enum AnyMessagingBackend {
/// Redis backend
Redis(redis::RedisMessagingBackend),
/// Tokio broadcast backend
Tokio(tokio_broadcast::TokioBroadcastMessagingBackend),
}
impl MessagingBackend for AnyMessagingBackend {
async fn enqueue(&self, channel_name: &str, message: Vec<u8>) -> Result<()> {
match self {
Self::Redis(redis) => redis.enqueue(channel_name, message).await,
Self::Tokio(tokio) => tokio.enqueue(channel_name, message).await,
}
}
async fn message_stream(
&self,
channel_name: String,
) -> Result<impl Stream<Item = Result<Vec<u8>>> + 'static> {
match self {
Self::Redis(redis) => redis
.message_stream(channel_name)
.await
.map(StreamExt::left_stream),
Self::Tokio(tokio) => tokio
.message_stream(channel_name)
.await
.map(StreamExt::right_stream),
}
}
}
/// Messaging backend
///
/// This is the trait that lets the message hub create emitters and consumers.
/// The backend just needs to be able to transport bytes, that's all.
///
/// The trait is designed to be object-safe since it's internally stored inside an `Arc`
/// and supposed to be type-erased for ease of testing.
pub trait MessagingBackend {
/// Enqueue a new message onto the backend
fn enqueue(&self, channel_name: &str, message: Vec<u8>) -> impl Future<Output = Result<()>>;
/// Open a new stream of messages from the backend
fn message_stream(
&self,
channel_name: String,
) -> impl Future<Output = Result<impl Stream<Item = Result<Vec<u8>>> + 'static>>;
}
pin_project! {
/// Consumer of messages
pub struct MessageConsumer<M> {
backend: Arc<AnyMessagingBackend>,
channel_name: String,
#[pin]
inner: BoxStream<'static, Result<Vec<u8>>>,
_ty: PhantomData<M>,
}
}
impl<M> MessageConsumer<M>
where
M: DeserializeOwned + Serialize,
{
/// Duplicate the message consumer
///
/// This is essentially just creating another consumer.
/// Useful if you don't have access to the backend nor an emitter
///
/// # Errors
///
/// - Failed to create another consumer
///
/// For more details, check [`MessagingHub::consumer`]
pub async fn duplicate(&self) -> Result<Self> {
MessagingHub {
backend: self.backend.clone(),
}
.consumer(self.channel_name.clone())
.await
}
/// Create an emitter that emits messages to this consumer
#[must_use]
pub fn emitter(&self) -> MessageEmitter<M> {
MessagingHub {
backend: self.backend.clone(),
}
.emitter(self.channel_name.clone())
}
/// Reconnect the message consumer
///
/// Use this if the stream ever ends and you think it really shouldn't
///
/// # Errors
///
/// - Reconnection failed
pub async fn reconnect(&mut self) -> Result<()> {
self.inner = self
.backend
.message_stream(self.channel_name.clone())
.await?
.boxed();
Ok(())
}
}
impl<M> Stream for MessageConsumer<M>
where
M: DeserializeOwned,
{
type Item = Result<M>;
fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
match ready!(this.inner.poll_next(cx)) {
Some(Ok(mut msg)) => {
Poll::Ready(Some(simd_json::from_slice(&mut msg).map_err(Into::into)))
}
Some(Err(err)) => Poll::Ready(Some(Err(err))),
None => Poll::Ready(None),
}
}
}
/// Message emitter
///
/// This is cheaply clonable. Internally it is a string for the channel name and an `Arc` referencing the backend.
#[derive(Clone)]
pub struct MessageEmitter<M> {
backend: Arc<AnyMessagingBackend>,
channel_name: String,
_ty: PhantomData<M>,
}
impl<M> MessageEmitter<M>
where
M: DeserializeOwned + Serialize,
{
/// Create a new consumer from the emitter
///
/// # Errors
///
/// - Failed to create consumer
pub async fn consumer(&self) -> Result<MessageConsumer<M>> {
MessagingHub {
backend: self.backend.clone(),
}
.consumer(self.channel_name.clone())
.await
}
/// Emit a new message
///
/// # Errors
///
/// - Message failed to serialise
/// - Message failed to enqueue
pub async fn emit(&self, message: M) -> Result<()> {
let message = simd_json::to_vec(&message)?;
self.backend.enqueue(&self.channel_name, message).await
}
}
/// Central hub for messaging
///
/// Allows for the registration of new emitters and consumers
///
/// Using the same backend instance ensures that channels with the same name are connected.
/// When using two distinct backend instances it depends on the backend.
///
/// For example, the Redis backend, when connected to the same Redis server, will connect channels with the same name across two different instances.
pub struct MessagingHub {
backend: Arc<AnyMessagingBackend>,
}
impl MessagingHub {
/// Create a new messaging hub
pub fn new<B>(backend: B) -> Self
where
B: Into<AnyMessagingBackend>,
{
Self {
backend: Arc::new(backend.into()),
}
}
/// Create a new consumer of messages emitted to the channel
///
/// # Errors
///
/// - Consumer failed to be created
pub async fn consumer<M>(&self, channel_name: String) -> Result<MessageConsumer<M>>
where
M: DeserializeOwned + Serialize,
{
let message_stream = self
.backend
.message_stream(channel_name.clone())
.await?
.boxed();
Ok(MessageConsumer {
backend: self.backend.clone(),
channel_name,
inner: message_stream,
_ty: PhantomData,
})
}
/// Create a new emitter for a channel
#[must_use]
pub fn emitter<M>(&self, channel_name: String) -> MessageEmitter<M>
where
M: DeserializeOwned + Serialize,
{
MessageEmitter {
channel_name,
backend: self.backend.clone(),
_ty: PhantomData,
}
}
}

View File

@ -1,172 +0,0 @@
//!
//! Redis implementation
//!
use crate::{MessagingBackend, Result};
use ahash::AHashMap;
use futures_util::{future, Stream, StreamExt, TryStreamExt};
use just_retry::RetryExt;
use redis::{
aio::{ConnectionManager, PubSub},
AsyncCommands, RedisError,
};
use std::fmt::Debug;
use tokio::sync::{broadcast, mpsc, oneshot};
use tokio_stream::wrappers::BroadcastStream;
const BROADCAST_CAPACITY: usize = 10;
const REGISTRATION_QUEUE_SIZE: usize = 50;
macro_rules! handle_err {
($result:expr, $msg:literal $(,)?) => {{
if let Err(error) = { $result } {
error!(?error, $msg);
}
}};
($result:expr $(,)?) => {
handle_err!($result, "");
};
}
#[derive(Debug)]
struct RegistrationMessage {
channel_pattern: String,
responder: oneshot::Sender<broadcast::Receiver<Vec<u8>>>,
}
struct MultiplexActor {
client: redis::Client,
conn: PubSub,
mapping: AHashMap<String, broadcast::Sender<Vec<u8>>>,
registration_queue: mpsc::Receiver<RegistrationMessage>,
}
impl MultiplexActor {
async fn run(mut self) {
loop {
tokio::select! {
Some(msg) = self.registration_queue.recv() => {
let receiver = if let Some(sender) = self.mapping.get(&msg.channel_pattern) {
sender.subscribe()
} else {
let (sender, receiver) = broadcast::channel(BROADCAST_CAPACITY);
handle_err!(self.conn.psubscribe(
msg.channel_pattern.as_str()).await,
"Failed to subscribe to pattern",
);
self.mapping.insert(msg.channel_pattern, sender);
receiver
};
drop(msg.responder.send(receiver));
}
msg = future::poll_fn(|ctx| self.conn.on_message().poll_next_unpin(ctx)) => {
if let Some(msg) = msg {
let pattern: String = msg.get_pattern().unwrap();
if let Some(sender) = self.mapping.get(&pattern) {
if sender.send(msg.get_payload_bytes().to_vec()).is_err() {
// According to the tokio docs, this case only occurs when all receivers have been dropped
handle_err!(
self.conn.punsubscribe(pattern.as_str()).await,
"Failed to unsubscribe from pattern",
);
self.mapping.remove(&pattern);
}
} else {
debug!(%pattern, "Failed to find correct receiver");
}
} else {
self.conn = (|| {
let client = self.client.clone();
async move {
client
.get_async_pubsub()
.await
}
})
.retry(just_retry::backoff_policy())
.await
.unwrap();
for key in self.mapping.keys() {
handle_err!(
self.conn.psubscribe(key).await,
"Failed to subscribe to pattern",
);
}
}
}
}
}
}
pub async fn spawn(
client: redis::Client,
) -> Result<mpsc::Sender<RegistrationMessage>, RedisError> {
let (sender, receiver) = mpsc::channel(REGISTRATION_QUEUE_SIZE);
let actor = Self {
mapping: AHashMap::new(),
conn: client.get_async_pubsub().await?,
client,
registration_queue: receiver,
};
tokio::spawn(actor.run());
Ok(sender)
}
}
/// Implementation of the [`MessagingBackend`] trait for Redis PubSub
///
/// Note: Channel names, when passed to the `message_stream` function, are interpreted as channel patterns.
pub struct RedisMessagingBackend {
pub_connection: ConnectionManager,
sub_actor: mpsc::Sender<RegistrationMessage>,
}
impl RedisMessagingBackend {
/// Create a new Redis PubSub backend
///
/// # Errors
///
/// - Failed to connect to the Redis instance
pub async fn new(conn_string: &str) -> Result<Self, RedisError> {
let client = redis::Client::open(conn_string)?;
let sub_actor = MultiplexActor::spawn(client.clone()).await?;
let pub_connection = ConnectionManager::new(client).await?;
Ok(Self {
pub_connection,
sub_actor,
})
}
}
impl MessagingBackend for RedisMessagingBackend {
async fn enqueue(&self, channel_name: &str, message: Vec<u8>) -> Result<()> {
self.pub_connection
.clone()
.publish(channel_name, message)
.await
.map_err(Into::into)
}
async fn message_stream(
&self,
channel_name: String,
) -> Result<impl Stream<Item = Result<Vec<u8>>> + 'static> {
let (sender, receiver) = oneshot::channel();
self.sub_actor
.send(RegistrationMessage {
channel_pattern: channel_name,
responder: sender,
})
.await?;
let broadcast_receiver = receiver.await?;
Ok(BroadcastStream::new(broadcast_receiver).map_err(Into::into))
}
}

View File

@ -1,54 +0,0 @@
//!
//! Implementation over a Tokio broadcast channel
//!
use crate::{MessagingBackend, Result};
use futures_util::{Stream, TryStreamExt};
use std::{collections::HashMap, sync::RwLock};
use tokio::sync::broadcast;
use tokio_stream::wrappers::BroadcastStream;
const BROADCAST_CAPACITY: usize = 50;
/// Messaging backend implementation based on Tokio's broadcast channel
pub struct TokioBroadcastMessagingBackend {
registry: RwLock<HashMap<String, broadcast::Sender<Vec<u8>>>>,
}
impl Default for TokioBroadcastMessagingBackend {
fn default() -> Self {
Self {
registry: RwLock::new(HashMap::new()),
}
}
}
impl MessagingBackend for TokioBroadcastMessagingBackend {
async fn enqueue(&self, channel_name: &str, message: Vec<u8>) -> Result<()> {
let guard = self.registry.read().unwrap();
if let Some(sender) = guard.get(channel_name) {
sender.send(message)?;
}
Ok(())
}
async fn message_stream(
&self,
channel_name: String,
) -> Result<impl Stream<Item = Result<Vec<u8>>> + 'static> {
let guard = self.registry.read().unwrap();
let receiver = if let Some(sender) = guard.get(&channel_name) {
sender.subscribe()
} else {
drop(guard);
let mut guard = self.registry.write().unwrap();
let (sender, receiver) = broadcast::channel(BROADCAST_CAPACITY);
guard.insert(channel_name, sender);
receiver
};
Ok(BroadcastStream::new(receiver).map_err(Into::into))
}
}

View File

@ -10,6 +10,7 @@ enum_dispatch = "0.3.13"
http = "1.1.0"
http-compat = { path = "../../lib/http-compat" }
kitsune-config = { path = "../kitsune-config" }
kitsune-error = { path = "../kitsune-error" }
kitsune-http-client = { path = "../kitsune-http-client" }
moka = { version = "0.12.5", features = ["future"] }
multiplex-pool = { path = "../../lib/multiplex-pool" }
@ -19,14 +20,13 @@ openidconnect = { version = "3.5.0", default-features = false, features = [
"accept-rfc3339-timestamps",
"accept-string-booleans",
] }
redis = { version = "0.25.2", default-features = false, features = [
redis = { version = "0.25.3", default-features = false, features = [
"connection-manager",
"tokio-comp",
] }
serde = { version = "1.0.197", features = ["derive"] }
simd-json = "0.13.9"
speedy-uuid = { path = "../../lib/speedy-uuid", features = ["serde"] }
thiserror = "1.0.58"
url = "2.5.0"
[lints]

View File

@ -1,55 +0,0 @@
use openidconnect::{
core::CoreErrorResponseType, ClaimsVerificationError, DiscoveryError, RequestTokenError,
SigningError, StandardErrorResponse,
};
use thiserror::Error;
pub type Result<T, E = Error> = std::result::Result<T, E>;
#[derive(Debug, Error)]
pub enum Error {
#[error(transparent)]
ClaimsVerification(#[from] ClaimsVerificationError),
#[error(transparent)]
Discovery(#[from] DiscoveryError<kitsune_http_client::Error>),
#[error(transparent)]
JsonParse(#[from] simd_json::Error),
#[error("Missing Email address")]
MissingEmail,
#[error("Mismatching hash")]
MismatchingHash,
#[error("Missing ID token")]
MissingIdToken,
#[error("Missing login state")]
MissingLoginState,
#[error("Missing username")]
MissingUsername,
#[error(transparent)]
Redis(#[from] redis::RedisError),
#[error(transparent)]
RequestToken(
#[from]
RequestTokenError<
kitsune_http_client::Error,
StandardErrorResponse<CoreErrorResponseType>,
>,
),
#[error(transparent)]
Signing(#[from] SigningError),
#[error("Unknown CSRF token")]
UnknownCsrfToken,
#[error(transparent)]
UrlParse(#[from] url::ParseError),
}

View File

@ -1,11 +1,9 @@
use crate::{
error::Result,
state::{
store::{InMemory as InMemoryStore, Redis as RedisStore},
LoginState, OAuth2LoginState, Store,
},
use crate::state::{
store::{InMemory as InMemoryStore, Redis as RedisStore},
LoginState, OAuth2LoginState, Store,
};
use kitsune_config::oidc::{Configuration, StoreConfiguration};
use kitsune_error::{bail, kitsune_error, Result};
use multiplex_pool::RoundRobinStrategy;
use openidconnect::{
core::{CoreAuthenticationFlow, CoreClient, CoreProviderMetadata},
@ -15,9 +13,6 @@ use openidconnect::{
use speedy_uuid::Uuid;
use url::Url;
pub use self::error::Error;
mod error;
mod state;
pub mod http;
@ -135,7 +130,9 @@ impl OidcService {
.request_async(self::http::async_client)
.await?;
let id_token = token_response.id_token().ok_or(Error::MissingIdToken)?;
let id_token = token_response
.id_token()
.ok_or_else(|| kitsune_error!("missing id token"))?;
let claims = id_token.claims(&self.client.id_token_verifier(), &nonce)?;
if let Some(expected_hash) = claims.access_token_hash() {
@ -145,7 +142,7 @@ impl OidcService {
)?;
if actual_hash != *expected_hash {
return Err(Error::MismatchingHash);
bail!("hash mismatch");
}
}
@ -153,9 +150,12 @@ impl OidcService {
subject: claims.subject().to_string(),
username: claims
.preferred_username()
.ok_or(Error::MissingUsername)?
.ok_or_else(|| kitsune_error!("missing username"))?
.to_string(),
email: claims
.email()
.ok_or_else(|| kitsune_error!("missing email address"))?
.to_string(),
email: claims.email().ok_or(Error::MissingEmail)?.to_string(),
oauth2: OAuth2Info {
application_id: oauth2.application_id,
scope: oauth2.scope,

View File

@ -1,8 +1,6 @@
use super::Store;
use crate::{
error::{Error, Result},
state::LoginState,
};
use crate::state::LoginState;
use kitsune_error::{kitsune_error, ErrorType, Result};
use moka::future::Cache;
#[derive(Clone)]
@ -20,7 +18,10 @@ impl InMemory {
impl Store for InMemory {
async fn get_and_remove(&self, key: &str) -> Result<LoginState> {
self.inner.remove(key).await.ok_or(Error::MissingLoginState)
self.inner
.remove(key)
.await
.ok_or_else(|| kitsune_error!(type = ErrorType::BadRequest, "missing login state"))
}
async fn set(&self, key: &str, value: LoginState) -> Result<()> {

View File

@ -1,6 +1,6 @@
use super::LoginState;
use crate::error::Result;
use enum_dispatch::enum_dispatch;
use kitsune_error::Result;
pub use self::{in_memory::InMemory, redis::Redis};

View File

@ -1,5 +1,6 @@
use super::Store;
use crate::{error::Result, state::LoginState};
use crate::state::LoginState;
use kitsune_error::Result;
use redis::{aio::ConnectionManager, AsyncCommands};
const REDIS_PREFIX: &str = "OIDC-LOGIN-STATE";

View File

@ -9,6 +9,7 @@ license.workspace = true
bytes = "1.6.0"
futures-util = { version = "0.3.30", default-features = false }
http = "1.1.0"
kitsune-error = { path = "../kitsune-error" }
kitsune-http-client = { path = "../kitsune-http-client" }
quick-xml = { version = "0.31.0", features = ["serialize"] }
rusty-s3 = "0.5.0"

View File

@ -4,15 +4,13 @@ use http::{
header::{CONTENT_LENGTH, ETAG},
Request,
};
use kitsune_error::{bail, Error, Result};
use kitsune_http_client::{Body, Client as HttpClient, Response};
use rusty_s3::{actions::CreateMultipartUpload, Bucket, Credentials, S3Action};
use serde::Serialize;
use std::{ops::Deref, time::Duration};
use typed_builder::TypedBuilder;
type BoxError = Box<dyn std::error::Error + Send + Sync>;
type Result<T, E = BoxError> = std::result::Result<T, E>;
const TWO_MINUTES: Duration = Duration::from_secs(2 * 60);
#[derive(Serialize)]
@ -49,7 +47,7 @@ async fn execute_request(client: &HttpClient, req: Request<Body>) -> Result<Resp
err_msg.push_str("\nbody: ");
err_msg.push_str(&body);
return Err(Box::from(err_msg));
bail!(err_msg);
}
Ok(response)
@ -129,7 +127,7 @@ impl Client {
pub async fn put_object<S, E>(&self, path: &str, stream: S) -> Result<()>
where
S: Stream<Item = Result<Bytes, E>> + Send + Sync + 'static,
E: Into<BoxError>,
E: Into<Error>,
{
let create_multipart_upload = self
.bucket
@ -171,7 +169,7 @@ impl Client {
let response = execute_request(&self.http_client, request).await?;
let Some(etag_header) = response.headers().get(ETAG) else {
return Err(Box::from("missing etag header"));
bail!("missing etag header");
};
etags.push(etag_header.to_str()?.to_string());
@ -228,8 +226,9 @@ impl Client {
#[cfg(test)]
mod test {
use crate::{BoxError, CreateBucketConfiguration};
use crate::CreateBucketConfiguration;
use futures_util::{future, stream, TryStreamExt};
use kitsune_error::{kitsune_error, Error};
use kitsune_test::minio_test;
const TEST_DATA: &[u8] = b"https://open.spotify.com/track/6VNNakpjSH8LNBX7fSGhUv";
@ -255,7 +254,7 @@ mod test {
client
.put_object(
"good song",
stream::once(future::ok::<_, BoxError>(TEST_DATA.into())),
stream::once(future::ok::<_, Error>(TEST_DATA.into())),
)
.await
.unwrap();
@ -287,7 +286,7 @@ mod test {
let result = client
.put_object(
"this will break horribly",
stream::once(future::err(BoxError::from("hehe"))),
stream::once(future::err(kitsune_error!("hehe"))),
)
.await;

View File

@ -16,11 +16,11 @@ enum_dispatch = "0.3.13"
futures-util = "0.3.30"
kitsune-config = { path = "../kitsune-config" }
kitsune-db = { path = "../kitsune-db" }
kitsune-error = { path = "../kitsune-error" }
kitsune-language = { path = "../kitsune-language" }
serde = { version = "1.0.197", features = ["derive"] }
speedy-uuid = { path = "../../lib/speedy-uuid" }
strum = { version = "0.26.2", features = ["derive"] }
thiserror = "1.0.58"
tracing = "0.1.40"
typed-builder = "0.18.1"

View File

@ -1,16 +0,0 @@
use diesel_async::pooled_connection::bb8;
use std::fmt::Debug;
use thiserror::Error;
#[derive(Debug, Error)]
pub enum Error {
#[error(transparent)]
Database(#[from] diesel::result::Error),
#[error(transparent)]
DatabasePool(#[from] bb8::RunError),
#[cfg(feature = "meilisearch")]
#[error(transparent)]
Meilisearch(#[from] meilisearch_sdk::errors::Error),
}

View File

@ -3,22 +3,19 @@ extern crate tracing;
use enum_dispatch::enum_dispatch;
use kitsune_db::model::{account::Account as DbAccount, post::Post as DbPost};
use kitsune_error::Result;
use serde::{Deserialize, Serialize};
use speedy_uuid::Uuid;
use strum::{AsRefStr, EnumIter};
mod error;
#[cfg(feature = "meilisearch")]
mod meilisearch;
mod sql;
pub use self::error::Error;
#[cfg(feature = "meilisearch")]
pub use self::meilisearch::MeiliSearchService;
pub use self::sql::SearchService as SqlSearchService;
type Result<T, E = Error> = std::result::Result<T, E>;
#[derive(Clone)]
#[enum_dispatch(SearchBackend)]
pub enum AnySearchBackend {

View File

@ -34,10 +34,10 @@ kitsune-core = { path = "../kitsune-core" }
kitsune-db = { path = "../kitsune-db" }
kitsune-email = { path = "../kitsune-email" }
kitsune-embed = { path = "../kitsune-embed" }
kitsune-error = { path = "../kitsune-error" }
kitsune-http-client = { path = "../kitsune-http-client" }
kitsune-jobs = { path = "../kitsune-jobs" }
kitsune-language = { path = "../kitsune-language" }
kitsune-messaging = { path = "../kitsune-messaging" }
kitsune-search = { path = "../kitsune-search" }
kitsune-storage = { path = "../kitsune-storage" }
kitsune-url = { path = "../kitsune-url" }
@ -48,17 +48,15 @@ password-hash = { version = "0.5.0", features = ["std"] }
pkcs8 = "0.10.2"
post-process = { path = "../../lib/post-process" }
rand = "0.8.5"
redis = { version = "0.25.2", default-features = false, features = [
redis = { version = "0.25.3", default-features = false, features = [
"connection-manager",
"tokio-comp",
] }
rsa = "0.9.6"
rusty-s3 = { version = "0.5.0", default-features = false }
serde = "1.0.197"
simd-json = "0.13.9"
smol_str = "0.2.1"
speedy-uuid = { path = "../../lib/speedy-uuid" }
thiserror = "1.0.58"
tokio = { version = "1.37.0", features = ["macros", "sync"] }
tracing = "0.1.40"
typed-builder = "0.18.1"

View File

@ -3,7 +3,6 @@ use super::{
job::{Enqueue, JobService},
LimitContext,
};
use crate::error::{Error, Result};
use bytes::Bytes;
use derive_builder::Builder;
use diesel::{
@ -28,6 +27,7 @@ use kitsune_db::{
schema::{accounts, accounts_follows, accounts_preferences, notifications, posts},
with_connection, PgPool,
};
use kitsune_error::{Error, Result};
use kitsune_jobs::deliver::{
accept::DeliverAccept,
follow::DeliverFollow,
@ -299,8 +299,7 @@ impl AccountService {
let Some(webfinger_actor) = self
.resolver
.resolve_account(get_user.username, domain)
.await
.map_err(Error::Resolver)?
.await?
else {
return Ok(None);
};
@ -310,10 +309,7 @@ impl AccountService {
.url(&webfinger_actor.uri)
.build();
self.fetcher
.fetch_account(opts)
.await
.map_err(Error::Fetcher)
self.fetcher.fetch_account(opts).await
} else {
with_connection!(self.db_pool, |db_conn| {
accounts::table
@ -577,8 +573,8 @@ impl AccountService {
pub async fn update<A, H>(&self, mut update: Update<A, H>) -> Result<Account>
where
A: Stream<Item = kitsune_storage::Result<Bytes>> + Send + Sync + 'static,
H: Stream<Item = kitsune_storage::Result<Bytes>> + Send + Sync + 'static,
A: Stream<Item = Result<Bytes>> + Send + Sync + 'static,
H: Stream<Item = Result<Bytes>> + Send + Sync + 'static,
{
update.validate(&())?;

View File

@ -1,4 +1,3 @@
use crate::error::{AttachmentError, Error, Result};
use bytes::{Bytes, BytesMut};
use derive_builder::Builder;
use diesel::{BoolExpressionMethods, ExpressionMethods, QueryDsl};
@ -12,8 +11,9 @@ use kitsune_db::{
schema::media_attachments,
with_connection, PgPool,
};
use kitsune_error::{kitsune_error, Error, ErrorType, Result};
use kitsune_http_client::Client;
use kitsune_storage::{AnyStorageBackend, BoxError, StorageBackend};
use kitsune_storage::{AnyStorageBackend, StorageBackend};
use kitsune_url::UrlService;
use speedy_uuid::Uuid;
use typed_builder::TypedBuilder;
@ -125,13 +125,9 @@ impl AttachmentService {
) -> Result<impl Stream<Item = Result<Bytes>> + 'static> {
// TODO: Find way to avoid boxing the streams here
if let Some(ref file_path) = media_attachment.file_path {
let stream = self
.storage_backend
.get(file_path.as_str())
.await
.map_err(Error::Storage)?;
let stream = self.storage_backend.get(file_path.as_str()).await?;
Ok(stream.map_err(Error::Storage).boxed())
Ok(stream.map_err(Error::from).boxed())
} else if self.media_proxy_enabled {
Ok(self
.client
@ -141,7 +137,7 @@ impl AttachmentService {
.map_err(Into::into)
.boxed())
} else {
Err(AttachmentError::NotFound.into())
Err(kitsune_error!(type = ErrorType::NotFound, "attachment not found"))
}
}
@ -172,7 +168,7 @@ impl AttachmentService {
pub async fn upload<S>(&self, upload: Upload<S>) -> Result<MediaAttachment>
where
S: Stream<Item = Result<Bytes, BoxError>> + Send + Sync + 'static,
S: Stream<Item = Result<Bytes>> + Send + Sync + 'static,
{
upload.validate(&())?;
@ -182,32 +178,26 @@ impl AttachmentService {
pin_mut!(stream);
let mut img_bytes = BytesMut::new();
while let Some(chunk) = stream
.next()
.await
.transpose()
.map_err(AttachmentError::StreamError)?
{
while let Some(chunk) = stream.next().await.transpose()? {
img_bytes.extend_from_slice(&chunk);
}
let img_bytes = img_bytes.freeze();
let final_bytes = DynImage::from_bytes(img_bytes)
.map_err(AttachmentError::ImageProcessingError)?
let final_bytes = DynImage::from_bytes(img_bytes)?
.ok_or(img_parts::Error::WrongSignature)
.map(|mut image| {
image.set_exif(None);
image.encoder().bytes()
})
.map_err(AttachmentError::ImageProcessingError)?;
})?;
self.storage_backend
.put(&upload.path, stream::once(async { Ok(final_bytes) }))
.await
.await?;
} else {
self.storage_backend.put(&upload.path, upload.stream).await
self.storage_backend
.put(&upload.path, upload.stream)
.await?;
}
.map_err(Error::Storage)?;
let media_attachment = with_connection!(self.db_pool, |db_conn| {
diesel::insert_into(media_attachments::table)

View File

@ -1,5 +1,5 @@
use crate::error::Result;
use kitsune_captcha::{AnyCaptcha, CaptchaBackend, ChallengeStatus};
use kitsune_error::Result;
use typed_builder::TypedBuilder;
#[derive(Clone, TypedBuilder)]

View File

@ -1,5 +1,4 @@
use super::attachment::{AttachmentService, Upload};
use crate::error::{BoxError, Error, Result};
use bytes::Bytes;
use diesel::{
BoolExpressionMethods, ExpressionMethods, JoinOnDsl, NullableExpressionMethods,
@ -15,6 +14,7 @@ use kitsune_db::{
schema::{custom_emojis, media_attachments, posts, posts_custom_emojis},
with_connection, PgPool,
};
use kitsune_error::{Error, Result};
use kitsune_url::UrlService;
use speedy_uuid::Uuid;
use typed_builder::TypedBuilder;
@ -134,7 +134,7 @@ impl CustomEmojiService {
pub async fn add_emoji<S>(&self, emoji_upload: EmojiUpload<S>) -> Result<CustomEmoji>
where
S: Stream<Item = Result<Bytes, BoxError>> + Send + Sync + 'static,
S: Stream<Item = Result<Bytes>> + Send + Sync + 'static,
{
emoji_upload.validate(&())?;

View File

@ -1,129 +0,0 @@
use diesel_async::pooled_connection::bb8;
use std::{error::Error as StdError, fmt::Debug};
use thiserror::Error;
pub type BoxError = Box<dyn StdError + Send + Sync>;
pub type Result<T, E = Error> = std::result::Result<T, E>;
#[derive(Debug, Error)]
pub enum AttachmentError {
#[error(transparent)]
ImageProcessingError(#[from] img_parts::Error),
#[error("Not found")]
NotFound,
#[error(transparent)]
StreamError(#[from] BoxError),
}
#[derive(Debug, Error)]
pub enum PostError {
#[error("Bad request")]
BadRequest,
#[error("Unauthorised")]
Unauthorised,
}
#[derive(Debug, Error)]
pub enum UserError {
#[error("Invalid captcha")]
InvalidCaptcha,
#[error("Registrations closed")]
RegistrationsClosed,
}
#[derive(Debug, Error)]
pub enum Error {
#[error(transparent)]
Attachment(#[from] AttachmentError),
#[error(transparent)]
Blocking(#[from] blowocking::Error),
#[error(transparent)]
Cache(#[from] kitsune_cache::Error),
#[error(transparent)]
Captcha(#[from] kitsune_captcha::Error),
#[error(transparent)]
DatabasePool(#[from] bb8::RunError),
#[error(transparent)]
Der(#[from] pkcs8::der::Error),
#[error(transparent)]
Diesel(#[from] diesel::result::Error),
#[error(transparent)]
Email(#[from] kitsune_email::error::Error),
#[error(transparent)]
Embed(#[from] kitsune_embed::Error),
#[error(transparent)]
Event(kitsune_messaging::BoxError),
#[error(transparent)]
Fetcher(eyre::Report),
#[error(transparent)]
Http(#[from] http::Error),
#[error(transparent)]
HttpClient(#[from] kitsune_http_client::Error),
#[error(transparent)]
HttpHeaderToStr(#[from] http::header::ToStrError),
#[error(transparent)]
JobQueue(#[from] athena::Error),
#[error(transparent)]
Mime(#[from] mime::FromStrError),
#[error(transparent)]
PasswordHash(#[from] password_hash::Error),
#[error(transparent)]
Pkcs8(#[from] pkcs8::Error),
#[error(transparent)]
Post(#[from] PostError),
#[error(transparent)]
PostProcessing(post_process::BoxError),
#[error(transparent)]
Resolver(eyre::Report),
#[error(transparent)]
Rsa(#[from] rsa::Error),
#[error(transparent)]
Search(#[from] kitsune_search::Error),
#[error(transparent)]
SimdJson(#[from] simd_json::Error),
#[error(transparent)]
Spki(#[from] pkcs8::spki::Error),
#[error(transparent)]
Storage(kitsune_storage::BoxError),
#[error(transparent)]
UriInvalid(#[from] http::uri::InvalidUri),
#[error(transparent)]
UrlParse(#[from] url::ParseError),
#[error(transparent)]
User(#[from] UserError),
#[error(transparent)]
Validate(#[from] garde::Report),
}

View File

@ -1,10 +1,10 @@
use crate::error::{Error, Result};
use diesel::{ExpressionMethods, QueryDsl};
use diesel_async::RunQueryDsl;
use kitsune_db::{
schema::{accounts, posts, users},
with_connection, PgPool,
};
use kitsune_error::{Error, Result};
use smol_str::SmolStr;
use typed_builder::TypedBuilder;

View File

@ -1,6 +1,6 @@
use crate::error::Result;
use athena::{JobDetails, JobQueue};
use iso8601_timestamp::Timestamp;
use kitsune_error::Result;
use kitsune_jobs::{Job, KitsuneContextRepo};
use typed_builder::TypedBuilder;

View File

@ -7,7 +7,6 @@ pub mod account;
pub mod attachment;
pub mod captcha;
pub mod custom_emoji;
pub mod error;
pub mod instance;
pub mod job;
pub mod notification;

View File

@ -1,5 +1,4 @@
use super::LimitContext;
use crate::error::{Error, Result};
use diesel::{
BoolExpressionMethods, ExpressionMethods, JoinOnDsl, OptionalExtension, QueryDsl,
SelectableHelper,
@ -12,6 +11,7 @@ use kitsune_db::{
schema::{accounts, accounts_follows, accounts_preferences, notifications, posts},
with_connection, PgPool,
};
use kitsune_error::{Error, Result};
use speedy_uuid::Uuid;
use typed_builder::TypedBuilder;

View File

@ -4,7 +4,6 @@ use super::{
notification::NotificationService,
LimitContext,
};
use crate::error::{Error, PostError, Result};
use async_stream::try_stream;
use diesel::{
BelongingToDsl, BoolExpressionMethods, ExpressionMethods, JoinOnDsl, OptionalExtension,
@ -15,7 +14,6 @@ use futures_util::{stream::BoxStream, Stream, StreamExt, TryStreamExt};
use garde::Validate;
use iso8601_timestamp::Timestamp;
use kitsune_config::language_detection::Configuration as LanguageDetectionConfig;
use kitsune_core::event::{post::EventType, PostEvent, PostEventEmitter};
use kitsune_db::{
model::{
account::Account,
@ -36,6 +34,7 @@ use kitsune_db::{
with_connection, with_transaction, PgPool,
};
use kitsune_embed::Client as EmbedClient;
use kitsune_error::{bail, Error, ErrorType, Result};
use kitsune_jobs::deliver::{
create::DeliverCreate,
delete::DeliverDelete,
@ -304,7 +303,6 @@ pub struct PostService {
language_detection_config: LanguageDetectionConfig,
post_resolver: PostResolver,
search_backend: kitsune_search::AnySearchBackend,
status_event_emitter: PostEventEmitter,
url_service: UrlService,
}
@ -325,7 +323,7 @@ impl PostService {
.await?
!= media_attachment_ids.len() as i64
{
return Err(PostError::BadRequest.into());
bail!(type = ErrorType::BadRequest, "tried to attach unknown attachment ids");
}
diesel::insert_into(posts_media_attachments::table)
@ -534,14 +532,6 @@ impl PostService {
.await?;
}
self.status_event_emitter
.emit(PostEvent {
r#type: EventType::Create,
post_id: post.id,
})
.await
.map_err(Error::Event)?;
Ok(post)
}
@ -567,14 +557,6 @@ impl PostService {
)
.await?;
self.status_event_emitter
.emit(PostEvent {
r#type: EventType::Delete,
post_id: post.id,
})
.await
.map_err(Error::Event)?;
self.search_backend.remove_from_index(&post.into()).await?;
Ok(())
@ -682,14 +664,6 @@ impl PostService {
.await?;
}
self.status_event_emitter
.emit(PostEvent {
r#type: EventType::Update,
post_id: post.id,
})
.await
.map_err(Error::Event)?;
Ok(post)
}
@ -774,14 +748,6 @@ impl PostService {
)
.await?;
self.status_event_emitter
.emit(PostEvent {
r#type: EventType::Create,
post_id: repost.id,
})
.await
.map_err(Error::Event)?;
Ok(repost)
}
@ -816,14 +782,6 @@ impl PostService {
)
.await?;
self.status_event_emitter
.emit(PostEvent {
r#type: EventType::Delete,
post_id: post.id,
})
.await
.map_err(Error::Event)?;
Ok(post)
}
@ -1175,10 +1133,10 @@ impl PostService {
})?;
if admin_role_count == 0 {
return Err(PostError::Unauthorised.into());
bail!(type = ErrorType::Unauthorized, "unauthorised (not an admin)");
}
} else {
return Err(PostError::Unauthorised.into());
bail!(type = ErrorType::Unauthorized, "unauthorised (not logged in)");
}
}

View File

@ -1,8 +1,8 @@
use crate::{
account::{AccountService, GetUser},
custom_emoji::{CustomEmojiService, GetEmoji},
error::{Error, Result},
};
use kitsune_error::{Error, Result};
use post_process::{BoxError, Element, Html, Render};
use speedy_uuid::Uuid;
use std::{borrow::Cow, sync::mpsc};
@ -94,7 +94,7 @@ impl PostResolver {
)
})
.await
.map_err(Error::PostProcessing)?;
.map_err(Error::msg)?;
Ok(ResolvedPost {
mentioned_accounts: mentioned_account_ids.try_iter().collect(),

View File

@ -1,16 +1,12 @@
use eyre::WrapErr;
use kitsune_cache::{ArcCache, InMemoryCache, NoopCache, RedisCache};
use kitsune_captcha::AnyCaptcha;
use kitsune_captcha::{hcaptcha::Captcha as HCaptcha, mcaptcha::Captcha as MCaptcha};
use kitsune_config::{cache, captcha, email, language_detection, messaging, search, storage};
use kitsune_config::{cache, captcha, email, language_detection, search, storage};
use kitsune_db::PgPool;
use kitsune_email::{
lettre::{message::Mailbox, AsyncSmtpTransport, Tokio1Executor},
MailSender,
};
use kitsune_messaging::{
redis::RedisMessagingBackend, tokio_broadcast::TokioBroadcastMessagingBackend, MessagingHub,
};
use kitsune_search::{AnySearchBackend, NoopSearchService, SqlSearchService};
use kitsune_storage::{fs::Storage as FsStorage, s3::Storage as S3Storage, AnyStorageBackend};
use multiplex_pool::RoundRobinStrategy;
@ -127,23 +123,6 @@ pub fn mail_sender(
.build())
}
pub async fn messaging(config: &messaging::Configuration) -> eyre::Result<MessagingHub> {
let backend = match config {
messaging::Configuration::InProcess => {
MessagingHub::new(TokioBroadcastMessagingBackend::default())
}
messaging::Configuration::Redis(ref redis_config) => {
let redis_messaging_backend = RedisMessagingBackend::new(&redis_config.url)
.await
.wrap_err("Failed to initialise Redis messaging backend")?;
MessagingHub::new(redis_messaging_backend)
}
};
Ok(backend)
}
#[allow(clippy::unused_async)] // "async" is only unused when none of the more advanced searches are compiled in
pub async fn search(
search_config: &search::Configuration,
@ -157,10 +136,15 @@ pub async fn search(
#[cfg(feature = "meilisearch")]
#[allow(clippy::used_underscore_binding)]
kitsune_search::MeiliSearchService::new(&_config.instance_url, &_config.api_key)
.await
.wrap_err("Failed to connect to Meilisearch")?
.into()
{
use eyre::WrapErr;
kitsune_search::MeiliSearchService::new(&_config.instance_url, &_config.api_key)
.await
.map_err(kitsune_error::Error::into_error)
.wrap_err("Failed to connect to Meilisearch")?
.into()
}
}
search::Configuration::Sql => SqlSearchService::builder()
.db_pool(db_pool.clone())

Some files were not shown because too many files have changed in this diff Show More