Add s3 bucket backend

This commit is contained in:
Harrison Burt 2022-03-29 21:52:25 +01:00
parent 3da4b01c70
commit 4846d75d23
4 changed files with 175 additions and 292 deletions

329
Cargo.lock generated
View File

@ -59,12 +59,6 @@ dependencies = [
"subtle",
]
[[package]]
name = "ahash"
version = "0.4.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "739f4a8db6605981345c5654f3a85b056ce52f37a39d34da03f25bf2151ea16e"
[[package]]
name = "ahash"
version = "0.7.6"
@ -111,22 +105,6 @@ dependencies = [
"syn",
]
[[package]]
name = "attohttpc"
version = "0.18.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e69e13a99a7e6e070bb114f7ff381e58c7ccc188630121fc4c2fe4bcf24cd072"
dependencies = [
"http",
"log",
"native-tls",
"openssl",
"serde",
"serde_json",
"url",
"wildmatch",
]
[[package]]
name = "atty"
version = "0.2.14"
@ -144,31 +122,6 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
[[package]]
name = "aws-creds"
version = "0.27.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "460a75eac8f3cb7683e0a9a588a83c3ff039331ea7bfbfbfcecf1dacab276e11"
dependencies = [
"anyhow",
"attohttpc",
"dirs",
"rust-ini",
"serde",
"serde-xml-rs",
"serde_derive",
"url",
]
[[package]]
name = "aws-region"
version = "0.23.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "10110ddbd800fb47e6bef95e88fc13495795d252f585272a4fa3ac4f5b2e0a4d"
dependencies = [
"anyhow",
]
[[package]]
name = "base64"
version = "0.13.0"
@ -259,6 +212,7 @@ dependencies = [
"libc",
"num-integer",
"num-traits",
"serde",
"time 0.1.43",
"winapi",
]
@ -542,34 +496,26 @@ dependencies = [
]
[[package]]
name = "dirs"
version = "4.0.0"
name = "dirs-next"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ca3aa72a6f96ea37bbc5aa912f6788242832f75369bdfdadcb0e38423f100059"
checksum = "b98cf8ebf19c3d1b223e151f99a4f9f0690dca41414773390fc824184ac833e1"
dependencies = [
"dirs-sys",
"cfg-if",
"dirs-sys-next",
]
[[package]]
name = "dirs-sys"
version = "0.3.7"
name = "dirs-sys-next"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1b1d1d91c932ef41c0f2663aa8b0ca0342d444d842c06914aa0a7e352d0bada6"
checksum = "4ebda144c4fe02d1f7ea1a7d9641b6fc6b580adcfa024ae48797ecdeb6825b4d"
dependencies = [
"libc",
"redox_users",
"winapi",
]
[[package]]
name = "dlv-list"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "68df3f2b690c1b86e65ef7830956aededf3cb0a16f898f79b9a6f421a7b6211b"
dependencies = [
"rand",
]
[[package]]
name = "either"
version = "1.6.1"
@ -847,15 +793,6 @@ version = "1.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eabb4a44450da02c90444cf74558da904edde8fb4e9035a9a6a4e15445af0bd7"
[[package]]
name = "hashbrown"
version = "0.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d7afe4a420e3fe79967a00898cc1f4db7c8a49a9333a29f8a4bd76a253d5cd04"
dependencies = [
"ahash 0.4.7",
]
[[package]]
name = "hashbrown"
version = "0.11.2"
@ -868,7 +805,7 @@ version = "0.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8c21d40587b92fa6a6c6e3c1bdbf87d75511db5672f9c93175574b3a00df1758"
dependencies = [
"ahash 0.7.6",
"ahash",
]
[[package]]
@ -1081,12 +1018,6 @@ dependencies = [
"cfg-if",
]
[[package]]
name = "ipnet"
version = "2.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "35e70ee094dc02fd9c13fdad4940090f22dbd6ac7c9e7094a46cf0232a50bc7c"
[[package]]
name = "itoa"
version = "1.0.1"
@ -1207,7 +1138,8 @@ dependencies = [
"poem",
"poem-openapi",
"rayon",
"rust-s3",
"rusoto_core",
"rusoto_s3",
"serde",
"serde_json",
"serde_yaml",
@ -1227,22 +1159,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a3e378b66a060d48947b590737b30a1be76706c8dd7b8ba0f2fe3989c68a853f"
[[package]]
name = "maybe-async"
version = "0.2.6"
name = "md-5"
version = "0.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6007f9dad048e0a224f27ca599d669fca8cfa0dac804725aab542b2eb032bce6"
checksum = "7b5a279bb9607f9f53c22d496eade00d138d1bdcccd07d74650387cf94942a15"
dependencies = [
"proc-macro2",
"quote",
"syn",
"block-buffer 0.9.0",
"digest 0.9.0",
"opaque-debug",
]
[[package]]
name = "md5"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "490cc448043f947bae3cbee9c203358d62dbee0db12107a74be5c30ccfd09771"
[[package]]
name = "memchr"
version = "2.4.1"
@ -1273,15 +1199,6 @@ version = "0.3.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d"
[[package]]
name = "minidom"
version = "0.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "332592c2149fc7dd40a64fc9ef6f0d65607284b474cef9817d1fc8c7e7b3608e"
dependencies = [
"quick-xml",
]
[[package]]
name = "miniz_oxide"
version = "0.4.4"
@ -1484,16 +1401,6 @@ dependencies = [
"vcpkg",
]
[[package]]
name = "ordered-multimap"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1c672c7ad9ec066e428c00eb917124a06f08db19e2584de982cc34b1f4c12485"
dependencies = [
"dlv-list",
"hashbrown 0.9.1",
]
[[package]]
name = "os_str_bytes"
version = "6.0.0"
@ -1753,15 +1660,6 @@ dependencies = [
"unicode-xid",
]
[[package]]
name = "quick-xml"
version = "0.20.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "26aab6b48e2590e4a64d1ed808749ba06257882b461d01ca71baeb747074a6dd"
dependencies = [
"memchr",
]
[[package]]
name = "quote"
version = "1.0.17"
@ -1873,81 +1771,85 @@ dependencies = [
]
[[package]]
name = "reqwest"
version = "0.11.10"
name = "rusoto_core"
version = "0.47.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "46a1f7aa4f35e5e8b4160449f51afc758f0ce6454315a9fa7d0d113e958c41eb"
checksum = "5b4f000e8934c1b4f70adde180056812e7ea6b1a247952db8ee98c94cd3116cc"
dependencies = [
"async-trait",
"base64",
"bytes 1.1.0",
"crc32fast",
"futures",
"http",
"hyper",
"hyper-tls",
"lazy_static",
"log",
"rusoto_credential",
"rusoto_signature",
"rustc_version",
"serde",
"serde_json",
"tokio",
"xml-rs",
]
[[package]]
name = "rusoto_credential"
version = "0.47.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6a46b67db7bb66f5541e44db22b0a02fed59c9603e146db3a9e633272d3bac2f"
dependencies = [
"async-trait",
"chrono",
"dirs-next",
"futures",
"hyper",
"serde",
"serde_json",
"shlex",
"tokio",
"zeroize",
]
[[package]]
name = "rusoto_s3"
version = "0.47.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "048c2fe811a823ad5a9acc976e8bf4f1d910df719dcf44b15c3e96c5b7a51027"
dependencies = [
"async-trait",
"bytes 1.1.0",
"futures",
"rusoto_core",
"xml-rs",
]
[[package]]
name = "rusoto_signature"
version = "0.47.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6264e93384b90a747758bcc82079711eacf2e755c3a8b5091687b5349d870bcc"
dependencies = [
"base64",
"bytes 1.1.0",
"encoding_rs",
"futures-core",
"futures-util",
"h2",
"http",
"http-body",
"hyper",
"hyper-tls",
"ipnet",
"js-sys",
"lazy_static",
"log",
"mime",
"native-tls",
"percent-encoding",
"pin-project-lite",
"serde",
"serde_json",
"serde_urlencoded",
"tokio",
"tokio-native-tls",
"tokio-util 0.6.9",
"url",
"wasm-bindgen",
"wasm-bindgen-futures",
"web-sys",
"winreg",
]
[[package]]
name = "rust-ini"
version = "0.17.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "63471c4aa97a1cf8332a5f97709a79a4234698de6a1f5087faf66f2dae810e22"
dependencies = [
"cfg-if",
"ordered-multimap",
]
[[package]]
name = "rust-s3"
version = "0.30.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ff7c04dc81e5159a1ecc8594361f61cb8a62d4e4d0f5b0a0b4b48c463a55910f"
dependencies = [
"anyhow",
"async-trait",
"aws-creds",
"aws-region",
"base64",
"cfg-if",
"chrono",
"digest 0.9.0",
"futures",
"hex",
"hmac 0.11.0",
"http",
"hyper",
"log",
"maybe-async",
"md5",
"minidom",
"md-5",
"percent-encoding",
"reqwest",
"pin-project-lite",
"rusoto_credential",
"rustc_version",
"serde",
"serde-xml-rs",
"serde_derive",
"sha2 0.9.9",
"time 0.3.9",
"tokio",
"tokio-stream",
"url",
]
[[package]]
@ -2031,18 +1933,6 @@ dependencies = [
"serde_derive",
]
[[package]]
name = "serde-xml-rs"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "65162e9059be2f6a3421ebbb4fef3e74b7d9e7c60c50a0e292c6239f19f1edfa"
dependencies = [
"log",
"serde",
"thiserror",
"xml-rs",
]
[[package]]
name = "serde_derive"
version = "1.0.136"
@ -2133,6 +2023,12 @@ dependencies = [
"lazy_static",
]
[[package]]
name = "shlex"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "43b2853a4d09f215c24cc5489c992ce46052d359b5109343cbafbf26bc62f8a3"
[[package]]
name = "signal-hook-registry"
version = "1.4.0"
@ -2645,18 +2541,6 @@ dependencies = [
"wasm-bindgen-shared",
]
[[package]]
name = "wasm-bindgen-futures"
version = "0.4.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2eb6ec270a31b1d3c7e266b999739109abce8b6c87e4b31fcfcd788b65267395"
dependencies = [
"cfg-if",
"js-sys",
"wasm-bindgen",
"web-sys",
]
[[package]]
name = "wasm-bindgen-macro"
version = "0.2.79"
@ -2686,16 +2570,6 @@ version = "0.2.79"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3d958d035c4438e28c70e4321a2911302f10135ce78a9c7834c0cab4123d06a2"
[[package]]
name = "web-sys"
version = "0.3.56"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c060b319f29dd25724f09a2ba1418f142f539b2be99fbf4d2d5a8f7330afb8eb"
dependencies = [
"js-sys",
"wasm-bindgen",
]
[[package]]
name = "webp"
version = "0.2.0"
@ -2711,12 +2585,6 @@ version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d8b77fdfd5a253be4ab714e4ffa3c49caf146b4de743e97510c0656cf90f1e8e"
[[package]]
name = "wildmatch"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d6c48bd20df7e4ced539c12f570f937c6b4884928a87fee70a479d72f031d4e0"
[[package]]
name = "winapi"
version = "0.3.9"
@ -2791,15 +2659,6 @@ version = "0.32.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "504a2476202769977a040c6364301a3f65d0cc9e3fb08600b2bda150a0488316"
[[package]]
name = "winreg"
version = "0.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "80d0f4e272c85def139476380b12f9ac60926689dd2e01d4923222f40580869d"
dependencies = [
"winapi",
]
[[package]]
name = "xml-rs"
version = "0.8.4"
@ -2814,3 +2673,9 @@ checksum = "56c1936c4cc7a1c9ab21a1ebb602eb942ba868cbd44a99cb7cdc5892335e1c85"
dependencies = [
"linked-hash-map",
]
[[package]]
name = "zeroize"
version = "1.5.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7eb5728b8afd3f280a869ce1d4c554ffaed35f45c231fc41bfbd0381bef50317"

View File

@ -24,7 +24,10 @@ mimalloc = { version = "*", default-features = false }
clap = { version = "3", features = ["derive", "env"] }
strum = { version = "0.24", features = ["derive"] }
rust-s3 = "0.30"
# Blob storage deps
rusoto_core = "0.47.0"
rusoto_s3 = "0.47.0"
rayon = "1.5.1"
crc32fast = "1.3.2"
enum_dispatch = "0.3.8"

View File

@ -1,17 +1,24 @@
use std::time::Duration;
use anyhow::{anyhow, Result};
use anyhow::{anyhow, Context, Result};
use async_trait::async_trait;
use bytes::Bytes;
use s3::{Bucket, Region};
use s3::creds::Credentials;
use rusoto_core::credential::{AutoRefreshingProvider, ChainProvider};
use rusoto_core::{HttpClient, HttpConfig, Region};
use rusoto_s3::{DeleteObjectRequest, GetObjectRequest, PutObjectRequest, S3Client, S3, StreamingBody};
use tokio::io::AsyncReadExt;
use uuid::Uuid;
use crate::config::ImageKind;
use crate::controller::get_bucket_by_id;
use crate::StorageBackend;
/// A credential timeout.
const CREDENTIAL_TIMEOUT: u64 = 5;
pub struct BlobStorageBackend {
bucket: Bucket,
bucket_name: String,
client: S3Client,
store_public: bool,
}
impl BlobStorageBackend {
@ -20,19 +27,32 @@ impl BlobStorageBackend {
name: String,
region: String,
endpoint: String,
access_key: Option<&str>,
secret_key: Option<&str>,
security_token: Option<&str>,
session_token: Option<&str>,
request_timeout: Option<Duration>,
store_public: bool,
) -> Result<Self> {
let creds = Credentials::new(access_key, secret_key, security_token, session_token, None)?;
let region = Region::Custom { region, endpoint };
let mut bucket = Bucket::new(&name, region, creds)?;
bucket.set_request_timeout(request_timeout);
let mut chain_provider = ChainProvider::new();
chain_provider.set_timeout(Duration::from_secs(CREDENTIAL_TIMEOUT));
let credentials_provider = AutoRefreshingProvider::new(chain_provider)
.with_context(|| "Failed to fetch credentials for the object storage.")?;
let mut http_config: HttpConfig = HttpConfig::default();
http_config.pool_idle_timeout(std::time::Duration::from_secs(10));
let http_client = HttpClient::new_with_config(http_config)
.with_context(|| "Failed to create request dispatcher")?;
let region = Region::Custom { name: region, endpoint };
let client = S3Client::new_with(
http_client,
credentials_provider,
region,
);
Ok(Self {
bucket
bucket_name: name,
client,
store_public,
})
}
@ -61,12 +81,18 @@ impl StorageBackend for BlobStorageBackend {
let store_in = self.format_path(bucket_id, sizing_id, image_id, kind);
debug!("Storing image in bucket @ {}", &store_in);
let (_, code) = self.bucket.put_object(store_in, &data).await?;
if code != 200 {
Err(anyhow!("Remote storage bucket did not respond correctly, expected status 200 got {}", code))
} else {
Ok(())
}
let request = PutObjectRequest {
bucket: self.bucket_name.clone(),
key: store_in,
body: Some(StreamingBody::from(data.to_vec())),
content_length: Some(data.len() as i64),
acl: if self.store_public { Some("public-read".to_string()) } else { None },
..Default::default()
};
self.client.put_object(request).await?;
Ok(())
}
async fn fetch(
@ -79,13 +105,24 @@ impl StorageBackend for BlobStorageBackend {
let store_in = self.format_path(bucket_id, sizing_id, image_id, kind);
debug!("Retrieving image in bucket @ {}", &store_in);
let (data, code) = self.bucket.get_object(store_in).await?;
if code == 404 {
Ok(None)
} else if code != 200 {
Err(anyhow!("Remote storage bucket did not respond correctly, expected status 200 got {}", code))
let request = GetObjectRequest {
key: store_in,
bucket: self.bucket_name.clone(),
..Default::default()
};
let res = self.client.get_object(request).await?;
let content_length = res.content_length.unwrap_or(0) as usize;
if let Some(body) = res.body {
let mut buffer = Vec::with_capacity(content_length);
body
.into_async_read()
.read_to_end(&mut buffer)
.await?;
Ok(Some(buffer.into()))
} else {
Ok(Some(data.into()))
Ok(None)
}
}
@ -103,13 +140,12 @@ impl StorageBackend for BlobStorageBackend {
let store_in = self.format_path(bucket_id, sizing_id, image_id, *kind);
debug!("Purging file in bucket @ {}", &store_in);
let (_, code) = self.bucket.delete_object(store_in).await?;
if code != 200 && code != 404 {
return Err(anyhow!(
"Remote storage bucket did not respond correctly, \
expected status 200 got {}", code
))
}
let request = DeleteObjectRequest {
bucket: self.bucket_name.clone(),
key: store_in,
..Default::default()
};
self.client.delete_object(request).await?;
}
}

View File

@ -1,6 +1,5 @@
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use serde::Deserialize;
use crate::StorageBackend;
@ -25,20 +24,9 @@ pub enum BackendConfigs {
/// The bucket endpoint.
endpoint: String,
/// The optional bucket access_key.
access_key: Option<String>,
/// The optional bucket secret key.
secret_key: Option<String>,
/// The optional bucket security token.
security_token: Option<String>,
/// The optional bucket session token.
session_token: Option<String>,
/// A optional request timeout in seconds.
request_timeout: Option<u32>,
#[serde(default)]
/// Store objects with the `public-read` acl.
store_public: bool,
}
}
@ -52,22 +40,13 @@ impl BackendConfigs {
name,
region,
endpoint,
access_key,
secret_key,
security_token,
session_token,
request_timeout,
store_public,
} => {
let timeout = request_timeout.map(|v| Duration::from_secs(v as u64));
let backend = super::blob_storage::BlobStorageBackend::new(
name.to_string(),
region.to_string(),
endpoint.to_string(),
access_key.as_ref().map(|v| v.as_str()),
secret_key.as_ref().map(|v| v.as_str()),
security_token.as_ref().map(|v| v.as_str()),
session_token.as_ref().map(|v| v.as_str()),
timeout,
*store_public,
)?;
Ok(Arc::new(backend))