Clean up upload logic for better efficiency and more in-depth timing

This commit is contained in:
Harrison Burt 2022-04-03 13:23:14 +01:00
parent 9b18cc3773
commit 5058714ba5
1 changed file with 60 additions and 48 deletions

View File

@ -1,11 +1,11 @@
use std::hash::Hash;
use std::sync::Arc;
use std::time::Instant;
use bytes::Bytes;
use once_cell::sync::OnceCell;
use uuid::Uuid;
use poem_openapi::Object;
use tokio::sync::{Semaphore, SemaphorePermit};
use tokio::time::Instant;
use crate::cache::{Cache, global_cache};
use crate::config::{BucketConfig, ImageKind};
@ -62,6 +62,9 @@ pub struct UploadInfo {
/// The time spent processing the image in seconds.
processing_time: f32,
/// The time spent uploading the image to the persistent store.
io_time: f32,
/// The crc32 checksum of the uploaded image.
checksum: u32,
@ -77,7 +80,7 @@ pub struct UploadInfo {
pub struct BucketController {
bucket_id: u32,
cache: Option<Cache>,
cache: Option<Arc<Cache>>,
global_limiter: Option<Arc<Semaphore>>,
config: BucketConfig,
pipeline: PipelineController,
@ -96,7 +99,7 @@ impl BucketController {
) -> Self {
Self {
bucket_id,
cache,
cache: cache.map(Arc::new),
global_limiter,
limiter: config.max_concurrency.map(Semaphore::new),
config,
@ -114,44 +117,27 @@ impl BucketController {
debug!("Uploading processed image with kind: {:?} and is {} bytes in size.", kind, data.len());
let _permit = get_optional_permit(&self.global_limiter, &self.limiter).await?;
let start = Instant::now();
let processing_start = Instant::now();
let checksum = crc32fast::hash(&data);
let pipeline = self.pipeline.clone();
let result = tokio::task::spawn_blocking(move || {
pipeline.on_upload(kind, data)
}).await??;
let processing_time = processing_start.elapsed();
let mut image_upload_info = vec![];
let image_id = Uuid::new_v4();
for store_entry in result.result.to_store {
self.storage
.store(
self.bucket_id,
image_id,
store_entry.kind,
store_entry.sizing_id,
store_entry.data.clone(),
).await?;
image_upload_info.push(ImageUploadInfo { sizing_id: store_entry.sizing_id });
if let Some(ref cache) = self.cache {
let cache_key = self.cache_key(
store_entry.sizing_id,
image_id,
store_entry.kind,
);
cache.insert(cache_key, store_entry.data);
}
}
let io_start = Instant::now();
let image_upload_info = self.concurrent_upload(image_id, result.result.to_store).await?;
let io_time = io_start.elapsed();
Ok(UploadInfo {
checksum,
image_id,
bucket_id: self.bucket_id,
images: image_upload_info,
processing_time: start.elapsed().as_secs_f32(),
processing_time: processing_time.as_secs_f32(),
io_time: io_time.as_secs_f32(),
})
}
@ -227,26 +213,7 @@ impl BucketController {
pipeline.on_fetch(desired_kind, retrieved_kind, data, sizing_id, custom_sizing)
}).await??;
let mut tasks = vec![];
for store_entry in result.result.to_store {
let storage = self.storage.clone();
let bucket_id = self.bucket_id;
let t = tokio::spawn(async move {
storage.store(
bucket_id,
image_id,
store_entry.kind,
store_entry.sizing_id,
store_entry.data,
).await
});
tasks.push(t);
}
for task in tasks {
task.await??;
}
self.concurrent_upload(image_id, result.result.to_store).await?;
Ok(result.result.response)
}
@ -288,7 +255,7 @@ impl BucketController {
) -> anyhow::Result<Option<Bytes>> {
let maybe_cache_backend = self.cache
.as_ref()
.map(Some)
.map(|v| Some(v.as_ref()))
.unwrap_or_else(global_cache);
let cache_key = self.cache_key(sizing_id, image_id, fetch_kind);
@ -314,4 +281,49 @@ impl BucketController {
Ok(maybe_existing)
}
async fn concurrent_upload(
&self,
image_id: Uuid,
to_store: Vec<StoreEntry>,
) -> anyhow::Result<Vec<ImageUploadInfo>> {
let mut image_upload_info = vec![];
let mut tasks = vec![];
for store_entry in to_store {
image_upload_info.push(ImageUploadInfo { sizing_id: store_entry.sizing_id });
let storage = self.storage.clone();
let bucket_id = self.bucket_id;
let cache = self.cache.clone();
let cache_key = self.cache_key(
store_entry.sizing_id,
image_id,
store_entry.kind,
);
let t = tokio::spawn(async move {
storage.store(
bucket_id,
image_id,
store_entry.kind,
store_entry.sizing_id,
store_entry.data.clone(),
).await?;
if let Some(ref cache) = cache {
cache.insert(cache_key, store_entry.data);
}
Ok::<_, anyhow::Error>(())
});
tasks.push(t);
}
for task in tasks {
task.await??;
}
Ok(image_upload_info)
}
}