diff options
Diffstat (limited to 'pw_transfer/py/pw_transfer/transfer.py')
-rw-r--r-- | pw_transfer/py/pw_transfer/transfer.py | 649 |
1 files changed, 482 insertions, 167 deletions
diff --git a/pw_transfer/py/pw_transfer/transfer.py b/pw_transfer/py/pw_transfer/transfer.py index 06574f4e7..11dc55636 100644 --- a/pw_transfer/py/pw_transfer/transfer.py +++ b/pw_transfer/py/pw_transfer/transfer.py @@ -16,13 +16,14 @@ import abc import asyncio from dataclasses import dataclass +import enum import logging import math import threading from typing import Any, Callable, Optional from pw_status import Status -from pw_transfer.transfer_pb2 import Chunk +from pw_transfer.chunk import Chunk, ProtocolVersion _LOG = logging.getLogger(__package__) @@ -40,10 +41,13 @@ class ProgressStats: return self.bytes_confirmed_received / self.total_size_bytes * 100 def __str__(self) -> str: - total = str( - self.total_size_bytes) if self.total_size_bytes else 'unknown' - return (f'{self.percent_received():5.1f}% ({self.bytes_sent} B sent, ' - f'{self.bytes_confirmed_received} B received of {total} B)') + total = ( + str(self.total_size_bytes) if self.total_size_bytes else 'unknown' + ) + return ( + f'{self.percent_received():5.1f}% ({self.bytes_sent} B sent, ' + f'{self.bytes_confirmed_received} B received of {total} B)' + ) ProgressCallback = Callable[[ProgressStats], Any] @@ -51,12 +55,13 @@ ProgressCallback = Callable[[ProgressStats], Any] class _Timer: """A timer which invokes a callback after a certain timeout.""" + def __init__(self, timeout_s: float, callback: Callable[[], Any]): self.timeout_s = timeout_s self._callback = callback self._task: Optional[asyncio.Task[Any]] = None - def start(self, timeout_s: float = None) -> None: + def start(self, timeout_s: Optional[float] = None) -> None: """Starts a new timer. If a timer is already running, it is stopped and a new timer started. @@ -86,23 +91,76 @@ class Transfer(abc.ABC): of transfer, receiving messages from the server and sending the appropriate messages in response. """ - def __init__(self, - transfer_id: int, - send_chunk: Callable[[Chunk], None], - end_transfer: Callable[['Transfer'], None], - response_timeout_s: float, - initial_response_timeout_s: float, - max_retries: int, - progress_callback: ProgressCallback = None): - self.id = transfer_id + + # pylint: disable=too-many-instance-attributes + + class _State(enum.Enum): + # Transfer is starting. The server and client are performing an initial + # handshake and negotiating protocol and feature flags. + INITIATING = 0 + + # Waiting for the other end to send a chunk. + WAITING = 1 + + # Transmitting a window of data to a receiver. + TRANSMITTING = 2 + + # Recovering after one or more chunks was dropped in an active transfer. + RECOVERY = 3 + + # Transfer has completed locally and is waiting for the peer to + # acknowledge its final status. Only entered by the terminating side of + # the transfer. + # + # The context remains in a TERMINATING state until it receives an + # acknowledgement from the peer or times out. + TERMINATING = 4 + + # A transfer has fully completed. + COMPLETE = 5 + + _UNASSIGNED_SESSION_ID = 0 + + def __init__( # pylint: disable=too-many-arguments + self, + resource_id: int, + send_chunk: Callable[[Chunk], None], + end_transfer: Callable[['Transfer'], None], + response_timeout_s: float, + initial_response_timeout_s: float, + max_retries: int, + max_lifetime_retries: int, + protocol_version: ProtocolVersion, + progress_callback: Optional[ProgressCallback] = None, + ): self.status = Status.OK self.done = threading.Event() - self._send_chunk = send_chunk + self._session_id = self._UNASSIGNED_SESSION_ID + self._resource_id = resource_id + + self._send_chunk_fn = send_chunk self._end_transfer = end_transfer + self._desired_protocol_version = protocol_version + self._configured_protocol_version = ProtocolVersion.UNKNOWN + + if self._desired_protocol_version is ProtocolVersion.LEGACY: + # In a legacy transfer, there is no protocol negotiation stage. + # Automatically configure the context to run the legacy protocol and + # proceed to waiting for a chunk. + self._configured_protocol_version = ProtocolVersion.LEGACY + self._state = Transfer._State.WAITING + self._session_id = self._resource_id + else: + self._state = Transfer._State.INITIATING + + self._last_chunk: Optional[Chunk] = None + self._retries = 0 self._max_retries = max_retries + self._lifetime_retries = 0 + self._max_lifetime_retries = max_lifetime_retries self._response_timer = _Timer(response_timeout_s, self._on_timeout) self._initial_response_timeout_s = initial_response_timeout_s @@ -110,17 +168,57 @@ class Transfer(abc.ABC): async def begin(self) -> None: """Sends the initial chunk of the transfer.""" - self._send_chunk(self._initial_chunk()) + + if ( + self._desired_protocol_version is ProtocolVersion.UNKNOWN + or self._desired_protocol_version.value + > ProtocolVersion.LATEST.value + ): + _LOG.error( + 'Cannot start a transfer with unsupported protocol version %d', + self._desired_protocol_version.value, + ) + self.finish(Status.INVALID_ARGUMENT) + return + + initial_chunk = Chunk( + self._desired_protocol_version, + Chunk.Type.START, + resource_id=self._resource_id, + ) + + # Regardless of the desired protocol version, set any additional fields + # on the opening chunk, in case the server only runs legacy. + self._set_initial_chunk_fields(initial_chunk) + + self._send_chunk(initial_chunk) self._response_timer.start(self._initial_response_timeout_s) @property + def id(self) -> int: + """Returns the identifier for the active transfer.""" + if self._session_id != self._UNASSIGNED_SESSION_ID: + return self._session_id + return self._resource_id + + @property + def resource_id(self) -> int: + """Returns the identifier of the resource being transferred.""" + return self._resource_id + + @property @abc.abstractmethod def data(self) -> bytes: """Returns the data read or written in this transfer.""" @abc.abstractmethod - def _initial_chunk(self) -> Chunk: - """Returns the initial chunk to notify the sever of the transfer.""" + def _set_initial_chunk_fields(self, chunk: Chunk) -> None: + """Sets fields for the initial non-handshake chunk of the transfer.""" + + def _send_chunk(self, chunk: Chunk) -> None: + """Sends a chunk to the server, keeping track of the last chunk sent.""" + self._send_chunk_fn(chunk) + self._last_chunk = chunk async def handle_chunk(self, chunk: Chunk) -> None: """Processes an incoming chunk from the server. @@ -131,41 +229,144 @@ class Transfer(abc.ABC): self._response_timer.stop() self._retries = 0 # Received data from service, so reset the retries. - _LOG.debug('Received chunk\n%s', str(chunk).rstrip()) + _LOG.debug('Received chunk\n%s', str(chunk.to_message()).rstrip()) # Status chunks are only used to terminate a transfer. They do not # contain any data that requires processing. - if chunk.HasField('status'): + if chunk.status is not None: + if self._configured_protocol_version is ProtocolVersion.VERSION_TWO: + self._send_chunk( + Chunk( + self._configured_protocol_version, + Chunk.Type.COMPLETION_ACK, + session_id=self._session_id, + ) + ) + self.finish(Status(chunk.status)) return - await self._handle_data_chunk(chunk) + if self._state is Transfer._State.INITIATING: + await self._perform_initial_handshake(chunk) + elif self._state is Transfer._State.TERMINATING: + if chunk.type is Chunk.Type.COMPLETION_ACK: + self.finish(self.status) + else: + # Expecting a completion ACK but didn't receive one. Go through + # the retry process. + self._on_timeout() + else: + await self._handle_data_chunk(chunk) # Start the timeout for the server to send a chunk in response. self._response_timer.start() + async def _perform_initial_handshake(self, chunk: Chunk) -> None: + """Progresses the initial handshake phase of a v2+ transfer.""" + assert self._state is Transfer._State.INITIATING + + # If a non-handshake chunk is received during an INITIATING state, the + # transfer server is running a legacy protocol version, which does not + # perform a handshake. End the handshake, revert to the legacy protocol, + # and process the chunk appropriately. + if chunk.type is not Chunk.Type.START_ACK: + _LOG.debug( + 'Transfer %d got non-handshake chunk, reverting to legacy', + self.id, + ) + + self._configured_protocol_version = ProtocolVersion.LEGACY + self._state = Transfer._State.WAITING + + # Update the transfer's session ID in case it was expecting one to + # be assigned by the server. + self._session_id = chunk.session_id + + await self._handle_data_chunk(chunk) + return + + self._session_id = chunk.session_id + + self._configured_protocol_version = ProtocolVersion( + min( + self._desired_protocol_version.value, + chunk.protocol_version.value, + ) + ) + _LOG.debug( + 'Transfer %d negotiating protocol version: ours=%d, theirs=%d', + self.id, + self._desired_protocol_version.value, + chunk.protocol_version.value, + ) + + # Send a confirmation chunk to the server accepting the assigned session + # ID and protocol version. Tag any initial transfer parameters onto the + # chunk to begin the data transfer. + start_ack_confirmation = Chunk( + self._configured_protocol_version, + Chunk.Type.START_ACK_CONFIRMATION, + session_id=self._session_id, + ) + self._set_initial_chunk_fields(start_ack_confirmation) + + self._state = Transfer._State.WAITING + self._send_chunk(start_ack_confirmation) + @abc.abstractmethod async def _handle_data_chunk(self, chunk: Chunk) -> None: """Handles a chunk that contains or requests data.""" @abc.abstractmethod - def _retry_after_timeout(self) -> None: - """Retries after a timeout occurs.""" + def _retry_after_data_timeout(self) -> None: + """Retries after a timeout occurs during the data transfer phase. + + Only invoked when in the data transfer phase (i.e. state is in + {WAITING, TRANSMITTING, RECOVERY}). Timeouts occurring during an + opening or closing handshake are handled by the base Transfer. + """ def _on_timeout(self) -> None: """Handles a timeout while waiting for a chunk.""" - if self.done.is_set(): + if self._state is Transfer._State.COMPLETE: return self._retries += 1 - if self._retries > self._max_retries: - self.finish(Status.DEADLINE_EXCEEDED) + self._lifetime_retries += 1 + + if ( + self._retries > self._max_retries + or self._lifetime_retries > self._max_lifetime_retries + ): + if self._state is Transfer._State.TERMINATING: + # If the server never responded to the sent completion chunk, + # simply end the transfer locally with its original status. + self.finish(self.status) + else: + self.finish(Status.DEADLINE_EXCEEDED) return - _LOG.debug('Received no responses for %.3fs; retrying %d/%d', - self._response_timer.timeout_s, self._retries, - self._max_retries) - self._retry_after_timeout() + _LOG.debug( + 'Received no responses for %.3fs; retrying %d/%d', + self._response_timer.timeout_s, + self._retries, + self._max_retries, + ) + + retry_handshake_chunk = self._state in ( + Transfer._State.INITIATING, + Transfer._State.TERMINATING, + ) or ( + self._last_chunk is not None + and self._last_chunk.type is Chunk.Type.START_ACK_CONFIRMATION + ) + + if retry_handshake_chunk: + assert self._last_chunk is not None + self._send_chunk(self._last_chunk) + else: + self._retry_after_data_timeout() + self._response_timer.start() def finish(self, status: Status, skip_callback: bool = False) -> None: @@ -181,49 +382,74 @@ class Transfer(abc.ABC): self._end_transfer(self) # Set done last so that the transfer has been fully cleaned up. + self._state = Transfer._State.COMPLETE self.done.set() - def _update_progress(self, bytes_sent: int, bytes_confirmed_received: int, - total_size_bytes: Optional[int]) -> None: + def _update_progress( + self, + bytes_sent: int, + bytes_confirmed_received: int, + total_size_bytes: Optional[int], + ) -> None: """Invokes the provided progress callback, if any, with the progress.""" - stats = ProgressStats(bytes_sent, bytes_confirmed_received, - total_size_bytes) + stats = ProgressStats( + bytes_sent, bytes_confirmed_received, total_size_bytes + ) _LOG.debug('Transfer %d progress: %s', self.id, stats) if self._progress_callback: self._progress_callback(stats) - def _send_error(self, error: Status) -> None: - """Sends an error chunk to the server and finishes the transfer.""" + def _send_final_chunk(self, status: Status) -> None: + """Sends a status chunk to the server and finishes the transfer.""" self._send_chunk( - Chunk(transfer_id=self.id, - status=error.value, - type=Chunk.Type.TRANSFER_COMPLETION)) - self.finish(error) + Chunk( + self._configured_protocol_version, + Chunk.Type.COMPLETION, + session_id=self.id, + status=status, + ) + ) + + if self._configured_protocol_version is ProtocolVersion.VERSION_TWO: + # Wait for a completion ACK from the server. + self.status = status + self._state = Transfer._State.TERMINATING + self._response_timer.start() + else: + self.finish(status) class WriteTransfer(Transfer): """A client -> server write transfer.""" - def __init__( + + def __init__( # pylint: disable=too-many-arguments self, - transfer_id: int, + resource_id: int, data: bytes, send_chunk: Callable[[Chunk], None], end_transfer: Callable[[Transfer], None], response_timeout_s: float, initial_response_timeout_s: float, max_retries: int, - progress_callback: ProgressCallback = None, + max_lifetime_retries: int, + protocol_version: ProtocolVersion, + progress_callback: Optional[ProgressCallback] = None, ): - super().__init__(transfer_id, send_chunk, end_transfer, - response_timeout_s, initial_response_timeout_s, - max_retries, progress_callback) + super().__init__( + resource_id, + send_chunk, + end_transfer, + response_timeout_s, + initial_response_timeout_s, + max_retries, + max_lifetime_retries, + protocol_version, + progress_callback, + ) self._data = data - # Guard this class with a lock since a transfer parameters update might - # arrive while responding to a prior update. - self._lock = asyncio.Lock() self._offset = 0 self._window_end_offset = 0 self._max_chunk_size = 0 @@ -232,14 +458,15 @@ class WriteTransfer(Transfer): # The window ID increments for each parameters update. self._window_id = 0 - self._last_chunk = self._initial_chunk() + self._bytes_confirmed_received = 0 @property def data(self) -> bytes: return self._data - def _initial_chunk(self) -> Chunk: - return Chunk(transfer_id=self.id, type=Chunk.Type.TRANSFER_START) + def _set_initial_chunk_fields(self, chunk: Chunk) -> None: + # Nothing to tag onto the initial chunk in a write transfer. + pass async def _handle_data_chunk(self, chunk: Chunk) -> None: """Processes an incoming chunk from the server. @@ -249,107 +476,124 @@ class WriteTransfer(Transfer): send data accordingly. """ - async with self._lock: - self._window_id += 1 - window_id = self._window_id + if self._state is Transfer._State.TRANSMITTING: + self._state = Transfer._State.WAITING - if not self._handle_parameters_update(chunk): - return + assert self._state is Transfer._State.WAITING + + if not self._handle_parameters_update(chunk): + return + + self._bytes_confirmed_received = chunk.offset + self._state = Transfer._State.TRANSMITTING - bytes_acknowledged = chunk.offset + self._window_id += 1 + asyncio.create_task(self._transmit_next_chunk(self._window_id)) - while True: - if self._chunk_delay_us: - await asyncio.sleep(self._chunk_delay_us / 1e6) + async def _transmit_next_chunk( + self, window_id: int, timeout_us: Optional[int] = None + ) -> None: + """Transmits a single data chunk to the server. - async with self._lock: - if self.done.is_set(): - return + If the chunk completes the active window, returns to a WAITING state. + Otherwise, schedules another transmission for the next chunk. + """ + if timeout_us is not None: + await asyncio.sleep(timeout_us / 1e6) - if window_id != self._window_id: - _LOG.debug('Transfer %d: Skipping stale window', self.id) - return + if self._state is not Transfer._State.TRANSMITTING: + return - write_chunk = self._next_chunk() - self._offset += len(write_chunk.data) - sent_requested_bytes = self._offset == self._window_end_offset + if window_id != self._window_id: + _LOG.debug('Transfer %d: Skipping stale window', self.id) + return - self._send_chunk(write_chunk) + chunk = self._next_chunk() + self._offset += len(chunk.data) - self._update_progress(self._offset, bytes_acknowledged, - len(self.data)) + sent_requested_bytes = self._offset == self._window_end_offset - if sent_requested_bytes: - break + self._send_chunk(chunk) + self._update_progress( + self._offset, self._bytes_confirmed_received, len(self.data) + ) - self._last_chunk = write_chunk + if sent_requested_bytes: + self._state = Transfer._State.WAITING + else: + asyncio.create_task( + self._transmit_next_chunk( + window_id, timeout_us=self._chunk_delay_us + ) + ) def _handle_parameters_update(self, chunk: Chunk) -> bool: """Updates transfer state based on a transfer parameters update.""" - retransmit = True - if chunk.HasField('type'): - retransmit = (chunk.type == Chunk.Type.PARAMETERS_RETRANSMIT - or chunk.type == Chunk.Type.TRANSFER_START) - if chunk.offset > len(self.data): # Bad offset; terminate the transfer. _LOG.error( 'Transfer %d: server requested invalid offset %d (size %d)', - self.id, chunk.offset, len(self.data)) + self.id, + chunk.offset, + len(self.data), + ) - self._send_error(Status.OUT_OF_RANGE) + self._send_final_chunk(Status.OUT_OF_RANGE) return False - if chunk.pending_bytes == 0: + if chunk.offset == chunk.window_end_offset: _LOG.error( 'Transfer %d: service requested 0 bytes (invalid); aborting', - self.id) - self._send_error(Status.INTERNAL) + self.id, + ) + self._send_final_chunk(Status.INTERNAL) return False - if retransmit: + # Extend the window to the new end offset specified by the server. + self._window_end_offset = min(chunk.window_end_offset, len(self.data)) + + if chunk.requests_transmission_from_offset(): # Check whether the client has sent a previous data offset, which # indicates that some chunks were lost in transmission. if chunk.offset < self._offset: - _LOG.debug('Write transfer %d rolling back: offset %d from %d', - self.id, chunk.offset, self._offset) + _LOG.debug( + 'Write transfer %d rolling back: offset %d from %d', + self.id, + chunk.offset, + self._offset, + ) self._offset = chunk.offset - # Retransmit is the default behavior for older versions of the - # transfer protocol. The window_end_offset field is not guaranteed - # to be set in these version, so it must be calculated. - max_bytes_to_send = min(chunk.pending_bytes, - len(self.data) - self._offset) - self._window_end_offset = self._offset + max_bytes_to_send - else: - assert chunk.type == Chunk.Type.PARAMETERS_CONTINUE - - # Extend the window to the new end offset specified by the server. - self._window_end_offset = min(chunk.window_end_offset, - len(self.data)) - - if chunk.HasField('max_chunk_size_bytes'): + if chunk.max_chunk_size_bytes is not None: self._max_chunk_size = chunk.max_chunk_size_bytes - if chunk.HasField('min_delay_microseconds'): + if chunk.min_delay_microseconds is not None: self._chunk_delay_us = chunk.min_delay_microseconds return True - def _retry_after_timeout(self) -> None: - self._send_chunk(self._last_chunk) + def _retry_after_data_timeout(self) -> None: + if ( + self._state is Transfer._State.WAITING + and self._last_chunk is not None + ): + self._send_chunk(self._last_chunk) def _next_chunk(self) -> Chunk: """Returns the next Chunk message to send in the data transfer.""" - chunk = Chunk(transfer_id=self.id, - offset=self._offset, - type=Chunk.Type.TRANSFER_DATA) - max_bytes_in_chunk = min(self._max_chunk_size, - self._window_end_offset - self._offset) - - chunk.data = self.data[self._offset:self._offset + max_bytes_in_chunk] + chunk = Chunk( + self._configured_protocol_version, + Chunk.Type.DATA, + session_id=self.id, + offset=self._offset, + ) + + max_bytes_in_chunk = min( + self._max_chunk_size, self._window_end_offset - self._offset + ) + chunk.data = self.data[self._offset : self._offset + max_bytes_in_chunk] # Mark the final chunk of the transfer. if len(self.data) - self._offset <= max_bytes_in_chunk: @@ -376,20 +620,31 @@ class ReadTransfer(Transfer): EXTEND_WINDOW_DIVISOR = 2 def __init__( # pylint: disable=too-many-arguments - self, - transfer_id: int, - send_chunk: Callable[[Chunk], None], - end_transfer: Callable[[Transfer], None], - response_timeout_s: float, - initial_response_timeout_s: float, - max_retries: int, - max_bytes_to_receive: int = 8192, - max_chunk_size: int = 1024, - chunk_delay_us: int = None, - progress_callback: ProgressCallback = None): - super().__init__(transfer_id, send_chunk, end_transfer, - response_timeout_s, initial_response_timeout_s, - max_retries, progress_callback) + self, + resource_id: int, + send_chunk: Callable[[Chunk], None], + end_transfer: Callable[[Transfer], None], + response_timeout_s: float, + initial_response_timeout_s: float, + max_retries: int, + max_lifetime_retries: int, + protocol_version: ProtocolVersion, + max_bytes_to_receive: int = 8192, + max_chunk_size: int = 1024, + chunk_delay_us: Optional[int] = None, + progress_callback: Optional[ProgressCallback] = None, + ): + super().__init__( + resource_id, + send_chunk, + end_transfer, + response_timeout_s, + initial_response_timeout_s, + max_retries, + max_lifetime_retries, + protocol_version, + progress_callback, + ) self._max_bytes_to_receive = max_bytes_to_receive self._max_chunk_size = max_chunk_size self._chunk_delay_us = chunk_delay_us @@ -397,16 +652,16 @@ class ReadTransfer(Transfer): self._remaining_transfer_size: Optional[int] = None self._data = bytearray() self._offset = 0 - self._pending_bytes = max_bytes_to_receive self._window_end_offset = max_bytes_to_receive + self._last_chunk_offset: Optional[int] = None @property def data(self) -> bytes: """Returns an immutable copy of the data that has been read.""" return bytes(self._data) - def _initial_chunk(self) -> Chunk: - return self._transfer_parameters(Chunk.Type.TRANSFER_START) + def _set_initial_chunk_fields(self, chunk: Chunk) -> None: + self._set_transfer_parameters(chunk) async def _handle_data_chunk(self, chunk: Chunk) -> None: """Processes an incoming chunk from the server. @@ -415,26 +670,67 @@ class ReadTransfer(Transfer): Once all pending data is received, the transfer parameters are updated. """ + if self._state is Transfer._State.RECOVERY: + if chunk.offset != self._offset: + if self._last_chunk_offset == chunk.offset: + _LOG.debug( + 'Transfer %d received repeated offset %d: ' + 'retry detected, resending transfer parameters', + self.id, + chunk.offset, + ) + self._send_chunk( + self._transfer_parameters( + Chunk.Type.PARAMETERS_RETRANSMIT + ) + ) + else: + _LOG.debug( + 'Transfer %d waiting for offset %d, ignoring %d', + self.id, + self._offset, + chunk.offset, + ) + self._last_chunk_offset = chunk.offset + return + + _LOG.info( + 'Transfer %d received expected offset %d, resuming transfer', + self.id, + chunk.offset, + ) + self._state = Transfer._State.WAITING + + assert self._state is Transfer._State.WAITING + if chunk.offset != self._offset: # Initially, the transfer service only supports in-order transfers. # If data is received out of order, request that the server # retransmit from the previous offset. + _LOG.debug( + 'Transfer %d expected offset %d, received %d: ' + 'entering recovery state', + self.id, + self._offset, + chunk.offset, + ) + self._state = Transfer._State.RECOVERY + self._send_chunk( - self._transfer_parameters(Chunk.Type.PARAMETERS_RETRANSMIT)) + self._transfer_parameters(Chunk.Type.PARAMETERS_RETRANSMIT) + ) return self._data += chunk.data - self._pending_bytes -= len(chunk.data) self._offset += len(chunk.data) - if chunk.HasField('remaining_bytes'): + # Update the last offset seen so that retries can be detected. + self._last_chunk_offset = chunk.offset + + if chunk.remaining_bytes is not None: if chunk.remaining_bytes == 0: # No more data to read. Acknowledge receipt and finish. - self._send_chunk( - Chunk(transfer_id=self.id, - status=Status.OK.value, - type=Chunk.Type.TRANSFER_COMPLETION)) - self.finish(Status.OK) + self._send_final_chunk(Status.OK) return # The server may indicate if the amount of remaining data is known. @@ -447,61 +743,80 @@ class ReadTransfer(Transfer): if self._remaining_transfer_size <= 0: self._remaining_transfer_size = None - total_size = None if self._remaining_transfer_size is None else ( - self._remaining_transfer_size + self._offset) + total_size = ( + None + if self._remaining_transfer_size is None + else (self._remaining_transfer_size + self._offset) + ) self._update_progress(self._offset, self._offset, total_size) if chunk.window_end_offset != 0: if chunk.window_end_offset < self._offset: _LOG.error( 'Transfer %d: transmitter sent invalid earlier end offset ' - '%d (receiver offset %d)', self.id, - chunk.window_end_offset, self._offset) - self._send_error(Status.INTERNAL) + '%d (receiver offset %d)', + self.id, + chunk.window_end_offset, + self._offset, + ) + self._send_final_chunk(Status.INTERNAL) return if chunk.window_end_offset > self._window_end_offset: _LOG.error( 'Transfer %d: transmitter sent invalid later end offset ' - '%d (receiver end offset %d)', self.id, - chunk.window_end_offset, self._window_end_offset) - self._send_error(Status.INTERNAL) + '%d (receiver end offset %d)', + self.id, + chunk.window_end_offset, + self._window_end_offset, + ) + self._send_final_chunk(Status.INTERNAL) return self._window_end_offset = chunk.window_end_offset - self._pending_bytes -= chunk.window_end_offset - self._offset remaining_window_size = self._window_end_offset - self._offset - extend_window = (remaining_window_size <= self._max_bytes_to_receive / - ReadTransfer.EXTEND_WINDOW_DIVISOR) + extend_window = ( + remaining_window_size + <= self._max_bytes_to_receive / ReadTransfer.EXTEND_WINDOW_DIVISOR + ) - if self._pending_bytes == 0: + if self._offset == self._window_end_offset: # All pending data was received. Send out a new parameters chunk for # the next block. self._send_chunk( - self._transfer_parameters(Chunk.Type.PARAMETERS_RETRANSMIT)) + self._transfer_parameters(Chunk.Type.PARAMETERS_RETRANSMIT) + ) elif extend_window: self._send_chunk( - self._transfer_parameters(Chunk.Type.PARAMETERS_CONTINUE)) - - def _retry_after_timeout(self) -> None: - self._send_chunk( - self._transfer_parameters(Chunk.Type.PARAMETERS_RETRANSMIT)) - - def _transfer_parameters(self, chunk_type: Any) -> Chunk: - """Sends an updated transfer parameters chunk to the server.""" + self._transfer_parameters(Chunk.Type.PARAMETERS_CONTINUE) + ) + + def _retry_after_data_timeout(self) -> None: + if ( + self._state is Transfer._State.WAITING + or self._state is Transfer._State.RECOVERY + ): + self._send_chunk( + self._transfer_parameters(Chunk.Type.PARAMETERS_RETRANSMIT) + ) - self._pending_bytes = self._max_bytes_to_receive + def _set_transfer_parameters(self, chunk: Chunk) -> None: self._window_end_offset = self._offset + self._max_bytes_to_receive - chunk = Chunk(transfer_id=self.id, - pending_bytes=self._pending_bytes, - window_end_offset=self._window_end_offset, - max_chunk_size_bytes=self._max_chunk_size, - offset=self._offset, - type=chunk_type) + chunk.offset = self._offset + chunk.window_end_offset = self._window_end_offset + chunk.max_chunk_size_bytes = self._max_chunk_size if self._chunk_delay_us: chunk.min_delay_microseconds = self._chunk_delay_us + def _transfer_parameters(self, chunk_type: Any) -> Chunk: + """Returns an updated transfer parameters chunk.""" + + chunk = Chunk( + self._configured_protocol_version, chunk_type, session_id=self.id + ) + self._set_transfer_parameters(chunk) + return chunk |