aboutsummaryrefslogtreecommitdiff
path: root/pw_stream/public/pw_stream/mpsc_stream.h
blob: ee9a0d6cb54c5367f69436e5aabd4edde8438003 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
// Copyright 2023 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
// the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.
#pragma once

/// @file
/// This file defines types related to a multi-producer, single-consumer stream.
///
/// The single reader must be constructed in place, while writers can be moved.
/// A reader and writer may be connected using `CreateMpscStream()`. Additional
/// writers may be connected by copying a previously connected writer.
///
/// Example:
///
/// @code{.cpp}
///    void WriteThreadRoutine(void* arg) {
///      auto *writer = static_cast<MpscWriter *>(arg);
///      ConstByteSpan data = GenerateSomeData();
///      Status status = writer->Write(data);
///      ...
///    }
///    ...
///    MpscReader reader;
///    MpscWriter writer;
///    CreateMpscStream(reader, writer);
///    thread::Thread t(MakeThreadOptions(), WriteThreadRoutine, &writer);
///    std::byte buffer[kBufSize];
///    if (auto status = reader.Read(ByteSpan(buffer)); status.ok()) {
///      ProcessSomeData(buffer);
///    }
/// @endcode
///
/// See `MpscReader::ReadAll()` for an additional example.
///
/// The types in this file are designed to be used across different threads,
/// but are not completely thread-safe. Data must only be written by an
/// MpscWriter using a single thread, and data must only be read by an
/// MpscReader using a single thread. In other words, multiple calls to
/// `Write()` must not be made concurrently, and multiple calls to `Read()` and
/// `ReadAll()` must not be made concurrently. Calls to other methods, e.g.
/// `Close()`, are thread-safe and may be made from any thread.

#include <array>
#include <cstddef>
#include <optional>

#include "pw_bytes/span.h"
#include "pw_chrono/system_clock.h"
#include "pw_containers/intrusive_list.h"
#include "pw_function/function.h"
#include "pw_status/status.h"
#include "pw_status/status_with_size.h"
#include "pw_stream/stream.h"
#include "pw_sync/lock_annotations.h"
#include "pw_sync/mutex.h"
#include "pw_sync/timed_thread_notification.h"

