1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
|
# Copyright 2021 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Wrapers for pyserial classes to log read and write data."""
import collections
import logging
from dataclasses import dataclass
import time
from typing import Optional
_LOG = logging.getLogger('pw_console')
@dataclass
class EventCountHistory:
"""Track counts of events over time.
Example usage: ::
events = EventCountHistory(
base_count_units='Bytes',
display_unit_title='KiB/s',
display_unit_factor=0.001,
interval=1.0,
show_sparkline=True)
# Log 1 event now.
events.log(1)
time.sleep(1)
# Log 100 events at this time.
events.log(100)
time.sleep(1)
events.log(200)
time.sleep(1)
events.log(400)
print(events)
▂▄█ 0.400 [KiB/s]
"""
base_count_units: str = 'Bytes'
display_unit_title: str = 'KiB/s'
display_unit_factor: float = 0.001
interval: float = 1.0 # Number of seconds per sum of events.
history_limit: int = 20
scale_characters = ' ▁▂▃▄▅▆▇█'
history: collections.deque = collections.deque()
show_sparkline: bool = False
_this_count: int = 0
_last_count: int = 0
_last_update_time: float = time.time()
def log(self, count: int) -> None:
self._this_count += count
this_time = time.time()
if this_time - self._last_update_time >= self.interval:
self._last_update_time = this_time
self._last_count = self._this_count
self._this_count = 0
self.history.append(self._last_count)
if len(self.history) > self.history_limit:
self.history.popleft()
def last_count(self) -> float:
return self._last_count * self.display_unit_factor
def last_count_raw(self) -> int:
return self._last_count
def last_count_with_units(self) -> str:
return '{:.3f} [{}]'.format(
self._last_count * self.display_unit_factor, self.display_unit_title
)
def __repr__(self) -> str:
sparkline = ''
if self.show_sparkline:
sparkline = self.sparkline()
return ' '.join([sparkline, self.last_count_with_units()])
def __pt_formatted_text__(self):
return [('', self.__repr__())]
def sparkline(
self, min_value: int = 0, max_value: Optional[int] = None
) -> str:
msg = ''.rjust(self.history_limit)
if len(self.history) == 0:
return msg
minimum = min_value
maximum = max_value if max_value else max(self.history)
max_minus_min = maximum - min_value
if max_minus_min == 0:
return msg
msg = ''
for i in self.history:
# (x - in_min) * (out_max - out_min) / (in_max - in_min) + out_min
index = int(
(((1.0 * i) - minimum) / max_minus_min)
* len(self.scale_characters)
)
if index >= len(self.scale_characters):
index = len(self.scale_characters) - 1
msg += self.scale_characters[index]
return msg.rjust(self.history_limit)
|