Compare commits

...

19 Commits

Author SHA1 Message Date
e6cd882b32 fix: crash and graceful shutdown improvements 2026-06-13 00:10:54 -04:00
ce77590777 fix: watchdog panicking 2026-06-12 23:53:17 -04:00
50fc35883d chore: remove "this process is still alive" that I forgot to delete 2026-06-12 23:45:30 -04:00
fa0093d582 feat: announce disconnection upon deadlock detection 2026-06-12 20:29:50 -04:00
c7300a9e6d feat: watchdog for detecting tokio runtime deadlock (I wish I could prevent it, but I don't know what's causing it) 2026-06-12 20:01:46 -04:00
a0a0632a1d chore: raise default audio sample rate to 24 khz 2026-06-12 00:02:56 -04:00
a22965a3be feat: make reconnecting after discord disconnect faster (not obligate an entire process restart) 2026-06-12 00:01:04 -04:00
c8d676693d fix: reduce rebuilding from extraneous files 2026-06-12 00:00:07 -04:00
ba0450e999 fix: coerce sample_rate to u64 correctly 2026-06-01 23:27:46 -04:00
a7f11a7202 fix: cast sample_rate to u64 in calculations 2026-06-01 23:23:43 -04:00
4fa25305b5 fix: use u64 to prevent overflowing (I can't believe I was stupid enough to think u32 could fit it) and add logging in case I was mistaken 2026-06-01 23:18:55 -04:00
65611d676d chore: reduce logging of unnecessary things in initialize_vcs 2026-05-31 16:11:38 -04:00
c8fd99cdf1 fix: in /info move the user mention to the description field (where mentions are allowed) 2026-05-31 15:30:04 -04:00
c03ccc9d39 chore: add more logging in /render to debug 2026-05-31 15:28:16 -04:00
5e989289bd feat: don't block bot startup waiting to initialize vcs 2026-05-31 14:24:09 -04:00
f5b6dc5c76 chore: remove /join and /leave to prevent confusion 2026-05-31 02:46:47 -04:00
03689f4764 fix: don't log interaction in /render 2026-05-30 01:08:59 -04:00
20bb0e4c31 fix: switch to async read-write lock instead of sync mutex in call handler to try to prevent deadlock 2026-05-29 22:22:55 -04:00
c6aa8e5d13 fix: scope borrow in heat seeking to try to prevent deadlock 2026-05-29 22:22:15 -04:00
12 changed files with 356 additions and 539 deletions

8
Cargo.lock generated
View File

@@ -1777,11 +1777,13 @@ dependencies = [
"extension-traits", "extension-traits",
"futures", "futures",
"hound", "hound",
"humantime",
"itertools", "itertools",
"moka", "moka",
"opendal", "opendal",
"opus2", "opus2",
"patricia_tree 0.10.1", "patricia_tree 0.10.1",
"rayon",
"rhai", "rhai",
"rustls 0.23.40", "rustls 0.23.40",
"secrecy 0.10.3", "secrecy 0.10.3",
@@ -2517,6 +2519,12 @@ version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9"
[[package]]
name = "humantime"
version = "2.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "135b12329e5e3ce057a9f972339ea52bc954fe1e9358ef27f95e89716fbc5424"
[[package]] [[package]]
name = "hyper" name = "hyper"
version = "1.9.0" version = "1.9.0"

View File

@@ -14,6 +14,7 @@ dashmap = "6.1.0"
extension-traits = "2.0.2" extension-traits = "2.0.2"
futures = "0.3.32" futures = "0.3.32"
hound = "3.5.1" hound = "3.5.1"
humantime = "2.3.0"
itertools = "0.14.0" itertools = "0.14.0"
moka = { version = "0.12.15", features = ["future"] } moka = { version = "0.12.15", features = ["future"] }
opendal = { git = "https://github.com/apache/opendal", rev = "ecf840b04afd2be109830b9978ba89759adfee79", features = [ opendal = { git = "https://github.com/apache/opendal", rev = "ecf840b04afd2be109830b9978ba89759adfee79", features = [
@@ -55,6 +56,7 @@ opendal = { git = "https://github.com/apache/opendal", rev = "ecf840b04afd2be109
] } ] }
opus2 = "0.4.0" opus2 = "0.4.0"
patricia_tree = "0.10.1" patricia_tree = "0.10.1"
rayon = "1.12"
rhai = { version = "1.23.6", features = ["sync"] } rhai = { version = "1.23.6", features = ["sync"] }
rustls = "0.23" rustls = "0.23"
secrecy = { version = "0.10.3", features = ["serde"] } secrecy = { version = "0.10.3", features = ["serde"] }

View File

@@ -1,6 +1,9 @@
use shadow_rs::{BuildPattern, ShadowBuilder}; use shadow_rs::{BuildPattern, ShadowBuilder};
fn main() { fn main() {
println!("cargo::rerun-if-changed=bot.capnp");
println!("cargo::rerun-if-changed=user.capnp");
capnpc::CompilerCommand::new() capnpc::CompilerCommand::new()
.file("bot.capnp") .file("bot.capnp")
.file("user.capnp") .file("user.capnp")

View File

@@ -11,11 +11,9 @@ use songbird::{
CoreEvent, Event, EventContext, EventHandler, Songbird, CoreEvent, Event, EventContext, EventHandler, Songbird,
driver::{Channels, SampleRate}, driver::{Channels, SampleRate},
}; };
use std::{ use std::{sync::Arc, time::Instant};
sync::{Arc, Mutex},
time::Instant,
};
use time::UtcDateTime; use time::UtcDateTime;
use tokio::sync::RwLock;
use twilight_model::id::{ use twilight_model::id::{
Id, Id,
marker::{ChannelMarker, GuildMarker, UserMarker}, marker::{ChannelMarker, GuildMarker, UserMarker},
@@ -31,7 +29,7 @@ struct Handler {
guild_id: Id<GuildMarker>, guild_id: Id<GuildMarker>,
channel_id: Id<ChannelMarker>, channel_id: Id<ChannelMarker>,
known_ssrcs: Arc<Mutex<OneToManyUniqueBTreeMap<Id<UserMarker>, u32>>>, known_ssrcs: Arc<RwLock<OneToManyUniqueBTreeMap<Id<UserMarker>, u32>>>,
audio_channels: u16, audio_channels: u16,
audio_sample_rate: u32, audio_sample_rate: u32,
@@ -53,15 +51,15 @@ impl EventHandler for Handler {
let user_id = Id::new(user_id.0); let user_id = Id::new(user_id.0);
self.known_ssrcs self.known_ssrcs
.lock() .write()
.unwrap() .await
.insert(user_id, speaking.ssrc); .insert(user_id, speaking.ssrc);
} }
} }
EventContext::VoiceTick(voice_tick) => { EventContext::VoiceTick(voice_tick) => {
for (ssrc, voice_data) in &voice_tick.speaking { for (ssrc, voice_data) in &voice_tick.speaking {
let user_id = { let user_id = {
let known_ssrcs = self.known_ssrcs.lock().unwrap(); let known_ssrcs = self.known_ssrcs.read().await;
known_ssrcs.get_left_for(ssrc).cloned() known_ssrcs.get_left_for(ssrc).cloned()
}; };
@@ -126,7 +124,6 @@ impl EventHandler for Handler {
let channels = self.audio_channels; let channels = self.audio_channels;
let sample_rate = self.audio_sample_rate; let sample_rate = self.audio_sample_rate;
let recording_manager = self.recording_manager.clone(); let recording_manager = self.recording_manager.clone();
let samples = pcm.clone(); let samples = pcm.clone();

View File

@@ -145,7 +145,7 @@ pub async fn handle(state: State, interaction: Interaction) {
.interaction(state.discord_application_id) .interaction(state.discord_application_id)
.create_followup(&interaction.token) .create_followup(&interaction.token)
.embeds(&[EmbedBuilder::new() .embeds(&[EmbedBuilder::new()
.author(EmbedAuthorBuilder::new(user_mention)) .description(user_mention)
.field(EmbedFieldBuilder::new("Consent", format!("{consent:?}")).build()) .field(EmbedFieldBuilder::new("Consent", format!("{consent:?}")).build())
.field( .field(
EmbedFieldBuilder::new( EmbedFieldBuilder::new(

View File

@@ -1,170 +0,0 @@
use crate::{VCs, call::join_and_record, command::State};
use snafu::{OptionExt as _, Snafu};
use std::sync::LazyLock;
use twilight_model::{
application::{
command::{Command, CommandType},
interaction::Interaction,
},
channel::message::{Embed, MessageFlags},
http::interaction::{InteractionResponse, InteractionResponseType},
id::{
Id,
marker::{ChannelMarker, GuildMarker},
},
};
use twilight_util::builder::{
InteractionResponseDataBuilder, command::CommandBuilder, embed::EmbedBuilder,
};
const NAME: &str = "join";
const DESCRIPTION: &str = "The bot will join the same VC as you (with intention to record)";
pub static COMMAND: LazyLock<Command> = LazyLock::new(|| {
CommandBuilder::new(NAME, DESCRIPTION, CommandType::ChatInput)
.validate()
.expect("command wasn't correct")
.build()
});
#[derive(Debug, Snafu)]
enum GetGuildAndVoiceChannelIdError {
/// this command was not used inside a guild (Discord server)
NotInGuild,
/// there is no user who invoked this command
NoUser,
/// the user is not in a voice chat in this guild
UserNotInVC,
}
#[tracing::instrument]
fn get_guild_and_voice_channel_id(
interaction: &Interaction,
vcs: &VCs,
) -> Result<(Id<GuildMarker>, Id<ChannelMarker>), GetGuildAndVoiceChannelIdError> {
let guild_id = interaction.guild_id.context(NotInGuildSnafu)?;
let user_id = interaction
.member
.as_ref()
.and_then(|member| member.user.as_ref().map(|user| user.id))
.context(NoUserSnafu)?;
let &voice_channel_id = vcs
.get(&guild_id)
.context(UserNotInVCSnafu)?
.get_left_for(&user_id)
.context(UserNotInVCSnafu)?;
Ok((guild_id, voice_channel_id))
}
fn get_guild_and_vc_error_to_embed(error: GetGuildAndVoiceChannelIdError) -> Embed {
match error {
GetGuildAndVoiceChannelIdError::NotInGuild => {
EmbedBuilder::new().title("Use this in a server").description("This bot can't find a VC to join if the command is used outside of a server (you might've used it in a DM?).").validate().unwrap().build()
},
GetGuildAndVoiceChannelIdError::NoUser => {
EmbedBuilder::new().title("Not invoked by a user").description("This command works by joining the same VC as the user, but this bot didn't receive any user data. So did no user invoke it?! (This error should be impossible!)").validate().unwrap().build()
},
GetGuildAndVoiceChannelIdError::UserNotInVC => {
EmbedBuilder::new().title("You're not in a VC").description("This bot can't follow you into VC if you aren't in one in this server.").validate().unwrap().build()
},
}
}
#[tracing::instrument(skip(state))]
pub async fn handle(state: State, interaction: Interaction) {
let guild_and_voice_channel_id_res =
{ get_guild_and_voice_channel_id(&interaction, &state.vcs_sender.borrow()) };
let (guild_id, voice_channel_id) = match guild_and_voice_channel_id_res {
Ok((guild_id, voice_channel_id)) => (guild_id, voice_channel_id),
Err(error) => {
state
.discord_client
.interaction(state.discord_application_id)
.create_response(
interaction.id,
&interaction.token,
&InteractionResponse {
kind: InteractionResponseType::ChannelMessageWithSource,
data: Some(
InteractionResponseDataBuilder::new()
.embeds([get_guild_and_vc_error_to_embed(error)])
.flags(MessageFlags::EPHEMERAL)
.build(),
),
},
)
.await
.expect("TODO");
return;
}
};
state
.discord_client
.interaction(state.discord_application_id)
.create_response(
interaction.id,
&interaction.token,
&InteractionResponse {
kind: InteractionResponseType::DeferredChannelMessageWithSource,
data: None,
},
)
.await
.expect("TODO");
match join_and_record()
.audio_channels(state.audio_channels.into())
.audio_sample_rate(state.audio_sample_rate.into())
.guild_id(guild_id)
.recording_manager(state.recording_manager)
.songbird(&state.songbird)
.user_manager(state.user_manager)
.voice_channel_id(voice_channel_id)
.call()
.await
{
Ok(()) => {
let channel_mention = format!("<#{voice_channel_id}>");
let info_mention = format!(
"</{}:{}>",
state.discord_info_command_name, state.discord_info_command_id
);
let opt_in_mention = format!(
"</{}:{}>",
state.discord_opt_in_command_name, state.discord_opt_in_command_id
);
let opt_out_mention = format!(
"</{}:{}>",
state.discord_opt_out_command_name, state.discord_opt_out_command_id
);
state
.discord_client
.interaction(state.discord_application_id)
.update_response(
&interaction.token,
).embeds(Some(&[
EmbedBuilder::new()
.title("Joined VC to record")
.description(format!("This bot joined {channel_mention} and intends to record. You can opt out with {opt_out_mention} or explicitly opt in with {opt_in_mention} (I'd appreciate this one). Please use {info_mention} for more information about this bot."))
.validate()
.unwrap()
.build()
]))
.await
.expect("TODO");
}
Err(join_error) => {
tracing::error!(?join_error);
let _ = state.songbird.remove(guild_id).await;
}
}
}

View File

@@ -1,162 +0,0 @@
use crate::VCs;
use crate::command::State;
use snafu::{OptionExt, Snafu};
use std::sync::LazyLock;
use twilight_model::channel::message::{Embed, MessageFlags};
use twilight_model::http::interaction::{InteractionResponse, InteractionResponseType};
use twilight_model::id::marker::UserMarker;
use twilight_model::{
application::{
command::{Command, CommandType},
interaction::Interaction,
},
id::{
Id,
marker::{ChannelMarker, GuildMarker},
},
};
use twilight_util::builder::InteractionResponseDataBuilder;
use twilight_util::builder::command::CommandBuilder;
use twilight_util::builder::embed::EmbedBuilder;
const NAME: &str = "leave";
const DESCRIPTION: &str = "The bot will leave the VC it's in (so it won't record anyone anymore)";
pub static COMMAND: LazyLock<Command> = LazyLock::new(|| {
CommandBuilder::new(NAME, DESCRIPTION, CommandType::ChatInput)
.validate()
.expect("command wasn't correct")
.build()
});
#[derive(Debug, Snafu)]
pub enum GetGuildAndVoiceChannelIdError {
/// this command was not used inside a guild (Discord server)
NotInGuild,
/// there is no user who invoked this command
NoUser,
/// the bot is not in a voice chat in this guild
BotNotInVC,
/// the user is not in a voice chat with the bot in this guild
UserNotInVCWithBot,
}
#[tracing::instrument]
pub fn get_user_and_guild_and_voice_channel_id(
bot_user_id: Id<UserMarker>,
interaction: &Interaction,
vcs: &VCs,
) -> Result<(Id<GuildMarker>, Id<ChannelMarker>), GetGuildAndVoiceChannelIdError> {
let guild_id = interaction.guild_id.context(NotInGuildSnafu)?;
let user_id = interaction
.member
.as_ref()
.and_then(|member| member.user.as_ref().map(|user| user.id))
.context(NoUserSnafu)?;
let &bot_voice_channel_id = vcs
.get(&guild_id)
.context(BotNotInVCSnafu)?
.get_left_for(&bot_user_id)
.context(BotNotInVCSnafu)?;
let &user_voice_channel_id = vcs
.get(&guild_id)
.context(UserNotInVCWithBotSnafu)?
.get_left_for(&user_id)
.context(UserNotInVCWithBotSnafu)?;
if user_voice_channel_id != bot_voice_channel_id {
return Err(GetGuildAndVoiceChannelIdError::UserNotInVCWithBot);
}
Ok((guild_id, bot_voice_channel_id))
}
fn get_guild_and_vc_error_to_embed(error: GetGuildAndVoiceChannelIdError) -> Embed {
match error {
GetGuildAndVoiceChannelIdError::NotInGuild => {
EmbedBuilder::new().title("Use this in a server").description("This bot can't tell which VC to leave if the command is used outside of a server (you might've used it in a DM?).").validate().unwrap().build()
},
GetGuildAndVoiceChannelIdError::NoUser => {
EmbedBuilder::new().title("Not invoked by a user").description("This command works by joining the same VC as the user, but this bot didn't receive any user data. So did no user invoke it?! (This error should be impossible!)").validate().unwrap().build()
},
GetGuildAndVoiceChannelIdError::BotNotInVC => {
EmbedBuilder::new().title("Not in a VC").description("This bot can't leave VC if it isn't in one in this server.").validate().unwrap().build()
},
GetGuildAndVoiceChannelIdError::UserNotInVCWithBot => {
EmbedBuilder::new().title("Not in a VC with the Bot").description("You have to be in the VC with the bot to make it leave (to prevent griefing and abuse).").validate().unwrap().build()
},
}
}
#[tracing::instrument]
pub async fn handle(state: State, interaction: Interaction) {
let guild_and_voice_channel_id_result = {
get_user_and_guild_and_voice_channel_id(
state.discord_user_id,
&interaction,
&state.vcs_sender.borrow(),
)
};
let (guild_id, voice_channel_id) = match guild_and_voice_channel_id_result {
Ok((guild_id, voice_channel_id)) => (guild_id, voice_channel_id),
Err(error) => {
state
.discord_client
.interaction(state.discord_application_id)
.create_response(
interaction.id,
&interaction.token,
&InteractionResponse {
kind: InteractionResponseType::ChannelMessageWithSource,
data: Some(
InteractionResponseDataBuilder::new()
.embeds([get_guild_and_vc_error_to_embed(error)])
.flags(MessageFlags::EPHEMERAL)
.build(),
),
},
)
.await
.expect("TODO");
return;
}
};
state.songbird.leave(guild_id).await.expect("TODO");
tracing::error!("TODO: successfully left the call");
let channel_mention = format!("<#{voice_channel_id}>");
state
.discord_client
.interaction(state.discord_application_id)
.create_response(interaction.id, &interaction.token,
&InteractionResponse {
kind: InteractionResponseType::ChannelMessageWithSource,
data: Some(
InteractionResponseDataBuilder::new()
.embeds([
EmbedBuilder::new()
.title("Left VC")
.description(format!(
"This bot left {channel_mention} (and is thereby unable to record anymore)."
))
.validate()
.unwrap()
.build()
])
.flags(MessageFlags::EPHEMERAL)
.build(),
),
},)
.await
.expect("TODO");
}

View File

@@ -21,8 +21,6 @@ use crate::{
}; };
pub mod info; pub mod info;
pub mod join;
pub mod leave;
pub mod opt_in; pub mod opt_in;
pub mod opt_out; pub mod opt_out;
pub mod render; pub mod render;
@@ -65,8 +63,6 @@ where
pub fn all() -> Vec<(&'static Command, BoxedHandler)> { pub fn all() -> Vec<(&'static Command, BoxedHandler)> {
vec![ vec![
(&info::COMMAND, box_handler(info::handle)), (&info::COMMAND, box_handler(info::handle)),
(&join::COMMAND, box_handler(join::handle)),
(&leave::COMMAND, box_handler(leave::handle)),
(&opt_in::COMMAND, box_handler(opt_in::handle)), (&opt_in::COMMAND, box_handler(opt_in::handle)),
(&opt_out::COMMAND, box_handler(opt_out::handle)), (&opt_out::COMMAND, box_handler(opt_out::handle)),
(&render::COMMAND, box_handler(render::handle)), (&render::COMMAND, box_handler(render::handle)),

View File

@@ -189,7 +189,7 @@ fn parse_options(interaction: &Interaction) -> Result<Options, ParseOptionsError
}) })
} }
#[tracing::instrument(skip(state))] #[tracing::instrument(skip(state, interaction))]
pub async fn handle(state: State, interaction: Interaction) { pub async fn handle(state: State, interaction: Interaction) {
let Some(guild_id) = interaction.guild_id else { let Some(guild_id) = interaction.guild_id else {
state state
@@ -294,8 +294,8 @@ pub async fn handle(state: State, interaction: Interaction) {
let channels = state.audio_channels.into(); let channels = state.audio_channels.into();
let sample_rate = state.audio_sample_rate.into(); let sample_rate = state.audio_sample_rate.into();
let total_samples = (duration.whole_seconds() as u32 * sample_rate) let total_samples = (duration.whole_seconds() as u64 * sample_rate as u64)
+ (duration.subsec_microseconds() as u32 * sample_rate / 1_000_000); + (duration.subsec_microseconds() as u64 * sample_rate as u64 / 1_000_000);
let mut composite = vec![0; total_samples as usize]; let mut composite = vec![0; total_samples as usize];
@@ -340,11 +340,27 @@ pub async fn handle(state: State, interaction: Interaction) {
let after_start = datetime - start; let after_start = datetime - start;
let origin = (after_start.whole_seconds() as u32 * sample_rate) let progress_by_time = after_start / duration;
+ (after_start.subsec_microseconds() as u32 * sample_rate / 1_000_000);
let origin = (after_start.whole_seconds() as u64 * sample_rate as u64)
+ (after_start.subsec_microseconds() as u64 * sample_rate as u64 / 1_000_000);
let origin = origin as usize; let origin = origin as usize;
tracing::debug!(origin, total_samples); let progress_by_sample = (origin as f64) / (total_samples as f64);
let samples_length = samples.len();
let extent = origin + samples_length;
tracing::debug!(
progress_by_time,
progress_by_sample,
?after_start,
?duration,
origin,
total_samples,
samples_length,
extent
);
for (i, sample) in samples.into_iter().enumerate() { for (i, sample) in samples.into_iter().enumerate() {
if let Some(composite_sample) = composite.get_mut(origin + i) { if let Some(composite_sample) = composite.get_mut(origin + i) {
@@ -361,6 +377,8 @@ pub async fn handle(state: State, interaction: Interaction) {
?second, ?second,
?microsecond, ?microsecond,
origin, origin,
samples_length,
extent,
i, i,
total_samples, total_samples,
"out of range" "out of range"

View File

@@ -30,48 +30,53 @@ pub async fn heat_seek(state: State) {
let mut vcs_in_guild_senders = BTreeMap::default(); let mut vcs_in_guild_senders = BTreeMap::default();
loop { loop {
for (&guild_id, vcs_in_guild) in &*vcs_watcher.borrow() { {
let vcs_in_guild_sender = vcs_in_guild_senders.entry(guild_id).or_insert_with(|| { for (&guild_id, vcs_in_guild) in &*vcs_watcher.borrow() {
let (vcs_in_guild_sender, vcs_in_guild_watcher) = let vcs_in_guild_sender =
watch::channel(Default::default()); vcs_in_guild_senders.entry(guild_id).or_insert_with(|| {
let (channel_heat_sender, channel_heat_watcher) = let (vcs_in_guild_sender, vcs_in_guild_watcher) =
watch::channel(Default::default()); watch::channel(Default::default());
let (heat_map_sender, heat_map_watcher) = watch::channel(Default::default()); let (channel_heat_sender, channel_heat_watcher) =
let (hottest_vc_sender, hottest_vc_watcher) = watch::channel(Default::default()); watch::channel(Default::default());
let (heat_map_sender, heat_map_watcher) =
watch::channel(Default::default());
let (hottest_vc_sender, hottest_vc_watcher) =
watch::channel(Default::default());
tokio::spawn( tokio::spawn(
evaluate_heat() evaluate_heat()
.bot_manager(state.bot_manager.clone()) .bot_manager(state.bot_manager.clone())
.bot_owner_user_id(state.discord_bot_owner_user_id) .bot_owner_user_id(state.discord_bot_owner_user_id)
.bot_user_id(state.discord_user_id) .bot_user_id(state.discord_user_id)
.cancellation_token(state.cancellation_token.clone()) .cancellation_token(state.cancellation_token.clone())
.channel_heat_sender(channel_heat_sender) .channel_heat_sender(channel_heat_sender)
.vcs_in_guild_watcher(vcs_in_guild_watcher) .vcs_in_guild_watcher(vcs_in_guild_watcher)
.call(), .call(),
); );
tokio::spawn( tokio::spawn(
map_heat() map_heat()
.cancellation_token(state.cancellation_token.clone()) .cancellation_token(state.cancellation_token.clone())
.channel_heat_watcher(channel_heat_watcher) .channel_heat_watcher(channel_heat_watcher)
.heat_map_sender(heat_map_sender) .heat_map_sender(heat_map_sender)
.call(), .call(),
); );
tokio::spawn( tokio::spawn(
track_hottest_vc() track_hottest_vc()
.cancellation_token(state.cancellation_token.clone()) .cancellation_token(state.cancellation_token.clone())
.heat_map_watcher(heat_map_watcher) .heat_map_watcher(heat_map_watcher)
.hottest_vc_sender(hottest_vc_sender) .hottest_vc_sender(hottest_vc_sender)
.call(), .call(),
); );
tokio::spawn(follow_hottest_vc( tokio::spawn(follow_hottest_vc(
state.clone(), state.clone(),
guild_id, guild_id,
hottest_vc_watcher, hottest_vc_watcher,
)); ));
vcs_in_guild_sender vcs_in_guild_sender
}); });
vcs_in_guild_sender.send_replace(Arc::new(vcs_in_guild.clone())); vcs_in_guild_sender.send_replace(Arc::new(vcs_in_guild.clone()));
}
} }
if matches!( if matches!(

View File

@@ -4,10 +4,19 @@ use fomo_reducer::{
RecordingManager, RenderManager, State, Storage, UserManager, VCsSender, all_commands, command, RecordingManager, RenderManager, State, Storage, UserManager, VCsSender, all_commands, command,
heat_seek, initialize_vcs, update_vcs, heat_seek, initialize_vcs, update_vcs,
}; };
use futures::{StreamExt as _, stream::FuturesUnordered};
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
use secrecy::{ExposeSecret, SecretString}; use secrecy::{ExposeSecret, SecretString};
use snafu::{OptionExt, ResultExt, Snafu}; use snafu::{OptionExt, ResultExt, Snafu};
use songbird::{Config, Songbird, driver::DecodeConfig, shards::TwilightMap}; use songbird::{Config, Songbird, driver::DecodeConfig, shards::TwilightMap};
use std::{collections::BTreeMap, fmt::Debug, str::FromStr, sync::Arc, time::Duration}; use std::{
collections::BTreeMap,
fmt::{Debug, Display},
num::NonZero,
str::FromStr,
sync::Arc,
time::Duration,
};
use tokio::{select, signal::ctrl_c, task::JoinSet}; use tokio::{select, signal::ctrl_c, task::JoinSet};
use tokio_util::{sync::CancellationToken, time::FutureExt as _}; use tokio_util::{sync::CancellationToken, time::FutureExt as _};
use tracing::Level; use tracing::Level;
@@ -15,7 +24,7 @@ use tracing_subscriber::{
EnvFilter, EnvFilter,
fmt::{format::FmtSpan, writer::MakeWriterExt}, fmt::{format::FmtSpan, writer::MakeWriterExt},
}; };
use twilight_gateway::{Event, EventTypeFlags, Intents, Shard, StreamExt}; use twilight_gateway::{Event, EventTypeFlags, Intents, Shard, StreamExt as _};
use twilight_model::{ use twilight_model::{
application::interaction::InteractionData, application::interaction::InteractionData,
gateway::{ gateway::{
@@ -68,6 +77,29 @@ fn parse_guild_vc_to_text_channel(
Ok((guild, voice_channel, text_channel)) Ok((guild, voice_channel, text_channel))
} }
#[derive(Clone)]
struct HumanDuration(Duration);
impl FromStr for HumanDuration {
type Err = humantime::DurationError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
humantime::parse_duration(s).map(Self)
}
}
impl Debug for HumanDuration {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.0.fmt(f)
}
}
impl Display for HumanDuration {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", humantime::format_duration(self.0))
}
}
#[derive(Debug, Parser)] #[derive(Debug, Parser)]
struct AppArgs { struct AppArgs {
#[arg(long, env)] #[arg(long, env)]
@@ -80,7 +112,7 @@ struct AppArgs {
discord_nickname: Option<Arc<str>>, discord_nickname: Option<Arc<str>>,
#[arg(long, env)] #[arg(long, env)]
discord_status: Option<Arc<str>>, discord_status: Option<String>,
#[arg(long, env, value_parser = parse_guild_vc_to_text_channel)] #[arg(long, env, value_parser = parse_guild_vc_to_text_channel)]
discord_voice_channel_corresponding_text_channel: discord_voice_channel_corresponding_text_channel:
@@ -89,7 +121,7 @@ struct AppArgs {
#[arg(long, env, default_value_t = AudioChannels::Mono)] #[arg(long, env, default_value_t = AudioChannels::Mono)]
audio_channels: AudioChannels, audio_channels: AudioChannels,
#[arg(long, env, default_value_t = AudioSampleRate::Hz12000)] #[arg(long, env, default_value_t = AudioSampleRate::Hz24000)]
audio_sample_rate: AudioSampleRate, audio_sample_rate: AudioSampleRate,
#[arg(long, env)] #[arg(long, env)]
@@ -103,6 +135,15 @@ struct AppArgs {
#[arg(long, env)] #[arg(long, env)]
render_data: Storage, render_data: Storage,
#[arg(long, env, default_value_t = HumanDuration(Duration::from_secs(120)))]
watchdog_warmup: HumanDuration,
#[arg(long, env, default_value_t = HumanDuration(Duration::from_secs(5)))]
watchdog_frequency: HumanDuration,
#[arg(long, env, default_value_t = 8.try_into().unwrap())]
watchdog_channel_size: NonZero<usize>,
} }
#[derive(Parser)] #[derive(Parser)]
@@ -166,6 +207,9 @@ async fn main() -> Result<(), MainError> {
user_data, user_data,
recording_data, recording_data,
render_data, render_data,
watchdog_warmup: HumanDuration(watchdog_warmup),
watchdog_frequency: HumanDuration(watchdog_frequency),
watchdog_channel_size,
} = app_args; } = app_args;
let cancellation_token = CancellationToken::new(); let cancellation_token = CancellationToken::new();
@@ -216,32 +260,6 @@ async fn main() -> Result<(), MainError> {
let discord_application_id = current_application.id; let discord_application_id = current_application.id;
let intents = Intents::GUILD_VOICE_STATES;
let config = twilight_gateway::Config::new(discord_token.expose_secret().to_owned(), intents);
let shards = twilight_gateway::create_recommended(&discord_client, config, |_id, builder| {
builder.build()
})
.await
.expect("TODO");
let shards = Vec::from_iter(shards);
let senders = TwilightMap::new(
shards
.iter()
.map(|shard| (shard.id().number(), shard.sender()))
.collect(),
);
let senders = Arc::new(senders);
let songbird = Songbird::twilight(senders, discord_user_id);
songbird.set_config(
Config::default().decode_mode(songbird::driver::DecodeMode::Decode(DecodeConfig::new(
audio_channels.into(),
audio_sample_rate.into(),
))),
);
let interaction_client = discord_client.interaction(discord_application_id); let interaction_client = discord_client.interaction(discord_application_id);
let commands = all_commands(); let commands = all_commands();
@@ -281,18 +299,15 @@ async fn main() -> Result<(), MainError> {
let discord_opt_in_command_id = discord_opt_in_command.id.expect("TODO"); let discord_opt_in_command_id = discord_opt_in_command.id.expect("TODO");
let discord_opt_out_command_id = discord_opt_out_command.id.expect("TODO"); let discord_opt_out_command_id = discord_opt_out_command.id.expect("TODO");
let discord_info_command_name = discord_info_command.name.into(); let discord_info_command_name: Arc<str> = discord_info_command.name.into();
let discord_opt_in_command_name = discord_opt_in_command.name.into(); let discord_opt_in_command_name: Arc<str> = discord_opt_in_command.name.into();
let discord_opt_out_command_name = discord_opt_out_command.name.into(); let discord_opt_out_command_name: Arc<str> = discord_opt_out_command.name.into();
let vcs = initialize_vcs(&discord_client).await;
let command_router = CommandRouter::from_iter(commands); let command_router = CommandRouter::from_iter(commands);
let command_router = Arc::new(command_router); let command_router = Arc::new(command_router);
let discord_client = Arc::new(discord_client); let discord_client = Arc::new(discord_client);
let songbird = Arc::new(songbird); let vcs_sender = VCsSender::new(Default::default());
let vcs_sender = VCsSender::new(vcs);
let bot_data = bot_data.into_inner(); let bot_data = bot_data.into_inner();
let recording_data = recording_data.into_inner(); let recording_data = recording_data.into_inner();
@@ -320,58 +335,6 @@ async fn main() -> Result<(), MainError> {
let discord_voice_channel_corresponding_text_channel = let discord_voice_channel_corresponding_text_channel =
Arc::new(discord_voice_channel_corresponding_text_channel); Arc::new(discord_voice_channel_corresponding_text_channel);
let state = State {
audio_channels,
audio_sample_rate,
bot_manager,
cancellation_token: cancellation_token.clone(),
discord_application_id,
discord_bot_owner_user_id,
discord_client,
discord_info_command_id,
discord_info_command_name,
discord_opt_in_command_id,
discord_opt_in_command_name,
discord_opt_out_command_id,
discord_opt_out_command_name,
discord_user_id,
discord_voice_channel_corresponding_text_channel,
recording_manager,
render_manager,
songbird,
user_manager,
vcs_sender,
};
let heat_seeking = tokio::spawn(heat_seek(state.clone()));
if let Some(discord_status) = discord_status {
shards.iter().for_each(|shard| {
shard.command(
&UpdatePresence::new(
vec![
MinimalActivity {
kind: ActivityType::Listening,
name: (*discord_status).to_owned(),
url: None,
}
.into(),
],
false,
None,
Status::Idle,
)
.expect("TODO"),
)
});
}
let run_shards = shards
.into_iter()
.map(|shard| handle_events(command_router.clone(), state.clone(), shard));
let run_shards = JoinSet::from_iter(run_shards);
let run_shards = run_shards.join_all();
tokio::spawn({ tokio::spawn({
let cancellation_token = cancellation_token.clone(); let cancellation_token = cancellation_token.clone();
async move { async move {
@@ -382,32 +345,196 @@ async fn main() -> Result<(), MainError> {
} }
}); });
tokio::spawn(async { let (mut watchdog_tx, mut watchdog_rx) =
let duration = Duration::from_secs(120); futures::channel::mpsc::channel(watchdog_channel_size.get());
let mut interval = tokio::time::interval(duration);
loop { std::thread::spawn({
interval.tick().await; let discord_voice_channel_corresponding_text_channel =
discord_voice_channel_corresponding_text_channel.clone();
let discord_client = discord_client.clone();
let vcs_watcher = vcs_sender.subscribe();
tracing::debug!("this process is still alive"); move || {
std::thread::sleep(watchdog_warmup);
loop {
tracing::debug!("waiting to send check-in");
if watchdog_tx.try_send(()).is_err() {
tracing::error!("tokio runtime deadlocked");
vcs_watcher
.borrow()
.par_iter()
.for_each(|(&guild_id, vcs_in_guild)| {
if let Some(&voice_channel_id) =
vcs_in_guild.get_left_for(&discord_user_id)
{
let text_channel_id =
discord_voice_channel_corresponding_text_channel
.get(&guild_id)
.and_then(|guild_mappings| {
guild_mappings.get_right_for(&voice_channel_id).copied()
})
.unwrap_or(voice_channel_id);
tokio::runtime::Runtime::new().unwrap().block_on(discord_client.create_message(text_channel_id).content("so sorry I died, I'm in purgatory now, I don't like it here.\nbut I will be back in 5-20 minutes (even if it says I'm still there, I'm not currently recording and will be disconnected soon before later reconnecting and announcing recording again)").into_future());
}
});
std::process::exit(1);
}
std::thread::sleep(watchdog_frequency);
}
} }
}); });
let finished_naturally = async move { tokio::spawn(async move {
heat_seeking.await.unwrap(); tokio::time::sleep(watchdog_warmup).await;
run_shards.await;
};
tokio::pin!(finished_naturally);
select! { loop {
_ = &mut finished_naturally => { tracing::debug!("waiting to acknowledge the watchdog");
Ok(())
if watchdog_rx.recv().await.is_err() {
tracing::error!("watchdog died (this should be impossible)");
std::process::exit(1);
}
} }
() = cancellation_token.cancelled() => { });
tracing::warn!("waiting for tasks to gracefully shut down");
finished_naturally.await;
Err(MainError::Cancelled) loop {
tokio::spawn({
let vcs_sender = vcs_sender.clone();
let discord_client = discord_client.clone();
async move { initialize_vcs(&vcs_sender, &discord_client).await }
});
let intents = Intents::GUILD_VOICE_STATES;
let config =
twilight_gateway::Config::new(discord_token.expose_secret().to_owned(), intents);
let shards =
twilight_gateway::create_recommended(&discord_client, config, |_id, builder| {
builder.build()
})
.await
.expect("TODO");
let shards = Vec::from_iter(shards);
let senders = TwilightMap::new(
shards
.iter()
.map(|shard| (shard.id().number(), shard.sender()))
.collect(),
);
let senders = Arc::new(senders);
let songbird = Songbird::twilight(senders, discord_user_id);
songbird.set_config(
Config::default().decode_mode(songbird::driver::DecodeMode::Decode(DecodeConfig::new(
audio_channels.into(),
audio_sample_rate.into(),
))),
);
if let Some(discord_status) = &discord_status {
shards.iter().for_each(|shard| {
shard.command(
&UpdatePresence::new(
vec![
MinimalActivity {
kind: ActivityType::Listening,
name: discord_status.clone(),
url: None,
}
.into(),
],
false,
None,
Status::Idle,
)
.expect("TODO"),
)
});
}
let songbird = Arc::new(songbird);
let state = State {
audio_channels,
audio_sample_rate,
bot_manager: bot_manager.clone(),
cancellation_token: cancellation_token.clone(),
discord_application_id,
discord_bot_owner_user_id,
discord_client: discord_client.clone(),
discord_info_command_id,
discord_info_command_name: discord_info_command_name.clone(),
discord_opt_in_command_id,
discord_opt_in_command_name: discord_opt_in_command_name.clone(),
discord_opt_out_command_id,
discord_opt_out_command_name: discord_opt_out_command_name.clone(),
discord_user_id,
discord_voice_channel_corresponding_text_channel:
discord_voice_channel_corresponding_text_channel.clone(),
recording_manager: recording_manager.clone(),
render_manager: render_manager.clone(),
songbird,
user_manager: user_manager.clone(),
vcs_sender: vcs_sender.clone(),
};
let mut heat_seeking = tokio::spawn(heat_seek(state.clone()));
let run_shards = shards
.into_iter()
.map(|shard| handle_events(command_router.clone(), state.clone(), shard));
let mut run_shards = JoinSet::from_iter(run_shards);
select! {
_heat_seeking_exited = &mut heat_seeking => {
tracing::warn!("heat seeking exited, which shouldn't happen. let's try again");
continue;
}
_first_shard_exited = run_shards.join_next() => {
tracing::warn!("a shard exited when it's not supposed to, let's try reconnecting them all");
heat_seeking.abort();
continue;
}
() = cancellation_token.cancelled() => {
tracing::warn!("gracefully shutting down");
FuturesUnordered::from_iter(
vcs_sender
.borrow()
.iter()
.map(|(&guild_id, vcs_in_guild)| {
let discord_client = discord_client.clone();
let discord_voice_channel_corresponding_text_channel = discord_voice_channel_corresponding_text_channel.clone();
async move {
if let Some(&voice_channel_id) =
vcs_in_guild.get_left_for(&discord_user_id)
{
let text_channel_id =
discord_voice_channel_corresponding_text_channel
.get(&guild_id)
.and_then(|guild_mappings| {
guild_mappings.get_right_for(&voice_channel_id).copied()
})
.unwrap_or(voice_channel_id);
let _ = discord_client.create_message(text_channel_id).content("probably about to be updated, back in 5-20 minutes").await;
}
}
})
).collect::<()>().await;
heat_seeking.await.unwrap();
run_shards.join_all().await;
return Err(MainError::Cancelled);
}
} }
} }
} }

View File

@@ -20,76 +20,69 @@ pub type VCsInGuild =
pub type VCs = BTreeMap<Id<GuildMarker>, VCsInGuild>; pub type VCs = BTreeMap<Id<GuildMarker>, VCsInGuild>;
pub type VCsSender = watch::Sender<VCs>; pub type VCsSender = watch::Sender<VCs>;
#[tracing::instrument(skip(discord_client), ret)] #[tracing::instrument(skip(vcs_sender, discord_client))]
async fn initialize_user_in_vc( async fn initialize_user_in_vc(
vcs_sender: &VCsSender,
discord_client: &twilight_http::Client, discord_client: &twilight_http::Client,
guild_id: Id<GuildMarker>, guild_id: Id<GuildMarker>,
user_id: Id<UserMarker>, user_id: Id<UserMarker>,
) -> Option<(Id<ChannelMarker>, UserInVCData)> { ) {
if let Ok(voice_state_res) = discord_client.user_voice_state(guild_id, user_id).await if let Ok(voice_state_res) = discord_client.user_voice_state(guild_id, user_id).await
&& let Ok(voice_state) = voice_state_res.model().await && let Ok(voice_state) = voice_state_res.model().await
{ {
tracing::info!(?user_id, ?voice_state); tracing::info!(?user_id, ?voice_state);
let voice_status = VoiceStatus::builder() if let Some(voice_channel_id) = voice_state.channel_id {
.self_deafened(voice_state.self_deaf) let voice_status = VoiceStatus::builder()
.self_muted(voice_state.self_mute) .self_deafened(voice_state.self_deaf)
.server_deafened(voice_state.deaf) .self_muted(voice_state.self_mute)
.server_muted(voice_state.mute) .server_deafened(voice_state.deaf)
.camming(voice_state.self_video) .server_muted(voice_state.mute)
.streaming(voice_state.self_stream) .camming(voice_state.self_video)
.build(); .streaming(voice_state.self_stream)
let user_in_vc_data = voice_status.into(); .build();
let user_in_vc_data = voice_status.into();
voice_state vcs_sender.send_modify(|vcs| {
.channel_id vcs.entry(guild_id)
.map(|channel_id| (channel_id, user_in_vc_data)) .or_default()
} else { .insert(voice_channel_id, user_id, user_in_vc_data);
None // TODO });
}
} }
} }
#[tracing::instrument(skip(discord_client), ret)] #[tracing::instrument(skip(vcs_sender, discord_client))]
async fn initialize_server_vcs( async fn initialize_server_vcs(
vcs_sender: &VCsSender,
discord_client: &twilight_http::Client, discord_client: &twilight_http::Client,
id: Id<GuildMarker>, id: Id<GuildMarker>,
) -> VCsInGuild { ) {
if let Ok(guild_members_res) = discord_client.guild_members(id).limit(999).await if let Ok(guild_members_res) = discord_client.guild_members(id).limit(999).await
&& let Ok(guild_members) = guild_members_res.model().await && let Ok(guild_members) = guild_members_res.model().await
{ {
FuturesUnordered::from_iter(guild_members.into_iter().map(|member| async move { FuturesUnordered::from_iter(
( guild_members.into_iter().map(|member| {
member.user.id, initialize_user_in_vc(vcs_sender, discord_client, id, member.user.id)
initialize_user_in_vc(discord_client, id, member.user.id).await, }),
)
}))
.filter_map(
|(user_id, channel_id_and_user_in_vc_data_option)| async move {
channel_id_and_user_in_vc_data_option
.map(|(channel_id, user_in_vc_data)| (channel_id, user_id, user_in_vc_data))
},
) )
.collect() .collect()
.await .await
} else {
Default::default()
} }
} }
#[tracing::instrument(skip(discord_client), ret)] #[tracing::instrument(skip(vcs_sender, discord_client))]
pub async fn initialize_vcs(discord_client: &twilight_http::Client) -> VCs { pub async fn initialize_vcs(vcs_sender: &VCsSender, discord_client: &twilight_http::Client) {
if let Ok(guilds_res) = discord_client.current_user_guilds().limit(200).await if let Ok(guilds_res) = discord_client.current_user_guilds().limit(200).await
&& let Ok(guilds) = guilds_res.model().await && let Ok(guilds) = guilds_res.model().await
{ {
FuturesUnordered::from_iter(guilds.into_iter().map(|guild| async move { FuturesUnordered::from_iter(
let guild_vcs = initialize_server_vcs(discord_client, guild.id).await; guilds
.into_iter()
(guild.id, guild_vcs) .map(|guild| initialize_server_vcs(vcs_sender, discord_client, guild.id)),
})) )
.collect() .collect()
.await .await
} else {
Default::default()
} }
} }