aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJeff Vander Stoep <jeffv@google.com>2023-02-06 16:18:44 +0000
committerAutomerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com>2023-02-06 16:18:44 +0000
commit65647f91f23b1650e6783e09c178803ec888efb4 (patch)
treed585fc992b047e0d1fb217f44c39d241d2c81c72
parent9fd106efa0a96f20540601c4e3ac9a10594b1282 (diff)
parenta77df93fec1b865e952e7a931ea3cd129aca6a46 (diff)
downloadtokio-65647f91f23b1650e6783e09c178803ec888efb4.tar.gz
Upgrade tokio to 1.25.0 am: a77df93fec
Original change: https://android-review.googlesource.com/c/platform/external/rust/crates/tokio/+/2421506 Change-Id: I6550d7e74439e71f0b98771360133f82b75ff675 Signed-off-by: Automerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com>
-rw-r--r--.cargo_vcs_info.json2
-rw-r--r--Android.bp2
-rw-r--r--CHANGELOG.md112
-rw-r--r--Cargo.toml6
-rw-r--r--Cargo.toml.orig6
-rw-r--r--LICENSE2
-rw-r--r--METADATA10
-rw-r--r--README.md2
-rw-r--r--build.rs53
-rw-r--r--docs/reactor-refactor.md4
-rw-r--r--patches/panic_unwind_tests.patch45
-rw-r--r--src/fs/file/tests.rs8
-rw-r--r--src/fs/read_dir.rs91
-rw-r--r--src/io/blocking.rs2
-rw-r--r--src/io/split.rs5
-rw-r--r--src/io/stdio_common.rs3
-rw-r--r--src/io/util/read.rs2
-rw-r--r--src/lib.rs15
-rw-r--r--src/loom/mocked.rs7
-rw-r--r--src/loom/std/atomic_u64.rs9
-rw-r--r--src/loom/std/atomic_u64_as_mutex.rs18
-rw-r--r--src/loom/std/atomic_u64_native.rs4
-rw-r--r--src/loom/std/atomic_u64_static_const_new.rs12
-rw-r--r--src/loom/std/atomic_u64_static_once_cell.rs57
-rw-r--r--src/loom/std/mod.rs24
-rw-r--r--src/macros/cfg.rs30
-rw-r--r--src/net/tcp/listener.rs13
-rw-r--r--src/net/tcp/socket.rs11
-rw-r--r--src/net/tcp/split_owned.rs4
-rw-r--r--src/net/tcp/stream.rs13
-rw-r--r--src/net/udp.rs13
-rw-r--r--src/net/unix/datagram/socket.rs31
-rw-r--r--src/net/unix/listener.rs46
-rw-r--r--src/net/unix/split_owned.rs4
-rw-r--r--src/net/unix/stream.rs28
-rw-r--r--src/net/windows/named_pipe.rs54
-rw-r--r--src/process/unix/mod.rs2
-rw-r--r--src/process/unix/orphan.rs2
-rw-r--r--src/runtime/blocking/mod.rs2
-rw-r--r--src/runtime/blocking/pool.rs9
-rw-r--r--src/runtime/blocking/schedule.rs49
-rw-r--r--src/runtime/builder.rs26
-rw-r--r--src/runtime/context.rs22
-rw-r--r--src/runtime/driver.rs9
-rw-r--r--src/runtime/handle.rs6
-rw-r--r--src/runtime/io/mod.rs16
-rw-r--r--src/runtime/io/registration.rs26
-rw-r--r--src/runtime/io/scheduled_io.rs43
-rw-r--r--src/runtime/metrics/batch.rs13
-rw-r--r--src/runtime/metrics/mock.rs1
-rw-r--r--src/runtime/metrics/runtime.rs59
-rw-r--r--src/runtime/metrics/worker.rs6
-rw-r--r--src/runtime/mod.rs3
-rw-r--r--src/runtime/runtime.rs6
-rw-r--r--src/runtime/scheduler/multi_thread/queue.rs3
-rw-r--r--src/runtime/task/harness.rs49
-rw-r--r--src/runtime/task/id.rs87
-rw-r--r--src/runtime/task/join.rs6
-rw-r--r--src/runtime/task/mod.rs136
-rw-r--r--src/runtime/task/state.rs8
-rw-r--r--src/runtime/tests/loom_blocking.rs21
-rw-r--r--src/runtime/tests/loom_queue.rs2
-rw-r--r--src/runtime/tests/mod.rs20
-rw-r--r--src/runtime/tests/task.rs2
-rw-r--r--src/runtime/thread_id.rs31
-rw-r--r--src/runtime/time/mod.rs2
-rw-r--r--src/signal/registry.rs5
-rw-r--r--src/signal/unix.rs3
-rw-r--r--src/sync/broadcast.rs98
-rw-r--r--src/sync/mod.rs8
-rw-r--r--src/sync/mpsc/block.rs143
-rw-r--r--src/sync/mpsc/list.rs2
-rw-r--r--src/sync/mpsc/mod.rs6
-rw-r--r--src/task/join_set.rs24
-rw-r--r--src/task/local.rs47
-rw-r--r--src/task/spawn.rs6
-rw-r--r--src/time/clock.rs19
-rw-r--r--src/time/sleep.rs2
-rw-r--r--src/util/linked_list.rs2
-rw-r--r--src/util/mod.rs7
-rw-r--r--src/util/once_cell.rs4
-rw-r--r--tests/_require_full.rs8
-rw-r--r--tests/buffered.rs4
-rw-r--r--tests/io_driver.rs2
-rw-r--r--tests/macros_join.rs2
-rw-r--r--tests/macros_select.rs2
-rw-r--r--tests/macros_try_join.rs2
-rw-r--r--tests/rt_common.rs2
-rw-r--r--tests/rt_metrics.rs19
-rw-r--r--tests/support/leaked_buffers.rs6
-rw-r--r--tests/support/panic.rs8
-rw-r--r--tests/sync_broadcast.rs60
-rw-r--r--tests/sync_once_cell.rs355
-rw-r--r--tests/task_blocking.rs83
-rw-r--r--tests/task_join_set.rs95
-rw-r--r--tests/tcp_peek.rs2
96 files changed, 1716 insertions, 725 deletions
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json
index 624eb53..fb46d1e 100644
--- a/.cargo_vcs_info.json
+++ b/.cargo_vcs_info.json
@@ -1,6 +1,6 @@
{
"git": {
- "sha1": "3ce5a2681c734e134c2aa6d6cf91b8d2631bd82b"
+ "sha1": "88b1eb54fb66461b9f3524f4b5316241a019279a"
},
"path_in_vcs": "tokio"
} \ No newline at end of file
diff --git a/Android.bp b/Android.bp
index c430388..7d066e6 100644
--- a/Android.bp
+++ b/Android.bp
@@ -23,7 +23,7 @@ rust_library {
host_supported: true,
crate_name: "tokio",
cargo_env_compat: true,
- cargo_pkg_version: "1.23.0",
+ cargo_pkg_version: "1.25.0",
srcs: ["src/lib.rs"],
edition: "2018",
features: [
diff --git a/CHANGELOG.md b/CHANGELOG.md
index f930dfc..39a57fd 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,75 @@
+# 1.25.0 (January 28, 2023)
+
+### Fixed
+
+- rt: fix runtime metrics reporting ([#5330])
+
+### Added
+
+- sync: add `broadcast::Sender::len` ([#5343])
+
+### Changed
+
+- fs: increase maximum read buffer size to 2MiB ([#5397])
+
+[#5330]: https://github.com/tokio-rs/tokio/pull/5330
+[#5343]: https://github.com/tokio-rs/tokio/pull/5343
+[#5397]: https://github.com/tokio-rs/tokio/pull/5397
+
+# 1.24.2 (January 17, 2023)
+
+Forward ports 1.18.5 changes.
+
+### Fixed
+
+- io: fix unsoundness in `ReadHalf::unsplit` ([#5375])
+
+[#5375]: https://github.com/tokio-rs/tokio/pull/5375
+
+# 1.24.1 (January 6, 2022)
+
+This release fixes a compilation failure on targets without `AtomicU64` when using rustc older than 1.63. ([#5356])
+
+[#5356]: https://github.com/tokio-rs/tokio/pull/5356
+
+# 1.24.0 (January 5, 2022)
+
+### Fixed
+ - rt: improve native `AtomicU64` support detection ([#5284])
+
+### Added
+ - rt: add configuration option for max number of I/O events polled from the OS
+ per tick ([#5186])
+ - rt: add an environment variable for configuring the default number of worker
+ threads per runtime instance ([#4250])
+
+### Changed
+ - sync: reduce MPSC channel stack usage ([#5294])
+ - io: reduce lock contention in I/O operations ([#5300])
+ - fs: speed up `read_dir()` by chunking operations ([#5309])
+ - rt: use internal `ThreadId` implementation ([#5329])
+ - test: don't auto-advance time when a `spawn_blocking` task is running ([#5115])
+
+[#5186]: https://github.com/tokio-rs/tokio/pull/5186
+[#5294]: https://github.com/tokio-rs/tokio/pull/5294
+[#5284]: https://github.com/tokio-rs/tokio/pull/5284
+[#4250]: https://github.com/tokio-rs/tokio/pull/4250
+[#5300]: https://github.com/tokio-rs/tokio/pull/5300
+[#5329]: https://github.com/tokio-rs/tokio/pull/5329
+[#5115]: https://github.com/tokio-rs/tokio/pull/5115
+[#5309]: https://github.com/tokio-rs/tokio/pull/5309
+
+# 1.23.1 (January 4, 2022)
+
+This release forward ports changes from 1.18.4.
+
+### Fixed
+
+- net: fix Windows named pipe server builder to maintain option when toggling
+ pipe mode ([#5336]).
+
+[#5336]: https://github.com/tokio-rs/tokio/pull/5336
+
# 1.23.0 (December 5, 2022)
### Fixed
@@ -262,6 +334,27 @@ wasm32-wasi target is given unstable support for the `net` feature.
[#4956]: https://github.com/tokio-rs/tokio/pull/4956
[#4959]: https://github.com/tokio-rs/tokio/pull/4959
+# 1.20.4 (January 17, 2023)
+
+Forward ports 1.18.5 changes.
+
+### Fixed
+
+- io: fix unsoundness in `ReadHalf::unsplit` ([#5375])
+
+[#5375]: https://github.com/tokio-rs/tokio/pull/5375
+
+# 1.20.3 (January 3, 2022)
+
+This release forward ports changes from 1.18.4.
+
+### Fixed
+
+- net: fix Windows named pipe server builder to maintain option when toggling
+ pipe mode ([#5336]).
+
+[#5336]: https://github.com/tokio-rs/tokio/pull/5336
+
# 1.20.2 (September 27, 2022)
This release removes the dependency on the `once_cell` crate to restore the MSRV
@@ -387,6 +480,23 @@ This release fixes a bug in `Notified::enable`. ([#4747])
[#4729]: https://github.com/tokio-rs/tokio/pull/4729
[#4739]: https://github.com/tokio-rs/tokio/pull/4739
+# 1.18.5 (January 17, 2023)
+
+### Fixed
+
+- io: fix unsoundness in `ReadHalf::unsplit` ([#5375])
+
+[#5375]: https://github.com/tokio-rs/tokio/pull/5375
+
+# 1.18.4 (January 3, 2022)
+
+### Fixed
+
+- net: fix Windows named pipe server builder to maintain option when toggling
+ pipe mode ([#5336]).
+
+[#5336]: https://github.com/tokio-rs/tokio/pull/5336
+
# 1.18.3 (September 27, 2022)
This release removes the dependency on the `once_cell` crate to restore the MSRV
@@ -514,7 +624,7 @@ performance improvements.
- time: use bit manipulation instead of modulo to improve performance ([#4480])
- net: use `std::future::Ready` instead of our own `Ready` future ([#4271])
- replace deprecated `atomic::spin_loop_hint` with `hint::spin_loop` ([#4491])
-- fix miri failures in intrusive linked lists ([#4397])
+- fix miri failures in intrusive linked lists ([#4397])
### Documented
diff --git a/Cargo.toml b/Cargo.toml
index 4c10770..2ea9473 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -13,7 +13,7 @@
edition = "2018"
rust-version = "1.49"
name = "tokio"
-version = "1.23.0"
+version = "1.25.0"
authors = ["Tokio Contributors <team@tokio.rs>"]
description = """
An event-driven, non-blocking I/O platform for writing asynchronous I/O
@@ -203,7 +203,7 @@ version = "1"
version = "0.4"
[target."cfg(target_os = \"freebsd\")".dev-dependencies.mio-aio]
-version = "0.6.0"
+version = "0.7.0"
features = ["tokio"]
[target."cfg(tokio_unstable)".dependencies.tracing]
@@ -224,7 +224,7 @@ optional = true
version = "0.2.42"
[target."cfg(unix)".dev-dependencies.nix]
-version = "0.24"
+version = "0.26"
features = [
"fs",
"socket",
diff --git a/Cargo.toml.orig b/Cargo.toml.orig
index 78cba81..0f6d30a 100644
--- a/Cargo.toml.orig
+++ b/Cargo.toml.orig
@@ -6,7 +6,7 @@ name = "tokio"
# - README.md
# - Update CHANGELOG.md.
# - Create "v1.x.y" git tag.
-version = "1.23.0"
+version = "1.25.0"
edition = "2018"
rust-version = "1.49"
authors = ["Tokio Contributors <team@tokio.rs>"]
@@ -122,7 +122,7 @@ signal-hook-registry = { version = "1.1.1", optional = true }
[target.'cfg(unix)'.dev-dependencies]
libc = { version = "0.2.42" }
-nix = { version = "0.24", default-features = false, features = ["fs", "socket"] }
+nix = { version = "0.26", default-features = false, features = ["fs", "socket"] }
[target.'cfg(windows)'.dependencies.windows-sys]
version = "0.42.0"
@@ -157,7 +157,7 @@ rand = "0.8.0"
wasm-bindgen-test = "0.3.0"
[target.'cfg(target_os = "freebsd")'.dev-dependencies]
-mio-aio = { version = "0.6.0", features = ["tokio"] }
+mio-aio = { version = "0.7.0", features = ["tokio"] }
[target.'cfg(loom)'.dev-dependencies]
loom = { version = "0.5.2", features = ["futures", "checkpoint"] }
diff --git a/LICENSE b/LICENSE
index 8af5baf..8bdf6bd 100644
--- a/LICENSE
+++ b/LICENSE
@@ -1,4 +1,4 @@
-Copyright (c) 2022 Tokio Contributors
+Copyright (c) 2023 Tokio Contributors
Permission is hereby granted, free of charge, to any
person obtaining a copy of this software and associated
diff --git a/METADATA b/METADATA
index 4be4653..b8a5d8a 100644
--- a/METADATA
+++ b/METADATA
@@ -11,13 +11,13 @@ third_party {
}
url {
type: ARCHIVE
- value: "https://static.crates.io/crates/tokio/tokio-1.23.0.crate"
+ value: "https://static.crates.io/crates/tokio/tokio-1.25.0.crate"
}
- version: "1.23.0"
+ version: "1.25.0"
license_type: NOTICE
last_upgrade_date {
- year: 2022
- month: 12
- day: 12
+ year: 2023
+ month: 2
+ day: 6
}
}
diff --git a/README.md b/README.md
index 3e51cf5..462e6e8 100644
--- a/README.md
+++ b/README.md
@@ -56,7 +56,7 @@ Make sure you activated the full features of the tokio crate on Cargo.toml:
```toml
[dependencies]
-tokio = { version = "1.23.0", features = ["full"] }
+tokio = { version = "1.25.0", features = ["full"] }
```
Then, on your main.rs:
diff --git a/build.rs b/build.rs
index 93b0509..ddade28 100644
--- a/build.rs
+++ b/build.rs
@@ -24,10 +24,26 @@ const CONST_MUTEX_NEW_PROBE: &str = r#"
}
"#;
+const TARGET_HAS_ATOMIC_PROBE: &str = r#"
+{
+ #[cfg(target_has_atomic = "ptr")]
+ let _ = ();
+}
+"#;
+
+const TARGET_ATOMIC_U64_PROBE: &str = r#"
+{
+ #[allow(unused_imports)]
+ use std::sync::atomic::AtomicU64 as _;
+}
+"#;
+
fn main() {
let mut enable_const_thread_local = false;
let mut enable_addr_of = false;
+ let mut enable_target_has_atomic = false;
let mut enable_const_mutex_new = false;
+ let mut target_needs_atomic_u64_fallback = false;
match AutoCfg::new() {
Ok(ac) => {
@@ -66,6 +82,27 @@ fn main() {
}
}
+ // The `target_has_atomic` cfg was stabilized in 1.60.
+ if ac.probe_rustc_version(1, 61) {
+ enable_target_has_atomic = true;
+ } else if ac.probe_rustc_version(1, 60) {
+ // This compiler claims to be 1.60, but there are some nightly
+ // compilers that claim to be 1.60 without supporting the
+ // feature. Explicitly probe to check if code using them
+ // compiles.
+ //
+ // The oldest nightly that supports the feature is 2022-02-11.
+ if ac.probe_expression(TARGET_HAS_ATOMIC_PROBE) {
+ enable_target_has_atomic = true;
+ }
+ }
+
+ // If we can't tell using `target_has_atomic`, tell if the target
+ // has `AtomicU64` by trying to use it.
+ if !enable_target_has_atomic && !ac.probe_expression(TARGET_ATOMIC_U64_PROBE) {
+ target_needs_atomic_u64_fallback = true;
+ }
+
// The `Mutex::new` method was made const in 1.63.
if ac.probe_rustc_version(1, 64) {
enable_const_mutex_new = true;
@@ -109,6 +146,14 @@ fn main() {
autocfg::emit("tokio_no_addr_of")
}
+ if !enable_target_has_atomic {
+ // To disable this feature on compilers that support it, you can
+ // explicitly pass this flag with the following environment variable:
+ //
+ // RUSTFLAGS="--cfg tokio_no_target_has_atomic"
+ autocfg::emit("tokio_no_target_has_atomic")
+ }
+
if !enable_const_mutex_new {
// To disable this feature on compilers that support it, you can
// explicitly pass this flag with the following environment variable:
@@ -117,6 +162,14 @@ fn main() {
autocfg::emit("tokio_no_const_mutex_new")
}
+ if target_needs_atomic_u64_fallback {
+ // To disable this feature on compilers that support it, you can
+ // explicitly pass this flag with the following environment variable:
+ //
+ // RUSTFLAGS="--cfg tokio_no_atomic_u64"
+ autocfg::emit("tokio_no_atomic_u64")
+ }
+
let target = ::std::env::var("TARGET").unwrap_or_default();
// We emit cfgs instead of using `target_family = "wasm"` that requires Rust 1.54.
diff --git a/docs/reactor-refactor.md b/docs/reactor-refactor.md
index 3005afc..77e64f4 100644
--- a/docs/reactor-refactor.md
+++ b/docs/reactor-refactor.md
@@ -188,12 +188,12 @@ readiness, the driver's tick is packed into the atomic `usize`.
The `ScheduledIo` readiness `AtomicUsize` is structured as:
```
-| reserved | generation | driver tick | readiness |
+| shutdown | generation | driver tick | readiness |
|----------+------------+--------------+-----------|
| 1 bit | 7 bits + 8 bits + 16 bits |
```
-The `reserved` and `generation` components exist today.
+The `shutdown` and `generation` components exist today.
The `readiness()` function returns a `ReadyEvent` value. This value includes the
`tick` component read with the resource's readiness value. When
diff --git a/patches/panic_unwind_tests.patch b/patches/panic_unwind_tests.patch
index adf431e..c11065b 100644
--- a/patches/panic_unwind_tests.patch
+++ b/patches/panic_unwind_tests.patch
@@ -1,24 +1,53 @@
+diff --git a/patches/panic_unwind_tests.patch b/patches/panic_unwind_tests.patch
+index adf431e..e69de29 100644
+--- a/patches/panic_unwind_tests.patch
++++ b/patches/panic_unwind_tests.patch
+@@ -1,24 +0,0 @@
+-diff --git a/tests/sync_broadcast.rs b/tests/sync_broadcast.rs
+-index 9aa3484..53ee7d8 100644
+---- a/tests/sync_broadcast.rs
+-+++ b/tests/sync_broadcast.rs
+-@@ -292,6 +292,7 @@ fn capacity_too_big() {
+-
+- #[test]
+-+#[cfg(panic = "unwind")]
+- #[cfg(not(tokio_wasm))] // wasm currently doesn't support unwinding
+- fn panic_in_clone() {
+- use std::panic::{self, AssertUnwindSafe};
+-
+-diff --git a/tests/sync_watch.rs b/tests/sync_watch.rs
+-index 34f9b78..e8eacce 100644
+---- a/tests/sync_watch.rs
+-+++ b/tests/sync_watch.rs
+-@@ -214,6 +214,7 @@ fn reopened_after_subscribe() {
+-
+- #[test]
+-+#[cfg(panic = "unwind")]
+- #[cfg(not(tokio_wasm))] // wasm currently doesn't support unwinding
+- fn send_modify_panic() {
+- let (tx, mut rx) = watch::channel("one");
+-
diff --git a/tests/sync_broadcast.rs b/tests/sync_broadcast.rs
-index 9aa3484..53ee7d8 100644
+index 67c378b..cd66924 100644
--- a/tests/sync_broadcast.rs
+++ b/tests/sync_broadcast.rs
-@@ -292,6 +292,7 @@ fn capacity_too_big() {
-
+@@ -291,6 +291,7 @@ fn capacity_too_big() {
+ }
+
#[test]
+#[cfg(panic = "unwind")]
#[cfg(not(tokio_wasm))] // wasm currently doesn't support unwinding
fn panic_in_clone() {
use std::panic::{self, AssertUnwindSafe};
-
diff --git a/tests/sync_watch.rs b/tests/sync_watch.rs
-index 34f9b78..e8eacce 100644
+index 34f9b78..d4f8ce8 100644
--- a/tests/sync_watch.rs
+++ b/tests/sync_watch.rs
-@@ -214,6 +214,7 @@ fn reopened_after_subscribe() {
-
+@@ -213,6 +213,7 @@ fn reopened_after_subscribe() {
+ }
+
#[test]
+#[cfg(panic = "unwind")]
#[cfg(not(tokio_wasm))] // wasm currently doesn't support unwinding
fn send_modify_panic() {
let (tx, mut rx) = watch::channel("one");
-
diff --git a/src/fs/file/tests.rs b/src/fs/file/tests.rs
index 1c90a8d..7c61b3c 100644
--- a/src/fs/file/tests.rs
+++ b/src/fs/file/tests.rs
@@ -231,12 +231,12 @@ fn flush_while_idle() {
#[cfg_attr(miri, ignore)] // takes a really long time with miri
fn read_with_buffer_larger_than_max() {
// Chunks
- let chunk_a = 16 * 1024;
+ let chunk_a = crate::io::blocking::MAX_BUF;
let chunk_b = chunk_a * 2;
let chunk_c = chunk_a * 3;
let chunk_d = chunk_a * 4;
- assert_eq!(chunk_d / 1024, 64);
+ assert_eq!(chunk_d / 1024 / 1024, 8);
let mut data = vec![];
for i in 0..(chunk_d - 1) {
@@ -303,12 +303,12 @@ fn read_with_buffer_larger_than_max() {
#[cfg_attr(miri, ignore)] // takes a really long time with miri
fn write_with_buffer_larger_than_max() {
// Chunks
- let chunk_a = 16 * 1024;
+ let chunk_a = crate::io::blocking::MAX_BUF;
let chunk_b = chunk_a * 2;
let chunk_c = chunk_a * 3;
let chunk_d = chunk_a * 4;
- assert_eq!(chunk_d / 1024, 64);
+ assert_eq!(chunk_d / 1024 / 1024, 8);
let mut data = vec![];
for i in 0..(chunk_d - 1) {
diff --git a/src/fs/read_dir.rs b/src/fs/read_dir.rs
index 10ad150..9471e8c 100644
--- a/src/fs/read_dir.rs
+++ b/src/fs/read_dir.rs
@@ -1,5 +1,6 @@
use crate::fs::asyncify;
+use std::collections::VecDeque;
use std::ffi::OsString;
use std::fs::{FileType, Metadata};
use std::future::Future;
@@ -19,6 +20,8 @@ use crate::blocking::spawn_blocking;
#[cfg(not(test))]
use crate::blocking::JoinHandle;
+const CHUNK_SIZE: usize = 32;
+
/// Returns a stream over the entries within a directory.
///
/// This is an async version of [`std::fs::read_dir`](std::fs::read_dir)
@@ -29,9 +32,14 @@ use crate::blocking::JoinHandle;
/// [`spawn_blocking`]: crate::task::spawn_blocking
pub async fn read_dir(path: impl AsRef<Path>) -> io::Result<ReadDir> {
let path = path.as_ref().to_owned();
- let std = asyncify(|| std::fs::read_dir(path)).await?;
+ asyncify(|| -> io::Result<ReadDir> {
+ let mut std = std::fs::read_dir(path)?;
+ let mut buf = VecDeque::with_capacity(CHUNK_SIZE);
+ ReadDir::next_chunk(&mut buf, &mut std);
- Ok(ReadDir(State::Idle(Some(std))))
+ Ok(ReadDir(State::Idle(Some((buf, std)))))
+ })
+ .await
}
/// Reads the entries in a directory.
@@ -58,8 +66,8 @@ pub struct ReadDir(State);
#[derive(Debug)]
enum State {
- Idle(Option<std::fs::ReadDir>),
- Pending(JoinHandle<(Option<io::Result<std::fs::DirEntry>>, std::fs::ReadDir)>),
+ Idle(Option<(VecDeque<io::Result<DirEntry>>, std::fs::ReadDir)>),
+ Pending(JoinHandle<(VecDeque<io::Result<DirEntry>>, std::fs::ReadDir)>),
}
impl ReadDir {
@@ -94,29 +102,57 @@ impl ReadDir {
pub fn poll_next_entry(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<Option<DirEntry>>> {
loop {
match self.0 {
- State::Idle(ref mut std) => {
- let mut std = std.take().unwrap();
+ State::Idle(ref mut data) => {
+ let (buf, _) = data.as_mut().unwrap();
+
+ if let Some(ent) = buf.pop_front() {
+ return Poll::Ready(ent.map(Some));
+ };
+
+ let (mut buf, mut std) = data.take().unwrap();
self.0 = State::Pending(spawn_blocking(move || {
- let ret = std.next();
- (ret, std)
+ ReadDir::next_chunk(&mut buf, &mut std);
+ (buf, std)
}));
}
State::Pending(ref mut rx) => {
- let (ret, std) = ready!(Pin::new(rx).poll(cx))?;
- self.0 = State::Idle(Some(std));
+ let (mut buf, std) = ready!(Pin::new(rx).poll(cx))?;
- let ret = match ret {
- Some(Ok(std)) => Ok(Some(DirEntry(Arc::new(std)))),
+ let ret = match buf.pop_front() {
+ Some(Ok(x)) => Ok(Some(x)),
Some(Err(e)) => Err(e),
None => Ok(None),
};
+ self.0 = State::Idle(Some((buf, std)));
+
return Poll::Ready(ret);
}
}
}
}
+
+ fn next_chunk(buf: &mut VecDeque<io::Result<DirEntry>>, std: &mut std::fs::ReadDir) {
+ for ret in std.by_ref().take(CHUNK_SIZE) {
+ let success = ret.is_ok();
+
+ buf.push_back(ret.map(|std| DirEntry {
+ #[cfg(not(any(
+ target_os = "solaris",
+ target_os = "illumos",
+ target_os = "haiku",
+ target_os = "vxworks"
+ )))]
+ file_type: std.file_type().ok(),
+ std: Arc::new(std),
+ }));
+
+ if !success {
+ break;
+ }
+ }
+ }
}
feature! {
@@ -160,7 +196,16 @@ feature! {
/// filesystem. Each entry can be inspected via methods to learn about the full
/// path or possibly other metadata through per-platform extension traits.
#[derive(Debug)]
-pub struct DirEntry(Arc<std::fs::DirEntry>);
+pub struct DirEntry {
+ #[cfg(not(any(
+ target_os = "solaris",
+ target_os = "illumos",
+ target_os = "haiku",
+ target_os = "vxworks"
+ )))]
+ file_type: Option<FileType>,
+ std: Arc<std::fs::DirEntry>,
+}
impl DirEntry {
/// Returns the full path to the file that this entry represents.
@@ -193,7 +238,7 @@ impl DirEntry {
///
/// The exact text, of course, depends on what files you have in `.`.
pub fn path(&self) -> PathBuf {
- self.0.path()
+ self.std.path()
}
/// Returns the bare file name of this directory entry without any other
@@ -214,7 +259,7 @@ impl DirEntry {
/// # }
/// ```
pub fn file_name(&self) -> OsString {
- self.0.file_name()
+ self.std.file_name()
}
/// Returns the metadata for the file that this entry points at.
@@ -248,7 +293,7 @@ impl DirEntry {
/// # }
/// ```
pub async fn metadata(&self) -> io::Result<Metadata> {
- let std = self.0.clone();
+ let std = self.std.clone();
asyncify(move || std.metadata()).await
}
@@ -283,13 +328,23 @@ impl DirEntry {
/// # }
/// ```
pub async fn file_type(&self) -> io::Result<FileType> {
- let std = self.0.clone();
+ #[cfg(not(any(
+ target_os = "solaris",
+ target_os = "illumos",
+ target_os = "haiku",
+ target_os = "vxworks"
+ )))]
+ if let Some(file_type) = self.file_type {
+ return Ok(file_type);
+ }
+
+ let std = self.std.clone();
asyncify(move || std.file_type()).await
}
/// Returns a reference to the underlying `std::fs::DirEntry`.
#[cfg(unix)]
pub(super) fn as_inner(&self) -> &std::fs::DirEntry {
- &self.0
+ &self.std
}
}
diff --git a/src/io/blocking.rs b/src/io/blocking.rs
index f6db450..416573e 100644
--- a/src/io/blocking.rs
+++ b/src/io/blocking.rs
@@ -26,7 +26,7 @@ pub(crate) struct Buf {
pos: usize,
}
-pub(crate) const MAX_BUF: usize = 16 * 1024;
+pub(crate) const MAX_BUF: usize = 2 * 1024 * 1024;
#[derive(Debug)]
enum State<T> {
diff --git a/src/io/split.rs b/src/io/split.rs
index 2e0da95..f067b65 100644
--- a/src/io/split.rs
+++ b/src/io/split.rs
@@ -75,7 +75,10 @@ impl<T> ReadHalf<T> {
/// This can be checked ahead of time by comparing the stream ID
/// of the two halves.
#[track_caller]
- pub fn unsplit(self, wr: WriteHalf<T>) -> T {
+ pub fn unsplit(self, wr: WriteHalf<T>) -> T
+ where
+ T: Unpin,
+ {
if self.is_pair_of(&wr) {
drop(wr);
diff --git a/src/io/stdio_common.rs b/src/io/stdio_common.rs
index 2715ba7..b1cc61d 100644
--- a/src/io/stdio_common.rs
+++ b/src/io/stdio_common.rs
@@ -108,14 +108,13 @@ where
#[cfg(test)]
#[cfg(not(loom))]
mod tests {
+ use crate::io::blocking::MAX_BUF;
use crate::io::AsyncWriteExt;
use std::io;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
- const MAX_BUF: usize = 16 * 1024;
-
struct TextMockWriter;
impl crate::io::AsyncWrite for TextMockWriter {
diff --git a/src/io/util/read.rs b/src/io/util/read.rs
index edc9d5a..a1f9c8a 100644
--- a/src/io/util/read.rs
+++ b/src/io/util/read.rs
@@ -48,7 +48,7 @@ where
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
let me = self.project();
- let mut buf = ReadBuf::new(*me.buf);
+ let mut buf = ReadBuf::new(me.buf);
ready!(Pin::new(me.reader).poll_read(cx, &mut buf))?;
Poll::Ready(Ok(buf.filled().len()))
}
diff --git a/src/lib.rs b/src/lib.rs
index e745fe9..05767d0 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -174,12 +174,15 @@
//! swapping the currently running task on each thread. However, this kind of
//! swapping can only happen at `.await` points, so code that spends a long time
//! without reaching an `.await` will prevent other tasks from running. To
-//! combat this, Tokio provides two kinds of threads: Core threads and blocking
-//! threads. The core threads are where all asynchronous code runs, and Tokio
-//! will by default spawn one for each CPU core. The blocking threads are
-//! spawned on demand, can be used to run blocking code that would otherwise
-//! block other tasks from running and are kept alive when not used for a certain
-//! amount of time which can be configured with [`thread_keep_alive`].
+//! combat this, Tokio provides two kinds of threads: Core threads and blocking threads.
+//!
+//! The core threads are where all asynchronous code runs, and Tokio will by default
+//! spawn one for each CPU core. You can use the environment variable `TOKIO_WORKER_THREADS`
+//! to override the default value.
+//!
+//! The blocking threads are spawned on demand, can be used to run blocking code
+//! that would otherwise block other tasks from running and are kept alive when
+//! not used for a certain amount of time which can be configured with [`thread_keep_alive`].
//! Since it is not possible for Tokio to swap out blocking tasks, like it
//! can do with asynchronous code, the upper limit on the number of blocking
//! threads is very large. These limits can be configured on the [`Builder`].
diff --git a/src/loom/mocked.rs b/src/loom/mocked.rs
index 1c4a32d..56dc1a0 100644
--- a/src/loom/mocked.rs
+++ b/src/loom/mocked.rs
@@ -25,6 +25,13 @@ pub(crate) mod sync {
}
}
pub(crate) use loom::sync::*;
+
+ pub(crate) mod atomic {
+ pub(crate) use loom::sync::atomic::*;
+
+ // TODO: implement a loom version
+ pub(crate) type StaticAtomicU64 = std::sync::atomic::AtomicU64;
+ }
}
pub(crate) mod rand {
diff --git a/src/loom/std/atomic_u64.rs b/src/loom/std/atomic_u64.rs
index 5d1d8a8..ce391be 100644
--- a/src/loom/std/atomic_u64.rs
+++ b/src/loom/std/atomic_u64.rs
@@ -7,12 +7,13 @@
// `#[cfg(target_has_atomic = "64")]`.
// Refs: https://github.com/rust-lang/rust/tree/master/src/librustc_target
cfg_has_atomic_u64! {
- pub(crate) use std::sync::atomic::AtomicU64;
+ #[path = "atomic_u64_native.rs"]
+ mod imp;
}
cfg_not_has_atomic_u64! {
#[path = "atomic_u64_as_mutex.rs"]
- mod atomic_u64_as_mutex;
-
- pub(crate) use atomic_u64_as_mutex::AtomicU64;
+ mod imp;
}
+
+pub(crate) use imp::{AtomicU64, StaticAtomicU64};
diff --git a/src/loom/std/atomic_u64_as_mutex.rs b/src/loom/std/atomic_u64_as_mutex.rs
index 84ddff0..9b3b6fa 100644
--- a/src/loom/std/atomic_u64_as_mutex.rs
+++ b/src/loom/std/atomic_u64_as_mutex.rs
@@ -1,18 +1,24 @@
use crate::loom::sync::Mutex;
use std::sync::atomic::Ordering;
+cfg_has_const_mutex_new! {
+ #[path = "atomic_u64_static_const_new.rs"]
+ mod static_macro;
+}
+
+cfg_not_has_const_mutex_new! {
+ #[path = "atomic_u64_static_once_cell.rs"]
+ mod static_macro;
+}
+
+pub(crate) use static_macro::StaticAtomicU64;
+
#[derive(Debug)]
pub(crate) struct AtomicU64 {
inner: Mutex<u64>,
}
impl AtomicU64 {
- pub(crate) fn new(val: u64) -> Self {
- Self {
- inner: Mutex::new(val),
- }
- }
-
pub(crate) fn load(&self, _: Ordering) -> u64 {
*self.inner.lock()
}
diff --git a/src/loom/std/atomic_u64_native.rs b/src/loom/std/atomic_u64_native.rs
new file mode 100644
index 0000000..08adb28
--- /dev/null
+++ b/src/loom/std/atomic_u64_native.rs
@@ -0,0 +1,4 @@
+pub(crate) use std::sync::atomic::{AtomicU64, Ordering};
+
+/// Alias `AtomicU64` to `StaticAtomicU64`
+pub(crate) type StaticAtomicU64 = AtomicU64;
diff --git a/src/loom/std/atomic_u64_static_const_new.rs b/src/loom/std/atomic_u64_static_const_new.rs
new file mode 100644
index 0000000..a421534
--- /dev/null
+++ b/src/loom/std/atomic_u64_static_const_new.rs
@@ -0,0 +1,12 @@
+use super::AtomicU64;
+use crate::loom::sync::Mutex;
+
+pub(crate) type StaticAtomicU64 = AtomicU64;
+
+impl AtomicU64 {
+ pub(crate) const fn new(val: u64) -> Self {
+ Self {
+ inner: Mutex::const_new(val),
+ }
+ }
+}
diff --git a/src/loom/std/atomic_u64_static_once_cell.rs b/src/loom/std/atomic_u64_static_once_cell.rs
new file mode 100644
index 0000000..40c6172
--- /dev/null
+++ b/src/loom/std/atomic_u64_static_once_cell.rs
@@ -0,0 +1,57 @@
+use super::AtomicU64;
+use crate::loom::sync::{atomic::Ordering, Mutex};
+use crate::util::once_cell::OnceCell;
+
+pub(crate) struct StaticAtomicU64 {
+ init: u64,
+ cell: OnceCell<Mutex<u64>>,
+}
+
+impl AtomicU64 {
+ pub(crate) fn new(val: u64) -> Self {
+ Self {
+ inner: Mutex::new(val),
+ }
+ }
+}
+
+impl StaticAtomicU64 {
+ pub(crate) const fn new(val: u64) -> StaticAtomicU64 {
+ StaticAtomicU64 {
+ init: val,
+ cell: OnceCell::new(),
+ }
+ }
+
+ pub(crate) fn load(&self, order: Ordering) -> u64 {
+ *self.inner().lock()
+ }
+
+ pub(crate) fn fetch_add(&self, val: u64, order: Ordering) -> u64 {
+ let mut lock = self.inner().lock();
+ let prev = *lock;
+ *lock = prev + val;
+ prev
+ }
+
+ pub(crate) fn compare_exchange_weak(
+ &self,
+ current: u64,
+ new: u64,
+ _success: Ordering,
+ _failure: Ordering,
+ ) -> Result<u64, u64> {
+ let mut lock = self.inner().lock();
+
+ if *lock == current {
+ *lock = new;
+ Ok(current)
+ } else {
+ Err(*lock)
+ }
+ }
+
+ fn inner(&self) -> &Mutex<u64> {
+ self.cell.get(|| Mutex::new(self.init))
+ }
+}
diff --git a/src/loom/std/mod.rs b/src/loom/std/mod.rs
index 1fc0032..6bd1ad9 100644
--- a/src/loom/std/mod.rs
+++ b/src/loom/std/mod.rs
@@ -71,7 +71,7 @@ pub(crate) mod sync {
pub(crate) mod atomic {
pub(crate) use crate::loom::std::atomic_u16::AtomicU16;
pub(crate) use crate::loom::std::atomic_u32::AtomicU32;
- pub(crate) use crate::loom::std::atomic_u64::AtomicU64;
+ pub(crate) use crate::loom::std::atomic_u64::{AtomicU64, StaticAtomicU64};
pub(crate) use crate::loom::std::atomic_usize::AtomicUsize;
pub(crate) use std::sync::atomic::{fence, AtomicBool, AtomicPtr, AtomicU8, Ordering};
@@ -81,7 +81,27 @@ pub(crate) mod sync {
pub(crate) mod sys {
#[cfg(feature = "rt-multi-thread")]
pub(crate) fn num_cpus() -> usize {
- usize::max(1, num_cpus::get())
+ const ENV_WORKER_THREADS: &str = "TOKIO_WORKER_THREADS";
+
+ match std::env::var(ENV_WORKER_THREADS) {
+ Ok(s) => {
+ let n = s.parse().unwrap_or_else(|e| {
+ panic!(
+ "\"{}\" must be usize, error: {}, value: {}",
+ ENV_WORKER_THREADS, e, s
+ )
+ });
+ assert!(n > 0, "\"{}\" cannot be set to 0", ENV_WORKER_THREADS);
+ n
+ }
+ Err(std::env::VarError::NotPresent) => usize::max(1, num_cpus::get()),
+ Err(std::env::VarError::NotUnicode(e)) => {
+ panic!(
+ "\"{}\" must be valid unicode, error: {:?}",
+ ENV_WORKER_THREADS, e
+ )
+ }
+ }
}
#[cfg(not(feature = "rt-multi-thread"))]
diff --git a/src/macros/cfg.rs b/src/macros/cfg.rs
index 2eea344..1c66d24 100644
--- a/src/macros/cfg.rs
+++ b/src/macros/cfg.rs
@@ -461,14 +461,14 @@ macro_rules! cfg_not_coop {
macro_rules! cfg_has_atomic_u64 {
($($item:item)*) => {
$(
- #[cfg(not(any(
- target_arch = "arm",
- target_arch = "mips",
- target_arch = "powerpc",
- target_arch = "riscv32",
- tokio_wasm,
- tokio_no_atomic_u64,
- )))]
+ #[cfg_attr(
+ not(tokio_no_target_has_atomic),
+ cfg(all(target_has_atomic = "64", not(tokio_no_atomic_u64))
+ ))]
+ #[cfg_attr(
+ tokio_no_target_has_atomic,
+ cfg(not(tokio_no_atomic_u64))
+ )]
$item
)*
}
@@ -477,14 +477,14 @@ macro_rules! cfg_has_atomic_u64 {
macro_rules! cfg_not_has_atomic_u64 {
($($item:item)*) => {
$(
- #[cfg(any(
- target_arch = "arm",
- target_arch = "mips",
- target_arch = "powerpc",
- target_arch = "riscv32",
- tokio_wasm,
- tokio_no_atomic_u64,
+ #[cfg_attr(
+ not(tokio_no_target_has_atomic),
+ cfg(any(not(target_has_atomic = "64"), tokio_no_atomic_u64)
))]
+ #[cfg_attr(
+ tokio_no_target_has_atomic,
+ cfg(tokio_no_atomic_u64)
+ )]
$item
)*
}
diff --git a/src/net/tcp/listener.rs b/src/net/tcp/listener.rs
index 4a022fa..4441313 100644
--- a/src/net/tcp/listener.rs
+++ b/src/net/tcp/listener.rs
@@ -195,15 +195,22 @@ impl TcpListener {
/// Creates new `TcpListener` from a `std::net::TcpListener`.
///
/// This function is intended to be used to wrap a TCP listener from the
- /// standard library in the Tokio equivalent. The conversion assumes nothing
- /// about the underlying listener; it is left up to the user to set it in
- /// non-blocking mode.
+ /// standard library in the Tokio equivalent.
///
/// This API is typically paired with the `socket2` crate and the `Socket`
/// type to build up and customize a listener before it's shipped off to the
/// backing event loop. This allows configuration of options like
/// `SO_REUSEPORT`, binding to multiple addresses, etc.
///
+ /// # Notes
+ ///
+ /// The caller is responsible for ensuring that the listener is in
+ /// non-blocking mode. Otherwise all I/O operations on the listener
+ /// will block the thread, which will cause unexpected behavior.
+ /// Non-blocking mode can be set using [`set_nonblocking`].
+ ///
+ /// [`set_nonblocking`]: std::net::TcpListener::set_nonblocking
+ ///
/// # Examples
///
/// ```rust,no_run
diff --git a/src/net/tcp/socket.rs b/src/net/tcp/socket.rs
index 9453411..09349fe 100644
--- a/src/net/tcp/socket.rs
+++ b/src/net/tcp/socket.rs
@@ -670,6 +670,15 @@ impl TcpSocket {
/// [`std::net::TcpStream`]: struct@std::net::TcpStream
/// [`socket2`]: https://docs.rs/socket2/
///
+ /// # Notes
+ ///
+ /// The caller is responsible for ensuring that the socket is in
+ /// non-blocking mode. Otherwise all I/O operations on the socket
+ /// will block the thread, which will cause unexpected behavior.
+ /// Non-blocking mode can be set using [`set_nonblocking`].
+ ///
+ /// [`set_nonblocking`]: std::net::TcpStream::set_nonblocking
+ ///
/// # Examples
///
/// ```
@@ -678,8 +687,8 @@ impl TcpSocket {
///
/// #[tokio::main]
/// async fn main() -> std::io::Result<()> {
- ///
/// let socket2_socket = Socket::new(Domain::IPV4, Type::STREAM, None)?;
+ /// socket2_socket.set_nonblocking(true)?;
///
/// let socket = TcpSocket::from_std_stream(socket2_socket.into());
///
diff --git a/src/net/tcp/split_owned.rs b/src/net/tcp/split_owned.rs
index b2730e8..53fc5f0 100644
--- a/src/net/tcp/split_owned.rs
+++ b/src/net/tcp/split_owned.rs
@@ -490,12 +490,12 @@ impl AsyncWrite for OwnedWriteHalf {
impl AsRef<TcpStream> for OwnedReadHalf {
fn as_ref(&self) -> &TcpStream {
- &*self.inner
+ &self.inner
}
}
impl AsRef<TcpStream> for OwnedWriteHalf {
fn as_ref(&self) -> &TcpStream {
- &*self.inner
+ &self.inner
}
}
diff --git a/src/net/tcp/stream.rs b/src/net/tcp/stream.rs
index b7dd337..b17d33f 100644
--- a/src/net/tcp/stream.rs
+++ b/src/net/tcp/stream.rs
@@ -165,9 +165,16 @@ impl TcpStream {
/// Creates new `TcpStream` from a `std::net::TcpStream`.
///
/// This function is intended to be used to wrap a TCP stream from the
- /// standard library in the Tokio equivalent. The conversion assumes nothing
- /// about the underlying stream; it is left up to the user to set it in
- /// non-blocking mode.
+ /// standard library in the Tokio equivalent.
+ ///
+ /// # Notes
+ ///
+ /// The caller is responsible for ensuring that the stream is in
+ /// non-blocking mode. Otherwise all I/O operations on the stream
+ /// will block the thread, which will cause unexpected behavior.
+ /// Non-blocking mode can be set using [`set_nonblocking`].
+ ///
+ /// [`set_nonblocking`]: std::net::TcpStream::set_nonblocking
///
/// # Examples
///
diff --git a/src/net/udp.rs b/src/net/udp.rs
index af343f2..213d914 100644
--- a/src/net/udp.rs
+++ b/src/net/udp.rs
@@ -179,14 +179,21 @@ impl UdpSocket {
/// Creates new `UdpSocket` from a previously bound `std::net::UdpSocket`.
///
/// This function is intended to be used to wrap a UDP socket from the
- /// standard library in the Tokio equivalent. The conversion assumes nothing
- /// about the underlying socket; it is left up to the user to set it in
- /// non-blocking mode.
+ /// standard library in the Tokio equivalent.
///
/// This can be used in conjunction with socket2's `Socket` interface to
/// configure a socket before it's handed off, such as setting options like
/// `reuse_address` or binding to multiple addresses.
///
+ /// # Notes
+ ///
+ /// The caller is responsible for ensuring that the socket is in
+ /// non-blocking mode. Otherwise all I/O operations on the socket
+ /// will block the thread, which will cause unexpected behavior.
+ /// Non-blocking mode can be set using [`set_nonblocking`].
+ ///
+ /// [`set_nonblocking`]: std::net::UdpSocket::set_nonblocking
+ ///
/// # Panics
///
/// This function panics if thread-local runtime is not set.
diff --git a/src/net/unix/datagram/socket.rs b/src/net/unix/datagram/socket.rs
index 5e1453e..76c6b19 100644
--- a/src/net/unix/datagram/socket.rs
+++ b/src/net/unix/datagram/socket.rs
@@ -426,9 +426,16 @@ impl UnixDatagram {
/// Creates new `UnixDatagram` from a `std::os::unix::net::UnixDatagram`.
///
/// This function is intended to be used to wrap a UnixDatagram from the
- /// standard library in the Tokio equivalent. The conversion assumes
- /// nothing about the underlying datagram; it is left up to the user to set
- /// it in non-blocking mode.
+ /// standard library in the Tokio equivalent.
+ ///
+ /// # Notes
+ ///
+ /// The caller is responsible for ensuring that the socket is in
+ /// non-blocking mode. Otherwise all I/O operations on the socket
+ /// will block the thread, which will cause unexpected behavior.
+ /// Non-blocking mode can be set using [`set_nonblocking`].
+ ///
+ /// [`set_nonblocking`]: std::os::unix::net::UnixDatagram::set_nonblocking
///
/// # Panics
///
@@ -470,21 +477,19 @@ impl UnixDatagram {
/// Turns a [`tokio::net::UnixDatagram`] into a [`std::os::unix::net::UnixDatagram`].
///
/// The returned [`std::os::unix::net::UnixDatagram`] will have nonblocking
- /// mode set as `true`. Use [`set_nonblocking`] to change the blocking mode
+ /// mode set as `true`. Use [`set_nonblocking`] to change the blocking mode
/// if needed.
///
/// # Examples
///
/// ```rust,no_run
- /// use std::error::Error;
- ///
- /// #[tokio::main]
- /// async fn main() -> Result<(), Box<dyn Error>> {
- /// let tokio_socket = tokio::net::UnixDatagram::bind("127.0.0.1:0")?;
- /// let std_socket = tokio_socket.into_std()?;
- /// std_socket.set_nonblocking(false)?;
- /// Ok(())
- /// }
+ /// # use std::error::Error;
+ /// # async fn dox() -> Result<(), Box<dyn Error>> {
+ /// let tokio_socket = tokio::net::UnixDatagram::bind("/path/to/the/socket")?;
+ /// let std_socket = tokio_socket.into_std()?;
+ /// std_socket.set_nonblocking(false)?;
+ /// # Ok(())
+ /// # }
/// ```
///
/// [`tokio::net::UnixDatagram`]: UnixDatagram
diff --git a/src/net/unix/listener.rs b/src/net/unix/listener.rs
index fbea3e7..9887f73 100644
--- a/src/net/unix/listener.rs
+++ b/src/net/unix/listener.rs
@@ -73,9 +73,31 @@ impl UnixListener {
/// Creates new `UnixListener` from a `std::os::unix::net::UnixListener `.
///
/// This function is intended to be used to wrap a UnixListener from the
- /// standard library in the Tokio equivalent. The conversion assumes
- /// nothing about the underlying listener; it is left up to the user to set
- /// it in non-blocking mode.
+ /// standard library in the Tokio equivalent.
+ ///
+ /// # Notes
+ ///
+ /// The caller is responsible for ensuring that the listener is in
+ /// non-blocking mode. Otherwise all I/O operations on the listener
+ /// will block the thread, which will cause unexpected behavior.
+ /// Non-blocking mode can be set using [`set_nonblocking`].
+ ///
+ /// [`set_nonblocking`]: std::os::unix::net::UnixListener::set_nonblocking
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::net::UnixListener;
+ /// use std::os::unix::net::UnixListener as StdUnixListener;
+ /// # use std::error::Error;
+ ///
+ /// # async fn dox() -> Result<(), Box<dyn Error>> {
+ /// let std_listener = StdUnixListener::bind("/path/to/the/socket")?;
+ /// std_listener.set_nonblocking(true)?;
+ /// let listener = UnixListener::from_std(std_listener)?;
+ /// # Ok(())
+ /// # }
+ /// ```
///
/// # Panics
///
@@ -95,20 +117,18 @@ impl UnixListener {
/// Turns a [`tokio::net::UnixListener`] into a [`std::os::unix::net::UnixListener`].
///
/// The returned [`std::os::unix::net::UnixListener`] will have nonblocking mode
- /// set as `true`. Use [`set_nonblocking`] to change the blocking mode if needed.
+ /// set as `true`. Use [`set_nonblocking`] to change the blocking mode if needed.
///
/// # Examples
///
/// ```rust,no_run
- /// use std::error::Error;
- ///
- /// #[tokio::main]
- /// async fn main() -> Result<(), Box<dyn Error>> {
- /// let tokio_listener = tokio::net::UnixListener::bind("127.0.0.1:0")?;
- /// let std_listener = tokio_listener.into_std()?;
- /// std_listener.set_nonblocking(false)?;
- /// Ok(())
- /// }
+ /// # use std::error::Error;
+ /// # async fn dox() -> Result<(), Box<dyn Error>> {
+ /// let tokio_listener = tokio::net::UnixListener::bind("/path/to/the/socket")?;
+ /// let std_listener = tokio_listener.into_std()?;
+ /// std_listener.set_nonblocking(false)?;
+ /// # Ok(())
+ /// # }
/// ```
///
/// [`tokio::net::UnixListener`]: UnixListener
diff --git a/src/net/unix/split_owned.rs b/src/net/unix/split_owned.rs
index da41ced..2cb561d 100644
--- a/src/net/unix/split_owned.rs
+++ b/src/net/unix/split_owned.rs
@@ -398,12 +398,12 @@ impl AsyncWrite for OwnedWriteHalf {
impl AsRef<UnixStream> for OwnedReadHalf {
fn as_ref(&self) -> &UnixStream {
- &*self.inner
+ &self.inner
}
}
impl AsRef<UnixStream> for OwnedWriteHalf {
fn as_ref(&self) -> &UnixStream {
- &*self.inner
+ &self.inner
}
}
diff --git a/src/net/unix/stream.rs b/src/net/unix/stream.rs
index 2d27898..c249bf4 100644
--- a/src/net/unix/stream.rs
+++ b/src/net/unix/stream.rs
@@ -709,9 +709,31 @@ impl UnixStream {
/// Creates new `UnixStream` from a `std::os::unix::net::UnixStream`.
///
/// This function is intended to be used to wrap a UnixStream from the
- /// standard library in the Tokio equivalent. The conversion assumes
- /// nothing about the underlying stream; it is left up to the user to set
- /// it in non-blocking mode.
+ /// standard library in the Tokio equivalent.
+ ///
+ /// # Notes
+ ///
+ /// The caller is responsible for ensuring that the stream is in
+ /// non-blocking mode. Otherwise all I/O operations on the stream
+ /// will block the thread, which will cause unexpected behavior.
+ /// Non-blocking mode can be set using [`set_nonblocking`].
+ ///
+ /// [`set_nonblocking`]: std::os::unix::net::UnixStream::set_nonblocking
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::net::UnixStream;
+ /// use std::os::unix::net::UnixStream as StdUnixStream;
+ /// # use std::error::Error;
+ ///
+ /// # async fn dox() -> Result<(), Box<dyn Error>> {
+ /// let std_stream = StdUnixStream::connect("/path/to/the/socket")?;
+ /// std_stream.set_nonblocking(true)?;
+ /// let stream = UnixStream::from_std(std_stream)?;
+ /// # Ok(())
+ /// # }
+ /// ```
///
/// # Panics
///
diff --git a/src/net/windows/named_pipe.rs b/src/net/windows/named_pipe.rs
index 692c69d..9ede94e 100644
--- a/src/net/windows/named_pipe.rs
+++ b/src/net/windows/named_pipe.rs
@@ -1705,11 +1705,10 @@ impl ServerOptions {
///
/// [`dwPipeMode`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
pub fn pipe_mode(&mut self, pipe_mode: PipeMode) -> &mut Self {
- self.pipe_mode = match pipe_mode {
- PipeMode::Byte => windows_sys::PIPE_TYPE_BYTE,
- PipeMode::Message => windows_sys::PIPE_TYPE_MESSAGE,
- };
-
+ let is_msg = matches!(pipe_mode, PipeMode::Message);
+ // Pipe mode is implemented as a bit flag 0x4. Set is message and unset
+ // is byte.
+ bool_flag!(self.pipe_mode, is_msg, windows_sys::PIPE_TYPE_MESSAGE);
self
}
@@ -2554,3 +2553,48 @@ unsafe fn named_pipe_info(handle: RawHandle) -> io::Result<PipeInfo> {
max_instances,
})
}
+
+#[cfg(test)]
+mod test {
+ use self::windows_sys::{PIPE_REJECT_REMOTE_CLIENTS, PIPE_TYPE_BYTE, PIPE_TYPE_MESSAGE};
+ use super::*;
+
+ #[test]
+ fn opts_default_pipe_mode() {
+ let opts = ServerOptions::new();
+ assert_eq!(opts.pipe_mode, PIPE_TYPE_BYTE | PIPE_REJECT_REMOTE_CLIENTS);
+ }
+
+ #[test]
+ fn opts_unset_reject_remote() {
+ let mut opts = ServerOptions::new();
+ opts.reject_remote_clients(false);
+ assert_eq!(opts.pipe_mode & PIPE_REJECT_REMOTE_CLIENTS, 0);
+ }
+
+ #[test]
+ fn opts_set_pipe_mode_maintains_reject_remote_clients() {
+ let mut opts = ServerOptions::new();
+ opts.pipe_mode(PipeMode::Byte);
+ assert_eq!(opts.pipe_mode, PIPE_TYPE_BYTE | PIPE_REJECT_REMOTE_CLIENTS);
+
+ opts.reject_remote_clients(false);
+ opts.pipe_mode(PipeMode::Byte);
+ assert_eq!(opts.pipe_mode, PIPE_TYPE_BYTE);
+
+ opts.reject_remote_clients(true);
+ opts.pipe_mode(PipeMode::Byte);
+ assert_eq!(opts.pipe_mode, PIPE_TYPE_BYTE | PIPE_REJECT_REMOTE_CLIENTS);
+
+ opts.reject_remote_clients(false);
+ opts.pipe_mode(PipeMode::Message);
+ assert_eq!(opts.pipe_mode, PIPE_TYPE_MESSAGE);
+
+ opts.reject_remote_clients(true);
+ opts.pipe_mode(PipeMode::Message);
+ assert_eq!(
+ opts.pipe_mode,
+ PIPE_TYPE_MESSAGE | PIPE_REJECT_REMOTE_CLIENTS
+ );
+ }
+}
diff --git a/src/process/unix/mod.rs b/src/process/unix/mod.rs
index 0345083..78c792c 100644
--- a/src/process/unix/mod.rs
+++ b/src/process/unix/mod.rs
@@ -156,7 +156,7 @@ impl Future for Child {
#[derive(Debug)]
pub(crate) struct Pipe {
- // Actually a pipe and not a File. However, we are reusing `File` to get
+ // Actually a pipe is not a File. However, we are reusing `File` to get
// close on drop. This is a similar trick as `mio`.
fd: File,
}
diff --git a/src/process/unix/orphan.rs b/src/process/unix/orphan.rs
index 66572ef..3407196 100644
--- a/src/process/unix/orphan.rs
+++ b/src/process/unix/orphan.rs
@@ -294,7 +294,7 @@ pub(crate) mod test {
#[cfg_attr(miri, ignore)] // Miri does not support epoll.
#[test]
fn does_not_register_signal_if_queue_empty() {
- let (io_driver, io_handle) = IoDriver::new().unwrap();
+ let (io_driver, io_handle) = IoDriver::new(1024).unwrap();
let signal_driver = SignalDriver::new(io_driver, &io_handle).unwrap();
let handle = signal_driver.handle();
diff --git a/src/runtime/blocking/mod.rs b/src/runtime/blocking/mod.rs
index 88bdcfd..c42924b 100644
--- a/src/runtime/blocking/mod.rs
+++ b/src/runtime/blocking/mod.rs
@@ -17,8 +17,6 @@ cfg_trace! {
mod schedule;
mod shutdown;
mod task;
-#[cfg(all(test, not(tokio_wasm)))]
-pub(crate) use schedule::NoopSchedule;
pub(crate) use task::BlockingTask;
use crate::runtime::Builder;
diff --git a/src/runtime/blocking/pool.rs b/src/runtime/blocking/pool.rs
index 9c53614..e9f6b66 100644
--- a/src/runtime/blocking/pool.rs
+++ b/src/runtime/blocking/pool.rs
@@ -2,7 +2,7 @@
use crate::loom::sync::{Arc, Condvar, Mutex};
use crate::loom::thread;
-use crate::runtime::blocking::schedule::NoopSchedule;
+use crate::runtime::blocking::schedule::BlockingSchedule;
use crate::runtime::blocking::{shutdown, BlockingTask};
use crate::runtime::builder::ThreadNameFn;
use crate::runtime::task::{self, JoinHandle};
@@ -120,7 +120,7 @@ struct Shared {
}
pub(crate) struct Task {
- task: task::UnownedTask<NoopSchedule>,
+ task: task::UnownedTask<BlockingSchedule>,
mandatory: Mandatory,
}
@@ -151,7 +151,7 @@ impl From<SpawnError> for io::Error {
}
impl Task {
- pub(crate) fn new(task: task::UnownedTask<NoopSchedule>, mandatory: Mandatory) -> Task {
+ pub(crate) fn new(task: task::UnownedTask<BlockingSchedule>, mandatory: Mandatory) -> Task {
Task { task, mandatory }
}
@@ -379,7 +379,8 @@ impl Spawner {
#[cfg(not(all(tokio_unstable, feature = "tracing")))]
let _ = name;
- let (task, handle) = task::unowned(fut, NoopSchedule, id);
+ let (task, handle) = task::unowned(fut, BlockingSchedule::new(rt), id);
+
let spawned = self.spawn_task(Task::new(task, is_mandatory), rt);
(handle, spawned)
}
diff --git a/src/runtime/blocking/schedule.rs b/src/runtime/blocking/schedule.rs
index 5425224..edf775b 100644
--- a/src/runtime/blocking/schedule.rs
+++ b/src/runtime/blocking/schedule.rs
@@ -1,15 +1,52 @@
+#[cfg(feature = "test-util")]
+use crate::runtime::scheduler;
use crate::runtime::task::{self, Task};
+use crate::runtime::Handle;
-/// `task::Schedule` implementation that does nothing. This is unique to the
-/// blocking scheduler as tasks scheduled are not really futures but blocking
-/// operations.
+/// `task::Schedule` implementation that does nothing (except some bookkeeping
+/// in test-util builds). This is unique to the blocking scheduler as tasks
+/// scheduled are not really futures but blocking operations.
///
/// We avoid storing the task by forgetting it in `bind` and re-materializing it
-/// in `release.
-pub(crate) struct NoopSchedule;
+/// in `release`.
+pub(crate) struct BlockingSchedule {
+ #[cfg(feature = "test-util")]
+ handle: Handle,
+}
+
+impl BlockingSchedule {
+ #[cfg_attr(not(feature = "test-util"), allow(unused_variables))]
+ pub(crate) fn new(handle: &Handle) -> Self {
+ #[cfg(feature = "test-util")]
+ {
+ match &handle.inner {
+ scheduler::Handle::CurrentThread(handle) => {
+ handle.driver.clock.inhibit_auto_advance();
+ }
+ #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
+ scheduler::Handle::MultiThread(_) => {}
+ }
+ }
+ BlockingSchedule {
+ #[cfg(feature = "test-util")]
+ handle: handle.clone(),
+ }
+ }
+}
-impl task::Schedule for NoopSchedule {
+impl task::Schedule for BlockingSchedule {
fn release(&self, _task: &Task<Self>) -> Option<Task<Self>> {
+ #[cfg(feature = "test-util")]
+ {
+ match &self.handle.inner {
+ scheduler::Handle::CurrentThread(handle) => {
+ handle.driver.clock.allow_auto_advance();
+ handle.driver.unpark();
+ }
+ #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
+ scheduler::Handle::MultiThread(_) => {}
+ }
+ }
None
}
diff --git a/src/runtime/builder.rs b/src/runtime/builder.rs
index d49a4da..ea0df2e 100644
--- a/src/runtime/builder.rs
+++ b/src/runtime/builder.rs
@@ -44,6 +44,7 @@ pub struct Builder {
/// Whether or not to enable the I/O driver
enable_io: bool,
+ nevents: usize,
/// Whether or not to enable the time driver
enable_time: bool,
@@ -181,6 +182,7 @@ cfg_unstable! {
pub(crate) type ThreadNameFn = std::sync::Arc<dyn Fn() -> String + Send + Sync + 'static>;
+#[derive(Clone, Copy)]
pub(crate) enum Kind {
CurrentThread,
#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
@@ -228,6 +230,7 @@ impl Builder {
// I/O defaults to "off"
enable_io: false,
+ nevents: 1024,
// Time defaults to "off"
enable_time: false,
@@ -235,6 +238,7 @@ impl Builder {
// The clock starts not-paused
start_paused: false,
+ // Read from environment variable first in multi-threaded mode.
// Default to lazy auto-detection (one thread per CPU core)
worker_threads: None,
@@ -302,6 +306,8 @@ impl Builder {
/// This can be any number above 0 though it is advised to keep this value
/// on the smaller side.
///
+ /// This will override the value read from environment variable `TOKIO_WORKER_THREADS`.
+ ///
/// # Default
///
/// The default value is the number of cores available to the system.
@@ -647,6 +653,7 @@ impl Builder {
enable_io: self.enable_io,
enable_time: self.enable_time,
start_paused: self.start_paused,
+ nevents: self.nevents,
}
}
@@ -938,6 +945,25 @@ cfg_io_driver! {
self.enable_io = true;
self
}
+
+ /// Enables the I/O driver and configures the max number of events to be
+ /// processed per tick.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::runtime;
+ ///
+ /// let rt = runtime::Builder::new_current_thread()
+ /// .enable_io()
+ /// .max_io_events_per_tick(1024)
+ /// .build()
+ /// .unwrap();
+ /// ```
+ pub fn max_io_events_per_tick(&mut self, capacity: usize) -> &mut Self {
+ self.nevents = capacity;
+ self
+ }
}
}
diff --git a/src/runtime/context.rs b/src/runtime/context.rs
index 2e54c8b..fef53ca 100644
--- a/src/runtime/context.rs
+++ b/src/runtime/context.rs
@@ -15,6 +15,10 @@ cfg_rt! {
}
struct Context {
+ /// Uniquely identifies the current thread
+ #[cfg(feature = "rt")]
+ thread_id: Cell<Option<ThreadId>>,
+
/// Handle to the runtime scheduler running on the current thread.
#[cfg(feature = "rt")]
handle: RefCell<Option<scheduler::Handle>>,
@@ -46,6 +50,9 @@ struct Context {
tokio_thread_local! {
static CONTEXT: Context = {
Context {
+ #[cfg(feature = "rt")]
+ thread_id: Cell::new(None),
+
/// Tracks the current runtime handle to use when spawning,
/// accessing drivers, etc...
#[cfg(feature = "rt")]
@@ -82,10 +89,23 @@ pub(super) fn budget<R>(f: impl FnOnce(&Cell<coop::Budget>) -> R) -> Result<R, A
}
cfg_rt! {
- use crate::runtime::TryCurrentError;
+ use crate::runtime::{ThreadId, TryCurrentError};
use std::fmt;
+ pub(crate) fn thread_id() -> Result<ThreadId, AccessError> {
+ CONTEXT.try_with(|ctx| {
+ match ctx.thread_id.get() {
+ Some(id) => id,
+ None => {
+ let id = ThreadId::next();
+ ctx.thread_id.set(Some(id));
+ id
+ }
+ }
+ })
+ }
+
#[derive(Debug, Clone, Copy)]
#[must_use]
pub(crate) enum EnterRuntime {
diff --git a/src/runtime/driver.rs b/src/runtime/driver.rs
index 8f9c512..4fb6b87 100644
--- a/src/runtime/driver.rs
+++ b/src/runtime/driver.rs
@@ -36,11 +36,12 @@ pub(crate) struct Cfg {
pub(crate) enable_time: bool,
pub(crate) enable_pause_time: bool,
pub(crate) start_paused: bool,
+ pub(crate) nevents: usize,
}
impl Driver {
pub(crate) fn new(cfg: Cfg) -> io::Result<(Self, Handle)> {
- let (io_stack, io_handle, signal_handle) = create_io_stack(cfg.enable_io)?;
+ let (io_stack, io_handle, signal_handle) = create_io_stack(cfg.enable_io, cfg.nevents)?;
let clock = create_clock(cfg.enable_pause_time, cfg.start_paused);
@@ -135,12 +136,12 @@ cfg_io_driver! {
Disabled(UnparkThread),
}
- fn create_io_stack(enabled: bool) -> io::Result<(IoStack, IoHandle, SignalHandle)> {
+ fn create_io_stack(enabled: bool, nevents: usize) -> io::Result<(IoStack, IoHandle, SignalHandle)> {
#[cfg(loom)]
assert!(!enabled);
let ret = if enabled {
- let (io_driver, io_handle) = crate::runtime::io::Driver::new()?;
+ let (io_driver, io_handle) = crate::runtime::io::Driver::new(nevents)?;
let (signal_driver, signal_handle) = create_signal_driver(io_driver, &io_handle)?;
let process_driver = create_process_driver(signal_driver);
@@ -201,7 +202,7 @@ cfg_not_io_driver! {
#[derive(Debug)]
pub(crate) struct IoStack(ParkThread);
- fn create_io_stack(_enabled: bool) -> io::Result<(IoStack, IoHandle, SignalHandle)> {
+ fn create_io_stack(_enabled: bool, _nevents: usize) -> io::Result<(IoStack, IoHandle, SignalHandle)> {
let park_thread = ParkThread::new();
let unpark_thread = park_thread.unpark();
Ok((IoStack(park_thread), unpark_thread, Default::default()))
diff --git a/src/runtime/handle.rs b/src/runtime/handle.rs
index da47ecb..c5dc65f 100644
--- a/src/runtime/handle.rs
+++ b/src/runtime/handle.rs
@@ -118,9 +118,9 @@ impl Handle {
/// thread pool. The thread pool is then responsible for polling the future
/// until it completes.
///
- /// You do not have to `.await` the returned `JoinHandle` to make the
- /// provided future start execution. It will start running in the background
- /// immediately when `spawn` is called.
+ /// The provided future will start running in the background immediately
+ /// when `spawn` is called, even if you don't await the returned
+ /// `JoinHandle`.
///
/// See [module level][mod] documentation for more details.
///
diff --git a/src/runtime/io/mod.rs b/src/runtime/io/mod.rs
index 02039f2..2e578b6 100644
--- a/src/runtime/io/mod.rs
+++ b/src/runtime/io/mod.rs
@@ -60,6 +60,7 @@ pub(crate) struct Handle {
pub(crate) struct ReadyEvent {
tick: u8,
pub(crate) ready: Ready,
+ is_shutdown: bool,
}
struct IoDispatcher {
@@ -104,7 +105,7 @@ fn _assert_kinds() {
impl Driver {
/// Creates a new event loop, returning any error that happened during the
/// creation.
- pub(crate) fn new() -> io::Result<(Driver, Handle)> {
+ pub(crate) fn new(nevents: usize) -> io::Result<(Driver, Handle)> {
let poll = mio::Poll::new()?;
#[cfg(not(tokio_wasi))]
let waker = mio::Waker::new(poll.registry(), TOKEN_WAKEUP)?;
@@ -116,7 +117,7 @@ impl Driver {
let driver = Driver {
tick: 0,
signal_ready: false,
- events: mio::Events::with_capacity(1024),
+ events: mio::Events::with_capacity(nevents),
poll,
resources: slab,
};
@@ -147,9 +148,8 @@ impl Driver {
if handle.shutdown() {
self.resources.for_each(|io| {
- // If a task is waiting on the I/O resource, notify it. The task
- // will then attempt to use the I/O resource and fail due to the
- // driver being shutdown. And shutdown will clear all wakers.
+ // If a task is waiting on the I/O resource, notify it that the
+ // runtime is being shutdown. And shutdown will clear all wakers.
io.shutdown();
});
}
@@ -282,16 +282,12 @@ impl Handle {
true
}
- fn is_shutdown(&self) -> bool {
- return self.io_dispatch.read().unwrap().is_shutdown;
- }
-
fn allocate(&self) -> io::Result<(slab::Address, slab::Ref<ScheduledIo>)> {
let io = self.io_dispatch.read().unwrap();
if io.is_shutdown {
return Err(io::Error::new(
io::ErrorKind::Other,
- "failed to find event loop",
+ crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR,
));
}
io.allocator.allocate().ok_or_else(|| {
diff --git a/src/runtime/io/registration.rs b/src/runtime/io/registration.rs
index 7b95f7f..140b924 100644
--- a/src/runtime/io/registration.rs
+++ b/src/runtime/io/registration.rs
@@ -148,7 +148,7 @@ impl Registration {
let coop = ready!(crate::runtime::coop::poll_proceed(cx));
let ev = ready!(self.shared.poll_readiness(cx, direction));
- if self.handle().is_shutdown() {
+ if ev.is_shutdown {
return Poll::Ready(Err(gone()));
}
@@ -217,28 +217,22 @@ impl Drop for Registration {
}
fn gone() -> io::Error {
- io::Error::new(io::ErrorKind::Other, "IO driver has terminated")
+ io::Error::new(
+ io::ErrorKind::Other,
+ crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR,
+ )
}
cfg_io_readiness! {
impl Registration {
pub(crate) async fn readiness(&self, interest: Interest) -> io::Result<ReadyEvent> {
- use std::future::Future;
- use std::pin::Pin;
+ let ev = self.shared.readiness(interest).await;
- let fut = self.shared.readiness(interest);
- pin!(fut);
-
- crate::future::poll_fn(|cx| {
- if self.handle().is_shutdown() {
- return Poll::Ready(Err(io::Error::new(
- io::ErrorKind::Other,
- crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR
- )));
- }
+ if ev.is_shutdown {
+ return Err(gone())
+ }
- Pin::new(&mut fut).poll(cx).map(Ok)
- }).await
+ Ok(ev)
}
pub(crate) async fn async_io<R>(&self, interest: Interest, mut f: impl FnMut() -> io::Result<R>) -> io::Result<R> {
diff --git a/src/runtime/io/scheduled_io.rs b/src/runtime/io/scheduled_io.rs
index 1709091..197a4e0 100644
--- a/src/runtime/io/scheduled_io.rs
+++ b/src/runtime/io/scheduled_io.rs
@@ -46,9 +46,6 @@ struct Waiters {
/// Waker used for AsyncWrite.
writer: Option<Waker>,
-
- /// True if this ScheduledIo has been killed due to IO driver shutdown.
- is_shutdown: bool,
}
cfg_io_readiness! {
@@ -95,7 +92,7 @@ cfg_io_readiness! {
// The `ScheduledIo::readiness` (`AtomicUsize`) is packed full of goodness.
//
-// | reserved | generation | driver tick | readiness |
+// | shutdown | generation | driver tick | readiness |
// |----------+------------+--------------+-----------|
// | 1 bit | 7 bits + 8 bits + 16 bits |
@@ -105,6 +102,8 @@ const TICK: bit::Pack = READINESS.then(8);
const GENERATION: bit::Pack = TICK.then(7);
+const SHUTDOWN: bit::Pack = GENERATION.then(1);
+
#[test]
fn test_generations_assert_same() {
assert_eq!(super::GENERATION, GENERATION);
@@ -138,9 +137,11 @@ impl ScheduledIo {
}
/// Invoked when the IO driver is shut down; forces this ScheduledIo into a
- /// permanently ready state.
+ /// permanently shutdown state.
pub(super) fn shutdown(&self) {
- self.wake0(Ready::ALL, true)
+ let mask = SHUTDOWN.pack(1, 0);
+ self.readiness.fetch_or(mask, AcqRel);
+ self.wake(Ready::ALL);
}
/// Sets the readiness on this `ScheduledIo` by invoking the given closure on
@@ -219,16 +220,10 @@ impl ScheduledIo {
/// than 32 wakers to notify, if the stack array fills up, the lock is
/// released, the array is cleared, and the iteration continues.
pub(super) fn wake(&self, ready: Ready) {
- self.wake0(ready, false);
- }
-
- fn wake0(&self, ready: Ready, shutdown: bool) {
let mut wakers = WakeList::new();
let mut waiters = self.waiters.lock();
- waiters.is_shutdown |= shutdown;
-
// check for AsyncRead slot
if ready.is_readable() {
if let Some(waker) = waiters.reader.take() {
@@ -283,6 +278,7 @@ impl ScheduledIo {
ReadyEvent {
tick: TICK.unpack(curr) as u8,
ready: interest.mask() & Ready::from_usize(READINESS.unpack(curr)),
+ is_shutdown: SHUTDOWN.unpack(curr) != 0,
}
}
@@ -299,8 +295,9 @@ impl ScheduledIo {
let curr = self.readiness.load(Acquire);
let ready = direction.mask() & Ready::from_usize(READINESS.unpack(curr));
+ let is_shutdown = SHUTDOWN.unpack(curr) != 0;
- if ready.is_empty() {
+ if ready.is_empty() && !is_shutdown {
// Update the task info
let mut waiters = self.waiters.lock();
let slot = match direction {
@@ -325,10 +322,12 @@ impl ScheduledIo {
// taking the waiters lock
let curr = self.readiness.load(Acquire);
let ready = direction.mask() & Ready::from_usize(READINESS.unpack(curr));
- if waiters.is_shutdown {
+ let is_shutdown = SHUTDOWN.unpack(curr) != 0;
+ if is_shutdown {
Poll::Ready(ReadyEvent {
tick: TICK.unpack(curr) as u8,
ready: direction.mask(),
+ is_shutdown,
})
} else if ready.is_empty() {
Poll::Pending
@@ -336,12 +335,14 @@ impl ScheduledIo {
Poll::Ready(ReadyEvent {
tick: TICK.unpack(curr) as u8,
ready,
+ is_shutdown,
})
}
} else {
Poll::Ready(ReadyEvent {
tick: TICK.unpack(curr) as u8,
ready,
+ is_shutdown,
})
}
}
@@ -433,16 +434,17 @@ cfg_io_readiness! {
// Optimistically check existing readiness
let curr = scheduled_io.readiness.load(SeqCst);
let ready = Ready::from_usize(READINESS.unpack(curr));
+ let is_shutdown = SHUTDOWN.unpack(curr) != 0;
// Safety: `waiter.interest` never changes
let interest = unsafe { (*waiter.get()).interest };
let ready = ready.intersection(interest);
- if !ready.is_empty() {
+ if !ready.is_empty() || is_shutdown {
// Currently ready!
let tick = TICK.unpack(curr) as u8;
*state = State::Done;
- return Poll::Ready(ReadyEvent { tick, ready });
+ return Poll::Ready(ReadyEvent { tick, ready, is_shutdown });
}
// Wasn't ready, take the lock (and check again while locked).
@@ -450,18 +452,19 @@ cfg_io_readiness! {
let curr = scheduled_io.readiness.load(SeqCst);
let mut ready = Ready::from_usize(READINESS.unpack(curr));
+ let is_shutdown = SHUTDOWN.unpack(curr) != 0;
- if waiters.is_shutdown {
+ if is_shutdown {
ready = Ready::ALL;
}
let ready = ready.intersection(interest);
- if !ready.is_empty() {
+ if !ready.is_empty() || is_shutdown {
// Currently ready!
let tick = TICK.unpack(curr) as u8;
*state = State::Done;
- return Poll::Ready(ReadyEvent { tick, ready });
+ return Poll::Ready(ReadyEvent { tick, ready, is_shutdown });
}
// Not ready even after locked, insert into list...
@@ -514,6 +517,7 @@ cfg_io_readiness! {
let w = unsafe { &mut *waiter.get() };
let curr = scheduled_io.readiness.load(Acquire);
+ let is_shutdown = SHUTDOWN.unpack(curr) != 0;
// The returned tick might be newer than the event
// which notified our waker. This is ok because the future
@@ -528,6 +532,7 @@ cfg_io_readiness! {
return Poll::Ready(ReadyEvent {
tick,
ready,
+ is_shutdown,
});
}
}
diff --git a/src/runtime/metrics/batch.rs b/src/runtime/metrics/batch.rs
index f1c3fa6..4e6b28d 100644
--- a/src/runtime/metrics/batch.rs
+++ b/src/runtime/metrics/batch.rs
@@ -11,9 +11,12 @@ pub(crate) struct MetricsBatch {
/// Number of times the worker woke w/o doing work.
noop_count: u64,
- /// Number of times stolen.
+ /// Number of tasks stolen.
steal_count: u64,
+ /// Number of times tasks were stolen.
+ steal_operations: u64,
+
/// Number of tasks that were polled by the worker.
poll_count: u64,
@@ -39,6 +42,7 @@ impl MetricsBatch {
park_count: 0,
noop_count: 0,
steal_count: 0,
+ steal_operations: 0,
poll_count: 0,
poll_count_on_last_park: 0,
local_schedule_count: 0,
@@ -52,6 +56,9 @@ impl MetricsBatch {
worker.park_count.store(self.park_count, Relaxed);
worker.noop_count.store(self.noop_count, Relaxed);
worker.steal_count.store(self.steal_count, Relaxed);
+ worker
+ .steal_operations
+ .store(self.steal_operations, Relaxed);
worker.poll_count.store(self.poll_count, Relaxed);
worker
@@ -98,6 +105,10 @@ cfg_rt_multi_thread! {
self.steal_count += by as u64;
}
+ pub(crate) fn incr_steal_operations(&mut self) {
+ self.steal_operations += 1;
+ }
+
pub(crate) fn incr_overflow_count(&mut self) {
self.overflow_count += 1;
}
diff --git a/src/runtime/metrics/mock.rs b/src/runtime/metrics/mock.rs
index 6b9cf70..c388dc0 100644
--- a/src/runtime/metrics/mock.rs
+++ b/src/runtime/metrics/mock.rs
@@ -38,6 +38,7 @@ impl MetricsBatch {
cfg_rt_multi_thread! {
impl MetricsBatch {
pub(crate) fn incr_steal_count(&mut self, _by: u16) {}
+ pub(crate) fn incr_steal_operations(&mut self) {}
pub(crate) fn incr_overflow_count(&mut self) {}
}
}
diff --git a/src/runtime/metrics/runtime.rs b/src/runtime/metrics/runtime.rs
index dee14a4..d29cb3d 100644
--- a/src/runtime/metrics/runtime.rs
+++ b/src/runtime/metrics/runtime.rs
@@ -210,10 +210,57 @@ impl RuntimeMetrics {
.load(Relaxed)
}
+ /// Returns the number of tasks the given worker thread stole from
+ /// another worker thread.
+ ///
+ /// This metric only applies to the **multi-threaded** runtime and will
+ /// always return `0` when using the current thread runtime.
+ ///
+ /// The worker steal count starts at zero when the runtime is created and
+ /// increases by `N` each time the worker has processed its scheduled queue
+ /// and successfully steals `N` more pending tasks from another worker.
+ ///
+ /// The counter is monotonically increasing. It is never decremented or
+ /// reset to zero.
+ ///
+ /// # Arguments
+ ///
+ /// `worker` is the index of the worker being queried. The given value must
+ /// be between 0 and `num_workers()`. The index uniquely identifies a single
+ /// worker and will continue to identify the worker throughout the lifetime
+ /// of the runtime instance.
+ ///
+ /// # Panics
+ ///
+ /// The method panics when `worker` represents an invalid worker, i.e. is
+ /// greater than or equal to `num_workers()`.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::runtime::Handle;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let metrics = Handle::current().metrics();
+ ///
+ /// let n = metrics.worker_steal_count(0);
+ /// println!("worker 0 has stolen {} tasks", n);
+ /// }
+ /// ```
+ pub fn worker_steal_count(&self, worker: usize) -> u64 {
+ self.handle
+ .inner
+ .worker_metrics(worker)
+ .steal_count
+ .load(Relaxed)
+ }
+
/// Returns the number of times the given worker thread stole tasks from
/// another worker thread.
///
- /// This metric only applies to the **multi-threaded** runtime and will always return `0` when using the current thread runtime.
+ /// This metric only applies to the **multi-threaded** runtime and will
+ /// always return `0` when using the current thread runtime.
///
/// The worker steal count starts at zero when the runtime is created and
/// increases by one each time the worker has processed its scheduled queue
@@ -243,15 +290,15 @@ impl RuntimeMetrics {
/// async fn main() {
/// let metrics = Handle::current().metrics();
///
- /// let n = metrics.worker_noop_count(0);
+ /// let n = metrics.worker_steal_operations(0);
/// println!("worker 0 has stolen tasks {} times", n);
/// }
/// ```
- pub fn worker_steal_count(&self, worker: usize) -> u64 {
+ pub fn worker_steal_operations(&self, worker: usize) -> u64 {
self.handle
.inner
.worker_metrics(worker)
- .steal_count
+ .steal_operations
.load(Relaxed)
}
@@ -328,8 +375,8 @@ impl RuntimeMetrics {
/// async fn main() {
/// let metrics = Handle::current().metrics();
///
- /// let n = metrics.worker_poll_count(0);
- /// println!("worker 0 has polled {} tasks", n);
+ /// let n = metrics.worker_total_busy_duration(0);
+ /// println!("worker 0 was busy for a total of {:?}", n);
/// }
/// ```
pub fn worker_total_busy_duration(&self, worker: usize) -> Duration {
diff --git a/src/runtime/metrics/worker.rs b/src/runtime/metrics/worker.rs
index ec58de6..a40c76e 100644
--- a/src/runtime/metrics/worker.rs
+++ b/src/runtime/metrics/worker.rs
@@ -17,9 +17,12 @@ pub(crate) struct WorkerMetrics {
/// Number of times the worker woke then parked again without doing work.
pub(crate) noop_count: AtomicU64,
- /// Number of times the worker attempted to steal.
+ /// Number of tasks the worker stole.
pub(crate) steal_count: AtomicU64,
+ /// Number of times the worker stole tasks.
+ pub(crate) steal_operations: AtomicU64,
+
/// Number of tasks the worker polled.
pub(crate) poll_count: AtomicU64,
@@ -43,6 +46,7 @@ impl WorkerMetrics {
park_count: AtomicU64::new(0),
noop_count: AtomicU64::new(0),
steal_count: AtomicU64::new(0),
+ steal_operations: AtomicU64::new(0),
poll_count: AtomicU64::new(0),
overflow_count: AtomicU64::new(0),
busy_duration_total: AtomicU64::new(0),
diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs
index 45b79b0..b6f43ea 100644
--- a/src/runtime/mod.rs
+++ b/src/runtime/mod.rs
@@ -237,6 +237,9 @@ cfg_rt! {
mod runtime;
pub use runtime::{Runtime, RuntimeFlavor};
+ mod thread_id;
+ pub(crate) use thread_id::ThreadId;
+
cfg_metrics! {
mod metrics;
pub use metrics::RuntimeMetrics;
diff --git a/src/runtime/runtime.rs b/src/runtime/runtime.rs
index 9ede0a7..1985673 100644
--- a/src/runtime/runtime.rs
+++ b/src/runtime/runtime.rs
@@ -163,9 +163,9 @@ impl Runtime {
/// thread pool. The thread pool is then responsible for polling the future
/// until it completes.
///
- /// You do not have to `.await` the returned `JoinHandle` to make the
- /// provided future start execution. It will start running in the
- /// background immediately when `spawn` is called.
+ /// The provided future will start running in the background immediately
+ /// when `spawn` is called, even if you don't await the returned
+ /// `JoinHandle`.
///
/// See [module level][mod] documentation for more details.
///
diff --git a/src/runtime/scheduler/multi_thread/queue.rs b/src/runtime/scheduler/multi_thread/queue.rs
index 59b448d..faf56db 100644
--- a/src/runtime/scheduler/multi_thread/queue.rs
+++ b/src/runtime/scheduler/multi_thread/queue.rs
@@ -263,7 +263,7 @@ impl<T> Local<T> {
// safety: The CAS above ensures that no consumer will look at these
// values again, and we are the only producer.
let batch_iter = BatchTaskIter {
- buffer: &*self.inner.buffer,
+ buffer: &self.inner.buffer,
head: head as UnsignedLong,
i: 0,
};
@@ -353,6 +353,7 @@ impl<T> Steal<T> {
}
dst_metrics.incr_steal_count(n as u16);
+ dst_metrics.incr_steal_operations();
// We are returning a task here
n -= 1;
diff --git a/src/runtime/task/harness.rs b/src/runtime/task/harness.rs
index c079297..8e3c3d1 100644
--- a/src/runtime/task/harness.rs
+++ b/src/runtime/task/harness.rs
@@ -194,7 +194,7 @@ where
TransitionToRunning::Success => {
let header_ptr = self.header_ptr();
let waker_ref = waker_ref::<T, S>(&header_ptr);
- let cx = Context::from_waker(&*waker_ref);
+ let cx = Context::from_waker(&waker_ref);
let res = poll_future(self.core(), cx);
if res == Poll::Ready(()) {
@@ -315,9 +315,10 @@ where
// this task. It is our responsibility to drop the
// output.
self.core().drop_future_or_output();
- } else if snapshot.has_join_waker() {
- // Notify the join handle. The previous transition obtains the
- // lock on the waker cell.
+ } else if snapshot.is_join_waker_set() {
+ // Notify the waker. Reading the waker field is safe per rule 4
+ // in task/mod.rs, since the JOIN_WAKER bit is set and the call
+ // to transition_to_complete() above set the COMPLETE bit.
self.trailer().wake_join();
}
}));
@@ -367,36 +368,30 @@ fn can_read_output(header: &Header, trailer: &Trailer, waker: &Waker) -> bool {
debug_assert!(snapshot.is_join_interested());
if !snapshot.is_complete() {
- // The waker must be stored in the task struct.
- let res = if snapshot.has_join_waker() {
- // There already is a waker stored in the struct. If it matches
- // the provided waker, then there is no further work to do.
- // Otherwise, the waker must be swapped.
- let will_wake = unsafe {
- // Safety: when `JOIN_INTEREST` is set, only `JOIN_HANDLE`
- // may mutate the `waker` field.
- trailer.will_wake(waker)
- };
-
- if will_wake {
- // The task is not complete **and** the waker is up to date,
- // there is nothing further that needs to be done.
+ // If the task is not complete, try storing the provided waker in the
+ // task's waker field.
+
+ let res = if snapshot.is_join_waker_set() {
+ // If JOIN_WAKER is set, then JoinHandle has previously stored a
+ // waker in the waker field per step (iii) of rule 5 in task/mod.rs.
+
+ // Optimization: if the stored waker and the provided waker wake the
+ // same task, then return without touching the waker field. (Reading
+ // the waker field below is safe per rule 3 in task/mod.rs.)
+ if unsafe { trailer.will_wake(waker) } {
return false;
}
- // Unset the `JOIN_WAKER` to gain mutable access to the `waker`
- // field then update the field with the new join worker.
- //
- // This requires two atomic operations, unsetting the bit and
- // then resetting it. If the task transitions to complete
- // concurrently to either one of those operations, then setting
- // the join waker fails and we proceed to reading the task
- // output.
+ // Otherwise swap the stored waker with the provided waker by
+ // following the rule 5 in task/mod.rs.
header
.state
.unset_waker()
.and_then(|snapshot| set_join_waker(header, trailer, waker.clone(), snapshot))
} else {
+ // If JOIN_WAKER is unset, then JoinHandle has mutable access to the
+ // waker field per rule 2 in task/mod.rs; therefore, skip step (i)
+ // of rule 5 and try to store the provided waker in the waker field.
set_join_waker(header, trailer, waker.clone(), snapshot)
};
@@ -417,7 +412,7 @@ fn set_join_waker(
snapshot: Snapshot,
) -> Result<Snapshot, Snapshot> {
assert!(snapshot.is_join_interested());
- assert!(!snapshot.has_join_waker());
+ assert!(!snapshot.is_join_waker_set());
// Safety: Only the `JoinHandle` may set the `waker` field. When
// `JOIN_INTEREST` is **not** set, nothing else will touch the field.
diff --git a/src/runtime/task/id.rs b/src/runtime/task/id.rs
new file mode 100644
index 0000000..2b0d95c
--- /dev/null
+++ b/src/runtime/task/id.rs
@@ -0,0 +1,87 @@
+use crate::runtime::context;
+
+use std::fmt;
+
+/// An opaque ID that uniquely identifies a task relative to all other currently
+/// running tasks.
+///
+/// # Notes
+///
+/// - Task IDs are unique relative to other *currently running* tasks. When a
+/// task completes, the same ID may be used for another task.
+/// - Task IDs are *not* sequential, and do not indicate the order in which
+/// tasks are spawned, what runtime a task is spawned on, or any other data.
+/// - The task ID of the currently running task can be obtained from inside the
+/// task via the [`task::try_id()`](crate::task::try_id()) and
+/// [`task::id()`](crate::task::id()) functions and from outside the task via
+/// the [`JoinHandle::id()`](crate::task::JoinHandle::id()) function.
+///
+/// **Note**: This is an [unstable API][unstable]. The public API of this type
+/// may break in 1.x releases. See [the documentation on unstable
+/// features][unstable] for details.
+///
+/// [unstable]: crate#unstable-features
+#[cfg_attr(docsrs, doc(cfg(all(feature = "rt", tokio_unstable))))]
+#[cfg_attr(not(tokio_unstable), allow(unreachable_pub))]
+#[derive(Clone, Copy, Debug, Hash, Eq, PartialEq)]
+pub struct Id(u64);
+
+/// Returns the [`Id`] of the currently running task.
+///
+/// # Panics
+///
+/// This function panics if called from outside a task. Please note that calls
+/// to `block_on` do not have task IDs, so the method will panic if called from
+/// within a call to `block_on`. For a version of this function that doesn't
+/// panic, see [`task::try_id()`](crate::runtime::task::try_id()).
+///
+/// **Note**: This is an [unstable API][unstable]. The public API of this type
+/// may break in 1.x releases. See [the documentation on unstable
+/// features][unstable] for details.
+///
+/// [task ID]: crate::task::Id
+/// [unstable]: crate#unstable-features
+#[cfg_attr(not(tokio_unstable), allow(unreachable_pub))]
+#[track_caller]
+pub fn id() -> Id {
+ context::current_task_id().expect("Can't get a task id when not inside a task")
+}
+
+/// Returns the [`Id`] of the currently running task, or `None` if called outside
+/// of a task.
+///
+/// This function is similar to [`task::id()`](crate::runtime::task::id()), except
+/// that it returns `None` rather than panicking if called outside of a task
+/// context.
+///
+/// **Note**: This is an [unstable API][unstable]. The public API of this type
+/// may break in 1.x releases. See [the documentation on unstable
+/// features][unstable] for details.
+///
+/// [task ID]: crate::task::Id
+/// [unstable]: crate#unstable-features
+#[cfg_attr(not(tokio_unstable), allow(unreachable_pub))]
+#[track_caller]
+pub fn try_id() -> Option<Id> {
+ context::current_task_id()
+}
+
+impl fmt::Display for Id {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ self.0.fmt(f)
+ }
+}
+
+impl Id {
+ pub(crate) fn next() -> Self {
+ use crate::loom::sync::atomic::{Ordering::Relaxed, StaticAtomicU64};
+
+ static NEXT_ID: StaticAtomicU64 = StaticAtomicU64::new(1);
+
+ Self(NEXT_ID.fetch_add(1, Relaxed))
+ }
+
+ pub(crate) fn as_u64(&self) -> u64 {
+ self.0
+ }
+}
diff --git a/src/runtime/task/join.rs b/src/runtime/task/join.rs
index 5660575..11c4b9b 100644
--- a/src/runtime/task/join.rs
+++ b/src/runtime/task/join.rs
@@ -11,9 +11,9 @@ cfg_rt! {
/// An owned permission to join on a task (await its termination).
///
/// This can be thought of as the equivalent of [`std::thread::JoinHandle`]
- /// for a Tokio task rather than a thread. You do not need to `.await` the
- /// `JoinHandle` to make the task execute — it will start running in the
- /// background immediately.
+ /// for a Tokio task rather than a thread. Note that the background task
+ /// associated with this `JoinHandle` started running immediately when you
+ /// called spawn, even if you have not yet awaited the `JoinHandle`.
///
/// A `JoinHandle` *detaches* the associated task when it is dropped, which
/// means that there is no longer any handle to the task, and no way to `join`
diff --git a/src/runtime/task/mod.rs b/src/runtime/task/mod.rs
index fea6e0f..55131ac 100644
--- a/src/runtime/task/mod.rs
+++ b/src/runtime/task/mod.rs
@@ -168,19 +168,20 @@
// unstable. This should be removed once `JoinSet` is stabilized.
#![cfg_attr(not(tokio_unstable), allow(dead_code))]
-use crate::runtime::context;
-
mod core;
use self::core::Cell;
use self::core::Header;
mod error;
-#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::error::JoinError;
mod harness;
use self::harness::Harness;
+mod id;
+#[cfg_attr(not(tokio_unstable), allow(unreachable_pub))]
+pub use id::{id, try_id, Id};
+
cfg_rt_multi_thread! {
mod inject;
pub(super) use self::inject::Inject;
@@ -191,10 +192,8 @@ mod abort;
mod join;
#[cfg(feature = "rt")]
-#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::abort::AbortHandle;
-#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::join::JoinHandle;
mod list;
@@ -215,70 +214,6 @@ use std::marker::PhantomData;
use std::ptr::NonNull;
use std::{fmt, mem};
-/// An opaque ID that uniquely identifies a task relative to all other currently
-/// running tasks.
-///
-/// # Notes
-///
-/// - Task IDs are unique relative to other *currently running* tasks. When a
-/// task completes, the same ID may be used for another task.
-/// - Task IDs are *not* sequential, and do not indicate the order in which
-/// tasks are spawned, what runtime a task is spawned on, or any other data.
-/// - The task ID of the currently running task can be obtained from inside the
-/// task via the [`task::try_id()`](crate::task::try_id()) and
-/// [`task::id()`](crate::task::id()) functions and from outside the task via
-/// the [`JoinHandle::id()`](crate::task::JoinHandle::id()) function.
-///
-/// **Note**: This is an [unstable API][unstable]. The public API of this type
-/// may break in 1.x releases. See [the documentation on unstable
-/// features][unstable] for details.
-///
-/// [unstable]: crate#unstable-features
-#[cfg_attr(docsrs, doc(cfg(all(feature = "rt", tokio_unstable))))]
-#[cfg_attr(not(tokio_unstable), allow(unreachable_pub))]
-#[derive(Clone, Copy, Debug, Hash, Eq, PartialEq)]
-pub struct Id(u64);
-
-/// Returns the [`Id`] of the currently running task.
-///
-/// # Panics
-///
-/// This function panics if called from outside a task. Please note that calls
-/// to `block_on` do not have task IDs, so the method will panic if called from
-/// within a call to `block_on`. For a version of this function that doesn't
-/// panic, see [`task::try_id()`](crate::runtime::task::try_id()).
-///
-/// **Note**: This is an [unstable API][unstable]. The public API of this type
-/// may break in 1.x releases. See [the documentation on unstable
-/// features][unstable] for details.
-///
-/// [task ID]: crate::task::Id
-/// [unstable]: crate#unstable-features
-#[cfg_attr(not(tokio_unstable), allow(unreachable_pub))]
-#[track_caller]
-pub fn id() -> Id {
- context::current_task_id().expect("Can't get a task id when not inside a task")
-}
-
-/// Returns the [`Id`] of the currently running task, or `None` if called outside
-/// of a task.
-///
-/// This function is similar to [`task::id()`](crate::runtime::task::id()), except
-/// that it returns `None` rather than panicking if called outside of a task
-/// context.
-///
-/// **Note**: This is an [unstable API][unstable]. The public API of this type
-/// may break in 1.x releases. See [the documentation on unstable
-/// features][unstable] for details.
-///
-/// [task ID]: crate::task::Id
-/// [unstable]: crate#unstable-features
-#[cfg_attr(not(tokio_unstable), allow(unreachable_pub))]
-#[track_caller]
-pub fn try_id() -> Option<Id> {
- context::current_task_id()
-}
-
/// An owned handle to the task, tracked by ref count.
#[repr(transparent)]
pub(crate) struct Task<S: 'static> {
@@ -554,66 +489,3 @@ unsafe impl<S> linked_list::Link for Task<S> {
self::core::Trailer::addr_of_owned(Header::get_trailer(target))
}
}
-
-impl fmt::Display for Id {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- self.0.fmt(f)
- }
-}
-
-impl Id {
- // When 64-bit atomics are available, use a static `AtomicU64` counter to
- // generate task IDs.
- //
- // Note(eliza): we _could_ just use `crate::loom::AtomicU64`, which switches
- // between an atomic and mutex-based implementation here, rather than having
- // two separate functions for targets with and without 64-bit atomics.
- // However, because we can't use the mutex-based implementation in a static
- // initializer directly, the 32-bit impl also has to use a `OnceCell`, and I
- // thought it was nicer to avoid the `OnceCell` overhead on 64-bit
- // platforms...
- cfg_has_atomic_u64! {
- pub(crate) fn next() -> Self {
- use std::sync::atomic::{AtomicU64, Ordering::Relaxed};
- static NEXT_ID: AtomicU64 = AtomicU64::new(1);
- Self(NEXT_ID.fetch_add(1, Relaxed))
- }
- }
-
- cfg_not_has_atomic_u64! {
- cfg_has_const_mutex_new! {
- pub(crate) fn next() -> Self {
- use crate::loom::sync::Mutex;
- static NEXT_ID: Mutex<u64> = Mutex::const_new(1);
-
- let mut lock = NEXT_ID.lock();
- let id = *lock;
- *lock += 1;
- Self(id)
- }
- }
-
- cfg_not_has_const_mutex_new! {
- pub(crate) fn next() -> Self {
- use crate::util::once_cell::OnceCell;
- use crate::loom::sync::Mutex;
-
- fn init_next_id() -> Mutex<u64> {
- Mutex::new(1)
- }
-
- static NEXT_ID: OnceCell<Mutex<u64>> = OnceCell::new();
-
- let next_id = NEXT_ID.get(init_next_id);
- let mut lock = next_id.lock();
- let id = *lock;
- *lock += 1;
- Self(id)
- }
- }
- }
-
- pub(crate) fn as_u64(&self) -> u64 {
- self.0
- }
-}
diff --git a/src/runtime/task/state.rs b/src/runtime/task/state.rs
index c2d5b28..7728312 100644
--- a/src/runtime/task/state.rs
+++ b/src/runtime/task/state.rs
@@ -378,7 +378,7 @@ impl State {
pub(super) fn set_join_waker(&self) -> UpdateResult {
self.fetch_update(|curr| {
assert!(curr.is_join_interested());
- assert!(!curr.has_join_waker());
+ assert!(!curr.is_join_waker_set());
if curr.is_complete() {
return None;
@@ -398,7 +398,7 @@ impl State {
pub(super) fn unset_waker(&self) -> UpdateResult {
self.fetch_update(|curr| {
assert!(curr.is_join_interested());
- assert!(curr.has_join_waker());
+ assert!(curr.is_join_waker_set());
if curr.is_complete() {
return None;
@@ -546,7 +546,7 @@ impl Snapshot {
self.0 &= !JOIN_INTEREST
}
- pub(super) fn has_join_waker(self) -> bool {
+ pub(super) fn is_join_waker_set(self) -> bool {
self.0 & JOIN_WAKER == JOIN_WAKER
}
@@ -588,7 +588,7 @@ impl fmt::Debug for Snapshot {
.field("is_notified", &self.is_notified())
.field("is_cancelled", &self.is_cancelled())
.field("is_join_interested", &self.is_join_interested())
- .field("has_join_waker", &self.has_join_waker())
+ .field("is_join_waker_set", &self.is_join_waker_set())
.field("ref_count", &self.ref_count())
.finish()
}
diff --git a/src/runtime/tests/loom_blocking.rs b/src/runtime/tests/loom_blocking.rs
index 89de85e..5c4aeae 100644
--- a/src/runtime/tests/loom_blocking.rs
+++ b/src/runtime/tests/loom_blocking.rs
@@ -73,6 +73,27 @@ fn spawn_mandatory_blocking_should_run_even_when_shutting_down_from_other_thread
});
}
+#[test]
+fn spawn_blocking_when_paused() {
+ use std::time::Duration;
+ loom::model(|| {
+ let rt = crate::runtime::Builder::new_current_thread()
+ .enable_time()
+ .start_paused(true)
+ .build()
+ .unwrap();
+ let handle = rt.handle();
+ let _enter = handle.enter();
+ let a = crate::task::spawn_blocking(|| {});
+ let b = crate::task::spawn_blocking(|| {});
+ rt.block_on(crate::time::timeout(Duration::from_millis(1), async move {
+ a.await.expect("blocking task should finish");
+ b.await.expect("blocking task should finish");
+ }))
+ .expect("timeout should not trigger");
+ });
+}
+
fn mk_runtime(num_threads: usize) -> Runtime {
runtime::Builder::new_multi_thread()
.worker_threads(num_threads)
diff --git a/src/runtime/tests/loom_queue.rs b/src/runtime/tests/loom_queue.rs
index 8d4e1d3..fc93bf3 100644
--- a/src/runtime/tests/loom_queue.rs
+++ b/src/runtime/tests/loom_queue.rs
@@ -1,6 +1,6 @@
-use crate::runtime::blocking::NoopSchedule;
use crate::runtime::scheduler::multi_thread::queue;
use crate::runtime::task::Inject;
+use crate::runtime::tests::NoopSchedule;
use crate::runtime::MetricsBatch;
use loom::thread;
diff --git a/src/runtime/tests/mod.rs b/src/runtime/tests/mod.rs
index 1c67dfe..4e7c245 100644
--- a/src/runtime/tests/mod.rs
+++ b/src/runtime/tests/mod.rs
@@ -2,11 +2,29 @@
// other code when running loom tests.
#![cfg_attr(loom, warn(dead_code, unreachable_pub))]
+use self::noop_scheduler::NoopSchedule;
use self::unowned_wrapper::unowned;
+mod noop_scheduler {
+ use crate::runtime::task::{self, Task};
+
+ /// `task::Schedule` implementation that does nothing, for testing.
+ pub(crate) struct NoopSchedule;
+
+ impl task::Schedule for NoopSchedule {
+ fn release(&self, _task: &Task<Self>) -> Option<Task<Self>> {
+ None
+ }
+
+ fn schedule(&self, _task: task::Notified<Self>) {
+ unreachable!();
+ }
+ }
+}
+
mod unowned_wrapper {
- use crate::runtime::blocking::NoopSchedule;
use crate::runtime::task::{Id, JoinHandle, Notified};
+ use crate::runtime::tests::NoopSchedule;
#[cfg(all(tokio_unstable, feature = "tracing"))]
pub(crate) fn unowned<T>(task: T) -> (Notified<NoopSchedule>, JoinHandle<T::Output>)
diff --git a/src/runtime/tests/task.rs b/src/runtime/tests/task.rs
index 173e5b0..a79c0f5 100644
--- a/src/runtime/tests/task.rs
+++ b/src/runtime/tests/task.rs
@@ -1,5 +1,5 @@
-use crate::runtime::blocking::NoopSchedule;
use crate::runtime::task::{self, unowned, Id, JoinHandle, OwnedTasks, Schedule, Task};
+use crate::runtime::tests::NoopSchedule;
use crate::util::TryLock;
use std::collections::VecDeque;
diff --git a/src/runtime/thread_id.rs b/src/runtime/thread_id.rs
new file mode 100644
index 0000000..ef39289
--- /dev/null
+++ b/src/runtime/thread_id.rs
@@ -0,0 +1,31 @@
+use std::num::NonZeroU64;
+
+#[derive(Eq, PartialEq, Clone, Copy, Hash, Debug)]
+pub(crate) struct ThreadId(NonZeroU64);
+
+impl ThreadId {
+ pub(crate) fn next() -> Self {
+ use crate::loom::sync::atomic::{Ordering::Relaxed, StaticAtomicU64};
+
+ static NEXT_ID: StaticAtomicU64 = StaticAtomicU64::new(0);
+
+ let mut last = NEXT_ID.load(Relaxed);
+ loop {
+ let id = match last.checked_add(1) {
+ Some(id) => id,
+ None => exhausted(),
+ };
+
+ match NEXT_ID.compare_exchange_weak(last, id, Relaxed, Relaxed) {
+ Ok(_) => return ThreadId(NonZeroU64::new(id).unwrap()),
+ Err(id) => last = id,
+ }
+ }
+ }
+}
+
+#[cold]
+#[allow(dead_code)]
+fn exhausted() -> ! {
+ panic!("failed to generate unique thread ID: bitspace exhausted")
+}
diff --git a/src/runtime/time/mod.rs b/src/runtime/time/mod.rs
index 240f8f1..f81cab8 100644
--- a/src/runtime/time/mod.rs
+++ b/src/runtime/time/mod.rs
@@ -222,7 +222,7 @@ impl Driver {
let handle = rt_handle.time();
let clock = &handle.time_source.clock;
- if clock.is_paused() {
+ if clock.can_auto_advance() {
self.park.park_timeout(rt_handle, Duration::from_secs(0));
// If the time driver was woken, then the park completed
diff --git a/src/signal/registry.rs b/src/signal/registry.rs
index e1b3d10..48e98c8 100644
--- a/src/signal/registry.rs
+++ b/src/signal/registry.rs
@@ -5,7 +5,6 @@ use crate::sync::watch;
use crate::util::once_cell::OnceCell;
use std::ops;
-use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
pub(crate) type EventId = usize;
@@ -162,14 +161,14 @@ where
}
}
-pub(crate) fn globals() -> Pin<&'static Globals>
+pub(crate) fn globals() -> &'static Globals
where
OsExtraData: 'static + Send + Sync + Init,
OsStorage: 'static + Send + Sync + Init,
{
static GLOBALS: OnceCell<Globals> = OnceCell::new();
- Pin::new(GLOBALS.get(globals_init))
+ GLOBALS.get(globals_init)
}
#[cfg(all(test, not(loom)))]
diff --git a/src/signal/unix.rs b/src/signal/unix.rs
index 0e1329e..e5345fd 100644
--- a/src/signal/unix.rs
+++ b/src/signal/unix.rs
@@ -14,7 +14,6 @@ use crate::sync::watch;
use mio::net::UnixStream;
use std::io::{self, Error, ErrorKind, Write};
-use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Once;
use std::task::{Context, Poll};
@@ -240,7 +239,7 @@ impl Default for SignalInfo {
/// 2. Wake up the driver by writing a byte to a pipe
///
/// Those two operations should both be async-signal safe.
-fn action(globals: Pin<&'static Globals>, signal: libc::c_int) {
+fn action(globals: &'static Globals, signal: libc::c_int) {
globals.record_event(signal as EventId);
// Send a wakeup, ignore any errors (anything reasonably possible is
diff --git a/src/sync/broadcast.rs b/src/sync/broadcast.rs
index 452f3b1..1c6b2ca 100644
--- a/src/sync/broadcast.rs
+++ b/src/sync/broadcast.rs
@@ -18,6 +18,9 @@
//! returned [`Receiver`] will receive values sent **after** the call to
//! `subscribe`.
//!
+//! This channel is also suitable for the single-producer multi-consumer
+//! use-case, where a single sender broadcasts values to many receivers.
+//!
//! ## Lagging
//!
//! As sent messages must be retained until **all** [`Receiver`] handles receive
@@ -600,6 +603,97 @@ impl<T> Sender<T> {
new_receiver(shared)
}
+ /// Returns the number of queued values.
+ ///
+ /// A value is queued until it has either been seen by all receivers that were alive at the time
+ /// it was sent, or has been evicted from the queue by subsequent sends that exceeded the
+ /// queue's capacity.
+ ///
+ /// # Note
+ ///
+ /// In contrast to [`Receiver::len`], this method only reports queued values and not values that
+ /// have been evicted from the queue before being seen by all receivers.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::sync::broadcast;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let (tx, mut rx1) = broadcast::channel(16);
+ /// let mut rx2 = tx.subscribe();
+ ///
+ /// tx.send(10).unwrap();
+ /// tx.send(20).unwrap();
+ /// tx.send(30).unwrap();
+ ///
+ /// assert_eq!(tx.len(), 3);
+ ///
+ /// rx1.recv().await.unwrap();
+ ///
+ /// // The len is still 3 since rx2 hasn't seen the first value yet.
+ /// assert_eq!(tx.len(), 3);
+ ///
+ /// rx2.recv().await.unwrap();
+ ///
+ /// assert_eq!(tx.len(), 2);
+ /// }
+ /// ```
+ pub fn len(&self) -> usize {
+ let tail = self.shared.tail.lock();
+
+ let base_idx = (tail.pos & self.shared.mask as u64) as usize;
+ let mut low = 0;
+ let mut high = self.shared.buffer.len();
+ while low < high {
+ let mid = low + (high - low) / 2;
+ let idx = base_idx.wrapping_add(mid) & self.shared.mask;
+ if self.shared.buffer[idx].read().unwrap().rem.load(SeqCst) == 0 {
+ low = mid + 1;
+ } else {
+ high = mid;
+ }
+ }
+
+ self.shared.buffer.len() - low
+ }
+
+ /// Returns true if there are no queued values.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::sync::broadcast;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let (tx, mut rx1) = broadcast::channel(16);
+ /// let mut rx2 = tx.subscribe();
+ ///
+ /// assert!(tx.is_empty());
+ ///
+ /// tx.send(10).unwrap();
+ ///
+ /// assert!(!tx.is_empty());
+ ///
+ /// rx1.recv().await.unwrap();
+ ///
+ /// // The queue is still not empty since rx2 hasn't seen the value.
+ /// assert!(!tx.is_empty());
+ ///
+ /// rx2.recv().await.unwrap();
+ ///
+ /// assert!(tx.is_empty());
+ /// }
+ /// ```
+ pub fn is_empty(&self) -> bool {
+ let tail = self.shared.tail.lock();
+
+ let idx = (tail.pos.wrapping_sub(1) & self.shared.mask as u64) as usize;
+ self.shared.buffer[idx].read().unwrap().rem.load(SeqCst) == 0
+ }
+
/// Returns the number of active receivers
///
/// An active receiver is a [`Receiver`] handle returned from [`channel`] or
@@ -728,7 +822,7 @@ impl<T> Receiver<T> {
/// assert_eq!(rx1.len(), 2);
/// assert_eq!(rx1.recv().await.unwrap(), 10);
/// assert_eq!(rx1.len(), 1);
- /// assert_eq!(rx1.recv().await.unwrap(), 20);
+ /// assert_eq!(rx1.recv().await.unwrap(), 20);
/// assert_eq!(rx1.len(), 0);
/// }
/// ```
@@ -758,7 +852,7 @@ impl<T> Receiver<T> {
///
/// assert!(!rx1.is_empty());
/// assert_eq!(rx1.recv().await.unwrap(), 10);
- /// assert_eq!(rx1.recv().await.unwrap(), 20);
+ /// assert_eq!(rx1.recv().await.unwrap(), 20);
/// assert!(rx1.is_empty());
/// }
/// ```
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index 457e6ab..8fba196 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -94,6 +94,10 @@
//! producers to a single consumer. This channel is often used to send work to a
//! task or to receive the result of many computations.
//!
+//! This is also the channel you should use if you want to send many messages
+//! from a single producer to a single consumer. There is no dedicated spsc
+//! channel.
+//!
//! **Example:** using an mpsc to incrementally stream the results of a series
//! of computations.
//!
@@ -244,6 +248,10 @@
//! This channel tends to be used less often than `oneshot` and `mpsc` but still
//! has its use cases.
//!
+//! This is also the channel you should use if you want to broadcast values from
+//! a single producer to many consumers. There is no dedicated spmc broadcast
+//! channel.
+//!
//! Basic usage
//!
//! ```
diff --git a/src/sync/mpsc/block.rs b/src/sync/mpsc/block.rs
index 58f4a9f..39c3e1b 100644
--- a/src/sync/mpsc/block.rs
+++ b/src/sync/mpsc/block.rs
@@ -1,6 +1,7 @@
use crate::loom::cell::UnsafeCell;
use crate::loom::sync::atomic::{AtomicPtr, AtomicUsize};
+use std::alloc::Layout;
use std::mem::MaybeUninit;
use std::ops;
use std::ptr::{self, NonNull};
@@ -10,6 +11,17 @@ use std::sync::atomic::Ordering::{self, AcqRel, Acquire, Release};
///
/// Each block in the list can hold up to `BLOCK_CAP` messages.
pub(crate) struct Block<T> {
+ /// The header fields.
+ header: BlockHeader<T>,
+
+ /// Array containing values pushed into the block. Values are stored in a
+ /// continuous array in order to improve cache line behavior when reading.
+ /// The values must be manually dropped.
+ values: Values<T>,
+}
+
+/// Extra fields for a `Block<T>`.
+struct BlockHeader<T> {
/// The start index of this block.
///
/// Slots in this block have indices in `start_index .. start_index + BLOCK_CAP`.
@@ -24,11 +36,6 @@ pub(crate) struct Block<T> {
/// The observed `tail_position` value *after* the block has been passed by
/// `block_tail`.
observed_tail_position: UnsafeCell<usize>,
-
- /// Array containing values pushed into the block. Values are stored in a
- /// continuous array in order to improve cache line behavior when reading.
- /// The values must be manually dropped.
- values: Values<T>,
}
pub(crate) enum Read<T> {
@@ -36,6 +43,7 @@ pub(crate) enum Read<T> {
Closed,
}
+#[repr(transparent)]
struct Values<T>([UnsafeCell<MaybeUninit<T>>; BLOCK_CAP]);
use super::BLOCK_CAP;
@@ -71,28 +79,56 @@ pub(crate) fn offset(slot_index: usize) -> usize {
SLOT_MASK & slot_index
}
+generate_addr_of_methods! {
+ impl<T> Block<T> {
+ unsafe fn addr_of_header(self: NonNull<Self>) -> NonNull<BlockHeader<T>> {
+ &self.header
+ }
+
+ unsafe fn addr_of_values(self: NonNull<Self>) -> NonNull<Values<T>> {
+ &self.values
+ }
+ }
+}
+
impl<T> Block<T> {
- pub(crate) fn new(start_index: usize) -> Block<T> {
- Block {
- // The absolute index in the channel of the first slot in the block.
- start_index,
+ pub(crate) fn new(start_index: usize) -> Box<Block<T>> {
+ unsafe {
+ // Allocate the block on the heap.
+ // SAFETY: The size of the Block<T> is non-zero, since it is at least the size of the header.
+ let block = std::alloc::alloc(Layout::new::<Block<T>>()) as *mut Block<T>;
+ let block = match NonNull::new(block) {
+ Some(block) => block,
+ None => std::alloc::handle_alloc_error(Layout::new::<Block<T>>()),
+ };
+
+ // Write the header to the block.
+ Block::addr_of_header(block).as_ptr().write(BlockHeader {
+ // The absolute index in the channel of the first slot in the block.
+ start_index,
- // Pointer to the next block in the linked list.
- next: AtomicPtr::new(ptr::null_mut()),
+ // Pointer to the next block in the linked list.
+ next: AtomicPtr::new(ptr::null_mut()),
- ready_slots: AtomicUsize::new(0),
+ ready_slots: AtomicUsize::new(0),
- observed_tail_position: UnsafeCell::new(0),
+ observed_tail_position: UnsafeCell::new(0),
+ });
- // Value storage
- values: unsafe { Values::uninitialized() },
+ // Initialize the values array.
+ Values::initialize(Block::addr_of_values(block));
+
+ // Convert the pointer to a `Box`.
+ // Safety: The raw pointer was allocated using the global allocator, and with
+ // the layout for a `Block<T>`, so it's valid to convert it to box.
+ Box::from_raw(block.as_ptr())
}
}
/// Returns `true` if the block matches the given index.
pub(crate) fn is_at_index(&self, index: usize) -> bool {
debug_assert!(offset(index) == 0);
- self.start_index == index
+ self.header.start_index == index
}
/// Returns the number of blocks between `self` and the block at the
@@ -101,7 +137,7 @@ impl<T> Block<T> {
/// `start_index` must represent a block *after* `self`.
pub(crate) fn distance(&self, other_index: usize) -> usize {
debug_assert!(offset(other_index) == 0);
- other_index.wrapping_sub(self.start_index) / BLOCK_CAP
+ other_index.wrapping_sub(self.header.start_index) / BLOCK_CAP
}
/// Reads the value at the given offset.
@@ -116,7 +152,7 @@ impl<T> Block<T> {
pub(crate) unsafe fn read(&self, slot_index: usize) -> Option<Read<T>> {
let offset = offset(slot_index);
- let ready_bits = self.ready_slots.load(Acquire);
+ let ready_bits = self.header.ready_slots.load(Acquire);
if !is_ready(ready_bits, offset) {
if is_tx_closed(ready_bits) {
@@ -156,7 +192,7 @@ impl<T> Block<T> {
/// Signal to the receiver that the sender half of the list is closed.
pub(crate) unsafe fn tx_close(&self) {
- self.ready_slots.fetch_or(TX_CLOSED, Release);
+ self.header.ready_slots.fetch_or(TX_CLOSED, Release);
}
/// Resets the block to a blank state. This enables reusing blocks in the
@@ -169,9 +205,9 @@ impl<T> Block<T> {
/// * All slots are empty.
/// * The caller holds a unique pointer to the block.
pub(crate) unsafe fn reclaim(&mut self) {
- self.start_index = 0;
- self.next = AtomicPtr::new(ptr::null_mut());
- self.ready_slots = AtomicUsize::new(0);
+ self.header.start_index = 0;
+ self.header.next = AtomicPtr::new(ptr::null_mut());
+ self.header.ready_slots = AtomicUsize::new(0);
}
/// Releases the block to the rx half for freeing.
@@ -187,19 +223,20 @@ impl<T> Block<T> {
pub(crate) unsafe fn tx_release(&self, tail_position: usize) {
// Track the observed tail_position. Any sender targeting a greater
// tail_position is guaranteed to not access this block.
- self.observed_tail_position
+ self.header
+ .observed_tail_position
.with_mut(|ptr| *ptr = tail_position);
// Set the released bit, signalling to the receiver that it is safe to
// free the block's memory as soon as all slots **prior** to
// `observed_tail_position` have been filled.
- self.ready_slots.fetch_or(RELEASED, Release);
+ self.header.ready_slots.fetch_or(RELEASED, Release);
}
/// Mark a slot as ready
fn set_ready(&self, slot: usize) {
let mask = 1 << slot;
- self.ready_slots.fetch_or(mask, Release);
+ self.header.ready_slots.fetch_or(mask, Release);
}
/// Returns `true` when all slots have their `ready` bits set.
@@ -214,25 +251,31 @@ impl<T> Block<T> {
/// single atomic cell. However, this could have negative impact on cache
/// behavior as there would be many more mutations to a single slot.
pub(crate) fn is_final(&self) -> bool {
- self.ready_slots.load(Acquire) & READY_MASK == READY_MASK
+ self.header.ready_slots.load(Acquire) & READY_MASK == READY_MASK
}
/// Returns the `observed_tail_position` value, if set
pub(crate) fn observed_tail_position(&self) -> Option<usize> {
- if 0 == RELEASED & self.ready_slots.load(Acquire) {
+ if 0 == RELEASED & self.header.ready_slots.load(Acquire) {
None
} else {
- Some(self.observed_tail_position.with(|ptr| unsafe { *ptr }))
+ Some(
+ self.header
+ .observed_tail_position
+ .with(|ptr| unsafe { *ptr }),
+ )
}
}
/// Loads the next block
pub(crate) fn load_next(&self, ordering: Ordering) -> Option<NonNull<Block<T>>> {
- let ret = NonNull::new(self.next.load(ordering));
+ let ret = NonNull::new(self.header.next.load(ordering));
debug_assert!(unsafe {
- ret.map(|block| block.as_ref().start_index == self.start_index.wrapping_add(BLOCK_CAP))
- .unwrap_or(true)
+ ret.map(|block| {
+ block.as_ref().header.start_index == self.header.start_index.wrapping_add(BLOCK_CAP)
+ })
+ .unwrap_or(true)
});
ret
@@ -260,9 +303,10 @@ impl<T> Block<T> {
success: Ordering,
failure: Ordering,
) -> Result<(), NonNull<Block<T>>> {
- block.as_mut().start_index = self.start_index.wrapping_add(BLOCK_CAP);
+ block.as_mut().header.start_index = self.header.start_index.wrapping_add(BLOCK_CAP);
let next_ptr = self
+ .header
.next
.compare_exchange(ptr::null_mut(), block.as_ptr(), success, failure)
.unwrap_or_else(|x| x);
@@ -291,7 +335,7 @@ impl<T> Block<T> {
// Create the new block. It is assumed that the block will become the
// next one after `&self`. If this turns out to not be the case,
// `start_index` is updated accordingly.
- let new_block = Box::new(Block::new(self.start_index + BLOCK_CAP));
+ let new_block = Block::new(self.header.start_index + BLOCK_CAP);
let mut new_block = unsafe { NonNull::new_unchecked(Box::into_raw(new_block)) };
@@ -308,7 +352,8 @@ impl<T> Block<T> {
// `Release` ensures that the newly allocated block is available to
// other threads acquiring the next pointer.
let next = NonNull::new(
- self.next
+ self.header
+ .next
.compare_exchange(ptr::null_mut(), new_block.as_ptr(), AcqRel, Acquire)
.unwrap_or_else(|x| x),
);
@@ -360,19 +405,20 @@ fn is_tx_closed(bits: usize) -> bool {
}
impl<T> Values<T> {
- unsafe fn uninitialized() -> Values<T> {
- let mut vals = MaybeUninit::uninit();
-
+ /// Initialize a `Values` struct from a pointer.
+ ///
+ /// # Safety
+ ///
+ /// The raw pointer must be valid for writing a `Values<T>`.
+ unsafe fn initialize(_value: NonNull<Values<T>>) {
// When fuzzing, `UnsafeCell` needs to be initialized.
if_loom! {
- let p = vals.as_mut_ptr() as *mut UnsafeCell<MaybeUninit<T>>;
+ let p = _value.as_ptr() as *mut UnsafeCell<MaybeUninit<T>>;
for i in 0..BLOCK_CAP {
p.add(i)
.write(UnsafeCell::new(MaybeUninit::uninit()));
}
}
-
- Values(vals.assume_init())
}
}
@@ -383,3 +429,20 @@ impl<T> ops::Index<usize> for Values<T> {
self.0.index(index)
}
}
+
+#[cfg(all(test, not(loom)))]
+#[test]
+fn assert_no_stack_overflow() {
+ // https://github.com/tokio-rs/tokio/issues/5293
+
+ struct Foo {
+ _a: [u8; 2_000_000],
+ }
+
+ assert_eq!(
+ Layout::new::<MaybeUninit<Block<Foo>>>(),
+ Layout::new::<Block<Foo>>()
+ );
+
+ let _block = Block::<Foo>::new(0);
+}
diff --git a/src/sync/mpsc/list.rs b/src/sync/mpsc/list.rs
index e4eeb45..10b2957 100644
--- a/src/sync/mpsc/list.rs
+++ b/src/sync/mpsc/list.rs
@@ -44,7 +44,7 @@ pub(crate) enum TryPopResult<T> {
pub(crate) fn channel<T>() -> (Tx<T>, Rx<T>) {
// Create the initial block shared between the tx and rx halves.
- let initial_block = Box::new(Block::new(0));
+ let initial_block = Block::new(0);
let initial_block_ptr = Box::into_raw(initial_block);
let tx = Tx {
diff --git a/src/sync/mpsc/mod.rs b/src/sync/mpsc/mod.rs
index fff3091..33889fa 100644
--- a/src/sync/mpsc/mod.rs
+++ b/src/sync/mpsc/mod.rs
@@ -21,6 +21,9 @@
//! when additional capacity is available. In other words, the channel provides
//! backpressure.
//!
+//! This channel is also suitable for the single-producer single-consumer
+//! use-case. (Unless you only need to send one message, in which case you
+//! should use the [oneshot] channel.)
//!
//! # Disconnection
//!
@@ -62,7 +65,7 @@
//! in mind, but they can also be generalized to other kinds of channels. In
//! general, any channel method that isn't marked async can be called anywhere,
//! including outside of the runtime. For example, sending a message on a
-//! oneshot channel from outside the runtime is perfectly fine.
+//! [oneshot] channel from outside the runtime is perfectly fine.
//!
//! # Multiple runtimes
//!
@@ -82,6 +85,7 @@
//! [blocking-recv]: crate::sync::mpsc::Receiver::blocking_recv()
//! [`UnboundedSender`]: crate::sync::mpsc::UnboundedSender
//! [`UnboundedReceiver`]: crate::sync::mpsc::UnboundedReceiver
+//! [oneshot]: crate::sync::oneshot
//! [`Handle::block_on`]: crate::runtime::Handle::block_on()
//! [std-unbounded]: std::sync::mpsc::channel
//! [crossbeam-unbounded]: https://docs.rs/crossbeam/*/crossbeam/channel/fn.unbounded.html
diff --git a/src/task/join_set.rs b/src/task/join_set.rs
index f181302..e6d8d62 100644
--- a/src/task/join_set.rs
+++ b/src/task/join_set.rs
@@ -120,9 +120,9 @@ impl<T: 'static> JoinSet<T> {
/// Spawn the provided task on the `JoinSet`, returning an [`AbortHandle`]
/// that can be used to remotely cancel the task.
///
- /// You do not have to `.await` the returned `JoinHandle` to make the
- /// provided future start execution. It will start running in the background
- /// immediately when `spawn` is called.
+ /// The provided future will start running in the background immediately
+ /// when this method is called, even if you don't await anything on this
+ /// `JoinSet`.
///
/// # Panics
///
@@ -143,9 +143,9 @@ impl<T: 'static> JoinSet<T> {
/// `JoinSet` returning an [`AbortHandle`] that can be used to remotely
/// cancel the task.
///
- /// You do not have to `.await` the returned `JoinHandle` to make the
- /// provided future start execution. It will start running in the background
- /// immediately when `spawn_on` is called.
+ /// The provided future will start running in the background immediately
+ /// when this method is called, even if you don't await anything on this
+ /// `JoinSet`.
///
/// [`AbortHandle`]: crate::task::AbortHandle
#[track_caller]
@@ -162,9 +162,9 @@ impl<T: 'static> JoinSet<T> {
/// `JoinSet`, returning an [`AbortHandle`] that can be used to remotely
/// cancel the task.
///
- /// You do not have to `.await` the returned `JoinHandle` to make the
- /// provided future start execution. It will start running in the background
- /// immediately when `spawn_local` is called.
+ /// The provided future will start running in the background immediately
+ /// when this method is called, even if you don't await anything on this
+ /// `JoinSet`.
///
/// # Panics
///
@@ -186,10 +186,8 @@ impl<T: 'static> JoinSet<T> {
/// remotely cancel the task.
///
/// Unlike the [`spawn_local`] method, this method may be used to spawn local
- /// tasks when the `LocalSet` is _not_ running. You do not have to `.await`
- /// the returned `JoinHandle` to make the provided future start execution.
- /// It will start running immediately whenever the `LocalSet` is next
- /// started.
+ /// tasks on a `LocalSet` that is _not_ currently running. The provided
+ /// future will start running whenever the `LocalSet` is next started.
///
/// [`LocalSet`]: crate::task::LocalSet
/// [`AbortHandle`]: crate::task::AbortHandle
diff --git a/src/task/local.rs b/src/task/local.rs
index e4a198b..0675faa 100644
--- a/src/task/local.rs
+++ b/src/task/local.rs
@@ -1,8 +1,8 @@
//! Runs `!Send` futures on the current thread.
use crate::loom::cell::UnsafeCell;
use crate::loom::sync::{Arc, Mutex};
-use crate::loom::thread::{self, ThreadId};
use crate::runtime::task::{self, JoinHandle, LocalOwnedTasks, Task};
+use crate::runtime::{context, ThreadId};
use crate::sync::AtomicWaker;
use crate::util::RcCell;
@@ -277,12 +277,10 @@ pin_project! {
}
tokio_thread_local!(static CURRENT: LocalData = const { LocalData {
- thread_id: Cell::new(None),
ctx: RcCell::new(),
} });
struct LocalData {
- thread_id: Cell<Option<ThreadId>>,
ctx: RcCell<Context>,
}
@@ -291,9 +289,9 @@ cfg_rt! {
///
/// The spawned future will run on the same thread that called `spawn_local`.
///
- /// You do not have to `.await` the returned `JoinHandle` to make the
- /// provided future start execution. It will start running in the background
- /// immediately when `spawn_local` is called.
+ /// The provided future will start running in the background immediately
+ /// when `spawn_local` is called, even if you don't await the returned
+ /// `JoinHandle`.
///
/// # Panics
///
@@ -379,12 +377,14 @@ impl fmt::Debug for LocalEnterGuard {
impl LocalSet {
/// Returns a new local task set.
pub fn new() -> LocalSet {
+ let owner = context::thread_id().expect("cannot create LocalSet during thread shutdown");
+
LocalSet {
tick: Cell::new(0),
context: Rc::new(Context {
shared: Arc::new(Shared {
local_state: LocalState {
- owner: thread_id().expect("cannot create LocalSet during thread shutdown"),
+ owner,
owned: LocalOwnedTasks::new(),
local_queue: UnsafeCell::new(VecDeque::with_capacity(INITIAL_CAPACITY)),
},
@@ -417,10 +417,9 @@ impl LocalSet {
/// This task is guaranteed to be run on the current thread.
///
/// Unlike the free function [`spawn_local`], this method may be used to
- /// spawn local tasks when the `LocalSet` is _not_ running. You do not have
- /// to `.await` the returned `JoinHandle` to make the provided future start
- /// execution. It will start running immediately whenever the `LocalSet` is
- /// next started.
+ /// spawn local tasks when the `LocalSet` is _not_ running. The provided
+ /// future will start running once the `LocalSet` is next started, even if
+ /// you don't await the returned `JoinHandle`.
///
/// # Examples
///
@@ -949,7 +948,7 @@ impl Shared {
// We are on the thread that owns the `LocalSet`, so we can
// wake to the local queue.
- _ if localdata.get_id() == Some(self.local_state.owner) => {
+ _ if context::thread_id().ok() == Some(self.local_state.owner) => {
unsafe {
// Safety: we just checked that the thread ID matches
// the localset's owner, so this is safe.
@@ -1093,7 +1092,9 @@ impl LocalState {
// if we couldn't get the thread ID because we're dropping the local
// data, skip the assertion --- the `Drop` impl is not going to be
// called from another thread, because `LocalSet` is `!Send`
- thread_id().map(|id| id == self.owner).unwrap_or(true),
+ context::thread_id()
+ .map(|id| id == self.owner)
+ .unwrap_or(true),
"`LocalSet`'s local run queue must not be accessed by another thread!"
);
}
@@ -1103,26 +1104,6 @@ impl LocalState {
// ensure they are on the same thread that owns the `LocalSet`.
unsafe impl Send for LocalState {}
-impl LocalData {
- fn get_id(&self) -> Option<ThreadId> {
- self.thread_id.get()
- }
-
- fn get_or_insert_id(&self) -> ThreadId {
- self.thread_id.get().unwrap_or_else(|| {
- let id = thread::current().id();
- self.thread_id.set(Some(id));
- id
- })
- }
-}
-
-fn thread_id() -> Option<ThreadId> {
- CURRENT
- .try_with(|localdata| localdata.get_or_insert_id())
- .ok()
-}
-
#[cfg(all(test, not(loom)))]
mod tests {
use super::*;
diff --git a/src/task/spawn.rs b/src/task/spawn.rs
index 5db11a4..66b0d67 100644
--- a/src/task/spawn.rs
+++ b/src/task/spawn.rs
@@ -7,9 +7,9 @@ cfg_rt! {
/// Spawns a new asynchronous task, returning a
/// [`JoinHandle`](super::JoinHandle) for it.
///
- /// You do not have to `.await` the returned `JoinHandle` to make the
- /// provided future start execution. It will start running in the background
- /// immediately when `spawn` is called.
+ /// The provided future will start running in the background immediately
+ /// when `spawn` is called, even if you don't await the returned
+ /// `JoinHandle`.
///
/// Spawning a task enables the task to execute concurrently to other tasks. The
/// spawned task may execute on the current thread, or it may be sent to a
diff --git a/src/time/clock.rs b/src/time/clock.rs
index 0343c4f..cd11a67 100644
--- a/src/time/clock.rs
+++ b/src/time/clock.rs
@@ -65,6 +65,9 @@ cfg_test_util! {
/// Instant at which the clock was last unfrozen.
unfrozen: Option<std::time::Instant>,
+
+ /// Number of `inhibit_auto_advance` calls still in effect.
+ auto_advance_inhibit_count: usize,
}
/// Pauses time.
@@ -187,6 +190,7 @@ cfg_test_util! {
enable_pausing,
base: now,
unfrozen: Some(now),
+ auto_advance_inhibit_count: 0,
})),
};
@@ -212,9 +216,20 @@ cfg_test_util! {
inner.unfrozen = None;
}
- pub(crate) fn is_paused(&self) -> bool {
+ /// Temporarily stop auto-advancing the clock (see `tokio::time::pause`).
+ pub(crate) fn inhibit_auto_advance(&self) {
+ let mut inner = self.inner.lock();
+ inner.auto_advance_inhibit_count += 1;
+ }
+
+ pub(crate) fn allow_auto_advance(&self) {
+ let mut inner = self.inner.lock();
+ inner.auto_advance_inhibit_count -= 1;
+ }
+
+ pub(crate) fn can_auto_advance(&self) -> bool {
let inner = self.inner.lock();
- inner.unfrozen.is_none()
+ inner.unfrozen.is_none() && inner.auto_advance_inhibit_count == 0
}
#[track_caller]
diff --git a/src/time/sleep.rs b/src/time/sleep.rs
index d974e1a..0a012e2 100644
--- a/src/time/sleep.rs
+++ b/src/time/sleep.rs
@@ -357,7 +357,7 @@ impl Sleep {
fn reset_inner(self: Pin<&mut Self>, deadline: Instant) {
let mut me = self.project();
me.entry.as_mut().reset(deadline);
- (*me.inner).deadline = deadline;
+ (me.inner).deadline = deadline;
#[cfg(all(tokio_unstable, feature = "tracing"))]
{
diff --git a/src/util/linked_list.rs b/src/util/linked_list.rs
index 9698f72..b46bd6d 100644
--- a/src/util/linked_list.rs
+++ b/src/util/linked_list.rs
@@ -126,7 +126,7 @@ impl<L: Link> LinkedList<L, L::Target> {
pub(crate) fn push_front(&mut self, val: L::Handle) {
// The value should not be dropped, it is being inserted into the list
let val = ManuallyDrop::new(val);
- let ptr = L::as_raw(&*val);
+ let ptr = L::as_raw(&val);
assert_ne!(self.head, Some(ptr));
unsafe {
L::pointers(ptr).as_mut().set_next(self.head);
diff --git a/src/util/mod.rs b/src/util/mod.rs
index 245e64d..9f6119a 100644
--- a/src/util/mod.rs
+++ b/src/util/mod.rs
@@ -6,7 +6,12 @@ cfg_io_driver! {
#[cfg(feature = "rt")]
pub(crate) mod atomic_cell;
-#[cfg(any(feature = "rt", feature = "signal", feature = "process"))]
+#[cfg(any(
+ feature = "rt",
+ feature = "signal",
+ feature = "process",
+ tokio_no_const_mutex_new,
+))]
pub(crate) mod once_cell;
#[cfg(any(
diff --git a/src/util/once_cell.rs b/src/util/once_cell.rs
index 138d2a7..1925f0a 100644
--- a/src/util/once_cell.rs
+++ b/src/util/once_cell.rs
@@ -25,7 +25,7 @@ impl<T> OnceCell<T> {
/// If the `init` closure panics, then the `OnceCell` is poisoned and all
/// future calls to `get` will panic.
#[inline]
- pub(crate) fn get(&self, init: fn() -> T) -> &T {
+ pub(crate) fn get(&self, init: impl FnOnce() -> T) -> &T {
if !self.once.is_completed() {
self.do_init(init);
}
@@ -41,7 +41,7 @@ impl<T> OnceCell<T> {
}
#[cold]
- fn do_init(&self, init: fn() -> T) {
+ fn do_init(&self, init: impl FnOnce() -> T) {
let value_ptr = self.value.get() as *mut T;
self.once.call_once(|| {
diff --git a/tests/_require_full.rs b/tests/_require_full.rs
index a339374..4b9698a 100644
--- a/tests/_require_full.rs
+++ b/tests/_require_full.rs
@@ -1,2 +1,8 @@
-#![cfg(not(any(feature = "full", tokio_wasm)))]
+#[cfg(not(any(feature = "full", tokio_wasm)))]
compile_error!("run main Tokio tests with `--features full`");
+
+// CI sets `--cfg tokio_no_parking_lot` when trying to run tests with
+// `parking_lot` disabled. This check prevents "silent failure" if `parking_lot`
+// accidentally gets enabled.
+#[cfg(all(tokio_no_parking_lot, feature = "parking_lot"))]
+compile_error!("parking_lot feature enabled when it should not be");
diff --git a/tests/buffered.rs b/tests/buffered.rs
index 19afebd..4251c3f 100644
--- a/tests/buffered.rs
+++ b/tests/buffered.rs
@@ -18,10 +18,10 @@ async fn echo_server() {
let msg = "foo bar baz";
let t = thread::spawn(move || {
- let mut s = assert_ok!(TcpStream::connect(&addr));
+ let mut s = assert_ok!(TcpStream::connect(addr));
let t2 = thread::spawn(move || {
- let mut s = assert_ok!(TcpStream::connect(&addr));
+ let mut s = assert_ok!(TcpStream::connect(addr));
let mut b = vec![0; msg.len() * N];
assert_ok!(s.read_exact(&mut b));
b
diff --git a/tests/io_driver.rs b/tests/io_driver.rs
index 2ca5630..97018e0 100644
--- a/tests/io_driver.rs
+++ b/tests/io_driver.rs
@@ -80,7 +80,7 @@ fn test_drop_on_notify() {
drop(task);
// Establish a connection to the acceptor
- let _s = TcpStream::connect(&addr).unwrap();
+ let _s = TcpStream::connect(addr).unwrap();
// Force the reactor to turn
rt.block_on(async {});
diff --git a/tests/macros_join.rs b/tests/macros_join.rs
index 4441582..9e4d234 100644
--- a/tests/macros_join.rs
+++ b/tests/macros_join.rs
@@ -1,5 +1,5 @@
#![cfg(feature = "macros")]
-#![allow(clippy::blacklisted_name)]
+#![allow(clippy::disallowed_names)]
use std::sync::Arc;
#[cfg(tokio_wasm_not_wasi)]
diff --git a/tests/macros_select.rs b/tests/macros_select.rs
index 60f3738..26d6fec 100644
--- a/tests/macros_select.rs
+++ b/tests/macros_select.rs
@@ -1,5 +1,5 @@
#![cfg(feature = "macros")]
-#![allow(clippy::blacklisted_name)]
+#![allow(clippy::disallowed_names)]
#[cfg(tokio_wasm_not_wasi)]
use wasm_bindgen_test::wasm_bindgen_test as maybe_tokio_test;
diff --git a/tests/macros_try_join.rs b/tests/macros_try_join.rs
index 209516b..6c43222 100644
--- a/tests/macros_try_join.rs
+++ b/tests/macros_try_join.rs
@@ -1,5 +1,5 @@
#![cfg(feature = "macros")]
-#![allow(clippy::blacklisted_name)]
+#![allow(clippy::disallowed_names)]
use std::sync::Arc;
diff --git a/tests/rt_common.rs b/tests/rt_common.rs
index 53248b2..3892998 100644
--- a/tests/rt_common.rs
+++ b/tests/rt_common.rs
@@ -661,7 +661,7 @@ rt_test! {
loop {
// Don't use Tokio's `yield_now()` to avoid special defer
// logic.
- let _: () = futures::future::poll_fn(|cx| {
+ futures::future::poll_fn::<(), _>(|cx| {
cx.waker().wake_by_ref();
std::task::Poll::Pending
}).await;
diff --git a/tests/rt_metrics.rs b/tests/rt_metrics.rs
index 2a9f998..fdb2fb5 100644
--- a/tests/rt_metrics.rs
+++ b/tests/rt_metrics.rs
@@ -31,6 +31,19 @@ fn num_idle_blocking_threads() {
rt.block_on(async {
time::sleep(Duration::from_millis(5)).await;
});
+
+ // We need to wait until the blocking thread has become idle. Usually 5ms is
+ // enough for this to happen, but not always. When it isn't enough, sleep
+ // for another second. We don't always wait for a whole second since we want
+ // the test suite to finish quickly.
+ //
+ // Note that the timeout for idle threads to be killed is 10 seconds.
+ if 0 == rt.metrics().num_idle_blocking_threads() {
+ rt.block_on(async {
+ time::sleep(Duration::from_secs(1)).await;
+ });
+ }
+
assert_eq!(1, rt.metrics().num_idle_blocking_threads());
}
@@ -128,7 +141,7 @@ fn worker_noop_count() {
time::sleep(Duration::from_millis(1)).await;
});
drop(rt);
- assert!(2 <= metrics.worker_noop_count(0));
+ assert!(0 < metrics.worker_noop_count(0));
let rt = threaded();
let metrics = rt.metrics();
@@ -136,8 +149,8 @@ fn worker_noop_count() {
time::sleep(Duration::from_millis(1)).await;
});
drop(rt);
- assert!(1 <= metrics.worker_noop_count(0));
- assert!(1 <= metrics.worker_noop_count(1));
+ assert!(0 < metrics.worker_noop_count(0));
+ assert!(0 < metrics.worker_noop_count(1));
}
#[test]
diff --git a/tests/support/leaked_buffers.rs b/tests/support/leaked_buffers.rs
index 3ee8a18..a6079fb 100644
--- a/tests/support/leaked_buffers.rs
+++ b/tests/support/leaked_buffers.rs
@@ -18,9 +18,9 @@ impl LeakedBuffers {
}
}
pub unsafe fn create<'a>(&mut self, size: usize) -> &'a mut [u8] {
- let mut new_mem = vec![0u8; size].into_boxed_slice();
- let slice = std::slice::from_raw_parts_mut(new_mem.as_mut_ptr(), new_mem.len());
+ let new_mem = vec![0u8; size].into_boxed_slice();
self.leaked_vecs.push(new_mem);
- slice
+ let new_mem = self.leaked_vecs.last_mut().unwrap();
+ std::slice::from_raw_parts_mut(new_mem.as_mut_ptr(), new_mem.len())
}
}
diff --git a/tests/support/panic.rs b/tests/support/panic.rs
index 7f60c76..df2f59d 100644
--- a/tests/support/panic.rs
+++ b/tests/support/panic.rs
@@ -1,9 +1,8 @@
-use parking_lot::{const_mutex, Mutex};
use std::panic;
-use std::sync::Arc;
+use std::sync::{Arc, Mutex};
pub fn test_panic<Func: FnOnce() + panic::UnwindSafe>(func: Func) -> Option<String> {
- static PANIC_MUTEX: Mutex<()> = const_mutex(());
+ static PANIC_MUTEX: Mutex<()> = Mutex::new(());
{
let _guard = PANIC_MUTEX.lock();
@@ -16,6 +15,7 @@ pub fn test_panic<Func: FnOnce() + panic::UnwindSafe>(func: Func) -> Option<Stri
let panic_location = panic_info.location().unwrap();
panic_file
.lock()
+ .unwrap()
.clone_from(&Some(panic_location.file().to_string()));
}));
}
@@ -26,7 +26,7 @@ pub fn test_panic<Func: FnOnce() + panic::UnwindSafe>(func: Func) -> Option<Stri
panic::set_hook(prev_hook);
if result.is_err() {
- panic_file.lock().clone()
+ panic_file.lock().unwrap().clone()
} else {
None
}
diff --git a/tests/sync_broadcast.rs b/tests/sync_broadcast.rs
index 2221fe3..cd66924 100644
--- a/tests/sync_broadcast.rs
+++ b/tests/sync_broadcast.rs
@@ -527,3 +527,63 @@ fn resubscribe_to_closed_channel() {
let mut rx_resub = rx.resubscribe();
assert_closed!(rx_resub.try_recv());
}
+
+#[test]
+fn sender_len() {
+ let (tx, mut rx1) = broadcast::channel(4);
+ let mut rx2 = tx.subscribe();
+
+ assert_eq!(tx.len(), 0);
+ assert!(tx.is_empty());
+
+ tx.send(1).unwrap();
+ tx.send(2).unwrap();
+ tx.send(3).unwrap();
+
+ assert_eq!(tx.len(), 3);
+ assert!(!tx.is_empty());
+
+ assert_recv!(rx1);
+ assert_recv!(rx1);
+
+ assert_eq!(tx.len(), 3);
+ assert!(!tx.is_empty());
+
+ assert_recv!(rx2);
+
+ assert_eq!(tx.len(), 2);
+ assert!(!tx.is_empty());
+
+ tx.send(4).unwrap();
+ tx.send(5).unwrap();
+ tx.send(6).unwrap();
+
+ assert_eq!(tx.len(), 4);
+ assert!(!tx.is_empty());
+}
+
+#[test]
+#[cfg(not(tokio_wasm_not_wasi))]
+fn sender_len_random() {
+ use rand::Rng;
+
+ let (tx, mut rx1) = broadcast::channel(16);
+ let mut rx2 = tx.subscribe();
+
+ for _ in 0..1000 {
+ match rand::thread_rng().gen_range(0..4) {
+ 0 => {
+ let _ = rx1.try_recv();
+ }
+ 1 => {
+ let _ = rx2.try_recv();
+ }
+ _ => {
+ tx.send(0).unwrap();
+ }
+ }
+
+ let expected_len = usize::min(usize::max(rx1.len(), rx2.len()), 16);
+ assert_eq!(tx.len(), expected_len);
+ }
+}
diff --git a/tests/sync_once_cell.rs b/tests/sync_once_cell.rs
index 18eaf93..38dfa7c 100644
--- a/tests/sync_once_cell.rs
+++ b/tests/sync_once_cell.rs
@@ -4,178 +4,7 @@
use std::mem;
use std::ops::Drop;
use std::sync::atomic::{AtomicU32, Ordering};
-use std::time::Duration;
-use tokio::runtime;
-use tokio::sync::{OnceCell, SetError};
-use tokio::time;
-
-async fn func1() -> u32 {
- 5
-}
-
-async fn func2() -> u32 {
- time::sleep(Duration::from_millis(1)).await;
- 10
-}
-
-async fn func_err() -> Result<u32, ()> {
- Err(())
-}
-
-async fn func_ok() -> Result<u32, ()> {
- Ok(10)
-}
-
-async fn func_panic() -> u32 {
- time::sleep(Duration::from_millis(1)).await;
- panic!();
-}
-
-async fn sleep_and_set() -> u32 {
- // Simulate sleep by pausing time and waiting for another thread to
- // resume clock when calling `set`, then finding the cell being initialized
- // by this call
- time::sleep(Duration::from_millis(2)).await;
- 5
-}
-
-async fn advance_time_and_set(cell: &'static OnceCell<u32>, v: u32) -> Result<(), SetError<u32>> {
- time::advance(Duration::from_millis(1)).await;
- cell.set(v)
-}
-
-#[test]
-fn get_or_init() {
- let rt = runtime::Builder::new_current_thread()
- .enable_time()
- .start_paused(true)
- .build()
- .unwrap();
-
- static ONCE: OnceCell<u32> = OnceCell::const_new();
-
- rt.block_on(async {
- let handle1 = rt.spawn(async { ONCE.get_or_init(func1).await });
- let handle2 = rt.spawn(async { ONCE.get_or_init(func2).await });
-
- time::advance(Duration::from_millis(1)).await;
- time::resume();
-
- let result1 = handle1.await.unwrap();
- let result2 = handle2.await.unwrap();
-
- assert_eq!(*result1, 5);
- assert_eq!(*result2, 5);
- });
-}
-
-#[test]
-fn get_or_init_panic() {
- let rt = runtime::Builder::new_current_thread()
- .enable_time()
- .build()
- .unwrap();
-
- static ONCE: OnceCell<u32> = OnceCell::const_new();
-
- rt.block_on(async {
- time::pause();
-
- let handle1 = rt.spawn(async { ONCE.get_or_init(func1).await });
- let handle2 = rt.spawn(async { ONCE.get_or_init(func_panic).await });
-
- time::advance(Duration::from_millis(1)).await;
-
- let result1 = handle1.await.unwrap();
- let result2 = handle2.await.unwrap();
-
- assert_eq!(*result1, 5);
- assert_eq!(*result2, 5);
- });
-}
-
-#[test]
-fn set_and_get() {
- let rt = runtime::Builder::new_current_thread()
- .enable_time()
- .build()
- .unwrap();
-
- static ONCE: OnceCell<u32> = OnceCell::const_new();
-
- rt.block_on(async {
- let _ = rt.spawn(async { ONCE.set(5) }).await;
- let value = ONCE.get().unwrap();
- assert_eq!(*value, 5);
- });
-}
-
-#[test]
-fn get_uninit() {
- static ONCE: OnceCell<u32> = OnceCell::const_new();
- let uninit = ONCE.get();
- assert!(uninit.is_none());
-}
-
-#[test]
-fn set_twice() {
- static ONCE: OnceCell<u32> = OnceCell::const_new();
-
- let first = ONCE.set(5);
- assert_eq!(first, Ok(()));
- let second = ONCE.set(6);
- assert!(second.err().unwrap().is_already_init_err());
-}
-
-#[test]
-fn set_while_initializing() {
- let rt = runtime::Builder::new_current_thread()
- .enable_time()
- .build()
- .unwrap();
-
- static ONCE: OnceCell<u32> = OnceCell::const_new();
-
- rt.block_on(async {
- time::pause();
-
- let handle1 = rt.spawn(async { ONCE.get_or_init(sleep_and_set).await });
- let handle2 = rt.spawn(async { advance_time_and_set(&ONCE, 10).await });
-
- time::advance(Duration::from_millis(2)).await;
-
- let result1 = handle1.await.unwrap();
- let result2 = handle2.await.unwrap();
-
- assert_eq!(*result1, 5);
- assert!(result2.err().unwrap().is_initializing_err());
- });
-}
-
-#[test]
-fn get_or_try_init() {
- let rt = runtime::Builder::new_current_thread()
- .enable_time()
- .start_paused(true)
- .build()
- .unwrap();
-
- static ONCE: OnceCell<u32> = OnceCell::const_new();
-
- rt.block_on(async {
- let handle1 = rt.spawn(async { ONCE.get_or_try_init(func_err).await });
- let handle2 = rt.spawn(async { ONCE.get_or_try_init(func_ok).await });
-
- time::advance(Duration::from_millis(1)).await;
- time::resume();
-
- let result1 = handle1.await.unwrap();
- assert!(result1.is_err());
-
- let result2 = handle2.await.unwrap();
- assert_eq!(*result2.unwrap(), 10);
- });
-}
+use tokio::sync::OnceCell;
#[test]
fn drop_cell() {
@@ -272,3 +101,185 @@ fn from() {
let cell = OnceCell::from(2);
assert_eq!(*cell.get().unwrap(), 2);
}
+
+#[cfg(feature = "parking_lot")]
+mod parking_lot {
+ use super::*;
+
+ use tokio::runtime;
+ use tokio::sync::SetError;
+ use tokio::time;
+
+ use std::time::Duration;
+
+ async fn func1() -> u32 {
+ 5
+ }
+
+ async fn func2() -> u32 {
+ time::sleep(Duration::from_millis(1)).await;
+ 10
+ }
+
+ async fn func_err() -> Result<u32, ()> {
+ Err(())
+ }
+
+ async fn func_ok() -> Result<u32, ()> {
+ Ok(10)
+ }
+
+ async fn func_panic() -> u32 {
+ time::sleep(Duration::from_millis(1)).await;
+ panic!();
+ }
+
+ async fn sleep_and_set() -> u32 {
+ // Simulate sleep by pausing time and waiting for another thread to
+ // resume clock when calling `set`, then finding the cell being initialized
+ // by this call
+ time::sleep(Duration::from_millis(2)).await;
+ 5
+ }
+
+ async fn advance_time_and_set(
+ cell: &'static OnceCell<u32>,
+ v: u32,
+ ) -> Result<(), SetError<u32>> {
+ time::advance(Duration::from_millis(1)).await;
+ cell.set(v)
+ }
+
+ #[test]
+ fn get_or_init() {
+ let rt = runtime::Builder::new_current_thread()
+ .enable_time()
+ .start_paused(true)
+ .build()
+ .unwrap();
+
+ static ONCE: OnceCell<u32> = OnceCell::const_new();
+
+ rt.block_on(async {
+ let handle1 = rt.spawn(async { ONCE.get_or_init(func1).await });
+ let handle2 = rt.spawn(async { ONCE.get_or_init(func2).await });
+
+ time::advance(Duration::from_millis(1)).await;
+ time::resume();
+
+ let result1 = handle1.await.unwrap();
+ let result2 = handle2.await.unwrap();
+
+ assert_eq!(*result1, 5);
+ assert_eq!(*result2, 5);
+ });
+ }
+
+ #[test]
+ fn get_or_init_panic() {
+ let rt = runtime::Builder::new_current_thread()
+ .enable_time()
+ .build()
+ .unwrap();
+
+ static ONCE: OnceCell<u32> = OnceCell::const_new();
+
+ rt.block_on(async {
+ time::pause();
+
+ let handle1 = rt.spawn(async { ONCE.get_or_init(func1).await });
+ let handle2 = rt.spawn(async { ONCE.get_or_init(func_panic).await });
+
+ time::advance(Duration::from_millis(1)).await;
+
+ let result1 = handle1.await.unwrap();
+ let result2 = handle2.await.unwrap();
+
+ assert_eq!(*result1, 5);
+ assert_eq!(*result2, 5);
+ });
+ }
+
+ #[test]
+ fn set_and_get() {
+ let rt = runtime::Builder::new_current_thread()
+ .enable_time()
+ .build()
+ .unwrap();
+
+ static ONCE: OnceCell<u32> = OnceCell::const_new();
+
+ rt.block_on(async {
+ let _ = rt.spawn(async { ONCE.set(5) }).await;
+ let value = ONCE.get().unwrap();
+ assert_eq!(*value, 5);
+ });
+ }
+
+ #[test]
+ fn get_uninit() {
+ static ONCE: OnceCell<u32> = OnceCell::const_new();
+ let uninit = ONCE.get();
+ assert!(uninit.is_none());
+ }
+
+ #[test]
+ fn set_twice() {
+ static ONCE: OnceCell<u32> = OnceCell::const_new();
+
+ let first = ONCE.set(5);
+ assert_eq!(first, Ok(()));
+ let second = ONCE.set(6);
+ assert!(second.err().unwrap().is_already_init_err());
+ }
+
+ #[test]
+ fn set_while_initializing() {
+ let rt = runtime::Builder::new_current_thread()
+ .enable_time()
+ .build()
+ .unwrap();
+
+ static ONCE: OnceCell<u32> = OnceCell::const_new();
+
+ rt.block_on(async {
+ time::pause();
+
+ let handle1 = rt.spawn(async { ONCE.get_or_init(sleep_and_set).await });
+ let handle2 = rt.spawn(async { advance_time_and_set(&ONCE, 10).await });
+
+ time::advance(Duration::from_millis(2)).await;
+
+ let result1 = handle1.await.unwrap();
+ let result2 = handle2.await.unwrap();
+
+ assert_eq!(*result1, 5);
+ assert!(result2.err().unwrap().is_initializing_err());
+ });
+ }
+
+ #[test]
+ fn get_or_try_init() {
+ let rt = runtime::Builder::new_current_thread()
+ .enable_time()
+ .start_paused(true)
+ .build()
+ .unwrap();
+
+ static ONCE: OnceCell<u32> = OnceCell::const_new();
+
+ rt.block_on(async {
+ let handle1 = rt.spawn(async { ONCE.get_or_try_init(func_err).await });
+ let handle2 = rt.spawn(async { ONCE.get_or_try_init(func_ok).await });
+
+ time::advance(Duration::from_millis(1)).await;
+ time::resume();
+
+ let result1 = handle1.await.unwrap();
+ assert!(result1.is_err());
+
+ let result2 = handle2.await.unwrap();
+ assert_eq!(*result2.unwrap(), 10);
+ });
+ }
+}
diff --git a/tests/task_blocking.rs b/tests/task_blocking.rs
index 2e0881c..d82a0e0 100644
--- a/tests/task_blocking.rs
+++ b/tests/task_blocking.rs
@@ -1,7 +1,7 @@
#![warn(rust_2018_idioms)]
#![cfg(all(feature = "full", not(tokio_wasi)))] // Wasi doesn't support threads
-use tokio::{runtime, task};
+use tokio::{runtime, task, time};
use tokio_test::assert_ok;
use std::thread;
@@ -227,3 +227,84 @@ fn coop_disabled_in_block_in_place_in_block_on() {
done_rx.recv().unwrap().unwrap();
}
+
+#[cfg(feature = "test-util")]
+#[tokio::test(start_paused = true)]
+async fn blocking_when_paused() {
+ // Do not auto-advance time when we have started a blocking task that has
+ // not yet finished.
+ time::timeout(
+ Duration::from_secs(3),
+ task::spawn_blocking(|| thread::sleep(Duration::from_millis(1))),
+ )
+ .await
+ .expect("timeout should not trigger")
+ .expect("blocking task should finish");
+
+ // Really: Do not auto-advance time, even if the timeout is short and the
+ // blocking task runs for longer than that. It doesn't matter: Tokio time
+ // is paused; system time is not.
+ time::timeout(
+ Duration::from_millis(1),
+ task::spawn_blocking(|| thread::sleep(Duration::from_millis(50))),
+ )
+ .await
+ .expect("timeout should not trigger")
+ .expect("blocking task should finish");
+}
+
+#[cfg(feature = "test-util")]
+#[tokio::test(start_paused = true)]
+async fn blocking_task_wakes_paused_runtime() {
+ let t0 = std::time::Instant::now();
+ time::timeout(
+ Duration::from_secs(15),
+ task::spawn_blocking(|| thread::sleep(Duration::from_millis(1))),
+ )
+ .await
+ .expect("timeout should not trigger")
+ .expect("blocking task should finish");
+ assert!(
+ t0.elapsed() < Duration::from_secs(10),
+ "completing a spawn_blocking should wake the scheduler if it's parked while time is paused"
+ );
+}
+
+#[cfg(feature = "test-util")]
+#[tokio::test(start_paused = true)]
+async fn unawaited_blocking_task_wakes_paused_runtime() {
+ let t0 = std::time::Instant::now();
+
+ // When this task finishes, time should auto-advance, even though the
+ // JoinHandle has not been awaited yet.
+ let a = task::spawn_blocking(|| {
+ thread::sleep(Duration::from_millis(1));
+ });
+
+ crate::time::sleep(Duration::from_secs(15)).await;
+ a.await.expect("blocking task should finish");
+ assert!(
+ t0.elapsed() < Duration::from_secs(10),
+ "completing a spawn_blocking should wake the scheduler if it's parked while time is paused"
+ );
+}
+
+#[cfg(feature = "test-util")]
+#[tokio::test(start_paused = true)]
+async fn panicking_blocking_task_wakes_paused_runtime() {
+ let t0 = std::time::Instant::now();
+ let result = time::timeout(
+ Duration::from_secs(15),
+ task::spawn_blocking(|| {
+ thread::sleep(Duration::from_millis(1));
+ panic!("blocking task panicked");
+ }),
+ )
+ .await
+ .expect("timeout should not trigger");
+ assert!(result.is_err(), "blocking task should have panicked");
+ assert!(
+ t0.elapsed() < Duration::from_secs(10),
+ "completing a spawn_blocking should wake the scheduler if it's parked while time is paused"
+ );
+}
diff --git a/tests/task_join_set.rs b/tests/task_join_set.rs
index 20d4927..b1b6cf9 100644
--- a/tests/task_join_set.rs
+++ b/tests/task_join_set.rs
@@ -5,8 +5,6 @@ use tokio::sync::oneshot;
use tokio::task::JoinSet;
use tokio::time::Duration;
-use futures::future::FutureExt;
-
fn rt() -> tokio::runtime::Runtime {
tokio::runtime::Builder::new_current_thread()
.build()
@@ -156,49 +154,6 @@ fn runtime_gone() {
.is_cancelled());
}
-// This ensures that `join_next` works correctly when the coop budget is
-// exhausted.
-#[tokio::test(flavor = "current_thread")]
-async fn join_set_coop() {
- // Large enough to trigger coop.
- const TASK_NUM: u32 = 1000;
-
- static SEM: tokio::sync::Semaphore = tokio::sync::Semaphore::const_new(0);
-
- let mut set = JoinSet::new();
-
- for _ in 0..TASK_NUM {
- set.spawn(async {
- SEM.add_permits(1);
- });
- }
-
- // Wait for all tasks to complete.
- //
- // Since this is a `current_thread` runtime, there's no race condition
- // between the last permit being added and the task completing.
- let _ = SEM.acquire_many(TASK_NUM).await.unwrap();
-
- let mut count = 0;
- let mut coop_count = 0;
- loop {
- match set.join_next().now_or_never() {
- Some(Some(Ok(()))) => {}
- Some(Some(Err(err))) => panic!("failed: {}", err),
- None => {
- coop_count += 1;
- tokio::task::yield_now().await;
- continue;
- }
- Some(None) => break,
- }
-
- count += 1;
- }
- assert!(coop_count >= 1);
- assert_eq!(count, TASK_NUM);
-}
-
#[tokio::test(start_paused = true)]
async fn abort_all() {
let mut set: JoinSet<()> = JoinSet::new();
@@ -228,3 +183,53 @@ async fn abort_all() {
assert_eq!(count, 10);
assert_eq!(set.len(), 0);
}
+
+#[cfg(feature = "parking_lot")]
+mod parking_lot {
+ use super::*;
+
+ use futures::future::FutureExt;
+
+ // This ensures that `join_next` works correctly when the coop budget is
+ // exhausted.
+ #[tokio::test(flavor = "current_thread")]
+ async fn join_set_coop() {
+ // Large enough to trigger coop.
+ const TASK_NUM: u32 = 1000;
+
+ static SEM: tokio::sync::Semaphore = tokio::sync::Semaphore::const_new(0);
+
+ let mut set = JoinSet::new();
+
+ for _ in 0..TASK_NUM {
+ set.spawn(async {
+ SEM.add_permits(1);
+ });
+ }
+
+ // Wait for all tasks to complete.
+ //
+ // Since this is a `current_thread` runtime, there's no race condition
+ // between the last permit being added and the task completing.
+ let _ = SEM.acquire_many(TASK_NUM).await.unwrap();
+
+ let mut count = 0;
+ let mut coop_count = 0;
+ loop {
+ match set.join_next().now_or_never() {
+ Some(Some(Ok(()))) => {}
+ Some(Some(Err(err))) => panic!("failed: {}", err),
+ None => {
+ coop_count += 1;
+ tokio::task::yield_now().await;
+ continue;
+ }
+ Some(None) => break,
+ }
+
+ count += 1;
+ }
+ assert!(coop_count >= 1);
+ assert_eq!(count, TASK_NUM);
+ }
+}
diff --git a/tests/tcp_peek.rs b/tests/tcp_peek.rs
index 03813c2..b712023 100644
--- a/tests/tcp_peek.rs
+++ b/tests/tcp_peek.rs
@@ -15,7 +15,7 @@ async fn peek() {
let addr = listener.local_addr().unwrap();
let t = thread::spawn(move || assert_ok!(listener.accept()).0);
- let left = net::TcpStream::connect(&addr).unwrap();
+ let left = net::TcpStream::connect(addr).unwrap();
let mut right = t.join().unwrap();
let _ = right.write(&[1, 2, 3, 4]).unwrap();