aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndroid Build Coastguard Worker <android-build-coastguard-worker@google.com>2022-05-09 20:35:43 +0000
committerAndroid Build Coastguard Worker <android-build-coastguard-worker@google.com>2022-05-09 20:35:43 +0000
commitd05d7a7db7e0ed4a206fb9ea2b16ad25080b0297 (patch)
tree4bd872c7931921cfe12cc2fa1b72915778e481d3
parent1454173d5097444c2156243392330c6e638ca8d0 (diff)
parent3ccd22c1882fcff9f5442478123ad654ce0b9105 (diff)
downloadfutures-channel-android13-mainline-media-release.tar.gz
Change-Id: I2fef201d7337211c9c35a7479096c57805a86594
-rw-r--r--.cargo_vcs_info.json7
-rw-r--r--Android.bp6
-rw-r--r--Cargo.toml38
-rw-r--r--Cargo.toml.orig18
-rw-r--r--METADATA8
-rw-r--r--README.md23
-rw-r--r--TEST_MAPPING49
-rw-r--r--benches/sync_mpsc.rs15
-rw-r--r--build.rs41
-rw-r--r--cargo2android.json5
-rw-r--r--no_atomic_cas.rs13
-rw-r--r--src/lib.rs54
-rw-r--r--src/lock.rs7
-rw-r--r--src/mpsc/mod.rs196
-rw-r--r--src/mpsc/queue.rs22
-rw-r--r--src/mpsc/sink_impl.rs58
-rw-r--r--src/oneshot.rs39
-rw-r--r--tests/channel.rs10
-rw-r--r--tests/mpsc-close.rs51
-rw-r--r--tests/mpsc.rs58
-rw-r--r--tests/oneshot.rs18
21 files changed, 381 insertions, 355 deletions
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json
index f3ad3ab..deccf0d 100644
--- a/.cargo_vcs_info.json
+++ b/.cargo_vcs_info.json
@@ -1,5 +1,6 @@
{
"git": {
- "sha1": "c91f8691672c7401b1923ab00bf138975c99391a"
- }
-}
+ "sha1": "fc1e3250219170e31cddb8857a276cba7dd08d44"
+ },
+ "path_in_vcs": "futures-channel"
+} \ No newline at end of file
diff --git a/Android.bp b/Android.bp
index 61e78bc..c7bc955 100644
--- a/Android.bp
+++ b/Android.bp
@@ -45,6 +45,8 @@ rust_library {
name: "libfutures_channel",
host_supported: true,
crate_name: "futures_channel",
+ cargo_env_compat: true,
+ cargo_pkg_version: "0.3.21",
srcs: ["src/lib.rs"],
edition: "2018",
features: [
@@ -57,11 +59,9 @@ rust_library {
],
apex_available: [
"//apex_available:platform",
+ "com.android.bluetooth",
"com.android.resolv",
"com.android.virt",
],
min_sdk_version: "29",
}
-
-// dependent_library ["feature_list"]
-// futures-core-0.3.14 "alloc,std"
diff --git a/Cargo.toml b/Cargo.toml
index 30ee771..d0a13f6 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -3,32 +3,37 @@
# When uploading crates to the registry Cargo will automatically
# "normalize" Cargo.toml files for maximal compatibility
# with all versions of Cargo and also rewrite `path` dependencies
-# to registry (e.g., crates.io) dependencies
+# to registry (e.g., crates.io) dependencies.
#
-# If you believe there's an error in this file please file an
-# issue against the rust-lang/cargo repository. If you're
-# editing this file be aware that the upstream Cargo.toml
-# will likely look very different (and much more reasonable)
+# If you are reading this file be aware that the original Cargo.toml
+# will likely look very different (and much more reasonable).
+# See Cargo.toml.orig for the original contents.
[package]
edition = "2018"
+rust-version = "1.45"
name = "futures-channel"
-version = "0.3.13"
-authors = ["Alex Crichton <alex@alexcrichton.com>"]
-description = "Channels for asynchronous communication using futures-rs.\n"
+version = "0.3.21"
+description = """
+Channels for asynchronous communication using futures-rs.
+"""
homepage = "https://rust-lang.github.io/futures-rs"
-documentation = "https://docs.rs/futures-channel/0.3"
license = "MIT OR Apache-2.0"
repository = "https://github.com/rust-lang/futures-rs"
+
[package.metadata.docs.rs]
all-features = true
-rustdoc-args = ["--cfg", "docsrs"]
+rustdoc-args = [
+ "--cfg",
+ "docsrs",
+]
+
[dependencies.futures-core]
-version = "0.3.13"
+version = "0.3.21"
default-features = false
[dependencies.futures-sink]
-version = "0.3.13"
+version = "0.3.21"
optional = true
default-features = false
@@ -36,8 +41,11 @@ default-features = false
[features]
alloc = ["futures-core/alloc"]
-cfg-target-has-atomic = ["futures-core/cfg-target-has-atomic"]
+cfg-target-has-atomic = []
default = ["std"]
sink = ["futures-sink"]
-std = ["alloc", "futures-core/std"]
-unstable = ["futures-core/unstable"]
+std = [
+ "alloc",
+ "futures-core/std",
+]
+unstable = []
diff --git a/Cargo.toml.orig b/Cargo.toml.orig
index 9a33320..f356eab 100644
--- a/Cargo.toml.orig
+++ b/Cargo.toml.orig
@@ -1,12 +1,11 @@
[package]
name = "futures-channel"
+version = "0.3.21"
edition = "2018"
-version = "0.3.13"
-authors = ["Alex Crichton <alex@alexcrichton.com>"]
+rust-version = "1.45"
license = "MIT OR Apache-2.0"
repository = "https://github.com/rust-lang/futures-rs"
homepage = "https://rust-lang.github.io/futures-rs"
-documentation = "https://docs.rs/futures-channel/0.3"
description = """
Channels for asynchronous communication using futures-rs.
"""
@@ -17,15 +16,14 @@ std = ["alloc", "futures-core/std"]
alloc = ["futures-core/alloc"]
sink = ["futures-sink"]
-# Unstable features
-# These features are outside of the normal semver guarantees and require the
-# `unstable` feature as an explicit opt-in to unstable API.
-unstable = ["futures-core/unstable"]
-cfg-target-has-atomic = ["futures-core/cfg-target-has-atomic"]
+# These features are no longer used.
+# TODO: remove in the next major version.
+unstable = []
+cfg-target-has-atomic = []
[dependencies]
-futures-core = { path = "../futures-core", version = "0.3.13", default-features = false }
-futures-sink = { path = "../futures-sink", version = "0.3.13", default-features = false, optional = true }
+futures-core = { path = "../futures-core", version = "0.3.21", default-features = false }
+futures-sink = { path = "../futures-sink", version = "0.3.21", default-features = false, optional = true }
[dev-dependencies]
futures = { path = "../futures", default-features = true }
diff --git a/METADATA b/METADATA
index b53d840..29bf06e 100644
--- a/METADATA
+++ b/METADATA
@@ -7,13 +7,13 @@ third_party {
}
url {
type: ARCHIVE
- value: "https://static.crates.io/crates/futures-channel/futures-channel-0.3.13.crate"
+ value: "https://static.crates.io/crates/futures-channel/futures-channel-0.3.21.crate"
}
- version: "0.3.13"
+ version: "0.3.21"
license_type: NOTICE
last_upgrade_date {
- year: 2021
- month: 4
+ year: 2022
+ month: 3
day: 1
}
}
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..3287be9
--- /dev/null
+++ b/README.md
@@ -0,0 +1,23 @@
+# futures-channel
+
+Channels for asynchronous communication using futures-rs.
+
+## Usage
+
+Add this to your `Cargo.toml`:
+
+```toml
+[dependencies]
+futures-channel = "0.3"
+```
+
+The current `futures-channel` requires Rust 1.45 or later.
+
+## License
+
+Licensed under either of [Apache License, Version 2.0](LICENSE-APACHE) or
+[MIT license](LICENSE-MIT) at your option.
+
+Unless you explicitly state otherwise, any contribution intentionally submitted
+for inclusion in the work by you, as defined in the Apache-2.0 license, shall
+be dual licensed as above, without any additional terms or conditions.
diff --git a/TEST_MAPPING b/TEST_MAPPING
index 6798806..5ef61de 100644
--- a/TEST_MAPPING
+++ b/TEST_MAPPING
@@ -1,56 +1,45 @@
// Generated by update_crate_tests.py for tests that depend on this crate.
{
- "presubmit": [
- {
- "name": "anyhow_device_test_tests_test_boxed"
- },
- {
- "name": "anyhow_device_test_tests_test_convert"
- },
- {
- "name": "anyhow_device_test_tests_test_ffi"
- },
- {
- "name": "anyhow_device_test_tests_test_repr"
- },
- {
- "name": "tokio-test_device_test_tests_block_on"
- },
+ "imports": [
{
- "name": "anyhow_device_test_tests_test_chain"
+ "path": "external/rust/crates/anyhow"
},
{
- "name": "anyhow_device_test_tests_test_source"
+ "path": "external/rust/crates/futures-util"
},
{
- "name": "tokio-test_device_test_tests_io"
+ "path": "external/rust/crates/tokio"
},
{
- "name": "anyhow_device_test_tests_test_autotrait"
- },
+ "path": "external/rust/crates/tokio-test"
+ }
+ ],
+ "presubmit": [
{
- "name": "anyhow_device_test_src_lib"
+ "name": "ZipFuseTest"
},
{
- "name": "anyhow_device_test_tests_test_context"
+ "name": "authfs_device_test_src_lib"
},
{
- "name": "anyhow_device_test_tests_test_downcast"
+ "name": "doh_unit_test"
},
{
- "name": "anyhow_device_test_tests_test_macros"
- },
+ "name": "virtualizationservice_device_test"
+ }
+ ],
+ "presubmit-rust": [
{
- "name": "futures-util_device_test_src_lib"
+ "name": "ZipFuseTest"
},
{
- "name": "anyhow_device_test_tests_test_fmt"
+ "name": "authfs_device_test_src_lib"
},
{
- "name": "tokio-test_device_test_tests_macros"
+ "name": "doh_unit_test"
},
{
- "name": "tokio-test_device_test_src_lib"
+ "name": "virtualizationservice_device_test"
}
]
}
diff --git a/benches/sync_mpsc.rs b/benches/sync_mpsc.rs
index e22fe60..7c3c3d3 100644
--- a/benches/sync_mpsc.rs
+++ b/benches/sync_mpsc.rs
@@ -7,8 +7,8 @@ use {
futures::{
channel::mpsc::{self, Sender, UnboundedSender},
ready,
- stream::{Stream, StreamExt},
sink::Sink,
+ stream::{Stream, StreamExt},
task::{Context, Poll},
},
futures_test::task::noop_context,
@@ -25,7 +25,6 @@ fn unbounded_1_tx(b: &mut Bencher) {
// 1000 iterations to avoid measuring overhead of initialization
// Result should be divided by 1000
for i in 0..1000 {
-
// Poll, not ready, park
assert_eq!(Poll::Pending, rx.poll_next_unpin(&mut cx));
@@ -73,7 +72,6 @@ fn unbounded_uncontended(b: &mut Bencher) {
})
}
-
/// A Stream that continuously sends incrementing number of the queue
struct TestSender {
tx: Sender<u32>,
@@ -84,9 +82,7 @@ struct TestSender {
impl Stream for TestSender {
type Item = u32;
- fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
- -> Poll<Option<Self::Item>>
- {
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = &mut *self;
let mut tx = Pin::new(&mut this.tx);
@@ -123,12 +119,7 @@ fn bounded_100_tx(b: &mut Bencher) {
// Each sender can send one item after specified capacity
let (tx, mut rx) = mpsc::channel(0);
- let mut tx: Vec<_> = (0..100).map(|_| {
- TestSender {
- tx: tx.clone(),
- last: 0
- }
- }).collect();
+ let mut tx: Vec<_> = (0..100).map(|_| TestSender { tx: tx.clone(), last: 0 }).collect();
for i in 0..10 {
for x in &mut tx {
diff --git a/build.rs b/build.rs
new file mode 100644
index 0000000..05e0496
--- /dev/null
+++ b/build.rs
@@ -0,0 +1,41 @@
+// The rustc-cfg listed below are considered public API, but it is *unstable*
+// and outside of the normal semver guarantees:
+//
+// - `futures_no_atomic_cas`
+// Assume the target does *not* support atomic CAS operations.
+// This is usually detected automatically by the build script, but you may
+// need to enable it manually when building for custom targets or using
+// non-cargo build systems that don't run the build script.
+//
+// With the exceptions mentioned above, the rustc-cfg emitted by the build
+// script are *not* public API.
+
+#![warn(rust_2018_idioms, single_use_lifetimes)]
+
+use std::env;
+
+include!("no_atomic_cas.rs");
+
+fn main() {
+ let target = match env::var("TARGET") {
+ Ok(target) => target,
+ Err(e) => {
+ println!(
+ "cargo:warning={}: unable to get TARGET environment variable: {}",
+ env!("CARGO_PKG_NAME"),
+ e
+ );
+ return;
+ }
+ };
+
+ // Note that this is `no_*`, not `has_*`. This allows treating
+ // `cfg(target_has_atomic = "ptr")` as true when the build script doesn't
+ // run. This is needed for compatibility with non-cargo build systems that
+ // don't run the build script.
+ if NO_ATOMIC_CAS.contains(&&*target) {
+ println!("cargo:rustc-cfg=futures_no_atomic_cas");
+ }
+
+ println!("cargo:rerun-if-changed=no_atomic_cas.rs");
+}
diff --git a/cargo2android.json b/cargo2android.json
index 01465d0..a7e2a4b 100644
--- a/cargo2android.json
+++ b/cargo2android.json
@@ -1,11 +1,12 @@
{
"apex-available": [
"//apex_available:platform",
+ "com.android.bluetooth",
"com.android.resolv",
"com.android.virt"
],
- "min_sdk_version": "29",
"dependencies": true,
"device": true,
+ "min-sdk-version": "29",
"run": true
-} \ No newline at end of file
+}
diff --git a/no_atomic_cas.rs b/no_atomic_cas.rs
new file mode 100644
index 0000000..9b05d4b
--- /dev/null
+++ b/no_atomic_cas.rs
@@ -0,0 +1,13 @@
+// This file is @generated by no_atomic_cas.sh.
+// It is not intended for manual editing.
+
+const NO_ATOMIC_CAS: &[&str] = &[
+ "avr-unknown-gnu-atmega328",
+ "bpfeb-unknown-none",
+ "bpfel-unknown-none",
+ "msp430-none-elf",
+ "riscv32i-unknown-none-elf",
+ "riscv32imc-unknown-none-elf",
+ "thumbv4t-none-eabi",
+ "thumbv6m-none-eabi",
+];
diff --git a/src/lib.rs b/src/lib.rs
index 22d90d8..4cd936d 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -11,34 +11,32 @@
//! All items are only available when the `std` or `alloc` feature of this
//! library is activated, and it is activated by default.
-#![cfg_attr(feature = "cfg-target-has-atomic", feature(cfg_target_has_atomic))]
-
#![cfg_attr(not(feature = "std"), no_std)]
+#![warn(
+ missing_debug_implementations,
+ missing_docs,
+ rust_2018_idioms,
+ single_use_lifetimes,
+ unreachable_pub
+)]
+#![doc(test(
+ no_crate_inject,
+ attr(
+ deny(warnings, rust_2018_idioms, single_use_lifetimes),
+ allow(dead_code, unused_assignments, unused_variables)
+ )
+))]
-#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms, unreachable_pub)]
-// It cannot be included in the published code because this lints have false positives in the minimum required version.
-#![cfg_attr(test, warn(single_use_lifetimes))]
-#![warn(clippy::all)]
-#![doc(test(attr(deny(warnings), allow(dead_code, unused_assignments, unused_variables))))]
-
-#[cfg(all(feature = "cfg-target-has-atomic", not(feature = "unstable")))]
-compile_error!("The `cfg-target-has-atomic` feature requires the `unstable` feature as an explicit opt-in to unstable features");
-
-macro_rules! cfg_target_has_atomic {
- ($($item:item)*) => {$(
- #[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))]
- $item
- )*};
-}
-
-cfg_target_has_atomic! {
- #[cfg(feature = "alloc")]
- extern crate alloc;
+#[cfg(not(futures_no_atomic_cas))]
+#[cfg(feature = "alloc")]
+extern crate alloc;
- #[cfg(feature = "alloc")]
- mod lock;
- #[cfg(feature = "std")]
- pub mod mpsc;
- #[cfg(feature = "alloc")]
- pub mod oneshot;
-}
+#[cfg(not(futures_no_atomic_cas))]
+#[cfg(feature = "alloc")]
+mod lock;
+#[cfg(not(futures_no_atomic_cas))]
+#[cfg(feature = "std")]
+pub mod mpsc;
+#[cfg(not(futures_no_atomic_cas))]
+#[cfg(feature = "alloc")]
+pub mod oneshot;
diff --git a/src/lock.rs b/src/lock.rs
index 5eecdd9..b328d0f 100644
--- a/src/lock.rs
+++ b/src/lock.rs
@@ -6,8 +6,8 @@
use core::cell::UnsafeCell;
use core::ops::{Deref, DerefMut};
-use core::sync::atomic::Ordering::SeqCst;
use core::sync::atomic::AtomicBool;
+use core::sync::atomic::Ordering::SeqCst;
/// A "mutex" around a value, similar to `std::sync::Mutex<T>`.
///
@@ -37,10 +37,7 @@ unsafe impl<T: Send> Sync for Lock<T> {}
impl<T> Lock<T> {
/// Creates a new lock around the given value.
pub(crate) fn new(t: T) -> Self {
- Self {
- locked: AtomicBool::new(false),
- data: UnsafeCell::new(t),
- }
+ Self { locked: AtomicBool::new(false), data: UnsafeCell::new(t) }
}
/// Attempts to acquire this lock, returning whether the lock was acquired or
diff --git a/src/mpsc/mod.rs b/src/mpsc/mod.rs
index dd50343..44834b7 100644
--- a/src/mpsc/mod.rs
+++ b/src/mpsc/mod.rs
@@ -79,13 +79,13 @@
// by the queue structure.
use futures_core::stream::{FusedStream, Stream};
-use futures_core::task::{Context, Poll, Waker};
use futures_core::task::__internal::AtomicWaker;
+use futures_core::task::{Context, Poll, Waker};
use std::fmt;
use std::pin::Pin;
-use std::sync::{Arc, Mutex};
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::SeqCst;
+use std::sync::{Arc, Mutex};
use std::thread;
use crate::mpsc::queue::Queue;
@@ -209,9 +209,7 @@ impl SendError {
impl<T> fmt::Debug for TrySendError<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- f.debug_struct("TrySendError")
- .field("kind", &self.err.kind)
- .finish()
+ f.debug_struct("TrySendError").field("kind", &self.err.kind).finish()
}
}
@@ -251,8 +249,7 @@ impl<T> TrySendError<T> {
impl fmt::Debug for TryRecvError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- f.debug_tuple("TryRecvError")
- .finish()
+ f.debug_tuple("TryRecvError").finish()
}
}
@@ -335,10 +332,7 @@ struct SenderTask {
impl SenderTask {
fn new() -> Self {
- Self {
- task: None,
- is_parked: false,
- }
+ Self { task: None, is_parked: false }
}
fn notify(&mut self) {
@@ -381,9 +375,7 @@ pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
maybe_parked: false,
};
- let rx = Receiver {
- inner: Some(inner),
- };
+ let rx = Receiver { inner: Some(inner) };
(Sender(Some(tx)), rx)
}
@@ -399,7 +391,6 @@ pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
/// the channel. Using an `unbounded` channel has the ability of causing the
/// process to run out of memory. In this case, the process will be aborted.
pub fn unbounded<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) {
-
let inner = Arc::new(UnboundedInner {
state: AtomicUsize::new(INIT_STATE),
message_queue: Queue::new(),
@@ -407,13 +398,9 @@ pub fn unbounded<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) {
recv_task: AtomicWaker::new(),
});
- let tx = UnboundedSenderInner {
- inner: inner.clone(),
- };
+ let tx = UnboundedSenderInner { inner: inner.clone() };
- let rx = UnboundedReceiver {
- inner: Some(inner),
- };
+ let rx = UnboundedReceiver { inner: Some(inner) };
(UnboundedSender(Some(tx)), rx)
}
@@ -430,13 +417,10 @@ impl<T> UnboundedSenderInner<T> {
if state.is_open {
Poll::Ready(Ok(()))
} else {
- Poll::Ready(Err(SendError {
- kind: SendErrorKind::Disconnected,
- }))
+ Poll::Ready(Err(SendError { kind: SendErrorKind::Disconnected }))
}
}
-
// Push message to the queue and signal to the receiver
fn queue_push_and_signal(&self, msg: T) {
// Push the message onto the message queue
@@ -462,16 +446,17 @@ impl<T> UnboundedSenderInner<T> {
// This probably is never hit? Odds are the process will run out of
// memory first. It may be worth to return something else in this
// case?
- assert!(state.num_messages < MAX_CAPACITY, "buffer space \
- exhausted; sending this messages would overflow the state");
+ assert!(
+ state.num_messages < MAX_CAPACITY,
+ "buffer space \
+ exhausted; sending this messages would overflow the state"
+ );
state.num_messages += 1;
let next = encode_state(&state);
match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) {
- Ok(_) => {
- return Some(state.num_messages)
- }
+ Ok(_) => return Some(state.num_messages),
Err(actual) => curr = actual,
}
}
@@ -516,12 +501,7 @@ impl<T> BoundedSenderInner<T> {
fn try_send(&mut self, msg: T) -> Result<(), TrySendError<T>> {
// If the sender is currently blocked, reject the message
if !self.poll_unparked(None).is_ready() {
- return Err(TrySendError {
- err: SendError {
- kind: SendErrorKind::Full,
- },
- val: msg,
- });
+ return Err(TrySendError { err: SendError { kind: SendErrorKind::Full }, val: msg });
}
// The channel has capacity to accept the message, so send it
@@ -530,11 +510,8 @@ impl<T> BoundedSenderInner<T> {
// Do the send without failing.
// Can be called only by bounded sender.
- #[allow(clippy::debug_assert_with_mut_call)]
- fn do_send_b(&mut self, msg: T)
- -> Result<(), TrySendError<T>>
- {
- // Anyone callig do_send *should* make sure there is room first,
+ fn do_send_b(&mut self, msg: T) -> Result<(), TrySendError<T>> {
+ // Anyone calling do_send *should* make sure there is room first,
// but assert here for tests as a sanity check.
debug_assert!(self.poll_unparked(None).is_ready());
@@ -551,12 +528,12 @@ impl<T> BoundedSenderInner<T> {
// the configured buffer size
num_messages > self.inner.buffer
}
- None => return Err(TrySendError {
- err: SendError {
- kind: SendErrorKind::Disconnected,
- },
- val: msg,
- }),
+ None => {
+ return Err(TrySendError {
+ err: SendError { kind: SendErrorKind::Disconnected },
+ val: msg,
+ })
+ }
};
// If the channel has reached capacity, then the sender task needs to
@@ -600,16 +577,17 @@ impl<T> BoundedSenderInner<T> {
// This probably is never hit? Odds are the process will run out of
// memory first. It may be worth to return something else in this
// case?
- assert!(state.num_messages < MAX_CAPACITY, "buffer space \
- exhausted; sending this messages would overflow the state");
+ assert!(
+ state.num_messages < MAX_CAPACITY,
+ "buffer space \
+ exhausted; sending this messages would overflow the state"
+ );
state.num_messages += 1;
let next = encode_state(&state);
match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) {
- Ok(_) => {
- return Some(state.num_messages)
- }
+ Ok(_) => return Some(state.num_messages),
Err(actual) => curr = actual,
}
}
@@ -644,15 +622,10 @@ impl<T> BoundedSenderInner<T> {
/// capacity, in which case the current task is queued to be notified once
/// capacity is available;
/// - `Poll::Ready(Err(SendError))` if the receiver has been dropped.
- fn poll_ready(
- &mut self,
- cx: &mut Context<'_>,
- ) -> Poll<Result<(), SendError>> {
+ fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SendError>> {
let state = decode_state(self.inner.state.load(SeqCst));
if !state.is_open {
- return Poll::Ready(Err(SendError {
- kind: SendErrorKind::Disconnected,
- }));
+ return Poll::Ready(Err(SendError { kind: SendErrorKind::Disconnected }));
}
self.poll_unparked(Some(cx)).map(Ok)
@@ -699,7 +672,7 @@ impl<T> BoundedSenderInner<T> {
if !task.is_parked {
self.maybe_parked = false;
- return Poll::Ready(())
+ return Poll::Ready(());
}
// At this point, an unpark request is pending, so there will be an
@@ -724,12 +697,7 @@ impl<T> Sender<T> {
if let Some(inner) = &mut self.0 {
inner.try_send(msg)
} else {
- Err(TrySendError {
- err: SendError {
- kind: SendErrorKind::Disconnected,
- },
- val: msg,
- })
+ Err(TrySendError { err: SendError { kind: SendErrorKind::Disconnected }, val: msg })
}
}
@@ -739,8 +707,7 @@ impl<T> Sender<T> {
/// [`poll_ready`](Sender::poll_ready) has reported that the channel is
/// ready to receive a message.
pub fn start_send(&mut self, msg: T) -> Result<(), SendError> {
- self.try_send(msg)
- .map_err(|e| e.err)
+ self.try_send(msg).map_err(|e| e.err)
}
/// Polls the channel to determine if there is guaranteed capacity to send
@@ -755,13 +722,8 @@ impl<T> Sender<T> {
/// capacity, in which case the current task is queued to be notified once
/// capacity is available;
/// - `Poll::Ready(Err(SendError))` if the receiver has been dropped.
- pub fn poll_ready(
- &mut self,
- cx: &mut Context<'_>,
- ) -> Poll<Result<(), SendError>> {
- let inner = self.0.as_mut().ok_or(SendError {
- kind: SendErrorKind::Disconnected,
- })?;
+ pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SendError>> {
+ let inner = self.0.as_mut().ok_or(SendError { kind: SendErrorKind::Disconnected })?;
inner.poll_ready(cx)
}
@@ -799,7 +761,10 @@ impl<T> Sender<T> {
}
/// Hashes the receiver into the provided hasher
- pub fn hash_receiver<H>(&self, hasher: &mut H) where H: std::hash::Hasher {
+ pub fn hash_receiver<H>(&self, hasher: &mut H)
+ where
+ H: std::hash::Hasher,
+ {
use std::hash::Hash;
let ptr = self.0.as_ref().map(|inner| inner.ptr());
@@ -809,13 +774,8 @@ impl<T> Sender<T> {
impl<T> UnboundedSender<T> {
/// Check if the channel is ready to receive a message.
- pub fn poll_ready(
- &self,
- _: &mut Context<'_>,
- ) -> Poll<Result<(), SendError>> {
- let inner = self.0.as_ref().ok_or(SendError {
- kind: SendErrorKind::Disconnected,
- })?;
+ pub fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), SendError>> {
+ let inner = self.0.as_ref().ok_or(SendError { kind: SendErrorKind::Disconnected })?;
inner.poll_ready_nb()
}
@@ -845,12 +805,7 @@ impl<T> UnboundedSender<T> {
}
}
- Err(TrySendError {
- err: SendError {
- kind: SendErrorKind::Disconnected,
- },
- val: msg,
- })
+ Err(TrySendError { err: SendError { kind: SendErrorKind::Disconnected }, val: msg })
}
/// Send a message on the channel.
@@ -858,8 +813,7 @@ impl<T> UnboundedSender<T> {
/// This method should only be called after `poll_ready` has been used to
/// verify that the channel is ready to receive a message.
pub fn start_send(&mut self, msg: T) -> Result<(), SendError> {
- self.do_send_nb(msg)
- .map_err(|e| e.err)
+ self.do_send_nb(msg).map_err(|e| e.err)
}
/// Sends a message along this channel.
@@ -888,7 +842,10 @@ impl<T> UnboundedSender<T> {
}
/// Hashes the receiver into the provided hasher
- pub fn hash_receiver<H>(&self, hasher: &mut H) where H: std::hash::Hasher {
+ pub fn hash_receiver<H>(&self, hasher: &mut H)
+ where
+ H: std::hash::Hasher,
+ {
use std::hash::Hash;
let ptr = self.0.as_ref().map(|inner| inner.ptr());
@@ -928,9 +885,7 @@ impl<T> Clone for UnboundedSenderInner<T> {
Ok(_) => {
// The ABA problem doesn't matter here. We only care that the
// number of senders never exceeds the maximum.
- return Self {
- inner: self.inner.clone(),
- };
+ return Self { inner: self.inner.clone() };
}
Err(actual) => curr = actual,
}
@@ -1021,19 +976,22 @@ impl<T> Receiver<T> {
/// only when you've otherwise arranged to be notified when the channel is
/// no longer empty.
///
- /// This function will panic if called after `try_next` or `poll_next` has
- /// returned `None`.
+ /// This function returns:
+ /// * `Ok(Some(t))` when message is fetched
+ /// * `Ok(None)` when channel is closed and no messages left in the queue
+ /// * `Err(e)` when there are no messages available, but channel is not yet closed
pub fn try_next(&mut self) -> Result<Option<T>, TryRecvError> {
match self.next_message() {
- Poll::Ready(msg) => {
- Ok(msg)
- },
+ Poll::Ready(msg) => Ok(msg),
Poll::Pending => Err(TryRecvError { _priv: () }),
}
}
fn next_message(&mut self) -> Poll<Option<T>> {
- let inner = self.inner.as_mut().expect("Receiver::next_message called after `None`");
+ let inner = match self.inner.as_mut() {
+ None => return Poll::Ready(None),
+ Some(inner) => inner,
+ };
// Pop off a message
match unsafe { inner.message_queue.pop_spin() } {
Some(msg) => {
@@ -1098,18 +1056,15 @@ impl<T> FusedStream for Receiver<T> {
impl<T> Stream for Receiver<T> {
type Item = T;
- fn poll_next(
- mut self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- ) -> Poll<Option<T>> {
- // Try to read a message off of the message queue.
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
+ // Try to read a message off of the message queue.
match self.next_message() {
Poll::Ready(msg) => {
if msg.is_none() {
self.inner = None;
}
Poll::Ready(msg)
- },
+ }
Poll::Pending => {
// There are no messages to read, in this case, park.
self.inner.as_ref().unwrap().recv_task.register(cx.waker());
@@ -1169,19 +1124,22 @@ impl<T> UnboundedReceiver<T> {
/// only when you've otherwise arranged to be notified when the channel is
/// no longer empty.
///
- /// This function will panic if called after `try_next` or `poll_next` has
- /// returned `None`.
+ /// This function returns:
+ /// * `Ok(Some(t))` when message is fetched
+ /// * `Ok(None)` when channel is closed and no messages left in the queue
+ /// * `Err(e)` when there are no messages available, but channel is not yet closed
pub fn try_next(&mut self) -> Result<Option<T>, TryRecvError> {
match self.next_message() {
- Poll::Ready(msg) => {
- Ok(msg)
- },
+ Poll::Ready(msg) => Ok(msg),
Poll::Pending => Err(TryRecvError { _priv: () }),
}
}
fn next_message(&mut self) -> Poll<Option<T>> {
- let inner = self.inner.as_mut().expect("Receiver::next_message called after `None`");
+ let inner = match self.inner.as_mut() {
+ None => return Poll::Ready(None),
+ Some(inner) => inner,
+ };
// Pop off a message
match unsafe { inner.message_queue.pop_spin() } {
Some(msg) => {
@@ -1230,10 +1188,7 @@ impl<T> FusedStream for UnboundedReceiver<T> {
impl<T> Stream for UnboundedReceiver<T> {
type Item = T;
- fn poll_next(
- mut self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- ) -> Poll<Option<T>> {
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
// Try to read a message off of the message queue.
match self.next_message() {
Poll::Ready(msg) => {
@@ -1241,7 +1196,7 @@ impl<T> Stream for UnboundedReceiver<T> {
self.inner = None;
}
Poll::Ready(msg)
- },
+ }
Poll::Pending => {
// There are no messages to read, in this case, park.
self.inner.as_ref().unwrap().recv_task.register(cx.waker());
@@ -1339,10 +1294,7 @@ impl State {
*/
fn decode_state(num: usize) -> State {
- State {
- is_open: num & OPEN_MASK == OPEN_MASK,
- num_messages: num & MAX_CAPACITY,
- }
+ State { is_open: num & OPEN_MASK == OPEN_MASK, num_messages: num & MAX_CAPACITY }
}
fn encode_state(state: &State) -> usize {
diff --git a/src/mpsc/queue.rs b/src/mpsc/queue.rs
index b00e1b1..57dc7f5 100644
--- a/src/mpsc/queue.rs
+++ b/src/mpsc/queue.rs
@@ -43,10 +43,10 @@
pub(super) use self::PopResult::*;
-use std::thread;
use std::cell::UnsafeCell;
use std::ptr;
use std::sync::atomic::{AtomicPtr, Ordering};
+use std::thread;
/// A result of the `pop` function.
pub(super) enum PopResult<T> {
@@ -76,15 +76,12 @@ pub(super) struct Queue<T> {
tail: UnsafeCell<*mut Node<T>>,
}
-unsafe impl<T: Send> Send for Queue<T> { }
-unsafe impl<T: Send> Sync for Queue<T> { }
+unsafe impl<T: Send> Send for Queue<T> {}
+unsafe impl<T: Send> Sync for Queue<T> {}
impl<T> Node<T> {
unsafe fn new(v: Option<T>) -> *mut Self {
- Box::into_raw(Box::new(Self {
- next: AtomicPtr::new(ptr::null_mut()),
- value: v,
- }))
+ Box::into_raw(Box::new(Self { next: AtomicPtr::new(ptr::null_mut()), value: v }))
}
}
@@ -93,10 +90,7 @@ impl<T> Queue<T> {
/// one consumer.
pub(super) fn new() -> Self {
let stub = unsafe { Node::new(None) };
- Self {
- head: AtomicPtr::new(stub),
- tail: UnsafeCell::new(stub),
- }
+ Self { head: AtomicPtr::new(stub), tail: UnsafeCell::new(stub) }
}
/// Pushes a new value onto this queue.
@@ -133,7 +127,11 @@ impl<T> Queue<T> {
return Data(ret);
}
- if self.head.load(Ordering::Acquire) == tail {Empty} else {Inconsistent}
+ if self.head.load(Ordering::Acquire) == tail {
+ Empty
+ } else {
+ Inconsistent
+ }
}
/// Pop an element similarly to `pop` function, but spin-wait on inconsistent
diff --git a/src/mpsc/sink_impl.rs b/src/mpsc/sink_impl.rs
index 4ce66b4..1be2016 100644
--- a/src/mpsc/sink_impl.rs
+++ b/src/mpsc/sink_impl.rs
@@ -6,24 +6,15 @@ use std::pin::Pin;
impl<T> Sink<T> for Sender<T> {
type Error = SendError;
- fn poll_ready(
- mut self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- ) -> Poll<Result<(), Self::Error>> {
+ fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
(*self).poll_ready(cx)
}
- fn start_send(
- mut self: Pin<&mut Self>,
- msg: T,
- ) -> Result<(), Self::Error> {
+ fn start_send(mut self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error> {
(*self).start_send(msg)
}
- fn poll_flush(
- mut self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- ) -> Poll<Result<(), Self::Error>> {
+ fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
match (*self).poll_ready(cx) {
Poll::Ready(Err(ref e)) if e.is_disconnected() => {
// If the receiver disconnected, we consider the sink to be flushed.
@@ -33,10 +24,7 @@ impl<T> Sink<T> for Sender<T> {
}
}
- fn poll_close(
- mut self: Pin<&mut Self>,
- _: &mut Context<'_>,
- ) -> Poll<Result<(), Self::Error>> {
+ fn poll_close(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.disconnect();
Poll::Ready(Ok(()))
}
@@ -45,31 +33,19 @@ impl<T> Sink<T> for Sender<T> {
impl<T> Sink<T> for UnboundedSender<T> {
type Error = SendError;
- fn poll_ready(
- self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- ) -> Poll<Result<(), Self::Error>> {
+ fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Self::poll_ready(&*self, cx)
}
- fn start_send(
- mut self: Pin<&mut Self>,
- msg: T,
- ) -> Result<(), Self::Error> {
+ fn start_send(mut self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error> {
Self::start_send(&mut *self, msg)
}
- fn poll_flush(
- self: Pin<&mut Self>,
- _: &mut Context<'_>,
- ) -> Poll<Result<(), Self::Error>> {
+ fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
- fn poll_close(
- mut self: Pin<&mut Self>,
- _: &mut Context<'_>,
- ) -> Poll<Result<(), Self::Error>> {
+ fn poll_close(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.disconnect();
Poll::Ready(Ok(()))
}
@@ -78,29 +54,19 @@ impl<T> Sink<T> for UnboundedSender<T> {
impl<T> Sink<T> for &UnboundedSender<T> {
type Error = SendError;
- fn poll_ready(
- self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- ) -> Poll<Result<(), Self::Error>> {
+ fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
UnboundedSender::poll_ready(*self, cx)
}
fn start_send(self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error> {
- self.unbounded_send(msg)
- .map_err(TrySendError::into_send_error)
+ self.unbounded_send(msg).map_err(TrySendError::into_send_error)
}
- fn poll_flush(
- self: Pin<&mut Self>,
- _: &mut Context<'_>,
- ) -> Poll<Result<(), Self::Error>> {
+ fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
- fn poll_close(
- self: Pin<&mut Self>,
- _: &mut Context<'_>,
- ) -> Poll<Result<(), Self::Error>> {
+ fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.close_channel();
Poll::Ready(Ok(()))
}
diff --git a/src/oneshot.rs b/src/oneshot.rs
index dbbce81..5af651b 100644
--- a/src/oneshot.rs
+++ b/src/oneshot.rs
@@ -7,7 +7,7 @@ use core::fmt;
use core::pin::Pin;
use core::sync::atomic::AtomicBool;
use core::sync::atomic::Ordering::SeqCst;
-use futures_core::future::{Future, FusedFuture};
+use futures_core::future::{FusedFuture, Future};
use futures_core::task::{Context, Poll, Waker};
use crate::lock::Lock;
@@ -16,7 +16,6 @@ use crate::lock::Lock;
///
/// This is created by the [`channel`](channel) function.
#[must_use = "futures do nothing unless you `.await` or poll them"]
-#[derive(Debug)]
pub struct Receiver<T> {
inner: Arc<Inner<T>>,
}
@@ -24,7 +23,6 @@ pub struct Receiver<T> {
/// A means of transmitting a single value to another task.
///
/// This is created by the [`channel`](channel) function.
-#[derive(Debug)]
pub struct Sender<T> {
inner: Arc<Inner<T>>,
}
@@ -35,7 +33,6 @@ impl<T> Unpin for Sender<T> {}
/// Internal state of the `Receiver`/`Sender` pair above. This is all used as
/// the internal synchronization between the two for send/recv operations.
-#[derive(Debug)]
struct Inner<T> {
/// Indicates whether this oneshot is complete yet. This is filled in both
/// by `Sender::drop` and by `Receiver::drop`, and both sides interpret it
@@ -106,12 +103,8 @@ struct Inner<T> {
/// ```
pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
let inner = Arc::new(Inner::new());
- let receiver = Receiver {
- inner: inner.clone(),
- };
- let sender = Sender {
- inner,
- };
+ let receiver = Receiver { inner: inner.clone() };
+ let sender = Sender { inner };
(sender, receiver)
}
@@ -127,7 +120,7 @@ impl<T> Inner<T> {
fn send(&self, t: T) -> Result<(), T> {
if self.complete.load(SeqCst) {
- return Err(t)
+ return Err(t);
}
// Note that this lock acquisition may fail if the receiver
@@ -164,7 +157,7 @@ impl<T> Inner<T> {
// destructor, but our destructor hasn't run yet so if it's set then the
// oneshot is gone.
if self.complete.load(SeqCst) {
- return Poll::Ready(())
+ return Poll::Ready(());
}
// If our other half is not gone then we need to park our current task
@@ -273,7 +266,10 @@ impl<T> Inner<T> {
} else {
let task = cx.waker().clone();
match self.rx_task.try_lock() {
- Some(mut slot) => { *slot = Some(task); false },
+ Some(mut slot) => {
+ *slot = Some(task);
+ false
+ }
None => true,
}
};
@@ -394,6 +390,12 @@ impl<T> Drop for Sender<T> {
}
}
+impl<T: fmt::Debug> fmt::Debug for Sender<T> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("Sender").field("complete", &self.inner.complete).finish()
+ }
+}
+
/// A future that resolves when the receiving end of a channel has hung up.
///
/// This is an `.await`-friendly interface around [`poll_canceled`](Sender::poll_canceled).
@@ -453,10 +455,7 @@ impl<T> Receiver<T> {
impl<T> Future for Receiver<T> {
type Output = Result<T, Canceled>;
- fn poll(
- self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- ) -> Poll<Result<T, Canceled>> {
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<T, Canceled>> {
self.inner.recv(cx)
}
}
@@ -481,3 +480,9 @@ impl<T> Drop for Receiver<T> {
self.inner.drop_rx()
}
}
+
+impl<T: fmt::Debug> fmt::Debug for Receiver<T> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("Receiver").field("complete", &self.inner.complete).finish()
+ }
+}
diff --git a/tests/channel.rs b/tests/channel.rs
index 73dac64..5f01a8e 100644
--- a/tests/channel.rs
+++ b/tests/channel.rs
@@ -1,8 +1,8 @@
use futures::channel::mpsc;
use futures::executor::block_on;
use futures::future::poll_fn;
-use futures::stream::StreamExt;
use futures::sink::SinkExt;
+use futures::stream::StreamExt;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;
@@ -11,9 +11,7 @@ fn sequence() {
let (tx, rx) = mpsc::channel(1);
let amt = 20;
- let t = thread::spawn(move || {
- block_on(send_sequence(amt, tx))
- });
+ let t = thread::spawn(move || block_on(send_sequence(amt, tx)));
let list: Vec<_> = block_on(rx.collect());
let mut list = list.into_iter();
for i in (1..=amt).rev() {
@@ -34,9 +32,7 @@ async fn send_sequence(n: u32, mut sender: mpsc::Sender<u32>) {
fn drop_sender() {
let (tx, mut rx) = mpsc::channel::<u32>(1);
drop(tx);
- let f = poll_fn(|cx| {
- rx.poll_next_unpin(cx)
- });
+ let f = poll_fn(|cx| rx.poll_next_unpin(cx));
assert_eq!(block_on(f), None)
}
diff --git a/tests/mpsc-close.rs b/tests/mpsc-close.rs
index 9eb5296..1a14067 100644
--- a/tests/mpsc-close.rs
+++ b/tests/mpsc-close.rs
@@ -13,9 +13,7 @@ use std::time::{Duration, Instant};
fn smoke() {
let (mut sender, receiver) = mpsc::channel(1);
- let t = thread::spawn(move || {
- while let Ok(()) = block_on(sender.send(42)) {}
- });
+ let t = thread::spawn(move || while let Ok(()) = block_on(sender.send(42)) {});
// `receiver` needs to be dropped for `sender` to stop sending and therefore before the join.
block_on(receiver.take(3).for_each(|_| futures::future::ready(())));
@@ -149,6 +147,7 @@ fn single_receiver_drop_closes_channel_and_drains() {
// Stress test that `try_send()`s occurring concurrently with receiver
// close/drops don't appear as successful sends.
+#[cfg_attr(miri, ignore)] // Miri is too slow
#[test]
fn stress_try_send_as_receiver_closes() {
const AMT: usize = 10000;
@@ -166,7 +165,7 @@ fn stress_try_send_as_receiver_closes() {
struct TestRx {
rx: mpsc::Receiver<Arc<()>>,
// The number of times to query `rx` before dropping it.
- poll_count: usize
+ poll_count: usize,
}
struct TestTask {
command_rx: mpsc::Receiver<TestRx>,
@@ -190,14 +189,11 @@ fn stress_try_send_as_receiver_closes() {
impl Future for TestTask {
type Output = ();
- fn poll(
- mut self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- ) -> Poll<Self::Output> {
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// Poll the test channel, if one is present.
if let Some(rx) = &mut self.test_rx {
if let Poll::Ready(v) = rx.poll_next_unpin(cx) {
- let _ = v.expect("test finished unexpectedly!");
+ let _ = v.expect("test finished unexpectedly!");
}
self.countdown -= 1;
// Busy-poll until the countdown is finished.
@@ -209,9 +205,9 @@ fn stress_try_send_as_receiver_closes() {
self.test_rx = Some(rx);
self.countdown = poll_count;
cx.waker().wake_by_ref();
- },
+ }
Poll::Ready(None) => return Poll::Ready(()),
- Poll::Pending => {},
+ Poll::Pending => {}
}
if self.countdown == 0 {
// Countdown complete -- drop the Receiver.
@@ -255,10 +251,14 @@ fn stress_try_send_as_receiver_closes() {
if prev_weak.upgrade().is_none() {
break;
}
- assert!(t0.elapsed() < Duration::from_secs(SPIN_TIMEOUT_S),
+ assert!(
+ t0.elapsed() < Duration::from_secs(SPIN_TIMEOUT_S),
"item not dropped on iteration {} after \
{} sends ({} successful). spin=({})",
- i, attempted_sends, successful_sends, spins
+ i,
+ attempted_sends,
+ successful_sends,
+ spins
);
spins += 1;
thread::sleep(Duration::from_millis(SPIN_SLEEP_MS));
@@ -273,6 +273,27 @@ fn stress_try_send_as_receiver_closes() {
}
}
drop(cmd_tx);
- bg.join()
- .expect("background thread join");
+ bg.join().expect("background thread join");
+}
+
+#[test]
+fn unbounded_try_next_after_none() {
+ let (tx, mut rx) = mpsc::unbounded::<String>();
+ // Drop the sender, close the channel.
+ drop(tx);
+ // Receive the end of channel.
+ assert_eq!(Ok(None), rx.try_next().map_err(|_| ()));
+ // None received, check we can call `try_next` again.
+ assert_eq!(Ok(None), rx.try_next().map_err(|_| ()));
+}
+
+#[test]
+fn bounded_try_next_after_none() {
+ let (tx, mut rx) = mpsc::channel::<String>(17);
+ // Drop the sender, close the channel.
+ drop(tx);
+ // Receive the end of channel.
+ assert_eq!(Ok(None), rx.try_next().map_err(|_| ()));
+ // None received, check we can call `try_next` again.
+ assert_eq!(Ok(None), rx.try_next().map_err(|_| ()));
}
diff --git a/tests/mpsc.rs b/tests/mpsc.rs
index 61c5a50..da0899d 100644
--- a/tests/mpsc.rs
+++ b/tests/mpsc.rs
@@ -1,13 +1,13 @@
use futures::channel::{mpsc, oneshot};
use futures::executor::{block_on, block_on_stream};
-use futures::future::{FutureExt, poll_fn};
-use futures::stream::{Stream, StreamExt};
+use futures::future::{poll_fn, FutureExt};
+use futures::pin_mut;
use futures::sink::{Sink, SinkExt};
+use futures::stream::{Stream, StreamExt};
use futures::task::{Context, Poll};
-use futures::pin_mut;
use futures_test::task::{new_count_waker, noop_context};
-use std::sync::{Arc, Mutex};
use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::{Arc, Mutex};
use std::thread;
trait AssertSend: Send {}
@@ -77,7 +77,7 @@ fn send_shared_recv() {
fn send_recv_threads() {
let (mut tx, rx) = mpsc::channel::<i32>(16);
- let t = thread::spawn(move|| {
+ let t = thread::spawn(move || {
block_on(tx.send(1)).unwrap();
});
@@ -200,11 +200,14 @@ fn tx_close_gets_none() {
#[test]
fn stress_shared_unbounded() {
+ #[cfg(miri)]
+ const AMT: u32 = 100;
+ #[cfg(not(miri))]
const AMT: u32 = 10000;
const NTHREADS: u32 = 8;
let (tx, rx) = mpsc::unbounded::<i32>();
- let t = thread::spawn(move|| {
+ let t = thread::spawn(move || {
let result: Vec<_> = block_on(rx.collect());
assert_eq!(result.len(), (AMT * NTHREADS) as usize);
for item in result {
@@ -215,7 +218,7 @@ fn stress_shared_unbounded() {
for _ in 0..NTHREADS {
let tx = tx.clone();
- thread::spawn(move|| {
+ thread::spawn(move || {
for _ in 0..AMT {
tx.unbounded_send(1).unwrap();
}
@@ -229,11 +232,14 @@ fn stress_shared_unbounded() {
#[test]
fn stress_shared_bounded_hard() {
+ #[cfg(miri)]
+ const AMT: u32 = 100;
+ #[cfg(not(miri))]
const AMT: u32 = 10000;
const NTHREADS: u32 = 8;
let (tx, rx) = mpsc::channel::<i32>(0);
- let t = thread::spawn(move|| {
+ let t = thread::spawn(move || {
let result: Vec<_> = block_on(rx.collect());
assert_eq!(result.len(), (AMT * NTHREADS) as usize);
for item in result {
@@ -259,6 +265,9 @@ fn stress_shared_bounded_hard() {
#[allow(clippy::same_item_push)]
#[test]
fn stress_receiver_multi_task_bounded_hard() {
+ #[cfg(miri)]
+ const AMT: usize = 100;
+ #[cfg(not(miri))]
const AMT: usize = 10_000;
const NTHREADS: u32 = 2;
@@ -297,9 +306,9 @@ fn stress_receiver_multi_task_bounded_hard() {
}
Poll::Ready(None) => {
*rx_opt = None;
- break
- },
- Poll::Pending => {},
+ break;
+ }
+ Poll::Pending => {}
}
}
} else {
@@ -311,7 +320,6 @@ fn stress_receiver_multi_task_bounded_hard() {
th.push(t);
}
-
for i in 0..AMT {
block_on(tx.send(i)).unwrap();
}
@@ -328,7 +336,12 @@ fn stress_receiver_multi_task_bounded_hard() {
/// after sender dropped.
#[test]
fn stress_drop_sender() {
- fn list() -> impl Stream<Item=i32> {
+ #[cfg(miri)]
+ const ITER: usize = 100;
+ #[cfg(not(miri))]
+ const ITER: usize = 10000;
+
+ fn list() -> impl Stream<Item = i32> {
let (tx, rx) = mpsc::channel(1);
thread::spawn(move || {
block_on(send_one_two_three(tx));
@@ -336,7 +349,7 @@ fn stress_drop_sender() {
rx
}
- for _ in 0..10000 {
+ for _ in 0..ITER {
let v: Vec<_> = block_on(list().collect());
assert_eq!(v, vec![1, 2, 3]);
}
@@ -381,9 +394,12 @@ fn stress_close_receiver_iter() {
}
}
+#[cfg_attr(miri, ignore)] // Miri is too slow
#[test]
fn stress_close_receiver() {
- for _ in 0..10000 {
+ const ITER: usize = 10000;
+
+ for _ in 0..ITER {
stress_close_receiver_iter();
}
}
@@ -398,6 +414,9 @@ async fn stress_poll_ready_sender(mut sender: mpsc::Sender<u32>, count: u32) {
#[allow(clippy::same_item_push)]
#[test]
fn stress_poll_ready() {
+ #[cfg(miri)]
+ const AMT: u32 = 100;
+ #[cfg(not(miri))]
const AMT: u32 = 1000;
const NTHREADS: u32 = 8;
@@ -407,9 +426,7 @@ fn stress_poll_ready() {
let mut threads = Vec::new();
for _ in 0..NTHREADS {
let sender = tx.clone();
- threads.push(thread::spawn(move || {
- block_on(stress_poll_ready_sender(sender, AMT))
- }));
+ threads.push(thread::spawn(move || block_on(stress_poll_ready_sender(sender, AMT))));
}
drop(tx);
@@ -427,6 +444,7 @@ fn stress_poll_ready() {
stress(16);
}
+#[cfg_attr(miri, ignore)] // Miri is too slow
#[test]
fn try_send_1() {
const N: usize = 3000;
@@ -436,7 +454,7 @@ fn try_send_1() {
for i in 0..N {
loop {
if tx.try_send(i).is_ok() {
- break
+ break;
}
}
}
@@ -542,8 +560,8 @@ fn is_connected_to() {
#[test]
fn hash_receiver() {
- use std::hash::Hasher;
use std::collections::hash_map::DefaultHasher;
+ use std::hash::Hasher;
let mut hasher_a1 = DefaultHasher::new();
let mut hasher_a2 = DefaultHasher::new();
diff --git a/tests/oneshot.rs b/tests/oneshot.rs
index a22d039..c9f5508 100644
--- a/tests/oneshot.rs
+++ b/tests/oneshot.rs
@@ -1,6 +1,6 @@
use futures::channel::oneshot::{self, Sender};
use futures::executor::block_on;
-use futures::future::{FutureExt, poll_fn};
+use futures::future::{poll_fn, FutureExt};
use futures::task::{Context, Poll};
use futures_test::task::panic_waker_ref;
use std::sync::mpsc;
@@ -35,6 +35,11 @@ fn cancel_notifies() {
#[test]
fn cancel_lots() {
+ #[cfg(miri)]
+ const N: usize = 100;
+ #[cfg(not(miri))]
+ const N: usize = 20000;
+
let (tx, rx) = mpsc::channel::<(Sender<_>, mpsc::Sender<_>)>();
let t = thread::spawn(move || {
for (mut tx, tx2) in rx {
@@ -43,7 +48,7 @@ fn cancel_lots() {
}
});
- for _ in 0..20000 {
+ for _ in 0..N {
let (otx, orx) = oneshot::channel::<u32>();
let (tx2, rx2) = mpsc::channel();
tx.send((otx, tx2)).unwrap();
@@ -70,7 +75,7 @@ fn close() {
rx.close();
block_on(poll_fn(|cx| {
match rx.poll_unpin(cx) {
- Poll::Ready(Err(_)) => {},
+ Poll::Ready(Err(_)) => {}
_ => panic!(),
};
assert!(tx.poll_canceled(cx).is_ready());
@@ -101,6 +106,11 @@ fn is_canceled() {
#[test]
fn cancel_sends() {
+ #[cfg(miri)]
+ const N: usize = 100;
+ #[cfg(not(miri))]
+ const N: usize = 20000;
+
let (tx, rx) = mpsc::channel::<Sender<_>>();
let t = thread::spawn(move || {
for otx in rx {
@@ -108,7 +118,7 @@ fn cancel_sends() {
}
});
- for _ in 0..20000 {
+ for _ in 0..N {
let (otx, mut orx) = oneshot::channel::<u32>();
tx.send(otx).unwrap();