diff --git a/src/bot_data.rs b/src/bot_data.rs new file mode 100644 index 0000000..ce255d4 --- /dev/null +++ b/src/bot_data.rs @@ -0,0 +1,175 @@ +use async_compression::futures::{bufread::BrotliDecoder, write::BrotliEncoder}; +use capnp::message::TypedBuilder; +use futures::{AsyncReadExt, AsyncWriteExt}; +use opendal::Operator; +use snafu::{ResultExt as _, Snafu}; + +use crate::{OperatorExt, bot_capnp, option_ext::OptionExt as _}; + +#[derive(Debug, Clone)] +pub struct BotDataManager { + operator: Operator, +} + +impl BotDataManager { + pub fn new(operator: Operator) -> Self { + Self { operator } + } +} + +const PATH: &str = "data.bin.brotli"; + +#[derive(Debug, Snafu)] +#[snafu(module)] +pub enum WithError { + /// couldn't read data for this bot from the storage operator + ReadError { source: opendal::Error }, + + /// couldn't decompress the bot data from storage + DecompressionError { source: std::io::Error }, + + /// couldn't deserialize the bot data + DeserializeError { source: capnp::Error }, +} + +impl BotDataManager { + pub async fn with( + &self, + f: impl FnOnce(bot_capnp::bot::Reader<'_>) -> R, + ) -> Result { + let compressed_buffer = self + .operator + .async_reader_if_exists(PATH) + .await + .context(with_error::ReadSnafu)?; + + let decompressed_reader = compressed_buffer.map(BrotliDecoder::new); + let decompressed = decompressed_reader + .map_async(|mut reader| async move { + let mut vec = Vec::new(); + reader.read_to_end(&mut vec).await?; + Ok(vec) + }) + .await + .transpose() + .context(with_error::DecompressionSnafu)?; + + let mut message = TypedBuilder::::new_default(); + let fallback = message.init_root(); + + let mut bot_data = fallback.into_reader(); + let message_reader; + + if let Some(mut bytes) = decompressed.as_deref() { + message_reader = capnp::serialize::read_message_from_flat_slice_no_alloc( + &mut bytes, + Default::default(), + ) + .context(with_error::DeserializeSnafu)?; + + bot_data = message_reader + .get_root() + .context(with_error::DeserializeSnafu)?; + } + + Ok(f(bot_data)) + } +} + +#[derive(Debug, Snafu)] +#[snafu(module)] +pub enum UpdateError { + /// couldn't read data for this bot from the storage operator + ReadError { source: opendal::Error }, + + /// couldn't decompress the bot data from storage + DecompressionError { source: std::io::Error }, + + /// couldn't deserialize the bot data + DeserializeError { source: capnp::Error }, + + /// couldn't serialize the (modified) bot data + SerializeError { source: capnp::Error }, + + /// couldn't create a writer for this bot data in the storage operator + WriterError { source: opendal::Error }, + + /// couldn't write (modified) data for this bot to the storage operator + WriteError { source: std::io::Error }, + + /// couldn't finalize writing (modified) data for this bot to the storage operator + FinalizeError { source: std::io::Error }, +} + +impl BotDataManager { + pub async fn update( + &self, + f: impl FnOnce(bot_capnp::bot::Builder<'_>) -> R, + ) -> Result { + let compressed_buffer = self + .operator + .async_reader_if_exists(PATH) + .await + .context(update_error::ReadSnafu)?; + + let decompressed_reader = compressed_buffer.map(BrotliDecoder::new); + let decompressed = decompressed_reader + .map_async(|mut reader| async move { + let mut vec = Vec::new(); + reader.read_to_end(&mut vec).await?; + Ok(vec) + }) + .await + .transpose() + .context(update_error::DecompressionSnafu)?; + + let mut message = TypedBuilder::::new_default(); + + let ret = if let Some(mut bytes) = decompressed.as_deref() { + let message_reader = capnp::serialize::read_message_from_flat_slice_no_alloc( + &mut bytes, + Default::default(), + ) + .context(update_error::DeserializeSnafu)?; + + let bot_data = message_reader + .get_root() + .context(update_error::DeserializeSnafu)?; + + message + .set_root(bot_data) + .context(update_error::DeserializeSnafu)?; + + f( + message.get_root().unwrap(), // this is logically impossible + ) + } else { + f(message.init_root()) + }; + + let mut buffer = Vec::new(); + capnp::serialize::write_message(&mut buffer, message.borrow_inner()) + .context(update_error::SerializeSnafu)?; + + let compressed_writer = self + .operator + .writer(PATH) + .await + .context(update_error::WriterSnafu)? + .into_futures_async_write(); + + let mut decompressed_writer = BrotliEncoder::new(compressed_writer); + + decompressed_writer + .write_all(&buffer) + .await + .context(update_error::WriteSnafu)?; + + decompressed_writer + .close() + .await + .context(update_error::FinalizeSnafu)?; + + Ok(ret) + } +} diff --git a/src/command/debug.rs b/src/command/info.rs similarity index 73% rename from src/command/debug.rs rename to src/command/info.rs index 23f64ea..07b92ac 100644 --- a/src/command/debug.rs +++ b/src/command/info.rs @@ -1,10 +1,6 @@ -use std::sync::LazyLock; - -use async_compression::futures::bufread::BrotliDecoder; -use capnp::message::ReaderOptions; -use futures::{AsyncReadExt, TryStreamExt}; -use opendal::ErrorKind; +use futures::TryStreamExt; use snafu::{OptionExt, Snafu}; +use std::sync::LazyLock; use twilight_model::{ application::{ command::{Command, CommandType}, @@ -20,11 +16,10 @@ use twilight_util::builder::{ embed::{EmbedAuthorBuilder, EmbedBuilder, EmbedFieldBuilder}, }; -use crate::{bot_capnp, command::State}; +use crate::command::State; -const NAME: &str = "debug"; -const DESCRIPTION: &str = - "(Only the bot owner can use this) Show various information for debugging purposes"; +const NAME: &str = "info"; +const DESCRIPTION: &str = "Show various information"; pub static COMMAND: LazyLock = LazyLock::new(|| { CommandBuilder::new(NAME, DESCRIPTION, CommandType::ChatInput) @@ -114,54 +109,22 @@ pub async fn handle(state: State, interaction: Interaction) { .await .expect("TODO"); - let heat_script_description = { - let compressed_result = state - .bot_data - .reader("data.bin.brotli") - .await - .expect("TODO") - .into_futures_async_read(..) - .await; - - let mut buf = Vec::default(); - let mut message = capnp::message::TypedBuilder::::new_default(); - let fallback = message.init_root(); - - let message_reader; - let mut bot_data = fallback.into_reader(); - match compressed_result { - Ok(compressed) => { - let mut decompressed = BrotliDecoder::new(compressed); - decompressed.read_to_end(&mut buf).await.expect("TODO"); - - message_reader = - capnp::serialize_packed::read_message(&*buf, ReaderOptions::default()) - .expect("TODO"); - - bot_data = message_reader - .get_root::() - .expect("TODO"); - } - Err(error) if error.kind() == ErrorKind::NotFound => { - tracing::error!("TODO: proceeding with fallback"); - } - Err(error) => { - tracing::error!(?error, "TODO"); - - return; - } - } - - let heat_script_option = bot_data - .has_heat_script() - .then(|| bot_data.get_heat_script().expect("TODO")); - let heat_script_option = - heat_script_option.map(|heat_script| heat_script.to_str().expect("TODO")); - - heat_script_option.map_or("none set yet".into(), |heat_script| { - format!("```\n{heat_script}\n```") + let heat_script_description = state + .bot_data_manager + .with(|bot_data| { + let heat_script_option = bot_data.has_heat_script().then(|| { + bot_data + .get_heat_script() + .expect("TODO") + .to_string() + .expect("TODO") + }); + heat_script_option.map_or("none set yet".into(), |heat_script| { + format!("```\n{heat_script}\n```") + }) }) - }; + .await + .expect("TODO"); state .discord_client diff --git a/src/command/mod.rs b/src/command/mod.rs index d65bf5e..af98444 100644 --- a/src/command/mod.rs +++ b/src/command/mod.rs @@ -16,9 +16,9 @@ use twilight_model::{ }, }; -use crate::{GuildVoiceChannelToTextChannel, UserDataManager, VCs}; +use crate::{BotDataManager, GuildVoiceChannelToTextChannel, UserDataManager, VCs}; -pub mod debug; +pub mod info; pub mod join; pub mod leave; pub mod opt_in; @@ -28,7 +28,7 @@ pub mod opt_out; pub struct State { pub audio_channels: Channels, pub audio_sample_rate: SampleRate, - pub bot_data: Operator, + pub bot_data_manager: BotDataManager, pub cancellation_token: CancellationToken, pub discord_application_id: Id, pub discord_bot_owner_user_id: Id, @@ -58,7 +58,7 @@ where pub fn all() -> Vec<(&'static Command, BoxedHandler)> { vec![ - (&debug::COMMAND, box_handler(debug::handle)), + (&info::COMMAND, box_handler(info::handle)), (&join::COMMAND, box_handler(join::handle)), (&leave::COMMAND, box_handler(leave::handle)), (&opt_in::COMMAND, box_handler(opt_in::handle)), diff --git a/src/lib.rs b/src/lib.rs index 065b87f..0189097 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,3 +1,4 @@ +mod bot_data; pub mod command; mod one_to_many; mod one_to_many_with_data; @@ -11,6 +12,7 @@ mod vc_user; capnp::generated_code!(mod bot_capnp); capnp::generated_code!(mod user_capnp); +pub use bot_data::BotDataManager; pub use command::{Router as CommandRouter, State, all as all_commands}; pub use one_to_many::OneToManyUniqueBTreeMap; pub use one_to_many_with_data::OneToManyUniqueBTreeMapWithData; diff --git a/src/main.rs b/src/main.rs index 94edcdf..464f2d6 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,7 +1,7 @@ use clap::Parser; use fomo_reducer::{ - CommandRouter, GuildVoiceChannelToTextChannel, State, Storage, UserDataManager, all_commands, - command, initialize_vcs, update_vcs, + BotDataManager, CommandRouter, GuildVoiceChannelToTextChannel, State, Storage, UserDataManager, + all_commands, command, initialize_vcs, update_vcs, }; use secrecy::{ExposeSecret, SecretString}; use snafu::{OptionExt, ResultExt, Snafu}; @@ -322,6 +322,7 @@ async fn main() -> Result<(), MainError> { let recording_data = recording_data.into_inner(); let user_data = user_data.into_inner(); + let bot_data_manager = BotDataManager::new(bot_data); let user_data_manager = UserDataManager::new(user_data); let discord_voice_channel_corresponding_text_channel = { @@ -343,7 +344,7 @@ async fn main() -> Result<(), MainError> { let state = State { audio_channels, audio_sample_rate, - bot_data, + bot_data_manager, cancellation_token: cancellation_token.clone(), discord_application_id, discord_bot_owner_user_id,