diff options
Diffstat (limited to 'pw_async_basic/dispatcher.cc')
-rw-r--r-- | pw_async_basic/dispatcher.cc | 61 |
1 files changed, 17 insertions, 44 deletions
diff --git a/pw_async_basic/dispatcher.cc b/pw_async_basic/dispatcher.cc index 682ff4f6d..4b51956a1 100644 --- a/pw_async_basic/dispatcher.cc +++ b/pw_async_basic/dispatcher.cc @@ -13,16 +13,14 @@ // the License. #include "pw_async_basic/dispatcher.h" -#include "pw_assert/check.h" +#include <mutex> + #include "pw_chrono/system_clock.h" -#include "pw_log/log.h" using namespace std::chrono_literals; namespace pw::async { -const chrono::SystemClock::duration SLEEP_DURATION = 5s; - BasicDispatcher::~BasicDispatcher() { RequestStop(); lock_.lock(); @@ -70,13 +68,16 @@ void BasicDispatcher::MaybeSleep() { // Sleep until a notification is received or until the due time of the // next task. Notifications are sent when tasks are posted or 'stop' is // requested. - chrono::SystemClock::time_point wake_time = - task_queue_.empty() ? now() + SLEEP_DURATION - : task_queue_.front().due_time_; - + std::optional<chrono::SystemClock::time_point> wake_time = std::nullopt; + if (!task_queue_.empty()) { + wake_time = task_queue_.front().due_time_; + } lock_.unlock(); - PW_LOG_DEBUG("no task due; waiting for signal"); - timed_notification_.try_acquire_until(wake_time); + if (wake_time.has_value()) { + timed_notification_.try_acquire_until(*wake_time); + } else { + timed_notification_.acquire(); + } lock_.lock(); } } @@ -87,12 +88,7 @@ void BasicDispatcher::ExecuteDueTasks() { backend::NativeTask& task = task_queue_.front(); task_queue_.pop_front(); - if (task.interval().has_value()) { - PostTaskInternal(task, task.due_time_ + task.interval().value()); - } - lock_.unlock(); - PW_LOG_DEBUG("running task"); Context ctx{this, &task.task_}; task(ctx, OkStatus()); lock_.lock(); @@ -100,52 +96,27 @@ void BasicDispatcher::ExecuteDueTasks() { } void BasicDispatcher::RequestStop() { - std::lock_guard lock(lock_); - PW_LOG_DEBUG("stop requested"); - stop_requested_ = true; + { + std::lock_guard lock(lock_); + stop_requested_ = true; + } timed_notification_.release(); } void BasicDispatcher::DrainTaskQueue() { - PW_LOG_DEBUG("draining task queue"); while (!task_queue_.empty()) { backend::NativeTask& task = task_queue_.front(); task_queue_.pop_front(); lock_.unlock(); - PW_LOG_DEBUG("running cancelled task"); Context ctx{this, &task.task_}; task(ctx, Status::Cancelled()); lock_.lock(); } } -void BasicDispatcher::Post(Task& task) { PostAt(task, now()); } - -void BasicDispatcher::PostAfter(Task& task, - chrono::SystemClock::duration delay) { - PostAt(task, now() + delay); -} - void BasicDispatcher::PostAt(Task& task, chrono::SystemClock::time_point time) { - lock_.lock(); - PW_LOG_DEBUG("posting task"); PostTaskInternal(task.native_type(), time); - lock_.unlock(); -} - -void BasicDispatcher::PostPeriodic(Task& task, - chrono::SystemClock::duration interval) { - PostPeriodicAt(task, interval, now()); -} - -void BasicDispatcher::PostPeriodicAt( - Task& task, - chrono::SystemClock::duration interval, - chrono::SystemClock::time_point start_time) { - PW_DCHECK(interval != chrono::SystemClock::duration::zero()); - task.native_type().set_interval(interval); - PostAt(task, start_time); } bool BasicDispatcher::Cancel(Task& task) { @@ -155,6 +126,7 @@ bool BasicDispatcher::Cancel(Task& task) { void BasicDispatcher::PostTaskInternal( backend::NativeTask& task, chrono::SystemClock::time_point time_due) { + lock_.lock(); task.due_time_ = time_due; auto it_front = task_queue_.begin(); auto it_behind = task_queue_.before_begin(); @@ -163,6 +135,7 @@ void BasicDispatcher::PostTaskInternal( ++it_behind; } task_queue_.insert_after(it_behind, task); + lock_.unlock(); timed_notification_.release(); } |