diff options
author | Jeongik Cha <jeongik@google.com> | 2023-07-26 01:40:53 +0000 |
---|---|---|
committer | Jeongik Cha <jeongik@google.com> | 2023-08-03 19:53:01 +0900 |
commit | 965a1e7d0710a300704b74c023df63c88597bfba (patch) | |
tree | 5c440884231ba96b418ead9bc627f891fd733e3f | |
parent | a3307d6a5a89a764779bab2f0c53e4cec6f48387 (diff) | |
download | vhost-device-vsock-main-16k.tar.gz |
Import platform/external/rust/crates/vhost-device-vsockmain-16k
Bug: 277909042
Test: n/a
Change-Id: Ifd78839d19d1e1fad98cd60a5d8a2491f6553ea0
-rw-r--r-- | .cargo_vcs_info.json | 6 | ||||
-rw-r--r-- | CHANGELOG.md | 15 | ||||
-rw-r--r-- | Cargo.lock | 1143 | ||||
-rw-r--r-- | Cargo.toml | 95 | ||||
-rw-r--r-- | Cargo.toml.orig | 36 | ||||
-rw-r--r-- | LICENSE | 228 | ||||
-rw-r--r-- | LICENSE-APACHE | 202 | ||||
-rw-r--r-- | LICENSE-BSD-3-Clause | 26 | ||||
-rw-r--r-- | METADATA | 19 | ||||
-rw-r--r-- | MODULE_LICENSE_APACHE2 | 0 | ||||
-rw-r--r-- | OWNERS | 2 | ||||
-rw-r--r-- | README.md | 178 | ||||
-rw-r--r-- | src/main.rs | 471 | ||||
-rw-r--r-- | src/rxops.rs | 36 | ||||
-rw-r--r-- | src/rxqueue.rs | 157 | ||||
-rw-r--r-- | src/thread_backend.rs | 506 | ||||
-rw-r--r-- | src/txbuf.rs | 233 | ||||
-rw-r--r-- | src/vhu_vsock.rs | 518 | ||||
-rw-r--r-- | src/vhu_vsock_thread.rs | 837 | ||||
-rw-r--r-- | src/vsock_conn.rs | 781 |
20 files changed, 5489 insertions, 0 deletions
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json new file mode 100644 index 0000000..2e81cc1 --- /dev/null +++ b/.cargo_vcs_info.json @@ -0,0 +1,6 @@ +{ + "git": { + "sha1": "6305c66f227717e639b7269c6c37efc8a7b1c5f8" + }, + "path_in_vcs": "crates/vsock" +}
\ No newline at end of file diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..51d3f04 --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,15 @@ +# Changelog +## [Unreleased] + +### Added + +### Changed + +### Fixed + +### Deprecated + +## [0.1.0] + +First release + diff --git a/Cargo.lock b/Cargo.lock new file mode 100644 index 0000000..bb1fe3e --- /dev/null +++ b/Cargo.lock @@ -0,0 +1,1143 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 3 + +[[package]] +name = "ahash" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fcb51a0695d8f838b1ee009b3fbf66bda078cd64590202a864a8f3e8c4315c47" +dependencies = [ + "getrandom", + "once_cell", + "version_check", +] + +[[package]] +name = "aho-corasick" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43f6cb1bf222025340178f382c426f13757b2960e89779dfcb319c32542a5a41" +dependencies = [ + "memchr", +] + +[[package]] +name = "anstream" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ca84f3628370c59db74ee214b3263d58f9aadd9b4fe7e711fd87dc452b7f163" +dependencies = [ + "anstyle", + "anstyle-parse", + "anstyle-query", + "anstyle-wincon", + "colorchoice", + "is-terminal", + "utf8parse", +] + +[[package]] +name = "anstyle" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3a30da5c5f2d5e72842e00bcb57657162cdabef0931f40e2deb9b4140440cecd" + +[[package]] +name = "anstyle-parse" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "938874ff5980b03a87c5524b3ae5b59cf99b1d6bc836848df7bc5ada9643c333" +dependencies = [ + "utf8parse", +] + +[[package]] +name = "anstyle-query" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5ca11d4be1bab0c8bc8734a9aa7bf4ee8316d462a08c6ac5052f888fef5b494b" +dependencies = [ + "windows-sys", +] + +[[package]] +name = "anstyle-wincon" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "180abfa45703aebe0093f79badacc01b8fd4ea2e35118747e5811127f926e188" +dependencies = [ + "anstyle", + "windows-sys", +] + +[[package]] +name = "arc-swap" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bddcadddf5e9015d310179a59bb28c4d4b9920ad0f11e8e14dbadf654890c9a6" + +[[package]] +name = "async-trait" +version = "0.1.71" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a564d521dd56509c4c47480d00b80ee55f7e385ae48db5744c67ad50c92d2ebf" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "autocfg" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" + +[[package]] +name = "base64" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8" + +[[package]] +name = "bitflags" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" + +[[package]] +name = "bitflags" +version = "2.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "630be753d4e58660abd17930c71b647fe46c27ea6b63cc59e1e3851406972e42" + +[[package]] +name = "block-buffer" +version = "0.10.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3078c7629b62d3f0439517fa394996acacc5cbc91c5a20d8c658e77abd503a71" +dependencies = [ + "generic-array", +] + +[[package]] +name = "byteorder" +version = "1.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610" + +[[package]] +name = "cc" +version = "1.0.79" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50d30906286121d95be3d479533b458f87493b30a4b5f79a607db8f5d11aa91f" + +[[package]] +name = "cfg-if" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" + +[[package]] +name = "clap" +version = "4.3.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5fd304a20bff958a57f04c4e96a2e7594cc4490a0e809cbd48bb6437edaa452d" +dependencies = [ + "clap_builder", + "clap_derive", + "once_cell", +] + +[[package]] +name = "clap_builder" +version = "4.3.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "01c6a3f08f1fe5662a35cfe393aec09c4df95f60ee93b7556505260f75eee9e1" +dependencies = [ + "anstream", + "anstyle", + "clap_lex", + "strsim", +] + +[[package]] +name = "clap_derive" +version = "4.3.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "54a9bb5758fc5dfe728d1019941681eccaf0cf8a4189b692a0ee2f2ecf90a050" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "clap_lex" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2da6da31387c7e4ef160ffab6d5e7f00c42626fe39aea70a7b0f1773f7dd6c1b" + +[[package]] +name = "colorchoice" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7" + +[[package]] +name = "config" +version = "0.13.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d379af7f68bfc21714c6c7dea883544201741d2ce8274bb12fa54f89507f52a7" +dependencies = [ + "async-trait", + "json5", + "lazy_static", + "nom", + "pathdiff", + "ron", + "rust-ini", + "serde", + "serde_json", + "toml", + "yaml-rust", +] + +[[package]] +name = "cpufeatures" +version = "0.2.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a17b76ff3a4162b0b27f354a0c87015ddad39d35f9c0c36607a3bdd175dde1f1" +dependencies = [ + "libc", +] + +[[package]] +name = "crypto-common" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3" +dependencies = [ + "generic-array", + "typenum", +] + +[[package]] +name = "digest" +version = "0.10.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" +dependencies = [ + "block-buffer", + "crypto-common", +] + +[[package]] +name = "dlv-list" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0688c2a7f92e427f44895cd63841bff7b29f8d7a1648b9e7e07a4a365b2e1257" + +[[package]] +name = "env_logger" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85cdab6a89accf66733ad5a1693a4dcced6aeff64602b634530dd73c1f3ee9f0" +dependencies = [ + "humantime", + "is-terminal", + "log", + "regex", + "termcolor", +] + +[[package]] +name = "epoll" +version = "4.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "74351c3392ea1ff6cd2628e0042d268ac2371cb613252ff383b6dfa50d22fa79" +dependencies = [ + "bitflags 2.3.3", + "libc", +] + +[[package]] +name = "equivalent" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "88bffebc5d80432c9b140ee17875ff173a8ab62faad5b257da912bd2f6c1c0a1" + +[[package]] +name = "errno" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4bcfec3a70f97c962c307b2d2c56e358cf1d00b558d74262b5f929ee8cc7e73a" +dependencies = [ + "errno-dragonfly", + "libc", + "windows-sys", +] + +[[package]] +name = "errno-dragonfly" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aa68f1b12764fab894d2755d2518754e71b4fd80ecfb822714a1206c2aab39bf" +dependencies = [ + "cc", + "libc", +] + +[[package]] +name = "fastrand" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e51093e27b0797c359783294ca4f0a911c270184cb10f85783b118614a1501be" +dependencies = [ + "instant", +] + +[[package]] +name = "futures" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23342abe12aba583913b2e62f22225ff9c950774065e4bfb61a19cd9770fec40" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-channel" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "955518d47e09b25bbebc7a18df10b81f0c766eaf4c4f1cccef2fca5f2a4fb5f2" +dependencies = [ + "futures-core", + "futures-sink", +] + +[[package]] +name = "futures-core" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4bca583b7e26f571124fe5b7561d49cb2868d79116cfa0eefce955557c6fee8c" + +[[package]] +name = "futures-executor" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ccecee823288125bd88b4d7f565c9e58e41858e47ab72e8ea2d64e93624386e0" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", + "num_cpus", +] + +[[package]] +name = "futures-io" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4fff74096e71ed47f8e023204cfd0aa1289cd54ae5430a9523be060cdb849964" + +[[package]] +name = "futures-macro" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "futures-sink" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f43be4fe21a13b9781a69afa4985b0f6ee0e1afab2c6f454a8cf30e2b2237b6e" + +[[package]] +name = "futures-task" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76d3d132be6c0e6aa1534069c705a74a5997a356c0dc2f86a47765e5617c5b65" + +[[package]] +name = "futures-util" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26b01e40b772d54cf6c6d721c1d1abd0647a0106a12ecaa1c186273392a69533" +dependencies = [ + "futures-channel", + "futures-core", + "futures-io", + "futures-macro", + "futures-sink", + "futures-task", + "memchr", + "pin-project-lite", + "pin-utils", + "slab", +] + +[[package]] +name = "generic-array" +version = "0.14.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85649ca51fd72272d7821adaf274ad91c288277713d9c18820d8499a7ff69e9a" +dependencies = [ + "typenum", + "version_check", +] + +[[package]] +name = "getrandom" +version = "0.2.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be4136b2a15dd319360be1c07d9933517ccf0be8f16bf62a3bee4f0d618df427" +dependencies = [ + "cfg-if", + "libc", + "wasi", +] + +[[package]] +name = "hashbrown" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" +dependencies = [ + "ahash", +] + +[[package]] +name = "hashbrown" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c6201b9ff9fd90a5a3bac2e56a830d0caa509576f0e503818ee82c181b3437a" + +[[package]] +name = "heck" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8" + +[[package]] +name = "hermit-abi" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "443144c8cdadd93ebf52ddb4056d257f5b52c04d3c804e657d19eb73fc33668b" + +[[package]] +name = "humantime" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" + +[[package]] +name = "indexmap" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d5477fe2230a79769d8dc68e0eabf5437907c0457a5614a9e8dddb67f65eb65d" +dependencies = [ + "equivalent", + "hashbrown 0.14.0", +] + +[[package]] +name = "instant" +version = "0.1.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a5bbe824c507c5da5956355e86a746d82e0e1464f65d862cc5e71da70e94b2c" +dependencies = [ + "cfg-if", +] + +[[package]] +name = "io-lifetimes" +version = "1.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eae7b9aee968036d54dce06cebaefd919e4472e753296daccd6d344e3e2df0c2" +dependencies = [ + "hermit-abi", + "libc", + "windows-sys", +] + +[[package]] +name = "is-terminal" +version = "0.4.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb0889898416213fab133e1d33a0e5858a48177452750691bde3666d0fdbaf8b" +dependencies = [ + "hermit-abi", + "rustix 0.38.3", + "windows-sys", +] + +[[package]] +name = "itoa" +version = "1.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62b02a5381cc465bd3041d84623d0fa3b66738b52b8e2fc3bab8ad63ab032f4a" + +[[package]] +name = "json5" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96b0db21af676c1ce64250b5f40f3ce2cf27e4e47cb91ed91eb6fe9350b430c1" +dependencies = [ + "pest", + "pest_derive", + "serde", +] + +[[package]] +name = "lazy_static" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" + +[[package]] +name = "libc" +version = "0.2.147" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4668fb0ea861c1df094127ac5f1da3409a82116a4ba74fca2e58ef927159bb3" + +[[package]] +name = "linked-hash-map" +version = "0.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0717cef1bc8b636c6e1c1bbdefc09e6322da8a9321966e8928ef80d20f7f770f" + +[[package]] +name = "linux-raw-sys" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef53942eb7bf7ff43a617b3e2c1c4a5ecf5944a7c1bc12d7ee39bbb15e5c1519" + +[[package]] +name = "linux-raw-sys" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09fc20d2ca12cb9f044c93e3bd6d32d523e6e2ec3db4f7b2939cd99026ecd3f0" + +[[package]] +name = "log" +version = "0.4.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b06a4cde4c0f271a446782e3eff8de789548ce57dbc8eca9292c27f4a42004b4" + +[[package]] +name = "memchr" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d" + +[[package]] +name = "minimal-lexical" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" + +[[package]] +name = "nom" +version = "7.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d273983c5a657a70a3e8f2a01329822f3b8c8172b73826411a55751e404a0a4a" +dependencies = [ + "memchr", + "minimal-lexical", +] + +[[package]] +name = "num_cpus" +version = "1.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43" +dependencies = [ + "hermit-abi", + "libc", +] + +[[package]] +name = "once_cell" +version = "1.18.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dd8b5dd2ae5ed71462c540258bedcb51965123ad7e7ccf4b9a8cafaa4a63576d" + +[[package]] +name = "ordered-multimap" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ccd746e37177e1711c20dd619a1620f34f5c8b569c53590a72dedd5344d8924a" +dependencies = [ + "dlv-list", + "hashbrown 0.12.3", +] + +[[package]] +name = "pathdiff" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8835116a5c179084a830efb3adc117ab007512b535bc1a21c991d3b32a6b44dd" + +[[package]] +name = "pest" +version = "2.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d2d1d55045829d65aad9d389139882ad623b33b904e7c9f1b10c5b8927298e5" +dependencies = [ + "thiserror", + "ucd-trie", +] + +[[package]] +name = "pest_derive" +version = "2.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aef623c9bbfa0eedf5a0efba11a5ee83209c326653ca31ff019bec3a95bfff2b" +dependencies = [ + "pest", + "pest_generator", +] + +[[package]] +name = "pest_generator" +version = "2.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b3e8cba4ec22bada7fc55ffe51e2deb6a0e0db2d0b7ab0b103acc80d2510c190" +dependencies = [ + "pest", + "pest_meta", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "pest_meta" +version = "2.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a01f71cb40bd8bb94232df14b946909e14660e33fc05db3e50ae2a82d7ea0ca0" +dependencies = [ + "once_cell", + "pest", + "sha2", +] + +[[package]] +name = "pin-project-lite" +version = "0.2.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c40d25201921e5ff0c862a505c6557ea88568a4e3ace775ab55e93f2f4f9d57" + +[[package]] +name = "pin-utils" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" + +[[package]] +name = "proc-macro2" +version = "1.0.63" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b368fba921b0dce7e60f5e04ec15e565b3303972b42bcfde1d0713b881959eb" +dependencies = [ + "unicode-ident", +] + +[[package]] +name = "quote" +version = "1.0.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "573015e8ab27661678357f27dc26460738fd2b6c86e46f386fde94cb5d913105" +dependencies = [ + "proc-macro2", +] + +[[package]] +name = "redox_syscall" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "567664f262709473930a4bf9e51bf2ebf3348f2e748ccc50dea20646858f8f29" +dependencies = [ + "bitflags 1.3.2", +] + +[[package]] +name = "regex" +version = "1.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2eae68fc220f7cf2532e4494aded17545fce192d59cd996e0fe7887f4ceb575" +dependencies = [ + "aho-corasick", + "memchr", + "regex-automata", + "regex-syntax", +] + +[[package]] +name = "regex-automata" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "83d3daa6976cffb758ec878f108ba0e062a45b2d6ca3a2cca965338855476caf" +dependencies = [ + "aho-corasick", + "memchr", + "regex-syntax", +] + +[[package]] +name = "regex-syntax" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5ea92a5b6195c6ef2a0295ea818b312502c6fc94dde986c5553242e18fd4ce2" + +[[package]] +name = "ron" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "88073939a61e5b7680558e6be56b419e208420c2adb92be54921fa6b72283f1a" +dependencies = [ + "base64", + "bitflags 1.3.2", + "serde", +] + +[[package]] +name = "rust-ini" +version = "0.18.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6d5f2436026b4f6e79dc829837d467cc7e9a55ee40e750d716713540715a2df" +dependencies = [ + "cfg-if", + "ordered-multimap", +] + +[[package]] +name = "rustix" +version = "0.37.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4d69718bf81c6127a49dc64e44a742e8bb9213c0ff8869a22c308f84c1d4ab06" +dependencies = [ + "bitflags 1.3.2", + "errno", + "io-lifetimes", + "libc", + "linux-raw-sys 0.3.8", + "windows-sys", +] + +[[package]] +name = "rustix" +version = "0.38.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac5ffa1efe7548069688cd7028f32591853cd7b5b756d41bcffd2353e4fc75b4" +dependencies = [ + "bitflags 2.3.3", + "errno", + "libc", + "linux-raw-sys 0.4.3", + "windows-sys", +] + +[[package]] +name = "ryu" +version = "1.0.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ad4cc8da4ef723ed60bced201181d83791ad433213d8c24efffda1eec85d741" + +[[package]] +name = "serde" +version = "1.0.168" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d614f89548720367ded108b3c843be93f3a341e22d5674ca0dd5cd57f34926af" +dependencies = [ + "serde_derive", +] + +[[package]] +name = "serde_derive" +version = "1.0.168" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d4fe589678c688e44177da4f27152ee2d190757271dc7f1d5b6b9f68d869d641" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "serde_json" +version = "1.0.100" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f1e14e89be7aa4c4b78bdbdc9eb5bf8517829a600ae8eaa39a6e1d960b5185c" +dependencies = [ + "itoa", + "ryu", + "serde", +] + +[[package]] +name = "serde_yaml" +version = "0.9.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "452e67b9c20c37fa79df53201dc03839651086ed9bbe92b3ca585ca9fdaa7d85" +dependencies = [ + "indexmap", + "itoa", + "ryu", + "serde", + "unsafe-libyaml", +] + +[[package]] +name = "sha2" +version = "0.10.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "479fb9d862239e610720565ca91403019f2f00410f1864c5aa7479b950a76ed8" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest", +] + +[[package]] +name = "slab" +version = "0.4.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6528351c9bc8ab22353f9d776db39a20288e8d6c37ef8cfe3317cf875eecfc2d" +dependencies = [ + "autocfg", +] + +[[package]] +name = "strsim" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" + +[[package]] +name = "syn" +version = "2.0.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59fb7d6d8281a51045d62b8eb3a7d1ce347b76f312af50cd3dc0af39c87c1737" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + +[[package]] +name = "tempfile" +version = "3.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "31c0432476357e58790aaa47a8efb0c5138f137343f3b5f23bd36a27e3b0a6d6" +dependencies = [ + "autocfg", + "cfg-if", + "fastrand", + "redox_syscall", + "rustix 0.37.23", + "windows-sys", +] + +[[package]] +name = "termcolor" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be55cf8942feac5c765c2c993422806843c9a9a45d4d5c407ad6dd2ea95eb9b6" +dependencies = [ + "winapi-util", +] + +[[package]] +name = "thiserror" +version = "1.0.41" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c16a64ba9387ef3fdae4f9c1a7f07a0997fce91985c0336f1ddc1822b3b37802" +dependencies = [ + "thiserror-impl", +] + +[[package]] +name = "thiserror-impl" +version = "1.0.41" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d14928354b01c4d6a4f0e549069adef399a284e7995c7ccca94e8a07a5346c59" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "toml" +version = "0.5.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f4f7f0dd8d50a853a531c426359045b1998f04219d88799810762cd4ad314234" +dependencies = [ + "serde", +] + +[[package]] +name = "typenum" +version = "1.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "497961ef93d974e23eb6f433eb5fe1b7930b659f06d12dec6fc44a8f554c0bba" + +[[package]] +name = "ucd-trie" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed646292ffc8188ef8ea4d1e0e0150fb15a5c2e12ad9b8fc191ae7a8a7f3c4b9" + +[[package]] +name = "unicode-ident" +version = "1.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "301abaae475aa91687eb82514b328ab47a211a533026cb25fc3e519b86adfc3c" + +[[package]] +name = "unsafe-libyaml" +version = "0.2.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1865806a559042e51ab5414598446a5871b561d21b6764f2eabb0dd481d880a6" + +[[package]] +name = "utf8parse" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a" + +[[package]] +name = "version_check" +version = "0.9.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" + +[[package]] +name = "vhost" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73832f4d8d636d63d9b145e8ef22a2c50b93f4d24eb7a99c9e6781b1b08549cf" +dependencies = [ + "bitflags 1.3.2", + "libc", + "vm-memory", + "vmm-sys-util", +] + +[[package]] +name = "vhost-device-vsock" +version = "0.1.0" +dependencies = [ + "byteorder", + "clap", + "config", + "env_logger", + "epoll", + "futures", + "log", + "serde", + "serde_yaml", + "tempfile", + "thiserror", + "vhost", + "vhost-user-backend", + "virtio-bindings", + "virtio-queue", + "virtio-vsock", + "vm-memory", + "vmm-sys-util", +] + +[[package]] +name = "vhost-user-backend" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3ea9d5e8ec847cde4df1c04e586698a479706fd6beca37323f9d425b24b4c2f" +dependencies = [ + "libc", + "log", + "vhost", + "virtio-bindings", + "virtio-queue", + "vm-memory", + "vmm-sys-util", +] + +[[package]] +name = "virtio-bindings" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c18d7b74098a946470ea265b5bacbbf877abc3373021388454de0d47735a5b98" + +[[package]] +name = "virtio-queue" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "35aca00da06841bd99162c381ec65893cace23ca0fb89254302cfe4bec4c300f" +dependencies = [ + "log", + "virtio-bindings", + "vm-memory", + "vmm-sys-util", +] + +[[package]] +name = "virtio-vsock" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c92d1d0c0db339e03dc275e86e5de2654ed94b351f02d405a3a0260dfc1b839f" +dependencies = [ + "virtio-bindings", + "virtio-queue", + "vm-memory", +] + +[[package]] +name = "vm-memory" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a77c7a0891cbac53618f5f6eec650ed1dc4f7e506bbe14877aff49d94b8408b0" +dependencies = [ + "arc-swap", + "bitflags 1.3.2", + "libc", + "thiserror", + "vmm-sys-util", + "winapi", +] + +[[package]] +name = "vmm-sys-util" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dd64fe09d8e880e600c324e7d664760a17f56e9672b7495a86381b49e4f72f46" +dependencies = [ + "bitflags 1.3.2", + "libc", +] + +[[package]] +name = "wasi" +version = "0.11.0+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" + +[[package]] +name = "winapi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" +dependencies = [ + "winapi-i686-pc-windows-gnu", + "winapi-x86_64-pc-windows-gnu", +] + +[[package]] +name = "winapi-i686-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" + +[[package]] +name = "winapi-util" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178" +dependencies = [ + "winapi", +] + +[[package]] +name = "winapi-x86_64-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" + +[[package]] +name = "windows-sys" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9" +dependencies = [ + "windows-targets", +] + +[[package]] +name = "windows-targets" +version = "0.48.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05d4b17490f70499f20b9e791dcf6a299785ce8af4d709018206dc5b4953e95f" +dependencies = [ + "windows_aarch64_gnullvm", + "windows_aarch64_msvc", + "windows_i686_gnu", + "windows_i686_msvc", + "windows_x86_64_gnu", + "windows_x86_64_gnullvm", + "windows_x86_64_msvc", +] + +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91ae572e1b79dba883e0d315474df7305d12f569b400fcf90581b06062f7e1bc" + +[[package]] +name = "windows_aarch64_msvc" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2ef27e0d7bdfcfc7b868b317c1d32c641a6fe4629c171b8928c7b08d98d7cf3" + +[[package]] +name = "windows_i686_gnu" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "622a1962a7db830d6fd0a69683c80a18fda201879f0f447f065a3b7467daa241" + +[[package]] +name = "windows_i686_msvc" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4542c6e364ce21bf45d69fdd2a8e455fa38d316158cfd43b3ac1c5b1b19f8e00" + +[[package]] +name = "windows_x86_64_gnu" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca2b8a661f7628cbd23440e50b05d705db3686f894fc9580820623656af974b1" + +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7896dbc1f41e08872e9d5e8f8baa8fdd2677f29468c4e156210174edc7f7b953" + +[[package]] +name = "windows_x86_64_msvc" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a515f5799fe4961cb532f983ce2b23082366b898e52ffbce459c86f67c8378a" + +[[package]] +name = "yaml-rust" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "56c1936c4cc7a1c9ab21a1ebb602eb942ba868cbd44a99cb7cdc5892335e1c85" +dependencies = [ + "linked-hash-map", +] diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..ec30591 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,95 @@ +# THIS FILE IS AUTOMATICALLY GENERATED BY CARGO +# +# When uploading crates to the registry Cargo will automatically +# "normalize" Cargo.toml files for maximal compatibility +# with all versions of Cargo and also rewrite `path` dependencies +# to registry (e.g., crates.io) dependencies. +# +# If you are reading this file be aware that the original Cargo.toml +# will likely look very different (and much more reasonable). +# See Cargo.toml.orig for the original contents. + +[package] +edition = "2018" +name = "vhost-device-vsock" +version = "0.1.0" +authors = [ + "Harshavardhan Unnibhavi <harshanavkis@gmail.com>", + "Stefano Garzarella <sgarzare@redhat.com>", +] +description = "A virtio-vsock device using the vhost-user protocol." +readme = "README.md" +keywords = [ + "vhost", + "vsock", +] +license = "Apache-2.0 OR BSD-3-Clause" +repository = "https://github.com/rust-vmm/vhost-device" + +[dependencies.byteorder] +version = "1" + +[dependencies.clap] +version = "4.3" +features = ["derive"] + +[dependencies.config] +version = "0.13" + +[dependencies.env_logger] +version = "0.10" + +[dependencies.epoll] +version = "4.3.2" + +[dependencies.futures] +version = "0.3" +features = ["thread-pool"] + +[dependencies.log] +version = "0.4" + +[dependencies.serde] +version = "1" + +[dependencies.serde_yaml] +version = "0.9" + +[dependencies.thiserror] +version = "1.0" + +[dependencies.vhost] +version = "0.8" +features = ["vhost-user-slave"] + +[dependencies.vhost-user-backend] +version = "0.10" + +[dependencies.virtio-bindings] +version = "0.2.1" + +[dependencies.virtio-queue] +version = "0.9" + +[dependencies.virtio-vsock] +version = "0.3.1" + +[dependencies.vm-memory] +version = "0.12" + +[dependencies.vmm-sys-util] +version = "0.11" + +[dev-dependencies.tempfile] +version = "3.6.0" + +[dev-dependencies.virtio-queue] +version = "0.9" +features = ["test-utils"] + +[features] +xen = [ + "vm-memory/xen", + "vhost/xen", + "vhost-user-backend/xen", +] diff --git a/Cargo.toml.orig b/Cargo.toml.orig new file mode 100644 index 0000000..2b37c56 --- /dev/null +++ b/Cargo.toml.orig @@ -0,0 +1,36 @@ +[package] +name = "vhost-device-vsock" +version = "0.1.0" +authors = ["Harshavardhan Unnibhavi <harshanavkis@gmail.com>", "Stefano Garzarella <sgarzare@redhat.com>"] +description = "A virtio-vsock device using the vhost-user protocol." +repository = "https://github.com/rust-vmm/vhost-device" +readme = "README.md" +keywords = ["vhost", "vsock"] +license = "Apache-2.0 OR BSD-3-Clause" +edition = "2018" + +[features] +xen = ["vm-memory/xen", "vhost/xen", "vhost-user-backend/xen"] + +[dependencies] +byteorder = "1" +clap = { version = "4.3", features = ["derive"] } +env_logger = "0.10" +epoll = "4.3.2" +futures = { version = "0.3", features = ["thread-pool"] } +log = "0.4" +thiserror = "1.0" +vhost = { version = "0.8", features = ["vhost-user-slave"] } +vhost-user-backend = "0.10" +virtio-bindings = "0.2.1" +virtio-queue = "0.9" +virtio-vsock = "0.3.1" +vm-memory = "0.12" +vmm-sys-util = "0.11" +config = "0.13" +serde = "1" +serde_yaml = "0.9" + +[dev-dependencies] +virtio-queue = { version = "0.9", features = ["test-utils"] } +tempfile = "3.6.0" @@ -0,0 +1,228 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +Copyright 2022 The rust-vmm authors. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + +1. Redistributions of source code must retain the above copyright notice, this +list of conditions and the following disclaimer. + +2. Redistributions in binary form must reproduce the above copyright notice, +this list of conditions and the following disclaimer in the documentation +and/or other materials provided with the distribution. + +3. Neither the name of the copyright holder nor the names of its contributors +may be used to endorse or promote products derived from this software without +specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/LICENSE-APACHE b/LICENSE-APACHE new file mode 100644 index 0000000..d645695 --- /dev/null +++ b/LICENSE-APACHE @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/LICENSE-BSD-3-Clause b/LICENSE-BSD-3-Clause new file mode 100644 index 0000000..dd975d9 --- /dev/null +++ b/LICENSE-BSD-3-Clause @@ -0,0 +1,26 @@ +Copyright 2022 The rust-vmm authors. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + +1. Redistributions of source code must retain the above copyright notice, this +list of conditions and the following disclaimer. + +2. Redistributions in binary form must reproduce the above copyright notice, +this list of conditions and the following disclaimer in the documentation +and/or other materials provided with the distribution. + +3. Neither the name of the copyright holder nor the names of its contributors +may be used to endorse or promote products derived from this software without +specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/METADATA b/METADATA new file mode 100644 index 0000000..456b5a6 --- /dev/null +++ b/METADATA @@ -0,0 +1,19 @@ +name: "vhost-device-vsock" +description: "A virtio-vsock device using the vhost-user protocol." +third_party { + url { + type: HOMEPAGE + value: "https://crates.io/crates/vhost-device-vsock" + } + url { + type: ARCHIVE + value: "https://static.crates.io/crates/vhost-device-vsock/vhost-device-vsock-0.1.0.crate" + } + version: "0.1.0" + license_type: NOTICE + last_upgrade_date { + year: 2023 + month: 7 + day: 26 + } +} diff --git a/MODULE_LICENSE_APACHE2 b/MODULE_LICENSE_APACHE2 new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/MODULE_LICENSE_APACHE2 @@ -0,0 +1,2 @@ +include platform/prebuilts/rust:master:/OWNERS +jeongik@google.com diff --git a/README.md b/README.md new file mode 100644 index 0000000..f4e946d --- /dev/null +++ b/README.md @@ -0,0 +1,178 @@ +# vhost-device-vsock + +## Design + +The crate introduces a vhost-device-vsock device that enables communication between an +application running in the guest i.e inside a VM and an application running on the +host i.e outside the VM. The application running in the guest communicates over VM +sockets i.e over AF_VSOCK sockets. The application running on the host connects to a +unix socket on the host i.e communicates over AF_UNIX sockets. The main components of +the crate are split into various files as described below: + +- [packet.rs](src/packet.rs) + - Introduces the **VsockPacket** structure that represents a single vsock packet + processing methods. +- [rxops.rs](src/rxops.rs) + - Introduces various vsock operations that are enqueued into the rxqueue to be sent to the + guest. Exposes a **RxOps** structure. +- [rxqueue.rs](src/rxqueue.rs) + - rxqueue contains the pending rx operations corresponding to that connection. The queue is + represented as a bitmap as we handle connection-oriented connections. The module contains + various queue manipulation methods. Exposes a **RxQueue** structure. +- [thread_backend.rs](src/thread_backend.rs) + - Multiplexes connections between host and guest and calls into per connection methods that + are responsible for processing data and packets corresponding to the connection. Exposes a + **VsockThreadBackend** structure. +- [txbuf.rs](src/txbuf.rs) + - Module to buffer data that is sent from the guest to the host. The module exposes a **LocalTxBuf** + structure. +- [vhost_user_vsock_thread.rs](src/vhost_user_vsock_thread.rs) + - Module exposes a **VhostUserVsockThread** structure. It also handles new host initiated + connections and provides interfaces for registering host connections with the epoll fd. Also + provides interfaces for iterating through the rx and tx queues. +- [vsock_conn.rs](src/vsock_conn.rs) + - Module introduces a **VsockConnection** structure that represents a single vsock connection + between the guest and the host. It also processes packets according to their type. +- [vhu_vsock.rs](src/vhu_vsock.rs) + - exposes the main vhost user vsock backend interface. + +## Usage + +Run the vhost-device-vsock device: +``` +vhost-device-vsock --guest-cid=<CID assigned to the guest> \ + --socket=<path to the Unix socket to be created to communicate with the VMM via the vhost-user protocol> \ + --uds-path=<path to the Unix socket to communicate with the guest via the virtio-vsock device> \ + [--tx-buffer-size=<size of the buffer used for the TX virtqueue (guest->host packets)>] +``` +or +``` +vhost-device-vsock --vm guest_cid=<CID assigned to the guest>,socket=<path to the Unix socket to be created to communicate with the VMM via the vhost-user protocol>,uds-path=<path to the Unix socket to communicate with the guest via the virtio-vsock device>[,tx-buffer-size=<size of the buffer used for the TX virtqueue (guest->host packets)>] +``` + +Specify the `--vm` argument multiple times to specify multiple devices like this: +``` +vhost-device-vsock \ +--vm guest-cid=3,socket=/tmp/vhost3.socket,uds-path=/tmp/vm3.vsock \ +--vm guest-cid=4,socket=/tmp/vhost4.socket,uds-path=/tmp/vm4.vsock,tx-buffer-size=32768 +``` + +Or use a configuration file: +``` +vhost-device-vsock --config=<path to the local yaml configuration file> +``` + +Configuration file example: +```yaml +vms: + - guest_cid: 3 + socket: /tmp/vhost3.socket + uds_path: /tmp/vm3.sock + tx_buffer_size: 65536 + - guest_cid: 4 + socket: /tmp/vhost4.socket + uds_path: /tmp/vm4.sock + tx_buffer_size: 32768 +``` + +Run VMM (e.g. QEMU): + +``` +qemu-system-x86_64 \ + <normal QEMU options> \ + -object memory-backend-file,share=on,id=mem0,size=<Guest RAM size>,mem-path=<Guest RAM file path> \ # size == -m size + -machine <machine options>,memory-backend=mem0 \ + -chardev socket,id=char0,reconnect=0,path=<vhost-user socket path> \ + -device vhost-user-vsock-pci,chardev=char0 +``` + +## Working example + +```sh +shell1$ vhost-device-vsock --vm guest-cid=4,uds-path=/tmp/vm4.vsock,socket=/tmp/vhost4.socket +``` +or if you want to configure the TX buffer size +```sh +shell1$ vhost-device-vsock --vm guest-cid=4,uds-path=/tmp/vm4.vsock,socket=/tmp/vhost4.socket,tx-buffer-size=65536 +``` + +```sh +shell2$ qemu-system-x86_64 \ + -drive file=vm.qcow2,format=qcow2,if=virtio -smp 2 -m 512M -mem-prealloc \ + -object memory-backend-file,share=on,id=mem0,size=512M,mem-path="/dev/hugepages" \ + -machine q35,accel=kvm,memory-backend=mem0 \ + -chardev socket,id=char0,reconnect=0,path=/tmp/vhost4.socket \ + -device vhost-user-vsock-pci,chardev=char0 +``` + +### Guest listening + +#### iperf + +```sh +# https://github.com/stefano-garzarella/iperf-vsock +guest$ iperf3 --vsock -s +host$ iperf3 --vsock -c /tmp/vm4.vsock +``` + +#### netcat + +```sh +guest$ nc --vsock -l 1234 + +host$ nc -U /tmp/vm4.vsock +CONNECT 1234 +``` + +### Host listening + +#### iperf + +```sh +# https://github.com/stefano-garzarella/iperf-vsock +host$ iperf3 --vsock -s -B /tmp/vm4.vsock +guest$ iperf3 --vsock -c 2 +``` + +#### netcat + +```sh +host$ nc -l -U /tmp/vm4.vsock_1234 + +guest$ nc --vsock 2 1234 +``` + +### Sibling VM communication + +If you add multiple VMs, they can communicate with each other. For example, if you have two VMs with +CID 3 and 4, you can run the following commands to make them communicate: + +```sh +shell1$ vhost-device-vsock --vm guest-cid=3,uds-path=/tmp/vm3.vsock,socket=/tmp/vhost3.socket \ + --vm guest-cid=4,uds-path=/tmp/vm4.vsock,socket=/tmp/vhost4.socket +shell2$ qemu-system-x86_64 \ + -drive file=vm1.qcow2,format=qcow2,if=virtio -smp 2 -m 512M -mem-prealloc \ + -object memory-backend-file,share=on,id=mem0,size=512M,mem-path="/dev/hugepages" \ + -machine q35,accel=kvm,memory-backend=mem0 \ + -chardev socket,id=char0,reconnect=0,path=/tmp/vhost3.socket \ + -device vhost-user-vsock-pci,chardev=char0 +shell3$ qemu-system-x86_64 \ + -drive file=vm2.qcow2,format=qcow2,if=virtio -smp 2 -m 512M -mem-prealloc \ + -object memory-backend-file,share=on,id=mem0,size=512M,mem-path="/dev/hugepages2" \ + -machine q35,accel=kvm,memory-backend=mem0 \ + -chardev socket,id=char0,reconnect=0,path=/tmp/vhost4.socket \ + -device vhost-user-vsock-pci,chardev=char0 +``` + +```sh +# nc-vsock patched to set `.svm_flags = VMADDR_FLAG_TO_HOST` +guest_cid3$ nc-vsock -l 1234 +guest_cid4$ nc-vsock 3 1234 +``` + +## License + +This project is licensed under either of + +- [Apache License](http://www.apache.org/licenses/LICENSE-2.0), Version 2.0 +- [BSD-3-Clause License](https://opensource.org/licenses/BSD-3-Clause) diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000..51f3759 --- /dev/null +++ b/src/main.rs @@ -0,0 +1,471 @@ +// SPDX-License-Identifier: Apache-2.0 or BSD-3-Clause + +mod rxops; +mod rxqueue; +mod thread_backend; +mod txbuf; +mod vhu_vsock; +mod vhu_vsock_thread; +mod vsock_conn; + +use std::{ + collections::HashMap, + convert::TryFrom, + process::exit, + sync::{Arc, RwLock}, + thread, +}; + +use crate::vhu_vsock::{CidMap, VhostUserVsockBackend, VsockConfig}; +use clap::{Args, Parser}; +use log::{error, info, warn}; +use serde::Deserialize; +use thiserror::Error as ThisError; +use vhost::{vhost_user, vhost_user::Listener}; +use vhost_user_backend::VhostUserDaemon; +use vm_memory::{GuestMemoryAtomic, GuestMemoryMmap}; + +const DEFAULT_GUEST_CID: u64 = 3; +const DEFAULT_TX_BUFFER_SIZE: u32 = 64 * 1024; + +#[derive(Debug, ThisError)] +enum CliError { + #[error("No arguments provided")] + NoArgsProvided, + #[error("Failed to parse configuration file")] + ConfigParse, +} + +#[derive(Debug, ThisError)] +enum VmArgsParseError { + #[error("Bad argument")] + BadArgument, + #[error("Invalid key `{0}`")] + InvalidKey(String), + #[error("Unable to convert string to integer: {0}")] + ParseInteger(std::num::ParseIntError), + #[error("Required key `{0}` not found")] + RequiredKeyNotFound(String), +} + +#[derive(Debug, ThisError)] +enum BackendError { + #[error("Could not create backend: {0}")] + CouldNotCreateBackend(vhu_vsock::Error), + #[error("Could not create daemon: {0}")] + CouldNotCreateDaemon(vhost_user_backend::Error), +} + +#[derive(Args, Clone, Debug, Deserialize)] +struct VsockParam { + /// Context identifier of the guest which uniquely identifies the device for its lifetime. + #[arg( + long, + default_value_t = DEFAULT_GUEST_CID, + conflicts_with = "config", + conflicts_with = "vm" + )] + guest_cid: u64, + + /// Unix socket to which a hypervisor connects to and sets up the control path with the device. + #[arg(long, conflicts_with = "config", conflicts_with = "vm")] + socket: String, + + /// Unix socket to which a host-side application connects to. + #[arg(long, conflicts_with = "config", conflicts_with = "vm")] + uds_path: String, + + /// The size of the buffer used for the TX virtqueue + #[clap(long, default_value_t = DEFAULT_TX_BUFFER_SIZE, conflicts_with = "config", conflicts_with = "vm")] + tx_buffer_size: u32, +} + +#[derive(Parser, Debug)] +#[command(version, about, long_about = None)] +struct VsockArgs { + #[command(flatten)] + param: Option<VsockParam>, + + /// Device parameters corresponding to a VM in the form of comma separated key=value pairs. + /// The allowed keys are: guest_cid, socket, uds_path and tx_buffer_size + /// Example: --vm guest-cid=3,socket=/tmp/vhost3.socket,uds-path=/tmp/vm3.vsock,tx-buffer-size=65536 + /// Multiple instances of this argument can be provided to configure devices for multiple guests. + #[arg(long, conflicts_with = "config", verbatim_doc_comment, value_parser = parse_vm_params)] + vm: Option<Vec<VsockConfig>>, + + /// Load from a given configuration file + #[arg(long)] + config: Option<String>, +} + +fn parse_vm_params(s: &str) -> Result<VsockConfig, VmArgsParseError> { + let mut guest_cid = None; + let mut socket = None; + let mut uds_path = None; + let mut tx_buffer_size = None; + + for arg in s.trim().split(',') { + let mut parts = arg.split('='); + let key = parts.next().ok_or(VmArgsParseError::BadArgument)?; + let val = parts.next().ok_or(VmArgsParseError::BadArgument)?; + + match key { + "guest_cid" | "guest-cid" => { + guest_cid = Some(val.parse().map_err(VmArgsParseError::ParseInteger)?) + } + "socket" => socket = Some(val.to_string()), + "uds_path" | "uds-path" => uds_path = Some(val.to_string()), + "tx_buffer_size" | "tx-buffer-size" => { + tx_buffer_size = Some(val.parse().map_err(VmArgsParseError::ParseInteger)?) + } + _ => return Err(VmArgsParseError::InvalidKey(key.to_string())), + } + } + + Ok(VsockConfig::new( + guest_cid.unwrap_or(DEFAULT_GUEST_CID), + socket.ok_or_else(|| VmArgsParseError::RequiredKeyNotFound("socket".to_string()))?, + uds_path.ok_or_else(|| VmArgsParseError::RequiredKeyNotFound("uds-path".to_string()))?, + tx_buffer_size.unwrap_or(DEFAULT_TX_BUFFER_SIZE), + )) +} + +impl VsockArgs { + pub fn parse_config(&self) -> Option<Result<Vec<VsockConfig>, CliError>> { + if let Some(c) = &self.config { + let b = config::Config::builder() + .add_source(config::File::new(c.as_str(), config::FileFormat::Yaml)) + .build(); + if let Ok(s) = b { + let mut v = s.get::<Vec<VsockParam>>("vms").unwrap(); + if !v.is_empty() { + let parsed: Vec<VsockConfig> = v + .drain(..) + .map(|p| { + VsockConfig::new( + p.guest_cid, + p.socket.trim().to_string(), + p.uds_path.trim().to_string(), + p.tx_buffer_size, + ) + }) + .collect(); + return Some(Ok(parsed)); + } else { + return Some(Err(CliError::ConfigParse)); + } + } else { + return Some(Err(CliError::ConfigParse)); + } + } + None + } +} + +impl TryFrom<VsockArgs> for Vec<VsockConfig> { + type Error = CliError; + + fn try_from(cmd_args: VsockArgs) -> Result<Self, CliError> { + // we try to use the configuration first, if failed, then fall back to the manual settings. + match cmd_args.parse_config() { + Some(c) => c, + _ => match cmd_args.vm { + Some(v) => Ok(v), + _ => cmd_args.param.map_or(Err(CliError::NoArgsProvided), |p| { + Ok(vec![VsockConfig::new( + p.guest_cid, + p.socket.trim().to_string(), + p.uds_path.trim().to_string(), + p.tx_buffer_size, + )]) + }), + }, + } + } +} + +/// This is the public API through which an external program starts the +/// vhost-device-vsock backend server. +pub(crate) fn start_backend_server( + config: VsockConfig, + cid_map: Arc<RwLock<CidMap>>, +) -> Result<(), BackendError> { + loop { + let backend = Arc::new( + VhostUserVsockBackend::new(config.clone(), cid_map.clone()) + .map_err(BackendError::CouldNotCreateBackend)?, + ); + + let listener = Listener::new(config.get_socket_path(), true).unwrap(); + + let mut daemon = VhostUserDaemon::new( + String::from("vhost-device-vsock"), + backend.clone(), + GuestMemoryAtomic::new(GuestMemoryMmap::new()), + ) + .map_err(BackendError::CouldNotCreateDaemon)?; + + let mut vring_workers = daemon.get_epoll_handlers(); + + for thread in backend.threads.iter() { + thread + .lock() + .unwrap() + .set_vring_worker(Some(vring_workers.remove(0))); + } + + daemon.start(listener).unwrap(); + + match daemon.wait() { + Ok(()) => { + info!("Stopping cleanly"); + } + Err(vhost_user_backend::Error::HandleRequest( + vhost_user::Error::PartialMessage | vhost_user::Error::Disconnected, + )) => { + info!("vhost-user connection closed with partial message. If the VM is shutting down, this is expected behavior; otherwise, it might be a bug."); + } + Err(e) => { + warn!("Error running daemon: {:?}", e); + } + } + + // No matter the result, we need to shut down the worker thread. + backend.exit_event.write(1).unwrap(); + } +} + +pub(crate) fn start_backend_servers(configs: &[VsockConfig]) -> Result<(), BackendError> { + let cid_map: Arc<RwLock<CidMap>> = Arc::new(RwLock::new(HashMap::new())); + let mut handles = Vec::new(); + + for c in configs.iter() { + let config = c.clone(); + let cid_map = cid_map.clone(); + let handle = thread::Builder::new() + .name(format!("vhu-vsock-cid-{}", c.get_guest_cid())) + .spawn(move || start_backend_server(config, cid_map)) + .unwrap(); + handles.push(handle); + } + + for handle in handles { + handle.join().unwrap()?; + } + + Ok(()) +} + +fn main() { + env_logger::init(); + + let configs = match Vec::<VsockConfig>::try_from(VsockArgs::parse()) { + Ok(c) => c, + Err(e) => { + println!("Error parsing arguments: {}", e); + return; + } + }; + + if let Err(e) = start_backend_servers(&configs) { + error!("{e}"); + exit(1); + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::fs::File; + use std::io::Write; + use tempfile::tempdir; + + impl VsockArgs { + fn from_args(guest_cid: u64, socket: &str, uds_path: &str, tx_buffer_size: u32) -> Self { + VsockArgs { + param: Some(VsockParam { + guest_cid, + socket: socket.to_string(), + uds_path: uds_path.to_string(), + tx_buffer_size, + }), + vm: None, + config: None, + } + } + fn from_file(config: &str) -> Self { + VsockArgs { + param: None, + vm: None, + config: Some(config.to_string()), + } + } + } + + #[test] + fn test_vsock_config_setup() { + let test_dir = tempdir().expect("Could not create a temp test directory."); + + let socket_path = test_dir.path().join("vhost4.socket").display().to_string(); + let uds_path = test_dir.path().join("vm4.vsock").display().to_string(); + let args = VsockArgs::from_args(3, &socket_path, &uds_path, 64 * 1024); + + let configs = Vec::<VsockConfig>::try_from(args); + assert!(configs.is_ok()); + + let configs = configs.unwrap(); + assert_eq!(configs.len(), 1); + + let config = &configs[0]; + assert_eq!(config.get_guest_cid(), 3); + assert_eq!(config.get_socket_path(), socket_path); + assert_eq!(config.get_uds_path(), uds_path); + assert_eq!(config.get_tx_buffer_size(), 64 * 1024); + + test_dir.close().unwrap(); + } + + #[test] + fn test_vsock_config_setup_from_vm_args() { + let test_dir = tempdir().expect("Could not create a temp test directory."); + + let socket_paths = [ + test_dir.path().join("vhost3.socket"), + test_dir.path().join("vhost4.socket"), + test_dir.path().join("vhost5.socket"), + ]; + let uds_paths = [ + test_dir.path().join("vm3.vsock"), + test_dir.path().join("vm4.vsock"), + test_dir.path().join("vm5.vsock"), + ]; + let params = format!( + "--vm socket={vhost3_socket},uds_path={vm3_vsock} \ + --vm socket={vhost4_socket},uds-path={vm4_vsock},guest-cid=4,tx_buffer_size=65536 \ + --vm guest-cid=5,socket={vhost5_socket},uds_path={vm5_vsock},tx-buffer-size=32768", + vhost3_socket = socket_paths[0].display(), + vhost4_socket = socket_paths[1].display(), + vhost5_socket = socket_paths[2].display(), + vm3_vsock = uds_paths[0].display(), + vm4_vsock = uds_paths[1].display(), + vm5_vsock = uds_paths[2].display(), + ); + + let mut params = params.split_whitespace().collect::<Vec<&str>>(); + params.insert(0, ""); // to make the test binary name agnostic + + let args = VsockArgs::parse_from(params); + + let configs = Vec::<VsockConfig>::try_from(args); + assert!(configs.is_ok()); + + let configs = configs.unwrap(); + assert_eq!(configs.len(), 3); + + let config = configs.get(0).unwrap(); + assert_eq!(config.get_guest_cid(), 3); + assert_eq!( + config.get_socket_path(), + socket_paths[0].display().to_string() + ); + assert_eq!(config.get_uds_path(), uds_paths[0].display().to_string()); + assert_eq!(config.get_tx_buffer_size(), 65536); + + let config = configs.get(1).unwrap(); + assert_eq!(config.get_guest_cid(), 4); + assert_eq!( + config.get_socket_path(), + socket_paths[1].display().to_string() + ); + assert_eq!(config.get_uds_path(), uds_paths[1].display().to_string()); + assert_eq!(config.get_tx_buffer_size(), 65536); + + let config = configs.get(2).unwrap(); + assert_eq!(config.get_guest_cid(), 5); + assert_eq!( + config.get_socket_path(), + socket_paths[2].display().to_string() + ); + assert_eq!(config.get_uds_path(), uds_paths[2].display().to_string()); + assert_eq!(config.get_tx_buffer_size(), 32768); + + test_dir.close().unwrap(); + } + + #[test] + fn test_vsock_config_setup_from_file() { + let test_dir = tempdir().expect("Could not create a temp test directory."); + + let config_path = test_dir.path().join("config.yaml"); + let socket_path = test_dir.path().join("vhost4.socket"); + let uds_path = test_dir.path().join("vm4.vsock"); + + let mut yaml = File::create(&config_path).unwrap(); + yaml.write_all( + format!( + "vms: + - guest_cid: 4 + socket: {} + uds_path: {} + tx_buffer_size: 65536", + socket_path.display(), + uds_path.display(), + ) + .as_bytes(), + ) + .unwrap(); + let args = VsockArgs::from_file(&config_path.display().to_string()); + + let configs = Vec::<VsockConfig>::try_from(args).unwrap(); + assert_eq!(configs.len(), 1); + + let config = &configs[0]; + assert_eq!(config.get_guest_cid(), 4); + assert_eq!(config.get_socket_path(), socket_path.display().to_string()); + assert_eq!(config.get_uds_path(), uds_path.display().to_string()); + std::fs::remove_file(&config_path).unwrap(); + + test_dir.close().unwrap(); + } + + #[test] + fn test_vsock_server() { + const CID: u64 = 3; + const CONN_TX_BUF_SIZE: u32 = 64 * 1024; + + let test_dir = tempdir().expect("Could not create a temp test directory."); + + let vhost_socket_path = test_dir + .path() + .join("test_vsock_server.socket") + .display() + .to_string(); + let vsock_socket_path = test_dir + .path() + .join("test_vsock_server.vsock") + .display() + .to_string(); + + let config = VsockConfig::new(CID, vhost_socket_path, vsock_socket_path, CONN_TX_BUF_SIZE); + + let cid_map: Arc<RwLock<CidMap>> = Arc::new(RwLock::new(HashMap::new())); + + let backend = Arc::new(VhostUserVsockBackend::new(config, cid_map).unwrap()); + + let daemon = VhostUserDaemon::new( + String::from("vhost-device-vsock"), + backend.clone(), + GuestMemoryAtomic::new(GuestMemoryMmap::new()), + ) + .unwrap(); + + let vring_workers = daemon.get_epoll_handlers(); + + // VhostUserVsockBackend support a single thread that handles the TX and RX queues + assert_eq!(backend.threads.len(), 1); + + assert_eq!(vring_workers.len(), backend.threads.len()); + + test_dir.close().unwrap(); + } +} diff --git a/src/rxops.rs b/src/rxops.rs new file mode 100644 index 0000000..6f20466 --- /dev/null +++ b/src/rxops.rs @@ -0,0 +1,36 @@ +// SPDX-License-Identifier: Apache-2.0 or BSD-3-Clause + +#[derive(Clone, Copy, Eq, PartialEq, Debug)] +pub(crate) enum RxOps { + /// VSOCK_OP_REQUEST + Request = 0, + /// VSOCK_OP_RW + Rw = 1, + /// VSOCK_OP_RESPONSE + Response = 2, + /// VSOCK_OP_CREDIT_UPDATE + CreditUpdate = 3, + /// VSOCK_OP_RST + Reset = 4, +} + +impl RxOps { + /// Convert enum value into bitmask. + pub fn bitmask(self) -> u8 { + 1u8 << (self as u8) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_bitmask() { + assert_eq!(1, RxOps::Request.bitmask()); + assert_eq!(2, RxOps::Rw.bitmask()); + assert_eq!(4, RxOps::Response.bitmask()); + assert_eq!(8, RxOps::CreditUpdate.bitmask()); + assert_eq!(16, RxOps::Reset.bitmask()); + } +} diff --git a/src/rxqueue.rs b/src/rxqueue.rs new file mode 100644 index 0000000..4b76727 --- /dev/null +++ b/src/rxqueue.rs @@ -0,0 +1,157 @@ +// SPDX-License-Identifier: Apache-2.0 or BSD-3-Clause + +use crate::rxops::RxOps; + +#[derive(Debug, Eq, PartialEq)] +pub(crate) struct RxQueue { + /// Bitmap of rx operations. + queue: u8, +} + +impl RxQueue { + /// New instance of RxQueue. + pub fn new() -> Self { + RxQueue { queue: 0_u8 } + } + + /// Enqueue a new rx operation into the queue. + pub fn enqueue(&mut self, op: RxOps) { + self.queue |= op.bitmask(); + } + + /// Dequeue an rx operation from the queue. + pub fn dequeue(&mut self) -> Option<RxOps> { + match self.peek() { + Some(req) => { + self.queue &= !req.bitmask(); + Some(req) + } + None => None, + } + } + + /// Peek into the queue to check if it contains an rx operation. + pub fn peek(&self) -> Option<RxOps> { + if self.contains(RxOps::Request.bitmask()) { + return Some(RxOps::Request); + } + if self.contains(RxOps::Rw.bitmask()) { + return Some(RxOps::Rw); + } + if self.contains(RxOps::Response.bitmask()) { + return Some(RxOps::Response); + } + if self.contains(RxOps::CreditUpdate.bitmask()) { + return Some(RxOps::CreditUpdate); + } + if self.contains(RxOps::Reset.bitmask()) { + Some(RxOps::Reset) + } else { + None + } + } + + /// Check if the queue contains a particular rx operation. + pub fn contains(&self, op: u8) -> bool { + (self.queue & op) != 0 + } + + /// Check if there are any pending rx operations in the queue. + pub fn pending_rx(&self) -> bool { + self.queue != 0 + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_contains() { + let mut rxqueue = RxQueue::new(); + rxqueue.queue = 31; + + assert!(rxqueue.contains(RxOps::Request.bitmask())); + assert!(rxqueue.contains(RxOps::Rw.bitmask())); + assert!(rxqueue.contains(RxOps::Response.bitmask())); + assert!(rxqueue.contains(RxOps::CreditUpdate.bitmask())); + assert!(rxqueue.contains(RxOps::Reset.bitmask())); + + rxqueue.queue = 0; + assert!(!rxqueue.contains(RxOps::Request.bitmask())); + assert!(!rxqueue.contains(RxOps::Rw.bitmask())); + assert!(!rxqueue.contains(RxOps::Response.bitmask())); + assert!(!rxqueue.contains(RxOps::CreditUpdate.bitmask())); + assert!(!rxqueue.contains(RxOps::Reset.bitmask())); + } + + #[test] + fn test_enqueue() { + let mut rxqueue = RxQueue::new(); + + rxqueue.enqueue(RxOps::Request); + assert!(rxqueue.contains(RxOps::Request.bitmask())); + + rxqueue.enqueue(RxOps::Rw); + assert!(rxqueue.contains(RxOps::Rw.bitmask())); + + rxqueue.enqueue(RxOps::Response); + assert!(rxqueue.contains(RxOps::Response.bitmask())); + + rxqueue.enqueue(RxOps::CreditUpdate); + assert!(rxqueue.contains(RxOps::CreditUpdate.bitmask())); + + rxqueue.enqueue(RxOps::Reset); + assert!(rxqueue.contains(RxOps::Reset.bitmask())); + } + + #[test] + fn test_peek() { + let mut rxqueue = RxQueue::new(); + + rxqueue.queue = 31; + assert_eq!(rxqueue.peek(), Some(RxOps::Request)); + + rxqueue.queue = 30; + assert_eq!(rxqueue.peek(), Some(RxOps::Rw)); + + rxqueue.queue = 28; + assert_eq!(rxqueue.peek(), Some(RxOps::Response)); + + rxqueue.queue = 24; + assert_eq!(rxqueue.peek(), Some(RxOps::CreditUpdate)); + + rxqueue.queue = 16; + assert_eq!(rxqueue.peek(), Some(RxOps::Reset)); + } + + #[test] + fn test_dequeue() { + let mut rxqueue = RxQueue::new(); + rxqueue.queue = 31; + + assert_eq!(rxqueue.dequeue(), Some(RxOps::Request)); + assert!(!rxqueue.contains(RxOps::Request.bitmask())); + + assert_eq!(rxqueue.dequeue(), Some(RxOps::Rw)); + assert!(!rxqueue.contains(RxOps::Rw.bitmask())); + + assert_eq!(rxqueue.dequeue(), Some(RxOps::Response)); + assert!(!rxqueue.contains(RxOps::Response.bitmask())); + + assert_eq!(rxqueue.dequeue(), Some(RxOps::CreditUpdate)); + assert!(!rxqueue.contains(RxOps::CreditUpdate.bitmask())); + + assert_eq!(rxqueue.dequeue(), Some(RxOps::Reset)); + assert!(!rxqueue.contains(RxOps::Reset.bitmask())); + } + + #[test] + fn test_pending_rx() { + let mut rxqueue = RxQueue::new(); + assert!(!rxqueue.pending_rx()); + + rxqueue.queue = 1; + assert!(rxqueue.pending_rx()); + } +} diff --git a/src/thread_backend.rs b/src/thread_backend.rs new file mode 100644 index 0000000..5677ec5 --- /dev/null +++ b/src/thread_backend.rs @@ -0,0 +1,506 @@ +// SPDX-License-Identifier: Apache-2.0 or BSD-3-Clause + +use std::{ + collections::{HashMap, HashSet, VecDeque}, + os::unix::{ + net::UnixStream, + prelude::{AsRawFd, RawFd}, + }, + sync::{Arc, RwLock}, +}; + +use log::{info, warn}; +use virtio_vsock::packet::{VsockPacket, PKT_HEADER_SIZE}; +use vm_memory::bitmap::BitmapSlice; + +use crate::{ + rxops::*, + vhu_vsock::{ + CidMap, ConnMapKey, Error, Result, VSOCK_HOST_CID, VSOCK_OP_REQUEST, VSOCK_OP_RST, + VSOCK_TYPE_STREAM, + }, + vhu_vsock_thread::VhostUserVsockThread, + vsock_conn::*, +}; + +pub(crate) type RawPktsQ = VecDeque<RawVsockPacket>; + +pub(crate) struct RawVsockPacket { + pub header: [u8; PKT_HEADER_SIZE], + pub data: Vec<u8>, +} + +impl RawVsockPacket { + fn from_vsock_packet<B: BitmapSlice>(pkt: &VsockPacket<B>) -> Result<Self> { + let mut raw_pkt = Self { + header: [0; PKT_HEADER_SIZE], + data: vec![0; pkt.len() as usize], + }; + + pkt.header_slice().copy_to(&mut raw_pkt.header); + if !pkt.is_empty() { + pkt.data_slice() + .ok_or(Error::PktBufMissing)? + .copy_to(raw_pkt.data.as_mut()); + } + + Ok(raw_pkt) + } +} + +pub(crate) struct VsockThreadBackend { + /// Map of ConnMapKey objects indexed by raw file descriptors. + pub listener_map: HashMap<RawFd, ConnMapKey>, + /// Map of vsock connection objects indexed by ConnMapKey objects. + pub conn_map: HashMap<ConnMapKey, VsockConnection<UnixStream>>, + /// Queue of ConnMapKey objects indicating pending rx operations. + pub backend_rxq: VecDeque<ConnMapKey>, + /// Map of host-side unix streams indexed by raw file descriptors. + pub stream_map: HashMap<i32, UnixStream>, + /// Host side socket for listening to new connections from the host. + host_socket_path: String, + /// epoll for registering new host-side connections. + epoll_fd: i32, + /// CID of the guest. + guest_cid: u64, + /// Set of allocated local ports. + pub local_port_set: HashSet<u32>, + tx_buffer_size: u32, + /// Maps the guest CID to the corresponding backend. Used for sibling VM communication. + pub cid_map: Arc<RwLock<CidMap>>, + /// Queue of raw vsock packets recieved from sibling VMs to be sent to the guest. + pub raw_pkts_queue: Arc<RwLock<RawPktsQ>>, +} + +impl VsockThreadBackend { + /// New instance of VsockThreadBackend. + pub fn new( + host_socket_path: String, + epoll_fd: i32, + guest_cid: u64, + tx_buffer_size: u32, + cid_map: Arc<RwLock<CidMap>>, + ) -> Self { + Self { + listener_map: HashMap::new(), + conn_map: HashMap::new(), + backend_rxq: VecDeque::new(), + // Need this map to prevent connected stream from closing + // TODO: think of a better solution + stream_map: HashMap::new(), + host_socket_path, + epoll_fd, + guest_cid, + local_port_set: HashSet::new(), + tx_buffer_size, + cid_map, + raw_pkts_queue: Arc::new(RwLock::new(VecDeque::new())), + } + } + + /// Checks if there are pending rx requests in the backend rxq. + pub fn pending_rx(&self) -> bool { + !self.backend_rxq.is_empty() + } + + /// Checks if there are pending raw vsock packets to be sent to the guest. + pub fn pending_raw_pkts(&self) -> bool { + !self.raw_pkts_queue.read().unwrap().is_empty() + } + + /// Deliver a vsock packet to the guest vsock driver. + /// + /// Returns: + /// - `Ok(())` if the packet was successfully filled in + /// - `Err(Error::EmptyBackendRxQ) if there was no available data + pub fn recv_pkt<B: BitmapSlice>(&mut self, pkt: &mut VsockPacket<B>) -> Result<()> { + // Pop an event from the backend_rxq + let key = self.backend_rxq.pop_front().ok_or(Error::EmptyBackendRxQ)?; + let conn = match self.conn_map.get_mut(&key) { + Some(conn) => conn, + None => { + // assume that the connection does not exist + return Ok(()); + } + }; + + if conn.rx_queue.peek() == Some(RxOps::Reset) { + // Handle RST events here + let conn = self.conn_map.remove(&key).unwrap(); + self.listener_map.remove(&conn.stream.as_raw_fd()); + self.stream_map.remove(&conn.stream.as_raw_fd()); + self.local_port_set.remove(&conn.local_port); + VhostUserVsockThread::epoll_unregister(conn.epoll_fd, conn.stream.as_raw_fd()) + .unwrap_or_else(|err| { + warn!( + "Could not remove epoll listener for fd {:?}: {:?}", + conn.stream.as_raw_fd(), + err + ) + }); + + // Initialize the packet header to contain a VSOCK_OP_RST operation + pkt.set_op(VSOCK_OP_RST) + .set_src_cid(VSOCK_HOST_CID) + .set_dst_cid(conn.guest_cid) + .set_src_port(conn.local_port) + .set_dst_port(conn.peer_port) + .set_len(0) + .set_type(VSOCK_TYPE_STREAM) + .set_flags(0) + .set_buf_alloc(0) + .set_fwd_cnt(0); + + return Ok(()); + } + + // Handle other packet types per connection + conn.recv_pkt(pkt)?; + + Ok(()) + } + + /// Deliver a guest generated packet to its destination in the backend. + /// + /// Absorbs unexpected packets, handles rest to respective connection + /// object. + /// + /// Returns: + /// - always `Ok(())` if packet has been consumed correctly + pub fn send_pkt<B: BitmapSlice>(&mut self, pkt: &VsockPacket<B>) -> Result<()> { + if pkt.src_cid() != self.guest_cid { + warn!( + "vsock: dropping packet with inconsistent src_cid: {:?} from guest configured with CID: {:?}", + pkt.src_cid(), self.guest_cid + ); + return Ok(()); + } + + let dst_cid = pkt.dst_cid(); + if dst_cid != VSOCK_HOST_CID { + let cid_map = self.cid_map.read().unwrap(); + if cid_map.contains_key(&dst_cid) { + let (sibling_raw_pkts_queue, sibling_event_fd) = cid_map.get(&dst_cid).unwrap(); + + sibling_raw_pkts_queue + .write() + .unwrap() + .push_back(RawVsockPacket::from_vsock_packet(pkt)?); + let _ = sibling_event_fd.write(1); + } else { + warn!("vsock: dropping packet for unknown cid: {:?}", dst_cid); + } + + return Ok(()); + } + + // TODO: Rst if packet has unsupported type + if pkt.type_() != VSOCK_TYPE_STREAM { + info!("vsock: dropping packet of unknown type"); + return Ok(()); + } + + let key = ConnMapKey::new(pkt.dst_port(), pkt.src_port()); + + // TODO: Handle cases where connection does not exist and packet op + // is not VSOCK_OP_REQUEST + if !self.conn_map.contains_key(&key) { + // The packet contains a new connection request + if pkt.op() == VSOCK_OP_REQUEST { + self.handle_new_guest_conn(pkt); + } else { + // TODO: send back RST + } + return Ok(()); + } + + if pkt.op() == VSOCK_OP_RST { + // Handle an RST packet from the guest here + let conn = self.conn_map.get(&key).unwrap(); + if conn.rx_queue.contains(RxOps::Reset.bitmask()) { + return Ok(()); + } + let conn = self.conn_map.remove(&key).unwrap(); + self.listener_map.remove(&conn.stream.as_raw_fd()); + self.stream_map.remove(&conn.stream.as_raw_fd()); + self.local_port_set.remove(&conn.local_port); + VhostUserVsockThread::epoll_unregister(conn.epoll_fd, conn.stream.as_raw_fd()) + .unwrap_or_else(|err| { + warn!( + "Could not remove epoll listener for fd {:?}: {:?}", + conn.stream.as_raw_fd(), + err + ) + }); + return Ok(()); + } + + // Forward this packet to its listening connection + let conn = self.conn_map.get_mut(&key).unwrap(); + conn.send_pkt(pkt)?; + + if conn.rx_queue.pending_rx() { + // Required if the connection object adds new rx operations + self.backend_rxq.push_back(key); + } + + Ok(()) + } + + /// Deliver a raw vsock packet sent from a sibling VM to the guest vsock driver. + /// + /// Returns: + /// - `Ok(())` if packet was successfully filled in + /// - `Err(Error::EmptyRawPktsQueue)` if there was no available data + pub fn recv_raw_pkt<B: BitmapSlice>(&mut self, pkt: &mut VsockPacket<B>) -> Result<()> { + let raw_vsock_pkt = self + .raw_pkts_queue + .write() + .unwrap() + .pop_front() + .ok_or(Error::EmptyRawPktsQueue)?; + + pkt.set_header_from_raw(&raw_vsock_pkt.header).unwrap(); + if !raw_vsock_pkt.data.is_empty() { + let buf = pkt.data_slice().ok_or(Error::PktBufMissing)?; + buf.copy_from(&raw_vsock_pkt.data); + } + + Ok(()) + } + + /// Handle a new guest initiated connection, i.e from the peer, the guest driver. + /// + /// Attempts to connect to a host side unix socket listening on a path + /// corresponding to the destination port as follows: + /// - "{self.host_sock_path}_{local_port}"" + fn handle_new_guest_conn<B: BitmapSlice>(&mut self, pkt: &VsockPacket<B>) { + let port_path = format!("{}_{}", self.host_socket_path, pkt.dst_port()); + + UnixStream::connect(port_path) + .and_then(|stream| stream.set_nonblocking(true).map(|_| stream)) + .map_err(Error::UnixConnect) + .and_then(|stream| self.add_new_guest_conn(stream, pkt)) + .unwrap_or_else(|_| self.enq_rst()); + } + + /// Wrapper to add new connection to relevant HashMaps. + fn add_new_guest_conn<B: BitmapSlice>( + &mut self, + stream: UnixStream, + pkt: &VsockPacket<B>, + ) -> Result<()> { + let stream_fd = stream.as_raw_fd(); + self.listener_map + .insert(stream_fd, ConnMapKey::new(pkt.dst_port(), pkt.src_port())); + + let conn = VsockConnection::new_peer_init( + stream.try_clone().map_err(Error::UnixConnect)?, + pkt.dst_cid(), + pkt.dst_port(), + pkt.src_cid(), + pkt.src_port(), + self.epoll_fd, + pkt.buf_alloc(), + self.tx_buffer_size, + ); + + self.conn_map + .insert(ConnMapKey::new(pkt.dst_port(), pkt.src_port()), conn); + self.backend_rxq + .push_back(ConnMapKey::new(pkt.dst_port(), pkt.src_port())); + + self.stream_map.insert(stream_fd, stream); + self.local_port_set.insert(pkt.dst_port()); + + VhostUserVsockThread::epoll_register( + self.epoll_fd, + stream_fd, + epoll::Events::EPOLLIN | epoll::Events::EPOLLOUT, + )?; + Ok(()) + } + + /// Enqueue RST packets to be sent to guest. + fn enq_rst(&mut self) { + // TODO + dbg!("New guest conn error: Enqueue RST"); + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::vhu_vsock::{VhostUserVsockBackend, VsockConfig, VSOCK_OP_RW}; + use std::os::unix::net::UnixListener; + use tempfile::tempdir; + use virtio_vsock::packet::{VsockPacket, PKT_HEADER_SIZE}; + + const DATA_LEN: usize = 16; + const CONN_TX_BUF_SIZE: u32 = 64 * 1024; + + #[test] + fn test_vsock_thread_backend() { + const CID: u64 = 3; + const VSOCK_PEER_PORT: u32 = 1234; + + let test_dir = tempdir().expect("Could not create a temp test directory."); + + let vsock_socket_path = test_dir.path().join("test_vsock_thread_backend.vsock"); + let vsock_peer_path = test_dir.path().join("test_vsock_thread_backend.vsock_1234"); + + let _listener = UnixListener::bind(&vsock_peer_path).unwrap(); + + let epoll_fd = epoll::create(false).unwrap(); + + let cid_map: Arc<RwLock<CidMap>> = Arc::new(RwLock::new(HashMap::new())); + + let mut vtp = VsockThreadBackend::new( + vsock_socket_path.display().to_string(), + epoll_fd, + CID, + CONN_TX_BUF_SIZE, + cid_map, + ); + + assert!(!vtp.pending_rx()); + + let mut pkt_raw = [0u8; PKT_HEADER_SIZE + DATA_LEN]; + let (hdr_raw, data_raw) = pkt_raw.split_at_mut(PKT_HEADER_SIZE); + + // SAFETY: Safe as hdr_raw and data_raw are guaranteed to be valid. + let mut packet = unsafe { VsockPacket::new(hdr_raw, Some(data_raw)).unwrap() }; + + assert_eq!( + vtp.recv_pkt(&mut packet).unwrap_err().to_string(), + Error::EmptyBackendRxQ.to_string() + ); + + assert!(vtp.send_pkt(&packet).is_ok()); + + packet.set_type(VSOCK_TYPE_STREAM); + assert!(vtp.send_pkt(&packet).is_ok()); + + packet.set_src_cid(CID); + packet.set_dst_cid(VSOCK_HOST_CID); + packet.set_dst_port(VSOCK_PEER_PORT); + assert!(vtp.send_pkt(&packet).is_ok()); + + packet.set_op(VSOCK_OP_REQUEST); + assert!(vtp.send_pkt(&packet).is_ok()); + + packet.set_op(VSOCK_OP_RW); + assert!(vtp.send_pkt(&packet).is_ok()); + + packet.set_op(VSOCK_OP_RST); + assert!(vtp.send_pkt(&packet).is_ok()); + + assert!(vtp.recv_pkt(&mut packet).is_ok()); + + // cleanup + let _ = std::fs::remove_file(&vsock_peer_path); + let _ = std::fs::remove_file(&vsock_socket_path); + + test_dir.close().unwrap(); + } + + #[test] + fn test_vsock_thread_backend_sibling_vms() { + const CID: u64 = 3; + const SIBLING_CID: u64 = 4; + const SIBLING_LISTENING_PORT: u32 = 1234; + + let test_dir = tempdir().expect("Could not create a temp test directory."); + + let vsock_socket_path = test_dir + .path() + .join("test_vsock_thread_backend.vsock") + .display() + .to_string(); + let sibling_vhost_socket_path = test_dir + .path() + .join("test_vsock_thread_backend_sibling.socket") + .display() + .to_string(); + let sibling_vsock_socket_path = test_dir + .path() + .join("test_vsock_thread_backend_sibling.vsock") + .display() + .to_string(); + + let cid_map: Arc<RwLock<CidMap>> = Arc::new(RwLock::new(HashMap::new())); + + let sibling_config = VsockConfig::new( + SIBLING_CID, + sibling_vhost_socket_path, + sibling_vsock_socket_path, + CONN_TX_BUF_SIZE, + ); + + let sibling_backend = + Arc::new(VhostUserVsockBackend::new(sibling_config, cid_map.clone()).unwrap()); + + let epoll_fd = epoll::create(false).unwrap(); + let mut vtp = + VsockThreadBackend::new(vsock_socket_path, epoll_fd, CID, CONN_TX_BUF_SIZE, cid_map); + + assert!(!vtp.pending_raw_pkts()); + + let mut pkt_raw = [0u8; PKT_HEADER_SIZE + DATA_LEN]; + let (hdr_raw, data_raw) = pkt_raw.split_at_mut(PKT_HEADER_SIZE); + + // SAFETY: Safe as hdr_raw and data_raw are guaranteed to be valid. + let mut packet = unsafe { VsockPacket::new(hdr_raw, Some(data_raw)).unwrap() }; + + assert_eq!( + vtp.recv_raw_pkt(&mut packet).unwrap_err().to_string(), + Error::EmptyRawPktsQueue.to_string() + ); + + packet.set_type(VSOCK_TYPE_STREAM); + packet.set_src_cid(CID); + packet.set_dst_cid(SIBLING_CID); + packet.set_dst_port(SIBLING_LISTENING_PORT); + packet.set_op(VSOCK_OP_RW); + packet.set_len(DATA_LEN as u32); + packet + .data_slice() + .unwrap() + .copy_from(&[0xCAu8, 0xFEu8, 0xBAu8, 0xBEu8]); + + assert!(vtp.send_pkt(&packet).is_ok()); + assert!(sibling_backend.threads[0] + .lock() + .unwrap() + .thread_backend + .pending_raw_pkts()); + + let mut recvd_pkt_raw = [0u8; PKT_HEADER_SIZE + DATA_LEN]; + let (recvd_hdr_raw, recvd_data_raw) = recvd_pkt_raw.split_at_mut(PKT_HEADER_SIZE); + + let mut recvd_packet = + // SAFETY: Safe as recvd_hdr_raw and recvd_data_raw are guaranteed to be valid. + unsafe { VsockPacket::new(recvd_hdr_raw, Some(recvd_data_raw)).unwrap() }; + + assert!(sibling_backend.threads[0] + .lock() + .unwrap() + .thread_backend + .recv_raw_pkt(&mut recvd_packet) + .is_ok()); + + assert_eq!(recvd_packet.type_(), VSOCK_TYPE_STREAM); + assert_eq!(recvd_packet.src_cid(), CID); + assert_eq!(recvd_packet.dst_cid(), SIBLING_CID); + assert_eq!(recvd_packet.dst_port(), SIBLING_LISTENING_PORT); + assert_eq!(recvd_packet.op(), VSOCK_OP_RW); + assert_eq!(recvd_packet.len(), DATA_LEN as u32); + + assert_eq!(recvd_data_raw[0], 0xCAu8); + assert_eq!(recvd_data_raw[1], 0xFEu8); + assert_eq!(recvd_data_raw[2], 0xBAu8); + assert_eq!(recvd_data_raw[3], 0xBEu8); + + test_dir.close().unwrap(); + } +} diff --git a/src/txbuf.rs b/src/txbuf.rs new file mode 100644 index 0000000..ef718d7 --- /dev/null +++ b/src/txbuf.rs @@ -0,0 +1,233 @@ +// SPDX-License-Identifier: Apache-2.0 or BSD-3-Clause + +use std::{io::Write, num::Wrapping}; + +use vm_memory::{bitmap::BitmapSlice, VolatileSlice}; + +use crate::vhu_vsock::{Error, Result}; + +#[derive(Debug)] +pub(crate) struct LocalTxBuf { + /// Buffer holding data to be forwarded to a host-side application + buf: Vec<u8>, + /// Index into buffer from which data can be consumed from the buffer + head: Wrapping<u32>, + /// Index into buffer from which data can be added to the buffer + tail: Wrapping<u32>, +} + +impl LocalTxBuf { + /// Create a new instance of LocalTxBuf. + pub fn new(buf_size: u32) -> Self { + Self { + buf: vec![0; buf_size as usize], + head: Wrapping(0), + tail: Wrapping(0), + } + } + + /// Get the buffer size + pub fn get_buf_size(&self) -> u32 { + self.buf.len() as u32 + } + + /// Check if the buf is empty. + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + /// Add new data to the tx buffer, push all or none. + /// Returns LocalTxBufFull error if space not sufficient. + pub fn push<B: BitmapSlice>(&mut self, data_buf: &VolatileSlice<B>) -> Result<()> { + if self.get_buf_size() as usize - self.len() < data_buf.len() { + // Tx buffer is full + return Err(Error::LocalTxBufFull); + } + + // Get index into buffer at which data can be inserted + let tail_idx = self.tail.0 as usize % self.get_buf_size() as usize; + + // Check if we can fit the data buffer between head and end of buffer + let len = std::cmp::min(self.get_buf_size() as usize - tail_idx, data_buf.len()); + let txbuf = &mut self.buf[tail_idx..tail_idx + len]; + data_buf.copy_to(txbuf); + + // Check if there is more data to be wrapped around + if len < data_buf.len() { + let remain_txbuf = &mut self.buf[..(data_buf.len() - len)]; + data_buf.copy_to(remain_txbuf); + } + + // Increment tail by the amount of data that has been added to the buffer + self.tail += Wrapping(data_buf.len() as u32); + + Ok(()) + } + + /// Flush buf data to stream. + pub fn flush_to<S: Write>(&mut self, stream: &mut S) -> Result<usize> { + if self.is_empty() { + // No data to be flushed + return Ok(0); + } + + // Get index into buffer from which data can be read + let head_idx = self.head.0 as usize % self.get_buf_size() as usize; + + // First write from head to end of buffer + let len = std::cmp::min(self.get_buf_size() as usize - head_idx, self.len()); + let written = stream + .write(&self.buf[head_idx..(head_idx + len)]) + .map_err(Error::LocalTxBufFlush)?; + + // Increment head by amount of data that has been flushed to the stream + self.head += Wrapping(written as u32); + + // If written length is less than the expected length we can try again in the future + if written < len { + return Ok(written); + } + + // The head index has wrapped around the end of the buffer, we call self again + Ok(written + self.flush_to(stream).unwrap_or(0)) + } + + /// Return amount of data in the buffer. + fn len(&self) -> usize { + (self.tail - self.head).0 as usize + } +} + +#[cfg(test)] +mod tests { + use super::*; + + const CONN_TX_BUF_SIZE: u32 = 64 * 1024; + + #[test] + fn test_txbuf_len() { + let mut loc_tx_buf = LocalTxBuf::new(CONN_TX_BUF_SIZE); + + // Zero length tx buf + assert_eq!(loc_tx_buf.len(), 0); + + // finite length tx buf + loc_tx_buf.head = Wrapping(0); + loc_tx_buf.tail = Wrapping(CONN_TX_BUF_SIZE); + assert_eq!(loc_tx_buf.len(), CONN_TX_BUF_SIZE as usize); + + loc_tx_buf.tail = Wrapping(CONN_TX_BUF_SIZE / 2); + assert_eq!(loc_tx_buf.len(), (CONN_TX_BUF_SIZE / 2) as usize); + + loc_tx_buf.head = Wrapping(256); + assert_eq!(loc_tx_buf.len(), 32512); + } + + #[test] + fn test_txbuf_is_empty() { + let mut loc_tx_buf = LocalTxBuf::new(CONN_TX_BUF_SIZE); + + // empty tx buffer + assert!(loc_tx_buf.is_empty()); + + // non empty tx buffer + loc_tx_buf.tail = Wrapping(CONN_TX_BUF_SIZE); + assert!(!loc_tx_buf.is_empty()); + } + + #[test] + fn test_txbuf_push() { + let mut loc_tx_buf = LocalTxBuf::new(CONN_TX_BUF_SIZE); + let mut buf = [0; CONN_TX_BUF_SIZE as usize]; + // SAFETY: Safe as the buffer is guaranteed to be valid here. + let data = unsafe { VolatileSlice::new(buf.as_mut_ptr(), buf.len()) }; + + // push data into empty tx buffer + let res_push = loc_tx_buf.push(&data); + assert!(res_push.is_ok()); + assert_eq!(loc_tx_buf.head, Wrapping(0)); + assert_eq!(loc_tx_buf.tail, Wrapping(CONN_TX_BUF_SIZE)); + + // push data into full tx buffer + let res_push = loc_tx_buf.push(&data); + assert!(res_push.is_err()); + + // head and tail wrap at full + loc_tx_buf.head = Wrapping(CONN_TX_BUF_SIZE); + let res_push = loc_tx_buf.push(&data); + assert!(res_push.is_ok()); + assert_eq!(loc_tx_buf.tail, Wrapping(CONN_TX_BUF_SIZE * 2)); + + // only tail wraps at full + let mut buf = vec![1; 4]; + // SAFETY: Safe as the buffer is guaranteed to be valid here. + let data = unsafe { VolatileSlice::new(buf.as_mut_ptr(), buf.len()) }; + let mut cmp_data = vec![1; 4]; + cmp_data.append(&mut vec![0; (CONN_TX_BUF_SIZE - 4) as usize]); + loc_tx_buf.head = Wrapping(4); + loc_tx_buf.tail = Wrapping(CONN_TX_BUF_SIZE); + let res_push = loc_tx_buf.push(&data); + assert!(res_push.is_ok()); + assert_eq!(loc_tx_buf.head, Wrapping(4)); + assert_eq!(loc_tx_buf.tail, Wrapping(CONN_TX_BUF_SIZE + 4)); + assert_eq!(loc_tx_buf.buf, cmp_data); + } + + #[test] + fn test_txbuf_flush_to() { + let mut loc_tx_buf = LocalTxBuf::new(CONN_TX_BUF_SIZE); + + // data to be flushed + let mut buf = vec![1; CONN_TX_BUF_SIZE as usize]; + // SAFETY: Safe as the buffer is guaranteed to be valid here. + let data = unsafe { VolatileSlice::new(buf.as_mut_ptr(), buf.len()) }; + + // target to which data is flushed + let mut cmp_vec = Vec::with_capacity(data.len()); + + // flush no data + let res_flush = loc_tx_buf.flush_to(&mut cmp_vec); + assert!(res_flush.is_ok()); + assert_eq!(res_flush.unwrap(), 0); + + // flush data of CONN_TX_BUF_SIZE amount + let res_push = loc_tx_buf.push(&data); + assert!(res_push.is_ok()); + let res_flush = loc_tx_buf.flush_to(&mut cmp_vec); + if let Ok(n) = res_flush { + assert_eq!(loc_tx_buf.head, Wrapping(n as u32)); + assert_eq!(loc_tx_buf.tail, Wrapping(CONN_TX_BUF_SIZE)); + assert_eq!(n, cmp_vec.len()); + assert_eq!(cmp_vec, buf[..n]); + } + + // wrapping head flush + let mut buf = vec![0; (CONN_TX_BUF_SIZE / 2) as usize]; + buf.append(&mut vec![1; (CONN_TX_BUF_SIZE / 2) as usize]); + // SAFETY: Safe as the buffer is guaranteed to be valid here. + let data = unsafe { VolatileSlice::new(buf.as_mut_ptr(), buf.len()) }; + + loc_tx_buf.head = Wrapping(0); + loc_tx_buf.tail = Wrapping(0); + let res_push = loc_tx_buf.push(&data); + assert!(res_push.is_ok()); + cmp_vec.clear(); + loc_tx_buf.head = Wrapping(CONN_TX_BUF_SIZE / 2); + loc_tx_buf.tail = Wrapping(CONN_TX_BUF_SIZE + (CONN_TX_BUF_SIZE / 2)); + let res_flush = loc_tx_buf.flush_to(&mut cmp_vec); + if let Ok(n) = res_flush { + assert_eq!( + loc_tx_buf.head, + Wrapping(CONN_TX_BUF_SIZE + (CONN_TX_BUF_SIZE / 2)) + ); + assert_eq!( + loc_tx_buf.tail, + Wrapping(CONN_TX_BUF_SIZE + (CONN_TX_BUF_SIZE / 2)) + ); + assert_eq!(n, cmp_vec.len()); + let mut data = vec![1; (CONN_TX_BUF_SIZE / 2) as usize]; + data.append(&mut vec![0; (CONN_TX_BUF_SIZE / 2) as usize]); + assert_eq!(cmp_vec, data[..n]); + } + } +} diff --git a/src/vhu_vsock.rs b/src/vhu_vsock.rs new file mode 100644 index 0000000..bb59e05 --- /dev/null +++ b/src/vhu_vsock.rs @@ -0,0 +1,518 @@ +// SPDX-License-Identifier: Apache-2.0 or BSD-3-Clause + +use std::{ + collections::HashMap, + io::{self, Result as IoResult}, + sync::{Arc, Mutex, RwLock}, + u16, u32, u64, u8, +}; + +use log::warn; +use thiserror::Error as ThisError; +use vhost::vhost_user::message::{VhostUserProtocolFeatures, VhostUserVirtioFeatures}; +use vhost_user_backend::{VhostUserBackend, VringRwLock}; +use virtio_bindings::bindings::{ + virtio_config::VIRTIO_F_NOTIFY_ON_EMPTY, virtio_config::VIRTIO_F_VERSION_1, + virtio_ring::VIRTIO_RING_F_EVENT_IDX, +}; +use vm_memory::{ByteValued, GuestMemoryAtomic, GuestMemoryMmap, Le64}; +use vmm_sys_util::{ + epoll::EventSet, + eventfd::{EventFd, EFD_NONBLOCK}, +}; + +use crate::thread_backend::RawPktsQ; +use crate::vhu_vsock_thread::*; + +pub(crate) type CidMap = HashMap<u64, (Arc<RwLock<RawPktsQ>>, EventFd)>; + +const NUM_QUEUES: usize = 2; +const QUEUE_SIZE: usize = 256; + +// New descriptors pending on the rx queue +const RX_QUEUE_EVENT: u16 = 0; +// New descriptors are pending on the tx queue. +const TX_QUEUE_EVENT: u16 = 1; +// New descriptors are pending on the event queue. +const EVT_QUEUE_EVENT: u16 = 2; + +/// Notification coming from the backend. +pub(crate) const BACKEND_EVENT: u16 = 3; + +/// Notification coming from the sibling VM. +pub(crate) const SIBLING_VM_EVENT: u16 = 4; + +/// CID of the host +pub(crate) const VSOCK_HOST_CID: u64 = 2; + +/// Connection oriented packet +pub(crate) const VSOCK_TYPE_STREAM: u16 = 1; + +/// Vsock packet operation ID + +/// Connection request +pub(crate) const VSOCK_OP_REQUEST: u16 = 1; +/// Connection response +pub(crate) const VSOCK_OP_RESPONSE: u16 = 2; +/// Connection reset +pub(crate) const VSOCK_OP_RST: u16 = 3; +/// Shutdown connection +pub(crate) const VSOCK_OP_SHUTDOWN: u16 = 4; +/// Data read/write +pub(crate) const VSOCK_OP_RW: u16 = 5; +/// Flow control credit update +pub(crate) const VSOCK_OP_CREDIT_UPDATE: u16 = 6; +/// Flow control credit request +pub(crate) const VSOCK_OP_CREDIT_REQUEST: u16 = 7; + +/// Vsock packet flags + +/// VSOCK_OP_SHUTDOWN: Packet sender will receive no more data +pub(crate) const VSOCK_FLAGS_SHUTDOWN_RCV: u32 = 1; +/// VSOCK_OP_SHUTDOWN: Packet sender will send no more data +pub(crate) const VSOCK_FLAGS_SHUTDOWN_SEND: u32 = 2; + +// Queue mask to select vrings. +const QUEUE_MASK: u64 = 0b11; + +pub(crate) type Result<T> = std::result::Result<T, Error>; + +/// Custom error types +#[derive(Debug, ThisError)] +pub(crate) enum Error { + #[error("Failed to handle event other than EPOLLIN event")] + HandleEventNotEpollIn, + #[error("Failed to handle unknown event")] + HandleUnknownEvent, + #[error("Failed to accept new local socket connection")] + UnixAccept(std::io::Error), + #[error("Failed to bind a unix stream")] + UnixBind(std::io::Error), + #[error("Failed to create an epoll fd")] + EpollFdCreate(std::io::Error), + #[error("Failed to add to epoll")] + EpollAdd(std::io::Error), + #[error("Failed to modify evset associated with epoll")] + EpollModify(std::io::Error), + #[error("Failed to read from unix stream")] + UnixRead(std::io::Error), + #[error("Failed to convert byte array to string")] + ConvertFromUtf8(std::str::Utf8Error), + #[error("Invalid vsock connection request from host")] + InvalidPortRequest, + #[error("Unable to convert string to integer")] + ParseInteger(std::num::ParseIntError), + #[error("Error reading stream port")] + ReadStreamPort(Box<Error>), + #[error("Failed to de-register fd from epoll")] + EpollRemove(std::io::Error), + #[error("No memory configured")] + NoMemoryConfigured, + #[error("Unable to iterate queue")] + IterateQueue, + #[error("No rx request available")] + NoRequestRx, + #[error("Unable to create thread pool")] + CreateThreadPool(std::io::Error), + #[error("Packet missing data buffer")] + PktBufMissing, + #[error("Failed to connect to unix socket")] + UnixConnect(std::io::Error), + #[error("Unable to write to unix stream")] + UnixWrite, + #[error("Unable to push data to local tx buffer")] + LocalTxBufFull, + #[error("Unable to flush data from local tx buffer")] + LocalTxBufFlush(std::io::Error), + #[error("No free local port available for new host inititated connection")] + NoFreeLocalPort, + #[error("Backend rx queue is empty")] + EmptyBackendRxQ, + #[error("Failed to create an EventFd")] + EventFdCreate(std::io::Error), + #[error("Raw vsock packets queue is empty")] + EmptyRawPktsQueue, +} + +impl std::convert::From<Error> for std::io::Error { + fn from(e: Error) -> Self { + std::io::Error::new(io::ErrorKind::Other, e) + } +} + +#[derive(Debug, Clone)] +/// This structure is the public API through which an external program +/// is allowed to configure the backend. +pub(crate) struct VsockConfig { + guest_cid: u64, + socket: String, + uds_path: String, + tx_buffer_size: u32, +} + +impl VsockConfig { + /// Create a new instance of the VsockConfig struct, containing the + /// parameters to be fed into the vsock-backend server. + pub fn new(guest_cid: u64, socket: String, uds_path: String, tx_buffer_size: u32) -> Self { + Self { + guest_cid, + socket, + uds_path, + tx_buffer_size, + } + } + + /// Return the guest's current CID. + pub fn get_guest_cid(&self) -> u64 { + self.guest_cid + } + + /// Return the path of the unix domain socket which is listening to + /// requests from the host side application. + pub fn get_uds_path(&self) -> String { + String::from(&self.uds_path) + } + + /// Return the path of the unix domain socket which is listening to + /// requests from the guest. + pub fn get_socket_path(&self) -> String { + String::from(&self.socket) + } + + pub fn get_tx_buffer_size(&self) -> u32 { + self.tx_buffer_size + } +} + +/// A local port and peer port pair used to retrieve +/// the corresponding connection. +#[derive(Hash, PartialEq, Eq, Debug, Clone)] +pub(crate) struct ConnMapKey { + local_port: u32, + peer_port: u32, +} + +impl ConnMapKey { + pub fn new(local_port: u32, peer_port: u32) -> Self { + Self { + local_port, + peer_port, + } + } +} + +/// Virtio Vsock Configuration +#[derive(Copy, Clone, Debug, Default, PartialEq)] +#[repr(C)] +struct VirtioVsockConfig { + pub guest_cid: Le64, +} + +// SAFETY: The layout of the structure is fixed and can be initialized by +// reading its content from byte array. +unsafe impl ByteValued for VirtioVsockConfig {} + +pub(crate) struct VhostUserVsockBackend { + config: VirtioVsockConfig, + pub threads: Vec<Mutex<VhostUserVsockThread>>, + queues_per_thread: Vec<u64>, + pub exit_event: EventFd, +} + +impl VhostUserVsockBackend { + pub fn new(config: VsockConfig, cid_map: Arc<RwLock<CidMap>>) -> Result<Self> { + let thread = Mutex::new(VhostUserVsockThread::new( + config.get_uds_path(), + config.get_guest_cid(), + config.get_tx_buffer_size(), + cid_map, + )?); + let queues_per_thread = vec![QUEUE_MASK]; + + Ok(Self { + config: VirtioVsockConfig { + guest_cid: From::from(config.get_guest_cid()), + }, + threads: vec![thread], + queues_per_thread, + exit_event: EventFd::new(EFD_NONBLOCK).map_err(Error::EventFdCreate)?, + }) + } +} + +impl VhostUserBackend<VringRwLock, ()> for VhostUserVsockBackend { + fn num_queues(&self) -> usize { + NUM_QUEUES + } + + fn max_queue_size(&self) -> usize { + QUEUE_SIZE + } + + fn features(&self) -> u64 { + 1 << VIRTIO_F_VERSION_1 + | 1 << VIRTIO_F_NOTIFY_ON_EMPTY + | 1 << VIRTIO_RING_F_EVENT_IDX + | VhostUserVirtioFeatures::PROTOCOL_FEATURES.bits() + } + + fn protocol_features(&self) -> VhostUserProtocolFeatures { + VhostUserProtocolFeatures::CONFIG + } + + fn set_event_idx(&self, enabled: bool) { + for thread in self.threads.iter() { + thread.lock().unwrap().event_idx = enabled; + } + } + + fn update_memory(&self, atomic_mem: GuestMemoryAtomic<GuestMemoryMmap>) -> IoResult<()> { + for thread in self.threads.iter() { + thread.lock().unwrap().mem = Some(atomic_mem.clone()); + } + Ok(()) + } + + fn handle_event( + &self, + device_event: u16, + evset: EventSet, + vrings: &[VringRwLock], + thread_id: usize, + ) -> IoResult<bool> { + let vring_rx = &vrings[0]; + let vring_tx = &vrings[1]; + + if evset != EventSet::IN { + return Err(Error::HandleEventNotEpollIn.into()); + } + + let mut thread = self.threads[thread_id].lock().unwrap(); + let evt_idx = thread.event_idx; + + match device_event { + RX_QUEUE_EVENT => {} + TX_QUEUE_EVENT => { + thread.process_tx(vring_tx, evt_idx)?; + } + EVT_QUEUE_EVENT => {} + BACKEND_EVENT => { + thread.process_backend_evt(evset); + if let Err(e) = thread.process_tx(vring_tx, evt_idx) { + match e { + Error::NoMemoryConfigured => { + warn!("Received a backend event before vring initialization") + } + _ => return Err(e.into()), + } + } + } + SIBLING_VM_EVENT => { + let _ = thread.sibling_event_fd.read(); + thread.process_raw_pkts(vring_rx, evt_idx)?; + return Ok(false); + } + _ => { + return Err(Error::HandleUnknownEvent.into()); + } + } + + if device_event != EVT_QUEUE_EVENT { + thread.process_rx(vring_rx, evt_idx)?; + } + + Ok(false) + } + + fn get_config(&self, offset: u32, size: u32) -> Vec<u8> { + let offset = offset as usize; + let size = size as usize; + + let buf = self.config.as_slice(); + + if offset + size > buf.len() { + return Vec::new(); + } + + buf[offset..offset + size].to_vec() + } + + fn queues_per_thread(&self) -> Vec<u64> { + self.queues_per_thread.clone() + } + + fn exit_event(&self, _thread_index: usize) -> Option<EventFd> { + self.exit_event.try_clone().ok() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::convert::TryInto; + use tempfile::tempdir; + use vhost_user_backend::VringT; + use vm_memory::GuestAddress; + + const CONN_TX_BUF_SIZE: u32 = 64 * 1024; + + #[test] + fn test_vsock_backend() { + const CID: u64 = 3; + + let test_dir = tempdir().expect("Could not create a temp test directory."); + + let vhost_socket_path = test_dir + .path() + .join("test_vsock_backend.socket") + .display() + .to_string(); + let vsock_socket_path = test_dir + .path() + .join("test_vsock_backend.vsock") + .display() + .to_string(); + + let config = VsockConfig::new( + CID, + vhost_socket_path.to_string(), + vsock_socket_path.to_string(), + CONN_TX_BUF_SIZE, + ); + + let cid_map: Arc<RwLock<CidMap>> = Arc::new(RwLock::new(HashMap::new())); + + let backend = VhostUserVsockBackend::new(config, cid_map); + + assert!(backend.is_ok()); + let backend = backend.unwrap(); + + assert_eq!(backend.num_queues(), NUM_QUEUES); + assert_eq!(backend.max_queue_size(), QUEUE_SIZE); + assert_ne!(backend.features(), 0); + assert!(!backend.protocol_features().is_empty()); + backend.set_event_idx(false); + + let mem = GuestMemoryAtomic::new( + GuestMemoryMmap::<()>::from_ranges(&[(GuestAddress(0), 0x10000)]).unwrap(), + ); + let vrings = [ + VringRwLock::new(mem.clone(), 0x1000).unwrap(), + VringRwLock::new(mem.clone(), 0x2000).unwrap(), + ]; + vrings[0].set_queue_info(0x100, 0x200, 0x300).unwrap(); + vrings[0].set_queue_ready(true); + vrings[1].set_queue_info(0x1100, 0x1200, 0x1300).unwrap(); + vrings[1].set_queue_ready(true); + + assert!(backend.update_memory(mem).is_ok()); + + let queues_per_thread = backend.queues_per_thread(); + assert_eq!(queues_per_thread.len(), 1); + assert_eq!(queues_per_thread[0], 0b11); + + let config = backend.get_config(0, 8); + assert_eq!(config.len(), 8); + let cid = u64::from_le_bytes(config.try_into().unwrap()); + assert_eq!(cid, CID); + + let exit = backend.exit_event(0); + assert!(exit.is_some()); + exit.unwrap().write(1).unwrap(); + + let ret = backend.handle_event(RX_QUEUE_EVENT, EventSet::IN, &vrings, 0); + assert!(ret.is_ok()); + assert!(!ret.unwrap()); + + let ret = backend.handle_event(TX_QUEUE_EVENT, EventSet::IN, &vrings, 0); + assert!(ret.is_ok()); + assert!(!ret.unwrap()); + + let ret = backend.handle_event(EVT_QUEUE_EVENT, EventSet::IN, &vrings, 0); + assert!(ret.is_ok()); + assert!(!ret.unwrap()); + + let ret = backend.handle_event(BACKEND_EVENT, EventSet::IN, &vrings, 0); + assert!(ret.is_ok()); + assert!(!ret.unwrap()); + + // cleanup + let _ = std::fs::remove_file(vhost_socket_path); + let _ = std::fs::remove_file(vsock_socket_path); + + test_dir.close().unwrap(); + } + + #[test] + fn test_vsock_backend_failures() { + const CID: u64 = 3; + + let test_dir = tempdir().expect("Could not create a temp test directory."); + + let vhost_socket_path = test_dir + .path() + .join("test_vsock_backend_failures.socket") + .display() + .to_string(); + let vsock_socket_path = test_dir + .path() + .join("test_vsock_backend_failures.vsock") + .display() + .to_string(); + + let config = VsockConfig::new( + CID, + "/sys/not_allowed.socket".to_string(), + "/sys/not_allowed.vsock".to_string(), + CONN_TX_BUF_SIZE, + ); + + let cid_map: Arc<RwLock<CidMap>> = Arc::new(RwLock::new(HashMap::new())); + + let backend = VhostUserVsockBackend::new(config, cid_map.clone()); + assert!(backend.is_err()); + + let config = VsockConfig::new( + CID, + vhost_socket_path.to_string(), + vsock_socket_path.to_string(), + CONN_TX_BUF_SIZE, + ); + + let backend = VhostUserVsockBackend::new(config, cid_map).unwrap(); + let mem = GuestMemoryAtomic::new( + GuestMemoryMmap::<()>::from_ranges(&[(GuestAddress(0), 0x10000)]).unwrap(), + ); + let vrings = [ + VringRwLock::new(mem.clone(), 0x1000).unwrap(), + VringRwLock::new(mem.clone(), 0x2000).unwrap(), + ]; + + backend.update_memory(mem).unwrap(); + + // reading out of the config space, expecting empty config + let config = backend.get_config(2, 8); + assert_eq!(config.len(), 0); + + assert_eq!( + backend + .handle_event(RX_QUEUE_EVENT, EventSet::OUT, &vrings, 0) + .unwrap_err() + .to_string(), + Error::HandleEventNotEpollIn.to_string() + ); + assert_eq!( + backend + .handle_event(SIBLING_VM_EVENT + 1, EventSet::IN, &vrings, 0) + .unwrap_err() + .to_string(), + Error::HandleUnknownEvent.to_string() + ); + + // cleanup + let _ = std::fs::remove_file(vhost_socket_path); + let _ = std::fs::remove_file(vsock_socket_path); + + test_dir.close().unwrap(); + } +} diff --git a/src/vhu_vsock_thread.rs b/src/vhu_vsock_thread.rs new file mode 100644 index 0000000..726e2b6 --- /dev/null +++ b/src/vhu_vsock_thread.rs @@ -0,0 +1,837 @@ +// SPDX-License-Identifier: Apache-2.0 or BSD-3-Clause + +use std::{ + fs::File, + io, + io::Read, + num::Wrapping, + ops::Deref, + os::unix::{ + net::{UnixListener, UnixStream}, + prelude::{AsRawFd, FromRawFd, RawFd}, + }, + sync::{Arc, RwLock}, +}; + +use futures::executor::{ThreadPool, ThreadPoolBuilder}; +use log::warn; +use vhost_user_backend::{VringEpollHandler, VringRwLock, VringT}; +use virtio_queue::QueueOwnedT; +use virtio_vsock::packet::{VsockPacket, PKT_HEADER_SIZE}; +use vm_memory::{GuestAddressSpace, GuestMemoryAtomic, GuestMemoryMmap}; +use vmm_sys_util::{ + epoll::EventSet, + eventfd::{EventFd, EFD_NONBLOCK}, +}; + +use crate::{ + rxops::*, + thread_backend::*, + vhu_vsock::{ + CidMap, ConnMapKey, Error, Result, VhostUserVsockBackend, BACKEND_EVENT, SIBLING_VM_EVENT, + VSOCK_HOST_CID, + }, + vsock_conn::*, +}; + +type ArcVhostBknd = Arc<VhostUserVsockBackend>; + +enum RxQueueType { + Standard, + RawPkts, +} +pub(crate) struct VhostUserVsockThread { + /// Guest memory map. + pub mem: Option<GuestMemoryAtomic<GuestMemoryMmap>>, + /// VIRTIO_RING_F_EVENT_IDX. + pub event_idx: bool, + /// Host socket raw file descriptor. + host_sock: RawFd, + /// Host socket path + host_sock_path: String, + /// Listener listening for new connections on the host. + host_listener: UnixListener, + /// Instance of VringWorker. + vring_worker: Option<Arc<VringEpollHandler<ArcVhostBknd, VringRwLock, ()>>>, + /// epoll fd to which new host connections are added. + epoll_file: File, + /// VsockThreadBackend instance. + pub thread_backend: VsockThreadBackend, + /// CID of the guest. + guest_cid: u64, + /// Thread pool to handle event idx. + pool: ThreadPool, + /// host side port on which application listens. + local_port: Wrapping<u32>, + /// The tx buffer size + tx_buffer_size: u32, + /// EventFd to notify this thread for custom events. Currently used to notify + /// this thread to process raw vsock packets sent from a sibling VM. + pub sibling_event_fd: EventFd, + /// Keeps track of which RX queue was processed first in the last iteration. + /// Used to alternate between the RX queues to prevent the starvation of one by the other. + last_processed: RxQueueType, +} + +impl VhostUserVsockThread { + /// Create a new instance of VhostUserVsockThread. + pub fn new( + uds_path: String, + guest_cid: u64, + tx_buffer_size: u32, + cid_map: Arc<RwLock<CidMap>>, + ) -> Result<Self> { + // TODO: better error handling, maybe add a param to force the unlink + let _ = std::fs::remove_file(uds_path.clone()); + let host_sock = UnixListener::bind(&uds_path) + .and_then(|sock| sock.set_nonblocking(true).map(|_| sock)) + .map_err(Error::UnixBind)?; + + let epoll_fd = epoll::create(true).map_err(Error::EpollFdCreate)?; + // SAFETY: Safe as the fd is guaranteed to be valid here. + let epoll_file = unsafe { File::from_raw_fd(epoll_fd) }; + + let host_raw_fd = host_sock.as_raw_fd(); + + let sibling_event_fd = EventFd::new(EFD_NONBLOCK).map_err(Error::EventFdCreate)?; + + let thread_backend = VsockThreadBackend::new( + uds_path.clone(), + epoll_fd, + guest_cid, + tx_buffer_size, + cid_map.clone(), + ); + + cid_map.write().unwrap().insert( + guest_cid, + ( + thread_backend.raw_pkts_queue.clone(), + sibling_event_fd.try_clone().unwrap(), + ), + ); + + let thread = VhostUserVsockThread { + mem: None, + event_idx: false, + host_sock: host_sock.as_raw_fd(), + host_sock_path: uds_path, + host_listener: host_sock, + vring_worker: None, + epoll_file, + thread_backend, + guest_cid, + pool: ThreadPoolBuilder::new() + .pool_size(1) + .create() + .map_err(Error::CreateThreadPool)?, + local_port: Wrapping(0), + tx_buffer_size, + sibling_event_fd, + last_processed: RxQueueType::Standard, + }; + + VhostUserVsockThread::epoll_register(epoll_fd, host_raw_fd, epoll::Events::EPOLLIN)?; + + Ok(thread) + } + + /// Register a file with an epoll to listen for events in evset. + pub fn epoll_register(epoll_fd: RawFd, fd: RawFd, evset: epoll::Events) -> Result<()> { + epoll::ctl( + epoll_fd, + epoll::ControlOptions::EPOLL_CTL_ADD, + fd, + epoll::Event::new(evset, fd as u64), + ) + .map_err(Error::EpollAdd)?; + + Ok(()) + } + + /// Remove a file from the epoll. + pub fn epoll_unregister(epoll_fd: RawFd, fd: RawFd) -> Result<()> { + epoll::ctl( + epoll_fd, + epoll::ControlOptions::EPOLL_CTL_DEL, + fd, + epoll::Event::new(epoll::Events::empty(), 0), + ) + .map_err(Error::EpollRemove)?; + + Ok(()) + } + + /// Modify the events we listen to for the fd in the epoll. + pub fn epoll_modify(epoll_fd: RawFd, fd: RawFd, evset: epoll::Events) -> Result<()> { + epoll::ctl( + epoll_fd, + epoll::ControlOptions::EPOLL_CTL_MOD, + fd, + epoll::Event::new(evset, fd as u64), + ) + .map_err(Error::EpollModify)?; + + Ok(()) + } + + /// Return raw file descriptor of the epoll file. + fn get_epoll_fd(&self) -> RawFd { + self.epoll_file.as_raw_fd() + } + + /// Set self's VringWorker. + pub fn set_vring_worker( + &mut self, + vring_worker: Option<Arc<VringEpollHandler<ArcVhostBknd, VringRwLock, ()>>>, + ) { + self.vring_worker = vring_worker; + self.vring_worker + .as_ref() + .unwrap() + .register_listener(self.get_epoll_fd(), EventSet::IN, u64::from(BACKEND_EVENT)) + .unwrap(); + self.vring_worker + .as_ref() + .unwrap() + .register_listener( + self.sibling_event_fd.as_raw_fd(), + EventSet::IN, + u64::from(SIBLING_VM_EVENT), + ) + .unwrap(); + } + + /// Process a BACKEND_EVENT received by VhostUserVsockBackend. + pub fn process_backend_evt(&mut self, _evset: EventSet) { + let mut epoll_events = vec![epoll::Event::new(epoll::Events::empty(), 0); 32]; + 'epoll: loop { + match epoll::wait(self.epoll_file.as_raw_fd(), 0, epoll_events.as_mut_slice()) { + Ok(ev_cnt) => { + for evt in epoll_events.iter().take(ev_cnt) { + self.handle_event( + evt.data as RawFd, + epoll::Events::from_bits(evt.events).unwrap(), + ); + } + } + Err(e) => { + if e.kind() == io::ErrorKind::Interrupted { + continue; + } + warn!("failed to consume new epoll event"); + } + } + break 'epoll; + } + } + + /// Handle a BACKEND_EVENT by either accepting a new connection or + /// forwarding a request to the appropriate connection object. + fn handle_event(&mut self, fd: RawFd, evset: epoll::Events) { + if fd == self.host_sock { + // This is a new connection initiated by an application running on the host + let conn = self.host_listener.accept().map_err(Error::UnixAccept); + if self.mem.is_some() { + conn.and_then(|(stream, _)| { + stream + .set_nonblocking(true) + .map(|_| stream) + .map_err(Error::UnixAccept) + }) + .and_then(|stream| self.add_stream_listener(stream)) + .unwrap_or_else(|err| { + warn!("Unable to accept new local connection: {:?}", err); + }); + } else { + // If we aren't ready to process requests, accept and immediately close + // the connection. + conn.map(drop).unwrap_or_else(|err| { + warn!("Error closing an incoming connection: {:?}", err); + }); + } + } else { + // Check if the stream represented by fd has already established a + // connection with the application running in the guest + if let std::collections::hash_map::Entry::Vacant(_) = + self.thread_backend.listener_map.entry(fd) + { + // New connection from the host + if evset.bits() != epoll::Events::EPOLLIN.bits() { + // Has to be EPOLLIN as it was not connected previously + return; + } + let mut unix_stream = match self.thread_backend.stream_map.remove(&fd) { + Some(uds) => uds, + None => { + warn!("Error while searching fd in the stream map"); + return; + } + }; + + // Local peer is sending a "connect PORT\n" command + let peer_port = match Self::read_local_stream_port(&mut unix_stream) { + Ok(port) => port, + Err(err) => { + warn!("Error while parsing \"connect PORT\n\" command: {:?}", err); + return; + } + }; + + // Allocate a local port number + let local_port = match self.allocate_local_port() { + Ok(lp) => lp, + Err(err) => { + warn!("Error while allocating local port: {:?}", err); + return; + } + }; + + // Insert the fd into the backend's maps + self.thread_backend + .listener_map + .insert(fd, ConnMapKey::new(local_port, peer_port)); + + // Create a new connection object an enqueue a connection request + // packet to be sent to the guest + let conn_map_key = ConnMapKey::new(local_port, peer_port); + let mut new_conn = VsockConnection::new_local_init( + unix_stream, + VSOCK_HOST_CID, + local_port, + self.guest_cid, + peer_port, + self.get_epoll_fd(), + self.tx_buffer_size, + ); + new_conn.rx_queue.enqueue(RxOps::Request); + new_conn.set_peer_port(peer_port); + + // Add connection object into the backend's maps + self.thread_backend.conn_map.insert(conn_map_key, new_conn); + + self.thread_backend + .backend_rxq + .push_back(ConnMapKey::new(local_port, peer_port)); + + // Re-register the fd to listen for EPOLLIN and EPOLLOUT events + Self::epoll_modify( + self.get_epoll_fd(), + fd, + epoll::Events::EPOLLIN | epoll::Events::EPOLLOUT, + ) + .unwrap(); + } else { + // Previously connected connection + let key = self.thread_backend.listener_map.get(&fd).unwrap(); + let conn = self.thread_backend.conn_map.get_mut(key).unwrap(); + + if evset.bits() == epoll::Events::EPOLLOUT.bits() { + // Flush any remaining data from the tx buffer + match conn.tx_buf.flush_to(&mut conn.stream) { + Ok(cnt) => { + if cnt > 0 { + conn.fwd_cnt += Wrapping(cnt as u32); + conn.rx_queue.enqueue(RxOps::CreditUpdate); + } + self.thread_backend + .backend_rxq + .push_back(ConnMapKey::new(conn.local_port, conn.peer_port)); + } + Err(e) => { + dbg!("Error: {:?}", e); + } + } + return; + } + + // Unregister stream from the epoll, register when connection is + // established with the guest + Self::epoll_unregister(self.epoll_file.as_raw_fd(), fd).unwrap(); + + // Enqueue a read request + conn.rx_queue.enqueue(RxOps::Rw); + self.thread_backend + .backend_rxq + .push_back(ConnMapKey::new(conn.local_port, conn.peer_port)); + } + } + } + + /// Allocate a new local port number. + fn allocate_local_port(&mut self) -> Result<u32> { + // TODO: Improve space efficiency of this operation + // TODO: Reuse the conn_map HashMap + // TODO: Test this. + let mut alloc_local_port = self.local_port.0; + loop { + if !self + .thread_backend + .local_port_set + .contains(&alloc_local_port) + { + // The port set doesn't contain the newly allocated port number. + self.local_port = Wrapping(alloc_local_port + 1); + self.thread_backend.local_port_set.insert(alloc_local_port); + return Ok(alloc_local_port); + } else { + if alloc_local_port == self.local_port.0 { + // We have exhausted our search and wrapped back to the current port number + return Err(Error::NoFreeLocalPort); + } + alloc_local_port += 1; + } + } + } + + /// Read `CONNECT PORT_NUM\n` from the connected stream. + fn read_local_stream_port(stream: &mut UnixStream) -> Result<u32> { + let mut buf = [0u8; 32]; + + // Minimum number of bytes we should be able to read + // Corresponds to 'CONNECT 0\n' + const MIN_READ_LEN: usize = 10; + + // Read in the minimum number of bytes we can read + stream + .read_exact(&mut buf[..MIN_READ_LEN]) + .map_err(Error::UnixRead)?; + + let mut read_len = MIN_READ_LEN; + while buf[read_len - 1] != b'\n' && read_len < buf.len() { + stream + .read_exact(&mut buf[read_len..read_len + 1]) + .map_err(Error::UnixRead)?; + read_len += 1; + } + + let mut word_iter = std::str::from_utf8(&buf[..read_len]) + .map_err(Error::ConvertFromUtf8)? + .split_whitespace(); + + word_iter + .next() + .ok_or(Error::InvalidPortRequest) + .and_then(|word| { + if word.to_lowercase() == "connect" { + Ok(()) + } else { + Err(Error::InvalidPortRequest) + } + }) + .and_then(|_| word_iter.next().ok_or(Error::InvalidPortRequest)) + .and_then(|word| word.parse::<u32>().map_err(Error::ParseInteger)) + .map_err(|e| Error::ReadStreamPort(Box::new(e))) + } + + /// Add a stream to epoll to listen for EPOLLIN events. + fn add_stream_listener(&mut self, stream: UnixStream) -> Result<()> { + let stream_fd = stream.as_raw_fd(); + self.thread_backend.stream_map.insert(stream_fd, stream); + VhostUserVsockThread::epoll_register( + self.get_epoll_fd(), + stream_fd, + epoll::Events::EPOLLIN, + )?; + + Ok(()) + } + + /// Iterate over the rx queue and process rx requests. + fn process_rx_queue( + &mut self, + vring: &VringRwLock, + rx_queue_type: RxQueueType, + ) -> Result<bool> { + let mut used_any = false; + let atomic_mem = match &self.mem { + Some(m) => m, + None => return Err(Error::NoMemoryConfigured), + }; + + let mut vring_mut = vring.get_mut(); + + let queue = vring_mut.get_queue_mut(); + + while let Some(mut avail_desc) = queue + .iter(atomic_mem.memory()) + .map_err(|_| Error::IterateQueue)? + .next() + { + used_any = true; + let mem = atomic_mem.clone().memory(); + + let head_idx = avail_desc.head_index(); + let used_len = match VsockPacket::from_rx_virtq_chain( + mem.deref(), + &mut avail_desc, + self.tx_buffer_size, + ) { + Ok(mut pkt) => { + let recv_result = match rx_queue_type { + RxQueueType::Standard => self.thread_backend.recv_pkt(&mut pkt), + RxQueueType::RawPkts => self.thread_backend.recv_raw_pkt(&mut pkt), + }; + + if recv_result.is_ok() { + PKT_HEADER_SIZE + pkt.len() as usize + } else { + queue.iter(mem).unwrap().go_to_previous_position(); + break; + } + } + Err(e) => { + warn!("vsock: RX queue error: {:?}", e); + 0 + } + }; + + let vring = vring.clone(); + let event_idx = self.event_idx; + + self.pool.spawn_ok(async move { + // TODO: Understand why doing the following in the pool works + if event_idx { + if vring.add_used(head_idx, used_len as u32).is_err() { + warn!("Could not return used descriptors to ring"); + } + match vring.needs_notification() { + Err(_) => { + warn!("Could not check if queue needs to be notified"); + vring.signal_used_queue().unwrap(); + } + Ok(needs_notification) => { + if needs_notification { + vring.signal_used_queue().unwrap(); + } + } + } + } else { + if vring.add_used(head_idx, used_len as u32).is_err() { + warn!("Could not return used descriptors to ring"); + } + vring.signal_used_queue().unwrap(); + } + }); + + match rx_queue_type { + RxQueueType::Standard => { + if !self.thread_backend.pending_rx() { + break; + } + } + RxQueueType::RawPkts => { + if !self.thread_backend.pending_raw_pkts() { + break; + } + } + } + } + Ok(used_any) + } + + /// Wrapper to process rx queue based on whether event idx is enabled or not. + fn process_unix_sockets(&mut self, vring: &VringRwLock, event_idx: bool) -> Result<bool> { + if event_idx { + // To properly handle EVENT_IDX we need to keep calling + // process_rx_queue until it stops finding new requests + // on the queue, as vm-virtio's Queue implementation + // only checks avail_index once + loop { + if !self.thread_backend.pending_rx() { + break; + } + vring.disable_notification().unwrap(); + + self.process_rx_queue(vring, RxQueueType::Standard)?; + if !vring.enable_notification().unwrap() { + break; + } + } + } else { + self.process_rx_queue(vring, RxQueueType::Standard)?; + } + Ok(false) + } + + /// Wrapper to process raw vsock packets queue based on whether event idx is enabled or not. + pub fn process_raw_pkts(&mut self, vring: &VringRwLock, event_idx: bool) -> Result<bool> { + if event_idx { + loop { + if !self.thread_backend.pending_raw_pkts() { + break; + } + vring.disable_notification().unwrap(); + + self.process_rx_queue(vring, RxQueueType::RawPkts)?; + if !vring.enable_notification().unwrap() { + break; + } + } + } else { + self.process_rx_queue(vring, RxQueueType::RawPkts)?; + } + Ok(false) + } + + pub fn process_rx(&mut self, vring: &VringRwLock, event_idx: bool) -> Result<bool> { + match self.last_processed { + RxQueueType::Standard => { + if self.thread_backend.pending_raw_pkts() { + self.process_raw_pkts(vring, event_idx)?; + self.last_processed = RxQueueType::RawPkts; + } + if self.thread_backend.pending_rx() { + self.process_unix_sockets(vring, event_idx)?; + } + } + RxQueueType::RawPkts => { + if self.thread_backend.pending_rx() { + self.process_unix_sockets(vring, event_idx)?; + self.last_processed = RxQueueType::Standard; + } + if self.thread_backend.pending_raw_pkts() { + self.process_raw_pkts(vring, event_idx)?; + } + } + } + Ok(false) + } + + /// Process tx queue and send requests to the backend for processing. + fn process_tx_queue(&mut self, vring: &VringRwLock) -> Result<bool> { + let mut used_any = false; + + let atomic_mem = match &self.mem { + Some(m) => m, + None => return Err(Error::NoMemoryConfigured), + }; + + while let Some(mut avail_desc) = vring + .get_mut() + .get_queue_mut() + .iter(atomic_mem.memory()) + .map_err(|_| Error::IterateQueue)? + .next() + { + used_any = true; + let mem = atomic_mem.clone().memory(); + + let head_idx = avail_desc.head_index(); + let pkt = match VsockPacket::from_tx_virtq_chain( + mem.deref(), + &mut avail_desc, + self.tx_buffer_size, + ) { + Ok(pkt) => pkt, + Err(e) => { + dbg!("vsock: error reading TX packet: {:?}", e); + continue; + } + }; + + if self.thread_backend.send_pkt(&pkt).is_err() { + vring + .get_mut() + .get_queue_mut() + .iter(mem) + .unwrap() + .go_to_previous_position(); + break; + } + + // TODO: Check if the protocol requires read length to be correct + let used_len = 0; + + let vring = vring.clone(); + let event_idx = self.event_idx; + + self.pool.spawn_ok(async move { + if event_idx { + if vring.add_used(head_idx, used_len as u32).is_err() { + warn!("Could not return used descriptors to ring"); + } + match vring.needs_notification() { + Err(_) => { + warn!("Could not check if queue needs to be notified"); + vring.signal_used_queue().unwrap(); + } + Ok(needs_notification) => { + if needs_notification { + vring.signal_used_queue().unwrap(); + } + } + } + } else { + if vring.add_used(head_idx, used_len as u32).is_err() { + warn!("Could not return used descriptors to ring"); + } + vring.signal_used_queue().unwrap(); + } + }); + } + + Ok(used_any) + } + + /// Wrapper to process tx queue based on whether event idx is enabled or not. + pub fn process_tx(&mut self, vring_lock: &VringRwLock, event_idx: bool) -> Result<bool> { + if event_idx { + // To properly handle EVENT_IDX we need to keep calling + // process_rx_queue until it stops finding new requests + // on the queue, as vm-virtio's Queue implementation + // only checks avail_index once + loop { + vring_lock.disable_notification().unwrap(); + self.process_tx_queue(vring_lock)?; + if !vring_lock.enable_notification().unwrap() { + break; + } + } + } else { + self.process_tx_queue(vring_lock)?; + } + Ok(false) + } +} + +impl Drop for VhostUserVsockThread { + fn drop(&mut self) { + let _ = std::fs::remove_file(&self.host_sock_path); + self.thread_backend + .cid_map + .write() + .unwrap() + .remove(&self.guest_cid); + } +} +#[cfg(test)] +mod tests { + use super::*; + use std::collections::HashMap; + use tempfile::tempdir; + use vm_memory::GuestAddress; + use vmm_sys_util::eventfd::EventFd; + + const CONN_TX_BUF_SIZE: u32 = 64 * 1024; + + impl VhostUserVsockThread { + fn get_epoll_file(&self) -> &File { + &self.epoll_file + } + } + + #[test] + fn test_vsock_thread() { + let cid_map: Arc<RwLock<CidMap>> = Arc::new(RwLock::new(HashMap::new())); + + let test_dir = tempdir().expect("Could not create a temp test directory."); + + let t = VhostUserVsockThread::new( + test_dir + .path() + .join("test_vsock_thread.vsock") + .display() + .to_string(), + 3, + CONN_TX_BUF_SIZE, + cid_map, + ); + assert!(t.is_ok()); + + let mut t = t.unwrap(); + let epoll_fd = t.get_epoll_file().as_raw_fd(); + + let mem = GuestMemoryAtomic::new( + GuestMemoryMmap::<()>::from_ranges(&[(GuestAddress(0), 0x10000)]).unwrap(), + ); + + t.mem = Some(mem.clone()); + + let dummy_fd = EventFd::new(0).unwrap(); + + assert!(VhostUserVsockThread::epoll_register( + epoll_fd, + dummy_fd.as_raw_fd(), + epoll::Events::EPOLLOUT + ) + .is_ok()); + assert!(VhostUserVsockThread::epoll_modify( + epoll_fd, + dummy_fd.as_raw_fd(), + epoll::Events::EPOLLIN + ) + .is_ok()); + assert!(VhostUserVsockThread::epoll_unregister(epoll_fd, dummy_fd.as_raw_fd()).is_ok()); + assert!(VhostUserVsockThread::epoll_register( + epoll_fd, + dummy_fd.as_raw_fd(), + epoll::Events::EPOLLIN + ) + .is_ok()); + + let vring = VringRwLock::new(mem, 0x1000).unwrap(); + vring.set_queue_info(0x100, 0x200, 0x300).unwrap(); + vring.set_queue_ready(true); + + assert!(t.process_tx(&vring, false).is_ok()); + assert!(t.process_tx(&vring, true).is_ok()); + // add backend_rxq to avoid that RX processing is skipped + t.thread_backend + .backend_rxq + .push_back(ConnMapKey::new(0, 0)); + assert!(t.process_rx(&vring, false).is_ok()); + assert!(t.process_rx(&vring, true).is_ok()); + + dummy_fd.write(1).unwrap(); + + t.process_backend_evt(EventSet::empty()); + + test_dir.close().unwrap(); + } + + #[test] + fn test_vsock_thread_failures() { + let cid_map: Arc<RwLock<CidMap>> = Arc::new(RwLock::new(HashMap::new())); + + let test_dir = tempdir().expect("Could not create a temp test directory."); + + let t = VhostUserVsockThread::new( + "/sys/not_allowed.vsock".to_string(), + 3, + CONN_TX_BUF_SIZE, + cid_map.clone(), + ); + assert!(t.is_err()); + + let vsock_socket_path = test_dir + .path() + .join("test_vsock_thread_failures.vsock") + .display() + .to_string(); + let mut t = + VhostUserVsockThread::new(vsock_socket_path, 3, CONN_TX_BUF_SIZE, cid_map).unwrap(); + assert!(VhostUserVsockThread::epoll_register(-1, -1, epoll::Events::EPOLLIN).is_err()); + assert!(VhostUserVsockThread::epoll_modify(-1, -1, epoll::Events::EPOLLIN).is_err()); + assert!(VhostUserVsockThread::epoll_unregister(-1, -1).is_err()); + + let mem = GuestMemoryAtomic::new( + GuestMemoryMmap::<()>::from_ranges(&[(GuestAddress(0), 0x10000)]).unwrap(), + ); + + let vring = VringRwLock::new(mem, 0x1000).unwrap(); + + // memory is not configured, so processing TX should fail + assert!(t.process_tx(&vring, false).is_err()); + assert!(t.process_tx(&vring, true).is_err()); + + // add backend_rxq to avoid that RX processing is skipped + t.thread_backend + .backend_rxq + .push_back(ConnMapKey::new(0, 0)); + assert!(t.process_rx(&vring, false).is_err()); + assert!(t.process_rx(&vring, true).is_err()); + + test_dir.close().unwrap(); + } +} diff --git a/src/vsock_conn.rs b/src/vsock_conn.rs new file mode 100644 index 0000000..058c2e1 --- /dev/null +++ b/src/vsock_conn.rs @@ -0,0 +1,781 @@ +// SPDX-License-Identifier: Apache-2.0 or BSD-3-Clause + +use std::{ + io::{ErrorKind, Read, Write}, + num::Wrapping, + os::unix::prelude::{AsRawFd, RawFd}, +}; + +use log::info; +use virtio_vsock::packet::{VsockPacket, PKT_HEADER_SIZE}; +use vm_memory::{bitmap::BitmapSlice, Bytes, VolatileSlice}; + +use crate::{ + rxops::*, + rxqueue::*, + txbuf::*, + vhu_vsock::{ + Error, Result, VSOCK_FLAGS_SHUTDOWN_RCV, VSOCK_FLAGS_SHUTDOWN_SEND, + VSOCK_OP_CREDIT_REQUEST, VSOCK_OP_CREDIT_UPDATE, VSOCK_OP_REQUEST, VSOCK_OP_RESPONSE, + VSOCK_OP_RST, VSOCK_OP_RW, VSOCK_OP_SHUTDOWN, VSOCK_TYPE_STREAM, + }, + vhu_vsock_thread::VhostUserVsockThread, +}; + +#[derive(Debug)] +pub(crate) struct VsockConnection<S> { + /// Host-side stream corresponding to this vsock connection. + pub stream: S, + /// Specifies if the stream is connected to a listener on the host. + pub connect: bool, + /// Port at which a guest application is listening to. + pub peer_port: u32, + /// Queue holding pending rx operations per connection. + pub rx_queue: RxQueue, + /// CID of the host. + local_cid: u64, + /// Port on the host at which a host-side application listens to. + pub local_port: u32, + /// CID of the guest. + pub guest_cid: u64, + /// Total number of bytes written to stream from tx buffer. + pub fwd_cnt: Wrapping<u32>, + /// Total number of bytes previously forwarded to stream. + last_fwd_cnt: Wrapping<u32>, + /// Size of buffer the guest has allocated for this connection. + peer_buf_alloc: u32, + /// Number of bytes the peer has forwarded to a connection. + peer_fwd_cnt: Wrapping<u32>, + /// The total number of bytes sent to the guest vsock driver. + rx_cnt: Wrapping<u32>, + /// epoll fd to which this connection's stream has to be added. + pub epoll_fd: RawFd, + /// Local tx buffer. + pub tx_buf: LocalTxBuf, + /// Local tx buffer size + tx_buffer_size: u32, +} + +impl<S: AsRawFd + Read + Write> VsockConnection<S> { + /// Create a new vsock connection object for locally i.e host-side + /// inititated connections. + pub fn new_local_init( + stream: S, + local_cid: u64, + local_port: u32, + guest_cid: u64, + guest_port: u32, + epoll_fd: RawFd, + tx_buffer_size: u32, + ) -> Self { + Self { + stream, + connect: false, + peer_port: guest_port, + rx_queue: RxQueue::new(), + local_cid, + local_port, + guest_cid, + fwd_cnt: Wrapping(0), + last_fwd_cnt: Wrapping(0), + peer_buf_alloc: 0, + peer_fwd_cnt: Wrapping(0), + rx_cnt: Wrapping(0), + epoll_fd, + tx_buf: LocalTxBuf::new(tx_buffer_size), + tx_buffer_size, + } + } + + /// Create a new vsock connection object for connections initiated by + /// an application running in the guest. + #[allow(clippy::too_many_arguments)] + pub fn new_peer_init( + stream: S, + local_cid: u64, + local_port: u32, + guest_cid: u64, + guest_port: u32, + epoll_fd: RawFd, + peer_buf_alloc: u32, + tx_buffer_size: u32, + ) -> Self { + let mut rx_queue = RxQueue::new(); + rx_queue.enqueue(RxOps::Response); + Self { + stream, + connect: false, + peer_port: guest_port, + rx_queue, + local_cid, + local_port, + guest_cid, + fwd_cnt: Wrapping(0), + last_fwd_cnt: Wrapping(0), + peer_buf_alloc, + peer_fwd_cnt: Wrapping(0), + rx_cnt: Wrapping(0), + epoll_fd, + tx_buf: LocalTxBuf::new(tx_buffer_size), + tx_buffer_size, + } + } + + /// Set the peer port to the guest side application's port. + pub fn set_peer_port(&mut self, peer_port: u32) { + self.peer_port = peer_port; + } + + /// Process a vsock packet that is meant for this connection. + /// Forward data to the host-side application if the vsock packet + /// contains a RW operation. + pub fn recv_pkt<B: BitmapSlice>(&mut self, pkt: &mut VsockPacket<B>) -> Result<()> { + // Initialize all fields in the packet header + self.init_pkt(pkt); + + match self.rx_queue.dequeue() { + Some(RxOps::Request) => { + // Send a connection request to the guest-side application + pkt.set_op(VSOCK_OP_REQUEST); + Ok(()) + } + Some(RxOps::Rw) => { + if !self.connect { + // There is no host-side application listening for this + // packet, hence send back an RST. + pkt.set_op(VSOCK_OP_RST); + return Ok(()); + } + + // Check if peer has space for receiving data + if self.need_credit_update_from_peer() { + self.last_fwd_cnt = self.fwd_cnt; + pkt.set_op(VSOCK_OP_CREDIT_REQUEST); + return Ok(()); + } + let buf = pkt.data_slice().ok_or(Error::PktBufMissing)?; + + // Perform a credit check to find the maximum read size. The read + // data must fit inside a packet buffer and be within peer's + // available buffer space + let max_read_len = std::cmp::min(buf.len(), self.peer_avail_credit()); + + // Read data from the stream directly into the buffer + if let Ok(read_cnt) = buf.read_from(0, &mut self.stream, max_read_len) { + if read_cnt == 0 { + // If no data was read then the stream was closed down unexpectedly. + // Send a shutdown packet to the guest-side application. + pkt.set_op(VSOCK_OP_SHUTDOWN) + .set_flag(VSOCK_FLAGS_SHUTDOWN_RCV) + .set_flag(VSOCK_FLAGS_SHUTDOWN_SEND); + } else { + // If data was read, then set the length field in the packet header + // to the amount of data that was read. + pkt.set_op(VSOCK_OP_RW).set_len(read_cnt as u32); + + // Re-register the stream file descriptor for read and write events + VhostUserVsockThread::epoll_register( + self.epoll_fd, + self.stream.as_raw_fd(), + epoll::Events::EPOLLIN | epoll::Events::EPOLLOUT, + )?; + } + + // Update the rx_cnt with the amount of data in the vsock packet. + self.rx_cnt += Wrapping(pkt.len()); + self.last_fwd_cnt = self.fwd_cnt; + } + Ok(()) + } + Some(RxOps::Response) => { + // A response has been received to a newly initiated host-side connection + self.connect = true; + pkt.set_op(VSOCK_OP_RESPONSE); + Ok(()) + } + Some(RxOps::CreditUpdate) => { + // Request credit update from the guest. + if !self.rx_queue.pending_rx() { + // Waste an rx buffer if no rx is pending + pkt.set_op(VSOCK_OP_CREDIT_UPDATE); + self.last_fwd_cnt = self.fwd_cnt; + } + Ok(()) + } + _ => Err(Error::NoRequestRx), + } + } + + /// Deliver a guest generated packet to this connection. + /// + /// Returns: + /// - always `Ok(())` to indicate that the packet has been consumed + pub fn send_pkt<B: BitmapSlice>(&mut self, pkt: &VsockPacket<B>) -> Result<()> { + // Update peer credit information + self.peer_buf_alloc = pkt.buf_alloc(); + self.peer_fwd_cnt = Wrapping(pkt.fwd_cnt()); + + match pkt.op() { + VSOCK_OP_RESPONSE => { + // Confirmation for a host initiated connection + // TODO: Handle stream write error in a better manner + let response = format!("OK {}\n", self.peer_port); + self.stream.write_all(response.as_bytes()).unwrap(); + self.connect = true; + } + VSOCK_OP_RW => { + // Data has to be written to the host-side stream + match pkt.data_slice() { + None => { + info!( + "Dropping empty packet from guest (lp={}, pp={})", + self.local_port, self.peer_port + ); + return Ok(()); + } + Some(buf) => { + if let Err(err) = self.send_bytes(buf) { + // TODO: Terminate this connection + dbg!("err:{:?}", err); + return Ok(()); + } + } + } + } + VSOCK_OP_CREDIT_UPDATE => { + // Already updated the credit + + // Re-register the stream file descriptor for read and write events + if VhostUserVsockThread::epoll_modify( + self.epoll_fd, + self.stream.as_raw_fd(), + epoll::Events::EPOLLIN | epoll::Events::EPOLLOUT, + ) + .is_err() + { + VhostUserVsockThread::epoll_register( + self.epoll_fd, + self.stream.as_raw_fd(), + epoll::Events::EPOLLIN | epoll::Events::EPOLLOUT, + ) + .unwrap(); + }; + } + VSOCK_OP_CREDIT_REQUEST => { + // Send back this connection's credit information + self.rx_queue.enqueue(RxOps::CreditUpdate); + } + VSOCK_OP_SHUTDOWN => { + // Shutdown this connection + let recv_off = pkt.flags() & VSOCK_FLAGS_SHUTDOWN_RCV != 0; + let send_off = pkt.flags() & VSOCK_FLAGS_SHUTDOWN_SEND != 0; + + if recv_off && send_off && self.tx_buf.is_empty() { + self.rx_queue.enqueue(RxOps::Reset); + } + } + _ => {} + } + + Ok(()) + } + + /// Write data to the host-side stream. + /// + /// Returns: + /// - Ok(cnt) where cnt is the number of bytes written to the stream + /// - Err(Error::UnixWrite) if there was an error writing to the stream + fn send_bytes<B: BitmapSlice>(&mut self, buf: &VolatileSlice<B>) -> Result<()> { + if !self.tx_buf.is_empty() { + // Data is already present in the buffer and the backend + // is waiting for a EPOLLOUT event to flush it + return self.tx_buf.push(buf); + } + + // Write data to the stream + let written_count = match buf.write_to(0, &mut self.stream, buf.len()) { + Ok(cnt) => cnt, + Err(vm_memory::VolatileMemoryError::IOError(e)) => { + if e.kind() == ErrorKind::WouldBlock { + 0 + } else { + dbg!("send_bytes error: {:?}", e); + return Err(Error::UnixWrite); + } + } + Err(e) => { + dbg!("send_bytes error: {:?}", e); + return Err(Error::UnixWrite); + } + }; + + if written_count > 0 { + // Increment forwarded count by number of bytes written to the stream + self.fwd_cnt += Wrapping(written_count as u32); + + // At what point in available credits should we send a credit update. + // This is set to 1/4th of the tx buffer size. If we keep it too low, + // we will end up sending too many credit updates. If we keep it too + // high, we will end up sending too few credit updates and cause stalls. + // Stalls are more bad than too many credit updates. + let free_space = self + .tx_buffer_size + .wrapping_sub((self.fwd_cnt - self.last_fwd_cnt).0); + if free_space < self.tx_buffer_size / 4 { + self.rx_queue.enqueue(RxOps::CreditUpdate); + } + } + + if written_count != buf.len() { + return self.tx_buf.push(&buf.offset(written_count).unwrap()); + } + + Ok(()) + } + + /// Initialize all header fields in the vsock packet. + fn init_pkt<'a, 'b, B: BitmapSlice>( + &self, + pkt: &'a mut VsockPacket<'b, B>, + ) -> &'a mut VsockPacket<'b, B> { + // Zero out the packet header + pkt.set_header_from_raw(&[0u8; PKT_HEADER_SIZE]).unwrap(); + + pkt.set_src_cid(self.local_cid) + .set_dst_cid(self.guest_cid) + .set_src_port(self.local_port) + .set_dst_port(self.peer_port) + .set_type(VSOCK_TYPE_STREAM) + .set_buf_alloc(self.tx_buffer_size) + .set_fwd_cnt(self.fwd_cnt.0) + } + + /// Get max number of bytes we can send to peer without overflowing + /// the peer's buffer. + fn peer_avail_credit(&self) -> usize { + (Wrapping(self.peer_buf_alloc) - (self.rx_cnt - self.peer_fwd_cnt)).0 as usize + } + + /// Check if we need a credit update from the peer before sending + /// more data to it. + fn need_credit_update_from_peer(&self) -> bool { + self.peer_avail_credit() == 0 + } +} + +#[cfg(test)] +mod tests { + use byteorder::{ByteOrder, LittleEndian}; + + use super::*; + use crate::vhu_vsock::{VSOCK_HOST_CID, VSOCK_OP_RW, VSOCK_TYPE_STREAM}; + use std::io::Result as IoResult; + use std::ops::Deref; + use virtio_bindings::bindings::virtio_ring::{VRING_DESC_F_NEXT, VRING_DESC_F_WRITE}; + use virtio_queue::{mock::MockSplitQueue, Descriptor, DescriptorChain, Queue, QueueOwnedT}; + use vm_memory::{ + Address, Bytes, GuestAddress, GuestAddressSpace, GuestMemoryAtomic, GuestMemoryLoadGuard, + GuestMemoryMmap, + }; + + const CONN_TX_BUF_SIZE: u32 = 64 * 1024; + + struct HeadParams { + head_len: usize, + data_len: u32, + } + + impl HeadParams { + fn new(head_len: usize, data_len: u32) -> Self { + Self { head_len, data_len } + } + fn construct_head(&self) -> Vec<u8> { + let mut header = vec![0_u8; self.head_len]; + if self.head_len == PKT_HEADER_SIZE { + // Offset into the header for data length + const HDROFF_LEN: usize = 24; + LittleEndian::write_u32(&mut header[HDROFF_LEN..], self.data_len); + } + header + } + } + + fn prepare_desc_chain_vsock( + write_only: bool, + head_params: &HeadParams, + data_chain_len: u16, + head_data_len: u32, + ) -> ( + GuestMemoryAtomic<GuestMemoryMmap>, + DescriptorChain<GuestMemoryLoadGuard<GuestMemoryMmap>>, + ) { + let mem = GuestMemoryMmap::<()>::from_ranges(&[(GuestAddress(0), 0x1000)]).unwrap(); + let virt_queue = MockSplitQueue::new(&mem, 16); + let mut next_addr = virt_queue.desc_table().total_size() + 0x100; + let mut flags = 0; + + if write_only { + flags |= VRING_DESC_F_WRITE; + } + + let mut head_flags = if data_chain_len > 0 { + flags | VRING_DESC_F_NEXT + } else { + flags + }; + + // vsock packet header + // let header = vec![0 as u8; head_params.head_len]; + let header = head_params.construct_head(); + let head_desc = + Descriptor::new(next_addr, head_params.head_len as u32, head_flags as u16, 1); + mem.write(&header, head_desc.addr()).unwrap(); + assert!(virt_queue.desc_table().store(0, head_desc).is_ok()); + next_addr += head_params.head_len as u64; + + // Put the descriptor index 0 in the first available ring position. + mem.write_obj(0u16, virt_queue.avail_addr().unchecked_add(4)) + .unwrap(); + + // Set `avail_idx` to 1. + mem.write_obj(1u16, virt_queue.avail_addr().unchecked_add(2)) + .unwrap(); + + // chain len excludes the head + for i in 0..(data_chain_len) { + // last descr in chain + if i == data_chain_len - 1 { + head_flags &= !VRING_DESC_F_NEXT; + } + // vsock data + let data = vec![0_u8; head_data_len as usize]; + let data_desc = Descriptor::new(next_addr, data.len() as u32, head_flags as u16, i + 2); + mem.write(&data, data_desc.addr()).unwrap(); + assert!(virt_queue.desc_table().store(i + 1, data_desc).is_ok()); + next_addr += head_data_len as u64; + } + + // Create descriptor chain from pre-filled memory + ( + GuestMemoryAtomic::new(mem.clone()), + virt_queue + .create_queue::<Queue>() + .unwrap() + .iter(GuestMemoryAtomic::new(mem.clone()).memory()) + .unwrap() + .next() + .unwrap(), + ) + } + + struct VsockDummySocket { + data: Vec<u8>, + } + + impl VsockDummySocket { + fn new() -> Self { + Self { data: Vec::new() } + } + } + + impl Write for VsockDummySocket { + fn write(&mut self, buf: &[u8]) -> std::result::Result<usize, std::io::Error> { + self.data.clear(); + self.data.extend_from_slice(buf); + + Ok(buf.len()) + } + fn flush(&mut self) -> IoResult<()> { + Ok(()) + } + } + + impl Read for VsockDummySocket { + fn read(&mut self, buf: &mut [u8]) -> IoResult<usize> { + buf[..self.data.len()].copy_from_slice(&self.data); + Ok(self.data.len()) + } + } + + impl AsRawFd for VsockDummySocket { + fn as_raw_fd(&self) -> RawFd { + -1 + } + } + + #[test] + fn test_vsock_conn_init() { + // new locally inititated connection + let dummy_file = VsockDummySocket::new(); + let mut conn_local = VsockConnection::new_local_init( + dummy_file, + VSOCK_HOST_CID, + 5000, + 3, + 5001, + -1, + CONN_TX_BUF_SIZE, + ); + + assert!(!conn_local.connect); + assert_eq!(conn_local.peer_port, 5001); + assert_eq!(conn_local.rx_queue, RxQueue::new()); + assert_eq!(conn_local.local_cid, VSOCK_HOST_CID); + assert_eq!(conn_local.local_port, 5000); + assert_eq!(conn_local.guest_cid, 3); + + // set peer port + conn_local.set_peer_port(5002); + assert_eq!(conn_local.peer_port, 5002); + + // New connection initiated by the peer/guest + let dummy_file = VsockDummySocket::new(); + let mut conn_peer = VsockConnection::new_peer_init( + dummy_file, + VSOCK_HOST_CID, + 5000, + 3, + 5001, + -1, + 65536, + CONN_TX_BUF_SIZE, + ); + + assert!(!conn_peer.connect); + assert_eq!(conn_peer.peer_port, 5001); + assert_eq!(conn_peer.rx_queue.dequeue().unwrap(), RxOps::Response); + assert!(!conn_peer.rx_queue.pending_rx()); + assert_eq!(conn_peer.local_cid, VSOCK_HOST_CID); + assert_eq!(conn_peer.local_port, 5000); + assert_eq!(conn_peer.guest_cid, 3); + assert_eq!(conn_peer.peer_buf_alloc, 65536); + } + + #[test] + fn test_vsock_conn_credit() { + // new locally inititated connection + let dummy_file = VsockDummySocket::new(); + let mut conn_local = VsockConnection::new_local_init( + dummy_file, + VSOCK_HOST_CID, + 5000, + 3, + 5001, + -1, + CONN_TX_BUF_SIZE, + ); + + assert_eq!(conn_local.peer_avail_credit(), 0); + assert!(conn_local.need_credit_update_from_peer()); + + conn_local.peer_buf_alloc = 65536; + assert_eq!(conn_local.peer_avail_credit(), 65536); + assert!(!conn_local.need_credit_update_from_peer()); + + conn_local.rx_cnt = Wrapping(32768); + assert_eq!(conn_local.peer_avail_credit(), 32768); + assert!(!conn_local.need_credit_update_from_peer()); + + conn_local.rx_cnt = Wrapping(65536); + assert_eq!(conn_local.peer_avail_credit(), 0); + assert!(conn_local.need_credit_update_from_peer()); + } + + #[test] + fn test_vsock_conn_init_pkt() { + // parameters for packet head construction + let head_params = HeadParams::new(PKT_HEADER_SIZE, 10); + + // new locally inititated connection + let dummy_file = VsockDummySocket::new(); + let conn_local = VsockConnection::new_local_init( + dummy_file, + VSOCK_HOST_CID, + 5000, + 3, + 5001, + -1, + CONN_TX_BUF_SIZE, + ); + + // write only descriptor chain + let (mem, mut descr_chain) = prepare_desc_chain_vsock(true, &head_params, 2, 10); + let mem = mem.memory(); + let mut pkt = + VsockPacket::from_rx_virtq_chain(mem.deref(), &mut descr_chain, CONN_TX_BUF_SIZE) + .unwrap(); + + // initialize a vsock packet for the guest + conn_local.init_pkt(&mut pkt); + + assert_eq!(pkt.src_cid(), VSOCK_HOST_CID); + assert_eq!(pkt.dst_cid(), 3); + assert_eq!(pkt.src_port(), 5000); + assert_eq!(pkt.dst_port(), 5001); + assert_eq!(pkt.type_(), VSOCK_TYPE_STREAM); + assert_eq!(pkt.buf_alloc(), CONN_TX_BUF_SIZE); + assert_eq!(pkt.fwd_cnt(), 0); + } + + #[test] + fn test_vsock_conn_recv_pkt() { + // parameters for packet head construction + let head_params = HeadParams::new(PKT_HEADER_SIZE, 5); + + // new locally inititated connection + let dummy_file = VsockDummySocket::new(); + let mut conn_local = VsockConnection::new_local_init( + dummy_file, + VSOCK_HOST_CID, + 5000, + 3, + 5001, + -1, + CONN_TX_BUF_SIZE, + ); + + // write only descriptor chain + let (mem, mut descr_chain) = prepare_desc_chain_vsock(true, &head_params, 1, 5); + let mem = mem.memory(); + let mut pkt = + VsockPacket::from_rx_virtq_chain(mem.deref(), &mut descr_chain, CONN_TX_BUF_SIZE) + .unwrap(); + + // VSOCK_OP_REQUEST: new local conn request + conn_local.rx_queue.enqueue(RxOps::Request); + let op_req = conn_local.recv_pkt(&mut pkt); + assert!(op_req.is_ok()); + assert!(!conn_local.rx_queue.pending_rx()); + assert_eq!(pkt.op(), VSOCK_OP_REQUEST); + + // VSOCK_OP_RST: reset if connection not established + conn_local.rx_queue.enqueue(RxOps::Rw); + let op_rst = conn_local.recv_pkt(&mut pkt); + assert!(op_rst.is_ok()); + assert!(!conn_local.rx_queue.pending_rx()); + assert_eq!(pkt.op(), VSOCK_OP_RST); + + // VSOCK_OP_CREDIT_UPDATE: need credit update from peer/guest + conn_local.connect = true; + conn_local.rx_queue.enqueue(RxOps::Rw); + conn_local.fwd_cnt = Wrapping(1024); + let op_credit_update = conn_local.recv_pkt(&mut pkt); + assert!(op_credit_update.is_ok()); + assert!(!conn_local.rx_queue.pending_rx()); + assert_eq!(pkt.op(), VSOCK_OP_CREDIT_REQUEST); + assert_eq!(conn_local.last_fwd_cnt, Wrapping(1024)); + + // VSOCK_OP_SHUTDOWN: zero data read from stream/file + conn_local.peer_buf_alloc = 65536; + conn_local.rx_queue.enqueue(RxOps::Rw); + let op_zero_read_shutdown = conn_local.recv_pkt(&mut pkt); + assert!(op_zero_read_shutdown.is_ok()); + assert!(!conn_local.rx_queue.pending_rx()); + assert_eq!(conn_local.rx_cnt, Wrapping(0)); + assert_eq!(conn_local.last_fwd_cnt, Wrapping(1024)); + assert_eq!(pkt.op(), VSOCK_OP_SHUTDOWN); + assert_eq!( + pkt.flags(), + VSOCK_FLAGS_SHUTDOWN_RCV | VSOCK_FLAGS_SHUTDOWN_SEND + ); + + // VSOCK_OP_RW: finite data read from stream/file + conn_local.stream.write_all(b"hello").unwrap(); + conn_local.rx_queue.enqueue(RxOps::Rw); + let op_zero_read = conn_local.recv_pkt(&mut pkt); + // below error due to epoll add + assert!(op_zero_read.is_err()); + assert_eq!(pkt.op(), VSOCK_OP_RW); + assert!(!conn_local.rx_queue.pending_rx()); + assert_eq!(pkt.len(), 5); + let buf = &mut [0u8; 5]; + assert!(pkt.data_slice().unwrap().read_slice(buf, 0).is_ok()); + assert_eq!(buf, b"hello"); + + // VSOCK_OP_RESPONSE: response from a locally initiated connection + conn_local.rx_queue.enqueue(RxOps::Response); + let op_response = conn_local.recv_pkt(&mut pkt); + assert!(op_response.is_ok()); + assert!(!conn_local.rx_queue.pending_rx()); + assert_eq!(pkt.op(), VSOCK_OP_RESPONSE); + assert!(conn_local.connect); + + // VSOCK_OP_CREDIT_UPDATE: guest needs credit update + conn_local.rx_queue.enqueue(RxOps::CreditUpdate); + let op_credit_update = conn_local.recv_pkt(&mut pkt); + assert!(!conn_local.rx_queue.pending_rx()); + assert!(op_credit_update.is_ok()); + assert_eq!(pkt.op(), VSOCK_OP_CREDIT_UPDATE); + assert_eq!(conn_local.last_fwd_cnt, Wrapping(1024)); + + // non-existent request + let op_error = conn_local.recv_pkt(&mut pkt); + assert!(op_error.is_err()); + } + + #[test] + fn test_vsock_conn_send_pkt() { + // parameters for packet head construction + let head_params = HeadParams::new(PKT_HEADER_SIZE, 5); + + // new locally inititated connection + let dummy_file = VsockDummySocket::new(); + let mut conn_local = VsockConnection::new_local_init( + dummy_file, + VSOCK_HOST_CID, + 5000, + 3, + 5001, + -1, + CONN_TX_BUF_SIZE, + ); + + // write only descriptor chain + let (mem, mut descr_chain) = prepare_desc_chain_vsock(false, &head_params, 1, 5); + let mem = mem.memory(); + let mut pkt = + VsockPacket::from_tx_virtq_chain(mem.deref(), &mut descr_chain, CONN_TX_BUF_SIZE) + .unwrap(); + + // peer credit information + pkt.set_buf_alloc(65536).set_fwd_cnt(1024); + + // check if peer credit information is updated currently + let credit_check = conn_local.send_pkt(&pkt); + assert!(credit_check.is_ok()); + assert_eq!(conn_local.peer_buf_alloc, 65536); + assert_eq!(conn_local.peer_fwd_cnt, Wrapping(1024)); + + // VSOCK_OP_RESPONSE + pkt.set_op(VSOCK_OP_RESPONSE); + let peer_response = conn_local.send_pkt(&pkt); + assert!(peer_response.is_ok()); + assert!(conn_local.connect); + let mut resp_buf = vec![0; 8]; + conn_local.stream.read_exact(&mut resp_buf).unwrap(); + assert_eq!(resp_buf, b"OK 5001\n"); + + // VSOCK_OP_RW + pkt.set_op(VSOCK_OP_RW); + let buf = b"hello"; + assert!(pkt.data_slice().unwrap().write_slice(buf, 0).is_ok()); + let rw_response = conn_local.send_pkt(&pkt); + assert!(rw_response.is_ok()); + let mut resp_buf = vec![0; 5]; + conn_local.stream.read_exact(&mut resp_buf).unwrap(); + assert_eq!(resp_buf, b"hello"); + + // VSOCK_OP_CREDIT_REQUEST + pkt.set_op(VSOCK_OP_CREDIT_REQUEST); + let credit_response = conn_local.send_pkt(&pkt); + assert!(credit_response.is_ok()); + assert_eq!(conn_local.rx_queue.peek().unwrap(), RxOps::CreditUpdate); + + // VSOCK_OP_SHUTDOWN + pkt.set_op(VSOCK_OP_SHUTDOWN); + pkt.set_flags(VSOCK_FLAGS_SHUTDOWN_RCV | VSOCK_FLAGS_SHUTDOWN_SEND); + let shutdown_response = conn_local.send_pkt(&pkt); + assert!(shutdown_response.is_ok()); + assert!(conn_local.rx_queue.contains(RxOps::Reset.bitmask())); + } +} |