namespace pw::stream {

// Forward declarations, needed because `CreateMpscStream()` is declared before
// either class is defined.
class MpscReader;
class MpscWriter;

/// Creates a multi-producer, single-consumer stream.
///
/// This function creates a stream by associating a reader and a writer. Both
/// are reset before being connected. This is the only way to connect a reader.
/// Additional writers may be connected by copying the given writer after it is
/// connected.
///
/// This function is thread-safe with respect to other MpscReader and
/// MpscWriter methods. It is not thread-safe with respect to itself, i.e.
/// callers must not make concurrent calls to `CreateMpscStream()` from
/// different threads with the same objects.
///
/// @param[out]   reader  The reader to connect.
/// @param[out]   writer  The writer to connect.
void CreateMpscStream(MpscReader& reader, MpscWriter& writer);

/// Writer for a multi-producer, single-consumer stream.
///
/// This class has a default constructor that only produces disconnected
/// writers. To connect writers, use `CreateMpscStream()`. Additional connected
/// writers can be created by copying an existing one.
///
/// Each thread should have its own dedicated writer. This class is thread-safe
/// with respect to the reader, but not with respect to itself. In particular,
/// attempting to call `Write()` concurrently on different threads may result
/// in a failure.
class MpscWriter : public NonSeekableWriter,
                   public IntrusiveList<MpscWriter>::Item {
 public:
  /// Optional timeout type. A default-constructed (empty) value is what a
  /// writer holds until `SetTimeout()` is called.
  using duration = std::optional<chrono::SystemClock::duration>;

  /// A per-writer thread notification that can be added to a reader's list.
  ///
  /// The reader maintains a list of outstanding requests to write data. As
  /// data is read, and space to write data becomes available, it uses these
  /// requests to signal the waiting writers.
  struct Request : public IntrusiveList<Request>::Item {
    /// Notification the reader uses to signal the waiting writer.
    sync::TimedThreadNotification notification;
    // Makes the list item's `unlisted` member part of this struct's public
    // interface.
    using IntrusiveList<Request>::Item::unlisted;
  };

  /// Produces a disconnected writer; connect it with `CreateMpscStream()`.
  MpscWriter() = default;
  /// Copying an existing connected writer is how additional connected writers
  /// are created.
  MpscWriter(const MpscWriter& other);
  MpscWriter& operator=(const MpscWriter& other);
  /// Writers, unlike readers, can be moved.
  MpscWriter(MpscWriter&& other);
  MpscWriter& operator=(MpscWriter&& other);
  ~MpscWriter() override;

  /// Returns whether this object is connected to a reader.
  bool connected() const PW_LOCKS_EXCLUDED(mutex_);

  /// Indicates how much data was sent in the last call to `Write()`.
  size_t last_write() const PW_LOCKS_EXCLUDED(mutex_);

  /// Returns the optional maximum time elapsed before a `Write()` fails.
  const duration& timeout() const PW_LOCKS_EXCLUDED(mutex_);

  /// Set the timeout for writing to this stream.
  ///
  /// After setting a timeout, if the given duration elapses while making a call
  /// to `Write()`, @pw_status{RESOURCE_EXHAUSTED} will be returned. If desired,
  /// a timeout should be set before calling `Write()`. Setting a timeout when a
  /// writer is awaiting notification from a reader will not affect the duration
  /// of that wait.
  ///
  /// Note that setting a write timeout makes partial writes possible. For
  /// example, if a call to `Write()` of some length corresponds to 2 calls to
  /// `Read()` of half that length, a sufficient delay between those calls will
  /// result in the first half being written and read, but not the second.
  /// This differs from `Stream::Write()` which stipulates that no data is
  /// written on failure. If this happens, the length of the data written can be
  /// retrieved using `last_write()`.
  ///
  /// Generally, callers should use one of three approaches:
  ///  1. Do not set a write timeout, and let writers block arbitrarily long
  ///     until space is available or the reader is disconnected.
  ///  2. Use only a single writer, and use `last_write()` to resend data.
  ///  3. Structure the data being sent so that the reader can always read
  ///     complete messages and avoid blocking or performing complex work
  ///     mid-message.
  ///
  /// @param[in]  timeout   The duration to wait before returning an error.
  void SetTimeout(const duration& timeout) PW_LOCKS_EXCLUDED(mutex_);

  /// Sets the maximum amount that can be written by this writer.
  ///
  /// By default, writers can write an unlimited amount of data. This method can
  /// be used to set a limit, or remove it by providing a value of
  /// Stream::kUnlimited.
  ///
  /// If a limit is set, the writer will automatically close once it has written
  /// that much data. The current number of bytes remaining until the limit is
  /// reached can be retrieved using `ConservativeWriteLimit()`.
  ///
  /// @param[in]  limit   The maximum amount that can be written by this writer.
  void SetLimit(size_t limit) PW_LOCKS_EXCLUDED(mutex_);

  /// Disconnects this writer from its reader.
  ///
  /// This method does nothing if the writer is not connected.
  void Close() PW_LOCKS_EXCLUDED(mutex_);

 private:
  // The factory method is allowed to directly modify a writer to connect it
  // to the reader.
  friend void CreateMpscStream(MpscReader&, MpscWriter&);

  /// @copydoc Stream::ConservativeLimit
  size_t ConservativeLimit(LimitType type) const override;

  /// @copydoc Stream::DoWrite
  ///
  /// This method is *not* thread-safe with respect to itself. If multiple
  /// threads attempt to write concurrently using the same writer, those calls
  /// may fail. Instead, each thread should have its own writer.
  ///
  /// @pre No other thread has called `Write()` on this object.
  Status DoWrite(ConstByteSpan data) override;

  /// Locked implementation of `Close()`.
  ///
  /// The caller must already hold `mutex_`.
  void CloseLocked() PW_EXCLUSIVE_LOCKS_REQUIRED(mutex_);

  // Guards the annotated fields below. `mutable` so that the const accessors
  // can lock it.
  mutable sync::Mutex mutex_;
  // The reader this writer is associated with; starts out as null.
  // NOTE(review): presumably null whenever the writer is disconnected —
  // confirm against the implementation.
  MpscReader* reader_ PW_GUARDED_BY(mutex_) = nullptr;
  // Maximum amount this writer may write; see `SetLimit()`.
  size_t limit_ PW_GUARDED_BY(mutex_) = kUnlimited;
  // This writer's request object for the reader's list of write requests.
  // Not annotated as guarded by `mutex_`.
  Request write_request_;
  // Optional write timeout; see `SetTimeout()`.
  duration timeout_ PW_GUARDED_BY(mutex_);
  // Amount of data sent by the last call to `Write()`; see `last_write()`.
  size_t last_write_ PW_GUARDED_BY(mutex_) = 0;
};

/// Reader of a multi-producer, single-consumer stream.
///
/// The reader manages 3 aspects of the stream:
///   * The storage used to hold written data that is to be read.
///   * The list of connected writers.
///   * Accounting for how much data has and can be written.
///
/// This class has a default constructor that can only produce a disconnected
/// reader. To connect a reader, use `CreateMpscStream()`.
class MpscReader : public NonSeekableReader {
 public:
  /// Optional timeout type. A default-constructed (empty) value is what a
  /// reader holds until `SetTimeout()` is called.
  using duration = std::optional<chrono::SystemClock::duration>;

