chore: extract Emitter
and Signal
to their own crate
This commit is contained in:
9
emitter-and-signal/Cargo.toml
Normal file
9
emitter-and-signal/Cargo.toml
Normal file
@@ -0,0 +1,9 @@
|
|||||||
|
[package]
|
||||||
|
name = "emitter-and-signal"
|
||||||
|
version = "0.1.0"
|
||||||
|
edition = "2021"
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
deranged = { workspace = true }
|
||||||
|
ext-trait = "2.0.0"
|
||||||
|
tokio = { workspace = true, features = ["sync"] }
|
115
emitter-and-signal/src/emitter.rs
Normal file
115
emitter-and-signal/src/emitter.rs
Normal file
@@ -0,0 +1,115 @@
|
|||||||
|
use std::{future::Future, num::NonZero};
|
||||||
|
|
||||||
|
use deranged::RangedUsize;
|
||||||
|
use tokio::{
|
||||||
|
sync::{broadcast, mpsc},
|
||||||
|
task::JoinHandle,
|
||||||
|
};
|
||||||
|
|
||||||
|
use super::ProducerExited;
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct Publisher<T> {
|
||||||
|
sender: broadcast::Sender<T>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> Publisher<T> {
|
||||||
|
pub async fn all_unsubscribed(&self) {
|
||||||
|
self.sender.closed().await
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn publish(&self, event: T) {
|
||||||
|
let _ = self.sender.send(event);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct PublisherStream<T> {
|
||||||
|
receiver: mpsc::Receiver<Publisher<T>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> PublisherStream<T> {
|
||||||
|
/// Returns `None` when no more subscriptions can ever be made
|
||||||
|
pub async fn wait(&mut self) -> Option<Publisher<T>> {
|
||||||
|
self.receiver.recv().await
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub struct Emitter<T> {
|
||||||
|
sender: broadcast::Sender<T>,
|
||||||
|
publisher_sender: mpsc::Sender<Publisher<T>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub type Capacity = RangedUsize<1, { usize::MAX / 2 }>;
|
||||||
|
|
||||||
|
impl<T> Emitter<T> {
|
||||||
|
pub fn new<R, Fut>(
|
||||||
|
producer: impl FnOnce(PublisherStream<T>) -> Fut,
|
||||||
|
capacity: Capacity,
|
||||||
|
) -> (Self, JoinHandle<R>)
|
||||||
|
where
|
||||||
|
Fut: Future<Output = R> + Send + 'static,
|
||||||
|
T: Clone,
|
||||||
|
R: Send + 'static,
|
||||||
|
{
|
||||||
|
let (sender, _) = broadcast::channel(capacity.get());
|
||||||
|
|
||||||
|
let (publisher_sender, publisher_receiver) = mpsc::channel(1);
|
||||||
|
|
||||||
|
let publisher_stream = PublisherStream {
|
||||||
|
receiver: publisher_receiver,
|
||||||
|
};
|
||||||
|
|
||||||
|
let producer_join_handle = tokio::spawn(producer(publisher_stream));
|
||||||
|
|
||||||
|
(
|
||||||
|
Self {
|
||||||
|
publisher_sender,
|
||||||
|
sender,
|
||||||
|
},
|
||||||
|
producer_join_handle,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn listen(&self) -> Result<Subscription<T>, ProducerExited> {
|
||||||
|
let receiver = self.sender.subscribe();
|
||||||
|
|
||||||
|
if self.sender.receiver_count() == 1 {
|
||||||
|
if let Err(mpsc::error::TrySendError::Closed(_)) =
|
||||||
|
self.publisher_sender.try_send(Publisher {
|
||||||
|
sender: self.sender.clone(),
|
||||||
|
})
|
||||||
|
{
|
||||||
|
return Err(ProducerExited);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(Subscription { receiver })
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct Subscription<T> {
|
||||||
|
receiver: broadcast::Receiver<T>,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub enum NextError {
|
||||||
|
ProducerExited(ProducerExited),
|
||||||
|
Lagged { skipped_events: NonZero<u64> },
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> Subscription<T> {
|
||||||
|
pub async fn next(&mut self) -> Result<T, NextError>
|
||||||
|
where
|
||||||
|
T: Clone,
|
||||||
|
{
|
||||||
|
self.receiver.recv().await.map_err(|err| match err {
|
||||||
|
broadcast::error::RecvError::Closed => NextError::ProducerExited(ProducerExited),
|
||||||
|
broadcast::error::RecvError::Lagged(skipped_events) => NextError::Lagged {
|
||||||
|
skipped_events: skipped_events
|
||||||
|
.try_into()
|
||||||
|
.expect("lagging 0 events should be impossible"),
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
147
emitter-and-signal/src/emitter_ext.rs
Normal file
147
emitter-and-signal/src/emitter_ext.rs
Normal file
@@ -0,0 +1,147 @@
|
|||||||
|
use ext_trait::extension;
|
||||||
|
use tokio::{select, task::JoinHandle};
|
||||||
|
|
||||||
|
use super::emitter::{Capacity, Emitter, NextError};
|
||||||
|
|
||||||
|
#[extension(pub trait EmitterExt)]
|
||||||
|
impl<T> Emitter<T> {
|
||||||
|
fn map<M, F>(self, mut func: F, capacity: Capacity) -> (Emitter<M>, JoinHandle<()>)
|
||||||
|
where
|
||||||
|
T: Send + 'static + Clone,
|
||||||
|
M: Send + 'static + Clone,
|
||||||
|
F: Send + 'static + FnMut(T) -> M,
|
||||||
|
{
|
||||||
|
Emitter::new(
|
||||||
|
|mut publisher_stream| async move {
|
||||||
|
while let Some(publisher) = publisher_stream.wait().await {
|
||||||
|
let Ok(mut subscription) = self.listen() else {
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
|
||||||
|
loop {
|
||||||
|
select! {
|
||||||
|
biased;
|
||||||
|
_ = publisher.all_unsubscribed() => {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
event_res = subscription.next() => {
|
||||||
|
match event_res {
|
||||||
|
Ok(event) => publisher.publish(func(event)),
|
||||||
|
Err(NextError::Lagged { .. }) => {},
|
||||||
|
Err(NextError::ProducerExited(_)) => return,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
capacity,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn filter<F>(self, mut func: F, capacity: Capacity) -> (Emitter<T>, JoinHandle<()>)
|
||||||
|
where
|
||||||
|
T: Send + 'static + Clone,
|
||||||
|
F: Send + 'static + FnMut(&T) -> bool,
|
||||||
|
{
|
||||||
|
Emitter::new(
|
||||||
|
|mut publisher_stream| async move {
|
||||||
|
while let Some(publisher) = publisher_stream.wait().await {
|
||||||
|
let Ok(mut subscription) = self.listen() else {
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
|
||||||
|
loop {
|
||||||
|
select! {
|
||||||
|
biased;
|
||||||
|
_ = publisher.all_unsubscribed() => {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
event_res = subscription.next() => {
|
||||||
|
match event_res {
|
||||||
|
Ok(event) => if func(&event) {
|
||||||
|
publisher.publish(event)
|
||||||
|
},
|
||||||
|
Err(NextError::Lagged { .. }) => {},
|
||||||
|
Err(NextError::ProducerExited(_)) => return,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
capacity,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn filter_mut<F>(self, mut func: F, capacity: Capacity) -> (Emitter<T>, JoinHandle<()>)
|
||||||
|
where
|
||||||
|
T: Send + 'static + Clone,
|
||||||
|
F: Send + 'static + FnMut(&mut T) -> bool,
|
||||||
|
{
|
||||||
|
Emitter::new(
|
||||||
|
|mut publisher_stream| async move {
|
||||||
|
while let Some(publisher) = publisher_stream.wait().await {
|
||||||
|
let Ok(mut subscription) = self.listen() else {
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
|
||||||
|
loop {
|
||||||
|
select! {
|
||||||
|
biased;
|
||||||
|
_ = publisher.all_unsubscribed() => {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
event_res = subscription.next() => {
|
||||||
|
match event_res {
|
||||||
|
Ok(mut event) => if func(&mut event) {
|
||||||
|
publisher.publish(event)
|
||||||
|
},
|
||||||
|
Err(NextError::Lagged { .. }) => {},
|
||||||
|
Err(NextError::ProducerExited(_)) => return,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
capacity,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn filter_map<M, F>(self, mut func: F, capacity: Capacity) -> (Emitter<M>, JoinHandle<()>)
|
||||||
|
where
|
||||||
|
T: Send + 'static + Clone,
|
||||||
|
M: Send + 'static + Clone,
|
||||||
|
F: Send + 'static + FnMut(T) -> Option<M>,
|
||||||
|
{
|
||||||
|
Emitter::new(
|
||||||
|
|mut publisher_stream| async move {
|
||||||
|
while let Some(publisher) = publisher_stream.wait().await {
|
||||||
|
let Ok(mut subscription) = self.listen() else {
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
|
||||||
|
loop {
|
||||||
|
select! {
|
||||||
|
biased;
|
||||||
|
_ = publisher.all_unsubscribed() => {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
event_res = subscription.next() => {
|
||||||
|
match event_res {
|
||||||
|
Ok(event) => if let Some(mapped) = func(event) {
|
||||||
|
publisher.publish(mapped)
|
||||||
|
},
|
||||||
|
Err(NextError::Lagged { .. }) => {},
|
||||||
|
Err(NextError::ProducerExited(_)) => return,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
capacity,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
10
emitter-and-signal/src/lib.rs
Normal file
10
emitter-and-signal/src/lib.rs
Normal file
@@ -0,0 +1,10 @@
|
|||||||
|
pub mod emitter;
|
||||||
|
mod emitter_ext;
|
||||||
|
pub mod signal;
|
||||||
|
mod signal_ext;
|
||||||
|
|
||||||
|
pub use emitter_ext::EmitterExt;
|
||||||
|
pub use signal_ext::SignalExt;
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Copy)]
|
||||||
|
pub struct ProducerExited;
|
116
emitter-and-signal/src/signal.rs
Normal file
116
emitter-and-signal/src/signal.rs
Normal file
@@ -0,0 +1,116 @@
|
|||||||
|
use std::future::Future;
|
||||||
|
|
||||||
|
use tokio::sync::{mpsc, watch};
|
||||||
|
pub use tokio::task::JoinError;
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct Publisher<T> {
|
||||||
|
sender: watch::Sender<T>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> Publisher<T> {
|
||||||
|
pub async fn all_unsubscribed(&self) {
|
||||||
|
self.sender.closed().await
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn publish(&self, value: T) {
|
||||||
|
self.sender.send_replace(value);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn publish_with<F: FnOnce(&mut T) -> bool>(&self, maybe_modify: F) {
|
||||||
|
self.sender.send_if_modified(maybe_modify);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct PublisherStream<T> {
|
||||||
|
receiver: mpsc::Receiver<Publisher<T>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> PublisherStream<T> {
|
||||||
|
/// Returns `None` when no more subscriptions can ever be made
|
||||||
|
pub async fn wait(&mut self) -> Option<Publisher<T>> {
|
||||||
|
self.receiver.recv().await
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct Signal<T> {
|
||||||
|
sender: watch::Sender<T>,
|
||||||
|
publisher_sender: mpsc::Sender<Publisher<T>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> Signal<T> {
|
||||||
|
pub fn new<R, Fut: Future<Output = R> + Send + 'static>(
|
||||||
|
initial: T,
|
||||||
|
producer: impl FnOnce(PublisherStream<T>) -> Fut,
|
||||||
|
) -> (Self, impl Future<Output = Result<R, JoinError>>)
|
||||||
|
where
|
||||||
|
R: Send + 'static,
|
||||||
|
{
|
||||||
|
let (sender, _) = watch::channel(initial);
|
||||||
|
let (publisher_sender, publisher_receiver) = mpsc::channel(1);
|
||||||
|
|
||||||
|
let publisher_stream = PublisherStream {
|
||||||
|
receiver: publisher_receiver,
|
||||||
|
};
|
||||||
|
|
||||||
|
let producer_join_handle = tokio::spawn(producer(publisher_stream));
|
||||||
|
|
||||||
|
(
|
||||||
|
Self {
|
||||||
|
publisher_sender,
|
||||||
|
sender,
|
||||||
|
},
|
||||||
|
producer_join_handle,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn subscribe(&self) -> Result<Subscription<T>, ProducerExited> {
|
||||||
|
let receiver = self.sender.subscribe();
|
||||||
|
|
||||||
|
if self.sender.receiver_count() == 1 {
|
||||||
|
if let Err(mpsc::error::TrySendError::Closed(_)) =
|
||||||
|
self.publisher_sender.try_send(Publisher {
|
||||||
|
sender: self.sender.clone(),
|
||||||
|
})
|
||||||
|
{
|
||||||
|
return Err(ProducerExited);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(Subscription { receiver })
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct Subscription<T> {
|
||||||
|
receiver: watch::Receiver<T>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Copy)]
|
||||||
|
pub struct ProducerExited;
|
||||||
|
|
||||||
|
impl<T> Subscription<T> {
|
||||||
|
pub async fn changed(&mut self) -> Result<(), ProducerExited> {
|
||||||
|
self.receiver.changed().await.map_err(|_| ProducerExited)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get(&mut self) -> T::Owned
|
||||||
|
where
|
||||||
|
T: ToOwned,
|
||||||
|
{
|
||||||
|
self.receiver.borrow_and_update().to_owned()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn for_each<Fut: Future<Output = ()>>(mut self, mut func: impl FnMut(T::Owned) -> Fut)
|
||||||
|
where
|
||||||
|
T: ToOwned,
|
||||||
|
{
|
||||||
|
loop {
|
||||||
|
func(self.get()).await;
|
||||||
|
if self.changed().await.is_err() {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
6
emitter-and-signal/src/signal_ext.rs
Normal file
6
emitter-and-signal/src/signal_ext.rs
Normal file
@@ -0,0 +1,6 @@
|
|||||||
|
use ext_trait::extension;
|
||||||
|
|
||||||
|
use super::signal::Signal;
|
||||||
|
|
||||||
|
#[extension(pub trait SignalExt)]
|
||||||
|
impl<T> Signal<T> {}
|
Reference in New Issue
Block a user