chore+feat(emitter-and-signal): slightly update design and errors
This commit is contained in:
@@ -7,4 +7,5 @@ license = { workspace = true }
|
||||
[dependencies]
|
||||
deranged = { workspace = true }
|
||||
ext-trait = { workspace = true }
|
||||
snafu = { workspace = true }
|
||||
tokio = { workspace = true, features = ["sync"] }
|
||||
|
||||
@@ -1,12 +1,9 @@
|
||||
use std::{future::Future, num::NonZero};
|
||||
|
||||
use deranged::RangedUsize;
|
||||
use tokio::{
|
||||
sync::{broadcast, mpsc},
|
||||
task::JoinHandle,
|
||||
};
|
||||
|
||||
use super::ProducerExited;
|
||||
use deranged::RangedUsize;
|
||||
use snafu::Snafu;
|
||||
use std::{future::Future, num::NonZero};
|
||||
use tokio::sync::{broadcast, mpsc};
|
||||
pub use tokio::task::JoinError;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Publisher<T> {
|
||||
@@ -47,7 +44,7 @@ impl<T> Emitter<T> {
|
||||
pub fn new<R, Fut>(
|
||||
producer: impl FnOnce(PublisherStream<T>) -> Fut,
|
||||
capacity: Capacity,
|
||||
) -> (Self, JoinHandle<R>)
|
||||
) -> (Self, impl Future<Output = Result<R, JoinError>>)
|
||||
where
|
||||
Fut: Future<Output = R> + Send + 'static,
|
||||
T: Clone,
|
||||
@@ -93,8 +90,12 @@ pub struct Subscription<T> {
|
||||
receiver: broadcast::Receiver<T>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Snafu)]
|
||||
pub enum NextError {
|
||||
ProducerExited(ProducerExited),
|
||||
/// the producer backing this emitter exited
|
||||
ProducerExited { source: ProducerExited },
|
||||
|
||||
/// the broadcast channel underlying this emitter lagged and {skipped_events} events were skipped
|
||||
Lagged { skipped_events: NonZero<u64> },
|
||||
}
|
||||
|
||||
@@ -104,7 +105,9 @@ impl<T> Subscription<T> {
|
||||
T: Clone,
|
||||
{
|
||||
self.receiver.recv().await.map_err(|err| match err {
|
||||
broadcast::error::RecvError::Closed => NextError::ProducerExited(ProducerExited),
|
||||
broadcast::error::RecvError::Closed => NextError::ProducerExited {
|
||||
source: ProducerExited,
|
||||
},
|
||||
broadcast::error::RecvError::Lagged(skipped_events) => NextError::Lagged {
|
||||
skipped_events: skipped_events
|
||||
.try_into()
|
||||
|
||||
@@ -1,11 +1,17 @@
|
||||
use ext_trait::extension;
|
||||
use tokio::{select, task::JoinHandle};
|
||||
use std::future::Future;
|
||||
|
||||
use super::emitter::{Capacity, Emitter, NextError};
|
||||
use ext_trait::extension;
|
||||
use tokio::select;
|
||||
|
||||
use super::emitter::{Capacity, Emitter, JoinError, NextError};
|
||||
|
||||
#[extension(pub trait EmitterExt)]
|
||||
impl<T> Emitter<T> {
|
||||
fn map<M, F>(self, mut func: F, capacity: Capacity) -> (Emitter<M>, JoinHandle<()>)
|
||||
fn map<M, F>(
|
||||
self,
|
||||
mut func: F,
|
||||
capacity: Capacity,
|
||||
) -> (Emitter<M>, impl Future<Output = Result<(), JoinError>>)
|
||||
where
|
||||
T: Send + 'static + Clone,
|
||||
M: Send + 'static + Clone,
|
||||
@@ -28,7 +34,7 @@ impl<T> Emitter<T> {
|
||||
match event_res {
|
||||
Ok(event) => publisher.publish(func(event)),
|
||||
Err(NextError::Lagged { .. }) => {},
|
||||
Err(NextError::ProducerExited(_)) => return,
|
||||
Err(NextError::ProducerExited { .. }) => return,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -39,7 +45,11 @@ impl<T> Emitter<T> {
|
||||
)
|
||||
}
|
||||
|
||||
fn filter<F>(self, mut func: F, capacity: Capacity) -> (Emitter<T>, JoinHandle<()>)
|
||||
fn filter<F>(
|
||||
self,
|
||||
mut func: F,
|
||||
capacity: Capacity,
|
||||
) -> (Emitter<T>, impl Future<Output = Result<(), JoinError>>)
|
||||
where
|
||||
T: Send + 'static + Clone,
|
||||
F: Send + 'static + FnMut(&T) -> bool,
|
||||
@@ -63,7 +73,7 @@ impl<T> Emitter<T> {
|
||||
publisher.publish(event)
|
||||
},
|
||||
Err(NextError::Lagged { .. }) => {},
|
||||
Err(NextError::ProducerExited(_)) => return,
|
||||
Err(NextError::ProducerExited { .. }) => return,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -74,7 +84,11 @@ impl<T> Emitter<T> {
|
||||
)
|
||||
}
|
||||
|
||||
fn filter_mut<F>(self, mut func: F, capacity: Capacity) -> (Emitter<T>, JoinHandle<()>)
|
||||
fn filter_mut<F>(
|
||||
self,
|
||||
mut func: F,
|
||||
capacity: Capacity,
|
||||
) -> (Emitter<T>, impl Future<Output = Result<(), JoinError>>)
|
||||
where
|
||||
T: Send + 'static + Clone,
|
||||
F: Send + 'static + FnMut(&mut T) -> bool,
|
||||
@@ -98,7 +112,7 @@ impl<T> Emitter<T> {
|
||||
publisher.publish(event)
|
||||
},
|
||||
Err(NextError::Lagged { .. }) => {},
|
||||
Err(NextError::ProducerExited(_)) => return,
|
||||
Err(NextError::ProducerExited { .. }) => return,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -109,7 +123,11 @@ impl<T> Emitter<T> {
|
||||
)
|
||||
}
|
||||
|
||||
fn filter_map<M, F>(self, mut func: F, capacity: Capacity) -> (Emitter<M>, JoinHandle<()>)
|
||||
fn filter_map<M, F>(
|
||||
self,
|
||||
mut func: F,
|
||||
capacity: Capacity,
|
||||
) -> (Emitter<M>, impl Future<Output = Result<(), JoinError>>)
|
||||
where
|
||||
T: Send + 'static + Clone,
|
||||
M: Send + 'static + Clone,
|
||||
@@ -134,7 +152,7 @@ impl<T> Emitter<T> {
|
||||
publisher.publish(mapped)
|
||||
},
|
||||
Err(NextError::Lagged { .. }) => {},
|
||||
Err(NextError::ProducerExited(_)) => return,
|
||||
Err(NextError::ProducerExited { .. }) => return,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,10 +1,15 @@
|
||||
pub mod emitter;
|
||||
mod emitter_ext;
|
||||
pub mod signal;
|
||||
mod signal_ext;
|
||||
use snafu::Snafu;
|
||||
|
||||
pub mod emitter;
|
||||
pub mod emitter_ext;
|
||||
pub mod signal;
|
||||
pub mod signal_ext;
|
||||
|
||||
pub use emitter::Emitter;
|
||||
pub use emitter_ext::EmitterExt;
|
||||
pub use signal::Signal;
|
||||
pub use signal_ext::SignalExt;
|
||||
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
/// the producer backing this [`Signal`] or [`Emitter`] exited
|
||||
#[derive(Debug, Clone, Copy, Snafu)]
|
||||
pub struct ProducerExited;
|
||||
|
||||
@@ -3,6 +3,8 @@ use std::future::Future;
|
||||
use tokio::sync::{mpsc, watch};
|
||||
pub use tokio::task::JoinError;
|
||||
|
||||
use crate::ProducerExited;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Publisher<T> {
|
||||
sender: watch::Sender<T>,
|
||||
@@ -87,9 +89,6 @@ pub struct Subscription<T> {
|
||||
receiver: watch::Receiver<T>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub struct ProducerExited;
|
||||
|
||||
impl<T> Subscription<T> {
|
||||
pub async fn changed(&mut self) -> Result<(), ProducerExited> {
|
||||
self.receiver.changed().await.map_err(|_| ProducerExited)
|
||||
|
||||
@@ -1,6 +1,55 @@
|
||||
use ext_trait::extension;
|
||||
use std::future::Future;
|
||||
|
||||
use super::signal::Signal;
|
||||
use ext_trait::extension;
|
||||
use snafu::{ResultExt, Snafu};
|
||||
use tokio::select;
|
||||
|
||||
use crate::ProducerExited;
|
||||
|
||||
use super::signal::{JoinError, Signal};
|
||||
|
||||
#[derive(Debug, Snafu)]
|
||||
pub struct ProducerAlreadyExited {
|
||||
source: ProducerExited,
|
||||
}
|
||||
|
||||
#[extension(pub trait SignalExt)]
|
||||
impl<T> Signal<T> {}
|
||||
impl<T> Signal<T> {
|
||||
fn map<M, F>(
|
||||
self,
|
||||
mut func: F,
|
||||
) -> Result<(Signal<M>, impl Future<Output = Result<(), JoinError>>), ProducerAlreadyExited>
|
||||
where
|
||||
T: 'static + Sync + Send + Clone,
|
||||
M: 'static + Sync + Send + Clone,
|
||||
F: 'static + Send + FnMut(T) -> M,
|
||||
{
|
||||
let initial = func(self.subscribe().context(ProducerAlreadyExitedSnafu)?.get());
|
||||
|
||||
Ok(Signal::new(initial, |mut publisher_stream| async move {
|
||||
while let Some(publisher) = publisher_stream.wait().await {
|
||||
let Ok(mut subscription) = self.subscribe() else {
|
||||
return;
|
||||
};
|
||||
|
||||
loop {
|
||||
select! {
|
||||
biased;
|
||||
_ = publisher.all_unsubscribed() => {
|
||||
break;
|
||||
}
|
||||
changed_res = subscription.changed() => {
|
||||
match changed_res {
|
||||
Ok(()) => {
|
||||
let value = subscription.get();
|
||||
publisher.publish(func(value))
|
||||
},
|
||||
Err(ProducerExited) => return,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user