summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIgor Murashkin <iam@google.com>2019-02-11 15:37:26 -0800
committerKirk Shoop <kirk.shoop@gmail.com>2019-02-12 17:53:44 -0800
commitaac2fc97bc5fe680446afb5ae81bef0a9c0fbf8a (patch)
tree81bf553d46f17e9d81b32f9885cb95437fe40756
parentaee39b961ea5dd4b1fe3a7b7090a2bd7e13300fe (diff)
downloadRxCpp-aac2fc97bc5fe680446afb5ae81bef0a9c0fbf8a.tar.gz
rxcpp: Fix data race in composite_subscription
composite_subscription_inner had missing checks which could lead to add/remove/clear racing against unsubscribe. (See the issue for more details). Fixes: #475
-rw-r--r--Rx/v2/src/rxcpp/rx-subscription.hpp109
1 files changed, 102 insertions, 7 deletions
diff --git a/Rx/v2/src/rxcpp/rx-subscription.hpp b/Rx/v2/src/rxcpp/rx-subscription.hpp
index ee4e53e..2d6bcb6 100644
--- a/Rx/v2/src/rxcpp/rx-subscription.hpp
+++ b/Rx/v2/src/rxcpp/rx-subscription.hpp
@@ -117,6 +117,14 @@ private:
std::terminate();
}
}
+
+ explicit subscription(std::shared_ptr<base_subscription_state> s)
+ : state(std::move(s))
+ {
+ if (!state) {
+ std::terminate();
+ }
+ }
public:
subscription()
@@ -178,9 +186,23 @@ public:
weak_state_type get_weak() {
return state;
}
+
+ // Atomically promote weak subscription to strong.
+ // Calls std::terminate if w has already expired.
static subscription lock(weak_state_type w) {
return subscription(w);
}
+
+ // Atomically try to promote weak subscription to strong.
+ // Returns an empty maybe<> if w has already expired.
+ static rxu::maybe<subscription> maybe_lock(weak_state_type w) {
+ auto strong_subscription = w.lock();
+ if (!strong_subscription) {
+ return rxu::detail::maybe<subscription>{};
+ } else {
+ return rxu::detail::maybe<subscription>{subscription{std::move(strong_subscription)}};
+ }
+ }
};
inline bool operator<(const subscription& lhs, const subscription& rhs) {
@@ -223,8 +245,14 @@ private:
typedef subscription::weak_state_type weak_subscription;
struct composite_subscription_state : public std::enable_shared_from_this<composite_subscription_state>
{
+ // invariant: cannot access this data without the lock held.
std::set<subscription> subscriptions;
+ // double checked locking:
+ // issubscribed must be loaded again after each lock acquisition.
+ // invariant:
+ // never call subscription::unsubscribe with lock held.
std::mutex lock;
+ // invariant: transitions from 'true' to 'false' exactly once, at any time.
std::atomic<bool> issubscribed;
~composite_subscription_state()
@@ -242,29 +270,78 @@ private:
{
}
+ // Atomically add 's' to the set of subscriptions.
+ //
+ // If unsubscribe() has already occurred, this immediately
+ // calls s.unsubscribe().
+ //
+ // cs.unsubscribe() [must] happens-before s.unsubscribe()
+ //
+ // Due to the un-atomic nature of calling 's.unsubscribe()',
+ // it is possible to observe the unintuitive
+ // add(s)=>s.unsubscribe() prior
+ // to any of the unsubscribe()=>sN.unsubscribe().
inline weak_subscription add(subscription s) {
- if (!issubscribed) {
+ if (!issubscribed) { // load.acq [seq_cst]
s.unsubscribe();
} else if (s.is_subscribed()) {
std::unique_lock<decltype(lock)> guard(lock);
- subscriptions.insert(s);
+ if (!issubscribed) { // load.acq [seq_cst]
+ // unsubscribe was called concurrently.
+ guard.unlock();
+ // invariant: do not call unsubscribe with lock held.
+ s.unsubscribe();
+ } else {
+ subscriptions.insert(s);
+ }
}
return s.get_weak();
}
+ // Atomically remove 'w' from the set of subscriptions.
+ //
+ // This does nothing if 'w' was already previously removed,
+ // or refers to an expired value.
inline void remove(weak_subscription w) {
- if (issubscribed && !w.expired()) {
- auto s = subscription::lock(w);
+ if (issubscribed) { // load.acq [seq_cst]
+ rxu::maybe<subscription> maybe_subscription = subscription::maybe_lock(w);
+
+ if (maybe_subscription.empty()) {
+ // Do nothing if the subscription has already expired.
+ return;
+ }
+
std::unique_lock<decltype(lock)> guard(lock);
- subscriptions.erase(std::move(s));
+ // invariant: subscriptions must be accessed under the lock.
+
+ if (issubscribed) { // load.acq [seq_cst]
+ subscription& s = maybe_subscription.get();
+ subscriptions.erase(std::move(s));
+ } // else unsubscribe() was called concurrently; this becomes a no-op.
}
}
+ // Atomically clear all subscriptions that were observably added
+ // (and not subsequently observably removed).
+ //
+ // Un-atomically call unsubscribe on those subscriptions.
+ //
+ // forall subscriptions in {add(s1),add(s2),...}
+ // - {remove(s3), remove(s4), ...}:
+ // cs.unsubscribe() || cs.clear() happens before s.unsubscribe()
+ //
+ // cs.unsubscribe() observed-before cs.clear ==> do nothing.
inline void clear() {
- if (issubscribed) {
+ if (issubscribed) { // load.acq [seq_cst]
std::unique_lock<decltype(lock)> guard(lock);
+ if (!issubscribed) { // load.acq [seq_cst]
+ // unsubscribe was called concurrently.
+ return;
+ }
+
std::set<subscription> v(std::move(subscriptions));
+ // invariant: do not call unsubscribe with lock held.
guard.unlock();
std::for_each(v.begin(), v.end(),
[](const subscription& s) {
@@ -272,11 +349,29 @@ private:
}
}
+ // Atomically clear all subscriptions that were observably added
+ // (and not subsequently observably removed).
+ //
+ // Un-atomically call unsubscribe on those subscriptions.
+ //
+ // Switches to an 'unsubscribed' state, all subsequent
+ // adds are immediately unsubscribed.
+ //
+ // cs.unsubscribe() [must] happens-before
+ // cs.add(s) ==> s.unsubscribe()
+ //
+ // forall subscriptions in {add(s1),add(s2),...}
+ // - {remove(s3), remove(s4), ...}:
+ // cs.unsubscribe() || cs.clear() happens before s.unsubscribe()
inline void unsubscribe() {
- if (issubscribed.exchange(false)) {
+ if (issubscribed.exchange(false)) { // cas.acq_rel [seq_cst]
std::unique_lock<decltype(lock)> guard(lock);
+ // is_subscribed can only transition to 'false' once,
+ // does not need an extra atomic access here.
+
std::set<subscription> v(std::move(subscriptions));
+ // invariant: do not call unsubscribe with lock held.
guard.unlock();
std::for_each(v.begin(), v.end(),
[](const subscription& s) {