diff options
Diffstat (limited to 'pw_tokenizer/py/pw_tokenizer/parse_message.py')
-rw-r--r-- | pw_tokenizer/py/pw_tokenizer/parse_message.py | 114 |
1 files changed, 75 insertions, 39 deletions
diff --git a/pw_tokenizer/py/pw_tokenizer/parse_message.py b/pw_tokenizer/py/pw_tokenizer/parse_message.py index f8655e1f3..7f33e1ffa 100644 --- a/pw_tokenizer/py/pw_tokenizer/parse_message.py +++ b/pw_tokenizer/py/pw_tokenizer/parse_message.py @@ -40,11 +40,12 @@ PREFIX = '$' def attempt_to_decode( - arg_data: bytes, - format_specs: Collection[str] = DEFAULT_FORMAT_SPECS, - max_args: int = DEFAULT_MAX_ARGS, - yield_failures: bool = False) -> Iterator[FormattedString]: - """Attemps to decode arguments using the provided format specifiers.""" + arg_data: bytes, + format_specs: Collection[str] = DEFAULT_FORMAT_SPECS, + max_args: int = DEFAULT_MAX_ARGS, + yield_failures: bool = False, +) -> Iterator[FormattedString]: + """Attempts to decode arguments using the provided format specifiers.""" format_strings = [(0, '')] # (argument count, format string) # Each argument requires at least 1 byte. @@ -59,7 +60,8 @@ def attempt_to_decode( if arg_count < max_args: format_strings.extend( - (arg_count + 1, string + spec) for spec in format_specs) + (arg_count + 1, string + spec) for spec in format_specs + ) @dataclass(frozen=True) @@ -79,13 +81,16 @@ class TokenizedMessage: def parse(cls, message: str, prefix: str = '$') -> 'TokenizedMessage': if not message.startswith(prefix): raise ValueError( - f'{message} does not start wtih {prefix!r} as expected') + f'{message} does not start with {prefix!r} as expected' + ) binary = base64.b64decode(message[1:]) if len(binary) < 4: - raise ValueError(f'{message} is only {len(binary)} bytes; ' - 'tokenized messages must be at least 4 bytes') + raise ValueError( + f'{message} is only {len(binary)} bytes; ' + 'tokenized messages must be at least 4 bytes' + ) return cls(message, binary) @@ -105,8 +110,12 @@ def _text_list(items: Sequence, conjunction: str = 'or') -> str: return f'{", ".join(str(i) for i in items[:-1])} {conjunction} {items[-1]}' -def main(messages: Iterable[str], max_args: int, specs: Sequence[str], - show_failures: bool) -> int: +def main( + messages: Iterable[str], + max_args: int, + specs: Sequence[str], + show_failures: bool, +) -> int: """Parses the arguments for a series of tokenized messages.""" exit_code = 0 @@ -125,30 +134,47 @@ def main(messages: Iterable[str], max_args: int, specs: Sequence[str], exit_code = 2 continue - _LOG.info('Binary: %r [%s] (%d bytes)', parsed.binary, - parsed.binary.hex(' ', 1), len(parsed.binary)) + _LOG.info( + 'Binary: %r [%s] (%d bytes)', + parsed.binary, + parsed.binary.hex(' ', 1), + len(parsed.binary), + ) _LOG.info('Token: 0x%08x', parsed.token) - _LOG.info('Args: %r [%s] (%d bytes)', parsed.binary_args, - parsed.binary_args.hex(' ', 1), len(parsed.binary_args)) - _LOG.info('Decoding with up to %d %s arguments', max_args, - _text_list(specs)) - - results = sorted(attempt_to_decode(parsed.binary_args, specs, max_args, - show_failures), - key=FormattedString.score, - reverse=True) + _LOG.info( + 'Args: %r [%s] (%d bytes)', + parsed.binary_args, + parsed.binary_args.hex(' ', 1), + len(parsed.binary_args), + ) + _LOG.info( + 'Decoding with up to %d %s arguments', max_args, _text_list(specs) + ) + + results = sorted( + attempt_to_decode( + parsed.binary_args, specs, max_args, show_failures + ), + key=FormattedString.score, + reverse=True, + ) if not any(result.ok() for result in results): _LOG.warning( ' No combinations of up to %d %s arguments decoded ' - 'successfully', max_args, _text_list(specs)) + 'successfully', + max_args, + _text_list(specs), + ) exit_code = 1 for i, result in enumerate(results, 1): _LOG.info( # pylint: disable=logging-fstring-interpolation - f' Attempt %{len(str(len(results)))}d: [%s] %s', i, + f' Attempt %{len(str(len(results)))}d: [%s] %s', + i, ' '.join(str(a.specifier) for a in result.args), - ' '.join(str(a) for a in result.args)) + ' '.join(str(a) for a in result.args), + ) print() return exit_code @@ -157,23 +183,33 @@ def main(messages: Iterable[str], max_args: int, specs: Sequence[str], def _parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser( description=__doc__, - formatter_class=argparse.ArgumentDefaultsHelpFormatter) - parser.add_argument('--max-args', - default=DEFAULT_MAX_ARGS, - type=int, - help='Maximum number of printf-style arguments') - parser.add_argument('--specs', - nargs='*', - default=DEFAULT_FORMAT_SPECS, - help='Which printf-style format specifiers to check') - parser.add_argument('--show-failures', - action='store_true', - help='Show argument combintations that fail to decode') + formatter_class=argparse.ArgumentDefaultsHelpFormatter, + ) + parser.add_argument( + '--max-args', + default=DEFAULT_MAX_ARGS, + type=int, + help='Maximum number of printf-style arguments', + ) + parser.add_argument( + '--specs', + nargs='*', + default=DEFAULT_FORMAT_SPECS, + help='Which printf-style format specifiers to check', + ) + parser.add_argument( + '--show-failures', + action='store_true', + help='Show argument combintations that fail to decode', + ) parser.add_argument( 'messages', nargs='*', - help= - 'Base64-encoded tokenized messages to decode; omit to read from stdin') + help=( + 'Base64-encoded tokenized messages to decode; omit to read from ' + 'stdin' + ), + ) return parser.parse_args() |