feat: user consent setting and retrieving (NOTE: does not affect recording yet)

This commit is contained in:
2026-04-21 03:11:27 -04:00
parent 62399c2046
commit 0ce26fc0e5
13 changed files with 589 additions and 201 deletions

53
Cargo.lock generated
View File

@@ -562,9 +562,9 @@ checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b"
[[package]]
name = "bytes"
version = "1.11.0"
version = "1.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b35204fbdc0b3f4446b89fc1ac2cf84a8a68971995d0bf2e925ec7cd960f9cb3"
checksum = "1e748733b7cbc798e1434b6ac524f0c1ff2ab456fe201501e6497c8417a4fc33"
[[package]]
name = "cacache"
@@ -1601,6 +1601,35 @@ dependencies = [
"pin-project-lite",
]
[[package]]
name = "ext-trait"
version = "2.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ccef4f53516d7589a8ed95216b6ebc9d519df033c1303b42125bfe57aa475d23"
dependencies = [
"ext-trait-proc_macros",
]
[[package]]
name = "ext-trait-proc_macros"
version = "2.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "025e48a9a5db92b84dbd3b6be37853a0e60c1ce9c7c03c08e6ac282766f3e3f0"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.111",
]
[[package]]
name = "extension-traits"
version = "2.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a7dae1256e3fea2900e1e674bae228edc852f5ce9ccb1c916a496a33cb9bc4cb"
dependencies = [
"ext-trait",
]
[[package]]
name = "fastpool"
version = "1.1.0"
@@ -1700,12 +1729,15 @@ version = "0.1.0"
dependencies = [
"async-compression",
"async-trait",
"bytes",
"capnp",
"capnpc",
"clap",
"dashmap 6.1.0",
"extension-traits",
"futures",
"hound",
"moka",
"opendal",
"opus2",
"patricia_tree 0.10.1",
@@ -1725,6 +1757,7 @@ dependencies = [
"twilight-model",
"twilight-util",
"typed-builder 0.23.2",
"yoke",
]
[[package]]
@@ -3414,9 +3447,9 @@ dependencies = [
[[package]]
name = "moka"
version = "0.12.12"
version = "0.12.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a3dec6bd31b08944e08b58fd99373893a6c17054d6f3ea5006cc894f4f4eee2a"
checksum = "957228ad12042ee839f93c8f257b62b4c0ab5eaae1d4fa60de53b27c9d7c5046"
dependencies = [
"async-lock",
"crossbeam-channel",
@@ -4954,9 +4987,9 @@ dependencies = [
[[package]]
name = "quote"
version = "1.0.42"
version = "1.0.45"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a338cc41d27e6cc6dce6cefc13a0729dfbb81c262b1f519331575dd80ef3067f"
checksum = "41f2619966050689382d2b44f664f4bc593e129785a36d6ee376ddf37259b924"
dependencies = [
"proc-macro2",
]
@@ -8369,9 +8402,9 @@ checksum = "fdd20c5420375476fbd4394763288da7eb0cc0b8c11deed431a91562af7335d3"
[[package]]
name = "yoke"
version = "0.8.1"
version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "72d6e5c6afb84d73944e5cedb052c4680d5657337201555f9f2a16b7406d4954"
checksum = "abe8c5fda708d9ca3df187cae8bfb9ceda00dd96231bed36e445a1a48e66f9ca"
dependencies = [
"stable_deref_trait",
"yoke-derive",
@@ -8380,9 +8413,9 @@ dependencies = [
[[package]]
name = "yoke-derive"
version = "0.8.1"
version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b659052874eb698efe5b9e8cf382204678a0086ebf46982b79d6ca3182927e5d"
checksum = "de844c262c8848816172cef550288e7dc6c7b7814b4ee56b3e1553f275f1858e"
dependencies = [
"proc-macro2",
"quote",

View File

@@ -6,11 +6,14 @@ edition = "2024"
[dependencies]
async-compression = { version = "0.4.41", features = ["brotli", "futures-io"] }
async-trait = "0.1.89"
bytes = "1.11.1"
capnp = "0.25.3"
clap = { version = "4.5.40", features = ["derive", "env"] }
dashmap = "6.1.0"
extension-traits = "2.0.2"
futures = "0.3.32"
hound = "3.5.1"
moka = { version = "0.12.15", features = ["future"] }
opendal = { git = "https://github.com/apache/opendal", features = [
"services-azfile",
"services-aliyun-drive",
@@ -65,7 +68,7 @@ songbird = { version = "0.6.0", default-features = false, features = [
strum = { version = "0.28.0", features = ["derive"] }
time = "0.3.47"
tokio = { version = "1.46.0", features = ["rt-multi-thread", "macros", "signal"] }
tokio-util = "0.7.18"
tokio-util = { version = "0.7.18", features = ["io"] }
tracing = "0.1.41"
tracing-subscriber = { version = "0.3.19", features = ["env-filter"] }
twilight-gateway = { version = "0.17", default-features = false, features = ["rustls-webpki-roots", "twilight-http"] }
@@ -77,6 +80,7 @@ twilight-http = { version = "0.17", default-features = false, features = [
twilight-model = "0.17"
twilight-util = { version = "0.17", features = ["builder"] }
typed-builder = "0.23.2"
yoke = "0.8.2"
[build-dependencies]
capnpc = "0.25.3"

View File

@@ -3,7 +3,10 @@ use std::{fmt::Debug, sync::Arc};
use futures::future::BoxFuture;
use opendal::Operator;
use patricia_tree::StringPatriciaMap;
use songbird::{Songbird, driver::{Channels, SampleRate}};
use songbird::{
Songbird,
driver::{Channels, SampleRate},
};
use tokio_util::sync::CancellationToken;
use twilight_model::{
application::{command::Command, interaction::Interaction},
@@ -13,7 +16,7 @@ use twilight_model::{
},
};
use crate::{VCs, track_vcs::GuildVoiceChannelToTextChannel};
use crate::{GuildVoiceChannelToTextChannel, UserDataManager, VCs};
mod debug;
mod join;
@@ -34,7 +37,7 @@ pub struct State {
pub discord_voice_channel_corresponding_text_channel: Arc<GuildVoiceChannelToTextChannel>,
pub recording_data: Operator,
pub songbird: Arc<Songbird>,
pub user_data: Operator,
pub user_data_manager: UserDataManager,
pub vcs: Arc<VCs>,
}

View File

@@ -1,12 +1,15 @@
use std::sync::LazyLock;
use twilight_model::application::{
command::{Command, CommandType},
interaction::Interaction,
use twilight_model::{
application::{
command::{Command, CommandType},
interaction::Interaction,
},
http::interaction::{InteractionResponse, InteractionResponseType},
};
use twilight_util::builder::command::CommandBuilder;
use twilight_util::builder::{InteractionResponseDataBuilder, command::CommandBuilder};
use crate::command::State;
use crate::{command::State, user_capnp::user::Consent};
const NAME: &str = "opt-in";
const DESCRIPTION: &str = "Opt in to being recorded";
@@ -20,5 +23,66 @@ pub static COMMAND: LazyLock<Command> = LazyLock::new(|| {
#[tracing::instrument]
pub async fn handle(state: State, interaction: Interaction) {
todo!();
let user_id = interaction
.member
.as_ref()
.and_then(|member| member.user.as_ref().map(|user| user.id));
let user_id = match user_id {
Some(user_id) => user_id,
None => {
state
.discord_client
.interaction(state.discord_application_id)
.create_response(
interaction.id,
&interaction.token,
&InteractionResponse {
kind: InteractionResponseType::ChannelMessageWithSource,
data: Some(
InteractionResponseDataBuilder::new()
.content("TODO")
.build(),
),
},
)
.await
.expect("TODO");
return;
}
};
let previous_consent = state
.user_data_manager
.update(user_id, |mut user_data| {
let previous_consent = user_data
.reborrow()
.get_voice_recording_consent()
.expect("TODO");
user_data.set_voice_recording_consent(Consent::Granted);
previous_consent
})
.await
.expect("TODO");
state
.discord_client
.interaction(state.discord_application_id)
.create_response(
interaction.id,
&interaction.token,
&InteractionResponse {
kind: InteractionResponseType::ChannelMessageWithSource,
data: Some(
InteractionResponseDataBuilder::new()
.content(format!(
"opted you in, your previous consent was {previous_consent:?}"
))
.build(),
),
},
)
.await
.expect("TODO");
}

View File

@@ -1,24 +1,88 @@
use std::sync::LazyLock;
use twilight_model::application::{
command::{Command, CommandType},
interaction::Interaction,
};
use twilight_util::builder::command::CommandBuilder;
use crate::command::State;
const NAME: &str = "opt-out";
const DESCRIPTION: &str = "Opt out of being recorded";
pub static COMMAND: LazyLock<Command> = LazyLock::new(|| {
CommandBuilder::new(NAME, DESCRIPTION, CommandType::ChatInput)
.validate()
.expect("command wasn't correct")
.build()
});
#[tracing::instrument]
pub async fn handle(state: State, interaction: Interaction) {
todo!();
}
use std::sync::LazyLock;
use twilight_model::{
application::{
command::{Command, CommandType},
interaction::Interaction,
},
http::interaction::{InteractionResponse, InteractionResponseType},
};
use twilight_util::builder::{InteractionResponseDataBuilder, command::CommandBuilder};
use crate::{command::State, user_capnp::user::Consent};
const NAME: &str = "opt-out";
const DESCRIPTION: &str = "Opt out of being recorded";
pub static COMMAND: LazyLock<Command> = LazyLock::new(|| {
CommandBuilder::new(NAME, DESCRIPTION, CommandType::ChatInput)
.validate()
.expect("command wasn't correct")
.build()
});
#[tracing::instrument]
pub async fn handle(state: State, interaction: Interaction) {
let user_id = interaction
.member
.as_ref()
.and_then(|member| member.user.as_ref().map(|user| user.id));
let user_id = match user_id {
Some(user_id) => user_id,
None => {
state
.discord_client
.interaction(state.discord_application_id)
.create_response(
interaction.id,
&interaction.token,
&InteractionResponse {
kind: InteractionResponseType::ChannelMessageWithSource,
data: Some(
InteractionResponseDataBuilder::new()
.content("TODO")
.build(),
),
},
)
.await
.expect("TODO");
return;
}
};
let previous_consent = state
.user_data_manager
.update(user_id, |mut user_data| {
let previous_consent = user_data
.reborrow()
.get_voice_recording_consent()
.expect("TODO");
user_data.set_voice_recording_consent(Consent::Withheld);
previous_consent
})
.await
.expect("TODO");
state
.discord_client
.interaction(state.discord_application_id)
.create_response(
interaction.id,
&interaction.token,
&InteractionResponse {
kind: InteractionResponseType::ChannelMessageWithSource,
data: Some(
InteractionResponseDataBuilder::new()
.content(format!(
"opted you out, your previous consent was {previous_consent:?}"
))
.build(),
),
},
)
.await
.expect("TODO");
}

View File

@@ -1,18 +1,22 @@
mod command;
mod one_to_many;
mod one_to_many_with_data;
mod one_to_one;
mod storage;
mod track_vcs;
mod vc_user;
pub use command::{Router as CommandRouter, State, all as all_commands};
pub use one_to_many::OneToManyUniqueBTreeMap;
pub use one_to_many_with_data::OneToManyUniqueBTreeMapWithData;
pub use one_to_one::OneToOneBTreeMap;
pub use storage::Storage;
pub use track_vcs::{GuildVoiceChannelToTextChannel, VCs, initialize_vcs, update_vcs};
pub use vc_user::{UserInVCData, VoiceStatus};
capnp::generated_code!(pub mod user_capnp);
capnp::generated_code!(pub mod bot_capnp);
mod command;
mod one_to_many;
mod one_to_many_with_data;
mod one_to_one;
mod operator_ext;
mod option_ext;
mod storage;
mod track_vcs;
mod user_data;
mod vc_user;
capnp::generated_code!(mod bot_capnp);
capnp::generated_code!(mod user_capnp);
pub use command::{Router as CommandRouter, State, all as all_commands};
pub use one_to_many::OneToManyUniqueBTreeMap;
pub use one_to_many_with_data::OneToManyUniqueBTreeMapWithData;
pub use one_to_one::OneToOneBTreeMap;
pub use operator_ext::OperatorExt;
pub use storage::Storage;
pub use track_vcs::{GuildVoiceChannelToTextChannel, VCs, initialize_vcs, update_vcs};
pub use user_data::UserDataManager;
pub use vc_user::{UserInVCData, VoiceStatus};

View File

@@ -1,7 +1,7 @@
use clap::Parser;
use fomo_reducer::{
CommandRouter, GuildVoiceChannelToTextChannel, State, Storage, all_commands, initialize_vcs,
update_vcs,
CommandRouter, GuildVoiceChannelToTextChannel, State, Storage, UserDataManager, all_commands,
initialize_vcs, update_vcs,
};
use secrecy::{ExposeSecret, SecretString};
use snafu::{OptionExt, ResultExt, Snafu};
@@ -303,6 +303,8 @@ async fn main() -> Result<(), MainError> {
let recording_data = recording_data.into_inner();
let user_data = user_data.into_inner();
let user_data_manager = UserDataManager::new(user_data);
let discord_voice_channel_corresponding_text_channel = {
let mut map = GuildVoiceChannelToTextChannel::default();
@@ -331,7 +333,7 @@ async fn main() -> Result<(), MainError> {
discord_voice_channel_corresponding_text_channel,
recording_data,
songbird,
user_data,
user_data_manager,
vcs,
};

24
src/operator_ext.rs Normal file
View File

@@ -0,0 +1,24 @@
use extension_traits::extension;
use opendal::{Buffer, Error, ErrorKind, FuturesAsyncReader, Operator};
#[extension(pub trait OperatorExt)]
impl Operator {
async fn read_if_exists(&self, path: &str) -> Result<Option<Buffer>, Error> {
match self.read(path).await {
Ok(buffer) => Ok(Some(buffer)),
Err(error) if matches!(error.kind(), ErrorKind::NotFound) => Ok(None),
Err(error) => Err(error),
}
}
async fn async_reader_if_exists(
&self,
path: &str,
) -> Result<Option<FuturesAsyncReader>, Error> {
match self.reader(path).await {
Ok(reader) => Ok(Some(reader.into_futures_async_read(..).await?)),
Err(error) if matches!(error.kind(), ErrorKind::NotFound) => Ok(None),
Err(error) => Err(error),
}
}
}

11
src/option_ext.rs Normal file
View File

@@ -0,0 +1,11 @@
use extension_traits::extension;
#[extension(pub trait OptionExt)]
impl<S> Option<S> {
async fn map_async<M, Fut: Future<Output = M>, F: FnOnce(S) -> Fut>(self, f: F) -> Option<M> {
match self {
Some(s) => Some(f(s).await),
None => None,
}
}
}

View File

@@ -2,7 +2,6 @@ use std::{fmt::Debug, str::FromStr};
use opendal::{IntoOperatorUri, Operator, OperatorUri};
#[derive(Clone)]
pub struct Storage {
uri: OperatorUri,

View File

@@ -1,134 +1,135 @@
use std::collections::BTreeMap;
use dashmap::DashMap;
use futures::{StreamExt, stream::FuturesUnordered};
use twilight_model::{
gateway::payload::incoming::VoiceStateUpdate,
id::{
Id,
marker::{ChannelMarker, GuildMarker, UserMarker},
},
};
use crate::{OneToManyUniqueBTreeMapWithData, OneToOneBTreeMap, UserInVCData, VoiceStatus};
pub type GuildVoiceChannelToTextChannel = BTreeMap<Id<GuildMarker>, OneToOneBTreeMap<Id<ChannelMarker>, Id<ChannelMarker>>>;
type VCsInGuild = OneToManyUniqueBTreeMapWithData<Id<ChannelMarker>, Id<UserMarker>, UserInVCData>;
pub type VCs = DashMap<Id<GuildMarker>, VCsInGuild>;
#[tracing::instrument(skip(discord_client), ret)]
async fn initialize_user_in_vc(
discord_client: &twilight_http::Client,
guild_id: Id<GuildMarker>,
user_id: Id<UserMarker>,
) -> Option<(Id<ChannelMarker>, UserInVCData)> {
if let Ok(voice_state_res) = discord_client.user_voice_state(guild_id, user_id).await
&& let Ok(voice_state) = voice_state_res.model().await
{
tracing::info!(?user_id, ?voice_state);
let voice_status = VoiceStatus::builder()
.self_deafened(voice_state.self_deaf)
.self_muted(voice_state.self_mute)
.server_deafened(voice_state.deaf)
.server_muted(voice_state.mute)
.camming(voice_state.self_video)
.streaming(voice_state.self_stream)
.build();
let user_in_vc_data = voice_status.into();
voice_state
.channel_id
.map(|channel_id| (channel_id, user_in_vc_data))
} else {
None // TODO
}
}
#[tracing::instrument(skip(discord_client), ret)]
async fn initialize_server_vcs(
discord_client: &twilight_http::Client,
id: Id<GuildMarker>,
) -> VCsInGuild {
if let Ok(guild_members_res) = discord_client.guild_members(id).limit(999).await
&& let Ok(guild_members) = guild_members_res.model().await
{
FuturesUnordered::from_iter(guild_members.into_iter().map(|member| async move {
(
member.user.id,
initialize_user_in_vc(discord_client, id, member.user.id).await,
)
}))
.filter_map(
|(user_id, channel_id_and_user_in_vc_data_option)| async move {
channel_id_and_user_in_vc_data_option
.map(|(channel_id, user_in_vc_data)| (channel_id, user_id, user_in_vc_data))
},
)
.collect()
.await
} else {
Default::default()
}
}
#[tracing::instrument(skip(discord_client), ret)]
pub async fn initialize_vcs(discord_client: &twilight_http::Client) -> VCs {
if let Ok(guilds_res) = discord_client.current_user_guilds().limit(200).await
&& let Ok(guilds) = guilds_res.model().await
{
FuturesUnordered::from_iter(guilds.into_iter().map(|guild| async move {
let guild_vcs = initialize_server_vcs(discord_client, guild.id).await;
(guild.id, guild_vcs)
}))
.collect()
.await
} else {
Default::default()
}
}
#[tracing::instrument(skip(vcs))]
pub fn update_vcs(voice_state_update: &VoiceStateUpdate, vcs: &VCs) {
let user_id = voice_state_update.user_id;
match voice_state_update.guild_id {
Some(guild_id) => match voice_state_update.channel_id {
Some(channel_id) => {
let voice_status = VoiceStatus::builder()
.self_deafened(voice_state_update.self_deaf)
.self_muted(voice_state_update.self_mute)
.server_deafened(voice_state_update.deaf)
.server_muted(voice_state_update.mute)
.camming(voice_state_update.self_video)
.streaming(voice_state_update.self_stream)
.build();
let user_in_vc_data = voice_status.into();
vcs.entry(guild_id)
.or_default()
.insert(channel_id, user_id, user_in_vc_data);
tracing::info!(
?guild_id,
?channel_id,
?user_id,
"connected or otherwise changed state while connected"
);
}
None => {
if let Some(mut channel_vcers) = vcs.get_mut(&guild_id) {
channel_vcers.remove_right(&user_id);
}
tracing::info!(?guild_id, ?user_id, "disconnected");
}
},
None => {
tracing::error!("why doesn't this have a guild id attached?!");
}
}
}
use std::collections::BTreeMap;
use dashmap::DashMap;
use futures::{StreamExt, stream::FuturesUnordered};
use twilight_model::{
gateway::payload::incoming::VoiceStateUpdate,
id::{
Id,
marker::{ChannelMarker, GuildMarker, UserMarker},
},
};
use crate::{OneToManyUniqueBTreeMapWithData, OneToOneBTreeMap, UserInVCData, VoiceStatus};
pub type GuildVoiceChannelToTextChannel =
BTreeMap<Id<GuildMarker>, OneToOneBTreeMap<Id<ChannelMarker>, Id<ChannelMarker>>>;
type VCsInGuild = OneToManyUniqueBTreeMapWithData<Id<ChannelMarker>, Id<UserMarker>, UserInVCData>;
pub type VCs = DashMap<Id<GuildMarker>, VCsInGuild>;
#[tracing::instrument(skip(discord_client), ret)]
async fn initialize_user_in_vc(
discord_client: &twilight_http::Client,
guild_id: Id<GuildMarker>,
user_id: Id<UserMarker>,
) -> Option<(Id<ChannelMarker>, UserInVCData)> {
if let Ok(voice_state_res) = discord_client.user_voice_state(guild_id, user_id).await
&& let Ok(voice_state) = voice_state_res.model().await
{
tracing::info!(?user_id, ?voice_state);
let voice_status = VoiceStatus::builder()
.self_deafened(voice_state.self_deaf)
.self_muted(voice_state.self_mute)
.server_deafened(voice_state.deaf)
.server_muted(voice_state.mute)
.camming(voice_state.self_video)
.streaming(voice_state.self_stream)
.build();
let user_in_vc_data = voice_status.into();
voice_state
.channel_id
.map(|channel_id| (channel_id, user_in_vc_data))
} else {
None // TODO
}
}
#[tracing::instrument(skip(discord_client), ret)]
async fn initialize_server_vcs(
discord_client: &twilight_http::Client,
id: Id<GuildMarker>,
) -> VCsInGuild {
if let Ok(guild_members_res) = discord_client.guild_members(id).limit(999).await
&& let Ok(guild_members) = guild_members_res.model().await
{
FuturesUnordered::from_iter(guild_members.into_iter().map(|member| async move {
(
member.user.id,
initialize_user_in_vc(discord_client, id, member.user.id).await,
)
}))
.filter_map(
|(user_id, channel_id_and_user_in_vc_data_option)| async move {
channel_id_and_user_in_vc_data_option
.map(|(channel_id, user_in_vc_data)| (channel_id, user_id, user_in_vc_data))
},
)
.collect()
.await
} else {
Default::default()
}
}
#[tracing::instrument(skip(discord_client), ret)]
pub async fn initialize_vcs(discord_client: &twilight_http::Client) -> VCs {
if let Ok(guilds_res) = discord_client.current_user_guilds().limit(200).await
&& let Ok(guilds) = guilds_res.model().await
{
FuturesUnordered::from_iter(guilds.into_iter().map(|guild| async move {
let guild_vcs = initialize_server_vcs(discord_client, guild.id).await;
(guild.id, guild_vcs)
}))
.collect()
.await
} else {
Default::default()
}
}
#[tracing::instrument(skip(vcs))]
pub fn update_vcs(voice_state_update: &VoiceStateUpdate, vcs: &VCs) {
let user_id = voice_state_update.user_id;
match voice_state_update.guild_id {
Some(guild_id) => match voice_state_update.channel_id {
Some(channel_id) => {
let voice_status = VoiceStatus::builder()
.self_deafened(voice_state_update.self_deaf)
.self_muted(voice_state_update.self_mute)
.server_deafened(voice_state_update.deaf)
.server_muted(voice_state_update.mute)
.camming(voice_state_update.self_video)
.streaming(voice_state_update.self_stream)
.build();
let user_in_vc_data = voice_status.into();
vcs.entry(guild_id)
.or_default()
.insert(channel_id, user_id, user_in_vc_data);
tracing::info!(
?guild_id,
?channel_id,
?user_id,
"connected or otherwise changed state while connected"
);
}
None => {
if let Some(mut channel_vcers) = vcs.get_mut(&guild_id) {
channel_vcers.remove_right(&user_id);
}
tracing::info!(?guild_id, ?user_id, "disconnected");
}
},
None => {
tracing::error!("why doesn't this have a guild id attached?!");
}
}
}

173
src/user_data.rs Normal file
View File

@@ -0,0 +1,173 @@
use async_compression::futures::{bufread::BrotliDecoder, write::BrotliEncoder};
use capnp::message::{TypedBuilder, TypedReader};
use futures::{AsyncReadExt, AsyncWriteExt};
use opendal::Operator;
use snafu::{ResultExt, Snafu};
use twilight_model::id::{Id, marker::UserMarker};
use crate::{OperatorExt, option_ext::OptionExt, user_capnp};
#[derive(Debug, Clone)]
pub struct UserDataManager {
operator: Operator,
}
impl UserDataManager {
pub fn new(operator: Operator) -> Self {
Self { operator }
}
fn path(id: Id<UserMarker>) -> String {
format!("{id}/data.bin.brotli")
}
}
#[derive(Debug, Snafu)]
#[snafu(module)]
pub enum WithError {
/// couldn't read data for this user from the storage operator
ReadError { source: opendal::Error },
/// couldn't decompress the user data from storage
DecompressionError { source: std::io::Error },
/// couldn't deserialize the user data
DeserializeError { source: capnp::Error },
}
impl UserDataManager {
pub async fn with<R>(
&self,
id: Id<UserMarker>,
f: impl FnOnce(user_capnp::user::Reader<'_>) -> R,
) -> Result<R, WithError> {
let compressed_buffer = self
.operator
.async_reader_if_exists(&UserDataManager::path(id))
.await
.context(with_error::ReadSnafu)?;
let decompressed_reader = compressed_buffer.map(BrotliDecoder::new);
let decompressed = decompressed_reader
.map_async(|mut reader| async move {
let mut vec = Vec::new();
reader.read_to_end(&mut vec).await?;
Ok(vec)
})
.await
.transpose()
.context(with_error::DecompressionSnafu)?;
let mut message = TypedBuilder::<user_capnp::user::Owned>::new_default();
let fallback = message.init_root();
let mut user_data = fallback.into_reader();
let message_reader;
if let Some(mut bytes) = decompressed.as_deref() {
message_reader = capnp::serialize::read_message_from_flat_slice_no_alloc(
&mut bytes,
Default::default(),
)
.context(with_error::DeserializeSnafu)?;
user_data = message_reader
.get_root()
.context(with_error::DeserializeSnafu)?;
}
Ok(f(user_data))
}
}
#[derive(Debug, Snafu)]
#[snafu(module)]
pub enum UpdateError {
/// couldn't read data for this user from the storage operator
ReadError { source: opendal::Error },
/// couldn't decompress the user data from storage
DecompressionError { source: std::io::Error },
/// couldn't deserialize the user data
DeserializeError { source: capnp::Error },
/// couldn't serialize the (modified) user data
SerializeError { source: capnp::Error },
/// couldn't create a writer for this user data in the storage operator
WriterError { source: opendal::Error },
/// couldn't write (modified) data for this user to the storage operator
WriteError { source: std::io::Error },
}
impl UserDataManager {
pub async fn update<R>(
&self,
id: Id<UserMarker>,
f: impl FnOnce(user_capnp::user::Builder<'_>) -> R,
) -> Result<R, UpdateError> {
let path = UserDataManager::path(id);
let compressed_buffer = self
.operator
.async_reader_if_exists(&path)
.await
.context(update_error::ReadSnafu)?;
let decompressed_reader = compressed_buffer.map(BrotliDecoder::new);
let decompressed = decompressed_reader
.map_async(|mut reader| async move {
let mut vec = Vec::new();
reader.read_to_end(&mut vec).await?;
Ok(vec)
})
.await
.transpose()
.context(update_error::DecompressionSnafu)?;
let mut message = TypedBuilder::<user_capnp::user::Owned>::new_default();
let ret = if let Some(mut bytes) = decompressed.as_deref() {
let message_reader = capnp::serialize::read_message_from_flat_slice_no_alloc(
&mut bytes,
Default::default(),
)
.context(update_error::DeserializeSnafu)?;
let user_data = message_reader
.get_root()
.context(update_error::DeserializeSnafu)?;
message
.set_root(user_data)
.context(update_error::DeserializeSnafu)?;
f(
message.get_root().unwrap(), // this is logically impossible
)
} else {
f(message.init_root())
};
let mut buffer = Vec::new();
capnp::serialize::write_message(&mut buffer, message.borrow_inner())
.context(update_error::SerializeSnafu)?;
let compressed_writer = self
.operator
.writer(&path)
.await
.context(update_error::WriterSnafu)?
.into_futures_async_write();
let mut decompressed_writer = BrotliEncoder::new(compressed_writer);
decompressed_writer
.write_all(&buffer)
.await
.context(update_error::WriteSnafu)?;
Ok(ret)
}
}

View File

@@ -3,5 +3,11 @@
struct User {
notificationScript @0 :Text;
voiceRecordingConsent @1 :Bool;
voiceRecordingConsent @1 :Consent = unspecified;
enum Consent {
unspecified @0;
granted @1;
withheld @2;
}
}