From eff0ad2bf8642794425560b3e7a9f096db81b9af Mon Sep 17 00:00:00 2001 From: Jacob Date: Wed, 7 Jan 2026 02:14:06 -0500 Subject: [PATCH] feat(persisted): initial implementation --- persisted/Cargo.toml | 14 ++++++ persisted/src/lib.rs | 111 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 125 insertions(+) create mode 100644 persisted/Cargo.toml create mode 100644 persisted/src/lib.rs diff --git a/persisted/Cargo.toml b/persisted/Cargo.toml new file mode 100644 index 0000000..a9f6c20 --- /dev/null +++ b/persisted/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "persisted" +version = "0.1.0" +edition = "2021" +license.workspace = true + +[dependencies] +bytes = { workspace = true } +emitter-and-signal = { path = "../emitter-and-signal" } +fjall = "2.11" +postcard = { version = "1.1", features = ["use-std"] } +serde = { workspace = true } +snafu = { workspace = true } +tokio = { workspace = true, features = ["rt", "sync"] } diff --git a/persisted/src/lib.rs b/persisted/src/lib.rs new file mode 100644 index 0000000..9a07521 --- /dev/null +++ b/persisted/src/lib.rs @@ -0,0 +1,111 @@ +pub use bytes::Bytes; +use emitter_and_signal::signal::{JoinError, Signal}; +pub use fjall::{Config, Keyspace, Partition, PartitionCreateOptions}; +use serde::{Deserialize, Serialize}; +use snafu::{OptionExt, ResultExt, Snafu}; +use std::{fmt::Debug, future::Future, num::NonZeroUsize, ops::Deref, sync::Arc}; +use tokio::{ + select, + sync::{mpsc, oneshot}, + task::spawn_blocking, +}; + +#[derive(Debug, Clone, Snafu)] +pub enum PersistedError { + Missing, + CreationError { + // Wrapped in [`Arc`] to make this [`Clone`] + source: Arc, + }, + DeserializationError { + source: postcard::Error, + }, +} + +pub async fn persisted Deserialize<'a>>( + partition: Partition, + identifier: Bytes, + buffer: NonZeroUsize, +) -> ( + Setter, + Signal>, + impl Future>, +) { + let initial = spawn_blocking({ + let partition = partition.clone(); + let identifier = identifier.clone(); + move || partition.get(identifier.deref()) + }) + .await + .unwrap() + .map_err(Arc::new) + .context(CreationSnafu) + .and_then(|op| op.context(MissingSnafu)) + .and_then(|slice| postcard::from_bytes(&slice).context(DeserializationSnafu)); + + let (set_tx, mut set_rx) = mpsc::channel(buffer.get()); + let setter = Setter { sender: set_tx }; + + let (signal, task) = Signal::new(initial, move |mut publisher_stream| async move { + while let Some(publisher) = publisher_stream.wait().await { + loop { + select! { + biased; + _ = publisher.all_unsubscribed() => { + break; + } + new_value_opt = set_rx.recv() => { + let Some((new_value, callback)) = new_value_opt else { return }; + let serialized_res = postcard::to_stdvec(&new_value).context(SerializationSnafu); + // Stand-in for Option::async_and_then + let persisted_res = match serialized_res { + Ok(serialized) => spawn_blocking({ + let partition = partition.clone(); + let identifier = identifier.clone(); + move || partition.insert(identifier.deref(), serialized) + }).await.unwrap().context(SavingSnafu), + Err(error) => Err(error), + }; + + if persisted_res.is_ok() { + publisher.publish(Ok(new_value)); + } + + let _ = callback.send(persisted_res); + } + } + } + } + }); + + (setter, signal, task) +} + +#[derive(Debug, Clone)] +pub struct Setter { + sender: mpsc::Sender<(T, oneshot::Sender>>)>, +} + +// TODO: add doc comments functioning as error Display +#[derive(Debug, Snafu)] +pub enum SetError { + Closed { source: mpsc::error::SendError }, + NoFeedback { source: oneshot::error::RecvError }, + SerializationError { source: postcard::Error }, + SavingError { source: fjall::Error }, +} + +impl Setter { + pub async fn set(&self, value: T) -> Result<(), SetError> { + let (callback_tx, callback_rx) = oneshot::channel(); + + self.sender + .send((value, callback_tx)) + .await + .map_err(|send_error| mpsc::error::SendError(send_error.0 .0)) + .context(ClosedSnafu)?; + + let set_res = callback_rx.await.context(NoFeedbackSnafu)?; + set_res + } +}