aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJeffrey Vander Stoep <jeffv@google.com>2022-12-12 16:28:56 +0000
committerAutomerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com>2022-12-12 16:28:56 +0000
commit6af6ebcb3d409c330a40ea3e6c1a85aa5d812865 (patch)
tree7ba0ba051758d1fc44bf179dda3fda51b245ca65
parent72602f9d88ababd4d05ea7a97bbb66cdc271d6ba (diff)
parentcaf7e3a063bea848e425e7bd76a4b5187697e3d2 (diff)
downloadfutures-util-6af6ebcb3d409c330a40ea3e6c1a85aa5d812865.tar.gz
Merge "Upgrade futures-util to 0.3.25" am: caf7e3a063
Original change: https://android-review.googlesource.com/c/platform/external/rust/crates/futures-util/+/2337754 Change-Id: Iaa67bf187f0324343bb7493b3e66e543f2ae1bb9 Signed-off-by: Automerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com>
-rw-r--r--.cargo_vcs_info.json2
-rw-r--r--Android.bp4
-rw-r--r--Cargo.toml17
-rw-r--r--Cargo.toml.orig16
-rw-r--r--METADATA12
-rw-r--r--benches/select.rs35
-rw-r--r--no_atomic_cas.rs4
-rw-r--r--src/abortable.rs8
-rw-r--r--src/future/future/shared.rs22
-rw-r--r--src/future/join_all.rs29
-rw-r--r--src/future/pending.rs1
-rw-r--r--src/future/select.rs19
-rw-r--r--src/future/select_all.rs2
-rw-r--r--src/future/select_ok.rs2
-rw-r--r--src/future/try_future/mod.rs6
-rw-r--r--src/future/try_join_all.rs123
-rw-r--r--src/io/copy_buf_abortable.rs124
-rw-r--r--src/io/lines.rs2
-rw-r--r--src/io/mod.rs3
-rw-r--r--src/io/read_exact.rs2
-rw-r--r--src/io/read_line.rs2
-rw-r--r--src/io/read_to_string.rs2
-rw-r--r--src/io/write_all.rs2
-rw-r--r--src/lock/bilock.rs3
-rw-r--r--src/lock/mod.rs22
-rw-r--r--src/lock/mutex.rs155
-rw-r--r--src/sink/drain.rs6
-rw-r--r--src/stream/futures_ordered.rs32
-rw-r--r--src/stream/futures_unordered/iter.rs4
-rw-r--r--src/stream/futures_unordered/mod.rs1
-rw-r--r--src/stream/select_with_strategy.rs143
-rw-r--r--src/stream/stream/buffer_unordered.rs6
-rw-r--r--src/stream/stream/buffered.rs2
-rw-r--r--src/stream/stream/chunks.rs11
-rw-r--r--src/stream/stream/collect.rs2
-rw-r--r--src/stream/stream/filter.rs2
-rw-r--r--src/stream/stream/filter_map.rs2
-rw-r--r--src/stream/stream/mod.rs2
-rw-r--r--src/stream/stream/peek.rs2
-rw-r--r--src/stream/stream/ready_chunks.rs11
-rw-r--r--src/stream/stream/skip_while.rs2
-rw-r--r--src/stream/stream/split.rs2
-rw-r--r--src/stream/stream/take.rs6
-rw-r--r--src/stream/stream/take_while.rs2
-rw-r--r--src/stream/stream/then.rs2
-rw-r--r--src/stream/stream/unzip.rs2
-rw-r--r--src/stream/stream/zip.rs4
-rw-r--r--src/stream/try_stream/and_then.rs2
-rw-r--r--src/stream/try_stream/into_async_read.rs101
-rw-r--r--src/stream/try_stream/mod.rs19
-rw-r--r--src/stream/try_stream/or_else.rs2
-rw-r--r--src/stream/try_stream/try_buffered.rs2
-rw-r--r--src/stream/try_stream/try_chunks.rs6
-rw-r--r--src/stream/try_stream/try_collect.rs2
-rw-r--r--src/stream/try_stream/try_filter.rs2
-rw-r--r--src/stream/try_stream/try_filter_map.rs2
-rw-r--r--src/stream/try_stream/try_skip_while.rs2
-rw-r--r--src/stream/try_stream/try_take_while.rs2
-rw-r--r--src/task/spawn.rs9
59 files changed, 751 insertions, 265 deletions
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json
index b52386f..bd04ae0 100644
--- a/.cargo_vcs_info.json
+++ b/.cargo_vcs_info.json
@@ -1,6 +1,6 @@
{
"git": {
- "sha1": "fc1e3250219170e31cddb8857a276cba7dd08d44"
+ "sha1": "77d82198c5afd04af3e760a6aa50b7e875289fc3"
},
"path_in_vcs": "futures-util"
} \ No newline at end of file
diff --git a/Android.bp b/Android.bp
index a284541..8b06222 100644
--- a/Android.bp
+++ b/Android.bp
@@ -42,7 +42,7 @@ rust_test {
host_supported: true,
crate_name: "futures_util",
cargo_env_compat: true,
- cargo_pkg_version: "0.3.21",
+ cargo_pkg_version: "0.3.25",
srcs: ["src/lib.rs"],
test_suites: ["general-tests"],
auto_gen_config: true,
@@ -86,7 +86,7 @@ rust_library {
host_supported: true,
crate_name: "futures_util",
cargo_env_compat: true,
- cargo_pkg_version: "0.3.21",
+ cargo_pkg_version: "0.3.25",
srcs: ["src/lib.rs"],
edition: "2018",
features: [
diff --git a/Cargo.toml b/Cargo.toml
index a148319..907dbf1 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -13,11 +13,12 @@
edition = "2018"
rust-version = "1.45"
name = "futures-util"
-version = "0.3.21"
+version = "0.3.25"
description = """
Common utilities and extension traits for the futures-rs library.
"""
homepage = "https://rust-lang.github.io/futures-rs"
+readme = "README.md"
license = "MIT OR Apache-2.0"
repository = "https://github.com/rust-lang/futures-rs"
@@ -29,33 +30,33 @@ rustdoc-args = [
]
[dependencies.futures-channel]
-version = "0.3.21"
+version = "0.3.25"
features = ["std"]
optional = true
default-features = false
[dependencies.futures-core]
-version = "0.3.21"
+version = "0.3.25"
default-features = false
[dependencies.futures-io]
-version = "0.3.21"
+version = "0.3.25"
features = ["std"]
optional = true
default-features = false
[dependencies.futures-macro]
-version = "=0.3.21"
+version = "=0.3.25"
optional = true
default-features = false
[dependencies.futures-sink]
-version = "0.3.21"
+version = "0.3.25"
optional = true
default-features = false
[dependencies.futures-task]
-version = "0.3.21"
+version = "0.3.25"
default-features = false
[dependencies.futures_01]
@@ -68,7 +69,7 @@ version = "2.2"
optional = true
[dependencies.pin-project-lite]
-version = "0.2.4"
+version = "0.2.6"
[dependencies.pin-utils]
version = "0.1.0"
diff --git a/Cargo.toml.orig b/Cargo.toml.orig
index 46ec854..aeecf0f 100644
--- a/Cargo.toml.orig
+++ b/Cargo.toml.orig
@@ -1,6 +1,6 @@
[package]
name = "futures-util"
-version = "0.3.21"
+version = "0.3.25"
edition = "2018"
rust-version = "1.45"
license = "MIT OR Apache-2.0"
@@ -34,18 +34,18 @@ write-all-vectored = ["io"]
cfg-target-has-atomic = []
[dependencies]
-futures-core = { path = "../futures-core", version = "0.3.21", default-features = false }
-futures-task = { path = "../futures-task", version = "0.3.21", default-features = false }
-futures-channel = { path = "../futures-channel", version = "0.3.21", default-features = false, features = ["std"], optional = true }
-futures-io = { path = "../futures-io", version = "0.3.21", default-features = false, features = ["std"], optional = true }
-futures-sink = { path = "../futures-sink", version = "0.3.21", default-features = false, optional = true }
-futures-macro = { path = "../futures-macro", version = "=0.3.21", default-features = false, optional = true }
+futures-core = { path = "../futures-core", version = "0.3.25", default-features = false }
+futures-task = { path = "../futures-task", version = "0.3.25", default-features = false }
+futures-channel = { path = "../futures-channel", version = "0.3.25", default-features = false, features = ["std"], optional = true }
+futures-io = { path = "../futures-io", version = "0.3.25", default-features = false, features = ["std"], optional = true }
+futures-sink = { path = "../futures-sink", version = "0.3.25", default-features = false, optional = true }
+futures-macro = { path = "../futures-macro", version = "=0.3.25", default-features = false, optional = true }
slab = { version = "0.4.2", optional = true }
memchr = { version = "2.2", optional = true }
futures_01 = { version = "0.1.25", optional = true, package = "futures" }
tokio-io = { version = "0.1.9", optional = true }
pin-utils = "0.1.0"
-pin-project-lite = "0.2.4"
+pin-project-lite = "0.2.6"
[dev-dependencies]
futures = { path = "../futures", features = ["async-await", "thread-pool"] }
diff --git a/METADATA b/METADATA
index ed41eb9..99c8fb7 100644
--- a/METADATA
+++ b/METADATA
@@ -1,3 +1,7 @@
+# This project was upgraded with external_updater.
+# Usage: tools/external_updater/updater.sh update rust/crates/futures-util
+# For more info, check https://cs.android.com/android/platform/superproject/+/master:tools/external_updater/README.md
+
name: "futures-util"
description: "Common utilities and extension traits for the futures-rs library."
third_party {
@@ -7,13 +11,13 @@ third_party {
}
url {
type: ARCHIVE
- value: "https://static.crates.io/crates/futures-util/futures-util-0.3.21.crate"
+ value: "https://static.crates.io/crates/futures-util/futures-util-0.3.25.crate"
}
- version: "0.3.21"
+ version: "0.3.25"
license_type: NOTICE
last_upgrade_date {
year: 2022
- month: 3
- day: 1
+ month: 12
+ day: 12
}
}
diff --git a/benches/select.rs b/benches/select.rs
new file mode 100644
index 0000000..5410a95
--- /dev/null
+++ b/benches/select.rs
@@ -0,0 +1,35 @@
+#![feature(test)]
+
+extern crate test;
+use crate::test::Bencher;
+
+use futures::executor::block_on;
+use futures::stream::{repeat, select, StreamExt};
+
+#[bench]
+fn select_streams(b: &mut Bencher) {
+ const STREAM_COUNT: usize = 10_000;
+
+ b.iter(|| {
+ let stream1 = repeat(1).take(STREAM_COUNT);
+ let stream2 = repeat(2).take(STREAM_COUNT);
+ let stream3 = repeat(3).take(STREAM_COUNT);
+ let stream4 = repeat(4).take(STREAM_COUNT);
+ let stream5 = repeat(5).take(STREAM_COUNT);
+ let stream6 = repeat(6).take(STREAM_COUNT);
+ let stream7 = repeat(7).take(STREAM_COUNT);
+ let count = block_on(async {
+ let count = select(
+ stream1,
+ select(
+ stream2,
+ select(stream3, select(stream4, select(stream5, select(stream6, stream7)))),
+ ),
+ )
+ .count()
+ .await;
+ count
+ });
+ assert_eq!(count, STREAM_COUNT * 7);
+ });
+}
diff --git a/no_atomic_cas.rs b/no_atomic_cas.rs
index 9b05d4b..16ec628 100644
--- a/no_atomic_cas.rs
+++ b/no_atomic_cas.rs
@@ -2,12 +2,16 @@
// It is not intended for manual editing.
const NO_ATOMIC_CAS: &[&str] = &[
+ "armv4t-none-eabi",
+ "armv5te-none-eabi",
"avr-unknown-gnu-atmega328",
"bpfeb-unknown-none",
"bpfel-unknown-none",
"msp430-none-elf",
"riscv32i-unknown-none-elf",
+ "riscv32im-unknown-none-elf",
"riscv32imc-unknown-none-elf",
"thumbv4t-none-eabi",
+ "thumbv5te-none-eabi",
"thumbv6m-none-eabi",
];
diff --git a/src/abortable.rs b/src/abortable.rs
index bb82dd0..e0afd47 100644
--- a/src/abortable.rs
+++ b/src/abortable.rs
@@ -75,7 +75,7 @@ impl<T> Abortable<T> {
/// in calls to `Abortable::new`.
#[derive(Debug)]
pub struct AbortRegistration {
- inner: Arc<AbortInner>,
+ pub(crate) inner: Arc<AbortInner>,
}
/// A handle to an `Abortable` task.
@@ -100,9 +100,9 @@ impl AbortHandle {
// Inner type storing the waker to awaken and a bool indicating that it
// should be aborted.
#[derive(Debug)]
-struct AbortInner {
- waker: AtomicWaker,
- aborted: AtomicBool,
+pub(crate) struct AbortInner {
+ pub(crate) waker: AtomicWaker,
+ pub(crate) aborted: AtomicBool,
}
/// Indicator that the `Abortable` task was aborted.
diff --git a/src/future/future/shared.rs b/src/future/future/shared.rs
index 9b31932..9859315 100644
--- a/src/future/future/shared.rs
+++ b/src/future/future/shared.rs
@@ -262,19 +262,20 @@ where
let waker = waker_ref(&inner.notifier);
let mut cx = Context::from_waker(&waker);
- struct Reset<'a>(&'a AtomicUsize);
+ struct Reset<'a> {
+ state: &'a AtomicUsize,
+ did_not_panic: bool,
+ }
impl Drop for Reset<'_> {
fn drop(&mut self) {
- use std::thread;
-
- if thread::panicking() {
- self.0.store(POISONED, SeqCst);
+ if !self.did_not_panic {
+ self.state.store(POISONED, SeqCst);
}
}
}
- let _reset = Reset(&inner.notifier.state);
+ let mut reset = Reset { state: &inner.notifier.state, did_not_panic: false };
let output = {
let future = unsafe {
@@ -284,12 +285,15 @@ where
}
};
- match future.poll(&mut cx) {
+ let poll_result = future.poll(&mut cx);
+ reset.did_not_panic = true;
+
+ match poll_result {
Poll::Pending => {
if inner.notifier.state.compare_exchange(POLLING, IDLE, SeqCst, SeqCst).is_ok()
{
// Success
- drop(_reset);
+ drop(reset);
this.inner = Some(inner);
return Poll::Pending;
} else {
@@ -313,7 +317,7 @@ where
waker.wake();
}
- drop(_reset); // Make borrow checker happy
+ drop(reset); // Make borrow checker happy
drop(wakers_guard);
// Safety: We're in the COMPLETE state
diff --git a/src/future/join_all.rs b/src/future/join_all.rs
index 2e52ac1..7dc159b 100644
--- a/src/future/join_all.rs
+++ b/src/future/join_all.rs
@@ -15,7 +15,7 @@ use super::{assert_future, MaybeDone};
#[cfg(not(futures_no_atomic_cas))]
use crate::stream::{Collect, FuturesOrdered, StreamExt};
-fn iter_pin_mut<T>(slice: Pin<&mut [T]>) -> impl Iterator<Item = Pin<&mut T>> {
+pub(crate) fn iter_pin_mut<T>(slice: Pin<&mut [T]>) -> impl Iterator<Item = Pin<&mut T>> {
// Safety: `std` _could_ make this unsound if it were to decide Pin's
// invariants aren't required to transmit through slices. Otherwise this has
// the same safety as a normal field pin projection.
@@ -32,9 +32,9 @@ where
}
#[cfg(not(futures_no_atomic_cas))]
-const SMALL: usize = 30;
+pub(crate) const SMALL: usize = 30;
-pub(crate) enum JoinAllKind<F>
+enum JoinAllKind<F>
where
F: Future,
{
@@ -104,26 +104,25 @@ where
I: IntoIterator,
I::Item: Future,
{
+ let iter = iter.into_iter();
+
#[cfg(futures_no_atomic_cas)]
{
- let elems = iter.into_iter().map(MaybeDone::Future).collect::<Box<[_]>>().into();
- let kind = JoinAllKind::Small { elems };
+ let kind =
+ JoinAllKind::Small { elems: iter.map(MaybeDone::Future).collect::<Box<[_]>>().into() };
+
assert_future::<Vec<<I::Item as Future>::Output>, _>(JoinAll { kind })
}
+
#[cfg(not(futures_no_atomic_cas))]
{
- let iter = iter.into_iter();
let kind = match iter.size_hint().1 {
- None => JoinAllKind::Big { fut: iter.collect::<FuturesOrdered<_>>().collect() },
- Some(max) => {
- if max <= SMALL {
- let elems = iter.map(MaybeDone::Future).collect::<Box<[_]>>().into();
- JoinAllKind::Small { elems }
- } else {
- JoinAllKind::Big { fut: iter.collect::<FuturesOrdered<_>>().collect() }
- }
- }
+ Some(max) if max <= SMALL => JoinAllKind::Small {
+ elems: iter.map(MaybeDone::Future).collect::<Box<[_]>>().into(),
+ },
+ _ => JoinAllKind::Big { fut: iter.collect::<FuturesOrdered<_>>().collect() },
};
+
assert_future::<Vec<<I::Item as Future>::Output>, _>(JoinAll { kind })
}
}
diff --git a/src/future/pending.rs b/src/future/pending.rs
index 92c78d5..b8e2868 100644
--- a/src/future/pending.rs
+++ b/src/future/pending.rs
@@ -33,6 +33,7 @@ impl<T> FusedFuture for Pending<T> {
/// unreachable!();
/// # });
/// ```
+#[cfg_attr(docsrs, doc(alias = "never"))]
pub fn pending<T>() -> Pending<T> {
assert_future::<T, _>(Pending { _data: marker::PhantomData })
}
diff --git a/src/future/select.rs b/src/future/select.rs
index bd44f20..e693a30 100644
--- a/src/future/select.rs
+++ b/src/future/select.rs
@@ -100,16 +100,17 @@ where
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let (mut a, mut b) = self.inner.take().expect("cannot poll Select twice");
- match a.poll_unpin(cx) {
- Poll::Ready(x) => Poll::Ready(Either::Left((x, b))),
- Poll::Pending => match b.poll_unpin(cx) {
- Poll::Ready(x) => Poll::Ready(Either::Right((x, a))),
- Poll::Pending => {
- self.inner = Some((a, b));
- Poll::Pending
- }
- },
+
+ if let Poll::Ready(val) = a.poll_unpin(cx) {
+ return Poll::Ready(Either::Left((val, b)));
+ }
+
+ if let Poll::Ready(val) = b.poll_unpin(cx) {
+ return Poll::Ready(Either::Right((val, a)));
}
+
+ self.inner = Some((a, b));
+ Poll::Pending
}
}
diff --git a/src/future/select_all.rs b/src/future/select_all.rs
index 106e508..07d65ca 100644
--- a/src/future/select_all.rs
+++ b/src/future/select_all.rs
@@ -59,7 +59,7 @@ impl<Fut: Future + Unpin> Future for SelectAll<Fut> {
match item {
Some((idx, res)) => {
let _ = self.inner.swap_remove(idx);
- let rest = mem::replace(&mut self.inner, Vec::new());
+ let rest = mem::take(&mut self.inner);
Poll::Ready((res, idx, rest))
}
None => Poll::Pending,
diff --git a/src/future/select_ok.rs b/src/future/select_ok.rs
index 0ad83c6..5d55799 100644
--- a/src/future/select_ok.rs
+++ b/src/future/select_ok.rs
@@ -59,7 +59,7 @@ impl<Fut: TryFuture + Unpin> Future for SelectOk<Fut> {
drop(self.inner.remove(idx));
match res {
Ok(e) => {
- let rest = mem::replace(&mut self.inner, Vec::new());
+ let rest = mem::take(&mut self.inner);
return Poll::Ready(Ok((e, rest)));
}
Err(e) => {
diff --git a/src/future/try_future/mod.rs b/src/future/try_future/mod.rs
index fb3bdd8..e5bc700 100644
--- a/src/future/try_future/mod.rs
+++ b/src/future/try_future/mod.rs
@@ -302,6 +302,9 @@ pub trait TryFutureExt: TryFuture {
/// assert_eq!(future.await, Ok(1));
/// # });
/// ```
+ ///
+ /// [`join!`]: crate::join
+ /// [`select!`]: crate::select
fn map_err<E, F>(self, f: F) -> MapErr<Self, F>
where
F: FnOnce(Self::Error) -> E,
@@ -332,6 +335,9 @@ pub trait TryFutureExt: TryFuture {
/// let future_err_i32 = future_err_u8.err_into::<i32>();
/// # });
/// ```
+ ///
+ /// [`join!`]: crate::join
+ /// [`select!`]: crate::select
fn err_into<E>(self) -> ErrInto<Self, E>
where
Self: Sized,
diff --git a/src/future/try_join_all.rs b/src/future/try_join_all.rs
index 29244af..25fcfcb 100644
--- a/src/future/try_join_all.rs
+++ b/src/future/try_join_all.rs
@@ -10,14 +10,11 @@ use core::mem;
use core::pin::Pin;
use core::task::{Context, Poll};
-use super::{assert_future, TryFuture, TryMaybeDone};
+use super::{assert_future, join_all, IntoFuture, TryFuture, TryMaybeDone};
-fn iter_pin_mut<T>(slice: Pin<&mut [T]>) -> impl Iterator<Item = Pin<&mut T>> {
- // Safety: `std` _could_ make this unsound if it were to decide Pin's
- // invariants aren't required to transmit through slices. Otherwise this has
- // the same safety as a normal field pin projection.
- unsafe { slice.get_unchecked_mut() }.iter_mut().map(|t| unsafe { Pin::new_unchecked(t) })
-}
+#[cfg(not(futures_no_atomic_cas))]
+use crate::stream::{FuturesOrdered, TryCollect, TryStreamExt};
+use crate::TryFutureExt;
enum FinalState<E = ()> {
Pending,
@@ -31,7 +28,20 @@ pub struct TryJoinAll<F>
where
F: TryFuture,
{
- elems: Pin<Box<[TryMaybeDone<F>]>>,
+ kind: TryJoinAllKind<F>,
+}
+
+enum TryJoinAllKind<F>
+where
+ F: TryFuture,
+{
+ Small {
+ elems: Pin<Box<[TryMaybeDone<IntoFuture<F>>]>>,
+ },
+ #[cfg(not(futures_no_atomic_cas))]
+ Big {
+ fut: TryCollect<FuturesOrdered<IntoFuture<F>>, Vec<F::Ok>>,
+ },
}
impl<F> fmt::Debug for TryJoinAll<F>
@@ -39,9 +49,16 @@ where
F: TryFuture + fmt::Debug,
F::Ok: fmt::Debug,
F::Error: fmt::Debug,
+ F::Output: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- f.debug_struct("TryJoinAll").field("elems", &self.elems).finish()
+ match self.kind {
+ TryJoinAllKind::Small { ref elems } => {
+ f.debug_struct("TryJoinAll").field("elems", elems).finish()
+ }
+ #[cfg(not(futures_no_atomic_cas))]
+ TryJoinAllKind::Big { ref fut, .. } => fmt::Debug::fmt(fut, f),
+ }
}
}
@@ -83,15 +100,37 @@ where
/// assert_eq!(try_join_all(futures).await, Err(2));
/// # });
/// ```
-pub fn try_join_all<I>(i: I) -> TryJoinAll<I::Item>
+pub fn try_join_all<I>(iter: I) -> TryJoinAll<I::Item>
where
I: IntoIterator,
I::Item: TryFuture,
{
- let elems: Box<[_]> = i.into_iter().map(TryMaybeDone::Future).collect();
- assert_future::<Result<Vec<<I::Item as TryFuture>::Ok>, <I::Item as TryFuture>::Error>, _>(
- TryJoinAll { elems: elems.into() },
- )
+ let iter = iter.into_iter().map(TryFutureExt::into_future);
+
+ #[cfg(futures_no_atomic_cas)]
+ {
+ let kind = TryJoinAllKind::Small {
+ elems: iter.map(TryMaybeDone::Future).collect::<Box<[_]>>().into(),
+ };
+
+ assert_future::<Result<Vec<<I::Item as TryFuture>::Ok>, <I::Item as TryFuture>::Error>, _>(
+ TryJoinAll { kind },
+ )
+ }
+
+ #[cfg(not(futures_no_atomic_cas))]
+ {
+ let kind = match iter.size_hint().1 {
+ Some(max) if max <= join_all::SMALL => TryJoinAllKind::Small {
+ elems: iter.map(TryMaybeDone::Future).collect::<Box<[_]>>().into(),
+ },
+ _ => TryJoinAllKind::Big { fut: iter.collect::<FuturesOrdered<_>>().try_collect() },
+ };
+
+ assert_future::<Result<Vec<<I::Item as TryFuture>::Ok>, <I::Item as TryFuture>::Error>, _>(
+ TryJoinAll { kind },
+ )
+ }
}
impl<F> Future for TryJoinAll<F>
@@ -101,36 +140,46 @@ where
type Output = Result<Vec<F::Ok>, F::Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
- let mut state = FinalState::AllDone;
-
- for elem in iter_pin_mut(self.elems.as_mut()) {
- match elem.try_poll(cx) {
- Poll::Pending => state = FinalState::Pending,
- Poll::Ready(Ok(())) => {}
- Poll::Ready(Err(e)) => {
- state = FinalState::Error(e);
- break;
+ match &mut self.kind {
+ TryJoinAllKind::Small { elems } => {
+ let mut state = FinalState::AllDone;
+
+ for elem in join_all::iter_pin_mut(elems.as_mut()) {
+ match elem.try_poll(cx) {
+ Poll::Pending => state = FinalState::Pending,
+ Poll::Ready(Ok(())) => {}
+ Poll::Ready(Err(e)) => {
+ state = FinalState::Error(e);
+ break;
+ }
+ }
}
- }
- }
- match state {
- FinalState::Pending => Poll::Pending,
- FinalState::AllDone => {
- let mut elems = mem::replace(&mut self.elems, Box::pin([]));
- let results =
- iter_pin_mut(elems.as_mut()).map(|e| e.take_output().unwrap()).collect();
- Poll::Ready(Ok(results))
- }
- FinalState::Error(e) => {
- let _ = mem::replace(&mut self.elems, Box::pin([]));
- Poll::Ready(Err(e))
+ match state {
+ FinalState::Pending => Poll::Pending,
+ FinalState::AllDone => {
+ let mut elems = mem::replace(elems, Box::pin([]));
+ let results = join_all::iter_pin_mut(elems.as_mut())
+ .map(|e| e.take_output().unwrap())
+ .collect();
+ Poll::Ready(Ok(results))
+ }
+ FinalState::Error(e) => {
+ let _ = mem::replace(elems, Box::pin([]));
+ Poll::Ready(Err(e))
+ }
+ }
}
+ #[cfg(not(futures_no_atomic_cas))]
+ TryJoinAllKind::Big { fut } => Pin::new(fut).poll(cx),
}
}
}
-impl<F: TryFuture> FromIterator<F> for TryJoinAll<F> {
+impl<F> FromIterator<F> for TryJoinAll<F>
+where
+ F: TryFuture,
+{
fn from_iter<T: IntoIterator<Item = F>>(iter: T) -> Self {
try_join_all(iter)
}
diff --git a/src/io/copy_buf_abortable.rs b/src/io/copy_buf_abortable.rs
new file mode 100644
index 0000000..fdbc4a5
--- /dev/null
+++ b/src/io/copy_buf_abortable.rs
@@ -0,0 +1,124 @@
+use crate::abortable::{AbortHandle, AbortInner, Aborted};
+use futures_core::future::Future;
+use futures_core::task::{Context, Poll};
+use futures_io::{AsyncBufRead, AsyncWrite};
+use pin_project_lite::pin_project;
+use std::io;
+use std::pin::Pin;
+use std::sync::atomic::Ordering;
+use std::sync::Arc;
+
+/// Creates a future which copies all the bytes from one object to another, with its `AbortHandle`.
+///
+/// The returned future will copy all the bytes read from this `AsyncBufRead` into the
+/// `writer` specified. This future will only complete once abort has been requested or the `reader` has hit
+/// EOF and all bytes have been written to and flushed from the `writer`
+/// provided.
+///
+/// On success the number of bytes is returned. If aborted, `Aborted` is returned. Otherwise, the underlying error is returned.
+///
+/// # Examples
+///
+/// ```
+/// # futures::executor::block_on(async {
+/// use futures::io::{self, AsyncWriteExt, Cursor};
+/// use futures::future::Aborted;
+///
+/// let reader = Cursor::new([1, 2, 3, 4]);
+/// let mut writer = Cursor::new(vec![0u8; 5]);
+///
+/// let (fut, abort_handle) = io::copy_buf_abortable(reader, &mut writer);
+/// let bytes = fut.await;
+/// abort_handle.abort();
+/// writer.close().await.unwrap();
+/// match bytes {
+/// Ok(Ok(n)) => {
+/// assert_eq!(n, 4);
+/// assert_eq!(writer.into_inner(), [1, 2, 3, 4, 0]);
+/// Ok(n)
+/// },
+/// Ok(Err(a)) => {
+/// Err::<u64, Aborted>(a)
+/// }
+/// Err(e) => panic!("{}", e)
+/// }
+/// # }).unwrap();
+/// ```
+pub fn copy_buf_abortable<R, W>(
+ reader: R,
+ writer: &mut W,
+) -> (CopyBufAbortable<'_, R, W>, AbortHandle)
+where
+ R: AsyncBufRead,
+ W: AsyncWrite + Unpin + ?Sized,
+{
+ let (handle, reg) = AbortHandle::new_pair();
+ (CopyBufAbortable { reader, writer, amt: 0, inner: reg.inner }, handle)
+}
+
+pin_project! {
+ /// Future for the [`copy_buf()`] function.
+ #[derive(Debug)]
+ #[must_use = "futures do nothing unless you `.await` or poll them"]
+ pub struct CopyBufAbortable<'a, R, W: ?Sized> {
+ #[pin]
+ reader: R,
+ writer: &'a mut W,
+ amt: u64,
+ inner: Arc<AbortInner>
+ }
+}
+
+macro_rules! ready_or_break {
+ ($e:expr $(,)?) => {
+ match $e {
+ $crate::task::Poll::Ready(t) => t,
+ $crate::task::Poll::Pending => break,
+ }
+ };
+}
+
+impl<R, W> Future for CopyBufAbortable<'_, R, W>
+where
+ R: AsyncBufRead,
+ W: AsyncWrite + Unpin + Sized,
+{
+ type Output = Result<Result<u64, Aborted>, io::Error>;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let mut this = self.project();
+ loop {
+ // Check if the task has been aborted
+ if this.inner.aborted.load(Ordering::Relaxed) {
+ return Poll::Ready(Ok(Err(Aborted)));
+ }
+
+ // Read some bytes from the reader, and if we have reached EOF, return total bytes read
+ let buffer = ready_or_break!(this.reader.as_mut().poll_fill_buf(cx))?;
+ if buffer.is_empty() {
+ ready_or_break!(Pin::new(&mut this.writer).poll_flush(cx))?;
+ return Poll::Ready(Ok(Ok(*this.amt)));
+ }
+
+ // Pass the buffer to the writer, and update the amount written
+ let i = ready_or_break!(Pin::new(&mut this.writer).poll_write(cx, buffer))?;
+ if i == 0 {
+ return Poll::Ready(Err(io::ErrorKind::WriteZero.into()));
+ }
+ *this.amt += i as u64;
+ this.reader.as_mut().consume(i);
+ }
+ // Schedule the task to be woken up again.
+ // Never called unless Poll::Pending is returned from io objects.
+ this.inner.waker.register(cx.waker());
+
+ // Check to see if the task was aborted between the first check and
+ // registration.
+ // Checking with `Relaxed` is sufficient because
+ // `register` introduces an `AcqRel` barrier.
+ if this.inner.aborted.load(Ordering::Relaxed) {
+ return Poll::Ready(Ok(Err(Aborted)));
+ }
+ Poll::Pending
+ }
+}
diff --git a/src/io/lines.rs b/src/io/lines.rs
index 13e70df..b5561bf 100644
--- a/src/io/lines.rs
+++ b/src/io/lines.rs
@@ -42,6 +42,6 @@ impl<R: AsyncBufRead> Stream for Lines<R> {
this.buf.pop();
}
}
- Poll::Ready(Some(Ok(mem::replace(this.buf, String::new()))))
+ Poll::Ready(Some(Ok(mem::take(this.buf))))
}
}
diff --git a/src/io/mod.rs b/src/io/mod.rs
index 4dd2e02..8ce3ad6 100644
--- a/src/io/mod.rs
+++ b/src/io/mod.rs
@@ -66,6 +66,9 @@ pub use self::copy::{copy, Copy};
mod copy_buf;
pub use self::copy_buf::{copy_buf, CopyBuf};
+mod copy_buf_abortable;
+pub use self::copy_buf_abortable::{copy_buf_abortable, CopyBufAbortable};
+
mod cursor;
pub use self::cursor::Cursor;
diff --git a/src/io/read_exact.rs b/src/io/read_exact.rs
index 02e38c3..cd0b20e 100644
--- a/src/io/read_exact.rs
+++ b/src/io/read_exact.rs
@@ -30,7 +30,7 @@ impl<R: AsyncRead + ?Sized + Unpin> Future for ReadExact<'_, R> {
while !this.buf.is_empty() {
let n = ready!(Pin::new(&mut this.reader).poll_read(cx, this.buf))?;
{
- let (_, rest) = mem::replace(&mut this.buf, &mut []).split_at_mut(n);
+ let (_, rest) = mem::take(&mut this.buf).split_at_mut(n);
this.buf = rest;
}
if n == 0 {
diff --git a/src/io/read_line.rs b/src/io/read_line.rs
index c75af94..e1b8fc9 100644
--- a/src/io/read_line.rs
+++ b/src/io/read_line.rs
@@ -22,7 +22,7 @@ impl<R: ?Sized + Unpin> Unpin for ReadLine<'_, R> {}
impl<'a, R: AsyncBufRead + ?Sized + Unpin> ReadLine<'a, R> {
pub(super) fn new(reader: &'a mut R, buf: &'a mut String) -> Self {
- Self { reader, bytes: mem::replace(buf, String::new()).into_bytes(), buf, read: 0 }
+ Self { reader, bytes: mem::take(buf).into_bytes(), buf, read: 0 }
}
}
diff --git a/src/io/read_to_string.rs b/src/io/read_to_string.rs
index 457af59..c175396 100644
--- a/src/io/read_to_string.rs
+++ b/src/io/read_to_string.rs
@@ -22,7 +22,7 @@ impl<R: ?Sized + Unpin> Unpin for ReadToString<'_, R> {}
impl<'a, R: AsyncRead + ?Sized + Unpin> ReadToString<'a, R> {
pub(super) fn new(reader: &'a mut R, buf: &'a mut String) -> Self {
let start_len = buf.len();
- Self { reader, bytes: mem::replace(buf, String::new()).into_bytes(), buf, start_len }
+ Self { reader, bytes: mem::take(buf).into_bytes(), buf, start_len }
}
}
diff --git a/src/io/write_all.rs b/src/io/write_all.rs
index b134bf1..08c025f 100644
--- a/src/io/write_all.rs
+++ b/src/io/write_all.rs
@@ -30,7 +30,7 @@ impl<W: AsyncWrite + ?Sized + Unpin> Future for WriteAll<'_, W> {
while !this.buf.is_empty() {
let n = ready!(Pin::new(&mut this.writer).poll_write(cx, this.buf))?;
{
- let (_, rest) = mem::replace(&mut this.buf, &[]).split_at(n);
+ let (_, rest) = mem::take(&mut this.buf).split_at(n);
this.buf = rest;
}
if n == 0 {
diff --git a/src/lock/bilock.rs b/src/lock/bilock.rs
index 2f51ae7..2174079 100644
--- a/src/lock/bilock.rs
+++ b/src/lock/bilock.rs
@@ -224,6 +224,9 @@ pub struct BiLockGuard<'a, T> {
bilock: &'a BiLock<T>,
}
+// We allow parallel access to T via Deref, so Sync bound is also needed here.
+unsafe impl<T: Send + Sync> Sync for BiLockGuard<'_, T> {}
+
impl<T> Deref for BiLockGuard<'_, T> {
type Target = T;
fn deref(&self) -> &T {
diff --git a/src/lock/mod.rs b/src/lock/mod.rs
index cf374c0..0be7271 100644
--- a/src/lock/mod.rs
+++ b/src/lock/mod.rs
@@ -4,11 +4,18 @@
//! library is activated, and it is activated by default.
#[cfg(not(futures_no_atomic_cas))]
-#[cfg(feature = "std")]
-mod mutex;
+#[cfg(any(feature = "sink", feature = "io"))]
+#[cfg(not(feature = "bilock"))]
+pub(crate) use self::bilock::BiLock;
+#[cfg(not(futures_no_atomic_cas))]
+#[cfg(feature = "bilock")]
+#[cfg_attr(docsrs, doc(cfg(feature = "bilock")))]
+pub use self::bilock::{BiLock, BiLockAcquire, BiLockGuard, ReuniteError};
#[cfg(not(futures_no_atomic_cas))]
#[cfg(feature = "std")]
-pub use self::mutex::{MappedMutexGuard, Mutex, MutexGuard, MutexLockFuture};
+pub use self::mutex::{
+ MappedMutexGuard, Mutex, MutexGuard, MutexLockFuture, OwnedMutexGuard, OwnedMutexLockFuture,
+};
#[cfg(not(futures_no_atomic_cas))]
#[cfg(any(feature = "bilock", feature = "sink", feature = "io"))]
@@ -16,10 +23,5 @@ pub use self::mutex::{MappedMutexGuard, Mutex, MutexGuard, MutexLockFuture};
#[cfg_attr(not(feature = "bilock"), allow(unreachable_pub))]
mod bilock;
#[cfg(not(futures_no_atomic_cas))]
-#[cfg(any(feature = "sink", feature = "io"))]
-#[cfg(not(feature = "bilock"))]
-pub(crate) use self::bilock::BiLock;
-#[cfg(not(futures_no_atomic_cas))]
-#[cfg(feature = "bilock")]
-#[cfg_attr(docsrs, doc(cfg(feature = "bilock")))]
-pub use self::bilock::{BiLock, BiLockAcquire, BiLockGuard, ReuniteError};
+#[cfg(feature = "std")]
+mod mutex;
diff --git a/src/lock/mutex.rs b/src/lock/mutex.rs
index 85dcb15..335ad14 100644
--- a/src/lock/mutex.rs
+++ b/src/lock/mutex.rs
@@ -1,14 +1,16 @@
-use futures_core::future::{FusedFuture, Future};
-use futures_core::task::{Context, Poll, Waker};
-use slab::Slab;
use std::cell::UnsafeCell;
use std::marker::PhantomData;
use std::ops::{Deref, DerefMut};
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
-use std::sync::Mutex as StdMutex;
+use std::sync::{Arc, Mutex as StdMutex};
use std::{fmt, mem};
+use slab::Slab;
+
+use futures_core::future::{FusedFuture, Future};
+use futures_core::task::{Context, Poll, Waker};
+
/// A futures-aware mutex.
///
/// # Fairness
@@ -107,6 +109,18 @@ impl<T: ?Sized> Mutex<T> {
}
}
+ /// Attempt to acquire the lock immediately.
+ ///
+ /// If the lock is currently held, this will return `None`.
+ pub fn try_lock_owned(self: &Arc<Self>) -> Option<OwnedMutexGuard<T>> {
+ let old_state = self.state.fetch_or(IS_LOCKED, Ordering::Acquire);
+ if (old_state & IS_LOCKED) == 0 {
+ Some(OwnedMutexGuard { mutex: self.clone() })
+ } else {
+ None
+ }
+ }
+
/// Acquire the lock asynchronously.
///
/// This method returns a future that will resolve once the lock has been
@@ -115,6 +129,14 @@ impl<T: ?Sized> Mutex<T> {
MutexLockFuture { mutex: Some(self), wait_key: WAIT_KEY_NONE }
}
+ /// Acquire the lock asynchronously.
+ ///
+ /// This method returns a future that will resolve once the lock has been
+ /// successfully acquired.
+ pub fn lock_owned(self: Arc<Self>) -> OwnedMutexLockFuture<T> {
+ OwnedMutexLockFuture { mutex: Some(self), wait_key: WAIT_KEY_NONE }
+ }
+
/// Returns a mutable reference to the underlying data.
///
/// Since this call borrows the `Mutex` mutably, no actual locking needs to
@@ -173,7 +195,118 @@ impl<T: ?Sized> Mutex<T> {
}
// Sentinel for when no slot in the `Slab` has been dedicated to this object.
-const WAIT_KEY_NONE: usize = usize::max_value();
+const WAIT_KEY_NONE: usize = usize::MAX;
+
+/// A future which resolves when the target mutex has been successfully acquired, owned version.
+pub struct OwnedMutexLockFuture<T: ?Sized> {
+ // `None` indicates that the mutex was successfully acquired.
+ mutex: Option<Arc<Mutex<T>>>,
+ wait_key: usize,
+}
+
+impl<T: ?Sized> fmt::Debug for OwnedMutexLockFuture<T> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("OwnedMutexLockFuture")
+ .field("was_acquired", &self.mutex.is_none())
+ .field("mutex", &self.mutex)
+ .field(
+ "wait_key",
+ &(if self.wait_key == WAIT_KEY_NONE { None } else { Some(self.wait_key) }),
+ )
+ .finish()
+ }
+}
+
+impl<T: ?Sized> FusedFuture for OwnedMutexLockFuture<T> {
+ fn is_terminated(&self) -> bool {
+ self.mutex.is_none()
+ }
+}
+
+impl<T: ?Sized> Future for OwnedMutexLockFuture<T> {
+ type Output = OwnedMutexGuard<T>;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let this = self.get_mut();
+
+ let mutex = this.mutex.as_ref().expect("polled OwnedMutexLockFuture after completion");
+
+ if let Some(lock) = mutex.try_lock_owned() {
+ mutex.remove_waker(this.wait_key, false);
+ this.mutex = None;
+ return Poll::Ready(lock);
+ }
+
+ {
+ let mut waiters = mutex.waiters.lock().unwrap();
+ if this.wait_key == WAIT_KEY_NONE {
+ this.wait_key = waiters.insert(Waiter::Waiting(cx.waker().clone()));
+ if waiters.len() == 1 {
+ mutex.state.fetch_or(HAS_WAITERS, Ordering::Relaxed); // released by mutex unlock
+ }
+ } else {
+ waiters[this.wait_key].register(cx.waker());
+ }
+ }
+
+ // Ensure that we haven't raced `MutexGuard::drop`'s unlock path by
+ // attempting to acquire the lock again.
+ if let Some(lock) = mutex.try_lock_owned() {
+ mutex.remove_waker(this.wait_key, false);
+ this.mutex = None;
+ return Poll::Ready(lock);
+ }
+
+ Poll::Pending
+ }
+}
+
+impl<T: ?Sized> Drop for OwnedMutexLockFuture<T> {
+ fn drop(&mut self) {
+ if let Some(mutex) = self.mutex.as_ref() {
+ // This future was dropped before it acquired the mutex.
+ //
+ // Remove ourselves from the map, waking up another waiter if we
+ // had been awoken to acquire the lock.
+ mutex.remove_waker(self.wait_key, true);
+ }
+ }
+}
+
+/// An RAII guard returned by the `lock_owned` and `try_lock_owned` methods.
+/// When this structure is dropped (falls out of scope), the lock will be
+/// unlocked.
+pub struct OwnedMutexGuard<T: ?Sized> {
+ mutex: Arc<Mutex<T>>,
+}
+
+impl<T: ?Sized + fmt::Debug> fmt::Debug for OwnedMutexGuard<T> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("OwnedMutexGuard")
+ .field("value", &&**self)
+ .field("mutex", &self.mutex)
+ .finish()
+ }
+}
+
+impl<T: ?Sized> Drop for OwnedMutexGuard<T> {
+ fn drop(&mut self) {
+ self.mutex.unlock()
+ }
+}
+
+impl<T: ?Sized> Deref for OwnedMutexGuard<T> {
+ type Target = T;
+ fn deref(&self) -> &T {
+ unsafe { &*self.mutex.value.get() }
+ }
+}
+
+impl<T: ?Sized> DerefMut for OwnedMutexGuard<T> {
+ fn deref_mut(&mut self) -> &mut T {
+ unsafe { &mut *self.mutex.value.get() }
+ }
+}
/// A future which resolves when the target mutex has been successfully acquired.
pub struct MutexLockFuture<'a, T: ?Sized> {
@@ -386,13 +519,25 @@ unsafe impl<T: ?Sized + Send> Sync for Mutex<T> {}
// It's safe to switch which thread the acquire is being attempted on so long as
// `T` can be accessed on that thread.
unsafe impl<T: ?Sized + Send> Send for MutexLockFuture<'_, T> {}
+
// doesn't have any interesting `&self` methods (only Debug)
unsafe impl<T: ?Sized> Sync for MutexLockFuture<'_, T> {}
+// It's safe to switch which thread the acquire is being attempted on so long as
+// `T` can be accessed on that thread.
+unsafe impl<T: ?Sized + Send> Send for OwnedMutexLockFuture<T> {}
+
+// doesn't have any interesting `&self` methods (only Debug)
+unsafe impl<T: ?Sized> Sync for OwnedMutexLockFuture<T> {}
+
// Safe to send since we don't track any thread-specific details-- the inner
// lock is essentially spinlock-equivalent (attempt to flip an atomic bool)
unsafe impl<T: ?Sized + Send> Send for MutexGuard<'_, T> {}
unsafe impl<T: ?Sized + Sync> Sync for MutexGuard<'_, T> {}
+
+unsafe impl<T: ?Sized + Send> Send for OwnedMutexGuard<T> {}
+unsafe impl<T: ?Sized + Sync> Sync for OwnedMutexGuard<T> {}
+
unsafe impl<T: ?Sized + Send, U: ?Sized + Send> Send for MappedMutexGuard<'_, T, U> {}
unsafe impl<T: ?Sized + Sync, U: ?Sized + Sync> Sync for MappedMutexGuard<'_, T, U> {}
diff --git a/src/sink/drain.rs b/src/sink/drain.rs
index 5295115..1a5480c 100644
--- a/src/sink/drain.rs
+++ b/src/sink/drain.rs
@@ -32,6 +32,12 @@ pub fn drain<T>() -> Drain<T> {
impl<T> Unpin for Drain<T> {}
+impl<T> Clone for Drain<T> {
+ fn clone(&self) -> Self {
+ drain()
+ }
+}
+
impl<T> Sink<T> for Drain<T> {
type Error = Never;
diff --git a/src/stream/futures_ordered.rs b/src/stream/futures_ordered.rs
index f596b3b..f1c93fd 100644
--- a/src/stream/futures_ordered.rs
+++ b/src/stream/futures_ordered.rs
@@ -135,11 +135,39 @@ impl<Fut: Future> FuturesOrdered<Fut> {
/// This function will not call `poll` on the submitted future. The caller
/// must ensure that `FuturesOrdered::poll` is called in order to receive
/// task notifications.
+ #[deprecated(note = "use `push_back` instead")]
pub fn push(&mut self, future: Fut) {
+ self.push_back(future);
+ }
+
+ /// Pushes a future to the back of the queue.
+ ///
+ /// This function submits the given future to the internal set for managing.
+ /// This function will not call `poll` on the submitted future. The caller
+ /// must ensure that `FuturesOrdered::poll` is called in order to receive
+ /// task notifications.
+ pub fn push_back(&mut self, future: Fut) {
let wrapped = OrderWrapper { data: future, index: self.next_incoming_index };
self.next_incoming_index += 1;
self.in_progress_queue.push(wrapped);
}
+
+ /// Pushes a future to the front of the queue.
+ ///
+ /// This function submits the given future to the internal set for managing.
+ /// This function will not call `poll` on the submitted future. The caller
+ /// must ensure that `FuturesOrdered::poll` is called in order to receive
+ /// task notifications. This future will be the next future to be returned
+ /// complete.
+ pub fn push_front(&mut self, future: Fut) {
+ if self.next_outgoing_index == 0 {
+ self.push_back(future)
+ } else {
+ let wrapped = OrderWrapper { data: future, index: self.next_outgoing_index - 1 };
+ self.next_outgoing_index -= 1;
+ self.in_progress_queue.push(wrapped);
+ }
+ }
}
impl<Fut: Future> Default for FuturesOrdered<Fut> {
@@ -196,7 +224,7 @@ impl<Fut: Future> FromIterator<Fut> for FuturesOrdered<Fut> {
{
let acc = Self::new();
iter.into_iter().fold(acc, |mut acc, item| {
- acc.push(item);
+ acc.push_back(item);
acc
})
}
@@ -214,7 +242,7 @@ impl<Fut: Future> Extend<Fut> for FuturesOrdered<Fut> {
I: IntoIterator<Item = Fut>,
{
for item in iter {
- self.push(item);
+ self.push_back(item);
}
}
}
diff --git a/src/stream/futures_unordered/iter.rs b/src/stream/futures_unordered/iter.rs
index 04db5ee..20248c7 100644
--- a/src/stream/futures_unordered/iter.rs
+++ b/src/stream/futures_unordered/iter.rs
@@ -2,6 +2,7 @@ use super::task::Task;
use super::FuturesUnordered;
use core::marker::PhantomData;
use core::pin::Pin;
+use core::ptr;
use core::sync::atomic::Ordering::Relaxed;
/// Mutable iterator over all futures in the unordered set.
@@ -58,6 +59,9 @@ impl<Fut: Unpin> Iterator for IntoIter<Fut> {
// valid `next_all` checks can be skipped.
let next = (**task).next_all.load(Relaxed);
*task = next;
+ if !task.is_null() {
+ *(**task).prev_all.get() = ptr::null_mut();
+ }
self.len -= 1;
Some(future)
}
diff --git a/src/stream/futures_unordered/mod.rs b/src/stream/futures_unordered/mod.rs
index fdbd53d..5e995fd 100644
--- a/src/stream/futures_unordered/mod.rs
+++ b/src/stream/futures_unordered/mod.rs
@@ -22,6 +22,7 @@ use futures_task::{FutureObj, LocalFutureObj, LocalSpawn, Spawn, SpawnError};
mod abort;
mod iter;
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/102352
pub use self::iter::{IntoIter, Iter, IterMut, IterPinMut, IterPinRef};
mod task;
diff --git a/src/stream/select_with_strategy.rs b/src/stream/select_with_strategy.rs
index bd86990..224d5f8 100644
--- a/src/stream/select_with_strategy.rs
+++ b/src/stream/select_with_strategy.rs
@@ -1,5 +1,4 @@
use super::assert_stream;
-use crate::stream::{Fuse, StreamExt};
use core::{fmt, pin::Pin};
use futures_core::stream::{FusedStream, Stream};
use futures_core::task::{Context, Poll};
@@ -18,13 +17,15 @@ impl PollNext {
/// Toggle the value and return the old one.
pub fn toggle(&mut self) -> Self {
let old = *self;
+ *self = self.other();
+ old
+ }
+ fn other(&self) -> PollNext {
match self {
- PollNext::Left => *self = PollNext::Right,
- PollNext::Right => *self = PollNext::Left,
+ PollNext::Left => PollNext::Right,
+ PollNext::Right => PollNext::Left,
}
-
- old
}
}
@@ -34,14 +35,41 @@ impl Default for PollNext {
}
}
+enum InternalState {
+ Start,
+ LeftFinished,
+ RightFinished,
+ BothFinished,
+}
+
+impl InternalState {
+ fn finish(&mut self, ps: PollNext) {
+ match (&self, ps) {
+ (InternalState::Start, PollNext::Left) => {
+ *self = InternalState::LeftFinished;
+ }
+ (InternalState::Start, PollNext::Right) => {
+ *self = InternalState::RightFinished;
+ }
+ (InternalState::LeftFinished, PollNext::Right)
+ | (InternalState::RightFinished, PollNext::Left) => {
+ *self = InternalState::BothFinished;
+ }
+ _ => {}
+ }
+ }
+}
+
pin_project! {
/// Stream for the [`select_with_strategy()`] function. See function docs for details.
#[must_use = "streams do nothing unless polled"]
+ #[project = SelectWithStrategyProj]
pub struct SelectWithStrategy<St1, St2, Clos, State> {
#[pin]
- stream1: Fuse<St1>,
+ stream1: St1,
#[pin]
- stream2: Fuse<St2>,
+ stream2: St2,
+ internal_state: InternalState,
state: State,
clos: Clos,
}
@@ -120,9 +148,10 @@ where
State: Default,
{
assert_stream::<St1::Item, _>(SelectWithStrategy {
- stream1: stream1.fuse(),
- stream2: stream2.fuse(),
+ stream1,
+ stream2,
state: Default::default(),
+ internal_state: InternalState::Start,
clos: which,
})
}
@@ -131,7 +160,7 @@ impl<St1, St2, Clos, State> SelectWithStrategy<St1, St2, Clos, State> {
/// Acquires a reference to the underlying streams that this combinator is
/// pulling from.
pub fn get_ref(&self) -> (&St1, &St2) {
- (self.stream1.get_ref(), self.stream2.get_ref())
+ (&self.stream1, &self.stream2)
}
/// Acquires a mutable reference to the underlying streams that this
@@ -140,7 +169,7 @@ impl<St1, St2, Clos, State> SelectWithStrategy<St1, St2, Clos, State> {
/// Note that care must be taken to avoid tampering with the state of the
/// stream which may otherwise confuse this combinator.
pub fn get_mut(&mut self) -> (&mut St1, &mut St2) {
- (self.stream1.get_mut(), self.stream2.get_mut())
+ (&mut self.stream1, &mut self.stream2)
}
/// Acquires a pinned mutable reference to the underlying streams that this
@@ -150,7 +179,7 @@ impl<St1, St2, Clos, State> SelectWithStrategy<St1, St2, Clos, State> {
/// stream which may otherwise confuse this combinator.
pub fn get_pin_mut(self: Pin<&mut Self>) -> (Pin<&mut St1>, Pin<&mut St2>) {
let this = self.project();
- (this.stream1.get_pin_mut(), this.stream2.get_pin_mut())
+ (this.stream1, this.stream2)
}
/// Consumes this combinator, returning the underlying streams.
@@ -158,7 +187,7 @@ impl<St1, St2, Clos, State> SelectWithStrategy<St1, St2, Clos, State> {
/// Note that this may discard intermediate state of this combinator, so
/// care should be taken to avoid losing resources when this is called.
pub fn into_inner(self) -> (St1, St2) {
- (self.stream1.into_inner(), self.stream2.into_inner())
+ (self.stream1, self.stream2)
}
}
@@ -169,47 +198,93 @@ where
Clos: FnMut(&mut State) -> PollNext,
{
fn is_terminated(&self) -> bool {
- self.stream1.is_terminated() && self.stream2.is_terminated()
+ match self.internal_state {
+ InternalState::BothFinished => true,
+ _ => false,
+ }
}
}
-impl<St1, St2, Clos, State> Stream for SelectWithStrategy<St1, St2, Clos, State>
+#[inline]
+fn poll_side<St1, St2, Clos, State>(
+ select: &mut SelectWithStrategyProj<'_, St1, St2, Clos, State>,
+ side: PollNext,
+ cx: &mut Context<'_>,
+) -> Poll<Option<St1::Item>>
where
St1: Stream,
St2: Stream<Item = St1::Item>,
- Clos: FnMut(&mut State) -> PollNext,
{
- type Item = St1::Item;
-
- fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<St1::Item>> {
- let this = self.project();
-
- match (this.clos)(this.state) {
- PollNext::Left => poll_inner(this.stream1, this.stream2, cx),
- PollNext::Right => poll_inner(this.stream2, this.stream1, cx),
- }
+ match side {
+ PollNext::Left => select.stream1.as_mut().poll_next(cx),
+ PollNext::Right => select.stream2.as_mut().poll_next(cx),
}
}
-fn poll_inner<St1, St2>(
- a: Pin<&mut St1>,
- b: Pin<&mut St2>,
+#[inline]
+fn poll_inner<St1, St2, Clos, State>(
+ select: &mut SelectWithStrategyProj<'_, St1, St2, Clos, State>,
+ side: PollNext,
cx: &mut Context<'_>,
) -> Poll<Option<St1::Item>>
where
St1: Stream,
St2: Stream<Item = St1::Item>,
{
- let a_done = match a.poll_next(cx) {
+ let first_done = match poll_side(select, side, cx) {
Poll::Ready(Some(item)) => return Poll::Ready(Some(item)),
- Poll::Ready(None) => true,
+ Poll::Ready(None) => {
+ select.internal_state.finish(side);
+ true
+ }
Poll::Pending => false,
};
+ let other = side.other();
+ match poll_side(select, other, cx) {
+ Poll::Ready(None) => {
+ select.internal_state.finish(other);
+ if first_done {
+ Poll::Ready(None)
+ } else {
+ Poll::Pending
+ }
+ }
+ a => a,
+ }
+}
+
+impl<St1, St2, Clos, State> Stream for SelectWithStrategy<St1, St2, Clos, State>
+where
+ St1: Stream,
+ St2: Stream<Item = St1::Item>,
+ Clos: FnMut(&mut State) -> PollNext,
+{
+ type Item = St1::Item;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<St1::Item>> {
+ let mut this = self.project();
- match b.poll_next(cx) {
- Poll::Ready(Some(item)) => Poll::Ready(Some(item)),
- Poll::Ready(None) if a_done => Poll::Ready(None),
- Poll::Ready(None) | Poll::Pending => Poll::Pending,
+ match this.internal_state {
+ InternalState::Start => {
+ let next_side = (this.clos)(this.state);
+ poll_inner(&mut this, next_side, cx)
+ }
+ InternalState::LeftFinished => match this.stream2.poll_next(cx) {
+ Poll::Ready(None) => {
+ *this.internal_state = InternalState::BothFinished;
+ Poll::Ready(None)
+ }
+ a => a,
+ },
+ InternalState::RightFinished => match this.stream1.poll_next(cx) {
+ Poll::Ready(None) => {
+ *this.internal_state = InternalState::BothFinished;
+ Poll::Ready(None)
+ }
+ a => a,
+ },
+ InternalState::BothFinished => Poll::Ready(None),
+ }
}
}
diff --git a/src/stream/stream/buffer_unordered.rs b/src/stream/stream/buffer_unordered.rs
index d64c142..91b0f6b 100644
--- a/src/stream/stream/buffer_unordered.rs
+++ b/src/stream/stream/buffer_unordered.rs
@@ -41,11 +41,7 @@ where
St: Stream,
St::Item: Future,
{
- pub(super) fn new(stream: St, n: usize) -> Self
- where
- St: Stream,
- St::Item: Future,
- {
+ pub(super) fn new(stream: St, n: usize) -> Self {
Self {
stream: super::Fuse::new(stream),
in_progress_queue: FuturesUnordered::new(),
diff --git a/src/stream/stream/buffered.rs b/src/stream/stream/buffered.rs
index 6052a73..8ca0391 100644
--- a/src/stream/stream/buffered.rs
+++ b/src/stream/stream/buffered.rs
@@ -64,7 +64,7 @@ where
// our queue of futures.
while this.in_progress_queue.len() < *this.max {
match this.stream.as_mut().poll_next(cx) {
- Poll::Ready(Some(fut)) => this.in_progress_queue.push(fut),
+ Poll::Ready(Some(fut)) => this.in_progress_queue.push_back(fut),
Poll::Ready(None) | Poll::Pending => break,
}
}
diff --git a/src/stream/stream/chunks.rs b/src/stream/stream/chunks.rs
index 8457869..2a71ebc 100644
--- a/src/stream/stream/chunks.rs
+++ b/src/stream/stream/chunks.rs
@@ -21,10 +21,7 @@ pin_project! {
}
}
-impl<St: Stream> Chunks<St>
-where
- St: Stream,
-{
+impl<St: Stream> Chunks<St> {
pub(super) fn new(stream: St, capacity: usize) -> Self {
assert!(capacity > 0);
@@ -66,7 +63,7 @@ impl<St: Stream> Stream for Chunks<St> {
let last = if this.items.is_empty() {
None
} else {
- let full_buf = mem::replace(this.items, Vec::new());
+ let full_buf = mem::take(this.items);
Some(full_buf)
};
@@ -77,9 +74,9 @@ impl<St: Stream> Stream for Chunks<St> {
}
fn size_hint(&self) -> (usize, Option<usize>) {
- let chunk_len = if self.items.is_empty() { 0 } else { 1 };
+ let chunk_len = usize::from(!self.items.is_empty());
let (lower, upper) = self.stream.size_hint();
- let lower = lower.saturating_add(chunk_len);
+ let lower = (lower / self.cap).saturating_add(chunk_len);
let upper = match upper {
Some(x) => x.checked_add(chunk_len),
None => None,
diff --git a/src/stream/stream/collect.rs b/src/stream/stream/collect.rs
index b0e81b9..970ac26 100644
--- a/src/stream/stream/collect.rs
+++ b/src/stream/stream/collect.rs
@@ -19,7 +19,7 @@ pin_project! {
impl<St: Stream, C: Default> Collect<St, C> {
fn finish(self: Pin<&mut Self>) -> C {
- mem::replace(self.project().collection, Default::default())
+ mem::take(self.project().collection)
}
pub(super) fn new(stream: St) -> Self {
diff --git a/src/stream/stream/filter.rs b/src/stream/stream/filter.rs
index ccf1a51..997fe99 100644
--- a/src/stream/stream/filter.rs
+++ b/src/stream/stream/filter.rs
@@ -93,7 +93,7 @@ where
}
fn size_hint(&self) -> (usize, Option<usize>) {
- let pending_len = if self.pending_item.is_some() { 1 } else { 0 };
+ let pending_len = usize::from(self.pending_item.is_some());
let (_, upper) = self.stream.size_hint();
let upper = match upper {
Some(x) => x.checked_add(pending_len),
diff --git a/src/stream/stream/filter_map.rs b/src/stream/stream/filter_map.rs
index 02a0a43..6b7d007 100644
--- a/src/stream/stream/filter_map.rs
+++ b/src/stream/stream/filter_map.rs
@@ -87,7 +87,7 @@ where
}
fn size_hint(&self) -> (usize, Option<usize>) {
- let pending_len = if self.pending.is_some() { 1 } else { 0 };
+ let pending_len = usize::from(self.pending.is_some());
let (_, upper) = self.stream.size_hint();
let upper = match upper {
Some(x) => x.checked_add(pending_len),
diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs
index 642b91e..a823fab 100644
--- a/src/stream/stream/mod.rs
+++ b/src/stream/stream/mod.rs
@@ -1674,6 +1674,8 @@ pub trait StreamExt: Stream {
/// assert_eq!(total, 6);
/// # });
/// ```
+ ///
+ /// [`select!`]: crate::select
fn select_next_some(&mut self) -> SelectNextSome<'_, Self>
where
Self: Unpin + FusedStream,
diff --git a/src/stream/stream/peek.rs b/src/stream/stream/peek.rs
index c72dfc3..ea3d624 100644
--- a/src/stream/stream/peek.rs
+++ b/src/stream/stream/peek.rs
@@ -204,7 +204,7 @@ impl<S: Stream> Stream for Peekable<S> {
}
fn size_hint(&self) -> (usize, Option<usize>) {
- let peek_len = if self.peeked.is_some() { 1 } else { 0 };
+ let peek_len = usize::from(self.peeked.is_some());
let (lower, upper) = self.stream.size_hint();
let lower = lower.saturating_add(peek_len);
let upper = match upper {
diff --git a/src/stream/stream/ready_chunks.rs b/src/stream/stream/ready_chunks.rs
index 5ebc958..49116d4 100644
--- a/src/stream/stream/ready_chunks.rs
+++ b/src/stream/stream/ready_chunks.rs
@@ -20,10 +20,7 @@ pin_project! {
}
}
-impl<St: Stream> ReadyChunks<St>
-where
- St: Stream,
-{
+impl<St: Stream> ReadyChunks<St> {
pub(super) fn new(stream: St, capacity: usize) -> Self {
assert!(capacity > 0);
@@ -74,7 +71,7 @@ impl<St: Stream> Stream for ReadyChunks<St> {
let last = if this.items.is_empty() {
None
} else {
- let full_buf = mem::replace(this.items, Vec::new());
+ let full_buf = mem::take(this.items);
Some(full_buf)
};
@@ -85,9 +82,9 @@ impl<St: Stream> Stream for ReadyChunks<St> {
}
fn size_hint(&self) -> (usize, Option<usize>) {
- let chunk_len = if self.items.is_empty() { 0 } else { 1 };
+ let chunk_len = usize::from(!self.items.is_empty());
let (lower, upper) = self.stream.size_hint();
- let lower = lower.saturating_add(chunk_len);
+ let lower = (lower / self.cap).saturating_add(chunk_len);
let upper = match upper {
Some(x) => x.checked_add(chunk_len),
None => None,
diff --git a/src/stream/stream/skip_while.rs b/src/stream/stream/skip_while.rs
index 50a21a2..dabd5ee 100644
--- a/src/stream/stream/skip_while.rs
+++ b/src/stream/stream/skip_while.rs
@@ -99,7 +99,7 @@ where
if self.done_skipping {
self.stream.size_hint()
} else {
- let pending_len = if self.pending_item.is_some() { 1 } else { 0 };
+ let pending_len = usize::from(self.pending_item.is_some());
let (_, upper) = self.stream.size_hint();
let upper = match upper {
Some(x) => x.checked_add(pending_len),
diff --git a/src/stream/stream/split.rs b/src/stream/stream/split.rs
index 3a72fee..e2034e0 100644
--- a/src/stream/stream/split.rs
+++ b/src/stream/stream/split.rs
@@ -35,7 +35,7 @@ impl<S: Stream> Stream for SplitStream<S> {
}
}
-#[allow(bad_style)]
+#[allow(non_snake_case)]
fn SplitSink<S: Sink<Item>, Item>(lock: BiLock<S>) -> SplitSink<S, Item> {
SplitSink { lock, slot: None }
}
diff --git a/src/stream/stream/take.rs b/src/stream/stream/take.rs
index b1c728e..29d6c39 100644
--- a/src/stream/stream/take.rs
+++ b/src/stream/stream/take.rs
@@ -54,11 +54,11 @@ where
let (lower, upper) = self.stream.size_hint();
- let lower = cmp::min(lower, self.remaining as usize);
+ let lower = cmp::min(lower, self.remaining);
let upper = match upper {
- Some(x) if x < self.remaining as usize => Some(x),
- _ => Some(self.remaining as usize),
+ Some(x) if x < self.remaining => Some(x),
+ _ => Some(self.remaining),
};
(lower, upper)
diff --git a/src/stream/stream/take_while.rs b/src/stream/stream/take_while.rs
index 01b2765..9256943 100644
--- a/src/stream/stream/take_while.rs
+++ b/src/stream/stream/take_while.rs
@@ -91,7 +91,7 @@ where
return (0, Some(0));
}
- let pending_len = if self.pending_item.is_some() { 1 } else { 0 };
+ let pending_len = usize::from(self.pending_item.is_some());
let (_, upper) = self.stream.size_hint();
let upper = match upper {
Some(x) => x.checked_add(pending_len),
diff --git a/src/stream/stream/then.rs b/src/stream/stream/then.rs
index d4531d4..9192c0b 100644
--- a/src/stream/stream/then.rs
+++ b/src/stream/stream/then.rs
@@ -78,7 +78,7 @@ where
}
fn size_hint(&self) -> (usize, Option<usize>) {
- let future_len = if self.future.is_some() { 1 } else { 0 };
+ let future_len = usize::from(self.future.is_some());
let (lower, upper) = self.stream.size_hint();
let lower = lower.saturating_add(future_len);
let upper = match upper {
diff --git a/src/stream/stream/unzip.rs b/src/stream/stream/unzip.rs
index 15f22e8..a88cf03 100644
--- a/src/stream/stream/unzip.rs
+++ b/src/stream/stream/unzip.rs
@@ -21,7 +21,7 @@ pin_project! {
impl<St: Stream, FromA: Default, FromB: Default> Unzip<St, FromA, FromB> {
fn finish(self: Pin<&mut Self>) -> (FromA, FromB) {
let this = self.project();
- (mem::replace(this.left, Default::default()), mem::replace(this.right, Default::default()))
+ (mem::take(this.left), mem::take(this.right))
}
pub(super) fn new(stream: St) -> Self {
diff --git a/src/stream/stream/zip.rs b/src/stream/stream/zip.rs
index 360a8b6..25a47e9 100644
--- a/src/stream/stream/zip.rs
+++ b/src/stream/stream/zip.rs
@@ -102,8 +102,8 @@ where
}
fn size_hint(&self) -> (usize, Option<usize>) {
- let queued1_len = if self.queued1.is_some() { 1 } else { 0 };
- let queued2_len = if self.queued2.is_some() { 1 } else { 0 };
+ let queued1_len = usize::from(self.queued1.is_some());
+ let queued2_len = usize::from(self.queued2.is_some());
let (stream1_lower, stream1_upper) = self.stream1.size_hint();
let (stream2_lower, stream2_upper) = self.stream2.size_hint();
diff --git a/src/stream/try_stream/and_then.rs b/src/stream/try_stream/and_then.rs
index a7b50db..2f8b6f2 100644
--- a/src/stream/try_stream/and_then.rs
+++ b/src/stream/try_stream/and_then.rs
@@ -71,7 +71,7 @@ where
}
fn size_hint(&self) -> (usize, Option<usize>) {
- let future_len = if self.future.is_some() { 1 } else { 0 };
+ let future_len = usize::from(self.future.is_some());
let (lower, upper) = self.stream.size_hint();
let lower = lower.saturating_add(future_len);
let upper = match upper {
diff --git a/src/stream/try_stream/into_async_read.rs b/src/stream/try_stream/into_async_read.rs
index 914b277..ffbfc7e 100644
--- a/src/stream/try_stream/into_async_read.rs
+++ b/src/stream/try_stream/into_async_read.rs
@@ -1,30 +1,26 @@
-use crate::stream::TryStreamExt;
use core::pin::Pin;
use futures_core::ready;
use futures_core::stream::TryStream;
use futures_core::task::{Context, Poll};
use futures_io::{AsyncBufRead, AsyncRead, AsyncWrite};
+use pin_project_lite::pin_project;
use std::cmp;
use std::io::{Error, Result};
-/// Reader for the [`into_async_read`](super::TryStreamExt::into_async_read) method.
-#[derive(Debug)]
-#[must_use = "readers do nothing unless polled"]
-#[cfg_attr(docsrs, doc(cfg(feature = "io")))]
-pub struct IntoAsyncRead<St>
-where
- St: TryStream<Error = Error> + Unpin,
- St::Ok: AsRef<[u8]>,
-{
- stream: St,
- state: ReadState<St::Ok>,
-}
-
-impl<St> Unpin for IntoAsyncRead<St>
-where
- St: TryStream<Error = Error> + Unpin,
- St::Ok: AsRef<[u8]>,
-{
+pin_project! {
+ /// Reader for the [`into_async_read`](super::TryStreamExt::into_async_read) method.
+ #[derive(Debug)]
+ #[must_use = "readers do nothing unless polled"]
+ #[cfg_attr(docsrs, doc(cfg(feature = "io")))]
+ pub struct IntoAsyncRead<St>
+ where
+ St: TryStream<Error = Error>,
+ St::Ok: AsRef<[u8]>,
+ {
+ #[pin]
+ stream: St,
+ state: ReadState<St::Ok>,
+ }
}
#[derive(Debug)]
@@ -36,7 +32,7 @@ enum ReadState<T: AsRef<[u8]>> {
impl<St> IntoAsyncRead<St>
where
- St: TryStream<Error = Error> + Unpin,
+ St: TryStream<Error = Error>,
St::Ok: AsRef<[u8]>,
{
pub(super) fn new(stream: St) -> Self {
@@ -46,16 +42,18 @@ where
impl<St> AsyncRead for IntoAsyncRead<St>
where
- St: TryStream<Error = Error> + Unpin,
+ St: TryStream<Error = Error>,
St::Ok: AsRef<[u8]>,
{
fn poll_read(
- mut self: Pin<&mut Self>,
+ self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<Result<usize>> {
+ let mut this = self.project();
+
loop {
- match &mut self.state {
+ match this.state {
ReadState::Ready { chunk, chunk_start } => {
let chunk = chunk.as_ref();
let len = cmp::min(buf.len(), chunk.len() - *chunk_start);
@@ -64,23 +62,23 @@ where
*chunk_start += len;
if chunk.len() == *chunk_start {
- self.state = ReadState::PendingChunk;
+ *this.state = ReadState::PendingChunk;
}
return Poll::Ready(Ok(len));
}
- ReadState::PendingChunk => match ready!(self.stream.try_poll_next_unpin(cx)) {
+ ReadState::PendingChunk => match ready!(this.stream.as_mut().try_poll_next(cx)) {
Some(Ok(chunk)) => {
if !chunk.as_ref().is_empty() {
- self.state = ReadState::Ready { chunk, chunk_start: 0 };
+ *this.state = ReadState::Ready { chunk, chunk_start: 0 };
}
}
Some(Err(err)) => {
- self.state = ReadState::Eof;
+ *this.state = ReadState::Eof;
return Poll::Ready(Err(err));
}
None => {
- self.state = ReadState::Eof;
+ *this.state = ReadState::Eof;
return Poll::Ready(Ok(0));
}
},
@@ -94,51 +92,52 @@ where
impl<St> AsyncWrite for IntoAsyncRead<St>
where
- St: TryStream<Error = Error> + AsyncWrite + Unpin,
+ St: TryStream<Error = Error> + AsyncWrite,
St::Ok: AsRef<[u8]>,
{
- fn poll_write(
- mut self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- buf: &[u8],
- ) -> Poll<Result<usize>> {
- Pin::new(&mut self.stream).poll_write(cx, buf)
+ fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>> {
+ let this = self.project();
+ this.stream.poll_write(cx, buf)
}
- fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
- Pin::new(&mut self.stream).poll_flush(cx)
+ fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
+ let this = self.project();
+ this.stream.poll_flush(cx)
}
- fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
- Pin::new(&mut self.stream).poll_close(cx)
+ fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
+ let this = self.project();
+ this.stream.poll_close(cx)
}
}
impl<St> AsyncBufRead for IntoAsyncRead<St>
where
- St: TryStream<Error = Error> + Unpin,
+ St: TryStream<Error = Error>,
St::Ok: AsRef<[u8]>,
{
- fn poll_fill_buf(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>> {
- while let ReadState::PendingChunk = self.state {
- match ready!(self.stream.try_poll_next_unpin(cx)) {
+ fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>> {
+ let mut this = self.project();
+
+ while let ReadState::PendingChunk = this.state {
+ match ready!(this.stream.as_mut().try_poll_next(cx)) {
Some(Ok(chunk)) => {
if !chunk.as_ref().is_empty() {
- self.state = ReadState::Ready { chunk, chunk_start: 0 };
+ *this.state = ReadState::Ready { chunk, chunk_start: 0 };
}
}
Some(Err(err)) => {
- self.state = ReadState::Eof;
+ *this.state = ReadState::Eof;
return Poll::Ready(Err(err));
}
None => {
- self.state = ReadState::Eof;
+ *this.state = ReadState::Eof;
return Poll::Ready(Ok(&[]));
}
}
}
- if let ReadState::Ready { ref chunk, chunk_start } = self.into_ref().get_ref().state {
+ if let &mut ReadState::Ready { ref chunk, chunk_start } = this.state {
let chunk = chunk.as_ref();
return Poll::Ready(Ok(&chunk[chunk_start..]));
}
@@ -147,16 +146,18 @@ where
Poll::Ready(Ok(&[]))
}
- fn consume(mut self: Pin<&mut Self>, amount: usize) {
+ fn consume(self: Pin<&mut Self>, amount: usize) {
+ let this = self.project();
+
// https://github.com/rust-lang/futures-rs/pull/1556#discussion_r281644295
if amount == 0 {
return;
}
- if let ReadState::Ready { chunk, chunk_start } = &mut self.state {
+ if let ReadState::Ready { chunk, chunk_start } = this.state {
*chunk_start += amount;
debug_assert!(*chunk_start <= chunk.as_ref().len());
if *chunk_start >= chunk.as_ref().len() {
- self.state = ReadState::PendingChunk;
+ *this.state = ReadState::PendingChunk;
}
} else {
debug_assert!(false, "Attempted to consume from IntoAsyncRead without chunk");
diff --git a/src/stream/try_stream/mod.rs b/src/stream/try_stream/mod.rs
index 6bf2cb7..bc4c6e4 100644
--- a/src/stream/try_stream/mod.rs
+++ b/src/stream/try_stream/mod.rs
@@ -918,7 +918,7 @@ pub trait TryStreamExt: TryStream {
/// that matches the stream's `Error` type.
///
/// This adaptor will buffer up to `n` futures and then return their
- /// outputs in the order. If the underlying stream returns an error, it will
+ /// outputs in the same order as the underlying stream. If the underlying stream returns an error, it will
/// be immediately propagated.
///
/// The returned stream will be a stream of results, each containing either
@@ -1031,12 +1031,7 @@ pub trait TryStreamExt: TryStream {
Compat::new(self)
}
- /// Adapter that converts this stream into an [`AsyncRead`](crate::io::AsyncRead).
- ///
- /// Note that because `into_async_read` moves the stream, the [`Stream`](futures_core::stream::Stream) type must be
- /// [`Unpin`]. If you want to use `into_async_read` with a [`!Unpin`](Unpin) stream, you'll
- /// first have to pin the stream. This can be done by boxing the stream using [`Box::pin`]
- /// or pinning it to the stack using the `pin_mut!` macro from the `pin_utils` crate.
+ /// Adapter that converts this stream into an [`AsyncBufRead`](crate::io::AsyncBufRead).
///
/// This method is only available when the `std` feature of this
/// library is activated, and it is activated by default.
@@ -1048,12 +1043,12 @@ pub trait TryStreamExt: TryStream {
/// use futures::stream::{self, TryStreamExt};
/// use futures::io::AsyncReadExt;
///
- /// let stream = stream::iter(vec![Ok(vec![1, 2, 3, 4, 5])]);
+ /// let stream = stream::iter([Ok(vec![1, 2, 3]), Ok(vec![4, 5])]);
/// let mut reader = stream.into_async_read();
- /// let mut buf = Vec::new();
///
- /// assert!(reader.read_to_end(&mut buf).await.is_ok());
- /// assert_eq!(buf, &[1, 2, 3, 4, 5]);
+ /// let mut buf = Vec::new();
+ /// reader.read_to_end(&mut buf).await.unwrap();
+ /// assert_eq!(buf, [1, 2, 3, 4, 5]);
/// # })
/// ```
#[cfg(feature = "io")]
@@ -1061,7 +1056,7 @@ pub trait TryStreamExt: TryStream {
#[cfg(feature = "std")]
fn into_async_read(self) -> IntoAsyncRead<Self>
where
- Self: Sized + TryStreamExt<Error = std::io::Error> + Unpin,
+ Self: Sized + TryStreamExt<Error = std::io::Error>,
Self::Ok: AsRef<[u8]>,
{
crate::io::assert_read(IntoAsyncRead::new(self))
diff --git a/src/stream/try_stream/or_else.rs b/src/stream/try_stream/or_else.rs
index cb69e81..53aceb8 100644
--- a/src/stream/try_stream/or_else.rs
+++ b/src/stream/try_stream/or_else.rs
@@ -75,7 +75,7 @@ where
}
fn size_hint(&self) -> (usize, Option<usize>) {
- let future_len = if self.future.is_some() { 1 } else { 0 };
+ let future_len = usize::from(self.future.is_some());
let (lower, upper) = self.stream.size_hint();
let lower = lower.saturating_add(future_len);
let upper = match upper {
diff --git a/src/stream/try_stream/try_buffered.rs b/src/stream/try_stream/try_buffered.rs
index 45bd3f8..9f48e5c 100644
--- a/src/stream/try_stream/try_buffered.rs
+++ b/src/stream/try_stream/try_buffered.rs
@@ -54,7 +54,7 @@ where
// our queue of futures. Propagate errors from the stream immediately.
while this.in_progress_queue.len() < *this.max {
match this.stream.as_mut().poll_next(cx)? {
- Poll::Ready(Some(fut)) => this.in_progress_queue.push(fut.into_future()),
+ Poll::Ready(Some(fut)) => this.in_progress_queue.push_back(fut.into_future()),
Poll::Ready(None) | Poll::Pending => break,
}
}
diff --git a/src/stream/try_stream/try_chunks.rs b/src/stream/try_stream/try_chunks.rs
index 07d4425..3bb253a 100644
--- a/src/stream/try_stream/try_chunks.rs
+++ b/src/stream/try_stream/try_chunks.rs
@@ -70,7 +70,7 @@ impl<St: TryStream> Stream for TryChunks<St> {
let last = if this.items.is_empty() {
None
} else {
- let full_buf = mem::replace(this.items, Vec::new());
+ let full_buf = mem::take(this.items);
Some(full_buf)
};
@@ -81,9 +81,9 @@ impl<St: TryStream> Stream for TryChunks<St> {
}
fn size_hint(&self) -> (usize, Option<usize>) {
- let chunk_len = if self.items.is_empty() { 0 } else { 1 };
+ let chunk_len = usize::from(!self.items.is_empty());
let (lower, upper) = self.stream.size_hint();
- let lower = lower.saturating_add(chunk_len);
+ let lower = (lower / self.cap).saturating_add(chunk_len);
let upper = match upper {
Some(x) => x.checked_add(chunk_len),
None => None,
diff --git a/src/stream/try_stream/try_collect.rs b/src/stream/try_stream/try_collect.rs
index 5d3b3d7..3e5963f 100644
--- a/src/stream/try_stream/try_collect.rs
+++ b/src/stream/try_stream/try_collect.rs
@@ -45,7 +45,7 @@ where
Poll::Ready(Ok(loop {
match ready!(this.stream.as_mut().try_poll_next(cx)?) {
Some(x) => this.items.extend(Some(x)),
- None => break mem::replace(this.items, Default::default()),
+ None => break mem::take(this.items),
}
}))
}
diff --git a/src/stream/try_stream/try_filter.rs b/src/stream/try_stream/try_filter.rs
index 61e6105..11d5824 100644
--- a/src/stream/try_stream/try_filter.rs
+++ b/src/stream/try_stream/try_filter.rs
@@ -90,7 +90,7 @@ where
}
fn size_hint(&self) -> (usize, Option<usize>) {
- let pending_len = if self.pending_fut.is_some() { 1 } else { 0 };
+ let pending_len = usize::from(self.pending_fut.is_some());
let (_, upper) = self.stream.size_hint();
let upper = match upper {
Some(x) => x.checked_add(pending_len),
diff --git a/src/stream/try_stream/try_filter_map.rs b/src/stream/try_stream/try_filter_map.rs
index bb1b5b9..ed12017 100644
--- a/src/stream/try_stream/try_filter_map.rs
+++ b/src/stream/try_stream/try_filter_map.rs
@@ -84,7 +84,7 @@ where
}
fn size_hint(&self) -> (usize, Option<usize>) {
- let pending_len = if self.pending.is_some() { 1 } else { 0 };
+ let pending_len = usize::from(self.pending.is_some());
let (_, upper) = self.stream.size_hint();
let upper = match upper {
Some(x) => x.checked_add(pending_len),
diff --git a/src/stream/try_stream/try_skip_while.rs b/src/stream/try_stream/try_skip_while.rs
index a424b6c..52aa2d4 100644
--- a/src/stream/try_stream/try_skip_while.rs
+++ b/src/stream/try_stream/try_skip_while.rs
@@ -87,7 +87,7 @@ where
}
fn size_hint(&self) -> (usize, Option<usize>) {
- let pending_len = if self.pending_item.is_some() { 1 } else { 0 };
+ let pending_len = usize::from(self.pending_item.is_some());
let (_, upper) = self.stream.size_hint();
let upper = match upper {
Some(x) => x.checked_add(pending_len),
diff --git a/src/stream/try_stream/try_take_while.rs b/src/stream/try_stream/try_take_while.rs
index 3375960..4b5ff1a 100644
--- a/src/stream/try_stream/try_take_while.rs
+++ b/src/stream/try_stream/try_take_while.rs
@@ -96,7 +96,7 @@ where
return (0, Some(0));
}
- let pending_len = if self.pending_item.is_some() { 1 } else { 0 };
+ let pending_len = usize::from(self.pending_item.is_some());
let (_, upper) = self.stream.size_hint();
let upper = match upper {
Some(x) => x.checked_add(pending_len),
diff --git a/src/task/spawn.rs b/src/task/spawn.rs
index 87ca360..d9e9985 100644
--- a/src/task/spawn.rs
+++ b/src/task/spawn.rs
@@ -34,7 +34,7 @@ pub trait SpawnExt: Spawn {
/// today. Feel free to use this method in the meantime.
///
/// ```
- /// # if cfg!(miri) { return; } // https://github.com/rust-lang/miri/issues/1038
+ /// # {
/// use futures::executor::ThreadPool;
/// use futures::task::SpawnExt;
///
@@ -42,6 +42,8 @@ pub trait SpawnExt: Spawn {
///
/// let future = async { /* ... */ };
/// executor.spawn(future).unwrap();
+ /// # }
+ /// # std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371
/// ```
#[cfg(feature = "alloc")]
fn spawn<Fut>(&self, future: Fut) -> Result<(), SpawnError>
@@ -59,7 +61,7 @@ pub trait SpawnExt: Spawn {
/// resolves to the output of the spawned future.
///
/// ```
- /// # if cfg!(miri) { return; } // https://github.com/rust-lang/miri/issues/1038
+ /// # {
/// use futures::executor::{block_on, ThreadPool};
/// use futures::future;
/// use futures::task::SpawnExt;
@@ -69,6 +71,8 @@ pub trait SpawnExt: Spawn {
/// let future = future::ready(1);
/// let join_handle_fut = executor.spawn_with_handle(future).unwrap();
/// assert_eq!(block_on(join_handle_fut), 1);
+ /// # }
+ /// # std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371
/// ```
#[cfg(feature = "channel")]
#[cfg_attr(docsrs, doc(cfg(feature = "channel")))]
@@ -138,7 +142,6 @@ pub trait LocalSpawnExt: LocalSpawn {
/// resolves to the output of the spawned future.
///
/// ```
- /// # if cfg!(miri) { return; } // https://github.com/rust-lang/miri/issues/1038
/// use futures::executor::LocalPool;
/// use futures::task::LocalSpawnExt;
///