diff --git a/Cargo.lock b/Cargo.lock index 9ee1b9d..b040651 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -94,6 +94,12 @@ version = "1.0.56" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4361135be9122e0870de935d7c439aef945b9f9ddd4199a553b5270b49c82a27" +[[package]] +name = "arc-swap" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c5d78ce20460b82d3fa150275ed9d55e21064fc7951177baacf86a145c4a4b1f" + [[package]] name = "async-trait" version = "0.1.53" @@ -128,6 +134,17 @@ version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "904dfeac50f3cdaba28fc6f57fdcddb75f49ed61346676a78c4ffe55877802fd" +[[package]] +name = "bigdecimal" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d1e50562e37200edf7c6c43e54a08e64a5553bfb59d9c297d5572512aa517256" +dependencies = [ + "num-bigint", + "num-integer", + "num-traits", +] + [[package]] name = "bit_field" version = "0.10.1" @@ -292,7 +309,7 @@ version = "3.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "da95d038ede1a964ce99f49cbe27a7fb538d1da595e4b4f70b8c8f338d17bf16" dependencies = [ - "heck", + "heck 0.4.0", "proc-macro-error", "proc-macro2", "quote", @@ -522,6 +539,16 @@ dependencies = [ "syn", ] +[[package]] +name = "dashmap" +version = "4.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e77a43b28d0668df09411cb0bc9a8c2adc40f9a048afe863e05fd43251e8e39c" +dependencies = [ + "cfg-if 1.0.0", + "num_cpus", +] + [[package]] name = "deflate" version = "1.0.0" @@ -917,6 +944,15 @@ dependencies = [ "http", ] +[[package]] +name = "heck" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d621efb26863f0e9924c6ac577e8275e5e6b77455db64ffa6c65c904e9e132c" +dependencies = [ + "unicode-segmentation", +] + [[package]] name = "heck" version = "0.4.0" @@ -938,6 +974,12 @@ version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" +[[package]] +name = "histogram" +version = "0.6.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "12cb882ccb290b8646e554b157ab0b71e64e8d5bef775cd66b6531e52d302669" + [[package]] name = "hkdf" version = "0.12.3" @@ -1102,6 +1144,15 @@ dependencies = [ "cfg-if 1.0.0", ] +[[package]] +name = "itertools" +version = "0.10.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9a9d19fa1e79b6215ff29b9d6880b706147f16e9b1dbb1e4e5947b5b02bc5e3" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "1.0.1" @@ -1225,10 +1276,11 @@ dependencies = [ "rayon", "rusoto_core", "rusoto_s3", + "scylla", "serde", "serde_json", "serde_yaml", - "strum", + "strum 0.24.0", "tokio", "tracing", "tracing-futures", @@ -1237,6 +1289,15 @@ dependencies = [ "webp", ] +[[package]] +name = "lz4_flex" +version = "0.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42c51df9d8d4842336c835df1d85ed447c4813baa237d033d95128bf5552ad8a" +dependencies = [ + "twox-hash", +] + [[package]] name = "mach" version = "0.3.2" @@ -1427,6 +1488,17 @@ dependencies = [ "winapi", ] +[[package]] +name = "num-bigint" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f6f7833f2cbf2360a6cfd58cd41a53aa7a90bd4c202f5b1c7dd2ed73c57b2c3" +dependencies = [ + "autocfg", + "num-integer", + "num-traits", +] + [[package]] name = "num-integer" version = "0.1.44" @@ -1478,6 +1550,27 @@ dependencies = [ "libc", ] +[[package]] +name = "num_enum" +version = "0.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cf5395665662ef45796a4ff5486c5d41d29e0c09640af4c5f17fd94ee2c119c9" +dependencies = [ + "num_enum_derive", +] + +[[package]] +name = "num_enum_derive" +version = "0.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b0498641e53dd6ac1a4f22547548caa6864cc4933784319cd1775271c5a46ce" +dependencies = [ + "proc-macro-crate", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "num_threads" version = "0.1.5" @@ -2105,6 +2198,45 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" +[[package]] +name = "scylla" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c8e606c6aa5d7560cbb1b65e271277b8f11ba7c3a8cc9dc57764313439afd13d" +dependencies = [ + "arc-swap", + "bigdecimal", + "byteorder", + "bytes 1.1.0", + "chrono", + "dashmap", + "futures", + "histogram", + "itertools", + "lz4_flex", + "num-bigint", + "num_enum", + "rand", + "scylla-macros", + "snap", + "strum 0.23.0", + "strum_macros 0.23.1", + "thiserror", + "tokio", + "tracing", + "uuid", +] + +[[package]] +name = "scylla-macros" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "13dc0caffb1274feb3df615e3260cb71a5a7a5d579adc49ba5544c87950a701c" +dependencies = [ + "quote", + "syn", +] + [[package]] name = "security-framework" version = "2.6.1" @@ -2278,6 +2410,12 @@ version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f2dd574626839106c320a323308629dcb1acfc96e32a8cba364ddc61ac23ee83" +[[package]] +name = "snap" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "45456094d1983e2ee2a18fdfebce3189fa451699d0502cb8e3b49dba5ba41451" + [[package]] name = "socket2" version = "0.4.4" @@ -2315,19 +2453,44 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3" +[[package]] +name = "static_assertions" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" + [[package]] name = "strsim" version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" +[[package]] +name = "strum" +version = "0.23.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cae14b91c7d11c9a851d3fbc80a963198998c2a64eec840477fa92d8ce9b70bb" + [[package]] name = "strum" version = "0.24.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e96acfc1b70604b8b2f1ffa4c57e59176c7dbb05d556c71ecd2f5498a1dee7f8" dependencies = [ - "strum_macros", + "strum_macros 0.24.0", +] + +[[package]] +name = "strum_macros" +version = "0.23.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5bb0dc7ee9c15cea6199cde9a127fa16a4c5819af85395457ad72d68edc85a38" +dependencies = [ + "heck 0.3.3", + "proc-macro2", + "quote", + "rustversion", + "syn", ] [[package]] @@ -2336,7 +2499,7 @@ version = "0.24.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6878079b17446e4d3eba6192bb0a2950d5b14f0ed8424b852310e5a94345d0ef" dependencies = [ - "heck", + "heck 0.4.0", "proc-macro2", "quote", "rustversion", @@ -2668,6 +2831,16 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642" +[[package]] +name = "twox-hash" +version = "1.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ee73e6e4924fe940354b8d4d98cad5231175d615cd855b758adc658c0aac6a0" +dependencies = [ + "cfg-if 1.0.0", + "static_assertions", +] + [[package]] name = "typenum" version = "1.15.0" @@ -2698,6 +2871,12 @@ dependencies = [ "tinyvec", ] +[[package]] +name = "unicode-segmentation" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e8820f5d777f6224dc4be3632222971ac30164d4a258d595640799554ebfd99" + [[package]] name = "unicode-xid" version = "0.2.2" diff --git a/Cargo.toml b/Cargo.toml index a888b97..7d992d4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,6 +27,7 @@ strum = { version = "0.24", features = ["derive"] } # Blob storage deps rusoto_core = "0.47.0" rusoto_s3 = "0.47.0" +scylla = "0.4.3" moka = "0.8.0" rayon = "1.5.1" diff --git a/src/storage/backends/mod.rs b/src/storage/backends/mod.rs index cd2d244..eabb17d 100644 --- a/src/storage/backends/mod.rs +++ b/src/storage/backends/mod.rs @@ -1,5 +1,6 @@ mod register; mod filesystem; mod blob_storage; +mod scylladb; pub use register::BackendConfigs; \ No newline at end of file diff --git a/src/storage/backends/register.rs b/src/storage/backends/register.rs index d0ba9ae..24cfea0 100644 --- a/src/storage/backends/register.rs +++ b/src/storage/backends/register.rs @@ -7,9 +7,13 @@ use crate::StorageBackend; #[derive(Debug, Deserialize)] #[serde(rename_all = "lowercase")] pub enum BackendConfigs { - // Scylla { - // nodes: Vec, - // }, + Scylla { + nodes: Vec, + username: Option, + password: Option, + keyspace: String, + table: Option, + }, FileSystem { /// The base output directory to store files. directory: PathBuf, @@ -51,6 +55,23 @@ impl BackendConfigs { Ok(Arc::new(backend)) }, + Self::Scylla { + nodes, + username, + password, + keyspace, + table, + } => { + let backend = super::scylladb::ScyllaBackend::connect( + keyspace.clone(), + table.clone(), + nodes, + username.clone(), + password.clone(), + ).await?; + + Ok(Arc::new(backend)) + } } } } diff --git a/src/storage/backends/scylladb.rs b/src/storage/backends/scylladb.rs new file mode 100644 index 0000000..fb70c63 --- /dev/null +++ b/src/storage/backends/scylladb.rs @@ -0,0 +1,160 @@ +use anyhow::anyhow; +use bytes::Bytes; +use uuid::Uuid; +use async_trait::async_trait; +use scylla::IntoTypedRows; +use crate::config::ImageKind; +use crate::controller::get_bucket_by_id; +use crate::StorageBackend; + + +pub struct ScyllaBackend { + table: String, + connection: session::Session, +} + +impl ScyllaBackend { + pub async fn connect( + keyspace: String, + table: Option, + known_nodes: &[String], + user: Option, + password: Option, + ) -> anyhow::Result { + let mut cfg = scylla::SessionConfig::new(); + cfg.add_known_nodes(known_nodes); + cfg.auth_password = user; + cfg.auth_password = password; + + let base = scylla::Session::connect(cfg).await?; + base.use_keyspace(keyspace, false).await?; + + let connection = session::Session::from(base); + + let table = table.unwrap_or_else(|| "lust_image".to_string()); + let qry = format!("CREATE TABLE IF NOT EXISTS {} (bucket_id bigint, sizing_id bigint, image_id uuid, kind text, data blob)", table); + connection.query(&qry, &[]).await?; + + Ok(Self { + table, + connection + }) + } +} + +#[async_trait] +impl StorageBackend for ScyllaBackend { + async fn store(&self, bucket_id: u32, image_id: Uuid, kind: ImageKind, sizing_id: u32, data: Bytes) -> anyhow::Result<()> { + let qry = format!("INSERT INTO {table} (bucket_id, sizing_id, image_id, kind, data) VALUES (?, ?, ?, ?, ?);", table = self.table); + + self.connection + .query_prepared(&qry, (bucket_id as i64, image_id, kind.as_file_extension(), sizing_id as i64, data.to_vec())) + .await?; + + Ok(()) + } + + async fn fetch(&self, bucket_id: u32, image_id: Uuid, kind: ImageKind, sizing_id: u32) -> anyhow::Result> { + let qry = format!("SELECT data FROM {table} WHERE bucket_id = ? AND image_id = ? AND kind = ? AND sizing_id = ?;", table = self.table); + + let buff = self.connection + .query_prepared(&qry, (bucket_id as i64, image_id, kind.as_file_extension(), sizing_id as i64)) + .await? + .rows + .unwrap_or_default() + .into_typed::<(Vec,)>() + .next() + .transpose()? + .map(|v| Bytes::from(v.0)); + + Ok(buff) + } + + async fn delete(&self, bucket_id: u32, image_id: Uuid) -> anyhow::Result> { + let qry = format!("DELETE FROM {table} WHERE bucket_id = ? AND image_id = ? AND kind = ? AND sizing_id = ?;", table = self.table); + + let bucket = get_bucket_by_id(bucket_id) + .ok_or_else(|| anyhow!("Bucket does not exist."))? + .cfg(); + + let mut hit_entries = vec![]; + for sizing_id in bucket.sizing_preset_ids().iter().copied() { + for kind in ImageKind::variants() { + let values = (bucket_id as i64, image_id, kind.as_file_extension(), sizing_id as i64); + debug!("Purging image @ {:?}", &values); + + self.connection + .query_prepared(&qry, values) + .await?; + + hit_entries.push((sizing_id, *kind)) + } + } + + Ok(hit_entries) + } +} + +mod session { + use std::fmt::Debug; + use scylla::frame::value::ValueList; + use scylla::query::Query; + use scylla::transport::errors::{DbError, QueryError}; + use scylla::QueryResult; + + pub struct Session(scylla::CachingSession); + + impl From for Session { + fn from(s: scylla::Session) -> Self { + Self(scylla::CachingSession::from(s, 100)) + } + } + + impl AsRef for Session { + fn as_ref(&self) -> &scylla::Session { + &self.0.session + } + } + + impl Session { + #[instrument(skip(self, query), level = "debug")] + pub async fn query( + &self, + query: &str, + values: impl ValueList + Debug, + ) -> Result { + debug!("executing query {}", query); + let result = self.0.execute(query, &values).await; + + if let Err(ref e) = result { + consider_logging_error(e); + } + + result + } + + #[instrument(skip(self, query), level = "debug")] + pub async fn query_prepared( + &self, + query: &str, + values: impl ValueList + Debug, + ) -> Result { + debug!("preparing new statement: {}", query); + let result = self.0.execute(Query::from(query), &values).await; + + match result { + Ok(res) => Ok(res), + Err(e) => { + consider_logging_error(&e); + Err(e) + }, + } + } + } + + fn consider_logging_error(e: &QueryError) { + if let QueryError::DbError(DbError::AlreadyExists { .. }, ..) = e { + info!("Keyspace already exists, skipping..."); + } + } +} \ No newline at end of file