feat: cache user data in memory to make access faster, simplifies code
This commit is contained in:
232
src/user_data.rs
232
src/user_data.rs
@@ -1,8 +1,10 @@
|
|||||||
use std::str::FromStr;
|
use std::{str::FromStr, sync::Arc};
|
||||||
|
|
||||||
use async_compression::futures::{bufread::BrotliDecoder, write::BrotliEncoder};
|
use async_compression::futures::{bufread::BrotliDecoder, write::BrotliEncoder};
|
||||||
|
use bytes::Bytes;
|
||||||
use capnp::message::TypedBuilder;
|
use capnp::message::TypedBuilder;
|
||||||
use futures::{AsyncReadExt, AsyncWriteExt, TryStream, TryStreamExt};
|
use futures::{AsyncReadExt, AsyncWriteExt, TryStream, TryStreamExt};
|
||||||
|
use moka::future::Cache;
|
||||||
use opendal::Operator;
|
use opendal::Operator;
|
||||||
use snafu::{OptionExt as _, ResultExt as _, Snafu, ensure};
|
use snafu::{OptionExt as _, ResultExt as _, Snafu, ensure};
|
||||||
use twilight_model::id::{Id, marker::UserMarker};
|
use twilight_model::id::{Id, marker::UserMarker};
|
||||||
@@ -14,11 +16,15 @@ pub const RECORD_IF_CONSENT_UNSPECIFIED: bool = true;
|
|||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct UserDataManager {
|
pub struct UserDataManager {
|
||||||
operator: Operator,
|
operator: Operator,
|
||||||
|
cache: Cache<Id<UserMarker>, Bytes>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl UserDataManager {
|
impl UserDataManager {
|
||||||
pub fn new(operator: Operator) -> Self {
|
pub fn new(operator: Operator) -> Self {
|
||||||
Self { operator }
|
Self {
|
||||||
|
operator,
|
||||||
|
cache: Cache::builder().build(),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -97,12 +103,105 @@ impl UserDataManager {
|
|||||||
|
|
||||||
#[derive(Debug, Snafu)]
|
#[derive(Debug, Snafu)]
|
||||||
#[snafu(module)]
|
#[snafu(module)]
|
||||||
pub enum WithError {
|
pub enum ReadError {
|
||||||
/// couldn't read data for this user from the storage operator
|
/// couldn't read data for this user from the storage operator
|
||||||
ReadError { source: opendal::Error },
|
OperatorReaderError { source: opendal::Error },
|
||||||
|
|
||||||
/// couldn't decompress the user data from storage
|
/// couldn't decompress the user data from storage
|
||||||
DecompressionError { source: std::io::Error },
|
DecompressionError { source: std::io::Error },
|
||||||
|
}
|
||||||
|
|
||||||
|
impl UserDataManager {
|
||||||
|
async fn read(&self, id: Id<UserMarker>) -> Result<Bytes, ReadError> {
|
||||||
|
self.cache
|
||||||
|
.try_get_with::<_, ReadError>(id, async {
|
||||||
|
let path = path(id);
|
||||||
|
let compressed_reader = self
|
||||||
|
.operator
|
||||||
|
.async_reader_if_exists(&path)
|
||||||
|
.await
|
||||||
|
.context(read_error::OperatorReaderSnafu)?;
|
||||||
|
|
||||||
|
let decompressed_reader = compressed_reader.map(BrotliDecoder::new);
|
||||||
|
|
||||||
|
let decompressed = decompressed_reader
|
||||||
|
.map_async(|mut reader| async move {
|
||||||
|
let mut vec = Vec::new();
|
||||||
|
reader.read_to_end(&mut vec).await?;
|
||||||
|
Ok(vec)
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.transpose()
|
||||||
|
.context(read_error::DecompressionSnafu)?;
|
||||||
|
|
||||||
|
let vec = decompressed.unwrap_or_else(|| {
|
||||||
|
let mut fallback = TypedBuilder::<user_capnp::user::Owned>::new_default();
|
||||||
|
fallback.init_root();
|
||||||
|
|
||||||
|
let mut vec = Vec::new();
|
||||||
|
capnp::serialize::write_message(&mut vec, fallback.borrow_inner())
|
||||||
|
.expect("cannot error");
|
||||||
|
vec
|
||||||
|
});
|
||||||
|
|
||||||
|
let bytes = vec.into();
|
||||||
|
|
||||||
|
Ok(bytes)
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.map_err(|arc| {
|
||||||
|
Arc::into_inner(arc).expect("expected this to be the only reference to the error")
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Snafu)]
|
||||||
|
#[snafu(module)]
|
||||||
|
pub enum WriteError {
|
||||||
|
/// couldn't create a writer for this user data in the storage operator
|
||||||
|
OperatorWriterError { source: opendal::Error },
|
||||||
|
|
||||||
|
/// couldn't write (modified) data for this user to the storage operator
|
||||||
|
WriteError { source: std::io::Error },
|
||||||
|
|
||||||
|
/// couldn't finalize writing (modified) data for this user to the storage operator
|
||||||
|
FinalizeError { source: std::io::Error },
|
||||||
|
}
|
||||||
|
|
||||||
|
impl UserDataManager {
|
||||||
|
async fn write(&self, id: Id<UserMarker>, bytes: Bytes) -> Result<(), WriteError> {
|
||||||
|
let path = path(id);
|
||||||
|
|
||||||
|
let compressed_writer = self
|
||||||
|
.operator
|
||||||
|
.writer(&path)
|
||||||
|
.await
|
||||||
|
.context(write_error::OperatorWriterSnafu)?
|
||||||
|
.into_futures_async_write();
|
||||||
|
|
||||||
|
let mut decompressed_writer = BrotliEncoder::new(compressed_writer);
|
||||||
|
|
||||||
|
decompressed_writer
|
||||||
|
.write_all(&bytes)
|
||||||
|
.await
|
||||||
|
.context(write_error::WriteSnafu)?;
|
||||||
|
|
||||||
|
decompressed_writer
|
||||||
|
.close()
|
||||||
|
.await
|
||||||
|
.context(write_error::FinalizeSnafu)?;
|
||||||
|
|
||||||
|
self.cache.insert(id, bytes).await;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Snafu)]
|
||||||
|
#[snafu(module)]
|
||||||
|
pub enum WithError {
|
||||||
|
/// couldn't read the user data from storage
|
||||||
|
ReadError { source: ReadError },
|
||||||
|
|
||||||
/// couldn't deserialize the user data
|
/// couldn't deserialize the user data
|
||||||
DeserializeError { source: capnp::Error },
|
DeserializeError { source: capnp::Error },
|
||||||
@@ -114,41 +213,18 @@ impl UserDataManager {
|
|||||||
id: Id<UserMarker>,
|
id: Id<UserMarker>,
|
||||||
f: impl FnOnce(user_capnp::user::Reader<'_>) -> R,
|
f: impl FnOnce(user_capnp::user::Reader<'_>) -> R,
|
||||||
) -> Result<R, WithError> {
|
) -> Result<R, WithError> {
|
||||||
let compressed_buffer = self
|
let message_bytes = self.read(id).await.context(with_error::ReadSnafu)?;
|
||||||
.operator
|
|
||||||
.async_reader_if_exists(&path(id))
|
|
||||||
.await
|
|
||||||
.context(with_error::ReadSnafu)?;
|
|
||||||
|
|
||||||
let decompressed_reader = compressed_buffer.map(BrotliDecoder::new);
|
let message_reader = capnp::serialize::read_message_from_flat_slice_no_alloc(
|
||||||
let decompressed = decompressed_reader
|
&mut message_bytes.as_ref(),
|
||||||
.map_async(|mut reader| async move {
|
Default::default(),
|
||||||
let mut vec = Vec::new();
|
)
|
||||||
reader.read_to_end(&mut vec).await?;
|
.context(with_error::DeserializeSnafu)?;
|
||||||
Ok(vec)
|
|
||||||
})
|
|
||||||
.await
|
|
||||||
.transpose()
|
|
||||||
.context(with_error::DecompressionSnafu)?;
|
|
||||||
|
|
||||||
let mut message = TypedBuilder::<user_capnp::user::Owned>::new_default();
|
let user_data = message_reader
|
||||||
let fallback = message.init_root();
|
.get_root()
|
||||||
|
|
||||||
let mut user_data = fallback.into_reader();
|
|
||||||
let message_reader;
|
|
||||||
|
|
||||||
if let Some(mut bytes) = decompressed.as_deref() {
|
|
||||||
message_reader = capnp::serialize::read_message_from_flat_slice_no_alloc(
|
|
||||||
&mut bytes,
|
|
||||||
Default::default(),
|
|
||||||
)
|
|
||||||
.context(with_error::DeserializeSnafu)?;
|
.context(with_error::DeserializeSnafu)?;
|
||||||
|
|
||||||
user_data = message_reader
|
|
||||||
.get_root()
|
|
||||||
.context(with_error::DeserializeSnafu)?;
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(f(user_data))
|
Ok(f(user_data))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -156,11 +232,8 @@ impl UserDataManager {
|
|||||||
#[derive(Debug, Snafu)]
|
#[derive(Debug, Snafu)]
|
||||||
#[snafu(module)]
|
#[snafu(module)]
|
||||||
pub enum UpdateError {
|
pub enum UpdateError {
|
||||||
/// couldn't read data for this user from the storage operator
|
/// couldn't read the user data from storage
|
||||||
ReadError { source: opendal::Error },
|
ReadError { source: ReadError },
|
||||||
|
|
||||||
/// couldn't decompress the user data from storage
|
|
||||||
DecompressionError { source: std::io::Error },
|
|
||||||
|
|
||||||
/// couldn't deserialize the user data
|
/// couldn't deserialize the user data
|
||||||
DeserializeError { source: capnp::Error },
|
DeserializeError { source: capnp::Error },
|
||||||
@@ -168,14 +241,8 @@ pub enum UpdateError {
|
|||||||
/// couldn't serialize the (modified) user data
|
/// couldn't serialize the (modified) user data
|
||||||
SerializeError { source: capnp::Error },
|
SerializeError { source: capnp::Error },
|
||||||
|
|
||||||
/// couldn't create a writer for this user data in the storage operator
|
/// couldn't write the user data to storage
|
||||||
WriterError { source: opendal::Error },
|
WriteError { source: WriteError },
|
||||||
|
|
||||||
/// couldn't write (modified) data for this user to the storage operator
|
|
||||||
WriteError { source: std::io::Error },
|
|
||||||
|
|
||||||
/// couldn't finalize writing (modified) data for this user to the storage operator
|
|
||||||
FinalizeError { source: std::io::Error },
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl UserDataManager {
|
impl UserDataManager {
|
||||||
@@ -184,71 +251,38 @@ impl UserDataManager {
|
|||||||
id: Id<UserMarker>,
|
id: Id<UserMarker>,
|
||||||
f: impl FnOnce(user_capnp::user::Builder<'_>) -> R,
|
f: impl FnOnce(user_capnp::user::Builder<'_>) -> R,
|
||||||
) -> Result<R, UpdateError> {
|
) -> Result<R, UpdateError> {
|
||||||
let path = path(id);
|
let bytes = self.read(id).await.context(update_error::ReadSnafu)?;
|
||||||
let compressed_buffer = self
|
|
||||||
.operator
|
|
||||||
.async_reader_if_exists(&path)
|
|
||||||
.await
|
|
||||||
.context(update_error::ReadSnafu)?;
|
|
||||||
|
|
||||||
let decompressed_reader = compressed_buffer.map(BrotliDecoder::new);
|
let message_reader = capnp::serialize::read_message_from_flat_slice_no_alloc(
|
||||||
let decompressed = decompressed_reader
|
&mut bytes.as_ref(),
|
||||||
.map_async(|mut reader| async move {
|
Default::default(),
|
||||||
let mut vec = Vec::new();
|
)
|
||||||
reader.read_to_end(&mut vec).await?;
|
.context(update_error::DeserializeSnafu)?;
|
||||||
Ok(vec)
|
|
||||||
})
|
|
||||||
.await
|
|
||||||
.transpose()
|
|
||||||
.context(update_error::DecompressionSnafu)?;
|
|
||||||
|
|
||||||
let mut message = TypedBuilder::<user_capnp::user::Owned>::new_default();
|
let mut message = TypedBuilder::<user_capnp::user::Owned>::new_default();
|
||||||
|
|
||||||
let ret = if let Some(mut bytes) = decompressed.as_deref() {
|
let user_data = message_reader
|
||||||
let message_reader = capnp::serialize::read_message_from_flat_slice_no_alloc(
|
.get_root()
|
||||||
&mut bytes,
|
|
||||||
Default::default(),
|
|
||||||
)
|
|
||||||
.context(update_error::DeserializeSnafu)?;
|
.context(update_error::DeserializeSnafu)?;
|
||||||
|
|
||||||
let user_data = message_reader
|
message
|
||||||
.get_root()
|
.set_root(user_data)
|
||||||
.context(update_error::DeserializeSnafu)?;
|
.context(update_error::DeserializeSnafu)?;
|
||||||
|
|
||||||
message
|
let ret = f(
|
||||||
.set_root(user_data)
|
message.get_root().unwrap(), // this is logically impossible
|
||||||
.context(update_error::DeserializeSnafu)?;
|
);
|
||||||
|
|
||||||
f(
|
let mut vec = Vec::new();
|
||||||
message.get_root().unwrap(), // this is logically impossible
|
capnp::serialize::write_message(&mut vec, message.borrow_inner())
|
||||||
)
|
|
||||||
} else {
|
|
||||||
f(message.init_root())
|
|
||||||
};
|
|
||||||
|
|
||||||
let mut buffer = Vec::new();
|
|
||||||
capnp::serialize::write_message(&mut buffer, message.borrow_inner())
|
|
||||||
.context(update_error::SerializeSnafu)?;
|
.context(update_error::SerializeSnafu)?;
|
||||||
|
|
||||||
let compressed_writer = self
|
let bytes = vec.into();
|
||||||
.operator
|
|
||||||
.writer(&path)
|
|
||||||
.await
|
|
||||||
.context(update_error::WriterSnafu)?
|
|
||||||
.into_futures_async_write();
|
|
||||||
|
|
||||||
let mut decompressed_writer = BrotliEncoder::new(compressed_writer);
|
self.write(id, bytes)
|
||||||
|
|
||||||
decompressed_writer
|
|
||||||
.write_all(&buffer)
|
|
||||||
.await
|
.await
|
||||||
.context(update_error::WriteSnafu)?;
|
.context(update_error::WriteSnafu)?;
|
||||||
|
|
||||||
decompressed_writer
|
|
||||||
.close()
|
|
||||||
.await
|
|
||||||
.context(update_error::FinalizeSnafu)?;
|
|
||||||
|
|
||||||
Ok(ret)
|
Ok(ret)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user