Convert to std::future

This commit is contained in:
Raphaël Thériault 2020-01-15 00:10:48 -05:00
parent 0e4b47fea9
commit 07208152e3
4 changed files with 748 additions and 692 deletions

1099
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -15,20 +15,20 @@ keywords = [
license = "MIT" license = "MIT"
[dependencies] [dependencies]
actix-files = "0.1.6" actix-files = "0.2.1"
actix-identity = "0.1.0" actix-identity = "0.2.1"
actix-web = "1.0.8" actix-rt = "1.0.0"
actix-web = "2.0.0"
base64 = "0.11.0" base64 = "0.11.0"
blake2 = "0.8.1" blake2 = "0.8.1"
chrono = "0.4.9" chrono = "0.4.10"
diesel_migrations = "1.4.0" diesel_migrations = "1.4.0"
dirs = "2.0.2" dirs = "2.0.2"
dotenv = { version = "0.15.0", optional = true } dotenv = { version = "0.15.0", optional = true }
env_logger = "0.7.1" env_logger = "0.7.1"
futures = "0.1.29"
lazy_static = "1.4.0" lazy_static = "1.4.0"
num_cpus = "1.10.1" num_cpus = "1.11.1"
toml = "0.5.3" toml = "0.5.5"
[dependencies.diesel] [dependencies.diesel]
version = "1.4.3" version = "1.4.3"
features = ["r2d2", "sqlite"] features = ["r2d2", "sqlite"]
@ -36,7 +36,7 @@ features = ["r2d2", "sqlite"]
version = "0.16.0" version = "0.16.0"
features = ["bundled"] features = ["bundled"]
[dependencies.serde] [dependencies.serde]
version = "1.0.102" version = "1.0.104"
features = ["derive"] features = ["derive"]
[features] [features]

View File

@ -33,9 +33,10 @@ pub mod setup;
pub type Pool = r2d2::Pool<ConnectionManager<SqliteConnection>>; pub type Pool = r2d2::Pool<ConnectionManager<SqliteConnection>>;
#[cfg(not(feature = "dev"))] #[cfg(not(feature = "dev"))]
embed_migrations!("./migrations"); embed_migrations!();
fn main() { #[actix_rt::main]
async fn main() {
let config = { let config = {
#[cfg(feature = "dev")] #[cfg(feature = "dev")]
{ {
@ -75,7 +76,7 @@ fn main() {
}; };
let port = config.port; let port = config.port;
let max_filesize = (config.max_filesize as f64 * 1.37) as usize; let max_filesize_json = (config.max_filesize as f64 * 1.37) as usize;
println!("Listening on port {}", port); println!("Listening on port {}", port);
@ -93,29 +94,29 @@ fn main() {
.route("/", web::get().to(routes::index)) .route("/", web::get().to(routes::index))
.route("/logout", web::get().to(routes::logout)) .route("/logout", web::get().to(routes::logout))
.route("/config", web::get().to(routes::get_config)) .route("/config", web::get().to(routes::get_config))
.route("/f", web::get().to_async(routes::files::gets)) .route("/f", web::get().to(routes::files::gets))
.route("/l", web::get().to_async(routes::links::gets)) .route("/l", web::get().to(routes::links::gets))
.route("/t", web::get().to_async(routes::texts::gets)) .route("/t", web::get().to(routes::texts::gets))
.route("/f/{id}", web::get().to_async(routes::files::get)) .route("/f/{id}", web::get().to(routes::files::get))
.route("/l/{id}", web::get().to_async(routes::links::get)) .route("/l/{id}", web::get().to(routes::links::get))
.route("/t/{id}", web::get().to_async(routes::texts::get)) .route("/t/{id}", web::get().to(routes::texts::get))
.service( .service(
web::resource("/f/{id}") web::resource("/f/{id}")
.data(web::Json::<routes::files::PutFile>::configure(|cfg| { .data(web::Json::<routes::files::PutFile>::configure(|cfg| {
cfg.limit(max_filesize) cfg.limit(max_filesize_json)
})) }))
.route(web::put().to_async(routes::files::put)) .route(web::put().to(routes::files::put))
.route(web::delete().to_async(routes::files::delete)), .route(web::delete().to(routes::files::delete)),
) )
.service( .service(
web::resource("/l/{id}") web::resource("/l/{id}")
.route(web::put().to_async(routes::links::put)) .route(web::put().to(routes::links::put))
.route(web::delete().to_async(routes::links::delete)), .route(web::delete().to(routes::links::delete)),
) )
.service( .service(
web::resource("/t/{id}") web::resource("/t/{id}")
.route(web::put().to_async(routes::texts::put)) .route(web::put().to(routes::texts::put))
.route(web::delete().to_async(routes::texts::delete)), .route(web::delete().to(routes::texts::delete)),
) )
}) })
.bind(&format!("localhost:{}", port)) .bind(&format!("localhost:{}", port))
@ -124,6 +125,7 @@ fn main() {
process::exit(1); process::exit(1);
}) })
.run() .run()
.await
.unwrap_or_else(|e| { .unwrap_or_else(|e| {
eprintln!("Can't start webserver: {}.", e); eprintln!("Can't start webserver: {}.", e);
process::exit(1); process::exit(1);

View File

@ -2,11 +2,12 @@
use crate::setup::{self, Config}; use crate::setup::{self, Config};
use actix_identity::Identity; use actix_identity::Identity;
use actix_web::{error::BlockingError, web, HttpRequest, HttpResponse, Responder}; use actix_web::{error::BlockingError, web, Error, HttpRequest, HttpResponse, Responder};
use base64; use base64;
use chrono::{DateTime, NaiveDateTime, Utc}; use chrono::{DateTime, NaiveDateTime, Utc};
use diesel; use diesel;
use serde::Serialize; use serde::Serialize;
use std::convert::Infallible;
#[cfg(feature = "dev")] #[cfg(feature = "dev")]
use crate::get_env; use crate::get_env;
@ -21,8 +22,7 @@ fn parse_id(id: &str) -> Result<i32, HttpResponse> {
} }
} }
/// Checks for authentication async fn auth(
fn auth(
identity: Identity, identity: Identity,
request: HttpRequest, request: HttpRequest,
password_hash: &[u8], password_hash: &[u8],
@ -50,8 +50,10 @@ fn auth(
let connection_string = header.replace("Basic ", ""); let connection_string = header.replace("Basic ", "");
let (user, password) = match base64::decode(&connection_string) { let (user, password) = match base64::decode(&connection_string) {
Ok(c) => { Ok(c) => {
let credentials: Vec<Vec<u8>> = let credentials: Vec<Vec<u8>> = c
c.splitn(2, |b| b == &b':').map(|s| s.to_vec()).collect(); .splitn(2, |b| b == &b':')
.map(|s| s.to_vec())
.collect::<Vec<Vec<u8>>>();
match credentials.len() { match credentials.len() {
2 => (credentials[0].clone(), credentials[1].clone()), 2 => (credentials[0].clone(), credentials[1].clone()),
_ => return Err(HttpResponse::BadRequest().body("Invalid Authorization header")), _ => return Err(HttpResponse::BadRequest().body("Invalid Authorization header")),
@ -60,7 +62,8 @@ fn auth(
Err(_) => return Err(HttpResponse::BadRequest().body("Invalid Authorization header")), Err(_) => return Err(HttpResponse::BadRequest().body("Invalid Authorization header")),
}; };
if setup::hash(password).as_slice() == password_hash { let infallible_hash = move || -> Result<Vec<u8>, Infallible> { Ok(setup::hash(password)) };
if web::block(infallible_hash).await.unwrap().as_slice() == password_hash {
match String::from_utf8(user.to_vec()) { match String::from_utf8(user.to_vec()) {
Ok(u) => { Ok(u) => {
identity.remember(u); identity.remember(u);
@ -79,24 +82,30 @@ fn auth(
#[inline(always)] #[inline(always)]
fn match_replace_result<T: Serialize>( fn match_replace_result<T: Serialize>(
result: Result<T, BlockingError<diesel::result::Error>>, result: Result<T, BlockingError<diesel::result::Error>>,
) -> Result<HttpResponse, HttpResponse> { ) -> Result<HttpResponse, Error> {
match result { match result {
Ok(x) => Ok(HttpResponse::Created().json(x)), Ok(x) => Ok(HttpResponse::Created().json(x)),
Err(_) => Err(HttpResponse::InternalServerError().body("Internal server error")), Err(_) => Err(HttpResponse::InternalServerError()
.body("Internal server error")
.into()),
} }
} }
/// Handles error from single GET queries using find /// Handles error from single GET queries using find
#[inline(always)] #[inline(always)]
fn match_find_error<T>(error: BlockingError<diesel::result::Error>) -> Result<T, HttpResponse> { fn match_find_error<T>(error: BlockingError<diesel::result::Error>) -> Result<T, Error> {
match error { match error {
BlockingError::Error(e) => match e { BlockingError::Error(e) => match e {
diesel::result::Error::NotFound => Err(HttpResponse::NotFound().body("Not found")), diesel::result::Error::NotFound => {
_ => Err(HttpResponse::InternalServerError().body("Internal server error")), Err(HttpResponse::NotFound().body("Not found").into())
}
_ => Err(HttpResponse::InternalServerError()
.body("Internal server error")
.into()),
}, },
BlockingError::Canceled => { BlockingError::Canceled => Err(HttpResponse::InternalServerError()
Err(HttpResponse::InternalServerError().body("Internal server error")) .body("Internal server error")
} .into()),
} }
} }
@ -110,25 +119,22 @@ fn timestamp_to_last_modified(timestamp: i32) -> String {
/// GET multiple entries /// GET multiple entries
macro_rules! select { macro_rules! select {
($m:ident) => { ($m:ident) => {
pub fn gets( pub async fn gets(
request: HttpRequest, request: HttpRequest,
query: actix_web::web::Query<SelectQuery>, query: actix_web::web::Query<SelectQuery>,
pool: actix_web::web::Data<Pool>, pool: actix_web::web::Data<Pool>,
identity: actix_identity::Identity, identity: actix_identity::Identity,
password_hash: actix_web::web::Data<Vec<u8>>, password_hash: actix_web::web::Data<Vec<u8>>,
) -> impl futures::Future<Item = actix_web::HttpResponse, Error = actix_web::Error> { ) -> Result<actix_web::HttpResponse, actix_web::Error> {
crate::routes::auth(identity, request, &password_hash).await?;
let filters = crate::queries::SelectFilters::from(query.into_inner()); let filters = crate::queries::SelectFilters::from(query.into_inner());
futures::future::result(crate::routes::auth(identity, request, &password_hash)) match actix_web::web::block(move || crate::queries::$m::select(filters, pool)).await {
.and_then(move |_| { Ok(x) => Ok(actix_web::HttpResponse::Ok().json(x)),
actix_web::web::block(move || crate::queries::$m::select(filters, pool)).then( Err(_) => Err(actix_web::HttpResponse::InternalServerError()
|result| match result { .body("Internal server error")
Ok(x) => Ok(actix_web::HttpResponse::Ok().json(x)), .into()),
Err(_) => Err(actix_web::HttpResponse::InternalServerError() }
.body("Internal server error")),
},
)
})
.from_err()
} }
}; };
} }
@ -136,24 +142,20 @@ macro_rules! select {
/// DELETE an entry /// DELETE an entry
macro_rules! delete { macro_rules! delete {
($m:ident) => { ($m:ident) => {
pub fn delete( pub async fn delete(
request: HttpRequest, request: HttpRequest,
path: actix_web::web::Path<String>, path: actix_web::web::Path<String>,
pool: actix_web::web::Data<Pool>, pool: actix_web::web::Data<Pool>,
identity: actix_identity::Identity, identity: actix_identity::Identity,
password_hash: actix_web::web::Data<Vec<u8>>, password_hash: actix_web::web::Data<Vec<u8>>,
) -> impl futures::Future<Item = actix_web::HttpResponse, Error = actix_web::Error> { ) -> Result<actix_web::HttpResponse, actix_web::Error> {
futures::future::result(crate::routes::auth(identity, request, &password_hash)) crate::routes::auth(identity, request, &password_hash).await?;
.and_then(move |_| futures::future::result(crate::routes::parse_id(&path)))
.and_then(move |id| { let id = crate::routes::parse_id(&path)?;
actix_web::web::block(move || crate::queries::$m::delete(id, pool)).then( match actix_web::web::block(move || crate::queries::$m::delete(id, pool)).await {
|result| match result { Ok(()) => Ok(actix_web::HttpResponse::NoContent().body("Deleted")),
Ok(()) => Ok(actix_web::HttpResponse::NoContent().body("Deleted")), Err(e) => crate::routes::match_find_error(e),
Err(e) => crate::routes::match_find_error(e), }
},
)
})
.from_err()
} }
}; };
} }
@ -195,12 +197,12 @@ lazy_static! {
} }
/// Index page letting users upload via a UI /// Index page letting users upload via a UI
pub fn index( pub async fn index(
request: HttpRequest, request: HttpRequest,
identity: Identity, identity: Identity,
password_hash: web::Data<Vec<u8>>, password_hash: web::Data<Vec<u8>>,
) -> impl Responder { ) -> impl Responder {
if let Err(response) = auth(identity, request, &password_hash) { if let Err(response) = auth(identity, request, &password_hash).await {
return response; return response;
} }
@ -215,7 +217,7 @@ pub fn index(
} }
#[cfg(not(feature = "dev"))] #[cfg(not(feature = "dev"))]
{ {
INDEX_CONTENTS.clone() (&*INDEX_CONTENTS).clone()
} }
}; };
@ -225,20 +227,20 @@ pub fn index(
} }
/// GET the config info /// GET the config info
pub fn get_config( pub async fn get_config(
request: HttpRequest, request: HttpRequest,
config: web::Data<Config>, config: web::Data<Config>,
identity: Identity, identity: Identity,
password_hash: web::Data<Vec<u8>>, password_hash: web::Data<Vec<u8>>,
) -> impl Responder { ) -> impl Responder {
match auth(identity, request, &password_hash) { match auth(identity, request, &password_hash).await {
Ok(_) => HttpResponse::Ok().json(config.get_ref()), Ok(_) => HttpResponse::Ok().json(config.get_ref()),
Err(response) => response, Err(response) => response,
} }
} }
/// Logout route /// Logout route
pub fn logout(identity: Identity) -> impl Responder { pub async fn logout(identity: Identity) -> impl Responder {
if identity.identity().is_some() { if identity.identity().is_some() {
identity.forget(); identity.forget();
HttpResponse::Ok().body("Logged out") HttpResponse::Ok().body("Logged out")
@ -260,34 +262,28 @@ pub mod files {
use actix_identity::Identity; use actix_identity::Identity;
use actix_web::{error::BlockingError, http, web, Error, HttpRequest, HttpResponse}; use actix_web::{error::BlockingError, http, web, Error, HttpRequest, HttpResponse};
use chrono::Utc; use chrono::Utc;
use futures::{future, Future};
use std::{fs, path::PathBuf}; use std::{fs, path::PathBuf};
select!(files); select!(files);
/// GET a file entry and statically serve it /// GET a file entry and statically serve it
pub fn get( pub async fn get(
path: web::Path<String>, path: web::Path<String>,
pool: web::Data<Pool>, pool: web::Data<Pool>,
config: web::Data<Config>, config: web::Data<Config>,
) -> impl Future<Item = NamedFile, Error = Error> { ) -> Result<NamedFile, Error> {
future::result(parse_id(&path)) let id = parse_id(&path)?;
.and_then(move |id| { match web::block(move || queries::files::find(id, pool)).await {
web::block(move || queries::files::find(id, pool)).then( Ok(file) => {
move |result| match result { let mut path = config.files_dir.clone();
Ok(file) => { path.push(file.filepath);
let mut path = config.files_dir.clone(); match NamedFile::open(&path) {
path.push(file.filepath); Ok(nf) => Ok(nf),
match NamedFile::open(&path) { Err(_) => Err(HttpResponse::NotFound().body("Not found").into()),
Ok(nf) => Ok(nf), }
Err(_) => Err(HttpResponse::NotFound().body("Not found")), }
} Err(e) => match_find_error(e),
} }
Err(e) => match_find_error(e),
},
)
})
.from_err()
} }
/// Request body when PUTting files /// Request body when PUTting files
@ -298,7 +294,7 @@ pub mod files {
} }
/// PUT a new file entry /// PUT a new file entry
pub fn put( pub async fn put(
request: HttpRequest, request: HttpRequest,
path: web::Path<String>, path: web::Path<String>,
body: web::Json<PutFile>, body: web::Json<PutFile>,
@ -306,51 +302,50 @@ pub mod files {
config: web::Data<Config>, config: web::Data<Config>,
identity: Identity, identity: Identity,
password_hash: web::Data<Vec<u8>>, password_hash: web::Data<Vec<u8>>,
) -> impl Future<Item = HttpResponse, Error = Error> { ) -> Result<HttpResponse, Error> {
future::result(auth(identity, request, &password_hash)) auth(identity, request, &password_hash).await?;
.and_then(move |_| future::result(parse_id(&path)))
.and_then(move |id| {
web::block(move || {
let mut path = config.files_dir.clone();
let mut relative_path = PathBuf::new();
if fs::create_dir_all(&path).is_err() {
return Err(http::StatusCode::from_u16(500).unwrap());
}
let mut filename = body.filename.clone(); let id = parse_id(&path)?;
filename = format!("{:x}.{}", Utc::now().timestamp(), filename); let result = web::block(move || {
path.push(&filename); let mut path = config.files_dir.clone();
relative_path.push(&filename); let mut relative_path = PathBuf::new();
if fs::create_dir_all(&path).is_err() {
return Err(http::StatusCode::from_u16(500).unwrap());
}
let relative_path = match relative_path.to_str() { let mut filename = body.filename.clone();
Some(rp) => rp, filename = format!("{:x}.{}", Utc::now().timestamp(), filename);
None => return Err(http::StatusCode::from_u16(500).unwrap()), path.push(&filename);
}; relative_path.push(&filename);
let contents = match base64::decode(&body.base64) { let relative_path = match relative_path.to_str() {
Ok(contents) => contents, Some(rp) => rp,
Err(_) => return Err(http::StatusCode::from_u16(400).unwrap()), None => return Err(http::StatusCode::from_u16(500).unwrap()),
}; };
if fs::write(&path, contents).is_err() {
return Err(http::StatusCode::from_u16(500).unwrap());
}
match queries::files::replace(id, relative_path, pool) { let contents = match base64::decode(&body.base64) {
Ok(file) => Ok(file), Ok(contents) => contents,
Err(_) => Err(http::StatusCode::from_u16(500).unwrap()), Err(_) => return Err(http::StatusCode::from_u16(400).unwrap()),
} };
}) if fs::write(&path, contents).is_err() {
.then(|result| match result { return Err(http::StatusCode::from_u16(500).unwrap());
Ok(file) => Ok(HttpResponse::Created().json(file)), }
Err(e) => match e {
BlockingError::Error(sc) => Err(HttpResponse::new(sc)), match queries::files::replace(id, relative_path, pool) {
BlockingError::Canceled => { Ok(file) => Ok(file),
Err(HttpResponse::InternalServerError().body("Internal server error")) Err(_) => Err(http::StatusCode::from_u16(500).unwrap()),
} }
}, })
}) .await;
}) match result {
.from_err() Ok(file) => Ok(HttpResponse::Created().json(file)),
Err(e) => match e {
BlockingError::Error(sc) => Err(HttpResponse::new(sc).into()),
BlockingError::Canceled => Err(HttpResponse::InternalServerError()
.body("Internal server error")
.into()),
},
}
} }
delete!(files); delete!(files);
@ -366,26 +361,22 @@ pub mod links {
}; };
use actix_identity::Identity; use actix_identity::Identity;
use actix_web::{web, Error, HttpRequest, HttpResponse}; use actix_web::{web, Error, HttpRequest, HttpResponse};
use futures::{future, Future};
select!(links); select!(links);
/// GET a link entry and redirect to it /// GET a link entry and redirect to it
pub fn get( pub async fn get(
path: web::Path<String>, path: web::Path<String>,
pool: web::Data<Pool>, pool: web::Data<Pool>,
) -> impl Future<Item = HttpResponse, Error = Error> { ) -> Result<HttpResponse, Error> {
future::result(parse_id(&path)) let id = parse_id(&path)?;
.and_then(move |id| { match web::block(move || queries::links::find(id, pool)).await {
web::block(move || queries::links::find(id, pool)).then(|result| match result { Ok(link) => Ok(HttpResponse::Found()
Ok(link) => Ok(HttpResponse::Found() .header("Location", link.forward)
.header("Location", link.forward) .header("Last-Modified", timestamp_to_last_modified(link.created))
.header("Last-Modified", timestamp_to_last_modified(link.created)) .finish()),
.finish()), Err(e) => match_find_error(e),
Err(e) => match_find_error(e), }
})
})
.from_err()
} }
/// Request body when PUTting links /// Request body when PUTting links
@ -395,21 +386,20 @@ pub mod links {
} }
/// PUT a new link entry /// PUT a new link entry
pub fn put( pub async fn put(
request: HttpRequest, request: HttpRequest,
path: web::Path<String>, path: web::Path<String>,
body: web::Json<PutLink>, body: web::Json<PutLink>,
pool: web::Data<Pool>, pool: web::Data<Pool>,
identity: Identity, identity: Identity,
password_hash: web::Data<Vec<u8>>, password_hash: web::Data<Vec<u8>>,
) -> impl Future<Item = HttpResponse, Error = Error> { ) -> Result<HttpResponse, Error> {
future::result(auth(identity, request, &password_hash)) auth(identity, request, &password_hash).await?;
.and_then(move |_| future::result(parse_id(&path)))
.and_then(move |id| { let id = parse_id(&path)?;
web::block(move || queries::links::replace(id, &body.forward, pool)) match_replace_result(
.then(match_replace_result) web::block(move || queries::links::replace(id, &body.forward, pool)).await,
}) )
.from_err()
} }
delete!(links); delete!(links);
@ -425,25 +415,21 @@ pub mod texts {
}; };
use actix_identity::Identity; use actix_identity::Identity;
use actix_web::{web, Error, HttpRequest, HttpResponse}; use actix_web::{web, Error, HttpRequest, HttpResponse};
use futures::{future, Future};
select!(texts); select!(texts);
/// GET a text entry and display it /// GET a text entry and display it
pub fn get( pub async fn get(
path: web::Path<String>, path: web::Path<String>,
pool: web::Data<Pool>, pool: web::Data<Pool>,
) -> impl Future<Item = HttpResponse, Error = Error> { ) -> Result<HttpResponse, Error> {
future::result(parse_id(&path)) let id = parse_id(&path)?;
.and_then(move |id| { match web::block(move || queries::texts::find(id, pool)).await {
web::block(move || queries::texts::find(id, pool)).then(|result| match result { Ok(text) => Ok(HttpResponse::Ok()
Ok(text) => Ok(HttpResponse::Ok() .header("Last-Modified", timestamp_to_last_modified(text.created))
.header("Last-Modified", timestamp_to_last_modified(text.created)) .body(text.contents)),
.body(text.contents)), Err(e) => match_find_error(e),
Err(e) => match_find_error(e), }
})
})
.from_err()
} }
/// Request body when PUTting texts /// Request body when PUTting texts
@ -453,21 +439,20 @@ pub mod texts {
} }
/// PUT a new text entry /// PUT a new text entry
pub fn put( pub async fn put(
request: HttpRequest, request: HttpRequest,
path: web::Path<String>, path: web::Path<String>,
body: web::Json<PutText>, body: web::Json<PutText>,
pool: web::Data<Pool>, pool: web::Data<Pool>,
identity: Identity, identity: Identity,
password_hash: web::Data<Vec<u8>>, password_hash: web::Data<Vec<u8>>,
) -> impl Future<Item = HttpResponse, Error = Error> { ) -> Result<HttpResponse, Error> {
future::result(auth(identity, request, &password_hash)) auth(identity, request, &password_hash).await?;
.and_then(move |_| future::result(parse_id(&path)))
.and_then(move |id| { let id = parse_id(&path)?;
web::block(move || queries::texts::replace(id, &body.contents, pool)) match_replace_result(
.then(match_replace_result) web::block(move || queries::texts::replace(id, &body.contents, pool)).await,
}) )
.from_err()
} }
delete!(texts); delete!(texts);