diff --git a/Cargo.lock b/Cargo.lock index 155e614..79d4c6a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -492,6 +492,31 @@ dependencies = [ "piper", ] +[[package]] +name = "bon" +version = "3.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f47dbe92550676ee653353c310dfb9cf6ba17ee70396e1f7cf0a2020ad49b2fe" +dependencies = [ + "bon-macros", + "rustversion", +] + +[[package]] +name = "bon-macros" +version = "3.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "519bd3116aeeb42d5372c29d982d16d0170d3d4a5ed85fc7dd91642ffff3c67c" +dependencies = [ + "darling 0.23.0", + "ident_case", + "prettyplease", + "proc-macro2", + "quote", + "rustversion", + "syn 2.0.117", +] + [[package]] name = "brotli" version = "8.0.2" @@ -1743,6 +1768,7 @@ version = "0.1.0" dependencies = [ "async-compression", "async-trait", + "bon", "bytes", "capnp", "capnpc", diff --git a/Cargo.toml b/Cargo.toml index bcb5a1f..47dfcda 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,6 +6,7 @@ edition = "2024" [dependencies] async-compression = { version = "0.4.41", features = ["brotli", "futures-io"] } async-trait = "0.1.89" +bon = "3.9.1" bytes = "1.11.1" capnp = "0.25.3" clap = { version = "4.5.40", features = ["derive", "env"] } diff --git a/src/call.rs b/src/call.rs new file mode 100644 index 0000000..228a602 --- /dev/null +++ b/src/call.rs @@ -0,0 +1,221 @@ +use crate::{ + OneToManyUniqueBTreeMap, UserDataManager, VCs, command::State, option_ext::OptionExt as _, + user_capnp::user::Consent, user_data::RECORD_IF_CONSENT_UNSPECIFIED, +}; +use async_trait::async_trait; +use futures::FutureExt; +use hound::{SampleFormat, WavSpec}; +use opendal::Operator; +use songbird::{ + CoreEvent, Event, EventContext, EventHandler, Songbird, + driver::{Channels, SampleRate}, +}; +use std::{ + io::Cursor, + sync::{Arc, Mutex}, + time::Instant, +}; +use time::UtcDateTime; +use twilight_model:: + id::{ + Id, + marker::{ChannelMarker, GuildMarker, UserMarker}, + } +; + +#[derive(Debug, Clone)] +struct Handler { + start_instant: Instant, + start_utc: UtcDateTime, + + recording_data: Operator, + + guild_id: Id, + channel_id: Id, + + known_ssrcs: Arc, u32>>>, + + audio_channels: u16, + audio_sample_rate: u32, + + user_data_manager: UserDataManager, +} + +#[async_trait] +impl EventHandler for Handler { + async fn act(&self, ctx: &EventContext<'_>) -> Option { + match ctx { + EventContext::Track(_items) => { + // Not expected to fire + } + EventContext::SpeakingStateUpdate(speaking) => { + tracing::error!(?speaking); + + if let Some(user_id) = speaking.user_id { + let user_id = Id::new(user_id.0); + + self.known_ssrcs + .lock() + .unwrap() + .insert(user_id, speaking.ssrc); + } + } + EventContext::VoiceTick(voice_tick) => { + tracing::debug!(?voice_tick); + + for (ssrc, voice_data) in &voice_tick.speaking { + let user_id = { + let known_ssrcs = self.known_ssrcs.lock().unwrap(); + tracing::debug!(?known_ssrcs, ?ssrc); + known_ssrcs.get_left_for(ssrc).cloned() + }; + + tracing::info!(?user_id); + + if let Some(pcm) = &voice_data.decoded_voice { + let may_record = user_id + .map_async(|user_id| { + self.user_data_manager + .with(user_id, |user_data| { + user_data.get_voice_recording_consent().unwrap() + }) + .map(|result| result.expect("TODO")) + }) + .await + .map_or(RECORD_IF_CONSENT_UNSPECIFIED, |consent| match consent { + Consent::Unspecified => RECORD_IF_CONSENT_UNSPECIFIED, + Consent::Granted => true, + Consent::Withheld => false, + }); + + if !may_record { + tracing::warn!(?user_id, "may not be recorded"); + continue; + } + + let elapsed = self.start_instant.elapsed(); + let elapsed = elapsed.try_into().expect("TODO"); + + let now_utc = self.start_utc.checked_add(elapsed).expect("TODO"); + + let year = now_utc.year(); + let month = now_utc.month(); + let day = now_utc.day(); + + let hour = now_utc.hour(); + let minute = now_utc.minute(); + let second = now_utc.second(); + + let microseconds = now_utc.microsecond(); + + let guild_id = self.guild_id; + let channel_id = self.channel_id; + + let user = user_id + .as_ref() + .map_or_else(|| "UNKNOWN".into(), ToString::to_string); + + let path = format!( + "{year}/{month}/{day}/{hour}/{minute}/audio-{second}.{microseconds}-{guild_id}-{channel_id}-{user}.wav" + ); + + let channels = self.audio_channels; + let sample_rate = self.audio_sample_rate; + + let wav_spec = WavSpec { + channels, + sample_rate, + bits_per_sample: 16, + sample_format: SampleFormat::Int, + }; + + let mut buffer = Vec::new(); + let writer = Cursor::new(&mut buffer); + + let mut wav_writer = hound::WavWriter::new(writer, wav_spec).expect("TODO"); + + let mut sample_writer = wav_writer.get_i16_writer(pcm.len() as u32); + + for sample in pcm { + sample_writer.write_sample(*sample); + } + sample_writer.flush().expect("TODO"); + + wav_writer.finalize().expect("TODO"); + + tracing::info!("going to write the audio shortly"); + + let recording_data = self.recording_data.clone(); + tokio::spawn(async move { + recording_data.write(&path, buffer).await.expect("TODO"); + tracing::info!(?path, "successfully wrote the audio!"); + }); + } + } + } + EventContext::RtpPacket(_rtp_data) => {} + EventContext::RtcpPacket(_rtcp_data) => {} + EventContext::ClientDisconnect(_client_disconnect) => { + // This is already taken care of elsewhere + } + EventContext::DriverConnect(_connect_data) => {} + EventContext::DriverReconnect(_connect_data) => {} + EventContext::DriverDisconnect(_disconnect_data) => {} + other => { + tracing::warn!(?other, "cannot be handled yet"); + } + } + + None + } +} + +#[bon::builder] +#[tracing::instrument] +pub async fn join_and_record( + audio_channels: Channels, + audio_sample_rate: SampleRate, + guild_id: Id, + recording_data: Operator, + songbird: &Songbird, + user_data_manager: UserDataManager, + voice_channel_id: Id, +) -> Result<(), songbird::error::JoinError> { + let start_instant = Instant::now(); + let start_utc = UtcDateTime::now(); + + let audio_channels = opus2::Channels::from(audio_channels) as u16; + + let audio_sample_rate = u32::from(audio_sample_rate); + + let handler = Handler { + start_instant, + start_utc, + recording_data, + guild_id, + channel_id: voice_channel_id, + known_ssrcs: Default::default(), + + audio_channels, + audio_sample_rate, + + user_data_manager, + }; + + let call = songbird.get_or_insert(guild_id); + { + let mut call = call.lock().await; + call.remove_all_global_events(); // TODO: WIP: investigating + + call.add_global_event(CoreEvent::SpeakingStateUpdate.into(), handler.clone()); + call.add_global_event(CoreEvent::VoiceTick.into(), handler); + + if let Err(muting_error) = call.mute(true).await { + tracing::warn!(?muting_error, "couldn't mute, but that's okay"); + }; + } + + songbird.join(guild_id, voice_channel_id).await?; + + Ok(()) +} diff --git a/src/command/join.rs b/src/command/join.rs index 07c638d..86a2939 100644 --- a/src/command/join.rs +++ b/src/command/join.rs @@ -1,19 +1,10 @@ use crate::{ - OneToManyUniqueBTreeMap, UserDataManager, VCs, command::State, option_ext::OptionExt as _, - user_capnp::user::Consent, user_data::RECORD_IF_CONSENT_UNSPECIFIED, + VCs, call::join_and_record, command::State, }; -use async_trait::async_trait; -use futures::FutureExt; -use hound::{SampleFormat, WavSpec}; -use opendal::Operator; use snafu::{OptionExt as _, Snafu}; -use songbird::{CoreEvent, Event, EventContext, EventHandler}; -use std::{ - io::Cursor, - sync::{Arc, LazyLock, Mutex}, - time::Instant, -}; -use time::UtcDateTime; +use std:: + sync::LazyLock +; use twilight_model::{ application::{ command::{Command, CommandType}, @@ -23,7 +14,7 @@ use twilight_model::{ http::interaction::{InteractionResponse, InteractionResponseType}, id::{ Id, - marker::{ChannelMarker, GuildMarker, UserMarker}, + marker::{ChannelMarker, GuildMarker}, }, }; use twilight_util::builder::{ @@ -88,153 +79,6 @@ fn get_guild_and_vc_error_to_embed(error: GetGuildAndVoiceChannelIdError) -> Emb } } -#[derive(Debug, Clone)] -struct Handler { - start_instant: Instant, - start_utc: UtcDateTime, - - recordings: Operator, - - guild_id: Id, - channel_id: Id, - - known_ssrcs: Arc, u32>>>, - - audio_channels: u16, - audio_sample_rate: u32, - - user_data_manager: UserDataManager, -} - -#[async_trait] -impl EventHandler for Handler { - async fn act(&self, ctx: &EventContext<'_>) -> Option { - match ctx { - EventContext::Track(_items) => { - // Not expected to fire - } - EventContext::SpeakingStateUpdate(speaking) => { - tracing::error!(?speaking); - - if let Some(user_id) = speaking.user_id { - let user_id = Id::new(user_id.0); - - self.known_ssrcs - .lock() - .unwrap() - .insert(user_id, speaking.ssrc); - } - } - EventContext::VoiceTick(voice_tick) => { - tracing::debug!(?voice_tick); - - for (ssrc, voice_data) in &voice_tick.speaking { - let user_id = { - let known_ssrcs = self.known_ssrcs.lock().unwrap(); - tracing::debug!(?known_ssrcs, ?ssrc); - known_ssrcs.get_left_for(ssrc).cloned() - }; - - tracing::info!(?user_id); - - if let Some(pcm) = &voice_data.decoded_voice { - let may_record = user_id - .map_async(|user_id| { - self.user_data_manager - .with(user_id, |user_data| { - user_data.get_voice_recording_consent().unwrap() - }) - .map(|result| result.expect("TODO")) - }) - .await - .map_or(RECORD_IF_CONSENT_UNSPECIFIED, |consent| match consent { - Consent::Unspecified => RECORD_IF_CONSENT_UNSPECIFIED, - Consent::Granted => true, - Consent::Withheld => false, - }); - - if !may_record { - tracing::warn!(?user_id, "may not be recorded"); - continue; - } - - let elapsed = self.start_instant.elapsed(); - let elapsed = elapsed.try_into().expect("TODO"); - - let now_utc = self.start_utc.checked_add(elapsed).expect("TODO"); - - let year = now_utc.year(); - let month = now_utc.month(); - let day = now_utc.day(); - - let hour = now_utc.hour(); - let minute = now_utc.minute(); - let second = now_utc.second(); - - let microseconds = now_utc.microsecond(); - - let guild_id = self.guild_id; - let channel_id = self.channel_id; - - let user = user_id - .as_ref() - .map_or_else(|| "UNKNOWN".into(), ToString::to_string); - - let path = format!( - "{year}/{month}/{day}/{hour}/{minute}/audio-{second}.{microseconds}-{guild_id}-{channel_id}-{user}.wav" - ); - - let channels = self.audio_channels; - let sample_rate = self.audio_sample_rate; - - let wav_spec = WavSpec { - channels, - sample_rate, - bits_per_sample: 16, - sample_format: SampleFormat::Int, - }; - - let mut buffer = Vec::new(); - let writer = Cursor::new(&mut buffer); - - let mut wav_writer = hound::WavWriter::new(writer, wav_spec).expect("TODO"); - - let mut sample_writer = wav_writer.get_i16_writer(pcm.len() as u32); - - for sample in pcm { - sample_writer.write_sample(*sample); - } - sample_writer.flush().expect("TODO"); - - wav_writer.finalize().expect("TODO"); - - tracing::info!("going to write the audio shortly"); - - let recordings = self.recordings.clone(); - tokio::spawn(async move { - recordings.write(&path, buffer).await.expect("TODO"); - tracing::info!(?path, "successfully wrote the audio!"); - }); - } - } - } - EventContext::RtpPacket(_rtp_data) => {} - EventContext::RtcpPacket(_rtcp_data) => {} - EventContext::ClientDisconnect(_client_disconnect) => { - // This is already taken care of elsewhere - } - EventContext::DriverConnect(_connect_data) => {} - EventContext::DriverReconnect(_connect_data) => {} - EventContext::DriverDisconnect(_disconnect_data) => {} - other => { - tracing::warn!(?other, "cannot be handled yet"); - } - } - - None - } -} - #[tracing::instrument(skip(state))] pub async fn handle(state: State, interaction: Interaction) { let guild_and_voice_channel_id_res = @@ -279,41 +123,17 @@ pub async fn handle(state: State, interaction: Interaction) { .await .expect("TODO"); - let start_instant = Instant::now(); - let start_utc = UtcDateTime::now(); - - let audio_channels = opus2::Channels::from(state.audio_channels) as u16; - - let audio_sample_rate = u32::from(state.audio_sample_rate); - - let handler = Handler { - start_instant, - start_utc, - recordings: state.recording_data, - guild_id, - channel_id: voice_channel_id, - known_ssrcs: Default::default(), - - audio_channels, - audio_sample_rate, - - user_data_manager: state.user_data_manager, - }; - - let call = state.songbird.get_or_insert(guild_id); + match join_and_record() + .audio_channels(state.audio_channels) + .audio_sample_rate(state.audio_sample_rate) + .guild_id(guild_id) + .recording_data(state.recording_data) + .songbird(&state.songbird) + .user_data_manager(state.user_data_manager) + .voice_channel_id(voice_channel_id) + .call().await { - let mut call = call.lock().await; - - call.add_global_event(CoreEvent::SpeakingStateUpdate.into(), handler.clone()); - call.add_global_event(CoreEvent::VoiceTick.into(), handler); - - call.mute(true).await.expect("TODO"); - } - - match state.songbird.join(guild_id, voice_channel_id).await { - Ok(_call) => { - tracing::error!(?call, "successfully joined"); - + Ok(()) => { let channel_mention = format!("<#{voice_channel_id}>"); let info_mention = format!( @@ -344,7 +164,7 @@ pub async fn handle(state: State, interaction: Interaction) { ])) .await .expect("TODO"); - }, + } Err(join_error) => { tracing::error!(?join_error); let _ = state.songbird.remove(guild_id).await; diff --git a/src/lib.rs b/src/lib.rs index ae754c5..58f84b2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,5 @@ mod bot_data; +mod call; pub mod command; mod one_to_many; mod one_to_many_with_data;