aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJeongik Cha <jeongik@google.com>2023-07-26 01:40:53 +0000
committerJeongik Cha <jeongik@google.com>2023-08-03 19:53:01 +0900
commit965a1e7d0710a300704b74c023df63c88597bfba (patch)
tree5c440884231ba96b418ead9bc627f891fd733e3f
parenta3307d6a5a89a764779bab2f0c53e4cec6f48387 (diff)
downloadvhost-device-vsock-main-16k.tar.gz
Import platform/external/rust/crates/vhost-device-vsockmain-16k
Bug: 277909042 Test: n/a Change-Id: Ifd78839d19d1e1fad98cd60a5d8a2491f6553ea0
-rw-r--r--.cargo_vcs_info.json6
-rw-r--r--CHANGELOG.md15
-rw-r--r--Cargo.lock1143
-rw-r--r--Cargo.toml95
-rw-r--r--Cargo.toml.orig36
-rw-r--r--LICENSE228
-rw-r--r--LICENSE-APACHE202
-rw-r--r--LICENSE-BSD-3-Clause26
-rw-r--r--METADATA19
-rw-r--r--MODULE_LICENSE_APACHE20
-rw-r--r--OWNERS2
-rw-r--r--README.md178
-rw-r--r--src/main.rs471
-rw-r--r--src/rxops.rs36
-rw-r--r--src/rxqueue.rs157
-rw-r--r--src/thread_backend.rs506
-rw-r--r--src/txbuf.rs233
-rw-r--r--src/vhu_vsock.rs518
-rw-r--r--src/vhu_vsock_thread.rs837
-rw-r--r--src/vsock_conn.rs781
20 files changed, 5489 insertions, 0 deletions
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json
new file mode 100644
index 0000000..2e81cc1
--- /dev/null
+++ b/.cargo_vcs_info.json
@@ -0,0 +1,6 @@
+{
+ "git": {
+ "sha1": "6305c66f227717e639b7269c6c37efc8a7b1c5f8"
+ },
+ "path_in_vcs": "crates/vsock"
+} \ No newline at end of file
diff --git a/CHANGELOG.md b/CHANGELOG.md
new file mode 100644
index 0000000..51d3f04
--- /dev/null
+++ b/CHANGELOG.md
@@ -0,0 +1,15 @@
+# Changelog
+## [Unreleased]
+
+### Added
+
+### Changed
+
+### Fixed
+
+### Deprecated
+
+## [0.1.0]
+
+First release
+
diff --git a/Cargo.lock b/Cargo.lock
new file mode 100644
index 0000000..bb1fe3e
--- /dev/null
+++ b/Cargo.lock
@@ -0,0 +1,1143 @@
+# This file is automatically @generated by Cargo.
+# It is not intended for manual editing.
+version = 3
+
+[[package]]
+name = "ahash"
+version = "0.7.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "fcb51a0695d8f838b1ee009b3fbf66bda078cd64590202a864a8f3e8c4315c47"
+dependencies = [
+ "getrandom",
+ "once_cell",
+ "version_check",
+]
+
+[[package]]
+name = "aho-corasick"
+version = "1.0.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "43f6cb1bf222025340178f382c426f13757b2960e89779dfcb319c32542a5a41"
+dependencies = [
+ "memchr",
+]
+
+[[package]]
+name = "anstream"
+version = "0.3.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0ca84f3628370c59db74ee214b3263d58f9aadd9b4fe7e711fd87dc452b7f163"
+dependencies = [
+ "anstyle",
+ "anstyle-parse",
+ "anstyle-query",
+ "anstyle-wincon",
+ "colorchoice",
+ "is-terminal",
+ "utf8parse",
+]
+
+[[package]]
+name = "anstyle"
+version = "1.0.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3a30da5c5f2d5e72842e00bcb57657162cdabef0931f40e2deb9b4140440cecd"
+
+[[package]]
+name = "anstyle-parse"
+version = "0.2.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "938874ff5980b03a87c5524b3ae5b59cf99b1d6bc836848df7bc5ada9643c333"
+dependencies = [
+ "utf8parse",
+]
+
+[[package]]
+name = "anstyle-query"
+version = "1.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5ca11d4be1bab0c8bc8734a9aa7bf4ee8316d462a08c6ac5052f888fef5b494b"
+dependencies = [
+ "windows-sys",
+]
+
+[[package]]
+name = "anstyle-wincon"
+version = "1.0.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "180abfa45703aebe0093f79badacc01b8fd4ea2e35118747e5811127f926e188"
+dependencies = [
+ "anstyle",
+ "windows-sys",
+]
+
+[[package]]
+name = "arc-swap"
+version = "1.6.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "bddcadddf5e9015d310179a59bb28c4d4b9920ad0f11e8e14dbadf654890c9a6"
+
+[[package]]
+name = "async-trait"
+version = "0.1.71"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a564d521dd56509c4c47480d00b80ee55f7e385ae48db5744c67ad50c92d2ebf"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
+[[package]]
+name = "autocfg"
+version = "1.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
+
+[[package]]
+name = "base64"
+version = "0.13.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8"
+
+[[package]]
+name = "bitflags"
+version = "1.3.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
+
+[[package]]
+name = "bitflags"
+version = "2.3.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "630be753d4e58660abd17930c71b647fe46c27ea6b63cc59e1e3851406972e42"
+
+[[package]]
+name = "block-buffer"
+version = "0.10.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3078c7629b62d3f0439517fa394996acacc5cbc91c5a20d8c658e77abd503a71"
+dependencies = [
+ "generic-array",
+]
+
+[[package]]
+name = "byteorder"
+version = "1.4.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610"
+
+[[package]]
+name = "cc"
+version = "1.0.79"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "50d30906286121d95be3d479533b458f87493b30a4b5f79a607db8f5d11aa91f"
+
+[[package]]
+name = "cfg-if"
+version = "1.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
+
+[[package]]
+name = "clap"
+version = "4.3.19"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5fd304a20bff958a57f04c4e96a2e7594cc4490a0e809cbd48bb6437edaa452d"
+dependencies = [
+ "clap_builder",
+ "clap_derive",
+ "once_cell",
+]
+
+[[package]]
+name = "clap_builder"
+version = "4.3.19"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "01c6a3f08f1fe5662a35cfe393aec09c4df95f60ee93b7556505260f75eee9e1"
+dependencies = [
+ "anstream",
+ "anstyle",
+ "clap_lex",
+ "strsim",
+]
+
+[[package]]
+name = "clap_derive"
+version = "4.3.12"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "54a9bb5758fc5dfe728d1019941681eccaf0cf8a4189b692a0ee2f2ecf90a050"
+dependencies = [
+ "heck",
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
+[[package]]
+name = "clap_lex"
+version = "0.5.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2da6da31387c7e4ef160ffab6d5e7f00c42626fe39aea70a7b0f1773f7dd6c1b"
+
+[[package]]
+name = "colorchoice"
+version = "1.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7"
+
+[[package]]
+name = "config"
+version = "0.13.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d379af7f68bfc21714c6c7dea883544201741d2ce8274bb12fa54f89507f52a7"
+dependencies = [
+ "async-trait",
+ "json5",
+ "lazy_static",
+ "nom",
+ "pathdiff",
+ "ron",
+ "rust-ini",
+ "serde",
+ "serde_json",
+ "toml",
+ "yaml-rust",
+]
+
+[[package]]
+name = "cpufeatures"
+version = "0.2.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a17b76ff3a4162b0b27f354a0c87015ddad39d35f9c0c36607a3bdd175dde1f1"
+dependencies = [
+ "libc",
+]
+
+[[package]]
+name = "crypto-common"
+version = "0.1.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3"
+dependencies = [
+ "generic-array",
+ "typenum",
+]
+
+[[package]]
+name = "digest"
+version = "0.10.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292"
+dependencies = [
+ "block-buffer",
+ "crypto-common",
+]
+
+[[package]]
+name = "dlv-list"
+version = "0.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0688c2a7f92e427f44895cd63841bff7b29f8d7a1648b9e7e07a4a365b2e1257"
+
+[[package]]
+name = "env_logger"
+version = "0.10.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "85cdab6a89accf66733ad5a1693a4dcced6aeff64602b634530dd73c1f3ee9f0"
+dependencies = [
+ "humantime",
+ "is-terminal",
+ "log",
+ "regex",
+ "termcolor",
+]
+
+[[package]]
+name = "epoll"
+version = "4.3.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "74351c3392ea1ff6cd2628e0042d268ac2371cb613252ff383b6dfa50d22fa79"
+dependencies = [
+ "bitflags 2.3.3",
+ "libc",
+]
+
+[[package]]
+name = "equivalent"
+version = "1.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "88bffebc5d80432c9b140ee17875ff173a8ab62faad5b257da912bd2f6c1c0a1"
+
+[[package]]
+name = "errno"
+version = "0.3.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4bcfec3a70f97c962c307b2d2c56e358cf1d00b558d74262b5f929ee8cc7e73a"
+dependencies = [
+ "errno-dragonfly",
+ "libc",
+ "windows-sys",
+]
+
+[[package]]
+name = "errno-dragonfly"
+version = "0.1.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "aa68f1b12764fab894d2755d2518754e71b4fd80ecfb822714a1206c2aab39bf"
+dependencies = [
+ "cc",
+ "libc",
+]
+
+[[package]]
+name = "fastrand"
+version = "1.9.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e51093e27b0797c359783294ca4f0a911c270184cb10f85783b118614a1501be"
+dependencies = [
+ "instant",
+]
+
+[[package]]
+name = "futures"
+version = "0.3.28"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "23342abe12aba583913b2e62f22225ff9c950774065e4bfb61a19cd9770fec40"
+dependencies = [
+ "futures-channel",
+ "futures-core",
+ "futures-executor",
+ "futures-io",
+ "futures-sink",
+ "futures-task",
+ "futures-util",
+]
+
+[[package]]
+name = "futures-channel"
+version = "0.3.28"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "955518d47e09b25bbebc7a18df10b81f0c766eaf4c4f1cccef2fca5f2a4fb5f2"
+dependencies = [
+ "futures-core",
+ "futures-sink",
+]
+
+[[package]]
+name = "futures-core"
+version = "0.3.28"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4bca583b7e26f571124fe5b7561d49cb2868d79116cfa0eefce955557c6fee8c"
+
+[[package]]
+name = "futures-executor"
+version = "0.3.28"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ccecee823288125bd88b4d7f565c9e58e41858e47ab72e8ea2d64e93624386e0"
+dependencies = [
+ "futures-core",
+ "futures-task",
+ "futures-util",
+ "num_cpus",
+]
+
+[[package]]
+name = "futures-io"
+version = "0.3.28"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4fff74096e71ed47f8e023204cfd0aa1289cd54ae5430a9523be060cdb849964"
+
+[[package]]
+name = "futures-macro"
+version = "0.3.28"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
+[[package]]
+name = "futures-sink"
+version = "0.3.28"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f43be4fe21a13b9781a69afa4985b0f6ee0e1afab2c6f454a8cf30e2b2237b6e"
+
+[[package]]
+name = "futures-task"
+version = "0.3.28"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "76d3d132be6c0e6aa1534069c705a74a5997a356c0dc2f86a47765e5617c5b65"
+
+[[package]]
+name = "futures-util"
+version = "0.3.28"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "26b01e40b772d54cf6c6d721c1d1abd0647a0106a12ecaa1c186273392a69533"
+dependencies = [
+ "futures-channel",
+ "futures-core",
+ "futures-io",
+ "futures-macro",
+ "futures-sink",
+ "futures-task",
+ "memchr",
+ "pin-project-lite",
+ "pin-utils",
+ "slab",
+]
+
+[[package]]
+name = "generic-array"
+version = "0.14.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "85649ca51fd72272d7821adaf274ad91c288277713d9c18820d8499a7ff69e9a"
+dependencies = [
+ "typenum",
+ "version_check",
+]
+
+[[package]]
+name = "getrandom"
+version = "0.2.10"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "be4136b2a15dd319360be1c07d9933517ccf0be8f16bf62a3bee4f0d618df427"
+dependencies = [
+ "cfg-if",
+ "libc",
+ "wasi",
+]
+
+[[package]]
+name = "hashbrown"
+version = "0.12.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888"
+dependencies = [
+ "ahash",
+]
+
+[[package]]
+name = "hashbrown"
+version = "0.14.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2c6201b9ff9fd90a5a3bac2e56a830d0caa509576f0e503818ee82c181b3437a"
+
+[[package]]
+name = "heck"
+version = "0.4.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8"
+
+[[package]]
+name = "hermit-abi"
+version = "0.3.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "443144c8cdadd93ebf52ddb4056d257f5b52c04d3c804e657d19eb73fc33668b"
+
+[[package]]
+name = "humantime"
+version = "2.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4"
+
+[[package]]
+name = "indexmap"
+version = "2.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d5477fe2230a79769d8dc68e0eabf5437907c0457a5614a9e8dddb67f65eb65d"
+dependencies = [
+ "equivalent",
+ "hashbrown 0.14.0",
+]
+
+[[package]]
+name = "instant"
+version = "0.1.12"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7a5bbe824c507c5da5956355e86a746d82e0e1464f65d862cc5e71da70e94b2c"
+dependencies = [
+ "cfg-if",
+]
+
+[[package]]
+name = "io-lifetimes"
+version = "1.0.11"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "eae7b9aee968036d54dce06cebaefd919e4472e753296daccd6d344e3e2df0c2"
+dependencies = [
+ "hermit-abi",
+ "libc",
+ "windows-sys",
+]
+
+[[package]]
+name = "is-terminal"
+version = "0.4.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "cb0889898416213fab133e1d33a0e5858a48177452750691bde3666d0fdbaf8b"
+dependencies = [
+ "hermit-abi",
+ "rustix 0.38.3",
+ "windows-sys",
+]
+
+[[package]]
+name = "itoa"
+version = "1.0.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "62b02a5381cc465bd3041d84623d0fa3b66738b52b8e2fc3bab8ad63ab032f4a"
+
+[[package]]
+name = "json5"
+version = "0.4.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "96b0db21af676c1ce64250b5f40f3ce2cf27e4e47cb91ed91eb6fe9350b430c1"
+dependencies = [
+ "pest",
+ "pest_derive",
+ "serde",
+]
+
+[[package]]
+name = "lazy_static"
+version = "1.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
+
+[[package]]
+name = "libc"
+version = "0.2.147"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b4668fb0ea861c1df094127ac5f1da3409a82116a4ba74fca2e58ef927159bb3"
+
+[[package]]
+name = "linked-hash-map"
+version = "0.5.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0717cef1bc8b636c6e1c1bbdefc09e6322da8a9321966e8928ef80d20f7f770f"
+
+[[package]]
+name = "linux-raw-sys"
+version = "0.3.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ef53942eb7bf7ff43a617b3e2c1c4a5ecf5944a7c1bc12d7ee39bbb15e5c1519"
+
+[[package]]
+name = "linux-raw-sys"
+version = "0.4.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "09fc20d2ca12cb9f044c93e3bd6d32d523e6e2ec3db4f7b2939cd99026ecd3f0"
+
+[[package]]
+name = "log"
+version = "0.4.19"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b06a4cde4c0f271a446782e3eff8de789548ce57dbc8eca9292c27f4a42004b4"
+
+[[package]]
+name = "memchr"
+version = "2.5.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d"
+
+[[package]]
+name = "minimal-lexical"
+version = "0.2.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a"
+
+[[package]]
+name = "nom"
+version = "7.1.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d273983c5a657a70a3e8f2a01329822f3b8c8172b73826411a55751e404a0a4a"
+dependencies = [
+ "memchr",
+ "minimal-lexical",
+]
+
+[[package]]
+name = "num_cpus"
+version = "1.16.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43"
+dependencies = [
+ "hermit-abi",
+ "libc",
+]
+
+[[package]]
+name = "once_cell"
+version = "1.18.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "dd8b5dd2ae5ed71462c540258bedcb51965123ad7e7ccf4b9a8cafaa4a63576d"
+
+[[package]]
+name = "ordered-multimap"
+version = "0.4.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ccd746e37177e1711c20dd619a1620f34f5c8b569c53590a72dedd5344d8924a"
+dependencies = [
+ "dlv-list",
+ "hashbrown 0.12.3",
+]
+
+[[package]]
+name = "pathdiff"
+version = "0.2.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8835116a5c179084a830efb3adc117ab007512b535bc1a21c991d3b32a6b44dd"
+
+[[package]]
+name = "pest"
+version = "2.7.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0d2d1d55045829d65aad9d389139882ad623b33b904e7c9f1b10c5b8927298e5"
+dependencies = [
+ "thiserror",
+ "ucd-trie",
+]
+
+[[package]]
+name = "pest_derive"
+version = "2.7.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "aef623c9bbfa0eedf5a0efba11a5ee83209c326653ca31ff019bec3a95bfff2b"
+dependencies = [
+ "pest",
+ "pest_generator",
+]
+
+[[package]]
+name = "pest_generator"
+version = "2.7.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b3e8cba4ec22bada7fc55ffe51e2deb6a0e0db2d0b7ab0b103acc80d2510c190"
+dependencies = [
+ "pest",
+ "pest_meta",
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
+[[package]]
+name = "pest_meta"
+version = "2.7.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a01f71cb40bd8bb94232df14b946909e14660e33fc05db3e50ae2a82d7ea0ca0"
+dependencies = [
+ "once_cell",
+ "pest",
+ "sha2",
+]
+
+[[package]]
+name = "pin-project-lite"
+version = "0.2.10"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4c40d25201921e5ff0c862a505c6557ea88568a4e3ace775ab55e93f2f4f9d57"
+
+[[package]]
+name = "pin-utils"
+version = "0.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
+
+[[package]]
+name = "proc-macro2"
+version = "1.0.63"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7b368fba921b0dce7e60f5e04ec15e565b3303972b42bcfde1d0713b881959eb"
+dependencies = [
+ "unicode-ident",
+]
+
+[[package]]
+name = "quote"
+version = "1.0.29"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "573015e8ab27661678357f27dc26460738fd2b6c86e46f386fde94cb5d913105"
+dependencies = [
+ "proc-macro2",
+]
+
+[[package]]
+name = "redox_syscall"
+version = "0.3.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "567664f262709473930a4bf9e51bf2ebf3348f2e748ccc50dea20646858f8f29"
+dependencies = [
+ "bitflags 1.3.2",
+]
+
+[[package]]
+name = "regex"
+version = "1.9.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b2eae68fc220f7cf2532e4494aded17545fce192d59cd996e0fe7887f4ceb575"
+dependencies = [
+ "aho-corasick",
+ "memchr",
+ "regex-automata",
+ "regex-syntax",
+]
+
+[[package]]
+name = "regex-automata"
+version = "0.3.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "83d3daa6976cffb758ec878f108ba0e062a45b2d6ca3a2cca965338855476caf"
+dependencies = [
+ "aho-corasick",
+ "memchr",
+ "regex-syntax",
+]
+
+[[package]]
+name = "regex-syntax"
+version = "0.7.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e5ea92a5b6195c6ef2a0295ea818b312502c6fc94dde986c5553242e18fd4ce2"
+
+[[package]]
+name = "ron"
+version = "0.7.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "88073939a61e5b7680558e6be56b419e208420c2adb92be54921fa6b72283f1a"
+dependencies = [
+ "base64",
+ "bitflags 1.3.2",
+ "serde",
+]
+
+[[package]]
+name = "rust-ini"
+version = "0.18.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f6d5f2436026b4f6e79dc829837d467cc7e9a55ee40e750d716713540715a2df"
+dependencies = [
+ "cfg-if",
+ "ordered-multimap",
+]
+
+[[package]]
+name = "rustix"
+version = "0.37.23"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4d69718bf81c6127a49dc64e44a742e8bb9213c0ff8869a22c308f84c1d4ab06"
+dependencies = [
+ "bitflags 1.3.2",
+ "errno",
+ "io-lifetimes",
+ "libc",
+ "linux-raw-sys 0.3.8",
+ "windows-sys",
+]
+
+[[package]]
+name = "rustix"
+version = "0.38.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ac5ffa1efe7548069688cd7028f32591853cd7b5b756d41bcffd2353e4fc75b4"
+dependencies = [
+ "bitflags 2.3.3",
+ "errno",
+ "libc",
+ "linux-raw-sys 0.4.3",
+ "windows-sys",
+]
+
+[[package]]
+name = "ryu"
+version = "1.0.15"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1ad4cc8da4ef723ed60bced201181d83791ad433213d8c24efffda1eec85d741"
+
+[[package]]
+name = "serde"
+version = "1.0.168"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d614f89548720367ded108b3c843be93f3a341e22d5674ca0dd5cd57f34926af"
+dependencies = [
+ "serde_derive",
+]
+
+[[package]]
+name = "serde_derive"
+version = "1.0.168"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d4fe589678c688e44177da4f27152ee2d190757271dc7f1d5b6b9f68d869d641"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
+[[package]]
+name = "serde_json"
+version = "1.0.100"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0f1e14e89be7aa4c4b78bdbdc9eb5bf8517829a600ae8eaa39a6e1d960b5185c"
+dependencies = [
+ "itoa",
+ "ryu",
+ "serde",
+]
+
+[[package]]
+name = "serde_yaml"
+version = "0.9.22"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "452e67b9c20c37fa79df53201dc03839651086ed9bbe92b3ca585ca9fdaa7d85"
+dependencies = [
+ "indexmap",
+ "itoa",
+ "ryu",
+ "serde",
+ "unsafe-libyaml",
+]
+
+[[package]]
+name = "sha2"
+version = "0.10.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "479fb9d862239e610720565ca91403019f2f00410f1864c5aa7479b950a76ed8"
+dependencies = [
+ "cfg-if",
+ "cpufeatures",
+ "digest",
+]
+
+[[package]]
+name = "slab"
+version = "0.4.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6528351c9bc8ab22353f9d776db39a20288e8d6c37ef8cfe3317cf875eecfc2d"
+dependencies = [
+ "autocfg",
+]
+
+[[package]]
+name = "strsim"
+version = "0.10.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623"
+
+[[package]]
+name = "syn"
+version = "2.0.23"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "59fb7d6d8281a51045d62b8eb3a7d1ce347b76f312af50cd3dc0af39c87c1737"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "unicode-ident",
+]
+
+[[package]]
+name = "tempfile"
+version = "3.6.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "31c0432476357e58790aaa47a8efb0c5138f137343f3b5f23bd36a27e3b0a6d6"
+dependencies = [
+ "autocfg",
+ "cfg-if",
+ "fastrand",
+ "redox_syscall",
+ "rustix 0.37.23",
+ "windows-sys",
+]
+
+[[package]]
+name = "termcolor"
+version = "1.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "be55cf8942feac5c765c2c993422806843c9a9a45d4d5c407ad6dd2ea95eb9b6"
+dependencies = [
+ "winapi-util",
+]
+
+[[package]]
+name = "thiserror"
+version = "1.0.41"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c16a64ba9387ef3fdae4f9c1a7f07a0997fce91985c0336f1ddc1822b3b37802"
+dependencies = [
+ "thiserror-impl",
+]
+
+[[package]]
+name = "thiserror-impl"
+version = "1.0.41"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d14928354b01c4d6a4f0e549069adef399a284e7995c7ccca94e8a07a5346c59"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
+[[package]]
+name = "toml"
+version = "0.5.11"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f4f7f0dd8d50a853a531c426359045b1998f04219d88799810762cd4ad314234"
+dependencies = [
+ "serde",
+]
+
+[[package]]
+name = "typenum"
+version = "1.16.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "497961ef93d974e23eb6f433eb5fe1b7930b659f06d12dec6fc44a8f554c0bba"
+
+[[package]]
+name = "ucd-trie"
+version = "0.1.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ed646292ffc8188ef8ea4d1e0e0150fb15a5c2e12ad9b8fc191ae7a8a7f3c4b9"
+
+[[package]]
+name = "unicode-ident"
+version = "1.0.11"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "301abaae475aa91687eb82514b328ab47a211a533026cb25fc3e519b86adfc3c"
+
+[[package]]
+name = "unsafe-libyaml"
+version = "0.2.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1865806a559042e51ab5414598446a5871b561d21b6764f2eabb0dd481d880a6"
+
+[[package]]
+name = "utf8parse"
+version = "0.2.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a"
+
+[[package]]
+name = "version_check"
+version = "0.9.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f"
+
+[[package]]
+name = "vhost"
+version = "0.8.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "73832f4d8d636d63d9b145e8ef22a2c50b93f4d24eb7a99c9e6781b1b08549cf"
+dependencies = [
+ "bitflags 1.3.2",
+ "libc",
+ "vm-memory",
+ "vmm-sys-util",
+]
+
+[[package]]
+name = "vhost-device-vsock"
+version = "0.1.0"
+dependencies = [
+ "byteorder",
+ "clap",
+ "config",
+ "env_logger",
+ "epoll",
+ "futures",
+ "log",
+ "serde",
+ "serde_yaml",
+ "tempfile",
+ "thiserror",
+ "vhost",
+ "vhost-user-backend",
+ "virtio-bindings",
+ "virtio-queue",
+ "virtio-vsock",
+ "vm-memory",
+ "vmm-sys-util",
+]
+
+[[package]]
+name = "vhost-user-backend"
+version = "0.10.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e3ea9d5e8ec847cde4df1c04e586698a479706fd6beca37323f9d425b24b4c2f"
+dependencies = [
+ "libc",
+ "log",
+ "vhost",
+ "virtio-bindings",
+ "virtio-queue",
+ "vm-memory",
+ "vmm-sys-util",
+]
+
+[[package]]
+name = "virtio-bindings"
+version = "0.2.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c18d7b74098a946470ea265b5bacbbf877abc3373021388454de0d47735a5b98"
+
+[[package]]
+name = "virtio-queue"
+version = "0.9.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "35aca00da06841bd99162c381ec65893cace23ca0fb89254302cfe4bec4c300f"
+dependencies = [
+ "log",
+ "virtio-bindings",
+ "vm-memory",
+ "vmm-sys-util",
+]
+
+[[package]]
+name = "virtio-vsock"
+version = "0.3.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c92d1d0c0db339e03dc275e86e5de2654ed94b351f02d405a3a0260dfc1b839f"
+dependencies = [
+ "virtio-bindings",
+ "virtio-queue",
+ "vm-memory",
+]
+
+[[package]]
+name = "vm-memory"
+version = "0.12.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a77c7a0891cbac53618f5f6eec650ed1dc4f7e506bbe14877aff49d94b8408b0"
+dependencies = [
+ "arc-swap",
+ "bitflags 1.3.2",
+ "libc",
+ "thiserror",
+ "vmm-sys-util",
+ "winapi",
+]
+
+[[package]]
+name = "vmm-sys-util"
+version = "0.11.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "dd64fe09d8e880e600c324e7d664760a17f56e9672b7495a86381b49e4f72f46"
+dependencies = [
+ "bitflags 1.3.2",
+ "libc",
+]
+
+[[package]]
+name = "wasi"
+version = "0.11.0+wasi-snapshot-preview1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
+
+[[package]]
+name = "winapi"
+version = "0.3.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419"
+dependencies = [
+ "winapi-i686-pc-windows-gnu",
+ "winapi-x86_64-pc-windows-gnu",
+]
+
+[[package]]
+name = "winapi-i686-pc-windows-gnu"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6"
+
+[[package]]
+name = "winapi-util"
+version = "0.1.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178"
+dependencies = [
+ "winapi",
+]
+
+[[package]]
+name = "winapi-x86_64-pc-windows-gnu"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
+
+[[package]]
+name = "windows-sys"
+version = "0.48.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9"
+dependencies = [
+ "windows-targets",
+]
+
+[[package]]
+name = "windows-targets"
+version = "0.48.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "05d4b17490f70499f20b9e791dcf6a299785ce8af4d709018206dc5b4953e95f"
+dependencies = [
+ "windows_aarch64_gnullvm",
+ "windows_aarch64_msvc",
+ "windows_i686_gnu",
+ "windows_i686_msvc",
+ "windows_x86_64_gnu",
+ "windows_x86_64_gnullvm",
+ "windows_x86_64_msvc",
+]
+
+[[package]]
+name = "windows_aarch64_gnullvm"
+version = "0.48.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "91ae572e1b79dba883e0d315474df7305d12f569b400fcf90581b06062f7e1bc"
+
+[[package]]
+name = "windows_aarch64_msvc"
+version = "0.48.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b2ef27e0d7bdfcfc7b868b317c1d32c641a6fe4629c171b8928c7b08d98d7cf3"
+
+[[package]]
+name = "windows_i686_gnu"
+version = "0.48.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "622a1962a7db830d6fd0a69683c80a18fda201879f0f447f065a3b7467daa241"
+
+[[package]]
+name = "windows_i686_msvc"
+version = "0.48.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4542c6e364ce21bf45d69fdd2a8e455fa38d316158cfd43b3ac1c5b1b19f8e00"
+
+[[package]]
+name = "windows_x86_64_gnu"
+version = "0.48.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ca2b8a661f7628cbd23440e50b05d705db3686f894fc9580820623656af974b1"
+
+[[package]]
+name = "windows_x86_64_gnullvm"
+version = "0.48.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7896dbc1f41e08872e9d5e8f8baa8fdd2677f29468c4e156210174edc7f7b953"
+
+[[package]]
+name = "windows_x86_64_msvc"
+version = "0.48.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1a515f5799fe4961cb532f983ce2b23082366b898e52ffbce459c86f67c8378a"
+
+[[package]]
+name = "yaml-rust"
+version = "0.4.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "56c1936c4cc7a1c9ab21a1ebb602eb942ba868cbd44a99cb7cdc5892335e1c85"
+dependencies = [
+ "linked-hash-map",
+]
diff --git a/Cargo.toml b/Cargo.toml
new file mode 100644
index 0000000..ec30591
--- /dev/null
+++ b/Cargo.toml
@@ -0,0 +1,95 @@
+# THIS FILE IS AUTOMATICALLY GENERATED BY CARGO
+#
+# When uploading crates to the registry Cargo will automatically
+# "normalize" Cargo.toml files for maximal compatibility
+# with all versions of Cargo and also rewrite `path` dependencies
+# to registry (e.g., crates.io) dependencies.
+#
+# If you are reading this file be aware that the original Cargo.toml
+# will likely look very different (and much more reasonable).
+# See Cargo.toml.orig for the original contents.
+
+[package]
+edition = "2018"
+name = "vhost-device-vsock"
+version = "0.1.0"
+authors = [
+ "Harshavardhan Unnibhavi <harshanavkis@gmail.com>",
+ "Stefano Garzarella <sgarzare@redhat.com>",
+]
+description = "A virtio-vsock device using the vhost-user protocol."
+readme = "README.md"
+keywords = [
+ "vhost",
+ "vsock",
+]
+license = "Apache-2.0 OR BSD-3-Clause"
+repository = "https://github.com/rust-vmm/vhost-device"
+
+[dependencies.byteorder]
+version = "1"
+
+[dependencies.clap]
+version = "4.3"
+features = ["derive"]
+
+[dependencies.config]
+version = "0.13"
+
+[dependencies.env_logger]
+version = "0.10"
+
+[dependencies.epoll]
+version = "4.3.2"
+
+[dependencies.futures]
+version = "0.3"
+features = ["thread-pool"]
+
+[dependencies.log]
+version = "0.4"
+
+[dependencies.serde]
+version = "1"
+
+[dependencies.serde_yaml]
+version = "0.9"
+
+[dependencies.thiserror]
+version = "1.0"
+
+[dependencies.vhost]
+version = "0.8"
+features = ["vhost-user-slave"]
+
+[dependencies.vhost-user-backend]
+version = "0.10"
+
+[dependencies.virtio-bindings]
+version = "0.2.1"
+
+[dependencies.virtio-queue]
+version = "0.9"
+
+[dependencies.virtio-vsock]
+version = "0.3.1"
+
+[dependencies.vm-memory]
+version = "0.12"
+
+[dependencies.vmm-sys-util]
+version = "0.11"
+
+[dev-dependencies.tempfile]
+version = "3.6.0"
+
+[dev-dependencies.virtio-queue]
+version = "0.9"
+features = ["test-utils"]
+
+[features]
+xen = [
+ "vm-memory/xen",
+ "vhost/xen",
+ "vhost-user-backend/xen",
+]
diff --git a/Cargo.toml.orig b/Cargo.toml.orig
new file mode 100644
index 0000000..2b37c56
--- /dev/null
+++ b/Cargo.toml.orig
@@ -0,0 +1,36 @@
+[package]
+name = "vhost-device-vsock"
+version = "0.1.0"
+authors = ["Harshavardhan Unnibhavi <harshanavkis@gmail.com>", "Stefano Garzarella <sgarzare@redhat.com>"]
+description = "A virtio-vsock device using the vhost-user protocol."
+repository = "https://github.com/rust-vmm/vhost-device"
+readme = "README.md"
+keywords = ["vhost", "vsock"]
+license = "Apache-2.0 OR BSD-3-Clause"
+edition = "2018"
+
+[features]
+xen = ["vm-memory/xen", "vhost/xen", "vhost-user-backend/xen"]
+
+[dependencies]
+byteorder = "1"
+clap = { version = "4.3", features = ["derive"] }
+env_logger = "0.10"
+epoll = "4.3.2"
+futures = { version = "0.3", features = ["thread-pool"] }
+log = "0.4"
+thiserror = "1.0"
+vhost = { version = "0.8", features = ["vhost-user-slave"] }
+vhost-user-backend = "0.10"
+virtio-bindings = "0.2.1"
+virtio-queue = "0.9"
+virtio-vsock = "0.3.1"
+vm-memory = "0.12"
+vmm-sys-util = "0.11"
+config = "0.13"
+serde = "1"
+serde_yaml = "0.9"
+
+[dev-dependencies]
+virtio-queue = { version = "0.9", features = ["test-utils"] }
+tempfile = "3.6.0"
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..59dc6b5
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,228 @@
+
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright [yyyy] [name of copyright owner]
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+Copyright 2022 The rust-vmm authors.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are met:
+
+1. Redistributions of source code must retain the above copyright notice, this
+list of conditions and the following disclaimer.
+
+2. Redistributions in binary form must reproduce the above copyright notice,
+this list of conditions and the following disclaimer in the documentation
+and/or other materials provided with the distribution.
+
+3. Neither the name of the copyright holder nor the names of its contributors
+may be used to endorse or promote products derived from this software without
+specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
+FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
diff --git a/LICENSE-APACHE b/LICENSE-APACHE
new file mode 100644
index 0000000..d645695
--- /dev/null
+++ b/LICENSE-APACHE
@@ -0,0 +1,202 @@
+
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright [yyyy] [name of copyright owner]
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
diff --git a/LICENSE-BSD-3-Clause b/LICENSE-BSD-3-Clause
new file mode 100644
index 0000000..dd975d9
--- /dev/null
+++ b/LICENSE-BSD-3-Clause
@@ -0,0 +1,26 @@
+Copyright 2022 The rust-vmm authors.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are met:
+
+1. Redistributions of source code must retain the above copyright notice, this
+list of conditions and the following disclaimer.
+
+2. Redistributions in binary form must reproduce the above copyright notice,
+this list of conditions and the following disclaimer in the documentation
+and/or other materials provided with the distribution.
+
+3. Neither the name of the copyright holder nor the names of its contributors
+may be used to endorse or promote products derived from this software without
+specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
+FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
diff --git a/METADATA b/METADATA
new file mode 100644
index 0000000..456b5a6
--- /dev/null
+++ b/METADATA
@@ -0,0 +1,19 @@
+name: "vhost-device-vsock"
+description: "A virtio-vsock device using the vhost-user protocol."
+third_party {
+ url {
+ type: HOMEPAGE
+ value: "https://crates.io/crates/vhost-device-vsock"
+ }
+ url {
+ type: ARCHIVE
+ value: "https://static.crates.io/crates/vhost-device-vsock/vhost-device-vsock-0.1.0.crate"
+ }
+ version: "0.1.0"
+ license_type: NOTICE
+ last_upgrade_date {
+ year: 2023
+ month: 7
+ day: 26
+ }
+}
diff --git a/MODULE_LICENSE_APACHE2 b/MODULE_LICENSE_APACHE2
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/MODULE_LICENSE_APACHE2
diff --git a/OWNERS b/OWNERS
new file mode 100644
index 0000000..0b793ad
--- /dev/null
+++ b/OWNERS
@@ -0,0 +1,2 @@
+include platform/prebuilts/rust:master:/OWNERS
+jeongik@google.com
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..f4e946d
--- /dev/null
+++ b/README.md
@@ -0,0 +1,178 @@
+# vhost-device-vsock
+
+## Design
+
+The crate introduces a vhost-device-vsock device that enables communication between an
+application running in the guest i.e inside a VM and an application running on the
+host i.e outside the VM. The application running in the guest communicates over VM
+sockets i.e over AF_VSOCK sockets. The application running on the host connects to a
+unix socket on the host i.e communicates over AF_UNIX sockets. The main components of
+the crate are split into various files as described below:
+
+- [packet.rs](src/packet.rs)
+ - Introduces the **VsockPacket** structure that represents a single vsock packet
+ processing methods.
+- [rxops.rs](src/rxops.rs)
+ - Introduces various vsock operations that are enqueued into the rxqueue to be sent to the
+ guest. Exposes a **RxOps** structure.
+- [rxqueue.rs](src/rxqueue.rs)
+ - rxqueue contains the pending rx operations corresponding to that connection. The queue is
+ represented as a bitmap as we handle connection-oriented connections. The module contains
+ various queue manipulation methods. Exposes a **RxQueue** structure.
+- [thread_backend.rs](src/thread_backend.rs)
+ - Multiplexes connections between host and guest and calls into per connection methods that
+ are responsible for processing data and packets corresponding to the connection. Exposes a
+ **VsockThreadBackend** structure.
+- [txbuf.rs](src/txbuf.rs)
+ - Module to buffer data that is sent from the guest to the host. The module exposes a **LocalTxBuf**
+ structure.
+- [vhost_user_vsock_thread.rs](src/vhost_user_vsock_thread.rs)
+ - Module exposes a **VhostUserVsockThread** structure. It also handles new host initiated
+ connections and provides interfaces for registering host connections with the epoll fd. Also
+ provides interfaces for iterating through the rx and tx queues.
+- [vsock_conn.rs](src/vsock_conn.rs)
+ - Module introduces a **VsockConnection** structure that represents a single vsock connection
+ between the guest and the host. It also processes packets according to their type.
+- [vhu_vsock.rs](src/vhu_vsock.rs)
+ - exposes the main vhost user vsock backend interface.
+
+## Usage
+
+Run the vhost-device-vsock device:
+```
+vhost-device-vsock --guest-cid=<CID assigned to the guest> \
+ --socket=<path to the Unix socket to be created to communicate with the VMM via the vhost-user protocol> \
+ --uds-path=<path to the Unix socket to communicate with the guest via the virtio-vsock device> \
+ [--tx-buffer-size=<size of the buffer used for the TX virtqueue (guest->host packets)>]
+```
+or
+```
+vhost-device-vsock --vm guest_cid=<CID assigned to the guest>,socket=<path to the Unix socket to be created to communicate with the VMM via the vhost-user protocol>,uds-path=<path to the Unix socket to communicate with the guest via the virtio-vsock device>[,tx-buffer-size=<size of the buffer used for the TX virtqueue (guest->host packets)>]
+```
+
+Specify the `--vm` argument multiple times to specify multiple devices like this:
+```
+vhost-device-vsock \
+--vm guest-cid=3,socket=/tmp/vhost3.socket,uds-path=/tmp/vm3.vsock \
+--vm guest-cid=4,socket=/tmp/vhost4.socket,uds-path=/tmp/vm4.vsock,tx-buffer-size=32768
+```
+
+Or use a configuration file:
+```
+vhost-device-vsock --config=<path to the local yaml configuration file>
+```
+
+Configuration file example:
+```yaml
+vms:
+ - guest_cid: 3
+ socket: /tmp/vhost3.socket
+ uds_path: /tmp/vm3.sock
+ tx_buffer_size: 65536
+ - guest_cid: 4
+ socket: /tmp/vhost4.socket
+ uds_path: /tmp/vm4.sock
+ tx_buffer_size: 32768
+```
+
+Run VMM (e.g. QEMU):
+
+```
+qemu-system-x86_64 \
+ <normal QEMU options> \
+ -object memory-backend-file,share=on,id=mem0,size=<Guest RAM size>,mem-path=<Guest RAM file path> \ # size == -m size
+ -machine <machine options>,memory-backend=mem0 \
+ -chardev socket,id=char0,reconnect=0,path=<vhost-user socket path> \
+ -device vhost-user-vsock-pci,chardev=char0
+```
+
+## Working example
+
+```sh
+shell1$ vhost-device-vsock --vm guest-cid=4,uds-path=/tmp/vm4.vsock,socket=/tmp/vhost4.socket
+```
+or if you want to configure the TX buffer size
+```sh
+shell1$ vhost-device-vsock --vm guest-cid=4,uds-path=/tmp/vm4.vsock,socket=/tmp/vhost4.socket,tx-buffer-size=65536
+```
+
+```sh
+shell2$ qemu-system-x86_64 \
+ -drive file=vm.qcow2,format=qcow2,if=virtio -smp 2 -m 512M -mem-prealloc \
+ -object memory-backend-file,share=on,id=mem0,size=512M,mem-path="/dev/hugepages" \
+ -machine q35,accel=kvm,memory-backend=mem0 \
+ -chardev socket,id=char0,reconnect=0,path=/tmp/vhost4.socket \
+ -device vhost-user-vsock-pci,chardev=char0
+```
+
+### Guest listening
+
+#### iperf
+
+```sh
+# https://github.com/stefano-garzarella/iperf-vsock
+guest$ iperf3 --vsock -s
+host$ iperf3 --vsock -c /tmp/vm4.vsock
+```
+
+#### netcat
+
+```sh
+guest$ nc --vsock -l 1234
+
+host$ nc -U /tmp/vm4.vsock
+CONNECT 1234
+```
+
+### Host listening
+
+#### iperf
+
+```sh
+# https://github.com/stefano-garzarella/iperf-vsock
+host$ iperf3 --vsock -s -B /tmp/vm4.vsock
+guest$ iperf3 --vsock -c 2
+```
+
+#### netcat
+
+```sh
+host$ nc -l -U /tmp/vm4.vsock_1234
+
+guest$ nc --vsock 2 1234
+```
+
+### Sibling VM communication
+
+If you add multiple VMs, they can communicate with each other. For example, if you have two VMs with
+CID 3 and 4, you can run the following commands to make them communicate:
+
+```sh
+shell1$ vhost-device-vsock --vm guest-cid=3,uds-path=/tmp/vm3.vsock,socket=/tmp/vhost3.socket \
+ --vm guest-cid=4,uds-path=/tmp/vm4.vsock,socket=/tmp/vhost4.socket
+shell2$ qemu-system-x86_64 \
+ -drive file=vm1.qcow2,format=qcow2,if=virtio -smp 2 -m 512M -mem-prealloc \
+ -object memory-backend-file,share=on,id=mem0,size=512M,mem-path="/dev/hugepages" \
+ -machine q35,accel=kvm,memory-backend=mem0 \
+ -chardev socket,id=char0,reconnect=0,path=/tmp/vhost3.socket \
+ -device vhost-user-vsock-pci,chardev=char0
+shell3$ qemu-system-x86_64 \
+ -drive file=vm2.qcow2,format=qcow2,if=virtio -smp 2 -m 512M -mem-prealloc \
+ -object memory-backend-file,share=on,id=mem0,size=512M,mem-path="/dev/hugepages2" \
+ -machine q35,accel=kvm,memory-backend=mem0 \
+ -chardev socket,id=char0,reconnect=0,path=/tmp/vhost4.socket \
+ -device vhost-user-vsock-pci,chardev=char0
+```
+
+```sh
+# nc-vsock patched to set `.svm_flags = VMADDR_FLAG_TO_HOST`
+guest_cid3$ nc-vsock -l 1234
+guest_cid4$ nc-vsock 3 1234
+```
+
+## License
+
+This project is licensed under either of
+
+- [Apache License](http://www.apache.org/licenses/LICENSE-2.0), Version 2.0
+- [BSD-3-Clause License](https://opensource.org/licenses/BSD-3-Clause)
diff --git a/src/main.rs b/src/main.rs
new file mode 100644
index 0000000..51f3759
--- /dev/null
+++ b/src/main.rs
@@ -0,0 +1,471 @@
+// SPDX-License-Identifier: Apache-2.0 or BSD-3-Clause
+
+mod rxops;
+mod rxqueue;
+mod thread_backend;
+mod txbuf;
+mod vhu_vsock;
+mod vhu_vsock_thread;
+mod vsock_conn;
+
+use std::{
+ collections::HashMap,
+ convert::TryFrom,
+ process::exit,
+ sync::{Arc, RwLock},
+ thread,
+};
+
+use crate::vhu_vsock::{CidMap, VhostUserVsockBackend, VsockConfig};
+use clap::{Args, Parser};
+use log::{error, info, warn};
+use serde::Deserialize;
+use thiserror::Error as ThisError;
+use vhost::{vhost_user, vhost_user::Listener};
+use vhost_user_backend::VhostUserDaemon;
+use vm_memory::{GuestMemoryAtomic, GuestMemoryMmap};
+
+const DEFAULT_GUEST_CID: u64 = 3;
+const DEFAULT_TX_BUFFER_SIZE: u32 = 64 * 1024;
+
+#[derive(Debug, ThisError)]
+enum CliError {
+ #[error("No arguments provided")]
+ NoArgsProvided,
+ #[error("Failed to parse configuration file")]
+ ConfigParse,
+}
+
+#[derive(Debug, ThisError)]
+enum VmArgsParseError {
+ #[error("Bad argument")]
+ BadArgument,
+ #[error("Invalid key `{0}`")]
+ InvalidKey(String),
+ #[error("Unable to convert string to integer: {0}")]
+ ParseInteger(std::num::ParseIntError),
+ #[error("Required key `{0}` not found")]
+ RequiredKeyNotFound(String),
+}
+
+#[derive(Debug, ThisError)]
+enum BackendError {
+ #[error("Could not create backend: {0}")]
+ CouldNotCreateBackend(vhu_vsock::Error),
+ #[error("Could not create daemon: {0}")]
+ CouldNotCreateDaemon(vhost_user_backend::Error),
+}
+
+#[derive(Args, Clone, Debug, Deserialize)]
+struct VsockParam {
+ /// Context identifier of the guest which uniquely identifies the device for its lifetime.
+ #[arg(
+ long,
+ default_value_t = DEFAULT_GUEST_CID,
+ conflicts_with = "config",
+ conflicts_with = "vm"
+ )]
+ guest_cid: u64,
+
+ /// Unix socket to which a hypervisor connects to and sets up the control path with the device.
+ #[arg(long, conflicts_with = "config", conflicts_with = "vm")]
+ socket: String,
+
+ /// Unix socket to which a host-side application connects to.
+ #[arg(long, conflicts_with = "config", conflicts_with = "vm")]
+ uds_path: String,
+
+ /// The size of the buffer used for the TX virtqueue
+ #[clap(long, default_value_t = DEFAULT_TX_BUFFER_SIZE, conflicts_with = "config", conflicts_with = "vm")]
+ tx_buffer_size: u32,
+}
+
+#[derive(Parser, Debug)]
+#[command(version, about, long_about = None)]
+struct VsockArgs {
+ #[command(flatten)]
+ param: Option<VsockParam>,
+
+ /// Device parameters corresponding to a VM in the form of comma separated key=value pairs.
+ /// The allowed keys are: guest_cid, socket, uds_path and tx_buffer_size
+ /// Example: --vm guest-cid=3,socket=/tmp/vhost3.socket,uds-path=/tmp/vm3.vsock,tx-buffer-size=65536
+ /// Multiple instances of this argument can be provided to configure devices for multiple guests.
+ #[arg(long, conflicts_with = "config", verbatim_doc_comment, value_parser = parse_vm_params)]
+ vm: Option<Vec<VsockConfig>>,
+
+ /// Load from a given configuration file
+ #[arg(long)]
+ config: Option<String>,
+}
+
+fn parse_vm_params(s: &str) -> Result<VsockConfig, VmArgsParseError> {
+ let mut guest_cid = None;
+ let mut socket = None;
+ let mut uds_path = None;
+ let mut tx_buffer_size = None;
+
+ for arg in s.trim().split(',') {
+ let mut parts = arg.split('=');
+ let key = parts.next().ok_or(VmArgsParseError::BadArgument)?;
+ let val = parts.next().ok_or(VmArgsParseError::BadArgument)?;
+
+ match key {
+ "guest_cid" | "guest-cid" => {
+ guest_cid = Some(val.parse().map_err(VmArgsParseError::ParseInteger)?)
+ }
+ "socket" => socket = Some(val.to_string()),
+ "uds_path" | "uds-path" => uds_path = Some(val.to_string()),
+ "tx_buffer_size" | "tx-buffer-size" => {
+ tx_buffer_size = Some(val.parse().map_err(VmArgsParseError::ParseInteger)?)
+ }
+ _ => return Err(VmArgsParseError::InvalidKey(key.to_string())),
+ }
+ }
+
+ Ok(VsockConfig::new(
+ guest_cid.unwrap_or(DEFAULT_GUEST_CID),
+ socket.ok_or_else(|| VmArgsParseError::RequiredKeyNotFound("socket".to_string()))?,
+ uds_path.ok_or_else(|| VmArgsParseError::RequiredKeyNotFound("uds-path".to_string()))?,
+ tx_buffer_size.unwrap_or(DEFAULT_TX_BUFFER_SIZE),
+ ))
+}
+
+impl VsockArgs {
+ pub fn parse_config(&self) -> Option<Result<Vec<VsockConfig>, CliError>> {
+ if let Some(c) = &self.config {
+ let b = config::Config::builder()
+ .add_source(config::File::new(c.as_str(), config::FileFormat::Yaml))
+ .build();
+ if let Ok(s) = b {
+ let mut v = s.get::<Vec<VsockParam>>("vms").unwrap();
+ if !v.is_empty() {
+ let parsed: Vec<VsockConfig> = v
+ .drain(..)
+ .map(|p| {
+ VsockConfig::new(
+ p.guest_cid,
+ p.socket.trim().to_string(),
+ p.uds_path.trim().to_string(),
+ p.tx_buffer_size,
+ )
+ })
+ .collect();
+ return Some(Ok(parsed));
+ } else {
+ return Some(Err(CliError::ConfigParse));
+ }
+ } else {
+ return Some(Err(CliError::ConfigParse));
+ }
+ }
+ None
+ }
+}
+
+impl TryFrom<VsockArgs> for Vec<VsockConfig> {
+ type Error = CliError;
+
+ fn try_from(cmd_args: VsockArgs) -> Result<Self, CliError> {
+ // we try to use the configuration first, if failed, then fall back to the manual settings.
+ match cmd_args.parse_config() {
+ Some(c) => c,
+ _ => match cmd_args.vm {
+ Some(v) => Ok(v),
+ _ => cmd_args.param.map_or(Err(CliError::NoArgsProvided), |p| {
+ Ok(vec![VsockConfig::new(
+ p.guest_cid,
+ p.socket.trim().to_string(),
+ p.uds_path.trim().to_string(),
+ p.tx_buffer_size,
+ )])
+ }),
+ },
+ }
+ }
+}
+
+/// This is the public API through which an external program starts the
+/// vhost-device-vsock backend server.
+pub(crate) fn start_backend_server(
+ config: VsockConfig,
+ cid_map: Arc<RwLock<CidMap>>,
+) -> Result<(), BackendError> {
+ loop {
+ let backend = Arc::new(
+ VhostUserVsockBackend::new(config.clone(), cid_map.clone())
+ .map_err(BackendError::CouldNotCreateBackend)?,
+ );
+
+ let listener = Listener::new(config.get_socket_path(), true).unwrap();
+
+ let mut daemon = VhostUserDaemon::new(
+ String::from("vhost-device-vsock"),
+ backend.clone(),
+ GuestMemoryAtomic::new(GuestMemoryMmap::new()),
+ )
+ .map_err(BackendError::CouldNotCreateDaemon)?;
+
+ let mut vring_workers = daemon.get_epoll_handlers();
+
+ for thread in backend.threads.iter() {
+ thread
+ .lock()
+ .unwrap()
+ .set_vring_worker(Some(vring_workers.remove(0)));
+ }
+
+ daemon.start(listener).unwrap();
+
+ match daemon.wait() {
+ Ok(()) => {
+ info!("Stopping cleanly");
+ }
+ Err(vhost_user_backend::Error::HandleRequest(
+ vhost_user::Error::PartialMessage | vhost_user::Error::Disconnected,
+ )) => {
+ info!("vhost-user connection closed with partial message. If the VM is shutting down, this is expected behavior; otherwise, it might be a bug.");
+ }
+ Err(e) => {
+ warn!("Error running daemon: {:?}", e);
+ }
+ }
+
+ // No matter the result, we need to shut down the worker thread.
+ backend.exit_event.write(1).unwrap();
+ }
+}
+
+pub(crate) fn start_backend_servers(configs: &[VsockConfig]) -> Result<(), BackendError> {
+ let cid_map: Arc<RwLock<CidMap>> = Arc::new(RwLock::new(HashMap::new()));
+ let mut handles = Vec::new();
+
+ for c in configs.iter() {
+ let config = c.clone();
+ let cid_map = cid_map.clone();
+ let handle = thread::Builder::new()
+ .name(format!("vhu-vsock-cid-{}", c.get_guest_cid()))
+ .spawn(move || start_backend_server(config, cid_map))
+ .unwrap();
+ handles.push(handle);
+ }
+
+ for handle in handles {
+ handle.join().unwrap()?;
+ }
+
+ Ok(())
+}
+
+fn main() {
+ env_logger::init();
+
+ let configs = match Vec::<VsockConfig>::try_from(VsockArgs::parse()) {
+ Ok(c) => c,
+ Err(e) => {
+ println!("Error parsing arguments: {}", e);
+ return;
+ }
+ };
+
+ if let Err(e) = start_backend_servers(&configs) {
+ error!("{e}");
+ exit(1);
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use std::fs::File;
+ use std::io::Write;
+ use tempfile::tempdir;
+
+ impl VsockArgs {
+ fn from_args(guest_cid: u64, socket: &str, uds_path: &str, tx_buffer_size: u32) -> Self {
+ VsockArgs {
+ param: Some(VsockParam {
+ guest_cid,
+ socket: socket.to_string(),
+ uds_path: uds_path.to_string(),
+ tx_buffer_size,
+ }),
+ vm: None,
+ config: None,
+ }
+ }
+ fn from_file(config: &str) -> Self {
+ VsockArgs {
+ param: None,
+ vm: None,
+ config: Some(config.to_string()),
+ }
+ }
+ }
+
+ #[test]
+ fn test_vsock_config_setup() {
+ let test_dir = tempdir().expect("Could not create a temp test directory.");
+
+ let socket_path = test_dir.path().join("vhost4.socket").display().to_string();
+ let uds_path = test_dir.path().join("vm4.vsock").display().to_string();
+ let args = VsockArgs::from_args(3, &socket_path, &uds_path, 64 * 1024);
+
+ let configs = Vec::<VsockConfig>::try_from(args);
+ assert!(configs.is_ok());
+
+ let configs = configs.unwrap();
+ assert_eq!(configs.len(), 1);
+
+ let config = &configs[0];
+ assert_eq!(config.get_guest_cid(), 3);
+ assert_eq!(config.get_socket_path(), socket_path);
+ assert_eq!(config.get_uds_path(), uds_path);
+ assert_eq!(config.get_tx_buffer_size(), 64 * 1024);
+
+ test_dir.close().unwrap();
+ }
+
+ #[test]
+ fn test_vsock_config_setup_from_vm_args() {
+ let test_dir = tempdir().expect("Could not create a temp test directory.");
+
+ let socket_paths = [
+ test_dir.path().join("vhost3.socket"),
+ test_dir.path().join("vhost4.socket"),
+ test_dir.path().join("vhost5.socket"),
+ ];
+ let uds_paths = [
+ test_dir.path().join("vm3.vsock"),
+ test_dir.path().join("vm4.vsock"),
+ test_dir.path().join("vm5.vsock"),
+ ];
+ let params = format!(
+ "--vm socket={vhost3_socket},uds_path={vm3_vsock} \
+ --vm socket={vhost4_socket},uds-path={vm4_vsock},guest-cid=4,tx_buffer_size=65536 \
+ --vm guest-cid=5,socket={vhost5_socket},uds_path={vm5_vsock},tx-buffer-size=32768",
+ vhost3_socket = socket_paths[0].display(),
+ vhost4_socket = socket_paths[1].display(),
+ vhost5_socket = socket_paths[2].display(),
+ vm3_vsock = uds_paths[0].display(),
+ vm4_vsock = uds_paths[1].display(),
+ vm5_vsock = uds_paths[2].display(),
+ );
+
+ let mut params = params.split_whitespace().collect::<Vec<&str>>();
+ params.insert(0, ""); // to make the test binary name agnostic
+
+ let args = VsockArgs::parse_from(params);
+
+ let configs = Vec::<VsockConfig>::try_from(args);
+ assert!(configs.is_ok());
+
+ let configs = configs.unwrap();
+ assert_eq!(configs.len(), 3);
+
+ let config = configs.get(0).unwrap();
+ assert_eq!(config.get_guest_cid(), 3);
+ assert_eq!(
+ config.get_socket_path(),
+ socket_paths[0].display().to_string()
+ );
+ assert_eq!(config.get_uds_path(), uds_paths[0].display().to_string());
+ assert_eq!(config.get_tx_buffer_size(), 65536);
+
+ let config = configs.get(1).unwrap();
+ assert_eq!(config.get_guest_cid(), 4);
+ assert_eq!(
+ config.get_socket_path(),
+ socket_paths[1].display().to_string()
+ );
+ assert_eq!(config.get_uds_path(), uds_paths[1].display().to_string());
+ assert_eq!(config.get_tx_buffer_size(), 65536);
+
+ let config = configs.get(2).unwrap();
+ assert_eq!(config.get_guest_cid(), 5);
+ assert_eq!(
+ config.get_socket_path(),
+ socket_paths[2].display().to_string()
+ );
+ assert_eq!(config.get_uds_path(), uds_paths[2].display().to_string());
+ assert_eq!(config.get_tx_buffer_size(), 32768);
+
+ test_dir.close().unwrap();
+ }
+
+ #[test]
+ fn test_vsock_config_setup_from_file() {
+ let test_dir = tempdir().expect("Could not create a temp test directory.");
+
+ let config_path = test_dir.path().join("config.yaml");
+ let socket_path = test_dir.path().join("vhost4.socket");
+ let uds_path = test_dir.path().join("vm4.vsock");
+
+ let mut yaml = File::create(&config_path).unwrap();
+ yaml.write_all(
+ format!(
+ "vms:
+ - guest_cid: 4
+ socket: {}
+ uds_path: {}
+ tx_buffer_size: 65536",
+ socket_path.display(),
+ uds_path.display(),
+ )
+ .as_bytes(),
+ )
+ .unwrap();
+ let args = VsockArgs::from_file(&config_path.display().to_string());
+
+ let configs = Vec::<VsockConfig>::try_from(args).unwrap();
+ assert_eq!(configs.len(), 1);
+
+ let config = &configs[0];
+ assert_eq!(config.get_guest_cid(), 4);
+ assert_eq!(config.get_socket_path(), socket_path.display().to_string());
+ assert_eq!(config.get_uds_path(), uds_path.display().to_string());
+ std::fs::remove_file(&config_path).unwrap();
+
+ test_dir.close().unwrap();
+ }
+
+ #[test]
+ fn test_vsock_server() {
+ const CID: u64 = 3;
+ const CONN_TX_BUF_SIZE: u32 = 64 * 1024;
+
+ let test_dir = tempdir().expect("Could not create a temp test directory.");
+
+ let vhost_socket_path = test_dir
+ .path()
+ .join("test_vsock_server.socket")
+ .display()
+ .to_string();
+ let vsock_socket_path = test_dir
+ .path()
+ .join("test_vsock_server.vsock")
+ .display()
+ .to_string();
+
+ let config = VsockConfig::new(CID, vhost_socket_path, vsock_socket_path, CONN_TX_BUF_SIZE);
+
+ let cid_map: Arc<RwLock<CidMap>> = Arc::new(RwLock::new(HashMap::new()));
+
+ let backend = Arc::new(VhostUserVsockBackend::new(config, cid_map).unwrap());
+
+ let daemon = VhostUserDaemon::new(
+ String::from("vhost-device-vsock"),
+ backend.clone(),
+ GuestMemoryAtomic::new(GuestMemoryMmap::new()),
+ )
+ .unwrap();
+
+ let vring_workers = daemon.get_epoll_handlers();
+
+ // VhostUserVsockBackend support a single thread that handles the TX and RX queues
+ assert_eq!(backend.threads.len(), 1);
+
+ assert_eq!(vring_workers.len(), backend.threads.len());
+
+ test_dir.close().unwrap();
+ }
+}
diff --git a/src/rxops.rs b/src/rxops.rs
new file mode 100644
index 0000000..6f20466
--- /dev/null
+++ b/src/rxops.rs
@@ -0,0 +1,36 @@
+// SPDX-License-Identifier: Apache-2.0 or BSD-3-Clause
+
+#[derive(Clone, Copy, Eq, PartialEq, Debug)]
+pub(crate) enum RxOps {
+ /// VSOCK_OP_REQUEST
+ Request = 0,
+ /// VSOCK_OP_RW
+ Rw = 1,
+ /// VSOCK_OP_RESPONSE
+ Response = 2,
+ /// VSOCK_OP_CREDIT_UPDATE
+ CreditUpdate = 3,
+ /// VSOCK_OP_RST
+ Reset = 4,
+}
+
+impl RxOps {
+ /// Convert enum value into bitmask.
+ pub fn bitmask(self) -> u8 {
+ 1u8 << (self as u8)
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_bitmask() {
+ assert_eq!(1, RxOps::Request.bitmask());
+ assert_eq!(2, RxOps::Rw.bitmask());
+ assert_eq!(4, RxOps::Response.bitmask());
+ assert_eq!(8, RxOps::CreditUpdate.bitmask());
+ assert_eq!(16, RxOps::Reset.bitmask());
+ }
+}
diff --git a/src/rxqueue.rs b/src/rxqueue.rs
new file mode 100644
index 0000000..4b76727
--- /dev/null
+++ b/src/rxqueue.rs
@@ -0,0 +1,157 @@
+// SPDX-License-Identifier: Apache-2.0 or BSD-3-Clause
+
+use crate::rxops::RxOps;
+
+#[derive(Debug, Eq, PartialEq)]
+pub(crate) struct RxQueue {
+ /// Bitmap of rx operations.
+ queue: u8,
+}
+
+impl RxQueue {
+ /// New instance of RxQueue.
+ pub fn new() -> Self {
+ RxQueue { queue: 0_u8 }
+ }
+
+ /// Enqueue a new rx operation into the queue.
+ pub fn enqueue(&mut self, op: RxOps) {
+ self.queue |= op.bitmask();
+ }
+
+ /// Dequeue an rx operation from the queue.
+ pub fn dequeue(&mut self) -> Option<RxOps> {
+ match self.peek() {
+ Some(req) => {
+ self.queue &= !req.bitmask();
+ Some(req)
+ }
+ None => None,
+ }
+ }
+
+ /// Peek into the queue to check if it contains an rx operation.
+ pub fn peek(&self) -> Option<RxOps> {
+ if self.contains(RxOps::Request.bitmask()) {
+ return Some(RxOps::Request);
+ }
+ if self.contains(RxOps::Rw.bitmask()) {
+ return Some(RxOps::Rw);
+ }
+ if self.contains(RxOps::Response.bitmask()) {
+ return Some(RxOps::Response);
+ }
+ if self.contains(RxOps::CreditUpdate.bitmask()) {
+ return Some(RxOps::CreditUpdate);
+ }
+ if self.contains(RxOps::Reset.bitmask()) {
+ Some(RxOps::Reset)
+ } else {
+ None
+ }
+ }
+
+ /// Check if the queue contains a particular rx operation.
+ pub fn contains(&self, op: u8) -> bool {
+ (self.queue & op) != 0
+ }
+
+ /// Check if there are any pending rx operations in the queue.
+ pub fn pending_rx(&self) -> bool {
+ self.queue != 0
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_contains() {
+ let mut rxqueue = RxQueue::new();
+ rxqueue.queue = 31;
+
+ assert!(rxqueue.contains(RxOps::Request.bitmask()));
+ assert!(rxqueue.contains(RxOps::Rw.bitmask()));
+ assert!(rxqueue.contains(RxOps::Response.bitmask()));
+ assert!(rxqueue.contains(RxOps::CreditUpdate.bitmask()));
+ assert!(rxqueue.contains(RxOps::Reset.bitmask()));
+
+ rxqueue.queue = 0;
+ assert!(!rxqueue.contains(RxOps::Request.bitmask()));
+ assert!(!rxqueue.contains(RxOps::Rw.bitmask()));
+ assert!(!rxqueue.contains(RxOps::Response.bitmask()));
+ assert!(!rxqueue.contains(RxOps::CreditUpdate.bitmask()));
+ assert!(!rxqueue.contains(RxOps::Reset.bitmask()));
+ }
+
+ #[test]
+ fn test_enqueue() {
+ let mut rxqueue = RxQueue::new();
+
+ rxqueue.enqueue(RxOps::Request);
+ assert!(rxqueue.contains(RxOps::Request.bitmask()));
+
+ rxqueue.enqueue(RxOps::Rw);
+ assert!(rxqueue.contains(RxOps::Rw.bitmask()));
+
+ rxqueue.enqueue(RxOps::Response);
+ assert!(rxqueue.contains(RxOps::Response.bitmask()));
+
+ rxqueue.enqueue(RxOps::CreditUpdate);
+ assert!(rxqueue.contains(RxOps::CreditUpdate.bitmask()));
+
+ rxqueue.enqueue(RxOps::Reset);
+ assert!(rxqueue.contains(RxOps::Reset.bitmask()));
+ }
+
+ #[test]
+ fn test_peek() {
+ let mut rxqueue = RxQueue::new();
+
+ rxqueue.queue = 31;
+ assert_eq!(rxqueue.peek(), Some(RxOps::Request));
+
+ rxqueue.queue = 30;
+ assert_eq!(rxqueue.peek(), Some(RxOps::Rw));
+
+ rxqueue.queue = 28;
+ assert_eq!(rxqueue.peek(), Some(RxOps::Response));
+
+ rxqueue.queue = 24;
+ assert_eq!(rxqueue.peek(), Some(RxOps::CreditUpdate));
+
+ rxqueue.queue = 16;
+ assert_eq!(rxqueue.peek(), Some(RxOps::Reset));
+ }
+
+ #[test]
+ fn test_dequeue() {
+ let mut rxqueue = RxQueue::new();
+ rxqueue.queue = 31;
+
+ assert_eq!(rxqueue.dequeue(), Some(RxOps::Request));
+ assert!(!rxqueue.contains(RxOps::Request.bitmask()));
+
+ assert_eq!(rxqueue.dequeue(), Some(RxOps::Rw));
+ assert!(!rxqueue.contains(RxOps::Rw.bitmask()));
+
+ assert_eq!(rxqueue.dequeue(), Some(RxOps::Response));
+ assert!(!rxqueue.contains(RxOps::Response.bitmask()));
+
+ assert_eq!(rxqueue.dequeue(), Some(RxOps::CreditUpdate));
+ assert!(!rxqueue.contains(RxOps::CreditUpdate.bitmask()));
+
+ assert_eq!(rxqueue.dequeue(), Some(RxOps::Reset));
+ assert!(!rxqueue.contains(RxOps::Reset.bitmask()));
+ }
+
+ #[test]
+ fn test_pending_rx() {
+ let mut rxqueue = RxQueue::new();
+ assert!(!rxqueue.pending_rx());
+
+ rxqueue.queue = 1;
+ assert!(rxqueue.pending_rx());
+ }
+}
diff --git a/src/thread_backend.rs b/src/thread_backend.rs
new file mode 100644
index 0000000..5677ec5
--- /dev/null
+++ b/src/thread_backend.rs
@@ -0,0 +1,506 @@
+// SPDX-License-Identifier: Apache-2.0 or BSD-3-Clause
+
+use std::{
+ collections::{HashMap, HashSet, VecDeque},
+ os::unix::{
+ net::UnixStream,
+ prelude::{AsRawFd, RawFd},
+ },
+ sync::{Arc, RwLock},
+};
+
+use log::{info, warn};
+use virtio_vsock::packet::{VsockPacket, PKT_HEADER_SIZE};
+use vm_memory::bitmap::BitmapSlice;
+
+use crate::{
+ rxops::*,
+ vhu_vsock::{
+ CidMap, ConnMapKey, Error, Result, VSOCK_HOST_CID, VSOCK_OP_REQUEST, VSOCK_OP_RST,
+ VSOCK_TYPE_STREAM,
+ },
+ vhu_vsock_thread::VhostUserVsockThread,
+ vsock_conn::*,
+};
+
+pub(crate) type RawPktsQ = VecDeque<RawVsockPacket>;
+
+pub(crate) struct RawVsockPacket {
+ pub header: [u8; PKT_HEADER_SIZE],
+ pub data: Vec<u8>,
+}
+
+impl RawVsockPacket {
+ fn from_vsock_packet<B: BitmapSlice>(pkt: &VsockPacket<B>) -> Result<Self> {
+ let mut raw_pkt = Self {
+ header: [0; PKT_HEADER_SIZE],
+ data: vec![0; pkt.len() as usize],
+ };
+
+ pkt.header_slice().copy_to(&mut raw_pkt.header);
+ if !pkt.is_empty() {
+ pkt.data_slice()
+ .ok_or(Error::PktBufMissing)?
+ .copy_to(raw_pkt.data.as_mut());
+ }
+
+ Ok(raw_pkt)
+ }
+}
+
+pub(crate) struct VsockThreadBackend {
+ /// Map of ConnMapKey objects indexed by raw file descriptors.
+ pub listener_map: HashMap<RawFd, ConnMapKey>,
+ /// Map of vsock connection objects indexed by ConnMapKey objects.
+ pub conn_map: HashMap<ConnMapKey, VsockConnection<UnixStream>>,
+ /// Queue of ConnMapKey objects indicating pending rx operations.
+ pub backend_rxq: VecDeque<ConnMapKey>,
+ /// Map of host-side unix streams indexed by raw file descriptors.
+ pub stream_map: HashMap<i32, UnixStream>,
+ /// Host side socket for listening to new connections from the host.
+ host_socket_path: String,
+ /// epoll for registering new host-side connections.
+ epoll_fd: i32,
+ /// CID of the guest.
+ guest_cid: u64,
+ /// Set of allocated local ports.
+ pub local_port_set: HashSet<u32>,
+ tx_buffer_size: u32,
+ /// Maps the guest CID to the corresponding backend. Used for sibling VM communication.
+ pub cid_map: Arc<RwLock<CidMap>>,
+ /// Queue of raw vsock packets recieved from sibling VMs to be sent to the guest.
+ pub raw_pkts_queue: Arc<RwLock<RawPktsQ>>,
+}
+
+impl VsockThreadBackend {
+ /// New instance of VsockThreadBackend.
+ pub fn new(
+ host_socket_path: String,
+ epoll_fd: i32,
+ guest_cid: u64,
+ tx_buffer_size: u32,
+ cid_map: Arc<RwLock<CidMap>>,
+ ) -> Self {
+ Self {
+ listener_map: HashMap::new(),
+ conn_map: HashMap::new(),
+ backend_rxq: VecDeque::new(),
+ // Need this map to prevent connected stream from closing
+ // TODO: think of a better solution
+ stream_map: HashMap::new(),
+ host_socket_path,
+ epoll_fd,
+ guest_cid,
+ local_port_set: HashSet::new(),
+ tx_buffer_size,
+ cid_map,
+ raw_pkts_queue: Arc::new(RwLock::new(VecDeque::new())),
+ }
+ }
+
+ /// Checks if there are pending rx requests in the backend rxq.
+ pub fn pending_rx(&self) -> bool {
+ !self.backend_rxq.is_empty()
+ }
+
+ /// Checks if there are pending raw vsock packets to be sent to the guest.
+ pub fn pending_raw_pkts(&self) -> bool {
+ !self.raw_pkts_queue.read().unwrap().is_empty()
+ }
+
+ /// Deliver a vsock packet to the guest vsock driver.
+ ///
+ /// Returns:
+ /// - `Ok(())` if the packet was successfully filled in
+ /// - `Err(Error::EmptyBackendRxQ) if there was no available data
+ pub fn recv_pkt<B: BitmapSlice>(&mut self, pkt: &mut VsockPacket<B>) -> Result<()> {
+ // Pop an event from the backend_rxq
+ let key = self.backend_rxq.pop_front().ok_or(Error::EmptyBackendRxQ)?;
+ let conn = match self.conn_map.get_mut(&key) {
+ Some(conn) => conn,
+ None => {
+ // assume that the connection does not exist
+ return Ok(());
+ }
+ };
+
+ if conn.rx_queue.peek() == Some(RxOps::Reset) {
+ // Handle RST events here
+ let conn = self.conn_map.remove(&key).unwrap();
+ self.listener_map.remove(&conn.stream.as_raw_fd());
+ self.stream_map.remove(&conn.stream.as_raw_fd());
+ self.local_port_set.remove(&conn.local_port);
+ VhostUserVsockThread::epoll_unregister(conn.epoll_fd, conn.stream.as_raw_fd())
+ .unwrap_or_else(|err| {
+ warn!(
+ "Could not remove epoll listener for fd {:?}: {:?}",
+ conn.stream.as_raw_fd(),
+ err
+ )
+ });
+
+ // Initialize the packet header to contain a VSOCK_OP_RST operation
+ pkt.set_op(VSOCK_OP_RST)
+ .set_src_cid(VSOCK_HOST_CID)
+ .set_dst_cid(conn.guest_cid)
+ .set_src_port(conn.local_port)
+ .set_dst_port(conn.peer_port)
+ .set_len(0)
+ .set_type(VSOCK_TYPE_STREAM)
+ .set_flags(0)
+ .set_buf_alloc(0)
+ .set_fwd_cnt(0);
+
+ return Ok(());
+ }
+
+ // Handle other packet types per connection
+ conn.recv_pkt(pkt)?;
+
+ Ok(())
+ }
+
+ /// Deliver a guest generated packet to its destination in the backend.
+ ///
+ /// Absorbs unexpected packets, handles rest to respective connection
+ /// object.
+ ///
+ /// Returns:
+ /// - always `Ok(())` if packet has been consumed correctly
+ pub fn send_pkt<B: BitmapSlice>(&mut self, pkt: &VsockPacket<B>) -> Result<()> {
+ if pkt.src_cid() != self.guest_cid {
+ warn!(
+ "vsock: dropping packet with inconsistent src_cid: {:?} from guest configured with CID: {:?}",
+ pkt.src_cid(), self.guest_cid
+ );
+ return Ok(());
+ }
+
+ let dst_cid = pkt.dst_cid();
+ if dst_cid != VSOCK_HOST_CID {
+ let cid_map = self.cid_map.read().unwrap();
+ if cid_map.contains_key(&dst_cid) {
+ let (sibling_raw_pkts_queue, sibling_event_fd) = cid_map.get(&dst_cid).unwrap();
+
+ sibling_raw_pkts_queue
+ .write()
+ .unwrap()
+ .push_back(RawVsockPacket::from_vsock_packet(pkt)?);
+ let _ = sibling_event_fd.write(1);
+ } else {
+ warn!("vsock: dropping packet for unknown cid: {:?}", dst_cid);
+ }
+
+ return Ok(());
+ }
+
+ // TODO: Rst if packet has unsupported type
+ if pkt.type_() != VSOCK_TYPE_STREAM {
+ info!("vsock: dropping packet of unknown type");
+ return Ok(());
+ }
+
+ let key = ConnMapKey::new(pkt.dst_port(), pkt.src_port());
+
+ // TODO: Handle cases where connection does not exist and packet op
+ // is not VSOCK_OP_REQUEST
+ if !self.conn_map.contains_key(&key) {
+ // The packet contains a new connection request
+ if pkt.op() == VSOCK_OP_REQUEST {
+ self.handle_new_guest_conn(pkt);
+ } else {
+ // TODO: send back RST
+ }
+ return Ok(());
+ }
+
+ if pkt.op() == VSOCK_OP_RST {
+ // Handle an RST packet from the guest here
+ let conn = self.conn_map.get(&key).unwrap();
+ if conn.rx_queue.contains(RxOps::Reset.bitmask()) {
+ return Ok(());
+ }
+ let conn = self.conn_map.remove(&key).unwrap();
+ self.listener_map.remove(&conn.stream.as_raw_fd());
+ self.stream_map.remove(&conn.stream.as_raw_fd());
+ self.local_port_set.remove(&conn.local_port);
+ VhostUserVsockThread::epoll_unregister(conn.epoll_fd, conn.stream.as_raw_fd())
+ .unwrap_or_else(|err| {
+ warn!(
+ "Could not remove epoll listener for fd {:?}: {:?}",
+ conn.stream.as_raw_fd(),
+ err
+ )
+ });
+ return Ok(());
+ }
+
+ // Forward this packet to its listening connection
+ let conn = self.conn_map.get_mut(&key).unwrap();
+ conn.send_pkt(pkt)?;
+
+ if conn.rx_queue.pending_rx() {
+ // Required if the connection object adds new rx operations
+ self.backend_rxq.push_back(key);
+ }
+
+ Ok(())
+ }
+
+ /// Deliver a raw vsock packet sent from a sibling VM to the guest vsock driver.
+ ///
+ /// Returns:
+ /// - `Ok(())` if packet was successfully filled in
+ /// - `Err(Error::EmptyRawPktsQueue)` if there was no available data
+ pub fn recv_raw_pkt<B: BitmapSlice>(&mut self, pkt: &mut VsockPacket<B>) -> Result<()> {
+ let raw_vsock_pkt = self
+ .raw_pkts_queue
+ .write()
+ .unwrap()
+ .pop_front()
+ .ok_or(Error::EmptyRawPktsQueue)?;
+
+ pkt.set_header_from_raw(&raw_vsock_pkt.header).unwrap();
+ if !raw_vsock_pkt.data.is_empty() {
+ let buf = pkt.data_slice().ok_or(Error::PktBufMissing)?;
+ buf.copy_from(&raw_vsock_pkt.data);
+ }
+
+ Ok(())
+ }
+
+ /// Handle a new guest initiated connection, i.e from the peer, the guest driver.
+ ///
+ /// Attempts to connect to a host side unix socket listening on a path
+ /// corresponding to the destination port as follows:
+ /// - "{self.host_sock_path}_{local_port}""
+ fn handle_new_guest_conn<B: BitmapSlice>(&mut self, pkt: &VsockPacket<B>) {
+ let port_path = format!("{}_{}", self.host_socket_path, pkt.dst_port());
+
+ UnixStream::connect(port_path)
+ .and_then(|stream| stream.set_nonblocking(true).map(|_| stream))
+ .map_err(Error::UnixConnect)
+ .and_then(|stream| self.add_new_guest_conn(stream, pkt))
+ .unwrap_or_else(|_| self.enq_rst());
+ }
+
+ /// Wrapper to add new connection to relevant HashMaps.
+ fn add_new_guest_conn<B: BitmapSlice>(
+ &mut self,
+ stream: UnixStream,
+ pkt: &VsockPacket<B>,
+ ) -> Result<()> {
+ let stream_fd = stream.as_raw_fd();
+ self.listener_map
+ .insert(stream_fd, ConnMapKey::new(pkt.dst_port(), pkt.src_port()));
+
+ let conn = VsockConnection::new_peer_init(
+ stream.try_clone().map_err(Error::UnixConnect)?,
+ pkt.dst_cid(),
+ pkt.dst_port(),
+ pkt.src_cid(),
+ pkt.src_port(),
+ self.epoll_fd,
+ pkt.buf_alloc(),
+ self.tx_buffer_size,
+ );
+
+ self.conn_map
+ .insert(ConnMapKey::new(pkt.dst_port(), pkt.src_port()), conn);
+ self.backend_rxq
+ .push_back(ConnMapKey::new(pkt.dst_port(), pkt.src_port()));
+
+ self.stream_map.insert(stream_fd, stream);
+ self.local_port_set.insert(pkt.dst_port());
+
+ VhostUserVsockThread::epoll_register(
+ self.epoll_fd,
+ stream_fd,
+ epoll::Events::EPOLLIN | epoll::Events::EPOLLOUT,
+ )?;
+ Ok(())
+ }
+
+ /// Enqueue RST packets to be sent to guest.
+ fn enq_rst(&mut self) {
+ // TODO
+ dbg!("New guest conn error: Enqueue RST");
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::vhu_vsock::{VhostUserVsockBackend, VsockConfig, VSOCK_OP_RW};
+ use std::os::unix::net::UnixListener;
+ use tempfile::tempdir;
+ use virtio_vsock::packet::{VsockPacket, PKT_HEADER_SIZE};
+
+ const DATA_LEN: usize = 16;
+ const CONN_TX_BUF_SIZE: u32 = 64 * 1024;
+
+ #[test]
+ fn test_vsock_thread_backend() {
+ const CID: u64 = 3;
+ const VSOCK_PEER_PORT: u32 = 1234;
+
+ let test_dir = tempdir().expect("Could not create a temp test directory.");
+
+ let vsock_socket_path = test_dir.path().join("test_vsock_thread_backend.vsock");
+ let vsock_peer_path = test_dir.path().join("test_vsock_thread_backend.vsock_1234");
+
+ let _listener = UnixListener::bind(&vsock_peer_path).unwrap();
+
+ let epoll_fd = epoll::create(false).unwrap();
+
+ let cid_map: Arc<RwLock<CidMap>> = Arc::new(RwLock::new(HashMap::new()));
+
+ let mut vtp = VsockThreadBackend::new(
+ vsock_socket_path.display().to_string(),
+ epoll_fd,
+ CID,
+ CONN_TX_BUF_SIZE,
+ cid_map,
+ );
+
+ assert!(!vtp.pending_rx());
+
+ let mut pkt_raw = [0u8; PKT_HEADER_SIZE + DATA_LEN];
+ let (hdr_raw, data_raw) = pkt_raw.split_at_mut(PKT_HEADER_SIZE);
+
+ // SAFETY: Safe as hdr_raw and data_raw are guaranteed to be valid.
+ let mut packet = unsafe { VsockPacket::new(hdr_raw, Some(data_raw)).unwrap() };
+
+ assert_eq!(
+ vtp.recv_pkt(&mut packet).unwrap_err().to_string(),
+ Error::EmptyBackendRxQ.to_string()
+ );
+
+ assert!(vtp.send_pkt(&packet).is_ok());
+
+ packet.set_type(VSOCK_TYPE_STREAM);
+ assert!(vtp.send_pkt(&packet).is_ok());
+
+ packet.set_src_cid(CID);
+ packet.set_dst_cid(VSOCK_HOST_CID);
+ packet.set_dst_port(VSOCK_PEER_PORT);
+ assert!(vtp.send_pkt(&packet).is_ok());
+
+ packet.set_op(VSOCK_OP_REQUEST);
+ assert!(vtp.send_pkt(&packet).is_ok());
+
+ packet.set_op(VSOCK_OP_RW);
+ assert!(vtp.send_pkt(&packet).is_ok());
+
+ packet.set_op(VSOCK_OP_RST);
+ assert!(vtp.send_pkt(&packet).is_ok());
+
+ assert!(vtp.recv_pkt(&mut packet).is_ok());
+
+ // cleanup
+ let _ = std::fs::remove_file(&vsock_peer_path);
+ let _ = std::fs::remove_file(&vsock_socket_path);
+
+ test_dir.close().unwrap();
+ }
+
+ #[test]
+ fn test_vsock_thread_backend_sibling_vms() {
+ const CID: u64 = 3;
+ const SIBLING_CID: u64 = 4;
+ const SIBLING_LISTENING_PORT: u32 = 1234;
+
+ let test_dir = tempdir().expect("Could not create a temp test directory.");
+
+ let vsock_socket_path = test_dir
+ .path()
+ .join("test_vsock_thread_backend.vsock")
+ .display()
+ .to_string();
+ let sibling_vhost_socket_path = test_dir
+ .path()
+ .join("test_vsock_thread_backend_sibling.socket")
+ .display()
+ .to_string();
+ let sibling_vsock_socket_path = test_dir
+ .path()
+ .join("test_vsock_thread_backend_sibling.vsock")
+ .display()
+ .to_string();
+
+ let cid_map: Arc<RwLock<CidMap>> = Arc::new(RwLock::new(HashMap::new()));
+
+ let sibling_config = VsockConfig::new(
+ SIBLING_CID,
+ sibling_vhost_socket_path,
+ sibling_vsock_socket_path,
+ CONN_TX_BUF_SIZE,
+ );
+
+ let sibling_backend =
+ Arc::new(VhostUserVsockBackend::new(sibling_config, cid_map.clone()).unwrap());
+
+ let epoll_fd = epoll::create(false).unwrap();
+ let mut vtp =
+ VsockThreadBackend::new(vsock_socket_path, epoll_fd, CID, CONN_TX_BUF_SIZE, cid_map);
+
+ assert!(!vtp.pending_raw_pkts());
+
+ let mut pkt_raw = [0u8; PKT_HEADER_SIZE + DATA_LEN];
+ let (hdr_raw, data_raw) = pkt_raw.split_at_mut(PKT_HEADER_SIZE);
+
+ // SAFETY: Safe as hdr_raw and data_raw are guaranteed to be valid.
+ let mut packet = unsafe { VsockPacket::new(hdr_raw, Some(data_raw)).unwrap() };
+
+ assert_eq!(
+ vtp.recv_raw_pkt(&mut packet).unwrap_err().to_string(),
+ Error::EmptyRawPktsQueue.to_string()
+ );
+
+ packet.set_type(VSOCK_TYPE_STREAM);
+ packet.set_src_cid(CID);
+ packet.set_dst_cid(SIBLING_CID);
+ packet.set_dst_port(SIBLING_LISTENING_PORT);
+ packet.set_op(VSOCK_OP_RW);
+ packet.set_len(DATA_LEN as u32);
+ packet
+ .data_slice()
+ .unwrap()
+ .copy_from(&[0xCAu8, 0xFEu8, 0xBAu8, 0xBEu8]);
+
+ assert!(vtp.send_pkt(&packet).is_ok());
+ assert!(sibling_backend.threads[0]
+ .lock()
+ .unwrap()
+ .thread_backend
+ .pending_raw_pkts());
+
+ let mut recvd_pkt_raw = [0u8; PKT_HEADER_SIZE + DATA_LEN];
+ let (recvd_hdr_raw, recvd_data_raw) = recvd_pkt_raw.split_at_mut(PKT_HEADER_SIZE);
+
+ let mut recvd_packet =
+ // SAFETY: Safe as recvd_hdr_raw and recvd_data_raw are guaranteed to be valid.
+ unsafe { VsockPacket::new(recvd_hdr_raw, Some(recvd_data_raw)).unwrap() };
+
+ assert!(sibling_backend.threads[0]
+ .lock()
+ .unwrap()
+ .thread_backend
+ .recv_raw_pkt(&mut recvd_packet)
+ .is_ok());
+
+ assert_eq!(recvd_packet.type_(), VSOCK_TYPE_STREAM);
+ assert_eq!(recvd_packet.src_cid(), CID);
+ assert_eq!(recvd_packet.dst_cid(), SIBLING_CID);
+ assert_eq!(recvd_packet.dst_port(), SIBLING_LISTENING_PORT);
+ assert_eq!(recvd_packet.op(), VSOCK_OP_RW);
+ assert_eq!(recvd_packet.len(), DATA_LEN as u32);
+
+ assert_eq!(recvd_data_raw[0], 0xCAu8);
+ assert_eq!(recvd_data_raw[1], 0xFEu8);
+ assert_eq!(recvd_data_raw[2], 0xBAu8);
+ assert_eq!(recvd_data_raw[3], 0xBEu8);
+
+ test_dir.close().unwrap();
+ }
+}
diff --git a/src/txbuf.rs b/src/txbuf.rs
new file mode 100644
index 0000000..ef718d7
--- /dev/null
+++ b/src/txbuf.rs
@@ -0,0 +1,233 @@
+// SPDX-License-Identifier: Apache-2.0 or BSD-3-Clause
+
+use std::{io::Write, num::Wrapping};
+
+use vm_memory::{bitmap::BitmapSlice, VolatileSlice};
+
+use crate::vhu_vsock::{Error, Result};
+
+#[derive(Debug)]
+pub(crate) struct LocalTxBuf {
+ /// Buffer holding data to be forwarded to a host-side application
+ buf: Vec<u8>,
+ /// Index into buffer from which data can be consumed from the buffer
+ head: Wrapping<u32>,
+ /// Index into buffer from which data can be added to the buffer
+ tail: Wrapping<u32>,
+}
+
+impl LocalTxBuf {
+ /// Create a new instance of LocalTxBuf.
+ pub fn new(buf_size: u32) -> Self {
+ Self {
+ buf: vec![0; buf_size as usize],
+ head: Wrapping(0),
+ tail: Wrapping(0),
+ }
+ }
+
+ /// Get the buffer size
+ pub fn get_buf_size(&self) -> u32 {
+ self.buf.len() as u32
+ }
+
+ /// Check if the buf is empty.
+ pub fn is_empty(&self) -> bool {
+ self.len() == 0
+ }
+
+ /// Add new data to the tx buffer, push all or none.
+ /// Returns LocalTxBufFull error if space not sufficient.
+ pub fn push<B: BitmapSlice>(&mut self, data_buf: &VolatileSlice<B>) -> Result<()> {
+ if self.get_buf_size() as usize - self.len() < data_buf.len() {
+ // Tx buffer is full
+ return Err(Error::LocalTxBufFull);
+ }
+
+ // Get index into buffer at which data can be inserted
+ let tail_idx = self.tail.0 as usize % self.get_buf_size() as usize;
+
+ // Check if we can fit the data buffer between head and end of buffer
+ let len = std::cmp::min(self.get_buf_size() as usize - tail_idx, data_buf.len());
+ let txbuf = &mut self.buf[tail_idx..tail_idx + len];
+ data_buf.copy_to(txbuf);
+
+ // Check if there is more data to be wrapped around
+ if len < data_buf.len() {
+ let remain_txbuf = &mut self.buf[..(data_buf.len() - len)];
+ data_buf.copy_to(remain_txbuf);
+ }
+
+ // Increment tail by the amount of data that has been added to the buffer
+ self.tail += Wrapping(data_buf.len() as u32);
+
+ Ok(())
+ }
+
+ /// Flush buf data to stream.
+ pub fn flush_to<S: Write>(&mut self, stream: &mut S) -> Result<usize> {
+ if self.is_empty() {
+ // No data to be flushed
+ return Ok(0);
+ }
+
+ // Get index into buffer from which data can be read
+ let head_idx = self.head.0 as usize % self.get_buf_size() as usize;
+
+ // First write from head to end of buffer
+ let len = std::cmp::min(self.get_buf_size() as usize - head_idx, self.len());
+ let written = stream
+ .write(&self.buf[head_idx..(head_idx + len)])
+ .map_err(Error::LocalTxBufFlush)?;
+
+ // Increment head by amount of data that has been flushed to the stream
+ self.head += Wrapping(written as u32);
+
+ // If written length is less than the expected length we can try again in the future
+ if written < len {
+ return Ok(written);
+ }
+
+ // The head index has wrapped around the end of the buffer, we call self again
+ Ok(written + self.flush_to(stream).unwrap_or(0))
+ }
+
+ /// Return amount of data in the buffer.
+ fn len(&self) -> usize {
+ (self.tail - self.head).0 as usize
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ const CONN_TX_BUF_SIZE: u32 = 64 * 1024;
+
+ #[test]
+ fn test_txbuf_len() {
+ let mut loc_tx_buf = LocalTxBuf::new(CONN_TX_BUF_SIZE);
+
+ // Zero length tx buf
+ assert_eq!(loc_tx_buf.len(), 0);
+
+ // finite length tx buf
+ loc_tx_buf.head = Wrapping(0);
+ loc_tx_buf.tail = Wrapping(CONN_TX_BUF_SIZE);
+ assert_eq!(loc_tx_buf.len(), CONN_TX_BUF_SIZE as usize);
+
+ loc_tx_buf.tail = Wrapping(CONN_TX_BUF_SIZE / 2);
+ assert_eq!(loc_tx_buf.len(), (CONN_TX_BUF_SIZE / 2) as usize);
+
+ loc_tx_buf.head = Wrapping(256);
+ assert_eq!(loc_tx_buf.len(), 32512);
+ }
+
+ #[test]
+ fn test_txbuf_is_empty() {
+ let mut loc_tx_buf = LocalTxBuf::new(CONN_TX_BUF_SIZE);
+
+ // empty tx buffer
+ assert!(loc_tx_buf.is_empty());
+
+ // non empty tx buffer
+ loc_tx_buf.tail = Wrapping(CONN_TX_BUF_SIZE);
+ assert!(!loc_tx_buf.is_empty());
+ }
+
+ #[test]
+ fn test_txbuf_push() {
+ let mut loc_tx_buf = LocalTxBuf::new(CONN_TX_BUF_SIZE);
+ let mut buf = [0; CONN_TX_BUF_SIZE as usize];
+ // SAFETY: Safe as the buffer is guaranteed to be valid here.
+ let data = unsafe { VolatileSlice::new(buf.as_mut_ptr(), buf.len()) };
+
+ // push data into empty tx buffer
+ let res_push = loc_tx_buf.push(&data);
+ assert!(res_push.is_ok());
+ assert_eq!(loc_tx_buf.head, Wrapping(0));
+ assert_eq!(loc_tx_buf.tail, Wrapping(CONN_TX_BUF_SIZE));
+
+ // push data into full tx buffer
+ let res_push = loc_tx_buf.push(&data);
+ assert!(res_push.is_err());
+
+ // head and tail wrap at full
+ loc_tx_buf.head = Wrapping(CONN_TX_BUF_SIZE);
+ let res_push = loc_tx_buf.push(&data);
+ assert!(res_push.is_ok());
+ assert_eq!(loc_tx_buf.tail, Wrapping(CONN_TX_BUF_SIZE * 2));
+
+ // only tail wraps at full
+ let mut buf = vec![1; 4];
+ // SAFETY: Safe as the buffer is guaranteed to be valid here.
+ let data = unsafe { VolatileSlice::new(buf.as_mut_ptr(), buf.len()) };
+ let mut cmp_data = vec![1; 4];
+ cmp_data.append(&mut vec![0; (CONN_TX_BUF_SIZE - 4) as usize]);
+ loc_tx_buf.head = Wrapping(4);
+ loc_tx_buf.tail = Wrapping(CONN_TX_BUF_SIZE);
+ let res_push = loc_tx_buf.push(&data);
+ assert!(res_push.is_ok());
+ assert_eq!(loc_tx_buf.head, Wrapping(4));
+ assert_eq!(loc_tx_buf.tail, Wrapping(CONN_TX_BUF_SIZE + 4));
+ assert_eq!(loc_tx_buf.buf, cmp_data);
+ }
+
+ #[test]
+ fn test_txbuf_flush_to() {
+ let mut loc_tx_buf = LocalTxBuf::new(CONN_TX_BUF_SIZE);
+
+ // data to be flushed
+ let mut buf = vec![1; CONN_TX_BUF_SIZE as usize];
+ // SAFETY: Safe as the buffer is guaranteed to be valid here.
+ let data = unsafe { VolatileSlice::new(buf.as_mut_ptr(), buf.len()) };
+
+ // target to which data is flushed
+ let mut cmp_vec = Vec::with_capacity(data.len());
+
+ // flush no data
+ let res_flush = loc_tx_buf.flush_to(&mut cmp_vec);
+ assert!(res_flush.is_ok());
+ assert_eq!(res_flush.unwrap(), 0);
+
+ // flush data of CONN_TX_BUF_SIZE amount
+ let res_push = loc_tx_buf.push(&data);
+ assert!(res_push.is_ok());
+ let res_flush = loc_tx_buf.flush_to(&mut cmp_vec);
+ if let Ok(n) = res_flush {
+ assert_eq!(loc_tx_buf.head, Wrapping(n as u32));
+ assert_eq!(loc_tx_buf.tail, Wrapping(CONN_TX_BUF_SIZE));
+ assert_eq!(n, cmp_vec.len());
+ assert_eq!(cmp_vec, buf[..n]);
+ }
+
+ // wrapping head flush
+ let mut buf = vec![0; (CONN_TX_BUF_SIZE / 2) as usize];
+ buf.append(&mut vec![1; (CONN_TX_BUF_SIZE / 2) as usize]);
+ // SAFETY: Safe as the buffer is guaranteed to be valid here.
+ let data = unsafe { VolatileSlice::new(buf.as_mut_ptr(), buf.len()) };
+
+ loc_tx_buf.head = Wrapping(0);
+ loc_tx_buf.tail = Wrapping(0);
+ let res_push = loc_tx_buf.push(&data);
+ assert!(res_push.is_ok());
+ cmp_vec.clear();
+ loc_tx_buf.head = Wrapping(CONN_TX_BUF_SIZE / 2);
+ loc_tx_buf.tail = Wrapping(CONN_TX_BUF_SIZE + (CONN_TX_BUF_SIZE / 2));
+ let res_flush = loc_tx_buf.flush_to(&mut cmp_vec);
+ if let Ok(n) = res_flush {
+ assert_eq!(
+ loc_tx_buf.head,
+ Wrapping(CONN_TX_BUF_SIZE + (CONN_TX_BUF_SIZE / 2))
+ );
+ assert_eq!(
+ loc_tx_buf.tail,
+ Wrapping(CONN_TX_BUF_SIZE + (CONN_TX_BUF_SIZE / 2))
+ );
+ assert_eq!(n, cmp_vec.len());
+ let mut data = vec![1; (CONN_TX_BUF_SIZE / 2) as usize];
+ data.append(&mut vec![0; (CONN_TX_BUF_SIZE / 2) as usize]);
+ assert_eq!(cmp_vec, data[..n]);
+ }
+ }
+}
diff --git a/src/vhu_vsock.rs b/src/vhu_vsock.rs
new file mode 100644
index 0000000..bb59e05
--- /dev/null
+++ b/src/vhu_vsock.rs
@@ -0,0 +1,518 @@
+// SPDX-License-Identifier: Apache-2.0 or BSD-3-Clause
+
+use std::{
+ collections::HashMap,
+ io::{self, Result as IoResult},
+ sync::{Arc, Mutex, RwLock},
+ u16, u32, u64, u8,
+};
+
+use log::warn;
+use thiserror::Error as ThisError;
+use vhost::vhost_user::message::{VhostUserProtocolFeatures, VhostUserVirtioFeatures};
+use vhost_user_backend::{VhostUserBackend, VringRwLock};
+use virtio_bindings::bindings::{
+ virtio_config::VIRTIO_F_NOTIFY_ON_EMPTY, virtio_config::VIRTIO_F_VERSION_1,
+ virtio_ring::VIRTIO_RING_F_EVENT_IDX,
+};
+use vm_memory::{ByteValued, GuestMemoryAtomic, GuestMemoryMmap, Le64};
+use vmm_sys_util::{
+ epoll::EventSet,
+ eventfd::{EventFd, EFD_NONBLOCK},
+};
+
+use crate::thread_backend::RawPktsQ;
+use crate::vhu_vsock_thread::*;
+
+pub(crate) type CidMap = HashMap<u64, (Arc<RwLock<RawPktsQ>>, EventFd)>;
+
+const NUM_QUEUES: usize = 2;
+const QUEUE_SIZE: usize = 256;
+
+// New descriptors pending on the rx queue
+const RX_QUEUE_EVENT: u16 = 0;
+// New descriptors are pending on the tx queue.
+const TX_QUEUE_EVENT: u16 = 1;
+// New descriptors are pending on the event queue.
+const EVT_QUEUE_EVENT: u16 = 2;
+
+/// Notification coming from the backend.
+pub(crate) const BACKEND_EVENT: u16 = 3;
+
+/// Notification coming from the sibling VM.
+pub(crate) const SIBLING_VM_EVENT: u16 = 4;
+
+/// CID of the host
+pub(crate) const VSOCK_HOST_CID: u64 = 2;
+
+/// Connection oriented packet
+pub(crate) const VSOCK_TYPE_STREAM: u16 = 1;
+
+/// Vsock packet operation ID
+
+/// Connection request
+pub(crate) const VSOCK_OP_REQUEST: u16 = 1;
+/// Connection response
+pub(crate) const VSOCK_OP_RESPONSE: u16 = 2;
+/// Connection reset
+pub(crate) const VSOCK_OP_RST: u16 = 3;
+/// Shutdown connection
+pub(crate) const VSOCK_OP_SHUTDOWN: u16 = 4;
+/// Data read/write
+pub(crate) const VSOCK_OP_RW: u16 = 5;
+/// Flow control credit update
+pub(crate) const VSOCK_OP_CREDIT_UPDATE: u16 = 6;
+/// Flow control credit request
+pub(crate) const VSOCK_OP_CREDIT_REQUEST: u16 = 7;
+
+/// Vsock packet flags
+
+/// VSOCK_OP_SHUTDOWN: Packet sender will receive no more data
+pub(crate) const VSOCK_FLAGS_SHUTDOWN_RCV: u32 = 1;
+/// VSOCK_OP_SHUTDOWN: Packet sender will send no more data
+pub(crate) const VSOCK_FLAGS_SHUTDOWN_SEND: u32 = 2;
+
+// Queue mask to select vrings.
+const QUEUE_MASK: u64 = 0b11;
+
+pub(crate) type Result<T> = std::result::Result<T, Error>;
+
+/// Custom error types
+#[derive(Debug, ThisError)]
+pub(crate) enum Error {
+ #[error("Failed to handle event other than EPOLLIN event")]
+ HandleEventNotEpollIn,
+ #[error("Failed to handle unknown event")]
+ HandleUnknownEvent,
+ #[error("Failed to accept new local socket connection")]
+ UnixAccept(std::io::Error),
+ #[error("Failed to bind a unix stream")]
+ UnixBind(std::io::Error),
+ #[error("Failed to create an epoll fd")]
+ EpollFdCreate(std::io::Error),
+ #[error("Failed to add to epoll")]
+ EpollAdd(std::io::Error),
+ #[error("Failed to modify evset associated with epoll")]
+ EpollModify(std::io::Error),
+ #[error("Failed to read from unix stream")]
+ UnixRead(std::io::Error),
+ #[error("Failed to convert byte array to string")]
+ ConvertFromUtf8(std::str::Utf8Error),
+ #[error("Invalid vsock connection request from host")]
+ InvalidPortRequest,
+ #[error("Unable to convert string to integer")]
+ ParseInteger(std::num::ParseIntError),
+ #[error("Error reading stream port")]
+ ReadStreamPort(Box<Error>),
+ #[error("Failed to de-register fd from epoll")]
+ EpollRemove(std::io::Error),
+ #[error("No memory configured")]
+ NoMemoryConfigured,
+ #[error("Unable to iterate queue")]
+ IterateQueue,
+ #[error("No rx request available")]
+ NoRequestRx,
+ #[error("Unable to create thread pool")]
+ CreateThreadPool(std::io::Error),
+ #[error("Packet missing data buffer")]
+ PktBufMissing,
+ #[error("Failed to connect to unix socket")]
+ UnixConnect(std::io::Error),
+ #[error("Unable to write to unix stream")]
+ UnixWrite,
+ #[error("Unable to push data to local tx buffer")]
+ LocalTxBufFull,
+ #[error("Unable to flush data from local tx buffer")]
+ LocalTxBufFlush(std::io::Error),
+ #[error("No free local port available for new host inititated connection")]
+ NoFreeLocalPort,
+ #[error("Backend rx queue is empty")]
+ EmptyBackendRxQ,
+ #[error("Failed to create an EventFd")]
+ EventFdCreate(std::io::Error),
+ #[error("Raw vsock packets queue is empty")]
+ EmptyRawPktsQueue,
+}
+
+impl std::convert::From<Error> for std::io::Error {
+ fn from(e: Error) -> Self {
+ std::io::Error::new(io::ErrorKind::Other, e)
+ }
+}
+
+#[derive(Debug, Clone)]
+/// This structure is the public API through which an external program
+/// is allowed to configure the backend.
+pub(crate) struct VsockConfig {
+ guest_cid: u64,
+ socket: String,
+ uds_path: String,
+ tx_buffer_size: u32,
+}
+
+impl VsockConfig {
+ /// Create a new instance of the VsockConfig struct, containing the
+ /// parameters to be fed into the vsock-backend server.
+ pub fn new(guest_cid: u64, socket: String, uds_path: String, tx_buffer_size: u32) -> Self {
+ Self {
+ guest_cid,
+ socket,
+ uds_path,
+ tx_buffer_size,
+ }
+ }
+
+ /// Return the guest's current CID.
+ pub fn get_guest_cid(&self) -> u64 {
+ self.guest_cid
+ }
+
+ /// Return the path of the unix domain socket which is listening to
+ /// requests from the host side application.
+ pub fn get_uds_path(&self) -> String {
+ String::from(&self.uds_path)
+ }
+
+ /// Return the path of the unix domain socket which is listening to
+ /// requests from the guest.
+ pub fn get_socket_path(&self) -> String {
+ String::from(&self.socket)
+ }
+
+ pub fn get_tx_buffer_size(&self) -> u32 {
+ self.tx_buffer_size
+ }
+}
+
+/// A local port and peer port pair used to retrieve
+/// the corresponding connection.
+#[derive(Hash, PartialEq, Eq, Debug, Clone)]
+pub(crate) struct ConnMapKey {
+ local_port: u32,
+ peer_port: u32,
+}
+
+impl ConnMapKey {
+ pub fn new(local_port: u32, peer_port: u32) -> Self {
+ Self {
+ local_port,
+ peer_port,
+ }
+ }
+}
+
+/// Virtio Vsock Configuration
+#[derive(Copy, Clone, Debug, Default, PartialEq)]
+#[repr(C)]
+struct VirtioVsockConfig {
+ pub guest_cid: Le64,
+}
+
+// SAFETY: The layout of the structure is fixed and can be initialized by
+// reading its content from byte array.
+unsafe impl ByteValued for VirtioVsockConfig {}
+
+pub(crate) struct VhostUserVsockBackend {
+ config: VirtioVsockConfig,
+ pub threads: Vec<Mutex<VhostUserVsockThread>>,
+ queues_per_thread: Vec<u64>,
+ pub exit_event: EventFd,
+}
+
+impl VhostUserVsockBackend {
+ pub fn new(config: VsockConfig, cid_map: Arc<RwLock<CidMap>>) -> Result<Self> {
+ let thread = Mutex::new(VhostUserVsockThread::new(
+ config.get_uds_path(),
+ config.get_guest_cid(),
+ config.get_tx_buffer_size(),
+ cid_map,
+ )?);
+ let queues_per_thread = vec![QUEUE_MASK];
+
+ Ok(Self {
+ config: VirtioVsockConfig {
+ guest_cid: From::from(config.get_guest_cid()),
+ },
+ threads: vec![thread],
+ queues_per_thread,
+ exit_event: EventFd::new(EFD_NONBLOCK).map_err(Error::EventFdCreate)?,
+ })
+ }
+}
+
+impl VhostUserBackend<VringRwLock, ()> for VhostUserVsockBackend {
+ fn num_queues(&self) -> usize {
+ NUM_QUEUES
+ }
+
+ fn max_queue_size(&self) -> usize {
+ QUEUE_SIZE
+ }
+
+ fn features(&self) -> u64 {
+ 1 << VIRTIO_F_VERSION_1
+ | 1 << VIRTIO_F_NOTIFY_ON_EMPTY
+ | 1 << VIRTIO_RING_F_EVENT_IDX
+ | VhostUserVirtioFeatures::PROTOCOL_FEATURES.bits()
+ }
+
+ fn protocol_features(&self) -> VhostUserProtocolFeatures {
+ VhostUserProtocolFeatures::CONFIG
+ }
+
+ fn set_event_idx(&self, enabled: bool) {
+ for thread in self.threads.iter() {
+ thread.lock().unwrap().event_idx = enabled;
+ }
+ }
+
+ fn update_memory(&self, atomic_mem: GuestMemoryAtomic<GuestMemoryMmap>) -> IoResult<()> {
+ for thread in self.threads.iter() {
+ thread.lock().unwrap().mem = Some(atomic_mem.clone());
+ }
+ Ok(())
+ }
+
+ fn handle_event(
+ &self,
+ device_event: u16,
+ evset: EventSet,
+ vrings: &[VringRwLock],
+ thread_id: usize,
+ ) -> IoResult<bool> {
+ let vring_rx = &vrings[0];
+ let vring_tx = &vrings[1];
+
+ if evset != EventSet::IN {
+ return Err(Error::HandleEventNotEpollIn.into());
+ }
+
+ let mut thread = self.threads[thread_id].lock().unwrap();
+ let evt_idx = thread.event_idx;
+
+ match device_event {
+ RX_QUEUE_EVENT => {}
+ TX_QUEUE_EVENT => {
+ thread.process_tx(vring_tx, evt_idx)?;
+ }
+ EVT_QUEUE_EVENT => {}
+ BACKEND_EVENT => {
+ thread.process_backend_evt(evset);
+ if let Err(e) = thread.process_tx(vring_tx, evt_idx) {
+ match e {
+ Error::NoMemoryConfigured => {
+ warn!("Received a backend event before vring initialization")
+ }
+ _ => return Err(e.into()),
+ }
+ }
+ }
+ SIBLING_VM_EVENT => {
+ let _ = thread.sibling_event_fd.read();
+ thread.process_raw_pkts(vring_rx, evt_idx)?;
+ return Ok(false);
+ }
+ _ => {
+ return Err(Error::HandleUnknownEvent.into());
+ }
+ }
+
+ if device_event != EVT_QUEUE_EVENT {
+ thread.process_rx(vring_rx, evt_idx)?;
+ }
+
+ Ok(false)
+ }
+
+ fn get_config(&self, offset: u32, size: u32) -> Vec<u8> {
+ let offset = offset as usize;
+ let size = size as usize;
+
+ let buf = self.config.as_slice();
+
+ if offset + size > buf.len() {
+ return Vec::new();
+ }
+
+ buf[offset..offset + size].to_vec()
+ }
+
+ fn queues_per_thread(&self) -> Vec<u64> {
+ self.queues_per_thread.clone()
+ }
+
+ fn exit_event(&self, _thread_index: usize) -> Option<EventFd> {
+ self.exit_event.try_clone().ok()
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use std::convert::TryInto;
+ use tempfile::tempdir;
+ use vhost_user_backend::VringT;
+ use vm_memory::GuestAddress;
+
+ const CONN_TX_BUF_SIZE: u32 = 64 * 1024;
+
+ #[test]
+ fn test_vsock_backend() {
+ const CID: u64 = 3;
+
+ let test_dir = tempdir().expect("Could not create a temp test directory.");
+
+ let vhost_socket_path = test_dir
+ .path()
+ .join("test_vsock_backend.socket")
+ .display()
+ .to_string();
+ let vsock_socket_path = test_dir
+ .path()
+ .join("test_vsock_backend.vsock")
+ .display()
+ .to_string();
+
+ let config = VsockConfig::new(
+ CID,
+ vhost_socket_path.to_string(),
+ vsock_socket_path.to_string(),
+ CONN_TX_BUF_SIZE,
+ );
+
+ let cid_map: Arc<RwLock<CidMap>> = Arc::new(RwLock::new(HashMap::new()));
+
+ let backend = VhostUserVsockBackend::new(config, cid_map);
+
+ assert!(backend.is_ok());
+ let backend = backend.unwrap();
+
+ assert_eq!(backend.num_queues(), NUM_QUEUES);
+ assert_eq!(backend.max_queue_size(), QUEUE_SIZE);
+ assert_ne!(backend.features(), 0);
+ assert!(!backend.protocol_features().is_empty());
+ backend.set_event_idx(false);
+
+ let mem = GuestMemoryAtomic::new(
+ GuestMemoryMmap::<()>::from_ranges(&[(GuestAddress(0), 0x10000)]).unwrap(),
+ );
+ let vrings = [
+ VringRwLock::new(mem.clone(), 0x1000).unwrap(),
+ VringRwLock::new(mem.clone(), 0x2000).unwrap(),
+ ];
+ vrings[0].set_queue_info(0x100, 0x200, 0x300).unwrap();
+ vrings[0].set_queue_ready(true);
+ vrings[1].set_queue_info(0x1100, 0x1200, 0x1300).unwrap();
+ vrings[1].set_queue_ready(true);
+
+ assert!(backend.update_memory(mem).is_ok());
+
+ let queues_per_thread = backend.queues_per_thread();
+ assert_eq!(queues_per_thread.len(), 1);
+ assert_eq!(queues_per_thread[0], 0b11);
+
+ let config = backend.get_config(0, 8);
+ assert_eq!(config.len(), 8);
+ let cid = u64::from_le_bytes(config.try_into().unwrap());
+ assert_eq!(cid, CID);
+
+ let exit = backend.exit_event(0);
+ assert!(exit.is_some());
+ exit.unwrap().write(1).unwrap();
+
+ let ret = backend.handle_event(RX_QUEUE_EVENT, EventSet::IN, &vrings, 0);
+ assert!(ret.is_ok());
+ assert!(!ret.unwrap());
+
+ let ret = backend.handle_event(TX_QUEUE_EVENT, EventSet::IN, &vrings, 0);
+ assert!(ret.is_ok());
+ assert!(!ret.unwrap());
+
+ let ret = backend.handle_event(EVT_QUEUE_EVENT, EventSet::IN, &vrings, 0);
+ assert!(ret.is_ok());
+ assert!(!ret.unwrap());
+
+ let ret = backend.handle_event(BACKEND_EVENT, EventSet::IN, &vrings, 0);
+ assert!(ret.is_ok());
+ assert!(!ret.unwrap());
+
+ // cleanup
+ let _ = std::fs::remove_file(vhost_socket_path);
+ let _ = std::fs::remove_file(vsock_socket_path);
+
+ test_dir.close().unwrap();
+ }
+
+ #[test]
+ fn test_vsock_backend_failures() {
+ const CID: u64 = 3;
+
+ let test_dir = tempdir().expect("Could not create a temp test directory.");
+
+ let vhost_socket_path = test_dir
+ .path()
+ .join("test_vsock_backend_failures.socket")
+ .display()
+ .to_string();
+ let vsock_socket_path = test_dir
+ .path()
+ .join("test_vsock_backend_failures.vsock")
+ .display()
+ .to_string();
+
+ let config = VsockConfig::new(
+ CID,
+ "/sys/not_allowed.socket".to_string(),
+ "/sys/not_allowed.vsock".to_string(),
+ CONN_TX_BUF_SIZE,
+ );
+
+ let cid_map: Arc<RwLock<CidMap>> = Arc::new(RwLock::new(HashMap::new()));
+
+ let backend = VhostUserVsockBackend::new(config, cid_map.clone());
+ assert!(backend.is_err());
+
+ let config = VsockConfig::new(
+ CID,
+ vhost_socket_path.to_string(),
+ vsock_socket_path.to_string(),
+ CONN_TX_BUF_SIZE,
+ );
+
+ let backend = VhostUserVsockBackend::new(config, cid_map).unwrap();
+ let mem = GuestMemoryAtomic::new(
+ GuestMemoryMmap::<()>::from_ranges(&[(GuestAddress(0), 0x10000)]).unwrap(),
+ );
+ let vrings = [
+ VringRwLock::new(mem.clone(), 0x1000).unwrap(),
+ VringRwLock::new(mem.clone(), 0x2000).unwrap(),
+ ];
+
+ backend.update_memory(mem).unwrap();
+
+ // reading out of the config space, expecting empty config
+ let config = backend.get_config(2, 8);
+ assert_eq!(config.len(), 0);
+
+ assert_eq!(
+ backend
+ .handle_event(RX_QUEUE_EVENT, EventSet::OUT, &vrings, 0)
+ .unwrap_err()
+ .to_string(),
+ Error::HandleEventNotEpollIn.to_string()
+ );
+ assert_eq!(
+ backend
+ .handle_event(SIBLING_VM_EVENT + 1, EventSet::IN, &vrings, 0)
+ .unwrap_err()
+ .to_string(),
+ Error::HandleUnknownEvent.to_string()
+ );
+
+ // cleanup
+ let _ = std::fs::remove_file(vhost_socket_path);
+ let _ = std::fs::remove_file(vsock_socket_path);
+
+ test_dir.close().unwrap();
+ }
+}
diff --git a/src/vhu_vsock_thread.rs b/src/vhu_vsock_thread.rs
new file mode 100644
index 0000000..726e2b6
--- /dev/null
+++ b/src/vhu_vsock_thread.rs
@@ -0,0 +1,837 @@
+// SPDX-License-Identifier: Apache-2.0 or BSD-3-Clause
+
+use std::{
+ fs::File,
+ io,
+ io::Read,
+ num::Wrapping,
+ ops::Deref,
+ os::unix::{
+ net::{UnixListener, UnixStream},
+ prelude::{AsRawFd, FromRawFd, RawFd},
+ },
+ sync::{Arc, RwLock},
+};
+
+use futures::executor::{ThreadPool, ThreadPoolBuilder};
+use log::warn;
+use vhost_user_backend::{VringEpollHandler, VringRwLock, VringT};
+use virtio_queue::QueueOwnedT;
+use virtio_vsock::packet::{VsockPacket, PKT_HEADER_SIZE};
+use vm_memory::{GuestAddressSpace, GuestMemoryAtomic, GuestMemoryMmap};
+use vmm_sys_util::{
+ epoll::EventSet,
+ eventfd::{EventFd, EFD_NONBLOCK},
+};
+
+use crate::{
+ rxops::*,
+ thread_backend::*,
+ vhu_vsock::{
+ CidMap, ConnMapKey, Error, Result, VhostUserVsockBackend, BACKEND_EVENT, SIBLING_VM_EVENT,
+ VSOCK_HOST_CID,
+ },
+ vsock_conn::*,
+};
+
+type ArcVhostBknd = Arc<VhostUserVsockBackend>;
+
+enum RxQueueType {
+ Standard,
+ RawPkts,
+}
+pub(crate) struct VhostUserVsockThread {
+ /// Guest memory map.
+ pub mem: Option<GuestMemoryAtomic<GuestMemoryMmap>>,
+ /// VIRTIO_RING_F_EVENT_IDX.
+ pub event_idx: bool,
+ /// Host socket raw file descriptor.
+ host_sock: RawFd,
+ /// Host socket path
+ host_sock_path: String,
+ /// Listener listening for new connections on the host.
+ host_listener: UnixListener,
+ /// Instance of VringWorker.
+ vring_worker: Option<Arc<VringEpollHandler<ArcVhostBknd, VringRwLock, ()>>>,
+ /// epoll fd to which new host connections are added.
+ epoll_file: File,
+ /// VsockThreadBackend instance.
+ pub thread_backend: VsockThreadBackend,
+ /// CID of the guest.
+ guest_cid: u64,
+ /// Thread pool to handle event idx.
+ pool: ThreadPool,
+ /// host side port on which application listens.
+ local_port: Wrapping<u32>,
+ /// The tx buffer size
+ tx_buffer_size: u32,
+ /// EventFd to notify this thread for custom events. Currently used to notify
+ /// this thread to process raw vsock packets sent from a sibling VM.
+ pub sibling_event_fd: EventFd,
+ /// Keeps track of which RX queue was processed first in the last iteration.
+ /// Used to alternate between the RX queues to prevent the starvation of one by the other.
+ last_processed: RxQueueType,
+}
+
+impl VhostUserVsockThread {
+ /// Create a new instance of VhostUserVsockThread.
+ pub fn new(
+ uds_path: String,
+ guest_cid: u64,
+ tx_buffer_size: u32,
+ cid_map: Arc<RwLock<CidMap>>,
+ ) -> Result<Self> {
+ // TODO: better error handling, maybe add a param to force the unlink
+ let _ = std::fs::remove_file(uds_path.clone());
+ let host_sock = UnixListener::bind(&uds_path)
+ .and_then(|sock| sock.set_nonblocking(true).map(|_| sock))
+ .map_err(Error::UnixBind)?;
+
+ let epoll_fd = epoll::create(true).map_err(Error::EpollFdCreate)?;
+ // SAFETY: Safe as the fd is guaranteed to be valid here.
+ let epoll_file = unsafe { File::from_raw_fd(epoll_fd) };
+
+ let host_raw_fd = host_sock.as_raw_fd();
+
+ let sibling_event_fd = EventFd::new(EFD_NONBLOCK).map_err(Error::EventFdCreate)?;
+
+ let thread_backend = VsockThreadBackend::new(
+ uds_path.clone(),
+ epoll_fd,
+ guest_cid,
+ tx_buffer_size,
+ cid_map.clone(),
+ );
+
+ cid_map.write().unwrap().insert(
+ guest_cid,
+ (
+ thread_backend.raw_pkts_queue.clone(),
+ sibling_event_fd.try_clone().unwrap(),
+ ),
+ );
+
+ let thread = VhostUserVsockThread {
+ mem: None,
+ event_idx: false,
+ host_sock: host_sock.as_raw_fd(),
+ host_sock_path: uds_path,
+ host_listener: host_sock,
+ vring_worker: None,
+ epoll_file,
+ thread_backend,
+ guest_cid,
+ pool: ThreadPoolBuilder::new()
+ .pool_size(1)
+ .create()
+ .map_err(Error::CreateThreadPool)?,
+ local_port: Wrapping(0),
+ tx_buffer_size,
+ sibling_event_fd,
+ last_processed: RxQueueType::Standard,
+ };
+
+ VhostUserVsockThread::epoll_register(epoll_fd, host_raw_fd, epoll::Events::EPOLLIN)?;
+
+ Ok(thread)
+ }
+
+ /// Register a file with an epoll to listen for events in evset.
+ pub fn epoll_register(epoll_fd: RawFd, fd: RawFd, evset: epoll::Events) -> Result<()> {
+ epoll::ctl(
+ epoll_fd,
+ epoll::ControlOptions::EPOLL_CTL_ADD,
+ fd,
+ epoll::Event::new(evset, fd as u64),
+ )
+ .map_err(Error::EpollAdd)?;
+
+ Ok(())
+ }
+
+ /// Remove a file from the epoll.
+ pub fn epoll_unregister(epoll_fd: RawFd, fd: RawFd) -> Result<()> {
+ epoll::ctl(
+ epoll_fd,
+ epoll::ControlOptions::EPOLL_CTL_DEL,
+ fd,
+ epoll::Event::new(epoll::Events::empty(), 0),
+ )
+ .map_err(Error::EpollRemove)?;
+
+ Ok(())
+ }
+
+ /// Modify the events we listen to for the fd in the epoll.
+ pub fn epoll_modify(epoll_fd: RawFd, fd: RawFd, evset: epoll::Events) -> Result<()> {
+ epoll::ctl(
+ epoll_fd,
+ epoll::ControlOptions::EPOLL_CTL_MOD,
+ fd,
+ epoll::Event::new(evset, fd as u64),
+ )
+ .map_err(Error::EpollModify)?;
+
+ Ok(())
+ }
+
+ /// Return raw file descriptor of the epoll file.
+ fn get_epoll_fd(&self) -> RawFd {
+ self.epoll_file.as_raw_fd()
+ }
+
+ /// Set self's VringWorker.
+ pub fn set_vring_worker(
+ &mut self,
+ vring_worker: Option<Arc<VringEpollHandler<ArcVhostBknd, VringRwLock, ()>>>,
+ ) {
+ self.vring_worker = vring_worker;
+ self.vring_worker
+ .as_ref()
+ .unwrap()
+ .register_listener(self.get_epoll_fd(), EventSet::IN, u64::from(BACKEND_EVENT))
+ .unwrap();
+ self.vring_worker
+ .as_ref()
+ .unwrap()
+ .register_listener(
+ self.sibling_event_fd.as_raw_fd(),
+ EventSet::IN,
+ u64::from(SIBLING_VM_EVENT),
+ )
+ .unwrap();
+ }
+
+ /// Process a BACKEND_EVENT received by VhostUserVsockBackend.
+ pub fn process_backend_evt(&mut self, _evset: EventSet) {
+ let mut epoll_events = vec![epoll::Event::new(epoll::Events::empty(), 0); 32];
+ 'epoll: loop {
+ match epoll::wait(self.epoll_file.as_raw_fd(), 0, epoll_events.as_mut_slice()) {
+ Ok(ev_cnt) => {
+ for evt in epoll_events.iter().take(ev_cnt) {
+ self.handle_event(
+ evt.data as RawFd,
+ epoll::Events::from_bits(evt.events).unwrap(),
+ );
+ }
+ }
+ Err(e) => {
+ if e.kind() == io::ErrorKind::Interrupted {
+ continue;
+ }
+ warn!("failed to consume new epoll event");
+ }
+ }
+ break 'epoll;
+ }
+ }
+
+ /// Handle a BACKEND_EVENT by either accepting a new connection or
+ /// forwarding a request to the appropriate connection object.
+ fn handle_event(&mut self, fd: RawFd, evset: epoll::Events) {
+ if fd == self.host_sock {
+ // This is a new connection initiated by an application running on the host
+ let conn = self.host_listener.accept().map_err(Error::UnixAccept);
+ if self.mem.is_some() {
+ conn.and_then(|(stream, _)| {
+ stream
+ .set_nonblocking(true)
+ .map(|_| stream)
+ .map_err(Error::UnixAccept)
+ })
+ .and_then(|stream| self.add_stream_listener(stream))
+ .unwrap_or_else(|err| {
+ warn!("Unable to accept new local connection: {:?}", err);
+ });
+ } else {
+ // If we aren't ready to process requests, accept and immediately close
+ // the connection.
+ conn.map(drop).unwrap_or_else(|err| {
+ warn!("Error closing an incoming connection: {:?}", err);
+ });
+ }
+ } else {
+ // Check if the stream represented by fd has already established a
+ // connection with the application running in the guest
+ if let std::collections::hash_map::Entry::Vacant(_) =
+ self.thread_backend.listener_map.entry(fd)
+ {
+ // New connection from the host
+ if evset.bits() != epoll::Events::EPOLLIN.bits() {
+ // Has to be EPOLLIN as it was not connected previously
+ return;
+ }
+ let mut unix_stream = match self.thread_backend.stream_map.remove(&fd) {
+ Some(uds) => uds,
+ None => {
+ warn!("Error while searching fd in the stream map");
+ return;
+ }
+ };
+
+ // Local peer is sending a "connect PORT\n" command
+ let peer_port = match Self::read_local_stream_port(&mut unix_stream) {
+ Ok(port) => port,
+ Err(err) => {
+ warn!("Error while parsing \"connect PORT\n\" command: {:?}", err);
+ return;
+ }
+ };
+
+ // Allocate a local port number
+ let local_port = match self.allocate_local_port() {
+ Ok(lp) => lp,
+ Err(err) => {
+ warn!("Error while allocating local port: {:?}", err);
+ return;
+ }
+ };
+
+ // Insert the fd into the backend's maps
+ self.thread_backend
+ .listener_map
+ .insert(fd, ConnMapKey::new(local_port, peer_port));
+
+ // Create a new connection object an enqueue a connection request
+ // packet to be sent to the guest
+ let conn_map_key = ConnMapKey::new(local_port, peer_port);
+ let mut new_conn = VsockConnection::new_local_init(
+ unix_stream,
+ VSOCK_HOST_CID,
+ local_port,
+ self.guest_cid,
+ peer_port,
+ self.get_epoll_fd(),
+ self.tx_buffer_size,
+ );
+ new_conn.rx_queue.enqueue(RxOps::Request);
+ new_conn.set_peer_port(peer_port);
+
+ // Add connection object into the backend's maps
+ self.thread_backend.conn_map.insert(conn_map_key, new_conn);
+
+ self.thread_backend
+ .backend_rxq
+ .push_back(ConnMapKey::new(local_port, peer_port));
+
+ // Re-register the fd to listen for EPOLLIN and EPOLLOUT events
+ Self::epoll_modify(
+ self.get_epoll_fd(),
+ fd,
+ epoll::Events::EPOLLIN | epoll::Events::EPOLLOUT,
+ )
+ .unwrap();
+ } else {
+ // Previously connected connection
+ let key = self.thread_backend.listener_map.get(&fd).unwrap();
+ let conn = self.thread_backend.conn_map.get_mut(key).unwrap();
+
+ if evset.bits() == epoll::Events::EPOLLOUT.bits() {
+ // Flush any remaining data from the tx buffer
+ match conn.tx_buf.flush_to(&mut conn.stream) {
+ Ok(cnt) => {
+ if cnt > 0 {
+ conn.fwd_cnt += Wrapping(cnt as u32);
+ conn.rx_queue.enqueue(RxOps::CreditUpdate);
+ }
+ self.thread_backend
+ .backend_rxq
+ .push_back(ConnMapKey::new(conn.local_port, conn.peer_port));
+ }
+ Err(e) => {
+ dbg!("Error: {:?}", e);
+ }
+ }
+ return;
+ }
+
+ // Unregister stream from the epoll, register when connection is
+ // established with the guest
+ Self::epoll_unregister(self.epoll_file.as_raw_fd(), fd).unwrap();
+
+ // Enqueue a read request
+ conn.rx_queue.enqueue(RxOps::Rw);
+ self.thread_backend
+ .backend_rxq
+ .push_back(ConnMapKey::new(conn.local_port, conn.peer_port));
+ }
+ }
+ }
+
+ /// Allocate a new local port number.
+ fn allocate_local_port(&mut self) -> Result<u32> {
+ // TODO: Improve space efficiency of this operation
+ // TODO: Reuse the conn_map HashMap
+ // TODO: Test this.
+ let mut alloc_local_port = self.local_port.0;
+ loop {
+ if !self
+ .thread_backend
+ .local_port_set
+ .contains(&alloc_local_port)
+ {
+ // The port set doesn't contain the newly allocated port number.
+ self.local_port = Wrapping(alloc_local_port + 1);
+ self.thread_backend.local_port_set.insert(alloc_local_port);
+ return Ok(alloc_local_port);
+ } else {
+ if alloc_local_port == self.local_port.0 {
+ // We have exhausted our search and wrapped back to the current port number
+ return Err(Error::NoFreeLocalPort);
+ }
+ alloc_local_port += 1;
+ }
+ }
+ }
+
+ /// Read `CONNECT PORT_NUM\n` from the connected stream.
+ fn read_local_stream_port(stream: &mut UnixStream) -> Result<u32> {
+ let mut buf = [0u8; 32];
+
+ // Minimum number of bytes we should be able to read
+ // Corresponds to 'CONNECT 0\n'
+ const MIN_READ_LEN: usize = 10;
+
+ // Read in the minimum number of bytes we can read
+ stream
+ .read_exact(&mut buf[..MIN_READ_LEN])
+ .map_err(Error::UnixRead)?;
+
+ let mut read_len = MIN_READ_LEN;
+ while buf[read_len - 1] != b'\n' && read_len < buf.len() {
+ stream
+ .read_exact(&mut buf[read_len..read_len + 1])
+ .map_err(Error::UnixRead)?;
+ read_len += 1;
+ }
+
+ let mut word_iter = std::str::from_utf8(&buf[..read_len])
+ .map_err(Error::ConvertFromUtf8)?
+ .split_whitespace();
+
+ word_iter
+ .next()
+ .ok_or(Error::InvalidPortRequest)
+ .and_then(|word| {
+ if word.to_lowercase() == "connect" {
+ Ok(())
+ } else {
+ Err(Error::InvalidPortRequest)
+ }
+ })
+ .and_then(|_| word_iter.next().ok_or(Error::InvalidPortRequest))
+ .and_then(|word| word.parse::<u32>().map_err(Error::ParseInteger))
+ .map_err(|e| Error::ReadStreamPort(Box::new(e)))
+ }
+
+ /// Add a stream to epoll to listen for EPOLLIN events.
+ fn add_stream_listener(&mut self, stream: UnixStream) -> Result<()> {
+ let stream_fd = stream.as_raw_fd();
+ self.thread_backend.stream_map.insert(stream_fd, stream);
+ VhostUserVsockThread::epoll_register(
+ self.get_epoll_fd(),
+ stream_fd,
+ epoll::Events::EPOLLIN,
+ )?;
+
+ Ok(())
+ }
+
+ /// Iterate over the rx queue and process rx requests.
+ fn process_rx_queue(
+ &mut self,
+ vring: &VringRwLock,
+ rx_queue_type: RxQueueType,
+ ) -> Result<bool> {
+ let mut used_any = false;
+ let atomic_mem = match &self.mem {
+ Some(m) => m,
+ None => return Err(Error::NoMemoryConfigured),
+ };
+
+ let mut vring_mut = vring.get_mut();
+
+ let queue = vring_mut.get_queue_mut();
+
+ while let Some(mut avail_desc) = queue
+ .iter(atomic_mem.memory())
+ .map_err(|_| Error::IterateQueue)?
+ .next()
+ {
+ used_any = true;
+ let mem = atomic_mem.clone().memory();
+
+ let head_idx = avail_desc.head_index();
+ let used_len = match VsockPacket::from_rx_virtq_chain(
+ mem.deref(),
+ &mut avail_desc,
+ self.tx_buffer_size,
+ ) {
+ Ok(mut pkt) => {
+ let recv_result = match rx_queue_type {
+ RxQueueType::Standard => self.thread_backend.recv_pkt(&mut pkt),
+ RxQueueType::RawPkts => self.thread_backend.recv_raw_pkt(&mut pkt),
+ };
+
+ if recv_result.is_ok() {
+ PKT_HEADER_SIZE + pkt.len() as usize
+ } else {
+ queue.iter(mem).unwrap().go_to_previous_position();
+ break;
+ }
+ }
+ Err(e) => {
+ warn!("vsock: RX queue error: {:?}", e);
+ 0
+ }
+ };
+
+ let vring = vring.clone();
+ let event_idx = self.event_idx;
+
+ self.pool.spawn_ok(async move {
+ // TODO: Understand why doing the following in the pool works
+ if event_idx {
+ if vring.add_used(head_idx, used_len as u32).is_err() {
+ warn!("Could not return used descriptors to ring");
+ }
+ match vring.needs_notification() {
+ Err(_) => {
+ warn!("Could not check if queue needs to be notified");
+ vring.signal_used_queue().unwrap();
+ }
+ Ok(needs_notification) => {
+ if needs_notification {
+ vring.signal_used_queue().unwrap();
+ }
+ }
+ }
+ } else {
+ if vring.add_used(head_idx, used_len as u32).is_err() {
+ warn!("Could not return used descriptors to ring");
+ }
+ vring.signal_used_queue().unwrap();
+ }
+ });
+
+ match rx_queue_type {
+ RxQueueType::Standard => {
+ if !self.thread_backend.pending_rx() {
+ break;
+ }
+ }
+ RxQueueType::RawPkts => {
+ if !self.thread_backend.pending_raw_pkts() {
+ break;
+ }
+ }
+ }
+ }
+ Ok(used_any)
+ }
+
+ /// Wrapper to process rx queue based on whether event idx is enabled or not.
+ fn process_unix_sockets(&mut self, vring: &VringRwLock, event_idx: bool) -> Result<bool> {
+ if event_idx {
+ // To properly handle EVENT_IDX we need to keep calling
+ // process_rx_queue until it stops finding new requests
+ // on the queue, as vm-virtio's Queue implementation
+ // only checks avail_index once
+ loop {
+ if !self.thread_backend.pending_rx() {
+ break;
+ }
+ vring.disable_notification().unwrap();
+
+ self.process_rx_queue(vring, RxQueueType::Standard)?;
+ if !vring.enable_notification().unwrap() {
+ break;
+ }
+ }
+ } else {
+ self.process_rx_queue(vring, RxQueueType::Standard)?;
+ }
+ Ok(false)
+ }
+
+ /// Wrapper to process raw vsock packets queue based on whether event idx is enabled or not.
+ pub fn process_raw_pkts(&mut self, vring: &VringRwLock, event_idx: bool) -> Result<bool> {
+ if event_idx {
+ loop {
+ if !self.thread_backend.pending_raw_pkts() {
+ break;
+ }
+ vring.disable_notification().unwrap();
+
+ self.process_rx_queue(vring, RxQueueType::RawPkts)?;
+ if !vring.enable_notification().unwrap() {
+ break;
+ }
+ }
+ } else {
+ self.process_rx_queue(vring, RxQueueType::RawPkts)?;
+ }
+ Ok(false)
+ }
+
+ pub fn process_rx(&mut self, vring: &VringRwLock, event_idx: bool) -> Result<bool> {
+ match self.last_processed {
+ RxQueueType::Standard => {
+ if self.thread_backend.pending_raw_pkts() {
+ self.process_raw_pkts(vring, event_idx)?;
+ self.last_processed = RxQueueType::RawPkts;
+ }
+ if self.thread_backend.pending_rx() {
+ self.process_unix_sockets(vring, event_idx)?;
+ }
+ }
+ RxQueueType::RawPkts => {
+ if self.thread_backend.pending_rx() {
+ self.process_unix_sockets(vring, event_idx)?;
+ self.last_processed = RxQueueType::Standard;
+ }
+ if self.thread_backend.pending_raw_pkts() {
+ self.process_raw_pkts(vring, event_idx)?;
+ }
+ }
+ }
+ Ok(false)
+ }
+
+ /// Process tx queue and send requests to the backend for processing.
+ fn process_tx_queue(&mut self, vring: &VringRwLock) -> Result<bool> {
+ let mut used_any = false;
+
+ let atomic_mem = match &self.mem {
+ Some(m) => m,
+ None => return Err(Error::NoMemoryConfigured),
+ };
+
+ while let Some(mut avail_desc) = vring
+ .get_mut()
+ .get_queue_mut()
+ .iter(atomic_mem.memory())
+ .map_err(|_| Error::IterateQueue)?
+ .next()
+ {
+ used_any = true;
+ let mem = atomic_mem.clone().memory();
+
+ let head_idx = avail_desc.head_index();
+ let pkt = match VsockPacket::from_tx_virtq_chain(
+ mem.deref(),
+ &mut avail_desc,
+ self.tx_buffer_size,
+ ) {
+ Ok(pkt) => pkt,
+ Err(e) => {
+ dbg!("vsock: error reading TX packet: {:?}", e);
+ continue;
+ }
+ };
+
+ if self.thread_backend.send_pkt(&pkt).is_err() {
+ vring
+ .get_mut()
+ .get_queue_mut()
+ .iter(mem)
+ .unwrap()
+ .go_to_previous_position();
+ break;
+ }
+
+ // TODO: Check if the protocol requires read length to be correct
+ let used_len = 0;
+
+ let vring = vring.clone();
+ let event_idx = self.event_idx;
+
+ self.pool.spawn_ok(async move {
+ if event_idx {
+ if vring.add_used(head_idx, used_len as u32).is_err() {
+ warn!("Could not return used descriptors to ring");
+ }
+ match vring.needs_notification() {
+ Err(_) => {
+ warn!("Could not check if queue needs to be notified");
+ vring.signal_used_queue().unwrap();
+ }
+ Ok(needs_notification) => {
+ if needs_notification {
+ vring.signal_used_queue().unwrap();
+ }
+ }
+ }
+ } else {
+ if vring.add_used(head_idx, used_len as u32).is_err() {
+ warn!("Could not return used descriptors to ring");
+ }
+ vring.signal_used_queue().unwrap();
+ }
+ });
+ }
+
+ Ok(used_any)
+ }
+
+ /// Wrapper to process tx queue based on whether event idx is enabled or not.
+ pub fn process_tx(&mut self, vring_lock: &VringRwLock, event_idx: bool) -> Result<bool> {
+ if event_idx {
+ // To properly handle EVENT_IDX we need to keep calling
+ // process_rx_queue until it stops finding new requests
+ // on the queue, as vm-virtio's Queue implementation
+ // only checks avail_index once
+ loop {
+ vring_lock.disable_notification().unwrap();
+ self.process_tx_queue(vring_lock)?;
+ if !vring_lock.enable_notification().unwrap() {
+ break;
+ }
+ }
+ } else {
+ self.process_tx_queue(vring_lock)?;
+ }
+ Ok(false)
+ }
+}
+
+impl Drop for VhostUserVsockThread {
+ fn drop(&mut self) {
+ let _ = std::fs::remove_file(&self.host_sock_path);
+ self.thread_backend
+ .cid_map
+ .write()
+ .unwrap()
+ .remove(&self.guest_cid);
+ }
+}
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use std::collections::HashMap;
+ use tempfile::tempdir;
+ use vm_memory::GuestAddress;
+ use vmm_sys_util::eventfd::EventFd;
+
+ const CONN_TX_BUF_SIZE: u32 = 64 * 1024;
+
+ impl VhostUserVsockThread {
+ fn get_epoll_file(&self) -> &File {
+ &self.epoll_file
+ }
+ }
+
+ #[test]
+ fn test_vsock_thread() {
+ let cid_map: Arc<RwLock<CidMap>> = Arc::new(RwLock::new(HashMap::new()));
+
+ let test_dir = tempdir().expect("Could not create a temp test directory.");
+
+ let t = VhostUserVsockThread::new(
+ test_dir
+ .path()
+ .join("test_vsock_thread.vsock")
+ .display()
+ .to_string(),
+ 3,
+ CONN_TX_BUF_SIZE,
+ cid_map,
+ );
+ assert!(t.is_ok());
+
+ let mut t = t.unwrap();
+ let epoll_fd = t.get_epoll_file().as_raw_fd();
+
+ let mem = GuestMemoryAtomic::new(
+ GuestMemoryMmap::<()>::from_ranges(&[(GuestAddress(0), 0x10000)]).unwrap(),
+ );
+
+ t.mem = Some(mem.clone());
+
+ let dummy_fd = EventFd::new(0).unwrap();
+
+ assert!(VhostUserVsockThread::epoll_register(
+ epoll_fd,
+ dummy_fd.as_raw_fd(),
+ epoll::Events::EPOLLOUT
+ )
+ .is_ok());
+ assert!(VhostUserVsockThread::epoll_modify(
+ epoll_fd,
+ dummy_fd.as_raw_fd(),
+ epoll::Events::EPOLLIN
+ )
+ .is_ok());
+ assert!(VhostUserVsockThread::epoll_unregister(epoll_fd, dummy_fd.as_raw_fd()).is_ok());
+ assert!(VhostUserVsockThread::epoll_register(
+ epoll_fd,
+ dummy_fd.as_raw_fd(),
+ epoll::Events::EPOLLIN
+ )
+ .is_ok());
+
+ let vring = VringRwLock::new(mem, 0x1000).unwrap();
+ vring.set_queue_info(0x100, 0x200, 0x300).unwrap();
+ vring.set_queue_ready(true);
+
+ assert!(t.process_tx(&vring, false).is_ok());
+ assert!(t.process_tx(&vring, true).is_ok());
+ // add backend_rxq to avoid that RX processing is skipped
+ t.thread_backend
+ .backend_rxq
+ .push_back(ConnMapKey::new(0, 0));
+ assert!(t.process_rx(&vring, false).is_ok());
+ assert!(t.process_rx(&vring, true).is_ok());
+
+ dummy_fd.write(1).unwrap();
+
+ t.process_backend_evt(EventSet::empty());
+
+ test_dir.close().unwrap();
+ }
+
+ #[test]
+ fn test_vsock_thread_failures() {
+ let cid_map: Arc<RwLock<CidMap>> = Arc::new(RwLock::new(HashMap::new()));
+
+ let test_dir = tempdir().expect("Could not create a temp test directory.");
+
+ let t = VhostUserVsockThread::new(
+ "/sys/not_allowed.vsock".to_string(),
+ 3,
+ CONN_TX_BUF_SIZE,
+ cid_map.clone(),
+ );
+ assert!(t.is_err());
+
+ let vsock_socket_path = test_dir
+ .path()
+ .join("test_vsock_thread_failures.vsock")
+ .display()
+ .to_string();
+ let mut t =
+ VhostUserVsockThread::new(vsock_socket_path, 3, CONN_TX_BUF_SIZE, cid_map).unwrap();
+ assert!(VhostUserVsockThread::epoll_register(-1, -1, epoll::Events::EPOLLIN).is_err());
+ assert!(VhostUserVsockThread::epoll_modify(-1, -1, epoll::Events::EPOLLIN).is_err());
+ assert!(VhostUserVsockThread::epoll_unregister(-1, -1).is_err());
+
+ let mem = GuestMemoryAtomic::new(
+ GuestMemoryMmap::<()>::from_ranges(&[(GuestAddress(0), 0x10000)]).unwrap(),
+ );
+
+ let vring = VringRwLock::new(mem, 0x1000).unwrap();
+
+ // memory is not configured, so processing TX should fail
+ assert!(t.process_tx(&vring, false).is_err());
+ assert!(t.process_tx(&vring, true).is_err());
+
+ // add backend_rxq to avoid that RX processing is skipped
+ t.thread_backend
+ .backend_rxq
+ .push_back(ConnMapKey::new(0, 0));
+ assert!(t.process_rx(&vring, false).is_err());
+ assert!(t.process_rx(&vring, true).is_err());
+
+ test_dir.close().unwrap();
+ }
+}
diff --git a/src/vsock_conn.rs b/src/vsock_conn.rs
new file mode 100644
index 0000000..058c2e1
--- /dev/null
+++ b/src/vsock_conn.rs
@@ -0,0 +1,781 @@
+// SPDX-License-Identifier: Apache-2.0 or BSD-3-Clause
+
+use std::{
+ io::{ErrorKind, Read, Write},
+ num::Wrapping,
+ os::unix::prelude::{AsRawFd, RawFd},
+};
+
+use log::info;
+use virtio_vsock::packet::{VsockPacket, PKT_HEADER_SIZE};
+use vm_memory::{bitmap::BitmapSlice, Bytes, VolatileSlice};
+
+use crate::{
+ rxops::*,
+ rxqueue::*,
+ txbuf::*,
+ vhu_vsock::{
+ Error, Result, VSOCK_FLAGS_SHUTDOWN_RCV, VSOCK_FLAGS_SHUTDOWN_SEND,
+ VSOCK_OP_CREDIT_REQUEST, VSOCK_OP_CREDIT_UPDATE, VSOCK_OP_REQUEST, VSOCK_OP_RESPONSE,
+ VSOCK_OP_RST, VSOCK_OP_RW, VSOCK_OP_SHUTDOWN, VSOCK_TYPE_STREAM,
+ },
+ vhu_vsock_thread::VhostUserVsockThread,
+};
+
+#[derive(Debug)]
+pub(crate) struct VsockConnection<S> {
+ /// Host-side stream corresponding to this vsock connection.
+ pub stream: S,
+ /// Specifies if the stream is connected to a listener on the host.
+ pub connect: bool,
+ /// Port at which a guest application is listening to.
+ pub peer_port: u32,
+ /// Queue holding pending rx operations per connection.
+ pub rx_queue: RxQueue,
+ /// CID of the host.
+ local_cid: u64,
+ /// Port on the host at which a host-side application listens to.
+ pub local_port: u32,
+ /// CID of the guest.
+ pub guest_cid: u64,
+ /// Total number of bytes written to stream from tx buffer.
+ pub fwd_cnt: Wrapping<u32>,
+ /// Total number of bytes previously forwarded to stream.
+ last_fwd_cnt: Wrapping<u32>,
+ /// Size of buffer the guest has allocated for this connection.
+ peer_buf_alloc: u32,
+ /// Number of bytes the peer has forwarded to a connection.
+ peer_fwd_cnt: Wrapping<u32>,
+ /// The total number of bytes sent to the guest vsock driver.
+ rx_cnt: Wrapping<u32>,
+ /// epoll fd to which this connection's stream has to be added.
+ pub epoll_fd: RawFd,
+ /// Local tx buffer.
+ pub tx_buf: LocalTxBuf,
+ /// Local tx buffer size
+ tx_buffer_size: u32,
+}
+
+impl<S: AsRawFd + Read + Write> VsockConnection<S> {
+ /// Create a new vsock connection object for locally i.e host-side
+ /// inititated connections.
+ pub fn new_local_init(
+ stream: S,
+ local_cid: u64,
+ local_port: u32,
+ guest_cid: u64,
+ guest_port: u32,
+ epoll_fd: RawFd,
+ tx_buffer_size: u32,
+ ) -> Self {
+ Self {
+ stream,
+ connect: false,
+ peer_port: guest_port,
+ rx_queue: RxQueue::new(),
+ local_cid,
+ local_port,
+ guest_cid,
+ fwd_cnt: Wrapping(0),
+ last_fwd_cnt: Wrapping(0),
+ peer_buf_alloc: 0,
+ peer_fwd_cnt: Wrapping(0),
+ rx_cnt: Wrapping(0),
+ epoll_fd,
+ tx_buf: LocalTxBuf::new(tx_buffer_size),
+ tx_buffer_size,
+ }
+ }
+
+ /// Create a new vsock connection object for connections initiated by
+ /// an application running in the guest.
+ #[allow(clippy::too_many_arguments)]
+ pub fn new_peer_init(
+ stream: S,
+ local_cid: u64,
+ local_port: u32,
+ guest_cid: u64,
+ guest_port: u32,
+ epoll_fd: RawFd,
+ peer_buf_alloc: u32,
+ tx_buffer_size: u32,
+ ) -> Self {
+ let mut rx_queue = RxQueue::new();
+ rx_queue.enqueue(RxOps::Response);
+ Self {
+ stream,
+ connect: false,
+ peer_port: guest_port,
+ rx_queue,
+ local_cid,
+ local_port,
+ guest_cid,
+ fwd_cnt: Wrapping(0),
+ last_fwd_cnt: Wrapping(0),
+ peer_buf_alloc,
+ peer_fwd_cnt: Wrapping(0),
+ rx_cnt: Wrapping(0),
+ epoll_fd,
+ tx_buf: LocalTxBuf::new(tx_buffer_size),
+ tx_buffer_size,
+ }
+ }
+
+ /// Set the peer port to the guest side application's port.
+ pub fn set_peer_port(&mut self, peer_port: u32) {
+ self.peer_port = peer_port;
+ }
+
+ /// Process a vsock packet that is meant for this connection.
+ /// Forward data to the host-side application if the vsock packet
+ /// contains a RW operation.
+ pub fn recv_pkt<B: BitmapSlice>(&mut self, pkt: &mut VsockPacket<B>) -> Result<()> {
+ // Initialize all fields in the packet header
+ self.init_pkt(pkt);
+
+ match self.rx_queue.dequeue() {
+ Some(RxOps::Request) => {
+ // Send a connection request to the guest-side application
+ pkt.set_op(VSOCK_OP_REQUEST);
+ Ok(())
+ }
+ Some(RxOps::Rw) => {
+ if !self.connect {
+ // There is no host-side application listening for this
+ // packet, hence send back an RST.
+ pkt.set_op(VSOCK_OP_RST);
+ return Ok(());
+ }
+
+ // Check if peer has space for receiving data
+ if self.need_credit_update_from_peer() {
+ self.last_fwd_cnt = self.fwd_cnt;
+ pkt.set_op(VSOCK_OP_CREDIT_REQUEST);
+ return Ok(());
+ }
+ let buf = pkt.data_slice().ok_or(Error::PktBufMissing)?;
+
+ // Perform a credit check to find the maximum read size. The read
+ // data must fit inside a packet buffer and be within peer's
+ // available buffer space
+ let max_read_len = std::cmp::min(buf.len(), self.peer_avail_credit());
+
+ // Read data from the stream directly into the buffer
+ if let Ok(read_cnt) = buf.read_from(0, &mut self.stream, max_read_len) {
+ if read_cnt == 0 {
+ // If no data was read then the stream was closed down unexpectedly.
+ // Send a shutdown packet to the guest-side application.
+ pkt.set_op(VSOCK_OP_SHUTDOWN)
+ .set_flag(VSOCK_FLAGS_SHUTDOWN_RCV)
+ .set_flag(VSOCK_FLAGS_SHUTDOWN_SEND);
+ } else {
+ // If data was read, then set the length field in the packet header
+ // to the amount of data that was read.
+ pkt.set_op(VSOCK_OP_RW).set_len(read_cnt as u32);
+
+ // Re-register the stream file descriptor for read and write events
+ VhostUserVsockThread::epoll_register(
+ self.epoll_fd,
+ self.stream.as_raw_fd(),
+ epoll::Events::EPOLLIN | epoll::Events::EPOLLOUT,
+ )?;
+ }
+
+ // Update the rx_cnt with the amount of data in the vsock packet.
+ self.rx_cnt += Wrapping(pkt.len());
+ self.last_fwd_cnt = self.fwd_cnt;
+ }
+ Ok(())
+ }
+ Some(RxOps::Response) => {
+ // A response has been received to a newly initiated host-side connection
+ self.connect = true;
+ pkt.set_op(VSOCK_OP_RESPONSE);
+ Ok(())
+ }
+ Some(RxOps::CreditUpdate) => {
+ // Request credit update from the guest.
+ if !self.rx_queue.pending_rx() {
+ // Waste an rx buffer if no rx is pending
+ pkt.set_op(VSOCK_OP_CREDIT_UPDATE);
+ self.last_fwd_cnt = self.fwd_cnt;
+ }
+ Ok(())
+ }
+ _ => Err(Error::NoRequestRx),
+ }
+ }
+
+ /// Deliver a guest generated packet to this connection.
+ ///
+ /// Returns:
+ /// - always `Ok(())` to indicate that the packet has been consumed
+ pub fn send_pkt<B: BitmapSlice>(&mut self, pkt: &VsockPacket<B>) -> Result<()> {
+ // Update peer credit information
+ self.peer_buf_alloc = pkt.buf_alloc();
+ self.peer_fwd_cnt = Wrapping(pkt.fwd_cnt());
+
+ match pkt.op() {
+ VSOCK_OP_RESPONSE => {
+ // Confirmation for a host initiated connection
+ // TODO: Handle stream write error in a better manner
+ let response = format!("OK {}\n", self.peer_port);
+ self.stream.write_all(response.as_bytes()).unwrap();
+ self.connect = true;
+ }
+ VSOCK_OP_RW => {
+ // Data has to be written to the host-side stream
+ match pkt.data_slice() {
+ None => {
+ info!(
+ "Dropping empty packet from guest (lp={}, pp={})",
+ self.local_port, self.peer_port
+ );
+ return Ok(());
+ }
+ Some(buf) => {
+ if let Err(err) = self.send_bytes(buf) {
+ // TODO: Terminate this connection
+ dbg!("err:{:?}", err);
+ return Ok(());
+ }
+ }
+ }
+ }
+ VSOCK_OP_CREDIT_UPDATE => {
+ // Already updated the credit
+
+ // Re-register the stream file descriptor for read and write events
+ if VhostUserVsockThread::epoll_modify(
+ self.epoll_fd,
+ self.stream.as_raw_fd(),
+ epoll::Events::EPOLLIN | epoll::Events::EPOLLOUT,
+ )
+ .is_err()
+ {
+ VhostUserVsockThread::epoll_register(
+ self.epoll_fd,
+ self.stream.as_raw_fd(),
+ epoll::Events::EPOLLIN | epoll::Events::EPOLLOUT,
+ )
+ .unwrap();
+ };
+ }
+ VSOCK_OP_CREDIT_REQUEST => {
+ // Send back this connection's credit information
+ self.rx_queue.enqueue(RxOps::CreditUpdate);
+ }
+ VSOCK_OP_SHUTDOWN => {
+ // Shutdown this connection
+ let recv_off = pkt.flags() & VSOCK_FLAGS_SHUTDOWN_RCV != 0;
+ let send_off = pkt.flags() & VSOCK_FLAGS_SHUTDOWN_SEND != 0;
+
+ if recv_off && send_off && self.tx_buf.is_empty() {
+ self.rx_queue.enqueue(RxOps::Reset);
+ }
+ }
+ _ => {}
+ }
+
+ Ok(())
+ }
+
+ /// Write data to the host-side stream.
+ ///
+ /// Returns:
+ /// - Ok(cnt) where cnt is the number of bytes written to the stream
+ /// - Err(Error::UnixWrite) if there was an error writing to the stream
+ fn send_bytes<B: BitmapSlice>(&mut self, buf: &VolatileSlice<B>) -> Result<()> {
+ if !self.tx_buf.is_empty() {
+ // Data is already present in the buffer and the backend
+ // is waiting for a EPOLLOUT event to flush it
+ return self.tx_buf.push(buf);
+ }
+
+ // Write data to the stream
+ let written_count = match buf.write_to(0, &mut self.stream, buf.len()) {
+ Ok(cnt) => cnt,
+ Err(vm_memory::VolatileMemoryError::IOError(e)) => {
+ if e.kind() == ErrorKind::WouldBlock {
+ 0
+ } else {
+ dbg!("send_bytes error: {:?}", e);
+ return Err(Error::UnixWrite);
+ }
+ }
+ Err(e) => {
+ dbg!("send_bytes error: {:?}", e);
+ return Err(Error::UnixWrite);
+ }
+ };
+
+ if written_count > 0 {
+ // Increment forwarded count by number of bytes written to the stream
+ self.fwd_cnt += Wrapping(written_count as u32);
+
+ // At what point in available credits should we send a credit update.
+ // This is set to 1/4th of the tx buffer size. If we keep it too low,
+ // we will end up sending too many credit updates. If we keep it too
+ // high, we will end up sending too few credit updates and cause stalls.
+ // Stalls are more bad than too many credit updates.
+ let free_space = self
+ .tx_buffer_size
+ .wrapping_sub((self.fwd_cnt - self.last_fwd_cnt).0);
+ if free_space < self.tx_buffer_size / 4 {
+ self.rx_queue.enqueue(RxOps::CreditUpdate);
+ }
+ }
+
+ if written_count != buf.len() {
+ return self.tx_buf.push(&buf.offset(written_count).unwrap());
+ }
+
+ Ok(())
+ }
+
+ /// Initialize all header fields in the vsock packet.
+ fn init_pkt<'a, 'b, B: BitmapSlice>(
+ &self,
+ pkt: &'a mut VsockPacket<'b, B>,
+ ) -> &'a mut VsockPacket<'b, B> {
+ // Zero out the packet header
+ pkt.set_header_from_raw(&[0u8; PKT_HEADER_SIZE]).unwrap();
+
+ pkt.set_src_cid(self.local_cid)
+ .set_dst_cid(self.guest_cid)
+ .set_src_port(self.local_port)
+ .set_dst_port(self.peer_port)
+ .set_type(VSOCK_TYPE_STREAM)
+ .set_buf_alloc(self.tx_buffer_size)
+ .set_fwd_cnt(self.fwd_cnt.0)
+ }
+
+ /// Get max number of bytes we can send to peer without overflowing
+ /// the peer's buffer.
+ fn peer_avail_credit(&self) -> usize {
+ (Wrapping(self.peer_buf_alloc) - (self.rx_cnt - self.peer_fwd_cnt)).0 as usize
+ }
+
+ /// Check if we need a credit update from the peer before sending
+ /// more data to it.
+ fn need_credit_update_from_peer(&self) -> bool {
+ self.peer_avail_credit() == 0
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use byteorder::{ByteOrder, LittleEndian};
+
+ use super::*;
+ use crate::vhu_vsock::{VSOCK_HOST_CID, VSOCK_OP_RW, VSOCK_TYPE_STREAM};
+ use std::io::Result as IoResult;
+ use std::ops::Deref;
+ use virtio_bindings::bindings::virtio_ring::{VRING_DESC_F_NEXT, VRING_DESC_F_WRITE};
+ use virtio_queue::{mock::MockSplitQueue, Descriptor, DescriptorChain, Queue, QueueOwnedT};
+ use vm_memory::{
+ Address, Bytes, GuestAddress, GuestAddressSpace, GuestMemoryAtomic, GuestMemoryLoadGuard,
+ GuestMemoryMmap,
+ };
+
+ const CONN_TX_BUF_SIZE: u32 = 64 * 1024;
+
+ struct HeadParams {
+ head_len: usize,
+ data_len: u32,
+ }
+
+ impl HeadParams {
+ fn new(head_len: usize, data_len: u32) -> Self {
+ Self { head_len, data_len }
+ }
+ fn construct_head(&self) -> Vec<u8> {
+ let mut header = vec![0_u8; self.head_len];
+ if self.head_len == PKT_HEADER_SIZE {
+ // Offset into the header for data length
+ const HDROFF_LEN: usize = 24;
+ LittleEndian::write_u32(&mut header[HDROFF_LEN..], self.data_len);
+ }
+ header
+ }
+ }
+
+ fn prepare_desc_chain_vsock(
+ write_only: bool,
+ head_params: &HeadParams,
+ data_chain_len: u16,
+ head_data_len: u32,
+ ) -> (
+ GuestMemoryAtomic<GuestMemoryMmap>,
+ DescriptorChain<GuestMemoryLoadGuard<GuestMemoryMmap>>,
+ ) {
+ let mem = GuestMemoryMmap::<()>::from_ranges(&[(GuestAddress(0), 0x1000)]).unwrap();
+ let virt_queue = MockSplitQueue::new(&mem, 16);
+ let mut next_addr = virt_queue.desc_table().total_size() + 0x100;
+ let mut flags = 0;
+
+ if write_only {
+ flags |= VRING_DESC_F_WRITE;
+ }
+
+ let mut head_flags = if data_chain_len > 0 {
+ flags | VRING_DESC_F_NEXT
+ } else {
+ flags
+ };
+
+ // vsock packet header
+ // let header = vec![0 as u8; head_params.head_len];
+ let header = head_params.construct_head();
+ let head_desc =
+ Descriptor::new(next_addr, head_params.head_len as u32, head_flags as u16, 1);
+ mem.write(&header, head_desc.addr()).unwrap();
+ assert!(virt_queue.desc_table().store(0, head_desc).is_ok());
+ next_addr += head_params.head_len as u64;
+
+ // Put the descriptor index 0 in the first available ring position.
+ mem.write_obj(0u16, virt_queue.avail_addr().unchecked_add(4))
+ .unwrap();
+
+ // Set `avail_idx` to 1.
+ mem.write_obj(1u16, virt_queue.avail_addr().unchecked_add(2))
+ .unwrap();
+
+ // chain len excludes the head
+ for i in 0..(data_chain_len) {
+ // last descr in chain
+ if i == data_chain_len - 1 {
+ head_flags &= !VRING_DESC_F_NEXT;
+ }
+ // vsock data
+ let data = vec![0_u8; head_data_len as usize];
+ let data_desc = Descriptor::new(next_addr, data.len() as u32, head_flags as u16, i + 2);
+ mem.write(&data, data_desc.addr()).unwrap();
+ assert!(virt_queue.desc_table().store(i + 1, data_desc).is_ok());
+ next_addr += head_data_len as u64;
+ }
+
+ // Create descriptor chain from pre-filled memory
+ (
+ GuestMemoryAtomic::new(mem.clone()),
+ virt_queue
+ .create_queue::<Queue>()
+ .unwrap()
+ .iter(GuestMemoryAtomic::new(mem.clone()).memory())
+ .unwrap()
+ .next()
+ .unwrap(),
+ )
+ }
+
+ struct VsockDummySocket {
+ data: Vec<u8>,
+ }
+
+ impl VsockDummySocket {
+ fn new() -> Self {
+ Self { data: Vec::new() }
+ }
+ }
+
+ impl Write for VsockDummySocket {
+ fn write(&mut self, buf: &[u8]) -> std::result::Result<usize, std::io::Error> {
+ self.data.clear();
+ self.data.extend_from_slice(buf);
+
+ Ok(buf.len())
+ }
+ fn flush(&mut self) -> IoResult<()> {
+ Ok(())
+ }
+ }
+
+ impl Read for VsockDummySocket {
+ fn read(&mut self, buf: &mut [u8]) -> IoResult<usize> {
+ buf[..self.data.len()].copy_from_slice(&self.data);
+ Ok(self.data.len())
+ }
+ }
+
+ impl AsRawFd for VsockDummySocket {
+ fn as_raw_fd(&self) -> RawFd {
+ -1
+ }
+ }
+
+ #[test]
+ fn test_vsock_conn_init() {
+ // new locally inititated connection
+ let dummy_file = VsockDummySocket::new();
+ let mut conn_local = VsockConnection::new_local_init(
+ dummy_file,
+ VSOCK_HOST_CID,
+ 5000,
+ 3,
+ 5001,
+ -1,
+ CONN_TX_BUF_SIZE,
+ );
+
+ assert!(!conn_local.connect);
+ assert_eq!(conn_local.peer_port, 5001);
+ assert_eq!(conn_local.rx_queue, RxQueue::new());
+ assert_eq!(conn_local.local_cid, VSOCK_HOST_CID);
+ assert_eq!(conn_local.local_port, 5000);
+ assert_eq!(conn_local.guest_cid, 3);
+
+ // set peer port
+ conn_local.set_peer_port(5002);
+ assert_eq!(conn_local.peer_port, 5002);
+
+ // New connection initiated by the peer/guest
+ let dummy_file = VsockDummySocket::new();
+ let mut conn_peer = VsockConnection::new_peer_init(
+ dummy_file,
+ VSOCK_HOST_CID,
+ 5000,
+ 3,
+ 5001,
+ -1,
+ 65536,
+ CONN_TX_BUF_SIZE,
+ );
+
+ assert!(!conn_peer.connect);
+ assert_eq!(conn_peer.peer_port, 5001);
+ assert_eq!(conn_peer.rx_queue.dequeue().unwrap(), RxOps::Response);
+ assert!(!conn_peer.rx_queue.pending_rx());
+ assert_eq!(conn_peer.local_cid, VSOCK_HOST_CID);
+ assert_eq!(conn_peer.local_port, 5000);
+ assert_eq!(conn_peer.guest_cid, 3);
+ assert_eq!(conn_peer.peer_buf_alloc, 65536);
+ }
+
+ #[test]
+ fn test_vsock_conn_credit() {
+ // new locally inititated connection
+ let dummy_file = VsockDummySocket::new();
+ let mut conn_local = VsockConnection::new_local_init(
+ dummy_file,
+ VSOCK_HOST_CID,
+ 5000,
+ 3,
+ 5001,
+ -1,
+ CONN_TX_BUF_SIZE,
+ );
+
+ assert_eq!(conn_local.peer_avail_credit(), 0);
+ assert!(conn_local.need_credit_update_from_peer());
+
+ conn_local.peer_buf_alloc = 65536;
+ assert_eq!(conn_local.peer_avail_credit(), 65536);
+ assert!(!conn_local.need_credit_update_from_peer());
+
+ conn_local.rx_cnt = Wrapping(32768);
+ assert_eq!(conn_local.peer_avail_credit(), 32768);
+ assert!(!conn_local.need_credit_update_from_peer());
+
+ conn_local.rx_cnt = Wrapping(65536);
+ assert_eq!(conn_local.peer_avail_credit(), 0);
+ assert!(conn_local.need_credit_update_from_peer());
+ }
+
+ #[test]
+ fn test_vsock_conn_init_pkt() {
+ // parameters for packet head construction
+ let head_params = HeadParams::new(PKT_HEADER_SIZE, 10);
+
+ // new locally inititated connection
+ let dummy_file = VsockDummySocket::new();
+ let conn_local = VsockConnection::new_local_init(
+ dummy_file,
+ VSOCK_HOST_CID,
+ 5000,
+ 3,
+ 5001,
+ -1,
+ CONN_TX_BUF_SIZE,
+ );
+
+ // write only descriptor chain
+ let (mem, mut descr_chain) = prepare_desc_chain_vsock(true, &head_params, 2, 10);
+ let mem = mem.memory();
+ let mut pkt =
+ VsockPacket::from_rx_virtq_chain(mem.deref(), &mut descr_chain, CONN_TX_BUF_SIZE)
+ .unwrap();
+
+ // initialize a vsock packet for the guest
+ conn_local.init_pkt(&mut pkt);
+
+ assert_eq!(pkt.src_cid(), VSOCK_HOST_CID);
+ assert_eq!(pkt.dst_cid(), 3);
+ assert_eq!(pkt.src_port(), 5000);
+ assert_eq!(pkt.dst_port(), 5001);
+ assert_eq!(pkt.type_(), VSOCK_TYPE_STREAM);
+ assert_eq!(pkt.buf_alloc(), CONN_TX_BUF_SIZE);
+ assert_eq!(pkt.fwd_cnt(), 0);
+ }
+
+ #[test]
+ fn test_vsock_conn_recv_pkt() {
+ // parameters for packet head construction
+ let head_params = HeadParams::new(PKT_HEADER_SIZE, 5);
+
+ // new locally inititated connection
+ let dummy_file = VsockDummySocket::new();
+ let mut conn_local = VsockConnection::new_local_init(
+ dummy_file,
+ VSOCK_HOST_CID,
+ 5000,
+ 3,
+ 5001,
+ -1,
+ CONN_TX_BUF_SIZE,
+ );
+
+ // write only descriptor chain
+ let (mem, mut descr_chain) = prepare_desc_chain_vsock(true, &head_params, 1, 5);
+ let mem = mem.memory();
+ let mut pkt =
+ VsockPacket::from_rx_virtq_chain(mem.deref(), &mut descr_chain, CONN_TX_BUF_SIZE)
+ .unwrap();
+
+ // VSOCK_OP_REQUEST: new local conn request
+ conn_local.rx_queue.enqueue(RxOps::Request);
+ let op_req = conn_local.recv_pkt(&mut pkt);
+ assert!(op_req.is_ok());
+ assert!(!conn_local.rx_queue.pending_rx());
+ assert_eq!(pkt.op(), VSOCK_OP_REQUEST);
+
+ // VSOCK_OP_RST: reset if connection not established
+ conn_local.rx_queue.enqueue(RxOps::Rw);
+ let op_rst = conn_local.recv_pkt(&mut pkt);
+ assert!(op_rst.is_ok());
+ assert!(!conn_local.rx_queue.pending_rx());
+ assert_eq!(pkt.op(), VSOCK_OP_RST);
+
+ // VSOCK_OP_CREDIT_UPDATE: need credit update from peer/guest
+ conn_local.connect = true;
+ conn_local.rx_queue.enqueue(RxOps::Rw);
+ conn_local.fwd_cnt = Wrapping(1024);
+ let op_credit_update = conn_local.recv_pkt(&mut pkt);
+ assert!(op_credit_update.is_ok());
+ assert!(!conn_local.rx_queue.pending_rx());
+ assert_eq!(pkt.op(), VSOCK_OP_CREDIT_REQUEST);
+ assert_eq!(conn_local.last_fwd_cnt, Wrapping(1024));
+
+ // VSOCK_OP_SHUTDOWN: zero data read from stream/file
+ conn_local.peer_buf_alloc = 65536;
+ conn_local.rx_queue.enqueue(RxOps::Rw);
+ let op_zero_read_shutdown = conn_local.recv_pkt(&mut pkt);
+ assert!(op_zero_read_shutdown.is_ok());
+ assert!(!conn_local.rx_queue.pending_rx());
+ assert_eq!(conn_local.rx_cnt, Wrapping(0));
+ assert_eq!(conn_local.last_fwd_cnt, Wrapping(1024));
+ assert_eq!(pkt.op(), VSOCK_OP_SHUTDOWN);
+ assert_eq!(
+ pkt.flags(),
+ VSOCK_FLAGS_SHUTDOWN_RCV | VSOCK_FLAGS_SHUTDOWN_SEND
+ );
+
+ // VSOCK_OP_RW: finite data read from stream/file
+ conn_local.stream.write_all(b"hello").unwrap();
+ conn_local.rx_queue.enqueue(RxOps::Rw);
+ let op_zero_read = conn_local.recv_pkt(&mut pkt);
+ // below error due to epoll add
+ assert!(op_zero_read.is_err());
+ assert_eq!(pkt.op(), VSOCK_OP_RW);
+ assert!(!conn_local.rx_queue.pending_rx());
+ assert_eq!(pkt.len(), 5);
+ let buf = &mut [0u8; 5];
+ assert!(pkt.data_slice().unwrap().read_slice(buf, 0).is_ok());
+ assert_eq!(buf, b"hello");
+
+ // VSOCK_OP_RESPONSE: response from a locally initiated connection
+ conn_local.rx_queue.enqueue(RxOps::Response);
+ let op_response = conn_local.recv_pkt(&mut pkt);
+ assert!(op_response.is_ok());
+ assert!(!conn_local.rx_queue.pending_rx());
+ assert_eq!(pkt.op(), VSOCK_OP_RESPONSE);
+ assert!(conn_local.connect);
+
+ // VSOCK_OP_CREDIT_UPDATE: guest needs credit update
+ conn_local.rx_queue.enqueue(RxOps::CreditUpdate);
+ let op_credit_update = conn_local.recv_pkt(&mut pkt);
+ assert!(!conn_local.rx_queue.pending_rx());
+ assert!(op_credit_update.is_ok());
+ assert_eq!(pkt.op(), VSOCK_OP_CREDIT_UPDATE);
+ assert_eq!(conn_local.last_fwd_cnt, Wrapping(1024));
+
+ // non-existent request
+ let op_error = conn_local.recv_pkt(&mut pkt);
+ assert!(op_error.is_err());
+ }
+
+ #[test]
+ fn test_vsock_conn_send_pkt() {
+ // parameters for packet head construction
+ let head_params = HeadParams::new(PKT_HEADER_SIZE, 5);
+
+ // new locally inititated connection
+ let dummy_file = VsockDummySocket::new();
+ let mut conn_local = VsockConnection::new_local_init(
+ dummy_file,
+ VSOCK_HOST_CID,
+ 5000,
+ 3,
+ 5001,
+ -1,
+ CONN_TX_BUF_SIZE,
+ );
+
+ // write only descriptor chain
+ let (mem, mut descr_chain) = prepare_desc_chain_vsock(false, &head_params, 1, 5);
+ let mem = mem.memory();
+ let mut pkt =
+ VsockPacket::from_tx_virtq_chain(mem.deref(), &mut descr_chain, CONN_TX_BUF_SIZE)
+ .unwrap();
+
+ // peer credit information
+ pkt.set_buf_alloc(65536).set_fwd_cnt(1024);
+
+ // check if peer credit information is updated currently
+ let credit_check = conn_local.send_pkt(&pkt);
+ assert!(credit_check.is_ok());
+ assert_eq!(conn_local.peer_buf_alloc, 65536);
+ assert_eq!(conn_local.peer_fwd_cnt, Wrapping(1024));
+
+ // VSOCK_OP_RESPONSE
+ pkt.set_op(VSOCK_OP_RESPONSE);
+ let peer_response = conn_local.send_pkt(&pkt);
+ assert!(peer_response.is_ok());
+ assert!(conn_local.connect);
+ let mut resp_buf = vec![0; 8];
+ conn_local.stream.read_exact(&mut resp_buf).unwrap();
+ assert_eq!(resp_buf, b"OK 5001\n");
+
+ // VSOCK_OP_RW
+ pkt.set_op(VSOCK_OP_RW);
+ let buf = b"hello";
+ assert!(pkt.data_slice().unwrap().write_slice(buf, 0).is_ok());
+ let rw_response = conn_local.send_pkt(&pkt);
+ assert!(rw_response.is_ok());
+ let mut resp_buf = vec![0; 5];
+ conn_local.stream.read_exact(&mut resp_buf).unwrap();
+ assert_eq!(resp_buf, b"hello");
+
+ // VSOCK_OP_CREDIT_REQUEST
+ pkt.set_op(VSOCK_OP_CREDIT_REQUEST);
+ let credit_response = conn_local.send_pkt(&pkt);
+ assert!(credit_response.is_ok());
+ assert_eq!(conn_local.rx_queue.peek().unwrap(), RxOps::CreditUpdate);
+
+ // VSOCK_OP_SHUTDOWN
+ pkt.set_op(VSOCK_OP_SHUTDOWN);
+ pkt.set_flags(VSOCK_FLAGS_SHUTDOWN_RCV | VSOCK_FLAGS_SHUTDOWN_SEND);
+ let shutdown_response = conn_local.send_pkt(&pkt);
+ assert!(shutdown_response.is_ok());
+ assert!(conn_local.rx_queue.contains(RxOps::Reset.bitmask()));
+ }
+}