fix: crash and graceful shutdown improvements
This commit is contained in:
42
src/main.rs
42
src/main.rs
@@ -4,6 +4,7 @@ use fomo_reducer::{
|
|||||||
RecordingManager, RenderManager, State, Storage, UserManager, VCsSender, all_commands, command,
|
RecordingManager, RenderManager, State, Storage, UserManager, VCsSender, all_commands, command,
|
||||||
heat_seek, initialize_vcs, update_vcs,
|
heat_seek, initialize_vcs, update_vcs,
|
||||||
};
|
};
|
||||||
|
use futures::{StreamExt as _, stream::FuturesUnordered};
|
||||||
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
|
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
|
||||||
use secrecy::{ExposeSecret, SecretString};
|
use secrecy::{ExposeSecret, SecretString};
|
||||||
use snafu::{OptionExt, ResultExt, Snafu};
|
use snafu::{OptionExt, ResultExt, Snafu};
|
||||||
@@ -23,7 +24,7 @@ use tracing_subscriber::{
|
|||||||
EnvFilter,
|
EnvFilter,
|
||||||
fmt::{format::FmtSpan, writer::MakeWriterExt},
|
fmt::{format::FmtSpan, writer::MakeWriterExt},
|
||||||
};
|
};
|
||||||
use twilight_gateway::{Event, EventTypeFlags, Intents, Shard, StreamExt};
|
use twilight_gateway::{Event, EventTypeFlags, Intents, Shard, StreamExt as _};
|
||||||
use twilight_model::{
|
use twilight_model::{
|
||||||
application::interaction::InteractionData,
|
application::interaction::InteractionData,
|
||||||
gateway::{
|
gateway::{
|
||||||
@@ -135,6 +136,9 @@ struct AppArgs {
|
|||||||
#[arg(long, env)]
|
#[arg(long, env)]
|
||||||
render_data: Storage,
|
render_data: Storage,
|
||||||
|
|
||||||
|
#[arg(long, env, default_value_t = HumanDuration(Duration::from_secs(120)))]
|
||||||
|
watchdog_warmup: HumanDuration,
|
||||||
|
|
||||||
#[arg(long, env, default_value_t = HumanDuration(Duration::from_secs(5)))]
|
#[arg(long, env, default_value_t = HumanDuration(Duration::from_secs(5)))]
|
||||||
watchdog_frequency: HumanDuration,
|
watchdog_frequency: HumanDuration,
|
||||||
|
|
||||||
@@ -203,6 +207,7 @@ async fn main() -> Result<(), MainError> {
|
|||||||
user_data,
|
user_data,
|
||||||
recording_data,
|
recording_data,
|
||||||
render_data,
|
render_data,
|
||||||
|
watchdog_warmup: HumanDuration(watchdog_warmup),
|
||||||
watchdog_frequency: HumanDuration(watchdog_frequency),
|
watchdog_frequency: HumanDuration(watchdog_frequency),
|
||||||
watchdog_channel_size,
|
watchdog_channel_size,
|
||||||
} = app_args;
|
} = app_args;
|
||||||
@@ -350,6 +355,8 @@ async fn main() -> Result<(), MainError> {
|
|||||||
let vcs_watcher = vcs_sender.subscribe();
|
let vcs_watcher = vcs_sender.subscribe();
|
||||||
|
|
||||||
move || {
|
move || {
|
||||||
|
std::thread::sleep(watchdog_warmup);
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
tracing::debug!("waiting to send check-in");
|
tracing::debug!("waiting to send check-in");
|
||||||
|
|
||||||
@@ -372,7 +379,6 @@ async fn main() -> Result<(), MainError> {
|
|||||||
.unwrap_or(voice_channel_id);
|
.unwrap_or(voice_channel_id);
|
||||||
|
|
||||||
tokio::runtime::Runtime::new().unwrap().block_on(discord_client.create_message(text_channel_id).content("so sorry I died, I'm in purgatory now, I don't like it here.\nbut I will be back in 5-20 minutes (even if it says I'm still there, I'm not currently recording and will be disconnected soon before later reconnecting and announcing recording again)").into_future());
|
tokio::runtime::Runtime::new().unwrap().block_on(discord_client.create_message(text_channel_id).content("so sorry I died, I'm in purgatory now, I don't like it here.\nbut I will be back in 5-20 minutes (even if it says I'm still there, I'm not currently recording and will be disconnected soon before later reconnecting and announcing recording again)").into_future());
|
||||||
|
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
@@ -385,6 +391,8 @@ async fn main() -> Result<(), MainError> {
|
|||||||
});
|
});
|
||||||
|
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
|
tokio::time::sleep(watchdog_warmup).await;
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
tracing::debug!("waiting to acknowledge the watchdog");
|
tracing::debug!("waiting to acknowledge the watchdog");
|
||||||
|
|
||||||
@@ -486,14 +494,42 @@ async fn main() -> Result<(), MainError> {
|
|||||||
|
|
||||||
select! {
|
select! {
|
||||||
_heat_seeking_exited = &mut heat_seeking => {
|
_heat_seeking_exited = &mut heat_seeking => {
|
||||||
// this shouldn't happen, but let's try again
|
tracing::warn!("heat seeking exited, which shouldn't happen. let's try again");
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
_first_shard_exited = run_shards.join_next() => {
|
_first_shard_exited = run_shards.join_next() => {
|
||||||
|
tracing::warn!("a shard exited when it's not supposed to, let's try reconnecting them all");
|
||||||
heat_seeking.abort();
|
heat_seeking.abort();
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
() = cancellation_token.cancelled() => {
|
() = cancellation_token.cancelled() => {
|
||||||
|
tracing::warn!("gracefully shutting down");
|
||||||
|
|
||||||
|
FuturesUnordered::from_iter(
|
||||||
|
vcs_sender
|
||||||
|
.borrow()
|
||||||
|
.iter()
|
||||||
|
.map(|(&guild_id, vcs_in_guild)| {
|
||||||
|
let discord_client = discord_client.clone();
|
||||||
|
let discord_voice_channel_corresponding_text_channel = discord_voice_channel_corresponding_text_channel.clone();
|
||||||
|
|
||||||
|
async move {
|
||||||
|
if let Some(&voice_channel_id) =
|
||||||
|
vcs_in_guild.get_left_for(&discord_user_id)
|
||||||
|
{
|
||||||
|
let text_channel_id =
|
||||||
|
discord_voice_channel_corresponding_text_channel
|
||||||
|
.get(&guild_id)
|
||||||
|
.and_then(|guild_mappings| {
|
||||||
|
guild_mappings.get_right_for(&voice_channel_id).copied()
|
||||||
|
})
|
||||||
|
.unwrap_or(voice_channel_id);
|
||||||
|
|
||||||
|
let _ = discord_client.create_message(text_channel_id).content("probably about to be updated, back in 5-20 minutes").await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
).collect::<()>().await;
|
||||||
heat_seeking.await.unwrap();
|
heat_seeking.await.unwrap();
|
||||||
run_shards.join_all().await;
|
run_shards.join_all().await;
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user