diff options
Diffstat (limited to 'src/stream_ext.rs')
-rw-r--r-- | src/stream_ext.rs | 179 |
1 files changed, 173 insertions, 6 deletions
diff --git a/src/stream_ext.rs b/src/stream_ext.rs index 1157c9e..52d3202 100644 --- a/src/stream_ext.rs +++ b/src/stream_ext.rs @@ -1,3 +1,4 @@ +use core::future::Future; use futures_core::Stream; mod all; @@ -27,6 +28,9 @@ use fuse::Fuse; mod map; use map::Map; +mod map_while; +use map_while::MapWhile; + mod merge; use merge::Merge; @@ -39,21 +43,26 @@ use skip::Skip; mod skip_while; use skip_while::SkipWhile; -mod try_next; -use try_next::TryNext; - mod take; use take::Take; mod take_while; use take_while::TakeWhile; +mod then; +use then::Then; + +mod try_next; +use try_next::TryNext; + cfg_time! { - mod timeout; + pub(crate) mod timeout; use timeout::Timeout; use tokio::time::Duration; mod throttle; use throttle::{throttle, Throttle}; + mod chunks_timeout; + use chunks_timeout::ChunksTimeout; } /// An extension trait for the [`Stream`] trait that provides a variety of @@ -106,6 +115,12 @@ pub trait StreamExt: Stream { /// pinning it to the stack using the `pin_mut!` macro from the `pin_utils` /// crate. /// + /// # Cancel safety + /// + /// This method is cancel safe. The returned future only + /// holds onto a reference to the underlying stream, + /// so dropping it will never lose a value. + /// /// # Examples /// /// ``` @@ -142,6 +157,12 @@ pub trait StreamExt: Stream { /// an [`Option<Result<T, E>>`](Option), making for easy use /// with the [`?`](std::ops::Try) operator. /// + /// # Cancel safety + /// + /// This method is cancel safe. The returned future only + /// holds onto a reference to the underlying stream, + /// so dropping it will never lose a value. + /// /// # Examples /// /// ``` @@ -197,6 +218,93 @@ pub trait StreamExt: Stream { Map::new(self, f) } + /// Map this stream's items to a different type for as long as determined by + /// the provided closure. A stream of the target type will be returned, + /// which will yield elements until the closure returns `None`. + /// + /// The provided closure is executed over all elements of this stream as + /// they are made available, until it returns `None`. It is executed inline + /// with calls to [`poll_next`](Stream::poll_next). Once `None` is returned, + /// the underlying stream will not be polled again. + /// + /// Note that this function consumes the stream passed into it and returns a + /// wrapped version of it, similar to the [`Iterator::map_while`] method in the + /// standard library. + /// + /// # Examples + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio_stream::{self as stream, StreamExt}; + /// + /// let stream = stream::iter(1..=10); + /// let mut stream = stream.map_while(|x| { + /// if x < 4 { + /// Some(x + 3) + /// } else { + /// None + /// } + /// }); + /// assert_eq!(stream.next().await, Some(4)); + /// assert_eq!(stream.next().await, Some(5)); + /// assert_eq!(stream.next().await, Some(6)); + /// assert_eq!(stream.next().await, None); + /// # } + /// ``` + fn map_while<T, F>(self, f: F) -> MapWhile<Self, F> + where + F: FnMut(Self::Item) -> Option<T>, + Self: Sized, + { + MapWhile::new(self, f) + } + + /// Maps this stream's items asynchronously to a different type, returning a + /// new stream of the resulting type. + /// + /// The provided closure is executed over all elements of this stream as + /// they are made available, and the returned future is executed. Only one + /// future is executed at the time. + /// + /// Note that this function consumes the stream passed into it and returns a + /// wrapped version of it, similar to the existing `then` methods in the + /// standard library. + /// + /// Be aware that if the future is not `Unpin`, then neither is the `Stream` + /// returned by this method. To handle this, you can use `tokio::pin!` as in + /// the example below or put the stream in a `Box` with `Box::pin(stream)`. + /// + /// # Examples + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio_stream::{self as stream, StreamExt}; + /// + /// async fn do_async_work(value: i32) -> i32 { + /// value + 3 + /// } + /// + /// let stream = stream::iter(1..=3); + /// let stream = stream.then(do_async_work); + /// + /// tokio::pin!(stream); + /// + /// assert_eq!(stream.next().await, Some(4)); + /// assert_eq!(stream.next().await, Some(5)); + /// assert_eq!(stream.next().await, Some(6)); + /// # } + /// ``` + fn then<F, Fut>(self, f: F) -> Then<Self, Fut, F> + where + F: FnMut(Self::Item) -> Fut, + Fut: Future, + Self: Sized, + { + Then::new(self, f) + } + /// Combine two streams into one by interleaving the output of both as it /// is produced. /// @@ -874,6 +982,8 @@ pub trait StreamExt: Stream { /// Slows down a stream by enforcing a delay between items. /// + /// The underlying timer behind this utility has a granularity of one millisecond. + /// /// # Example /// /// Create a throttled stream. @@ -899,6 +1009,63 @@ pub trait StreamExt: Stream { { throttle(duration, self) } + + /// Batches the items in the given stream using a maximum duration and size for each batch. + /// + /// This stream returns the next batch of items in the following situations: + /// 1. The inner stream has returned at least `max_size` many items since the last batch. + /// 2. The time since the first item of a batch is greater than the given duration. + /// 3. The end of the stream is reached. + /// + /// The length of the returned vector is never empty or greater than the maximum size. Empty batches + /// will not be emitted if no items are received upstream. + /// + /// # Panics + /// + /// This function panics if `max_size` is zero + /// + /// # Example + /// + /// ```rust + /// use std::time::Duration; + /// use tokio::time; + /// use tokio_stream::{self as stream, StreamExt}; + /// use futures::FutureExt; + /// + /// #[tokio::main] + /// # async fn _unused() {} + /// # #[tokio::main(flavor = "current_thread", start_paused = true)] + /// async fn main() { + /// let iter = vec![1, 2, 3, 4].into_iter(); + /// let stream0 = stream::iter(iter); + /// + /// let iter = vec![5].into_iter(); + /// let stream1 = stream::iter(iter) + /// .then(move |n| time::sleep(Duration::from_secs(5)).map(move |_| n)); + /// + /// let chunk_stream = stream0 + /// .chain(stream1) + /// .chunks_timeout(3, Duration::from_secs(2)); + /// tokio::pin!(chunk_stream); + /// + /// // a full batch was received + /// assert_eq!(chunk_stream.next().await, Some(vec![1,2,3])); + /// // deadline was reached before max_size was reached + /// assert_eq!(chunk_stream.next().await, Some(vec![4])); + /// // last element in the stream + /// assert_eq!(chunk_stream.next().await, Some(vec![5])); + /// } + /// ``` + #[cfg(feature = "time")] + #[cfg_attr(docsrs, doc(cfg(feature = "time")))] + #[track_caller] + fn chunks_timeout(self, max_size: usize, duration: Duration) -> ChunksTimeout<Self> + where + Self: Sized, + { + assert!(max_size > 0, "`max_size` must be non-zero."); + ChunksTimeout::new(self, max_size, duration) + } } impl<St: ?Sized> StreamExt for St where St: Stream {} @@ -906,10 +1073,10 @@ impl<St: ?Sized> StreamExt for St where St: Stream {} /// Merge the size hints from two streams. fn merge_size_hints( (left_low, left_high): (usize, Option<usize>), - (right_low, right_hign): (usize, Option<usize>), + (right_low, right_high): (usize, Option<usize>), ) -> (usize, Option<usize>) { let low = left_low.saturating_add(right_low); - let high = match (left_high, right_hign) { + let high = match (left_high, right_high) { (Some(h1), Some(h2)) => h1.checked_add(h2), _ => None, }; |