aboutsummaryrefslogtreecommitdiff
path: root/pw_transfer/public/pw_transfer/internal/context.h
blob: 6baaa6eddb41e7fac22ecb22e83ad2d87413758c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
// Copyright 2023 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
// the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.
#pragma once

#include <cinttypes>
#include <cstddef>
#include <limits>
#include <optional>

#include "pw_assert/assert.h"
#include "pw_chrono/system_clock.h"
#include "pw_rpc/writer.h"
#include "pw_status/status.h"
#include "pw_stream/stream.h"
#include "pw_transfer/internal/chunk.h"
#include "pw_transfer/internal/event.h"
#include "pw_transfer/internal/protocol.h"
#include "pw_transfer/rate_estimate.h"

namespace pw::transfer::internal {

class TransferThread;

class TransferParameters {
 public:
  constexpr TransferParameters(uint32_t pending_bytes,
                               uint32_t max_chunk_size_bytes,
                               uint32_t extend_window_divisor)
      : pending_bytes_(pending_bytes),
        max_chunk_size_bytes_(max_chunk_size_bytes),
        extend_window_divisor_(extend_window_divisor) {
    PW_ASSERT(pending_bytes > 0);
    PW_ASSERT(max_chunk_size_bytes > 0);
    PW_ASSERT(extend_window_divisor > 1);
  }

  uint32_t pending_bytes() const { return pending_bytes_; }
  void set_pending_bytes(uint32_t pending_bytes) {
    pending_bytes_ = pending_bytes;
  }

  uint32_t max_chunk_size_bytes() const { return max_chunk_size_bytes_; }
  void set_max_chunk_size_bytes(uint32_t max_chunk_size_bytes) {
    max_chunk_size_bytes_ = max_chunk_size_bytes;
  }

  uint32_t extend_window_divisor() const { return extend_window_divisor_; }
  void set_extend_window_divisor(uint32_t extend_window_divisor) {
    PW_DASSERT(extend_window_divisor > 1);
    extend_window_divisor_ = extend_window_divisor;
  }

 private:
  uint32_t pending_bytes_;
  uint32_t max_chunk_size_bytes_;
  uint32_t extend_window_divisor_;
};

// Information about a single transfer.
class Context {
 public:
  static constexpr uint32_t kUnassignedSessionId = 0;

  Context(const Context&) = delete;
  Context(Context&&) = delete;
  Context& operator=(const Context&) = delete;
  Context& operator=(Context&&) = delete;

  constexpr uint32_t session_id() const { return session_id_; }
  constexpr uint32_t resource_id() const { return resource_id_; }

  // True if the context has been used for a transfer (it has an ID).
  bool initialized() const {
    return transfer_state_ != TransferState::kInactive;
  }

  // True if the transfer is active.
  bool active() const { return transfer_state_ >= TransferState::kInitiating; }

  std::optional<chrono::SystemClock::time_point> timeout() const {
    return active() && next_timeout_ != kNoTimeout
               ? std::optional(next_timeout_)
               : std::nullopt;
  }

  // Returns true if the transfer's most recently set timeout has passed.
  bool timed_out() const {
    std::optional<chrono::SystemClock::time_point> next_timeout = timeout();
    return next_timeout.has_value() &&
           chrono::SystemClock::now() >= next_timeout.value();
  }

  // Processes an event for this transfer.
  void HandleEvent(const Event& event);

 protected:
  ~Context() = default;

  constexpr Context()
      : session_id_(kUnassignedSessionId),
        resource_id_(0),
        desired_protocol_version_(ProtocolVersion::kUnknown),
        configured_protocol_version_(ProtocolVersion::kUnknown),
        flags_(0),
        transfer_state_(TransferState::kInactive),
        retries_(0),
        max_retries_(0),
        lifetime_retries_(0),
        max_lifetime_retries_(0),
        stream_(nullptr),
        rpc_writer_(nullptr),
        offset_(0),
        window_size_(0),
        window_end_offset_(0),
        max_chunk_size_bytes_(std::numeric_limits<uint32_t>::max()),
        max_parameters_(nullptr),
        thread_(nullptr),
        last_chunk_sent_(Chunk::Type::kData),
        last_chunk_offset_(0),
        chunk_timeout_(chrono::SystemClock::duration::zero()),
        initial_chunk_timeout_(chrono::SystemClock::duration::zero()),
        interchunk_delay_(chrono::SystemClock::for_at_least(
            std::chrono::microseconds(kDefaultChunkDelayMicroseconds))),
        next_timeout_(kNoTimeout) {}

  constexpr TransferType type() const {
    return static_cast<TransferType>(flags_ & kFlagsType);
  }

 private:
  enum class TransferState : uint8_t {
    // The context is available for use for a new transfer.
    kInactive,

