diff options
author | Jeffrey Vander Stoep <jeffv@google.com> | 2022-12-12 16:05:43 +0000 |
---|---|---|
committer | Gerrit Code Review <noreply-gerritcodereview@google.com> | 2022-12-12 16:05:43 +0000 |
commit | 2075f7506e8de73dade86f918711e3ab63259048 (patch) | |
tree | 8e8d0677dafdf7c767f07d71f535f2ae34ef57ab | |
parent | baf3ed4f340906e2c09cef631010cd04d6a21ce2 (diff) | |
parent | 138e0fd15b5dab0f91557097e670a0d4335eeb86 (diff) | |
download | futures-executor-2075f7506e8de73dade86f918711e3ab63259048.tar.gz |
Merge "Upgrade futures-executor to 0.3.25"main-16k-with-phones
-rw-r--r-- | .cargo_vcs_info.json | 2 | ||||
-rw-r--r-- | Android.bp | 6 | ||||
-rw-r--r-- | Cargo.toml | 9 | ||||
-rw-r--r-- | Cargo.toml.orig | 10 | ||||
-rw-r--r-- | METADATA | 12 | ||||
-rw-r--r-- | src/enter.rs | 2 | ||||
-rw-r--r-- | src/local_pool.rs | 106 | ||||
-rw-r--r-- | src/thread_pool.rs | 33 | ||||
-rw-r--r-- | tests/local_pool.rs | 66 |
9 files changed, 160 insertions, 86 deletions
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json index 77486f4..399f3c6 100644 --- a/.cargo_vcs_info.json +++ b/.cargo_vcs_info.json @@ -1,6 +1,6 @@ { "git": { - "sha1": "fc1e3250219170e31cddb8857a276cba7dd08d44" + "sha1": "77d82198c5afd04af3e760a6aa50b7e875289fc3" }, "path_in_vcs": "futures-executor" }
\ No newline at end of file @@ -44,7 +44,7 @@ rust_test { host_supported: true, crate_name: "futures_executor", cargo_env_compat: true, - cargo_pkg_version: "0.3.21", + cargo_pkg_version: "0.3.25", srcs: ["src/lib.rs"], test_suites: ["general-tests"], auto_gen_config: true, @@ -71,7 +71,7 @@ rust_test { host_supported: true, crate_name: "local_pool", cargo_env_compat: true, - cargo_pkg_version: "0.3.21", + cargo_pkg_version: "0.3.25", srcs: ["tests/local_pool.rs"], test_suites: ["general-tests"], auto_gen_config: true, @@ -99,7 +99,7 @@ rust_library { host_supported: true, crate_name: "futures_executor", cargo_env_compat: true, - cargo_pkg_version: "0.3.21", + cargo_pkg_version: "0.3.25", srcs: ["src/lib.rs"], edition: "2018", features: [ @@ -13,11 +13,12 @@ edition = "2018" rust-version = "1.45" name = "futures-executor" -version = "0.3.21" +version = "0.3.25" description = """ Executors for asynchronous tasks based on the futures-rs library. """ homepage = "https://rust-lang.github.io/futures-rs" +readme = "README.md" license = "MIT OR Apache-2.0" repository = "https://github.com/rust-lang/futures-rs" @@ -29,15 +30,15 @@ rustdoc-args = [ ] [dependencies.futures-core] -version = "0.3.21" +version = "0.3.25" default-features = false [dependencies.futures-task] -version = "0.3.21" +version = "0.3.25" default-features = false [dependencies.futures-util] -version = "0.3.21" +version = "0.3.25" default-features = false [dependencies.num_cpus] diff --git a/Cargo.toml.orig b/Cargo.toml.orig index dae5f22..beaa843 100644 --- a/Cargo.toml.orig +++ b/Cargo.toml.orig @@ -1,6 +1,6 @@ [package] name = "futures-executor" -version = "0.3.21" +version = "0.3.25" edition = "2018" rust-version = "1.45" license = "MIT OR Apache-2.0" @@ -16,13 +16,13 @@ std = ["futures-core/std", "futures-task/std", "futures-util/std"] thread-pool = ["std", "num_cpus"] [dependencies] -futures-core = { path = "../futures-core", version = "0.3.21", default-features = false } -futures-task = { path = "../futures-task", version = "0.3.21", default-features = false } -futures-util = { path = "../futures-util", version = "0.3.21", default-features = false } +futures-core = { path = "../futures-core", version = "0.3.25", default-features = false } +futures-task = { path = "../futures-task", version = "0.3.25", default-features = false } +futures-util = { path = "../futures-util", version = "0.3.25", default-features = false } num_cpus = { version = "1.8.0", optional = true } [dev-dependencies] -futures = { path = "../futures" } +futures = { path = "../futures", features = ["thread-pool"] } [package.metadata.docs.rs] all-features = true @@ -1,3 +1,7 @@ +# This project was upgraded with external_updater. +# Usage: tools/external_updater/updater.sh update rust/crates/futures-executor +# For more info, check https://cs.android.com/android/platform/superproject/+/master:tools/external_updater/README.md + name: "futures-executor" description: "Executors for asynchronous tasks based on the futures-rs library." third_party { @@ -7,13 +11,13 @@ third_party { } url { type: ARCHIVE - value: "https://static.crates.io/crates/futures-executor/futures-executor-0.3.21.crate" + value: "https://static.crates.io/crates/futures-executor/futures-executor-0.3.25.crate" } - version: "0.3.21" + version: "0.3.25" license_type: NOTICE last_upgrade_date { year: 2022 - month: 3 - day: 1 + month: 12 + day: 12 } } diff --git a/src/enter.rs b/src/enter.rs index 5895a9e..cb58c30 100644 --- a/src/enter.rs +++ b/src/enter.rs @@ -34,7 +34,7 @@ impl std::error::Error for EnterError {} /// executor. /// /// Executor implementations should call this function before beginning to -/// execute a tasks, and drop the returned [`Enter`](Enter) value after +/// execute a task, and drop the returned [`Enter`](Enter) value after /// completing task execution: /// /// ``` diff --git a/src/local_pool.rs b/src/local_pool.rs index bee96d8..8a9bc2f 100644 --- a/src/local_pool.rs +++ b/src/local_pool.rs @@ -63,7 +63,7 @@ thread_local! { impl ArcWake for ThreadNotify { fn wake_by_ref(arc_self: &Arc<Self>) { // Make sure the wakeup is remembered until the next `park()`. - let unparked = arc_self.unparked.swap(true, Ordering::Relaxed); + let unparked = arc_self.unparked.swap(true, Ordering::Release); if !unparked { // If the thread has not been unparked yet, it must be done // now. If it was actually parked, it will run again, @@ -90,33 +90,21 @@ fn run_executor<T, F: FnMut(&mut Context<'_>) -> Poll<T>>(mut f: F) -> T { if let Poll::Ready(t) = f(&mut cx) { return t; } - // Consume the wakeup that occurred while executing `f`, if any. - let unparked = thread_notify.unparked.swap(false, Ordering::Acquire); - if !unparked { + + // Wait for a wakeup. + while !thread_notify.unparked.swap(false, Ordering::Acquire) { // No wakeup occurred. It may occur now, right before parking, // but in that case the token made available by `unpark()` // is guaranteed to still be available and `park()` is a no-op. thread::park(); - // When the thread is unparked, `unparked` will have been set - // and needs to be unset before the next call to `f` to avoid - // a redundant loop iteration. - thread_notify.unparked.store(false, Ordering::Release); } } }) } -fn poll_executor<T, F: FnMut(&mut Context<'_>) -> T>(mut f: F) -> T { - let _enter = enter().expect( - "cannot execute `LocalPool` executor from within \ - another executor", - ); - - CURRENT_THREAD_NOTIFY.with(|thread_notify| { - let waker = waker_ref(thread_notify); - let mut cx = Context::from_waker(&waker); - f(&mut cx) - }) +/// Check for a wakeup, but don't consume it. +fn woken() -> bool { + CURRENT_THREAD_NOTIFY.with(|thread_notify| thread_notify.unparked.load(Ordering::Acquire)) } impl LocalPool { @@ -212,20 +200,26 @@ impl LocalPool { /// further use of one of the pool's run or poll methods. /// Though only one task will be completed, progress may be made on multiple tasks. pub fn try_run_one(&mut self) -> bool { - poll_executor(|ctx| { + run_executor(|cx| { loop { - let ret = self.poll_pool_once(ctx); - - // return if we have executed a future - if let Poll::Ready(Some(_)) = ret { - return true; + self.drain_incoming(); + + match self.pool.poll_next_unpin(cx) { + // Success! + Poll::Ready(Some(())) => return Poll::Ready(true), + // The pool was empty. + Poll::Ready(None) => return Poll::Ready(false), + Poll::Pending => (), } - // if there are no new incoming futures - // then there is no feature that can make progress - // and we can return without having completed a single future - if self.incoming.borrow().is_empty() { - return false; + if !self.incoming.borrow().is_empty() { + // New tasks were spawned; try again. + continue; + } else if woken() { + // The pool yielded to us, but there's more progress to be made. + return Poll::Pending; + } else { + return Poll::Ready(false); } } }) @@ -257,44 +251,52 @@ impl LocalPool { /// of the pool's run or poll methods. While the function is running, all tasks /// in the pool will try to make progress. pub fn run_until_stalled(&mut self) { - poll_executor(|ctx| { - let _ = self.poll_pool(ctx); + run_executor(|cx| match self.poll_pool(cx) { + // The pool is empty. + Poll::Ready(()) => Poll::Ready(()), + Poll::Pending => { + if woken() { + Poll::Pending + } else { + // We're stalled for now. + Poll::Ready(()) + } + } }); } - // Make maximal progress on the entire pool of spawned task, returning `Ready` - // if the pool is empty and `Pending` if no further progress can be made. + /// Poll `self.pool`, re-filling it with any newly-spawned tasks. + /// Repeat until either the pool is empty, or it returns `Pending`. + /// + /// Returns `Ready` if the pool was empty, and `Pending` otherwise. + /// + /// NOTE: the pool may call `wake`, so `Pending` doesn't necessarily + /// mean that the pool can't make progress. fn poll_pool(&mut self, cx: &mut Context<'_>) -> Poll<()> { - // state for the FuturesUnordered, which will never be used loop { - let ret = self.poll_pool_once(cx); + self.drain_incoming(); - // we queued up some new tasks; add them and poll again + let pool_ret = self.pool.poll_next_unpin(cx); + + // We queued up some new tasks; add them and poll again. if !self.incoming.borrow().is_empty() { continue; } - // no queued tasks; we may be done - match ret { - Poll::Pending => return Poll::Pending, + match pool_ret { + Poll::Ready(Some(())) => continue, Poll::Ready(None) => return Poll::Ready(()), - _ => {} + Poll::Pending => return Poll::Pending, } } } - // Try make minimal progress on the pool of spawned tasks - fn poll_pool_once(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>> { - // empty the incoming queue of newly-spawned tasks - { - let mut incoming = self.incoming.borrow_mut(); - for task in incoming.drain(..) { - self.pool.push(task) - } + /// Empty the incoming queue of newly-spawned tasks. + fn drain_incoming(&mut self) { + let mut incoming = self.incoming.borrow_mut(); + for task in incoming.drain(..) { + self.pool.push(task) } - - // try to execute the next ready future - self.pool.poll_next_unpin(cx) } } diff --git a/src/thread_pool.rs b/src/thread_pool.rs index 5e1f586..5371008 100644 --- a/src/thread_pool.rs +++ b/src/thread_pool.rs @@ -108,12 +108,15 @@ impl ThreadPool { /// completion. /// /// ``` + /// # { /// use futures::executor::ThreadPool; /// /// let pool = ThreadPool::new().unwrap(); /// /// let future = async { /* ... */ }; /// pool.spawn_ok(future); + /// # } + /// # std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371 /// ``` /// /// > **Note**: This method is similar to `SpawnExt::spawn`, except that @@ -346,9 +349,8 @@ impl fmt::Debug for Task { impl ArcWake for WakeHandle { fn wake_by_ref(arc_self: &Arc<Self>) { - match arc_self.mutex.notify() { - Ok(task) => arc_self.exec.state.send(Message::Run(task)), - Err(()) => {} + if let Ok(task) = arc_self.mutex.notify() { + arc_self.exec.state.send(Message::Run(task)) } } } @@ -360,16 +362,19 @@ mod tests { #[test] fn test_drop_after_start() { - let (tx, rx) = mpsc::sync_channel(2); - let _cpu_pool = ThreadPoolBuilder::new() - .pool_size(2) - .after_start(move |_| tx.send(1).unwrap()) - .create() - .unwrap(); - - // After ThreadPoolBuilder is deconstructed, the tx should be dropped - // so that we can use rx as an iterator. - let count = rx.into_iter().count(); - assert_eq!(count, 2); + { + let (tx, rx) = mpsc::sync_channel(2); + let _cpu_pool = ThreadPoolBuilder::new() + .pool_size(2) + .after_start(move |_| tx.send(1).unwrap()) + .create() + .unwrap(); + + // After ThreadPoolBuilder is deconstructed, the tx should be dropped + // so that we can use rx as an iterator. + let count = rx.into_iter().count(); + assert_eq!(count, 2); + } + std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371 } } diff --git a/tests/local_pool.rs b/tests/local_pool.rs index 9b1316b..72ce74b 100644 --- a/tests/local_pool.rs +++ b/tests/local_pool.rs @@ -1,7 +1,7 @@ use futures::channel::oneshot; use futures::executor::LocalPool; use futures::future::{self, lazy, poll_fn, Future}; -use futures::task::{Context, LocalSpawn, Poll, Spawn, Waker}; +use futures::task::{Context, LocalSpawn, LocalSpawnExt, Poll, Spawn, SpawnExt, Waker}; use std::cell::{Cell, RefCell}; use std::pin::Pin; use std::rc::Rc; @@ -288,7 +288,7 @@ fn run_until_stalled_runs_spawned_sub_futures() { #[test] fn run_until_stalled_executes_all_ready() { - const ITER: usize = 200; + const ITER: usize = if cfg!(miri) { 50 } else { 200 }; const PER_ITER: usize = 3; let cnt = Rc::new(Cell::new(0)); @@ -432,3 +432,65 @@ fn park_unpark_independence() { futures::executor::block_on(future) } + +struct SelfWaking { + wakeups_remaining: Rc<RefCell<usize>>, +} + +impl Future for SelfWaking { + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + if *self.wakeups_remaining.borrow() != 0 { + *self.wakeups_remaining.borrow_mut() -= 1; + cx.waker().wake_by_ref(); + } + + Poll::Pending + } +} + +/// Regression test for https://github.com/rust-lang/futures-rs/pull/2593 +/// +/// The issue was that self-waking futures could cause `run_until_stalled` +/// to exit early, even when progress could still be made. +#[test] +fn self_waking_run_until_stalled() { + let wakeups_remaining = Rc::new(RefCell::new(10)); + + let mut pool = LocalPool::new(); + let spawner = pool.spawner(); + for _ in 0..3 { + let wakeups_remaining = Rc::clone(&wakeups_remaining); + spawner.spawn_local(SelfWaking { wakeups_remaining }).unwrap(); + } + + // This should keep polling until there are no more wakeups. + pool.run_until_stalled(); + + assert_eq!(*wakeups_remaining.borrow(), 0); +} + +/// Regression test for https://github.com/rust-lang/futures-rs/pull/2593 +/// +/// The issue was that self-waking futures could cause `try_run_one` +/// to exit early, even when progress could still be made. +#[test] +fn self_waking_try_run_one() { + let wakeups_remaining = Rc::new(RefCell::new(10)); + + let mut pool = LocalPool::new(); + let spawner = pool.spawner(); + for _ in 0..3 { + let wakeups_remaining = Rc::clone(&wakeups_remaining); + spawner.spawn_local(SelfWaking { wakeups_remaining }).unwrap(); + } + + spawner.spawn(future::ready(())).unwrap(); + + // The `ready` future should complete. + assert!(pool.try_run_one()); + + // The self-waking futures are each polled once. + assert_eq!(*wakeups_remaining.borrow(), 7); +} |