aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJoel Galenson <jgalenson@google.com>2021-10-12 14:47:39 +0000
committerGerrit Code Review <noreply-gerritcodereview@google.com>2021-10-12 14:47:39 +0000
commit67b7ffd62996a8b094a119cf0945c072ff8d6f67 (patch)
tree630ac4f8f0e947984e7ee872e9d0ee74cf0f047e
parent50ca4591e06ed2d193b54dbae47686180c3f24e1 (diff)
parentdc20fc8367e78e4b3ac62def28f0bf1082da81d7 (diff)
downloadfutures-util-android-s-v2-preview-1.tar.gz
-rw-r--r--.cargo_vcs_info.json2
-rw-r--r--Android.bp64
-rw-r--r--Cargo.toml14
-rw-r--r--Cargo.toml.orig14
-rw-r--r--METADATA8
-rw-r--r--src/async_await/mod.rs7
-rw-r--r--src/async_await/stream_select_mod.rs45
-rw-r--r--src/future/join_all.rs95
-rw-r--r--src/future/mod.rs3
-rw-r--r--src/future/option.rs6
-rw-r--r--src/future/poll_immediate.rs126
-rw-r--r--src/io/buf_reader.rs69
-rw-r--r--src/io/mod.rs2
-rw-r--r--src/lib.rs8
-rw-r--r--src/stream/mod.rs7
-rw-r--r--src/stream/poll_immediate.rs80
-rw-r--r--src/stream/stream/mod.rs2
-rw-r--r--src/stream/stream/peek.rs92
18 files changed, 534 insertions, 110 deletions
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json
index 99dc8b0..ffd4f55 100644
--- a/.cargo_vcs_info.json
+++ b/.cargo_vcs_info.json
@@ -1,5 +1,5 @@
{
"git": {
- "sha1": "ab38fd29d3f84f8fc028fa7883e53dba423da0ee"
+ "sha1": "7caefa51304e78fd5018cd5d2a03f3b9089cc010"
}
}
diff --git a/Android.bp b/Android.bp
index 859f9da..4a73e79 100644
--- a/Android.bp
+++ b/Android.bp
@@ -41,6 +41,8 @@ rust_defaults {
name: "futures-util_test_defaults",
crate_name: "futures_util",
srcs: ["src/lib.rs"],
+ cargo_env_compat: true,
+ cargo_pkg_version: "0.3.17",
test_suites: ["general-tests"],
auto_gen_config: true,
edition: "2018",
@@ -99,6 +101,8 @@ rust_library {
name: "libfutures_util",
host_supported: true,
crate_name: "futures_util",
+ cargo_env_compat: true,
+ cargo_pkg_version: "0.3.17",
srcs: ["src/lib.rs"],
edition: "2018",
features: [
@@ -143,63 +147,3 @@ rust_library {
],
min_sdk_version: "29",
}
-
-// dependent_library ["feature_list"]
-// autocfg-1.0.1
-// byteorder-1.4.3 "default,std"
-// bytes-0.4.12
-// cfg-if-0.1.10
-// cfg-if-1.0.0
-// crossbeam-deque-0.7.4
-// crossbeam-epoch-0.8.2 "default,lazy_static,std"
-// crossbeam-queue-0.2.3 "default,std"
-// crossbeam-utils-0.7.2 "default,lazy_static,std"
-// fnv-1.0.7 "default,std"
-// futures-0.1.31 "default,use_std,with-deprecated"
-// futures-channel-0.3.16 "alloc,std"
-// futures-core-0.3.16 "alloc,std"
-// futures-io-0.3.16 "std"
-// futures-macro-0.3.16
-// futures-sink-0.3.16
-// futures-task-0.3.16 "alloc,std"
-// iovec-0.1.4
-// lazy_static-1.4.0
-// libc-0.2.98 "default,std"
-// lock_api-0.3.4
-// log-0.4.14
-// maybe-uninit-2.0.0
-// memchr-2.4.0 "default,std"
-// memoffset-0.5.6 "default"
-// mio-0.6.23 "default,with-deprecated"
-// mio-uds-0.6.8
-// net2-0.2.37 "default,duration"
-// num_cpus-1.13.0
-// parking_lot-0.9.0 "default"
-// parking_lot_core-0.6.2
-// pin-project-lite-0.2.7
-// pin-utils-0.1.0
-// proc-macro-hack-0.5.19
-// proc-macro-nested-0.1.7
-// proc-macro2-1.0.28 "default,proc-macro"
-// quote-1.0.9 "default,proc-macro"
-// rustc_version-0.2.3
-// scopeguard-1.1.0
-// semver-0.9.0 "default"
-// semver-parser-0.7.0
-// slab-0.4.4 "default,std"
-// smallvec-0.6.14 "default,std"
-// syn-1.0.74 "clone-impls,default,derive,full,parsing,printing,proc-macro,quote"
-// tokio-0.1.22 "bytes,codec,default,fs,io,mio,num_cpus,reactor,rt-full,sync,tcp,timer,tokio-codec,tokio-current-thread,tokio-executor,tokio-fs,tokio-io,tokio-reactor,tokio-sync,tokio-tcp,tokio-threadpool,tokio-timer,tokio-udp,tokio-uds,udp,uds"
-// tokio-codec-0.1.2
-// tokio-current-thread-0.1.7
-// tokio-executor-0.1.10
-// tokio-fs-0.1.7
-// tokio-io-0.1.13
-// tokio-reactor-0.1.12
-// tokio-sync-0.1.8
-// tokio-tcp-0.1.4
-// tokio-threadpool-0.1.18
-// tokio-timer-0.2.13
-// tokio-udp-0.1.6
-// tokio-uds-0.2.7
-// unicode-xid-0.2.2 "default"
diff --git a/Cargo.toml b/Cargo.toml
index aaaebac..90010e9 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -12,7 +12,7 @@
[package]
edition = "2018"
name = "futures-util"
-version = "0.3.16"
+version = "0.3.17"
authors = ["Alex Crichton <alex@alexcrichton.com>"]
description = "Common utilities and extension traits for the futures-rs library.\n"
homepage = "https://rust-lang.github.io/futures-rs"
@@ -23,33 +23,33 @@ repository = "https://github.com/rust-lang/futures-rs"
all-features = true
rustdoc-args = ["--cfg", "docsrs"]
[dependencies.futures-channel]
-version = "0.3.16"
+version = "0.3.17"
features = ["std"]
optional = true
default-features = false
[dependencies.futures-core]
-version = "0.3.16"
+version = "0.3.17"
default-features = false
[dependencies.futures-io]
-version = "0.3.16"
+version = "0.3.17"
features = ["std"]
optional = true
default-features = false
[dependencies.futures-macro]
-version = "=0.3.16"
+version = "=0.3.17"
optional = true
default-features = false
[dependencies.futures-sink]
-version = "0.3.16"
+version = "0.3.17"
optional = true
default-features = false
[dependencies.futures-task]
-version = "0.3.16"
+version = "0.3.17"
default-features = false
[dependencies.futures_01]
diff --git a/Cargo.toml.orig b/Cargo.toml.orig
index 98cace6..a8e9362 100644
--- a/Cargo.toml.orig
+++ b/Cargo.toml.orig
@@ -1,7 +1,7 @@
[package]
name = "futures-util"
edition = "2018"
-version = "0.3.16"
+version = "0.3.17"
authors = ["Alex Crichton <alex@alexcrichton.com>"]
license = "MIT OR Apache-2.0"
repository = "https://github.com/rust-lang/futures-rs"
@@ -39,12 +39,12 @@ cfg-target-has-atomic = []
autocfg = "1"
[dependencies]
-futures-core = { path = "../futures-core", version = "0.3.16", default-features = false }
-futures-task = { path = "../futures-task", version = "0.3.16", default-features = false }
-futures-channel = { path = "../futures-channel", version = "0.3.16", default-features = false, features = ["std"], optional = true }
-futures-io = { path = "../futures-io", version = "0.3.16", default-features = false, features = ["std"], optional = true }
-futures-sink = { path = "../futures-sink", version = "0.3.16", default-features = false, optional = true }
-futures-macro = { path = "../futures-macro", version = "=0.3.16", default-features = false, optional = true }
+futures-core = { path = "../futures-core", version = "0.3.17", default-features = false }
+futures-task = { path = "../futures-task", version = "0.3.17", default-features = false }
+futures-channel = { path = "../futures-channel", version = "0.3.17", default-features = false, features = ["std"], optional = true }
+futures-io = { path = "../futures-io", version = "0.3.17", default-features = false, features = ["std"], optional = true }
+futures-sink = { path = "../futures-sink", version = "0.3.17", default-features = false, optional = true }
+futures-macro = { path = "../futures-macro", version = "=0.3.17", default-features = false, optional = true }
proc-macro-hack = { version = "0.5.19", optional = true }
proc-macro-nested = { version = "0.1.2", optional = true }
slab = { version = "0.4.2", optional = true }
diff --git a/METADATA b/METADATA
index ce359d5..0ace75a 100644
--- a/METADATA
+++ b/METADATA
@@ -7,13 +7,13 @@ third_party {
}
url {
type: ARCHIVE
- value: "https://static.crates.io/crates/futures-util/futures-util-0.3.16.crate"
+ value: "https://static.crates.io/crates/futures-util/futures-util-0.3.17.crate"
}
- version: "0.3.16"
+ version: "0.3.17"
license_type: NOTICE
last_upgrade_date {
year: 2021
- month: 8
- day: 9
+ month: 9
+ day: 22
}
}
diff --git a/src/async_await/mod.rs b/src/async_await/mod.rs
index 5f5d4ac..7276da2 100644
--- a/src/async_await/mod.rs
+++ b/src/async_await/mod.rs
@@ -30,6 +30,13 @@ mod select_mod;
#[cfg(feature = "async-await-macro")]
pub use self::select_mod::*;
+// Primary export is a macro
+#[cfg(feature = "async-await-macro")]
+mod stream_select_mod;
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/64762
+#[cfg(feature = "async-await-macro")]
+pub use self::stream_select_mod::*;
+
#[cfg(feature = "std")]
#[cfg(feature = "async-await-macro")]
mod random;
diff --git a/src/async_await/stream_select_mod.rs b/src/async_await/stream_select_mod.rs
new file mode 100644
index 0000000..7743406
--- /dev/null
+++ b/src/async_await/stream_select_mod.rs
@@ -0,0 +1,45 @@
+//! The `stream_select` macro.
+
+#[cfg(feature = "std")]
+#[allow(unreachable_pub)]
+#[doc(hidden)]
+#[cfg_attr(not(fn_like_proc_macro), proc_macro_hack::proc_macro_hack(support_nested))]
+pub use futures_macro::stream_select_internal;
+
+/// Combines several streams, all producing the same `Item` type, into one stream.
+/// This is similar to `select_all` but does not require the streams to all be the same type.
+/// It also keeps the streams inline, and does not require `Box<dyn Stream>`s to be allocated.
+/// Streams passed to this macro must be `Unpin`.
+///
+/// If multiple streams are ready, one will be pseudo randomly selected at runtime.
+///
+/// This macro is gated behind the `async-await` feature of this library, which is activated by default.
+/// Note that `stream_select!` relies on `proc-macro-hack`, and may require to set the compiler's recursion
+/// limit very high, e.g. `#![recursion_limit="1024"]`.
+///
+/// # Examples
+///
+/// ```
+/// # futures::executor::block_on(async {
+/// use futures::{stream, StreamExt, stream_select};
+/// let endless_ints = |i| stream::iter(vec![i].into_iter().cycle()).fuse();
+///
+/// let mut endless_numbers = stream_select!(endless_ints(1i32), endless_ints(2), endless_ints(3));
+/// match endless_numbers.next().await {
+/// Some(1) => println!("Got a 1"),
+/// Some(2) => println!("Got a 2"),
+/// Some(3) => println!("Got a 3"),
+/// _ => unreachable!(),
+/// }
+/// # });
+/// ```
+#[cfg(feature = "std")]
+#[macro_export]
+macro_rules! stream_select {
+ ($($tokens:tt)*) => {{
+ use $crate::__private as __futures_crate;
+ $crate::stream_select_internal! {
+ $( $tokens )*
+ }
+ }}
+}
diff --git a/src/future/join_all.rs b/src/future/join_all.rs
index 427e71c..2e52ac1 100644
--- a/src/future/join_all.rs
+++ b/src/future/join_all.rs
@@ -12,6 +12,9 @@ use core::task::{Context, Poll};
use super::{assert_future, MaybeDone};
+#[cfg(not(futures_no_atomic_cas))]
+use crate::stream::{Collect, FuturesOrdered, StreamExt};
+
fn iter_pin_mut<T>(slice: Pin<&mut [T]>) -> impl Iterator<Item = Pin<&mut T>> {
// Safety: `std` _could_ make this unsound if it were to decide Pin's
// invariants aren't required to transmit through slices. Otherwise this has
@@ -19,13 +22,29 @@ fn iter_pin_mut<T>(slice: Pin<&mut [T]>) -> impl Iterator<Item = Pin<&mut T>> {
unsafe { slice.get_unchecked_mut() }.iter_mut().map(|t| unsafe { Pin::new_unchecked(t) })
}
-/// Future for the [`join_all`] function.
#[must_use = "futures do nothing unless you `.await` or poll them"]
+/// Future for the [`join_all`] function.
pub struct JoinAll<F>
where
F: Future,
{
- elems: Pin<Box<[MaybeDone<F>]>>,
+ kind: JoinAllKind<F>,
+}
+
+#[cfg(not(futures_no_atomic_cas))]
+const SMALL: usize = 30;
+
+pub(crate) enum JoinAllKind<F>
+where
+ F: Future,
+{
+ Small {
+ elems: Pin<Box<[MaybeDone<F>]>>,
+ },
+ #[cfg(not(futures_no_atomic_cas))]
+ Big {
+ fut: Collect<FuturesOrdered<F>, Vec<F::Output>>,
+ },
}
impl<F> fmt::Debug for JoinAll<F>
@@ -34,7 +53,13 @@ where
F::Output: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- f.debug_struct("JoinAll").field("elems", &self.elems).finish()
+ match self.kind {
+ JoinAllKind::Small { ref elems } => {
+ f.debug_struct("JoinAll").field("elems", elems).finish()
+ }
+ #[cfg(not(futures_no_atomic_cas))]
+ JoinAllKind::Big { ref fut, .. } => fmt::Debug::fmt(fut, f),
+ }
}
}
@@ -50,10 +75,9 @@ where
///
/// # See Also
///
-/// This is purposefully a very simple API for basic use-cases. In a lot of
-/// cases you will want to use the more powerful
-/// [`FuturesOrdered`][crate::stream::FuturesOrdered] APIs, or, if order does
-/// not matter, [`FuturesUnordered`][crate::stream::FuturesUnordered].
+/// `join_all` will switch to the more powerful [`FuturesOrdered`] for performance
+/// reasons if the number of futures is large. You may want to look into using it or
+/// it's counterpart [`FuturesUnordered`][crate::stream::FuturesUnordered] directly.
///
/// Some examples for additional functionality provided by these are:
///
@@ -75,13 +99,33 @@ where
/// assert_eq!(join_all(futures).await, [1, 2, 3]);
/// # });
/// ```
-pub fn join_all<I>(i: I) -> JoinAll<I::Item>
+pub fn join_all<I>(iter: I) -> JoinAll<I::Item>
where
I: IntoIterator,
I::Item: Future,
{
- let elems: Box<[_]> = i.into_iter().map(MaybeDone::Future).collect();
- assert_future::<Vec<<I::Item as Future>::Output>, _>(JoinAll { elems: elems.into() })
+ #[cfg(futures_no_atomic_cas)]
+ {
+ let elems = iter.into_iter().map(MaybeDone::Future).collect::<Box<[_]>>().into();
+ let kind = JoinAllKind::Small { elems };
+ assert_future::<Vec<<I::Item as Future>::Output>, _>(JoinAll { kind })
+ }
+ #[cfg(not(futures_no_atomic_cas))]
+ {
+ let iter = iter.into_iter();
+ let kind = match iter.size_hint().1 {
+ None => JoinAllKind::Big { fut: iter.collect::<FuturesOrdered<_>>().collect() },
+ Some(max) => {
+ if max <= SMALL {
+ let elems = iter.map(MaybeDone::Future).collect::<Box<[_]>>().into();
+ JoinAllKind::Small { elems }
+ } else {
+ JoinAllKind::Big { fut: iter.collect::<FuturesOrdered<_>>().collect() }
+ }
+ }
+ };
+ assert_future::<Vec<<I::Item as Future>::Output>, _>(JoinAll { kind })
+ }
}
impl<F> Future for JoinAll<F>
@@ -91,20 +135,27 @@ where
type Output = Vec<F::Output>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
- let mut all_done = true;
+ match &mut self.kind {
+ JoinAllKind::Small { elems } => {
+ let mut all_done = true;
- for elem in iter_pin_mut(self.elems.as_mut()) {
- if elem.poll(cx).is_pending() {
- all_done = false;
- }
- }
+ for elem in iter_pin_mut(elems.as_mut()) {
+ if elem.poll(cx).is_pending() {
+ all_done = false;
+ }
+ }
- if all_done {
- let mut elems = mem::replace(&mut self.elems, Box::pin([]));
- let result = iter_pin_mut(elems.as_mut()).map(|e| e.take_output().unwrap()).collect();
- Poll::Ready(result)
- } else {
- Poll::Pending
+ if all_done {
+ let mut elems = mem::replace(elems, Box::pin([]));
+ let result =
+ iter_pin_mut(elems.as_mut()).map(|e| e.take_output().unwrap()).collect();
+ Poll::Ready(result)
+ } else {
+ Poll::Pending
+ }
+ }
+ #[cfg(not(futures_no_atomic_cas))]
+ JoinAllKind::Big { fut } => Pin::new(fut).poll(cx),
}
}
}
diff --git a/src/future/mod.rs b/src/future/mod.rs
index cd08264..374e365 100644
--- a/src/future/mod.rs
+++ b/src/future/mod.rs
@@ -68,6 +68,9 @@ pub use self::option::OptionFuture;
mod poll_fn;
pub use self::poll_fn::{poll_fn, PollFn};
+mod poll_immediate;
+pub use self::poll_immediate::{poll_immediate, PollImmediate};
+
mod ready;
pub use self::ready::{err, ok, ready, Ready};
diff --git a/src/future/option.rs b/src/future/option.rs
index 426fe50..0bc3777 100644
--- a/src/future/option.rs
+++ b/src/future/option.rs
@@ -31,6 +31,12 @@ pin_project! {
}
}
+impl<F> Default for OptionFuture<F> {
+ fn default() -> Self {
+ Self { inner: None }
+ }
+}
+
impl<F: Future> Future for OptionFuture<F> {
type Output = Option<F::Output>;
diff --git a/src/future/poll_immediate.rs b/src/future/poll_immediate.rs
new file mode 100644
index 0000000..5ae555c
--- /dev/null
+++ b/src/future/poll_immediate.rs
@@ -0,0 +1,126 @@
+use super::assert_future;
+use core::pin::Pin;
+use futures_core::task::{Context, Poll};
+use futures_core::{FusedFuture, Future, Stream};
+use pin_project_lite::pin_project;
+
+pin_project! {
+ /// Future for the [`poll_immediate`](poll_immediate()) function.
+ ///
+ /// It will never return [Poll::Pending](core::task::Poll::Pending)
+ #[derive(Debug, Clone)]
+ #[must_use = "futures do nothing unless you `.await` or poll them"]
+ pub struct PollImmediate<T> {
+ #[pin]
+ future: Option<T>
+ }
+}
+
+impl<T, F> Future for PollImmediate<F>
+where
+ F: Future<Output = T>,
+{
+ type Output = Option<T>;
+
+ #[inline]
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
+ let mut this = self.project();
+ let inner =
+ this.future.as_mut().as_pin_mut().expect("PollImmediate polled after completion");
+ match inner.poll(cx) {
+ Poll::Ready(t) => {
+ this.future.set(None);
+ Poll::Ready(Some(t))
+ }
+ Poll::Pending => Poll::Ready(None),
+ }
+ }
+}
+
+impl<T: Future> FusedFuture for PollImmediate<T> {
+ fn is_terminated(&self) -> bool {
+ self.future.is_none()
+ }
+}
+
+/// A [Stream](crate::stream::Stream) implementation that can be polled repeatedly until the future is done.
+/// The stream will never return [Poll::Pending](core::task::Poll::Pending)
+/// so polling it in a tight loop is worse than using a blocking synchronous function.
+/// ```
+/// # futures::executor::block_on(async {
+/// use futures::task::Poll;
+/// use futures::{StreamExt, future, pin_mut};
+/// use future::FusedFuture;
+///
+/// let f = async { 1_u32 };
+/// pin_mut!(f);
+/// let mut r = future::poll_immediate(f);
+/// assert_eq!(r.next().await, Some(Poll::Ready(1)));
+///
+/// let f = async {futures::pending!(); 42_u8};
+/// pin_mut!(f);
+/// let mut p = future::poll_immediate(f);
+/// assert_eq!(p.next().await, Some(Poll::Pending));
+/// assert!(!p.is_terminated());
+/// assert_eq!(p.next().await, Some(Poll::Ready(42)));
+/// assert!(p.is_terminated());
+/// assert_eq!(p.next().await, None);
+/// # });
+/// ```
+impl<T, F> Stream for PollImmediate<F>
+where
+ F: Future<Output = T>,
+{
+ type Item = Poll<T>;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ let mut this = self.project();
+ match this.future.as_mut().as_pin_mut() {
+ // inner is gone, so we can signal that the stream is closed.
+ None => Poll::Ready(None),
+ Some(fut) => Poll::Ready(Some(fut.poll(cx).map(|t| {
+ this.future.set(None);
+ t
+ }))),
+ }
+ }
+}
+
+/// Creates a future that is immediately ready with an Option of a value.
+/// Specifically this means that [poll](core::future::Future::poll()) always returns [Poll::Ready](core::task::Poll::Ready).
+///
+/// # Caution
+///
+/// When consuming the future by this function, note the following:
+///
+/// - This function does not guarantee that the future will run to completion, so it is generally incompatible with passing the non-cancellation-safe future by value.
+/// - Even if the future is cancellation-safe, creating and dropping new futures frequently may lead to performance problems.
+///
+/// # Examples
+///
+/// ```
+/// # futures::executor::block_on(async {
+/// use futures::future;
+///
+/// let r = future::poll_immediate(async { 1_u32 });
+/// assert_eq!(r.await, Some(1));
+///
+/// let p = future::poll_immediate(future::pending::<i32>());
+/// assert_eq!(p.await, None);
+/// # });
+/// ```
+///
+/// ### Reusing a future
+///
+/// ```
+/// # futures::executor::block_on(async {
+/// use futures::{future, pin_mut};
+/// let f = async {futures::pending!(); 42_u8};
+/// pin_mut!(f);
+/// assert_eq!(None, future::poll_immediate(&mut f).await);
+/// assert_eq!(42, f.await);
+/// # });
+/// ```
+pub fn poll_immediate<F: Future>(f: F) -> PollImmediate<F> {
+ assert_future::<Option<F::Output>, PollImmediate<F>>(PollImmediate { future: Some(f) })
+}
diff --git a/src/io/buf_reader.rs b/src/io/buf_reader.rs
index 5931edc..2d585a9 100644
--- a/src/io/buf_reader.rs
+++ b/src/io/buf_reader.rs
@@ -1,4 +1,5 @@
use super::DEFAULT_BUF_SIZE;
+use futures_core::future::Future;
use futures_core::ready;
use futures_core::task::{Context, Poll};
#[cfg(feature = "read-initializer")]
@@ -73,6 +74,40 @@ impl<R: AsyncRead> BufReader<R> {
}
}
+impl<R: AsyncRead + AsyncSeek> BufReader<R> {
+ /// Seeks relative to the current position. If the new position lies within the buffer,
+ /// the buffer will not be flushed, allowing for more efficient seeks.
+ /// This method does not return the location of the underlying reader, so the caller
+ /// must track this information themselves if it is required.
+ pub fn seek_relative(self: Pin<&mut Self>, offset: i64) -> SeeKRelative<'_, R> {
+ SeeKRelative { inner: self, offset, first: true }
+ }
+
+ /// Attempts to seek relative to the current position. If the new position lies within the buffer,
+ /// the buffer will not be flushed, allowing for more efficient seeks.
+ /// This method does not return the location of the underlying reader, so the caller
+ /// must track this information themselves if it is required.
+ pub fn poll_seek_relative(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ offset: i64,
+ ) -> Poll<io::Result<()>> {
+ let pos = self.pos as u64;
+ if offset < 0 {
+ if let Some(new_pos) = pos.checked_sub((-offset) as u64) {
+ *self.project().pos = new_pos as usize;
+ return Poll::Ready(Ok(()));
+ }
+ } else if let Some(new_pos) = pos.checked_add(offset as u64) {
+ if new_pos <= self.cap as u64 {
+ *self.project().pos = new_pos as usize;
+ return Poll::Ready(Ok(()));
+ }
+ }
+ self.poll_seek(cx, SeekFrom::Current(offset)).map(|res| res.map(|_| ()))
+ }
+}
+
impl<R: AsyncRead> AsyncRead for BufReader<R> {
fn poll_read(
mut self: Pin<&mut Self>,
@@ -163,6 +198,10 @@ impl<R: AsyncRead + AsyncSeek> AsyncSeek for BufReader<R> {
/// `.into_inner()` immediately after a seek yields the underlying reader
/// at the same position.
///
+ /// To seek without discarding the internal buffer, use
+ /// [`BufReader::seek_relative`](BufReader::seek_relative) or
+ /// [`BufReader::poll_seek_relative`](BufReader::poll_seek_relative).
+ ///
/// See [`AsyncSeek`](futures_io::AsyncSeek) for more details.
///
/// Note: In the edge case where you're seeking with `SeekFrom::Current(n)`
@@ -200,3 +239,33 @@ impl<R: AsyncRead + AsyncSeek> AsyncSeek for BufReader<R> {
Poll::Ready(Ok(result))
}
}
+
+/// Future for the [`BufReader::seek_relative`](self::BufReader::seek_relative) method.
+#[derive(Debug)]
+#[must_use = "futures do nothing unless polled"]
+pub struct SeeKRelative<'a, R> {
+ inner: Pin<&'a mut BufReader<R>>,
+ offset: i64,
+ first: bool,
+}
+
+impl<R> Future for SeeKRelative<'_, R>
+where
+ R: AsyncRead + AsyncSeek,
+{
+ type Output = io::Result<()>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let offset = self.offset;
+ if self.first {
+ self.first = false;
+ self.inner.as_mut().poll_seek_relative(cx, offset)
+ } else {
+ self.inner
+ .as_mut()
+ .as_mut()
+ .poll_seek(cx, SeekFrom::Current(offset))
+ .map(|res| res.map(|_| ()))
+ }
+ }
+}
diff --git a/src/io/mod.rs b/src/io/mod.rs
index b96223d..16cf5a7 100644
--- a/src/io/mod.rs
+++ b/src/io/mod.rs
@@ -56,7 +56,7 @@ mod allow_std;
pub use self::allow_std::AllowStdIo;
mod buf_reader;
-pub use self::buf_reader::BufReader;
+pub use self::buf_reader::{BufReader, SeeKRelative};
mod buf_writer;
pub use self::buf_writer::BufWriter;
diff --git a/src/lib.rs b/src/lib.rs
index 5f803a7..76d3799 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -301,18 +301,18 @@ macro_rules! delegate_all {
}
pub mod future;
-#[doc(hidden)]
+#[doc(no_inline)]
pub use crate::future::{Future, FutureExt, TryFuture, TryFutureExt};
pub mod stream;
-#[doc(hidden)]
+#[doc(no_inline)]
pub use crate::stream::{Stream, StreamExt, TryStream, TryStreamExt};
#[cfg(feature = "sink")]
#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
pub mod sink;
#[cfg(feature = "sink")]
-#[doc(hidden)]
+#[doc(no_inline)]
pub use crate::sink::{Sink, SinkExt};
pub mod task;
@@ -329,7 +329,7 @@ pub mod compat;
pub mod io;
#[cfg(feature = "io")]
#[cfg(feature = "std")]
-#[doc(hidden)]
+#[doc(no_inline)]
pub use crate::io::{
AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, AsyncWrite,
AsyncWriteExt,
diff --git a/src/stream/mod.rs b/src/stream/mod.rs
index 8cf9f80..ec685b9 100644
--- a/src/stream/mod.rs
+++ b/src/stream/mod.rs
@@ -19,8 +19,8 @@ pub use futures_core::stream::{FusedStream, Stream, TryStream};
mod stream;
pub use self::stream::{
Chain, Collect, Concat, Cycle, Enumerate, Filter, FilterMap, FlatMap, Flatten, Fold, ForEach,
- Fuse, Inspect, Map, Next, NextIf, NextIfEq, Peek, Peekable, Scan, SelectNextSome, Skip,
- SkipWhile, StreamExt, StreamFuture, Take, TakeUntil, TakeWhile, Then, Unzip, Zip,
+ Fuse, Inspect, Map, Next, NextIf, NextIfEq, Peek, PeekMut, Peekable, Scan, SelectNextSome,
+ Skip, SkipWhile, StreamExt, StreamFuture, Take, TakeUntil, TakeWhile, Then, Unzip, Zip,
};
#[cfg(feature = "std")]
@@ -88,6 +88,9 @@ pub use self::pending::{pending, Pending};
mod poll_fn;
pub use self::poll_fn::{poll_fn, PollFn};
+mod poll_immediate;
+pub use self::poll_immediate::{poll_immediate, PollImmediate};
+
mod select;
pub use self::select::{select, Select};
diff --git a/src/stream/poll_immediate.rs b/src/stream/poll_immediate.rs
new file mode 100644
index 0000000..c7e8a5b
--- /dev/null
+++ b/src/stream/poll_immediate.rs
@@ -0,0 +1,80 @@
+use core::pin::Pin;
+use futures_core::task::{Context, Poll};
+use futures_core::Stream;
+use pin_project_lite::pin_project;
+
+pin_project! {
+ /// Stream for the [poll_immediate](poll_immediate()) function.
+ ///
+ /// It will never return [Poll::Pending](core::task::Poll::Pending)
+ #[derive(Debug, Clone)]
+ #[must_use = "futures do nothing unless you `.await` or poll them"]
+ pub struct PollImmediate<S> {
+ #[pin]
+ stream: Option<S>
+ }
+}
+
+impl<T, S> Stream for PollImmediate<S>
+where
+ S: Stream<Item = T>,
+{
+ type Item = Poll<T>;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ let mut this = self.project();
+ let stream = match this.stream.as_mut().as_pin_mut() {
+ // inner is gone, so we can continue to signal that the stream is closed.
+ None => return Poll::Ready(None),
+ Some(inner) => inner,
+ };
+
+ match stream.poll_next(cx) {
+ Poll::Ready(Some(t)) => Poll::Ready(Some(Poll::Ready(t))),
+ Poll::Ready(None) => {
+ this.stream.set(None);
+ Poll::Ready(None)
+ }
+ Poll::Pending => Poll::Ready(Some(Poll::Pending)),
+ }
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ self.stream.as_ref().map_or((0, Some(0)), Stream::size_hint)
+ }
+}
+
+impl<S: Stream> super::FusedStream for PollImmediate<S> {
+ fn is_terminated(&self) -> bool {
+ self.stream.is_none()
+ }
+}
+
+/// Creates a new stream that always immediately returns [Poll::Ready](core::task::Poll::Ready) when awaiting it.
+///
+/// This is useful when immediacy is more important than waiting for the next item to be ready.
+///
+/// # Examples
+///
+/// ```
+/// # futures::executor::block_on(async {
+/// use futures::stream::{self, StreamExt};
+/// use futures::task::Poll;
+///
+/// let mut r = stream::poll_immediate(Box::pin(stream::iter(1_u32..3)));
+/// assert_eq!(r.next().await, Some(Poll::Ready(1)));
+/// assert_eq!(r.next().await, Some(Poll::Ready(2)));
+/// assert_eq!(r.next().await, None);
+///
+/// let mut p = stream::poll_immediate(Box::pin(stream::once(async {
+/// futures::pending!();
+/// 42_u8
+/// })));
+/// assert_eq!(p.next().await, Some(Poll::Pending));
+/// assert_eq!(p.next().await, Some(Poll::Ready(42)));
+/// assert_eq!(p.next().await, None);
+/// # });
+/// ```
+pub fn poll_immediate<S: Stream>(s: S) -> PollImmediate<S> {
+ super::assert_stream::<Poll<S::Item>, PollImmediate<S>>(PollImmediate { stream: Some(s) })
+}
diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs
index b3b0155..86997f4 100644
--- a/src/stream/stream/mod.rs
+++ b/src/stream/stream/mod.rs
@@ -131,7 +131,7 @@ pub use self::select_next_some::SelectNextSome;
mod peek;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
-pub use self::peek::{NextIf, NextIfEq, Peek, Peekable};
+pub use self::peek::{NextIf, NextIfEq, Peek, PeekMut, Peekable};
mod skip;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
diff --git a/src/stream/stream/peek.rs b/src/stream/stream/peek.rs
index 217faba..c72dfc3 100644
--- a/src/stream/stream/peek.rs
+++ b/src/stream/stream/peek.rs
@@ -33,7 +33,7 @@ impl<St: Stream> Peekable<St> {
delegate_access_inner!(stream, St, (.));
- /// Produces a `Peek` future which retrieves a reference to the next item
+ /// Produces a future which retrieves a reference to the next item
/// in the stream, or `None` if the underlying stream terminates.
pub fn peek(self: Pin<&mut Self>) -> Peek<'_, St> {
Peek { inner: Some(self) }
@@ -57,6 +57,54 @@ impl<St: Stream> Peekable<St> {
})
}
+ /// Produces a future which retrieves a mutable reference to the next item
+ /// in the stream, or `None` if the underlying stream terminates.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::stream::{self, StreamExt};
+ /// use futures::pin_mut;
+ ///
+ /// let stream = stream::iter(vec![1, 2, 3]).peekable();
+ /// pin_mut!(stream);
+ ///
+ /// assert_eq!(stream.as_mut().peek_mut().await, Some(&mut 1));
+ /// assert_eq!(stream.as_mut().next().await, Some(1));
+ ///
+ /// // Peek into the stream and modify the value which will be returned next
+ /// if let Some(p) = stream.as_mut().peek_mut().await {
+ /// if *p == 2 {
+ /// *p = 5;
+ /// }
+ /// }
+ ///
+ /// assert_eq!(stream.collect::<Vec<_>>().await, vec![5, 3]);
+ /// # });
+ /// ```
+ pub fn peek_mut(self: Pin<&mut Self>) -> PeekMut<'_, St> {
+ PeekMut { inner: Some(self) }
+ }
+
+ /// Peek retrieves a mutable reference to the next item in the stream.
+ pub fn poll_peek_mut(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Option<&mut St::Item>> {
+ let mut this = self.project();
+
+ Poll::Ready(loop {
+ if this.peeked.is_some() {
+ break this.peeked.as_mut();
+ } else if let Some(item) = ready!(this.stream.as_mut().poll_next(cx)) {
+ *this.peeked = Some(item);
+ } else {
+ break None;
+ }
+ })
+ }
+
/// Creates a future which will consume and return the next value of this
/// stream if a condition is true.
///
@@ -221,6 +269,48 @@ where
}
pin_project! {
+ /// Future for the [`Peekable::peek_mut`](self::Peekable::peek_mut) method.
+ #[must_use = "futures do nothing unless polled"]
+ pub struct PeekMut<'a, St: Stream> {
+ inner: Option<Pin<&'a mut Peekable<St>>>,
+ }
+}
+
+impl<St> fmt::Debug for PeekMut<'_, St>
+where
+ St: Stream + fmt::Debug,
+ St::Item: fmt::Debug,
+{
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("PeekMut").field("inner", &self.inner).finish()
+ }
+}
+
+impl<St: Stream> FusedFuture for PeekMut<'_, St> {
+ fn is_terminated(&self) -> bool {
+ self.inner.is_none()
+ }
+}
+
+impl<'a, St> Future for PeekMut<'a, St>
+where
+ St: Stream,
+{
+ type Output = Option<&'a mut St::Item>;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let inner = self.project().inner;
+ if let Some(peekable) = inner {
+ ready!(peekable.as_mut().poll_peek_mut(cx));
+
+ inner.take().unwrap().poll_peek_mut(cx)
+ } else {
+ panic!("PeekMut polled after completion")
+ }
+ }
+}
+
+pin_project! {
/// Future for the [`Peekable::next_if`](self::Peekable::next_if) method.
#[must_use = "futures do nothing unless polled"]
pub struct NextIf<'a, St: Stream, F> {