Compare commits
6 Commits
a22965a3be
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
|
e6cd882b32
|
|||
|
ce77590777
|
|||
|
50fc35883d
|
|||
| fa0093d582 | |||
| c7300a9e6d | |||
|
a0a0632a1d
|
8
Cargo.lock
generated
8
Cargo.lock
generated
@@ -1777,11 +1777,13 @@ dependencies = [
|
||||
"extension-traits",
|
||||
"futures",
|
||||
"hound",
|
||||
"humantime",
|
||||
"itertools",
|
||||
"moka",
|
||||
"opendal",
|
||||
"opus2",
|
||||
"patricia_tree 0.10.1",
|
||||
"rayon",
|
||||
"rhai",
|
||||
"rustls 0.23.40",
|
||||
"secrecy 0.10.3",
|
||||
@@ -2517,6 +2519,12 @@ version = "1.0.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9"
|
||||
|
||||
[[package]]
|
||||
name = "humantime"
|
||||
version = "2.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "135b12329e5e3ce057a9f972339ea52bc954fe1e9358ef27f95e89716fbc5424"
|
||||
|
||||
[[package]]
|
||||
name = "hyper"
|
||||
version = "1.9.0"
|
||||
|
||||
@@ -14,6 +14,7 @@ dashmap = "6.1.0"
|
||||
extension-traits = "2.0.2"
|
||||
futures = "0.3.32"
|
||||
hound = "3.5.1"
|
||||
humantime = "2.3.0"
|
||||
itertools = "0.14.0"
|
||||
moka = { version = "0.12.15", features = ["future"] }
|
||||
opendal = { git = "https://github.com/apache/opendal", rev = "ecf840b04afd2be109830b9978ba89759adfee79", features = [
|
||||
@@ -55,6 +56,7 @@ opendal = { git = "https://github.com/apache/opendal", rev = "ecf840b04afd2be109
|
||||
] }
|
||||
opus2 = "0.4.0"
|
||||
patricia_tree = "0.10.1"
|
||||
rayon = "1.12"
|
||||
rhai = { version = "1.23.6", features = ["sync"] }
|
||||
rustls = "0.23"
|
||||
secrecy = { version = "0.10.3", features = ["serde"] }
|
||||
|
||||
137
src/main.rs
137
src/main.rs
@@ -4,10 +4,19 @@ use fomo_reducer::{
|
||||
RecordingManager, RenderManager, State, Storage, UserManager, VCsSender, all_commands, command,
|
||||
heat_seek, initialize_vcs, update_vcs,
|
||||
};
|
||||
use futures::{StreamExt as _, stream::FuturesUnordered};
|
||||
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
|
||||
use secrecy::{ExposeSecret, SecretString};
|
||||
use snafu::{OptionExt, ResultExt, Snafu};
|
||||
use songbird::{Config, Songbird, driver::DecodeConfig, shards::TwilightMap};
|
||||
use std::{collections::BTreeMap, fmt::Debug, str::FromStr, sync::Arc, time::Duration};
|
||||
use std::{
|
||||
collections::BTreeMap,
|
||||
fmt::{Debug, Display},
|
||||
num::NonZero,
|
||||
str::FromStr,
|
||||
sync::Arc,
|
||||
time::Duration,
|
||||
};
|
||||
use tokio::{select, signal::ctrl_c, task::JoinSet};
|
||||
use tokio_util::{sync::CancellationToken, time::FutureExt as _};
|
||||
use tracing::Level;
|
||||
@@ -15,7 +24,7 @@ use tracing_subscriber::{
|
||||
EnvFilter,
|
||||
fmt::{format::FmtSpan, writer::MakeWriterExt},
|
||||
};
|
||||
use twilight_gateway::{Event, EventTypeFlags, Intents, Shard, StreamExt};
|
||||
use twilight_gateway::{Event, EventTypeFlags, Intents, Shard, StreamExt as _};
|
||||
use twilight_model::{
|
||||
application::interaction::InteractionData,
|
||||
gateway::{
|
||||
@@ -68,6 +77,29 @@ fn parse_guild_vc_to_text_channel(
|
||||
Ok((guild, voice_channel, text_channel))
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct HumanDuration(Duration);
|
||||
|
||||
impl FromStr for HumanDuration {
|
||||
type Err = humantime::DurationError;
|
||||
|
||||
fn from_str(s: &str) -> Result<Self, Self::Err> {
|
||||
humantime::parse_duration(s).map(Self)
|
||||
}
|
||||
}
|
||||
|
||||
impl Debug for HumanDuration {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
self.0.fmt(f)
|
||||
}
|
||||
}
|
||||
|
||||
impl Display for HumanDuration {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "{}", humantime::format_duration(self.0))
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Parser)]
|
||||
struct AppArgs {
|
||||
#[arg(long, env)]
|
||||
@@ -89,7 +121,7 @@ struct AppArgs {
|
||||
#[arg(long, env, default_value_t = AudioChannels::Mono)]
|
||||
audio_channels: AudioChannels,
|
||||
|
||||
#[arg(long, env, default_value_t = AudioSampleRate::Hz12000)]
|
||||
#[arg(long, env, default_value_t = AudioSampleRate::Hz24000)]
|
||||
audio_sample_rate: AudioSampleRate,
|
||||
|
||||
#[arg(long, env)]
|
||||
@@ -103,6 +135,15 @@ struct AppArgs {
|
||||
|
||||
#[arg(long, env)]
|
||||
render_data: Storage,
|
||||
|
||||
#[arg(long, env, default_value_t = HumanDuration(Duration::from_secs(120)))]
|
||||
watchdog_warmup: HumanDuration,
|
||||
|
||||
#[arg(long, env, default_value_t = HumanDuration(Duration::from_secs(5)))]
|
||||
watchdog_frequency: HumanDuration,
|
||||
|
||||
#[arg(long, env, default_value_t = 8.try_into().unwrap())]
|
||||
watchdog_channel_size: NonZero<usize>,
|
||||
}
|
||||
|
||||
#[derive(Parser)]
|
||||
@@ -166,6 +207,9 @@ async fn main() -> Result<(), MainError> {
|
||||
user_data,
|
||||
recording_data,
|
||||
render_data,
|
||||
watchdog_warmup: HumanDuration(watchdog_warmup),
|
||||
watchdog_frequency: HumanDuration(watchdog_frequency),
|
||||
watchdog_channel_size,
|
||||
} = app_args;
|
||||
|
||||
let cancellation_token = CancellationToken::new();
|
||||
@@ -301,14 +345,61 @@ async fn main() -> Result<(), MainError> {
|
||||
}
|
||||
});
|
||||
|
||||
tokio::spawn(async {
|
||||
let duration = Duration::from_secs(120);
|
||||
let mut interval = tokio::time::interval(duration);
|
||||
let (mut watchdog_tx, mut watchdog_rx) =
|
||||
futures::channel::mpsc::channel(watchdog_channel_size.get());
|
||||
|
||||
std::thread::spawn({
|
||||
let discord_voice_channel_corresponding_text_channel =
|
||||
discord_voice_channel_corresponding_text_channel.clone();
|
||||
let discord_client = discord_client.clone();
|
||||
let vcs_watcher = vcs_sender.subscribe();
|
||||
|
||||
move || {
|
||||
std::thread::sleep(watchdog_warmup);
|
||||
|
||||
loop {
|
||||
tracing::debug!("waiting to send check-in");
|
||||
|
||||
if watchdog_tx.try_send(()).is_err() {
|
||||
tracing::error!("tokio runtime deadlocked");
|
||||
|
||||
vcs_watcher
|
||||
.borrow()
|
||||
.par_iter()
|
||||
.for_each(|(&guild_id, vcs_in_guild)| {
|
||||
if let Some(&voice_channel_id) =
|
||||
vcs_in_guild.get_left_for(&discord_user_id)
|
||||
{
|
||||
let text_channel_id =
|
||||
discord_voice_channel_corresponding_text_channel
|
||||
.get(&guild_id)
|
||||
.and_then(|guild_mappings| {
|
||||
guild_mappings.get_right_for(&voice_channel_id).copied()
|
||||
})
|
||||
.unwrap_or(voice_channel_id);
|
||||
|
||||
tokio::runtime::Runtime::new().unwrap().block_on(discord_client.create_message(text_channel_id).content("so sorry I died, I'm in purgatory now, I don't like it here.\nbut I will be back in 5-20 minutes (even if it says I'm still there, I'm not currently recording and will be disconnected soon before later reconnecting and announcing recording again)").into_future());
|
||||
}
|
||||
});
|
||||
|
||||
std::process::exit(1);
|
||||
}
|
||||
|
||||
std::thread::sleep(watchdog_frequency);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
tokio::spawn(async move {
|
||||
tokio::time::sleep(watchdog_warmup).await;
|
||||
|
||||
loop {
|
||||
interval.tick().await;
|
||||
tracing::debug!("waiting to acknowledge the watchdog");
|
||||
|
||||
tracing::debug!("this process is still alive");
|
||||
if watchdog_rx.recv().await.is_err() {
|
||||
tracing::error!("watchdog died (this should be impossible)");
|
||||
std::process::exit(1);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
@@ -403,14 +494,42 @@ async fn main() -> Result<(), MainError> {
|
||||
|
||||
select! {
|
||||
_heat_seeking_exited = &mut heat_seeking => {
|
||||
// this shouldn't happen, but let's try again
|
||||
tracing::warn!("heat seeking exited, which shouldn't happen. let's try again");
|
||||
continue;
|
||||
}
|
||||
_first_shard_exited = run_shards.join_next() => {
|
||||
tracing::warn!("a shard exited when it's not supposed to, let's try reconnecting them all");
|
||||
heat_seeking.abort();
|
||||
continue;
|
||||
}
|
||||
() = cancellation_token.cancelled() => {
|
||||
tracing::warn!("gracefully shutting down");
|
||||
|
||||
FuturesUnordered::from_iter(
|
||||
vcs_sender
|
||||
.borrow()
|
||||
.iter()
|
||||
.map(|(&guild_id, vcs_in_guild)| {
|
||||
let discord_client = discord_client.clone();
|
||||
let discord_voice_channel_corresponding_text_channel = discord_voice_channel_corresponding_text_channel.clone();
|
||||
|
||||
async move {
|
||||
if let Some(&voice_channel_id) =
|
||||
vcs_in_guild.get_left_for(&discord_user_id)
|
||||
{
|
||||
let text_channel_id =
|
||||
discord_voice_channel_corresponding_text_channel
|
||||
.get(&guild_id)
|
||||
.and_then(|guild_mappings| {
|
||||
guild_mappings.get_right_for(&voice_channel_id).copied()
|
||||
})
|
||||
.unwrap_or(voice_channel_id);
|
||||
|
||||
let _ = discord_client.create_message(text_channel_id).content("probably about to be updated, back in 5-20 minutes").await;
|
||||
}
|
||||
}
|
||||
})
|
||||
).collect::<()>().await;
|
||||
heat_seeking.await.unwrap();
|
||||
run_shards.join_all().await;
|
||||
|
||||
|
||||
Reference in New Issue
Block a user