aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJeff Vander Stoep <jeffv@google.com>2024-02-01 15:12:47 +0100
committerJeff Vander Stoep <jeffv@google.com>2024-02-01 15:12:48 +0100
commit41e437e55b496ba9779e4e5767abe63c77eb42bf (patch)
tree1d802034c6a30b7b67341a1da3bb5cdbea1b6956
parent37d065cc93c720c4c2d100385b6d872106bf5a9e (diff)
downloadfutures-util-41e437e55b496ba9779e4e5767abe63c77eb42bf.tar.gz
Upgrade futures-util to 0.3.30
This project was upgraded with external_updater. Usage: tools/external_updater/updater.sh update external/rust/crates/futures-util For more info, check https://cs.android.com/android/platform/superproject/+/main:tools/external_updater/README.md Test: TreeHugger Change-Id: I4d01cd43e35cccd70ff58f1669ac3697035e8c60
-rw-r--r--.cargo_vcs_info.json2
-rw-r--r--Android.bp4
-rw-r--r--Cargo.toml16
-rw-r--r--Cargo.toml.orig16
-rw-r--r--METADATA23
-rw-r--r--README.md2
-rw-r--r--benches/bilock.rs68
-rw-r--r--benches/flatten_unordered.rs18
-rw-r--r--benches_disabled/bilock.rs122
-rw-r--r--build.rs41
-rw-r--r--no_atomic_cas.rs17
-rw-r--r--src/abortable.rs24
-rw-r--r--src/async_await/mod.rs2
-rw-r--r--src/async_await/stream_select_mod.rs2
-rw-r--r--src/future/future/fuse.rs12
-rw-r--r--src/future/future/mod.rs4
-rw-r--r--src/future/future/shared.rs4
-rw-r--r--src/future/join_all.rs17
-rw-r--r--src/future/mod.rs6
-rw-r--r--src/future/select.rs17
-rw-r--r--src/future/try_join_all.rs13
-rw-r--r--src/future/try_select.rs13
-rw-r--r--src/io/copy_buf_abortable.rs2
-rw-r--r--src/io/cursor.rs8
-rw-r--r--src/io/fill_buf.rs18
-rw-r--r--src/io/mod.rs6
-rw-r--r--src/io/read_line.rs1
-rw-r--r--src/io/split.rs14
-rw-r--r--src/io/window.rs2
-rw-r--r--src/lib.rs2
-rw-r--r--src/lock/bilock.rs53
-rw-r--r--src/lock/mod.rs10
-rw-r--r--src/stream/futures_ordered.rs71
-rw-r--r--src/stream/futures_unordered/mod.rs23
-rw-r--r--src/stream/futures_unordered/ready_to_run_queue.rs37
-rw-r--r--src/stream/mod.rs47
-rw-r--r--src/stream/select_all.rs35
-rw-r--r--src/stream/stream/all.rs23
-rw-r--r--src/stream/stream/any.rs23
-rw-r--r--src/stream/stream/flatten_unordered.rs243
-rw-r--r--src/stream/stream/mod.rs57
-rw-r--r--src/stream/stream/split.rs80
-rw-r--r--src/stream/try_stream/mod.rs205
-rw-r--r--src/stream/try_stream/try_all.rs98
-rw-r--r--src/stream/try_stream/try_any.rs98
-rw-r--r--src/stream/try_stream/try_chunks.rs5
-rw-r--r--src/stream/try_stream/try_flatten_unordered.rs176
-rw-r--r--src/stream/try_stream/try_ready_chunks.rs126
-rw-r--r--src/stream/unfold.rs2
-rw-r--r--src/task/mod.rs11
50 files changed, 1319 insertions, 600 deletions
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json
index dde4c7f..1833f75 100644
--- a/.cargo_vcs_info.json
+++ b/.cargo_vcs_info.json
@@ -1,6 +1,6 @@
{
"git": {
- "sha1": "5e3693a350f96244151081d2c030208cd15f9572"
+ "sha1": "de1a0fd64a1bcae9a1534ed4da1699632993cc26"
},
"path_in_vcs": "futures-util"
} \ No newline at end of file
diff --git a/Android.bp b/Android.bp
index 60e4e8e..a38886e 100644
--- a/Android.bp
+++ b/Android.bp
@@ -42,7 +42,7 @@ rust_test {
host_supported: true,
crate_name: "futures_util",
cargo_env_compat: true,
- cargo_pkg_version: "0.3.26",
+ cargo_pkg_version: "0.3.30",
srcs: ["src/lib.rs"],
test_suites: ["general-tests"],
auto_gen_config: true,
@@ -86,7 +86,7 @@ rust_library {
host_supported: true,
crate_name: "futures_util",
cargo_env_compat: true,
- cargo_pkg_version: "0.3.26",
+ cargo_pkg_version: "0.3.30",
srcs: ["src/lib.rs"],
edition: "2018",
features: [
diff --git a/Cargo.toml b/Cargo.toml
index 47e9f55..c95816a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -11,9 +11,9 @@
[package]
edition = "2018"
-rust-version = "1.45"
+rust-version = "1.56"
name = "futures-util"
-version = "0.3.26"
+version = "0.3.30"
description = """
Common utilities and extension traits for the futures-rs library.
"""
@@ -30,33 +30,33 @@ rustdoc-args = [
]
[dependencies.futures-channel]
-version = "0.3.26"
+version = "0.3.30"
features = ["std"]
optional = true
default-features = false
[dependencies.futures-core]
-version = "0.3.26"
+version = "0.3.30"
default-features = false
[dependencies.futures-io]
-version = "0.3.26"
+version = "0.3.30"
features = ["std"]
optional = true
default-features = false
[dependencies.futures-macro]
-version = "=0.3.26"
+version = "=0.3.30"
optional = true
default-features = false
[dependencies.futures-sink]
-version = "0.3.26"
+version = "0.3.30"
optional = true
default-features = false
[dependencies.futures-task]
-version = "0.3.26"
+version = "0.3.30"
default-features = false
[dependencies.futures_01]
diff --git a/Cargo.toml.orig b/Cargo.toml.orig
index 95c3dee..dcdbce4 100644
--- a/Cargo.toml.orig
+++ b/Cargo.toml.orig
@@ -1,8 +1,8 @@
[package]
name = "futures-util"
-version = "0.3.26"
+version = "0.3.30"
edition = "2018"
-rust-version = "1.45"
+rust-version = "1.56"
license = "MIT OR Apache-2.0"
repository = "https://github.com/rust-lang/futures-rs"
homepage = "https://rust-lang.github.io/futures-rs"
@@ -35,12 +35,12 @@ write-all-vectored = ["io"]
cfg-target-has-atomic = []
[dependencies]
-futures-core = { path = "../futures-core", version = "0.3.26", default-features = false }
-futures-task = { path = "../futures-task", version = "0.3.26", default-features = false }
-futures-channel = { path = "../futures-channel", version = "0.3.26", default-features = false, features = ["std"], optional = true }
-futures-io = { path = "../futures-io", version = "0.3.26", default-features = false, features = ["std"], optional = true }
-futures-sink = { path = "../futures-sink", version = "0.3.26", default-features = false, optional = true }
-futures-macro = { path = "../futures-macro", version = "=0.3.26", default-features = false, optional = true }
+futures-core = { path = "../futures-core", version = "0.3.30", default-features = false }
+futures-task = { path = "../futures-task", version = "0.3.30", default-features = false }
+futures-channel = { path = "../futures-channel", version = "0.3.30", default-features = false, features = ["std"], optional = true }
+futures-io = { path = "../futures-io", version = "0.3.30", default-features = false, features = ["std"], optional = true }
+futures-sink = { path = "../futures-sink", version = "0.3.30", default-features = false, optional = true }
+futures-macro = { path = "../futures-macro", version = "=0.3.30", default-features = false, optional = true }
slab = { version = "0.4.2", optional = true }
memchr = { version = "2.2", optional = true }
futures_01 = { version = "0.1.25", optional = true, package = "futures" }
diff --git a/METADATA b/METADATA
index 93b66be..8ba8c32 100644
--- a/METADATA
+++ b/METADATA
@@ -1,23 +1,20 @@
# This project was upgraded with external_updater.
-# Usage: tools/external_updater/updater.sh update rust/crates/futures-util
-# For more info, check https://cs.android.com/android/platform/superproject/+/master:tools/external_updater/README.md
+# Usage: tools/external_updater/updater.sh update external/rust/crates/futures-util
+# For more info, check https://cs.android.com/android/platform/superproject/+/main:tools/external_updater/README.md
name: "futures-util"
description: "Common utilities and extension traits for the futures-rs library."
third_party {
- url {
- type: HOMEPAGE
- value: "https://crates.io/crates/futures-util"
- }
- url {
- type: ARCHIVE
- value: "https://static.crates.io/crates/futures-util/futures-util-0.3.26.crate"
- }
- version: "0.3.26"
license_type: NOTICE
last_upgrade_date {
- year: 2023
+ year: 2024
month: 2
- day: 15
+ day: 1
+ }
+ homepage: "https://crates.io/crates/futures-util"
+ identifier {
+ type: "Archive"
+ value: "https://static.crates.io/crates/futures-util/futures-util-0.3.30.crate"
+ version: "0.3.30"
}
}
diff --git a/README.md b/README.md
index 6e0aaed..60e2c21 100644
--- a/README.md
+++ b/README.md
@@ -11,7 +11,7 @@ Add this to your `Cargo.toml`:
futures-util = "0.3"
```
-The current `futures-util` requires Rust 1.45 or later.
+The current `futures-util` requires Rust 1.56 or later.
## License
diff --git a/benches/bilock.rs b/benches/bilock.rs
new file mode 100644
index 0000000..013f335
--- /dev/null
+++ b/benches/bilock.rs
@@ -0,0 +1,68 @@
+#![feature(test)]
+#![cfg(feature = "bilock")]
+
+extern crate test;
+
+use futures::task::Poll;
+use futures_test::task::noop_context;
+use futures_util::lock::BiLock;
+
+use crate::test::Bencher;
+
+#[bench]
+fn contended(b: &mut Bencher) {
+ let mut context = noop_context();
+
+ b.iter(|| {
+ let (x, y) = BiLock::new(1);
+
+ for _ in 0..1000 {
+ let x_guard = match x.poll_lock(&mut context) {
+ Poll::Ready(guard) => guard,
+ _ => panic!(),
+ };
+
+ // Try poll second lock while first lock still holds the lock
+ match y.poll_lock(&mut context) {
+ Poll::Pending => (),
+ _ => panic!(),
+ };
+
+ drop(x_guard);
+
+ let y_guard = match y.poll_lock(&mut context) {
+ Poll::Ready(guard) => guard,
+ _ => panic!(),
+ };
+
+ drop(y_guard);
+ }
+ (x, y)
+ });
+}
+
+#[bench]
+fn lock_unlock(b: &mut Bencher) {
+ let mut context = noop_context();
+
+ b.iter(|| {
+ let (x, y) = BiLock::new(1);
+
+ for _ in 0..1000 {
+ let x_guard = match x.poll_lock(&mut context) {
+ Poll::Ready(guard) => guard,
+ _ => panic!(),
+ };
+
+ drop(x_guard);
+
+ let y_guard = match y.poll_lock(&mut context) {
+ Poll::Ready(guard) => guard,
+ _ => panic!(),
+ };
+
+ drop(y_guard);
+ }
+ (x, y)
+ })
+}
diff --git a/benches/flatten_unordered.rs b/benches/flatten_unordered.rs
index 64d5f9a..517b281 100644
--- a/benches/flatten_unordered.rs
+++ b/benches/flatten_unordered.rs
@@ -5,9 +5,10 @@ use crate::test::Bencher;
use futures::channel::oneshot;
use futures::executor::block_on;
-use futures::future::{self, FutureExt};
+use futures::future;
use futures::stream::{self, StreamExt};
use futures::task::Poll;
+use futures_util::FutureExt;
use std::collections::VecDeque;
use std::thread;
@@ -34,18 +35,9 @@ fn oneshot_streams(b: &mut Bencher) {
}
});
- let mut flatten = stream::unfold(rxs.into_iter(), |mut vals| {
- async {
- if let Some(next) = vals.next() {
- let val = next.await.unwrap();
- Some((val, vals))
- } else {
- None
- }
- }
- .boxed()
- })
- .flatten_unordered(None);
+ let mut flatten = stream::iter(rxs)
+ .map(|recv| recv.into_stream().map(|val| val.unwrap()).flatten())
+ .flatten_unordered(None);
block_on(future::poll_fn(move |cx| {
let mut count = 0;
diff --git a/benches_disabled/bilock.rs b/benches_disabled/bilock.rs
deleted file mode 100644
index 417f75d..0000000
--- a/benches_disabled/bilock.rs
+++ /dev/null
@@ -1,122 +0,0 @@
-#![feature(test)]
-
-#[cfg(feature = "bilock")]
-mod bench {
- use futures::executor::LocalPool;
- use futures::task::{Context, Waker};
- use futures_util::lock::BiLock;
- use futures_util::lock::BiLockAcquire;
- use futures_util::lock::BiLockAcquired;
- use futures_util::task::ArcWake;
-
- use std::sync::Arc;
- use test::Bencher;
-
- fn notify_noop() -> Waker {
- struct Noop;
-
- impl ArcWake for Noop {
- fn wake(_: &Arc<Self>) {}
- }
-
- ArcWake::into_waker(Arc::new(Noop))
- }
-
- /// Pseudo-stream which simply calls `lock.poll()` on `poll`
- struct LockStream {
- lock: BiLockAcquire<u32>,
- }
-
- impl LockStream {
- fn new(lock: BiLock<u32>) -> Self {
- Self { lock: lock.lock() }
- }
-
- /// Release a lock after it was acquired in `poll`,
- /// so `poll` could be called again.
- fn release_lock(&mut self, guard: BiLockAcquired<u32>) {
- self.lock = guard.unlock().lock()
- }
- }
-
- impl Stream for LockStream {
- type Item = BiLockAcquired<u32>;
- type Error = ();
-
- fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Self::Item>, Self::Error> {
- self.lock.poll(cx).map(|a| a.map(Some))
- }
- }
-
- #[bench]
- fn contended(b: &mut Bencher) {
- let pool = LocalPool::new();
- let mut exec = pool.executor();
- let waker = notify_noop();
- let mut map = task::LocalMap::new();
- let mut waker = task::Context::new(&mut map, &waker, &mut exec);
-
- b.iter(|| {
- let (x, y) = BiLock::new(1);
-
- let mut x = LockStream::new(x);
- let mut y = LockStream::new(y);
-
- for _ in 0..1000 {
- let x_guard = match x.poll_next(&mut waker) {
- Ok(Poll::Ready(Some(guard))) => guard,
- _ => panic!(),
- };
-
- // Try poll second lock while first lock still holds the lock
- match y.poll_next(&mut waker) {
- Ok(Poll::Pending) => (),
- _ => panic!(),
- };
-
- x.release_lock(x_guard);
-
- let y_guard = match y.poll_next(&mut waker) {
- Ok(Poll::Ready(Some(guard))) => guard,
- _ => panic!(),
- };
-
- y.release_lock(y_guard);
- }
- (x, y)
- });
- }
-
- #[bench]
- fn lock_unlock(b: &mut Bencher) {
- let pool = LocalPool::new();
- let mut exec = pool.executor();
- let waker = notify_noop();
- let mut map = task::LocalMap::new();
- let mut waker = task::Context::new(&mut map, &waker, &mut exec);
-
- b.iter(|| {
- let (x, y) = BiLock::new(1);
-
- let mut x = LockStream::new(x);
- let mut y = LockStream::new(y);
-
- for _ in 0..1000 {
- let x_guard = match x.poll_next(&mut waker) {
- Ok(Poll::Ready(Some(guard))) => guard,
- _ => panic!(),
- };
-
- x.release_lock(x_guard);
-
- let y_guard = match y.poll_next(&mut waker) {
- Ok(Poll::Ready(Some(guard))) => guard,
- _ => panic!(),
- };
-
- y.release_lock(y_guard);
- }
- (x, y)
- })
- }
-}
diff --git a/build.rs b/build.rs
deleted file mode 100644
index 05e0496..0000000
--- a/build.rs
+++ /dev/null
@@ -1,41 +0,0 @@
-// The rustc-cfg listed below are considered public API, but it is *unstable*
-// and outside of the normal semver guarantees:
-//
-// - `futures_no_atomic_cas`
-// Assume the target does *not* support atomic CAS operations.
-// This is usually detected automatically by the build script, but you may
-// need to enable it manually when building for custom targets or using
-// non-cargo build systems that don't run the build script.
-//
-// With the exceptions mentioned above, the rustc-cfg emitted by the build
-// script are *not* public API.
-
-#![warn(rust_2018_idioms, single_use_lifetimes)]
-
-use std::env;
-
-include!("no_atomic_cas.rs");
-
-fn main() {
- let target = match env::var("TARGET") {
- Ok(target) => target,
- Err(e) => {
- println!(
- "cargo:warning={}: unable to get TARGET environment variable: {}",
- env!("CARGO_PKG_NAME"),
- e
- );
- return;
- }
- };
-
- // Note that this is `no_*`, not `has_*`. This allows treating
- // `cfg(target_has_atomic = "ptr")` as true when the build script doesn't
- // run. This is needed for compatibility with non-cargo build systems that
- // don't run the build script.
- if NO_ATOMIC_CAS.contains(&&*target) {
- println!("cargo:rustc-cfg=futures_no_atomic_cas");
- }
-
- println!("cargo:rerun-if-changed=no_atomic_cas.rs");
-}
diff --git a/no_atomic_cas.rs b/no_atomic_cas.rs
deleted file mode 100644
index 16ec628..0000000
--- a/no_atomic_cas.rs
+++ /dev/null
@@ -1,17 +0,0 @@
-// This file is @generated by no_atomic_cas.sh.
-// It is not intended for manual editing.
-
-const NO_ATOMIC_CAS: &[&str] = &[
- "armv4t-none-eabi",
- "armv5te-none-eabi",
- "avr-unknown-gnu-atmega328",
- "bpfeb-unknown-none",
- "bpfel-unknown-none",
- "msp430-none-elf",
- "riscv32i-unknown-none-elf",
- "riscv32im-unknown-none-elf",
- "riscv32imc-unknown-none-elf",
- "thumbv4t-none-eabi",
- "thumbv5te-none-eabi",
- "thumbv6m-none-eabi",
-];
diff --git a/src/abortable.rs b/src/abortable.rs
index e0afd47..9dbcfc2 100644
--- a/src/abortable.rs
+++ b/src/abortable.rs
@@ -78,6 +78,17 @@ pub struct AbortRegistration {
pub(crate) inner: Arc<AbortInner>,
}
+impl AbortRegistration {
+ /// Create an [`AbortHandle`] from the given [`AbortRegistration`].
+ ///
+ /// The created [`AbortHandle`] is functionally the same as any other
+ /// [`AbortHandle`]s that are associated with the same [`AbortRegistration`],
+ /// such as the one created by [`AbortHandle::new_pair`].
+ pub fn handle(&self) -> AbortHandle {
+ AbortHandle { inner: self.inner.clone() }
+ }
+}
+
/// A handle to an `Abortable` task.
#[derive(Debug, Clone)]
pub struct AbortHandle {
@@ -182,4 +193,17 @@ impl AbortHandle {
self.inner.aborted.store(true, Ordering::Relaxed);
self.inner.waker.wake();
}
+
+ /// Checks whether [`AbortHandle::abort`] was *called* on any associated
+ /// [`AbortHandle`]s, which includes all the [`AbortHandle`]s linked with
+ /// the same [`AbortRegistration`]. This means that it will return `true`
+ /// even if:
+ /// * `abort` was called after the task had completed.
+ /// * `abort` was called while the task was being polled - the task may still be running and
+ /// will not be stopped until `poll` returns.
+ ///
+ /// This operation has a Relaxed ordering.
+ pub fn is_aborted(&self) -> bool {
+ self.inner.aborted.load(Ordering::Relaxed)
+ }
}
diff --git a/src/async_await/mod.rs b/src/async_await/mod.rs
index 7276da2..7e3f12c 100644
--- a/src/async_await/mod.rs
+++ b/src/async_await/mod.rs
@@ -31,9 +31,11 @@ mod select_mod;
pub use self::select_mod::*;
// Primary export is a macro
+#[cfg(feature = "std")]
#[cfg(feature = "async-await-macro")]
mod stream_select_mod;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/64762
+#[cfg(feature = "std")]
#[cfg(feature = "async-await-macro")]
pub use self::stream_select_mod::*;
diff --git a/src/async_await/stream_select_mod.rs b/src/async_await/stream_select_mod.rs
index 1c8002f..61e3fa1 100644
--- a/src/async_await/stream_select_mod.rs
+++ b/src/async_await/stream_select_mod.rs
@@ -1,6 +1,5 @@
//! The `stream_select` macro.
-#[cfg(feature = "std")]
#[allow(unreachable_pub)]
#[doc(hidden)]
pub use futures_macro::stream_select_internal;
@@ -28,7 +27,6 @@ pub use futures_macro::stream_select_internal;
/// }
/// # });
/// ```
-#[cfg(feature = "std")]
#[macro_export]
macro_rules! stream_select {
($($tokens:tt)*) => {{
diff --git a/src/future/future/fuse.rs b/src/future/future/fuse.rs
index 597aec1..2257906 100644
--- a/src/future/future/fuse.rs
+++ b/src/future/future/fuse.rs
@@ -1,6 +1,5 @@
use core::pin::Pin;
use futures_core::future::{FusedFuture, Future};
-use futures_core::ready;
use futures_core::task::{Context, Poll};
use pin_project_lite::pin_project;
@@ -81,13 +80,12 @@ impl<Fut: Future> Future for Fuse<Fut> {
type Output = Fut::Output;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Fut::Output> {
- Poll::Ready(match self.as_mut().project().inner.as_pin_mut() {
- Some(fut) => {
- let output = ready!(fut.poll(cx));
+ match self.as_mut().project().inner.as_pin_mut() {
+ Some(fut) => fut.poll(cx).map(|output| {
self.project().inner.set(None);
output
- }
- None => return Poll::Pending,
- })
+ }),
+ None => Poll::Pending,
+ }
}
}
diff --git a/src/future/future/mod.rs b/src/future/future/mod.rs
index c11d108..955af37 100644
--- a/src/future/future/mod.rs
+++ b/src/future/future/mod.rs
@@ -463,10 +463,6 @@ pub trait FutureExt: Future {
/// ```
///
/// ```
- /// // Note, unlike most examples this is written in the context of a
- /// // synchronous function to better illustrate the cross-thread aspect of
- /// // the `shared` combinator.
- ///
/// # futures::executor::block_on(async {
/// use futures::future::FutureExt;
/// use futures::executor::block_on;
diff --git a/src/future/future/shared.rs b/src/future/future/shared.rs
index ecd1b42..9ab3b4f 100644
--- a/src/future/future/shared.rs
+++ b/src/future/future/shared.rs
@@ -37,10 +37,6 @@ impl<Fut: Future> Clone for WeakShared<Fut> {
}
}
-// The future itself is polled behind the `Arc`, so it won't be moved
-// when `Shared` is moved.
-impl<Fut: Future> Unpin for Shared<Fut> {}
-
impl<Fut: Future> fmt::Debug for Shared<Fut> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Shared")
diff --git a/src/future/join_all.rs b/src/future/join_all.rs
index 7dc159b..79eee8d 100644
--- a/src/future/join_all.rs
+++ b/src/future/join_all.rs
@@ -12,7 +12,7 @@ use core::task::{Context, Poll};
use super::{assert_future, MaybeDone};
-#[cfg(not(futures_no_atomic_cas))]
+#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
use crate::stream::{Collect, FuturesOrdered, StreamExt};
pub(crate) fn iter_pin_mut<T>(slice: Pin<&mut [T]>) -> impl Iterator<Item = Pin<&mut T>> {
@@ -31,7 +31,7 @@ where
kind: JoinAllKind<F>,
}
-#[cfg(not(futures_no_atomic_cas))]
+#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
pub(crate) const SMALL: usize = 30;
enum JoinAllKind<F>
@@ -41,7 +41,7 @@ where
Small {
elems: Pin<Box<[MaybeDone<F>]>>,
},
- #[cfg(not(futures_no_atomic_cas))]
+ #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
Big {
fut: Collect<FuturesOrdered<F>, Vec<F::Output>>,
},
@@ -57,7 +57,7 @@ where
JoinAllKind::Small { ref elems } => {
f.debug_struct("JoinAll").field("elems", elems).finish()
}
- #[cfg(not(futures_no_atomic_cas))]
+ #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
JoinAllKind::Big { ref fut, .. } => fmt::Debug::fmt(fut, f),
}
}
@@ -77,7 +77,7 @@ where
///
/// `join_all` will switch to the more powerful [`FuturesOrdered`] for performance
/// reasons if the number of futures is large. You may want to look into using it or
-/// it's counterpart [`FuturesUnordered`][crate::stream::FuturesUnordered] directly.
+/// its counterpart [`FuturesUnordered`][crate::stream::FuturesUnordered] directly.
///
/// Some examples for additional functionality provided by these are:
///
@@ -106,7 +106,8 @@ where
{
let iter = iter.into_iter();
- #[cfg(futures_no_atomic_cas)]
+ #[cfg(target_os = "none")]
+ #[cfg_attr(target_os = "none", cfg(not(target_has_atomic = "ptr")))]
{
let kind =
JoinAllKind::Small { elems: iter.map(MaybeDone::Future).collect::<Box<[_]>>().into() };
@@ -114,7 +115,7 @@ where
assert_future::<Vec<<I::Item as Future>::Output>, _>(JoinAll { kind })
}
- #[cfg(not(futures_no_atomic_cas))]
+ #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
{
let kind = match iter.size_hint().1 {
Some(max) if max <= SMALL => JoinAllKind::Small {
@@ -153,7 +154,7 @@ where
Poll::Pending
}
}
- #[cfg(not(futures_no_atomic_cas))]
+ #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
JoinAllKind::Big { fut } => Pin::new(fut).poll(cx),
}
}
diff --git a/src/future/mod.rs b/src/future/mod.rs
index 374e365..2d8fa4f 100644
--- a/src/future/mod.rs
+++ b/src/future/mod.rs
@@ -111,13 +111,13 @@ pub use self::select_ok::{select_ok, SelectOk};
mod either;
pub use self::either::Either;
-#[cfg(not(futures_no_atomic_cas))]
+#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
#[cfg(feature = "alloc")]
mod abortable;
-#[cfg(not(futures_no_atomic_cas))]
+#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
#[cfg(feature = "alloc")]
pub use crate::abortable::{AbortHandle, AbortRegistration, Abortable, Aborted};
-#[cfg(not(futures_no_atomic_cas))]
+#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
#[cfg(feature = "alloc")]
pub use abortable::abortable;
diff --git a/src/future/select.rs b/src/future/select.rs
index e693a30..7e33d19 100644
--- a/src/future/select.rs
+++ b/src/future/select.rs
@@ -99,17 +99,26 @@ where
type Output = Either<(A::Output, B), (B::Output, A)>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
- let (mut a, mut b) = self.inner.take().expect("cannot poll Select twice");
+ /// When compiled with `-C opt-level=z`, this function will help the compiler eliminate the `None` branch, where
+ /// `Option::unwrap` does not.
+ #[inline(always)]
+ fn unwrap_option<T>(value: Option<T>) -> T {
+ match value {
+ None => unreachable!(),
+ Some(value) => value,
+ }
+ }
+
+ let (a, b) = self.inner.as_mut().expect("cannot poll Select twice");
if let Poll::Ready(val) = a.poll_unpin(cx) {
- return Poll::Ready(Either::Left((val, b)));
+ return Poll::Ready(Either::Left((val, unwrap_option(self.inner.take()).1)));
}
if let Poll::Ready(val) = b.poll_unpin(cx) {
- return Poll::Ready(Either::Right((val, a)));
+ return Poll::Ready(Either::Right((val, unwrap_option(self.inner.take()).0)));
}
- self.inner = Some((a, b));
Poll::Pending
}
}
diff --git a/src/future/try_join_all.rs b/src/future/try_join_all.rs
index 506f450..2d6a2a0 100644
--- a/src/future/try_join_all.rs
+++ b/src/future/try_join_all.rs
@@ -12,7 +12,7 @@ use core::task::{Context, Poll};
use super::{assert_future, join_all, IntoFuture, TryFuture, TryMaybeDone};
-#[cfg(not(futures_no_atomic_cas))]
+#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
use crate::stream::{FuturesOrdered, TryCollect, TryStreamExt};
use crate::TryFutureExt;
@@ -38,7 +38,7 @@ where
Small {
elems: Pin<Box<[TryMaybeDone<IntoFuture<F>>]>>,
},
- #[cfg(not(futures_no_atomic_cas))]
+ #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
Big {
fut: TryCollect<FuturesOrdered<IntoFuture<F>>, Vec<F::Ok>>,
},
@@ -56,7 +56,7 @@ where
TryJoinAllKind::Small { ref elems } => {
f.debug_struct("TryJoinAll").field("elems", elems).finish()
}
- #[cfg(not(futures_no_atomic_cas))]
+ #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
TryJoinAllKind::Big { ref fut, .. } => fmt::Debug::fmt(fut, f),
}
}
@@ -121,7 +121,8 @@ where
{
let iter = iter.into_iter().map(TryFutureExt::into_future);
- #[cfg(futures_no_atomic_cas)]
+ #[cfg(target_os = "none")]
+ #[cfg_attr(target_os = "none", cfg(not(target_has_atomic = "ptr")))]
{
let kind = TryJoinAllKind::Small {
elems: iter.map(TryMaybeDone::Future).collect::<Box<[_]>>().into(),
@@ -132,7 +133,7 @@ where
)
}
- #[cfg(not(futures_no_atomic_cas))]
+ #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
{
let kind = match iter.size_hint().1 {
Some(max) if max <= join_all::SMALL => TryJoinAllKind::Small {
@@ -184,7 +185,7 @@ where
}
}
}
- #[cfg(not(futures_no_atomic_cas))]
+ #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
TryJoinAllKind::Big { fut } => Pin::new(fut).poll(cx),
}
}
diff --git a/src/future/try_select.rs b/src/future/try_select.rs
index 4d0b7ff..bc282f7 100644
--- a/src/future/try_select.rs
+++ b/src/future/try_select.rs
@@ -12,6 +12,9 @@ pub struct TrySelect<A, B> {
impl<A: Unpin, B: Unpin> Unpin for TrySelect<A, B> {}
+type EitherOk<A, B> = Either<(<A as TryFuture>::Ok, B), (<B as TryFuture>::Ok, A)>;
+type EitherErr<A, B> = Either<(<A as TryFuture>::Error, B), (<B as TryFuture>::Error, A)>;
+
/// Waits for either one of two differently-typed futures to complete.
///
/// This function will return a new future which awaits for either one of both
@@ -52,10 +55,9 @@ where
A: TryFuture + Unpin,
B: TryFuture + Unpin,
{
- super::assert_future::<
- Result<Either<(A::Ok, B), (B::Ok, A)>, Either<(A::Error, B), (B::Error, A)>>,
- _,
- >(TrySelect { inner: Some((future1, future2)) })
+ super::assert_future::<Result<EitherOk<A, B>, EitherErr<A, B>>, _>(TrySelect {
+ inner: Some((future1, future2)),
+ })
}
impl<A: Unpin, B: Unpin> Future for TrySelect<A, B>
@@ -63,8 +65,7 @@ where
A: TryFuture,
B: TryFuture,
{
- #[allow(clippy::type_complexity)]
- type Output = Result<Either<(A::Ok, B), (B::Ok, A)>, Either<(A::Error, B), (B::Error, A)>>;
+ type Output = Result<EitherOk<A, B>, EitherErr<A, B>>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let (mut a, mut b) = self.inner.take().expect("cannot poll Select twice");
diff --git a/src/io/copy_buf_abortable.rs b/src/io/copy_buf_abortable.rs
index fdbc4a5..ed22d62 100644
--- a/src/io/copy_buf_abortable.rs
+++ b/src/io/copy_buf_abortable.rs
@@ -57,7 +57,7 @@ where
}
pin_project! {
- /// Future for the [`copy_buf()`] function.
+ /// Future for the [`copy_buf_abortable()`] function.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct CopyBufAbortable<'a, R, W: ?Sized> {
diff --git a/src/io/cursor.rs b/src/io/cursor.rs
index b6fb372..c6e2aee 100644
--- a/src/io/cursor.rs
+++ b/src/io/cursor.rs
@@ -1,6 +1,4 @@
use futures_core::task::{Context, Poll};
-#[cfg(feature = "read_initializer")]
-use futures_io::Initializer;
use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, IoSlice, IoSliceMut, SeekFrom};
use std::io;
use std::pin::Pin;
@@ -159,12 +157,6 @@ where
}
impl<T: AsRef<[u8]> + Unpin> AsyncRead for Cursor<T> {
- #[cfg(feature = "read_initializer")]
- #[inline]
- unsafe fn initializer(&self) -> Initializer {
- io::Read::initializer(&self.inner)
- }
-
fn poll_read(
mut self: Pin<&mut Self>,
_cx: &mut Context<'_>,
diff --git a/src/io/fill_buf.rs b/src/io/fill_buf.rs
index a1484c0..45862b8 100644
--- a/src/io/fill_buf.rs
+++ b/src/io/fill_buf.rs
@@ -3,6 +3,7 @@ use futures_core::task::{Context, Poll};
use futures_io::AsyncBufRead;
use std::io;
use std::pin::Pin;
+use std::slice;
/// Future for the [`fill_buf`](super::AsyncBufReadExt::fill_buf) method.
#[derive(Debug)]
@@ -30,17 +31,12 @@ where
let reader = this.reader.take().expect("Polled FillBuf after completion");
match Pin::new(&mut *reader).poll_fill_buf(cx) {
- // With polonius it is possible to remove this inner match and just have the correct
- // lifetime of the reference inferred based on which branch is taken
- Poll::Ready(Ok(_)) => match Pin::new(reader).poll_fill_buf(cx) {
- Poll::Ready(Ok(slice)) => Poll::Ready(Ok(slice)),
- Poll::Ready(Err(err)) => {
- unreachable!("reader indicated readiness but then returned an error: {:?}", err)
- }
- Poll::Pending => {
- unreachable!("reader indicated readiness but then returned pending")
- }
- },
+ Poll::Ready(Ok(slice)) => {
+ // With polonius it is possible to remove this lifetime transmutation and just have
+ // the correct lifetime of the reference inferred based on which branch is taken
+ let slice: &'a [u8] = unsafe { slice::from_raw_parts(slice.as_ptr(), slice.len()) };
+ Poll::Ready(Ok(slice))
+ }
Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
Poll::Pending => {
this.reader = Some(reader);
diff --git a/src/io/mod.rs b/src/io/mod.rs
index 8ce3ad6..fdad60b 100644
--- a/src/io/mod.rs
+++ b/src/io/mod.rs
@@ -804,11 +804,11 @@ pub trait AsyncBufReadExt: AsyncBufRead {
/// use futures::io::{AsyncBufReadExt, Cursor};
/// use futures::stream::StreamExt;
///
- /// let cursor = Cursor::new(b"lorem\nipsum\r\ndolor");
+ /// let cursor = Cursor::new(b"lorem\nipsum\xc2\r\ndolor");
///
- /// let mut lines_stream = cursor.lines().map(|l| l.unwrap());
+ /// let mut lines_stream = cursor.lines().map(|l| l.unwrap_or(String::from("invalid UTF_8")));
/// assert_eq!(lines_stream.next().await, Some(String::from("lorem")));
- /// assert_eq!(lines_stream.next().await, Some(String::from("ipsum")));
+ /// assert_eq!(lines_stream.next().await, Some(String::from("invalid UTF_8")));
/// assert_eq!(lines_stream.next().await, Some(String::from("dolor")));
/// assert_eq!(lines_stream.next().await, None);
/// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
diff --git a/src/io/read_line.rs b/src/io/read_line.rs
index e1b8fc9..df782c9 100644
--- a/src/io/read_line.rs
+++ b/src/io/read_line.rs
@@ -35,6 +35,7 @@ pub(super) fn read_line_internal<R: AsyncBufRead + ?Sized>(
) -> Poll<io::Result<usize>> {
let ret = ready!(read_until_internal(reader, cx, b'\n', bytes, read));
if str::from_utf8(bytes).is_err() {
+ bytes.clear();
Poll::Ready(ret.and_then(|_| {
Err(io::Error::new(io::ErrorKind::InvalidData, "stream did not contain valid UTF-8"))
}))
diff --git a/src/io/split.rs b/src/io/split.rs
index 3f1b9af..81d1e6d 100644
--- a/src/io/split.rs
+++ b/src/io/split.rs
@@ -31,6 +31,13 @@ pub(super) fn split<T: AsyncRead + AsyncWrite>(t: T) -> (ReadHalf<T>, WriteHalf<
(ReadHalf { handle: a }, WriteHalf { handle: b })
}
+impl<T> ReadHalf<T> {
+ /// Checks if this `ReadHalf` and some `WriteHalf` were split from the same stream.
+ pub fn is_pair_of(&self, other: &WriteHalf<T>) -> bool {
+ self.handle.is_pair_of(&other.handle)
+ }
+}
+
impl<T: Unpin> ReadHalf<T> {
/// Attempts to put the two "halves" of a split `AsyncRead + AsyncWrite` back
/// together. Succeeds only if the `ReadHalf<T>` and `WriteHalf<T>` are
@@ -42,6 +49,13 @@ impl<T: Unpin> ReadHalf<T> {
}
}
+impl<T> WriteHalf<T> {
+ /// Checks if this `WriteHalf` and some `ReadHalf` were split from the same stream.
+ pub fn is_pair_of(&self, other: &ReadHalf<T>) -> bool {
+ self.handle.is_pair_of(&other.handle)
+ }
+}
+
impl<T: Unpin> WriteHalf<T> {
/// Attempts to put the two "halves" of a split `AsyncRead + AsyncWrite` back
/// together. Succeeds only if the `ReadHalf<T>` and `WriteHalf<T>` are
diff --git a/src/io/window.rs b/src/io/window.rs
index 77b7267..d857282 100644
--- a/src/io/window.rs
+++ b/src/io/window.rs
@@ -1,6 +1,6 @@
use std::ops::{Bound, Range, RangeBounds};
-/// A owned window around an underlying buffer.
+/// An owned window around an underlying buffer.
///
/// Normally slices work great for considering sub-portions of a buffer, but
/// unfortunately a slice is a *borrowed* type in Rust which has an associated
diff --git a/src/lib.rs b/src/lib.rs
index 9a10c93..208eb73 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -329,7 +329,7 @@ pub use crate::io::{
#[cfg(feature = "alloc")]
pub mod lock;
-#[cfg(not(futures_no_atomic_cas))]
+#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
#[cfg(feature = "alloc")]
mod abortable;
diff --git a/src/lock/bilock.rs b/src/lock/bilock.rs
index 2174079..a89678e 100644
--- a/src/lock/bilock.rs
+++ b/src/lock/bilock.rs
@@ -3,11 +3,11 @@
use alloc::boxed::Box;
use alloc::sync::Arc;
use core::cell::UnsafeCell;
-use core::fmt;
use core::ops::{Deref, DerefMut};
use core::pin::Pin;
-use core::sync::atomic::AtomicUsize;
+use core::sync::atomic::AtomicPtr;
use core::sync::atomic::Ordering::SeqCst;
+use core::{fmt, ptr};
#[cfg(feature = "bilock")]
use futures_core::future::Future;
use futures_core::task::{Context, Poll, Waker};
@@ -41,7 +41,7 @@ pub struct BiLock<T> {
#[derive(Debug)]
struct Inner<T> {
- state: AtomicUsize,
+ state: AtomicPtr<Waker>,
value: Option<UnsafeCell<T>>,
}
@@ -61,7 +61,10 @@ impl<T> BiLock<T> {
/// Similarly, reuniting the lock and extracting the inner value is only
/// possible when `T` is `Unpin`.
pub fn new(t: T) -> (Self, Self) {
- let arc = Arc::new(Inner { state: AtomicUsize::new(0), value: Some(UnsafeCell::new(t)) });
+ let arc = Arc::new(Inner {
+ state: AtomicPtr::new(ptr::null_mut()),
+ value: Some(UnsafeCell::new(t)),
+ });
(Self { arc: arc.clone() }, Self { arc })
}
@@ -87,7 +90,8 @@ impl<T> BiLock<T> {
pub fn poll_lock(&self, cx: &mut Context<'_>) -> Poll<BiLockGuard<'_, T>> {
let mut waker = None;
loop {
- match self.arc.state.swap(1, SeqCst) {
+ let n = self.arc.state.swap(invalid_ptr(1), SeqCst);
+ match n as usize {
// Woohoo, we grabbed the lock!
0 => return Poll::Ready(BiLockGuard { bilock: self }),
@@ -96,8 +100,8 @@ impl<T> BiLock<T> {
// A task was previously blocked on this lock, likely our task,
// so we need to update that task.
- n => unsafe {
- let mut prev = Box::from_raw(n as *mut Waker);
+ _ => unsafe {
+ let mut prev = Box::from_raw(n);
*prev = cx.waker().clone();
waker = Some(prev);
},
@@ -105,9 +109,9 @@ impl<T> BiLock<T> {
// type ascription for safety's sake!
let me: Box<Waker> = waker.take().unwrap_or_else(|| Box::new(cx.waker().clone()));
- let me = Box::into_raw(me) as usize;
+ let me = Box::into_raw(me);
- match self.arc.state.compare_exchange(1, me, SeqCst, SeqCst) {
+ match self.arc.state.compare_exchange(invalid_ptr(1), me, SeqCst, SeqCst) {
// The lock is still locked, but we've now parked ourselves, so
// just report that we're scheduled to receive a notification.
Ok(_) => return Poll::Pending,
@@ -115,8 +119,8 @@ impl<T> BiLock<T> {
// Oops, looks like the lock was unlocked after our swap above
// and before the compare_exchange. Deallocate what we just
// allocated and go through the loop again.
- Err(0) => unsafe {
- waker = Some(Box::from_raw(me as *mut Waker));
+ Err(n) if n.is_null() => unsafe {
+ waker = Some(Box::from_raw(me));
},
// The top of this loop set the previous state to 1, so if we
@@ -125,7 +129,7 @@ impl<T> BiLock<T> {
// but we're trying to acquire the lock and there's only one
// other reference of the lock, so it should be impossible for
// that task to ever block itself.
- Err(n) => panic!("invalid state: {}", n),
+ Err(n) => panic!("invalid state: {}", n as usize),
}
}
}
@@ -145,6 +149,11 @@ impl<T> BiLock<T> {
BiLockAcquire { bilock: self }
}
+ /// Returns `true` only if the other `BiLock<T>` originated from the same call to `BiLock::new`.
+ pub fn is_pair_of(&self, other: &Self) -> bool {
+ Arc::ptr_eq(&self.arc, &other.arc)
+ }
+
/// Attempts to put the two "halves" of a `BiLock<T>` back together and
/// recover the original value. Succeeds only if the two `BiLock<T>`s
/// originated from the same call to `BiLock::new`.
@@ -152,7 +161,7 @@ impl<T> BiLock<T> {
where
T: Unpin,
{
- if Arc::ptr_eq(&self.arc, &other.arc) {
+ if self.is_pair_of(&other) {
drop(other);
let inner = Arc::try_unwrap(self.arc)
.ok()
@@ -164,7 +173,8 @@ impl<T> BiLock<T> {
}
fn unlock(&self) {
- match self.arc.state.swap(0, SeqCst) {
+ let n = self.arc.state.swap(ptr::null_mut(), SeqCst);
+ match n as usize {
// we've locked the lock, shouldn't be possible for us to see an
// unlocked lock.
0 => panic!("invalid unlocked state"),
@@ -174,8 +184,8 @@ impl<T> BiLock<T> {
// Another task has parked themselves on this lock, let's wake them
// up as its now their turn.
- n => unsafe {
- Box::from_raw(n as *mut Waker).wake();
+ _ => unsafe {
+ Box::from_raw(n).wake();
},
}
}
@@ -189,7 +199,7 @@ impl<T: Unpin> Inner<T> {
impl<T> Drop for Inner<T> {
fn drop(&mut self) {
- assert_eq!(self.state.load(SeqCst), 0);
+ assert!(self.state.load(SeqCst).is_null());
}
}
@@ -277,3 +287,12 @@ impl<'a, T> Future for BiLockAcquire<'a, T> {
self.bilock.poll_lock(cx)
}
}
+
+// Based on core::ptr::invalid_mut. Equivalent to `addr as *mut T`, but is strict-provenance compatible.
+#[allow(clippy::useless_transmute)]
+#[inline]
+fn invalid_ptr<T>(addr: usize) -> *mut T {
+ // SAFETY: every valid integer is also a valid pointer (as long as you don't dereference that
+ // pointer).
+ unsafe { core::mem::transmute(addr) }
+}
diff --git a/src/lock/mod.rs b/src/lock/mod.rs
index 0be7271..8ca0ff6 100644
--- a/src/lock/mod.rs
+++ b/src/lock/mod.rs
@@ -3,25 +3,25 @@
//! This module is only available when the `std` or `alloc` feature of this
//! library is activated, and it is activated by default.
-#[cfg(not(futures_no_atomic_cas))]
+#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
#[cfg(any(feature = "sink", feature = "io"))]
#[cfg(not(feature = "bilock"))]
pub(crate) use self::bilock::BiLock;
-#[cfg(not(futures_no_atomic_cas))]
+#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
#[cfg(feature = "bilock")]
#[cfg_attr(docsrs, doc(cfg(feature = "bilock")))]
pub use self::bilock::{BiLock, BiLockAcquire, BiLockGuard, ReuniteError};
-#[cfg(not(futures_no_atomic_cas))]
+#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
#[cfg(feature = "std")]
pub use self::mutex::{
MappedMutexGuard, Mutex, MutexGuard, MutexLockFuture, OwnedMutexGuard, OwnedMutexLockFuture,
};
-#[cfg(not(futures_no_atomic_cas))]
+#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
#[cfg(any(feature = "bilock", feature = "sink", feature = "io"))]
#[cfg_attr(docsrs, doc(cfg(feature = "bilock")))]
#[cfg_attr(not(feature = "bilock"), allow(unreachable_pub))]
mod bilock;
-#[cfg(not(futures_no_atomic_cas))]
+#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
#[cfg(feature = "std")]
mod mutex;
diff --git a/src/stream/futures_ordered.rs b/src/stream/futures_ordered.rs
index 618bf1b..2cc144e 100644
--- a/src/stream/futures_ordered.rs
+++ b/src/stream/futures_ordered.rs
@@ -19,7 +19,8 @@ pin_project! {
struct OrderWrapper<T> {
#[pin]
data: T, // A future or a future's output
- index: isize,
+ // Use i64 for index since isize may overflow in 32-bit targets.
+ index: i64,
}
}
@@ -58,36 +59,39 @@ where
/// An unbounded queue of futures.
///
-/// This "combinator" is similar to [`FuturesUnordered`], but it imposes a FIFO order
-/// on top of the set of futures. While futures in the set will race to
+/// This "combinator" is similar to [`FuturesUnordered`], but it imposes a FIFO
+/// order on top of the set of futures. While futures in the set will race to
/// completion in parallel, results will only be returned in the order their
/// originating futures were added to the queue.
///
/// Futures are pushed into this queue and their realized values are yielded in
/// order. This structure is optimized to manage a large number of futures.
-/// Futures managed by `FuturesOrdered` will only be polled when they generate
+/// Futures managed by [`FuturesOrdered`] will only be polled when they generate
/// notifications. This reduces the required amount of work needed to coordinate
/// large numbers of futures.
///
-/// When a `FuturesOrdered` is first created, it does not contain any futures.
-/// Calling `poll` in this state will result in `Poll::Ready(None))` to be
-/// returned. Futures are submitted to the queue using `push`; however, the
-/// future will **not** be polled at this point. `FuturesOrdered` will only
-/// poll managed futures when `FuturesOrdered::poll` is called. As such, it
-/// is important to call `poll` after pushing new futures.
+/// When a [`FuturesOrdered`] is first created, it does not contain any futures.
+/// Calling [`poll_next`](FuturesOrdered::poll_next) in this state will result
+/// in [`Poll::Ready(None)`](Poll::Ready) to be returned. Futures are submitted
+/// to the queue using [`push_back`](FuturesOrdered::push_back) (or
+/// [`push_front`](FuturesOrdered::push_front)); however, the future will
+/// **not** be polled at this point. [`FuturesOrdered`] will only poll managed
+/// futures when [`FuturesOrdered::poll_next`] is called. As such, it
+/// is important to call [`poll_next`](FuturesOrdered::poll_next) after pushing
+/// new futures.
///
-/// If `FuturesOrdered::poll` returns `Poll::Ready(None)` this means that
-/// the queue is currently not managing any futures. A future may be submitted
-/// to the queue at a later time. At that point, a call to
-/// `FuturesOrdered::poll` will either return the future's resolved value
-/// **or** `Poll::Pending` if the future has not yet completed. When
-/// multiple futures are submitted to the queue, `FuturesOrdered::poll` will
-/// return `Poll::Pending` until the first future completes, even if
+/// If [`FuturesOrdered::poll_next`] returns [`Poll::Ready(None)`](Poll::Ready)
+/// this means that the queue is currently not managing any futures. A future
+/// may be submitted to the queue at a later time. At that point, a call to
+/// [`FuturesOrdered::poll_next`] will either return the future's resolved value
+/// **or** [`Poll::Pending`] if the future has not yet completed. When
+/// multiple futures are submitted to the queue, [`FuturesOrdered::poll_next`]
+/// will return [`Poll::Pending`] until the first future completes, even if
/// some of the later futures have already completed.
///
-/// Note that you can create a ready-made `FuturesOrdered` via the
+/// Note that you can create a ready-made [`FuturesOrdered`] via the
/// [`collect`](Iterator::collect) method, or you can start with an empty queue
-/// with the `FuturesOrdered::new` constructor.
+/// with the [`FuturesOrdered::new`] constructor.
///
/// This type is only available when the `std` or `alloc` feature of this
/// library is activated, and it is activated by default.
@@ -95,8 +99,8 @@ where
pub struct FuturesOrdered<T: Future> {
in_progress_queue: FuturesUnordered<OrderWrapper<T>>,
queued_outputs: BinaryHeap<OrderWrapper<T::Output>>,
- next_incoming_index: isize,
- next_outgoing_index: isize,
+ next_incoming_index: i64,
+ next_outgoing_index: i64,
}
impl<T: Future> Unpin for FuturesOrdered<T> {}
@@ -104,8 +108,9 @@ impl<T: Future> Unpin for FuturesOrdered<T> {}
impl<Fut: Future> FuturesOrdered<Fut> {
/// Constructs a new, empty `FuturesOrdered`
///
- /// The returned `FuturesOrdered` does not contain any futures and, in this
- /// state, `FuturesOrdered::poll_next` will return `Poll::Ready(None)`.
+ /// The returned [`FuturesOrdered`] does not contain any futures and, in
+ /// this state, [`FuturesOrdered::poll_next`] will return
+ /// [`Poll::Ready(None)`](Poll::Ready).
pub fn new() -> Self {
Self {
in_progress_queue: FuturesUnordered::new(),
@@ -132,9 +137,9 @@ impl<Fut: Future> FuturesOrdered<Fut> {
/// Push a future into the queue.
///
/// This function submits the given future to the internal set for managing.
- /// This function will not call `poll` on the submitted future. The caller
- /// must ensure that `FuturesOrdered::poll` is called in order to receive
- /// task notifications.
+ /// This function will not call [`poll`](Future::poll) on the submitted
+ /// future. The caller must ensure that [`FuturesOrdered::poll_next`] is
+ /// called in order to receive task notifications.
#[deprecated(note = "use `push_back` instead")]
pub fn push(&mut self, future: Fut) {
self.push_back(future);
@@ -143,9 +148,9 @@ impl<Fut: Future> FuturesOrdered<Fut> {
/// Pushes a future to the back of the queue.
///
/// This function submits the given future to the internal set for managing.
- /// This function will not call `poll` on the submitted future. The caller
- /// must ensure that `FuturesOrdered::poll` is called in order to receive
- /// task notifications.
+ /// This function will not call [`poll`](Future::poll) on the submitted
+ /// future. The caller must ensure that [`FuturesOrdered::poll_next`] is
+ /// called in order to receive task notifications.
pub fn push_back(&mut self, future: Fut) {
let wrapped = OrderWrapper { data: future, index: self.next_incoming_index };
self.next_incoming_index += 1;
@@ -155,10 +160,10 @@ impl<Fut: Future> FuturesOrdered<Fut> {
/// Pushes a future to the front of the queue.
///
/// This function submits the given future to the internal set for managing.
- /// This function will not call `poll` on the submitted future. The caller
- /// must ensure that `FuturesOrdered::poll` is called in order to receive
- /// task notifications. This future will be the next future to be returned
- /// complete.
+ /// This function will not call [`poll`](Future::poll) on the submitted
+ /// future. The caller must ensure that [`FuturesOrdered::poll_next`] is
+ /// called in order to receive task notifications. This future will be
+ /// the next future to be returned complete.
pub fn push_front(&mut self, future: Fut) {
let wrapped = OrderWrapper { data: future, index: self.next_outgoing_index - 1 };
self.next_outgoing_index -= 1;
diff --git a/src/stream/futures_unordered/mod.rs b/src/stream/futures_unordered/mod.rs
index 6b5804d..dedf75d 100644
--- a/src/stream/futures_unordered/mod.rs
+++ b/src/stream/futures_unordered/mod.rs
@@ -62,7 +62,7 @@ pub struct FuturesUnordered<Fut> {
}
unsafe impl<Fut: Send> Send for FuturesUnordered<Fut> {}
-unsafe impl<Fut: Sync> Sync for FuturesUnordered<Fut> {}
+unsafe impl<Fut: Send + Sync> Sync for FuturesUnordered<Fut> {}
impl<Fut> Unpin for FuturesUnordered<Fut> {}
impl Spawn for FuturesUnordered<FutureObj<'_, ()>> {
@@ -558,20 +558,7 @@ impl<Fut> Debug for FuturesUnordered<Fut> {
impl<Fut> FuturesUnordered<Fut> {
/// Clears the set, removing all futures.
pub fn clear(&mut self) {
- self.clear_head_all();
-
- // we just cleared all the tasks, and we have &mut self, so this is safe.
- unsafe { self.ready_to_run_queue.clear() };
-
- self.is_terminated.store(false, Relaxed);
- }
-
- fn clear_head_all(&mut self) {
- while !self.head_all.get_mut().is_null() {
- let head = *self.head_all.get_mut();
- let task = unsafe { self.unlink(head) };
- self.release_task(task);
- }
+ *self = Self::new();
}
}
@@ -581,7 +568,11 @@ impl<Fut> Drop for FuturesUnordered<Fut> {
// associated with it. At the same time though there may be tons of
// wakers flying around which contain `Task<Fut>` references
// inside them. We'll let those naturally get deallocated.
- self.clear_head_all();
+ while !self.head_all.get_mut().is_null() {
+ let head = *self.head_all.get_mut();
+ let task = unsafe { self.unlink(head) };
+ self.release_task(task);
+ }
// Note that at this point we could still have a bunch of tasks in the
// ready to run queue. None of those tasks, however, have futures
diff --git a/src/stream/futures_unordered/ready_to_run_queue.rs b/src/stream/futures_unordered/ready_to_run_queue.rs
index 4518705..a924935 100644
--- a/src/stream/futures_unordered/ready_to_run_queue.rs
+++ b/src/stream/futures_unordered/ready_to_run_queue.rs
@@ -85,38 +85,25 @@ impl<Fut> ReadyToRunQueue<Fut> {
pub(super) fn stub(&self) -> *const Task<Fut> {
Arc::as_ptr(&self.stub)
}
-
- // Clear the queue of tasks.
- //
- // Note that each task has a strong reference count associated with it
- // which is owned by the ready to run queue. This method just pulls out
- // tasks and drops their refcounts.
- //
- // # Safety
- //
- // - All tasks **must** have had their futures dropped already (by FuturesUnordered::clear)
- // - The caller **must** guarantee unique access to `self`
- pub(crate) unsafe fn clear(&self) {
- loop {
- // SAFETY: We have the guarantee of mutual exclusion required by `dequeue`.
- match self.dequeue() {
- Dequeue::Empty => break,
- Dequeue::Inconsistent => abort("inconsistent in drop"),
- Dequeue::Data(ptr) => drop(Arc::from_raw(ptr)),
- }
- }
- }
}
impl<Fut> Drop for ReadyToRunQueue<Fut> {
fn drop(&mut self) {
// Once we're in the destructor for `Inner<Fut>` we need to clear out
// the ready to run queue of tasks if there's anything left in there.
-
- // All tasks have had their futures dropped already by the `FuturesUnordered`
- // destructor above, and we have &mut self, so this is safe.
+ //
+ // Note that each task has a strong reference count associated with it
+ // which is owned by the ready to run queue. All tasks should have had
+ // their futures dropped already by the `FuturesUnordered` destructor
+ // above, so we're just pulling out tasks and dropping their refcounts.
unsafe {
- self.clear();
+ loop {
+ match self.dequeue() {
+ Dequeue::Empty => break,
+ Dequeue::Inconsistent => abort("inconsistent in drop"),
+ Dequeue::Data(ptr) => drop(Arc::from_raw(ptr)),
+ }
+ }
}
}
}
diff --git a/src/stream/mod.rs b/src/stream/mod.rs
index ec685b9..2438e58 100644
--- a/src/stream/mod.rs
+++ b/src/stream/mod.rs
@@ -18,9 +18,10 @@ pub use futures_core::stream::{FusedStream, Stream, TryStream};
#[allow(clippy::module_inception)]
mod stream;
pub use self::stream::{
- Chain, Collect, Concat, Cycle, Enumerate, Filter, FilterMap, FlatMap, Flatten, Fold, ForEach,
- Fuse, Inspect, Map, Next, NextIf, NextIfEq, Peek, PeekMut, Peekable, Scan, SelectNextSome,
- Skip, SkipWhile, StreamExt, StreamFuture, Take, TakeUntil, TakeWhile, Then, Unzip, Zip,
+ All, Any, Chain, Collect, Concat, Count, Cycle, Enumerate, Filter, FilterMap, FlatMap, Flatten,
+ Fold, ForEach, Fuse, Inspect, Map, Next, NextIf, NextIfEq, Peek, PeekMut, Peekable, Scan,
+ SelectNextSome, Skip, SkipWhile, StreamExt, StreamFuture, Take, TakeUntil, TakeWhile, Then,
+ Unzip, Zip,
};
#[cfg(feature = "std")]
@@ -36,11 +37,13 @@ pub use self::stream::ReadyChunks;
#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
pub use self::stream::Forward;
-#[cfg(not(futures_no_atomic_cas))]
+#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
#[cfg(feature = "alloc")]
-pub use self::stream::{BufferUnordered, Buffered, ForEachConcurrent};
+pub use self::stream::{
+ BufferUnordered, Buffered, FlatMapUnordered, FlattenUnordered, ForEachConcurrent,
+};
-#[cfg(not(futures_no_atomic_cas))]
+#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
#[cfg(feature = "sink")]
#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
#[cfg(feature = "alloc")]
@@ -48,9 +51,9 @@ pub use self::stream::{ReuniteError, SplitSink, SplitStream};
mod try_stream;
pub use self::try_stream::{
- try_unfold, AndThen, ErrInto, InspectErr, InspectOk, IntoStream, MapErr, MapOk, OrElse,
- TryCollect, TryConcat, TryFilter, TryFilterMap, TryFlatten, TryFold, TryForEach, TryNext,
- TrySkipWhile, TryStreamExt, TryTakeWhile, TryUnfold,
+ try_unfold, AndThen, ErrInto, InspectErr, InspectOk, IntoStream, MapErr, MapOk, OrElse, TryAll,
+ TryAny, TryCollect, TryConcat, TryFilter, TryFilterMap, TryFlatten, TryFold, TryForEach,
+ TryNext, TrySkipWhile, TryStreamExt, TryTakeWhile, TryUnfold,
};
#[cfg(feature = "io")]
@@ -58,12 +61,14 @@ pub use self::try_stream::{
#[cfg(feature = "std")]
pub use self::try_stream::IntoAsyncRead;
-#[cfg(not(futures_no_atomic_cas))]
+#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
#[cfg(feature = "alloc")]
-pub use self::try_stream::{TryBufferUnordered, TryBuffered, TryForEachConcurrent};
+pub use self::try_stream::{
+ TryBufferUnordered, TryBuffered, TryFlattenUnordered, TryForEachConcurrent,
+};
#[cfg(feature = "alloc")]
-pub use self::try_stream::{TryChunks, TryChunksError};
+pub use self::try_stream::{TryChunks, TryChunksError, TryReadyChunks, TryReadyChunksError};
// Primitive streams
@@ -100,36 +105,36 @@ pub use self::select_with_strategy::{select_with_strategy, PollNext, SelectWithS
mod unfold;
pub use self::unfold::{unfold, Unfold};
-#[cfg(not(futures_no_atomic_cas))]
+#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
#[cfg(feature = "alloc")]
mod futures_ordered;
-#[cfg(not(futures_no_atomic_cas))]
+#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
#[cfg(feature = "alloc")]
pub use self::futures_ordered::FuturesOrdered;
-#[cfg(not(futures_no_atomic_cas))]
+#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
#[cfg(feature = "alloc")]
pub mod futures_unordered;
-#[cfg(not(futures_no_atomic_cas))]
+#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
#[cfg(feature = "alloc")]
#[doc(inline)]
pub use self::futures_unordered::FuturesUnordered;
-#[cfg(not(futures_no_atomic_cas))]
+#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
#[cfg(feature = "alloc")]
pub mod select_all;
-#[cfg(not(futures_no_atomic_cas))]
+#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
#[cfg(feature = "alloc")]
#[doc(inline)]
pub use self::select_all::{select_all, SelectAll};
-#[cfg(not(futures_no_atomic_cas))]
+#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
#[cfg(feature = "alloc")]
mod abortable;
-#[cfg(not(futures_no_atomic_cas))]
+#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
#[cfg(feature = "alloc")]
pub use crate::abortable::{AbortHandle, AbortRegistration, Abortable, Aborted};
-#[cfg(not(futures_no_atomic_cas))]
+#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
#[cfg(feature = "alloc")]
pub use abortable::abortable;
diff --git a/src/stream/select_all.rs b/src/stream/select_all.rs
index 3474331..121b6a0 100644
--- a/src/stream/select_all.rs
+++ b/src/stream/select_all.rs
@@ -8,29 +8,24 @@ use futures_core::ready;
use futures_core::stream::{FusedStream, Stream};
use futures_core::task::{Context, Poll};
-use pin_project_lite::pin_project;
-
use super::assert_stream;
use crate::stream::{futures_unordered, FuturesUnordered, StreamExt, StreamFuture};
-pin_project! {
- /// An unbounded set of streams
- ///
- /// This "combinator" provides the ability to maintain a set of streams
- /// and drive them all to completion.
- ///
- /// Streams are pushed into this set and their realized values are
- /// yielded as they become ready. Streams will only be polled when they
- /// generate notifications. This allows to coordinate a large number of streams.
- ///
- /// Note that you can create a ready-made `SelectAll` via the
- /// `select_all` function in the `stream` module, or you can start with an
- /// empty set with the `SelectAll::new` constructor.
- #[must_use = "streams do nothing unless polled"]
- pub struct SelectAll<St> {
- #[pin]
- inner: FuturesUnordered<StreamFuture<St>>,
- }
+/// An unbounded set of streams
+///
+/// This "combinator" provides the ability to maintain a set of streams
+/// and drive them all to completion.
+///
+/// Streams are pushed into this set and their realized values are
+/// yielded as they become ready. Streams will only be polled when they
+/// generate notifications. This allows to coordinate a large number of streams.
+///
+/// Note that you can create a ready-made `SelectAll` via the
+/// `select_all` function in the `stream` module, or you can start with an
+/// empty set with the `SelectAll::new` constructor.
+#[must_use = "streams do nothing unless polled"]
+pub struct SelectAll<St> {
+ inner: FuturesUnordered<StreamFuture<St>>,
}
impl<St: Debug> Debug for SelectAll<St> {
diff --git a/src/stream/stream/all.rs b/src/stream/stream/all.rs
index ba2baa5..1435c79 100644
--- a/src/stream/stream/all.rs
+++ b/src/stream/stream/all.rs
@@ -13,7 +13,7 @@ pin_project! {
#[pin]
stream: St,
f: F,
- accum: Option<bool>,
+ done: bool,
#[pin]
future: Option<Fut>,
}
@@ -27,7 +27,7 @@ where
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("All")
.field("stream", &self.stream)
- .field("accum", &self.accum)
+ .field("done", &self.done)
.field("future", &self.future)
.finish()
}
@@ -40,7 +40,7 @@ where
Fut: Future<Output = bool>,
{
pub(super) fn new(stream: St, f: F) -> Self {
- Self { stream, f, accum: Some(true), future: None }
+ Self { stream, f, done: false, future: None }
}
}
@@ -51,7 +51,7 @@ where
Fut: Future<Output = bool>,
{
fn is_terminated(&self) -> bool {
- self.accum.is_none() && self.future.is_none()
+ self.done && self.future.is_none()
}
}
@@ -67,21 +67,22 @@ where
let mut this = self.project();
Poll::Ready(loop {
if let Some(fut) = this.future.as_mut().as_pin_mut() {
- // we're currently processing a future to produce a new accum value
- let acc = this.accum.unwrap() && ready!(fut.poll(cx));
- if !acc {
+ // we're currently processing a future to produce a new value
+ let res = ready!(fut.poll(cx));
+ this.future.set(None);
+ if !res {
+ *this.done = true;
break false;
} // early exit
- *this.accum = Some(acc);
- this.future.set(None);
- } else if this.accum.is_some() {
+ } else if !*this.done {
// we're waiting on a new item from the stream
match ready!(this.stream.as_mut().poll_next(cx)) {
Some(item) => {
this.future.set(Some((this.f)(item)));
}
None => {
- break this.accum.take().unwrap();
+ *this.done = true;
+ break true;
}
}
} else {
diff --git a/src/stream/stream/any.rs b/src/stream/stream/any.rs
index f023125..cc3d695 100644
--- a/src/stream/stream/any.rs
+++ b/src/stream/stream/any.rs
@@ -13,7 +13,7 @@ pin_project! {
#[pin]
stream: St,
f: F,
- accum: Option<bool>,
+ done: bool,
#[pin]
future: Option<Fut>,
}
@@ -27,7 +27,7 @@ where
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Any")
.field("stream", &self.stream)
- .field("accum", &self.accum)
+ .field("done", &self.done)
.field("future", &self.future)
.finish()
}
@@ -40,7 +40,7 @@ where
Fut: Future<Output = bool>,
{
pub(super) fn new(stream: St, f: F) -> Self {
- Self { stream, f, accum: Some(false), future: None }
+ Self { stream, f, done: false, future: None }
}
}
@@ -51,7 +51,7 @@ where
Fut: Future<Output = bool>,
{
fn is_terminated(&self) -> bool {
- self.accum.is_none() && self.future.is_none()
+ self.done && self.future.is_none()
}
}
@@ -67,21 +67,22 @@ where
let mut this = self.project();
Poll::Ready(loop {
if let Some(fut) = this.future.as_mut().as_pin_mut() {
- // we're currently processing a future to produce a new accum value
- let acc = this.accum.unwrap() || ready!(fut.poll(cx));
- if acc {
+ // we're currently processing a future to produce a new value
+ let res = ready!(fut.poll(cx));
+ this.future.set(None);
+ if res {
+ *this.done = true;
break true;
} // early exit
- *this.accum = Some(acc);
- this.future.set(None);
- } else if this.accum.is_some() {
+ } else if !*this.done {
// we're waiting on a new item from the stream
match ready!(this.stream.as_mut().poll_next(cx)) {
Some(item) => {
this.future.set(Some((this.f)(item)));
}
None => {
- break this.accum.take().unwrap();
+ *this.done = true;
+ break false;
}
}
} else {
diff --git a/src/stream/stream/flatten_unordered.rs b/src/stream/stream/flatten_unordered.rs
index 07f971c..44c6ace 100644
--- a/src/stream/stream/flatten_unordered.rs
+++ b/src/stream/stream/flatten_unordered.rs
@@ -3,6 +3,7 @@ use core::{
cell::UnsafeCell,
convert::identity,
fmt,
+ marker::PhantomData,
num::NonZeroUsize,
pin::Pin,
sync::atomic::{AtomicU8, Ordering},
@@ -22,8 +23,11 @@ use futures_task::{waker, ArcWake};
use crate::stream::FuturesUnordered;
-/// There is nothing to poll and stream isn't being
-/// polled or waking at the moment.
+/// Stream for the [`flatten_unordered`](super::StreamExt::flatten_unordered)
+/// method.
+pub type FlattenUnordered<St> = FlattenUnorderedWithFlowController<St, ()>;
+
+/// There is nothing to poll and stream isn't being polled/waking/woken at the moment.
const NONE: u8 = 0;
/// Inner streams need to be polled.
@@ -32,26 +36,19 @@ const NEED_TO_POLL_INNER_STREAMS: u8 = 1;
/// The base stream needs to be polled.
const NEED_TO_POLL_STREAM: u8 = 0b10;
-/// It needs to poll base stream and inner streams.
+/// Both base stream and inner streams need to be polled.
const NEED_TO_POLL_ALL: u8 = NEED_TO_POLL_INNER_STREAMS | NEED_TO_POLL_STREAM;
/// The current stream is being polled at the moment.
const POLLING: u8 = 0b100;
-/// Inner streams are being woken at the moment.
-const WAKING_INNER_STREAMS: u8 = 0b1000;
-
-/// The base stream is being woken at the moment.
-const WAKING_STREAM: u8 = 0b10000;
-
-/// The base stream and inner streams are being woken at the moment.
-const WAKING_ALL: u8 = WAKING_STREAM | WAKING_INNER_STREAMS;
+/// Stream is being woken at the moment.
+const WAKING: u8 = 0b1000;
/// The stream was waked and will be polled.
-const WOKEN: u8 = 0b100000;
+const WOKEN: u8 = 0b10000;
-/// Determines what needs to be polled, and is stream being polled at the
-/// moment or not.
+/// Internal polling state of the stream.
#[derive(Clone, Debug)]
struct SharedPollState {
state: Arc<AtomicU8>,
@@ -64,14 +61,14 @@ impl SharedPollState {
}
/// Attempts to start polling, returning stored state in case of success.
- /// Returns `None` if some waker is waking at the moment.
+ /// Returns `None` if either waker is waking at the moment.
fn start_polling(
&self,
) -> Option<(u8, PollStateBomb<'_, impl FnOnce(&SharedPollState) -> u8>)> {
let value = self
.state
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |value| {
- if value & WAKING_ALL == NONE {
+ if value & WAKING == NONE {
Some(POLLING)
} else {
None
@@ -83,23 +80,20 @@ impl SharedPollState {
Some((value, bomb))
}
- /// Starts the waking process and performs bitwise or with the given value.
+ /// Attempts to start the waking process and performs bitwise or with the given value.
+ ///
+ /// If some waker is already in progress or stream is already woken/being polled, waking process won't start, however
+ /// state will be disjuncted with the given value.
fn start_waking(
&self,
to_poll: u8,
- waking: u8,
) -> Option<(u8, PollStateBomb<'_, impl FnOnce(&SharedPollState) -> u8>)> {
let value = self
.state
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |value| {
- // Waking process for this waker already started
- if value & waking != NONE {
- return None;
- }
let mut next_value = value | to_poll;
- // Only start the waking process if we're not in the polling phase and the stream isn't woken already
if value & (WOKEN | POLLING) == NONE {
- next_value |= waking;
+ next_value |= WAKING;
}
if next_value != value {
@@ -110,8 +104,9 @@ impl SharedPollState {
})
.ok()?;
- if value & (WOKEN | POLLING) == NONE {
- let bomb = PollStateBomb::new(self, move |state| state.stop_waking(waking));
+ // Only start the waking process if we're not in the polling/waking phase and the stream isn't woken already
+ if value & (WOKEN | POLLING | WAKING) == NONE {
+ let bomb = PollStateBomb::new(self, SharedPollState::stop_waking);
Some((value, bomb))
} else {
@@ -123,7 +118,7 @@ impl SharedPollState {
/// - `!POLLING` allowing to use wakers
/// - `WOKEN` if the state was changed during `POLLING` phase as waker will be called,
/// or `will_be_woken` flag supplied
- /// - `!WAKING_ALL` as
+ /// - `!WAKING` as
/// * Wakers called during the `POLLING` phase won't propagate their calls
/// * `POLLING` phase can't start if some of the wakers are active
/// So no wrapped waker can touch the inner waker's cell, it's safe to poll again.
@@ -138,20 +133,17 @@ impl SharedPollState {
}
next_value |= value;
- Some(next_value & !POLLING & !WAKING_ALL)
+ Some(next_value & !POLLING & !WAKING)
})
.unwrap()
}
/// Toggles state to non-waking, allowing to start polling.
- fn stop_waking(&self, waking: u8) -> u8 {
- self.state
+ fn stop_waking(&self) -> u8 {
+ let value = self
+ .state
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |value| {
- let mut next_value = value & !waking;
- // Waker will be called only if the current waking state is the same as the specified waker state
- if value & WAKING_ALL == waking {
- next_value |= WOKEN;
- }
+ let next_value = value & !WAKING | WOKEN;
if next_value != value {
Some(next_value)
@@ -159,12 +151,15 @@ impl SharedPollState {
None
}
})
- .unwrap_or_else(identity)
+ .unwrap_or_else(identity);
+
+ debug_assert!(value & (WOKEN | POLLING | WAKING) == WAKING);
+ value
}
/// Resets current state allowing to poll the stream and wake up wakers.
fn reset(&self) -> u8 {
- self.state.swap(NEED_TO_POLL_ALL, Ordering::AcqRel)
+ self.state.swap(NEED_TO_POLL_ALL, Ordering::SeqCst)
}
}
@@ -184,11 +179,6 @@ impl<'a, F: FnOnce(&SharedPollState) -> u8> PollStateBomb<'a, F> {
fn deactivate(mut self) {
self.drop.take();
}
-
- /// Manually fires the bomb, returning supplied state.
- fn fire(mut self) -> Option<u8> {
- self.drop.take().map(|drop| (drop)(self.state))
- }
}
impl<F: FnOnce(&SharedPollState) -> u8> Drop for PollStateBomb<'_, F> {
@@ -201,16 +191,16 @@ impl<F: FnOnce(&SharedPollState) -> u8> Drop for PollStateBomb<'_, F> {
/// Will update state with the provided value on `wake_by_ref` call
/// and then, if there is a need, call `inner_waker`.
-struct InnerWaker {
+struct WrappedWaker {
inner_waker: UnsafeCell<Option<Waker>>,
poll_state: SharedPollState,
need_to_poll: u8,
}
-unsafe impl Send for InnerWaker {}
-unsafe impl Sync for InnerWaker {}
+unsafe impl Send for WrappedWaker {}
+unsafe impl Sync for WrappedWaker {}
-impl InnerWaker {
+impl WrappedWaker {
/// Replaces given waker's inner_waker for polling stream/futures which will
/// update poll state on `wake_by_ref` call. Use only if you need several
/// contexts.
@@ -218,25 +208,19 @@ impl InnerWaker {
/// ## Safety
///
/// This function will modify waker's `inner_waker` via `UnsafeCell`, so
- /// it should be used only during `POLLING` phase.
- unsafe fn replace_waker(self_arc: &mut Arc<Self>, cx: &Context<'_>) -> Waker {
+ /// it should be used only during `POLLING` phase by one thread at the time.
+ unsafe fn replace_waker(self_arc: &mut Arc<Self>, cx: &Context<'_>) {
*self_arc.inner_waker.get() = cx.waker().clone().into();
- waker(self_arc.clone())
}
/// Attempts to start the waking process for the waker with the given value.
/// If succeeded, then the stream isn't yet woken and not being polled at the moment.
fn start_waking(&self) -> Option<(u8, PollStateBomb<'_, impl FnOnce(&SharedPollState) -> u8>)> {
- self.poll_state.start_waking(self.need_to_poll, self.waking_state())
- }
-
- /// Returns the corresponding waking state toggled by this waker.
- fn waking_state(&self) -> u8 {
- self.need_to_poll << 3
+ self.poll_state.start_waking(self.need_to_poll)
}
}
-impl ArcWake for InnerWaker {
+impl ArcWake for WrappedWaker {
fn wake_by_ref(self_arc: &Arc<Self>) {
if let Some((_, state_bomb)) = self_arc.start_waking() {
// Safety: now state is not `POLLING`
@@ -244,24 +228,17 @@ impl ArcWake for InnerWaker {
if let Some(inner_waker) = waker_opt.clone() {
// Stop waking to allow polling stream
- let poll_state_value = state_bomb.fire().unwrap();
-
- // Here we want to call waker only if stream isn't woken yet and
- // also to optimize the case when two wakers are called at the same time.
- //
- // In this case the best strategy will be to propagate only the latest waker's awake,
- // and then poll both entities in a single `poll_next` call
- if poll_state_value & (WOKEN | WAKING_ALL) == self_arc.waking_state() {
- // Wake up inner waker
- inner_waker.wake();
- }
+ drop(state_bomb);
+
+ // Wake up inner waker
+ inner_waker.wake();
}
}
}
}
pin_project! {
- /// Future which contains optional stream.
+ /// Future which polls optional inner stream.
///
/// If it's `Some`, it will attempt to call `poll_next` on it,
/// returning `Some((item, next_item_fut))` in case of `Poll::Ready(Some(...))`
@@ -303,10 +280,10 @@ impl<St: Stream + Unpin> Future for PollStreamFut<St> {
pin_project! {
/// Stream for the [`flatten_unordered`](super::StreamExt::flatten_unordered)
- /// method.
- #[project = FlattenUnorderedProj]
+ /// method with ability to specify flow controller.
+ #[project = FlattenUnorderedWithFlowControllerProj]
#[must_use = "streams do nothing unless polled"]
- pub struct FlattenUnordered<St> where St: Stream {
+ pub struct FlattenUnorderedWithFlowController<St, Fc> where St: Stream {
#[pin]
inner_streams: FuturesUnordered<PollStreamFut<St::Item>>,
#[pin]
@@ -314,80 +291,110 @@ pin_project! {
poll_state: SharedPollState,
limit: Option<NonZeroUsize>,
is_stream_done: bool,
- inner_streams_waker: Arc<InnerWaker>,
- stream_waker: Arc<InnerWaker>,
+ inner_streams_waker: Arc<WrappedWaker>,
+ stream_waker: Arc<WrappedWaker>,
+ flow_controller: PhantomData<Fc>
}
}
-impl<St> fmt::Debug for FlattenUnordered<St>
+impl<St, Fc> fmt::Debug for FlattenUnorderedWithFlowController<St, Fc>
where
St: Stream + fmt::Debug,
St::Item: Stream + fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- f.debug_struct("FlattenUnordered")
+ f.debug_struct("FlattenUnorderedWithFlowController")
.field("poll_state", &self.poll_state)
.field("inner_streams", &self.inner_streams)
.field("limit", &self.limit)
.field("stream", &self.stream)
.field("is_stream_done", &self.is_stream_done)
+ .field("flow_controller", &self.flow_controller)
.finish()
}
}
-impl<St> FlattenUnordered<St>
+impl<St, Fc> FlattenUnorderedWithFlowController<St, Fc>
where
St: Stream,
+ Fc: FlowController<St::Item, <St::Item as Stream>::Item>,
St::Item: Stream + Unpin,
{
- pub(super) fn new(stream: St, limit: Option<usize>) -> FlattenUnordered<St> {
+ pub(crate) fn new(
+ stream: St,
+ limit: Option<usize>,
+ ) -> FlattenUnorderedWithFlowController<St, Fc> {
let poll_state = SharedPollState::new(NEED_TO_POLL_STREAM);
- FlattenUnordered {
+ FlattenUnorderedWithFlowController {
inner_streams: FuturesUnordered::new(),
stream,
is_stream_done: false,
limit: limit.and_then(NonZeroUsize::new),
- inner_streams_waker: Arc::new(InnerWaker {
+ inner_streams_waker: Arc::new(WrappedWaker {
inner_waker: UnsafeCell::new(None),
poll_state: poll_state.clone(),
need_to_poll: NEED_TO_POLL_INNER_STREAMS,
}),
- stream_waker: Arc::new(InnerWaker {
+ stream_waker: Arc::new(WrappedWaker {
inner_waker: UnsafeCell::new(None),
poll_state: poll_state.clone(),
need_to_poll: NEED_TO_POLL_STREAM,
}),
poll_state,
+ flow_controller: PhantomData,
}
}
delegate_access_inner!(stream, St, ());
}
-impl<St> FlattenUnorderedProj<'_, St>
+/// Returns the next flow step based on the received item.
+pub trait FlowController<I, O> {
+ /// Handles an item producing `FlowStep` describing the next flow step.
+ fn next_step(item: I) -> FlowStep<I, O>;
+}
+
+impl<I, O> FlowController<I, O> for () {
+ fn next_step(item: I) -> FlowStep<I, O> {
+ FlowStep::Continue(item)
+ }
+}
+
+/// Describes the next flow step.
+#[derive(Debug, Clone)]
+pub enum FlowStep<C, R> {
+ /// Just yields an item and continues standard flow.
+ Continue(C),
+ /// Immediately returns an underlying item from the function.
+ Return(R),
+}
+
+impl<St, Fc> FlattenUnorderedWithFlowControllerProj<'_, St, Fc>
where
St: Stream,
{
- /// Checks if current `inner_streams` size is less than optional limit.
+ /// Checks if current `inner_streams` bucket size is greater than optional limit.
fn is_exceeded_limit(&self) -> bool {
self.limit.map_or(false, |limit| self.inner_streams.len() >= limit.get())
}
}
-impl<St> FusedStream for FlattenUnordered<St>
+impl<St, Fc> FusedStream for FlattenUnorderedWithFlowController<St, Fc>
where
St: FusedStream,
- St::Item: FusedStream + Unpin,
+ Fc: FlowController<St::Item, <St::Item as Stream>::Item>,
+ St::Item: Stream + Unpin,
{
fn is_terminated(&self) -> bool {
self.stream.is_terminated() && self.inner_streams.is_empty()
}
}
-impl<St> Stream for FlattenUnordered<St>
+impl<St, Fc> Stream for FlattenUnorderedWithFlowController<St, Fc>
where
St: Stream,
+ Fc: FlowController<St::Item, <St::Item as Stream>::Item>,
St::Item: Stream + Unpin,
{
type Item = <St::Item as Stream>::Item;
@@ -398,17 +405,21 @@ where
let mut this = self.as_mut().project();
- let (mut poll_state_value, state_bomb) = match this.poll_state.start_polling() {
- Some(value) => value,
- _ => {
- // Waker was called, just wait for the next poll
- return Poll::Pending;
+ // Attempt to start polling, in case some waker is holding the lock, wait in loop
+ let (mut poll_state_value, state_bomb) = loop {
+ if let Some(value) = this.poll_state.start_polling() {
+ break value;
}
};
+ // Safety: now state is `POLLING`.
+ unsafe {
+ WrappedWaker::replace_waker(this.stream_waker, cx);
+ WrappedWaker::replace_waker(this.inner_streams_waker, cx)
+ };
+
if poll_state_value & NEED_TO_POLL_STREAM != NONE {
- // Safety: now state is `POLLING`.
- let stream_waker = unsafe { InnerWaker::replace_waker(this.stream_waker, cx) };
+ let mut stream_waker = None;
// Here we need to poll the base stream.
//
@@ -424,15 +435,35 @@ where
break;
} else {
- match this.stream.as_mut().poll_next(&mut Context::from_waker(&stream_waker)) {
- Poll::Ready(Some(inner_stream)) => {
+ let mut cx = Context::from_waker(
+ stream_waker.get_or_insert_with(|| waker(this.stream_waker.clone())),
+ );
+
+ match this.stream.as_mut().poll_next(&mut cx) {
+ Poll::Ready(Some(item)) => {
+ let next_item_fut = match Fc::next_step(item) {
+ // Propagates an item immediately (the main use-case is for errors)
+ FlowStep::Return(item) => {
+ need_to_poll_next |= NEED_TO_POLL_STREAM
+ | (poll_state_value & NEED_TO_POLL_INNER_STREAMS);
+ poll_state_value &= !NEED_TO_POLL_INNER_STREAMS;
+
+ next_item = Some(item);
+
+ break;
+ }
+ // Yields an item and continues processing (normal case)
+ FlowStep::Continue(inner_stream) => {
+ PollStreamFut::new(inner_stream)
+ }
+ };
// Add new stream to the inner streams bucket
- this.inner_streams.as_mut().push(PollStreamFut::new(inner_stream));
+ this.inner_streams.as_mut().push(next_item_fut);
// Inner streams must be polled afterward
poll_state_value |= NEED_TO_POLL_INNER_STREAMS;
}
Poll::Ready(None) => {
- // Mark the stream as done
+ // Mark the base stream as done
*this.is_stream_done = true;
}
Poll::Pending => {
@@ -444,15 +475,10 @@ where
}
if poll_state_value & NEED_TO_POLL_INNER_STREAMS != NONE {
- // Safety: now state is `POLLING`.
- let inner_streams_waker =
- unsafe { InnerWaker::replace_waker(this.inner_streams_waker, cx) };
-
- match this
- .inner_streams
- .as_mut()
- .poll_next(&mut Context::from_waker(&inner_streams_waker))
- {
+ let inner_streams_waker = waker(this.inner_streams_waker.clone());
+ let mut cx = Context::from_waker(&inner_streams_waker);
+
+ match this.inner_streams.as_mut().poll_next(&mut cx) {
Poll::Ready(Some(Some((item, next_item_fut)))) => {
// Push next inner stream item future to the list of inner streams futures
this.inner_streams.as_mut().push(next_item_fut);
@@ -472,15 +498,16 @@ where
// We didn't have any `poll_next` panic, so it's time to deactivate the bomb
state_bomb.deactivate();
+ // Call the waker at the end of polling if
let mut force_wake =
// we need to poll the stream and didn't reach the limit yet
need_to_poll_next & NEED_TO_POLL_STREAM != NONE && !this.is_exceeded_limit()
- // or we need to poll inner streams again
+ // or we need to poll the inner streams again
|| need_to_poll_next & NEED_TO_POLL_INNER_STREAMS != NONE;
// Stop polling and swap the latest state
poll_state_value = this.poll_state.stop_polling(need_to_poll_next, force_wake);
- // If state was changed during `POLLING` phase, need to manually call a waker
+ // If state was changed during `POLLING` phase, we also need to manually call a waker
force_wake |= poll_state_value & NEED_TO_POLL_ALL != NONE;
let is_done = *this.is_stream_done && this.inner_streams.is_empty();
@@ -499,7 +526,7 @@ where
// Forwarding impl of Sink from the underlying stream
#[cfg(feature = "sink")]
-impl<St, Item> Sink<Item> for FlattenUnordered<St>
+impl<St, Item, Fc> Sink<Item> for FlattenUnorderedWithFlowController<St, Fc>
where
St: Stream + Sink<Item>,
{
diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs
index bb5e249..2da7036 100644
--- a/src/stream/stream/mod.rs
+++ b/src/stream/stream/mod.rs
@@ -181,32 +181,32 @@ mod scan;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::scan::Scan;
-#[cfg(not(futures_no_atomic_cas))]
+#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
#[cfg(feature = "alloc")]
mod buffer_unordered;
-#[cfg(not(futures_no_atomic_cas))]
+#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
#[cfg(feature = "alloc")]
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::buffer_unordered::BufferUnordered;
-#[cfg(not(futures_no_atomic_cas))]
+#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
#[cfg(feature = "alloc")]
mod buffered;
-#[cfg(not(futures_no_atomic_cas))]
+#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
#[cfg(feature = "alloc")]
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::buffered::Buffered;
-#[cfg(not(futures_no_atomic_cas))]
+#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
#[cfg(feature = "alloc")]
-mod flatten_unordered;
+pub(crate) mod flatten_unordered;
-#[cfg(not(futures_no_atomic_cas))]
+#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
#[cfg(feature = "alloc")]
#[allow(unreachable_pub)]
pub use self::flatten_unordered::FlattenUnordered;
-#[cfg(not(futures_no_atomic_cas))]
+#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
#[cfg(feature = "alloc")]
delegate_all!(
/// Stream for the [`flat_map_unordered`](StreamExt::flat_map_unordered) method.
@@ -216,20 +216,20 @@ delegate_all!(
where St: Stream, U: Stream, U: Unpin, F: FnMut(St::Item) -> U
);
-#[cfg(not(futures_no_atomic_cas))]
+#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
#[cfg(feature = "alloc")]
mod for_each_concurrent;
-#[cfg(not(futures_no_atomic_cas))]
+#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
#[cfg(feature = "alloc")]
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::for_each_concurrent::ForEachConcurrent;
-#[cfg(not(futures_no_atomic_cas))]
+#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
#[cfg(feature = "sink")]
#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
#[cfg(feature = "alloc")]
mod split;
-#[cfg(not(futures_no_atomic_cas))]
+#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
#[cfg(feature = "sink")]
#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
#[cfg(feature = "alloc")]
@@ -323,6 +323,9 @@ pub trait StreamExt: Stream {
/// wrapped version of it, similar to the existing `map` methods in the
/// standard library.
///
+ /// See [`StreamExt::then`](Self::then) if you want to use a closure that
+ /// returns a future instead of a value.
+ ///
/// # Examples
///
/// ```
@@ -467,6 +470,9 @@ pub trait StreamExt: Stream {
/// Note that this function consumes the stream passed into it and returns a
/// wrapped version of it.
///
+ /// See [`StreamExt::map`](Self::map) if you want to use a closure that
+ /// returns a value instead of a future.
+ ///
/// # Examples
///
/// ```
@@ -774,7 +780,14 @@ pub trait StreamExt: Stream {
}
/// Flattens a stream of streams into just one continuous stream. Polls
- /// inner streams concurrently.
+ /// inner streams produced by the base stream concurrently.
+ ///
+ /// The only argument is an optional limit on the number of concurrently
+ /// polled streams. If this limit is not `None`, no more than `limit` streams
+ /// will be polled at the same time. The `limit` argument is of type
+ /// `Into<Option<usize>>`, and so can be provided as either `None`,
+ /// `Some(10)`, or just `10`. Note: a limit of zero is interpreted as
+ /// no limit at all, and will have the same result as passing in `None`.
///
/// # Examples
///
@@ -807,14 +820,14 @@ pub trait StreamExt: Stream {
/// assert_eq!(output, vec![1, 2, 3, 4]);
/// # });
/// ```
- #[cfg(not(futures_no_atomic_cas))]
+ #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
#[cfg(feature = "alloc")]
fn flatten_unordered(self, limit: impl Into<Option<usize>>) -> FlattenUnordered<Self>
where
Self::Item: Stream + Unpin,
Self: Sized,
{
- FlattenUnordered::new(self, limit.into())
+ assert_stream::<<Self::Item as Stream>::Item, _>(FlattenUnordered::new(self, limit.into()))
}
/// Maps a stream like [`StreamExt::map`] but flattens nested `Stream`s.
@@ -863,7 +876,7 @@ pub trait StreamExt: Stream {
///
/// The first argument is an optional limit on the number of concurrently
/// polled streams. If this limit is not `None`, no more than `limit` streams
- /// will be polled concurrently. The `limit` argument is of type
+ /// will be polled at the same time. The `limit` argument is of type
/// `Into<Option<usize>>`, and so can be provided as either `None`,
/// `Some(10)`, or just `10`. Note: a limit of zero is interpreted as
/// no limit at all, and will have the same result as passing in `None`.
@@ -889,7 +902,7 @@ pub trait StreamExt: Stream {
/// assert_eq!(vec![1usize, 2, 2, 3, 3, 3, 4, 4, 4, 4], values);
/// # });
/// ```
- #[cfg(not(futures_no_atomic_cas))]
+ #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
#[cfg(feature = "alloc")]
fn flat_map_unordered<U, F>(
self,
@@ -901,7 +914,7 @@ pub trait StreamExt: Stream {
F: FnMut(Self::Item) -> U,
Self: Sized,
{
- FlatMapUnordered::new(self, limit.into(), f)
+ assert_stream::<U::Item, _>(FlatMapUnordered::new(self, limit.into(), f))
}
/// Combinator similar to [`StreamExt::fold`] that holds internal state
@@ -1129,7 +1142,7 @@ pub trait StreamExt: Stream {
/// fut.await;
/// # })
/// ```
- #[cfg(not(futures_no_atomic_cas))]
+ #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
#[cfg(feature = "alloc")]
fn for_each_concurrent<Fut, F>(
self,
@@ -1352,7 +1365,7 @@ pub trait StreamExt: Stream {
///
/// This method is only available when the `std` or `alloc` feature of this
/// library is activated, and it is activated by default.
- #[cfg(not(futures_no_atomic_cas))]
+ #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
#[cfg(feature = "alloc")]
fn buffered(self, n: usize) -> Buffered<Self>
where
@@ -1397,7 +1410,7 @@ pub trait StreamExt: Stream {
/// assert_eq!(buffered.next().await, None);
/// # Ok::<(), i32>(()) }).unwrap();
/// ```
- #[cfg(not(futures_no_atomic_cas))]
+ #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
#[cfg(feature = "alloc")]
fn buffer_unordered(self, n: usize) -> BufferUnordered<Self>
where
@@ -1564,7 +1577,7 @@ pub trait StreamExt: Stream {
/// library is activated, and it is activated by default.
#[cfg(feature = "sink")]
#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
- #[cfg(not(futures_no_atomic_cas))]
+ #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
#[cfg(feature = "alloc")]
fn split<Item>(self) -> (SplitSink<Self, Item>, SplitStream<Self>)
where
diff --git a/src/stream/stream/split.rs b/src/stream/stream/split.rs
index e2034e0..1a7fdcb 100644
--- a/src/stream/stream/split.rs
+++ b/src/stream/stream/split.rs
@@ -15,6 +15,13 @@ pub struct SplitStream<S>(BiLock<S>);
impl<S> Unpin for SplitStream<S> {}
+impl<S> SplitStream<S> {
+ /// Returns `true` if the `SplitStream<S>` and `SplitSink<S>` originate from the same call to `StreamExt::split`.
+ pub fn is_pair_of<Item>(&self, other: &SplitSink<S, Item>) -> bool {
+ other.is_pair_of(&self)
+ }
+}
+
impl<S: Unpin> SplitStream<S> {
/// Attempts to put the two "halves" of a split `Stream + Sink` back
/// together. Succeeds only if the `SplitStream<S>` and `SplitSink<S>` are
@@ -60,6 +67,13 @@ impl<S: Sink<Item> + Unpin, Item> SplitSink<S, Item> {
}
}
+impl<S, Item> SplitSink<S, Item> {
+ /// Returns `true` if the `SplitStream<S>` and `SplitSink<S>` originate from the same call to `StreamExt::split`.
+ pub fn is_pair_of(&self, other: &SplitStream<S>) -> bool {
+ self.lock.is_pair_of(&other.0)
+ }
+}
+
impl<S: Sink<Item>, Item> SplitSink<S, Item> {
fn poll_flush_slot(
mut inner: Pin<&mut S>,
@@ -142,3 +156,69 @@ impl<T, Item> fmt::Display for ReuniteError<T, Item> {
#[cfg(feature = "std")]
impl<T: core::any::Any, Item> std::error::Error for ReuniteError<T, Item> {}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::{sink::Sink, stream::StreamExt};
+ use core::marker::PhantomData;
+
+ struct NopStream<Item> {
+ phantom: PhantomData<Item>,
+ }
+
+ impl<Item> Stream for NopStream<Item> {
+ type Item = Item;
+
+ fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ todo!()
+ }
+ }
+
+ impl<Item> Sink<Item> for NopStream<Item> {
+ type Error = ();
+
+ fn poll_ready(
+ self: Pin<&mut Self>,
+ _cx: &mut Context<'_>,
+ ) -> Poll<Result<(), Self::Error>> {
+ todo!()
+ }
+
+ fn start_send(self: Pin<&mut Self>, _item: Item) -> Result<(), Self::Error> {
+ todo!()
+ }
+
+ fn poll_flush(
+ self: Pin<&mut Self>,
+ _cx: &mut Context<'_>,
+ ) -> Poll<Result<(), Self::Error>> {
+ todo!()
+ }
+
+ fn poll_close(
+ self: Pin<&mut Self>,
+ _cx: &mut Context<'_>,
+ ) -> Poll<Result<(), Self::Error>> {
+ todo!()
+ }
+ }
+
+ #[test]
+ fn test_pairing() {
+ let s1 = NopStream::<()> { phantom: PhantomData };
+ let (sink1, stream1) = s1.split();
+ assert!(sink1.is_pair_of(&stream1));
+ assert!(stream1.is_pair_of(&sink1));
+
+ let s2 = NopStream::<()> { phantom: PhantomData };
+ let (sink2, stream2) = s2.split();
+ assert!(sink2.is_pair_of(&stream2));
+ assert!(stream2.is_pair_of(&sink2));
+
+ assert!(!sink1.is_pair_of(&stream2));
+ assert!(!stream1.is_pair_of(&sink2));
+ assert!(!sink2.is_pair_of(&stream1));
+ assert!(!stream2.is_pair_of(&sink1));
+ }
+}
diff --git a/src/stream/try_stream/mod.rs b/src/stream/try_stream/mod.rs
index bc4c6e4..7b55444 100644
--- a/src/stream/try_stream/mod.rs
+++ b/src/stream/try_stream/mod.rs
@@ -15,6 +15,7 @@ use crate::stream::{Inspect, Map};
#[cfg(feature = "alloc")]
use alloc::vec::Vec;
use core::pin::Pin;
+
use futures_core::{
future::{Future, TryFuture},
stream::TryStream,
@@ -88,6 +89,14 @@ mod try_flatten;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::try_flatten::TryFlatten;
+#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
+#[cfg(feature = "alloc")]
+mod try_flatten_unordered;
+#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
+#[cfg(feature = "alloc")]
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
+pub use self::try_flatten_unordered::TryFlattenUnordered;
+
mod try_collect;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::try_collect::TryCollect;
@@ -102,6 +111,12 @@ mod try_chunks;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::try_chunks::{TryChunks, TryChunksError};
+#[cfg(feature = "alloc")]
+mod try_ready_chunks;
+#[cfg(feature = "alloc")]
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
+pub use self::try_ready_chunks::{TryReadyChunks, TryReadyChunksError};
+
mod try_fold;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::try_fold::TryFold;
@@ -118,26 +133,26 @@ mod try_take_while;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::try_take_while::TryTakeWhile;
-#[cfg(not(futures_no_atomic_cas))]
+#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
#[cfg(feature = "alloc")]
mod try_buffer_unordered;
-#[cfg(not(futures_no_atomic_cas))]
+#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
#[cfg(feature = "alloc")]
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::try_buffer_unordered::TryBufferUnordered;
-#[cfg(not(futures_no_atomic_cas))]
+#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
#[cfg(feature = "alloc")]
mod try_buffered;
-#[cfg(not(futures_no_atomic_cas))]
+#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
#[cfg(feature = "alloc")]
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::try_buffered::TryBuffered;
-#[cfg(not(futures_no_atomic_cas))]
+#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
#[cfg(feature = "alloc")]
mod try_for_each_concurrent;
-#[cfg(not(futures_no_atomic_cas))]
+#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
#[cfg(feature = "alloc")]
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::try_for_each_concurrent::TryForEachConcurrent;
@@ -151,6 +166,14 @@ mod into_async_read;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::into_async_read::IntoAsyncRead;
+mod try_all;
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
+pub use self::try_all::TryAll;
+
+mod try_any;
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
+pub use self::try_any::TryAny;
+
impl<S: ?Sized + TryStream> TryStreamExt for S {}
/// Adapters specific to `Result`-returning streams
@@ -528,7 +551,7 @@ pub trait TryStreamExt: TryStream {
/// assert_eq!(Err(oneshot::Canceled), fut.await);
/// # })
/// ```
- #[cfg(not(futures_no_atomic_cas))]
+ #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
#[cfg(feature = "alloc")]
fn try_for_each_concurrent<Fut, F>(
self,
@@ -631,6 +654,55 @@ pub trait TryStreamExt: TryStream {
)
}
+ /// An adaptor for chunking up successful, ready items of the stream inside a vector.
+ ///
+ /// This combinator will attempt to pull successful items from this stream and buffer
+ /// them into a local vector. At most `capacity` items will get buffered
+ /// before they're yielded from the returned stream. If the underlying stream
+ /// returns `Poll::Pending`, and the collected chunk is not empty, it will
+ /// be immidiatly returned.
+ ///
+ /// Note that the vectors returned from this iterator may not always have
+ /// `capacity` elements. If the underlying stream ended and only a partial
+ /// vector was created, it'll be returned. Additionally if an error happens
+ /// from the underlying stream then the currently buffered items will be
+ /// yielded.
+ ///
+ /// This method is only available when the `std` or `alloc` feature of this
+ /// library is activated, and it is activated by default.
+ ///
+ /// This function is similar to
+ /// [`StreamExt::ready_chunks`](crate::stream::StreamExt::ready_chunks) but exits
+ /// early if an error occurs.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::stream::{self, TryReadyChunksError, TryStreamExt};
+ ///
+ /// let stream = stream::iter(vec![Ok::<i32, i32>(1), Ok(2), Ok(3), Err(4), Ok(5), Ok(6)]);
+ /// let mut stream = stream.try_ready_chunks(2);
+ ///
+ /// assert_eq!(stream.try_next().await, Ok(Some(vec![1, 2])));
+ /// assert_eq!(stream.try_next().await, Err(TryReadyChunksError(vec![3], 4)));
+ /// assert_eq!(stream.try_next().await, Ok(Some(vec![5, 6])));
+ /// # })
+ /// ```
+ ///
+ /// # Panics
+ ///
+ /// This method will panic if `capacity` is zero.
+ #[cfg(feature = "alloc")]
+ fn try_ready_chunks(self, capacity: usize) -> TryReadyChunks<Self>
+ where
+ Self: Sized,
+ {
+ assert_stream::<Result<Vec<Self::Ok>, TryReadyChunksError<Self::Ok, Self::Error>>, _>(
+ TryReadyChunks::new(self, capacity),
+ )
+ }
+
/// Attempt to filter the values produced by this stream according to the
/// provided asynchronous closure.
///
@@ -711,6 +783,63 @@ pub trait TryStreamExt: TryStream {
assert_stream::<Result<T, Self::Error>, _>(TryFilterMap::new(self, f))
}
+ /// Flattens a stream of streams into just one continuous stream. Produced streams
+ /// will be polled concurrently and any errors will be passed through without looking at them.
+ /// If the underlying base stream returns an error, it will be **immediately** propagated.
+ ///
+ /// The only argument is an optional limit on the number of concurrently
+ /// polled streams. If this limit is not `None`, no more than `limit` streams
+ /// will be polled at the same time. The `limit` argument is of type
+ /// `Into<Option<usize>>`, and so can be provided as either `None`,
+ /// `Some(10)`, or just `10`. Note: a limit of zero is interpreted as
+ /// no limit at all, and will have the same result as passing in `None`.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::channel::mpsc;
+ /// use futures::stream::{StreamExt, TryStreamExt};
+ /// use std::thread;
+ ///
+ /// let (tx1, rx1) = mpsc::unbounded();
+ /// let (tx2, rx2) = mpsc::unbounded();
+ /// let (tx3, rx3) = mpsc::unbounded();
+ ///
+ /// thread::spawn(move || {
+ /// tx1.unbounded_send(Ok(1)).unwrap();
+ /// });
+ /// thread::spawn(move || {
+ /// tx2.unbounded_send(Ok(2)).unwrap();
+ /// tx2.unbounded_send(Err(3)).unwrap();
+ /// tx2.unbounded_send(Ok(4)).unwrap();
+ /// });
+ /// thread::spawn(move || {
+ /// tx3.unbounded_send(Ok(rx1)).unwrap();
+ /// tx3.unbounded_send(Ok(rx2)).unwrap();
+ /// tx3.unbounded_send(Err(5)).unwrap();
+ /// });
+ ///
+ /// let stream = rx3.try_flatten_unordered(None);
+ /// let mut values: Vec<_> = stream.collect().await;
+ /// values.sort();
+ ///
+ /// assert_eq!(values, vec![Ok(1), Ok(2), Ok(4), Err(3), Err(5)]);
+ /// # });
+ /// ```
+ #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
+ #[cfg(feature = "alloc")]
+ fn try_flatten_unordered(self, limit: impl Into<Option<usize>>) -> TryFlattenUnordered<Self>
+ where
+ Self::Ok: TryStream + Unpin,
+ <Self::Ok as TryStream>::Error: From<Self::Error>,
+ Self: Sized,
+ {
+ assert_stream::<Result<<Self::Ok as TryStream>::Ok, <Self::Ok as TryStream>::Error>, _>(
+ TryFlattenUnordered::new(self, limit),
+ )
+ }
+
/// Flattens a stream of streams into just one continuous stream.
///
/// If this stream's elements are themselves streams then this combinator
@@ -900,7 +1029,7 @@ pub trait TryStreamExt: TryStream {
/// assert_eq!(buffered.next().await, Some(Err("error in the stream")));
/// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
/// ```
- #[cfg(not(futures_no_atomic_cas))]
+ #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
#[cfg(feature = "alloc")]
fn try_buffer_unordered(self, n: usize) -> TryBufferUnordered<Self>
where
@@ -976,7 +1105,7 @@ pub trait TryStreamExt: TryStream {
/// assert_eq!(buffered.next().await, Some(Err("error in the stream")));
/// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
/// ```
- #[cfg(not(futures_no_atomic_cas))]
+ #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
#[cfg(feature = "alloc")]
fn try_buffered(self, n: usize) -> TryBuffered<Self>
where
@@ -1061,4 +1190,62 @@ pub trait TryStreamExt: TryStream {
{
crate::io::assert_read(IntoAsyncRead::new(self))
}
+
+ /// Attempt to execute a predicate over an asynchronous stream and evaluate if all items
+ /// satisfy the predicate. Exits early if an `Err` is encountered or if an `Ok` item is found
+ /// that does not satisfy the predicate.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::stream::{self, StreamExt, TryStreamExt};
+ /// use std::convert::Infallible;
+ ///
+ /// let number_stream = stream::iter(1..10).map(Ok::<_, Infallible>);
+ /// let positive = number_stream.try_all(|i| async move { i > 0 });
+ /// assert_eq!(positive.await, Ok(true));
+ ///
+ /// let stream_with_errors = stream::iter([Ok(1), Err("err"), Ok(3)]);
+ /// let positive = stream_with_errors.try_all(|i| async move { i > 0 });
+ /// assert_eq!(positive.await, Err("err"));
+ /// # });
+ /// ```
+ fn try_all<Fut, F>(self, f: F) -> TryAll<Self, Fut, F>
+ where
+ Self: Sized,
+ F: FnMut(Self::Ok) -> Fut,
+ Fut: Future<Output = bool>,
+ {
+ assert_future::<Result<bool, Self::Error>, _>(TryAll::new(self, f))
+ }
+
+ /// Attempt to execute a predicate over an asynchronous stream and evaluate if any items
+ /// satisfy the predicate. Exits early if an `Err` is encountered or if an `Ok` item is found
+ /// that satisfies the predicate.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::stream::{self, StreamExt, TryStreamExt};
+ /// use std::convert::Infallible;
+ ///
+ /// let number_stream = stream::iter(0..10).map(Ok::<_, Infallible>);
+ /// let contain_three = number_stream.try_any(|i| async move { i == 3 });
+ /// assert_eq!(contain_three.await, Ok(true));
+ ///
+ /// let stream_with_errors = stream::iter([Ok(1), Err("err"), Ok(3)]);
+ /// let contain_three = stream_with_errors.try_any(|i| async move { i == 3 });
+ /// assert_eq!(contain_three.await, Err("err"));
+ /// # });
+ /// ```
+ fn try_any<Fut, F>(self, f: F) -> TryAny<Self, Fut, F>
+ where
+ Self: Sized,
+ F: FnMut(Self::Ok) -> Fut,
+ Fut: Future<Output = bool>,
+ {
+ assert_future::<Result<bool, Self::Error>, _>(TryAny::new(self, f))
+ }
}
diff --git a/src/stream/try_stream/try_all.rs b/src/stream/try_stream/try_all.rs
new file mode 100644
index 0000000..8179f86
--- /dev/null
+++ b/src/stream/try_stream/try_all.rs
@@ -0,0 +1,98 @@
+use core::fmt;
+use core::pin::Pin;
+use futures_core::future::{FusedFuture, Future};
+use futures_core::ready;
+use futures_core::stream::TryStream;
+use futures_core::task::{Context, Poll};
+use pin_project_lite::pin_project;
+
+pin_project! {
+ /// Future for the [`try_all`](super::TryStreamExt::try_all) method.
+ #[must_use = "futures do nothing unless you `.await` or poll them"]
+ pub struct TryAll<St, Fut, F> {
+ #[pin]
+ stream: St,
+ f: F,
+ done: bool,
+ #[pin]
+ future: Option<Fut>,
+ }
+}
+
+impl<St, Fut, F> fmt::Debug for TryAll<St, Fut, F>
+where
+ St: fmt::Debug,
+ Fut: fmt::Debug,
+{
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("TryAll")
+ .field("stream", &self.stream)
+ .field("done", &self.done)
+ .field("future", &self.future)
+ .finish()
+ }
+}
+
+impl<St, Fut, F> TryAll<St, Fut, F>
+where
+ St: TryStream,
+ F: FnMut(St::Ok) -> Fut,
+ Fut: Future<Output = bool>,
+{
+ pub(super) fn new(stream: St, f: F) -> Self {
+ Self { stream, f, done: false, future: None }
+ }
+}
+
+impl<St, Fut, F> FusedFuture for TryAll<St, Fut, F>
+where
+ St: TryStream,
+ F: FnMut(St::Ok) -> Fut,
+ Fut: Future<Output = bool>,
+{
+ fn is_terminated(&self) -> bool {
+ self.done && self.future.is_none()
+ }
+}
+
+impl<St, Fut, F> Future for TryAll<St, Fut, F>
+where
+ St: TryStream,
+ F: FnMut(St::Ok) -> Fut,
+ Fut: Future<Output = bool>,
+{
+ type Output = Result<bool, St::Error>;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<bool, St::Error>> {
+ let mut this = self.project();
+
+ Poll::Ready(loop {
+ if let Some(fut) = this.future.as_mut().as_pin_mut() {
+ // we're currently processing a future to produce a new value
+ let acc = ready!(fut.poll(cx));
+ this.future.set(None);
+ if !acc {
+ *this.done = true;
+ break Ok(false);
+ } // early exit
+ } else if !*this.done {
+ // we're waiting on a new item from the stream
+ match ready!(this.stream.as_mut().try_poll_next(cx)) {
+ Some(Ok(item)) => {
+ this.future.set(Some((this.f)(item)));
+ }
+ Some(Err(err)) => {
+ *this.done = true;
+ break Err(err);
+ }
+ None => {
+ *this.done = true;
+ break Ok(true);
+ }
+ }
+ } else {
+ panic!("TryAll polled after completion")
+ }
+ })
+ }
+}
diff --git a/src/stream/try_stream/try_any.rs b/src/stream/try_stream/try_any.rs
new file mode 100644
index 0000000..55e876b
--- /dev/null
+++ b/src/stream/try_stream/try_any.rs
@@ -0,0 +1,98 @@
+use core::fmt;
+use core::pin::Pin;
+use futures_core::future::{FusedFuture, Future};
+use futures_core::ready;
+use futures_core::stream::TryStream;
+use futures_core::task::{Context, Poll};
+use pin_project_lite::pin_project;
+
+pin_project! {
+ /// Future for the [`try_any`](super::TryStreamExt::try_any) method.
+ #[must_use = "futures do nothing unless you `.await` or poll them"]
+ pub struct TryAny<St, Fut, F> {
+ #[pin]
+ stream: St,
+ f: F,
+ done: bool,
+ #[pin]
+ future: Option<Fut>,
+ }
+}
+
+impl<St, Fut, F> fmt::Debug for TryAny<St, Fut, F>
+where
+ St: fmt::Debug,
+ Fut: fmt::Debug,
+{
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("TryAny")
+ .field("stream", &self.stream)
+ .field("done", &self.done)
+ .field("future", &self.future)
+ .finish()
+ }
+}
+
+impl<St, Fut, F> TryAny<St, Fut, F>
+where
+ St: TryStream,
+ F: FnMut(St::Ok) -> Fut,
+ Fut: Future<Output = bool>,
+{
+ pub(super) fn new(stream: St, f: F) -> Self {
+ Self { stream, f, done: false, future: None }
+ }
+}
+
+impl<St, Fut, F> FusedFuture for TryAny<St, Fut, F>
+where
+ St: TryStream,
+ F: FnMut(St::Ok) -> Fut,
+ Fut: Future<Output = bool>,
+{
+ fn is_terminated(&self) -> bool {
+ self.done && self.future.is_none()
+ }
+}
+
+impl<St, Fut, F> Future for TryAny<St, Fut, F>
+where
+ St: TryStream,
+ F: FnMut(St::Ok) -> Fut,
+ Fut: Future<Output = bool>,
+{
+ type Output = Result<bool, St::Error>;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<bool, St::Error>> {
+ let mut this = self.project();
+
+ Poll::Ready(loop {
+ if let Some(fut) = this.future.as_mut().as_pin_mut() {
+ // we're currently processing a future to produce a new value
+ let acc = ready!(fut.poll(cx));
+ this.future.set(None);
+ if acc {
+ *this.done = true;
+ break Ok(true);
+ } // early exit
+ } else if !*this.done {
+ // we're waiting on a new item from the stream
+ match ready!(this.stream.as_mut().try_poll_next(cx)) {
+ Some(Ok(item)) => {
+ this.future.set(Some((this.f)(item)));
+ }
+ Some(Err(err)) => {
+ *this.done = true;
+ break Err(err);
+ }
+ None => {
+ *this.done = true;
+ break Ok(false);
+ }
+ }
+ } else {
+ panic!("TryAny polled after completion")
+ }
+ })
+ }
+}
diff --git a/src/stream/try_stream/try_chunks.rs b/src/stream/try_stream/try_chunks.rs
index 3bb253a..ec53f4b 100644
--- a/src/stream/try_stream/try_chunks.rs
+++ b/src/stream/try_stream/try_chunks.rs
@@ -41,9 +41,10 @@ impl<St: TryStream> TryChunks<St> {
delegate_access_inner!(stream, St, (. .));
}
+type TryChunksStreamError<St> = TryChunksError<<St as TryStream>::Ok, <St as TryStream>::Error>;
+
impl<St: TryStream> Stream for TryChunks<St> {
- #[allow(clippy::type_complexity)]
- type Item = Result<Vec<St::Ok>, TryChunksError<St::Ok, St::Error>>;
+ type Item = Result<Vec<St::Ok>, TryChunksStreamError<St>>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.as_mut().project();
diff --git a/src/stream/try_stream/try_flatten_unordered.rs b/src/stream/try_stream/try_flatten_unordered.rs
new file mode 100644
index 0000000..a74dfc4
--- /dev/null
+++ b/src/stream/try_stream/try_flatten_unordered.rs
@@ -0,0 +1,176 @@
+use core::marker::PhantomData;
+use core::pin::Pin;
+
+use futures_core::ready;
+use futures_core::stream::{FusedStream, Stream, TryStream};
+use futures_core::task::{Context, Poll};
+#[cfg(feature = "sink")]
+use futures_sink::Sink;
+
+use pin_project_lite::pin_project;
+
+use crate::future::Either;
+use crate::stream::stream::flatten_unordered::{
+ FlattenUnorderedWithFlowController, FlowController, FlowStep,
+};
+use crate::stream::IntoStream;
+use crate::TryStreamExt;
+
+delegate_all!(
+ /// Stream for the [`try_flatten_unordered`](super::TryStreamExt::try_flatten_unordered) method.
+ TryFlattenUnordered<St>(
+ FlattenUnorderedWithFlowController<NestedTryStreamIntoEitherTryStream<St>, PropagateBaseStreamError<St>>
+ ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)]
+ + New[
+ |stream: St, limit: impl Into<Option<usize>>|
+ FlattenUnorderedWithFlowController::new(
+ NestedTryStreamIntoEitherTryStream::new(stream),
+ limit.into()
+ )
+ ]
+ where
+ St: TryStream,
+ St::Ok: TryStream,
+ St::Ok: Unpin,
+ <St::Ok as TryStream>::Error: From<St::Error>
+);
+
+pin_project! {
+ /// Emits either successful streams or single-item streams containing the underlying errors.
+ /// This's a wrapper for `FlattenUnordered` to reuse its logic over `TryStream`.
+ #[derive(Debug)]
+ #[must_use = "streams do nothing unless polled"]
+ pub struct NestedTryStreamIntoEitherTryStream<St>
+ where
+ St: TryStream,
+ St::Ok: TryStream,
+ St::Ok: Unpin,
+ <St::Ok as TryStream>::Error: From<St::Error>
+ {
+ #[pin]
+ stream: St
+ }
+}
+
+impl<St> NestedTryStreamIntoEitherTryStream<St>
+where
+ St: TryStream,
+ St::Ok: TryStream + Unpin,
+ <St::Ok as TryStream>::Error: From<St::Error>,
+{
+ fn new(stream: St) -> Self {
+ Self { stream }
+ }
+
+ delegate_access_inner!(stream, St, ());
+}
+
+/// Emits a single item immediately, then stream will be terminated.
+#[derive(Debug, Clone)]
+pub struct Single<T>(Option<T>);
+
+impl<T> Single<T> {
+ /// Constructs new `Single` with the given value.
+ fn new(val: T) -> Self {
+ Self(Some(val))
+ }
+
+ /// Attempts to take inner item immediately. Will always succeed if the stream isn't terminated.
+ fn next_immediate(&mut self) -> Option<T> {
+ self.0.take()
+ }
+}
+
+impl<T> Unpin for Single<T> {}
+
+impl<T> Stream for Single<T> {
+ type Item = T;
+
+ fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ Poll::Ready(self.0.take())
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ self.0.as_ref().map_or((0, Some(0)), |_| (1, Some(1)))
+ }
+}
+
+/// Immediately propagates errors occurred in the base stream.
+#[derive(Debug, Clone, Copy)]
+pub struct PropagateBaseStreamError<St>(PhantomData<St>);
+
+type BaseStreamItem<St> = <NestedTryStreamIntoEitherTryStream<St> as Stream>::Item;
+type InnerStreamItem<St> = <BaseStreamItem<St> as Stream>::Item;
+
+impl<St> FlowController<BaseStreamItem<St>, InnerStreamItem<St>> for PropagateBaseStreamError<St>
+where
+ St: TryStream,
+ St::Ok: TryStream + Unpin,
+ <St::Ok as TryStream>::Error: From<St::Error>,
+{
+ fn next_step(item: BaseStreamItem<St>) -> FlowStep<BaseStreamItem<St>, InnerStreamItem<St>> {
+ match item {
+ // A new successful inner stream received
+ st @ Either::Left(_) => FlowStep::Continue(st),
+ // An error encountered
+ Either::Right(mut err) => FlowStep::Return(err.next_immediate().unwrap()),
+ }
+ }
+}
+
+type SingleStreamResult<St> = Single<Result<<St as TryStream>::Ok, <St as TryStream>::Error>>;
+
+impl<St> Stream for NestedTryStreamIntoEitherTryStream<St>
+where
+ St: TryStream,
+ St::Ok: TryStream + Unpin,
+ <St::Ok as TryStream>::Error: From<St::Error>,
+{
+ // Item is either an inner stream or a stream containing a single error.
+ // This will allow using `Either`'s `Stream` implementation as both branches are actually streams of `Result`'s.
+ type Item = Either<IntoStream<St::Ok>, SingleStreamResult<St::Ok>>;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ let item = ready!(self.project().stream.try_poll_next(cx));
+
+ let out = match item {
+ Some(res) => match res {
+ // Emit successful inner stream as is
+ Ok(stream) => Either::Left(stream.into_stream()),
+ // Wrap an error into a stream containing a single item
+ err @ Err(_) => {
+ let res = err.map(|_: St::Ok| unreachable!()).map_err(Into::into);
+
+ Either::Right(Single::new(res))
+ }
+ },
+ None => return Poll::Ready(None),
+ };
+
+ Poll::Ready(Some(out))
+ }
+}
+
+impl<St> FusedStream for NestedTryStreamIntoEitherTryStream<St>
+where
+ St: TryStream + FusedStream,
+ St::Ok: TryStream + Unpin,
+ <St::Ok as TryStream>::Error: From<St::Error>,
+{
+ fn is_terminated(&self) -> bool {
+ self.stream.is_terminated()
+ }
+}
+
+// Forwarding impl of Sink from the underlying stream
+#[cfg(feature = "sink")]
+impl<St, Item> Sink<Item> for NestedTryStreamIntoEitherTryStream<St>
+where
+ St: TryStream + Sink<Item>,
+ St::Ok: TryStream + Unpin,
+ <St::Ok as TryStream>::Error: From<<St as TryStream>::Error>,
+{
+ type Error = <St as Sink<Item>>::Error;
+
+ delegate_sink!(stream, Item);
+}
diff --git a/src/stream/try_stream/try_ready_chunks.rs b/src/stream/try_stream/try_ready_chunks.rs
new file mode 100644
index 0000000..8b1470e
--- /dev/null
+++ b/src/stream/try_stream/try_ready_chunks.rs
@@ -0,0 +1,126 @@
+use crate::stream::{Fuse, IntoStream, StreamExt};
+
+use alloc::vec::Vec;
+use core::fmt;
+use core::pin::Pin;
+use futures_core::stream::{FusedStream, Stream, TryStream};
+use futures_core::task::{Context, Poll};
+#[cfg(feature = "sink")]
+use futures_sink::Sink;
+use pin_project_lite::pin_project;
+
+pin_project! {
+ /// Stream for the [`try_ready_chunks`](super::TryStreamExt::try_ready_chunks) method.
+ #[derive(Debug)]
+ #[must_use = "streams do nothing unless polled"]
+ pub struct TryReadyChunks<St: TryStream> {
+ #[pin]
+ stream: Fuse<IntoStream<St>>,
+ cap: usize, // https://github.com/rust-lang/futures-rs/issues/1475
+ }
+}
+
+impl<St: TryStream> TryReadyChunks<St> {
+ pub(super) fn new(stream: St, capacity: usize) -> Self {
+ assert!(capacity > 0);
+
+ Self { stream: IntoStream::new(stream).fuse(), cap: capacity }
+ }
+
+ delegate_access_inner!(stream, St, (. .));
+}
+
+type TryReadyChunksStreamError<St> =
+ TryReadyChunksError<<St as TryStream>::Ok, <St as TryStream>::Error>;
+
+impl<St: TryStream> Stream for TryReadyChunks<St> {
+ type Item = Result<Vec<St::Ok>, TryReadyChunksStreamError<St>>;
+
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ let mut this = self.as_mut().project();
+
+ let mut items: Vec<St::Ok> = Vec::new();
+
+ loop {
+ match this.stream.as_mut().poll_next(cx) {
+ // Flush all the collected data if the underlying stream doesn't
+ // contain more ready values
+ Poll::Pending => {
+ return if items.is_empty() {
+ Poll::Pending
+ } else {
+ Poll::Ready(Some(Ok(items)))
+ }
+ }
+
+ // Push the ready item into the buffer and check whether it is full.
+ // If so, return the buffer.
+ Poll::Ready(Some(Ok(item))) => {
+ if items.is_empty() {
+ items.reserve_exact(*this.cap);
+ }
+ items.push(item);
+ if items.len() >= *this.cap {
+ return Poll::Ready(Some(Ok(items)));
+ }
+ }
+
+ // Return the already collected items and the error.
+ Poll::Ready(Some(Err(e))) => {
+ return Poll::Ready(Some(Err(TryReadyChunksError(items, e))));
+ }
+
+ // Since the underlying stream ran out of values, return what we
+ // have buffered, if we have anything.
+ Poll::Ready(None) => {
+ let last = if items.is_empty() { None } else { Some(Ok(items)) };
+ return Poll::Ready(last);
+ }
+ }
+ }
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ let (lower, upper) = self.stream.size_hint();
+ let lower = lower / self.cap;
+ (lower, upper)
+ }
+}
+
+impl<St: TryStream + FusedStream> FusedStream for TryReadyChunks<St> {
+ fn is_terminated(&self) -> bool {
+ self.stream.is_terminated()
+ }
+}
+
+// Forwarding impl of Sink from the underlying stream
+#[cfg(feature = "sink")]
+impl<S, Item> Sink<Item> for TryReadyChunks<S>
+where
+ S: TryStream + Sink<Item>,
+{
+ type Error = <S as Sink<Item>>::Error;
+
+ delegate_sink!(stream, Item);
+}
+
+/// Error indicating, that while chunk was collected inner stream produced an error.
+///
+/// Contains all items that were collected before an error occurred, and the stream error itself.
+#[derive(PartialEq, Eq)]
+pub struct TryReadyChunksError<T, E>(pub Vec<T>, pub E);
+
+impl<T, E: fmt::Debug> fmt::Debug for TryReadyChunksError<T, E> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ self.1.fmt(f)
+ }
+}
+
+impl<T, E: fmt::Display> fmt::Display for TryReadyChunksError<T, E> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ self.1.fmt(f)
+ }
+}
+
+#[cfg(feature = "std")]
+impl<T, E: fmt::Debug + fmt::Display> std::error::Error for TryReadyChunksError<T, E> {}
diff --git a/src/stream/unfold.rs b/src/stream/unfold.rs
index 7d8ef6b..2f48ccc 100644
--- a/src/stream/unfold.rs
+++ b/src/stream/unfold.rs
@@ -36,7 +36,7 @@ use pin_project_lite::pin_project;
/// let stream = stream::unfold(0, |state| async move {
/// if state <= 2 {
/// let next_state = state + 1;
-/// let yielded = state * 2;
+/// let yielded = state * 2;
/// Some((yielded, next_state))
/// } else {
/// None
diff --git a/src/task/mod.rs b/src/task/mod.rs
index 0a31eea..7a9e993 100644
--- a/src/task/mod.rs
+++ b/src/task/mod.rs
@@ -18,19 +18,22 @@ pub use futures_task::{FutureObj, LocalFutureObj, LocalSpawn, Spawn, SpawnError,
pub use futures_task::noop_waker;
pub use futures_task::noop_waker_ref;
-#[cfg(not(futures_no_atomic_cas))]
+#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
#[cfg(feature = "alloc")]
pub use futures_task::ArcWake;
-#[cfg(not(futures_no_atomic_cas))]
+#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
#[cfg(feature = "alloc")]
pub use futures_task::waker;
-#[cfg(not(futures_no_atomic_cas))]
+#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
#[cfg(feature = "alloc")]
pub use futures_task::{waker_ref, WakerRef};
-#[cfg(not(futures_no_atomic_cas))]
+#[cfg_attr(
+ target_os = "none",
+ cfg(any(target_has_atomic = "ptr", feature = "portable-atomic"))
+)]
pub use futures_core::task::__internal::AtomicWaker;
mod spawn;