From 36665bc1d11392e7c0fdeabe06bdaaa6e64db62a Mon Sep 17 00:00:00 2001 From: Jeff Vander Stoep Date: Fri, 24 Nov 2023 15:34:34 +0100 Subject: Upgrade async_task to 4.5.0 This project was upgraded with external_updater. Usage: tools/external_updater/updater.sh update rust/crates/async-task For more info, check https://cs.android.com/android/platform/superproject/+/main:tools/external_updater/README.md Test: TreeHugger Change-Id: I16e93bf8399f49f5fc537d9c1205bb25fd8ac97a --- .cargo_vcs_info.json | 2 +- Android.bp | 2 +- CHANGELOG.md | 16 ++ Cargo.lock.saved | 443 ------------------------------- Cargo.toml | 15 +- Cargo.toml.orig | 10 +- METADATA | 26 +- examples/with-metadata.rs | 145 ++++++++++ src/header.rs | 23 +- src/lib.rs | 10 +- src/raw.rs | 126 ++++++--- src/runnable.rs | 657 +++++++++++++++++++++++++++++++++++++++------- src/task.rs | 97 ++++--- src/utils.rs | 6 +- tests/metadata.rs | 58 ++++ tests/panic.rs | 4 +- tests/waker_ready.rs | 1 + 17 files changed, 1015 insertions(+), 626 deletions(-) delete mode 100644 Cargo.lock.saved create mode 100644 examples/with-metadata.rs create mode 100644 tests/metadata.rs diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json index e80a435..14a73fc 100644 --- a/.cargo_vcs_info.json +++ b/.cargo_vcs_info.json @@ -1,6 +1,6 @@ { "git": { - "sha1": "f910d25edb04d05a24c9e58d73a4e5d8a31163a6" + "sha1": "af1ed48bad0f0e2e631d09d878bdbfd50f1c5058" }, "path_in_vcs": "" } \ No newline at end of file diff --git a/Android.bp b/Android.bp index d7c98f5..03a4787 100644 --- a/Android.bp +++ b/Android.bp @@ -42,7 +42,7 @@ rust_library { host_supported: true, crate_name: "async_task", cargo_env_compat: true, - cargo_pkg_version: "4.3.0", + cargo_pkg_version: "4.5.0", srcs: ["src/lib.rs"], edition: "2018", features: [ diff --git a/CHANGELOG.md b/CHANGELOG.md index d0a3e0b..475b0dc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,19 @@ +# Version 4.5.0 + +- Add a `portable-atomic` feature that enables the usage of fallback primitives for CPUs without atomics. (#58) + +# Version 4.4.1 + +- Clarify safety documentation for `spawn_unchecked`. (#49) + +# Version 4.4.0 + +- Ensure that the allocation doesn't exceed `isize::MAX` (#32) +- Add `FallibleTask::is_finished()` (#34) +- Add a metadata generic parameter to tasks (#33) +- Add panic propagation to tasks (#37) +- Add a way to tell if the task was woken while running from the schedule function (#42) + # Version 4.3.0 - Bump MSRV to Rust 1.47. (#30) diff --git a/Cargo.lock.saved b/Cargo.lock.saved deleted file mode 100644 index 388f5ba..0000000 --- a/Cargo.lock.saved +++ /dev/null @@ -1,443 +0,0 @@ -# This file is automatically @generated by Cargo. -# It is not intended for manual editing. -[[package]] -name = "async-channel" -version = "1.6.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2114d64672151c0c5eaa5e131ec84a74f06e1e559830dabba01ca30605d66319" -dependencies = [ - "concurrent-queue", - "event-listener", - "futures-core", -] - -[[package]] -name = "async-executor" -version = "1.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "871f9bb5e0a22eeb7e8cf16641feb87c9dc67032ccf8ff49e772eb9941d3a965" -dependencies = [ - "async-task 4.2.0", - "concurrent-queue", - "fastrand", - "futures-lite", - "once_cell", - "slab", -] - -[[package]] -name = "async-fs" -version = "1.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b3ca4f8ff117c37c278a2f7415ce9be55560b846b5bc4412aaa5d29c1c3dae2" -dependencies = [ - "async-lock", - "blocking", - "futures-lite", -] - -[[package]] -name = "async-io" -version = "1.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e5e18f61464ae81cde0a23e713ae8fd299580c54d697a35820cfd0625b8b0e07" -dependencies = [ - "concurrent-queue", - "futures-lite", - "libc", - "log", - "once_cell", - "parking", - "polling", - "slab", - "socket2", - "waker-fn", - "winapi", -] - -[[package]] -name = "async-lock" -version = "2.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e97a171d191782fba31bb902b14ad94e24a68145032b7eedf871ab0bc0d077b6" -dependencies = [ - "event-listener", -] - -[[package]] -name = "async-net" -version = "1.6.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5373304df79b9b4395068fb080369ec7178608827306ce4d081cba51cac551df" -dependencies = [ - "async-io", - "blocking", - "futures-lite", -] - -[[package]] -name = "async-process" -version = "1.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf2c06e30a24e8c78a3987d07f0930edf76ef35e027e7bdb063fccafdad1f60c" -dependencies = [ - "async-io", - "blocking", - "cfg-if", - "event-listener", - "futures-lite", - "libc", - "once_cell", - "signal-hook", - "winapi", -] - -[[package]] -name = "async-task" -version = "4.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "30696a84d817107fc028e049980e09d5e140e8da8f1caeb17e8e950658a3cea9" - -[[package]] -name = "async-task" -version = "4.3.0" -dependencies = [ - "atomic-waker", - "easy-parallel", - "flaky_test", - "flume", - "once_cell", - "smol", -] - -[[package]] -name = "atomic-waker" -version = "1.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "065374052e7df7ee4047b1160cca5e1467a12351a40b3da123c870ba0b8eda2a" - -[[package]] -name = "autocfg" -version = "1.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" - -[[package]] -name = "blocking" -version = "1.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c6ccb65d468978a086b69884437ded69a90faab3bbe6e67f242173ea728acccc" -dependencies = [ - "async-channel", - "async-task 4.2.0", - "atomic-waker", - "fastrand", - "futures-lite", - "once_cell", -] - -[[package]] -name = "cache-padded" -version = "1.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c1db59621ec70f09c5e9b597b220c7a2b43611f4710dc03ceb8748637775692c" - -[[package]] -name = "cc" -version = "1.0.73" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2fff2a6927b3bb87f9595d67196a70493f627687a71d87a0d692242c33f58c11" - -[[package]] -name = "cfg-if" -version = "1.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" - -[[package]] -name = "concurrent-queue" -version = "1.2.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "30ed07550be01594c6026cff2a1d7fe9c8f683caa798e12b68694ac9e88286a3" -dependencies = [ - "cache-padded", -] - -[[package]] -name = "easy-parallel" -version = "3.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6907e25393cdcc1f4f3f513d9aac1e840eb1cc341a0fccb01171f7d14d10b946" - -[[package]] -name = "event-listener" -version = "2.5.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "77f3309417938f28bf8228fcff79a4a37103981e3e186d2ccd19c74b38f4eb71" - -[[package]] -name = "fastrand" -version = "1.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c3fcf0cee53519c866c09b5de1f6c56ff9d647101f81c1964fa632e148896cdf" -dependencies = [ - "instant", -] - -[[package]] -name = "flaky_test" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "479cde5eb168cf5a056dd98f311cbfab7494c216394e4fb9eba0336827a8db93" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - -[[package]] -name = "flume" -version = "0.10.13" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1ceeb589a3157cac0ab8cc585feb749bd2cea5cb55a6ee802ad72d9fd38303da" -dependencies = [ - "spin", -] - -[[package]] -name = "futures-core" -version = "0.3.21" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0c09fd04b7e4073ac7156a9539b57a484a8ea920f79c7c675d05d289ab6110d3" - -[[package]] -name = "futures-io" -version = "0.3.21" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fc4045962a5a5e935ee2fdedaa4e08284547402885ab326734432bed5d12966b" - -[[package]] -name = "futures-lite" -version = "1.12.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7694489acd39452c77daa48516b894c153f192c3578d5a839b62c58099fcbf48" -dependencies = [ - "fastrand", - "futures-core", - "futures-io", - "memchr", - "parking", - "pin-project-lite", - "waker-fn", -] - -[[package]] -name = "instant" -version = "0.1.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a5bbe824c507c5da5956355e86a746d82e0e1464f65d862cc5e71da70e94b2c" -dependencies = [ - "cfg-if", -] - -[[package]] -name = "libc" -version = "0.2.126" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "349d5a591cd28b49e1d1037471617a32ddcda5731b99419008085f72d5a53836" - -[[package]] -name = "lock_api" -version = "0.4.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "327fa5b6a6940e4699ec49a9beae1ea4845c6bab9314e4f84ac68742139d8c53" -dependencies = [ - "autocfg", - "scopeguard", -] - -[[package]] -name = "log" -version = "0.4.17" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "abb12e687cfb44aa40f41fc3978ef76448f9b6038cad6aef4259d3c095a2382e" -dependencies = [ - "cfg-if", -] - -[[package]] -name = "memchr" -version = "2.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d" - -[[package]] -name = "once_cell" -version = "1.12.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7709cef83f0c1f58f666e746a08b21e0085f7440fa6a29cc194d68aac97a4225" - -[[package]] -name = "parking" -version = "2.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "427c3892f9e783d91cc128285287e70a59e206ca452770ece88a76f7a3eddd72" - -[[package]] -name = "pin-project-lite" -version = "0.2.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e0a7ae3ac2f1173085d398531c705756c94a4c56843785df85a60c1a0afac116" - -[[package]] -name = "polling" -version = "2.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "685404d509889fade3e86fe3a5803bca2ec09b0c0778d5ada6ec8bf7a8de5259" -dependencies = [ - "cfg-if", - "libc", - "log", - "wepoll-ffi", - "winapi", -] - -[[package]] -name = "proc-macro2" -version = "1.0.40" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dd96a1e8ed2596c337f8eae5f24924ec83f5ad5ab21ea8e455d3566c69fbcaf7" -dependencies = [ - "unicode-ident", -] - -[[package]] -name = "quote" -version = "1.0.20" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3bcdf212e9776fbcb2d23ab029360416bb1706b1aea2d1a5ba002727cbcab804" -dependencies = [ - "proc-macro2", -] - -[[package]] -name = "scopeguard" -version = "1.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" - -[[package]] -name = "signal-hook" -version = "0.3.14" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a253b5e89e2698464fc26b545c9edceb338e18a89effeeecfea192c3025be29d" -dependencies = [ - "libc", - "signal-hook-registry", -] - -[[package]] -name = "signal-hook-registry" -version = "1.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e51e73328dc4ac0c7ccbda3a494dfa03df1de2f46018127f60c693f2648455b0" -dependencies = [ - "libc", -] - -[[package]] -name = "slab" -version = "0.4.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eb703cfe953bccee95685111adeedb76fabe4e97549a58d16f03ea7b9367bb32" - -[[package]] -name = "smol" -version = "1.2.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "85cf3b5351f3e783c1d79ab5fc604eeed8b8ae9abd36b166e8b87a089efd85e4" -dependencies = [ - "async-channel", - "async-executor", - "async-fs", - "async-io", - "async-lock", - "async-net", - "async-process", - "blocking", - "futures-lite", - "once_cell", -] - -[[package]] -name = "socket2" -version = "0.4.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "66d72b759436ae32898a2af0a14218dbf55efde3feeb170eb623637db85ee1e0" -dependencies = [ - "libc", - "winapi", -] - -[[package]] -name = "spin" -version = "0.9.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c530c2b0d0bf8b69304b39fe2001993e267461948b890cd037d8ad4293fa1a0d" -dependencies = [ - "lock_api", -] - -[[package]] -name = "syn" -version = "1.0.98" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c50aef8a904de4c23c788f104b7dddc7d6f79c647c7c8ce4cc8f73eb0ca773dd" -dependencies = [ - "proc-macro2", - "quote", - "unicode-ident", -] - -[[package]] -name = "unicode-ident" -version = "1.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5bd2fe26506023ed7b5e1e315add59d6f584c621d037f9368fea9cfb988f368c" - -[[package]] -name = "waker-fn" -version = "1.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d5b2c62b4012a3e1eca5a7e077d13b3bf498c4073e33ccd58626607748ceeca" - -[[package]] -name = "wepoll-ffi" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d743fdedc5c64377b5fc2bc036b01c7fd642205a0d96356034ae3404d49eb7fb" -dependencies = [ - "cc", -] - -[[package]] -name = "winapi" -version = "0.3.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" -dependencies = [ - "winapi-i686-pc-windows-gnu", - "winapi-x86_64-pc-windows-gnu", -] - -[[package]] -name = "winapi-i686-pc-windows-gnu" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" - -[[package]] -name = "winapi-x86_64-pc-windows-gnu" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" diff --git a/Cargo.toml b/Cargo.toml index 6e989e5..0b06c5f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,7 +13,7 @@ edition = "2018" rust-version = "1.47" name = "async-task" -version = "4.3.0" +version = "4.5.0" authors = ["Stjepan Glavina "] exclude = ["/.*"] description = "Task abstraction for building executors" @@ -32,6 +32,11 @@ categories = [ license = "Apache-2.0 OR MIT" repository = "https://github.com/smol-rs/async-task" +[dependencies.portable-atomic] +version = "1" +optional = true +default-features = false + [dev-dependencies.atomic-waker] version = "1" @@ -42,12 +47,18 @@ version = "3" version = "0.1" [dev-dependencies.flume] -version = "0.10" +version = "0.11" default-features = false +[dev-dependencies.futures-lite] +version = "1.12.0" + [dev-dependencies.once_cell] version = "1" +[dev-dependencies.pin-project-lite] +version = "0.2.10" + [dev-dependencies.smol] version = "1" diff --git a/Cargo.toml.orig b/Cargo.toml.orig index 8c611bf..9a8ee61 100644 --- a/Cargo.toml.orig +++ b/Cargo.toml.orig @@ -3,7 +3,7 @@ name = "async-task" # When publishing a new version: # - Update CHANGELOG.md # - Create "v4.x.y" git tag -version = "4.3.0" +version = "4.5.0" authors = ["Stjepan Glavina "] edition = "2018" rust-version = "1.47" @@ -18,12 +18,18 @@ exclude = ["/.*"] default = ["std"] std = [] +[dependencies] +# Uses portable-atomic polyfill atomics on targets without them +portable-atomic = { version = "1", optional = true, default-features = false } + [dev-dependencies] atomic-waker = "1" easy-parallel = "3" flaky_test = "0.1" -flume = { version = "0.10", default-features = false } +flume = { version = "0.11", default-features = false } +futures-lite = "1.12.0" once_cell = "1" +pin-project-lite = "0.2.10" smol = "1" # rewrite dependencies to use the this version of async-task when running tests diff --git a/METADATA b/METADATA index 799c6da..baaf343 100644 --- a/METADATA +++ b/METADATA @@ -1,23 +1,23 @@ # This project was upgraded with external_updater. # Usage: tools/external_updater/updater.sh update rust/crates/async-task -# For more info, check https://cs.android.com/android/platform/superproject/+/master:tools/external_updater/README.md +# For more info, check https://cs.android.com/android/platform/superproject/+/main:tools/external_updater/README.md name: "async_task" description: "Task abstraction for building executors" third_party { - url { - type: HOMEPAGE - value: "https://crates.io/crates/async-task" - } - url { - type: ARCHIVE - value: "https://static.crates.io/crates/async_task/async_task-4.3.0.crate" - } - version: "4.3.0" license_type: NOTICE last_upgrade_date { - year: 2022 - month: 12 - day: 6 + year: 2023 + month: 11 + day: 24 + } + identifier { + type: "HOMEPAGE" + value: "https://crates.io/crates/async-task" + } + identifier { + type: "ARCHIVE" + value: "https://static.crates.io/crates/async_task/async_task-4.5.0.crate" + version: "4.5.0" } } diff --git a/examples/with-metadata.rs b/examples/with-metadata.rs new file mode 100644 index 0000000..ed84e31 --- /dev/null +++ b/examples/with-metadata.rs @@ -0,0 +1,145 @@ +//! A single threaded executor that uses shortest-job-first scheduling. + +use std::cell::RefCell; +use std::collections::BinaryHeap; +use std::pin::Pin; +use std::task::{Context, Poll}; +use std::thread; +use std::time::{Duration, Instant}; +use std::{cell::Cell, future::Future}; + +use async_task::{Builder, Runnable, Task}; +use pin_project_lite::pin_project; +use smol::{channel, future}; + +struct ByDuration(Runnable); + +impl ByDuration { + fn duration(&self) -> Duration { + self.0.metadata().inner.get() + } +} + +impl PartialEq for ByDuration { + fn eq(&self, other: &Self) -> bool { + self.duration() == other.duration() + } +} + +impl Eq for ByDuration {} + +impl PartialOrd for ByDuration { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for ByDuration { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.duration().cmp(&other.duration()).reverse() + } +} + +pin_project! { + #[must_use = "futures do nothing unless you `.await` or poll them"] + struct MeasureRuntime<'a, F> { + #[pin] + f: F, + duration: &'a Cell + } +} + +impl<'a, F: Future> Future for MeasureRuntime<'a, F> { + type Output = F::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + let duration_cell: &Cell = this.duration; + let start = Instant::now(); + let res = F::poll(this.f, cx); + let new_duration = Instant::now() - start; + duration_cell.set(duration_cell.get() / 2 + new_duration / 2); + res + } +} + +pub struct DurationMetadata { + inner: Cell, +} + +thread_local! { + // A queue that holds scheduled tasks. + static QUEUE: RefCell> = RefCell::new(BinaryHeap::new()); +} + +fn make_future_fn<'a, F>( + future: F, +) -> impl (FnOnce(&'a DurationMetadata) -> MeasureRuntime<'a, F>) { + move |duration_meta| MeasureRuntime { + f: future, + duration: &duration_meta.inner, + } +} + +fn ensure_safe_schedule(f: F) -> F { + f +} + +/// Spawns a future on the executor. +pub fn spawn(future: F) -> Task +where + F: Future + 'static, + T: 'static, +{ + let spawn_thread_id = thread::current().id(); + // Create a task that is scheduled by pushing it into the queue. + let schedule = ensure_safe_schedule(move |runnable| { + if thread::current().id() != spawn_thread_id { + panic!("Task would be run on a different thread than spawned on."); + } + QUEUE.with(move |queue| queue.borrow_mut().push(ByDuration(runnable))); + }); + let future_fn = make_future_fn(future); + let (runnable, task) = unsafe { + Builder::new() + .metadata(DurationMetadata { + inner: Cell::new(Duration::default()), + }) + .spawn_unchecked(future_fn, schedule) + }; + + // Schedule the task by pushing it into the queue. + runnable.schedule(); + + task +} + +pub fn block_on(future: F) +where + F: Future + 'static, +{ + let task = spawn(future); + while !task.is_finished() { + let Some(runnable) = QUEUE.with(|queue| queue.borrow_mut().pop()) else { + thread::yield_now(); + continue; + }; + runnable.0.run(); + } +} + +fn main() { + // Spawn a future and await its result. + block_on(async { + let (sender, receiver) = channel::bounded(1); + let world = spawn(async move { + receiver.recv().await.unwrap(); + println!("world.") + }); + let hello = spawn(async move { + sender.send(()).await.unwrap(); + print!("Hello, ") + }); + future::zip(hello, world).await; + }); +} diff --git a/src/header.rs b/src/header.rs index 8a3a0b9..ee84035 100644 --- a/src/header.rs +++ b/src/header.rs @@ -1,8 +1,13 @@ use core::cell::UnsafeCell; use core::fmt; -use core::sync::atomic::{AtomicUsize, Ordering}; use core::task::Waker; +#[cfg(not(feature = "portable-atomic"))] +use core::sync::atomic::AtomicUsize; +use core::sync::atomic::Ordering; +#[cfg(feature = "portable-atomic")] +use portable_atomic::AtomicUsize; + use crate::raw::TaskVTable; use crate::state::*; use crate::utils::abort_on_panic; @@ -10,7 +15,7 @@ use crate::utils::abort_on_panic; /// The header of a task. /// /// This header is stored in memory at the beginning of the heap-allocated task. -pub(crate) struct Header { +pub(crate) struct Header { /// Current state of the task. /// /// Contains flags representing the current state and the reference count. @@ -26,9 +31,18 @@ pub(crate) struct Header { /// In addition to the actual waker virtual table, it also contains pointers to several other /// methods necessary for bookkeeping the heap-allocated task. pub(crate) vtable: &'static TaskVTable, + + /// Metadata associated with the task. + /// + /// This metadata may be provided to the user. + pub(crate) metadata: M, + + /// Whether or not a panic that occurs in the task should be propagated. + #[cfg(feature = "std")] + pub(crate) propagate_panic: bool, } -impl Header { +impl Header { /// Notifies the awaiter blocked on this task. /// /// If the awaiter is the same as the current waker, it will not be notified. @@ -145,7 +159,7 @@ impl Header { } } -impl fmt::Debug for Header { +impl fmt::Debug for Header { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let state = self.state.load(Ordering::SeqCst); @@ -157,6 +171,7 @@ impl fmt::Debug for Header { .field("awaiter", &(state & AWAITER != 0)) .field("task", &(state & TASK != 0)) .field("ref_count", &(state / REFERENCE)) + .field("metadata", &self.metadata) .finish() } } diff --git a/src/lib.rs b/src/lib.rs index dd689ec..67b3b62 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -71,6 +71,12 @@ #![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)] #![doc(test(attr(deny(rust_2018_idioms, warnings))))] #![doc(test(attr(allow(unused_extern_crates, unused_variables))))] +#![doc( + html_favicon_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png" +)] +#![doc( + html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png" +)] extern crate alloc; @@ -92,7 +98,9 @@ mod state; mod task; mod utils; -pub use crate::runnable::{spawn, spawn_unchecked, Runnable}; +pub use crate::runnable::{ + spawn, spawn_unchecked, Builder, Runnable, Schedule, ScheduleInfo, WithInfo, +}; pub use crate::task::{FallibleTask, Task}; #[cfg(feature = "std")] diff --git a/src/raw.rs b/src/raw.rs index bb031da..50109ab 100644 --- a/src/raw.rs +++ b/src/raw.rs @@ -1,21 +1,34 @@ use alloc::alloc::Layout as StdLayout; use core::cell::UnsafeCell; use core::future::Future; +use core::marker::PhantomData; use core::mem::{self, ManuallyDrop}; use core::pin::Pin; use core::ptr::NonNull; -use core::sync::atomic::{AtomicUsize, Ordering}; use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker}; +#[cfg(not(feature = "portable-atomic"))] +use core::sync::atomic::AtomicUsize; +use core::sync::atomic::Ordering; +#[cfg(feature = "portable-atomic")] +use portable_atomic::AtomicUsize; + use crate::header::Header; +use crate::runnable::{Schedule, ScheduleInfo}; use crate::state::*; use crate::utils::{abort, abort_on_panic, max, Layout}; use crate::Runnable; +#[cfg(feature = "std")] +pub(crate) type Panic = alloc::boxed::Box; + +#[cfg(not(feature = "std"))] +pub(crate) type Panic = core::convert::Infallible; + /// The vtable for a task. pub(crate) struct TaskVTable { /// Schedules the task. - pub(crate) schedule: unsafe fn(*const ()), + pub(crate) schedule: unsafe fn(*const (), ScheduleInfo), /// Drops the future inside the task. pub(crate) drop_future: unsafe fn(*const ()), @@ -64,9 +77,9 @@ pub(crate) struct TaskLayout { } /// Raw pointers to the fields inside a task. -pub(crate) struct RawTask { +pub(crate) struct RawTask { /// The task header. - pub(crate) header: *const Header, + pub(crate) header: *const Header, /// The schedule function. pub(crate) schedule: *const S, @@ -75,28 +88,28 @@ pub(crate) struct RawTask { pub(crate) future: *mut F, /// The output of the future. - pub(crate) output: *mut T, + pub(crate) output: *mut Result, } -impl Copy for RawTask {} +impl Copy for RawTask {} -impl Clone for RawTask { +impl Clone for RawTask { fn clone(&self) -> Self { *self } } -impl RawTask { +impl RawTask { const TASK_LAYOUT: Option = Self::eval_task_layout(); /// Computes the memory layout for a task. #[inline] const fn eval_task_layout() -> Option { // Compute the layouts for `Header`, `S`, `F`, and `T`. - let layout_header = Layout::new::
(); + let layout_header = Layout::new::>(); let layout_s = Layout::new::(); let layout_f = Layout::new::(); - let layout_r = Layout::new::(); + let layout_r = Layout::new::>(); // Compute the layout for `union { F, T }`. let size_union = max(layout_f.size(), layout_r.size()); @@ -119,10 +132,10 @@ impl RawTask { } } -impl RawTask +impl RawTask where F: Future, - S: Fn(Runnable), + S: Schedule, { const RAW_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new( Self::clone_waker, @@ -134,7 +147,15 @@ where /// Allocates a task with the given `future` and `schedule` function. /// /// It is assumed that initially only the `Runnable` and the `Task` exist. - pub(crate) fn allocate(future: F, schedule: S) -> NonNull<()> { + pub(crate) fn allocate<'a, Gen: FnOnce(&'a M) -> F>( + future: Gen, + schedule: S, + builder: crate::Builder, + ) -> NonNull<()> + where + F: 'a, + M: 'a, + { // Compute the layout of the task for allocation. Abort if the computation fails. // // n.b. notgull: task_layout now automatically aborts instead of panicking @@ -149,8 +170,14 @@ where let raw = Self::from_ptr(ptr.as_ptr()); + let crate::Builder { + metadata, + #[cfg(feature = "std")] + propagate_panic, + } = builder; + // Write the header as the first field of the task. - (raw.header as *mut Header).write(Header { + (raw.header as *mut Header).write(Header { state: AtomicUsize::new(SCHEDULED | TASK | REFERENCE), awaiter: UnsafeCell::new(None), vtable: &TaskVTable { @@ -163,11 +190,17 @@ where clone_waker: Self::clone_waker, layout_info: &Self::TASK_LAYOUT, }, + metadata, + #[cfg(feature = "std")] + propagate_panic, }); // Write the schedule function as the third field of the task. (raw.schedule as *mut S).write(schedule); + // Generate the future, now that the metadata has been pinned in place. + let future = abort_on_panic(|| future(&(*raw.header).metadata)); + // Write the future as the fourth field of the task. raw.future.write(future); @@ -183,10 +216,10 @@ where unsafe { Self { - header: p as *const Header, + header: p as *const Header, schedule: p.add(task_layout.offset_s) as *const S, future: p.add(task_layout.offset_f) as *mut F, - output: p.add(task_layout.offset_r) as *mut T, + output: p.add(task_layout.offset_r) as *mut Result, } } } @@ -252,7 +285,7 @@ where // time to schedule it. if state & RUNNING == 0 { // Schedule the task. - Self::schedule(ptr); + Self::schedule(ptr, ScheduleInfo::new(false)); } else { // Drop the waker. Self::drop_waker(ptr); @@ -310,7 +343,7 @@ where // If the task is not running, now is the time to schedule. if state & RUNNING == 0 { // If the reference count overflowed, abort. - if state > isize::max_value() as usize { + if state > isize::MAX as usize { abort(); } @@ -319,8 +352,9 @@ where // still alive. let task = Runnable { ptr: NonNull::new_unchecked(ptr as *mut ()), + _marker: PhantomData, }; - (*raw.schedule)(task); + (*raw.schedule).schedule(task, ScheduleInfo::new(false)); } break; @@ -340,7 +374,7 @@ where let state = (*raw.header).state.fetch_add(REFERENCE, Ordering::Relaxed); // If the reference count overflowed, abort. - if state > isize::max_value() as usize { + if state > isize::MAX as usize { abort(); } @@ -368,7 +402,7 @@ where (*raw.header) .state .store(SCHEDULED | CLOSED | REFERENCE, Ordering::Release); - Self::schedule(ptr); + Self::schedule(ptr, ScheduleInfo::new(false)); } else { // Otherwise, destroy the task right away. Self::destroy(ptr); @@ -398,7 +432,7 @@ where /// /// This function doesn't modify the state of the task. It only passes the task reference to /// its schedule function. - unsafe fn schedule(ptr: *const ()) { + unsafe fn schedule(ptr: *const (), info: ScheduleInfo) { let raw = Self::from_ptr(ptr); // If the schedule function has captured variables, create a temporary waker that prevents @@ -410,8 +444,9 @@ where let task = Runnable { ptr: NonNull::new_unchecked(ptr as *mut ()), + _marker: PhantomData, }; - (*raw.schedule)(task); + (*raw.schedule).schedule(task, info); } /// Drops the future inside a task. @@ -442,6 +477,9 @@ where // We need a safeguard against panics because destructors can panic. abort_on_panic(|| { + // Drop the header along with the metadata. + (raw.header as *mut Header).drop_in_place(); + // Drop the schedule function. (raw.schedule as *mut S).drop_in_place(); }); @@ -507,8 +545,30 @@ where // Poll the inner future, but surround it with a guard that closes the task in case polling // panics. + // If available, we should also try to catch the panic so that it is propagated correctly. let guard = Guard(raw); - let poll = ::poll(Pin::new_unchecked(&mut *raw.future), cx); + + // Panic propagation is not available for no_std. + #[cfg(not(feature = "std"))] + let poll = ::poll(Pin::new_unchecked(&mut *raw.future), cx).map(Ok); + + #[cfg(feature = "std")] + let poll = { + // Check if we should propagate panics. + if (*raw.header).propagate_panic { + // Use catch_unwind to catch the panic. + match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| { + ::poll(Pin::new_unchecked(&mut *raw.future), cx) + })) { + Ok(Poll::Ready(v)) => Poll::Ready(Ok(v)), + Ok(Poll::Pending) => Poll::Pending, + Err(e) => Poll::Ready(Err(e)), + } + } else { + ::poll(Pin::new_unchecked(&mut *raw.future), cx).map(Ok) + } + }; + mem::forget(guard); match poll { @@ -608,7 +668,7 @@ where } else if state & SCHEDULED != 0 { // The thread that woke the task up didn't reschedule it because // it was running so now it's our responsibility to do so. - Self::schedule(ptr); + Self::schedule(ptr, ScheduleInfo::new(true)); return true; } else { // Drop the task reference. @@ -625,15 +685,15 @@ where return false; /// A guard that closes the task if polling its future panics. - struct Guard(RawTask) + struct Guard(RawTask) where F: Future, - S: Fn(Runnable); + S: Schedule; - impl Drop for Guard + impl Drop for Guard where F: Future, - S: Fn(Runnable), + S: Schedule, { fn drop(&mut self) { let raw = self.0; @@ -648,7 +708,7 @@ where if state & CLOSED != 0 { // The thread that closed the task didn't drop the future because it // was running so now it's our responsibility to do so. - RawTask::::drop_future(ptr); + RawTask::::drop_future(ptr); // Mark the task as not running and not scheduled. (*raw.header) @@ -662,7 +722,7 @@ where } // Drop the task reference. - RawTask::::drop_ref(ptr); + RawTask::::drop_ref(ptr); // Notify the awaiter that the future has been dropped. if let Some(w) = awaiter { @@ -680,7 +740,7 @@ where ) { Ok(state) => { // Drop the future because the task is now closed. - RawTask::::drop_future(ptr); + RawTask::::drop_future(ptr); // Take the awaiter out. let mut awaiter = None; @@ -689,7 +749,7 @@ where } // Drop the task reference. - RawTask::::drop_ref(ptr); + RawTask::::drop_ref(ptr); // Notify the awaiter that the future has been dropped. if let Some(w) = awaiter { diff --git a/src/runnable.rs b/src/runnable.rs index cb70ef3..8b1b062 100644 --- a/src/runnable.rs +++ b/src/runnable.rs @@ -6,11 +6,536 @@ use core::ptr::NonNull; use core::sync::atomic::Ordering; use core::task::Waker; +use alloc::boxed::Box; + use crate::header::Header; use crate::raw::RawTask; use crate::state::*; use crate::Task; +mod sealed { + use super::*; + pub trait Sealed {} + + impl Sealed for F where F: Fn(Runnable) {} + + impl Sealed for WithInfo where F: Fn(Runnable, ScheduleInfo) {} +} + +/// A builder that creates a new task. +#[derive(Debug)] +pub struct Builder { + /// The metadata associated with the task. + pub(crate) metadata: M, + + /// Whether or not a panic that occurs in the task should be propagated. + #[cfg(feature = "std")] + pub(crate) propagate_panic: bool, +} + +impl Default for Builder { + fn default() -> Self { + Builder::new().metadata(M::default()) + } +} + +/// Extra scheduling information that can be passed to the scheduling function. +/// +/// The data source of this struct is directly from the actual implementation +/// of the crate itself, different from [`Runnable`]'s metadata, which is +/// managed by the caller. +/// +/// # Examples +/// +/// ``` +/// use async_task::{Runnable, ScheduleInfo, WithInfo}; +/// use std::sync::{Arc, Mutex}; +/// +/// // The future inside the task. +/// let future = async { +/// println!("Hello, world!"); +/// }; +/// +/// // If the task gets woken up while running, it will be sent into this channel. +/// let (s, r) = flume::unbounded(); +/// // Otherwise, it will be placed into this slot. +/// let lifo_slot = Arc::new(Mutex::new(None)); +/// let schedule = move |runnable: Runnable, info: ScheduleInfo| { +/// if info.woken_while_running { +/// s.send(runnable).unwrap() +/// } else { +/// let last = lifo_slot.lock().unwrap().replace(runnable); +/// if let Some(last) = last { +/// s.send(last).unwrap() +/// } +/// } +/// }; +/// +/// // Create the actual scheduler to be spawned with some future. +/// let scheduler = WithInfo(schedule); +/// // Create a task with the future and the scheduler. +/// let (runnable, task) = async_task::spawn(future, scheduler); +/// ``` +#[derive(Debug, Copy, Clone)] +#[non_exhaustive] +pub struct ScheduleInfo { + /// Indicates whether the task gets woken up while running. + /// + /// It is set to true usually because the task has yielded itself to the + /// scheduler. + pub woken_while_running: bool, +} + +impl ScheduleInfo { + pub(crate) fn new(woken_while_running: bool) -> Self { + ScheduleInfo { + woken_while_running, + } + } +} + +/// The trait for scheduling functions. +pub trait Schedule: sealed::Sealed { + /// The actual scheduling procedure. + fn schedule(&self, runnable: Runnable, info: ScheduleInfo); +} + +impl Schedule for F +where + F: Fn(Runnable), +{ + fn schedule(&self, runnable: Runnable, _: ScheduleInfo) { + self(runnable) + } +} + +/// Pass a scheduling function with more scheduling information - a.k.a. +/// [`ScheduleInfo`]. +/// +/// Sometimes, it's useful to pass the runnable's state directly to the +/// scheduling function, such as whether it's woken up while running. The +/// scheduler can thus use the information to determine its scheduling +/// strategy. +/// +/// The data source of [`ScheduleInfo`] is directly from the actual +/// implementation of the crate itself, different from [`Runnable`]'s metadata, +/// which is managed by the caller. +/// +/// # Examples +/// +/// ``` +/// use async_task::{ScheduleInfo, WithInfo}; +/// use std::sync::{Arc, Mutex}; +/// +/// // The future inside the task. +/// let future = async { +/// println!("Hello, world!"); +/// }; +/// +/// // If the task gets woken up while running, it will be sent into this channel. +/// let (s, r) = flume::unbounded(); +/// // Otherwise, it will be placed into this slot. +/// let lifo_slot = Arc::new(Mutex::new(None)); +/// let schedule = move |runnable, info: ScheduleInfo| { +/// if info.woken_while_running { +/// s.send(runnable).unwrap() +/// } else { +/// let last = lifo_slot.lock().unwrap().replace(runnable); +/// if let Some(last) = last { +/// s.send(last).unwrap() +/// } +/// } +/// }; +/// +/// // Create a task with the future and the schedule function. +/// let (runnable, task) = async_task::spawn(future, WithInfo(schedule)); +/// ``` +#[derive(Debug)] +pub struct WithInfo(pub F); + +impl From for WithInfo { + fn from(value: F) -> Self { + WithInfo(value) + } +} + +impl Schedule for WithInfo +where + F: Fn(Runnable, ScheduleInfo), +{ + fn schedule(&self, runnable: Runnable, info: ScheduleInfo) { + (self.0)(runnable, info) + } +} + +impl Builder<()> { + /// Creates a new task builder. + /// + /// By default, this task builder has no metadata. Use the [`metadata`] method to + /// set the metadata. + /// + /// # Examples + /// + /// ``` + /// use async_task::Builder; + /// + /// let (runnable, task) = Builder::new().spawn(|()| async {}, |_| {}); + /// ``` + pub fn new() -> Builder<()> { + Builder { + metadata: (), + #[cfg(feature = "std")] + propagate_panic: false, + } + } + + /// Adds metadata to the task. + /// + /// In certain cases, it may be useful to associate some metadata with a task. For instance, + /// you may want to associate a name with a task, or a priority for a priority queue. This + /// method allows the user to attach arbitrary metadata to a task that is available through + /// the [`Runnable`] or the [`Task`]. + /// + /// # Examples + /// + /// This example creates an executor that associates a "priority" number with each task, and + /// then runs the tasks in order of priority. + /// + /// ``` + /// use async_task::{Builder, Runnable}; + /// use once_cell::sync::Lazy; + /// use std::cmp; + /// use std::collections::BinaryHeap; + /// use std::sync::Mutex; + /// + /// # smol::future::block_on(async { + /// /// A wrapper around a `Runnable` that implements `Ord` so that it can be used in a + /// /// priority queue. + /// struct TaskWrapper(Runnable); + /// + /// impl PartialEq for TaskWrapper { + /// fn eq(&self, other: &Self) -> bool { + /// self.0.metadata() == other.0.metadata() + /// } + /// } + /// + /// impl Eq for TaskWrapper {} + /// + /// impl PartialOrd for TaskWrapper { + /// fn partial_cmp(&self, other: &Self) -> Option { + /// Some(self.cmp(other)) + /// } + /// } + /// + /// impl Ord for TaskWrapper { + /// fn cmp(&self, other: &Self) -> cmp::Ordering { + /// self.0.metadata().cmp(other.0.metadata()) + /// } + /// } + /// + /// static EXECUTOR: Lazy>> = Lazy::new(|| { + /// Mutex::new(BinaryHeap::new()) + /// }); + /// + /// let schedule = |runnable| { + /// EXECUTOR.lock().unwrap().push(TaskWrapper(runnable)); + /// }; + /// + /// // Spawn a few tasks with different priorities. + /// let spawn_task = move |priority| { + /// let (runnable, task) = Builder::new().metadata(priority).spawn( + /// move |_| async move { priority }, + /// schedule, + /// ); + /// runnable.schedule(); + /// task + /// }; + /// + /// let t1 = spawn_task(1); + /// let t2 = spawn_task(2); + /// let t3 = spawn_task(3); + /// + /// // Run the tasks in order of priority. + /// let mut metadata_seen = vec![]; + /// while let Some(TaskWrapper(runnable)) = EXECUTOR.lock().unwrap().pop() { + /// metadata_seen.push(*runnable.metadata()); + /// runnable.run(); + /// } + /// + /// assert_eq!(metadata_seen, vec![3, 2, 1]); + /// assert_eq!(t1.await, 1); + /// assert_eq!(t2.await, 2); + /// assert_eq!(t3.await, 3); + /// # }); + /// ``` + pub fn metadata(self, metadata: M) -> Builder { + Builder { + metadata, + #[cfg(feature = "std")] + propagate_panic: self.propagate_panic, + } + } +} + +impl Builder { + /// Propagates panics that occur in the task. + /// + /// When this is `true`, panics that occur in the task will be propagated to the caller of + /// the [`Task`]. When this is false, no special action is taken when a panic occurs in the + /// task, meaning that the caller of [`Runnable::run`] will observe a panic. + /// + /// This is only available when the `std` feature is enabled. By default, this is `false`. + /// + /// # Examples + /// + /// ``` + /// use async_task::Builder; + /// use futures_lite::future::poll_fn; + /// use std::future::Future; + /// use std::panic; + /// use std::pin::Pin; + /// use std::task::{Context, Poll}; + /// + /// fn did_panic(f: F) -> bool { + /// panic::catch_unwind(panic::AssertUnwindSafe(f)).is_err() + /// } + /// + /// # smol::future::block_on(async { + /// let (runnable1, mut task1) = Builder::new() + /// .propagate_panic(true) + /// .spawn(|()| async move { panic!() }, |_| {}); + /// + /// let (runnable2, mut task2) = Builder::new() + /// .propagate_panic(false) + /// .spawn(|()| async move { panic!() }, |_| {}); + /// + /// assert!(!did_panic(|| { runnable1.run(); })); + /// assert!(did_panic(|| { runnable2.run(); })); + /// + /// let waker = poll_fn(|cx| Poll::Ready(cx.waker().clone())).await; + /// let mut cx = Context::from_waker(&waker); + /// assert!(did_panic(|| { let _ = Pin::new(&mut task1).poll(&mut cx); })); + /// assert!(did_panic(|| { let _ = Pin::new(&mut task2).poll(&mut cx); })); + /// # }); + /// ``` + #[cfg(feature = "std")] + pub fn propagate_panic(self, propagate_panic: bool) -> Builder { + Builder { + metadata: self.metadata, + propagate_panic, + } + } + + /// Creates a new task. + /// + /// The returned [`Runnable`] is used to poll the `future`, and the [`Task`] is used to await its + /// output. + /// + /// Method [`run()`][`Runnable::run()`] polls the task's future once. Then, the [`Runnable`] + /// vanishes and only reappears when its [`Waker`] wakes the task, thus scheduling it to be run + /// again. + /// + /// When the task is woken, its [`Runnable`] is passed to the `schedule` function. + /// The `schedule` function should not attempt to run the [`Runnable`] nor to drop it. Instead, it + /// should push it into a task queue so that it can be processed later. + /// + /// If you need to spawn a future that does not implement [`Send`] or isn't `'static`, consider + /// using [`spawn_local()`] or [`spawn_unchecked()`] instead. + /// + /// # Examples + /// + /// ``` + /// use async_task::Builder; + /// + /// // The future inside the task. + /// let future = async { + /// println!("Hello, world!"); + /// }; + /// + /// // A function that schedules the task when it gets woken up. + /// let (s, r) = flume::unbounded(); + /// let schedule = move |runnable| s.send(runnable).unwrap(); + /// + /// // Create a task with the future and the schedule function. + /// let (runnable, task) = Builder::new().spawn(|()| future, schedule); + /// ``` + pub fn spawn(self, future: F, schedule: S) -> (Runnable, Task) + where + F: FnOnce(&M) -> Fut, + Fut: Future + Send + 'static, + Fut::Output: Send + 'static, + S: Schedule + Send + Sync + 'static, + { + unsafe { self.spawn_unchecked(future, schedule) } + } + + /// Creates a new thread-local task. + /// + /// This function is same as [`spawn()`], except it does not require [`Send`] on `future`. If the + /// [`Runnable`] is used or dropped on another thread, a panic will occur. + /// + /// This function is only available when the `std` feature for this crate is enabled. + /// + /// # Examples + /// + /// ``` + /// use async_task::{Builder, Runnable}; + /// use flume::{Receiver, Sender}; + /// use std::rc::Rc; + /// + /// thread_local! { + /// // A queue that holds scheduled tasks. + /// static QUEUE: (Sender, Receiver) = flume::unbounded(); + /// } + /// + /// // Make a non-Send future. + /// let msg: Rc = "Hello, world!".into(); + /// let future = async move { + /// println!("{}", msg); + /// }; + /// + /// // A function that schedules the task when it gets woken up. + /// let s = QUEUE.with(|(s, _)| s.clone()); + /// let schedule = move |runnable| s.send(runnable).unwrap(); + /// + /// // Create a task with the future and the schedule function. + /// let (runnable, task) = Builder::new().spawn_local(move |()| future, schedule); + /// ``` + #[cfg(feature = "std")] + pub fn spawn_local( + self, + future: F, + schedule: S, + ) -> (Runnable, Task) + where + F: FnOnce(&M) -> Fut, + Fut: Future + 'static, + Fut::Output: 'static, + S: Schedule + Send + Sync + 'static, + { + use std::mem::ManuallyDrop; + use std::pin::Pin; + use std::task::{Context, Poll}; + use std::thread::{self, ThreadId}; + + #[inline] + fn thread_id() -> ThreadId { + thread_local! { + static ID: ThreadId = thread::current().id(); + } + ID.try_with(|id| *id) + .unwrap_or_else(|_| thread::current().id()) + } + + struct Checked { + id: ThreadId, + inner: ManuallyDrop, + } + + impl Drop for Checked { + fn drop(&mut self) { + assert!( + self.id == thread_id(), + "local task dropped by a thread that didn't spawn it" + ); + unsafe { + ManuallyDrop::drop(&mut self.inner); + } + } + } + + impl Future for Checked { + type Output = F::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + assert!( + self.id == thread_id(), + "local task polled by a thread that didn't spawn it" + ); + unsafe { self.map_unchecked_mut(|c| &mut *c.inner).poll(cx) } + } + } + + // Wrap the future into one that checks which thread it's on. + let future = move |meta| { + let future = future(meta); + + Checked { + id: thread_id(), + inner: ManuallyDrop::new(future), + } + }; + + unsafe { self.spawn_unchecked(future, schedule) } + } + + /// Creates a new task without [`Send`], [`Sync`], and `'static` bounds. + /// + /// This function is same as [`spawn()`], except it does not require [`Send`], [`Sync`], and + /// `'static` on `future` and `schedule`. + /// + /// # Safety + /// + /// - If `Fut` is not [`Send`], its [`Runnable`] must be used and dropped on the original + /// thread. + /// - If `Fut` is not `'static`, borrowed non-metadata variables must outlive its [`Runnable`]. + /// - If `schedule` is not [`Send`] and [`Sync`], all instances of the [`Runnable`]'s [`Waker`] + /// must be used and dropped on the original thread. + /// - If `schedule` is not `'static`, borrowed variables must outlive all instances of the + /// [`Runnable`]'s [`Waker`]. + /// + /// # Examples + /// + /// ``` + /// use async_task::Builder; + /// + /// // The future inside the task. + /// let future = async { + /// println!("Hello, world!"); + /// }; + /// + /// // If the task gets woken up, it will be sent into this channel. + /// let (s, r) = flume::unbounded(); + /// let schedule = move |runnable| s.send(runnable).unwrap(); + /// + /// // Create a task with the future and the schedule function. + /// let (runnable, task) = unsafe { Builder::new().spawn_unchecked(move |()| future, schedule) }; + /// ``` + pub unsafe fn spawn_unchecked<'a, F, Fut, S>( + self, + future: F, + schedule: S, + ) -> (Runnable, Task) + where + F: FnOnce(&'a M) -> Fut, + Fut: Future + 'a, + S: Schedule, + M: 'a, + { + // Allocate large futures on the heap. + let ptr = if mem::size_of::() >= 2048 { + let future = |meta| { + let future = future(meta); + Box::pin(future) + }; + + RawTask::<_, Fut::Output, S, M>::allocate(future, schedule, self) + } else { + RawTask::::allocate(future, schedule, self) + }; + + let runnable = Runnable { + ptr, + _marker: PhantomData, + }; + let task = Task { + ptr, + _marker: PhantomData, + }; + (runnable, task) + } +} + /// Creates a new task. /// /// The returned [`Runnable`] is used to poll the `future`, and the [`Task`] is used to await its @@ -46,7 +571,7 @@ pub fn spawn(future: F, schedule: S) -> (Runnable, Task) where F: Future + Send + 'static, F::Output: Send + 'static, - S: Fn(Runnable) + Send + Sync + 'static, + S: Schedule + Send + Sync + 'static, { unsafe { spawn_unchecked(future, schedule) } } @@ -88,58 +613,9 @@ pub fn spawn_local(future: F, schedule: S) -> (Runnable, Task) where F: Future + 'static, F::Output: 'static, - S: Fn(Runnable) + Send + Sync + 'static, + S: Schedule + Send + Sync + 'static, { - use std::mem::ManuallyDrop; - use std::pin::Pin; - use std::task::{Context, Poll}; - use std::thread::{self, ThreadId}; - - #[inline] - fn thread_id() -> ThreadId { - thread_local! { - static ID: ThreadId = thread::current().id(); - } - ID.try_with(|id| *id) - .unwrap_or_else(|_| thread::current().id()) - } - - struct Checked { - id: ThreadId, - inner: ManuallyDrop, - } - - impl Drop for Checked { - fn drop(&mut self) { - assert!( - self.id == thread_id(), - "local task dropped by a thread that didn't spawn it" - ); - unsafe { - ManuallyDrop::drop(&mut self.inner); - } - } - } - - impl Future for Checked { - type Output = F::Output; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - assert!( - self.id == thread_id(), - "local task polled by a thread that didn't spawn it" - ); - unsafe { self.map_unchecked_mut(|c| &mut *c.inner).poll(cx) } - } - } - - // Wrap the future into one that checks which thread it's on. - let future = Checked { - id: thread_id(), - inner: ManuallyDrop::new(future), - }; - - unsafe { spawn_unchecked(future, schedule) } + Builder::new().spawn_local(move |()| future, schedule) } /// Creates a new task without [`Send`], [`Sync`], and `'static` bounds. @@ -152,9 +628,10 @@ where /// - If `future` is not [`Send`], its [`Runnable`] must be used and dropped on the original /// thread. /// - If `future` is not `'static`, borrowed variables must outlive its [`Runnable`]. -/// - If `schedule` is not [`Send`] and [`Sync`], the task's [`Waker`] must be used and dropped on -/// the original thread. -/// - If `schedule` is not `'static`, borrowed variables must outlive the task's [`Waker`]. +/// - If `schedule` is not [`Send`] and [`Sync`], all instances of the [`Runnable`]'s [`Waker`] +/// must be used and dropped on the original thread. +/// - If `schedule` is not `'static`, borrowed variables must outlive all instances of the +/// [`Runnable`]'s [`Waker`]. /// /// # Examples /// @@ -174,22 +651,9 @@ where pub unsafe fn spawn_unchecked(future: F, schedule: S) -> (Runnable, Task) where F: Future, - S: Fn(Runnable), + S: Schedule, { - // Allocate large futures on the heap. - let ptr = if mem::size_of::() >= 2048 { - let future = alloc::boxed::Box::pin(future); - RawTask::<_, F::Output, S>::allocate(future, schedule) - } else { - RawTask::::allocate(future, schedule) - }; - - let runnable = Runnable { ptr }; - let task = Task { - ptr, - _marker: PhantomData, - }; - (runnable, task) + Builder::new().spawn_unchecked(move |()| future, schedule) } /// A handle to a runnable task. @@ -230,20 +694,31 @@ where /// runnable.schedule(); /// assert_eq!(smol::future::block_on(task), 3); /// ``` -pub struct Runnable { +pub struct Runnable { /// A pointer to the heap-allocated task. pub(crate) ptr: NonNull<()>, + + /// A marker capturing generic type `M`. + pub(crate) _marker: PhantomData, } -unsafe impl Send for Runnable {} -unsafe impl Sync for Runnable {} +unsafe impl Send for Runnable {} +unsafe impl Sync for Runnable {} #[cfg(feature = "std")] -impl std::panic::UnwindSafe for Runnable {} +impl std::panic::UnwindSafe for Runnable {} #[cfg(feature = "std")] -impl std::panic::RefUnwindSafe for Runnable {} +impl std::panic::RefUnwindSafe for Runnable {} + +impl Runnable { + /// Get the metadata associated with this task. + /// + /// Tasks can be created with a metadata object associated with them; by default, this + /// is a `()` value. See the [`Builder::metadata()`] method for more information. + pub fn metadata(&self) -> &M { + &self.header().metadata + } -impl Runnable { /// Schedules the task. /// /// This is a convenience method that passes the [`Runnable`] to the schedule function. @@ -265,11 +740,11 @@ impl Runnable { /// ``` pub fn schedule(self) { let ptr = self.ptr.as_ptr(); - let header = ptr as *const Header; + let header = ptr as *const Header; mem::forget(self); unsafe { - ((*header).vtable.schedule)(ptr); + ((*header).vtable.schedule)(ptr, ScheduleInfo::new(false)); } } @@ -303,7 +778,7 @@ impl Runnable { /// ``` pub fn run(self) -> bool { let ptr = self.ptr.as_ptr(); - let header = ptr as *const Header; + let header = ptr as *const Header; mem::forget(self); unsafe { ((*header).vtable.run)(ptr) } @@ -334,22 +809,26 @@ impl Runnable { /// ``` pub fn waker(&self) -> Waker { let ptr = self.ptr.as_ptr(); - let header = ptr as *const Header; + let header = ptr as *const Header; unsafe { let raw_waker = ((*header).vtable.clone_waker)(ptr); Waker::from_raw(raw_waker) } } + + fn header(&self) -> &Header { + unsafe { &*(self.ptr.as_ptr() as *const Header) } + } } -impl Drop for Runnable { +impl Drop for Runnable { fn drop(&mut self) { let ptr = self.ptr.as_ptr(); - let header = ptr as *const Header; + let header = self.header(); unsafe { - let mut state = (*header).state.load(Ordering::Acquire); + let mut state = header.state.load(Ordering::Acquire); loop { // If the task has been completed or closed, it can't be canceled. @@ -358,7 +837,7 @@ impl Drop for Runnable { } // Mark the task as closed. - match (*header).state.compare_exchange_weak( + match header.state.compare_exchange_weak( state, state | CLOSED, Ordering::AcqRel, @@ -370,10 +849,10 @@ impl Drop for Runnable { } // Drop the future. - ((*header).vtable.drop_future)(ptr); + (header.vtable.drop_future)(ptr); // Mark the task as unscheduled. - let state = (*header).state.fetch_and(!SCHEDULED, Ordering::AcqRel); + let state = header.state.fetch_and(!SCHEDULED, Ordering::AcqRel); // Notify the awaiter that the future has been dropped. if state & AWAITER != 0 { @@ -381,15 +860,15 @@ impl Drop for Runnable { } // Drop the task reference. - ((*header).vtable.drop_ref)(ptr); + (header.vtable.drop_ref)(ptr); } } } -impl fmt::Debug for Runnable { +impl fmt::Debug for Runnable { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let ptr = self.ptr.as_ptr(); - let header = ptr as *const Header; + let header = ptr as *const Header; f.debug_struct("Runnable") .field("header", unsafe { &(*header) }) diff --git a/src/task.rs b/src/task.rs index 8ecd746..178b28e 100644 --- a/src/task.rs +++ b/src/task.rs @@ -1,6 +1,6 @@ use core::fmt; use core::future::Future; -use core::marker::{PhantomData, Unpin}; +use core::marker::PhantomData; use core::mem; use core::pin::Pin; use core::ptr::NonNull; @@ -8,6 +8,8 @@ use core::sync::atomic::Ordering; use core::task::{Context, Poll}; use crate::header::Header; +use crate::raw::Panic; +use crate::runnable::ScheduleInfo; use crate::state::*; /// A spawned task. @@ -44,25 +46,25 @@ use crate::state::*; /// assert_eq!(future::block_on(task), 3); /// ``` #[must_use = "tasks get canceled when dropped, use `.detach()` to run them in the background"] -pub struct Task { +pub struct Task { /// A raw task pointer. pub(crate) ptr: NonNull<()>, - /// A marker capturing generic type `T`. - pub(crate) _marker: PhantomData, + /// A marker capturing generic types `T` and `M`. + pub(crate) _marker: PhantomData<(T, M)>, } -unsafe impl Send for Task {} -unsafe impl Sync for Task {} +unsafe impl Send for Task {} +unsafe impl Sync for Task {} -impl Unpin for Task {} +impl Unpin for Task {} #[cfg(feature = "std")] -impl std::panic::UnwindSafe for Task {} +impl std::panic::UnwindSafe for Task {} #[cfg(feature = "std")] -impl std::panic::RefUnwindSafe for Task {} +impl std::panic::RefUnwindSafe for Task {} -impl Task { +impl Task { /// Detaches the task to let it keep running in the background. /// /// # Examples @@ -173,14 +175,14 @@ impl Task { /// // Wait for the task's output. /// assert_eq!(future::block_on(task.fallible()), None); /// ``` - pub fn fallible(self) -> FallibleTask { + pub fn fallible(self) -> FallibleTask { FallibleTask { task: self } } /// Puts the task in canceled state. fn set_canceled(&mut self) { let ptr = self.ptr.as_ptr(); - let header = ptr as *const Header; + let header = ptr as *const Header; unsafe { let mut state = (*header).state.load(Ordering::Acquire); @@ -209,7 +211,7 @@ impl Task { // If the task is not scheduled nor running, schedule it one more time so // that its future gets dropped by the executor. if state & (SCHEDULED | RUNNING) == 0 { - ((*header).vtable.schedule)(ptr); + ((*header).vtable.schedule)(ptr, ScheduleInfo::new(false)); } // Notify the awaiter that the task has been closed. @@ -226,9 +228,9 @@ impl Task { } /// Puts the task in detached state. - fn set_detached(&mut self) -> Option { + fn set_detached(&mut self) -> Option> { let ptr = self.ptr.as_ptr(); - let header = ptr as *const Header; + let header = ptr as *const Header; unsafe { // A place where the output will be stored in case it needs to be dropped. @@ -256,8 +258,10 @@ impl Task { ) { Ok(_) => { // Read the output. - output = - Some((((*header).vtable.get_output)(ptr) as *mut T).read()); + output = Some( + (((*header).vtable.get_output)(ptr) as *mut Result) + .read(), + ); // Update the state variable because we're continuing the loop. state |= CLOSED; @@ -286,7 +290,7 @@ impl Task { // schedule dropping its future or destroy it. if state & !(REFERENCE - 1) == 0 { if state & CLOSED == 0 { - ((*header).vtable.schedule)(ptr); + ((*header).vtable.schedule)(ptr, ScheduleInfo::new(false)); } else { ((*header).vtable.destroy)(ptr); } @@ -316,7 +320,7 @@ impl Task { /// 4. It is completed and the `Task` gets dropped. fn poll_task(&mut self, cx: &mut Context<'_>) -> Poll> { let ptr = self.ptr.as_ptr(); - let header = ptr as *const Header; + let header = ptr as *const Header; unsafe { let mut state = (*header).state.load(Ordering::Acquire); @@ -382,8 +386,22 @@ impl Task { } // Take the output from the task. - let output = ((*header).vtable.get_output)(ptr) as *mut T; - return Poll::Ready(Some(output.read())); + let output = ((*header).vtable.get_output)(ptr) as *mut Result; + let output = output.read(); + + // Propagate the panic if the task panicked. + let output = match output { + Ok(output) => output, + Err(panic) => { + #[cfg(feature = "std")] + std::panic::resume_unwind(panic); + + #[cfg(not(feature = "std"))] + match panic {} + } + }; + + return Poll::Ready(Some(output)); } Err(s) => state = s, } @@ -391,9 +409,9 @@ impl Task { } } - fn header(&self) -> &Header { + fn header(&self) -> &Header { let ptr = self.ptr.as_ptr(); - let header = ptr as *const Header; + let header = ptr as *const Header; unsafe { &*header } } @@ -402,23 +420,31 @@ impl Task { /// Note that in a multithreaded environment, this task can change finish immediately after calling this function. pub fn is_finished(&self) -> bool { let ptr = self.ptr.as_ptr(); - let header = ptr as *const Header; + let header = ptr as *const Header; unsafe { let state = (*header).state.load(Ordering::Acquire); state & (CLOSED | COMPLETED) != 0 } } + + /// Get the metadata associated with this task. + /// + /// Tasks can be created with a metadata object associated with them; by default, this + /// is a `()` value. See the [`Builder::metadata()`] method for more information. + pub fn metadata(&self) -> &M { + &self.header().metadata + } } -impl Drop for Task { +impl Drop for Task { fn drop(&mut self) { self.set_canceled(); self.set_detached(); } } -impl Future for Task { +impl Future for Task { type Output = T; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { @@ -429,7 +455,7 @@ impl Future for Task { } } -impl fmt::Debug for Task { +impl fmt::Debug for Task { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Task") .field("header", self.header()) @@ -446,11 +472,11 @@ impl fmt::Debug for Task { /// This can be useful to avoid the panic produced when polling the `Task` /// future if the executor dropped its `Runnable`. #[must_use = "tasks get canceled when dropped, use `.detach()` to run them in the background"] -pub struct FallibleTask { - task: Task, +pub struct FallibleTask { + task: Task, } -impl FallibleTask { +impl FallibleTask { /// Detaches the task to let it keep running in the background. /// /// # Examples @@ -513,9 +539,16 @@ impl FallibleTask { pub async fn cancel(self) -> Option { self.task.cancel().await } + + /// Returns `true` if the current task is finished. + /// + /// Note that in a multithreaded environment, this task can change finish immediately after calling this function. + pub fn is_finished(&self) -> bool { + self.task.is_finished() + } } -impl Future for FallibleTask { +impl Future for FallibleTask { type Output = Option; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { @@ -523,7 +556,7 @@ impl Future for FallibleTask { } } -impl fmt::Debug for FallibleTask { +impl fmt::Debug for FallibleTask { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("FallibleTask") .field("header", self.task.header()) diff --git a/src/utils.rs b/src/utils.rs index 189e9af..5c2170c 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -83,7 +83,7 @@ impl Layout { /// Returns the layout for `a` followed by `b` and the offset of `b`. /// - /// This function was adapted from the currently unstable `Layout::extend()`: + /// This function was adapted from the `Layout::extend()`: /// https://doc.rust-lang.org/nightly/std/alloc/struct.Layout.html#method.extend #[inline] pub(crate) const fn extend(self, other: Layout) -> Option<(Layout, usize)> { @@ -97,7 +97,7 @@ impl Layout { // - align is 0 (implied false by is_power_of_two()) // - align is not a power of 2 // - size rounded up to align overflows - if !new_align.is_power_of_two() || new_size > core::usize::MAX - (new_align - 1) { + if !new_align.is_power_of_two() || new_size > isize::MAX as usize - (new_align - 1) { return None; } @@ -107,7 +107,7 @@ impl Layout { /// Returns the padding after `layout` that aligns the following address to `align`. /// - /// This function was adapted from the currently unstable `Layout::padding_needed_for()`: + /// This function was adapted from the `Layout::padding_needed_for()`: /// https://doc.rust-lang.org/nightly/std/alloc/struct.Layout.html#method.padding_needed_for #[inline] pub(crate) const fn padding_needed_for(self, align: usize) -> usize { diff --git a/tests/metadata.rs b/tests/metadata.rs new file mode 100644 index 0000000..d3d8d53 --- /dev/null +++ b/tests/metadata.rs @@ -0,0 +1,58 @@ +use async_task::{Builder, Runnable}; +use flume::unbounded; +use smol::future; + +use std::sync::atomic::{AtomicUsize, Ordering}; + +#[test] +fn metadata_use_case() { + // Each future has a counter that is incremented every time it is scheduled. + let (sender, receiver) = unbounded::>(); + let schedule = move |runnable: Runnable| { + runnable.metadata().fetch_add(1, Ordering::SeqCst); + sender.send(runnable).ok(); + }; + + async fn my_future(counter: &AtomicUsize) { + loop { + // Loop until we've been scheduled five times. + let count = counter.load(Ordering::SeqCst); + if count < 5 { + // Make sure that we are immediately scheduled again. + future::yield_now().await; + continue; + } + + // We've been scheduled five times, so we're done. + break; + } + } + + let make_task = || { + // SAFETY: We are spawning a non-'static future, so we need to use the unsafe API. + // The borrowed variables, in this case the metadata, are guaranteed to outlive the runnable. + let (runnable, task) = unsafe { + Builder::new() + .metadata(AtomicUsize::new(0)) + .spawn_unchecked(my_future, schedule.clone()) + }; + + runnable.schedule(); + task + }; + + // Make tasks. + let t1 = make_task(); + let t2 = make_task(); + + // Run the tasks. + while let Ok(runnable) = receiver.try_recv() { + runnable.run(); + } + + // Unwrap the tasks. + smol::future::block_on(async move { + t1.await; + t2.await; + }); +} diff --git a/tests/panic.rs b/tests/panic.rs index 09ffb28..85684a0 100644 --- a/tests/panic.rs +++ b/tests/panic.rs @@ -131,7 +131,7 @@ fn try_join_and_run_and_join() { schedule!(s, SCHEDULE, DROP_S); let (runnable, mut task) = async_task::spawn(f, s); - future::block_on(future::or(&mut task, future::ready(Default::default()))); + future::block_on(future::or(&mut task, future::ready(()))); assert_eq!(POLL.load(Ordering::SeqCst), 0); assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); assert_eq!(DROP_F.load(Ordering::SeqCst), 0); @@ -197,7 +197,7 @@ fn try_join_during_run() { .add(|| { thread::sleep(ms(200)); - future::block_on(future::or(&mut task, future::ready(Default::default()))); + future::block_on(future::or(&mut task, future::ready(()))); assert_eq!(POLL.load(Ordering::SeqCst), 1); assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); assert_eq!(DROP_F.load(Ordering::SeqCst), 0); diff --git a/tests/waker_ready.rs b/tests/waker_ready.rs index 10d38cb..335134e 100644 --- a/tests/waker_ready.rs +++ b/tests/waker_ready.rs @@ -175,6 +175,7 @@ fn wake_by_ref() { assert_eq!(chan.len(), 0); } +#[allow(clippy::redundant_clone)] // This is intentional #[test] fn clone() { future!(f, get_waker, POLL, DROP_F); -- cgit v1.2.3