aboutsummaryrefslogtreecommitdiff
path: root/pw_console/py/pw_console/widgets/event_count_history.py
blob: 242f615e448b80e51e318fc120d21fe6a2ac81f2 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
# Copyright 2021 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Wrapers for pyserial classes to log read and write data."""

import collections
import logging
from dataclasses import dataclass, field
import time
from typing import Optional

# Logger for this module, obtained under the 'pw_console' logger name.
_LOG = logging.getLogger('pw_console')


@dataclass
class EventCountHistory:
    """Track counts of events over time.

    Counts passed to :meth:`log` are accumulated. Whenever at least
    ``interval`` seconds have elapsed since the previous rollover, the
    accumulated sum is stored as the latest count, appended to ``history``
    and the accumulator is reset. Rollover only happens inside :meth:`log`,
    so no history is recorded while nothing is logged.

    Example usage: ::

        events = EventCountHistory(
            base_count_units='Bytes',
            display_unit_title='KiB/s',
            display_unit_factor=0.001,
            interval=1.0,
            show_sparkline=True)

        # Log 1 event now.
        events.log(1)
        time.sleep(1)

        # Log 100 events at this time.
        events.log(100)
        time.sleep(1)

        events.log(200)
        time.sleep(1)
        events.log(400)
        print(events)
        ▂▄█ 0.400 [KiB/s]

    (The sparkline is left-padded with spaces to ``history_limit`` columns.)

    Attributes:
        base_count_units: Label for the raw count unit. Not read by any
            method of this class.
        display_unit_title: Unit label shown in the formatted output.
        display_unit_factor: Multiplier applied to the raw count for display.
        interval: Number of seconds summed into one history entry.
        history_limit: Maximum number of history entries kept; also the
            width in characters of the sparkline.
        scale_characters: Glyphs used for the sparkline, lowest to highest.
        history: Per-interval sums, oldest first.
        show_sparkline: Whether ``repr()`` includes the sparkline.
    """

    base_count_units: str = 'Bytes'
    display_unit_title: str = 'KiB/s'
    display_unit_factor: float = 0.001
    interval: float = 1.0  # Number of seconds per sum of events.
    history_limit: int = 20
    # Deliberately left without an annotation so that it stays a plain class
    # attribute and does not become a dataclass field / __init__ parameter.
    scale_characters = ' ▁▂▃▄▅▆▇█'
    # default_factory gives every instance its own deque. A bare deque()
    # default would be shared by all instances and is rejected outright by
    # @dataclass on Python 3.11+ (unhashable default).
    history: collections.deque = field(default_factory=collections.deque)
    show_sparkline: bool = False
    _this_count: int = 0
    _last_count: int = 0
    # Evaluated per instance, so the first interval starts when the object is
    # created rather than when this module was imported.
    _last_update_time: float = field(default_factory=time.time)

    def log(self, count: int) -> None:
        """Add ``count`` events to the current interval.

        If ``interval`` seconds or more have passed since the last rollover,
        the running sum (including ``count``) becomes the latest count, is
        appended to ``history`` and the running sum restarts at zero.
        """
        self._this_count += count

        now = time.time()
        if now - self._last_update_time < self.interval:
            return

        self._last_update_time = now
        self._last_count = self._this_count
        self._this_count = 0
        self.history.append(self._last_count)

        # A loop (not a single pop) also trims a caller-supplied history that
        # started out longer than the limit.
        while len(self.history) > self.history_limit:
            self.history.popleft()

    def last_count(self) -> float:
        """Return the latest completed interval's count in display units."""
        return self._last_count * self.display_unit_factor

    def last_count_raw(self) -> int:
        """Return the latest completed interval's count, unscaled."""
        return self._last_count

    def last_count_with_units(self) -> str:
        """Return the scaled latest count as ``'<value> [<unit title>]'``."""
        return f'{self.last_count():.3f} [{self.display_unit_title}]'

    def __repr__(self) -> str:
        """Return the optional sparkline, a space, and the latest count."""
        sparkline = self.sparkline() if self.show_sparkline else ''
        return ' '.join([sparkline, self.last_count_with_units()])

    def __pt_formatted_text__(self):
        """Return ``repr(self)`` as a single unstyled (style, text) pair."""
        return [('', self.__repr__())]

    def sparkline(
        self, min_value: int = 0, max_value: Optional[int] = None
    ) -> str:
        """Render ``history`` as a string of block characters.

        Args:
            min_value: Value mapped to the lowest glyph.
            max_value: Value mapped to the highest glyph. When ``None`` the
                largest entry in ``history`` is used.

        Returns:
            One glyph per history entry, right-justified with spaces to
            ``history_limit`` characters. A string of only spaces is returned
            when the history is empty or the value range is zero.
        """
        blank = ''.rjust(self.history_limit)
        if not self.history:
            return blank

        # `is not None` so that an explicit max_value of 0 is honoured.
        maximum = max_value if max_value is not None else max(self.history)
        value_range = maximum - min_value
        if value_range == 0:
            return blank

        glyph_count = len(self.scale_characters)
        top_index = glyph_count - 1
        glyphs = []
        for value in self.history:
            # (x - in_min) * (out_max - out_min) / (in_max - in_min) + out_min
            index = int(((value - min_value) / value_range) * glyph_count)
            # Clamp both ends: the maximum maps to glyph_count, and values
            # below min_value would otherwise index from the end of the
            # string.
            index = max(0, min(index, top_index))
            glyphs.append(self.scale_characters[index])
        return ''.join(glyphs).rjust(self.history_limit)