  MpscReader();
  ~MpscReader() override;

  /// Returns whether this object has any connected writers.
  bool connected() const PW_LOCKS_EXCLUDED(mutex_);

  // NOTE(review): `ReadUntilClose()`, mentioned below, is not declared in this
  // class. The sentence may be stale; if it refers to `ReadAll()`, it conflicts
  // with that method's documented RESOURCE_EXHAUSTED result — confirm.
  /// Set the timeout for reading from this stream.
  ///
  /// After setting a timeout, if the given duration elapses while making a call
  /// to `Read()`, RESOURCE_EXHAUSTED will be returned. If desired, a timeout
  /// should be set before calling `Read()` or `ReadAll()`. Setting a timeout
  /// when a reader is awaiting notification from a writer will not affect the
  /// duration of that wait. `ReadUntilClose()` ignores timeouts entirely.
  ///
  /// @param[in]  timeout   The duration to wait before returning an error.
  void SetTimeout(const duration& timeout) PW_LOCKS_EXCLUDED(mutex_);

  /// Associates the reader with storage to buffer written data to be read.
  ///
  /// If desired, callers can use this method to buffer written data. This can
  /// improve writer performance by allowing calls to `WriteData()` to avoid
  /// waiting for the reader, albeit at the cost of increased memory. This can
  /// be useful when the reader needs time to process the data it reads, or when
  /// the volume of writes varies over time, i.e. is "bursty".
  ///
  /// The reader does not take ownership of the storage, which must be valid
  /// until a call to the destructor or another call to `SetBuffer()`.
  ///
  /// @param[in]  buffer  A view to the storage.
  void SetBuffer(ByteSpan buffer) PW_LOCKS_EXCLUDED(mutex_);

