aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndroid Build Coastguard Worker <android-build-coastguard-worker@google.com>2021-10-15 01:05:12 +0000
committerAndroid Build Coastguard Worker <android-build-coastguard-worker@google.com>2021-10-15 01:05:12 +0000
commitaf2aab25d57eb3bf86de46d2bf674304ddc7dd0b (patch)
tree3b2fc897370dbc19b8f0f0a0593a3dafa8b7665b
parent82fe7202143e2bb9e476c804e478bd77404271f1 (diff)
parent5e0588aecefd494dd0a35c857aa03bfa22643e9f (diff)
downloadfutures-af2aab25d57eb3bf86de46d2bf674304ddc7dd0b.tar.gz
Snap for 7825444 from 5e0588aecefd494dd0a35c857aa03bfa22643e9f to tm-d1-release
Change-Id: If223d22fef4c275a20c33d393711b6ec4c03b653
-rw-r--r--.cargo_vcs_info.json2
-rw-r--r--Android.bp23
-rw-r--r--Cargo.toml16
-rw-r--r--Cargo.toml.orig16
-rw-r--r--METADATA8
-rw-r--r--src/lib.rs20
-rw-r--r--tests/async_await_macros.rs40
-rw-r--r--tests/auto_traits.rs28
-rw-r--r--tests/io_buf_reader.rs292
-rw-r--r--tests/stream_peekable.rs19
10 files changed, 319 insertions, 145 deletions
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json
index 99dc8b0..ffd4f55 100644
--- a/.cargo_vcs_info.json
+++ b/.cargo_vcs_info.json
@@ -1,5 +1,5 @@
{
"git": {
- "sha1": "ab38fd29d3f84f8fc028fa7883e53dba423da0ee"
+ "sha1": "7caefa51304e78fd5018cd5d2a03f3b9089cc010"
}
}
diff --git a/Android.bp b/Android.bp
index f6e9a98..9053466 100644
--- a/Android.bp
+++ b/Android.bp
@@ -41,6 +41,8 @@ rust_library {
name: "libfutures",
host_supported: true,
crate_name: "futures",
+ cargo_env_compat: true,
+ cargo_pkg_version: "0.3.17",
srcs: ["src/lib.rs"],
edition: "2018",
features: [
@@ -67,24 +69,3 @@ rust_library {
],
min_sdk_version: "29",
}
-
-// dependent_library ["feature_list"]
-// autocfg-1.0.1
-// futures-channel-0.3.16 "alloc,futures-sink,sink,std"
-// futures-core-0.3.16 "alloc,std"
-// futures-executor-0.3.16 "std"
-// futures-io-0.3.16 "std"
-// futures-macro-0.3.16
-// futures-sink-0.3.16 "alloc,std"
-// futures-task-0.3.16 "alloc,std"
-// futures-util-0.3.16 "alloc,async-await,async-await-macro,channel,futures-channel,futures-io,futures-macro,futures-sink,io,memchr,proc-macro-hack,proc-macro-nested,sink,slab,std"
-// memchr-2.4.0 "default,std"
-// pin-project-lite-0.2.7
-// pin-utils-0.1.0
-// proc-macro-hack-0.5.19
-// proc-macro-nested-0.1.7
-// proc-macro2-1.0.28 "default,proc-macro"
-// quote-1.0.9 "default,proc-macro"
-// slab-0.4.4 "default,std"
-// syn-1.0.74 "clone-impls,default,derive,full,parsing,printing,proc-macro,quote,visit-mut"
-// unicode-xid-0.2.2 "default"
diff --git a/Cargo.toml b/Cargo.toml
index c934741..17c4d5f 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -12,7 +12,7 @@
[package]
edition = "2018"
name = "futures"
-version = "0.3.16"
+version = "0.3.17"
authors = ["Alex Crichton <alex@alexcrichton.com>"]
description = "An implementation of futures and streams featuring zero allocations,\ncomposability, and iterator-like interfaces.\n"
homepage = "https://rust-lang.github.io/futures-rs"
@@ -29,33 +29,33 @@ rustdoc-args = ["--cfg", "docsrs"]
[package.metadata.playground]
features = ["std", "async-await", "compat", "io-compat", "executor", "thread-pool"]
[dependencies.futures-channel]
-version = "0.3.16"
+version = "0.3.17"
features = ["sink"]
default-features = false
[dependencies.futures-core]
-version = "0.3.16"
+version = "0.3.17"
default-features = false
[dependencies.futures-executor]
-version = "0.3.16"
+version = "0.3.17"
optional = true
default-features = false
[dependencies.futures-io]
-version = "0.3.16"
+version = "0.3.17"
default-features = false
[dependencies.futures-sink]
-version = "0.3.16"
+version = "0.3.17"
default-features = false
[dependencies.futures-task]
-version = "0.3.16"
+version = "0.3.17"
default-features = false
[dependencies.futures-util]
-version = "0.3.16"
+version = "0.3.17"
features = ["sink"]
default-features = false
[dev-dependencies.assert_matches]
diff --git a/Cargo.toml.orig b/Cargo.toml.orig
index 04cea8a..b01b12e 100644
--- a/Cargo.toml.orig
+++ b/Cargo.toml.orig
@@ -1,7 +1,7 @@
[package]
name = "futures"
edition = "2018"
-version = "0.3.16"
+version = "0.3.17"
authors = ["Alex Crichton <alex@alexcrichton.com>"]
license = "MIT OR Apache-2.0"
readme = "../README.md"
@@ -16,13 +16,13 @@ composability, and iterator-like interfaces.
categories = ["asynchronous"]
[dependencies]
-futures-core = { path = "../futures-core", version = "0.3.16", default-features = false }
-futures-task = { path = "../futures-task", version = "0.3.16", default-features = false }
-futures-channel = { path = "../futures-channel", version = "0.3.16", default-features = false, features = ["sink"] }
-futures-executor = { path = "../futures-executor", version = "0.3.16", default-features = false, optional = true }
-futures-io = { path = "../futures-io", version = "0.3.16", default-features = false }
-futures-sink = { path = "../futures-sink", version = "0.3.16", default-features = false }
-futures-util = { path = "../futures-util", version = "0.3.16", default-features = false, features = ["sink"] }
+futures-core = { path = "../futures-core", version = "0.3.17", default-features = false }
+futures-task = { path = "../futures-task", version = "0.3.17", default-features = false }
+futures-channel = { path = "../futures-channel", version = "0.3.17", default-features = false, features = ["sink"] }
+futures-executor = { path = "../futures-executor", version = "0.3.17", default-features = false, optional = true }
+futures-io = { path = "../futures-io", version = "0.3.17", default-features = false }
+futures-sink = { path = "../futures-sink", version = "0.3.17", default-features = false }
+futures-util = { path = "../futures-util", version = "0.3.17", default-features = false, features = ["sink"] }
[dev-dependencies]
futures-executor = { path = "../futures-executor", features = ["thread-pool"] }
diff --git a/METADATA b/METADATA
index 60344d2..ba70d3e 100644
--- a/METADATA
+++ b/METADATA
@@ -7,13 +7,13 @@ third_party {
}
url {
type: ARCHIVE
- value: "https://static.crates.io/crates/futures/futures-0.3.16.crate"
+ value: "https://static.crates.io/crates/futures/futures-0.3.17.crate"
}
- version: "0.3.16"
+ version: "0.3.17"
license_type: NOTICE
last_upgrade_date {
year: 2021
- month: 8
- day: 9
+ month: 9
+ day: 22
}
}
diff --git a/src/lib.rs b/src/lib.rs
index 287696f..362aa3c 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -102,26 +102,26 @@ compile_error!("The `bilock` feature requires the `unstable` feature as an expli
#[cfg(all(feature = "read-initializer", not(feature = "unstable")))]
compile_error!("The `read-initializer` feature requires the `unstable` feature as an explicit opt-in to unstable features");
-#[doc(hidden)]
+#[doc(no_inline)]
pub use futures_core::future::{Future, TryFuture};
-#[doc(hidden)]
+#[doc(no_inline)]
pub use futures_util::future::{FutureExt, TryFutureExt};
-#[doc(hidden)]
+#[doc(no_inline)]
pub use futures_core::stream::{Stream, TryStream};
-#[doc(hidden)]
+#[doc(no_inline)]
pub use futures_util::stream::{StreamExt, TryStreamExt};
-#[doc(hidden)]
+#[doc(no_inline)]
pub use futures_sink::Sink;
-#[doc(hidden)]
+#[doc(no_inline)]
pub use futures_util::sink::SinkExt;
#[cfg(feature = "std")]
-#[doc(hidden)]
+#[doc(no_inline)]
pub use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite};
#[cfg(feature = "std")]
-#[doc(hidden)]
+#[doc(no_inline)]
pub use futures_util::{AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
// Macro reexports
@@ -137,6 +137,10 @@ pub use futures_util::{join, pending, poll, select_biased, try_join}; // Async-a
#[doc(inline)]
pub use futures_util::{future, never, sink, stream, task};
+#[cfg(feature = "std")]
+#[cfg(feature = "async-await")]
+pub use futures_util::stream_select;
+
#[cfg(feature = "alloc")]
#[doc(inline)]
pub use futures_channel as channel;
diff --git a/tests/async_await_macros.rs b/tests/async_await_macros.rs
index 19833d0..ce1f3a3 100644
--- a/tests/async_await_macros.rs
+++ b/tests/async_await_macros.rs
@@ -4,7 +4,9 @@ use futures::future::{self, poll_fn, FutureExt};
use futures::sink::SinkExt;
use futures::stream::StreamExt;
use futures::task::{Context, Poll};
-use futures::{join, pending, pin_mut, poll, select, select_biased, try_join};
+use futures::{
+ join, pending, pin_mut, poll, select, select_biased, stream, stream_select, try_join,
+};
use std::mem;
#[test]
@@ -309,6 +311,42 @@ fn select_on_mutable_borrowing_future_with_same_borrow_in_block_and_default() {
}
#[test]
+#[allow(unused_assignments)]
+fn stream_select() {
+ // stream_select! macro
+ block_on(async {
+ let endless_ints = |i| stream::iter(vec![i].into_iter().cycle());
+
+ let mut endless_ones = stream_select!(endless_ints(1i32), stream::pending());
+ assert_eq!(endless_ones.next().await, Some(1));
+ assert_eq!(endless_ones.next().await, Some(1));
+
+ let mut finite_list =
+ stream_select!(stream::iter(vec![1].into_iter()), stream::iter(vec![1].into_iter()));
+ assert_eq!(finite_list.next().await, Some(1));
+ assert_eq!(finite_list.next().await, Some(1));
+ assert_eq!(finite_list.next().await, None);
+
+ let endless_mixed = stream_select!(endless_ints(1i32), endless_ints(2), endless_ints(3));
+ // Take 1000, and assert a somewhat even distribution of values.
+ // The fairness is randomized, but over 1000 samples we should be pretty close to even.
+ // This test may be a bit flaky. Feel free to adjust the margins as you see fit.
+ let mut count = 0;
+ let results = endless_mixed
+ .take_while(move |_| {
+ count += 1;
+ let ret = count < 1000;
+ async move { ret }
+ })
+ .collect::<Vec<_>>()
+ .await;
+ assert!(results.iter().filter(|x| **x == 1).count() >= 299);
+ assert!(results.iter().filter(|x| **x == 2).count() >= 299);
+ assert!(results.iter().filter(|x| **x == 3).count() >= 299);
+ });
+}
+
+#[test]
fn join_size() {
let fut = async {
let ready = future::ready(0i32);
diff --git a/tests/auto_traits.rs b/tests/auto_traits.rs
index e0192a1..b3d8b00 100644
--- a/tests/auto_traits.rs
+++ b/tests/auto_traits.rs
@@ -470,6 +470,13 @@ pub mod future {
assert_not_impl!(PollFn<*const ()>: Sync);
assert_impl!(PollFn<PhantomPinned>: Unpin);
+ assert_impl!(PollImmediate<SendStream>: Send);
+ assert_not_impl!(PollImmediate<LocalStream<()>>: Send);
+ assert_impl!(PollImmediate<SyncStream>: Sync);
+ assert_not_impl!(PollImmediate<LocalStream<()>>: Sync);
+ assert_impl!(PollImmediate<UnpinStream>: Unpin);
+ assert_not_impl!(PollImmediate<PinnedStream>: Unpin);
+
assert_impl!(Ready<()>: Send);
assert_not_impl!(Ready<*const ()>: Send);
assert_impl!(Ready<()>: Sync);
@@ -810,6 +817,12 @@ pub mod io {
assert_impl!(Seek<'_, ()>: Unpin);
assert_not_impl!(Seek<'_, PhantomPinned>: Unpin);
+ assert_impl!(SeeKRelative<'_, ()>: Send);
+ assert_not_impl!(SeeKRelative<'_, *const ()>: Send);
+ assert_impl!(SeeKRelative<'_, ()>: Sync);
+ assert_not_impl!(SeeKRelative<'_, *const ()>: Sync);
+ assert_impl!(SeeKRelative<'_, PhantomPinned>: Unpin);
+
assert_impl!(Sink: Send);
assert_impl!(Sink: Sync);
assert_impl!(Sink: Unpin);
@@ -1430,6 +1443,14 @@ pub mod stream {
assert_not_impl!(Peek<'_, LocalStream<()>>: Sync);
assert_impl!(Peek<'_, PinnedStream>: Unpin);
+ assert_impl!(PeekMut<'_, SendStream<()>>: Send);
+ assert_not_impl!(PeekMut<'_, SendStream>: Send);
+ assert_not_impl!(PeekMut<'_, LocalStream<()>>: Send);
+ assert_impl!(PeekMut<'_, SyncStream<()>>: Sync);
+ assert_not_impl!(PeekMut<'_, SyncStream>: Sync);
+ assert_not_impl!(PeekMut<'_, LocalStream<()>>: Sync);
+ assert_impl!(PeekMut<'_, PinnedStream>: Unpin);
+
assert_impl!(Peekable<SendStream<()>>: Send);
assert_not_impl!(Peekable<SendStream>: Send);
assert_not_impl!(Peekable<LocalStream>: Send);
@@ -1451,6 +1472,13 @@ pub mod stream {
assert_not_impl!(PollFn<*const ()>: Sync);
assert_impl!(PollFn<PhantomPinned>: Unpin);
+ assert_impl!(PollImmediate<SendStream>: Send);
+ assert_not_impl!(PollImmediate<LocalStream<()>>: Send);
+ assert_impl!(PollImmediate<SyncStream>: Sync);
+ assert_not_impl!(PollImmediate<LocalStream<()>>: Sync);
+ assert_impl!(PollImmediate<UnpinStream>: Unpin);
+ assert_not_impl!(PollImmediate<PinnedStream>: Unpin);
+
assert_impl!(ReadyChunks<SendStream<()>>: Send);
assert_not_impl!(ReadyChunks<SendStream>: Send);
assert_not_impl!(ReadyChunks<LocalStream>: Send);
diff --git a/tests/io_buf_reader.rs b/tests/io_buf_reader.rs
index d60df87..717297c 100644
--- a/tests/io_buf_reader.rs
+++ b/tests/io_buf_reader.rs
@@ -2,25 +2,17 @@ use futures::executor::block_on;
use futures::future::{Future, FutureExt};
use futures::io::{
AllowStdIo, AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt,
- BufReader, Cursor, SeekFrom,
+ BufReader, SeekFrom,
};
+use futures::pin_mut;
use futures::task::{Context, Poll};
use futures_test::task::noop_context;
+use pin_project::pin_project;
use std::cmp;
use std::io;
use std::pin::Pin;
-macro_rules! run_fill_buf {
- ($reader:expr) => {{
- let mut cx = noop_context();
- loop {
- if let Poll::Ready(x) = Pin::new(&mut $reader).poll_fill_buf(&mut cx) {
- break x;
- }
- }
- }};
-}
-
+// helper for maybe_pending_* tests
fn run<F: Future + Unpin>(mut f: F) -> F::Output {
let mut cx = noop_context();
loop {
@@ -30,6 +22,49 @@ fn run<F: Future + Unpin>(mut f: F) -> F::Output {
}
}
+// https://github.com/rust-lang/futures-rs/pull/2489#discussion_r697865719
+#[pin_project(!Unpin)]
+struct Cursor<T> {
+ #[pin]
+ inner: futures::io::Cursor<T>,
+}
+
+impl<T> Cursor<T> {
+ fn new(inner: T) -> Self {
+ Self { inner: futures::io::Cursor::new(inner) }
+ }
+}
+
+impl AsyncRead for Cursor<&[u8]> {
+ fn poll_read(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &mut [u8],
+ ) -> Poll<io::Result<usize>> {
+ self.project().inner.poll_read(cx, buf)
+ }
+}
+
+impl AsyncBufRead for Cursor<&[u8]> {
+ fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
+ self.project().inner.poll_fill_buf(cx)
+ }
+
+ fn consume(self: Pin<&mut Self>, amt: usize) {
+ self.project().inner.consume(amt)
+ }
+}
+
+impl AsyncSeek for Cursor<&[u8]> {
+ fn poll_seek(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ pos: SeekFrom,
+ ) -> Poll<io::Result<u64>> {
+ self.project().inner.poll_seek(cx, pos)
+ }
+}
+
struct MaybePending<'a> {
inner: &'a [u8],
ready_read: bool,
@@ -80,54 +115,119 @@ impl AsyncBufRead for MaybePending<'_> {
#[test]
fn test_buffered_reader() {
- let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4];
- let mut reader = BufReader::with_capacity(2, inner);
-
- let mut buf = [0, 0, 0];
- let nread = block_on(reader.read(&mut buf));
- assert_eq!(nread.unwrap(), 3);
- assert_eq!(buf, [5, 6, 7]);
- assert_eq!(reader.buffer(), []);
-
- let mut buf = [0, 0];
- let nread = block_on(reader.read(&mut buf));
- assert_eq!(nread.unwrap(), 2);
- assert_eq!(buf, [0, 1]);
- assert_eq!(reader.buffer(), []);
-
- let mut buf = [0];
- let nread = block_on(reader.read(&mut buf));
- assert_eq!(nread.unwrap(), 1);
- assert_eq!(buf, [2]);
- assert_eq!(reader.buffer(), [3]);
+ block_on(async {
+ let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4];
+ let mut reader = BufReader::with_capacity(2, inner);
+
+ let mut buf = [0, 0, 0];
+ let nread = reader.read(&mut buf).await.unwrap();
+ assert_eq!(nread, 3);
+ assert_eq!(buf, [5, 6, 7]);
+ assert_eq!(reader.buffer(), []);
+
+ let mut buf = [0, 0];
+ let nread = reader.read(&mut buf).await.unwrap();
+ assert_eq!(nread, 2);
+ assert_eq!(buf, [0, 1]);
+ assert_eq!(reader.buffer(), []);
+
+ let mut buf = [0];
+ let nread = reader.read(&mut buf).await.unwrap();
+ assert_eq!(nread, 1);
+ assert_eq!(buf, [2]);
+ assert_eq!(reader.buffer(), [3]);
+
+ let mut buf = [0, 0, 0];
+ let nread = reader.read(&mut buf).await.unwrap();
+ assert_eq!(nread, 1);
+ assert_eq!(buf, [3, 0, 0]);
+ assert_eq!(reader.buffer(), []);
+
+ let nread = reader.read(&mut buf).await.unwrap();
+ assert_eq!(nread, 1);
+ assert_eq!(buf, [4, 0, 0]);
+ assert_eq!(reader.buffer(), []);
+
+ assert_eq!(reader.read(&mut buf).await.unwrap(), 0);
+ });
+}
- let mut buf = [0, 0, 0];
- let nread = block_on(reader.read(&mut buf));
- assert_eq!(nread.unwrap(), 1);
- assert_eq!(buf, [3, 0, 0]);
- assert_eq!(reader.buffer(), []);
+#[test]
+fn test_buffered_reader_seek() {
+ block_on(async {
+ let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4];
+ let reader = BufReader::with_capacity(2, Cursor::new(inner));
+ pin_mut!(reader);
+
+ assert_eq!(reader.seek(SeekFrom::Start(3)).await.unwrap(), 3);
+ assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[0, 1][..]);
+ assert!(reader.seek(SeekFrom::Current(i64::MIN)).await.is_err());
+ assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[0, 1][..]);
+ assert_eq!(reader.seek(SeekFrom::Current(1)).await.unwrap(), 4);
+ assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[1, 2][..]);
+ reader.as_mut().consume(1);
+ assert_eq!(reader.seek(SeekFrom::Current(-2)).await.unwrap(), 3);
+ });
+}
- let nread = block_on(reader.read(&mut buf));
- assert_eq!(nread.unwrap(), 1);
- assert_eq!(buf, [4, 0, 0]);
- assert_eq!(reader.buffer(), []);
+#[test]
+fn test_buffered_reader_seek_relative() {
+ block_on(async {
+ let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4];
+ let reader = BufReader::with_capacity(2, Cursor::new(inner));
+ pin_mut!(reader);
+
+ assert!(reader.as_mut().seek_relative(3).await.is_ok());
+ assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[0, 1][..]);
+ assert!(reader.as_mut().seek_relative(0).await.is_ok());
+ assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[0, 1][..]);
+ assert!(reader.as_mut().seek_relative(1).await.is_ok());
+ assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[1][..]);
+ assert!(reader.as_mut().seek_relative(-1).await.is_ok());
+ assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[0, 1][..]);
+ assert!(reader.as_mut().seek_relative(2).await.is_ok());
+ assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[2, 3][..]);
+ });
+}
- assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 0);
+#[test]
+fn test_buffered_reader_invalidated_after_read() {
+ block_on(async {
+ let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4];
+ let reader = BufReader::with_capacity(3, Cursor::new(inner));
+ pin_mut!(reader);
+
+ assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[5, 6, 7][..]);
+ reader.as_mut().consume(3);
+
+ let mut buffer = [0, 0, 0, 0, 0];
+ assert_eq!(reader.read(&mut buffer).await.unwrap(), 5);
+ assert_eq!(buffer, [0, 1, 2, 3, 4]);
+
+ assert!(reader.as_mut().seek_relative(-2).await.is_ok());
+ let mut buffer = [0, 0];
+ assert_eq!(reader.read(&mut buffer).await.unwrap(), 2);
+ assert_eq!(buffer, [3, 4]);
+ });
}
#[test]
-fn test_buffered_reader_seek() {
- let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4];
- let mut reader = BufReader::with_capacity(2, Cursor::new(inner));
-
- assert_eq!(block_on(reader.seek(SeekFrom::Start(3))).ok(), Some(3));
- assert_eq!(run_fill_buf!(reader).ok(), Some(&[0, 1][..]));
- assert_eq!(run(reader.seek(SeekFrom::Current(i64::min_value()))).ok(), None);
- assert_eq!(run_fill_buf!(reader).ok(), Some(&[0, 1][..]));
- assert_eq!(block_on(reader.seek(SeekFrom::Current(1))).ok(), Some(4));
- assert_eq!(run_fill_buf!(reader).ok(), Some(&[1, 2][..]));
- Pin::new(&mut reader).consume(1);
- assert_eq!(block_on(reader.seek(SeekFrom::Current(-2))).ok(), Some(3));
+fn test_buffered_reader_invalidated_after_seek() {
+ block_on(async {
+ let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4];
+ let reader = BufReader::with_capacity(3, Cursor::new(inner));
+ pin_mut!(reader);
+
+ assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[5, 6, 7][..]);
+ reader.as_mut().consume(3);
+
+ assert!(reader.seek(SeekFrom::Current(5)).await.is_ok());
+
+ assert!(reader.as_mut().seek_relative(-2).await.is_ok());
+ let mut buffer = [0, 0];
+ assert_eq!(reader.read(&mut buffer).await.unwrap(), 2);
+ assert_eq!(buffer, [3, 4]);
+ });
}
#[test]
@@ -156,24 +256,27 @@ fn test_buffered_reader_seek_underflow() {
self.pos = self.pos.wrapping_add(n as u64);
}
SeekFrom::End(n) => {
- self.pos = u64::max_value().wrapping_add(n as u64);
+ self.pos = u64::MAX.wrapping_add(n as u64);
}
}
Ok(self.pos)
}
}
- let mut reader = BufReader::with_capacity(5, AllowStdIo::new(PositionReader { pos: 0 }));
- assert_eq!(run_fill_buf!(reader).ok(), Some(&[0, 1, 2, 3, 4][..]));
- assert_eq!(block_on(reader.seek(SeekFrom::End(-5))).ok(), Some(u64::max_value() - 5));
- assert_eq!(run_fill_buf!(reader).ok().map(|s| s.len()), Some(5));
- // the following seek will require two underlying seeks
- let expected = 9_223_372_036_854_775_802;
- assert_eq!(block_on(reader.seek(SeekFrom::Current(i64::min_value()))).ok(), Some(expected));
- assert_eq!(run_fill_buf!(reader).ok().map(|s| s.len()), Some(5));
- // seeking to 0 should empty the buffer.
- assert_eq!(block_on(reader.seek(SeekFrom::Current(0))).ok(), Some(expected));
- assert_eq!(reader.get_ref().get_ref().pos, expected);
+ block_on(async {
+ let reader = BufReader::with_capacity(5, AllowStdIo::new(PositionReader { pos: 0 }));
+ pin_mut!(reader);
+ assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[0, 1, 2, 3, 4][..]);
+ assert_eq!(reader.seek(SeekFrom::End(-5)).await.unwrap(), u64::MAX - 5);
+ assert_eq!(reader.as_mut().fill_buf().await.unwrap().len(), 5);
+ // the following seek will require two underlying seeks
+ let expected = 9_223_372_036_854_775_802;
+ assert_eq!(reader.seek(SeekFrom::Current(i64::MIN)).await.unwrap(), expected);
+ assert_eq!(reader.as_mut().fill_buf().await.unwrap().len(), 5);
+ // seeking to 0 should empty the buffer.
+ assert_eq!(reader.seek(SeekFrom::Current(0)).await.unwrap(), expected);
+ assert_eq!(reader.get_ref().get_ref().pos, expected);
+ });
}
#[test]
@@ -193,16 +296,18 @@ fn test_short_reads() {
}
}
- let inner = ShortReader { lengths: vec![0, 1, 2, 0, 1, 0] };
- let mut reader = BufReader::new(AllowStdIo::new(inner));
- let mut buf = [0, 0];
- assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 0);
- assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 1);
- assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 2);
- assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 0);
- assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 1);
- assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 0);
- assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 0);
+ block_on(async {
+ let inner = ShortReader { lengths: vec![0, 1, 2, 0, 1, 0] };
+ let mut reader = BufReader::new(AllowStdIo::new(inner));
+ let mut buf = [0, 0];
+ assert_eq!(reader.read(&mut buf).await.unwrap(), 0);
+ assert_eq!(reader.read(&mut buf).await.unwrap(), 1);
+ assert_eq!(reader.read(&mut buf).await.unwrap(), 2);
+ assert_eq!(reader.read(&mut buf).await.unwrap(), 0);
+ assert_eq!(reader.read(&mut buf).await.unwrap(), 1);
+ assert_eq!(reader.read(&mut buf).await.unwrap(), 0);
+ assert_eq!(reader.read(&mut buf).await.unwrap(), 0);
+ });
}
#[test]
@@ -263,7 +368,9 @@ fn maybe_pending_buf_read() {
// https://github.com/rust-lang/futures-rs/pull/1573#discussion_r281162309
#[test]
fn maybe_pending_seek() {
+ #[pin_project]
struct MaybePendingSeek<'a> {
+ #[pin]
inner: Cursor<&'a [u8]>,
ready: bool,
}
@@ -276,25 +383,21 @@ fn maybe_pending_seek() {
impl AsyncRead for MaybePendingSeek<'_> {
fn poll_read(
- mut self: Pin<&mut Self>,
+ self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
- Pin::new(&mut self.inner).poll_read(cx, buf)
+ self.project().inner.poll_read(cx, buf)
}
}
impl AsyncBufRead for MaybePendingSeek<'_> {
- fn poll_fill_buf(
- mut self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- ) -> Poll<io::Result<&[u8]>> {
- let this: *mut Self = &mut *self as *mut _;
- Pin::new(&mut unsafe { &mut *this }.inner).poll_fill_buf(cx)
+ fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
+ self.project().inner.poll_fill_buf(cx)
}
- fn consume(mut self: Pin<&mut Self>, amt: usize) {
- Pin::new(&mut self.inner).consume(amt)
+ fn consume(self: Pin<&mut Self>, amt: usize) {
+ self.project().inner.consume(amt)
}
}
@@ -305,24 +408,25 @@ fn maybe_pending_seek() {
pos: SeekFrom,
) -> Poll<io::Result<u64>> {
if self.ready {
- self.ready = false;
- Pin::new(&mut self.inner).poll_seek(cx, pos)
+ *self.as_mut().project().ready = false;
+ self.project().inner.poll_seek(cx, pos)
} else {
- self.ready = true;
+ *self.project().ready = true;
Poll::Pending
}
}
}
let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4];
- let mut reader = BufReader::with_capacity(2, MaybePendingSeek::new(inner));
+ let reader = BufReader::with_capacity(2, MaybePendingSeek::new(inner));
+ pin_mut!(reader);
assert_eq!(run(reader.seek(SeekFrom::Current(3))).ok(), Some(3));
- assert_eq!(run_fill_buf!(reader).ok(), Some(&[0, 1][..]));
- assert_eq!(run(reader.seek(SeekFrom::Current(i64::min_value()))).ok(), None);
- assert_eq!(run_fill_buf!(reader).ok(), Some(&[0, 1][..]));
+ assert_eq!(run(reader.as_mut().fill_buf()).ok(), Some(&[0, 1][..]));
+ assert_eq!(run(reader.seek(SeekFrom::Current(i64::MIN))).ok(), None);
+ assert_eq!(run(reader.as_mut().fill_buf()).ok(), Some(&[0, 1][..]));
assert_eq!(run(reader.seek(SeekFrom::Current(1))).ok(), Some(4));
- assert_eq!(run_fill_buf!(reader).ok(), Some(&[1, 2][..]));
+ assert_eq!(run(reader.as_mut().fill_buf()).ok(), Some(&[1, 2][..]));
Pin::new(&mut reader).consume(1);
assert_eq!(run(reader.seek(SeekFrom::Current(-2))).ok(), Some(3));
}
diff --git a/tests/stream_peekable.rs b/tests/stream_peekable.rs
index 2fa7f3a..153fcc2 100644
--- a/tests/stream_peekable.rs
+++ b/tests/stream_peekable.rs
@@ -9,6 +9,25 @@ fn peekable() {
pin_mut!(peekable);
assert_eq!(peekable.as_mut().peek().await, Some(&1u8));
assert_eq!(peekable.collect::<Vec<u8>>().await, vec![1, 2, 3]);
+
+ let s = stream::once(async { 1 }).peekable();
+ pin_mut!(s);
+ assert_eq!(s.as_mut().peek().await, Some(&1u8));
+ assert_eq!(s.collect::<Vec<u8>>().await, vec![1]);
+ });
+}
+
+#[test]
+fn peekable_mut() {
+ block_on(async {
+ let s = stream::iter(vec![1u8, 2, 3]).peekable();
+ pin_mut!(s);
+ if let Some(p) = s.as_mut().peek_mut().await {
+ if *p == 1 {
+ *p = 5;
+ }
+ }
+ assert_eq!(s.collect::<Vec<_>>().await, vec![5, 2, 3]);
});
}