diff options
author | Jeff Vander Stoep <jeffv@google.com> | 2024-02-06 08:51:07 +0000 |
---|---|---|
committer | Automerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com> | 2024-02-06 08:51:07 +0000 |
commit | ec5ce4148e4305e26cac35f17c256dfb061f4080 (patch) | |
tree | 7d82b51451e1a8a40106414e34385ff76a3f7c3e | |
parent | cf61bbd97b6e6934c14617749aac69c90ce516bd (diff) | |
parent | ddb59c87394b27e094ff61ba6ec50ea25e2bac61 (diff) | |
download | tokio-stream-emu-34-2-dev.tar.gz |
Upgrade tokio-stream to 0.1.14 am: ddb59c8739emu-34-2-dev
Original change: https://android-review.googlesource.com/c/platform/external/rust/crates/tokio-stream/+/2950466
Change-Id: Iefc83ade9080a6253372b48389d00b769ce1c442
Signed-off-by: Automerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com>
-rw-r--r-- | .cargo_vcs_info.json | 2 | ||||
-rw-r--r-- | Android.bp | 4 | ||||
-rw-r--r-- | CHANGELOG.md | 23 | ||||
-rw-r--r-- | Cargo.toml | 21 | ||||
-rw-r--r-- | Cargo.toml.orig | 18 | ||||
-rw-r--r-- | METADATA | 25 | ||||
-rw-r--r-- | src/lib.rs | 3 | ||||
-rw-r--r-- | src/stream_close.rs | 93 | ||||
-rw-r--r-- | src/stream_ext.rs | 116 | ||||
-rw-r--r-- | src/stream_ext/timeout.rs | 2 | ||||
-rw-r--r-- | src/stream_ext/timeout_repeating.rs | 56 | ||||
-rw-r--r-- | src/stream_map.rs | 32 | ||||
-rw-r--r-- | tests/stream_close.rs | 11 |
13 files changed, 374 insertions, 32 deletions
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json index d866201..a31b22b 100644 --- a/.cargo_vcs_info.json +++ b/.cargo_vcs_info.json @@ -1,6 +1,6 @@ { "git": { - "sha1": "46f974d8cfcb56c251d80cf1dc4a6bcf9fd1d7a0" + "sha1": "398dfda56d3ee4b0d4d9e86abe15039e86979d83" }, "path_in_vcs": "tokio-stream" }
\ No newline at end of file @@ -23,9 +23,9 @@ rust_library { host_supported: true, crate_name: "tokio_stream", cargo_env_compat: true, - cargo_pkg_version: "0.1.12", + cargo_pkg_version: "0.1.14", srcs: ["src/lib.rs"], - edition: "2018", + edition: "2021", features: [ "fs", "io-util", diff --git a/CHANGELOG.md b/CHANGELOG.md index c475c7c..c14ad07 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,10 +1,31 @@ -# 0.1.12 (January 20, 2022) +# 0.1.14 (April 26th, 2023) + +This bugfix release bumps the minimum version of Tokio to 1.15, which is +necessary for `timeout_repeating` to compile. ([#5657]) + +[#5657]: https://github.com/tokio-rs/tokio/pull/5657 + +# 0.1.13 (April 25th, 2023) + +This release bumps the MSRV of tokio-stream to 1.56. + +- stream: add "full" feature flag ([#5639]) +- stream: add `StreamExt::timeout_repeating` ([#5577]) +- stream: add `StreamNotifyClose` ([#4851]) + +[#4851]: https://github.com/tokio-rs/tokio/pull/4851 +[#5577]: https://github.com/tokio-rs/tokio/pull/5577 +[#5639]: https://github.com/tokio-rs/tokio/pull/5639 + +# 0.1.12 (January 20, 2023) - time: remove `Unpin` bound on `Throttle` methods ([#5105]) - time: document that `throttle` operates on ms granularity ([#5101]) +- sync: add `WatchStream::from_changes` ([#5432]) [#5105]: https://github.com/tokio-rs/tokio/pull/5105 [#5101]: https://github.com/tokio-rs/tokio/pull/5101 +[#5432]: https://github.com/tokio-rs/tokio/pull/5432 # 0.1.11 (October 11, 2022) @@ -10,10 +10,10 @@ # See Cargo.toml.orig for the original contents. [package] -edition = "2018" -rust-version = "1.49" +edition = "2021" +rust-version = "1.56" name = "tokio-stream" -version = "0.1.12" +version = "0.1.14" authors = ["Tokio Contributors <team@tokio.rs>"] description = """ Utilities to work with `Stream` and `tokio`. @@ -22,14 +22,15 @@ homepage = "https://tokio.rs" categories = ["asynchronous"] license = "MIT" repository = "https://github.com/tokio-rs/tokio" +resolver = "1" [package.metadata.docs.rs] all-features = true -rustdoc-args = [ +rustc-args = [ "--cfg", "docsrs", ] -rustc-args = [ +rustdoc-args = [ "--cfg", "docsrs", ] @@ -41,7 +42,7 @@ version = "0.3.0" version = "0.2.0" [dependencies.tokio] -version = "1.8.0" +version = "1.15.0" features = ["sync"] [dependencies.tokio-util] @@ -68,6 +69,14 @@ features = [ [features] default = ["time"] fs = ["tokio/fs"] +full = [ + "time", + "net", + "io-util", + "fs", + "sync", + "signal", +] io-util = ["tokio/io-util"] net = ["tokio/net"] signal = ["tokio/signal"] diff --git a/Cargo.toml.orig b/Cargo.toml.orig index f87b59a..9a90cd3 100644 --- a/Cargo.toml.orig +++ b/Cargo.toml.orig @@ -4,9 +4,9 @@ name = "tokio-stream" # - Remove path dependencies # - Update CHANGELOG.md. # - Create "tokio-stream-0.1.x" git tag. -version = "0.1.12" -edition = "2018" -rust-version = "1.49" +version = "0.1.14" +edition = "2021" +rust-version = "1.56" authors = ["Tokio Contributors <team@tokio.rs>"] license = "MIT" repository = "https://github.com/tokio-rs/tokio" @@ -18,6 +18,16 @@ categories = ["asynchronous"] [features] default = ["time"] + +full = [ + "time", + "net", + "io-util", + "fs", + "sync", + "signal" +] + time = ["tokio/time"] net = ["tokio/net"] io-util = ["tokio/io-util"] @@ -28,7 +38,7 @@ signal = ["tokio/signal"] [dependencies] futures-core = { version = "0.3.0" } pin-project-lite = "0.2.0" -tokio = { version = "1.8.0", path = "../tokio", features = ["sync"] } +tokio = { version = "1.15.0", path = "../tokio", features = ["sync"] } tokio-util = { version = "0.7.0", path = "../tokio-util", optional = true } [dev-dependencies] @@ -1,23 +1,20 @@ # This project was upgraded with external_updater. -# Usage: tools/external_updater/updater.sh update rust/crates/tokio-stream -# For more info, check https://cs.android.com/android/platform/superproject/+/master:tools/external_updater/README.md +# Usage: tools/external_updater/updater.sh update external/rust/crates/tokio-stream +# For more info, check https://cs.android.com/android/platform/superproject/+/main:tools/external_updater/README.md name: "tokio-stream" description: "Utilities to work with `Stream` and `tokio`." third_party { - url { - type: HOMEPAGE - value: "https://crates.io/crates/tokio-stream" - } - url { - type: ARCHIVE - value: "https://static.crates.io/crates/tokio-stream/tokio-stream-0.1.12.crate" - } - version: "0.1.12" license_type: NOTICE last_upgrade_date { - year: 2023 - month: 3 - day: 30 + year: 2024 + month: 2 + day: 5 + } + homepage: "https://crates.io/crates/tokio-stream" + identifier { + type: "Archive" + value: "https://static.crates.io/crates/tokio-stream/tokio-stream-0.1.14.crate" + version: "0.1.14" } } @@ -96,5 +96,8 @@ pub use pending::{pending, Pending}; mod stream_map; pub use stream_map::StreamMap; +mod stream_close; +pub use stream_close::StreamNotifyClose; + #[doc(no_inline)] pub use futures_core::Stream; diff --git a/src/stream_close.rs b/src/stream_close.rs new file mode 100644 index 0000000..735acf0 --- /dev/null +++ b/src/stream_close.rs @@ -0,0 +1,93 @@ +use crate::Stream; +use pin_project_lite::pin_project; +use std::pin::Pin; +use std::task::{Context, Poll}; + +pin_project! { + /// A `Stream` that wraps the values in an `Option`. + /// + /// Whenever the wrapped stream yields an item, this stream yields that item + /// wrapped in `Some`. When the inner stream ends, then this stream first + /// yields a `None` item, and then this stream will also end. + /// + /// # Example + /// + /// Using `StreamNotifyClose` to handle closed streams with `StreamMap`. + /// + /// ``` + /// use tokio_stream::{StreamExt, StreamMap, StreamNotifyClose}; + /// + /// #[tokio::main] + /// async fn main() { + /// let mut map = StreamMap::new(); + /// let stream = StreamNotifyClose::new(tokio_stream::iter(vec![0, 1])); + /// let stream2 = StreamNotifyClose::new(tokio_stream::iter(vec![0, 1])); + /// map.insert(0, stream); + /// map.insert(1, stream2); + /// while let Some((key, val)) = map.next().await { + /// match val { + /// Some(val) => println!("got {val:?} from stream {key:?}"), + /// None => println!("stream {key:?} closed"), + /// } + /// } + /// } + /// ``` + #[must_use = "streams do nothing unless polled"] + pub struct StreamNotifyClose<S> { + #[pin] + inner: Option<S>, + } +} + +impl<S> StreamNotifyClose<S> { + /// Create a new `StreamNotifyClose`. + pub fn new(stream: S) -> Self { + Self { + inner: Some(stream), + } + } + + /// Get back the inner `Stream`. + /// + /// Returns `None` if the stream has reached its end. + pub fn into_inner(self) -> Option<S> { + self.inner + } +} + +impl<S> Stream for StreamNotifyClose<S> +where + S: Stream, +{ + type Item = Option<S::Item>; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + // We can't invoke poll_next after it ended, so we unset the inner stream as a marker. + match self + .as_mut() + .project() + .inner + .as_pin_mut() + .map(|stream| S::poll_next(stream, cx)) + { + Some(Poll::Ready(Some(item))) => Poll::Ready(Some(Some(item))), + Some(Poll::Ready(None)) => { + self.project().inner.set(None); + Poll::Ready(Some(None)) + } + Some(Poll::Pending) => Poll::Pending, + None => Poll::Ready(None), + } + } + + #[inline] + fn size_hint(&self) -> (usize, Option<usize>) { + if let Some(inner) = &self.inner { + // We always return +1 because when there's stream there's atleast one more item. + let (l, u) = inner.size_hint(); + (l.saturating_add(1), u.and_then(|u| u.checked_add(1))) + } else { + (0, Some(0)) + } + } +} diff --git a/src/stream_ext.rs b/src/stream_ext.rs index 52d3202..a4ab8a0 100644 --- a/src/stream_ext.rs +++ b/src/stream_ext.rs @@ -57,8 +57,10 @@ use try_next::TryNext; cfg_time! { pub(crate) mod timeout; + pub(crate) mod timeout_repeating; use timeout::Timeout; - use tokio::time::Duration; + use timeout_repeating::TimeoutRepeating; + use tokio::time::{Duration, Interval}; mod throttle; use throttle::{throttle, Throttle}; mod chunks_timeout; @@ -924,7 +926,9 @@ pub trait StreamExt: Stream { /// If the wrapped stream yields a value before the deadline is reached, the /// value is returned. Otherwise, an error is returned. The caller may decide /// to continue consuming the stream and will eventually get the next source - /// stream value once it becomes available. + /// stream value once it becomes available. See + /// [`timeout_repeating`](StreamExt::timeout_repeating) for an alternative + /// where the timeouts will repeat. /// /// # Notes /// @@ -971,6 +975,25 @@ pub trait StreamExt: Stream { /// assert_eq!(int_stream.try_next().await, Ok(None)); /// # } /// ``` + /// + /// Once a timeout error is received, no further events will be received + /// unless the wrapped stream yields a value (timeouts do not repeat). + /// + /// ``` + /// # #[tokio::main(flavor = "current_thread", start_paused = true)] + /// # async fn main() { + /// use tokio_stream::{StreamExt, wrappers::IntervalStream}; + /// use std::time::Duration; + /// let interval_stream = IntervalStream::new(tokio::time::interval(Duration::from_millis(100))); + /// let timeout_stream = interval_stream.timeout(Duration::from_millis(10)); + /// tokio::pin!(timeout_stream); + /// + /// // Only one timeout will be received between values in the source stream. + /// assert!(timeout_stream.try_next().await.is_ok()); + /// assert!(timeout_stream.try_next().await.is_err(), "expected one timeout"); + /// assert!(timeout_stream.try_next().await.is_ok(), "expected no more timeouts"); + /// # } + /// ``` #[cfg(all(feature = "time"))] #[cfg_attr(docsrs, doc(cfg(feature = "time")))] fn timeout(self, duration: Duration) -> Timeout<Self> @@ -980,6 +1003,95 @@ pub trait StreamExt: Stream { Timeout::new(self, duration) } + /// Applies a per-item timeout to the passed stream. + /// + /// `timeout_repeating()` takes an [`Interval`](tokio::time::Interval) that + /// controls the time each element of the stream has to complete before + /// timing out. + /// + /// If the wrapped stream yields a value before the deadline is reached, the + /// value is returned. Otherwise, an error is returned. The caller may decide + /// to continue consuming the stream and will eventually get the next source + /// stream value once it becomes available. Unlike `timeout()`, if no value + /// becomes available before the deadline is reached, additional errors are + /// returned at the specified interval. See [`timeout`](StreamExt::timeout) + /// for an alternative where the timeouts do not repeat. + /// + /// # Notes + /// + /// This function consumes the stream passed into it and returns a + /// wrapped version of it. + /// + /// Polling the returned stream will continue to poll the inner stream even + /// if one or more items time out. + /// + /// # Examples + /// + /// Suppose we have a stream `int_stream` that yields 3 numbers (1, 2, 3): + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio_stream::{self as stream, StreamExt}; + /// use std::time::Duration; + /// # let int_stream = stream::iter(1..=3); + /// + /// let int_stream = int_stream.timeout_repeating(tokio::time::interval(Duration::from_secs(1))); + /// tokio::pin!(int_stream); + /// + /// // When no items time out, we get the 3 elements in succession: + /// assert_eq!(int_stream.try_next().await, Ok(Some(1))); + /// assert_eq!(int_stream.try_next().await, Ok(Some(2))); + /// assert_eq!(int_stream.try_next().await, Ok(Some(3))); + /// assert_eq!(int_stream.try_next().await, Ok(None)); + /// + /// // If the second item times out, we get an error and continue polling the stream: + /// # let mut int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]); + /// assert_eq!(int_stream.try_next().await, Ok(Some(1))); + /// assert!(int_stream.try_next().await.is_err()); + /// assert_eq!(int_stream.try_next().await, Ok(Some(2))); + /// assert_eq!(int_stream.try_next().await, Ok(Some(3))); + /// assert_eq!(int_stream.try_next().await, Ok(None)); + /// + /// // If we want to stop consuming the source stream the first time an + /// // element times out, we can use the `take_while` operator: + /// # let int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]); + /// let mut int_stream = int_stream.take_while(Result::is_ok); + /// + /// assert_eq!(int_stream.try_next().await, Ok(Some(1))); + /// assert_eq!(int_stream.try_next().await, Ok(None)); + /// # } + /// ``` + /// + /// Timeout errors will be continuously produced at the specified interval + /// until the wrapped stream yields a value. + /// + /// ``` + /// # #[tokio::main(flavor = "current_thread", start_paused = true)] + /// # async fn main() { + /// use tokio_stream::{StreamExt, wrappers::IntervalStream}; + /// use std::time::Duration; + /// let interval_stream = IntervalStream::new(tokio::time::interval(Duration::from_millis(23))); + /// let timeout_stream = interval_stream.timeout_repeating(tokio::time::interval(Duration::from_millis(9))); + /// tokio::pin!(timeout_stream); + /// + /// // Multiple timeouts will be received between values in the source stream. + /// assert!(timeout_stream.try_next().await.is_ok()); + /// assert!(timeout_stream.try_next().await.is_err(), "expected one timeout"); + /// assert!(timeout_stream.try_next().await.is_err(), "expected a second timeout"); + /// // Will eventually receive another value from the source stream... + /// assert!(timeout_stream.try_next().await.is_ok(), "expected non-timeout"); + /// # } + /// ``` + #[cfg(all(feature = "time"))] + #[cfg_attr(docsrs, doc(cfg(feature = "time")))] + fn timeout_repeating(self, interval: Interval) -> TimeoutRepeating<Self> + where + Self: Sized, + { + TimeoutRepeating::new(self, interval) + } + /// Slows down a stream by enforcing a delay between items. /// /// The underlying timer behind this utility has a granularity of one millisecond. diff --git a/src/stream_ext/timeout.rs b/src/stream_ext/timeout.rs index a440d20..17d1349 100644 --- a/src/stream_ext/timeout.rs +++ b/src/stream_ext/timeout.rs @@ -23,7 +23,7 @@ pin_project! { } } -/// Error returned by `Timeout`. +/// Error returned by `Timeout` and `TimeoutRepeating`. #[derive(Debug, PartialEq, Eq)] pub struct Elapsed(()); diff --git a/src/stream_ext/timeout_repeating.rs b/src/stream_ext/timeout_repeating.rs new file mode 100644 index 0000000..253d2fd --- /dev/null +++ b/src/stream_ext/timeout_repeating.rs @@ -0,0 +1,56 @@ +use crate::stream_ext::Fuse; +use crate::{Elapsed, Stream}; +use tokio::time::Interval; + +use core::pin::Pin; +use core::task::{Context, Poll}; +use pin_project_lite::pin_project; + +pin_project! { + /// Stream returned by the [`timeout_repeating`](super::StreamExt::timeout_repeating) method. + #[must_use = "streams do nothing unless polled"] + #[derive(Debug)] + pub struct TimeoutRepeating<S> { + #[pin] + stream: Fuse<S>, + #[pin] + interval: Interval, + } +} + +impl<S: Stream> TimeoutRepeating<S> { + pub(super) fn new(stream: S, interval: Interval) -> Self { + TimeoutRepeating { + stream: Fuse::new(stream), + interval, + } + } +} + +impl<S: Stream> Stream for TimeoutRepeating<S> { + type Item = Result<S::Item, Elapsed>; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + let mut me = self.project(); + + match me.stream.poll_next(cx) { + Poll::Ready(v) => { + if v.is_some() { + me.interval.reset(); + } + return Poll::Ready(v.map(Ok)); + } + Poll::Pending => {} + }; + + ready!(me.interval.poll_tick(cx)); + Poll::Ready(Some(Err(Elapsed::new()))) + } + + fn size_hint(&self) -> (usize, Option<usize>) { + let (lower, _) = self.stream.size_hint(); + + // The timeout stream may insert an error an infinite number of times. + (lower, None) + } +} diff --git a/src/stream_map.rs b/src/stream_map.rs index 2159804..0c11bf1 100644 --- a/src/stream_map.rs +++ b/src/stream_map.rs @@ -42,10 +42,18 @@ use std::task::{Context, Poll}; /// to be merged, it may be advisable to use tasks sending values on a shared /// [`mpsc`] channel. /// +/// # Notes +/// +/// `StreamMap` removes finished streams automatically, without alerting the user. +/// In some scenarios, the caller would want to know on closed streams. +/// To do this, use [`StreamNotifyClose`] as a wrapper to your stream. +/// It will return None when the stream is closed. +/// /// [`StreamExt::merge`]: crate::StreamExt::merge /// [`mpsc`]: https://docs.rs/tokio/1.0/tokio/sync/mpsc/index.html /// [`pin!`]: https://docs.rs/tokio/1.0/tokio/macro.pin.html /// [`Box::pin`]: std::boxed::Box::pin +/// [`StreamNotifyClose`]: crate::StreamNotifyClose /// /// # Examples /// @@ -170,6 +178,28 @@ use std::task::{Context, Poll}; /// } /// } /// ``` +/// +/// Using `StreamNotifyClose` to handle closed streams with `StreamMap`. +/// +/// ``` +/// use tokio_stream::{StreamExt, StreamMap, StreamNotifyClose}; +/// +/// #[tokio::main] +/// async fn main() { +/// let mut map = StreamMap::new(); +/// let stream = StreamNotifyClose::new(tokio_stream::iter(vec![0, 1])); +/// let stream2 = StreamNotifyClose::new(tokio_stream::iter(vec![0, 1])); +/// map.insert(0, stream); +/// map.insert(1, stream2); +/// while let Some((key, val)) = map.next().await { +/// match val { +/// Some(val) => println!("got {val:?} from stream {key:?}"), +/// None => println!("stream {key:?} closed"), +/// } +/// } +/// } +/// ``` + #[derive(Debug)] pub struct StreamMap<K, V> { /// Streams stored in the map @@ -568,7 +598,7 @@ where } } -impl<K, V> std::iter::FromIterator<(K, V)> for StreamMap<K, V> +impl<K, V> FromIterator<(K, V)> for StreamMap<K, V> where K: Hash + Eq, { diff --git a/tests/stream_close.rs b/tests/stream_close.rs new file mode 100644 index 0000000..9ddb565 --- /dev/null +++ b/tests/stream_close.rs @@ -0,0 +1,11 @@ +use tokio_stream::{StreamExt, StreamNotifyClose}; + +#[tokio::test] +async fn basic_usage() { + let mut stream = StreamNotifyClose::new(tokio_stream::iter(vec![0, 1])); + + assert_eq!(stream.next().await, Some(Some(0))); + assert_eq!(stream.next().await, Some(Some(1))); + assert_eq!(stream.next().await, Some(None)); + assert_eq!(stream.next().await, None); +} |