Compare commits
2 Commits
2c0d5c8479
...
48e92d1736
| Author | SHA1 | Date | |
|---|---|---|---|
|
48e92d1736
|
|||
|
733e8f73ea
|
175
src/bot_data.rs
Normal file
175
src/bot_data.rs
Normal file
@@ -0,0 +1,175 @@
|
||||
use async_compression::futures::{bufread::BrotliDecoder, write::BrotliEncoder};
|
||||
use capnp::message::TypedBuilder;
|
||||
use futures::{AsyncReadExt, AsyncWriteExt};
|
||||
use opendal::Operator;
|
||||
use snafu::{ResultExt as _, Snafu};
|
||||
|
||||
use crate::{OperatorExt, bot_capnp, option_ext::OptionExt as _};
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct BotDataManager {
|
||||
operator: Operator,
|
||||
}
|
||||
|
||||
impl BotDataManager {
|
||||
pub fn new(operator: Operator) -> Self {
|
||||
Self { operator }
|
||||
}
|
||||
}
|
||||
|
||||
const PATH: &str = "data.bin.brotli";
|
||||
|
||||
#[derive(Debug, Snafu)]
|
||||
#[snafu(module)]
|
||||
pub enum WithError {
|
||||
/// couldn't read data for this bot from the storage operator
|
||||
ReadError { source: opendal::Error },
|
||||
|
||||
/// couldn't decompress the bot data from storage
|
||||
DecompressionError { source: std::io::Error },
|
||||
|
||||
/// couldn't deserialize the bot data
|
||||
DeserializeError { source: capnp::Error },
|
||||
}
|
||||
|
||||
impl BotDataManager {
|
||||
pub async fn with<R>(
|
||||
&self,
|
||||
f: impl FnOnce(bot_capnp::bot::Reader<'_>) -> R,
|
||||
) -> Result<R, WithError> {
|
||||
let compressed_buffer = self
|
||||
.operator
|
||||
.async_reader_if_exists(PATH)
|
||||
.await
|
||||
.context(with_error::ReadSnafu)?;
|
||||
|
||||
let decompressed_reader = compressed_buffer.map(BrotliDecoder::new);
|
||||
let decompressed = decompressed_reader
|
||||
.map_async(|mut reader| async move {
|
||||
let mut vec = Vec::new();
|
||||
reader.read_to_end(&mut vec).await?;
|
||||
Ok(vec)
|
||||
})
|
||||
.await
|
||||
.transpose()
|
||||
.context(with_error::DecompressionSnafu)?;
|
||||
|
||||
let mut message = TypedBuilder::<bot_capnp::bot::Owned>::new_default();
|
||||
let fallback = message.init_root();
|
||||
|
||||
let mut bot_data = fallback.into_reader();
|
||||
let message_reader;
|
||||
|
||||
if let Some(mut bytes) = decompressed.as_deref() {
|
||||
message_reader = capnp::serialize::read_message_from_flat_slice_no_alloc(
|
||||
&mut bytes,
|
||||
Default::default(),
|
||||
)
|
||||
.context(with_error::DeserializeSnafu)?;
|
||||
|
||||
bot_data = message_reader
|
||||
.get_root()
|
||||
.context(with_error::DeserializeSnafu)?;
|
||||
}
|
||||
|
||||
Ok(f(bot_data))
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Snafu)]
|
||||
#[snafu(module)]
|
||||
pub enum UpdateError {
|
||||
/// couldn't read data for this bot from the storage operator
|
||||
ReadError { source: opendal::Error },
|
||||
|
||||
/// couldn't decompress the bot data from storage
|
||||
DecompressionError { source: std::io::Error },
|
||||
|
||||
/// couldn't deserialize the bot data
|
||||
DeserializeError { source: capnp::Error },
|
||||
|
||||
/// couldn't serialize the (modified) bot data
|
||||
SerializeError { source: capnp::Error },
|
||||
|
||||
/// couldn't create a writer for this bot data in the storage operator
|
||||
WriterError { source: opendal::Error },
|
||||
|
||||
/// couldn't write (modified) data for this bot to the storage operator
|
||||
WriteError { source: std::io::Error },
|
||||
|
||||
/// couldn't finalize writing (modified) data for this bot to the storage operator
|
||||
FinalizeError { source: std::io::Error },
|
||||
}
|
||||
|
||||
impl BotDataManager {
|
||||
pub async fn update<R>(
|
||||
&self,
|
||||
f: impl FnOnce(bot_capnp::bot::Builder<'_>) -> R,
|
||||
) -> Result<R, UpdateError> {
|
||||
let compressed_buffer = self
|
||||
.operator
|
||||
.async_reader_if_exists(PATH)
|
||||
.await
|
||||
.context(update_error::ReadSnafu)?;
|
||||
|
||||
let decompressed_reader = compressed_buffer.map(BrotliDecoder::new);
|
||||
let decompressed = decompressed_reader
|
||||
.map_async(|mut reader| async move {
|
||||
let mut vec = Vec::new();
|
||||
reader.read_to_end(&mut vec).await?;
|
||||
Ok(vec)
|
||||
})
|
||||
.await
|
||||
.transpose()
|
||||
.context(update_error::DecompressionSnafu)?;
|
||||
|
||||
let mut message = TypedBuilder::<bot_capnp::bot::Owned>::new_default();
|
||||
|
||||
let ret = if let Some(mut bytes) = decompressed.as_deref() {
|
||||
let message_reader = capnp::serialize::read_message_from_flat_slice_no_alloc(
|
||||
&mut bytes,
|
||||
Default::default(),
|
||||
)
|
||||
.context(update_error::DeserializeSnafu)?;
|
||||
|
||||
let bot_data = message_reader
|
||||
.get_root()
|
||||
.context(update_error::DeserializeSnafu)?;
|
||||
|
||||
message
|
||||
.set_root(bot_data)
|
||||
.context(update_error::DeserializeSnafu)?;
|
||||
|
||||
f(
|
||||
message.get_root().unwrap(), // this is logically impossible
|
||||
)
|
||||
} else {
|
||||
f(message.init_root())
|
||||
};
|
||||
|
||||
let mut buffer = Vec::new();
|
||||
capnp::serialize::write_message(&mut buffer, message.borrow_inner())
|
||||
.context(update_error::SerializeSnafu)?;
|
||||
|
||||
let compressed_writer = self
|
||||
.operator
|
||||
.writer(PATH)
|
||||
.await
|
||||
.context(update_error::WriterSnafu)?
|
||||
.into_futures_async_write();
|
||||
|
||||
let mut decompressed_writer = BrotliEncoder::new(compressed_writer);
|
||||
|
||||
decompressed_writer
|
||||
.write_all(&buffer)
|
||||
.await
|
||||
.context(update_error::WriteSnafu)?;
|
||||
|
||||
decompressed_writer
|
||||
.close()
|
||||
.await
|
||||
.context(update_error::FinalizeSnafu)?;
|
||||
|
||||
Ok(ret)
|
||||
}
|
||||
}
|
||||
@@ -1,10 +1,6 @@
|
||||
use std::sync::LazyLock;
|
||||
|
||||
use async_compression::futures::bufread::BrotliDecoder;
|
||||
use capnp::message::ReaderOptions;
|
||||
use futures::{AsyncReadExt, TryStreamExt};
|
||||
use opendal::ErrorKind;
|
||||
use futures::TryStreamExt;
|
||||
use snafu::{OptionExt, Snafu};
|
||||
use std::sync::LazyLock;
|
||||
use twilight_model::{
|
||||
application::{
|
||||
command::{Command, CommandType},
|
||||
@@ -20,11 +16,10 @@ use twilight_util::builder::{
|
||||
embed::{EmbedAuthorBuilder, EmbedBuilder, EmbedFieldBuilder},
|
||||
};
|
||||
|
||||
use crate::{bot_capnp, command::State};
|
||||
use crate::command::State;
|
||||
|
||||
const NAME: &str = "debug";
|
||||
const DESCRIPTION: &str =
|
||||
"(Only the bot owner can use this) Show various information for debugging purposes";
|
||||
const NAME: &str = "info";
|
||||
const DESCRIPTION: &str = "Show various information";
|
||||
|
||||
pub static COMMAND: LazyLock<Command> = LazyLock::new(|| {
|
||||
CommandBuilder::new(NAME, DESCRIPTION, CommandType::ChatInput)
|
||||
@@ -114,54 +109,22 @@ pub async fn handle(state: State, interaction: Interaction) {
|
||||
.await
|
||||
.expect("TODO");
|
||||
|
||||
let heat_script_description = {
|
||||
let compressed_result = state
|
||||
.bot_data
|
||||
.reader("data.bin.brotli")
|
||||
.await
|
||||
.expect("TODO")
|
||||
.into_futures_async_read(..)
|
||||
.await;
|
||||
|
||||
let mut buf = Vec::default();
|
||||
let mut message = capnp::message::TypedBuilder::<bot_capnp::bot::Owned>::new_default();
|
||||
let fallback = message.init_root();
|
||||
|
||||
let message_reader;
|
||||
let mut bot_data = fallback.into_reader();
|
||||
match compressed_result {
|
||||
Ok(compressed) => {
|
||||
let mut decompressed = BrotliDecoder::new(compressed);
|
||||
decompressed.read_to_end(&mut buf).await.expect("TODO");
|
||||
|
||||
message_reader =
|
||||
capnp::serialize_packed::read_message(&*buf, ReaderOptions::default())
|
||||
.expect("TODO");
|
||||
|
||||
bot_data = message_reader
|
||||
.get_root::<bot_capnp::bot::Reader>()
|
||||
.expect("TODO");
|
||||
}
|
||||
Err(error) if error.kind() == ErrorKind::NotFound => {
|
||||
tracing::error!("TODO: proceeding with fallback");
|
||||
}
|
||||
Err(error) => {
|
||||
tracing::error!(?error, "TODO");
|
||||
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
let heat_script_option = bot_data
|
||||
.has_heat_script()
|
||||
.then(|| bot_data.get_heat_script().expect("TODO"));
|
||||
let heat_script_option =
|
||||
heat_script_option.map(|heat_script| heat_script.to_str().expect("TODO"));
|
||||
|
||||
heat_script_option.map_or("none set yet".into(), |heat_script| {
|
||||
format!("```\n{heat_script}\n```")
|
||||
let heat_script_description = state
|
||||
.bot_data_manager
|
||||
.with(|bot_data| {
|
||||
let heat_script_option = bot_data.has_heat_script().then(|| {
|
||||
bot_data
|
||||
.get_heat_script()
|
||||
.expect("TODO")
|
||||
.to_string()
|
||||
.expect("TODO")
|
||||
});
|
||||
heat_script_option.map_or("none set yet".into(), |heat_script| {
|
||||
format!("```\n{heat_script}\n```")
|
||||
})
|
||||
})
|
||||
};
|
||||
.await
|
||||
.expect("TODO");
|
||||
|
||||
state
|
||||
.discord_client
|
||||
@@ -16,9 +16,9 @@ use twilight_model::{
|
||||
},
|
||||
};
|
||||
|
||||
use crate::{GuildVoiceChannelToTextChannel, UserDataManager, VCs};
|
||||
use crate::{BotDataManager, GuildVoiceChannelToTextChannel, UserDataManager, VCs};
|
||||
|
||||
pub mod debug;
|
||||
pub mod info;
|
||||
pub mod join;
|
||||
pub mod leave;
|
||||
pub mod opt_in;
|
||||
@@ -28,7 +28,7 @@ pub mod opt_out;
|
||||
pub struct State {
|
||||
pub audio_channels: Channels,
|
||||
pub audio_sample_rate: SampleRate,
|
||||
pub bot_data: Operator,
|
||||
pub bot_data_manager: BotDataManager,
|
||||
pub cancellation_token: CancellationToken,
|
||||
pub discord_application_id: Id<ApplicationMarker>,
|
||||
pub discord_bot_owner_user_id: Id<UserMarker>,
|
||||
@@ -58,7 +58,7 @@ where
|
||||
|
||||
pub fn all() -> Vec<(&'static Command, BoxedHandler)> {
|
||||
vec![
|
||||
(&debug::COMMAND, box_handler(debug::handle)),
|
||||
(&info::COMMAND, box_handler(info::handle)),
|
||||
(&join::COMMAND, box_handler(join::handle)),
|
||||
(&leave::COMMAND, box_handler(leave::handle)),
|
||||
(&opt_in::COMMAND, box_handler(opt_in::handle)),
|
||||
|
||||
@@ -5,6 +5,7 @@ use twilight_model::{
|
||||
command::{Command, CommandType},
|
||||
interaction::Interaction,
|
||||
},
|
||||
channel::message::MessageFlags,
|
||||
http::interaction::{InteractionResponse, InteractionResponseType},
|
||||
};
|
||||
use twilight_util::builder::{InteractionResponseDataBuilder, command::CommandBuilder};
|
||||
@@ -79,6 +80,7 @@ pub async fn handle(state: State, interaction: Interaction) {
|
||||
.content(format!(
|
||||
"opted you in, your previous consent was {previous_consent:?}"
|
||||
))
|
||||
.flags(MessageFlags::EPHEMERAL)
|
||||
.build(),
|
||||
),
|
||||
},
|
||||
|
||||
@@ -5,6 +5,7 @@ use twilight_model::{
|
||||
command::{Command, CommandType},
|
||||
interaction::Interaction,
|
||||
},
|
||||
channel::message::MessageFlags,
|
||||
http::interaction::{InteractionResponse, InteractionResponseType},
|
||||
};
|
||||
use twilight_util::builder::{InteractionResponseDataBuilder, command::CommandBuilder};
|
||||
@@ -79,6 +80,7 @@ pub async fn handle(state: State, interaction: Interaction) {
|
||||
.content(format!(
|
||||
"opted you out, your previous consent was {previous_consent:?}"
|
||||
))
|
||||
.flags(MessageFlags::EPHEMERAL)
|
||||
.build(),
|
||||
),
|
||||
},
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
mod bot_data;
|
||||
pub mod command;
|
||||
mod one_to_many;
|
||||
mod one_to_many_with_data;
|
||||
@@ -11,6 +12,7 @@ mod vc_user;
|
||||
capnp::generated_code!(mod bot_capnp);
|
||||
capnp::generated_code!(mod user_capnp);
|
||||
|
||||
pub use bot_data::BotDataManager;
|
||||
pub use command::{Router as CommandRouter, State, all as all_commands};
|
||||
pub use one_to_many::OneToManyUniqueBTreeMap;
|
||||
pub use one_to_many_with_data::OneToManyUniqueBTreeMapWithData;
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use clap::Parser;
|
||||
use fomo_reducer::{
|
||||
CommandRouter, GuildVoiceChannelToTextChannel, State, Storage, UserDataManager, all_commands,
|
||||
command, initialize_vcs, update_vcs,
|
||||
BotDataManager, CommandRouter, GuildVoiceChannelToTextChannel, State, Storage, UserDataManager,
|
||||
all_commands, command, initialize_vcs, update_vcs,
|
||||
};
|
||||
use secrecy::{ExposeSecret, SecretString};
|
||||
use snafu::{OptionExt, ResultExt, Snafu};
|
||||
@@ -322,6 +322,7 @@ async fn main() -> Result<(), MainError> {
|
||||
let recording_data = recording_data.into_inner();
|
||||
let user_data = user_data.into_inner();
|
||||
|
||||
let bot_data_manager = BotDataManager::new(bot_data);
|
||||
let user_data_manager = UserDataManager::new(user_data);
|
||||
|
||||
let discord_voice_channel_corresponding_text_channel = {
|
||||
@@ -343,7 +344,7 @@ async fn main() -> Result<(), MainError> {
|
||||
let state = State {
|
||||
audio_channels,
|
||||
audio_sample_rate,
|
||||
bot_data,
|
||||
bot_data_manager,
|
||||
cancellation_token: cancellation_token.clone(),
|
||||
discord_application_id,
|
||||
discord_bot_owner_user_id,
|
||||
|
||||
Reference in New Issue
Block a user