Compare commits
2 Commits
d373352ae1
...
581a747cac
| Author | SHA1 | Date | |
|---|---|---|---|
| 581a747cac | |||
| c351511c8a |
@@ -4,6 +4,7 @@ use futures::{StreamExt as _, stream::FuturesUnordered};
|
||||
use itertools::Itertools as _;
|
||||
use snafu::{ResultExt as _, Snafu};
|
||||
use tokio::sync::watch;
|
||||
use tokio_util::{sync::CancellationToken, time::FutureExt};
|
||||
use twilight_model::id::{
|
||||
Id,
|
||||
marker::{ChannelMarker, GuildMarker, UserMarker},
|
||||
@@ -43,16 +44,25 @@ pub async fn heat_seek(state: State) {
|
||||
.bot_data_manager(state.bot_data_manager.clone())
|
||||
.bot_owner_user_id(state.discord_bot_owner_user_id)
|
||||
.bot_user_id(state.discord_user_id)
|
||||
.cancellation_token(state.cancellation_token.clone())
|
||||
.channel_heat_sender(channel_heat_sender)
|
||||
.vcs_in_guild_watcher(vcs_in_guild_watcher)
|
||||
.call(),
|
||||
);
|
||||
tokio::spawn(map_heat(channel_heat_watcher, heat_map_sender));
|
||||
tokio::spawn(track_hottest_vc(
|
||||
state.discord_bot_owner_user_id,
|
||||
heat_map_watcher,
|
||||
hottest_vc_sender,
|
||||
));
|
||||
tokio::spawn(
|
||||
map_heat()
|
||||
.cancellation_token(state.cancellation_token.clone())
|
||||
.channel_heat_watcher(channel_heat_watcher)
|
||||
.heat_map_sender(heat_map_sender)
|
||||
.call(),
|
||||
);
|
||||
tokio::spawn(
|
||||
track_hottest_vc()
|
||||
.cancellation_token(state.cancellation_token.clone())
|
||||
.heat_map_watcher(heat_map_watcher)
|
||||
.hottest_vc_sender(hottest_vc_sender)
|
||||
.call(),
|
||||
);
|
||||
tokio::spawn(follow_hottest_vc(
|
||||
state.clone(),
|
||||
guild_id,
|
||||
@@ -64,7 +74,13 @@ pub async fn heat_seek(state: State) {
|
||||
vcs_in_guild_sender.send_replace(Arc::new(vcs_in_guild.clone()));
|
||||
}
|
||||
|
||||
if let Err(_closed) = vcs_watcher.changed().await {
|
||||
if matches!(
|
||||
vcs_watcher
|
||||
.changed()
|
||||
.with_cancellation_token(&state.cancellation_token)
|
||||
.await,
|
||||
None | Some(Err(_))
|
||||
) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
@@ -169,6 +185,7 @@ async fn evaluate_heat(
|
||||
bot_data_manager: BotDataManager,
|
||||
bot_owner_user_id: Id<UserMarker>,
|
||||
bot_user_id: Id<UserMarker>,
|
||||
cancellation_token: CancellationToken,
|
||||
|
||||
mut vcs_in_guild_watcher: watch::Receiver<Arc<VCsInGuild>>,
|
||||
channel_heat_sender: watch::Sender<ChannelHeat>,
|
||||
@@ -208,15 +225,22 @@ async fn evaluate_heat(
|
||||
for get_heat_error in get_heat_errors {
|
||||
tracing::error!(?get_heat_error, "failed to evaluate heat of channel")
|
||||
}
|
||||
|
||||
if let Err(_closed) = vcs_in_guild_watcher.changed().await {
|
||||
if matches!(
|
||||
vcs_in_guild_watcher
|
||||
.changed()
|
||||
.with_cancellation_token(&cancellation_token)
|
||||
.await,
|
||||
None | Some(Err(_))
|
||||
) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[bon::builder]
|
||||
#[tracing::instrument(skip(channel_heat_watcher, heat_map_sender))]
|
||||
async fn map_heat(
|
||||
cancellation_token: CancellationToken,
|
||||
mut channel_heat_watcher: watch::Receiver<ChannelHeat>,
|
||||
heat_map_sender: watch::Sender<HeatMap>,
|
||||
) {
|
||||
@@ -234,15 +258,23 @@ async fn map_heat(
|
||||
changed
|
||||
});
|
||||
|
||||
if let Err(_closed) = channel_heat_watcher.changed().await {
|
||||
if matches!(
|
||||
channel_heat_watcher
|
||||
.changed()
|
||||
.with_cancellation_token(&cancellation_token)
|
||||
.await,
|
||||
None | Some(Err(_))
|
||||
) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[bon::builder]
|
||||
#[tracing::instrument(skip(heat_map_watcher, hottest_vc_sender))]
|
||||
async fn track_hottest_vc(
|
||||
bot_owner_id: Id<UserMarker>,
|
||||
cancellation_token: CancellationToken,
|
||||
|
||||
mut heat_map_watcher: watch::Receiver<HeatMap>,
|
||||
hottest_vc_sender: watch::Sender<Option<Id<ChannelMarker>>>,
|
||||
) {
|
||||
@@ -264,7 +296,13 @@ async fn track_hottest_vc(
|
||||
modified
|
||||
});
|
||||
|
||||
if let Err(_closed) = heat_map_watcher.changed().await {
|
||||
if matches!(
|
||||
heat_map_watcher
|
||||
.changed()
|
||||
.with_cancellation_token(&cancellation_token)
|
||||
.await,
|
||||
None | Some(Err(_))
|
||||
) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
@@ -347,7 +385,13 @@ async fn follow_hottest_vc(
|
||||
}
|
||||
}
|
||||
|
||||
if let Err(_closed) = hottest_vc_watcher.changed().await {
|
||||
if matches!(
|
||||
hottest_vc_watcher
|
||||
.changed()
|
||||
.with_cancellation_token(&state.cancellation_token)
|
||||
.await,
|
||||
None | Some(Err(_))
|
||||
) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -467,7 +467,7 @@ async fn handle_events(command_router: Arc<CommandRouter>, state: State, mut sha
|
||||
match event_res {
|
||||
Ok(twilight_model::gateway::event::Event::GatewayClose(frame_option)) => {
|
||||
tracing::warn!(?frame_option);
|
||||
return;
|
||||
break;
|
||||
}
|
||||
Ok(event) => {
|
||||
handle_event(command_router.clone(), state.clone(), event).await;
|
||||
@@ -479,13 +479,15 @@ async fn handle_events(command_router: Arc<CommandRouter>, state: State, mut sha
|
||||
) =>
|
||||
{
|
||||
tracing::error!(?reconnect_error);
|
||||
return;
|
||||
break;
|
||||
}
|
||||
Err(error) => {
|
||||
tracing::error!(?error);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
state.cancellation_token.cancel();
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip(command_router, state))]
|
||||
|
||||
Reference in New Issue
Block a user