diff --git a/Cargo.lock b/Cargo.lock index 13407f3..716b6ee 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1777,6 +1777,7 @@ dependencies = [ "extension-traits", "futures", "hound", + "humantime", "itertools", "moka", "opendal", @@ -2517,6 +2518,12 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" +[[package]] +name = "humantime" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "135b12329e5e3ce057a9f972339ea52bc954fe1e9358ef27f95e89716fbc5424" + [[package]] name = "hyper" version = "1.9.0" diff --git a/Cargo.toml b/Cargo.toml index 7bb45d5..830806b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,6 +14,7 @@ dashmap = "6.1.0" extension-traits = "2.0.2" futures = "0.3.32" hound = "3.5.1" +humantime = "2.3.0" itertools = "0.14.0" moka = { version = "0.12.15", features = ["future"] } opendal = { git = "https://github.com/apache/opendal", rev = "ecf840b04afd2be109830b9978ba89759adfee79", features = [ diff --git a/src/main.rs b/src/main.rs index aa4ae64..c83741b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -7,7 +7,14 @@ use fomo_reducer::{ use secrecy::{ExposeSecret, SecretString}; use snafu::{OptionExt, ResultExt, Snafu}; use songbird::{Config, Songbird, driver::DecodeConfig, shards::TwilightMap}; -use std::{collections::BTreeMap, fmt::Debug, str::FromStr, sync::Arc, time::Duration}; +use std::{ + collections::BTreeMap, + fmt::{Debug, Display}, + num::NonZero, + str::FromStr, + sync::Arc, + time::Duration, +}; use tokio::{select, signal::ctrl_c, task::JoinSet}; use tokio_util::{sync::CancellationToken, time::FutureExt as _}; use tracing::Level; @@ -68,6 +75,29 @@ fn parse_guild_vc_to_text_channel( Ok((guild, voice_channel, text_channel)) } +#[derive(Clone)] +struct HumanDuration(Duration); + +impl FromStr for HumanDuration { + type Err = humantime::DurationError; + + fn from_str(s: &str) -> Result { + humantime::parse_duration(s).map(Self) + } +} + +impl Debug for HumanDuration { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.0.fmt(f) + } +} + +impl Display for HumanDuration { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", humantime::format_duration(self.0)) + } +} + #[derive(Debug, Parser)] struct AppArgs { #[arg(long, env)] @@ -103,6 +133,12 @@ struct AppArgs { #[arg(long, env)] render_data: Storage, + + #[arg(long, env, default_value_t = HumanDuration(Duration::from_secs(5)))] + watchdog_frequency: HumanDuration, + + #[arg(long, env, default_value_t = 8.try_into().unwrap())] + watchdog_channel_size: NonZero, } #[derive(Parser)] @@ -166,6 +202,8 @@ async fn main() -> Result<(), MainError> { user_data, recording_data, render_data, + watchdog_frequency: HumanDuration(watchdog_frequency), + watchdog_channel_size, } = app_args; let cancellation_token = CancellationToken::new(); @@ -301,6 +339,29 @@ async fn main() -> Result<(), MainError> { } }); + let (mut watchdog_tx, mut watchdog_rx) = + futures::channel::mpsc::channel(watchdog_channel_size.get()); + + std::thread::spawn(move || { + loop { + if watchdog_tx.try_send(()).is_err() { + tracing::error!("tokio runtime deadlocked"); + std::process::exit(1); + } + + std::thread::sleep(watchdog_frequency); + } + }); + + tokio::spawn(async move { + loop { + if watchdog_rx.recv().await.is_err() { + tracing::error!("watchdog died (this should be impossible)"); + std::process::exit(1); + } + } + }); + tokio::spawn(async { let duration = Duration::from_secs(120); let mut interval = tokio::time::interval(duration);