aboutsummaryrefslogtreecommitdiff
path: root/pw_work_queue/public/pw_work_queue/work_queue.h
diff options
context:
space:
mode:
Diffstat (limited to 'pw_work_queue/public/pw_work_queue/work_queue.h')
-rw-r--r--pw_work_queue/public/pw_work_queue/work_queue.h98
1 files changed, 60 insertions, 38 deletions
diff --git a/pw_work_queue/public/pw_work_queue/work_queue.h b/pw_work_queue/public/pw_work_queue/work_queue.h
index 7afb74e02..de1fe25d9 100644
--- a/pw_work_queue/public/pw_work_queue/work_queue.h
+++ b/pw_work_queue/public/pw_work_queue/work_queue.h
@@ -17,6 +17,7 @@
#include <array>
#include <cstdint>
+#include "pw_containers/inline_queue.h"
#include "pw_function/function.h"
#include "pw_metric/metric.h"
#include "pw_span/span.h"
@@ -25,51 +26,75 @@
#include "pw_sync/lock_annotations.h"
#include "pw_sync/thread_notification.h"
#include "pw_thread/thread_core.h"
-#include "pw_work_queue/internal/circular_buffer.h"
namespace pw::work_queue {
using WorkItem = Function<void()>;
-// The WorkQueue class enables threads and interrupts to enqueue work as a
-// pw::work_queue::WorkItem for execution by the work queue.
-//
-// The entire API is thread and interrupt safe.
+/// Enables threads and interrupts to enqueue work as a
+/// `pw::work_queue::WorkItem` for execution by the work queue.
+///
+/// **Queue sizing**: The number of outstanding work requests is limited
+/// based on the internal queue size. The queue size is set through either
+/// the size of the `queue_storage` buffer passed into the constructor or by
+/// using the templated `pw::work_queue::WorkQueueWithBuffer` helper. When the
+/// queue is full, the queue will not accept further work.
+///
+/// **Cooperative thread cancellation**: The class is a
+/// `pw::thread::ThreadCore`, meaning it should be executed as a single thread.
+/// To facilitate clean shutdown, it provides a `RequestStop()` method for
+/// cooperative cancellation which should be invoked before joining the thread.
+/// Once a stop has been requested the queue will no longer accept further work.
+///
+/// The entire API is thread-safe and interrupt-safe.
class WorkQueue : public thread::ThreadCore {
public:
- // Note: the ThreadNotification prevents this from being constexpr.
- explicit WorkQueue(span<WorkItem> queue_storage)
- : stop_requested_(false), circular_buffer_(queue_storage) {}
+ /// @param[in] queue The work entries to enqueue.
+ ///
+ /// @param[in] queue_capacity The internal queue size which limits the number
+ /// of outstanding work requests.
+ ///
+ /// @note The `ThreadNotification` prevents this from being `constexpr`.
+ WorkQueue(InlineQueue<WorkItem>& queue, size_t queue_capacity)
+ : stop_requested_(false), queue_(queue) {
+ min_queue_remaining_.Set(static_cast<uint32_t>(queue_capacity));
+ }
- // Enqueues a work_item for execution by the work queue thread.
- //
- // Returns:
- // Ok - Success, entry was enqueued for execution.
- // FailedPrecondition - the work queue is shutting down, entries are no
- // longer permitted.
- // ResourceExhausted - internal work queue is full, entry was not enqueued.
+ /// Enqueues a `work_item` for execution by the work queue thread.
+ ///
+ /// @param[in] work_item The entry to enqueue.
+ ///
+ /// @returns
+ /// * @pw_status{OK} - Success. Entry was enqueued for execution.
+ /// * @pw_status{FAILED_PRECONDITION} - The work queue is shutting down.
+ /// Entries are no longer permitted.
+ /// * @pw_status{RESOURCE_EXHAUSTED} - Internal work queue is full.
+ /// Entry was not enqueued.
Status PushWork(WorkItem&& work_item) PW_LOCKS_EXCLUDED(lock_) {
return InternalPushWork(std::move(work_item));
}
- // Queue work for execution. Crash if the work cannot be queued due to a
- // full queue or a stopped worker thread.
- //
- // This call is recommended where possible since it saves error handling code
- // at the callsite; and in many practical cases, it is a bug if the work
- // queue is full (and so a crash is useful to detect the problem).
- //
- // Precondition: The queue must not overflow, i.e. be full.
- // Precondition: The queue must not have been requested to stop, i.e. it must
- // not be in the process of shutting down.
+ /// Queues work for execution. Crashes if the work cannot be queued due to a
+ /// full queue or a stopped worker thread.
+ ///
+ /// This call is recommended where possible since it saves error handling code
+ /// at the callsite; and in many practical cases, it is a bug if the work
+ /// queue is full (and so a crash is useful to detect the problem).
+ ///
+ /// @param[in] work_item The entry to enqueue.
+ ///
+ /// @pre
+ /// * The queue must not overflow, i.e. be full.
+ /// * The queue must not have been requested to stop, i.e. it must
+ /// not be in the process of shutting down.
void CheckPushWork(WorkItem&& work_item) PW_LOCKS_EXCLUDED(lock_);
- // Locks the queue to prevent further work enqueing, finishes outstanding
- // work, then shuts down the worker thread.
- //
- // The WorkQueue cannot be resumed after stopping as the ThreadCore thread
- // returns and may be joined. It must be reconstructed for re-use after
- // the thread has been joined.
+ /// Locks the queue to prevent further work enqueing, finishes outstanding
+ /// work, then shuts down the worker thread.
+ ///
+ /// The `WorkQueue` cannot be resumed after stopping because the `ThreadCore`
+ /// thread returns and may be joined. The `WorkQueue` must be reconstructed
+ /// for re-use after the thread has been joined.
void RequestStop() PW_LOCKS_EXCLUDED(lock_);
private:
@@ -78,7 +103,7 @@ class WorkQueue : public thread::ThreadCore {
sync::InterruptSpinLock lock_;
bool stop_requested_ PW_GUARDED_BY(lock_);
- internal::CircularBuffer<WorkItem> circular_buffer_ PW_GUARDED_BY(lock_);
+ InlineQueue<WorkItem>& queue_ PW_GUARDED_BY(lock_);
sync::ThreadNotification work_notification_;
// TODO(ewout): The group and/or its name token should be passed as a ctor
@@ -90,19 +115,16 @@ class WorkQueue : public thread::ThreadCore {
// metrics work as intended.
PW_METRIC_GROUP(metrics_, "pw::work_queue::WorkQueue");
PW_METRIC(metrics_, max_queue_used_, "max_queue_used", 0u);
- PW_METRIC(metrics_,
- min_queue_remaining_,
- "min_queue_remaining",
- static_cast<uint32_t>(circular_buffer_.capacity()));
+ PW_METRIC(metrics_, min_queue_remaining_, "min_queue_remaining", 0u);
};
template <size_t kWorkQueueEntries>
class WorkQueueWithBuffer : public WorkQueue {
public:
- constexpr WorkQueueWithBuffer() : WorkQueue(queue_storage_) {}
+ constexpr WorkQueueWithBuffer() : WorkQueue(queue_, kWorkQueueEntries) {}
private:
- std::array<WorkItem, kWorkQueueEntries> queue_storage_;
+ InlineQueue<WorkItem, kWorkQueueEntries> queue_;
};
} // namespace pw::work_queue