Files
fomo-reducer/src/call.rs

221 lines
7.2 KiB
Rust

use crate::{
OneToManyUniqueBTreeMap, UserManager,
option_ext::OptionExt as _,
recording_data::{Clip, Recording, RecordingData, RecordingManager},
user_capnp::user::Consent,
user_data::RECORD_IF_CONSENT_UNSPECIFIED,
};
use async_trait::async_trait;
use futures::FutureExt;
use songbird::{
CoreEvent, Event, EventContext, EventHandler, Songbird,
driver::{Channels, SampleRate},
};
use std::{
sync::{Arc, Mutex},
time::Instant,
};
use time::UtcDateTime;
use twilight_model::id::{
Id,
marker::{ChannelMarker, GuildMarker, UserMarker},
};
#[derive(Debug, Clone)]
struct Handler {
start_instant: Instant,
start_utc: UtcDateTime,
recording_manager: RecordingManager,
guild_id: Id<GuildMarker>,
channel_id: Id<ChannelMarker>,
known_ssrcs: Arc<Mutex<OneToManyUniqueBTreeMap<Id<UserMarker>, u32>>>,
audio_channels: u16,
audio_sample_rate: u32,
user_manager: UserManager,
}
#[async_trait]
impl EventHandler for Handler {
async fn act(&self, ctx: &EventContext<'_>) -> Option<Event> {
match ctx {
EventContext::Track(_items) => {
// Not expected to fire
}
EventContext::SpeakingStateUpdate(speaking) => {
tracing::error!(?speaking);
if let Some(user_id) = speaking.user_id {
let user_id = Id::new(user_id.0);
self.known_ssrcs
.lock()
.unwrap()
.insert(user_id, speaking.ssrc);
}
}
EventContext::VoiceTick(voice_tick) => {
tracing::debug!(?voice_tick);
for (ssrc, voice_data) in &voice_tick.speaking {
let user_id = {
let known_ssrcs = self.known_ssrcs.lock().unwrap();
tracing::debug!(?known_ssrcs, ?ssrc);
known_ssrcs.get_left_for(ssrc).cloned()
};
tracing::info!(?user_id);
if let Some(pcm) = &voice_data.decoded_voice {
let may_record = user_id
.map_async(|user_id| {
self.user_manager
.with(user_id, |user_data| {
user_data.get_voice_recording_consent().unwrap()
})
.map(|result| result.expect("TODO"))
})
.await
.map_or(RECORD_IF_CONSENT_UNSPECIFIED, |consent| match consent {
Consent::Unspecified => RECORD_IF_CONSENT_UNSPECIFIED,
Consent::Granted => true,
Consent::Withheld => false,
});
if !may_record {
tracing::warn!(?user_id, "may not be recorded");
continue;
}
let elapsed = self.start_instant.elapsed();
let elapsed = elapsed.try_into().expect("TODO");
let now_utc = self.start_utc.checked_add(elapsed).expect("TODO");
let year = now_utc.year();
let month = now_utc.month();
let day = now_utc.day();
let hour = now_utc.hour();
let minute = now_utc.minute();
let second = now_utc.second();
let microsecond = now_utc.microsecond();
let guild = self.guild_id;
let voice_channel = self.channel_id;
let user = user_id;
let clip = Clip {
second,
microsecond,
guild,
voice_channel,
user,
};
let recording = Recording {
year,
month,
day,
hour,
minute,
clip,
};
let channels = self.audio_channels;
let sample_rate = self.audio_sample_rate;
tracing::info!("going to write the audio shortly");
let recording_manager = self.recording_manager.clone();
let samples = pcm.clone();
let recording_data = RecordingData {
channels,
sample_rate,
samples,
};
tokio::spawn(async move {
recording_manager
.write(&recording, recording_data)
.await
.expect("TODO");
tracing::info!(?recording, "successfully wrote the audio!");
});
}
}
}
EventContext::RtpPacket(_rtp_data) => {}
EventContext::RtcpPacket(_rtcp_data) => {}
EventContext::ClientDisconnect(_client_disconnect) => {
// This is already taken care of elsewhere
}
EventContext::DriverConnect(_connect_data) => {}
EventContext::DriverReconnect(_connect_data) => {}
EventContext::DriverDisconnect(_disconnect_data) => {}
other => {
tracing::warn!(?other, "cannot be handled yet");
}
}
None
}
}
#[bon::builder]
#[tracing::instrument]
pub async fn join_and_record(
audio_channels: Channels,
audio_sample_rate: SampleRate,
guild_id: Id<GuildMarker>,
recording_manager: RecordingManager,
songbird: &Songbird,
user_manager: UserManager,
voice_channel_id: Id<ChannelMarker>,
) -> Result<(), songbird::error::JoinError> {
let start_instant = Instant::now();
let start_utc = UtcDateTime::now();
let audio_channels = opus2::Channels::from(audio_channels) as u16;
let audio_sample_rate = u32::from(audio_sample_rate);
let handler = Handler {
start_instant,
start_utc,
recording_manager,
guild_id,
channel_id: voice_channel_id,
known_ssrcs: Default::default(),
audio_channels,
audio_sample_rate,
user_manager,
};
let call = songbird.get_or_insert(guild_id);
{
let mut call = call.lock().await;
call.remove_all_global_events(); // TODO: WIP: investigating
call.add_global_event(CoreEvent::SpeakingStateUpdate.into(), handler.clone());
call.add_global_event(CoreEvent::VoiceTick.into(), handler);
if let Err(muting_error) = call.mute(true).await {
tracing::warn!(?muting_error, "couldn't mute, but that's okay");
};
}
songbird.join(guild_id, voice_channel_id).await?;
Ok(())
}