diff options
Diffstat (limited to 'pw_sync/condition_variable_test.cc')
-rw-r--r-- | pw_sync/condition_variable_test.cc | 356 |
1 files changed, 356 insertions, 0 deletions
diff --git a/pw_sync/condition_variable_test.cc b/pw_sync/condition_variable_test.cc new file mode 100644 index 000000000..c0ddc4df0 --- /dev/null +++ b/pw_sync/condition_variable_test.cc @@ -0,0 +1,356 @@ +// Copyright 2022 The Pigweed Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not +// use this file except in compliance with the License. You may obtain a copy of +// the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations under +// the License. + +#include "pw_sync/condition_variable.h" + +#include <chrono> +#include <functional> + +#include "gtest/gtest.h" +#include "pw_containers/vector.h" +#include "pw_sync/mutex.h" +#include "pw_sync/timed_thread_notification.h" +#include "pw_thread/sleep.h" +#include "pw_thread/test_threads.h" +#include "pw_thread/thread.h" + +namespace pw::sync { +namespace { + +using namespace std::chrono_literals; + +// A timeout for tests where successful behaviour involves waiting. +constexpr auto kRequiredTimeout = 100ms; + +// Maximum extra wait time allowed for test that ensure something waits for +// `kRequiredTimeout`. +const auto kAllowedSlack = kRequiredTimeout * 1.5; + +// A timeout that should only be hit if something goes wrong. +constexpr auto kFailureTimeout = 5s; + +using StateLock = std::unique_lock<Mutex>; + +struct ThreadInfo { + explicit ThreadInfo(int id) : thread_id(id) {} + + // waiting_notifier is signalled in predicates to indicate that the predicate + // has been evaluated. This guarantees (via insider information) that the + // thread will acquire the internal ThreadNotification. + TimedThreadNotification waiting_notifier; + + // Signals when the worker thread is done. + TimedThreadNotification done_notifier; + + // The result of the predicate the worker thread uses with wait*(). Set from + // the main test thread and read by the worker thread. + bool predicate_result = false; + + // Stores the result of ConditionVariable::wait_for() or ::wait_until() for + // use in test asserts. + bool wait_result = false; + + // For use in recording the order in which threads block on a condition. + const int thread_id; + + // Returns a function which will return the current value of + //`predicate_result` and release `waiting_notifier`. + std::function<bool()> Predicate() { + return [this]() { + bool result = this->predicate_result; + this->waiting_notifier.release(); + return result; + }; + } +}; + +// A `ThreadCore` implementation that delegates to an `std::function`. +class LambdaThreadCore : public pw::thread::ThreadCore { + public: + explicit LambdaThreadCore(std::function<void()> work) + : work_(std::move(work)) {} + + private: + void Run() override { work_(); } + + std::function<void()> work_; +}; + +class LambdaThread { + public: + // Starts a new thread which runs `work`, joining the thread on destruction. + explicit LambdaThread( + std::function<void()> work, + pw::thread::Options options = pw::thread::test::TestOptionsThread0()) + : thread_core_(std::move(work)), thread_(options, thread_core_) {} + ~LambdaThread() { thread_.join(); } + LambdaThread(const LambdaThread&) = delete; + LambdaThread(LambdaThread&&) = delete; + LambdaThread& operator=(const LambdaThread&) = delete; + LambdaThread&& operator=(LambdaThread&&) = delete; + + private: + LambdaThreadCore thread_core_; + pw::thread::Thread thread_; +}; + +TEST(Wait, PredicateTrueNoWait) { + Mutex mutex; + ConditionVariable condvar; + ThreadInfo thread_info(0); + + LambdaThread thread([&mutex, &condvar, &info = thread_info] { + StateLock l{mutex}; + condvar.wait(l, [] { return true; }); + + info.done_notifier.release(); + }); + EXPECT_TRUE(thread_info.done_notifier.try_acquire_for(kFailureTimeout)); +} + +TEST(NotifyOne, BlocksUntilSignaled) { + Mutex mutex; + ConditionVariable condvar; + ThreadInfo thread_info(0); + + LambdaThread thread([&mutex, &condvar, &info = thread_info] { + StateLock l{mutex}; + condvar.wait(l, info.Predicate()); + info.done_notifier.release(); + }); + ASSERT_TRUE(thread_info.waiting_notifier.try_acquire_for(kFailureTimeout)); + { + StateLock l{mutex}; + thread_info.predicate_result = true; + } + condvar.notify_one(); + ASSERT_TRUE(thread_info.done_notifier.try_acquire_for(kFailureTimeout)); +} + +TEST(NotifyOne, UnblocksOne) { + Mutex mutex; + ConditionVariable condvar; + std::array<ThreadInfo, 2> thread_info = {ThreadInfo(0), ThreadInfo(1)}; + pw::Vector<int, 2> wait_order; + + LambdaThread thread_1( + [&mutex, &condvar, &info = thread_info[0], &wait_order] { + StateLock l{mutex}; + auto predicate = [&info, &wait_order] { + wait_order.push_back(info.thread_id); + auto result = info.predicate_result; + info.waiting_notifier.release(); + return result; + }; + condvar.wait(l, predicate); + info.done_notifier.release(); + }, + pw::thread::test::TestOptionsThread0()); + LambdaThread thread_2( + [&mutex, &condvar, &info = thread_info[1], &wait_order] { + StateLock l{mutex}; + auto predicate = [&info, &wait_order] { + wait_order.push_back(info.thread_id); + auto result = info.predicate_result; + info.waiting_notifier.release(); + return result; + }; + condvar.wait(l, predicate); + info.done_notifier.release(); + }, + pw::thread::test::TestOptionsThread1()); + + ASSERT_TRUE(thread_info[0].waiting_notifier.try_acquire_for(kFailureTimeout)); + ASSERT_TRUE(thread_info[1].waiting_notifier.try_acquire_for(kFailureTimeout)); + + { + StateLock l{mutex}; + thread_info[1].predicate_result = true; + thread_info[0].predicate_result = true; + } + condvar.notify_one(); + ASSERT_TRUE(thread_info[wait_order[0]].done_notifier.try_acquire_for( + kFailureTimeout)); + ASSERT_FALSE(thread_info[wait_order[0]].done_notifier.try_acquire()); + condvar.notify_one(); + ASSERT_TRUE(thread_info[wait_order[1]].done_notifier.try_acquire_for( + kFailureTimeout)); +} + +TEST(NotifyAll, UnblocksMultiple) { + Mutex mutex; + ConditionVariable condvar; + std::array<ThreadInfo, 2> thread_info = {ThreadInfo(0), ThreadInfo(1)}; + + LambdaThread thread_1( + [&mutex, &condvar, &info = thread_info[0]] { + StateLock l{mutex}; + condvar.wait(l, info.Predicate()); + info.done_notifier.release(); + }, + pw::thread::test::TestOptionsThread0()); + LambdaThread thread_2( + [&mutex, &condvar, &info = thread_info[1]] { + StateLock l{mutex}; + condvar.wait(l, info.Predicate()); + info.done_notifier.release(); + }, + pw::thread::test::TestOptionsThread1()); + + ASSERT_TRUE(thread_info[0].waiting_notifier.try_acquire_for(kFailureTimeout)); + ASSERT_TRUE(thread_info[1].waiting_notifier.try_acquire_for(kFailureTimeout)); + { + StateLock l{mutex}; + thread_info[0].predicate_result = true; + thread_info[1].predicate_result = true; + } + condvar.notify_all(); + ASSERT_TRUE(thread_info[0].done_notifier.try_acquire_for(kFailureTimeout)); + ASSERT_TRUE(thread_info[1].done_notifier.try_acquire_for(kFailureTimeout)); +} + +TEST(WaitFor, ReturnsTrueIfSignalled) { + Mutex mutex; + ConditionVariable condvar; + ThreadInfo thread_info(0); + + LambdaThread thread([&mutex, &condvar, &info = thread_info] { + StateLock l{mutex}; + info.wait_result = condvar.wait_for(l, kFailureTimeout, info.Predicate()); + info.done_notifier.release(); + }); + + ASSERT_TRUE(thread_info.waiting_notifier.try_acquire_for(kFailureTimeout)); + { + StateLock l{mutex}; + thread_info.predicate_result = true; + } + condvar.notify_one(); + ASSERT_TRUE(thread_info.done_notifier.try_acquire_for(kFailureTimeout)); + ASSERT_TRUE(thread_info.wait_result); +} + +TEST(WaitFor, ReturnsFalseIfTimesOut) { + Mutex mutex; + ConditionVariable condvar; + ThreadInfo thread_info(0); + + LambdaThread thread([&mutex, &condvar, &info = thread_info] { + StateLock l{mutex}; + info.wait_result = condvar.wait_for(l, 0ms, info.Predicate()); + info.done_notifier.release(); + }); + + ASSERT_TRUE(thread_info.waiting_notifier.try_acquire_for(kFailureTimeout)); + ASSERT_TRUE(thread_info.done_notifier.try_acquire_for(kFailureTimeout)); + ASSERT_FALSE(thread_info.wait_result); +} + +// NOTE: This test waits even in successful circumstances. +TEST(WaitFor, TimeoutApproximatelyCorrect) { + Mutex mutex; + ConditionVariable condvar; + ThreadInfo thread_info(0); + pw::chrono::SystemClock::duration wait_duration{}; + + LambdaThread thread([&mutex, &condvar, &info = thread_info, &wait_duration] { + StateLock l{mutex}; + auto start = pw::chrono::SystemClock::now(); + info.wait_result = condvar.wait_for(l, kRequiredTimeout, info.Predicate()); + wait_duration = pw::chrono::SystemClock::now() - start; + info.done_notifier.release(); + }); + + ASSERT_TRUE(thread_info.waiting_notifier.try_acquire_for(kFailureTimeout)); + // Wake up thread multiple times. Make sure the timeout is observed. + for (int i = 0; i < 5; ++i) { + condvar.notify_one(); + pw::this_thread::sleep_for(kRequiredTimeout / 6); + } + ASSERT_TRUE(thread_info.done_notifier.try_acquire_for(kFailureTimeout)); + EXPECT_FALSE(thread_info.wait_result); + EXPECT_GE(wait_duration, kRequiredTimeout); + EXPECT_LT(wait_duration, (kRequiredTimeout + kAllowedSlack)); +} + +TEST(WaitUntil, ReturnsTrueIfSignalled) { + Mutex mutex; + ConditionVariable condvar; + ThreadInfo thread_info(0); + + LambdaThread thread([&mutex, &condvar, &info = thread_info] { + StateLock l{mutex}; + info.wait_result = condvar.wait_until( + l, pw::chrono::SystemClock::now() + kRequiredTimeout, info.Predicate()); + info.done_notifier.release(); + }); + + ASSERT_TRUE(thread_info.waiting_notifier.try_acquire_for(kFailureTimeout)); + { + StateLock l{mutex}; + thread_info.predicate_result = true; + } + condvar.notify_one(); + ASSERT_TRUE(thread_info.done_notifier.try_acquire_for(kFailureTimeout)); + ASSERT_TRUE(thread_info.wait_result); +} + +// NOTE: This test waits even in successful circumstances. +TEST(WaitUntil, ReturnsFalseIfTimesOut) { + Mutex mutex; + ConditionVariable condvar; + ThreadInfo thread_info(0); + + LambdaThread thread([&mutex, &condvar, &info = thread_info] { + StateLock l{mutex}; + info.wait_result = condvar.wait_until( + l, pw::chrono::SystemClock::now() + kRequiredTimeout, info.Predicate()); + info.done_notifier.release(); + }); + + ASSERT_TRUE(thread_info.waiting_notifier.try_acquire_for(kFailureTimeout)); + ASSERT_TRUE(thread_info.done_notifier.try_acquire_for(kFailureTimeout)); + ASSERT_FALSE(thread_info.wait_result); +} + +// NOTE: This test waits even in successful circumstances. +TEST(WaitUntil, TimeoutApproximatelyCorrect) { + Mutex mutex; + ConditionVariable condvar; + ThreadInfo thread_info(0); + pw::chrono::SystemClock::duration wait_duration{}; + + LambdaThread thread([&mutex, &condvar, &info = thread_info, &wait_duration] { + StateLock l{mutex}; + auto start = pw::chrono::SystemClock::now(); + info.wait_result = condvar.wait_until( + l, pw::chrono::SystemClock::now() + kRequiredTimeout, info.Predicate()); + wait_duration = pw::chrono::SystemClock::now() - start; + info.done_notifier.release(); + }); + + ASSERT_TRUE(thread_info.waiting_notifier.try_acquire_for(kFailureTimeout)); + // Wake up thread multiple times. Make sure the timeout is observed. + for (int i = 0; i < 5; ++i) { + condvar.notify_one(); + pw::this_thread::sleep_for(kRequiredTimeout / 6); + } + ASSERT_TRUE(thread_info.done_notifier.try_acquire_for(kFailureTimeout)); + ASSERT_FALSE(thread_info.wait_result); + ASSERT_GE(wait_duration, kRequiredTimeout); + ASSERT_LE(wait_duration, kRequiredTimeout + kAllowedSlack); +} + +} // namespace +} // namespace pw::sync |