Compare commits
2 Commits
fa61d5a4b6
...
2dbc0a2e87
| Author | SHA1 | Date | |
|---|---|---|---|
| 2dbc0a2e87 | |||
| 44c7aa60c2 |
@@ -26,21 +26,19 @@ pub static COMMAND: LazyLock<Command> = LazyLock::new(|| {
|
||||
.build()
|
||||
});
|
||||
|
||||
|
||||
#[tracing::instrument]
|
||||
pub async fn handle(state: State, interaction: Interaction) {
|
||||
let revision = build_info::COMMIT_HASH;
|
||||
|
||||
let bot_owner_user_id = state.discord_bot_owner_user_id;
|
||||
|
||||
let is_bot_owner =
|
||||
interaction
|
||||
.member
|
||||
.as_ref()
|
||||
.and_then(|member| member.user.as_ref().map(|user| user.id))
|
||||
.map(|user_id| user_id == bot_owner_user_id)
|
||||
.unwrap_or(false);
|
||||
|
||||
let is_bot_owner = interaction
|
||||
.member
|
||||
.as_ref()
|
||||
.and_then(|member| member.user.as_ref().map(|user| user.id))
|
||||
.map(|user_id| user_id == bot_owner_user_id)
|
||||
.unwrap_or(false);
|
||||
|
||||
let bot_owner_mention = format!("<@{}>", bot_owner_user_id);
|
||||
|
||||
let opt_in_mention = format!(
|
||||
@@ -58,7 +56,7 @@ pub async fn handle(state: State, interaction: Interaction) {
|
||||
.create_response(
|
||||
interaction.id,
|
||||
&interaction.token,
|
||||
&InteractionResponse {
|
||||
&InteractionResponse {
|
||||
kind: InteractionResponseType::ChannelMessageWithSource,
|
||||
data: Some(
|
||||
InteractionResponseDataBuilder::new().embeds([
|
||||
@@ -87,7 +85,7 @@ pub async fn handle(state: State, interaction: Interaction) {
|
||||
})
|
||||
.await
|
||||
.expect("TODO");
|
||||
|
||||
|
||||
if is_bot_owner {
|
||||
let heat_script_description = state
|
||||
.bot_data_manager
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
use clap::Parser;
|
||||
use fomo_reducer::{
|
||||
BotDataManager, CommandRouter, GuildVoiceChannelToTextChannel, State, Storage, UserDataManager, VCsWatcher, all_commands, command, initialize_vcs, update_vcs
|
||||
BotDataManager, CommandRouter, GuildVoiceChannelToTextChannel, State, Storage, UserDataManager,
|
||||
VCsWatcher, all_commands, command, initialize_vcs, update_vcs,
|
||||
};
|
||||
use secrecy::{ExposeSecret, SecretString};
|
||||
use snafu::{OptionExt, ResultExt, Snafu};
|
||||
@@ -461,7 +462,9 @@ async fn handle_event(command_router: Arc<CommandRouter>, state: State, event: E
|
||||
|
||||
match event {
|
||||
Event::VoiceStateUpdate(voice_state_update) => {
|
||||
state.vcs_watcher.send_modify(|vcs| update_vcs(&voice_state_update, vcs));
|
||||
state
|
||||
.vcs_watcher
|
||||
.send_modify(|vcs| update_vcs(&voice_state_update, vcs));
|
||||
}
|
||||
Event::InteractionCreate(interaction_create) => {
|
||||
let InteractionCreate(interaction) = *interaction_create;
|
||||
|
||||
@@ -15,7 +15,8 @@ use crate::{OneToManyUniqueBTreeMapWithData, OneToOneBTreeMap, UserInVCData, Voi
|
||||
pub type GuildVoiceChannelToTextChannel =
|
||||
BTreeMap<Id<GuildMarker>, OneToOneBTreeMap<Id<ChannelMarker>, Id<ChannelMarker>>>;
|
||||
|
||||
pub type VCsInGuild = OneToManyUniqueBTreeMapWithData<Id<ChannelMarker>, Id<UserMarker>, UserInVCData>;
|
||||
pub type VCsInGuild =
|
||||
OneToManyUniqueBTreeMapWithData<Id<ChannelMarker>, Id<UserMarker>, UserInVCData>;
|
||||
pub type VCs = BTreeMap<Id<GuildMarker>, VCsInGuild>;
|
||||
pub type VCsWatcher = watch::Sender<VCs>;
|
||||
|
||||
@@ -108,7 +109,9 @@ pub fn update_vcs(voice_state_update: &VoiceStateUpdate, vcs: &mut VCs) {
|
||||
.build();
|
||||
let user_in_vc_data = voice_status.into();
|
||||
|
||||
vcs.entry(guild_id).or_default().insert(channel_id, user_id, user_in_vc_data);
|
||||
vcs.entry(guild_id)
|
||||
.or_default()
|
||||
.insert(channel_id, user_id, user_in_vc_data);
|
||||
|
||||
tracing::info!(
|
||||
?guild_id,
|
||||
|
||||
232
src/user_data.rs
232
src/user_data.rs
@@ -1,8 +1,10 @@
|
||||
use std::str::FromStr;
|
||||
use std::{str::FromStr, sync::Arc};
|
||||
|
||||
use async_compression::futures::{bufread::BrotliDecoder, write::BrotliEncoder};
|
||||
use bytes::Bytes;
|
||||
use capnp::message::TypedBuilder;
|
||||
use futures::{AsyncReadExt, AsyncWriteExt, TryStream, TryStreamExt};
|
||||
use moka::future::Cache;
|
||||
use opendal::Operator;
|
||||
use snafu::{OptionExt as _, ResultExt as _, Snafu, ensure};
|
||||
use twilight_model::id::{Id, marker::UserMarker};
|
||||
@@ -14,11 +16,15 @@ pub const RECORD_IF_CONSENT_UNSPECIFIED: bool = true;
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct UserDataManager {
|
||||
operator: Operator,
|
||||
cache: Cache<Id<UserMarker>, Bytes>,
|
||||
}
|
||||
|
||||
impl UserDataManager {
|
||||
pub fn new(operator: Operator) -> Self {
|
||||
Self { operator }
|
||||
Self {
|
||||
operator,
|
||||
cache: Cache::builder().build(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -97,12 +103,105 @@ impl UserDataManager {
|
||||
|
||||
#[derive(Debug, Snafu)]
|
||||
#[snafu(module)]
|
||||
pub enum WithError {
|
||||
pub enum ReadError {
|
||||
/// couldn't read data for this user from the storage operator
|
||||
ReadError { source: opendal::Error },
|
||||
OperatorReaderError { source: opendal::Error },
|
||||
|
||||
/// couldn't decompress the user data from storage
|
||||
DecompressionError { source: std::io::Error },
|
||||
}
|
||||
|
||||
impl UserDataManager {
|
||||
async fn read(&self, id: Id<UserMarker>) -> Result<Bytes, ReadError> {
|
||||
self.cache
|
||||
.try_get_with::<_, ReadError>(id, async {
|
||||
let path = path(id);
|
||||
let compressed_reader = self
|
||||
.operator
|
||||
.async_reader_if_exists(&path)
|
||||
.await
|
||||
.context(read_error::OperatorReaderSnafu)?;
|
||||
|
||||
let decompressed_reader = compressed_reader.map(BrotliDecoder::new);
|
||||
|
||||
let decompressed = decompressed_reader
|
||||
.map_async(|mut reader| async move {
|
||||
let mut vec = Vec::new();
|
||||
reader.read_to_end(&mut vec).await?;
|
||||
Ok(vec)
|
||||
})
|
||||
.await
|
||||
.transpose()
|
||||
.context(read_error::DecompressionSnafu)?;
|
||||
|
||||
let vec = decompressed.unwrap_or_else(|| {
|
||||
let mut fallback = TypedBuilder::<user_capnp::user::Owned>::new_default();
|
||||
fallback.init_root();
|
||||
|
||||
let mut vec = Vec::new();
|
||||
capnp::serialize::write_message(&mut vec, fallback.borrow_inner())
|
||||
.expect("cannot error");
|
||||
vec
|
||||
});
|
||||
|
||||
let bytes = vec.into();
|
||||
|
||||
Ok(bytes)
|
||||
})
|
||||
.await
|
||||
.map_err(|arc| {
|
||||
Arc::into_inner(arc).expect("expected this to be the only reference to the error")
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Snafu)]
|
||||
#[snafu(module)]
|
||||
pub enum WriteError {
|
||||
/// couldn't create a writer for this user data in the storage operator
|
||||
OperatorWriterError { source: opendal::Error },
|
||||
|
||||
/// couldn't write (modified) data for this user to the storage operator
|
||||
WriteError { source: std::io::Error },
|
||||
|
||||
/// couldn't finalize writing (modified) data for this user to the storage operator
|
||||
FinalizeError { source: std::io::Error },
|
||||
}
|
||||
|
||||
impl UserDataManager {
|
||||
async fn write(&self, id: Id<UserMarker>, bytes: Bytes) -> Result<(), WriteError> {
|
||||
let path = path(id);
|
||||
|
||||
let compressed_writer = self
|
||||
.operator
|
||||
.writer(&path)
|
||||
.await
|
||||
.context(write_error::OperatorWriterSnafu)?
|
||||
.into_futures_async_write();
|
||||
|
||||
let mut decompressed_writer = BrotliEncoder::new(compressed_writer);
|
||||
|
||||
decompressed_writer
|
||||
.write_all(&bytes)
|
||||
.await
|
||||
.context(write_error::WriteSnafu)?;
|
||||
|
||||
decompressed_writer
|
||||
.close()
|
||||
.await
|
||||
.context(write_error::FinalizeSnafu)?;
|
||||
|
||||
self.cache.insert(id, bytes).await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Snafu)]
|
||||
#[snafu(module)]
|
||||
pub enum WithError {
|
||||
/// couldn't read the user data from storage
|
||||
ReadError { source: ReadError },
|
||||
|
||||
/// couldn't deserialize the user data
|
||||
DeserializeError { source: capnp::Error },
|
||||
@@ -114,41 +213,18 @@ impl UserDataManager {
|
||||
id: Id<UserMarker>,
|
||||
f: impl FnOnce(user_capnp::user::Reader<'_>) -> R,
|
||||
) -> Result<R, WithError> {
|
||||
let compressed_buffer = self
|
||||
.operator
|
||||
.async_reader_if_exists(&path(id))
|
||||
.await
|
||||
.context(with_error::ReadSnafu)?;
|
||||
let message_bytes = self.read(id).await.context(with_error::ReadSnafu)?;
|
||||
|
||||
let decompressed_reader = compressed_buffer.map(BrotliDecoder::new);
|
||||
let decompressed = decompressed_reader
|
||||
.map_async(|mut reader| async move {
|
||||
let mut vec = Vec::new();
|
||||
reader.read_to_end(&mut vec).await?;
|
||||
Ok(vec)
|
||||
})
|
||||
.await
|
||||
.transpose()
|
||||
.context(with_error::DecompressionSnafu)?;
|
||||
let message_reader = capnp::serialize::read_message_from_flat_slice_no_alloc(
|
||||
&mut message_bytes.as_ref(),
|
||||
Default::default(),
|
||||
)
|
||||
.context(with_error::DeserializeSnafu)?;
|
||||
|
||||
let mut message = TypedBuilder::<user_capnp::user::Owned>::new_default();
|
||||
let fallback = message.init_root();
|
||||
|
||||
let mut user_data = fallback.into_reader();
|
||||
let message_reader;
|
||||
|
||||
if let Some(mut bytes) = decompressed.as_deref() {
|
||||
message_reader = capnp::serialize::read_message_from_flat_slice_no_alloc(
|
||||
&mut bytes,
|
||||
Default::default(),
|
||||
)
|
||||
let user_data = message_reader
|
||||
.get_root()
|
||||
.context(with_error::DeserializeSnafu)?;
|
||||
|
||||
user_data = message_reader
|
||||
.get_root()
|
||||
.context(with_error::DeserializeSnafu)?;
|
||||
}
|
||||
|
||||
Ok(f(user_data))
|
||||
}
|
||||
}
|
||||
@@ -156,11 +232,8 @@ impl UserDataManager {
|
||||
#[derive(Debug, Snafu)]
|
||||
#[snafu(module)]
|
||||
pub enum UpdateError {
|
||||
/// couldn't read data for this user from the storage operator
|
||||
ReadError { source: opendal::Error },
|
||||
|
||||
/// couldn't decompress the user data from storage
|
||||
DecompressionError { source: std::io::Error },
|
||||
/// couldn't read the user data from storage
|
||||
ReadError { source: ReadError },
|
||||
|
||||
/// couldn't deserialize the user data
|
||||
DeserializeError { source: capnp::Error },
|
||||
@@ -168,14 +241,8 @@ pub enum UpdateError {
|
||||
/// couldn't serialize the (modified) user data
|
||||
SerializeError { source: capnp::Error },
|
||||
|
||||
/// couldn't create a writer for this user data in the storage operator
|
||||
WriterError { source: opendal::Error },
|
||||
|
||||
/// couldn't write (modified) data for this user to the storage operator
|
||||
WriteError { source: std::io::Error },
|
||||
|
||||
/// couldn't finalize writing (modified) data for this user to the storage operator
|
||||
FinalizeError { source: std::io::Error },
|
||||
/// couldn't write the user data to storage
|
||||
WriteError { source: WriteError },
|
||||
}
|
||||
|
||||
impl UserDataManager {
|
||||
@@ -184,71 +251,38 @@ impl UserDataManager {
|
||||
id: Id<UserMarker>,
|
||||
f: impl FnOnce(user_capnp::user::Builder<'_>) -> R,
|
||||
) -> Result<R, UpdateError> {
|
||||
let path = path(id);
|
||||
let compressed_buffer = self
|
||||
.operator
|
||||
.async_reader_if_exists(&path)
|
||||
.await
|
||||
.context(update_error::ReadSnafu)?;
|
||||
let bytes = self.read(id).await.context(update_error::ReadSnafu)?;
|
||||
|
||||
let decompressed_reader = compressed_buffer.map(BrotliDecoder::new);
|
||||
let decompressed = decompressed_reader
|
||||
.map_async(|mut reader| async move {
|
||||
let mut vec = Vec::new();
|
||||
reader.read_to_end(&mut vec).await?;
|
||||
Ok(vec)
|
||||
})
|
||||
.await
|
||||
.transpose()
|
||||
.context(update_error::DecompressionSnafu)?;
|
||||
let message_reader = capnp::serialize::read_message_from_flat_slice_no_alloc(
|
||||
&mut bytes.as_ref(),
|
||||
Default::default(),
|
||||
)
|
||||
.context(update_error::DeserializeSnafu)?;
|
||||
|
||||
let mut message = TypedBuilder::<user_capnp::user::Owned>::new_default();
|
||||
|
||||
let ret = if let Some(mut bytes) = decompressed.as_deref() {
|
||||
let message_reader = capnp::serialize::read_message_from_flat_slice_no_alloc(
|
||||
&mut bytes,
|
||||
Default::default(),
|
||||
)
|
||||
let user_data = message_reader
|
||||
.get_root()
|
||||
.context(update_error::DeserializeSnafu)?;
|
||||
|
||||
let user_data = message_reader
|
||||
.get_root()
|
||||
.context(update_error::DeserializeSnafu)?;
|
||||
message
|
||||
.set_root(user_data)
|
||||
.context(update_error::DeserializeSnafu)?;
|
||||
|
||||
message
|
||||
.set_root(user_data)
|
||||
.context(update_error::DeserializeSnafu)?;
|
||||
let ret = f(
|
||||
message.get_root().unwrap(), // this is logically impossible
|
||||
);
|
||||
|
||||
f(
|
||||
message.get_root().unwrap(), // this is logically impossible
|
||||
)
|
||||
} else {
|
||||
f(message.init_root())
|
||||
};
|
||||
|
||||
let mut buffer = Vec::new();
|
||||
capnp::serialize::write_message(&mut buffer, message.borrow_inner())
|
||||
let mut vec = Vec::new();
|
||||
capnp::serialize::write_message(&mut vec, message.borrow_inner())
|
||||
.context(update_error::SerializeSnafu)?;
|
||||
|
||||
let compressed_writer = self
|
||||
.operator
|
||||
.writer(&path)
|
||||
.await
|
||||
.context(update_error::WriterSnafu)?
|
||||
.into_futures_async_write();
|
||||
let bytes = vec.into();
|
||||
|
||||
let mut decompressed_writer = BrotliEncoder::new(compressed_writer);
|
||||
|
||||
decompressed_writer
|
||||
.write_all(&buffer)
|
||||
self.write(id, bytes)
|
||||
.await
|
||||
.context(update_error::WriteSnafu)?;
|
||||
|
||||
decompressed_writer
|
||||
.close()
|
||||
.await
|
||||
.context(update_error::FinalizeSnafu)?;
|
||||
|
||||
Ok(ret)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user