aboutsummaryrefslogtreecommitdiff
path: root/pw_async_basic/dispatcher.cc
diff options
context:
space:
mode:
Diffstat (limited to 'pw_async_basic/dispatcher.cc')
-rw-r--r--pw_async_basic/dispatcher.cc61
1 files changed, 17 insertions, 44 deletions
diff --git a/pw_async_basic/dispatcher.cc b/pw_async_basic/dispatcher.cc
index 682ff4f6d..4b51956a1 100644
--- a/pw_async_basic/dispatcher.cc
+++ b/pw_async_basic/dispatcher.cc
@@ -13,16 +13,14 @@
// the License.
#include "pw_async_basic/dispatcher.h"
-#include "pw_assert/check.h"
+#include <mutex>
+
#include "pw_chrono/system_clock.h"
-#include "pw_log/log.h"
using namespace std::chrono_literals;
namespace pw::async {
-const chrono::SystemClock::duration SLEEP_DURATION = 5s;
-
BasicDispatcher::~BasicDispatcher() {
RequestStop();
lock_.lock();
@@ -70,13 +68,16 @@ void BasicDispatcher::MaybeSleep() {
// Sleep until a notification is received or until the due time of the
// next task. Notifications are sent when tasks are posted or 'stop' is
// requested.
- chrono::SystemClock::time_point wake_time =
- task_queue_.empty() ? now() + SLEEP_DURATION
- : task_queue_.front().due_time_;
-
+ std::optional<chrono::SystemClock::time_point> wake_time = std::nullopt;
+ if (!task_queue_.empty()) {
+ wake_time = task_queue_.front().due_time_;
+ }
lock_.unlock();
- PW_LOG_DEBUG("no task due; waiting for signal");
- timed_notification_.try_acquire_until(wake_time);
+ if (wake_time.has_value()) {
+ timed_notification_.try_acquire_until(*wake_time);
+ } else {
+ timed_notification_.acquire();
+ }
lock_.lock();
}
}
@@ -87,12 +88,7 @@ void BasicDispatcher::ExecuteDueTasks() {
backend::NativeTask& task = task_queue_.front();
task_queue_.pop_front();
- if (task.interval().has_value()) {
- PostTaskInternal(task, task.due_time_ + task.interval().value());
- }
-
lock_.unlock();
- PW_LOG_DEBUG("running task");
Context ctx{this, &task.task_};
task(ctx, OkStatus());
lock_.lock();
@@ -100,52 +96,27 @@ void BasicDispatcher::ExecuteDueTasks() {
}
void BasicDispatcher::RequestStop() {
- std::lock_guard lock(lock_);
- PW_LOG_DEBUG("stop requested");
- stop_requested_ = true;
+ {
+ std::lock_guard lock(lock_);
+ stop_requested_ = true;
+ }
timed_notification_.release();
}
void BasicDispatcher::DrainTaskQueue() {
- PW_LOG_DEBUG("draining task queue");
while (!task_queue_.empty()) {
backend::NativeTask& task = task_queue_.front();
task_queue_.pop_front();
lock_.unlock();
- PW_LOG_DEBUG("running cancelled task");
Context ctx{this, &task.task_};
task(ctx, Status::Cancelled());
lock_.lock();
}
}
-void BasicDispatcher::Post(Task& task) { PostAt(task, now()); }
-
-void BasicDispatcher::PostAfter(Task& task,
- chrono::SystemClock::duration delay) {
- PostAt(task, now() + delay);
-}
-
void BasicDispatcher::PostAt(Task& task, chrono::SystemClock::time_point time) {
- lock_.lock();
- PW_LOG_DEBUG("posting task");
PostTaskInternal(task.native_type(), time);
- lock_.unlock();
-}
-
-void BasicDispatcher::PostPeriodic(Task& task,
- chrono::SystemClock::duration interval) {
- PostPeriodicAt(task, interval, now());
-}
-
-void BasicDispatcher::PostPeriodicAt(
- Task& task,
- chrono::SystemClock::duration interval,
- chrono::SystemClock::time_point start_time) {
- PW_DCHECK(interval != chrono::SystemClock::duration::zero());
- task.native_type().set_interval(interval);
- PostAt(task, start_time);
}
bool BasicDispatcher::Cancel(Task& task) {
@@ -155,6 +126,7 @@ bool BasicDispatcher::Cancel(Task& task) {
void BasicDispatcher::PostTaskInternal(
backend::NativeTask& task, chrono::SystemClock::time_point time_due) {
+ lock_.lock();
task.due_time_ = time_due;
auto it_front = task_queue_.begin();
auto it_behind = task_queue_.before_begin();
@@ -163,6 +135,7 @@ void BasicDispatcher::PostTaskInternal(
++it_behind;
}
task_queue_.insert_after(it_behind, task);
+ lock_.unlock();
timed_notification_.release();
}