aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorHenri Chataing <henrichataing@google.com>2024-03-27 17:12:42 -0700
committerhchataing <104974782+hchataing@users.noreply.github.com>2024-04-01 16:37:50 -0700
commitb5006afdc4533af8fa4d5e6e0674f2a6a5c1af45 (patch)
treec1eec21f5011084c70f57ebf3f4a77312479b6bc
parent45c2327092a065d4e8c675e14e62dfbfe99fd47c (diff)
downloadpica-b5006afdc4533af8fa4d5e6e0674f2a6a5c1af45.tar.gz
Review the implementation of Core Set Config and Core Get Config
-rw-r--r--py/pica/pica/packets/uci.py2645
-rw-r--r--src/device.rs149
-rw-r--r--src/lib.rs2
-rw-r--r--src/mac_address.rs14
-rw-r--r--src/uci_packets.pdl40
-rwxr-xr-xtests/data_transfer.py40
-rw-r--r--tests/helper.py8
-rwxr-xr-xtests/ranging.py28
8 files changed, 903 insertions, 2023 deletions
diff --git a/py/pica/pica/packets/uci.py b/py/pica/pica/packets/uci.py
index 0043c45..574feab 100644
--- a/py/pica/pica/packets/uci.py
+++ b/py/pica/pica/packets/uci.py
@@ -94,16 +94,30 @@ class PacketBoundaryFlag(enum.IntEnum):
raise exn
+class MessageType(enum.IntEnum):
+ DATA = 0x0
+ COMMAND = 0x1
+ RESPONSE = 0x2
+ NOTIFICATION = 0x3
+
+ @staticmethod
+ def from_int(v: int) -> Union[int, 'MessageType']:
+ try:
+ return MessageType(v)
+ except ValueError as exn:
+ raise exn
+
+
class GroupId(enum.IntEnum):
CORE = 0x0
SESSION_CONFIG = 0x1
SESSION_CONTROL = 0x2
DATA_CONTROL = 0x3
- TEST = 0xd
VENDOR_RESERVED_9 = 0x9
VENDOR_RESERVED_A = 0xa
VENDOR_RESERVED_B = 0xb
VENDOR_ANDROID = 0xc
+ TEST = 0xd
VENDOR_RESERVED_E = 0xe
VENDOR_RESERVED_F = 0xf
@@ -127,154 +141,112 @@ class DataPacketFormat(enum.IntEnum):
raise exn
-class GroupIdOrDataPacketFormat(enum.IntEnum):
- CORE = 0x0
- SESSION_CONFIG_OR_DATA_SND = 0x1
- SESSION_CONTROL_OR_DATA_RCV = 0x2
- DATA_CONTROL = 0x3
- TEST = 0xd
- VENDOR_RESERVED_9 = 0x9
- VENDOR_RESERVED_A = 0xa
- VENDOR_RESERVED_B = 0xb
- VENDOR_ANDROID = 0xc
- VENDOR_RESERVED_E = 0xe
- VENDOR_RESERVED_F = 0xf
-
- @staticmethod
- def from_int(v: int) -> Union[int, 'GroupIdOrDataPacketFormat']:
- try:
- return GroupIdOrDataPacketFormat(v)
- except ValueError as exn:
- raise exn
-
-
-class CoreOpCode(enum.IntEnum):
- CORE_DEVICE_RESET = 0x0
- CORE_DEVICE_STATUS_NTF = 0x1
- CORE_DEVICE_INFO = 0x2
- CORE_GET_CAPS_INFO = 0x3
- CORE_SET_CONFIG = 0x4
- CORE_GET_CONFIG = 0x5
- CORE_DEVICE_SUSPEND = 0x6
- CORE_GENERIC_ERROR_NTF = 0x7
- CORE_QUERY_UWBS_TIMESTAMP = 0x8
+class CoreOpcodeId(enum.IntEnum):
+ DEVICE_RESET = 0x0
+ DEVICE_STATUS = 0x1
+ GET_DEVICE_INFO = 0x2
+ GET_CAPS_INFO = 0x3
+ SET_CONFIG = 0x4
+ GET_CONFIG = 0x5
+ GENERIC_ERROR = 0x7
+ QUERY_UWBS_TIMESTAMP = 0x8
@staticmethod
- def from_int(v: int) -> Union[int, 'CoreOpCode']:
+ def from_int(v: int) -> Union[int, 'CoreOpcodeId']:
try:
- return CoreOpCode(v)
+ return CoreOpcodeId(v)
except ValueError as exn:
raise exn
-class SessionConfigOpCode(enum.IntEnum):
- SESSION_INIT = 0x0
- SESSION_DEINIT = 0x1
- SESSION_STATUS_NTF = 0x2
- SESSION_SET_APP_CONFIG = 0x3
- SESSION_GET_APP_CONFIG = 0x4
- SESSION_GET_COUNT = 0x5
- SESSION_GET_STATE = 0x6
- SESSION_UPDATE_CONTROLLER_MULTICAST_LIST = 0x7
- SESSION_UPDATE_ACTIVE_ROUNDS_ANCHOR = 0x8
- SESSION_UPDATE_ACTIVE_ROUNDS_DT_TAG = 0x9
- SESSION_SET_INITIATOR_DT_ANCHOR_RR_RDM_LIST = 0xa
- SESSION_QUERY_DATA_SIZE_IN_RANGING = 0xb
- SESSION_SET_HUS_CONFIG = 0xc
+class SessionConfigOpcodeId(enum.IntEnum):
+ INIT = 0x0
+ DEINIT = 0x1
+ STATUS = 0x2
+ SET_APP_CONFIG = 0x3
+ GET_APP_CONFIG = 0x4
+ GET_COUNT = 0x5
+ GET_STATE = 0x6
+ UPDATE_CONTROLLER_MULTICAST_LIST = 0x7
+ UPDATE_DT_ANCHOR_RANGING_ROUNDS = 0x8
+ UPDATE_DT_TAG_RANGING_ROUNDS = 0x9
+ QUERY_DATA_SIZE_IN_RANGING = 0xb
@staticmethod
- def from_int(v: int) -> Union[int, 'SessionConfigOpCode']:
+ def from_int(v: int) -> Union[int, 'SessionConfigOpcodeId']:
try:
- return SessionConfigOpCode(v)
+ return SessionConfigOpcodeId(v)
except ValueError as exn:
raise exn
-class SessionControlOpCode(enum.IntEnum):
- SESSION_START = 0x0
- SESSION_STOP = 0x1
- SESSION_RESERVED = 0x2
- SESSION_GET_RANGING_COUNT = 0x3
- SESSION_DATA_CREDIT_NTF = 0x4
- SESSION_DATA_TRANSFER_STATUS_NTF = 0x5
+class SessionControlOpcodeId(enum.IntEnum):
+ START = 0x0
+ STOP = 0x1
+ GET_RANGING_COUNT = 0x3
+ DATA_CREDIT = 0x4
+ DATA_TRANSFER_STATUS = 0x5
@staticmethod
- def from_int(v: int) -> Union[int, 'SessionControlOpCode']:
+ def from_int(v: int) -> Union[int, 'SessionControlOpcodeId']:
try:
- return SessionControlOpCode(v)
+ return SessionControlOpcodeId(v)
except ValueError as exn:
raise exn
-class AppDataOpCode(enum.IntEnum):
- APP_DATA_TX = 0x0
- APP_DATA_RX = 0x1
+class AndroidOpcodeId(enum.IntEnum):
+ GET_POWER_STATS = 0x0
+ SET_COUNTRY_CODE = 0x1
+ FIRA_RANGE_DIAGNOSTICS = 0x2
@staticmethod
- def from_int(v: int) -> Union[int, 'AppDataOpCode']:
+ def from_int(v: int) -> Union[int, 'AndroidOpcodeId']:
try:
- return AppDataOpCode(v)
+ return AndroidOpcodeId(v)
except ValueError as exn:
raise exn
-class AndroidOpCode(enum.IntEnum):
- ANDROID_GET_POWER_STATS = 0x0
- ANDROID_SET_COUNTRY_CODE = 0x1
- ANDROID_FIRA_RANGE_DIAGNOSTICS = 0x2
-
- @staticmethod
- def from_int(v: int) -> Union[int, 'AndroidOpCode']:
- try:
- return AndroidOpCode(v)
- except ValueError as exn:
- raise exn
-
-
-class StatusCode(enum.IntEnum):
- UCI_STATUS_OK = 0x0
- UCI_STATUS_REJECTED = 0x1
- UCI_STATUS_FAILED = 0x2
- UCI_STATUS_SYNTAX_ERROR = 0x3
- UCI_STATUS_INVALID_PARAM = 0x4
- UCI_STATUS_INVALID_RANGE = 0x5
- UCI_STATUS_INVALID_MSG_SIZE = 0x6
- UCI_STATUS_UNKNOWN_GID = 0x7
- UCI_STATUS_UNKNOWN_OID = 0x8
- UCI_STATUS_READ_ONLY = 0x9
- UCI_STATUS_COMMAND_RETRY = 0xa
- UCI_STATUS_UNKNOWN = 0xb
- UCI_STATUS_NOT_APPLICABLE = 0xc
- UCI_STATUS_SESSION_NOT_EXIST = 0x11
- UCI_STATUS_SESSION_DUPLICATE = 0x12
- UCI_STATUS_SESSION_ACTIVE = 0x13
- UCI_STATUS_MAX_SESSIONS_EXCEEDED = 0x14
- UCI_STATUS_SESSION_NOT_CONFIGURED = 0x15
- UCI_STATUS_ACTIVE_SESSIONS_ONGOING = 0x16
- UCI_STATUS_MULTICAST_LIST_FULL = 0x17
- UCI_STATUS_ADDRESS_NOT_FOUND = 0x18
- UCI_STATUS_ADDRESS_ALREADY_PRESENT = 0x19
- UCI_STATUS_ERROR_UWB_INITIATION_TIME_TOO_OLD = 0x1a
- UCI_STATUS_OK_NEGATIVE_DISTANCE_REPORT = 0x1b
- UCI_STATUS_RANGING_TX_FAILED = 0x20
- UCI_STATUS_RANGING_RX_TIMEOUT = 0x21
- UCI_STATUS_RANGING_RX_PHY_DEC_FAILED = 0x22
- UCI_STATUS_RANGING_RX_PHY_TOA_FAILED = 0x23
- UCI_STATUS_RANGING_RX_PHY_STS_FAILED = 0x24
- UCI_STATUS_RANGING_RX_MAC_DEC_FAILED = 0x25
- UCI_STATUS_RANGING_RX_MAC_IE_DEC_FAILED = 0x26
- UCI_STATUS_RANGING_RX_MAC_IE_MISSING = 0x27
- UCI_STATUS_ERROR_ROUND_INDEX_NOT_ACTIVATED = 0x28
- UCI_STATUS_ERROR_NUMBER_OF_ACTIVE_RANGING_ROUNDS_EXCEEDED = 0x29
- UCI_STATUS_ERROR_DL_TDOA_DEVICE_ADDRESS_NOT_MATCHING_IN_REPLY_TIME_LIST = 0x2a
- UCI_STATUS_DATA_MAX_TX_PSDU_SIZE_EXCEEDED = 0x30
- UCI_STATUS_DATA_RX_CRC_ERROR = 0x31
- VENDOR_SPECIFIC_STATUS_CODE_2 = 0xff
-
- @staticmethod
- def from_int(v: int) -> Union[int, 'StatusCode']:
- try:
- return StatusCode(v)
+class Status(enum.IntEnum):
+ OK = 0x0
+ REJECTED = 0x1
+ FAILED = 0x2
+ SYNTAX_ERROR = 0x3
+ INVALID_PARAM = 0x4
+ INVALID_RANGE = 0x5
+ INVALID_MESSAGE_SIZE = 0x6
+ UNKNOWN_GID = 0x7
+ UNKNOWN_OID = 0x8
+ READ_ONLY = 0x9
+ UCI_MESSAGE_RETRY = 0xa
+ UNKNOWN = 0xb
+ NOT_APPLICABLE = 0xc
+ ERROR_SESSION_NOT_EXIST = 0x11
+ ERROR_SESSION_DUPLICATE = 0x12
+ ERROR_SESSION_ACTIVE = 0x13
+ ERROR_MAX_SESSIONS_EXCEEDED = 0x14
+ ERROR_SESSION_NOT_CONFIGURED = 0x15
+ ERROR_ACTIVE_SESSIONS_ONGOING = 0x16
+ ERROR_MULTICAST_LIST_FULL = 0x17
+ ERROR_UWB_INITIATION_TIME_TOO_OLD = 0x1a
+ OK_NEGATIVE_DISTANCE_REPORT = 0x1b
+ RANGING_TX_FAILED = 0x20
+ RANGING_RX_TIMEOUT = 0x21
+ RANGING_RX_PHY_DEC_FAILED = 0x22
+ RANGING_RX_PHY_TOA_FAILED = 0x23
+ RANGING_RX_PHY_STS_FAILED = 0x24
+ RANGING_RX_MAC_DEC_FAILED = 0x25
+ RANGING_RX_MAC_IE_DEC_FAILED = 0x26
+ RANGING_RX_MAC_IE_MISSING = 0x27
+ ERROR_ROUND_INDEX_NOT_ACTIVATED = 0x28
+ ERROR_NUMBER_OF_ACTIVE_RANGING_ROUNDS_EXCEEDED = 0x29
+ ERROR_DL_TDOA_DEVICE_ADDRESS_NOT_MATCHING_IN_REPLY_TIME_LIST = 0x2a
+
+ @staticmethod
+ def from_int(v: int) -> Union[int, 'Status']:
+ try:
+ return Status(v)
except ValueError as exn:
return v
@@ -333,18 +305,6 @@ class ResetConfig(enum.IntEnum):
raise exn
-class DeviceConfigId(enum.IntEnum):
- DEVICE_STATE = 0x0
- LOW_POWER_MODE = 0x1
-
- @staticmethod
- def from_int(v: int) -> Union[int, 'DeviceConfigId']:
- try:
- return DeviceConfigId(v)
- except ValueError as exn:
- raise exn
-
-
class AppConfigTlvType(enum.IntEnum):
DEVICE_TYPE = 0x0
RANGING_ROUND_USAGE = 0x1
@@ -982,20 +942,21 @@ class ReasonCode(enum.IntEnum):
return v
-class MulticastUpdateStatusCode(enum.IntEnum):
- STATUS_OK_MULTICAST_LIST_UPDATE = 0x0
- STATUS_ERROR_MULTICAST_LIST_FULL = 0x1
- STATUS_ERROR_KEY_FETCH_FAIL = 0x2
- STATUS_ERROR_SUB_SESSION_ID_NOT_FOUND = 0x3
- STATUS_ERROR_SUB_SESSION_KEY_NOT_FOUND = 0x5
- STATUS_ERROR_SUB_SESSION_KEY_NOT_APPLICABLE = 0x6
- STATUS_ERROR_SESSION_KEY_NOT_FOUND = 0x7
- STATUS_ERROR_ADDRESS_ALREADY_PRESENT = 0x8
+class MulticastUpdateStatus(enum.IntEnum):
+ OK_MULTICAST_LIST_UPDATE = 0x0
+ ERROR_MULTICAST_LIST_FULL = 0x1
+ ERROR_KEY_FETCH_FAIL = 0x2
+ ERROR_SUB_SESSION_ID_NOT_FOUND = 0x3
+ ERROR_SUB_SESSION_KEY_NOT_FOUND = 0x4
+ ERROR_SUB_SESSION_KEY_NOT_APPLICABLE = 0x5
+ ERROR_SESSION_KEY_NOT_FOUND = 0x6
+ ERROR_ADDRESS_NOT_FOUND = 0x7
+ ERROR_ADDRESS_ALREADY_PRESENT = 0x8
@staticmethod
- def from_int(v: int) -> Union[int, 'MulticastUpdateStatusCode']:
+ def from_int(v: int) -> Union[int, 'MulticastUpdateStatus']:
try:
- return MulticastUpdateStatusCode(v)
+ return MulticastUpdateStatus(v)
except ValueError as exn:
raise exn
@@ -1030,22 +991,6 @@ class SessionType(enum.IntEnum):
raise exn
-class MessageType(enum.IntEnum):
- DATA = 0x0
- COMMAND = 0x1
- RESPONSE = 0x2
- NOTIFICATION = 0x3
- RESERVED_FOR_TESTING_1 = 0x4
- RESERVED_FOR_TESTING_2 = 0x5
-
- @staticmethod
- def from_int(v: int) -> Union[int, 'MessageType']:
- try:
- return MessageType(v)
- except ValueError as exn:
- raise exn
-
-
@dataclass
class CommonPacketHeader(Packet):
pbf: PacketBoundaryFlag = field(kw_only=True, default=PacketBoundaryFlag.COMPLETE)
@@ -1159,7 +1104,6 @@ class DataPacketHeader(Packet):
class ControlPacket(Packet):
gid: GroupId = field(kw_only=True, default=GroupId.CORE)
mt: MessageType = field(kw_only=True, default=MessageType.DATA)
- opcode: int = field(kw_only=True, default=0)
def __post_init__(self):
pass
@@ -1167,210 +1111,28 @@ class ControlPacket(Packet):
@staticmethod
def parse(span: bytes) -> Tuple['ControlPacket', bytes]:
fields = {'payload': None}
- if len(span) < 4:
+ if len(span) < 1:
raise Exception('Invalid packet size')
fields['gid'] = GroupId.from_int((span[0] >> 0) & 0xf)
fields['mt'] = MessageType.from_int((span[0] >> 5) & 0x7)
- fields['opcode'] = (span[1] >> 0) & 0x3f
- value_ = int.from_bytes(span[2:4], byteorder='little')
- span = span[4:]
+ span = span[1:]
payload = span
span = bytes([])
fields['payload'] = payload
try:
- return DeviceResetCmd.parse(fields.copy(), payload)
- except Exception as exn:
- pass
- try:
- return GetDeviceInfoCmd.parse(fields.copy(), payload)
- except Exception as exn:
- pass
- try:
- return GetCapsInfoCmd.parse(fields.copy(), payload)
- except Exception as exn:
- pass
- try:
- return SetConfigCmd.parse(fields.copy(), payload)
- except Exception as exn:
- pass
- try:
- return GetConfigCmd.parse(fields.copy(), payload)
- except Exception as exn:
- pass
- try:
- return CoreQueryTimeStampCmd.parse(fields.copy(), payload)
- except Exception as exn:
- pass
- try:
- return SessionInitCmd.parse(fields.copy(), payload)
- except Exception as exn:
- pass
- try:
- return SessionDeinitCmd.parse(fields.copy(), payload)
- except Exception as exn:
- pass
- try:
- return SessionSetAppConfigCmd.parse(fields.copy(), payload)
- except Exception as exn:
- pass
- try:
- return SessionGetAppConfigCmd.parse(fields.copy(), payload)
- except Exception as exn:
- pass
- try:
- return SessionGetCountCmd.parse(fields.copy(), payload)
- except Exception as exn:
- pass
- try:
- return SessionGetStateCmd.parse(fields.copy(), payload)
- except Exception as exn:
- pass
- try:
- return SessionUpdateDtTagRangingRoundsCmd.parse(fields.copy(), payload)
- except Exception as exn:
- pass
- try:
- return SessionUpdateControllerMulticastListCmd.parse(fields.copy(), payload)
- except Exception as exn:
- pass
- try:
- return SessionSetHybridConfigCmd.parse(fields.copy(), payload)
- except Exception as exn:
- pass
- try:
- return SessionQueryMaxDataSizeCmd.parse(fields.copy(), payload)
- except Exception as exn:
- pass
- try:
- return SessionControlCommand.parse(fields.copy(), payload)
- except Exception as exn:
- pass
- try:
- return AndroidGetPowerStatsCmd.parse(fields.copy(), payload)
- except Exception as exn:
- pass
- try:
- return AndroidSetCountryCodeCmd.parse(fields.copy(), payload)
- except Exception as exn:
- pass
- try:
- return DeviceResetRsp.parse(fields.copy(), payload)
- except Exception as exn:
- pass
- try:
- return GetDeviceInfoRsp.parse(fields.copy(), payload)
- except Exception as exn:
- pass
- try:
- return GetCapsInfoRsp.parse(fields.copy(), payload)
- except Exception as exn:
- pass
- try:
- return SetConfigRsp.parse(fields.copy(), payload)
- except Exception as exn:
- pass
- try:
- return GetConfigRsp.parse(fields.copy(), payload)
- except Exception as exn:
- pass
- try:
- return CoreQueryTimeStampRsp.parse(fields.copy(), payload)
- except Exception as exn:
- pass
- try:
- return SessionInitRsp_V2.parse(fields.copy(), payload)
- except Exception as exn:
- pass
- try:
- return SessionInitRsp.parse(fields.copy(), payload)
- except Exception as exn:
- pass
- try:
- return SessionDeinitRsp.parse(fields.copy(), payload)
- except Exception as exn:
- pass
- try:
- return SessionSetAppConfigRsp.parse(fields.copy(), payload)
- except Exception as exn:
- pass
- try:
- return SessionGetAppConfigRsp.parse(fields.copy(), payload)
- except Exception as exn:
- pass
- try:
- return SessionGetCountRsp.parse(fields.copy(), payload)
- except Exception as exn:
- pass
- try:
- return SessionGetStateRsp.parse(fields.copy(), payload)
- except Exception as exn:
- pass
- try:
- return SessionUpdateDtTagRangingRoundsRsp.parse(fields.copy(), payload)
- except Exception as exn:
- pass
- try:
- return SessionSetHybridConfigRsp.parse(fields.copy(), payload)
- except Exception as exn:
- pass
- try:
- return SessionUpdateControllerMulticastListRsp.parse(fields.copy(), payload)
- except Exception as exn:
- pass
- try:
- return SessionQueryMaxDataSizeRsp.parse(fields.copy(), payload)
- except Exception as exn:
- pass
- try:
- return SessionStartRsp.parse(fields.copy(), payload)
- except Exception as exn:
- pass
- try:
- return SessionStopRsp.parse(fields.copy(), payload)
- except Exception as exn:
- pass
- try:
- return SessionGetRangingCountRsp.parse(fields.copy(), payload)
- except Exception as exn:
- pass
- try:
- return AndroidGetPowerStatsRsp.parse(fields.copy(), payload)
- except Exception as exn:
- pass
- try:
- return AndroidSetCountryCodeRsp.parse(fields.copy(), payload)
- except Exception as exn:
- pass
- try:
- return DeviceStatusNtf.parse(fields.copy(), payload)
- except Exception as exn:
- pass
- try:
- return GenericError.parse(fields.copy(), payload)
- except Exception as exn:
- pass
- try:
- return SessionStatusNtf.parse(fields.copy(), payload)
- except Exception as exn:
- pass
- try:
- return SessionUpdateControllerMulticastListNtf.parse(fields.copy(), payload)
- except Exception as exn:
- pass
- try:
- return DataCreditNtf.parse(fields.copy(), payload)
+ return CorePacket.parse(fields.copy(), payload)
except Exception as exn:
pass
try:
- return DataTransferStatusNtf.parse(fields.copy(), payload)
+ return SessionConfigPacket.parse(fields.copy(), payload)
except Exception as exn:
pass
try:
- return SessionInfoNtf.parse(fields.copy(), payload)
+ return SessionControlPacket.parse(fields.copy(), payload)
except Exception as exn:
pass
try:
- return AndroidRangeDiagnosticsNtf.parse(fields.copy(), payload)
+ return AndroidPacket.parse(fields.copy(), payload)
except Exception as exn:
pass
return ControlPacket(**fields), span
@@ -1382,17 +1144,12 @@ class ControlPacket(Packet):
(self.mt << 5)
)
_span.append(_value)
- if self.opcode > 63:
- print(f"Invalid value for field ControlPacket::opcode: {self.opcode} > 63; the value will be truncated")
- self.opcode &= 63
- _span.append((self.opcode << 0))
- _span.extend([0] * 2)
_span.extend(payload or self.payload or [])
return bytes(_span)
@property
def size(self) -> int:
- return len(self.payload) + 4
+ return len(self.payload) + 1
@dataclass
class DataPacket(Packet):
@@ -1500,7 +1257,7 @@ class DataMessageSnd(DataPacket):
@dataclass
class DataMessageRcv(DataPacket):
session_handle: int = field(kw_only=True, default=0)
- status: StatusCode = field(kw_only=True, default=StatusCode.UCI_STATUS_OK)
+ status: Status = field(kw_only=True, default=Status.OK)
source_address: int = field(kw_only=True, default=0)
data_sequence_number: int = field(kw_only=True, default=0)
application_data: bytearray = field(kw_only=True, default_factory=bytearray)
@@ -1517,7 +1274,7 @@ class DataMessageRcv(DataPacket):
raise Exception('Invalid packet size')
value_ = int.from_bytes(span[0:4], byteorder='little')
fields['session_handle'] = value_
- fields['status'] = StatusCode.from_int(span[4])
+ fields['status'] = Status.from_int(span[4])
value_ = int.from_bytes(span[5:13], byteorder='little')
fields['source_address'] = value_
value_ = int.from_bytes(span[13:15], byteorder='little')
@@ -1555,142 +1312,114 @@ class DataMessageRcv(DataPacket):
return len(self.application_data) * 1 + 17
@dataclass
-class UciCommand(ControlPacket):
-
+class CorePacket(ControlPacket):
+ oid: CoreOpcodeId = field(kw_only=True, default=CoreOpcodeId.DEVICE_RESET)
def __post_init__(self):
- self.mt = MessageType.COMMAND
+ self.gid = GroupId.CORE
@staticmethod
- def parse(fields: dict, span: bytes) -> Tuple['UciCommand', bytes]:
- if fields['mt'] != MessageType.COMMAND:
+ def parse(fields: dict, span: bytes) -> Tuple['CorePacket', bytes]:
+ if fields['gid'] != GroupId.CORE:
raise Exception("Invalid constraint field values")
+ if len(span) < 3:
+ raise Exception('Invalid packet size')
+ fields['oid'] = CoreOpcodeId.from_int((span[0] >> 0) & 0x3f)
+ value_ = int.from_bytes(span[1:3], byteorder='little')
+ span = span[3:]
payload = span
span = bytes([])
fields['payload'] = payload
try:
- return DeviceResetCmd.parse(fields.copy(), payload)
+ return CoreDeviceResetCmd.parse(fields.copy(), payload)
except Exception as exn:
pass
try:
- return GetDeviceInfoCmd.parse(fields.copy(), payload)
+ return CoreDeviceResetRsp.parse(fields.copy(), payload)
except Exception as exn:
pass
try:
- return GetCapsInfoCmd.parse(fields.copy(), payload)
+ return CoreDeviceStatusNtf.parse(fields.copy(), payload)
except Exception as exn:
pass
try:
- return SetConfigCmd.parse(fields.copy(), payload)
+ return CoreGetDeviceInfoCmd.parse(fields.copy(), payload)
except Exception as exn:
pass
try:
- return GetConfigCmd.parse(fields.copy(), payload)
+ return CoreGetDeviceInfoRsp.parse(fields.copy(), payload)
except Exception as exn:
pass
try:
- return CoreQueryTimeStampCmd.parse(fields.copy(), payload)
+ return CoreGetCapsInfoCmd.parse(fields.copy(), payload)
except Exception as exn:
pass
try:
- return SessionInitCmd.parse(fields.copy(), payload)
+ return CoreGetCapsInfoRsp.parse(fields.copy(), payload)
except Exception as exn:
pass
try:
- return SessionDeinitCmd.parse(fields.copy(), payload)
+ return CoreSetConfigCmd.parse(fields.copy(), payload)
except Exception as exn:
pass
try:
- return SessionSetAppConfigCmd.parse(fields.copy(), payload)
+ return CoreSetConfigRsp.parse(fields.copy(), payload)
except Exception as exn:
pass
try:
- return SessionGetAppConfigCmd.parse(fields.copy(), payload)
+ return CoreGetConfigCmd.parse(fields.copy(), payload)
except Exception as exn:
pass
try:
- return SessionGetCountCmd.parse(fields.copy(), payload)
+ return CoreGetConfigRsp.parse(fields.copy(), payload)
except Exception as exn:
pass
try:
- return SessionGetStateCmd.parse(fields.copy(), payload)
+ return CoreGenericErrorNtf.parse(fields.copy(), payload)
except Exception as exn:
pass
try:
- return SessionUpdateDtTagRangingRoundsCmd.parse(fields.copy(), payload)
- except Exception as exn:
- pass
- try:
- return SessionUpdateControllerMulticastListCmd.parse(fields.copy(), payload)
- except Exception as exn:
- pass
- try:
- return SessionSetHybridConfigCmd.parse(fields.copy(), payload)
- except Exception as exn:
- pass
- try:
- return SessionQueryMaxDataSizeCmd.parse(fields.copy(), payload)
- except Exception as exn:
- pass
- try:
- return SessionControlCommand.parse(fields.copy(), payload)
- except Exception as exn:
- pass
- try:
- return AndroidGetPowerStatsCmd.parse(fields.copy(), payload)
+ return CoreQueryTimeStampCmd.parse(fields.copy(), payload)
except Exception as exn:
pass
try:
- return AndroidSetCountryCodeCmd.parse(fields.copy(), payload)
+ return CoreQueryTimeStampRsp.parse(fields.copy(), payload)
except Exception as exn:
pass
- return UciCommand(**fields), span
+ return CorePacket(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
+ _span.append((self.oid << 0))
+ _span.extend([0] * 2)
_span.extend(payload or self.payload or [])
return ControlPacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
- return len(self.payload)
+ return len(self.payload) + 3
@dataclass
-class UciResponse(ControlPacket):
-
+class SessionConfigPacket(ControlPacket):
+ oid: SessionConfigOpcodeId = field(kw_only=True, default=SessionConfigOpcodeId.INIT)
def __post_init__(self):
- self.mt = MessageType.RESPONSE
+ self.gid = GroupId.SESSION_CONFIG
@staticmethod
- def parse(fields: dict, span: bytes) -> Tuple['UciResponse', bytes]:
- if fields['mt'] != MessageType.RESPONSE:
+ def parse(fields: dict, span: bytes) -> Tuple['SessionConfigPacket', bytes]:
+ if fields['gid'] != GroupId.SESSION_CONFIG:
raise Exception("Invalid constraint field values")
+ if len(span) < 3:
+ raise Exception('Invalid packet size')
+ fields['oid'] = SessionConfigOpcodeId.from_int((span[0] >> 0) & 0x3f)
+ value_ = int.from_bytes(span[1:3], byteorder='little')
+ span = span[3:]
payload = span
span = bytes([])
fields['payload'] = payload
try:
- return DeviceResetRsp.parse(fields.copy(), payload)
- except Exception as exn:
- pass
- try:
- return GetDeviceInfoRsp.parse(fields.copy(), payload)
- except Exception as exn:
- pass
- try:
- return GetCapsInfoRsp.parse(fields.copy(), payload)
- except Exception as exn:
- pass
- try:
- return SetConfigRsp.parse(fields.copy(), payload)
- except Exception as exn:
- pass
- try:
- return GetConfigRsp.parse(fields.copy(), payload)
- except Exception as exn:
- pass
- try:
- return CoreQueryTimeStampRsp.parse(fields.copy(), payload)
+ return SessionInitCmd.parse(fields.copy(), payload)
except Exception as exn:
pass
try:
@@ -1702,92 +1431,11 @@ class UciResponse(ControlPacket):
except Exception as exn:
pass
try:
- return SessionDeinitRsp.parse(fields.copy(), payload)
- except Exception as exn:
- pass
- try:
- return SessionSetAppConfigRsp.parse(fields.copy(), payload)
- except Exception as exn:
- pass
- try:
- return SessionGetAppConfigRsp.parse(fields.copy(), payload)
- except Exception as exn:
- pass
- try:
- return SessionGetCountRsp.parse(fields.copy(), payload)
- except Exception as exn:
- pass
- try:
- return SessionGetStateRsp.parse(fields.copy(), payload)
- except Exception as exn:
- pass
- try:
- return SessionUpdateDtTagRangingRoundsRsp.parse(fields.copy(), payload)
- except Exception as exn:
- pass
- try:
- return SessionSetHybridConfigRsp.parse(fields.copy(), payload)
- except Exception as exn:
- pass
- try:
- return SessionUpdateControllerMulticastListRsp.parse(fields.copy(), payload)
- except Exception as exn:
- pass
- try:
- return SessionQueryMaxDataSizeRsp.parse(fields.copy(), payload)
- except Exception as exn:
- pass
- try:
- return SessionStartRsp.parse(fields.copy(), payload)
- except Exception as exn:
- pass
- try:
- return SessionStopRsp.parse(fields.copy(), payload)
- except Exception as exn:
- pass
- try:
- return SessionGetRangingCountRsp.parse(fields.copy(), payload)
- except Exception as exn:
- pass
- try:
- return AndroidGetPowerStatsRsp.parse(fields.copy(), payload)
- except Exception as exn:
- pass
- try:
- return AndroidSetCountryCodeRsp.parse(fields.copy(), payload)
- except Exception as exn:
- pass
- return UciResponse(**fields), span
-
- def serialize(self, payload: bytes = None) -> bytes:
- _span = bytearray()
- _span.extend(payload or self.payload or [])
- return ControlPacket.serialize(self, payload = bytes(_span))
-
- @property
- def size(self) -> int:
- return len(self.payload)
-
-@dataclass
-class UciNotification(ControlPacket):
-
-
- def __post_init__(self):
- self.mt = MessageType.NOTIFICATION
-
- @staticmethod
- def parse(fields: dict, span: bytes) -> Tuple['UciNotification', bytes]:
- if fields['mt'] != MessageType.NOTIFICATION:
- raise Exception("Invalid constraint field values")
- payload = span
- span = bytes([])
- fields['payload'] = payload
- try:
- return DeviceStatusNtf.parse(fields.copy(), payload)
+ return SessionDeinitCmd.parse(fields.copy(), payload)
except Exception as exn:
pass
try:
- return GenericError.parse(fields.copy(), payload)
+ return SessionDeinitRsp.parse(fields.copy(), payload)
except Exception as exn:
pass
try:
@@ -1795,277 +1443,47 @@ class UciNotification(ControlPacket):
except Exception as exn:
pass
try:
- return SessionUpdateControllerMulticastListNtf.parse(fields.copy(), payload)
- except Exception as exn:
- pass
- try:
- return DataCreditNtf.parse(fields.copy(), payload)
- except Exception as exn:
- pass
- try:
- return DataTransferStatusNtf.parse(fields.copy(), payload)
- except Exception as exn:
- pass
- try:
- return SessionInfoNtf.parse(fields.copy(), payload)
- except Exception as exn:
- pass
- try:
- return AndroidRangeDiagnosticsNtf.parse(fields.copy(), payload)
- except Exception as exn:
- pass
- return UciNotification(**fields), span
-
- def serialize(self, payload: bytes = None) -> bytes:
- _span = bytearray()
- _span.extend(payload or self.payload or [])
- return ControlPacket.serialize(self, payload = bytes(_span))
-
- @property
- def size(self) -> int:
- return len(self.payload)
-
-@dataclass
-class CoreCommand(UciCommand):
-
-
- def __post_init__(self):
- self.gid = GroupId.CORE
- self.mt = MessageType.COMMAND
-
- @staticmethod
- def parse(fields: dict, span: bytes) -> Tuple['CoreCommand', bytes]:
- if fields['gid'] != GroupId.CORE or fields['mt'] != MessageType.COMMAND:
- raise Exception("Invalid constraint field values")
- payload = span
- span = bytes([])
- fields['payload'] = payload
- try:
- return DeviceResetCmd.parse(fields.copy(), payload)
- except Exception as exn:
- pass
- try:
- return GetDeviceInfoCmd.parse(fields.copy(), payload)
- except Exception as exn:
- pass
- try:
- return GetCapsInfoCmd.parse(fields.copy(), payload)
- except Exception as exn:
- pass
- try:
- return SetConfigCmd.parse(fields.copy(), payload)
- except Exception as exn:
- pass
- try:
- return GetConfigCmd.parse(fields.copy(), payload)
- except Exception as exn:
- pass
- try:
- return CoreQueryTimeStampCmd.parse(fields.copy(), payload)
- except Exception as exn:
- pass
- return CoreCommand(**fields), span
-
- def serialize(self, payload: bytes = None) -> bytes:
- _span = bytearray()
- _span.extend(payload or self.payload or [])
- return UciCommand.serialize(self, payload = bytes(_span))
-
- @property
- def size(self) -> int:
- return len(self.payload)
-
-@dataclass
-class CoreResponse(UciResponse):
-
-
- def __post_init__(self):
- self.gid = GroupId.CORE
- self.mt = MessageType.RESPONSE
-
- @staticmethod
- def parse(fields: dict, span: bytes) -> Tuple['CoreResponse', bytes]:
- if fields['gid'] != GroupId.CORE or fields['mt'] != MessageType.RESPONSE:
- raise Exception("Invalid constraint field values")
- payload = span
- span = bytes([])
- fields['payload'] = payload
- try:
- return DeviceResetRsp.parse(fields.copy(), payload)
- except Exception as exn:
- pass
- try:
- return GetDeviceInfoRsp.parse(fields.copy(), payload)
- except Exception as exn:
- pass
- try:
- return GetCapsInfoRsp.parse(fields.copy(), payload)
- except Exception as exn:
- pass
- try:
- return SetConfigRsp.parse(fields.copy(), payload)
- except Exception as exn:
- pass
- try:
- return GetConfigRsp.parse(fields.copy(), payload)
- except Exception as exn:
- pass
- try:
- return CoreQueryTimeStampRsp.parse(fields.copy(), payload)
- except Exception as exn:
- pass
- return CoreResponse(**fields), span
-
- def serialize(self, payload: bytes = None) -> bytes:
- _span = bytearray()
- _span.extend(payload or self.payload or [])
- return UciResponse.serialize(self, payload = bytes(_span))
-
- @property
- def size(self) -> int:
- return len(self.payload)
-
-@dataclass
-class CoreNotification(UciNotification):
-
-
- def __post_init__(self):
- self.gid = GroupId.CORE
- self.mt = MessageType.NOTIFICATION
-
- @staticmethod
- def parse(fields: dict, span: bytes) -> Tuple['CoreNotification', bytes]:
- if fields['gid'] != GroupId.CORE or fields['mt'] != MessageType.NOTIFICATION:
- raise Exception("Invalid constraint field values")
- payload = span
- span = bytes([])
- fields['payload'] = payload
- try:
- return DeviceStatusNtf.parse(fields.copy(), payload)
- except Exception as exn:
- pass
- try:
- return GenericError.parse(fields.copy(), payload)
- except Exception as exn:
- pass
- return CoreNotification(**fields), span
-
- def serialize(self, payload: bytes = None) -> bytes:
- _span = bytearray()
- _span.extend(payload or self.payload or [])
- return UciNotification.serialize(self, payload = bytes(_span))
-
- @property
- def size(self) -> int:
- return len(self.payload)
-
-@dataclass
-class SessionConfigCommand(UciCommand):
-
-
- def __post_init__(self):
- self.gid = GroupId.SESSION_CONFIG
- self.mt = MessageType.COMMAND
-
- @staticmethod
- def parse(fields: dict, span: bytes) -> Tuple['SessionConfigCommand', bytes]:
- if fields['gid'] != GroupId.SESSION_CONFIG or fields['mt'] != MessageType.COMMAND:
- raise Exception("Invalid constraint field values")
- payload = span
- span = bytes([])
- fields['payload'] = payload
- try:
- return SessionInitCmd.parse(fields.copy(), payload)
- except Exception as exn:
- pass
- try:
- return SessionDeinitCmd.parse(fields.copy(), payload)
- except Exception as exn:
- pass
- try:
return SessionSetAppConfigCmd.parse(fields.copy(), payload)
except Exception as exn:
pass
try:
- return SessionGetAppConfigCmd.parse(fields.copy(), payload)
- except Exception as exn:
- pass
- try:
- return SessionGetCountCmd.parse(fields.copy(), payload)
- except Exception as exn:
- pass
- try:
- return SessionGetStateCmd.parse(fields.copy(), payload)
- except Exception as exn:
- pass
- try:
- return SessionUpdateDtTagRangingRoundsCmd.parse(fields.copy(), payload)
- except Exception as exn:
- pass
- try:
- return SessionUpdateControllerMulticastListCmd.parse(fields.copy(), payload)
+ return SessionSetAppConfigRsp.parse(fields.copy(), payload)
except Exception as exn:
pass
try:
- return SessionSetHybridConfigCmd.parse(fields.copy(), payload)
+ return SessionGetAppConfigCmd.parse(fields.copy(), payload)
except Exception as exn:
pass
try:
- return SessionQueryMaxDataSizeCmd.parse(fields.copy(), payload)
+ return SessionGetAppConfigRsp.parse(fields.copy(), payload)
except Exception as exn:
pass
- return SessionConfigCommand(**fields), span
-
- def serialize(self, payload: bytes = None) -> bytes:
- _span = bytearray()
- _span.extend(payload or self.payload or [])
- return UciCommand.serialize(self, payload = bytes(_span))
-
- @property
- def size(self) -> int:
- return len(self.payload)
-
-@dataclass
-class SessionConfigResponse(UciResponse):
-
-
- def __post_init__(self):
- self.gid = GroupId.SESSION_CONFIG
- self.mt = MessageType.RESPONSE
-
- @staticmethod
- def parse(fields: dict, span: bytes) -> Tuple['SessionConfigResponse', bytes]:
- if fields['gid'] != GroupId.SESSION_CONFIG or fields['mt'] != MessageType.RESPONSE:
- raise Exception("Invalid constraint field values")
- payload = span
- span = bytes([])
- fields['payload'] = payload
try:
- return SessionInitRsp_V2.parse(fields.copy(), payload)
+ return SessionGetCountCmd.parse(fields.copy(), payload)
except Exception as exn:
pass
try:
- return SessionInitRsp.parse(fields.copy(), payload)
+ return SessionGetCountRsp.parse(fields.copy(), payload)
except Exception as exn:
pass
try:
- return SessionDeinitRsp.parse(fields.copy(), payload)
+ return SessionGetStateCmd.parse(fields.copy(), payload)
except Exception as exn:
pass
try:
- return SessionSetAppConfigRsp.parse(fields.copy(), payload)
+ return SessionGetStateRsp.parse(fields.copy(), payload)
except Exception as exn:
pass
try:
- return SessionGetAppConfigRsp.parse(fields.copy(), payload)
+ return SessionUpdateDtAnchorRangingRoundsCmd.parse(fields.copy(), payload)
except Exception as exn:
pass
try:
- return SessionGetCountRsp.parse(fields.copy(), payload)
+ return SessionUpdateDtAnchorRangingRoundsRsp.parse(fields.copy(), payload)
except Exception as exn:
pass
try:
- return SessionGetStateRsp.parse(fields.copy(), payload)
+ return SessionUpdateDtTagRangingRoundsCmd.parse(fields.copy(), payload)
except Exception as exn:
pass
try:
@@ -2073,7 +1491,7 @@ class SessionConfigResponse(UciResponse):
except Exception as exn:
pass
try:
- return SessionSetHybridConfigRsp.parse(fields.copy(), payload)
+ return SessionUpdateControllerMulticastListCmd.parse(fields.copy(), payload)
except Exception as exn:
pass
try:
@@ -2081,189 +1499,114 @@ class SessionConfigResponse(UciResponse):
except Exception as exn:
pass
try:
- return SessionQueryMaxDataSizeRsp.parse(fields.copy(), payload)
+ return SessionUpdateControllerMulticastListNtf.parse(fields.copy(), payload)
except Exception as exn:
pass
- return SessionConfigResponse(**fields), span
-
- def serialize(self, payload: bytes = None) -> bytes:
- _span = bytearray()
- _span.extend(payload or self.payload or [])
- return UciResponse.serialize(self, payload = bytes(_span))
-
- @property
- def size(self) -> int:
- return len(self.payload)
-
-@dataclass
-class SessionConfigNotification(UciNotification):
-
-
- def __post_init__(self):
- self.gid = GroupId.SESSION_CONFIG
- self.mt = MessageType.NOTIFICATION
-
- @staticmethod
- def parse(fields: dict, span: bytes) -> Tuple['SessionConfigNotification', bytes]:
- if fields['gid'] != GroupId.SESSION_CONFIG or fields['mt'] != MessageType.NOTIFICATION:
- raise Exception("Invalid constraint field values")
- payload = span
- span = bytes([])
- fields['payload'] = payload
try:
- return SessionStatusNtf.parse(fields.copy(), payload)
+ return SessionQueryMaxDataSizeInRangingCmd.parse(fields.copy(), payload)
except Exception as exn:
pass
try:
- return SessionUpdateControllerMulticastListNtf.parse(fields.copy(), payload)
+ return SessionQueryMaxDataSizeInRangingRsp.parse(fields.copy(), payload)
except Exception as exn:
pass
- return SessionConfigNotification(**fields), span
+ return SessionConfigPacket(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
+ _span.append((self.oid << 0))
+ _span.extend([0] * 2)
_span.extend(payload or self.payload or [])
- return UciNotification.serialize(self, payload = bytes(_span))
+ return ControlPacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
- return len(self.payload)
+ return len(self.payload) + 3
@dataclass
-class SessionControlCommand(UciCommand):
- session_id: int = field(kw_only=True, default=0)
+class SessionControlPacket(ControlPacket):
+ oid: SessionControlOpcodeId = field(kw_only=True, default=SessionControlOpcodeId.START)
def __post_init__(self):
self.gid = GroupId.SESSION_CONTROL
- self.mt = MessageType.COMMAND
@staticmethod
- def parse(fields: dict, span: bytes) -> Tuple['SessionControlCommand', bytes]:
- if fields['gid'] != GroupId.SESSION_CONTROL or fields['mt'] != MessageType.COMMAND:
+ def parse(fields: dict, span: bytes) -> Tuple['SessionControlPacket', bytes]:
+ if fields['gid'] != GroupId.SESSION_CONTROL:
raise Exception("Invalid constraint field values")
- if len(span) < 4:
+ if len(span) < 3:
raise Exception('Invalid packet size')
- value_ = int.from_bytes(span[0:4], byteorder='little')
- fields['session_id'] = value_
- span = span[4:]
+ fields['oid'] = SessionControlOpcodeId.from_int((span[0] >> 0) & 0x3f)
+ value_ = int.from_bytes(span[1:3], byteorder='little')
+ span = span[3:]
payload = span
span = bytes([])
fields['payload'] = payload
try:
- return SessionStartCmd.parse(fields.copy(), payload)
+ return SessionDataCreditNtf.parse(fields.copy(), payload)
except Exception as exn:
pass
try:
- return SessionStopCmd.parse(fields.copy(), payload)
+ return SessionDataTransferStatusNtf.parse(fields.copy(), payload)
except Exception as exn:
pass
try:
- return SessionGetRangingCountCmd.parse(fields.copy(), payload)
+ return SessionStartCmd.parse(fields.copy(), payload)
except Exception as exn:
pass
- return SessionControlCommand(**fields), span
-
- def serialize(self, payload: bytes = None) -> bytes:
- _span = bytearray()
- if self.session_id > 4294967295:
- print(f"Invalid value for field SessionControlCommand::session_id: {self.session_id} > 4294967295; the value will be truncated")
- self.session_id &= 4294967295
- _span.extend(int.to_bytes((self.session_id << 0), length=4, byteorder='little'))
- _span.extend(payload or self.payload or [])
- return UciCommand.serialize(self, payload = bytes(_span))
-
- @property
- def size(self) -> int:
- return len(self.payload) + 4
-
-@dataclass
-class SessionControlResponse(UciResponse):
-
-
- def __post_init__(self):
- self.gid = GroupId.SESSION_CONTROL
- self.mt = MessageType.RESPONSE
-
- @staticmethod
- def parse(fields: dict, span: bytes) -> Tuple['SessionControlResponse', bytes]:
- if fields['gid'] != GroupId.SESSION_CONTROL or fields['mt'] != MessageType.RESPONSE:
- raise Exception("Invalid constraint field values")
- payload = span
- span = bytes([])
- fields['payload'] = payload
try:
return SessionStartRsp.parse(fields.copy(), payload)
except Exception as exn:
pass
try:
- return SessionStopRsp.parse(fields.copy(), payload)
+ return SessionInfoNtf.parse(fields.copy(), payload)
except Exception as exn:
pass
try:
- return SessionGetRangingCountRsp.parse(fields.copy(), payload)
+ return SessionStopCmd.parse(fields.copy(), payload)
except Exception as exn:
pass
- return SessionControlResponse(**fields), span
-
- def serialize(self, payload: bytes = None) -> bytes:
- _span = bytearray()
- _span.extend(payload or self.payload or [])
- return UciResponse.serialize(self, payload = bytes(_span))
-
- @property
- def size(self) -> int:
- return len(self.payload)
-
-@dataclass
-class SessionControlNotification(UciNotification):
-
-
- def __post_init__(self):
- self.gid = GroupId.SESSION_CONTROL
- self.mt = MessageType.NOTIFICATION
-
- @staticmethod
- def parse(fields: dict, span: bytes) -> Tuple['SessionControlNotification', bytes]:
- if fields['gid'] != GroupId.SESSION_CONTROL or fields['mt'] != MessageType.NOTIFICATION:
- raise Exception("Invalid constraint field values")
- payload = span
- span = bytes([])
- fields['payload'] = payload
try:
- return DataCreditNtf.parse(fields.copy(), payload)
+ return SessionStopRsp.parse(fields.copy(), payload)
except Exception as exn:
pass
try:
- return DataTransferStatusNtf.parse(fields.copy(), payload)
+ return SessionGetRangingCountCmd.parse(fields.copy(), payload)
except Exception as exn:
pass
try:
- return SessionInfoNtf.parse(fields.copy(), payload)
+ return SessionGetRangingCountRsp.parse(fields.copy(), payload)
except Exception as exn:
pass
- return SessionControlNotification(**fields), span
+ return SessionControlPacket(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
+ _span.append((self.oid << 0))
+ _span.extend([0] * 2)
_span.extend(payload or self.payload or [])
- return UciNotification.serialize(self, payload = bytes(_span))
+ return ControlPacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
- return len(self.payload)
+ return len(self.payload) + 3
@dataclass
-class AndroidCommand(UciCommand):
-
+class AndroidPacket(ControlPacket):
+ oid: AndroidOpcodeId = field(kw_only=True, default=AndroidOpcodeId.GET_POWER_STATS)
def __post_init__(self):
self.gid = GroupId.VENDOR_ANDROID
- self.mt = MessageType.COMMAND
@staticmethod
- def parse(fields: dict, span: bytes) -> Tuple['AndroidCommand', bytes]:
- if fields['gid'] != GroupId.VENDOR_ANDROID or fields['mt'] != MessageType.COMMAND:
+ def parse(fields: dict, span: bytes) -> Tuple['AndroidPacket', bytes]:
+ if fields['gid'] != GroupId.VENDOR_ANDROID:
raise Exception("Invalid constraint field values")
+ if len(span) < 3:
+ raise Exception('Invalid packet size')
+ fields['oid'] = AndroidOpcodeId.from_int((span[0] >> 0) & 0x3f)
+ value_ = int.from_bytes(span[1:3], byteorder='little')
+ span = span[3:]
payload = span
span = bytes([])
fields['payload'] = payload
@@ -2272,194 +1615,144 @@ class AndroidCommand(UciCommand):
except Exception as exn:
pass
try:
- return AndroidSetCountryCodeCmd.parse(fields.copy(), payload)
+ return AndroidGetPowerStatsRsp.parse(fields.copy(), payload)
except Exception as exn:
pass
- return AndroidCommand(**fields), span
-
- def serialize(self, payload: bytes = None) -> bytes:
- _span = bytearray()
- _span.extend(payload or self.payload or [])
- return UciCommand.serialize(self, payload = bytes(_span))
-
- @property
- def size(self) -> int:
- return len(self.payload)
-
-@dataclass
-class AndroidResponse(UciResponse):
-
-
- def __post_init__(self):
- self.gid = GroupId.VENDOR_ANDROID
- self.mt = MessageType.RESPONSE
-
- @staticmethod
- def parse(fields: dict, span: bytes) -> Tuple['AndroidResponse', bytes]:
- if fields['gid'] != GroupId.VENDOR_ANDROID or fields['mt'] != MessageType.RESPONSE:
- raise Exception("Invalid constraint field values")
- payload = span
- span = bytes([])
- fields['payload'] = payload
try:
- return AndroidGetPowerStatsRsp.parse(fields.copy(), payload)
+ return AndroidSetCountryCodeCmd.parse(fields.copy(), payload)
except Exception as exn:
pass
try:
return AndroidSetCountryCodeRsp.parse(fields.copy(), payload)
except Exception as exn:
pass
- return AndroidResponse(**fields), span
-
- def serialize(self, payload: bytes = None) -> bytes:
- _span = bytearray()
- _span.extend(payload or self.payload or [])
- return UciResponse.serialize(self, payload = bytes(_span))
-
- @property
- def size(self) -> int:
- return len(self.payload)
-
-@dataclass
-class AndroidNotification(UciNotification):
-
-
- def __post_init__(self):
- self.gid = GroupId.VENDOR_ANDROID
- self.mt = MessageType.NOTIFICATION
-
- @staticmethod
- def parse(fields: dict, span: bytes) -> Tuple['AndroidNotification', bytes]:
- if fields['gid'] != GroupId.VENDOR_ANDROID or fields['mt'] != MessageType.NOTIFICATION:
- raise Exception("Invalid constraint field values")
- payload = span
- span = bytes([])
- fields['payload'] = payload
try:
return AndroidRangeDiagnosticsNtf.parse(fields.copy(), payload)
except Exception as exn:
pass
- return AndroidNotification(**fields), span
+ return AndroidPacket(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
+ _span.append((self.oid << 0))
+ _span.extend([0] * 2)
_span.extend(payload or self.payload or [])
- return UciNotification.serialize(self, payload = bytes(_span))
+ return ControlPacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
- return len(self.payload)
+ return len(self.payload) + 3
@dataclass
-class DeviceResetCmd(CoreCommand):
+class CoreDeviceResetCmd(CorePacket):
reset_config: ResetConfig = field(kw_only=True, default=ResetConfig.UWBS_RESET)
def __post_init__(self):
- self.opcode = 0
- self.gid = GroupId.CORE
self.mt = MessageType.COMMAND
+ self.oid = CoreOpcodeId.DEVICE_RESET
+ self.gid = GroupId.CORE
@staticmethod
- def parse(fields: dict, span: bytes) -> Tuple['DeviceResetCmd', bytes]:
- if fields['opcode'] != 0x0 or fields['gid'] != GroupId.CORE or fields['mt'] != MessageType.COMMAND:
+ def parse(fields: dict, span: bytes) -> Tuple['CoreDeviceResetCmd', bytes]:
+ if fields['mt'] != MessageType.COMMAND or fields['oid'] != CoreOpcodeId.DEVICE_RESET or fields['gid'] != GroupId.CORE:
raise Exception("Invalid constraint field values")
if len(span) < 1:
raise Exception('Invalid packet size')
fields['reset_config'] = ResetConfig.from_int(span[0])
span = span[1:]
- return DeviceResetCmd(**fields), span
+ return CoreDeviceResetCmd(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
_span.append((self.reset_config << 0))
- return CoreCommand.serialize(self, payload = bytes(_span))
+ return CorePacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
return 1
@dataclass
-class DeviceResetRsp(CoreResponse):
- status: StatusCode = field(kw_only=True, default=StatusCode.UCI_STATUS_OK)
+class CoreDeviceResetRsp(CorePacket):
+ status: Status = field(kw_only=True, default=Status.OK)
def __post_init__(self):
- self.opcode = 0
- self.gid = GroupId.CORE
self.mt = MessageType.RESPONSE
+ self.oid = CoreOpcodeId.DEVICE_RESET
+ self.gid = GroupId.CORE
@staticmethod
- def parse(fields: dict, span: bytes) -> Tuple['DeviceResetRsp', bytes]:
- if fields['opcode'] != 0x0 or fields['gid'] != GroupId.CORE or fields['mt'] != MessageType.RESPONSE:
+ def parse(fields: dict, span: bytes) -> Tuple['CoreDeviceResetRsp', bytes]:
+ if fields['mt'] != MessageType.RESPONSE or fields['oid'] != CoreOpcodeId.DEVICE_RESET or fields['gid'] != GroupId.CORE:
raise Exception("Invalid constraint field values")
if len(span) < 1:
raise Exception('Invalid packet size')
- fields['status'] = StatusCode.from_int(span[0])
+ fields['status'] = Status.from_int(span[0])
span = span[1:]
- return DeviceResetRsp(**fields), span
+ return CoreDeviceResetRsp(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
_span.append((self.status << 0))
- return CoreResponse.serialize(self, payload = bytes(_span))
+ return CorePacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
return 1
@dataclass
-class DeviceStatusNtf(CoreNotification):
+class CoreDeviceStatusNtf(CorePacket):
device_state: DeviceState = field(kw_only=True, default=DeviceState.DEVICE_STATE_READY)
def __post_init__(self):
- self.opcode = 1
- self.gid = GroupId.CORE
self.mt = MessageType.NOTIFICATION
+ self.oid = CoreOpcodeId.DEVICE_STATUS
+ self.gid = GroupId.CORE
@staticmethod
- def parse(fields: dict, span: bytes) -> Tuple['DeviceStatusNtf', bytes]:
- if fields['opcode'] != 0x1 or fields['gid'] != GroupId.CORE or fields['mt'] != MessageType.NOTIFICATION:
+ def parse(fields: dict, span: bytes) -> Tuple['CoreDeviceStatusNtf', bytes]:
+ if fields['mt'] != MessageType.NOTIFICATION or fields['oid'] != CoreOpcodeId.DEVICE_STATUS or fields['gid'] != GroupId.CORE:
raise Exception("Invalid constraint field values")
if len(span) < 1:
raise Exception('Invalid packet size')
fields['device_state'] = DeviceState.from_int(span[0])
span = span[1:]
- return DeviceStatusNtf(**fields), span
+ return CoreDeviceStatusNtf(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
_span.append((self.device_state << 0))
- return CoreNotification.serialize(self, payload = bytes(_span))
+ return CorePacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
return 1
@dataclass
-class GetDeviceInfoCmd(CoreCommand):
+class CoreGetDeviceInfoCmd(CorePacket):
def __post_init__(self):
- self.opcode = 2
- self.gid = GroupId.CORE
self.mt = MessageType.COMMAND
+ self.oid = CoreOpcodeId.GET_DEVICE_INFO
+ self.gid = GroupId.CORE
@staticmethod
- def parse(fields: dict, span: bytes) -> Tuple['GetDeviceInfoCmd', bytes]:
- if fields['opcode'] != 0x2 or fields['gid'] != GroupId.CORE or fields['mt'] != MessageType.COMMAND:
+ def parse(fields: dict, span: bytes) -> Tuple['CoreGetDeviceInfoCmd', bytes]:
+ if fields['mt'] != MessageType.COMMAND or fields['oid'] != CoreOpcodeId.GET_DEVICE_INFO or fields['gid'] != GroupId.CORE:
raise Exception("Invalid constraint field values")
- return GetDeviceInfoCmd(**fields), span
+ return CoreGetDeviceInfoCmd(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
- return CoreCommand.serialize(self, payload = bytes(_span))
+ return CorePacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
return 0
@dataclass
-class GetDeviceInfoRsp(CoreResponse):
- status: StatusCode = field(kw_only=True, default=StatusCode.UCI_STATUS_OK)
+class CoreGetDeviceInfoRsp(CorePacket):
+ status: Status = field(kw_only=True, default=Status.OK)
uci_version: int = field(kw_only=True, default=0)
mac_version: int = field(kw_only=True, default=0)
phy_version: int = field(kw_only=True, default=0)
@@ -2467,17 +1760,17 @@ class GetDeviceInfoRsp(CoreResponse):
vendor_spec_info: bytearray = field(kw_only=True, default_factory=bytearray)
def __post_init__(self):
- self.opcode = 2
- self.gid = GroupId.CORE
self.mt = MessageType.RESPONSE
+ self.oid = CoreOpcodeId.GET_DEVICE_INFO
+ self.gid = GroupId.CORE
@staticmethod
- def parse(fields: dict, span: bytes) -> Tuple['GetDeviceInfoRsp', bytes]:
- if fields['opcode'] != 0x2 or fields['gid'] != GroupId.CORE or fields['mt'] != MessageType.RESPONSE:
+ def parse(fields: dict, span: bytes) -> Tuple['CoreGetDeviceInfoRsp', bytes]:
+ if fields['mt'] != MessageType.RESPONSE or fields['oid'] != CoreOpcodeId.GET_DEVICE_INFO or fields['gid'] != GroupId.CORE:
raise Exception("Invalid constraint field values")
if len(span) < 10:
raise Exception('Invalid packet size')
- fields['status'] = StatusCode.from_int(span[0])
+ fields['status'] = Status.from_int(span[0])
value_ = int.from_bytes(span[1:3], byteorder='little')
fields['uci_version'] = value_
value_ = int.from_bytes(span[3:5], byteorder='little')
@@ -2492,56 +1785,56 @@ class GetDeviceInfoRsp(CoreResponse):
raise Exception('Invalid packet size')
fields['vendor_spec_info'] = list(span[:vendor_spec_info_count])
span = span[vendor_spec_info_count:]
- return GetDeviceInfoRsp(**fields), span
+ return CoreGetDeviceInfoRsp(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
_span.append((self.status << 0))
if self.uci_version > 65535:
- print(f"Invalid value for field GetDeviceInfoRsp::uci_version: {self.uci_version} > 65535; the value will be truncated")
+ print(f"Invalid value for field CoreGetDeviceInfoRsp::uci_version: {self.uci_version} > 65535; the value will be truncated")
self.uci_version &= 65535
_span.extend(int.to_bytes((self.uci_version << 0), length=2, byteorder='little'))
if self.mac_version > 65535:
- print(f"Invalid value for field GetDeviceInfoRsp::mac_version: {self.mac_version} > 65535; the value will be truncated")
+ print(f"Invalid value for field CoreGetDeviceInfoRsp::mac_version: {self.mac_version} > 65535; the value will be truncated")
self.mac_version &= 65535
_span.extend(int.to_bytes((self.mac_version << 0), length=2, byteorder='little'))
if self.phy_version > 65535:
- print(f"Invalid value for field GetDeviceInfoRsp::phy_version: {self.phy_version} > 65535; the value will be truncated")
+ print(f"Invalid value for field CoreGetDeviceInfoRsp::phy_version: {self.phy_version} > 65535; the value will be truncated")
self.phy_version &= 65535
_span.extend(int.to_bytes((self.phy_version << 0), length=2, byteorder='little'))
if self.uci_test_version > 65535:
- print(f"Invalid value for field GetDeviceInfoRsp::uci_test_version: {self.uci_test_version} > 65535; the value will be truncated")
+ print(f"Invalid value for field CoreGetDeviceInfoRsp::uci_test_version: {self.uci_test_version} > 65535; the value will be truncated")
self.uci_test_version &= 65535
_span.extend(int.to_bytes((self.uci_test_version << 0), length=2, byteorder='little'))
if len(self.vendor_spec_info) > 255:
- print(f"Invalid length for field GetDeviceInfoRsp::vendor_spec_info: {len(self.vendor_spec_info)} > 255; the array will be truncated")
+ print(f"Invalid length for field CoreGetDeviceInfoRsp::vendor_spec_info: {len(self.vendor_spec_info)} > 255; the array will be truncated")
del self.vendor_spec_info[255:]
_span.append((len(self.vendor_spec_info) << 0))
_span.extend(self.vendor_spec_info)
- return CoreResponse.serialize(self, payload = bytes(_span))
+ return CorePacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
return len(self.vendor_spec_info) * 1 + 10
@dataclass
-class GetCapsInfoCmd(CoreCommand):
+class CoreGetCapsInfoCmd(CorePacket):
def __post_init__(self):
- self.opcode = 3
- self.gid = GroupId.CORE
self.mt = MessageType.COMMAND
+ self.oid = CoreOpcodeId.GET_CAPS_INFO
+ self.gid = GroupId.CORE
@staticmethod
- def parse(fields: dict, span: bytes) -> Tuple['GetCapsInfoCmd', bytes]:
- if fields['opcode'] != 0x3 or fields['gid'] != GroupId.CORE or fields['mt'] != MessageType.COMMAND:
+ def parse(fields: dict, span: bytes) -> Tuple['CoreGetCapsInfoCmd', bytes]:
+ if fields['mt'] != MessageType.COMMAND or fields['oid'] != CoreOpcodeId.GET_CAPS_INFO or fields['gid'] != GroupId.CORE:
raise Exception("Invalid constraint field values")
- return GetCapsInfoCmd(**fields), span
+ return CoreGetCapsInfoCmd(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
- return CoreCommand.serialize(self, payload = bytes(_span))
+ return CorePacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
@@ -2584,22 +1877,22 @@ class CapTlv(Packet):
return len(self.v) * 1 + 2
@dataclass
-class GetCapsInfoRsp(CoreResponse):
- status: StatusCode = field(kw_only=True, default=StatusCode.UCI_STATUS_OK)
+class CoreGetCapsInfoRsp(CorePacket):
+ status: Status = field(kw_only=True, default=Status.OK)
tlvs: List[CapTlv] = field(kw_only=True, default_factory=list)
def __post_init__(self):
- self.opcode = 3
- self.gid = GroupId.CORE
self.mt = MessageType.RESPONSE
+ self.oid = CoreOpcodeId.GET_CAPS_INFO
+ self.gid = GroupId.CORE
@staticmethod
- def parse(fields: dict, span: bytes) -> Tuple['GetCapsInfoRsp', bytes]:
- if fields['opcode'] != 0x3 or fields['gid'] != GroupId.CORE or fields['mt'] != MessageType.RESPONSE:
+ def parse(fields: dict, span: bytes) -> Tuple['CoreGetCapsInfoRsp', bytes]:
+ if fields['mt'] != MessageType.RESPONSE or fields['oid'] != CoreOpcodeId.GET_CAPS_INFO or fields['gid'] != GroupId.CORE:
raise Exception("Invalid constraint field values")
if len(span) < 2:
raise Exception('Invalid packet size')
- fields['status'] = StatusCode.from_int(span[0])
+ fields['status'] = Status.from_int(span[0])
tlvs_count = span[1]
span = span[2:]
tlvs = []
@@ -2607,118 +1900,127 @@ class GetCapsInfoRsp(CoreResponse):
element, span = CapTlv.parse(span)
tlvs.append(element)
fields['tlvs'] = tlvs
- return GetCapsInfoRsp(**fields), span
+ return CoreGetCapsInfoRsp(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
_span.append((self.status << 0))
if len(self.tlvs) > 255:
- print(f"Invalid length for field GetCapsInfoRsp::tlvs: {len(self.tlvs)} > 255; the array will be truncated")
+ print(f"Invalid length for field CoreGetCapsInfoRsp::tlvs: {len(self.tlvs)} > 255; the array will be truncated")
del self.tlvs[255:]
_span.append((len(self.tlvs) << 0))
for _elt in self.tlvs:
_span.extend(_elt.serialize())
- return CoreResponse.serialize(self, payload = bytes(_span))
+ return CorePacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
return sum([elt.size for elt in self.tlvs]) + 2
+class ConfigParameterId(enum.IntEnum):
+ DEVICE_STATE = 0x0
+ LOW_POWER_MODE = 0x1
+
+ @staticmethod
+ def from_int(v: int) -> Union[int, 'ConfigParameterId']:
+ try:
+ return ConfigParameterId(v)
+ except ValueError as exn:
+ return v
+
+
@dataclass
-class DeviceConfigTlv(Packet):
- cfg_id: DeviceConfigId = field(kw_only=True, default=DeviceConfigId.DEVICE_STATE)
- v: bytearray = field(kw_only=True, default_factory=bytearray)
+class ConfigParameter(Packet):
+ id: ConfigParameterId = field(kw_only=True, default=ConfigParameterId.DEVICE_STATE)
+ value: bytearray = field(kw_only=True, default_factory=bytearray)
def __post_init__(self):
pass
@staticmethod
- def parse(span: bytes) -> Tuple['DeviceConfigTlv', bytes]:
+ def parse(span: bytes) -> Tuple['ConfigParameter', bytes]:
fields = {'payload': None}
if len(span) < 2:
raise Exception('Invalid packet size')
- fields['cfg_id'] = DeviceConfigId.from_int(span[0])
- v_count = span[1]
+ fields['id'] = ConfigParameterId.from_int(span[0])
+ value_size = span[1]
span = span[2:]
- if len(span) < v_count:
+ if len(span) < value_size:
raise Exception('Invalid packet size')
- fields['v'] = list(span[:v_count])
- span = span[v_count:]
- return DeviceConfigTlv(**fields), span
+ fields['value'] = list(span[:value_size])
+ span = span[value_size:]
+ return ConfigParameter(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
- _span.append((self.cfg_id << 0))
- if len(self.v) > 255:
- print(f"Invalid length for field DeviceConfigTlv::v: {len(self.v)} > 255; the array will be truncated")
- del self.v[255:]
- _span.append((len(self.v) << 0))
- _span.extend(self.v)
+ _span.append((self.id << 0))
+ _span.append(((len(self.value) * 1) << 0))
+ _span.extend(self.value)
return bytes(_span)
@property
def size(self) -> int:
- return len(self.v) * 1 + 2
+ return len(self.value) * 1 + 2
@dataclass
-class SetConfigCmd(CoreCommand):
- tlvs: List[DeviceConfigTlv] = field(kw_only=True, default_factory=list)
+class CoreSetConfigCmd(CorePacket):
+ parameters: List[ConfigParameter] = field(kw_only=True, default_factory=list)
def __post_init__(self):
- self.opcode = 4
- self.gid = GroupId.CORE
self.mt = MessageType.COMMAND
+ self.oid = CoreOpcodeId.SET_CONFIG
+ self.gid = GroupId.CORE
@staticmethod
- def parse(fields: dict, span: bytes) -> Tuple['SetConfigCmd', bytes]:
- if fields['opcode'] != 0x4 or fields['gid'] != GroupId.CORE or fields['mt'] != MessageType.COMMAND:
+ def parse(fields: dict, span: bytes) -> Tuple['CoreSetConfigCmd', bytes]:
+ if fields['mt'] != MessageType.COMMAND or fields['oid'] != CoreOpcodeId.SET_CONFIG or fields['gid'] != GroupId.CORE:
raise Exception("Invalid constraint field values")
if len(span) < 1:
raise Exception('Invalid packet size')
- tlvs_count = span[0]
+ parameters_count = span[0]
span = span[1:]
- tlvs = []
- for n in range(tlvs_count):
- element, span = DeviceConfigTlv.parse(span)
- tlvs.append(element)
- fields['tlvs'] = tlvs
- return SetConfigCmd(**fields), span
+ parameters = []
+ for n in range(parameters_count):
+ element, span = ConfigParameter.parse(span)
+ parameters.append(element)
+ fields['parameters'] = parameters
+ return CoreSetConfigCmd(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
- if len(self.tlvs) > 255:
- print(f"Invalid length for field SetConfigCmd::tlvs: {len(self.tlvs)} > 255; the array will be truncated")
- del self.tlvs[255:]
- _span.append((len(self.tlvs) << 0))
- for _elt in self.tlvs:
+ if len(self.parameters) > 255:
+ print(f"Invalid length for field CoreSetConfigCmd::parameters: {len(self.parameters)} > 255; the array will be truncated")
+ del self.parameters[255:]
+ _span.append((len(self.parameters) << 0))
+ for _elt in self.parameters:
_span.extend(_elt.serialize())
- return CoreCommand.serialize(self, payload = bytes(_span))
+ return CorePacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
- return sum([elt.size for elt in self.tlvs]) + 1
+ return sum([elt.size for elt in self.parameters]) + 1
@dataclass
-class DeviceConfigStatus(Packet):
- cfg_id: DeviceConfigId = field(kw_only=True, default=DeviceConfigId.DEVICE_STATE)
- status: StatusCode = field(kw_only=True, default=StatusCode.UCI_STATUS_OK)
+class ConfigParameterStatus(Packet):
+ id: ConfigParameterId = field(kw_only=True, default=ConfigParameterId.DEVICE_STATE)
+ status: Status = field(kw_only=True, default=Status.OK)
def __post_init__(self):
pass
@staticmethod
- def parse(span: bytes) -> Tuple['DeviceConfigStatus', bytes]:
+ def parse(span: bytes) -> Tuple['ConfigParameterStatus', bytes]:
fields = {'payload': None}
if len(span) < 2:
raise Exception('Invalid packet size')
- fields['cfg_id'] = DeviceConfigId.from_int(span[0])
- fields['status'] = StatusCode.from_int(span[1])
+ fields['id'] = ConfigParameterId.from_int(span[0])
+ fields['status'] = Status.from_int(span[1])
span = span[2:]
- return DeviceConfigStatus(**fields), span
+ return ConfigParameterStatus(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
- _span.append((self.cfg_id << 0))
+ _span.append((self.id << 0))
_span.append((self.status << 0))
return bytes(_span)
@@ -2727,193 +2029,197 @@ class DeviceConfigStatus(Packet):
return 2
@dataclass
-class SetConfigRsp(CoreResponse):
- status: StatusCode = field(kw_only=True, default=StatusCode.UCI_STATUS_OK)
- cfg_status: List[DeviceConfigStatus] = field(kw_only=True, default_factory=list)
+class CoreSetConfigRsp(CorePacket):
+ status: Status = field(kw_only=True, default=Status.OK)
+ parameters: List[ConfigParameterStatus] = field(kw_only=True, default_factory=list)
def __post_init__(self):
- self.opcode = 4
- self.gid = GroupId.CORE
self.mt = MessageType.RESPONSE
+ self.oid = CoreOpcodeId.SET_CONFIG
+ self.gid = GroupId.CORE
@staticmethod
- def parse(fields: dict, span: bytes) -> Tuple['SetConfigRsp', bytes]:
- if fields['opcode'] != 0x4 or fields['gid'] != GroupId.CORE or fields['mt'] != MessageType.RESPONSE:
+ def parse(fields: dict, span: bytes) -> Tuple['CoreSetConfigRsp', bytes]:
+ if fields['mt'] != MessageType.RESPONSE or fields['oid'] != CoreOpcodeId.SET_CONFIG or fields['gid'] != GroupId.CORE:
raise Exception("Invalid constraint field values")
if len(span) < 2:
raise Exception('Invalid packet size')
- fields['status'] = StatusCode.from_int(span[0])
- cfg_status_count = span[1]
+ fields['status'] = Status.from_int(span[0])
+ parameters_count = span[1]
span = span[2:]
- if len(span) < cfg_status_count * 2:
+ if len(span) < parameters_count * 2:
raise Exception('Invalid packet size')
- cfg_status = []
- for n in range(cfg_status_count):
- cfg_status.append(DeviceConfigStatus.parse_all(span[n * 2:(n + 1) * 2]))
- fields['cfg_status'] = cfg_status
- span = span[cfg_status_count * 2:]
- return SetConfigRsp(**fields), span
+ parameters = []
+ for n in range(parameters_count):
+ parameters.append(ConfigParameterStatus.parse_all(span[n * 2:(n + 1) * 2]))
+ fields['parameters'] = parameters
+ span = span[parameters_count * 2:]
+ return CoreSetConfigRsp(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
_span.append((self.status << 0))
- if len(self.cfg_status) > 255:
- print(f"Invalid length for field SetConfigRsp::cfg_status: {len(self.cfg_status)} > 255; the array will be truncated")
- del self.cfg_status[255:]
- _span.append((len(self.cfg_status) << 0))
- for _elt in self.cfg_status:
+ if len(self.parameters) > 255:
+ print(f"Invalid length for field CoreSetConfigRsp::parameters: {len(self.parameters)} > 255; the array will be truncated")
+ del self.parameters[255:]
+ _span.append((len(self.parameters) << 0))
+ for _elt in self.parameters:
_span.extend(_elt.serialize())
- return CoreResponse.serialize(self, payload = bytes(_span))
+ return CorePacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
- return sum([elt.size for elt in self.cfg_status]) + 2
+ return sum([elt.size for elt in self.parameters]) + 2
@dataclass
-class GetConfigCmd(CoreCommand):
- cfg_id: bytearray = field(kw_only=True, default_factory=bytearray)
+class CoreGetConfigCmd(CorePacket):
+ parameter_ids: List[ConfigParameterId] = field(kw_only=True, default_factory=list)
def __post_init__(self):
- self.opcode = 5
- self.gid = GroupId.CORE
self.mt = MessageType.COMMAND
+ self.oid = CoreOpcodeId.GET_CONFIG
+ self.gid = GroupId.CORE
@staticmethod
- def parse(fields: dict, span: bytes) -> Tuple['GetConfigCmd', bytes]:
- if fields['opcode'] != 0x5 or fields['gid'] != GroupId.CORE or fields['mt'] != MessageType.COMMAND:
+ def parse(fields: dict, span: bytes) -> Tuple['CoreGetConfigCmd', bytes]:
+ if fields['mt'] != MessageType.COMMAND or fields['oid'] != CoreOpcodeId.GET_CONFIG or fields['gid'] != GroupId.CORE:
raise Exception("Invalid constraint field values")
if len(span) < 1:
raise Exception('Invalid packet size')
- cfg_id_count = span[0]
+ parameter_ids_count = span[0]
span = span[1:]
- if len(span) < cfg_id_count:
+ if len(span) < parameter_ids_count:
raise Exception('Invalid packet size')
- fields['cfg_id'] = list(span[:cfg_id_count])
- span = span[cfg_id_count:]
- return GetConfigCmd(**fields), span
+ parameter_ids = []
+ for n in range(parameter_ids_count):
+ parameter_ids.append(ConfigParameterId(int.from_bytes(span[n:n + 1], byteorder='little')))
+ fields['parameter_ids'] = parameter_ids
+ span = span[parameter_ids_count:]
+ return CoreGetConfigCmd(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
- if len(self.cfg_id) > 255:
- print(f"Invalid length for field GetConfigCmd::cfg_id: {len(self.cfg_id)} > 255; the array will be truncated")
- del self.cfg_id[255:]
- _span.append((len(self.cfg_id) << 0))
- _span.extend(self.cfg_id)
- return CoreCommand.serialize(self, payload = bytes(_span))
+ if len(self.parameter_ids) > 255:
+ print(f"Invalid length for field CoreGetConfigCmd::parameter_ids: {len(self.parameter_ids)} > 255; the array will be truncated")
+ del self.parameter_ids[255:]
+ _span.append((len(self.parameter_ids) << 0))
+ for _elt in self.parameter_ids:
+ _span.append(_elt)
+ return CorePacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
- return len(self.cfg_id) * 1 + 1
+ return len(self.parameter_ids) * 8 + 1
@dataclass
-class GetConfigRsp(CoreResponse):
- status: StatusCode = field(kw_only=True, default=StatusCode.UCI_STATUS_OK)
- tlvs: List[DeviceConfigTlv] = field(kw_only=True, default_factory=list)
+class CoreGetConfigRsp(CorePacket):
+ status: Status = field(kw_only=True, default=Status.OK)
+ parameters: List[ConfigParameter] = field(kw_only=True, default_factory=list)
def __post_init__(self):
- self.opcode = 5
- self.gid = GroupId.CORE
self.mt = MessageType.RESPONSE
+ self.oid = CoreOpcodeId.GET_CONFIG
+ self.gid = GroupId.CORE
@staticmethod
- def parse(fields: dict, span: bytes) -> Tuple['GetConfigRsp', bytes]:
- if fields['opcode'] != 0x5 or fields['gid'] != GroupId.CORE or fields['mt'] != MessageType.RESPONSE:
+ def parse(fields: dict, span: bytes) -> Tuple['CoreGetConfigRsp', bytes]:
+ if fields['mt'] != MessageType.RESPONSE or fields['oid'] != CoreOpcodeId.GET_CONFIG or fields['gid'] != GroupId.CORE:
raise Exception("Invalid constraint field values")
if len(span) < 2:
raise Exception('Invalid packet size')
- fields['status'] = StatusCode.from_int(span[0])
- tlvs_count = span[1]
+ fields['status'] = Status.from_int(span[0])
+ parameters_count = span[1]
span = span[2:]
- tlvs = []
- for n in range(tlvs_count):
- element, span = DeviceConfigTlv.parse(span)
- tlvs.append(element)
- fields['tlvs'] = tlvs
- return GetConfigRsp(**fields), span
+ parameters = []
+ for n in range(parameters_count):
+ element, span = ConfigParameter.parse(span)
+ parameters.append(element)
+ fields['parameters'] = parameters
+ return CoreGetConfigRsp(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
_span.append((self.status << 0))
- if len(self.tlvs) > 255:
- print(f"Invalid length for field GetConfigRsp::tlvs: {len(self.tlvs)} > 255; the array will be truncated")
- del self.tlvs[255:]
- _span.append((len(self.tlvs) << 0))
- for _elt in self.tlvs:
+ if len(self.parameters) > 255:
+ print(f"Invalid length for field CoreGetConfigRsp::parameters: {len(self.parameters)} > 255; the array will be truncated")
+ del self.parameters[255:]
+ _span.append((len(self.parameters) << 0))
+ for _elt in self.parameters:
_span.extend(_elt.serialize())
- return CoreResponse.serialize(self, payload = bytes(_span))
+ return CorePacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
- return sum([elt.size for elt in self.tlvs]) + 2
+ return sum([elt.size for elt in self.parameters]) + 2
@dataclass
-class GenericError(CoreNotification):
- status: StatusCode = field(kw_only=True, default=StatusCode.UCI_STATUS_OK)
+class CoreGenericErrorNtf(CorePacket):
+ status: Status = field(kw_only=True, default=Status.OK)
def __post_init__(self):
- self.opcode = 7
- self.gid = GroupId.CORE
self.mt = MessageType.NOTIFICATION
+ self.oid = CoreOpcodeId.GENERIC_ERROR
+ self.gid = GroupId.CORE
@staticmethod
- def parse(fields: dict, span: bytes) -> Tuple['GenericError', bytes]:
- if fields['opcode'] != 0x7 or fields['gid'] != GroupId.CORE or fields['mt'] != MessageType.NOTIFICATION:
+ def parse(fields: dict, span: bytes) -> Tuple['CoreGenericErrorNtf', bytes]:
+ if fields['mt'] != MessageType.NOTIFICATION or fields['oid'] != CoreOpcodeId.GENERIC_ERROR or fields['gid'] != GroupId.CORE:
raise Exception("Invalid constraint field values")
if len(span) < 1:
raise Exception('Invalid packet size')
- fields['status'] = StatusCode.from_int(span[0])
+ fields['status'] = Status.from_int(span[0])
span = span[1:]
- return GenericError(**fields), span
+ return CoreGenericErrorNtf(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
_span.append((self.status << 0))
- return CoreNotification.serialize(self, payload = bytes(_span))
+ return CorePacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
return 1
@dataclass
-class CoreQueryTimeStampCmd(CoreCommand):
+class CoreQueryTimeStampCmd(CorePacket):
def __post_init__(self):
- self.opcode = 8
- self.gid = GroupId.CORE
self.mt = MessageType.COMMAND
+ self.oid = CoreOpcodeId.QUERY_UWBS_TIMESTAMP
+ self.gid = GroupId.CORE
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['CoreQueryTimeStampCmd', bytes]:
- if fields['opcode'] != 0x8 or fields['gid'] != GroupId.CORE or fields['mt'] != MessageType.COMMAND:
+ if fields['mt'] != MessageType.COMMAND or fields['oid'] != CoreOpcodeId.QUERY_UWBS_TIMESTAMP or fields['gid'] != GroupId.CORE:
raise Exception("Invalid constraint field values")
return CoreQueryTimeStampCmd(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
- return CoreCommand.serialize(self, payload = bytes(_span))
+ return CorePacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
return 0
@dataclass
-class CoreQueryTimeStampRsp(CoreResponse):
- status: StatusCode = field(kw_only=True, default=StatusCode.UCI_STATUS_OK)
+class CoreQueryTimeStampRsp(CorePacket):
+ status: Status = field(kw_only=True, default=Status.OK)
timeStamp: int = field(kw_only=True, default=0)
def __post_init__(self):
- self.opcode = 8
- self.gid = GroupId.CORE
self.mt = MessageType.RESPONSE
+ self.oid = CoreOpcodeId.QUERY_UWBS_TIMESTAMP
+ self.gid = GroupId.CORE
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['CoreQueryTimeStampRsp', bytes]:
- if fields['opcode'] != 0x8 or fields['gid'] != GroupId.CORE or fields['mt'] != MessageType.RESPONSE:
+ if fields['mt'] != MessageType.RESPONSE or fields['oid'] != CoreOpcodeId.QUERY_UWBS_TIMESTAMP or fields['gid'] != GroupId.CORE:
raise Exception("Invalid constraint field values")
if len(span) < 9:
raise Exception('Invalid packet size')
- fields['status'] = StatusCode.from_int(span[0])
+ fields['status'] = Status.from_int(span[0])
value_ = int.from_bytes(span[1:9], byteorder='little')
fields['timeStamp'] = value_
span = span[9:]
@@ -2926,25 +2232,25 @@ class CoreQueryTimeStampRsp(CoreResponse):
print(f"Invalid value for field CoreQueryTimeStampRsp::timeStamp: {self.timeStamp} > 18446744073709551615; the value will be truncated")
self.timeStamp &= 18446744073709551615
_span.extend(int.to_bytes((self.timeStamp << 0), length=8, byteorder='little'))
- return CoreResponse.serialize(self, payload = bytes(_span))
+ return CorePacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
return 9
@dataclass
-class SessionInitCmd(SessionConfigCommand):
+class SessionInitCmd(SessionConfigPacket):
session_id: int = field(kw_only=True, default=0)
session_type: SessionType = field(kw_only=True, default=SessionType.FIRA_RANGING_SESSION)
def __post_init__(self):
- self.opcode = 0
- self.gid = GroupId.SESSION_CONFIG
self.mt = MessageType.COMMAND
+ self.oid = SessionConfigOpcodeId.INIT
+ self.gid = GroupId.SESSION_CONFIG
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['SessionInitCmd', bytes]:
- if fields['opcode'] != 0x0 or fields['gid'] != GroupId.SESSION_CONFIG or fields['mt'] != MessageType.COMMAND:
+ if fields['mt'] != MessageType.COMMAND or fields['oid'] != SessionConfigOpcodeId.INIT or fields['gid'] != GroupId.SESSION_CONFIG:
raise Exception("Invalid constraint field values")
if len(span) < 5:
raise Exception('Invalid packet size')
@@ -2961,29 +2267,29 @@ class SessionInitCmd(SessionConfigCommand):
self.session_id &= 4294967295
_span.extend(int.to_bytes((self.session_id << 0), length=4, byteorder='little'))
_span.append((self.session_type << 0))
- return SessionConfigCommand.serialize(self, payload = bytes(_span))
+ return SessionConfigPacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
return 5
@dataclass
-class SessionInitRsp_V2(SessionConfigResponse):
- status: StatusCode = field(kw_only=True, default=StatusCode.UCI_STATUS_OK)
+class SessionInitRsp_V2(SessionConfigPacket):
+ status: Status = field(kw_only=True, default=Status.OK)
session_handle: int = field(kw_only=True, default=0)
def __post_init__(self):
- self.opcode = 0
- self.gid = GroupId.SESSION_CONFIG
self.mt = MessageType.RESPONSE
+ self.oid = SessionConfigOpcodeId.INIT
+ self.gid = GroupId.SESSION_CONFIG
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['SessionInitRsp_V2', bytes]:
- if fields['opcode'] != 0x0 or fields['gid'] != GroupId.SESSION_CONFIG or fields['mt'] != MessageType.RESPONSE:
+ if fields['mt'] != MessageType.RESPONSE or fields['oid'] != SessionConfigOpcodeId.INIT or fields['gid'] != GroupId.SESSION_CONFIG:
raise Exception("Invalid constraint field values")
if len(span) < 5:
raise Exception('Invalid packet size')
- fields['status'] = StatusCode.from_int(span[0])
+ fields['status'] = Status.from_int(span[0])
value_ = int.from_bytes(span[1:5], byteorder='little')
fields['session_handle'] = value_
span = span[5:]
@@ -2996,52 +2302,52 @@ class SessionInitRsp_V2(SessionConfigResponse):
print(f"Invalid value for field SessionInitRsp_V2::session_handle: {self.session_handle} > 4294967295; the value will be truncated")
self.session_handle &= 4294967295
_span.extend(int.to_bytes((self.session_handle << 0), length=4, byteorder='little'))
- return SessionConfigResponse.serialize(self, payload = bytes(_span))
+ return SessionConfigPacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
return 5
@dataclass
-class SessionInitRsp(SessionConfigResponse):
- status: StatusCode = field(kw_only=True, default=StatusCode.UCI_STATUS_OK)
+class SessionInitRsp(SessionConfigPacket):
+ status: Status = field(kw_only=True, default=Status.OK)
def __post_init__(self):
- self.opcode = 0
- self.gid = GroupId.SESSION_CONFIG
self.mt = MessageType.RESPONSE
+ self.oid = SessionConfigOpcodeId.INIT
+ self.gid = GroupId.SESSION_CONFIG
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['SessionInitRsp', bytes]:
- if fields['opcode'] != 0x0 or fields['gid'] != GroupId.SESSION_CONFIG or fields['mt'] != MessageType.RESPONSE:
+ if fields['mt'] != MessageType.RESPONSE or fields['oid'] != SessionConfigOpcodeId.INIT or fields['gid'] != GroupId.SESSION_CONFIG:
raise Exception("Invalid constraint field values")
if len(span) < 1:
raise Exception('Invalid packet size')
- fields['status'] = StatusCode.from_int(span[0])
+ fields['status'] = Status.from_int(span[0])
span = span[1:]
return SessionInitRsp(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
_span.append((self.status << 0))
- return SessionConfigResponse.serialize(self, payload = bytes(_span))
+ return SessionConfigPacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
return 1
@dataclass
-class SessionDeinitCmd(SessionConfigCommand):
+class SessionDeinitCmd(SessionConfigPacket):
session_token: int = field(kw_only=True, default=0)
def __post_init__(self):
- self.opcode = 1
- self.gid = GroupId.SESSION_CONFIG
self.mt = MessageType.COMMAND
+ self.oid = SessionConfigOpcodeId.DEINIT
+ self.gid = GroupId.SESSION_CONFIG
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['SessionDeinitCmd', bytes]:
- if fields['opcode'] != 0x1 or fields['gid'] != GroupId.SESSION_CONFIG or fields['mt'] != MessageType.COMMAND:
+ if fields['mt'] != MessageType.COMMAND or fields['oid'] != SessionConfigOpcodeId.DEINIT or fields['gid'] != GroupId.SESSION_CONFIG:
raise Exception("Invalid constraint field values")
if len(span) < 4:
raise Exception('Invalid packet size')
@@ -3056,54 +2362,54 @@ class SessionDeinitCmd(SessionConfigCommand):
print(f"Invalid value for field SessionDeinitCmd::session_token: {self.session_token} > 4294967295; the value will be truncated")
self.session_token &= 4294967295
_span.extend(int.to_bytes((self.session_token << 0), length=4, byteorder='little'))
- return SessionConfigCommand.serialize(self, payload = bytes(_span))
+ return SessionConfigPacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
return 4
@dataclass
-class SessionDeinitRsp(SessionConfigResponse):
- status: StatusCode = field(kw_only=True, default=StatusCode.UCI_STATUS_OK)
+class SessionDeinitRsp(SessionConfigPacket):
+ status: Status = field(kw_only=True, default=Status.OK)
def __post_init__(self):
- self.opcode = 1
- self.gid = GroupId.SESSION_CONFIG
self.mt = MessageType.RESPONSE
+ self.oid = SessionConfigOpcodeId.DEINIT
+ self.gid = GroupId.SESSION_CONFIG
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['SessionDeinitRsp', bytes]:
- if fields['opcode'] != 0x1 or fields['gid'] != GroupId.SESSION_CONFIG or fields['mt'] != MessageType.RESPONSE:
+ if fields['mt'] != MessageType.RESPONSE or fields['oid'] != SessionConfigOpcodeId.DEINIT or fields['gid'] != GroupId.SESSION_CONFIG:
raise Exception("Invalid constraint field values")
if len(span) < 1:
raise Exception('Invalid packet size')
- fields['status'] = StatusCode.from_int(span[0])
+ fields['status'] = Status.from_int(span[0])
span = span[1:]
return SessionDeinitRsp(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
_span.append((self.status << 0))
- return SessionConfigResponse.serialize(self, payload = bytes(_span))
+ return SessionConfigPacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
return 1
@dataclass
-class SessionStatusNtf(SessionConfigNotification):
+class SessionStatusNtf(SessionConfigPacket):
session_token: int = field(kw_only=True, default=0)
session_state: SessionState = field(kw_only=True, default=SessionState.SESSION_STATE_INIT)
reason_code: int = field(kw_only=True, default=0)
def __post_init__(self):
- self.opcode = 2
- self.gid = GroupId.SESSION_CONFIG
self.mt = MessageType.NOTIFICATION
+ self.oid = SessionConfigOpcodeId.STATUS
+ self.gid = GroupId.SESSION_CONFIG
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['SessionStatusNtf', bytes]:
- if fields['opcode'] != 0x2 or fields['gid'] != GroupId.SESSION_CONFIG or fields['mt'] != MessageType.NOTIFICATION:
+ if fields['mt'] != MessageType.NOTIFICATION or fields['oid'] != SessionConfigOpcodeId.STATUS or fields['gid'] != GroupId.SESSION_CONFIG:
raise Exception("Invalid constraint field values")
if len(span) < 6:
raise Exception('Invalid packet size')
@@ -3125,7 +2431,7 @@ class SessionStatusNtf(SessionConfigNotification):
print(f"Invalid value for field SessionStatusNtf::reason_code: {self.reason_code} > 255; the value will be truncated")
self.reason_code &= 255
_span.append((self.reason_code << 0))
- return SessionConfigNotification.serialize(self, payload = bytes(_span))
+ return SessionConfigPacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
@@ -3168,18 +2474,18 @@ class AppConfigTlv(Packet):
return len(self.v) * 1 + 2
@dataclass
-class SessionSetAppConfigCmd(SessionConfigCommand):
+class SessionSetAppConfigCmd(SessionConfigPacket):
session_token: int = field(kw_only=True, default=0)
tlvs: List[AppConfigTlv] = field(kw_only=True, default_factory=list)
def __post_init__(self):
- self.opcode = 3
- self.gid = GroupId.SESSION_CONFIG
self.mt = MessageType.COMMAND
+ self.oid = SessionConfigOpcodeId.SET_APP_CONFIG
+ self.gid = GroupId.SESSION_CONFIG
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['SessionSetAppConfigCmd', bytes]:
- if fields['opcode'] != 0x3 or fields['gid'] != GroupId.SESSION_CONFIG or fields['mt'] != MessageType.COMMAND:
+ if fields['mt'] != MessageType.COMMAND or fields['oid'] != SessionConfigOpcodeId.SET_APP_CONFIG or fields['gid'] != GroupId.SESSION_CONFIG:
raise Exception("Invalid constraint field values")
if len(span) < 5:
raise Exception('Invalid packet size')
@@ -3206,7 +2512,7 @@ class SessionSetAppConfigCmd(SessionConfigCommand):
_span.append((len(self.tlvs) << 0))
for _elt in self.tlvs:
_span.extend(_elt.serialize())
- return SessionConfigCommand.serialize(self, payload = bytes(_span))
+ return SessionConfigPacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
@@ -3215,7 +2521,7 @@ class SessionSetAppConfigCmd(SessionConfigCommand):
@dataclass
class AppConfigStatus(Packet):
cfg_id: AppConfigTlvType = field(kw_only=True, default=AppConfigTlvType.DEVICE_TYPE)
- status: StatusCode = field(kw_only=True, default=StatusCode.UCI_STATUS_OK)
+ status: Status = field(kw_only=True, default=Status.OK)
def __post_init__(self):
pass
@@ -3226,7 +2532,7 @@ class AppConfigStatus(Packet):
if len(span) < 2:
raise Exception('Invalid packet size')
fields['cfg_id'] = AppConfigTlvType.from_int(span[0])
- fields['status'] = StatusCode.from_int(span[1])
+ fields['status'] = Status.from_int(span[1])
span = span[2:]
return AppConfigStatus(**fields), span
@@ -3241,22 +2547,22 @@ class AppConfigStatus(Packet):
return 2
@dataclass
-class SessionSetAppConfigRsp(SessionConfigResponse):
- status: StatusCode = field(kw_only=True, default=StatusCode.UCI_STATUS_OK)
+class SessionSetAppConfigRsp(SessionConfigPacket):
+ status: Status = field(kw_only=True, default=Status.OK)
cfg_status: List[AppConfigStatus] = field(kw_only=True, default_factory=list)
def __post_init__(self):
- self.opcode = 3
- self.gid = GroupId.SESSION_CONFIG
self.mt = MessageType.RESPONSE
+ self.oid = SessionConfigOpcodeId.SET_APP_CONFIG
+ self.gid = GroupId.SESSION_CONFIG
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['SessionSetAppConfigRsp', bytes]:
- if fields['opcode'] != 0x3 or fields['gid'] != GroupId.SESSION_CONFIG or fields['mt'] != MessageType.RESPONSE:
+ if fields['mt'] != MessageType.RESPONSE or fields['oid'] != SessionConfigOpcodeId.SET_APP_CONFIG or fields['gid'] != GroupId.SESSION_CONFIG:
raise Exception("Invalid constraint field values")
if len(span) < 2:
raise Exception('Invalid packet size')
- fields['status'] = StatusCode.from_int(span[0])
+ fields['status'] = Status.from_int(span[0])
cfg_status_count = span[1]
span = span[2:]
if len(span) < cfg_status_count * 2:
@@ -3277,25 +2583,25 @@ class SessionSetAppConfigRsp(SessionConfigResponse):
_span.append((len(self.cfg_status) << 0))
for _elt in self.cfg_status:
_span.extend(_elt.serialize())
- return SessionConfigResponse.serialize(self, payload = bytes(_span))
+ return SessionConfigPacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
return sum([elt.size for elt in self.cfg_status]) + 2
@dataclass
-class SessionGetAppConfigCmd(SessionConfigCommand):
+class SessionGetAppConfigCmd(SessionConfigPacket):
session_token: int = field(kw_only=True, default=0)
app_cfg: List[AppConfigTlvType] = field(kw_only=True, default_factory=list)
def __post_init__(self):
- self.opcode = 4
- self.gid = GroupId.SESSION_CONFIG
self.mt = MessageType.COMMAND
+ self.oid = SessionConfigOpcodeId.GET_APP_CONFIG
+ self.gid = GroupId.SESSION_CONFIG
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['SessionGetAppConfigCmd', bytes]:
- if fields['opcode'] != 0x4 or fields['gid'] != GroupId.SESSION_CONFIG or fields['mt'] != MessageType.COMMAND:
+ if fields['mt'] != MessageType.COMMAND or fields['oid'] != SessionConfigOpcodeId.GET_APP_CONFIG or fields['gid'] != GroupId.SESSION_CONFIG:
raise Exception("Invalid constraint field values")
if len(span) < 5:
raise Exception('Invalid packet size')
@@ -3324,29 +2630,29 @@ class SessionGetAppConfigCmd(SessionConfigCommand):
_span.append((len(self.app_cfg) << 0))
for _elt in self.app_cfg:
_span.append(_elt)
- return SessionConfigCommand.serialize(self, payload = bytes(_span))
+ return SessionConfigPacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
return len(self.app_cfg) * 8 + 5
@dataclass
-class SessionGetAppConfigRsp(SessionConfigResponse):
- status: StatusCode = field(kw_only=True, default=StatusCode.UCI_STATUS_OK)
+class SessionGetAppConfigRsp(SessionConfigPacket):
+ status: Status = field(kw_only=True, default=Status.OK)
tlvs: List[AppConfigTlv] = field(kw_only=True, default_factory=list)
def __post_init__(self):
- self.opcode = 4
- self.gid = GroupId.SESSION_CONFIG
self.mt = MessageType.RESPONSE
+ self.oid = SessionConfigOpcodeId.GET_APP_CONFIG
+ self.gid = GroupId.SESSION_CONFIG
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['SessionGetAppConfigRsp', bytes]:
- if fields['opcode'] != 0x4 or fields['gid'] != GroupId.SESSION_CONFIG or fields['mt'] != MessageType.RESPONSE:
+ if fields['mt'] != MessageType.RESPONSE or fields['oid'] != SessionConfigOpcodeId.GET_APP_CONFIG or fields['gid'] != GroupId.SESSION_CONFIG:
raise Exception("Invalid constraint field values")
if len(span) < 2:
raise Exception('Invalid packet size')
- fields['status'] = StatusCode.from_int(span[0])
+ fields['status'] = Status.from_int(span[0])
tlvs_count = span[1]
span = span[2:]
tlvs = []
@@ -3365,52 +2671,52 @@ class SessionGetAppConfigRsp(SessionConfigResponse):
_span.append((len(self.tlvs) << 0))
for _elt in self.tlvs:
_span.extend(_elt.serialize())
- return SessionConfigResponse.serialize(self, payload = bytes(_span))
+ return SessionConfigPacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
return sum([elt.size for elt in self.tlvs]) + 2
@dataclass
-class SessionGetCountCmd(SessionConfigCommand):
+class SessionGetCountCmd(SessionConfigPacket):
def __post_init__(self):
- self.opcode = 5
- self.gid = GroupId.SESSION_CONFIG
self.mt = MessageType.COMMAND
+ self.oid = SessionConfigOpcodeId.GET_COUNT
+ self.gid = GroupId.SESSION_CONFIG
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['SessionGetCountCmd', bytes]:
- if fields['opcode'] != 0x5 or fields['gid'] != GroupId.SESSION_CONFIG or fields['mt'] != MessageType.COMMAND:
+ if fields['mt'] != MessageType.COMMAND or fields['oid'] != SessionConfigOpcodeId.GET_COUNT or fields['gid'] != GroupId.SESSION_CONFIG:
raise Exception("Invalid constraint field values")
return SessionGetCountCmd(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
- return SessionConfigCommand.serialize(self, payload = bytes(_span))
+ return SessionConfigPacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
return 0
@dataclass
-class SessionGetCountRsp(SessionConfigResponse):
- status: StatusCode = field(kw_only=True, default=StatusCode.UCI_STATUS_OK)
+class SessionGetCountRsp(SessionConfigPacket):
+ status: Status = field(kw_only=True, default=Status.OK)
session_count: int = field(kw_only=True, default=0)
def __post_init__(self):
- self.opcode = 5
- self.gid = GroupId.SESSION_CONFIG
self.mt = MessageType.RESPONSE
+ self.oid = SessionConfigOpcodeId.GET_COUNT
+ self.gid = GroupId.SESSION_CONFIG
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['SessionGetCountRsp', bytes]:
- if fields['opcode'] != 0x5 or fields['gid'] != GroupId.SESSION_CONFIG or fields['mt'] != MessageType.RESPONSE:
+ if fields['mt'] != MessageType.RESPONSE or fields['oid'] != SessionConfigOpcodeId.GET_COUNT or fields['gid'] != GroupId.SESSION_CONFIG:
raise Exception("Invalid constraint field values")
if len(span) < 2:
raise Exception('Invalid packet size')
- fields['status'] = StatusCode.from_int(span[0])
+ fields['status'] = Status.from_int(span[0])
fields['session_count'] = span[1]
span = span[2:]
return SessionGetCountRsp(**fields), span
@@ -3422,24 +2728,24 @@ class SessionGetCountRsp(SessionConfigResponse):
print(f"Invalid value for field SessionGetCountRsp::session_count: {self.session_count} > 255; the value will be truncated")
self.session_count &= 255
_span.append((self.session_count << 0))
- return SessionConfigResponse.serialize(self, payload = bytes(_span))
+ return SessionConfigPacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
return 2
@dataclass
-class SessionGetStateCmd(SessionConfigCommand):
+class SessionGetStateCmd(SessionConfigPacket):
session_token: int = field(kw_only=True, default=0)
def __post_init__(self):
- self.opcode = 6
- self.gid = GroupId.SESSION_CONFIG
self.mt = MessageType.COMMAND
+ self.oid = SessionConfigOpcodeId.GET_STATE
+ self.gid = GroupId.SESSION_CONFIG
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['SessionGetStateCmd', bytes]:
- if fields['opcode'] != 0x6 or fields['gid'] != GroupId.SESSION_CONFIG or fields['mt'] != MessageType.COMMAND:
+ if fields['mt'] != MessageType.COMMAND or fields['oid'] != SessionConfigOpcodeId.GET_STATE or fields['gid'] != GroupId.SESSION_CONFIG:
raise Exception("Invalid constraint field values")
if len(span) < 4:
raise Exception('Invalid packet size')
@@ -3454,29 +2760,29 @@ class SessionGetStateCmd(SessionConfigCommand):
print(f"Invalid value for field SessionGetStateCmd::session_token: {self.session_token} > 4294967295; the value will be truncated")
self.session_token &= 4294967295
_span.extend(int.to_bytes((self.session_token << 0), length=4, byteorder='little'))
- return SessionConfigCommand.serialize(self, payload = bytes(_span))
+ return SessionConfigPacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
return 4
@dataclass
-class SessionGetStateRsp(SessionConfigResponse):
- status: StatusCode = field(kw_only=True, default=StatusCode.UCI_STATUS_OK)
+class SessionGetStateRsp(SessionConfigPacket):
+ status: Status = field(kw_only=True, default=Status.OK)
session_state: SessionState = field(kw_only=True, default=SessionState.SESSION_STATE_INIT)
def __post_init__(self):
- self.opcode = 6
- self.gid = GroupId.SESSION_CONFIG
self.mt = MessageType.RESPONSE
+ self.oid = SessionConfigOpcodeId.GET_STATE
+ self.gid = GroupId.SESSION_CONFIG
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['SessionGetStateRsp', bytes]:
- if fields['opcode'] != 0x6 or fields['gid'] != GroupId.SESSION_CONFIG or fields['mt'] != MessageType.RESPONSE:
+ if fields['mt'] != MessageType.RESPONSE or fields['oid'] != SessionConfigOpcodeId.GET_STATE or fields['gid'] != GroupId.SESSION_CONFIG:
raise Exception("Invalid constraint field values")
if len(span) < 2:
raise Exception('Invalid packet size')
- fields['status'] = StatusCode.from_int(span[0])
+ fields['status'] = Status.from_int(span[0])
fields['session_state'] = SessionState.from_int(span[1])
span = span[2:]
return SessionGetStateRsp(**fields), span
@@ -3485,25 +2791,71 @@ class SessionGetStateRsp(SessionConfigResponse):
_span = bytearray()
_span.append((self.status << 0))
_span.append((self.session_state << 0))
- return SessionConfigResponse.serialize(self, payload = bytes(_span))
+ return SessionConfigPacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
return 2
@dataclass
-class SessionUpdateDtTagRangingRoundsCmd(SessionConfigCommand):
+class SessionUpdateDtAnchorRangingRoundsCmd(SessionConfigPacket):
+
+
+ def __post_init__(self):
+ self.mt = MessageType.COMMAND
+ self.oid = SessionConfigOpcodeId.UPDATE_DT_ANCHOR_RANGING_ROUNDS
+ self.gid = GroupId.SESSION_CONFIG
+
+ @staticmethod
+ def parse(fields: dict, span: bytes) -> Tuple['SessionUpdateDtAnchorRangingRoundsCmd', bytes]:
+ if fields['mt'] != MessageType.COMMAND or fields['oid'] != SessionConfigOpcodeId.UPDATE_DT_ANCHOR_RANGING_ROUNDS or fields['gid'] != GroupId.SESSION_CONFIG:
+ raise Exception("Invalid constraint field values")
+ return SessionUpdateDtAnchorRangingRoundsCmd(**fields), span
+
+ def serialize(self, payload: bytes = None) -> bytes:
+ _span = bytearray()
+ return SessionConfigPacket.serialize(self, payload = bytes(_span))
+
+ @property
+ def size(self) -> int:
+ return 0
+
+@dataclass
+class SessionUpdateDtAnchorRangingRoundsRsp(SessionConfigPacket):
+
+
+ def __post_init__(self):
+ self.mt = MessageType.RESPONSE
+ self.oid = SessionConfigOpcodeId.UPDATE_DT_ANCHOR_RANGING_ROUNDS
+ self.gid = GroupId.SESSION_CONFIG
+
+ @staticmethod
+ def parse(fields: dict, span: bytes) -> Tuple['SessionUpdateDtAnchorRangingRoundsRsp', bytes]:
+ if fields['mt'] != MessageType.RESPONSE or fields['oid'] != SessionConfigOpcodeId.UPDATE_DT_ANCHOR_RANGING_ROUNDS or fields['gid'] != GroupId.SESSION_CONFIG:
+ raise Exception("Invalid constraint field values")
+ return SessionUpdateDtAnchorRangingRoundsRsp(**fields), span
+
+ def serialize(self, payload: bytes = None) -> bytes:
+ _span = bytearray()
+ return SessionConfigPacket.serialize(self, payload = bytes(_span))
+
+ @property
+ def size(self) -> int:
+ return 0
+
+@dataclass
+class SessionUpdateDtTagRangingRoundsCmd(SessionConfigPacket):
session_token: int = field(kw_only=True, default=0)
ranging_round_indexes: bytearray = field(kw_only=True, default_factory=bytearray)
def __post_init__(self):
- self.opcode = 9
- self.gid = GroupId.SESSION_CONFIG
self.mt = MessageType.COMMAND
+ self.oid = SessionConfigOpcodeId.UPDATE_DT_TAG_RANGING_ROUNDS
+ self.gid = GroupId.SESSION_CONFIG
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['SessionUpdateDtTagRangingRoundsCmd', bytes]:
- if fields['opcode'] != 0x9 or fields['gid'] != GroupId.SESSION_CONFIG or fields['mt'] != MessageType.COMMAND:
+ if fields['mt'] != MessageType.COMMAND or fields['oid'] != SessionConfigOpcodeId.UPDATE_DT_TAG_RANGING_ROUNDS or fields['gid'] != GroupId.SESSION_CONFIG:
raise Exception("Invalid constraint field values")
if len(span) < 5:
raise Exception('Invalid packet size')
@@ -3528,29 +2880,29 @@ class SessionUpdateDtTagRangingRoundsCmd(SessionConfigCommand):
del self.ranging_round_indexes[255:]
_span.append((len(self.ranging_round_indexes) << 0))
_span.extend(self.ranging_round_indexes)
- return SessionConfigCommand.serialize(self, payload = bytes(_span))
+ return SessionConfigPacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
return len(self.ranging_round_indexes) * 1 + 5
@dataclass
-class SessionUpdateDtTagRangingRoundsRsp(SessionConfigResponse):
- status: StatusCode = field(kw_only=True, default=StatusCode.UCI_STATUS_OK)
+class SessionUpdateDtTagRangingRoundsRsp(SessionConfigPacket):
+ status: Status = field(kw_only=True, default=Status.OK)
ranging_round_indexes: bytearray = field(kw_only=True, default_factory=bytearray)
def __post_init__(self):
- self.opcode = 9
- self.gid = GroupId.SESSION_CONFIG
self.mt = MessageType.RESPONSE
+ self.oid = SessionConfigOpcodeId.UPDATE_DT_TAG_RANGING_ROUNDS
+ self.gid = GroupId.SESSION_CONFIG
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['SessionUpdateDtTagRangingRoundsRsp', bytes]:
- if fields['opcode'] != 0x9 or fields['gid'] != GroupId.SESSION_CONFIG or fields['mt'] != MessageType.RESPONSE:
+ if fields['mt'] != MessageType.RESPONSE or fields['oid'] != SessionConfigOpcodeId.UPDATE_DT_TAG_RANGING_ROUNDS or fields['gid'] != GroupId.SESSION_CONFIG:
raise Exception("Invalid constraint field values")
if len(span) < 2:
raise Exception('Invalid packet size')
- fields['status'] = StatusCode.from_int(span[0])
+ fields['status'] = Status.from_int(span[0])
ranging_round_indexes_count = span[1]
span = span[2:]
if len(span) < ranging_round_indexes_count:
@@ -3567,7 +2919,7 @@ class SessionUpdateDtTagRangingRoundsRsp(SessionConfigResponse):
del self.ranging_round_indexes[255:]
_span.append((len(self.ranging_round_indexes) << 0))
_span.extend(self.ranging_round_indexes)
- return SessionConfigResponse.serialize(self, payload = bytes(_span))
+ return SessionConfigPacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
@@ -3705,18 +3057,18 @@ class UpdateMulticastListAction(enum.IntEnum):
@dataclass
-class SessionUpdateControllerMulticastListCmd(SessionConfigCommand):
+class SessionUpdateControllerMulticastListCmd(SessionConfigPacket):
session_token: int = field(kw_only=True, default=0)
action: UpdateMulticastListAction = field(kw_only=True, default=UpdateMulticastListAction.ADD_CONTROLEE)
def __post_init__(self):
- self.opcode = 7
- self.gid = GroupId.SESSION_CONFIG
self.mt = MessageType.COMMAND
+ self.oid = SessionConfigOpcodeId.UPDATE_CONTROLLER_MULTICAST_LIST
+ self.gid = GroupId.SESSION_CONFIG
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['SessionUpdateControllerMulticastListCmd', bytes]:
- if fields['opcode'] != 0x7 or fields['gid'] != GroupId.SESSION_CONFIG or fields['mt'] != MessageType.COMMAND:
+ if fields['mt'] != MessageType.COMMAND or fields['oid'] != SessionConfigOpcodeId.UPDATE_CONTROLLER_MULTICAST_LIST or fields['gid'] != GroupId.SESSION_CONFIG:
raise Exception("Invalid constraint field values")
if len(span) < 5:
raise Exception('Invalid packet size')
@@ -3737,139 +3089,13 @@ class SessionUpdateControllerMulticastListCmd(SessionConfigCommand):
_span.extend(int.to_bytes((self.session_token << 0), length=4, byteorder='little'))
_span.append((self.action << 0))
_span.extend(payload or self.payload or [])
- return SessionConfigCommand.serialize(self, payload = bytes(_span))
+ return SessionConfigPacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
return len(self.payload) + 5
@dataclass
-class PhaseList(Packet):
- session_token: int = field(kw_only=True, default=0)
- start_slot_index: int = field(kw_only=True, default=0)
- end_slot_index: int = field(kw_only=True, default=0)
-
- def __post_init__(self):
- pass
-
- @staticmethod
- def parse(span: bytes) -> Tuple['PhaseList', bytes]:
- fields = {'payload': None}
- if len(span) < 8:
- raise Exception('Invalid packet size')
- value_ = int.from_bytes(span[0:4], byteorder='little')
- fields['session_token'] = value_
- value_ = int.from_bytes(span[4:6], byteorder='little')
- fields['start_slot_index'] = value_
- value_ = int.from_bytes(span[6:8], byteorder='little')
- fields['end_slot_index'] = value_
- span = span[8:]
- return PhaseList(**fields), span
-
- def serialize(self, payload: bytes = None) -> bytes:
- _span = bytearray()
- if self.session_token > 4294967295:
- print(f"Invalid value for field PhaseList::session_token: {self.session_token} > 4294967295; the value will be truncated")
- self.session_token &= 4294967295
- _span.extend(int.to_bytes((self.session_token << 0), length=4, byteorder='little'))
- if self.start_slot_index > 65535:
- print(f"Invalid value for field PhaseList::start_slot_index: {self.start_slot_index} > 65535; the value will be truncated")
- self.start_slot_index &= 65535
- _span.extend(int.to_bytes((self.start_slot_index << 0), length=2, byteorder='little'))
- if self.end_slot_index > 65535:
- print(f"Invalid value for field PhaseList::end_slot_index: {self.end_slot_index} > 65535; the value will be truncated")
- self.end_slot_index &= 65535
- _span.extend(int.to_bytes((self.end_slot_index << 0), length=2, byteorder='little'))
- return bytes(_span)
-
- @property
- def size(self) -> int:
- return 8
-
-@dataclass
-class SessionSetHybridConfigCmd(SessionConfigCommand):
- session_token: int = field(kw_only=True, default=0)
- number_of_phases: int = field(kw_only=True, default=0)
- update_time: bytearray = field(kw_only=True, default_factory=bytearray)
- phase_list: List[PhaseList] = field(kw_only=True, default_factory=list)
-
- def __post_init__(self):
- self.opcode = 12
- self.gid = GroupId.SESSION_CONFIG
- self.mt = MessageType.COMMAND
-
- @staticmethod
- def parse(fields: dict, span: bytes) -> Tuple['SessionSetHybridConfigCmd', bytes]:
- if fields['opcode'] != 0xc or fields['gid'] != GroupId.SESSION_CONFIG or fields['mt'] != MessageType.COMMAND:
- raise Exception("Invalid constraint field values")
- if len(span) < 5:
- raise Exception('Invalid packet size')
- value_ = int.from_bytes(span[0:4], byteorder='little')
- fields['session_token'] = value_
- fields['number_of_phases'] = span[4]
- span = span[5:]
- if len(span) < 8:
- raise Exception('Invalid packet size')
- fields['update_time'] = list(span[:8])
- span = span[8:]
- if len(span) % 8 != 0:
- raise Exception('Array size is not a multiple of the element size')
- phase_list_count = int(len(span) / 8)
- phase_list = []
- for n in range(phase_list_count):
- phase_list.append(PhaseList.parse_all(span[n * 8:(n + 1) * 8]))
- fields['phase_list'] = phase_list
- span = bytes()
- return SessionSetHybridConfigCmd(**fields), span
-
- def serialize(self, payload: bytes = None) -> bytes:
- _span = bytearray()
- if self.session_token > 4294967295:
- print(f"Invalid value for field SessionSetHybridConfigCmd::session_token: {self.session_token} > 4294967295; the value will be truncated")
- self.session_token &= 4294967295
- _span.extend(int.to_bytes((self.session_token << 0), length=4, byteorder='little'))
- if self.number_of_phases > 255:
- print(f"Invalid value for field SessionSetHybridConfigCmd::number_of_phases: {self.number_of_phases} > 255; the value will be truncated")
- self.number_of_phases &= 255
- _span.append((self.number_of_phases << 0))
- _span.extend(self.update_time)
- for _elt in self.phase_list:
- _span.extend(_elt.serialize())
- return SessionConfigCommand.serialize(self, payload = bytes(_span))
-
- @property
- def size(self) -> int:
- return sum([elt.size for elt in self.phase_list]) + 13
-
-@dataclass
-class SessionSetHybridConfigRsp(SessionConfigResponse):
- status: StatusCode = field(kw_only=True, default=StatusCode.UCI_STATUS_OK)
-
- def __post_init__(self):
- self.opcode = 12
- self.gid = GroupId.SESSION_CONFIG
- self.mt = MessageType.RESPONSE
-
- @staticmethod
- def parse(fields: dict, span: bytes) -> Tuple['SessionSetHybridConfigRsp', bytes]:
- if fields['opcode'] != 0xc or fields['gid'] != GroupId.SESSION_CONFIG or fields['mt'] != MessageType.RESPONSE:
- raise Exception("Invalid constraint field values")
- if len(span) < 1:
- raise Exception('Invalid packet size')
- fields['status'] = StatusCode.from_int(span[0])
- span = span[1:]
- return SessionSetHybridConfigRsp(**fields), span
-
- def serialize(self, payload: bytes = None) -> bytes:
- _span = bytearray()
- _span.append((self.status << 0))
- return SessionConfigResponse.serialize(self, payload = bytes(_span))
-
- @property
- def size(self) -> int:
- return 1
-
-@dataclass
class SessionUpdateControllerMulticastListCmdPayload(Packet):
controlees: List[Controlee] = field(kw_only=True, default_factory=list)
@@ -3981,28 +3207,28 @@ class SessionUpdateControllerMulticastListCmd_2_0_32_Byte_Payload(Packet):
return sum([elt.size for elt in self.controlees]) + 1
@dataclass
-class SessionUpdateControllerMulticastListRsp(SessionConfigResponse):
- status: StatusCode = field(kw_only=True, default=StatusCode.UCI_STATUS_OK)
+class SessionUpdateControllerMulticastListRsp(SessionConfigPacket):
+ status: Status = field(kw_only=True, default=Status.OK)
def __post_init__(self):
- self.opcode = 7
- self.gid = GroupId.SESSION_CONFIG
self.mt = MessageType.RESPONSE
+ self.oid = SessionConfigOpcodeId.UPDATE_CONTROLLER_MULTICAST_LIST
+ self.gid = GroupId.SESSION_CONFIG
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['SessionUpdateControllerMulticastListRsp', bytes]:
- if fields['opcode'] != 0x7 or fields['gid'] != GroupId.SESSION_CONFIG or fields['mt'] != MessageType.RESPONSE:
+ if fields['mt'] != MessageType.RESPONSE or fields['oid'] != SessionConfigOpcodeId.UPDATE_CONTROLLER_MULTICAST_LIST or fields['gid'] != GroupId.SESSION_CONFIG:
raise Exception("Invalid constraint field values")
if len(span) < 1:
raise Exception('Invalid packet size')
- fields['status'] = StatusCode.from_int(span[0])
+ fields['status'] = Status.from_int(span[0])
span = span[1:]
return SessionUpdateControllerMulticastListRsp(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
_span.append((self.status << 0))
- return SessionConfigResponse.serialize(self, payload = bytes(_span))
+ return SessionConfigPacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
@@ -4012,7 +3238,7 @@ class SessionUpdateControllerMulticastListRsp(SessionConfigResponse):
class ControleeStatus(Packet):
mac_address: bytearray = field(kw_only=True, default_factory=bytearray)
subsession_id: int = field(kw_only=True, default=0)
- status: MulticastUpdateStatusCode = field(kw_only=True, default=MulticastUpdateStatusCode.STATUS_OK_MULTICAST_LIST_UPDATE)
+ status: MulticastUpdateStatus = field(kw_only=True, default=MulticastUpdateStatus.OK_MULTICAST_LIST_UPDATE)
def __post_init__(self):
pass
@@ -4028,7 +3254,7 @@ class ControleeStatus(Packet):
raise Exception('Invalid packet size')
value_ = int.from_bytes(span[0:4], byteorder='little')
fields['subsession_id'] = value_
- fields['status'] = MulticastUpdateStatusCode.from_int(span[4])
+ fields['status'] = MulticastUpdateStatus.from_int(span[4])
span = span[5:]
return ControleeStatus(**fields), span
@@ -4047,19 +3273,19 @@ class ControleeStatus(Packet):
return 7
@dataclass
-class SessionUpdateControllerMulticastListNtf(SessionConfigNotification):
+class SessionUpdateControllerMulticastListNtf(SessionConfigPacket):
session_token: int = field(kw_only=True, default=0)
remaining_multicast_list_size: int = field(kw_only=True, default=0)
controlee_status: List[ControleeStatus] = field(kw_only=True, default_factory=list)
def __post_init__(self):
- self.opcode = 7
- self.gid = GroupId.SESSION_CONFIG
self.mt = MessageType.NOTIFICATION
+ self.oid = SessionConfigOpcodeId.UPDATE_CONTROLLER_MULTICAST_LIST
+ self.gid = GroupId.SESSION_CONFIG
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['SessionUpdateControllerMulticastListNtf', bytes]:
- if fields['opcode'] != 0x7 or fields['gid'] != GroupId.SESSION_CONFIG or fields['mt'] != MessageType.NOTIFICATION:
+ if fields['mt'] != MessageType.NOTIFICATION or fields['oid'] != SessionConfigOpcodeId.UPDATE_CONTROLLER_MULTICAST_LIST or fields['gid'] != GroupId.SESSION_CONFIG:
raise Exception("Invalid constraint field values")
if len(span) < 6:
raise Exception('Invalid packet size')
@@ -4093,25 +3319,25 @@ class SessionUpdateControllerMulticastListNtf(SessionConfigNotification):
_span.append((len(self.controlee_status) << 0))
for _elt in self.controlee_status:
_span.extend(_elt.serialize())
- return SessionConfigNotification.serialize(self, payload = bytes(_span))
+ return SessionConfigPacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
return sum([elt.size for elt in self.controlee_status]) + 6
@dataclass
-class DataCreditNtf(SessionControlNotification):
+class SessionDataCreditNtf(SessionControlPacket):
session_token: int = field(kw_only=True, default=0)
credit_availability: CreditAvailability = field(kw_only=True, default=CreditAvailability.CREDIT_NOT_AVAILABLE)
def __post_init__(self):
- self.opcode = 4
- self.gid = GroupId.SESSION_CONTROL
self.mt = MessageType.NOTIFICATION
+ self.oid = SessionControlOpcodeId.DATA_CREDIT
+ self.gid = GroupId.SESSION_CONTROL
@staticmethod
- def parse(fields: dict, span: bytes) -> Tuple['DataCreditNtf', bytes]:
- if fields['opcode'] != 0x4 or fields['gid'] != GroupId.SESSION_CONTROL or fields['mt'] != MessageType.NOTIFICATION:
+ def parse(fields: dict, span: bytes) -> Tuple['SessionDataCreditNtf', bytes]:
+ if fields['mt'] != MessageType.NOTIFICATION or fields['oid'] != SessionControlOpcodeId.DATA_CREDIT or fields['gid'] != GroupId.SESSION_CONTROL:
raise Exception("Invalid constraint field values")
if len(span) < 5:
raise Exception('Invalid packet size')
@@ -4119,36 +3345,36 @@ class DataCreditNtf(SessionControlNotification):
fields['session_token'] = value_
fields['credit_availability'] = CreditAvailability.from_int(span[4])
span = span[5:]
- return DataCreditNtf(**fields), span
+ return SessionDataCreditNtf(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
if self.session_token > 4294967295:
- print(f"Invalid value for field DataCreditNtf::session_token: {self.session_token} > 4294967295; the value will be truncated")
+ print(f"Invalid value for field SessionDataCreditNtf::session_token: {self.session_token} > 4294967295; the value will be truncated")
self.session_token &= 4294967295
_span.extend(int.to_bytes((self.session_token << 0), length=4, byteorder='little'))
_span.append((self.credit_availability << 0))
- return SessionControlNotification.serialize(self, payload = bytes(_span))
+ return SessionControlPacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
return 5
@dataclass
-class DataTransferStatusNtf(SessionControlNotification):
+class SessionDataTransferStatusNtf(SessionControlPacket):
session_token: int = field(kw_only=True, default=0)
uci_sequence_number: int = field(kw_only=True, default=0)
status: DataTransferNtfStatusCode = field(kw_only=True, default=DataTransferNtfStatusCode.UCI_DATA_TRANSFER_STATUS_REPETITION_OK)
tx_count: int = field(kw_only=True, default=0)
def __post_init__(self):
- self.opcode = 5
- self.gid = GroupId.SESSION_CONTROL
self.mt = MessageType.NOTIFICATION
+ self.oid = SessionControlOpcodeId.DATA_TRANSFER_STATUS
+ self.gid = GroupId.SESSION_CONTROL
@staticmethod
- def parse(fields: dict, span: bytes) -> Tuple['DataTransferStatusNtf', bytes]:
- if fields['opcode'] != 0x5 or fields['gid'] != GroupId.SESSION_CONTROL or fields['mt'] != MessageType.NOTIFICATION:
+ def parse(fields: dict, span: bytes) -> Tuple['SessionDataTransferStatusNtf', bytes]:
+ if fields['mt'] != MessageType.NOTIFICATION or fields['oid'] != SessionControlOpcodeId.DATA_TRANSFER_STATUS or fields['gid'] != GroupId.SESSION_CONTROL:
raise Exception("Invalid constraint field values")
if len(span) < 7:
raise Exception('Invalid packet size')
@@ -4158,74 +3384,74 @@ class DataTransferStatusNtf(SessionControlNotification):
fields['status'] = DataTransferNtfStatusCode.from_int(span[5])
fields['tx_count'] = span[6]
span = span[7:]
- return DataTransferStatusNtf(**fields), span
+ return SessionDataTransferStatusNtf(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
if self.session_token > 4294967295:
- print(f"Invalid value for field DataTransferStatusNtf::session_token: {self.session_token} > 4294967295; the value will be truncated")
+ print(f"Invalid value for field SessionDataTransferStatusNtf::session_token: {self.session_token} > 4294967295; the value will be truncated")
self.session_token &= 4294967295
_span.extend(int.to_bytes((self.session_token << 0), length=4, byteorder='little'))
if self.uci_sequence_number > 255:
- print(f"Invalid value for field DataTransferStatusNtf::uci_sequence_number: {self.uci_sequence_number} > 255; the value will be truncated")
+ print(f"Invalid value for field SessionDataTransferStatusNtf::uci_sequence_number: {self.uci_sequence_number} > 255; the value will be truncated")
self.uci_sequence_number &= 255
_span.append((self.uci_sequence_number << 0))
_span.append((self.status << 0))
if self.tx_count > 255:
- print(f"Invalid value for field DataTransferStatusNtf::tx_count: {self.tx_count} > 255; the value will be truncated")
+ print(f"Invalid value for field SessionDataTransferStatusNtf::tx_count: {self.tx_count} > 255; the value will be truncated")
self.tx_count &= 255
_span.append((self.tx_count << 0))
- return SessionControlNotification.serialize(self, payload = bytes(_span))
+ return SessionControlPacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
return 7
@dataclass
-class SessionQueryMaxDataSizeCmd(SessionConfigCommand):
+class SessionQueryMaxDataSizeInRangingCmd(SessionConfigPacket):
session_token: int = field(kw_only=True, default=0)
def __post_init__(self):
- self.opcode = 11
- self.gid = GroupId.SESSION_CONFIG
self.mt = MessageType.COMMAND
+ self.oid = SessionConfigOpcodeId.QUERY_DATA_SIZE_IN_RANGING
+ self.gid = GroupId.SESSION_CONFIG
@staticmethod
- def parse(fields: dict, span: bytes) -> Tuple['SessionQueryMaxDataSizeCmd', bytes]:
- if fields['opcode'] != 0xb or fields['gid'] != GroupId.SESSION_CONFIG or fields['mt'] != MessageType.COMMAND:
+ def parse(fields: dict, span: bytes) -> Tuple['SessionQueryMaxDataSizeInRangingCmd', bytes]:
+ if fields['mt'] != MessageType.COMMAND or fields['oid'] != SessionConfigOpcodeId.QUERY_DATA_SIZE_IN_RANGING or fields['gid'] != GroupId.SESSION_CONFIG:
raise Exception("Invalid constraint field values")
if len(span) < 4:
raise Exception('Invalid packet size')
value_ = int.from_bytes(span[0:4], byteorder='little')
fields['session_token'] = value_
span = span[4:]
- return SessionQueryMaxDataSizeCmd(**fields), span
+ return SessionQueryMaxDataSizeInRangingCmd(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
if self.session_token > 4294967295:
- print(f"Invalid value for field SessionQueryMaxDataSizeCmd::session_token: {self.session_token} > 4294967295; the value will be truncated")
+ print(f"Invalid value for field SessionQueryMaxDataSizeInRangingCmd::session_token: {self.session_token} > 4294967295; the value will be truncated")
self.session_token &= 4294967295
_span.extend(int.to_bytes((self.session_token << 0), length=4, byteorder='little'))
- return SessionConfigCommand.serialize(self, payload = bytes(_span))
+ return SessionConfigPacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
return 4
@dataclass
-class SessionQueryMaxDataSizeRsp(SessionConfigResponse):
+class SessionQueryMaxDataSizeInRangingRsp(SessionConfigPacket):
session_token: int = field(kw_only=True, default=0)
max_data_size: int = field(kw_only=True, default=0)
def __post_init__(self):
- self.opcode = 11
- self.gid = GroupId.SESSION_CONFIG
self.mt = MessageType.RESPONSE
+ self.oid = SessionConfigOpcodeId.QUERY_DATA_SIZE_IN_RANGING
+ self.gid = GroupId.SESSION_CONFIG
@staticmethod
- def parse(fields: dict, span: bytes) -> Tuple['SessionQueryMaxDataSizeRsp', bytes]:
- if fields['opcode'] != 0xb or fields['gid'] != GroupId.SESSION_CONFIG or fields['mt'] != MessageType.RESPONSE:
+ def parse(fields: dict, span: bytes) -> Tuple['SessionQueryMaxDataSizeInRangingRsp', bytes]:
+ if fields['mt'] != MessageType.RESPONSE or fields['oid'] != SessionConfigOpcodeId.QUERY_DATA_SIZE_IN_RANGING or fields['gid'] != GroupId.SESSION_CONFIG:
raise Exception("Invalid constraint field values")
if len(span) < 6:
raise Exception('Invalid packet size')
@@ -4234,70 +3460,79 @@ class SessionQueryMaxDataSizeRsp(SessionConfigResponse):
value_ = int.from_bytes(span[4:6], byteorder='little')
fields['max_data_size'] = value_
span = span[6:]
- return SessionQueryMaxDataSizeRsp(**fields), span
+ return SessionQueryMaxDataSizeInRangingRsp(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
if self.session_token > 4294967295:
- print(f"Invalid value for field SessionQueryMaxDataSizeRsp::session_token: {self.session_token} > 4294967295; the value will be truncated")
+ print(f"Invalid value for field SessionQueryMaxDataSizeInRangingRsp::session_token: {self.session_token} > 4294967295; the value will be truncated")
self.session_token &= 4294967295
_span.extend(int.to_bytes((self.session_token << 0), length=4, byteorder='little'))
if self.max_data_size > 65535:
- print(f"Invalid value for field SessionQueryMaxDataSizeRsp::max_data_size: {self.max_data_size} > 65535; the value will be truncated")
+ print(f"Invalid value for field SessionQueryMaxDataSizeInRangingRsp::max_data_size: {self.max_data_size} > 65535; the value will be truncated")
self.max_data_size &= 65535
_span.extend(int.to_bytes((self.max_data_size << 0), length=2, byteorder='little'))
- return SessionConfigResponse.serialize(self, payload = bytes(_span))
+ return SessionConfigPacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
return 6
@dataclass
-class SessionStartCmd(SessionControlCommand):
-
+class SessionStartCmd(SessionControlPacket):
+ session_id: int = field(kw_only=True, default=0)
def __post_init__(self):
- self.opcode = 0
- self.gid = GroupId.SESSION_CONTROL
self.mt = MessageType.COMMAND
+ self.oid = SessionControlOpcodeId.START
+ self.gid = GroupId.SESSION_CONTROL
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['SessionStartCmd', bytes]:
- if fields['opcode'] != 0x0 or fields['gid'] != GroupId.SESSION_CONTROL or fields['mt'] != MessageType.COMMAND:
+ if fields['mt'] != MessageType.COMMAND or fields['oid'] != SessionControlOpcodeId.START or fields['gid'] != GroupId.SESSION_CONTROL:
raise Exception("Invalid constraint field values")
+ if len(span) < 4:
+ raise Exception('Invalid packet size')
+ value_ = int.from_bytes(span[0:4], byteorder='little')
+ fields['session_id'] = value_
+ span = span[4:]
return SessionStartCmd(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
- return SessionControlCommand.serialize(self, payload = bytes(_span))
+ if self.session_id > 4294967295:
+ print(f"Invalid value for field SessionStartCmd::session_id: {self.session_id} > 4294967295; the value will be truncated")
+ self.session_id &= 4294967295
+ _span.extend(int.to_bytes((self.session_id << 0), length=4, byteorder='little'))
+ return SessionControlPacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
- return 0
+ return 4
@dataclass
-class SessionStartRsp(SessionControlResponse):
- status: StatusCode = field(kw_only=True, default=StatusCode.UCI_STATUS_OK)
+class SessionStartRsp(SessionControlPacket):
+ status: Status = field(kw_only=True, default=Status.OK)
def __post_init__(self):
- self.opcode = 0
- self.gid = GroupId.SESSION_CONTROL
self.mt = MessageType.RESPONSE
+ self.oid = SessionControlOpcodeId.START
+ self.gid = GroupId.SESSION_CONTROL
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['SessionStartRsp', bytes]:
- if fields['opcode'] != 0x0 or fields['gid'] != GroupId.SESSION_CONTROL or fields['mt'] != MessageType.RESPONSE:
+ if fields['mt'] != MessageType.RESPONSE or fields['oid'] != SessionControlOpcodeId.START or fields['gid'] != GroupId.SESSION_CONTROL:
raise Exception("Invalid constraint field values")
if len(span) < 1:
raise Exception('Invalid packet size')
- fields['status'] = StatusCode.from_int(span[0])
+ fields['status'] = Status.from_int(span[0])
span = span[1:]
return SessionStartRsp(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
_span.append((self.status << 0))
- return SessionControlResponse.serialize(self, payload = bytes(_span))
+ return SessionControlPacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
@@ -4306,7 +3541,7 @@ class SessionStartRsp(SessionControlResponse):
@dataclass
class ShortAddressTwoWayRangingMeasurement(Packet):
mac_address: int = field(kw_only=True, default=0)
- status: StatusCode = field(kw_only=True, default=StatusCode.UCI_STATUS_OK)
+ status: Status = field(kw_only=True, default=Status.OK)
nlos: int = field(kw_only=True, default=0)
distance: int = field(kw_only=True, default=0)
aoa_azimuth: int = field(kw_only=True, default=0)
@@ -4330,7 +3565,7 @@ class ShortAddressTwoWayRangingMeasurement(Packet):
raise Exception('Invalid packet size')
value_ = int.from_bytes(span[0:2], byteorder='little')
fields['mac_address'] = value_
- fields['status'] = StatusCode.from_int(span[2])
+ fields['status'] = Status.from_int(span[2])
fields['nlos'] = span[3]
value_ = int.from_bytes(span[4:6], byteorder='little')
fields['distance'] = value_
@@ -4419,7 +3654,7 @@ class ShortAddressTwoWayRangingMeasurement(Packet):
@dataclass
class ExtendedAddressTwoWayRangingMeasurement(Packet):
mac_address: int = field(kw_only=True, default=0)
- status: StatusCode = field(kw_only=True, default=StatusCode.UCI_STATUS_OK)
+ status: Status = field(kw_only=True, default=Status.OK)
nlos: int = field(kw_only=True, default=0)
distance: int = field(kw_only=True, default=0)
aoa_azimuth: int = field(kw_only=True, default=0)
@@ -4443,7 +3678,7 @@ class ExtendedAddressTwoWayRangingMeasurement(Packet):
raise Exception('Invalid packet size')
value_ = int.from_bytes(span[0:8], byteorder='little')
fields['mac_address'] = value_
- fields['status'] = StatusCode.from_int(span[8])
+ fields['status'] = Status.from_int(span[8])
fields['nlos'] = span[9]
value_ = int.from_bytes(span[10:12], byteorder='little')
fields['distance'] = value_
@@ -4530,7 +3765,7 @@ class ExtendedAddressTwoWayRangingMeasurement(Packet):
@dataclass
class ShortAddressOwrAoaRangingMeasurement(Packet):
mac_address: int = field(kw_only=True, default=0)
- status: StatusCode = field(kw_only=True, default=StatusCode.UCI_STATUS_OK)
+ status: Status = field(kw_only=True, default=Status.OK)
nlos: int = field(kw_only=True, default=0)
frame_sequence_number: int = field(kw_only=True, default=0)
block_index: int = field(kw_only=True, default=0)
@@ -4549,7 +3784,7 @@ class ShortAddressOwrAoaRangingMeasurement(Packet):
raise Exception('Invalid packet size')
value_ = int.from_bytes(span[0:2], byteorder='little')
fields['mac_address'] = value_
- fields['status'] = StatusCode.from_int(span[2])
+ fields['status'] = Status.from_int(span[2])
fields['nlos'] = span[3]
fields['frame_sequence_number'] = span[4]
value_ = int.from_bytes(span[5:7], byteorder='little')
@@ -4607,7 +3842,7 @@ class ShortAddressOwrAoaRangingMeasurement(Packet):
@dataclass
class ExtendedAddressOwrAoaRangingMeasurement(Packet):
mac_address: int = field(kw_only=True, default=0)
- status: StatusCode = field(kw_only=True, default=StatusCode.UCI_STATUS_OK)
+ status: Status = field(kw_only=True, default=Status.OK)
nlos: int = field(kw_only=True, default=0)
frame_sequence_number: int = field(kw_only=True, default=0)
block_index: int = field(kw_only=True, default=0)
@@ -4626,7 +3861,7 @@ class ExtendedAddressOwrAoaRangingMeasurement(Packet):
raise Exception('Invalid packet size')
value_ = int.from_bytes(span[0:8], byteorder='little')
fields['mac_address'] = value_
- fields['status'] = StatusCode.from_int(span[8])
+ fields['status'] = Status.from_int(span[8])
fields['nlos'] = span[9]
fields['frame_sequence_number'] = span[10]
value_ = int.from_bytes(span[11:13], byteorder='little')
@@ -4696,7 +3931,7 @@ class RangingMeasurementType(enum.IntEnum):
@dataclass
-class SessionInfoNtf(SessionControlNotification):
+class SessionInfoNtf(SessionControlPacket):
sequence_number: int = field(kw_only=True, default=0)
session_token: int = field(kw_only=True, default=0)
rcr_indicator: int = field(kw_only=True, default=0)
@@ -4705,13 +3940,13 @@ class SessionInfoNtf(SessionControlNotification):
mac_address_indicator: MacAddressIndicator = field(kw_only=True, default=MacAddressIndicator.SHORT_ADDRESS)
def __post_init__(self):
- self.opcode = 0
- self.gid = GroupId.SESSION_CONTROL
self.mt = MessageType.NOTIFICATION
+ self.oid = SessionControlOpcodeId.START
+ self.gid = GroupId.SESSION_CONTROL
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['SessionInfoNtf', bytes]:
- if fields['opcode'] != 0x0 or fields['gid'] != GroupId.SESSION_CONTROL or fields['mt'] != MessageType.NOTIFICATION:
+ if fields['mt'] != MessageType.NOTIFICATION or fields['oid'] != SessionControlOpcodeId.START or fields['gid'] != GroupId.SESSION_CONTROL:
raise Exception("Invalid constraint field values")
if len(span) < 24:
raise Exception('Invalid packet size')
@@ -4778,7 +4013,7 @@ class SessionInfoNtf(SessionControlNotification):
_span.append((self.mac_address_indicator << 0))
_span.extend([0] * 8)
_span.extend(payload or self.payload or [])
- return SessionControlNotification.serialize(self, payload = bytes(_span))
+ return SessionControlPacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
@@ -4792,13 +4027,13 @@ class ShortMacTwoWaySessionInfoNtf(SessionInfoNtf):
def __post_init__(self):
self.ranging_measurement_type = RangingMeasurementType.TWO_WAY
self.mac_address_indicator = MacAddressIndicator.SHORT_ADDRESS
- self.opcode = 0
- self.gid = GroupId.SESSION_CONTROL
self.mt = MessageType.NOTIFICATION
+ self.oid = SessionControlOpcodeId.START
+ self.gid = GroupId.SESSION_CONTROL
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['ShortMacTwoWaySessionInfoNtf', bytes]:
- if fields['ranging_measurement_type'] != RangingMeasurementType.TWO_WAY or fields['mac_address_indicator'] != MacAddressIndicator.SHORT_ADDRESS or fields['opcode'] != 0x0 or fields['gid'] != GroupId.SESSION_CONTROL or fields['mt'] != MessageType.NOTIFICATION:
+ if fields['ranging_measurement_type'] != RangingMeasurementType.TWO_WAY or fields['mac_address_indicator'] != MacAddressIndicator.SHORT_ADDRESS or fields['mt'] != MessageType.NOTIFICATION or fields['oid'] != SessionControlOpcodeId.START or fields['gid'] != GroupId.SESSION_CONTROL:
raise Exception("Invalid constraint field values")
if len(span) < 1:
raise Exception('Invalid packet size')
@@ -4841,13 +4076,13 @@ class ExtendedMacTwoWaySessionInfoNtf(SessionInfoNtf):
def __post_init__(self):
self.ranging_measurement_type = RangingMeasurementType.TWO_WAY
self.mac_address_indicator = MacAddressIndicator.EXTENDED_ADDRESS
- self.opcode = 0
- self.gid = GroupId.SESSION_CONTROL
self.mt = MessageType.NOTIFICATION
+ self.oid = SessionControlOpcodeId.START
+ self.gid = GroupId.SESSION_CONTROL
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['ExtendedMacTwoWaySessionInfoNtf', bytes]:
- if fields['ranging_measurement_type'] != RangingMeasurementType.TWO_WAY or fields['mac_address_indicator'] != MacAddressIndicator.EXTENDED_ADDRESS or fields['opcode'] != 0x0 or fields['gid'] != GroupId.SESSION_CONTROL or fields['mt'] != MessageType.NOTIFICATION:
+ if fields['ranging_measurement_type'] != RangingMeasurementType.TWO_WAY or fields['mac_address_indicator'] != MacAddressIndicator.EXTENDED_ADDRESS or fields['mt'] != MessageType.NOTIFICATION or fields['oid'] != SessionControlOpcodeId.START or fields['gid'] != GroupId.SESSION_CONTROL:
raise Exception("Invalid constraint field values")
if len(span) < 1:
raise Exception('Invalid packet size')
@@ -4890,13 +4125,13 @@ class ShortMacDlTDoASessionInfoNtf(SessionInfoNtf):
def __post_init__(self):
self.ranging_measurement_type = RangingMeasurementType.DL_TDOA
self.mac_address_indicator = MacAddressIndicator.SHORT_ADDRESS
- self.opcode = 0
- self.gid = GroupId.SESSION_CONTROL
self.mt = MessageType.NOTIFICATION
+ self.oid = SessionControlOpcodeId.START
+ self.gid = GroupId.SESSION_CONTROL
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['ShortMacDlTDoASessionInfoNtf', bytes]:
- if fields['ranging_measurement_type'] != RangingMeasurementType.DL_TDOA or fields['mac_address_indicator'] != MacAddressIndicator.SHORT_ADDRESS or fields['opcode'] != 0x0 or fields['gid'] != GroupId.SESSION_CONTROL or fields['mt'] != MessageType.NOTIFICATION:
+ if fields['ranging_measurement_type'] != RangingMeasurementType.DL_TDOA or fields['mac_address_indicator'] != MacAddressIndicator.SHORT_ADDRESS or fields['mt'] != MessageType.NOTIFICATION or fields['oid'] != SessionControlOpcodeId.START or fields['gid'] != GroupId.SESSION_CONTROL:
raise Exception("Invalid constraint field values")
if len(span) < 1:
raise Exception('Invalid packet size')
@@ -4927,13 +4162,13 @@ class ExtendedMacDlTDoASessionInfoNtf(SessionInfoNtf):
def __post_init__(self):
self.ranging_measurement_type = RangingMeasurementType.DL_TDOA
self.mac_address_indicator = MacAddressIndicator.EXTENDED_ADDRESS
- self.opcode = 0
- self.gid = GroupId.SESSION_CONTROL
self.mt = MessageType.NOTIFICATION
+ self.oid = SessionControlOpcodeId.START
+ self.gid = GroupId.SESSION_CONTROL
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['ExtendedMacDlTDoASessionInfoNtf', bytes]:
- if fields['ranging_measurement_type'] != RangingMeasurementType.DL_TDOA or fields['mac_address_indicator'] != MacAddressIndicator.EXTENDED_ADDRESS or fields['opcode'] != 0x0 or fields['gid'] != GroupId.SESSION_CONTROL or fields['mt'] != MessageType.NOTIFICATION:
+ if fields['ranging_measurement_type'] != RangingMeasurementType.DL_TDOA or fields['mac_address_indicator'] != MacAddressIndicator.EXTENDED_ADDRESS or fields['mt'] != MessageType.NOTIFICATION or fields['oid'] != SessionControlOpcodeId.START or fields['gid'] != GroupId.SESSION_CONTROL:
raise Exception("Invalid constraint field values")
if len(span) < 1:
raise Exception('Invalid packet size')
@@ -4964,13 +4199,13 @@ class ShortMacOwrAoaSessionInfoNtf(SessionInfoNtf):
def __post_init__(self):
self.ranging_measurement_type = RangingMeasurementType.OWR_AOA
self.mac_address_indicator = MacAddressIndicator.SHORT_ADDRESS
- self.opcode = 0
- self.gid = GroupId.SESSION_CONTROL
self.mt = MessageType.NOTIFICATION
+ self.oid = SessionControlOpcodeId.START
+ self.gid = GroupId.SESSION_CONTROL
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['ShortMacOwrAoaSessionInfoNtf', bytes]:
- if fields['ranging_measurement_type'] != RangingMeasurementType.OWR_AOA or fields['mac_address_indicator'] != MacAddressIndicator.SHORT_ADDRESS or fields['opcode'] != 0x0 or fields['gid'] != GroupId.SESSION_CONTROL or fields['mt'] != MessageType.NOTIFICATION:
+ if fields['ranging_measurement_type'] != RangingMeasurementType.OWR_AOA or fields['mac_address_indicator'] != MacAddressIndicator.SHORT_ADDRESS or fields['mt'] != MessageType.NOTIFICATION or fields['oid'] != SessionControlOpcodeId.START or fields['gid'] != GroupId.SESSION_CONTROL:
raise Exception("Invalid constraint field values")
if len(span) < 1:
raise Exception('Invalid packet size')
@@ -5013,13 +4248,13 @@ class ExtendedMacOwrAoaSessionInfoNtf(SessionInfoNtf):
def __post_init__(self):
self.ranging_measurement_type = RangingMeasurementType.OWR_AOA
self.mac_address_indicator = MacAddressIndicator.EXTENDED_ADDRESS
- self.opcode = 0
- self.gid = GroupId.SESSION_CONTROL
self.mt = MessageType.NOTIFICATION
+ self.oid = SessionControlOpcodeId.START
+ self.gid = GroupId.SESSION_CONTROL
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['ExtendedMacOwrAoaSessionInfoNtf', bytes]:
- if fields['ranging_measurement_type'] != RangingMeasurementType.OWR_AOA or fields['mac_address_indicator'] != MacAddressIndicator.EXTENDED_ADDRESS or fields['opcode'] != 0x0 or fields['gid'] != GroupId.SESSION_CONTROL or fields['mt'] != MessageType.NOTIFICATION:
+ if fields['ranging_measurement_type'] != RangingMeasurementType.OWR_AOA or fields['mac_address_indicator'] != MacAddressIndicator.EXTENDED_ADDRESS or fields['mt'] != MessageType.NOTIFICATION or fields['oid'] != SessionControlOpcodeId.START or fields['gid'] != GroupId.SESSION_CONTROL:
raise Exception("Invalid constraint field values")
if len(span) < 1:
raise Exception('Invalid packet size')
@@ -5055,96 +4290,114 @@ class ExtendedMacOwrAoaSessionInfoNtf(SessionInfoNtf):
)
@dataclass
-class SessionStopCmd(SessionControlCommand):
-
+class SessionStopCmd(SessionControlPacket):
+ session_id: int = field(kw_only=True, default=0)
def __post_init__(self):
- self.opcode = 1
- self.gid = GroupId.SESSION_CONTROL
self.mt = MessageType.COMMAND
+ self.oid = SessionControlOpcodeId.STOP
+ self.gid = GroupId.SESSION_CONTROL
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['SessionStopCmd', bytes]:
- if fields['opcode'] != 0x1 or fields['gid'] != GroupId.SESSION_CONTROL or fields['mt'] != MessageType.COMMAND:
+ if fields['mt'] != MessageType.COMMAND or fields['oid'] != SessionControlOpcodeId.STOP or fields['gid'] != GroupId.SESSION_CONTROL:
raise Exception("Invalid constraint field values")
+ if len(span) < 4:
+ raise Exception('Invalid packet size')
+ value_ = int.from_bytes(span[0:4], byteorder='little')
+ fields['session_id'] = value_
+ span = span[4:]
return SessionStopCmd(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
- return SessionControlCommand.serialize(self, payload = bytes(_span))
+ if self.session_id > 4294967295:
+ print(f"Invalid value for field SessionStopCmd::session_id: {self.session_id} > 4294967295; the value will be truncated")
+ self.session_id &= 4294967295
+ _span.extend(int.to_bytes((self.session_id << 0), length=4, byteorder='little'))
+ return SessionControlPacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
- return 0
+ return 4
@dataclass
-class SessionStopRsp(SessionControlResponse):
- status: StatusCode = field(kw_only=True, default=StatusCode.UCI_STATUS_OK)
+class SessionStopRsp(SessionControlPacket):
+ status: Status = field(kw_only=True, default=Status.OK)
def __post_init__(self):
- self.opcode = 1
- self.gid = GroupId.SESSION_CONTROL
self.mt = MessageType.RESPONSE
+ self.oid = SessionControlOpcodeId.STOP
+ self.gid = GroupId.SESSION_CONTROL
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['SessionStopRsp', bytes]:
- if fields['opcode'] != 0x1 or fields['gid'] != GroupId.SESSION_CONTROL or fields['mt'] != MessageType.RESPONSE:
+ if fields['mt'] != MessageType.RESPONSE or fields['oid'] != SessionControlOpcodeId.STOP or fields['gid'] != GroupId.SESSION_CONTROL:
raise Exception("Invalid constraint field values")
if len(span) < 1:
raise Exception('Invalid packet size')
- fields['status'] = StatusCode.from_int(span[0])
+ fields['status'] = Status.from_int(span[0])
span = span[1:]
return SessionStopRsp(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
_span.append((self.status << 0))
- return SessionControlResponse.serialize(self, payload = bytes(_span))
+ return SessionControlPacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
return 1
@dataclass
-class SessionGetRangingCountCmd(SessionControlCommand):
-
+class SessionGetRangingCountCmd(SessionControlPacket):
+ session_id: int = field(kw_only=True, default=0)
def __post_init__(self):
- self.opcode = 3
- self.gid = GroupId.SESSION_CONTROL
self.mt = MessageType.COMMAND
+ self.oid = SessionControlOpcodeId.GET_RANGING_COUNT
+ self.gid = GroupId.SESSION_CONTROL
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['SessionGetRangingCountCmd', bytes]:
- if fields['opcode'] != 0x3 or fields['gid'] != GroupId.SESSION_CONTROL or fields['mt'] != MessageType.COMMAND:
+ if fields['mt'] != MessageType.COMMAND or fields['oid'] != SessionControlOpcodeId.GET_RANGING_COUNT or fields['gid'] != GroupId.SESSION_CONTROL:
raise Exception("Invalid constraint field values")
+ if len(span) < 4:
+ raise Exception('Invalid packet size')
+ value_ = int.from_bytes(span[0:4], byteorder='little')
+ fields['session_id'] = value_
+ span = span[4:]
return SessionGetRangingCountCmd(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
- return SessionControlCommand.serialize(self, payload = bytes(_span))
+ if self.session_id > 4294967295:
+ print(f"Invalid value for field SessionGetRangingCountCmd::session_id: {self.session_id} > 4294967295; the value will be truncated")
+ self.session_id &= 4294967295
+ _span.extend(int.to_bytes((self.session_id << 0), length=4, byteorder='little'))
+ return SessionControlPacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
- return 0
+ return 4
@dataclass
-class SessionGetRangingCountRsp(SessionControlResponse):
- status: StatusCode = field(kw_only=True, default=StatusCode.UCI_STATUS_OK)
+class SessionGetRangingCountRsp(SessionControlPacket):
+ status: Status = field(kw_only=True, default=Status.OK)
count: int = field(kw_only=True, default=0)
def __post_init__(self):
- self.opcode = 3
- self.gid = GroupId.SESSION_CONTROL
self.mt = MessageType.RESPONSE
+ self.oid = SessionControlOpcodeId.GET_RANGING_COUNT
+ self.gid = GroupId.SESSION_CONTROL
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['SessionGetRangingCountRsp', bytes]:
- if fields['opcode'] != 0x3 or fields['gid'] != GroupId.SESSION_CONTROL or fields['mt'] != MessageType.RESPONSE:
+ if fields['mt'] != MessageType.RESPONSE or fields['oid'] != SessionControlOpcodeId.GET_RANGING_COUNT or fields['gid'] != GroupId.SESSION_CONTROL:
raise Exception("Invalid constraint field values")
if len(span) < 5:
raise Exception('Invalid packet size')
- fields['status'] = StatusCode.from_int(span[0])
+ fields['status'] = Status.from_int(span[0])
value_ = int.from_bytes(span[1:5], byteorder='little')
fields['count'] = value_
span = span[5:]
@@ -5157,30 +4410,30 @@ class SessionGetRangingCountRsp(SessionControlResponse):
print(f"Invalid value for field SessionGetRangingCountRsp::count: {self.count} > 4294967295; the value will be truncated")
self.count &= 4294967295
_span.extend(int.to_bytes((self.count << 0), length=4, byteorder='little'))
- return SessionControlResponse.serialize(self, payload = bytes(_span))
+ return SessionControlPacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
return 5
@dataclass
-class AndroidGetPowerStatsCmd(AndroidCommand):
+class AndroidGetPowerStatsCmd(AndroidPacket):
def __post_init__(self):
- self.opcode = 0
- self.gid = GroupId.VENDOR_ANDROID
self.mt = MessageType.COMMAND
+ self.oid = AndroidOpcodeId.GET_POWER_STATS
+ self.gid = GroupId.VENDOR_ANDROID
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['AndroidGetPowerStatsCmd', bytes]:
- if fields['opcode'] != 0x0 or fields['gid'] != GroupId.VENDOR_ANDROID or fields['mt'] != MessageType.COMMAND:
+ if fields['mt'] != MessageType.COMMAND or fields['oid'] != AndroidOpcodeId.GET_POWER_STATS or fields['gid'] != GroupId.VENDOR_ANDROID:
raise Exception("Invalid constraint field values")
return AndroidGetPowerStatsCmd(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
- return AndroidCommand.serialize(self, payload = bytes(_span))
+ return AndroidPacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
@@ -5188,7 +4441,7 @@ class AndroidGetPowerStatsCmd(AndroidCommand):
@dataclass
class PowerStats(Packet):
- status: StatusCode = field(kw_only=True, default=StatusCode.UCI_STATUS_OK)
+ status: Status = field(kw_only=True, default=Status.OK)
idle_time_ms: int = field(kw_only=True, default=0)
tx_time_ms: int = field(kw_only=True, default=0)
rx_time_ms: int = field(kw_only=True, default=0)
@@ -5202,7 +4455,7 @@ class PowerStats(Packet):
fields = {'payload': None}
if len(span) < 17:
raise Exception('Invalid packet size')
- fields['status'] = StatusCode.from_int(span[0])
+ fields['status'] = Status.from_int(span[0])
value_ = int.from_bytes(span[1:5], byteorder='little')
fields['idle_time_ms'] = value_
value_ = int.from_bytes(span[5:9], byteorder='little')
@@ -5240,17 +4493,17 @@ class PowerStats(Packet):
return 17
@dataclass
-class AndroidGetPowerStatsRsp(AndroidResponse):
+class AndroidGetPowerStatsRsp(AndroidPacket):
stats: PowerStats = field(kw_only=True, default_factory=PowerStats)
def __post_init__(self):
- self.opcode = 0
- self.gid = GroupId.VENDOR_ANDROID
self.mt = MessageType.RESPONSE
+ self.oid = AndroidOpcodeId.GET_POWER_STATS
+ self.gid = GroupId.VENDOR_ANDROID
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['AndroidGetPowerStatsRsp', bytes]:
- if fields['opcode'] != 0x0 or fields['gid'] != GroupId.VENDOR_ANDROID or fields['mt'] != MessageType.RESPONSE:
+ if fields['mt'] != MessageType.RESPONSE or fields['oid'] != AndroidOpcodeId.GET_POWER_STATS or fields['gid'] != GroupId.VENDOR_ANDROID:
raise Exception("Invalid constraint field values")
if len(span) < 17:
raise Exception('Invalid packet size')
@@ -5261,24 +4514,24 @@ class AndroidGetPowerStatsRsp(AndroidResponse):
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
_span.extend(self.stats.serialize())
- return AndroidResponse.serialize(self, payload = bytes(_span))
+ return AndroidPacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
return 17
@dataclass
-class AndroidSetCountryCodeCmd(AndroidCommand):
+class AndroidSetCountryCodeCmd(AndroidPacket):
country_code: bytearray = field(kw_only=True, default_factory=bytearray)
def __post_init__(self):
- self.opcode = 1
- self.gid = GroupId.VENDOR_ANDROID
self.mt = MessageType.COMMAND
+ self.oid = AndroidOpcodeId.SET_COUNTRY_CODE
+ self.gid = GroupId.VENDOR_ANDROID
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['AndroidSetCountryCodeCmd', bytes]:
- if fields['opcode'] != 0x1 or fields['gid'] != GroupId.VENDOR_ANDROID or fields['mt'] != MessageType.COMMAND:
+ if fields['mt'] != MessageType.COMMAND or fields['oid'] != AndroidOpcodeId.SET_COUNTRY_CODE or fields['gid'] != GroupId.VENDOR_ANDROID:
raise Exception("Invalid constraint field values")
if len(span) < 2:
raise Exception('Invalid packet size')
@@ -5289,35 +4542,35 @@ class AndroidSetCountryCodeCmd(AndroidCommand):
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
_span.extend(self.country_code)
- return AndroidCommand.serialize(self, payload = bytes(_span))
+ return AndroidPacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
return 2
@dataclass
-class AndroidSetCountryCodeRsp(AndroidResponse):
- status: StatusCode = field(kw_only=True, default=StatusCode.UCI_STATUS_OK)
+class AndroidSetCountryCodeRsp(AndroidPacket):
+ status: Status = field(kw_only=True, default=Status.OK)
def __post_init__(self):
- self.opcode = 1
- self.gid = GroupId.VENDOR_ANDROID
self.mt = MessageType.RESPONSE
+ self.oid = AndroidOpcodeId.SET_COUNTRY_CODE
+ self.gid = GroupId.VENDOR_ANDROID
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['AndroidSetCountryCodeRsp', bytes]:
- if fields['opcode'] != 0x1 or fields['gid'] != GroupId.VENDOR_ANDROID or fields['mt'] != MessageType.RESPONSE:
+ if fields['mt'] != MessageType.RESPONSE or fields['oid'] != AndroidOpcodeId.SET_COUNTRY_CODE or fields['gid'] != GroupId.VENDOR_ANDROID:
raise Exception("Invalid constraint field values")
if len(span) < 1:
raise Exception('Invalid packet size')
- fields['status'] = StatusCode.from_int(span[0])
+ fields['status'] = Status.from_int(span[0])
span = span[1:]
return AndroidSetCountryCodeRsp(**fields), span
def serialize(self, payload: bytes = None) -> bytes:
_span = bytearray()
_span.append((self.status << 0))
- return AndroidResponse.serialize(self, payload = bytes(_span))
+ return AndroidPacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
@@ -5705,19 +4958,19 @@ class FrameReport(Packet):
return sum([elt.size for elt in self.frame_report_tlvs]) + 4
@dataclass
-class AndroidRangeDiagnosticsNtf(AndroidNotification):
+class AndroidRangeDiagnosticsNtf(AndroidPacket):
session_token: int = field(kw_only=True, default=0)
sequence_number: int = field(kw_only=True, default=0)
frame_reports: List[FrameReport] = field(kw_only=True, default_factory=list)
def __post_init__(self):
- self.opcode = 2
- self.gid = GroupId.VENDOR_ANDROID
self.mt = MessageType.NOTIFICATION
+ self.oid = AndroidOpcodeId.FIRA_RANGE_DIAGNOSTICS
+ self.gid = GroupId.VENDOR_ANDROID
@staticmethod
def parse(fields: dict, span: bytes) -> Tuple['AndroidRangeDiagnosticsNtf', bytes]:
- if fields['opcode'] != 0x2 or fields['gid'] != GroupId.VENDOR_ANDROID or fields['mt'] != MessageType.NOTIFICATION:
+ if fields['mt'] != MessageType.NOTIFICATION or fields['oid'] != AndroidOpcodeId.FIRA_RANGE_DIAGNOSTICS or fields['gid'] != GroupId.VENDOR_ANDROID:
raise Exception("Invalid constraint field values")
if len(span) < 9:
raise Exception('Invalid packet size')
@@ -5750,424 +5003,8 @@ class AndroidRangeDiagnosticsNtf(AndroidNotification):
_span.append((len(self.frame_reports) << 0))
for _elt in self.frame_reports:
_span.extend(_elt.serialize())
- return AndroidNotification.serialize(self, payload = bytes(_span))
+ return AndroidPacket.serialize(self, payload = bytes(_span))
@property
def size(self) -> int:
return sum([elt.size for elt in self.frame_reports]) + 9
-
-@dataclass
-class UciVendor_9_Command(UciCommand):
-
-
- def __post_init__(self):
- self.gid = GroupId.VENDOR_RESERVED_9
- self.mt = MessageType.COMMAND
-
- @staticmethod
- def parse(fields: dict, span: bytes) -> Tuple['UciVendor_9_Command', bytes]:
- if fields['gid'] != GroupId.VENDOR_RESERVED_9 or fields['mt'] != MessageType.COMMAND:
- raise Exception("Invalid constraint field values")
- payload = span
- span = bytes([])
- fields['payload'] = payload
- return UciVendor_9_Command(**fields), span
-
- def serialize(self, payload: bytes = None) -> bytes:
- _span = bytearray()
- _span.extend(payload or self.payload or [])
- return UciCommand.serialize(self, payload = bytes(_span))
-
- @property
- def size(self) -> int:
- return len(self.payload)
-
-@dataclass
-class UciVendor_A_Command(UciCommand):
-
-
- def __post_init__(self):
- self.gid = GroupId.VENDOR_RESERVED_A
- self.mt = MessageType.COMMAND
-
- @staticmethod
- def parse(fields: dict, span: bytes) -> Tuple['UciVendor_A_Command', bytes]:
- if fields['gid'] != GroupId.VENDOR_RESERVED_A or fields['mt'] != MessageType.COMMAND:
- raise Exception("Invalid constraint field values")
- payload = span
- span = bytes([])
- fields['payload'] = payload
- return UciVendor_A_Command(**fields), span
-
- def serialize(self, payload: bytes = None) -> bytes:
- _span = bytearray()
- _span.extend(payload or self.payload or [])
- return UciCommand.serialize(self, payload = bytes(_span))
-
- @property
- def size(self) -> int:
- return len(self.payload)
-
-@dataclass
-class UciVendor_B_Command(UciCommand):
-
-
- def __post_init__(self):
- self.gid = GroupId.VENDOR_RESERVED_B
- self.mt = MessageType.COMMAND
-
- @staticmethod
- def parse(fields: dict, span: bytes) -> Tuple['UciVendor_B_Command', bytes]:
- if fields['gid'] != GroupId.VENDOR_RESERVED_B or fields['mt'] != MessageType.COMMAND:
- raise Exception("Invalid constraint field values")
- payload = span
- span = bytes([])
- fields['payload'] = payload
- return UciVendor_B_Command(**fields), span
-
- def serialize(self, payload: bytes = None) -> bytes:
- _span = bytearray()
- _span.extend(payload or self.payload or [])
- return UciCommand.serialize(self, payload = bytes(_span))
-
- @property
- def size(self) -> int:
- return len(self.payload)
-
-@dataclass
-class UciVendor_E_Command(UciCommand):
-
-
- def __post_init__(self):
- self.gid = GroupId.VENDOR_RESERVED_E
- self.mt = MessageType.COMMAND
-
- @staticmethod
- def parse(fields: dict, span: bytes) -> Tuple['UciVendor_E_Command', bytes]:
- if fields['gid'] != GroupId.VENDOR_RESERVED_E or fields['mt'] != MessageType.COMMAND:
- raise Exception("Invalid constraint field values")
- payload = span
- span = bytes([])
- fields['payload'] = payload
- return UciVendor_E_Command(**fields), span
-
- def serialize(self, payload: bytes = None) -> bytes:
- _span = bytearray()
- _span.extend(payload or self.payload or [])
- return UciCommand.serialize(self, payload = bytes(_span))
-
- @property
- def size(self) -> int:
- return len(self.payload)
-
-@dataclass
-class UciVendor_F_Command(UciCommand):
-
-
- def __post_init__(self):
- self.gid = GroupId.VENDOR_RESERVED_F
- self.mt = MessageType.COMMAND
-
- @staticmethod
- def parse(fields: dict, span: bytes) -> Tuple['UciVendor_F_Command', bytes]:
- if fields['gid'] != GroupId.VENDOR_RESERVED_F or fields['mt'] != MessageType.COMMAND:
- raise Exception("Invalid constraint field values")
- payload = span
- span = bytes([])
- fields['payload'] = payload
- return UciVendor_F_Command(**fields), span
-
- def serialize(self, payload: bytes = None) -> bytes:
- _span = bytearray()
- _span.extend(payload or self.payload or [])
- return UciCommand.serialize(self, payload = bytes(_span))
-
- @property
- def size(self) -> int:
- return len(self.payload)
-
-@dataclass
-class UciVendor_9_Response(UciResponse):
-
-
- def __post_init__(self):
- self.gid = GroupId.VENDOR_RESERVED_9
- self.mt = MessageType.RESPONSE
-
- @staticmethod
- def parse(fields: dict, span: bytes) -> Tuple['UciVendor_9_Response', bytes]:
- if fields['gid'] != GroupId.VENDOR_RESERVED_9 or fields['mt'] != MessageType.RESPONSE:
- raise Exception("Invalid constraint field values")
- payload = span
- span = bytes([])
- fields['payload'] = payload
- return UciVendor_9_Response(**fields), span
-
- def serialize(self, payload: bytes = None) -> bytes:
- _span = bytearray()
- _span.extend(payload or self.payload or [])
- return UciResponse.serialize(self, payload = bytes(_span))
-
- @property
- def size(self) -> int:
- return len(self.payload)
-
-@dataclass
-class UciVendor_A_Response(UciResponse):
-
-
- def __post_init__(self):
- self.gid = GroupId.VENDOR_RESERVED_A
- self.mt = MessageType.RESPONSE
-
- @staticmethod
- def parse(fields: dict, span: bytes) -> Tuple['UciVendor_A_Response', bytes]:
- if fields['gid'] != GroupId.VENDOR_RESERVED_A or fields['mt'] != MessageType.RESPONSE:
- raise Exception("Invalid constraint field values")
- payload = span
- span = bytes([])
- fields['payload'] = payload
- return UciVendor_A_Response(**fields), span
-
- def serialize(self, payload: bytes = None) -> bytes:
- _span = bytearray()
- _span.extend(payload or self.payload or [])
- return UciResponse.serialize(self, payload = bytes(_span))
-
- @property
- def size(self) -> int:
- return len(self.payload)
-
-@dataclass
-class UciVendor_B_Response(UciResponse):
-
-
- def __post_init__(self):
- self.gid = GroupId.VENDOR_RESERVED_B
- self.mt = MessageType.RESPONSE
-
- @staticmethod
- def parse(fields: dict, span: bytes) -> Tuple['UciVendor_B_Response', bytes]:
- if fields['gid'] != GroupId.VENDOR_RESERVED_B or fields['mt'] != MessageType.RESPONSE:
- raise Exception("Invalid constraint field values")
- payload = span
- span = bytes([])
- fields['payload'] = payload
- return UciVendor_B_Response(**fields), span
-
- def serialize(self, payload: bytes = None) -> bytes:
- _span = bytearray()
- _span.extend(payload or self.payload or [])
- return UciResponse.serialize(self, payload = bytes(_span))
-
- @property
- def size(self) -> int:
- return len(self.payload)
-
-@dataclass
-class UciVendor_E_Response(UciResponse):
-
-
- def __post_init__(self):
- self.gid = GroupId.VENDOR_RESERVED_E
- self.mt = MessageType.RESPONSE
-
- @staticmethod
- def parse(fields: dict, span: bytes) -> Tuple['UciVendor_E_Response', bytes]:
- if fields['gid'] != GroupId.VENDOR_RESERVED_E or fields['mt'] != MessageType.RESPONSE:
- raise Exception("Invalid constraint field values")
- payload = span
- span = bytes([])
- fields['payload'] = payload
- return UciVendor_E_Response(**fields), span
-
- def serialize(self, payload: bytes = None) -> bytes:
- _span = bytearray()
- _span.extend(payload or self.payload or [])
- return UciResponse.serialize(self, payload = bytes(_span))
-
- @property
- def size(self) -> int:
- return len(self.payload)
-
-@dataclass
-class UciVendor_F_Response(UciResponse):
-
-
- def __post_init__(self):
- self.gid = GroupId.VENDOR_RESERVED_F
- self.mt = MessageType.RESPONSE
-
- @staticmethod
- def parse(fields: dict, span: bytes) -> Tuple['UciVendor_F_Response', bytes]:
- if fields['gid'] != GroupId.VENDOR_RESERVED_F or fields['mt'] != MessageType.RESPONSE:
- raise Exception("Invalid constraint field values")
- payload = span
- span = bytes([])
- fields['payload'] = payload
- return UciVendor_F_Response(**fields), span
-
- def serialize(self, payload: bytes = None) -> bytes:
- _span = bytearray()
- _span.extend(payload or self.payload or [])
- return UciResponse.serialize(self, payload = bytes(_span))
-
- @property
- def size(self) -> int:
- return len(self.payload)
-
-@dataclass
-class UciVendor_9_Notification(UciNotification):
-
-
- def __post_init__(self):
- self.gid = GroupId.VENDOR_RESERVED_9
- self.mt = MessageType.NOTIFICATION
-
- @staticmethod
- def parse(fields: dict, span: bytes) -> Tuple['UciVendor_9_Notification', bytes]:
- if fields['gid'] != GroupId.VENDOR_RESERVED_9 or fields['mt'] != MessageType.NOTIFICATION:
- raise Exception("Invalid constraint field values")
- payload = span
- span = bytes([])
- fields['payload'] = payload
- return UciVendor_9_Notification(**fields), span
-
- def serialize(self, payload: bytes = None) -> bytes:
- _span = bytearray()
- _span.extend(payload or self.payload or [])
- return UciNotification.serialize(self, payload = bytes(_span))
-
- @property
- def size(self) -> int:
- return len(self.payload)
-
-@dataclass
-class UciVendor_A_Notification(UciNotification):
-
-
- def __post_init__(self):
- self.gid = GroupId.VENDOR_RESERVED_A
- self.mt = MessageType.NOTIFICATION
-
- @staticmethod
- def parse(fields: dict, span: bytes) -> Tuple['UciVendor_A_Notification', bytes]:
- if fields['gid'] != GroupId.VENDOR_RESERVED_A or fields['mt'] != MessageType.NOTIFICATION:
- raise Exception("Invalid constraint field values")
- payload = span
- span = bytes([])
- fields['payload'] = payload
- return UciVendor_A_Notification(**fields), span
-
- def serialize(self, payload: bytes = None) -> bytes:
- _span = bytearray()
- _span.extend(payload or self.payload or [])
- return UciNotification.serialize(self, payload = bytes(_span))
-
- @property
- def size(self) -> int:
- return len(self.payload)
-
-@dataclass
-class UciVendor_B_Notification(UciNotification):
-
-
- def __post_init__(self):
- self.gid = GroupId.VENDOR_RESERVED_B
- self.mt = MessageType.NOTIFICATION
-
- @staticmethod
- def parse(fields: dict, span: bytes) -> Tuple['UciVendor_B_Notification', bytes]:
- if fields['gid'] != GroupId.VENDOR_RESERVED_B or fields['mt'] != MessageType.NOTIFICATION:
- raise Exception("Invalid constraint field values")
- payload = span
- span = bytes([])
- fields['payload'] = payload
- return UciVendor_B_Notification(**fields), span
-
- def serialize(self, payload: bytes = None) -> bytes:
- _span = bytearray()
- _span.extend(payload or self.payload or [])
- return UciNotification.serialize(self, payload = bytes(_span))
-
- @property
- def size(self) -> int:
- return len(self.payload)
-
-@dataclass
-class UciVendor_E_Notification(UciNotification):
-
-
- def __post_init__(self):
- self.gid = GroupId.VENDOR_RESERVED_E
- self.mt = MessageType.NOTIFICATION
-
- @staticmethod
- def parse(fields: dict, span: bytes) -> Tuple['UciVendor_E_Notification', bytes]:
- if fields['gid'] != GroupId.VENDOR_RESERVED_E or fields['mt'] != MessageType.NOTIFICATION:
- raise Exception("Invalid constraint field values")
- payload = span
- span = bytes([])
- fields['payload'] = payload
- return UciVendor_E_Notification(**fields), span
-
- def serialize(self, payload: bytes = None) -> bytes:
- _span = bytearray()
- _span.extend(payload or self.payload or [])
- return UciNotification.serialize(self, payload = bytes(_span))
-
- @property
- def size(self) -> int:
- return len(self.payload)
-
-@dataclass
-class UciVendor_F_Notification(UciNotification):
-
-
- def __post_init__(self):
- self.gid = GroupId.VENDOR_RESERVED_F
- self.mt = MessageType.NOTIFICATION
-
- @staticmethod
- def parse(fields: dict, span: bytes) -> Tuple['UciVendor_F_Notification', bytes]:
- if fields['gid'] != GroupId.VENDOR_RESERVED_F or fields['mt'] != MessageType.NOTIFICATION:
- raise Exception("Invalid constraint field values")
- payload = span
- span = bytes([])
- fields['payload'] = payload
- return UciVendor_F_Notification(**fields), span
-
- def serialize(self, payload: bytes = None) -> bytes:
- _span = bytearray()
- _span.extend(payload or self.payload or [])
- return UciNotification.serialize(self, payload = bytes(_span))
-
- @property
- def size(self) -> int:
- return len(self.payload)
-
-@dataclass
-class TestNotification(UciNotification):
-
-
- def __post_init__(self):
- self.gid = GroupId.TEST
- self.mt = MessageType.NOTIFICATION
-
- @staticmethod
- def parse(fields: dict, span: bytes) -> Tuple['TestNotification', bytes]:
- if fields['gid'] != GroupId.TEST or fields['mt'] != MessageType.NOTIFICATION:
- raise Exception("Invalid constraint field values")
- payload = span
- span = bytes([])
- fields['payload'] = payload
- return TestNotification(**fields), span
-
- def serialize(self, payload: bytes = None) -> bytes:
- _span = bytearray()
- _span.extend(payload or self.payload or [])
- return UciNotification.serialize(self, payload = bytes(_span))
-
- @property
- def size(self) -> int:
- return len(self.payload)
diff --git a/src/device.rs b/src/device.rs
index 449ec7b..e0fcf61 100644
--- a/src/device.rs
+++ b/src/device.rs
@@ -17,7 +17,6 @@ use crate::MacAddress;
use crate::PicaCommand;
use std::collections::HashMap;
-use std::iter::Extend;
use std::time::Duration;
use tokio::sync::mpsc;
@@ -77,15 +76,36 @@ pub const DEFAULT_CAPS_INFO: &[(CapTlvType, &[u8])] = &[
),
];
+/// [UCI] 8.2 Device Configuration Parameters
+pub struct DeviceConfig {
+ device_state: DeviceState,
+ // This config is used to enable/disable the low power mode.
+ // 0x00 = Disable low power mode
+ // 0x01 = Enable low power mode (default)
+ low_power_mode: bool,
+}
+
+// [UCI] 6.3.1 Setting the Configuration
+// All device configuration parameters within the UWBS are set to
+// default values at Table 44 [..].
+impl Default for DeviceConfig {
+ fn default() -> Self {
+ DeviceConfig {
+ device_state: DeviceState::DeviceStateError,
+ low_power_mode: true,
+ }
+ }
+}
+
pub struct Device {
pub handle: usize,
pub mac_address: MacAddress,
+ config: DeviceConfig,
/// [UCI] 5. UWBS Device State Machine
state: DeviceState,
sessions: HashMap<u32, Session>,
pub tx: mpsc::UnboundedSender<UciPacket>,
pica_tx: mpsc::Sender<PicaCommand>,
- config: HashMap<DeviceConfigId, Vec<u8>>,
country_code: [u8; 2],
pub n_active_sessions: usize,
@@ -101,11 +121,11 @@ impl Device {
Device {
handle,
mac_address,
+ config: Default::default(),
state: DeviceState::DeviceStateError, // Will be overwitten
sessions: Default::default(),
tx,
pica_tx,
- config: HashMap::new(),
country_code: Default::default(),
n_active_sessions: 0,
}
@@ -236,26 +256,41 @@ impl Device {
log::debug!("[{}] SetConfig", self.handle);
assert_eq!(self.state, DeviceState::DeviceStateReady); // UCI 6.3
- let (valid_parameters, invalid_config_status) = cmd.get_tlvs().iter().fold(
- (HashMap::new(), Vec::new()),
- |(mut valid_parameters, invalid_config_status), param| {
- // TODO: DeviceState is a read only parameter
- valid_parameters.insert(param.cfg_id, param.v.clone());
-
- (valid_parameters, invalid_config_status)
- },
- );
-
- let (status, parameters) = if invalid_config_status.is_empty() {
- self.config.extend(valid_parameters);
- (uci::Status::Ok, Vec::new())
- } else {
- (uci::Status::InvalidParam, invalid_config_status)
- };
+ // [UCI] 6.3.1 Setting the Configuration
+ // The UWBS shall respond with CORE_SET_CONFIG_RSP setting the Status
+ // field of STATUS_INVALID_PARAM and including one or more invalid
+ // Parameter ID(s) If the Host tries to set a parameter that is not
+ // available in the UWBS. All other configuration parameters should
+ // have been set to the new values within the UWBS.
+ let mut invalid_parameters = vec![];
+ for parameter in cmd.get_parameters() {
+ match parameter.id {
+ uci::ConfigParameterId::DeviceState => {
+ invalid_parameters.push(uci::ConfigParameterStatus {
+ id: parameter.id,
+ status: uci::Status::ReadOnly,
+ })
+ }
+ uci::ConfigParameterId::LowPowerMode => {
+ self.config.low_power_mode = parameter.value.first().copied().unwrap_or(1) != 0;
+ }
+ uci::ConfigParameterId::Rfu(id) => {
+ log::warn!("unknown config parameter id 0x{:02x}", *id);
+ invalid_parameters.push(uci::ConfigParameterStatus {
+ id: parameter.id,
+ status: uci::Status::InvalidParam,
+ })
+ }
+ }
+ }
CoreSetConfigRspBuilder {
- cfg_status: parameters,
- status,
+ status: if invalid_parameters.is_empty() {
+ uci::Status::Ok
+ } else {
+ uci::Status::InvalidParam
+ },
+ parameters: invalid_parameters,
}
.build()
}
@@ -263,45 +298,45 @@ impl Device {
pub fn core_get_config(&self, cmd: CoreGetConfigCmd) -> CoreGetConfigRsp {
log::debug!("[{}] GetConfig", self.handle);
- // TODO: do this config shall be set on device reset
- let ids = cmd.get_cfg_id();
- let (valid_parameters, invalid_parameters) = ids.iter().fold(
- (Vec::new(), Vec::new()),
- |(mut valid_parameters, mut invalid_parameters), id| {
- // UCI Core Section 6.3.2 Table 8
- // UCI Core Section 6.3.2 - Return the Configuration
- // If the status code is ok, return the params
- // If there is at least one invalid param, return the list of invalid params
- // If the ID is not present in our config, return the Type with length = 0
- match DeviceConfigId::try_from(*id) {
- Ok(cfg_id) => match self.config.get(&cfg_id) {
- Some(value) => valid_parameters.push(DeviceConfigTlv {
- cfg_id,
- v: value.clone(),
- }),
- None => invalid_parameters.push(DeviceConfigTlv {
- cfg_id,
- v: Vec::new(),
- }),
- },
- Err(_) => log::error!("Failed to parse config id: {:?}", id),
- }
-
- (valid_parameters, invalid_parameters)
- },
- );
+ // [UCI] 6.3.2 Retrieve the Configuration
+ // If the Host tries to retrieve any Parameter(s) that are not available
+ // in the UWBS, the UWBS shall respond with a CORE_GET_CONFIG_RSP with
+ // a Status field of STATUS_INVALID_PARAM, containing each unavailable
+ // Device Configuration Parameter Type with Length field is zero. In
+ // this case, the CORE_GET_CONFIG_RSP shall not include any parameter(s)
+ // that are available in the UWBS.
+ let mut valid_parameters = vec![];
+ let mut invalid_parameters = vec![];
+ for id in cmd.get_parameter_ids() {
+ match id {
+ ConfigParameterId::DeviceState => valid_parameters.push(ConfigParameter {
+ id: *id,
+ value: vec![self.config.device_state.into()],
+ }),
+ ConfigParameterId::LowPowerMode => valid_parameters.push(ConfigParameter {
+ id: *id,
+ value: vec![self.config.low_power_mode.into()],
+ }),
+ ConfigParameterId::Rfu(_) => invalid_parameters.push(ConfigParameter {
+ id: *id,
+ value: vec![],
+ }),
+ }
+ }
- let (status, parameters) = if invalid_parameters.is_empty() {
- (uci::Status::Ok, valid_parameters)
+ if invalid_parameters.is_empty() {
+ CoreGetConfigRspBuilder {
+ status: uci::Status::Ok,
+ parameters: valid_parameters,
+ }
+ .build()
} else {
- (uci::Status::InvalidParam, invalid_parameters)
- };
-
- CoreGetConfigRspBuilder {
- status,
- tlvs: parameters,
+ CoreGetConfigRspBuilder {
+ status: uci::Status::InvalidParam,
+ parameters: invalid_parameters,
+ }
+ .build()
}
- .build()
}
fn session_init(&mut self, cmd: SessionInitCmd) -> SessionInitRsp {
diff --git a/src/lib.rs b/src/lib.rs
index d3168c4..2c46733 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -467,7 +467,7 @@ impl Pica {
data_sequence_number: 0x01,
pbf: PacketBoundaryFlag::Complete,
session_handle: session_id,
- source_address: device.mac_address.into(),
+ source_address: session.app_config.device_mac_address.unwrap().into(),
status: uci::Status::Ok,
}
.build()
diff --git a/src/mac_address.rs b/src/mac_address.rs
index 6b67e7e..6c93981 100644
--- a/src/mac_address.rs
+++ b/src/mac_address.rs
@@ -43,15 +43,21 @@ impl MacAddress {
}
}
-impl From<MacAddress> for u64 {
- fn from(mac_address: MacAddress) -> Self {
+impl From<&MacAddress> for u64 {
+ fn from(mac_address: &MacAddress) -> Self {
match mac_address {
- MacAddress::Short(addr) => u16::from_le_bytes(addr) as u64,
- MacAddress::Extended(addr) => u64::from_le_bytes(addr),
+ MacAddress::Short(addr) => u16::from_le_bytes(*addr) as u64,
+ MacAddress::Extended(addr) => u64::from_le_bytes(*addr),
}
}
}
+impl From<MacAddress> for u64 {
+ fn from(mac_address: MacAddress) -> Self {
+ u64::from(&mac_address)
+ }
+}
+
impl From<&MacAddress> for Vec<u8> {
fn from(mac_address: &MacAddress) -> Self {
match mac_address {
diff --git a/src/uci_packets.pdl b/src/uci_packets.pdl
index 06fd9b0..4a2f69f 100644
--- a/src/uci_packets.pdl
+++ b/src/uci_packets.pdl
@@ -166,11 +166,6 @@ enum ResetConfig : 8 {
UWBS_RESET = 0x00,
}
-enum DeviceConfigId : 8 {
- DEVICE_STATE = 0x00,
- LOW_POWER_MODE = 0x01,
-}
-
// [UCI] Table 45: APP Configuration Parameters IDs
enum AppConfigTlvType : 8 {
DEVICE_TYPE = 0x00,
@@ -774,30 +769,37 @@ test CoreGetCapsInfoRsp {
"\x40\x03\x00\x05\x00\x00\x00\x00\x01\x00\x01\x01",
}
-struct DeviceConfigTlv {
- cfg_id: DeviceConfigId,
- _count_(v): 8,
- v: 8[],
+// [UCI] Table 44: Device Configuration Parameters
+enum ConfigParameterId : 8 {
+ DEVICE_STATE = 0x00,
+ LOW_POWER_MODE = 0x01,
+ RFU = ..,
+}
+
+struct ConfigParameter {
+ id: ConfigParameterId,
+ _size_(value): 8,
+ value: 8[],
}
packet CoreSetConfigCmd : CorePacket (mt = COMMAND, oid = SET_CONFIG) {
- _count_(tlvs): 8,
- tlvs: DeviceConfigTlv[],
+ _count_(parameters): 8,
+ parameters: ConfigParameter[],
}
test CoreSetConfigCmd {
"\x20\x04\x00\x03\x00\x00\x00\x01\x01\x00",
}
-struct DeviceConfigStatus {
- cfg_id: DeviceConfigId,
+struct ConfigParameterStatus {
+ id: ConfigParameterId,
status: Status,
}
packet CoreSetConfigRsp : CorePacket (mt = RESPONSE, oid = SET_CONFIG) {
status: Status,
- _count_(cfg_status): 8,
- cfg_status: DeviceConfigStatus[],
+ _count_(parameters): 8,
+ parameters: ConfigParameterStatus[],
}
test CoreSetConfigRsp {
@@ -806,8 +808,8 @@ test CoreSetConfigRsp {
}
packet CoreGetConfigCmd : CorePacket (mt = COMMAND, oid = GET_CONFIG) {
- _count_(cfg_id): 8,
- cfg_id: 8[], // DeviceConfigId
+ _count_(parameter_ids): 8,
+ parameter_ids: ConfigParameterId[],
}
test CoreGetConfigCmd {
@@ -816,8 +818,8 @@ test CoreGetConfigCmd {
packet CoreGetConfigRsp : CorePacket (mt = RESPONSE, oid = GET_CONFIG) {
status: Status,
- _count_(tlvs): 8,
- tlvs: DeviceConfigTlv[]
+ _count_(parameters): 8,
+ parameters: ConfigParameter[]
}
test CoreGetConfigRsp {
diff --git a/tests/data_transfer.py b/tests/data_transfer.py
index 7449a35..6d4fb8c 100755
--- a/tests/data_transfer.py
+++ b/tests/data_transfer.py
@@ -35,7 +35,7 @@ async def controller(host: Host, peer: Host, file: Path):
)
)
- await host.expect_control(uci.SessionInitRsp(status=uci.StatusCode.UCI_STATUS_OK))
+ await host.expect_control(uci.SessionInitRsp(status=uci.Status.OK))
await host.expect_control(
uci.SessionStatusNtf(
@@ -93,7 +93,7 @@ async def controller(host: Host, peer: Host, file: Path):
)
await host.expect_control(
- uci.SessionSetAppConfigRsp(status=uci.StatusCode.UCI_STATUS_OK, cfg_status=[])
+ uci.SessionSetAppConfigRsp(status=uci.Status.OK, cfg_status=[])
)
await host.expect_control(
@@ -109,7 +109,7 @@ async def controller(host: Host, peer: Host, file: Path):
# START SESSION CMD
host.send_control(uci.SessionStartCmd(session_id=0))
- await host.expect_control(uci.SessionStartRsp(status=uci.StatusCode.UCI_STATUS_OK))
+ await host.expect_control(uci.SessionStartRsp(status=uci.Status.OK))
await host.expect_control(
uci.SessionStatusNtf(
@@ -120,7 +120,7 @@ async def controller(host: Host, peer: Host, file: Path):
)
await host.expect_control(
- uci.DeviceStatusNtf(device_state=uci.DeviceState.DEVICE_STATE_ACTIVE)
+ uci.CoreDeviceStatusNtf(device_state=uci.DeviceState.DEVICE_STATE_ACTIVE)
)
event = await host.expect_control(uci.ShortMacTwoWaySessionInfoNtf, timeout=2.0)
@@ -132,7 +132,7 @@ async def controller(host: Host, peer: Host, file: Path):
# STOP SESSION
host.send_control(uci.SessionStopCmd(session_id=0))
- await host.expect_control(uci.SessionStopRsp(status=uci.StatusCode.UCI_STATUS_OK))
+ await host.expect_control(uci.SessionStopRsp(status=uci.Status.OK))
await host.expect_control(
uci.SessionStatusNtf(
@@ -143,13 +143,13 @@ async def controller(host: Host, peer: Host, file: Path):
)
await host.expect_control(
- uci.DeviceStatusNtf(device_state=uci.DeviceState.DEVICE_STATE_READY)
+ uci.CoreDeviceStatusNtf(device_state=uci.DeviceState.DEVICE_STATE_READY)
)
# DEINIT
host.send_control(uci.SessionDeinitCmd(session_token=0))
- await host.expect_control(uci.SessionDeinitRsp(status=uci.StatusCode.UCI_STATUS_OK))
+ await host.expect_control(uci.SessionDeinitRsp(status=uci.Status.OK))
async def controlee(host: Host, peer: Host, file: Path):
@@ -162,7 +162,7 @@ async def controlee(host: Host, peer: Host, file: Path):
)
)
- await host.expect_control(uci.SessionInitRsp(status=uci.StatusCode.UCI_STATUS_OK))
+ await host.expect_control(uci.SessionInitRsp(status=uci.Status.OK))
await host.expect_control(
uci.SessionStatusNtf(
@@ -220,7 +220,7 @@ async def controlee(host: Host, peer: Host, file: Path):
)
await host.expect_control(
- uci.SessionSetAppConfigRsp(status=uci.StatusCode.UCI_STATUS_OK, cfg_status=[])
+ uci.SessionSetAppConfigRsp(status=uci.Status.OK, cfg_status=[])
)
await host.expect_control(
@@ -233,7 +233,7 @@ async def controlee(host: Host, peer: Host, file: Path):
host.send_control(uci.SessionStartCmd(session_id=0))
- await host.expect_control(uci.SessionStartRsp(status=uci.StatusCode.UCI_STATUS_OK))
+ await host.expect_control(uci.SessionStartRsp(status=uci.Status.OK))
await host.expect_control(
uci.SessionStatusNtf(
@@ -244,7 +244,7 @@ async def controlee(host: Host, peer: Host, file: Path):
)
await host.expect_control(
- uci.DeviceStatusNtf(device_state=uci.DeviceState.DEVICE_STATE_ACTIVE)
+ uci.CoreDeviceStatusNtf(device_state=uci.DeviceState.DEVICE_STATE_ACTIVE)
)
with file.open("rb") as f:
@@ -252,8 +252,8 @@ async def controlee(host: Host, peer: Host, file: Path):
event = await host.expect_data(
uci.DataMessageRcv(
session_handle=0,
- status=uci.StatusCode.UCI_STATUS_OK,
- source_address=int.from_bytes(peer.mac_address, "big"),
+ status=uci.Status.OK,
+ source_address=int.from_bytes(peer.mac_address, "little"),
data_sequence_number=0x01,
application_data=application_data,
),
@@ -266,7 +266,7 @@ async def controlee(host: Host, peer: Host, file: Path):
host.send_control(uci.SessionStopCmd(session_id=0))
- await host.expect_control(uci.SessionStopRsp(status=uci.StatusCode.UCI_STATUS_OK))
+ await host.expect_control(uci.SessionStopRsp(status=uci.Status.OK))
await host.expect_control(
uci.SessionStatusNtf(
@@ -277,12 +277,12 @@ async def controlee(host: Host, peer: Host, file: Path):
)
await host.expect_control(
- uci.DeviceStatusNtf(device_state=uci.DeviceState.DEVICE_STATE_READY)
+ uci.CoreDeviceStatusNtf(device_state=uci.DeviceState.DEVICE_STATE_READY)
)
host.send_control(uci.SessionDeinitCmd(session_token=0))
- await host.expect_control(uci.SessionDeinitRsp(status=uci.StatusCode.UCI_STATUS_OK))
+ await host.expect_control(uci.SessionDeinitRsp(status=uci.Status.OK))
async def data_transfer(
@@ -322,7 +322,7 @@ async def data_transfer(
seq_num = 0
event = await host.expect_control(
- uci.DataCreditNtf(
+ uci.SessionDataCreditNtf(
session_token=int(session_id),
credit_availability=uci.CreditAvailability.CREDIT_AVAILABLE,
)
@@ -338,7 +338,7 @@ async def data_transfer(
)
)
event = await host.expect_control(
- uci.DataCreditNtf(
+ uci.SessionDataCreditNtf(
session_token=int(session_id),
credit_availability=uci.CreditAvailability.CREDIT_AVAILABLE,
)
@@ -351,8 +351,8 @@ async def data_transfer(
async def run(address: str, uci_port: int, file: Path):
try:
- host0 = await Host.connect(address, uci_port, bytes([0, 0]))
- host1 = await Host.connect(address, uci_port, bytes([0, 1]))
+ host0 = await Host.connect(address, uci_port, bytes([0x34, 0x12]))
+ host1 = await Host.connect(address, uci_port, bytes([0x78, 0x56]))
except Exception as e:
raise Exception(
f"Failed to connect to Pica server at address {address}:{uci_port}\n"
diff --git a/tests/helper.py b/tests/helper.py
index bbce7ad..f5d9db5 100644
--- a/tests/helper.py
+++ b/tests/helper.py
@@ -18,13 +18,13 @@ from pica.packets import uci
async def init(host: Host):
await host.expect_control(
- uci.DeviceStatusNtf(device_state=uci.DeviceState.DEVICE_STATE_READY)
+ uci.CoreDeviceStatusNtf(device_state=uci.DeviceState.DEVICE_STATE_READY)
)
- host.send_control(uci.DeviceResetCmd(reset_config=uci.ResetConfig.UWBS_RESET))
+ host.send_control(uci.CoreDeviceResetCmd(reset_config=uci.ResetConfig.UWBS_RESET))
- await host.expect_control(uci.DeviceResetRsp(status=uci.StatusCode.UCI_STATUS_OK))
+ await host.expect_control(uci.CoreDeviceResetRsp(status=uci.Status.OK))
await host.expect_control(
- uci.DeviceStatusNtf(device_state=uci.DeviceState.DEVICE_STATE_READY)
+ uci.CoreDeviceStatusNtf(device_state=uci.DeviceState.DEVICE_STATE_READY)
)
diff --git a/tests/ranging.py b/tests/ranging.py
index 9af4376..165c5e3 100755
--- a/tests/ranging.py
+++ b/tests/ranging.py
@@ -32,7 +32,7 @@ async def controller(host: Host, peer: Host):
)
)
- await host.expect_control(uci.SessionInitRsp(status=uci.StatusCode.UCI_STATUS_OK))
+ await host.expect_control(uci.SessionInitRsp(status=uci.Status.OK))
await host.expect_control(
uci.SessionStatusNtf(
@@ -90,7 +90,7 @@ async def controller(host: Host, peer: Host):
)
await host.expect_control(
- uci.SessionSetAppConfigRsp(status=uci.StatusCode.UCI_STATUS_OK, cfg_status=[])
+ uci.SessionSetAppConfigRsp(status=uci.Status.OK, cfg_status=[])
)
await host.expect_control(
@@ -103,7 +103,7 @@ async def controller(host: Host, peer: Host):
host.send_control(uci.SessionStartCmd(session_id=0))
- await host.expect_control(uci.SessionStartRsp(status=uci.StatusCode.UCI_STATUS_OK))
+ await host.expect_control(uci.SessionStartRsp(status=uci.Status.OK))
await host.expect_control(
uci.SessionStatusNtf(
@@ -114,7 +114,7 @@ async def controller(host: Host, peer: Host):
)
await host.expect_control(
- uci.DeviceStatusNtf(device_state=uci.DeviceState.DEVICE_STATE_ACTIVE)
+ uci.CoreDeviceStatusNtf(device_state=uci.DeviceState.DEVICE_STATE_ACTIVE)
)
for _ in range(1, 3):
@@ -123,7 +123,7 @@ async def controller(host: Host, peer: Host):
host.send_control(uci.SessionStopCmd(session_id=0))
- await host.expect_control(uci.SessionStopRsp(status=uci.StatusCode.UCI_STATUS_OK))
+ await host.expect_control(uci.SessionStopRsp(status=uci.Status.OK))
await host.expect_control(
uci.SessionStatusNtf(
@@ -134,12 +134,12 @@ async def controller(host: Host, peer: Host):
)
await host.expect_control(
- uci.DeviceStatusNtf(device_state=uci.DeviceState.DEVICE_STATE_READY)
+ uci.CoreDeviceStatusNtf(device_state=uci.DeviceState.DEVICE_STATE_READY)
)
host.send_control(uci.SessionDeinitCmd(session_token=0))
- await host.expect_control(uci.SessionDeinitRsp(status=uci.StatusCode.UCI_STATUS_OK))
+ await host.expect_control(uci.SessionDeinitRsp(status=uci.Status.OK))
async def controlee(host: Host, peer: Host):
@@ -151,7 +151,7 @@ async def controlee(host: Host, peer: Host):
)
)
- await host.expect_control(uci.SessionInitRsp(status=uci.StatusCode.UCI_STATUS_OK))
+ await host.expect_control(uci.SessionInitRsp(status=uci.Status.OK))
await host.expect_control(
uci.SessionStatusNtf(
@@ -209,7 +209,7 @@ async def controlee(host: Host, peer: Host):
)
await host.expect_control(
- uci.SessionSetAppConfigRsp(status=uci.StatusCode.UCI_STATUS_OK, cfg_status=[])
+ uci.SessionSetAppConfigRsp(status=uci.Status.OK, cfg_status=[])
)
await host.expect_control(
@@ -222,7 +222,7 @@ async def controlee(host: Host, peer: Host):
host.send_control(uci.SessionStartCmd(session_id=0))
- await host.expect_control(uci.SessionStartRsp(status=uci.StatusCode.UCI_STATUS_OK))
+ await host.expect_control(uci.SessionStartRsp(status=uci.Status.OK))
await host.expect_control(
uci.SessionStatusNtf(
@@ -233,7 +233,7 @@ async def controlee(host: Host, peer: Host):
)
await host.expect_control(
- uci.DeviceStatusNtf(device_state=uci.DeviceState.DEVICE_STATE_ACTIVE)
+ uci.CoreDeviceStatusNtf(device_state=uci.DeviceState.DEVICE_STATE_ACTIVE)
)
for _ in range(1, 3):
@@ -242,7 +242,7 @@ async def controlee(host: Host, peer: Host):
host.send_control(uci.SessionStopCmd(session_id=0))
- await host.expect_control(uci.SessionStopRsp(status=uci.StatusCode.UCI_STATUS_OK))
+ await host.expect_control(uci.SessionStopRsp(status=uci.Status.OK))
await host.expect_control(
uci.SessionStatusNtf(
@@ -253,12 +253,12 @@ async def controlee(host: Host, peer: Host):
)
await host.expect_control(
- uci.DeviceStatusNtf(device_state=uci.DeviceState.DEVICE_STATE_READY)
+ uci.CoreDeviceStatusNtf(device_state=uci.DeviceState.DEVICE_STATE_READY)
)
host.send_control(uci.SessionDeinitCmd(session_token=0))
- await host.expect_control(uci.SessionDeinitRsp(status=uci.StatusCode.UCI_STATUS_OK))
+ await host.expect_control(uci.SessionDeinitRsp(status=uci.Status.OK))
async def run(address: str, uci_port: int):