aboutsummaryrefslogtreecommitdiff
path: root/pw_console/py/pw_console/plugins/bandwidth_toolbar.py
diff options
context:
space:
mode:
Diffstat (limited to 'pw_console/py/pw_console/plugins/bandwidth_toolbar.py')
-rw-r--r--pw_console/py/pw_console/plugins/bandwidth_toolbar.py152
1 files changed, 150 insertions, 2 deletions
diff --git a/pw_console/py/pw_console/plugins/bandwidth_toolbar.py b/pw_console/py/pw_console/plugins/bandwidth_toolbar.py
index e0135a6c5..2f4ecfdc9 100644
--- a/pw_console/py/pw_console/plugins/bandwidth_toolbar.py
+++ b/pw_console/py/pw_console/plugins/bandwidth_toolbar.py
@@ -11,13 +11,161 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
-"""Bandwidth Monitor Toolbar"""
+"""Bandwidth Monitor Toolbar and Tracker."""
+from __future__ import annotations
+
+from contextvars import ContextVar
+import logging
+import textwrap
+from typing import TYPE_CHECKING
from prompt_toolkit.layout import WindowAlign
from pw_console.plugin_mixin import PluginMixin
from pw_console.widgets import ToolbarButton, WindowPaneToolbar
-from pw_console.pyserial_wrapper import BANDWIDTH_HISTORY_CONTEXTVAR
+from pw_console.widgets.event_count_history import EventCountHistory
+
+if TYPE_CHECKING:
+ from _typeshed import ReadableBuffer
+
+_LOG = logging.getLogger('pw_console.serial_debug_logger')
+
+
+def _log_hex_strings(data: bytes, prefix=''):
+ """Create alinged hex number and character view log messages."""
+ # Make a list of 2 character hex number strings.
+ hex_numbers = textwrap.wrap(data.hex(), 2)
+
+ hex_chars = [
+ ('<' + str(b.to_bytes(1, byteorder='big')) + '>')
+ .replace("<b'\\x", '', 1) # Remove b'\x from the beginning
+ .replace("<b'", '', 1) # Remove b' from the beginning
+ .replace("'>", '', 1) # Remove ' from the end
+ .rjust(2)
+ for b in data
+ ]
+
+ # Replace non-printable bytes with dots.
+ for i, num in enumerate(hex_numbers):
+ if num == hex_chars[i]:
+ hex_chars[i] = '..'
+
+ hex_numbers_msg = ' '.join(hex_numbers)
+ hex_chars_msg = ' '.join(hex_chars)
+
+ _LOG.debug(
+ '%s%s',
+ prefix,
+ hex_numbers_msg,
+ extra=dict(
+ extra_metadata_fields={
+ 'msg': hex_numbers_msg,
+ 'view': 'hex',
+ }
+ ),
+ )
+ _LOG.debug(
+ '%s%s',
+ prefix,
+ hex_chars_msg,
+ extra=dict(
+ extra_metadata_fields={
+ 'msg': hex_chars_msg,
+ 'view': 'chars',
+ }
+ ),
+ )
+
+
+BANDWIDTH_HISTORY_CONTEXTVAR = ContextVar(
+ 'pw_console_bandwidth_history',
+ default={
+ 'total': EventCountHistory(interval=3),
+ 'read': EventCountHistory(interval=3),
+ 'write': EventCountHistory(interval=3),
+ },
+)
+
+
+class SerialBandwidthTracker:
+ """Tracks and logs the data read and written by a serial tranport."""
+
+ def __init__(self):
+ self.pw_bps_history = BANDWIDTH_HISTORY_CONTEXTVAR.get()
+
+ def track_read_data(self, data: bytes) -> None:
+ """Tracks and logs data read."""
+ self.pw_bps_history['read'].log(len(data))
+ self.pw_bps_history['total'].log(len(data))
+
+ if len(data) > 0:
+ prefix = 'Read %2d B: ' % len(data)
+ _LOG.debug(
+ '%s%s',
+ prefix,
+ data,
+ extra=dict(
+ extra_metadata_fields={
+ 'mode': 'Read',
+ 'bytes': len(data),
+ 'view': 'bytes',
+ 'msg': str(data),
+ }
+ ),
+ )
+ _log_hex_strings(data, prefix=prefix)
+
+ # Print individual lines
+ for line in data.decode(
+ encoding='utf-8', errors='ignore'
+ ).splitlines():
+ _LOG.debug(
+ '%s',
+ line,
+ extra=dict(
+ extra_metadata_fields={
+ 'msg': line,
+ 'view': 'lines',
+ }
+ ),
+ )
+
+ def track_write_data(self, data: ReadableBuffer) -> None:
+ """Tracks and logs data to be written."""
+ if isinstance(data, bytes) and len(data) > 0:
+ self.pw_bps_history['write'].log(len(data))
+ self.pw_bps_history['total'].log(len(data))
+
+ prefix = 'Write %2d B: ' % len(data)
+ _LOG.debug(
+ '%s%s',
+ prefix,
+ data,
+ extra=dict(
+ extra_metadata_fields={
+ 'mode': 'Write',
+ 'bytes': len(data),
+ 'view': 'bytes',
+ 'msg': str(data),
+ }
+ ),
+ )
+ _log_hex_strings(data, prefix=prefix)
+
+ # Print individual lines
+ for line in data.decode(
+ encoding='utf-8', errors='ignore'
+ ).splitlines():
+ _LOG.debug(
+ '%s',
+ line,
+ extra=dict(
+ extra_metadata_fields={
+ 'msg': line,
+ 'view': 'lines',
+ }
+ ),
+ )
class BandwidthToolbar(WindowPaneToolbar, PluginMixin):