diff options
author | Android Build Coastguard Worker <android-build-coastguard-worker@google.com> | 2024-03-14 18:12:07 +0000 |
---|---|---|
committer | Android Build Coastguard Worker <android-build-coastguard-worker@google.com> | 2024-03-14 18:12:07 +0000 |
commit | 3479879194f41ad1d26b06870befde1c5c6e76c5 (patch) | |
tree | 09481a8f820f07e7f2912d53fab5ca71123c7d1f | |
parent | 1ddf2391d1f56623688c5d29f91e5f9f1ef2315d (diff) | |
parent | d32b655bf547114b57812e3218d61607a23382ff (diff) | |
download | pica-emu-35-1-release.tar.gz |
Snap for 11574415 from d32b655bf547114b57812e3218d61607a23382ff to emu-35-1-releaseemu-35-1-release
Change-Id: I36dca99114da032fefce0f7b3258fe1d6b20590d
-rw-r--r-- | .github/workflows/build_and_test.yml | 4 | ||||
-rw-r--r-- | Android.bp | 7 | ||||
-rw-r--r-- | Cargo.lock | 304 | ||||
-rw-r--r-- | Cargo.toml | 32 | ||||
-rw-r--r-- | METADATA | 4 | ||||
-rw-r--r-- | src/bin/http-server/main.rs | 564 | ||||
-rw-r--r-- | src/bin/http-server/position.rs (renamed from src/position.rs) | 0 | ||||
-rw-r--r-- | src/bin/main.rs (renamed from src/bin/server/mod.rs) | 65 | ||||
-rw-r--r-- | src/bin/server/web.rs | 269 | ||||
-rw-r--r-- | src/device.rs | 172 | ||||
-rw-r--r-- | src/lib.rs | 871 | ||||
-rw-r--r-- | src/mac_address.rs | 61 | ||||
-rw-r--r-- | src/packets.rs | 170 | ||||
-rw-r--r-- | src/pcapng.rs | 61 | ||||
-rw-r--r-- | src/session.rs | 78 | ||||
-rwxr-xr-x | tests/data_transfer.py | 190 | ||||
-rw-r--r-- | tests/test_runner.py | 3 |
17 files changed, 1754 insertions, 1101 deletions
diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index 7a19021..bf154ef 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -16,10 +16,10 @@ jobs: runs-on: ubuntu-latest steps: - uses: actions/checkout@v3 - - name: Install Rust 1.67.1 + - name: Install Rust 1.76.0 uses: actions-rs/toolchain@v1 with: - toolchain: 1.67.1 + toolchain: 1.76.0 override: true components: rustfmt, clippy - name: Set Up Python 3.11 @@ -33,8 +33,10 @@ rust_library_host { rustlibs: [ "libanyhow", "libbytes", + "libfutures", "libglam", "libhex", + "liblog_rust", "libnum_traits", "libpdl_runtime", "libthiserror", @@ -45,10 +47,13 @@ rust_library_host { rust_binary_host { name: "pica", - srcs: ["src/bin/server/mod.rs"], + srcs: ["src/bin/main.rs"], proc_macros: ["libnum_derive"], rustlibs: [ "libanyhow", + "libenv_logger", + "libfutures", + "liblog_rust", "libpica", "libclap", "libtokio", @@ -18,6 +18,63 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" [[package]] +name = "aho-corasick" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2969dcb958b36655471fc61f7e416fa76033bdd4bfed0678d8fee1e2d07a1f0" +dependencies = [ + "memchr", +] + +[[package]] +name = "anstream" +version = "0.6.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e2e1ebcb11de5c03c67de28a7df593d32191b44939c482e97702baaaa6ab6a5" +dependencies = [ + "anstyle", + "anstyle-parse", + "anstyle-query", + "anstyle-wincon", + "colorchoice", + "utf8parse", +] + +[[package]] +name = "anstyle" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8901269c6307e8d93993578286ac0edf7f195079ffff5ebdeea6a59ffb7e36bc" + +[[package]] +name = "anstyle-parse" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c75ac65da39e5fe5ab759307499ddad880d724eed2f6ce5b5e8a26f4f387928c" +dependencies = [ + "utf8parse", +] + +[[package]] +name = "anstyle-query" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e28923312444cdd728e4738b3f9c9cac739500909bb3d3c94b43551b16517648" +dependencies = [ + "windows-sys 0.52.0", +] + +[[package]] +name = "anstyle-wincon" +version = "3.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1cd54b81ec8d6180e24654d0b371ad22fc3dd083b6ff8ba325b72e00c87660a7" +dependencies = [ + "anstyle", + "windows-sys 0.52.0", +] + +[[package]] name = "anyhow" version = "1.0.56" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -156,6 +213,12 @@ dependencies = [ ] [[package]] +name = "colorchoice" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7" + +[[package]] name = "cpufeatures" version = "0.2.9" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -185,48 +248,121 @@ dependencies = [ ] [[package]] +name = "env_filter" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a009aa4810eb158359dda09d0c87378e4bbb89b5a801f016885a4707ba24f7ea" +dependencies = [ + "log", + "regex", +] + +[[package]] +name = "env_logger" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05e7cf40684ae96ade6232ed84582f40ce0a66efcd43a5117aef610534f8e0b8" +dependencies = [ + "anstream", + "anstyle", + "env_filter", + "humantime", + "log", +] + +[[package]] name = "fnv" version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" [[package]] +name = "futures" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + +[[package]] name = "futures-channel" -version = "0.3.21" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c3083ce4b914124575708913bca19bfe887522d6e2e6d0952943f5eac4a74010" +checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78" dependencies = [ "futures-core", + "futures-sink", ] [[package]] name = "futures-core" -version = "0.3.21" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" + +[[package]] +name = "futures-executor" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0c09fd04b7e4073ac7156a9539b57a484a8ea920f79c7c675d05d289ab6110d3" +checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" + +[[package]] +name = "futures-macro" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.38", +] [[package]] name = "futures-sink" -version = "0.3.21" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "21163e139fa306126e6eedaf49ecdb4588f939600f0b1e770f4205ee4b7fa868" +checksum = "9fb8e00e87438d937621c1c6269e53f536c14d3fbd6a042bb24879e57d474fb5" [[package]] name = "futures-task" -version = "0.3.21" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "57c66a976bf5909d801bbef33416c41372779507e7a6b3a5e25e4749c58f776a" +checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004" [[package]] name = "futures-util" -version = "0.3.21" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d8b7abd5d659d9b90c8cba917f6ec750a74e2dc23902ef9cd4cc8c8b22e6036a" +checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" dependencies = [ + "futures-channel", "futures-core", + "futures-io", + "futures-macro", + "futures-sink", "futures-task", + "memchr", "pin-project-lite", "pin-utils", + "slab", ] [[package]] @@ -307,6 +443,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c4a1e36c821dbe04574f602848a19f742f4fb3c98d40449f11bcad18d6b17421" [[package]] +name = "humantime" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" + +[[package]] name = "hyper" version = "0.14.18" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -349,12 +491,9 @@ checksum = "13e3bf6590cbc649f4d1a3eefc9d5d6eb746f5200ffb04e5e142700b8faa56e7" [[package]] name = "log" -version = "0.4.16" +version = "0.4.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6389c490849ff5bc16be905ae24bc913a9c8892e19b2341dbc175e14c341c2b8" -dependencies = [ - "cfg-if", -] +checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" [[package]] name = "memchr" @@ -379,7 +518,7 @@ checksum = "8f3d0b296e374a4e6f3c7b0a1f5a51d748a0d34c85e7dc48fc3fa9a87657fe09" dependencies = [ "libc", "wasi", - "windows-sys", + "windows-sys 0.48.0", ] [[package]] @@ -514,9 +653,12 @@ dependencies = [ "anyhow", "bytes", "clap", + "env_logger", + "futures", "glam", "hex", "hyper", + "log", "num-derive", "num-traits", "pdl-compiler", @@ -593,6 +735,35 @@ dependencies = [ ] [[package]] +name = "regex" +version = "1.10.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b62dbe01f0b06f9d8dc7d49e05a0785f153b00b2c227856282f671e0318c9b15" +dependencies = [ + "aho-corasick", + "memchr", + "regex-automata", + "regex-syntax", +] + +[[package]] +name = "regex-automata" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5bb987efffd3c6d0d8f5f89510bb458559eab11e4f869acb20bf845e016259cd" +dependencies = [ + "aho-corasick", + "memchr", + "regex-syntax", +] + +[[package]] +name = "regex-syntax" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f" + +[[package]] name = "rustc-demangle" version = "0.1.23" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -647,6 +818,15 @@ dependencies = [ ] [[package]] +name = "slab" +version = "0.4.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f92a496fb766b417c996b9c5e57daf2f7ad3b0bebe1ccfca4856390e3d3bb67" +dependencies = [ + "autocfg", +] + +[[package]] name = "socket2" version = "0.4.9" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -663,7 +843,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7b5fac59a5cb5dd637972e5fca70daf0523c9067fcdc4842f053dae04a18f8e9" dependencies = [ "libc", - "windows-sys", + "windows-sys 0.48.0", ] [[package]] @@ -731,7 +911,7 @@ dependencies = [ "pin-project-lite", "socket2 0.5.5", "tokio-macros", - "windows-sys", + "windows-sys 0.48.0", ] [[package]] @@ -834,6 +1014,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ccb82d61f80a663efe1f787a51b16b5a51e3314d6ac365b08639f52387b33f3" [[package]] +name = "utf8parse" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a" + +[[package]] name = "version_check" version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -892,7 +1078,16 @@ version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9" dependencies = [ - "windows-targets", + "windows-targets 0.48.0", +] + +[[package]] +name = "windows-sys" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" +dependencies = [ + "windows-targets 0.52.0", ] [[package]] @@ -901,13 +1096,28 @@ version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7b1eb6f0cd7c80c79759c929114ef071b87354ce476d9d94271031c0497adfd5" dependencies = [ - "windows_aarch64_gnullvm", - "windows_aarch64_msvc", - "windows_i686_gnu", - "windows_i686_msvc", - "windows_x86_64_gnu", - "windows_x86_64_gnullvm", - "windows_x86_64_msvc", + "windows_aarch64_gnullvm 0.48.0", + "windows_aarch64_msvc 0.48.0", + "windows_i686_gnu 0.48.0", + "windows_i686_msvc 0.48.0", + "windows_x86_64_gnu 0.48.0", + "windows_x86_64_gnullvm 0.48.0", + "windows_x86_64_msvc 0.48.0", +] + +[[package]] +name = "windows-targets" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a18201040b24831fbb9e4eb208f8892e1f50a37feb53cc7ff887feb8f50e7cd" +dependencies = [ + "windows_aarch64_gnullvm 0.52.0", + "windows_aarch64_msvc 0.52.0", + "windows_i686_gnu 0.52.0", + "windows_i686_msvc 0.52.0", + "windows_x86_64_gnu 0.52.0", + "windows_x86_64_gnullvm 0.52.0", + "windows_x86_64_msvc 0.52.0", ] [[package]] @@ -917,37 +1127,79 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "91ae572e1b79dba883e0d315474df7305d12f569b400fcf90581b06062f7e1bc" [[package]] +name = "windows_aarch64_gnullvm" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb7764e35d4db8a7921e09562a0304bf2f93e0a51bfccee0bd0bb0b666b015ea" + +[[package]] name = "windows_aarch64_msvc" version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b2ef27e0d7bdfcfc7b868b317c1d32c641a6fe4629c171b8928c7b08d98d7cf3" [[package]] +name = "windows_aarch64_msvc" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbaa0368d4f1d2aaefc55b6fcfee13f41544ddf36801e793edbbfd7d7df075ef" + +[[package]] name = "windows_i686_gnu" version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "622a1962a7db830d6fd0a69683c80a18fda201879f0f447f065a3b7467daa241" [[package]] +name = "windows_i686_gnu" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a28637cb1fa3560a16915793afb20081aba2c92ee8af57b4d5f28e4b3e7df313" + +[[package]] name = "windows_i686_msvc" version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4542c6e364ce21bf45d69fdd2a8e455fa38d316158cfd43b3ac1c5b1b19f8e00" [[package]] +name = "windows_i686_msvc" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ffe5e8e31046ce6230cc7215707b816e339ff4d4d67c65dffa206fd0f7aa7b9a" + +[[package]] name = "windows_x86_64_gnu" version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ca2b8a661f7628cbd23440e50b05d705db3686f894fc9580820623656af974b1" [[package]] +name = "windows_x86_64_gnu" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d6fa32db2bc4a2f5abeacf2b69f7992cd09dca97498da74a151a3132c26befd" + +[[package]] name = "windows_x86_64_gnullvm" version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7896dbc1f41e08872e9d5e8f8baa8fdd2677f29468c4e156210174edc7f7b953" [[package]] +name = "windows_x86_64_gnullvm" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a657e1e9d3f514745a572a6846d3c7aa7dbe1658c056ed9c3344c4109a6949e" + +[[package]] name = "windows_x86_64_msvc" version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1a515f5799fe4961cb532f983ce2b23082366b898e52ffbce459c86f67c8378a" + +[[package]] +name = "windows_x86_64_msvc" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dff9641d1cd4be8d1a070daf9e3773c5f67e78b4d9d42263020c057706765c04" @@ -1,6 +1,6 @@ [package] name = "pica" -version = "0.1.7" +version = "0.1.8" edition = "2021" description = "Pica is a virtual UWB Controller implementing the FiRa UCI specification." repository = "https://github.com/google/pica" @@ -14,7 +14,7 @@ authors = [ "David De Jesus Duarte <licorne@google.com>", "Henri Chataing <henrichataing@google.com>", ] -default-run = "pica-server" +default-run = "pica" exclude = [ "res/*", "scripts/*" @@ -28,8 +28,13 @@ name = "pica" path = "src/lib.rs" [[bin]] -name = "pica-server" -path = "src/bin/server/mod.rs" +name = "pica" +path = "src/bin/main.rs" + +[[bin]] +name = "pica-http" +path = "src/bin/http-server/main.rs" +features = ["web"] [features] default = ["web"] @@ -39,17 +44,20 @@ web = ["hyper", "tokio/rt-multi-thread"] pdl-compiler = "0.2.3" [dependencies] -tokio = { version = "1.32.0", features = [ "fs", "io-util", "macros", "net", "rt" ] } -tokio-stream = { version = "0.1.8", features = ["sync"] } -bytes = "1" anyhow = "1.0.56" +bytes = "1" +futures = "0.3.30" +clap = { version = "4.1.8", default-features = false, features = ["derive", "error-context", "help", "std", "usage"] } +glam = "0.25.0" +hex = "0.4.3" +hyper = { version = "0.14", features = ["server", "stream", "http1", "tcp"], optional = true } +log = "0.4.20" +env_logger = "0.11.1" num-derive = "0.3.3" num-traits = "0.2.17" pdl-runtime = "0.2.2" -thiserror = "1.0.49" -glam = "0.25.0" -hyper = { version = "0.14", features = ["server", "stream", "http1", "tcp"], optional = true } serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" -hex = "0.4.3" -clap = { version = "4.1.8", default-features = false, features = ["derive", "error-context", "help", "std", "usage"] } +thiserror = "1.0.49" +tokio = { version = "1.32.0", features = [ "fs", "io-util", "macros", "net", "rt" ] } +tokio-stream = { version = "0.1.8", features = ["sync"] } @@ -12,7 +12,7 @@ third_party { type: GIT value: "https://github.com/google/pica.git" } - version: "v0.1.0" + version: "v0.1.8" license_type: NOTICE - last_upgrade_date { year: 2022 month: 12 day: 5 } + last_upgrade_date { year: 2024 month: 3 day: 13 } } diff --git a/src/bin/http-server/main.rs b/src/bin/http-server/main.rs new file mode 100644 index 0000000..0882661 --- /dev/null +++ b/src/bin/http-server/main.rs @@ -0,0 +1,564 @@ +// Copyright 2022 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use anyhow::Result; +use clap::Parser; +use hyper::service::{make_service_fn, service_fn}; +use hyper::{body, Body, Request, Response, Server, StatusCode as HttpStatusCode}; +use serde::{Deserialize, Serialize}; +use serde_json::error::Category as SerdeErrorCategory; +use std::collections::HashMap; +use std::convert::Infallible; +use std::net::{Ipv4Addr, SocketAddrV4}; +use std::path::PathBuf; +use std::sync::Arc; +use std::sync::Mutex; +use tokio::net::TcpListener; +use tokio::sync::{broadcast, mpsc, oneshot}; +use tokio::try_join; +use tokio_stream::{wrappers::BroadcastStream, StreamExt}; + +use pica::{Category, MacAddress, Pica, PicaCommand, PicaCommandError, PicaEvent}; + +mod position; +use position::Position; + +const DEFAULT_UCI_PORT: u16 = 7000; +const DEFAULT_WEB_PORT: u16 = 3000; + +const STATIC_FILES: &[(&str, &str, &str)] = &[ + ("/", "text/html", include_str!("../../../static/index.html")), + ( + "/openapi", + "text/html", + include_str!("../../../static/openapi.html"), + ), + ( + "/openapi.yaml", + "text/yaml", + include_str!("../../../static/openapi.yaml"), + ), + ( + "/src/components/Map.js", + "application/javascript", + include_str!("../../../static/src/components/Map.js"), + ), + ( + "/src/components/DeviceInfo.js", + "application/javascript", + include_str!("../../../static/src/components/DeviceInfo.js"), + ), + ( + "/src/components/Orientation.js", + "application/javascript", + include_str!("../../../static/src/components/Orientation.js"), + ), +]; + +/// Record information about an active device. +#[derive(Debug, Serialize, Clone)] +struct DeviceInformation { + category: pica::Category, + mac_address: MacAddress, + #[serde(flatten)] + position: Position, +} + +/// Record information about an active device. +#[derive(Clone, Debug, Serialize)] +#[serde(untagged)] +pub enum Event { + DeviceAdded { + category: Category, + mac_address: MacAddress, + #[serde(flatten)] + position: Position, + }, + DeviceRemoved { + category: Category, + mac_address: MacAddress, + }, + DeviceUpdated { + category: Category, + mac_address: MacAddress, + #[serde(flatten)] + position: Position, + }, + NeighborUpdated { + source_category: Category, + source_mac_address: MacAddress, + destination_category: Category, + destination_mac_address: MacAddress, + distance: u16, + azimuth: i16, + elevation: i8, + }, +} + +/// Record the position of active devices for reference by the +/// ranging estimator. +#[derive(Clone)] +struct Context { + devices: Arc<Mutex<HashMap<pica::Handle, DeviceInformation>>>, + events: broadcast::Sender<Event>, +} + +impl Context { + fn new() -> Self { + let (events, _) = broadcast::channel(1024); + Context { + devices: Arc::new(Mutex::new(HashMap::new())), + events, + } + } + + async fn handle_connection_events( + self, + mut events: broadcast::Receiver<PicaEvent>, + ) -> Result<()> { + loop { + match events.recv().await { + Ok(PicaEvent::Connected { + mac_address, + handle, + }) => { + let mut devices = self.devices.lock().unwrap(); + devices.insert( + handle, + DeviceInformation { + category: Category::Uci, + mac_address, + position: Default::default(), + }, + ); + self.events + .send(Event::DeviceAdded { + category: Category::Uci, + mac_address, + position: Default::default(), + }) + .unwrap(); + } + Ok(PicaEvent::Disconnected { + mac_address, + handle, + }) => { + let mut devices = self.devices.lock().unwrap(); + devices.remove(&handle); + self.events + .send(Event::DeviceRemoved { + category: Category::Uci, + mac_address, + }) + .unwrap(); + } + Err(err) => anyhow::bail!(err), + } + } + } + + fn http_events(&self) -> Response<Body> { + let stream = BroadcastStream::new(self.events.subscribe()).map(|result| { + result.map(|event| { + format!( + "event: {}\ndata: {}\n\n", + event.name(), + serde_json::to_string(&event).unwrap() + ) + }) + }); + Response::builder() + .header("content-type", "text/event-stream") + .body(Body::wrap_stream(stream)) + .unwrap() + } + + fn http_set_position(&self, mac_address: MacAddress, position: Position) -> Response<Body> { + log::info!("set-position({}, {})", mac_address, position); + + let mut devices = self.devices.lock().unwrap(); + let mut found_device = None; + for (_, device) in devices.iter_mut() { + if device.mac_address == mac_address { + device.position = position; + found_device = Some(device.clone()); + break; + } + } + + let Some(device) = found_device else { + return Response::builder() + .status(HttpStatusCode::NOT_FOUND) + .body("".into()) + .unwrap(); + }; + + self.events + .send(Event::DeviceUpdated { + category: device.category, + mac_address, + position, + }) + .unwrap(); + + for other in devices.values() { + if other.mac_address != device.mac_address { + let local = device + .position + .compute_range_azimuth_elevation(&other.position); + let remote = other + .position + .compute_range_azimuth_elevation(&device.position); + + assert!(local.0 == remote.0); + + self.events + .send(Event::NeighborUpdated { + source_category: device.category, + source_mac_address: device.mac_address, + destination_category: other.category, + destination_mac_address: other.mac_address, + distance: local.0, + azimuth: local.1, + elevation: local.2, + }) + .unwrap(); + + let _ = self + .events + .send(Event::NeighborUpdated { + source_category: other.category, + source_mac_address: other.mac_address, + destination_category: device.category, + destination_mac_address: device.mac_address, + distance: remote.0, + azimuth: remote.1, + elevation: remote.2, + }) + .unwrap(); + } + } + + Response::builder() + .status(HttpStatusCode::OK) + .body("".into()) + .unwrap() + } + + async fn http_create_anchor( + &self, + mac_address: MacAddress, + position: Position, + cmd_tx: mpsc::Sender<PicaCommand>, + ) -> Response<Body> { + log::info!("create-anchor({}, {})", mac_address, position); + + let (rsp_tx, rsp_rx) = oneshot::channel::<Result<pica::Handle, PicaCommandError>>(); + cmd_tx + .send(PicaCommand::CreateAnchor(mac_address, rsp_tx)) + .await + .unwrap(); + + let status = match rsp_rx.await { + Ok(Ok(handle)) => { + let mut devices = self.devices.lock().unwrap(); + devices.insert( + handle, + DeviceInformation { + position, + mac_address, + category: Category::Anchor, + }, + ); + self.events + .send(Event::DeviceAdded { + category: Category::Anchor, + mac_address, + position, + }) + .unwrap(); + HttpStatusCode::OK + } + Ok(Err(PicaCommandError::DeviceAlreadyExists(_))) => HttpStatusCode::CONFLICT, + Ok(Err(PicaCommandError::DeviceNotFound(_))) => HttpStatusCode::NOT_FOUND, + Err(_) => HttpStatusCode::INTERNAL_SERVER_ERROR, + }; + + Response::builder().status(status).body("".into()).unwrap() + } + + async fn http_destroy_anchor( + &self, + mac_address: MacAddress, + cmd_tx: mpsc::Sender<PicaCommand>, + ) -> Response<Body> { + log::info!("destroy-anchor({})", mac_address); + + let (rsp_tx, rsp_rx) = oneshot::channel::<Result<pica::Handle, PicaCommandError>>(); + cmd_tx + .send(PicaCommand::DestroyAnchor(mac_address, rsp_tx)) + .await + .unwrap(); + + let status = match rsp_rx.await { + Ok(Ok(handle)) => { + let mut devices = self.devices.lock().unwrap(); + devices.remove(&handle); + self.events + .send(Event::DeviceRemoved { + category: Category::Anchor, + mac_address, + }) + .unwrap(); + HttpStatusCode::OK + } + Ok(Err(PicaCommandError::DeviceAlreadyExists(_))) => HttpStatusCode::CONFLICT, + Ok(Err(PicaCommandError::DeviceNotFound(_))) => HttpStatusCode::NOT_FOUND, + Err(_) => HttpStatusCode::INTERNAL_SERVER_ERROR, + }; + + Response::builder().status(status).body("".into()).unwrap() + } + + fn http_get_state(&self) -> Response<Body> { + log::info!("get-state()"); + + #[derive(Serialize)] + struct GetStateResponse { + devices: Vec<DeviceInformation>, + } + + let devices = self.devices.lock().unwrap(); + let response = GetStateResponse { + devices: devices.values().cloned().collect::<Vec<_>>(), + }; + let body = serde_json::to_string(&response).unwrap(); + Response::builder() + .status(HttpStatusCode::OK) + .body(body.into()) + .unwrap() + } +} + +impl pica::RangingEstimator for Context { + fn estimate( + &self, + left: &pica::Handle, + right: &pica::Handle, + ) -> anyhow::Result<pica::RangingMeasurement> { + let devices = self + .devices + .lock() + .map_err(|_| anyhow::anyhow!("cannot take lock"))?; + let left_pos = devices + .get(left) + .ok_or(anyhow::anyhow!("unknown position"))? + .position; + let right_pos = devices + .get(right) + .ok_or(anyhow::anyhow!("unknown position"))? + .position; + let (range, azimuth, elevation) = left_pos.compute_range_azimuth_elevation(&right_pos); + Ok(pica::RangingMeasurement { + range, + azimuth, + elevation, + }) + } +} + +#[derive(Deserialize)] +struct PositionBody { + x: i16, + y: i16, + z: i16, + yaw: i16, + pitch: i8, + roll: i16, +} + +macro_rules! position { + ($body: ident) => { + position!($body, false) + }; + ($body: ident, $mandatory: ident) => { + match serde_json::from_slice::<PositionBody>(&$body) { + Ok(body) => Position::new(body.x, body.y, body.z, body.yaw, body.pitch, body.roll), + Err(err) => { + if !$mandatory && err.classify() == SerdeErrorCategory::Eof { + Position::default() + } else { + let reason = format!("Error while deserializing position: {}", err); + log::error!("{}", reason); + return Ok(Response::builder().status(406).body(reason.into()).unwrap()); + } + } + } + }; +} + +macro_rules! mac_address { + ($mac_address: ident) => { + match MacAddress::new($mac_address.to_string()) { + Ok(mac_address) => mac_address, + Err(err) => { + let reason = format!("Error mac_address: {}", err); + log::error!("{}", reason); + return Ok(Response::builder().status(406).body(reason.into()).unwrap()); + } + } + }; +} + +impl Event { + fn name(&self) -> &'static str { + match self { + Event::DeviceAdded { .. } => "device-added", + Event::DeviceRemoved { .. } => "device-removed", + Event::DeviceUpdated { .. } => "device-updated", + Event::NeighborUpdated { .. } => "neighbor-updated", + } + } +} + +async fn http_request( + mut req: Request<Body>, + cmd_tx: mpsc::Sender<PicaCommand>, + context: Context, +) -> Result<Response<Body>, Infallible> { + let static_file = STATIC_FILES + .iter() + .find(|(path, _, _)| req.uri().path() == *path); + + if let Some((_, mime, content)) = static_file { + return Ok(Response::builder() + .header("content-type", *mime) + .body((*content).into()) + .unwrap()); + } + + let body = body::to_bytes(req.body_mut()).await.unwrap(); + let response = match req + .uri_mut() + .path() + .trim_start_matches('/') + .split('/') + .collect::<Vec<_>>()[..] + { + ["events"] => context.http_events(), + ["init-uci-device", mac_address] => { + context.http_set_position(mac_address!(mac_address), position!(body)) + } + ["set-position", mac_address] => { + context.http_set_position(mac_address!(mac_address), position!(body)) + } + ["create-anchor", mac_address] => { + context + .http_create_anchor(mac_address!(mac_address), position!(body), cmd_tx) + .await + } + ["destroy-anchor", mac_address] => { + context + .http_destroy_anchor(mac_address!(mac_address), cmd_tx) + .await + } + ["get-state"] => context.http_get_state(), + + _ => Response::builder() + .status(HttpStatusCode::NOT_FOUND) + .body("".into()) + .unwrap(), + }; + + Ok(response) +} + +async fn serve(context: Context, tx: mpsc::Sender<PicaCommand>, web_port: u16) -> Result<()> { + let addr = SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, web_port); + let make_svc = make_service_fn(move |_conn| { + let tx = tx.clone(); + let local_context = context.clone(); + async move { + Ok::<_, Infallible>(service_fn(move |req| { + http_request(req, tx.clone(), local_context.clone()) + })) + } + }); + + let server = Server::bind(&addr.into()).serve(make_svc); + + log::info!("Pica: Web server started on http://0.0.0.0:{}", web_port); + + server.await?; + Ok(()) +} + +async fn listen(tx: mpsc::Sender<PicaCommand>, uci_port: u16) -> Result<()> { + let uci_socket = SocketAddrV4::new(Ipv4Addr::LOCALHOST, uci_port); + let uci_listener = TcpListener::bind(uci_socket).await?; + log::info!("Pica: Listening on: {}", uci_port); + + loop { + let (socket, addr) = uci_listener.accept().await?; + log::info!("Uwb host addr: {}", addr); + + let (read_half, write_half) = socket.into_split(); + let stream = Box::pin(futures::stream::unfold(read_half, pica::packets::uci::read)); + let sink = Box::pin(futures::sink::unfold(write_half, pica::packets::uci::write)); + + tx.send(PicaCommand::Connect(stream, sink)) + .await + .map_err(|_| anyhow::anyhow!("pica command stream closed"))? + } +} + +#[derive(Parser, Debug)] +#[command(name = "pica", about = "Virtual UWB subsystem")] +struct Args { + /// Output directory for storing .pcapng traces. + /// If provided, .pcapng traces of client connections are automatically + /// saved under the name `device-{handle}.pcapng`. + #[arg(short, long, value_name = "DIR")] + pcapng_dir: Option<PathBuf>, + /// Configure the TCP port for the UCI server. + #[arg(short, long, value_name = "PORT", default_value_t = DEFAULT_UCI_PORT)] + uci_port: u16, + /// Configure the HTTP port for the web interface. + #[arg(short, long, value_name = "PORT", default_value_t = DEFAULT_WEB_PORT)] + web_port: u16, +} + +#[tokio::main] +async fn main() -> Result<()> { + let args = Args::parse(); + assert_ne!( + args.uci_port, args.web_port, + "UCI port and WEB port must be different." + ); + + let context = Context::new(); + + let pica = Pica::new(Box::new(context.clone()), args.pcapng_dir); + let cmd_tx = pica.commands(); + let events_rx = pica.events(); + + try_join!( + pica.run(), + listen(cmd_tx.clone(), args.uci_port), + serve(context.clone(), cmd_tx.clone(), args.web_port), + context.handle_connection_events(events_rx), + )?; + + Ok(()) +} diff --git a/src/position.rs b/src/bin/http-server/position.rs index 4ce8531..4ce8531 100644 --- a/src/position.rs +++ b/src/bin/http-server/position.rs diff --git a/src/bin/server/mod.rs b/src/bin/main.rs index 616ef5b..205550e 100644 --- a/src/bin/server/mod.rs +++ b/src/bin/main.rs @@ -12,16 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. -extern crate bytes; -extern crate num_derive; -extern crate num_traits; -extern crate thiserror; - -#[cfg(feature = "web")] -mod web; - use anyhow::Result; use clap::Parser; +use env_logger::Env; use pica::{Pica, PicaCommand}; use std::net::{Ipv4Addr, SocketAddrV4}; use std::path::PathBuf; @@ -30,17 +23,24 @@ use tokio::sync::mpsc; use tokio::try_join; const DEFAULT_UCI_PORT: u16 = 7000; -const DEFAULT_WEB_PORT: u16 = 3000; -async fn accept_incoming(tx: mpsc::Sender<PicaCommand>, uci_port: u16) -> Result<()> { +async fn accept_incoming(cmd_tx: mpsc::Sender<PicaCommand>, uci_port: u16) -> Result<()> { let uci_socket = SocketAddrV4::new(Ipv4Addr::LOCALHOST, uci_port); let uci_listener = TcpListener::bind(uci_socket).await?; - println!("Pica: Listening on: {}", uci_port); + log::info!("Pica: Listening on: {}", uci_port); loop { let (socket, addr) = uci_listener.accept().await?; - println!("Uwb host addr: {}", addr); - tx.send(PicaCommand::Connect(socket)).await? + log::info!("Uwb host addr: {}", addr); + + let (read_half, write_half) = socket.into_split(); + let stream = Box::pin(futures::stream::unfold(read_half, pica::packets::uci::read)); + let sink = Box::pin(futures::sink::unfold(write_half, pica::packets::uci::write)); + + cmd_tx + .send(PicaCommand::Connect(stream, sink)) + .await + .map_err(|_| anyhow::anyhow!("pica command stream closed"))? } } @@ -55,32 +55,33 @@ struct Args { /// Configure the TCP port for the UCI server. #[arg(short, long, value_name = "UCI_PORT", default_value_t = DEFAULT_UCI_PORT)] uci_port: u16, - /// Configure the HTTP port for the web interface. - #[arg(short, long, value_name = "WEB_PORT", default_value_t = DEFAULT_WEB_PORT)] - web_port: u16, +} + +struct MockRangingEstimator(); + +/// The position cannot be communicated to the pica environment when +/// using the default binary (HTTP interface not available). +/// Thus the ranging estimator cannot produce any result. +impl pica::RangingEstimator for MockRangingEstimator { + fn estimate( + &self, + _left: &pica::Handle, + _right: &pica::Handle, + ) -> Result<pica::RangingMeasurement> { + Err(anyhow::anyhow!("position not available")) + } } #[tokio::main] async fn main() -> Result<()> { - let args = Args::parse(); - assert_ne!( - args.uci_port, args.web_port, - "UCI port and Web port shall be different." - ); + env_logger::Builder::from_env(Env::default().default_filter_or("debug")).init(); - let mut pica = Pica::new(args.pcapng_dir); - let pica_tx = pica.tx(); - let pica_events = pica.events(); + let args = Args::parse(); - #[cfg(feature = "web")] - try_join!( - accept_incoming(pica_tx.clone(), args.uci_port), - pica.run(), - web::serve(pica_tx, pica_events, args.web_port) - )?; + let pica = Pica::new(Box::new(MockRangingEstimator()), args.pcapng_dir); + let commands = pica.commands(); - #[cfg(not(feature = "web"))] - try_join!(accept_incoming(pica_tx.clone(), args.uci_port), pica.run(),)?; + try_join!(accept_incoming(commands.clone(), args.uci_port), pica.run(),)?; Ok(()) } diff --git a/src/bin/server/web.rs b/src/bin/server/web.rs deleted file mode 100644 index 1481e54..0000000 --- a/src/bin/server/web.rs +++ /dev/null @@ -1,269 +0,0 @@ -// Copyright 2022 Google LLC -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::convert::Infallible; -use std::net::{Ipv4Addr, SocketAddrV4}; - -use anyhow::{Context, Result}; -use hyper::service::{make_service_fn, service_fn}; -use hyper::{body, Body, Request, Response, Server, StatusCode as HttpStatusCode}; -use serde::{Deserialize, Serialize}; -use serde_json::error::Category as SerdeErrorCategory; -use tokio::sync::{broadcast, mpsc, oneshot}; -use tokio_stream::{wrappers::BroadcastStream, StreamExt}; - -use pica::{ - Category, MacAddress, PicaCommand, PicaCommandError, PicaCommandStatus, PicaEvent, Position, -}; -use PicaEvent::{DeviceAdded, DeviceRemoved, DeviceUpdated, NeighborUpdated}; - -const STATIC_FILES: &[(&str, &str, &str)] = &[ - ("/", "text/html", include_str!("../../../static/index.html")), - ( - "/openapi", - "text/html", - include_str!("../../../static/openapi.html"), - ), - ( - "/openapi.yaml", - "text/yaml", - include_str!("../../../static/openapi.yaml"), - ), - ( - "/src/components/Map.js", - "application/javascript", - include_str!("../../../static/src/components/Map.js"), - ), - ( - "/src/components/DeviceInfo.js", - "application/javascript", - include_str!("../../../static/src/components/DeviceInfo.js"), - ), - ( - "/src/components/Orientation.js", - "application/javascript", - include_str!("../../../static/src/components/Orientation.js"), - ), -]; - -#[derive(Deserialize)] -struct PositionBody { - x: i16, - y: i16, - z: i16, - yaw: i16, - pitch: i8, - roll: i16, -} - -macro_rules! position { - ($body: ident) => { - position!($body, false) - }; - ($body: ident, $mandatory: ident) => { - match serde_json::from_slice::<PositionBody>(&$body) { - Ok(body) => Position::new(body.x, body.y, body.z, body.yaw, body.pitch, body.roll), - Err(err) => { - if !$mandatory && err.classify() == SerdeErrorCategory::Eof { - Position::default() - } else { - let reason = format!("Error while deserializing position: {}", err); - println!("{}", reason); - return Ok(Response::builder().status(406).body(reason.into()).unwrap()); - } - } - } - }; -} - -macro_rules! mac_address { - ($mac_address: ident) => { - match MacAddress::new($mac_address.to_string()) { - Ok(mac_address) => mac_address, - Err(err) => { - let reason = format!("Error mac_address: {}", err); - println!("{}", reason); - return Ok(Response::builder().status(406).body(reason.into()).unwrap()); - } - } - }; -} - -#[derive(Debug, Serialize, Clone)] -struct Device { - pub category: Category, - pub mac_address: String, - #[serde(flatten)] - pub position: Position, -} - -fn event_name(event: &PicaEvent) -> &'static str { - match event { - DeviceAdded { .. } => "device-added", - DeviceRemoved { .. } => "device-removed", - DeviceUpdated { .. } => "device-updated", - NeighborUpdated { .. } => "neighbor-updated", - } -} - -async fn handle( - mut req: Request<Body>, - tx: mpsc::Sender<PicaCommand>, - events: broadcast::Receiver<PicaEvent>, -) -> Result<Response<Body>, Infallible> { - let static_file = STATIC_FILES - .iter() - .find(|(path, _, _)| req.uri().path() == *path); - - if let Some((_, mime, content)) = static_file { - return Ok(Response::builder() - .header("content-type", *mime) - .body((*content).into()) - .unwrap()); - } - - let body = body::to_bytes(req.body_mut()).await.unwrap(); - let (pica_cmd_rsp_tx, pica_cmd_rsp_rx) = oneshot::channel::<PicaCommandStatus>(); - - let send_cmd = |pica_cmd| async { - println!("PicaCommand: {}", pica_cmd); - tx.send(pica_cmd).await.unwrap(); - let (status, description) = match pica_cmd_rsp_rx.await { - Ok(Ok(_)) => (HttpStatusCode::OK, "success".into()), - Ok(Err(err)) => ( - match err { - PicaCommandError::DeviceAlreadyExists(_) => HttpStatusCode::CONFLICT, - PicaCommandError::DeviceNotFound(_) => HttpStatusCode::NOT_FOUND, - }, - format!("{}", err), - ), - Err(err) => ( - HttpStatusCode::INTERNAL_SERVER_ERROR, - format!("Error getting command response: {}", err), - ), - }; - println!(" status: {}, {}", status, description); - Response::builder() - .status(status) - .body(description.into()) - .unwrap() - }; - - match req - .uri_mut() - .path() - .trim_start_matches('/') - .split('/') - .collect::<Vec<_>>()[..] - { - ["events"] => { - let stream = BroadcastStream::new(events).map(|result| { - result.map(|event| { - format!( - "event: {}\ndata: {}\n\n", - event_name(&event), - serde_json::to_string(&event).unwrap() - ) - }) - }); - return Ok(Response::builder() - .header("content-type", "text/event-stream") - .body(Body::wrap_stream(stream)) - .unwrap()); - } - ["init-uci-device", mac_address] => { - return Ok(send_cmd(PicaCommand::InitUciDevice( - mac_address!(mac_address), - position!(body), - pica_cmd_rsp_tx, - )) - .await); - } - ["set-position", mac_address] => { - return Ok(send_cmd(PicaCommand::SetPosition( - mac_address!(mac_address), - position!(body), - pica_cmd_rsp_tx, - )) - .await); - } - ["create-anchor", mac_address] => { - return Ok(send_cmd(PicaCommand::CreateAnchor( - mac_address!(mac_address), - position!(body), - pica_cmd_rsp_tx, - )) - .await); - } - ["destroy-anchor", mac_address] => { - return Ok(send_cmd(PicaCommand::DestroyAnchor( - mac_address!(mac_address), - pica_cmd_rsp_tx, - )) - .await); - } - ["get-state"] => { - #[derive(Serialize)] - struct GetStateResponse { - devices: Vec<Device>, - } - println!("PicaCommand: GetState"); - let (state_tx, state_rx) = oneshot::channel::<Vec<_>>(); - tx.send(PicaCommand::GetState(state_tx)).await.unwrap(); - let devices = match state_rx.await { - Ok(devices) => GetStateResponse { - devices: devices - .into_iter() - .map(|(category, mac_address, position)| Device { - category, - mac_address: mac_address.into(), - position, - }) - .collect(), - }, - Err(_) => GetStateResponse { devices: vec![] }, - }; - let body = serde_json::to_string(&devices).unwrap(); - return Ok(Response::builder().status(200).body(body.into()).unwrap()); - } - - _ => (), - } - - Ok(Response::builder().status(404).body("".into()).unwrap()) -} - -pub async fn serve( - tx: mpsc::Sender<PicaCommand>, - events: broadcast::Sender<PicaEvent>, - web_port: u16, -) -> Result<()> { - let addr = SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, web_port); - - let make_svc = make_service_fn(move |_conn| { - let tx = tx.clone(); - let events = events.clone(); - async move { - Ok::<_, Infallible>(service_fn(move |req| { - handle(req, tx.clone(), events.subscribe()) - })) - } - }); - - let server = Server::bind(&addr.into()).serve(make_svc); - - println!("Pica: Web server started on http://0.0.0.0:{}", web_port); - - server.await.context("Web Server Error") -} diff --git a/src/device.rs b/src/device.rs index e1cbfcb..48d2bd2 100644 --- a/src/device.rs +++ b/src/device.rs @@ -13,7 +13,6 @@ // limitations under the License. use crate::packets::uci::*; -use crate::position::Position; use crate::MacAddress; use crate::PicaCommand; @@ -25,8 +24,10 @@ use tokio::sync::mpsc; use tokio::time; use super::session::{Session, MAX_SESSION}; +use super::UciPacket; pub const MAX_DEVICE: usize = 4; + const UCI_VERSION: u16 = 0x0002; // Version 2.0 const MAC_VERSION: u16 = 0x3001; // Version 1.3.0 const PHY_VERSION: u16 = 0x3001; // Version 1.3.0 @@ -72,13 +73,12 @@ pub const DEFAULT_CAPS_INFO: &[(CapTlvType, &[u8])] = &[ ]; pub struct Device { - handle: usize, + pub handle: usize, pub mac_address: MacAddress, - pub position: Position, /// [UCI] 5. UWBS Device State Machine state: DeviceState, sessions: HashMap<u32, Session>, - pub tx: mpsc::Sender<ControlPacket>, + pub tx: mpsc::UnboundedSender<UciPacket>, pica_tx: mpsc::Sender<PicaCommand>, config: HashMap<DeviceConfigId, Vec<u8>>, country_code: [u8; 2], @@ -88,18 +88,14 @@ pub struct Device { impl Device { pub fn new( - device_handle: usize, - tx: mpsc::Sender<ControlPacket>, + handle: usize, + mac_address: MacAddress, + tx: mpsc::UnboundedSender<UciPacket>, pica_tx: mpsc::Sender<PicaCommand>, ) -> Self { - let mac_address = { - let handle = device_handle as u16; - MacAddress::Short(handle.to_be_bytes()) - }; Device { - handle: device_handle, + handle, mac_address, - position: Position::default(), state: DeviceState::DeviceStateError, // Will be overwitten sessions: Default::default(), tx, @@ -122,7 +118,6 @@ impl Device { tokio::spawn(async move { time::sleep(Duration::from_millis(5)).await; tx.send(DeviceStatusNtfBuilder { device_state }.build().into()) - .await .unwrap() }); } @@ -131,25 +126,69 @@ impl Device { self.set_state(DeviceState::DeviceStateReady); } - pub fn get_session(&self, session_id: u32) -> Option<&Session> { + pub fn session(&self, session_id: u32) -> Option<&Session> { self.sessions.get(&session_id) } - pub fn get_session_mut(&mut self, session_id: u32) -> Option<&mut Session> { + pub fn session_mut(&mut self, session_id: u32) -> Option<&mut Session> { self.sessions.get_mut(&session_id) } + pub fn can_start_ranging(&self, peer_session: &Session, session_id: u32) -> bool { + match self.session(session_id) { + Some(session) => { + session.session_state() == SessionState::SessionStateActive + && session + .app_config + .is_compatible_for_ranging(&peer_session.app_config) + } + None => false, + } + } + + pub fn can_start_data_transfer(&self, session_id: u32) -> bool { + match self.session(session_id) { + Some(session) => { + session.session_state() == SessionState::SessionStateActive + && session.session_type() == SessionType::FiraRangingAndInBandDataSession + && session.app_config.can_start_data_transfer() + } + None => false, + } + } + + pub fn can_receive_data_transfer(&self, session_id: u32) -> bool { + match self.session(session_id) { + Some(session) => { + session.session_state() == SessionState::SessionStateActive + && session.session_type() == SessionType::FiraRangingAndInBandDataSession + && session.app_config.can_receive_data_transfer() + } + None => false, + } + } + + // Send a response or notification to the Host. + fn send_control(&mut self, packet: impl Into<Vec<u8>>) { + let _ = self.tx.send(packet.into()); + } + // The fira norm specify to send a response, then reset, then // send a notification once the reset is done fn command_device_reset(&mut self, cmd: DeviceResetCmd) -> DeviceResetRsp { let reset_config = cmd.get_reset_config(); - println!("[{}] DeviceReset", self.handle); - println!(" reset_config={:?}", reset_config); + log::debug!("[{}] DeviceReset", self.handle); + log::debug!(" reset_config={:?}", reset_config); let status = match reset_config { ResetConfig::UwbsReset => StatusCode::UciStatusOk, }; - *self = Device::new(self.handle, self.tx.clone(), self.pica_tx.clone()); + *self = Device::new( + self.handle, + self.mac_address, + self.tx.clone(), + self.pica_tx.clone(), + ); self.init(); DeviceResetRspBuilder { status }.build() @@ -157,7 +196,7 @@ impl Device { fn command_get_device_info(&self, _cmd: GetDeviceInfoCmd) -> GetDeviceInfoRsp { // TODO: Implement a fancy build time state machine instead of crash at runtime - println!("[{}] GetDeviceInfo", self.handle); + log::debug!("[{}] GetDeviceInfo", self.handle); assert_eq!(self.state, DeviceState::DeviceStateReady); GetDeviceInfoRspBuilder { status: StatusCode::UciStatusOk, @@ -171,7 +210,7 @@ impl Device { } pub fn command_get_caps_info(&self, _cmd: GetCapsInfoCmd) -> GetCapsInfoRsp { - println!("[{}] GetCapsInfo", self.handle); + log::debug!("[{}] GetCapsInfo", self.handle); let caps = DEFAULT_CAPS_INFO .iter() @@ -189,7 +228,7 @@ impl Device { } pub fn command_set_config(&mut self, cmd: SetConfigCmd) -> SetConfigRsp { - println!("[{}] SetConfig", self.handle); + log::debug!("[{}] SetConfig", self.handle); assert_eq!(self.state, DeviceState::DeviceStateReady); // UCI 6.3 let (valid_parameters, invalid_config_status) = cmd.get_tlvs().iter().fold( @@ -217,7 +256,7 @@ impl Device { } pub fn command_get_config(&self, cmd: GetConfigCmd) -> GetConfigRsp { - println!("[{}] GetConfig", self.handle); + log::debug!("[{}] GetConfig", self.handle); // TODO: do this config shall be set on device reset let ids = cmd.get_cfg_id(); @@ -240,7 +279,7 @@ impl Device { v: Vec::new(), }), }, - Err(_) => println!("Failed to parse config id: {:?}", id), + Err(_) => log::error!("Failed to parse config id: {:?}", id), } (valid_parameters, invalid_parameters) @@ -264,9 +303,9 @@ impl Device { let session_id = cmd.get_session_id(); let session_type = cmd.get_session_type(); - println!("[{}] Session init", self.handle); - println!(" session_id=0x{:x}", session_id); - println!(" session_type={:?}", session_type); + log::debug!("[{}] Session init", self.handle); + log::debug!(" session_id=0x{:x}", session_id); + log::debug!(" session_type={:?}", session_type); let status = if self.sessions.len() >= MAX_SESSION { StatusCode::UciStatusMaxSessionsExceeded @@ -284,7 +323,7 @@ impl Device { Some(_) => StatusCode::UciStatusSessionDuplicate, None => { // Should not fail - self.get_session_mut(session_id).unwrap().init(); + self.session_mut(session_id).unwrap().init(); StatusCode::UciStatusOk } } @@ -295,8 +334,8 @@ impl Device { fn command_session_deinit(&mut self, cmd: SessionDeinitCmd) -> SessionDeinitRsp { let session_id = cmd.get_session_token(); - println!("[{}] Session deinit", self.handle); - println!(" session_id=0x{:x}", session_id); + log::debug!("[{}] Session deinit", self.handle); + log::debug!(" session_id=0x{:x}", session_id); let status = match self.sessions.get_mut(&session_id) { Some(session) => { @@ -315,7 +354,7 @@ impl Device { } fn command_session_get_count(&self, _cmd: SessionGetCountCmd) -> SessionGetCountRsp { - println!("[{}] Session get count", self.handle); + log::debug!("[{}] Session get count", self.handle); SessionGetCountRspBuilder { status: StatusCode::UciStatusOk, @@ -329,8 +368,8 @@ impl Device { cmd: AndroidSetCountryCodeCmd, ) -> AndroidSetCountryCodeRsp { let country_code = *cmd.get_country_code(); - println!("[{}] Set country code", self.handle); - println!(" country_code={},{}", country_code[0], country_code[1]); + log::debug!("[{}] Set country code", self.handle); + log::debug!(" country_code={},{}", country_code[0], country_code[1]); self.country_code = country_code; AndroidSetCountryCodeRspBuilder { @@ -343,7 +382,7 @@ impl Device { &mut self, _cmd: AndroidGetPowerStatsCmd, ) -> AndroidGetPowerStatsRsp { - println!("[{}] Get power stats", self.handle); + log::debug!("[{}] Get power stats", self.handle); // TODO AndroidGetPowerStatsRspBuilder { @@ -359,10 +398,11 @@ impl Device { } pub fn data_message_snd(&mut self, data: DataPacket) -> SessionControlNotification { + log::debug!("[{}] data_message_send", self.handle); match data.specialize() { DataPacketChild::DataMessageSnd(data_msg_snd) => { let session_token = data_msg_snd.get_session_handle(); - if let Some(session) = self.get_session_mut(session_token) { + if let Some(session) = self.session_mut(session_token) { session.data_message_snd(data_msg_snd) } else { DataTransferStatusNtfBuilder { @@ -393,7 +433,7 @@ impl Device { } } - pub fn command(&mut self, cmd: UciCommand) -> UciResponse { + fn receive_command(&mut self, cmd: UciCommand) -> UciResponse { match cmd.specialize() { // Handle commands for this device UciCommandChild::CoreCommand(core_command) => match core_command.specialize() { @@ -406,7 +446,6 @@ impl Device { }, // Handle commands for session management UciCommandChild::SessionConfigCommand(session_command) => { - // Session commands directly handled at Device level match session_command.specialize() { SessionConfigCommandChild::SessionInitCmd(cmd) => { return self.command_session_init(cmd).into(); @@ -435,7 +474,7 @@ impl Device { _ => panic!("Unsupported session command type"), }; - if let Some(session) = self.get_session_mut(session_id) { + if let Some(session) = self.session_mut(session_id) { // There is a session matching the session_id in the command // Pass the command through match session_command.specialize() { @@ -486,7 +525,7 @@ impl Device { } UciCommandChild::SessionControlCommand(ranging_command) => { let session_id = ranging_command.get_session_id(); - if let Some(session) = self.get_session_mut(session_id) { + if let Some(session) = self.session_mut(session_id) { // Forward to the proper session let response = session.ranging_command(ranging_command); match response.specialize() { @@ -577,4 +616,59 @@ impl Device { .build(), } } -}
\ No newline at end of file + + pub fn receive_packet(&mut self, packet: Vec<u8>) { + let mt = parse_message_type(packet[0]); + match mt { + MessageType::Data => match DataPacket::parse(&packet) { + Ok(packet) => { + let notification = self.data_message_snd(packet); + self.send_control(notification) + } + Err(err) => log::error!("failed to parse incoming Data packet: {}", err), + }, + MessageType::Command => { + match ControlPacket::parse(&packet) { + // Parsing error. Determine what error response should be + // returned to the host: + // - response and notifications are ignored, no response + // - if the group id is not known, STATUS_UNKNOWN_GID, + // - otherwise, and to simplify the code, STATUS_UNKNOWN_OID is + // always returned. That means that malformed commands + // get the same status code, instead of + // STATUS_SYNTAX_ERROR. + Err(_) => { + let group_id = packet[0] & 0xf; + let opcode_id = packet[1] & 0x3f; + + let status = if GroupId::try_from(group_id).is_ok() { + StatusCode::UciStatusUnknownOid + } else { + StatusCode::UciStatusUnknownGid + }; + // The PDL generated code cannot be used to generate + // responses with invalid group identifiers. + let response = vec![ + (u8::from(MessageType::Response) << 5) | group_id, + opcode_id, + 0, + 1, + status.into(), + ]; + self.send_control(response) + } + + // Parsing success, ignore non command packets. + Ok(cmd) => { + let response = self.receive_command(cmd.try_into().unwrap()); + self.send_control(response) + } + } + } + + // Message types for notifications and responses ignored + // by the controller. + _ => log::warn!("received unexpected packet of MT {:?}", mt), + } + } +} @@ -13,23 +13,17 @@ // limitations under the License. use anyhow::Result; -use bytes::Bytes; -use pdl_runtime::Packet; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::fmt::Display; use std::path::PathBuf; +use std::pin::Pin; use thiserror::Error; -use tokio::net::TcpStream; use tokio::sync::{broadcast, mpsc, oneshot}; +pub mod packets; mod pcapng; -mod position; -pub use position::Position; - -mod packets; - use packets::uci::StatusCode as UciStatusCode; use packets::uci::*; @@ -37,14 +31,54 @@ mod device; use device::{Device, MAX_DEVICE}; mod session; -use session::{AppConfig, MAX_SESSION}; +use session::MAX_SESSION; mod mac_address; pub use mac_address::MacAddress; use crate::session::RangeDataNtfConfig; -pub type PicaCommandStatus = Result<(), PicaCommandError>; +pub type UciPacket = Vec<u8>; +pub type UciStream = Pin<Box<dyn futures::stream::Stream<Item = Vec<u8>> + Send>>; +pub type UciSink = Pin<Box<dyn futures::sink::Sink<Vec<u8>, Error = anyhow::Error> + Send>>; + +/// Handle allocated for created devices or anchors. +/// The handle is unique across the lifetime of the Pica context +/// and callers may assume that one handle is never reused. +pub type Handle = usize; + +/// Ranging measurement produced by a ranging estimator. +#[derive(Clone, Copy, Default, Debug)] +pub struct RangingMeasurement { + pub range: u16, + pub azimuth: i16, + pub elevation: i8, +} + +/// Trait matching the capabilities of a ranging estimator. +/// The estimator manages the position of the devices, and chooses +/// the algorithm used to generate the ranging measurements. +pub trait RangingEstimator: Send + Sync { + /// Evaluate the ranging measurement for the two input devices + /// identified by their respective handle. The result is a triplet + /// containing the range, azimuth, and elevation of the right device + /// relative to the left device. + fn estimate(&self, left: &Handle, right: &Handle) -> Result<RangingMeasurement>; +} + +/// Pica emulation environment. +/// All the devices added to this environment are emulated as if they were +/// from the same physical space. +pub struct Pica { + counter: usize, + devices: HashMap<Handle, Device>, + anchors: HashMap<MacAddress, Anchor>, + command_rx: Option<mpsc::Receiver<PicaCommand>>, + command_tx: mpsc::Sender<PicaCommand>, + event_tx: broadcast::Sender<PicaEvent>, + ranging_estimator: Box<dyn RangingEstimator>, + pcapng_dir: Option<PathBuf>, +} #[derive(Error, Debug, Clone, PartialEq, Eq)] pub enum PicaCommandError { @@ -54,46 +88,39 @@ pub enum PicaCommandError { DeviceNotFound(MacAddress), } -#[derive(Debug)] pub enum PicaCommand { // Connect a new device. - Connect(TcpStream), + Connect(UciStream, UciSink), // Disconnect the selected device. Disconnect(usize), // Execute ranging command for selected device and session. Ranging(usize, u32), // Send an in-band request to stop ranging to a peer controlee identified by address and session id. StopRanging(MacAddress, u32), - // Execute data message send for selected device and data. - UciData(usize, DataPacket), - // Execute UCI command received for selected device. - UciCommand(usize, UciCommand), - // Init Uci Device - InitUciDevice(MacAddress, Position, oneshot::Sender<PicaCommandStatus>), - // Set Position - SetPosition(MacAddress, Position, oneshot::Sender<PicaCommandStatus>), + // UCI packet received for the selected device. + UciPacket(usize, Vec<u8>), // Create Anchor - CreateAnchor(MacAddress, Position, oneshot::Sender<PicaCommandStatus>), + CreateAnchor( + MacAddress, + oneshot::Sender<Result<Handle, PicaCommandError>>, + ), // Destroy Anchor - DestroyAnchor(MacAddress, oneshot::Sender<PicaCommandStatus>), - // Get State - GetState(oneshot::Sender<Vec<(Category, MacAddress, Position)>>), + DestroyAnchor( + MacAddress, + oneshot::Sender<Result<Handle, PicaCommandError>>, + ), } impl Display for PicaCommand { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { let cmd = match self { - PicaCommand::Connect(_) => "Connect", + PicaCommand::Connect(_, _) => "Connect", PicaCommand::Disconnect(_) => "Disconnect", PicaCommand::Ranging(_, _) => "Ranging", PicaCommand::StopRanging(_, _) => "StopRanging", - PicaCommand::UciData(_, _) => "UciData", - PicaCommand::UciCommand(_, _) => "UciCommand", - PicaCommand::InitUciDevice(_, _, _) => "InitUciDevice", - PicaCommand::SetPosition(_, _, _) => "SetPosition", - PicaCommand::CreateAnchor(_, _, _) => "CreateAnchor", + PicaCommand::UciPacket(_, _) => "UciPacket", + PicaCommand::CreateAnchor(_, _) => "CreateAnchor", PicaCommand::DestroyAnchor(_, _) => "DestroyAnchor", - PicaCommand::GetState(_) => "GetState", }; write!(f, "{}", cmd) } @@ -102,33 +129,15 @@ impl Display for PicaCommand { #[derive(Clone, Debug, Serialize)] #[serde(untagged)] pub enum PicaEvent { - // A Device was added - DeviceAdded { - category: Category, - mac_address: MacAddress, - #[serde(flatten)] - position: Position, - }, - // A Device was removed - DeviceRemoved { - category: Category, + // A UCI connection was added + Connected { + handle: Handle, mac_address: MacAddress, }, - // A Device position has changed - DeviceUpdated { - category: Category, + // A UCI connection was lost + Disconnected { + handle: Handle, mac_address: MacAddress, - #[serde(flatten)] - position: Position, - }, - NeighborUpdated { - source_category: Category, - source_mac_address: MacAddress, - destination_category: Category, - destination_mac_address: MacAddress, - distance: u16, - azimuth: i16, - elevation: i8, }, } @@ -140,99 +149,29 @@ pub enum Category { #[derive(Debug, Clone, Copy)] struct Anchor { + handle: Handle, + #[allow(unused)] mac_address: MacAddress, - position: Position, -} - -pub struct Pica { - devices: HashMap<usize, Device>, - anchors: HashMap<MacAddress, Anchor>, - counter: usize, - rx: mpsc::Receiver<PicaCommand>, - tx: mpsc::Sender<PicaCommand>, - event_tx: broadcast::Sender<PicaEvent>, - pcapng_dir: Option<PathBuf>, -} - -/// Result of UCI packet parsing. -enum UciParseResult { - UciCommand(UciCommand), - UciData(DataPacket), - Err(Bytes), - Skip, -} - -/// Parse incoming UCI packets. -/// Handle parsing errors by crafting a suitable error response packet. -fn parse_uci_packet(bytes: &[u8]) -> UciParseResult { - let message_type = parse_message_type(bytes[0]); - match message_type { - MessageType::Data => match DataPacket::parse(bytes) { - Ok(packet) => UciParseResult::UciData(packet), - Err(_) => UciParseResult::Skip, - }, - _ => { - match ControlPacket::parse(bytes) { - // Parsing error. Determine what error response should be - // returned to the host: - // - response and notifications are ignored, no response - // - if the group id is not known, STATUS_UNKNOWN_GID, - // - otherwise, and to simplify the code, STATUS_UNKNOWN_OID is - // always returned. That means that malformed commands - // get the same status code, instead of - // STATUS_SYNTAX_ERROR. - Err(_) => { - let group_id = bytes[0] & 0xf; - let opcode_id = bytes[1] & 0x3f; - - let status = match (message_type, GroupId::try_from(group_id)) { - (MessageType::Command, Ok(_)) => UciStatusCode::UciStatusUnknownOid, - (MessageType::Command, Err(_)) => UciStatusCode::UciStatusUnknownGid, - _ => return UciParseResult::Skip, - }; - // The PDL generated code cannot be used to generate - // responses with invalid group identifiers. - let response = vec![ - (u8::from(MessageType::Response) << 5) | group_id, - opcode_id, - 0, - 1, - status.into(), - ]; - UciParseResult::Err(response.into()) - } - - // Parsing success, ignore non command packets. - Ok(packet) => { - if let Ok(cmd) = packet.try_into() { - UciParseResult::UciCommand(cmd) - } else { - UciParseResult::Skip - } - } - } - } - } } fn make_measurement( mac_address: &MacAddress, - local: (u16, i16, i8), - remote: (u16, i16, i8), + local: RangingMeasurement, + remote: RangingMeasurement, ) -> ShortAddressTwoWayRangingMeasurement { if let MacAddress::Short(address) = mac_address { ShortAddressTwoWayRangingMeasurement { mac_address: u16::from_le_bytes(*address), status: UciStatusCode::UciStatusOk, nlos: 0, // in Line Of Sight - distance: local.0, - aoa_azimuth: local.1 as u16, + distance: local.range, + aoa_azimuth: local.azimuth as u16, aoa_azimuth_fom: 100, // Yup, pretty sure about this - aoa_elevation: local.2 as u16, + aoa_elevation: local.elevation as u16, aoa_elevation_fom: 100, // Yup, pretty sure about this - aoa_destination_azimuth: remote.1 as u16, + aoa_destination_azimuth: remote.azimuth as u16, aoa_destination_azimuth_fom: 100, - aoa_destination_elevation: remote.2 as u16, + aoa_destination_elevation: remote.elevation as u16, aoa_destination_elevation_fom: 100, slot_index: 0, rssi: u8::MAX, @@ -243,26 +182,27 @@ fn make_measurement( } impl Pica { - pub fn new(pcapng_dir: Option<PathBuf>) -> Self { - let (tx, rx) = mpsc::channel(MAX_SESSION * MAX_DEVICE); + pub fn new(ranging_estimator: Box<dyn RangingEstimator>, pcapng_dir: Option<PathBuf>) -> Self { + let (command_tx, command_rx) = mpsc::channel(MAX_SESSION * MAX_DEVICE); let (event_tx, _) = broadcast::channel(16); Pica { devices: HashMap::new(), anchors: HashMap::new(), counter: 0, - rx, - tx, + command_rx: Some(command_rx), + command_tx, event_tx, + ranging_estimator, pcapng_dir, } } - pub fn events(&self) -> broadcast::Sender<PicaEvent> { - self.event_tx.clone() + pub fn events(&self) -> broadcast::Receiver<PicaEvent> { + self.event_tx.subscribe() } - pub fn tx(&self) -> mpsc::Sender<PicaCommand> { - self.tx.clone() + pub fn commands(&self) -> mpsc::Sender<PicaCommand> { + self.command_tx.clone() } fn get_device_mut(&mut self, device_handle: usize) -> Option<&mut Device> { @@ -287,177 +227,254 @@ impl Pica { } } - fn get_device_mut_by_mac(&mut self, mac_address: MacAddress) -> Option<&mut Device> { - self.devices - .values_mut() - .find(|d| d.mac_address == mac_address) + fn send_event(&self, event: PicaEvent) { + // An error here means that we have + // no receivers, so ignore it + let _ = self.event_tx.send(event); } - fn get_device_by_mac( - &self, - mac_address: &MacAddress, - local_app_config: &AppConfig, - session_id: u32, - ) -> Option<&Device> { - self.devices.values().find(|device| { - if let Some(session) = device.get_session(session_id) { - session.app_config.device_mac_address == *mac_address - && local_app_config.can_start_ranging_with_peer(&session.app_config) - && session.session_state() == SessionState::SessionStateActive - } else { - false - } - }) - } + /// Handle an incoming stream of UCI packets. + /// Reassemble control packets when fragmented, data packets are unmodified. + async fn read_routine( + mut uci_stream: impl futures::stream::Stream<Item = Vec<u8>> + Unpin, + cmd_tx: mpsc::Sender<PicaCommand>, + handle: Handle, + pcapng_file: Option<&pcapng::File>, + ) -> anyhow::Result<()> { + use futures::stream::StreamExt; - fn get_device_mut_by_mac_and_session_id( - &mut self, - mac_address: &MacAddress, - session_id: u32, - ) -> Option<&mut Device> { - self.devices.values_mut().find(|device| { - if let Some(session) = device.get_session(session_id) { - session.app_config.device_mac_address == *mac_address - && session.session_state() == SessionState::SessionStateActive - } else { - false + loop { + let mut complete_packet: Option<Vec<u8>> = None; + loop { + let packet = uci_stream + .next() + .await + .ok_or(anyhow::anyhow!("input packet stream closed"))?; + let header = + packets::uci::CommonPacketHeader::parse(&packet[0..COMMON_HEADER_SIZE])?; + + if let Some(file) = pcapng_file { + file.write(&packet, pcapng::Direction::Tx)?; + } + + match &mut complete_packet { + Some(complete_packet) => { + complete_packet.extend_from_slice(&packet[HEADER_SIZE..]) + } + None => complete_packet = Some(packet), + } + + if header.get_pbf() == packets::uci::PacketBoundaryFlag::Complete + || header.get_mt() == packets::uci::MessageType::Data + { + break; + } } - }) + + cmd_tx + .send(PicaCommand::UciPacket(handle, complete_packet.unwrap())) + .await + .unwrap() + } } - fn send_event(&self, event: PicaEvent) { - // An error here means that we have - // no receivers, so ignore it - let _ = self.event_tx.send(event); + /// Segment a stream of UCI packets. + async fn write_routine( + mut uci_sink: impl futures::sink::Sink<Vec<u8>> + Unpin, + mut packet_rx: mpsc::UnboundedReceiver<UciPacket>, + _handle: Handle, + pcapng_file: Option<&pcapng::File>, + ) -> anyhow::Result<()> { + use futures::sink::SinkExt; + + loop { + let complete_packet = packet_rx + .recv() + .await + .ok_or(anyhow::anyhow!("output packet stream closed"))?; + let mut offset = HEADER_SIZE; + let mt = parse_message_type(complete_packet[0]); + + while offset < complete_packet.len() { + let remaining_length = complete_packet.len() - offset; + let fragment_length = std::cmp::min( + remaining_length, + if mt == MessageType::Data { + MAX_DATA_PACKET_PAYLOAD_SIZE + } else { + MAX_CTRL_PACKET_PAYLOAD_SIZE + }, + ); + let pbf = if fragment_length == remaining_length { + PacketBoundaryFlag::Complete + } else { + PacketBoundaryFlag::NotComplete + }; + + let mut packet = Vec::with_capacity(HEADER_SIZE + fragment_length); + + packet.extend_from_slice(&complete_packet[0..HEADER_SIZE]); + const PBF_MASK: u8 = 0x10; + packet[0] &= !PBF_MASK; + packet[0] |= (pbf as u8) << 4; + + match mt { + MessageType::Data => { + packet[2..4].copy_from_slice(&(fragment_length as u16).to_le_bytes()) + } + _ => packet[3] = fragment_length as u8, + } + + packet.extend_from_slice(&complete_packet[offset..offset + fragment_length]); + + if let Some(file) = pcapng_file { + file.write(&packet, pcapng::Direction::Rx)?; + } + + uci_sink + .send(packet) + .await + .map_err(|_| anyhow::anyhow!("output packet sink closed"))?; + + offset += fragment_length; + } + } } - async fn connect(&mut self, stream: TcpStream) { - let (packet_tx, mut packet_rx) = mpsc::channel(MAX_SESSION); - let device_handle = self.counter; - let pica_tx = self.tx.clone(); + pub fn add_device(&mut self, stream: UciStream, sink: UciSink) -> Result<Handle> { + let (packet_tx, packet_rx) = mpsc::unbounded_channel(); + let pica_tx = self.command_tx.clone(); + let disconnect_tx = self.command_tx.clone(); let pcapng_dir = self.pcapng_dir.clone(); - println!("[{}] Connecting device", device_handle); - + let handle = self.counter; self.counter += 1; - let mut device = Device::new(device_handle, packet_tx, self.tx.clone()); + + log::debug!("[{}] Connecting device", handle); + + let mac_address = MacAddress::Short((handle as u16).to_be_bytes()); + let mut device = Device::new(handle, mac_address, packet_tx, self.command_tx.clone()); device.init(); - self.send_event(PicaEvent::DeviceAdded { - category: Category::Uci, + self.send_event(PicaEvent::Connected { + handle, mac_address: device.mac_address, - position: device.position, }); - self.devices.insert(device_handle, device); + self.devices.insert(handle, device); // Spawn and detach the connection handling task. // The task notifies pica when exiting to let it clean // the state. tokio::task::spawn(async move { - let mut pcapng_file = if let Some(dir) = pcapng_dir { - let full_path = dir.join(format!("device-{}.pcapng", device_handle)); - println!("Recording pcapng to file {}", full_path.as_path().display()); - Some(pcapng::File::create(full_path).await.unwrap()) + let pcapng_file = if let Some(dir) = pcapng_dir { + let full_path = dir.join(format!("device-{}.pcapng", handle)); + log::debug!("Recording pcapng to file {}", full_path.as_path().display()); + Some(pcapng::File::create(full_path).unwrap()) } else { None }; - let (uci_rx, uci_tx) = stream.into_split(); - let mut uci_reader = packets::uci::Reader::new(uci_rx); - let mut uci_writer = packets::uci::Writer::new(uci_tx); - - 'outer: loop { - tokio::select! { - // Read command packet sent from connected UWB host. - // Run associated command. - result = uci_reader.read(&mut pcapng_file) => - match result { - Ok(packet) => - match parse_uci_packet(&packet) { - UciParseResult::UciCommand(cmd) => { - pica_tx.send(PicaCommand::UciCommand(device_handle, cmd)).await.unwrap() - }, - UciParseResult::UciData(data) => { - pica_tx.send(PicaCommand::UciData(device_handle, data)).await.unwrap() - }, - UciParseResult::Err(response) => - uci_writer.write(&response, &mut pcapng_file).await.unwrap(), - UciParseResult::Skip => (), - }, - Err(_) => break 'outer - }, - - // Send response packets to the connected UWB host. - Some(packet) = packet_rx.recv() => - if uci_writer.write(&packet.to_bytes(), &mut pcapng_file).await.is_err() { - break 'outer - } - } - } - pica_tx - .send(PicaCommand::Disconnect(device_handle)) + let _ = tokio::try_join!( + async { Self::read_routine(stream, pica_tx, handle, pcapng_file.as_ref()).await }, + async { Self::write_routine(sink, packet_rx, handle, pcapng_file.as_ref()).await } + ); + + disconnect_tx + .send(PicaCommand::Disconnect(handle)) .await .unwrap() }); + + Ok(handle) } fn disconnect(&mut self, device_handle: usize) { - println!("[{}] Disconnecting device", device_handle); + log::debug!("[{}] Disconnecting device", device_handle); - match self - .devices - .get(&device_handle) - .ok_or_else(|| PicaCommandError::DeviceNotFound(device_handle.into())) - { - Ok(device) => { - self.send_event(PicaEvent::DeviceRemoved { - category: Category::Uci, - mac_address: device.mac_address, - }); - self.devices.remove(&device_handle); - } - Err(err) => println!("{}", err), + if let Some(device) = self.devices.get(&device_handle) { + self.send_event(PicaEvent::Disconnected { + handle: device_handle, + mac_address: device.mac_address, + }); + self.devices.remove(&device_handle); } } - async fn ranging(&mut self, device_handle: usize, session_id: u32) { - println!("[{}] Ranging event", device_handle); - println!(" session_id={}", session_id); + fn ranging(&mut self, device_handle: usize, session_id: u32) { + log::debug!("[{}] Ranging event", device_handle); + log::debug!(" session_id={}", session_id); let device = self.get_device(device_handle).unwrap(); - let session = device.get_session(session_id).unwrap(); + let session = device.session(session_id).unwrap(); + let mut data_transfer = Vec::new(); let mut measurements = Vec::new(); - session - .get_dst_mac_addresses() - .iter() - .for_each(|mac_address| { - if let Some(anchor) = self.anchors.get(mac_address) { - let local = device - .position - .compute_range_azimuth_elevation(&anchor.position); - let remote = anchor - .position - .compute_range_azimuth_elevation(&device.position); - - assert!(local.0 == remote.0); - measurements.push(make_measurement(mac_address, local, remote)); - } - if let Some(peer_device) = - self.get_device_by_mac(mac_address, &session.app_config, session_id) - { - let local: (u16, i16, i8) = device - .position - .compute_range_azimuth_elevation(&peer_device.position); - let remote = peer_device - .position - .compute_range_azimuth_elevation(&device.position); - - assert!(local.0 == remote.0); - measurements.push(make_measurement(mac_address, local, remote)); - } - }); + + // Look for compatible anchors. + for mac_address in session.get_dst_mac_addresses() { + if let Some(other) = self.anchors.get(mac_address) { + let local = self + .ranging_estimator + .estimate(&device.handle, &other.handle) + .unwrap_or(Default::default()); + let remote = self + .ranging_estimator + .estimate(&other.handle, &device.handle) + .unwrap_or(Default::default()); + measurements.push(make_measurement(mac_address, local, remote)); + } + } + + // Look for compatible ranging sessions in other devices. + for peer_device in self.devices.values() { + if peer_device.handle == device_handle { + continue; + } + + if peer_device.can_start_ranging(session, session_id) { + let peer_mac_address = peer_device + .session(session_id) + .unwrap() + .app_config + .device_mac_address; + let local = self + .ranging_estimator + .estimate(&device.handle, &peer_device.handle) + .unwrap_or(Default::default()); + let remote = self + .ranging_estimator + .estimate(&peer_device.handle, &device.handle) + .unwrap_or(Default::default()); + measurements.push(make_measurement(&peer_mac_address, local, remote)); + } + + if device.can_start_data_transfer(session_id) + && peer_device.can_receive_data_transfer(session_id) + { + data_transfer.push(peer_device); + } + } + + // TODO: Data transfer should be limited in size for + // each round of ranging + for peer_device in data_transfer.iter() { + peer_device + .tx + .send( + DataMessageRcvBuilder { + application_data: session.data().clone().into(), + data_sequence_number: 0x01, + pbf: PacketBoundaryFlag::Complete, + session_handle: session_id, + source_address: device.mac_address.into(), + status: UciStatusCode::UciStatusOk, + } + .build() + .into(), + ) + .unwrap(); + } if session.is_ranging_data_ntf_enabled() != RangeDataNtfConfig::Disable { device .tx @@ -474,275 +491,155 @@ impl Pica { .build() .into(), ) - .await .unwrap(); let device = self.get_device_mut(device_handle).unwrap(); - let session = device.get_session_mut(session_id).unwrap(); + let session = device.session_mut(session_id).unwrap(); session.sequence_number += 1; } + + // TODO: Clean the data only when all the data is transfered + let device = self.get_device_mut(device_handle).unwrap(); + let session = device.session_mut(session_id).unwrap(); + + session.clear_data(); } - async fn uci_data(&mut self, device_handle: usize, data: DataPacket) { - match self - .get_device_mut(device_handle) - .ok_or_else(|| PicaCommandError::DeviceNotFound(device_handle.into())) - { - Ok(device) => { - let response: SessionControlNotification = device.data_message_snd(data); - device.tx.send(response.into()).await.unwrap_or_else(|err| { - println!("Failed to send UCI data packet response: {}", err) - }); - } - Err(err) => println!("{}", err), + fn uci_packet(&mut self, device_handle: usize, packet: Vec<u8>) { + match self.get_device_mut(device_handle) { + Some(device) => device.receive_packet(packet), + None => log::error!("Device {} not found", device_handle), } } - async fn command(&mut self, device_handle: usize, cmd: UciCommand) { - match self - .get_device_mut(device_handle) - .ok_or_else(|| PicaCommandError::DeviceNotFound(device_handle.into())) - { - Ok(device) => { - let response: ControlPacket = device.command(cmd).into(); - device - .tx - .send(response) - .await - .unwrap_or_else(|err| println!("Failed to send UCI command response: {}", err)); + + fn pica_command(&mut self, command: PicaCommand) { + use PicaCommand::*; + match command { + Connect(stream, sink) => { + let _ = self.add_device(stream, sink); + } + Disconnect(device_handle) => self.disconnect(device_handle), + Ranging(device_handle, session_id) => self.ranging(device_handle, session_id), + StopRanging(mac_address, session_id) => { + self.stop_controlee_ranging(&mac_address, session_id) + } + UciPacket(device_handle, packet) => self.uci_packet(device_handle, packet), + CreateAnchor(mac_address, pica_cmd_rsp_tx) => { + self.create_anchor(mac_address, pica_cmd_rsp_tx) + } + DestroyAnchor(mac_address, pica_cmd_rsp_tx) => { + self.destroy_anchor(mac_address, pica_cmd_rsp_tx) } - Err(err) => println!("{}", err), } } - pub async fn run(&mut self) -> Result<()> { + /// Run the internal pica event loop. + pub async fn run(mut self) -> Result<()> { + let Some(mut command_rx) = self.command_rx.take() else { + anyhow::bail!("missing pica command receiver") + }; loop { - use PicaCommand::*; - match self.rx.recv().await { - Some(Connect(stream)) => { - self.connect(stream).await; - } - Some(Disconnect(device_handle)) => self.disconnect(device_handle), - Some(Ranging(device_handle, session_id)) => { - self.ranging(device_handle, session_id).await; - } - Some(StopRanging(mac_address, session_id)) => { - self.stop_controlee_ranging(&mac_address, session_id).await; - } - Some(UciData(device_handle, data)) => self.uci_data(device_handle, data).await, - Some(UciCommand(device_handle, cmd)) => self.command(device_handle, cmd).await, - Some(SetPosition(mac_address, position, pica_cmd_rsp_tx)) => { - self.set_position(mac_address, position, pica_cmd_rsp_tx) - } - Some(CreateAnchor(mac_address, position, pica_cmd_rsp_tx)) => { - self.create_anchor(mac_address, position, pica_cmd_rsp_tx) - } - Some(DestroyAnchor(mac_address, pica_cmd_rsp_tx)) => { - self.destroy_anchor(mac_address, pica_cmd_rsp_tx) - } - Some(GetState(state_tx)) => self.get_state(state_tx), - Some(InitUciDevice(mac_address, position, pica_cmd_rsp_tx)) => { - self.init_uci_device(mac_address, position, pica_cmd_rsp_tx); - } - None => (), - }; + if let Some(command) = command_rx.recv().await { + self.pica_command(command) + } } } // Handle the in-band StopRanging command sent from controller to the controlee with // corresponding mac_address and session_id. - async fn stop_controlee_ranging(&mut self, mac_address: &MacAddress, session_id: u32) { - if let Some(device) = self.get_device_mut_by_mac_and_session_id(mac_address, session_id) { - // If such device with target session is found, stop the ranging session. - let session = device.get_session_mut(session_id).unwrap(); - session.stop_ranging_task(); - session.set_state( - SessionState::SessionStateIdle, - ReasonCode::SessionStoppedDueToInbandSignal, - ); - device.n_active_sessions -= 1; - if device.n_active_sessions == 0 { - device.set_state(DeviceState::DeviceStateReady); - } - } - } - - // TODO: Assign a reserved range of mac addresses for UCI devices - // to protect against conflicts with user defined Anchor addresses - // b/246000641 - fn init_uci_device( - &mut self, - mac_address: MacAddress, - position: Position, - pica_cmd_rsp_tx: oneshot::Sender<PicaCommandStatus>, - ) { - println!("[_] Init device"); - println!(" mac_address: {}", mac_address); - println!(" position={:?}", position); - - let status = self - .get_device_mut_by_mac(mac_address) - .ok_or(PicaCommandError::DeviceNotFound(mac_address)) - .map(|uci_device| { - uci_device.mac_address = mac_address; - uci_device.position = position; - }); - - pica_cmd_rsp_tx.send(status).unwrap_or_else(|err| { - println!("Failed to send init-uci-device command response: {:?}", err) - }); - } - - fn set_position( - &mut self, - mac_address: MacAddress, - position: Position, - pica_cmd_rsp_tx: oneshot::Sender<PicaCommandStatus>, - ) { - let mut status = if let Some(uci_device) = self.get_device_mut_by_mac(mac_address) { - uci_device.position = position; - Ok(()) - } else if let Some(anchor) = self.anchors.get_mut(&mac_address) { - anchor.position = position; - Ok(()) - } else { - Err(PicaCommandError::DeviceNotFound(mac_address)) - }; - - if status.is_ok() { - status = self.update_position(mac_address, position) - } - - pica_cmd_rsp_tx.send(status).unwrap_or_else(|err| { - println!("Failed to send set-position command response: {:?}", err) - }); - } + fn stop_controlee_ranging(&mut self, mac_address: &MacAddress, session_id: u32) { + for device in self.devices.values_mut() { + let Some(session) = device.session_mut(session_id) else { + continue; + }; - fn update_position( - &self, - mac_address: MacAddress, - position: Position, - ) -> Result<(), PicaCommandError> { - let category = match self.get_category(&mac_address) { - Some(category) => category, - None => { - return Err(PicaCommandError::DeviceNotFound(mac_address)); + if &session.app_config.device_mac_address != mac_address { + continue; } - }; - self.send_event(PicaEvent::DeviceUpdated { - category, - mac_address, - position, - }); - let devices = self.devices.values().map(|d| (d.mac_address, d.position)); - let anchors = self.anchors.values().map(|b| (b.mac_address, b.position)); - - let update_neighbors = |device_category, device_mac_address, device_position| { - if mac_address != device_mac_address { - let local = position.compute_range_azimuth_elevation(&device_position); - let remote = device_position.compute_range_azimuth_elevation(&position); - - assert!(local.0 == remote.0); - - self.send_event(PicaEvent::NeighborUpdated { - source_category: category, - source_mac_address: mac_address, - destination_category: device_category, - destination_mac_address: device_mac_address, - distance: local.0, - azimuth: local.1, - elevation: local.2, - }); - - self.send_event(PicaEvent::NeighborUpdated { - source_category: device_category, - source_mac_address: device_mac_address, - destination_category: category, - destination_mac_address: mac_address, - distance: remote.0, - azimuth: remote.1, - elevation: remote.2, - }); + if session.session_state() == SessionState::SessionStateActive { + session.stop_ranging_task(); + session.set_state( + SessionState::SessionStateIdle, + ReasonCode::SessionStoppedDueToInbandSignal, + ); + device.n_active_sessions = device.n_active_sessions.saturating_sub(1); + if device.n_active_sessions == 0 { + device.set_state(DeviceState::DeviceStateReady); + } + } else { + log::warn!("stop_controlee_ranging: session is not active !"); } - }; - - devices.for_each(|device| update_neighbors(Category::Uci, device.0, device.1)); - anchors.for_each(|anchor| update_neighbors(Category::Anchor, anchor.0, anchor.1)); - Ok(()) + } } #[allow(clippy::map_entry)] fn create_anchor( &mut self, mac_address: MacAddress, - position: Position, - pica_cmd_rsp_tx: oneshot::Sender<PicaCommandStatus>, + rsp_tx: oneshot::Sender<Result<Handle, PicaCommandError>>, ) { - println!("Create anchor: {} {}", mac_address, position); + log::debug!("[_] Create anchor"); + log::debug!(" mac_address: {}", mac_address); + let status = if self.get_category(&mac_address).is_some() { Err(PicaCommandError::DeviceAlreadyExists(mac_address)) } else { - self.send_event(PicaEvent::DeviceAdded { - category: Category::Anchor, - mac_address, - position, - }); + let handle = self.counter; + self.counter += 1; + assert!(self .anchors .insert( mac_address, Anchor { + handle, mac_address, - position, }, ) .is_none()); - Ok(()) + + Ok(handle) }; - pica_cmd_rsp_tx.send(status).unwrap_or_else(|err| { - println!("Failed to send create-anchor command response: {:?}", err) + rsp_tx.send(status).unwrap_or_else(|err| { + log::error!("Failed to send create-anchor command response: {:?}", err) }) } fn destroy_anchor( &mut self, mac_address: MacAddress, - pica_cmd_rsp_tx: oneshot::Sender<PicaCommandStatus>, + rsp_tx: oneshot::Sender<Result<Handle, PicaCommandError>>, ) { - println!("[_] Destroy anchor"); - println!(" mac_address: {}", mac_address); + log::debug!("[_] Destroy anchor"); + log::debug!(" mac_address: {}", mac_address); - let status = if self.anchors.remove(&mac_address).is_none() { - Err(PicaCommandError::DeviceNotFound(mac_address)) - } else { - self.send_event(PicaEvent::DeviceRemoved { - category: Category::Anchor, - mac_address, - }); - Ok(()) + let status = match self.anchors.remove(&mac_address) { + None => Err(PicaCommandError::DeviceNotFound(mac_address)), + Some(anchor) => Ok(anchor.handle), }; - pica_cmd_rsp_tx.send(status).unwrap_or_else(|err| { - println!("Failed to send destroy-anchor command response: {:?}", err) + + rsp_tx.send(status).unwrap_or_else(|err| { + log::error!("Failed to send destroy-anchor command response: {:?}", err) }) } +} - fn get_state(&self, state_tx: oneshot::Sender<Vec<(Category, MacAddress, Position)>>) { - println!("[_] Get State"); - - state_tx - .send( - self.anchors - .values() - .map(|anchor| (Category::Anchor, anchor.mac_address, anchor.position)) - .chain( - self.devices - .values() - .map(|device| (Category::Uci, device.mac_address, device.position)), - ) - .collect(), - ) - .unwrap(); +/// Run the internal pica event loop. +/// As opposed to Pica::run, the context is passed under a mutex, which +/// allows synchronous access to the context for device creation. +pub async fn run(this: &std::sync::Mutex<Pica>) -> Result<()> { + // Extract the mpsc receiver from the Pica context. + // The receiver cannot be cloned. + let Some(mut command_rx) = this.lock().unwrap().command_rx.take() else { + anyhow::bail!("missing pica command receiver"); + }; + + loop { + if let Some(command) = command_rx.recv().await { + this.lock().unwrap().pica_command(command) + } } }
\ No newline at end of file diff --git a/src/mac_address.rs b/src/mac_address.rs index 320631a..79cfe94 100644 --- a/src/mac_address.rs +++ b/src/mac_address.rs @@ -19,21 +19,22 @@ use serde::{Deserialize, Serialize}; use thiserror::Error; const SHORT_MAC_ADDRESS_SIZE: usize = 2; -const STRING_SHORT_MAC_ADDRESS_SIZE: usize = 2 * SHORT_MAC_ADDRESS_SIZE; +const SHORT_MAC_ADDRESS_STR_SIZE: usize = 2 * SHORT_MAC_ADDRESS_SIZE; -const EXTEND_MAC_ADDRESS_SIZE: usize = 8; -const STRING_EXTEND_MAC_ADDRESS_SIZE: usize = 2 * EXTEND_MAC_ADDRESS_SIZE; +const EXTENDED_MAC_ADDRESS_SIZE: usize = 8; +const EXTENDED_MAC_ADDRESS_STR_SIZE: usize = 2 * EXTENDED_MAC_ADDRESS_SIZE; #[derive(Error, Debug)] pub enum Error { #[error("MacAddress has the wrong format: 0")] MacAddressWrongFormat(String), } + #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] #[serde(try_from = "String", into = "String")] pub enum MacAddress { Short([u8; SHORT_MAC_ADDRESS_SIZE]), - Extend([u8; EXTEND_MAC_ADDRESS_SIZE]), + Extended([u8; EXTENDED_MAC_ADDRESS_SIZE]), } impl MacAddress { @@ -42,30 +43,30 @@ impl MacAddress { } } -impl From<usize> for MacAddress { - fn from(device_handle: usize) -> Self { - let handle = device_handle as u64; - MacAddress::Extend(handle.to_be_bytes()) +impl From<MacAddress> for u64 { + fn from(mac_address: MacAddress) -> Self { + match mac_address { + MacAddress::Short(addr) => u16::from_le_bytes(addr) as u64, + MacAddress::Extended(addr) => u64::from_le_bytes(addr), + } } } impl TryFrom<String> for MacAddress { type Error = Error; fn try_from(mac_address: String) -> std::result::Result<Self, Error> { - let mac_address = mac_address.replace(':', ""); - let mac_address = mac_address.replace("%3A", ""); - let uwb_mac_address = match mac_address.len() { - STRING_SHORT_MAC_ADDRESS_SIZE => MacAddress::Short( + let mac_address = mac_address.replace(':', "").replace("%3A", ""); + Ok(match mac_address.len() { + SHORT_MAC_ADDRESS_STR_SIZE => MacAddress::Short( <[u8; SHORT_MAC_ADDRESS_SIZE]>::from_hex(mac_address) .map_err(|err| Error::MacAddressWrongFormat(err.to_string()))?, ), - STRING_EXTEND_MAC_ADDRESS_SIZE => MacAddress::Extend( - <[u8; EXTEND_MAC_ADDRESS_SIZE]>::from_hex(mac_address) + EXTENDED_MAC_ADDRESS_STR_SIZE => MacAddress::Extended( + <[u8; EXTENDED_MAC_ADDRESS_SIZE]>::from_hex(mac_address) .map_err(|err| Error::MacAddressWrongFormat(err.to_string()))?, ), _ => return Err(Error::MacAddressWrongFormat(mac_address)), - }; - Ok(uwb_mac_address) + }) } } @@ -81,7 +82,7 @@ impl From<&MacAddress> for String { }; match mac_address { MacAddress::Short(address) => to_string(address), - MacAddress::Extend(address) => to_string(address), + MacAddress::Extended(address) => to_string(address), } } } @@ -112,7 +113,7 @@ mod tests { let valid_mac_address = "FF:77:AA:DD:EE:BB:CC:10"; assert_eq!( MacAddress::new(valid_mac_address.into()).unwrap(), - MacAddress::Extend([0xFF, 0x77, 0xAA, 0xDD, 0xEE, 0xBB, 0xCC, 0x10]) + MacAddress::Extended([0xFF, 0x77, 0xAA, 0xDD, 0xEE, 0xBB, 0xCC, 0x10]) ); } @@ -132,16 +133,32 @@ mod tests { #[test] fn display_mac_address() { - let extend_mac_address = "00:FF:77:AA:DD:EE:CC:45"; + let extended_mac_address = "00:FF:77:AA:DD:EE:CC:45"; let short_mac_address = "00:FF"; assert_eq!( - format!("{}", MacAddress::new(extend_mac_address.into()).unwrap()), - extend_mac_address + format!("{}", MacAddress::new(extended_mac_address.into()).unwrap()), + extended_mac_address ); assert_eq!( format!("{}", MacAddress::new(short_mac_address.into()).unwrap()), short_mac_address ); - assert_eq!(extend_mac_address.to_string(), extend_mac_address); + assert_eq!(extended_mac_address.to_string(), extended_mac_address); + } + + #[test] + fn test_short_mac_to_u64() { + let short_mac = MacAddress::Short([0x01, 0x02]); + let result: u64 = short_mac.into(); + let expected: u64 = 0x0201; + assert_eq!(result, expected); + } + + #[test] + fn test_extended_mac_to_u64() { + let extended_mac = MacAddress::Extended([0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08]); + let result: u64 = extended_mac.into(); + let expected: u64 = 0x0807060504030201; + assert_eq!(result, expected); } } diff --git a/src/packets.rs b/src/packets.rs index f0b4b64..16e7b4f 100644 --- a/src/packets.rs +++ b/src/packets.rs @@ -39,152 +39,50 @@ pub mod uci { MessageType::try_from((byte >> 5) & 0x7).unwrap_or(MessageType::Command) } - use crate::pcapng; - use std::pin::Pin; - use tokio::io::{AsyncRead, AsyncWrite}; - use tokio::sync::Mutex; + /// Read a single UCI packet from a TCP read half. + /// This function does not reassemble segmented packets. + pub async fn read( + mut socket: tokio::net::tcp::OwnedReadHalf, + ) -> Option<(Vec<u8>, tokio::net::tcp::OwnedReadHalf)> { + use tokio::io::AsyncReadExt; - /// Read UCI Control and Data packets received on the UCI transport. - /// Performs recombination of the segmented packets. - pub struct Reader { - socket: Pin<Box<dyn AsyncRead + Send>>, - } - - /// Write UCI Control and Data packets received to the UCI transport. - /// Performs segmentation of the packets. - pub struct Writer { - socket: Pin<Box<dyn AsyncWrite + Send>>, - } - - impl Reader { - /// Create an UCI reader from an UCI transport. - pub fn new<T: AsyncRead + Send + 'static>(rx: T) -> Self { - Reader { - socket: Box::pin(rx), - } - } - - /// Read a single UCI packet from the reader. The packet is automatically - /// re-assembled if segmented on the UCI transport. Data segments - /// are _not_ re-assembled but returned immediatly for credit - /// acknowledgment. - pub async fn read(&mut self, pcapng: &mut Option<pcapng::File>) -> anyhow::Result<Vec<u8>> { - use tokio::io::AsyncReadExt; - - let mut complete_packet = vec![0; HEADER_SIZE]; - - // Note on reassembly: - // For each segment of a Control Message, the - // header of the Control Packet SHALL contain the same MT, GID and OID - // values. It is correct to keep only the last header of the segmented packet. - loop { - // Read the common packet header. - self.socket - .read_exact(&mut complete_packet[0..HEADER_SIZE]) - .await?; - let common_packet_header = - CommonPacketHeader::parse(&complete_packet[0..COMMON_HEADER_SIZE])?; + let mut packet = vec![0; HEADER_SIZE]; - // Read the packet payload. - let payload_length = match common_packet_header.get_mt() { - MessageType::Data => { - let data_packet_header = - DataPacketHeader::parse(&complete_packet[0..HEADER_SIZE])?; - data_packet_header.get_payload_length() as usize - } - _ => { - let control_packet_header = - ControlPacketHeader::parse(&complete_packet[0..HEADER_SIZE])?; - control_packet_header.get_payload_length() as usize - } - }; - let mut payload_bytes = vec![0; payload_length]; - self.socket.read_exact(&mut payload_bytes).await?; - complete_packet.extend(&payload_bytes); + // Read the common packet header. + socket.read_exact(&mut packet[0..HEADER_SIZE]).await.ok()?; - if let Some(ref mut pcapng) = pcapng { - let mut packet_bytes = vec![]; - packet_bytes.extend(&complete_packet[0..HEADER_SIZE]); - packet_bytes.extend(&payload_bytes); - pcapng.write(&packet_bytes, pcapng::Direction::Tx).await?; - } + let common_packet_header = + CommonPacketHeader::parse(&packet[0..COMMON_HEADER_SIZE]).ok()?; - if common_packet_header.get_mt() == MessageType::Data { - return Ok(complete_packet); - } - - // Check the Packet Boundary Flag. - match common_packet_header.get_pbf() { - PacketBoundaryFlag::Complete => return Ok(complete_packet), - PacketBoundaryFlag::NotComplete => (), - } + let payload_length = match common_packet_header.get_mt() { + MessageType::Data => { + let data_packet_header = DataPacketHeader::parse(&packet[0..HEADER_SIZE]).ok()?; + data_packet_header.get_payload_length() as usize } - } - } - - impl Writer { - /// Create an UCI writer from an UCI transport. - pub fn new<T: AsyncWrite + Send + 'static>(rx: T) -> Self { - Writer { - socket: Box::pin(rx), + _ => { + let control_packet_header = + ControlPacketHeader::parse(&packet[0..HEADER_SIZE]).ok()?; + control_packet_header.get_payload_length() as usize } - } - - /// Write a single UCI packet to the writer. The packet is automatically - /// segmented if the payload exceeds the maximum size limit. - pub async fn write( - &mut self, - mut packet: &[u8], - pcapng: &mut Option<pcapng::File>, - ) -> anyhow::Result<()> { - use tokio::io::AsyncWriteExt; + }; - let mut header_bytes = [packet[0], packet[1], packet[2], 0]; - packet = &packet[HEADER_SIZE..]; + // Read the packet payload. + packet.resize(payload_length + HEADER_SIZE, 0); + socket.read_exact(&mut packet[HEADER_SIZE..]).await.ok()?; - loop { - let message_type = parse_message_type(header_bytes[0]); - let chunk_length = std::cmp::min( - packet.len(), - match message_type { - MessageType::Data => MAX_DATA_PACKET_PAYLOAD_SIZE, - _ => MAX_CTRL_PACKET_PAYLOAD_SIZE, - }, - ); - // Update header with framing information. - let pbf = if chunk_length < packet.len() { - PacketBoundaryFlag::NotComplete - } else { - PacketBoundaryFlag::Complete - }; - const PBF_MASK: u8 = 0x10; - header_bytes[0] &= !PBF_MASK; - header_bytes[0] |= (pbf as u8) << 4; - - match message_type { - MessageType::Data => { - let chunk_le_bytes = (chunk_length as u16).to_le_bytes(); - header_bytes[2..4].copy_from_slice(&chunk_le_bytes); - } - _ => header_bytes[3] = chunk_length as u8, - } + Some((packet, socket)) + } - if let Some(ref mut pcapng) = pcapng { - let mut packet_bytes = vec![]; - packet_bytes.extend(&header_bytes); - packet_bytes.extend(&packet[..chunk_length]); - pcapng.write(&packet_bytes, pcapng::Direction::Rx).await? - } + /// Write a single UCI packet to a TCP write half. + /// This function accepts segmented packets only. + pub async fn write( + mut socket: tokio::net::tcp::OwnedWriteHalf, + packet: Vec<u8>, + ) -> std::result::Result<tokio::net::tcp::OwnedWriteHalf, anyhow::Error> { + use tokio::io::AsyncWriteExt; - // Write the header and payload segment bytes. - self.socket.write_all(&header_bytes).await?; - self.socket.write_all(&packet[..chunk_length]).await?; - packet = &packet[chunk_length..]; + socket.write_all(&packet).await?; - if packet.is_empty() { - return Ok(()); - } - } - } + Ok(socket) } } diff --git a/src/pcapng.rs b/src/pcapng.rs index 3119bb1..9c1da7e 100644 --- a/src/pcapng.rs +++ b/src/pcapng.rs @@ -14,12 +14,12 @@ #![allow(clippy::unused_io_amount)] +use std::io::Write; use std::path::Path; use std::time::Instant; -use tokio::io::AsyncWriteExt; pub struct File { - file: tokio::fs::File, + file: std::sync::Mutex<std::fs::File>, start_time: Instant, } @@ -29,51 +29,50 @@ pub enum Direction { } impl File { - pub async fn create<P: AsRef<Path>>(path: P) -> std::io::Result<File> { - let mut file = tokio::fs::File::create(path).await?; + pub fn create<P: AsRef<Path>>(path: P) -> std::io::Result<File> { + let mut file = std::fs::File::create(path)?; // PCAPng files must start with a Section Header Block. - file.write(&u32::to_le_bytes(0x0A0D0D0A)).await?; // Block Type - file.write(&u32::to_le_bytes(28)).await?; // Block Total Length - file.write(&u32::to_le_bytes(0x1A2B3C4D)).await?; // Byte-Order Magic - file.write(&u16::to_le_bytes(1)).await?; // Major Version - file.write(&u16::to_le_bytes(0)).await?; // Minor Version - file.write(&u64::to_le_bytes(0xFFFFFFFFFFFFFFFF)).await?; // Section Length (not specified) - file.write(&u32::to_le_bytes(28)).await?; // Block Total Length + file.write(&u32::to_le_bytes(0x0A0D0D0A))?; // Block Type + file.write(&u32::to_le_bytes(28))?; // Block Total Length + file.write(&u32::to_le_bytes(0x1A2B3C4D))?; // Byte-Order Magic + file.write(&u16::to_le_bytes(1))?; // Major Version + file.write(&u16::to_le_bytes(0))?; // Minor Version + file.write(&u64::to_le_bytes(0xFFFFFFFFFFFFFFFF))?; // Section Length (not specified) + file.write(&u32::to_le_bytes(28))?; // Block Total Length // Write the Interface Description Block used for all // UCI records. - file.write(&u32::to_le_bytes(0x00000001)).await?; // Block Type - file.write(&u32::to_le_bytes(20)).await?; // Block Total Length - file.write(&u16::to_le_bytes(293)).await?; // LinkType - file.write(&u16::to_le_bytes(0)).await?; // Reserved - file.write(&u32::to_le_bytes(0)).await?; // SnapLen (no limit) - file.write(&u32::to_le_bytes(20)).await?; // Block Total Length + file.write(&u32::to_le_bytes(0x00000001))?; // Block Type + file.write(&u32::to_le_bytes(20))?; // Block Total Length + file.write(&u16::to_le_bytes(293))?; // LinkType + file.write(&u16::to_le_bytes(0))?; // Reserved + file.write(&u32::to_le_bytes(0))?; // SnapLen (no limit) + file.write(&u32::to_le_bytes(20))?; // Block Total Length Ok(File { - file, + file: std::sync::Mutex::new(file), start_time: Instant::now(), }) } - pub async fn write(&mut self, packet: &[u8], _dir: Direction) -> std::io::Result<()> { + pub fn write(&self, packet: &[u8], _dir: Direction) -> std::io::Result<()> { let packet_data_padding: usize = 4 - packet.len() % 4; let block_total_length: u32 = packet.len() as u32 + packet_data_padding as u32 + 32; let timestamp = self.start_time.elapsed().as_micros(); - let file = &mut self.file; + let mut file = self.file.lock().unwrap(); // Wrap the packet inside an Enhanced Packet Block. - file.write(&u32::to_le_bytes(0x00000006)).await?; // Block Type - file.write(&u32::to_le_bytes(block_total_length)).await?; - file.write(&u32::to_le_bytes(0)).await?; // Interface ID - file.write(&u32::to_le_bytes((timestamp >> 32) as u32)) - .await?; // Timestamp (High) - file.write(&u32::to_le_bytes(timestamp as u32)).await?; // Timestamp (Low) - file.write(&u32::to_le_bytes(packet.len() as u32)).await?; // Captured Packet Length - file.write(&u32::to_le_bytes(packet.len() as u32)).await?; // Original Packet Length - file.write(packet).await?; - file.write(&vec![0; packet_data_padding]).await?; - file.write(&u32::to_le_bytes(block_total_length)).await?; // Block Total Length + file.write(&u32::to_le_bytes(0x00000006))?; // Block Type + file.write(&u32::to_le_bytes(block_total_length))?; + file.write(&u32::to_le_bytes(0))?; // Interface ID + file.write(&u32::to_le_bytes((timestamp >> 32) as u32))?; // Timestamp (High) + file.write(&u32::to_le_bytes(timestamp as u32))?; // Timestamp (Low) + file.write(&u32::to_le_bytes(packet.len() as u32))?; // Captured Packet Length + file.write(&u32::to_le_bytes(packet.len() as u32))?; // Original Packet Length + file.write(packet)?; + file.write(&vec![0; packet_data_padding])?; + file.write(&u32::to_le_bytes(block_total_length))?; // Block Total Length Ok(()) } } diff --git a/src/session.rs b/src/session.rs index 8425b1b..eb4e063 100644 --- a/src/session.rs +++ b/src/session.rs @@ -18,6 +18,7 @@ use crate::packets::uci::{self, *}; use crate::{MacAddress, PicaCommand}; +use bytes::BytesMut; use std::collections::HashMap; use std::time::Duration; use tokio::sync::mpsc; @@ -27,6 +28,8 @@ use tokio::time; use num_derive::{FromPrimitive, ToPrimitive}; use num_traits::FromPrimitive; +use super::UciPacket; + pub const MAX_SESSION: usize = 255; pub const DEFAULT_RANGING_INTERVAL: Duration = time::Duration::from_millis(200); pub const DEFAULT_SLOT_DURATION: u16 = 2400; // RTSU unit @@ -462,7 +465,7 @@ impl AppConfig { MacAddress::Short(value[..].try_into().unwrap()) } MacAddressMode::AddressMode2 => { - MacAddress::Extend(value[..].try_into().unwrap()) + MacAddress::Extended(value[..].try_into().unwrap()) } _ => panic!("Unexpected MAC Address Mode"), }; @@ -486,7 +489,7 @@ impl AppConfig { .chunks(mac_address_size) .map(|c| match self.mac_address_mode { MacAddressMode::AddressMode0 => MacAddress::Short(c.try_into().unwrap()), - MacAddressMode::AddressMode2 => MacAddress::Extend(c.try_into().unwrap()), + MacAddressMode::AddressMode2 => MacAddress::Extended(c.try_into().unwrap()), _ => panic!("Unexpected MAC Address Mode"), }) .collect(); @@ -623,7 +626,7 @@ impl AppConfig { } AppConfigTlvType::ApplicationDataEndpoint => self.application_data_endpoint = value[0], id => { - println!("Ignored AppConfig parameter {:?}", id); + log::error!("Ignored AppConfig parameter {:?}", id); return Err(StatusCode::UciStatusInvalidParam); } }; @@ -637,7 +640,7 @@ impl AppConfig { self.raw.get(&id).cloned() } - pub fn can_start_ranging_with_peer(&self, peer_config: &Self) -> bool { + pub fn is_compatible_for_ranging(&self, peer_config: &Self) -> bool { self == peer_config && self.device_role != peer_config.device_role && self.device_type != peer_config.device_type @@ -649,6 +652,14 @@ impl AppConfig { .contains(&peer_config.device_mac_address) } + pub fn can_start_data_transfer(&self) -> bool { + self.device_role == DeviceRole::Initiator + } + + pub fn can_receive_data_transfer(&self) -> bool { + self.device_role == DeviceRole::Responder + } + fn extend(&mut self, configs: &[AppConfigTlv]) -> Vec<AppConfigStatus> { if !app_config_has_mandatory_parameters(configs) { // TODO: What shall we do in this situation? @@ -717,12 +728,13 @@ pub struct Session { /// cf. [UCI] 7.2 Table 13: 4 octets unique random number generated by application id: u32, device_handle: usize, + data: BytesMut, session_type: SessionType, pub sequence_number: u32, pub app_config: AppConfig, ranging_task: Option<JoinHandle<()>>, - tx: mpsc::Sender<ControlPacket>, + tx: mpsc::UnboundedSender<UciPacket>, pica_tx: mpsc::Sender<PicaCommand>, } @@ -731,13 +743,14 @@ impl Session { id: u32, session_type: SessionType, device_handle: usize, - tx: mpsc::Sender<ControlPacket>, + tx: mpsc::UnboundedSender<UciPacket>, pica_tx: mpsc::Sender<PicaCommand>, ) -> Self { Self { state: SessionState::SessionStateDeinit, id, device_handle, + data: BytesMut::new(), session_type, sequence_number: 0, app_config: AppConfig::default(), @@ -768,7 +781,6 @@ impl Session { .build() .into(), ) - .await .unwrap() }); } @@ -781,6 +793,18 @@ impl Session { self.app_config.rng_data_ntf } + pub fn data(&self) -> &BytesMut { + &self.data + } + + pub fn clear_data(&mut self) { + self.data.clear() + } + + pub fn session_type(&self) -> SessionType { + self.session_type + } + pub fn session_state(&self) -> SessionState { self.state } @@ -794,9 +818,10 @@ impl Session { fn command_set_app_config(&mut self, cmd: SessionSetAppConfigCmd) -> SessionSetAppConfigRsp { // TODO properly handle these asserts - println!( + log::debug!( "[{}:0x{:x}] Session Set App Config", - self.device_handle, self.id + self.device_handle, + self.id ); assert_eq!(self.id, cmd.get_session_token()); assert!( @@ -850,9 +875,10 @@ impl Session { } fn command_get_app_config(&self, cmd: SessionGetAppConfigCmd) -> SessionGetAppConfigRsp { - println!( + log::debug!( "[{}:0x{:x}] Session Get App Config", - self.device_handle, self.id + self.device_handle, + self.id ); assert_eq!(self.id, cmd.get_session_token()); @@ -871,7 +897,7 @@ impl Session { v: Vec::new(), }), }, - Err(_) => println!("Failed to parse AppConfigTlv: {:?}", *config_id), + Err(_) => log::error!("Failed to parse AppConfigTlv: {:?}", *config_id), } (valid_parameters, invalid_parameters) }, @@ -890,7 +916,7 @@ impl Session { } fn command_get_state(&self, cmd: SessionGetStateCmd) -> SessionGetStateRsp { - println!("[{}:0x{:x}] Session Get State", self.device_handle, self.id); + log::debug!("[{}:0x{:x}] Session Get State", self.device_handle, self.id); assert_eq!(self.id, cmd.get_session_token()); SessionGetStateRspBuilder { status: StatusCode::UciStatusOk, @@ -903,9 +929,10 @@ impl Session { &mut self, cmd: SessionUpdateControllerMulticastListCmd, ) -> SessionUpdateControllerMulticastListRsp { - println!( + log::debug!( "[{}:0x{:x}] Session Update Controller Multicast List", - self.device_handle, self.id + self.device_handle, + self.id ); assert_eq!(self.id, cmd.get_session_token()); if (self.state != SessionState::SessionStateActive @@ -1009,7 +1036,7 @@ impl Session { controlee_status.push(ControleeStatus { mac_address: match controlee.short_address { MacAddress::Short(address) => address, - MacAddress::Extend(_) => panic!("Extended address is not supported!"), + MacAddress::Extended(_) => panic!("Extended address is not supported!"), }, subsession_id: controlee.sub_session_id, status: update_status, @@ -1045,7 +1072,7 @@ impl Session { controlee_status.push(ControleeStatus { mac_address: match address { MacAddress::Short(addr) => addr, - MacAddress::Extend(_) => panic!("Extended address is not supported!"), + MacAddress::Extended(_) => panic!("Extended address is not supported!"), }, subsession_id: controlee.sub_session_id, status: update_status, @@ -1075,14 +1102,13 @@ impl Session { .build() .into(), ) - .await .unwrap() }); SessionUpdateControllerMulticastListRspBuilder { status }.build() } fn command_range_start(&mut self, cmd: SessionStartCmd) -> SessionStartRsp { - println!("[{}:0x{:x}] Range Start", self.device_handle, self.id); + log::debug!("[{}:0x{:x}] Range Start", self.device_handle, self.id); assert_eq!(self.id, cmd.get_session_id()); let status = if self.state != SessionState::SessionStateIdle { @@ -1119,7 +1145,7 @@ impl Session { } } fn command_range_stop(&mut self, cmd: SessionStopCmd) -> SessionStopRsp { - println!("[{}:0x{:x}] Range Stop", self.device_handle, self.id); + log::debug!("[{}:0x{:x}] Range Stop", self.device_handle, self.id); assert_eq!(self.id, cmd.get_session_id()); let status = if self.state != SessionState::SessionStateActive { @@ -1139,9 +1165,10 @@ impl Session { &self, cmd: SessionGetRangingCountCmd, ) -> SessionGetRangingCountRsp { - println!( + log::debug!( "[{}:0x{:x}] Range Get Ranging Count", - self.device_handle, self.id + self.device_handle, + self.id ); assert_eq!(self.id, cmd.get_session_id()); @@ -1184,6 +1211,7 @@ impl Session { } pub fn data_message_snd(&mut self, data: DataMessageSnd) -> SessionControlNotification { + log::debug!("[{}] data_message_snd", self.device_handle); let session_token = data.get_session_handle(); let uci_sequence_number = data.get_data_sequence_number() as u8; @@ -1200,11 +1228,7 @@ impl Session { assert_eq!(self.id, session_token); - // TODO: perform actual data transfer across devices - println!( - "Data packet received, payload bytes: {:?}", - data.get_application_data() - ); + self.data.extend_from_slice(data.get_application_data()); DataCreditNtfBuilder { credit_availability: CreditAvailability::CreditAvailable, diff --git a/tests/data_transfer.py b/tests/data_transfer.py index f7d731c..241a6b9 100755 --- a/tests/data_transfer.py +++ b/tests/data_transfer.py @@ -16,6 +16,7 @@ import asyncio import argparse +import logging from pica import Host from pica.packets import uci from .helper import init @@ -24,7 +25,7 @@ from pathlib import Path MAX_DATA_PACKET_PAYLOAD_SIZE = 1024 -async def data_message_send(host: Host, peer: Host, file: Path): +async def controller(host: Host, peer: Host, file: Path): await init(host) host.send_control( @@ -92,6 +93,167 @@ async def data_message_send(host: Host, peer: Host, file: Path): await data_transfer(host, peer.mac_address, file, 0) + # START SESSION CMD + host.send_control(uci.SessionStartCmd(session_id=0)) + + await host.expect_control(uci.SessionStartRsp(status=uci.StatusCode.UCI_STATUS_OK)) + + await host.expect_control( + uci.SessionStatusNtf( + session_token=0, + session_state=uci.SessionState.SESSION_STATE_ACTIVE, + reason_code=0, + ) + ) + + await host.expect_control( + uci.DeviceStatusNtf(device_state=uci.DeviceState.DEVICE_STATE_ACTIVE) + ) + + event = await host.expect_control(uci.ShortMacTwoWaySessionInfoNtf, timeout=2.0) + event.show() + + event = await host.expect_control(uci.ShortMacTwoWaySessionInfoNtf, timeout=2.0) + event.show() + + # STOP SESSION + host.send_control(uci.SessionStopCmd(session_id=0)) + + await host.expect_control(uci.SessionStopRsp(status=uci.StatusCode.UCI_STATUS_OK)) + + await host.expect_control( + uci.SessionStatusNtf( + session_token=0, + session_state=uci.SessionState.SESSION_STATE_IDLE, + reason_code=0, + ) + ) + + await host.expect_control( + uci.DeviceStatusNtf(device_state=uci.DeviceState.DEVICE_STATE_READY) + ) + + # DEINIT + host.send_control(uci.SessionDeinitCmd(session_token=0)) + + await host.expect_control(uci.SessionDeinitRsp(status=uci.StatusCode.UCI_STATUS_OK)) + + +async def controlee(host: Host, peer: Host, file: Path): + await init(host) + + host.send_control( + uci.SessionInitCmd( + session_id=0, + session_type=uci.SessionType.FIRA_RANGING_AND_IN_BAND_DATA_SESSION, + ) + ) + + await host.expect_control(uci.SessionInitRsp(status=uci.StatusCode.UCI_STATUS_OK)) + + await host.expect_control( + uci.SessionStatusNtf( + session_token=0, + session_state=uci.SessionState.SESSION_STATE_INIT, + reason_code=0, + ) + ) + + mac_address_mode = 0x0 + ranging_duration = int(1000).to_bytes(4, byteorder="little") + device_role_responder = bytes([1]) + device_type_controllee = bytes([0]) + host.send_control( + uci.SessionSetAppConfigCmd( + session_token=0, + tlvs=[ + uci.AppConfigTlv( + cfg_id=uci.AppConfigTlvType.DEVICE_ROLE, v=device_role_responder + ), + uci.AppConfigTlv( + cfg_id=uci.AppConfigTlvType.DEVICE_TYPE, v=device_type_controllee + ), + uci.AppConfigTlv( + cfg_id=uci.AppConfigTlvType.DEVICE_MAC_ADDRESS, v=host.mac_address + ), + uci.AppConfigTlv( + cfg_id=uci.AppConfigTlvType.MAC_ADDRESS_MODE, + v=bytes([mac_address_mode]), + ), + uci.AppConfigTlv( + cfg_id=uci.AppConfigTlvType.RANGING_DURATION, v=ranging_duration + ), + uci.AppConfigTlv( + cfg_id=uci.AppConfigTlvType.NO_OF_CONTROLEE, v=bytes([1]) + ), + uci.AppConfigTlv( + cfg_id=uci.AppConfigTlvType.DST_MAC_ADDRESS, v=peer.mac_address + ), + ], + ) + ) + + await host.expect_control( + uci.SessionSetAppConfigRsp(status=uci.StatusCode.UCI_STATUS_OK, cfg_status=[]) + ) + + await host.expect_control( + uci.SessionStatusNtf( + session_token=0, + session_state=uci.SessionState.SESSION_STATE_IDLE, + reason_code=0, + ) + ) + + host.send_control(uci.SessionStartCmd(session_id=0)) + + await host.expect_control(uci.SessionStartRsp(status=uci.StatusCode.UCI_STATUS_OK)) + + await host.expect_control( + uci.SessionStatusNtf( + session_token=0, + session_state=uci.SessionState.SESSION_STATE_ACTIVE, + reason_code=0, + ) + ) + + await host.expect_control( + uci.DeviceStatusNtf(device_state=uci.DeviceState.DEVICE_STATE_ACTIVE) + ) + + with file.open("rb") as f: + application_data = list(bytearray(f.read())) + event = await host.expect_data( + uci.DataMessageRcv( + session_handle=0, + status=uci.StatusCode.UCI_STATUS_OK, + source_address=int.from_bytes(peer.mac_address, "big"), + data_sequence_number=0x01, + application_data=application_data, + ), + timeout=2.0, + ) + event.show() + + event = await host.expect_control(uci.ShortMacTwoWaySessionInfoNtf, timeout=2.0) + event.show() + + host.send_control(uci.SessionStopCmd(session_id=0)) + + await host.expect_control(uci.SessionStopRsp(status=uci.StatusCode.UCI_STATUS_OK)) + + await host.expect_control( + uci.SessionStatusNtf( + session_token=0, + session_state=uci.SessionState.SESSION_STATE_IDLE, + reason_code=0, + ) + ) + + await host.expect_control( + uci.DeviceStatusNtf(device_state=uci.DeviceState.DEVICE_STATE_READY) + ) + host.send_control(uci.SessionDeinitCmd(session_token=0)) await host.expect_control(uci.SessionDeinitRsp(status=uci.StatusCode.UCI_STATUS_OK)) @@ -163,22 +325,24 @@ async def data_transfer( async def run(address: str, uci_port: int, file: Path): try: - host0 = await Host.connect(address, uci_port, bytes([0, 1])) - host1 = await Host.connect(address, uci_port, bytes([0, 2])) - except Exception: - print( + host0 = await Host.connect(address, uci_port, bytes([0, 0])) + host1 = await Host.connect(address, uci_port, bytes([0, 1])) + except Exception as e: + raise Exception( f"Failed to connect to Pica server at address {address}:{uci_port}\n" + "Make sure the server is running" ) - exit(1) - - async with asyncio.TaskGroup() as tg: - tg.create_task(data_message_send(host0, host1, file)) - host0.disconnect() - host1.disconnect() - - print("Data transfer test completed") + try: + async with asyncio.TaskGroup() as tg: + tg.create_task(controller(host0, host1, file)) + tg.create_task(controlee(host1, host0, file)) + except Exception as e: + raise e + finally: + host0.disconnect() + host1.disconnect() + logging.debug("Data transfer test completed") def main(): diff --git a/tests/test_runner.py b/tests/test_runner.py index aa7a709..83c5d5a 100644 --- a/tests/test_runner.py +++ b/tests/test_runner.py @@ -1,5 +1,4 @@ import asyncio -from asyncio.subprocess import Process import pytest import pytest_asyncio import logging @@ -11,7 +10,7 @@ from typing import Tuple from . import ranging, data_transfer -PICA_BIN = Path("./target/debug/pica-server") +PICA_BIN = Path("./target/debug/pica") DATA_FILE = Path("README.md") PICA_LOCALHOST = "127.0.0.1" |