use async_compression::futures::{bufread::BrotliDecoder, write::BrotliEncoder}; use capnp::message::TypedBuilder; use futures::{AsyncReadExt, AsyncWriteExt}; use opendal::Operator; use snafu::{ResultExt as _, Snafu}; use crate::{OperatorExt, bot_capnp, option_ext::OptionExt as _}; #[derive(Debug, Clone)] pub struct BotDataManager { operator: Operator, } impl BotDataManager { pub fn new(operator: Operator) -> Self { Self { operator } } } const PATH: &str = "data.bin.brotli"; #[derive(Debug, Snafu)] #[snafu(module)] pub enum WithError { /// couldn't read data for this bot from the storage operator ReadError { source: opendal::Error }, /// couldn't decompress the bot data from storage DecompressionError { source: std::io::Error }, /// couldn't deserialize the bot data DeserializeError { source: capnp::Error }, } impl BotDataManager { pub async fn with( &self, f: impl FnOnce(bot_capnp::bot::Reader<'_>) -> R, ) -> Result { let compressed_buffer = self .operator .async_reader_if_exists(PATH) .await .context(with_error::ReadSnafu)?; let decompressed_reader = compressed_buffer.map(BrotliDecoder::new); let decompressed = decompressed_reader .map_async(|mut reader| async move { let mut vec = Vec::new(); reader.read_to_end(&mut vec).await?; Ok(vec) }) .await .transpose() .context(with_error::DecompressionSnafu)?; let mut message = TypedBuilder::::new_default(); let fallback = message.init_root(); let mut bot_data = fallback.into_reader(); let message_reader; if let Some(mut bytes) = decompressed.as_deref() { message_reader = capnp::serialize::read_message_from_flat_slice_no_alloc( &mut bytes, Default::default(), ) .context(with_error::DeserializeSnafu)?; bot_data = message_reader .get_root() .context(with_error::DeserializeSnafu)?; } Ok(f(bot_data)) } } #[derive(Debug, Snafu)] #[snafu(module)] pub enum UpdateError { /// couldn't read data for this bot from the storage operator ReadError { source: opendal::Error }, /// couldn't decompress the bot data from storage DecompressionError { source: std::io::Error }, /// couldn't deserialize the bot data DeserializeError { source: capnp::Error }, /// couldn't serialize the (modified) bot data SerializeError { source: capnp::Error }, /// couldn't create a writer for this bot data in the storage operator WriterError { source: opendal::Error }, /// couldn't write (modified) data for this bot to the storage operator WriteError { source: std::io::Error }, /// couldn't finalize writing (modified) data for this bot to the storage operator FinalizeError { source: std::io::Error }, } impl BotDataManager { pub async fn update( &self, f: impl FnOnce(bot_capnp::bot::Builder<'_>) -> R, ) -> Result { let compressed_buffer = self .operator .async_reader_if_exists(PATH) .await .context(update_error::ReadSnafu)?; let decompressed_reader = compressed_buffer.map(BrotliDecoder::new); let decompressed = decompressed_reader .map_async(|mut reader| async move { let mut vec = Vec::new(); reader.read_to_end(&mut vec).await?; Ok(vec) }) .await .transpose() .context(update_error::DecompressionSnafu)?; let mut message = TypedBuilder::::new_default(); let ret = if let Some(mut bytes) = decompressed.as_deref() { let message_reader = capnp::serialize::read_message_from_flat_slice_no_alloc( &mut bytes, Default::default(), ) .context(update_error::DeserializeSnafu)?; let bot_data = message_reader .get_root() .context(update_error::DeserializeSnafu)?; message .set_root(bot_data) .context(update_error::DeserializeSnafu)?; f( message.get_root().unwrap(), // this is logically impossible ) } else { f(message.init_root()) }; let mut buffer = Vec::new(); capnp::serialize::write_message(&mut buffer, message.borrow_inner()) .context(update_error::SerializeSnafu)?; let compressed_writer = self .operator .writer(PATH) .await .context(update_error::WriterSnafu)? .into_futures_async_write(); let mut decompressed_writer = BrotliEncoder::new(compressed_writer); decompressed_writer .write_all(&buffer) .await .context(update_error::WriteSnafu)?; decompressed_writer .close() .await .context(update_error::FinalizeSnafu)?; Ok(ret) } }