diff options
author | Henri Chataing <henrichataing@google.com> | 2024-03-27 17:12:42 -0700 |
---|---|---|
committer | hchataing <104974782+hchataing@users.noreply.github.com> | 2024-04-01 16:37:50 -0700 |
commit | b5006afdc4533af8fa4d5e6e0674f2a6a5c1af45 (patch) | |
tree | c1eec21f5011084c70f57ebf3f4a77312479b6bc | |
parent | 45c2327092a065d4e8c675e14e62dfbfe99fd47c (diff) | |
download | pica-b5006afdc4533af8fa4d5e6e0674f2a6a5c1af45.tar.gz |
Review the implementation of Core Set Config and Core Get Config
-rw-r--r-- | py/pica/pica/packets/uci.py | 2645 | ||||
-rw-r--r-- | src/device.rs | 149 | ||||
-rw-r--r-- | src/lib.rs | 2 | ||||
-rw-r--r-- | src/mac_address.rs | 14 | ||||
-rw-r--r-- | src/uci_packets.pdl | 40 | ||||
-rwxr-xr-x | tests/data_transfer.py | 40 | ||||
-rw-r--r-- | tests/helper.py | 8 | ||||
-rwxr-xr-x | tests/ranging.py | 28 |
8 files changed, 903 insertions, 2023 deletions
diff --git a/py/pica/pica/packets/uci.py b/py/pica/pica/packets/uci.py index 0043c45..574feab 100644 --- a/py/pica/pica/packets/uci.py +++ b/py/pica/pica/packets/uci.py @@ -94,16 +94,30 @@ class PacketBoundaryFlag(enum.IntEnum): raise exn +class MessageType(enum.IntEnum): + DATA = 0x0 + COMMAND = 0x1 + RESPONSE = 0x2 + NOTIFICATION = 0x3 + + @staticmethod + def from_int(v: int) -> Union[int, 'MessageType']: + try: + return MessageType(v) + except ValueError as exn: + raise exn + + class GroupId(enum.IntEnum): CORE = 0x0 SESSION_CONFIG = 0x1 SESSION_CONTROL = 0x2 DATA_CONTROL = 0x3 - TEST = 0xd VENDOR_RESERVED_9 = 0x9 VENDOR_RESERVED_A = 0xa VENDOR_RESERVED_B = 0xb VENDOR_ANDROID = 0xc + TEST = 0xd VENDOR_RESERVED_E = 0xe VENDOR_RESERVED_F = 0xf @@ -127,154 +141,112 @@ class DataPacketFormat(enum.IntEnum): raise exn -class GroupIdOrDataPacketFormat(enum.IntEnum): - CORE = 0x0 - SESSION_CONFIG_OR_DATA_SND = 0x1 - SESSION_CONTROL_OR_DATA_RCV = 0x2 - DATA_CONTROL = 0x3 - TEST = 0xd - VENDOR_RESERVED_9 = 0x9 - VENDOR_RESERVED_A = 0xa - VENDOR_RESERVED_B = 0xb - VENDOR_ANDROID = 0xc - VENDOR_RESERVED_E = 0xe - VENDOR_RESERVED_F = 0xf - - @staticmethod - def from_int(v: int) -> Union[int, 'GroupIdOrDataPacketFormat']: - try: - return GroupIdOrDataPacketFormat(v) - except ValueError as exn: - raise exn - - -class CoreOpCode(enum.IntEnum): - CORE_DEVICE_RESET = 0x0 - CORE_DEVICE_STATUS_NTF = 0x1 - CORE_DEVICE_INFO = 0x2 - CORE_GET_CAPS_INFO = 0x3 - CORE_SET_CONFIG = 0x4 - CORE_GET_CONFIG = 0x5 - CORE_DEVICE_SUSPEND = 0x6 - CORE_GENERIC_ERROR_NTF = 0x7 - CORE_QUERY_UWBS_TIMESTAMP = 0x8 +class CoreOpcodeId(enum.IntEnum): + DEVICE_RESET = 0x0 + DEVICE_STATUS = 0x1 + GET_DEVICE_INFO = 0x2 + GET_CAPS_INFO = 0x3 + SET_CONFIG = 0x4 + GET_CONFIG = 0x5 + GENERIC_ERROR = 0x7 + QUERY_UWBS_TIMESTAMP = 0x8 @staticmethod - def from_int(v: int) -> Union[int, 'CoreOpCode']: + def from_int(v: int) -> Union[int, 'CoreOpcodeId']: try: - return CoreOpCode(v) + return CoreOpcodeId(v) except ValueError as exn: raise exn -class SessionConfigOpCode(enum.IntEnum): - SESSION_INIT = 0x0 - SESSION_DEINIT = 0x1 - SESSION_STATUS_NTF = 0x2 - SESSION_SET_APP_CONFIG = 0x3 - SESSION_GET_APP_CONFIG = 0x4 - SESSION_GET_COUNT = 0x5 - SESSION_GET_STATE = 0x6 - SESSION_UPDATE_CONTROLLER_MULTICAST_LIST = 0x7 - SESSION_UPDATE_ACTIVE_ROUNDS_ANCHOR = 0x8 - SESSION_UPDATE_ACTIVE_ROUNDS_DT_TAG = 0x9 - SESSION_SET_INITIATOR_DT_ANCHOR_RR_RDM_LIST = 0xa - SESSION_QUERY_DATA_SIZE_IN_RANGING = 0xb - SESSION_SET_HUS_CONFIG = 0xc +class SessionConfigOpcodeId(enum.IntEnum): + INIT = 0x0 + DEINIT = 0x1 + STATUS = 0x2 + SET_APP_CONFIG = 0x3 + GET_APP_CONFIG = 0x4 + GET_COUNT = 0x5 + GET_STATE = 0x6 + UPDATE_CONTROLLER_MULTICAST_LIST = 0x7 + UPDATE_DT_ANCHOR_RANGING_ROUNDS = 0x8 + UPDATE_DT_TAG_RANGING_ROUNDS = 0x9 + QUERY_DATA_SIZE_IN_RANGING = 0xb @staticmethod - def from_int(v: int) -> Union[int, 'SessionConfigOpCode']: + def from_int(v: int) -> Union[int, 'SessionConfigOpcodeId']: try: - return SessionConfigOpCode(v) + return SessionConfigOpcodeId(v) except ValueError as exn: raise exn -class SessionControlOpCode(enum.IntEnum): - SESSION_START = 0x0 - SESSION_STOP = 0x1 - SESSION_RESERVED = 0x2 - SESSION_GET_RANGING_COUNT = 0x3 - SESSION_DATA_CREDIT_NTF = 0x4 - SESSION_DATA_TRANSFER_STATUS_NTF = 0x5 +class SessionControlOpcodeId(enum.IntEnum): + START = 0x0 + STOP = 0x1 + GET_RANGING_COUNT = 0x3 + DATA_CREDIT = 0x4 + DATA_TRANSFER_STATUS = 0x5 @staticmethod - def from_int(v: int) -> Union[int, 'SessionControlOpCode']: + def from_int(v: int) -> Union[int, 'SessionControlOpcodeId']: try: - return SessionControlOpCode(v) + return SessionControlOpcodeId(v) except ValueError as exn: raise exn -class AppDataOpCode(enum.IntEnum): - APP_DATA_TX = 0x0 - APP_DATA_RX = 0x1 +class AndroidOpcodeId(enum.IntEnum): + GET_POWER_STATS = 0x0 + SET_COUNTRY_CODE = 0x1 + FIRA_RANGE_DIAGNOSTICS = 0x2 @staticmethod - def from_int(v: int) -> Union[int, 'AppDataOpCode']: + def from_int(v: int) -> Union[int, 'AndroidOpcodeId']: try: - return AppDataOpCode(v) + return AndroidOpcodeId(v) except ValueError as exn: raise exn -class AndroidOpCode(enum.IntEnum): - ANDROID_GET_POWER_STATS = 0x0 - ANDROID_SET_COUNTRY_CODE = 0x1 - ANDROID_FIRA_RANGE_DIAGNOSTICS = 0x2 - - @staticmethod - def from_int(v: int) -> Union[int, 'AndroidOpCode']: - try: - return AndroidOpCode(v) - except ValueError as exn: - raise exn - - -class StatusCode(enum.IntEnum): - UCI_STATUS_OK = 0x0 - UCI_STATUS_REJECTED = 0x1 - UCI_STATUS_FAILED = 0x2 - UCI_STATUS_SYNTAX_ERROR = 0x3 - UCI_STATUS_INVALID_PARAM = 0x4 - UCI_STATUS_INVALID_RANGE = 0x5 - UCI_STATUS_INVALID_MSG_SIZE = 0x6 - UCI_STATUS_UNKNOWN_GID = 0x7 - UCI_STATUS_UNKNOWN_OID = 0x8 - UCI_STATUS_READ_ONLY = 0x9 - UCI_STATUS_COMMAND_RETRY = 0xa - UCI_STATUS_UNKNOWN = 0xb - UCI_STATUS_NOT_APPLICABLE = 0xc - UCI_STATUS_SESSION_NOT_EXIST = 0x11 - UCI_STATUS_SESSION_DUPLICATE = 0x12 - UCI_STATUS_SESSION_ACTIVE = 0x13 - UCI_STATUS_MAX_SESSIONS_EXCEEDED = 0x14 - UCI_STATUS_SESSION_NOT_CONFIGURED = 0x15 - UCI_STATUS_ACTIVE_SESSIONS_ONGOING = 0x16 - UCI_STATUS_MULTICAST_LIST_FULL = 0x17 - UCI_STATUS_ADDRESS_NOT_FOUND = 0x18 - UCI_STATUS_ADDRESS_ALREADY_PRESENT = 0x19 - UCI_STATUS_ERROR_UWB_INITIATION_TIME_TOO_OLD = 0x1a - UCI_STATUS_OK_NEGATIVE_DISTANCE_REPORT = 0x1b - UCI_STATUS_RANGING_TX_FAILED = 0x20 - UCI_STATUS_RANGING_RX_TIMEOUT = 0x21 - UCI_STATUS_RANGING_RX_PHY_DEC_FAILED = 0x22 - UCI_STATUS_RANGING_RX_PHY_TOA_FAILED = 0x23 - UCI_STATUS_RANGING_RX_PHY_STS_FAILED = 0x24 - UCI_STATUS_RANGING_RX_MAC_DEC_FAILED = 0x25 - UCI_STATUS_RANGING_RX_MAC_IE_DEC_FAILED = 0x26 - UCI_STATUS_RANGING_RX_MAC_IE_MISSING = 0x27 - UCI_STATUS_ERROR_ROUND_INDEX_NOT_ACTIVATED = 0x28 - UCI_STATUS_ERROR_NUMBER_OF_ACTIVE_RANGING_ROUNDS_EXCEEDED = 0x29 - UCI_STATUS_ERROR_DL_TDOA_DEVICE_ADDRESS_NOT_MATCHING_IN_REPLY_TIME_LIST = 0x2a - UCI_STATUS_DATA_MAX_TX_PSDU_SIZE_EXCEEDED = 0x30 - UCI_STATUS_DATA_RX_CRC_ERROR = 0x31 - VENDOR_SPECIFIC_STATUS_CODE_2 = 0xff - - @staticmethod - def from_int(v: int) -> Union[int, 'StatusCode']: - try: - return StatusCode(v) +class Status(enum.IntEnum): + OK = 0x0 + REJECTED = 0x1 + FAILED = 0x2 + SYNTAX_ERROR = 0x3 + INVALID_PARAM = 0x4 + INVALID_RANGE = 0x5 + INVALID_MESSAGE_SIZE = 0x6 + UNKNOWN_GID = 0x7 + UNKNOWN_OID = 0x8 + READ_ONLY = 0x9 + UCI_MESSAGE_RETRY = 0xa + UNKNOWN = 0xb + NOT_APPLICABLE = 0xc + ERROR_SESSION_NOT_EXIST = 0x11 + ERROR_SESSION_DUPLICATE = 0x12 + ERROR_SESSION_ACTIVE = 0x13 + ERROR_MAX_SESSIONS_EXCEEDED = 0x14 + ERROR_SESSION_NOT_CONFIGURED = 0x15 + ERROR_ACTIVE_SESSIONS_ONGOING = 0x16 + ERROR_MULTICAST_LIST_FULL = 0x17 + ERROR_UWB_INITIATION_TIME_TOO_OLD = 0x1a + OK_NEGATIVE_DISTANCE_REPORT = 0x1b + RANGING_TX_FAILED = 0x20 + RANGING_RX_TIMEOUT = 0x21 + RANGING_RX_PHY_DEC_FAILED = 0x22 + RANGING_RX_PHY_TOA_FAILED = 0x23 + RANGING_RX_PHY_STS_FAILED = 0x24 + RANGING_RX_MAC_DEC_FAILED = 0x25 + RANGING_RX_MAC_IE_DEC_FAILED = 0x26 + RANGING_RX_MAC_IE_MISSING = 0x27 + ERROR_ROUND_INDEX_NOT_ACTIVATED = 0x28 + ERROR_NUMBER_OF_ACTIVE_RANGING_ROUNDS_EXCEEDED = 0x29 + ERROR_DL_TDOA_DEVICE_ADDRESS_NOT_MATCHING_IN_REPLY_TIME_LIST = 0x2a + + @staticmethod + def from_int(v: int) -> Union[int, 'Status']: + try: + return Status(v) except ValueError as exn: return v @@ -333,18 +305,6 @@ class ResetConfig(enum.IntEnum): raise exn -class DeviceConfigId(enum.IntEnum): - DEVICE_STATE = 0x0 - LOW_POWER_MODE = 0x1 - - @staticmethod - def from_int(v: int) -> Union[int, 'DeviceConfigId']: - try: - return DeviceConfigId(v) - except ValueError as exn: - raise exn - - class AppConfigTlvType(enum.IntEnum): DEVICE_TYPE = 0x0 RANGING_ROUND_USAGE = 0x1 @@ -982,20 +942,21 @@ class ReasonCode(enum.IntEnum): return v -class MulticastUpdateStatusCode(enum.IntEnum): - STATUS_OK_MULTICAST_LIST_UPDATE = 0x0 - STATUS_ERROR_MULTICAST_LIST_FULL = 0x1 - STATUS_ERROR_KEY_FETCH_FAIL = 0x2 - STATUS_ERROR_SUB_SESSION_ID_NOT_FOUND = 0x3 - STATUS_ERROR_SUB_SESSION_KEY_NOT_FOUND = 0x5 - STATUS_ERROR_SUB_SESSION_KEY_NOT_APPLICABLE = 0x6 - STATUS_ERROR_SESSION_KEY_NOT_FOUND = 0x7 - STATUS_ERROR_ADDRESS_ALREADY_PRESENT = 0x8 +class MulticastUpdateStatus(enum.IntEnum): + OK_MULTICAST_LIST_UPDATE = 0x0 + ERROR_MULTICAST_LIST_FULL = 0x1 + ERROR_KEY_FETCH_FAIL = 0x2 + ERROR_SUB_SESSION_ID_NOT_FOUND = 0x3 + ERROR_SUB_SESSION_KEY_NOT_FOUND = 0x4 + ERROR_SUB_SESSION_KEY_NOT_APPLICABLE = 0x5 + ERROR_SESSION_KEY_NOT_FOUND = 0x6 + ERROR_ADDRESS_NOT_FOUND = 0x7 + ERROR_ADDRESS_ALREADY_PRESENT = 0x8 @staticmethod - def from_int(v: int) -> Union[int, 'MulticastUpdateStatusCode']: + def from_int(v: int) -> Union[int, 'MulticastUpdateStatus']: try: - return MulticastUpdateStatusCode(v) + return MulticastUpdateStatus(v) except ValueError as exn: raise exn @@ -1030,22 +991,6 @@ class SessionType(enum.IntEnum): raise exn -class MessageType(enum.IntEnum): - DATA = 0x0 - COMMAND = 0x1 - RESPONSE = 0x2 - NOTIFICATION = 0x3 - RESERVED_FOR_TESTING_1 = 0x4 - RESERVED_FOR_TESTING_2 = 0x5 - - @staticmethod - def from_int(v: int) -> Union[int, 'MessageType']: - try: - return MessageType(v) - except ValueError as exn: - raise exn - - @dataclass class CommonPacketHeader(Packet): pbf: PacketBoundaryFlag = field(kw_only=True, default=PacketBoundaryFlag.COMPLETE) @@ -1159,7 +1104,6 @@ class DataPacketHeader(Packet): class ControlPacket(Packet): gid: GroupId = field(kw_only=True, default=GroupId.CORE) mt: MessageType = field(kw_only=True, default=MessageType.DATA) - opcode: int = field(kw_only=True, default=0) def __post_init__(self): pass @@ -1167,210 +1111,28 @@ class ControlPacket(Packet): @staticmethod def parse(span: bytes) -> Tuple['ControlPacket', bytes]: fields = {'payload': None} - if len(span) < 4: + if len(span) < 1: raise Exception('Invalid packet size') fields['gid'] = GroupId.from_int((span[0] >> 0) & 0xf) fields['mt'] = MessageType.from_int((span[0] >> 5) & 0x7) - fields['opcode'] = (span[1] >> 0) & 0x3f - value_ = int.from_bytes(span[2:4], byteorder='little') - span = span[4:] + span = span[1:] payload = span span = bytes([]) fields['payload'] = payload try: - return DeviceResetCmd.parse(fields.copy(), payload) - except Exception as exn: - pass - try: - return GetDeviceInfoCmd.parse(fields.copy(), payload) - except Exception as exn: - pass - try: - return GetCapsInfoCmd.parse(fields.copy(), payload) - except Exception as exn: - pass - try: - return SetConfigCmd.parse(fields.copy(), payload) - except Exception as exn: - pass - try: - return GetConfigCmd.parse(fields.copy(), payload) - except Exception as exn: - pass - try: - return CoreQueryTimeStampCmd.parse(fields.copy(), payload) - except Exception as exn: - pass - try: - return SessionInitCmd.parse(fields.copy(), payload) - except Exception as exn: - pass - try: - return SessionDeinitCmd.parse(fields.copy(), payload) - except Exception as exn: - pass - try: - return SessionSetAppConfigCmd.parse(fields.copy(), payload) - except Exception as exn: - pass - try: - return SessionGetAppConfigCmd.parse(fields.copy(), payload) - except Exception as exn: - pass - try: - return SessionGetCountCmd.parse(fields.copy(), payload) - except Exception as exn: - pass - try: - return SessionGetStateCmd.parse(fields.copy(), payload) - except Exception as exn: - pass - try: - return SessionUpdateDtTagRangingRoundsCmd.parse(fields.copy(), payload) - except Exception as exn: - pass - try: - return SessionUpdateControllerMulticastListCmd.parse(fields.copy(), payload) - except Exception as exn: - pass - try: - return SessionSetHybridConfigCmd.parse(fields.copy(), payload) - except Exception as exn: - pass - try: - return SessionQueryMaxDataSizeCmd.parse(fields.copy(), payload) - except Exception as exn: - pass - try: - return SessionControlCommand.parse(fields.copy(), payload) - except Exception as exn: - pass - try: - return AndroidGetPowerStatsCmd.parse(fields.copy(), payload) - except Exception as exn: - pass - try: - return AndroidSetCountryCodeCmd.parse(fields.copy(), payload) - except Exception as exn: - pass - try: - return DeviceResetRsp.parse(fields.copy(), payload) - except Exception as exn: - pass - try: - return GetDeviceInfoRsp.parse(fields.copy(), payload) - except Exception as exn: - pass - try: - return GetCapsInfoRsp.parse(fields.copy(), payload) - except Exception as exn: - pass - try: - return SetConfigRsp.parse(fields.copy(), payload) - except Exception as exn: - pass - try: - return GetConfigRsp.parse(fields.copy(), payload) - except Exception as exn: - pass - try: - return CoreQueryTimeStampRsp.parse(fields.copy(), payload) - except Exception as exn: - pass - try: - return SessionInitRsp_V2.parse(fields.copy(), payload) - except Exception as exn: - pass - try: - return SessionInitRsp.parse(fields.copy(), payload) - except Exception as exn: - pass - try: - return SessionDeinitRsp.parse(fields.copy(), payload) - except Exception as exn: - pass - try: - return SessionSetAppConfigRsp.parse(fields.copy(), payload) - except Exception as exn: - pass - try: - return SessionGetAppConfigRsp.parse(fields.copy(), payload) - except Exception as exn: - pass - try: - return SessionGetCountRsp.parse(fields.copy(), payload) - except Exception as exn: - pass - try: - return SessionGetStateRsp.parse(fields.copy(), payload) - except Exception as exn: - pass - try: - return SessionUpdateDtTagRangingRoundsRsp.parse(fields.copy(), payload) - except Exception as exn: - pass - try: - return SessionSetHybridConfigRsp.parse(fields.copy(), payload) - except Exception as exn: - pass - try: - return SessionUpdateControllerMulticastListRsp.parse(fields.copy(), payload) - except Exception as exn: - pass - try: - return SessionQueryMaxDataSizeRsp.parse(fields.copy(), payload) - except Exception as exn: - pass - try: - return SessionStartRsp.parse(fields.copy(), payload) - except Exception as exn: - pass - try: - return SessionStopRsp.parse(fields.copy(), payload) - except Exception as exn: - pass - try: - return SessionGetRangingCountRsp.parse(fields.copy(), payload) - except Exception as exn: - pass - try: - return AndroidGetPowerStatsRsp.parse(fields.copy(), payload) - except Exception as exn: - pass - try: - return AndroidSetCountryCodeRsp.parse(fields.copy(), payload) - except Exception as exn: - pass - try: - return DeviceStatusNtf.parse(fields.copy(), payload) - except Exception as exn: - pass - try: - return GenericError.parse(fields.copy(), payload) - except Exception as exn: - pass - try: - return SessionStatusNtf.parse(fields.copy(), payload) - except Exception as exn: - pass - try: - return SessionUpdateControllerMulticastListNtf.parse(fields.copy(), payload) - except Exception as exn: - pass - try: - return DataCreditNtf.parse(fields.copy(), payload) + return CorePacket.parse(fields.copy(), payload) except Exception as exn: pass try: - return DataTransferStatusNtf.parse(fields.copy(), payload) + return SessionConfigPacket.parse(fields.copy(), payload) except Exception as exn: pass try: - return SessionInfoNtf.parse(fields.copy(), payload) + return SessionControlPacket.parse(fields.copy(), payload) except Exception as exn: pass try: - return AndroidRangeDiagnosticsNtf.parse(fields.copy(), payload) + return AndroidPacket.parse(fields.copy(), payload) except Exception as exn: pass return ControlPacket(**fields), span @@ -1382,17 +1144,12 @@ class ControlPacket(Packet): (self.mt << 5) ) _span.append(_value) - if self.opcode > 63: - print(f"Invalid value for field ControlPacket::opcode: {self.opcode} > 63; the value will be truncated") - self.opcode &= 63 - _span.append((self.opcode << 0)) - _span.extend([0] * 2) _span.extend(payload or self.payload or []) return bytes(_span) @property def size(self) -> int: - return len(self.payload) + 4 + return len(self.payload) + 1 @dataclass class DataPacket(Packet): @@ -1500,7 +1257,7 @@ class DataMessageSnd(DataPacket): @dataclass class DataMessageRcv(DataPacket): session_handle: int = field(kw_only=True, default=0) - status: StatusCode = field(kw_only=True, default=StatusCode.UCI_STATUS_OK) + status: Status = field(kw_only=True, default=Status.OK) source_address: int = field(kw_only=True, default=0) data_sequence_number: int = field(kw_only=True, default=0) application_data: bytearray = field(kw_only=True, default_factory=bytearray) @@ -1517,7 +1274,7 @@ class DataMessageRcv(DataPacket): raise Exception('Invalid packet size') value_ = int.from_bytes(span[0:4], byteorder='little') fields['session_handle'] = value_ - fields['status'] = StatusCode.from_int(span[4]) + fields['status'] = Status.from_int(span[4]) value_ = int.from_bytes(span[5:13], byteorder='little') fields['source_address'] = value_ value_ = int.from_bytes(span[13:15], byteorder='little') @@ -1555,142 +1312,114 @@ class DataMessageRcv(DataPacket): return len(self.application_data) * 1 + 17 @dataclass -class UciCommand(ControlPacket): - +class CorePacket(ControlPacket): + oid: CoreOpcodeId = field(kw_only=True, default=CoreOpcodeId.DEVICE_RESET) def __post_init__(self): - self.mt = MessageType.COMMAND + self.gid = GroupId.CORE @staticmethod - def parse(fields: dict, span: bytes) -> Tuple['UciCommand', bytes]: - if fields['mt'] != MessageType.COMMAND: + def parse(fields: dict, span: bytes) -> Tuple['CorePacket', bytes]: + if fields['gid'] != GroupId.CORE: raise Exception("Invalid constraint field values") + if len(span) < 3: + raise Exception('Invalid packet size') + fields['oid'] = CoreOpcodeId.from_int((span[0] >> 0) & 0x3f) + value_ = int.from_bytes(span[1:3], byteorder='little') + span = span[3:] payload = span span = bytes([]) fields['payload'] = payload try: - return DeviceResetCmd.parse(fields.copy(), payload) + return CoreDeviceResetCmd.parse(fields.copy(), payload) except Exception as exn: pass try: - return GetDeviceInfoCmd.parse(fields.copy(), payload) + return CoreDeviceResetRsp.parse(fields.copy(), payload) except Exception as exn: pass try: - return GetCapsInfoCmd.parse(fields.copy(), payload) + return CoreDeviceStatusNtf.parse(fields.copy(), payload) except Exception as exn: pass try: - return SetConfigCmd.parse(fields.copy(), payload) + return CoreGetDeviceInfoCmd.parse(fields.copy(), payload) except Exception as exn: pass try: - return GetConfigCmd.parse(fields.copy(), payload) + return CoreGetDeviceInfoRsp.parse(fields.copy(), payload) except Exception as exn: pass try: - return CoreQueryTimeStampCmd.parse(fields.copy(), payload) + return CoreGetCapsInfoCmd.parse(fields.copy(), payload) except Exception as exn: pass try: - return SessionInitCmd.parse(fields.copy(), payload) + return CoreGetCapsInfoRsp.parse(fields.copy(), payload) except Exception as exn: pass try: - return SessionDeinitCmd.parse(fields.copy(), payload) + return CoreSetConfigCmd.parse(fields.copy(), payload) except Exception as exn: pass try: - return SessionSetAppConfigCmd.parse(fields.copy(), payload) + return CoreSetConfigRsp.parse(fields.copy(), payload) except Exception as exn: pass try: - return SessionGetAppConfigCmd.parse(fields.copy(), payload) + return CoreGetConfigCmd.parse(fields.copy(), payload) except Exception as exn: pass try: - return SessionGetCountCmd.parse(fields.copy(), payload) + return CoreGetConfigRsp.parse(fields.copy(), payload) except Exception as exn: pass try: - return SessionGetStateCmd.parse(fields.copy(), payload) + return CoreGenericErrorNtf.parse(fields.copy(), payload) except Exception as exn: pass try: - return SessionUpdateDtTagRangingRoundsCmd.parse(fields.copy(), payload) - except Exception as exn: - pass - try: - return SessionUpdateControllerMulticastListCmd.parse(fields.copy(), payload) - except Exception as exn: - pass - try: - return SessionSetHybridConfigCmd.parse(fields.copy(), payload) - except Exception as exn: - pass - try: - return SessionQueryMaxDataSizeCmd.parse(fields.copy(), payload) - except Exception as exn: - pass - try: - return SessionControlCommand.parse(fields.copy(), payload) - except Exception as exn: - pass - try: - return AndroidGetPowerStatsCmd.parse(fields.copy(), payload) + return CoreQueryTimeStampCmd.parse(fields.copy(), payload) except Exception as exn: pass try: - return AndroidSetCountryCodeCmd.parse(fields.copy(), payload) + return CoreQueryTimeStampRsp.parse(fields.copy(), payload) except Exception as exn: pass - return UciCommand(**fields), span + return CorePacket(**fields), span def serialize(self, payload: bytes = None) -> bytes: _span = bytearray() + _span.append((self.oid << 0)) + _span.extend([0] * 2) _span.extend(payload or self.payload or []) return ControlPacket.serialize(self, payload = bytes(_span)) @property def size(self) -> int: - return len(self.payload) + return len(self.payload) + 3 @dataclass -class UciResponse(ControlPacket): - +class SessionConfigPacket(ControlPacket): + oid: SessionConfigOpcodeId = field(kw_only=True, default=SessionConfigOpcodeId.INIT) def __post_init__(self): - self.mt = MessageType.RESPONSE + self.gid = GroupId.SESSION_CONFIG @staticmethod - def parse(fields: dict, span: bytes) -> Tuple['UciResponse', bytes]: - if fields['mt'] != MessageType.RESPONSE: + def parse(fields: dict, span: bytes) -> Tuple['SessionConfigPacket', bytes]: + if fields['gid'] != GroupId.SESSION_CONFIG: raise Exception("Invalid constraint field values") + if len(span) < 3: + raise Exception('Invalid packet size') + fields['oid'] = SessionConfigOpcodeId.from_int((span[0] >> 0) & 0x3f) + value_ = int.from_bytes(span[1:3], byteorder='little') + span = span[3:] payload = span span = bytes([]) fields['payload'] = payload try: - return DeviceResetRsp.parse(fields.copy(), payload) - except Exception as exn: - pass - try: - return GetDeviceInfoRsp.parse(fields.copy(), payload) - except Exception as exn: - pass - try: - return GetCapsInfoRsp.parse(fields.copy(), payload) - except Exception as exn: - pass - try: - return SetConfigRsp.parse(fields.copy(), payload) - except Exception as exn: - pass - try: - return GetConfigRsp.parse(fields.copy(), payload) - except Exception as exn: - pass - try: - return CoreQueryTimeStampRsp.parse(fields.copy(), payload) + return SessionInitCmd.parse(fields.copy(), payload) except Exception as exn: pass try: @@ -1702,92 +1431,11 @@ class UciResponse(ControlPacket): except Exception as exn: pass try: - return SessionDeinitRsp.parse(fields.copy(), payload) - except Exception as exn: - pass - try: - return SessionSetAppConfigRsp.parse(fields.copy(), payload) - except Exception as exn: - pass - try: - return SessionGetAppConfigRsp.parse(fields.copy(), payload) - except Exception as exn: - pass - try: - return SessionGetCountRsp.parse(fields.copy(), payload) - except Exception as exn: - pass - try: - return SessionGetStateRsp.parse(fields.copy(), payload) - except Exception as exn: - pass - try: - return SessionUpdateDtTagRangingRoundsRsp.parse(fields.copy(), payload) - except Exception as exn: - pass - try: - return SessionSetHybridConfigRsp.parse(fields.copy(), payload) - except Exception as exn: - pass - try: - return SessionUpdateControllerMulticastListRsp.parse(fields.copy(), payload) - except Exception as exn: - pass - try: - return SessionQueryMaxDataSizeRsp.parse(fields.copy(), payload) - except Exception as exn: - pass - try: - return SessionStartRsp.parse(fields.copy(), payload) - except Exception as exn: - pass - try: - return SessionStopRsp.parse(fields.copy(), payload) - except Exception as exn: - pass - try: - return SessionGetRangingCountRsp.parse(fields.copy(), payload) - except Exception as exn: - pass - try: - return AndroidGetPowerStatsRsp.parse(fields.copy(), payload) - except Exception as exn: - pass - try: - return AndroidSetCountryCodeRsp.parse(fields.copy(), payload) - except Exception as exn: - pass - return UciResponse(**fields), span - - def serialize(self, payload: bytes = None) -> bytes: - _span = bytearray() - _span.extend(payload or self.payload or []) - return ControlPacket.serialize(self, payload = bytes(_span)) - - @property - def size(self) -> int: - return len(self.payload) - -@dataclass -class UciNotification(ControlPacket): - - - def __post_init__(self): - self.mt = MessageType.NOTIFICATION - - @staticmethod - def parse(fields: dict, span: bytes) -> Tuple['UciNotification', bytes]: - if fields['mt'] != MessageType.NOTIFICATION: - raise Exception("Invalid constraint field values") - payload = span - span = bytes([]) - fields['payload'] = payload - try: - return DeviceStatusNtf.parse(fields.copy(), payload) + return SessionDeinitCmd.parse(fields.copy(), payload) except Exception as exn: pass try: - return GenericError.parse(fields.copy(), payload) + return SessionDeinitRsp.parse(fields.copy(), payload) except Exception as exn: pass try: @@ -1795,277 +1443,47 @@ class UciNotification(ControlPacket): except Exception as exn: pass try: - return SessionUpdateControllerMulticastListNtf.parse(fields.copy(), payload) - except Exception as exn: - pass - try: - return DataCreditNtf.parse(fields.copy(), payload) - except Exception as exn: - pass - try: - return DataTransferStatusNtf.parse(fields.copy(), payload) - except Exception as exn: - pass - try: - return SessionInfoNtf.parse(fields.copy(), payload) - except Exception as exn: - pass - try: - return AndroidRangeDiagnosticsNtf.parse(fields.copy(), payload) - except Exception as exn: - pass - return UciNotification(**fields), span - - def serialize(self, payload: bytes = None) -> bytes: - _span = bytearray() - _span.extend(payload or self.payload or []) - return ControlPacket.serialize(self, payload = bytes(_span)) - - @property - def size(self) -> int: - return len(self.payload) - -@dataclass -class CoreCommand(UciCommand): - - - def __post_init__(self): - self.gid = GroupId.CORE - self.mt = MessageType.COMMAND - - @staticmethod - def parse(fields: dict, span: bytes) -> Tuple['CoreCommand', bytes]: - if fields['gid'] != GroupId.CORE or fields['mt'] != MessageType.COMMAND: - raise Exception("Invalid constraint field values") - payload = span - span = bytes([]) - fields['payload'] = payload - try: - return DeviceResetCmd.parse(fields.copy(), payload) - except Exception as exn: - pass - try: - return GetDeviceInfoCmd.parse(fields.copy(), payload) - except Exception as exn: - pass - try: - return GetCapsInfoCmd.parse(fields.copy(), payload) - except Exception as exn: - pass - try: - return SetConfigCmd.parse(fields.copy(), payload) - except Exception as exn: - pass - try: - return GetConfigCmd.parse(fields.copy(), payload) - except Exception as exn: - pass - try: - return CoreQueryTimeStampCmd.parse(fields.copy(), payload) - except Exception as exn: - pass - return CoreCommand(**fields), span - - def serialize(self, payload: bytes = None) -> bytes: - _span = bytearray() - _span.extend(payload or self.payload or []) - return UciCommand.serialize(self, payload = bytes(_span)) - - @property - def size(self) -> int: - return len(self.payload) - -@dataclass -class CoreResponse(UciResponse): - - - def __post_init__(self): - self.gid = GroupId.CORE - self.mt = MessageType.RESPONSE - - @staticmethod - def parse(fields: dict, span: bytes) -> Tuple['CoreResponse', bytes]: - if fields['gid'] != GroupId.CORE or fields['mt'] != MessageType.RESPONSE: - raise Exception("Invalid constraint field values") - payload = span - span = bytes([]) - fields['payload'] = payload - try: - return DeviceResetRsp.parse(fields.copy(), payload) - except Exception as exn: - pass - try: - return GetDeviceInfoRsp.parse(fields.copy(), payload) - except Exception as exn: - pass - try: - return GetCapsInfoRsp.parse(fields.copy(), payload) - except Exception as exn: - pass - try: - return SetConfigRsp.parse(fields.copy(), payload) - except Exception as exn: - pass - try: - return GetConfigRsp.parse(fields.copy(), payload) - except Exception as exn: - pass - try: - return CoreQueryTimeStampRsp.parse(fields.copy(), payload) - except Exception as exn: - pass - return CoreResponse(**fields), span - - def serialize(self, payload: bytes = None) -> bytes: - _span = bytearray() - _span.extend(payload or self.payload or []) - return UciResponse.serialize(self, payload = bytes(_span)) - - @property - def size(self) -> int: - return len(self.payload) - -@dataclass -class CoreNotification(UciNotification): - - - def __post_init__(self): - self.gid = GroupId.CORE - self.mt = MessageType.NOTIFICATION - - @staticmethod - def parse(fields: dict, span: bytes) -> Tuple['CoreNotification', bytes]: - if fields['gid'] != GroupId.CORE or fields['mt'] != MessageType.NOTIFICATION: - raise Exception("Invalid constraint field values") - payload = span - span = bytes([]) - fields['payload'] = payload - try: - return DeviceStatusNtf.parse(fields.copy(), payload) - except Exception as exn: - pass - try: - return GenericError.parse(fields.copy(), payload) - except Exception as exn: - pass - return CoreNotification(**fields), span - - def serialize(self, payload: bytes = None) -> bytes: - _span = bytearray() - _span.extend(payload or self.payload or []) - return UciNotification.serialize(self, payload = bytes(_span)) - - @property - def size(self) -> int: - return len(self.payload) - -@dataclass -class SessionConfigCommand(UciCommand): - - - def __post_init__(self): - self.gid = GroupId.SESSION_CONFIG - self.mt = MessageType.COMMAND - - @staticmethod - def parse(fields: dict, span: bytes) -> Tuple['SessionConfigCommand', bytes]: - if fields['gid'] != GroupId.SESSION_CONFIG or fields['mt'] != MessageType.COMMAND: - raise Exception("Invalid constraint field values") - payload = span - span = bytes([]) - fields['payload'] = payload - try: - return SessionInitCmd.parse(fields.copy(), payload) - except Exception as exn: - pass - try: - return SessionDeinitCmd.parse(fields.copy(), payload) - except Exception as exn: - pass - try: return SessionSetAppConfigCmd.parse(fields.copy(), payload) except Exception as exn: pass try: - return SessionGetAppConfigCmd.parse(fields.copy(), payload) - except Exception as exn: - pass - try: - return SessionGetCountCmd.parse(fields.copy(), payload) - except Exception as exn: - pass - try: - return SessionGetStateCmd.parse(fields.copy(), payload) - except Exception as exn: - pass - try: - return SessionUpdateDtTagRangingRoundsCmd.parse(fields.copy(), payload) - except Exception as exn: - pass - try: - return SessionUpdateControllerMulticastListCmd.parse(fields.copy(), payload) + return SessionSetAppConfigRsp.parse(fields.copy(), payload) except Exception as exn: pass try: - return SessionSetHybridConfigCmd.parse(fields.copy(), payload) + return SessionGetAppConfigCmd.parse(fields.copy(), payload) except Exception as exn: pass try: - return SessionQueryMaxDataSizeCmd.parse(fields.copy(), payload) + return SessionGetAppConfigRsp.parse(fields.copy(), payload) except Exception as exn: pass - return SessionConfigCommand(**fields), span - - def serialize(self, payload: bytes = None) -> bytes: - _span = bytearray() - _span.extend(payload or self.payload or []) - return UciCommand.serialize(self, payload = bytes(_span)) - - @property - def size(self) -> int: - return len(self.payload) - -@dataclass -class SessionConfigResponse(UciResponse): - - - def __post_init__(self): - self.gid = GroupId.SESSION_CONFIG - self.mt = MessageType.RESPONSE - - @staticmethod - def parse(fields: dict, span: bytes) -> Tuple['SessionConfigResponse', bytes]: - if fields['gid'] != GroupId.SESSION_CONFIG or fields['mt'] != MessageType.RESPONSE: - raise Exception("Invalid constraint field values") - payload = span - span = bytes([]) - fields['payload'] = payload try: - return SessionInitRsp_V2.parse(fields.copy(), payload) + return SessionGetCountCmd.parse(fields.copy(), payload) except Exception as exn: pass try: - return SessionInitRsp.parse(fields.copy(), payload) + return SessionGetCountRsp.parse(fields.copy(), payload) except Exception as exn: pass try: - return SessionDeinitRsp.parse(fields.copy(), payload) + return SessionGetStateCmd.parse(fields.copy(), payload) except Exception as exn: pass try: - return SessionSetAppConfigRsp.parse(fields.copy(), payload) + return SessionGetStateRsp.parse(fields.copy(), payload) except Exception as exn: pass try: - return SessionGetAppConfigRsp.parse(fields.copy(), payload) + return SessionUpdateDtAnchorRangingRoundsCmd.parse(fields.copy(), payload) except Exception as exn: pass try: - return SessionGetCountRsp.parse(fields.copy(), payload) + return SessionUpdateDtAnchorRangingRoundsRsp.parse(fields.copy(), payload) except Exception as exn: pass try: - return SessionGetStateRsp.parse(fields.copy(), payload) + return SessionUpdateDtTagRangingRoundsCmd.parse(fields.copy(), payload) except Exception as exn: pass try: @@ -2073,7 +1491,7 @@ class SessionConfigResponse(UciResponse): except Exception as exn: pass try: - return SessionSetHybridConfigRsp.parse(fields.copy(), payload) + return SessionUpdateControllerMulticastListCmd.parse(fields.copy(), payload) except Exception as exn: pass try: @@ -2081,189 +1499,114 @@ class SessionConfigResponse(UciResponse): except Exception as exn: pass try: - return SessionQueryMaxDataSizeRsp.parse(fields.copy(), payload) + return SessionUpdateControllerMulticastListNtf.parse(fields.copy(), payload) except Exception as exn: pass - return SessionConfigResponse(**fields), span - - def serialize(self, payload: bytes = None) -> bytes: - _span = bytearray() - _span.extend(payload or self.payload or []) - return UciResponse.serialize(self, payload = bytes(_span)) - - @property - def size(self) -> int: - return len(self.payload) - -@dataclass -class SessionConfigNotification(UciNotification): - - - def __post_init__(self): - self.gid = GroupId.SESSION_CONFIG - self.mt = MessageType.NOTIFICATION - - @staticmethod - def parse(fields: dict, span: bytes) -> Tuple['SessionConfigNotification', bytes]: - if fields['gid'] != GroupId.SESSION_CONFIG or fields['mt'] != MessageType.NOTIFICATION: - raise Exception("Invalid constraint field values") - payload = span - span = bytes([]) - fields['payload'] = payload try: - return SessionStatusNtf.parse(fields.copy(), payload) + return SessionQueryMaxDataSizeInRangingCmd.parse(fields.copy(), payload) except Exception as exn: pass try: - return SessionUpdateControllerMulticastListNtf.parse(fields.copy(), payload) + return SessionQueryMaxDataSizeInRangingRsp.parse(fields.copy(), payload) except Exception as exn: pass - return SessionConfigNotification(**fields), span + return SessionConfigPacket(**fields), span def serialize(self, payload: bytes = None) -> bytes: _span = bytearray() + _span.append((self.oid << 0)) + _span.extend([0] * 2) _span.extend(payload or self.payload or []) - return UciNotification.serialize(self, payload = bytes(_span)) + return ControlPacket.serialize(self, payload = bytes(_span)) @property def size(self) -> int: - return len(self.payload) + return len(self.payload) + 3 @dataclass -class SessionControlCommand(UciCommand): - session_id: int = field(kw_only=True, default=0) +class SessionControlPacket(ControlPacket): + oid: SessionControlOpcodeId = field(kw_only=True, default=SessionControlOpcodeId.START) def __post_init__(self): self.gid = GroupId.SESSION_CONTROL - self.mt = MessageType.COMMAND @staticmethod - def parse(fields: dict, span: bytes) -> Tuple['SessionControlCommand', bytes]: - if fields['gid'] != GroupId.SESSION_CONTROL or fields['mt'] != MessageType.COMMAND: + def parse(fields: dict, span: bytes) -> Tuple['SessionControlPacket', bytes]: + if fields['gid'] != GroupId.SESSION_CONTROL: raise Exception("Invalid constraint field values") - if len(span) < 4: + if len(span) < 3: raise Exception('Invalid packet size') - value_ = int.from_bytes(span[0:4], byteorder='little') - fields['session_id'] = value_ - span = span[4:] + fields['oid'] = SessionControlOpcodeId.from_int((span[0] >> 0) & 0x3f) + value_ = int.from_bytes(span[1:3], byteorder='little') + span = span[3:] payload = span span = bytes([]) fields['payload'] = payload try: - return SessionStartCmd.parse(fields.copy(), payload) + return SessionDataCreditNtf.parse(fields.copy(), payload) except Exception as exn: pass try: - return SessionStopCmd.parse(fields.copy(), payload) + return SessionDataTransferStatusNtf.parse(fields.copy(), payload) except Exception as exn: pass try: - return SessionGetRangingCountCmd.parse(fields.copy(), payload) + return SessionStartCmd.parse(fields.copy(), payload) except Exception as exn: pass - return SessionControlCommand(**fields), span - - def serialize(self, payload: bytes = None) -> bytes: - _span = bytearray() - if self.session_id > 4294967295: - print(f"Invalid value for field SessionControlCommand::session_id: {self.session_id} > 4294967295; the value will be truncated") - self.session_id &= 4294967295 - _span.extend(int.to_bytes((self.session_id << 0), length=4, byteorder='little')) - _span.extend(payload or self.payload or []) - return UciCommand.serialize(self, payload = bytes(_span)) - - @property - def size(self) -> int: - return len(self.payload) + 4 - -@dataclass -class SessionControlResponse(UciResponse): - - - def __post_init__(self): - self.gid = GroupId.SESSION_CONTROL - self.mt = MessageType.RESPONSE - - @staticmethod - def parse(fields: dict, span: bytes) -> Tuple['SessionControlResponse', bytes]: - if fields['gid'] != GroupId.SESSION_CONTROL or fields['mt'] != MessageType.RESPONSE: - raise Exception("Invalid constraint field values") - payload = span - span = bytes([]) - fields['payload'] = payload try: return SessionStartRsp.parse(fields.copy(), payload) except Exception as exn: pass try: - return SessionStopRsp.parse(fields.copy(), payload) + return SessionInfoNtf.parse(fields.copy(), payload) except Exception as exn: pass try: - return SessionGetRangingCountRsp.parse(fields.copy(), payload) + return SessionStopCmd.parse(fields.copy(), payload) except Exception as exn: pass - return SessionControlResponse(**fields), span - - def serialize(self, payload: bytes = None) -> bytes: - _span = bytearray() - _span.extend(payload or self.payload or []) - return UciResponse.serialize(self, payload = bytes(_span)) - - @property - def size(self) -> int: - return len(self.payload) - -@dataclass -class SessionControlNotification(UciNotification): - - - def __post_init__(self): - self.gid = GroupId.SESSION_CONTROL - self.mt = MessageType.NOTIFICATION - - @staticmethod - def parse(fields: dict, span: bytes) -> Tuple['SessionControlNotification', bytes]: - if fields['gid'] != GroupId.SESSION_CONTROL or fields['mt'] != MessageType.NOTIFICATION: - raise Exception("Invalid constraint field values") - payload = span - span = bytes([]) - fields['payload'] = payload try: - return DataCreditNtf.parse(fields.copy(), payload) + return SessionStopRsp.parse(fields.copy(), payload) except Exception as exn: pass try: - return DataTransferStatusNtf.parse(fields.copy(), payload) + return SessionGetRangingCountCmd.parse(fields.copy(), payload) except Exception as exn: pass try: - return SessionInfoNtf.parse(fields.copy(), payload) + return SessionGetRangingCountRsp.parse(fields.copy(), payload) except Exception as exn: pass - return SessionControlNotification(**fields), span + return SessionControlPacket(**fields), span def serialize(self, payload: bytes = None) -> bytes: _span = bytearray() + _span.append((self.oid << 0)) + _span.extend([0] * 2) _span.extend(payload or self.payload or []) - return UciNotification.serialize(self, payload = bytes(_span)) + return ControlPacket.serialize(self, payload = bytes(_span)) @property def size(self) -> int: - return len(self.payload) + return len(self.payload) + 3 @dataclass -class AndroidCommand(UciCommand): - +class AndroidPacket(ControlPacket): + oid: AndroidOpcodeId = field(kw_only=True, default=AndroidOpcodeId.GET_POWER_STATS) def __post_init__(self): self.gid = GroupId.VENDOR_ANDROID - self.mt = MessageType.COMMAND @staticmethod - def parse(fields: dict, span: bytes) -> Tuple['AndroidCommand', bytes]: - if fields['gid'] != GroupId.VENDOR_ANDROID or fields['mt'] != MessageType.COMMAND: + def parse(fields: dict, span: bytes) -> Tuple['AndroidPacket', bytes]: + if fields['gid'] != GroupId.VENDOR_ANDROID: raise Exception("Invalid constraint field values") + if len(span) < 3: + raise Exception('Invalid packet size') + fields['oid'] = AndroidOpcodeId.from_int((span[0] >> 0) & 0x3f) + value_ = int.from_bytes(span[1:3], byteorder='little') + span = span[3:] payload = span span = bytes([]) fields['payload'] = payload @@ -2272,194 +1615,144 @@ class AndroidCommand(UciCommand): except Exception as exn: pass try: - return AndroidSetCountryCodeCmd.parse(fields.copy(), payload) + return AndroidGetPowerStatsRsp.parse(fields.copy(), payload) except Exception as exn: pass - return AndroidCommand(**fields), span - - def serialize(self, payload: bytes = None) -> bytes: - _span = bytearray() - _span.extend(payload or self.payload or []) - return UciCommand.serialize(self, payload = bytes(_span)) - - @property - def size(self) -> int: - return len(self.payload) - -@dataclass -class AndroidResponse(UciResponse): - - - def __post_init__(self): - self.gid = GroupId.VENDOR_ANDROID - self.mt = MessageType.RESPONSE - - @staticmethod - def parse(fields: dict, span: bytes) -> Tuple['AndroidResponse', bytes]: - if fields['gid'] != GroupId.VENDOR_ANDROID or fields['mt'] != MessageType.RESPONSE: - raise Exception("Invalid constraint field values") - payload = span - span = bytes([]) - fields['payload'] = payload try: - return AndroidGetPowerStatsRsp.parse(fields.copy(), payload) + return AndroidSetCountryCodeCmd.parse(fields.copy(), payload) except Exception as exn: pass try: return AndroidSetCountryCodeRsp.parse(fields.copy(), payload) except Exception as exn: pass - return AndroidResponse(**fields), span - - def serialize(self, payload: bytes = None) -> bytes: - _span = bytearray() - _span.extend(payload or self.payload or []) - return UciResponse.serialize(self, payload = bytes(_span)) - - @property - def size(self) -> int: - return len(self.payload) - -@dataclass -class AndroidNotification(UciNotification): - - - def __post_init__(self): - self.gid = GroupId.VENDOR_ANDROID - self.mt = MessageType.NOTIFICATION - - @staticmethod - def parse(fields: dict, span: bytes) -> Tuple['AndroidNotification', bytes]: - if fields['gid'] != GroupId.VENDOR_ANDROID or fields['mt'] != MessageType.NOTIFICATION: - raise Exception("Invalid constraint field values") - payload = span - span = bytes([]) - fields['payload'] = payload try: return AndroidRangeDiagnosticsNtf.parse(fields.copy(), payload) except Exception as exn: pass - return AndroidNotification(**fields), span + return AndroidPacket(**fields), span def serialize(self, payload: bytes = None) -> bytes: _span = bytearray() + _span.append((self.oid << 0)) + _span.extend([0] * 2) _span.extend(payload or self.payload or []) - return UciNotification.serialize(self, payload = bytes(_span)) + return ControlPacket.serialize(self, payload = bytes(_span)) @property def size(self) -> int: - return len(self.payload) + return len(self.payload) + 3 @dataclass -class DeviceResetCmd(CoreCommand): +class CoreDeviceResetCmd(CorePacket): reset_config: ResetConfig = field(kw_only=True, default=ResetConfig.UWBS_RESET) def __post_init__(self): - self.opcode = 0 - self.gid = GroupId.CORE self.mt = MessageType.COMMAND + self.oid = CoreOpcodeId.DEVICE_RESET + self.gid = GroupId.CORE @staticmethod - def parse(fields: dict, span: bytes) -> Tuple['DeviceResetCmd', bytes]: - if fields['opcode'] != 0x0 or fields['gid'] != GroupId.CORE or fields['mt'] != MessageType.COMMAND: + def parse(fields: dict, span: bytes) -> Tuple['CoreDeviceResetCmd', bytes]: + if fields['mt'] != MessageType.COMMAND or fields['oid'] != CoreOpcodeId.DEVICE_RESET or fields['gid'] != GroupId.CORE: raise Exception("Invalid constraint field values") if len(span) < 1: raise Exception('Invalid packet size') fields['reset_config'] = ResetConfig.from_int(span[0]) span = span[1:] - return DeviceResetCmd(**fields), span + return CoreDeviceResetCmd(**fields), span def serialize(self, payload: bytes = None) -> bytes: _span = bytearray() _span.append((self.reset_config << 0)) - return CoreCommand.serialize(self, payload = bytes(_span)) + return CorePacket.serialize(self, payload = bytes(_span)) @property def size(self) -> int: return 1 @dataclass -class DeviceResetRsp(CoreResponse): - status: StatusCode = field(kw_only=True, default=StatusCode.UCI_STATUS_OK) +class CoreDeviceResetRsp(CorePacket): + status: Status = field(kw_only=True, default=Status.OK) def __post_init__(self): - self.opcode = 0 - self.gid = GroupId.CORE self.mt = MessageType.RESPONSE + self.oid = CoreOpcodeId.DEVICE_RESET + self.gid = GroupId.CORE @staticmethod - def parse(fields: dict, span: bytes) -> Tuple['DeviceResetRsp', bytes]: - if fields['opcode'] != 0x0 or fields['gid'] != GroupId.CORE or fields['mt'] != MessageType.RESPONSE: + def parse(fields: dict, span: bytes) -> Tuple['CoreDeviceResetRsp', bytes]: + if fields['mt'] != MessageType.RESPONSE or fields['oid'] != CoreOpcodeId.DEVICE_RESET or fields['gid'] != GroupId.CORE: raise Exception("Invalid constraint field values") if len(span) < 1: raise Exception('Invalid packet size') - fields['status'] = StatusCode.from_int(span[0]) + fields['status'] = Status.from_int(span[0]) span = span[1:] - return DeviceResetRsp(**fields), span + return CoreDeviceResetRsp(**fields), span def serialize(self, payload: bytes = None) -> bytes: _span = bytearray() _span.append((self.status << 0)) - return CoreResponse.serialize(self, payload = bytes(_span)) + return CorePacket.serialize(self, payload = bytes(_span)) @property def size(self) -> int: return 1 @dataclass -class DeviceStatusNtf(CoreNotification): +class CoreDeviceStatusNtf(CorePacket): device_state: DeviceState = field(kw_only=True, default=DeviceState.DEVICE_STATE_READY) def __post_init__(self): - self.opcode = 1 - self.gid = GroupId.CORE self.mt = MessageType.NOTIFICATION + self.oid = CoreOpcodeId.DEVICE_STATUS + self.gid = GroupId.CORE @staticmethod - def parse(fields: dict, span: bytes) -> Tuple['DeviceStatusNtf', bytes]: - if fields['opcode'] != 0x1 or fields['gid'] != GroupId.CORE or fields['mt'] != MessageType.NOTIFICATION: + def parse(fields: dict, span: bytes) -> Tuple['CoreDeviceStatusNtf', bytes]: + if fields['mt'] != MessageType.NOTIFICATION or fields['oid'] != CoreOpcodeId.DEVICE_STATUS or fields['gid'] != GroupId.CORE: raise Exception("Invalid constraint field values") if len(span) < 1: raise Exception('Invalid packet size') fields['device_state'] = DeviceState.from_int(span[0]) span = span[1:] - return DeviceStatusNtf(**fields), span + return CoreDeviceStatusNtf(**fields), span def serialize(self, payload: bytes = None) -> bytes: _span = bytearray() _span.append((self.device_state << 0)) - return CoreNotification.serialize(self, payload = bytes(_span)) + return CorePacket.serialize(self, payload = bytes(_span)) @property def size(self) -> int: return 1 @dataclass -class GetDeviceInfoCmd(CoreCommand): +class CoreGetDeviceInfoCmd(CorePacket): def __post_init__(self): - self.opcode = 2 - self.gid = GroupId.CORE self.mt = MessageType.COMMAND + self.oid = CoreOpcodeId.GET_DEVICE_INFO + self.gid = GroupId.CORE @staticmethod - def parse(fields: dict, span: bytes) -> Tuple['GetDeviceInfoCmd', bytes]: - if fields['opcode'] != 0x2 or fields['gid'] != GroupId.CORE or fields['mt'] != MessageType.COMMAND: + def parse(fields: dict, span: bytes) -> Tuple['CoreGetDeviceInfoCmd', bytes]: + if fields['mt'] != MessageType.COMMAND or fields['oid'] != CoreOpcodeId.GET_DEVICE_INFO or fields['gid'] != GroupId.CORE: raise Exception("Invalid constraint field values") - return GetDeviceInfoCmd(**fields), span + return CoreGetDeviceInfoCmd(**fields), span def serialize(self, payload: bytes = None) -> bytes: _span = bytearray() - return CoreCommand.serialize(self, payload = bytes(_span)) + return CorePacket.serialize(self, payload = bytes(_span)) @property def size(self) -> int: return 0 @dataclass -class GetDeviceInfoRsp(CoreResponse): - status: StatusCode = field(kw_only=True, default=StatusCode.UCI_STATUS_OK) +class CoreGetDeviceInfoRsp(CorePacket): + status: Status = field(kw_only=True, default=Status.OK) uci_version: int = field(kw_only=True, default=0) mac_version: int = field(kw_only=True, default=0) phy_version: int = field(kw_only=True, default=0) @@ -2467,17 +1760,17 @@ class GetDeviceInfoRsp(CoreResponse): vendor_spec_info: bytearray = field(kw_only=True, default_factory=bytearray) def __post_init__(self): - self.opcode = 2 - self.gid = GroupId.CORE self.mt = MessageType.RESPONSE + self.oid = CoreOpcodeId.GET_DEVICE_INFO + self.gid = GroupId.CORE @staticmethod - def parse(fields: dict, span: bytes) -> Tuple['GetDeviceInfoRsp', bytes]: - if fields['opcode'] != 0x2 or fields['gid'] != GroupId.CORE or fields['mt'] != MessageType.RESPONSE: + def parse(fields: dict, span: bytes) -> Tuple['CoreGetDeviceInfoRsp', bytes]: + if fields['mt'] != MessageType.RESPONSE or fields['oid'] != CoreOpcodeId.GET_DEVICE_INFO or fields['gid'] != GroupId.CORE: raise Exception("Invalid constraint field values") if len(span) < 10: raise Exception('Invalid packet size') - fields['status'] = StatusCode.from_int(span[0]) + fields['status'] = Status.from_int(span[0]) value_ = int.from_bytes(span[1:3], byteorder='little') fields['uci_version'] = value_ value_ = int.from_bytes(span[3:5], byteorder='little') @@ -2492,56 +1785,56 @@ class GetDeviceInfoRsp(CoreResponse): raise Exception('Invalid packet size') fields['vendor_spec_info'] = list(span[:vendor_spec_info_count]) span = span[vendor_spec_info_count:] - return GetDeviceInfoRsp(**fields), span + return CoreGetDeviceInfoRsp(**fields), span def serialize(self, payload: bytes = None) -> bytes: _span = bytearray() _span.append((self.status << 0)) if self.uci_version > 65535: - print(f"Invalid value for field GetDeviceInfoRsp::uci_version: {self.uci_version} > 65535; the value will be truncated") + print(f"Invalid value for field CoreGetDeviceInfoRsp::uci_version: {self.uci_version} > 65535; the value will be truncated") self.uci_version &= 65535 _span.extend(int.to_bytes((self.uci_version << 0), length=2, byteorder='little')) if self.mac_version > 65535: - print(f"Invalid value for field GetDeviceInfoRsp::mac_version: {self.mac_version} > 65535; the value will be truncated") + print(f"Invalid value for field CoreGetDeviceInfoRsp::mac_version: {self.mac_version} > 65535; the value will be truncated") self.mac_version &= 65535 _span.extend(int.to_bytes((self.mac_version << 0), length=2, byteorder='little')) if self.phy_version > 65535: - print(f"Invalid value for field GetDeviceInfoRsp::phy_version: {self.phy_version} > 65535; the value will be truncated") + print(f"Invalid value for field CoreGetDeviceInfoRsp::phy_version: {self.phy_version} > 65535; the value will be truncated") self.phy_version &= 65535 _span.extend(int.to_bytes((self.phy_version << 0), length=2, byteorder='little')) if self.uci_test_version > 65535: - print(f"Invalid value for field GetDeviceInfoRsp::uci_test_version: {self.uci_test_version} > 65535; the value will be truncated") + print(f"Invalid value for field CoreGetDeviceInfoRsp::uci_test_version: {self.uci_test_version} > 65535; the value will be truncated") self.uci_test_version &= 65535 _span.extend(int.to_bytes((self.uci_test_version << 0), length=2, byteorder='little')) if len(self.vendor_spec_info) > 255: - print(f"Invalid length for field GetDeviceInfoRsp::vendor_spec_info: {len(self.vendor_spec_info)} > 255; the array will be truncated") + print(f"Invalid length for field CoreGetDeviceInfoRsp::vendor_spec_info: {len(self.vendor_spec_info)} > 255; the array will be truncated") del self.vendor_spec_info[255:] _span.append((len(self.vendor_spec_info) << 0)) _span.extend(self.vendor_spec_info) - return CoreResponse.serialize(self, payload = bytes(_span)) + return CorePacket.serialize(self, payload = bytes(_span)) @property def size(self) -> int: return len(self.vendor_spec_info) * 1 + 10 @dataclass -class GetCapsInfoCmd(CoreCommand): +class CoreGetCapsInfoCmd(CorePacket): def __post_init__(self): - self.opcode = 3 - self.gid = GroupId.CORE self.mt = MessageType.COMMAND + self.oid = CoreOpcodeId.GET_CAPS_INFO + self.gid = GroupId.CORE @staticmethod - def parse(fields: dict, span: bytes) -> Tuple['GetCapsInfoCmd', bytes]: - if fields['opcode'] != 0x3 or fields['gid'] != GroupId.CORE or fields['mt'] != MessageType.COMMAND: + def parse(fields: dict, span: bytes) -> Tuple['CoreGetCapsInfoCmd', bytes]: + if fields['mt'] != MessageType.COMMAND or fields['oid'] != CoreOpcodeId.GET_CAPS_INFO or fields['gid'] != GroupId.CORE: raise Exception("Invalid constraint field values") - return GetCapsInfoCmd(**fields), span + return CoreGetCapsInfoCmd(**fields), span def serialize(self, payload: bytes = None) -> bytes: _span = bytearray() - return CoreCommand.serialize(self, payload = bytes(_span)) + return CorePacket.serialize(self, payload = bytes(_span)) @property def size(self) -> int: @@ -2584,22 +1877,22 @@ class CapTlv(Packet): return len(self.v) * 1 + 2 @dataclass -class GetCapsInfoRsp(CoreResponse): - status: StatusCode = field(kw_only=True, default=StatusCode.UCI_STATUS_OK) +class CoreGetCapsInfoRsp(CorePacket): + status: Status = field(kw_only=True, default=Status.OK) tlvs: List[CapTlv] = field(kw_only=True, default_factory=list) def __post_init__(self): - self.opcode = 3 - self.gid = GroupId.CORE self.mt = MessageType.RESPONSE + self.oid = CoreOpcodeId.GET_CAPS_INFO + self.gid = GroupId.CORE @staticmethod - def parse(fields: dict, span: bytes) -> Tuple['GetCapsInfoRsp', bytes]: - if fields['opcode'] != 0x3 or fields['gid'] != GroupId.CORE or fields['mt'] != MessageType.RESPONSE: + def parse(fields: dict, span: bytes) -> Tuple['CoreGetCapsInfoRsp', bytes]: + if fields['mt'] != MessageType.RESPONSE or fields['oid'] != CoreOpcodeId.GET_CAPS_INFO or fields['gid'] != GroupId.CORE: raise Exception("Invalid constraint field values") if len(span) < 2: raise Exception('Invalid packet size') - fields['status'] = StatusCode.from_int(span[0]) + fields['status'] = Status.from_int(span[0]) tlvs_count = span[1] span = span[2:] tlvs = [] @@ -2607,118 +1900,127 @@ class GetCapsInfoRsp(CoreResponse): element, span = CapTlv.parse(span) tlvs.append(element) fields['tlvs'] = tlvs - return GetCapsInfoRsp(**fields), span + return CoreGetCapsInfoRsp(**fields), span def serialize(self, payload: bytes = None) -> bytes: _span = bytearray() _span.append((self.status << 0)) if len(self.tlvs) > 255: - print(f"Invalid length for field GetCapsInfoRsp::tlvs: {len(self.tlvs)} > 255; the array will be truncated") + print(f"Invalid length for field CoreGetCapsInfoRsp::tlvs: {len(self.tlvs)} > 255; the array will be truncated") del self.tlvs[255:] _span.append((len(self.tlvs) << 0)) for _elt in self.tlvs: _span.extend(_elt.serialize()) - return CoreResponse.serialize(self, payload = bytes(_span)) + return CorePacket.serialize(self, payload = bytes(_span)) @property def size(self) -> int: return sum([elt.size for elt in self.tlvs]) + 2 +class ConfigParameterId(enum.IntEnum): + DEVICE_STATE = 0x0 + LOW_POWER_MODE = 0x1 + + @staticmethod + def from_int(v: int) -> Union[int, 'ConfigParameterId']: + try: + return ConfigParameterId(v) + except ValueError as exn: + return v + + @dataclass -class DeviceConfigTlv(Packet): - cfg_id: DeviceConfigId = field(kw_only=True, default=DeviceConfigId.DEVICE_STATE) - v: bytearray = field(kw_only=True, default_factory=bytearray) +class ConfigParameter(Packet): + id: ConfigParameterId = field(kw_only=True, default=ConfigParameterId.DEVICE_STATE) + value: bytearray = field(kw_only=True, default_factory=bytearray) def __post_init__(self): pass @staticmethod - def parse(span: bytes) -> Tuple['DeviceConfigTlv', bytes]: + def parse(span: bytes) -> Tuple['ConfigParameter', bytes]: fields = {'payload': None} if len(span) < 2: raise Exception('Invalid packet size') - fields['cfg_id'] = DeviceConfigId.from_int(span[0]) - v_count = span[1] + fields['id'] = ConfigParameterId.from_int(span[0]) + value_size = span[1] span = span[2:] - if len(span) < v_count: + if len(span) < value_size: raise Exception('Invalid packet size') - fields['v'] = list(span[:v_count]) - span = span[v_count:] - return DeviceConfigTlv(**fields), span + fields['value'] = list(span[:value_size]) + span = span[value_size:] + return ConfigParameter(**fields), span def serialize(self, payload: bytes = None) -> bytes: _span = bytearray() - _span.append((self.cfg_id << 0)) - if len(self.v) > 255: - print(f"Invalid length for field DeviceConfigTlv::v: {len(self.v)} > 255; the array will be truncated") - del self.v[255:] - _span.append((len(self.v) << 0)) - _span.extend(self.v) + _span.append((self.id << 0)) + _span.append(((len(self.value) * 1) << 0)) + _span.extend(self.value) return bytes(_span) @property def size(self) -> int: - return len(self.v) * 1 + 2 + return len(self.value) * 1 + 2 @dataclass -class SetConfigCmd(CoreCommand): - tlvs: List[DeviceConfigTlv] = field(kw_only=True, default_factory=list) +class CoreSetConfigCmd(CorePacket): + parameters: List[ConfigParameter] = field(kw_only=True, default_factory=list) def __post_init__(self): - self.opcode = 4 - self.gid = GroupId.CORE self.mt = MessageType.COMMAND + self.oid = CoreOpcodeId.SET_CONFIG + self.gid = GroupId.CORE @staticmethod - def parse(fields: dict, span: bytes) -> Tuple['SetConfigCmd', bytes]: - if fields['opcode'] != 0x4 or fields['gid'] != GroupId.CORE or fields['mt'] != MessageType.COMMAND: + def parse(fields: dict, span: bytes) -> Tuple['CoreSetConfigCmd', bytes]: + if fields['mt'] != MessageType.COMMAND or fields['oid'] != CoreOpcodeId.SET_CONFIG or fields['gid'] != GroupId.CORE: raise Exception("Invalid constraint field values") if len(span) < 1: raise Exception('Invalid packet size') - tlvs_count = span[0] + parameters_count = span[0] span = span[1:] - tlvs = [] - for n in range(tlvs_count): - element, span = DeviceConfigTlv.parse(span) - tlvs.append(element) - fields['tlvs'] = tlvs - return SetConfigCmd(**fields), span + parameters = [] + for n in range(parameters_count): + element, span = ConfigParameter.parse(span) + parameters.append(element) + fields['parameters'] = parameters + return CoreSetConfigCmd(**fields), span def serialize(self, payload: bytes = None) -> bytes: _span = bytearray() - if len(self.tlvs) > 255: - print(f"Invalid length for field SetConfigCmd::tlvs: {len(self.tlvs)} > 255; the array will be truncated") - del self.tlvs[255:] - _span.append((len(self.tlvs) << 0)) - for _elt in self.tlvs: + if len(self.parameters) > 255: + print(f"Invalid length for field CoreSetConfigCmd::parameters: {len(self.parameters)} > 255; the array will be truncated") + del self.parameters[255:] + _span.append((len(self.parameters) << 0)) + for _elt in self.parameters: _span.extend(_elt.serialize()) - return CoreCommand.serialize(self, payload = bytes(_span)) + return CorePacket.serialize(self, payload = bytes(_span)) @property def size(self) -> int: - return sum([elt.size for elt in self.tlvs]) + 1 + return sum([elt.size for elt in self.parameters]) + 1 @dataclass -class DeviceConfigStatus(Packet): - cfg_id: DeviceConfigId = field(kw_only=True, default=DeviceConfigId.DEVICE_STATE) - status: StatusCode = field(kw_only=True, default=StatusCode.UCI_STATUS_OK) +class ConfigParameterStatus(Packet): + id: ConfigParameterId = field(kw_only=True, default=ConfigParameterId.DEVICE_STATE) + status: Status = field(kw_only=True, default=Status.OK) def __post_init__(self): pass @staticmethod - def parse(span: bytes) -> Tuple['DeviceConfigStatus', bytes]: + def parse(span: bytes) -> Tuple['ConfigParameterStatus', bytes]: fields = {'payload': None} if len(span) < 2: raise Exception('Invalid packet size') - fields['cfg_id'] = DeviceConfigId.from_int(span[0]) - fields['status'] = StatusCode.from_int(span[1]) + fields['id'] = ConfigParameterId.from_int(span[0]) + fields['status'] = Status.from_int(span[1]) span = span[2:] - return DeviceConfigStatus(**fields), span + return ConfigParameterStatus(**fields), span def serialize(self, payload: bytes = None) -> bytes: _span = bytearray() - _span.append((self.cfg_id << 0)) + _span.append((self.id << 0)) _span.append((self.status << 0)) return bytes(_span) @@ -2727,193 +2029,197 @@ class DeviceConfigStatus(Packet): return 2 @dataclass -class SetConfigRsp(CoreResponse): - status: StatusCode = field(kw_only=True, default=StatusCode.UCI_STATUS_OK) - cfg_status: List[DeviceConfigStatus] = field(kw_only=True, default_factory=list) +class CoreSetConfigRsp(CorePacket): + status: Status = field(kw_only=True, default=Status.OK) + parameters: List[ConfigParameterStatus] = field(kw_only=True, default_factory=list) def __post_init__(self): - self.opcode = 4 - self.gid = GroupId.CORE self.mt = MessageType.RESPONSE + self.oid = CoreOpcodeId.SET_CONFIG + self.gid = GroupId.CORE @staticmethod - def parse(fields: dict, span: bytes) -> Tuple['SetConfigRsp', bytes]: - if fields['opcode'] != 0x4 or fields['gid'] != GroupId.CORE or fields['mt'] != MessageType.RESPONSE: + def parse(fields: dict, span: bytes) -> Tuple['CoreSetConfigRsp', bytes]: + if fields['mt'] != MessageType.RESPONSE or fields['oid'] != CoreOpcodeId.SET_CONFIG or fields['gid'] != GroupId.CORE: raise Exception("Invalid constraint field values") if len(span) < 2: raise Exception('Invalid packet size') - fields['status'] = StatusCode.from_int(span[0]) - cfg_status_count = span[1] + fields['status'] = Status.from_int(span[0]) + parameters_count = span[1] span = span[2:] - if len(span) < cfg_status_count * 2: + if len(span) < parameters_count * 2: raise Exception('Invalid packet size') - cfg_status = [] - for n in range(cfg_status_count): - cfg_status.append(DeviceConfigStatus.parse_all(span[n * 2:(n + 1) * 2])) - fields['cfg_status'] = cfg_status - span = span[cfg_status_count * 2:] - return SetConfigRsp(**fields), span + parameters = [] + for n in range(parameters_count): + parameters.append(ConfigParameterStatus.parse_all(span[n * 2:(n + 1) * 2])) + fields['parameters'] = parameters + span = span[parameters_count * 2:] + return CoreSetConfigRsp(**fields), span def serialize(self, payload: bytes = None) -> bytes: _span = bytearray() _span.append((self.status << 0)) - if len(self.cfg_status) > 255: - print(f"Invalid length for field SetConfigRsp::cfg_status: {len(self.cfg_status)} > 255; the array will be truncated") - del self.cfg_status[255:] - _span.append((len(self.cfg_status) << 0)) - for _elt in self.cfg_status: + if len(self.parameters) > 255: + print(f"Invalid length for field CoreSetConfigRsp::parameters: {len(self.parameters)} > 255; the array will be truncated") + del self.parameters[255:] + _span.append((len(self.parameters) << 0)) + for _elt in self.parameters: _span.extend(_elt.serialize()) - return CoreResponse.serialize(self, payload = bytes(_span)) + return CorePacket.serialize(self, payload = bytes(_span)) @property def size(self) -> int: - return sum([elt.size for elt in self.cfg_status]) + 2 + return sum([elt.size for elt in self.parameters]) + 2 @dataclass -class GetConfigCmd(CoreCommand): - cfg_id: bytearray = field(kw_only=True, default_factory=bytearray) +class CoreGetConfigCmd(CorePacket): + parameter_ids: List[ConfigParameterId] = field(kw_only=True, default_factory=list) def __post_init__(self): - self.opcode = 5 - self.gid = GroupId.CORE self.mt = MessageType.COMMAND + self.oid = CoreOpcodeId.GET_CONFIG + self.gid = GroupId.CORE @staticmethod - def parse(fields: dict, span: bytes) -> Tuple['GetConfigCmd', bytes]: - if fields['opcode'] != 0x5 or fields['gid'] != GroupId.CORE or fields['mt'] != MessageType.COMMAND: + def parse(fields: dict, span: bytes) -> Tuple['CoreGetConfigCmd', bytes]: + if fields['mt'] != MessageType.COMMAND or fields['oid'] != CoreOpcodeId.GET_CONFIG or fields['gid'] != GroupId.CORE: raise Exception("Invalid constraint field values") if len(span) < 1: raise Exception('Invalid packet size') - cfg_id_count = span[0] + parameter_ids_count = span[0] span = span[1:] - if len(span) < cfg_id_count: + if len(span) < parameter_ids_count: raise Exception('Invalid packet size') - fields['cfg_id'] = list(span[:cfg_id_count]) - span = span[cfg_id_count:] - return GetConfigCmd(**fields), span + parameter_ids = [] + for n in range(parameter_ids_count): + parameter_ids.append(ConfigParameterId(int.from_bytes(span[n:n + 1], byteorder='little'))) + fields['parameter_ids'] = parameter_ids + span = span[parameter_ids_count:] + return CoreGetConfigCmd(**fields), span def serialize(self, payload: bytes = None) -> bytes: _span = bytearray() - if len(self.cfg_id) > 255: - print(f"Invalid length for field GetConfigCmd::cfg_id: {len(self.cfg_id)} > 255; the array will be truncated") - del self.cfg_id[255:] - _span.append((len(self.cfg_id) << 0)) - _span.extend(self.cfg_id) - return CoreCommand.serialize(self, payload = bytes(_span)) + if len(self.parameter_ids) > 255: + print(f"Invalid length for field CoreGetConfigCmd::parameter_ids: {len(self.parameter_ids)} > 255; the array will be truncated") + del self.parameter_ids[255:] + _span.append((len(self.parameter_ids) << 0)) + for _elt in self.parameter_ids: + _span.append(_elt) + return CorePacket.serialize(self, payload = bytes(_span)) @property def size(self) -> int: - return len(self.cfg_id) * 1 + 1 + return len(self.parameter_ids) * 8 + 1 @dataclass -class GetConfigRsp(CoreResponse): - status: StatusCode = field(kw_only=True, default=StatusCode.UCI_STATUS_OK) - tlvs: List[DeviceConfigTlv] = field(kw_only=True, default_factory=list) +class CoreGetConfigRsp(CorePacket): + status: Status = field(kw_only=True, default=Status.OK) + parameters: List[ConfigParameter] = field(kw_only=True, default_factory=list) def __post_init__(self): - self.opcode = 5 - self.gid = GroupId.CORE self.mt = MessageType.RESPONSE + self.oid = CoreOpcodeId.GET_CONFIG + self.gid = GroupId.CORE @staticmethod - def parse(fields: dict, span: bytes) -> Tuple['GetConfigRsp', bytes]: - if fields['opcode'] != 0x5 or fields['gid'] != GroupId.CORE or fields['mt'] != MessageType.RESPONSE: + def parse(fields: dict, span: bytes) -> Tuple['CoreGetConfigRsp', bytes]: + if fields['mt'] != MessageType.RESPONSE or fields['oid'] != CoreOpcodeId.GET_CONFIG or fields['gid'] != GroupId.CORE: raise Exception("Invalid constraint field values") if len(span) < 2: raise Exception('Invalid packet size') - fields['status'] = StatusCode.from_int(span[0]) - tlvs_count = span[1] + fields['status'] = Status.from_int(span[0]) + parameters_count = span[1] span = span[2:] - tlvs = [] - for n in range(tlvs_count): - element, span = DeviceConfigTlv.parse(span) - tlvs.append(element) - fields['tlvs'] = tlvs - return GetConfigRsp(**fields), span + parameters = [] + for n in range(parameters_count): + element, span = ConfigParameter.parse(span) + parameters.append(element) + fields['parameters'] = parameters + return CoreGetConfigRsp(**fields), span def serialize(self, payload: bytes = None) -> bytes: _span = bytearray() _span.append((self.status << 0)) - if len(self.tlvs) > 255: - print(f"Invalid length for field GetConfigRsp::tlvs: {len(self.tlvs)} > 255; the array will be truncated") - del self.tlvs[255:] - _span.append((len(self.tlvs) << 0)) - for _elt in self.tlvs: + if len(self.parameters) > 255: + print(f"Invalid length for field CoreGetConfigRsp::parameters: {len(self.parameters)} > 255; the array will be truncated") + del self.parameters[255:] + _span.append((len(self.parameters) << 0)) + for _elt in self.parameters: _span.extend(_elt.serialize()) - return CoreResponse.serialize(self, payload = bytes(_span)) + return CorePacket.serialize(self, payload = bytes(_span)) @property def size(self) -> int: - return sum([elt.size for elt in self.tlvs]) + 2 + return sum([elt.size for elt in self.parameters]) + 2 @dataclass -class GenericError(CoreNotification): - status: StatusCode = field(kw_only=True, default=StatusCode.UCI_STATUS_OK) +class CoreGenericErrorNtf(CorePacket): + status: Status = field(kw_only=True, default=Status.OK) def __post_init__(self): - self.opcode = 7 - self.gid = GroupId.CORE self.mt = MessageType.NOTIFICATION + self.oid = CoreOpcodeId.GENERIC_ERROR + self.gid = GroupId.CORE @staticmethod - def parse(fields: dict, span: bytes) -> Tuple['GenericError', bytes]: - if fields['opcode'] != 0x7 or fields['gid'] != GroupId.CORE or fields['mt'] != MessageType.NOTIFICATION: + def parse(fields: dict, span: bytes) -> Tuple['CoreGenericErrorNtf', bytes]: + if fields['mt'] != MessageType.NOTIFICATION or fields['oid'] != CoreOpcodeId.GENERIC_ERROR or fields['gid'] != GroupId.CORE: raise Exception("Invalid constraint field values") if len(span) < 1: raise Exception('Invalid packet size') - fields['status'] = StatusCode.from_int(span[0]) + fields['status'] = Status.from_int(span[0]) span = span[1:] - return GenericError(**fields), span + return CoreGenericErrorNtf(**fields), span def serialize(self, payload: bytes = None) -> bytes: _span = bytearray() _span.append((self.status << 0)) - return CoreNotification.serialize(self, payload = bytes(_span)) + return CorePacket.serialize(self, payload = bytes(_span)) @property def size(self) -> int: return 1 @dataclass -class CoreQueryTimeStampCmd(CoreCommand): +class CoreQueryTimeStampCmd(CorePacket): def __post_init__(self): - self.opcode = 8 - self.gid = GroupId.CORE self.mt = MessageType.COMMAND + self.oid = CoreOpcodeId.QUERY_UWBS_TIMESTAMP + self.gid = GroupId.CORE @staticmethod def parse(fields: dict, span: bytes) -> Tuple['CoreQueryTimeStampCmd', bytes]: - if fields['opcode'] != 0x8 or fields['gid'] != GroupId.CORE or fields['mt'] != MessageType.COMMAND: + if fields['mt'] != MessageType.COMMAND or fields['oid'] != CoreOpcodeId.QUERY_UWBS_TIMESTAMP or fields['gid'] != GroupId.CORE: raise Exception("Invalid constraint field values") return CoreQueryTimeStampCmd(**fields), span def serialize(self, payload: bytes = None) -> bytes: _span = bytearray() - return CoreCommand.serialize(self, payload = bytes(_span)) + return CorePacket.serialize(self, payload = bytes(_span)) @property def size(self) -> int: return 0 @dataclass -class CoreQueryTimeStampRsp(CoreResponse): - status: StatusCode = field(kw_only=True, default=StatusCode.UCI_STATUS_OK) +class CoreQueryTimeStampRsp(CorePacket): + status: Status = field(kw_only=True, default=Status.OK) timeStamp: int = field(kw_only=True, default=0) def __post_init__(self): - self.opcode = 8 - self.gid = GroupId.CORE self.mt = MessageType.RESPONSE + self.oid = CoreOpcodeId.QUERY_UWBS_TIMESTAMP + self.gid = GroupId.CORE @staticmethod def parse(fields: dict, span: bytes) -> Tuple['CoreQueryTimeStampRsp', bytes]: - if fields['opcode'] != 0x8 or fields['gid'] != GroupId.CORE or fields['mt'] != MessageType.RESPONSE: + if fields['mt'] != MessageType.RESPONSE or fields['oid'] != CoreOpcodeId.QUERY_UWBS_TIMESTAMP or fields['gid'] != GroupId.CORE: raise Exception("Invalid constraint field values") if len(span) < 9: raise Exception('Invalid packet size') - fields['status'] = StatusCode.from_int(span[0]) + fields['status'] = Status.from_int(span[0]) value_ = int.from_bytes(span[1:9], byteorder='little') fields['timeStamp'] = value_ span = span[9:] @@ -2926,25 +2232,25 @@ class CoreQueryTimeStampRsp(CoreResponse): print(f"Invalid value for field CoreQueryTimeStampRsp::timeStamp: {self.timeStamp} > 18446744073709551615; the value will be truncated") self.timeStamp &= 18446744073709551615 _span.extend(int.to_bytes((self.timeStamp << 0), length=8, byteorder='little')) - return CoreResponse.serialize(self, payload = bytes(_span)) + return CorePacket.serialize(self, payload = bytes(_span)) @property def size(self) -> int: return 9 @dataclass -class SessionInitCmd(SessionConfigCommand): +class SessionInitCmd(SessionConfigPacket): session_id: int = field(kw_only=True, default=0) session_type: SessionType = field(kw_only=True, default=SessionType.FIRA_RANGING_SESSION) def __post_init__(self): - self.opcode = 0 - self.gid = GroupId.SESSION_CONFIG self.mt = MessageType.COMMAND + self.oid = SessionConfigOpcodeId.INIT + self.gid = GroupId.SESSION_CONFIG @staticmethod def parse(fields: dict, span: bytes) -> Tuple['SessionInitCmd', bytes]: - if fields['opcode'] != 0x0 or fields['gid'] != GroupId.SESSION_CONFIG or fields['mt'] != MessageType.COMMAND: + if fields['mt'] != MessageType.COMMAND or fields['oid'] != SessionConfigOpcodeId.INIT or fields['gid'] != GroupId.SESSION_CONFIG: raise Exception("Invalid constraint field values") if len(span) < 5: raise Exception('Invalid packet size') @@ -2961,29 +2267,29 @@ class SessionInitCmd(SessionConfigCommand): self.session_id &= 4294967295 _span.extend(int.to_bytes((self.session_id << 0), length=4, byteorder='little')) _span.append((self.session_type << 0)) - return SessionConfigCommand.serialize(self, payload = bytes(_span)) + return SessionConfigPacket.serialize(self, payload = bytes(_span)) @property def size(self) -> int: return 5 @dataclass -class SessionInitRsp_V2(SessionConfigResponse): - status: StatusCode = field(kw_only=True, default=StatusCode.UCI_STATUS_OK) +class SessionInitRsp_V2(SessionConfigPacket): + status: Status = field(kw_only=True, default=Status.OK) session_handle: int = field(kw_only=True, default=0) def __post_init__(self): - self.opcode = 0 - self.gid = GroupId.SESSION_CONFIG self.mt = MessageType.RESPONSE + self.oid = SessionConfigOpcodeId.INIT + self.gid = GroupId.SESSION_CONFIG @staticmethod def parse(fields: dict, span: bytes) -> Tuple['SessionInitRsp_V2', bytes]: - if fields['opcode'] != 0x0 or fields['gid'] != GroupId.SESSION_CONFIG or fields['mt'] != MessageType.RESPONSE: + if fields['mt'] != MessageType.RESPONSE or fields['oid'] != SessionConfigOpcodeId.INIT or fields['gid'] != GroupId.SESSION_CONFIG: raise Exception("Invalid constraint field values") if len(span) < 5: raise Exception('Invalid packet size') - fields['status'] = StatusCode.from_int(span[0]) + fields['status'] = Status.from_int(span[0]) value_ = int.from_bytes(span[1:5], byteorder='little') fields['session_handle'] = value_ span = span[5:] @@ -2996,52 +2302,52 @@ class SessionInitRsp_V2(SessionConfigResponse): print(f"Invalid value for field SessionInitRsp_V2::session_handle: {self.session_handle} > 4294967295; the value will be truncated") self.session_handle &= 4294967295 _span.extend(int.to_bytes((self.session_handle << 0), length=4, byteorder='little')) - return SessionConfigResponse.serialize(self, payload = bytes(_span)) + return SessionConfigPacket.serialize(self, payload = bytes(_span)) @property def size(self) -> int: return 5 @dataclass -class SessionInitRsp(SessionConfigResponse): - status: StatusCode = field(kw_only=True, default=StatusCode.UCI_STATUS_OK) +class SessionInitRsp(SessionConfigPacket): + status: Status = field(kw_only=True, default=Status.OK) def __post_init__(self): - self.opcode = 0 - self.gid = GroupId.SESSION_CONFIG self.mt = MessageType.RESPONSE + self.oid = SessionConfigOpcodeId.INIT + self.gid = GroupId.SESSION_CONFIG @staticmethod def parse(fields: dict, span: bytes) -> Tuple['SessionInitRsp', bytes]: - if fields['opcode'] != 0x0 or fields['gid'] != GroupId.SESSION_CONFIG or fields['mt'] != MessageType.RESPONSE: + if fields['mt'] != MessageType.RESPONSE or fields['oid'] != SessionConfigOpcodeId.INIT or fields['gid'] != GroupId.SESSION_CONFIG: raise Exception("Invalid constraint field values") if len(span) < 1: raise Exception('Invalid packet size') - fields['status'] = StatusCode.from_int(span[0]) + fields['status'] = Status.from_int(span[0]) span = span[1:] return SessionInitRsp(**fields), span def serialize(self, payload: bytes = None) -> bytes: _span = bytearray() _span.append((self.status << 0)) - return SessionConfigResponse.serialize(self, payload = bytes(_span)) + return SessionConfigPacket.serialize(self, payload = bytes(_span)) @property def size(self) -> int: return 1 @dataclass -class SessionDeinitCmd(SessionConfigCommand): +class SessionDeinitCmd(SessionConfigPacket): session_token: int = field(kw_only=True, default=0) def __post_init__(self): - self.opcode = 1 - self.gid = GroupId.SESSION_CONFIG self.mt = MessageType.COMMAND + self.oid = SessionConfigOpcodeId.DEINIT + self.gid = GroupId.SESSION_CONFIG @staticmethod def parse(fields: dict, span: bytes) -> Tuple['SessionDeinitCmd', bytes]: - if fields['opcode'] != 0x1 or fields['gid'] != GroupId.SESSION_CONFIG or fields['mt'] != MessageType.COMMAND: + if fields['mt'] != MessageType.COMMAND or fields['oid'] != SessionConfigOpcodeId.DEINIT or fields['gid'] != GroupId.SESSION_CONFIG: raise Exception("Invalid constraint field values") if len(span) < 4: raise Exception('Invalid packet size') @@ -3056,54 +2362,54 @@ class SessionDeinitCmd(SessionConfigCommand): print(f"Invalid value for field SessionDeinitCmd::session_token: {self.session_token} > 4294967295; the value will be truncated") self.session_token &= 4294967295 _span.extend(int.to_bytes((self.session_token << 0), length=4, byteorder='little')) - return SessionConfigCommand.serialize(self, payload = bytes(_span)) + return SessionConfigPacket.serialize(self, payload = bytes(_span)) @property def size(self) -> int: return 4 @dataclass -class SessionDeinitRsp(SessionConfigResponse): - status: StatusCode = field(kw_only=True, default=StatusCode.UCI_STATUS_OK) +class SessionDeinitRsp(SessionConfigPacket): + status: Status = field(kw_only=True, default=Status.OK) def __post_init__(self): - self.opcode = 1 - self.gid = GroupId.SESSION_CONFIG self.mt = MessageType.RESPONSE + self.oid = SessionConfigOpcodeId.DEINIT + self.gid = GroupId.SESSION_CONFIG @staticmethod def parse(fields: dict, span: bytes) -> Tuple['SessionDeinitRsp', bytes]: - if fields['opcode'] != 0x1 or fields['gid'] != GroupId.SESSION_CONFIG or fields['mt'] != MessageType.RESPONSE: + if fields['mt'] != MessageType.RESPONSE or fields['oid'] != SessionConfigOpcodeId.DEINIT or fields['gid'] != GroupId.SESSION_CONFIG: raise Exception("Invalid constraint field values") if len(span) < 1: raise Exception('Invalid packet size') - fields['status'] = StatusCode.from_int(span[0]) + fields['status'] = Status.from_int(span[0]) span = span[1:] return SessionDeinitRsp(**fields), span def serialize(self, payload: bytes = None) -> bytes: _span = bytearray() _span.append((self.status << 0)) - return SessionConfigResponse.serialize(self, payload = bytes(_span)) + return SessionConfigPacket.serialize(self, payload = bytes(_span)) @property def size(self) -> int: return 1 @dataclass -class SessionStatusNtf(SessionConfigNotification): +class SessionStatusNtf(SessionConfigPacket): session_token: int = field(kw_only=True, default=0) session_state: SessionState = field(kw_only=True, default=SessionState.SESSION_STATE_INIT) reason_code: int = field(kw_only=True, default=0) def __post_init__(self): - self.opcode = 2 - self.gid = GroupId.SESSION_CONFIG self.mt = MessageType.NOTIFICATION + self.oid = SessionConfigOpcodeId.STATUS + self.gid = GroupId.SESSION_CONFIG @staticmethod def parse(fields: dict, span: bytes) -> Tuple['SessionStatusNtf', bytes]: - if fields['opcode'] != 0x2 or fields['gid'] != GroupId.SESSION_CONFIG or fields['mt'] != MessageType.NOTIFICATION: + if fields['mt'] != MessageType.NOTIFICATION or fields['oid'] != SessionConfigOpcodeId.STATUS or fields['gid'] != GroupId.SESSION_CONFIG: raise Exception("Invalid constraint field values") if len(span) < 6: raise Exception('Invalid packet size') @@ -3125,7 +2431,7 @@ class SessionStatusNtf(SessionConfigNotification): print(f"Invalid value for field SessionStatusNtf::reason_code: {self.reason_code} > 255; the value will be truncated") self.reason_code &= 255 _span.append((self.reason_code << 0)) - return SessionConfigNotification.serialize(self, payload = bytes(_span)) + return SessionConfigPacket.serialize(self, payload = bytes(_span)) @property def size(self) -> int: @@ -3168,18 +2474,18 @@ class AppConfigTlv(Packet): return len(self.v) * 1 + 2 @dataclass -class SessionSetAppConfigCmd(SessionConfigCommand): +class SessionSetAppConfigCmd(SessionConfigPacket): session_token: int = field(kw_only=True, default=0) tlvs: List[AppConfigTlv] = field(kw_only=True, default_factory=list) def __post_init__(self): - self.opcode = 3 - self.gid = GroupId.SESSION_CONFIG self.mt = MessageType.COMMAND + self.oid = SessionConfigOpcodeId.SET_APP_CONFIG + self.gid = GroupId.SESSION_CONFIG @staticmethod def parse(fields: dict, span: bytes) -> Tuple['SessionSetAppConfigCmd', bytes]: - if fields['opcode'] != 0x3 or fields['gid'] != GroupId.SESSION_CONFIG or fields['mt'] != MessageType.COMMAND: + if fields['mt'] != MessageType.COMMAND or fields['oid'] != SessionConfigOpcodeId.SET_APP_CONFIG or fields['gid'] != GroupId.SESSION_CONFIG: raise Exception("Invalid constraint field values") if len(span) < 5: raise Exception('Invalid packet size') @@ -3206,7 +2512,7 @@ class SessionSetAppConfigCmd(SessionConfigCommand): _span.append((len(self.tlvs) << 0)) for _elt in self.tlvs: _span.extend(_elt.serialize()) - return SessionConfigCommand.serialize(self, payload = bytes(_span)) + return SessionConfigPacket.serialize(self, payload = bytes(_span)) @property def size(self) -> int: @@ -3215,7 +2521,7 @@ class SessionSetAppConfigCmd(SessionConfigCommand): @dataclass class AppConfigStatus(Packet): cfg_id: AppConfigTlvType = field(kw_only=True, default=AppConfigTlvType.DEVICE_TYPE) - status: StatusCode = field(kw_only=True, default=StatusCode.UCI_STATUS_OK) + status: Status = field(kw_only=True, default=Status.OK) def __post_init__(self): pass @@ -3226,7 +2532,7 @@ class AppConfigStatus(Packet): if len(span) < 2: raise Exception('Invalid packet size') fields['cfg_id'] = AppConfigTlvType.from_int(span[0]) - fields['status'] = StatusCode.from_int(span[1]) + fields['status'] = Status.from_int(span[1]) span = span[2:] return AppConfigStatus(**fields), span @@ -3241,22 +2547,22 @@ class AppConfigStatus(Packet): return 2 @dataclass -class SessionSetAppConfigRsp(SessionConfigResponse): - status: StatusCode = field(kw_only=True, default=StatusCode.UCI_STATUS_OK) +class SessionSetAppConfigRsp(SessionConfigPacket): + status: Status = field(kw_only=True, default=Status.OK) cfg_status: List[AppConfigStatus] = field(kw_only=True, default_factory=list) def __post_init__(self): - self.opcode = 3 - self.gid = GroupId.SESSION_CONFIG self.mt = MessageType.RESPONSE + self.oid = SessionConfigOpcodeId.SET_APP_CONFIG + self.gid = GroupId.SESSION_CONFIG @staticmethod def parse(fields: dict, span: bytes) -> Tuple['SessionSetAppConfigRsp', bytes]: - if fields['opcode'] != 0x3 or fields['gid'] != GroupId.SESSION_CONFIG or fields['mt'] != MessageType.RESPONSE: + if fields['mt'] != MessageType.RESPONSE or fields['oid'] != SessionConfigOpcodeId.SET_APP_CONFIG or fields['gid'] != GroupId.SESSION_CONFIG: raise Exception("Invalid constraint field values") if len(span) < 2: raise Exception('Invalid packet size') - fields['status'] = StatusCode.from_int(span[0]) + fields['status'] = Status.from_int(span[0]) cfg_status_count = span[1] span = span[2:] if len(span) < cfg_status_count * 2: @@ -3277,25 +2583,25 @@ class SessionSetAppConfigRsp(SessionConfigResponse): _span.append((len(self.cfg_status) << 0)) for _elt in self.cfg_status: _span.extend(_elt.serialize()) - return SessionConfigResponse.serialize(self, payload = bytes(_span)) + return SessionConfigPacket.serialize(self, payload = bytes(_span)) @property def size(self) -> int: return sum([elt.size for elt in self.cfg_status]) + 2 @dataclass -class SessionGetAppConfigCmd(SessionConfigCommand): +class SessionGetAppConfigCmd(SessionConfigPacket): session_token: int = field(kw_only=True, default=0) app_cfg: List[AppConfigTlvType] = field(kw_only=True, default_factory=list) def __post_init__(self): - self.opcode = 4 - self.gid = GroupId.SESSION_CONFIG self.mt = MessageType.COMMAND + self.oid = SessionConfigOpcodeId.GET_APP_CONFIG + self.gid = GroupId.SESSION_CONFIG @staticmethod def parse(fields: dict, span: bytes) -> Tuple['SessionGetAppConfigCmd', bytes]: - if fields['opcode'] != 0x4 or fields['gid'] != GroupId.SESSION_CONFIG or fields['mt'] != MessageType.COMMAND: + if fields['mt'] != MessageType.COMMAND or fields['oid'] != SessionConfigOpcodeId.GET_APP_CONFIG or fields['gid'] != GroupId.SESSION_CONFIG: raise Exception("Invalid constraint field values") if len(span) < 5: raise Exception('Invalid packet size') @@ -3324,29 +2630,29 @@ class SessionGetAppConfigCmd(SessionConfigCommand): _span.append((len(self.app_cfg) << 0)) for _elt in self.app_cfg: _span.append(_elt) - return SessionConfigCommand.serialize(self, payload = bytes(_span)) + return SessionConfigPacket.serialize(self, payload = bytes(_span)) @property def size(self) -> int: return len(self.app_cfg) * 8 + 5 @dataclass -class SessionGetAppConfigRsp(SessionConfigResponse): - status: StatusCode = field(kw_only=True, default=StatusCode.UCI_STATUS_OK) +class SessionGetAppConfigRsp(SessionConfigPacket): + status: Status = field(kw_only=True, default=Status.OK) tlvs: List[AppConfigTlv] = field(kw_only=True, default_factory=list) def __post_init__(self): - self.opcode = 4 - self.gid = GroupId.SESSION_CONFIG self.mt = MessageType.RESPONSE + self.oid = SessionConfigOpcodeId.GET_APP_CONFIG + self.gid = GroupId.SESSION_CONFIG @staticmethod def parse(fields: dict, span: bytes) -> Tuple['SessionGetAppConfigRsp', bytes]: - if fields['opcode'] != 0x4 or fields['gid'] != GroupId.SESSION_CONFIG or fields['mt'] != MessageType.RESPONSE: + if fields['mt'] != MessageType.RESPONSE or fields['oid'] != SessionConfigOpcodeId.GET_APP_CONFIG or fields['gid'] != GroupId.SESSION_CONFIG: raise Exception("Invalid constraint field values") if len(span) < 2: raise Exception('Invalid packet size') - fields['status'] = StatusCode.from_int(span[0]) + fields['status'] = Status.from_int(span[0]) tlvs_count = span[1] span = span[2:] tlvs = [] @@ -3365,52 +2671,52 @@ class SessionGetAppConfigRsp(SessionConfigResponse): _span.append((len(self.tlvs) << 0)) for _elt in self.tlvs: _span.extend(_elt.serialize()) - return SessionConfigResponse.serialize(self, payload = bytes(_span)) + return SessionConfigPacket.serialize(self, payload = bytes(_span)) @property def size(self) -> int: return sum([elt.size for elt in self.tlvs]) + 2 @dataclass -class SessionGetCountCmd(SessionConfigCommand): +class SessionGetCountCmd(SessionConfigPacket): def __post_init__(self): - self.opcode = 5 - self.gid = GroupId.SESSION_CONFIG self.mt = MessageType.COMMAND + self.oid = SessionConfigOpcodeId.GET_COUNT + self.gid = GroupId.SESSION_CONFIG @staticmethod def parse(fields: dict, span: bytes) -> Tuple['SessionGetCountCmd', bytes]: - if fields['opcode'] != 0x5 or fields['gid'] != GroupId.SESSION_CONFIG or fields['mt'] != MessageType.COMMAND: + if fields['mt'] != MessageType.COMMAND or fields['oid'] != SessionConfigOpcodeId.GET_COUNT or fields['gid'] != GroupId.SESSION_CONFIG: raise Exception("Invalid constraint field values") return SessionGetCountCmd(**fields), span def serialize(self, payload: bytes = None) -> bytes: _span = bytearray() - return SessionConfigCommand.serialize(self, payload = bytes(_span)) + return SessionConfigPacket.serialize(self, payload = bytes(_span)) @property def size(self) -> int: return 0 @dataclass -class SessionGetCountRsp(SessionConfigResponse): - status: StatusCode = field(kw_only=True, default=StatusCode.UCI_STATUS_OK) +class SessionGetCountRsp(SessionConfigPacket): + status: Status = field(kw_only=True, default=Status.OK) session_count: int = field(kw_only=True, default=0) def __post_init__(self): - self.opcode = 5 - self.gid = GroupId.SESSION_CONFIG self.mt = MessageType.RESPONSE + self.oid = SessionConfigOpcodeId.GET_COUNT + self.gid = GroupId.SESSION_CONFIG @staticmethod def parse(fields: dict, span: bytes) -> Tuple['SessionGetCountRsp', bytes]: - if fields['opcode'] != 0x5 or fields['gid'] != GroupId.SESSION_CONFIG or fields['mt'] != MessageType.RESPONSE: + if fields['mt'] != MessageType.RESPONSE or fields['oid'] != SessionConfigOpcodeId.GET_COUNT or fields['gid'] != GroupId.SESSION_CONFIG: raise Exception("Invalid constraint field values") if len(span) < 2: raise Exception('Invalid packet size') - fields['status'] = StatusCode.from_int(span[0]) + fields['status'] = Status.from_int(span[0]) fields['session_count'] = span[1] span = span[2:] return SessionGetCountRsp(**fields), span @@ -3422,24 +2728,24 @@ class SessionGetCountRsp(SessionConfigResponse): print(f"Invalid value for field SessionGetCountRsp::session_count: {self.session_count} > 255; the value will be truncated") self.session_count &= 255 _span.append((self.session_count << 0)) - return SessionConfigResponse.serialize(self, payload = bytes(_span)) + return SessionConfigPacket.serialize(self, payload = bytes(_span)) @property def size(self) -> int: return 2 @dataclass -class SessionGetStateCmd(SessionConfigCommand): +class SessionGetStateCmd(SessionConfigPacket): session_token: int = field(kw_only=True, default=0) def __post_init__(self): - self.opcode = 6 - self.gid = GroupId.SESSION_CONFIG self.mt = MessageType.COMMAND + self.oid = SessionConfigOpcodeId.GET_STATE + self.gid = GroupId.SESSION_CONFIG @staticmethod def parse(fields: dict, span: bytes) -> Tuple['SessionGetStateCmd', bytes]: - if fields['opcode'] != 0x6 or fields['gid'] != GroupId.SESSION_CONFIG or fields['mt'] != MessageType.COMMAND: + if fields['mt'] != MessageType.COMMAND or fields['oid'] != SessionConfigOpcodeId.GET_STATE or fields['gid'] != GroupId.SESSION_CONFIG: raise Exception("Invalid constraint field values") if len(span) < 4: raise Exception('Invalid packet size') @@ -3454,29 +2760,29 @@ class SessionGetStateCmd(SessionConfigCommand): print(f"Invalid value for field SessionGetStateCmd::session_token: {self.session_token} > 4294967295; the value will be truncated") self.session_token &= 4294967295 _span.extend(int.to_bytes((self.session_token << 0), length=4, byteorder='little')) - return SessionConfigCommand.serialize(self, payload = bytes(_span)) + return SessionConfigPacket.serialize(self, payload = bytes(_span)) @property def size(self) -> int: return 4 @dataclass -class SessionGetStateRsp(SessionConfigResponse): - status: StatusCode = field(kw_only=True, default=StatusCode.UCI_STATUS_OK) +class SessionGetStateRsp(SessionConfigPacket): + status: Status = field(kw_only=True, default=Status.OK) session_state: SessionState = field(kw_only=True, default=SessionState.SESSION_STATE_INIT) def __post_init__(self): - self.opcode = 6 - self.gid = GroupId.SESSION_CONFIG self.mt = MessageType.RESPONSE + self.oid = SessionConfigOpcodeId.GET_STATE + self.gid = GroupId.SESSION_CONFIG @staticmethod def parse(fields: dict, span: bytes) -> Tuple['SessionGetStateRsp', bytes]: - if fields['opcode'] != 0x6 or fields['gid'] != GroupId.SESSION_CONFIG or fields['mt'] != MessageType.RESPONSE: + if fields['mt'] != MessageType.RESPONSE or fields['oid'] != SessionConfigOpcodeId.GET_STATE or fields['gid'] != GroupId.SESSION_CONFIG: raise Exception("Invalid constraint field values") if len(span) < 2: raise Exception('Invalid packet size') - fields['status'] = StatusCode.from_int(span[0]) + fields['status'] = Status.from_int(span[0]) fields['session_state'] = SessionState.from_int(span[1]) span = span[2:] return SessionGetStateRsp(**fields), span @@ -3485,25 +2791,71 @@ class SessionGetStateRsp(SessionConfigResponse): _span = bytearray() _span.append((self.status << 0)) _span.append((self.session_state << 0)) - return SessionConfigResponse.serialize(self, payload = bytes(_span)) + return SessionConfigPacket.serialize(self, payload = bytes(_span)) @property def size(self) -> int: return 2 @dataclass -class SessionUpdateDtTagRangingRoundsCmd(SessionConfigCommand): +class SessionUpdateDtAnchorRangingRoundsCmd(SessionConfigPacket): + + + def __post_init__(self): + self.mt = MessageType.COMMAND + self.oid = SessionConfigOpcodeId.UPDATE_DT_ANCHOR_RANGING_ROUNDS + self.gid = GroupId.SESSION_CONFIG + + @staticmethod + def parse(fields: dict, span: bytes) -> Tuple['SessionUpdateDtAnchorRangingRoundsCmd', bytes]: + if fields['mt'] != MessageType.COMMAND or fields['oid'] != SessionConfigOpcodeId.UPDATE_DT_ANCHOR_RANGING_ROUNDS or fields['gid'] != GroupId.SESSION_CONFIG: + raise Exception("Invalid constraint field values") + return SessionUpdateDtAnchorRangingRoundsCmd(**fields), span + + def serialize(self, payload: bytes = None) -> bytes: + _span = bytearray() + return SessionConfigPacket.serialize(self, payload = bytes(_span)) + + @property + def size(self) -> int: + return 0 + +@dataclass +class SessionUpdateDtAnchorRangingRoundsRsp(SessionConfigPacket): + + + def __post_init__(self): + self.mt = MessageType.RESPONSE + self.oid = SessionConfigOpcodeId.UPDATE_DT_ANCHOR_RANGING_ROUNDS + self.gid = GroupId.SESSION_CONFIG + + @staticmethod + def parse(fields: dict, span: bytes) -> Tuple['SessionUpdateDtAnchorRangingRoundsRsp', bytes]: + if fields['mt'] != MessageType.RESPONSE or fields['oid'] != SessionConfigOpcodeId.UPDATE_DT_ANCHOR_RANGING_ROUNDS or fields['gid'] != GroupId.SESSION_CONFIG: + raise Exception("Invalid constraint field values") + return SessionUpdateDtAnchorRangingRoundsRsp(**fields), span + + def serialize(self, payload: bytes = None) -> bytes: + _span = bytearray() + return SessionConfigPacket.serialize(self, payload = bytes(_span)) + + @property + def size(self) -> int: + return 0 + +@dataclass +class SessionUpdateDtTagRangingRoundsCmd(SessionConfigPacket): session_token: int = field(kw_only=True, default=0) ranging_round_indexes: bytearray = field(kw_only=True, default_factory=bytearray) def __post_init__(self): - self.opcode = 9 - self.gid = GroupId.SESSION_CONFIG self.mt = MessageType.COMMAND + self.oid = SessionConfigOpcodeId.UPDATE_DT_TAG_RANGING_ROUNDS + self.gid = GroupId.SESSION_CONFIG @staticmethod def parse(fields: dict, span: bytes) -> Tuple['SessionUpdateDtTagRangingRoundsCmd', bytes]: - if fields['opcode'] != 0x9 or fields['gid'] != GroupId.SESSION_CONFIG or fields['mt'] != MessageType.COMMAND: + if fields['mt'] != MessageType.COMMAND or fields['oid'] != SessionConfigOpcodeId.UPDATE_DT_TAG_RANGING_ROUNDS or fields['gid'] != GroupId.SESSION_CONFIG: raise Exception("Invalid constraint field values") if len(span) < 5: raise Exception('Invalid packet size') @@ -3528,29 +2880,29 @@ class SessionUpdateDtTagRangingRoundsCmd(SessionConfigCommand): del self.ranging_round_indexes[255:] _span.append((len(self.ranging_round_indexes) << 0)) _span.extend(self.ranging_round_indexes) - return SessionConfigCommand.serialize(self, payload = bytes(_span)) + return SessionConfigPacket.serialize(self, payload = bytes(_span)) @property def size(self) -> int: return len(self.ranging_round_indexes) * 1 + 5 @dataclass -class SessionUpdateDtTagRangingRoundsRsp(SessionConfigResponse): - status: StatusCode = field(kw_only=True, default=StatusCode.UCI_STATUS_OK) +class SessionUpdateDtTagRangingRoundsRsp(SessionConfigPacket): + status: Status = field(kw_only=True, default=Status.OK) ranging_round_indexes: bytearray = field(kw_only=True, default_factory=bytearray) def __post_init__(self): - self.opcode = 9 - self.gid = GroupId.SESSION_CONFIG self.mt = MessageType.RESPONSE + self.oid = SessionConfigOpcodeId.UPDATE_DT_TAG_RANGING_ROUNDS + self.gid = GroupId.SESSION_CONFIG @staticmethod def parse(fields: dict, span: bytes) -> Tuple['SessionUpdateDtTagRangingRoundsRsp', bytes]: - if fields['opcode'] != 0x9 or fields['gid'] != GroupId.SESSION_CONFIG or fields['mt'] != MessageType.RESPONSE: + if fields['mt'] != MessageType.RESPONSE or fields['oid'] != SessionConfigOpcodeId.UPDATE_DT_TAG_RANGING_ROUNDS or fields['gid'] != GroupId.SESSION_CONFIG: raise Exception("Invalid constraint field values") if len(span) < 2: raise Exception('Invalid packet size') - fields['status'] = StatusCode.from_int(span[0]) + fields['status'] = Status.from_int(span[0]) ranging_round_indexes_count = span[1] span = span[2:] if len(span) < ranging_round_indexes_count: @@ -3567,7 +2919,7 @@ class SessionUpdateDtTagRangingRoundsRsp(SessionConfigResponse): del self.ranging_round_indexes[255:] _span.append((len(self.ranging_round_indexes) << 0)) _span.extend(self.ranging_round_indexes) - return SessionConfigResponse.serialize(self, payload = bytes(_span)) + return SessionConfigPacket.serialize(self, payload = bytes(_span)) @property def size(self) -> int: @@ -3705,18 +3057,18 @@ class UpdateMulticastListAction(enum.IntEnum): @dataclass -class SessionUpdateControllerMulticastListCmd(SessionConfigCommand): +class SessionUpdateControllerMulticastListCmd(SessionConfigPacket): session_token: int = field(kw_only=True, default=0) action: UpdateMulticastListAction = field(kw_only=True, default=UpdateMulticastListAction.ADD_CONTROLEE) def __post_init__(self): - self.opcode = 7 - self.gid = GroupId.SESSION_CONFIG self.mt = MessageType.COMMAND + self.oid = SessionConfigOpcodeId.UPDATE_CONTROLLER_MULTICAST_LIST + self.gid = GroupId.SESSION_CONFIG @staticmethod def parse(fields: dict, span: bytes) -> Tuple['SessionUpdateControllerMulticastListCmd', bytes]: - if fields['opcode'] != 0x7 or fields['gid'] != GroupId.SESSION_CONFIG or fields['mt'] != MessageType.COMMAND: + if fields['mt'] != MessageType.COMMAND or fields['oid'] != SessionConfigOpcodeId.UPDATE_CONTROLLER_MULTICAST_LIST or fields['gid'] != GroupId.SESSION_CONFIG: raise Exception("Invalid constraint field values") if len(span) < 5: raise Exception('Invalid packet size') @@ -3737,139 +3089,13 @@ class SessionUpdateControllerMulticastListCmd(SessionConfigCommand): _span.extend(int.to_bytes((self.session_token << 0), length=4, byteorder='little')) _span.append((self.action << 0)) _span.extend(payload or self.payload or []) - return SessionConfigCommand.serialize(self, payload = bytes(_span)) + return SessionConfigPacket.serialize(self, payload = bytes(_span)) @property def size(self) -> int: return len(self.payload) + 5 @dataclass -class PhaseList(Packet): - session_token: int = field(kw_only=True, default=0) - start_slot_index: int = field(kw_only=True, default=0) - end_slot_index: int = field(kw_only=True, default=0) - - def __post_init__(self): - pass - - @staticmethod - def parse(span: bytes) -> Tuple['PhaseList', bytes]: - fields = {'payload': None} - if len(span) < 8: - raise Exception('Invalid packet size') - value_ = int.from_bytes(span[0:4], byteorder='little') - fields['session_token'] = value_ - value_ = int.from_bytes(span[4:6], byteorder='little') - fields['start_slot_index'] = value_ - value_ = int.from_bytes(span[6:8], byteorder='little') - fields['end_slot_index'] = value_ - span = span[8:] - return PhaseList(**fields), span - - def serialize(self, payload: bytes = None) -> bytes: - _span = bytearray() - if self.session_token > 4294967295: - print(f"Invalid value for field PhaseList::session_token: {self.session_token} > 4294967295; the value will be truncated") - self.session_token &= 4294967295 - _span.extend(int.to_bytes((self.session_token << 0), length=4, byteorder='little')) - if self.start_slot_index > 65535: - print(f"Invalid value for field PhaseList::start_slot_index: {self.start_slot_index} > 65535; the value will be truncated") - self.start_slot_index &= 65535 - _span.extend(int.to_bytes((self.start_slot_index << 0), length=2, byteorder='little')) - if self.end_slot_index > 65535: - print(f"Invalid value for field PhaseList::end_slot_index: {self.end_slot_index} > 65535; the value will be truncated") - self.end_slot_index &= 65535 - _span.extend(int.to_bytes((self.end_slot_index << 0), length=2, byteorder='little')) - return bytes(_span) - - @property - def size(self) -> int: - return 8 - -@dataclass -class SessionSetHybridConfigCmd(SessionConfigCommand): - session_token: int = field(kw_only=True, default=0) - number_of_phases: int = field(kw_only=True, default=0) - update_time: bytearray = field(kw_only=True, default_factory=bytearray) - phase_list: List[PhaseList] = field(kw_only=True, default_factory=list) - - def __post_init__(self): - self.opcode = 12 - self.gid = GroupId.SESSION_CONFIG - self.mt = MessageType.COMMAND - - @staticmethod - def parse(fields: dict, span: bytes) -> Tuple['SessionSetHybridConfigCmd', bytes]: - if fields['opcode'] != 0xc or fields['gid'] != GroupId.SESSION_CONFIG or fields['mt'] != MessageType.COMMAND: - raise Exception("Invalid constraint field values") - if len(span) < 5: - raise Exception('Invalid packet size') - value_ = int.from_bytes(span[0:4], byteorder='little') - fields['session_token'] = value_ - fields['number_of_phases'] = span[4] - span = span[5:] - if len(span) < 8: - raise Exception('Invalid packet size') - fields['update_time'] = list(span[:8]) - span = span[8:] - if len(span) % 8 != 0: - raise Exception('Array size is not a multiple of the element size') - phase_list_count = int(len(span) / 8) - phase_list = [] - for n in range(phase_list_count): - phase_list.append(PhaseList.parse_all(span[n * 8:(n + 1) * 8])) - fields['phase_list'] = phase_list - span = bytes() - return SessionSetHybridConfigCmd(**fields), span - - def serialize(self, payload: bytes = None) -> bytes: - _span = bytearray() - if self.session_token > 4294967295: - print(f"Invalid value for field SessionSetHybridConfigCmd::session_token: {self.session_token} > 4294967295; the value will be truncated") - self.session_token &= 4294967295 - _span.extend(int.to_bytes((self.session_token << 0), length=4, byteorder='little')) - if self.number_of_phases > 255: - print(f"Invalid value for field SessionSetHybridConfigCmd::number_of_phases: {self.number_of_phases} > 255; the value will be truncated") - self.number_of_phases &= 255 - _span.append((self.number_of_phases << 0)) - _span.extend(self.update_time) - for _elt in self.phase_list: - _span.extend(_elt.serialize()) - return SessionConfigCommand.serialize(self, payload = bytes(_span)) - - @property - def size(self) -> int: - return sum([elt.size for elt in self.phase_list]) + 13 - -@dataclass -class SessionSetHybridConfigRsp(SessionConfigResponse): - status: StatusCode = field(kw_only=True, default=StatusCode.UCI_STATUS_OK) - - def __post_init__(self): - self.opcode = 12 - self.gid = GroupId.SESSION_CONFIG - self.mt = MessageType.RESPONSE - - @staticmethod - def parse(fields: dict, span: bytes) -> Tuple['SessionSetHybridConfigRsp', bytes]: - if fields['opcode'] != 0xc or fields['gid'] != GroupId.SESSION_CONFIG or fields['mt'] != MessageType.RESPONSE: - raise Exception("Invalid constraint field values") - if len(span) < 1: - raise Exception('Invalid packet size') - fields['status'] = StatusCode.from_int(span[0]) - span = span[1:] - return SessionSetHybridConfigRsp(**fields), span - - def serialize(self, payload: bytes = None) -> bytes: - _span = bytearray() - _span.append((self.status << 0)) - return SessionConfigResponse.serialize(self, payload = bytes(_span)) - - @property - def size(self) -> int: - return 1 - -@dataclass class SessionUpdateControllerMulticastListCmdPayload(Packet): controlees: List[Controlee] = field(kw_only=True, default_factory=list) @@ -3981,28 +3207,28 @@ class SessionUpdateControllerMulticastListCmd_2_0_32_Byte_Payload(Packet): return sum([elt.size for elt in self.controlees]) + 1 @dataclass -class SessionUpdateControllerMulticastListRsp(SessionConfigResponse): - status: StatusCode = field(kw_only=True, default=StatusCode.UCI_STATUS_OK) +class SessionUpdateControllerMulticastListRsp(SessionConfigPacket): + status: Status = field(kw_only=True, default=Status.OK) def __post_init__(self): - self.opcode = 7 - self.gid = GroupId.SESSION_CONFIG self.mt = MessageType.RESPONSE + self.oid = SessionConfigOpcodeId.UPDATE_CONTROLLER_MULTICAST_LIST + self.gid = GroupId.SESSION_CONFIG @staticmethod def parse(fields: dict, span: bytes) -> Tuple['SessionUpdateControllerMulticastListRsp', bytes]: - if fields['opcode'] != 0x7 or fields['gid'] != GroupId.SESSION_CONFIG or fields['mt'] != MessageType.RESPONSE: + if fields['mt'] != MessageType.RESPONSE or fields['oid'] != SessionConfigOpcodeId.UPDATE_CONTROLLER_MULTICAST_LIST or fields['gid'] != GroupId.SESSION_CONFIG: raise Exception("Invalid constraint field values") if len(span) < 1: raise Exception('Invalid packet size') - fields['status'] = StatusCode.from_int(span[0]) + fields['status'] = Status.from_int(span[0]) span = span[1:] return SessionUpdateControllerMulticastListRsp(**fields), span def serialize(self, payload: bytes = None) -> bytes: _span = bytearray() _span.append((self.status << 0)) - return SessionConfigResponse.serialize(self, payload = bytes(_span)) + return SessionConfigPacket.serialize(self, payload = bytes(_span)) @property def size(self) -> int: @@ -4012,7 +3238,7 @@ class SessionUpdateControllerMulticastListRsp(SessionConfigResponse): class ControleeStatus(Packet): mac_address: bytearray = field(kw_only=True, default_factory=bytearray) subsession_id: int = field(kw_only=True, default=0) - status: MulticastUpdateStatusCode = field(kw_only=True, default=MulticastUpdateStatusCode.STATUS_OK_MULTICAST_LIST_UPDATE) + status: MulticastUpdateStatus = field(kw_only=True, default=MulticastUpdateStatus.OK_MULTICAST_LIST_UPDATE) def __post_init__(self): pass @@ -4028,7 +3254,7 @@ class ControleeStatus(Packet): raise Exception('Invalid packet size') value_ = int.from_bytes(span[0:4], byteorder='little') fields['subsession_id'] = value_ - fields['status'] = MulticastUpdateStatusCode.from_int(span[4]) + fields['status'] = MulticastUpdateStatus.from_int(span[4]) span = span[5:] return ControleeStatus(**fields), span @@ -4047,19 +3273,19 @@ class ControleeStatus(Packet): return 7 @dataclass -class SessionUpdateControllerMulticastListNtf(SessionConfigNotification): +class SessionUpdateControllerMulticastListNtf(SessionConfigPacket): session_token: int = field(kw_only=True, default=0) remaining_multicast_list_size: int = field(kw_only=True, default=0) controlee_status: List[ControleeStatus] = field(kw_only=True, default_factory=list) def __post_init__(self): - self.opcode = 7 - self.gid = GroupId.SESSION_CONFIG self.mt = MessageType.NOTIFICATION + self.oid = SessionConfigOpcodeId.UPDATE_CONTROLLER_MULTICAST_LIST + self.gid = GroupId.SESSION_CONFIG @staticmethod def parse(fields: dict, span: bytes) -> Tuple['SessionUpdateControllerMulticastListNtf', bytes]: - if fields['opcode'] != 0x7 or fields['gid'] != GroupId.SESSION_CONFIG or fields['mt'] != MessageType.NOTIFICATION: + if fields['mt'] != MessageType.NOTIFICATION or fields['oid'] != SessionConfigOpcodeId.UPDATE_CONTROLLER_MULTICAST_LIST or fields['gid'] != GroupId.SESSION_CONFIG: raise Exception("Invalid constraint field values") if len(span) < 6: raise Exception('Invalid packet size') @@ -4093,25 +3319,25 @@ class SessionUpdateControllerMulticastListNtf(SessionConfigNotification): _span.append((len(self.controlee_status) << 0)) for _elt in self.controlee_status: _span.extend(_elt.serialize()) - return SessionConfigNotification.serialize(self, payload = bytes(_span)) + return SessionConfigPacket.serialize(self, payload = bytes(_span)) @property def size(self) -> int: return sum([elt.size for elt in self.controlee_status]) + 6 @dataclass -class DataCreditNtf(SessionControlNotification): +class SessionDataCreditNtf(SessionControlPacket): session_token: int = field(kw_only=True, default=0) credit_availability: CreditAvailability = field(kw_only=True, default=CreditAvailability.CREDIT_NOT_AVAILABLE) def __post_init__(self): - self.opcode = 4 - self.gid = GroupId.SESSION_CONTROL self.mt = MessageType.NOTIFICATION + self.oid = SessionControlOpcodeId.DATA_CREDIT + self.gid = GroupId.SESSION_CONTROL @staticmethod - def parse(fields: dict, span: bytes) -> Tuple['DataCreditNtf', bytes]: - if fields['opcode'] != 0x4 or fields['gid'] != GroupId.SESSION_CONTROL or fields['mt'] != MessageType.NOTIFICATION: + def parse(fields: dict, span: bytes) -> Tuple['SessionDataCreditNtf', bytes]: + if fields['mt'] != MessageType.NOTIFICATION or fields['oid'] != SessionControlOpcodeId.DATA_CREDIT or fields['gid'] != GroupId.SESSION_CONTROL: raise Exception("Invalid constraint field values") if len(span) < 5: raise Exception('Invalid packet size') @@ -4119,36 +3345,36 @@ class DataCreditNtf(SessionControlNotification): fields['session_token'] = value_ fields['credit_availability'] = CreditAvailability.from_int(span[4]) span = span[5:] - return DataCreditNtf(**fields), span + return SessionDataCreditNtf(**fields), span def serialize(self, payload: bytes = None) -> bytes: _span = bytearray() if self.session_token > 4294967295: - print(f"Invalid value for field DataCreditNtf::session_token: {self.session_token} > 4294967295; the value will be truncated") + print(f"Invalid value for field SessionDataCreditNtf::session_token: {self.session_token} > 4294967295; the value will be truncated") self.session_token &= 4294967295 _span.extend(int.to_bytes((self.session_token << 0), length=4, byteorder='little')) _span.append((self.credit_availability << 0)) - return SessionControlNotification.serialize(self, payload = bytes(_span)) + return SessionControlPacket.serialize(self, payload = bytes(_span)) @property def size(self) -> int: return 5 @dataclass -class DataTransferStatusNtf(SessionControlNotification): +class SessionDataTransferStatusNtf(SessionControlPacket): session_token: int = field(kw_only=True, default=0) uci_sequence_number: int = field(kw_only=True, default=0) status: DataTransferNtfStatusCode = field(kw_only=True, default=DataTransferNtfStatusCode.UCI_DATA_TRANSFER_STATUS_REPETITION_OK) tx_count: int = field(kw_only=True, default=0) def __post_init__(self): - self.opcode = 5 - self.gid = GroupId.SESSION_CONTROL self.mt = MessageType.NOTIFICATION + self.oid = SessionControlOpcodeId.DATA_TRANSFER_STATUS + self.gid = GroupId.SESSION_CONTROL @staticmethod - def parse(fields: dict, span: bytes) -> Tuple['DataTransferStatusNtf', bytes]: - if fields['opcode'] != 0x5 or fields['gid'] != GroupId.SESSION_CONTROL or fields['mt'] != MessageType.NOTIFICATION: + def parse(fields: dict, span: bytes) -> Tuple['SessionDataTransferStatusNtf', bytes]: + if fields['mt'] != MessageType.NOTIFICATION or fields['oid'] != SessionControlOpcodeId.DATA_TRANSFER_STATUS or fields['gid'] != GroupId.SESSION_CONTROL: raise Exception("Invalid constraint field values") if len(span) < 7: raise Exception('Invalid packet size') @@ -4158,74 +3384,74 @@ class DataTransferStatusNtf(SessionControlNotification): fields['status'] = DataTransferNtfStatusCode.from_int(span[5]) fields['tx_count'] = span[6] span = span[7:] - return DataTransferStatusNtf(**fields), span + return SessionDataTransferStatusNtf(**fields), span def serialize(self, payload: bytes = None) -> bytes: _span = bytearray() if self.session_token > 4294967295: - print(f"Invalid value for field DataTransferStatusNtf::session_token: {self.session_token} > 4294967295; the value will be truncated") + print(f"Invalid value for field SessionDataTransferStatusNtf::session_token: {self.session_token} > 4294967295; the value will be truncated") self.session_token &= 4294967295 _span.extend(int.to_bytes((self.session_token << 0), length=4, byteorder='little')) if self.uci_sequence_number > 255: - print(f"Invalid value for field DataTransferStatusNtf::uci_sequence_number: {self.uci_sequence_number} > 255; the value will be truncated") + print(f"Invalid value for field SessionDataTransferStatusNtf::uci_sequence_number: {self.uci_sequence_number} > 255; the value will be truncated") self.uci_sequence_number &= 255 _span.append((self.uci_sequence_number << 0)) _span.append((self.status << 0)) if self.tx_count > 255: - print(f"Invalid value for field DataTransferStatusNtf::tx_count: {self.tx_count} > 255; the value will be truncated") + print(f"Invalid value for field SessionDataTransferStatusNtf::tx_count: {self.tx_count} > 255; the value will be truncated") self.tx_count &= 255 _span.append((self.tx_count << 0)) - return SessionControlNotification.serialize(self, payload = bytes(_span)) + return SessionControlPacket.serialize(self, payload = bytes(_span)) @property def size(self) -> int: return 7 @dataclass -class SessionQueryMaxDataSizeCmd(SessionConfigCommand): +class SessionQueryMaxDataSizeInRangingCmd(SessionConfigPacket): session_token: int = field(kw_only=True, default=0) def __post_init__(self): - self.opcode = 11 - self.gid = GroupId.SESSION_CONFIG self.mt = MessageType.COMMAND + self.oid = SessionConfigOpcodeId.QUERY_DATA_SIZE_IN_RANGING + self.gid = GroupId.SESSION_CONFIG @staticmethod - def parse(fields: dict, span: bytes) -> Tuple['SessionQueryMaxDataSizeCmd', bytes]: - if fields['opcode'] != 0xb or fields['gid'] != GroupId.SESSION_CONFIG or fields['mt'] != MessageType.COMMAND: + def parse(fields: dict, span: bytes) -> Tuple['SessionQueryMaxDataSizeInRangingCmd', bytes]: + if fields['mt'] != MessageType.COMMAND or fields['oid'] != SessionConfigOpcodeId.QUERY_DATA_SIZE_IN_RANGING or fields['gid'] != GroupId.SESSION_CONFIG: raise Exception("Invalid constraint field values") if len(span) < 4: raise Exception('Invalid packet size') value_ = int.from_bytes(span[0:4], byteorder='little') fields['session_token'] = value_ span = span[4:] - return SessionQueryMaxDataSizeCmd(**fields), span + return SessionQueryMaxDataSizeInRangingCmd(**fields), span def serialize(self, payload: bytes = None) -> bytes: _span = bytearray() if self.session_token > 4294967295: - print(f"Invalid value for field SessionQueryMaxDataSizeCmd::session_token: {self.session_token} > 4294967295; the value will be truncated") + print(f"Invalid value for field SessionQueryMaxDataSizeInRangingCmd::session_token: {self.session_token} > 4294967295; the value will be truncated") self.session_token &= 4294967295 _span.extend(int.to_bytes((self.session_token << 0), length=4, byteorder='little')) - return SessionConfigCommand.serialize(self, payload = bytes(_span)) + return SessionConfigPacket.serialize(self, payload = bytes(_span)) @property def size(self) -> int: return 4 @dataclass -class SessionQueryMaxDataSizeRsp(SessionConfigResponse): +class SessionQueryMaxDataSizeInRangingRsp(SessionConfigPacket): session_token: int = field(kw_only=True, default=0) max_data_size: int = field(kw_only=True, default=0) def __post_init__(self): - self.opcode = 11 - self.gid = GroupId.SESSION_CONFIG self.mt = MessageType.RESPONSE + self.oid = SessionConfigOpcodeId.QUERY_DATA_SIZE_IN_RANGING + self.gid = GroupId.SESSION_CONFIG @staticmethod - def parse(fields: dict, span: bytes) -> Tuple['SessionQueryMaxDataSizeRsp', bytes]: - if fields['opcode'] != 0xb or fields['gid'] != GroupId.SESSION_CONFIG or fields['mt'] != MessageType.RESPONSE: + def parse(fields: dict, span: bytes) -> Tuple['SessionQueryMaxDataSizeInRangingRsp', bytes]: + if fields['mt'] != MessageType.RESPONSE or fields['oid'] != SessionConfigOpcodeId.QUERY_DATA_SIZE_IN_RANGING or fields['gid'] != GroupId.SESSION_CONFIG: raise Exception("Invalid constraint field values") if len(span) < 6: raise Exception('Invalid packet size') @@ -4234,70 +3460,79 @@ class SessionQueryMaxDataSizeRsp(SessionConfigResponse): value_ = int.from_bytes(span[4:6], byteorder='little') fields['max_data_size'] = value_ span = span[6:] - return SessionQueryMaxDataSizeRsp(**fields), span + return SessionQueryMaxDataSizeInRangingRsp(**fields), span def serialize(self, payload: bytes = None) -> bytes: _span = bytearray() if self.session_token > 4294967295: - print(f"Invalid value for field SessionQueryMaxDataSizeRsp::session_token: {self.session_token} > 4294967295; the value will be truncated") + print(f"Invalid value for field SessionQueryMaxDataSizeInRangingRsp::session_token: {self.session_token} > 4294967295; the value will be truncated") self.session_token &= 4294967295 _span.extend(int.to_bytes((self.session_token << 0), length=4, byteorder='little')) if self.max_data_size > 65535: - print(f"Invalid value for field SessionQueryMaxDataSizeRsp::max_data_size: {self.max_data_size} > 65535; the value will be truncated") + print(f"Invalid value for field SessionQueryMaxDataSizeInRangingRsp::max_data_size: {self.max_data_size} > 65535; the value will be truncated") self.max_data_size &= 65535 _span.extend(int.to_bytes((self.max_data_size << 0), length=2, byteorder='little')) - return SessionConfigResponse.serialize(self, payload = bytes(_span)) + return SessionConfigPacket.serialize(self, payload = bytes(_span)) @property def size(self) -> int: return 6 @dataclass -class SessionStartCmd(SessionControlCommand): - +class SessionStartCmd(SessionControlPacket): + session_id: int = field(kw_only=True, default=0) def __post_init__(self): - self.opcode = 0 - self.gid = GroupId.SESSION_CONTROL self.mt = MessageType.COMMAND + self.oid = SessionControlOpcodeId.START + self.gid = GroupId.SESSION_CONTROL @staticmethod def parse(fields: dict, span: bytes) -> Tuple['SessionStartCmd', bytes]: - if fields['opcode'] != 0x0 or fields['gid'] != GroupId.SESSION_CONTROL or fields['mt'] != MessageType.COMMAND: + if fields['mt'] != MessageType.COMMAND or fields['oid'] != SessionControlOpcodeId.START or fields['gid'] != GroupId.SESSION_CONTROL: raise Exception("Invalid constraint field values") + if len(span) < 4: + raise Exception('Invalid packet size') + value_ = int.from_bytes(span[0:4], byteorder='little') + fields['session_id'] = value_ + span = span[4:] return SessionStartCmd(**fields), span def serialize(self, payload: bytes = None) -> bytes: _span = bytearray() - return SessionControlCommand.serialize(self, payload = bytes(_span)) + if self.session_id > 4294967295: + print(f"Invalid value for field SessionStartCmd::session_id: {self.session_id} > 4294967295; the value will be truncated") + self.session_id &= 4294967295 + _span.extend(int.to_bytes((self.session_id << 0), length=4, byteorder='little')) + return SessionControlPacket.serialize(self, payload = bytes(_span)) @property def size(self) -> int: - return 0 + return 4 @dataclass -class SessionStartRsp(SessionControlResponse): - status: StatusCode = field(kw_only=True, default=StatusCode.UCI_STATUS_OK) +class SessionStartRsp(SessionControlPacket): + status: Status = field(kw_only=True, default=Status.OK) def __post_init__(self): - self.opcode = 0 - self.gid = GroupId.SESSION_CONTROL self.mt = MessageType.RESPONSE + self.oid = SessionControlOpcodeId.START + self.gid = GroupId.SESSION_CONTROL @staticmethod def parse(fields: dict, span: bytes) -> Tuple['SessionStartRsp', bytes]: - if fields['opcode'] != 0x0 or fields['gid'] != GroupId.SESSION_CONTROL or fields['mt'] != MessageType.RESPONSE: + if fields['mt'] != MessageType.RESPONSE or fields['oid'] != SessionControlOpcodeId.START or fields['gid'] != GroupId.SESSION_CONTROL: raise Exception("Invalid constraint field values") if len(span) < 1: raise Exception('Invalid packet size') - fields['status'] = StatusCode.from_int(span[0]) + fields['status'] = Status.from_int(span[0]) span = span[1:] return SessionStartRsp(**fields), span def serialize(self, payload: bytes = None) -> bytes: _span = bytearray() _span.append((self.status << 0)) - return SessionControlResponse.serialize(self, payload = bytes(_span)) + return SessionControlPacket.serialize(self, payload = bytes(_span)) @property def size(self) -> int: @@ -4306,7 +3541,7 @@ class SessionStartRsp(SessionControlResponse): @dataclass class ShortAddressTwoWayRangingMeasurement(Packet): mac_address: int = field(kw_only=True, default=0) - status: StatusCode = field(kw_only=True, default=StatusCode.UCI_STATUS_OK) + status: Status = field(kw_only=True, default=Status.OK) nlos: int = field(kw_only=True, default=0) distance: int = field(kw_only=True, default=0) aoa_azimuth: int = field(kw_only=True, default=0) @@ -4330,7 +3565,7 @@ class ShortAddressTwoWayRangingMeasurement(Packet): raise Exception('Invalid packet size') value_ = int.from_bytes(span[0:2], byteorder='little') fields['mac_address'] = value_ - fields['status'] = StatusCode.from_int(span[2]) + fields['status'] = Status.from_int(span[2]) fields['nlos'] = span[3] value_ = int.from_bytes(span[4:6], byteorder='little') fields['distance'] = value_ @@ -4419,7 +3654,7 @@ class ShortAddressTwoWayRangingMeasurement(Packet): @dataclass class ExtendedAddressTwoWayRangingMeasurement(Packet): mac_address: int = field(kw_only=True, default=0) - status: StatusCode = field(kw_only=True, default=StatusCode.UCI_STATUS_OK) + status: Status = field(kw_only=True, default=Status.OK) nlos: int = field(kw_only=True, default=0) distance: int = field(kw_only=True, default=0) aoa_azimuth: int = field(kw_only=True, default=0) @@ -4443,7 +3678,7 @@ class ExtendedAddressTwoWayRangingMeasurement(Packet): raise Exception('Invalid packet size') value_ = int.from_bytes(span[0:8], byteorder='little') fields['mac_address'] = value_ - fields['status'] = StatusCode.from_int(span[8]) + fields['status'] = Status.from_int(span[8]) fields['nlos'] = span[9] value_ = int.from_bytes(span[10:12], byteorder='little') fields['distance'] = value_ @@ -4530,7 +3765,7 @@ class ExtendedAddressTwoWayRangingMeasurement(Packet): @dataclass class ShortAddressOwrAoaRangingMeasurement(Packet): mac_address: int = field(kw_only=True, default=0) - status: StatusCode = field(kw_only=True, default=StatusCode.UCI_STATUS_OK) + status: Status = field(kw_only=True, default=Status.OK) nlos: int = field(kw_only=True, default=0) frame_sequence_number: int = field(kw_only=True, default=0) block_index: int = field(kw_only=True, default=0) @@ -4549,7 +3784,7 @@ class ShortAddressOwrAoaRangingMeasurement(Packet): raise Exception('Invalid packet size') value_ = int.from_bytes(span[0:2], byteorder='little') fields['mac_address'] = value_ - fields['status'] = StatusCode.from_int(span[2]) + fields['status'] = Status.from_int(span[2]) fields['nlos'] = span[3] fields['frame_sequence_number'] = span[4] value_ = int.from_bytes(span[5:7], byteorder='little') @@ -4607,7 +3842,7 @@ class ShortAddressOwrAoaRangingMeasurement(Packet): @dataclass class ExtendedAddressOwrAoaRangingMeasurement(Packet): mac_address: int = field(kw_only=True, default=0) - status: StatusCode = field(kw_only=True, default=StatusCode.UCI_STATUS_OK) + status: Status = field(kw_only=True, default=Status.OK) nlos: int = field(kw_only=True, default=0) frame_sequence_number: int = field(kw_only=True, default=0) block_index: int = field(kw_only=True, default=0) @@ -4626,7 +3861,7 @@ class ExtendedAddressOwrAoaRangingMeasurement(Packet): raise Exception('Invalid packet size') value_ = int.from_bytes(span[0:8], byteorder='little') fields['mac_address'] = value_ - fields['status'] = StatusCode.from_int(span[8]) + fields['status'] = Status.from_int(span[8]) fields['nlos'] = span[9] fields['frame_sequence_number'] = span[10] value_ = int.from_bytes(span[11:13], byteorder='little') @@ -4696,7 +3931,7 @@ class RangingMeasurementType(enum.IntEnum): @dataclass -class SessionInfoNtf(SessionControlNotification): +class SessionInfoNtf(SessionControlPacket): sequence_number: int = field(kw_only=True, default=0) session_token: int = field(kw_only=True, default=0) rcr_indicator: int = field(kw_only=True, default=0) @@ -4705,13 +3940,13 @@ class SessionInfoNtf(SessionControlNotification): mac_address_indicator: MacAddressIndicator = field(kw_only=True, default=MacAddressIndicator.SHORT_ADDRESS) def __post_init__(self): - self.opcode = 0 - self.gid = GroupId.SESSION_CONTROL self.mt = MessageType.NOTIFICATION + self.oid = SessionControlOpcodeId.START + self.gid = GroupId.SESSION_CONTROL @staticmethod def parse(fields: dict, span: bytes) -> Tuple['SessionInfoNtf', bytes]: - if fields['opcode'] != 0x0 or fields['gid'] != GroupId.SESSION_CONTROL or fields['mt'] != MessageType.NOTIFICATION: + if fields['mt'] != MessageType.NOTIFICATION or fields['oid'] != SessionControlOpcodeId.START or fields['gid'] != GroupId.SESSION_CONTROL: raise Exception("Invalid constraint field values") if len(span) < 24: raise Exception('Invalid packet size') @@ -4778,7 +4013,7 @@ class SessionInfoNtf(SessionControlNotification): _span.append((self.mac_address_indicator << 0)) _span.extend([0] * 8) _span.extend(payload or self.payload or []) - return SessionControlNotification.serialize(self, payload = bytes(_span)) + return SessionControlPacket.serialize(self, payload = bytes(_span)) @property def size(self) -> int: @@ -4792,13 +4027,13 @@ class ShortMacTwoWaySessionInfoNtf(SessionInfoNtf): def __post_init__(self): self.ranging_measurement_type = RangingMeasurementType.TWO_WAY self.mac_address_indicator = MacAddressIndicator.SHORT_ADDRESS - self.opcode = 0 - self.gid = GroupId.SESSION_CONTROL self.mt = MessageType.NOTIFICATION + self.oid = SessionControlOpcodeId.START + self.gid = GroupId.SESSION_CONTROL @staticmethod def parse(fields: dict, span: bytes) -> Tuple['ShortMacTwoWaySessionInfoNtf', bytes]: - if fields['ranging_measurement_type'] != RangingMeasurementType.TWO_WAY or fields['mac_address_indicator'] != MacAddressIndicator.SHORT_ADDRESS or fields['opcode'] != 0x0 or fields['gid'] != GroupId.SESSION_CONTROL or fields['mt'] != MessageType.NOTIFICATION: + if fields['ranging_measurement_type'] != RangingMeasurementType.TWO_WAY or fields['mac_address_indicator'] != MacAddressIndicator.SHORT_ADDRESS or fields['mt'] != MessageType.NOTIFICATION or fields['oid'] != SessionControlOpcodeId.START or fields['gid'] != GroupId.SESSION_CONTROL: raise Exception("Invalid constraint field values") if len(span) < 1: raise Exception('Invalid packet size') @@ -4841,13 +4076,13 @@ class ExtendedMacTwoWaySessionInfoNtf(SessionInfoNtf): def __post_init__(self): self.ranging_measurement_type = RangingMeasurementType.TWO_WAY self.mac_address_indicator = MacAddressIndicator.EXTENDED_ADDRESS - self.opcode = 0 - self.gid = GroupId.SESSION_CONTROL self.mt = MessageType.NOTIFICATION + self.oid = SessionControlOpcodeId.START + self.gid = GroupId.SESSION_CONTROL @staticmethod def parse(fields: dict, span: bytes) -> Tuple['ExtendedMacTwoWaySessionInfoNtf', bytes]: - if fields['ranging_measurement_type'] != RangingMeasurementType.TWO_WAY or fields['mac_address_indicator'] != MacAddressIndicator.EXTENDED_ADDRESS or fields['opcode'] != 0x0 or fields['gid'] != GroupId.SESSION_CONTROL or fields['mt'] != MessageType.NOTIFICATION: + if fields['ranging_measurement_type'] != RangingMeasurementType.TWO_WAY or fields['mac_address_indicator'] != MacAddressIndicator.EXTENDED_ADDRESS or fields['mt'] != MessageType.NOTIFICATION or fields['oid'] != SessionControlOpcodeId.START or fields['gid'] != GroupId.SESSION_CONTROL: raise Exception("Invalid constraint field values") if len(span) < 1: raise Exception('Invalid packet size') @@ -4890,13 +4125,13 @@ class ShortMacDlTDoASessionInfoNtf(SessionInfoNtf): def __post_init__(self): self.ranging_measurement_type = RangingMeasurementType.DL_TDOA self.mac_address_indicator = MacAddressIndicator.SHORT_ADDRESS - self.opcode = 0 - self.gid = GroupId.SESSION_CONTROL self.mt = MessageType.NOTIFICATION + self.oid = SessionControlOpcodeId.START + self.gid = GroupId.SESSION_CONTROL @staticmethod def parse(fields: dict, span: bytes) -> Tuple['ShortMacDlTDoASessionInfoNtf', bytes]: - if fields['ranging_measurement_type'] != RangingMeasurementType.DL_TDOA or fields['mac_address_indicator'] != MacAddressIndicator.SHORT_ADDRESS or fields['opcode'] != 0x0 or fields['gid'] != GroupId.SESSION_CONTROL or fields['mt'] != MessageType.NOTIFICATION: + if fields['ranging_measurement_type'] != RangingMeasurementType.DL_TDOA or fields['mac_address_indicator'] != MacAddressIndicator.SHORT_ADDRESS or fields['mt'] != MessageType.NOTIFICATION or fields['oid'] != SessionControlOpcodeId.START or fields['gid'] != GroupId.SESSION_CONTROL: raise Exception("Invalid constraint field values") if len(span) < 1: raise Exception('Invalid packet size') @@ -4927,13 +4162,13 @@ class ExtendedMacDlTDoASessionInfoNtf(SessionInfoNtf): def __post_init__(self): self.ranging_measurement_type = RangingMeasurementType.DL_TDOA self.mac_address_indicator = MacAddressIndicator.EXTENDED_ADDRESS - self.opcode = 0 - self.gid = GroupId.SESSION_CONTROL self.mt = MessageType.NOTIFICATION + self.oid = SessionControlOpcodeId.START + self.gid = GroupId.SESSION_CONTROL @staticmethod def parse(fields: dict, span: bytes) -> Tuple['ExtendedMacDlTDoASessionInfoNtf', bytes]: - if fields['ranging_measurement_type'] != RangingMeasurementType.DL_TDOA or fields['mac_address_indicator'] != MacAddressIndicator.EXTENDED_ADDRESS or fields['opcode'] != 0x0 or fields['gid'] != GroupId.SESSION_CONTROL or fields['mt'] != MessageType.NOTIFICATION: + if fields['ranging_measurement_type'] != RangingMeasurementType.DL_TDOA or fields['mac_address_indicator'] != MacAddressIndicator.EXTENDED_ADDRESS or fields['mt'] != MessageType.NOTIFICATION or fields['oid'] != SessionControlOpcodeId.START or fields['gid'] != GroupId.SESSION_CONTROL: raise Exception("Invalid constraint field values") if len(span) < 1: raise Exception('Invalid packet size') @@ -4964,13 +4199,13 @@ class ShortMacOwrAoaSessionInfoNtf(SessionInfoNtf): def __post_init__(self): self.ranging_measurement_type = RangingMeasurementType.OWR_AOA self.mac_address_indicator = MacAddressIndicator.SHORT_ADDRESS - self.opcode = 0 - self.gid = GroupId.SESSION_CONTROL self.mt = MessageType.NOTIFICATION + self.oid = SessionControlOpcodeId.START + self.gid = GroupId.SESSION_CONTROL @staticmethod def parse(fields: dict, span: bytes) -> Tuple['ShortMacOwrAoaSessionInfoNtf', bytes]: - if fields['ranging_measurement_type'] != RangingMeasurementType.OWR_AOA or fields['mac_address_indicator'] != MacAddressIndicator.SHORT_ADDRESS or fields['opcode'] != 0x0 or fields['gid'] != GroupId.SESSION_CONTROL or fields['mt'] != MessageType.NOTIFICATION: + if fields['ranging_measurement_type'] != RangingMeasurementType.OWR_AOA or fields['mac_address_indicator'] != MacAddressIndicator.SHORT_ADDRESS or fields['mt'] != MessageType.NOTIFICATION or fields['oid'] != SessionControlOpcodeId.START or fields['gid'] != GroupId.SESSION_CONTROL: raise Exception("Invalid constraint field values") if len(span) < 1: raise Exception('Invalid packet size') @@ -5013,13 +4248,13 @@ class ExtendedMacOwrAoaSessionInfoNtf(SessionInfoNtf): def __post_init__(self): self.ranging_measurement_type = RangingMeasurementType.OWR_AOA self.mac_address_indicator = MacAddressIndicator.EXTENDED_ADDRESS - self.opcode = 0 - self.gid = GroupId.SESSION_CONTROL self.mt = MessageType.NOTIFICATION + self.oid = SessionControlOpcodeId.START + self.gid = GroupId.SESSION_CONTROL @staticmethod def parse(fields: dict, span: bytes) -> Tuple['ExtendedMacOwrAoaSessionInfoNtf', bytes]: - if fields['ranging_measurement_type'] != RangingMeasurementType.OWR_AOA or fields['mac_address_indicator'] != MacAddressIndicator.EXTENDED_ADDRESS or fields['opcode'] != 0x0 or fields['gid'] != GroupId.SESSION_CONTROL or fields['mt'] != MessageType.NOTIFICATION: + if fields['ranging_measurement_type'] != RangingMeasurementType.OWR_AOA or fields['mac_address_indicator'] != MacAddressIndicator.EXTENDED_ADDRESS or fields['mt'] != MessageType.NOTIFICATION or fields['oid'] != SessionControlOpcodeId.START or fields['gid'] != GroupId.SESSION_CONTROL: raise Exception("Invalid constraint field values") if len(span) < 1: raise Exception('Invalid packet size') @@ -5055,96 +4290,114 @@ class ExtendedMacOwrAoaSessionInfoNtf(SessionInfoNtf): ) @dataclass -class SessionStopCmd(SessionControlCommand): - +class SessionStopCmd(SessionControlPacket): + session_id: int = field(kw_only=True, default=0) def __post_init__(self): - self.opcode = 1 - self.gid = GroupId.SESSION_CONTROL self.mt = MessageType.COMMAND + self.oid = SessionControlOpcodeId.STOP + self.gid = GroupId.SESSION_CONTROL @staticmethod def parse(fields: dict, span: bytes) -> Tuple['SessionStopCmd', bytes]: - if fields['opcode'] != 0x1 or fields['gid'] != GroupId.SESSION_CONTROL or fields['mt'] != MessageType.COMMAND: + if fields['mt'] != MessageType.COMMAND or fields['oid'] != SessionControlOpcodeId.STOP or fields['gid'] != GroupId.SESSION_CONTROL: raise Exception("Invalid constraint field values") + if len(span) < 4: + raise Exception('Invalid packet size') + value_ = int.from_bytes(span[0:4], byteorder='little') + fields['session_id'] = value_ + span = span[4:] return SessionStopCmd(**fields), span def serialize(self, payload: bytes = None) -> bytes: _span = bytearray() - return SessionControlCommand.serialize(self, payload = bytes(_span)) + if self.session_id > 4294967295: + print(f"Invalid value for field SessionStopCmd::session_id: {self.session_id} > 4294967295; the value will be truncated") + self.session_id &= 4294967295 + _span.extend(int.to_bytes((self.session_id << 0), length=4, byteorder='little')) + return SessionControlPacket.serialize(self, payload = bytes(_span)) @property def size(self) -> int: - return 0 + return 4 @dataclass -class SessionStopRsp(SessionControlResponse): - status: StatusCode = field(kw_only=True, default=StatusCode.UCI_STATUS_OK) +class SessionStopRsp(SessionControlPacket): + status: Status = field(kw_only=True, default=Status.OK) def __post_init__(self): - self.opcode = 1 - self.gid = GroupId.SESSION_CONTROL self.mt = MessageType.RESPONSE + self.oid = SessionControlOpcodeId.STOP + self.gid = GroupId.SESSION_CONTROL @staticmethod def parse(fields: dict, span: bytes) -> Tuple['SessionStopRsp', bytes]: - if fields['opcode'] != 0x1 or fields['gid'] != GroupId.SESSION_CONTROL or fields['mt'] != MessageType.RESPONSE: + if fields['mt'] != MessageType.RESPONSE or fields['oid'] != SessionControlOpcodeId.STOP or fields['gid'] != GroupId.SESSION_CONTROL: raise Exception("Invalid constraint field values") if len(span) < 1: raise Exception('Invalid packet size') - fields['status'] = StatusCode.from_int(span[0]) + fields['status'] = Status.from_int(span[0]) span = span[1:] return SessionStopRsp(**fields), span def serialize(self, payload: bytes = None) -> bytes: _span = bytearray() _span.append((self.status << 0)) - return SessionControlResponse.serialize(self, payload = bytes(_span)) + return SessionControlPacket.serialize(self, payload = bytes(_span)) @property def size(self) -> int: return 1 @dataclass -class SessionGetRangingCountCmd(SessionControlCommand): - +class SessionGetRangingCountCmd(SessionControlPacket): + session_id: int = field(kw_only=True, default=0) def __post_init__(self): - self.opcode = 3 - self.gid = GroupId.SESSION_CONTROL self.mt = MessageType.COMMAND + self.oid = SessionControlOpcodeId.GET_RANGING_COUNT + self.gid = GroupId.SESSION_CONTROL @staticmethod def parse(fields: dict, span: bytes) -> Tuple['SessionGetRangingCountCmd', bytes]: - if fields['opcode'] != 0x3 or fields['gid'] != GroupId.SESSION_CONTROL or fields['mt'] != MessageType.COMMAND: + if fields['mt'] != MessageType.COMMAND or fields['oid'] != SessionControlOpcodeId.GET_RANGING_COUNT or fields['gid'] != GroupId.SESSION_CONTROL: raise Exception("Invalid constraint field values") + if len(span) < 4: + raise Exception('Invalid packet size') + value_ = int.from_bytes(span[0:4], byteorder='little') + fields['session_id'] = value_ + span = span[4:] return SessionGetRangingCountCmd(**fields), span def serialize(self, payload: bytes = None) -> bytes: _span = bytearray() - return SessionControlCommand.serialize(self, payload = bytes(_span)) + if self.session_id > 4294967295: + print(f"Invalid value for field SessionGetRangingCountCmd::session_id: {self.session_id} > 4294967295; the value will be truncated") + self.session_id &= 4294967295 + _span.extend(int.to_bytes((self.session_id << 0), length=4, byteorder='little')) + return SessionControlPacket.serialize(self, payload = bytes(_span)) @property def size(self) -> int: - return 0 + return 4 @dataclass -class SessionGetRangingCountRsp(SessionControlResponse): - status: StatusCode = field(kw_only=True, default=StatusCode.UCI_STATUS_OK) +class SessionGetRangingCountRsp(SessionControlPacket): + status: Status = field(kw_only=True, default=Status.OK) count: int = field(kw_only=True, default=0) def __post_init__(self): - self.opcode = 3 - self.gid = GroupId.SESSION_CONTROL self.mt = MessageType.RESPONSE + self.oid = SessionControlOpcodeId.GET_RANGING_COUNT + self.gid = GroupId.SESSION_CONTROL @staticmethod def parse(fields: dict, span: bytes) -> Tuple['SessionGetRangingCountRsp', bytes]: - if fields['opcode'] != 0x3 or fields['gid'] != GroupId.SESSION_CONTROL or fields['mt'] != MessageType.RESPONSE: + if fields['mt'] != MessageType.RESPONSE or fields['oid'] != SessionControlOpcodeId.GET_RANGING_COUNT or fields['gid'] != GroupId.SESSION_CONTROL: raise Exception("Invalid constraint field values") if len(span) < 5: raise Exception('Invalid packet size') - fields['status'] = StatusCode.from_int(span[0]) + fields['status'] = Status.from_int(span[0]) value_ = int.from_bytes(span[1:5], byteorder='little') fields['count'] = value_ span = span[5:] @@ -5157,30 +4410,30 @@ class SessionGetRangingCountRsp(SessionControlResponse): print(f"Invalid value for field SessionGetRangingCountRsp::count: {self.count} > 4294967295; the value will be truncated") self.count &= 4294967295 _span.extend(int.to_bytes((self.count << 0), length=4, byteorder='little')) - return SessionControlResponse.serialize(self, payload = bytes(_span)) + return SessionControlPacket.serialize(self, payload = bytes(_span)) @property def size(self) -> int: return 5 @dataclass -class AndroidGetPowerStatsCmd(AndroidCommand): +class AndroidGetPowerStatsCmd(AndroidPacket): def __post_init__(self): - self.opcode = 0 - self.gid = GroupId.VENDOR_ANDROID self.mt = MessageType.COMMAND + self.oid = AndroidOpcodeId.GET_POWER_STATS + self.gid = GroupId.VENDOR_ANDROID @staticmethod def parse(fields: dict, span: bytes) -> Tuple['AndroidGetPowerStatsCmd', bytes]: - if fields['opcode'] != 0x0 or fields['gid'] != GroupId.VENDOR_ANDROID or fields['mt'] != MessageType.COMMAND: + if fields['mt'] != MessageType.COMMAND or fields['oid'] != AndroidOpcodeId.GET_POWER_STATS or fields['gid'] != GroupId.VENDOR_ANDROID: raise Exception("Invalid constraint field values") return AndroidGetPowerStatsCmd(**fields), span def serialize(self, payload: bytes = None) -> bytes: _span = bytearray() - return AndroidCommand.serialize(self, payload = bytes(_span)) + return AndroidPacket.serialize(self, payload = bytes(_span)) @property def size(self) -> int: @@ -5188,7 +4441,7 @@ class AndroidGetPowerStatsCmd(AndroidCommand): @dataclass class PowerStats(Packet): - status: StatusCode = field(kw_only=True, default=StatusCode.UCI_STATUS_OK) + status: Status = field(kw_only=True, default=Status.OK) idle_time_ms: int = field(kw_only=True, default=0) tx_time_ms: int = field(kw_only=True, default=0) rx_time_ms: int = field(kw_only=True, default=0) @@ -5202,7 +4455,7 @@ class PowerStats(Packet): fields = {'payload': None} if len(span) < 17: raise Exception('Invalid packet size') - fields['status'] = StatusCode.from_int(span[0]) + fields['status'] = Status.from_int(span[0]) value_ = int.from_bytes(span[1:5], byteorder='little') fields['idle_time_ms'] = value_ value_ = int.from_bytes(span[5:9], byteorder='little') @@ -5240,17 +4493,17 @@ class PowerStats(Packet): return 17 @dataclass -class AndroidGetPowerStatsRsp(AndroidResponse): +class AndroidGetPowerStatsRsp(AndroidPacket): stats: PowerStats = field(kw_only=True, default_factory=PowerStats) def __post_init__(self): - self.opcode = 0 - self.gid = GroupId.VENDOR_ANDROID self.mt = MessageType.RESPONSE + self.oid = AndroidOpcodeId.GET_POWER_STATS + self.gid = GroupId.VENDOR_ANDROID @staticmethod def parse(fields: dict, span: bytes) -> Tuple['AndroidGetPowerStatsRsp', bytes]: - if fields['opcode'] != 0x0 or fields['gid'] != GroupId.VENDOR_ANDROID or fields['mt'] != MessageType.RESPONSE: + if fields['mt'] != MessageType.RESPONSE or fields['oid'] != AndroidOpcodeId.GET_POWER_STATS or fields['gid'] != GroupId.VENDOR_ANDROID: raise Exception("Invalid constraint field values") if len(span) < 17: raise Exception('Invalid packet size') @@ -5261,24 +4514,24 @@ class AndroidGetPowerStatsRsp(AndroidResponse): def serialize(self, payload: bytes = None) -> bytes: _span = bytearray() _span.extend(self.stats.serialize()) - return AndroidResponse.serialize(self, payload = bytes(_span)) + return AndroidPacket.serialize(self, payload = bytes(_span)) @property def size(self) -> int: return 17 @dataclass -class AndroidSetCountryCodeCmd(AndroidCommand): +class AndroidSetCountryCodeCmd(AndroidPacket): country_code: bytearray = field(kw_only=True, default_factory=bytearray) def __post_init__(self): - self.opcode = 1 - self.gid = GroupId.VENDOR_ANDROID self.mt = MessageType.COMMAND + self.oid = AndroidOpcodeId.SET_COUNTRY_CODE + self.gid = GroupId.VENDOR_ANDROID @staticmethod def parse(fields: dict, span: bytes) -> Tuple['AndroidSetCountryCodeCmd', bytes]: - if fields['opcode'] != 0x1 or fields['gid'] != GroupId.VENDOR_ANDROID or fields['mt'] != MessageType.COMMAND: + if fields['mt'] != MessageType.COMMAND or fields['oid'] != AndroidOpcodeId.SET_COUNTRY_CODE or fields['gid'] != GroupId.VENDOR_ANDROID: raise Exception("Invalid constraint field values") if len(span) < 2: raise Exception('Invalid packet size') @@ -5289,35 +4542,35 @@ class AndroidSetCountryCodeCmd(AndroidCommand): def serialize(self, payload: bytes = None) -> bytes: _span = bytearray() _span.extend(self.country_code) - return AndroidCommand.serialize(self, payload = bytes(_span)) + return AndroidPacket.serialize(self, payload = bytes(_span)) @property def size(self) -> int: return 2 @dataclass -class AndroidSetCountryCodeRsp(AndroidResponse): - status: StatusCode = field(kw_only=True, default=StatusCode.UCI_STATUS_OK) +class AndroidSetCountryCodeRsp(AndroidPacket): + status: Status = field(kw_only=True, default=Status.OK) def __post_init__(self): - self.opcode = 1 - self.gid = GroupId.VENDOR_ANDROID self.mt = MessageType.RESPONSE + self.oid = AndroidOpcodeId.SET_COUNTRY_CODE + self.gid = GroupId.VENDOR_ANDROID @staticmethod def parse(fields: dict, span: bytes) -> Tuple['AndroidSetCountryCodeRsp', bytes]: - if fields['opcode'] != 0x1 or fields['gid'] != GroupId.VENDOR_ANDROID or fields['mt'] != MessageType.RESPONSE: + if fields['mt'] != MessageType.RESPONSE or fields['oid'] != AndroidOpcodeId.SET_COUNTRY_CODE or fields['gid'] != GroupId.VENDOR_ANDROID: raise Exception("Invalid constraint field values") if len(span) < 1: raise Exception('Invalid packet size') - fields['status'] = StatusCode.from_int(span[0]) + fields['status'] = Status.from_int(span[0]) span = span[1:] return AndroidSetCountryCodeRsp(**fields), span def serialize(self, payload: bytes = None) -> bytes: _span = bytearray() _span.append((self.status << 0)) - return AndroidResponse.serialize(self, payload = bytes(_span)) + return AndroidPacket.serialize(self, payload = bytes(_span)) @property def size(self) -> int: @@ -5705,19 +4958,19 @@ class FrameReport(Packet): return sum([elt.size for elt in self.frame_report_tlvs]) + 4 @dataclass -class AndroidRangeDiagnosticsNtf(AndroidNotification): +class AndroidRangeDiagnosticsNtf(AndroidPacket): session_token: int = field(kw_only=True, default=0) sequence_number: int = field(kw_only=True, default=0) frame_reports: List[FrameReport] = field(kw_only=True, default_factory=list) def __post_init__(self): - self.opcode = 2 - self.gid = GroupId.VENDOR_ANDROID self.mt = MessageType.NOTIFICATION + self.oid = AndroidOpcodeId.FIRA_RANGE_DIAGNOSTICS + self.gid = GroupId.VENDOR_ANDROID @staticmethod def parse(fields: dict, span: bytes) -> Tuple['AndroidRangeDiagnosticsNtf', bytes]: - if fields['opcode'] != 0x2 or fields['gid'] != GroupId.VENDOR_ANDROID or fields['mt'] != MessageType.NOTIFICATION: + if fields['mt'] != MessageType.NOTIFICATION or fields['oid'] != AndroidOpcodeId.FIRA_RANGE_DIAGNOSTICS or fields['gid'] != GroupId.VENDOR_ANDROID: raise Exception("Invalid constraint field values") if len(span) < 9: raise Exception('Invalid packet size') @@ -5750,424 +5003,8 @@ class AndroidRangeDiagnosticsNtf(AndroidNotification): _span.append((len(self.frame_reports) << 0)) for _elt in self.frame_reports: _span.extend(_elt.serialize()) - return AndroidNotification.serialize(self, payload = bytes(_span)) + return AndroidPacket.serialize(self, payload = bytes(_span)) @property def size(self) -> int: return sum([elt.size for elt in self.frame_reports]) + 9 - -@dataclass -class UciVendor_9_Command(UciCommand): - - - def __post_init__(self): - self.gid = GroupId.VENDOR_RESERVED_9 - self.mt = MessageType.COMMAND - - @staticmethod - def parse(fields: dict, span: bytes) -> Tuple['UciVendor_9_Command', bytes]: - if fields['gid'] != GroupId.VENDOR_RESERVED_9 or fields['mt'] != MessageType.COMMAND: - raise Exception("Invalid constraint field values") - payload = span - span = bytes([]) - fields['payload'] = payload - return UciVendor_9_Command(**fields), span - - def serialize(self, payload: bytes = None) -> bytes: - _span = bytearray() - _span.extend(payload or self.payload or []) - return UciCommand.serialize(self, payload = bytes(_span)) - - @property - def size(self) -> int: - return len(self.payload) - -@dataclass -class UciVendor_A_Command(UciCommand): - - - def __post_init__(self): - self.gid = GroupId.VENDOR_RESERVED_A - self.mt = MessageType.COMMAND - - @staticmethod - def parse(fields: dict, span: bytes) -> Tuple['UciVendor_A_Command', bytes]: - if fields['gid'] != GroupId.VENDOR_RESERVED_A or fields['mt'] != MessageType.COMMAND: - raise Exception("Invalid constraint field values") - payload = span - span = bytes([]) - fields['payload'] = payload - return UciVendor_A_Command(**fields), span - - def serialize(self, payload: bytes = None) -> bytes: - _span = bytearray() - _span.extend(payload or self.payload or []) - return UciCommand.serialize(self, payload = bytes(_span)) - - @property - def size(self) -> int: - return len(self.payload) - -@dataclass -class UciVendor_B_Command(UciCommand): - - - def __post_init__(self): - self.gid = GroupId.VENDOR_RESERVED_B - self.mt = MessageType.COMMAND - - @staticmethod - def parse(fields: dict, span: bytes) -> Tuple['UciVendor_B_Command', bytes]: - if fields['gid'] != GroupId.VENDOR_RESERVED_B or fields['mt'] != MessageType.COMMAND: - raise Exception("Invalid constraint field values") - payload = span - span = bytes([]) - fields['payload'] = payload - return UciVendor_B_Command(**fields), span - - def serialize(self, payload: bytes = None) -> bytes: - _span = bytearray() - _span.extend(payload or self.payload or []) - return UciCommand.serialize(self, payload = bytes(_span)) - - @property - def size(self) -> int: - return len(self.payload) - -@dataclass -class UciVendor_E_Command(UciCommand): - - - def __post_init__(self): - self.gid = GroupId.VENDOR_RESERVED_E - self.mt = MessageType.COMMAND - - @staticmethod - def parse(fields: dict, span: bytes) -> Tuple['UciVendor_E_Command', bytes]: - if fields['gid'] != GroupId.VENDOR_RESERVED_E or fields['mt'] != MessageType.COMMAND: - raise Exception("Invalid constraint field values") - payload = span - span = bytes([]) - fields['payload'] = payload - return UciVendor_E_Command(**fields), span - - def serialize(self, payload: bytes = None) -> bytes: - _span = bytearray() - _span.extend(payload or self.payload or []) - return UciCommand.serialize(self, payload = bytes(_span)) - - @property - def size(self) -> int: - return len(self.payload) - -@dataclass -class UciVendor_F_Command(UciCommand): - - - def __post_init__(self): - self.gid = GroupId.VENDOR_RESERVED_F - self.mt = MessageType.COMMAND - - @staticmethod - def parse(fields: dict, span: bytes) -> Tuple['UciVendor_F_Command', bytes]: - if fields['gid'] != GroupId.VENDOR_RESERVED_F or fields['mt'] != MessageType.COMMAND: - raise Exception("Invalid constraint field values") - payload = span - span = bytes([]) - fields['payload'] = payload - return UciVendor_F_Command(**fields), span - - def serialize(self, payload: bytes = None) -> bytes: - _span = bytearray() - _span.extend(payload or self.payload or []) - return UciCommand.serialize(self, payload = bytes(_span)) - - @property - def size(self) -> int: - return len(self.payload) - -@dataclass -class UciVendor_9_Response(UciResponse): - - - def __post_init__(self): - self.gid = GroupId.VENDOR_RESERVED_9 - self.mt = MessageType.RESPONSE - - @staticmethod - def parse(fields: dict, span: bytes) -> Tuple['UciVendor_9_Response', bytes]: - if fields['gid'] != GroupId.VENDOR_RESERVED_9 or fields['mt'] != MessageType.RESPONSE: - raise Exception("Invalid constraint field values") - payload = span - span = bytes([]) - fields['payload'] = payload - return UciVendor_9_Response(**fields), span - - def serialize(self, payload: bytes = None) -> bytes: - _span = bytearray() - _span.extend(payload or self.payload or []) - return UciResponse.serialize(self, payload = bytes(_span)) - - @property - def size(self) -> int: - return len(self.payload) - -@dataclass -class UciVendor_A_Response(UciResponse): - - - def __post_init__(self): - self.gid = GroupId.VENDOR_RESERVED_A - self.mt = MessageType.RESPONSE - - @staticmethod - def parse(fields: dict, span: bytes) -> Tuple['UciVendor_A_Response', bytes]: - if fields['gid'] != GroupId.VENDOR_RESERVED_A or fields['mt'] != MessageType.RESPONSE: - raise Exception("Invalid constraint field values") - payload = span - span = bytes([]) - fields['payload'] = payload - return UciVendor_A_Response(**fields), span - - def serialize(self, payload: bytes = None) -> bytes: - _span = bytearray() - _span.extend(payload or self.payload or []) - return UciResponse.serialize(self, payload = bytes(_span)) - - @property - def size(self) -> int: - return len(self.payload) - -@dataclass -class UciVendor_B_Response(UciResponse): - - - def __post_init__(self): - self.gid = GroupId.VENDOR_RESERVED_B - self.mt = MessageType.RESPONSE - - @staticmethod - def parse(fields: dict, span: bytes) -> Tuple['UciVendor_B_Response', bytes]: - if fields['gid'] != GroupId.VENDOR_RESERVED_B or fields['mt'] != MessageType.RESPONSE: - raise Exception("Invalid constraint field values") - payload = span - span = bytes([]) - fields['payload'] = payload - return UciVendor_B_Response(**fields), span - - def serialize(self, payload: bytes = None) -> bytes: - _span = bytearray() - _span.extend(payload or self.payload or []) - return UciResponse.serialize(self, payload = bytes(_span)) - - @property - def size(self) -> int: - return len(self.payload) - -@dataclass -class UciVendor_E_Response(UciResponse): - - - def __post_init__(self): - self.gid = GroupId.VENDOR_RESERVED_E - self.mt = MessageType.RESPONSE - - @staticmethod - def parse(fields: dict, span: bytes) -> Tuple['UciVendor_E_Response', bytes]: - if fields['gid'] != GroupId.VENDOR_RESERVED_E or fields['mt'] != MessageType.RESPONSE: - raise Exception("Invalid constraint field values") - payload = span - span = bytes([]) - fields['payload'] = payload - return UciVendor_E_Response(**fields), span - - def serialize(self, payload: bytes = None) -> bytes: - _span = bytearray() - _span.extend(payload or self.payload or []) - return UciResponse.serialize(self, payload = bytes(_span)) - - @property - def size(self) -> int: - return len(self.payload) - -@dataclass -class UciVendor_F_Response(UciResponse): - - - def __post_init__(self): - self.gid = GroupId.VENDOR_RESERVED_F - self.mt = MessageType.RESPONSE - - @staticmethod - def parse(fields: dict, span: bytes) -> Tuple['UciVendor_F_Response', bytes]: - if fields['gid'] != GroupId.VENDOR_RESERVED_F or fields['mt'] != MessageType.RESPONSE: - raise Exception("Invalid constraint field values") - payload = span - span = bytes([]) - fields['payload'] = payload - return UciVendor_F_Response(**fields), span - - def serialize(self, payload: bytes = None) -> bytes: - _span = bytearray() - _span.extend(payload or self.payload or []) - return UciResponse.serialize(self, payload = bytes(_span)) - - @property - def size(self) -> int: - return len(self.payload) - -@dataclass -class UciVendor_9_Notification(UciNotification): - - - def __post_init__(self): - self.gid = GroupId.VENDOR_RESERVED_9 - self.mt = MessageType.NOTIFICATION - - @staticmethod - def parse(fields: dict, span: bytes) -> Tuple['UciVendor_9_Notification', bytes]: - if fields['gid'] != GroupId.VENDOR_RESERVED_9 or fields['mt'] != MessageType.NOTIFICATION: - raise Exception("Invalid constraint field values") - payload = span - span = bytes([]) - fields['payload'] = payload - return UciVendor_9_Notification(**fields), span - - def serialize(self, payload: bytes = None) -> bytes: - _span = bytearray() - _span.extend(payload or self.payload or []) - return UciNotification.serialize(self, payload = bytes(_span)) - - @property - def size(self) -> int: - return len(self.payload) - -@dataclass -class UciVendor_A_Notification(UciNotification): - - - def __post_init__(self): - self.gid = GroupId.VENDOR_RESERVED_A - self.mt = MessageType.NOTIFICATION - - @staticmethod - def parse(fields: dict, span: bytes) -> Tuple['UciVendor_A_Notification', bytes]: - if fields['gid'] != GroupId.VENDOR_RESERVED_A or fields['mt'] != MessageType.NOTIFICATION: - raise Exception("Invalid constraint field values") - payload = span - span = bytes([]) - fields['payload'] = payload - return UciVendor_A_Notification(**fields), span - - def serialize(self, payload: bytes = None) -> bytes: - _span = bytearray() - _span.extend(payload or self.payload or []) - return UciNotification.serialize(self, payload = bytes(_span)) - - @property - def size(self) -> int: - return len(self.payload) - -@dataclass -class UciVendor_B_Notification(UciNotification): - - - def __post_init__(self): - self.gid = GroupId.VENDOR_RESERVED_B - self.mt = MessageType.NOTIFICATION - - @staticmethod - def parse(fields: dict, span: bytes) -> Tuple['UciVendor_B_Notification', bytes]: - if fields['gid'] != GroupId.VENDOR_RESERVED_B or fields['mt'] != MessageType.NOTIFICATION: - raise Exception("Invalid constraint field values") - payload = span - span = bytes([]) - fields['payload'] = payload - return UciVendor_B_Notification(**fields), span - - def serialize(self, payload: bytes = None) -> bytes: - _span = bytearray() - _span.extend(payload or self.payload or []) - return UciNotification.serialize(self, payload = bytes(_span)) - - @property - def size(self) -> int: - return len(self.payload) - -@dataclass -class UciVendor_E_Notification(UciNotification): - - - def __post_init__(self): - self.gid = GroupId.VENDOR_RESERVED_E - self.mt = MessageType.NOTIFICATION - - @staticmethod - def parse(fields: dict, span: bytes) -> Tuple['UciVendor_E_Notification', bytes]: - if fields['gid'] != GroupId.VENDOR_RESERVED_E or fields['mt'] != MessageType.NOTIFICATION: - raise Exception("Invalid constraint field values") - payload = span - span = bytes([]) - fields['payload'] = payload - return UciVendor_E_Notification(**fields), span - - def serialize(self, payload: bytes = None) -> bytes: - _span = bytearray() - _span.extend(payload or self.payload or []) - return UciNotification.serialize(self, payload = bytes(_span)) - - @property - def size(self) -> int: - return len(self.payload) - -@dataclass -class UciVendor_F_Notification(UciNotification): - - - def __post_init__(self): - self.gid = GroupId.VENDOR_RESERVED_F - self.mt = MessageType.NOTIFICATION - - @staticmethod - def parse(fields: dict, span: bytes) -> Tuple['UciVendor_F_Notification', bytes]: - if fields['gid'] != GroupId.VENDOR_RESERVED_F or fields['mt'] != MessageType.NOTIFICATION: - raise Exception("Invalid constraint field values") - payload = span - span = bytes([]) - fields['payload'] = payload - return UciVendor_F_Notification(**fields), span - - def serialize(self, payload: bytes = None) -> bytes: - _span = bytearray() - _span.extend(payload or self.payload or []) - return UciNotification.serialize(self, payload = bytes(_span)) - - @property - def size(self) -> int: - return len(self.payload) - -@dataclass -class TestNotification(UciNotification): - - - def __post_init__(self): - self.gid = GroupId.TEST - self.mt = MessageType.NOTIFICATION - - @staticmethod - def parse(fields: dict, span: bytes) -> Tuple['TestNotification', bytes]: - if fields['gid'] != GroupId.TEST or fields['mt'] != MessageType.NOTIFICATION: - raise Exception("Invalid constraint field values") - payload = span - span = bytes([]) - fields['payload'] = payload - return TestNotification(**fields), span - - def serialize(self, payload: bytes = None) -> bytes: - _span = bytearray() - _span.extend(payload or self.payload or []) - return UciNotification.serialize(self, payload = bytes(_span)) - - @property - def size(self) -> int: - return len(self.payload) diff --git a/src/device.rs b/src/device.rs index 449ec7b..e0fcf61 100644 --- a/src/device.rs +++ b/src/device.rs @@ -17,7 +17,6 @@ use crate::MacAddress; use crate::PicaCommand; use std::collections::HashMap; -use std::iter::Extend; use std::time::Duration; use tokio::sync::mpsc; @@ -77,15 +76,36 @@ pub const DEFAULT_CAPS_INFO: &[(CapTlvType, &[u8])] = &[ ), ]; +/// [UCI] 8.2 Device Configuration Parameters +pub struct DeviceConfig { + device_state: DeviceState, + // This config is used to enable/disable the low power mode. + // 0x00 = Disable low power mode + // 0x01 = Enable low power mode (default) + low_power_mode: bool, +} + +// [UCI] 6.3.1 Setting the Configuration +// All device configuration parameters within the UWBS are set to +// default values at Table 44 [..]. +impl Default for DeviceConfig { + fn default() -> Self { + DeviceConfig { + device_state: DeviceState::DeviceStateError, + low_power_mode: true, + } + } +} + pub struct Device { pub handle: usize, pub mac_address: MacAddress, + config: DeviceConfig, /// [UCI] 5. UWBS Device State Machine state: DeviceState, sessions: HashMap<u32, Session>, pub tx: mpsc::UnboundedSender<UciPacket>, pica_tx: mpsc::Sender<PicaCommand>, - config: HashMap<DeviceConfigId, Vec<u8>>, country_code: [u8; 2], pub n_active_sessions: usize, @@ -101,11 +121,11 @@ impl Device { Device { handle, mac_address, + config: Default::default(), state: DeviceState::DeviceStateError, // Will be overwitten sessions: Default::default(), tx, pica_tx, - config: HashMap::new(), country_code: Default::default(), n_active_sessions: 0, } @@ -236,26 +256,41 @@ impl Device { log::debug!("[{}] SetConfig", self.handle); assert_eq!(self.state, DeviceState::DeviceStateReady); // UCI 6.3 - let (valid_parameters, invalid_config_status) = cmd.get_tlvs().iter().fold( - (HashMap::new(), Vec::new()), - |(mut valid_parameters, invalid_config_status), param| { - // TODO: DeviceState is a read only parameter - valid_parameters.insert(param.cfg_id, param.v.clone()); - - (valid_parameters, invalid_config_status) - }, - ); - - let (status, parameters) = if invalid_config_status.is_empty() { - self.config.extend(valid_parameters); - (uci::Status::Ok, Vec::new()) - } else { - (uci::Status::InvalidParam, invalid_config_status) - }; + // [UCI] 6.3.1 Setting the Configuration + // The UWBS shall respond with CORE_SET_CONFIG_RSP setting the Status + // field of STATUS_INVALID_PARAM and including one or more invalid + // Parameter ID(s) If the Host tries to set a parameter that is not + // available in the UWBS. All other configuration parameters should + // have been set to the new values within the UWBS. + let mut invalid_parameters = vec![]; + for parameter in cmd.get_parameters() { + match parameter.id { + uci::ConfigParameterId::DeviceState => { + invalid_parameters.push(uci::ConfigParameterStatus { + id: parameter.id, + status: uci::Status::ReadOnly, + }) + } + uci::ConfigParameterId::LowPowerMode => { + self.config.low_power_mode = parameter.value.first().copied().unwrap_or(1) != 0; + } + uci::ConfigParameterId::Rfu(id) => { + log::warn!("unknown config parameter id 0x{:02x}", *id); + invalid_parameters.push(uci::ConfigParameterStatus { + id: parameter.id, + status: uci::Status::InvalidParam, + }) + } + } + } CoreSetConfigRspBuilder { - cfg_status: parameters, - status, + status: if invalid_parameters.is_empty() { + uci::Status::Ok + } else { + uci::Status::InvalidParam + }, + parameters: invalid_parameters, } .build() } @@ -263,45 +298,45 @@ impl Device { pub fn core_get_config(&self, cmd: CoreGetConfigCmd) -> CoreGetConfigRsp { log::debug!("[{}] GetConfig", self.handle); - // TODO: do this config shall be set on device reset - let ids = cmd.get_cfg_id(); - let (valid_parameters, invalid_parameters) = ids.iter().fold( - (Vec::new(), Vec::new()), - |(mut valid_parameters, mut invalid_parameters), id| { - // UCI Core Section 6.3.2 Table 8 - // UCI Core Section 6.3.2 - Return the Configuration - // If the status code is ok, return the params - // If there is at least one invalid param, return the list of invalid params - // If the ID is not present in our config, return the Type with length = 0 - match DeviceConfigId::try_from(*id) { - Ok(cfg_id) => match self.config.get(&cfg_id) { - Some(value) => valid_parameters.push(DeviceConfigTlv { - cfg_id, - v: value.clone(), - }), - None => invalid_parameters.push(DeviceConfigTlv { - cfg_id, - v: Vec::new(), - }), - }, - Err(_) => log::error!("Failed to parse config id: {:?}", id), - } - - (valid_parameters, invalid_parameters) - }, - ); + // [UCI] 6.3.2 Retrieve the Configuration + // If the Host tries to retrieve any Parameter(s) that are not available + // in the UWBS, the UWBS shall respond with a CORE_GET_CONFIG_RSP with + // a Status field of STATUS_INVALID_PARAM, containing each unavailable + // Device Configuration Parameter Type with Length field is zero. In + // this case, the CORE_GET_CONFIG_RSP shall not include any parameter(s) + // that are available in the UWBS. + let mut valid_parameters = vec![]; + let mut invalid_parameters = vec![]; + for id in cmd.get_parameter_ids() { + match id { + ConfigParameterId::DeviceState => valid_parameters.push(ConfigParameter { + id: *id, + value: vec![self.config.device_state.into()], + }), + ConfigParameterId::LowPowerMode => valid_parameters.push(ConfigParameter { + id: *id, + value: vec![self.config.low_power_mode.into()], + }), + ConfigParameterId::Rfu(_) => invalid_parameters.push(ConfigParameter { + id: *id, + value: vec![], + }), + } + } - let (status, parameters) = if invalid_parameters.is_empty() { - (uci::Status::Ok, valid_parameters) + if invalid_parameters.is_empty() { + CoreGetConfigRspBuilder { + status: uci::Status::Ok, + parameters: valid_parameters, + } + .build() } else { - (uci::Status::InvalidParam, invalid_parameters) - }; - - CoreGetConfigRspBuilder { - status, - tlvs: parameters, + CoreGetConfigRspBuilder { + status: uci::Status::InvalidParam, + parameters: invalid_parameters, + } + .build() } - .build() } fn session_init(&mut self, cmd: SessionInitCmd) -> SessionInitRsp { @@ -467,7 +467,7 @@ impl Pica { data_sequence_number: 0x01, pbf: PacketBoundaryFlag::Complete, session_handle: session_id, - source_address: device.mac_address.into(), + source_address: session.app_config.device_mac_address.unwrap().into(), status: uci::Status::Ok, } .build() diff --git a/src/mac_address.rs b/src/mac_address.rs index 6b67e7e..6c93981 100644 --- a/src/mac_address.rs +++ b/src/mac_address.rs @@ -43,15 +43,21 @@ impl MacAddress { } } -impl From<MacAddress> for u64 { - fn from(mac_address: MacAddress) -> Self { +impl From<&MacAddress> for u64 { + fn from(mac_address: &MacAddress) -> Self { match mac_address { - MacAddress::Short(addr) => u16::from_le_bytes(addr) as u64, - MacAddress::Extended(addr) => u64::from_le_bytes(addr), + MacAddress::Short(addr) => u16::from_le_bytes(*addr) as u64, + MacAddress::Extended(addr) => u64::from_le_bytes(*addr), } } } +impl From<MacAddress> for u64 { + fn from(mac_address: MacAddress) -> Self { + u64::from(&mac_address) + } +} + impl From<&MacAddress> for Vec<u8> { fn from(mac_address: &MacAddress) -> Self { match mac_address { diff --git a/src/uci_packets.pdl b/src/uci_packets.pdl index 06fd9b0..4a2f69f 100644 --- a/src/uci_packets.pdl +++ b/src/uci_packets.pdl @@ -166,11 +166,6 @@ enum ResetConfig : 8 { UWBS_RESET = 0x00, } -enum DeviceConfigId : 8 { - DEVICE_STATE = 0x00, - LOW_POWER_MODE = 0x01, -} - // [UCI] Table 45: APP Configuration Parameters IDs enum AppConfigTlvType : 8 { DEVICE_TYPE = 0x00, @@ -774,30 +769,37 @@ test CoreGetCapsInfoRsp { "\x40\x03\x00\x05\x00\x00\x00\x00\x01\x00\x01\x01", } -struct DeviceConfigTlv { - cfg_id: DeviceConfigId, - _count_(v): 8, - v: 8[], +// [UCI] Table 44: Device Configuration Parameters +enum ConfigParameterId : 8 { + DEVICE_STATE = 0x00, + LOW_POWER_MODE = 0x01, + RFU = .., +} + +struct ConfigParameter { + id: ConfigParameterId, + _size_(value): 8, + value: 8[], } packet CoreSetConfigCmd : CorePacket (mt = COMMAND, oid = SET_CONFIG) { - _count_(tlvs): 8, - tlvs: DeviceConfigTlv[], + _count_(parameters): 8, + parameters: ConfigParameter[], } test CoreSetConfigCmd { "\x20\x04\x00\x03\x00\x00\x00\x01\x01\x00", } -struct DeviceConfigStatus { - cfg_id: DeviceConfigId, +struct ConfigParameterStatus { + id: ConfigParameterId, status: Status, } packet CoreSetConfigRsp : CorePacket (mt = RESPONSE, oid = SET_CONFIG) { status: Status, - _count_(cfg_status): 8, - cfg_status: DeviceConfigStatus[], + _count_(parameters): 8, + parameters: ConfigParameterStatus[], } test CoreSetConfigRsp { @@ -806,8 +808,8 @@ test CoreSetConfigRsp { } packet CoreGetConfigCmd : CorePacket (mt = COMMAND, oid = GET_CONFIG) { - _count_(cfg_id): 8, - cfg_id: 8[], // DeviceConfigId + _count_(parameter_ids): 8, + parameter_ids: ConfigParameterId[], } test CoreGetConfigCmd { @@ -816,8 +818,8 @@ test CoreGetConfigCmd { packet CoreGetConfigRsp : CorePacket (mt = RESPONSE, oid = GET_CONFIG) { status: Status, - _count_(tlvs): 8, - tlvs: DeviceConfigTlv[] + _count_(parameters): 8, + parameters: ConfigParameter[] } test CoreGetConfigRsp { diff --git a/tests/data_transfer.py b/tests/data_transfer.py index 7449a35..6d4fb8c 100755 --- a/tests/data_transfer.py +++ b/tests/data_transfer.py @@ -35,7 +35,7 @@ async def controller(host: Host, peer: Host, file: Path): ) ) - await host.expect_control(uci.SessionInitRsp(status=uci.StatusCode.UCI_STATUS_OK)) + await host.expect_control(uci.SessionInitRsp(status=uci.Status.OK)) await host.expect_control( uci.SessionStatusNtf( @@ -93,7 +93,7 @@ async def controller(host: Host, peer: Host, file: Path): ) await host.expect_control( - uci.SessionSetAppConfigRsp(status=uci.StatusCode.UCI_STATUS_OK, cfg_status=[]) + uci.SessionSetAppConfigRsp(status=uci.Status.OK, cfg_status=[]) ) await host.expect_control( @@ -109,7 +109,7 @@ async def controller(host: Host, peer: Host, file: Path): # START SESSION CMD host.send_control(uci.SessionStartCmd(session_id=0)) - await host.expect_control(uci.SessionStartRsp(status=uci.StatusCode.UCI_STATUS_OK)) + await host.expect_control(uci.SessionStartRsp(status=uci.Status.OK)) await host.expect_control( uci.SessionStatusNtf( @@ -120,7 +120,7 @@ async def controller(host: Host, peer: Host, file: Path): ) await host.expect_control( - uci.DeviceStatusNtf(device_state=uci.DeviceState.DEVICE_STATE_ACTIVE) + uci.CoreDeviceStatusNtf(device_state=uci.DeviceState.DEVICE_STATE_ACTIVE) ) event = await host.expect_control(uci.ShortMacTwoWaySessionInfoNtf, timeout=2.0) @@ -132,7 +132,7 @@ async def controller(host: Host, peer: Host, file: Path): # STOP SESSION host.send_control(uci.SessionStopCmd(session_id=0)) - await host.expect_control(uci.SessionStopRsp(status=uci.StatusCode.UCI_STATUS_OK)) + await host.expect_control(uci.SessionStopRsp(status=uci.Status.OK)) await host.expect_control( uci.SessionStatusNtf( @@ -143,13 +143,13 @@ async def controller(host: Host, peer: Host, file: Path): ) await host.expect_control( - uci.DeviceStatusNtf(device_state=uci.DeviceState.DEVICE_STATE_READY) + uci.CoreDeviceStatusNtf(device_state=uci.DeviceState.DEVICE_STATE_READY) ) # DEINIT host.send_control(uci.SessionDeinitCmd(session_token=0)) - await host.expect_control(uci.SessionDeinitRsp(status=uci.StatusCode.UCI_STATUS_OK)) + await host.expect_control(uci.SessionDeinitRsp(status=uci.Status.OK)) async def controlee(host: Host, peer: Host, file: Path): @@ -162,7 +162,7 @@ async def controlee(host: Host, peer: Host, file: Path): ) ) - await host.expect_control(uci.SessionInitRsp(status=uci.StatusCode.UCI_STATUS_OK)) + await host.expect_control(uci.SessionInitRsp(status=uci.Status.OK)) await host.expect_control( uci.SessionStatusNtf( @@ -220,7 +220,7 @@ async def controlee(host: Host, peer: Host, file: Path): ) await host.expect_control( - uci.SessionSetAppConfigRsp(status=uci.StatusCode.UCI_STATUS_OK, cfg_status=[]) + uci.SessionSetAppConfigRsp(status=uci.Status.OK, cfg_status=[]) ) await host.expect_control( @@ -233,7 +233,7 @@ async def controlee(host: Host, peer: Host, file: Path): host.send_control(uci.SessionStartCmd(session_id=0)) - await host.expect_control(uci.SessionStartRsp(status=uci.StatusCode.UCI_STATUS_OK)) + await host.expect_control(uci.SessionStartRsp(status=uci.Status.OK)) await host.expect_control( uci.SessionStatusNtf( @@ -244,7 +244,7 @@ async def controlee(host: Host, peer: Host, file: Path): ) await host.expect_control( - uci.DeviceStatusNtf(device_state=uci.DeviceState.DEVICE_STATE_ACTIVE) + uci.CoreDeviceStatusNtf(device_state=uci.DeviceState.DEVICE_STATE_ACTIVE) ) with file.open("rb") as f: @@ -252,8 +252,8 @@ async def controlee(host: Host, peer: Host, file: Path): event = await host.expect_data( uci.DataMessageRcv( session_handle=0, - status=uci.StatusCode.UCI_STATUS_OK, - source_address=int.from_bytes(peer.mac_address, "big"), + status=uci.Status.OK, + source_address=int.from_bytes(peer.mac_address, "little"), data_sequence_number=0x01, application_data=application_data, ), @@ -266,7 +266,7 @@ async def controlee(host: Host, peer: Host, file: Path): host.send_control(uci.SessionStopCmd(session_id=0)) - await host.expect_control(uci.SessionStopRsp(status=uci.StatusCode.UCI_STATUS_OK)) + await host.expect_control(uci.SessionStopRsp(status=uci.Status.OK)) await host.expect_control( uci.SessionStatusNtf( @@ -277,12 +277,12 @@ async def controlee(host: Host, peer: Host, file: Path): ) await host.expect_control( - uci.DeviceStatusNtf(device_state=uci.DeviceState.DEVICE_STATE_READY) + uci.CoreDeviceStatusNtf(device_state=uci.DeviceState.DEVICE_STATE_READY) ) host.send_control(uci.SessionDeinitCmd(session_token=0)) - await host.expect_control(uci.SessionDeinitRsp(status=uci.StatusCode.UCI_STATUS_OK)) + await host.expect_control(uci.SessionDeinitRsp(status=uci.Status.OK)) async def data_transfer( @@ -322,7 +322,7 @@ async def data_transfer( seq_num = 0 event = await host.expect_control( - uci.DataCreditNtf( + uci.SessionDataCreditNtf( session_token=int(session_id), credit_availability=uci.CreditAvailability.CREDIT_AVAILABLE, ) @@ -338,7 +338,7 @@ async def data_transfer( ) ) event = await host.expect_control( - uci.DataCreditNtf( + uci.SessionDataCreditNtf( session_token=int(session_id), credit_availability=uci.CreditAvailability.CREDIT_AVAILABLE, ) @@ -351,8 +351,8 @@ async def data_transfer( async def run(address: str, uci_port: int, file: Path): try: - host0 = await Host.connect(address, uci_port, bytes([0, 0])) - host1 = await Host.connect(address, uci_port, bytes([0, 1])) + host0 = await Host.connect(address, uci_port, bytes([0x34, 0x12])) + host1 = await Host.connect(address, uci_port, bytes([0x78, 0x56])) except Exception as e: raise Exception( f"Failed to connect to Pica server at address {address}:{uci_port}\n" diff --git a/tests/helper.py b/tests/helper.py index bbce7ad..f5d9db5 100644 --- a/tests/helper.py +++ b/tests/helper.py @@ -18,13 +18,13 @@ from pica.packets import uci async def init(host: Host): await host.expect_control( - uci.DeviceStatusNtf(device_state=uci.DeviceState.DEVICE_STATE_READY) + uci.CoreDeviceStatusNtf(device_state=uci.DeviceState.DEVICE_STATE_READY) ) - host.send_control(uci.DeviceResetCmd(reset_config=uci.ResetConfig.UWBS_RESET)) + host.send_control(uci.CoreDeviceResetCmd(reset_config=uci.ResetConfig.UWBS_RESET)) - await host.expect_control(uci.DeviceResetRsp(status=uci.StatusCode.UCI_STATUS_OK)) + await host.expect_control(uci.CoreDeviceResetRsp(status=uci.Status.OK)) await host.expect_control( - uci.DeviceStatusNtf(device_state=uci.DeviceState.DEVICE_STATE_READY) + uci.CoreDeviceStatusNtf(device_state=uci.DeviceState.DEVICE_STATE_READY) ) diff --git a/tests/ranging.py b/tests/ranging.py index 9af4376..165c5e3 100755 --- a/tests/ranging.py +++ b/tests/ranging.py @@ -32,7 +32,7 @@ async def controller(host: Host, peer: Host): ) ) - await host.expect_control(uci.SessionInitRsp(status=uci.StatusCode.UCI_STATUS_OK)) + await host.expect_control(uci.SessionInitRsp(status=uci.Status.OK)) await host.expect_control( uci.SessionStatusNtf( @@ -90,7 +90,7 @@ async def controller(host: Host, peer: Host): ) await host.expect_control( - uci.SessionSetAppConfigRsp(status=uci.StatusCode.UCI_STATUS_OK, cfg_status=[]) + uci.SessionSetAppConfigRsp(status=uci.Status.OK, cfg_status=[]) ) await host.expect_control( @@ -103,7 +103,7 @@ async def controller(host: Host, peer: Host): host.send_control(uci.SessionStartCmd(session_id=0)) - await host.expect_control(uci.SessionStartRsp(status=uci.StatusCode.UCI_STATUS_OK)) + await host.expect_control(uci.SessionStartRsp(status=uci.Status.OK)) await host.expect_control( uci.SessionStatusNtf( @@ -114,7 +114,7 @@ async def controller(host: Host, peer: Host): ) await host.expect_control( - uci.DeviceStatusNtf(device_state=uci.DeviceState.DEVICE_STATE_ACTIVE) + uci.CoreDeviceStatusNtf(device_state=uci.DeviceState.DEVICE_STATE_ACTIVE) ) for _ in range(1, 3): @@ -123,7 +123,7 @@ async def controller(host: Host, peer: Host): host.send_control(uci.SessionStopCmd(session_id=0)) - await host.expect_control(uci.SessionStopRsp(status=uci.StatusCode.UCI_STATUS_OK)) + await host.expect_control(uci.SessionStopRsp(status=uci.Status.OK)) await host.expect_control( uci.SessionStatusNtf( @@ -134,12 +134,12 @@ async def controller(host: Host, peer: Host): ) await host.expect_control( - uci.DeviceStatusNtf(device_state=uci.DeviceState.DEVICE_STATE_READY) + uci.CoreDeviceStatusNtf(device_state=uci.DeviceState.DEVICE_STATE_READY) ) host.send_control(uci.SessionDeinitCmd(session_token=0)) - await host.expect_control(uci.SessionDeinitRsp(status=uci.StatusCode.UCI_STATUS_OK)) + await host.expect_control(uci.SessionDeinitRsp(status=uci.Status.OK)) async def controlee(host: Host, peer: Host): @@ -151,7 +151,7 @@ async def controlee(host: Host, peer: Host): ) ) - await host.expect_control(uci.SessionInitRsp(status=uci.StatusCode.UCI_STATUS_OK)) + await host.expect_control(uci.SessionInitRsp(status=uci.Status.OK)) await host.expect_control( uci.SessionStatusNtf( @@ -209,7 +209,7 @@ async def controlee(host: Host, peer: Host): ) await host.expect_control( - uci.SessionSetAppConfigRsp(status=uci.StatusCode.UCI_STATUS_OK, cfg_status=[]) + uci.SessionSetAppConfigRsp(status=uci.Status.OK, cfg_status=[]) ) await host.expect_control( @@ -222,7 +222,7 @@ async def controlee(host: Host, peer: Host): host.send_control(uci.SessionStartCmd(session_id=0)) - await host.expect_control(uci.SessionStartRsp(status=uci.StatusCode.UCI_STATUS_OK)) + await host.expect_control(uci.SessionStartRsp(status=uci.Status.OK)) await host.expect_control( uci.SessionStatusNtf( @@ -233,7 +233,7 @@ async def controlee(host: Host, peer: Host): ) await host.expect_control( - uci.DeviceStatusNtf(device_state=uci.DeviceState.DEVICE_STATE_ACTIVE) + uci.CoreDeviceStatusNtf(device_state=uci.DeviceState.DEVICE_STATE_ACTIVE) ) for _ in range(1, 3): @@ -242,7 +242,7 @@ async def controlee(host: Host, peer: Host): host.send_control(uci.SessionStopCmd(session_id=0)) - await host.expect_control(uci.SessionStopRsp(status=uci.StatusCode.UCI_STATUS_OK)) + await host.expect_control(uci.SessionStopRsp(status=uci.Status.OK)) await host.expect_control( uci.SessionStatusNtf( @@ -253,12 +253,12 @@ async def controlee(host: Host, peer: Host): ) await host.expect_control( - uci.DeviceStatusNtf(device_state=uci.DeviceState.DEVICE_STATE_READY) + uci.CoreDeviceStatusNtf(device_state=uci.DeviceState.DEVICE_STATE_READY) ) host.send_control(uci.SessionDeinitCmd(session_token=0)) - await host.expect_control(uci.SessionDeinitRsp(status=uci.StatusCode.UCI_STATUS_OK)) + await host.expect_control(uci.SessionDeinitRsp(status=uci.Status.OK)) async def run(address: str, uci_port: int): |