summaryrefslogtreecommitdiff
path: root/src/stream_ext.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/stream_ext.rs')
-rw-r--r--src/stream_ext.rs179
1 files changed, 173 insertions, 6 deletions
diff --git a/src/stream_ext.rs b/src/stream_ext.rs
index 1157c9e..52d3202 100644
--- a/src/stream_ext.rs
+++ b/src/stream_ext.rs
@@ -1,3 +1,4 @@
+use core::future::Future;
use futures_core::Stream;
mod all;
@@ -27,6 +28,9 @@ use fuse::Fuse;
mod map;
use map::Map;
+mod map_while;
+use map_while::MapWhile;
+
mod merge;
use merge::Merge;
@@ -39,21 +43,26 @@ use skip::Skip;
mod skip_while;
use skip_while::SkipWhile;
-mod try_next;
-use try_next::TryNext;
-
mod take;
use take::Take;
mod take_while;
use take_while::TakeWhile;
+mod then;
+use then::Then;
+
+mod try_next;
+use try_next::TryNext;
+
cfg_time! {
- mod timeout;
+ pub(crate) mod timeout;
use timeout::Timeout;
use tokio::time::Duration;
mod throttle;
use throttle::{throttle, Throttle};
+ mod chunks_timeout;
+ use chunks_timeout::ChunksTimeout;
}
/// An extension trait for the [`Stream`] trait that provides a variety of
@@ -106,6 +115,12 @@ pub trait StreamExt: Stream {
/// pinning it to the stack using the `pin_mut!` macro from the `pin_utils`
/// crate.
///
+ /// # Cancel safety
+ ///
+ /// This method is cancel safe. The returned future only
+ /// holds onto a reference to the underlying stream,
+ /// so dropping it will never lose a value.
+ ///
/// # Examples
///
/// ```
@@ -142,6 +157,12 @@ pub trait StreamExt: Stream {
/// an [`Option<Result<T, E>>`](Option), making for easy use
/// with the [`?`](std::ops::Try) operator.
///
+ /// # Cancel safety
+ ///
+ /// This method is cancel safe. The returned future only
+ /// holds onto a reference to the underlying stream,
+ /// so dropping it will never lose a value.
+ ///
/// # Examples
///
/// ```
@@ -197,6 +218,93 @@ pub trait StreamExt: Stream {
Map::new(self, f)
}
+ /// Map this stream's items to a different type for as long as determined by
+ /// the provided closure. A stream of the target type will be returned,
+ /// which will yield elements until the closure returns `None`.
+ ///
+ /// The provided closure is executed over all elements of this stream as
+ /// they are made available, until it returns `None`. It is executed inline
+ /// with calls to [`poll_next`](Stream::poll_next). Once `None` is returned,
+ /// the underlying stream will not be polled again.
+ ///
+ /// Note that this function consumes the stream passed into it and returns a
+ /// wrapped version of it, similar to the [`Iterator::map_while`] method in the
+ /// standard library.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # #[tokio::main]
+ /// # async fn main() {
+ /// use tokio_stream::{self as stream, StreamExt};
+ ///
+ /// let stream = stream::iter(1..=10);
+ /// let mut stream = stream.map_while(|x| {
+ /// if x < 4 {
+ /// Some(x + 3)
+ /// } else {
+ /// None
+ /// }
+ /// });
+ /// assert_eq!(stream.next().await, Some(4));
+ /// assert_eq!(stream.next().await, Some(5));
+ /// assert_eq!(stream.next().await, Some(6));
+ /// assert_eq!(stream.next().await, None);
+ /// # }
+ /// ```
+ fn map_while<T, F>(self, f: F) -> MapWhile<Self, F>
+ where
+ F: FnMut(Self::Item) -> Option<T>,
+ Self: Sized,
+ {
+ MapWhile::new(self, f)
+ }
+
+ /// Maps this stream's items asynchronously to a different type, returning a
+ /// new stream of the resulting type.
+ ///
+ /// The provided closure is executed over all elements of this stream as
+ /// they are made available, and the returned future is executed. Only one
+ /// future is executed at the time.
+ ///
+ /// Note that this function consumes the stream passed into it and returns a
+ /// wrapped version of it, similar to the existing `then` methods in the
+ /// standard library.
+ ///
+ /// Be aware that if the future is not `Unpin`, then neither is the `Stream`
+ /// returned by this method. To handle this, you can use `tokio::pin!` as in
+ /// the example below or put the stream in a `Box` with `Box::pin(stream)`.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # #[tokio::main]
+ /// # async fn main() {
+ /// use tokio_stream::{self as stream, StreamExt};
+ ///
+ /// async fn do_async_work(value: i32) -> i32 {
+ /// value + 3
+ /// }
+ ///
+ /// let stream = stream::iter(1..=3);
+ /// let stream = stream.then(do_async_work);
+ ///
+ /// tokio::pin!(stream);
+ ///
+ /// assert_eq!(stream.next().await, Some(4));
+ /// assert_eq!(stream.next().await, Some(5));
+ /// assert_eq!(stream.next().await, Some(6));
+ /// # }
+ /// ```
+ fn then<F, Fut>(self, f: F) -> Then<Self, Fut, F>
+ where
+ F: FnMut(Self::Item) -> Fut,
+ Fut: Future,
+ Self: Sized,
+ {
+ Then::new(self, f)
+ }
+
/// Combine two streams into one by interleaving the output of both as it
/// is produced.
///
@@ -874,6 +982,8 @@ pub trait StreamExt: Stream {
/// Slows down a stream by enforcing a delay between items.
///
+ /// The underlying timer behind this utility has a granularity of one millisecond.
+ ///
/// # Example
///
/// Create a throttled stream.
@@ -899,6 +1009,63 @@ pub trait StreamExt: Stream {
{
throttle(duration, self)
}
+
+ /// Batches the items in the given stream using a maximum duration and size for each batch.
+ ///
+ /// This stream returns the next batch of items in the following situations:
+ /// 1. The inner stream has returned at least `max_size` many items since the last batch.
+ /// 2. The time since the first item of a batch is greater than the given duration.
+ /// 3. The end of the stream is reached.
+ ///
+ /// The length of the returned vector is never empty or greater than the maximum size. Empty batches
+ /// will not be emitted if no items are received upstream.
+ ///
+ /// # Panics
+ ///
+ /// This function panics if `max_size` is zero
+ ///
+ /// # Example
+ ///
+ /// ```rust
+ /// use std::time::Duration;
+ /// use tokio::time;
+ /// use tokio_stream::{self as stream, StreamExt};
+ /// use futures::FutureExt;
+ ///
+ /// #[tokio::main]
+ /// # async fn _unused() {}
+ /// # #[tokio::main(flavor = "current_thread", start_paused = true)]
+ /// async fn main() {
+ /// let iter = vec![1, 2, 3, 4].into_iter();
+ /// let stream0 = stream::iter(iter);
+ ///
+ /// let iter = vec![5].into_iter();
+ /// let stream1 = stream::iter(iter)
+ /// .then(move |n| time::sleep(Duration::from_secs(5)).map(move |_| n));
+ ///
+ /// let chunk_stream = stream0
+ /// .chain(stream1)
+ /// .chunks_timeout(3, Duration::from_secs(2));
+ /// tokio::pin!(chunk_stream);
+ ///
+ /// // a full batch was received
+ /// assert_eq!(chunk_stream.next().await, Some(vec![1,2,3]));
+ /// // deadline was reached before max_size was reached
+ /// assert_eq!(chunk_stream.next().await, Some(vec![4]));
+ /// // last element in the stream
+ /// assert_eq!(chunk_stream.next().await, Some(vec![5]));
+ /// }
+ /// ```
+ #[cfg(feature = "time")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "time")))]
+ #[track_caller]
+ fn chunks_timeout(self, max_size: usize, duration: Duration) -> ChunksTimeout<Self>
+ where
+ Self: Sized,
+ {
+ assert!(max_size > 0, "`max_size` must be non-zero.");
+ ChunksTimeout::new(self, max_size, duration)
+ }
}
impl<St: ?Sized> StreamExt for St where St: Stream {}
@@ -906,10 +1073,10 @@ impl<St: ?Sized> StreamExt for St where St: Stream {}
/// Merge the size hints from two streams.
fn merge_size_hints(
(left_low, left_high): (usize, Option<usize>),
- (right_low, right_hign): (usize, Option<usize>),
+ (right_low, right_high): (usize, Option<usize>),
) -> (usize, Option<usize>) {
let low = left_low.saturating_add(right_low);
- let high = match (left_high, right_hign) {
+ let high = match (left_high, right_high) {
(Some(h1), Some(h2)) => h1.checked_add(h2),
_ => None,
};