Compare commits
4 Commits
a25079c813
...
ead3c6e4a9
Author | SHA1 | Date | |
---|---|---|---|
ead3c6e4a9 | |||
04f9aa24cf | |||
70dda580ee | |||
7b2ebc5fe9 |
@@ -30,7 +30,12 @@ serde_json = "1.0.140"
|
||||
shadow-rs = { version = "1.0.1", default-features = false }
|
||||
snafu = "0.8.5"
|
||||
strum = { version = "0.27.1", features = ["derive"] }
|
||||
tokio = { version = "1.32.0", features = ["rt", "rt-multi-thread", "time"] }
|
||||
tokio = { version = "1.32.0", features = [
|
||||
"rt",
|
||||
"rt-multi-thread",
|
||||
"sync",
|
||||
"time",
|
||||
] }
|
||||
tracing = "0.1.37"
|
||||
tracing-appender = "0.2.3"
|
||||
tracing-subscriber = "0.3.17"
|
||||
|
@@ -1,6 +1,6 @@
|
||||
use std::{fmt::Display, str::FromStr};
|
||||
use std::{convert::Infallible, fmt::Display, str::FromStr};
|
||||
|
||||
use pyo3::{exceptions::PyValueError, prelude::*};
|
||||
use pyo3::{exceptions::PyValueError, prelude::*, types::PyString};
|
||||
use snafu::{ResultExt, Snafu};
|
||||
|
||||
use super::{
|
||||
@@ -58,3 +58,14 @@ impl<'py> FromPyObject<'py> for EntityId {
|
||||
Ok(entity_id)
|
||||
}
|
||||
}
|
||||
|
||||
impl<'py> IntoPyObject<'py> for &EntityId {
|
||||
type Target = PyString;
|
||||
type Output = Bound<'py, Self::Target>;
|
||||
type Error = Infallible;
|
||||
|
||||
fn into_pyobject(self, py: Python<'py>) -> Result<Self::Output, Self::Error> {
|
||||
let s = self.to_string();
|
||||
s.into_pyobject(py)
|
||||
}
|
||||
}
|
||||
|
@@ -1,3 +1,5 @@
|
||||
use std::convert::Infallible;
|
||||
|
||||
use pyo3::prelude::*;
|
||||
|
||||
use crate::python_utils::{detach, validate_type_by_name};
|
||||
@@ -17,27 +19,37 @@ impl<'source> FromPyObject<'source> for HomeAssistant {
|
||||
}
|
||||
}
|
||||
|
||||
impl<'py> IntoPyObject<'py> for &HomeAssistant {
|
||||
type Target = PyAny;
|
||||
type Output = Bound<'py, Self::Target>;
|
||||
type Error = Infallible;
|
||||
|
||||
fn into_pyobject(self, py: Python<'py>) -> Result<Self::Output, Self::Error> {
|
||||
Ok(self.0.bind(py).to_owned())
|
||||
}
|
||||
}
|
||||
|
||||
impl HomeAssistant {
|
||||
/// Return the representation
|
||||
pub fn repr(&self, py: &Python) -> Result<String, PyErr> {
|
||||
let bound = self.0.bind(*py);
|
||||
pub fn repr(&self, py: Python<'_>) -> Result<String, PyErr> {
|
||||
let bound = self.0.bind(py);
|
||||
let repr = bound.repr()?;
|
||||
repr.extract()
|
||||
}
|
||||
|
||||
/// Return if Home Assistant is running.
|
||||
pub fn is_running(&self, py: &Python) -> Result<bool, PyErr> {
|
||||
let is_running = self.0.getattr(*py, "is_running")?;
|
||||
is_running.extract(*py)
|
||||
pub fn is_running(&self, py: Python<'_>) -> Result<bool, PyErr> {
|
||||
let is_running = self.0.getattr(py, "is_running")?;
|
||||
is_running.extract(py)
|
||||
}
|
||||
/// Return if Home Assistant is stopping.
|
||||
pub fn is_stopping(&self, py: &Python) -> Result<bool, PyErr> {
|
||||
let is_stopping = self.0.getattr(*py, "is_stopping")?;
|
||||
is_stopping.extract(*py)
|
||||
pub fn is_stopping(&self, py: Python<'_>) -> Result<bool, PyErr> {
|
||||
let is_stopping = self.0.getattr(py, "is_stopping")?;
|
||||
is_stopping.extract(py)
|
||||
}
|
||||
|
||||
pub fn states(&self, py: &Python) -> Result<StateMachine, PyErr> {
|
||||
let states = self.0.getattr(*py, "states")?;
|
||||
states.extract(*py)
|
||||
pub fn states(&self, py: Python<'_>) -> Result<StateMachine, PyErr> {
|
||||
let states = self.0.getattr(py, "states")?;
|
||||
states.extract(py)
|
||||
}
|
||||
}
|
||||
|
@@ -23,11 +23,11 @@ impl<'py> FromPyObject<'py> for StateMachine {
|
||||
impl StateMachine {
|
||||
pub fn get<Attributes: for<'py> FromPyObject<'py>, ContextEvent: for<'py> FromPyObject<'py>>(
|
||||
&self,
|
||||
py: &Python,
|
||||
py: Python<'_>,
|
||||
entity_id: EntityId,
|
||||
) -> Result<Option<State<Attributes, ContextEvent>>, PyErr> {
|
||||
) -> PyResult<Option<State<Attributes, ContextEvent>>> {
|
||||
let args = (entity_id.to_string(),);
|
||||
let state = self.0.call_method1(*py, "get", args)?;
|
||||
state.extract(*py)
|
||||
let state = self.0.call_method1(py, "get", args)?;
|
||||
state.extract(py)
|
||||
}
|
||||
}
|
||||
|
@@ -17,6 +17,7 @@ use tracing_to_home_assistant::TracingToHomeAssistant;
|
||||
mod arbitrary;
|
||||
mod home_assistant;
|
||||
mod python_utils;
|
||||
mod store;
|
||||
mod tracing_to_home_assistant;
|
||||
|
||||
shadow!(build_info);
|
||||
@@ -46,7 +47,7 @@ async fn real_main(home_assistant: HomeAssistant) -> ! {
|
||||
}
|
||||
|
||||
#[pyfunction]
|
||||
fn main<'p>(py: Python<'p>, home_assistant: HomeAssistant) -> PyResult<Bound<'p, PyAny>> {
|
||||
fn main<'py>(py: Python<'py>, home_assistant: HomeAssistant) -> PyResult<Bound<'py, PyAny>> {
|
||||
pyo3_async_runtimes::tokio::future_into_py::<_, ()>(py, async {
|
||||
real_main(home_assistant).await;
|
||||
})
|
||||
|
116
src/store/mod.rs
Normal file
116
src/store/mod.rs
Normal file
@@ -0,0 +1,116 @@
|
||||
use std::future::Future;
|
||||
|
||||
use tokio::{
|
||||
sync::{mpsc, watch},
|
||||
task::{JoinError, JoinHandle},
|
||||
};
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct PublisherStream<T> {
|
||||
receiver: mpsc::Receiver<Publisher<T>>,
|
||||
}
|
||||
|
||||
impl<T> PublisherStream<T> {
|
||||
pub async fn wait(&mut self) -> Option<Publisher<T>> {
|
||||
self.receiver.recv().await
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Publisher<T> {
|
||||
sender: watch::Sender<T>,
|
||||
}
|
||||
|
||||
impl<T> Publisher<T> {
|
||||
pub async fn all_unsubscribed(&self) {
|
||||
self.sender.closed().await
|
||||
}
|
||||
|
||||
pub fn publish(&self, value: T) {
|
||||
self.sender.send_replace(value);
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Store<T> {
|
||||
sender: watch::Sender<T>,
|
||||
publisher_sender: mpsc::Sender<Publisher<T>>,
|
||||
producer_join_handle: JoinHandle<()>,
|
||||
}
|
||||
|
||||
impl<T> Store<T> {
|
||||
pub fn new<Fut: Future<Output = ()> + Send + 'static>(
|
||||
initial: T,
|
||||
producer: impl FnOnce(PublisherStream<T>) -> Fut,
|
||||
) -> Self {
|
||||
let (sender, _) = watch::channel(initial);
|
||||
let (publisher_sender, publisher_receiver) = mpsc::channel(1);
|
||||
|
||||
let subscribers_stream = PublisherStream {
|
||||
receiver: publisher_receiver,
|
||||
};
|
||||
|
||||
let producer_join_handle = tokio::spawn(producer(subscribers_stream));
|
||||
|
||||
Self {
|
||||
publisher_sender,
|
||||
sender,
|
||||
producer_join_handle,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn subscribe(&self) -> Result<Subscription<T>, ProducerExited> {
|
||||
let receiver = self.sender.subscribe();
|
||||
|
||||
if self.sender.receiver_count() == 1 {
|
||||
if let Err(e) = self.publisher_sender.try_send(Publisher {
|
||||
sender: self.sender.clone(),
|
||||
}) {
|
||||
match e {
|
||||
mpsc::error::TrySendError::Full(_) => unreachable!(),
|
||||
mpsc::error::TrySendError::Closed(_) => return Err(ProducerExited),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(Subscription { receiver })
|
||||
}
|
||||
|
||||
/// Signify that no one can ever subscribe again,
|
||||
/// and wait for the producer task to complete.
|
||||
pub fn run(self) -> impl Future<Output = Result<(), JoinError>> {
|
||||
self.producer_join_handle
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Subscription<T> {
|
||||
receiver: watch::Receiver<T>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub struct ProducerExited;
|
||||
|
||||
impl<T> Subscription<T> {
|
||||
pub async fn changed(&mut self) -> Result<(), ProducerExited> {
|
||||
self.receiver.changed().await.map_err(|_| ProducerExited)
|
||||
}
|
||||
|
||||
pub fn get(&mut self) -> T::Owned
|
||||
where
|
||||
T: ToOwned,
|
||||
{
|
||||
self.receiver.borrow_and_update().to_owned()
|
||||
}
|
||||
|
||||
pub async fn for_each<Fut: Future<Output = ()>>(mut self, mut func: impl FnMut(T::Owned) -> Fut)
|
||||
where
|
||||
T: ToOwned,
|
||||
{
|
||||
loop {
|
||||
func(self.get()).await;
|
||||
if self.changed().await.is_err() {
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user