aboutsummaryrefslogtreecommitdiff
path: root/pw_tokenizer/py/pw_tokenizer/parse_message.py
diff options
context:
space:
mode:
Diffstat (limited to 'pw_tokenizer/py/pw_tokenizer/parse_message.py')
-rw-r--r--pw_tokenizer/py/pw_tokenizer/parse_message.py114
1 files changed, 75 insertions, 39 deletions
diff --git a/pw_tokenizer/py/pw_tokenizer/parse_message.py b/pw_tokenizer/py/pw_tokenizer/parse_message.py
index f8655e1f3..7f33e1ffa 100644
--- a/pw_tokenizer/py/pw_tokenizer/parse_message.py
+++ b/pw_tokenizer/py/pw_tokenizer/parse_message.py
@@ -40,11 +40,12 @@ PREFIX = '$'
def attempt_to_decode(
- arg_data: bytes,
- format_specs: Collection[str] = DEFAULT_FORMAT_SPECS,
- max_args: int = DEFAULT_MAX_ARGS,
- yield_failures: bool = False) -> Iterator[FormattedString]:
- """Attemps to decode arguments using the provided format specifiers."""
+ arg_data: bytes,
+ format_specs: Collection[str] = DEFAULT_FORMAT_SPECS,
+ max_args: int = DEFAULT_MAX_ARGS,
+ yield_failures: bool = False,
+) -> Iterator[FormattedString]:
+ """Attempts to decode arguments using the provided format specifiers."""
format_strings = [(0, '')] # (argument count, format string)
# Each argument requires at least 1 byte.
@@ -59,7 +60,8 @@ def attempt_to_decode(
if arg_count < max_args:
format_strings.extend(
- (arg_count + 1, string + spec) for spec in format_specs)
+ (arg_count + 1, string + spec) for spec in format_specs
+ )
@dataclass(frozen=True)
@@ -79,13 +81,16 @@ class TokenizedMessage:
def parse(cls, message: str, prefix: str = '$') -> 'TokenizedMessage':
if not message.startswith(prefix):
raise ValueError(
- f'{message} does not start wtih {prefix!r} as expected')
+ f'{message} does not start with {prefix!r} as expected'
+ )
binary = base64.b64decode(message[1:])
if len(binary) < 4:
- raise ValueError(f'{message} is only {len(binary)} bytes; '
- 'tokenized messages must be at least 4 bytes')
+ raise ValueError(
+ f'{message} is only {len(binary)} bytes; '
+ 'tokenized messages must be at least 4 bytes'
+ )
return cls(message, binary)
@@ -105,8 +110,12 @@ def _text_list(items: Sequence, conjunction: str = 'or') -> str:
return f'{", ".join(str(i) for i in items[:-1])} {conjunction} {items[-1]}'
-def main(messages: Iterable[str], max_args: int, specs: Sequence[str],
- show_failures: bool) -> int:
+def main(
+ messages: Iterable[str],
+ max_args: int,
+ specs: Sequence[str],
+ show_failures: bool,
+) -> int:
"""Parses the arguments for a series of tokenized messages."""
exit_code = 0
@@ -125,30 +134,47 @@ def main(messages: Iterable[str], max_args: int, specs: Sequence[str],
exit_code = 2
continue
- _LOG.info('Binary: %r [%s] (%d bytes)', parsed.binary,
- parsed.binary.hex(' ', 1), len(parsed.binary))
+ _LOG.info(
+ 'Binary: %r [%s] (%d bytes)',
+ parsed.binary,
+ parsed.binary.hex(' ', 1),
+ len(parsed.binary),
+ )
_LOG.info('Token: 0x%08x', parsed.token)
- _LOG.info('Args: %r [%s] (%d bytes)', parsed.binary_args,
- parsed.binary_args.hex(' ', 1), len(parsed.binary_args))
- _LOG.info('Decoding with up to %d %s arguments', max_args,
- _text_list(specs))
-
- results = sorted(attempt_to_decode(parsed.binary_args, specs, max_args,
- show_failures),
- key=FormattedString.score,
- reverse=True)
+ _LOG.info(
+ 'Args: %r [%s] (%d bytes)',
+ parsed.binary_args,
+ parsed.binary_args.hex(' ', 1),
+ len(parsed.binary_args),
+ )
+ _LOG.info(
+ 'Decoding with up to %d %s arguments', max_args, _text_list(specs)
+ )
+
+ results = sorted(
+ attempt_to_decode(
+ parsed.binary_args, specs, max_args, show_failures
+ ),
+ key=FormattedString.score,
+ reverse=True,
+ )
if not any(result.ok() for result in results):
_LOG.warning(
' No combinations of up to %d %s arguments decoded '
- 'successfully', max_args, _text_list(specs))
+ 'successfully',
+ max_args,
+ _text_list(specs),
+ )
exit_code = 1
for i, result in enumerate(results, 1):
_LOG.info( # pylint: disable=logging-fstring-interpolation
- f' Attempt %{len(str(len(results)))}d: [%s] %s', i,
+ f' Attempt %{len(str(len(results)))}d: [%s] %s',
+ i,
' '.join(str(a.specifier) for a in result.args),
- ' '.join(str(a) for a in result.args))
+ ' '.join(str(a) for a in result.args),
+ )
print()
return exit_code
@@ -157,23 +183,33 @@ def main(messages: Iterable[str], max_args: int, specs: Sequence[str],
def _parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
description=__doc__,
- formatter_class=argparse.ArgumentDefaultsHelpFormatter)
- parser.add_argument('--max-args',
- default=DEFAULT_MAX_ARGS,
- type=int,
- help='Maximum number of printf-style arguments')
- parser.add_argument('--specs',
- nargs='*',
- default=DEFAULT_FORMAT_SPECS,
- help='Which printf-style format specifiers to check')
- parser.add_argument('--show-failures',
- action='store_true',
- help='Show argument combintations that fail to decode')
+ formatter_class=argparse.ArgumentDefaultsHelpFormatter,
+ )
+ parser.add_argument(
+ '--max-args',
+ default=DEFAULT_MAX_ARGS,
+ type=int,
+ help='Maximum number of printf-style arguments',
+ )
+ parser.add_argument(
+ '--specs',
+ nargs='*',
+ default=DEFAULT_FORMAT_SPECS,
+ help='Which printf-style format specifiers to check',
+ )
+ parser.add_argument(
+ '--show-failures',
+ action='store_true',
+ help='Show argument combintations that fail to decode',
+ )
parser.add_argument(
'messages',
nargs='*',
- help=
- 'Base64-encoded tokenized messages to decode; omit to read from stdin')
+ help=(
+ 'Base64-encoded tokenized messages to decode; omit to read from '
+ 'stdin'
+ ),
+ )
return parser.parse_args()