    // A transfer completed and the final status chunk was sent. The Context is
    // available for use for a new transfer. A receive transfer uses this state
    // to allow a transmitter to retry its last chunk if the final status chunk
    // was dropped.
    //
    // Only used by the legacy protocol. Starting from version 2, transfer
    // completions are acknowledged, for which the TERMINATING state is used.
    kCompleted,

    // Transfer is starting. The server and client are performing an initial
    // handshake and negotiating protocol and feature flags.
    kInitiating,

    // Waiting for the other end to send a chunk.
    kWaiting,

    // Transmitting a window of data to a receiver.
    kTransmitting,

    // Recovering after one or more chunks was dropped in an active transfer.
    kRecovery,

    // Transfer has completed locally and is waiting for the peer to acknowledge
    // its final status. Only entered by the terminating side of the transfer.
    //
    // The context remains in a TERMINATING state until it receives an
    // acknowledgement from the peer or times out. Either way, the context
    // transitions to INACTIVE afterwards, fully cleaning it up for reuse.
    //
    // Used instead of COMPLETED starting from version 2. Unlike COMPLETED,
    // contexts in a TERMINATING state cannot be used to start new transfers.
    kTerminating,
  };

  enum class TransmitAction {
    // Start of a new transfer.
    kBegin,
    // Extend the current window length.
    kExtend,
    // Retransmit from a specified offset.
    kRetransmit,
  };

  void set_transfer_state(TransferState state) { transfer_state_ = state; }

  // The session ID as unsigned instead of uint32_t so it can be used with %u.
  unsigned id_for_log() const {
    static_assert(sizeof(unsigned) >= sizeof(session_id_));
    return static_cast<unsigned>(session_id_);
  }

  stream::Reader& reader() {
    PW_DASSERT(active() && type() == TransferType::kTransmit);
    return static_cast<stream::Reader&>(*stream_);
  }

  stream::Writer& writer() {
    PW_DASSERT(active() && type() == TransferType::kReceive);
    return static_cast<stream::Writer&>(*stream_);
  }

  bool DataTransferComplete() const {
    return transfer_state_ == TransferState::kTerminating ||
           transfer_state_ == TransferState::kCompleted;
  }

  bool ShouldSkipCompletionHandshake() const {
    // Completion handshakes are not part of the legacy protocol. Additionally,
    // transfers which have not yet fully established should not handshake and
    // simply time out.
    return configured_protocol_version_ <= ProtocolVersion::kLegacy ||
           transfer_state_ == TransferState::kInitiating;
  }

  // Calculates the maximum size of actual data that can be sent within a
  // single client write transfer chunk, accounting for the overhead of the
  // transfer protocol and RPC system.
  //
  // Note: This function relies on RPC protocol internals. This is generally a
  // *bad* idea, but is necessary here due to limitations of the RPC system
  // and its asymmetric ingress and egress paths.
  //
  // TODO(frolv): This should be investigated further and perhaps addressed
  // within the RPC system, at the least through a helper function.
  uint32_t MaxWriteChunkSize(uint32_t max_chunk_size_bytes,
                             uint32_t channel_id) const;

  // Initializes a new transfer using new_transfer. The provided stream
  // argument is used in place of the NewTransferEvent's stream. Only
  // initializes state; no packets are sent.
  //
  // Precondition: context is not active.
  void Initialize(const NewTransferEvent& new_transfer);

  // Starts a new transfer from an initialized context by sending the initial
  // transfer chunk. This is only used by transfer clients, as the transfer
  // service cannot initiate transfers.
  //
  // Calls Finish(), which calls the on_completion callback, if initiating a
  // transfer fails.
  void InitiateTransferAsClient();

  // Starts a new transfer on the server after receiving a request from a
  // client.
  bool StartTransferAsServer(const NewTransferEvent& new_transfer);

  // Does final cleanup specific to the server or client. Returns whether the
  // cleanup succeeded. An error in cleanup indicates that the transfer
  // failed.
  virtual Status FinalCleanup(Status status) = 0;

  // Processes a chunk in either a transfer or receive transfer.
  void HandleChunkEvent(const ChunkEvent& event);

  // Runs the initial three-way handshake when starting a new transfer.
  void PerformInitialHandshake(const Chunk& chunk);

  void UpdateLocalProtocolConfigurationFromPeer(const Chunk& chunk);

  // Processes a chunk in a transmit transfer.
  void HandleTransmitChunk(const Chunk& chunk);

  // Processes a transfer parameters update in a transmit transfer.
  void HandleTransferParametersUpdate(const Chunk& chunk);

  // Sends the next chunk in a transmit transfer, if any.
  void TransmitNextChunk(bool retransmit_requested);

