diff --git a/emitter-and-signal/Cargo.toml b/emitter-and-signal/Cargo.toml new file mode 100644 index 0000000..70ca82f --- /dev/null +++ b/emitter-and-signal/Cargo.toml @@ -0,0 +1,9 @@ +[package] +name = "emitter-and-signal" +version = "0.1.0" +edition = "2021" + +[dependencies] +deranged = { workspace = true } +ext-trait = "2.0.0" +tokio = { workspace = true, features = ["sync"] } diff --git a/emitter-and-signal/src/emitter.rs b/emitter-and-signal/src/emitter.rs new file mode 100644 index 0000000..5e2251f --- /dev/null +++ b/emitter-and-signal/src/emitter.rs @@ -0,0 +1,115 @@ +use std::{future::Future, num::NonZero}; + +use deranged::RangedUsize; +use tokio::{ + sync::{broadcast, mpsc}, + task::JoinHandle, +}; + +use super::ProducerExited; + +#[derive(Debug)] +pub struct Publisher { + sender: broadcast::Sender, +} + +impl Publisher { + pub async fn all_unsubscribed(&self) { + self.sender.closed().await + } + + pub fn publish(&self, event: T) { + let _ = self.sender.send(event); + } +} + +#[derive(Debug)] +pub struct PublisherStream { + receiver: mpsc::Receiver>, +} + +impl PublisherStream { + /// Returns `None` when no more subscriptions can ever be made + pub async fn wait(&mut self) -> Option> { + self.receiver.recv().await + } +} + +#[derive(Debug, Clone)] +pub struct Emitter { + sender: broadcast::Sender, + publisher_sender: mpsc::Sender>, +} + +pub type Capacity = RangedUsize<1, { usize::MAX / 2 }>; + +impl Emitter { + pub fn new( + producer: impl FnOnce(PublisherStream) -> Fut, + capacity: Capacity, + ) -> (Self, JoinHandle) + where + Fut: Future + Send + 'static, + T: Clone, + R: Send + 'static, + { + let (sender, _) = broadcast::channel(capacity.get()); + + let (publisher_sender, publisher_receiver) = mpsc::channel(1); + + let publisher_stream = PublisherStream { + receiver: publisher_receiver, + }; + + let producer_join_handle = tokio::spawn(producer(publisher_stream)); + + ( + Self { + publisher_sender, + sender, + }, + producer_join_handle, + ) + } + + pub fn listen(&self) -> Result, ProducerExited> { + let receiver = self.sender.subscribe(); + + if self.sender.receiver_count() == 1 { + if let Err(mpsc::error::TrySendError::Closed(_)) = + self.publisher_sender.try_send(Publisher { + sender: self.sender.clone(), + }) + { + return Err(ProducerExited); + } + } + + Ok(Subscription { receiver }) + } +} + +pub struct Subscription { + receiver: broadcast::Receiver, +} + +pub enum NextError { + ProducerExited(ProducerExited), + Lagged { skipped_events: NonZero }, +} + +impl Subscription { + pub async fn next(&mut self) -> Result + where + T: Clone, + { + self.receiver.recv().await.map_err(|err| match err { + broadcast::error::RecvError::Closed => NextError::ProducerExited(ProducerExited), + broadcast::error::RecvError::Lagged(skipped_events) => NextError::Lagged { + skipped_events: skipped_events + .try_into() + .expect("lagging 0 events should be impossible"), + }, + }) + } +} diff --git a/emitter-and-signal/src/emitter_ext.rs b/emitter-and-signal/src/emitter_ext.rs new file mode 100644 index 0000000..529e505 --- /dev/null +++ b/emitter-and-signal/src/emitter_ext.rs @@ -0,0 +1,147 @@ +use ext_trait::extension; +use tokio::{select, task::JoinHandle}; + +use super::emitter::{Capacity, Emitter, NextError}; + +#[extension(pub trait EmitterExt)] +impl Emitter { + fn map(self, mut func: F, capacity: Capacity) -> (Emitter, JoinHandle<()>) + where + T: Send + 'static + Clone, + M: Send + 'static + Clone, + F: Send + 'static + FnMut(T) -> M, + { + Emitter::new( + |mut publisher_stream| async move { + while let Some(publisher) = publisher_stream.wait().await { + let Ok(mut subscription) = self.listen() else { + return; + }; + + loop { + select! { + biased; + _ = publisher.all_unsubscribed() => { + break; + } + event_res = subscription.next() => { + match event_res { + Ok(event) => publisher.publish(func(event)), + Err(NextError::Lagged { .. }) => {}, + Err(NextError::ProducerExited(_)) => return, + } + } + } + } + } + }, + capacity, + ) + } + + fn filter(self, mut func: F, capacity: Capacity) -> (Emitter, JoinHandle<()>) + where + T: Send + 'static + Clone, + F: Send + 'static + FnMut(&T) -> bool, + { + Emitter::new( + |mut publisher_stream| async move { + while let Some(publisher) = publisher_stream.wait().await { + let Ok(mut subscription) = self.listen() else { + return; + }; + + loop { + select! { + biased; + _ = publisher.all_unsubscribed() => { + break; + } + event_res = subscription.next() => { + match event_res { + Ok(event) => if func(&event) { + publisher.publish(event) + }, + Err(NextError::Lagged { .. }) => {}, + Err(NextError::ProducerExited(_)) => return, + } + } + } + } + } + }, + capacity, + ) + } + + fn filter_mut(self, mut func: F, capacity: Capacity) -> (Emitter, JoinHandle<()>) + where + T: Send + 'static + Clone, + F: Send + 'static + FnMut(&mut T) -> bool, + { + Emitter::new( + |mut publisher_stream| async move { + while let Some(publisher) = publisher_stream.wait().await { + let Ok(mut subscription) = self.listen() else { + return; + }; + + loop { + select! { + biased; + _ = publisher.all_unsubscribed() => { + break; + } + event_res = subscription.next() => { + match event_res { + Ok(mut event) => if func(&mut event) { + publisher.publish(event) + }, + Err(NextError::Lagged { .. }) => {}, + Err(NextError::ProducerExited(_)) => return, + } + } + } + } + } + }, + capacity, + ) + } + + fn filter_map(self, mut func: F, capacity: Capacity) -> (Emitter, JoinHandle<()>) + where + T: Send + 'static + Clone, + M: Send + 'static + Clone, + F: Send + 'static + FnMut(T) -> Option, + { + Emitter::new( + |mut publisher_stream| async move { + while let Some(publisher) = publisher_stream.wait().await { + let Ok(mut subscription) = self.listen() else { + return; + }; + + loop { + select! { + biased; + _ = publisher.all_unsubscribed() => { + break; + } + event_res = subscription.next() => { + match event_res { + Ok(event) => if let Some(mapped) = func(event) { + publisher.publish(mapped) + }, + Err(NextError::Lagged { .. }) => {}, + Err(NextError::ProducerExited(_)) => return, + } + } + } + } + } + }, + capacity, + ) + } +} diff --git a/emitter-and-signal/src/lib.rs b/emitter-and-signal/src/lib.rs new file mode 100644 index 0000000..28de249 --- /dev/null +++ b/emitter-and-signal/src/lib.rs @@ -0,0 +1,10 @@ +pub mod emitter; +mod emitter_ext; +pub mod signal; +mod signal_ext; + +pub use emitter_ext::EmitterExt; +pub use signal_ext::SignalExt; + +#[derive(Debug, Clone, Copy)] +pub struct ProducerExited; diff --git a/emitter-and-signal/src/signal.rs b/emitter-and-signal/src/signal.rs new file mode 100644 index 0000000..792258a --- /dev/null +++ b/emitter-and-signal/src/signal.rs @@ -0,0 +1,116 @@ +use std::future::Future; + +use tokio::sync::{mpsc, watch}; +pub use tokio::task::JoinError; + +#[derive(Debug)] +pub struct Publisher { + sender: watch::Sender, +} + +impl Publisher { + pub async fn all_unsubscribed(&self) { + self.sender.closed().await + } + + pub fn publish(&self, value: T) { + self.sender.send_replace(value); + } + + pub fn publish_with bool>(&self, maybe_modify: F) { + self.sender.send_if_modified(maybe_modify); + } +} + +#[derive(Debug)] +pub struct PublisherStream { + receiver: mpsc::Receiver>, +} + +impl PublisherStream { + /// Returns `None` when no more subscriptions can ever be made + pub async fn wait(&mut self) -> Option> { + self.receiver.recv().await + } +} + +#[derive(Debug)] +pub struct Signal { + sender: watch::Sender, + publisher_sender: mpsc::Sender>, +} + +impl Signal { + pub fn new + Send + 'static>( + initial: T, + producer: impl FnOnce(PublisherStream) -> Fut, + ) -> (Self, impl Future>) + where + R: Send + 'static, + { + let (sender, _) = watch::channel(initial); + let (publisher_sender, publisher_receiver) = mpsc::channel(1); + + let publisher_stream = PublisherStream { + receiver: publisher_receiver, + }; + + let producer_join_handle = tokio::spawn(producer(publisher_stream)); + + ( + Self { + publisher_sender, + sender, + }, + producer_join_handle, + ) + } + + pub fn subscribe(&self) -> Result, ProducerExited> { + let receiver = self.sender.subscribe(); + + if self.sender.receiver_count() == 1 { + if let Err(mpsc::error::TrySendError::Closed(_)) = + self.publisher_sender.try_send(Publisher { + sender: self.sender.clone(), + }) + { + return Err(ProducerExited); + } + } + + Ok(Subscription { receiver }) + } +} + +pub struct Subscription { + receiver: watch::Receiver, +} + +#[derive(Debug, Clone, Copy)] +pub struct ProducerExited; + +impl Subscription { + pub async fn changed(&mut self) -> Result<(), ProducerExited> { + self.receiver.changed().await.map_err(|_| ProducerExited) + } + + pub fn get(&mut self) -> T::Owned + where + T: ToOwned, + { + self.receiver.borrow_and_update().to_owned() + } + + pub async fn for_each>(mut self, mut func: impl FnMut(T::Owned) -> Fut) + where + T: ToOwned, + { + loop { + func(self.get()).await; + if self.changed().await.is_err() { + return; + } + } + } +} diff --git a/emitter-and-signal/src/signal_ext.rs b/emitter-and-signal/src/signal_ext.rs new file mode 100644 index 0000000..f28b1ce --- /dev/null +++ b/emitter-and-signal/src/signal_ext.rs @@ -0,0 +1,6 @@ +use ext_trait::extension; + +use super::signal::Signal; + +#[extension(pub trait SignalExt)] +impl Signal {}