Compare commits

...

2 Commits

Author SHA1 Message Date
2dbc0a2e87 chore: format 2026-05-06 21:40:14 -04:00
44c7aa60c2 feat: cache user data in memory to make access faster, simplifies code 2026-05-06 21:40:06 -04:00
4 changed files with 152 additions and 114 deletions

View File

@@ -26,20 +26,18 @@ pub static COMMAND: LazyLock<Command> = LazyLock::new(|| {
.build() .build()
}); });
#[tracing::instrument] #[tracing::instrument]
pub async fn handle(state: State, interaction: Interaction) { pub async fn handle(state: State, interaction: Interaction) {
let revision = build_info::COMMIT_HASH; let revision = build_info::COMMIT_HASH;
let bot_owner_user_id = state.discord_bot_owner_user_id; let bot_owner_user_id = state.discord_bot_owner_user_id;
let is_bot_owner = let is_bot_owner = interaction
interaction .member
.member .as_ref()
.as_ref() .and_then(|member| member.user.as_ref().map(|user| user.id))
.and_then(|member| member.user.as_ref().map(|user| user.id)) .map(|user_id| user_id == bot_owner_user_id)
.map(|user_id| user_id == bot_owner_user_id) .unwrap_or(false);
.unwrap_or(false);
let bot_owner_mention = format!("<@{}>", bot_owner_user_id); let bot_owner_mention = format!("<@{}>", bot_owner_user_id);

View File

@@ -1,6 +1,7 @@
use clap::Parser; use clap::Parser;
use fomo_reducer::{ use fomo_reducer::{
BotDataManager, CommandRouter, GuildVoiceChannelToTextChannel, State, Storage, UserDataManager, VCsWatcher, all_commands, command, initialize_vcs, update_vcs BotDataManager, CommandRouter, GuildVoiceChannelToTextChannel, State, Storage, UserDataManager,
VCsWatcher, all_commands, command, initialize_vcs, update_vcs,
}; };
use secrecy::{ExposeSecret, SecretString}; use secrecy::{ExposeSecret, SecretString};
use snafu::{OptionExt, ResultExt, Snafu}; use snafu::{OptionExt, ResultExt, Snafu};
@@ -461,7 +462,9 @@ async fn handle_event(command_router: Arc<CommandRouter>, state: State, event: E
match event { match event {
Event::VoiceStateUpdate(voice_state_update) => { Event::VoiceStateUpdate(voice_state_update) => {
state.vcs_watcher.send_modify(|vcs| update_vcs(&voice_state_update, vcs)); state
.vcs_watcher
.send_modify(|vcs| update_vcs(&voice_state_update, vcs));
} }
Event::InteractionCreate(interaction_create) => { Event::InteractionCreate(interaction_create) => {
let InteractionCreate(interaction) = *interaction_create; let InteractionCreate(interaction) = *interaction_create;

View File

@@ -15,7 +15,8 @@ use crate::{OneToManyUniqueBTreeMapWithData, OneToOneBTreeMap, UserInVCData, Voi
pub type GuildVoiceChannelToTextChannel = pub type GuildVoiceChannelToTextChannel =
BTreeMap<Id<GuildMarker>, OneToOneBTreeMap<Id<ChannelMarker>, Id<ChannelMarker>>>; BTreeMap<Id<GuildMarker>, OneToOneBTreeMap<Id<ChannelMarker>, Id<ChannelMarker>>>;
pub type VCsInGuild = OneToManyUniqueBTreeMapWithData<Id<ChannelMarker>, Id<UserMarker>, UserInVCData>; pub type VCsInGuild =
OneToManyUniqueBTreeMapWithData<Id<ChannelMarker>, Id<UserMarker>, UserInVCData>;
pub type VCs = BTreeMap<Id<GuildMarker>, VCsInGuild>; pub type VCs = BTreeMap<Id<GuildMarker>, VCsInGuild>;
pub type VCsWatcher = watch::Sender<VCs>; pub type VCsWatcher = watch::Sender<VCs>;
@@ -108,7 +109,9 @@ pub fn update_vcs(voice_state_update: &VoiceStateUpdate, vcs: &mut VCs) {
.build(); .build();
let user_in_vc_data = voice_status.into(); let user_in_vc_data = voice_status.into();
vcs.entry(guild_id).or_default().insert(channel_id, user_id, user_in_vc_data); vcs.entry(guild_id)
.or_default()
.insert(channel_id, user_id, user_in_vc_data);
tracing::info!( tracing::info!(
?guild_id, ?guild_id,

View File

@@ -1,8 +1,10 @@
use std::str::FromStr; use std::{str::FromStr, sync::Arc};
use async_compression::futures::{bufread::BrotliDecoder, write::BrotliEncoder}; use async_compression::futures::{bufread::BrotliDecoder, write::BrotliEncoder};
use bytes::Bytes;
use capnp::message::TypedBuilder; use capnp::message::TypedBuilder;
use futures::{AsyncReadExt, AsyncWriteExt, TryStream, TryStreamExt}; use futures::{AsyncReadExt, AsyncWriteExt, TryStream, TryStreamExt};
use moka::future::Cache;
use opendal::Operator; use opendal::Operator;
use snafu::{OptionExt as _, ResultExt as _, Snafu, ensure}; use snafu::{OptionExt as _, ResultExt as _, Snafu, ensure};
use twilight_model::id::{Id, marker::UserMarker}; use twilight_model::id::{Id, marker::UserMarker};
@@ -14,11 +16,15 @@ pub const RECORD_IF_CONSENT_UNSPECIFIED: bool = true;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct UserDataManager { pub struct UserDataManager {
operator: Operator, operator: Operator,
cache: Cache<Id<UserMarker>, Bytes>,
} }
impl UserDataManager { impl UserDataManager {
pub fn new(operator: Operator) -> Self { pub fn new(operator: Operator) -> Self {
Self { operator } Self {
operator,
cache: Cache::builder().build(),
}
} }
} }
@@ -97,12 +103,105 @@ impl UserDataManager {
#[derive(Debug, Snafu)] #[derive(Debug, Snafu)]
#[snafu(module)] #[snafu(module)]
pub enum WithError { pub enum ReadError {
/// couldn't read data for this user from the storage operator /// couldn't read data for this user from the storage operator
ReadError { source: opendal::Error }, OperatorReaderError { source: opendal::Error },
/// couldn't decompress the user data from storage /// couldn't decompress the user data from storage
DecompressionError { source: std::io::Error }, DecompressionError { source: std::io::Error },
}
impl UserDataManager {
async fn read(&self, id: Id<UserMarker>) -> Result<Bytes, ReadError> {
self.cache
.try_get_with::<_, ReadError>(id, async {
let path = path(id);
let compressed_reader = self
.operator
.async_reader_if_exists(&path)
.await
.context(read_error::OperatorReaderSnafu)?;
let decompressed_reader = compressed_reader.map(BrotliDecoder::new);
let decompressed = decompressed_reader
.map_async(|mut reader| async move {
let mut vec = Vec::new();
reader.read_to_end(&mut vec).await?;
Ok(vec)
})
.await
.transpose()
.context(read_error::DecompressionSnafu)?;
let vec = decompressed.unwrap_or_else(|| {
let mut fallback = TypedBuilder::<user_capnp::user::Owned>::new_default();
fallback.init_root();
let mut vec = Vec::new();
capnp::serialize::write_message(&mut vec, fallback.borrow_inner())
.expect("cannot error");
vec
});
let bytes = vec.into();
Ok(bytes)
})
.await
.map_err(|arc| {
Arc::into_inner(arc).expect("expected this to be the only reference to the error")
})
}
}
#[derive(Debug, Snafu)]
#[snafu(module)]
pub enum WriteError {
/// couldn't create a writer for this user data in the storage operator
OperatorWriterError { source: opendal::Error },
/// couldn't write (modified) data for this user to the storage operator
WriteError { source: std::io::Error },
/// couldn't finalize writing (modified) data for this user to the storage operator
FinalizeError { source: std::io::Error },
}
impl UserDataManager {
async fn write(&self, id: Id<UserMarker>, bytes: Bytes) -> Result<(), WriteError> {
let path = path(id);
let compressed_writer = self
.operator
.writer(&path)
.await
.context(write_error::OperatorWriterSnafu)?
.into_futures_async_write();
let mut decompressed_writer = BrotliEncoder::new(compressed_writer);
decompressed_writer
.write_all(&bytes)
.await
.context(write_error::WriteSnafu)?;
decompressed_writer
.close()
.await
.context(write_error::FinalizeSnafu)?;
self.cache.insert(id, bytes).await;
Ok(())
}
}
#[derive(Debug, Snafu)]
#[snafu(module)]
pub enum WithError {
/// couldn't read the user data from storage
ReadError { source: ReadError },
/// couldn't deserialize the user data /// couldn't deserialize the user data
DeserializeError { source: capnp::Error }, DeserializeError { source: capnp::Error },
@@ -114,41 +213,18 @@ impl UserDataManager {
id: Id<UserMarker>, id: Id<UserMarker>,
f: impl FnOnce(user_capnp::user::Reader<'_>) -> R, f: impl FnOnce(user_capnp::user::Reader<'_>) -> R,
) -> Result<R, WithError> { ) -> Result<R, WithError> {
let compressed_buffer = self let message_bytes = self.read(id).await.context(with_error::ReadSnafu)?;
.operator
.async_reader_if_exists(&path(id))
.await
.context(with_error::ReadSnafu)?;
let decompressed_reader = compressed_buffer.map(BrotliDecoder::new); let message_reader = capnp::serialize::read_message_from_flat_slice_no_alloc(
let decompressed = decompressed_reader &mut message_bytes.as_ref(),
.map_async(|mut reader| async move { Default::default(),
let mut vec = Vec::new(); )
reader.read_to_end(&mut vec).await?; .context(with_error::DeserializeSnafu)?;
Ok(vec)
})
.await
.transpose()
.context(with_error::DecompressionSnafu)?;
let mut message = TypedBuilder::<user_capnp::user::Owned>::new_default(); let user_data = message_reader
let fallback = message.init_root(); .get_root()
let mut user_data = fallback.into_reader();
let message_reader;
if let Some(mut bytes) = decompressed.as_deref() {
message_reader = capnp::serialize::read_message_from_flat_slice_no_alloc(
&mut bytes,
Default::default(),
)
.context(with_error::DeserializeSnafu)?; .context(with_error::DeserializeSnafu)?;
user_data = message_reader
.get_root()
.context(with_error::DeserializeSnafu)?;
}
Ok(f(user_data)) Ok(f(user_data))
} }
} }
@@ -156,11 +232,8 @@ impl UserDataManager {
#[derive(Debug, Snafu)] #[derive(Debug, Snafu)]
#[snafu(module)] #[snafu(module)]
pub enum UpdateError { pub enum UpdateError {
/// couldn't read data for this user from the storage operator /// couldn't read the user data from storage
ReadError { source: opendal::Error }, ReadError { source: ReadError },
/// couldn't decompress the user data from storage
DecompressionError { source: std::io::Error },
/// couldn't deserialize the user data /// couldn't deserialize the user data
DeserializeError { source: capnp::Error }, DeserializeError { source: capnp::Error },
@@ -168,14 +241,8 @@ pub enum UpdateError {
/// couldn't serialize the (modified) user data /// couldn't serialize the (modified) user data
SerializeError { source: capnp::Error }, SerializeError { source: capnp::Error },
/// couldn't create a writer for this user data in the storage operator /// couldn't write the user data to storage
WriterError { source: opendal::Error }, WriteError { source: WriteError },
/// couldn't write (modified) data for this user to the storage operator
WriteError { source: std::io::Error },
/// couldn't finalize writing (modified) data for this user to the storage operator
FinalizeError { source: std::io::Error },
} }
impl UserDataManager { impl UserDataManager {
@@ -184,71 +251,38 @@ impl UserDataManager {
id: Id<UserMarker>, id: Id<UserMarker>,
f: impl FnOnce(user_capnp::user::Builder<'_>) -> R, f: impl FnOnce(user_capnp::user::Builder<'_>) -> R,
) -> Result<R, UpdateError> { ) -> Result<R, UpdateError> {
let path = path(id); let bytes = self.read(id).await.context(update_error::ReadSnafu)?;
let compressed_buffer = self
.operator
.async_reader_if_exists(&path)
.await
.context(update_error::ReadSnafu)?;
let decompressed_reader = compressed_buffer.map(BrotliDecoder::new); let message_reader = capnp::serialize::read_message_from_flat_slice_no_alloc(
let decompressed = decompressed_reader &mut bytes.as_ref(),
.map_async(|mut reader| async move { Default::default(),
let mut vec = Vec::new(); )
reader.read_to_end(&mut vec).await?; .context(update_error::DeserializeSnafu)?;
Ok(vec)
})
.await
.transpose()
.context(update_error::DecompressionSnafu)?;
let mut message = TypedBuilder::<user_capnp::user::Owned>::new_default(); let mut message = TypedBuilder::<user_capnp::user::Owned>::new_default();
let ret = if let Some(mut bytes) = decompressed.as_deref() { let user_data = message_reader
let message_reader = capnp::serialize::read_message_from_flat_slice_no_alloc( .get_root()
&mut bytes,
Default::default(),
)
.context(update_error::DeserializeSnafu)?; .context(update_error::DeserializeSnafu)?;
let user_data = message_reader message
.get_root() .set_root(user_data)
.context(update_error::DeserializeSnafu)?; .context(update_error::DeserializeSnafu)?;
message let ret = f(
.set_root(user_data) message.get_root().unwrap(), // this is logically impossible
.context(update_error::DeserializeSnafu)?; );
f( let mut vec = Vec::new();
message.get_root().unwrap(), // this is logically impossible capnp::serialize::write_message(&mut vec, message.borrow_inner())
)
} else {
f(message.init_root())
};
let mut buffer = Vec::new();
capnp::serialize::write_message(&mut buffer, message.borrow_inner())
.context(update_error::SerializeSnafu)?; .context(update_error::SerializeSnafu)?;
let compressed_writer = self let bytes = vec.into();
.operator
.writer(&path)
.await
.context(update_error::WriterSnafu)?
.into_futures_async_write();
let mut decompressed_writer = BrotliEncoder::new(compressed_writer); self.write(id, bytes)
decompressed_writer
.write_all(&buffer)
.await .await
.context(update_error::WriteSnafu)?; .context(update_error::WriteSnafu)?;
decompressed_writer
.close()
.await
.context(update_error::FinalizeSnafu)?;
Ok(ret) Ok(ret)
} }
} }