  // Processes a chunk in a receive transfer.
  void HandleReceiveChunk(const Chunk& chunk);

  // Processes a data chunk in a received while in the kWaiting state.
  void HandleReceivedData(const Chunk& chunk);

  // Sends the first chunk in a legacy transmit transfer.
  void SendInitialLegacyTransmitChunk();

  // Updates the current receive transfer parameters based on the context's
  // configuration.
  void UpdateTransferParameters();

  // Populates the transfer parameters fields on a chunk object.
  void SetTransferParameters(Chunk& parameters);

  // In a receive transfer, sends a parameters chunk telling the transmitter
  // how much data they can send.
  void SendTransferParameters(TransmitAction action);

  // Updates the current receive transfer parameters, then sends them.
  void UpdateAndSendTransferParameters(TransmitAction action);

  // Processes a chunk in a terminating state.
  void HandleTerminatingChunk(const Chunk& chunk);

  // Ends the transfer with the specified status, sending a completion chunk to
  // the peer.
  void TerminateTransfer(Status status, bool with_resource_id = false);

  // Ends a transfer following notification of completion from the peer.
  void HandleTermination(Status status);

  // Forcefully ends a transfer locally without contacting the peer.
  void Abort(Status status) {
    Finish(status);
    set_transfer_state(TransferState::kCompleted);
  }

  // Sends a final status chunk of a completed transfer without updating the
  // transfer. Sends status_, which MUST have been set by a previous Finish()
  // call.
  void SendFinalStatusChunk(bool with_resource_id = false);

  // Marks the transfer as completed and calls FinalCleanup(). Sets status_ to
  // the final status for this transfer. The transfer MUST be active when this
  // is called.
  void Finish(Status status);

  // Encodes the specified chunk to the encode buffer and sends it with the
  // rpc_writer_. Calls Finish() with an error if the operation fails.
  void EncodeAndSendChunk(const Chunk& chunk);

  void SetTimeout(chrono::SystemClock::duration timeout);
  void ClearTimeout() { next_timeout_ = kNoTimeout; }

  // Called when the transfer's timeout expires.
  void HandleTimeout();

  // Resends the last packet or aborts the transfer if the maximum retries has
  // been exceeded.
  void Retry();
  void RetryHandshake();

  void LogTransferConfiguration();

  static constexpr uint8_t kFlagsType = 1 << 0;
  static constexpr uint8_t kFlagsDataSent = 1 << 1;
  static constexpr uint8_t kFlagsContactMade = 1 << 2;

  static constexpr uint32_t kDefaultChunkDelayMicroseconds = 2000;

  // How long to wait for the other side to ACK a final transfer chunk before
  // resetting the context so that it can be reused. During this time, the
  // status chunk will be re-sent for every non-ACK chunk received,
  // continually notifying the other end that the transfer is over.
  static constexpr chrono::SystemClock::duration kFinalChunkAckTimeout =
      std::chrono::milliseconds(5000);

  static constexpr chrono::SystemClock::time_point kNoTimeout =
      chrono::SystemClock::time_point(chrono::SystemClock::duration(0));

  uint32_t session_id_;
  uint32_t resource_id_;

  // The version of the transfer protocol that this node wants to run.
  ProtocolVersion desired_protocol_version_;

  // The version of the transfer protocol that the context is actually running,
  // following negotiation with the transfer peer.
  ProtocolVersion configured_protocol_version_;

  uint8_t flags_;
  TransferState transfer_state_;
  uint8_t retries_;
  uint8_t max_retries_;
  uint32_t lifetime_retries_;
  uint32_t max_lifetime_retries_;

  // The stream from which to read or to which to write data.
  stream::Stream* stream_;
  rpc::Writer* rpc_writer_;

  uint32_t offset_;
  uint32_t window_size_;
  uint32_t window_end_offset_;
  uint32_t max_chunk_size_bytes_;

  const TransferParameters* max_parameters_;
  TransferThread* thread_;

  Chunk::Type last_chunk_sent_;

  union {
    Status status_;               // Used when state is kCompleted.
    uint32_t last_chunk_offset_;  // Used in states kWaiting and kRecovery.
  };

  // How long to wait for a chunk from the other end.
  chrono::SystemClock::duration chunk_timeout_;

  // How long for a client to wait for an initial server response.
  chrono::SystemClock::duration initial_chunk_timeout_;

  // How long to delay between transmitting subsequent data chunks within a
  // window.
  chrono::SystemClock::duration interchunk_delay_;

  // Timestamp at which the transfer will next time out, or kNoTimeout.
  chrono::SystemClock::time_point next_timeout_;

  RateEstimate transfer_rate_;
};

}  // namespace pw::transfer::internal