aboutsummaryrefslogtreecommitdiff
path: root/pw_log_rpc/py/pw_log_rpc/rpc_log_stream.py
diff options
context:
space:
mode:
Diffstat (limited to 'pw_log_rpc/py/pw_log_rpc/rpc_log_stream.py')
-rw-r--r--pw_log_rpc/py/pw_log_rpc/rpc_log_stream.py87
1 files changed, 87 insertions, 0 deletions
diff --git a/pw_log_rpc/py/pw_log_rpc/rpc_log_stream.py b/pw_log_rpc/py/pw_log_rpc/rpc_log_stream.py
new file mode 100644
index 000000000..78a8e4eb0
--- /dev/null
+++ b/pw_log_rpc/py/pw_log_rpc/rpc_log_stream.py
@@ -0,0 +1,87 @@
+# Copyright 2023 The Pigweed Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License. You may obtain a copy of
+# the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+
+"""Utils to decode logs."""
+
+import logging
+
+from pw_log.log_decoder import LogStreamDecoder
+from pw_log.proto import log_pb2
+import pw_rpc
+import pw_status
+
+_LOG = logging.getLogger(__name__)
+
+
+class LogStreamHandler:
+ """Handles an RPC Log Stream.
+
+ Args:
+ rpcs: RPC services to request RPC Log Streams.
+ decoder: LogStreamDecoder
+ """
+
+ def __init__(
+ self, rpcs: pw_rpc.client.Services, decoder: LogStreamDecoder
+ ) -> None:
+ self.rpcs = rpcs
+ self._decoder = decoder
+
+ def listen_to_logs(self) -> None:
+ """Requests Logs streamed over RPC.
+
+ The RPCs remain open until the server cancels or closes them, either
+ with a response or error packet.
+ """
+
+ def on_log_entries(_, log_entries_proto: log_pb2.LogEntries) -> None:
+ self._decoder.parse_log_entries_proto(log_entries_proto)
+
+ self.rpcs.pw.log.Logs.Listen.open(
+ on_next=on_log_entries,
+ on_completed=lambda _, status: self.handle_log_stream_completed(
+ status
+ ),
+ on_error=lambda _, error: self.handle_log_stream_error(error),
+ )
+
+ def handle_log_stream_error(self, error: pw_status.Status) -> None:
+ """Resets the log stream RPC on error to avoid losing logs.
+
+ Override this function to change default behavior.
+ """
+ _LOG.error(
+ 'Log stream error: %s from source %s',
+ error,
+ self.source_name,
+ )
+ # Only re-request logs if the RPC was not cancelled by the client.
+ if error != pw_status.Status.CANCELLED:
+ self.listen_to_logs()
+
+ def handle_log_stream_completed(self, status: pw_status.Status) -> None:
+ """Resets the log stream RPC on completed to avoid losing logs.
+
+ Override this function to change default behavior.
+ """
+ _LOG.debug(
+ 'Log stream completed with status: %s for source: %s',
+ status,
+ self.source_name,
+ )
+ self.listen_to_logs()
+
+ @property
+ def source_name(self) -> str:
+ return self._decoder.source_name