aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJoel Galenson <jgalenson@google.com>2021-04-02 21:33:18 +0000
committerAutomerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com>2021-04-02 21:33:18 +0000
commitcd0dfe04a46259f7e56110c7d5d11bacd8e221ae (patch)
treea54e24d149f00ad487d9da00d4ded4043d6d1787
parent2dfe4e1060da94929d2b92f935e8fb893d8713b8 (diff)
parent1c0bfa292aff0c938d949689a250bc3fa2fdd3e3 (diff)
downloadgrpcio-android12-qpr1-d-s1-release.tar.gz
Original change: https://android-review.googlesource.com/c/platform/external/rust/crates/grpcio/+/1662923 Change-Id: I0969f20093796804aad8602c0f9e602623cbc2f1
-rw-r--r--.cargo_vcs_info.json2
-rw-r--r--.clang-tidy42
-rw-r--r--.github/workflows/ci.yml9
-rw-r--r--.travis.yml23
-rw-r--r--Android.bp68
-rw-r--r--CHANGELOG.md20
-rw-r--r--Cargo.toml12
-rw-r--r--Cargo.toml.orig13
-rw-r--r--METADATA10
-rw-r--r--README.md2
-rwxr-xr-xscripts/generate-bindings.sh2
-rw-r--r--src/buf.rs4
-rw-r--r--src/call/client.rs2
-rw-r--r--src/call/mod.rs8
-rw-r--r--src/call/server.rs26
-rw-r--r--src/channel.rs67
-rw-r--r--src/cq.rs48
-rw-r--r--src/metadata.rs16
-rw-r--r--src/server.rs12
-rw-r--r--src/task/mod.rs25
-rw-r--r--src/task/promise.rs20
21 files changed, 287 insertions, 144 deletions
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json
index c4d77c4..05823ce 100644
--- a/.cargo_vcs_info.json
+++ b/.cargo_vcs_info.json
@@ -1,5 +1,5 @@
{
"git": {
- "sha1": "37188956eb8e71631326d708b8afb0940918f5d8"
+ "sha1": "b35467b1bfe58eedf4e77025432074d361be591f"
}
}
diff --git a/.clang-tidy b/.clang-tidy
index d217441..752b25e 100644
--- a/.clang-tidy
+++ b/.clang-tidy
@@ -1,6 +1,44 @@
---
-Checks: 'modernize-use-nullptr,google-build-namespaces,google-build-explicit-make-pair,readability-function-size,performance-*,bugprone-*'
-WarningsAsErrors: 'modernize-use-nullptr,google-build-namespaces,google-build-explicit-make-pair,readability-function-size,performance-*,bugprone-*'
+# Disable abseil-no-namespace: https://bugs.llvm.org/show_bug.cgi?id=47947
+Checks: '-*,
+ abseil-*,
+ -abseil-no-namespace,
+ bugprone-*,
+ -bugprone-narrowing-conversions,
+ -bugprone-too-small-loop-variable,
+ performance-*,
+ -performance-unnecessary-copy-initialization,
+ -performance-unnecessary-value-param,
+ google-*,
+ -google-runtime-int,
+ -google-runtime-references,
+ misc-definitions-in-headers,
+ misc-static-assert,
+ misc-unconventional-assign-operator,
+ misc-uniqueptr-reset-release,
+ misc-unused-alias-decls,
+ misc-unused-using-decls,
+ modernize-make-unique,
+ -modernize-redundant-void-arg,
+ modernize-replace-auto-ptr,
+ modernize-shrink-to-fit,
+ modernize-use-bool-literals,
+ modernize-use-nullptr,
+ modernize-use-override,
+ readability-container-size-empty,
+ readability-deleted-default,
+ readability-function-size,
+ readability-inconsistent-declaration-parameter-name,
+ readability-redundant-control-flow,
+ readability-redundant-smartptr-get,
+ readability-string-compare'
+WarningsAsErrors: '*'
CheckOptions:
- key: readability-function-size.StatementThreshold
value: '450'
+ - key: modernize-make-unique.MakeSmartPtrFunction
+ value: 'absl::make_unique'
+ - key: modernize-make-unique.MakeSmartPtrFunctionHeader
+ value: 'absl/memory/memory.h'
+ - key: google-readability-braces-around-statements.ShortStatementLines
+ value: 1
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 1c42285..3731f9e 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -39,6 +39,7 @@ jobs:
- run: cargo build --no-default-features
- run: cargo build --no-default-features --features protobuf-codec
- run: cargo build --no-default-features --features prost-codec
+ - run: cd proto && cargo build --no-default-features --features prost-codec
- run: cargo build
- run: cargo test --all
@@ -75,9 +76,9 @@ jobs:
- uses: actions/checkout@v2
- run: which go && go version && which cargo && cargo version && clang --version && openssl version
- run: scripts/reset-submodule.cmd
- - run: cargo build --no-default-features
- - run: cargo build --no-default-features --features protobuf-codec
- - run: cargo build --no-default-features --features prost-codec
+ - run: cargo build --no-default-features --features use-bindgen
+ - run: cargo build --no-default-features --features "protobuf-codec use-bindgen"
+ - run: cargo build --no-default-features --features "prost-codec use-bindgen"
- run: cargo build
- run: cargo test --all
@@ -95,6 +96,8 @@ jobs:
Win:
name: Windows
runs-on: windows-latest
+ env:
+ LIBCLANG_PATH: 'C:\Program Files\LLVM\bin'
steps:
- uses: actions/checkout@v2
- run: choco install -y llvm
diff --git a/.travis.yml b/.travis.yml
index 822e0f1..9cc0821 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -1,7 +1,7 @@
# Travis is only used to test ARM64 Linux
dist: focal
-sudo: false
+sudo: true
language: rust
git:
submodules: false
@@ -26,12 +26,12 @@ addons:
jobs:
include:
- os: linux
- arch: arm64
+ arch: arm64-graviton2
+ vm: virt
before_script:
- scripts/reset-submodule.cmd
- - export GRPC_VERSION=1.33.1
+ - export GRPC_VERSION=1.35.0
- export PATH="$PATH:$HOME/.cache/bin:$HOME/.cargo/bin"
- - sudo apt-get update && sudo apt-get -y install libssl-dev
- which cmake && cmake --version && openssl version
- eval "$(gimme stable)"
script:
@@ -44,5 +44,16 @@ jobs:
- cargo build --no-default-features --features protobuf-codec
- cargo build --no-default-features --features prost-codec
- cargo build
- - cargo test --release --all
- - cargo test --release --features "openssl-vendored" --all
+ - travis_wait 40 cargo test --release --all
+ - os: linux
+ arch: arm64-graviton2
+ vm: virt
+ before_script:
+ - scripts/reset-submodule.cmd
+ - export GRPC_VERSION=1.35.0
+ - export PATH="$PATH:$HOME/.cache/bin:$HOME/.cargo/bin"
+ - sudo apt-get update && sudo apt-get -y install libssl-dev
+ - which cmake && cmake --version && openssl version
+ - eval "$(gimme stable)"
+ script:
+ - travis_wait 40 cargo test --release --features "openssl-vendored" --all
diff --git a/Android.bp b/Android.bp
index ef9f992..44ad1a2 100644
--- a/Android.bp
+++ b/Android.bp
@@ -1,4 +1,5 @@
// This file is generated by cargo2android.py --run --device --dependencies --features=protobuf,protobuf-codec.
+// Do not modify this file as changes will be overridden on upgrade.
package {
default_applicable_licenses: ["external_rust_crates_grpcio_license"],
@@ -38,59 +39,40 @@ rust_library {
}
// dependent_library ["feature_list"]
-// aho-corasick-0.7.15 "default,std"
-// bindgen-0.51.1
-// bitflags-1.2.1 "default"
-// boringssl-src-0.1.0
-// cc-1.0.66
-// cexpr-0.3.6
-// cfg-if-0.1.10
+// boringssl-src-0.2.0
+// cc-1.0.67
// cfg-if-1.0.0
-// clang-sys-0.28.1 "clang_6_0,gte_clang_3_6,gte_clang_3_7,gte_clang_3_8,gte_clang_3_9,gte_clang_4_0,gte_clang_5_0,gte_clang_6_0,libloading,runtime"
// cmake-0.1.45
-// futures-0.3.8 "alloc,async-await,default,executor,futures-executor,std"
-// futures-channel-0.3.8 "alloc,futures-sink,sink,std"
-// futures-core-0.3.8 "alloc,std"
-// futures-executor-0.3.8 "std"
-// futures-io-0.3.8 "std"
-// futures-macro-0.3.8
-// futures-sink-0.3.8 "alloc,std"
-// futures-task-0.3.8 "alloc,once_cell,std"
-// futures-util-0.3.8 "alloc,async-await,async-await-macro,channel,futures-channel,futures-io,futures-macro,futures-sink,io,memchr,proc-macro-hack,proc-macro-nested,sink,slab,std"
-// glob-0.3.0
-// grpcio-sys-0.7.2 "default"
+// futures-0.3.13 "alloc,async-await,default,executor,futures-executor,std"
+// futures-channel-0.3.13 "alloc,futures-sink,sink,std"
+// futures-core-0.3.13 "alloc,std"
+// futures-executor-0.3.13 "std"
+// futures-io-0.3.13 "std"
+// futures-macro-0.3.13
+// futures-sink-0.3.13 "alloc,std"
+// futures-task-0.3.13 "alloc,std"
+// futures-util-0.3.13 "alloc,async-await,async-await-macro,channel,futures-channel,futures-io,futures-macro,futures-sink,io,memchr,proc-macro-hack,proc-macro-nested,sink,slab,std"
+// grpcio-sys-0.8.1
// instant-0.1.9
-// lazy_static-1.4.0
-// libc-0.2.81 "default,std"
-// libloading-0.5.2
+// libc-0.2.92 "default,std"
// libz-sys-1.1.2 "default,libc,static,stock-zlib"
// lock_api-0.4.2
-// log-0.4.11
-// memchr-2.3.4 "default,std,use_std"
-// nom-4.2.3 "alloc,default,std,verbose-errors"
-// once_cell-1.5.2 "alloc,std"
+// log-0.4.14
+// memchr-2.3.4 "default,std"
// parking_lot-0.11.1 "default"
-// parking_lot_core-0.8.1
-// peeking_take_while-0.1.2
-// pin-project-1.0.2
-// pin-project-internal-1.0.2
+// parking_lot_core-0.8.3
+// pin-project-lite-0.2.6
// pin-utils-0.1.0
// pkg-config-0.3.19
// proc-macro-hack-0.5.19
-// proc-macro-nested-0.1.6
-// proc-macro2-1.0.24 "default,proc-macro"
-// protobuf-2.18.1
-// quote-1.0.7 "default,proc-macro"
-// regex-1.4.2 "aho-corasick,default,memchr,perf,perf-cache,perf-dfa,perf-inline,perf-literal,std,thread_local,unicode,unicode-age,unicode-bool,unicode-case,unicode-gencat,unicode-perl,unicode-script,unicode-segment"
-// regex-syntax-0.6.21 "default,unicode,unicode-age,unicode-bool,unicode-case,unicode-gencat,unicode-perl,unicode-script,unicode-segment"
-// rustc-hash-1.1.0 "default,std"
+// proc-macro-nested-0.1.7
+// proc-macro2-1.0.26 "default,proc-macro"
+// protobuf-2.22.1
+// quote-1.0.9 "default,proc-macro"
// same-file-1.0.6
// scopeguard-1.1.0
-// shlex-0.1.1
// slab-0.4.2
-// smallvec-1.5.1
-// syn-1.0.54 "clone-impls,default,derive,full,parsing,printing,proc-macro,quote,visit-mut"
-// thread_local-1.0.1
+// smallvec-1.6.1
+// syn-1.0.68 "clone-impls,default,derive,full,parsing,printing,proc-macro,quote"
// unicode-xid-0.2.1 "default"
-// version_check-0.1.5
-// walkdir-2.3.1
+// walkdir-2.3.2
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 6a6dfdb..f928b24 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,23 @@
+# 0.8.2 - 2012-03-10
+
+- Fix send requirement in connectivity APIs (#516)
+
+# 0.8.1 - 2021-03-05
+
+- Support watch connectivity state (#513)
+- Fix prost build of grpcio-proto (#515)
+
+# grpcio-sys 0.8.1 - 2021-03-02
+
+- Detect changes ahead to ease pain of upgrading compiler (#511)
+
+# 0.8.0 - 2021-02-19
+
+- Fix clippy warnings (#504)
+- Add a way to not use bindgen (#499)
+- Update gRPC C core to 1.35.0 (#506)
+- Update bindgen to 0.57.0 (#507)
+
# 0.7.1 - 2020-12-18
- Allow CXX environment variable to override g++ for musl build (#500)
diff --git a/Cargo.toml b/Cargo.toml
index b03eb7c..c8ccf28 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -13,7 +13,7 @@
[package]
edition = "2018"
name = "grpcio"
-version = "0.7.1"
+version = "0.8.2"
authors = ["The TiKV Project Developers"]
autoexamples = false
description = "The rust language implementation of gRPC, base on the gRPC c core library."
@@ -29,14 +29,15 @@ all-features = true
[profile.release]
debug = true
[dependencies.bytes]
-version = "0.5"
+version = "1.0"
optional = true
[dependencies.futures]
version = "0.3"
[dependencies.grpcio-sys]
-version = "0.7"
+version = "0.8"
+default-features = false
[dependencies.libc]
version = "0.2"
@@ -48,7 +49,7 @@ version = "0.4"
version = "0.11"
[dependencies.prost]
-version = "0.6"
+version = "0.7"
optional = true
[dependencies.protobuf]
@@ -56,12 +57,13 @@ version = "2.0"
optional = true
[features]
-default = ["protobuf-codec", "secure"]
+default = ["protobuf-codec", "secure", "use-bindgen"]
no-omit-frame-pointer = ["grpcio-sys/no-omit-frame-pointer"]
openssl = ["secure", "grpcio-sys/openssl"]
openssl-vendored = ["secure", "grpcio-sys/openssl-vendored"]
prost-codec = ["prost", "bytes"]
protobuf-codec = ["protobuf"]
secure = ["grpcio-sys/secure"]
+use-bindgen = ["grpcio-sys/use-bindgen"]
[badges.travis-ci]
repository = "tikv/grpc-rs"
diff --git a/Cargo.toml.orig b/Cargo.toml.orig
index 7daa61e..8938f97 100644
--- a/Cargo.toml.orig
+++ b/Cargo.toml.orig
@@ -1,6 +1,6 @@
[package]
name = "grpcio"
-version = "0.7.1"
+version = "0.8.2"
edition = "2018"
authors = ["The TiKV Project Developers"]
license = "Apache-2.0"
@@ -17,12 +17,12 @@ autoexamples = false
all-features = true
[dependencies]
-grpcio-sys = { path = "grpc-sys", version = "0.7" }
+grpcio-sys = { path = "grpc-sys", version = "0.8", default-features = false }
libc = "0.2"
futures = "0.3"
protobuf = { version = "2.0", optional = true }
-prost = { version = "0.6", optional = true }
-bytes = { version = "0.5", optional = true }
+prost = { version = "0.7", optional = true }
+bytes = { version = "1.0", optional = true }
log = "0.4"
parking_lot = "0.11"
@@ -30,13 +30,14 @@ parking_lot = "0.11"
members = ["proto", "benchmark", "compiler", "interop", "tests-and-examples"]
[features]
-default = ["protobuf-codec", "secure"]
+default = ["protobuf-codec", "secure", "use-bindgen"]
protobuf-codec = ["protobuf"]
prost-codec = ["prost", "bytes"]
secure = ["grpcio-sys/secure"]
openssl = ["secure", "grpcio-sys/openssl"]
openssl-vendored = ["secure", "grpcio-sys/openssl-vendored"]
no-omit-frame-pointer = ["grpcio-sys/no-omit-frame-pointer"]
+use-bindgen = ["grpcio-sys/use-bindgen"]
[profile.release]
debug = true
@@ -45,4 +46,4 @@ debug = true
travis-ci = { repository = "tikv/grpc-rs" }
[patch.crates-io]
-grpcio-compiler = { path = "compiler", version = "0.7.0", default-features = false }
+grpcio-compiler = { path = "compiler", version = "0.8.0", default-features = false }
diff --git a/METADATA b/METADATA
index ab343d9..a81517a 100644
--- a/METADATA
+++ b/METADATA
@@ -7,13 +7,13 @@ third_party {
}
url {
type: ARCHIVE
- value: "https://static.crates.io/crates/grpcio/grpcio-0.7.1.crate"
+ value: "https://static.crates.io/crates/grpcio/grpcio-0.8.2.crate"
}
- version: "0.7.1"
+ version: "0.8.2"
license_type: NOTICE
last_upgrade_date {
- year: 2020
- month: 12
- day: 18
+ year: 2021
+ month: 4
+ day: 1
}
}
diff --git a/README.md b/README.md
index 03f4e19..d4d384a 100644
--- a/README.md
+++ b/README.md
@@ -31,7 +31,7 @@ This project is still under development. The following features with the check m
- LLVM and Clang >= 3.9 if you need to generate bindings at compile time.
- By default, the [secure feature](#feature-secure) is provided by boringssl. You can also use openssl instead by enabling [openssl feature](#feature-openssl).
-For Linux and MacOS, you also need to install gcc (or clang) too.
+For Linux and MacOS, you also need to install gcc 4.9+ (or clang) too.
Bindings are pre-generated for x86_64/arm64 Linux. For other platforms, bindings are generated at compile time.
diff --git a/scripts/generate-bindings.sh b/scripts/generate-bindings.sh
index 462b2fa..81a043e 100755
--- a/scripts/generate-bindings.sh
+++ b/scripts/generate-bindings.sh
@@ -11,5 +11,5 @@ export UPDATE_BIND=1
cargo build -p grpcio-sys --target ${ARCH}-unknown-linux-gnu
rustfmt grpc-sys/bindings/*
if [ "$(uname -s)" == "Linux" ]; then
- sed -i '/pub type .*= ::std::os::raw::.*/d' grpc-sys/bindings/*
+ sed -i '/^pub type .*= ::std::os::raw::.*/d' grpc-sys/bindings/*
fi
diff --git a/src/buf.rs b/src/buf.rs
index d51f274..de8fe54 100644
--- a/src/buf.rs
+++ b/src/buf.rs
@@ -427,7 +427,7 @@ impl bytes::Buf for GrpcByteBufferReader {
self.remain
}
- fn bytes(&self) -> &[u8] {
+ fn chunk(&self) -> &[u8] {
// This is similar but not identical to `BuffRead::fill_buf`, since `self`
// is not mutable, we can only return bytes up to the end of the current
// slice.
@@ -595,7 +595,7 @@ mod tests {
let mut count = 100;
while reader.remaining() > 0 {
assert_eq!(remaining, reader.remaining());
- let bytes = Buf::bytes(&reader);
+ let bytes = Buf::chunk(&reader);
bytes.iter().for_each(|b| assert_eq!(*b, len as u8));
let mut read = bytes.len();
// We don't have to advance by the whole amount we read.
diff --git a/src/call/client.rs b/src/call/client.rs
index 279e619..d1047bf 100644
--- a/src/call/client.rs
+++ b/src/call/client.rs
@@ -399,7 +399,7 @@ impl<Req> Sink<(Req, WriteFlags)> for StreamingCallSink<Req> {
t.close_f = Some(close_f);
}
- if let Poll::Pending = Pin::new(t.close_f.as_mut().unwrap()).poll(cx)? {
+ if Pin::new(t.close_f.as_mut().unwrap()).poll(cx)?.is_pending() {
// if call is finished, can return early here.
call.check_alive()?;
return Poll::Pending;
diff --git a/src/call/mod.rs b/src/call/mod.rs
index a06e003..7f1582f 100644
--- a/src/call/mod.rs
+++ b/src/call/mod.rs
@@ -33,9 +33,9 @@ impl From<i32> for RpcStatusCode {
}
}
-impl Into<i32> for RpcStatusCode {
- fn into(self) -> i32 {
- self.0
+impl From<RpcStatusCode> for i32 {
+ fn from(code: RpcStatusCode) -> i32 {
+ code.0
}
}
@@ -533,7 +533,7 @@ impl StreamingBase {
if !skip_finish_check {
let mut finished = false;
if let Some(close_f) = &mut self.close_f {
- if let Poll::Ready(_) = Pin::new(close_f).poll(cx)? {
+ if Pin::new(close_f).poll(cx)?.is_ready() {
// Don't return immediately, there may be pending data.
finished = true;
}
diff --git a/src/call/server.rs b/src/call/server.rs
index 875555e..0fed656 100644
--- a/src/call/server.rs
+++ b/src/call/server.rs
@@ -3,6 +3,7 @@
use std::ffi::CStr;
use std::pin::Pin;
use std::sync::Arc;
+use std::time::Duration;
use std::{result, slice};
use crate::grpc_sys::{
@@ -30,8 +31,10 @@ use crate::server::{BoxHandler, RequestCallContext};
use crate::task::{BatchFuture, CallTag, Executor, Kicker};
use crate::CheckResult;
+/// A time point that an rpc or operation should finished before it.
+#[derive(Clone, Copy)]
pub struct Deadline {
- spec: gpr_timespec,
+ pub(crate) spec: gpr_timespec,
}
impl Deadline {
@@ -44,12 +47,27 @@ impl Deadline {
}
}
- pub fn exceeded(&self) -> bool {
+ /// Checks if the deadline is exceeded.
+ pub fn exceeded(self) -> bool {
unsafe {
let now = grpc_sys::gpr_now(gpr_clock_type::GPR_CLOCK_REALTIME);
grpc_sys::gpr_time_cmp(now, self.spec) >= 0
}
}
+
+ pub(crate) fn spec(self) -> gpr_timespec {
+ self.spec
+ }
+}
+
+impl From<Duration> for Deadline {
+ /// Build a deadline from given duration.
+ ///
+ /// The deadline will be `now + duration`.
+ #[inline]
+ fn from(dur: Duration) -> Deadline {
+ Deadline::new(dur.into())
+ }
}
/// Context for accepting a request.
@@ -626,8 +644,8 @@ impl<'a> RpcContext<'a> {
self.ctx.host()
}
- pub fn deadline(&self) -> &Deadline {
- &self.deadline
+ pub fn deadline(&self) -> Deadline {
+ self.deadline
}
/// Get the initial metadata sent by client.
diff --git a/src/channel.rs b/src/channel.rs
index a33a4be..c8c67b1 100644
--- a/src/channel.rs
+++ b/src/channel.rs
@@ -4,12 +4,14 @@ use std::borrow::Cow;
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::ffi::{CStr, CString};
+use std::future::Future;
use std::sync::Arc;
use std::time::Duration;
use std::{cmp, i32, ptr};
-use crate::grpc_sys::{
- self, gpr_timespec, grpc_arg_pointer_vtable, grpc_channel, grpc_channel_args,
+use crate::{
+ grpc_sys::{self, gpr_timespec, grpc_arg_pointer_vtable, grpc_channel, grpc_channel_args},
+ Deadline,
};
use libc::{self, c_char, c_int};
@@ -17,6 +19,7 @@ use crate::call::{Call, Method};
use crate::cq::CompletionQueue;
use crate::env::Environment;
use crate::error::Result;
+use crate::task::CallTag;
use crate::task::Kicker;
use crate::CallOption;
use crate::ResourceQuota;
@@ -28,7 +31,7 @@ pub use crate::grpc_sys::{
/// Ref: http://www.grpc.io/docs/guides/wire.html#user-agents
fn format_user_agent_string(agent: &str) -> CString {
- let version = "0.7.1";
+ let version = "0.8.2";
let trimed_agent = agent.trim();
let val = if trimed_agent.is_empty() {
format!("grpc-rust/{}", version)
@@ -589,12 +592,66 @@ impl Channel {
}
}
- // If try_to_connect is true, the channel will try to establish a connection, potentially
- // changing the state.
+ /// If try_to_connect is true, the channel will try to establish a connection, potentially
+ /// changing the state.
pub fn check_connectivity_state(&self, try_to_connect: bool) -> ConnectivityState {
self.inner.check_connectivity_state(try_to_connect)
}
+ /// Blocking wait for channel state change or deadline expiration.
+ ///
+ /// `check_connectivity_state` needs to be called to get the current state. Returns false
+ /// means deadline excceeds before observing any state changes.
+ pub fn wait_for_state_change(
+ &self,
+ last_observed: ConnectivityState,
+ deadline: impl Into<Deadline>,
+ ) -> impl Future<Output = bool> {
+ let (cq_f, prom) = CallTag::action_pair();
+ let prom_box = Box::new(prom);
+ let tag = Box::into_raw(prom_box);
+ let should_wait = if let Ok(cq_ref) = self.cq.borrow() {
+ unsafe {
+ grpcio_sys::grpc_channel_watch_connectivity_state(
+ self.inner.channel,
+ last_observed,
+ deadline.into().spec(),
+ cq_ref.as_ptr(),
+ tag as *mut _,
+ )
+ }
+ true
+ } else {
+ // It's already shutdown.
+ false
+ };
+ async move { should_wait && cq_f.await.unwrap() }
+ }
+
+ /// Wait for this channel to be connected.
+ ///
+ /// Returns false means deadline excceeds before connection is connected.
+ pub async fn wait_for_connected(&self, deadline: impl Into<Deadline>) -> bool {
+ // Fast path, it's probably connected.
+ let mut state = self.check_connectivity_state(true);
+ if ConnectivityState::GRPC_CHANNEL_READY == state {
+ return true;
+ }
+ let deadline = deadline.into();
+ loop {
+ if self.wait_for_state_change(state, deadline).await {
+ state = self.check_connectivity_state(true);
+ match state {
+ ConnectivityState::GRPC_CHANNEL_READY => return true,
+ ConnectivityState::GRPC_CHANNEL_SHUTDOWN => return false,
+ _ => (),
+ }
+ continue;
+ }
+ return false;
+ }
+ }
+
/// Create a Kicker.
pub(crate) fn create_kicker(&self) -> Result<Kicker> {
let cq_ref = self.cq.borrow()?;
diff --git a/src/cq.rs b/src/cq.rs
index 60b6bb3..58d6d6b 100644
--- a/src/cq.rs
+++ b/src/cq.rs
@@ -36,35 +36,39 @@ impl CompletionQueueHandle {
}
fn add_ref(&self) -> Result<()> {
+ let mut cnt = self.ref_cnt.load(Ordering::SeqCst);
loop {
- let cnt = self.ref_cnt.load(Ordering::SeqCst);
if cnt <= 0 {
// `shutdown` has been called, reject any requests.
return Err(Error::QueueShutdown);
}
let new_cnt = cnt + 1;
- if cnt
- == self
- .ref_cnt
- .compare_and_swap(cnt, new_cnt, Ordering::SeqCst)
- {
- return Ok(());
+ match self.ref_cnt.compare_exchange_weak(
+ cnt,
+ new_cnt,
+ Ordering::SeqCst,
+ Ordering::SeqCst,
+ ) {
+ Ok(_) => return Ok(()),
+ Err(c) => cnt = c,
}
}
}
fn unref(&self) {
+ let mut cnt = self.ref_cnt.load(Ordering::SeqCst);
let shutdown = loop {
- let cnt = self.ref_cnt.load(Ordering::SeqCst);
// If `shutdown` is not called, `cnt` > 0, so minus 1 to unref.
// If `shutdown` is called, `cnt` < 0, so plus 1 to unref.
let new_cnt = cnt - cnt.signum();
- if cnt
- == self
- .ref_cnt
- .compare_and_swap(cnt, new_cnt, Ordering::SeqCst)
- {
- break new_cnt == 0;
+ match self.ref_cnt.compare_exchange_weak(
+ cnt,
+ new_cnt,
+ Ordering::SeqCst,
+ Ordering::SeqCst,
+ ) {
+ Ok(_) => break new_cnt == 0,
+ Err(c) => cnt = c,
}
};
if shutdown {
@@ -75,8 +79,8 @@ impl CompletionQueueHandle {
}
fn shutdown(&self) {
+ let mut cnt = self.ref_cnt.load(Ordering::SeqCst);
let shutdown = loop {
- let cnt = self.ref_cnt.load(Ordering::SeqCst);
if cnt <= 0 {
// `shutdown` is called, skipped.
return;
@@ -85,12 +89,14 @@ impl CompletionQueueHandle {
// Because `cnt` is initialized to 1, so minus 1 to make it reach
// toward 0. That is `new_cnt = -(cnt - 1) = -cnt + 1`.
let new_cnt = -cnt + 1;
- if cnt
- == self
- .ref_cnt
- .compare_and_swap(cnt, new_cnt, Ordering::SeqCst)
- {
- break new_cnt == 0;
+ match self.ref_cnt.compare_exchange_weak(
+ cnt,
+ new_cnt,
+ Ordering::SeqCst,
+ Ordering::SeqCst,
+ ) {
+ Ok(_) => break new_cnt == 0,
+ Err(c) => cnt = c,
}
};
if shutdown {
diff --git a/src/metadata.rs b/src/metadata.rs
index 746b593..893f6e2 100644
--- a/src/metadata.rs
+++ b/src/metadata.rs
@@ -16,11 +16,11 @@ fn normalize_key(key: &str, binary: bool) -> Result<Cow<'_, str>> {
let mut is_upper_case = false;
for b in key.as_bytes() {
let b = *b;
- if b >= b'A' && b <= b'Z' {
+ if (b'A'..=b'Z').contains(&b) {
is_upper_case = true;
continue;
- } else if b >= b'a' && b <= b'z'
- || b >= b'0' && b <= b'9'
+ } else if (b'a'..=b'z').contains(&b)
+ || (b'0'..=b'9').contains(&b)
|| b == b'_'
|| b == b'-'
|| b == b'.'
@@ -83,10 +83,10 @@ impl MetadataBuilder {
}
}
let key = normalize_key(key, false)?;
- self.add_metadata(&key, value.as_bytes())
+ Ok(self.add_metadata(&key, value.as_bytes()))
}
- fn add_metadata(&mut self, key: &str, value: &[u8]) -> Result<&mut MetadataBuilder> {
+ fn add_metadata(&mut self, key: &str, value: &[u8]) -> &mut MetadataBuilder {
unsafe {
grpc_sys::grpcwrap_metadata_array_add(
&mut self.arr.0,
@@ -96,7 +96,7 @@ impl MetadataBuilder {
value.len(),
)
}
- Ok(self)
+ self
}
/// Add a metadata holding a binary value.
@@ -104,7 +104,7 @@ impl MetadataBuilder {
/// `key` needs to have suffix (-bin) indicating a binary valued metadata entry.
pub fn add_bytes(&mut self, key: &str, value: &[u8]) -> Result<&mut MetadataBuilder> {
let key = normalize_key(key, true)?;
- self.add_metadata(&key, value)
+ Ok(self.add_metadata(&key, value))
}
/// Create `Metadata` with configured entries.
@@ -221,7 +221,7 @@ impl Clone for Metadata {
let mut builder = MetadataBuilder::with_capacity(self.len());
for (k, v) in self.iter() {
// use `add_metadata` to skip validation.
- builder.add_metadata(k, v).unwrap();
+ builder.add_metadata(k, v);
}
builder.build()
}
diff --git a/src/server.rs b/src/server.rs
index 0f01690..a612b13 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -11,6 +11,7 @@ use std::sync::Arc;
use crate::grpc_sys::{self, grpc_call_error, grpc_server};
use futures::future::Future;
+use futures::ready;
use futures::task::{Context, Poll};
use crate::call::server::*;
@@ -523,14 +524,19 @@ pub fn request_call(ctx: RequestCallContext, cq: &CompletionQueue) {
/// A `Future` that will resolve when shutdown completes.
pub struct ShutdownFuture {
- cq_f: CqFuture<()>,
+ /// `true` means the future finishes successfully.
+ cq_f: CqFuture<bool>,
}
impl Future for ShutdownFuture {
type Output = Result<()>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
- Pin::new(&mut self.cq_f).poll(cx)
+ match ready!(Pin::new(&mut self.cq_f).poll(cx)) {
+ Ok(true) => Poll::Ready(Ok(())),
+ Ok(false) => Poll::Ready(Err(Error::ShutdownFailed)),
+ Err(e) => unreachable!("action future should never resolve to error: {}", e),
+ }
}
}
@@ -549,7 +555,7 @@ pub struct Server {
impl Server {
/// Shutdown the server asynchronously.
pub fn shutdown(&mut self) -> ShutdownFuture {
- let (cq_f, prom) = CallTag::shutdown_pair();
+ let (cq_f, prom) = CallTag::action_pair();
let prom_box = Box::new(prom);
let tag = Box::into_raw(prom_box);
unsafe {
diff --git a/src/task/mod.rs b/src/task/mod.rs
index f151d0e..53369f1 100644
--- a/src/task/mod.rs
+++ b/src/task/mod.rs
@@ -14,7 +14,7 @@ use parking_lot::Mutex;
use self::callback::{Abort, Request as RequestCallback, UnaryRequest as UnaryRequestCallback};
use self::executor::SpawnTask;
-use self::promise::{Batch as BatchPromise, Shutdown as ShutdownPromise};
+use self::promise::{Action as ActionPromise, Batch as BatchPromise};
use crate::call::server::RequestContext;
use crate::call::{BatchContext, Call, MessageReader};
use crate::cq::CompletionQueue;
@@ -113,7 +113,7 @@ pub enum CallTag {
Request(RequestCallback),
UnaryRequest(UnaryRequestCallback),
Abort(Abort),
- Shutdown(ShutdownPromise),
+ Action(ActionPromise),
Spawn(Arc<SpawnTask>),
}
@@ -131,11 +131,12 @@ impl CallTag {
CallTag::Request(RequestCallback::new(ctx))
}
- /// Generate a Future/CallTag pair for shutdown call.
- pub fn shutdown_pair() -> (CqFuture<()>, CallTag) {
+ /// Generate a Future/CallTag pair for action call that only cares if the result is
+ /// successful.
+ pub fn action_pair() -> (CqFuture<bool>, CallTag) {
let inner = new_inner();
- let shutdown = ShutdownPromise::new(inner.clone());
- (CqFuture::new(inner), CallTag::Shutdown(shutdown))
+ let action = ActionPromise::new(inner.clone());
+ (CqFuture::new(inner), CallTag::Action(action))
}
/// Generate a CallTag for abort call before handler is called.
@@ -175,7 +176,7 @@ impl CallTag {
CallTag::Request(cb) => cb.resolve(cq, success),
CallTag::UnaryRequest(cb) => cb.resolve(cq, success),
CallTag::Abort(_) => {}
- CallTag::Shutdown(prom) => prom.resolve(success),
+ CallTag::Action(prom) => prom.resolve(success),
CallTag::Spawn(notify) => self::executor::resolve(notify, success),
}
}
@@ -188,7 +189,7 @@ impl Debug for CallTag {
CallTag::Request(_) => write!(f, "CallTag::Request(..)"),
CallTag::UnaryRequest(_) => write!(f, "CallTag::UnaryRequest(..)"),
CallTag::Abort(_) => write!(f, "CallTag::Abort(..)"),
- CallTag::Shutdown(_) => write!(f, "CallTag::Shutdown"),
+ CallTag::Action(_) => write!(f, "CallTag::Action"),
CallTag::Spawn(_) => write!(f, "CallTag::Spawn"),
}
}
@@ -208,8 +209,8 @@ mod tests {
fn test_resolve() {
let env = Environment::new(1);
- let (cq_f1, tag1) = CallTag::shutdown_pair();
- let (cq_f2, tag2) = CallTag::shutdown_pair();
+ let (cq_f1, tag1) = CallTag::action_pair();
+ let (cq_f2, tag2) = CallTag::action_pair();
let (tx, rx) = mpsc::channel();
let handler = thread::spawn(move || {
@@ -224,8 +225,8 @@ mod tests {
assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
tag2.resolve(&env.pick_cq(), false);
match rx.recv() {
- Ok(Err(Error::ShutdownFailed)) => {}
- res => panic!("expect shutdown failed, but got {:?}", res),
+ Ok(Ok(false)) => {}
+ res => panic!("expect Ok(false), but got {:?}", res),
}
handler.join().unwrap();
diff --git a/src/task/promise.rs b/src/task/promise.rs
index 02e9419..50add06 100644
--- a/src/task/promise.rs
+++ b/src/task/promise.rs
@@ -104,24 +104,22 @@ impl Debug for Batch {
}
}
-/// A promise used to resolve async shutdown result.
-pub struct Shutdown {
- inner: Arc<Inner<()>>,
+/// A promise used to resolve async action status.
+///
+/// The action can only succeed or fail without extra error hint.
+pub struct Action {
+ inner: Arc<Inner<bool>>,
}
-impl Shutdown {
- pub fn new(inner: Arc<Inner<()>>) -> Shutdown {
- Shutdown { inner }
+impl Action {
+ pub fn new(inner: Arc<Inner<bool>>) -> Action {
+ Action { inner }
}
pub fn resolve(self, success: bool) {
let task = {
let mut guard = self.inner.lock();
- if success {
- guard.set_result(Ok(()))
- } else {
- guard.set_result(Err(Error::ShutdownFailed))
- }
+ guard.set_result(Ok(success))
};
task.map(|t| t.wake());
}