aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMark D. Roth <roth@google.com>2023-11-06 14:42:43 -0800
committerGitHub <noreply@github.com>2023-11-06 14:42:43 -0800
commit8a000f45f80ab349020d1060e38f221edda63cc9 (patch)
tree2458da3a5c216346fd8c6cd31ed062e949711975
parent3869ef09a554dd4e8e0703b4099cc9891bbdd8c2 (diff)
downloadgrpc-grpc-8a000f45f80ab349020d1060e38f221edda63cc9.tar.gz
[grpclb and fake resolver] clean up e2e tests and simplify fake resolver (#34887)
Changes to fake resolver: - Add `WaitForReresolutionRequest()` method to fake resolver response generator to allow tests to tell when re-resolution has been requested. - Change fake resolver response generator API to have only one mechanism for injecting results, regardless of whether the result is an error or whether it's triggered by a re-resolution. Changes to grpclb_end2end_test: - Change balancer interface such that instead of setting a list of responses with fixed delays, the test can control exactly when each response is set. - Change balancer impl to always send the initial LB response, as expected by the grpclb protocol. - Change balancer impl to always read load reports, even if load reporting is not expected to be enabled. (The latter case will still cause the test to fail.) Reads are done in a different thread than writes. - Allow each test to directly control how many backends and balancers are started and the client load reporting interval, so that (a) we don't waste resources starting servers we don't need and (b) there is no need to arbitrarily split tests across different test classes. - Add timeouts to `WaitForAllBackends()` functionality, so that tests will fail with a useful error rather than timing out. - Improved ergonomics of various helper functions in the test framework. In the process of making these changes, I found a couple of bugs: - A bug in pick_first, which I fixed in #34885. - A bug in grpclb, in which we were using the wrong condition to decide whether to propagate a re-resolution request from the child policy, which I've fixed in this PR. (This bug probably originated way back in #18344.) This should address a lot of the flakes seen in grpclb_e2e_test recently.
-rw-r--r--BUILD6
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc2
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc33
-rw-r--r--src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc263
-rw-r--r--src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h71
-rw-r--r--test/core/client_channel/resolvers/fake_resolver_test.cc316
-rw-r--r--test/core/end2end/BUILD9
-rw-r--r--test/core/end2end/no_server_test.cc25
-rw-r--r--test/core/transport/chttp2/too_many_pings_test.cc46
-rw-r--r--test/cpp/end2end/client_lb_end2end_test.cc49
-rw-r--r--test/cpp/end2end/grpclb_end2end_test.cc1701
11 files changed, 1037 insertions, 1484 deletions
diff --git a/BUILD b/BUILD
index 5d360a2dfc..fa48d19545 100644
--- a/BUILD
+++ b/BUILD
@@ -3748,9 +3748,9 @@ grpc_cc_library(
hdrs = ["//src/core:ext/filters/client_channel/resolver/fake/fake_resolver.h"],
external_deps = [
"absl/base:core_headers",
- "absl/status",
- "absl/status:statusor",
"absl/strings",
+ "absl/time",
+ "absl/types:optional",
],
language = "c++",
visibility = [
@@ -3760,7 +3760,6 @@ grpc_cc_library(
deps = [
"config",
"debug_location",
- "endpoint_addresses",
"gpr",
"grpc_public_hdrs",
"grpc_resolver",
@@ -3769,7 +3768,6 @@ grpc_cc_library(
"uri_parser",
"work_serializer",
"//src/core:channel_args",
- "//src/core:grpc_service_config",
"//src/core:notification",
"//src/core:ref_counted",
"//src/core:useful",
diff --git a/src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc b/src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc
index f82ac6f53d..598df7ec1b 100644
--- a/src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc
@@ -98,7 +98,7 @@ class ChildPolicyHandler::Helper
: parent()->child_policy_.get();
if (child_ != latest_child_policy) return;
if (GRPC_TRACE_FLAG_ENABLED(*(parent()->tracer_))) {
- gpr_log(GPR_INFO, "[child_policy_handler %p] started name re-resolving",
+ gpr_log(GPR_INFO, "[child_policy_handler %p] requesting re-resolution",
parent());
}
parent()->channel_control_helper()->RequestReresolution();
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
index 85f04a3b4a..4d3ebc1455 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
@@ -841,14 +841,10 @@ void GrpcLb::Helper::UpdateState(grpc_connectivity_state state,
void GrpcLb::Helper::RequestReresolution() {
if (parent()->shutting_down_) return;
- // If we are talking to a balancer, we expect to get updated addresses
- // from the balancer, so we can ignore the re-resolution request from
- // the child policy. Otherwise, pass the re-resolution request up to the
- // channel.
- if (parent()->lb_calld_ == nullptr ||
- !parent()->lb_calld_->seen_initial_response()) {
- parent()->channel_control_helper()->RequestReresolution();
- }
+ // Ignore if we're not in fallback mode, because if we got the backend
+ // addresses from the balancer, re-resolving is not going to fix it.
+ if (!parent()->fallback_mode_) return;
+ parent()->channel_control_helper()->RequestReresolution();
}
//
@@ -1508,6 +1504,9 @@ void GrpcLb::ResetBackoffLocked() {
}
absl::Status GrpcLb::UpdateLocked(UpdateArgs args) {
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
+ gpr_log(GPR_INFO, "[grpclb %p] received update", this);
+ }
const bool is_initial_update = lb_channel_ == nullptr;
config_ = args.config;
GPR_ASSERT(config_ != nullptr);
@@ -1516,11 +1515,15 @@ absl::Status GrpcLb::UpdateLocked(UpdateArgs args) {
fallback_backend_addresses_ = std::move(args.addresses);
if (fallback_backend_addresses_.ok()) {
// Add null LB token attributes.
- for (EndpointAddresses& addresses : *fallback_backend_addresses_) {
- addresses = EndpointAddresses(
- addresses.addresses(),
- addresses.args().SetObject(
+ for (EndpointAddresses& endpoint : *fallback_backend_addresses_) {
+ endpoint = EndpointAddresses(
+ endpoint.addresses(),
+ endpoint.args().SetObject(
MakeRefCounted<TokenAndClientStatsArg>("", nullptr)));
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
+ gpr_log(GPR_INFO, "[grpclb %p] fallback address: %s", this,
+ endpoint.ToString().c_str());
+ }
}
}
resolution_note_ = std::move(args.resolution_note);
@@ -1569,6 +1572,12 @@ absl::Status GrpcLb::UpdateLocked(UpdateArgs args) {
absl::Status GrpcLb::UpdateBalancerChannelLocked() {
// Get balancer addresses.
EndpointAddressesList balancer_addresses = ExtractBalancerAddresses(args_);
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
+ for (const auto& endpoint : balancer_addresses) {
+ gpr_log(GPR_INFO, "[grpclb %p] balancer address: %s", this,
+ endpoint.ToString().c_str());
+ }
+ }
absl::Status status;
if (balancer_addresses.empty()) {
status = absl::UnavailableError("balancer address list must be non-empty");
diff --git a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc
index 193e3c2b42..91b0d9e721 100644
--- a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc
+++ b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc
@@ -22,10 +22,9 @@
#include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"
#include <memory>
+#include <type_traits>
#include <utility>
-#include "absl/status/status.h"
-#include "absl/status/statusor.h"
#include "absl/strings/string_view.h"
#include <grpc/support/log.h>
@@ -36,9 +35,7 @@
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/work_serializer.h"
-#include "src/core/lib/resolver/endpoint_addresses.h"
#include "src/core/lib/resolver/resolver_factory.h"
-#include "src/core/lib/service_config/service_config.h"
#include "src/core/lib/uri/uri_parser.h"
namespace grpc_core {
@@ -55,47 +52,38 @@ class FakeResolver : public Resolver {
private:
friend class FakeResolverResponseGenerator;
- friend class FakeResolverResponseSetter;
void ShutdownLocked() override;
void MaybeSendResultLocked();
- void ReturnReresolutionResult();
-
// passed-in parameters
- ChannelArgs channel_args_;
std::shared_ptr<WorkSerializer> work_serializer_;
std::unique_ptr<ResultHandler> result_handler_;
+ ChannelArgs channel_args_;
RefCountedPtr<FakeResolverResponseGenerator> response_generator_;
- // If has_next_result_ is true, next_result_ is the next resolution result
- // to be returned.
- bool has_next_result_ = false;
- Result next_result_;
- // Result to use for the pretended re-resolution in
- // RequestReresolutionLocked().
- bool has_reresolution_result_ = false;
- Result reresolution_result_;
+ // The next resolution result to be returned, if any. Present when we
+ // get a result before the resolver is started.
+ absl::optional<Result> next_result_;
// True after the call to StartLocked().
bool started_ = false;
// True after the call to ShutdownLocked().
bool shutdown_ = false;
- // if true, return failure
- bool return_failure_ = false;
- // pending re-resolution
- bool reresolution_closure_pending_ = false;
};
FakeResolver::FakeResolver(ResolverArgs args)
: work_serializer_(std::move(args.work_serializer)),
result_handler_(std::move(args.result_handler)),
+ channel_args_(
+ // Channels sharing the same subchannels may have different resolver
+ // response generators. If we don't remove this arg, subchannel pool
+ // will create new subchannels for the same address instead of
+ // reusing existing ones because of different values of this channel
+ // arg. Can't just use GRPC_ARG_NO_SUBCHANNEL_PREFIX, since
+ // that can't be passed into the channel from test code.
+ args.args.Remove(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR)),
response_generator_(
args.args.GetObjectRef<FakeResolverResponseGenerator>()) {
- // Channels sharing the same subchannels may have different resolver response
- // generators. If we don't remove this arg, subchannel pool will create new
- // subchannels for the same address instead of reusing existing ones because
- // of different values of this channel arg.
- channel_args_ = args.args.Remove(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR);
if (response_generator_ != nullptr) {
response_generator_->SetFakeResolver(Ref());
}
@@ -107,19 +95,9 @@ void FakeResolver::StartLocked() {
}
void FakeResolver::RequestReresolutionLocked() {
- if (has_reresolution_result_ || return_failure_) {
- next_result_ = reresolution_result_;
- has_next_result_ = true;
- // Return the result in a different closure, so that we don't call
- // back into the LB policy while it's still processing the previous
- // update.
- if (!reresolution_closure_pending_) {
- reresolution_closure_pending_ = true;
- Ref().release(); // ref held by closure
- work_serializer_->Run([this]() { ReturnReresolutionResult(); },
- DEBUG_LOCATION);
- }
- }
+ // Re-resolution can't happen until after we return an initial result.
+ GPR_ASSERT(response_generator_ != nullptr);
+ response_generator_->ReresolutionRequested();
}
void FakeResolver::ShutdownLocked() {
@@ -132,80 +110,15 @@ void FakeResolver::ShutdownLocked() {
void FakeResolver::MaybeSendResultLocked() {
if (!started_ || shutdown_) return;
- if (return_failure_) {
- // TODO(roth): Change resolver result generator to be able to inject
- // the error to be returned and to be able to independently set errors
- // for addresses and service config.
- Result result;
- result.addresses = absl::UnavailableError("Resolver transient failure");
- result.service_config = result.addresses.status();
- result.args = channel_args_;
- result_handler_->ReportResult(std::move(result));
- return_failure_ = false;
- } else if (has_next_result_) {
+ if (next_result_.has_value()) {
// When both next_results_ and channel_args_ contain an arg with the same
- // name, only the one in next_results_.
- next_result_.args = next_result_.args.UnionWith(channel_args_);
- result_handler_->ReportResult(std::move(next_result_));
- has_next_result_ = false;
+ // name, use the one in next_results_.
+ next_result_->args = next_result_->args.UnionWith(channel_args_);
+ result_handler_->ReportResult(std::move(*next_result_));
+ next_result_.reset();
}
}
-void FakeResolver::ReturnReresolutionResult() {
- reresolution_closure_pending_ = false;
- MaybeSendResultLocked();
- Unref();
-}
-
-class FakeResolverResponseSetter {
- public:
- explicit FakeResolverResponseSetter(RefCountedPtr<FakeResolver> resolver,
- Resolver::Result result,
- bool has_result = false,
- bool immediate = true)
- : resolver_(std::move(resolver)),
- result_(std::move(result)),
- has_result_(has_result),
- immediate_(immediate) {}
- void SetResponseLocked();
- void SetReresolutionResponseLocked();
- void SetFailureLocked();
-
- private:
- RefCountedPtr<FakeResolver> resolver_;
- Resolver::Result result_;
- bool has_result_;
- bool immediate_;
-};
-
-// Deletes object when done
-void FakeResolverResponseSetter::SetReresolutionResponseLocked() {
- if (!resolver_->shutdown_) {
- resolver_->reresolution_result_ = std::move(result_);
- resolver_->has_reresolution_result_ = has_result_;
- }
- delete this;
-}
-
-// Deletes object when done
-void FakeResolverResponseSetter::SetResponseLocked() {
- if (!resolver_->shutdown_) {
- resolver_->next_result_ = std::move(result_);
- resolver_->has_next_result_ = true;
- resolver_->MaybeSendResultLocked();
- }
- delete this;
-}
-
-// Deletes object when done
-void FakeResolverResponseSetter::SetFailureLocked() {
- if (!resolver_->shutdown_) {
- resolver_->return_failure_ = true;
- if (immediate_) resolver_->MaybeSendResultLocked();
- }
- delete this;
-}
-
//
// FakeResolverResponseGenerator
//
@@ -220,101 +133,73 @@ void FakeResolverResponseGenerator::SetResponseAndNotify(
{
MutexLock lock(&mu_);
if (resolver_ == nullptr) {
- has_result_ = true;
result_ = std::move(result);
if (notify_when_set != nullptr) notify_when_set->Notify();
return;
}
resolver = resolver_->Ref();
}
- FakeResolverResponseSetter* arg =
- new FakeResolverResponseSetter(resolver, std::move(result));
- resolver->work_serializer_->Run(
- [arg, notify_when_set]() {
- arg->SetResponseLocked();
- if (notify_when_set != nullptr) notify_when_set->Notify();
- },
- DEBUG_LOCATION);
+ SendResultToResolver(std::move(resolver), std::move(result), notify_when_set);
}
-void FakeResolverResponseGenerator::SetReresolutionResponseAndNotify(
- Resolver::Result result, Notification* notify_when_set) {
- RefCountedPtr<FakeResolver> resolver;
+void FakeResolverResponseGenerator::SetFakeResolver(
+ RefCountedPtr<FakeResolver> resolver) {
+ Resolver::Result result;
{
MutexLock lock(&mu_);
- GPR_ASSERT(resolver_ != nullptr);
- resolver = resolver_->Ref();
+ resolver_ = resolver;
+ if (resolver_set_cv_ != nullptr) resolver_set_cv_->SignalAll();
+ if (resolver == nullptr) return;
+ if (!result_.has_value()) return;
+ result = std::move(*result_);
+ result_.reset();
}
- FakeResolverResponseSetter* arg = new FakeResolverResponseSetter(
- resolver, std::move(result), true /* has_result */);
- resolver->work_serializer_->Run(
- [arg, notify_when_set]() {
- arg->SetReresolutionResponseLocked();
+ SendResultToResolver(std::move(resolver), std::move(result), nullptr);
+}
+
+void FakeResolverResponseGenerator::SendResultToResolver(
+ RefCountedPtr<FakeResolver> resolver, Resolver::Result result,
+ Notification* notify_when_set) {
+ auto* resolver_ptr = resolver.get();
+ resolver_ptr->work_serializer_->Run(
+ [resolver = std::move(resolver), result = std::move(result),
+ notify_when_set]() mutable {
+ if (!resolver->shutdown_) {
+ resolver->next_result_ = std::move(result);
+ resolver->MaybeSendResultLocked();
+ }
if (notify_when_set != nullptr) notify_when_set->Notify();
},
DEBUG_LOCATION);
}
-void FakeResolverResponseGenerator::UnsetReresolutionResponse() {
- RefCountedPtr<FakeResolver> resolver;
- {
- MutexLock lock(&mu_);
- GPR_ASSERT(resolver_ != nullptr);
- resolver = resolver_->Ref();
- }
- FakeResolverResponseSetter* arg =
- new FakeResolverResponseSetter(resolver, Resolver::Result());
- resolver->work_serializer_->Run(
- [arg]() { arg->SetReresolutionResponseLocked(); }, DEBUG_LOCATION);
-}
-
-void FakeResolverResponseGenerator::SetFailure() {
- RefCountedPtr<FakeResolver> resolver;
- {
- MutexLock lock(&mu_);
- GPR_ASSERT(resolver_ != nullptr);
- resolver = resolver_->Ref();
- }
- FakeResolverResponseSetter* arg =
- new FakeResolverResponseSetter(resolver, Resolver::Result());
- resolver->work_serializer_->Run([arg]() { arg->SetFailureLocked(); },
- DEBUG_LOCATION);
-}
-
-void FakeResolverResponseGenerator::SetFailureOnReresolution() {
- RefCountedPtr<FakeResolver> resolver;
- {
- MutexLock lock(&mu_);
- GPR_ASSERT(resolver_ != nullptr);
- resolver = resolver_->Ref();
+bool FakeResolverResponseGenerator::WaitForResolverSet(absl::Duration timeout) {
+ MutexLock lock(&mu_);
+ if (resolver_ == nullptr) {
+ CondVar condition;
+ resolver_set_cv_ = &condition;
+ condition.WaitWithTimeout(&mu_, timeout);
+ resolver_set_cv_ = nullptr;
}
- FakeResolverResponseSetter* arg = new FakeResolverResponseSetter(
- resolver, Resolver::Result(), false /* has_result */,
- false /* immediate */);
- resolver->work_serializer_->Run([arg]() { arg->SetFailureLocked(); },
- DEBUG_LOCATION);
+ return resolver_ != nullptr;
}
-void FakeResolverResponseGenerator::SetFakeResolver(
- RefCountedPtr<FakeResolver> resolver) {
- MutexLock lock(&mu_);
- resolver_ = std::move(resolver);
- cv_.SignalAll();
- if (resolver_ == nullptr) return;
- if (has_result_) {
- FakeResolverResponseSetter* arg =
- new FakeResolverResponseSetter(resolver_, std::move(result_));
- resolver_->work_serializer_->Run([arg]() { arg->SetResponseLocked(); },
- DEBUG_LOCATION);
- has_result_ = false;
+bool FakeResolverResponseGenerator::WaitForReresolutionRequest(
+ absl::Duration timeout) {
+ MutexLock lock(&reresolution_mu_);
+ if (!reresolution_requested_) {
+ CondVar condition;
+ reresolution_cv_ = &condition;
+ condition.WaitWithTimeout(&reresolution_mu_, timeout);
+ reresolution_cv_ = nullptr;
}
+ return std::exchange(reresolution_requested_, false);
}
-void FakeResolverResponseGenerator::WaitForResolverSet() {
- MutexLock lock(&mu_);
- while (resolver_ == nullptr) {
- cv_.Wait(&mu_);
- }
+void FakeResolverResponseGenerator::ReresolutionRequested() {
+ MutexLock lock(&reresolution_mu_);
+ reresolution_requested_ = true;
+ if (reresolution_cv_ != nullptr) reresolution_cv_->SignalAll();
}
namespace {
@@ -341,22 +226,6 @@ const grpc_arg_pointer_vtable
ResponseGeneratorChannelArgCopy, ResponseGeneratorChannelArgDestroy,
ResponseGeneratorChannelArgCmp};
-grpc_arg FakeResolverResponseGenerator::MakeChannelArg(
- FakeResolverResponseGenerator* generator) {
- return grpc_channel_arg_pointer_create(
- const_cast<char*>(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR), generator,
- &kChannelArgPointerVtable);
-}
-
-RefCountedPtr<FakeResolverResponseGenerator>
-FakeResolverResponseGenerator::GetFromArgs(const grpc_channel_args* args) {
- auto* response_generator =
- grpc_channel_args_find_pointer<FakeResolverResponseGenerator>(
- args, GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR);
- if (response_generator == nullptr) return nullptr;
- return response_generator->Ref();
-}
-
//
// Factory
//
diff --git a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h
index b0463eee9d..0a39dd4235 100644
--- a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h
+++ b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h
@@ -23,6 +23,8 @@
#include "absl/base/thread_annotations.h"
#include "absl/strings/string_view.h"
+#include "absl/time/time.h"
+#include "absl/types/optional.h"
#include <grpc/grpc.h>
@@ -42,8 +44,7 @@ class FakeResolver;
/// A mechanism for generating responses for the fake resolver.
/// An instance of this class is passed to the fake resolver via a channel
-/// argument (see \a MakeChannelArg()) and used to inject and trigger custom
-/// resolutions.
+/// argument and used to inject and trigger custom resolutions.
// TODO(roth): I would ideally like this to be InternallyRefCounted
// instead of RefCounted, but external refs are currently needed to
// encode this in channel args. Once channel_args are converted to C++,
@@ -77,50 +78,20 @@ class FakeResolverResponseGenerator
n.WaitForNotification();
}
- // Sets the re-resolution response, which is returned by the fake resolver
- // when re-resolution is requested (via \a RequestReresolutionLocked()).
- // The new re-resolution response replaces any previous re-resolution
- // response that may have been set by a previous call.
- // notify_when_set is an optional notification to signal when the response has
- // been set.
- void SetReresolutionResponseAndNotify(Resolver::Result result,
- Notification* notify_when_set);
- void SetReresolutionResponseAsync(Resolver::Result result) {
- SetReresolutionResponseAndNotify(std::move(result), nullptr);
- }
- void SetReresolutionResponseSynchronously(Resolver::Result result) {
- Notification n;
- SetReresolutionResponseAndNotify(std::move(result), &n);
- n.WaitForNotification();
- }
-
- // Unsets the re-resolution response. After this, the fake resolver will
- // not return anything when \a RequestReresolutionLocked() is called.
- void UnsetReresolutionResponse();
-
- // Tells the resolver to return a transient failure.
- void SetFailure();
-
- // Same as SetFailure(), but instead of returning the error
- // immediately, waits for the next call to RequestReresolutionLocked().
- void SetFailureOnReresolution();
-
- // Returns a channel arg containing \a generator.
- // TODO(roth): When we have time, make this a non-static method.
- static grpc_arg MakeChannelArg(FakeResolverResponseGenerator* generator);
+ // Waits up to timeout for a re-resolution request. Returns true if a
+ // re-resolution request is seen, or false if timeout occurs. Returns
+ // true immediately if there was a re-resolution request since the
+ // last time this method was called.
+ bool WaitForReresolutionRequest(absl::Duration timeout);
- // Returns the response generator in \a args, or null if not found.
- static RefCountedPtr<FakeResolverResponseGenerator> GetFromArgs(
- const grpc_channel_args* args);
+ // Wait for a resolver to be set (setting may be happening asynchronously, so
+ // this may block - consider it test only).
+ bool WaitForResolverSet(absl::Duration timeout);
static absl::string_view ChannelArgName() {
return GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR;
}
- // Wait for a resolver to be set (setting may be happening asynchronously, so
- // this may block - consider it test only).
- void WaitForResolverSet();
-
static int ChannelArgsCompare(const FakeResolverResponseGenerator* a,
const FakeResolverResponseGenerator* b) {
return QsortCompare(a, b);
@@ -128,15 +99,29 @@ class FakeResolverResponseGenerator
private:
friend class FakeResolver;
+
// Set the corresponding FakeResolver to this generator.
void SetFakeResolver(RefCountedPtr<FakeResolver> resolver);
+ // Called by FakeResolver when re-resolution is requested.
+ void ReresolutionRequested();
+
+ // Helper function to send a result to the resolver.
+ static void SendResultToResolver(RefCountedPtr<FakeResolver> resolver,
+ Resolver::Result result,
+ Notification* notify_when_set);
+
// Mutex protecting the members below.
Mutex mu_;
- CondVar cv_;
+ CondVar* resolver_set_cv_ ABSL_GUARDED_BY(mu_) = nullptr;
RefCountedPtr<FakeResolver> resolver_ ABSL_GUARDED_BY(mu_);
- Resolver::Result result_ ABSL_GUARDED_BY(mu_);
- bool has_result_ ABSL_GUARDED_BY(mu_) = false;
+ // Temporarily stores the result when it gets set before the response
+ // generator is seen by the FakeResolver.
+ absl::optional<Resolver::Result> result_ ABSL_GUARDED_BY(mu_);
+
+ Mutex reresolution_mu_;
+ CondVar* reresolution_cv_ ABSL_GUARDED_BY(reresolution_mu_) = nullptr;
+ bool reresolution_requested_ ABSL_GUARDED_BY(reresolution_mu_) = false;
};
} // namespace grpc_core
diff --git a/test/core/client_channel/resolvers/fake_resolver_test.cc b/test/core/client_channel/resolvers/fake_resolver_test.cc
index a02bfa2441..97f6ec587e 100644
--- a/test/core/client_channel/resolvers/fake_resolver_test.cc
+++ b/test/core/client_channel/resolvers/fake_resolver_test.cc
@@ -32,11 +32,10 @@
#include "absl/container/inlined_vector.h"
#include "absl/status/statusor.h"
#include "absl/strings/str_format.h"
+#include "absl/synchronization/notification.h"
#include "gtest/gtest.h"
#include <grpc/grpc.h>
-#include <grpc/support/log.h>
-#include <grpc/support/sync.h>
#include "src/core/lib/address_utils/parse_address.h"
#include "src/core/lib/channel/channel_args.h"
@@ -54,180 +53,175 @@
#include "src/core/lib/uri/uri_parser.h"
#include "test/core/util/test_config.h"
-class ResultHandler : public grpc_core::Resolver::ResultHandler {
- public:
- void SetExpectedAndEvent(grpc_core::Resolver::Result expected,
- gpr_event* ev) {
- grpc_core::MutexLock lock(&mu_);
- ASSERT_EQ(ev_, nullptr);
- expected_ = std::move(expected);
- ev_ = ev;
- }
+namespace grpc_core {
+namespace testing {
- void ReportResult(grpc_core::Resolver::Result actual) override {
- grpc_core::MutexLock lock(&mu_);
- ASSERT_NE(ev_, nullptr);
- // We only check the addresses, because that's the only thing
- // explicitly set by the test via
- // FakeResolverResponseGenerator::SetResponse().
- ASSERT_TRUE(actual.addresses.ok());
- ASSERT_EQ(actual.addresses->size(), expected_.addresses->size());
- for (size_t i = 0; i < expected_.addresses->size(); ++i) {
- ASSERT_EQ((*actual.addresses)[i], (*expected_.addresses)[i]);
+class FakeResolverTest : public ::testing::Test {
+ protected:
+ class ResultHandler : public Resolver::ResultHandler {
+ public:
+ void SetExpectedAndNotification(Resolver::Result expected,
+ absl::Notification* notification) {
+ MutexLock lock(&mu_);
+ ASSERT_EQ(notification_, nullptr);
+ expected_ = std::move(expected);
+ notification_ = notification;
}
- gpr_event_set(ev_, reinterpret_cast<void*>(1));
- ev_ = nullptr;
- }
- private:
- grpc_core::Mutex mu_;
- grpc_core::Resolver::Result expected_ ABSL_GUARDED_BY(mu_);
- gpr_event* ev_ ABSL_GUARDED_BY(mu_) = nullptr;
-};
+ void ReportResult(Resolver::Result actual) override {
+ MutexLock lock(&mu_);
+ ASSERT_NE(notification_, nullptr);
+ // TODO(roth): Check fields other than just the addresses.
+ // Note: No good way to compare result_health_callback.
+ ASSERT_TRUE(actual.addresses.ok());
+ ASSERT_EQ(actual.addresses->size(), expected_.addresses->size());
+ for (size_t i = 0; i < expected_.addresses->size(); ++i) {
+ ASSERT_EQ((*actual.addresses)[i], (*expected_.addresses)[i]);
+ }
+ notification_->Notify();
+ notification_ = nullptr;
+ }
-static grpc_core::OrphanablePtr<grpc_core::Resolver> build_fake_resolver(
- std::shared_ptr<grpc_core::WorkSerializer> work_serializer,
- grpc_core::FakeResolverResponseGenerator* response_generator,
- std::unique_ptr<grpc_core::Resolver::ResultHandler> result_handler) {
- grpc_core::ResolverFactory* factory = grpc_core::CoreConfiguration::Get()
- .resolver_registry()
- .LookupResolverFactory("fake");
- grpc_arg generator_arg =
- grpc_core::FakeResolverResponseGenerator::MakeChannelArg(
- response_generator);
- grpc_channel_args channel_args = {1, &generator_arg};
- grpc_core::ResolverArgs args;
- args.args = grpc_core::ChannelArgs::FromC(&channel_args);
- args.work_serializer = std::move(work_serializer);
- args.result_handler = std::move(result_handler);
- grpc_core::OrphanablePtr<grpc_core::Resolver> resolver =
- factory->CreateResolver(std::move(args));
- return resolver;
-}
+ private:
+ Mutex mu_;
+ Resolver::Result expected_ ABSL_GUARDED_BY(mu_);
+ absl::Notification* notification_ ABSL_GUARDED_BY(mu_) = nullptr;
+ };
-// Create a new resolution containing 2 addresses.
-static grpc_core::Resolver::Result create_new_resolver_result() {
- static size_t test_counter = 0;
- const size_t num_addresses = 2;
- // Create address list.
- grpc_core::EndpointAddressesList addresses;
- for (size_t i = 0; i < num_addresses; ++i) {
- std::string uri_string = absl::StrFormat("ipv4:127.0.0.1:100%" PRIuPTR,
- test_counter * num_addresses + i);
- absl::StatusOr<grpc_core::URI> uri = grpc_core::URI::Parse(uri_string);
- EXPECT_TRUE(uri.ok());
- grpc_resolved_address address;
- EXPECT_TRUE(grpc_parse_uri(*uri, &address));
- absl::InlinedVector<grpc_arg, 2> args_to_add;
- addresses.emplace_back(address, grpc_core::ChannelArgs());
+ static OrphanablePtr<Resolver> BuildFakeResolver(
+ std::shared_ptr<WorkSerializer> work_serializer,
+ RefCountedPtr<FakeResolverResponseGenerator> response_generator,
+ std::unique_ptr<Resolver::ResultHandler> result_handler) {
+ ResolverFactory* factory =
+ CoreConfiguration::Get().resolver_registry().LookupResolverFactory(
+ "fake");
+ ResolverArgs args;
+ args.args = ChannelArgs().SetObject(std::move(response_generator));
+ args.work_serializer = std::move(work_serializer);
+ args.result_handler = std::move(result_handler);
+ return factory->CreateResolver(std::move(args));
}
- ++test_counter;
- grpc_core::Resolver::Result result;
- result.addresses = std::move(addresses);
- return result;
-}
-TEST(FakeResolverTest, FakeResolver) {
- grpc_core::ExecCtx exec_ctx;
- std::shared_ptr<grpc_core::WorkSerializer> work_serializer =
- std::make_shared<grpc_core::WorkSerializer>(
- grpc_event_engine::experimental::GetDefaultEventEngine());
- auto synchronously = [work_serializer](std::function<void()> do_this_thing) {
- grpc_core::Notification notification;
- work_serializer->Run(
- [do_this_thing = std::move(do_this_thing), &notification]() mutable {
- do_this_thing();
+ // Create a new resolution containing 2 addresses.
+ static Resolver::Result CreateResolverResult() {
+ static size_t test_counter = 0;
+ const size_t num_addresses = 2;
+ // Create address list.
+ EndpointAddressesList addresses;
+ for (size_t i = 0; i < num_addresses; ++i) {
+ std::string uri_string = absl::StrFormat(
+ "ipv4:127.0.0.1:100%" PRIuPTR, test_counter * num_addresses + i);
+ absl::StatusOr<URI> uri = URI::Parse(uri_string);
+ EXPECT_TRUE(uri.ok());
+ grpc_resolved_address address;
+ EXPECT_TRUE(grpc_parse_uri(*uri, &address));
+ absl::InlinedVector<grpc_arg, 2> args_to_add;
+ addresses.emplace_back(address, ChannelArgs());
+ }
+ ++test_counter;
+ Resolver::Result result;
+ result.addresses = std::move(addresses);
+ return result;
+ }
+
+ OrphanablePtr<Resolver> CreateResolver() {
+ result_handler_ = new ResultHandler();
+ return BuildFakeResolver(
+ work_serializer_, response_generator_,
+ std::unique_ptr<Resolver::ResultHandler>(result_handler_));
+ }
+
+ void RunSynchronously(std::function<void()> callback) {
+ Notification notification;
+ work_serializer_->Run(
+ [callback = std::move(callback), &notification]() {
+ callback();
notification.Notify();
},
DEBUG_LOCATION);
notification.WaitForNotification();
- };
+ }
+
+ ExecCtx exec_ctx_;
+ std::shared_ptr<WorkSerializer> work_serializer_ =
+ std::make_shared<WorkSerializer>(
+ grpc_event_engine::experimental::GetDefaultEventEngine());
+ RefCountedPtr<FakeResolverResponseGenerator> response_generator_ =
+ MakeRefCounted<FakeResolverResponseGenerator>();
+ ResultHandler* result_handler_ = nullptr;
+};
+
+TEST_F(FakeResolverTest, WaitForResolverSet) {
+ EXPECT_FALSE(response_generator_->WaitForResolverSet(absl::Milliseconds(1)));
+ auto resolver = CreateResolver();
+ ASSERT_NE(resolver, nullptr);
+ EXPECT_TRUE(response_generator_->WaitForResolverSet(absl::Milliseconds(1)));
+}
+
+TEST_F(FakeResolverTest, ReturnResultBeforeResolverCreated) {
+ // Return result via response generator.
+ Resolver::Result result = CreateResolverResult();
+ response_generator_->SetResponseAsync(result);
+ // Create and start resolver.
+ auto resolver = CreateResolver();
+ ASSERT_NE(resolver, nullptr);
+ absl::Notification notification;
+ result_handler_->SetExpectedAndNotification(std::move(result), &notification);
+ RunSynchronously([resolver = resolver.get()] { resolver->StartLocked(); });
+ // Expect result.
+ ASSERT_TRUE(notification.WaitForNotificationWithTimeout(
+ absl::Seconds(5 * grpc_test_slowdown_factor())));
+}
+
+TEST_F(FakeResolverTest, ReturnResultBeforeResolverStarted) {
// Create resolver.
- ResultHandler* result_handler = new ResultHandler();
- grpc_core::RefCountedPtr<grpc_core::FakeResolverResponseGenerator>
- response_generator =
- grpc_core::MakeRefCounted<grpc_core::FakeResolverResponseGenerator>();
- grpc_core::OrphanablePtr<grpc_core::Resolver> resolver = build_fake_resolver(
- work_serializer, response_generator.get(),
- std::unique_ptr<grpc_core::Resolver::ResultHandler>(result_handler));
- ASSERT_NE(resolver.get(), nullptr);
- synchronously([resolver = resolver.get()] { resolver->StartLocked(); });
- // Test 1: normal resolution.
- // next_results != NULL, reresolution_results == NULL.
- // Expected response is next_results.
- gpr_log(GPR_INFO, "TEST 1");
- grpc_core::Resolver::Result result = create_new_resolver_result();
- gpr_event ev1;
- gpr_event_init(&ev1);
- result_handler->SetExpectedAndEvent(result, &ev1);
- response_generator->SetResponseSynchronously(std::move(result));
- grpc_core::ExecCtx::Get()->Flush();
- ASSERT_NE(gpr_event_wait(&ev1, grpc_timeout_seconds_to_deadline(5)), nullptr);
- // Test 2: update resolution.
- // next_results != NULL, reresolution_results == NULL.
- // Expected response is next_results.
- gpr_log(GPR_INFO, "TEST 2");
- result = create_new_resolver_result();
- gpr_event ev2;
- gpr_event_init(&ev2);
- result_handler->SetExpectedAndEvent(result, &ev2);
- response_generator->SetResponseSynchronously(std::move(result));
- grpc_core::ExecCtx::Get()->Flush();
- ASSERT_NE(gpr_event_wait(&ev2, grpc_timeout_seconds_to_deadline(5)), nullptr);
- // Test 3: normal re-resolution.
- // next_results == NULL, reresolution_results != NULL.
- // Expected response is reresolution_results.
- gpr_log(GPR_INFO, "TEST 3");
- grpc_core::Resolver::Result reresolution_result =
- create_new_resolver_result();
- gpr_event ev3;
- gpr_event_init(&ev3);
- result_handler->SetExpectedAndEvent(reresolution_result, &ev3);
- // Set reresolution_results.
- // No result will be returned until re-resolution is requested.
- response_generator->SetReresolutionResponseSynchronously(reresolution_result);
- grpc_core::ExecCtx::Get()->Flush();
- // Trigger a re-resolution.
- synchronously(
- [resolver = resolver.get()] { resolver->RequestReresolutionLocked(); });
- grpc_core::ExecCtx::Get()->Flush();
- ASSERT_NE(gpr_event_wait(&ev3, grpc_timeout_seconds_to_deadline(5)), nullptr);
- // Test 4: repeat re-resolution.
- // next_results == NULL, reresolution_results != NULL.
- // Expected response is reresolution_results.
- gpr_log(GPR_INFO, "TEST 4");
- gpr_event ev4;
- gpr_event_init(&ev4);
- result_handler->SetExpectedAndEvent(std::move(reresolution_result), &ev4);
- // Trigger a re-resolution.
- synchronously(
+ auto resolver = CreateResolver();
+ ASSERT_NE(resolver, nullptr);
+ Resolver::Result result = CreateResolverResult();
+ absl::Notification notification;
+ result_handler_->SetExpectedAndNotification(result, &notification);
+ // Return result via response generator.
+ response_generator_->SetResponseAsync(std::move(result));
+ // Start resolver.
+ RunSynchronously([resolver = resolver.get()] { resolver->StartLocked(); });
+ // Expect result.
+ ASSERT_TRUE(notification.WaitForNotificationWithTimeout(
+ absl::Seconds(5 * grpc_test_slowdown_factor())));
+}
+
+TEST_F(FakeResolverTest, ReturnResult) {
+ // Create and start resolver.
+ auto resolver = CreateResolver();
+ ASSERT_NE(resolver, nullptr);
+ RunSynchronously([resolver = resolver.get()] { resolver->StartLocked(); });
+ Resolver::Result result = CreateResolverResult();
+ absl::Notification notification;
+ result_handler_->SetExpectedAndNotification(result, &notification);
+ // Return result via response generator.
+ response_generator_->SetResponseAsync(std::move(result));
+ // Expect result.
+ ASSERT_TRUE(notification.WaitForNotificationWithTimeout(
+ absl::Seconds(5 * grpc_test_slowdown_factor())));
+}
+
+TEST_F(FakeResolverTest, WaitForReresolutionRequest) {
+ // Create and start resolver.
+ auto resolver = CreateResolver();
+ ASSERT_NE(resolver, nullptr);
+ RunSynchronously([resolver = resolver.get()] { resolver->StartLocked(); });
+ // No re-resolution requested yet.
+ EXPECT_FALSE(
+ response_generator_->WaitForReresolutionRequest(absl::Milliseconds(1)));
+ // Request re-resolution, then try again.
+ RunSynchronously(
[resolver = resolver.get()] { resolver->RequestReresolutionLocked(); });
- grpc_core::ExecCtx::Get()->Flush();
- ASSERT_NE(gpr_event_wait(&ev4, grpc_timeout_seconds_to_deadline(5)), nullptr);
- // Test 5: normal resolution.
- // next_results != NULL, reresolution_results != NULL.
- // Expected response is next_results.
- gpr_log(GPR_INFO, "TEST 5");
- result = create_new_resolver_result();
- gpr_event ev5;
- gpr_event_init(&ev5);
- result_handler->SetExpectedAndEvent(result, &ev5);
- response_generator->SetResponseSynchronously(std::move(result));
- grpc_core::ExecCtx::Get()->Flush();
- ASSERT_NE(gpr_event_wait(&ev5, grpc_timeout_seconds_to_deadline(5)), nullptr);
- // Test 6: no-op.
- // Requesting a new resolution without setting the response shouldn't trigger
- // the resolution callback.
- gpr_log(GPR_INFO, "TEST 6");
- gpr_event ev6;
- gpr_event_init(&ev6);
- result_handler->SetExpectedAndEvent(grpc_core::Resolver::Result(), &ev6);
- ASSERT_EQ(gpr_event_wait(&ev6, grpc_timeout_milliseconds_to_deadline(100)),
- nullptr);
- // Clean up.
- resolver.reset();
+ EXPECT_TRUE(
+ response_generator_->WaitForReresolutionRequest(absl::Milliseconds(1)));
}
+} // namespace testing
+} // namespace grpc_core
+
int main(int argc, char** argv) {
grpc::testing::TestEnvironment env(&argc, argv);
::testing::InitGoogleTest(&argc, argv);
diff --git a/test/core/end2end/BUILD b/test/core/end2end/BUILD
index 0e53196d72..46c9aa3d5b 100644
--- a/test/core/end2end/BUILD
+++ b/test/core/end2end/BUILD
@@ -575,14 +575,23 @@ grpc_cc_test(
grpc_cc_test(
name = "no_server_test",
srcs = ["no_server_test.cc"],
+ external_deps = [
+ "absl/status",
+ "absl/status:statusor",
+ "absl/time",
+ ],
language = "C++",
deps = [
"cq_verifier",
+ "//:endpoint_addresses",
"//:exec_ctx",
"//:gpr",
"//:grpc_public_hdrs",
+ "//:grpc_resolver",
"//:grpc_resolver_fake",
"//:ref_counted_ptr",
+ "//src/core:channel_args",
+ "//src/core:grpc_service_config",
"//test/core/util:grpc_test_util",
],
)
diff --git a/test/core/end2end/no_server_test.cc b/test/core/end2end/no_server_test.cc
index 8172ae83bd..59e25efc9a 100644
--- a/test/core/end2end/no_server_test.cc
+++ b/test/core/end2end/no_server_test.cc
@@ -18,6 +18,12 @@
#include <string.h>
+#include <utility>
+
+#include "absl/status/status.h"
+#include "absl/status/statusor.h"
+#include "absl/time/time.h"
+
#include <grpc/grpc.h>
#include <grpc/grpc_security.h>
#include <grpc/impl/propagation_bits.h>
@@ -27,8 +33,12 @@
#include <grpc/support/time.h>
#include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"
+#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/iomgr/exec_ctx.h"
+#include "src/core/lib/resolver/endpoint_addresses.h"
+#include "src/core/lib/resolver/resolver.h"
+#include "src/core/lib/service_config/service_config.h"
#include "test/core/end2end/cq_verifier.h"
#include "test/core/util/test_config.h"
@@ -43,13 +53,12 @@ void run_test(bool wait_for_ready) {
grpc_core::RefCountedPtr<grpc_core::FakeResolverResponseGenerator>
response_generator =
grpc_core::MakeRefCounted<grpc_core::FakeResolverResponseGenerator>();
- grpc_arg arg = grpc_core::FakeResolverResponseGenerator::MakeChannelArg(
- response_generator.get());
- grpc_channel_args args = {1, &arg};
+ auto args = grpc_core::ChannelArgs().SetObject(response_generator).ToC();
// create a call, channel to a non existant server
grpc_channel_credentials* creds = grpc_insecure_credentials_create();
- grpc_channel* chan = grpc_channel_create("fake:nonexistant", creds, &args);
+ grpc_channel* chan =
+ grpc_channel_create("fake:nonexistant", creds, args.get());
grpc_channel_credentials_release(creds);
gpr_timespec deadline = grpc_timeout_seconds_to_deadline(2);
grpc_call* call = grpc_channel_create_call(
@@ -80,9 +89,13 @@ void run_test(bool wait_for_ready) {
grpc_core::CqVerifier::tag(1), nullptr));
{
- response_generator->WaitForResolverSet();
+ response_generator->WaitForResolverSet(
+ absl::Seconds(5 * grpc_test_slowdown_factor()));
grpc_core::ExecCtx exec_ctx;
- response_generator->SetFailure();
+ grpc_core::Resolver::Result result;
+ result.addresses = absl::UnavailableError("Resolver transient failure");
+ result.service_config = result.addresses.status();
+ response_generator->SetResponseSynchronously(std::move(result));
}
// verify that all tags get completed
diff --git a/test/core/transport/chttp2/too_many_pings_test.cc b/test/core/transport/chttp2/too_many_pings_test.cc
index 4bbbf8a276..f5d8565f95 100644
--- a/test/core/transport/chttp2/too_many_pings_test.cc
+++ b/test/core/transport/chttp2/too_many_pings_test.cc
@@ -467,22 +467,17 @@ TEST_F(KeepaliveThrottlingTest, NewSubchannelsUseUpdatedKeepaliveTime) {
// response generator to switch between the two.
auto response_generator =
grpc_core::MakeRefCounted<grpc_core::FakeResolverResponseGenerator>();
- grpc_arg client_args[] = {
- grpc_channel_arg_integer_create(
- const_cast<char*>(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA), 0),
- grpc_channel_arg_integer_create(
- const_cast<char*>(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS), 0),
- grpc_channel_arg_integer_create(
- const_cast<char*>(GRPC_ARG_KEEPALIVE_TIME_MS), 1 * 1000),
- grpc_channel_arg_integer_create(
- const_cast<char*>(GRPC_ARG_HTTP2_BDP_PROBE), 0),
- grpc_core::FakeResolverResponseGenerator::MakeChannelArg(
- response_generator.get())};
- grpc_channel_args client_channel_args = {GPR_ARRAY_SIZE(client_args),
- client_args};
+ auto client_channel_args =
+ grpc_core::ChannelArgs()
+ .Set(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA, 0)
+ .Set(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, 0)
+ .Set(GRPC_ARG_KEEPALIVE_TIME_MS, 1 * 1000)
+ .Set(GRPC_ARG_HTTP2_BDP_PROBE, 0)
+ .SetObject(response_generator)
+ .ToC();
grpc_channel_credentials* creds = grpc_insecure_credentials_create();
grpc_channel* channel =
- grpc_channel_create("fake:///", creds, &client_channel_args);
+ grpc_channel_create("fake:///", creds, client_channel_args.get());
grpc_channel_credentials_release(creds);
// For a single subchannel 3 GOAWAYs would be sufficient to increase the
// keepalive time from 1 second to beyond 5 seconds. Even though we are
@@ -539,22 +534,17 @@ TEST_F(KeepaliveThrottlingTest,
// create a single channel with round robin load balancing policy.
auto response_generator =
grpc_core::MakeRefCounted<grpc_core::FakeResolverResponseGenerator>();
- grpc_arg client_args[] = {
- grpc_channel_arg_integer_create(
- const_cast<char*>(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA), 0),
- grpc_channel_arg_integer_create(
- const_cast<char*>(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS), 0),
- grpc_channel_arg_integer_create(
- const_cast<char*>(GRPC_ARG_KEEPALIVE_TIME_MS), 1 * 1000),
- grpc_channel_arg_integer_create(
- const_cast<char*>(GRPC_ARG_HTTP2_BDP_PROBE), 0),
- grpc_core::FakeResolverResponseGenerator::MakeChannelArg(
- response_generator.get())};
- grpc_channel_args client_channel_args = {GPR_ARRAY_SIZE(client_args),
- client_args};
+ auto client_channel_args =
+ grpc_core::ChannelArgs()
+ .Set(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA, 0)
+ .Set(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, 0)
+ .Set(GRPC_ARG_KEEPALIVE_TIME_MS, 1 * 1000)
+ .Set(GRPC_ARG_HTTP2_BDP_PROBE, 0)
+ .SetObject(response_generator)
+ .ToC();
grpc_channel_credentials* creds = grpc_insecure_credentials_create();
grpc_channel* channel =
- grpc_channel_create("fake:///", creds, &client_channel_args);
+ grpc_channel_create("fake:///", creds, client_channel_args.get());
grpc_channel_credentials_release(creds);
response_generator->SetResponseSynchronously(
BuildResolverResult({absl::StrCat("ipv4:", server_address1),
diff --git a/test/cpp/end2end/client_lb_end2end_test.cc b/test/cpp/end2end/client_lb_end2end_test.cc
index ad4bbfb649..4a2eb3f1d4 100644
--- a/test/cpp/end2end/client_lb_end2end_test.cc
+++ b/test/cpp/end2end/client_lb_end2end_test.cc
@@ -213,29 +213,16 @@ class FakeResolverResponseGeneratorWrapper {
response_generator_ = std::move(other.response_generator_);
}
+ void SetResponse(grpc_core::Resolver::Result result) {
+ grpc_core::ExecCtx exec_ctx;
+ response_generator_->SetResponseSynchronously(std::move(result));
+ }
+
void SetNextResolution(const std::vector<int>& ports,
const char* service_config_json = nullptr,
const grpc_core::ChannelArgs& per_address_args =
grpc_core::ChannelArgs()) {
- grpc_core::ExecCtx exec_ctx;
- response_generator_->SetResponseSynchronously(
- BuildFakeResults(ports, service_config_json, per_address_args));
- }
-
- void SetNextResolutionUponError(const std::vector<int>& ports) {
- grpc_core::ExecCtx exec_ctx;
- response_generator_->SetReresolutionResponseSynchronously(
- BuildFakeResults(ports));
- }
-
- void SetFailureOnReresolution() {
- grpc_core::ExecCtx exec_ctx;
- response_generator_->SetFailureOnReresolution();
- }
-
- void SetResponse(grpc_core::Resolver::Result result) {
- grpc_core::ExecCtx exec_ctx;
- response_generator_->SetResponseSynchronously(std::move(result));
+ SetResponse(BuildFakeResults(ports, service_config_json, per_address_args));
}
grpc_core::FakeResolverResponseGenerator* Get() const {
@@ -1155,10 +1142,15 @@ TEST_F(PickFirstTest, ReresolutionNoSelected) {
DEBUG_LOCATION, stub, StatusCode::UNAVAILABLE,
MakeConnectionFailureRegex("failed to connect to all addresses"));
}
- // Set a re-resolution result that contains reachable ports, so that the
+ // PF should request re-resolution.
+ gpr_log(GPR_INFO, "****** WAITING FOR RE-RESOLUTION *******");
+ EXPECT_TRUE(response_generator.Get()->WaitForReresolutionRequest(
+ absl::Seconds(5 * grpc_test_slowdown_factor())));
+ gpr_log(GPR_INFO, "****** RE-RESOLUTION SEEN *******");
+ // Send a resolver result that contains reachable ports, so that the
// pick_first LB policy can recover soon.
- response_generator.SetNextResolutionUponError(alive_ports);
- gpr_log(GPR_INFO, "****** RE-RESOLUTION SET *******");
+ response_generator.SetNextResolution(alive_ports);
+ gpr_log(GPR_INFO, "****** RE-RESOLUTION SENT *******");
WaitForServer(DEBUG_LOCATION, stub, 0, [](const Status& status) {
EXPECT_EQ(StatusCode::UNAVAILABLE, status.error_code());
EXPECT_THAT(status.error_message(),
@@ -1294,7 +1286,6 @@ TEST_F(PickFirstTest, IdleOnDisconnect) {
CheckRpcSendOk(DEBUG_LOCATION, stub);
EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY);
// Stop server. Channel should go into state IDLE.
- response_generator.SetFailureOnReresolution();
servers_[0]->Shutdown();
EXPECT_TRUE(WaitForChannelNotReady(channel.get()));
EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_IDLE);
@@ -1603,16 +1594,20 @@ TEST_F(RoundRobinTest, ReresolveOnSubchannelConnectionFailure) {
response_generator.SetNextResolution(ports);
// Wait for both servers to be seen.
WaitForServers(DEBUG_LOCATION, stub, 0, 2);
- // Tell the fake resolver to send an update that adds the last server, but
- // only when the LB policy requests re-resolution.
- ports.push_back(servers_[2]->port_);
- response_generator.SetNextResolutionUponError(ports);
// Have server 0 send a GOAWAY. This should trigger a re-resolution.
gpr_log(GPR_INFO, "****** SENDING GOAWAY FROM SERVER 0 *******");
{
grpc_core::ExecCtx exec_ctx;
grpc_core::Server::FromC(servers_[0]->server_->c_server())->SendGoaways();
}
+ gpr_log(GPR_INFO, "****** WAITING FOR RE-RESOLUTION REQUEST *******");
+ EXPECT_TRUE(response_generator.Get()->WaitForReresolutionRequest(
+ absl::Seconds(5 * grpc_test_slowdown_factor())));
+ gpr_log(GPR_INFO, "****** RE-RESOLUTION REQUEST SEEN *******");
+ // Tell the fake resolver to send an update that adds the last server, but
+ // only when the LB policy requests re-resolution.
+ ports.push_back(servers_[2]->port_);
+ response_generator.SetNextResolution(ports);
// Wait for the client to see server 2.
WaitForServer(DEBUG_LOCATION, stub, 2);
}
diff --git a/test/cpp/end2end/grpclb_end2end_test.cc b/test/cpp/end2end/grpclb_end2end_test.cc
index 949ab9f5f1..51ab18b45a 100644
--- a/test/cpp/end2end/grpclb_end2end_test.cc
+++ b/test/cpp/end2end/grpclb_end2end_test.cc
@@ -25,9 +25,12 @@
#include <gmock/gmock.h>
#include <gtest/gtest.h>
+#include "absl/cleanup/cleanup.h"
#include "absl/memory/memory.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/str_format.h"
+#include "absl/synchronization/notification.h"
+#include "absl/types/span.h"
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
@@ -48,6 +51,7 @@
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/config/config_vars.h"
#include "src/core/lib/gprpp/crash.h"
+#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/env.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/iomgr/sockaddr.h"
@@ -82,12 +86,12 @@
// 2) the retry timer is active. Again, the weak reference it holds should
// prevent a premature call to \a glb_destroy.
-using std::chrono::system_clock;
-
using grpc::lb::v1::LoadBalancer;
using grpc::lb::v1::LoadBalanceRequest;
using grpc::lb::v1::LoadBalanceResponse;
+using grpc_core::SourceLocation;
+
namespace grpc {
namespace testing {
namespace {
@@ -102,8 +106,10 @@ constexpr char kDefaultServiceConfig[] =
using BackendService = CountedService<TestServiceImpl>;
using BalancerService = CountedService<LoadBalancer::Service>;
-const char g_kCallCredsMdKey[] = "call-creds";
-const char g_kCallCredsMdValue[] = "should not be received by balancer";
+const char kCallCredsMdKey[] = "call-creds";
+const char kCallCredsMdValue[] = "should not be received by balancer";
+const char kRequestMessage[] = "Live long and prosper.";
+const absl::string_view kApplicationTargetName = "application_target_name";
// A test user agent string sent by the client only to the grpclb loadbalancer.
// The backend should not see this user-agent string.
@@ -123,10 +129,10 @@ class BackendServiceImpl : public BackendService {
}
// Backend should receive the call credentials metadata.
auto call_credentials_entry =
- context->client_metadata().find(g_kCallCredsMdKey);
+ context->client_metadata().find(kCallCredsMdKey);
EXPECT_NE(call_credentials_entry, context->client_metadata().end());
if (call_credentials_entry != context->client_metadata().end()) {
- EXPECT_EQ(call_credentials_entry->second, g_kCallCredsMdValue);
+ EXPECT_EQ(call_credentials_entry->second, kCallCredsMdValue);
}
IncreaseRequestCount();
const auto status = TestServiceImpl::Echo(context, request, response);
@@ -200,125 +206,44 @@ class BalancerServiceImpl : public BalancerService {
using Stream = ServerReaderWriter<LoadBalanceResponse, LoadBalanceRequest>;
using ResponseDelayPair = std::pair<LoadBalanceResponse, int>;
- explicit BalancerServiceImpl(int client_load_reporting_interval_seconds)
- : client_load_reporting_interval_seconds_(
- client_load_reporting_interval_seconds) {}
-
- Status BalanceLoad(ServerContext* context, Stream* stream) override {
- gpr_log(GPR_INFO, "LB[%p]: BalanceLoad", this);
+ void Start() {
{
grpc::internal::MutexLock lock(&mu_);
- if (serverlist_done_) goto done;
+ shutdown_stream_ = false;
+ response_queue_.clear();
}
{
- // The loadbalancer should see a test user agent because it was
- // specifically configured at the client using
- // GRPC_ARG_GRPCLB_CHANNEL_ARGS
- auto it = context->client_metadata().find("user-agent");
- EXPECT_TRUE(it != context->client_metadata().end());
- if (it != context->client_metadata().end()) {
- EXPECT_THAT(std::string(it->second.data(), it->second.length()),
- ::testing::StartsWith(kGrpclbSpecificUserAgentString));
- }
- // Balancer shouldn't receive the call credentials metadata.
- EXPECT_EQ(context->client_metadata().find(g_kCallCredsMdKey),
- context->client_metadata().end());
- LoadBalanceRequest request;
- std::vector<ResponseDelayPair> responses_and_delays;
-
- if (!stream->Read(&request)) {
- goto done;
- } else {
- if (request.has_initial_request()) {
- grpc::internal::MutexLock lock(&mu_);
- service_names_.push_back(request.initial_request().name());
- }
- }
- IncreaseRequestCount();
- gpr_log(GPR_INFO, "LB[%p]: received initial message '%s'", this,
- request.DebugString().c_str());
-
- // TODO(juanlishen): Initial response should always be the first response.
- if (client_load_reporting_interval_seconds_ > 0) {
- LoadBalanceResponse initial_response;
- initial_response.mutable_initial_response()
- ->mutable_client_stats_report_interval()
- ->set_seconds(client_load_reporting_interval_seconds_);
- stream->Write(initial_response);
- }
-
- {
- grpc::internal::MutexLock lock(&mu_);
- responses_and_delays = responses_and_delays_;
- }
- for (const auto& response_and_delay : responses_and_delays) {
- SendResponse(stream, response_and_delay.first,
- response_and_delay.second);
- }
- {
- grpc::internal::MutexLock lock(&mu_);
- while (!serverlist_done_) {
- serverlist_cond_.Wait(&mu_);
- }
- }
-
- if (client_load_reporting_interval_seconds_ > 0) {
- request.Clear();
- while (stream->Read(&request)) {
- gpr_log(GPR_INFO, "LB[%p]: received client load report message '%s'",
- this, request.DebugString().c_str());
- GPR_ASSERT(request.has_client_stats());
- ClientStats load_report;
- load_report.num_calls_started =
- request.client_stats().num_calls_started();
- load_report.num_calls_finished =
- request.client_stats().num_calls_finished();
- load_report.num_calls_finished_with_client_failed_to_send =
- request.client_stats()
- .num_calls_finished_with_client_failed_to_send();
- load_report.num_calls_finished_known_received =
- request.client_stats().num_calls_finished_known_received();
- for (const auto& drop_token_count :
- request.client_stats().calls_finished_with_drop()) {
- load_report
- .drop_token_counts[drop_token_count.load_balance_token()] =
- drop_token_count.num_calls();
- }
- // We need to acquire the lock here in order to prevent the notify_one
- // below from firing before its corresponding wait is executed.
- grpc::internal::MutexLock lock(&mu_);
- load_report_queue_.emplace_back(std::move(load_report));
- load_report_cond_.Signal();
- }
- }
+ grpc::internal::MutexLock lock(&load_report_mu_);
+ load_report_queue_.clear();
}
- done:
- gpr_log(GPR_INFO, "LB[%p]: done", this);
- return Status::OK;
}
- void add_response(const LoadBalanceResponse& response, int send_after_ms) {
- grpc::internal::MutexLock lock(&mu_);
- responses_and_delays_.push_back(std::make_pair(response, send_after_ms));
+ void Shutdown() {
+ ShutdownStream();
+ gpr_log(GPR_INFO, "LB[%p]: shut down", this);
}
- void Start() {
+ void set_client_load_reporting_interval_seconds(int seconds) {
+ client_load_reporting_interval_seconds_ = seconds;
+ }
+
+ void SendResponse(LoadBalanceResponse response) {
grpc::internal::MutexLock lock(&mu_);
- serverlist_done_ = false;
- responses_and_delays_.clear();
- load_report_queue_.clear();
+ response_queue_.emplace_back(std::move(response));
+ if (response_cond_ != nullptr) response_cond_->SignalAll();
}
- void Shutdown() {
- NotifyDoneWithServerlists();
- gpr_log(GPR_INFO, "LB[%p]: shut down", this);
+ void ShutdownStream() {
+ grpc::internal::MutexLock lock(&mu_);
+ shutdown_stream_ = true;
+ if (response_cond_ != nullptr) response_cond_->SignalAll();
}
ClientStats WaitForLoadReport() {
- grpc::internal::MutexLock lock(&mu_);
+ grpc::internal::MutexLock lock(&load_report_mu_);
if (load_report_queue_.empty()) {
while (load_report_queue_.empty()) {
- load_report_cond_.Wait(&mu_);
+ load_report_cond_.Wait(&load_report_mu_);
}
}
ClientStats load_report = std::move(load_report_queue_.front());
@@ -326,52 +251,200 @@ class BalancerServiceImpl : public BalancerService {
return load_report;
}
- void NotifyDoneWithServerlists() {
- grpc::internal::MutexLock lock(&mu_);
- if (!serverlist_done_) {
- serverlist_done_ = true;
- serverlist_cond_.SignalAll();
- }
- }
-
std::vector<std::string> service_names() {
grpc::internal::MutexLock lock(&mu_);
return service_names_;
}
private:
- void SendResponse(Stream* stream, const LoadBalanceResponse& response,
- int delay_ms) {
- gpr_log(GPR_INFO, "LB[%p]: sleeping for %d ms...", this, delay_ms);
- if (delay_ms > 0) {
- gpr_sleep_until(grpc_timeout_milliseconds_to_deadline(delay_ms));
+ // Request handler.
+ Status BalanceLoad(ServerContext* context, Stream* stream) override {
+ gpr_log(GPR_INFO, "LB[%p]: BalanceLoad", this);
+ {
+ grpc::internal::MutexLock lock(&mu_);
+ if (shutdown_stream_) {
+ gpr_log(GPR_INFO, "LB[%p]: stream shutdown at start", this);
+ return Status::OK;
+ }
+ }
+ // The loadbalancer should see a test user agent because it was
+ // specifically configured at the client using
+ // GRPC_ARG_GRPCLB_CHANNEL_ARGS
+ auto it = context->client_metadata().find("user-agent");
+ EXPECT_TRUE(it != context->client_metadata().end());
+ if (it != context->client_metadata().end()) {
+ EXPECT_THAT(std::string(it->second.data(), it->second.length()),
+ ::testing::StartsWith(kGrpclbSpecificUserAgentString));
+ }
+ // Balancer shouldn't receive the call credentials metadata.
+ EXPECT_EQ(context->client_metadata().find(kCallCredsMdKey),
+ context->client_metadata().end());
+ std::vector<ResponseDelayPair> response_queue_and_delays;
+ // Read initial request.
+ LoadBalanceRequest request;
+ if (!stream->Read(&request)) {
+ gpr_log(GPR_INFO, "LB[%p]: stream read returned false", this);
+ return Status::OK;
+ }
+ EXPECT_TRUE(request.has_initial_request());
+ {
+ grpc::internal::MutexLock lock(&mu_);
+ service_names_.push_back(request.initial_request().name());
+ }
+ IncreaseRequestCount();
+ gpr_log(GPR_INFO, "LB[%p]: received initial message '%s'", this,
+ request.DebugString().c_str());
+ // Send initial response.
+ LoadBalanceResponse response;
+ auto* initial_response = response.mutable_initial_response();
+ if (client_load_reporting_interval_seconds_ > 0) {
+ initial_response->mutable_client_stats_report_interval()->set_seconds(
+ client_load_reporting_interval_seconds_);
}
- gpr_log(GPR_INFO, "LB[%p]: Woke up! Sending response '%s'", this,
- response.DebugString().c_str());
- IncreaseResponseCount();
stream->Write(response);
+ // Spawn a separate thread to read requests from the client.
+ absl::Notification reader_shutdown;
+ std::thread reader(std::bind(&BalancerServiceImpl::ReadThread, this, stream,
+ &reader_shutdown));
+ auto thread_cleanup = absl::MakeCleanup([&]() {
+ gpr_log(GPR_INFO, "shutting down reader thread");
+ reader_shutdown.Notify();
+ gpr_log(GPR_INFO, "joining reader thread");
+ reader.join();
+ gpr_log(GPR_INFO, "joining reader thread complete");
+ });
+ // Send responses as instructed by the test.
+ while (true) {
+ auto response = GetNextResponse();
+ if (!response.has_value()) break;
+ gpr_log(GPR_INFO, "LB[%p]: Sending response: %s", this,
+ response->DebugString().c_str());
+ IncreaseResponseCount();
+ stream->Write(*response);
+ }
+ gpr_log(GPR_INFO, "LB[%p]: done", this);
+ return Status::OK;
+ }
+
+ // Reader thread spawned by request handler.
+ void ReadThread(Stream* stream, absl::Notification* shutdown) {
+ LoadBalanceRequest request;
+ while (!shutdown->HasBeenNotified() && stream->Read(&request)) {
+ gpr_log(GPR_INFO, "LB[%p]: received client load report message '%s'",
+ this, request.DebugString().c_str());
+ EXPECT_GT(client_load_reporting_interval_seconds_, 0);
+ EXPECT_TRUE(request.has_client_stats());
+ ClientStats load_report;
+ load_report.num_calls_started =
+ request.client_stats().num_calls_started();
+ load_report.num_calls_finished =
+ request.client_stats().num_calls_finished();
+ load_report.num_calls_finished_with_client_failed_to_send =
+ request.client_stats()
+ .num_calls_finished_with_client_failed_to_send();
+ load_report.num_calls_finished_known_received =
+ request.client_stats().num_calls_finished_known_received();
+ for (const auto& drop_token_count :
+ request.client_stats().calls_finished_with_drop()) {
+ load_report.drop_token_counts[drop_token_count.load_balance_token()] =
+ drop_token_count.num_calls();
+ }
+ // We need to acquire the lock here in order to prevent the notify_one
+ // below from firing before its corresponding wait is executed.
+ grpc::internal::MutexLock lock(&load_report_mu_);
+ load_report_queue_.emplace_back(std::move(load_report));
+ load_report_cond_.Signal();
+ }
+ }
+
+ // Helper for request handler thread to get the next response to be
+ // sent on the stream. Returns nullopt when the test has requested
+ // stream shutdown.
+ absl::optional<LoadBalanceResponse> GetNextResponse() {
+ grpc::internal::MutexLock lock(&mu_);
+ if (!shutdown_stream_ && response_queue_.empty()) {
+ grpc::internal::CondVar condition;
+ response_cond_ = &condition;
+ condition.Wait(&mu_);
+ response_cond_ = nullptr;
+ }
+ if (response_queue_.empty()) return absl::nullopt;
+ LoadBalanceResponse response = response_queue_.front();
+ response_queue_.pop_front();
+ return response;
}
- const int client_load_reporting_interval_seconds_;
- std::vector<ResponseDelayPair> responses_and_delays_;
- std::vector<std::string> service_names_;
+ int client_load_reporting_interval_seconds_ = 0;
grpc::internal::Mutex mu_;
- grpc::internal::CondVar serverlist_cond_;
- bool serverlist_done_ ABSL_GUARDED_BY(mu_) = false;
+ std::vector<std::string> service_names_ ABSL_GUARDED_BY(mu_);
+ bool shutdown_stream_ ABSL_GUARDED_BY(mu_) = false;
+ std::deque<LoadBalanceResponse> response_queue_ ABSL_GUARDED_BY(mu_);
+ grpc::internal::CondVar* response_cond_ ABSL_GUARDED_BY(mu_) = nullptr;
+
+ grpc::internal::Mutex load_report_mu_;
grpc::internal::CondVar load_report_cond_;
- std::deque<ClientStats> load_report_queue_ ABSL_GUARDED_BY(mu_);
+ std::deque<ClientStats> load_report_queue_ ABSL_GUARDED_BY(load_report_mu_);
};
class GrpclbEnd2endTest : public ::testing::Test {
protected:
- GrpclbEnd2endTest(size_t num_backends, size_t num_balancers,
- int client_load_reporting_interval_seconds)
- : server_host_("localhost"),
- num_backends_(num_backends),
- num_balancers_(num_balancers),
- client_load_reporting_interval_seconds_(
- client_load_reporting_interval_seconds) {}
+ template <typename T>
+ struct ServerThread {
+ template <typename... Args>
+ explicit ServerThread(const std::string& type, Args&&... args)
+ : port_(grpc_pick_unused_port_or_die()),
+ type_(type),
+ service_(std::forward<Args>(args)...) {}
+
+ ~ServerThread() { Shutdown(); }
+
+ void Start() {
+ gpr_log(GPR_INFO, "starting %s server on port %d", type_.c_str(), port_);
+ GPR_ASSERT(!running_);
+ running_ = true;
+ service_.Start();
+ grpc::internal::Mutex mu;
+ // We need to acquire the lock here in order to prevent the notify_one
+ // by ServerThread::Serve from firing before the wait below is hit.
+ grpc::internal::MutexLock lock(&mu);
+ grpc::internal::CondVar cond;
+ thread_ = std::make_unique<std::thread>(
+ std::bind(&ServerThread::Serve, this, &mu, &cond));
+ cond.Wait(&mu);
+ gpr_log(GPR_INFO, "%s server startup complete", type_.c_str());
+ }
+
+ void Serve(grpc::internal::Mutex* mu, grpc::internal::CondVar* cond) {
+ // We need to acquire the lock here in order to prevent the notify_one
+ // below from firing before its corresponding wait is executed.
+ grpc::internal::MutexLock lock(mu);
+ ServerBuilder builder;
+ std::shared_ptr<ServerCredentials> creds(new SecureServerCredentials(
+ grpc_fake_transport_security_server_credentials_create()));
+ builder.AddListeningPort(grpc_core::LocalIpAndPort(port_), creds);
+ builder.RegisterService(&service_);
+ server_ = builder.BuildAndStart();
+ cond->Signal();
+ }
+
+ void Shutdown() {
+ if (!running_) return;
+ gpr_log(GPR_INFO, "%s about to shutdown", type_.c_str());
+ service_.Shutdown();
+ server_->Shutdown(grpc_timeout_milliseconds_to_deadline(0));
+ thread_->join();
+ gpr_log(GPR_INFO, "%s shutdown completed", type_.c_str());
+ running_ = false;
+ }
+
+ const int port_;
+ std::string type_;
+ T service_;
+ std::unique_ptr<Server> server_;
+ std::unique_ptr<std::thread> thread_;
+ bool running_ = false;
+ };
static void SetUpTestSuite() {
// Make the backup poller poll very frequently in order to pick up
@@ -391,30 +464,28 @@ class GrpclbEnd2endTest : public ::testing::Test {
void SetUp() override {
response_generator_ =
grpc_core::MakeRefCounted<grpc_core::FakeResolverResponseGenerator>();
- // Start the backends.
- for (size_t i = 0; i < num_backends_; ++i) {
- backends_.emplace_back(new ServerThread<BackendServiceImpl>("backend"));
- backends_.back()->Start(server_host_);
- }
- // Start the load balancers.
- for (size_t i = 0; i < num_balancers_; ++i) {
- balancers_.emplace_back(new ServerThread<BalancerServiceImpl>(
- "balancer", client_load_reporting_interval_seconds_));
- balancers_.back()->Start(server_host_);
- }
+ balancer_ = CreateAndStartBalancer();
ResetStub();
}
void TearDown() override {
ShutdownAllBackends();
- for (auto& balancer : balancers_) balancer->Shutdown();
+ balancer_->Shutdown();
+ }
+
+ void CreateBackends(size_t num_backends) {
+ for (size_t i = 0; i < num_backends; ++i) {
+ backends_.emplace_back(
+ std::make_unique<ServerThread<BackendServiceImpl>>("backend"));
+ backends_.back()->Start();
+ }
}
void StartAllBackends() {
- for (auto& backend : backends_) backend->Start(server_host_);
+ for (auto& backend : backends_) backend->Start();
}
- void StartBackend(size_t index) { backends_[index]->Start(server_host_); }
+ void StartBackend(size_t index) { backends_[index]->Start(); }
void ShutdownAllBackends() {
for (auto& backend : backends_) backend->Shutdown();
@@ -422,7 +493,14 @@ class GrpclbEnd2endTest : public ::testing::Test {
void ShutdownBackend(size_t index) { backends_[index]->Shutdown(); }
- void ResetStub(int fallback_timeout = 0,
+ std::unique_ptr<ServerThread<BalancerServiceImpl>> CreateAndStartBalancer() {
+ auto balancer =
+ std::make_unique<ServerThread<BalancerServiceImpl>>("balancer");
+ balancer->Start();
+ return balancer;
+ }
+
+ void ResetStub(int fallback_timeout_ms = 0,
const std::string& expected_targets = "",
int subchannel_cache_delay_ms = 0) {
// Send a separate user agent string for the grpclb load balancer alone.
@@ -432,7 +510,10 @@ class GrpclbEnd2endTest : public ::testing::Test {
grpclb_channel_args = grpclb_channel_args.Set(
GRPC_ARG_PRIMARY_USER_AGENT_STRING, kGrpclbSpecificUserAgentString);
ChannelArguments args;
- if (fallback_timeout > 0) args.SetGrpclbFallbackTimeout(fallback_timeout);
+ if (fallback_timeout_ms > 0) {
+ args.SetGrpclbFallbackTimeout(fallback_timeout_ms *
+ grpc_test_slowdown_factor());
+ }
args.SetPointer(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR,
response_generator_.get());
if (!expected_targets.empty()) {
@@ -464,20 +545,19 @@ class GrpclbEnd2endTest : public ::testing::Test {
GRPC_ARG_EXPERIMENTAL_GRPCLB_CHANNEL_ARGS,
const_cast<grpc_channel_args*>(grpclb_channel_args.ToC().get()),
&channel_args_vtable);
- std::ostringstream uri;
- uri << "fake:///" << kApplicationTargetName_;
// TODO(dgq): templatize tests to run everything using both secure and
// insecure channel credentials.
grpc_channel_credentials* channel_creds =
grpc_fake_transport_security_credentials_create();
grpc_call_credentials* call_creds = grpc_md_only_test_credentials_create(
- g_kCallCredsMdKey, g_kCallCredsMdValue);
+ kCallCredsMdKey, kCallCredsMdValue);
std::shared_ptr<ChannelCredentials> creds(
new SecureChannelCredentials(grpc_composite_channel_credentials_create(
channel_creds, call_creds, nullptr)));
call_creds->Unref();
channel_creds->Unref();
- channel_ = grpc::CreateCustomChannel(uri.str(), creds, args);
+ channel_ = grpc::CreateCustomChannel(
+ absl::StrCat("fake:", kApplicationTargetName), creds, args);
stub_ = grpc::testing::EchoTestService::NewStub(channel_);
}
@@ -486,11 +566,7 @@ class GrpclbEnd2endTest : public ::testing::Test {
}
ClientStats WaitForLoadReports() {
- ClientStats client_stats;
- for (auto& balancer : balancers_) {
- client_stats += balancer->service_.WaitForLoadReport();
- }
- return client_stats;
+ return balancer_->service_.WaitForLoadReport();
}
bool SeenAllBackends(size_t start_index = 0, size_t stop_index = 0) {
@@ -516,102 +592,106 @@ class GrpclbEnd2endTest : public ::testing::Test {
++*num_total;
}
- std::tuple<int, int, int> WaitForAllBackends(int num_requests_multiple_of = 1,
- size_t start_index = 0,
- size_t stop_index = 0) {
+ struct WaitForBackendOptions {
+ int timeout_seconds = 10;
+ int num_requests_multiple_of = 1;
+
+ WaitForBackendOptions() {}
+ WaitForBackendOptions& SetTimeoutSeconds(int seconds) {
+ timeout_seconds = seconds;
+ return *this;
+ }
+ WaitForBackendOptions& SetNumRequestsMultipleOf(int multiple) {
+ num_requests_multiple_of = multiple;
+ return *this;
+ }
+ };
+
+ std::tuple<int, int, int> WaitForAllBackends(
+ size_t start_index = 0, size_t stop_index = 0,
+ WaitForBackendOptions options = WaitForBackendOptions(),
+ SourceLocation location = SourceLocation()) {
+ gpr_log(GPR_INFO, "Waiting for backends [%" PRIuPTR ", %" PRIuPTR ")",
+ start_index, stop_index);
+ const absl::Time deadline =
+ absl::Now() +
+ absl::Seconds(options.timeout_seconds * grpc_test_slowdown_factor());
int num_ok = 0;
int num_failure = 0;
int num_drops = 0;
int num_total = 0;
while (!SeenAllBackends(start_index, stop_index)) {
+ absl::Time now = absl::Now();
+ EXPECT_LT(now, deadline) << location.file() << ":" << location.line();
+ if (now > deadline) break;
SendRpcAndCount(&num_total, &num_ok, &num_failure, &num_drops);
}
- while (num_total % num_requests_multiple_of != 0) {
+ while (num_total % options.num_requests_multiple_of != 0) {
+ absl::Time now = absl::Now();
+ EXPECT_LT(now, deadline) << location.file() << ":" << location.line();
+ if (now > deadline) break;
SendRpcAndCount(&num_total, &num_ok, &num_failure, &num_drops);
}
ResetBackendCounters();
gpr_log(GPR_INFO,
"Performed %d warm up requests (a multiple of %d) against the "
"backends. %d succeeded, %d failed, %d dropped.",
- num_total, num_requests_multiple_of, num_ok, num_failure,
+ num_total, options.num_requests_multiple_of, num_ok, num_failure,
num_drops);
return std::make_tuple(num_ok, num_failure, num_drops);
}
- void WaitForBackend(size_t backend_idx) {
- do {
- (void)SendRpc();
- } while (backends_[backend_idx]->service_.request_count() == 0);
- ResetBackendCounters();
+ void WaitForBackend(size_t backend_idx,
+ WaitForBackendOptions options = WaitForBackendOptions(),
+ SourceLocation location = SourceLocation()) {
+ WaitForAllBackends(backend_idx, backend_idx + 1, options, location);
}
- struct AddressData {
- int port;
- std::string balancer_name;
- };
-
- grpc_core::EndpointAddressesList CreateLbAddressesFromAddressDataList(
- const std::vector<AddressData>& address_data) {
+ grpc_core::EndpointAddressesList CreateAddressListFromPorts(
+ const absl::Span<const int> ports, absl::string_view balancer_name = "") {
grpc_core::EndpointAddressesList addresses;
- for (const auto& addr : address_data) {
+ for (int port : ports) {
absl::StatusOr<grpc_core::URI> lb_uri =
- grpc_core::URI::Parse(grpc_core::LocalIpUri(addr.port));
+ grpc_core::URI::Parse(grpc_core::LocalIpUri(port));
GPR_ASSERT(lb_uri.ok());
grpc_resolved_address address;
GPR_ASSERT(grpc_parse_uri(*lb_uri, &address));
- addresses.emplace_back(
- address, grpc_core::ChannelArgs().Set(GRPC_ARG_DEFAULT_AUTHORITY,
- addr.balancer_name));
+ grpc_core::ChannelArgs args;
+ if (!balancer_name.empty()) {
+ args = args.Set(GRPC_ARG_DEFAULT_AUTHORITY, balancer_name);
+ }
+ addresses.emplace_back(address, args);
}
return addresses;
}
- grpc_core::Resolver::Result MakeResolverResult(
- const std::vector<AddressData>& balancer_address_data,
- const std::vector<AddressData>& backend_address_data = {},
+ void SetNextResolutionFromEndpoints(
+ grpc_core::EndpointAddressesList balancers,
+ grpc_core::EndpointAddressesList backends = {},
const char* service_config_json = kDefaultServiceConfig) {
+ grpc_core::ExecCtx exec_ctx;
grpc_core::Resolver::Result result;
- result.addresses =
- CreateLbAddressesFromAddressDataList(backend_address_data);
+ result.addresses = std::move(backends);
result.service_config = grpc_core::ServiceConfigImpl::Create(
grpc_core::ChannelArgs(), service_config_json);
GPR_ASSERT(result.service_config.ok());
- grpc_core::EndpointAddressesList balancer_addresses =
- CreateLbAddressesFromAddressDataList(balancer_address_data);
result.args = grpc_core::SetGrpcLbBalancerAddresses(
- grpc_core::ChannelArgs(), std::move(balancer_addresses));
- return result;
- }
-
- void SetNextResolutionAllBalancers(
- const char* service_config_json = kDefaultServiceConfig) {
- std::vector<AddressData> addresses;
- for (size_t i = 0; i < balancers_.size(); ++i) {
- addresses.emplace_back(AddressData{balancers_[i]->port_, ""});
- }
- SetNextResolution(addresses, {}, service_config_json);
+ grpc_core::ChannelArgs(), std::move(balancers));
+ response_generator_->SetResponseSynchronously(std::move(result));
}
void SetNextResolution(
- const std::vector<AddressData>& balancer_address_data,
- const std::vector<AddressData>& backend_address_data = {},
+ const absl::Span<const int> balancer_ports,
+ const absl::Span<const int> backend_ports = {},
const char* service_config_json = kDefaultServiceConfig) {
- grpc_core::ExecCtx exec_ctx;
- grpc_core::Resolver::Result result = MakeResolverResult(
- balancer_address_data, backend_address_data, service_config_json);
- response_generator_->SetResponseSynchronously(std::move(result));
+ SetNextResolutionFromEndpoints(CreateAddressListFromPorts(balancer_ports),
+ CreateAddressListFromPorts(backend_ports),
+ service_config_json);
}
- void SetNextReresolutionResponse(
- const std::vector<AddressData>& balancer_address_data,
- const std::vector<AddressData>& backend_address_data = {},
+ void SetNextResolutionDefaultBalancer(
const char* service_config_json = kDefaultServiceConfig) {
- grpc_core::ExecCtx exec_ctx;
- response_generator_->WaitForResolverSet();
- grpc_core::Resolver::Result result = MakeResolverResult(
- balancer_address_data, backend_address_data, service_config_json);
- response_generator_->SetReresolutionResponseSynchronously(
- std::move(result));
+ SetNextResolution({balancer_->port_}, {}, service_config_json);
}
std::vector<int> GetBackendPorts(size_t start_index = 0,
@@ -624,10 +704,8 @@ class GrpclbEnd2endTest : public ::testing::Test {
return backend_ports;
}
- void ScheduleResponseForBalancer(size_t i,
- const LoadBalanceResponse& response,
- int delay_ms) {
- balancers_[i]->service_.add_response(response, delay_ms);
+ void SendBalancerResponse(LoadBalanceResponse response) {
+ balancer_->service_.SendResponse(std::move(response));
}
LoadBalanceResponse BuildResponseForBackends(
@@ -660,7 +738,7 @@ class GrpclbEnd2endTest : public ::testing::Test {
const bool local_response = (response == nullptr);
if (local_response) response = new EchoResponse;
EchoRequest request;
- request.set_message(kRequestMessage_);
+ request.set_message(kRequestMessage);
if (!expected_status.ok()) {
auto* error = request.mutable_param()->mutable_expected_error();
error->set_code(expected_status.error_code());
@@ -681,7 +759,7 @@ class GrpclbEnd2endTest : public ::testing::Test {
const Status status = SendRpc(&response, timeout_ms, wait_for_ready);
EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
<< " message=" << status.error_message();
- EXPECT_EQ(response.message(), kRequestMessage_);
+ EXPECT_EQ(response.message(), kRequestMessage);
}
}
@@ -690,159 +768,66 @@ class GrpclbEnd2endTest : public ::testing::Test {
EXPECT_FALSE(status.ok());
}
- template <typename T>
- struct ServerThread {
- template <typename... Args>
- explicit ServerThread(const std::string& type, Args&&... args)
- : port_(grpc_pick_unused_port_or_die()),
- type_(type),
- service_(std::forward<Args>(args)...) {}
-
- void Start(const std::string& server_host) {
- gpr_log(GPR_INFO, "starting %s server on port %d", type_.c_str(), port_);
- GPR_ASSERT(!running_);
- running_ = true;
- service_.Start();
- grpc::internal::Mutex mu;
- // We need to acquire the lock here in order to prevent the notify_one
- // by ServerThread::Serve from firing before the wait below is hit.
- grpc::internal::MutexLock lock(&mu);
- grpc::internal::CondVar cond;
- thread_ = std::make_unique<std::thread>(
- std::bind(&ServerThread::Serve, this, server_host, &mu, &cond));
- cond.Wait(&mu);
- gpr_log(GPR_INFO, "%s server startup complete", type_.c_str());
- }
-
- void Serve(const std::string& server_host, grpc::internal::Mutex* mu,
- grpc::internal::CondVar* cond) {
- // We need to acquire the lock here in order to prevent the notify_one
- // below from firing before its corresponding wait is executed.
- grpc::internal::MutexLock lock(mu);
- std::ostringstream server_address;
- server_address << server_host << ":" << port_;
- ServerBuilder builder;
- std::shared_ptr<ServerCredentials> creds(new SecureServerCredentials(
- grpc_fake_transport_security_server_credentials_create()));
- builder.AddListeningPort(server_address.str(), creds);
- builder.RegisterService(&service_);
- server_ = builder.BuildAndStart();
- cond->Signal();
- }
-
- void Shutdown() {
- if (!running_) return;
- gpr_log(GPR_INFO, "%s about to shutdown", type_.c_str());
- service_.Shutdown();
- server_->Shutdown(grpc_timeout_milliseconds_to_deadline(0));
- thread_->join();
- gpr_log(GPR_INFO, "%s shutdown completed", type_.c_str());
- running_ = false;
- }
-
- const int port_;
- std::string type_;
- T service_;
- std::unique_ptr<Server> server_;
- std::unique_ptr<std::thread> thread_;
- bool running_ = false;
- };
-
- const std::string server_host_;
- const size_t num_backends_;
- const size_t num_balancers_;
- const int client_load_reporting_interval_seconds_;
std::shared_ptr<Channel> channel_;
std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
std::vector<std::unique_ptr<ServerThread<BackendServiceImpl>>> backends_;
- std::vector<std::unique_ptr<ServerThread<BalancerServiceImpl>>> balancers_;
+ std::unique_ptr<ServerThread<BalancerServiceImpl>> balancer_;
grpc_core::RefCountedPtr<grpc_core::FakeResolverResponseGenerator>
response_generator_;
- const std::string kRequestMessage_ = "Live long and prosper.";
- const std::string kApplicationTargetName_ = "application_target_name";
};
-class SingleBalancerTest : public GrpclbEnd2endTest {
- public:
- SingleBalancerTest() : GrpclbEnd2endTest(4, 1, 0) {}
-};
-
-TEST_F(SingleBalancerTest, Vanilla) {
- SetNextResolutionAllBalancers();
+TEST_F(GrpclbEnd2endTest, Vanilla) {
+ const size_t kNumBackends = 3;
const size_t kNumRpcsPerAddress = 100;
- ScheduleResponseForBalancer(
- 0, BuildResponseForBackends(GetBackendPorts(), {}), 0);
+ CreateBackends(kNumBackends);
+ SendBalancerResponse(BuildResponseForBackends(GetBackendPorts(), {}));
+ SetNextResolutionDefaultBalancer();
// Make sure that trying to connect works without a call.
channel_->GetState(true /* try_to_connect */);
// We need to wait for all backends to come online.
WaitForAllBackends();
// Send kNumRpcsPerAddress RPCs per server.
- CheckRpcSendOk(kNumRpcsPerAddress * num_backends_);
-
+ CheckRpcSendOk(kNumRpcsPerAddress * kNumBackends);
// Each backend should have gotten 100 requests.
for (size_t i = 0; i < backends_.size(); ++i) {
EXPECT_EQ(kNumRpcsPerAddress, backends_[i]->service_.request_count());
}
- balancers_[0]->service_.NotifyDoneWithServerlists();
+ balancer_->service_.ShutdownStream();
// The balancer got a single request.
- EXPECT_EQ(1U, balancers_[0]->service_.request_count());
+ EXPECT_EQ(1U, balancer_->service_.request_count());
// and sent a single response.
- EXPECT_EQ(1U, balancers_[0]->service_.response_count());
-
+ EXPECT_EQ(1U, balancer_->service_.response_count());
// Check LB policy name for the channel.
EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName());
}
-TEST_F(SingleBalancerTest, SubchannelCaching) {
- ResetStub(/*fallback_timeout=*/0, /*expected_targets=*/"",
+TEST_F(GrpclbEnd2endTest, SubchannelCaching) {
+ CreateBackends(3);
+ ResetStub(/*fallback_timeout_ms=*/0, /*expected_targets=*/"",
/*subchannel_cache_delay_ms=*/1500);
- SetNextResolutionAllBalancers();
- // Initially send all backends.
- ScheduleResponseForBalancer(
- 0, BuildResponseForBackends(GetBackendPorts(), {}), 0);
- // Then remove backends 0 and 1.
- ScheduleResponseForBalancer(
- 0, BuildResponseForBackends(GetBackendPorts(2), {}), 1000);
+ SetNextResolutionDefaultBalancer();
+ // Initially send backends 0 and 1.
+ SendBalancerResponse(BuildResponseForBackends(GetBackendPorts(0, 2), {}));
+ WaitForAllBackends(0, 2);
+ // Now remove backends 0 and 1 and add backend 2.
+ SendBalancerResponse(BuildResponseForBackends(GetBackendPorts(2), {}));
+ WaitForBackend(2);
// Now re-add backend 1.
- ScheduleResponseForBalancer(
- 0, BuildResponseForBackends(GetBackendPorts(1), {}), 1000);
- // Wait for all backends to come online.
- WaitForAllBackends();
- // Send RPCs for long enough to get all responses.
- gpr_timespec deadline = grpc_timeout_milliseconds_to_deadline(3000);
- do {
- CheckRpcSendOk();
- } while (gpr_time_cmp(gpr_now(GPR_CLOCK_MONOTONIC), deadline) < 0);
- // Backend 0 should have received less traffic than the others.
- // Backend 1 would have received less traffic than 2 and 3.
- gpr_log(GPR_INFO, "BACKEND 0: %" PRIuPTR " requests",
- backends_[0]->service_.request_count());
- EXPECT_GT(backends_[0]->service_.request_count(), 0);
- for (size_t i = 1; i < backends_.size(); ++i) {
- gpr_log(GPR_INFO, "BACKEND %" PRIuPTR ": %" PRIuPTR " requests", i,
- backends_[i]->service_.request_count());
- EXPECT_GT(backends_[i]->service_.request_count(),
- backends_[0]->service_.request_count())
- << "backend " << i;
- if (i >= 2) {
- EXPECT_GT(backends_[i]->service_.request_count(),
- backends_[1]->service_.request_count())
- << "backend " << i;
- }
- }
+ SendBalancerResponse(BuildResponseForBackends(GetBackendPorts(1), {}));
+ WaitForBackend(1);
// Backend 1 should never have lost its connection from the client.
EXPECT_EQ(1UL, backends_[1]->service_.clients().size());
- balancers_[0]->service_.NotifyDoneWithServerlists();
+ balancer_->service_.ShutdownStream();
// The balancer got a single request.
- EXPECT_EQ(1U, balancers_[0]->service_.request_count());
+ EXPECT_EQ(1U, balancer_->service_.request_count());
// And sent 3 responses.
- EXPECT_EQ(3U, balancers_[0]->service_.response_count());
+ EXPECT_EQ(3U, balancer_->service_.response_count());
}
-TEST_F(SingleBalancerTest, ReturnServerStatus) {
- SetNextResolutionAllBalancers();
- ScheduleResponseForBalancer(
- 0, BuildResponseForBackends(GetBackendPorts(), {}), 0);
+TEST_F(GrpclbEnd2endTest, ReturnServerStatus) {
+ CreateBackends(1);
+ SetNextResolutionDefaultBalancer();
+ SendBalancerResponse(BuildResponseForBackends(GetBackendPorts(), {}));
// We need to wait for all backends to come online.
WaitForAllBackends();
// Send a request that the backend will fail, and make sure we get
@@ -854,29 +839,29 @@ TEST_F(SingleBalancerTest, ReturnServerStatus) {
EXPECT_EQ(actual.error_message(), expected.error_message());
}
-TEST_F(SingleBalancerTest, SelectGrpclbWithMigrationServiceConfig) {
- SetNextResolutionAllBalancers(
+TEST_F(GrpclbEnd2endTest, SelectGrpclbWithMigrationServiceConfig) {
+ CreateBackends(1);
+ SetNextResolutionDefaultBalancer(
"{\n"
" \"loadBalancingConfig\":[\n"
" { \"does_not_exist\":{} },\n"
" { \"grpclb\":{} }\n"
" ]\n"
"}");
- ScheduleResponseForBalancer(
- 0, BuildResponseForBackends(GetBackendPorts(), {}), 0);
+ SendBalancerResponse(BuildResponseForBackends(GetBackendPorts(), {}));
CheckRpcSendOk(1, 3000 /* timeout_ms */, true /* wait_for_ready */);
- balancers_[0]->service_.NotifyDoneWithServerlists();
+ balancer_->service_.ShutdownStream();
// The balancer got a single request.
- EXPECT_EQ(1U, balancers_[0]->service_.request_count());
+ EXPECT_EQ(1U, balancer_->service_.request_count());
// and sent a single response.
- EXPECT_EQ(1U, balancers_[0]->service_.response_count());
+ EXPECT_EQ(1U, balancer_->service_.response_count());
// Check LB policy name for the channel.
EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName());
}
-TEST_F(SingleBalancerTest,
+TEST_F(GrpclbEnd2endTest,
SelectGrpclbWithMigrationServiceConfigAndNoAddresses) {
- const int kFallbackTimeoutMs = 200 * grpc_test_slowdown_factor();
+ const int kFallbackTimeoutMs = 200;
ResetStub(kFallbackTimeoutMs);
SetNextResolution({}, {},
"{\n"
@@ -898,8 +883,11 @@ TEST_F(SingleBalancerTest,
EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName());
}
-TEST_F(SingleBalancerTest, UsePickFirstChildPolicy) {
- SetNextResolutionAllBalancers(
+TEST_F(GrpclbEnd2endTest, UsePickFirstChildPolicy) {
+ const size_t kNumBackends = 2;
+ const size_t kNumRpcs = kNumBackends * 2;
+ CreateBackends(kNumBackends);
+ SetNextResolutionDefaultBalancer(
"{\n"
" \"loadBalancingConfig\":[\n"
" { \"grpclb\":{\n"
@@ -909,11 +897,9 @@ TEST_F(SingleBalancerTest, UsePickFirstChildPolicy) {
" } }\n"
" ]\n"
"}");
- ScheduleResponseForBalancer(
- 0, BuildResponseForBackends(GetBackendPorts(), {}), 0);
- const size_t kNumRpcs = num_backends_ * 2;
+ SendBalancerResponse(BuildResponseForBackends(GetBackendPorts(), {}));
CheckRpcSendOk(kNumRpcs, 3000 /* timeout_ms */, true /* wait_for_ready */);
- balancers_[0]->service_.NotifyDoneWithServerlists();
+ balancer_->service_.ShutdownStream();
// Check that all requests went to the first backend. This verifies
// that we used pick_first instead of round_robin as the child policy.
EXPECT_EQ(backends_[0]->service_.request_count(), kNumRpcs);
@@ -921,15 +907,18 @@ TEST_F(SingleBalancerTest, UsePickFirstChildPolicy) {
EXPECT_EQ(backends_[i]->service_.request_count(), 0UL);
}
// The balancer got a single request.
- EXPECT_EQ(1U, balancers_[0]->service_.request_count());
+ EXPECT_EQ(1U, balancer_->service_.request_count());
// and sent a single response.
- EXPECT_EQ(1U, balancers_[0]->service_.response_count());
+ EXPECT_EQ(1U, balancer_->service_.response_count());
// Check LB policy name for the channel.
EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName());
}
-TEST_F(SingleBalancerTest, SwapChildPolicy) {
- SetNextResolutionAllBalancers(
+TEST_F(GrpclbEnd2endTest, SwapChildPolicy) {
+ const size_t kNumBackends = 2;
+ const size_t kNumRpcs = kNumBackends * 2;
+ CreateBackends(kNumBackends);
+ SetNextResolutionDefaultBalancer(
"{\n"
" \"loadBalancingConfig\":[\n"
" { \"grpclb\":{\n"
@@ -939,9 +928,7 @@ TEST_F(SingleBalancerTest, SwapChildPolicy) {
" } }\n"
" ]\n"
"}");
- ScheduleResponseForBalancer(
- 0, BuildResponseForBackends(GetBackendPorts(), {}), 0);
- const size_t kNumRpcs = num_backends_ * 2;
+ SendBalancerResponse(BuildResponseForBackends(GetBackendPorts(), {}));
CheckRpcSendOk(kNumRpcs, 3000 /* timeout_ms */, true /* wait_for_ready */);
// Check that all requests went to the first backend. This verifies
// that we used pick_first instead of round_robin as the child policy.
@@ -950,7 +937,7 @@ TEST_F(SingleBalancerTest, SwapChildPolicy) {
EXPECT_EQ(backends_[i]->service_.request_count(), 0UL);
}
// Send new resolution that removes child policy from service config.
- SetNextResolutionAllBalancers();
+ SetNextResolutionDefaultBalancer();
WaitForAllBackends();
CheckRpcSendOk(kNumRpcs, 3000 /* timeout_ms */, true /* wait_for_ready */);
// Check that every backend saw the same number of requests. This verifies
@@ -959,23 +946,24 @@ TEST_F(SingleBalancerTest, SwapChildPolicy) {
EXPECT_EQ(backends_[i]->service_.request_count(), 2UL);
}
// Done.
- balancers_[0]->service_.NotifyDoneWithServerlists();
+ balancer_->service_.ShutdownStream();
// The balancer got a single request.
- EXPECT_EQ(1U, balancers_[0]->service_.request_count());
+ EXPECT_EQ(1U, balancer_->service_.request_count());
// and sent a single response.
- EXPECT_EQ(1U, balancers_[0]->service_.response_count());
+ EXPECT_EQ(1U, balancer_->service_.response_count());
// Check LB policy name for the channel.
EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName());
}
-TEST_F(SingleBalancerTest, SameBackendListedMultipleTimes) {
- SetNextResolutionAllBalancers();
+TEST_F(GrpclbEnd2endTest, SameBackendListedMultipleTimes) {
+ CreateBackends(1);
+ SetNextResolutionDefaultBalancer();
// Same backend listed twice.
std::vector<int> ports;
ports.push_back(backends_[0]->port_);
ports.push_back(backends_[0]->port_);
const size_t kNumRpcsPerAddress = 10;
- ScheduleResponseForBalancer(0, BuildResponseForBackends(ports, {}), 0);
+ SendBalancerResponse(BuildResponseForBackends(ports, {}));
// We need to wait for the backend to come online.
WaitForBackend(0);
// Send kNumRpcsPerAddress RPCs per server.
@@ -985,281 +973,133 @@ TEST_F(SingleBalancerTest, SameBackendListedMultipleTimes) {
// And they should have come from a single client port, because of
// subchannel sharing.
EXPECT_EQ(1UL, backends_[0]->service_.clients().size());
- balancers_[0]->service_.NotifyDoneWithServerlists();
+ balancer_->service_.ShutdownStream();
}
-TEST_F(SingleBalancerTest, SecureNaming) {
- ResetStub(0, kApplicationTargetName_ + ";lb");
- SetNextResolution({AddressData{balancers_[0]->port_, "lb"}});
- const size_t kNumRpcsPerAddress = 100;
- ScheduleResponseForBalancer(
- 0, BuildResponseForBackends(GetBackendPorts(), {}), 0);
- // Make sure that trying to connect works without a call.
- channel_->GetState(true /* try_to_connect */);
+TEST_F(GrpclbEnd2endTest, SecureNaming) {
+ CreateBackends(1);
+ ResetStub(/*fallback_timeout_ms=*/0,
+ absl::StrCat(kApplicationTargetName, ";lb"));
+ SetNextResolutionFromEndpoints(
+ CreateAddressListFromPorts({balancer_->port_}, "lb"));
+ SendBalancerResponse(BuildResponseForBackends(GetBackendPorts(), {}));
// We need to wait for all backends to come online.
WaitForAllBackends();
- // Send kNumRpcsPerAddress RPCs per server.
- CheckRpcSendOk(kNumRpcsPerAddress * num_backends_);
-
- // Each backend should have gotten 100 requests.
- for (size_t i = 0; i < backends_.size(); ++i) {
- EXPECT_EQ(kNumRpcsPerAddress, backends_[i]->service_.request_count());
- }
- balancers_[0]->service_.NotifyDoneWithServerlists();
+ balancer_->service_.ShutdownStream();
// The balancer got a single request.
- EXPECT_EQ(1U, balancers_[0]->service_.request_count());
+ EXPECT_EQ(1U, balancer_->service_.request_count());
// and sent a single response.
- EXPECT_EQ(1U, balancers_[0]->service_.response_count());
+ EXPECT_EQ(1U, balancer_->service_.response_count());
// Check LB policy name for the channel.
EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName());
}
-TEST_F(SingleBalancerTest, InitiallyEmptyServerlist) {
- SetNextResolutionAllBalancers();
- const int kServerlistDelayMs = 500 * grpc_test_slowdown_factor();
- const int kCallDeadlineMs = kServerlistDelayMs * 10;
- // First response is an empty serverlist, sent right away.
- ScheduleResponseForBalancer(0, LoadBalanceResponse(), 0);
- // Send non-empty serverlist only after kServerlistDelayMs
- ScheduleResponseForBalancer(
- 0, BuildResponseForBackends(GetBackendPorts(), {}), kServerlistDelayMs);
- const auto t0 = system_clock::now();
- // Client will block: LB will initially send empty serverlist.
- CheckRpcSendOk(1, kCallDeadlineMs, true /* wait_for_ready */);
- const auto ellapsed_ms =
- std::chrono::duration_cast<std::chrono::milliseconds>(
- system_clock::now() - t0);
- // but eventually, the LB sends a serverlist update that allows the call to
- // proceed. The call delay must be larger than the delay in sending the
- // populated serverlist but under the call's deadline (which is enforced by
- // the call's deadline).
- EXPECT_GT(ellapsed_ms.count(), kServerlistDelayMs);
- balancers_[0]->service_.NotifyDoneWithServerlists();
+TEST_F(GrpclbEnd2endTest, InitiallyEmptyServerlist) {
+ CreateBackends(1);
+ SetNextResolutionDefaultBalancer();
+ // First response is an empty serverlist. RPCs should fail.
+ SendBalancerResponse(LoadBalanceResponse());
+ CheckRpcSendFailure();
+ // Now send a non-empty serverlist.
+ SendBalancerResponse(BuildResponseForBackends(GetBackendPorts(), {}));
+ CheckRpcSendOk(1, /*timeout_ms=*/3000, /*wait_for_ready=*/true);
+ balancer_->service_.ShutdownStream();
// The balancer got a single request.
- EXPECT_EQ(1U, balancers_[0]->service_.request_count());
+ EXPECT_EQ(1U, balancer_->service_.request_count());
// and sent two responses.
- EXPECT_EQ(2U, balancers_[0]->service_.response_count());
+ EXPECT_EQ(2U, balancer_->service_.response_count());
}
-TEST_F(SingleBalancerTest, AllServersUnreachableFailFast) {
- SetNextResolutionAllBalancers();
+TEST_F(GrpclbEnd2endTest, AllServersUnreachableFailFast) {
+ SetNextResolutionDefaultBalancer();
const size_t kNumUnreachableServers = 5;
std::vector<int> ports;
for (size_t i = 0; i < kNumUnreachableServers; ++i) {
ports.push_back(grpc_pick_unused_port_or_die());
}
- ScheduleResponseForBalancer(0, BuildResponseForBackends(ports, {}), 0);
+ SendBalancerResponse(BuildResponseForBackends(ports, {}));
const Status status = SendRpc();
// The error shouldn't be DEADLINE_EXCEEDED.
EXPECT_EQ(StatusCode::UNAVAILABLE, status.error_code());
- balancers_[0]->service_.NotifyDoneWithServerlists();
+ balancer_->service_.ShutdownStream();
// The balancer got a single request.
- EXPECT_EQ(1U, balancers_[0]->service_.request_count());
+ EXPECT_EQ(1U, balancer_->service_.request_count());
// and sent a single response.
- EXPECT_EQ(1U, balancers_[0]->service_.response_count());
+ EXPECT_EQ(1U, balancer_->service_.response_count());
}
-TEST_F(SingleBalancerTest, Fallback) {
- SetNextResolutionAllBalancers();
- const int kFallbackTimeoutMs = 200 * grpc_test_slowdown_factor();
- const int kServerlistDelayMs = 500 * grpc_test_slowdown_factor();
- const size_t kNumBackendsInResolution = backends_.size() / 2;
-
- ResetStub(kFallbackTimeoutMs);
- std::vector<AddressData> balancer_addresses;
- balancer_addresses.emplace_back(AddressData{balancers_[0]->port_, ""});
- std::vector<AddressData> backend_addresses;
- for (size_t i = 0; i < kNumBackendsInResolution; ++i) {
- backend_addresses.emplace_back(AddressData{backends_[i]->port_, ""});
- }
- SetNextResolution(balancer_addresses, backend_addresses);
-
- // Send non-empty serverlist only after kServerlistDelayMs.
- ScheduleResponseForBalancer(
- 0,
- BuildResponseForBackends(
- GetBackendPorts(kNumBackendsInResolution /* start_index */), {}),
- kServerlistDelayMs);
-
+TEST_F(GrpclbEnd2endTest, Fallback) {
+ const size_t kNumBackends = 4;
+ const size_t kNumBackendsInResolution = kNumBackends / 2;
+ CreateBackends(kNumBackends);
+ // Inject resolver result that contains the fallback backends.
+ SetNextResolution({balancer_->port_},
+ GetBackendPorts(0, kNumBackendsInResolution));
+ // Balancer has not sent a serverlist, so we should use fallback.
// Wait until all the fallback backends are reachable.
- for (size_t i = 0; i < kNumBackendsInResolution; ++i) {
- WaitForBackend(i);
- }
-
- // The first request.
- gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
- CheckRpcSendOk(kNumBackendsInResolution);
- gpr_log(GPR_INFO, "========= DONE WITH FIRST BATCH ==========");
-
- // Fallback is used: each backend returned by the resolver should have
- // gotten one request.
- for (size_t i = 0; i < kNumBackendsInResolution; ++i) {
- EXPECT_EQ(1U, backends_[i]->service_.request_count());
- }
- for (size_t i = kNumBackendsInResolution; i < backends_.size(); ++i) {
- EXPECT_EQ(0U, backends_[i]->service_.request_count());
- }
-
- // Wait until the serverlist reception has been processed and all backends
- // in the serverlist are reachable.
- for (size_t i = kNumBackendsInResolution; i < backends_.size(); ++i) {
- WaitForBackend(i);
- }
-
- // Send out the second request.
- gpr_log(GPR_INFO, "========= BEFORE SECOND BATCH ==========");
- CheckRpcSendOk(backends_.size() - kNumBackendsInResolution);
- gpr_log(GPR_INFO, "========= DONE WITH SECOND BATCH ==========");
-
- // Serverlist is used: each backend returned by the balancer should
- // have gotten one request.
- for (size_t i = 0; i < kNumBackendsInResolution; ++i) {
- EXPECT_EQ(0U, backends_[i]->service_.request_count());
- }
- for (size_t i = kNumBackendsInResolution; i < backends_.size(); ++i) {
- EXPECT_EQ(1U, backends_[i]->service_.request_count());
- }
-
- balancers_[0]->service_.NotifyDoneWithServerlists();
+ WaitForAllBackends(0, kNumBackendsInResolution,
+ WaitForBackendOptions().SetTimeoutSeconds(20));
+ // Send serverlist.
+ SendBalancerResponse(BuildResponseForBackends(
+ GetBackendPorts(/*start_index=*/kNumBackendsInResolution), {}));
+ // Now we should be using the backends from the balancer.
+ WaitForAllBackends(kNumBackendsInResolution);
+ balancer_->service_.ShutdownStream();
// The balancer got a single request.
- EXPECT_EQ(1U, balancers_[0]->service_.request_count());
+ EXPECT_EQ(1U, balancer_->service_.request_count());
// and sent a single response.
- EXPECT_EQ(1U, balancers_[0]->service_.response_count());
+ EXPECT_EQ(1U, balancer_->service_.response_count());
}
-TEST_F(SingleBalancerTest, FallbackUpdate) {
- SetNextResolutionAllBalancers();
- const int kFallbackTimeoutMs = 200 * grpc_test_slowdown_factor();
- const int kServerlistDelayMs = 500 * grpc_test_slowdown_factor();
- const size_t kNumBackendsInResolution = backends_.size() / 3;
- const size_t kNumBackendsInResolutionUpdate = backends_.size() / 3;
-
- ResetStub(kFallbackTimeoutMs);
- std::vector<AddressData> balancer_addresses;
- balancer_addresses.emplace_back(AddressData{balancers_[0]->port_, ""});
- std::vector<AddressData> backend_addresses;
- for (size_t i = 0; i < kNumBackendsInResolution; ++i) {
- backend_addresses.emplace_back(AddressData{backends_[i]->port_, ""});
- }
- SetNextResolution(balancer_addresses, backend_addresses);
-
- // Send non-empty serverlist only after kServerlistDelayMs.
- ScheduleResponseForBalancer(
- 0,
- BuildResponseForBackends(
- GetBackendPorts(kNumBackendsInResolution +
- kNumBackendsInResolutionUpdate /* start_index */),
- {}),
- kServerlistDelayMs);
-
+TEST_F(GrpclbEnd2endTest, FallbackUpdate) {
+ const size_t kNumBackends = 6;
+ const size_t kNumBackendsInResolution = kNumBackends / 3;
+ const size_t kNumBackendsInResolutionUpdate = kNumBackends / 3;
+ ResetStub(/*fallback_timeout_ms=*/500);
+ CreateBackends(kNumBackends);
+ // Inject resolver result with fallback addresses.
+ SetNextResolution({balancer_->port_},
+ GetBackendPorts(0, kNumBackendsInResolution));
+ // Balancer has not sent a serverlist, so we should use fallback.
// Wait until all the fallback backends are reachable.
- for (size_t i = 0; i < kNumBackendsInResolution; ++i) {
- WaitForBackend(i);
- }
-
- // The first request.
- gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
- CheckRpcSendOk(kNumBackendsInResolution);
- gpr_log(GPR_INFO, "========= DONE WITH FIRST BATCH ==========");
-
- // Fallback is used: each backend returned by the resolver should have
- // gotten one request.
- for (size_t i = 0; i < kNumBackendsInResolution; ++i) {
- EXPECT_EQ(1U, backends_[i]->service_.request_count());
- }
- for (size_t i = kNumBackendsInResolution; i < backends_.size(); ++i) {
- EXPECT_EQ(0U, backends_[i]->service_.request_count());
- }
-
- balancer_addresses.clear();
- balancer_addresses.emplace_back(AddressData{balancers_[0]->port_, ""});
- backend_addresses.clear();
- for (size_t i = kNumBackendsInResolution;
- i < kNumBackendsInResolution + kNumBackendsInResolutionUpdate; ++i) {
- backend_addresses.emplace_back(AddressData{backends_[i]->port_, ""});
- }
- SetNextResolution(balancer_addresses, backend_addresses);
-
- // Wait until the resolution update has been processed and all the new
- // fallback backends are reachable.
- for (size_t i = kNumBackendsInResolution;
- i < kNumBackendsInResolution + kNumBackendsInResolutionUpdate; ++i) {
- WaitForBackend(i);
- }
-
- // Send out the second request.
- gpr_log(GPR_INFO, "========= BEFORE SECOND BATCH ==========");
- CheckRpcSendOk(kNumBackendsInResolutionUpdate);
- gpr_log(GPR_INFO, "========= DONE WITH SECOND BATCH ==========");
-
- // The resolution update is used: each backend in the resolution update should
- // have gotten one request.
- for (size_t i = 0; i < kNumBackendsInResolution; ++i) {
- EXPECT_EQ(0U, backends_[i]->service_.request_count());
- }
- for (size_t i = kNumBackendsInResolution;
- i < kNumBackendsInResolution + kNumBackendsInResolutionUpdate; ++i) {
- EXPECT_EQ(1U, backends_[i]->service_.request_count());
- }
- for (size_t i = kNumBackendsInResolution + kNumBackendsInResolutionUpdate;
- i < backends_.size(); ++i) {
- EXPECT_EQ(0U, backends_[i]->service_.request_count());
- }
-
- // Wait until the serverlist reception has been processed and all backends
- // in the serverlist are reachable.
- for (size_t i = kNumBackendsInResolution + kNumBackendsInResolutionUpdate;
- i < backends_.size(); ++i) {
- WaitForBackend(i);
- }
-
- // Send out the third request.
- gpr_log(GPR_INFO, "========= BEFORE THIRD BATCH ==========");
- CheckRpcSendOk(backends_.size() - kNumBackendsInResolution -
- kNumBackendsInResolutionUpdate);
- gpr_log(GPR_INFO, "========= DONE WITH THIRD BATCH ==========");
-
- // Serverlist is used: each backend returned by the balancer should
- // have gotten one request.
- for (size_t i = 0;
- i < kNumBackendsInResolution + kNumBackendsInResolutionUpdate; ++i) {
- EXPECT_EQ(0U, backends_[i]->service_.request_count());
- }
- for (size_t i = kNumBackendsInResolution + kNumBackendsInResolutionUpdate;
- i < backends_.size(); ++i) {
- EXPECT_EQ(1U, backends_[i]->service_.request_count());
- }
-
- balancers_[0]->service_.NotifyDoneWithServerlists();
+ WaitForAllBackends(0, kNumBackendsInResolution);
+ // Now send a resolver result with a different set of backend addresses.
+ SetNextResolution({balancer_->port_},
+ GetBackendPorts(kNumBackendsInResolution,
+ kNumBackendsInResolution +
+ kNumBackendsInResolutionUpdate));
+ // Wait until the new fallback backends are reachable.
+ WaitForAllBackends(kNumBackendsInResolution,
+ kNumBackendsInResolution + kNumBackendsInResolutionUpdate);
+ // Send non-empty serverlist.
+ SendBalancerResponse(
+ BuildResponseForBackends(GetBackendPorts(kNumBackendsInResolution +
+ kNumBackendsInResolutionUpdate),
+ {}));
+ // Wait for backends from balancer to be seen.
+ WaitForAllBackends(kNumBackendsInResolution + kNumBackendsInResolutionUpdate);
+ balancer_->service_.ShutdownStream();
// The balancer got a single request.
- EXPECT_EQ(1U, balancers_[0]->service_.request_count());
+ EXPECT_EQ(1U, balancer_->service_.request_count());
// and sent a single response.
- EXPECT_EQ(1U, balancers_[0]->service_.response_count());
+ EXPECT_EQ(1U, balancer_->service_.response_count());
}
-TEST_F(SingleBalancerTest,
+TEST_F(GrpclbEnd2endTest,
FallbackAfterStartupLoseContactWithBalancerThenBackends) {
// First two backends are fallback, last two are pointed to by balancer.
+ const size_t kNumBackends = 4;
const size_t kNumFallbackBackends = 2;
- const size_t kNumBalancerBackends = backends_.size() - kNumFallbackBackends;
- std::vector<AddressData> backend_addresses;
- for (size_t i = 0; i < kNumFallbackBackends; ++i) {
- backend_addresses.emplace_back(AddressData{backends_[i]->port_, ""});
- }
- std::vector<AddressData> balancer_addresses;
- for (size_t i = 0; i < balancers_.size(); ++i) {
- balancer_addresses.emplace_back(AddressData{balancers_[i]->port_, ""});
- }
- SetNextResolution(balancer_addresses, backend_addresses);
- ScheduleResponseForBalancer(
- 0, BuildResponseForBackends(GetBackendPorts(kNumFallbackBackends), {}),
- 0);
+ const size_t kNumBalancerBackends = kNumBackends - kNumFallbackBackends;
+ CreateBackends(kNumBackends);
+ SetNextResolution({balancer_->port_},
+ GetBackendPorts(0, kNumFallbackBackends));
+ SendBalancerResponse(
+ BuildResponseForBackends(GetBackendPorts(kNumFallbackBackends), {}));
// Try to connect.
- channel_->GetState(true /* try_to_connect */);
- WaitForAllBackends(1 /* num_requests_multiple_of */,
- kNumFallbackBackends /* start_index */);
+ WaitForAllBackends(kNumFallbackBackends /* start_index */);
// Stop balancer. RPCs should continue going to backends from balancer.
- balancers_[0]->Shutdown();
+ balancer_->Shutdown();
CheckRpcSendOk(100 * kNumBalancerBackends);
for (size_t i = kNumFallbackBackends; i < backends_.size(); ++i) {
EXPECT_EQ(100UL, backends_[i]->service_.request_count());
@@ -1268,11 +1108,9 @@ TEST_F(SingleBalancerTest,
for (size_t i = kNumFallbackBackends; i < backends_.size(); ++i) {
ShutdownBackend(i);
}
- WaitForAllBackends(1 /* num_requests_multiple_of */, 0 /* start_index */,
- kNumFallbackBackends /* stop_index */);
+ WaitForAllBackends(0, kNumFallbackBackends);
// Restart the backends from the balancer. We should *not* start
- // sending traffic back to them at this point (although the behavior
- // in xds may be different).
+ // sending traffic back to them at this point.
for (size_t i = kNumFallbackBackends; i < backends_.size(); ++i) {
StartBackend(i);
}
@@ -1282,35 +1120,25 @@ TEST_F(SingleBalancerTest,
}
// Now start the balancer again. This should cause us to exit
// fallback mode.
- balancers_[0]->Start(server_host_);
- ScheduleResponseForBalancer(
- 0, BuildResponseForBackends(GetBackendPorts(kNumFallbackBackends), {}),
- 0);
- WaitForAllBackends(1 /* num_requests_multiple_of */,
- kNumFallbackBackends /* start_index */);
+ balancer_->Start();
+ SendBalancerResponse(
+ BuildResponseForBackends(GetBackendPorts(kNumFallbackBackends), {}));
+ WaitForAllBackends(kNumFallbackBackends);
}
-TEST_F(SingleBalancerTest,
+TEST_F(GrpclbEnd2endTest,
FallbackAfterStartupLoseContactWithBackendsThenBalancer) {
// First two backends are fallback, last two are pointed to by balancer.
+ const size_t kNumBackends = 4;
const size_t kNumFallbackBackends = 2;
- const size_t kNumBalancerBackends = backends_.size() - kNumFallbackBackends;
- std::vector<AddressData> backend_addresses;
- for (size_t i = 0; i < kNumFallbackBackends; ++i) {
- backend_addresses.emplace_back(AddressData{backends_[i]->port_, ""});
- }
- std::vector<AddressData> balancer_addresses;
- for (size_t i = 0; i < balancers_.size(); ++i) {
- balancer_addresses.emplace_back(AddressData{balancers_[i]->port_, ""});
- }
- SetNextResolution(balancer_addresses, backend_addresses);
- ScheduleResponseForBalancer(
- 0, BuildResponseForBackends(GetBackendPorts(kNumFallbackBackends), {}),
- 0);
+ const size_t kNumBalancerBackends = kNumBackends - kNumFallbackBackends;
+ CreateBackends(kNumBackends);
+ SetNextResolution({balancer_->port_},
+ GetBackendPorts(0, kNumFallbackBackends));
+ SendBalancerResponse(
+ BuildResponseForBackends(GetBackendPorts(kNumFallbackBackends), {}));
// Try to connect.
- channel_->GetState(true /* try_to_connect */);
- WaitForAllBackends(1 /* num_requests_multiple_of */,
- kNumFallbackBackends /* start_index */);
+ WaitForAllBackends(kNumFallbackBackends);
// Stop backends from balancer. Since we are still in contact with
// the balancer at this point, RPCs should be failing.
for (size_t i = kNumFallbackBackends; i < backends_.size(); ++i) {
@@ -1318,9 +1146,8 @@ TEST_F(SingleBalancerTest,
}
CheckRpcSendFailure();
// Stop balancer. This should put us in fallback mode.
- balancers_[0]->Shutdown();
- WaitForAllBackends(1 /* num_requests_multiple_of */, 0 /* start_index */,
- kNumFallbackBackends /* stop_index */);
+ balancer_->Shutdown();
+ WaitForAllBackends(0, kNumFallbackBackends);
// Restart the backends from the balancer. We should *not* start
// sending traffic back to them at this point (although the behavior
// in xds may be different).
@@ -1333,99 +1160,76 @@ TEST_F(SingleBalancerTest,
}
// Now start the balancer again. This should cause us to exit
// fallback mode.
- balancers_[0]->Start(server_host_);
- ScheduleResponseForBalancer(
- 0, BuildResponseForBackends(GetBackendPorts(kNumFallbackBackends), {}),
- 0);
- WaitForAllBackends(1 /* num_requests_multiple_of */,
- kNumFallbackBackends /* start_index */);
+ balancer_->Start();
+ SendBalancerResponse(
+ BuildResponseForBackends(GetBackendPorts(kNumFallbackBackends), {}));
+ WaitForAllBackends(kNumFallbackBackends);
}
-TEST_F(SingleBalancerTest, FallbackEarlyWhenBalancerChannelFails) {
- const int kFallbackTimeoutMs = 10000 * grpc_test_slowdown_factor();
+TEST_F(GrpclbEnd2endTest, FallbackEarlyWhenBalancerChannelFails) {
+ const int kFallbackTimeoutMs = 10000;
ResetStub(kFallbackTimeoutMs);
+ CreateBackends(1);
// Return an unreachable balancer and one fallback backend.
- std::vector<AddressData> balancer_addresses;
- balancer_addresses.emplace_back(
- AddressData{grpc_pick_unused_port_or_die(), ""});
- std::vector<AddressData> backend_addresses;
- backend_addresses.emplace_back(AddressData{backends_[0]->port_, ""});
- SetNextResolution(balancer_addresses, backend_addresses);
+ SetNextResolution({grpc_pick_unused_port_or_die()}, GetBackendPorts());
// Send RPC with deadline less than the fallback timeout and make sure it
// succeeds.
CheckRpcSendOk(/* times */ 1, /* timeout_ms */ 3000,
/* wait_for_ready */ false);
}
-TEST_F(SingleBalancerTest, FallbackEarlyWhenBalancerCallFails) {
- const int kFallbackTimeoutMs = 10000 * grpc_test_slowdown_factor();
+TEST_F(GrpclbEnd2endTest, FallbackEarlyWhenBalancerCallFails) {
+ const int kFallbackTimeoutMs = 10000;
ResetStub(kFallbackTimeoutMs);
+ CreateBackends(1);
// Return one balancer and one fallback backend.
- std::vector<AddressData> balancer_addresses;
- balancer_addresses.emplace_back(AddressData{balancers_[0]->port_, ""});
- std::vector<AddressData> backend_addresses;
- backend_addresses.emplace_back(AddressData{backends_[0]->port_, ""});
- SetNextResolution(balancer_addresses, backend_addresses);
+ SetNextResolution({balancer_->port_}, GetBackendPorts());
// Balancer drops call without sending a serverlist.
- balancers_[0]->service_.NotifyDoneWithServerlists();
+ balancer_->service_.ShutdownStream();
// Send RPC with deadline less than the fallback timeout and make sure it
// succeeds.
CheckRpcSendOk(/* times */ 1, /* timeout_ms */ 3000,
/* wait_for_ready */ false);
}
-TEST_F(SingleBalancerTest, FallbackControlledByBalancerBeforeFirstServerlist) {
- const int kFallbackTimeoutMs = 10000 * grpc_test_slowdown_factor();
+TEST_F(GrpclbEnd2endTest, FallbackControlledByBalancerBeforeFirstServerlist) {
+ const int kFallbackTimeoutMs = 10000;
ResetStub(kFallbackTimeoutMs);
+ CreateBackends(1);
// Return one balancer and one fallback backend.
- std::vector<AddressData> balancer_addresses;
- balancer_addresses.emplace_back(AddressData{balancers_[0]->port_, ""});
- std::vector<AddressData> backend_addresses;
- backend_addresses.emplace_back(AddressData{backends_[0]->port_, ""});
- SetNextResolution(balancer_addresses, backend_addresses);
+ SetNextResolution({balancer_->port_}, GetBackendPorts());
// Balancer explicitly tells client to fallback.
- LoadBalanceResponse resp;
- resp.mutable_fallback_response();
- ScheduleResponseForBalancer(0, resp, 0);
+ LoadBalanceResponse response;
+ response.mutable_fallback_response();
+ SendBalancerResponse(std::move(response));
// Send RPC with deadline less than the fallback timeout and make sure it
// succeeds.
CheckRpcSendOk(/* times */ 1, /* timeout_ms */ 3000,
/* wait_for_ready */ false);
}
-TEST_F(SingleBalancerTest, FallbackControlledByBalancerAfterFirstServerlist) {
+TEST_F(GrpclbEnd2endTest, FallbackControlledByBalancerAfterFirstServerlist) {
+ CreateBackends(2);
// Return one balancer and one fallback backend (backend 0).
- std::vector<AddressData> balancer_addresses;
- balancer_addresses.emplace_back(AddressData{balancers_[0]->port_, ""});
- std::vector<AddressData> backend_addresses;
- backend_addresses.emplace_back(AddressData{backends_[0]->port_, ""});
- SetNextResolution(balancer_addresses, backend_addresses);
- // Balancer initially sends serverlist, then tells client to fall back,
- // then sends the serverlist again.
- // The serverlist points to backend 1.
- LoadBalanceResponse serverlist_resp =
- BuildResponseForBackends({backends_[1]->port_}, {});
- LoadBalanceResponse fallback_resp;
- fallback_resp.mutable_fallback_response();
- ScheduleResponseForBalancer(0, serverlist_resp, 0);
- ScheduleResponseForBalancer(0, fallback_resp, 100);
- ScheduleResponseForBalancer(0, serverlist_resp, 100);
- // Requests initially go to backend 1, then go to backend 0 in
- // fallback mode, then go back to backend 1 when we exit fallback.
+ SetNextResolution({balancer_->port_}, {backends_[0]->port_});
+ // Balancer sends a serverlist pointing to backend 1.
+ SendBalancerResponse(BuildResponseForBackends({backends_[1]->port_}, {}));
WaitForBackend(1);
+ // Balancer tells client to fall back.
+ LoadBalanceResponse fallback_response;
+ fallback_response.mutable_fallback_response();
+ SendBalancerResponse(std::move(fallback_response));
WaitForBackend(0);
+ // Balancer sends a new serverlist, so client exits fallback.
+ SendBalancerResponse(BuildResponseForBackends({backends_[1]->port_}, {}));
WaitForBackend(1);
}
-TEST_F(SingleBalancerTest, BackendsRestart) {
- SetNextResolutionAllBalancers();
- const size_t kNumRpcsPerAddress = 100;
- ScheduleResponseForBalancer(
- 0, BuildResponseForBackends(GetBackendPorts(), {}), 0);
- // Make sure that trying to connect works without a call.
- channel_->GetState(true /* try_to_connect */);
- // Send kNumRpcsPerAddress RPCs per server.
- CheckRpcSendOk(kNumRpcsPerAddress * num_backends_);
+TEST_F(GrpclbEnd2endTest, BackendsRestart) {
+ CreateBackends(2);
+ SetNextResolutionDefaultBalancer();
+ SendBalancerResponse(BuildResponseForBackends(GetBackendPorts(), {}));
+ WaitForAllBackends();
// Stop backends. RPCs should fail.
ShutdownAllBackends();
CheckRpcSendFailure();
@@ -1434,12 +1238,12 @@ TEST_F(SingleBalancerTest, BackendsRestart) {
CheckRpcSendOk(1 /* times */, 3000 /* timeout_ms */,
true /* wait_for_ready */);
// The balancer got a single request.
- EXPECT_EQ(1U, balancers_[0]->service_.request_count());
+ EXPECT_EQ(1U, balancer_->service_.request_count());
// and sent a single response.
- EXPECT_EQ(1U, balancers_[0]->service_.response_count());
+ EXPECT_EQ(1U, balancer_->service_.response_count());
}
-TEST_F(SingleBalancerTest, ServiceNameFromLbPolicyConfig) {
+TEST_F(GrpclbEnd2endTest, ServiceNameFromLbPolicyConfig) {
constexpr char kServiceConfigWithTarget[] =
"{\n"
" \"loadBalancingConfig\":[\n"
@@ -1448,20 +1252,16 @@ TEST_F(SingleBalancerTest, ServiceNameFromLbPolicyConfig) {
" }}\n"
" ]\n"
"}";
-
- SetNextResolutionAllBalancers(kServiceConfigWithTarget);
- ScheduleResponseForBalancer(
- 0, BuildResponseForBackends(GetBackendPorts(), {}), 0);
- // Make sure that trying to connect works without a call.
- channel_->GetState(true /* try_to_connect */);
- // We need to wait for all backends to come online.
+ SetNextResolutionDefaultBalancer(kServiceConfigWithTarget);
+ CreateBackends(1);
+ SendBalancerResponse(BuildResponseForBackends(GetBackendPorts(), {}));
WaitForAllBackends();
- EXPECT_EQ(balancers_[0]->service_.service_names().back(), "test_service");
+ EXPECT_EQ(balancer_->service_.service_names().back(), "test_service");
}
// This death test is kept separate from the rest to ensure that it's run before
// any others. See https://github.com/grpc/grpc/pull/32269 for details.
-using SingleBalancerDeathTest = SingleBalancerTest;
+using SingleBalancerDeathTest = GrpclbEnd2endTest;
TEST_F(SingleBalancerDeathTest, SecureNaming) {
GTEST_FLAG_SET(death_test_style, "threadsafe");
@@ -1469,26 +1269,24 @@ TEST_F(SingleBalancerDeathTest, SecureNaming) {
// the name from the balancer doesn't match expectations.
ASSERT_DEATH_IF_SUPPORTED(
{
- ResetStub(0, kApplicationTargetName_ + ";lb");
- SetNextResolution({AddressData{balancers_[0]->port_, "woops"}});
+ ResetStub(/*fallback_timeout_ms=*/0,
+ absl::StrCat(kApplicationTargetName, ";lb"));
+ SetNextResolutionFromEndpoints(
+ CreateAddressListFromPorts({balancer_->port_}, "woops"));
channel_->WaitForConnected(grpc_timeout_seconds_to_deadline(1));
},
"");
}
-class UpdatesTest : public GrpclbEnd2endTest {
- public:
- UpdatesTest() : GrpclbEnd2endTest(4, 3, 0) {}
-};
-
-TEST_F(UpdatesTest, UpdateBalancersButKeepUsingOriginalBalancer) {
- SetNextResolutionAllBalancers();
+TEST_F(GrpclbEnd2endTest, UpdateBalancersButKeepUsingOriginalBalancer) {
+ SetNextResolutionDefaultBalancer();
+ CreateBackends(2);
const std::vector<int> first_backend{GetBackendPorts()[0]};
const std::vector<int> second_backend{GetBackendPorts()[1]};
- ScheduleResponseForBalancer(0, BuildResponseForBackends(first_backend, {}),
- 0);
- ScheduleResponseForBalancer(1, BuildResponseForBackends(second_backend, {}),
- 0);
+ SendBalancerResponse(BuildResponseForBackends(first_backend, {}));
+ auto balancer2 = CreateAndStartBalancer();
+ balancer2->service_.SendResponse(
+ BuildResponseForBackends(second_backend, {}));
// Wait until the first backend is ready.
WaitForBackend(0);
@@ -1501,19 +1299,14 @@ TEST_F(UpdatesTest, UpdateBalancersButKeepUsingOriginalBalancer) {
// All 10 requests should have gone to the first backend.
EXPECT_EQ(10U, backends_[0]->service_.request_count());
- // Balancer 0 got a single request.
- EXPECT_EQ(1U, balancers_[0]->service_.request_count());
- // and sent a single response.
- EXPECT_EQ(1U, balancers_[0]->service_.response_count());
- EXPECT_EQ(0U, balancers_[1]->service_.request_count());
- EXPECT_EQ(0U, balancers_[1]->service_.response_count());
- EXPECT_EQ(0U, balancers_[2]->service_.request_count());
- EXPECT_EQ(0U, balancers_[2]->service_.response_count());
-
- std::vector<AddressData> addresses;
- addresses.emplace_back(AddressData{balancers_[1]->port_, ""});
+ // Balancer 0 got a single request and sent a single response.
+ EXPECT_EQ(1U, balancer_->service_.request_count());
+ EXPECT_EQ(1U, balancer_->service_.response_count());
+ EXPECT_EQ(0U, balancer2->service_.request_count());
+ EXPECT_EQ(0U, balancer2->service_.response_count());
+
gpr_log(GPR_INFO, "========= ABOUT TO UPDATE 1 ==========");
- SetNextResolution(addresses);
+ SetNextResolution({balancer2->port_});
gpr_log(GPR_INFO, "========= UPDATE 1 DONE ==========");
EXPECT_EQ(0U, backends_[1]->service_.request_count());
@@ -1527,26 +1320,25 @@ TEST_F(UpdatesTest, UpdateBalancersButKeepUsingOriginalBalancer) {
// first balancer, which doesn't assign the second backend.
EXPECT_EQ(0U, backends_[1]->service_.request_count());
- EXPECT_EQ(1U, balancers_[0]->service_.request_count());
- EXPECT_EQ(1U, balancers_[0]->service_.response_count());
- EXPECT_EQ(0U, balancers_[1]->service_.request_count());
- EXPECT_EQ(0U, balancers_[1]->service_.response_count());
- EXPECT_EQ(0U, balancers_[2]->service_.request_count());
- EXPECT_EQ(0U, balancers_[2]->service_.response_count());
+ EXPECT_EQ(1U, balancer_->service_.request_count());
+ EXPECT_EQ(1U, balancer_->service_.response_count());
+ EXPECT_EQ(0U, balancer2->service_.request_count());
+ EXPECT_EQ(0U, balancer2->service_.response_count());
}
-// Send an update with the same set of LBs as the one in SetUp() in order to
+// Send an update with the same set of LBs as the previous one in order to
// verify that the LB channel inside grpclb keeps the initial connection (which
// by definition is also present in the update).
-TEST_F(UpdatesTest, UpdateBalancersRepeated) {
- SetNextResolutionAllBalancers();
+TEST_F(GrpclbEnd2endTest, UpdateBalancersRepeated) {
+ CreateBackends(2);
const std::vector<int> first_backend{GetBackendPorts()[0]};
- const std::vector<int> second_backend{GetBackendPorts()[0]};
+ const std::vector<int> second_backend{GetBackendPorts()[1]};
+ SendBalancerResponse(BuildResponseForBackends(first_backend, {}));
+ auto balancer2 = CreateAndStartBalancer();
+ balancer2->service_.SendResponse(
+ BuildResponseForBackends(second_backend, {}));
- ScheduleResponseForBalancer(0, BuildResponseForBackends(first_backend, {}),
- 0);
- ScheduleResponseForBalancer(1, BuildResponseForBackends(second_backend, {}),
- 0);
+ SetNextResolution({balancer_->port_, balancer2->port_});
// Wait until the first backend is ready.
WaitForBackend(0);
@@ -1559,22 +1351,15 @@ TEST_F(UpdatesTest, UpdateBalancersRepeated) {
// All 10 requests should have gone to the first backend.
EXPECT_EQ(10U, backends_[0]->service_.request_count());
- balancers_[0]->service_.NotifyDoneWithServerlists();
- // Balancer 0 got a single request.
- EXPECT_EQ(1U, balancers_[0]->service_.request_count());
- // and sent a single response.
- EXPECT_EQ(1U, balancers_[0]->service_.response_count());
- EXPECT_EQ(0U, balancers_[1]->service_.request_count());
- EXPECT_EQ(0U, balancers_[1]->service_.response_count());
- EXPECT_EQ(0U, balancers_[2]->service_.request_count());
- EXPECT_EQ(0U, balancers_[2]->service_.response_count());
-
- std::vector<AddressData> addresses;
- addresses.emplace_back(AddressData{balancers_[0]->port_, ""});
- addresses.emplace_back(AddressData{balancers_[1]->port_, ""});
- addresses.emplace_back(AddressData{balancers_[2]->port_, ""});
+ balancer_->service_.ShutdownStream();
+ // Balancer 0 got a single request and sent a single response.
+ EXPECT_EQ(1U, balancer_->service_.request_count());
+ EXPECT_EQ(1U, balancer_->service_.response_count());
+ EXPECT_EQ(0U, balancer2->service_.request_count());
+ EXPECT_EQ(0U, balancer2->service_.response_count());
+
gpr_log(GPR_INFO, "========= ABOUT TO UPDATE 1 ==========");
- SetNextResolution(addresses);
+ SetNextResolution({balancer_->port_, balancer2->port_});
gpr_log(GPR_INFO, "========= UPDATE 1 DONE ==========");
EXPECT_EQ(0U, backends_[1]->service_.request_count());
@@ -1587,39 +1372,19 @@ TEST_F(UpdatesTest, UpdateBalancersRepeated) {
// grpclb continued using the original LB call to the first balancer, which
// doesn't assign the second backend.
EXPECT_EQ(0U, backends_[1]->service_.request_count());
- balancers_[0]->service_.NotifyDoneWithServerlists();
-
- addresses.clear();
- addresses.emplace_back(AddressData{balancers_[0]->port_, ""});
- addresses.emplace_back(AddressData{balancers_[1]->port_, ""});
- gpr_log(GPR_INFO, "========= ABOUT TO UPDATE 2 ==========");
- SetNextResolution(addresses);
- gpr_log(GPR_INFO, "========= UPDATE 2 DONE ==========");
-
- EXPECT_EQ(0U, backends_[1]->service_.request_count());
- deadline = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
- gpr_time_from_millis(10000, GPR_TIMESPAN));
- // Send 10 seconds worth of RPCs
- do {
- CheckRpcSendOk();
- } while (gpr_time_cmp(gpr_now(GPR_CLOCK_REALTIME), deadline) < 0);
- // grpclb continued using the original LB call to the first balancer, which
- // doesn't assign the second backend.
- EXPECT_EQ(0U, backends_[1]->service_.request_count());
- balancers_[0]->service_.NotifyDoneWithServerlists();
+ balancer_->service_.ShutdownStream();
}
-TEST_F(UpdatesTest, UpdateBalancersDeadUpdate) {
- std::vector<AddressData> addresses;
- addresses.emplace_back(AddressData{balancers_[0]->port_, ""});
- SetNextResolution(addresses);
+TEST_F(GrpclbEnd2endTest, UpdateBalancersDeadUpdate) {
+ SetNextResolutionDefaultBalancer();
+ CreateBackends(2);
const std::vector<int> first_backend{GetBackendPorts()[0]};
const std::vector<int> second_backend{GetBackendPorts()[1]};
- ScheduleResponseForBalancer(0, BuildResponseForBackends(first_backend, {}),
- 0);
- ScheduleResponseForBalancer(1, BuildResponseForBackends(second_backend, {}),
- 0);
+ SendBalancerResponse(BuildResponseForBackends(first_backend, {}));
+ auto balancer2 = CreateAndStartBalancer();
+ balancer2->service_.SendResponse(
+ BuildResponseForBackends(second_backend, {}));
// Start servers and send 10 RPCs per server.
gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
@@ -1630,7 +1395,7 @@ TEST_F(UpdatesTest, UpdateBalancersDeadUpdate) {
// Kill balancer 0
gpr_log(GPR_INFO, "********** ABOUT TO KILL BALANCER 0 *************");
- balancers_[0]->Shutdown();
+ balancer_->Shutdown();
gpr_log(GPR_INFO, "********** KILLED BALANCER 0 *************");
// This is serviced by the existing RR policy
@@ -1642,18 +1407,14 @@ TEST_F(UpdatesTest, UpdateBalancersDeadUpdate) {
EXPECT_EQ(0U, backends_[1]->service_.request_count());
// Balancer 0 got a single request.
- EXPECT_EQ(1U, balancers_[0]->service_.request_count());
+ EXPECT_EQ(1U, balancer_->service_.request_count());
// and sent a single response.
- EXPECT_EQ(1U, balancers_[0]->service_.response_count());
- EXPECT_EQ(0U, balancers_[1]->service_.request_count());
- EXPECT_EQ(0U, balancers_[1]->service_.response_count());
- EXPECT_EQ(0U, balancers_[2]->service_.request_count());
- EXPECT_EQ(0U, balancers_[2]->service_.response_count());
-
- addresses.clear();
- addresses.emplace_back(AddressData{balancers_[1]->port_, ""});
+ EXPECT_EQ(1U, balancer_->service_.response_count());
+ EXPECT_EQ(0U, balancer2->service_.request_count());
+ EXPECT_EQ(0U, balancer2->service_.response_count());
+
gpr_log(GPR_INFO, "========= ABOUT TO UPDATE 1 ==========");
- SetNextResolution(addresses);
+ SetNextResolution({balancer2->port_});
gpr_log(GPR_INFO, "========= UPDATE 1 DONE ==========");
// Wait until update has been processed, as signaled by the second backend
@@ -1670,98 +1431,63 @@ TEST_F(UpdatesTest, UpdateBalancersDeadUpdate) {
// All 10 requests should have gone to the second backend.
EXPECT_EQ(10U, backends_[1]->service_.request_count());
- EXPECT_EQ(1U, balancers_[0]->service_.request_count());
- EXPECT_EQ(1U, balancers_[0]->service_.response_count());
+ EXPECT_EQ(1U, balancer_->service_.request_count());
+ EXPECT_EQ(1U, balancer_->service_.response_count());
// The second balancer, published as part of the first update, may end up
// getting two requests (that is, 1 <= #req <= 2) if the LB call retry timer
// firing races with the arrival of the update containing the second
// balancer.
- EXPECT_GE(balancers_[1]->service_.request_count(), 1U);
- EXPECT_GE(balancers_[1]->service_.response_count(), 1U);
- EXPECT_LE(balancers_[1]->service_.request_count(), 2U);
- EXPECT_LE(balancers_[1]->service_.response_count(), 2U);
- EXPECT_EQ(0U, balancers_[2]->service_.request_count());
- EXPECT_EQ(0U, balancers_[2]->service_.response_count());
+ EXPECT_GE(balancer2->service_.request_count(), 1U);
+ EXPECT_GE(balancer2->service_.response_count(), 1U);
+ EXPECT_LE(balancer2->service_.request_count(), 2U);
+ EXPECT_LE(balancer2->service_.response_count(), 2U);
}
-TEST_F(UpdatesTest, ReresolveDeadBackend) {
- ResetStub(500);
+TEST_F(GrpclbEnd2endTest, ReresolveDeadBackend) {
+ ResetStub(/*fallback_timeout_ms=*/500);
+ CreateBackends(2);
// The first resolution contains the addresses of a balancer that never
// responds, and a fallback backend.
- std::vector<AddressData> balancer_addresses;
- balancer_addresses.emplace_back(AddressData{balancers_[0]->port_, ""});
- std::vector<AddressData> backend_addresses;
- backend_addresses.emplace_back(AddressData{backends_[0]->port_, ""});
- SetNextResolution(balancer_addresses, backend_addresses);
- // Ask channel to connect to trigger resolver creation.
- channel_->GetState(true);
- // The re-resolution result will contain the addresses of the same balancer
- // and a new fallback backend.
- balancer_addresses.clear();
- balancer_addresses.emplace_back(AddressData{balancers_[0]->port_, ""});
- backend_addresses.clear();
- backend_addresses.emplace_back(AddressData{backends_[1]->port_, ""});
- SetNextReresolutionResponse(balancer_addresses, backend_addresses);
-
+ SetNextResolution({balancer_->port_}, {backends_[0]->port_});
// Start servers and send 10 RPCs per server.
gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
CheckRpcSendOk(10);
gpr_log(GPR_INFO, "========= DONE WITH FIRST BATCH ==========");
// All 10 requests should have gone to the fallback backend.
EXPECT_EQ(10U, backends_[0]->service_.request_count());
-
// Kill backend 0.
gpr_log(GPR_INFO, "********** ABOUT TO KILL BACKEND 0 *************");
backends_[0]->Shutdown();
gpr_log(GPR_INFO, "********** KILLED BACKEND 0 *************");
-
- // Wait until re-resolution has finished, as signaled by the second backend
+ // This should trigger re-resolution.
+ EXPECT_TRUE(response_generator_->WaitForReresolutionRequest(
+ absl::Seconds(5 * grpc_test_slowdown_factor())));
+ // The re-resolution result will contain the addresses of the same balancer
+ // and a new fallback backend.
+ SetNextResolution({balancer_->port_}, {backends_[1]->port_});
+ // Wait until re-resolution has been seen, as signaled by the second backend
// receiving a request.
WaitForBackend(1);
-
gpr_log(GPR_INFO, "========= BEFORE SECOND BATCH ==========");
CheckRpcSendOk(10);
gpr_log(GPR_INFO, "========= DONE WITH SECOND BATCH ==========");
// All 10 requests should have gone to the second backend.
EXPECT_EQ(10U, backends_[1]->service_.request_count());
-
- balancers_[0]->service_.NotifyDoneWithServerlists();
- balancers_[1]->service_.NotifyDoneWithServerlists();
- balancers_[2]->service_.NotifyDoneWithServerlists();
- EXPECT_EQ(1U, balancers_[0]->service_.request_count());
- EXPECT_EQ(0U, balancers_[0]->service_.response_count());
- EXPECT_EQ(0U, balancers_[1]->service_.request_count());
- EXPECT_EQ(0U, balancers_[1]->service_.response_count());
- EXPECT_EQ(0U, balancers_[2]->service_.request_count());
- EXPECT_EQ(0U, balancers_[2]->service_.response_count());
+ balancer_->service_.ShutdownStream();
+ EXPECT_EQ(1U, balancer_->service_.request_count());
+ EXPECT_EQ(0U, balancer_->service_.response_count());
}
-// TODO(juanlishen): Should be removed when the first response is always the
-// initial response. Currently, if client load reporting is not enabled, the
-// balancer doesn't send initial response. When the backend shuts down, an
-// unexpected re-resolution will happen. This test configuration is a workaround
-// for test ReresolveDeadBalancer.
-class UpdatesWithClientLoadReportingTest : public GrpclbEnd2endTest {
- public:
- UpdatesWithClientLoadReportingTest() : GrpclbEnd2endTest(4, 3, 2) {}
-};
-
-TEST_F(UpdatesWithClientLoadReportingTest, ReresolveDeadBalancer) {
+TEST_F(GrpclbEnd2endTest, ReresolveDeadBalancer) {
+ CreateBackends(2);
const std::vector<int> first_backend{GetBackendPorts()[0]};
const std::vector<int> second_backend{GetBackendPorts()[1]};
- ScheduleResponseForBalancer(0, BuildResponseForBackends(first_backend, {}),
- 0);
- ScheduleResponseForBalancer(1, BuildResponseForBackends(second_backend, {}),
- 0);
-
- // Ask channel to connect to trigger resolver creation.
- channel_->GetState(true);
- std::vector<AddressData> addresses;
- addresses.emplace_back(AddressData{balancers_[0]->port_, ""});
- SetNextResolution(addresses);
- addresses.clear();
- addresses.emplace_back(AddressData{balancers_[1]->port_, ""});
- SetNextReresolutionResponse(addresses);
+ SendBalancerResponse(BuildResponseForBackends(first_backend, {}));
+ auto balancer2 = CreateAndStartBalancer();
+ balancer2->service_.SendResponse(
+ BuildResponseForBackends(second_backend, {}));
+
+ SetNextResolutionDefaultBalancer();
// Start servers and send 10 RPCs per server.
gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
@@ -1770,28 +1496,24 @@ TEST_F(UpdatesWithClientLoadReportingTest, ReresolveDeadBalancer) {
// All 10 requests should have gone to the first backend.
EXPECT_EQ(10U, backends_[0]->service_.request_count());
- // Kill backend 0.
- gpr_log(GPR_INFO, "********** ABOUT TO KILL BACKEND 0 *************");
- backends_[0]->Shutdown();
- gpr_log(GPR_INFO, "********** KILLED BACKEND 0 *************");
-
- CheckRpcSendFailure();
-
- // Balancer 0 got a single request.
- EXPECT_EQ(1U, balancers_[0]->service_.request_count());
- // and sent a single response.
- EXPECT_EQ(1U, balancers_[0]->service_.response_count());
- EXPECT_EQ(0U, balancers_[1]->service_.request_count());
- EXPECT_EQ(0U, balancers_[1]->service_.response_count());
- EXPECT_EQ(0U, balancers_[2]->service_.request_count());
- EXPECT_EQ(0U, balancers_[2]->service_.response_count());
+ // Balancer 0 got a single request and sent a single request.
+ EXPECT_EQ(1U, balancer_->service_.request_count());
+ EXPECT_EQ(1U, balancer_->service_.response_count());
+ EXPECT_EQ(0U, balancer2->service_.request_count());
+ EXPECT_EQ(0U, balancer2->service_.response_count());
// Kill balancer 0.
gpr_log(GPR_INFO, "********** ABOUT TO KILL BALANCER 0 *************");
- balancers_[0]->Shutdown();
+ balancer_->Shutdown();
gpr_log(GPR_INFO, "********** KILLED BALANCER 0 *************");
- // Wait until re-resolution has finished, as signaled by the second backend
+ // This should trigger a re-resolution.
+ EXPECT_TRUE(response_generator_->WaitForReresolutionRequest(
+ absl::Seconds(5 * grpc_test_slowdown_factor())));
+ gpr_log(GPR_INFO, "********** SAW RE-RESOLUTION REQUEST *************");
+ // Re-resolution result switches to a new balancer.
+ SetNextResolution({balancer2->port_});
+ // Wait until re-resolution has been seen, as signaled by the second backend
// receiving a request.
WaitForBackend(1);
@@ -1802,42 +1524,31 @@ TEST_F(UpdatesWithClientLoadReportingTest, ReresolveDeadBalancer) {
// All 10 requests should have gone to the second backend.
EXPECT_EQ(10U, backends_[1]->service_.request_count());
- EXPECT_EQ(1U, balancers_[0]->service_.request_count());
- EXPECT_EQ(1U, balancers_[0]->service_.response_count());
- // After balancer 0 is killed, we restart an LB call immediately (because we
- // disconnect to a previously connected balancer). Although we will cancel
- // this call when the re-resolution update is done and another LB call restart
- // is needed, this old call may still succeed reaching the LB server if
- // re-resolution is slow. So balancer 1 may have received 2 requests and sent
- // 2 responses.
- EXPECT_GE(balancers_[1]->service_.request_count(), 1U);
- EXPECT_GE(balancers_[1]->service_.response_count(), 1U);
- EXPECT_LE(balancers_[1]->service_.request_count(), 2U);
- EXPECT_LE(balancers_[1]->service_.response_count(), 2U);
- EXPECT_EQ(0U, balancers_[2]->service_.request_count());
- EXPECT_EQ(0U, balancers_[2]->service_.response_count());
+ // First and second balancer should each have handled one request and
+ // sent one response.
+ EXPECT_EQ(1U, balancer_->service_.request_count());
+ EXPECT_EQ(1U, balancer_->service_.response_count());
+ EXPECT_EQ(1U, balancer2->service_.request_count());
+ EXPECT_EQ(1U, balancer2->service_.response_count());
}
-TEST_F(SingleBalancerTest, Drop) {
- SetNextResolutionAllBalancers();
+TEST_F(GrpclbEnd2endTest, Drop) {
const size_t kNumRpcsPerAddress = 100;
- const int num_of_drop_by_rate_limiting_addresses = 1;
- const int num_of_drop_by_load_balancing_addresses = 2;
- const int num_of_drop_addresses = num_of_drop_by_rate_limiting_addresses +
- num_of_drop_by_load_balancing_addresses;
- const int num_total_addresses = num_backends_ + num_of_drop_addresses;
- ScheduleResponseForBalancer(
- 0,
- BuildResponseForBackends(
- GetBackendPorts(),
- {{"rate_limiting", num_of_drop_by_rate_limiting_addresses},
- {"load_balancing", num_of_drop_by_load_balancing_addresses}}),
- 0);
+ const size_t kNumBackends = 2;
+ const int kNumDropRateLimiting = 1;
+ const int kNumDropLoadBalancing = 2;
+ const int kNumDropTotal = kNumDropRateLimiting + kNumDropLoadBalancing;
+ const int kNumAddressesTotal = kNumBackends + kNumDropTotal;
+ SetNextResolutionDefaultBalancer();
+ CreateBackends(kNumBackends);
+ SendBalancerResponse(BuildResponseForBackends(
+ GetBackendPorts(), {{"rate_limiting", kNumDropRateLimiting},
+ {"load_balancing", kNumDropLoadBalancing}}));
// Wait until all backends are ready.
WaitForAllBackends();
// Send kNumRpcsPerAddress RPCs for each server and drop address.
size_t num_drops = 0;
- for (size_t i = 0; i < kNumRpcsPerAddress * num_total_addresses; ++i) {
+ for (size_t i = 0; i < kNumRpcsPerAddress * kNumAddressesTotal; ++i) {
EchoResponse response;
const Status status = SendRpc(&response);
if (!status.ok() &&
@@ -1846,52 +1557,41 @@ TEST_F(SingleBalancerTest, Drop) {
} else {
EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
<< " message=" << status.error_message();
- EXPECT_EQ(response.message(), kRequestMessage_);
+ EXPECT_EQ(response.message(), kRequestMessage);
}
}
- EXPECT_EQ(kNumRpcsPerAddress * num_of_drop_addresses, num_drops);
+ EXPECT_EQ(kNumRpcsPerAddress * kNumDropTotal, num_drops);
// Each backend should have gotten 100 requests.
for (size_t i = 0; i < backends_.size(); ++i) {
EXPECT_EQ(kNumRpcsPerAddress, backends_[i]->service_.request_count());
}
// The balancer got a single request.
- EXPECT_EQ(1U, balancers_[0]->service_.request_count());
+ EXPECT_EQ(1U, balancer_->service_.request_count());
// and sent a single response.
- EXPECT_EQ(1U, balancers_[0]->service_.response_count());
+ EXPECT_EQ(1U, balancer_->service_.response_count());
}
-TEST_F(SingleBalancerTest, DropAllFirst) {
- SetNextResolutionAllBalancers();
+TEST_F(GrpclbEnd2endTest, DropAllFirst) {
+ SetNextResolutionDefaultBalancer();
// All registered addresses are marked as "drop".
- const int num_of_drop_by_rate_limiting_addresses = 1;
- const int num_of_drop_by_load_balancing_addresses = 1;
- ScheduleResponseForBalancer(
- 0,
- BuildResponseForBackends(
- {}, {{"rate_limiting", num_of_drop_by_rate_limiting_addresses},
- {"load_balancing", num_of_drop_by_load_balancing_addresses}}),
- 0);
+ const int kNumDropRateLimiting = 1;
+ const int kNumDropLoadBalancing = 1;
+ SendBalancerResponse(BuildResponseForBackends(
+ {}, {{"rate_limiting", kNumDropRateLimiting},
+ {"load_balancing", kNumDropLoadBalancing}}));
const Status status = SendRpc(nullptr, 3000, true);
EXPECT_FALSE(status.ok());
EXPECT_EQ(status.error_message(), "drop directed by grpclb balancer");
}
-TEST_F(SingleBalancerTest, DropAll) {
- SetNextResolutionAllBalancers();
- ScheduleResponseForBalancer(
- 0, BuildResponseForBackends(GetBackendPorts(), {}), 0);
- const int num_of_drop_by_rate_limiting_addresses = 1;
- const int num_of_drop_by_load_balancing_addresses = 1;
- ScheduleResponseForBalancer(
- 0,
- BuildResponseForBackends(
- {}, {{"rate_limiting", num_of_drop_by_rate_limiting_addresses},
- {"load_balancing", num_of_drop_by_load_balancing_addresses}}),
- 1000);
-
- // First call succeeds.
+TEST_F(GrpclbEnd2endTest, DropAll) {
+ CreateBackends(1);
+ SetNextResolutionDefaultBalancer();
+ SendBalancerResponse(BuildResponseForBackends(GetBackendPorts(), {}));
CheckRpcSendOk();
- // But eventually, the update with only dropped servers is processed and calls
+ SendBalancerResponse(BuildResponseForBackends(
+ {}, {{"rate_limiting", 1}, {"load_balancing", 1}}));
+ // Eventually, the update with only dropped servers is processed, and calls
// fail.
Status status;
do {
@@ -1901,66 +1601,61 @@ TEST_F(SingleBalancerTest, DropAll) {
EXPECT_EQ(status.error_message(), "drop directed by grpclb balancer");
}
-class SingleBalancerWithClientLoadReportingTest : public GrpclbEnd2endTest {
- public:
- SingleBalancerWithClientLoadReportingTest() : GrpclbEnd2endTest(4, 1, 3) {}
-};
-
-TEST_F(SingleBalancerWithClientLoadReportingTest, Vanilla) {
- SetNextResolutionAllBalancers();
+TEST_F(GrpclbEnd2endTest, ClientLoadReporting) {
+ const size_t kNumBackends = 3;
+ CreateBackends(kNumBackends);
+ balancer_->service_.set_client_load_reporting_interval_seconds(3);
+ SetNextResolutionDefaultBalancer();
const size_t kNumRpcsPerAddress = 100;
- ScheduleResponseForBalancer(
- 0, BuildResponseForBackends(GetBackendPorts(), {}), 0);
+ SendBalancerResponse(BuildResponseForBackends(GetBackendPorts(), {}));
// Wait until all backends are ready.
int num_ok = 0;
int num_failure = 0;
int num_drops = 0;
std::tie(num_ok, num_failure, num_drops) = WaitForAllBackends();
// Send kNumRpcsPerAddress RPCs per server.
- CheckRpcSendOk(kNumRpcsPerAddress * num_backends_);
+ CheckRpcSendOk(kNumRpcsPerAddress * kNumBackends);
// Each backend should have gotten 100 requests.
for (size_t i = 0; i < backends_.size(); ++i) {
EXPECT_EQ(kNumRpcsPerAddress, backends_[i]->service_.request_count());
}
- balancers_[0]->service_.NotifyDoneWithServerlists();
+ balancer_->service_.ShutdownStream();
// The balancer got a single request.
- EXPECT_EQ(1U, balancers_[0]->service_.request_count());
+ EXPECT_EQ(1U, balancer_->service_.request_count());
// and sent a single response.
- EXPECT_EQ(1U, balancers_[0]->service_.response_count());
-
+ EXPECT_EQ(1U, balancer_->service_.response_count());
ClientStats client_stats;
do {
client_stats += WaitForLoadReports();
} while (client_stats.num_calls_finished !=
- kNumRpcsPerAddress * num_backends_ + num_ok);
- EXPECT_EQ(kNumRpcsPerAddress * num_backends_ + num_ok,
+ kNumRpcsPerAddress * kNumBackends + num_ok);
+ EXPECT_EQ(kNumRpcsPerAddress * kNumBackends + num_ok,
client_stats.num_calls_started);
- EXPECT_EQ(kNumRpcsPerAddress * num_backends_ + num_ok,
+ EXPECT_EQ(kNumRpcsPerAddress * kNumBackends + num_ok,
client_stats.num_calls_finished);
EXPECT_EQ(0U, client_stats.num_calls_finished_with_client_failed_to_send);
- EXPECT_EQ(kNumRpcsPerAddress * num_backends_ + (num_ok + num_drops),
+ EXPECT_EQ(kNumRpcsPerAddress * kNumBackends + (num_ok + num_drops),
client_stats.num_calls_finished_known_received);
EXPECT_THAT(client_stats.drop_token_counts, ::testing::ElementsAre());
}
-TEST_F(SingleBalancerWithClientLoadReportingTest, BalancerRestart) {
- SetNextResolutionAllBalancers();
+TEST_F(GrpclbEnd2endTest, LoadReportingWithBalancerRestart) {
+ const size_t kNumBackends = 4;
const size_t kNumBackendsFirstPass = 2;
- const size_t kNumBackendsSecondPass =
- backends_.size() - kNumBackendsFirstPass;
+ const size_t kNumBackendsSecondPass = kNumBackends - kNumBackendsFirstPass;
+ CreateBackends(kNumBackends);
+ balancer_->service_.set_client_load_reporting_interval_seconds(3);
+ SetNextResolutionDefaultBalancer();
// Balancer returns backends starting at index 1.
- ScheduleResponseForBalancer(
- 0,
- BuildResponseForBackends(GetBackendPorts(0, kNumBackendsFirstPass), {}),
- 0);
+ SendBalancerResponse(
+ BuildResponseForBackends(GetBackendPorts(0, kNumBackendsFirstPass), {}));
// Wait until all backends returned by the balancer are ready.
int num_ok = 0;
int num_failure = 0;
int num_drops = 0;
std::tie(num_ok, num_failure, num_drops) =
- WaitForAllBackends(/* num_requests_multiple_of */ 1, /* start_index */ 0,
- /* stop_index */ kNumBackendsFirstPass);
- balancers_[0]->service_.NotifyDoneWithServerlists();
+ WaitForAllBackends(0, kNumBackendsFirstPass);
+ balancer_->service_.ShutdownStream();
ClientStats client_stats = WaitForLoadReports();
EXPECT_EQ(static_cast<size_t>(num_ok), client_stats.num_calls_started);
EXPECT_EQ(static_cast<size_t>(num_ok), client_stats.num_calls_finished);
@@ -1969,7 +1664,7 @@ TEST_F(SingleBalancerWithClientLoadReportingTest, BalancerRestart) {
client_stats.num_calls_finished_known_received);
EXPECT_THAT(client_stats.drop_token_counts, ::testing::ElementsAre());
// Shut down the balancer.
- balancers_[0]->Shutdown();
+ balancer_->Shutdown();
// Send 10 more requests per backend. This will continue using the
// last serverlist we received from the balancer before it was shut down.
ResetBackendCounters();
@@ -1979,10 +1674,9 @@ TEST_F(SingleBalancerWithClientLoadReportingTest, BalancerRestart) {
EXPECT_EQ(1UL, backends_[i]->service_.request_count());
}
// Now restart the balancer, this time pointing to all backends.
- balancers_[0]->Start(server_host_);
- ScheduleResponseForBalancer(
- 0, BuildResponseForBackends(GetBackendPorts(kNumBackendsFirstPass), {}),
- 0);
+ balancer_->Start();
+ SendBalancerResponse(
+ BuildResponseForBackends(GetBackendPorts(kNumBackendsFirstPass), {}));
// Wait for queries to start going to one of the new backends.
// This tells us that we're now using the new serverlist.
do {
@@ -1991,7 +1685,7 @@ TEST_F(SingleBalancerWithClientLoadReportingTest, BalancerRestart) {
backends_[3]->service_.request_count() == 0);
// Send one RPC per backend.
CheckRpcSendOk(kNumBackendsSecondPass);
- balancers_[0]->service_.NotifyDoneWithServerlists();
+ balancer_->service_.ShutdownStream();
// Check client stats.
client_stats = WaitForLoadReports();
EXPECT_EQ(kNumBackendsSecondPass + 1, client_stats.num_calls_started);
@@ -2002,31 +1696,31 @@ TEST_F(SingleBalancerWithClientLoadReportingTest, BalancerRestart) {
EXPECT_THAT(client_stats.drop_token_counts, ::testing::ElementsAre());
}
-TEST_F(SingleBalancerWithClientLoadReportingTest, Drop) {
- SetNextResolutionAllBalancers();
+TEST_F(GrpclbEnd2endTest, LoadReportingWithDrops) {
+ const size_t kNumBackends = 3;
const size_t kNumRpcsPerAddress = 3;
- const int num_of_drop_by_rate_limiting_addresses = 2;
- const int num_of_drop_by_load_balancing_addresses = 1;
- const int num_of_drop_addresses = num_of_drop_by_rate_limiting_addresses +
- num_of_drop_by_load_balancing_addresses;
- const int num_total_addresses = num_backends_ + num_of_drop_addresses;
- ScheduleResponseForBalancer(
- 0,
- BuildResponseForBackends(
- GetBackendPorts(),
- {{"rate_limiting", num_of_drop_by_rate_limiting_addresses},
- {"load_balancing", num_of_drop_by_load_balancing_addresses}}),
- 0);
+ const int kNumDropRateLimiting = 2;
+ const int kNumDropLoadBalancing = 1;
+ const int kNumDropTotal = kNumDropRateLimiting + kNumDropLoadBalancing;
+ const int kNumAddressesTotal = kNumBackends + kNumDropTotal;
+ CreateBackends(kNumBackends);
+ balancer_->service_.set_client_load_reporting_interval_seconds(3);
+ SetNextResolutionDefaultBalancer();
+ SendBalancerResponse(BuildResponseForBackends(
+ GetBackendPorts(), {{"rate_limiting", kNumDropRateLimiting},
+ {"load_balancing", kNumDropLoadBalancing}}));
// Wait until all backends are ready.
int num_warmup_ok = 0;
int num_warmup_failure = 0;
int num_warmup_drops = 0;
std::tie(num_warmup_ok, num_warmup_failure, num_warmup_drops) =
- WaitForAllBackends(num_total_addresses /* num_requests_multiple_of */);
+ WaitForAllBackends(
+ 0, kNumBackends,
+ WaitForBackendOptions().SetNumRequestsMultipleOf(kNumAddressesTotal));
const int num_total_warmup_requests =
num_warmup_ok + num_warmup_failure + num_warmup_drops;
size_t num_drops = 0;
- for (size_t i = 0; i < kNumRpcsPerAddress * num_total_addresses; ++i) {
+ for (size_t i = 0; i < kNumRpcsPerAddress * kNumAddressesTotal; ++i) {
EchoResponse response;
const Status status = SendRpc(&response);
if (!status.ok() &&
@@ -2035,35 +1729,32 @@ TEST_F(SingleBalancerWithClientLoadReportingTest, Drop) {
} else {
EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
<< " message=" << status.error_message();
- EXPECT_EQ(response.message(), kRequestMessage_);
+ EXPECT_EQ(response.message(), kRequestMessage);
}
}
- EXPECT_EQ(kNumRpcsPerAddress * num_of_drop_addresses, num_drops);
+ EXPECT_EQ(kNumRpcsPerAddress * kNumDropTotal, num_drops);
// Each backend should have gotten 100 requests.
for (size_t i = 0; i < backends_.size(); ++i) {
EXPECT_EQ(kNumRpcsPerAddress, backends_[i]->service_.request_count());
}
- balancers_[0]->service_.NotifyDoneWithServerlists();
+ balancer_->service_.ShutdownStream();
// The balancer got a single request.
- EXPECT_EQ(1U, balancers_[0]->service_.request_count());
+ EXPECT_EQ(1U, balancer_->service_.request_count());
// and sent a single response.
- EXPECT_EQ(1U, balancers_[0]->service_.response_count());
+ EXPECT_EQ(1U, balancer_->service_.response_count());
const ClientStats client_stats = WaitForLoadReports();
- EXPECT_EQ(
- kNumRpcsPerAddress * num_total_addresses + num_total_warmup_requests,
- client_stats.num_calls_started);
- EXPECT_EQ(
- kNumRpcsPerAddress * num_total_addresses + num_total_warmup_requests,
- client_stats.num_calls_finished);
+ EXPECT_EQ(kNumRpcsPerAddress * kNumAddressesTotal + num_total_warmup_requests,
+ client_stats.num_calls_started);
+ EXPECT_EQ(kNumRpcsPerAddress * kNumAddressesTotal + num_total_warmup_requests,
+ client_stats.num_calls_finished);
EXPECT_EQ(0U, client_stats.num_calls_finished_with_client_failed_to_send);
- EXPECT_EQ(kNumRpcsPerAddress * num_backends_ + num_warmup_ok,
+ EXPECT_EQ(kNumRpcsPerAddress * kNumBackends + num_warmup_ok,
client_stats.num_calls_finished_known_received);
// The number of warmup request is a multiple of the number of addresses.
// Therefore, all addresses in the scheduled balancer response are hit the
// same number of times.
- const int num_times_drop_addresses_hit =
- num_warmup_drops / num_of_drop_addresses;
+ const int num_times_drop_addresses_hit = num_warmup_drops / kNumDropTotal;
EXPECT_THAT(
client_stats.drop_token_counts,
::testing::ElementsAre(