From 41e437e55b496ba9779e4e5767abe63c77eb42bf Mon Sep 17 00:00:00 2001 From: Jeff Vander Stoep Date: Thu, 1 Feb 2024 15:12:47 +0100 Subject: Upgrade futures-util to 0.3.30 This project was upgraded with external_updater. Usage: tools/external_updater/updater.sh update external/rust/crates/futures-util For more info, check https://cs.android.com/android/platform/superproject/+/main:tools/external_updater/README.md Test: TreeHugger Change-Id: I4d01cd43e35cccd70ff58f1669ac3697035e8c60 --- .cargo_vcs_info.json | 2 +- Android.bp | 4 +- Cargo.toml | 16 +- Cargo.toml.orig | 16 +- METADATA | 23 +- README.md | 2 +- benches/bilock.rs | 68 ++++++ benches/flatten_unordered.rs | 18 +- benches_disabled/bilock.rs | 122 ----------- build.rs | 41 ---- no_atomic_cas.rs | 17 -- src/abortable.rs | 24 ++ src/async_await/mod.rs | 2 + src/async_await/stream_select_mod.rs | 2 - src/future/future/fuse.rs | 12 +- src/future/future/mod.rs | 4 - src/future/future/shared.rs | 4 - src/future/join_all.rs | 17 +- src/future/mod.rs | 6 +- src/future/select.rs | 17 +- src/future/try_join_all.rs | 13 +- src/future/try_select.rs | 13 +- src/io/copy_buf_abortable.rs | 2 +- src/io/cursor.rs | 8 - src/io/fill_buf.rs | 18 +- src/io/mod.rs | 6 +- src/io/read_line.rs | 1 + src/io/split.rs | 14 ++ src/io/window.rs | 2 +- src/lib.rs | 2 +- src/lock/bilock.rs | 53 +++-- src/lock/mod.rs | 10 +- src/stream/futures_ordered.rs | 71 +++--- src/stream/futures_unordered/mod.rs | 23 +- src/stream/futures_unordered/ready_to_run_queue.rs | 37 +--- src/stream/mod.rs | 47 ++-- src/stream/select_all.rs | 35 ++- src/stream/stream/all.rs | 23 +- src/stream/stream/any.rs | 23 +- src/stream/stream/flatten_unordered.rs | 243 ++++++++++++--------- src/stream/stream/mod.rs | 57 +++-- src/stream/stream/split.rs | 80 +++++++ src/stream/try_stream/mod.rs | 205 ++++++++++++++++- src/stream/try_stream/try_all.rs | 98 +++++++++ src/stream/try_stream/try_any.rs | 98 +++++++++ src/stream/try_stream/try_chunks.rs | 5 +- src/stream/try_stream/try_flatten_unordered.rs | 176 +++++++++++++++ src/stream/try_stream/try_ready_chunks.rs | 126 +++++++++++ src/stream/unfold.rs | 2 +- src/task/mod.rs | 11 +- 50 files changed, 1319 insertions(+), 600 deletions(-) create mode 100644 benches/bilock.rs delete mode 100644 benches_disabled/bilock.rs delete mode 100644 build.rs delete mode 100644 no_atomic_cas.rs create mode 100644 src/stream/try_stream/try_all.rs create mode 100644 src/stream/try_stream/try_any.rs create mode 100644 src/stream/try_stream/try_flatten_unordered.rs create mode 100644 src/stream/try_stream/try_ready_chunks.rs diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json index dde4c7f..1833f75 100644 --- a/.cargo_vcs_info.json +++ b/.cargo_vcs_info.json @@ -1,6 +1,6 @@ { "git": { - "sha1": "5e3693a350f96244151081d2c030208cd15f9572" + "sha1": "de1a0fd64a1bcae9a1534ed4da1699632993cc26" }, "path_in_vcs": "futures-util" } \ No newline at end of file diff --git a/Android.bp b/Android.bp index 60e4e8e..a38886e 100644 --- a/Android.bp +++ b/Android.bp @@ -42,7 +42,7 @@ rust_test { host_supported: true, crate_name: "futures_util", cargo_env_compat: true, - cargo_pkg_version: "0.3.26", + cargo_pkg_version: "0.3.30", srcs: ["src/lib.rs"], test_suites: ["general-tests"], auto_gen_config: true, @@ -86,7 +86,7 @@ rust_library { host_supported: true, crate_name: "futures_util", cargo_env_compat: true, - cargo_pkg_version: "0.3.26", + cargo_pkg_version: "0.3.30", srcs: ["src/lib.rs"], edition: "2018", features: [ diff --git a/Cargo.toml b/Cargo.toml index 47e9f55..c95816a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,9 +11,9 @@ [package] edition = "2018" -rust-version = "1.45" +rust-version = "1.56" name = "futures-util" -version = "0.3.26" +version = "0.3.30" description = """ Common utilities and extension traits for the futures-rs library. """ @@ -30,33 +30,33 @@ rustdoc-args = [ ] [dependencies.futures-channel] -version = "0.3.26" +version = "0.3.30" features = ["std"] optional = true default-features = false [dependencies.futures-core] -version = "0.3.26" +version = "0.3.30" default-features = false [dependencies.futures-io] -version = "0.3.26" +version = "0.3.30" features = ["std"] optional = true default-features = false [dependencies.futures-macro] -version = "=0.3.26" +version = "=0.3.30" optional = true default-features = false [dependencies.futures-sink] -version = "0.3.26" +version = "0.3.30" optional = true default-features = false [dependencies.futures-task] -version = "0.3.26" +version = "0.3.30" default-features = false [dependencies.futures_01] diff --git a/Cargo.toml.orig b/Cargo.toml.orig index 95c3dee..dcdbce4 100644 --- a/Cargo.toml.orig +++ b/Cargo.toml.orig @@ -1,8 +1,8 @@ [package] name = "futures-util" -version = "0.3.26" +version = "0.3.30" edition = "2018" -rust-version = "1.45" +rust-version = "1.56" license = "MIT OR Apache-2.0" repository = "https://github.com/rust-lang/futures-rs" homepage = "https://rust-lang.github.io/futures-rs" @@ -35,12 +35,12 @@ write-all-vectored = ["io"] cfg-target-has-atomic = [] [dependencies] -futures-core = { path = "../futures-core", version = "0.3.26", default-features = false } -futures-task = { path = "../futures-task", version = "0.3.26", default-features = false } -futures-channel = { path = "../futures-channel", version = "0.3.26", default-features = false, features = ["std"], optional = true } -futures-io = { path = "../futures-io", version = "0.3.26", default-features = false, features = ["std"], optional = true } -futures-sink = { path = "../futures-sink", version = "0.3.26", default-features = false, optional = true } -futures-macro = { path = "../futures-macro", version = "=0.3.26", default-features = false, optional = true } +futures-core = { path = "../futures-core", version = "0.3.30", default-features = false } +futures-task = { path = "../futures-task", version = "0.3.30", default-features = false } +futures-channel = { path = "../futures-channel", version = "0.3.30", default-features = false, features = ["std"], optional = true } +futures-io = { path = "../futures-io", version = "0.3.30", default-features = false, features = ["std"], optional = true } +futures-sink = { path = "../futures-sink", version = "0.3.30", default-features = false, optional = true } +futures-macro = { path = "../futures-macro", version = "=0.3.30", default-features = false, optional = true } slab = { version = "0.4.2", optional = true } memchr = { version = "2.2", optional = true } futures_01 = { version = "0.1.25", optional = true, package = "futures" } diff --git a/METADATA b/METADATA index 93b66be..8ba8c32 100644 --- a/METADATA +++ b/METADATA @@ -1,23 +1,20 @@ # This project was upgraded with external_updater. -# Usage: tools/external_updater/updater.sh update rust/crates/futures-util -# For more info, check https://cs.android.com/android/platform/superproject/+/master:tools/external_updater/README.md +# Usage: tools/external_updater/updater.sh update external/rust/crates/futures-util +# For more info, check https://cs.android.com/android/platform/superproject/+/main:tools/external_updater/README.md name: "futures-util" description: "Common utilities and extension traits for the futures-rs library." third_party { - url { - type: HOMEPAGE - value: "https://crates.io/crates/futures-util" - } - url { - type: ARCHIVE - value: "https://static.crates.io/crates/futures-util/futures-util-0.3.26.crate" - } - version: "0.3.26" license_type: NOTICE last_upgrade_date { - year: 2023 + year: 2024 month: 2 - day: 15 + day: 1 + } + homepage: "https://crates.io/crates/futures-util" + identifier { + type: "Archive" + value: "https://static.crates.io/crates/futures-util/futures-util-0.3.30.crate" + version: "0.3.30" } } diff --git a/README.md b/README.md index 6e0aaed..60e2c21 100644 --- a/README.md +++ b/README.md @@ -11,7 +11,7 @@ Add this to your `Cargo.toml`: futures-util = "0.3" ``` -The current `futures-util` requires Rust 1.45 or later. +The current `futures-util` requires Rust 1.56 or later. ## License diff --git a/benches/bilock.rs b/benches/bilock.rs new file mode 100644 index 0000000..013f335 --- /dev/null +++ b/benches/bilock.rs @@ -0,0 +1,68 @@ +#![feature(test)] +#![cfg(feature = "bilock")] + +extern crate test; + +use futures::task::Poll; +use futures_test::task::noop_context; +use futures_util::lock::BiLock; + +use crate::test::Bencher; + +#[bench] +fn contended(b: &mut Bencher) { + let mut context = noop_context(); + + b.iter(|| { + let (x, y) = BiLock::new(1); + + for _ in 0..1000 { + let x_guard = match x.poll_lock(&mut context) { + Poll::Ready(guard) => guard, + _ => panic!(), + }; + + // Try poll second lock while first lock still holds the lock + match y.poll_lock(&mut context) { + Poll::Pending => (), + _ => panic!(), + }; + + drop(x_guard); + + let y_guard = match y.poll_lock(&mut context) { + Poll::Ready(guard) => guard, + _ => panic!(), + }; + + drop(y_guard); + } + (x, y) + }); +} + +#[bench] +fn lock_unlock(b: &mut Bencher) { + let mut context = noop_context(); + + b.iter(|| { + let (x, y) = BiLock::new(1); + + for _ in 0..1000 { + let x_guard = match x.poll_lock(&mut context) { + Poll::Ready(guard) => guard, + _ => panic!(), + }; + + drop(x_guard); + + let y_guard = match y.poll_lock(&mut context) { + Poll::Ready(guard) => guard, + _ => panic!(), + }; + + drop(y_guard); + } + (x, y) + }) +} diff --git a/benches/flatten_unordered.rs b/benches/flatten_unordered.rs index 64d5f9a..517b281 100644 --- a/benches/flatten_unordered.rs +++ b/benches/flatten_unordered.rs @@ -5,9 +5,10 @@ use crate::test::Bencher; use futures::channel::oneshot; use futures::executor::block_on; -use futures::future::{self, FutureExt}; +use futures::future; use futures::stream::{self, StreamExt}; use futures::task::Poll; +use futures_util::FutureExt; use std::collections::VecDeque; use std::thread; @@ -34,18 +35,9 @@ fn oneshot_streams(b: &mut Bencher) { } }); - let mut flatten = stream::unfold(rxs.into_iter(), |mut vals| { - async { - if let Some(next) = vals.next() { - let val = next.await.unwrap(); - Some((val, vals)) - } else { - None - } - } - .boxed() - }) - .flatten_unordered(None); + let mut flatten = stream::iter(rxs) + .map(|recv| recv.into_stream().map(|val| val.unwrap()).flatten()) + .flatten_unordered(None); block_on(future::poll_fn(move |cx| { let mut count = 0; diff --git a/benches_disabled/bilock.rs b/benches_disabled/bilock.rs deleted file mode 100644 index 417f75d..0000000 --- a/benches_disabled/bilock.rs +++ /dev/null @@ -1,122 +0,0 @@ -#![feature(test)] - -#[cfg(feature = "bilock")] -mod bench { - use futures::executor::LocalPool; - use futures::task::{Context, Waker}; - use futures_util::lock::BiLock; - use futures_util::lock::BiLockAcquire; - use futures_util::lock::BiLockAcquired; - use futures_util::task::ArcWake; - - use std::sync::Arc; - use test::Bencher; - - fn notify_noop() -> Waker { - struct Noop; - - impl ArcWake for Noop { - fn wake(_: &Arc) {} - } - - ArcWake::into_waker(Arc::new(Noop)) - } - - /// Pseudo-stream which simply calls `lock.poll()` on `poll` - struct LockStream { - lock: BiLockAcquire, - } - - impl LockStream { - fn new(lock: BiLock) -> Self { - Self { lock: lock.lock() } - } - - /// Release a lock after it was acquired in `poll`, - /// so `poll` could be called again. - fn release_lock(&mut self, guard: BiLockAcquired) { - self.lock = guard.unlock().lock() - } - } - - impl Stream for LockStream { - type Item = BiLockAcquired; - type Error = (); - - fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll, Self::Error> { - self.lock.poll(cx).map(|a| a.map(Some)) - } - } - - #[bench] - fn contended(b: &mut Bencher) { - let pool = LocalPool::new(); - let mut exec = pool.executor(); - let waker = notify_noop(); - let mut map = task::LocalMap::new(); - let mut waker = task::Context::new(&mut map, &waker, &mut exec); - - b.iter(|| { - let (x, y) = BiLock::new(1); - - let mut x = LockStream::new(x); - let mut y = LockStream::new(y); - - for _ in 0..1000 { - let x_guard = match x.poll_next(&mut waker) { - Ok(Poll::Ready(Some(guard))) => guard, - _ => panic!(), - }; - - // Try poll second lock while first lock still holds the lock - match y.poll_next(&mut waker) { - Ok(Poll::Pending) => (), - _ => panic!(), - }; - - x.release_lock(x_guard); - - let y_guard = match y.poll_next(&mut waker) { - Ok(Poll::Ready(Some(guard))) => guard, - _ => panic!(), - }; - - y.release_lock(y_guard); - } - (x, y) - }); - } - - #[bench] - fn lock_unlock(b: &mut Bencher) { - let pool = LocalPool::new(); - let mut exec = pool.executor(); - let waker = notify_noop(); - let mut map = task::LocalMap::new(); - let mut waker = task::Context::new(&mut map, &waker, &mut exec); - - b.iter(|| { - let (x, y) = BiLock::new(1); - - let mut x = LockStream::new(x); - let mut y = LockStream::new(y); - - for _ in 0..1000 { - let x_guard = match x.poll_next(&mut waker) { - Ok(Poll::Ready(Some(guard))) => guard, - _ => panic!(), - }; - - x.release_lock(x_guard); - - let y_guard = match y.poll_next(&mut waker) { - Ok(Poll::Ready(Some(guard))) => guard, - _ => panic!(), - }; - - y.release_lock(y_guard); - } - (x, y) - }) - } -} diff --git a/build.rs b/build.rs deleted file mode 100644 index 05e0496..0000000 --- a/build.rs +++ /dev/null @@ -1,41 +0,0 @@ -// The rustc-cfg listed below are considered public API, but it is *unstable* -// and outside of the normal semver guarantees: -// -// - `futures_no_atomic_cas` -// Assume the target does *not* support atomic CAS operations. -// This is usually detected automatically by the build script, but you may -// need to enable it manually when building for custom targets or using -// non-cargo build systems that don't run the build script. -// -// With the exceptions mentioned above, the rustc-cfg emitted by the build -// script are *not* public API. - -#![warn(rust_2018_idioms, single_use_lifetimes)] - -use std::env; - -include!("no_atomic_cas.rs"); - -fn main() { - let target = match env::var("TARGET") { - Ok(target) => target, - Err(e) => { - println!( - "cargo:warning={}: unable to get TARGET environment variable: {}", - env!("CARGO_PKG_NAME"), - e - ); - return; - } - }; - - // Note that this is `no_*`, not `has_*`. This allows treating - // `cfg(target_has_atomic = "ptr")` as true when the build script doesn't - // run. This is needed for compatibility with non-cargo build systems that - // don't run the build script. - if NO_ATOMIC_CAS.contains(&&*target) { - println!("cargo:rustc-cfg=futures_no_atomic_cas"); - } - - println!("cargo:rerun-if-changed=no_atomic_cas.rs"); -} diff --git a/no_atomic_cas.rs b/no_atomic_cas.rs deleted file mode 100644 index 16ec628..0000000 --- a/no_atomic_cas.rs +++ /dev/null @@ -1,17 +0,0 @@ -// This file is @generated by no_atomic_cas.sh. -// It is not intended for manual editing. - -const NO_ATOMIC_CAS: &[&str] = &[ - "armv4t-none-eabi", - "armv5te-none-eabi", - "avr-unknown-gnu-atmega328", - "bpfeb-unknown-none", - "bpfel-unknown-none", - "msp430-none-elf", - "riscv32i-unknown-none-elf", - "riscv32im-unknown-none-elf", - "riscv32imc-unknown-none-elf", - "thumbv4t-none-eabi", - "thumbv5te-none-eabi", - "thumbv6m-none-eabi", -]; diff --git a/src/abortable.rs b/src/abortable.rs index e0afd47..9dbcfc2 100644 --- a/src/abortable.rs +++ b/src/abortable.rs @@ -78,6 +78,17 @@ pub struct AbortRegistration { pub(crate) inner: Arc, } +impl AbortRegistration { + /// Create an [`AbortHandle`] from the given [`AbortRegistration`]. + /// + /// The created [`AbortHandle`] is functionally the same as any other + /// [`AbortHandle`]s that are associated with the same [`AbortRegistration`], + /// such as the one created by [`AbortHandle::new_pair`]. + pub fn handle(&self) -> AbortHandle { + AbortHandle { inner: self.inner.clone() } + } +} + /// A handle to an `Abortable` task. #[derive(Debug, Clone)] pub struct AbortHandle { @@ -182,4 +193,17 @@ impl AbortHandle { self.inner.aborted.store(true, Ordering::Relaxed); self.inner.waker.wake(); } + + /// Checks whether [`AbortHandle::abort`] was *called* on any associated + /// [`AbortHandle`]s, which includes all the [`AbortHandle`]s linked with + /// the same [`AbortRegistration`]. This means that it will return `true` + /// even if: + /// * `abort` was called after the task had completed. + /// * `abort` was called while the task was being polled - the task may still be running and + /// will not be stopped until `poll` returns. + /// + /// This operation has a Relaxed ordering. + pub fn is_aborted(&self) -> bool { + self.inner.aborted.load(Ordering::Relaxed) + } } diff --git a/src/async_await/mod.rs b/src/async_await/mod.rs index 7276da2..7e3f12c 100644 --- a/src/async_await/mod.rs +++ b/src/async_await/mod.rs @@ -31,9 +31,11 @@ mod select_mod; pub use self::select_mod::*; // Primary export is a macro +#[cfg(feature = "std")] #[cfg(feature = "async-await-macro")] mod stream_select_mod; #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/64762 +#[cfg(feature = "std")] #[cfg(feature = "async-await-macro")] pub use self::stream_select_mod::*; diff --git a/src/async_await/stream_select_mod.rs b/src/async_await/stream_select_mod.rs index 1c8002f..61e3fa1 100644 --- a/src/async_await/stream_select_mod.rs +++ b/src/async_await/stream_select_mod.rs @@ -1,6 +1,5 @@ //! The `stream_select` macro. -#[cfg(feature = "std")] #[allow(unreachable_pub)] #[doc(hidden)] pub use futures_macro::stream_select_internal; @@ -28,7 +27,6 @@ pub use futures_macro::stream_select_internal; /// } /// # }); /// ``` -#[cfg(feature = "std")] #[macro_export] macro_rules! stream_select { ($($tokens:tt)*) => {{ diff --git a/src/future/future/fuse.rs b/src/future/future/fuse.rs index 597aec1..2257906 100644 --- a/src/future/future/fuse.rs +++ b/src/future/future/fuse.rs @@ -1,6 +1,5 @@ use core::pin::Pin; use futures_core::future::{FusedFuture, Future}; -use futures_core::ready; use futures_core::task::{Context, Poll}; use pin_project_lite::pin_project; @@ -81,13 +80,12 @@ impl Future for Fuse { type Output = Fut::Output; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - Poll::Ready(match self.as_mut().project().inner.as_pin_mut() { - Some(fut) => { - let output = ready!(fut.poll(cx)); + match self.as_mut().project().inner.as_pin_mut() { + Some(fut) => fut.poll(cx).map(|output| { self.project().inner.set(None); output - } - None => return Poll::Pending, - }) + }), + None => Poll::Pending, + } } } diff --git a/src/future/future/mod.rs b/src/future/future/mod.rs index c11d108..955af37 100644 --- a/src/future/future/mod.rs +++ b/src/future/future/mod.rs @@ -463,10 +463,6 @@ pub trait FutureExt: Future { /// ``` /// /// ``` - /// // Note, unlike most examples this is written in the context of a - /// // synchronous function to better illustrate the cross-thread aspect of - /// // the `shared` combinator. - /// /// # futures::executor::block_on(async { /// use futures::future::FutureExt; /// use futures::executor::block_on; diff --git a/src/future/future/shared.rs b/src/future/future/shared.rs index ecd1b42..9ab3b4f 100644 --- a/src/future/future/shared.rs +++ b/src/future/future/shared.rs @@ -37,10 +37,6 @@ impl Clone for WeakShared { } } -// The future itself is polled behind the `Arc`, so it won't be moved -// when `Shared` is moved. -impl Unpin for Shared {} - impl fmt::Debug for Shared { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Shared") diff --git a/src/future/join_all.rs b/src/future/join_all.rs index 7dc159b..79eee8d 100644 --- a/src/future/join_all.rs +++ b/src/future/join_all.rs @@ -12,7 +12,7 @@ use core::task::{Context, Poll}; use super::{assert_future, MaybeDone}; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] use crate::stream::{Collect, FuturesOrdered, StreamExt}; pub(crate) fn iter_pin_mut(slice: Pin<&mut [T]>) -> impl Iterator> { @@ -31,7 +31,7 @@ where kind: JoinAllKind, } -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] pub(crate) const SMALL: usize = 30; enum JoinAllKind @@ -41,7 +41,7 @@ where Small { elems: Pin]>>, }, - #[cfg(not(futures_no_atomic_cas))] + #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] Big { fut: Collect, Vec>, }, @@ -57,7 +57,7 @@ where JoinAllKind::Small { ref elems } => { f.debug_struct("JoinAll").field("elems", elems).finish() } - #[cfg(not(futures_no_atomic_cas))] + #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] JoinAllKind::Big { ref fut, .. } => fmt::Debug::fmt(fut, f), } } @@ -77,7 +77,7 @@ where /// /// `join_all` will switch to the more powerful [`FuturesOrdered`] for performance /// reasons if the number of futures is large. You may want to look into using it or -/// it's counterpart [`FuturesUnordered`][crate::stream::FuturesUnordered] directly. +/// its counterpart [`FuturesUnordered`][crate::stream::FuturesUnordered] directly. /// /// Some examples for additional functionality provided by these are: /// @@ -106,7 +106,8 @@ where { let iter = iter.into_iter(); - #[cfg(futures_no_atomic_cas)] + #[cfg(target_os = "none")] + #[cfg_attr(target_os = "none", cfg(not(target_has_atomic = "ptr")))] { let kind = JoinAllKind::Small { elems: iter.map(MaybeDone::Future).collect::>().into() }; @@ -114,7 +115,7 @@ where assert_future::::Output>, _>(JoinAll { kind }) } - #[cfg(not(futures_no_atomic_cas))] + #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] { let kind = match iter.size_hint().1 { Some(max) if max <= SMALL => JoinAllKind::Small { @@ -153,7 +154,7 @@ where Poll::Pending } } - #[cfg(not(futures_no_atomic_cas))] + #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] JoinAllKind::Big { fut } => Pin::new(fut).poll(cx), } } diff --git a/src/future/mod.rs b/src/future/mod.rs index 374e365..2d8fa4f 100644 --- a/src/future/mod.rs +++ b/src/future/mod.rs @@ -111,13 +111,13 @@ pub use self::select_ok::{select_ok, SelectOk}; mod either; pub use self::either::Either; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] mod abortable; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] pub use crate::abortable::{AbortHandle, AbortRegistration, Abortable, Aborted}; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] pub use abortable::abortable; diff --git a/src/future/select.rs b/src/future/select.rs index e693a30..7e33d19 100644 --- a/src/future/select.rs +++ b/src/future/select.rs @@ -99,17 +99,26 @@ where type Output = Either<(A::Output, B), (B::Output, A)>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let (mut a, mut b) = self.inner.take().expect("cannot poll Select twice"); + /// When compiled with `-C opt-level=z`, this function will help the compiler eliminate the `None` branch, where + /// `Option::unwrap` does not. + #[inline(always)] + fn unwrap_option(value: Option) -> T { + match value { + None => unreachable!(), + Some(value) => value, + } + } + + let (a, b) = self.inner.as_mut().expect("cannot poll Select twice"); if let Poll::Ready(val) = a.poll_unpin(cx) { - return Poll::Ready(Either::Left((val, b))); + return Poll::Ready(Either::Left((val, unwrap_option(self.inner.take()).1))); } if let Poll::Ready(val) = b.poll_unpin(cx) { - return Poll::Ready(Either::Right((val, a))); + return Poll::Ready(Either::Right((val, unwrap_option(self.inner.take()).0))); } - self.inner = Some((a, b)); Poll::Pending } } diff --git a/src/future/try_join_all.rs b/src/future/try_join_all.rs index 506f450..2d6a2a0 100644 --- a/src/future/try_join_all.rs +++ b/src/future/try_join_all.rs @@ -12,7 +12,7 @@ use core::task::{Context, Poll}; use super::{assert_future, join_all, IntoFuture, TryFuture, TryMaybeDone}; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] use crate::stream::{FuturesOrdered, TryCollect, TryStreamExt}; use crate::TryFutureExt; @@ -38,7 +38,7 @@ where Small { elems: Pin>]>>, }, - #[cfg(not(futures_no_atomic_cas))] + #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] Big { fut: TryCollect>, Vec>, }, @@ -56,7 +56,7 @@ where TryJoinAllKind::Small { ref elems } => { f.debug_struct("TryJoinAll").field("elems", elems).finish() } - #[cfg(not(futures_no_atomic_cas))] + #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] TryJoinAllKind::Big { ref fut, .. } => fmt::Debug::fmt(fut, f), } } @@ -121,7 +121,8 @@ where { let iter = iter.into_iter().map(TryFutureExt::into_future); - #[cfg(futures_no_atomic_cas)] + #[cfg(target_os = "none")] + #[cfg_attr(target_os = "none", cfg(not(target_has_atomic = "ptr")))] { let kind = TryJoinAllKind::Small { elems: iter.map(TryMaybeDone::Future).collect::>().into(), @@ -132,7 +133,7 @@ where ) } - #[cfg(not(futures_no_atomic_cas))] + #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] { let kind = match iter.size_hint().1 { Some(max) if max <= join_all::SMALL => TryJoinAllKind::Small { @@ -184,7 +185,7 @@ where } } } - #[cfg(not(futures_no_atomic_cas))] + #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] TryJoinAllKind::Big { fut } => Pin::new(fut).poll(cx), } } diff --git a/src/future/try_select.rs b/src/future/try_select.rs index 4d0b7ff..bc282f7 100644 --- a/src/future/try_select.rs +++ b/src/future/try_select.rs @@ -12,6 +12,9 @@ pub struct TrySelect { impl Unpin for TrySelect {} +type EitherOk = Either<(::Ok, B), (::Ok, A)>; +type EitherErr = Either<(::Error, B), (::Error, A)>; + /// Waits for either one of two differently-typed futures to complete. /// /// This function will return a new future which awaits for either one of both @@ -52,10 +55,9 @@ where A: TryFuture + Unpin, B: TryFuture + Unpin, { - super::assert_future::< - Result, Either<(A::Error, B), (B::Error, A)>>, - _, - >(TrySelect { inner: Some((future1, future2)) }) + super::assert_future::, EitherErr>, _>(TrySelect { + inner: Some((future1, future2)), + }) } impl Future for TrySelect @@ -63,8 +65,7 @@ where A: TryFuture, B: TryFuture, { - #[allow(clippy::type_complexity)] - type Output = Result, Either<(A::Error, B), (B::Error, A)>>; + type Output = Result, EitherErr>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let (mut a, mut b) = self.inner.take().expect("cannot poll Select twice"); diff --git a/src/io/copy_buf_abortable.rs b/src/io/copy_buf_abortable.rs index fdbc4a5..ed22d62 100644 --- a/src/io/copy_buf_abortable.rs +++ b/src/io/copy_buf_abortable.rs @@ -57,7 +57,7 @@ where } pin_project! { - /// Future for the [`copy_buf()`] function. + /// Future for the [`copy_buf_abortable()`] function. #[derive(Debug)] #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct CopyBufAbortable<'a, R, W: ?Sized> { diff --git a/src/io/cursor.rs b/src/io/cursor.rs index b6fb372..c6e2aee 100644 --- a/src/io/cursor.rs +++ b/src/io/cursor.rs @@ -1,6 +1,4 @@ use futures_core::task::{Context, Poll}; -#[cfg(feature = "read_initializer")] -use futures_io::Initializer; use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, IoSlice, IoSliceMut, SeekFrom}; use std::io; use std::pin::Pin; @@ -159,12 +157,6 @@ where } impl + Unpin> AsyncRead for Cursor { - #[cfg(feature = "read_initializer")] - #[inline] - unsafe fn initializer(&self) -> Initializer { - io::Read::initializer(&self.inner) - } - fn poll_read( mut self: Pin<&mut Self>, _cx: &mut Context<'_>, diff --git a/src/io/fill_buf.rs b/src/io/fill_buf.rs index a1484c0..45862b8 100644 --- a/src/io/fill_buf.rs +++ b/src/io/fill_buf.rs @@ -3,6 +3,7 @@ use futures_core::task::{Context, Poll}; use futures_io::AsyncBufRead; use std::io; use std::pin::Pin; +use std::slice; /// Future for the [`fill_buf`](super::AsyncBufReadExt::fill_buf) method. #[derive(Debug)] @@ -30,17 +31,12 @@ where let reader = this.reader.take().expect("Polled FillBuf after completion"); match Pin::new(&mut *reader).poll_fill_buf(cx) { - // With polonius it is possible to remove this inner match and just have the correct - // lifetime of the reference inferred based on which branch is taken - Poll::Ready(Ok(_)) => match Pin::new(reader).poll_fill_buf(cx) { - Poll::Ready(Ok(slice)) => Poll::Ready(Ok(slice)), - Poll::Ready(Err(err)) => { - unreachable!("reader indicated readiness but then returned an error: {:?}", err) - } - Poll::Pending => { - unreachable!("reader indicated readiness but then returned pending") - } - }, + Poll::Ready(Ok(slice)) => { + // With polonius it is possible to remove this lifetime transmutation and just have + // the correct lifetime of the reference inferred based on which branch is taken + let slice: &'a [u8] = unsafe { slice::from_raw_parts(slice.as_ptr(), slice.len()) }; + Poll::Ready(Ok(slice)) + } Poll::Ready(Err(err)) => Poll::Ready(Err(err)), Poll::Pending => { this.reader = Some(reader); diff --git a/src/io/mod.rs b/src/io/mod.rs index 8ce3ad6..fdad60b 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -804,11 +804,11 @@ pub trait AsyncBufReadExt: AsyncBufRead { /// use futures::io::{AsyncBufReadExt, Cursor}; /// use futures::stream::StreamExt; /// - /// let cursor = Cursor::new(b"lorem\nipsum\r\ndolor"); + /// let cursor = Cursor::new(b"lorem\nipsum\xc2\r\ndolor"); /// - /// let mut lines_stream = cursor.lines().map(|l| l.unwrap()); + /// let mut lines_stream = cursor.lines().map(|l| l.unwrap_or(String::from("invalid UTF_8"))); /// assert_eq!(lines_stream.next().await, Some(String::from("lorem"))); - /// assert_eq!(lines_stream.next().await, Some(String::from("ipsum"))); + /// assert_eq!(lines_stream.next().await, Some(String::from("invalid UTF_8"))); /// assert_eq!(lines_stream.next().await, Some(String::from("dolor"))); /// assert_eq!(lines_stream.next().await, None); /// # Ok::<(), Box>(()) }).unwrap(); diff --git a/src/io/read_line.rs b/src/io/read_line.rs index e1b8fc9..df782c9 100644 --- a/src/io/read_line.rs +++ b/src/io/read_line.rs @@ -35,6 +35,7 @@ pub(super) fn read_line_internal( ) -> Poll> { let ret = ready!(read_until_internal(reader, cx, b'\n', bytes, read)); if str::from_utf8(bytes).is_err() { + bytes.clear(); Poll::Ready(ret.and_then(|_| { Err(io::Error::new(io::ErrorKind::InvalidData, "stream did not contain valid UTF-8")) })) diff --git a/src/io/split.rs b/src/io/split.rs index 3f1b9af..81d1e6d 100644 --- a/src/io/split.rs +++ b/src/io/split.rs @@ -31,6 +31,13 @@ pub(super) fn split(t: T) -> (ReadHalf, WriteHalf< (ReadHalf { handle: a }, WriteHalf { handle: b }) } +impl ReadHalf { + /// Checks if this `ReadHalf` and some `WriteHalf` were split from the same stream. + pub fn is_pair_of(&self, other: &WriteHalf) -> bool { + self.handle.is_pair_of(&other.handle) + } +} + impl ReadHalf { /// Attempts to put the two "halves" of a split `AsyncRead + AsyncWrite` back /// together. Succeeds only if the `ReadHalf` and `WriteHalf` are @@ -42,6 +49,13 @@ impl ReadHalf { } } +impl WriteHalf { + /// Checks if this `WriteHalf` and some `ReadHalf` were split from the same stream. + pub fn is_pair_of(&self, other: &ReadHalf) -> bool { + self.handle.is_pair_of(&other.handle) + } +} + impl WriteHalf { /// Attempts to put the two "halves" of a split `AsyncRead + AsyncWrite` back /// together. Succeeds only if the `ReadHalf` and `WriteHalf` are diff --git a/src/io/window.rs b/src/io/window.rs index 77b7267..d857282 100644 --- a/src/io/window.rs +++ b/src/io/window.rs @@ -1,6 +1,6 @@ use std::ops::{Bound, Range, RangeBounds}; -/// A owned window around an underlying buffer. +/// An owned window around an underlying buffer. /// /// Normally slices work great for considering sub-portions of a buffer, but /// unfortunately a slice is a *borrowed* type in Rust which has an associated diff --git a/src/lib.rs b/src/lib.rs index 9a10c93..208eb73 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -329,7 +329,7 @@ pub use crate::io::{ #[cfg(feature = "alloc")] pub mod lock; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] mod abortable; diff --git a/src/lock/bilock.rs b/src/lock/bilock.rs index 2174079..a89678e 100644 --- a/src/lock/bilock.rs +++ b/src/lock/bilock.rs @@ -3,11 +3,11 @@ use alloc::boxed::Box; use alloc::sync::Arc; use core::cell::UnsafeCell; -use core::fmt; use core::ops::{Deref, DerefMut}; use core::pin::Pin; -use core::sync::atomic::AtomicUsize; +use core::sync::atomic::AtomicPtr; use core::sync::atomic::Ordering::SeqCst; +use core::{fmt, ptr}; #[cfg(feature = "bilock")] use futures_core::future::Future; use futures_core::task::{Context, Poll, Waker}; @@ -41,7 +41,7 @@ pub struct BiLock { #[derive(Debug)] struct Inner { - state: AtomicUsize, + state: AtomicPtr, value: Option>, } @@ -61,7 +61,10 @@ impl BiLock { /// Similarly, reuniting the lock and extracting the inner value is only /// possible when `T` is `Unpin`. pub fn new(t: T) -> (Self, Self) { - let arc = Arc::new(Inner { state: AtomicUsize::new(0), value: Some(UnsafeCell::new(t)) }); + let arc = Arc::new(Inner { + state: AtomicPtr::new(ptr::null_mut()), + value: Some(UnsafeCell::new(t)), + }); (Self { arc: arc.clone() }, Self { arc }) } @@ -87,7 +90,8 @@ impl BiLock { pub fn poll_lock(&self, cx: &mut Context<'_>) -> Poll> { let mut waker = None; loop { - match self.arc.state.swap(1, SeqCst) { + let n = self.arc.state.swap(invalid_ptr(1), SeqCst); + match n as usize { // Woohoo, we grabbed the lock! 0 => return Poll::Ready(BiLockGuard { bilock: self }), @@ -96,8 +100,8 @@ impl BiLock { // A task was previously blocked on this lock, likely our task, // so we need to update that task. - n => unsafe { - let mut prev = Box::from_raw(n as *mut Waker); + _ => unsafe { + let mut prev = Box::from_raw(n); *prev = cx.waker().clone(); waker = Some(prev); }, @@ -105,9 +109,9 @@ impl BiLock { // type ascription for safety's sake! let me: Box = waker.take().unwrap_or_else(|| Box::new(cx.waker().clone())); - let me = Box::into_raw(me) as usize; + let me = Box::into_raw(me); - match self.arc.state.compare_exchange(1, me, SeqCst, SeqCst) { + match self.arc.state.compare_exchange(invalid_ptr(1), me, SeqCst, SeqCst) { // The lock is still locked, but we've now parked ourselves, so // just report that we're scheduled to receive a notification. Ok(_) => return Poll::Pending, @@ -115,8 +119,8 @@ impl BiLock { // Oops, looks like the lock was unlocked after our swap above // and before the compare_exchange. Deallocate what we just // allocated and go through the loop again. - Err(0) => unsafe { - waker = Some(Box::from_raw(me as *mut Waker)); + Err(n) if n.is_null() => unsafe { + waker = Some(Box::from_raw(me)); }, // The top of this loop set the previous state to 1, so if we @@ -125,7 +129,7 @@ impl BiLock { // but we're trying to acquire the lock and there's only one // other reference of the lock, so it should be impossible for // that task to ever block itself. - Err(n) => panic!("invalid state: {}", n), + Err(n) => panic!("invalid state: {}", n as usize), } } } @@ -145,6 +149,11 @@ impl BiLock { BiLockAcquire { bilock: self } } + /// Returns `true` only if the other `BiLock` originated from the same call to `BiLock::new`. + pub fn is_pair_of(&self, other: &Self) -> bool { + Arc::ptr_eq(&self.arc, &other.arc) + } + /// Attempts to put the two "halves" of a `BiLock` back together and /// recover the original value. Succeeds only if the two `BiLock`s /// originated from the same call to `BiLock::new`. @@ -152,7 +161,7 @@ impl BiLock { where T: Unpin, { - if Arc::ptr_eq(&self.arc, &other.arc) { + if self.is_pair_of(&other) { drop(other); let inner = Arc::try_unwrap(self.arc) .ok() @@ -164,7 +173,8 @@ impl BiLock { } fn unlock(&self) { - match self.arc.state.swap(0, SeqCst) { + let n = self.arc.state.swap(ptr::null_mut(), SeqCst); + match n as usize { // we've locked the lock, shouldn't be possible for us to see an // unlocked lock. 0 => panic!("invalid unlocked state"), @@ -174,8 +184,8 @@ impl BiLock { // Another task has parked themselves on this lock, let's wake them // up as its now their turn. - n => unsafe { - Box::from_raw(n as *mut Waker).wake(); + _ => unsafe { + Box::from_raw(n).wake(); }, } } @@ -189,7 +199,7 @@ impl Inner { impl Drop for Inner { fn drop(&mut self) { - assert_eq!(self.state.load(SeqCst), 0); + assert!(self.state.load(SeqCst).is_null()); } } @@ -277,3 +287,12 @@ impl<'a, T> Future for BiLockAcquire<'a, T> { self.bilock.poll_lock(cx) } } + +// Based on core::ptr::invalid_mut. Equivalent to `addr as *mut T`, but is strict-provenance compatible. +#[allow(clippy::useless_transmute)] +#[inline] +fn invalid_ptr(addr: usize) -> *mut T { + // SAFETY: every valid integer is also a valid pointer (as long as you don't dereference that + // pointer). + unsafe { core::mem::transmute(addr) } +} diff --git a/src/lock/mod.rs b/src/lock/mod.rs index 0be7271..8ca0ff6 100644 --- a/src/lock/mod.rs +++ b/src/lock/mod.rs @@ -3,25 +3,25 @@ //! This module is only available when the `std` or `alloc` feature of this //! library is activated, and it is activated by default. -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(any(feature = "sink", feature = "io"))] #[cfg(not(feature = "bilock"))] pub(crate) use self::bilock::BiLock; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "bilock")] #[cfg_attr(docsrs, doc(cfg(feature = "bilock")))] pub use self::bilock::{BiLock, BiLockAcquire, BiLockGuard, ReuniteError}; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "std")] pub use self::mutex::{ MappedMutexGuard, Mutex, MutexGuard, MutexLockFuture, OwnedMutexGuard, OwnedMutexLockFuture, }; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(any(feature = "bilock", feature = "sink", feature = "io"))] #[cfg_attr(docsrs, doc(cfg(feature = "bilock")))] #[cfg_attr(not(feature = "bilock"), allow(unreachable_pub))] mod bilock; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "std")] mod mutex; diff --git a/src/stream/futures_ordered.rs b/src/stream/futures_ordered.rs index 618bf1b..2cc144e 100644 --- a/src/stream/futures_ordered.rs +++ b/src/stream/futures_ordered.rs @@ -19,7 +19,8 @@ pin_project! { struct OrderWrapper { #[pin] data: T, // A future or a future's output - index: isize, + // Use i64 for index since isize may overflow in 32-bit targets. + index: i64, } } @@ -58,36 +59,39 @@ where /// An unbounded queue of futures. /// -/// This "combinator" is similar to [`FuturesUnordered`], but it imposes a FIFO order -/// on top of the set of futures. While futures in the set will race to +/// This "combinator" is similar to [`FuturesUnordered`], but it imposes a FIFO +/// order on top of the set of futures. While futures in the set will race to /// completion in parallel, results will only be returned in the order their /// originating futures were added to the queue. /// /// Futures are pushed into this queue and their realized values are yielded in /// order. This structure is optimized to manage a large number of futures. -/// Futures managed by `FuturesOrdered` will only be polled when they generate +/// Futures managed by [`FuturesOrdered`] will only be polled when they generate /// notifications. This reduces the required amount of work needed to coordinate /// large numbers of futures. /// -/// When a `FuturesOrdered` is first created, it does not contain any futures. -/// Calling `poll` in this state will result in `Poll::Ready(None))` to be -/// returned. Futures are submitted to the queue using `push`; however, the -/// future will **not** be polled at this point. `FuturesOrdered` will only -/// poll managed futures when `FuturesOrdered::poll` is called. As such, it -/// is important to call `poll` after pushing new futures. +/// When a [`FuturesOrdered`] is first created, it does not contain any futures. +/// Calling [`poll_next`](FuturesOrdered::poll_next) in this state will result +/// in [`Poll::Ready(None)`](Poll::Ready) to be returned. Futures are submitted +/// to the queue using [`push_back`](FuturesOrdered::push_back) (or +/// [`push_front`](FuturesOrdered::push_front)); however, the future will +/// **not** be polled at this point. [`FuturesOrdered`] will only poll managed +/// futures when [`FuturesOrdered::poll_next`] is called. As such, it +/// is important to call [`poll_next`](FuturesOrdered::poll_next) after pushing +/// new futures. /// -/// If `FuturesOrdered::poll` returns `Poll::Ready(None)` this means that -/// the queue is currently not managing any futures. A future may be submitted -/// to the queue at a later time. At that point, a call to -/// `FuturesOrdered::poll` will either return the future's resolved value -/// **or** `Poll::Pending` if the future has not yet completed. When -/// multiple futures are submitted to the queue, `FuturesOrdered::poll` will -/// return `Poll::Pending` until the first future completes, even if +/// If [`FuturesOrdered::poll_next`] returns [`Poll::Ready(None)`](Poll::Ready) +/// this means that the queue is currently not managing any futures. A future +/// may be submitted to the queue at a later time. At that point, a call to +/// [`FuturesOrdered::poll_next`] will either return the future's resolved value +/// **or** [`Poll::Pending`] if the future has not yet completed. When +/// multiple futures are submitted to the queue, [`FuturesOrdered::poll_next`] +/// will return [`Poll::Pending`] until the first future completes, even if /// some of the later futures have already completed. /// -/// Note that you can create a ready-made `FuturesOrdered` via the +/// Note that you can create a ready-made [`FuturesOrdered`] via the /// [`collect`](Iterator::collect) method, or you can start with an empty queue -/// with the `FuturesOrdered::new` constructor. +/// with the [`FuturesOrdered::new`] constructor. /// /// This type is only available when the `std` or `alloc` feature of this /// library is activated, and it is activated by default. @@ -95,8 +99,8 @@ where pub struct FuturesOrdered { in_progress_queue: FuturesUnordered>, queued_outputs: BinaryHeap>, - next_incoming_index: isize, - next_outgoing_index: isize, + next_incoming_index: i64, + next_outgoing_index: i64, } impl Unpin for FuturesOrdered {} @@ -104,8 +108,9 @@ impl Unpin for FuturesOrdered {} impl FuturesOrdered { /// Constructs a new, empty `FuturesOrdered` /// - /// The returned `FuturesOrdered` does not contain any futures and, in this - /// state, `FuturesOrdered::poll_next` will return `Poll::Ready(None)`. + /// The returned [`FuturesOrdered`] does not contain any futures and, in + /// this state, [`FuturesOrdered::poll_next`] will return + /// [`Poll::Ready(None)`](Poll::Ready). pub fn new() -> Self { Self { in_progress_queue: FuturesUnordered::new(), @@ -132,9 +137,9 @@ impl FuturesOrdered { /// Push a future into the queue. /// /// This function submits the given future to the internal set for managing. - /// This function will not call `poll` on the submitted future. The caller - /// must ensure that `FuturesOrdered::poll` is called in order to receive - /// task notifications. + /// This function will not call [`poll`](Future::poll) on the submitted + /// future. The caller must ensure that [`FuturesOrdered::poll_next`] is + /// called in order to receive task notifications. #[deprecated(note = "use `push_back` instead")] pub fn push(&mut self, future: Fut) { self.push_back(future); @@ -143,9 +148,9 @@ impl FuturesOrdered { /// Pushes a future to the back of the queue. /// /// This function submits the given future to the internal set for managing. - /// This function will not call `poll` on the submitted future. The caller - /// must ensure that `FuturesOrdered::poll` is called in order to receive - /// task notifications. + /// This function will not call [`poll`](Future::poll) on the submitted + /// future. The caller must ensure that [`FuturesOrdered::poll_next`] is + /// called in order to receive task notifications. pub fn push_back(&mut self, future: Fut) { let wrapped = OrderWrapper { data: future, index: self.next_incoming_index }; self.next_incoming_index += 1; @@ -155,10 +160,10 @@ impl FuturesOrdered { /// Pushes a future to the front of the queue. /// /// This function submits the given future to the internal set for managing. - /// This function will not call `poll` on the submitted future. The caller - /// must ensure that `FuturesOrdered::poll` is called in order to receive - /// task notifications. This future will be the next future to be returned - /// complete. + /// This function will not call [`poll`](Future::poll) on the submitted + /// future. The caller must ensure that [`FuturesOrdered::poll_next`] is + /// called in order to receive task notifications. This future will be + /// the next future to be returned complete. pub fn push_front(&mut self, future: Fut) { let wrapped = OrderWrapper { data: future, index: self.next_outgoing_index - 1 }; self.next_outgoing_index -= 1; diff --git a/src/stream/futures_unordered/mod.rs b/src/stream/futures_unordered/mod.rs index 6b5804d..dedf75d 100644 --- a/src/stream/futures_unordered/mod.rs +++ b/src/stream/futures_unordered/mod.rs @@ -62,7 +62,7 @@ pub struct FuturesUnordered { } unsafe impl Send for FuturesUnordered {} -unsafe impl Sync for FuturesUnordered {} +unsafe impl Sync for FuturesUnordered {} impl Unpin for FuturesUnordered {} impl Spawn for FuturesUnordered> { @@ -558,20 +558,7 @@ impl Debug for FuturesUnordered { impl FuturesUnordered { /// Clears the set, removing all futures. pub fn clear(&mut self) { - self.clear_head_all(); - - // we just cleared all the tasks, and we have &mut self, so this is safe. - unsafe { self.ready_to_run_queue.clear() }; - - self.is_terminated.store(false, Relaxed); - } - - fn clear_head_all(&mut self) { - while !self.head_all.get_mut().is_null() { - let head = *self.head_all.get_mut(); - let task = unsafe { self.unlink(head) }; - self.release_task(task); - } + *self = Self::new(); } } @@ -581,7 +568,11 @@ impl Drop for FuturesUnordered { // associated with it. At the same time though there may be tons of // wakers flying around which contain `Task` references // inside them. We'll let those naturally get deallocated. - self.clear_head_all(); + while !self.head_all.get_mut().is_null() { + let head = *self.head_all.get_mut(); + let task = unsafe { self.unlink(head) }; + self.release_task(task); + } // Note that at this point we could still have a bunch of tasks in the // ready to run queue. None of those tasks, however, have futures diff --git a/src/stream/futures_unordered/ready_to_run_queue.rs b/src/stream/futures_unordered/ready_to_run_queue.rs index 4518705..a924935 100644 --- a/src/stream/futures_unordered/ready_to_run_queue.rs +++ b/src/stream/futures_unordered/ready_to_run_queue.rs @@ -85,38 +85,25 @@ impl ReadyToRunQueue { pub(super) fn stub(&self) -> *const Task { Arc::as_ptr(&self.stub) } - - // Clear the queue of tasks. - // - // Note that each task has a strong reference count associated with it - // which is owned by the ready to run queue. This method just pulls out - // tasks and drops their refcounts. - // - // # Safety - // - // - All tasks **must** have had their futures dropped already (by FuturesUnordered::clear) - // - The caller **must** guarantee unique access to `self` - pub(crate) unsafe fn clear(&self) { - loop { - // SAFETY: We have the guarantee of mutual exclusion required by `dequeue`. - match self.dequeue() { - Dequeue::Empty => break, - Dequeue::Inconsistent => abort("inconsistent in drop"), - Dequeue::Data(ptr) => drop(Arc::from_raw(ptr)), - } - } - } } impl Drop for ReadyToRunQueue { fn drop(&mut self) { // Once we're in the destructor for `Inner` we need to clear out // the ready to run queue of tasks if there's anything left in there. - - // All tasks have had their futures dropped already by the `FuturesUnordered` - // destructor above, and we have &mut self, so this is safe. + // + // Note that each task has a strong reference count associated with it + // which is owned by the ready to run queue. All tasks should have had + // their futures dropped already by the `FuturesUnordered` destructor + // above, so we're just pulling out tasks and dropping their refcounts. unsafe { - self.clear(); + loop { + match self.dequeue() { + Dequeue::Empty => break, + Dequeue::Inconsistent => abort("inconsistent in drop"), + Dequeue::Data(ptr) => drop(Arc::from_raw(ptr)), + } + } } } } diff --git a/src/stream/mod.rs b/src/stream/mod.rs index ec685b9..2438e58 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -18,9 +18,10 @@ pub use futures_core::stream::{FusedStream, Stream, TryStream}; #[allow(clippy::module_inception)] mod stream; pub use self::stream::{ - Chain, Collect, Concat, Cycle, Enumerate, Filter, FilterMap, FlatMap, Flatten, Fold, ForEach, - Fuse, Inspect, Map, Next, NextIf, NextIfEq, Peek, PeekMut, Peekable, Scan, SelectNextSome, - Skip, SkipWhile, StreamExt, StreamFuture, Take, TakeUntil, TakeWhile, Then, Unzip, Zip, + All, Any, Chain, Collect, Concat, Count, Cycle, Enumerate, Filter, FilterMap, FlatMap, Flatten, + Fold, ForEach, Fuse, Inspect, Map, Next, NextIf, NextIfEq, Peek, PeekMut, Peekable, Scan, + SelectNextSome, Skip, SkipWhile, StreamExt, StreamFuture, Take, TakeUntil, TakeWhile, Then, + Unzip, Zip, }; #[cfg(feature = "std")] @@ -36,11 +37,13 @@ pub use self::stream::ReadyChunks; #[cfg_attr(docsrs, doc(cfg(feature = "sink")))] pub use self::stream::Forward; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] -pub use self::stream::{BufferUnordered, Buffered, ForEachConcurrent}; +pub use self::stream::{ + BufferUnordered, Buffered, FlatMapUnordered, FlattenUnordered, ForEachConcurrent, +}; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "sink")] #[cfg_attr(docsrs, doc(cfg(feature = "sink")))] #[cfg(feature = "alloc")] @@ -48,9 +51,9 @@ pub use self::stream::{ReuniteError, SplitSink, SplitStream}; mod try_stream; pub use self::try_stream::{ - try_unfold, AndThen, ErrInto, InspectErr, InspectOk, IntoStream, MapErr, MapOk, OrElse, - TryCollect, TryConcat, TryFilter, TryFilterMap, TryFlatten, TryFold, TryForEach, TryNext, - TrySkipWhile, TryStreamExt, TryTakeWhile, TryUnfold, + try_unfold, AndThen, ErrInto, InspectErr, InspectOk, IntoStream, MapErr, MapOk, OrElse, TryAll, + TryAny, TryCollect, TryConcat, TryFilter, TryFilterMap, TryFlatten, TryFold, TryForEach, + TryNext, TrySkipWhile, TryStreamExt, TryTakeWhile, TryUnfold, }; #[cfg(feature = "io")] @@ -58,12 +61,14 @@ pub use self::try_stream::{ #[cfg(feature = "std")] pub use self::try_stream::IntoAsyncRead; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] -pub use self::try_stream::{TryBufferUnordered, TryBuffered, TryForEachConcurrent}; +pub use self::try_stream::{ + TryBufferUnordered, TryBuffered, TryFlattenUnordered, TryForEachConcurrent, +}; #[cfg(feature = "alloc")] -pub use self::try_stream::{TryChunks, TryChunksError}; +pub use self::try_stream::{TryChunks, TryChunksError, TryReadyChunks, TryReadyChunksError}; // Primitive streams @@ -100,36 +105,36 @@ pub use self::select_with_strategy::{select_with_strategy, PollNext, SelectWithS mod unfold; pub use self::unfold::{unfold, Unfold}; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] mod futures_ordered; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] pub use self::futures_ordered::FuturesOrdered; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] pub mod futures_unordered; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] #[doc(inline)] pub use self::futures_unordered::FuturesUnordered; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] pub mod select_all; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] #[doc(inline)] pub use self::select_all::{select_all, SelectAll}; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] mod abortable; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] pub use crate::abortable::{AbortHandle, AbortRegistration, Abortable, Aborted}; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] pub use abortable::abortable; diff --git a/src/stream/select_all.rs b/src/stream/select_all.rs index 3474331..121b6a0 100644 --- a/src/stream/select_all.rs +++ b/src/stream/select_all.rs @@ -8,29 +8,24 @@ use futures_core::ready; use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; -use pin_project_lite::pin_project; - use super::assert_stream; use crate::stream::{futures_unordered, FuturesUnordered, StreamExt, StreamFuture}; -pin_project! { - /// An unbounded set of streams - /// - /// This "combinator" provides the ability to maintain a set of streams - /// and drive them all to completion. - /// - /// Streams are pushed into this set and their realized values are - /// yielded as they become ready. Streams will only be polled when they - /// generate notifications. This allows to coordinate a large number of streams. - /// - /// Note that you can create a ready-made `SelectAll` via the - /// `select_all` function in the `stream` module, or you can start with an - /// empty set with the `SelectAll::new` constructor. - #[must_use = "streams do nothing unless polled"] - pub struct SelectAll { - #[pin] - inner: FuturesUnordered>, - } +/// An unbounded set of streams +/// +/// This "combinator" provides the ability to maintain a set of streams +/// and drive them all to completion. +/// +/// Streams are pushed into this set and their realized values are +/// yielded as they become ready. Streams will only be polled when they +/// generate notifications. This allows to coordinate a large number of streams. +/// +/// Note that you can create a ready-made `SelectAll` via the +/// `select_all` function in the `stream` module, or you can start with an +/// empty set with the `SelectAll::new` constructor. +#[must_use = "streams do nothing unless polled"] +pub struct SelectAll { + inner: FuturesUnordered>, } impl Debug for SelectAll { diff --git a/src/stream/stream/all.rs b/src/stream/stream/all.rs index ba2baa5..1435c79 100644 --- a/src/stream/stream/all.rs +++ b/src/stream/stream/all.rs @@ -13,7 +13,7 @@ pin_project! { #[pin] stream: St, f: F, - accum: Option, + done: bool, #[pin] future: Option, } @@ -27,7 +27,7 @@ where fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("All") .field("stream", &self.stream) - .field("accum", &self.accum) + .field("done", &self.done) .field("future", &self.future) .finish() } @@ -40,7 +40,7 @@ where Fut: Future, { pub(super) fn new(stream: St, f: F) -> Self { - Self { stream, f, accum: Some(true), future: None } + Self { stream, f, done: false, future: None } } } @@ -51,7 +51,7 @@ where Fut: Future, { fn is_terminated(&self) -> bool { - self.accum.is_none() && self.future.is_none() + self.done && self.future.is_none() } } @@ -67,21 +67,22 @@ where let mut this = self.project(); Poll::Ready(loop { if let Some(fut) = this.future.as_mut().as_pin_mut() { - // we're currently processing a future to produce a new accum value - let acc = this.accum.unwrap() && ready!(fut.poll(cx)); - if !acc { + // we're currently processing a future to produce a new value + let res = ready!(fut.poll(cx)); + this.future.set(None); + if !res { + *this.done = true; break false; } // early exit - *this.accum = Some(acc); - this.future.set(None); - } else if this.accum.is_some() { + } else if !*this.done { // we're waiting on a new item from the stream match ready!(this.stream.as_mut().poll_next(cx)) { Some(item) => { this.future.set(Some((this.f)(item))); } None => { - break this.accum.take().unwrap(); + *this.done = true; + break true; } } } else { diff --git a/src/stream/stream/any.rs b/src/stream/stream/any.rs index f023125..cc3d695 100644 --- a/src/stream/stream/any.rs +++ b/src/stream/stream/any.rs @@ -13,7 +13,7 @@ pin_project! { #[pin] stream: St, f: F, - accum: Option, + done: bool, #[pin] future: Option, } @@ -27,7 +27,7 @@ where fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Any") .field("stream", &self.stream) - .field("accum", &self.accum) + .field("done", &self.done) .field("future", &self.future) .finish() } @@ -40,7 +40,7 @@ where Fut: Future, { pub(super) fn new(stream: St, f: F) -> Self { - Self { stream, f, accum: Some(false), future: None } + Self { stream, f, done: false, future: None } } } @@ -51,7 +51,7 @@ where Fut: Future, { fn is_terminated(&self) -> bool { - self.accum.is_none() && self.future.is_none() + self.done && self.future.is_none() } } @@ -67,21 +67,22 @@ where let mut this = self.project(); Poll::Ready(loop { if let Some(fut) = this.future.as_mut().as_pin_mut() { - // we're currently processing a future to produce a new accum value - let acc = this.accum.unwrap() || ready!(fut.poll(cx)); - if acc { + // we're currently processing a future to produce a new value + let res = ready!(fut.poll(cx)); + this.future.set(None); + if res { + *this.done = true; break true; } // early exit - *this.accum = Some(acc); - this.future.set(None); - } else if this.accum.is_some() { + } else if !*this.done { // we're waiting on a new item from the stream match ready!(this.stream.as_mut().poll_next(cx)) { Some(item) => { this.future.set(Some((this.f)(item))); } None => { - break this.accum.take().unwrap(); + *this.done = true; + break false; } } } else { diff --git a/src/stream/stream/flatten_unordered.rs b/src/stream/stream/flatten_unordered.rs index 07f971c..44c6ace 100644 --- a/src/stream/stream/flatten_unordered.rs +++ b/src/stream/stream/flatten_unordered.rs @@ -3,6 +3,7 @@ use core::{ cell::UnsafeCell, convert::identity, fmt, + marker::PhantomData, num::NonZeroUsize, pin::Pin, sync::atomic::{AtomicU8, Ordering}, @@ -22,8 +23,11 @@ use futures_task::{waker, ArcWake}; use crate::stream::FuturesUnordered; -/// There is nothing to poll and stream isn't being -/// polled or waking at the moment. +/// Stream for the [`flatten_unordered`](super::StreamExt::flatten_unordered) +/// method. +pub type FlattenUnordered = FlattenUnorderedWithFlowController; + +/// There is nothing to poll and stream isn't being polled/waking/woken at the moment. const NONE: u8 = 0; /// Inner streams need to be polled. @@ -32,26 +36,19 @@ const NEED_TO_POLL_INNER_STREAMS: u8 = 1; /// The base stream needs to be polled. const NEED_TO_POLL_STREAM: u8 = 0b10; -/// It needs to poll base stream and inner streams. +/// Both base stream and inner streams need to be polled. const NEED_TO_POLL_ALL: u8 = NEED_TO_POLL_INNER_STREAMS | NEED_TO_POLL_STREAM; /// The current stream is being polled at the moment. const POLLING: u8 = 0b100; -/// Inner streams are being woken at the moment. -const WAKING_INNER_STREAMS: u8 = 0b1000; - -/// The base stream is being woken at the moment. -const WAKING_STREAM: u8 = 0b10000; - -/// The base stream and inner streams are being woken at the moment. -const WAKING_ALL: u8 = WAKING_STREAM | WAKING_INNER_STREAMS; +/// Stream is being woken at the moment. +const WAKING: u8 = 0b1000; /// The stream was waked and will be polled. -const WOKEN: u8 = 0b100000; +const WOKEN: u8 = 0b10000; -/// Determines what needs to be polled, and is stream being polled at the -/// moment or not. +/// Internal polling state of the stream. #[derive(Clone, Debug)] struct SharedPollState { state: Arc, @@ -64,14 +61,14 @@ impl SharedPollState { } /// Attempts to start polling, returning stored state in case of success. - /// Returns `None` if some waker is waking at the moment. + /// Returns `None` if either waker is waking at the moment. fn start_polling( &self, ) -> Option<(u8, PollStateBomb<'_, impl FnOnce(&SharedPollState) -> u8>)> { let value = self .state .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |value| { - if value & WAKING_ALL == NONE { + if value & WAKING == NONE { Some(POLLING) } else { None @@ -83,23 +80,20 @@ impl SharedPollState { Some((value, bomb)) } - /// Starts the waking process and performs bitwise or with the given value. + /// Attempts to start the waking process and performs bitwise or with the given value. + /// + /// If some waker is already in progress or stream is already woken/being polled, waking process won't start, however + /// state will be disjuncted with the given value. fn start_waking( &self, to_poll: u8, - waking: u8, ) -> Option<(u8, PollStateBomb<'_, impl FnOnce(&SharedPollState) -> u8>)> { let value = self .state .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |value| { - // Waking process for this waker already started - if value & waking != NONE { - return None; - } let mut next_value = value | to_poll; - // Only start the waking process if we're not in the polling phase and the stream isn't woken already if value & (WOKEN | POLLING) == NONE { - next_value |= waking; + next_value |= WAKING; } if next_value != value { @@ -110,8 +104,9 @@ impl SharedPollState { }) .ok()?; - if value & (WOKEN | POLLING) == NONE { - let bomb = PollStateBomb::new(self, move |state| state.stop_waking(waking)); + // Only start the waking process if we're not in the polling/waking phase and the stream isn't woken already + if value & (WOKEN | POLLING | WAKING) == NONE { + let bomb = PollStateBomb::new(self, SharedPollState::stop_waking); Some((value, bomb)) } else { @@ -123,7 +118,7 @@ impl SharedPollState { /// - `!POLLING` allowing to use wakers /// - `WOKEN` if the state was changed during `POLLING` phase as waker will be called, /// or `will_be_woken` flag supplied - /// - `!WAKING_ALL` as + /// - `!WAKING` as /// * Wakers called during the `POLLING` phase won't propagate their calls /// * `POLLING` phase can't start if some of the wakers are active /// So no wrapped waker can touch the inner waker's cell, it's safe to poll again. @@ -138,20 +133,17 @@ impl SharedPollState { } next_value |= value; - Some(next_value & !POLLING & !WAKING_ALL) + Some(next_value & !POLLING & !WAKING) }) .unwrap() } /// Toggles state to non-waking, allowing to start polling. - fn stop_waking(&self, waking: u8) -> u8 { - self.state + fn stop_waking(&self) -> u8 { + let value = self + .state .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |value| { - let mut next_value = value & !waking; - // Waker will be called only if the current waking state is the same as the specified waker state - if value & WAKING_ALL == waking { - next_value |= WOKEN; - } + let next_value = value & !WAKING | WOKEN; if next_value != value { Some(next_value) @@ -159,12 +151,15 @@ impl SharedPollState { None } }) - .unwrap_or_else(identity) + .unwrap_or_else(identity); + + debug_assert!(value & (WOKEN | POLLING | WAKING) == WAKING); + value } /// Resets current state allowing to poll the stream and wake up wakers. fn reset(&self) -> u8 { - self.state.swap(NEED_TO_POLL_ALL, Ordering::AcqRel) + self.state.swap(NEED_TO_POLL_ALL, Ordering::SeqCst) } } @@ -184,11 +179,6 @@ impl<'a, F: FnOnce(&SharedPollState) -> u8> PollStateBomb<'a, F> { fn deactivate(mut self) { self.drop.take(); } - - /// Manually fires the bomb, returning supplied state. - fn fire(mut self) -> Option { - self.drop.take().map(|drop| (drop)(self.state)) - } } impl u8> Drop for PollStateBomb<'_, F> { @@ -201,16 +191,16 @@ impl u8> Drop for PollStateBomb<'_, F> { /// Will update state with the provided value on `wake_by_ref` call /// and then, if there is a need, call `inner_waker`. -struct InnerWaker { +struct WrappedWaker { inner_waker: UnsafeCell>, poll_state: SharedPollState, need_to_poll: u8, } -unsafe impl Send for InnerWaker {} -unsafe impl Sync for InnerWaker {} +unsafe impl Send for WrappedWaker {} +unsafe impl Sync for WrappedWaker {} -impl InnerWaker { +impl WrappedWaker { /// Replaces given waker's inner_waker for polling stream/futures which will /// update poll state on `wake_by_ref` call. Use only if you need several /// contexts. @@ -218,25 +208,19 @@ impl InnerWaker { /// ## Safety /// /// This function will modify waker's `inner_waker` via `UnsafeCell`, so - /// it should be used only during `POLLING` phase. - unsafe fn replace_waker(self_arc: &mut Arc, cx: &Context<'_>) -> Waker { + /// it should be used only during `POLLING` phase by one thread at the time. + unsafe fn replace_waker(self_arc: &mut Arc, cx: &Context<'_>) { *self_arc.inner_waker.get() = cx.waker().clone().into(); - waker(self_arc.clone()) } /// Attempts to start the waking process for the waker with the given value. /// If succeeded, then the stream isn't yet woken and not being polled at the moment. fn start_waking(&self) -> Option<(u8, PollStateBomb<'_, impl FnOnce(&SharedPollState) -> u8>)> { - self.poll_state.start_waking(self.need_to_poll, self.waking_state()) - } - - /// Returns the corresponding waking state toggled by this waker. - fn waking_state(&self) -> u8 { - self.need_to_poll << 3 + self.poll_state.start_waking(self.need_to_poll) } } -impl ArcWake for InnerWaker { +impl ArcWake for WrappedWaker { fn wake_by_ref(self_arc: &Arc) { if let Some((_, state_bomb)) = self_arc.start_waking() { // Safety: now state is not `POLLING` @@ -244,24 +228,17 @@ impl ArcWake for InnerWaker { if let Some(inner_waker) = waker_opt.clone() { // Stop waking to allow polling stream - let poll_state_value = state_bomb.fire().unwrap(); - - // Here we want to call waker only if stream isn't woken yet and - // also to optimize the case when two wakers are called at the same time. - // - // In this case the best strategy will be to propagate only the latest waker's awake, - // and then poll both entities in a single `poll_next` call - if poll_state_value & (WOKEN | WAKING_ALL) == self_arc.waking_state() { - // Wake up inner waker - inner_waker.wake(); - } + drop(state_bomb); + + // Wake up inner waker + inner_waker.wake(); } } } } pin_project! { - /// Future which contains optional stream. + /// Future which polls optional inner stream. /// /// If it's `Some`, it will attempt to call `poll_next` on it, /// returning `Some((item, next_item_fut))` in case of `Poll::Ready(Some(...))` @@ -303,10 +280,10 @@ impl Future for PollStreamFut { pin_project! { /// Stream for the [`flatten_unordered`](super::StreamExt::flatten_unordered) - /// method. - #[project = FlattenUnorderedProj] + /// method with ability to specify flow controller. + #[project = FlattenUnorderedWithFlowControllerProj] #[must_use = "streams do nothing unless polled"] - pub struct FlattenUnordered where St: Stream { + pub struct FlattenUnorderedWithFlowController where St: Stream { #[pin] inner_streams: FuturesUnordered>, #[pin] @@ -314,80 +291,110 @@ pin_project! { poll_state: SharedPollState, limit: Option, is_stream_done: bool, - inner_streams_waker: Arc, - stream_waker: Arc, + inner_streams_waker: Arc, + stream_waker: Arc, + flow_controller: PhantomData } } -impl fmt::Debug for FlattenUnordered +impl fmt::Debug for FlattenUnorderedWithFlowController where St: Stream + fmt::Debug, St::Item: Stream + fmt::Debug, { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("FlattenUnordered") + f.debug_struct("FlattenUnorderedWithFlowController") .field("poll_state", &self.poll_state) .field("inner_streams", &self.inner_streams) .field("limit", &self.limit) .field("stream", &self.stream) .field("is_stream_done", &self.is_stream_done) + .field("flow_controller", &self.flow_controller) .finish() } } -impl FlattenUnordered +impl FlattenUnorderedWithFlowController where St: Stream, + Fc: FlowController::Item>, St::Item: Stream + Unpin, { - pub(super) fn new(stream: St, limit: Option) -> FlattenUnordered { + pub(crate) fn new( + stream: St, + limit: Option, + ) -> FlattenUnorderedWithFlowController { let poll_state = SharedPollState::new(NEED_TO_POLL_STREAM); - FlattenUnordered { + FlattenUnorderedWithFlowController { inner_streams: FuturesUnordered::new(), stream, is_stream_done: false, limit: limit.and_then(NonZeroUsize::new), - inner_streams_waker: Arc::new(InnerWaker { + inner_streams_waker: Arc::new(WrappedWaker { inner_waker: UnsafeCell::new(None), poll_state: poll_state.clone(), need_to_poll: NEED_TO_POLL_INNER_STREAMS, }), - stream_waker: Arc::new(InnerWaker { + stream_waker: Arc::new(WrappedWaker { inner_waker: UnsafeCell::new(None), poll_state: poll_state.clone(), need_to_poll: NEED_TO_POLL_STREAM, }), poll_state, + flow_controller: PhantomData, } } delegate_access_inner!(stream, St, ()); } -impl FlattenUnorderedProj<'_, St> +/// Returns the next flow step based on the received item. +pub trait FlowController { + /// Handles an item producing `FlowStep` describing the next flow step. + fn next_step(item: I) -> FlowStep; +} + +impl FlowController for () { + fn next_step(item: I) -> FlowStep { + FlowStep::Continue(item) + } +} + +/// Describes the next flow step. +#[derive(Debug, Clone)] +pub enum FlowStep { + /// Just yields an item and continues standard flow. + Continue(C), + /// Immediately returns an underlying item from the function. + Return(R), +} + +impl FlattenUnorderedWithFlowControllerProj<'_, St, Fc> where St: Stream, { - /// Checks if current `inner_streams` size is less than optional limit. + /// Checks if current `inner_streams` bucket size is greater than optional limit. fn is_exceeded_limit(&self) -> bool { self.limit.map_or(false, |limit| self.inner_streams.len() >= limit.get()) } } -impl FusedStream for FlattenUnordered +impl FusedStream for FlattenUnorderedWithFlowController where St: FusedStream, - St::Item: FusedStream + Unpin, + Fc: FlowController::Item>, + St::Item: Stream + Unpin, { fn is_terminated(&self) -> bool { self.stream.is_terminated() && self.inner_streams.is_empty() } } -impl Stream for FlattenUnordered +impl Stream for FlattenUnorderedWithFlowController where St: Stream, + Fc: FlowController::Item>, St::Item: Stream + Unpin, { type Item = ::Item; @@ -398,17 +405,21 @@ where let mut this = self.as_mut().project(); - let (mut poll_state_value, state_bomb) = match this.poll_state.start_polling() { - Some(value) => value, - _ => { - // Waker was called, just wait for the next poll - return Poll::Pending; + // Attempt to start polling, in case some waker is holding the lock, wait in loop + let (mut poll_state_value, state_bomb) = loop { + if let Some(value) = this.poll_state.start_polling() { + break value; } }; + // Safety: now state is `POLLING`. + unsafe { + WrappedWaker::replace_waker(this.stream_waker, cx); + WrappedWaker::replace_waker(this.inner_streams_waker, cx) + }; + if poll_state_value & NEED_TO_POLL_STREAM != NONE { - // Safety: now state is `POLLING`. - let stream_waker = unsafe { InnerWaker::replace_waker(this.stream_waker, cx) }; + let mut stream_waker = None; // Here we need to poll the base stream. // @@ -424,15 +435,35 @@ where break; } else { - match this.stream.as_mut().poll_next(&mut Context::from_waker(&stream_waker)) { - Poll::Ready(Some(inner_stream)) => { + let mut cx = Context::from_waker( + stream_waker.get_or_insert_with(|| waker(this.stream_waker.clone())), + ); + + match this.stream.as_mut().poll_next(&mut cx) { + Poll::Ready(Some(item)) => { + let next_item_fut = match Fc::next_step(item) { + // Propagates an item immediately (the main use-case is for errors) + FlowStep::Return(item) => { + need_to_poll_next |= NEED_TO_POLL_STREAM + | (poll_state_value & NEED_TO_POLL_INNER_STREAMS); + poll_state_value &= !NEED_TO_POLL_INNER_STREAMS; + + next_item = Some(item); + + break; + } + // Yields an item and continues processing (normal case) + FlowStep::Continue(inner_stream) => { + PollStreamFut::new(inner_stream) + } + }; // Add new stream to the inner streams bucket - this.inner_streams.as_mut().push(PollStreamFut::new(inner_stream)); + this.inner_streams.as_mut().push(next_item_fut); // Inner streams must be polled afterward poll_state_value |= NEED_TO_POLL_INNER_STREAMS; } Poll::Ready(None) => { - // Mark the stream as done + // Mark the base stream as done *this.is_stream_done = true; } Poll::Pending => { @@ -444,15 +475,10 @@ where } if poll_state_value & NEED_TO_POLL_INNER_STREAMS != NONE { - // Safety: now state is `POLLING`. - let inner_streams_waker = - unsafe { InnerWaker::replace_waker(this.inner_streams_waker, cx) }; - - match this - .inner_streams - .as_mut() - .poll_next(&mut Context::from_waker(&inner_streams_waker)) - { + let inner_streams_waker = waker(this.inner_streams_waker.clone()); + let mut cx = Context::from_waker(&inner_streams_waker); + + match this.inner_streams.as_mut().poll_next(&mut cx) { Poll::Ready(Some(Some((item, next_item_fut)))) => { // Push next inner stream item future to the list of inner streams futures this.inner_streams.as_mut().push(next_item_fut); @@ -472,15 +498,16 @@ where // We didn't have any `poll_next` panic, so it's time to deactivate the bomb state_bomb.deactivate(); + // Call the waker at the end of polling if let mut force_wake = // we need to poll the stream and didn't reach the limit yet need_to_poll_next & NEED_TO_POLL_STREAM != NONE && !this.is_exceeded_limit() - // or we need to poll inner streams again + // or we need to poll the inner streams again || need_to_poll_next & NEED_TO_POLL_INNER_STREAMS != NONE; // Stop polling and swap the latest state poll_state_value = this.poll_state.stop_polling(need_to_poll_next, force_wake); - // If state was changed during `POLLING` phase, need to manually call a waker + // If state was changed during `POLLING` phase, we also need to manually call a waker force_wake |= poll_state_value & NEED_TO_POLL_ALL != NONE; let is_done = *this.is_stream_done && this.inner_streams.is_empty(); @@ -499,7 +526,7 @@ where // Forwarding impl of Sink from the underlying stream #[cfg(feature = "sink")] -impl Sink for FlattenUnordered +impl Sink for FlattenUnorderedWithFlowController where St: Stream + Sink, { diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index bb5e249..2da7036 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -181,32 +181,32 @@ mod scan; #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::scan::Scan; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] mod buffer_unordered; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::buffer_unordered::BufferUnordered; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] mod buffered; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::buffered::Buffered; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] -mod flatten_unordered; +pub(crate) mod flatten_unordered; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] #[allow(unreachable_pub)] pub use self::flatten_unordered::FlattenUnordered; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] delegate_all!( /// Stream for the [`flat_map_unordered`](StreamExt::flat_map_unordered) method. @@ -216,20 +216,20 @@ delegate_all!( where St: Stream, U: Stream, U: Unpin, F: FnMut(St::Item) -> U ); -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] mod for_each_concurrent; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::for_each_concurrent::ForEachConcurrent; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "sink")] #[cfg_attr(docsrs, doc(cfg(feature = "sink")))] #[cfg(feature = "alloc")] mod split; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "sink")] #[cfg_attr(docsrs, doc(cfg(feature = "sink")))] #[cfg(feature = "alloc")] @@ -323,6 +323,9 @@ pub trait StreamExt: Stream { /// wrapped version of it, similar to the existing `map` methods in the /// standard library. /// + /// See [`StreamExt::then`](Self::then) if you want to use a closure that + /// returns a future instead of a value. + /// /// # Examples /// /// ``` @@ -467,6 +470,9 @@ pub trait StreamExt: Stream { /// Note that this function consumes the stream passed into it and returns a /// wrapped version of it. /// + /// See [`StreamExt::map`](Self::map) if you want to use a closure that + /// returns a value instead of a future. + /// /// # Examples /// /// ``` @@ -774,7 +780,14 @@ pub trait StreamExt: Stream { } /// Flattens a stream of streams into just one continuous stream. Polls - /// inner streams concurrently. + /// inner streams produced by the base stream concurrently. + /// + /// The only argument is an optional limit on the number of concurrently + /// polled streams. If this limit is not `None`, no more than `limit` streams + /// will be polled at the same time. The `limit` argument is of type + /// `Into>`, and so can be provided as either `None`, + /// `Some(10)`, or just `10`. Note: a limit of zero is interpreted as + /// no limit at all, and will have the same result as passing in `None`. /// /// # Examples /// @@ -807,14 +820,14 @@ pub trait StreamExt: Stream { /// assert_eq!(output, vec![1, 2, 3, 4]); /// # }); /// ``` - #[cfg(not(futures_no_atomic_cas))] + #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] fn flatten_unordered(self, limit: impl Into>) -> FlattenUnordered where Self::Item: Stream + Unpin, Self: Sized, { - FlattenUnordered::new(self, limit.into()) + assert_stream::<::Item, _>(FlattenUnordered::new(self, limit.into())) } /// Maps a stream like [`StreamExt::map`] but flattens nested `Stream`s. @@ -863,7 +876,7 @@ pub trait StreamExt: Stream { /// /// The first argument is an optional limit on the number of concurrently /// polled streams. If this limit is not `None`, no more than `limit` streams - /// will be polled concurrently. The `limit` argument is of type + /// will be polled at the same time. The `limit` argument is of type /// `Into>`, and so can be provided as either `None`, /// `Some(10)`, or just `10`. Note: a limit of zero is interpreted as /// no limit at all, and will have the same result as passing in `None`. @@ -889,7 +902,7 @@ pub trait StreamExt: Stream { /// assert_eq!(vec![1usize, 2, 2, 3, 3, 3, 4, 4, 4, 4], values); /// # }); /// ``` - #[cfg(not(futures_no_atomic_cas))] + #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] fn flat_map_unordered( self, @@ -901,7 +914,7 @@ pub trait StreamExt: Stream { F: FnMut(Self::Item) -> U, Self: Sized, { - FlatMapUnordered::new(self, limit.into(), f) + assert_stream::(FlatMapUnordered::new(self, limit.into(), f)) } /// Combinator similar to [`StreamExt::fold`] that holds internal state @@ -1129,7 +1142,7 @@ pub trait StreamExt: Stream { /// fut.await; /// # }) /// ``` - #[cfg(not(futures_no_atomic_cas))] + #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] fn for_each_concurrent( self, @@ -1352,7 +1365,7 @@ pub trait StreamExt: Stream { /// /// This method is only available when the `std` or `alloc` feature of this /// library is activated, and it is activated by default. - #[cfg(not(futures_no_atomic_cas))] + #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] fn buffered(self, n: usize) -> Buffered where @@ -1397,7 +1410,7 @@ pub trait StreamExt: Stream { /// assert_eq!(buffered.next().await, None); /// # Ok::<(), i32>(()) }).unwrap(); /// ``` - #[cfg(not(futures_no_atomic_cas))] + #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] fn buffer_unordered(self, n: usize) -> BufferUnordered where @@ -1564,7 +1577,7 @@ pub trait StreamExt: Stream { /// library is activated, and it is activated by default. #[cfg(feature = "sink")] #[cfg_attr(docsrs, doc(cfg(feature = "sink")))] - #[cfg(not(futures_no_atomic_cas))] + #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] fn split(self) -> (SplitSink, SplitStream) where diff --git a/src/stream/stream/split.rs b/src/stream/stream/split.rs index e2034e0..1a7fdcb 100644 --- a/src/stream/stream/split.rs +++ b/src/stream/stream/split.rs @@ -15,6 +15,13 @@ pub struct SplitStream(BiLock); impl Unpin for SplitStream {} +impl SplitStream { + /// Returns `true` if the `SplitStream` and `SplitSink` originate from the same call to `StreamExt::split`. + pub fn is_pair_of(&self, other: &SplitSink) -> bool { + other.is_pair_of(&self) + } +} + impl SplitStream { /// Attempts to put the two "halves" of a split `Stream + Sink` back /// together. Succeeds only if the `SplitStream` and `SplitSink` are @@ -60,6 +67,13 @@ impl + Unpin, Item> SplitSink { } } +impl SplitSink { + /// Returns `true` if the `SplitStream` and `SplitSink` originate from the same call to `StreamExt::split`. + pub fn is_pair_of(&self, other: &SplitStream) -> bool { + self.lock.is_pair_of(&other.0) + } +} + impl, Item> SplitSink { fn poll_flush_slot( mut inner: Pin<&mut S>, @@ -142,3 +156,69 @@ impl fmt::Display for ReuniteError { #[cfg(feature = "std")] impl std::error::Error for ReuniteError {} + +#[cfg(test)] +mod tests { + use super::*; + use crate::{sink::Sink, stream::StreamExt}; + use core::marker::PhantomData; + + struct NopStream { + phantom: PhantomData, + } + + impl Stream for NopStream { + type Item = Item; + + fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + todo!() + } + } + + impl Sink for NopStream { + type Error = (); + + fn poll_ready( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll> { + todo!() + } + + fn start_send(self: Pin<&mut Self>, _item: Item) -> Result<(), Self::Error> { + todo!() + } + + fn poll_flush( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll> { + todo!() + } + + fn poll_close( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll> { + todo!() + } + } + + #[test] + fn test_pairing() { + let s1 = NopStream::<()> { phantom: PhantomData }; + let (sink1, stream1) = s1.split(); + assert!(sink1.is_pair_of(&stream1)); + assert!(stream1.is_pair_of(&sink1)); + + let s2 = NopStream::<()> { phantom: PhantomData }; + let (sink2, stream2) = s2.split(); + assert!(sink2.is_pair_of(&stream2)); + assert!(stream2.is_pair_of(&sink2)); + + assert!(!sink1.is_pair_of(&stream2)); + assert!(!stream1.is_pair_of(&sink2)); + assert!(!sink2.is_pair_of(&stream1)); + assert!(!stream2.is_pair_of(&sink1)); + } +} diff --git a/src/stream/try_stream/mod.rs b/src/stream/try_stream/mod.rs index bc4c6e4..7b55444 100644 --- a/src/stream/try_stream/mod.rs +++ b/src/stream/try_stream/mod.rs @@ -15,6 +15,7 @@ use crate::stream::{Inspect, Map}; #[cfg(feature = "alloc")] use alloc::vec::Vec; use core::pin::Pin; + use futures_core::{ future::{Future, TryFuture}, stream::TryStream, @@ -88,6 +89,14 @@ mod try_flatten; #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::try_flatten::TryFlatten; +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] +#[cfg(feature = "alloc")] +mod try_flatten_unordered; +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] +#[cfg(feature = "alloc")] +#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 +pub use self::try_flatten_unordered::TryFlattenUnordered; + mod try_collect; #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::try_collect::TryCollect; @@ -102,6 +111,12 @@ mod try_chunks; #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::try_chunks::{TryChunks, TryChunksError}; +#[cfg(feature = "alloc")] +mod try_ready_chunks; +#[cfg(feature = "alloc")] +#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 +pub use self::try_ready_chunks::{TryReadyChunks, TryReadyChunksError}; + mod try_fold; #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::try_fold::TryFold; @@ -118,26 +133,26 @@ mod try_take_while; #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::try_take_while::TryTakeWhile; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] mod try_buffer_unordered; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::try_buffer_unordered::TryBufferUnordered; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] mod try_buffered; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::try_buffered::TryBuffered; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] mod try_for_each_concurrent; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::try_for_each_concurrent::TryForEachConcurrent; @@ -151,6 +166,14 @@ mod into_async_read; #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::into_async_read::IntoAsyncRead; +mod try_all; +#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 +pub use self::try_all::TryAll; + +mod try_any; +#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 +pub use self::try_any::TryAny; + impl TryStreamExt for S {} /// Adapters specific to `Result`-returning streams @@ -528,7 +551,7 @@ pub trait TryStreamExt: TryStream { /// assert_eq!(Err(oneshot::Canceled), fut.await); /// # }) /// ``` - #[cfg(not(futures_no_atomic_cas))] + #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] fn try_for_each_concurrent( self, @@ -631,6 +654,55 @@ pub trait TryStreamExt: TryStream { ) } + /// An adaptor for chunking up successful, ready items of the stream inside a vector. + /// + /// This combinator will attempt to pull successful items from this stream and buffer + /// them into a local vector. At most `capacity` items will get buffered + /// before they're yielded from the returned stream. If the underlying stream + /// returns `Poll::Pending`, and the collected chunk is not empty, it will + /// be immidiatly returned. + /// + /// Note that the vectors returned from this iterator may not always have + /// `capacity` elements. If the underlying stream ended and only a partial + /// vector was created, it'll be returned. Additionally if an error happens + /// from the underlying stream then the currently buffered items will be + /// yielded. + /// + /// This method is only available when the `std` or `alloc` feature of this + /// library is activated, and it is activated by default. + /// + /// This function is similar to + /// [`StreamExt::ready_chunks`](crate::stream::StreamExt::ready_chunks) but exits + /// early if an error occurs. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::stream::{self, TryReadyChunksError, TryStreamExt}; + /// + /// let stream = stream::iter(vec![Ok::(1), Ok(2), Ok(3), Err(4), Ok(5), Ok(6)]); + /// let mut stream = stream.try_ready_chunks(2); + /// + /// assert_eq!(stream.try_next().await, Ok(Some(vec![1, 2]))); + /// assert_eq!(stream.try_next().await, Err(TryReadyChunksError(vec![3], 4))); + /// assert_eq!(stream.try_next().await, Ok(Some(vec![5, 6]))); + /// # }) + /// ``` + /// + /// # Panics + /// + /// This method will panic if `capacity` is zero. + #[cfg(feature = "alloc")] + fn try_ready_chunks(self, capacity: usize) -> TryReadyChunks + where + Self: Sized, + { + assert_stream::, TryReadyChunksError>, _>( + TryReadyChunks::new(self, capacity), + ) + } + /// Attempt to filter the values produced by this stream according to the /// provided asynchronous closure. /// @@ -711,6 +783,63 @@ pub trait TryStreamExt: TryStream { assert_stream::, _>(TryFilterMap::new(self, f)) } + /// Flattens a stream of streams into just one continuous stream. Produced streams + /// will be polled concurrently and any errors will be passed through without looking at them. + /// If the underlying base stream returns an error, it will be **immediately** propagated. + /// + /// The only argument is an optional limit on the number of concurrently + /// polled streams. If this limit is not `None`, no more than `limit` streams + /// will be polled at the same time. The `limit` argument is of type + /// `Into>`, and so can be provided as either `None`, + /// `Some(10)`, or just `10`. Note: a limit of zero is interpreted as + /// no limit at all, and will have the same result as passing in `None`. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::channel::mpsc; + /// use futures::stream::{StreamExt, TryStreamExt}; + /// use std::thread; + /// + /// let (tx1, rx1) = mpsc::unbounded(); + /// let (tx2, rx2) = mpsc::unbounded(); + /// let (tx3, rx3) = mpsc::unbounded(); + /// + /// thread::spawn(move || { + /// tx1.unbounded_send(Ok(1)).unwrap(); + /// }); + /// thread::spawn(move || { + /// tx2.unbounded_send(Ok(2)).unwrap(); + /// tx2.unbounded_send(Err(3)).unwrap(); + /// tx2.unbounded_send(Ok(4)).unwrap(); + /// }); + /// thread::spawn(move || { + /// tx3.unbounded_send(Ok(rx1)).unwrap(); + /// tx3.unbounded_send(Ok(rx2)).unwrap(); + /// tx3.unbounded_send(Err(5)).unwrap(); + /// }); + /// + /// let stream = rx3.try_flatten_unordered(None); + /// let mut values: Vec<_> = stream.collect().await; + /// values.sort(); + /// + /// assert_eq!(values, vec![Ok(1), Ok(2), Ok(4), Err(3), Err(5)]); + /// # }); + /// ``` + #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] + #[cfg(feature = "alloc")] + fn try_flatten_unordered(self, limit: impl Into>) -> TryFlattenUnordered + where + Self::Ok: TryStream + Unpin, + ::Error: From, + Self: Sized, + { + assert_stream::::Ok, ::Error>, _>( + TryFlattenUnordered::new(self, limit), + ) + } + /// Flattens a stream of streams into just one continuous stream. /// /// If this stream's elements are themselves streams then this combinator @@ -900,7 +1029,7 @@ pub trait TryStreamExt: TryStream { /// assert_eq!(buffered.next().await, Some(Err("error in the stream"))); /// # Ok::<(), Box>(()) }).unwrap(); /// ``` - #[cfg(not(futures_no_atomic_cas))] + #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] fn try_buffer_unordered(self, n: usize) -> TryBufferUnordered where @@ -976,7 +1105,7 @@ pub trait TryStreamExt: TryStream { /// assert_eq!(buffered.next().await, Some(Err("error in the stream"))); /// # Ok::<(), Box>(()) }).unwrap(); /// ``` - #[cfg(not(futures_no_atomic_cas))] + #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] fn try_buffered(self, n: usize) -> TryBuffered where @@ -1061,4 +1190,62 @@ pub trait TryStreamExt: TryStream { { crate::io::assert_read(IntoAsyncRead::new(self)) } + + /// Attempt to execute a predicate over an asynchronous stream and evaluate if all items + /// satisfy the predicate. Exits early if an `Err` is encountered or if an `Ok` item is found + /// that does not satisfy the predicate. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::stream::{self, StreamExt, TryStreamExt}; + /// use std::convert::Infallible; + /// + /// let number_stream = stream::iter(1..10).map(Ok::<_, Infallible>); + /// let positive = number_stream.try_all(|i| async move { i > 0 }); + /// assert_eq!(positive.await, Ok(true)); + /// + /// let stream_with_errors = stream::iter([Ok(1), Err("err"), Ok(3)]); + /// let positive = stream_with_errors.try_all(|i| async move { i > 0 }); + /// assert_eq!(positive.await, Err("err")); + /// # }); + /// ``` + fn try_all(self, f: F) -> TryAll + where + Self: Sized, + F: FnMut(Self::Ok) -> Fut, + Fut: Future, + { + assert_future::, _>(TryAll::new(self, f)) + } + + /// Attempt to execute a predicate over an asynchronous stream and evaluate if any items + /// satisfy the predicate. Exits early if an `Err` is encountered or if an `Ok` item is found + /// that satisfies the predicate. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::stream::{self, StreamExt, TryStreamExt}; + /// use std::convert::Infallible; + /// + /// let number_stream = stream::iter(0..10).map(Ok::<_, Infallible>); + /// let contain_three = number_stream.try_any(|i| async move { i == 3 }); + /// assert_eq!(contain_three.await, Ok(true)); + /// + /// let stream_with_errors = stream::iter([Ok(1), Err("err"), Ok(3)]); + /// let contain_three = stream_with_errors.try_any(|i| async move { i == 3 }); + /// assert_eq!(contain_three.await, Err("err")); + /// # }); + /// ``` + fn try_any(self, f: F) -> TryAny + where + Self: Sized, + F: FnMut(Self::Ok) -> Fut, + Fut: Future, + { + assert_future::, _>(TryAny::new(self, f)) + } } diff --git a/src/stream/try_stream/try_all.rs b/src/stream/try_stream/try_all.rs new file mode 100644 index 0000000..8179f86 --- /dev/null +++ b/src/stream/try_stream/try_all.rs @@ -0,0 +1,98 @@ +use core::fmt; +use core::pin::Pin; +use futures_core::future::{FusedFuture, Future}; +use futures_core::ready; +use futures_core::stream::TryStream; +use futures_core::task::{Context, Poll}; +use pin_project_lite::pin_project; + +pin_project! { + /// Future for the [`try_all`](super::TryStreamExt::try_all) method. + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct TryAll { + #[pin] + stream: St, + f: F, + done: bool, + #[pin] + future: Option, + } +} + +impl fmt::Debug for TryAll +where + St: fmt::Debug, + Fut: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("TryAll") + .field("stream", &self.stream) + .field("done", &self.done) + .field("future", &self.future) + .finish() + } +} + +impl TryAll +where + St: TryStream, + F: FnMut(St::Ok) -> Fut, + Fut: Future, +{ + pub(super) fn new(stream: St, f: F) -> Self { + Self { stream, f, done: false, future: None } + } +} + +impl FusedFuture for TryAll +where + St: TryStream, + F: FnMut(St::Ok) -> Fut, + Fut: Future, +{ + fn is_terminated(&self) -> bool { + self.done && self.future.is_none() + } +} + +impl Future for TryAll +where + St: TryStream, + F: FnMut(St::Ok) -> Fut, + Fut: Future, +{ + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = self.project(); + + Poll::Ready(loop { + if let Some(fut) = this.future.as_mut().as_pin_mut() { + // we're currently processing a future to produce a new value + let acc = ready!(fut.poll(cx)); + this.future.set(None); + if !acc { + *this.done = true; + break Ok(false); + } // early exit + } else if !*this.done { + // we're waiting on a new item from the stream + match ready!(this.stream.as_mut().try_poll_next(cx)) { + Some(Ok(item)) => { + this.future.set(Some((this.f)(item))); + } + Some(Err(err)) => { + *this.done = true; + break Err(err); + } + None => { + *this.done = true; + break Ok(true); + } + } + } else { + panic!("TryAll polled after completion") + } + }) + } +} diff --git a/src/stream/try_stream/try_any.rs b/src/stream/try_stream/try_any.rs new file mode 100644 index 0000000..55e876b --- /dev/null +++ b/src/stream/try_stream/try_any.rs @@ -0,0 +1,98 @@ +use core::fmt; +use core::pin::Pin; +use futures_core::future::{FusedFuture, Future}; +use futures_core::ready; +use futures_core::stream::TryStream; +use futures_core::task::{Context, Poll}; +use pin_project_lite::pin_project; + +pin_project! { + /// Future for the [`try_any`](super::TryStreamExt::try_any) method. + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct TryAny { + #[pin] + stream: St, + f: F, + done: bool, + #[pin] + future: Option, + } +} + +impl fmt::Debug for TryAny +where + St: fmt::Debug, + Fut: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("TryAny") + .field("stream", &self.stream) + .field("done", &self.done) + .field("future", &self.future) + .finish() + } +} + +impl TryAny +where + St: TryStream, + F: FnMut(St::Ok) -> Fut, + Fut: Future, +{ + pub(super) fn new(stream: St, f: F) -> Self { + Self { stream, f, done: false, future: None } + } +} + +impl FusedFuture for TryAny +where + St: TryStream, + F: FnMut(St::Ok) -> Fut, + Fut: Future, +{ + fn is_terminated(&self) -> bool { + self.done && self.future.is_none() + } +} + +impl Future for TryAny +where + St: TryStream, + F: FnMut(St::Ok) -> Fut, + Fut: Future, +{ + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = self.project(); + + Poll::Ready(loop { + if let Some(fut) = this.future.as_mut().as_pin_mut() { + // we're currently processing a future to produce a new value + let acc = ready!(fut.poll(cx)); + this.future.set(None); + if acc { + *this.done = true; + break Ok(true); + } // early exit + } else if !*this.done { + // we're waiting on a new item from the stream + match ready!(this.stream.as_mut().try_poll_next(cx)) { + Some(Ok(item)) => { + this.future.set(Some((this.f)(item))); + } + Some(Err(err)) => { + *this.done = true; + break Err(err); + } + None => { + *this.done = true; + break Ok(false); + } + } + } else { + panic!("TryAny polled after completion") + } + }) + } +} diff --git a/src/stream/try_stream/try_chunks.rs b/src/stream/try_stream/try_chunks.rs index 3bb253a..ec53f4b 100644 --- a/src/stream/try_stream/try_chunks.rs +++ b/src/stream/try_stream/try_chunks.rs @@ -41,9 +41,10 @@ impl TryChunks { delegate_access_inner!(stream, St, (. .)); } +type TryChunksStreamError = TryChunksError<::Ok, ::Error>; + impl Stream for TryChunks { - #[allow(clippy::type_complexity)] - type Item = Result, TryChunksError>; + type Item = Result, TryChunksStreamError>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut this = self.as_mut().project(); diff --git a/src/stream/try_stream/try_flatten_unordered.rs b/src/stream/try_stream/try_flatten_unordered.rs new file mode 100644 index 0000000..a74dfc4 --- /dev/null +++ b/src/stream/try_stream/try_flatten_unordered.rs @@ -0,0 +1,176 @@ +use core::marker::PhantomData; +use core::pin::Pin; + +use futures_core::ready; +use futures_core::stream::{FusedStream, Stream, TryStream}; +use futures_core::task::{Context, Poll}; +#[cfg(feature = "sink")] +use futures_sink::Sink; + +use pin_project_lite::pin_project; + +use crate::future::Either; +use crate::stream::stream::flatten_unordered::{ + FlattenUnorderedWithFlowController, FlowController, FlowStep, +}; +use crate::stream::IntoStream; +use crate::TryStreamExt; + +delegate_all!( + /// Stream for the [`try_flatten_unordered`](super::TryStreamExt::try_flatten_unordered) method. + TryFlattenUnordered( + FlattenUnorderedWithFlowController, PropagateBaseStreamError> + ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + + New[ + |stream: St, limit: impl Into>| + FlattenUnorderedWithFlowController::new( + NestedTryStreamIntoEitherTryStream::new(stream), + limit.into() + ) + ] + where + St: TryStream, + St::Ok: TryStream, + St::Ok: Unpin, + ::Error: From +); + +pin_project! { + /// Emits either successful streams or single-item streams containing the underlying errors. + /// This's a wrapper for `FlattenUnordered` to reuse its logic over `TryStream`. + #[derive(Debug)] + #[must_use = "streams do nothing unless polled"] + pub struct NestedTryStreamIntoEitherTryStream + where + St: TryStream, + St::Ok: TryStream, + St::Ok: Unpin, + ::Error: From + { + #[pin] + stream: St + } +} + +impl NestedTryStreamIntoEitherTryStream +where + St: TryStream, + St::Ok: TryStream + Unpin, + ::Error: From, +{ + fn new(stream: St) -> Self { + Self { stream } + } + + delegate_access_inner!(stream, St, ()); +} + +/// Emits a single item immediately, then stream will be terminated. +#[derive(Debug, Clone)] +pub struct Single(Option); + +impl Single { + /// Constructs new `Single` with the given value. + fn new(val: T) -> Self { + Self(Some(val)) + } + + /// Attempts to take inner item immediately. Will always succeed if the stream isn't terminated. + fn next_immediate(&mut self) -> Option { + self.0.take() + } +} + +impl Unpin for Single {} + +impl Stream for Single { + type Item = T; + + fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + Poll::Ready(self.0.take()) + } + + fn size_hint(&self) -> (usize, Option) { + self.0.as_ref().map_or((0, Some(0)), |_| (1, Some(1))) + } +} + +/// Immediately propagates errors occurred in the base stream. +#[derive(Debug, Clone, Copy)] +pub struct PropagateBaseStreamError(PhantomData); + +type BaseStreamItem = as Stream>::Item; +type InnerStreamItem = as Stream>::Item; + +impl FlowController, InnerStreamItem> for PropagateBaseStreamError +where + St: TryStream, + St::Ok: TryStream + Unpin, + ::Error: From, +{ + fn next_step(item: BaseStreamItem) -> FlowStep, InnerStreamItem> { + match item { + // A new successful inner stream received + st @ Either::Left(_) => FlowStep::Continue(st), + // An error encountered + Either::Right(mut err) => FlowStep::Return(err.next_immediate().unwrap()), + } + } +} + +type SingleStreamResult = Single::Ok, ::Error>>; + +impl Stream for NestedTryStreamIntoEitherTryStream +where + St: TryStream, + St::Ok: TryStream + Unpin, + ::Error: From, +{ + // Item is either an inner stream or a stream containing a single error. + // This will allow using `Either`'s `Stream` implementation as both branches are actually streams of `Result`'s. + type Item = Either, SingleStreamResult>; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let item = ready!(self.project().stream.try_poll_next(cx)); + + let out = match item { + Some(res) => match res { + // Emit successful inner stream as is + Ok(stream) => Either::Left(stream.into_stream()), + // Wrap an error into a stream containing a single item + err @ Err(_) => { + let res = err.map(|_: St::Ok| unreachable!()).map_err(Into::into); + + Either::Right(Single::new(res)) + } + }, + None => return Poll::Ready(None), + }; + + Poll::Ready(Some(out)) + } +} + +impl FusedStream for NestedTryStreamIntoEitherTryStream +where + St: TryStream + FusedStream, + St::Ok: TryStream + Unpin, + ::Error: From, +{ + fn is_terminated(&self) -> bool { + self.stream.is_terminated() + } +} + +// Forwarding impl of Sink from the underlying stream +#[cfg(feature = "sink")] +impl Sink for NestedTryStreamIntoEitherTryStream +where + St: TryStream + Sink, + St::Ok: TryStream + Unpin, + ::Error: From<::Error>, +{ + type Error = >::Error; + + delegate_sink!(stream, Item); +} diff --git a/src/stream/try_stream/try_ready_chunks.rs b/src/stream/try_stream/try_ready_chunks.rs new file mode 100644 index 0000000..8b1470e --- /dev/null +++ b/src/stream/try_stream/try_ready_chunks.rs @@ -0,0 +1,126 @@ +use crate::stream::{Fuse, IntoStream, StreamExt}; + +use alloc::vec::Vec; +use core::fmt; +use core::pin::Pin; +use futures_core::stream::{FusedStream, Stream, TryStream}; +use futures_core::task::{Context, Poll}; +#[cfg(feature = "sink")] +use futures_sink::Sink; +use pin_project_lite::pin_project; + +pin_project! { + /// Stream for the [`try_ready_chunks`](super::TryStreamExt::try_ready_chunks) method. + #[derive(Debug)] + #[must_use = "streams do nothing unless polled"] + pub struct TryReadyChunks { + #[pin] + stream: Fuse>, + cap: usize, // https://github.com/rust-lang/futures-rs/issues/1475 + } +} + +impl TryReadyChunks { + pub(super) fn new(stream: St, capacity: usize) -> Self { + assert!(capacity > 0); + + Self { stream: IntoStream::new(stream).fuse(), cap: capacity } + } + + delegate_access_inner!(stream, St, (. .)); +} + +type TryReadyChunksStreamError = + TryReadyChunksError<::Ok, ::Error>; + +impl Stream for TryReadyChunks { + type Item = Result, TryReadyChunksStreamError>; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = self.as_mut().project(); + + let mut items: Vec = Vec::new(); + + loop { + match this.stream.as_mut().poll_next(cx) { + // Flush all the collected data if the underlying stream doesn't + // contain more ready values + Poll::Pending => { + return if items.is_empty() { + Poll::Pending + } else { + Poll::Ready(Some(Ok(items))) + } + } + + // Push the ready item into the buffer and check whether it is full. + // If so, return the buffer. + Poll::Ready(Some(Ok(item))) => { + if items.is_empty() { + items.reserve_exact(*this.cap); + } + items.push(item); + if items.len() >= *this.cap { + return Poll::Ready(Some(Ok(items))); + } + } + + // Return the already collected items and the error. + Poll::Ready(Some(Err(e))) => { + return Poll::Ready(Some(Err(TryReadyChunksError(items, e)))); + } + + // Since the underlying stream ran out of values, return what we + // have buffered, if we have anything. + Poll::Ready(None) => { + let last = if items.is_empty() { None } else { Some(Ok(items)) }; + return Poll::Ready(last); + } + } + } + } + + fn size_hint(&self) -> (usize, Option) { + let (lower, upper) = self.stream.size_hint(); + let lower = lower / self.cap; + (lower, upper) + } +} + +impl FusedStream for TryReadyChunks { + fn is_terminated(&self) -> bool { + self.stream.is_terminated() + } +} + +// Forwarding impl of Sink from the underlying stream +#[cfg(feature = "sink")] +impl Sink for TryReadyChunks +where + S: TryStream + Sink, +{ + type Error = >::Error; + + delegate_sink!(stream, Item); +} + +/// Error indicating, that while chunk was collected inner stream produced an error. +/// +/// Contains all items that were collected before an error occurred, and the stream error itself. +#[derive(PartialEq, Eq)] +pub struct TryReadyChunksError(pub Vec, pub E); + +impl fmt::Debug for TryReadyChunksError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.1.fmt(f) + } +} + +impl fmt::Display for TryReadyChunksError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.1.fmt(f) + } +} + +#[cfg(feature = "std")] +impl std::error::Error for TryReadyChunksError {} diff --git a/src/stream/unfold.rs b/src/stream/unfold.rs index 7d8ef6b..2f48ccc 100644 --- a/src/stream/unfold.rs +++ b/src/stream/unfold.rs @@ -36,7 +36,7 @@ use pin_project_lite::pin_project; /// let stream = stream::unfold(0, |state| async move { /// if state <= 2 { /// let next_state = state + 1; -/// let yielded = state * 2; +/// let yielded = state * 2; /// Some((yielded, next_state)) /// } else { /// None diff --git a/src/task/mod.rs b/src/task/mod.rs index 0a31eea..7a9e993 100644 --- a/src/task/mod.rs +++ b/src/task/mod.rs @@ -18,19 +18,22 @@ pub use futures_task::{FutureObj, LocalFutureObj, LocalSpawn, Spawn, SpawnError, pub use futures_task::noop_waker; pub use futures_task::noop_waker_ref; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] pub use futures_task::ArcWake; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] pub use futures_task::waker; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] pub use futures_task::{waker_ref, WakerRef}; -#[cfg(not(futures_no_atomic_cas))] +#[cfg_attr( + target_os = "none", + cfg(any(target_has_atomic = "ptr", feature = "portable-atomic")) +)] pub use futures_core::task::__internal::AtomicWaker; mod spawn; -- cgit v1.2.3