diff options
Diffstat (limited to 'pw_console/py/pw_console/plugins/bandwidth_toolbar.py')
-rw-r--r-- | pw_console/py/pw_console/plugins/bandwidth_toolbar.py | 152 |
1 files changed, 150 insertions, 2 deletions
diff --git a/pw_console/py/pw_console/plugins/bandwidth_toolbar.py b/pw_console/py/pw_console/plugins/bandwidth_toolbar.py index e0135a6c5..2f4ecfdc9 100644 --- a/pw_console/py/pw_console/plugins/bandwidth_toolbar.py +++ b/pw_console/py/pw_console/plugins/bandwidth_toolbar.py @@ -11,13 +11,161 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations under # the License. -"""Bandwidth Monitor Toolbar""" +"""Bandwidth Monitor Toolbar and Tracker.""" +from __future__ import annotations + +from contextvars import ContextVar +import logging +import textwrap +from typing import TYPE_CHECKING from prompt_toolkit.layout import WindowAlign from pw_console.plugin_mixin import PluginMixin from pw_console.widgets import ToolbarButton, WindowPaneToolbar -from pw_console.pyserial_wrapper import BANDWIDTH_HISTORY_CONTEXTVAR +from pw_console.widgets.event_count_history import EventCountHistory + +if TYPE_CHECKING: + from _typeshed import ReadableBuffer + +_LOG = logging.getLogger('pw_console.serial_debug_logger') + + +def _log_hex_strings(data: bytes, prefix=''): + """Create alinged hex number and character view log messages.""" + # Make a list of 2 character hex number strings. + hex_numbers = textwrap.wrap(data.hex(), 2) + + hex_chars = [ + ('<' + str(b.to_bytes(1, byteorder='big')) + '>') + .replace("<b'\\x", '', 1) # Remove b'\x from the beginning + .replace("<b'", '', 1) # Remove b' from the beginning + .replace("'>", '', 1) # Remove ' from the end + .rjust(2) + for b in data + ] + + # Replace non-printable bytes with dots. + for i, num in enumerate(hex_numbers): + if num == hex_chars[i]: + hex_chars[i] = '..' + + hex_numbers_msg = ' '.join(hex_numbers) + hex_chars_msg = ' '.join(hex_chars) + + _LOG.debug( + '%s%s', + prefix, + hex_numbers_msg, + extra=dict( + extra_metadata_fields={ + 'msg': hex_numbers_msg, + 'view': 'hex', + } + ), + ) + _LOG.debug( + '%s%s', + prefix, + hex_chars_msg, + extra=dict( + extra_metadata_fields={ + 'msg': hex_chars_msg, + 'view': 'chars', + } + ), + ) + + +BANDWIDTH_HISTORY_CONTEXTVAR = ContextVar( + 'pw_console_bandwidth_history', + default={ + 'total': EventCountHistory(interval=3), + 'read': EventCountHistory(interval=3), + 'write': EventCountHistory(interval=3), + }, +) + + +class SerialBandwidthTracker: + """Tracks and logs the data read and written by a serial tranport.""" + + def __init__(self): + self.pw_bps_history = BANDWIDTH_HISTORY_CONTEXTVAR.get() + + def track_read_data(self, data: bytes) -> None: + """Tracks and logs data read.""" + self.pw_bps_history['read'].log(len(data)) + self.pw_bps_history['total'].log(len(data)) + + if len(data) > 0: + prefix = 'Read %2d B: ' % len(data) + _LOG.debug( + '%s%s', + prefix, + data, + extra=dict( + extra_metadata_fields={ + 'mode': 'Read', + 'bytes': len(data), + 'view': 'bytes', + 'msg': str(data), + } + ), + ) + _log_hex_strings(data, prefix=prefix) + + # Print individual lines + for line in data.decode( + encoding='utf-8', errors='ignore' + ).splitlines(): + _LOG.debug( + '%s', + line, + extra=dict( + extra_metadata_fields={ + 'msg': line, + 'view': 'lines', + } + ), + ) + + def track_write_data(self, data: ReadableBuffer) -> None: + """Tracks and logs data to be written.""" + if isinstance(data, bytes) and len(data) > 0: + self.pw_bps_history['write'].log(len(data)) + self.pw_bps_history['total'].log(len(data)) + + prefix = 'Write %2d B: ' % len(data) + _LOG.debug( + '%s%s', + prefix, + data, + extra=dict( + extra_metadata_fields={ + 'mode': 'Write', + 'bytes': len(data), + 'view': 'bytes', + 'msg': str(data), + } + ), + ) + _log_hex_strings(data, prefix=prefix) + + # Print individual lines + for line in data.decode( + encoding='utf-8', errors='ignore' + ).splitlines(): + _LOG.debug( + '%s', + line, + extra=dict( + extra_metadata_fields={ + 'msg': line, + 'view': 'lines', + } + ), + ) class BandwidthToolbar(WindowPaneToolbar, PluginMixin): |