diff options
Diffstat (limited to 'pw_log_rpc/py/pw_log_rpc/rpc_log_stream.py')
-rw-r--r-- | pw_log_rpc/py/pw_log_rpc/rpc_log_stream.py | 87 |
1 files changed, 87 insertions, 0 deletions
diff --git a/pw_log_rpc/py/pw_log_rpc/rpc_log_stream.py b/pw_log_rpc/py/pw_log_rpc/rpc_log_stream.py new file mode 100644 index 000000000..78a8e4eb0 --- /dev/null +++ b/pw_log_rpc/py/pw_log_rpc/rpc_log_stream.py @@ -0,0 +1,87 @@ +# Copyright 2023 The Pigweed Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not +# use this file except in compliance with the License. You may obtain a copy of +# the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. + +"""Utils to decode logs.""" + +import logging + +from pw_log.log_decoder import LogStreamDecoder +from pw_log.proto import log_pb2 +import pw_rpc +import pw_status + +_LOG = logging.getLogger(__name__) + + +class LogStreamHandler: + """Handles an RPC Log Stream. + + Args: + rpcs: RPC services to request RPC Log Streams. + decoder: LogStreamDecoder + """ + + def __init__( + self, rpcs: pw_rpc.client.Services, decoder: LogStreamDecoder + ) -> None: + self.rpcs = rpcs + self._decoder = decoder + + def listen_to_logs(self) -> None: + """Requests Logs streamed over RPC. + + The RPCs remain open until the server cancels or closes them, either + with a response or error packet. + """ + + def on_log_entries(_, log_entries_proto: log_pb2.LogEntries) -> None: + self._decoder.parse_log_entries_proto(log_entries_proto) + + self.rpcs.pw.log.Logs.Listen.open( + on_next=on_log_entries, + on_completed=lambda _, status: self.handle_log_stream_completed( + status + ), + on_error=lambda _, error: self.handle_log_stream_error(error), + ) + + def handle_log_stream_error(self, error: pw_status.Status) -> None: + """Resets the log stream RPC on error to avoid losing logs. + + Override this function to change default behavior. + """ + _LOG.error( + 'Log stream error: %s from source %s', + error, + self.source_name, + ) + # Only re-request logs if the RPC was not cancelled by the client. + if error != pw_status.Status.CANCELLED: + self.listen_to_logs() + + def handle_log_stream_completed(self, status: pw_status.Status) -> None: + """Resets the log stream RPC on completed to avoid losing logs. + + Override this function to change default behavior. + """ + _LOG.debug( + 'Log stream completed with status: %s for source: %s', + status, + self.source_name, + ) + self.listen_to_logs() + + @property + def source_name(self) -> str: + return self._decoder.source_name |