From 44c7aa60c2ac54c6e9eb9cb5e1caf35c04fef3e8 Mon Sep 17 00:00:00 2001 From: Jacob Date: Wed, 6 May 2026 21:40:06 -0400 Subject: [PATCH] feat: cache user data in memory to make access faster, simplifies code --- src/user_data.rs | 232 +++++++++++++++++++++++++++-------------------- 1 file changed, 133 insertions(+), 99 deletions(-) diff --git a/src/user_data.rs b/src/user_data.rs index 07c6791..263955e 100644 --- a/src/user_data.rs +++ b/src/user_data.rs @@ -1,8 +1,10 @@ -use std::str::FromStr; +use std::{str::FromStr, sync::Arc}; use async_compression::futures::{bufread::BrotliDecoder, write::BrotliEncoder}; +use bytes::Bytes; use capnp::message::TypedBuilder; use futures::{AsyncReadExt, AsyncWriteExt, TryStream, TryStreamExt}; +use moka::future::Cache; use opendal::Operator; use snafu::{OptionExt as _, ResultExt as _, Snafu, ensure}; use twilight_model::id::{Id, marker::UserMarker}; @@ -14,11 +16,15 @@ pub const RECORD_IF_CONSENT_UNSPECIFIED: bool = true; #[derive(Debug, Clone)] pub struct UserDataManager { operator: Operator, + cache: Cache, Bytes>, } impl UserDataManager { pub fn new(operator: Operator) -> Self { - Self { operator } + Self { + operator, + cache: Cache::builder().build(), + } } } @@ -97,12 +103,105 @@ impl UserDataManager { #[derive(Debug, Snafu)] #[snafu(module)] -pub enum WithError { +pub enum ReadError { /// couldn't read data for this user from the storage operator - ReadError { source: opendal::Error }, + OperatorReaderError { source: opendal::Error }, /// couldn't decompress the user data from storage DecompressionError { source: std::io::Error }, +} + +impl UserDataManager { + async fn read(&self, id: Id) -> Result { + self.cache + .try_get_with::<_, ReadError>(id, async { + let path = path(id); + let compressed_reader = self + .operator + .async_reader_if_exists(&path) + .await + .context(read_error::OperatorReaderSnafu)?; + + let decompressed_reader = compressed_reader.map(BrotliDecoder::new); + + let decompressed = decompressed_reader + .map_async(|mut reader| async move { + let mut vec = Vec::new(); + reader.read_to_end(&mut vec).await?; + Ok(vec) + }) + .await + .transpose() + .context(read_error::DecompressionSnafu)?; + + let vec = decompressed.unwrap_or_else(|| { + let mut fallback = TypedBuilder::::new_default(); + fallback.init_root(); + + let mut vec = Vec::new(); + capnp::serialize::write_message(&mut vec, fallback.borrow_inner()) + .expect("cannot error"); + vec + }); + + let bytes = vec.into(); + + Ok(bytes) + }) + .await + .map_err(|arc| { + Arc::into_inner(arc).expect("expected this to be the only reference to the error") + }) + } +} + +#[derive(Debug, Snafu)] +#[snafu(module)] +pub enum WriteError { + /// couldn't create a writer for this user data in the storage operator + OperatorWriterError { source: opendal::Error }, + + /// couldn't write (modified) data for this user to the storage operator + WriteError { source: std::io::Error }, + + /// couldn't finalize writing (modified) data for this user to the storage operator + FinalizeError { source: std::io::Error }, +} + +impl UserDataManager { + async fn write(&self, id: Id, bytes: Bytes) -> Result<(), WriteError> { + let path = path(id); + + let compressed_writer = self + .operator + .writer(&path) + .await + .context(write_error::OperatorWriterSnafu)? + .into_futures_async_write(); + + let mut decompressed_writer = BrotliEncoder::new(compressed_writer); + + decompressed_writer + .write_all(&bytes) + .await + .context(write_error::WriteSnafu)?; + + decompressed_writer + .close() + .await + .context(write_error::FinalizeSnafu)?; + + self.cache.insert(id, bytes).await; + + Ok(()) + } +} + +#[derive(Debug, Snafu)] +#[snafu(module)] +pub enum WithError { + /// couldn't read the user data from storage + ReadError { source: ReadError }, /// couldn't deserialize the user data DeserializeError { source: capnp::Error }, @@ -114,41 +213,18 @@ impl UserDataManager { id: Id, f: impl FnOnce(user_capnp::user::Reader<'_>) -> R, ) -> Result { - let compressed_buffer = self - .operator - .async_reader_if_exists(&path(id)) - .await - .context(with_error::ReadSnafu)?; + let message_bytes = self.read(id).await.context(with_error::ReadSnafu)?; - let decompressed_reader = compressed_buffer.map(BrotliDecoder::new); - let decompressed = decompressed_reader - .map_async(|mut reader| async move { - let mut vec = Vec::new(); - reader.read_to_end(&mut vec).await?; - Ok(vec) - }) - .await - .transpose() - .context(with_error::DecompressionSnafu)?; + let message_reader = capnp::serialize::read_message_from_flat_slice_no_alloc( + &mut message_bytes.as_ref(), + Default::default(), + ) + .context(with_error::DeserializeSnafu)?; - let mut message = TypedBuilder::::new_default(); - let fallback = message.init_root(); - - let mut user_data = fallback.into_reader(); - let message_reader; - - if let Some(mut bytes) = decompressed.as_deref() { - message_reader = capnp::serialize::read_message_from_flat_slice_no_alloc( - &mut bytes, - Default::default(), - ) + let user_data = message_reader + .get_root() .context(with_error::DeserializeSnafu)?; - user_data = message_reader - .get_root() - .context(with_error::DeserializeSnafu)?; - } - Ok(f(user_data)) } } @@ -156,11 +232,8 @@ impl UserDataManager { #[derive(Debug, Snafu)] #[snafu(module)] pub enum UpdateError { - /// couldn't read data for this user from the storage operator - ReadError { source: opendal::Error }, - - /// couldn't decompress the user data from storage - DecompressionError { source: std::io::Error }, + /// couldn't read the user data from storage + ReadError { source: ReadError }, /// couldn't deserialize the user data DeserializeError { source: capnp::Error }, @@ -168,14 +241,8 @@ pub enum UpdateError { /// couldn't serialize the (modified) user data SerializeError { source: capnp::Error }, - /// couldn't create a writer for this user data in the storage operator - WriterError { source: opendal::Error }, - - /// couldn't write (modified) data for this user to the storage operator - WriteError { source: std::io::Error }, - - /// couldn't finalize writing (modified) data for this user to the storage operator - FinalizeError { source: std::io::Error }, + /// couldn't write the user data to storage + WriteError { source: WriteError }, } impl UserDataManager { @@ -184,71 +251,38 @@ impl UserDataManager { id: Id, f: impl FnOnce(user_capnp::user::Builder<'_>) -> R, ) -> Result { - let path = path(id); - let compressed_buffer = self - .operator - .async_reader_if_exists(&path) - .await - .context(update_error::ReadSnafu)?; + let bytes = self.read(id).await.context(update_error::ReadSnafu)?; - let decompressed_reader = compressed_buffer.map(BrotliDecoder::new); - let decompressed = decompressed_reader - .map_async(|mut reader| async move { - let mut vec = Vec::new(); - reader.read_to_end(&mut vec).await?; - Ok(vec) - }) - .await - .transpose() - .context(update_error::DecompressionSnafu)?; + let message_reader = capnp::serialize::read_message_from_flat_slice_no_alloc( + &mut bytes.as_ref(), + Default::default(), + ) + .context(update_error::DeserializeSnafu)?; let mut message = TypedBuilder::::new_default(); - let ret = if let Some(mut bytes) = decompressed.as_deref() { - let message_reader = capnp::serialize::read_message_from_flat_slice_no_alloc( - &mut bytes, - Default::default(), - ) + let user_data = message_reader + .get_root() .context(update_error::DeserializeSnafu)?; - let user_data = message_reader - .get_root() - .context(update_error::DeserializeSnafu)?; + message + .set_root(user_data) + .context(update_error::DeserializeSnafu)?; - message - .set_root(user_data) - .context(update_error::DeserializeSnafu)?; + let ret = f( + message.get_root().unwrap(), // this is logically impossible + ); - f( - message.get_root().unwrap(), // this is logically impossible - ) - } else { - f(message.init_root()) - }; - - let mut buffer = Vec::new(); - capnp::serialize::write_message(&mut buffer, message.borrow_inner()) + let mut vec = Vec::new(); + capnp::serialize::write_message(&mut vec, message.borrow_inner()) .context(update_error::SerializeSnafu)?; - let compressed_writer = self - .operator - .writer(&path) - .await - .context(update_error::WriterSnafu)? - .into_futures_async_write(); + let bytes = vec.into(); - let mut decompressed_writer = BrotliEncoder::new(compressed_writer); - - decompressed_writer - .write_all(&buffer) + self.write(id, bytes) .await .context(update_error::WriteSnafu)?; - decompressed_writer - .close() - .await - .context(update_error::FinalizeSnafu)?; - Ok(ret) } }