diff options
Diffstat (limited to 'pw_work_queue/public/pw_work_queue/work_queue.h')
-rw-r--r-- | pw_work_queue/public/pw_work_queue/work_queue.h | 98 |
1 files changed, 60 insertions, 38 deletions
diff --git a/pw_work_queue/public/pw_work_queue/work_queue.h b/pw_work_queue/public/pw_work_queue/work_queue.h index 7afb74e02..de1fe25d9 100644 --- a/pw_work_queue/public/pw_work_queue/work_queue.h +++ b/pw_work_queue/public/pw_work_queue/work_queue.h @@ -17,6 +17,7 @@ #include <array> #include <cstdint> +#include "pw_containers/inline_queue.h" #include "pw_function/function.h" #include "pw_metric/metric.h" #include "pw_span/span.h" @@ -25,51 +26,75 @@ #include "pw_sync/lock_annotations.h" #include "pw_sync/thread_notification.h" #include "pw_thread/thread_core.h" -#include "pw_work_queue/internal/circular_buffer.h" namespace pw::work_queue { using WorkItem = Function<void()>; -// The WorkQueue class enables threads and interrupts to enqueue work as a -// pw::work_queue::WorkItem for execution by the work queue. -// -// The entire API is thread and interrupt safe. +/// Enables threads and interrupts to enqueue work as a +/// `pw::work_queue::WorkItem` for execution by the work queue. +/// +/// **Queue sizing**: The number of outstanding work requests is limited +/// based on the internal queue size. The queue size is set through either +/// the size of the `queue_storage` buffer passed into the constructor or by +/// using the templated `pw::work_queue::WorkQueueWithBuffer` helper. When the +/// queue is full, the queue will not accept further work. +/// +/// **Cooperative thread cancellation**: The class is a +/// `pw::thread::ThreadCore`, meaning it should be executed as a single thread. +/// To facilitate clean shutdown, it provides a `RequestStop()` method for +/// cooperative cancellation which should be invoked before joining the thread. +/// Once a stop has been requested the queue will no longer accept further work. +/// +/// The entire API is thread-safe and interrupt-safe. class WorkQueue : public thread::ThreadCore { public: - // Note: the ThreadNotification prevents this from being constexpr. - explicit WorkQueue(span<WorkItem> queue_storage) - : stop_requested_(false), circular_buffer_(queue_storage) {} + /// @param[in] queue The work entries to enqueue. + /// + /// @param[in] queue_capacity The internal queue size which limits the number + /// of outstanding work requests. + /// + /// @note The `ThreadNotification` prevents this from being `constexpr`. + WorkQueue(InlineQueue<WorkItem>& queue, size_t queue_capacity) + : stop_requested_(false), queue_(queue) { + min_queue_remaining_.Set(static_cast<uint32_t>(queue_capacity)); + } - // Enqueues a work_item for execution by the work queue thread. - // - // Returns: - // Ok - Success, entry was enqueued for execution. - // FailedPrecondition - the work queue is shutting down, entries are no - // longer permitted. - // ResourceExhausted - internal work queue is full, entry was not enqueued. + /// Enqueues a `work_item` for execution by the work queue thread. + /// + /// @param[in] work_item The entry to enqueue. + /// + /// @returns + /// * @pw_status{OK} - Success. Entry was enqueued for execution. + /// * @pw_status{FAILED_PRECONDITION} - The work queue is shutting down. + /// Entries are no longer permitted. + /// * @pw_status{RESOURCE_EXHAUSTED} - Internal work queue is full. + /// Entry was not enqueued. Status PushWork(WorkItem&& work_item) PW_LOCKS_EXCLUDED(lock_) { return InternalPushWork(std::move(work_item)); } - // Queue work for execution. Crash if the work cannot be queued due to a - // full queue or a stopped worker thread. - // - // This call is recommended where possible since it saves error handling code - // at the callsite; and in many practical cases, it is a bug if the work - // queue is full (and so a crash is useful to detect the problem). - // - // Precondition: The queue must not overflow, i.e. be full. - // Precondition: The queue must not have been requested to stop, i.e. it must - // not be in the process of shutting down. + /// Queues work for execution. Crashes if the work cannot be queued due to a + /// full queue or a stopped worker thread. + /// + /// This call is recommended where possible since it saves error handling code + /// at the callsite; and in many practical cases, it is a bug if the work + /// queue is full (and so a crash is useful to detect the problem). + /// + /// @param[in] work_item The entry to enqueue. + /// + /// @pre + /// * The queue must not overflow, i.e. be full. + /// * The queue must not have been requested to stop, i.e. it must + /// not be in the process of shutting down. void CheckPushWork(WorkItem&& work_item) PW_LOCKS_EXCLUDED(lock_); - // Locks the queue to prevent further work enqueing, finishes outstanding - // work, then shuts down the worker thread. - // - // The WorkQueue cannot be resumed after stopping as the ThreadCore thread - // returns and may be joined. It must be reconstructed for re-use after - // the thread has been joined. + /// Locks the queue to prevent further work enqueing, finishes outstanding + /// work, then shuts down the worker thread. + /// + /// The `WorkQueue` cannot be resumed after stopping because the `ThreadCore` + /// thread returns and may be joined. The `WorkQueue` must be reconstructed + /// for re-use after the thread has been joined. void RequestStop() PW_LOCKS_EXCLUDED(lock_); private: @@ -78,7 +103,7 @@ class WorkQueue : public thread::ThreadCore { sync::InterruptSpinLock lock_; bool stop_requested_ PW_GUARDED_BY(lock_); - internal::CircularBuffer<WorkItem> circular_buffer_ PW_GUARDED_BY(lock_); + InlineQueue<WorkItem>& queue_ PW_GUARDED_BY(lock_); sync::ThreadNotification work_notification_; // TODO(ewout): The group and/or its name token should be passed as a ctor @@ -90,19 +115,16 @@ class WorkQueue : public thread::ThreadCore { // metrics work as intended. PW_METRIC_GROUP(metrics_, "pw::work_queue::WorkQueue"); PW_METRIC(metrics_, max_queue_used_, "max_queue_used", 0u); - PW_METRIC(metrics_, - min_queue_remaining_, - "min_queue_remaining", - static_cast<uint32_t>(circular_buffer_.capacity())); + PW_METRIC(metrics_, min_queue_remaining_, "min_queue_remaining", 0u); }; template <size_t kWorkQueueEntries> class WorkQueueWithBuffer : public WorkQueue { public: - constexpr WorkQueueWithBuffer() : WorkQueue(queue_storage_) {} + constexpr WorkQueueWithBuffer() : WorkQueue(queue_, kWorkQueueEntries) {} private: - std::array<WorkItem, kWorkQueueEntries> queue_storage_; + InlineQueue<WorkItem, kWorkQueueEntries> queue_; }; } // namespace pw::work_queue |