diff options
author | Android Build Coastguard Worker <android-build-coastguard-worker@google.com> | 2022-05-09 20:35:43 +0000 |
---|---|---|
committer | Android Build Coastguard Worker <android-build-coastguard-worker@google.com> | 2022-05-09 20:35:43 +0000 |
commit | d05d7a7db7e0ed4a206fb9ea2b16ad25080b0297 (patch) | |
tree | 4bd872c7931921cfe12cc2fa1b72915778e481d3 | |
parent | 1454173d5097444c2156243392330c6e638ca8d0 (diff) | |
parent | 3ccd22c1882fcff9f5442478123ad654ce0b9105 (diff) | |
download | futures-channel-android13-mainline-media-release.tar.gz |
Snap for 8562061 from 3ccd22c1882fcff9f5442478123ad654ce0b9105 to mainline-media-releaseaml_med_331911000aml_med_331712010aml_med_331612000aml_med_331511000aml_med_331410000aml_med_331318000aml_med_331115000aml_med_331012020android13-mainline-media-release
Change-Id: I2fef201d7337211c9c35a7479096c57805a86594
-rw-r--r-- | .cargo_vcs_info.json | 7 | ||||
-rw-r--r-- | Android.bp | 6 | ||||
-rw-r--r-- | Cargo.toml | 38 | ||||
-rw-r--r-- | Cargo.toml.orig | 18 | ||||
-rw-r--r-- | METADATA | 8 | ||||
-rw-r--r-- | README.md | 23 | ||||
-rw-r--r-- | TEST_MAPPING | 49 | ||||
-rw-r--r-- | benches/sync_mpsc.rs | 15 | ||||
-rw-r--r-- | build.rs | 41 | ||||
-rw-r--r-- | cargo2android.json | 5 | ||||
-rw-r--r-- | no_atomic_cas.rs | 13 | ||||
-rw-r--r-- | src/lib.rs | 54 | ||||
-rw-r--r-- | src/lock.rs | 7 | ||||
-rw-r--r-- | src/mpsc/mod.rs | 196 | ||||
-rw-r--r-- | src/mpsc/queue.rs | 22 | ||||
-rw-r--r-- | src/mpsc/sink_impl.rs | 58 | ||||
-rw-r--r-- | src/oneshot.rs | 39 | ||||
-rw-r--r-- | tests/channel.rs | 10 | ||||
-rw-r--r-- | tests/mpsc-close.rs | 51 | ||||
-rw-r--r-- | tests/mpsc.rs | 58 | ||||
-rw-r--r-- | tests/oneshot.rs | 18 |
21 files changed, 381 insertions, 355 deletions
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json index f3ad3ab..deccf0d 100644 --- a/.cargo_vcs_info.json +++ b/.cargo_vcs_info.json @@ -1,5 +1,6 @@ { "git": { - "sha1": "c91f8691672c7401b1923ab00bf138975c99391a" - } -} + "sha1": "fc1e3250219170e31cddb8857a276cba7dd08d44" + }, + "path_in_vcs": "futures-channel" +}
\ No newline at end of file @@ -45,6 +45,8 @@ rust_library { name: "libfutures_channel", host_supported: true, crate_name: "futures_channel", + cargo_env_compat: true, + cargo_pkg_version: "0.3.21", srcs: ["src/lib.rs"], edition: "2018", features: [ @@ -57,11 +59,9 @@ rust_library { ], apex_available: [ "//apex_available:platform", + "com.android.bluetooth", "com.android.resolv", "com.android.virt", ], min_sdk_version: "29", } - -// dependent_library ["feature_list"] -// futures-core-0.3.14 "alloc,std" @@ -3,32 +3,37 @@ # When uploading crates to the registry Cargo will automatically # "normalize" Cargo.toml files for maximal compatibility # with all versions of Cargo and also rewrite `path` dependencies -# to registry (e.g., crates.io) dependencies +# to registry (e.g., crates.io) dependencies. # -# If you believe there's an error in this file please file an -# issue against the rust-lang/cargo repository. If you're -# editing this file be aware that the upstream Cargo.toml -# will likely look very different (and much more reasonable) +# If you are reading this file be aware that the original Cargo.toml +# will likely look very different (and much more reasonable). +# See Cargo.toml.orig for the original contents. [package] edition = "2018" +rust-version = "1.45" name = "futures-channel" -version = "0.3.13" -authors = ["Alex Crichton <alex@alexcrichton.com>"] -description = "Channels for asynchronous communication using futures-rs.\n" +version = "0.3.21" +description = """ +Channels for asynchronous communication using futures-rs. +""" homepage = "https://rust-lang.github.io/futures-rs" -documentation = "https://docs.rs/futures-channel/0.3" license = "MIT OR Apache-2.0" repository = "https://github.com/rust-lang/futures-rs" + [package.metadata.docs.rs] all-features = true -rustdoc-args = ["--cfg", "docsrs"] +rustdoc-args = [ + "--cfg", + "docsrs", +] + [dependencies.futures-core] -version = "0.3.13" +version = "0.3.21" default-features = false [dependencies.futures-sink] -version = "0.3.13" +version = "0.3.21" optional = true default-features = false @@ -36,8 +41,11 @@ default-features = false [features] alloc = ["futures-core/alloc"] -cfg-target-has-atomic = ["futures-core/cfg-target-has-atomic"] +cfg-target-has-atomic = [] default = ["std"] sink = ["futures-sink"] -std = ["alloc", "futures-core/std"] -unstable = ["futures-core/unstable"] +std = [ + "alloc", + "futures-core/std", +] +unstable = [] diff --git a/Cargo.toml.orig b/Cargo.toml.orig index 9a33320..f356eab 100644 --- a/Cargo.toml.orig +++ b/Cargo.toml.orig @@ -1,12 +1,11 @@ [package] name = "futures-channel" +version = "0.3.21" edition = "2018" -version = "0.3.13" -authors = ["Alex Crichton <alex@alexcrichton.com>"] +rust-version = "1.45" license = "MIT OR Apache-2.0" repository = "https://github.com/rust-lang/futures-rs" homepage = "https://rust-lang.github.io/futures-rs" -documentation = "https://docs.rs/futures-channel/0.3" description = """ Channels for asynchronous communication using futures-rs. """ @@ -17,15 +16,14 @@ std = ["alloc", "futures-core/std"] alloc = ["futures-core/alloc"] sink = ["futures-sink"] -# Unstable features -# These features are outside of the normal semver guarantees and require the -# `unstable` feature as an explicit opt-in to unstable API. -unstable = ["futures-core/unstable"] -cfg-target-has-atomic = ["futures-core/cfg-target-has-atomic"] +# These features are no longer used. +# TODO: remove in the next major version. +unstable = [] +cfg-target-has-atomic = [] [dependencies] -futures-core = { path = "../futures-core", version = "0.3.13", default-features = false } -futures-sink = { path = "../futures-sink", version = "0.3.13", default-features = false, optional = true } +futures-core = { path = "../futures-core", version = "0.3.21", default-features = false } +futures-sink = { path = "../futures-sink", version = "0.3.21", default-features = false, optional = true } [dev-dependencies] futures = { path = "../futures", default-features = true } @@ -7,13 +7,13 @@ third_party { } url { type: ARCHIVE - value: "https://static.crates.io/crates/futures-channel/futures-channel-0.3.13.crate" + value: "https://static.crates.io/crates/futures-channel/futures-channel-0.3.21.crate" } - version: "0.3.13" + version: "0.3.21" license_type: NOTICE last_upgrade_date { - year: 2021 - month: 4 + year: 2022 + month: 3 day: 1 } } diff --git a/README.md b/README.md new file mode 100644 index 0000000..3287be9 --- /dev/null +++ b/README.md @@ -0,0 +1,23 @@ +# futures-channel + +Channels for asynchronous communication using futures-rs. + +## Usage + +Add this to your `Cargo.toml`: + +```toml +[dependencies] +futures-channel = "0.3" +``` + +The current `futures-channel` requires Rust 1.45 or later. + +## License + +Licensed under either of [Apache License, Version 2.0](LICENSE-APACHE) or +[MIT license](LICENSE-MIT) at your option. + +Unless you explicitly state otherwise, any contribution intentionally submitted +for inclusion in the work by you, as defined in the Apache-2.0 license, shall +be dual licensed as above, without any additional terms or conditions. diff --git a/TEST_MAPPING b/TEST_MAPPING index 6798806..5ef61de 100644 --- a/TEST_MAPPING +++ b/TEST_MAPPING @@ -1,56 +1,45 @@ // Generated by update_crate_tests.py for tests that depend on this crate. { - "presubmit": [ - { - "name": "anyhow_device_test_tests_test_boxed" - }, - { - "name": "anyhow_device_test_tests_test_convert" - }, - { - "name": "anyhow_device_test_tests_test_ffi" - }, - { - "name": "anyhow_device_test_tests_test_repr" - }, - { - "name": "tokio-test_device_test_tests_block_on" - }, + "imports": [ { - "name": "anyhow_device_test_tests_test_chain" + "path": "external/rust/crates/anyhow" }, { - "name": "anyhow_device_test_tests_test_source" + "path": "external/rust/crates/futures-util" }, { - "name": "tokio-test_device_test_tests_io" + "path": "external/rust/crates/tokio" }, { - "name": "anyhow_device_test_tests_test_autotrait" - }, + "path": "external/rust/crates/tokio-test" + } + ], + "presubmit": [ { - "name": "anyhow_device_test_src_lib" + "name": "ZipFuseTest" }, { - "name": "anyhow_device_test_tests_test_context" + "name": "authfs_device_test_src_lib" }, { - "name": "anyhow_device_test_tests_test_downcast" + "name": "doh_unit_test" }, { - "name": "anyhow_device_test_tests_test_macros" - }, + "name": "virtualizationservice_device_test" + } + ], + "presubmit-rust": [ { - "name": "futures-util_device_test_src_lib" + "name": "ZipFuseTest" }, { - "name": "anyhow_device_test_tests_test_fmt" + "name": "authfs_device_test_src_lib" }, { - "name": "tokio-test_device_test_tests_macros" + "name": "doh_unit_test" }, { - "name": "tokio-test_device_test_src_lib" + "name": "virtualizationservice_device_test" } ] } diff --git a/benches/sync_mpsc.rs b/benches/sync_mpsc.rs index e22fe60..7c3c3d3 100644 --- a/benches/sync_mpsc.rs +++ b/benches/sync_mpsc.rs @@ -7,8 +7,8 @@ use { futures::{ channel::mpsc::{self, Sender, UnboundedSender}, ready, - stream::{Stream, StreamExt}, sink::Sink, + stream::{Stream, StreamExt}, task::{Context, Poll}, }, futures_test::task::noop_context, @@ -25,7 +25,6 @@ fn unbounded_1_tx(b: &mut Bencher) { // 1000 iterations to avoid measuring overhead of initialization // Result should be divided by 1000 for i in 0..1000 { - // Poll, not ready, park assert_eq!(Poll::Pending, rx.poll_next_unpin(&mut cx)); @@ -73,7 +72,6 @@ fn unbounded_uncontended(b: &mut Bencher) { }) } - /// A Stream that continuously sends incrementing number of the queue struct TestSender { tx: Sender<u32>, @@ -84,9 +82,7 @@ struct TestSender { impl Stream for TestSender { type Item = u32; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) - -> Poll<Option<Self::Item>> - { + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { let this = &mut *self; let mut tx = Pin::new(&mut this.tx); @@ -123,12 +119,7 @@ fn bounded_100_tx(b: &mut Bencher) { // Each sender can send one item after specified capacity let (tx, mut rx) = mpsc::channel(0); - let mut tx: Vec<_> = (0..100).map(|_| { - TestSender { - tx: tx.clone(), - last: 0 - } - }).collect(); + let mut tx: Vec<_> = (0..100).map(|_| TestSender { tx: tx.clone(), last: 0 }).collect(); for i in 0..10 { for x in &mut tx { diff --git a/build.rs b/build.rs new file mode 100644 index 0000000..05e0496 --- /dev/null +++ b/build.rs @@ -0,0 +1,41 @@ +// The rustc-cfg listed below are considered public API, but it is *unstable* +// and outside of the normal semver guarantees: +// +// - `futures_no_atomic_cas` +// Assume the target does *not* support atomic CAS operations. +// This is usually detected automatically by the build script, but you may +// need to enable it manually when building for custom targets or using +// non-cargo build systems that don't run the build script. +// +// With the exceptions mentioned above, the rustc-cfg emitted by the build +// script are *not* public API. + +#![warn(rust_2018_idioms, single_use_lifetimes)] + +use std::env; + +include!("no_atomic_cas.rs"); + +fn main() { + let target = match env::var("TARGET") { + Ok(target) => target, + Err(e) => { + println!( + "cargo:warning={}: unable to get TARGET environment variable: {}", + env!("CARGO_PKG_NAME"), + e + ); + return; + } + }; + + // Note that this is `no_*`, not `has_*`. This allows treating + // `cfg(target_has_atomic = "ptr")` as true when the build script doesn't + // run. This is needed for compatibility with non-cargo build systems that + // don't run the build script. + if NO_ATOMIC_CAS.contains(&&*target) { + println!("cargo:rustc-cfg=futures_no_atomic_cas"); + } + + println!("cargo:rerun-if-changed=no_atomic_cas.rs"); +} diff --git a/cargo2android.json b/cargo2android.json index 01465d0..a7e2a4b 100644 --- a/cargo2android.json +++ b/cargo2android.json @@ -1,11 +1,12 @@ { "apex-available": [ "//apex_available:platform", + "com.android.bluetooth", "com.android.resolv", "com.android.virt" ], - "min_sdk_version": "29", "dependencies": true, "device": true, + "min-sdk-version": "29", "run": true -}
\ No newline at end of file +} diff --git a/no_atomic_cas.rs b/no_atomic_cas.rs new file mode 100644 index 0000000..9b05d4b --- /dev/null +++ b/no_atomic_cas.rs @@ -0,0 +1,13 @@ +// This file is @generated by no_atomic_cas.sh. +// It is not intended for manual editing. + +const NO_ATOMIC_CAS: &[&str] = &[ + "avr-unknown-gnu-atmega328", + "bpfeb-unknown-none", + "bpfel-unknown-none", + "msp430-none-elf", + "riscv32i-unknown-none-elf", + "riscv32imc-unknown-none-elf", + "thumbv4t-none-eabi", + "thumbv6m-none-eabi", +]; @@ -11,34 +11,32 @@ //! All items are only available when the `std` or `alloc` feature of this //! library is activated, and it is activated by default. -#![cfg_attr(feature = "cfg-target-has-atomic", feature(cfg_target_has_atomic))] - #![cfg_attr(not(feature = "std"), no_std)] +#![warn( + missing_debug_implementations, + missing_docs, + rust_2018_idioms, + single_use_lifetimes, + unreachable_pub +)] +#![doc(test( + no_crate_inject, + attr( + deny(warnings, rust_2018_idioms, single_use_lifetimes), + allow(dead_code, unused_assignments, unused_variables) + ) +))] -#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms, unreachable_pub)] -// It cannot be included in the published code because this lints have false positives in the minimum required version. -#![cfg_attr(test, warn(single_use_lifetimes))] -#![warn(clippy::all)] -#![doc(test(attr(deny(warnings), allow(dead_code, unused_assignments, unused_variables))))] - -#[cfg(all(feature = "cfg-target-has-atomic", not(feature = "unstable")))] -compile_error!("The `cfg-target-has-atomic` feature requires the `unstable` feature as an explicit opt-in to unstable features"); - -macro_rules! cfg_target_has_atomic { - ($($item:item)*) => {$( - #[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))] - $item - )*}; -} - -cfg_target_has_atomic! { - #[cfg(feature = "alloc")] - extern crate alloc; +#[cfg(not(futures_no_atomic_cas))] +#[cfg(feature = "alloc")] +extern crate alloc; - #[cfg(feature = "alloc")] - mod lock; - #[cfg(feature = "std")] - pub mod mpsc; - #[cfg(feature = "alloc")] - pub mod oneshot; -} +#[cfg(not(futures_no_atomic_cas))] +#[cfg(feature = "alloc")] +mod lock; +#[cfg(not(futures_no_atomic_cas))] +#[cfg(feature = "std")] +pub mod mpsc; +#[cfg(not(futures_no_atomic_cas))] +#[cfg(feature = "alloc")] +pub mod oneshot; diff --git a/src/lock.rs b/src/lock.rs index 5eecdd9..b328d0f 100644 --- a/src/lock.rs +++ b/src/lock.rs @@ -6,8 +6,8 @@ use core::cell::UnsafeCell; use core::ops::{Deref, DerefMut}; -use core::sync::atomic::Ordering::SeqCst; use core::sync::atomic::AtomicBool; +use core::sync::atomic::Ordering::SeqCst; /// A "mutex" around a value, similar to `std::sync::Mutex<T>`. /// @@ -37,10 +37,7 @@ unsafe impl<T: Send> Sync for Lock<T> {} impl<T> Lock<T> { /// Creates a new lock around the given value. pub(crate) fn new(t: T) -> Self { - Self { - locked: AtomicBool::new(false), - data: UnsafeCell::new(t), - } + Self { locked: AtomicBool::new(false), data: UnsafeCell::new(t) } } /// Attempts to acquire this lock, returning whether the lock was acquired or diff --git a/src/mpsc/mod.rs b/src/mpsc/mod.rs index dd50343..44834b7 100644 --- a/src/mpsc/mod.rs +++ b/src/mpsc/mod.rs @@ -79,13 +79,13 @@ // by the queue structure. use futures_core::stream::{FusedStream, Stream}; -use futures_core::task::{Context, Poll, Waker}; use futures_core::task::__internal::AtomicWaker; +use futures_core::task::{Context, Poll, Waker}; use std::fmt; use std::pin::Pin; -use std::sync::{Arc, Mutex}; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering::SeqCst; +use std::sync::{Arc, Mutex}; use std::thread; use crate::mpsc::queue::Queue; @@ -209,9 +209,7 @@ impl SendError { impl<T> fmt::Debug for TrySendError<T> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("TrySendError") - .field("kind", &self.err.kind) - .finish() + f.debug_struct("TrySendError").field("kind", &self.err.kind).finish() } } @@ -251,8 +249,7 @@ impl<T> TrySendError<T> { impl fmt::Debug for TryRecvError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_tuple("TryRecvError") - .finish() + f.debug_tuple("TryRecvError").finish() } } @@ -335,10 +332,7 @@ struct SenderTask { impl SenderTask { fn new() -> Self { - Self { - task: None, - is_parked: false, - } + Self { task: None, is_parked: false } } fn notify(&mut self) { @@ -381,9 +375,7 @@ pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) { maybe_parked: false, }; - let rx = Receiver { - inner: Some(inner), - }; + let rx = Receiver { inner: Some(inner) }; (Sender(Some(tx)), rx) } @@ -399,7 +391,6 @@ pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) { /// the channel. Using an `unbounded` channel has the ability of causing the /// process to run out of memory. In this case, the process will be aborted. pub fn unbounded<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) { - let inner = Arc::new(UnboundedInner { state: AtomicUsize::new(INIT_STATE), message_queue: Queue::new(), @@ -407,13 +398,9 @@ pub fn unbounded<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) { recv_task: AtomicWaker::new(), }); - let tx = UnboundedSenderInner { - inner: inner.clone(), - }; + let tx = UnboundedSenderInner { inner: inner.clone() }; - let rx = UnboundedReceiver { - inner: Some(inner), - }; + let rx = UnboundedReceiver { inner: Some(inner) }; (UnboundedSender(Some(tx)), rx) } @@ -430,13 +417,10 @@ impl<T> UnboundedSenderInner<T> { if state.is_open { Poll::Ready(Ok(())) } else { - Poll::Ready(Err(SendError { - kind: SendErrorKind::Disconnected, - })) + Poll::Ready(Err(SendError { kind: SendErrorKind::Disconnected })) } } - // Push message to the queue and signal to the receiver fn queue_push_and_signal(&self, msg: T) { // Push the message onto the message queue @@ -462,16 +446,17 @@ impl<T> UnboundedSenderInner<T> { // This probably is never hit? Odds are the process will run out of // memory first. It may be worth to return something else in this // case? - assert!(state.num_messages < MAX_CAPACITY, "buffer space \ - exhausted; sending this messages would overflow the state"); + assert!( + state.num_messages < MAX_CAPACITY, + "buffer space \ + exhausted; sending this messages would overflow the state" + ); state.num_messages += 1; let next = encode_state(&state); match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) { - Ok(_) => { - return Some(state.num_messages) - } + Ok(_) => return Some(state.num_messages), Err(actual) => curr = actual, } } @@ -516,12 +501,7 @@ impl<T> BoundedSenderInner<T> { fn try_send(&mut self, msg: T) -> Result<(), TrySendError<T>> { // If the sender is currently blocked, reject the message if !self.poll_unparked(None).is_ready() { - return Err(TrySendError { - err: SendError { - kind: SendErrorKind::Full, - }, - val: msg, - }); + return Err(TrySendError { err: SendError { kind: SendErrorKind::Full }, val: msg }); } // The channel has capacity to accept the message, so send it @@ -530,11 +510,8 @@ impl<T> BoundedSenderInner<T> { // Do the send without failing. // Can be called only by bounded sender. - #[allow(clippy::debug_assert_with_mut_call)] - fn do_send_b(&mut self, msg: T) - -> Result<(), TrySendError<T>> - { - // Anyone callig do_send *should* make sure there is room first, + fn do_send_b(&mut self, msg: T) -> Result<(), TrySendError<T>> { + // Anyone calling do_send *should* make sure there is room first, // but assert here for tests as a sanity check. debug_assert!(self.poll_unparked(None).is_ready()); @@ -551,12 +528,12 @@ impl<T> BoundedSenderInner<T> { // the configured buffer size num_messages > self.inner.buffer } - None => return Err(TrySendError { - err: SendError { - kind: SendErrorKind::Disconnected, - }, - val: msg, - }), + None => { + return Err(TrySendError { + err: SendError { kind: SendErrorKind::Disconnected }, + val: msg, + }) + } }; // If the channel has reached capacity, then the sender task needs to @@ -600,16 +577,17 @@ impl<T> BoundedSenderInner<T> { // This probably is never hit? Odds are the process will run out of // memory first. It may be worth to return something else in this // case? - assert!(state.num_messages < MAX_CAPACITY, "buffer space \ - exhausted; sending this messages would overflow the state"); + assert!( + state.num_messages < MAX_CAPACITY, + "buffer space \ + exhausted; sending this messages would overflow the state" + ); state.num_messages += 1; let next = encode_state(&state); match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) { - Ok(_) => { - return Some(state.num_messages) - } + Ok(_) => return Some(state.num_messages), Err(actual) => curr = actual, } } @@ -644,15 +622,10 @@ impl<T> BoundedSenderInner<T> { /// capacity, in which case the current task is queued to be notified once /// capacity is available; /// - `Poll::Ready(Err(SendError))` if the receiver has been dropped. - fn poll_ready( - &mut self, - cx: &mut Context<'_>, - ) -> Poll<Result<(), SendError>> { + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SendError>> { let state = decode_state(self.inner.state.load(SeqCst)); if !state.is_open { - return Poll::Ready(Err(SendError { - kind: SendErrorKind::Disconnected, - })); + return Poll::Ready(Err(SendError { kind: SendErrorKind::Disconnected })); } self.poll_unparked(Some(cx)).map(Ok) @@ -699,7 +672,7 @@ impl<T> BoundedSenderInner<T> { if !task.is_parked { self.maybe_parked = false; - return Poll::Ready(()) + return Poll::Ready(()); } // At this point, an unpark request is pending, so there will be an @@ -724,12 +697,7 @@ impl<T> Sender<T> { if let Some(inner) = &mut self.0 { inner.try_send(msg) } else { - Err(TrySendError { - err: SendError { - kind: SendErrorKind::Disconnected, - }, - val: msg, - }) + Err(TrySendError { err: SendError { kind: SendErrorKind::Disconnected }, val: msg }) } } @@ -739,8 +707,7 @@ impl<T> Sender<T> { /// [`poll_ready`](Sender::poll_ready) has reported that the channel is /// ready to receive a message. pub fn start_send(&mut self, msg: T) -> Result<(), SendError> { - self.try_send(msg) - .map_err(|e| e.err) + self.try_send(msg).map_err(|e| e.err) } /// Polls the channel to determine if there is guaranteed capacity to send @@ -755,13 +722,8 @@ impl<T> Sender<T> { /// capacity, in which case the current task is queued to be notified once /// capacity is available; /// - `Poll::Ready(Err(SendError))` if the receiver has been dropped. - pub fn poll_ready( - &mut self, - cx: &mut Context<'_>, - ) -> Poll<Result<(), SendError>> { - let inner = self.0.as_mut().ok_or(SendError { - kind: SendErrorKind::Disconnected, - })?; + pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SendError>> { + let inner = self.0.as_mut().ok_or(SendError { kind: SendErrorKind::Disconnected })?; inner.poll_ready(cx) } @@ -799,7 +761,10 @@ impl<T> Sender<T> { } /// Hashes the receiver into the provided hasher - pub fn hash_receiver<H>(&self, hasher: &mut H) where H: std::hash::Hasher { + pub fn hash_receiver<H>(&self, hasher: &mut H) + where + H: std::hash::Hasher, + { use std::hash::Hash; let ptr = self.0.as_ref().map(|inner| inner.ptr()); @@ -809,13 +774,8 @@ impl<T> Sender<T> { impl<T> UnboundedSender<T> { /// Check if the channel is ready to receive a message. - pub fn poll_ready( - &self, - _: &mut Context<'_>, - ) -> Poll<Result<(), SendError>> { - let inner = self.0.as_ref().ok_or(SendError { - kind: SendErrorKind::Disconnected, - })?; + pub fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), SendError>> { + let inner = self.0.as_ref().ok_or(SendError { kind: SendErrorKind::Disconnected })?; inner.poll_ready_nb() } @@ -845,12 +805,7 @@ impl<T> UnboundedSender<T> { } } - Err(TrySendError { - err: SendError { - kind: SendErrorKind::Disconnected, - }, - val: msg, - }) + Err(TrySendError { err: SendError { kind: SendErrorKind::Disconnected }, val: msg }) } /// Send a message on the channel. @@ -858,8 +813,7 @@ impl<T> UnboundedSender<T> { /// This method should only be called after `poll_ready` has been used to /// verify that the channel is ready to receive a message. pub fn start_send(&mut self, msg: T) -> Result<(), SendError> { - self.do_send_nb(msg) - .map_err(|e| e.err) + self.do_send_nb(msg).map_err(|e| e.err) } /// Sends a message along this channel. @@ -888,7 +842,10 @@ impl<T> UnboundedSender<T> { } /// Hashes the receiver into the provided hasher - pub fn hash_receiver<H>(&self, hasher: &mut H) where H: std::hash::Hasher { + pub fn hash_receiver<H>(&self, hasher: &mut H) + where + H: std::hash::Hasher, + { use std::hash::Hash; let ptr = self.0.as_ref().map(|inner| inner.ptr()); @@ -928,9 +885,7 @@ impl<T> Clone for UnboundedSenderInner<T> { Ok(_) => { // The ABA problem doesn't matter here. We only care that the // number of senders never exceeds the maximum. - return Self { - inner: self.inner.clone(), - }; + return Self { inner: self.inner.clone() }; } Err(actual) => curr = actual, } @@ -1021,19 +976,22 @@ impl<T> Receiver<T> { /// only when you've otherwise arranged to be notified when the channel is /// no longer empty. /// - /// This function will panic if called after `try_next` or `poll_next` has - /// returned `None`. + /// This function returns: + /// * `Ok(Some(t))` when message is fetched + /// * `Ok(None)` when channel is closed and no messages left in the queue + /// * `Err(e)` when there are no messages available, but channel is not yet closed pub fn try_next(&mut self) -> Result<Option<T>, TryRecvError> { match self.next_message() { - Poll::Ready(msg) => { - Ok(msg) - }, + Poll::Ready(msg) => Ok(msg), Poll::Pending => Err(TryRecvError { _priv: () }), } } fn next_message(&mut self) -> Poll<Option<T>> { - let inner = self.inner.as_mut().expect("Receiver::next_message called after `None`"); + let inner = match self.inner.as_mut() { + None => return Poll::Ready(None), + Some(inner) => inner, + }; // Pop off a message match unsafe { inner.message_queue.pop_spin() } { Some(msg) => { @@ -1098,18 +1056,15 @@ impl<T> FusedStream for Receiver<T> { impl<T> Stream for Receiver<T> { type Item = T; - fn poll_next( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Option<T>> { - // Try to read a message off of the message queue. + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> { + // Try to read a message off of the message queue. match self.next_message() { Poll::Ready(msg) => { if msg.is_none() { self.inner = None; } Poll::Ready(msg) - }, + } Poll::Pending => { // There are no messages to read, in this case, park. self.inner.as_ref().unwrap().recv_task.register(cx.waker()); @@ -1169,19 +1124,22 @@ impl<T> UnboundedReceiver<T> { /// only when you've otherwise arranged to be notified when the channel is /// no longer empty. /// - /// This function will panic if called after `try_next` or `poll_next` has - /// returned `None`. + /// This function returns: + /// * `Ok(Some(t))` when message is fetched + /// * `Ok(None)` when channel is closed and no messages left in the queue + /// * `Err(e)` when there are no messages available, but channel is not yet closed pub fn try_next(&mut self) -> Result<Option<T>, TryRecvError> { match self.next_message() { - Poll::Ready(msg) => { - Ok(msg) - }, + Poll::Ready(msg) => Ok(msg), Poll::Pending => Err(TryRecvError { _priv: () }), } } fn next_message(&mut self) -> Poll<Option<T>> { - let inner = self.inner.as_mut().expect("Receiver::next_message called after `None`"); + let inner = match self.inner.as_mut() { + None => return Poll::Ready(None), + Some(inner) => inner, + }; // Pop off a message match unsafe { inner.message_queue.pop_spin() } { Some(msg) => { @@ -1230,10 +1188,7 @@ impl<T> FusedStream for UnboundedReceiver<T> { impl<T> Stream for UnboundedReceiver<T> { type Item = T; - fn poll_next( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Option<T>> { + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> { // Try to read a message off of the message queue. match self.next_message() { Poll::Ready(msg) => { @@ -1241,7 +1196,7 @@ impl<T> Stream for UnboundedReceiver<T> { self.inner = None; } Poll::Ready(msg) - }, + } Poll::Pending => { // There are no messages to read, in this case, park. self.inner.as_ref().unwrap().recv_task.register(cx.waker()); @@ -1339,10 +1294,7 @@ impl State { */ fn decode_state(num: usize) -> State { - State { - is_open: num & OPEN_MASK == OPEN_MASK, - num_messages: num & MAX_CAPACITY, - } + State { is_open: num & OPEN_MASK == OPEN_MASK, num_messages: num & MAX_CAPACITY } } fn encode_state(state: &State) -> usize { diff --git a/src/mpsc/queue.rs b/src/mpsc/queue.rs index b00e1b1..57dc7f5 100644 --- a/src/mpsc/queue.rs +++ b/src/mpsc/queue.rs @@ -43,10 +43,10 @@ pub(super) use self::PopResult::*; -use std::thread; use std::cell::UnsafeCell; use std::ptr; use std::sync::atomic::{AtomicPtr, Ordering}; +use std::thread; /// A result of the `pop` function. pub(super) enum PopResult<T> { @@ -76,15 +76,12 @@ pub(super) struct Queue<T> { tail: UnsafeCell<*mut Node<T>>, } -unsafe impl<T: Send> Send for Queue<T> { } -unsafe impl<T: Send> Sync for Queue<T> { } +unsafe impl<T: Send> Send for Queue<T> {} +unsafe impl<T: Send> Sync for Queue<T> {} impl<T> Node<T> { unsafe fn new(v: Option<T>) -> *mut Self { - Box::into_raw(Box::new(Self { - next: AtomicPtr::new(ptr::null_mut()), - value: v, - })) + Box::into_raw(Box::new(Self { next: AtomicPtr::new(ptr::null_mut()), value: v })) } } @@ -93,10 +90,7 @@ impl<T> Queue<T> { /// one consumer. pub(super) fn new() -> Self { let stub = unsafe { Node::new(None) }; - Self { - head: AtomicPtr::new(stub), - tail: UnsafeCell::new(stub), - } + Self { head: AtomicPtr::new(stub), tail: UnsafeCell::new(stub) } } /// Pushes a new value onto this queue. @@ -133,7 +127,11 @@ impl<T> Queue<T> { return Data(ret); } - if self.head.load(Ordering::Acquire) == tail {Empty} else {Inconsistent} + if self.head.load(Ordering::Acquire) == tail { + Empty + } else { + Inconsistent + } } /// Pop an element similarly to `pop` function, but spin-wait on inconsistent diff --git a/src/mpsc/sink_impl.rs b/src/mpsc/sink_impl.rs index 4ce66b4..1be2016 100644 --- a/src/mpsc/sink_impl.rs +++ b/src/mpsc/sink_impl.rs @@ -6,24 +6,15 @@ use std::pin::Pin; impl<T> Sink<T> for Sender<T> { type Error = SendError; - fn poll_ready( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Result<(), Self::Error>> { + fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { (*self).poll_ready(cx) } - fn start_send( - mut self: Pin<&mut Self>, - msg: T, - ) -> Result<(), Self::Error> { + fn start_send(mut self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error> { (*self).start_send(msg) } - fn poll_flush( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Result<(), Self::Error>> { + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { match (*self).poll_ready(cx) { Poll::Ready(Err(ref e)) if e.is_disconnected() => { // If the receiver disconnected, we consider the sink to be flushed. @@ -33,10 +24,7 @@ impl<T> Sink<T> for Sender<T> { } } - fn poll_close( - mut self: Pin<&mut Self>, - _: &mut Context<'_>, - ) -> Poll<Result<(), Self::Error>> { + fn poll_close(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { self.disconnect(); Poll::Ready(Ok(())) } @@ -45,31 +33,19 @@ impl<T> Sink<T> for Sender<T> { impl<T> Sink<T> for UnboundedSender<T> { type Error = SendError; - fn poll_ready( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Result<(), Self::Error>> { + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { Self::poll_ready(&*self, cx) } - fn start_send( - mut self: Pin<&mut Self>, - msg: T, - ) -> Result<(), Self::Error> { + fn start_send(mut self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error> { Self::start_send(&mut *self, msg) } - fn poll_flush( - self: Pin<&mut Self>, - _: &mut Context<'_>, - ) -> Poll<Result<(), Self::Error>> { + fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { Poll::Ready(Ok(())) } - fn poll_close( - mut self: Pin<&mut Self>, - _: &mut Context<'_>, - ) -> Poll<Result<(), Self::Error>> { + fn poll_close(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { self.disconnect(); Poll::Ready(Ok(())) } @@ -78,29 +54,19 @@ impl<T> Sink<T> for UnboundedSender<T> { impl<T> Sink<T> for &UnboundedSender<T> { type Error = SendError; - fn poll_ready( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Result<(), Self::Error>> { + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { UnboundedSender::poll_ready(*self, cx) } fn start_send(self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error> { - self.unbounded_send(msg) - .map_err(TrySendError::into_send_error) + self.unbounded_send(msg).map_err(TrySendError::into_send_error) } - fn poll_flush( - self: Pin<&mut Self>, - _: &mut Context<'_>, - ) -> Poll<Result<(), Self::Error>> { + fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { Poll::Ready(Ok(())) } - fn poll_close( - self: Pin<&mut Self>, - _: &mut Context<'_>, - ) -> Poll<Result<(), Self::Error>> { + fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { self.close_channel(); Poll::Ready(Ok(())) } diff --git a/src/oneshot.rs b/src/oneshot.rs index dbbce81..5af651b 100644 --- a/src/oneshot.rs +++ b/src/oneshot.rs @@ -7,7 +7,7 @@ use core::fmt; use core::pin::Pin; use core::sync::atomic::AtomicBool; use core::sync::atomic::Ordering::SeqCst; -use futures_core::future::{Future, FusedFuture}; +use futures_core::future::{FusedFuture, Future}; use futures_core::task::{Context, Poll, Waker}; use crate::lock::Lock; @@ -16,7 +16,6 @@ use crate::lock::Lock; /// /// This is created by the [`channel`](channel) function. #[must_use = "futures do nothing unless you `.await` or poll them"] -#[derive(Debug)] pub struct Receiver<T> { inner: Arc<Inner<T>>, } @@ -24,7 +23,6 @@ pub struct Receiver<T> { /// A means of transmitting a single value to another task. /// /// This is created by the [`channel`](channel) function. -#[derive(Debug)] pub struct Sender<T> { inner: Arc<Inner<T>>, } @@ -35,7 +33,6 @@ impl<T> Unpin for Sender<T> {} /// Internal state of the `Receiver`/`Sender` pair above. This is all used as /// the internal synchronization between the two for send/recv operations. -#[derive(Debug)] struct Inner<T> { /// Indicates whether this oneshot is complete yet. This is filled in both /// by `Sender::drop` and by `Receiver::drop`, and both sides interpret it @@ -106,12 +103,8 @@ struct Inner<T> { /// ``` pub fn channel<T>() -> (Sender<T>, Receiver<T>) { let inner = Arc::new(Inner::new()); - let receiver = Receiver { - inner: inner.clone(), - }; - let sender = Sender { - inner, - }; + let receiver = Receiver { inner: inner.clone() }; + let sender = Sender { inner }; (sender, receiver) } @@ -127,7 +120,7 @@ impl<T> Inner<T> { fn send(&self, t: T) -> Result<(), T> { if self.complete.load(SeqCst) { - return Err(t) + return Err(t); } // Note that this lock acquisition may fail if the receiver @@ -164,7 +157,7 @@ impl<T> Inner<T> { // destructor, but our destructor hasn't run yet so if it's set then the // oneshot is gone. if self.complete.load(SeqCst) { - return Poll::Ready(()) + return Poll::Ready(()); } // If our other half is not gone then we need to park our current task @@ -273,7 +266,10 @@ impl<T> Inner<T> { } else { let task = cx.waker().clone(); match self.rx_task.try_lock() { - Some(mut slot) => { *slot = Some(task); false }, + Some(mut slot) => { + *slot = Some(task); + false + } None => true, } }; @@ -394,6 +390,12 @@ impl<T> Drop for Sender<T> { } } +impl<T: fmt::Debug> fmt::Debug for Sender<T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Sender").field("complete", &self.inner.complete).finish() + } +} + /// A future that resolves when the receiving end of a channel has hung up. /// /// This is an `.await`-friendly interface around [`poll_canceled`](Sender::poll_canceled). @@ -453,10 +455,7 @@ impl<T> Receiver<T> { impl<T> Future for Receiver<T> { type Output = Result<T, Canceled>; - fn poll( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Result<T, Canceled>> { + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<T, Canceled>> { self.inner.recv(cx) } } @@ -481,3 +480,9 @@ impl<T> Drop for Receiver<T> { self.inner.drop_rx() } } + +impl<T: fmt::Debug> fmt::Debug for Receiver<T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Receiver").field("complete", &self.inner.complete).finish() + } +} diff --git a/tests/channel.rs b/tests/channel.rs index 73dac64..5f01a8e 100644 --- a/tests/channel.rs +++ b/tests/channel.rs @@ -1,8 +1,8 @@ use futures::channel::mpsc; use futures::executor::block_on; use futures::future::poll_fn; -use futures::stream::StreamExt; use futures::sink::SinkExt; +use futures::stream::StreamExt; use std::sync::atomic::{AtomicUsize, Ordering}; use std::thread; @@ -11,9 +11,7 @@ fn sequence() { let (tx, rx) = mpsc::channel(1); let amt = 20; - let t = thread::spawn(move || { - block_on(send_sequence(amt, tx)) - }); + let t = thread::spawn(move || block_on(send_sequence(amt, tx))); let list: Vec<_> = block_on(rx.collect()); let mut list = list.into_iter(); for i in (1..=amt).rev() { @@ -34,9 +32,7 @@ async fn send_sequence(n: u32, mut sender: mpsc::Sender<u32>) { fn drop_sender() { let (tx, mut rx) = mpsc::channel::<u32>(1); drop(tx); - let f = poll_fn(|cx| { - rx.poll_next_unpin(cx) - }); + let f = poll_fn(|cx| rx.poll_next_unpin(cx)); assert_eq!(block_on(f), None) } diff --git a/tests/mpsc-close.rs b/tests/mpsc-close.rs index 9eb5296..1a14067 100644 --- a/tests/mpsc-close.rs +++ b/tests/mpsc-close.rs @@ -13,9 +13,7 @@ use std::time::{Duration, Instant}; fn smoke() { let (mut sender, receiver) = mpsc::channel(1); - let t = thread::spawn(move || { - while let Ok(()) = block_on(sender.send(42)) {} - }); + let t = thread::spawn(move || while let Ok(()) = block_on(sender.send(42)) {}); // `receiver` needs to be dropped for `sender` to stop sending and therefore before the join. block_on(receiver.take(3).for_each(|_| futures::future::ready(()))); @@ -149,6 +147,7 @@ fn single_receiver_drop_closes_channel_and_drains() { // Stress test that `try_send()`s occurring concurrently with receiver // close/drops don't appear as successful sends. +#[cfg_attr(miri, ignore)] // Miri is too slow #[test] fn stress_try_send_as_receiver_closes() { const AMT: usize = 10000; @@ -166,7 +165,7 @@ fn stress_try_send_as_receiver_closes() { struct TestRx { rx: mpsc::Receiver<Arc<()>>, // The number of times to query `rx` before dropping it. - poll_count: usize + poll_count: usize, } struct TestTask { command_rx: mpsc::Receiver<TestRx>, @@ -190,14 +189,11 @@ fn stress_try_send_as_receiver_closes() { impl Future for TestTask { type Output = (); - fn poll( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Self::Output> { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { // Poll the test channel, if one is present. if let Some(rx) = &mut self.test_rx { if let Poll::Ready(v) = rx.poll_next_unpin(cx) { - let _ = v.expect("test finished unexpectedly!"); + let _ = v.expect("test finished unexpectedly!"); } self.countdown -= 1; // Busy-poll until the countdown is finished. @@ -209,9 +205,9 @@ fn stress_try_send_as_receiver_closes() { self.test_rx = Some(rx); self.countdown = poll_count; cx.waker().wake_by_ref(); - }, + } Poll::Ready(None) => return Poll::Ready(()), - Poll::Pending => {}, + Poll::Pending => {} } if self.countdown == 0 { // Countdown complete -- drop the Receiver. @@ -255,10 +251,14 @@ fn stress_try_send_as_receiver_closes() { if prev_weak.upgrade().is_none() { break; } - assert!(t0.elapsed() < Duration::from_secs(SPIN_TIMEOUT_S), + assert!( + t0.elapsed() < Duration::from_secs(SPIN_TIMEOUT_S), "item not dropped on iteration {} after \ {} sends ({} successful). spin=({})", - i, attempted_sends, successful_sends, spins + i, + attempted_sends, + successful_sends, + spins ); spins += 1; thread::sleep(Duration::from_millis(SPIN_SLEEP_MS)); @@ -273,6 +273,27 @@ fn stress_try_send_as_receiver_closes() { } } drop(cmd_tx); - bg.join() - .expect("background thread join"); + bg.join().expect("background thread join"); +} + +#[test] +fn unbounded_try_next_after_none() { + let (tx, mut rx) = mpsc::unbounded::<String>(); + // Drop the sender, close the channel. + drop(tx); + // Receive the end of channel. + assert_eq!(Ok(None), rx.try_next().map_err(|_| ())); + // None received, check we can call `try_next` again. + assert_eq!(Ok(None), rx.try_next().map_err(|_| ())); +} + +#[test] +fn bounded_try_next_after_none() { + let (tx, mut rx) = mpsc::channel::<String>(17); + // Drop the sender, close the channel. + drop(tx); + // Receive the end of channel. + assert_eq!(Ok(None), rx.try_next().map_err(|_| ())); + // None received, check we can call `try_next` again. + assert_eq!(Ok(None), rx.try_next().map_err(|_| ())); } diff --git a/tests/mpsc.rs b/tests/mpsc.rs index 61c5a50..da0899d 100644 --- a/tests/mpsc.rs +++ b/tests/mpsc.rs @@ -1,13 +1,13 @@ use futures::channel::{mpsc, oneshot}; use futures::executor::{block_on, block_on_stream}; -use futures::future::{FutureExt, poll_fn}; -use futures::stream::{Stream, StreamExt}; +use futures::future::{poll_fn, FutureExt}; +use futures::pin_mut; use futures::sink::{Sink, SinkExt}; +use futures::stream::{Stream, StreamExt}; use futures::task::{Context, Poll}; -use futures::pin_mut; use futures_test::task::{new_count_waker, noop_context}; -use std::sync::{Arc, Mutex}; use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::{Arc, Mutex}; use std::thread; trait AssertSend: Send {} @@ -77,7 +77,7 @@ fn send_shared_recv() { fn send_recv_threads() { let (mut tx, rx) = mpsc::channel::<i32>(16); - let t = thread::spawn(move|| { + let t = thread::spawn(move || { block_on(tx.send(1)).unwrap(); }); @@ -200,11 +200,14 @@ fn tx_close_gets_none() { #[test] fn stress_shared_unbounded() { + #[cfg(miri)] + const AMT: u32 = 100; + #[cfg(not(miri))] const AMT: u32 = 10000; const NTHREADS: u32 = 8; let (tx, rx) = mpsc::unbounded::<i32>(); - let t = thread::spawn(move|| { + let t = thread::spawn(move || { let result: Vec<_> = block_on(rx.collect()); assert_eq!(result.len(), (AMT * NTHREADS) as usize); for item in result { @@ -215,7 +218,7 @@ fn stress_shared_unbounded() { for _ in 0..NTHREADS { let tx = tx.clone(); - thread::spawn(move|| { + thread::spawn(move || { for _ in 0..AMT { tx.unbounded_send(1).unwrap(); } @@ -229,11 +232,14 @@ fn stress_shared_unbounded() { #[test] fn stress_shared_bounded_hard() { + #[cfg(miri)] + const AMT: u32 = 100; + #[cfg(not(miri))] const AMT: u32 = 10000; const NTHREADS: u32 = 8; let (tx, rx) = mpsc::channel::<i32>(0); - let t = thread::spawn(move|| { + let t = thread::spawn(move || { let result: Vec<_> = block_on(rx.collect()); assert_eq!(result.len(), (AMT * NTHREADS) as usize); for item in result { @@ -259,6 +265,9 @@ fn stress_shared_bounded_hard() { #[allow(clippy::same_item_push)] #[test] fn stress_receiver_multi_task_bounded_hard() { + #[cfg(miri)] + const AMT: usize = 100; + #[cfg(not(miri))] const AMT: usize = 10_000; const NTHREADS: u32 = 2; @@ -297,9 +306,9 @@ fn stress_receiver_multi_task_bounded_hard() { } Poll::Ready(None) => { *rx_opt = None; - break - }, - Poll::Pending => {}, + break; + } + Poll::Pending => {} } } } else { @@ -311,7 +320,6 @@ fn stress_receiver_multi_task_bounded_hard() { th.push(t); } - for i in 0..AMT { block_on(tx.send(i)).unwrap(); } @@ -328,7 +336,12 @@ fn stress_receiver_multi_task_bounded_hard() { /// after sender dropped. #[test] fn stress_drop_sender() { - fn list() -> impl Stream<Item=i32> { + #[cfg(miri)] + const ITER: usize = 100; + #[cfg(not(miri))] + const ITER: usize = 10000; + + fn list() -> impl Stream<Item = i32> { let (tx, rx) = mpsc::channel(1); thread::spawn(move || { block_on(send_one_two_three(tx)); @@ -336,7 +349,7 @@ fn stress_drop_sender() { rx } - for _ in 0..10000 { + for _ in 0..ITER { let v: Vec<_> = block_on(list().collect()); assert_eq!(v, vec![1, 2, 3]); } @@ -381,9 +394,12 @@ fn stress_close_receiver_iter() { } } +#[cfg_attr(miri, ignore)] // Miri is too slow #[test] fn stress_close_receiver() { - for _ in 0..10000 { + const ITER: usize = 10000; + + for _ in 0..ITER { stress_close_receiver_iter(); } } @@ -398,6 +414,9 @@ async fn stress_poll_ready_sender(mut sender: mpsc::Sender<u32>, count: u32) { #[allow(clippy::same_item_push)] #[test] fn stress_poll_ready() { + #[cfg(miri)] + const AMT: u32 = 100; + #[cfg(not(miri))] const AMT: u32 = 1000; const NTHREADS: u32 = 8; @@ -407,9 +426,7 @@ fn stress_poll_ready() { let mut threads = Vec::new(); for _ in 0..NTHREADS { let sender = tx.clone(); - threads.push(thread::spawn(move || { - block_on(stress_poll_ready_sender(sender, AMT)) - })); + threads.push(thread::spawn(move || block_on(stress_poll_ready_sender(sender, AMT)))); } drop(tx); @@ -427,6 +444,7 @@ fn stress_poll_ready() { stress(16); } +#[cfg_attr(miri, ignore)] // Miri is too slow #[test] fn try_send_1() { const N: usize = 3000; @@ -436,7 +454,7 @@ fn try_send_1() { for i in 0..N { loop { if tx.try_send(i).is_ok() { - break + break; } } } @@ -542,8 +560,8 @@ fn is_connected_to() { #[test] fn hash_receiver() { - use std::hash::Hasher; use std::collections::hash_map::DefaultHasher; + use std::hash::Hasher; let mut hasher_a1 = DefaultHasher::new(); let mut hasher_a2 = DefaultHasher::new(); diff --git a/tests/oneshot.rs b/tests/oneshot.rs index a22d039..c9f5508 100644 --- a/tests/oneshot.rs +++ b/tests/oneshot.rs @@ -1,6 +1,6 @@ use futures::channel::oneshot::{self, Sender}; use futures::executor::block_on; -use futures::future::{FutureExt, poll_fn}; +use futures::future::{poll_fn, FutureExt}; use futures::task::{Context, Poll}; use futures_test::task::panic_waker_ref; use std::sync::mpsc; @@ -35,6 +35,11 @@ fn cancel_notifies() { #[test] fn cancel_lots() { + #[cfg(miri)] + const N: usize = 100; + #[cfg(not(miri))] + const N: usize = 20000; + let (tx, rx) = mpsc::channel::<(Sender<_>, mpsc::Sender<_>)>(); let t = thread::spawn(move || { for (mut tx, tx2) in rx { @@ -43,7 +48,7 @@ fn cancel_lots() { } }); - for _ in 0..20000 { + for _ in 0..N { let (otx, orx) = oneshot::channel::<u32>(); let (tx2, rx2) = mpsc::channel(); tx.send((otx, tx2)).unwrap(); @@ -70,7 +75,7 @@ fn close() { rx.close(); block_on(poll_fn(|cx| { match rx.poll_unpin(cx) { - Poll::Ready(Err(_)) => {}, + Poll::Ready(Err(_)) => {} _ => panic!(), }; assert!(tx.poll_canceled(cx).is_ready()); @@ -101,6 +106,11 @@ fn is_canceled() { #[test] fn cancel_sends() { + #[cfg(miri)] + const N: usize = 100; + #[cfg(not(miri))] + const N: usize = 20000; + let (tx, rx) = mpsc::channel::<Sender<_>>(); let t = thread::spawn(move || { for otx in rx { @@ -108,7 +118,7 @@ fn cancel_sends() { } }); - for _ in 0..20000 { + for _ in 0..N { let (otx, mut orx) = oneshot::channel::<u32>(); tx.send(otx).unwrap(); |