Compare commits

...

41 Commits

Author SHA1 Message Date
e6cd882b32 fix: crash and graceful shutdown improvements 2026-06-13 00:10:54 -04:00
ce77590777 fix: watchdog panicking 2026-06-12 23:53:17 -04:00
50fc35883d chore: remove "this process is still alive" that I forgot to delete 2026-06-12 23:45:30 -04:00
fa0093d582 feat: announce disconnection upon deadlock detection 2026-06-12 20:29:50 -04:00
c7300a9e6d feat: watchdog for detecting tokio runtime deadlock (I wish I could prevent it, but I don't know what's causing it) 2026-06-12 20:01:46 -04:00
a0a0632a1d chore: raise default audio sample rate to 24 khz 2026-06-12 00:02:56 -04:00
a22965a3be feat: make reconnecting after discord disconnect faster (not obligate an entire process restart) 2026-06-12 00:01:04 -04:00
c8d676693d fix: reduce rebuilding from extraneous files 2026-06-12 00:00:07 -04:00
ba0450e999 fix: coerce sample_rate to u64 correctly 2026-06-01 23:27:46 -04:00
a7f11a7202 fix: cast sample_rate to u64 in calculations 2026-06-01 23:23:43 -04:00
4fa25305b5 fix: use u64 to prevent overflowing (I can't believe I was stupid enough to think u32 could fit it) and add logging in case I was mistaken 2026-06-01 23:18:55 -04:00
65611d676d chore: reduce logging of unnecessary things in initialize_vcs 2026-05-31 16:11:38 -04:00
c8fd99cdf1 fix: in /info move the user mention to the description field (where mentions are allowed) 2026-05-31 15:30:04 -04:00
c03ccc9d39 chore: add more logging in /render to debug 2026-05-31 15:28:16 -04:00
5e989289bd feat: don't block bot startup waiting to initialize vcs 2026-05-31 14:24:09 -04:00
f5b6dc5c76 chore: remove /join and /leave to prevent confusion 2026-05-31 02:46:47 -04:00
03689f4764 fix: don't log interaction in /render 2026-05-30 01:08:59 -04:00
20bb0e4c31 fix: switch to async read-write lock instead of sync mutex in call handler to try to prevent deadlock 2026-05-29 22:22:55 -04:00
c6aa8e5d13 fix: scope borrow in heat seeking to try to prevent deadlock 2026-05-29 22:22:15 -04:00
eb17c0a33b chore: log more in /render so I can check if the number of recordings used is as expected 2026-05-29 01:21:15 -04:00
98a7a1e6fd chore: reduce amount of logging while recording in case my home server has been struggling to keep up with it 2026-05-28 12:52:58 -04:00
b27d0f32c2 fix: encode renders as wav instead of opus to address errors 2026-05-28 12:32:31 -04:00
c85e420a75 chore: use Display when logging an opus encode error in rendering instead of Debug so I can get a better look 2026-05-28 12:15:37 -04:00
8dad5648f5 fix: sample rate doesn't need to be part of the estimated max size calculation cause samples already covers that 2026-05-28 12:07:41 -04:00
e6c2342e1a chore: debug the estimated max size in rendering 2026-05-28 11:59:21 -04:00
8f29c30bec fix: more realistic estimated max size in rendering 2026-05-28 11:47:17 -04:00
6198387cc4 fix: use the encoded bytes instead of mistakenly discarding them 2026-05-28 11:35:08 -04:00
7b5be35112 fix: encode to a new vec instead of an empty one when rendering, log more stuff for debugging 2026-05-28 11:30:48 -04:00
c351358947 chore: log more stuff around rendering to debug 2026-05-28 02:45:16 -04:00
4463ff7b3a fix: constrain seconds and milliseconds in between 2026-05-28 02:35:07 -04:00
0c052ea303 chore: log recording details in /render to try to debug why so many are falling out of range 2026-05-28 02:23:57 -04:00
31d53c1e58 fix: don't log state in /render 2026-05-28 02:14:33 -04:00
24ef5a67c4 feat: implement rendering 2026-05-28 01:48:52 -04:00
862a333131 chore: rename data managers to just managers 2026-05-28 00:50:20 -04:00
b5a56b1273 chore: initialize samples array to write to later in render 2026-05-28 00:17:28 -04:00
0137f97788 feat: implement RecordingDataManager::between and between_in_vc, start using it in /render 2026-05-27 21:48:31 -04:00
23f86ace3b feat: support reading recordings, address some warnings 2026-05-27 01:56:47 -04:00
e72633f26a chore: refactor into a RecordingDataManager, lay the ground work for a RenderManager 2026-05-27 01:28:47 -04:00
f86c094dda feat: more work on /render 2026-05-26 00:10:52 -04:00
453208ff17 chore: satisfy warnings about async fn in traits 2026-05-26 00:10:28 -04:00
cfe6ddf218 fix: look on the right side for text channels when posting the join notice in heat seeking 2026-05-24 20:03:21 -04:00
35 changed files with 1913 additions and 689 deletions

8
Cargo.lock generated
View File

@@ -1777,11 +1777,13 @@ dependencies = [
"extension-traits", "extension-traits",
"futures", "futures",
"hound", "hound",
"humantime",
"itertools", "itertools",
"moka", "moka",
"opendal", "opendal",
"opus2", "opus2",
"patricia_tree 0.10.1", "patricia_tree 0.10.1",
"rayon",
"rhai", "rhai",
"rustls 0.23.40", "rustls 0.23.40",
"secrecy 0.10.3", "secrecy 0.10.3",
@@ -2517,6 +2519,12 @@ version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9"
[[package]]
name = "humantime"
version = "2.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "135b12329e5e3ce057a9f972339ea52bc954fe1e9358ef27f95e89716fbc5424"
[[package]] [[package]]
name = "hyper" name = "hyper"
version = "1.9.0" version = "1.9.0"

View File

@@ -14,6 +14,7 @@ dashmap = "6.1.0"
extension-traits = "2.0.2" extension-traits = "2.0.2"
futures = "0.3.32" futures = "0.3.32"
hound = "3.5.1" hound = "3.5.1"
humantime = "2.3.0"
itertools = "0.14.0" itertools = "0.14.0"
moka = { version = "0.12.15", features = ["future"] } moka = { version = "0.12.15", features = ["future"] }
opendal = { git = "https://github.com/apache/opendal", rev = "ecf840b04afd2be109830b9978ba89759adfee79", features = [ opendal = { git = "https://github.com/apache/opendal", rev = "ecf840b04afd2be109830b9978ba89759adfee79", features = [
@@ -55,6 +56,7 @@ opendal = { git = "https://github.com/apache/opendal", rev = "ecf840b04afd2be109
] } ] }
opus2 = "0.4.0" opus2 = "0.4.0"
patricia_tree = "0.10.1" patricia_tree = "0.10.1"
rayon = "1.12"
rhai = { version = "1.23.6", features = ["sync"] } rhai = { version = "1.23.6", features = ["sync"] }
rustls = "0.23" rustls = "0.23"
secrecy = { version = "0.10.3", features = ["serde"] } secrecy = { version = "0.10.3", features = ["serde"] }
@@ -69,7 +71,7 @@ songbird = { version = "0.6.0", default-features = false, features = [
"tws", "tws",
] } ] }
strum = { version = "0.28.0", features = ["derive"] } strum = { version = "0.28.0", features = ["derive"] }
time = "0.3.47" time = { version = "0.3.47", features = ["formatting", "macros", "parsing"] }
tokio = { version = "1.46.0", features = ["rt-multi-thread", "macros", "signal", "time"] } tokio = { version = "1.46.0", features = ["rt-multi-thread", "macros", "signal", "time"] }
tokio-util = { version = "0.7.18", features = ["io"] } tokio-util = { version = "0.7.18", features = ["io"] }
tokio-websockets-0-13 = { package = "tokio-websockets", version = "0.13", features = [ tokio-websockets-0-13 = { package = "tokio-websockets", version = "0.13", features = [

View File

@@ -1,6 +1,9 @@
use shadow_rs::{BuildPattern, ShadowBuilder}; use shadow_rs::{BuildPattern, ShadowBuilder};
fn main() { fn main() {
println!("cargo::rerun-if-changed=bot.capnp");
println!("cargo::rerun-if-changed=user.capnp");
capnpc::CompilerCommand::new() capnpc::CompilerCommand::new()
.file("bot.capnp") .file("bot.capnp")
.file("user.capnp") .file("user.capnp")

26
src/audio_channels.rs Normal file
View File

@@ -0,0 +1,26 @@
use opus2::Channels;
use strum::EnumString;
#[derive(Clone, Copy, Debug, strum::Display, EnumString)]
pub enum AudioChannels {
Mono,
Stereo,
}
impl From<AudioChannels> for Channels {
fn from(value: AudioChannels) -> Self {
match value {
AudioChannels::Mono => Channels::Mono,
AudioChannels::Stereo => Channels::Stereo,
}
}
}
impl From<AudioChannels> for songbird::driver::Channels {
fn from(value: AudioChannels) -> Self {
match value {
AudioChannels::Mono => songbird::driver::Channels::Mono,
AudioChannels::Stereo => songbird::driver::Channels::Stereo,
}
}
}

34
src/audio_sample_rate.rs Normal file
View File

@@ -0,0 +1,34 @@
use songbird::driver::SampleRate;
use strum::EnumString;
#[derive(Clone, Copy, Debug, strum::Display, EnumString)]
pub enum AudioSampleRate {
#[strum(serialize = "8000Hz")]
Hz8000,
#[strum(serialize = "12000Hz")]
Hz12000,
#[strum(serialize = "16000Hz")]
Hz16000,
#[strum(serialize = "24000Hz")]
Hz24000,
#[strum(serialize = "48000Hz")]
Hz48000,
}
impl From<AudioSampleRate> for SampleRate {
fn from(value: AudioSampleRate) -> Self {
match value {
AudioSampleRate::Hz8000 => SampleRate::Hz8000,
AudioSampleRate::Hz12000 => SampleRate::Hz12000,
AudioSampleRate::Hz16000 => SampleRate::Hz16000,
AudioSampleRate::Hz24000 => SampleRate::Hz24000,
AudioSampleRate::Hz48000 => SampleRate::Hz48000,
}
}
}
impl From<AudioSampleRate> for u32 {
fn from(value: AudioSampleRate) -> Self {
SampleRate::from(value).into()
}
}

View File

@@ -7,11 +7,11 @@ use snafu::{ResultExt as _, Snafu};
use crate::{OperatorExt, bot_capnp, option_ext::OptionExt as _}; use crate::{OperatorExt, bot_capnp, option_ext::OptionExt as _};
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct BotDataManager { pub struct BotManager {
operator: Operator, operator: Operator,
} }
impl BotDataManager { impl BotManager {
pub fn new(operator: Operator) -> Self { pub fn new(operator: Operator) -> Self {
Self { operator } Self { operator }
} }
@@ -32,7 +32,7 @@ pub enum WithError {
DeserializeError { source: capnp::Error }, DeserializeError { source: capnp::Error },
} }
impl BotDataManager { impl BotManager {
pub async fn with<R>( pub async fn with<R>(
&self, &self,
f: impl FnOnce(bot_capnp::bot::Reader<'_>) -> R, f: impl FnOnce(bot_capnp::bot::Reader<'_>) -> R,
@@ -101,7 +101,7 @@ pub enum UpdateError {
FinalizeError { source: std::io::Error }, FinalizeError { source: std::io::Error },
} }
impl BotDataManager { impl BotManager {
pub async fn update<R>( pub async fn update<R>(
&self, &self,
f: impl FnOnce(bot_capnp::bot::Builder<'_>) -> R, f: impl FnOnce(bot_capnp::bot::Builder<'_>) -> R,

View File

@@ -1,21 +1,19 @@
use crate::{ use crate::{
OneToManyUniqueBTreeMap, UserDataManager, option_ext::OptionExt as _, OneToManyUniqueBTreeMap, UserManager,
user_capnp::user::Consent, user_data::RECORD_IF_CONSENT_UNSPECIFIED, option_ext::OptionExt as _,
recording_data::{Clip, Recording, RecordingData, RecordingManager},
user_capnp::user::Consent,
user_data::RECORD_IF_CONSENT_UNSPECIFIED,
}; };
use async_trait::async_trait; use async_trait::async_trait;
use futures::FutureExt; use futures::FutureExt;
use hound::{SampleFormat, WavSpec};
use opendal::Operator;
use songbird::{ use songbird::{
CoreEvent, Event, EventContext, EventHandler, Songbird, CoreEvent, Event, EventContext, EventHandler, Songbird,
driver::{Channels, SampleRate}, driver::{Channels, SampleRate},
}; };
use std::{ use std::{sync::Arc, time::Instant};
io::Cursor,
sync::{Arc, Mutex},
time::Instant,
};
use time::UtcDateTime; use time::UtcDateTime;
use tokio::sync::RwLock;
use twilight_model::id::{ use twilight_model::id::{
Id, Id,
marker::{ChannelMarker, GuildMarker, UserMarker}, marker::{ChannelMarker, GuildMarker, UserMarker},
@@ -26,17 +24,17 @@ struct Handler {
start_instant: Instant, start_instant: Instant,
start_utc: UtcDateTime, start_utc: UtcDateTime,
recording_data: Operator, recording_manager: RecordingManager,
guild_id: Id<GuildMarker>, guild_id: Id<GuildMarker>,
channel_id: Id<ChannelMarker>, channel_id: Id<ChannelMarker>,
known_ssrcs: Arc<Mutex<OneToManyUniqueBTreeMap<Id<UserMarker>, u32>>>, known_ssrcs: Arc<RwLock<OneToManyUniqueBTreeMap<Id<UserMarker>, u32>>>,
audio_channels: u16, audio_channels: u16,
audio_sample_rate: u32, audio_sample_rate: u32,
user_data_manager: UserDataManager, user_manager: UserManager,
} }
#[async_trait] #[async_trait]
@@ -53,27 +51,22 @@ impl EventHandler for Handler {
let user_id = Id::new(user_id.0); let user_id = Id::new(user_id.0);
self.known_ssrcs self.known_ssrcs
.lock() .write()
.unwrap() .await
.insert(user_id, speaking.ssrc); .insert(user_id, speaking.ssrc);
} }
} }
EventContext::VoiceTick(voice_tick) => { EventContext::VoiceTick(voice_tick) => {
tracing::debug!(?voice_tick);
for (ssrc, voice_data) in &voice_tick.speaking { for (ssrc, voice_data) in &voice_tick.speaking {
let user_id = { let user_id = {
let known_ssrcs = self.known_ssrcs.lock().unwrap(); let known_ssrcs = self.known_ssrcs.read().await;
tracing::debug!(?known_ssrcs, ?ssrc);
known_ssrcs.get_left_for(ssrc).cloned() known_ssrcs.get_left_for(ssrc).cloned()
}; };
tracing::info!(?user_id);
if let Some(pcm) = &voice_data.decoded_voice { if let Some(pcm) = &voice_data.decoded_voice {
let may_record = user_id let may_record = user_id
.map_async(|user_id| { .map_async(|user_id| {
self.user_data_manager self.user_manager
.with(user_id, |user_data| { .with(user_id, |user_data| {
user_data.get_voice_recording_consent().unwrap() user_data.get_voice_recording_consent().unwrap()
}) })
@@ -104,49 +97,48 @@ impl EventHandler for Handler {
let minute = now_utc.minute(); let minute = now_utc.minute();
let second = now_utc.second(); let second = now_utc.second();
let microseconds = now_utc.microsecond(); let microsecond = now_utc.microsecond();
let guild_id = self.guild_id; let guild = self.guild_id;
let channel_id = self.channel_id; let voice_channel = self.channel_id;
let user = user_id let user = user_id;
.as_ref()
.map_or_else(|| "UNKNOWN".into(), ToString::to_string);
let path = format!( let clip = Clip {
"{year}/{month}/{day}/{hour}/{minute}/audio-{second}.{microseconds}-{guild_id}-{channel_id}-{user}.wav" second,
); microsecond,
guild,
voice_channel,
user,
};
let recording = Recording {
year,
month,
day,
hour,
minute,
clip,
};
let channels = self.audio_channels; let channels = self.audio_channels;
let sample_rate = self.audio_sample_rate; let sample_rate = self.audio_sample_rate;
let wav_spec = WavSpec { let recording_manager = self.recording_manager.clone();
let samples = pcm.clone();
let recording_data = RecordingData {
channels, channels,
sample_rate, sample_rate,
bits_per_sample: 16, samples,
sample_format: SampleFormat::Int,
}; };
let mut buffer = Vec::new();
let writer = Cursor::new(&mut buffer);
let mut wav_writer = hound::WavWriter::new(writer, wav_spec).expect("TODO");
let mut sample_writer = wav_writer.get_i16_writer(pcm.len() as u32);
for sample in pcm {
sample_writer.write_sample(*sample);
}
sample_writer.flush().expect("TODO");
wav_writer.finalize().expect("TODO");
tracing::info!("going to write the audio shortly");
let recording_data = self.recording_data.clone();
tokio::spawn(async move { tokio::spawn(async move {
recording_data.write(&path, buffer).await.expect("TODO"); recording_manager
tracing::info!(?path, "successfully wrote the audio!"); .write(&recording, recording_data)
.await
.expect("TODO");
tracing::info!(%recording, "successfully wrote the audio!");
}); });
} }
} }
@@ -174,9 +166,9 @@ pub async fn join_and_record(
audio_channels: Channels, audio_channels: Channels,
audio_sample_rate: SampleRate, audio_sample_rate: SampleRate,
guild_id: Id<GuildMarker>, guild_id: Id<GuildMarker>,
recording_data: Operator, recording_manager: RecordingManager,
songbird: &Songbird, songbird: &Songbird,
user_data_manager: UserDataManager, user_manager: UserManager,
voice_channel_id: Id<ChannelMarker>, voice_channel_id: Id<ChannelMarker>,
) -> Result<(), songbird::error::JoinError> { ) -> Result<(), songbird::error::JoinError> {
let start_instant = Instant::now(); let start_instant = Instant::now();
@@ -189,7 +181,7 @@ pub async fn join_and_record(
let handler = Handler { let handler = Handler {
start_instant, start_instant,
start_utc, start_utc,
recording_data, recording_manager,
guild_id, guild_id,
channel_id: voice_channel_id, channel_id: voice_channel_id,
known_ssrcs: Default::default(), known_ssrcs: Default::default(),
@@ -197,7 +189,7 @@ pub async fn join_and_record(
audio_channels, audio_channels,
audio_sample_rate, audio_sample_rate,
user_data_manager, user_manager,
}; };
let call = songbird.get_or_insert(guild_id); let call = songbird.get_or_insert(guild_id);

View File

@@ -88,7 +88,7 @@ pub async fn handle(state: State, interaction: Interaction) {
if is_bot_owner { if is_bot_owner {
let heat_script_description = state let heat_script_description = state
.bot_data_manager .bot_manager
.with(|bot_data| { .with(|bot_data| {
let heat_script_option = bot_data.has_heat_script().then(|| { let heat_script_option = bot_data.has_heat_script().then(|| {
bot_data bot_data
@@ -118,11 +118,11 @@ pub async fn handle(state: State, interaction: Interaction) {
.await .await
.expect("TODO"); .expect("TODO");
let mut user_id_stream = state.user_data_manager.list().await.expect("TODO"); let mut user_id_stream = state.user_manager.list().await.expect("TODO");
while let Some(user_id) = user_id_stream.try_next().await.expect("TODO") { while let Some(user_id) = user_id_stream.try_next().await.expect("TODO") {
let (consent, notification_script) = state let (consent, notification_script) = state
.user_data_manager .user_manager
.with(user_id, |user_data| { .with(user_id, |user_data| {
let consent = user_data.get_voice_recording_consent().unwrap(); let consent = user_data.get_voice_recording_consent().unwrap();
let notification_script = user_data.has_notification_script().then_some( let notification_script = user_data.has_notification_script().then_some(
@@ -145,7 +145,7 @@ pub async fn handle(state: State, interaction: Interaction) {
.interaction(state.discord_application_id) .interaction(state.discord_application_id)
.create_followup(&interaction.token) .create_followup(&interaction.token)
.embeds(&[EmbedBuilder::new() .embeds(&[EmbedBuilder::new()
.author(EmbedAuthorBuilder::new(user_mention)) .description(user_mention)
.field(EmbedFieldBuilder::new("Consent", format!("{consent:?}")).build()) .field(EmbedFieldBuilder::new("Consent", format!("{consent:?}")).build())
.field( .field(
EmbedFieldBuilder::new( EmbedFieldBuilder::new(

View File

@@ -1,170 +0,0 @@
use crate::{VCs, call::join_and_record, command::State};
use snafu::{OptionExt as _, Snafu};
use std::sync::LazyLock;
use twilight_model::{
application::{
command::{Command, CommandType},
interaction::Interaction,
},
channel::message::{Embed, MessageFlags},
http::interaction::{InteractionResponse, InteractionResponseType},
id::{
Id,
marker::{ChannelMarker, GuildMarker},
},
};
use twilight_util::builder::{
InteractionResponseDataBuilder, command::CommandBuilder, embed::EmbedBuilder,
};
const NAME: &str = "join";
const DESCRIPTION: &str = "The bot will join the same VC as you (with intention to record)";
pub static COMMAND: LazyLock<Command> = LazyLock::new(|| {
CommandBuilder::new(NAME, DESCRIPTION, CommandType::ChatInput)
.validate()
.expect("command wasn't correct")
.build()
});
#[derive(Debug, Snafu)]
enum GetGuildAndVoiceChannelIdError {
/// this command was not used inside a guild (Discord server)
NotInGuild,
/// there is no user who invoked this command
NoUser,
/// the user is not in a voice chat in this guild
UserNotInVC,
}
#[tracing::instrument]
fn get_guild_and_voice_channel_id(
interaction: &Interaction,
vcs: &VCs,
) -> Result<(Id<GuildMarker>, Id<ChannelMarker>), GetGuildAndVoiceChannelIdError> {
let guild_id = interaction.guild_id.context(NotInGuildSnafu)?;
let user_id = interaction
.member
.as_ref()
.and_then(|member| member.user.as_ref().map(|user| user.id))
.context(NoUserSnafu)?;
let &voice_channel_id = vcs
.get(&guild_id)
.context(UserNotInVCSnafu)?
.get_left_for(&user_id)
.context(UserNotInVCSnafu)?;
Ok((guild_id, voice_channel_id))
}
fn get_guild_and_vc_error_to_embed(error: GetGuildAndVoiceChannelIdError) -> Embed {
match error {
GetGuildAndVoiceChannelIdError::NotInGuild => {
EmbedBuilder::new().title("Use this in a server").description("This bot can't find a VC to join if the command is used outside of a server (you might've used it in a DM?).").validate().unwrap().build()
},
GetGuildAndVoiceChannelIdError::NoUser => {
EmbedBuilder::new().title("Not invoked by a user").description("This command works by joining the same VC as the user, but this bot didn't receive any user data. So did no user invoke it?! (This error should be impossible!)").validate().unwrap().build()
},
GetGuildAndVoiceChannelIdError::UserNotInVC => {
EmbedBuilder::new().title("You're not in a VC").description("This bot can't follow you into VC if you aren't in one in this server.").validate().unwrap().build()
},
}
}
#[tracing::instrument(skip(state))]
pub async fn handle(state: State, interaction: Interaction) {
let guild_and_voice_channel_id_res =
{ get_guild_and_voice_channel_id(&interaction, &state.vcs_sender.borrow()) };
let (guild_id, voice_channel_id) = match guild_and_voice_channel_id_res {
Ok((guild_id, voice_channel_id)) => (guild_id, voice_channel_id),
Err(error) => {
state
.discord_client
.interaction(state.discord_application_id)
.create_response(
interaction.id,
&interaction.token,
&InteractionResponse {
kind: InteractionResponseType::ChannelMessageWithSource,
data: Some(
InteractionResponseDataBuilder::new()
.embeds([get_guild_and_vc_error_to_embed(error)])
.flags(MessageFlags::EPHEMERAL)
.build(),
),
},
)
.await
.expect("TODO");
return;
}
};
state
.discord_client
.interaction(state.discord_application_id)
.create_response(
interaction.id,
&interaction.token,
&InteractionResponse {
kind: InteractionResponseType::DeferredChannelMessageWithSource,
data: None,
},
)
.await
.expect("TODO");
match join_and_record()
.audio_channels(state.audio_channels)
.audio_sample_rate(state.audio_sample_rate)
.guild_id(guild_id)
.recording_data(state.recording_data)
.songbird(&state.songbird)
.user_data_manager(state.user_data_manager)
.voice_channel_id(voice_channel_id)
.call()
.await
{
Ok(()) => {
let channel_mention = format!("<#{voice_channel_id}>");
let info_mention = format!(
"</{}:{}>",
state.discord_info_command_name, state.discord_info_command_id
);
let opt_in_mention = format!(
"</{}:{}>",
state.discord_opt_in_command_name, state.discord_opt_in_command_id
);
let opt_out_mention = format!(
"</{}:{}>",
state.discord_opt_out_command_name, state.discord_opt_out_command_id
);
state
.discord_client
.interaction(state.discord_application_id)
.update_response(
&interaction.token,
).embeds(Some(&[
EmbedBuilder::new()
.title("Joined VC to record")
.description(format!("This bot joined {channel_mention} and intends to record. You can opt out with {opt_out_mention} or explicitly opt in with {opt_in_mention} (I'd appreciate this one). Please use {info_mention} for more information about this bot."))
.validate()
.unwrap()
.build()
]))
.await
.expect("TODO");
}
Err(join_error) => {
tracing::error!(?join_error);
let _ = state.songbird.remove(guild_id).await;
}
}
}

View File

@@ -1,162 +0,0 @@
use crate::VCs;
use crate::command::State;
use snafu::{OptionExt, Snafu};
use std::sync::LazyLock;
use twilight_model::channel::message::{Embed, MessageFlags};
use twilight_model::http::interaction::{InteractionResponse, InteractionResponseType};
use twilight_model::id::marker::UserMarker;
use twilight_model::{
application::{
command::{Command, CommandType},
interaction::Interaction,
},
id::{
Id,
marker::{ChannelMarker, GuildMarker},
},
};
use twilight_util::builder::InteractionResponseDataBuilder;
use twilight_util::builder::command::CommandBuilder;
use twilight_util::builder::embed::EmbedBuilder;
const NAME: &str = "leave";
const DESCRIPTION: &str = "The bot will leave the VC it's in (so it won't record anyone anymore)";
pub static COMMAND: LazyLock<Command> = LazyLock::new(|| {
CommandBuilder::new(NAME, DESCRIPTION, CommandType::ChatInput)
.validate()
.expect("command wasn't correct")
.build()
});
#[derive(Debug, Snafu)]
pub enum GetGuildAndVoiceChannelIdError {
/// this command was not used inside a guild (Discord server)
NotInGuild,
/// there is no user who invoked this command
NoUser,
/// the bot is not in a voice chat in this guild
BotNotInVC,
/// the user is not in a voice chat with the bot in this guild
UserNotInVCWithBot,
}
#[tracing::instrument]
pub fn get_user_and_guild_and_voice_channel_id(
bot_user_id: Id<UserMarker>,
interaction: &Interaction,
vcs: &VCs,
) -> Result<(Id<GuildMarker>, Id<ChannelMarker>), GetGuildAndVoiceChannelIdError> {
let guild_id = interaction.guild_id.context(NotInGuildSnafu)?;
let user_id = interaction
.member
.as_ref()
.and_then(|member| member.user.as_ref().map(|user| user.id))
.context(NoUserSnafu)?;
let &bot_voice_channel_id = vcs
.get(&guild_id)
.context(BotNotInVCSnafu)?
.get_left_for(&bot_user_id)
.context(BotNotInVCSnafu)?;
let &user_voice_channel_id = vcs
.get(&guild_id)
.context(UserNotInVCWithBotSnafu)?
.get_left_for(&user_id)
.context(UserNotInVCWithBotSnafu)?;
if user_voice_channel_id != bot_voice_channel_id {
return Err(GetGuildAndVoiceChannelIdError::UserNotInVCWithBot);
}
Ok((guild_id, bot_voice_channel_id))
}
fn get_guild_and_vc_error_to_embed(error: GetGuildAndVoiceChannelIdError) -> Embed {
match error {
GetGuildAndVoiceChannelIdError::NotInGuild => {
EmbedBuilder::new().title("Use this in a server").description("This bot can't tell which VC to leave if the command is used outside of a server (you might've used it in a DM?).").validate().unwrap().build()
},
GetGuildAndVoiceChannelIdError::NoUser => {
EmbedBuilder::new().title("Not invoked by a user").description("This command works by joining the same VC as the user, but this bot didn't receive any user data. So did no user invoke it?! (This error should be impossible!)").validate().unwrap().build()
},
GetGuildAndVoiceChannelIdError::BotNotInVC => {
EmbedBuilder::new().title("Not in a VC").description("This bot can't leave VC if it isn't in one in this server.").validate().unwrap().build()
},
GetGuildAndVoiceChannelIdError::UserNotInVCWithBot => {
EmbedBuilder::new().title("Not in a VC with the Bot").description("You have to be in the VC with the bot to make it leave (to prevent griefing and abuse).").validate().unwrap().build()
},
}
}
#[tracing::instrument]
pub async fn handle(state: State, interaction: Interaction) {
let guild_and_voice_channel_id_result = {
get_user_and_guild_and_voice_channel_id(
state.discord_user_id,
&interaction,
&state.vcs_sender.borrow(),
)
};
let (guild_id, voice_channel_id) = match guild_and_voice_channel_id_result {
Ok((guild_id, voice_channel_id)) => (guild_id, voice_channel_id),
Err(error) => {
state
.discord_client
.interaction(state.discord_application_id)
.create_response(
interaction.id,
&interaction.token,
&InteractionResponse {
kind: InteractionResponseType::ChannelMessageWithSource,
data: Some(
InteractionResponseDataBuilder::new()
.embeds([get_guild_and_vc_error_to_embed(error)])
.flags(MessageFlags::EPHEMERAL)
.build(),
),
},
)
.await
.expect("TODO");
return;
}
};
state.songbird.leave(guild_id).await.expect("TODO");
tracing::error!("TODO: successfully left the call");
let channel_mention = format!("<#{voice_channel_id}>");
state
.discord_client
.interaction(state.discord_application_id)
.create_response(interaction.id, &interaction.token,
&InteractionResponse {
kind: InteractionResponseType::ChannelMessageWithSource,
data: Some(
InteractionResponseDataBuilder::new()
.embeds([
EmbedBuilder::new()
.title("Left VC")
.description(format!(
"This bot left {channel_mention} (and is thereby unable to record anymore)."
))
.validate()
.unwrap()
.build()
])
.flags(MessageFlags::EPHEMERAL)
.build(),
),
},)
.await
.expect("TODO");
}

View File

@@ -1,7 +1,6 @@
use std::{fmt::Debug, sync::Arc}; use std::{fmt::Debug, sync::Arc};
use futures::future::BoxFuture; use futures::future::BoxFuture;
use opendal::Operator;
use patricia_tree::StringPatriciaMap; use patricia_tree::StringPatriciaMap;
use songbird::{ use songbird::{
Songbird, Songbird,
@@ -16,20 +15,21 @@ use twilight_model::{
}, },
}; };
use crate::{BotDataManager, GuildVoiceChannelToTextChannel, UserDataManager, VCsSender}; use crate::{
AudioChannels, AudioSampleRate, BotManager, GuildVoiceChannelToTextChannel, UserManager,
VCsSender, recording_data::RecordingManager, render_data::RenderManager,
};
pub mod info; pub mod info;
pub mod join;
pub mod leave;
pub mod opt_in; pub mod opt_in;
pub mod opt_out; pub mod opt_out;
pub mod render; pub mod render;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct State { pub struct State {
pub audio_channels: Channels, pub audio_channels: AudioChannels,
pub audio_sample_rate: SampleRate, pub audio_sample_rate: AudioSampleRate,
pub bot_data_manager: BotDataManager, pub bot_manager: BotManager,
pub cancellation_token: CancellationToken, pub cancellation_token: CancellationToken,
pub discord_application_id: Id<ApplicationMarker>, pub discord_application_id: Id<ApplicationMarker>,
pub discord_bot_owner_user_id: Id<UserMarker>, pub discord_bot_owner_user_id: Id<UserMarker>,
@@ -42,9 +42,10 @@ pub struct State {
pub discord_opt_out_command_name: Arc<str>, pub discord_opt_out_command_name: Arc<str>,
pub discord_user_id: Id<UserMarker>, pub discord_user_id: Id<UserMarker>,
pub discord_voice_channel_corresponding_text_channel: Arc<GuildVoiceChannelToTextChannel>, pub discord_voice_channel_corresponding_text_channel: Arc<GuildVoiceChannelToTextChannel>,
pub recording_data: Operator, pub recording_manager: RecordingManager,
pub render_manager: RenderManager,
pub songbird: Arc<Songbird>, pub songbird: Arc<Songbird>,
pub user_data_manager: UserDataManager, pub user_manager: UserManager,
pub vcs_sender: VCsSender, pub vcs_sender: VCsSender,
} }
@@ -62,8 +63,6 @@ where
pub fn all() -> Vec<(&'static Command, BoxedHandler)> { pub fn all() -> Vec<(&'static Command, BoxedHandler)> {
vec![ vec![
(&info::COMMAND, box_handler(info::handle)), (&info::COMMAND, box_handler(info::handle)),
(&join::COMMAND, box_handler(join::handle)),
(&leave::COMMAND, box_handler(leave::handle)),
(&opt_in::COMMAND, box_handler(opt_in::handle)), (&opt_in::COMMAND, box_handler(opt_in::handle)),
(&opt_out::COMMAND, box_handler(opt_out::handle)), (&opt_out::COMMAND, box_handler(opt_out::handle)),
(&render::COMMAND, box_handler(render::handle)), (&render::COMMAND, box_handler(render::handle)),

View File

@@ -54,7 +54,7 @@ pub async fn handle(state: State, interaction: Interaction) {
}; };
let previous_consent = state let previous_consent = state
.user_data_manager .user_manager
.update(user_id, |mut user_data| { .update(user_id, |mut user_data| {
let previous_consent = user_data let previous_consent = user_data
.reborrow() .reborrow()

View File

@@ -54,7 +54,7 @@ pub async fn handle(state: State, interaction: Interaction) {
}; };
let previous_consent = state let previous_consent = state
.user_data_manager .user_manager
.update(user_id, |mut user_data| { .update(user_id, |mut user_data| {
let previous_consent = user_data let previous_consent = user_data
.reborrow() .reborrow()

View File

@@ -1,25 +1,226 @@
use std::sync::LazyLock; use futures::TryStreamExt as _;
use twilight_model::application::{ use snafu::{OptionExt as _, Report, ResultExt as _, Snafu};
command::{Command, CommandType}, use std::{collections::BTreeMap, sync::LazyLock};
interaction::Interaction, use time::{Date, Time, UtcDateTime, format_description::well_known::Rfc3339};
use twilight_model::{
application::{
command::{Command, CommandOption, CommandOptionType, CommandType},
interaction::{Interaction, InteractionData, application_command::CommandOptionValue},
},
channel::{
ChannelType,
message::{Embed, MessageFlags},
},
http::interaction::{InteractionResponse, InteractionResponseType},
id::{Id, marker::ChannelMarker},
};
use twilight_util::builder::{
InteractionResponseDataBuilder, command::CommandBuilder, embed::EmbedBuilder,
}; };
use twilight_util::builder::command::CommandBuilder;
use crate::command::State; use crate::{
command::State,
recording_data::{Clip, Recording, RecordingData},
render_data::{Render, RenderData},
};
const NAME: &str = "render"; const NAME: &str = "render";
const DESCRIPTION: &str = const DESCRIPTION: &str =
"(Only the bot owner can use this) Make an audio file from the specified range of VC"; "(Only the bot owner can use this) Make an audio file from the specified range of VC";
const OPTION_VOICE_CHANNEL: &str = "voice-channel";
const OPTION_START: &str = "start";
const OPTION_END: &str = "end";
const DATE_FORMAT: Rfc3339 = Rfc3339;
pub static COMMAND: LazyLock<Command> = LazyLock::new(|| { pub static COMMAND: LazyLock<Command> = LazyLock::new(|| {
CommandBuilder::new(NAME, DESCRIPTION, CommandType::ChatInput) CommandBuilder::new(NAME, DESCRIPTION, CommandType::ChatInput)
.option(CommandOption {
autocomplete: None,
channel_types: Some(vec![ChannelType::GuildVoice]),
choices: None,
description: "Which voice channel to render a recording from".into(),
description_localizations: None,
kind: CommandOptionType::Channel,
max_length: None,
max_value: None,
min_length: None,
min_value: None,
name: OPTION_VOICE_CHANNEL.into(),
name_localizations: None,
options: None,
required: Some(true),
})
.option(CommandOption {
autocomplete: None,
channel_types: None,
choices: None,
description: "What UTC datetime to start from".into(),
description_localizations: None,
kind: CommandOptionType::String,
max_length: None,
max_value: None,
min_length: None,
min_value: None,
name: OPTION_START.into(),
name_localizations: None,
options: None,
required: Some(true),
})
.option(CommandOption {
autocomplete: None,
channel_types: None,
choices: None,
description: "What UTC datetime to end at".into(),
description_localizations: None,
kind: CommandOptionType::String,
max_length: None,
max_value: None,
min_length: None,
min_value: None,
name: OPTION_END.into(),
name_localizations: None,
options: None,
required: Some(true),
})
.validate() .validate()
.expect("command wasn't correct") .expect("command wasn't correct")
.build() .build()
}); });
#[tracing::instrument] struct Options {
voice_channel_id: Id<ChannelMarker>,
start: UtcDateTime,
end: UtcDateTime,
}
#[derive(Debug, Snafu)]
enum ParseOptionsError {
/// could not get any interaction data
NoInteractionData,
/// this wasn't a command invocation
NotCommandInvocation,
/// a voice channel wasn't selected
NoVoiceChannel,
/// a start time wasn't specified
NoStart,
/// an end time wasn't specified
NoEnd,
/// voice channel was {actual:?} instead of a channel ID
VoiceChannelInvalidType { actual: CommandOptionValue },
/// start was {actual:?} instead of a string
StartInvalidType { actual: CommandOptionValue },
/// end was {actual:?} instead of a string
EndInvalidType { actual: CommandOptionValue },
/// could not parse `start` as a date in RFC3339 format
StartInvalidDate { source: time::error::Parse },
/// could not parse `start` as a date in RFC3339 format
EndInvalidDate { source: time::error::Parse },
}
impl From<ParseOptionsError> for Embed {
fn from(error: ParseOptionsError) -> Self {
EmbedBuilder::new()
.title("Error parsing options")
.description(Report::from_error(error).to_string())
.validate()
.unwrap()
.build()
}
}
fn parse_options(interaction: &Interaction) -> Result<Options, ParseOptionsError> {
let interaction_data = interaction.data.as_ref().context(NoInteractionDataSnafu)?;
let InteractionData::ApplicationCommand(command_data) = interaction_data else {
return Err(ParseOptionsError::NotCommandInvocation);
};
let mut options: BTreeMap<&str, &CommandOptionValue> = command_data
.options
.iter()
.map(|command_data_option| {
(
command_data_option.name.as_str(),
&command_data_option.value,
)
})
.collect();
let voice_channel_id = options
.remove(OPTION_VOICE_CHANNEL)
.context(NoVoiceChannelSnafu)?;
let start = options.remove(OPTION_START).context(NoStartSnafu)?;
let end = options.remove(OPTION_END).context(NoEndSnafu)?;
let &CommandOptionValue::Channel(voice_channel_id) = voice_channel_id else {
return Err(ParseOptionsError::VoiceChannelInvalidType {
actual: voice_channel_id.clone(),
});
};
let &CommandOptionValue::String(ref start) = start else {
return Err(ParseOptionsError::StartInvalidType {
actual: start.clone(),
});
};
let &CommandOptionValue::String(ref end) = end else {
return Err(ParseOptionsError::StartInvalidType {
actual: end.clone(),
});
};
let start = UtcDateTime::parse(start, &DATE_FORMAT).context(StartInvalidDateSnafu)?;
let end = UtcDateTime::parse(end, &DATE_FORMAT).context(EndInvalidDateSnafu)?;
Ok(Options {
voice_channel_id,
start,
end,
})
}
#[tracing::instrument(skip(state, interaction))]
pub async fn handle(state: State, interaction: Interaction) { pub async fn handle(state: State, interaction: Interaction) {
let Some(guild_id) = interaction.guild_id else {
state
.discord_client
.interaction(state.discord_application_id)
.create_response(
interaction.id,
&interaction.token,
&InteractionResponse {
kind: InteractionResponseType::ChannelMessageWithSource,
data: Some(
InteractionResponseDataBuilder::new()
.embeds([EmbedBuilder::new()
.title("Not in a server")
.description(
"This command can only work when used in a Discord server.",
)
.validate()
.unwrap()
.build()])
.flags(MessageFlags::EPHEMERAL)
.build(),
),
},
)
.await
.expect("TODO");
return;
};
let bot_owner_user_id = state.discord_bot_owner_user_id; let bot_owner_user_id = state.discord_bot_owner_user_id;
let is_bot_owner = interaction let is_bot_owner = interaction
@@ -29,5 +230,186 @@ pub async fn handle(state: State, interaction: Interaction) {
.map(|user_id| user_id == bot_owner_user_id) .map(|user_id| user_id == bot_owner_user_id)
.unwrap_or(false); .unwrap_or(false);
if !is_bot_owner {
state
.discord_client
.interaction(state.discord_application_id)
.create_response(
interaction.id,
&interaction.token,
&InteractionResponse {
kind: InteractionResponseType::ChannelMessageWithSource,
data: Some(
InteractionResponseDataBuilder::new()
.embeds([EmbedBuilder::new()
.title("No permission")
.description("Only the bot owner can use this command.")
.validate()
.unwrap()
.build()])
.flags(MessageFlags::EPHEMERAL)
.build(),
),
},
)
.await
.expect("TODO");
return;
}
let Options {
voice_channel_id,
start,
end,
} = match parse_options(&interaction) {
Ok(options) => options,
Err(error) => {
state
.discord_client
.interaction(state.discord_application_id)
.create_response(
interaction.id,
&interaction.token,
&InteractionResponse {
kind: InteractionResponseType::ChannelMessageWithSource,
data: Some(
InteractionResponseDataBuilder::new()
.embeds([error.into()])
.flags(MessageFlags::EPHEMERAL)
.build(),
),
},
)
.await
.expect("TODO");
return;
}
};
let duration = end - start;
tracing::info!(?voice_channel_id, ?start, ?end, ?duration);
let channels = state.audio_channels.into();
let sample_rate = state.audio_sample_rate.into();
let total_samples = (duration.whole_seconds() as u64 * sample_rate as u64)
+ (duration.subsec_microseconds() as u64 * sample_rate as u64 / 1_000_000);
let mut composite = vec![0; total_samples as usize];
let mut recordings =
state
.recording_manager
.between_in_vc(start, end, guild_id, voice_channel_id);
let mut recordings_used = 0;
while let Some(recording) = recordings.try_next().await.expect("TODO") {
let recording_data = state
.recording_manager
.read(&recording)
.await
.expect("TODO");
let RecordingData {
channels,
sample_rate,
samples,
} = recording_data;
let Recording {
year,
month,
day,
hour,
minute,
clip,
} = recording;
let Clip {
second,
microsecond,
guild,
voice_channel,
user,
} = clip;
let date = Date::from_calendar_date(year, month, day).unwrap();
let time = Time::from_hms_micro(hour, minute, second, microsecond).unwrap();
let datetime = UtcDateTime::new(date, time);
let after_start = datetime - start;
let progress_by_time = after_start / duration;
let origin = (after_start.whole_seconds() as u64 * sample_rate as u64)
+ (after_start.subsec_microseconds() as u64 * sample_rate as u64 / 1_000_000);
let origin = origin as usize;
let progress_by_sample = (origin as f64) / (total_samples as f64);
let samples_length = samples.len();
let extent = origin + samples_length;
tracing::debug!(
progress_by_time,
progress_by_sample,
?after_start,
?duration,
origin,
total_samples,
samples_length,
extent
);
for (i, sample) in samples.into_iter().enumerate() {
if let Some(composite_sample) = composite.get_mut(origin + i) {
*composite_sample += sample;
} else {
tracing::error!(
?start,
?end,
?year,
?month,
?day,
?hour,
?minute,
?second,
?microsecond,
origin,
samples_length,
extent,
i,
total_samples,
"out of range"
);
}
}
recordings_used += 1;
}
tracing::debug!(?recordings_used);
let render = Render {
start,
end,
guild_id,
voice_channel_id,
};
let render_data = RenderData {
channels,
sample_rate,
samples: composite,
};
let render_result = state.render_manager.write(&render, render_data).await;
tracing::info!(?render_result);
render_result.expect("TODO");
tracing::info!(%render, "written");
todo!(); todo!();
} }

View File

@@ -12,7 +12,7 @@ use twilight_model::id::{
use twilight_util::builder::embed::EmbedBuilder; use twilight_util::builder::embed::EmbedBuilder;
use crate::{ use crate::{
BotDataManager, OneToManyUniqueBTreeMap, State, UserInVCData, bot_data, BotManager, OneToManyUniqueBTreeMap, State, UserInVCData, bot_data,
call::join_and_record, call::join_and_record,
track_vcs::VCsInGuild, track_vcs::VCsInGuild,
vc_user::{Camera, Headphone, Microphone, Stream}, vc_user::{Camera, Headphone, Microphone, Stream},
@@ -30,18 +30,22 @@ pub async fn heat_seek(state: State) {
let mut vcs_in_guild_senders = BTreeMap::default(); let mut vcs_in_guild_senders = BTreeMap::default();
loop { loop {
{
for (&guild_id, vcs_in_guild) in &*vcs_watcher.borrow() { for (&guild_id, vcs_in_guild) in &*vcs_watcher.borrow() {
let vcs_in_guild_sender = vcs_in_guild_senders.entry(guild_id).or_insert_with(|| { let vcs_in_guild_sender =
vcs_in_guild_senders.entry(guild_id).or_insert_with(|| {
let (vcs_in_guild_sender, vcs_in_guild_watcher) = let (vcs_in_guild_sender, vcs_in_guild_watcher) =
watch::channel(Default::default()); watch::channel(Default::default());
let (channel_heat_sender, channel_heat_watcher) = let (channel_heat_sender, channel_heat_watcher) =
watch::channel(Default::default()); watch::channel(Default::default());
let (heat_map_sender, heat_map_watcher) = watch::channel(Default::default()); let (heat_map_sender, heat_map_watcher) =
let (hottest_vc_sender, hottest_vc_watcher) = watch::channel(Default::default()); watch::channel(Default::default());
let (hottest_vc_sender, hottest_vc_watcher) =
watch::channel(Default::default());
tokio::spawn( tokio::spawn(
evaluate_heat() evaluate_heat()
.bot_data_manager(state.bot_data_manager.clone()) .bot_manager(state.bot_manager.clone())
.bot_owner_user_id(state.discord_bot_owner_user_id) .bot_owner_user_id(state.discord_bot_owner_user_id)
.bot_user_id(state.discord_user_id) .bot_user_id(state.discord_user_id)
.cancellation_token(state.cancellation_token.clone()) .cancellation_token(state.cancellation_token.clone())
@@ -73,6 +77,7 @@ pub async fn heat_seek(state: State) {
}); });
vcs_in_guild_sender.send_replace(Arc::new(vcs_in_guild.clone())); vcs_in_guild_sender.send_replace(Arc::new(vcs_in_guild.clone()));
} }
}
if matches!( if matches!(
vcs_watcher vcs_watcher
@@ -110,9 +115,9 @@ async fn get_heat(
users_in_vc: &BTreeMap<Id<UserMarker>, UserInVCData>, users_in_vc: &BTreeMap<Id<UserMarker>, UserInVCData>,
bot_user_id: Id<UserMarker>, bot_user_id: Id<UserMarker>,
bot_owner_user_id: Id<UserMarker>, bot_owner_user_id: Id<UserMarker>,
bot_data_manager: &BotDataManager, bot_manager: &BotManager,
) -> Result<Heat, GetHeatError> { ) -> Result<Heat, GetHeatError> {
let heat_script = bot_data_manager let heat_script = bot_manager
.with(|bot_data| { .with(|bot_data| {
bot_data.has_heat_script().then(|| { bot_data.has_heat_script().then(|| {
bot_data bot_data
@@ -169,7 +174,8 @@ async fn get_heat(
} }
} }
let bot_owner_might_be_listening = bot_owner.is_some_and(|user_data| matches!(user_data.headphone, Headphone::Undeafened)); let bot_owner_might_be_listening =
bot_owner.is_some_and(|user_data| matches!(user_data.headphone, Headphone::Undeafened));
if bot_owner_might_be_listening { if bot_owner_might_be_listening {
heat = heat.min(999); heat = heat.min(999);
@@ -184,7 +190,7 @@ async fn get_heat(
#[bon::builder] #[bon::builder]
#[tracing::instrument(skip(vcs_in_guild_watcher, channel_heat_sender))] #[tracing::instrument(skip(vcs_in_guild_watcher, channel_heat_sender))]
async fn evaluate_heat( async fn evaluate_heat(
bot_data_manager: BotDataManager, bot_manager: BotManager,
bot_owner_user_id: Id<UserMarker>, bot_owner_user_id: Id<UserMarker>,
bot_user_id: Id<UserMarker>, bot_user_id: Id<UserMarker>,
cancellation_token: CancellationToken, cancellation_token: CancellationToken,
@@ -198,12 +204,12 @@ async fn evaluate_heat(
let channel_heat_results: BTreeMap<_, _> = { let channel_heat_results: BTreeMap<_, _> = {
FuturesUnordered::from_iter((&*vcs_in_guild).into_iter().map( FuturesUnordered::from_iter((&*vcs_in_guild).into_iter().map(
|(&channel_id, users_in_vc)| { |(&channel_id, users_in_vc)| {
let bot_data_manager = bot_data_manager.clone(); let bot_manager = bot_manager.clone();
async move { async move {
( (
channel_id, channel_id,
get_heat() get_heat()
.bot_data_manager(&bot_data_manager) .bot_manager(&bot_manager)
.bot_owner_user_id(bot_owner_user_id) .bot_owner_user_id(bot_owner_user_id)
.bot_user_id(bot_user_id) .bot_user_id(bot_user_id)
.users_in_vc(users_in_vc) .users_in_vc(users_in_vc)
@@ -322,12 +328,12 @@ async fn follow_hottest_vc(
match hottest_vc_option { match hottest_vc_option {
Some(hottest_vc) => { Some(hottest_vc) => {
match join_and_record() match join_and_record()
.audio_channels(state.audio_channels) .audio_channels(state.audio_channels.into())
.audio_sample_rate(state.audio_sample_rate) .audio_sample_rate(state.audio_sample_rate.into())
.guild_id(guild_id) .guild_id(guild_id)
.recording_data(state.recording_data.clone()) .recording_manager(state.recording_manager.clone())
.songbird(&state.songbird) .songbird(&state.songbird)
.user_data_manager(state.user_data_manager.clone()) .user_manager(state.user_manager.clone())
.voice_channel_id(hottest_vc) .voice_channel_id(hottest_vc)
.call() .call()
.await .await
@@ -337,7 +343,7 @@ async fn follow_hottest_vc(
.discord_voice_channel_corresponding_text_channel .discord_voice_channel_corresponding_text_channel
.get(&guild_id) .get(&guild_id)
.and_then(|guild_mappings| { .and_then(|guild_mappings| {
guild_mappings.get_left_for(&hottest_vc).copied() guild_mappings.get_right_for(&hottest_vc).copied()
}) })
.unwrap_or(hottest_vc); .unwrap_or(hottest_vc);

View File

@@ -1,3 +1,5 @@
mod audio_channels;
mod audio_sample_rate;
mod bot_data; mod bot_data;
mod call; mod call;
pub mod command; pub mod command;
@@ -7,6 +9,8 @@ mod one_to_many_with_data;
mod one_to_one; mod one_to_one;
mod operator_ext; mod operator_ext;
mod option_ext; mod option_ext;
mod recording_data;
mod render_data;
mod storage; mod storage;
mod track_vcs; mod track_vcs;
mod user_data; mod user_data;
@@ -15,14 +19,18 @@ capnp::generated_code!(mod bot_capnp);
capnp::generated_code!(mod user_capnp); capnp::generated_code!(mod user_capnp);
shadow_rs::shadow!(build_info); shadow_rs::shadow!(build_info);
pub use bot_data::BotDataManager; pub use audio_channels::AudioChannels;
pub use audio_sample_rate::AudioSampleRate;
pub use bot_data::BotManager;
pub use command::{Router as CommandRouter, State, all as all_commands}; pub use command::{Router as CommandRouter, State, all as all_commands};
pub use heat_seek::heat_seek; pub use heat_seek::heat_seek;
pub use one_to_many::OneToManyUniqueBTreeMap; pub use one_to_many::OneToManyUniqueBTreeMap;
pub use one_to_many_with_data::OneToManyUniqueBTreeMapWithData; pub use one_to_many_with_data::OneToManyUniqueBTreeMapWithData;
pub use one_to_one::OneToOneBTreeMap; pub use one_to_one::OneToOneBTreeMap;
pub use operator_ext::OperatorExt; pub use operator_ext::OperatorExt;
pub use recording_data::RecordingManager;
pub use render_data::RenderManager;
pub use storage::Storage; pub use storage::Storage;
pub use track_vcs::{GuildVoiceChannelToTextChannel, VCs, VCsSender, initialize_vcs, update_vcs}; pub use track_vcs::{GuildVoiceChannelToTextChannel, VCs, VCsSender, initialize_vcs, update_vcs};
pub use user_data::UserDataManager; pub use user_data::UserManager;
pub use vc_user::{UserInVCData, VoiceStatus}; pub use vc_user::{UserInVCData, VoiceStatus};

View File

@@ -1,17 +1,22 @@
use clap::Parser; use clap::Parser;
use fomo_reducer::{ use fomo_reducer::{
BotDataManager, CommandRouter, GuildVoiceChannelToTextChannel, State, Storage, UserDataManager, AudioChannels, AudioSampleRate, BotManager, CommandRouter, GuildVoiceChannelToTextChannel,
VCsSender, all_commands, command, heat_seek, initialize_vcs, update_vcs, RecordingManager, RenderManager, State, Storage, UserManager, VCsSender, all_commands, command,
heat_seek, initialize_vcs, update_vcs,
}; };
use futures::{StreamExt as _, stream::FuturesUnordered};
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
use secrecy::{ExposeSecret, SecretString}; use secrecy::{ExposeSecret, SecretString};
use snafu::{OptionExt, ResultExt, Snafu}; use snafu::{OptionExt, ResultExt, Snafu};
use songbird::{ use songbird::{Config, Songbird, driver::DecodeConfig, shards::TwilightMap};
Config, Songbird, use std::{
driver::{Channels, DecodeConfig, SampleRate}, collections::BTreeMap,
shards::TwilightMap, fmt::{Debug, Display},
num::NonZero,
str::FromStr,
sync::Arc,
time::Duration,
}; };
use std::{collections::BTreeMap, fmt::Debug, str::FromStr, sync::Arc, time::Duration};
use strum::EnumString;
use tokio::{select, signal::ctrl_c, task::JoinSet}; use tokio::{select, signal::ctrl_c, task::JoinSet};
use tokio_util::{sync::CancellationToken, time::FutureExt as _}; use tokio_util::{sync::CancellationToken, time::FutureExt as _};
use tracing::Level; use tracing::Level;
@@ -19,7 +24,7 @@ use tracing_subscriber::{
EnvFilter, EnvFilter,
fmt::{format::FmtSpan, writer::MakeWriterExt}, fmt::{format::FmtSpan, writer::MakeWriterExt},
}; };
use twilight_gateway::{Event, EventTypeFlags, Intents, Shard, StreamExt}; use twilight_gateway::{Event, EventTypeFlags, Intents, Shard, StreamExt as _};
use twilight_model::{ use twilight_model::{
application::interaction::InteractionData, application::interaction::InteractionData,
gateway::{ gateway::{
@@ -32,47 +37,6 @@ use twilight_model::{
}, },
}; };
#[derive(Clone, Copy, Debug, strum::Display, EnumString)]
enum AudioChannels {
Mono,
Stereo,
}
impl From<AudioChannels> for Channels {
fn from(value: AudioChannels) -> Self {
match value {
AudioChannels::Mono => Channels::Mono,
AudioChannels::Stereo => Channels::Stereo,
}
}
}
#[derive(Clone, Copy, Debug, strum::Display, EnumString)]
enum AudioSampleRate {
#[strum(serialize = "8000Hz")]
Hz8000,
#[strum(serialize = "12000Hz")]
Hz12000,
#[strum(serialize = "16000Hz")]
Hz16000,
#[strum(serialize = "24000Hz")]
Hz24000,
#[strum(serialize = "48000Hz")]
Hz48000,
}
impl From<AudioSampleRate> for SampleRate {
fn from(value: AudioSampleRate) -> Self {
match value {
AudioSampleRate::Hz8000 => SampleRate::Hz8000,
AudioSampleRate::Hz12000 => SampleRate::Hz12000,
AudioSampleRate::Hz16000 => SampleRate::Hz16000,
AudioSampleRate::Hz24000 => SampleRate::Hz24000,
AudioSampleRate::Hz48000 => SampleRate::Hz48000,
}
}
}
#[derive(Debug, Snafu)] #[derive(Debug, Snafu)]
enum ParseGuildVCToTextChannelError { enum ParseGuildVCToTextChannelError {
/// the guild ID needs to be included with : before the voice channel to text channel mapping /// the guild ID needs to be included with : before the voice channel to text channel mapping
@@ -113,6 +77,29 @@ fn parse_guild_vc_to_text_channel(
Ok((guild, voice_channel, text_channel)) Ok((guild, voice_channel, text_channel))
} }
#[derive(Clone)]
struct HumanDuration(Duration);
impl FromStr for HumanDuration {
type Err = humantime::DurationError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
humantime::parse_duration(s).map(Self)
}
}
impl Debug for HumanDuration {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.0.fmt(f)
}
}
impl Display for HumanDuration {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", humantime::format_duration(self.0))
}
}
#[derive(Debug, Parser)] #[derive(Debug, Parser)]
struct AppArgs { struct AppArgs {
#[arg(long, env)] #[arg(long, env)]
@@ -125,7 +112,7 @@ struct AppArgs {
discord_nickname: Option<Arc<str>>, discord_nickname: Option<Arc<str>>,
#[arg(long, env)] #[arg(long, env)]
discord_status: Option<Arc<str>>, discord_status: Option<String>,
#[arg(long, env, value_parser = parse_guild_vc_to_text_channel)] #[arg(long, env, value_parser = parse_guild_vc_to_text_channel)]
discord_voice_channel_corresponding_text_channel: discord_voice_channel_corresponding_text_channel:
@@ -134,7 +121,7 @@ struct AppArgs {
#[arg(long, env, default_value_t = AudioChannels::Mono)] #[arg(long, env, default_value_t = AudioChannels::Mono)]
audio_channels: AudioChannels, audio_channels: AudioChannels,
#[arg(long, env, default_value_t = AudioSampleRate::Hz12000)] #[arg(long, env, default_value_t = AudioSampleRate::Hz24000)]
audio_sample_rate: AudioSampleRate, audio_sample_rate: AudioSampleRate,
#[arg(long, env)] #[arg(long, env)]
@@ -145,6 +132,18 @@ struct AppArgs {
#[arg(long, env)] #[arg(long, env)]
recording_data: Storage, recording_data: Storage,
#[arg(long, env)]
render_data: Storage,
#[arg(long, env, default_value_t = HumanDuration(Duration::from_secs(120)))]
watchdog_warmup: HumanDuration,
#[arg(long, env, default_value_t = HumanDuration(Duration::from_secs(5)))]
watchdog_frequency: HumanDuration,
#[arg(long, env, default_value_t = 8.try_into().unwrap())]
watchdog_channel_size: NonZero<usize>,
} }
#[derive(Parser)] #[derive(Parser)]
@@ -207,6 +206,10 @@ async fn main() -> Result<(), MainError> {
bot_data, bot_data,
user_data, user_data,
recording_data, recording_data,
render_data,
watchdog_warmup: HumanDuration(watchdog_warmup),
watchdog_frequency: HumanDuration(watchdog_frequency),
watchdog_channel_size,
} = app_args; } = app_args;
let cancellation_token = CancellationToken::new(); let cancellation_token = CancellationToken::new();
@@ -257,35 +260,6 @@ async fn main() -> Result<(), MainError> {
let discord_application_id = current_application.id; let discord_application_id = current_application.id;
let intents = Intents::GUILD_VOICE_STATES;
let config = twilight_gateway::Config::new(discord_token.expose_secret().to_owned(), intents);
let shards = twilight_gateway::create_recommended(&discord_client, config, |_id, builder| {
builder.build()
})
.await
.expect("TODO");
let shards = Vec::from_iter(shards);
let senders = TwilightMap::new(
shards
.iter()
.map(|shard| (shard.id().number(), shard.sender()))
.collect(),
);
let audio_channels = audio_channels.into();
let audio_sample_rate = audio_sample_rate.into();
let senders = Arc::new(senders);
let songbird = Songbird::twilight(senders, discord_user_id);
songbird.set_config(
Config::default().decode_mode(songbird::driver::DecodeMode::Decode(DecodeConfig::new(
audio_channels,
audio_sample_rate,
))),
);
let interaction_client = discord_client.interaction(discord_application_id); let interaction_client = discord_client.interaction(discord_application_id);
let commands = all_commands(); let commands = all_commands();
@@ -325,25 +299,25 @@ async fn main() -> Result<(), MainError> {
let discord_opt_in_command_id = discord_opt_in_command.id.expect("TODO"); let discord_opt_in_command_id = discord_opt_in_command.id.expect("TODO");
let discord_opt_out_command_id = discord_opt_out_command.id.expect("TODO"); let discord_opt_out_command_id = discord_opt_out_command.id.expect("TODO");
let discord_info_command_name = discord_info_command.name.into(); let discord_info_command_name: Arc<str> = discord_info_command.name.into();
let discord_opt_in_command_name = discord_opt_in_command.name.into(); let discord_opt_in_command_name: Arc<str> = discord_opt_in_command.name.into();
let discord_opt_out_command_name = discord_opt_out_command.name.into(); let discord_opt_out_command_name: Arc<str> = discord_opt_out_command.name.into();
let vcs = initialize_vcs(&discord_client).await;
let command_router = CommandRouter::from_iter(commands); let command_router = CommandRouter::from_iter(commands);
let command_router = Arc::new(command_router); let command_router = Arc::new(command_router);
let discord_client = Arc::new(discord_client); let discord_client = Arc::new(discord_client);
let songbird = Arc::new(songbird); let vcs_sender = VCsSender::new(Default::default());
let vcs_sender = VCsSender::new(vcs);
let bot_data = bot_data.into_inner(); let bot_data = bot_data.into_inner();
let recording_data = recording_data.into_inner(); let recording_data = recording_data.into_inner();
let render_data = render_data.into_inner();
let user_data = user_data.into_inner(); let user_data = user_data.into_inner();
let bot_data_manager = BotDataManager::new(bot_data); let bot_manager = BotManager::new(bot_data);
let user_data_manager = UserDataManager::new(user_data); let recording_manager = RecordingManager::new(recording_data);
let render_manager = RenderManager::new(render_data);
let user_manager = UserManager::new(user_data);
let discord_voice_channel_corresponding_text_channel = { let discord_voice_channel_corresponding_text_channel = {
let mut map = GuildVoiceChannelToTextChannel::default(); let mut map = GuildVoiceChannelToTextChannel::default();
@@ -361,38 +335,117 @@ async fn main() -> Result<(), MainError> {
let discord_voice_channel_corresponding_text_channel = let discord_voice_channel_corresponding_text_channel =
Arc::new(discord_voice_channel_corresponding_text_channel); Arc::new(discord_voice_channel_corresponding_text_channel);
let state = State { tokio::spawn({
audio_channels, let cancellation_token = cancellation_token.clone();
audio_sample_rate, async move {
bot_data_manager, match ctrl_c().await {
cancellation_token: cancellation_token.clone(), Ok(()) => cancellation_token.cancel(),
discord_application_id, Err(error) => tracing::error!(?error, "failed to listen for interrupt signal"),
discord_bot_owner_user_id, }
discord_client, }
discord_info_command_id, });
discord_info_command_name,
discord_opt_in_command_id,
discord_opt_in_command_name,
discord_opt_out_command_id,
discord_opt_out_command_name,
discord_user_id,
discord_voice_channel_corresponding_text_channel,
recording_data,
songbird,
user_data_manager,
vcs_sender,
};
let heat_seeking = tokio::spawn(heat_seek(state.clone())); let (mut watchdog_tx, mut watchdog_rx) =
futures::channel::mpsc::channel(watchdog_channel_size.get());
if let Some(discord_status) = discord_status { std::thread::spawn({
let discord_voice_channel_corresponding_text_channel =
discord_voice_channel_corresponding_text_channel.clone();
let discord_client = discord_client.clone();
let vcs_watcher = vcs_sender.subscribe();
move || {
std::thread::sleep(watchdog_warmup);
loop {
tracing::debug!("waiting to send check-in");
if watchdog_tx.try_send(()).is_err() {
tracing::error!("tokio runtime deadlocked");
vcs_watcher
.borrow()
.par_iter()
.for_each(|(&guild_id, vcs_in_guild)| {
if let Some(&voice_channel_id) =
vcs_in_guild.get_left_for(&discord_user_id)
{
let text_channel_id =
discord_voice_channel_corresponding_text_channel
.get(&guild_id)
.and_then(|guild_mappings| {
guild_mappings.get_right_for(&voice_channel_id).copied()
})
.unwrap_or(voice_channel_id);
tokio::runtime::Runtime::new().unwrap().block_on(discord_client.create_message(text_channel_id).content("so sorry I died, I'm in purgatory now, I don't like it here.\nbut I will be back in 5-20 minutes (even if it says I'm still there, I'm not currently recording and will be disconnected soon before later reconnecting and announcing recording again)").into_future());
}
});
std::process::exit(1);
}
std::thread::sleep(watchdog_frequency);
}
}
});
tokio::spawn(async move {
tokio::time::sleep(watchdog_warmup).await;
loop {
tracing::debug!("waiting to acknowledge the watchdog");
if watchdog_rx.recv().await.is_err() {
tracing::error!("watchdog died (this should be impossible)");
std::process::exit(1);
}
}
});
loop {
tokio::spawn({
let vcs_sender = vcs_sender.clone();
let discord_client = discord_client.clone();
async move { initialize_vcs(&vcs_sender, &discord_client).await }
});
let intents = Intents::GUILD_VOICE_STATES;
let config =
twilight_gateway::Config::new(discord_token.expose_secret().to_owned(), intents);
let shards =
twilight_gateway::create_recommended(&discord_client, config, |_id, builder| {
builder.build()
})
.await
.expect("TODO");
let shards = Vec::from_iter(shards);
let senders = TwilightMap::new(
shards
.iter()
.map(|shard| (shard.id().number(), shard.sender()))
.collect(),
);
let senders = Arc::new(senders);
let songbird = Songbird::twilight(senders, discord_user_id);
songbird.set_config(
Config::default().decode_mode(songbird::driver::DecodeMode::Decode(DecodeConfig::new(
audio_channels.into(),
audio_sample_rate.into(),
))),
);
if let Some(discord_status) = &discord_status {
shards.iter().for_each(|shard| { shards.iter().for_each(|shard| {
shard.command( shard.command(
&UpdatePresence::new( &UpdatePresence::new(
vec![ vec![
MinimalActivity { MinimalActivity {
kind: ActivityType::Listening, kind: ActivityType::Listening,
name: (*discord_status).to_owned(), name: discord_status.clone(),
url: None, url: None,
} }
.into(), .into(),
@@ -406,48 +459,82 @@ async fn main() -> Result<(), MainError> {
}); });
} }
let songbird = Arc::new(songbird);
let state = State {
audio_channels,
audio_sample_rate,
bot_manager: bot_manager.clone(),
cancellation_token: cancellation_token.clone(),
discord_application_id,
discord_bot_owner_user_id,
discord_client: discord_client.clone(),
discord_info_command_id,
discord_info_command_name: discord_info_command_name.clone(),
discord_opt_in_command_id,
discord_opt_in_command_name: discord_opt_in_command_name.clone(),
discord_opt_out_command_id,
discord_opt_out_command_name: discord_opt_out_command_name.clone(),
discord_user_id,
discord_voice_channel_corresponding_text_channel:
discord_voice_channel_corresponding_text_channel.clone(),
recording_manager: recording_manager.clone(),
render_manager: render_manager.clone(),
songbird,
user_manager: user_manager.clone(),
vcs_sender: vcs_sender.clone(),
};
let mut heat_seeking = tokio::spawn(heat_seek(state.clone()));
let run_shards = shards let run_shards = shards
.into_iter() .into_iter()
.map(|shard| handle_events(command_router.clone(), state.clone(), shard)); .map(|shard| handle_events(command_router.clone(), state.clone(), shard));
let run_shards = JoinSet::from_iter(run_shards); let mut run_shards = JoinSet::from_iter(run_shards);
let run_shards = run_shards.join_all();
tokio::spawn({
let cancellation_token = cancellation_token.clone();
async move {
match ctrl_c().await {
Ok(()) => cancellation_token.cancel(),
Err(error) => tracing::error!(?error, "failed to listen for interrupt signal"),
}
}
});
tokio::spawn(async {
let duration = Duration::from_secs(120);
let mut interval = tokio::time::interval(duration);
loop {
interval.tick().await;
tracing::debug!("this process is still alive");
}
});
let finished_naturally = async move {
heat_seeking.await.unwrap();
run_shards.await;
};
tokio::pin!(finished_naturally);
select! { select! {
_ = &mut finished_naturally => { _heat_seeking_exited = &mut heat_seeking => {
Ok(()) tracing::warn!("heat seeking exited, which shouldn't happen. let's try again");
continue;
}
_first_shard_exited = run_shards.join_next() => {
tracing::warn!("a shard exited when it's not supposed to, let's try reconnecting them all");
heat_seeking.abort();
continue;
} }
() = cancellation_token.cancelled() => { () = cancellation_token.cancelled() => {
tracing::warn!("waiting for tasks to gracefully shut down"); tracing::warn!("gracefully shutting down");
finished_naturally.await;
Err(MainError::Cancelled) FuturesUnordered::from_iter(
vcs_sender
.borrow()
.iter()
.map(|(&guild_id, vcs_in_guild)| {
let discord_client = discord_client.clone();
let discord_voice_channel_corresponding_text_channel = discord_voice_channel_corresponding_text_channel.clone();
async move {
if let Some(&voice_channel_id) =
vcs_in_guild.get_left_for(&discord_user_id)
{
let text_channel_id =
discord_voice_channel_corresponding_text_channel
.get(&guild_id)
.and_then(|guild_mappings| {
guild_mappings.get_right_for(&voice_channel_id).copied()
})
.unwrap_or(voice_channel_id);
let _ = discord_client.create_message(text_channel_id).content("probably about to be updated, back in 5-20 minutes").await;
}
}
})
).collect::<()>().await;
heat_seeking.await.unwrap();
run_shards.join_all().await;
return Err(MainError::Cancelled);
}
} }
} }
} }

View File

@@ -3,18 +3,24 @@ use opendal::{Buffer, Error, ErrorKind, FuturesAsyncReader, Operator};
#[extension(pub trait OperatorExt)] #[extension(pub trait OperatorExt)]
impl Operator { impl Operator {
async fn read_if_exists(&self, path: &str) -> Result<Option<Buffer>, Error> { fn read_if_exists(
&self,
path: &str,
) -> impl Future<Output = Result<Option<Buffer>, Error>> + Send {
async {
match self.read(path).await { match self.read(path).await {
Ok(buffer) => Ok(Some(buffer)), Ok(buffer) => Ok(Some(buffer)),
Err(error) if matches!(error.kind(), ErrorKind::NotFound) => Ok(None), Err(error) if matches!(error.kind(), ErrorKind::NotFound) => Ok(None),
Err(error) => Err(error), Err(error) => Err(error),
} }
} }
}
async fn async_reader_if_exists( fn async_reader_if_exists(
&self, &self,
path: &str, path: &str,
) -> Result<Option<FuturesAsyncReader>, Error> { ) -> impl Future<Output = Result<Option<FuturesAsyncReader>, Error>> + Send {
async {
let reader = self.reader(path).await?; let reader = self.reader(path).await?;
match reader.into_futures_async_read(..).await { match reader.into_futures_async_read(..).await {
Ok(reader) => Ok(Some(reader)), Ok(reader) => Ok(Some(reader)),
@@ -23,3 +29,4 @@ impl Operator {
} }
} }
} }
}

View File

@@ -0,0 +1,196 @@
use futures::{SinkExt, StreamExt as _, TryStream, TryStreamExt as _};
use snafu::Snafu;
use time::{Month, UtcDateTime};
use twilight_model::id::{
Id,
marker::{ChannelMarker, GuildMarker},
};
use super::{ClipEntryError, ListError, Recording, RecordingManager};
const BUFFER_SIZE: usize = 2048;
#[derive(Debug, Snafu)]
pub enum RecordingEntryError {
/// could not list (at least some) clips
ListClipsError { source: ListError },
/// could not receive this clip entry
ClipEntryError { source: ClipEntryError },
}
impl RecordingManager {
pub fn between(
&self,
start: UtcDateTime,
end: UtcDateTime,
) -> impl TryStream<Ok = Recording, Error = RecordingEntryError> + Unpin {
let this = self.clone();
let (mut sink, stream) = futures::channel::mpsc::channel(BUFFER_SIZE);
tokio::spawn(async move {
let year_start = start.year();
let year_end = end.year();
let years = year_start..=year_end;
for year in years {
let mut month_start = start.month();
let mut month_end = end.month();
if year > year_start {
month_start = Month::January;
}
if year < year_end {
month_end = Month::December;
}
let months = month_start as u8..=month_end as u8;
let months = months.map(|month| Month::try_from(month).unwrap());
for month in months {
let mut day_start = start.day();
let mut day_end = end.day();
if month > month_start {
day_start = 1;
}
if month < month_end {
day_end = 31;
}
let days = day_start..=day_end;
for day in days {
let mut hour_start = start.hour();
let mut hour_end = end.hour();
if day > day_start {
hour_start = 0;
}
if day < day_end {
hour_end = 23;
}
let hours = hour_start..=hour_end;
for hour in hours {
let mut minute_start = start.minute();
let mut minute_end = end.minute();
if hour > hour_start {
minute_start = 0;
}
if hour < hour_end {
minute_end = 59;
}
let minutes = minute_start..=minute_end;
for minute in minutes {
match this.clips(year, month, day, hour, minute).await {
Err(list_error) => {
let _ = sink
.send(Err(RecordingEntryError::ListClipsError {
source: list_error,
}))
.await;
}
Ok(clips) => {
let mut clips = clips.into_stream();
while let Some(clip_result) = clips.next().await {
match clip_result {
Err(entry_error) => {
let _ = sink
.send(Err(
RecordingEntryError::ClipEntryError {
source: entry_error,
},
))
.await;
}
Ok(clip) => {
let mut second_start = start.second();
let mut second_end = end.second();
if minute > minute_start {
second_start = 0;
}
if minute < minute_end {
second_end = 59;
}
let seconds = second_start..=second_end;
let second = clip.second;
if !seconds.contains(&second) {
continue;
}
let mut microsecond_start = start.microsecond();
let mut microsecond_end = end.microsecond();
if second > second_start {
microsecond_start = 0;
}
if second < second_end {
microsecond_end = 999_999;
}
let microseconds =
microsecond_start..=microsecond_end;
let microsecond = clip.microsecond;
if !microseconds.contains(&microsecond) {
continue;
}
let recording = Recording {
year,
month,
day,
hour,
minute,
clip,
};
let _ = sink.send(Ok(recording)).await;
}
}
}
}
}
}
}
}
}
}
});
stream
}
}
impl RecordingManager {
pub fn between_in_vc(
&self,
start: UtcDateTime,
end: UtcDateTime,
guild_id: Id<GuildMarker>,
voice_channel_id: Id<ChannelMarker>,
) -> impl TryStream<Ok = Recording, Error = RecordingEntryError> + Unpin {
self.between(start, end).try_filter(move |recording| {
std::future::ready(
recording.clip.guild == guild_id
&& recording.clip.voice_channel == voice_channel_id,
)
})
}
}

110
src/recording_data/clip.rs Normal file
View File

@@ -0,0 +1,110 @@
use futures::{TryStream, TryStreamExt as _};
use snafu::{ResultExt as _, Snafu};
use std::{fmt::Display, str::FromStr};
use super::{
CreateListerSnafu, Day, Guild, Hour, ListError, Microsecond, Minute, Month, RecordingManager,
Second, User, VoiceChannel, Year, guild, microsecond, second, user, voice_channel,
};
#[derive(Debug, Clone)]
pub struct Clip {
pub second: Second,
pub microsecond: Microsecond,
pub guild: Guild,
pub voice_channel: VoiceChannel,
pub user: User,
}
#[derive(Debug, Snafu)]
pub enum TakeError {
/// could not parse the second out of the clip metadata
TakeSecondError { source: second::TakeError },
/// could not parse the microsecond out of the clip metadata
TakeMicrosecondError { source: microsecond::TakeError },
/// could not parse the guild out of the clip metadata
TakeGuildError { source: guild::TakeError },
/// could not parse the voice channel out of the clip metadata
TakeVoiceChannelError { source: voice_channel::TakeError },
/// could not parse the user out of the clip metadata
TakeUserError { source: user::TakeError },
}
pub fn take(s: &str) -> Result<Clip, TakeError> {
let (second, s) = second::take(s).context(TakeSecondSnafu)?;
let (microsecond, s) = microsecond::take(s).context(TakeMicrosecondSnafu)?;
let (guild, s) = guild::take(s).context(TakeGuildSnafu)?;
let (voice_channel, s) = voice_channel::take(s).context(TakeVoiceChannelSnafu)?;
let user = user::take(s).context(TakeUserSnafu)?;
Ok(Clip {
second,
microsecond,
guild,
voice_channel,
user,
})
}
impl FromStr for Clip {
type Err = TakeError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
take(s)
}
}
impl Display for Clip {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let Self {
second,
microsecond,
guild,
voice_channel,
user,
} = self;
let user = user
.as_ref()
.map_or_else(|| "UNKNOWN".into(), ToString::to_string);
write!(
f,
"audio-{second}.{microsecond}-{guild}-{voice_channel}-{user}.wav"
)
}
}
#[derive(Debug, Snafu)]
pub enum ClipEntryError {
/// failed to get an entry from the storage operator's lister
ReceiveEntryError { source: opendal::Error },
/// failed to parse the entry as a clip
ParseError { source: TakeError },
}
impl RecordingManager {
pub async fn clips(
&self,
year: Year,
month: Month,
day: Day,
hour: Hour,
minute: Minute,
) -> Result<impl TryStream<Ok = Clip, Error = ClipEntryError> + Unpin, ListError> {
let lister = self
.operator
.lister(&format!("{year}/{month}/{day}/{hour}/{minute}/"))
.await
.context(CreateListerSnafu)?;
Ok(lister
.map_err(|error| ClipEntryError::ReceiveEntryError { source: error })
.and_then(|entry| std::future::ready(entry.name().parse().context(ParseSnafu))))
}
}

57
src/recording_data/day.rs Normal file
View File

@@ -0,0 +1,57 @@
use futures::{TryStream, TryStreamExt as _};
use snafu::{OptionExt as _, ResultExt as _, Snafu};
use std::num::ParseIntError;
use super::{CreateListerSnafu, ListError, Month, RecordingManager, Year};
pub type Day = u8;
#[derive(Debug, Snafu)]
pub enum TakeError {
/// days are supposed to be directories, but this wasn't (because it didn't end with `/`)
NotADirectory,
/// could not parse the day as an integer
ParseIntegerError { source: ParseIntError },
}
pub fn take(s: &str) -> Result<(Day, &str), TakeError> {
let (day, rest) = s.split_once('/').context(NotADirectorySnafu)?;
let day = day.parse().context(ParseIntegerSnafu)?;
Ok((day, rest))
}
#[derive(Debug, Snafu)]
pub enum DayEntryError {
/// failed to get an entry from the storage operator's lister
ReceiveEntryError { source: opendal::Error },
/// failed to parse the entry as a day
ParseError { source: TakeError },
}
impl RecordingManager {
pub async fn days(
&self,
year: Year,
month: Month,
) -> Result<impl TryStream<Ok = Day, Error = DayEntryError> + Unpin, ListError> {
let lister = self
.operator
.lister(&format!("{year}/{month}/"))
.await
.context(CreateListerSnafu)?;
Ok(lister
.map_err(|error| DayEntryError::ReceiveEntryError { source: error })
.and_then(|entry| {
std::future::ready(
take(entry.name())
.map(|(day, _rest)| day)
.context(ParseSnafu),
)
}))
}
}

View File

@@ -0,0 +1,22 @@
use snafu::{OptionExt as _, ResultExt as _, Snafu};
use std::str::FromStr;
use twilight_model::id::{Id, marker::GuildMarker};
pub type Guild = Id<GuildMarker>;
#[derive(Debug, Snafu)]
pub enum TakeError {
/// guilds are supposed to be followed by -
Malformed,
/// could not parse the guild ID
ParseIdError { source: <Guild as FromStr>::Err },
}
pub fn take(path: &str) -> Result<(Guild, &str), TakeError> {
let (guild, rest) = path.split_once('-').context(MalformedSnafu)?;
let guild = guild.parse().context(ParseIdSnafu)?;
Ok((guild, rest))
}

View File

@@ -0,0 +1,58 @@
use futures::{TryStream, TryStreamExt as _};
use snafu::{OptionExt as _, ResultExt as _, Snafu};
use std::num::ParseIntError;
use super::{CreateListerSnafu, Day, ListError, Month, RecordingManager, Year};
pub type Hour = u8;
#[derive(Debug, Snafu)]
pub enum TakeError {
/// hours are supposed to be directories, but this wasn't (because it didn't end with `/`)
NotADirectory,
/// could not parse the hour as an integer
ParseIntegerError { source: ParseIntError },
}
pub fn take(s: &str) -> Result<(Hour, &str), TakeError> {
let (hour, rest) = s.split_once('/').context(NotADirectorySnafu)?;
let hour = hour.parse().context(ParseIntegerSnafu)?;
Ok((hour, rest))
}
#[derive(Debug, Snafu)]
pub enum HourEntryError {
/// failed to get an entry from the storage operator's lister
ReceiveEntryError { source: opendal::Error },
/// failed to parse the entry as a hour
ParseError { source: TakeError },
}
impl RecordingManager {
pub async fn hours(
&self,
year: Year,
month: Month,
day: Day,
) -> Result<impl TryStream<Ok = Hour, Error = HourEntryError> + Unpin, ListError> {
let lister = self
.operator
.lister(&format!("{year}/{month}/{day}/"))
.await
.context(CreateListerSnafu)?;
Ok(lister
.map_err(|error| HourEntryError::ReceiveEntryError { source: error })
.and_then(|entry| {
std::future::ready(
take(entry.name())
.map(|(hour, _rest)| hour)
.context(ParseSnafu),
)
}))
}
}

View File

@@ -0,0 +1,21 @@
use snafu::{OptionExt as _, ResultExt as _, Snafu};
use std::num::ParseIntError;
pub type Microsecond = u32;
#[derive(Debug, Snafu)]
pub enum TakeError {
/// microseconds are supposed to be followed by -
Malformed,
/// could not parse the microsecond as an integer
ParseIntegerError { source: ParseIntError },
}
pub fn take(path: &str) -> Result<(Microsecond, &str), TakeError> {
let (microsecond, rest) = path.split_once('-').context(MalformedSnafu)?;
let microsecond = microsecond.parse().context(ParseIntegerSnafu)?;
Ok((microsecond, rest))
}

View File

@@ -0,0 +1,59 @@
use futures::{TryStream, TryStreamExt as _};
use snafu::{OptionExt as _, ResultExt as _, Snafu};
use std::num::ParseIntError;
use super::{CreateListerSnafu, Day, Hour, ListError, Month, RecordingManager, Year};
pub type Minute = u8;
#[derive(Debug, Snafu)]
pub enum TakeError {
/// minutes are supposed to be directories, but this wasn't (because it didn't end with `/`)
NotADirectory,
/// could not parse the minute as an integer
ParseIntegerError { source: ParseIntError },
}
pub fn take(s: &str) -> Result<(Minute, &str), TakeError> {
let (minute, rest) = s.split_once('/').context(NotADirectorySnafu)?;
let minute = minute.parse().context(ParseIntegerSnafu)?;
Ok((minute, rest))
}
#[derive(Debug, Snafu)]
pub enum MinuteEntryError {
/// failed to get an entry from the storage operator's lister
ReceiveEntryError { source: opendal::Error },
/// failed to parse the entry as a minute
ParseError { source: TakeError },
}
impl RecordingManager {
pub async fn minutes(
&self,
year: Year,
month: Month,
day: Day,
hour: Hour,
) -> Result<impl TryStream<Ok = Minute, Error = MinuteEntryError> + Unpin, ListError> {
let lister = self
.operator
.lister(&format!("{year}/{month}/{day}/{hour}/"))
.await
.context(CreateListerSnafu)?;
Ok(lister
.map_err(|error| MinuteEntryError::ReceiveEntryError { source: error })
.and_then(|entry| {
std::future::ready(
take(entry.name())
.map(|(minute, _rest)| minute)
.context(ParseSnafu),
)
}))
}
}

131
src/recording_data/mod.rs Normal file
View File

@@ -0,0 +1,131 @@
use std::{convert::Infallible, io::Cursor};
use hound::{SampleFormat, WavReader, WavSpec};
use opendal::Operator;
use snafu::Snafu;
mod between;
mod clip;
mod day;
mod guild;
mod hour;
mod microsecond;
mod minute;
mod month;
mod recording;
mod second;
mod user;
mod voice_channel;
mod year;
pub use between::RecordingEntryError;
pub use clip::{Clip, ClipEntryError};
use day::Day;
use guild::Guild;
use hour::Hour;
use microsecond::Microsecond;
use minute::Minute;
use month::Month;
pub use recording::Recording;
use second::Second;
use user::User;
use voice_channel::VoiceChannel;
use year::Year;
#[derive(Debug, Clone)]
pub struct RecordingManager {
operator: Operator,
}
impl RecordingManager {
pub fn new(operator: Operator) -> Self {
Self { operator }
}
}
#[derive(Debug)]
pub struct RecordingData {
pub channels: u16,
pub sample_rate: u32,
pub samples: Vec<i16>,
}
impl RecordingManager {
pub async fn write(
&self,
recording: &Recording,
RecordingData {
channels,
sample_rate,
samples,
}: RecordingData,
) -> Result<
(),
Infallible, // TODO: a real error type
> {
let wav_spec = WavSpec {
channels,
sample_rate,
bits_per_sample: 16,
sample_format: SampleFormat::Int,
};
let mut buffer = Vec::new();
let writer = Cursor::new(&mut buffer);
let mut wav_writer = hound::WavWriter::new(writer, wav_spec).expect("TODO");
let mut sample_writer = wav_writer.get_i16_writer(samples.len() as u32);
for sample in samples {
sample_writer.write_sample(sample);
}
sample_writer.flush().expect("TODO");
wav_writer.finalize().expect("TODO");
let path = recording.to_string();
self.operator.write(&path, buffer).await.expect("TODO");
Ok(())
}
}
impl RecordingManager {
pub async fn read(
&self,
recording: &Recording,
) -> Result<
RecordingData,
Infallible, // TODO: a real error type
> {
let path = recording.to_string();
let buffer = self.operator.read(&path).await.expect("TODO");
let bytes = buffer.to_bytes();
let cursor = Cursor::new(bytes);
let wav_reader = WavReader::new(cursor).expect("TODO");
let wav_spec = wav_reader.spec();
let channels = wav_spec.channels;
let sample_rate = wav_spec.sample_rate;
let samples = wav_reader.into_samples();
let samples = Result::from_iter(samples).expect("TODO");
Ok(RecordingData {
channels,
sample_rate,
samples,
})
}
}
#[derive(Debug, Snafu)]
pub enum ListError {
/// error creating a lister through the storage operator
CreateListerError { source: opendal::Error },
}

View File

@@ -0,0 +1,55 @@
use futures::{TryStream, TryStreamExt as _};
use snafu::{OptionExt as _, ResultExt as _, Snafu};
use super::{CreateListerSnafu, ListError, RecordingManager, Year};
pub use time::Month;
#[derive(Debug, Snafu)]
pub enum TakeError {
/// months are supposed to be directories, but this wasn't (because it didn't end with `/`)
NotADirectory,
/// could not parse the month as its name
ParseMonthNameError { source: time::error::InvalidVariant },
}
pub fn take(s: &str) -> Result<(Month, &str), TakeError> {
let (month, rest) = s.split_once('/').context(NotADirectorySnafu)?;
let month = month.parse().context(ParseMonthNameSnafu)?;
Ok((month, rest))
}
#[derive(Debug, Snafu)]
pub enum MonthEntryError {
/// failed to get an entry from the storage operator's lister
ReceiveEntryError { source: opendal::Error },
/// failed to parse the entry as a month
ParseError { source: TakeError },
}
impl RecordingManager {
pub async fn months(
&self,
year: Year,
) -> Result<impl TryStream<Ok = Month, Error = MonthEntryError> + Unpin, ListError> {
let lister = self
.operator
.lister(&format!("{year}/"))
.await
.context(CreateListerSnafu)?;
Ok(lister
.map_err(|error| MonthEntryError::ReceiveEntryError { source: error })
.and_then(|entry| {
std::future::ready(
take(entry.name())
.map(|(month, _rest)| month)
.context(ParseSnafu),
)
}))
}
}

View File

@@ -0,0 +1,76 @@
use snafu::{ResultExt as _, Snafu};
use std::{fmt::Display, str::FromStr};
use super::{Clip, Day, Hour, Minute, Month, Year, clip, day, hour, minute, month, year};
#[derive(Debug, Clone)]
pub struct Recording {
pub year: Year,
pub month: Month,
pub day: Day,
pub hour: Hour,
pub minute: Minute,
pub clip: Clip,
}
#[derive(Debug, Snafu)]
pub enum TakeError {
/// could not parse the year out of the recording
TakeYearError { source: year::TakeError },
/// could not parse the month out of the recording
TakeMonthError { source: month::TakeError },
/// could not parse the day out of the recording
TakeDayError { source: day::TakeError },
/// could not parse the hour out of the recording
TakeHourError { source: hour::TakeError },
/// could not parse the minute out of the recording
TakeMinuteError { source: minute::TakeError },
/// could not parse the clip out of the recording
TakeClipError { source: clip::TakeError },
}
pub fn take(s: &str) -> Result<Recording, TakeError> {
let (year, s) = year::take(s).context(TakeYearSnafu)?;
let (month, s) = month::take(s).context(TakeMonthSnafu)?;
let (day, s) = day::take(s).context(TakeDaySnafu)?;
let (hour, s) = hour::take(s).context(TakeHourSnafu)?;
let (minute, s) = minute::take(s).context(TakeMinuteSnafu)?;
let clip = clip::take(s).context(TakeClipSnafu)?;
Ok(Recording {
year,
month,
day,
hour,
minute,
clip,
})
}
impl FromStr for Recording {
type Err = TakeError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
take(s)
}
}
impl Display for Recording {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let Self {
year,
month,
day,
hour,
minute,
clip,
} = self;
write!(f, "{year}/{month}/{day}/{hour}/{minute}/{clip}")
}
}

View File

@@ -0,0 +1,25 @@
use snafu::{OptionExt as _, ResultExt as _, Snafu};
use std::num::ParseIntError;
pub type Second = u8;
#[derive(Debug, Snafu)]
pub enum TakeError {
/// seconds are supposed to be preceded by audio-
MalformedPrefix,
/// seconds are supposed to be followed by .
MalformedSuffix,
/// could not parse the second as an integer
ParseIntegerError { source: ParseIntError },
}
pub fn take(path: &str) -> Result<(Second, &str), TakeError> {
let (_prefix, path) = path.split_once("audio-").context(MalformedPrefixSnafu)?;
let (second, rest) = path.split_once('.').context(MalformedSuffixSnafu)?;
let second = second.parse().context(ParseIntegerSnafu)?;
Ok((second, rest))
}

View File

@@ -0,0 +1,25 @@
use snafu::{OptionExt as _, ResultExt as _, Snafu};
use std::num::ParseIntError;
use twilight_model::id::{Id, marker::UserMarker};
pub type User = Option<Id<UserMarker>>;
#[derive(Debug, Snafu)]
pub enum TakeError {
/// users are supposed to be terminated by .wav
Malformed,
/// could not parse the user ID
ParseIntegerError { source: ParseIntError },
}
pub fn take(path: &str) -> Result<User, TakeError> {
let user = path.strip_suffix(".wav").context(MalformedSnafu)?;
let user = match user {
"UNKNOWN" => None,
user => Some(user.parse().context(ParseIntegerSnafu)?),
};
Ok(user)
}

View File

@@ -0,0 +1,25 @@
use snafu::{OptionExt as _, ResultExt as _, Snafu};
use std::str::FromStr;
use twilight_model::id::Id;
use twilight_model::id::marker::ChannelMarker;
pub type VoiceChannel = Id<ChannelMarker>;
#[derive(Debug, Snafu)]
pub enum TakeError {
/// voice channels are supposed to be followed by -
Malformed,
/// could not parse the voice channel ID
ParseIdError {
source: <VoiceChannel as FromStr>::Err,
},
}
pub fn take(path: &str) -> Result<(VoiceChannel, &str), TakeError> {
let (voice_channel, rest) = path.split_once('-').context(MalformedSnafu)?;
let voice_channel = voice_channel.parse().context(ParseIdSnafu)?;
Ok((voice_channel, rest))
}

View File

@@ -0,0 +1,51 @@
use futures::{TryStream, TryStreamExt as _};
use snafu::{OptionExt as _, ResultExt as _, Snafu};
use std::num::ParseIntError;
use super::{CreateListerSnafu, ListError, RecordingManager};
pub type Year = i32;
#[derive(Debug, Snafu)]
pub enum TakeError {
/// years are supposed to be directories, but this wasn't (because it didn't end with `/`)
NotADirectory,
/// could not parse the year as an integer
ParseIntegerError { source: ParseIntError },
}
pub fn take(path: &str) -> Result<(Year, &str), TakeError> {
let (year, rest) = path.split_once('/').context(NotADirectorySnafu)?;
let year = year.parse().context(ParseIntegerSnafu)?;
Ok((year, rest))
}
#[derive(Debug, Snafu)]
pub enum YearEntryError {
/// failed to get an entry from the storage operator's lister
ReceiveEntryError { source: opendal::Error },
/// failed to parse the entry as a year
ParseError { source: TakeError },
}
impl RecordingManager {
pub async fn years(
&self,
) -> Result<impl TryStream<Ok = Year, Error = YearEntryError> + Unpin, ListError> {
let lister = self.operator.lister("").await.context(CreateListerSnafu)?;
Ok(lister
.map_err(|error| YearEntryError::ReceiveEntryError { source: error })
.and_then(|entry| {
std::future::ready(
take(entry.name())
.map(|(year, _rest)| year)
.context(ParseSnafu),
)
}))
}
}

98
src/render_data.rs Normal file
View File

@@ -0,0 +1,98 @@
use std::{convert::Infallible, fmt::Display, io::Cursor};
use hound::{SampleFormat, WavSpec};
use opendal::Operator;
use time::{UtcDateTime, format_description::StaticFormatDescription, macros::format_description};
use twilight_model::id::{
Id,
marker::{ChannelMarker, GuildMarker},
};
#[derive(Debug, Clone)]
pub struct RenderManager {
operator: Operator,
}
impl RenderManager {
pub fn new(operator: Operator) -> Self {
Self { operator }
}
}
#[derive(Debug, Clone)]
pub struct Render {
pub start: UtcDateTime,
pub end: UtcDateTime,
pub guild_id: Id<GuildMarker>,
pub voice_channel_id: Id<ChannelMarker>,
}
const DATE_FORMAT: StaticFormatDescription =
format_description!("[year]-[month]-[day]T[hour]-[minute]-[second].[subsecond]Z");
impl Display for Render {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let Self {
start,
end,
guild_id,
voice_channel_id,
} = self;
let start = start.format(&DATE_FORMAT).unwrap();
let end = end.format(&DATE_FORMAT).unwrap();
write!(f, "{guild_id}/{voice_channel_id}/{start}_{end}.wav")
}
}
#[derive(Debug)]
pub struct RenderData {
pub channels: opus2::Channels,
pub sample_rate: u32,
pub samples: Vec<i16>,
}
impl RenderManager {
pub async fn write(
&self,
render: &Render,
RenderData {
channels,
sample_rate,
samples,
}: RenderData,
) -> Result<
(),
Infallible, // TODO: a real error type
> {
let channels = channels as u16;
let wav_spec = WavSpec {
channels,
sample_rate,
bits_per_sample: 16,
sample_format: SampleFormat::Int,
};
let mut buffer = Vec::new();
let writer = Cursor::new(&mut buffer);
let mut wav_writer = hound::WavWriter::new(writer, wav_spec).expect("TODO");
let mut sample_writer = wav_writer.get_i16_writer(samples.len() as u32);
for sample in samples {
sample_writer.write_sample(sample);
}
sample_writer.flush().expect("TODO");
wav_writer.finalize().expect("TODO");
let path = render.to_string();
self.operator.write(&path, buffer).await.expect("TODO");
Ok(())
}
}

View File

@@ -20,17 +20,19 @@ pub type VCsInGuild =
pub type VCs = BTreeMap<Id<GuildMarker>, VCsInGuild>; pub type VCs = BTreeMap<Id<GuildMarker>, VCsInGuild>;
pub type VCsSender = watch::Sender<VCs>; pub type VCsSender = watch::Sender<VCs>;
#[tracing::instrument(skip(discord_client), ret)] #[tracing::instrument(skip(vcs_sender, discord_client))]
async fn initialize_user_in_vc( async fn initialize_user_in_vc(
vcs_sender: &VCsSender,
discord_client: &twilight_http::Client, discord_client: &twilight_http::Client,
guild_id: Id<GuildMarker>, guild_id: Id<GuildMarker>,
user_id: Id<UserMarker>, user_id: Id<UserMarker>,
) -> Option<(Id<ChannelMarker>, UserInVCData)> { ) {
if let Ok(voice_state_res) = discord_client.user_voice_state(guild_id, user_id).await if let Ok(voice_state_res) = discord_client.user_voice_state(guild_id, user_id).await
&& let Ok(voice_state) = voice_state_res.model().await && let Ok(voice_state) = voice_state_res.model().await
{ {
tracing::info!(?user_id, ?voice_state); tracing::info!(?user_id, ?voice_state);
if let Some(voice_channel_id) = voice_state.channel_id {
let voice_status = VoiceStatus::builder() let voice_status = VoiceStatus::builder()
.self_deafened(voice_state.self_deaf) .self_deafened(voice_state.self_deaf)
.self_muted(voice_state.self_mute) .self_muted(voice_state.self_mute)
@@ -41,55 +43,46 @@ async fn initialize_user_in_vc(
.build(); .build();
let user_in_vc_data = voice_status.into(); let user_in_vc_data = voice_status.into();
voice_state vcs_sender.send_modify(|vcs| {
.channel_id vcs.entry(guild_id)
.map(|channel_id| (channel_id, user_in_vc_data)) .or_default()
} else { .insert(voice_channel_id, user_id, user_in_vc_data);
None // TODO });
}
} }
} }
#[tracing::instrument(skip(discord_client), ret)] #[tracing::instrument(skip(vcs_sender, discord_client))]
async fn initialize_server_vcs( async fn initialize_server_vcs(
vcs_sender: &VCsSender,
discord_client: &twilight_http::Client, discord_client: &twilight_http::Client,
id: Id<GuildMarker>, id: Id<GuildMarker>,
) -> VCsInGuild { ) {
if let Ok(guild_members_res) = discord_client.guild_members(id).limit(999).await if let Ok(guild_members_res) = discord_client.guild_members(id).limit(999).await
&& let Ok(guild_members) = guild_members_res.model().await && let Ok(guild_members) = guild_members_res.model().await
{ {
FuturesUnordered::from_iter(guild_members.into_iter().map(|member| async move { FuturesUnordered::from_iter(
( guild_members.into_iter().map(|member| {
member.user.id, initialize_user_in_vc(vcs_sender, discord_client, id, member.user.id)
initialize_user_in_vc(discord_client, id, member.user.id).await, }),
)
}))
.filter_map(
|(user_id, channel_id_and_user_in_vc_data_option)| async move {
channel_id_and_user_in_vc_data_option
.map(|(channel_id, user_in_vc_data)| (channel_id, user_id, user_in_vc_data))
},
) )
.collect() .collect()
.await .await
} else {
Default::default()
} }
} }
#[tracing::instrument(skip(discord_client), ret)] #[tracing::instrument(skip(vcs_sender, discord_client))]
pub async fn initialize_vcs(discord_client: &twilight_http::Client) -> VCs { pub async fn initialize_vcs(vcs_sender: &VCsSender, discord_client: &twilight_http::Client) {
if let Ok(guilds_res) = discord_client.current_user_guilds().limit(200).await if let Ok(guilds_res) = discord_client.current_user_guilds().limit(200).await
&& let Ok(guilds) = guilds_res.model().await && let Ok(guilds) = guilds_res.model().await
{ {
FuturesUnordered::from_iter(guilds.into_iter().map(|guild| async move { FuturesUnordered::from_iter(
let guild_vcs = initialize_server_vcs(discord_client, guild.id).await; guilds
.into_iter()
(guild.id, guild_vcs) .map(|guild| initialize_server_vcs(vcs_sender, discord_client, guild.id)),
})) )
.collect() .collect()
.await .await
} else {
Default::default()
} }
} }

View File

@@ -14,12 +14,12 @@ use crate::{OperatorExt, option_ext::OptionExt as _, user_capnp};
pub const RECORD_IF_CONSENT_UNSPECIFIED: bool = true; pub const RECORD_IF_CONSENT_UNSPECIFIED: bool = true;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct UserDataManager { pub struct UserManager {
operator: Operator, operator: Operator,
cache: Cache<Id<UserMarker>, Bytes>, cache: Cache<Id<UserMarker>, Bytes>,
} }
impl UserDataManager { impl UserManager {
pub fn new(operator: Operator) -> Self { pub fn new(operator: Operator) -> Self {
Self { Self {
operator, operator,
@@ -83,7 +83,7 @@ pub enum EntryError {
ParsePathError { source: ParsePathError }, ParsePathError { source: ParsePathError },
} }
impl UserDataManager { impl UserManager {
pub async fn list( pub async fn list(
&self, &self,
) -> Result<impl TryStream<Ok = Id<UserMarker>, Error = EntryError> + Unpin, ListError> { ) -> Result<impl TryStream<Ok = Id<UserMarker>, Error = EntryError> + Unpin, ListError> {
@@ -111,7 +111,7 @@ pub enum ReadError {
DecompressionError { source: std::io::Error }, DecompressionError { source: std::io::Error },
} }
impl UserDataManager { impl UserManager {
async fn read(&self, id: Id<UserMarker>) -> Result<Bytes, ReadError> { async fn read(&self, id: Id<UserMarker>) -> Result<Bytes, ReadError> {
self.cache self.cache
.try_get_with::<_, ReadError>(id, async { .try_get_with::<_, ReadError>(id, async {
@@ -168,7 +168,7 @@ pub enum WriteError {
FinalizeError { source: std::io::Error }, FinalizeError { source: std::io::Error },
} }
impl UserDataManager { impl UserManager {
async fn write(&self, id: Id<UserMarker>, bytes: Bytes) -> Result<(), WriteError> { async fn write(&self, id: Id<UserMarker>, bytes: Bytes) -> Result<(), WriteError> {
let path = path(id); let path = path(id);
@@ -207,7 +207,7 @@ pub enum WithError {
DeserializeError { source: capnp::Error }, DeserializeError { source: capnp::Error },
} }
impl UserDataManager { impl UserManager {
pub async fn with<R>( pub async fn with<R>(
&self, &self,
id: Id<UserMarker>, id: Id<UserMarker>,
@@ -245,7 +245,7 @@ pub enum UpdateError {
WriteError { source: WriteError }, WriteError { source: WriteError },
} }
impl UserDataManager { impl UserManager {
pub async fn update<R>( pub async fn update<R>(
&self, &self,
id: Id<UserMarker>, id: Id<UserMarker>,