  /// @fn ReadAll
  /// Reads data in a loop and passes it to a provided callback.
  ///
  /// This will read continuously until all connected writers close.
  ///
  /// Example usage:
  ///
  /// @code{.cpp}
  ///    MpscReader reader;
  ///    MpscWriter writer;
  ///    CreateMpscStream(reader, writer);
  ///    thread::Thread t(MakeThreadOptions(), [] (void*arg) {
  ///      auto *writer = static_cast<MpscWriter *>(arg);
  ///      writer->Write(GenerateSomeData()).IgnoreError();
  ///    }, &writer);
  ///    auto status = reader.ReadAll([] (ConstByteSpan data) {
  ///      return ProcessSomeData(data);
  ///    });
  ///    t.join();
  /// @endcode
  ///
  /// @param[in]  callback  A callable object to invoke on data as it is read.
  /// @retval     OK                  Successfully read until writers closed.
  /// @retval     FAILED_PRECONDITION The object does not have a buffer.
  /// @retval     RESOURCE_EXHAUSTED  Timed out when reading data. This can only
  ///                                 occur if a timeout has been set.
  /// @retval     Any other error as returned by the callback.
  using ReadAllCallback = Function<Status(ConstByteSpan data)>;
  Status ReadAll(ReadAllCallback callback) PW_LOCKS_EXCLUDED(mutex_);

  /// Disconnects all writers and drops any unread data.
  void Close() PW_LOCKS_EXCLUDED(mutex_);

 private:
  // The factory method is allowed to directly modify the reader to connect it
  // to a writer.
  friend void CreateMpscStream(MpscReader&, MpscWriter&);

  // The writer is allowed to call directly into the reader to:
  //  * Add/remove itself to the reader's list of writers.
  //  * Request space to write data, and to write to that space.
  friend class MpscWriter;

  // NOTE(review): Several "Used by" notes below name `MpscWriter::WriteData()`
  // and `MpscWriter::RemoveWriter()`. Neither is declared by `MpscWriter` in
  // this header (it declares `DoWrite()` and `CloseLocked()`); confirm the
  // intended callers.

  /// @fn IncreaseLimit
  /// @fn IncreaseLimitLocked
  /// Increases the number of remaining bytes to be written.
  ///
  /// Used by `MpscWriter::SetLimit()` and `MpscWriter::WriteData()`.
  ///
  /// @param[in]  delta   How much to increase the number of remaining bytes.
  void IncreaseLimit(size_t delta) PW_LOCKS_EXCLUDED(mutex_);
  void IncreaseLimitLocked(size_t delta) PW_EXCLUSIVE_LOCKS_REQUIRED(mutex_);

  /// @fn DecreaseLimit
  /// @fn DecreaseLimitLocked
  /// Decreases the number of remaining bytes to be written.
  ///
  /// Used by `MpscWriter::SetLimit()` and `MpscWriter::RemoveWriter()`.
  ///
  /// @param[in]  delta   How much to decrease the number of remaining bytes.
  void DecreaseLimit(size_t delta) PW_LOCKS_EXCLUDED(mutex_);
  void DecreaseLimitLocked(size_t delta) PW_EXCLUSIVE_LOCKS_REQUIRED(mutex_);

  /// @copydoc Stream::ConservativeLimit
  size_t ConservativeLimit(Stream::LimitType type) const override
      PW_LOCKS_EXCLUDED(mutex_);

  /// Adds the write request to the reader's list of pending requests.
  ///
  /// Used by `MpscWriter::WriteData()`.
  ///
  /// @param[in]  write_request   A writer's request object.
  void RequestWrite(MpscWriter::Request& write_request)
      PW_LOCKS_EXCLUDED(mutex_);

  /// Checks if a writer can write data, and signals it if so.
  ///
  /// A reader may signal a writer because:
  ///   * Space to write data has become available.
  ///   * The queue of write requests has changed.
  ///   * The reader is closing. `WriteData()` will return OUT_OF_RANGE.
  ///
  /// The caller must already hold `mutex_`.
  void CheckWriteableLocked() PW_EXCLUSIVE_LOCKS_REQUIRED(mutex_);

