fix: let heatseeking tasks be cancelled
This commit is contained in:
@@ -4,6 +4,7 @@ use futures::{StreamExt as _, stream::FuturesUnordered};
|
||||
use itertools::Itertools as _;
|
||||
use snafu::{ResultExt as _, Snafu};
|
||||
use tokio::sync::watch;
|
||||
use tokio_util::{sync::CancellationToken, time::FutureExt};
|
||||
use twilight_model::id::{
|
||||
Id,
|
||||
marker::{ChannelMarker, GuildMarker, UserMarker},
|
||||
@@ -43,16 +44,25 @@ pub async fn heat_seek(state: State) {
|
||||
.bot_data_manager(state.bot_data_manager.clone())
|
||||
.bot_owner_user_id(state.discord_bot_owner_user_id)
|
||||
.bot_user_id(state.discord_user_id)
|
||||
.cancellation_token(state.cancellation_token.clone())
|
||||
.channel_heat_sender(channel_heat_sender)
|
||||
.vcs_in_guild_watcher(vcs_in_guild_watcher)
|
||||
.call(),
|
||||
);
|
||||
tokio::spawn(map_heat(channel_heat_watcher, heat_map_sender));
|
||||
tokio::spawn(track_hottest_vc(
|
||||
state.discord_bot_owner_user_id,
|
||||
heat_map_watcher,
|
||||
hottest_vc_sender,
|
||||
));
|
||||
tokio::spawn(
|
||||
map_heat()
|
||||
.cancellation_token(state.cancellation_token.clone())
|
||||
.channel_heat_watcher(channel_heat_watcher)
|
||||
.heat_map_sender(heat_map_sender)
|
||||
.call(),
|
||||
);
|
||||
tokio::spawn(
|
||||
track_hottest_vc()
|
||||
.cancellation_token(state.cancellation_token.clone())
|
||||
.heat_map_watcher(heat_map_watcher)
|
||||
.hottest_vc_sender(hottest_vc_sender)
|
||||
.call(),
|
||||
);
|
||||
tokio::spawn(follow_hottest_vc(
|
||||
state.clone(),
|
||||
guild_id,
|
||||
@@ -64,7 +74,13 @@ pub async fn heat_seek(state: State) {
|
||||
vcs_in_guild_sender.send_replace(Arc::new(vcs_in_guild.clone()));
|
||||
}
|
||||
|
||||
if let Err(_closed) = vcs_watcher.changed().await {
|
||||
if matches!(
|
||||
vcs_watcher
|
||||
.changed()
|
||||
.with_cancellation_token(&state.cancellation_token)
|
||||
.await,
|
||||
None | Some(Err(_))
|
||||
) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
@@ -169,6 +185,7 @@ async fn evaluate_heat(
|
||||
bot_data_manager: BotDataManager,
|
||||
bot_owner_user_id: Id<UserMarker>,
|
||||
bot_user_id: Id<UserMarker>,
|
||||
cancellation_token: CancellationToken,
|
||||
|
||||
mut vcs_in_guild_watcher: watch::Receiver<Arc<VCsInGuild>>,
|
||||
channel_heat_sender: watch::Sender<ChannelHeat>,
|
||||
@@ -208,15 +225,22 @@ async fn evaluate_heat(
|
||||
for get_heat_error in get_heat_errors {
|
||||
tracing::error!(?get_heat_error, "failed to evaluate heat of channel")
|
||||
}
|
||||
|
||||
if let Err(_closed) = vcs_in_guild_watcher.changed().await {
|
||||
if matches!(
|
||||
vcs_in_guild_watcher
|
||||
.changed()
|
||||
.with_cancellation_token(&cancellation_token)
|
||||
.await,
|
||||
None | Some(Err(_))
|
||||
) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[bon::builder]
|
||||
#[tracing::instrument(skip(channel_heat_watcher, heat_map_sender))]
|
||||
async fn map_heat(
|
||||
cancellation_token: CancellationToken,
|
||||
mut channel_heat_watcher: watch::Receiver<ChannelHeat>,
|
||||
heat_map_sender: watch::Sender<HeatMap>,
|
||||
) {
|
||||
@@ -234,15 +258,23 @@ async fn map_heat(
|
||||
changed
|
||||
});
|
||||
|
||||
if let Err(_closed) = channel_heat_watcher.changed().await {
|
||||
if matches!(
|
||||
channel_heat_watcher
|
||||
.changed()
|
||||
.with_cancellation_token(&cancellation_token)
|
||||
.await,
|
||||
None | Some(Err(_))
|
||||
) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[bon::builder]
|
||||
#[tracing::instrument(skip(heat_map_watcher, hottest_vc_sender))]
|
||||
async fn track_hottest_vc(
|
||||
bot_owner_id: Id<UserMarker>,
|
||||
cancellation_token: CancellationToken,
|
||||
|
||||
mut heat_map_watcher: watch::Receiver<HeatMap>,
|
||||
hottest_vc_sender: watch::Sender<Option<Id<ChannelMarker>>>,
|
||||
) {
|
||||
@@ -264,7 +296,13 @@ async fn track_hottest_vc(
|
||||
modified
|
||||
});
|
||||
|
||||
if let Err(_closed) = heat_map_watcher.changed().await {
|
||||
if matches!(
|
||||
heat_map_watcher
|
||||
.changed()
|
||||
.with_cancellation_token(&cancellation_token)
|
||||
.await,
|
||||
None | Some(Err(_))
|
||||
) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
@@ -347,7 +385,13 @@ async fn follow_hottest_vc(
|
||||
}
|
||||
}
|
||||
|
||||
if let Err(_closed) = hottest_vc_watcher.changed().await {
|
||||
if matches!(
|
||||
hottest_vc_watcher
|
||||
.changed()
|
||||
.with_cancellation_token(&state.cancellation_token)
|
||||
.await,
|
||||
None | Some(Err(_))
|
||||
) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user