feat(persisted): initial implementation
This commit is contained in:
14
persisted/Cargo.toml
Normal file
14
persisted/Cargo.toml
Normal file
@@ -0,0 +1,14 @@
|
||||
[package]
|
||||
name = "persisted"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
license.workspace = true
|
||||
|
||||
[dependencies]
|
||||
bytes = { workspace = true }
|
||||
emitter-and-signal = { path = "../emitter-and-signal" }
|
||||
fjall = "2.11"
|
||||
postcard = { version = "1.1", features = ["use-std"] }
|
||||
serde = { workspace = true }
|
||||
snafu = { workspace = true }
|
||||
tokio = { workspace = true, features = ["rt", "sync"] }
|
||||
111
persisted/src/lib.rs
Normal file
111
persisted/src/lib.rs
Normal file
@@ -0,0 +1,111 @@
|
||||
pub use bytes::Bytes;
|
||||
use emitter_and_signal::signal::{JoinError, Signal};
|
||||
pub use fjall::{Config, Keyspace, Partition, PartitionCreateOptions};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use snafu::{OptionExt, ResultExt, Snafu};
|
||||
use std::{fmt::Debug, future::Future, num::NonZeroUsize, ops::Deref, sync::Arc};
|
||||
use tokio::{
|
||||
select,
|
||||
sync::{mpsc, oneshot},
|
||||
task::spawn_blocking,
|
||||
};
|
||||
|
||||
#[derive(Debug, Clone, Snafu)]
|
||||
pub enum PersistedError {
|
||||
Missing,
|
||||
CreationError {
|
||||
// Wrapped in [`Arc`] to make this [`Clone`]
|
||||
source: Arc<fjall::Error>,
|
||||
},
|
||||
DeserializationError {
|
||||
source: postcard::Error,
|
||||
},
|
||||
}
|
||||
|
||||
pub async fn persisted<T: Debug + Send + Sync + 'static + Serialize + for<'a> Deserialize<'a>>(
|
||||
partition: Partition,
|
||||
identifier: Bytes,
|
||||
buffer: NonZeroUsize,
|
||||
) -> (
|
||||
Setter<T>,
|
||||
Signal<Result<T, PersistedError>>,
|
||||
impl Future<Output = Result<(), JoinError>>,
|
||||
) {
|
||||
let initial = spawn_blocking({
|
||||
let partition = partition.clone();
|
||||
let identifier = identifier.clone();
|
||||
move || partition.get(identifier.deref())
|
||||
})
|
||||
.await
|
||||
.unwrap()
|
||||
.map_err(Arc::new)
|
||||
.context(CreationSnafu)
|
||||
.and_then(|op| op.context(MissingSnafu))
|
||||
.and_then(|slice| postcard::from_bytes(&slice).context(DeserializationSnafu));
|
||||
|
||||
let (set_tx, mut set_rx) = mpsc::channel(buffer.get());
|
||||
let setter = Setter { sender: set_tx };
|
||||
|
||||
let (signal, task) = Signal::new(initial, move |mut publisher_stream| async move {
|
||||
while let Some(publisher) = publisher_stream.wait().await {
|
||||
loop {
|
||||
select! {
|
||||
biased;
|
||||
_ = publisher.all_unsubscribed() => {
|
||||
break;
|
||||
}
|
||||
new_value_opt = set_rx.recv() => {
|
||||
let Some((new_value, callback)) = new_value_opt else { return };
|
||||
let serialized_res = postcard::to_stdvec(&new_value).context(SerializationSnafu);
|
||||
// Stand-in for Option::async_and_then
|
||||
let persisted_res = match serialized_res {
|
||||
Ok(serialized) => spawn_blocking({
|
||||
let partition = partition.clone();
|
||||
let identifier = identifier.clone();
|
||||
move || partition.insert(identifier.deref(), serialized)
|
||||
}).await.unwrap().context(SavingSnafu),
|
||||
Err(error) => Err(error),
|
||||
};
|
||||
|
||||
if persisted_res.is_ok() {
|
||||
publisher.publish(Ok(new_value));
|
||||
}
|
||||
|
||||
let _ = callback.send(persisted_res);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
(setter, signal, task)
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Setter<T: 'static> {
|
||||
sender: mpsc::Sender<(T, oneshot::Sender<Result<(), SetError<T>>>)>,
|
||||
}
|
||||
|
||||
// TODO: add doc comments functioning as error Display
|
||||
#[derive(Debug, Snafu)]
|
||||
pub enum SetError<T: 'static> {
|
||||
Closed { source: mpsc::error::SendError<T> },
|
||||
NoFeedback { source: oneshot::error::RecvError },
|
||||
SerializationError { source: postcard::Error },
|
||||
SavingError { source: fjall::Error },
|
||||
}
|
||||
|
||||
impl<T: Debug> Setter<T> {
|
||||
pub async fn set(&self, value: T) -> Result<(), SetError<T>> {
|
||||
let (callback_tx, callback_rx) = oneshot::channel();
|
||||
|
||||
self.sender
|
||||
.send((value, callback_tx))
|
||||
.await
|
||||
.map_err(|send_error| mpsc::error::SendError(send_error.0 .0))
|
||||
.context(ClosedSnafu)?;
|
||||
|
||||
let set_res = callback_rx.await.context(NoFeedbackSnafu)?;
|
||||
set_res
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user