  /// Adds data from a writer to the buffer to be read.
  ///
  /// @param[in]  data            The data to be written.
  /// @param[in]  limit           The writer's current write limit.
  ///
  /// @retval OK                  Data was written to the buffer.
  /// @retval RESOURCE_EXHAUSTED  Buffer has insufficient space for data.
  /// @retval OUT_OF_RANGE        Stream is shut down or closed.
  StatusWithSize WriteData(ConstByteSpan data, size_t limit)
      PW_LOCKS_EXCLUDED(mutex_);

  /// @fn CompleteWrite
  /// @fn CompleteWriteLocked
  /// Removes the write request from the reader's list of pending requests.
  ///
  /// Used by `MpscWriter::WriteData()` and `MpscWriter::CloseLocked()`.
  ///
  /// @param[in]  write_request   A writer's request object.
  void CompleteWrite(MpscWriter::Request& write_request)
      PW_LOCKS_EXCLUDED(mutex_);
  void CompleteWriteLocked(MpscWriter::Request& write_request)
      PW_EXCLUSIVE_LOCKS_REQUIRED(mutex_);

  /// @copydoc Stream::DoRead
  StatusWithSize DoRead(ByteSpan destination) override
      PW_LOCKS_EXCLUDED(mutex_);

  // Member state. Fields annotated with PW_GUARDED_BY require `mutex_` to be
  // held; the notification objects carry no such annotation.

  // `mutable` so that const methods such as `connected()` can lock it.
  mutable sync::Mutex mutex_;
  // The list of connected writers.
  IntrusiveList<MpscWriter> writers_ PW_GUARDED_BY(mutex_);
  // The list of pending write requests; see `RequestWrite()`.
  IntrusiveList<MpscWriter::Request> write_requests_ PW_GUARDED_BY(mutex_);
  // NOTE(review): presumably a position within `write_requests_`; its exact
  // meaning is not visible from this header — confirm in the implementation.
  IntrusiveList<MpscWriter::Request>::iterator last_request_
      PW_GUARDED_BY(mutex_);

  // Accounting for how much data can still be written.
  // NOTE(review): presumably `num_unlimited_` counts writers without a limit
  // and `limit_` is the number of remaining bytes adjusted by
  // `IncreaseLimit()`/`DecreaseLimit()` — confirm.
  size_t num_unlimited_ PW_GUARDED_BY(mutex_) = 0;
  size_t limit_ PW_GUARDED_BY(mutex_) = 0;

  // NOTE(review): presumably `reading_` marks a read in progress, and the two
  // notifications are used to wake a blocked reader and to coordinate closing
  // — confirm.
  bool reading_ PW_GUARDED_BY(mutex_) = false;
  sync::TimedThreadNotification readable_;
  sync::ThreadNotification closeable_;
  // Optional read timeout; see `SetTimeout()`.
  duration timeout_ PW_GUARDED_BY(mutex_);

  // NOTE(review): presumably the span passed to an in-progress `DoRead()` and
  // the number of bytes written into it so far — confirm.
  ByteSpan destination_ PW_GUARDED_BY(mutex_);
  size_t written_ PW_GUARDED_BY(mutex_) = 0;

  // Caller-provided storage for buffered data; see `SetBuffer()`.
  // NOTE(review): presumably `offset_` and `length_` locate the unread data
  // within `buffer_` — confirm.
  ByteSpan buffer_ PW_GUARDED_BY(mutex_);
  size_t offset_ PW_GUARDED_BY(mutex_) = 0;
  size_t length_ PW_GUARDED_BY(mutex_) = 0;
};

/// Reader for a multi-producer, single consumer stream.
///
/// This class includes an explicitly-sized buffer. It has a default constructor
/// that can only produce a disconnected reader. To connect a reader, use
/// `CreateMpscStream()`.
template <size_t kCapacity>
class BufferedMpscReader : public MpscReader {
 public:
  BufferedMpscReader() { SetBuffer(buffer_); }

 private:
  std::array<std::byte, kCapacity> buffer_;
};

}  // namespace pw::stream