aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndroid Build Coastguard Worker <android-build-coastguard-worker@google.com>2024-03-14 18:12:07 +0000
committerAndroid Build Coastguard Worker <android-build-coastguard-worker@google.com>2024-03-14 18:12:07 +0000
commit3479879194f41ad1d26b06870befde1c5c6e76c5 (patch)
tree09481a8f820f07e7f2912d53fab5ca71123c7d1f
parent1ddf2391d1f56623688c5d29f91e5f9f1ef2315d (diff)
parentd32b655bf547114b57812e3218d61607a23382ff (diff)
downloadpica-emu-35-1-release.tar.gz
Snap for 11574415 from d32b655bf547114b57812e3218d61607a23382ff to emu-35-1-releaseemu-35-1-release
Change-Id: I36dca99114da032fefce0f7b3258fe1d6b20590d
-rw-r--r--.github/workflows/build_and_test.yml4
-rw-r--r--Android.bp7
-rw-r--r--Cargo.lock304
-rw-r--r--Cargo.toml32
-rw-r--r--METADATA4
-rw-r--r--src/bin/http-server/main.rs564
-rw-r--r--src/bin/http-server/position.rs (renamed from src/position.rs)0
-rw-r--r--src/bin/main.rs (renamed from src/bin/server/mod.rs)65
-rw-r--r--src/bin/server/web.rs269
-rw-r--r--src/device.rs172
-rw-r--r--src/lib.rs871
-rw-r--r--src/mac_address.rs61
-rw-r--r--src/packets.rs170
-rw-r--r--src/pcapng.rs61
-rw-r--r--src/session.rs78
-rwxr-xr-xtests/data_transfer.py190
-rw-r--r--tests/test_runner.py3
17 files changed, 1754 insertions, 1101 deletions
diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml
index 7a19021..bf154ef 100644
--- a/.github/workflows/build_and_test.yml
+++ b/.github/workflows/build_and_test.yml
@@ -16,10 +16,10 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- - name: Install Rust 1.67.1
+ - name: Install Rust 1.76.0
uses: actions-rs/toolchain@v1
with:
- toolchain: 1.67.1
+ toolchain: 1.76.0
override: true
components: rustfmt, clippy
- name: Set Up Python 3.11
diff --git a/Android.bp b/Android.bp
index f8ff3de..42d3c8b 100644
--- a/Android.bp
+++ b/Android.bp
@@ -33,8 +33,10 @@ rust_library_host {
rustlibs: [
"libanyhow",
"libbytes",
+ "libfutures",
"libglam",
"libhex",
+ "liblog_rust",
"libnum_traits",
"libpdl_runtime",
"libthiserror",
@@ -45,10 +47,13 @@ rust_library_host {
rust_binary_host {
name: "pica",
- srcs: ["src/bin/server/mod.rs"],
+ srcs: ["src/bin/main.rs"],
proc_macros: ["libnum_derive"],
rustlibs: [
"libanyhow",
+ "libenv_logger",
+ "libfutures",
+ "liblog_rust",
"libpica",
"libclap",
"libtokio",
diff --git a/Cargo.lock b/Cargo.lock
index 1c9bfdb..a59c5c0 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -18,6 +18,63 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe"
[[package]]
+name = "aho-corasick"
+version = "1.1.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b2969dcb958b36655471fc61f7e416fa76033bdd4bfed0678d8fee1e2d07a1f0"
+dependencies = [
+ "memchr",
+]
+
+[[package]]
+name = "anstream"
+version = "0.6.11"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6e2e1ebcb11de5c03c67de28a7df593d32191b44939c482e97702baaaa6ab6a5"
+dependencies = [
+ "anstyle",
+ "anstyle-parse",
+ "anstyle-query",
+ "anstyle-wincon",
+ "colorchoice",
+ "utf8parse",
+]
+
+[[package]]
+name = "anstyle"
+version = "1.0.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8901269c6307e8d93993578286ac0edf7f195079ffff5ebdeea6a59ffb7e36bc"
+
+[[package]]
+name = "anstyle-parse"
+version = "0.2.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c75ac65da39e5fe5ab759307499ddad880d724eed2f6ce5b5e8a26f4f387928c"
+dependencies = [
+ "utf8parse",
+]
+
+[[package]]
+name = "anstyle-query"
+version = "1.0.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e28923312444cdd728e4738b3f9c9cac739500909bb3d3c94b43551b16517648"
+dependencies = [
+ "windows-sys 0.52.0",
+]
+
+[[package]]
+name = "anstyle-wincon"
+version = "3.0.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1cd54b81ec8d6180e24654d0b371ad22fc3dd083b6ff8ba325b72e00c87660a7"
+dependencies = [
+ "anstyle",
+ "windows-sys 0.52.0",
+]
+
+[[package]]
name = "anyhow"
version = "1.0.56"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -156,6 +213,12 @@ dependencies = [
]
[[package]]
+name = "colorchoice"
+version = "1.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7"
+
+[[package]]
name = "cpufeatures"
version = "0.2.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -185,48 +248,121 @@ dependencies = [
]
[[package]]
+name = "env_filter"
+version = "0.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a009aa4810eb158359dda09d0c87378e4bbb89b5a801f016885a4707ba24f7ea"
+dependencies = [
+ "log",
+ "regex",
+]
+
+[[package]]
+name = "env_logger"
+version = "0.11.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "05e7cf40684ae96ade6232ed84582f40ce0a66efcd43a5117aef610534f8e0b8"
+dependencies = [
+ "anstream",
+ "anstyle",
+ "env_filter",
+ "humantime",
+ "log",
+]
+
+[[package]]
name = "fnv"
version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"
[[package]]
+name = "futures"
+version = "0.3.30"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0"
+dependencies = [
+ "futures-channel",
+ "futures-core",
+ "futures-executor",
+ "futures-io",
+ "futures-sink",
+ "futures-task",
+ "futures-util",
+]
+
+[[package]]
name = "futures-channel"
-version = "0.3.21"
+version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c3083ce4b914124575708913bca19bfe887522d6e2e6d0952943f5eac4a74010"
+checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78"
dependencies = [
"futures-core",
+ "futures-sink",
]
[[package]]
name = "futures-core"
-version = "0.3.21"
+version = "0.3.30"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d"
+
+[[package]]
+name = "futures-executor"
+version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0c09fd04b7e4073ac7156a9539b57a484a8ea920f79c7c675d05d289ab6110d3"
+checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d"
+dependencies = [
+ "futures-core",
+ "futures-task",
+ "futures-util",
+]
+
+[[package]]
+name = "futures-io"
+version = "0.3.30"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1"
+
+[[package]]
+name = "futures-macro"
+version = "0.3.30"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn 2.0.38",
+]
[[package]]
name = "futures-sink"
-version = "0.3.21"
+version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "21163e139fa306126e6eedaf49ecdb4588f939600f0b1e770f4205ee4b7fa868"
+checksum = "9fb8e00e87438d937621c1c6269e53f536c14d3fbd6a042bb24879e57d474fb5"
[[package]]
name = "futures-task"
-version = "0.3.21"
+version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "57c66a976bf5909d801bbef33416c41372779507e7a6b3a5e25e4749c58f776a"
+checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004"
[[package]]
name = "futures-util"
-version = "0.3.21"
+version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d8b7abd5d659d9b90c8cba917f6ec750a74e2dc23902ef9cd4cc8c8b22e6036a"
+checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48"
dependencies = [
+ "futures-channel",
"futures-core",
+ "futures-io",
+ "futures-macro",
+ "futures-sink",
"futures-task",
+ "memchr",
"pin-project-lite",
"pin-utils",
+ "slab",
]
[[package]]
@@ -307,6 +443,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c4a1e36c821dbe04574f602848a19f742f4fb3c98d40449f11bcad18d6b17421"
[[package]]
+name = "humantime"
+version = "2.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4"
+
+[[package]]
name = "hyper"
version = "0.14.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -349,12 +491,9 @@ checksum = "13e3bf6590cbc649f4d1a3eefc9d5d6eb746f5200ffb04e5e142700b8faa56e7"
[[package]]
name = "log"
-version = "0.4.16"
+version = "0.4.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6389c490849ff5bc16be905ae24bc913a9c8892e19b2341dbc175e14c341c2b8"
-dependencies = [
- "cfg-if",
-]
+checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f"
[[package]]
name = "memchr"
@@ -379,7 +518,7 @@ checksum = "8f3d0b296e374a4e6f3c7b0a1f5a51d748a0d34c85e7dc48fc3fa9a87657fe09"
dependencies = [
"libc",
"wasi",
- "windows-sys",
+ "windows-sys 0.48.0",
]
[[package]]
@@ -514,9 +653,12 @@ dependencies = [
"anyhow",
"bytes",
"clap",
+ "env_logger",
+ "futures",
"glam",
"hex",
"hyper",
+ "log",
"num-derive",
"num-traits",
"pdl-compiler",
@@ -593,6 +735,35 @@ dependencies = [
]
[[package]]
+name = "regex"
+version = "1.10.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b62dbe01f0b06f9d8dc7d49e05a0785f153b00b2c227856282f671e0318c9b15"
+dependencies = [
+ "aho-corasick",
+ "memchr",
+ "regex-automata",
+ "regex-syntax",
+]
+
+[[package]]
+name = "regex-automata"
+version = "0.4.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5bb987efffd3c6d0d8f5f89510bb458559eab11e4f869acb20bf845e016259cd"
+dependencies = [
+ "aho-corasick",
+ "memchr",
+ "regex-syntax",
+]
+
+[[package]]
+name = "regex-syntax"
+version = "0.8.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f"
+
+[[package]]
name = "rustc-demangle"
version = "0.1.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -647,6 +818,15 @@ dependencies = [
]
[[package]]
+name = "slab"
+version = "0.4.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8f92a496fb766b417c996b9c5e57daf2f7ad3b0bebe1ccfca4856390e3d3bb67"
+dependencies = [
+ "autocfg",
+]
+
+[[package]]
name = "socket2"
version = "0.4.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -663,7 +843,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b5fac59a5cb5dd637972e5fca70daf0523c9067fcdc4842f053dae04a18f8e9"
dependencies = [
"libc",
- "windows-sys",
+ "windows-sys 0.48.0",
]
[[package]]
@@ -731,7 +911,7 @@ dependencies = [
"pin-project-lite",
"socket2 0.5.5",
"tokio-macros",
- "windows-sys",
+ "windows-sys 0.48.0",
]
[[package]]
@@ -834,6 +1014,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ccb82d61f80a663efe1f787a51b16b5a51e3314d6ac365b08639f52387b33f3"
[[package]]
+name = "utf8parse"
+version = "0.2.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a"
+
+[[package]]
name = "version_check"
version = "0.9.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -892,7 +1078,16 @@ version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9"
dependencies = [
- "windows-targets",
+ "windows-targets 0.48.0",
+]
+
+[[package]]
+name = "windows-sys"
+version = "0.52.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d"
+dependencies = [
+ "windows-targets 0.52.0",
]
[[package]]
@@ -901,13 +1096,28 @@ version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b1eb6f0cd7c80c79759c929114ef071b87354ce476d9d94271031c0497adfd5"
dependencies = [
- "windows_aarch64_gnullvm",
- "windows_aarch64_msvc",
- "windows_i686_gnu",
- "windows_i686_msvc",
- "windows_x86_64_gnu",
- "windows_x86_64_gnullvm",
- "windows_x86_64_msvc",
+ "windows_aarch64_gnullvm 0.48.0",
+ "windows_aarch64_msvc 0.48.0",
+ "windows_i686_gnu 0.48.0",
+ "windows_i686_msvc 0.48.0",
+ "windows_x86_64_gnu 0.48.0",
+ "windows_x86_64_gnullvm 0.48.0",
+ "windows_x86_64_msvc 0.48.0",
+]
+
+[[package]]
+name = "windows-targets"
+version = "0.52.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8a18201040b24831fbb9e4eb208f8892e1f50a37feb53cc7ff887feb8f50e7cd"
+dependencies = [
+ "windows_aarch64_gnullvm 0.52.0",
+ "windows_aarch64_msvc 0.52.0",
+ "windows_i686_gnu 0.52.0",
+ "windows_i686_msvc 0.52.0",
+ "windows_x86_64_gnu 0.52.0",
+ "windows_x86_64_gnullvm 0.52.0",
+ "windows_x86_64_msvc 0.52.0",
]
[[package]]
@@ -917,37 +1127,79 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "91ae572e1b79dba883e0d315474df7305d12f569b400fcf90581b06062f7e1bc"
[[package]]
+name = "windows_aarch64_gnullvm"
+version = "0.52.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "cb7764e35d4db8a7921e09562a0304bf2f93e0a51bfccee0bd0bb0b666b015ea"
+
+[[package]]
name = "windows_aarch64_msvc"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b2ef27e0d7bdfcfc7b868b317c1d32c641a6fe4629c171b8928c7b08d98d7cf3"
[[package]]
+name = "windows_aarch64_msvc"
+version = "0.52.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "bbaa0368d4f1d2aaefc55b6fcfee13f41544ddf36801e793edbbfd7d7df075ef"
+
+[[package]]
name = "windows_i686_gnu"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "622a1962a7db830d6fd0a69683c80a18fda201879f0f447f065a3b7467daa241"
[[package]]
+name = "windows_i686_gnu"
+version = "0.52.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a28637cb1fa3560a16915793afb20081aba2c92ee8af57b4d5f28e4b3e7df313"
+
+[[package]]
name = "windows_i686_msvc"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4542c6e364ce21bf45d69fdd2a8e455fa38d316158cfd43b3ac1c5b1b19f8e00"
[[package]]
+name = "windows_i686_msvc"
+version = "0.52.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ffe5e8e31046ce6230cc7215707b816e339ff4d4d67c65dffa206fd0f7aa7b9a"
+
+[[package]]
name = "windows_x86_64_gnu"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ca2b8a661f7628cbd23440e50b05d705db3686f894fc9580820623656af974b1"
[[package]]
+name = "windows_x86_64_gnu"
+version = "0.52.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3d6fa32db2bc4a2f5abeacf2b69f7992cd09dca97498da74a151a3132c26befd"
+
+[[package]]
name = "windows_x86_64_gnullvm"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7896dbc1f41e08872e9d5e8f8baa8fdd2677f29468c4e156210174edc7f7b953"
[[package]]
+name = "windows_x86_64_gnullvm"
+version = "0.52.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1a657e1e9d3f514745a572a6846d3c7aa7dbe1658c056ed9c3344c4109a6949e"
+
+[[package]]
name = "windows_x86_64_msvc"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1a515f5799fe4961cb532f983ce2b23082366b898e52ffbce459c86f67c8378a"
+
+[[package]]
+name = "windows_x86_64_msvc"
+version = "0.52.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "dff9641d1cd4be8d1a070daf9e3773c5f67e78b4d9d42263020c057706765c04"
diff --git a/Cargo.toml b/Cargo.toml
index e4955e4..7fea374 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "pica"
-version = "0.1.7"
+version = "0.1.8"
edition = "2021"
description = "Pica is a virtual UWB Controller implementing the FiRa UCI specification."
repository = "https://github.com/google/pica"
@@ -14,7 +14,7 @@ authors = [
"David De Jesus Duarte <licorne@google.com>",
"Henri Chataing <henrichataing@google.com>",
]
-default-run = "pica-server"
+default-run = "pica"
exclude = [
"res/*",
"scripts/*"
@@ -28,8 +28,13 @@ name = "pica"
path = "src/lib.rs"
[[bin]]
-name = "pica-server"
-path = "src/bin/server/mod.rs"
+name = "pica"
+path = "src/bin/main.rs"
+
+[[bin]]
+name = "pica-http"
+path = "src/bin/http-server/main.rs"
+features = ["web"]
[features]
default = ["web"]
@@ -39,17 +44,20 @@ web = ["hyper", "tokio/rt-multi-thread"]
pdl-compiler = "0.2.3"
[dependencies]
-tokio = { version = "1.32.0", features = [ "fs", "io-util", "macros", "net", "rt" ] }
-tokio-stream = { version = "0.1.8", features = ["sync"] }
-bytes = "1"
anyhow = "1.0.56"
+bytes = "1"
+futures = "0.3.30"
+clap = { version = "4.1.8", default-features = false, features = ["derive", "error-context", "help", "std", "usage"] }
+glam = "0.25.0"
+hex = "0.4.3"
+hyper = { version = "0.14", features = ["server", "stream", "http1", "tcp"], optional = true }
+log = "0.4.20"
+env_logger = "0.11.1"
num-derive = "0.3.3"
num-traits = "0.2.17"
pdl-runtime = "0.2.2"
-thiserror = "1.0.49"
-glam = "0.25.0"
-hyper = { version = "0.14", features = ["server", "stream", "http1", "tcp"], optional = true }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
-hex = "0.4.3"
-clap = { version = "4.1.8", default-features = false, features = ["derive", "error-context", "help", "std", "usage"] }
+thiserror = "1.0.49"
+tokio = { version = "1.32.0", features = [ "fs", "io-util", "macros", "net", "rt" ] }
+tokio-stream = { version = "0.1.8", features = ["sync"] }
diff --git a/METADATA b/METADATA
index af3238a..36c8e14 100644
--- a/METADATA
+++ b/METADATA
@@ -12,7 +12,7 @@ third_party {
type: GIT
value: "https://github.com/google/pica.git"
}
- version: "v0.1.0"
+ version: "v0.1.8"
license_type: NOTICE
- last_upgrade_date { year: 2022 month: 12 day: 5 }
+ last_upgrade_date { year: 2024 month: 3 day: 13 }
}
diff --git a/src/bin/http-server/main.rs b/src/bin/http-server/main.rs
new file mode 100644
index 0000000..0882661
--- /dev/null
+++ b/src/bin/http-server/main.rs
@@ -0,0 +1,564 @@
+// Copyright 2022 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use anyhow::Result;
+use clap::Parser;
+use hyper::service::{make_service_fn, service_fn};
+use hyper::{body, Body, Request, Response, Server, StatusCode as HttpStatusCode};
+use serde::{Deserialize, Serialize};
+use serde_json::error::Category as SerdeErrorCategory;
+use std::collections::HashMap;
+use std::convert::Infallible;
+use std::net::{Ipv4Addr, SocketAddrV4};
+use std::path::PathBuf;
+use std::sync::Arc;
+use std::sync::Mutex;
+use tokio::net::TcpListener;
+use tokio::sync::{broadcast, mpsc, oneshot};
+use tokio::try_join;
+use tokio_stream::{wrappers::BroadcastStream, StreamExt};
+
+use pica::{Category, MacAddress, Pica, PicaCommand, PicaCommandError, PicaEvent};
+
+mod position;
+use position::Position;
+
+const DEFAULT_UCI_PORT: u16 = 7000;
+const DEFAULT_WEB_PORT: u16 = 3000;
+
+const STATIC_FILES: &[(&str, &str, &str)] = &[
+ ("/", "text/html", include_str!("../../../static/index.html")),
+ (
+ "/openapi",
+ "text/html",
+ include_str!("../../../static/openapi.html"),
+ ),
+ (
+ "/openapi.yaml",
+ "text/yaml",
+ include_str!("../../../static/openapi.yaml"),
+ ),
+ (
+ "/src/components/Map.js",
+ "application/javascript",
+ include_str!("../../../static/src/components/Map.js"),
+ ),
+ (
+ "/src/components/DeviceInfo.js",
+ "application/javascript",
+ include_str!("../../../static/src/components/DeviceInfo.js"),
+ ),
+ (
+ "/src/components/Orientation.js",
+ "application/javascript",
+ include_str!("../../../static/src/components/Orientation.js"),
+ ),
+];
+
+/// Record information about an active device.
+#[derive(Debug, Serialize, Clone)]
+struct DeviceInformation {
+ category: pica::Category,
+ mac_address: MacAddress,
+ #[serde(flatten)]
+ position: Position,
+}
+
+/// Record information about an active device.
+#[derive(Clone, Debug, Serialize)]
+#[serde(untagged)]
+pub enum Event {
+ DeviceAdded {
+ category: Category,
+ mac_address: MacAddress,
+ #[serde(flatten)]
+ position: Position,
+ },
+ DeviceRemoved {
+ category: Category,
+ mac_address: MacAddress,
+ },
+ DeviceUpdated {
+ category: Category,
+ mac_address: MacAddress,
+ #[serde(flatten)]
+ position: Position,
+ },
+ NeighborUpdated {
+ source_category: Category,
+ source_mac_address: MacAddress,
+ destination_category: Category,
+ destination_mac_address: MacAddress,
+ distance: u16,
+ azimuth: i16,
+ elevation: i8,
+ },
+}
+
+/// Record the position of active devices for reference by the
+/// ranging estimator.
+#[derive(Clone)]
+struct Context {
+ devices: Arc<Mutex<HashMap<pica::Handle, DeviceInformation>>>,
+ events: broadcast::Sender<Event>,
+}
+
+impl Context {
+ fn new() -> Self {
+ let (events, _) = broadcast::channel(1024);
+ Context {
+ devices: Arc::new(Mutex::new(HashMap::new())),
+ events,
+ }
+ }
+
+ async fn handle_connection_events(
+ self,
+ mut events: broadcast::Receiver<PicaEvent>,
+ ) -> Result<()> {
+ loop {
+ match events.recv().await {
+ Ok(PicaEvent::Connected {
+ mac_address,
+ handle,
+ }) => {
+ let mut devices = self.devices.lock().unwrap();
+ devices.insert(
+ handle,
+ DeviceInformation {
+ category: Category::Uci,
+ mac_address,
+ position: Default::default(),
+ },
+ );
+ self.events
+ .send(Event::DeviceAdded {
+ category: Category::Uci,
+ mac_address,
+ position: Default::default(),
+ })
+ .unwrap();
+ }
+ Ok(PicaEvent::Disconnected {
+ mac_address,
+ handle,
+ }) => {
+ let mut devices = self.devices.lock().unwrap();
+ devices.remove(&handle);
+ self.events
+ .send(Event::DeviceRemoved {
+ category: Category::Uci,
+ mac_address,
+ })
+ .unwrap();
+ }
+ Err(err) => anyhow::bail!(err),
+ }
+ }
+ }
+
+ fn http_events(&self) -> Response<Body> {
+ let stream = BroadcastStream::new(self.events.subscribe()).map(|result| {
+ result.map(|event| {
+ format!(
+ "event: {}\ndata: {}\n\n",
+ event.name(),
+ serde_json::to_string(&event).unwrap()
+ )
+ })
+ });
+ Response::builder()
+ .header("content-type", "text/event-stream")
+ .body(Body::wrap_stream(stream))
+ .unwrap()
+ }
+
+ fn http_set_position(&self, mac_address: MacAddress, position: Position) -> Response<Body> {
+ log::info!("set-position({}, {})", mac_address, position);
+
+ let mut devices = self.devices.lock().unwrap();
+ let mut found_device = None;
+ for (_, device) in devices.iter_mut() {
+ if device.mac_address == mac_address {
+ device.position = position;
+ found_device = Some(device.clone());
+ break;
+ }
+ }
+
+ let Some(device) = found_device else {
+ return Response::builder()
+ .status(HttpStatusCode::NOT_FOUND)
+ .body("".into())
+ .unwrap();
+ };
+
+ self.events
+ .send(Event::DeviceUpdated {
+ category: device.category,
+ mac_address,
+ position,
+ })
+ .unwrap();
+
+ for other in devices.values() {
+ if other.mac_address != device.mac_address {
+ let local = device
+ .position
+ .compute_range_azimuth_elevation(&other.position);
+ let remote = other
+ .position
+ .compute_range_azimuth_elevation(&device.position);
+
+ assert!(local.0 == remote.0);
+
+ self.events
+ .send(Event::NeighborUpdated {
+ source_category: device.category,
+ source_mac_address: device.mac_address,
+ destination_category: other.category,
+ destination_mac_address: other.mac_address,
+ distance: local.0,
+ azimuth: local.1,
+ elevation: local.2,
+ })
+ .unwrap();
+
+ let _ = self
+ .events
+ .send(Event::NeighborUpdated {
+ source_category: other.category,
+ source_mac_address: other.mac_address,
+ destination_category: device.category,
+ destination_mac_address: device.mac_address,
+ distance: remote.0,
+ azimuth: remote.1,
+ elevation: remote.2,
+ })
+ .unwrap();
+ }
+ }
+
+ Response::builder()
+ .status(HttpStatusCode::OK)
+ .body("".into())
+ .unwrap()
+ }
+
+ async fn http_create_anchor(
+ &self,
+ mac_address: MacAddress,
+ position: Position,
+ cmd_tx: mpsc::Sender<PicaCommand>,
+ ) -> Response<Body> {
+ log::info!("create-anchor({}, {})", mac_address, position);
+
+ let (rsp_tx, rsp_rx) = oneshot::channel::<Result<pica::Handle, PicaCommandError>>();
+ cmd_tx
+ .send(PicaCommand::CreateAnchor(mac_address, rsp_tx))
+ .await
+ .unwrap();
+
+ let status = match rsp_rx.await {
+ Ok(Ok(handle)) => {
+ let mut devices = self.devices.lock().unwrap();
+ devices.insert(
+ handle,
+ DeviceInformation {
+ position,
+ mac_address,
+ category: Category::Anchor,
+ },
+ );
+ self.events
+ .send(Event::DeviceAdded {
+ category: Category::Anchor,
+ mac_address,
+ position,
+ })
+ .unwrap();
+ HttpStatusCode::OK
+ }
+ Ok(Err(PicaCommandError::DeviceAlreadyExists(_))) => HttpStatusCode::CONFLICT,
+ Ok(Err(PicaCommandError::DeviceNotFound(_))) => HttpStatusCode::NOT_FOUND,
+ Err(_) => HttpStatusCode::INTERNAL_SERVER_ERROR,
+ };
+
+ Response::builder().status(status).body("".into()).unwrap()
+ }
+
+ async fn http_destroy_anchor(
+ &self,
+ mac_address: MacAddress,
+ cmd_tx: mpsc::Sender<PicaCommand>,
+ ) -> Response<Body> {
+ log::info!("destroy-anchor({})", mac_address);
+
+ let (rsp_tx, rsp_rx) = oneshot::channel::<Result<pica::Handle, PicaCommandError>>();
+ cmd_tx
+ .send(PicaCommand::DestroyAnchor(mac_address, rsp_tx))
+ .await
+ .unwrap();
+
+ let status = match rsp_rx.await {
+ Ok(Ok(handle)) => {
+ let mut devices = self.devices.lock().unwrap();
+ devices.remove(&handle);
+ self.events
+ .send(Event::DeviceRemoved {
+ category: Category::Anchor,
+ mac_address,
+ })
+ .unwrap();
+ HttpStatusCode::OK
+ }
+ Ok(Err(PicaCommandError::DeviceAlreadyExists(_))) => HttpStatusCode::CONFLICT,
+ Ok(Err(PicaCommandError::DeviceNotFound(_))) => HttpStatusCode::NOT_FOUND,
+ Err(_) => HttpStatusCode::INTERNAL_SERVER_ERROR,
+ };
+
+ Response::builder().status(status).body("".into()).unwrap()
+ }
+
+ fn http_get_state(&self) -> Response<Body> {
+ log::info!("get-state()");
+
+ #[derive(Serialize)]
+ struct GetStateResponse {
+ devices: Vec<DeviceInformation>,
+ }
+
+ let devices = self.devices.lock().unwrap();
+ let response = GetStateResponse {
+ devices: devices.values().cloned().collect::<Vec<_>>(),
+ };
+ let body = serde_json::to_string(&response).unwrap();
+ Response::builder()
+ .status(HttpStatusCode::OK)
+ .body(body.into())
+ .unwrap()
+ }
+}
+
+impl pica::RangingEstimator for Context {
+ fn estimate(
+ &self,
+ left: &pica::Handle,
+ right: &pica::Handle,
+ ) -> anyhow::Result<pica::RangingMeasurement> {
+ let devices = self
+ .devices
+ .lock()
+ .map_err(|_| anyhow::anyhow!("cannot take lock"))?;
+ let left_pos = devices
+ .get(left)
+ .ok_or(anyhow::anyhow!("unknown position"))?
+ .position;
+ let right_pos = devices
+ .get(right)
+ .ok_or(anyhow::anyhow!("unknown position"))?
+ .position;
+ let (range, azimuth, elevation) = left_pos.compute_range_azimuth_elevation(&right_pos);
+ Ok(pica::RangingMeasurement {
+ range,
+ azimuth,
+ elevation,
+ })
+ }
+}
+
+#[derive(Deserialize)]
+struct PositionBody {
+ x: i16,
+ y: i16,
+ z: i16,
+ yaw: i16,
+ pitch: i8,
+ roll: i16,
+}
+
+macro_rules! position {
+ ($body: ident) => {
+ position!($body, false)
+ };
+ ($body: ident, $mandatory: ident) => {
+ match serde_json::from_slice::<PositionBody>(&$body) {
+ Ok(body) => Position::new(body.x, body.y, body.z, body.yaw, body.pitch, body.roll),
+ Err(err) => {
+ if !$mandatory && err.classify() == SerdeErrorCategory::Eof {
+ Position::default()
+ } else {
+ let reason = format!("Error while deserializing position: {}", err);
+ log::error!("{}", reason);
+ return Ok(Response::builder().status(406).body(reason.into()).unwrap());
+ }
+ }
+ }
+ };
+}
+
+macro_rules! mac_address {
+ ($mac_address: ident) => {
+ match MacAddress::new($mac_address.to_string()) {
+ Ok(mac_address) => mac_address,
+ Err(err) => {
+ let reason = format!("Error mac_address: {}", err);
+ log::error!("{}", reason);
+ return Ok(Response::builder().status(406).body(reason.into()).unwrap());
+ }
+ }
+ };
+}
+
+impl Event {
+ fn name(&self) -> &'static str {
+ match self {
+ Event::DeviceAdded { .. } => "device-added",
+ Event::DeviceRemoved { .. } => "device-removed",
+ Event::DeviceUpdated { .. } => "device-updated",
+ Event::NeighborUpdated { .. } => "neighbor-updated",
+ }
+ }
+}
+
+async fn http_request(
+ mut req: Request<Body>,
+ cmd_tx: mpsc::Sender<PicaCommand>,
+ context: Context,
+) -> Result<Response<Body>, Infallible> {
+ let static_file = STATIC_FILES
+ .iter()
+ .find(|(path, _, _)| req.uri().path() == *path);
+
+ if let Some((_, mime, content)) = static_file {
+ return Ok(Response::builder()
+ .header("content-type", *mime)
+ .body((*content).into())
+ .unwrap());
+ }
+
+ let body = body::to_bytes(req.body_mut()).await.unwrap();
+ let response = match req
+ .uri_mut()
+ .path()
+ .trim_start_matches('/')
+ .split('/')
+ .collect::<Vec<_>>()[..]
+ {
+ ["events"] => context.http_events(),
+ ["init-uci-device", mac_address] => {
+ context.http_set_position(mac_address!(mac_address), position!(body))
+ }
+ ["set-position", mac_address] => {
+ context.http_set_position(mac_address!(mac_address), position!(body))
+ }
+ ["create-anchor", mac_address] => {
+ context
+ .http_create_anchor(mac_address!(mac_address), position!(body), cmd_tx)
+ .await
+ }
+ ["destroy-anchor", mac_address] => {
+ context
+ .http_destroy_anchor(mac_address!(mac_address), cmd_tx)
+ .await
+ }
+ ["get-state"] => context.http_get_state(),
+
+ _ => Response::builder()
+ .status(HttpStatusCode::NOT_FOUND)
+ .body("".into())
+ .unwrap(),
+ };
+
+ Ok(response)
+}
+
+async fn serve(context: Context, tx: mpsc::Sender<PicaCommand>, web_port: u16) -> Result<()> {
+ let addr = SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, web_port);
+ let make_svc = make_service_fn(move |_conn| {
+ let tx = tx.clone();
+ let local_context = context.clone();
+ async move {
+ Ok::<_, Infallible>(service_fn(move |req| {
+ http_request(req, tx.clone(), local_context.clone())
+ }))
+ }
+ });
+
+ let server = Server::bind(&addr.into()).serve(make_svc);
+
+ log::info!("Pica: Web server started on http://0.0.0.0:{}", web_port);
+
+ server.await?;
+ Ok(())
+}
+
+async fn listen(tx: mpsc::Sender<PicaCommand>, uci_port: u16) -> Result<()> {
+ let uci_socket = SocketAddrV4::new(Ipv4Addr::LOCALHOST, uci_port);
+ let uci_listener = TcpListener::bind(uci_socket).await?;
+ log::info!("Pica: Listening on: {}", uci_port);
+
+ loop {
+ let (socket, addr) = uci_listener.accept().await?;
+ log::info!("Uwb host addr: {}", addr);
+
+ let (read_half, write_half) = socket.into_split();
+ let stream = Box::pin(futures::stream::unfold(read_half, pica::packets::uci::read));
+ let sink = Box::pin(futures::sink::unfold(write_half, pica::packets::uci::write));
+
+ tx.send(PicaCommand::Connect(stream, sink))
+ .await
+ .map_err(|_| anyhow::anyhow!("pica command stream closed"))?
+ }
+}
+
+#[derive(Parser, Debug)]
+#[command(name = "pica", about = "Virtual UWB subsystem")]
+struct Args {
+ /// Output directory for storing .pcapng traces.
+ /// If provided, .pcapng traces of client connections are automatically
+ /// saved under the name `device-{handle}.pcapng`.
+ #[arg(short, long, value_name = "DIR")]
+ pcapng_dir: Option<PathBuf>,
+ /// Configure the TCP port for the UCI server.
+ #[arg(short, long, value_name = "PORT", default_value_t = DEFAULT_UCI_PORT)]
+ uci_port: u16,
+ /// Configure the HTTP port for the web interface.
+ #[arg(short, long, value_name = "PORT", default_value_t = DEFAULT_WEB_PORT)]
+ web_port: u16,
+}
+
+#[tokio::main]
+async fn main() -> Result<()> {
+ let args = Args::parse();
+ assert_ne!(
+ args.uci_port, args.web_port,
+ "UCI port and WEB port must be different."
+ );
+
+ let context = Context::new();
+
+ let pica = Pica::new(Box::new(context.clone()), args.pcapng_dir);
+ let cmd_tx = pica.commands();
+ let events_rx = pica.events();
+
+ try_join!(
+ pica.run(),
+ listen(cmd_tx.clone(), args.uci_port),
+ serve(context.clone(), cmd_tx.clone(), args.web_port),
+ context.handle_connection_events(events_rx),
+ )?;
+
+ Ok(())
+}
diff --git a/src/position.rs b/src/bin/http-server/position.rs
index 4ce8531..4ce8531 100644
--- a/src/position.rs
+++ b/src/bin/http-server/position.rs
diff --git a/src/bin/server/mod.rs b/src/bin/main.rs
index 616ef5b..205550e 100644
--- a/src/bin/server/mod.rs
+++ b/src/bin/main.rs
@@ -12,16 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-extern crate bytes;
-extern crate num_derive;
-extern crate num_traits;
-extern crate thiserror;
-
-#[cfg(feature = "web")]
-mod web;
-
use anyhow::Result;
use clap::Parser;
+use env_logger::Env;
use pica::{Pica, PicaCommand};
use std::net::{Ipv4Addr, SocketAddrV4};
use std::path::PathBuf;
@@ -30,17 +23,24 @@ use tokio::sync::mpsc;
use tokio::try_join;
const DEFAULT_UCI_PORT: u16 = 7000;
-const DEFAULT_WEB_PORT: u16 = 3000;
-async fn accept_incoming(tx: mpsc::Sender<PicaCommand>, uci_port: u16) -> Result<()> {
+async fn accept_incoming(cmd_tx: mpsc::Sender<PicaCommand>, uci_port: u16) -> Result<()> {
let uci_socket = SocketAddrV4::new(Ipv4Addr::LOCALHOST, uci_port);
let uci_listener = TcpListener::bind(uci_socket).await?;
- println!("Pica: Listening on: {}", uci_port);
+ log::info!("Pica: Listening on: {}", uci_port);
loop {
let (socket, addr) = uci_listener.accept().await?;
- println!("Uwb host addr: {}", addr);
- tx.send(PicaCommand::Connect(socket)).await?
+ log::info!("Uwb host addr: {}", addr);
+
+ let (read_half, write_half) = socket.into_split();
+ let stream = Box::pin(futures::stream::unfold(read_half, pica::packets::uci::read));
+ let sink = Box::pin(futures::sink::unfold(write_half, pica::packets::uci::write));
+
+ cmd_tx
+ .send(PicaCommand::Connect(stream, sink))
+ .await
+ .map_err(|_| anyhow::anyhow!("pica command stream closed"))?
}
}
@@ -55,32 +55,33 @@ struct Args {
/// Configure the TCP port for the UCI server.
#[arg(short, long, value_name = "UCI_PORT", default_value_t = DEFAULT_UCI_PORT)]
uci_port: u16,
- /// Configure the HTTP port for the web interface.
- #[arg(short, long, value_name = "WEB_PORT", default_value_t = DEFAULT_WEB_PORT)]
- web_port: u16,
+}
+
+struct MockRangingEstimator();
+
+/// The position cannot be communicated to the pica environment when
+/// using the default binary (HTTP interface not available).
+/// Thus the ranging estimator cannot produce any result.
+impl pica::RangingEstimator for MockRangingEstimator {
+ fn estimate(
+ &self,
+ _left: &pica::Handle,
+ _right: &pica::Handle,
+ ) -> Result<pica::RangingMeasurement> {
+ Err(anyhow::anyhow!("position not available"))
+ }
}
#[tokio::main]
async fn main() -> Result<()> {
- let args = Args::parse();
- assert_ne!(
- args.uci_port, args.web_port,
- "UCI port and Web port shall be different."
- );
+ env_logger::Builder::from_env(Env::default().default_filter_or("debug")).init();
- let mut pica = Pica::new(args.pcapng_dir);
- let pica_tx = pica.tx();
- let pica_events = pica.events();
+ let args = Args::parse();
- #[cfg(feature = "web")]
- try_join!(
- accept_incoming(pica_tx.clone(), args.uci_port),
- pica.run(),
- web::serve(pica_tx, pica_events, args.web_port)
- )?;
+ let pica = Pica::new(Box::new(MockRangingEstimator()), args.pcapng_dir);
+ let commands = pica.commands();
- #[cfg(not(feature = "web"))]
- try_join!(accept_incoming(pica_tx.clone(), args.uci_port), pica.run(),)?;
+ try_join!(accept_incoming(commands.clone(), args.uci_port), pica.run(),)?;
Ok(())
}
diff --git a/src/bin/server/web.rs b/src/bin/server/web.rs
deleted file mode 100644
index 1481e54..0000000
--- a/src/bin/server/web.rs
+++ /dev/null
@@ -1,269 +0,0 @@
-// Copyright 2022 Google LLC
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// https://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-use std::convert::Infallible;
-use std::net::{Ipv4Addr, SocketAddrV4};
-
-use anyhow::{Context, Result};
-use hyper::service::{make_service_fn, service_fn};
-use hyper::{body, Body, Request, Response, Server, StatusCode as HttpStatusCode};
-use serde::{Deserialize, Serialize};
-use serde_json::error::Category as SerdeErrorCategory;
-use tokio::sync::{broadcast, mpsc, oneshot};
-use tokio_stream::{wrappers::BroadcastStream, StreamExt};
-
-use pica::{
- Category, MacAddress, PicaCommand, PicaCommandError, PicaCommandStatus, PicaEvent, Position,
-};
-use PicaEvent::{DeviceAdded, DeviceRemoved, DeviceUpdated, NeighborUpdated};
-
-const STATIC_FILES: &[(&str, &str, &str)] = &[
- ("/", "text/html", include_str!("../../../static/index.html")),
- (
- "/openapi",
- "text/html",
- include_str!("../../../static/openapi.html"),
- ),
- (
- "/openapi.yaml",
- "text/yaml",
- include_str!("../../../static/openapi.yaml"),
- ),
- (
- "/src/components/Map.js",
- "application/javascript",
- include_str!("../../../static/src/components/Map.js"),
- ),
- (
- "/src/components/DeviceInfo.js",
- "application/javascript",
- include_str!("../../../static/src/components/DeviceInfo.js"),
- ),
- (
- "/src/components/Orientation.js",
- "application/javascript",
- include_str!("../../../static/src/components/Orientation.js"),
- ),
-];
-
-#[derive(Deserialize)]
-struct PositionBody {
- x: i16,
- y: i16,
- z: i16,
- yaw: i16,
- pitch: i8,
- roll: i16,
-}
-
-macro_rules! position {
- ($body: ident) => {
- position!($body, false)
- };
- ($body: ident, $mandatory: ident) => {
- match serde_json::from_slice::<PositionBody>(&$body) {
- Ok(body) => Position::new(body.x, body.y, body.z, body.yaw, body.pitch, body.roll),
- Err(err) => {
- if !$mandatory && err.classify() == SerdeErrorCategory::Eof {
- Position::default()
- } else {
- let reason = format!("Error while deserializing position: {}", err);
- println!("{}", reason);
- return Ok(Response::builder().status(406).body(reason.into()).unwrap());
- }
- }
- }
- };
-}
-
-macro_rules! mac_address {
- ($mac_address: ident) => {
- match MacAddress::new($mac_address.to_string()) {
- Ok(mac_address) => mac_address,
- Err(err) => {
- let reason = format!("Error mac_address: {}", err);
- println!("{}", reason);
- return Ok(Response::builder().status(406).body(reason.into()).unwrap());
- }
- }
- };
-}
-
-#[derive(Debug, Serialize, Clone)]
-struct Device {
- pub category: Category,
- pub mac_address: String,
- #[serde(flatten)]
- pub position: Position,
-}
-
-fn event_name(event: &PicaEvent) -> &'static str {
- match event {
- DeviceAdded { .. } => "device-added",
- DeviceRemoved { .. } => "device-removed",
- DeviceUpdated { .. } => "device-updated",
- NeighborUpdated { .. } => "neighbor-updated",
- }
-}
-
-async fn handle(
- mut req: Request<Body>,
- tx: mpsc::Sender<PicaCommand>,
- events: broadcast::Receiver<PicaEvent>,
-) -> Result<Response<Body>, Infallible> {
- let static_file = STATIC_FILES
- .iter()
- .find(|(path, _, _)| req.uri().path() == *path);
-
- if let Some((_, mime, content)) = static_file {
- return Ok(Response::builder()
- .header("content-type", *mime)
- .body((*content).into())
- .unwrap());
- }
-
- let body = body::to_bytes(req.body_mut()).await.unwrap();
- let (pica_cmd_rsp_tx, pica_cmd_rsp_rx) = oneshot::channel::<PicaCommandStatus>();
-
- let send_cmd = |pica_cmd| async {
- println!("PicaCommand: {}", pica_cmd);
- tx.send(pica_cmd).await.unwrap();
- let (status, description) = match pica_cmd_rsp_rx.await {
- Ok(Ok(_)) => (HttpStatusCode::OK, "success".into()),
- Ok(Err(err)) => (
- match err {
- PicaCommandError::DeviceAlreadyExists(_) => HttpStatusCode::CONFLICT,
- PicaCommandError::DeviceNotFound(_) => HttpStatusCode::NOT_FOUND,
- },
- format!("{}", err),
- ),
- Err(err) => (
- HttpStatusCode::INTERNAL_SERVER_ERROR,
- format!("Error getting command response: {}", err),
- ),
- };
- println!(" status: {}, {}", status, description);
- Response::builder()
- .status(status)
- .body(description.into())
- .unwrap()
- };
-
- match req
- .uri_mut()
- .path()
- .trim_start_matches('/')
- .split('/')
- .collect::<Vec<_>>()[..]
- {
- ["events"] => {
- let stream = BroadcastStream::new(events).map(|result| {
- result.map(|event| {
- format!(
- "event: {}\ndata: {}\n\n",
- event_name(&event),
- serde_json::to_string(&event).unwrap()
- )
- })
- });
- return Ok(Response::builder()
- .header("content-type", "text/event-stream")
- .body(Body::wrap_stream(stream))
- .unwrap());
- }
- ["init-uci-device", mac_address] => {
- return Ok(send_cmd(PicaCommand::InitUciDevice(
- mac_address!(mac_address),
- position!(body),
- pica_cmd_rsp_tx,
- ))
- .await);
- }
- ["set-position", mac_address] => {
- return Ok(send_cmd(PicaCommand::SetPosition(
- mac_address!(mac_address),
- position!(body),
- pica_cmd_rsp_tx,
- ))
- .await);
- }
- ["create-anchor", mac_address] => {
- return Ok(send_cmd(PicaCommand::CreateAnchor(
- mac_address!(mac_address),
- position!(body),
- pica_cmd_rsp_tx,
- ))
- .await);
- }
- ["destroy-anchor", mac_address] => {
- return Ok(send_cmd(PicaCommand::DestroyAnchor(
- mac_address!(mac_address),
- pica_cmd_rsp_tx,
- ))
- .await);
- }
- ["get-state"] => {
- #[derive(Serialize)]
- struct GetStateResponse {
- devices: Vec<Device>,
- }
- println!("PicaCommand: GetState");
- let (state_tx, state_rx) = oneshot::channel::<Vec<_>>();
- tx.send(PicaCommand::GetState(state_tx)).await.unwrap();
- let devices = match state_rx.await {
- Ok(devices) => GetStateResponse {
- devices: devices
- .into_iter()
- .map(|(category, mac_address, position)| Device {
- category,
- mac_address: mac_address.into(),
- position,
- })
- .collect(),
- },
- Err(_) => GetStateResponse { devices: vec![] },
- };
- let body = serde_json::to_string(&devices).unwrap();
- return Ok(Response::builder().status(200).body(body.into()).unwrap());
- }
-
- _ => (),
- }
-
- Ok(Response::builder().status(404).body("".into()).unwrap())
-}
-
-pub async fn serve(
- tx: mpsc::Sender<PicaCommand>,
- events: broadcast::Sender<PicaEvent>,
- web_port: u16,
-) -> Result<()> {
- let addr = SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, web_port);
-
- let make_svc = make_service_fn(move |_conn| {
- let tx = tx.clone();
- let events = events.clone();
- async move {
- Ok::<_, Infallible>(service_fn(move |req| {
- handle(req, tx.clone(), events.subscribe())
- }))
- }
- });
-
- let server = Server::bind(&addr.into()).serve(make_svc);
-
- println!("Pica: Web server started on http://0.0.0.0:{}", web_port);
-
- server.await.context("Web Server Error")
-}
diff --git a/src/device.rs b/src/device.rs
index e1cbfcb..48d2bd2 100644
--- a/src/device.rs
+++ b/src/device.rs
@@ -13,7 +13,6 @@
// limitations under the License.
use crate::packets::uci::*;
-use crate::position::Position;
use crate::MacAddress;
use crate::PicaCommand;
@@ -25,8 +24,10 @@ use tokio::sync::mpsc;
use tokio::time;
use super::session::{Session, MAX_SESSION};
+use super::UciPacket;
pub const MAX_DEVICE: usize = 4;
+
const UCI_VERSION: u16 = 0x0002; // Version 2.0
const MAC_VERSION: u16 = 0x3001; // Version 1.3.0
const PHY_VERSION: u16 = 0x3001; // Version 1.3.0
@@ -72,13 +73,12 @@ pub const DEFAULT_CAPS_INFO: &[(CapTlvType, &[u8])] = &[
];
pub struct Device {
- handle: usize,
+ pub handle: usize,
pub mac_address: MacAddress,
- pub position: Position,
/// [UCI] 5. UWBS Device State Machine
state: DeviceState,
sessions: HashMap<u32, Session>,
- pub tx: mpsc::Sender<ControlPacket>,
+ pub tx: mpsc::UnboundedSender<UciPacket>,
pica_tx: mpsc::Sender<PicaCommand>,
config: HashMap<DeviceConfigId, Vec<u8>>,
country_code: [u8; 2],
@@ -88,18 +88,14 @@ pub struct Device {
impl Device {
pub fn new(
- device_handle: usize,
- tx: mpsc::Sender<ControlPacket>,
+ handle: usize,
+ mac_address: MacAddress,
+ tx: mpsc::UnboundedSender<UciPacket>,
pica_tx: mpsc::Sender<PicaCommand>,
) -> Self {
- let mac_address = {
- let handle = device_handle as u16;
- MacAddress::Short(handle.to_be_bytes())
- };
Device {
- handle: device_handle,
+ handle,
mac_address,
- position: Position::default(),
state: DeviceState::DeviceStateError, // Will be overwitten
sessions: Default::default(),
tx,
@@ -122,7 +118,6 @@ impl Device {
tokio::spawn(async move {
time::sleep(Duration::from_millis(5)).await;
tx.send(DeviceStatusNtfBuilder { device_state }.build().into())
- .await
.unwrap()
});
}
@@ -131,25 +126,69 @@ impl Device {
self.set_state(DeviceState::DeviceStateReady);
}
- pub fn get_session(&self, session_id: u32) -> Option<&Session> {
+ pub fn session(&self, session_id: u32) -> Option<&Session> {
self.sessions.get(&session_id)
}
- pub fn get_session_mut(&mut self, session_id: u32) -> Option<&mut Session> {
+ pub fn session_mut(&mut self, session_id: u32) -> Option<&mut Session> {
self.sessions.get_mut(&session_id)
}
+ pub fn can_start_ranging(&self, peer_session: &Session, session_id: u32) -> bool {
+ match self.session(session_id) {
+ Some(session) => {
+ session.session_state() == SessionState::SessionStateActive
+ && session
+ .app_config
+ .is_compatible_for_ranging(&peer_session.app_config)
+ }
+ None => false,
+ }
+ }
+
+ pub fn can_start_data_transfer(&self, session_id: u32) -> bool {
+ match self.session(session_id) {
+ Some(session) => {
+ session.session_state() == SessionState::SessionStateActive
+ && session.session_type() == SessionType::FiraRangingAndInBandDataSession
+ && session.app_config.can_start_data_transfer()
+ }
+ None => false,
+ }
+ }
+
+ pub fn can_receive_data_transfer(&self, session_id: u32) -> bool {
+ match self.session(session_id) {
+ Some(session) => {
+ session.session_state() == SessionState::SessionStateActive
+ && session.session_type() == SessionType::FiraRangingAndInBandDataSession
+ && session.app_config.can_receive_data_transfer()
+ }
+ None => false,
+ }
+ }
+
+ // Send a response or notification to the Host.
+ fn send_control(&mut self, packet: impl Into<Vec<u8>>) {
+ let _ = self.tx.send(packet.into());
+ }
+
// The fira norm specify to send a response, then reset, then
// send a notification once the reset is done
fn command_device_reset(&mut self, cmd: DeviceResetCmd) -> DeviceResetRsp {
let reset_config = cmd.get_reset_config();
- println!("[{}] DeviceReset", self.handle);
- println!(" reset_config={:?}", reset_config);
+ log::debug!("[{}] DeviceReset", self.handle);
+ log::debug!(" reset_config={:?}", reset_config);
let status = match reset_config {
ResetConfig::UwbsReset => StatusCode::UciStatusOk,
};
- *self = Device::new(self.handle, self.tx.clone(), self.pica_tx.clone());
+ *self = Device::new(
+ self.handle,
+ self.mac_address,
+ self.tx.clone(),
+ self.pica_tx.clone(),
+ );
self.init();
DeviceResetRspBuilder { status }.build()
@@ -157,7 +196,7 @@ impl Device {
fn command_get_device_info(&self, _cmd: GetDeviceInfoCmd) -> GetDeviceInfoRsp {
// TODO: Implement a fancy build time state machine instead of crash at runtime
- println!("[{}] GetDeviceInfo", self.handle);
+ log::debug!("[{}] GetDeviceInfo", self.handle);
assert_eq!(self.state, DeviceState::DeviceStateReady);
GetDeviceInfoRspBuilder {
status: StatusCode::UciStatusOk,
@@ -171,7 +210,7 @@ impl Device {
}
pub fn command_get_caps_info(&self, _cmd: GetCapsInfoCmd) -> GetCapsInfoRsp {
- println!("[{}] GetCapsInfo", self.handle);
+ log::debug!("[{}] GetCapsInfo", self.handle);
let caps = DEFAULT_CAPS_INFO
.iter()
@@ -189,7 +228,7 @@ impl Device {
}
pub fn command_set_config(&mut self, cmd: SetConfigCmd) -> SetConfigRsp {
- println!("[{}] SetConfig", self.handle);
+ log::debug!("[{}] SetConfig", self.handle);
assert_eq!(self.state, DeviceState::DeviceStateReady); // UCI 6.3
let (valid_parameters, invalid_config_status) = cmd.get_tlvs().iter().fold(
@@ -217,7 +256,7 @@ impl Device {
}
pub fn command_get_config(&self, cmd: GetConfigCmd) -> GetConfigRsp {
- println!("[{}] GetConfig", self.handle);
+ log::debug!("[{}] GetConfig", self.handle);
// TODO: do this config shall be set on device reset
let ids = cmd.get_cfg_id();
@@ -240,7 +279,7 @@ impl Device {
v: Vec::new(),
}),
},
- Err(_) => println!("Failed to parse config id: {:?}", id),
+ Err(_) => log::error!("Failed to parse config id: {:?}", id),
}
(valid_parameters, invalid_parameters)
@@ -264,9 +303,9 @@ impl Device {
let session_id = cmd.get_session_id();
let session_type = cmd.get_session_type();
- println!("[{}] Session init", self.handle);
- println!(" session_id=0x{:x}", session_id);
- println!(" session_type={:?}", session_type);
+ log::debug!("[{}] Session init", self.handle);
+ log::debug!(" session_id=0x{:x}", session_id);
+ log::debug!(" session_type={:?}", session_type);
let status = if self.sessions.len() >= MAX_SESSION {
StatusCode::UciStatusMaxSessionsExceeded
@@ -284,7 +323,7 @@ impl Device {
Some(_) => StatusCode::UciStatusSessionDuplicate,
None => {
// Should not fail
- self.get_session_mut(session_id).unwrap().init();
+ self.session_mut(session_id).unwrap().init();
StatusCode::UciStatusOk
}
}
@@ -295,8 +334,8 @@ impl Device {
fn command_session_deinit(&mut self, cmd: SessionDeinitCmd) -> SessionDeinitRsp {
let session_id = cmd.get_session_token();
- println!("[{}] Session deinit", self.handle);
- println!(" session_id=0x{:x}", session_id);
+ log::debug!("[{}] Session deinit", self.handle);
+ log::debug!(" session_id=0x{:x}", session_id);
let status = match self.sessions.get_mut(&session_id) {
Some(session) => {
@@ -315,7 +354,7 @@ impl Device {
}
fn command_session_get_count(&self, _cmd: SessionGetCountCmd) -> SessionGetCountRsp {
- println!("[{}] Session get count", self.handle);
+ log::debug!("[{}] Session get count", self.handle);
SessionGetCountRspBuilder {
status: StatusCode::UciStatusOk,
@@ -329,8 +368,8 @@ impl Device {
cmd: AndroidSetCountryCodeCmd,
) -> AndroidSetCountryCodeRsp {
let country_code = *cmd.get_country_code();
- println!("[{}] Set country code", self.handle);
- println!(" country_code={},{}", country_code[0], country_code[1]);
+ log::debug!("[{}] Set country code", self.handle);
+ log::debug!(" country_code={},{}", country_code[0], country_code[1]);
self.country_code = country_code;
AndroidSetCountryCodeRspBuilder {
@@ -343,7 +382,7 @@ impl Device {
&mut self,
_cmd: AndroidGetPowerStatsCmd,
) -> AndroidGetPowerStatsRsp {
- println!("[{}] Get power stats", self.handle);
+ log::debug!("[{}] Get power stats", self.handle);
// TODO
AndroidGetPowerStatsRspBuilder {
@@ -359,10 +398,11 @@ impl Device {
}
pub fn data_message_snd(&mut self, data: DataPacket) -> SessionControlNotification {
+ log::debug!("[{}] data_message_send", self.handle);
match data.specialize() {
DataPacketChild::DataMessageSnd(data_msg_snd) => {
let session_token = data_msg_snd.get_session_handle();
- if let Some(session) = self.get_session_mut(session_token) {
+ if let Some(session) = self.session_mut(session_token) {
session.data_message_snd(data_msg_snd)
} else {
DataTransferStatusNtfBuilder {
@@ -393,7 +433,7 @@ impl Device {
}
}
- pub fn command(&mut self, cmd: UciCommand) -> UciResponse {
+ fn receive_command(&mut self, cmd: UciCommand) -> UciResponse {
match cmd.specialize() {
// Handle commands for this device
UciCommandChild::CoreCommand(core_command) => match core_command.specialize() {
@@ -406,7 +446,6 @@ impl Device {
},
// Handle commands for session management
UciCommandChild::SessionConfigCommand(session_command) => {
- // Session commands directly handled at Device level
match session_command.specialize() {
SessionConfigCommandChild::SessionInitCmd(cmd) => {
return self.command_session_init(cmd).into();
@@ -435,7 +474,7 @@ impl Device {
_ => panic!("Unsupported session command type"),
};
- if let Some(session) = self.get_session_mut(session_id) {
+ if let Some(session) = self.session_mut(session_id) {
// There is a session matching the session_id in the command
// Pass the command through
match session_command.specialize() {
@@ -486,7 +525,7 @@ impl Device {
}
UciCommandChild::SessionControlCommand(ranging_command) => {
let session_id = ranging_command.get_session_id();
- if let Some(session) = self.get_session_mut(session_id) {
+ if let Some(session) = self.session_mut(session_id) {
// Forward to the proper session
let response = session.ranging_command(ranging_command);
match response.specialize() {
@@ -577,4 +616,59 @@ impl Device {
.build(),
}
}
-} \ No newline at end of file
+
+ pub fn receive_packet(&mut self, packet: Vec<u8>) {
+ let mt = parse_message_type(packet[0]);
+ match mt {
+ MessageType::Data => match DataPacket::parse(&packet) {
+ Ok(packet) => {
+ let notification = self.data_message_snd(packet);
+ self.send_control(notification)
+ }
+ Err(err) => log::error!("failed to parse incoming Data packet: {}", err),
+ },
+ MessageType::Command => {
+ match ControlPacket::parse(&packet) {
+ // Parsing error. Determine what error response should be
+ // returned to the host:
+ // - response and notifications are ignored, no response
+ // - if the group id is not known, STATUS_UNKNOWN_GID,
+ // - otherwise, and to simplify the code, STATUS_UNKNOWN_OID is
+ // always returned. That means that malformed commands
+ // get the same status code, instead of
+ // STATUS_SYNTAX_ERROR.
+ Err(_) => {
+ let group_id = packet[0] & 0xf;
+ let opcode_id = packet[1] & 0x3f;
+
+ let status = if GroupId::try_from(group_id).is_ok() {
+ StatusCode::UciStatusUnknownOid
+ } else {
+ StatusCode::UciStatusUnknownGid
+ };
+ // The PDL generated code cannot be used to generate
+ // responses with invalid group identifiers.
+ let response = vec![
+ (u8::from(MessageType::Response) << 5) | group_id,
+ opcode_id,
+ 0,
+ 1,
+ status.into(),
+ ];
+ self.send_control(response)
+ }
+
+ // Parsing success, ignore non command packets.
+ Ok(cmd) => {
+ let response = self.receive_command(cmd.try_into().unwrap());
+ self.send_control(response)
+ }
+ }
+ }
+
+ // Message types for notifications and responses ignored
+ // by the controller.
+ _ => log::warn!("received unexpected packet of MT {:?}", mt),
+ }
+ }
+}
diff --git a/src/lib.rs b/src/lib.rs
index fddcd81..2927d95 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -13,23 +13,17 @@
// limitations under the License.
use anyhow::Result;
-use bytes::Bytes;
-use pdl_runtime::Packet;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fmt::Display;
use std::path::PathBuf;
+use std::pin::Pin;
use thiserror::Error;
-use tokio::net::TcpStream;
use tokio::sync::{broadcast, mpsc, oneshot};
+pub mod packets;
mod pcapng;
-mod position;
-pub use position::Position;
-
-mod packets;
-
use packets::uci::StatusCode as UciStatusCode;
use packets::uci::*;
@@ -37,14 +31,54 @@ mod device;
use device::{Device, MAX_DEVICE};
mod session;
-use session::{AppConfig, MAX_SESSION};
+use session::MAX_SESSION;
mod mac_address;
pub use mac_address::MacAddress;
use crate::session::RangeDataNtfConfig;
-pub type PicaCommandStatus = Result<(), PicaCommandError>;
+pub type UciPacket = Vec<u8>;
+pub type UciStream = Pin<Box<dyn futures::stream::Stream<Item = Vec<u8>> + Send>>;
+pub type UciSink = Pin<Box<dyn futures::sink::Sink<Vec<u8>, Error = anyhow::Error> + Send>>;
+
+/// Handle allocated for created devices or anchors.
+/// The handle is unique across the lifetime of the Pica context
+/// and callers may assume that one handle is never reused.
+pub type Handle = usize;
+
+/// Ranging measurement produced by a ranging estimator.
+#[derive(Clone, Copy, Default, Debug)]
+pub struct RangingMeasurement {
+ pub range: u16,
+ pub azimuth: i16,
+ pub elevation: i8,
+}
+
+/// Trait matching the capabilities of a ranging estimator.
+/// The estimator manages the position of the devices, and chooses
+/// the algorithm used to generate the ranging measurements.
+pub trait RangingEstimator: Send + Sync {
+ /// Evaluate the ranging measurement for the two input devices
+ /// identified by their respective handle. The result is a triplet
+ /// containing the range, azimuth, and elevation of the right device
+ /// relative to the left device.
+ fn estimate(&self, left: &Handle, right: &Handle) -> Result<RangingMeasurement>;
+}
+
+/// Pica emulation environment.
+/// All the devices added to this environment are emulated as if they were
+/// from the same physical space.
+pub struct Pica {
+ counter: usize,
+ devices: HashMap<Handle, Device>,
+ anchors: HashMap<MacAddress, Anchor>,
+ command_rx: Option<mpsc::Receiver<PicaCommand>>,
+ command_tx: mpsc::Sender<PicaCommand>,
+ event_tx: broadcast::Sender<PicaEvent>,
+ ranging_estimator: Box<dyn RangingEstimator>,
+ pcapng_dir: Option<PathBuf>,
+}
#[derive(Error, Debug, Clone, PartialEq, Eq)]
pub enum PicaCommandError {
@@ -54,46 +88,39 @@ pub enum PicaCommandError {
DeviceNotFound(MacAddress),
}
-#[derive(Debug)]
pub enum PicaCommand {
// Connect a new device.
- Connect(TcpStream),
+ Connect(UciStream, UciSink),
// Disconnect the selected device.
Disconnect(usize),
// Execute ranging command for selected device and session.
Ranging(usize, u32),
// Send an in-band request to stop ranging to a peer controlee identified by address and session id.
StopRanging(MacAddress, u32),
- // Execute data message send for selected device and data.
- UciData(usize, DataPacket),
- // Execute UCI command received for selected device.
- UciCommand(usize, UciCommand),
- // Init Uci Device
- InitUciDevice(MacAddress, Position, oneshot::Sender<PicaCommandStatus>),
- // Set Position
- SetPosition(MacAddress, Position, oneshot::Sender<PicaCommandStatus>),
+ // UCI packet received for the selected device.
+ UciPacket(usize, Vec<u8>),
// Create Anchor
- CreateAnchor(MacAddress, Position, oneshot::Sender<PicaCommandStatus>),
+ CreateAnchor(
+ MacAddress,
+ oneshot::Sender<Result<Handle, PicaCommandError>>,
+ ),
// Destroy Anchor
- DestroyAnchor(MacAddress, oneshot::Sender<PicaCommandStatus>),
- // Get State
- GetState(oneshot::Sender<Vec<(Category, MacAddress, Position)>>),
+ DestroyAnchor(
+ MacAddress,
+ oneshot::Sender<Result<Handle, PicaCommandError>>,
+ ),
}
impl Display for PicaCommand {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let cmd = match self {
- PicaCommand::Connect(_) => "Connect",
+ PicaCommand::Connect(_, _) => "Connect",
PicaCommand::Disconnect(_) => "Disconnect",
PicaCommand::Ranging(_, _) => "Ranging",
PicaCommand::StopRanging(_, _) => "StopRanging",
- PicaCommand::UciData(_, _) => "UciData",
- PicaCommand::UciCommand(_, _) => "UciCommand",
- PicaCommand::InitUciDevice(_, _, _) => "InitUciDevice",
- PicaCommand::SetPosition(_, _, _) => "SetPosition",
- PicaCommand::CreateAnchor(_, _, _) => "CreateAnchor",
+ PicaCommand::UciPacket(_, _) => "UciPacket",
+ PicaCommand::CreateAnchor(_, _) => "CreateAnchor",
PicaCommand::DestroyAnchor(_, _) => "DestroyAnchor",
- PicaCommand::GetState(_) => "GetState",
};
write!(f, "{}", cmd)
}
@@ -102,33 +129,15 @@ impl Display for PicaCommand {
#[derive(Clone, Debug, Serialize)]
#[serde(untagged)]
pub enum PicaEvent {
- // A Device was added
- DeviceAdded {
- category: Category,
- mac_address: MacAddress,
- #[serde(flatten)]
- position: Position,
- },
- // A Device was removed
- DeviceRemoved {
- category: Category,
+ // A UCI connection was added
+ Connected {
+ handle: Handle,
mac_address: MacAddress,
},
- // A Device position has changed
- DeviceUpdated {
- category: Category,
+ // A UCI connection was lost
+ Disconnected {
+ handle: Handle,
mac_address: MacAddress,
- #[serde(flatten)]
- position: Position,
- },
- NeighborUpdated {
- source_category: Category,
- source_mac_address: MacAddress,
- destination_category: Category,
- destination_mac_address: MacAddress,
- distance: u16,
- azimuth: i16,
- elevation: i8,
},
}
@@ -140,99 +149,29 @@ pub enum Category {
#[derive(Debug, Clone, Copy)]
struct Anchor {
+ handle: Handle,
+ #[allow(unused)]
mac_address: MacAddress,
- position: Position,
-}
-
-pub struct Pica {
- devices: HashMap<usize, Device>,
- anchors: HashMap<MacAddress, Anchor>,
- counter: usize,
- rx: mpsc::Receiver<PicaCommand>,
- tx: mpsc::Sender<PicaCommand>,
- event_tx: broadcast::Sender<PicaEvent>,
- pcapng_dir: Option<PathBuf>,
-}
-
-/// Result of UCI packet parsing.
-enum UciParseResult {
- UciCommand(UciCommand),
- UciData(DataPacket),
- Err(Bytes),
- Skip,
-}
-
-/// Parse incoming UCI packets.
-/// Handle parsing errors by crafting a suitable error response packet.
-fn parse_uci_packet(bytes: &[u8]) -> UciParseResult {
- let message_type = parse_message_type(bytes[0]);
- match message_type {
- MessageType::Data => match DataPacket::parse(bytes) {
- Ok(packet) => UciParseResult::UciData(packet),
- Err(_) => UciParseResult::Skip,
- },
- _ => {
- match ControlPacket::parse(bytes) {
- // Parsing error. Determine what error response should be
- // returned to the host:
- // - response and notifications are ignored, no response
- // - if the group id is not known, STATUS_UNKNOWN_GID,
- // - otherwise, and to simplify the code, STATUS_UNKNOWN_OID is
- // always returned. That means that malformed commands
- // get the same status code, instead of
- // STATUS_SYNTAX_ERROR.
- Err(_) => {
- let group_id = bytes[0] & 0xf;
- let opcode_id = bytes[1] & 0x3f;
-
- let status = match (message_type, GroupId::try_from(group_id)) {
- (MessageType::Command, Ok(_)) => UciStatusCode::UciStatusUnknownOid,
- (MessageType::Command, Err(_)) => UciStatusCode::UciStatusUnknownGid,
- _ => return UciParseResult::Skip,
- };
- // The PDL generated code cannot be used to generate
- // responses with invalid group identifiers.
- let response = vec![
- (u8::from(MessageType::Response) << 5) | group_id,
- opcode_id,
- 0,
- 1,
- status.into(),
- ];
- UciParseResult::Err(response.into())
- }
-
- // Parsing success, ignore non command packets.
- Ok(packet) => {
- if let Ok(cmd) = packet.try_into() {
- UciParseResult::UciCommand(cmd)
- } else {
- UciParseResult::Skip
- }
- }
- }
- }
- }
}
fn make_measurement(
mac_address: &MacAddress,
- local: (u16, i16, i8),
- remote: (u16, i16, i8),
+ local: RangingMeasurement,
+ remote: RangingMeasurement,
) -> ShortAddressTwoWayRangingMeasurement {
if let MacAddress::Short(address) = mac_address {
ShortAddressTwoWayRangingMeasurement {
mac_address: u16::from_le_bytes(*address),
status: UciStatusCode::UciStatusOk,
nlos: 0, // in Line Of Sight
- distance: local.0,
- aoa_azimuth: local.1 as u16,
+ distance: local.range,
+ aoa_azimuth: local.azimuth as u16,
aoa_azimuth_fom: 100, // Yup, pretty sure about this
- aoa_elevation: local.2 as u16,
+ aoa_elevation: local.elevation as u16,
aoa_elevation_fom: 100, // Yup, pretty sure about this
- aoa_destination_azimuth: remote.1 as u16,
+ aoa_destination_azimuth: remote.azimuth as u16,
aoa_destination_azimuth_fom: 100,
- aoa_destination_elevation: remote.2 as u16,
+ aoa_destination_elevation: remote.elevation as u16,
aoa_destination_elevation_fom: 100,
slot_index: 0,
rssi: u8::MAX,
@@ -243,26 +182,27 @@ fn make_measurement(
}
impl Pica {
- pub fn new(pcapng_dir: Option<PathBuf>) -> Self {
- let (tx, rx) = mpsc::channel(MAX_SESSION * MAX_DEVICE);
+ pub fn new(ranging_estimator: Box<dyn RangingEstimator>, pcapng_dir: Option<PathBuf>) -> Self {
+ let (command_tx, command_rx) = mpsc::channel(MAX_SESSION * MAX_DEVICE);
let (event_tx, _) = broadcast::channel(16);
Pica {
devices: HashMap::new(),
anchors: HashMap::new(),
counter: 0,
- rx,
- tx,
+ command_rx: Some(command_rx),
+ command_tx,
event_tx,
+ ranging_estimator,
pcapng_dir,
}
}
- pub fn events(&self) -> broadcast::Sender<PicaEvent> {
- self.event_tx.clone()
+ pub fn events(&self) -> broadcast::Receiver<PicaEvent> {
+ self.event_tx.subscribe()
}
- pub fn tx(&self) -> mpsc::Sender<PicaCommand> {
- self.tx.clone()
+ pub fn commands(&self) -> mpsc::Sender<PicaCommand> {
+ self.command_tx.clone()
}
fn get_device_mut(&mut self, device_handle: usize) -> Option<&mut Device> {
@@ -287,177 +227,254 @@ impl Pica {
}
}
- fn get_device_mut_by_mac(&mut self, mac_address: MacAddress) -> Option<&mut Device> {
- self.devices
- .values_mut()
- .find(|d| d.mac_address == mac_address)
+ fn send_event(&self, event: PicaEvent) {
+ // An error here means that we have
+ // no receivers, so ignore it
+ let _ = self.event_tx.send(event);
}
- fn get_device_by_mac(
- &self,
- mac_address: &MacAddress,
- local_app_config: &AppConfig,
- session_id: u32,
- ) -> Option<&Device> {
- self.devices.values().find(|device| {
- if let Some(session) = device.get_session(session_id) {
- session.app_config.device_mac_address == *mac_address
- && local_app_config.can_start_ranging_with_peer(&session.app_config)
- && session.session_state() == SessionState::SessionStateActive
- } else {
- false
- }
- })
- }
+ /// Handle an incoming stream of UCI packets.
+ /// Reassemble control packets when fragmented, data packets are unmodified.
+ async fn read_routine(
+ mut uci_stream: impl futures::stream::Stream<Item = Vec<u8>> + Unpin,
+ cmd_tx: mpsc::Sender<PicaCommand>,
+ handle: Handle,
+ pcapng_file: Option<&pcapng::File>,
+ ) -> anyhow::Result<()> {
+ use futures::stream::StreamExt;
- fn get_device_mut_by_mac_and_session_id(
- &mut self,
- mac_address: &MacAddress,
- session_id: u32,
- ) -> Option<&mut Device> {
- self.devices.values_mut().find(|device| {
- if let Some(session) = device.get_session(session_id) {
- session.app_config.device_mac_address == *mac_address
- && session.session_state() == SessionState::SessionStateActive
- } else {
- false
+ loop {
+ let mut complete_packet: Option<Vec<u8>> = None;
+ loop {
+ let packet = uci_stream
+ .next()
+ .await
+ .ok_or(anyhow::anyhow!("input packet stream closed"))?;
+ let header =
+ packets::uci::CommonPacketHeader::parse(&packet[0..COMMON_HEADER_SIZE])?;
+
+ if let Some(file) = pcapng_file {
+ file.write(&packet, pcapng::Direction::Tx)?;
+ }
+
+ match &mut complete_packet {
+ Some(complete_packet) => {
+ complete_packet.extend_from_slice(&packet[HEADER_SIZE..])
+ }
+ None => complete_packet = Some(packet),
+ }
+
+ if header.get_pbf() == packets::uci::PacketBoundaryFlag::Complete
+ || header.get_mt() == packets::uci::MessageType::Data
+ {
+ break;
+ }
}
- })
+
+ cmd_tx
+ .send(PicaCommand::UciPacket(handle, complete_packet.unwrap()))
+ .await
+ .unwrap()
+ }
}
- fn send_event(&self, event: PicaEvent) {
- // An error here means that we have
- // no receivers, so ignore it
- let _ = self.event_tx.send(event);
+ /// Segment a stream of UCI packets.
+ async fn write_routine(
+ mut uci_sink: impl futures::sink::Sink<Vec<u8>> + Unpin,
+ mut packet_rx: mpsc::UnboundedReceiver<UciPacket>,
+ _handle: Handle,
+ pcapng_file: Option<&pcapng::File>,
+ ) -> anyhow::Result<()> {
+ use futures::sink::SinkExt;
+
+ loop {
+ let complete_packet = packet_rx
+ .recv()
+ .await
+ .ok_or(anyhow::anyhow!("output packet stream closed"))?;
+ let mut offset = HEADER_SIZE;
+ let mt = parse_message_type(complete_packet[0]);
+
+ while offset < complete_packet.len() {
+ let remaining_length = complete_packet.len() - offset;
+ let fragment_length = std::cmp::min(
+ remaining_length,
+ if mt == MessageType::Data {
+ MAX_DATA_PACKET_PAYLOAD_SIZE
+ } else {
+ MAX_CTRL_PACKET_PAYLOAD_SIZE
+ },
+ );
+ let pbf = if fragment_length == remaining_length {
+ PacketBoundaryFlag::Complete
+ } else {
+ PacketBoundaryFlag::NotComplete
+ };
+
+ let mut packet = Vec::with_capacity(HEADER_SIZE + fragment_length);
+
+ packet.extend_from_slice(&complete_packet[0..HEADER_SIZE]);
+ const PBF_MASK: u8 = 0x10;
+ packet[0] &= !PBF_MASK;
+ packet[0] |= (pbf as u8) << 4;
+
+ match mt {
+ MessageType::Data => {
+ packet[2..4].copy_from_slice(&(fragment_length as u16).to_le_bytes())
+ }
+ _ => packet[3] = fragment_length as u8,
+ }
+
+ packet.extend_from_slice(&complete_packet[offset..offset + fragment_length]);
+
+ if let Some(file) = pcapng_file {
+ file.write(&packet, pcapng::Direction::Rx)?;
+ }
+
+ uci_sink
+ .send(packet)
+ .await
+ .map_err(|_| anyhow::anyhow!("output packet sink closed"))?;
+
+ offset += fragment_length;
+ }
+ }
}
- async fn connect(&mut self, stream: TcpStream) {
- let (packet_tx, mut packet_rx) = mpsc::channel(MAX_SESSION);
- let device_handle = self.counter;
- let pica_tx = self.tx.clone();
+ pub fn add_device(&mut self, stream: UciStream, sink: UciSink) -> Result<Handle> {
+ let (packet_tx, packet_rx) = mpsc::unbounded_channel();
+ let pica_tx = self.command_tx.clone();
+ let disconnect_tx = self.command_tx.clone();
let pcapng_dir = self.pcapng_dir.clone();
- println!("[{}] Connecting device", device_handle);
-
+ let handle = self.counter;
self.counter += 1;
- let mut device = Device::new(device_handle, packet_tx, self.tx.clone());
+
+ log::debug!("[{}] Connecting device", handle);
+
+ let mac_address = MacAddress::Short((handle as u16).to_be_bytes());
+ let mut device = Device::new(handle, mac_address, packet_tx, self.command_tx.clone());
device.init();
- self.send_event(PicaEvent::DeviceAdded {
- category: Category::Uci,
+ self.send_event(PicaEvent::Connected {
+ handle,
mac_address: device.mac_address,
- position: device.position,
});
- self.devices.insert(device_handle, device);
+ self.devices.insert(handle, device);
// Spawn and detach the connection handling task.
// The task notifies pica when exiting to let it clean
// the state.
tokio::task::spawn(async move {
- let mut pcapng_file = if let Some(dir) = pcapng_dir {
- let full_path = dir.join(format!("device-{}.pcapng", device_handle));
- println!("Recording pcapng to file {}", full_path.as_path().display());
- Some(pcapng::File::create(full_path).await.unwrap())
+ let pcapng_file = if let Some(dir) = pcapng_dir {
+ let full_path = dir.join(format!("device-{}.pcapng", handle));
+ log::debug!("Recording pcapng to file {}", full_path.as_path().display());
+ Some(pcapng::File::create(full_path).unwrap())
} else {
None
};
- let (uci_rx, uci_tx) = stream.into_split();
- let mut uci_reader = packets::uci::Reader::new(uci_rx);
- let mut uci_writer = packets::uci::Writer::new(uci_tx);
-
- 'outer: loop {
- tokio::select! {
- // Read command packet sent from connected UWB host.
- // Run associated command.
- result = uci_reader.read(&mut pcapng_file) =>
- match result {
- Ok(packet) =>
- match parse_uci_packet(&packet) {
- UciParseResult::UciCommand(cmd) => {
- pica_tx.send(PicaCommand::UciCommand(device_handle, cmd)).await.unwrap()
- },
- UciParseResult::UciData(data) => {
- pica_tx.send(PicaCommand::UciData(device_handle, data)).await.unwrap()
- },
- UciParseResult::Err(response) =>
- uci_writer.write(&response, &mut pcapng_file).await.unwrap(),
- UciParseResult::Skip => (),
- },
- Err(_) => break 'outer
- },
-
- // Send response packets to the connected UWB host.
- Some(packet) = packet_rx.recv() =>
- if uci_writer.write(&packet.to_bytes(), &mut pcapng_file).await.is_err() {
- break 'outer
- }
- }
- }
- pica_tx
- .send(PicaCommand::Disconnect(device_handle))
+ let _ = tokio::try_join!(
+ async { Self::read_routine(stream, pica_tx, handle, pcapng_file.as_ref()).await },
+ async { Self::write_routine(sink, packet_rx, handle, pcapng_file.as_ref()).await }
+ );
+
+ disconnect_tx
+ .send(PicaCommand::Disconnect(handle))
.await
.unwrap()
});
+
+ Ok(handle)
}
fn disconnect(&mut self, device_handle: usize) {
- println!("[{}] Disconnecting device", device_handle);
+ log::debug!("[{}] Disconnecting device", device_handle);
- match self
- .devices
- .get(&device_handle)
- .ok_or_else(|| PicaCommandError::DeviceNotFound(device_handle.into()))
- {
- Ok(device) => {
- self.send_event(PicaEvent::DeviceRemoved {
- category: Category::Uci,
- mac_address: device.mac_address,
- });
- self.devices.remove(&device_handle);
- }
- Err(err) => println!("{}", err),
+ if let Some(device) = self.devices.get(&device_handle) {
+ self.send_event(PicaEvent::Disconnected {
+ handle: device_handle,
+ mac_address: device.mac_address,
+ });
+ self.devices.remove(&device_handle);
}
}
- async fn ranging(&mut self, device_handle: usize, session_id: u32) {
- println!("[{}] Ranging event", device_handle);
- println!(" session_id={}", session_id);
+ fn ranging(&mut self, device_handle: usize, session_id: u32) {
+ log::debug!("[{}] Ranging event", device_handle);
+ log::debug!(" session_id={}", session_id);
let device = self.get_device(device_handle).unwrap();
- let session = device.get_session(session_id).unwrap();
+ let session = device.session(session_id).unwrap();
+ let mut data_transfer = Vec::new();
let mut measurements = Vec::new();
- session
- .get_dst_mac_addresses()
- .iter()
- .for_each(|mac_address| {
- if let Some(anchor) = self.anchors.get(mac_address) {
- let local = device
- .position
- .compute_range_azimuth_elevation(&anchor.position);
- let remote = anchor
- .position
- .compute_range_azimuth_elevation(&device.position);
-
- assert!(local.0 == remote.0);
- measurements.push(make_measurement(mac_address, local, remote));
- }
- if let Some(peer_device) =
- self.get_device_by_mac(mac_address, &session.app_config, session_id)
- {
- let local: (u16, i16, i8) = device
- .position
- .compute_range_azimuth_elevation(&peer_device.position);
- let remote = peer_device
- .position
- .compute_range_azimuth_elevation(&device.position);
-
- assert!(local.0 == remote.0);
- measurements.push(make_measurement(mac_address, local, remote));
- }
- });
+
+ // Look for compatible anchors.
+ for mac_address in session.get_dst_mac_addresses() {
+ if let Some(other) = self.anchors.get(mac_address) {
+ let local = self
+ .ranging_estimator
+ .estimate(&device.handle, &other.handle)
+ .unwrap_or(Default::default());
+ let remote = self
+ .ranging_estimator
+ .estimate(&other.handle, &device.handle)
+ .unwrap_or(Default::default());
+ measurements.push(make_measurement(mac_address, local, remote));
+ }
+ }
+
+ // Look for compatible ranging sessions in other devices.
+ for peer_device in self.devices.values() {
+ if peer_device.handle == device_handle {
+ continue;
+ }
+
+ if peer_device.can_start_ranging(session, session_id) {
+ let peer_mac_address = peer_device
+ .session(session_id)
+ .unwrap()
+ .app_config
+ .device_mac_address;
+ let local = self
+ .ranging_estimator
+ .estimate(&device.handle, &peer_device.handle)
+ .unwrap_or(Default::default());
+ let remote = self
+ .ranging_estimator
+ .estimate(&peer_device.handle, &device.handle)
+ .unwrap_or(Default::default());
+ measurements.push(make_measurement(&peer_mac_address, local, remote));
+ }
+
+ if device.can_start_data_transfer(session_id)
+ && peer_device.can_receive_data_transfer(session_id)
+ {
+ data_transfer.push(peer_device);
+ }
+ }
+
+ // TODO: Data transfer should be limited in size for
+ // each round of ranging
+ for peer_device in data_transfer.iter() {
+ peer_device
+ .tx
+ .send(
+ DataMessageRcvBuilder {
+ application_data: session.data().clone().into(),
+ data_sequence_number: 0x01,
+ pbf: PacketBoundaryFlag::Complete,
+ session_handle: session_id,
+ source_address: device.mac_address.into(),
+ status: UciStatusCode::UciStatusOk,
+ }
+ .build()
+ .into(),
+ )
+ .unwrap();
+ }
if session.is_ranging_data_ntf_enabled() != RangeDataNtfConfig::Disable {
device
.tx
@@ -474,275 +491,155 @@ impl Pica {
.build()
.into(),
)
- .await
.unwrap();
let device = self.get_device_mut(device_handle).unwrap();
- let session = device.get_session_mut(session_id).unwrap();
+ let session = device.session_mut(session_id).unwrap();
session.sequence_number += 1;
}
+
+ // TODO: Clean the data only when all the data is transfered
+ let device = self.get_device_mut(device_handle).unwrap();
+ let session = device.session_mut(session_id).unwrap();
+
+ session.clear_data();
}
- async fn uci_data(&mut self, device_handle: usize, data: DataPacket) {
- match self
- .get_device_mut(device_handle)
- .ok_or_else(|| PicaCommandError::DeviceNotFound(device_handle.into()))
- {
- Ok(device) => {
- let response: SessionControlNotification = device.data_message_snd(data);
- device.tx.send(response.into()).await.unwrap_or_else(|err| {
- println!("Failed to send UCI data packet response: {}", err)
- });
- }
- Err(err) => println!("{}", err),
+ fn uci_packet(&mut self, device_handle: usize, packet: Vec<u8>) {
+ match self.get_device_mut(device_handle) {
+ Some(device) => device.receive_packet(packet),
+ None => log::error!("Device {} not found", device_handle),
}
}
- async fn command(&mut self, device_handle: usize, cmd: UciCommand) {
- match self
- .get_device_mut(device_handle)
- .ok_or_else(|| PicaCommandError::DeviceNotFound(device_handle.into()))
- {
- Ok(device) => {
- let response: ControlPacket = device.command(cmd).into();
- device
- .tx
- .send(response)
- .await
- .unwrap_or_else(|err| println!("Failed to send UCI command response: {}", err));
+
+ fn pica_command(&mut self, command: PicaCommand) {
+ use PicaCommand::*;
+ match command {
+ Connect(stream, sink) => {
+ let _ = self.add_device(stream, sink);
+ }
+ Disconnect(device_handle) => self.disconnect(device_handle),
+ Ranging(device_handle, session_id) => self.ranging(device_handle, session_id),
+ StopRanging(mac_address, session_id) => {
+ self.stop_controlee_ranging(&mac_address, session_id)
+ }
+ UciPacket(device_handle, packet) => self.uci_packet(device_handle, packet),
+ CreateAnchor(mac_address, pica_cmd_rsp_tx) => {
+ self.create_anchor(mac_address, pica_cmd_rsp_tx)
+ }
+ DestroyAnchor(mac_address, pica_cmd_rsp_tx) => {
+ self.destroy_anchor(mac_address, pica_cmd_rsp_tx)
}
- Err(err) => println!("{}", err),
}
}
- pub async fn run(&mut self) -> Result<()> {
+ /// Run the internal pica event loop.
+ pub async fn run(mut self) -> Result<()> {
+ let Some(mut command_rx) = self.command_rx.take() else {
+ anyhow::bail!("missing pica command receiver")
+ };
loop {
- use PicaCommand::*;
- match self.rx.recv().await {
- Some(Connect(stream)) => {
- self.connect(stream).await;
- }
- Some(Disconnect(device_handle)) => self.disconnect(device_handle),
- Some(Ranging(device_handle, session_id)) => {
- self.ranging(device_handle, session_id).await;
- }
- Some(StopRanging(mac_address, session_id)) => {
- self.stop_controlee_ranging(&mac_address, session_id).await;
- }
- Some(UciData(device_handle, data)) => self.uci_data(device_handle, data).await,
- Some(UciCommand(device_handle, cmd)) => self.command(device_handle, cmd).await,
- Some(SetPosition(mac_address, position, pica_cmd_rsp_tx)) => {
- self.set_position(mac_address, position, pica_cmd_rsp_tx)
- }
- Some(CreateAnchor(mac_address, position, pica_cmd_rsp_tx)) => {
- self.create_anchor(mac_address, position, pica_cmd_rsp_tx)
- }
- Some(DestroyAnchor(mac_address, pica_cmd_rsp_tx)) => {
- self.destroy_anchor(mac_address, pica_cmd_rsp_tx)
- }
- Some(GetState(state_tx)) => self.get_state(state_tx),
- Some(InitUciDevice(mac_address, position, pica_cmd_rsp_tx)) => {
- self.init_uci_device(mac_address, position, pica_cmd_rsp_tx);
- }
- None => (),
- };
+ if let Some(command) = command_rx.recv().await {
+ self.pica_command(command)
+ }
}
}
// Handle the in-band StopRanging command sent from controller to the controlee with
// corresponding mac_address and session_id.
- async fn stop_controlee_ranging(&mut self, mac_address: &MacAddress, session_id: u32) {
- if let Some(device) = self.get_device_mut_by_mac_and_session_id(mac_address, session_id) {
- // If such device with target session is found, stop the ranging session.
- let session = device.get_session_mut(session_id).unwrap();
- session.stop_ranging_task();
- session.set_state(
- SessionState::SessionStateIdle,
- ReasonCode::SessionStoppedDueToInbandSignal,
- );
- device.n_active_sessions -= 1;
- if device.n_active_sessions == 0 {
- device.set_state(DeviceState::DeviceStateReady);
- }
- }
- }
-
- // TODO: Assign a reserved range of mac addresses for UCI devices
- // to protect against conflicts with user defined Anchor addresses
- // b/246000641
- fn init_uci_device(
- &mut self,
- mac_address: MacAddress,
- position: Position,
- pica_cmd_rsp_tx: oneshot::Sender<PicaCommandStatus>,
- ) {
- println!("[_] Init device");
- println!(" mac_address: {}", mac_address);
- println!(" position={:?}", position);
-
- let status = self
- .get_device_mut_by_mac(mac_address)
- .ok_or(PicaCommandError::DeviceNotFound(mac_address))
- .map(|uci_device| {
- uci_device.mac_address = mac_address;
- uci_device.position = position;
- });
-
- pica_cmd_rsp_tx.send(status).unwrap_or_else(|err| {
- println!("Failed to send init-uci-device command response: {:?}", err)
- });
- }
-
- fn set_position(
- &mut self,
- mac_address: MacAddress,
- position: Position,
- pica_cmd_rsp_tx: oneshot::Sender<PicaCommandStatus>,
- ) {
- let mut status = if let Some(uci_device) = self.get_device_mut_by_mac(mac_address) {
- uci_device.position = position;
- Ok(())
- } else if let Some(anchor) = self.anchors.get_mut(&mac_address) {
- anchor.position = position;
- Ok(())
- } else {
- Err(PicaCommandError::DeviceNotFound(mac_address))
- };
-
- if status.is_ok() {
- status = self.update_position(mac_address, position)
- }
-
- pica_cmd_rsp_tx.send(status).unwrap_or_else(|err| {
- println!("Failed to send set-position command response: {:?}", err)
- });
- }
+ fn stop_controlee_ranging(&mut self, mac_address: &MacAddress, session_id: u32) {
+ for device in self.devices.values_mut() {
+ let Some(session) = device.session_mut(session_id) else {
+ continue;
+ };
- fn update_position(
- &self,
- mac_address: MacAddress,
- position: Position,
- ) -> Result<(), PicaCommandError> {
- let category = match self.get_category(&mac_address) {
- Some(category) => category,
- None => {
- return Err(PicaCommandError::DeviceNotFound(mac_address));
+ if &session.app_config.device_mac_address != mac_address {
+ continue;
}
- };
- self.send_event(PicaEvent::DeviceUpdated {
- category,
- mac_address,
- position,
- });
- let devices = self.devices.values().map(|d| (d.mac_address, d.position));
- let anchors = self.anchors.values().map(|b| (b.mac_address, b.position));
-
- let update_neighbors = |device_category, device_mac_address, device_position| {
- if mac_address != device_mac_address {
- let local = position.compute_range_azimuth_elevation(&device_position);
- let remote = device_position.compute_range_azimuth_elevation(&position);
-
- assert!(local.0 == remote.0);
-
- self.send_event(PicaEvent::NeighborUpdated {
- source_category: category,
- source_mac_address: mac_address,
- destination_category: device_category,
- destination_mac_address: device_mac_address,
- distance: local.0,
- azimuth: local.1,
- elevation: local.2,
- });
-
- self.send_event(PicaEvent::NeighborUpdated {
- source_category: device_category,
- source_mac_address: device_mac_address,
- destination_category: category,
- destination_mac_address: mac_address,
- distance: remote.0,
- azimuth: remote.1,
- elevation: remote.2,
- });
+ if session.session_state() == SessionState::SessionStateActive {
+ session.stop_ranging_task();
+ session.set_state(
+ SessionState::SessionStateIdle,
+ ReasonCode::SessionStoppedDueToInbandSignal,
+ );
+ device.n_active_sessions = device.n_active_sessions.saturating_sub(1);
+ if device.n_active_sessions == 0 {
+ device.set_state(DeviceState::DeviceStateReady);
+ }
+ } else {
+ log::warn!("stop_controlee_ranging: session is not active !");
}
- };
-
- devices.for_each(|device| update_neighbors(Category::Uci, device.0, device.1));
- anchors.for_each(|anchor| update_neighbors(Category::Anchor, anchor.0, anchor.1));
- Ok(())
+ }
}
#[allow(clippy::map_entry)]
fn create_anchor(
&mut self,
mac_address: MacAddress,
- position: Position,
- pica_cmd_rsp_tx: oneshot::Sender<PicaCommandStatus>,
+ rsp_tx: oneshot::Sender<Result<Handle, PicaCommandError>>,
) {
- println!("Create anchor: {} {}", mac_address, position);
+ log::debug!("[_] Create anchor");
+ log::debug!(" mac_address: {}", mac_address);
+
let status = if self.get_category(&mac_address).is_some() {
Err(PicaCommandError::DeviceAlreadyExists(mac_address))
} else {
- self.send_event(PicaEvent::DeviceAdded {
- category: Category::Anchor,
- mac_address,
- position,
- });
+ let handle = self.counter;
+ self.counter += 1;
+
assert!(self
.anchors
.insert(
mac_address,
Anchor {
+ handle,
mac_address,
- position,
},
)
.is_none());
- Ok(())
+
+ Ok(handle)
};
- pica_cmd_rsp_tx.send(status).unwrap_or_else(|err| {
- println!("Failed to send create-anchor command response: {:?}", err)
+ rsp_tx.send(status).unwrap_or_else(|err| {
+ log::error!("Failed to send create-anchor command response: {:?}", err)
})
}
fn destroy_anchor(
&mut self,
mac_address: MacAddress,
- pica_cmd_rsp_tx: oneshot::Sender<PicaCommandStatus>,
+ rsp_tx: oneshot::Sender<Result<Handle, PicaCommandError>>,
) {
- println!("[_] Destroy anchor");
- println!(" mac_address: {}", mac_address);
+ log::debug!("[_] Destroy anchor");
+ log::debug!(" mac_address: {}", mac_address);
- let status = if self.anchors.remove(&mac_address).is_none() {
- Err(PicaCommandError::DeviceNotFound(mac_address))
- } else {
- self.send_event(PicaEvent::DeviceRemoved {
- category: Category::Anchor,
- mac_address,
- });
- Ok(())
+ let status = match self.anchors.remove(&mac_address) {
+ None => Err(PicaCommandError::DeviceNotFound(mac_address)),
+ Some(anchor) => Ok(anchor.handle),
};
- pica_cmd_rsp_tx.send(status).unwrap_or_else(|err| {
- println!("Failed to send destroy-anchor command response: {:?}", err)
+
+ rsp_tx.send(status).unwrap_or_else(|err| {
+ log::error!("Failed to send destroy-anchor command response: {:?}", err)
})
}
+}
- fn get_state(&self, state_tx: oneshot::Sender<Vec<(Category, MacAddress, Position)>>) {
- println!("[_] Get State");
-
- state_tx
- .send(
- self.anchors
- .values()
- .map(|anchor| (Category::Anchor, anchor.mac_address, anchor.position))
- .chain(
- self.devices
- .values()
- .map(|device| (Category::Uci, device.mac_address, device.position)),
- )
- .collect(),
- )
- .unwrap();
+/// Run the internal pica event loop.
+/// As opposed to Pica::run, the context is passed under a mutex, which
+/// allows synchronous access to the context for device creation.
+pub async fn run(this: &std::sync::Mutex<Pica>) -> Result<()> {
+ // Extract the mpsc receiver from the Pica context.
+ // The receiver cannot be cloned.
+ let Some(mut command_rx) = this.lock().unwrap().command_rx.take() else {
+ anyhow::bail!("missing pica command receiver");
+ };
+
+ loop {
+ if let Some(command) = command_rx.recv().await {
+ this.lock().unwrap().pica_command(command)
+ }
}
} \ No newline at end of file
diff --git a/src/mac_address.rs b/src/mac_address.rs
index 320631a..79cfe94 100644
--- a/src/mac_address.rs
+++ b/src/mac_address.rs
@@ -19,21 +19,22 @@ use serde::{Deserialize, Serialize};
use thiserror::Error;
const SHORT_MAC_ADDRESS_SIZE: usize = 2;
-const STRING_SHORT_MAC_ADDRESS_SIZE: usize = 2 * SHORT_MAC_ADDRESS_SIZE;
+const SHORT_MAC_ADDRESS_STR_SIZE: usize = 2 * SHORT_MAC_ADDRESS_SIZE;
-const EXTEND_MAC_ADDRESS_SIZE: usize = 8;
-const STRING_EXTEND_MAC_ADDRESS_SIZE: usize = 2 * EXTEND_MAC_ADDRESS_SIZE;
+const EXTENDED_MAC_ADDRESS_SIZE: usize = 8;
+const EXTENDED_MAC_ADDRESS_STR_SIZE: usize = 2 * EXTENDED_MAC_ADDRESS_SIZE;
#[derive(Error, Debug)]
pub enum Error {
#[error("MacAddress has the wrong format: 0")]
MacAddressWrongFormat(String),
}
+
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(try_from = "String", into = "String")]
pub enum MacAddress {
Short([u8; SHORT_MAC_ADDRESS_SIZE]),
- Extend([u8; EXTEND_MAC_ADDRESS_SIZE]),
+ Extended([u8; EXTENDED_MAC_ADDRESS_SIZE]),
}
impl MacAddress {
@@ -42,30 +43,30 @@ impl MacAddress {
}
}
-impl From<usize> for MacAddress {
- fn from(device_handle: usize) -> Self {
- let handle = device_handle as u64;
- MacAddress::Extend(handle.to_be_bytes())
+impl From<MacAddress> for u64 {
+ fn from(mac_address: MacAddress) -> Self {
+ match mac_address {
+ MacAddress::Short(addr) => u16::from_le_bytes(addr) as u64,
+ MacAddress::Extended(addr) => u64::from_le_bytes(addr),
+ }
}
}
impl TryFrom<String> for MacAddress {
type Error = Error;
fn try_from(mac_address: String) -> std::result::Result<Self, Error> {
- let mac_address = mac_address.replace(':', "");
- let mac_address = mac_address.replace("%3A", "");
- let uwb_mac_address = match mac_address.len() {
- STRING_SHORT_MAC_ADDRESS_SIZE => MacAddress::Short(
+ let mac_address = mac_address.replace(':', "").replace("%3A", "");
+ Ok(match mac_address.len() {
+ SHORT_MAC_ADDRESS_STR_SIZE => MacAddress::Short(
<[u8; SHORT_MAC_ADDRESS_SIZE]>::from_hex(mac_address)
.map_err(|err| Error::MacAddressWrongFormat(err.to_string()))?,
),
- STRING_EXTEND_MAC_ADDRESS_SIZE => MacAddress::Extend(
- <[u8; EXTEND_MAC_ADDRESS_SIZE]>::from_hex(mac_address)
+ EXTENDED_MAC_ADDRESS_STR_SIZE => MacAddress::Extended(
+ <[u8; EXTENDED_MAC_ADDRESS_SIZE]>::from_hex(mac_address)
.map_err(|err| Error::MacAddressWrongFormat(err.to_string()))?,
),
_ => return Err(Error::MacAddressWrongFormat(mac_address)),
- };
- Ok(uwb_mac_address)
+ })
}
}
@@ -81,7 +82,7 @@ impl From<&MacAddress> for String {
};
match mac_address {
MacAddress::Short(address) => to_string(address),
- MacAddress::Extend(address) => to_string(address),
+ MacAddress::Extended(address) => to_string(address),
}
}
}
@@ -112,7 +113,7 @@ mod tests {
let valid_mac_address = "FF:77:AA:DD:EE:BB:CC:10";
assert_eq!(
MacAddress::new(valid_mac_address.into()).unwrap(),
- MacAddress::Extend([0xFF, 0x77, 0xAA, 0xDD, 0xEE, 0xBB, 0xCC, 0x10])
+ MacAddress::Extended([0xFF, 0x77, 0xAA, 0xDD, 0xEE, 0xBB, 0xCC, 0x10])
);
}
@@ -132,16 +133,32 @@ mod tests {
#[test]
fn display_mac_address() {
- let extend_mac_address = "00:FF:77:AA:DD:EE:CC:45";
+ let extended_mac_address = "00:FF:77:AA:DD:EE:CC:45";
let short_mac_address = "00:FF";
assert_eq!(
- format!("{}", MacAddress::new(extend_mac_address.into()).unwrap()),
- extend_mac_address
+ format!("{}", MacAddress::new(extended_mac_address.into()).unwrap()),
+ extended_mac_address
);
assert_eq!(
format!("{}", MacAddress::new(short_mac_address.into()).unwrap()),
short_mac_address
);
- assert_eq!(extend_mac_address.to_string(), extend_mac_address);
+ assert_eq!(extended_mac_address.to_string(), extended_mac_address);
+ }
+
+ #[test]
+ fn test_short_mac_to_u64() {
+ let short_mac = MacAddress::Short([0x01, 0x02]);
+ let result: u64 = short_mac.into();
+ let expected: u64 = 0x0201;
+ assert_eq!(result, expected);
+ }
+
+ #[test]
+ fn test_extended_mac_to_u64() {
+ let extended_mac = MacAddress::Extended([0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08]);
+ let result: u64 = extended_mac.into();
+ let expected: u64 = 0x0807060504030201;
+ assert_eq!(result, expected);
}
}
diff --git a/src/packets.rs b/src/packets.rs
index f0b4b64..16e7b4f 100644
--- a/src/packets.rs
+++ b/src/packets.rs
@@ -39,152 +39,50 @@ pub mod uci {
MessageType::try_from((byte >> 5) & 0x7).unwrap_or(MessageType::Command)
}
- use crate::pcapng;
- use std::pin::Pin;
- use tokio::io::{AsyncRead, AsyncWrite};
- use tokio::sync::Mutex;
+ /// Read a single UCI packet from a TCP read half.
+ /// This function does not reassemble segmented packets.
+ pub async fn read(
+ mut socket: tokio::net::tcp::OwnedReadHalf,
+ ) -> Option<(Vec<u8>, tokio::net::tcp::OwnedReadHalf)> {
+ use tokio::io::AsyncReadExt;
- /// Read UCI Control and Data packets received on the UCI transport.
- /// Performs recombination of the segmented packets.
- pub struct Reader {
- socket: Pin<Box<dyn AsyncRead + Send>>,
- }
-
- /// Write UCI Control and Data packets received to the UCI transport.
- /// Performs segmentation of the packets.
- pub struct Writer {
- socket: Pin<Box<dyn AsyncWrite + Send>>,
- }
-
- impl Reader {
- /// Create an UCI reader from an UCI transport.
- pub fn new<T: AsyncRead + Send + 'static>(rx: T) -> Self {
- Reader {
- socket: Box::pin(rx),
- }
- }
-
- /// Read a single UCI packet from the reader. The packet is automatically
- /// re-assembled if segmented on the UCI transport. Data segments
- /// are _not_ re-assembled but returned immediatly for credit
- /// acknowledgment.
- pub async fn read(&mut self, pcapng: &mut Option<pcapng::File>) -> anyhow::Result<Vec<u8>> {
- use tokio::io::AsyncReadExt;
-
- let mut complete_packet = vec![0; HEADER_SIZE];
-
- // Note on reassembly:
- // For each segment of a Control Message, the
- // header of the Control Packet SHALL contain the same MT, GID and OID
- // values. It is correct to keep only the last header of the segmented packet.
- loop {
- // Read the common packet header.
- self.socket
- .read_exact(&mut complete_packet[0..HEADER_SIZE])
- .await?;
- let common_packet_header =
- CommonPacketHeader::parse(&complete_packet[0..COMMON_HEADER_SIZE])?;
+ let mut packet = vec![0; HEADER_SIZE];
- // Read the packet payload.
- let payload_length = match common_packet_header.get_mt() {
- MessageType::Data => {
- let data_packet_header =
- DataPacketHeader::parse(&complete_packet[0..HEADER_SIZE])?;
- data_packet_header.get_payload_length() as usize
- }
- _ => {
- let control_packet_header =
- ControlPacketHeader::parse(&complete_packet[0..HEADER_SIZE])?;
- control_packet_header.get_payload_length() as usize
- }
- };
- let mut payload_bytes = vec![0; payload_length];
- self.socket.read_exact(&mut payload_bytes).await?;
- complete_packet.extend(&payload_bytes);
+ // Read the common packet header.
+ socket.read_exact(&mut packet[0..HEADER_SIZE]).await.ok()?;
- if let Some(ref mut pcapng) = pcapng {
- let mut packet_bytes = vec![];
- packet_bytes.extend(&complete_packet[0..HEADER_SIZE]);
- packet_bytes.extend(&payload_bytes);
- pcapng.write(&packet_bytes, pcapng::Direction::Tx).await?;
- }
+ let common_packet_header =
+ CommonPacketHeader::parse(&packet[0..COMMON_HEADER_SIZE]).ok()?;
- if common_packet_header.get_mt() == MessageType::Data {
- return Ok(complete_packet);
- }
-
- // Check the Packet Boundary Flag.
- match common_packet_header.get_pbf() {
- PacketBoundaryFlag::Complete => return Ok(complete_packet),
- PacketBoundaryFlag::NotComplete => (),
- }
+ let payload_length = match common_packet_header.get_mt() {
+ MessageType::Data => {
+ let data_packet_header = DataPacketHeader::parse(&packet[0..HEADER_SIZE]).ok()?;
+ data_packet_header.get_payload_length() as usize
}
- }
- }
-
- impl Writer {
- /// Create an UCI writer from an UCI transport.
- pub fn new<T: AsyncWrite + Send + 'static>(rx: T) -> Self {
- Writer {
- socket: Box::pin(rx),
+ _ => {
+ let control_packet_header =
+ ControlPacketHeader::parse(&packet[0..HEADER_SIZE]).ok()?;
+ control_packet_header.get_payload_length() as usize
}
- }
-
- /// Write a single UCI packet to the writer. The packet is automatically
- /// segmented if the payload exceeds the maximum size limit.
- pub async fn write(
- &mut self,
- mut packet: &[u8],
- pcapng: &mut Option<pcapng::File>,
- ) -> anyhow::Result<()> {
- use tokio::io::AsyncWriteExt;
+ };
- let mut header_bytes = [packet[0], packet[1], packet[2], 0];
- packet = &packet[HEADER_SIZE..];
+ // Read the packet payload.
+ packet.resize(payload_length + HEADER_SIZE, 0);
+ socket.read_exact(&mut packet[HEADER_SIZE..]).await.ok()?;
- loop {
- let message_type = parse_message_type(header_bytes[0]);
- let chunk_length = std::cmp::min(
- packet.len(),
- match message_type {
- MessageType::Data => MAX_DATA_PACKET_PAYLOAD_SIZE,
- _ => MAX_CTRL_PACKET_PAYLOAD_SIZE,
- },
- );
- // Update header with framing information.
- let pbf = if chunk_length < packet.len() {
- PacketBoundaryFlag::NotComplete
- } else {
- PacketBoundaryFlag::Complete
- };
- const PBF_MASK: u8 = 0x10;
- header_bytes[0] &= !PBF_MASK;
- header_bytes[0] |= (pbf as u8) << 4;
-
- match message_type {
- MessageType::Data => {
- let chunk_le_bytes = (chunk_length as u16).to_le_bytes();
- header_bytes[2..4].copy_from_slice(&chunk_le_bytes);
- }
- _ => header_bytes[3] = chunk_length as u8,
- }
+ Some((packet, socket))
+ }
- if let Some(ref mut pcapng) = pcapng {
- let mut packet_bytes = vec![];
- packet_bytes.extend(&header_bytes);
- packet_bytes.extend(&packet[..chunk_length]);
- pcapng.write(&packet_bytes, pcapng::Direction::Rx).await?
- }
+ /// Write a single UCI packet to a TCP write half.
+ /// This function accepts segmented packets only.
+ pub async fn write(
+ mut socket: tokio::net::tcp::OwnedWriteHalf,
+ packet: Vec<u8>,
+ ) -> std::result::Result<tokio::net::tcp::OwnedWriteHalf, anyhow::Error> {
+ use tokio::io::AsyncWriteExt;
- // Write the header and payload segment bytes.
- self.socket.write_all(&header_bytes).await?;
- self.socket.write_all(&packet[..chunk_length]).await?;
- packet = &packet[chunk_length..];
+ socket.write_all(&packet).await?;
- if packet.is_empty() {
- return Ok(());
- }
- }
- }
+ Ok(socket)
}
}
diff --git a/src/pcapng.rs b/src/pcapng.rs
index 3119bb1..9c1da7e 100644
--- a/src/pcapng.rs
+++ b/src/pcapng.rs
@@ -14,12 +14,12 @@
#![allow(clippy::unused_io_amount)]
+use std::io::Write;
use std::path::Path;
use std::time::Instant;
-use tokio::io::AsyncWriteExt;
pub struct File {
- file: tokio::fs::File,
+ file: std::sync::Mutex<std::fs::File>,
start_time: Instant,
}
@@ -29,51 +29,50 @@ pub enum Direction {
}
impl File {
- pub async fn create<P: AsRef<Path>>(path: P) -> std::io::Result<File> {
- let mut file = tokio::fs::File::create(path).await?;
+ pub fn create<P: AsRef<Path>>(path: P) -> std::io::Result<File> {
+ let mut file = std::fs::File::create(path)?;
// PCAPng files must start with a Section Header Block.
- file.write(&u32::to_le_bytes(0x0A0D0D0A)).await?; // Block Type
- file.write(&u32::to_le_bytes(28)).await?; // Block Total Length
- file.write(&u32::to_le_bytes(0x1A2B3C4D)).await?; // Byte-Order Magic
- file.write(&u16::to_le_bytes(1)).await?; // Major Version
- file.write(&u16::to_le_bytes(0)).await?; // Minor Version
- file.write(&u64::to_le_bytes(0xFFFFFFFFFFFFFFFF)).await?; // Section Length (not specified)
- file.write(&u32::to_le_bytes(28)).await?; // Block Total Length
+ file.write(&u32::to_le_bytes(0x0A0D0D0A))?; // Block Type
+ file.write(&u32::to_le_bytes(28))?; // Block Total Length
+ file.write(&u32::to_le_bytes(0x1A2B3C4D))?; // Byte-Order Magic
+ file.write(&u16::to_le_bytes(1))?; // Major Version
+ file.write(&u16::to_le_bytes(0))?; // Minor Version
+ file.write(&u64::to_le_bytes(0xFFFFFFFFFFFFFFFF))?; // Section Length (not specified)
+ file.write(&u32::to_le_bytes(28))?; // Block Total Length
// Write the Interface Description Block used for all
// UCI records.
- file.write(&u32::to_le_bytes(0x00000001)).await?; // Block Type
- file.write(&u32::to_le_bytes(20)).await?; // Block Total Length
- file.write(&u16::to_le_bytes(293)).await?; // LinkType
- file.write(&u16::to_le_bytes(0)).await?; // Reserved
- file.write(&u32::to_le_bytes(0)).await?; // SnapLen (no limit)
- file.write(&u32::to_le_bytes(20)).await?; // Block Total Length
+ file.write(&u32::to_le_bytes(0x00000001))?; // Block Type
+ file.write(&u32::to_le_bytes(20))?; // Block Total Length
+ file.write(&u16::to_le_bytes(293))?; // LinkType
+ file.write(&u16::to_le_bytes(0))?; // Reserved
+ file.write(&u32::to_le_bytes(0))?; // SnapLen (no limit)
+ file.write(&u32::to_le_bytes(20))?; // Block Total Length
Ok(File {
- file,
+ file: std::sync::Mutex::new(file),
start_time: Instant::now(),
})
}
- pub async fn write(&mut self, packet: &[u8], _dir: Direction) -> std::io::Result<()> {
+ pub fn write(&self, packet: &[u8], _dir: Direction) -> std::io::Result<()> {
let packet_data_padding: usize = 4 - packet.len() % 4;
let block_total_length: u32 = packet.len() as u32 + packet_data_padding as u32 + 32;
let timestamp = self.start_time.elapsed().as_micros();
- let file = &mut self.file;
+ let mut file = self.file.lock().unwrap();
// Wrap the packet inside an Enhanced Packet Block.
- file.write(&u32::to_le_bytes(0x00000006)).await?; // Block Type
- file.write(&u32::to_le_bytes(block_total_length)).await?;
- file.write(&u32::to_le_bytes(0)).await?; // Interface ID
- file.write(&u32::to_le_bytes((timestamp >> 32) as u32))
- .await?; // Timestamp (High)
- file.write(&u32::to_le_bytes(timestamp as u32)).await?; // Timestamp (Low)
- file.write(&u32::to_le_bytes(packet.len() as u32)).await?; // Captured Packet Length
- file.write(&u32::to_le_bytes(packet.len() as u32)).await?; // Original Packet Length
- file.write(packet).await?;
- file.write(&vec![0; packet_data_padding]).await?;
- file.write(&u32::to_le_bytes(block_total_length)).await?; // Block Total Length
+ file.write(&u32::to_le_bytes(0x00000006))?; // Block Type
+ file.write(&u32::to_le_bytes(block_total_length))?;
+ file.write(&u32::to_le_bytes(0))?; // Interface ID
+ file.write(&u32::to_le_bytes((timestamp >> 32) as u32))?; // Timestamp (High)
+ file.write(&u32::to_le_bytes(timestamp as u32))?; // Timestamp (Low)
+ file.write(&u32::to_le_bytes(packet.len() as u32))?; // Captured Packet Length
+ file.write(&u32::to_le_bytes(packet.len() as u32))?; // Original Packet Length
+ file.write(packet)?;
+ file.write(&vec![0; packet_data_padding])?;
+ file.write(&u32::to_le_bytes(block_total_length))?; // Block Total Length
Ok(())
}
}
diff --git a/src/session.rs b/src/session.rs
index 8425b1b..eb4e063 100644
--- a/src/session.rs
+++ b/src/session.rs
@@ -18,6 +18,7 @@
use crate::packets::uci::{self, *};
use crate::{MacAddress, PicaCommand};
+use bytes::BytesMut;
use std::collections::HashMap;
use std::time::Duration;
use tokio::sync::mpsc;
@@ -27,6 +28,8 @@ use tokio::time;
use num_derive::{FromPrimitive, ToPrimitive};
use num_traits::FromPrimitive;
+use super::UciPacket;
+
pub const MAX_SESSION: usize = 255;
pub const DEFAULT_RANGING_INTERVAL: Duration = time::Duration::from_millis(200);
pub const DEFAULT_SLOT_DURATION: u16 = 2400; // RTSU unit
@@ -462,7 +465,7 @@ impl AppConfig {
MacAddress::Short(value[..].try_into().unwrap())
}
MacAddressMode::AddressMode2 => {
- MacAddress::Extend(value[..].try_into().unwrap())
+ MacAddress::Extended(value[..].try_into().unwrap())
}
_ => panic!("Unexpected MAC Address Mode"),
};
@@ -486,7 +489,7 @@ impl AppConfig {
.chunks(mac_address_size)
.map(|c| match self.mac_address_mode {
MacAddressMode::AddressMode0 => MacAddress::Short(c.try_into().unwrap()),
- MacAddressMode::AddressMode2 => MacAddress::Extend(c.try_into().unwrap()),
+ MacAddressMode::AddressMode2 => MacAddress::Extended(c.try_into().unwrap()),
_ => panic!("Unexpected MAC Address Mode"),
})
.collect();
@@ -623,7 +626,7 @@ impl AppConfig {
}
AppConfigTlvType::ApplicationDataEndpoint => self.application_data_endpoint = value[0],
id => {
- println!("Ignored AppConfig parameter {:?}", id);
+ log::error!("Ignored AppConfig parameter {:?}", id);
return Err(StatusCode::UciStatusInvalidParam);
}
};
@@ -637,7 +640,7 @@ impl AppConfig {
self.raw.get(&id).cloned()
}
- pub fn can_start_ranging_with_peer(&self, peer_config: &Self) -> bool {
+ pub fn is_compatible_for_ranging(&self, peer_config: &Self) -> bool {
self == peer_config
&& self.device_role != peer_config.device_role
&& self.device_type != peer_config.device_type
@@ -649,6 +652,14 @@ impl AppConfig {
.contains(&peer_config.device_mac_address)
}
+ pub fn can_start_data_transfer(&self) -> bool {
+ self.device_role == DeviceRole::Initiator
+ }
+
+ pub fn can_receive_data_transfer(&self) -> bool {
+ self.device_role == DeviceRole::Responder
+ }
+
fn extend(&mut self, configs: &[AppConfigTlv]) -> Vec<AppConfigStatus> {
if !app_config_has_mandatory_parameters(configs) {
// TODO: What shall we do in this situation?
@@ -717,12 +728,13 @@ pub struct Session {
/// cf. [UCI] 7.2 Table 13: 4 octets unique random number generated by application
id: u32,
device_handle: usize,
+ data: BytesMut,
session_type: SessionType,
pub sequence_number: u32,
pub app_config: AppConfig,
ranging_task: Option<JoinHandle<()>>,
- tx: mpsc::Sender<ControlPacket>,
+ tx: mpsc::UnboundedSender<UciPacket>,
pica_tx: mpsc::Sender<PicaCommand>,
}
@@ -731,13 +743,14 @@ impl Session {
id: u32,
session_type: SessionType,
device_handle: usize,
- tx: mpsc::Sender<ControlPacket>,
+ tx: mpsc::UnboundedSender<UciPacket>,
pica_tx: mpsc::Sender<PicaCommand>,
) -> Self {
Self {
state: SessionState::SessionStateDeinit,
id,
device_handle,
+ data: BytesMut::new(),
session_type,
sequence_number: 0,
app_config: AppConfig::default(),
@@ -768,7 +781,6 @@ impl Session {
.build()
.into(),
)
- .await
.unwrap()
});
}
@@ -781,6 +793,18 @@ impl Session {
self.app_config.rng_data_ntf
}
+ pub fn data(&self) -> &BytesMut {
+ &self.data
+ }
+
+ pub fn clear_data(&mut self) {
+ self.data.clear()
+ }
+
+ pub fn session_type(&self) -> SessionType {
+ self.session_type
+ }
+
pub fn session_state(&self) -> SessionState {
self.state
}
@@ -794,9 +818,10 @@ impl Session {
fn command_set_app_config(&mut self, cmd: SessionSetAppConfigCmd) -> SessionSetAppConfigRsp {
// TODO properly handle these asserts
- println!(
+ log::debug!(
"[{}:0x{:x}] Session Set App Config",
- self.device_handle, self.id
+ self.device_handle,
+ self.id
);
assert_eq!(self.id, cmd.get_session_token());
assert!(
@@ -850,9 +875,10 @@ impl Session {
}
fn command_get_app_config(&self, cmd: SessionGetAppConfigCmd) -> SessionGetAppConfigRsp {
- println!(
+ log::debug!(
"[{}:0x{:x}] Session Get App Config",
- self.device_handle, self.id
+ self.device_handle,
+ self.id
);
assert_eq!(self.id, cmd.get_session_token());
@@ -871,7 +897,7 @@ impl Session {
v: Vec::new(),
}),
},
- Err(_) => println!("Failed to parse AppConfigTlv: {:?}", *config_id),
+ Err(_) => log::error!("Failed to parse AppConfigTlv: {:?}", *config_id),
}
(valid_parameters, invalid_parameters)
},
@@ -890,7 +916,7 @@ impl Session {
}
fn command_get_state(&self, cmd: SessionGetStateCmd) -> SessionGetStateRsp {
- println!("[{}:0x{:x}] Session Get State", self.device_handle, self.id);
+ log::debug!("[{}:0x{:x}] Session Get State", self.device_handle, self.id);
assert_eq!(self.id, cmd.get_session_token());
SessionGetStateRspBuilder {
status: StatusCode::UciStatusOk,
@@ -903,9 +929,10 @@ impl Session {
&mut self,
cmd: SessionUpdateControllerMulticastListCmd,
) -> SessionUpdateControllerMulticastListRsp {
- println!(
+ log::debug!(
"[{}:0x{:x}] Session Update Controller Multicast List",
- self.device_handle, self.id
+ self.device_handle,
+ self.id
);
assert_eq!(self.id, cmd.get_session_token());
if (self.state != SessionState::SessionStateActive
@@ -1009,7 +1036,7 @@ impl Session {
controlee_status.push(ControleeStatus {
mac_address: match controlee.short_address {
MacAddress::Short(address) => address,
- MacAddress::Extend(_) => panic!("Extended address is not supported!"),
+ MacAddress::Extended(_) => panic!("Extended address is not supported!"),
},
subsession_id: controlee.sub_session_id,
status: update_status,
@@ -1045,7 +1072,7 @@ impl Session {
controlee_status.push(ControleeStatus {
mac_address: match address {
MacAddress::Short(addr) => addr,
- MacAddress::Extend(_) => panic!("Extended address is not supported!"),
+ MacAddress::Extended(_) => panic!("Extended address is not supported!"),
},
subsession_id: controlee.sub_session_id,
status: update_status,
@@ -1075,14 +1102,13 @@ impl Session {
.build()
.into(),
)
- .await
.unwrap()
});
SessionUpdateControllerMulticastListRspBuilder { status }.build()
}
fn command_range_start(&mut self, cmd: SessionStartCmd) -> SessionStartRsp {
- println!("[{}:0x{:x}] Range Start", self.device_handle, self.id);
+ log::debug!("[{}:0x{:x}] Range Start", self.device_handle, self.id);
assert_eq!(self.id, cmd.get_session_id());
let status = if self.state != SessionState::SessionStateIdle {
@@ -1119,7 +1145,7 @@ impl Session {
}
}
fn command_range_stop(&mut self, cmd: SessionStopCmd) -> SessionStopRsp {
- println!("[{}:0x{:x}] Range Stop", self.device_handle, self.id);
+ log::debug!("[{}:0x{:x}] Range Stop", self.device_handle, self.id);
assert_eq!(self.id, cmd.get_session_id());
let status = if self.state != SessionState::SessionStateActive {
@@ -1139,9 +1165,10 @@ impl Session {
&self,
cmd: SessionGetRangingCountCmd,
) -> SessionGetRangingCountRsp {
- println!(
+ log::debug!(
"[{}:0x{:x}] Range Get Ranging Count",
- self.device_handle, self.id
+ self.device_handle,
+ self.id
);
assert_eq!(self.id, cmd.get_session_id());
@@ -1184,6 +1211,7 @@ impl Session {
}
pub fn data_message_snd(&mut self, data: DataMessageSnd) -> SessionControlNotification {
+ log::debug!("[{}] data_message_snd", self.device_handle);
let session_token = data.get_session_handle();
let uci_sequence_number = data.get_data_sequence_number() as u8;
@@ -1200,11 +1228,7 @@ impl Session {
assert_eq!(self.id, session_token);
- // TODO: perform actual data transfer across devices
- println!(
- "Data packet received, payload bytes: {:?}",
- data.get_application_data()
- );
+ self.data.extend_from_slice(data.get_application_data());
DataCreditNtfBuilder {
credit_availability: CreditAvailability::CreditAvailable,
diff --git a/tests/data_transfer.py b/tests/data_transfer.py
index f7d731c..241a6b9 100755
--- a/tests/data_transfer.py
+++ b/tests/data_transfer.py
@@ -16,6 +16,7 @@
import asyncio
import argparse
+import logging
from pica import Host
from pica.packets import uci
from .helper import init
@@ -24,7 +25,7 @@ from pathlib import Path
MAX_DATA_PACKET_PAYLOAD_SIZE = 1024
-async def data_message_send(host: Host, peer: Host, file: Path):
+async def controller(host: Host, peer: Host, file: Path):
await init(host)
host.send_control(
@@ -92,6 +93,167 @@ async def data_message_send(host: Host, peer: Host, file: Path):
await data_transfer(host, peer.mac_address, file, 0)
+ # START SESSION CMD
+ host.send_control(uci.SessionStartCmd(session_id=0))
+
+ await host.expect_control(uci.SessionStartRsp(status=uci.StatusCode.UCI_STATUS_OK))
+
+ await host.expect_control(
+ uci.SessionStatusNtf(
+ session_token=0,
+ session_state=uci.SessionState.SESSION_STATE_ACTIVE,
+ reason_code=0,
+ )
+ )
+
+ await host.expect_control(
+ uci.DeviceStatusNtf(device_state=uci.DeviceState.DEVICE_STATE_ACTIVE)
+ )
+
+ event = await host.expect_control(uci.ShortMacTwoWaySessionInfoNtf, timeout=2.0)
+ event.show()
+
+ event = await host.expect_control(uci.ShortMacTwoWaySessionInfoNtf, timeout=2.0)
+ event.show()
+
+ # STOP SESSION
+ host.send_control(uci.SessionStopCmd(session_id=0))
+
+ await host.expect_control(uci.SessionStopRsp(status=uci.StatusCode.UCI_STATUS_OK))
+
+ await host.expect_control(
+ uci.SessionStatusNtf(
+ session_token=0,
+ session_state=uci.SessionState.SESSION_STATE_IDLE,
+ reason_code=0,
+ )
+ )
+
+ await host.expect_control(
+ uci.DeviceStatusNtf(device_state=uci.DeviceState.DEVICE_STATE_READY)
+ )
+
+ # DEINIT
+ host.send_control(uci.SessionDeinitCmd(session_token=0))
+
+ await host.expect_control(uci.SessionDeinitRsp(status=uci.StatusCode.UCI_STATUS_OK))
+
+
+async def controlee(host: Host, peer: Host, file: Path):
+ await init(host)
+
+ host.send_control(
+ uci.SessionInitCmd(
+ session_id=0,
+ session_type=uci.SessionType.FIRA_RANGING_AND_IN_BAND_DATA_SESSION,
+ )
+ )
+
+ await host.expect_control(uci.SessionInitRsp(status=uci.StatusCode.UCI_STATUS_OK))
+
+ await host.expect_control(
+ uci.SessionStatusNtf(
+ session_token=0,
+ session_state=uci.SessionState.SESSION_STATE_INIT,
+ reason_code=0,
+ )
+ )
+
+ mac_address_mode = 0x0
+ ranging_duration = int(1000).to_bytes(4, byteorder="little")
+ device_role_responder = bytes([1])
+ device_type_controllee = bytes([0])
+ host.send_control(
+ uci.SessionSetAppConfigCmd(
+ session_token=0,
+ tlvs=[
+ uci.AppConfigTlv(
+ cfg_id=uci.AppConfigTlvType.DEVICE_ROLE, v=device_role_responder
+ ),
+ uci.AppConfigTlv(
+ cfg_id=uci.AppConfigTlvType.DEVICE_TYPE, v=device_type_controllee
+ ),
+ uci.AppConfigTlv(
+ cfg_id=uci.AppConfigTlvType.DEVICE_MAC_ADDRESS, v=host.mac_address
+ ),
+ uci.AppConfigTlv(
+ cfg_id=uci.AppConfigTlvType.MAC_ADDRESS_MODE,
+ v=bytes([mac_address_mode]),
+ ),
+ uci.AppConfigTlv(
+ cfg_id=uci.AppConfigTlvType.RANGING_DURATION, v=ranging_duration
+ ),
+ uci.AppConfigTlv(
+ cfg_id=uci.AppConfigTlvType.NO_OF_CONTROLEE, v=bytes([1])
+ ),
+ uci.AppConfigTlv(
+ cfg_id=uci.AppConfigTlvType.DST_MAC_ADDRESS, v=peer.mac_address
+ ),
+ ],
+ )
+ )
+
+ await host.expect_control(
+ uci.SessionSetAppConfigRsp(status=uci.StatusCode.UCI_STATUS_OK, cfg_status=[])
+ )
+
+ await host.expect_control(
+ uci.SessionStatusNtf(
+ session_token=0,
+ session_state=uci.SessionState.SESSION_STATE_IDLE,
+ reason_code=0,
+ )
+ )
+
+ host.send_control(uci.SessionStartCmd(session_id=0))
+
+ await host.expect_control(uci.SessionStartRsp(status=uci.StatusCode.UCI_STATUS_OK))
+
+ await host.expect_control(
+ uci.SessionStatusNtf(
+ session_token=0,
+ session_state=uci.SessionState.SESSION_STATE_ACTIVE,
+ reason_code=0,
+ )
+ )
+
+ await host.expect_control(
+ uci.DeviceStatusNtf(device_state=uci.DeviceState.DEVICE_STATE_ACTIVE)
+ )
+
+ with file.open("rb") as f:
+ application_data = list(bytearray(f.read()))
+ event = await host.expect_data(
+ uci.DataMessageRcv(
+ session_handle=0,
+ status=uci.StatusCode.UCI_STATUS_OK,
+ source_address=int.from_bytes(peer.mac_address, "big"),
+ data_sequence_number=0x01,
+ application_data=application_data,
+ ),
+ timeout=2.0,
+ )
+ event.show()
+
+ event = await host.expect_control(uci.ShortMacTwoWaySessionInfoNtf, timeout=2.0)
+ event.show()
+
+ host.send_control(uci.SessionStopCmd(session_id=0))
+
+ await host.expect_control(uci.SessionStopRsp(status=uci.StatusCode.UCI_STATUS_OK))
+
+ await host.expect_control(
+ uci.SessionStatusNtf(
+ session_token=0,
+ session_state=uci.SessionState.SESSION_STATE_IDLE,
+ reason_code=0,
+ )
+ )
+
+ await host.expect_control(
+ uci.DeviceStatusNtf(device_state=uci.DeviceState.DEVICE_STATE_READY)
+ )
+
host.send_control(uci.SessionDeinitCmd(session_token=0))
await host.expect_control(uci.SessionDeinitRsp(status=uci.StatusCode.UCI_STATUS_OK))
@@ -163,22 +325,24 @@ async def data_transfer(
async def run(address: str, uci_port: int, file: Path):
try:
- host0 = await Host.connect(address, uci_port, bytes([0, 1]))
- host1 = await Host.connect(address, uci_port, bytes([0, 2]))
- except Exception:
- print(
+ host0 = await Host.connect(address, uci_port, bytes([0, 0]))
+ host1 = await Host.connect(address, uci_port, bytes([0, 1]))
+ except Exception as e:
+ raise Exception(
f"Failed to connect to Pica server at address {address}:{uci_port}\n"
+ "Make sure the server is running"
)
- exit(1)
-
- async with asyncio.TaskGroup() as tg:
- tg.create_task(data_message_send(host0, host1, file))
- host0.disconnect()
- host1.disconnect()
-
- print("Data transfer test completed")
+ try:
+ async with asyncio.TaskGroup() as tg:
+ tg.create_task(controller(host0, host1, file))
+ tg.create_task(controlee(host1, host0, file))
+ except Exception as e:
+ raise e
+ finally:
+ host0.disconnect()
+ host1.disconnect()
+ logging.debug("Data transfer test completed")
def main():
diff --git a/tests/test_runner.py b/tests/test_runner.py
index aa7a709..83c5d5a 100644
--- a/tests/test_runner.py
+++ b/tests/test_runner.py
@@ -1,5 +1,4 @@
import asyncio
-from asyncio.subprocess import Process
import pytest
import pytest_asyncio
import logging
@@ -11,7 +10,7 @@ from typing import Tuple
from . import ranging, data_transfer
-PICA_BIN = Path("./target/debug/pica-server")
+PICA_BIN = Path("./target/debug/pica")
DATA_FILE = Path("README.md")
PICA_LOCALHOST = "127.0.0.1"