author     Android Build Coastguard Worker <android-build-coastguard-worker@google.com>  2023-07-07 01:04:37 +0000
committer  Android Build Coastguard Worker <android-build-coastguard-worker@google.com>  2023-07-07 01:04:37 +0000
commit     0271bfc31d40c780186ca1b451f1d4419791386c (patch)
tree       30fab8e942b35661e92e0f597cb0da429f72430d
parent     33362b198e15e06a8c5e51d9d8d4f62eb96449e5 (diff)
parent     b0f9c5ad609b8a73c2c9f86fd63086a05ece90e9 (diff)
download   spin-android14-mainline-cellbroadcast-release.tar.gz
Change-Id: Ibbdd723d49c6425d1721080ba3eb00ab13617d35
-rw-r--r--  .cargo_vcs_info.json         7
-rw-r--r--  .github/workflows/rust.yml   48
-rw-r--r--  Android.bp                   16
-rw-r--r--  CHANGELOG.md                 47
-rw-r--r--  Cargo.toml                   62
-rw-r--r--  Cargo.toml.orig              31
-rw-r--r--  METADATA                     14
-rw-r--r--  README.md                    22
-rw-r--r--  TEST_MAPPING                 51
-rw-r--r--  benches/mutex.rs             128
-rw-r--r--  cargo2android.json           2
-rw-r--r--  src/barrier.rs               16
-rw-r--r--  src/lazy.rs                  12
-rw-r--r--  src/lib.rs                   45
-rw-r--r--  src/mutex.rs                 20
-rw-r--r--  src/mutex/fair.rs            735
-rw-r--r--  src/mutex/spin.rs            70
-rw-r--r--  src/mutex/ticket.rs          18
-rw-r--r--  src/once.rs                  257
-rw-r--r--  src/relax.rs                 5
-rw-r--r--  src/rwlock.rs                123
21 files changed, 1496 insertions, 233 deletions
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json
index 56f48b2..fd12933 100644
--- a/.cargo_vcs_info.json
+++ b/.cargo_vcs_info.json
@@ -1,5 +1,6 @@
{
"git": {
- "sha1": "95e2993afe52104d6d585173ddedb3da6afba533"
- }
-}
+ "sha1": "a080ab5a952290e32bc455213631ffddb4d794e4"
+ },
+ "path_in_vcs": ""
+} \ No newline at end of file
diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml
index 3c13d1b..ed2b6ce 100644
--- a/.github/workflows/rust.yml
+++ b/.github/workflows/rust.yml
@@ -9,14 +9,50 @@ on:
env:
CARGO_TERM_COLOR: always
-jobs:
- build:
+permissions: read-all
+jobs:
+ test:
runs-on: ubuntu-latest
+ strategy:
+ fail-fast: false
+ matrix:
+ rust: [stable, beta, nightly]
steps:
- - uses: actions/checkout@v2
- - name: Build
- run: cargo build --verbose
- - name: Run tests
+ - uses: actions/checkout@v3
+ - name: Install Rust
+ run: rustup update ${{ matrix.rust }} && rustup default ${{ matrix.rust }}
+ - name: Run Tests
run: cargo test --verbose
+ - run: cargo build --all --all-features --all-targets
+ - name: Catch missing feature flags
+ if: startsWith(matrix.rust, 'nightly')
+ run: cargo check -Z features=dev_dep
+ - name: Install cargo-hack
+ uses: taiki-e/install-action@cargo-hack
+ - run: rustup target add thumbv7m-none-eabi
+ - name: Ensure we don't depend on libstd
+ run: cargo hack build --target thumbv7m-none-eabi --no-dev-deps --no-default-features
+
+ msrv:
+ runs-on: ubuntu-latest
+ strategy:
+ matrix:
+ version: [1.38.0]
+ steps:
+ - uses: actions/checkout@v3
+ - name: Install Rust
+ run: rustup update ${{ matrix.version }} && rustup default ${{ matrix.version }}
+ - run: cargo build --all --all-features --all-targets
+
+ miri:
+ runs-on: ubuntu-latest
+ steps:
+ - uses: actions/checkout@v3
+ - name: Install Rust
+ run: rustup toolchain install nightly --component miri && rustup default nightly
+ - run: cargo miri test
+ env:
+ MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-symbolic-alignment-check -Zmiri-disable-isolation -Zmiri-ignore-leaks
+ RUSTFLAGS: ${{ env.RUSTFLAGS }} -Z randomize-layout
diff --git a/Android.bp b/Android.bp
index 6d0e88c..e2e6eb4 100644
--- a/Android.bp
+++ b/Android.bp
@@ -36,7 +36,7 @@ rust_library {
host_supported: true,
crate_name: "spin",
cargo_env_compat: true,
- cargo_pkg_version: "0.9.2",
+ cargo_pkg_version: "0.9.7",
srcs: ["src/lib.rs"],
edition: "2015",
features: [
@@ -49,6 +49,8 @@ rust_library {
"com.android.resolv",
"com.android.virt",
],
+ product_available: true,
+ vendor_available: true,
min_sdk_version: "29",
}
@@ -57,7 +59,7 @@ rust_test {
host_supported: true,
crate_name: "spin",
cargo_env_compat: true,
- cargo_pkg_version: "0.9.2",
+ cargo_pkg_version: "0.9.7",
srcs: ["src/lib.rs"],
test_suites: ["general-tests"],
auto_gen_config: true,
@@ -69,6 +71,9 @@ rust_test {
"once",
"std",
],
+ rustlibs: [
+ "libcriterion",
+ ],
}
rust_library_rlib {
@@ -89,10 +94,3 @@ rust_library_rlib {
],
min_sdk_version: "29",
}
-
-
-// Errors when listing tests:
-// error[E0433]: failed to resolve: could not find `Mutex` in `spin`
-// error[E0433]: failed to resolve: could not find `RwLock` in `spin`
-// error: could not compile `spin` due to 2 previous errors
-// error: build failed
diff --git a/CHANGELOG.md b/CHANGELOG.md
index abbeee1..e62adfc 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -13,6 +13,53 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed
+# [0.9.7] - 2023-03-27
+
+### Fixed
+
+- Relaxed accidentally restricted `Send`/`Sync` bounds for `Mutex` guards
+
+# [0.9.6] - 2023-03-13
+
+### Fixed
+
+- Relaxed accidentally restricted `Send`/`Sync` bounds for `RwLock` guards
+
+# [0.9.5] - 2023-02-07
+
+### Added
+
+- `FairMutex`, a new mutex implementation that reduces writer starvation.
+- An MSRV policy: Rust 1.38 is currently required
+
+### Changed
+
+- The crate's CI now has full MIRI integration, further improving the confidence you can have in the implementation.
+
+### Fixed
+
+- Ensured that the crate's abstractions comply with stacked borrows rules.
+- Unsoundness in the `RwLock` that could be triggered via a reader overflow
+- Relaxed various `Send`/`Sync` bound requirements to make the crate more flexible
+
+# [0.9.4] - 2022-07-14
+
+### Fixed
+
+- Fixed unsoundness in `RwLock` on reader overflow
+- Relaxed `Send`/`Sync` bounds for `SpinMutex` and `TicketMutex` (doesn't affect `Mutex` itself)
+
+# [0.9.3] - 2022-04-17
+
+### Added
+
+- Implemented `Default` for `Once`
+- `Once::try_call_once`
+
+### Fixed
+
+- Fixed bug that caused `Once::call_once` to incorrectly fail
+
# [0.9.2] - 2021-07-09
### Changed
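
The changelog entries above introduce two new public APIs: `FairMutex` (0.9.5, behind the `fair_mutex` feature) and `Once::try_call_once` (0.9.3). A minimal usage sketch, assuming the `fair_mutex` feature is enabled; `CONFIG` and the error type are illustrative, not taken from the diff:

```rust
// Sketch of the APIs added in 0.9.3/0.9.5. Assumes `spin` is built with the
// `fair_mutex` feature; `CONFIG` and the error type are illustrative only.
use spin::mutex::FairMutex;
use spin::Once;

static CONFIG: Once<u32> = Once::new();

fn load_config() -> Result<&'static u32, &'static str> {
    // Unlike `call_once`, a failing closure leaves the `Once` uninitialized,
    // so a later call can retry the initialization.
    CONFIG.try_call_once(|| Ok::<u32, &'static str>(42))
}

fn main() {
    let counter = FairMutex::new(0u32);
    *counter.lock() += 1;
    assert_eq!(*counter.lock(), 1);
    assert_eq!(load_config(), Ok(&42));
}
```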
diff --git a/Cargo.toml b/Cargo.toml
index c32199c..0a4f8cd 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -3,38 +3,78 @@
# When uploading crates to the registry Cargo will automatically
# "normalize" Cargo.toml files for maximal compatibility
# with all versions of Cargo and also rewrite `path` dependencies
-# to registry (e.g., crates.io) dependencies
+# to registry (e.g., crates.io) dependencies.
#
-# If you believe there's an error in this file please file an
-# issue against the rust-lang/cargo repository. If you're
-# editing this file be aware that the upstream Cargo.toml
-# will likely look very different (and much more reasonable)
+# If you are reading this file be aware that the original Cargo.toml
+# will likely look very different (and much more reasonable).
+# See Cargo.toml.orig for the original contents.
[package]
+rust-version = "1.38"
name = "spin"
-version = "0.9.2"
-authors = ["Mathijs van de Nes <git@mathijs.vd-nes.nl>", "John Ericson <git@JohnEricson.me>", "Joshua Barretto <joshua.s.barretto@gmail.com>"]
+version = "0.9.7"
+authors = [
+ "Mathijs van de Nes <git@mathijs.vd-nes.nl>",
+ "John Ericson <git@JohnEricson.me>",
+ "Joshua Barretto <joshua.s.barretto@gmail.com>",
+]
description = "Spin-based synchronization primitives"
-keywords = ["spinlock", "mutex", "rwlock"]
+readme = "README.md"
+keywords = [
+ "spinlock",
+ "mutex",
+ "rwlock",
+]
license = "MIT"
repository = "https://github.com/mvdnes/spin-rs.git"
+
[package.metadata.docs.rs]
all-features = true
-rustdoc-args = ["--cfg", "docsrs"]
+rustdoc-args = [
+ "--cfg",
+ "docsrs",
+]
+
+[[bench]]
+name = "mutex"
+harness = false
+required-features = ["ticket_mutex"]
+
[dependencies.lock_api_crate]
version = "0.4"
optional = true
package = "lock_api"
+[dependencies.portable-atomic]
+version = "1"
+optional = true
+default-features = false
+
+[dev-dependencies.criterion]
+version = "0.4"
+
[features]
barrier = ["mutex"]
-default = ["lock_api", "mutex", "spin_mutex", "rwlock", "once", "lazy", "barrier"]
+default = [
+ "lock_api",
+ "mutex",
+ "spin_mutex",
+ "rwlock",
+ "once",
+ "lazy",
+ "barrier",
+]
+fair_mutex = ["mutex"]
lazy = ["once"]
lock_api = ["lock_api_crate"]
mutex = []
once = []
+portable_atomic = ["portable-atomic"]
rwlock = []
spin_mutex = ["mutex"]
std = []
ticket_mutex = ["mutex"]
-use_ticket_mutex = ["mutex", "ticket_mutex"]
+use_ticket_mutex = [
+ "mutex",
+ "ticket_mutex",
+]
diff --git a/Cargo.toml.orig b/Cargo.toml.orig
index ee6fb09..ca2fdc3 100644
--- a/Cargo.toml.orig
+++ b/Cargo.toml.orig
@@ -1,18 +1,20 @@
[package]
name = "spin"
-version = "0.9.2"
+version = "0.9.7"
authors = [
- "Mathijs van de Nes <git@mathijs.vd-nes.nl>",
- "John Ericson <git@JohnEricson.me>",
- "Joshua Barretto <joshua.s.barretto@gmail.com>",
+ "Mathijs van de Nes <git@mathijs.vd-nes.nl>",
+ "John Ericson <git@JohnEricson.me>",
+ "Joshua Barretto <joshua.s.barretto@gmail.com>",
]
license = "MIT"
repository = "https://github.com/mvdnes/spin-rs.git"
keywords = ["spinlock", "mutex", "rwlock"]
description = "Spin-based synchronization primitives"
+rust-version = "1.38"
[dependencies]
lock_api_crate = { package = "lock_api", version = "0.4", optional = true }
+portable-atomic = { version = "1", optional = true, default-features = false }
[features]
default = ["lock_api", "mutex", "spin_mutex", "rwlock", "once", "lazy", "barrier"]
@@ -26,6 +28,9 @@ spin_mutex = ["mutex"]
# Enables `TicketMutex`.
ticket_mutex = ["mutex"]
+# Enables `FairMutex`.
+fair_mutex = ["mutex"]
+
# Enables the non-default ticket mutex implementation for `Mutex`.
use_ticket_mutex = ["mutex", "ticket_mutex"]
@@ -47,6 +52,24 @@ lock_api = ["lock_api_crate"]
# Enables std-only features such as yield-relaxing.
std = []
+# Use the portable_atomic crate to support platforms without native atomic operations.
+# The `portable_atomic_unsafe_assume_single_core` cfg or `critical-section` feature
+# of the `portable-atomic` crate must also be set by the final binary crate.
+# When using the cfg, note that it is unsafe and enabling it for multicore systems is unsound.
+# When using the `critical-section` feature, you need to provide a critical-section
+# implementation that is sound for your system by implementing an unsafe trait.
+# See the documentation for the `portable-atomic` crate for more information:
+# https://docs.rs/portable-atomic/latest/portable_atomic/#optional-cfg
+portable_atomic = ["portable-atomic"]
+
[package.metadata.docs.rs]
all-features = true
rustdoc-args = ["--cfg", "docsrs"]
+
+[dev-dependencies]
+criterion = "0.4"
+
+[[bench]]
+name = "mutex"
+harness = false
+required-features = ["ticket_mutex"]
diff --git a/METADATA b/METADATA
index 0213653..7de7cf6 100644
--- a/METADATA
+++ b/METADATA
@@ -1,3 +1,7 @@
+# This project was upgraded with external_updater.
+# Usage: tools/external_updater/updater.sh update rust/crates/spin
+# For more info, check https://cs.android.com/android/platform/superproject/+/master:tools/external_updater/README.md
+
name: "spin"
description: "Spin-based synchronization primitives"
third_party {
@@ -7,13 +11,13 @@ third_party {
}
url {
type: ARCHIVE
- value: "https://static.crates.io/crates/spin/spin-0.9.2.crate"
+ value: "https://static.crates.io/crates/spin/spin-0.9.7.crate"
}
- version: "0.9.2"
+ version: "0.9.7"
license_type: NOTICE
last_upgrade_date {
- year: 2021
- month: 8
- day: 9
+ year: 2023
+ month: 4
+ day: 3
}
}
diff --git a/README.md b/README.md
index 3d7d758..7fd3780 100644
--- a/README.md
+++ b/README.md
@@ -92,6 +92,23 @@ The crate comes with a few feature flags that you may wish to use.
- `std` enables support for thread yielding instead of spinning.
+- `portable_atomic` enables usage of the `portable-atomic` crate
+ to support platforms without native atomic operations (Cortex-M0, etc.).
+ The `portable_atomic_unsafe_assume_single_core` cfg or `critical-section` feature
+ of the `portable-atomic` crate must also be set by the final binary crate.
+
+ When using the cfg, this can be done by adapting the following snippet to the `.cargo/config` file:
+ ```
+ [target.<target>]
+ rustflags = [ "--cfg", "portable_atomic_unsafe_assume_single_core" ]
+ ```
+ Note that this cfg is unsafe by nature, and enabling it for multicore systems is unsound.
+
+ When using the `critical-section` feature, you need to provide a critical-section
+ implementation that is sound for your system by implementing an unsafe trait.
+ See [the documentation for the `portable-atomic` crate](https://docs.rs/portable-atomic/latest/portable_atomic/#optional-cfg)
+ for more information.
+
## Remarks
It is often desirable to have a lock shared between threads. Wrapping the lock in an
@@ -116,6 +133,11 @@ time for your crate's users. You can do this like so:
spin = { version = "x.y", default-features = false, features = [...] }
```
+## Minimum Supported Rust Version (MSRV)
+
+This crate is guaranteed to compile on a Minimum Supported Rust Version (MSRV) of 1.38.0 and above.
+This version will not be changed without a minor version bump.
+
## License
`spin` is distributed under the MIT License, (See `LICENSE`).
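
As a hedged illustration of the feature-flag guidance added to the README above, a `no_std` library consuming a trimmed-down `spin` might look like the sketch below; the dependency line in the comment and both statics are illustrative, not taken from the diff:

```rust
// Illustrative `no_std` consumer with default features disabled, e.g.
//   spin = { version = "0.9", default-features = false, features = ["spin_mutex", "once"] }
#![no_std]

use spin::mutex::SpinMutex;
use spin::Once;

static COUNTER: SpinMutex<u32> = SpinMutex::new(0);
static TABLE: Once<[u8; 4]> = Once::new();

pub fn bump() -> u32 {
    let mut guard = COUNTER.lock();
    *guard += 1;
    *guard
}

pub fn table() -> &'static [u8; 4] {
    // Initialized exactly once, spinning if another core is mid-initialization.
    TABLE.call_once(|| [1, 2, 3, 4])
}
```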
diff --git a/TEST_MAPPING b/TEST_MAPPING
index 777e539..c028b97 100644
--- a/TEST_MAPPING
+++ b/TEST_MAPPING
@@ -9,64 +9,19 @@
},
{
"path": "external/rust/crates/webpki"
+ },
+ {
+ "path": "packages/modules/DnsResolver"
}
],
"presubmit": [
{
- "name": "apkdmverity.test"
- },
- {
- "name": "authfs_device_test_src_lib"
- },
- {
- "name": "doh_unit_test"
- },
- {
- "name": "libapkverify.integration_test"
- },
- {
- "name": "libapkverify.test"
- },
- {
- "name": "libidsig.test"
- },
- {
- "name": "microdroid_manager_test"
- },
- {
"name": "spin_test_src_lib"
- },
- {
- "name": "virtualizationservice_device_test"
}
],
"presubmit-rust": [
{
- "name": "apkdmverity.test"
- },
- {
- "name": "authfs_device_test_src_lib"
- },
- {
- "name": "doh_unit_test"
- },
- {
- "name": "libapkverify.integration_test"
- },
- {
- "name": "libapkverify.test"
- },
- {
- "name": "libidsig.test"
- },
- {
- "name": "microdroid_manager_test"
- },
- {
"name": "spin_test_src_lib"
- },
- {
- "name": "virtualizationservice_device_test"
}
]
}
diff --git a/benches/mutex.rs b/benches/mutex.rs
new file mode 100644
index 0000000..5201145
--- /dev/null
+++ b/benches/mutex.rs
@@ -0,0 +1,128 @@
+#![feature(generic_associated_types)]
+
+#[macro_use]
+extern crate criterion;
+
+use criterion::{Criterion, Bencher, black_box};
+use std::{
+ ops::DerefMut,
+ sync::Arc,
+};
+
+trait Mutex<T>: Send + Sync + 'static {
+ type Guard<'a>: DerefMut<Target = T> where Self: 'a;
+ fn new(x: T) -> Self;
+ fn lock(&self) -> Self::Guard<'_>;
+}
+
+impl<T: Send + 'static> Mutex<T> for spin::mutex::SpinMutex<T> {
+ type Guard<'a> = spin::mutex::SpinMutexGuard<'a, T> where Self: 'a;
+ fn new(x: T) -> Self { spin::mutex::SpinMutex::new(x) }
+ fn lock(&self) -> Self::Guard<'_> { self.lock() }
+}
+
+impl<T: Send + 'static> Mutex<T> for spin::mutex::TicketMutex<T> {
+ type Guard<'a> = spin::mutex::TicketMutexGuard<'a, T> where Self: 'a;
+ fn new(x: T) -> Self { spin::mutex::TicketMutex::new(x) }
+ fn lock(&self) -> Self::Guard<'_> { self.lock() }
+}
+
+impl<T: Send + 'static> Mutex<T> for std::sync::Mutex<T> {
+ type Guard<'a> = std::sync::MutexGuard<'a, T> where Self: 'a;
+ fn new(x: T) -> Self { std::sync::Mutex::new(x) }
+ fn lock(&self) -> Self::Guard<'_> { self.lock().unwrap() }
+}
+
+fn gen_create<M: Mutex<u32>>(b: &mut Bencher) {
+ b.iter(|| {
+ let n = black_box(42);
+ M::new(n)
+ });
+}
+
+fn gen_lock_unlock<M: Mutex<u32>>(b: &mut Bencher) {
+ let m = M::new(0);
+ b.iter(|| {
+ let mut m = m.lock();
+ *m = m.wrapping_add(1);
+ drop(m);
+ });
+}
+
+fn gen_lock_unlock_read_contention<M: Mutex<u32>>(b: &mut Bencher) {
+ let m = Arc::new(M::new(0));
+ let thread = std::thread::spawn({
+ let m = m.clone();
+ move || {
+ while Arc::strong_count(&m) > 1 {
+ for _ in 0..1000 {
+ black_box(*m.lock());
+ }
+ }
+ }
+ });
+ b.iter(|| {
+ let mut m = m.lock();
+ *m = m.wrapping_add(1);
+ drop(m);
+ });
+ drop(m);
+ thread.join().unwrap();
+}
+
+fn gen_lock_unlock_write_contention<M: Mutex<u32>>(b: &mut Bencher) {
+ let m = Arc::new(M::new(0));
+ let thread = std::thread::spawn({
+ let m = m.clone();
+ move || {
+ while Arc::strong_count(&m) > 1 {
+ for _ in 0..1000 {
+ let mut m = m.lock();
+ *m = m.wrapping_add(1);
+ drop(m);
+ }
+ }
+ }
+ });
+ b.iter(|| {
+ let mut m = m.lock();
+ *m = m.wrapping_add(1);
+ drop(m);
+ });
+ drop(m);
+ thread.join().unwrap();
+}
+
+fn create(b: &mut Criterion) {
+ b.bench_function("create-spin-spinmutex", |b| gen_create::<spin::mutex::SpinMutex<u32>>(b));
+ b.bench_function("create-spin-ticketmutex", |b| gen_create::<spin::mutex::TicketMutex<u32>>(b));
+ b.bench_function("create-std", |b| gen_create::<std::sync::Mutex<u32>>(b));
+}
+
+fn lock_unlock(b: &mut Criterion) {
+ b.bench_function("lock_unlock-spin-spinmutex", |b| gen_lock_unlock::<spin::mutex::SpinMutex<u32>>(b));
+ b.bench_function("lock_unlock-spin-ticketmutex", |b| gen_lock_unlock::<spin::mutex::TicketMutex<u32>>(b));
+ b.bench_function("lock_unlock-std", |b| gen_lock_unlock::<std::sync::Mutex<u32>>(b));
+}
+
+fn lock_unlock_read_contention(b: &mut Criterion) {
+ b.bench_function("lock_unlock_read_contention-spin-spinmutex", |b| gen_lock_unlock_read_contention::<spin::mutex::SpinMutex<u32>>(b));
+ b.bench_function("lock_unlock_read_contention-spin-ticketmutex", |b| gen_lock_unlock_read_contention::<spin::mutex::TicketMutex<u32>>(b));
+ b.bench_function("lock_unlock_read_contention-std", |b| gen_lock_unlock_read_contention::<std::sync::Mutex<u32>>(b));
+}
+
+fn lock_unlock_write_contention(b: &mut Criterion) {
+ b.bench_function("lock_unlock_write_contention-spin-spinmutex", |b| gen_lock_unlock_write_contention::<spin::mutex::SpinMutex<u32>>(b));
+ b.bench_function("lock_unlock_write_contention-spin-ticketmutex", |b| gen_lock_unlock_write_contention::<spin::mutex::TicketMutex<u32>>(b));
+ b.bench_function("lock_unlock_write_contention-std", |b| gen_lock_unlock_write_contention::<std::sync::Mutex<u32>>(b));
+}
+
+criterion_group!(
+ mutex,
+ create,
+ lock_unlock,
+ lock_unlock_read_contention,
+ lock_unlock_write_contention,
+);
+
+criterion_main!(mutex);
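
The harness above abstracts over lock implementations with a GAT-based `Mutex` trait. As a sketch only (not part of this commit), the new `FairMutex` could be benchmarked the same way, assuming the `fair_mutex` feature is enabled:

```rust
// Hypothetical addition to benches/mutex.rs: hook the new `FairMutex` into
// the same trait. Not part of this change; assumes the `fair_mutex` feature.
impl<T: Send + 'static> Mutex<T> for spin::mutex::FairMutex<T> {
    type Guard<'a> = spin::mutex::FairMutexGuard<'a, T> where Self: 'a;
    fn new(x: T) -> Self { spin::mutex::FairMutex::new(x) }
    fn lock(&self) -> Self::Guard<'_> { self.lock() }
}
```

Each `gen_*` function could then be instantiated with `spin::mutex::FairMutex<u32>` alongside the existing spin and std mutexes.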
diff --git a/cargo2android.json b/cargo2android.json
index 086d38a..0be577a 100644
--- a/cargo2android.json
+++ b/cargo2android.json
@@ -12,4 +12,4 @@
"min-sdk-version": "29",
"run": true,
"tests": true
-} \ No newline at end of file
+}
diff --git a/src/barrier.rs b/src/barrier.rs
index 7a13890..c3a1c92 100644
--- a/src/barrier.rs
+++ b/src/barrier.rs
@@ -115,8 +115,7 @@ impl<R: RelaxStrategy> Barrier<R> {
// not the leader
let local_gen = lock.generation_id;
- while local_gen == lock.generation_id &&
- lock.count < self.num_threads {
+ while local_gen == lock.generation_id && lock.count < self.num_threads {
drop(lock);
R::relax();
lock = self.lock.lock();
@@ -176,7 +175,9 @@ impl BarrierWaitResult {
/// let barrier_wait_result = barrier.wait();
/// println!("{:?}", barrier_wait_result.is_leader());
/// ```
- pub fn is_leader(&self) -> bool { self.0 }
+ pub fn is_leader(&self) -> bool {
+ self.0
+ }
}
#[cfg(test)]
@@ -192,12 +193,13 @@ mod tests {
fn use_barrier(n: usize, barrier: Arc<Barrier>) {
let (tx, rx) = channel();
+ let mut ts = Vec::new();
for _ in 0..n - 1 {
let c = barrier.clone();
let tx = tx.clone();
- thread::spawn(move|| {
+ ts.push(thread::spawn(move || {
tx.send(c.wait().is_leader()).unwrap();
- });
+ }));
}
// At this point, all spawned threads should be blocked,
@@ -217,6 +219,10 @@ mod tests {
}
}
assert!(leader_found);
+
+ for t in ts {
+ t.join().unwrap();
+ }
}
#[test]
diff --git a/src/lazy.rs b/src/lazy.rs
index 1473db1..6e5efe4 100644
--- a/src/lazy.rs
+++ b/src/lazy.rs
@@ -3,8 +3,8 @@
//! Implementation adapted from the `SyncLazy` type of the standard library. See:
//! <https://doc.rust-lang.org/std/lazy/struct.SyncLazy.html>
-use core::{cell::Cell, fmt, ops::Deref};
use crate::{once::Once, RelaxStrategy, Spin};
+use core::{cell::Cell, fmt, ops::Deref};
/// A value which is initialized on the first access.
///
@@ -45,7 +45,10 @@ pub struct Lazy<T, F = fn() -> T, R = Spin> {
impl<T: fmt::Debug, F, R> fmt::Debug for Lazy<T, F, R> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- f.debug_struct("Lazy").field("cell", &self.cell).field("init", &"..").finish()
+ f.debug_struct("Lazy")
+ .field("cell", &self.cell)
+ .field("init", &"..")
+ .finish()
}
}
@@ -61,7 +64,10 @@ impl<T, F, R> Lazy<T, F, R> {
/// Creates a new lazy value with the given initializing
/// function.
pub const fn new(f: F) -> Self {
- Self { cell: Once::new(), init: Cell::new(Some(f)) }
+ Self {
+ cell: Once::new(),
+ init: Cell::new(Some(f)),
+ }
}
/// Retrieves a mutable pointer to the inner data.
///
diff --git a/src/lib.rs b/src/lib.rs
index 92af28a..50768bc 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -55,11 +55,22 @@
//!
//! - `ticket_mutex` uses a ticket lock for the implementation of `Mutex`
//!
+//! - `fair_mutex` enables a fairer implementation of `Mutex` that uses eventual fairness to avoid
+//! starvation
+//!
//! - `std` enables support for thread yielding instead of spinning
#[cfg(any(test, feature = "std"))]
extern crate core;
+#[cfg(feature = "portable_atomic")]
+extern crate portable_atomic;
+
+#[cfg(not(feature = "portable_atomic"))]
+use core::sync::atomic;
+#[cfg(feature = "portable_atomic")]
+use portable_atomic as atomic;
+
#[cfg(feature = "barrier")]
#[cfg_attr(docsrs, doc(cfg(feature = "barrier")))]
pub mod barrier;
@@ -72,21 +83,21 @@ pub mod mutex;
#[cfg(feature = "once")]
#[cfg_attr(docsrs, doc(cfg(feature = "once")))]
pub mod once;
+pub mod relax;
#[cfg(feature = "rwlock")]
#[cfg_attr(docsrs, doc(cfg(feature = "rwlock")))]
pub mod rwlock;
-pub mod relax;
#[cfg(feature = "mutex")]
#[cfg_attr(docsrs, doc(cfg(feature = "mutex")))]
pub use mutex::MutexGuard;
-#[cfg(feature = "rwlock")]
-#[cfg_attr(docsrs, doc(cfg(feature = "rwlock")))]
-pub use rwlock::RwLockReadGuard;
-pub use relax::{Spin, RelaxStrategy};
#[cfg(feature = "std")]
#[cfg_attr(docsrs, doc(cfg(feature = "std")))]
pub use relax::Yield;
+pub use relax::{RelaxStrategy, Spin};
+#[cfg(feature = "rwlock")]
+#[cfg_attr(docsrs, doc(cfg(feature = "rwlock")))]
+pub use rwlock::RwLockReadGuard;
// Avoid confusing inference errors by aliasing away the relax strategy parameter. Users that need to use a different
// relax strategy can do so by accessing the types through their fully-qualified path. This is a little bit horrible
@@ -184,3 +195,27 @@ pub mod lock_api {
pub type RwLockUpgradableReadGuard<'a, T> =
lock_api_crate::RwLockUpgradableReadGuard<'a, crate::RwLock<()>, T>;
}
+
+/// In the event of an invalid operation, it's best to abort the current process.
+#[cfg(feature = "fair_mutex")]
+fn abort() -> ! {
+ #[cfg(not(feature = "std"))]
+ {
+ // Panicking while panicking is defined by Rust to result in an abort.
+ struct Panic;
+
+ impl Drop for Panic {
+ fn drop(&mut self) {
+ panic!("aborting due to invalid operation");
+ }
+ }
+
+ let _panic = Panic;
+ panic!("aborting due to invalid operation");
+ }
+
+ #[cfg(feature = "std")]
+ {
+ std::process::abort();
+ }
+}
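
The new `abort` helper relies on the rule that a panic raised while another panic is already unwinding aborts the process, which is what makes the non-`std` branch work without `std::process::abort`. A standalone sketch of that pattern (independent of this crate, names illustrative):

```rust
// Standalone illustration of the double-panic abort trick used by `abort`
// above: the panic in `Drop` fires during unwinding, which aborts.
struct DoublePanic;

impl Drop for DoublePanic {
    fn drop(&mut self) {
        panic!("second panic while unwinding aborts the process");
    }
}

pub fn abort_via_double_panic() -> ! {
    let _guard = DoublePanic;
    panic!("first panic starts unwinding");
}
```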
diff --git a/src/mutex.rs b/src/mutex.rs
index 2335051..e333d8a 100644
--- a/src/mutex.rs
+++ b/src/mutex.rs
@@ -27,11 +27,18 @@ pub mod ticket;
#[cfg_attr(docsrs, doc(cfg(feature = "ticket_mutex")))]
pub use self::ticket::{TicketMutex, TicketMutexGuard};
+#[cfg(feature = "fair_mutex")]
+#[cfg_attr(docsrs, doc(cfg(feature = "fair_mutex")))]
+pub mod fair;
+#[cfg(feature = "fair_mutex")]
+#[cfg_attr(docsrs, doc(cfg(feature = "fair_mutex")))]
+pub use self::fair::{FairMutex, FairMutexGuard, Starvation};
+
+use crate::{RelaxStrategy, Spin};
use core::{
fmt,
ops::{Deref, DerefMut},
};
-use crate::{RelaxStrategy, Spin};
#[cfg(all(not(feature = "spin_mutex"), not(feature = "use_ticket_mutex")))]
compile_error!("The `mutex` feature flag was used (perhaps through another feature?) without either `spin_mutex` or `use_ticket_mutex`. One of these is required.");
@@ -78,9 +85,11 @@ type InnerMutexGuard<'a, T> = self::ticket::TicketMutexGuard<'a, T>;
/// // We use a barrier to ensure the readout happens after all writing
/// let barrier = Arc::new(Barrier::new(thread_count + 1));
///
+/// # let mut ts = Vec::new();
/// for _ in (0..thread_count) {
/// let my_barrier = barrier.clone();
/// let my_lock = spin_mutex.clone();
+/// # let t =
/// std::thread::spawn(move || {
/// let mut guard = my_lock.lock();
/// *guard += 1;
@@ -89,12 +98,17 @@ type InnerMutexGuard<'a, T> = self::ticket::TicketMutexGuard<'a, T>;
/// drop(guard);
/// my_barrier.wait();
/// });
+/// # ts.push(t);
/// }
///
/// barrier.wait();
///
/// let answer = { *spin_mutex.lock() };
/// assert_eq!(answer, thread_count);
+///
+/// # for t in ts {
+/// # t.join().unwrap();
+/// # }
/// ```
pub struct Mutex<T: ?Sized, R = Spin> {
inner: InnerMutex<T, R>,
@@ -132,7 +146,9 @@ impl<T, R> Mutex<T, R> {
/// ```
#[inline(always)]
pub const fn new(value: T) -> Self {
- Self { inner: InnerMutex::new(value) }
+ Self {
+ inner: InnerMutex::new(value),
+ }
}
/// Consumes this [`Mutex`] and unwraps the underlying data.
diff --git a/src/mutex/fair.rs b/src/mutex/fair.rs
new file mode 100644
index 0000000..db07ad6
--- /dev/null
+++ b/src/mutex/fair.rs
@@ -0,0 +1,735 @@
+//! A spinning mutex with a fairer unlock algorithm.
+//!
+//! This mutex is similar to the `SpinMutex` in that it uses spinning to avoid
+//! context switches. However, it uses a fairer unlock algorithm that avoids
+//! starvation of threads that are waiting for the lock.
+
+use crate::{
+ atomic::{AtomicUsize, Ordering},
+ RelaxStrategy, Spin,
+};
+use core::{
+ cell::UnsafeCell,
+ fmt,
+ marker::PhantomData,
+ mem::ManuallyDrop,
+ ops::{Deref, DerefMut},
+};
+
+// The lowest bit of `lock` is used to indicate whether the mutex is locked or not. The rest of the bits are used to
+// store the number of starving threads.
+const LOCKED: usize = 1;
+const STARVED: usize = 2;
+
+/// Number chosen by fair roll of the dice, adjust as needed.
+const STARVATION_SPINS: usize = 1024;
+
+/// A [spin lock](https://en.m.wikipedia.org/wiki/Spinlock) providing mutually exclusive access to data, but with a fairer
+/// algorithm.
+///
+/// # Example
+///
+/// ```
+/// use spin;
+///
+/// let lock = spin::mutex::FairMutex::<_>::new(0);
+///
+/// // Modify the data
+/// *lock.lock() = 2;
+///
+/// // Read the data
+/// let answer = *lock.lock();
+/// assert_eq!(answer, 2);
+/// ```
+///
+/// # Thread safety example
+///
+/// ```
+/// use spin;
+/// use std::sync::{Arc, Barrier};
+///
+/// let thread_count = 1000;
+/// let spin_mutex = Arc::new(spin::mutex::FairMutex::<_>::new(0));
+///
+/// // We use a barrier to ensure the readout happens after all writing
+/// let barrier = Arc::new(Barrier::new(thread_count + 1));
+///
+/// for _ in (0..thread_count) {
+/// let my_barrier = barrier.clone();
+/// let my_lock = spin_mutex.clone();
+/// std::thread::spawn(move || {
+/// let mut guard = my_lock.lock();
+/// *guard += 1;
+///
+/// // Release the lock to prevent a deadlock
+/// drop(guard);
+/// my_barrier.wait();
+/// });
+/// }
+///
+/// barrier.wait();
+///
+/// let answer = { *spin_mutex.lock() };
+/// assert_eq!(answer, thread_count);
+/// ```
+pub struct FairMutex<T: ?Sized, R = Spin> {
+ phantom: PhantomData<R>,
+ pub(crate) lock: AtomicUsize,
+ data: UnsafeCell<T>,
+}
+
+/// A guard that provides mutable data access.
+///
+/// When the guard falls out of scope it will release the lock.
+pub struct FairMutexGuard<'a, T: ?Sized + 'a> {
+ lock: &'a AtomicUsize,
+ data: *mut T,
+}
+
+/// A handle that indicates that we have been trying to acquire the lock for a while.
+///
+/// This handle is used to prevent starvation.
+pub struct Starvation<'a, T: ?Sized + 'a, R> {
+ lock: &'a FairMutex<T, R>,
+}
+
+/// Indicates whether a lock was rejected due to the lock being held by another thread or due to starvation.
+#[derive(Debug)]
+pub enum LockRejectReason {
+ /// The lock was rejected due to the lock being held by another thread.
+ Locked,
+
+ /// The lock was rejected due to starvation.
+ Starved,
+}
+
+// Same unsafe impls as `std::sync::Mutex`
+unsafe impl<T: ?Sized + Send, R> Sync for FairMutex<T, R> {}
+unsafe impl<T: ?Sized + Send, R> Send for FairMutex<T, R> {}
+
+unsafe impl<T: ?Sized + Sync> Sync for FairMutexGuard<'_, T> {}
+unsafe impl<T: ?Sized + Send> Send for FairMutexGuard<'_, T> {}
+
+impl<T, R> FairMutex<T, R> {
+ /// Creates a new [`FairMutex`] wrapping the supplied data.
+ ///
+ /// # Example
+ ///
+ /// ```
+ /// use spin::mutex::FairMutex;
+ ///
+ /// static MUTEX: FairMutex<()> = FairMutex::<_>::new(());
+ ///
+ /// fn demo() {
+ /// let lock = MUTEX.lock();
+ /// // do something with lock
+ /// drop(lock);
+ /// }
+ /// ```
+ #[inline(always)]
+ pub const fn new(data: T) -> Self {
+ FairMutex {
+ lock: AtomicUsize::new(0),
+ data: UnsafeCell::new(data),
+ phantom: PhantomData,
+ }
+ }
+
+ /// Consumes this [`FairMutex`] and unwraps the underlying data.
+ ///
+ /// # Example
+ ///
+ /// ```
+ /// let lock = spin::mutex::FairMutex::<_>::new(42);
+ /// assert_eq!(42, lock.into_inner());
+ /// ```
+ #[inline(always)]
+ pub fn into_inner(self) -> T {
+ // We know statically that there are no outstanding references to
+ // `self` so there's no need to lock.
+ let FairMutex { data, .. } = self;
+ data.into_inner()
+ }
+
+ /// Returns a mutable pointer to the underlying data.
+ ///
+ /// This is mostly meant to be used for applications which require manual unlocking, but where
+ /// storing both the lock and the pointer to the inner data gets inefficient.
+ ///
+ /// # Example
+ /// ```
+ /// let lock = spin::mutex::FairMutex::<_>::new(42);
+ ///
+ /// unsafe {
+ /// core::mem::forget(lock.lock());
+ ///
+ /// assert_eq!(lock.as_mut_ptr().read(), 42);
+ /// lock.as_mut_ptr().write(58);
+ ///
+ /// lock.force_unlock();
+ /// }
+ ///
+ /// assert_eq!(*lock.lock(), 58);
+ ///
+ /// ```
+ #[inline(always)]
+ pub fn as_mut_ptr(&self) -> *mut T {
+ self.data.get()
+ }
+}
+
+impl<T: ?Sized, R: RelaxStrategy> FairMutex<T, R> {
+ /// Locks the [`FairMutex`] and returns a guard that permits access to the inner data.
+ ///
+ /// The returned value may be dereferenced for data access
+ /// and the lock will be dropped when the guard falls out of scope.
+ ///
+ /// ```
+ /// let lock = spin::mutex::FairMutex::<_>::new(0);
+ /// {
+ /// let mut data = lock.lock();
+ /// // The lock is now locked and the data can be accessed
+ /// *data += 1;
+ /// // The lock is implicitly dropped at the end of the scope
+ /// }
+ /// ```
+ #[inline(always)]
+ pub fn lock(&self) -> FairMutexGuard<T> {
+ // Can fail to lock even if the spinlock is not locked. May be more efficient than `try_lock`
+ // when called in a loop.
+ let mut spins = 0;
+ while self
+ .lock
+ .compare_exchange_weak(0, 1, Ordering::Acquire, Ordering::Relaxed)
+ .is_err()
+ {
+ // Wait until the lock looks unlocked before retrying
+ while self.is_locked() {
+ R::relax();
+
+ // If we've been spinning for a while, switch to a fairer strategy that will prevent
+ // newer users from stealing our lock from us.
+ if spins > STARVATION_SPINS {
+ return self.starve().lock();
+ }
+ spins += 1;
+ }
+ }
+
+ FairMutexGuard {
+ lock: &self.lock,
+ data: unsafe { &mut *self.data.get() },
+ }
+ }
+}
+
+impl<T: ?Sized, R> FairMutex<T, R> {
+ /// Returns `true` if the lock is currently held.
+ ///
+ /// # Safety
+ ///
+ /// This function provides no synchronization guarantees and so its result should be considered 'out of date'
+ /// the instant it is called. Do not use it for synchronization purposes. However, it may be useful as a heuristic.
+ #[inline(always)]
+ pub fn is_locked(&self) -> bool {
+ self.lock.load(Ordering::Relaxed) & LOCKED != 0
+ }
+
+ /// Force unlock this [`FairMutex`].
+ ///
+ /// # Safety
+ ///
+ /// This is *extremely* unsafe if the lock is not held by the current
+ /// thread. However, this can be useful in some instances for exposing the
+ /// lock to FFI that doesn't know how to deal with RAII.
+ #[inline(always)]
+ pub unsafe fn force_unlock(&self) {
+ self.lock.fetch_and(!LOCKED, Ordering::Release);
+ }
+
+ /// Try to lock this [`FairMutex`], returning a lock guard if successful.
+ ///
+ /// # Example
+ ///
+ /// ```
+ /// let lock = spin::mutex::FairMutex::<_>::new(42);
+ ///
+ /// let maybe_guard = lock.try_lock();
+ /// assert!(maybe_guard.is_some());
+ ///
+ /// // `maybe_guard` is still held, so the second call fails
+ /// let maybe_guard2 = lock.try_lock();
+ /// assert!(maybe_guard2.is_none());
+ /// ```
+ #[inline(always)]
+ pub fn try_lock(&self) -> Option<FairMutexGuard<T>> {
+ self.try_lock_starver().ok()
+ }
+
+ /// Tries to lock this [`FairMutex`] and returns a result that indicates whether the lock was
+ /// rejected due to a starver or not.
+ #[inline(always)]
+ pub fn try_lock_starver(&self) -> Result<FairMutexGuard<T>, LockRejectReason> {
+ match self
+ .lock
+ .compare_exchange(0, LOCKED, Ordering::Acquire, Ordering::Relaxed)
+ .unwrap_or_else(|x| x)
+ {
+ 0 => Ok(FairMutexGuard {
+ lock: &self.lock,
+ data: unsafe { &mut *self.data.get() },
+ }),
+ LOCKED => Err(LockRejectReason::Locked),
+ _ => Err(LockRejectReason::Starved),
+ }
+ }
+
+ /// Indicates that the current user has been waiting for the lock for a while
+ /// and that the lock should yield to this thread over a newly arriving thread.
+ ///
+ /// # Example
+ ///
+ /// ```
+ /// let lock = spin::mutex::FairMutex::<_>::new(42);
+ ///
+ /// // Lock the mutex to simulate it being used by another user.
+ /// let guard1 = lock.lock();
+ ///
+ /// // Try to lock the mutex.
+ /// let guard2 = lock.try_lock();
+ /// assert!(guard2.is_none());
+ ///
+ /// // Wait for a while.
+ /// wait_for_a_while();
+ ///
+ /// // We are now starved, indicate as such.
+ /// let starve = lock.starve();
+ ///
+ /// // Once the lock is released, another user trying to lock it will
+ /// // fail.
+ /// drop(guard1);
+ /// let guard3 = lock.try_lock();
+ /// assert!(guard3.is_none());
+ ///
+ /// // However, we will be able to lock it.
+ /// let guard4 = starve.try_lock();
+ /// assert!(guard4.is_ok());
+ ///
+ /// # fn wait_for_a_while() {}
+ /// ```
+ pub fn starve(&self) -> Starvation<'_, T, R> {
+ // Add a new starver to the state.
+ if self.lock.fetch_add(STARVED, Ordering::Relaxed) > (core::isize::MAX - 1) as usize {
+ // In the event of a potential lock overflow, abort.
+ crate::abort();
+ }
+
+ Starvation { lock: self }
+ }
+
+ /// Returns a mutable reference to the underlying data.
+ ///
+ /// Since this call borrows the [`FairMutex`] mutably, and a mutable reference is guaranteed to be exclusive in
+ /// Rust, no actual locking needs to take place -- the mutable borrow statically guarantees no locks exist. As
+ /// such, this is a 'zero-cost' operation.
+ ///
+ /// # Example
+ ///
+ /// ```
+ /// let mut lock = spin::mutex::FairMutex::<_>::new(0);
+ /// *lock.get_mut() = 10;
+ /// assert_eq!(*lock.lock(), 10);
+ /// ```
+ #[inline(always)]
+ pub fn get_mut(&mut self) -> &mut T {
+ // We know statically that there are no other references to `self`, so
+ // there's no need to lock the inner mutex.
+ unsafe { &mut *self.data.get() }
+ }
+}
+
+impl<T: ?Sized + fmt::Debug, R> fmt::Debug for FairMutex<T, R> {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ struct LockWrapper<'a, T: ?Sized + fmt::Debug>(Option<FairMutexGuard<'a, T>>);
+
+ impl<T: ?Sized + fmt::Debug> fmt::Debug for LockWrapper<'_, T> {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ match &self.0 {
+ Some(guard) => fmt::Debug::fmt(guard, f),
+ None => f.write_str("<locked>"),
+ }
+ }
+ }
+
+ f.debug_struct("FairMutex")
+ .field("data", &LockWrapper(self.try_lock()))
+ .finish()
+ }
+}
+
+impl<T: ?Sized + Default, R> Default for FairMutex<T, R> {
+ fn default() -> Self {
+ Self::new(Default::default())
+ }
+}
+
+impl<T, R> From<T> for FairMutex<T, R> {
+ fn from(data: T) -> Self {
+ Self::new(data)
+ }
+}
+
+impl<'a, T: ?Sized> FairMutexGuard<'a, T> {
+ /// Leak the lock guard, yielding a mutable reference to the underlying data.
+ ///
+ /// Note that this function will permanently lock the original [`FairMutex`].
+ ///
+ /// ```
+ /// let mylock = spin::mutex::FairMutex::<_>::new(0);
+ ///
+ /// let data: &mut i32 = spin::mutex::FairMutexGuard::leak(mylock.lock());
+ ///
+ /// *data = 1;
+ /// assert_eq!(*data, 1);
+ /// ```
+ #[inline(always)]
+ pub fn leak(this: Self) -> &'a mut T {
+ // Use ManuallyDrop to avoid stacked-borrow invalidation
+ let mut this = ManuallyDrop::new(this);
+ // We know statically that only we are referencing data
+ unsafe { &mut *this.data }
+ }
+}
+
+impl<'a, T: ?Sized + fmt::Debug> fmt::Debug for FairMutexGuard<'a, T> {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ fmt::Debug::fmt(&**self, f)
+ }
+}
+
+impl<'a, T: ?Sized + fmt::Display> fmt::Display for FairMutexGuard<'a, T> {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ fmt::Display::fmt(&**self, f)
+ }
+}
+
+impl<'a, T: ?Sized> Deref for FairMutexGuard<'a, T> {
+ type Target = T;
+ fn deref(&self) -> &T {
+ // We know statically that only we are referencing data
+ unsafe { &*self.data }
+ }
+}
+
+impl<'a, T: ?Sized> DerefMut for FairMutexGuard<'a, T> {
+ fn deref_mut(&mut self) -> &mut T {
+ // We know statically that only we are referencing data
+ unsafe { &mut *self.data }
+ }
+}
+
+impl<'a, T: ?Sized> Drop for FairMutexGuard<'a, T> {
+ /// The dropping of the MutexGuard will release the lock it was created from.
+ fn drop(&mut self) {
+ self.lock.fetch_and(!LOCKED, Ordering::Release);
+ }
+}
+
+impl<'a, T: ?Sized, R> Starvation<'a, T, R> {
+ /// Attempts to lock the mutex if we are the only starving user.
+ ///
+ /// This allows another user to lock the mutex if they are starving as well.
+ pub fn try_lock_fair(self) -> Result<FairMutexGuard<'a, T>, Self> {
+ // Try to lock the mutex.
+ if self
+ .lock
+ .lock
+ .compare_exchange(
+ STARVED,
+ STARVED | LOCKED,
+ Ordering::Acquire,
+ Ordering::Relaxed,
+ )
+ .is_ok()
+ {
+ // We are the only starving user, lock the mutex.
+ Ok(FairMutexGuard {
+ lock: &self.lock.lock,
+ data: self.lock.data.get(),
+ })
+ } else {
+ // Another user is starving, fail.
+ Err(self)
+ }
+ }
+
+ /// Attempts to lock the mutex.
+ ///
+ /// If the lock is currently held by another thread, this will return `None`.
+ ///
+ /// # Example
+ ///
+ /// ```
+ /// let lock = spin::mutex::FairMutex::<_>::new(42);
+ ///
+ /// // Lock the mutex to simulate it being used by another user.
+ /// let guard1 = lock.lock();
+ ///
+ /// // Try to lock the mutex.
+ /// let guard2 = lock.try_lock();
+ /// assert!(guard2.is_none());
+ ///
+ /// // Wait for a while.
+ /// wait_for_a_while();
+ ///
+ /// // We are now starved, indicate as such.
+ /// let starve = lock.starve();
+ ///
+ /// // Once the lock is released, another user trying to lock it will
+ /// // fail.
+ /// drop(guard1);
+ /// let guard3 = lock.try_lock();
+ /// assert!(guard3.is_none());
+ ///
+ /// // However, we will be able to lock it.
+ /// let guard4 = starve.try_lock();
+ /// assert!(guard4.is_ok());
+ ///
+ /// # fn wait_for_a_while() {}
+ /// ```
+ pub fn try_lock(self) -> Result<FairMutexGuard<'a, T>, Self> {
+ // Try to lock the mutex.
+ if self.lock.lock.fetch_or(LOCKED, Ordering::Acquire) & LOCKED == 0 {
+ // We have successfully locked the mutex.
+ // By dropping `self` here, we decrement the starvation count.
+ Ok(FairMutexGuard {
+ lock: &self.lock.lock,
+ data: self.lock.data.get(),
+ })
+ } else {
+ Err(self)
+ }
+ }
+}
+
+impl<'a, T: ?Sized, R: RelaxStrategy> Starvation<'a, T, R> {
+ /// Locks the mutex.
+ pub fn lock(mut self) -> FairMutexGuard<'a, T> {
+ // Try to lock the mutex.
+ loop {
+ match self.try_lock() {
+ Ok(lock) => return lock,
+ Err(starve) => self = starve,
+ }
+
+ // Relax until the lock is released.
+ while self.lock.is_locked() {
+ R::relax();
+ }
+ }
+ }
+}
+
+impl<'a, T: ?Sized, R> Drop for Starvation<'a, T, R> {
+ fn drop(&mut self) {
+ // As there is no longer a user being starved, we decrement the starver count.
+ self.lock.lock.fetch_sub(STARVED, Ordering::Release);
+ }
+}
+
+impl fmt::Display for LockRejectReason {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ match self {
+ LockRejectReason::Locked => write!(f, "locked"),
+ LockRejectReason::Starved => write!(f, "starved"),
+ }
+ }
+}
+
+#[cfg(feature = "std")]
+impl std::error::Error for LockRejectReason {}
+
+#[cfg(feature = "lock_api")]
+unsafe impl<R: RelaxStrategy> lock_api_crate::RawMutex for FairMutex<(), R> {
+ type GuardMarker = lock_api_crate::GuardSend;
+
+ const INIT: Self = Self::new(());
+
+ fn lock(&self) {
+ // Prevent guard destructor running
+ core::mem::forget(Self::lock(self));
+ }
+
+ fn try_lock(&self) -> bool {
+ // Prevent guard destructor running
+ Self::try_lock(self).map(core::mem::forget).is_some()
+ }
+
+ unsafe fn unlock(&self) {
+ self.force_unlock();
+ }
+
+ fn is_locked(&self) -> bool {
+ Self::is_locked(self)
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use std::prelude::v1::*;
+
+ use std::sync::atomic::{AtomicUsize, Ordering};
+ use std::sync::mpsc::channel;
+ use std::sync::Arc;
+ use std::thread;
+
+ type FairMutex<T> = super::FairMutex<T>;
+
+ #[derive(Eq, PartialEq, Debug)]
+ struct NonCopy(i32);
+
+ #[test]
+ fn smoke() {
+ let m = FairMutex::<_>::new(());
+ drop(m.lock());
+ drop(m.lock());
+ }
+
+ #[test]
+ fn lots_and_lots() {
+ static M: FairMutex<()> = FairMutex::<_>::new(());
+ static mut CNT: u32 = 0;
+ const J: u32 = 1000;
+ const K: u32 = 3;
+
+ fn inc() {
+ for _ in 0..J {
+ unsafe {
+ let _g = M.lock();
+ CNT += 1;
+ }
+ }
+ }
+
+ let (tx, rx) = channel();
+ for _ in 0..K {
+ let tx2 = tx.clone();
+ thread::spawn(move || {
+ inc();
+ tx2.send(()).unwrap();
+ });
+ let tx2 = tx.clone();
+ thread::spawn(move || {
+ inc();
+ tx2.send(()).unwrap();
+ });
+ }
+
+ drop(tx);
+ for _ in 0..2 * K {
+ rx.recv().unwrap();
+ }
+ assert_eq!(unsafe { CNT }, J * K * 2);
+ }
+
+ #[test]
+ fn try_lock() {
+ let mutex = FairMutex::<_>::new(42);
+
+ // First lock succeeds
+ let a = mutex.try_lock();
+ assert_eq!(a.as_ref().map(|r| **r), Some(42));
+
+ // Additional lock fails
+ let b = mutex.try_lock();
+ assert!(b.is_none());
+
+ // After dropping lock, it succeeds again
+ ::core::mem::drop(a);
+ let c = mutex.try_lock();
+ assert_eq!(c.as_ref().map(|r| **r), Some(42));
+ }
+
+ #[test]
+ fn test_into_inner() {
+ let m = FairMutex::<_>::new(NonCopy(10));
+ assert_eq!(m.into_inner(), NonCopy(10));
+ }
+
+ #[test]
+ fn test_into_inner_drop() {
+ struct Foo(Arc<AtomicUsize>);
+ impl Drop for Foo {
+ fn drop(&mut self) {
+ self.0.fetch_add(1, Ordering::SeqCst);
+ }
+ }
+ let num_drops = Arc::new(AtomicUsize::new(0));
+ let m = FairMutex::<_>::new(Foo(num_drops.clone()));
+ assert_eq!(num_drops.load(Ordering::SeqCst), 0);
+ {
+ let _inner = m.into_inner();
+ assert_eq!(num_drops.load(Ordering::SeqCst), 0);
+ }
+ assert_eq!(num_drops.load(Ordering::SeqCst), 1);
+ }
+
+ #[test]
+ fn test_mutex_arc_nested() {
+ // Tests nested mutexes and access
+ // to underlying data.
+ let arc = Arc::new(FairMutex::<_>::new(1));
+ let arc2 = Arc::new(FairMutex::<_>::new(arc));
+ let (tx, rx) = channel();
+ let _t = thread::spawn(move || {
+ let lock = arc2.lock();
+ let lock2 = lock.lock();
+ assert_eq!(*lock2, 1);
+ tx.send(()).unwrap();
+ });
+ rx.recv().unwrap();
+ }
+
+ #[test]
+ fn test_mutex_arc_access_in_unwind() {
+ let arc = Arc::new(FairMutex::<_>::new(1));
+ let arc2 = arc.clone();
+ let _ = thread::spawn(move || -> () {
+ struct Unwinder {
+ i: Arc<FairMutex<i32>>,
+ }
+ impl Drop for Unwinder {
+ fn drop(&mut self) {
+ *self.i.lock() += 1;
+ }
+ }
+ let _u = Unwinder { i: arc2 };
+ panic!();
+ })
+ .join();
+ let lock = arc.lock();
+ assert_eq!(*lock, 2);
+ }
+
+ #[test]
+ fn test_mutex_unsized() {
+ let mutex: &FairMutex<[i32]> = &FairMutex::<_>::new([1, 2, 3]);
+ {
+ let b = &mut *mutex.lock();
+ b[0] = 4;
+ b[2] = 5;
+ }
+ let comp: &[i32] = &[4, 2, 5];
+ assert_eq!(&*mutex.lock(), comp);
+ }
+
+ #[test]
+ fn test_mutex_force_lock() {
+ let lock = FairMutex::<_>::new(());
+ ::std::mem::forget(lock.lock());
+ unsafe {
+ lock.force_unlock();
+ }
+ assert!(lock.try_lock().is_some());
+ }
+}
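
For reference, the lock word described at the top of `fair.rs` keeps the held/free flag in bit 0 (`LOCKED`) and counts starving waiters in the remaining bits (each `starve()` call adds `STARVED == 2`). A small hypothetical decoder, not part of the crate, makes that layout explicit:

```rust
// Hypothetical helper mirroring the `LOCKED`/`STARVED` layout in fair.rs:
// bit 0 is the "held" flag, the higher bits count registered starvers.
const LOCKED: usize = 1;
const STARVED: usize = 2;

fn decode_lock_word(word: usize) -> (bool, usize) {
    let held = word & LOCKED != 0;
    let starvers = word >> 1; // each starver contributes STARVED == 2
    (held, starvers)
}

fn main() {
    // Lock held with one starver waiting: 0b11.
    assert_eq!(decode_lock_word(LOCKED | STARVED), (true, 1));
    // Lock free with two starvers waiting: 0b100.
    assert_eq!(decode_lock_word(2 * STARVED), (false, 2));
}
```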
diff --git a/src/mutex/spin.rs b/src/mutex/spin.rs
index fce3eb9..561d765 100644
--- a/src/mutex/spin.rs
+++ b/src/mutex/spin.rs
@@ -3,14 +3,17 @@
//! Waiting threads hammer an atomic variable until it becomes available. Best-case latency is low, but worst-case
//! latency is theoretically infinite.
+use crate::{
+ atomic::{AtomicBool, Ordering},
+ RelaxStrategy, Spin,
+};
use core::{
cell::UnsafeCell,
fmt,
- ops::{Deref, DerefMut},
- sync::atomic::{AtomicBool, Ordering},
marker::PhantomData,
+ mem::ManuallyDrop,
+ ops::{Deref, DerefMut},
};
-use crate::{RelaxStrategy, Spin};
/// A [spin lock](https://en.m.wikipedia.org/wiki/Spinlock) providing mutually exclusive access to data.
///
@@ -41,9 +44,11 @@ use crate::{RelaxStrategy, Spin};
/// // We use a barrier to ensure the readout happens after all writing
/// let barrier = Arc::new(Barrier::new(thread_count + 1));
///
+/// # let mut ts = Vec::new();
/// for _ in (0..thread_count) {
/// let my_barrier = barrier.clone();
/// let my_lock = spin_mutex.clone();
+/// # let t =
/// std::thread::spawn(move || {
/// let mut guard = my_lock.lock();
/// *guard += 1;
@@ -52,12 +57,17 @@ use crate::{RelaxStrategy, Spin};
/// drop(guard);
/// my_barrier.wait();
/// });
+/// # ts.push(t);
/// }
///
/// barrier.wait();
///
/// let answer = { *spin_mutex.lock() };
/// assert_eq!(answer, thread_count);
+///
+/// # for t in ts {
+/// # t.join().unwrap();
+/// # }
/// ```
pub struct SpinMutex<T: ?Sized, R = Spin> {
phantom: PhantomData<R>,
@@ -70,12 +80,15 @@ pub struct SpinMutex<T: ?Sized, R = Spin> {
/// When the guard falls out of scope it will release the lock.
pub struct SpinMutexGuard<'a, T: ?Sized + 'a> {
lock: &'a AtomicBool,
- data: &'a mut T,
+ data: *mut T,
}
// Same unsafe impls as `std::sync::Mutex`
-unsafe impl<T: ?Sized + Send> Sync for SpinMutex<T> {}
-unsafe impl<T: ?Sized + Send> Send for SpinMutex<T> {}
+unsafe impl<T: ?Sized + Send, R> Sync for SpinMutex<T, R> {}
+unsafe impl<T: ?Sized + Send, R> Send for SpinMutex<T, R> {}
+
+unsafe impl<T: ?Sized + Sync> Sync for SpinMutexGuard<'_, T> {}
+unsafe impl<T: ?Sized + Send> Send for SpinMutexGuard<'_, T> {}
impl<T, R> SpinMutex<T, R> {
/// Creates a new [`SpinMutex`] wrapping the supplied data.
@@ -129,7 +142,7 @@ impl<T, R> SpinMutex<T, R> {
///
/// unsafe {
/// core::mem::forget(lock.lock());
- ///
+ ///
/// assert_eq!(lock.as_mut_ptr().read(), 42);
/// lock.as_mut_ptr().write(58);
///
@@ -164,7 +177,11 @@ impl<T: ?Sized, R: RelaxStrategy> SpinMutex<T, R> {
pub fn lock(&self) -> SpinMutexGuard<T> {
// Can fail to lock even if the spinlock is not locked. May be more efficient than `try_lock`
// when called in a loop.
- while self.lock.compare_exchange_weak(false, true, Ordering::Acquire, Ordering::Relaxed).is_err() {
+ while self
+ .lock
+ .compare_exchange_weak(false, true, Ordering::Acquire, Ordering::Relaxed)
+ .is_err()
+ {
// Wait until the lock looks unlocked before retrying
while self.is_locked() {
R::relax();
@@ -220,7 +237,11 @@ impl<T: ?Sized, R> SpinMutex<T, R> {
pub fn try_lock(&self) -> Option<SpinMutexGuard<T>> {
// The reason for using a strong compare_exchange is explained here:
// https://github.com/Amanieu/parking_lot/pull/207#issuecomment-575869107
- if self.lock.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed).is_ok() {
+ if self
+ .lock
+ .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
+ .is_ok()
+ {
Some(SpinMutexGuard {
lock: &self.lock,
data: unsafe { &mut *self.data.get() },
@@ -289,9 +310,10 @@ impl<'a, T: ?Sized> SpinMutexGuard<'a, T> {
/// ```
#[inline(always)]
pub fn leak(this: Self) -> &'a mut T {
- let data = this.data as *mut _; // Keep it in pointer form temporarily to avoid double-aliasing
- core::mem::forget(this);
- unsafe { &mut *data }
+ // Use ManuallyDrop to avoid stacked-borrow invalidation
+ let mut this = ManuallyDrop::new(this);
+ // We know statically that only we are referencing data
+ unsafe { &mut *this.data }
}
}
@@ -310,13 +332,15 @@ impl<'a, T: ?Sized + fmt::Display> fmt::Display for SpinMutexGuard<'a, T> {
impl<'a, T: ?Sized> Deref for SpinMutexGuard<'a, T> {
type Target = T;
fn deref(&self) -> &T {
- self.data
+ // We know statically that only we are referencing data
+ unsafe { &*self.data }
}
}
impl<'a, T: ?Sized> DerefMut for SpinMutexGuard<'a, T> {
fn deref_mut(&mut self) -> &mut T {
- self.data
+ // We know statically that only we are referencing data
+ unsafe { &mut *self.data }
}
}
@@ -390,17 +414,18 @@ mod tests {
}
let (tx, rx) = channel();
+ let mut ts = Vec::new();
for _ in 0..K {
let tx2 = tx.clone();
- thread::spawn(move || {
+ ts.push(thread::spawn(move || {
inc();
tx2.send(()).unwrap();
- });
+ }));
let tx2 = tx.clone();
- thread::spawn(move || {
+ ts.push(thread::spawn(move || {
inc();
tx2.send(()).unwrap();
- });
+ }));
}
drop(tx);
@@ -408,6 +433,10 @@ mod tests {
rx.recv().unwrap();
}
assert_eq!(unsafe { CNT }, J * K * 2);
+
+ for t in ts {
+ t.join().unwrap();
+ }
}
#[test]
@@ -418,7 +447,7 @@ mod tests {
let a = mutex.try_lock();
assert_eq!(a.as_ref().map(|r| **r), Some(42));
- // Additional lock failes
+ // Additional lock fails
let b = mutex.try_lock();
assert!(b.is_none());
@@ -459,13 +488,14 @@ mod tests {
let arc = Arc::new(SpinMutex::<_>::new(1));
let arc2 = Arc::new(SpinMutex::<_>::new(arc));
let (tx, rx) = channel();
- let _t = thread::spawn(move || {
+ let t = thread::spawn(move || {
let lock = arc2.lock();
let lock2 = lock.lock();
assert_eq!(*lock2, 1);
tx.send(()).unwrap();
});
rx.recv().unwrap();
+ t.join().unwrap();
}
#[test]
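
The `leak` change above replaces `mem::forget` on a guard holding `&'a mut T` with a raw-pointer guard wrapped in `ManuallyDrop`, matching the in-diff comment about avoiding stacked-borrow invalidation under Miri. A generic sketch of the pattern, using an illustrative guard type rather than `SpinMutexGuard` itself:

```rust
use core::marker::PhantomData;
use core::mem::ManuallyDrop;

// Illustrative guard storing a raw pointer, as `SpinMutexGuard` now does.
struct Guard<'a, T> {
    data: *mut T,
    _lock: PhantomData<&'a mut T>,
}

impl<'a, T> Drop for Guard<'a, T> {
    fn drop(&mut self) {
        // A real guard would release the lock here.
    }
}

impl<'a, T> Guard<'a, T> {
    fn leak(this: Self) -> &'a mut T {
        // ManuallyDrop suppresses the unlock in `drop` without invalidating
        // the pointer we are about to turn back into a reference.
        let this = ManuallyDrop::new(this);
        // SAFETY: the guard held exclusive access to `data` for 'a.
        unsafe { &mut *this.data }
    }
}
```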
diff --git a/src/mutex/ticket.rs b/src/mutex/ticket.rs
index 128b434..01b905e 100644
--- a/src/mutex/ticket.rs
+++ b/src/mutex/ticket.rs
@@ -5,18 +5,20 @@
//! latency is infinitely better. Waiting threads simply need to wait for all threads that come before them in the
//! queue to finish.
+use crate::{
+ atomic::{AtomicUsize, Ordering},
+ RelaxStrategy, Spin,
+};
use core::{
cell::UnsafeCell,
fmt,
- ops::{Deref, DerefMut},
- sync::atomic::{AtomicUsize, Ordering},
marker::PhantomData,
+ ops::{Deref, DerefMut},
};
-use crate::{RelaxStrategy, Spin};
/// A spin-based [ticket lock](https://en.wikipedia.org/wiki/Ticket_lock) providing mutually exclusive access to data.
///
-/// A ticket lock is analagous to a queue management system for lock requests. When a thread tries to take a lock, it
+/// A ticket lock is analogous to a queue management system for lock requests. When a thread tries to take a lock, it
/// is assigned a 'ticket'. It then spins until its ticket becomes next in line. When the lock guard is released, the
/// next ticket will be processed.
///
@@ -84,8 +86,8 @@ pub struct TicketMutexGuard<'a, T: ?Sized + 'a> {
data: &'a mut T,
}
-unsafe impl<T: ?Sized + Send> Sync for TicketMutex<T> {}
-unsafe impl<T: ?Sized + Send> Send for TicketMutex<T> {}
+unsafe impl<T: ?Sized + Send, R> Sync for TicketMutex<T, R> {}
+unsafe impl<T: ?Sized + Send, R> Send for TicketMutex<T, R> {}
impl<T, R> TicketMutex<T, R> {
/// Creates a new [`TicketMutex`] wrapping the supplied data.
@@ -136,7 +138,7 @@ impl<T, R> TicketMutex<T, R> {
///
/// unsafe {
/// core::mem::forget(lock.lock());
- ///
+ ///
/// assert_eq!(lock.as_mut_ptr().read(), 42);
/// lock.as_mut_ptr().write(58);
///
@@ -440,7 +442,7 @@ mod tests {
let a = mutex.try_lock();
assert_eq!(a.as_ref().map(|r| **r), Some(42));
- // Additional lock failes
+ // Additional lock fails
let b = mutex.try_lock();
assert!(b.is_none());
diff --git a/src/once.rs b/src/once.rs
index e4aadee..5f0186d 100644
--- a/src/once.rs
+++ b/src/once.rs
@@ -1,13 +1,10 @@
- //! Synchronization primitives for one-time evaluation.
-
-use core::{
- cell::UnsafeCell,
- mem::MaybeUninit,
- sync::atomic::{AtomicU8, Ordering},
- marker::PhantomData,
- fmt,
+//! Synchronization primitives for one-time evaluation.
+
+use crate::{
+ atomic::{AtomicU8, Ordering},
+ RelaxStrategy, Spin,
};
-use crate::{RelaxStrategy, Spin};
+use core::{cell::UnsafeCell, fmt, marker::PhantomData, mem::MaybeUninit};
/// A primitive that provides lazy one-time initialization.
///
@@ -34,13 +31,19 @@ pub struct Once<T = (), R = Spin> {
data: UnsafeCell<MaybeUninit<T>>,
}
+impl<T, R> Default for Once<T, R> {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
impl<T: fmt::Debug, R> fmt::Debug for Once<T, R> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self.get() {
Some(s) => write!(f, "Once {{ data: ")
- .and_then(|()| s.fmt(f))
- .and_then(|()| write!(f, "}}")),
- None => write!(f, "Once {{ <uninitialized> }}")
+ .and_then(|()| s.fmt(f))
+ .and_then(|()| write!(f, "}}")),
+ None => write!(f, "Once {{ <uninitialized> }}"),
}
}
}
@@ -99,14 +102,22 @@ mod status {
self.0.store(status as u8, ordering);
}
#[inline(always)]
- pub fn compare_exchange(&self, old: Status, new: Status, success: Ordering, failure: Ordering) -> Result<Status, Status> {
- match self.0.compare_exchange(old as u8, new as u8, success, failure) {
+ pub fn compare_exchange(
+ &self,
+ old: Status,
+ new: Status,
+ success: Ordering,
+ failure: Ordering,
+ ) -> Result<Status, Status> {
+ match self
+ .0
+ .compare_exchange(old as u8, new as u8, success, failure)
+ {
// SAFETY: A compare exchange will always return a value that was later stored into
// the atomic u8, but due to the invariant that it must be a valid Status, we know
// that both Ok(_) and Err(_) will be safely transmutable.
-
Ok(ok) => Ok(unsafe { Status::new_unchecked(ok) }),
- Err(err) => Ok(unsafe { Status::new_unchecked(err) }),
+ Err(err) => Err(unsafe { Status::new_unchecked(err) }),
}
}
#[inline(always)]
@@ -117,7 +128,7 @@ mod status {
}
}
}
-use self::status::{Status, AtomicStatus};
+use self::status::{AtomicStatus, Status};
use core::hint::unreachable_unchecked as unreachable;
@@ -157,6 +168,46 @@ impl<T, R: RelaxStrategy> Once<T, R> {
/// }
/// ```
pub fn call_once<F: FnOnce() -> T>(&self, f: F) -> &T {
+ match self.try_call_once(|| Ok::<T, core::convert::Infallible>(f())) {
+ Ok(x) => x,
+ Err(void) => match void {},
+ }
+ }
+
+ /// This method is similar to `call_once`, but allows the given closure to
+ /// fail, and leaves the `Once` in an uninitialized state if it does.
+ ///
+ /// This method will block the calling thread if another initialization
+ /// routine is currently running.
+ ///
+ /// When this function returns without error, it is guaranteed that some
+ /// initialization has run and completed (it may not be the closure
+ /// specified). The returned reference will point to the result from the
+ /// closure that was run.
+ ///
+ /// # Panics
+ ///
+ /// This function will panic if the [`Once`] previously panicked while attempting
+ /// to initialize. This is similar to the poisoning behaviour of `std::sync`'s
+ /// primitives.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use spin;
+ ///
+ /// static INIT: spin::Once<usize> = spin::Once::new();
+ ///
+ /// fn get_cached_val() -> Result<usize, String> {
+ /// INIT.try_call_once(expensive_fallible_computation).map(|x| *x)
+ /// }
+ ///
+ /// fn expensive_fallible_computation() -> Result<usize, String> {
+ /// // ...
+ /// # Ok(2)
+ /// }
+ /// ```
+ pub fn try_call_once<F: FnOnce() -> Result<T, E>, E>(&self, f: F) -> Result<&T, E> {
// SAFETY: We perform an Acquire load because if this were to return COMPLETE, then we need
// the preceding stores done while initializing, to become visible after this load.
let mut status = self.status.load(Ordering::Acquire);
@@ -181,16 +232,27 @@ impl<T, R: RelaxStrategy> Once<T, R> {
Ordering::Acquire,
) {
Ok(_must_be_state_incomplete) => {
- // The compare-exchange suceeded, so we shall initialize it.
+ // The compare-exchange succeeded, so we shall initialize it.
// We use a guard (Finish) to catch panics caused by builder
- let finish = Finish { status: &self.status };
+ let finish = Finish {
+ status: &self.status,
+ };
+ let val = match f() {
+ Ok(val) => val,
+ Err(err) => {
+ // If an error occurs, clean up everything and leave.
+ core::mem::forget(finish);
+ self.status.store(Status::Incomplete, Ordering::Release);
+ return Err(err);
+ }
+ };
unsafe {
// SAFETY:
// `UnsafeCell`/deref: currently the only accessor, mutably
// and immutably by cas exclusion.
// `write`: pointer comes from `MaybeUninit`.
- (*self.data.get()).as_mut_ptr().write(f())
+ (*self.data.get()).as_mut_ptr().write(val);
};
// If there were to be a panic with unwind enabled, the code would
// short-circuit and never reach the point where it writes the inner data.
@@ -214,7 +276,7 @@ impl<T, R: RelaxStrategy> Once<T, R> {
self.status.store(Status::Complete, Ordering::Release);
// This next line is mainly an optimization.
- return unsafe { self.force_get() };
+ return unsafe { Ok(self.force_get()) };
}
// The compare-exchange failed, so we know for a fact that the status cannot be
// INCOMPLETE, or it would have succeeded.
@@ -222,29 +284,27 @@ impl<T, R: RelaxStrategy> Once<T, R> {
}
}
- match status {
+ Ok(match status {
// SAFETY: We have either checked with an Acquire load, that the status is COMPLETE, or
// initialized it ourselves, in which case no additional synchronization is needed.
Status::Complete => unsafe { self.force_get() },
Status::Panicked => panic!("Once panicked"),
- Status::Running => self
- .poll()
- .unwrap_or_else(|| {
- if cfg!(debug_assertions) {
- unreachable!("Encountered INCOMPLETE when polling Once")
- } else {
- // SAFETY: This poll is guaranteed never to fail because the API of poll
- // promises spinning if initialization is in progress. We've already
- // checked that initialisation is in progress, and initialisation is
- // monotonic: once done, it cannot be undone. We also fetched the status
- // with Acquire semantics, thereby guaranteeing that the later-executed
- // poll will also agree with us that initialization is in progress. Ergo,
- // this poll cannot fail.
- unsafe {
- unreachable();
- }
+ Status::Running => self.poll().unwrap_or_else(|| {
+ if cfg!(debug_assertions) {
+ unreachable!("Encountered INCOMPLETE when polling Once")
+ } else {
+ // SAFETY: This poll is guaranteed never to fail because the API of poll
+ // promises spinning if initialization is in progress. We've already
+ // checked that initialization is in progress, and initialization is
+ // monotonic: once done, it cannot be undone. We also fetched the status
+ // with Acquire semantics, thereby guaranteeing that the later-executed
+ // poll will also agree with us that initialization is in progress. Ergo,
+ // this poll cannot fail.
+ unsafe {
+ unreachable();
}
- }),
+ }
+ }),
// SAFETY: The only invariant possible in addition to the aforementioned ones at the
// moment, is INCOMPLETE. However, the only way for this match statement to be
@@ -252,8 +312,7 @@ impl<T, R: RelaxStrategy> Once<T, R> {
// which case we know for a fact that the state cannot be changed back to INCOMPLETE as
// `Once`s are monotonic.
Status::Incomplete => unsafe { unreachable() },
- }
-
+ })
}
/// Spins until the [`Once`] contains a value.
@@ -309,7 +368,7 @@ impl<T, R> Once<T, R> {
};
/// Creates a new [`Once`].
- pub const fn new() -> Self{
+ pub const fn new() -> Self {
Self::INIT
}
@@ -395,6 +454,23 @@ impl<T, R> Once<T, R> {
}
}
+ /// Returns a mutable reference to the inner value.
+ ///
+ /// # Safety
+ ///
+ /// This is *extremely* unsafe if the `Once` has not already been initialized because a reference to uninitialized
+ /// memory will be returned, immediately triggering undefined behaviour (even if the reference goes unused).
+ /// However, this can be useful in some instances for exposing the `Once` to FFI or when the overhead of atomically
+ /// checking initialization is unacceptable and the `Once` has already been initialized.
+ pub unsafe fn get_mut_unchecked(&mut self) -> &mut T {
+ debug_assert_eq!(
+ self.status.load(Ordering::SeqCst),
+ Status::Complete,
+ "Attempted to access an unintialized Once. If this was to run without debug checks, this would be undefined behavior. This is a serious bug and you must fix it.",
+ );
+ self.force_get_mut()
+ }
+
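A brief usage sketch of the new `get_mut_unchecked` (a hypothetical caller, not taken from the crate's tests): with `&mut self` and a completed initialization, the unchecked accessor skips the atomic status check entirely.

```rust
use spin::Once;

fn main() {
    let mut once = Once::<Vec<u32>>::new();
    once.call_once(|| vec![1, 2, 3]);

    // SAFETY: the `call_once` above completed, so the value is initialized,
    // and the exclusive borrow rules out any concurrent access.
    let v = unsafe { once.get_mut_unchecked() };
    v.push(4);

    assert_eq!(once.get(), Some(&vec![1, 2, 3, 4]));
}
```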
/// Returns the inner value if the [`Once`] has been initialized.
///
/// Because this method requires ownership of the [`Once`], no synchronization overhead
@@ -406,6 +482,22 @@ impl<T, R> Once<T, R> {
}
}
+ /// Returns the inner value if the [`Once`] has been initialized.
+ /// # Safety
+ ///
+ /// This is *extremely* unsafe if the `Once` has not already been initialized because a reference to uninitialized
+ /// memory will be returned, immediately triggering undefined behaviour (even if the reference goes unused).
+ /// This can be useful if the `Once` has already been initialized and you want to bypass an
+ /// `Option` check.
+ pub unsafe fn into_inner_unchecked(self) -> T {
+ debug_assert_eq!(
+ self.status.load(Ordering::SeqCst),
+ Status::Complete,
+ "Attempted to access an unintialized Once. If this was to run without debug checks, this would be undefined behavior. This is a serious bug and you must fix it.",
+ );
+ self.force_into_inner()
+ }
+
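Similarly, `into_inner_unchecked` takes the `Once` by value and skips the `Option` of `into_inner`; a minimal sketch under the same assumption that initialization has already happened:

```rust
use spin::Once;

fn main() {
    let once = Once::<String>::new();
    once.call_once(|| String::from("initialized"));

    // SAFETY: `call_once` has completed, so reading the value out of the
    // consumed `Once` cannot touch uninitialized memory.
    let value = unsafe { once.into_inner_unchecked() };
    assert_eq!(value, "initialized");
}
```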
/// Checks whether the value has been initialized.
///
/// This is done using [`Acquire`](core::sync::atomic::Ordering::Acquire) ordering, and
@@ -485,10 +577,13 @@ mod tests {
static mut RUN: bool = false;
let (tx, rx) = channel();
+ let mut ts = Vec::new();
for _ in 0..10 {
let tx = tx.clone();
- thread::spawn(move|| {
- for _ in 0..4 { thread::yield_now() }
+ ts.push(thread::spawn(move || {
+ for _ in 0..4 {
+ thread::yield_now()
+ }
unsafe {
O.call_once(|| {
assert!(!RUN);
@@ -497,7 +592,7 @@ mod tests {
assert!(RUN);
}
tx.send(()).unwrap();
- });
+ }));
}
unsafe {
@@ -511,6 +606,10 @@ mod tests {
for _ in 0..10 {
rx.recv().unwrap();
}
+
+ for t in ts {
+ t.join().unwrap();
+ }
}
#[test]
@@ -527,12 +626,16 @@ mod tests {
static INIT: Once<usize> = Once::new();
assert!(INIT.get().is_none());
- thread::spawn(move|| {
- INIT.call_once(|| loop { });
+ let t = thread::spawn(move || {
+ INIT.call_once(|| {
+ thread::sleep(std::time::Duration::from_secs(3));
+ 42
+ });
});
assert!(INIT.get().is_none());
- }
+ t.join().unwrap();
+ }
#[test]
fn poll() {
@@ -543,26 +646,29 @@ mod tests {
assert_eq!(INIT.poll().map(|r| *r), Some(3));
}
-
#[test]
fn wait() {
static INIT: Once<usize> = Once::new();
- std::thread::spawn(|| {
+ let t = std::thread::spawn(|| {
assert_eq!(*INIT.wait(), 3);
assert!(INIT.is_completed());
});
- for _ in 0..4 { thread::yield_now() }
+ for _ in 0..4 {
+ thread::yield_now()
+ }
assert!(INIT.poll().is_none());
INIT.call_once(|| 3);
+
+ t.join().unwrap();
}
#[test]
#[ignore = "Android uses panic_abort"]
fn panic() {
- use ::std::panic;
+ use std::panic;
static INIT: Once = Once::new();
@@ -601,8 +707,11 @@ mod tests {
}
}
+ // This is sort of two test cases, but if we write them as separate test methods
+ // they can be executed concurrently and then fail some small fraction of the
+ // time.
#[test]
- fn drop_occurs() {
+ fn drop_occurs_and_skip_uninit_drop() {
unsafe {
CALLED = false;
}
@@ -612,13 +721,8 @@ mod tests {
once.call_once(|| DropTest {});
}
- assert!(unsafe {
- CALLED
- });
- }
-
- #[test]
- fn skip_uninit_drop() {
+ assert!(unsafe { CALLED });
+ // Now test that we skip drops for the uninitialized case.
unsafe {
CALLED = false;
}
@@ -626,8 +730,35 @@ mod tests {
let once = Once::<DropTest>::new();
drop(once);
- assert!(unsafe {
- !CALLED
- });
+ assert!(unsafe { !CALLED });
+ }
+
+ #[test]
+ fn call_once_test() {
+ for _ in 0..20 {
+ use std::sync::atomic::AtomicUsize;
+ use std::sync::Arc;
+ use std::time::Duration;
+ let share = Arc::new(AtomicUsize::new(0));
+ let once = Arc::new(Once::<_, Spin>::new());
+ let mut hs = Vec::new();
+ for _ in 0..8 {
+ let h = thread::spawn({
+ let share = share.clone();
+ let once = once.clone();
+ move || {
+ thread::sleep(Duration::from_millis(10));
+ once.call_once(|| {
+ share.fetch_add(1, Ordering::SeqCst);
+ });
+ }
+ });
+ hs.push(h);
+ }
+ for h in hs {
+ h.join().unwrap();
+ }
+ assert_eq!(1, share.load(Ordering::SeqCst));
+ }
}
}
diff --git a/src/relax.rs b/src/relax.rs
index 6d9a690..8842f80 100644
--- a/src/relax.rs
+++ b/src/relax.rs
@@ -23,7 +23,10 @@ pub struct Spin;
impl RelaxStrategy for Spin {
#[inline(always)]
fn relax() {
- core::hint::spin_loop();
+ // Use the deprecated spin_loop_hint() so that we don't raise the
+ // MSRV any higher than we need.
+ #[allow(deprecated)]
+ core::sync::atomic::spin_loop_hint();
}
}
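The `RelaxStrategy` trait implemented here is the same extension point the lock types take as their second generic parameter, so downstream code can plug in its own back-off behaviour. A hedged sketch, assuming the trait is open for implementation as its public re-export suggests (the `BusyPause` type is purely illustrative):

```rust
use spin::{Once, RelaxStrategy};

/// Illustrative strategy: emit the architecture's spin hint a few times per retry.
struct BusyPause;

impl RelaxStrategy for BusyPause {
    #[inline(always)]
    fn relax() {
        for _ in 0..4 {
            core::hint::spin_loop();
        }
    }
}

// The strategy is only exercised under contention; this just shows that the
// type slots into the second generic parameter.
static CELL: Once<u32, BusyPause> = Once::new();

fn main() {
    assert_eq!(*CELL.call_once(|| 7), 7);
}
```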
diff --git a/src/rwlock.rs b/src/rwlock.rs
index 28602c9..beae5c1 100644
--- a/src/rwlock.rs
+++ b/src/rwlock.rs
@@ -1,14 +1,17 @@
//! A lock that provides data access to either one writer or many readers.
+use crate::{
+ atomic::{AtomicUsize, Ordering},
+ RelaxStrategy, Spin,
+};
use core::{
cell::UnsafeCell,
- ops::{Deref, DerefMut},
- sync::atomic::{AtomicUsize, Ordering},
- marker::PhantomData,
fmt,
+ marker::PhantomData,
mem,
+ mem::ManuallyDrop,
+ ops::{Deref, DerefMut},
};
-use crate::{RelaxStrategy, Spin};
/// A lock that provides data access to either one writer or many readers.
///
@@ -79,7 +82,7 @@ const WRITER: usize = 1;
/// potentially releasing the lock.
pub struct RwLockReadGuard<'a, T: 'a + ?Sized> {
lock: &'a AtomicUsize,
- data: &'a T,
+ data: *const T,
}
/// A guard that provides mutable data access.
@@ -88,7 +91,7 @@ pub struct RwLockReadGuard<'a, T: 'a + ?Sized> {
pub struct RwLockWriteGuard<'a, T: 'a + ?Sized, R = Spin> {
phantom: PhantomData<R>,
inner: &'a RwLock<T, R>,
- data: &'a mut T,
+ data: *mut T,
}
/// A guard that provides immutable data access but can be upgraded to [`RwLockWriteGuard`].
@@ -101,13 +104,22 @@ pub struct RwLockWriteGuard<'a, T: 'a + ?Sized, R = Spin> {
pub struct RwLockUpgradableGuard<'a, T: 'a + ?Sized, R = Spin> {
phantom: PhantomData<R>,
inner: &'a RwLock<T, R>,
- data: &'a T,
+ data: *const T,
}
// Same unsafe impls as `std::sync::RwLock`
unsafe impl<T: ?Sized + Send, R> Send for RwLock<T, R> {}
unsafe impl<T: ?Sized + Send + Sync, R> Sync for RwLock<T, R> {}
+unsafe impl<T: ?Sized + Send + Sync, R> Send for RwLockWriteGuard<'_, T, R> {}
+unsafe impl<T: ?Sized + Send + Sync, R> Sync for RwLockWriteGuard<'_, T, R> {}
+
+unsafe impl<T: ?Sized + Sync> Send for RwLockReadGuard<'_, T> {}
+unsafe impl<T: ?Sized + Sync> Sync for RwLockReadGuard<'_, T> {}
+
+unsafe impl<T: ?Sized + Send + Sync, R> Send for RwLockUpgradableGuard<'_, T, R> {}
+unsafe impl<T: ?Sized + Send + Sync, R> Sync for RwLockUpgradableGuard<'_, T, R> {}
+
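The reason the explicit `unsafe impl`s above are needed: once the guards store raw pointers instead of references, the automatic `Send`/`Sync` derivation is lost, because raw pointers are neither. A minimal illustration with hypothetical guard types (not the crate's own):

```rust
#![allow(dead_code)]
use std::marker::PhantomData;

// With a reference field, `Send`/`Sync` follow automatically from `T`.
struct RefGuard<'a, T> {
    data: &'a T,
}

// With a raw-pointer field the struct is `!Send + !Sync` by default, and the
// author must opt back in, promising the same guarantees a `&T` would give.
struct PtrGuard<'a, T> {
    data: *const T,
    _lifetime: PhantomData<&'a T>,
}

unsafe impl<'a, T: Sync> Send for PtrGuard<'a, T> {}
unsafe impl<'a, T: Sync> Sync for PtrGuard<'a, T> {}

fn assert_send_sync<G: Send + Sync>() {}

fn main() {
    assert_send_sync::<RefGuard<'static, u32>>();
    // Without the unsafe impls above, this line would not compile.
    assert_send_sync::<PtrGuard<'static, u32>>();
}
```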
impl<T, R> RwLock<T, R> {
/// Creates a new spinlock wrapping the supplied data.
///
@@ -155,7 +167,7 @@ impl<T, R> RwLock<T, R> {
///
/// unsafe {
/// core::mem::forget(lock.write());
- ///
+ ///
/// assert_eq!(lock.as_mut_ptr().read(), 42);
/// lock.as_mut_ptr().write(58);
///
@@ -245,6 +257,21 @@ impl<T: ?Sized, R: RelaxStrategy> RwLock<T, R> {
}
impl<T: ?Sized, R> RwLock<T, R> {
+ // Acquire a read lock, returning the lock value observed just before the new reader was added.
+ fn acquire_reader(&self) -> usize {
+ // An arbitrary cap that allows us to catch overflows long before they happen
+ const MAX_READERS: usize = core::usize::MAX / READER / 2;
+
+ let value = self.lock.fetch_add(READER, Ordering::Acquire);
+
+ if value > MAX_READERS * READER {
+ self.lock.fetch_sub(READER, Ordering::Relaxed);
+ panic!("Too many lock readers, cannot safely proceed");
+ } else {
+ value
+ }
+ }
+
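The helper introduced above caps the reader count at roughly half of the representable range so that a wrapping `fetch_add` can never silently corrupt the writer/upgrade flag bits. A standalone sketch of the same guard pattern on a bare counter (the `READER` bit layout here is an assumption for illustration, mirroring the crate's shifted reader count):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// The reader count lives above the flag bits, so it is incremented in units
// of `READER` (the exact shift is an assumption for this sketch).
const READER: usize = 1 << 2;
const MAX_READERS: usize = usize::MAX / READER / 2;

fn acquire_reader(lock: &AtomicUsize) -> usize {
    // `fetch_add` returns the value observed *before* the increment.
    let value = lock.fetch_add(READER, Ordering::Acquire);
    if value > MAX_READERS * READER {
        // Back out our increment and fail loudly long before the counter
        // could wrap around into the flag bits.
        lock.fetch_sub(READER, Ordering::Relaxed);
        panic!("Too many lock readers, cannot safely proceed");
    }
    value
}

fn main() {
    let lock = AtomicUsize::new(0);
    assert_eq!(acquire_reader(&lock), 0);
    assert_eq!(lock.load(Ordering::Relaxed), READER);
}
```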
/// Attempt to acquire this lock with shared read access.
///
/// This function will never block and will return immediately if `read`
@@ -269,7 +296,7 @@ impl<T: ?Sized, R> RwLock<T, R> {
/// ```
#[inline]
pub fn try_read(&self) -> Option<RwLockReadGuard<T>> {
- let value = self.lock.fetch_add(READER, Ordering::Acquire);
+ let value = self.acquire_reader();
// We check the UPGRADED bit here so that new readers are prevented when an UPGRADED lock is held.
// This helps reduce writer starvation.
@@ -398,18 +425,18 @@ impl<T: ?Sized, R> RwLock<T, R> {
}
}
- /// Returns a mutable reference to the underlying data.
- ///
- /// Since this call borrows the `RwLock` mutably, no actual locking needs to
- /// take place -- the mutable borrow statically guarantees no locks exist.
- ///
- /// # Examples
- ///
- /// ```
- /// let mut lock = spin::RwLock::new(0);
- /// *lock.get_mut() = 10;
- /// assert_eq!(*lock.read(), 10);
- /// ```
+ /// Returns a mutable reference to the underlying data.
+ ///
+ /// Since this call borrows the `RwLock` mutably, no actual locking needs to
+ /// take place -- the mutable borrow statically guarantees no locks exist.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// let mut lock = spin::RwLock::new(0);
+ /// *lock.get_mut() = 10;
+ /// assert_eq!(*lock.read(), 10);
+ /// ```
pub fn get_mut(&mut self) -> &mut T {
// We know statically that there are no other references to `self`, so
// there's no need to lock the inner lock.
@@ -454,8 +481,9 @@ impl<'rwlock, T: ?Sized> RwLockReadGuard<'rwlock, T> {
/// ```
#[inline]
pub fn leak(this: Self) -> &'rwlock T {
- let Self { data, .. } = this;
- data
+ let this = ManuallyDrop::new(this);
+ // Safety: We know statically that only we are referencing data
+ unsafe { &*this.data }
}
}
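A short usage sketch of the `leak` rewritten above, assuming the crate's default `rwlock` feature: leaking a read guard hands back a plain reference for the lock's lifetime, at the cost of holding the read lock forever.

```rust
use spin::{RwLock, RwLockReadGuard};

static DATA: RwLock<u32> = RwLock::new(5);

fn main() {
    // `ManuallyDrop` inside `leak` means the guard's destructor never runs,
    // so the reader count is never decremented again.
    let leaked: &'static u32 = RwLockReadGuard::leak(DATA.read());
    assert_eq!(*leaked, 5);

    // Other readers are unaffected; only writers are locked out from now on.
    assert_eq!(*DATA.read(), 5);
    assert!(DATA.try_write().is_none());
}
```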
@@ -554,7 +582,7 @@ impl<'rwlock, T: ?Sized, R> RwLockUpgradableGuard<'rwlock, T, R> {
/// ```
pub fn downgrade(self) -> RwLockReadGuard<'rwlock, T> {
// Reserve the read guard for ourselves
- self.inner.lock.fetch_add(READER, Ordering::Acquire);
+ self.inner.acquire_reader();
let inner = self.inner;
@@ -580,8 +608,9 @@ impl<'rwlock, T: ?Sized, R> RwLockUpgradableGuard<'rwlock, T, R> {
/// ```
#[inline]
pub fn leak(this: Self) -> &'rwlock T {
- let Self { data, .. } = this;
- data
+ let this = ManuallyDrop::new(this);
+ // Safety: We know statically that only we are referencing data
+ unsafe { &*this.data }
}
}
@@ -613,7 +642,7 @@ impl<'rwlock, T: ?Sized, R> RwLockWriteGuard<'rwlock, T, R> {
#[inline]
pub fn downgrade(self) -> RwLockReadGuard<'rwlock, T> {
// Reserve the read guard for ourselves
- self.inner.lock.fetch_add(READER, Ordering::Acquire);
+ self.inner.acquire_reader();
let inner = self.inner;
@@ -639,7 +668,10 @@ impl<'rwlock, T: ?Sized, R> RwLockWriteGuard<'rwlock, T, R> {
/// ```
#[inline]
pub fn downgrade_to_upgradeable(self) -> RwLockUpgradableGuard<'rwlock, T, R> {
- debug_assert_eq!(self.inner.lock.load(Ordering::Acquire) & (WRITER | UPGRADED), WRITER);
+ debug_assert_eq!(
+ self.inner.lock.load(Ordering::Acquire) & (WRITER | UPGRADED),
+ WRITER
+ );
// Reserve the read guard for ourselves
self.inner.lock.store(UPGRADED, Ordering::Release);
@@ -670,9 +702,9 @@ impl<'rwlock, T: ?Sized, R> RwLockWriteGuard<'rwlock, T, R> {
/// ```
#[inline]
pub fn leak(this: Self) -> &'rwlock mut T {
- let data = this.data as *mut _; // Keep it in pointer form temporarily to avoid double-aliasing
- core::mem::forget(this);
- unsafe { &mut *data }
+ let mut this = ManuallyDrop::new(this);
+ // Safety: We know statically that only we are referencing data
+ unsafe { &mut *this.data }
}
}
@@ -692,7 +724,8 @@ impl<'rwlock, T: ?Sized> Deref for RwLockReadGuard<'rwlock, T> {
type Target = T;
fn deref(&self) -> &T {
- self.data
+ // Safety: We know statically that only we are referencing data
+ unsafe { &*self.data }
}
}
@@ -700,7 +733,8 @@ impl<'rwlock, T: ?Sized, R> Deref for RwLockUpgradableGuard<'rwlock, T, R> {
type Target = T;
fn deref(&self) -> &T {
- self.data
+ // Safety: We know statically that only we are referencing data
+ unsafe { &*self.data }
}
}
@@ -708,13 +742,15 @@ impl<'rwlock, T: ?Sized, R> Deref for RwLockWriteGuard<'rwlock, T, R> {
type Target = T;
fn deref(&self) -> &T {
- self.data
+ // Safety: We know statically that only we are referencing data
+ unsafe { &*self.data }
}
}
impl<'rwlock, T: ?Sized, R> DerefMut for RwLockWriteGuard<'rwlock, T, R> {
fn deref_mut(&mut self) -> &mut T {
- self.data
+ // Safety: We know statically that only we are referencing data
+ unsafe { &mut *self.data }
}
}
@@ -741,7 +777,9 @@ impl<'rwlock, T: ?Sized, R> Drop for RwLockWriteGuard<'rwlock, T, R> {
// Writer is responsible for clearing both WRITER and UPGRADED bits.
// The UPGRADED bit may be set if an upgradeable lock attempts an upgrade while this lock is held.
- self.inner.lock.fetch_and(!(WRITER | UPGRADED), Ordering::Release);
+ self.inner
+ .lock
+ .fetch_and(!(WRITER | UPGRADED), Ordering::Release);
}
}
@@ -825,7 +863,9 @@ unsafe impl<R: RelaxStrategy> lock_api_crate::RawRwLockUpgrade for RwLock<(), R>
#[inline(always)]
fn try_lock_upgradable(&self) -> bool {
// Prevent guard destructor running
- self.try_upgradeable_read().map(|g| core::mem::forget(g)).is_some()
+ self.try_upgradeable_read()
+ .map(|g| core::mem::forget(g))
+ .is_some()
}
#[inline(always)]
@@ -854,7 +894,10 @@ unsafe impl<R: RelaxStrategy> lock_api_crate::RawRwLockUpgrade for RwLock<(), R>
data: &(),
phantom: PhantomData,
};
- tmp_guard.try_upgrade().map(|g| core::mem::forget(g)).is_ok()
+ tmp_guard
+ .try_upgrade()
+ .map(|g| core::mem::forget(g))
+ .is_ok()
}
}
@@ -947,7 +990,7 @@ mod tests {
let arc2 = arc.clone();
let (tx, rx) = channel();
- thread::spawn(move || {
+ let t = thread::spawn(move || {
let mut lock = arc2.write();
for _ in 0..10 {
let tmp = *lock;
@@ -977,6 +1020,8 @@ mod tests {
rx.recv().unwrap();
let lock = arc.read();
assert_eq!(*lock, 10);
+
+ assert!(t.join().is_ok());
}
#[test]