author     Android Build Coastguard Worker <android-build-coastguard-worker@google.com>  2022-06-15 21:43:58 +0000
committer  Android Build Coastguard Worker <android-build-coastguard-worker@google.com>  2022-06-15 21:43:58 +0000
commit     9d8eecbcef7019c85a6aeca985df23facf509e2b (patch)
tree       b70bd833b2f3f8450b92506a5a4f7e2a04b09e15
parent     2d7e4c7689730f30310ca78b72d3ad78991d27b9 (diff)
parent     163c4fabb8b1b35c31fcd4a77f34f24a14093cef (diff)
download   futures-channel-android12-mainline-tzdata3-release.tar.gz
Change-Id: Ia292ea706d9827b0c64b5a3554748294cca4a9c0
-rw-r--r--  .cargo_vcs_info.json     7
-rw-r--r--  Android.bp               6
-rw-r--r--  Cargo.toml              38
-rw-r--r--  Cargo.toml.orig         18
-rw-r--r--  METADATA                 8
-rw-r--r--  README.md               23
-rw-r--r--  TEST_MAPPING            49
-rw-r--r--  benches/sync_mpsc.rs    15
-rw-r--r--  build.rs                41
-rw-r--r--  cargo2android.json       5
-rw-r--r--  no_atomic_cas.rs        13
-rw-r--r--  src/lib.rs              54
-rw-r--r--  src/lock.rs              7
-rw-r--r--  src/mpsc/mod.rs        196
-rw-r--r--  src/mpsc/queue.rs       22
-rw-r--r--  src/mpsc/sink_impl.rs   58
-rw-r--r--  src/oneshot.rs          39
-rw-r--r--  tests/channel.rs        10
-rw-r--r--  tests/mpsc-close.rs     51
-rw-r--r--  tests/mpsc.rs           58
-rw-r--r--  tests/oneshot.rs        18
21 files changed, 355 insertions, 381 deletions
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json
index deccf0d..f3ad3ab 100644
--- a/.cargo_vcs_info.json
+++ b/.cargo_vcs_info.json
@@ -1,6 +1,5 @@
{
"git": {
- "sha1": "fc1e3250219170e31cddb8857a276cba7dd08d44"
- },
- "path_in_vcs": "futures-channel"
-} \ No newline at end of file
+ "sha1": "c91f8691672c7401b1923ab00bf138975c99391a"
+ }
+}
diff --git a/Android.bp b/Android.bp
index c7bc955..61e78bc 100644
--- a/Android.bp
+++ b/Android.bp
@@ -45,8 +45,6 @@ rust_library {
name: "libfutures_channel",
host_supported: true,
crate_name: "futures_channel",
- cargo_env_compat: true,
- cargo_pkg_version: "0.3.21",
srcs: ["src/lib.rs"],
edition: "2018",
features: [
@@ -59,9 +57,11 @@ rust_library {
],
apex_available: [
"//apex_available:platform",
- "com.android.bluetooth",
"com.android.resolv",
"com.android.virt",
],
min_sdk_version: "29",
}
+
+// dependent_library ["feature_list"]
+// futures-core-0.3.14 "alloc,std"
diff --git a/Cargo.toml b/Cargo.toml
index d0a13f6..30ee771 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -3,37 +3,32 @@
# When uploading crates to the registry Cargo will automatically
# "normalize" Cargo.toml files for maximal compatibility
# with all versions of Cargo and also rewrite `path` dependencies
-# to registry (e.g., crates.io) dependencies.
+# to registry (e.g., crates.io) dependencies
#
-# If you are reading this file be aware that the original Cargo.toml
-# will likely look very different (and much more reasonable).
-# See Cargo.toml.orig for the original contents.
+# If you believe there's an error in this file please file an
+# issue against the rust-lang/cargo repository. If you're
+# editing this file be aware that the upstream Cargo.toml
+# will likely look very different (and much more reasonable)
[package]
edition = "2018"
-rust-version = "1.45"
name = "futures-channel"
-version = "0.3.21"
-description = """
-Channels for asynchronous communication using futures-rs.
-"""
+version = "0.3.13"
+authors = ["Alex Crichton <alex@alexcrichton.com>"]
+description = "Channels for asynchronous communication using futures-rs.\n"
homepage = "https://rust-lang.github.io/futures-rs"
+documentation = "https://docs.rs/futures-channel/0.3"
license = "MIT OR Apache-2.0"
repository = "https://github.com/rust-lang/futures-rs"
-
[package.metadata.docs.rs]
all-features = true
-rustdoc-args = [
- "--cfg",
- "docsrs",
-]
-
+rustdoc-args = ["--cfg", "docsrs"]
[dependencies.futures-core]
-version = "0.3.21"
+version = "0.3.13"
default-features = false
[dependencies.futures-sink]
-version = "0.3.21"
+version = "0.3.13"
optional = true
default-features = false
@@ -41,11 +36,8 @@ default-features = false
[features]
alloc = ["futures-core/alloc"]
-cfg-target-has-atomic = []
+cfg-target-has-atomic = ["futures-core/cfg-target-has-atomic"]
default = ["std"]
sink = ["futures-sink"]
-std = [
- "alloc",
- "futures-core/std",
-]
-unstable = []
+std = ["alloc", "futures-core/std"]
+unstable = ["futures-core/unstable"]
diff --git a/Cargo.toml.orig b/Cargo.toml.orig
index f356eab..9a33320 100644
--- a/Cargo.toml.orig
+++ b/Cargo.toml.orig
@@ -1,11 +1,12 @@
[package]
name = "futures-channel"
-version = "0.3.21"
edition = "2018"
-rust-version = "1.45"
+version = "0.3.13"
+authors = ["Alex Crichton <alex@alexcrichton.com>"]
license = "MIT OR Apache-2.0"
repository = "https://github.com/rust-lang/futures-rs"
homepage = "https://rust-lang.github.io/futures-rs"
+documentation = "https://docs.rs/futures-channel/0.3"
description = """
Channels for asynchronous communication using futures-rs.
"""
@@ -16,14 +17,15 @@ std = ["alloc", "futures-core/std"]
alloc = ["futures-core/alloc"]
sink = ["futures-sink"]
-# These features are no longer used.
-# TODO: remove in the next major version.
-unstable = []
-cfg-target-has-atomic = []
+# Unstable features
+# These features are outside of the normal semver guarantees and require the
+# `unstable` feature as an explicit opt-in to unstable API.
+unstable = ["futures-core/unstable"]
+cfg-target-has-atomic = ["futures-core/cfg-target-has-atomic"]
[dependencies]
-futures-core = { path = "../futures-core", version = "0.3.21", default-features = false }
-futures-sink = { path = "../futures-sink", version = "0.3.21", default-features = false, optional = true }
+futures-core = { path = "../futures-core", version = "0.3.13", default-features = false }
+futures-sink = { path = "../futures-sink", version = "0.3.13", default-features = false, optional = true }
[dev-dependencies]
futures = { path = "../futures", default-features = true }
diff --git a/METADATA b/METADATA
index 29bf06e..b53d840 100644
--- a/METADATA
+++ b/METADATA
@@ -7,13 +7,13 @@ third_party {
}
url {
type: ARCHIVE
- value: "https://static.crates.io/crates/futures-channel/futures-channel-0.3.21.crate"
+ value: "https://static.crates.io/crates/futures-channel/futures-channel-0.3.13.crate"
}
- version: "0.3.21"
+ version: "0.3.13"
license_type: NOTICE
last_upgrade_date {
- year: 2022
- month: 3
+ year: 2021
+ month: 4
day: 1
}
}
diff --git a/README.md b/README.md
deleted file mode 100644
index 3287be9..0000000
--- a/README.md
+++ /dev/null
@@ -1,23 +0,0 @@
-# futures-channel
-
-Channels for asynchronous communication using futures-rs.
-
-## Usage
-
-Add this to your `Cargo.toml`:
-
-```toml
-[dependencies]
-futures-channel = "0.3"
-```
-
-The current `futures-channel` requires Rust 1.45 or later.
-
-## License
-
-Licensed under either of [Apache License, Version 2.0](LICENSE-APACHE) or
-[MIT license](LICENSE-MIT) at your option.
-
-Unless you explicitly state otherwise, any contribution intentionally submitted
-for inclusion in the work by you, as defined in the Apache-2.0 license, shall
-be dual licensed as above, without any additional terms or conditions.
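The removed README only carried the dependency snippet. As context, a minimal usage sketch of the crate's unbounded MPSC API (via the `futures` facade the dev-dependencies pull in; illustrative only, not part of this patch):

```rust
use futures::channel::mpsc;
use futures::executor::block_on;
use futures::stream::StreamExt;

fn main() {
    // Unbounded sender/receiver pair; `unbounded_send` never blocks.
    let (tx, rx) = mpsc::unbounded::<u32>();
    for i in 0..3 {
        tx.unbounded_send(i).unwrap();
    }
    // Dropping every sender closes the channel and ends the stream.
    drop(tx);
    let received: Vec<u32> = block_on(rx.collect());
    assert_eq!(received, vec![0, 1, 2]);
}
```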
diff --git a/TEST_MAPPING b/TEST_MAPPING
index 5ef61de..6798806 100644
--- a/TEST_MAPPING
+++ b/TEST_MAPPING
@@ -1,45 +1,56 @@
// Generated by update_crate_tests.py for tests that depend on this crate.
{
- "imports": [
+ "presubmit": [
{
- "path": "external/rust/crates/anyhow"
+ "name": "anyhow_device_test_tests_test_boxed"
},
{
- "path": "external/rust/crates/futures-util"
+ "name": "anyhow_device_test_tests_test_convert"
},
{
- "path": "external/rust/crates/tokio"
+ "name": "anyhow_device_test_tests_test_ffi"
},
{
- "path": "external/rust/crates/tokio-test"
- }
- ],
- "presubmit": [
+ "name": "anyhow_device_test_tests_test_repr"
+ },
{
- "name": "ZipFuseTest"
+ "name": "tokio-test_device_test_tests_block_on"
},
{
- "name": "authfs_device_test_src_lib"
+ "name": "anyhow_device_test_tests_test_chain"
},
{
- "name": "doh_unit_test"
+ "name": "anyhow_device_test_tests_test_source"
},
{
- "name": "virtualizationservice_device_test"
- }
- ],
- "presubmit-rust": [
+ "name": "tokio-test_device_test_tests_io"
+ },
+ {
+ "name": "anyhow_device_test_tests_test_autotrait"
+ },
+ {
+ "name": "anyhow_device_test_src_lib"
+ },
+ {
+ "name": "anyhow_device_test_tests_test_context"
+ },
+ {
+ "name": "anyhow_device_test_tests_test_downcast"
+ },
+ {
+ "name": "anyhow_device_test_tests_test_macros"
+ },
{
- "name": "ZipFuseTest"
+ "name": "futures-util_device_test_src_lib"
},
{
- "name": "authfs_device_test_src_lib"
+ "name": "anyhow_device_test_tests_test_fmt"
},
{
- "name": "doh_unit_test"
+ "name": "tokio-test_device_test_tests_macros"
},
{
- "name": "virtualizationservice_device_test"
+ "name": "tokio-test_device_test_src_lib"
}
]
}
diff --git a/benches/sync_mpsc.rs b/benches/sync_mpsc.rs
index 7c3c3d3..e22fe60 100644
--- a/benches/sync_mpsc.rs
+++ b/benches/sync_mpsc.rs
@@ -7,8 +7,8 @@ use {
futures::{
channel::mpsc::{self, Sender, UnboundedSender},
ready,
- sink::Sink,
stream::{Stream, StreamExt},
+ sink::Sink,
task::{Context, Poll},
},
futures_test::task::noop_context,
@@ -25,6 +25,7 @@ fn unbounded_1_tx(b: &mut Bencher) {
// 1000 iterations to avoid measuring overhead of initialization
// Result should be divided by 1000
for i in 0..1000 {
+
// Poll, not ready, park
assert_eq!(Poll::Pending, rx.poll_next_unpin(&mut cx));
@@ -72,6 +73,7 @@ fn unbounded_uncontended(b: &mut Bencher) {
})
}
+
/// A Stream that continuously sends incrementing number of the queue
struct TestSender {
tx: Sender<u32>,
@@ -82,7 +84,9 @@ struct TestSender {
impl Stream for TestSender {
type Item = u32;
- fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
+ -> Poll<Option<Self::Item>>
+ {
let this = &mut *self;
let mut tx = Pin::new(&mut this.tx);
@@ -119,7 +123,12 @@ fn bounded_100_tx(b: &mut Bencher) {
// Each sender can send one item after specified capacity
let (tx, mut rx) = mpsc::channel(0);
- let mut tx: Vec<_> = (0..100).map(|_| TestSender { tx: tx.clone(), last: 0 }).collect();
+ let mut tx: Vec<_> = (0..100).map(|_| {
+ TestSender {
+ tx: tx.clone(),
+ last: 0
+ }
+ }).collect();
for i in 0..10 {
for x in &mut tx {
diff --git a/build.rs b/build.rs
deleted file mode 100644
index 05e0496..0000000
--- a/build.rs
+++ /dev/null
@@ -1,41 +0,0 @@
-// The rustc-cfg listed below are considered public API, but it is *unstable*
-// and outside of the normal semver guarantees:
-//
-// - `futures_no_atomic_cas`
-// Assume the target does *not* support atomic CAS operations.
-// This is usually detected automatically by the build script, but you may
-// need to enable it manually when building for custom targets or using
-// non-cargo build systems that don't run the build script.
-//
-// With the exceptions mentioned above, the rustc-cfg emitted by the build
-// script are *not* public API.
-
-#![warn(rust_2018_idioms, single_use_lifetimes)]
-
-use std::env;
-
-include!("no_atomic_cas.rs");
-
-fn main() {
- let target = match env::var("TARGET") {
- Ok(target) => target,
- Err(e) => {
- println!(
- "cargo:warning={}: unable to get TARGET environment variable: {}",
- env!("CARGO_PKG_NAME"),
- e
- );
- return;
- }
- };
-
- // Note that this is `no_*`, not `has_*`. This allows treating
- // `cfg(target_has_atomic = "ptr")` as true when the build script doesn't
- // run. This is needed for compatibility with non-cargo build systems that
- // don't run the build script.
- if NO_ATOMIC_CAS.contains(&&*target) {
- println!("cargo:rustc-cfg=futures_no_atomic_cas");
- }
-
- println!("cargo:rerun-if-changed=no_atomic_cas.rs");
-}
diff --git a/cargo2android.json b/cargo2android.json
index a7e2a4b..01465d0 100644
--- a/cargo2android.json
+++ b/cargo2android.json
@@ -1,12 +1,11 @@
{
"apex-available": [
"//apex_available:platform",
- "com.android.bluetooth",
"com.android.resolv",
"com.android.virt"
],
+ "min_sdk_version": "29",
"dependencies": true,
"device": true,
- "min-sdk-version": "29",
"run": true
-}
+} \ No newline at end of file
diff --git a/no_atomic_cas.rs b/no_atomic_cas.rs
deleted file mode 100644
index 9b05d4b..0000000
--- a/no_atomic_cas.rs
+++ /dev/null
@@ -1,13 +0,0 @@
-// This file is @generated by no_atomic_cas.sh.
-// It is not intended for manual editing.
-
-const NO_ATOMIC_CAS: &[&str] = &[
- "avr-unknown-gnu-atmega328",
- "bpfeb-unknown-none",
- "bpfel-unknown-none",
- "msp430-none-elf",
- "riscv32i-unknown-none-elf",
- "riscv32imc-unknown-none-elf",
- "thumbv4t-none-eabi",
- "thumbv6m-none-eabi",
-];
diff --git a/src/lib.rs b/src/lib.rs
index 4cd936d..22d90d8 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -11,32 +11,34 @@
//! All items are only available when the `std` or `alloc` feature of this
//! library is activated, and it is activated by default.
+#![cfg_attr(feature = "cfg-target-has-atomic", feature(cfg_target_has_atomic))]
+
#![cfg_attr(not(feature = "std"), no_std)]
-#![warn(
- missing_debug_implementations,
- missing_docs,
- rust_2018_idioms,
- single_use_lifetimes,
- unreachable_pub
-)]
-#![doc(test(
- no_crate_inject,
- attr(
- deny(warnings, rust_2018_idioms, single_use_lifetimes),
- allow(dead_code, unused_assignments, unused_variables)
- )
-))]
-#[cfg(not(futures_no_atomic_cas))]
-#[cfg(feature = "alloc")]
-extern crate alloc;
+#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms, unreachable_pub)]
+// It cannot be included in the published code because this lints have false positives in the minimum required version.
+#![cfg_attr(test, warn(single_use_lifetimes))]
+#![warn(clippy::all)]
+#![doc(test(attr(deny(warnings), allow(dead_code, unused_assignments, unused_variables))))]
+
+#[cfg(all(feature = "cfg-target-has-atomic", not(feature = "unstable")))]
+compile_error!("The `cfg-target-has-atomic` feature requires the `unstable` feature as an explicit opt-in to unstable features");
+
+macro_rules! cfg_target_has_atomic {
+ ($($item:item)*) => {$(
+ #[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))]
+ $item
+ )*};
+}
+
+cfg_target_has_atomic! {
+ #[cfg(feature = "alloc")]
+ extern crate alloc;
-#[cfg(not(futures_no_atomic_cas))]
-#[cfg(feature = "alloc")]
-mod lock;
-#[cfg(not(futures_no_atomic_cas))]
-#[cfg(feature = "std")]
-pub mod mpsc;
-#[cfg(not(futures_no_atomic_cas))]
-#[cfg(feature = "alloc")]
-pub mod oneshot;
+ #[cfg(feature = "alloc")]
+ mod lock;
+ #[cfg(feature = "std")]
+ pub mod mpsc;
+ #[cfg(feature = "alloc")]
+ pub mod oneshot;
+}
diff --git a/src/lock.rs b/src/lock.rs
index b328d0f..5eecdd9 100644
--- a/src/lock.rs
+++ b/src/lock.rs
@@ -6,8 +6,8 @@
use core::cell::UnsafeCell;
use core::ops::{Deref, DerefMut};
-use core::sync::atomic::AtomicBool;
use core::sync::atomic::Ordering::SeqCst;
+use core::sync::atomic::AtomicBool;
/// A "mutex" around a value, similar to `std::sync::Mutex<T>`.
///
@@ -37,7 +37,10 @@ unsafe impl<T: Send> Sync for Lock<T> {}
impl<T> Lock<T> {
/// Creates a new lock around the given value.
pub(crate) fn new(t: T) -> Self {
- Self { locked: AtomicBool::new(false), data: UnsafeCell::new(t) }
+ Self {
+ locked: AtomicBool::new(false),
+ data: UnsafeCell::new(t),
+ }
}
/// Attempts to acquire this lock, returning whether the lock was acquired or
diff --git a/src/mpsc/mod.rs b/src/mpsc/mod.rs
index 44834b7..dd50343 100644
--- a/src/mpsc/mod.rs
+++ b/src/mpsc/mod.rs
@@ -79,13 +79,13 @@
// by the queue structure.
use futures_core::stream::{FusedStream, Stream};
-use futures_core::task::__internal::AtomicWaker;
use futures_core::task::{Context, Poll, Waker};
+use futures_core::task::__internal::AtomicWaker;
use std::fmt;
use std::pin::Pin;
+use std::sync::{Arc, Mutex};
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::SeqCst;
-use std::sync::{Arc, Mutex};
use std::thread;
use crate::mpsc::queue::Queue;
@@ -209,7 +209,9 @@ impl SendError {
impl<T> fmt::Debug for TrySendError<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- f.debug_struct("TrySendError").field("kind", &self.err.kind).finish()
+ f.debug_struct("TrySendError")
+ .field("kind", &self.err.kind)
+ .finish()
}
}
@@ -249,7 +251,8 @@ impl<T> TrySendError<T> {
impl fmt::Debug for TryRecvError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- f.debug_tuple("TryRecvError").finish()
+ f.debug_tuple("TryRecvError")
+ .finish()
}
}
@@ -332,7 +335,10 @@ struct SenderTask {
impl SenderTask {
fn new() -> Self {
- Self { task: None, is_parked: false }
+ Self {
+ task: None,
+ is_parked: false,
+ }
}
fn notify(&mut self) {
@@ -375,7 +381,9 @@ pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
maybe_parked: false,
};
- let rx = Receiver { inner: Some(inner) };
+ let rx = Receiver {
+ inner: Some(inner),
+ };
(Sender(Some(tx)), rx)
}
@@ -391,6 +399,7 @@ pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
/// the channel. Using an `unbounded` channel has the ability of causing the
/// process to run out of memory. In this case, the process will be aborted.
pub fn unbounded<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) {
+
let inner = Arc::new(UnboundedInner {
state: AtomicUsize::new(INIT_STATE),
message_queue: Queue::new(),
@@ -398,9 +407,13 @@ pub fn unbounded<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) {
recv_task: AtomicWaker::new(),
});
- let tx = UnboundedSenderInner { inner: inner.clone() };
+ let tx = UnboundedSenderInner {
+ inner: inner.clone(),
+ };
- let rx = UnboundedReceiver { inner: Some(inner) };
+ let rx = UnboundedReceiver {
+ inner: Some(inner),
+ };
(UnboundedSender(Some(tx)), rx)
}
@@ -417,10 +430,13 @@ impl<T> UnboundedSenderInner<T> {
if state.is_open {
Poll::Ready(Ok(()))
} else {
- Poll::Ready(Err(SendError { kind: SendErrorKind::Disconnected }))
+ Poll::Ready(Err(SendError {
+ kind: SendErrorKind::Disconnected,
+ }))
}
}
+
// Push message to the queue and signal to the receiver
fn queue_push_and_signal(&self, msg: T) {
// Push the message onto the message queue
@@ -446,17 +462,16 @@ impl<T> UnboundedSenderInner<T> {
// This probably is never hit? Odds are the process will run out of
// memory first. It may be worth to return something else in this
// case?
- assert!(
- state.num_messages < MAX_CAPACITY,
- "buffer space \
- exhausted; sending this messages would overflow the state"
- );
+ assert!(state.num_messages < MAX_CAPACITY, "buffer space \
+ exhausted; sending this messages would overflow the state");
state.num_messages += 1;
let next = encode_state(&state);
match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) {
- Ok(_) => return Some(state.num_messages),
+ Ok(_) => {
+ return Some(state.num_messages)
+ }
Err(actual) => curr = actual,
}
}
@@ -501,7 +516,12 @@ impl<T> BoundedSenderInner<T> {
fn try_send(&mut self, msg: T) -> Result<(), TrySendError<T>> {
// If the sender is currently blocked, reject the message
if !self.poll_unparked(None).is_ready() {
- return Err(TrySendError { err: SendError { kind: SendErrorKind::Full }, val: msg });
+ return Err(TrySendError {
+ err: SendError {
+ kind: SendErrorKind::Full,
+ },
+ val: msg,
+ });
}
// The channel has capacity to accept the message, so send it
@@ -510,8 +530,11 @@ impl<T> BoundedSenderInner<T> {
// Do the send without failing.
// Can be called only by bounded sender.
- fn do_send_b(&mut self, msg: T) -> Result<(), TrySendError<T>> {
- // Anyone calling do_send *should* make sure there is room first,
+ #[allow(clippy::debug_assert_with_mut_call)]
+ fn do_send_b(&mut self, msg: T)
+ -> Result<(), TrySendError<T>>
+ {
+ // Anyone callig do_send *should* make sure there is room first,
// but assert here for tests as a sanity check.
debug_assert!(self.poll_unparked(None).is_ready());
@@ -528,12 +551,12 @@ impl<T> BoundedSenderInner<T> {
// the configured buffer size
num_messages > self.inner.buffer
}
- None => {
- return Err(TrySendError {
- err: SendError { kind: SendErrorKind::Disconnected },
- val: msg,
- })
- }
+ None => return Err(TrySendError {
+ err: SendError {
+ kind: SendErrorKind::Disconnected,
+ },
+ val: msg,
+ }),
};
// If the channel has reached capacity, then the sender task needs to
@@ -577,17 +600,16 @@ impl<T> BoundedSenderInner<T> {
// This probably is never hit? Odds are the process will run out of
// memory first. It may be worth to return something else in this
// case?
- assert!(
- state.num_messages < MAX_CAPACITY,
- "buffer space \
- exhausted; sending this messages would overflow the state"
- );
+ assert!(state.num_messages < MAX_CAPACITY, "buffer space \
+ exhausted; sending this messages would overflow the state");
state.num_messages += 1;
let next = encode_state(&state);
match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) {
- Ok(_) => return Some(state.num_messages),
+ Ok(_) => {
+ return Some(state.num_messages)
+ }
Err(actual) => curr = actual,
}
}
@@ -622,10 +644,15 @@ impl<T> BoundedSenderInner<T> {
/// capacity, in which case the current task is queued to be notified once
/// capacity is available;
/// - `Poll::Ready(Err(SendError))` if the receiver has been dropped.
- fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SendError>> {
+ fn poll_ready(
+ &mut self,
+ cx: &mut Context<'_>,
+ ) -> Poll<Result<(), SendError>> {
let state = decode_state(self.inner.state.load(SeqCst));
if !state.is_open {
- return Poll::Ready(Err(SendError { kind: SendErrorKind::Disconnected }));
+ return Poll::Ready(Err(SendError {
+ kind: SendErrorKind::Disconnected,
+ }));
}
self.poll_unparked(Some(cx)).map(Ok)
@@ -672,7 +699,7 @@ impl<T> BoundedSenderInner<T> {
if !task.is_parked {
self.maybe_parked = false;
- return Poll::Ready(());
+ return Poll::Ready(())
}
// At this point, an unpark request is pending, so there will be an
@@ -697,7 +724,12 @@ impl<T> Sender<T> {
if let Some(inner) = &mut self.0 {
inner.try_send(msg)
} else {
- Err(TrySendError { err: SendError { kind: SendErrorKind::Disconnected }, val: msg })
+ Err(TrySendError {
+ err: SendError {
+ kind: SendErrorKind::Disconnected,
+ },
+ val: msg,
+ })
}
}
@@ -707,7 +739,8 @@ impl<T> Sender<T> {
/// [`poll_ready`](Sender::poll_ready) has reported that the channel is
/// ready to receive a message.
pub fn start_send(&mut self, msg: T) -> Result<(), SendError> {
- self.try_send(msg).map_err(|e| e.err)
+ self.try_send(msg)
+ .map_err(|e| e.err)
}
/// Polls the channel to determine if there is guaranteed capacity to send
@@ -722,8 +755,13 @@ impl<T> Sender<T> {
/// capacity, in which case the current task is queued to be notified once
/// capacity is available;
/// - `Poll::Ready(Err(SendError))` if the receiver has been dropped.
- pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SendError>> {
- let inner = self.0.as_mut().ok_or(SendError { kind: SendErrorKind::Disconnected })?;
+ pub fn poll_ready(
+ &mut self,
+ cx: &mut Context<'_>,
+ ) -> Poll<Result<(), SendError>> {
+ let inner = self.0.as_mut().ok_or(SendError {
+ kind: SendErrorKind::Disconnected,
+ })?;
inner.poll_ready(cx)
}
@@ -761,10 +799,7 @@ impl<T> Sender<T> {
}
/// Hashes the receiver into the provided hasher
- pub fn hash_receiver<H>(&self, hasher: &mut H)
- where
- H: std::hash::Hasher,
- {
+ pub fn hash_receiver<H>(&self, hasher: &mut H) where H: std::hash::Hasher {
use std::hash::Hash;
let ptr = self.0.as_ref().map(|inner| inner.ptr());
@@ -774,8 +809,13 @@ impl<T> Sender<T> {
impl<T> UnboundedSender<T> {
/// Check if the channel is ready to receive a message.
- pub fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), SendError>> {
- let inner = self.0.as_ref().ok_or(SendError { kind: SendErrorKind::Disconnected })?;
+ pub fn poll_ready(
+ &self,
+ _: &mut Context<'_>,
+ ) -> Poll<Result<(), SendError>> {
+ let inner = self.0.as_ref().ok_or(SendError {
+ kind: SendErrorKind::Disconnected,
+ })?;
inner.poll_ready_nb()
}
@@ -805,7 +845,12 @@ impl<T> UnboundedSender<T> {
}
}
- Err(TrySendError { err: SendError { kind: SendErrorKind::Disconnected }, val: msg })
+ Err(TrySendError {
+ err: SendError {
+ kind: SendErrorKind::Disconnected,
+ },
+ val: msg,
+ })
}
/// Send a message on the channel.
@@ -813,7 +858,8 @@ impl<T> UnboundedSender<T> {
/// This method should only be called after `poll_ready` has been used to
/// verify that the channel is ready to receive a message.
pub fn start_send(&mut self, msg: T) -> Result<(), SendError> {
- self.do_send_nb(msg).map_err(|e| e.err)
+ self.do_send_nb(msg)
+ .map_err(|e| e.err)
}
/// Sends a message along this channel.
@@ -842,10 +888,7 @@ impl<T> UnboundedSender<T> {
}
/// Hashes the receiver into the provided hasher
- pub fn hash_receiver<H>(&self, hasher: &mut H)
- where
- H: std::hash::Hasher,
- {
+ pub fn hash_receiver<H>(&self, hasher: &mut H) where H: std::hash::Hasher {
use std::hash::Hash;
let ptr = self.0.as_ref().map(|inner| inner.ptr());
@@ -885,7 +928,9 @@ impl<T> Clone for UnboundedSenderInner<T> {
Ok(_) => {
// The ABA problem doesn't matter here. We only care that the
// number of senders never exceeds the maximum.
- return Self { inner: self.inner.clone() };
+ return Self {
+ inner: self.inner.clone(),
+ };
}
Err(actual) => curr = actual,
}
@@ -976,22 +1021,19 @@ impl<T> Receiver<T> {
/// only when you've otherwise arranged to be notified when the channel is
/// no longer empty.
///
- /// This function returns:
- /// * `Ok(Some(t))` when message is fetched
- /// * `Ok(None)` when channel is closed and no messages left in the queue
- /// * `Err(e)` when there are no messages available, but channel is not yet closed
+ /// This function will panic if called after `try_next` or `poll_next` has
+ /// returned `None`.
pub fn try_next(&mut self) -> Result<Option<T>, TryRecvError> {
match self.next_message() {
- Poll::Ready(msg) => Ok(msg),
+ Poll::Ready(msg) => {
+ Ok(msg)
+ },
Poll::Pending => Err(TryRecvError { _priv: () }),
}
}
fn next_message(&mut self) -> Poll<Option<T>> {
- let inner = match self.inner.as_mut() {
- None => return Poll::Ready(None),
- Some(inner) => inner,
- };
+ let inner = self.inner.as_mut().expect("Receiver::next_message called after `None`");
// Pop off a message
match unsafe { inner.message_queue.pop_spin() } {
Some(msg) => {
@@ -1056,15 +1098,18 @@ impl<T> FusedStream for Receiver<T> {
impl<T> Stream for Receiver<T> {
type Item = T;
- fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
- // Try to read a message off of the message queue.
+ fn poll_next(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Option<T>> {
+ // Try to read a message off of the message queue.
match self.next_message() {
Poll::Ready(msg) => {
if msg.is_none() {
self.inner = None;
}
Poll::Ready(msg)
- }
+ },
Poll::Pending => {
// There are no messages to read, in this case, park.
self.inner.as_ref().unwrap().recv_task.register(cx.waker());
@@ -1124,22 +1169,19 @@ impl<T> UnboundedReceiver<T> {
/// only when you've otherwise arranged to be notified when the channel is
/// no longer empty.
///
- /// This function returns:
- /// * `Ok(Some(t))` when message is fetched
- /// * `Ok(None)` when channel is closed and no messages left in the queue
- /// * `Err(e)` when there are no messages available, but channel is not yet closed
+ /// This function will panic if called after `try_next` or `poll_next` has
+ /// returned `None`.
pub fn try_next(&mut self) -> Result<Option<T>, TryRecvError> {
match self.next_message() {
- Poll::Ready(msg) => Ok(msg),
+ Poll::Ready(msg) => {
+ Ok(msg)
+ },
Poll::Pending => Err(TryRecvError { _priv: () }),
}
}
fn next_message(&mut self) -> Poll<Option<T>> {
- let inner = match self.inner.as_mut() {
- None => return Poll::Ready(None),
- Some(inner) => inner,
- };
+ let inner = self.inner.as_mut().expect("Receiver::next_message called after `None`");
// Pop off a message
match unsafe { inner.message_queue.pop_spin() } {
Some(msg) => {
@@ -1188,7 +1230,10 @@ impl<T> FusedStream for UnboundedReceiver<T> {
impl<T> Stream for UnboundedReceiver<T> {
type Item = T;
- fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
+ fn poll_next(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Option<T>> {
// Try to read a message off of the message queue.
match self.next_message() {
Poll::Ready(msg) => {
@@ -1196,7 +1241,7 @@ impl<T> Stream for UnboundedReceiver<T> {
self.inner = None;
}
Poll::Ready(msg)
- }
+ },
Poll::Pending => {
// There are no messages to read, in this case, park.
self.inner.as_ref().unwrap().recv_task.register(cx.waker());
@@ -1294,7 +1339,10 @@ impl State {
*/
fn decode_state(num: usize) -> State {
- State { is_open: num & OPEN_MASK == OPEN_MASK, num_messages: num & MAX_CAPACITY }
+ State {
+ is_open: num & OPEN_MASK == OPEN_MASK,
+ num_messages: num & MAX_CAPACITY,
+ }
}
fn encode_state(state: &State) -> usize {
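The hunks above also revert the `try_next` documentation and the guaranteed-slot bookkeeping. A sketch of the bounded-channel behaviour both versions document (each sender owns one slot beyond `buffer`); the concrete assertions are illustrative assumptions, not taken from the patch:

```rust
use futures::channel::mpsc;

fn main() {
    // `channel(0)`: no shared buffer, but each sender still owns one guaranteed slot.
    let (mut tx, mut rx) = mpsc::channel::<u32>(0);

    // The guaranteed slot accepts one message...
    assert!(tx.try_send(1).is_ok());
    // ...after which the sender is parked and `try_send` reports a full channel.
    assert!(tx.try_send(2).is_err());

    // Receiving frees the slot again.
    assert_eq!(rx.try_next().unwrap(), Some(1));
    assert!(tx.try_send(2).is_ok());
}
```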
diff --git a/src/mpsc/queue.rs b/src/mpsc/queue.rs
index 57dc7f5..b00e1b1 100644
--- a/src/mpsc/queue.rs
+++ b/src/mpsc/queue.rs
@@ -43,10 +43,10 @@
pub(super) use self::PopResult::*;
+use std::thread;
use std::cell::UnsafeCell;
use std::ptr;
use std::sync::atomic::{AtomicPtr, Ordering};
-use std::thread;
/// A result of the `pop` function.
pub(super) enum PopResult<T> {
@@ -76,12 +76,15 @@ pub(super) struct Queue<T> {
tail: UnsafeCell<*mut Node<T>>,
}
-unsafe impl<T: Send> Send for Queue<T> {}
-unsafe impl<T: Send> Sync for Queue<T> {}
+unsafe impl<T: Send> Send for Queue<T> { }
+unsafe impl<T: Send> Sync for Queue<T> { }
impl<T> Node<T> {
unsafe fn new(v: Option<T>) -> *mut Self {
- Box::into_raw(Box::new(Self { next: AtomicPtr::new(ptr::null_mut()), value: v }))
+ Box::into_raw(Box::new(Self {
+ next: AtomicPtr::new(ptr::null_mut()),
+ value: v,
+ }))
}
}
@@ -90,7 +93,10 @@ impl<T> Queue<T> {
/// one consumer.
pub(super) fn new() -> Self {
let stub = unsafe { Node::new(None) };
- Self { head: AtomicPtr::new(stub), tail: UnsafeCell::new(stub) }
+ Self {
+ head: AtomicPtr::new(stub),
+ tail: UnsafeCell::new(stub),
+ }
}
/// Pushes a new value onto this queue.
@@ -127,11 +133,7 @@ impl<T> Queue<T> {
return Data(ret);
}
- if self.head.load(Ordering::Acquire) == tail {
- Empty
- } else {
- Inconsistent
- }
+ if self.head.load(Ordering::Acquire) == tail {Empty} else {Inconsistent}
}
/// Pop an element similarly to `pop` function, but spin-wait on inconsistent
diff --git a/src/mpsc/sink_impl.rs b/src/mpsc/sink_impl.rs
index 1be2016..4ce66b4 100644
--- a/src/mpsc/sink_impl.rs
+++ b/src/mpsc/sink_impl.rs
@@ -6,15 +6,24 @@ use std::pin::Pin;
impl<T> Sink<T> for Sender<T> {
type Error = SendError;
- fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ fn poll_ready(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Result<(), Self::Error>> {
(*self).poll_ready(cx)
}
- fn start_send(mut self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error> {
+ fn start_send(
+ mut self: Pin<&mut Self>,
+ msg: T,
+ ) -> Result<(), Self::Error> {
(*self).start_send(msg)
}
- fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ fn poll_flush(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Result<(), Self::Error>> {
match (*self).poll_ready(cx) {
Poll::Ready(Err(ref e)) if e.is_disconnected() => {
// If the receiver disconnected, we consider the sink to be flushed.
@@ -24,7 +33,10 @@ impl<T> Sink<T> for Sender<T> {
}
}
- fn poll_close(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ fn poll_close(
+ mut self: Pin<&mut Self>,
+ _: &mut Context<'_>,
+ ) -> Poll<Result<(), Self::Error>> {
self.disconnect();
Poll::Ready(Ok(()))
}
@@ -33,19 +45,31 @@ impl<T> Sink<T> for Sender<T> {
impl<T> Sink<T> for UnboundedSender<T> {
type Error = SendError;
- fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ fn poll_ready(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Result<(), Self::Error>> {
Self::poll_ready(&*self, cx)
}
- fn start_send(mut self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error> {
+ fn start_send(
+ mut self: Pin<&mut Self>,
+ msg: T,
+ ) -> Result<(), Self::Error> {
Self::start_send(&mut *self, msg)
}
- fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ fn poll_flush(
+ self: Pin<&mut Self>,
+ _: &mut Context<'_>,
+ ) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
- fn poll_close(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ fn poll_close(
+ mut self: Pin<&mut Self>,
+ _: &mut Context<'_>,
+ ) -> Poll<Result<(), Self::Error>> {
self.disconnect();
Poll::Ready(Ok(()))
}
@@ -54,19 +78,29 @@ impl<T> Sink<T> for UnboundedSender<T> {
impl<T> Sink<T> for &UnboundedSender<T> {
type Error = SendError;
- fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ fn poll_ready(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Result<(), Self::Error>> {
UnboundedSender::poll_ready(*self, cx)
}
fn start_send(self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error> {
- self.unbounded_send(msg).map_err(TrySendError::into_send_error)
+ self.unbounded_send(msg)
+ .map_err(TrySendError::into_send_error)
}
- fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ fn poll_flush(
+ self: Pin<&mut Self>,
+ _: &mut Context<'_>,
+ ) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
- fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ fn poll_close(
+ self: Pin<&mut Self>,
+ _: &mut Context<'_>,
+ ) -> Poll<Result<(), Self::Error>> {
self.close_channel();
Poll::Ready(Ok(()))
}
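These `Sink` impls are what `SinkExt::send` drives. A short sketch of that path, assuming the `sink` feature is enabled (as it is through the `futures` facade); values are hypothetical:

```rust
use futures::channel::mpsc;
use futures::executor::block_on;
use futures::sink::SinkExt;
use futures::stream::StreamExt;

fn main() {
    let (mut tx, rx) = mpsc::channel::<u32>(4);

    block_on(async {
        // `send` goes through the poll_ready / start_send / poll_flush impls above.
        for i in 0..3 {
            tx.send(i).await.unwrap();
        }
        drop(tx); // close the channel so `collect` can finish
        let out: Vec<u32> = rx.collect().await;
        assert_eq!(out, vec![0, 1, 2]);
    });
}
```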
diff --git a/src/oneshot.rs b/src/oneshot.rs
index 5af651b..dbbce81 100644
--- a/src/oneshot.rs
+++ b/src/oneshot.rs
@@ -7,7 +7,7 @@ use core::fmt;
use core::pin::Pin;
use core::sync::atomic::AtomicBool;
use core::sync::atomic::Ordering::SeqCst;
-use futures_core::future::{FusedFuture, Future};
+use futures_core::future::{Future, FusedFuture};
use futures_core::task::{Context, Poll, Waker};
use crate::lock::Lock;
@@ -16,6 +16,7 @@ use crate::lock::Lock;
///
/// This is created by the [`channel`](channel) function.
#[must_use = "futures do nothing unless you `.await` or poll them"]
+#[derive(Debug)]
pub struct Receiver<T> {
inner: Arc<Inner<T>>,
}
@@ -23,6 +24,7 @@ pub struct Receiver<T> {
/// A means of transmitting a single value to another task.
///
/// This is created by the [`channel`](channel) function.
+#[derive(Debug)]
pub struct Sender<T> {
inner: Arc<Inner<T>>,
}
@@ -33,6 +35,7 @@ impl<T> Unpin for Sender<T> {}
/// Internal state of the `Receiver`/`Sender` pair above. This is all used as
/// the internal synchronization between the two for send/recv operations.
+#[derive(Debug)]
struct Inner<T> {
/// Indicates whether this oneshot is complete yet. This is filled in both
/// by `Sender::drop` and by `Receiver::drop`, and both sides interpret it
@@ -103,8 +106,12 @@ struct Inner<T> {
/// ```
pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
let inner = Arc::new(Inner::new());
- let receiver = Receiver { inner: inner.clone() };
- let sender = Sender { inner };
+ let receiver = Receiver {
+ inner: inner.clone(),
+ };
+ let sender = Sender {
+ inner,
+ };
(sender, receiver)
}
@@ -120,7 +127,7 @@ impl<T> Inner<T> {
fn send(&self, t: T) -> Result<(), T> {
if self.complete.load(SeqCst) {
- return Err(t);
+ return Err(t)
}
// Note that this lock acquisition may fail if the receiver
@@ -157,7 +164,7 @@ impl<T> Inner<T> {
// destructor, but our destructor hasn't run yet so if it's set then the
// oneshot is gone.
if self.complete.load(SeqCst) {
- return Poll::Ready(());
+ return Poll::Ready(())
}
// If our other half is not gone then we need to park our current task
@@ -266,10 +273,7 @@ impl<T> Inner<T> {
} else {
let task = cx.waker().clone();
match self.rx_task.try_lock() {
- Some(mut slot) => {
- *slot = Some(task);
- false
- }
+ Some(mut slot) => { *slot = Some(task); false },
None => true,
}
};
@@ -390,12 +394,6 @@ impl<T> Drop for Sender<T> {
}
}
-impl<T: fmt::Debug> fmt::Debug for Sender<T> {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- f.debug_struct("Sender").field("complete", &self.inner.complete).finish()
- }
-}
-
/// A future that resolves when the receiving end of a channel has hung up.
///
/// This is an `.await`-friendly interface around [`poll_canceled`](Sender::poll_canceled).
@@ -455,7 +453,10 @@ impl<T> Receiver<T> {
impl<T> Future for Receiver<T> {
type Output = Result<T, Canceled>;
- fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<T, Canceled>> {
+ fn poll(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Result<T, Canceled>> {
self.inner.recv(cx)
}
}
@@ -480,9 +481,3 @@ impl<T> Drop for Receiver<T> {
self.inner.drop_rx()
}
}
-
-impl<T: fmt::Debug> fmt::Debug for Receiver<T> {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- f.debug_struct("Receiver").field("complete", &self.inner.complete).finish()
- }
-}
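For completeness, a minimal oneshot round-trip matching the API touched above (illustrative; the spawned thread and the value are assumptions, not part of the patch):

```rust
use futures::channel::oneshot;
use futures::executor::block_on;
use std::thread;

fn main() {
    let (tx, rx) = oneshot::channel::<u32>();

    thread::spawn(move || {
        // `send` consumes the Sender and only fails if the Receiver is gone.
        tx.send(42).unwrap();
    });

    // The Receiver is a Future with Output = Result<T, Canceled>.
    assert_eq!(block_on(rx).unwrap(), 42);
}
```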
diff --git a/tests/channel.rs b/tests/channel.rs
index 5f01a8e..73dac64 100644
--- a/tests/channel.rs
+++ b/tests/channel.rs
@@ -1,8 +1,8 @@
use futures::channel::mpsc;
use futures::executor::block_on;
use futures::future::poll_fn;
-use futures::sink::SinkExt;
use futures::stream::StreamExt;
+use futures::sink::SinkExt;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;
@@ -11,7 +11,9 @@ fn sequence() {
let (tx, rx) = mpsc::channel(1);
let amt = 20;
- let t = thread::spawn(move || block_on(send_sequence(amt, tx)));
+ let t = thread::spawn(move || {
+ block_on(send_sequence(amt, tx))
+ });
let list: Vec<_> = block_on(rx.collect());
let mut list = list.into_iter();
for i in (1..=amt).rev() {
@@ -32,7 +34,9 @@ async fn send_sequence(n: u32, mut sender: mpsc::Sender<u32>) {
fn drop_sender() {
let (tx, mut rx) = mpsc::channel::<u32>(1);
drop(tx);
- let f = poll_fn(|cx| rx.poll_next_unpin(cx));
+ let f = poll_fn(|cx| {
+ rx.poll_next_unpin(cx)
+ });
assert_eq!(block_on(f), None)
}
diff --git a/tests/mpsc-close.rs b/tests/mpsc-close.rs
index 1a14067..9eb5296 100644
--- a/tests/mpsc-close.rs
+++ b/tests/mpsc-close.rs
@@ -13,7 +13,9 @@ use std::time::{Duration, Instant};
fn smoke() {
let (mut sender, receiver) = mpsc::channel(1);
- let t = thread::spawn(move || while let Ok(()) = block_on(sender.send(42)) {});
+ let t = thread::spawn(move || {
+ while let Ok(()) = block_on(sender.send(42)) {}
+ });
// `receiver` needs to be dropped for `sender` to stop sending and therefore before the join.
block_on(receiver.take(3).for_each(|_| futures::future::ready(())));
@@ -147,7 +149,6 @@ fn single_receiver_drop_closes_channel_and_drains() {
// Stress test that `try_send()`s occurring concurrently with receiver
// close/drops don't appear as successful sends.
-#[cfg_attr(miri, ignore)] // Miri is too slow
#[test]
fn stress_try_send_as_receiver_closes() {
const AMT: usize = 10000;
@@ -165,7 +166,7 @@ fn stress_try_send_as_receiver_closes() {
struct TestRx {
rx: mpsc::Receiver<Arc<()>>,
// The number of times to query `rx` before dropping it.
- poll_count: usize,
+ poll_count: usize
}
struct TestTask {
command_rx: mpsc::Receiver<TestRx>,
@@ -189,11 +190,14 @@ fn stress_try_send_as_receiver_closes() {
impl Future for TestTask {
type Output = ();
- fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ fn poll(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Self::Output> {
// Poll the test channel, if one is present.
if let Some(rx) = &mut self.test_rx {
if let Poll::Ready(v) = rx.poll_next_unpin(cx) {
- let _ = v.expect("test finished unexpectedly!");
+ let _ = v.expect("test finished unexpectedly!");
}
self.countdown -= 1;
// Busy-poll until the countdown is finished.
@@ -205,9 +209,9 @@ fn stress_try_send_as_receiver_closes() {
self.test_rx = Some(rx);
self.countdown = poll_count;
cx.waker().wake_by_ref();
- }
+ },
Poll::Ready(None) => return Poll::Ready(()),
- Poll::Pending => {}
+ Poll::Pending => {},
}
if self.countdown == 0 {
// Countdown complete -- drop the Receiver.
@@ -251,14 +255,10 @@ fn stress_try_send_as_receiver_closes() {
if prev_weak.upgrade().is_none() {
break;
}
- assert!(
- t0.elapsed() < Duration::from_secs(SPIN_TIMEOUT_S),
+ assert!(t0.elapsed() < Duration::from_secs(SPIN_TIMEOUT_S),
"item not dropped on iteration {} after \
{} sends ({} successful). spin=({})",
- i,
- attempted_sends,
- successful_sends,
- spins
+ i, attempted_sends, successful_sends, spins
);
spins += 1;
thread::sleep(Duration::from_millis(SPIN_SLEEP_MS));
@@ -273,27 +273,6 @@ fn stress_try_send_as_receiver_closes() {
}
}
drop(cmd_tx);
- bg.join().expect("background thread join");
-}
-
-#[test]
-fn unbounded_try_next_after_none() {
- let (tx, mut rx) = mpsc::unbounded::<String>();
- // Drop the sender, close the channel.
- drop(tx);
- // Receive the end of channel.
- assert_eq!(Ok(None), rx.try_next().map_err(|_| ()));
- // None received, check we can call `try_next` again.
- assert_eq!(Ok(None), rx.try_next().map_err(|_| ()));
-}
-
-#[test]
-fn bounded_try_next_after_none() {
- let (tx, mut rx) = mpsc::channel::<String>(17);
- // Drop the sender, close the channel.
- drop(tx);
- // Receive the end of channel.
- assert_eq!(Ok(None), rx.try_next().map_err(|_| ()));
- // None received, check we can call `try_next` again.
- assert_eq!(Ok(None), rx.try_next().map_err(|_| ()));
+ bg.join()
+ .expect("background thread join");
}
diff --git a/tests/mpsc.rs b/tests/mpsc.rs
index da0899d..61c5a50 100644
--- a/tests/mpsc.rs
+++ b/tests/mpsc.rs
@@ -1,13 +1,13 @@
use futures::channel::{mpsc, oneshot};
use futures::executor::{block_on, block_on_stream};
-use futures::future::{poll_fn, FutureExt};
-use futures::pin_mut;
-use futures::sink::{Sink, SinkExt};
+use futures::future::{FutureExt, poll_fn};
use futures::stream::{Stream, StreamExt};
+use futures::sink::{Sink, SinkExt};
use futures::task::{Context, Poll};
+use futures::pin_mut;
use futures_test::task::{new_count_waker, noop_context};
-use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
+use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;
trait AssertSend: Send {}
@@ -77,7 +77,7 @@ fn send_shared_recv() {
fn send_recv_threads() {
let (mut tx, rx) = mpsc::channel::<i32>(16);
- let t = thread::spawn(move || {
+ let t = thread::spawn(move|| {
block_on(tx.send(1)).unwrap();
});
@@ -200,14 +200,11 @@ fn tx_close_gets_none() {
#[test]
fn stress_shared_unbounded() {
- #[cfg(miri)]
- const AMT: u32 = 100;
- #[cfg(not(miri))]
const AMT: u32 = 10000;
const NTHREADS: u32 = 8;
let (tx, rx) = mpsc::unbounded::<i32>();
- let t = thread::spawn(move || {
+ let t = thread::spawn(move|| {
let result: Vec<_> = block_on(rx.collect());
assert_eq!(result.len(), (AMT * NTHREADS) as usize);
for item in result {
@@ -218,7 +215,7 @@ fn stress_shared_unbounded() {
for _ in 0..NTHREADS {
let tx = tx.clone();
- thread::spawn(move || {
+ thread::spawn(move|| {
for _ in 0..AMT {
tx.unbounded_send(1).unwrap();
}
@@ -232,14 +229,11 @@ fn stress_shared_unbounded() {
#[test]
fn stress_shared_bounded_hard() {
- #[cfg(miri)]
- const AMT: u32 = 100;
- #[cfg(not(miri))]
const AMT: u32 = 10000;
const NTHREADS: u32 = 8;
let (tx, rx) = mpsc::channel::<i32>(0);
- let t = thread::spawn(move || {
+ let t = thread::spawn(move|| {
let result: Vec<_> = block_on(rx.collect());
assert_eq!(result.len(), (AMT * NTHREADS) as usize);
for item in result {
@@ -265,9 +259,6 @@ fn stress_shared_bounded_hard() {
#[allow(clippy::same_item_push)]
#[test]
fn stress_receiver_multi_task_bounded_hard() {
- #[cfg(miri)]
- const AMT: usize = 100;
- #[cfg(not(miri))]
const AMT: usize = 10_000;
const NTHREADS: u32 = 2;
@@ -306,9 +297,9 @@ fn stress_receiver_multi_task_bounded_hard() {
}
Poll::Ready(None) => {
*rx_opt = None;
- break;
- }
- Poll::Pending => {}
+ break
+ },
+ Poll::Pending => {},
}
}
} else {
@@ -320,6 +311,7 @@ fn stress_receiver_multi_task_bounded_hard() {
th.push(t);
}
+
for i in 0..AMT {
block_on(tx.send(i)).unwrap();
}
@@ -336,12 +328,7 @@ fn stress_receiver_multi_task_bounded_hard() {
/// after sender dropped.
#[test]
fn stress_drop_sender() {
- #[cfg(miri)]
- const ITER: usize = 100;
- #[cfg(not(miri))]
- const ITER: usize = 10000;
-
- fn list() -> impl Stream<Item = i32> {
+ fn list() -> impl Stream<Item=i32> {
let (tx, rx) = mpsc::channel(1);
thread::spawn(move || {
block_on(send_one_two_three(tx));
@@ -349,7 +336,7 @@ fn stress_drop_sender() {
rx
}
- for _ in 0..ITER {
+ for _ in 0..10000 {
let v: Vec<_> = block_on(list().collect());
assert_eq!(v, vec![1, 2, 3]);
}
@@ -394,12 +381,9 @@ fn stress_close_receiver_iter() {
}
}
-#[cfg_attr(miri, ignore)] // Miri is too slow
#[test]
fn stress_close_receiver() {
- const ITER: usize = 10000;
-
- for _ in 0..ITER {
+ for _ in 0..10000 {
stress_close_receiver_iter();
}
}
@@ -414,9 +398,6 @@ async fn stress_poll_ready_sender(mut sender: mpsc::Sender<u32>, count: u32) {
#[allow(clippy::same_item_push)]
#[test]
fn stress_poll_ready() {
- #[cfg(miri)]
- const AMT: u32 = 100;
- #[cfg(not(miri))]
const AMT: u32 = 1000;
const NTHREADS: u32 = 8;
@@ -426,7 +407,9 @@ fn stress_poll_ready() {
let mut threads = Vec::new();
for _ in 0..NTHREADS {
let sender = tx.clone();
- threads.push(thread::spawn(move || block_on(stress_poll_ready_sender(sender, AMT))));
+ threads.push(thread::spawn(move || {
+ block_on(stress_poll_ready_sender(sender, AMT))
+ }));
}
drop(tx);
@@ -444,7 +427,6 @@ fn stress_poll_ready() {
stress(16);
}
-#[cfg_attr(miri, ignore)] // Miri is too slow
#[test]
fn try_send_1() {
const N: usize = 3000;
@@ -454,7 +436,7 @@ fn try_send_1() {
for i in 0..N {
loop {
if tx.try_send(i).is_ok() {
- break;
+ break
}
}
}
@@ -560,8 +542,8 @@ fn is_connected_to() {
#[test]
fn hash_receiver() {
- use std::collections::hash_map::DefaultHasher;
use std::hash::Hasher;
+ use std::collections::hash_map::DefaultHasher;
let mut hasher_a1 = DefaultHasher::new();
let mut hasher_a2 = DefaultHasher::new();
diff --git a/tests/oneshot.rs b/tests/oneshot.rs
index c9f5508..a22d039 100644
--- a/tests/oneshot.rs
+++ b/tests/oneshot.rs
@@ -1,6 +1,6 @@
use futures::channel::oneshot::{self, Sender};
use futures::executor::block_on;
-use futures::future::{poll_fn, FutureExt};
+use futures::future::{FutureExt, poll_fn};
use futures::task::{Context, Poll};
use futures_test::task::panic_waker_ref;
use std::sync::mpsc;
@@ -35,11 +35,6 @@ fn cancel_notifies() {
#[test]
fn cancel_lots() {
- #[cfg(miri)]
- const N: usize = 100;
- #[cfg(not(miri))]
- const N: usize = 20000;
-
let (tx, rx) = mpsc::channel::<(Sender<_>, mpsc::Sender<_>)>();
let t = thread::spawn(move || {
for (mut tx, tx2) in rx {
@@ -48,7 +43,7 @@ fn cancel_lots() {
}
});
- for _ in 0..N {
+ for _ in 0..20000 {
let (otx, orx) = oneshot::channel::<u32>();
let (tx2, rx2) = mpsc::channel();
tx.send((otx, tx2)).unwrap();
@@ -75,7 +70,7 @@ fn close() {
rx.close();
block_on(poll_fn(|cx| {
match rx.poll_unpin(cx) {
- Poll::Ready(Err(_)) => {}
+ Poll::Ready(Err(_)) => {},
_ => panic!(),
};
assert!(tx.poll_canceled(cx).is_ready());
@@ -106,11 +101,6 @@ fn is_canceled() {
#[test]
fn cancel_sends() {
- #[cfg(miri)]
- const N: usize = 100;
- #[cfg(not(miri))]
- const N: usize = 20000;
-
let (tx, rx) = mpsc::channel::<Sender<_>>();
let t = thread::spawn(move || {
for otx in rx {
@@ -118,7 +108,7 @@ fn cancel_sends() {
}
});
- for _ in 0..N {
+ for _ in 0..20000 {
let (otx, mut orx) = oneshot::channel::<u32>();
tx.send(otx).unwrap();