diff options
Diffstat (limited to 'src/wrappers/watch.rs')
-rw-r--r-- | src/wrappers/watch.rs | 36 |
1 files changed, 33 insertions, 3 deletions
diff --git a/src/wrappers/watch.rs b/src/wrappers/watch.rs index bd3a18a..ec8ead0 100644 --- a/src/wrappers/watch.rs +++ b/src/wrappers/watch.rs @@ -10,8 +10,9 @@ use tokio::sync::watch::error::RecvError; /// A wrapper around [`tokio::sync::watch::Receiver`] that implements [`Stream`]. /// -/// This stream will always start by yielding the current value when the WatchStream is polled, -/// regardless of whether it was the initial value or sent afterwards. +/// This stream will start by yielding the current value when the WatchStream is polled, +/// regardless of whether it was the initial value or sent afterwards, +/// unless you use [`WatchStream<T>::from_changes`]. /// /// # Examples /// @@ -40,6 +41,28 @@ use tokio::sync::watch::error::RecvError; /// let (tx, rx) = watch::channel("hello"); /// let mut rx = WatchStream::new(rx); /// +/// // existing rx output with "hello" is ignored here +/// +/// tx.send("goodbye").unwrap(); +/// assert_eq!(rx.next().await, Some("goodbye")); +/// # } +/// ``` +/// +/// Example with [`WatchStream<T>::from_changes`]: +/// +/// ``` +/// # #[tokio::main] +/// # async fn main() { +/// use futures::future::FutureExt; +/// use tokio::sync::watch; +/// use tokio_stream::{StreamExt, wrappers::WatchStream}; +/// +/// let (tx, rx) = watch::channel("hello"); +/// let mut rx = WatchStream::from_changes(rx); +/// +/// // no output from rx is available at this point - let's check this: +/// assert!(rx.next().now_or_never().is_none()); +/// /// tx.send("goodbye").unwrap(); /// assert_eq!(rx.next().await, Some("goodbye")); /// # } @@ -49,7 +72,7 @@ use tokio::sync::watch::error::RecvError; /// [`Stream`]: trait@crate::Stream #[cfg_attr(docsrs, doc(cfg(feature = "sync")))] pub struct WatchStream<T> { - inner: ReusableBoxFuture<(Result<(), RecvError>, Receiver<T>)>, + inner: ReusableBoxFuture<'static, (Result<(), RecvError>, Receiver<T>)>, } async fn make_future<T: Clone + Send + Sync>( @@ -66,6 +89,13 @@ impl<T: 'static + Clone + Send + Sync> WatchStream<T> { inner: ReusableBoxFuture::new(async move { (Ok(()), rx) }), } } + + /// Create a new `WatchStream` that waits for the value to be changed. + pub fn from_changes(rx: Receiver<T>) -> Self { + Self { + inner: ReusableBoxFuture::new(make_future(rx)), + } + } } impl<T: Clone + 'static + Send + Sync> Stream for WatchStream<T> { |