meta: initial commit
This commit is contained in:
3
.gitignore
vendored
Normal file
3
.gitignore
vendored
Normal file
@@ -0,0 +1,3 @@
|
|||||||
|
/target
|
||||||
|
|
||||||
|
.history/
|
||||||
2032
Cargo.lock
generated
Normal file
2032
Cargo.lock
generated
Normal file
File diff suppressed because it is too large
Load Diff
41
Cargo.toml
Normal file
41
Cargo.toml
Normal file
@@ -0,0 +1,41 @@
|
|||||||
|
[package]
|
||||||
|
name = "katniss-cdn"
|
||||||
|
version = "0.1.0"
|
||||||
|
edition = "2024"
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
async-compression = { version = "0.4.32", features = [
|
||||||
|
"brotli",
|
||||||
|
"tokio",
|
||||||
|
"zstd",
|
||||||
|
] }
|
||||||
|
axum = { version = "0.8.6", features = ["http2", "multipart"] }
|
||||||
|
axum-extra = { version = "0.10.3", features = [
|
||||||
|
"file-stream",
|
||||||
|
"form",
|
||||||
|
"query",
|
||||||
|
"typed-header",
|
||||||
|
"typed-routing",
|
||||||
|
] }
|
||||||
|
base32 = "0.5.1"
|
||||||
|
base64 = "0.22.1"
|
||||||
|
bytes = "1.10.1"
|
||||||
|
clap = { version = "4.5.48", features = ["derive", "env"] }
|
||||||
|
ecow = { version = "0.2.6", features = ["serde"] }
|
||||||
|
futures = "0.3.31"
|
||||||
|
mime_guess = "2.0.5"
|
||||||
|
serde = { version = "1.0.228", features = ["derive"] }
|
||||||
|
serde_with = "3.15.0"
|
||||||
|
sha2 = "0.10.9"
|
||||||
|
snafu = { version = "0.8.9", features = ["futures"] }
|
||||||
|
tempfile = "3.23.0"
|
||||||
|
tokio = { version = "1.47.1", features = [
|
||||||
|
"rt-multi-thread",
|
||||||
|
"macros",
|
||||||
|
"fs",
|
||||||
|
"net",
|
||||||
|
] }
|
||||||
|
tokio-util = { version = "0.7.16", features = ["io", "io-util"] }
|
||||||
|
tower-http = { version = "0.6.6", features = ["cors", "fs"] }
|
||||||
|
tracing = "0.1.41"
|
||||||
|
tracing-subscriber = { version = "0.3.20", features = ["env-filter"] }
|
||||||
298
src/bin/server.rs
Normal file
298
src/bin/server.rs
Normal file
@@ -0,0 +1,298 @@
|
|||||||
|
use axum::Router;
|
||||||
|
use clap::Parser;
|
||||||
|
use snafu::{ResultExt, Snafu};
|
||||||
|
use std::{
|
||||||
|
net::{IpAddr, Ipv4Addr, SocketAddr},
|
||||||
|
path::{Path, PathBuf},
|
||||||
|
sync::Arc,
|
||||||
|
};
|
||||||
|
use tracing::level_filters::LevelFilter;
|
||||||
|
use tracing_subscriber::{EnvFilter, fmt::format::FmtSpan};
|
||||||
|
|
||||||
|
#[derive(Parser)]
|
||||||
|
struct Args {
|
||||||
|
#[arg(long = "logging", env = "RUST_LOG", default_value_t = EnvFilter::new("").add_directive(LevelFilter::INFO.into()))]
|
||||||
|
env_filter: EnvFilter,
|
||||||
|
|
||||||
|
#[arg(long, env, default_value_t = Ipv4Addr::new(127, 0, 0, 1).into())]
|
||||||
|
ip: IpAddr,
|
||||||
|
|
||||||
|
#[arg(long, env, default_value_t = 1498)]
|
||||||
|
port: u16,
|
||||||
|
|
||||||
|
#[arg(long, env)]
|
||||||
|
content_directory: PathBuf,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Snafu)]
|
||||||
|
enum MainError {
|
||||||
|
BindError { source: std::io::Error },
|
||||||
|
ServeError { source: std::io::Error },
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
struct AppState {
|
||||||
|
content_directory: Arc<Path>,
|
||||||
|
}
|
||||||
|
|
||||||
|
mod routes {
|
||||||
|
use axum::Router;
|
||||||
|
|
||||||
|
use crate::AppState;
|
||||||
|
|
||||||
|
mod content {
|
||||||
|
use axum::Router;
|
||||||
|
use tower_http::cors::CorsLayer;
|
||||||
|
|
||||||
|
use crate::AppState;
|
||||||
|
|
||||||
|
mod sha512 {
|
||||||
|
|
||||||
|
use axum::{
|
||||||
|
Router,
|
||||||
|
extract::State,
|
||||||
|
http::StatusCode,
|
||||||
|
response::{IntoResponse, Response},
|
||||||
|
};
|
||||||
|
use axum_extra::{
|
||||||
|
TypedHeader,
|
||||||
|
headers::{CacheControl, ContentEncoding, ContentType, ETag, Expires, IfNoneMatch},
|
||||||
|
response::FileStream,
|
||||||
|
routing::{RouterExt, TypedPath},
|
||||||
|
};
|
||||||
|
use base64::{DecodeError, Engine, engine::general_purpose::STANDARD};
|
||||||
|
use ecow::EcoString;
|
||||||
|
use mime_guess::{MimeGuess, mime};
|
||||||
|
use serde::Deserialize;
|
||||||
|
use serde_with::{DeserializeFromStr, SerializeDisplay};
|
||||||
|
use snafu::{Report, ResultExt, Snafu};
|
||||||
|
use std::{
|
||||||
|
fmt,
|
||||||
|
io::ErrorKind,
|
||||||
|
str::FromStr,
|
||||||
|
time::{Duration, SystemTime},
|
||||||
|
};
|
||||||
|
use tokio::fs::File;
|
||||||
|
use tokio_util::io::ReaderStream;
|
||||||
|
|
||||||
|
use crate::AppState;
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, DeserializeFromStr, SerializeDisplay)]
|
||||||
|
struct Base64Standard(Vec<u8>);
|
||||||
|
|
||||||
|
impl fmt::Display for Base64Standard {
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
|
write!(f, "{}", STANDARD.encode(&self.0))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl FromStr for Base64Standard {
|
||||||
|
type Err = DecodeError;
|
||||||
|
|
||||||
|
fn from_str(input: &str) -> Result<Self, Self::Err> {
|
||||||
|
STANDARD.decode(input).map(Self)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, DeserializeFromStr, SerializeDisplay)]
|
||||||
|
struct Base64StandardAndMaybeExtension {
|
||||||
|
base64: Base64Standard,
|
||||||
|
extension: Option<EcoString>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl fmt::Display for Base64StandardAndMaybeExtension {
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
|
write!(f, "{}", self.base64)?;
|
||||||
|
|
||||||
|
if let Some(extension) = &self.extension {
|
||||||
|
write!(f, "{extension}")?;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Snafu)]
|
||||||
|
enum Base64StandardAndMaybeExtensionParseError {
|
||||||
|
InvalidBase64Standard { source: DecodeError },
|
||||||
|
}
|
||||||
|
|
||||||
|
impl FromStr for Base64StandardAndMaybeExtension {
|
||||||
|
type Err = Base64StandardAndMaybeExtensionParseError;
|
||||||
|
|
||||||
|
fn from_str(s: &str) -> Result<Self, Self::Err> {
|
||||||
|
let (base64, extension) = match s.split_once('.') {
|
||||||
|
Some((base64, extension)) => (base64, Some(extension)),
|
||||||
|
None => (s, None),
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok(Base64StandardAndMaybeExtension {
|
||||||
|
base64: base64.parse().context(InvalidBase64StandardSnafu)?,
|
||||||
|
extension: extension.map(Into::into),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(TypedPath, Deserialize)]
|
||||||
|
#[typed_path("/{*sha512_hash_and_maybe_extension}")]
|
||||||
|
struct ByHash {
|
||||||
|
sha512_hash_and_maybe_extension: Base64StandardAndMaybeExtension,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Snafu)]
|
||||||
|
enum GetByHashError {
|
||||||
|
OpenTargetError {
|
||||||
|
source: std::io::Error,
|
||||||
|
},
|
||||||
|
InvalidETagError {
|
||||||
|
source: <axum_extra::headers::ETag as FromStr>::Err,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
impl IntoResponse for GetByHashError {
|
||||||
|
fn into_response(self) -> axum::response::Response {
|
||||||
|
match &self {
|
||||||
|
GetByHashError::OpenTargetError { source } => {
|
||||||
|
if source.kind() == ErrorKind::NotFound {
|
||||||
|
StatusCode::NOT_FOUND.into_response()
|
||||||
|
} else {
|
||||||
|
(
|
||||||
|
StatusCode::INTERNAL_SERVER_ERROR,
|
||||||
|
Report::from_error(self).to_string(),
|
||||||
|
)
|
||||||
|
.into_response()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
GetByHashError::InvalidETagError { .. } => (
|
||||||
|
StatusCode::INTERNAL_SERVER_ERROR,
|
||||||
|
Report::from_error(self).to_string(),
|
||||||
|
)
|
||||||
|
.into_response(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tracing::instrument(fields(target))]
|
||||||
|
async fn get_by_hash(
|
||||||
|
ByHash {
|
||||||
|
sha512_hash_and_maybe_extension,
|
||||||
|
}: ByHash,
|
||||||
|
|
||||||
|
if_none_match: Option<TypedHeader<IfNoneMatch>>,
|
||||||
|
|
||||||
|
State(AppState { content_directory }): State<AppState>,
|
||||||
|
) -> Result<Response, GetByHashError> {
|
||||||
|
let etag = ETag::from_str("\"unchangeable\"").context(InvalidETagSnafu)?;
|
||||||
|
if let Some(TypedHeader(if_none_match)) = if_none_match
|
||||||
|
&& if_none_match == etag.clone().into()
|
||||||
|
{
|
||||||
|
return Ok(StatusCode::NOT_MODIFIED.into_response());
|
||||||
|
}
|
||||||
|
|
||||||
|
let Base64StandardAndMaybeExtension {
|
||||||
|
base64: Base64Standard(sha512_hash),
|
||||||
|
extension,
|
||||||
|
} = &sha512_hash_and_maybe_extension;
|
||||||
|
|
||||||
|
let target = base32::encode(
|
||||||
|
base32::Alphabet::Rfc4648HexLower { padding: false },
|
||||||
|
sha512_hash,
|
||||||
|
);
|
||||||
|
tracing::Span::current().record("target", &target);
|
||||||
|
|
||||||
|
let brotli_directory = content_directory.join("brotli");
|
||||||
|
// safety: cannot contain `.` or `/` because the base32 alphabet used doesn't have those
|
||||||
|
let path = brotli_directory.join(target);
|
||||||
|
|
||||||
|
let file = File::open(path).await.context(OpenTargetSnafu)?;
|
||||||
|
|
||||||
|
let stream = ReaderStream::new(file);
|
||||||
|
let file_stream = FileStream::new(stream);
|
||||||
|
|
||||||
|
let mime = extension
|
||||||
|
.as_deref()
|
||||||
|
.map(mime_guess::from_ext)
|
||||||
|
.as_ref()
|
||||||
|
.and_then(MimeGuess::first)
|
||||||
|
.unwrap_or(mime::APPLICATION_OCTET_STREAM);
|
||||||
|
let content_type = TypedHeader::<ContentType>(mime.into());
|
||||||
|
|
||||||
|
let content_encoding = TypedHeader(ContentEncoding::brotli());
|
||||||
|
|
||||||
|
const YEAR: u64 = 60 * 60 * 24 * 365;
|
||||||
|
let cache_length = Duration::from_secs(5 * YEAR);
|
||||||
|
|
||||||
|
let cache_control = TypedHeader(
|
||||||
|
CacheControl::new()
|
||||||
|
.with_immutable()
|
||||||
|
.with_max_age(cache_length),
|
||||||
|
);
|
||||||
|
|
||||||
|
let expires_at = SystemTime::now() + cache_length;
|
||||||
|
let expires = TypedHeader(Expires::from(expires_at));
|
||||||
|
|
||||||
|
let etag = TypedHeader(etag);
|
||||||
|
|
||||||
|
Ok((
|
||||||
|
content_type,
|
||||||
|
content_encoding,
|
||||||
|
cache_control,
|
||||||
|
expires,
|
||||||
|
etag,
|
||||||
|
file_stream,
|
||||||
|
)
|
||||||
|
.into_response())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn create_router() -> Router<AppState> {
|
||||||
|
Router::new().typed_get(get_by_hash)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn create_router() -> Router<AppState> {
|
||||||
|
Router::new()
|
||||||
|
.nest("/sha512", sha512::create_router())
|
||||||
|
.layer(CorsLayer::permissive())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn create_router() -> Router<AppState> {
|
||||||
|
Router::new().nest("/content", content::create_router())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn create_router(state: AppState) -> Router<()> {
|
||||||
|
routes::create_router().with_state(state)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[snafu::report]
|
||||||
|
#[tokio::main]
|
||||||
|
async fn main() -> Result<(), MainError> {
|
||||||
|
let Args {
|
||||||
|
env_filter,
|
||||||
|
ip,
|
||||||
|
port,
|
||||||
|
content_directory,
|
||||||
|
} = Args::parse();
|
||||||
|
|
||||||
|
tracing_subscriber::fmt()
|
||||||
|
.pretty()
|
||||||
|
.with_span_events(FmtSpan::NEW | FmtSpan::CLOSE)
|
||||||
|
.with_env_filter(env_filter)
|
||||||
|
.init();
|
||||||
|
|
||||||
|
let state = AppState {
|
||||||
|
content_directory: content_directory.into(),
|
||||||
|
};
|
||||||
|
let router = create_router(state);
|
||||||
|
|
||||||
|
let addr = SocketAddr::new(ip, port);
|
||||||
|
let listener = tokio::net::TcpListener::bind(addr)
|
||||||
|
.await
|
||||||
|
.context(BindSnafu)?;
|
||||||
|
tracing::info!(?addr, "listening on");
|
||||||
|
|
||||||
|
axum::serve(listener, router).await.context(ServeSnafu)?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
166
src/bin/upload.rs
Normal file
166
src/bin/upload.rs
Normal file
@@ -0,0 +1,166 @@
|
|||||||
|
use async_compression::tokio::bufread::BrotliEncoder;
|
||||||
|
use base64::{Engine, engine::general_purpose::STANDARD};
|
||||||
|
use bytes::Bytes;
|
||||||
|
use clap::Parser;
|
||||||
|
use futures::{SinkExt, Stream, StreamExt, TryFutureExt, TryStream, TryStreamExt, channel::mpsc};
|
||||||
|
use sha2::{Digest, Sha512};
|
||||||
|
use snafu::{ResultExt, Snafu, futures::TryFutureExt as SnafuTryFutureExt};
|
||||||
|
use std::{
|
||||||
|
ffi::OsStr,
|
||||||
|
io::ErrorKind,
|
||||||
|
path::{Path, PathBuf},
|
||||||
|
};
|
||||||
|
use tempfile::NamedTempFile;
|
||||||
|
use tokio::{
|
||||||
|
fs::File,
|
||||||
|
io::{copy, stdin},
|
||||||
|
};
|
||||||
|
use tokio_util::{
|
||||||
|
either::Either,
|
||||||
|
io::{ReaderStream, StreamReader},
|
||||||
|
};
|
||||||
|
use tracing::level_filters::LevelFilter;
|
||||||
|
use tracing_subscriber::{EnvFilter, fmt::format::FmtSpan};
|
||||||
|
|
||||||
|
#[derive(Parser)]
|
||||||
|
struct Args {
|
||||||
|
#[arg(long = "logging", env = "RUST_LOG", default_value_t = EnvFilter::new("").add_directive(LevelFilter::INFO.into()))]
|
||||||
|
env_filter: EnvFilter,
|
||||||
|
|
||||||
|
#[arg(long, env)]
|
||||||
|
content_directory: PathBuf,
|
||||||
|
|
||||||
|
source: PathBuf,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Snafu)]
|
||||||
|
enum MainError {
|
||||||
|
SourceOpen { source: std::io::Error },
|
||||||
|
Hash { source: std::io::Error },
|
||||||
|
TempFileCreation { source: std::io::Error },
|
||||||
|
FileWriting { source: std::io::Error },
|
||||||
|
CreateContentBrotliDirectory { source: std::io::Error },
|
||||||
|
FilePersistence { source: tempfile::PathPersistError },
|
||||||
|
}
|
||||||
|
|
||||||
|
fn clone_stream<const COPIES: usize, T: Clone>(
|
||||||
|
mut stream: impl Stream<Item = T> + Unpin,
|
||||||
|
buffer: usize,
|
||||||
|
) -> (impl Future<Output = ()>, [impl Stream<Item = T>; COPIES]) {
|
||||||
|
let (txes, rxes): (Vec<_>, Vec<_>) = (0..COPIES).map(|_| mpsc::channel(buffer)).unzip();
|
||||||
|
|
||||||
|
let mut txes: [_; COPIES] = txes.try_into().unwrap();
|
||||||
|
let rxes: [_; COPIES] = rxes.try_into().unwrap();
|
||||||
|
|
||||||
|
(
|
||||||
|
async move {
|
||||||
|
while let Some(value) = stream.next().await {
|
||||||
|
for tx in &mut txes {
|
||||||
|
let _ = tx.send(value.clone()).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
rxes,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn try_hash_from_stream<E>(
|
||||||
|
mut source: impl TryStream<Ok = Bytes, Error = E> + Unpin,
|
||||||
|
hasher: &mut impl sha2::Digest,
|
||||||
|
) -> Result<(), E> {
|
||||||
|
while let Some(data) = source.try_next().await? {
|
||||||
|
hasher.update(data);
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[snafu::report]
|
||||||
|
#[tokio::main]
|
||||||
|
async fn main() -> Result<(), MainError> {
|
||||||
|
let Args {
|
||||||
|
env_filter,
|
||||||
|
content_directory,
|
||||||
|
source,
|
||||||
|
} = Args::parse();
|
||||||
|
|
||||||
|
tracing_subscriber::fmt()
|
||||||
|
.pretty()
|
||||||
|
.with_span_events(FmtSpan::NEW | FmtSpan::CLOSE)
|
||||||
|
.with_env_filter(env_filter)
|
||||||
|
.init();
|
||||||
|
|
||||||
|
let (source_reader, source_extension) = if source == Path::new("-") {
|
||||||
|
(Either::Left(stdin()), None)
|
||||||
|
} else {
|
||||||
|
(
|
||||||
|
Either::Right(File::open(&source).await.context(SourceOpenSnafu)?),
|
||||||
|
source.extension(),
|
||||||
|
)
|
||||||
|
};
|
||||||
|
|
||||||
|
let source_stream = ReaderStream::new(source_reader);
|
||||||
|
let (task, [for_hasher, for_compressor]) =
|
||||||
|
clone_stream(source_stream.map_err(|e| e.kind()), 1024);
|
||||||
|
let clone_task = tokio::spawn(task);
|
||||||
|
|
||||||
|
let mut hasher = Sha512::new();
|
||||||
|
|
||||||
|
let for_compressor = StreamReader::new(for_compressor);
|
||||||
|
let mut compressor =
|
||||||
|
BrotliEncoder::with_quality(for_compressor, async_compression::Level::Best);
|
||||||
|
|
||||||
|
let (temp_file, temp_path) = NamedTempFile::new()
|
||||||
|
.context(TempFileCreationSnafu)?
|
||||||
|
.into_parts();
|
||||||
|
|
||||||
|
let mut temp_file = File::from_std(temp_file);
|
||||||
|
|
||||||
|
let (_, _) = tokio::try_join!(
|
||||||
|
try_hash_from_stream(for_hasher, &mut hasher)
|
||||||
|
.map_err(std::io::Error::from)
|
||||||
|
.context(HashSnafu),
|
||||||
|
copy(&mut compressor, &mut temp_file).context(FileWritingSnafu)
|
||||||
|
)?;
|
||||||
|
|
||||||
|
let sha512_hash = hasher.finalize();
|
||||||
|
|
||||||
|
let sha512_hash_base32 = base32::encode(
|
||||||
|
base32::Alphabet::Rfc4648HexLower { padding: false },
|
||||||
|
&sha512_hash,
|
||||||
|
);
|
||||||
|
|
||||||
|
let target = sha512_hash_base32;
|
||||||
|
let brotli_directory = content_directory.join("brotli");
|
||||||
|
let target_path = brotli_directory.join(target);
|
||||||
|
|
||||||
|
create_dir_all_exist_ok(&brotli_directory)
|
||||||
|
.await
|
||||||
|
.context(CreateContentBrotliDirectorySnafu)?;
|
||||||
|
temp_path
|
||||||
|
.persist_noclobber(&target_path)
|
||||||
|
.context(FilePersistenceSnafu)?;
|
||||||
|
tracing::info!(?target_path, "persisted");
|
||||||
|
|
||||||
|
let sha512_hash_base64 = STANDARD.encode(sha512_hash);
|
||||||
|
let mut cdn_path = format!("/content/sha512/{sha512_hash_base64}");
|
||||||
|
if let Some(extension) = source_extension.and_then(OsStr::to_str) {
|
||||||
|
cdn_path = format!("{cdn_path}.{extension}");
|
||||||
|
}
|
||||||
|
tracing::info!(cdn_path);
|
||||||
|
|
||||||
|
let integrity = format!("sha512-{sha512_hash_base64}");
|
||||||
|
tracing::info!(integrity);
|
||||||
|
|
||||||
|
clone_task.await.unwrap();
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn create_dir_all_exist_ok(path: &Path) -> Result<(), std::io::Error> {
|
||||||
|
match tokio::fs::create_dir_all(path).await {
|
||||||
|
Ok(()) => Ok(()),
|
||||||
|
Err(e) if e.kind() == ErrorKind::AlreadyExists => Ok(()),
|
||||||
|
Err(e) => Err(e),
|
||||||
|
}
|
||||||
|
}
|
||||||
1
src/lib.rs
Normal file
1
src/lib.rs
Normal file
@@ -0,0 +1 @@
|
|||||||
|
|
||||||
Reference in New Issue
Block a user