Compare commits

...

68 Commits

Author SHA1 Message Date
e6cd882b32 fix: crash and graceful shutdown improvements 2026-06-13 00:10:54 -04:00
ce77590777 fix: watchdog panicking 2026-06-12 23:53:17 -04:00
50fc35883d chore: remove "this process is still alive" that I forgot to delete 2026-06-12 23:45:30 -04:00
fa0093d582 feat: announce disconnection upon deadlock detection 2026-06-12 20:29:50 -04:00
c7300a9e6d feat: watchdog for detecting tokio runtime deadlock (I wish I could prevent it, but I don't know what's causing it) 2026-06-12 20:01:46 -04:00
a0a0632a1d chore: raise default audio sample rate to 24 khz 2026-06-12 00:02:56 -04:00
a22965a3be feat: make reconnecting after discord disconnect faster (not obligate an entire process restart) 2026-06-12 00:01:04 -04:00
c8d676693d fix: reduce rebuilding from extraneous files 2026-06-12 00:00:07 -04:00
ba0450e999 fix: coerce sample_rate to u64 correctly 2026-06-01 23:27:46 -04:00
a7f11a7202 fix: cast sample_rate to u64 in calculations 2026-06-01 23:23:43 -04:00
4fa25305b5 fix: use u64 to prevent overflowing (I can't believe I was stupid enough to think u32 could fit it) and add logging in case I was mistaken 2026-06-01 23:18:55 -04:00
65611d676d chore: reduce logging of unnecessary things in initialize_vcs 2026-05-31 16:11:38 -04:00
c8fd99cdf1 fix: in /info move the user mention to the description field (where mentions are allowed) 2026-05-31 15:30:04 -04:00
c03ccc9d39 chore: add more logging in /render to debug 2026-05-31 15:28:16 -04:00
5e989289bd feat: don't block bot startup waiting to initialize vcs 2026-05-31 14:24:09 -04:00
f5b6dc5c76 chore: remove /join and /leave to prevent confusion 2026-05-31 02:46:47 -04:00
03689f4764 fix: don't log interaction in /render 2026-05-30 01:08:59 -04:00
20bb0e4c31 fix: switch to async read-write lock instead of sync mutex in call handler to try to prevent deadlock 2026-05-29 22:22:55 -04:00
c6aa8e5d13 fix: scope borrow in heat seeking to try to prevent deadlock 2026-05-29 22:22:15 -04:00
eb17c0a33b chore: log more in /render so I can check if the number of recordings used is as expected 2026-05-29 01:21:15 -04:00
98a7a1e6fd chore: reduce amount of logging while recording in case my home server has been struggling to keep up with it 2026-05-28 12:52:58 -04:00
b27d0f32c2 fix: encode renders as wav instead of opus to address errors 2026-05-28 12:32:31 -04:00
c85e420a75 chore: use Display when logging an opus encode error in rendering instead of Debug so I can get a better look 2026-05-28 12:15:37 -04:00
8dad5648f5 fix: sample rate doesn't need to be part of the estimated max size calculation cause samples already covers that 2026-05-28 12:07:41 -04:00
e6c2342e1a chore: debug the estimated max size in rendering 2026-05-28 11:59:21 -04:00
8f29c30bec fix: more realistic estimated max size in rendering 2026-05-28 11:47:17 -04:00
6198387cc4 fix: use the encoded bytes instead of mistakenly discarding them 2026-05-28 11:35:08 -04:00
7b5be35112 fix: encode to a new vec instead of an empty one when rendering, log more stuff for debugging 2026-05-28 11:30:48 -04:00
c351358947 chore: log more stuff around rendering to debug 2026-05-28 02:45:16 -04:00
4463ff7b3a fix: constrain seconds and milliseconds in between 2026-05-28 02:35:07 -04:00
0c052ea303 chore: log recording details in /render to try to debug why so many are falling out of range 2026-05-28 02:23:57 -04:00
31d53c1e58 fix: don't log state in /render 2026-05-28 02:14:33 -04:00
24ef5a67c4 feat: implement rendering 2026-05-28 01:48:52 -04:00
862a333131 chore: rename data managers to just managers 2026-05-28 00:50:20 -04:00
b5a56b1273 chore: initialize samples array to write to later in render 2026-05-28 00:17:28 -04:00
0137f97788 feat: implement RecordingDataManager::between and between_in_vc, start using it in /render 2026-05-27 21:48:31 -04:00
23f86ace3b feat: support reading recordings, address some warnings 2026-05-27 01:56:47 -04:00
e72633f26a chore: refactor into a RecordingDataManager, lay the ground work for a RenderManager 2026-05-27 01:28:47 -04:00
f86c094dda feat: more work on /render 2026-05-26 00:10:52 -04:00
453208ff17 chore: satisfy warnings about async fn in traits 2026-05-26 00:10:28 -04:00
cfe6ddf218 fix: look on the right side for text channels when posting the join notice in heat seeking 2026-05-24 20:03:21 -04:00
6529fa02b5 feat: in the default heat algorithm, check if the bot owner is undeafened to decide if the score should be capped (so that if another vc has another unmuted person it prioritizes that one) 2026-05-24 19:34:44 -04:00
b2222384f5 chore: address clippy concern for readability 2026-05-24 19:28:19 -04:00
581a747cac fix: let heatseeking tasks be cancelled 2026-05-24 16:46:18 -04:00
c351511c8a fix: set the cancellation token when discord disconnects 2026-05-24 16:35:37 -04:00
d373352ae1 chore: don't log stuff that clogs the logs up 2026-05-24 14:29:35 -04:00
5cb938aa24 chore: format 2026-05-24 13:28:02 -04:00
56ec8aaf8c feat: allow anyone in a VC being recorded to make the bot leave 2026-05-24 13:27:53 -04:00
b598adb498 feat: heatseeking 2026-05-24 13:20:43 -04:00
e1aab0a8fb feat: get first or last (left) element with data of OneToManyUniqueBTreeMapWithData 2026-05-23 20:49:44 -04:00
97763877d8 feat: laying the groundwork for heat seeking 2026-05-21 00:59:22 -04:00
48a0c8250b chore: refactor joining and recording into a reusable function 2026-05-21 00:59:22 -04:00
4ed8d6d241 feat: get first or last element of OneToManyUniqueBTreeMap 2026-05-21 00:59:22 -04:00
c04338155b feat: error messages for ParseGuildVCToTextChannelError 2026-05-20 16:00:47 -04:00
8f433a065e fix: spawn songbird processing the event to try to address deadlock issue 2026-05-19 23:18:07 -04:00
97acdac467 fix: shorten the description of render to try to make Discord accept it 2026-05-15 00:56:15 -04:00
6befbf280e chore: very beginning of making a /render command 2026-05-14 00:59:45 -04:00
b2af146360 fix: remove handlers after failing to join call 2026-05-14 00:54:34 -04:00
c20bab2761 chore: go back to only listening for relevant events 2026-05-14 00:48:09 -04:00
f548955d16 fix: add event handlers before joining call 2026-05-14 00:47:06 -04:00
298799d43a chore: update lockfile 2026-05-14 00:41:35 -04:00
65e35e2403 fix: (hopefully) listen for all types of events 2026-05-13 23:34:12 -04:00
b1888af842 chore: more logging in call handler to try to see why all users are unknown so far 2026-05-13 22:58:40 -04:00
949ce5b38b chore: move call configuration to after handler instantiation to reduce time between them 2026-05-13 22:25:05 -04:00
a64e7190fd fix: (hopefully) scope the lock so it ends earlier 2026-05-13 22:21:51 -04:00
8416ef0609 style: cargo fmt --all 2026-05-12 23:37:20 -04:00
3d2df64ffe fix: scope the vcs_watcher borrow to possibly fix deadlock 2026-05-12 23:37:08 -04:00
09107bc97b chore: add a watchdog task that tells me if the process is still running while I debug why the bot dies sometimes without existing 2026-05-07 02:32:17 -04:00
39 changed files with 3094 additions and 1288 deletions

1082
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -6,6 +6,7 @@ edition = "2024"
[dependencies]
async-compression = { version = "0.4.41", features = ["brotli", "futures-io"] }
async-trait = "0.1.89"
bon = "3.9.1"
bytes = "1.11.1"
capnp = "0.25.3"
clap = { version = "4.5.40", features = ["derive", "env"] }
@@ -13,6 +14,8 @@ dashmap = "6.1.0"
extension-traits = "2.0.2"
futures = "0.3.32"
hound = "3.5.1"
humantime = "2.3.0"
itertools = "0.14.0"
moka = { version = "0.12.15", features = ["future"] }
opendal = { git = "https://github.com/apache/opendal", rev = "ecf840b04afd2be109830b9978ba89759adfee79", features = [
"services-azfile",
@@ -53,7 +56,8 @@ opendal = { git = "https://github.com/apache/opendal", rev = "ecf840b04afd2be109
] }
opus2 = "0.4.0"
patricia_tree = "0.10.1"
rhai = "1.23.6"
rayon = "1.12"
rhai = { version = "1.23.6", features = ["sync"] }
rustls = "0.23"
secrecy = { version = "0.10.3", features = ["serde"] }
shadow-rs = { version = "2.0.0", default-features = false }
@@ -67,12 +71,8 @@ songbird = { version = "0.6.0", default-features = false, features = [
"tws",
] }
strum = { version = "0.28.0", features = ["derive"] }
time = "0.3.47"
tokio = { version = "1.46.0", features = [
"rt-multi-thread",
"macros",
"signal",
] }
time = { version = "0.3.47", features = ["formatting", "macros", "parsing"] }
tokio = { version = "1.46.0", features = ["rt-multi-thread", "macros", "signal", "time"] }
tokio-util = { version = "0.7.18", features = ["io"] }
tokio-websockets-0-13 = { package = "tokio-websockets", version = "0.13", features = [
"rustls-webpki-roots",
@@ -81,6 +81,7 @@ tokio-websockets-0-11 = { package = "tokio-websockets", version = "0.11", featur
"rustls-webpki-roots",
] }
tracing = "0.1.41"
tracing-appender = "0.2.5"
tracing-subscriber = { version = "0.3.19", features = ["env-filter"] }
twilight-gateway = { version = "0.17", default-features = false, features = [
"rustls-webpki-roots",
@@ -94,8 +95,6 @@ twilight-http = { version = "0.17", default-features = false, features = [
twilight-model = "0.17"
twilight-util = { version = "0.17", features = ["builder"] }
typed-builder = "0.23.2"
yoke = "0.8.2"
tracing-appender = "0.2.5"
[build-dependencies]
capnpc = "0.25.3"

View File

@@ -1,6 +1,9 @@
use shadow_rs::{BuildPattern, ShadowBuilder};
fn main() {
println!("cargo::rerun-if-changed=bot.capnp");
println!("cargo::rerun-if-changed=user.capnp");
capnpc::CompilerCommand::new()
.file("bot.capnp")
.file("user.capnp")

26
src/audio_channels.rs Normal file
View File

@@ -0,0 +1,26 @@
use opus2::Channels;
use strum::EnumString;
#[derive(Clone, Copy, Debug, strum::Display, EnumString)]
pub enum AudioChannels {
Mono,
Stereo,
}
impl From<AudioChannels> for Channels {
fn from(value: AudioChannels) -> Self {
match value {
AudioChannels::Mono => Channels::Mono,
AudioChannels::Stereo => Channels::Stereo,
}
}
}
impl From<AudioChannels> for songbird::driver::Channels {
fn from(value: AudioChannels) -> Self {
match value {
AudioChannels::Mono => songbird::driver::Channels::Mono,
AudioChannels::Stereo => songbird::driver::Channels::Stereo,
}
}
}

34
src/audio_sample_rate.rs Normal file
View File

@@ -0,0 +1,34 @@
use songbird::driver::SampleRate;
use strum::EnumString;
#[derive(Clone, Copy, Debug, strum::Display, EnumString)]
pub enum AudioSampleRate {
#[strum(serialize = "8000Hz")]
Hz8000,
#[strum(serialize = "12000Hz")]
Hz12000,
#[strum(serialize = "16000Hz")]
Hz16000,
#[strum(serialize = "24000Hz")]
Hz24000,
#[strum(serialize = "48000Hz")]
Hz48000,
}
impl From<AudioSampleRate> for SampleRate {
fn from(value: AudioSampleRate) -> Self {
match value {
AudioSampleRate::Hz8000 => SampleRate::Hz8000,
AudioSampleRate::Hz12000 => SampleRate::Hz12000,
AudioSampleRate::Hz16000 => SampleRate::Hz16000,
AudioSampleRate::Hz24000 => SampleRate::Hz24000,
AudioSampleRate::Hz48000 => SampleRate::Hz48000,
}
}
}
impl From<AudioSampleRate> for u32 {
fn from(value: AudioSampleRate) -> Self {
SampleRate::from(value).into()
}
}

View File

@@ -7,11 +7,11 @@ use snafu::{ResultExt as _, Snafu};
use crate::{OperatorExt, bot_capnp, option_ext::OptionExt as _};
#[derive(Debug, Clone)]
pub struct BotDataManager {
pub struct BotManager {
operator: Operator,
}
impl BotDataManager {
impl BotManager {
pub fn new(operator: Operator) -> Self {
Self { operator }
}
@@ -32,7 +32,7 @@ pub enum WithError {
DeserializeError { source: capnp::Error },
}
impl BotDataManager {
impl BotManager {
pub async fn with<R>(
&self,
f: impl FnOnce(bot_capnp::bot::Reader<'_>) -> R,
@@ -101,7 +101,7 @@ pub enum UpdateError {
FinalizeError { source: std::io::Error },
}
impl BotDataManager {
impl BotManager {
pub async fn update<R>(
&self,
f: impl FnOnce(bot_capnp::bot::Builder<'_>) -> R,

211
src/call.rs Normal file
View File

@@ -0,0 +1,211 @@
use crate::{
OneToManyUniqueBTreeMap, UserManager,
option_ext::OptionExt as _,
recording_data::{Clip, Recording, RecordingData, RecordingManager},
user_capnp::user::Consent,
user_data::RECORD_IF_CONSENT_UNSPECIFIED,
};
use async_trait::async_trait;
use futures::FutureExt;
use songbird::{
CoreEvent, Event, EventContext, EventHandler, Songbird,
driver::{Channels, SampleRate},
};
use std::{sync::Arc, time::Instant};
use time::UtcDateTime;
use tokio::sync::RwLock;
use twilight_model::id::{
Id,
marker::{ChannelMarker, GuildMarker, UserMarker},
};
#[derive(Debug, Clone)]
struct Handler {
start_instant: Instant,
start_utc: UtcDateTime,
recording_manager: RecordingManager,
guild_id: Id<GuildMarker>,
channel_id: Id<ChannelMarker>,
known_ssrcs: Arc<RwLock<OneToManyUniqueBTreeMap<Id<UserMarker>, u32>>>,
audio_channels: u16,
audio_sample_rate: u32,
user_manager: UserManager,
}
#[async_trait]
impl EventHandler for Handler {
async fn act(&self, ctx: &EventContext<'_>) -> Option<Event> {
match ctx {
EventContext::Track(_items) => {
// Not expected to fire
}
EventContext::SpeakingStateUpdate(speaking) => {
tracing::error!(?speaking);
if let Some(user_id) = speaking.user_id {
let user_id = Id::new(user_id.0);
self.known_ssrcs
.write()
.await
.insert(user_id, speaking.ssrc);
}
}
EventContext::VoiceTick(voice_tick) => {
for (ssrc, voice_data) in &voice_tick.speaking {
let user_id = {
let known_ssrcs = self.known_ssrcs.read().await;
known_ssrcs.get_left_for(ssrc).cloned()
};
if let Some(pcm) = &voice_data.decoded_voice {
let may_record = user_id
.map_async(|user_id| {
self.user_manager
.with(user_id, |user_data| {
user_data.get_voice_recording_consent().unwrap()
})
.map(|result| result.expect("TODO"))
})
.await
.map_or(RECORD_IF_CONSENT_UNSPECIFIED, |consent| match consent {
Consent::Unspecified => RECORD_IF_CONSENT_UNSPECIFIED,
Consent::Granted => true,
Consent::Withheld => false,
});
if !may_record {
tracing::warn!(?user_id, "may not be recorded");
continue;
}
let elapsed = self.start_instant.elapsed();
let elapsed = elapsed.try_into().expect("TODO");
let now_utc = self.start_utc.checked_add(elapsed).expect("TODO");
let year = now_utc.year();
let month = now_utc.month();
let day = now_utc.day();
let hour = now_utc.hour();
let minute = now_utc.minute();
let second = now_utc.second();
let microsecond = now_utc.microsecond();
let guild = self.guild_id;
let voice_channel = self.channel_id;
let user = user_id;
let clip = Clip {
second,
microsecond,
guild,
voice_channel,
user,
};
let recording = Recording {
year,
month,
day,
hour,
minute,
clip,
};
let channels = self.audio_channels;
let sample_rate = self.audio_sample_rate;
let recording_manager = self.recording_manager.clone();
let samples = pcm.clone();
let recording_data = RecordingData {
channels,
sample_rate,
samples,
};
tokio::spawn(async move {
recording_manager
.write(&recording, recording_data)
.await
.expect("TODO");
tracing::info!(%recording, "successfully wrote the audio!");
});
}
}
}
EventContext::RtpPacket(_rtp_data) => {}
EventContext::RtcpPacket(_rtcp_data) => {}
EventContext::ClientDisconnect(_client_disconnect) => {
// This is already taken care of elsewhere
}
EventContext::DriverConnect(_connect_data) => {}
EventContext::DriverReconnect(_connect_data) => {}
EventContext::DriverDisconnect(_disconnect_data) => {}
other => {
tracing::warn!(?other, "cannot be handled yet");
}
}
None
}
}
#[bon::builder]
#[tracing::instrument]
pub async fn join_and_record(
audio_channels: Channels,
audio_sample_rate: SampleRate,
guild_id: Id<GuildMarker>,
recording_manager: RecordingManager,
songbird: &Songbird,
user_manager: UserManager,
voice_channel_id: Id<ChannelMarker>,
) -> Result<(), songbird::error::JoinError> {
let start_instant = Instant::now();
let start_utc = UtcDateTime::now();
let audio_channels = opus2::Channels::from(audio_channels) as u16;
let audio_sample_rate = u32::from(audio_sample_rate);
let handler = Handler {
start_instant,
start_utc,
recording_manager,
guild_id,
channel_id: voice_channel_id,
known_ssrcs: Default::default(),
audio_channels,
audio_sample_rate,
user_manager,
};
let call = songbird.get_or_insert(guild_id);
{
let mut call = call.lock().await;
call.remove_all_global_events(); // TODO: WIP: investigating
call.add_global_event(CoreEvent::SpeakingStateUpdate.into(), handler.clone());
call.add_global_event(CoreEvent::VoiceTick.into(), handler);
if let Err(muting_error) = call.mute(true).await {
tracing::warn!(?muting_error, "couldn't mute, but that's okay");
};
}
songbird.join(guild_id, voice_channel_id).await?;
Ok(())
}

View File

@@ -88,7 +88,7 @@ pub async fn handle(state: State, interaction: Interaction) {
if is_bot_owner {
let heat_script_description = state
.bot_data_manager
.bot_manager
.with(|bot_data| {
let heat_script_option = bot_data.has_heat_script().then(|| {
bot_data
@@ -118,11 +118,11 @@ pub async fn handle(state: State, interaction: Interaction) {
.await
.expect("TODO");
let mut user_id_stream = state.user_data_manager.list().await.expect("TODO");
let mut user_id_stream = state.user_manager.list().await.expect("TODO");
while let Some(user_id) = user_id_stream.try_next().await.expect("TODO") {
let (consent, notification_script) = state
.user_data_manager
.user_manager
.with(user_id, |user_data| {
let consent = user_data.get_voice_recording_consent().unwrap();
let notification_script = user_data.has_notification_script().then_some(
@@ -145,7 +145,7 @@ pub async fn handle(state: State, interaction: Interaction) {
.interaction(state.discord_application_id)
.create_followup(&interaction.token)
.embeds(&[EmbedBuilder::new()
.author(EmbedAuthorBuilder::new(user_mention))
.description(user_mention)
.field(EmbedFieldBuilder::new("Consent", format!("{consent:?}")).build())
.field(
EmbedFieldBuilder::new(

View File

@@ -1,346 +0,0 @@
use crate::{
OneToManyUniqueBTreeMap, UserDataManager, VCs, command::State, option_ext::OptionExt as _,
user_capnp::user::Consent, user_data::RECORD_IF_CONSENT_UNSPECIFIED,
};
use async_trait::async_trait;
use futures::FutureExt;
use hound::{SampleFormat, WavSpec};
use opendal::Operator;
use snafu::{OptionExt as _, Snafu};
use songbird::{CoreEvent, Event, EventContext, EventHandler};
use std::{
io::Cursor,
sync::{Arc, LazyLock, Mutex},
time::Instant,
};
use time::UtcDateTime;
use twilight_model::{
application::{
command::{Command, CommandType},
interaction::Interaction,
},
channel::message::{Embed, MessageFlags},
http::interaction::{InteractionResponse, InteractionResponseType},
id::{
Id,
marker::{ChannelMarker, GuildMarker, UserMarker},
},
};
use twilight_util::builder::{
InteractionResponseDataBuilder, command::CommandBuilder, embed::EmbedBuilder,
};
const NAME: &str = "join";
const DESCRIPTION: &str = "The bot will join the same VC as you (with intention to record)";
pub static COMMAND: LazyLock<Command> = LazyLock::new(|| {
CommandBuilder::new(NAME, DESCRIPTION, CommandType::ChatInput)
.validate()
.expect("command wasn't correct")
.build()
});
#[derive(Debug, Snafu)]
enum GetGuildAndVoiceChannelIdError {
/// this command was not used inside a guild (Discord server)
NotInGuild,
/// there is no user who invoked this command
NoUser,
/// the user is not in a voice chat in this guild
UserNotInVC,
}
#[tracing::instrument]
fn get_guild_and_voice_channel_id(
interaction: &Interaction,
vcs: &VCs,
) -> Result<(Id<GuildMarker>, Id<ChannelMarker>), GetGuildAndVoiceChannelIdError> {
let guild_id = interaction.guild_id.context(NotInGuildSnafu)?;
let user_id = interaction
.member
.as_ref()
.and_then(|member| member.user.as_ref().map(|user| user.id))
.context(NoUserSnafu)?;
let &voice_channel_id = vcs
.get(&guild_id)
.context(UserNotInVCSnafu)?
.get_left_for(&user_id)
.context(UserNotInVCSnafu)?;
Ok((guild_id, voice_channel_id))
}
fn get_guild_and_vc_error_to_embed(error: GetGuildAndVoiceChannelIdError) -> Embed {
match error {
GetGuildAndVoiceChannelIdError::NotInGuild => {
EmbedBuilder::new().title("Use this in a server").description("This bot can't find a VC to join if the command is used outside of a server (you might've used it in a DM?).").validate().unwrap().build()
},
GetGuildAndVoiceChannelIdError::NoUser => {
EmbedBuilder::new().title("Not invoked by a user").description("This command works by joining the same VC as the user, but this bot didn't receive any user data. So did no user invoke it?! (This error should be impossible!)").validate().unwrap().build()
},
GetGuildAndVoiceChannelIdError::UserNotInVC => {
EmbedBuilder::new().title("You're not in a VC").description("This bot can't follow you into VC if you aren't in one in this server.").validate().unwrap().build()
},
}
}
#[derive(Debug, Clone)]
struct Handler {
start_instant: Instant,
start_utc: UtcDateTime,
recordings: Operator,
guild_id: Id<GuildMarker>,
channel_id: Id<ChannelMarker>,
known_ssrcs: Arc<Mutex<OneToManyUniqueBTreeMap<Id<UserMarker>, u32>>>,
audio_channels: u16,
audio_sample_rate: u32,
user_data_manager: UserDataManager,
}
#[async_trait]
impl EventHandler for Handler {
async fn act(&self, ctx: &EventContext<'_>) -> Option<Event> {
match ctx {
EventContext::Track(_items) => {
// Not expected to fire
}
EventContext::SpeakingStateUpdate(speaking) => {
tracing::error!(?speaking);
if let Some(user_id) = speaking.user_id {
let user_id = Id::new(user_id.0);
self.known_ssrcs
.lock()
.unwrap()
.insert(user_id, speaking.ssrc);
}
}
EventContext::VoiceTick(voice_tick) => {
tracing::debug!(?voice_tick);
for (ssrc, voice_data) in &voice_tick.speaking {
let user_id = self.known_ssrcs.lock().unwrap().get_left_for(ssrc).cloned();
tracing::info!(?user_id);
if let Some(pcm) = &voice_data.decoded_voice {
let may_record = user_id
.map_async(|user_id| {
self.user_data_manager
.with(user_id, |user_data| {
user_data.get_voice_recording_consent().unwrap()
})
.map(|result| result.expect("TODO"))
})
.await
.map_or(RECORD_IF_CONSENT_UNSPECIFIED, |consent| match consent {
Consent::Unspecified => RECORD_IF_CONSENT_UNSPECIFIED,
Consent::Granted => true,
Consent::Withheld => false,
});
if !may_record {
tracing::warn!(?user_id, "may not be recorded");
continue;
}
let elapsed = self.start_instant.elapsed();
let elapsed = elapsed.try_into().expect("TODO");
let now_utc = self.start_utc.checked_add(elapsed).expect("TODO");
let year = now_utc.year();
let month = now_utc.month();
let day = now_utc.day();
let hour = now_utc.hour();
let minute = now_utc.minute();
let second = now_utc.second();
let microseconds = now_utc.microsecond();
let guild_id = self.guild_id;
let channel_id = self.channel_id;
let user = user_id
.as_ref()
.map_or_else(|| "UNKNOWN".into(), ToString::to_string);
let path = format!(
"{year}/{month}/{day}/{hour}/{minute}/audio-{second}.{microseconds}-{guild_id}-{channel_id}-{user}.wav"
);
let channels = self.audio_channels;
let sample_rate = self.audio_sample_rate;
let wav_spec = WavSpec {
channels,
sample_rate,
bits_per_sample: 16,
sample_format: SampleFormat::Int,
};
let mut buffer = Vec::new();
let writer = Cursor::new(&mut buffer);
let mut wav_writer = hound::WavWriter::new(writer, wav_spec).expect("TODO");
let mut sample_writer = wav_writer.get_i16_writer(pcm.len() as u32);
for sample in pcm {
sample_writer.write_sample(*sample);
}
sample_writer.flush().expect("TODO");
wav_writer.finalize().expect("TODO");
tracing::info!("going to write the audio shortly");
let recordings = self.recordings.clone();
tokio::spawn(async move {
recordings.write(&path, buffer).await.expect("TODO");
tracing::info!("successfully wrote the audio!");
});
}
}
}
EventContext::RtpPacket(_rtp_data) => {}
EventContext::RtcpPacket(_rtcp_data) => {}
EventContext::ClientDisconnect(_client_disconnect) => {
// This is already taken care of elsewhere
}
EventContext::DriverConnect(_connect_data) => {}
EventContext::DriverReconnect(_connect_data) => {}
EventContext::DriverDisconnect(_disconnect_data) => {}
other => {
tracing::warn!(?other, "cannot be handled yet");
}
}
None
}
}
#[tracing::instrument(skip(state))]
pub async fn handle(state: State, interaction: Interaction) {
let guild_and_voice_channel_id_res =
get_guild_and_voice_channel_id(&interaction, &state.vcs_watcher.borrow());
let (guild_id, voice_channel_id) = match guild_and_voice_channel_id_res {
Ok((guild_id, voice_channel_id)) => (guild_id, voice_channel_id),
Err(error) => {
state
.discord_client
.interaction(state.discord_application_id)
.create_response(
interaction.id,
&interaction.token,
&InteractionResponse {
kind: InteractionResponseType::ChannelMessageWithSource,
data: Some(
InteractionResponseDataBuilder::new()
.embeds([get_guild_and_vc_error_to_embed(error)])
.flags(MessageFlags::EPHEMERAL)
.build(),
),
},
)
.await
.expect("TODO");
return;
}
};
state
.discord_client
.interaction(state.discord_application_id)
.create_response(
interaction.id,
&interaction.token,
&InteractionResponse {
kind: InteractionResponseType::DeferredChannelMessageWithSource,
data: None,
},
)
.await
.expect("TODO");
let call = state
.songbird
.join(guild_id, voice_channel_id)
.await
.expect("TODO");
tracing::error!(?call, "successfully joined");
let start_instant = Instant::now();
let start_utc = UtcDateTime::now();
let audio_channels = opus2::Channels::from(state.audio_channels) as u16;
let audio_sample_rate = u32::from(state.audio_sample_rate);
let handler = Handler {
start_instant,
start_utc,
recordings: state.recording_data,
guild_id,
channel_id: voice_channel_id,
known_ssrcs: Default::default(),
audio_channels,
audio_sample_rate,
user_data_manager: state.user_data_manager,
};
{
let mut call = call.lock().await;
call.add_global_event(CoreEvent::SpeakingStateUpdate.into(), handler.clone());
call.add_global_event(CoreEvent::VoiceTick.into(), handler);
call.mute(true).await.expect("TODO");
}
let channel_mention = format!("<#{voice_channel_id}>");
let info_mention = format!(
"</{}:{}>",
state.discord_info_command_name, state.discord_info_command_id
);
let opt_in_mention = format!(
"</{}:{}>",
state.discord_opt_in_command_name, state.discord_opt_in_command_id
);
let opt_out_mention = format!(
"</{}:{}>",
state.discord_opt_out_command_name, state.discord_opt_out_command_id
);
state
.discord_client
.interaction(state.discord_application_id)
.update_response(
&interaction.token,
).embeds(Some(&[
EmbedBuilder::new()
.title("Joined VC to record")
.description(format!("This bot joined {channel_mention} and intends to record. You can opt out with {opt_out_mention} or explicitly opt in with {opt_in_mention} (I'd appreciate this one). Please use {info_mention} for more information about this bot."))
.validate()
.unwrap()
.build()
]))
.await
.expect("TODO");
}

View File

@@ -1,169 +0,0 @@
use crate::VCs;
use crate::command::State;
use snafu::{OptionExt, Snafu};
use std::sync::LazyLock;
use twilight_model::channel::message::{Embed, MessageFlags};
use twilight_model::http::interaction::{InteractionResponse, InteractionResponseType};
use twilight_model::id::marker::UserMarker;
use twilight_model::{
application::{
command::{Command, CommandType},
interaction::Interaction,
},
id::{
Id,
marker::{ChannelMarker, GuildMarker},
},
};
use twilight_util::builder::InteractionResponseDataBuilder;
use twilight_util::builder::command::CommandBuilder;
use twilight_util::builder::embed::EmbedBuilder;
const NAME: &str = "leave";
const DESCRIPTION: &str = "The bot will leave the VC it's in (so it won't record anyone anymore)";
pub static COMMAND: LazyLock<Command> = LazyLock::new(|| {
CommandBuilder::new(NAME, DESCRIPTION, CommandType::ChatInput)
.validate()
.expect("command wasn't correct")
.build()
});
#[derive(Debug, Snafu)]
pub enum GetGuildAndVoiceChannelIdError {
/// this command was not used inside a guild (Discord server)
NotInGuild,
/// there is no user who invoked this command
NoUser,
/// the bot is not in a voice chat in this guild
BotNotInVC,
}
#[tracing::instrument]
pub fn get_user_and_guild_and_voice_channel_id(
bot_user_id: Id<UserMarker>,
interaction: &Interaction,
vcs: &VCs,
) -> Result<(Id<UserMarker>, Id<GuildMarker>, Id<ChannelMarker>), GetGuildAndVoiceChannelIdError> {
let user_id = interaction
.member
.as_ref()
.and_then(|member| member.user.as_ref().map(|user| user.id))
.context(NoUserSnafu)?;
let guild_id = interaction.guild_id.context(NotInGuildSnafu)?;
let &voice_channel_id = vcs
.get(&guild_id)
.context(BotNotInVCSnafu)?
.get_left_for(&bot_user_id)
.context(BotNotInVCSnafu)?;
Ok((user_id, guild_id, voice_channel_id))
}
fn get_guild_and_vc_error_to_embed(error: GetGuildAndVoiceChannelIdError) -> Embed {
match error {
GetGuildAndVoiceChannelIdError::NotInGuild => {
EmbedBuilder::new().title("Use this in a server").description("This bot can't tell which VC to leave if the command is used outside of a server (you might've used it in a DM?).").validate().unwrap().build()
},
GetGuildAndVoiceChannelIdError::NoUser => {
EmbedBuilder::new().title("Not invoked by a user").description("This command works by joining the same VC as the user, but this bot didn't receive any user data. So did no user invoke it?! (This error should be impossible!)").validate().unwrap().build()
},
GetGuildAndVoiceChannelIdError::BotNotInVC => {
EmbedBuilder::new().title("Not in a VC").description("This bot can't leave VC if it isn't in one in this server.").validate().unwrap().build()
},
}
}
#[tracing::instrument]
pub async fn handle(state: State, interaction: Interaction) {
let user_and_guild_and_voice_channel_id_res = get_user_and_guild_and_voice_channel_id(
state.discord_user_id,
&interaction,
&state.vcs_watcher.borrow(),
);
let (user_id, guild_id, voice_channel_id) = match user_and_guild_and_voice_channel_id_res {
Ok((user_id, guild_id, voice_channel_id)) => (user_id, guild_id, voice_channel_id),
Err(error) => {
state
.discord_client
.interaction(state.discord_application_id)
.create_response(
interaction.id,
&interaction.token,
&InteractionResponse {
kind: InteractionResponseType::ChannelMessageWithSource,
data: Some(
InteractionResponseDataBuilder::new()
.embeds([get_guild_and_vc_error_to_embed(error)])
.flags(MessageFlags::EPHEMERAL)
.build(),
),
},
)
.await
.expect("TODO");
return;
}
};
if user_id != state.discord_bot_owner_user_id {
state
.discord_client
.interaction(state.discord_application_id)
.create_response(
interaction.id,
&interaction.token,
&InteractionResponse {
kind: InteractionResponseType::ChannelMessageWithSource,
data: Some(
InteractionResponseDataBuilder::new()
.embeds([
EmbedBuilder::new().title("No permission to make this bot leave").description("Only the owner of this bot is allowed to make the bot leave RIGHT NOW. You might be looking for the opt out command.").validate().unwrap().build()
])
.flags(MessageFlags::EPHEMERAL)
.build(),
),
},
)
.await
.expect("TODO");
return;
}
state.songbird.leave(guild_id).await.expect("TODO");
tracing::error!("TODO: successfully left the call");
let channel_mention = format!("<#{voice_channel_id}>");
state
.discord_client
.interaction(state.discord_application_id)
.create_response(interaction.id, &interaction.token,
&InteractionResponse {
kind: InteractionResponseType::ChannelMessageWithSource,
data: Some(
InteractionResponseDataBuilder::new()
.embeds([
EmbedBuilder::new()
.title("Left VC")
.description(format!(
"This bot left {channel_mention} (and is thereby unable to record anymore)."
))
.validate()
.unwrap()
.build()
])
.flags(MessageFlags::EPHEMERAL)
.build(),
),
},)
.await
.expect("TODO");
}

View File

@@ -1,7 +1,6 @@
use std::{fmt::Debug, sync::Arc};
use futures::future::BoxFuture;
use opendal::Operator;
use patricia_tree::StringPatriciaMap;
use songbird::{
Songbird,
@@ -16,19 +15,21 @@ use twilight_model::{
},
};
use crate::{BotDataManager, GuildVoiceChannelToTextChannel, UserDataManager, VCsWatcher};
use crate::{
AudioChannels, AudioSampleRate, BotManager, GuildVoiceChannelToTextChannel, UserManager,
VCsSender, recording_data::RecordingManager, render_data::RenderManager,
};
pub mod info;
pub mod join;
pub mod leave;
pub mod opt_in;
pub mod opt_out;
pub mod render;
#[derive(Debug, Clone)]
pub struct State {
pub audio_channels: Channels,
pub audio_sample_rate: SampleRate,
pub bot_data_manager: BotDataManager,
pub audio_channels: AudioChannels,
pub audio_sample_rate: AudioSampleRate,
pub bot_manager: BotManager,
pub cancellation_token: CancellationToken,
pub discord_application_id: Id<ApplicationMarker>,
pub discord_bot_owner_user_id: Id<UserMarker>,
@@ -41,10 +42,11 @@ pub struct State {
pub discord_opt_out_command_name: Arc<str>,
pub discord_user_id: Id<UserMarker>,
pub discord_voice_channel_corresponding_text_channel: Arc<GuildVoiceChannelToTextChannel>,
pub recording_data: Operator,
pub recording_manager: RecordingManager,
pub render_manager: RenderManager,
pub songbird: Arc<Songbird>,
pub user_data_manager: UserDataManager,
pub vcs_watcher: VCsWatcher,
pub user_manager: UserManager,
pub vcs_sender: VCsSender,
}
type Return = ();
@@ -61,10 +63,9 @@ where
pub fn all() -> Vec<(&'static Command, BoxedHandler)> {
vec![
(&info::COMMAND, box_handler(info::handle)),
(&join::COMMAND, box_handler(join::handle)),
(&leave::COMMAND, box_handler(leave::handle)),
(&opt_in::COMMAND, box_handler(opt_in::handle)),
(&opt_out::COMMAND, box_handler(opt_out::handle)),
(&render::COMMAND, box_handler(render::handle)),
]
}

View File

@@ -54,7 +54,7 @@ pub async fn handle(state: State, interaction: Interaction) {
};
let previous_consent = state
.user_data_manager
.user_manager
.update(user_id, |mut user_data| {
let previous_consent = user_data
.reborrow()

View File

@@ -54,7 +54,7 @@ pub async fn handle(state: State, interaction: Interaction) {
};
let previous_consent = state
.user_data_manager
.user_manager
.update(user_id, |mut user_data| {
let previous_consent = user_data
.reborrow()

415
src/command/render.rs Normal file
View File

@@ -0,0 +1,415 @@
use futures::TryStreamExt as _;
use snafu::{OptionExt as _, Report, ResultExt as _, Snafu};
use std::{collections::BTreeMap, sync::LazyLock};
use time::{Date, Time, UtcDateTime, format_description::well_known::Rfc3339};
use twilight_model::{
application::{
command::{Command, CommandOption, CommandOptionType, CommandType},
interaction::{Interaction, InteractionData, application_command::CommandOptionValue},
},
channel::{
ChannelType,
message::{Embed, MessageFlags},
},
http::interaction::{InteractionResponse, InteractionResponseType},
id::{Id, marker::ChannelMarker},
};
use twilight_util::builder::{
InteractionResponseDataBuilder, command::CommandBuilder, embed::EmbedBuilder,
};
use crate::{
command::State,
recording_data::{Clip, Recording, RecordingData},
render_data::{Render, RenderData},
};
const NAME: &str = "render";
const DESCRIPTION: &str =
"(Only the bot owner can use this) Make an audio file from the specified range of VC";
const OPTION_VOICE_CHANNEL: &str = "voice-channel";
const OPTION_START: &str = "start";
const OPTION_END: &str = "end";
const DATE_FORMAT: Rfc3339 = Rfc3339;
pub static COMMAND: LazyLock<Command> = LazyLock::new(|| {
CommandBuilder::new(NAME, DESCRIPTION, CommandType::ChatInput)
.option(CommandOption {
autocomplete: None,
channel_types: Some(vec![ChannelType::GuildVoice]),
choices: None,
description: "Which voice channel to render a recording from".into(),
description_localizations: None,
kind: CommandOptionType::Channel,
max_length: None,
max_value: None,
min_length: None,
min_value: None,
name: OPTION_VOICE_CHANNEL.into(),
name_localizations: None,
options: None,
required: Some(true),
})
.option(CommandOption {
autocomplete: None,
channel_types: None,
choices: None,
description: "What UTC datetime to start from".into(),
description_localizations: None,
kind: CommandOptionType::String,
max_length: None,
max_value: None,
min_length: None,
min_value: None,
name: OPTION_START.into(),
name_localizations: None,
options: None,
required: Some(true),
})
.option(CommandOption {
autocomplete: None,
channel_types: None,
choices: None,
description: "What UTC datetime to end at".into(),
description_localizations: None,
kind: CommandOptionType::String,
max_length: None,
max_value: None,
min_length: None,
min_value: None,
name: OPTION_END.into(),
name_localizations: None,
options: None,
required: Some(true),
})
.validate()
.expect("command wasn't correct")
.build()
});
struct Options {
voice_channel_id: Id<ChannelMarker>,
start: UtcDateTime,
end: UtcDateTime,
}
#[derive(Debug, Snafu)]
enum ParseOptionsError {
/// could not get any interaction data
NoInteractionData,
/// this wasn't a command invocation
NotCommandInvocation,
/// a voice channel wasn't selected
NoVoiceChannel,
/// a start time wasn't specified
NoStart,
/// an end time wasn't specified
NoEnd,
/// voice channel was {actual:?} instead of a channel ID
VoiceChannelInvalidType { actual: CommandOptionValue },
/// start was {actual:?} instead of a string
StartInvalidType { actual: CommandOptionValue },
/// end was {actual:?} instead of a string
EndInvalidType { actual: CommandOptionValue },
/// could not parse `start` as a date in RFC3339 format
StartInvalidDate { source: time::error::Parse },
/// could not parse `start` as a date in RFC3339 format
EndInvalidDate { source: time::error::Parse },
}
impl From<ParseOptionsError> for Embed {
fn from(error: ParseOptionsError) -> Self {
EmbedBuilder::new()
.title("Error parsing options")
.description(Report::from_error(error).to_string())
.validate()
.unwrap()
.build()
}
}
fn parse_options(interaction: &Interaction) -> Result<Options, ParseOptionsError> {
let interaction_data = interaction.data.as_ref().context(NoInteractionDataSnafu)?;
let InteractionData::ApplicationCommand(command_data) = interaction_data else {
return Err(ParseOptionsError::NotCommandInvocation);
};
let mut options: BTreeMap<&str, &CommandOptionValue> = command_data
.options
.iter()
.map(|command_data_option| {
(
command_data_option.name.as_str(),
&command_data_option.value,
)
})
.collect();
let voice_channel_id = options
.remove(OPTION_VOICE_CHANNEL)
.context(NoVoiceChannelSnafu)?;
let start = options.remove(OPTION_START).context(NoStartSnafu)?;
let end = options.remove(OPTION_END).context(NoEndSnafu)?;
let &CommandOptionValue::Channel(voice_channel_id) = voice_channel_id else {
return Err(ParseOptionsError::VoiceChannelInvalidType {
actual: voice_channel_id.clone(),
});
};
let &CommandOptionValue::String(ref start) = start else {
return Err(ParseOptionsError::StartInvalidType {
actual: start.clone(),
});
};
let &CommandOptionValue::String(ref end) = end else {
return Err(ParseOptionsError::StartInvalidType {
actual: end.clone(),
});
};
let start = UtcDateTime::parse(start, &DATE_FORMAT).context(StartInvalidDateSnafu)?;
let end = UtcDateTime::parse(end, &DATE_FORMAT).context(EndInvalidDateSnafu)?;
Ok(Options {
voice_channel_id,
start,
end,
})
}
#[tracing::instrument(skip(state, interaction))]
pub async fn handle(state: State, interaction: Interaction) {
let Some(guild_id) = interaction.guild_id else {
state
.discord_client
.interaction(state.discord_application_id)
.create_response(
interaction.id,
&interaction.token,
&InteractionResponse {
kind: InteractionResponseType::ChannelMessageWithSource,
data: Some(
InteractionResponseDataBuilder::new()
.embeds([EmbedBuilder::new()
.title("Not in a server")
.description(
"This command can only work when used in a Discord server.",
)
.validate()
.unwrap()
.build()])
.flags(MessageFlags::EPHEMERAL)
.build(),
),
},
)
.await
.expect("TODO");
return;
};
let bot_owner_user_id = state.discord_bot_owner_user_id;
let is_bot_owner = interaction
.member
.as_ref()
.and_then(|member| member.user.as_ref().map(|user| user.id))
.map(|user_id| user_id == bot_owner_user_id)
.unwrap_or(false);
if !is_bot_owner {
state
.discord_client
.interaction(state.discord_application_id)
.create_response(
interaction.id,
&interaction.token,
&InteractionResponse {
kind: InteractionResponseType::ChannelMessageWithSource,
data: Some(
InteractionResponseDataBuilder::new()
.embeds([EmbedBuilder::new()
.title("No permission")
.description("Only the bot owner can use this command.")
.validate()
.unwrap()
.build()])
.flags(MessageFlags::EPHEMERAL)
.build(),
),
},
)
.await
.expect("TODO");
return;
}
let Options {
voice_channel_id,
start,
end,
} = match parse_options(&interaction) {
Ok(options) => options,
Err(error) => {
state
.discord_client
.interaction(state.discord_application_id)
.create_response(
interaction.id,
&interaction.token,
&InteractionResponse {
kind: InteractionResponseType::ChannelMessageWithSource,
data: Some(
InteractionResponseDataBuilder::new()
.embeds([error.into()])
.flags(MessageFlags::EPHEMERAL)
.build(),
),
},
)
.await
.expect("TODO");
return;
}
};
let duration = end - start;
tracing::info!(?voice_channel_id, ?start, ?end, ?duration);
let channels = state.audio_channels.into();
let sample_rate = state.audio_sample_rate.into();
let total_samples = (duration.whole_seconds() as u64 * sample_rate as u64)
+ (duration.subsec_microseconds() as u64 * sample_rate as u64 / 1_000_000);
let mut composite = vec![0; total_samples as usize];
let mut recordings =
state
.recording_manager
.between_in_vc(start, end, guild_id, voice_channel_id);
let mut recordings_used = 0;
while let Some(recording) = recordings.try_next().await.expect("TODO") {
let recording_data = state
.recording_manager
.read(&recording)
.await
.expect("TODO");
let RecordingData {
channels,
sample_rate,
samples,
} = recording_data;
let Recording {
year,
month,
day,
hour,
minute,
clip,
} = recording;
let Clip {
second,
microsecond,
guild,
voice_channel,
user,
} = clip;
let date = Date::from_calendar_date(year, month, day).unwrap();
let time = Time::from_hms_micro(hour, minute, second, microsecond).unwrap();
let datetime = UtcDateTime::new(date, time);
let after_start = datetime - start;
let progress_by_time = after_start / duration;
let origin = (after_start.whole_seconds() as u64 * sample_rate as u64)
+ (after_start.subsec_microseconds() as u64 * sample_rate as u64 / 1_000_000);
let origin = origin as usize;
let progress_by_sample = (origin as f64) / (total_samples as f64);
let samples_length = samples.len();
let extent = origin + samples_length;
tracing::debug!(
progress_by_time,
progress_by_sample,
?after_start,
?duration,
origin,
total_samples,
samples_length,
extent
);
for (i, sample) in samples.into_iter().enumerate() {
if let Some(composite_sample) = composite.get_mut(origin + i) {
*composite_sample += sample;
} else {
tracing::error!(
?start,
?end,
?year,
?month,
?day,
?hour,
?minute,
?second,
?microsecond,
origin,
samples_length,
extent,
i,
total_samples,
"out of range"
);
}
}
recordings_used += 1;
}
tracing::debug!(?recordings_used);
let render = Render {
start,
end,
guild_id,
voice_channel_id,
};
let render_data = RenderData {
channels,
sample_rate,
samples: composite,
};
let render_result = state.render_manager.write(&render, render_data).await;
tracing::info!(?render_result);
render_result.expect("TODO");
tracing::info!(%render, "written");
todo!();
}

406
src/heat_seek.rs Normal file
View File

@@ -0,0 +1,406 @@
use std::{collections::BTreeMap, num::NonZero, str::Utf8Error, sync::Arc};
use futures::{StreamExt as _, stream::FuturesUnordered};
use itertools::Itertools as _;
use snafu::{ResultExt as _, Snafu};
use tokio::sync::watch;
use tokio_util::{sync::CancellationToken, time::FutureExt};
use twilight_model::id::{
Id,
marker::{ChannelMarker, GuildMarker, UserMarker},
};
use twilight_util::builder::embed::EmbedBuilder;
use crate::{
BotManager, OneToManyUniqueBTreeMap, State, UserInVCData, bot_data,
call::join_and_record,
track_vcs::VCsInGuild,
vc_user::{Camera, Headphone, Microphone, Stream},
};
type Heat = u64;
type Hot = NonZero<Heat>;
type ChannelHeat = BTreeMap<Id<ChannelMarker>, Heat>;
type HeatMap = OneToManyUniqueBTreeMap<Heat, Id<ChannelMarker>>;
#[tracing::instrument(skip(state))]
pub async fn heat_seek(state: State) {
let mut vcs_watcher = state.vcs_sender.subscribe();
let mut vcs_in_guild_senders = BTreeMap::default();
loop {
{
for (&guild_id, vcs_in_guild) in &*vcs_watcher.borrow() {
let vcs_in_guild_sender =
vcs_in_guild_senders.entry(guild_id).or_insert_with(|| {
let (vcs_in_guild_sender, vcs_in_guild_watcher) =
watch::channel(Default::default());
let (channel_heat_sender, channel_heat_watcher) =
watch::channel(Default::default());
let (heat_map_sender, heat_map_watcher) =
watch::channel(Default::default());
let (hottest_vc_sender, hottest_vc_watcher) =
watch::channel(Default::default());
tokio::spawn(
evaluate_heat()
.bot_manager(state.bot_manager.clone())
.bot_owner_user_id(state.discord_bot_owner_user_id)
.bot_user_id(state.discord_user_id)
.cancellation_token(state.cancellation_token.clone())
.channel_heat_sender(channel_heat_sender)
.vcs_in_guild_watcher(vcs_in_guild_watcher)
.call(),
);
tokio::spawn(
map_heat()
.cancellation_token(state.cancellation_token.clone())
.channel_heat_watcher(channel_heat_watcher)
.heat_map_sender(heat_map_sender)
.call(),
);
tokio::spawn(
track_hottest_vc()
.cancellation_token(state.cancellation_token.clone())
.heat_map_watcher(heat_map_watcher)
.hottest_vc_sender(hottest_vc_sender)
.call(),
);
tokio::spawn(follow_hottest_vc(
state.clone(),
guild_id,
hottest_vc_watcher,
));
vcs_in_guild_sender
});
vcs_in_guild_sender.send_replace(Arc::new(vcs_in_guild.clone()));
}
}
if matches!(
vcs_watcher
.changed()
.with_cancellation_token(&state.cancellation_token)
.await,
None | Some(Err(_))
) {
break;
}
}
}
#[derive(Debug, Snafu)]
enum GetHeatError {
/// couldn't retrieve bot data
WithBotDataError { source: bot_data::WithError },
/// couldn't get the heat script from the bot data
GetHeatScriptError { source: capnp::Error },
/// the heat script is not a valid UTF-8 string
HeatScriptInvalidUtf8 { source: Utf8Error },
/// the heat script is not valid Rhai code
HeatScriptInvalidRhai { source: rhai::ParseError },
/// failed while evaluating the heat script
HeatScriptEvaluationError { source: Box<rhai::EvalAltResult> },
}
#[bon::builder]
#[tracing::instrument]
async fn get_heat(
users_in_vc: &BTreeMap<Id<UserMarker>, UserInVCData>,
bot_user_id: Id<UserMarker>,
bot_owner_user_id: Id<UserMarker>,
bot_manager: &BotManager,
) -> Result<Heat, GetHeatError> {
let heat_script = bot_manager
.with(|bot_data| {
bot_data.has_heat_script().then(|| {
bot_data
.get_heat_script()
.map(|heat_script| heat_script.to_string())
})
})
.await
.context(WithBotDataSnafu)?
.transpose()
.context(GetHeatScriptSnafu)?
.transpose()
.context(HeatScriptInvalidUtf8Snafu)?;
let engine = rhai::Engine::new();
let heat_function = heat_script
.map(|heat_script| engine.compile(heat_script))
.transpose()
.context(HeatScriptInvalidRhaiSnafu)?;
let heat = heat_function
.map(|heat_function| {
let mut scope = Default::default();
let args = (); // TODO
engine.call_fn(&mut scope, &heat_function, "heat", args)
})
.transpose()
.context(HeatScriptEvaluationSnafu)?;
let heat = heat.unwrap_or_else(|| {
tracing::warn!("using default heat scoring algorithm as no script was specified");
let mut users_in_vc = users_in_vc.clone();
let _bot = users_in_vc.remove(&bot_user_id);
let bot_owner = users_in_vc.remove(&bot_owner_user_id);
let mut heat = 0;
for (_user_id, user_in_vc_data) in users_in_vc {
if matches!(user_in_vc_data.microphone, Microphone::Unmuted) {
heat += 1000;
}
if matches!(user_in_vc_data.camera, Camera::Showing) {
heat += 100;
}
if matches!(user_in_vc_data.stream, Stream::Sharing) {
heat += 10;
}
if matches!(user_in_vc_data.headphone, Headphone::Undeafened) {
heat += 1;
}
}
let bot_owner_might_be_listening =
bot_owner.is_some_and(|user_data| matches!(user_data.headphone, Headphone::Undeafened));
if bot_owner_might_be_listening {
heat = heat.min(999);
}
heat
});
Ok(heat)
}
#[bon::builder]
#[tracing::instrument(skip(vcs_in_guild_watcher, channel_heat_sender))]
async fn evaluate_heat(
bot_manager: BotManager,
bot_owner_user_id: Id<UserMarker>,
bot_user_id: Id<UserMarker>,
cancellation_token: CancellationToken,
mut vcs_in_guild_watcher: watch::Receiver<Arc<VCsInGuild>>,
channel_heat_sender: watch::Sender<ChannelHeat>,
) {
loop {
let vcs_in_guild = { vcs_in_guild_watcher.borrow().clone() };
let channel_heat_results: BTreeMap<_, _> = {
FuturesUnordered::from_iter((&*vcs_in_guild).into_iter().map(
|(&channel_id, users_in_vc)| {
let bot_manager = bot_manager.clone();
async move {
(
channel_id,
get_heat()
.bot_manager(&bot_manager)
.bot_owner_user_id(bot_owner_user_id)
.bot_user_id(bot_user_id)
.users_in_vc(users_in_vc)
.call()
.await,
)
}
},
))
}
.collect()
.await;
let (channel_heat, get_heat_errors): (ChannelHeat, Vec<_>) = channel_heat_results
.into_iter()
.map(|(channel_id, heat_result)| heat_result.map(|heat| (channel_id, heat)))
.partition_result();
channel_heat_sender.send_replace(channel_heat);
for get_heat_error in get_heat_errors {
tracing::error!(?get_heat_error, "failed to evaluate heat of channel")
}
if matches!(
vcs_in_guild_watcher
.changed()
.with_cancellation_token(&cancellation_token)
.await,
None | Some(Err(_))
) {
break;
}
}
}
#[bon::builder]
#[tracing::instrument(skip(channel_heat_watcher, heat_map_sender))]
async fn map_heat(
cancellation_token: CancellationToken,
mut channel_heat_watcher: watch::Receiver<ChannelHeat>,
heat_map_sender: watch::Sender<HeatMap>,
) {
loop {
heat_map_sender.send_if_modified(|heat_map| {
let mut changed = false;
for (&channel, &heat) in &*channel_heat_watcher.borrow() {
let existing = heat_map.insert(heat, channel);
if existing.is_none_or(|(old_heat, old_channel)| {
old_heat != heat || channel != old_channel
}) {
changed = true;
}
}
changed
});
if matches!(
channel_heat_watcher
.changed()
.with_cancellation_token(&cancellation_token)
.await,
None | Some(Err(_))
) {
break;
}
}
}
#[bon::builder]
#[tracing::instrument(skip(heat_map_watcher, hottest_vc_sender))]
async fn track_hottest_vc(
cancellation_token: CancellationToken,
mut heat_map_watcher: watch::Receiver<HeatMap>,
hottest_vc_sender: watch::Sender<Option<Id<ChannelMarker>>>,
) {
loop {
let new_hottest_vc_option = {
heat_map_watcher
.borrow()
.last_left_and_rights()
.and_then(|(&heat, hottest_vcs)| {
let hot_option = Hot::new(heat);
hot_option.map(|_| *hottest_vcs.first().unwrap())
})
};
hottest_vc_sender.send_if_modified(|old_hottest_vc_option| {
let modified = (*old_hottest_vc_option) != new_hottest_vc_option;
*old_hottest_vc_option = new_hottest_vc_option;
modified
});
if matches!(
heat_map_watcher
.changed()
.with_cancellation_token(&cancellation_token)
.await,
None | Some(Err(_))
) {
break;
}
}
}
#[tracing::instrument(skip(state, hottest_vc_watcher))]
async fn follow_hottest_vc(
state: State,
guild_id: Id<GuildMarker>,
mut hottest_vc_watcher: watch::Receiver<Option<Id<ChannelMarker>>>,
) {
loop {
let hottest_vc_option = { *hottest_vc_watcher.borrow() };
match hottest_vc_option {
Some(hottest_vc) => {
match join_and_record()
.audio_channels(state.audio_channels.into())
.audio_sample_rate(state.audio_sample_rate.into())
.guild_id(guild_id)
.recording_manager(state.recording_manager.clone())
.songbird(&state.songbird)
.user_manager(state.user_manager.clone())
.voice_channel_id(hottest_vc)
.call()
.await
{
Ok(()) => {
let text_channel = state
.discord_voice_channel_corresponding_text_channel
.get(&guild_id)
.and_then(|guild_mappings| {
guild_mappings.get_right_for(&hottest_vc).copied()
})
.unwrap_or(hottest_vc);
let vc_mention = format!("<#{hottest_vc}>");
let info_mention = format!(
"</{}:{}>",
state.discord_info_command_name, state.discord_info_command_id
);
let opt_in_mention = format!(
"</{}:{}>",
state.discord_opt_in_command_name, state.discord_opt_in_command_id
);
let opt_out_mention = format!(
"</{}:{}>",
state.discord_opt_out_command_name, state.discord_opt_out_command_id
);
if let Err(posting_recording_disclosure_error) = state
.discord_client
.create_message(text_channel)
.embeds(&[
EmbedBuilder::new()
.title("Joined VC to record")
.description(format!("This bot joined {vc_mention} and intends to record. You can opt out with {opt_out_mention} or explicitly opt in with {opt_in_mention} (I'd appreciate this one). Please use {info_mention} for more information about this bot."))
.validate()
.unwrap()
.build()
])
.await {
tracing::error!(?text_channel, ?posting_recording_disclosure_error, "couldn't post a recording disclosure");
}
}
Err(joining_to_record_error) => {
tracing::error!(
?hottest_vc,
?joining_to_record_error,
"couldn't join to record"
);
}
}
}
None => {
if let Err(leaving_error) = state.songbird.leave(guild_id).await {
tracing::error!(?leaving_error, "couldn't leave vc");
}
}
}
if matches!(
hottest_vc_watcher
.changed()
.with_cancellation_token(&state.cancellation_token)
.await,
None | Some(Err(_))
) {
break;
}
}
}

View File

@@ -1,10 +1,16 @@
mod audio_channels;
mod audio_sample_rate;
mod bot_data;
mod call;
pub mod command;
mod heat_seek;
mod one_to_many;
mod one_to_many_with_data;
mod one_to_one;
mod operator_ext;
mod option_ext;
mod recording_data;
mod render_data;
mod storage;
mod track_vcs;
mod user_data;
@@ -13,13 +19,18 @@ capnp::generated_code!(mod bot_capnp);
capnp::generated_code!(mod user_capnp);
shadow_rs::shadow!(build_info);
pub use bot_data::BotDataManager;
pub use audio_channels::AudioChannels;
pub use audio_sample_rate::AudioSampleRate;
pub use bot_data::BotManager;
pub use command::{Router as CommandRouter, State, all as all_commands};
pub use heat_seek::heat_seek;
pub use one_to_many::OneToManyUniqueBTreeMap;
pub use one_to_many_with_data::OneToManyUniqueBTreeMapWithData;
pub use one_to_one::OneToOneBTreeMap;
pub use operator_ext::OperatorExt;
pub use recording_data::RecordingManager;
pub use render_data::RenderManager;
pub use storage::Storage;
pub use track_vcs::{GuildVoiceChannelToTextChannel, VCs, VCsWatcher, initialize_vcs, update_vcs};
pub use user_data::UserDataManager;
pub use track_vcs::{GuildVoiceChannelToTextChannel, VCs, VCsSender, initialize_vcs, update_vcs};
pub use user_data::UserManager;
pub use vc_user::{UserInVCData, VoiceStatus};

View File

@@ -1,17 +1,22 @@
use clap::Parser;
use fomo_reducer::{
BotDataManager, CommandRouter, GuildVoiceChannelToTextChannel, State, Storage, UserDataManager,
VCsWatcher, all_commands, command, initialize_vcs, update_vcs,
AudioChannels, AudioSampleRate, BotManager, CommandRouter, GuildVoiceChannelToTextChannel,
RecordingManager, RenderManager, State, Storage, UserManager, VCsSender, all_commands, command,
heat_seek, initialize_vcs, update_vcs,
};
use futures::{StreamExt as _, stream::FuturesUnordered};
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
use secrecy::{ExposeSecret, SecretString};
use snafu::{OptionExt, ResultExt, Snafu};
use songbird::{
Config, Songbird,
driver::{Channels, DecodeConfig, SampleRate},
shards::TwilightMap,
use songbird::{Config, Songbird, driver::DecodeConfig, shards::TwilightMap};
use std::{
collections::BTreeMap,
fmt::{Debug, Display},
num::NonZero,
str::FromStr,
sync::Arc,
time::Duration,
};
use std::{collections::BTreeMap, fmt::Debug, str::FromStr, sync::Arc};
use strum::EnumString;
use tokio::{select, signal::ctrl_c, task::JoinSet};
use tokio_util::{sync::CancellationToken, time::FutureExt as _};
use tracing::Level;
@@ -19,7 +24,7 @@ use tracing_subscriber::{
EnvFilter,
fmt::{format::FmtSpan, writer::MakeWriterExt},
};
use twilight_gateway::{Event, EventTypeFlags, Intents, Shard, StreamExt};
use twilight_gateway::{Event, EventTypeFlags, Intents, Shard, StreamExt as _};
use twilight_model::{
application::interaction::InteractionData,
gateway::{
@@ -32,61 +37,25 @@ use twilight_model::{
},
};
#[derive(Clone, Copy, Debug, strum::Display, EnumString)]
enum AudioChannels {
Mono,
Stereo,
}
impl From<AudioChannels> for Channels {
fn from(value: AudioChannels) -> Self {
match value {
AudioChannels::Mono => Channels::Mono,
AudioChannels::Stereo => Channels::Stereo,
}
}
}
#[derive(Clone, Copy, Debug, strum::Display, EnumString)]
enum AudioSampleRate {
#[strum(serialize = "8000Hz")]
Hz8000,
#[strum(serialize = "12000Hz")]
Hz12000,
#[strum(serialize = "16000Hz")]
Hz16000,
#[strum(serialize = "24000Hz")]
Hz24000,
#[strum(serialize = "48000Hz")]
Hz48000,
}
impl From<AudioSampleRate> for SampleRate {
fn from(value: AudioSampleRate) -> Self {
match value {
AudioSampleRate::Hz8000 => SampleRate::Hz8000,
AudioSampleRate::Hz12000 => SampleRate::Hz12000,
AudioSampleRate::Hz16000 => SampleRate::Hz16000,
AudioSampleRate::Hz24000 => SampleRate::Hz24000,
AudioSampleRate::Hz48000 => SampleRate::Hz48000,
}
}
}
#[derive(Debug, Snafu)]
enum ParseGuildVCToTextChannelError {
/// the guild ID needs to be included with : before the voice channel to text channel mapping
NoScope,
/// a voice channel ID needs to be specified then -> to the corresponding text channel ID
NoRelation,
/// could not parse the guild ID
ParseGuildError {
source: <Id<GuildMarker> as FromStr>::Err,
},
/// could not parse the voice channel ID
ParseVoiceChannelError {
source: <Id<ChannelMarker> as FromStr>::Err,
},
/// could not parse the text channel ID
ParseTextChannelError {
source: <Id<ChannelMarker> as FromStr>::Err,
},
@@ -108,6 +77,29 @@ fn parse_guild_vc_to_text_channel(
Ok((guild, voice_channel, text_channel))
}
#[derive(Clone)]
struct HumanDuration(Duration);
impl FromStr for HumanDuration {
type Err = humantime::DurationError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
humantime::parse_duration(s).map(Self)
}
}
impl Debug for HumanDuration {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.0.fmt(f)
}
}
impl Display for HumanDuration {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", humantime::format_duration(self.0))
}
}
#[derive(Debug, Parser)]
struct AppArgs {
#[arg(long, env)]
@@ -120,7 +112,7 @@ struct AppArgs {
discord_nickname: Option<Arc<str>>,
#[arg(long, env)]
discord_status: Option<Arc<str>>,
discord_status: Option<String>,
#[arg(long, env, value_parser = parse_guild_vc_to_text_channel)]
discord_voice_channel_corresponding_text_channel:
@@ -129,7 +121,7 @@ struct AppArgs {
#[arg(long, env, default_value_t = AudioChannels::Mono)]
audio_channels: AudioChannels,
#[arg(long, env, default_value_t = AudioSampleRate::Hz12000)]
#[arg(long, env, default_value_t = AudioSampleRate::Hz24000)]
audio_sample_rate: AudioSampleRate,
#[arg(long, env)]
@@ -140,6 +132,18 @@ struct AppArgs {
#[arg(long, env)]
recording_data: Storage,
#[arg(long, env)]
render_data: Storage,
#[arg(long, env, default_value_t = HumanDuration(Duration::from_secs(120)))]
watchdog_warmup: HumanDuration,
#[arg(long, env, default_value_t = HumanDuration(Duration::from_secs(5)))]
watchdog_frequency: HumanDuration,
#[arg(long, env, default_value_t = 8.try_into().unwrap())]
watchdog_channel_size: NonZero<usize>,
}
#[derive(Parser)]
@@ -202,6 +206,10 @@ async fn main() -> Result<(), MainError> {
bot_data,
user_data,
recording_data,
render_data,
watchdog_warmup: HumanDuration(watchdog_warmup),
watchdog_frequency: HumanDuration(watchdog_frequency),
watchdog_channel_size,
} = app_args;
let cancellation_token = CancellationToken::new();
@@ -252,35 +260,6 @@ async fn main() -> Result<(), MainError> {
let discord_application_id = current_application.id;
let intents = Intents::GUILD_VOICE_STATES;
let config = twilight_gateway::Config::new(discord_token.expose_secret().to_owned(), intents);
let shards = twilight_gateway::create_recommended(&discord_client, config, |_id, builder| {
builder.build()
})
.await
.expect("TODO");
let shards = Vec::from_iter(shards);
let senders = TwilightMap::new(
shards
.iter()
.map(|shard| (shard.id().number(), shard.sender()))
.collect(),
);
let audio_channels = audio_channels.into();
let audio_sample_rate = audio_sample_rate.into();
let senders = Arc::new(senders);
let songbird = Songbird::twilight(senders, discord_user_id);
songbird.set_config(
Config::default().decode_mode(songbird::driver::DecodeMode::Decode(DecodeConfig::new(
audio_channels,
audio_sample_rate,
))),
);
let interaction_client = discord_client.interaction(discord_application_id);
let commands = all_commands();
@@ -320,25 +299,25 @@ async fn main() -> Result<(), MainError> {
let discord_opt_in_command_id = discord_opt_in_command.id.expect("TODO");
let discord_opt_out_command_id = discord_opt_out_command.id.expect("TODO");
let discord_info_command_name = discord_info_command.name.into();
let discord_opt_in_command_name = discord_opt_in_command.name.into();
let discord_opt_out_command_name = discord_opt_out_command.name.into();
let vcs = initialize_vcs(&discord_client).await;
let discord_info_command_name: Arc<str> = discord_info_command.name.into();
let discord_opt_in_command_name: Arc<str> = discord_opt_in_command.name.into();
let discord_opt_out_command_name: Arc<str> = discord_opt_out_command.name.into();
let command_router = CommandRouter::from_iter(commands);
let command_router = Arc::new(command_router);
let discord_client = Arc::new(discord_client);
let songbird = Arc::new(songbird);
let vcs_watcher = VCsWatcher::new(vcs);
let vcs_sender = VCsSender::new(Default::default());
let bot_data = bot_data.into_inner();
let recording_data = recording_data.into_inner();
let render_data = render_data.into_inner();
let user_data = user_data.into_inner();
let bot_data_manager = BotDataManager::new(bot_data);
let user_data_manager = UserDataManager::new(user_data);
let bot_manager = BotManager::new(bot_data);
let recording_manager = RecordingManager::new(recording_data);
let render_manager = RenderManager::new(render_data);
let user_manager = UserManager::new(user_data);
let discord_voice_channel_corresponding_text_channel = {
let mut map = GuildVoiceChannelToTextChannel::default();
@@ -356,56 +335,6 @@ async fn main() -> Result<(), MainError> {
let discord_voice_channel_corresponding_text_channel =
Arc::new(discord_voice_channel_corresponding_text_channel);
let state = State {
audio_channels,
audio_sample_rate,
bot_data_manager,
cancellation_token: cancellation_token.clone(),
discord_application_id,
discord_bot_owner_user_id,
discord_client,
discord_info_command_id,
discord_info_command_name,
discord_opt_in_command_id,
discord_opt_in_command_name,
discord_opt_out_command_id,
discord_opt_out_command_name,
discord_user_id,
discord_voice_channel_corresponding_text_channel,
recording_data,
songbird,
user_data_manager,
vcs_watcher,
};
if let Some(discord_status) = discord_status {
shards.iter().for_each(|shard| {
shard.command(
&UpdatePresence::new(
vec![
MinimalActivity {
kind: ActivityType::Listening,
name: (*discord_status).to_owned(),
url: None,
}
.into(),
],
false,
None,
Status::Idle,
)
.expect("TODO"),
)
});
}
let run_shards = shards
.into_iter()
.map(|shard| handle_events(command_router.clone(), state.clone(), shard));
let run_shards = JoinSet::from_iter(run_shards);
let run_shards = run_shards.join_all();
tokio::pin!(run_shards);
tokio::spawn({
let cancellation_token = cancellation_token.clone();
async move {
@@ -416,15 +345,196 @@ async fn main() -> Result<(), MainError> {
}
});
select! {
_ = &mut run_shards => {
Ok(())
}
() = cancellation_token.cancelled() => {
tracing::warn!("waiting for tasks to gracefully shut down");
run_shards.await;
let (mut watchdog_tx, mut watchdog_rx) =
futures::channel::mpsc::channel(watchdog_channel_size.get());
Err(MainError::Cancelled)
std::thread::spawn({
let discord_voice_channel_corresponding_text_channel =
discord_voice_channel_corresponding_text_channel.clone();
let discord_client = discord_client.clone();
let vcs_watcher = vcs_sender.subscribe();
move || {
std::thread::sleep(watchdog_warmup);
loop {
tracing::debug!("waiting to send check-in");
if watchdog_tx.try_send(()).is_err() {
tracing::error!("tokio runtime deadlocked");
vcs_watcher
.borrow()
.par_iter()
.for_each(|(&guild_id, vcs_in_guild)| {
if let Some(&voice_channel_id) =
vcs_in_guild.get_left_for(&discord_user_id)
{
let text_channel_id =
discord_voice_channel_corresponding_text_channel
.get(&guild_id)
.and_then(|guild_mappings| {
guild_mappings.get_right_for(&voice_channel_id).copied()
})
.unwrap_or(voice_channel_id);
tokio::runtime::Runtime::new().unwrap().block_on(discord_client.create_message(text_channel_id).content("so sorry I died, I'm in purgatory now, I don't like it here.\nbut I will be back in 5-20 minutes (even if it says I'm still there, I'm not currently recording and will be disconnected soon before later reconnecting and announcing recording again)").into_future());
}
});
std::process::exit(1);
}
std::thread::sleep(watchdog_frequency);
}
}
});
tokio::spawn(async move {
tokio::time::sleep(watchdog_warmup).await;
loop {
tracing::debug!("waiting to acknowledge the watchdog");
if watchdog_rx.recv().await.is_err() {
tracing::error!("watchdog died (this should be impossible)");
std::process::exit(1);
}
}
});
loop {
tokio::spawn({
let vcs_sender = vcs_sender.clone();
let discord_client = discord_client.clone();
async move { initialize_vcs(&vcs_sender, &discord_client).await }
});
let intents = Intents::GUILD_VOICE_STATES;
let config =
twilight_gateway::Config::new(discord_token.expose_secret().to_owned(), intents);
let shards =
twilight_gateway::create_recommended(&discord_client, config, |_id, builder| {
builder.build()
})
.await
.expect("TODO");
let shards = Vec::from_iter(shards);
let senders = TwilightMap::new(
shards
.iter()
.map(|shard| (shard.id().number(), shard.sender()))
.collect(),
);
let senders = Arc::new(senders);
let songbird = Songbird::twilight(senders, discord_user_id);
songbird.set_config(
Config::default().decode_mode(songbird::driver::DecodeMode::Decode(DecodeConfig::new(
audio_channels.into(),
audio_sample_rate.into(),
))),
);
if let Some(discord_status) = &discord_status {
shards.iter().for_each(|shard| {
shard.command(
&UpdatePresence::new(
vec![
MinimalActivity {
kind: ActivityType::Listening,
name: discord_status.clone(),
url: None,
}
.into(),
],
false,
None,
Status::Idle,
)
.expect("TODO"),
)
});
}
let songbird = Arc::new(songbird);
let state = State {
audio_channels,
audio_sample_rate,
bot_manager: bot_manager.clone(),
cancellation_token: cancellation_token.clone(),
discord_application_id,
discord_bot_owner_user_id,
discord_client: discord_client.clone(),
discord_info_command_id,
discord_info_command_name: discord_info_command_name.clone(),
discord_opt_in_command_id,
discord_opt_in_command_name: discord_opt_in_command_name.clone(),
discord_opt_out_command_id,
discord_opt_out_command_name: discord_opt_out_command_name.clone(),
discord_user_id,
discord_voice_channel_corresponding_text_channel:
discord_voice_channel_corresponding_text_channel.clone(),
recording_manager: recording_manager.clone(),
render_manager: render_manager.clone(),
songbird,
user_manager: user_manager.clone(),
vcs_sender: vcs_sender.clone(),
};
let mut heat_seeking = tokio::spawn(heat_seek(state.clone()));
let run_shards = shards
.into_iter()
.map(|shard| handle_events(command_router.clone(), state.clone(), shard));
let mut run_shards = JoinSet::from_iter(run_shards);
select! {
_heat_seeking_exited = &mut heat_seeking => {
tracing::warn!("heat seeking exited, which shouldn't happen. let's try again");
continue;
}
_first_shard_exited = run_shards.join_next() => {
tracing::warn!("a shard exited when it's not supposed to, let's try reconnecting them all");
heat_seeking.abort();
continue;
}
() = cancellation_token.cancelled() => {
tracing::warn!("gracefully shutting down");
FuturesUnordered::from_iter(
vcs_sender
.borrow()
.iter()
.map(|(&guild_id, vcs_in_guild)| {
let discord_client = discord_client.clone();
let discord_voice_channel_corresponding_text_channel = discord_voice_channel_corresponding_text_channel.clone();
async move {
if let Some(&voice_channel_id) =
vcs_in_guild.get_left_for(&discord_user_id)
{
let text_channel_id =
discord_voice_channel_corresponding_text_channel
.get(&guild_id)
.and_then(|guild_mappings| {
guild_mappings.get_right_for(&voice_channel_id).copied()
})
.unwrap_or(voice_channel_id);
let _ = discord_client.create_message(text_channel_id).content("probably about to be updated, back in 5-20 minutes").await;
}
}
})
).collect::<()>().await;
heat_seeking.await.unwrap();
run_shards.join_all().await;
return Err(MainError::Cancelled);
}
}
}
}
@@ -444,7 +554,7 @@ async fn handle_events(command_router: Arc<CommandRouter>, state: State, mut sha
match event_res {
Ok(twilight_model::gateway::event::Event::GatewayClose(frame_option)) => {
tracing::warn!(?frame_option);
return;
break;
}
Ok(event) => {
handle_event(command_router.clone(), state.clone(), event).await;
@@ -456,23 +566,33 @@ async fn handle_events(command_router: Arc<CommandRouter>, state: State, mut sha
) =>
{
tracing::error!(?reconnect_error);
return;
break;
}
Err(error) => {
tracing::error!(?error);
}
}
}
state.cancellation_token.cancel();
}
#[tracing::instrument(skip(command_router, state))]
async fn handle_event(command_router: Arc<CommandRouter>, state: State, event: Event) {
state.songbird.process(&event).await;
tokio::spawn({
let event = event.clone();
let songbird = state.songbird.clone();
async move {
songbird.process(&event).await;
}
})
.await
.unwrap();
match event {
Event::VoiceStateUpdate(voice_state_update) => {
state
.vcs_watcher
.vcs_sender
.send_modify(|vcs| update_vcs(&voice_state_update, vcs));
}
Event::InteractionCreate(interaction_create) => {

View File

@@ -42,6 +42,14 @@ where
self.right_to_left.get(right)
}
pub fn first_left_and_rights(&self) -> Option<(&Left, &BTreeSet<Right>)> {
self.left_to_rights.first_key_value()
}
pub fn last_left_and_rights(&self) -> Option<(&Left, &BTreeSet<Right>)> {
self.left_to_rights.last_key_value()
}
pub fn remove_left(&mut self, left: &Left) -> Option<(Left, BTreeSet<Right>)> {
let (left, rights) = self.left_to_rights.remove_entry(left)?;

View File

@@ -40,7 +40,7 @@ where
old
}
pub fn get_rights_for(&self, left: &Left) -> Option<&BTreeMap<Right, RightData>> {
pub fn get_rights_and_data_for(&self, left: &Left) -> Option<&BTreeMap<Right, RightData>> {
self.left_to_rights.get(left)
}
@@ -48,9 +48,17 @@ where
self.right_to_left.get(right)
}
pub fn first_left_and_rights_and_data(&self) -> Option<(&Left, &BTreeMap<Right, RightData>)> {
self.left_to_rights.first_key_value()
}
pub fn last_left_and_rights_and_data(&self) -> Option<(&Left, &BTreeMap<Right, RightData>)> {
self.left_to_rights.last_key_value()
}
pub fn get_left_and_data_for(&self, right: &Right) -> Option<(&Left, &RightData)> {
let left = self.right_to_left.get(right)?;
let rights = self.get_rights_for(left)?;
let rights = self.get_rights_and_data_for(left)?;
let right_data = rights.get(right)?;
@@ -86,6 +94,30 @@ where
}
}
impl<Left, Right, RightData> IntoIterator
for OneToManyUniqueBTreeMapWithData<Left, Right, RightData>
{
type Item = (Left, BTreeMap<Right, RightData>);
type IntoIter = <BTreeMap<Left, BTreeMap<Right, RightData>> as IntoIterator>::IntoIter;
fn into_iter(self) -> Self::IntoIter {
self.left_to_rights.into_iter()
}
}
impl<'a, Left, Right, RightData> IntoIterator
for &'a OneToManyUniqueBTreeMapWithData<Left, Right, RightData>
{
type Item = (&'a Left, &'a BTreeMap<Right, RightData>);
type IntoIter = <&'a BTreeMap<Left, BTreeMap<Right, RightData>> as IntoIterator>::IntoIter;
fn into_iter(self) -> Self::IntoIter {
self.left_to_rights.iter()
}
}
impl<Left, Right, RightData> FromIterator<(Left, Right, RightData)>
for OneToManyUniqueBTreeMapWithData<Left, Right, RightData>
where

View File

@@ -3,23 +3,30 @@ use opendal::{Buffer, Error, ErrorKind, FuturesAsyncReader, Operator};
#[extension(pub trait OperatorExt)]
impl Operator {
async fn read_if_exists(&self, path: &str) -> Result<Option<Buffer>, Error> {
match self.read(path).await {
Ok(buffer) => Ok(Some(buffer)),
Err(error) if matches!(error.kind(), ErrorKind::NotFound) => Ok(None),
Err(error) => Err(error),
fn read_if_exists(
&self,
path: &str,
) -> impl Future<Output = Result<Option<Buffer>, Error>> + Send {
async {
match self.read(path).await {
Ok(buffer) => Ok(Some(buffer)),
Err(error) if matches!(error.kind(), ErrorKind::NotFound) => Ok(None),
Err(error) => Err(error),
}
}
}
async fn async_reader_if_exists(
fn async_reader_if_exists(
&self,
path: &str,
) -> Result<Option<FuturesAsyncReader>, Error> {
let reader = self.reader(path).await?;
match reader.into_futures_async_read(..).await {
Ok(reader) => Ok(Some(reader)),
Err(error) if matches!(error.kind(), ErrorKind::NotFound) => Ok(None),
Err(error) => Err(error),
) -> impl Future<Output = Result<Option<FuturesAsyncReader>, Error>> + Send {
async {
let reader = self.reader(path).await?;
match reader.into_futures_async_read(..).await {
Ok(reader) => Ok(Some(reader)),
Err(error) if matches!(error.kind(), ErrorKind::NotFound) => Ok(None),
Err(error) => Err(error),
}
}
}
}

View File

@@ -0,0 +1,196 @@
use futures::{SinkExt, StreamExt as _, TryStream, TryStreamExt as _};
use snafu::Snafu;
use time::{Month, UtcDateTime};
use twilight_model::id::{
Id,
marker::{ChannelMarker, GuildMarker},
};
use super::{ClipEntryError, ListError, Recording, RecordingManager};
const BUFFER_SIZE: usize = 2048;
#[derive(Debug, Snafu)]
pub enum RecordingEntryError {
/// could not list (at least some) clips
ListClipsError { source: ListError },
/// could not receive this clip entry
ClipEntryError { source: ClipEntryError },
}
impl RecordingManager {
pub fn between(
&self,
start: UtcDateTime,
end: UtcDateTime,
) -> impl TryStream<Ok = Recording, Error = RecordingEntryError> + Unpin {
let this = self.clone();
let (mut sink, stream) = futures::channel::mpsc::channel(BUFFER_SIZE);
tokio::spawn(async move {
let year_start = start.year();
let year_end = end.year();
let years = year_start..=year_end;
for year in years {
let mut month_start = start.month();
let mut month_end = end.month();
if year > year_start {
month_start = Month::January;
}
if year < year_end {
month_end = Month::December;
}
let months = month_start as u8..=month_end as u8;
let months = months.map(|month| Month::try_from(month).unwrap());
for month in months {
let mut day_start = start.day();
let mut day_end = end.day();
if month > month_start {
day_start = 1;
}
if month < month_end {
day_end = 31;
}
let days = day_start..=day_end;
for day in days {
let mut hour_start = start.hour();
let mut hour_end = end.hour();
if day > day_start {
hour_start = 0;
}
if day < day_end {
hour_end = 23;
}
let hours = hour_start..=hour_end;
for hour in hours {
let mut minute_start = start.minute();
let mut minute_end = end.minute();
if hour > hour_start {
minute_start = 0;
}
if hour < hour_end {
minute_end = 59;
}
let minutes = minute_start..=minute_end;
for minute in minutes {
match this.clips(year, month, day, hour, minute).await {
Err(list_error) => {
let _ = sink
.send(Err(RecordingEntryError::ListClipsError {
source: list_error,
}))
.await;
}
Ok(clips) => {
let mut clips = clips.into_stream();
while let Some(clip_result) = clips.next().await {
match clip_result {
Err(entry_error) => {
let _ = sink
.send(Err(
RecordingEntryError::ClipEntryError {
source: entry_error,
},
))
.await;
}
Ok(clip) => {
let mut second_start = start.second();
let mut second_end = end.second();
if minute > minute_start {
second_start = 0;
}
if minute < minute_end {
second_end = 59;
}
let seconds = second_start..=second_end;
let second = clip.second;
if !seconds.contains(&second) {
continue;
}
let mut microsecond_start = start.microsecond();
let mut microsecond_end = end.microsecond();
if second > second_start {
microsecond_start = 0;
}
if second < second_end {
microsecond_end = 999_999;
}
let microseconds =
microsecond_start..=microsecond_end;
let microsecond = clip.microsecond;
if !microseconds.contains(&microsecond) {
continue;
}
let recording = Recording {
year,
month,
day,
hour,
minute,
clip,
};
let _ = sink.send(Ok(recording)).await;
}
}
}
}
}
}
}
}
}
}
});
stream
}
}
impl RecordingManager {
pub fn between_in_vc(
&self,
start: UtcDateTime,
end: UtcDateTime,
guild_id: Id<GuildMarker>,
voice_channel_id: Id<ChannelMarker>,
) -> impl TryStream<Ok = Recording, Error = RecordingEntryError> + Unpin {
self.between(start, end).try_filter(move |recording| {
std::future::ready(
recording.clip.guild == guild_id
&& recording.clip.voice_channel == voice_channel_id,
)
})
}
}

110
src/recording_data/clip.rs Normal file
View File

@@ -0,0 +1,110 @@
use futures::{TryStream, TryStreamExt as _};
use snafu::{ResultExt as _, Snafu};
use std::{fmt::Display, str::FromStr};
use super::{
CreateListerSnafu, Day, Guild, Hour, ListError, Microsecond, Minute, Month, RecordingManager,
Second, User, VoiceChannel, Year, guild, microsecond, second, user, voice_channel,
};
#[derive(Debug, Clone)]
pub struct Clip {
pub second: Second,
pub microsecond: Microsecond,
pub guild: Guild,
pub voice_channel: VoiceChannel,
pub user: User,
}
#[derive(Debug, Snafu)]
pub enum TakeError {
/// could not parse the second out of the clip metadata
TakeSecondError { source: second::TakeError },
/// could not parse the microsecond out of the clip metadata
TakeMicrosecondError { source: microsecond::TakeError },
/// could not parse the guild out of the clip metadata
TakeGuildError { source: guild::TakeError },
/// could not parse the voice channel out of the clip metadata
TakeVoiceChannelError { source: voice_channel::TakeError },
/// could not parse the user out of the clip metadata
TakeUserError { source: user::TakeError },
}
pub fn take(s: &str) -> Result<Clip, TakeError> {
let (second, s) = second::take(s).context(TakeSecondSnafu)?;
let (microsecond, s) = microsecond::take(s).context(TakeMicrosecondSnafu)?;
let (guild, s) = guild::take(s).context(TakeGuildSnafu)?;
let (voice_channel, s) = voice_channel::take(s).context(TakeVoiceChannelSnafu)?;
let user = user::take(s).context(TakeUserSnafu)?;
Ok(Clip {
second,
microsecond,
guild,
voice_channel,
user,
})
}
impl FromStr for Clip {
type Err = TakeError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
take(s)
}
}
impl Display for Clip {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let Self {
second,
microsecond,
guild,
voice_channel,
user,
} = self;
let user = user
.as_ref()
.map_or_else(|| "UNKNOWN".into(), ToString::to_string);
write!(
f,
"audio-{second}.{microsecond}-{guild}-{voice_channel}-{user}.wav"
)
}
}
#[derive(Debug, Snafu)]
pub enum ClipEntryError {
/// failed to get an entry from the storage operator's lister
ReceiveEntryError { source: opendal::Error },
/// failed to parse the entry as a clip
ParseError { source: TakeError },
}
impl RecordingManager {
pub async fn clips(
&self,
year: Year,
month: Month,
day: Day,
hour: Hour,
minute: Minute,
) -> Result<impl TryStream<Ok = Clip, Error = ClipEntryError> + Unpin, ListError> {
let lister = self
.operator
.lister(&format!("{year}/{month}/{day}/{hour}/{minute}/"))
.await
.context(CreateListerSnafu)?;
Ok(lister
.map_err(|error| ClipEntryError::ReceiveEntryError { source: error })
.and_then(|entry| std::future::ready(entry.name().parse().context(ParseSnafu))))
}
}

57
src/recording_data/day.rs Normal file
View File

@@ -0,0 +1,57 @@
use futures::{TryStream, TryStreamExt as _};
use snafu::{OptionExt as _, ResultExt as _, Snafu};
use std::num::ParseIntError;
use super::{CreateListerSnafu, ListError, Month, RecordingManager, Year};
pub type Day = u8;
#[derive(Debug, Snafu)]
pub enum TakeError {
/// days are supposed to be directories, but this wasn't (because it didn't end with `/`)
NotADirectory,
/// could not parse the day as an integer
ParseIntegerError { source: ParseIntError },
}
pub fn take(s: &str) -> Result<(Day, &str), TakeError> {
let (day, rest) = s.split_once('/').context(NotADirectorySnafu)?;
let day = day.parse().context(ParseIntegerSnafu)?;
Ok((day, rest))
}
#[derive(Debug, Snafu)]
pub enum DayEntryError {
/// failed to get an entry from the storage operator's lister
ReceiveEntryError { source: opendal::Error },
/// failed to parse the entry as a day
ParseError { source: TakeError },
}
impl RecordingManager {
pub async fn days(
&self,
year: Year,
month: Month,
) -> Result<impl TryStream<Ok = Day, Error = DayEntryError> + Unpin, ListError> {
let lister = self
.operator
.lister(&format!("{year}/{month}/"))
.await
.context(CreateListerSnafu)?;
Ok(lister
.map_err(|error| DayEntryError::ReceiveEntryError { source: error })
.and_then(|entry| {
std::future::ready(
take(entry.name())
.map(|(day, _rest)| day)
.context(ParseSnafu),
)
}))
}
}

View File

@@ -0,0 +1,22 @@
use snafu::{OptionExt as _, ResultExt as _, Snafu};
use std::str::FromStr;
use twilight_model::id::{Id, marker::GuildMarker};
pub type Guild = Id<GuildMarker>;
#[derive(Debug, Snafu)]
pub enum TakeError {
/// guilds are supposed to be followed by -
Malformed,
/// could not parse the guild ID
ParseIdError { source: <Guild as FromStr>::Err },
}
pub fn take(path: &str) -> Result<(Guild, &str), TakeError> {
let (guild, rest) = path.split_once('-').context(MalformedSnafu)?;
let guild = guild.parse().context(ParseIdSnafu)?;
Ok((guild, rest))
}

View File

@@ -0,0 +1,58 @@
use futures::{TryStream, TryStreamExt as _};
use snafu::{OptionExt as _, ResultExt as _, Snafu};
use std::num::ParseIntError;
use super::{CreateListerSnafu, Day, ListError, Month, RecordingManager, Year};
pub type Hour = u8;
#[derive(Debug, Snafu)]
pub enum TakeError {
/// hours are supposed to be directories, but this wasn't (because it didn't end with `/`)
NotADirectory,
/// could not parse the hour as an integer
ParseIntegerError { source: ParseIntError },
}
pub fn take(s: &str) -> Result<(Hour, &str), TakeError> {
let (hour, rest) = s.split_once('/').context(NotADirectorySnafu)?;
let hour = hour.parse().context(ParseIntegerSnafu)?;
Ok((hour, rest))
}
#[derive(Debug, Snafu)]
pub enum HourEntryError {
/// failed to get an entry from the storage operator's lister
ReceiveEntryError { source: opendal::Error },
/// failed to parse the entry as a hour
ParseError { source: TakeError },
}
impl RecordingManager {
pub async fn hours(
&self,
year: Year,
month: Month,
day: Day,
) -> Result<impl TryStream<Ok = Hour, Error = HourEntryError> + Unpin, ListError> {
let lister = self
.operator
.lister(&format!("{year}/{month}/{day}/"))
.await
.context(CreateListerSnafu)?;
Ok(lister
.map_err(|error| HourEntryError::ReceiveEntryError { source: error })
.and_then(|entry| {
std::future::ready(
take(entry.name())
.map(|(hour, _rest)| hour)
.context(ParseSnafu),
)
}))
}
}

View File

@@ -0,0 +1,21 @@
use snafu::{OptionExt as _, ResultExt as _, Snafu};
use std::num::ParseIntError;
pub type Microsecond = u32;
#[derive(Debug, Snafu)]
pub enum TakeError {
/// microseconds are supposed to be followed by -
Malformed,
/// could not parse the microsecond as an integer
ParseIntegerError { source: ParseIntError },
}
pub fn take(path: &str) -> Result<(Microsecond, &str), TakeError> {
let (microsecond, rest) = path.split_once('-').context(MalformedSnafu)?;
let microsecond = microsecond.parse().context(ParseIntegerSnafu)?;
Ok((microsecond, rest))
}

View File

@@ -0,0 +1,59 @@
use futures::{TryStream, TryStreamExt as _};
use snafu::{OptionExt as _, ResultExt as _, Snafu};
use std::num::ParseIntError;
use super::{CreateListerSnafu, Day, Hour, ListError, Month, RecordingManager, Year};
pub type Minute = u8;
#[derive(Debug, Snafu)]
pub enum TakeError {
/// minutes are supposed to be directories, but this wasn't (because it didn't end with `/`)
NotADirectory,
/// could not parse the minute as an integer
ParseIntegerError { source: ParseIntError },
}
pub fn take(s: &str) -> Result<(Minute, &str), TakeError> {
let (minute, rest) = s.split_once('/').context(NotADirectorySnafu)?;
let minute = minute.parse().context(ParseIntegerSnafu)?;
Ok((minute, rest))
}
#[derive(Debug, Snafu)]
pub enum MinuteEntryError {
/// failed to get an entry from the storage operator's lister
ReceiveEntryError { source: opendal::Error },
/// failed to parse the entry as a minute
ParseError { source: TakeError },
}
impl RecordingManager {
pub async fn minutes(
&self,
year: Year,
month: Month,
day: Day,
hour: Hour,
) -> Result<impl TryStream<Ok = Minute, Error = MinuteEntryError> + Unpin, ListError> {
let lister = self
.operator
.lister(&format!("{year}/{month}/{day}/{hour}/"))
.await
.context(CreateListerSnafu)?;
Ok(lister
.map_err(|error| MinuteEntryError::ReceiveEntryError { source: error })
.and_then(|entry| {
std::future::ready(
take(entry.name())
.map(|(minute, _rest)| minute)
.context(ParseSnafu),
)
}))
}
}

131
src/recording_data/mod.rs Normal file
View File

@@ -0,0 +1,131 @@
use std::{convert::Infallible, io::Cursor};
use hound::{SampleFormat, WavReader, WavSpec};
use opendal::Operator;
use snafu::Snafu;
mod between;
mod clip;
mod day;
mod guild;
mod hour;
mod microsecond;
mod minute;
mod month;
mod recording;
mod second;
mod user;
mod voice_channel;
mod year;
pub use between::RecordingEntryError;
pub use clip::{Clip, ClipEntryError};
use day::Day;
use guild::Guild;
use hour::Hour;
use microsecond::Microsecond;
use minute::Minute;
use month::Month;
pub use recording::Recording;
use second::Second;
use user::User;
use voice_channel::VoiceChannel;
use year::Year;
#[derive(Debug, Clone)]
pub struct RecordingManager {
operator: Operator,
}
impl RecordingManager {
pub fn new(operator: Operator) -> Self {
Self { operator }
}
}
#[derive(Debug)]
pub struct RecordingData {
pub channels: u16,
pub sample_rate: u32,
pub samples: Vec<i16>,
}
impl RecordingManager {
pub async fn write(
&self,
recording: &Recording,
RecordingData {
channels,
sample_rate,
samples,
}: RecordingData,
) -> Result<
(),
Infallible, // TODO: a real error type
> {
let wav_spec = WavSpec {
channels,
sample_rate,
bits_per_sample: 16,
sample_format: SampleFormat::Int,
};
let mut buffer = Vec::new();
let writer = Cursor::new(&mut buffer);
let mut wav_writer = hound::WavWriter::new(writer, wav_spec).expect("TODO");
let mut sample_writer = wav_writer.get_i16_writer(samples.len() as u32);
for sample in samples {
sample_writer.write_sample(sample);
}
sample_writer.flush().expect("TODO");
wav_writer.finalize().expect("TODO");
let path = recording.to_string();
self.operator.write(&path, buffer).await.expect("TODO");
Ok(())
}
}
impl RecordingManager {
pub async fn read(
&self,
recording: &Recording,
) -> Result<
RecordingData,
Infallible, // TODO: a real error type
> {
let path = recording.to_string();
let buffer = self.operator.read(&path).await.expect("TODO");
let bytes = buffer.to_bytes();
let cursor = Cursor::new(bytes);
let wav_reader = WavReader::new(cursor).expect("TODO");
let wav_spec = wav_reader.spec();
let channels = wav_spec.channels;
let sample_rate = wav_spec.sample_rate;
let samples = wav_reader.into_samples();
let samples = Result::from_iter(samples).expect("TODO");
Ok(RecordingData {
channels,
sample_rate,
samples,
})
}
}
#[derive(Debug, Snafu)]
pub enum ListError {
/// error creating a lister through the storage operator
CreateListerError { source: opendal::Error },
}

View File

@@ -0,0 +1,55 @@
use futures::{TryStream, TryStreamExt as _};
use snafu::{OptionExt as _, ResultExt as _, Snafu};
use super::{CreateListerSnafu, ListError, RecordingManager, Year};
pub use time::Month;
#[derive(Debug, Snafu)]
pub enum TakeError {
/// months are supposed to be directories, but this wasn't (because it didn't end with `/`)
NotADirectory,
/// could not parse the month as its name
ParseMonthNameError { source: time::error::InvalidVariant },
}
pub fn take(s: &str) -> Result<(Month, &str), TakeError> {
let (month, rest) = s.split_once('/').context(NotADirectorySnafu)?;
let month = month.parse().context(ParseMonthNameSnafu)?;
Ok((month, rest))
}
#[derive(Debug, Snafu)]
pub enum MonthEntryError {
/// failed to get an entry from the storage operator's lister
ReceiveEntryError { source: opendal::Error },
/// failed to parse the entry as a month
ParseError { source: TakeError },
}
impl RecordingManager {
pub async fn months(
&self,
year: Year,
) -> Result<impl TryStream<Ok = Month, Error = MonthEntryError> + Unpin, ListError> {
let lister = self
.operator
.lister(&format!("{year}/"))
.await
.context(CreateListerSnafu)?;
Ok(lister
.map_err(|error| MonthEntryError::ReceiveEntryError { source: error })
.and_then(|entry| {
std::future::ready(
take(entry.name())
.map(|(month, _rest)| month)
.context(ParseSnafu),
)
}))
}
}

View File

@@ -0,0 +1,76 @@
use snafu::{ResultExt as _, Snafu};
use std::{fmt::Display, str::FromStr};
use super::{Clip, Day, Hour, Minute, Month, Year, clip, day, hour, minute, month, year};
#[derive(Debug, Clone)]
pub struct Recording {
pub year: Year,
pub month: Month,
pub day: Day,
pub hour: Hour,
pub minute: Minute,
pub clip: Clip,
}
#[derive(Debug, Snafu)]
pub enum TakeError {
/// could not parse the year out of the recording
TakeYearError { source: year::TakeError },
/// could not parse the month out of the recording
TakeMonthError { source: month::TakeError },
/// could not parse the day out of the recording
TakeDayError { source: day::TakeError },
/// could not parse the hour out of the recording
TakeHourError { source: hour::TakeError },
/// could not parse the minute out of the recording
TakeMinuteError { source: minute::TakeError },
/// could not parse the clip out of the recording
TakeClipError { source: clip::TakeError },
}
pub fn take(s: &str) -> Result<Recording, TakeError> {
let (year, s) = year::take(s).context(TakeYearSnafu)?;
let (month, s) = month::take(s).context(TakeMonthSnafu)?;
let (day, s) = day::take(s).context(TakeDaySnafu)?;
let (hour, s) = hour::take(s).context(TakeHourSnafu)?;
let (minute, s) = minute::take(s).context(TakeMinuteSnafu)?;
let clip = clip::take(s).context(TakeClipSnafu)?;
Ok(Recording {
year,
month,
day,
hour,
minute,
clip,
})
}
impl FromStr for Recording {
type Err = TakeError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
take(s)
}
}
impl Display for Recording {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let Self {
year,
month,
day,
hour,
minute,
clip,
} = self;
write!(f, "{year}/{month}/{day}/{hour}/{minute}/{clip}")
}
}

View File

@@ -0,0 +1,25 @@
use snafu::{OptionExt as _, ResultExt as _, Snafu};
use std::num::ParseIntError;
pub type Second = u8;
#[derive(Debug, Snafu)]
pub enum TakeError {
/// seconds are supposed to be preceded by audio-
MalformedPrefix,
/// seconds are supposed to be followed by .
MalformedSuffix,
/// could not parse the second as an integer
ParseIntegerError { source: ParseIntError },
}
pub fn take(path: &str) -> Result<(Second, &str), TakeError> {
let (_prefix, path) = path.split_once("audio-").context(MalformedPrefixSnafu)?;
let (second, rest) = path.split_once('.').context(MalformedSuffixSnafu)?;
let second = second.parse().context(ParseIntegerSnafu)?;
Ok((second, rest))
}

View File

@@ -0,0 +1,25 @@
use snafu::{OptionExt as _, ResultExt as _, Snafu};
use std::num::ParseIntError;
use twilight_model::id::{Id, marker::UserMarker};
pub type User = Option<Id<UserMarker>>;
#[derive(Debug, Snafu)]
pub enum TakeError {
/// users are supposed to be terminated by .wav
Malformed,
/// could not parse the user ID
ParseIntegerError { source: ParseIntError },
}
pub fn take(path: &str) -> Result<User, TakeError> {
let user = path.strip_suffix(".wav").context(MalformedSnafu)?;
let user = match user {
"UNKNOWN" => None,
user => Some(user.parse().context(ParseIntegerSnafu)?),
};
Ok(user)
}

View File

@@ -0,0 +1,25 @@
use snafu::{OptionExt as _, ResultExt as _, Snafu};
use std::str::FromStr;
use twilight_model::id::Id;
use twilight_model::id::marker::ChannelMarker;
pub type VoiceChannel = Id<ChannelMarker>;
#[derive(Debug, Snafu)]
pub enum TakeError {
/// voice channels are supposed to be followed by -
Malformed,
/// could not parse the voice channel ID
ParseIdError {
source: <VoiceChannel as FromStr>::Err,
},
}
pub fn take(path: &str) -> Result<(VoiceChannel, &str), TakeError> {
let (voice_channel, rest) = path.split_once('-').context(MalformedSnafu)?;
let voice_channel = voice_channel.parse().context(ParseIdSnafu)?;
Ok((voice_channel, rest))
}

View File

@@ -0,0 +1,51 @@
use futures::{TryStream, TryStreamExt as _};
use snafu::{OptionExt as _, ResultExt as _, Snafu};
use std::num::ParseIntError;
use super::{CreateListerSnafu, ListError, RecordingManager};
pub type Year = i32;
#[derive(Debug, Snafu)]
pub enum TakeError {
/// years are supposed to be directories, but this wasn't (because it didn't end with `/`)
NotADirectory,
/// could not parse the year as an integer
ParseIntegerError { source: ParseIntError },
}
pub fn take(path: &str) -> Result<(Year, &str), TakeError> {
let (year, rest) = path.split_once('/').context(NotADirectorySnafu)?;
let year = year.parse().context(ParseIntegerSnafu)?;
Ok((year, rest))
}
#[derive(Debug, Snafu)]
pub enum YearEntryError {
/// failed to get an entry from the storage operator's lister
ReceiveEntryError { source: opendal::Error },
/// failed to parse the entry as a year
ParseError { source: TakeError },
}
impl RecordingManager {
pub async fn years(
&self,
) -> Result<impl TryStream<Ok = Year, Error = YearEntryError> + Unpin, ListError> {
let lister = self.operator.lister("").await.context(CreateListerSnafu)?;
Ok(lister
.map_err(|error| YearEntryError::ReceiveEntryError { source: error })
.and_then(|entry| {
std::future::ready(
take(entry.name())
.map(|(year, _rest)| year)
.context(ParseSnafu),
)
}))
}
}

98
src/render_data.rs Normal file
View File

@@ -0,0 +1,98 @@
use std::{convert::Infallible, fmt::Display, io::Cursor};
use hound::{SampleFormat, WavSpec};
use opendal::Operator;
use time::{UtcDateTime, format_description::StaticFormatDescription, macros::format_description};
use twilight_model::id::{
Id,
marker::{ChannelMarker, GuildMarker},
};
#[derive(Debug, Clone)]
pub struct RenderManager {
operator: Operator,
}
impl RenderManager {
pub fn new(operator: Operator) -> Self {
Self { operator }
}
}
#[derive(Debug, Clone)]
pub struct Render {
pub start: UtcDateTime,
pub end: UtcDateTime,
pub guild_id: Id<GuildMarker>,
pub voice_channel_id: Id<ChannelMarker>,
}
const DATE_FORMAT: StaticFormatDescription =
format_description!("[year]-[month]-[day]T[hour]-[minute]-[second].[subsecond]Z");
impl Display for Render {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let Self {
start,
end,
guild_id,
voice_channel_id,
} = self;
let start = start.format(&DATE_FORMAT).unwrap();
let end = end.format(&DATE_FORMAT).unwrap();
write!(f, "{guild_id}/{voice_channel_id}/{start}_{end}.wav")
}
}
#[derive(Debug)]
pub struct RenderData {
pub channels: opus2::Channels,
pub sample_rate: u32,
pub samples: Vec<i16>,
}
impl RenderManager {
pub async fn write(
&self,
render: &Render,
RenderData {
channels,
sample_rate,
samples,
}: RenderData,
) -> Result<
(),
Infallible, // TODO: a real error type
> {
let channels = channels as u16;
let wav_spec = WavSpec {
channels,
sample_rate,
bits_per_sample: 16,
sample_format: SampleFormat::Int,
};
let mut buffer = Vec::new();
let writer = Cursor::new(&mut buffer);
let mut wav_writer = hound::WavWriter::new(writer, wav_spec).expect("TODO");
let mut sample_writer = wav_writer.get_i16_writer(samples.len() as u32);
for sample in samples {
sample_writer.write_sample(sample);
}
sample_writer.flush().expect("TODO");
wav_writer.finalize().expect("TODO");
let path = render.to_string();
self.operator.write(&path, buffer).await.expect("TODO");
Ok(())
}
}

View File

@@ -14,7 +14,7 @@ impl FromStr for Storage {
fn from_str(s: &str) -> Result<Self, Self::Err> {
let uri = s.into_operator_uri()?;
let operator = Operator::from_uri(&uri)?;
let uri = uri.into();
Ok(Self { uri, operator })

View File

@@ -18,78 +18,71 @@ pub type GuildVoiceChannelToTextChannel =
pub type VCsInGuild =
OneToManyUniqueBTreeMapWithData<Id<ChannelMarker>, Id<UserMarker>, UserInVCData>;
pub type VCs = BTreeMap<Id<GuildMarker>, VCsInGuild>;
pub type VCsWatcher = watch::Sender<VCs>;
pub type VCsSender = watch::Sender<VCs>;
#[tracing::instrument(skip(discord_client), ret)]
#[tracing::instrument(skip(vcs_sender, discord_client))]
async fn initialize_user_in_vc(
vcs_sender: &VCsSender,
discord_client: &twilight_http::Client,
guild_id: Id<GuildMarker>,
user_id: Id<UserMarker>,
) -> Option<(Id<ChannelMarker>, UserInVCData)> {
) {
if let Ok(voice_state_res) = discord_client.user_voice_state(guild_id, user_id).await
&& let Ok(voice_state) = voice_state_res.model().await
{
tracing::info!(?user_id, ?voice_state);
let voice_status = VoiceStatus::builder()
.self_deafened(voice_state.self_deaf)
.self_muted(voice_state.self_mute)
.server_deafened(voice_state.deaf)
.server_muted(voice_state.mute)
.camming(voice_state.self_video)
.streaming(voice_state.self_stream)
.build();
let user_in_vc_data = voice_status.into();
if let Some(voice_channel_id) = voice_state.channel_id {
let voice_status = VoiceStatus::builder()
.self_deafened(voice_state.self_deaf)
.self_muted(voice_state.self_mute)
.server_deafened(voice_state.deaf)
.server_muted(voice_state.mute)
.camming(voice_state.self_video)
.streaming(voice_state.self_stream)
.build();
let user_in_vc_data = voice_status.into();
voice_state
.channel_id
.map(|channel_id| (channel_id, user_in_vc_data))
} else {
None // TODO
vcs_sender.send_modify(|vcs| {
vcs.entry(guild_id)
.or_default()
.insert(voice_channel_id, user_id, user_in_vc_data);
});
}
}
}
#[tracing::instrument(skip(discord_client), ret)]
#[tracing::instrument(skip(vcs_sender, discord_client))]
async fn initialize_server_vcs(
vcs_sender: &VCsSender,
discord_client: &twilight_http::Client,
id: Id<GuildMarker>,
) -> VCsInGuild {
) {
if let Ok(guild_members_res) = discord_client.guild_members(id).limit(999).await
&& let Ok(guild_members) = guild_members_res.model().await
{
FuturesUnordered::from_iter(guild_members.into_iter().map(|member| async move {
(
member.user.id,
initialize_user_in_vc(discord_client, id, member.user.id).await,
)
}))
.filter_map(
|(user_id, channel_id_and_user_in_vc_data_option)| async move {
channel_id_and_user_in_vc_data_option
.map(|(channel_id, user_in_vc_data)| (channel_id, user_id, user_in_vc_data))
},
FuturesUnordered::from_iter(
guild_members.into_iter().map(|member| {
initialize_user_in_vc(vcs_sender, discord_client, id, member.user.id)
}),
)
.collect()
.await
} else {
Default::default()
}
}
#[tracing::instrument(skip(discord_client), ret)]
pub async fn initialize_vcs(discord_client: &twilight_http::Client) -> VCs {
#[tracing::instrument(skip(vcs_sender, discord_client))]
pub async fn initialize_vcs(vcs_sender: &VCsSender, discord_client: &twilight_http::Client) {
if let Ok(guilds_res) = discord_client.current_user_guilds().limit(200).await
&& let Ok(guilds) = guilds_res.model().await
{
FuturesUnordered::from_iter(guilds.into_iter().map(|guild| async move {
let guild_vcs = initialize_server_vcs(discord_client, guild.id).await;
(guild.id, guild_vcs)
}))
FuturesUnordered::from_iter(
guilds
.into_iter()
.map(|guild| initialize_server_vcs(vcs_sender, discord_client, guild.id)),
)
.collect()
.await
} else {
Default::default()
}
}

View File

@@ -14,12 +14,12 @@ use crate::{OperatorExt, option_ext::OptionExt as _, user_capnp};
pub const RECORD_IF_CONSENT_UNSPECIFIED: bool = true;
#[derive(Debug, Clone)]
pub struct UserDataManager {
pub struct UserManager {
operator: Operator,
cache: Cache<Id<UserMarker>, Bytes>,
}
impl UserDataManager {
impl UserManager {
pub fn new(operator: Operator) -> Self {
Self {
operator,
@@ -83,7 +83,7 @@ pub enum EntryError {
ParsePathError { source: ParsePathError },
}
impl UserDataManager {
impl UserManager {
pub async fn list(
&self,
) -> Result<impl TryStream<Ok = Id<UserMarker>, Error = EntryError> + Unpin, ListError> {
@@ -111,7 +111,7 @@ pub enum ReadError {
DecompressionError { source: std::io::Error },
}
impl UserDataManager {
impl UserManager {
async fn read(&self, id: Id<UserMarker>) -> Result<Bytes, ReadError> {
self.cache
.try_get_with::<_, ReadError>(id, async {
@@ -168,7 +168,7 @@ pub enum WriteError {
FinalizeError { source: std::io::Error },
}
impl UserDataManager {
impl UserManager {
async fn write(&self, id: Id<UserMarker>, bytes: Bytes) -> Result<(), WriteError> {
let path = path(id);
@@ -207,7 +207,7 @@ pub enum WithError {
DeserializeError { source: capnp::Error },
}
impl UserDataManager {
impl UserManager {
pub async fn with<R>(
&self,
id: Id<UserMarker>,
@@ -245,7 +245,7 @@ pub enum UpdateError {
WriteError { source: WriteError },
}
impl UserDataManager {
impl UserManager {
pub async fn update<R>(
&self,
id: Id<UserMarker>,

View File

@@ -1,20 +1,20 @@
use typed_builder::TypedBuilder;
#[derive(Debug)]
#[derive(Debug, Clone, Copy)]
pub enum Microphone {
Unmuted,
ServerMuted,
Muted,
}
#[derive(Debug)]
#[derive(Debug, Clone, Copy)]
pub enum Headphone {
Undeafened,
ServerDeafened,
Deafened,
}
#[derive(Debug)]
#[derive(Debug, Clone, Copy)]
pub enum Camera {
Showing,
Off,
@@ -30,7 +30,7 @@ impl From<bool> for Camera {
}
}
#[derive(Debug)]
#[derive(Debug, Clone, Copy)]
pub enum Stream {
Sharing,
None,
@@ -46,7 +46,7 @@ impl From<bool> for Stream {
}
}
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct UserInVCData {
pub microphone: Microphone,
pub headphone: Headphone,
@@ -54,7 +54,7 @@ pub struct UserInVCData {
pub stream: Stream,
}
#[derive(Debug, TypedBuilder)]
#[derive(Debug, Clone, TypedBuilder)]
pub struct VoiceStatus {
server_deafened: bool,
self_deafened: bool,