aboutsummaryrefslogtreecommitdiff
path: root/pw_containers/py/pw_containers/variable_length_entry_queue.py
blob: 2c6e53d12e668964b48b21ee622c5f3d46b1f2ff (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# Copyright 2023 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Decodes the in-memory representation of a sized-entry ring buffer."""

import struct
from typing import Iterable, Tuple

# Layout of the queue header that precedes the data array: three unsigned ints.
# The format string has no byte-order prefix, so struct uses the native byte
# order, size and alignment of the machine running this code.
_HEADER = struct.Struct('III')  # data_size_bytes, head, tail


def _decode_leb128(
    data: bytes, offset: int = 0, max_bits: int = 32
) -> Tuple[int, int]:
    count = value = shift = 0

    while offset < len(data):
        byte = data[offset]

        count += 1
        value |= (byte & 0x7F) << shift

        if not byte & 0x80:
            return offset + count, value

        shift += 7
        if shift >= max_bits:
            raise ValueError(f'Varint exceeded {max_bits}-bit limit')

    raise ValueError(f'Unterminated varint {data[offset:]!r}')


def parse(queue: bytes) -> Iterable[bytes]:
    """Decodes the in-memory representation of a variable-length entry queue.

    The buffer starts with a header (data array size in bytes, head offset,
    tail offset) followed by the data array. The entries occupy the bytes from
    head up to tail, wrapping around the end of the array when tail is less
    than head. Each entry is a LEB128-encoded size followed by that many bytes
    of payload.

    This is a generator: the header is not examined, and no error is raised,
    until iteration begins.

    Args:
      queue: The bytes representation of a variable-length entry queue.

    Yields:
      Each entry in the buffer as bytes.

    Raises:
      struct.error: queue is too short to contain the header.
      ValueError: queue is shorter than the size declared in its header, the
        head or tail offset lies outside the data array, or an entry's size
        prefix is malformed or larger than the remaining data.
    """
    array_size_bytes, head, tail = _HEADER.unpack_from(queue)

    total_encoded_size = _HEADER.size + array_size_bytes
    if len(queue) < total_encoded_size:
        raise ValueError(
            f'Ring buffer data ({len(queue)} B) is smaller than the encoded '
            f'size ({total_encoded_size} B)'
        )

    # Slicing silently clamps out-of-range indices, which would hide a corrupt
    # header behind a truncated (or empty) result. Reject it explicitly.
    for name, position in (('head', head), ('tail', tail)):
        if position > array_size_bytes:
            raise ValueError(
                f'Corruption detected; '
                f'{name} offset {position} is outside the '
                f'{array_size_bytes} B array'
            )

    data = queue[_HEADER.size : total_encoded_size]

    # Linearize the ring: when the contents wrap, the older bytes run from
    # head to the end of the array and the newer ones from the start to tail.
    if tail < head:
        data = data[head:] + data[:tail]
    else:
        data = data[head:tail]

    index = 0
    while index < len(data):
        # The decoder returns the index just past the size prefix, which is
        # where this entry's payload begins.
        index, size = _decode_leb128(data, index)

        if index + size > len(data):
            raise ValueError(
                f'Corruption detected; '
                f'encoded size {size} B is too large for a {len(data)} B array'
            )
        yield data[index : index + size]

        index += size