diff options
author | Android Build Coastguard Worker <android-build-coastguard-worker@google.com> | 2022-05-10 07:22:04 +0000 |
---|---|---|
committer | Android Build Coastguard Worker <android-build-coastguard-worker@google.com> | 2022-05-10 07:22:04 +0000 |
commit | 3f3ff5be9f8846c88c9bc070b80b654f73e91026 (patch) | |
tree | b43e15fe04d1c6ea9e0efa054185733a5959b366 | |
parent | 0813e514d0bb457ee45cc2fd0ee21332c865495a (diff) | |
parent | c8aec5c18160241e9b5bc79bbc3580e13df5ab23 (diff) | |
download | futures-executor-aml_art_331612010.tar.gz |
Snap for 8564071 from c8aec5c18160241e9b5bc79bbc3580e13df5ab23 to mainline-art-releaseaml_art_331813100aml_art_331813010aml_art_331711080aml_art_331612010aml_art_331413030aml_art_331314010aml_art_331113000aml_art_331012050android13-mainline-art-release
Change-Id: I3c6e8d73c83a5ef18bcaeebf65c9eb6d4e8f66f6
-rw-r--r-- | .cargo_vcs_info.json | 7 | ||||
-rw-r--r-- | Android.bp | 13 | ||||
-rw-r--r-- | Cargo.toml | 42 | ||||
-rw-r--r-- | Cargo.toml.orig | 11 | ||||
-rw-r--r-- | METADATA | 8 | ||||
-rw-r--r-- | README.md | 23 | ||||
-rw-r--r-- | TEST_MAPPING | 39 | ||||
-rw-r--r-- | benches/thread_notify.rs | 5 | ||||
-rw-r--r-- | cargo2android.json | 5 | ||||
-rw-r--r-- | src/lib.rs | 27 | ||||
-rw-r--r-- | src/local_pool.rs | 14 | ||||
-rw-r--r-- | src/thread_pool.rs | 53 | ||||
-rw-r--r-- | src/unpark_mutex.rs | 27 | ||||
-rw-r--r-- | tests/local_pool.rs | 189 |
14 files changed, 283 insertions, 180 deletions
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json index f3ad3ab..77486f4 100644 --- a/.cargo_vcs_info.json +++ b/.cargo_vcs_info.json @@ -1,5 +1,6 @@ { "git": { - "sha1": "c91f8691672c7401b1923ab00bf138975c99391a" - } -} + "sha1": "fc1e3250219170e31cddb8857a276cba7dd08d44" + }, + "path_in_vcs": "futures-executor" +}
\ No newline at end of file @@ -43,6 +43,8 @@ rust_library { name: "libfutures_executor", host_supported: true, crate_name: "futures_executor", + cargo_env_compat: true, + cargo_pkg_version: "0.3.21", srcs: ["src/lib.rs"], edition: "2018", features: [ @@ -58,18 +60,9 @@ rust_library { ], apex_available: [ "//apex_available:platform", + "com.android.bluetooth", "com.android.resolv", "com.android.virt", ], min_sdk_version: "29", } - -// dependent_library ["feature_list"] -// futures-core-0.3.14 "alloc,std" -// futures-task-0.3.14 "alloc,std" -// futures-util-0.3.14 "alloc,slab,std" -// libc-0.2.94 "default,std" -// num_cpus-1.13.0 -// pin-project-lite-0.2.6 -// pin-utils-0.1.0 -// slab-0.4.3 "default,std" @@ -3,36 +3,41 @@ # When uploading crates to the registry Cargo will automatically # "normalize" Cargo.toml files for maximal compatibility # with all versions of Cargo and also rewrite `path` dependencies -# to registry (e.g., crates.io) dependencies +# to registry (e.g., crates.io) dependencies. # -# If you believe there's an error in this file please file an -# issue against the rust-lang/cargo repository. If you're -# editing this file be aware that the upstream Cargo.toml -# will likely look very different (and much more reasonable) +# If you are reading this file be aware that the original Cargo.toml +# will likely look very different (and much more reasonable). +# See Cargo.toml.orig for the original contents. [package] edition = "2018" +rust-version = "1.45" name = "futures-executor" -version = "0.3.13" -authors = ["Alex Crichton <alex@alexcrichton.com>"] -description = "Executors for asynchronous tasks based on the futures-rs library.\n" +version = "0.3.21" +description = """ +Executors for asynchronous tasks based on the futures-rs library. +""" homepage = "https://rust-lang.github.io/futures-rs" -documentation = "https://docs.rs/futures-executor/0.3" license = "MIT OR Apache-2.0" repository = "https://github.com/rust-lang/futures-rs" + [package.metadata.docs.rs] all-features = true -rustdoc-args = ["--cfg", "docsrs"] +rustdoc-args = [ + "--cfg", + "docsrs", +] + [dependencies.futures-core] -version = "0.3.13" +version = "0.3.21" default-features = false [dependencies.futures-task] -version = "0.3.13" +version = "0.3.21" default-features = false [dependencies.futures-util] -version = "0.3.13" +version = "0.3.21" default-features = false [dependencies.num_cpus] @@ -43,5 +48,12 @@ optional = true [features] default = ["std"] -std = ["futures-core/std", "futures-task/std", "futures-util/std"] -thread-pool = ["std", "num_cpus"] +std = [ + "futures-core/std", + "futures-task/std", + "futures-util/std", +] +thread-pool = [ + "std", + "num_cpus", +] diff --git a/Cargo.toml.orig b/Cargo.toml.orig index bc1853b..dae5f22 100644 --- a/Cargo.toml.orig +++ b/Cargo.toml.orig @@ -1,12 +1,11 @@ [package] name = "futures-executor" +version = "0.3.21" edition = "2018" -version = "0.3.13" -authors = ["Alex Crichton <alex@alexcrichton.com>"] +rust-version = "1.45" license = "MIT OR Apache-2.0" repository = "https://github.com/rust-lang/futures-rs" homepage = "https://rust-lang.github.io/futures-rs" -documentation = "https://docs.rs/futures-executor/0.3" description = """ Executors for asynchronous tasks based on the futures-rs library. """ @@ -17,9 +16,9 @@ std = ["futures-core/std", "futures-task/std", "futures-util/std"] thread-pool = ["std", "num_cpus"] [dependencies] -futures-core = { path = "../futures-core", version = "0.3.13", default-features = false } -futures-task = { path = "../futures-task", version = "0.3.13", default-features = false } -futures-util = { path = "../futures-util", version = "0.3.13", default-features = false } +futures-core = { path = "../futures-core", version = "0.3.21", default-features = false } +futures-task = { path = "../futures-task", version = "0.3.21", default-features = false } +futures-util = { path = "../futures-util", version = "0.3.21", default-features = false } num_cpus = { version = "1.8.0", optional = true } [dev-dependencies] @@ -7,13 +7,13 @@ third_party { } url { type: ARCHIVE - value: "https://static.crates.io/crates/futures-executor/futures-executor-0.3.13.crate" + value: "https://static.crates.io/crates/futures-executor/futures-executor-0.3.21.crate" } - version: "0.3.13" + version: "0.3.21" license_type: NOTICE last_upgrade_date { - year: 2021 - month: 4 + year: 2022 + month: 3 day: 1 } } diff --git a/README.md b/README.md new file mode 100644 index 0000000..6708685 --- /dev/null +++ b/README.md @@ -0,0 +1,23 @@ +# futures-executor + +Executors for asynchronous tasks based on the futures-rs library. + +## Usage + +Add this to your `Cargo.toml`: + +```toml +[dependencies] +futures-executor = "0.3" +``` + +The current `futures-executor` requires Rust 1.45 or later. + +## License + +Licensed under either of [Apache License, Version 2.0](LICENSE-APACHE) or +[MIT license](LICENSE-MIT) at your option. + +Unless you explicitly state otherwise, any contribution intentionally submitted +for inclusion in the work by you, as defined in the Apache-2.0 license, shall +be dual licensed as above, without any additional terms or conditions. diff --git a/TEST_MAPPING b/TEST_MAPPING new file mode 100644 index 0000000..e2df61d --- /dev/null +++ b/TEST_MAPPING @@ -0,0 +1,39 @@ +// Generated by update_crate_tests.py for tests that depend on this crate. +{ + "imports": [ + { + "path": "external/rust/crates/anyhow" + }, + { + "path": "external/rust/crates/tokio" + } + ], + "presubmit": [ + { + "name": "ZipFuseTest" + }, + { + "name": "authfs_device_test_src_lib" + }, + { + "name": "doh_unit_test" + }, + { + "name": "virtualizationservice_device_test" + } + ], + "presubmit-rust": [ + { + "name": "ZipFuseTest" + }, + { + "name": "authfs_device_test_src_lib" + }, + { + "name": "doh_unit_test" + }, + { + "name": "virtualizationservice_device_test" + } + ] +} diff --git a/benches/thread_notify.rs b/benches/thread_notify.rs index d8fbec4..88d0447 100644 --- a/benches/thread_notify.rs +++ b/benches/thread_notify.rs @@ -102,10 +102,7 @@ fn thread_yield_multi_thread(b: &mut Bencher) { }); b.iter(move || { - let y = Yield { - rem: NUM, - tx: tx.clone(), - }; + let y = Yield { rem: NUM, tx: tx.clone() }; block_on(y); }); diff --git a/cargo2android.json b/cargo2android.json index 05ca777..da40e17 100644 --- a/cargo2android.json +++ b/cargo2android.json @@ -1,12 +1,13 @@ { "apex-available": [ "//apex_available:platform", + "com.android.bluetooth", "com.android.resolv", "com.android.virt" ], - "min_sdk_version": "29", "dependencies": true, "device": true, "features": "thread-pool", + "min-sdk-version": "29", "run": true -}
\ No newline at end of file +} @@ -37,13 +37,20 @@ //! [`spawn_local_obj`]: https://docs.rs/futures/0.3/futures/task/trait.LocalSpawn.html#tymethod.spawn_local_obj #![cfg_attr(not(feature = "std"), no_std)] - -#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms, unreachable_pub)] -// It cannot be included in the published code because this lints have false positives in the minimum required version. -#![cfg_attr(test, warn(single_use_lifetimes))] -#![warn(clippy::all)] -#![doc(test(attr(deny(warnings), allow(dead_code, unused_assignments, unused_variables))))] - +#![warn( + missing_debug_implementations, + missing_docs, + rust_2018_idioms, + single_use_lifetimes, + unreachable_pub +)] +#![doc(test( + no_crate_inject, + attr( + deny(warnings, rust_2018_idioms, single_use_lifetimes), + allow(dead_code, unused_assignments, unused_variables) + ) +))] #![cfg_attr(docsrs, feature(doc_cfg))] #[cfg(feature = "std")] @@ -52,13 +59,13 @@ mod local_pool; pub use crate::local_pool::{block_on, block_on_stream, BlockingStream, LocalPool, LocalSpawner}; #[cfg(feature = "thread-pool")] -#[cfg(feature = "std")] -mod unpark_mutex; -#[cfg(feature = "thread-pool")] #[cfg_attr(docsrs, doc(cfg(feature = "thread-pool")))] #[cfg(feature = "std")] mod thread_pool; #[cfg(feature = "thread-pool")] +#[cfg(feature = "std")] +mod unpark_mutex; +#[cfg(feature = "thread-pool")] #[cfg_attr(docsrs, doc(cfg(feature = "thread-pool")))] #[cfg(feature = "std")] pub use crate::thread_pool::{ThreadPool, ThreadPoolBuilder}; diff --git a/src/local_pool.rs b/src/local_pool.rs index 156d5cc..bee96d8 100644 --- a/src/local_pool.rs +++ b/src/local_pool.rs @@ -10,7 +10,10 @@ use futures_util::stream::StreamExt; use std::cell::RefCell; use std::ops::{Deref, DerefMut}; use std::rc::{Rc, Weak}; -use std::sync::{Arc, atomic::{AtomicBool, Ordering}}; +use std::sync::{ + atomic::{AtomicBool, Ordering}, + Arc, +}; use std::thread::{self, Thread}; /// A single-threaded task pool for polling futures to completion. @@ -119,17 +122,12 @@ fn poll_executor<T, F: FnMut(&mut Context<'_>) -> T>(mut f: F) -> T { impl LocalPool { /// Create a new, empty pool of tasks. pub fn new() -> Self { - Self { - pool: FuturesUnordered::new(), - incoming: Default::default(), - } + Self { pool: FuturesUnordered::new(), incoming: Default::default() } } /// Get a clonable handle to the pool as a [`Spawn`]. pub fn spawner(&self) -> LocalSpawner { - LocalSpawner { - incoming: Rc::downgrade(&self.incoming), - } + LocalSpawner { incoming: Rc::downgrade(&self.incoming) } } /// Run all tasks in the pool to completion. diff --git a/src/thread_pool.rs b/src/thread_pool.rs index 741e6d9..5e1f586 100644 --- a/src/thread_pool.rs +++ b/src/thread_pool.rs @@ -2,8 +2,8 @@ use crate::enter; use crate::unpark_mutex::UnparkMutex; use futures_core::future::Future; use futures_core::task::{Context, Poll}; +use futures_task::{waker_ref, ArcWake}; use futures_task::{FutureObj, Spawn, SpawnError}; -use futures_task::{ArcWake, waker_ref}; use futures_util::future::FutureExt; use std::cmp; use std::fmt; @@ -54,9 +54,7 @@ struct PoolState { impl fmt::Debug for ThreadPool { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("ThreadPool") - .field("size", &self.state.size) - .finish() + f.debug_struct("ThreadPool").field("size", &self.state.size).finish() } } @@ -100,10 +98,7 @@ impl ThreadPool { pub fn spawn_obj_ok(&self, future: FutureObj<'static, ()>) { let task = Task { future, - wake_handle: Arc::new(WakeHandle { - exec: self.clone(), - mutex: UnparkMutex::new(), - }), + wake_handle: Arc::new(WakeHandle { exec: self.clone(), mutex: UnparkMutex::new() }), exec: self.clone(), }; self.state.send(Message::Run(task)); @@ -132,10 +127,7 @@ impl ThreadPool { } impl Spawn for ThreadPool { - fn spawn_obj( - &self, - future: FutureObj<'static, ()>, - ) -> Result<(), SpawnError> { + fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> { self.spawn_obj_ok(future); Ok(()) } @@ -146,10 +138,12 @@ impl PoolState { self.tx.lock().unwrap().send(msg).unwrap(); } - fn work(&self, - idx: usize, - after_start: Option<Arc<dyn Fn(usize) + Send + Sync>>, - before_stop: Option<Arc<dyn Fn(usize) + Send + Sync>>) { + fn work( + &self, + idx: usize, + after_start: Option<Arc<dyn Fn(usize) + Send + Sync>>, + before_stop: Option<Arc<dyn Fn(usize) + Send + Sync>>, + ) { let _scope = enter().unwrap(); if let Some(after_start) = after_start { after_start(idx); @@ -241,7 +235,8 @@ impl ThreadPoolBuilder { /// The closure provided will receive an index corresponding to the worker /// thread it's running on. pub fn after_start<F>(&mut self, f: F) -> &mut Self - where F: Fn(usize) + Send + Sync + 'static + where + F: Fn(usize) + Send + Sync + 'static, { self.after_start = Some(Arc::new(f)); self @@ -250,13 +245,14 @@ impl ThreadPoolBuilder { /// Execute closure `f` just prior to shutting down each worker thread. /// /// This hook is intended for bookkeeping and monitoring. - /// The closure `f` will be dropped after the `builder` is droppped + /// The closure `f` will be dropped after the `builder` is dropped /// and all threads in the pool have executed it. /// /// The closure provided will receive an index corresponding to the worker /// thread it's running on. pub fn before_stop<F>(&mut self, f: F) -> &mut Self - where F: Fn(usize) + Send + Sync + 'static + where + F: Fn(usize) + Send + Sync + 'static, { self.before_stop = Some(Arc::new(f)); self @@ -328,14 +324,11 @@ impl Task { Poll::Pending => {} Poll::Ready(()) => return wake_handle.mutex.complete(), } - let task = Self { - future, - wake_handle: wake_handle.clone(), - exec, - }; + let task = Self { future, wake_handle: wake_handle.clone(), exec }; match wake_handle.mutex.wait(task) { Ok(()) => return, // we've waited - Err(task) => { // someone's notified us + Err(task) => { + // someone's notified us future = task.future; exec = task.exec; } @@ -347,9 +340,7 @@ impl Task { impl fmt::Debug for Task { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("Task") - .field("contents", &"...") - .finish() + f.debug_struct("Task").field("contents", &"...").finish() } } @@ -372,9 +363,11 @@ mod tests { let (tx, rx) = mpsc::sync_channel(2); let _cpu_pool = ThreadPoolBuilder::new() .pool_size(2) - .after_start(move |_| tx.send(1).unwrap()).create().unwrap(); + .after_start(move |_| tx.send(1).unwrap()) + .create() + .unwrap(); - // After ThreadPoolBuilder is deconstructed, the tx should be droped + // After ThreadPoolBuilder is deconstructed, the tx should be dropped // so that we can use rx as an iterator. let count = rx.into_iter().count(); assert_eq!(count, 2); diff --git a/src/unpark_mutex.rs b/src/unpark_mutex.rs index c49c64c..ac5112c 100644 --- a/src/unpark_mutex.rs +++ b/src/unpark_mutex.rs @@ -29,25 +29,22 @@ unsafe impl<D: Send> Sync for UnparkMutex<D> {} // transitions: // The task is blocked, waiting on an event -const WAITING: usize = 0; // --> POLLING +const WAITING: usize = 0; // --> POLLING // The task is actively being polled by a thread; arrival of additional events // of interest should move it to the REPOLL state -const POLLING: usize = 1; // --> WAITING, REPOLL, or COMPLETE +const POLLING: usize = 1; // --> WAITING, REPOLL, or COMPLETE // The task is actively being polled, but will need to be re-polled upon // completion to ensure that all events were observed. -const REPOLL: usize = 2; // --> POLLING +const REPOLL: usize = 2; // --> POLLING // The task has finished executing (either successfully or with an error/panic) -const COMPLETE: usize = 3; // No transitions out +const COMPLETE: usize = 3; // No transitions out impl<D> UnparkMutex<D> { pub(crate) fn new() -> Self { - Self { - status: AtomicUsize::new(WAITING), - inner: UnsafeCell::new(None), - } + Self { status: AtomicUsize::new(WAITING), inner: UnsafeCell::new(None) } } /// Attempt to "notify" the mutex that a poll should occur. @@ -62,8 +59,7 @@ impl<D> UnparkMutex<D> { match status { // The task is idle, so try to run it immediately. WAITING => { - match self.status.compare_exchange(WAITING, POLLING, - SeqCst, SeqCst) { + match self.status.compare_exchange(WAITING, POLLING, SeqCst, SeqCst) { Ok(_) => { let data = unsafe { // SAFETY: we've ensured mutual exclusion via @@ -82,13 +78,10 @@ impl<D> UnparkMutex<D> { // The task is being polled, so we need to record that it should // be *repolled* when complete. - POLLING => { - match self.status.compare_exchange(POLLING, REPOLL, - SeqCst, SeqCst) { - Ok(_) => return Err(()), - Err(cur) => status = cur, - } - } + POLLING => match self.status.compare_exchange(POLLING, REPOLL, SeqCst, SeqCst) { + Ok(_) => return Err(()), + Err(cur) => status = cur, + }, // The task is already scheduled for polling, or is complete, so // we've got nothing to do. diff --git a/tests/local_pool.rs b/tests/local_pool.rs index b31f103..9b1316b 100644 --- a/tests/local_pool.rs +++ b/tests/local_pool.rs @@ -1,14 +1,14 @@ use futures::channel::oneshot; use futures::executor::LocalPool; -use futures::future::{self, Future, lazy, poll_fn}; -use futures::task::{Context, Poll, Spawn, LocalSpawn, Waker}; +use futures::future::{self, lazy, poll_fn, Future}; +use futures::task::{Context, LocalSpawn, Poll, Spawn, Waker}; use std::cell::{Cell, RefCell}; use std::pin::Pin; use std::rc::Rc; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; use std::thread; use std::time::Duration; -use std::sync::Arc; -use std::sync::atomic::{Ordering, AtomicBool}; struct Pending(Rc<()>); @@ -52,9 +52,14 @@ fn run_until_executes_spawned() { let (tx, rx) = oneshot::channel(); let mut pool = LocalPool::new(); let spawn = pool.spawner(); - spawn.spawn_local_obj(Box::pin(lazy(move |_| { - tx.send(()).unwrap(); - })).into()).unwrap(); + spawn + .spawn_local_obj( + Box::pin(lazy(move |_| { + tx.send(()).unwrap(); + })) + .into(), + ) + .unwrap(); pool.run_until(rx).unwrap(); } @@ -74,18 +79,27 @@ fn run_executes_spawned() { let spawn = pool.spawner(); let spawn2 = pool.spawner(); - spawn.spawn_local_obj(Box::pin(lazy(move |_| { - spawn2.spawn_local_obj(Box::pin(lazy(move |_| { - cnt2.set(cnt2.get() + 1); - })).into()).unwrap(); - })).into()).unwrap(); + spawn + .spawn_local_obj( + Box::pin(lazy(move |_| { + spawn2 + .spawn_local_obj( + Box::pin(lazy(move |_| { + cnt2.set(cnt2.get() + 1); + })) + .into(), + ) + .unwrap(); + })) + .into(), + ) + .unwrap(); pool.run(); assert_eq!(cnt.get(), 1); } - #[test] fn run_spawn_many() { const ITER: usize = 200; @@ -97,9 +111,14 @@ fn run_spawn_many() { for _ in 0..ITER { let cnt = cnt.clone(); - spawn.spawn_local_obj(Box::pin(lazy(move |_| { - cnt.set(cnt.get() + 1); - })).into()).unwrap(); + spawn + .spawn_local_obj( + Box::pin(lazy(move |_| { + cnt.set(cnt.get() + 1); + })) + .into(), + ) + .unwrap(); } pool.run(); @@ -126,9 +145,14 @@ fn try_run_one_executes_one_ready() { spawn.spawn_local_obj(Box::pin(pending()).into()).unwrap(); let cnt = cnt.clone(); - spawn.spawn_local_obj(Box::pin(lazy(move |_| { - cnt.set(cnt.get() + 1); - })).into()).unwrap(); + spawn + .spawn_local_obj( + Box::pin(lazy(move |_| { + cnt.set(cnt.get() + 1); + })) + .into(), + ) + .unwrap(); spawn.spawn_local_obj(Box::pin(pending()).into()).unwrap(); } @@ -154,15 +178,20 @@ fn try_run_one_returns_on_no_progress() { { let cnt = cnt.clone(); let waker = waker.clone(); - spawn.spawn_local_obj(Box::pin(poll_fn(move |ctx| { - cnt.set(cnt.get() + 1); - waker.set(Some(ctx.waker().clone())); - if cnt.get() == ITER { - Poll::Ready(()) - } else { - Poll::Pending - } - })).into()).unwrap(); + spawn + .spawn_local_obj( + Box::pin(poll_fn(move |ctx| { + cnt.set(cnt.get() + 1); + waker.set(Some(ctx.waker().clone())); + if cnt.get() == ITER { + Poll::Ready(()) + } else { + Poll::Pending + } + })) + .into(), + ) + .unwrap(); } for i in 0..ITER - 1 { @@ -185,16 +214,21 @@ fn try_run_one_runs_sub_futures() { let inner_spawner = spawn.clone(); let cnt1 = cnt.clone(); - spawn.spawn_local_obj(Box::pin(poll_fn(move |_| { - cnt1.set(cnt1.get() + 1); - - let cnt2 = cnt1.clone(); - inner_spawner.spawn_local_obj(Box::pin(lazy(move |_|{ - cnt2.set(cnt2.get() + 1) - })).into()).unwrap(); + spawn + .spawn_local_obj( + Box::pin(poll_fn(move |_| { + cnt1.set(cnt1.get() + 1); - Poll::Pending - })).into()).unwrap(); + let cnt2 = cnt1.clone(); + inner_spawner + .spawn_local_obj(Box::pin(lazy(move |_| cnt2.set(cnt2.get() + 1))).into()) + .unwrap(); + + Poll::Pending + })) + .into(), + ) + .unwrap(); pool.try_run_one(); assert_eq!(cnt.get(), 2); @@ -214,12 +248,12 @@ fn run_until_stalled_returns_multiple_times() { let cnt = Rc::new(Cell::new(0)); let cnt1 = cnt.clone(); - spawn.spawn_local_obj(Box::pin(lazy(move |_|{ cnt1.set(cnt1.get() + 1) })).into()).unwrap(); + spawn.spawn_local_obj(Box::pin(lazy(move |_| cnt1.set(cnt1.get() + 1))).into()).unwrap(); pool.run_until_stalled(); assert_eq!(cnt.get(), 1); let cnt2 = cnt.clone(); - spawn.spawn_local_obj(Box::pin(lazy(move |_|{ cnt2.set(cnt2.get() + 1) })).into()).unwrap(); + spawn.spawn_local_obj(Box::pin(lazy(move |_| cnt2.set(cnt2.get() + 1))).into()).unwrap(); pool.run_until_stalled(); assert_eq!(cnt.get(), 2); } @@ -232,16 +266,21 @@ fn run_until_stalled_runs_spawned_sub_futures() { let inner_spawner = spawn.clone(); let cnt1 = cnt.clone(); - spawn.spawn_local_obj(Box::pin(poll_fn(move |_| { - cnt1.set(cnt1.get() + 1); - - let cnt2 = cnt1.clone(); - inner_spawner.spawn_local_obj(Box::pin(lazy(move |_|{ - cnt2.set(cnt2.get() + 1) - })).into()).unwrap(); + spawn + .spawn_local_obj( + Box::pin(poll_fn(move |_| { + cnt1.set(cnt1.get() + 1); - Poll::Pending - })).into()).unwrap(); + let cnt2 = cnt1.clone(); + inner_spawner + .spawn_local_obj(Box::pin(lazy(move |_| cnt2.set(cnt2.get() + 1))).into()) + .unwrap(); + + Poll::Pending + })) + .into(), + ) + .unwrap(); pool.run_until_stalled(); assert_eq!(cnt.get(), 2); @@ -262,9 +301,14 @@ fn run_until_stalled_executes_all_ready() { spawn.spawn_local_obj(Box::pin(pending()).into()).unwrap(); let cnt = cnt.clone(); - spawn.spawn_local_obj(Box::pin(lazy(move |_| { - cnt.set(cnt.get() + 1); - })).into()).unwrap(); + spawn + .spawn_local_obj( + Box::pin(lazy(move |_| { + cnt.set(cnt.get() + 1); + })) + .into(), + ) + .unwrap(); // also add some pending tasks to test if they are ignored spawn.spawn_local_obj(Box::pin(pending()).into()).unwrap(); @@ -281,10 +325,15 @@ fn nesting_run() { let mut pool = LocalPool::new(); let spawn = pool.spawner(); - spawn.spawn_obj(Box::pin(lazy(|_| { - let mut pool = LocalPool::new(); - pool.run(); - })).into()).unwrap(); + spawn + .spawn_obj( + Box::pin(lazy(|_| { + let mut pool = LocalPool::new(); + pool.run(); + })) + .into(), + ) + .unwrap(); pool.run(); } @@ -295,10 +344,15 @@ fn nesting_run_run_until_stalled() { let mut pool = LocalPool::new(); let spawn = pool.spawner(); - spawn.spawn_obj(Box::pin(lazy(|_| { - let mut pool = LocalPool::new(); - pool.run_until_stalled(); - })).into()).unwrap(); + spawn + .spawn_obj( + Box::pin(lazy(|_| { + let mut pool = LocalPool::new(); + pool.run_until_stalled(); + })) + .into(), + ) + .unwrap(); pool.run(); } @@ -342,32 +396,26 @@ fn tasks_are_scheduled_fairly() { let mut pool = LocalPool::new(); let spawn = pool.spawner(); - spawn.spawn_local_obj(Box::pin(Spin { - state: state.clone(), - idx: 0, - }).into()).unwrap(); + spawn.spawn_local_obj(Box::pin(Spin { state: state.clone(), idx: 0 }).into()).unwrap(); - spawn.spawn_local_obj(Box::pin(Spin { - state, - idx: 1, - }).into()).unwrap(); + spawn.spawn_local_obj(Box::pin(Spin { state, idx: 1 }).into()).unwrap(); pool.run(); } // Tests that the use of park/unpark in user-code has no -// effect on the expected behaviour of the executor. +// effect on the expected behavior of the executor. #[test] fn park_unpark_independence() { let mut done = false; let future = future::poll_fn(move |cx| { if done { - return Poll::Ready(()) + return Poll::Ready(()); } done = true; cx.waker().clone().wake(); // (*) - // some user-code that temporarily parks the thread + // some user-code that temporarily parks the thread let test = thread::current(); let latch = Arc::new(AtomicBool::new(false)); let signal = latch.clone(); @@ -384,4 +432,3 @@ fn park_unpark_independence() { futures::executor::block_on(future) } - |