Compare commits

...

2 Commits

Author SHA1 Message Date
581a747cac fix: let heatseeking tasks be cancelled 2026-05-24 16:46:18 -04:00
c351511c8a fix: set the cancellation token when discord disconnects 2026-05-24 16:35:37 -04:00
2 changed files with 61 additions and 15 deletions

View File

@@ -4,6 +4,7 @@ use futures::{StreamExt as _, stream::FuturesUnordered};
use itertools::Itertools as _;
use snafu::{ResultExt as _, Snafu};
use tokio::sync::watch;
use tokio_util::{sync::CancellationToken, time::FutureExt};
use twilight_model::id::{
Id,
marker::{ChannelMarker, GuildMarker, UserMarker},
@@ -43,16 +44,25 @@ pub async fn heat_seek(state: State) {
.bot_data_manager(state.bot_data_manager.clone())
.bot_owner_user_id(state.discord_bot_owner_user_id)
.bot_user_id(state.discord_user_id)
.cancellation_token(state.cancellation_token.clone())
.channel_heat_sender(channel_heat_sender)
.vcs_in_guild_watcher(vcs_in_guild_watcher)
.call(),
);
tokio::spawn(map_heat(channel_heat_watcher, heat_map_sender));
tokio::spawn(track_hottest_vc(
state.discord_bot_owner_user_id,
heat_map_watcher,
hottest_vc_sender,
));
tokio::spawn(
map_heat()
.cancellation_token(state.cancellation_token.clone())
.channel_heat_watcher(channel_heat_watcher)
.heat_map_sender(heat_map_sender)
.call(),
);
tokio::spawn(
track_hottest_vc()
.cancellation_token(state.cancellation_token.clone())
.heat_map_watcher(heat_map_watcher)
.hottest_vc_sender(hottest_vc_sender)
.call(),
);
tokio::spawn(follow_hottest_vc(
state.clone(),
guild_id,
@@ -64,7 +74,13 @@ pub async fn heat_seek(state: State) {
vcs_in_guild_sender.send_replace(Arc::new(vcs_in_guild.clone()));
}
if let Err(_closed) = vcs_watcher.changed().await {
if matches!(
vcs_watcher
.changed()
.with_cancellation_token(&state.cancellation_token)
.await,
None | Some(Err(_))
) {
break;
}
}
@@ -169,6 +185,7 @@ async fn evaluate_heat(
bot_data_manager: BotDataManager,
bot_owner_user_id: Id<UserMarker>,
bot_user_id: Id<UserMarker>,
cancellation_token: CancellationToken,
mut vcs_in_guild_watcher: watch::Receiver<Arc<VCsInGuild>>,
channel_heat_sender: watch::Sender<ChannelHeat>,
@@ -208,15 +225,22 @@ async fn evaluate_heat(
for get_heat_error in get_heat_errors {
tracing::error!(?get_heat_error, "failed to evaluate heat of channel")
}
if let Err(_closed) = vcs_in_guild_watcher.changed().await {
if matches!(
vcs_in_guild_watcher
.changed()
.with_cancellation_token(&cancellation_token)
.await,
None | Some(Err(_))
) {
break;
}
}
}
#[bon::builder]
#[tracing::instrument(skip(channel_heat_watcher, heat_map_sender))]
async fn map_heat(
cancellation_token: CancellationToken,
mut channel_heat_watcher: watch::Receiver<ChannelHeat>,
heat_map_sender: watch::Sender<HeatMap>,
) {
@@ -234,15 +258,23 @@ async fn map_heat(
changed
});
if let Err(_closed) = channel_heat_watcher.changed().await {
if matches!(
channel_heat_watcher
.changed()
.with_cancellation_token(&cancellation_token)
.await,
None | Some(Err(_))
) {
break;
}
}
}
#[bon::builder]
#[tracing::instrument(skip(heat_map_watcher, hottest_vc_sender))]
async fn track_hottest_vc(
bot_owner_id: Id<UserMarker>,
cancellation_token: CancellationToken,
mut heat_map_watcher: watch::Receiver<HeatMap>,
hottest_vc_sender: watch::Sender<Option<Id<ChannelMarker>>>,
) {
@@ -264,7 +296,13 @@ async fn track_hottest_vc(
modified
});
if let Err(_closed) = heat_map_watcher.changed().await {
if matches!(
heat_map_watcher
.changed()
.with_cancellation_token(&cancellation_token)
.await,
None | Some(Err(_))
) {
break;
}
}
@@ -347,7 +385,13 @@ async fn follow_hottest_vc(
}
}
if let Err(_closed) = hottest_vc_watcher.changed().await {
if matches!(
hottest_vc_watcher
.changed()
.with_cancellation_token(&state.cancellation_token)
.await,
None | Some(Err(_))
) {
break;
}
}

View File

@@ -467,7 +467,7 @@ async fn handle_events(command_router: Arc<CommandRouter>, state: State, mut sha
match event_res {
Ok(twilight_model::gateway::event::Event::GatewayClose(frame_option)) => {
tracing::warn!(?frame_option);
return;
break;
}
Ok(event) => {
handle_event(command_router.clone(), state.clone(), event).await;
@@ -479,13 +479,15 @@ async fn handle_events(command_router: Arc<CommandRouter>, state: State, mut sha
) =>
{
tracing::error!(?reconnect_error);
return;
break;
}
Err(error) => {
tracing::error!(?error);
}
}
}
state.cancellation_token.cancel();
}
#[tracing::instrument(skip(command_router, state))]