summaryrefslogtreecommitdiff
path: root/src/wrappers/watch.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/wrappers/watch.rs')
-rw-r--r--src/wrappers/watch.rs36
1 files changed, 33 insertions, 3 deletions
diff --git a/src/wrappers/watch.rs b/src/wrappers/watch.rs
index bd3a18a..ec8ead0 100644
--- a/src/wrappers/watch.rs
+++ b/src/wrappers/watch.rs
@@ -10,8 +10,9 @@ use tokio::sync::watch::error::RecvError;
/// A wrapper around [`tokio::sync::watch::Receiver`] that implements [`Stream`].
///
-/// This stream will always start by yielding the current value when the WatchStream is polled,
-/// regardless of whether it was the initial value or sent afterwards.
+/// This stream will start by yielding the current value when the WatchStream is polled,
+/// regardless of whether it was the initial value or sent afterwards,
+/// unless you use [`WatchStream<T>::from_changes`].
///
/// # Examples
///
@@ -40,6 +41,28 @@ use tokio::sync::watch::error::RecvError;
/// let (tx, rx) = watch::channel("hello");
/// let mut rx = WatchStream::new(rx);
///
+/// // existing rx output with "hello" is ignored here
+///
+/// tx.send("goodbye").unwrap();
+/// assert_eq!(rx.next().await, Some("goodbye"));
+/// # }
+/// ```
+///
+/// Example with [`WatchStream<T>::from_changes`]:
+///
+/// ```
+/// # #[tokio::main]
+/// # async fn main() {
+/// use futures::future::FutureExt;
+/// use tokio::sync::watch;
+/// use tokio_stream::{StreamExt, wrappers::WatchStream};
+///
+/// let (tx, rx) = watch::channel("hello");
+/// let mut rx = WatchStream::from_changes(rx);
+///
+/// // no output from rx is available at this point - let's check this:
+/// assert!(rx.next().now_or_never().is_none());
+///
/// tx.send("goodbye").unwrap();
/// assert_eq!(rx.next().await, Some("goodbye"));
/// # }
@@ -49,7 +72,7 @@ use tokio::sync::watch::error::RecvError;
/// [`Stream`]: trait@crate::Stream
#[cfg_attr(docsrs, doc(cfg(feature = "sync")))]
pub struct WatchStream<T> {
- inner: ReusableBoxFuture<(Result<(), RecvError>, Receiver<T>)>,
+ inner: ReusableBoxFuture<'static, (Result<(), RecvError>, Receiver<T>)>,
}
async fn make_future<T: Clone + Send + Sync>(
@@ -66,6 +89,13 @@ impl<T: 'static + Clone + Send + Sync> WatchStream<T> {
inner: ReusableBoxFuture::new(async move { (Ok(()), rx) }),
}
}
+
+ /// Create a new `WatchStream` that waits for the value to be changed.
+ pub fn from_changes(rx: Receiver<T>) -> Self {
+ Self {
+ inner: ReusableBoxFuture::new(make_future(rx)),
+ }
+ }
}
impl<T: Clone + 'static + Send + Sync> Stream for WatchStream<T> {