From 0ce26fc0e555c0f82dffbec372ac068157fe4590 Mon Sep 17 00:00:00 2001 From: Jacob Date: Tue, 21 Apr 2026 03:11:27 -0400 Subject: [PATCH] feat: user consent setting and retrieving (NOTE: does not affect recording yet) --- Cargo.lock | 53 ++++++-- Cargo.toml | 6 +- src/command/mod.rs | 9 +- src/command/opt_in.rs | 76 +++++++++++- src/command/opt_out.rs | 112 +++++++++++++---- src/lib.rs | 40 +++--- src/main.rs | 8 +- src/operator_ext.rs | 24 ++++ src/option_ext.rs | 11 ++ src/storage.rs | 1 - src/track_vcs.rs | 269 +++++++++++++++++++++-------------------- src/user_data.rs | 173 ++++++++++++++++++++++++++ user.capnp | 8 +- 13 files changed, 589 insertions(+), 201 deletions(-) create mode 100644 src/operator_ext.rs create mode 100644 src/option_ext.rs create mode 100644 src/user_data.rs diff --git a/Cargo.lock b/Cargo.lock index a0c2a7f..be69a22 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -562,9 +562,9 @@ checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" [[package]] name = "bytes" -version = "1.11.0" +version = "1.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b35204fbdc0b3f4446b89fc1ac2cf84a8a68971995d0bf2e925ec7cd960f9cb3" +checksum = "1e748733b7cbc798e1434b6ac524f0c1ff2ab456fe201501e6497c8417a4fc33" [[package]] name = "cacache" @@ -1601,6 +1601,35 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "ext-trait" +version = "2.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ccef4f53516d7589a8ed95216b6ebc9d519df033c1303b42125bfe57aa475d23" +dependencies = [ + "ext-trait-proc_macros", +] + +[[package]] +name = "ext-trait-proc_macros" +version = "2.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "025e48a9a5db92b84dbd3b6be37853a0e60c1ce9c7c03c08e6ac282766f3e3f0" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.111", +] + +[[package]] +name = "extension-traits" +version = "2.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a7dae1256e3fea2900e1e674bae228edc852f5ce9ccb1c916a496a33cb9bc4cb" +dependencies = [ + "ext-trait", +] + [[package]] name = "fastpool" version = "1.1.0" @@ -1700,12 +1729,15 @@ version = "0.1.0" dependencies = [ "async-compression", "async-trait", + "bytes", "capnp", "capnpc", "clap", "dashmap 6.1.0", + "extension-traits", "futures", "hound", + "moka", "opendal", "opus2", "patricia_tree 0.10.1", @@ -1725,6 +1757,7 @@ dependencies = [ "twilight-model", "twilight-util", "typed-builder 0.23.2", + "yoke", ] [[package]] @@ -3414,9 +3447,9 @@ dependencies = [ [[package]] name = "moka" -version = "0.12.12" +version = "0.12.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a3dec6bd31b08944e08b58fd99373893a6c17054d6f3ea5006cc894f4f4eee2a" +checksum = "957228ad12042ee839f93c8f257b62b4c0ab5eaae1d4fa60de53b27c9d7c5046" dependencies = [ "async-lock", "crossbeam-channel", @@ -4954,9 +4987,9 @@ dependencies = [ [[package]] name = "quote" -version = "1.0.42" +version = "1.0.45" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a338cc41d27e6cc6dce6cefc13a0729dfbb81c262b1f519331575dd80ef3067f" +checksum = "41f2619966050689382d2b44f664f4bc593e129785a36d6ee376ddf37259b924" dependencies = [ "proc-macro2", ] @@ -8369,9 +8402,9 @@ checksum = "fdd20c5420375476fbd4394763288da7eb0cc0b8c11deed431a91562af7335d3" [[package]] name = "yoke" -version = "0.8.1" +version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "72d6e5c6afb84d73944e5cedb052c4680d5657337201555f9f2a16b7406d4954" +checksum = "abe8c5fda708d9ca3df187cae8bfb9ceda00dd96231bed36e445a1a48e66f9ca" dependencies = [ "stable_deref_trait", "yoke-derive", @@ -8380,9 +8413,9 @@ dependencies = [ [[package]] name = "yoke-derive" -version = "0.8.1" +version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b659052874eb698efe5b9e8cf382204678a0086ebf46982b79d6ca3182927e5d" +checksum = "de844c262c8848816172cef550288e7dc6c7b7814b4ee56b3e1553f275f1858e" dependencies = [ "proc-macro2", "quote", diff --git a/Cargo.toml b/Cargo.toml index 7a9d842..cc8b314 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,11 +6,14 @@ edition = "2024" [dependencies] async-compression = { version = "0.4.41", features = ["brotli", "futures-io"] } async-trait = "0.1.89" +bytes = "1.11.1" capnp = "0.25.3" clap = { version = "4.5.40", features = ["derive", "env"] } dashmap = "6.1.0" +extension-traits = "2.0.2" futures = "0.3.32" hound = "3.5.1" +moka = { version = "0.12.15", features = ["future"] } opendal = { git = "https://github.com/apache/opendal", features = [ "services-azfile", "services-aliyun-drive", @@ -65,7 +68,7 @@ songbird = { version = "0.6.0", default-features = false, features = [ strum = { version = "0.28.0", features = ["derive"] } time = "0.3.47" tokio = { version = "1.46.0", features = ["rt-multi-thread", "macros", "signal"] } -tokio-util = "0.7.18" +tokio-util = { version = "0.7.18", features = ["io"] } tracing = "0.1.41" tracing-subscriber = { version = "0.3.19", features = ["env-filter"] } twilight-gateway = { version = "0.17", default-features = false, features = ["rustls-webpki-roots", "twilight-http"] } @@ -77,6 +80,7 @@ twilight-http = { version = "0.17", default-features = false, features = [ twilight-model = "0.17" twilight-util = { version = "0.17", features = ["builder"] } typed-builder = "0.23.2" +yoke = "0.8.2" [build-dependencies] capnpc = "0.25.3" diff --git a/src/command/mod.rs b/src/command/mod.rs index 142cb48..ff6f510 100644 --- a/src/command/mod.rs +++ b/src/command/mod.rs @@ -3,7 +3,10 @@ use std::{fmt::Debug, sync::Arc}; use futures::future::BoxFuture; use opendal::Operator; use patricia_tree::StringPatriciaMap; -use songbird::{Songbird, driver::{Channels, SampleRate}}; +use songbird::{ + Songbird, + driver::{Channels, SampleRate}, +}; use tokio_util::sync::CancellationToken; use twilight_model::{ application::{command::Command, interaction::Interaction}, @@ -13,7 +16,7 @@ use twilight_model::{ }, }; -use crate::{VCs, track_vcs::GuildVoiceChannelToTextChannel}; +use crate::{GuildVoiceChannelToTextChannel, UserDataManager, VCs}; mod debug; mod join; @@ -34,7 +37,7 @@ pub struct State { pub discord_voice_channel_corresponding_text_channel: Arc, pub recording_data: Operator, pub songbird: Arc, - pub user_data: Operator, + pub user_data_manager: UserDataManager, pub vcs: Arc, } diff --git a/src/command/opt_in.rs b/src/command/opt_in.rs index 7378f5d..c8611b6 100644 --- a/src/command/opt_in.rs +++ b/src/command/opt_in.rs @@ -1,12 +1,15 @@ use std::sync::LazyLock; -use twilight_model::application::{ - command::{Command, CommandType}, - interaction::Interaction, +use twilight_model::{ + application::{ + command::{Command, CommandType}, + interaction::Interaction, + }, + http::interaction::{InteractionResponse, InteractionResponseType}, }; -use twilight_util::builder::command::CommandBuilder; +use twilight_util::builder::{InteractionResponseDataBuilder, command::CommandBuilder}; -use crate::command::State; +use crate::{command::State, user_capnp::user::Consent}; const NAME: &str = "opt-in"; const DESCRIPTION: &str = "Opt in to being recorded"; @@ -20,5 +23,66 @@ pub static COMMAND: LazyLock = LazyLock::new(|| { #[tracing::instrument] pub async fn handle(state: State, interaction: Interaction) { - todo!(); + let user_id = interaction + .member + .as_ref() + .and_then(|member| member.user.as_ref().map(|user| user.id)); + + let user_id = match user_id { + Some(user_id) => user_id, + None => { + state + .discord_client + .interaction(state.discord_application_id) + .create_response( + interaction.id, + &interaction.token, + &InteractionResponse { + kind: InteractionResponseType::ChannelMessageWithSource, + data: Some( + InteractionResponseDataBuilder::new() + .content("TODO") + .build(), + ), + }, + ) + .await + .expect("TODO"); + return; + } + }; + + let previous_consent = state + .user_data_manager + .update(user_id, |mut user_data| { + let previous_consent = user_data + .reborrow() + .get_voice_recording_consent() + .expect("TODO"); + user_data.set_voice_recording_consent(Consent::Granted); + + previous_consent + }) + .await + .expect("TODO"); + + state + .discord_client + .interaction(state.discord_application_id) + .create_response( + interaction.id, + &interaction.token, + &InteractionResponse { + kind: InteractionResponseType::ChannelMessageWithSource, + data: Some( + InteractionResponseDataBuilder::new() + .content(format!( + "opted you in, your previous consent was {previous_consent:?}" + )) + .build(), + ), + }, + ) + .await + .expect("TODO"); } diff --git a/src/command/opt_out.rs b/src/command/opt_out.rs index 7651201..bcf19d1 100644 --- a/src/command/opt_out.rs +++ b/src/command/opt_out.rs @@ -1,24 +1,88 @@ -use std::sync::LazyLock; - -use twilight_model::application::{ - command::{Command, CommandType}, - interaction::Interaction, -}; -use twilight_util::builder::command::CommandBuilder; - -use crate::command::State; - -const NAME: &str = "opt-out"; -const DESCRIPTION: &str = "Opt out of being recorded"; - -pub static COMMAND: LazyLock = LazyLock::new(|| { - CommandBuilder::new(NAME, DESCRIPTION, CommandType::ChatInput) - .validate() - .expect("command wasn't correct") - .build() -}); - -#[tracing::instrument] -pub async fn handle(state: State, interaction: Interaction) { - todo!(); -} +use std::sync::LazyLock; + +use twilight_model::{ + application::{ + command::{Command, CommandType}, + interaction::Interaction, + }, + http::interaction::{InteractionResponse, InteractionResponseType}, +}; +use twilight_util::builder::{InteractionResponseDataBuilder, command::CommandBuilder}; + +use crate::{command::State, user_capnp::user::Consent}; + +const NAME: &str = "opt-out"; +const DESCRIPTION: &str = "Opt out of being recorded"; + +pub static COMMAND: LazyLock = LazyLock::new(|| { + CommandBuilder::new(NAME, DESCRIPTION, CommandType::ChatInput) + .validate() + .expect("command wasn't correct") + .build() +}); + +#[tracing::instrument] +pub async fn handle(state: State, interaction: Interaction) { + let user_id = interaction + .member + .as_ref() + .and_then(|member| member.user.as_ref().map(|user| user.id)); + + let user_id = match user_id { + Some(user_id) => user_id, + None => { + state + .discord_client + .interaction(state.discord_application_id) + .create_response( + interaction.id, + &interaction.token, + &InteractionResponse { + kind: InteractionResponseType::ChannelMessageWithSource, + data: Some( + InteractionResponseDataBuilder::new() + .content("TODO") + .build(), + ), + }, + ) + .await + .expect("TODO"); + return; + } + }; + + let previous_consent = state + .user_data_manager + .update(user_id, |mut user_data| { + let previous_consent = user_data + .reborrow() + .get_voice_recording_consent() + .expect("TODO"); + user_data.set_voice_recording_consent(Consent::Withheld); + + previous_consent + }) + .await + .expect("TODO"); + + state + .discord_client + .interaction(state.discord_application_id) + .create_response( + interaction.id, + &interaction.token, + &InteractionResponse { + kind: InteractionResponseType::ChannelMessageWithSource, + data: Some( + InteractionResponseDataBuilder::new() + .content(format!( + "opted you out, your previous consent was {previous_consent:?}" + )) + .build(), + ), + }, + ) + .await + .expect("TODO"); +} diff --git a/src/lib.rs b/src/lib.rs index 9de2c20..2e5651f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,18 +1,22 @@ -mod command; -mod one_to_many; -mod one_to_many_with_data; -mod one_to_one; -mod storage; -mod track_vcs; -mod vc_user; - -pub use command::{Router as CommandRouter, State, all as all_commands}; -pub use one_to_many::OneToManyUniqueBTreeMap; -pub use one_to_many_with_data::OneToManyUniqueBTreeMapWithData; -pub use one_to_one::OneToOneBTreeMap; -pub use storage::Storage; -pub use track_vcs::{GuildVoiceChannelToTextChannel, VCs, initialize_vcs, update_vcs}; -pub use vc_user::{UserInVCData, VoiceStatus}; - -capnp::generated_code!(pub mod user_capnp); -capnp::generated_code!(pub mod bot_capnp); +mod command; +mod one_to_many; +mod one_to_many_with_data; +mod one_to_one; +mod operator_ext; +mod option_ext; +mod storage; +mod track_vcs; +mod user_data; +mod vc_user; +capnp::generated_code!(mod bot_capnp); +capnp::generated_code!(mod user_capnp); + +pub use command::{Router as CommandRouter, State, all as all_commands}; +pub use one_to_many::OneToManyUniqueBTreeMap; +pub use one_to_many_with_data::OneToManyUniqueBTreeMapWithData; +pub use one_to_one::OneToOneBTreeMap; +pub use operator_ext::OperatorExt; +pub use storage::Storage; +pub use track_vcs::{GuildVoiceChannelToTextChannel, VCs, initialize_vcs, update_vcs}; +pub use user_data::UserDataManager; +pub use vc_user::{UserInVCData, VoiceStatus}; diff --git a/src/main.rs b/src/main.rs index 2ff8ae0..050dfaf 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,7 +1,7 @@ use clap::Parser; use fomo_reducer::{ - CommandRouter, GuildVoiceChannelToTextChannel, State, Storage, all_commands, initialize_vcs, - update_vcs, + CommandRouter, GuildVoiceChannelToTextChannel, State, Storage, UserDataManager, all_commands, + initialize_vcs, update_vcs, }; use secrecy::{ExposeSecret, SecretString}; use snafu::{OptionExt, ResultExt, Snafu}; @@ -303,6 +303,8 @@ async fn main() -> Result<(), MainError> { let recording_data = recording_data.into_inner(); let user_data = user_data.into_inner(); + let user_data_manager = UserDataManager::new(user_data); + let discord_voice_channel_corresponding_text_channel = { let mut map = GuildVoiceChannelToTextChannel::default(); @@ -331,7 +333,7 @@ async fn main() -> Result<(), MainError> { discord_voice_channel_corresponding_text_channel, recording_data, songbird, - user_data, + user_data_manager, vcs, }; diff --git a/src/operator_ext.rs b/src/operator_ext.rs new file mode 100644 index 0000000..59a3d44 --- /dev/null +++ b/src/operator_ext.rs @@ -0,0 +1,24 @@ +use extension_traits::extension; +use opendal::{Buffer, Error, ErrorKind, FuturesAsyncReader, Operator}; + +#[extension(pub trait OperatorExt)] +impl Operator { + async fn read_if_exists(&self, path: &str) -> Result, Error> { + match self.read(path).await { + Ok(buffer) => Ok(Some(buffer)), + Err(error) if matches!(error.kind(), ErrorKind::NotFound) => Ok(None), + Err(error) => Err(error), + } + } + + async fn async_reader_if_exists( + &self, + path: &str, + ) -> Result, Error> { + match self.reader(path).await { + Ok(reader) => Ok(Some(reader.into_futures_async_read(..).await?)), + Err(error) if matches!(error.kind(), ErrorKind::NotFound) => Ok(None), + Err(error) => Err(error), + } + } +} diff --git a/src/option_ext.rs b/src/option_ext.rs new file mode 100644 index 0000000..6d4e344 --- /dev/null +++ b/src/option_ext.rs @@ -0,0 +1,11 @@ +use extension_traits::extension; + +#[extension(pub trait OptionExt)] +impl Option { + async fn map_async, F: FnOnce(S) -> Fut>(self, f: F) -> Option { + match self { + Some(s) => Some(f(s).await), + None => None, + } + } +} diff --git a/src/storage.rs b/src/storage.rs index e33cf78..8d42b86 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -2,7 +2,6 @@ use std::{fmt::Debug, str::FromStr}; use opendal::{IntoOperatorUri, Operator, OperatorUri}; - #[derive(Clone)] pub struct Storage { uri: OperatorUri, diff --git a/src/track_vcs.rs b/src/track_vcs.rs index d78dbe3..26f3c8e 100644 --- a/src/track_vcs.rs +++ b/src/track_vcs.rs @@ -1,134 +1,135 @@ -use std::collections::BTreeMap; - -use dashmap::DashMap; -use futures::{StreamExt, stream::FuturesUnordered}; -use twilight_model::{ - gateway::payload::incoming::VoiceStateUpdate, - id::{ - Id, - marker::{ChannelMarker, GuildMarker, UserMarker}, - }, -}; - -use crate::{OneToManyUniqueBTreeMapWithData, OneToOneBTreeMap, UserInVCData, VoiceStatus}; - -pub type GuildVoiceChannelToTextChannel = BTreeMap, OneToOneBTreeMap, Id>>; - -type VCsInGuild = OneToManyUniqueBTreeMapWithData, Id, UserInVCData>; -pub type VCs = DashMap, VCsInGuild>; - -#[tracing::instrument(skip(discord_client), ret)] -async fn initialize_user_in_vc( - discord_client: &twilight_http::Client, - guild_id: Id, - user_id: Id, -) -> Option<(Id, UserInVCData)> { - if let Ok(voice_state_res) = discord_client.user_voice_state(guild_id, user_id).await - && let Ok(voice_state) = voice_state_res.model().await - { - tracing::info!(?user_id, ?voice_state); - - let voice_status = VoiceStatus::builder() - .self_deafened(voice_state.self_deaf) - .self_muted(voice_state.self_mute) - .server_deafened(voice_state.deaf) - .server_muted(voice_state.mute) - .camming(voice_state.self_video) - .streaming(voice_state.self_stream) - .build(); - let user_in_vc_data = voice_status.into(); - - voice_state - .channel_id - .map(|channel_id| (channel_id, user_in_vc_data)) - } else { - None // TODO - } -} - -#[tracing::instrument(skip(discord_client), ret)] -async fn initialize_server_vcs( - discord_client: &twilight_http::Client, - id: Id, -) -> VCsInGuild { - if let Ok(guild_members_res) = discord_client.guild_members(id).limit(999).await - && let Ok(guild_members) = guild_members_res.model().await - { - FuturesUnordered::from_iter(guild_members.into_iter().map(|member| async move { - ( - member.user.id, - initialize_user_in_vc(discord_client, id, member.user.id).await, - ) - })) - .filter_map( - |(user_id, channel_id_and_user_in_vc_data_option)| async move { - channel_id_and_user_in_vc_data_option - .map(|(channel_id, user_in_vc_data)| (channel_id, user_id, user_in_vc_data)) - }, - ) - .collect() - .await - } else { - Default::default() - } -} - -#[tracing::instrument(skip(discord_client), ret)] -pub async fn initialize_vcs(discord_client: &twilight_http::Client) -> VCs { - if let Ok(guilds_res) = discord_client.current_user_guilds().limit(200).await - && let Ok(guilds) = guilds_res.model().await - { - FuturesUnordered::from_iter(guilds.into_iter().map(|guild| async move { - let guild_vcs = initialize_server_vcs(discord_client, guild.id).await; - - (guild.id, guild_vcs) - })) - .collect() - .await - } else { - Default::default() - } -} - -#[tracing::instrument(skip(vcs))] -pub fn update_vcs(voice_state_update: &VoiceStateUpdate, vcs: &VCs) { - let user_id = voice_state_update.user_id; - match voice_state_update.guild_id { - Some(guild_id) => match voice_state_update.channel_id { - Some(channel_id) => { - let voice_status = VoiceStatus::builder() - .self_deafened(voice_state_update.self_deaf) - .self_muted(voice_state_update.self_mute) - .server_deafened(voice_state_update.deaf) - .server_muted(voice_state_update.mute) - .camming(voice_state_update.self_video) - .streaming(voice_state_update.self_stream) - .build(); - let user_in_vc_data = voice_status.into(); - - vcs.entry(guild_id) - .or_default() - .insert(channel_id, user_id, user_in_vc_data); - - tracing::info!( - ?guild_id, - ?channel_id, - ?user_id, - "connected or otherwise changed state while connected" - ); - } - - None => { - if let Some(mut channel_vcers) = vcs.get_mut(&guild_id) { - channel_vcers.remove_right(&user_id); - } - - tracing::info!(?guild_id, ?user_id, "disconnected"); - } - }, - - None => { - tracing::error!("why doesn't this have a guild id attached?!"); - } - } -} +use std::collections::BTreeMap; + +use dashmap::DashMap; +use futures::{StreamExt, stream::FuturesUnordered}; +use twilight_model::{ + gateway::payload::incoming::VoiceStateUpdate, + id::{ + Id, + marker::{ChannelMarker, GuildMarker, UserMarker}, + }, +}; + +use crate::{OneToManyUniqueBTreeMapWithData, OneToOneBTreeMap, UserInVCData, VoiceStatus}; + +pub type GuildVoiceChannelToTextChannel = + BTreeMap, OneToOneBTreeMap, Id>>; + +type VCsInGuild = OneToManyUniqueBTreeMapWithData, Id, UserInVCData>; +pub type VCs = DashMap, VCsInGuild>; + +#[tracing::instrument(skip(discord_client), ret)] +async fn initialize_user_in_vc( + discord_client: &twilight_http::Client, + guild_id: Id, + user_id: Id, +) -> Option<(Id, UserInVCData)> { + if let Ok(voice_state_res) = discord_client.user_voice_state(guild_id, user_id).await + && let Ok(voice_state) = voice_state_res.model().await + { + tracing::info!(?user_id, ?voice_state); + + let voice_status = VoiceStatus::builder() + .self_deafened(voice_state.self_deaf) + .self_muted(voice_state.self_mute) + .server_deafened(voice_state.deaf) + .server_muted(voice_state.mute) + .camming(voice_state.self_video) + .streaming(voice_state.self_stream) + .build(); + let user_in_vc_data = voice_status.into(); + + voice_state + .channel_id + .map(|channel_id| (channel_id, user_in_vc_data)) + } else { + None // TODO + } +} + +#[tracing::instrument(skip(discord_client), ret)] +async fn initialize_server_vcs( + discord_client: &twilight_http::Client, + id: Id, +) -> VCsInGuild { + if let Ok(guild_members_res) = discord_client.guild_members(id).limit(999).await + && let Ok(guild_members) = guild_members_res.model().await + { + FuturesUnordered::from_iter(guild_members.into_iter().map(|member| async move { + ( + member.user.id, + initialize_user_in_vc(discord_client, id, member.user.id).await, + ) + })) + .filter_map( + |(user_id, channel_id_and_user_in_vc_data_option)| async move { + channel_id_and_user_in_vc_data_option + .map(|(channel_id, user_in_vc_data)| (channel_id, user_id, user_in_vc_data)) + }, + ) + .collect() + .await + } else { + Default::default() + } +} + +#[tracing::instrument(skip(discord_client), ret)] +pub async fn initialize_vcs(discord_client: &twilight_http::Client) -> VCs { + if let Ok(guilds_res) = discord_client.current_user_guilds().limit(200).await + && let Ok(guilds) = guilds_res.model().await + { + FuturesUnordered::from_iter(guilds.into_iter().map(|guild| async move { + let guild_vcs = initialize_server_vcs(discord_client, guild.id).await; + + (guild.id, guild_vcs) + })) + .collect() + .await + } else { + Default::default() + } +} + +#[tracing::instrument(skip(vcs))] +pub fn update_vcs(voice_state_update: &VoiceStateUpdate, vcs: &VCs) { + let user_id = voice_state_update.user_id; + match voice_state_update.guild_id { + Some(guild_id) => match voice_state_update.channel_id { + Some(channel_id) => { + let voice_status = VoiceStatus::builder() + .self_deafened(voice_state_update.self_deaf) + .self_muted(voice_state_update.self_mute) + .server_deafened(voice_state_update.deaf) + .server_muted(voice_state_update.mute) + .camming(voice_state_update.self_video) + .streaming(voice_state_update.self_stream) + .build(); + let user_in_vc_data = voice_status.into(); + + vcs.entry(guild_id) + .or_default() + .insert(channel_id, user_id, user_in_vc_data); + + tracing::info!( + ?guild_id, + ?channel_id, + ?user_id, + "connected or otherwise changed state while connected" + ); + } + + None => { + if let Some(mut channel_vcers) = vcs.get_mut(&guild_id) { + channel_vcers.remove_right(&user_id); + } + + tracing::info!(?guild_id, ?user_id, "disconnected"); + } + }, + + None => { + tracing::error!("why doesn't this have a guild id attached?!"); + } + } +} diff --git a/src/user_data.rs b/src/user_data.rs new file mode 100644 index 0000000..fd706ac --- /dev/null +++ b/src/user_data.rs @@ -0,0 +1,173 @@ +use async_compression::futures::{bufread::BrotliDecoder, write::BrotliEncoder}; +use capnp::message::{TypedBuilder, TypedReader}; +use futures::{AsyncReadExt, AsyncWriteExt}; +use opendal::Operator; +use snafu::{ResultExt, Snafu}; +use twilight_model::id::{Id, marker::UserMarker}; + +use crate::{OperatorExt, option_ext::OptionExt, user_capnp}; + +#[derive(Debug, Clone)] +pub struct UserDataManager { + operator: Operator, +} + +impl UserDataManager { + pub fn new(operator: Operator) -> Self { + Self { operator } + } + + fn path(id: Id) -> String { + format!("{id}/data.bin.brotli") + } +} + +#[derive(Debug, Snafu)] +#[snafu(module)] +pub enum WithError { + /// couldn't read data for this user from the storage operator + ReadError { source: opendal::Error }, + + /// couldn't decompress the user data from storage + DecompressionError { source: std::io::Error }, + + /// couldn't deserialize the user data + DeserializeError { source: capnp::Error }, +} + +impl UserDataManager { + pub async fn with( + &self, + id: Id, + f: impl FnOnce(user_capnp::user::Reader<'_>) -> R, + ) -> Result { + let compressed_buffer = self + .operator + .async_reader_if_exists(&UserDataManager::path(id)) + .await + .context(with_error::ReadSnafu)?; + + let decompressed_reader = compressed_buffer.map(BrotliDecoder::new); + let decompressed = decompressed_reader + .map_async(|mut reader| async move { + let mut vec = Vec::new(); + reader.read_to_end(&mut vec).await?; + Ok(vec) + }) + .await + .transpose() + .context(with_error::DecompressionSnafu)?; + + let mut message = TypedBuilder::::new_default(); + let fallback = message.init_root(); + + let mut user_data = fallback.into_reader(); + let message_reader; + + if let Some(mut bytes) = decompressed.as_deref() { + message_reader = capnp::serialize::read_message_from_flat_slice_no_alloc( + &mut bytes, + Default::default(), + ) + .context(with_error::DeserializeSnafu)?; + + user_data = message_reader + .get_root() + .context(with_error::DeserializeSnafu)?; + } + + Ok(f(user_data)) + } +} + +#[derive(Debug, Snafu)] +#[snafu(module)] +pub enum UpdateError { + /// couldn't read data for this user from the storage operator + ReadError { source: opendal::Error }, + + /// couldn't decompress the user data from storage + DecompressionError { source: std::io::Error }, + + /// couldn't deserialize the user data + DeserializeError { source: capnp::Error }, + + /// couldn't serialize the (modified) user data + SerializeError { source: capnp::Error }, + + /// couldn't create a writer for this user data in the storage operator + WriterError { source: opendal::Error }, + + /// couldn't write (modified) data for this user to the storage operator + WriteError { source: std::io::Error }, +} + +impl UserDataManager { + pub async fn update( + &self, + id: Id, + f: impl FnOnce(user_capnp::user::Builder<'_>) -> R, + ) -> Result { + let path = UserDataManager::path(id); + let compressed_buffer = self + .operator + .async_reader_if_exists(&path) + .await + .context(update_error::ReadSnafu)?; + + let decompressed_reader = compressed_buffer.map(BrotliDecoder::new); + let decompressed = decompressed_reader + .map_async(|mut reader| async move { + let mut vec = Vec::new(); + reader.read_to_end(&mut vec).await?; + Ok(vec) + }) + .await + .transpose() + .context(update_error::DecompressionSnafu)?; + + let mut message = TypedBuilder::::new_default(); + + let ret = if let Some(mut bytes) = decompressed.as_deref() { + let message_reader = capnp::serialize::read_message_from_flat_slice_no_alloc( + &mut bytes, + Default::default(), + ) + .context(update_error::DeserializeSnafu)?; + + let user_data = message_reader + .get_root() + .context(update_error::DeserializeSnafu)?; + + message + .set_root(user_data) + .context(update_error::DeserializeSnafu)?; + + f( + message.get_root().unwrap(), // this is logically impossible + ) + } else { + f(message.init_root()) + }; + + let mut buffer = Vec::new(); + capnp::serialize::write_message(&mut buffer, message.borrow_inner()) + .context(update_error::SerializeSnafu)?; + + let compressed_writer = self + .operator + .writer(&path) + .await + .context(update_error::WriterSnafu)? + .into_futures_async_write(); + + let mut decompressed_writer = BrotliEncoder::new(compressed_writer); + + decompressed_writer + .write_all(&buffer) + .await + .context(update_error::WriteSnafu)?; + + Ok(ret) + } +} diff --git a/user.capnp b/user.capnp index 24c8c8f..8e467f3 100644 --- a/user.capnp +++ b/user.capnp @@ -3,5 +3,11 @@ struct User { notificationScript @0 :Text; - voiceRecordingConsent @1 :Bool; + voiceRecordingConsent @1 :Consent = unspecified; + + enum Consent { + unspecified @0; + granted @1; + withheld @2; + } }