aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndroid Build Coastguard Worker <android-build-coastguard-worker@google.com>2022-05-09 20:35:44 +0000
committerAndroid Build Coastguard Worker <android-build-coastguard-worker@google.com>2022-05-09 20:35:44 +0000
commit2296871c97fbaac20194d8da22f75bc58947559f (patch)
treeb43e15fe04d1c6ea9e0efa054185733a5959b366
parentbdb57517f1beb9f2752bd692125a336c071a1553 (diff)
parentc8aec5c18160241e9b5bc79bbc3580e13df5ab23 (diff)
downloadfutures-executor-android13-mainline-media-release.tar.gz
Change-Id: I138b1554f950166171c3979aa8f257fc30bf3e39
-rw-r--r--.cargo_vcs_info.json7
-rw-r--r--Android.bp13
-rw-r--r--Cargo.toml42
-rw-r--r--Cargo.toml.orig11
-rw-r--r--METADATA8
-rw-r--r--README.md23
-rw-r--r--TEST_MAPPING39
-rw-r--r--benches/thread_notify.rs5
-rw-r--r--cargo2android.json5
-rw-r--r--src/lib.rs27
-rw-r--r--src/local_pool.rs14
-rw-r--r--src/thread_pool.rs53
-rw-r--r--src/unpark_mutex.rs27
-rw-r--r--tests/local_pool.rs189
14 files changed, 283 insertions, 180 deletions
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json
index f3ad3ab..77486f4 100644
--- a/.cargo_vcs_info.json
+++ b/.cargo_vcs_info.json
@@ -1,5 +1,6 @@
{
"git": {
- "sha1": "c91f8691672c7401b1923ab00bf138975c99391a"
- }
-}
+ "sha1": "fc1e3250219170e31cddb8857a276cba7dd08d44"
+ },
+ "path_in_vcs": "futures-executor"
+} \ No newline at end of file
diff --git a/Android.bp b/Android.bp
index d707ff3..ebd9f28 100644
--- a/Android.bp
+++ b/Android.bp
@@ -43,6 +43,8 @@ rust_library {
name: "libfutures_executor",
host_supported: true,
crate_name: "futures_executor",
+ cargo_env_compat: true,
+ cargo_pkg_version: "0.3.21",
srcs: ["src/lib.rs"],
edition: "2018",
features: [
@@ -58,18 +60,9 @@ rust_library {
],
apex_available: [
"//apex_available:platform",
+ "com.android.bluetooth",
"com.android.resolv",
"com.android.virt",
],
min_sdk_version: "29",
}
-
-// dependent_library ["feature_list"]
-// futures-core-0.3.14 "alloc,std"
-// futures-task-0.3.14 "alloc,std"
-// futures-util-0.3.14 "alloc,slab,std"
-// libc-0.2.94 "default,std"
-// num_cpus-1.13.0
-// pin-project-lite-0.2.6
-// pin-utils-0.1.0
-// slab-0.4.3 "default,std"
diff --git a/Cargo.toml b/Cargo.toml
index 62cb740..4b79bba 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -3,36 +3,41 @@
# When uploading crates to the registry Cargo will automatically
# "normalize" Cargo.toml files for maximal compatibility
# with all versions of Cargo and also rewrite `path` dependencies
-# to registry (e.g., crates.io) dependencies
+# to registry (e.g., crates.io) dependencies.
#
-# If you believe there's an error in this file please file an
-# issue against the rust-lang/cargo repository. If you're
-# editing this file be aware that the upstream Cargo.toml
-# will likely look very different (and much more reasonable)
+# If you are reading this file be aware that the original Cargo.toml
+# will likely look very different (and much more reasonable).
+# See Cargo.toml.orig for the original contents.
[package]
edition = "2018"
+rust-version = "1.45"
name = "futures-executor"
-version = "0.3.13"
-authors = ["Alex Crichton <alex@alexcrichton.com>"]
-description = "Executors for asynchronous tasks based on the futures-rs library.\n"
+version = "0.3.21"
+description = """
+Executors for asynchronous tasks based on the futures-rs library.
+"""
homepage = "https://rust-lang.github.io/futures-rs"
-documentation = "https://docs.rs/futures-executor/0.3"
license = "MIT OR Apache-2.0"
repository = "https://github.com/rust-lang/futures-rs"
+
[package.metadata.docs.rs]
all-features = true
-rustdoc-args = ["--cfg", "docsrs"]
+rustdoc-args = [
+ "--cfg",
+ "docsrs",
+]
+
[dependencies.futures-core]
-version = "0.3.13"
+version = "0.3.21"
default-features = false
[dependencies.futures-task]
-version = "0.3.13"
+version = "0.3.21"
default-features = false
[dependencies.futures-util]
-version = "0.3.13"
+version = "0.3.21"
default-features = false
[dependencies.num_cpus]
@@ -43,5 +48,12 @@ optional = true
[features]
default = ["std"]
-std = ["futures-core/std", "futures-task/std", "futures-util/std"]
-thread-pool = ["std", "num_cpus"]
+std = [
+ "futures-core/std",
+ "futures-task/std",
+ "futures-util/std",
+]
+thread-pool = [
+ "std",
+ "num_cpus",
+]
diff --git a/Cargo.toml.orig b/Cargo.toml.orig
index bc1853b..dae5f22 100644
--- a/Cargo.toml.orig
+++ b/Cargo.toml.orig
@@ -1,12 +1,11 @@
[package]
name = "futures-executor"
+version = "0.3.21"
edition = "2018"
-version = "0.3.13"
-authors = ["Alex Crichton <alex@alexcrichton.com>"]
+rust-version = "1.45"
license = "MIT OR Apache-2.0"
repository = "https://github.com/rust-lang/futures-rs"
homepage = "https://rust-lang.github.io/futures-rs"
-documentation = "https://docs.rs/futures-executor/0.3"
description = """
Executors for asynchronous tasks based on the futures-rs library.
"""
@@ -17,9 +16,9 @@ std = ["futures-core/std", "futures-task/std", "futures-util/std"]
thread-pool = ["std", "num_cpus"]
[dependencies]
-futures-core = { path = "../futures-core", version = "0.3.13", default-features = false }
-futures-task = { path = "../futures-task", version = "0.3.13", default-features = false }
-futures-util = { path = "../futures-util", version = "0.3.13", default-features = false }
+futures-core = { path = "../futures-core", version = "0.3.21", default-features = false }
+futures-task = { path = "../futures-task", version = "0.3.21", default-features = false }
+futures-util = { path = "../futures-util", version = "0.3.21", default-features = false }
num_cpus = { version = "1.8.0", optional = true }
[dev-dependencies]
diff --git a/METADATA b/METADATA
index 75a29b9..26071bc 100644
--- a/METADATA
+++ b/METADATA
@@ -7,13 +7,13 @@ third_party {
}
url {
type: ARCHIVE
- value: "https://static.crates.io/crates/futures-executor/futures-executor-0.3.13.crate"
+ value: "https://static.crates.io/crates/futures-executor/futures-executor-0.3.21.crate"
}
- version: "0.3.13"
+ version: "0.3.21"
license_type: NOTICE
last_upgrade_date {
- year: 2021
- month: 4
+ year: 2022
+ month: 3
day: 1
}
}
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..6708685
--- /dev/null
+++ b/README.md
@@ -0,0 +1,23 @@
+# futures-executor
+
+Executors for asynchronous tasks based on the futures-rs library.
+
+## Usage
+
+Add this to your `Cargo.toml`:
+
+```toml
+[dependencies]
+futures-executor = "0.3"
+```
+
+The current `futures-executor` requires Rust 1.45 or later.
+
+## License
+
+Licensed under either of [Apache License, Version 2.0](LICENSE-APACHE) or
+[MIT license](LICENSE-MIT) at your option.
+
+Unless you explicitly state otherwise, any contribution intentionally submitted
+for inclusion in the work by you, as defined in the Apache-2.0 license, shall
+be dual licensed as above, without any additional terms or conditions.
diff --git a/TEST_MAPPING b/TEST_MAPPING
new file mode 100644
index 0000000..e2df61d
--- /dev/null
+++ b/TEST_MAPPING
@@ -0,0 +1,39 @@
+// Generated by update_crate_tests.py for tests that depend on this crate.
+{
+ "imports": [
+ {
+ "path": "external/rust/crates/anyhow"
+ },
+ {
+ "path": "external/rust/crates/tokio"
+ }
+ ],
+ "presubmit": [
+ {
+ "name": "ZipFuseTest"
+ },
+ {
+ "name": "authfs_device_test_src_lib"
+ },
+ {
+ "name": "doh_unit_test"
+ },
+ {
+ "name": "virtualizationservice_device_test"
+ }
+ ],
+ "presubmit-rust": [
+ {
+ "name": "ZipFuseTest"
+ },
+ {
+ "name": "authfs_device_test_src_lib"
+ },
+ {
+ "name": "doh_unit_test"
+ },
+ {
+ "name": "virtualizationservice_device_test"
+ }
+ ]
+}
diff --git a/benches/thread_notify.rs b/benches/thread_notify.rs
index d8fbec4..88d0447 100644
--- a/benches/thread_notify.rs
+++ b/benches/thread_notify.rs
@@ -102,10 +102,7 @@ fn thread_yield_multi_thread(b: &mut Bencher) {
});
b.iter(move || {
- let y = Yield {
- rem: NUM,
- tx: tx.clone(),
- };
+ let y = Yield { rem: NUM, tx: tx.clone() };
block_on(y);
});
diff --git a/cargo2android.json b/cargo2android.json
index 05ca777..da40e17 100644
--- a/cargo2android.json
+++ b/cargo2android.json
@@ -1,12 +1,13 @@
{
"apex-available": [
"//apex_available:platform",
+ "com.android.bluetooth",
"com.android.resolv",
"com.android.virt"
],
- "min_sdk_version": "29",
"dependencies": true,
"device": true,
"features": "thread-pool",
+ "min-sdk-version": "29",
"run": true
-} \ No newline at end of file
+}
diff --git a/src/lib.rs b/src/lib.rs
index b679649..b1af875 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -37,13 +37,20 @@
//! [`spawn_local_obj`]: https://docs.rs/futures/0.3/futures/task/trait.LocalSpawn.html#tymethod.spawn_local_obj
#![cfg_attr(not(feature = "std"), no_std)]
-
-#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms, unreachable_pub)]
-// It cannot be included in the published code because this lints have false positives in the minimum required version.
-#![cfg_attr(test, warn(single_use_lifetimes))]
-#![warn(clippy::all)]
-#![doc(test(attr(deny(warnings), allow(dead_code, unused_assignments, unused_variables))))]
-
+#![warn(
+ missing_debug_implementations,
+ missing_docs,
+ rust_2018_idioms,
+ single_use_lifetimes,
+ unreachable_pub
+)]
+#![doc(test(
+ no_crate_inject,
+ attr(
+ deny(warnings, rust_2018_idioms, single_use_lifetimes),
+ allow(dead_code, unused_assignments, unused_variables)
+ )
+))]
#![cfg_attr(docsrs, feature(doc_cfg))]
#[cfg(feature = "std")]
@@ -52,13 +59,13 @@ mod local_pool;
pub use crate::local_pool::{block_on, block_on_stream, BlockingStream, LocalPool, LocalSpawner};
#[cfg(feature = "thread-pool")]
-#[cfg(feature = "std")]
-mod unpark_mutex;
-#[cfg(feature = "thread-pool")]
#[cfg_attr(docsrs, doc(cfg(feature = "thread-pool")))]
#[cfg(feature = "std")]
mod thread_pool;
#[cfg(feature = "thread-pool")]
+#[cfg(feature = "std")]
+mod unpark_mutex;
+#[cfg(feature = "thread-pool")]
#[cfg_attr(docsrs, doc(cfg(feature = "thread-pool")))]
#[cfg(feature = "std")]
pub use crate::thread_pool::{ThreadPool, ThreadPoolBuilder};
diff --git a/src/local_pool.rs b/src/local_pool.rs
index 156d5cc..bee96d8 100644
--- a/src/local_pool.rs
+++ b/src/local_pool.rs
@@ -10,7 +10,10 @@ use futures_util::stream::StreamExt;
use std::cell::RefCell;
use std::ops::{Deref, DerefMut};
use std::rc::{Rc, Weak};
-use std::sync::{Arc, atomic::{AtomicBool, Ordering}};
+use std::sync::{
+ atomic::{AtomicBool, Ordering},
+ Arc,
+};
use std::thread::{self, Thread};
/// A single-threaded task pool for polling futures to completion.
@@ -119,17 +122,12 @@ fn poll_executor<T, F: FnMut(&mut Context<'_>) -> T>(mut f: F) -> T {
impl LocalPool {
/// Create a new, empty pool of tasks.
pub fn new() -> Self {
- Self {
- pool: FuturesUnordered::new(),
- incoming: Default::default(),
- }
+ Self { pool: FuturesUnordered::new(), incoming: Default::default() }
}
/// Get a clonable handle to the pool as a [`Spawn`].
pub fn spawner(&self) -> LocalSpawner {
- LocalSpawner {
- incoming: Rc::downgrade(&self.incoming),
- }
+ LocalSpawner { incoming: Rc::downgrade(&self.incoming) }
}
/// Run all tasks in the pool to completion.
diff --git a/src/thread_pool.rs b/src/thread_pool.rs
index 741e6d9..5e1f586 100644
--- a/src/thread_pool.rs
+++ b/src/thread_pool.rs
@@ -2,8 +2,8 @@ use crate::enter;
use crate::unpark_mutex::UnparkMutex;
use futures_core::future::Future;
use futures_core::task::{Context, Poll};
+use futures_task::{waker_ref, ArcWake};
use futures_task::{FutureObj, Spawn, SpawnError};
-use futures_task::{ArcWake, waker_ref};
use futures_util::future::FutureExt;
use std::cmp;
use std::fmt;
@@ -54,9 +54,7 @@ struct PoolState {
impl fmt::Debug for ThreadPool {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- f.debug_struct("ThreadPool")
- .field("size", &self.state.size)
- .finish()
+ f.debug_struct("ThreadPool").field("size", &self.state.size).finish()
}
}
@@ -100,10 +98,7 @@ impl ThreadPool {
pub fn spawn_obj_ok(&self, future: FutureObj<'static, ()>) {
let task = Task {
future,
- wake_handle: Arc::new(WakeHandle {
- exec: self.clone(),
- mutex: UnparkMutex::new(),
- }),
+ wake_handle: Arc::new(WakeHandle { exec: self.clone(), mutex: UnparkMutex::new() }),
exec: self.clone(),
};
self.state.send(Message::Run(task));
@@ -132,10 +127,7 @@ impl ThreadPool {
}
impl Spawn for ThreadPool {
- fn spawn_obj(
- &self,
- future: FutureObj<'static, ()>,
- ) -> Result<(), SpawnError> {
+ fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> {
self.spawn_obj_ok(future);
Ok(())
}
@@ -146,10 +138,12 @@ impl PoolState {
self.tx.lock().unwrap().send(msg).unwrap();
}
- fn work(&self,
- idx: usize,
- after_start: Option<Arc<dyn Fn(usize) + Send + Sync>>,
- before_stop: Option<Arc<dyn Fn(usize) + Send + Sync>>) {
+ fn work(
+ &self,
+ idx: usize,
+ after_start: Option<Arc<dyn Fn(usize) + Send + Sync>>,
+ before_stop: Option<Arc<dyn Fn(usize) + Send + Sync>>,
+ ) {
let _scope = enter().unwrap();
if let Some(after_start) = after_start {
after_start(idx);
@@ -241,7 +235,8 @@ impl ThreadPoolBuilder {
/// The closure provided will receive an index corresponding to the worker
/// thread it's running on.
pub fn after_start<F>(&mut self, f: F) -> &mut Self
- where F: Fn(usize) + Send + Sync + 'static
+ where
+ F: Fn(usize) + Send + Sync + 'static,
{
self.after_start = Some(Arc::new(f));
self
@@ -250,13 +245,14 @@ impl ThreadPoolBuilder {
/// Execute closure `f` just prior to shutting down each worker thread.
///
/// This hook is intended for bookkeeping and monitoring.
- /// The closure `f` will be dropped after the `builder` is droppped
+ /// The closure `f` will be dropped after the `builder` is dropped
/// and all threads in the pool have executed it.
///
/// The closure provided will receive an index corresponding to the worker
/// thread it's running on.
pub fn before_stop<F>(&mut self, f: F) -> &mut Self
- where F: Fn(usize) + Send + Sync + 'static
+ where
+ F: Fn(usize) + Send + Sync + 'static,
{
self.before_stop = Some(Arc::new(f));
self
@@ -328,14 +324,11 @@ impl Task {
Poll::Pending => {}
Poll::Ready(()) => return wake_handle.mutex.complete(),
}
- let task = Self {
- future,
- wake_handle: wake_handle.clone(),
- exec,
- };
+ let task = Self { future, wake_handle: wake_handle.clone(), exec };
match wake_handle.mutex.wait(task) {
Ok(()) => return, // we've waited
- Err(task) => { // someone's notified us
+ Err(task) => {
+ // someone's notified us
future = task.future;
exec = task.exec;
}
@@ -347,9 +340,7 @@ impl Task {
impl fmt::Debug for Task {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- f.debug_struct("Task")
- .field("contents", &"...")
- .finish()
+ f.debug_struct("Task").field("contents", &"...").finish()
}
}
@@ -372,9 +363,11 @@ mod tests {
let (tx, rx) = mpsc::sync_channel(2);
let _cpu_pool = ThreadPoolBuilder::new()
.pool_size(2)
- .after_start(move |_| tx.send(1).unwrap()).create().unwrap();
+ .after_start(move |_| tx.send(1).unwrap())
+ .create()
+ .unwrap();
- // After ThreadPoolBuilder is deconstructed, the tx should be droped
+ // After ThreadPoolBuilder is deconstructed, the tx should be dropped
// so that we can use rx as an iterator.
let count = rx.into_iter().count();
assert_eq!(count, 2);
diff --git a/src/unpark_mutex.rs b/src/unpark_mutex.rs
index c49c64c..ac5112c 100644
--- a/src/unpark_mutex.rs
+++ b/src/unpark_mutex.rs
@@ -29,25 +29,22 @@ unsafe impl<D: Send> Sync for UnparkMutex<D> {}
// transitions:
// The task is blocked, waiting on an event
-const WAITING: usize = 0; // --> POLLING
+const WAITING: usize = 0; // --> POLLING
// The task is actively being polled by a thread; arrival of additional events
// of interest should move it to the REPOLL state
-const POLLING: usize = 1; // --> WAITING, REPOLL, or COMPLETE
+const POLLING: usize = 1; // --> WAITING, REPOLL, or COMPLETE
// The task is actively being polled, but will need to be re-polled upon
// completion to ensure that all events were observed.
-const REPOLL: usize = 2; // --> POLLING
+const REPOLL: usize = 2; // --> POLLING
// The task has finished executing (either successfully or with an error/panic)
-const COMPLETE: usize = 3; // No transitions out
+const COMPLETE: usize = 3; // No transitions out
impl<D> UnparkMutex<D> {
pub(crate) fn new() -> Self {
- Self {
- status: AtomicUsize::new(WAITING),
- inner: UnsafeCell::new(None),
- }
+ Self { status: AtomicUsize::new(WAITING), inner: UnsafeCell::new(None) }
}
/// Attempt to "notify" the mutex that a poll should occur.
@@ -62,8 +59,7 @@ impl<D> UnparkMutex<D> {
match status {
// The task is idle, so try to run it immediately.
WAITING => {
- match self.status.compare_exchange(WAITING, POLLING,
- SeqCst, SeqCst) {
+ match self.status.compare_exchange(WAITING, POLLING, SeqCst, SeqCst) {
Ok(_) => {
let data = unsafe {
// SAFETY: we've ensured mutual exclusion via
@@ -82,13 +78,10 @@ impl<D> UnparkMutex<D> {
// The task is being polled, so we need to record that it should
// be *repolled* when complete.
- POLLING => {
- match self.status.compare_exchange(POLLING, REPOLL,
- SeqCst, SeqCst) {
- Ok(_) => return Err(()),
- Err(cur) => status = cur,
- }
- }
+ POLLING => match self.status.compare_exchange(POLLING, REPOLL, SeqCst, SeqCst) {
+ Ok(_) => return Err(()),
+ Err(cur) => status = cur,
+ },
// The task is already scheduled for polling, or is complete, so
// we've got nothing to do.
diff --git a/tests/local_pool.rs b/tests/local_pool.rs
index b31f103..9b1316b 100644
--- a/tests/local_pool.rs
+++ b/tests/local_pool.rs
@@ -1,14 +1,14 @@
use futures::channel::oneshot;
use futures::executor::LocalPool;
-use futures::future::{self, Future, lazy, poll_fn};
-use futures::task::{Context, Poll, Spawn, LocalSpawn, Waker};
+use futures::future::{self, lazy, poll_fn, Future};
+use futures::task::{Context, LocalSpawn, Poll, Spawn, Waker};
use std::cell::{Cell, RefCell};
use std::pin::Pin;
use std::rc::Rc;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::Arc;
use std::thread;
use std::time::Duration;
-use std::sync::Arc;
-use std::sync::atomic::{Ordering, AtomicBool};
struct Pending(Rc<()>);
@@ -52,9 +52,14 @@ fn run_until_executes_spawned() {
let (tx, rx) = oneshot::channel();
let mut pool = LocalPool::new();
let spawn = pool.spawner();
- spawn.spawn_local_obj(Box::pin(lazy(move |_| {
- tx.send(()).unwrap();
- })).into()).unwrap();
+ spawn
+ .spawn_local_obj(
+ Box::pin(lazy(move |_| {
+ tx.send(()).unwrap();
+ }))
+ .into(),
+ )
+ .unwrap();
pool.run_until(rx).unwrap();
}
@@ -74,18 +79,27 @@ fn run_executes_spawned() {
let spawn = pool.spawner();
let spawn2 = pool.spawner();
- spawn.spawn_local_obj(Box::pin(lazy(move |_| {
- spawn2.spawn_local_obj(Box::pin(lazy(move |_| {
- cnt2.set(cnt2.get() + 1);
- })).into()).unwrap();
- })).into()).unwrap();
+ spawn
+ .spawn_local_obj(
+ Box::pin(lazy(move |_| {
+ spawn2
+ .spawn_local_obj(
+ Box::pin(lazy(move |_| {
+ cnt2.set(cnt2.get() + 1);
+ }))
+ .into(),
+ )
+ .unwrap();
+ }))
+ .into(),
+ )
+ .unwrap();
pool.run();
assert_eq!(cnt.get(), 1);
}
-
#[test]
fn run_spawn_many() {
const ITER: usize = 200;
@@ -97,9 +111,14 @@ fn run_spawn_many() {
for _ in 0..ITER {
let cnt = cnt.clone();
- spawn.spawn_local_obj(Box::pin(lazy(move |_| {
- cnt.set(cnt.get() + 1);
- })).into()).unwrap();
+ spawn
+ .spawn_local_obj(
+ Box::pin(lazy(move |_| {
+ cnt.set(cnt.get() + 1);
+ }))
+ .into(),
+ )
+ .unwrap();
}
pool.run();
@@ -126,9 +145,14 @@ fn try_run_one_executes_one_ready() {
spawn.spawn_local_obj(Box::pin(pending()).into()).unwrap();
let cnt = cnt.clone();
- spawn.spawn_local_obj(Box::pin(lazy(move |_| {
- cnt.set(cnt.get() + 1);
- })).into()).unwrap();
+ spawn
+ .spawn_local_obj(
+ Box::pin(lazy(move |_| {
+ cnt.set(cnt.get() + 1);
+ }))
+ .into(),
+ )
+ .unwrap();
spawn.spawn_local_obj(Box::pin(pending()).into()).unwrap();
}
@@ -154,15 +178,20 @@ fn try_run_one_returns_on_no_progress() {
{
let cnt = cnt.clone();
let waker = waker.clone();
- spawn.spawn_local_obj(Box::pin(poll_fn(move |ctx| {
- cnt.set(cnt.get() + 1);
- waker.set(Some(ctx.waker().clone()));
- if cnt.get() == ITER {
- Poll::Ready(())
- } else {
- Poll::Pending
- }
- })).into()).unwrap();
+ spawn
+ .spawn_local_obj(
+ Box::pin(poll_fn(move |ctx| {
+ cnt.set(cnt.get() + 1);
+ waker.set(Some(ctx.waker().clone()));
+ if cnt.get() == ITER {
+ Poll::Ready(())
+ } else {
+ Poll::Pending
+ }
+ }))
+ .into(),
+ )
+ .unwrap();
}
for i in 0..ITER - 1 {
@@ -185,16 +214,21 @@ fn try_run_one_runs_sub_futures() {
let inner_spawner = spawn.clone();
let cnt1 = cnt.clone();
- spawn.spawn_local_obj(Box::pin(poll_fn(move |_| {
- cnt1.set(cnt1.get() + 1);
-
- let cnt2 = cnt1.clone();
- inner_spawner.spawn_local_obj(Box::pin(lazy(move |_|{
- cnt2.set(cnt2.get() + 1)
- })).into()).unwrap();
+ spawn
+ .spawn_local_obj(
+ Box::pin(poll_fn(move |_| {
+ cnt1.set(cnt1.get() + 1);
- Poll::Pending
- })).into()).unwrap();
+ let cnt2 = cnt1.clone();
+ inner_spawner
+ .spawn_local_obj(Box::pin(lazy(move |_| cnt2.set(cnt2.get() + 1))).into())
+ .unwrap();
+
+ Poll::Pending
+ }))
+ .into(),
+ )
+ .unwrap();
pool.try_run_one();
assert_eq!(cnt.get(), 2);
@@ -214,12 +248,12 @@ fn run_until_stalled_returns_multiple_times() {
let cnt = Rc::new(Cell::new(0));
let cnt1 = cnt.clone();
- spawn.spawn_local_obj(Box::pin(lazy(move |_|{ cnt1.set(cnt1.get() + 1) })).into()).unwrap();
+ spawn.spawn_local_obj(Box::pin(lazy(move |_| cnt1.set(cnt1.get() + 1))).into()).unwrap();
pool.run_until_stalled();
assert_eq!(cnt.get(), 1);
let cnt2 = cnt.clone();
- spawn.spawn_local_obj(Box::pin(lazy(move |_|{ cnt2.set(cnt2.get() + 1) })).into()).unwrap();
+ spawn.spawn_local_obj(Box::pin(lazy(move |_| cnt2.set(cnt2.get() + 1))).into()).unwrap();
pool.run_until_stalled();
assert_eq!(cnt.get(), 2);
}
@@ -232,16 +266,21 @@ fn run_until_stalled_runs_spawned_sub_futures() {
let inner_spawner = spawn.clone();
let cnt1 = cnt.clone();
- spawn.spawn_local_obj(Box::pin(poll_fn(move |_| {
- cnt1.set(cnt1.get() + 1);
-
- let cnt2 = cnt1.clone();
- inner_spawner.spawn_local_obj(Box::pin(lazy(move |_|{
- cnt2.set(cnt2.get() + 1)
- })).into()).unwrap();
+ spawn
+ .spawn_local_obj(
+ Box::pin(poll_fn(move |_| {
+ cnt1.set(cnt1.get() + 1);
- Poll::Pending
- })).into()).unwrap();
+ let cnt2 = cnt1.clone();
+ inner_spawner
+ .spawn_local_obj(Box::pin(lazy(move |_| cnt2.set(cnt2.get() + 1))).into())
+ .unwrap();
+
+ Poll::Pending
+ }))
+ .into(),
+ )
+ .unwrap();
pool.run_until_stalled();
assert_eq!(cnt.get(), 2);
@@ -262,9 +301,14 @@ fn run_until_stalled_executes_all_ready() {
spawn.spawn_local_obj(Box::pin(pending()).into()).unwrap();
let cnt = cnt.clone();
- spawn.spawn_local_obj(Box::pin(lazy(move |_| {
- cnt.set(cnt.get() + 1);
- })).into()).unwrap();
+ spawn
+ .spawn_local_obj(
+ Box::pin(lazy(move |_| {
+ cnt.set(cnt.get() + 1);
+ }))
+ .into(),
+ )
+ .unwrap();
// also add some pending tasks to test if they are ignored
spawn.spawn_local_obj(Box::pin(pending()).into()).unwrap();
@@ -281,10 +325,15 @@ fn nesting_run() {
let mut pool = LocalPool::new();
let spawn = pool.spawner();
- spawn.spawn_obj(Box::pin(lazy(|_| {
- let mut pool = LocalPool::new();
- pool.run();
- })).into()).unwrap();
+ spawn
+ .spawn_obj(
+ Box::pin(lazy(|_| {
+ let mut pool = LocalPool::new();
+ pool.run();
+ }))
+ .into(),
+ )
+ .unwrap();
pool.run();
}
@@ -295,10 +344,15 @@ fn nesting_run_run_until_stalled() {
let mut pool = LocalPool::new();
let spawn = pool.spawner();
- spawn.spawn_obj(Box::pin(lazy(|_| {
- let mut pool = LocalPool::new();
- pool.run_until_stalled();
- })).into()).unwrap();
+ spawn
+ .spawn_obj(
+ Box::pin(lazy(|_| {
+ let mut pool = LocalPool::new();
+ pool.run_until_stalled();
+ }))
+ .into(),
+ )
+ .unwrap();
pool.run();
}
@@ -342,32 +396,26 @@ fn tasks_are_scheduled_fairly() {
let mut pool = LocalPool::new();
let spawn = pool.spawner();
- spawn.spawn_local_obj(Box::pin(Spin {
- state: state.clone(),
- idx: 0,
- }).into()).unwrap();
+ spawn.spawn_local_obj(Box::pin(Spin { state: state.clone(), idx: 0 }).into()).unwrap();
- spawn.spawn_local_obj(Box::pin(Spin {
- state,
- idx: 1,
- }).into()).unwrap();
+ spawn.spawn_local_obj(Box::pin(Spin { state, idx: 1 }).into()).unwrap();
pool.run();
}
// Tests that the use of park/unpark in user-code has no
-// effect on the expected behaviour of the executor.
+// effect on the expected behavior of the executor.
#[test]
fn park_unpark_independence() {
let mut done = false;
let future = future::poll_fn(move |cx| {
if done {
- return Poll::Ready(())
+ return Poll::Ready(());
}
done = true;
cx.waker().clone().wake(); // (*)
- // some user-code that temporarily parks the thread
+ // some user-code that temporarily parks the thread
let test = thread::current();
let latch = Arc::new(AtomicBool::new(false));
let signal = latch.clone();
@@ -384,4 +432,3 @@ fn park_unpark_independence() {
futures::executor::block_on(future)
}
-