diff --git a/emitter-and-signal/Cargo.toml b/emitter-and-signal/Cargo.toml index 27bde7c..ee4560c 100644 --- a/emitter-and-signal/Cargo.toml +++ b/emitter-and-signal/Cargo.toml @@ -7,4 +7,5 @@ license = { workspace = true } [dependencies] deranged = { workspace = true } ext-trait = { workspace = true } +snafu = { workspace = true } tokio = { workspace = true, features = ["sync"] } diff --git a/emitter-and-signal/src/emitter.rs b/emitter-and-signal/src/emitter.rs index 5e2251f..fa58d3b 100644 --- a/emitter-and-signal/src/emitter.rs +++ b/emitter-and-signal/src/emitter.rs @@ -1,12 +1,9 @@ -use std::{future::Future, num::NonZero}; - -use deranged::RangedUsize; -use tokio::{ - sync::{broadcast, mpsc}, - task::JoinHandle, -}; - use super::ProducerExited; +use deranged::RangedUsize; +use snafu::Snafu; +use std::{future::Future, num::NonZero}; +use tokio::sync::{broadcast, mpsc}; +pub use tokio::task::JoinError; #[derive(Debug)] pub struct Publisher { @@ -47,7 +44,7 @@ impl Emitter { pub fn new( producer: impl FnOnce(PublisherStream) -> Fut, capacity: Capacity, - ) -> (Self, JoinHandle) + ) -> (Self, impl Future>) where Fut: Future + Send + 'static, T: Clone, @@ -93,8 +90,12 @@ pub struct Subscription { receiver: broadcast::Receiver, } +#[derive(Debug, Clone, Snafu)] pub enum NextError { - ProducerExited(ProducerExited), + /// the producer backing this emitter exited + ProducerExited { source: ProducerExited }, + + /// the broadcast channel underlying this emitter lagged and {skipped_events} events were skipped Lagged { skipped_events: NonZero }, } @@ -104,7 +105,9 @@ impl Subscription { T: Clone, { self.receiver.recv().await.map_err(|err| match err { - broadcast::error::RecvError::Closed => NextError::ProducerExited(ProducerExited), + broadcast::error::RecvError::Closed => NextError::ProducerExited { + source: ProducerExited, + }, broadcast::error::RecvError::Lagged(skipped_events) => NextError::Lagged { skipped_events: skipped_events .try_into() diff --git a/emitter-and-signal/src/emitter_ext.rs b/emitter-and-signal/src/emitter_ext.rs index 529e505..a555daa 100644 --- a/emitter-and-signal/src/emitter_ext.rs +++ b/emitter-and-signal/src/emitter_ext.rs @@ -1,11 +1,17 @@ -use ext_trait::extension; -use tokio::{select, task::JoinHandle}; +use std::future::Future; -use super::emitter::{Capacity, Emitter, NextError}; +use ext_trait::extension; +use tokio::select; + +use super::emitter::{Capacity, Emitter, JoinError, NextError}; #[extension(pub trait EmitterExt)] impl Emitter { - fn map(self, mut func: F, capacity: Capacity) -> (Emitter, JoinHandle<()>) + fn map( + self, + mut func: F, + capacity: Capacity, + ) -> (Emitter, impl Future>) where T: Send + 'static + Clone, M: Send + 'static + Clone, @@ -28,7 +34,7 @@ impl Emitter { match event_res { Ok(event) => publisher.publish(func(event)), Err(NextError::Lagged { .. }) => {}, - Err(NextError::ProducerExited(_)) => return, + Err(NextError::ProducerExited { .. }) => return, } } } @@ -39,7 +45,11 @@ impl Emitter { ) } - fn filter(self, mut func: F, capacity: Capacity) -> (Emitter, JoinHandle<()>) + fn filter( + self, + mut func: F, + capacity: Capacity, + ) -> (Emitter, impl Future>) where T: Send + 'static + Clone, F: Send + 'static + FnMut(&T) -> bool, @@ -63,7 +73,7 @@ impl Emitter { publisher.publish(event) }, Err(NextError::Lagged { .. }) => {}, - Err(NextError::ProducerExited(_)) => return, + Err(NextError::ProducerExited { .. }) => return, } } } @@ -74,7 +84,11 @@ impl Emitter { ) } - fn filter_mut(self, mut func: F, capacity: Capacity) -> (Emitter, JoinHandle<()>) + fn filter_mut( + self, + mut func: F, + capacity: Capacity, + ) -> (Emitter, impl Future>) where T: Send + 'static + Clone, F: Send + 'static + FnMut(&mut T) -> bool, @@ -98,7 +112,7 @@ impl Emitter { publisher.publish(event) }, Err(NextError::Lagged { .. }) => {}, - Err(NextError::ProducerExited(_)) => return, + Err(NextError::ProducerExited { .. }) => return, } } } @@ -109,7 +123,11 @@ impl Emitter { ) } - fn filter_map(self, mut func: F, capacity: Capacity) -> (Emitter, JoinHandle<()>) + fn filter_map( + self, + mut func: F, + capacity: Capacity, + ) -> (Emitter, impl Future>) where T: Send + 'static + Clone, M: Send + 'static + Clone, @@ -134,7 +152,7 @@ impl Emitter { publisher.publish(mapped) }, Err(NextError::Lagged { .. }) => {}, - Err(NextError::ProducerExited(_)) => return, + Err(NextError::ProducerExited { .. }) => return, } } } diff --git a/emitter-and-signal/src/lib.rs b/emitter-and-signal/src/lib.rs index 28de249..ff38fea 100644 --- a/emitter-and-signal/src/lib.rs +++ b/emitter-and-signal/src/lib.rs @@ -1,10 +1,15 @@ -pub mod emitter; -mod emitter_ext; -pub mod signal; -mod signal_ext; +use snafu::Snafu; +pub mod emitter; +pub mod emitter_ext; +pub mod signal; +pub mod signal_ext; + +pub use emitter::Emitter; pub use emitter_ext::EmitterExt; +pub use signal::Signal; pub use signal_ext::SignalExt; -#[derive(Debug, Clone, Copy)] +/// the producer backing this [`Signal`] or [`Emitter`] exited +#[derive(Debug, Clone, Copy, Snafu)] pub struct ProducerExited; diff --git a/emitter-and-signal/src/signal.rs b/emitter-and-signal/src/signal.rs index 792258a..46f90ec 100644 --- a/emitter-and-signal/src/signal.rs +++ b/emitter-and-signal/src/signal.rs @@ -3,6 +3,8 @@ use std::future::Future; use tokio::sync::{mpsc, watch}; pub use tokio::task::JoinError; +use crate::ProducerExited; + #[derive(Debug)] pub struct Publisher { sender: watch::Sender, @@ -87,9 +89,6 @@ pub struct Subscription { receiver: watch::Receiver, } -#[derive(Debug, Clone, Copy)] -pub struct ProducerExited; - impl Subscription { pub async fn changed(&mut self) -> Result<(), ProducerExited> { self.receiver.changed().await.map_err(|_| ProducerExited) diff --git a/emitter-and-signal/src/signal_ext.rs b/emitter-and-signal/src/signal_ext.rs index f28b1ce..e873e7f 100644 --- a/emitter-and-signal/src/signal_ext.rs +++ b/emitter-and-signal/src/signal_ext.rs @@ -1,6 +1,55 @@ -use ext_trait::extension; +use std::future::Future; -use super::signal::Signal; +use ext_trait::extension; +use snafu::{ResultExt, Snafu}; +use tokio::select; + +use crate::ProducerExited; + +use super::signal::{JoinError, Signal}; + +#[derive(Debug, Snafu)] +pub struct ProducerAlreadyExited { + source: ProducerExited, +} #[extension(pub trait SignalExt)] -impl Signal {} +impl Signal { + fn map( + self, + mut func: F, + ) -> Result<(Signal, impl Future>), ProducerAlreadyExited> + where + T: 'static + Sync + Send + Clone, + M: 'static + Sync + Send + Clone, + F: 'static + Send + FnMut(T) -> M, + { + let initial = func(self.subscribe().context(ProducerAlreadyExitedSnafu)?.get()); + + Ok(Signal::new(initial, |mut publisher_stream| async move { + while let Some(publisher) = publisher_stream.wait().await { + let Ok(mut subscription) = self.subscribe() else { + return; + }; + + loop { + select! { + biased; + _ = publisher.all_unsubscribed() => { + break; + } + changed_res = subscription.changed() => { + match changed_res { + Ok(()) => { + let value = subscription.get(); + publisher.publish(func(value)) + }, + Err(ProducerExited) => return, + } + } + } + } + } + })) + } +}