aboutsummaryrefslogtreecommitdiff
path: root/src/core/ext/filters/client_channel/lb_policy/weighted_round_robin/weighted_round_robin.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/ext/filters/client_channel/lb_policy/weighted_round_robin/weighted_round_robin.cc')
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/weighted_round_robin/weighted_round_robin.cc164
1 files changed, 85 insertions, 79 deletions
diff --git a/src/core/ext/filters/client_channel/lb_policy/weighted_round_robin/weighted_round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/weighted_round_robin/weighted_round_robin.cc
index 0b877298ad..2bcf45a211 100644
--- a/src/core/ext/filters/client_channel/lb_policy/weighted_round_robin/weighted_round_robin.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/weighted_round_robin/weighted_round_robin.cc
@@ -32,6 +32,7 @@
#include <vector>
#include "absl/base/thread_annotations.h"
+#include "absl/meta/type_traits.h"
#include "absl/random/random.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
@@ -73,6 +74,7 @@
#include "src/core/lib/load_balancing/lb_policy.h"
#include "src/core/lib/load_balancing/lb_policy_factory.h"
#include "src/core/lib/load_balancing/subchannel_interface.h"
+#include "src/core/lib/resolver/endpoint_addresses.h"
#include "src/core/lib/resolver/server_address.h"
#include "src/core/lib/transport/connectivity_state.h"
@@ -245,14 +247,13 @@ class OldWeightedRoundRobin : public LoadBalancingPolicy {
WeightedRoundRobinSubchannelData> {
public:
WeightedRoundRobinSubchannelList(OldWeightedRoundRobin* policy,
- ServerAddressList addresses,
+ EndpointAddressesIterator* addresses,
const ChannelArgs& args)
: SubchannelList(policy,
(GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace)
? "WeightedRoundRobinSubchannelList"
: nullptr),
- std::move(addresses), policy->channel_control_helper(),
- args) {
+ addresses, policy->channel_control_helper(), args) {
// Need to maintain a ref to the LB policy as long as we maintain
// any references to subchannels, since the subchannels'
// pollset_sets will include the LB policy's pollset_set.
@@ -609,10 +610,9 @@ void OldWeightedRoundRobin::Picker::BuildSchedulerAndStartTimerLocked() {
scheduler_ = std::move(scheduler);
}
// Start timer.
- WeakRefCountedPtr<Picker> self = WeakRef();
timer_handle_ = wrr_->channel_control_helper()->GetEventEngine()->RunAfter(
config_->weight_update_period(),
- [self = std::move(self),
+ [self = WeakRefAsSubclass<Picker>(),
work_serializer = wrr_->work_serializer()]() mutable {
ApplicationCallbackExecCtx callback_exec_ctx;
ExecCtx exec_ctx;
@@ -672,12 +672,11 @@ void OldWeightedRoundRobin::ResetBackoffLocked() {
absl::Status OldWeightedRoundRobin::UpdateLocked(UpdateArgs args) {
global_stats().IncrementWrrUpdates();
- config_ = std::move(args.config);
- ServerAddressList addresses;
+ config_ = args.config.TakeAsSubclass<WeightedRoundRobinConfig>();
+ std::shared_ptr<EndpointAddressesIterator> addresses;
if (args.addresses.ok()) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace)) {
- gpr_log(GPR_INFO, "[WRR %p] received update with %" PRIuPTR " addresses",
- this, args.addresses->size());
+ gpr_log(GPR_INFO, "[WRR %p] received update", this);
}
// Weed out duplicate addresses. Also sort the addresses so that if
// the set of the addresses don't change, their indexes in the
@@ -696,10 +695,12 @@ absl::Status OldWeightedRoundRobin::UpdateLocked(UpdateArgs args) {
return memcmp(addr1.addr, addr2.addr, addr1.len) < 0;
}
};
- std::set<ServerAddress, AddressLessThan> ordered_addresses(
- args.addresses->begin(), args.addresses->end());
- addresses =
- ServerAddressList(ordered_addresses.begin(), ordered_addresses.end());
+ std::set<ServerAddress, AddressLessThan> ordered_addresses;
+ (*args.addresses)->ForEach([&](const EndpointAddresses& endpoint) {
+ ordered_addresses.insert(endpoint);
+ });
+ addresses = std::make_shared<EndpointAddressesListIterator>(
+ ServerAddressList(ordered_addresses.begin(), ordered_addresses.end()));
} else {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace)) {
gpr_log(GPR_INFO, "[WRR %p] received update with address error: %s", this,
@@ -716,8 +717,8 @@ absl::Status OldWeightedRoundRobin::UpdateLocked(UpdateArgs args) {
this, latest_pending_subchannel_list_.get());
}
latest_pending_subchannel_list_ =
- MakeRefCounted<WeightedRoundRobinSubchannelList>(
- this, std::move(addresses), args.args);
+ MakeRefCounted<WeightedRoundRobinSubchannelList>(this, addresses.get(),
+ args.args);
latest_pending_subchannel_list_->StartWatchingLocked(args.args);
// If the new list is empty, immediately promote it to
// subchannel_list_ and report TRANSIENT_FAILURE.
@@ -755,8 +756,9 @@ OldWeightedRoundRobin::GetOrCreateWeight(const grpc_resolved_address& address) {
auto weight = it->second->RefIfNonZero();
if (weight != nullptr) return weight;
}
- auto weight =
- MakeRefCounted<AddressWeight>(Ref(DEBUG_LOCATION, "AddressWeight"), *key);
+ auto weight = MakeRefCounted<AddressWeight>(
+ RefAsSubclass<OldWeightedRoundRobin>(DEBUG_LOCATION, "AddressWeight"),
+ *key);
address_weight_map_.emplace(*key, weight.get());
return weight;
}
@@ -831,7 +833,8 @@ void OldWeightedRoundRobin::WeightedRoundRobinSubchannelList::
}
p->channel_control_helper()->UpdateState(
GRPC_CHANNEL_READY, absl::Status(),
- MakeRefCounted<Picker>(p->Ref(), this));
+ MakeRefCounted<Picker>(p->RefAsSubclass<OldWeightedRoundRobin>(),
+ this));
} else if (num_connecting_ > 0) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace)) {
gpr_log(GPR_INFO, "[WRR %p] reporting CONNECTING with subchannel list %p",
@@ -1009,7 +1012,8 @@ class WeightedRoundRobin : public LoadBalancingPolicy {
// Represents the weight for a given address.
class EndpointWeight : public RefCounted<EndpointWeight> {
public:
- EndpointWeight(RefCountedPtr<WeightedRoundRobin> wrr, std::string key)
+ EndpointWeight(RefCountedPtr<WeightedRoundRobin> wrr,
+ EndpointAddressSet key)
: wrr_(std::move(wrr)), key_(std::move(key)) {}
~EndpointWeight() override;
@@ -1023,7 +1027,7 @@ class WeightedRoundRobin : public LoadBalancingPolicy {
private:
RefCountedPtr<WeightedRoundRobin> wrr_;
- const std::string key_;
+ const EndpointAddressSet key_;
Mutex mu_;
float weight_ ABSL_GUARDED_BY(&mu_) = 0;
@@ -1035,13 +1039,13 @@ class WeightedRoundRobin : public LoadBalancingPolicy {
public:
class WrrEndpoint : public Endpoint {
public:
- WrrEndpoint(RefCountedPtr<WrrEndpointList> endpoint_list,
- const ServerAddress& address, const ChannelArgs& args,
+ WrrEndpoint(RefCountedPtr<EndpointList> endpoint_list,
+ const EndpointAddresses& addresses, const ChannelArgs& args,
std::shared_ptr<WorkSerializer> work_serializer)
: Endpoint(std::move(endpoint_list)),
weight_(policy<WeightedRoundRobin>()->GetOrCreateWeight(
- address.address())) {
- Init(address, args, std::move(work_serializer));
+ addresses.addresses())) {
+ Init(addresses, args, std::move(work_serializer));
}
RefCountedPtr<EndpointWeight> weight() const { return weight_; }
@@ -1063,7 +1067,9 @@ class WeightedRoundRobin : public LoadBalancingPolicy {
};
RefCountedPtr<SubchannelInterface> CreateSubchannel(
- ServerAddress address, const ChannelArgs& args) override;
+ const grpc_resolved_address& address,
+ const ChannelArgs& per_address_args,
+ const ChannelArgs& args) override;
// Called when the child policy reports a connectivity state update.
void OnStateUpdate(absl::optional<grpc_connectivity_state> old_state,
@@ -1074,16 +1080,17 @@ class WeightedRoundRobin : public LoadBalancingPolicy {
};
WrrEndpointList(RefCountedPtr<WeightedRoundRobin> wrr,
- const ServerAddressList& addresses, const ChannelArgs& args)
+ EndpointAddressesIterator* endpoints,
+ const ChannelArgs& args)
: EndpointList(std::move(wrr),
GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace)
? "WrrEndpointList"
: nullptr) {
- Init(addresses, args,
- [&](RefCountedPtr<WrrEndpointList> endpoint_list,
- const ServerAddress& address, const ChannelArgs& args) {
+ Init(endpoints, args,
+ [&](RefCountedPtr<EndpointList> endpoint_list,
+ const EndpointAddresses& addresses, const ChannelArgs& args) {
return MakeOrphanable<WrrEndpoint>(
- std::move(endpoint_list), address, args,
+ std::move(endpoint_list), addresses, args,
policy<WeightedRoundRobin>()->work_serializer());
});
}
@@ -1192,7 +1199,7 @@ class WeightedRoundRobin : public LoadBalancingPolicy {
void ShutdownLocked() override;
RefCountedPtr<EndpointWeight> GetOrCreateWeight(
- const grpc_resolved_address& address);
+ const std::vector<grpc_resolved_address>& addresses);
RefCountedPtr<WeightedRoundRobinConfig> config_;
@@ -1205,7 +1212,7 @@ class WeightedRoundRobin : public LoadBalancingPolicy {
OrphanablePtr<WrrEndpointList> latest_pending_endpoint_list_;
Mutex endpoint_weight_map_mu_;
- std::map<std::string, EndpointWeight*, std::less<>> endpoint_weight_map_
+ std::map<EndpointAddressSet, EndpointWeight*> endpoint_weight_map_
ABSL_GUARDED_BY(&endpoint_weight_map_mu_);
bool shutdown_ = false;
@@ -1245,7 +1252,7 @@ void WeightedRoundRobin::EndpointWeight::MaybeUpdateWeight(
gpr_log(GPR_INFO,
"[WRR %p] subchannel %s: qps=%f, eps=%f, utilization=%f: "
"error_util_penalty=%f, weight=%f (not updating)",
- wrr_.get(), key_.c_str(), qps, eps, utilization,
+ wrr_.get(), key_.ToString().c_str(), qps, eps, utilization,
error_utilization_penalty, weight);
}
return;
@@ -1258,7 +1265,7 @@ void WeightedRoundRobin::EndpointWeight::MaybeUpdateWeight(
"[WRR %p] subchannel %s: qps=%f, eps=%f, utilization=%f "
"error_util_penalty=%f : setting weight=%f weight_=%f now=%s "
"last_update_time_=%s non_empty_since_=%s",
- wrr_.get(), key_.c_str(), qps, eps, utilization,
+ wrr_.get(), key_.ToString().c_str(), qps, eps, utilization,
error_utilization_penalty, weight, weight_, now.ToString().c_str(),
last_update_time_.ToString().c_str(),
non_empty_since_.ToString().c_str());
@@ -1277,7 +1284,7 @@ float WeightedRoundRobin::EndpointWeight::GetWeight(
"[WRR %p] subchannel %s: getting weight: now=%s "
"weight_expiration_period=%s blackout_period=%s "
"last_update_time_=%s non_empty_since_=%s weight_=%f",
- wrr_.get(), key_.c_str(), now.ToString().c_str(),
+ wrr_.get(), key_.ToString().c_str(), now.ToString().c_str(),
weight_expiration_period.ToString().c_str(),
blackout_period.ToString().c_str(),
last_update_time_.ToString().c_str(),
@@ -1446,10 +1453,9 @@ void WeightedRoundRobin::Picker::BuildSchedulerAndStartTimerLocked() {
gpr_log(GPR_INFO, "[WRR %p picker %p] scheduling timer for %s", wrr_.get(),
this, config_->weight_update_period().ToString().c_str());
}
- WeakRefCountedPtr<Picker> self = WeakRef();
timer_handle_ = wrr_->channel_control_helper()->GetEventEngine()->RunAfter(
config_->weight_update_period(),
- [self = std::move(self),
+ [self = WeakRefAsSubclass<Picker>(),
work_serializer = wrr_->work_serializer()]() mutable {
ApplicationCallbackExecCtx callback_exec_ctx;
ExecCtx exec_ctx;
@@ -1509,60 +1515,59 @@ void WeightedRoundRobin::ResetBackoffLocked() {
absl::Status WeightedRoundRobin::UpdateLocked(UpdateArgs args) {
global_stats().IncrementWrrUpdates();
- config_ = std::move(args.config);
- ServerAddressList addresses;
+ config_ = args.config.TakeAsSubclass<WeightedRoundRobinConfig>();
+ std::shared_ptr<EndpointAddressesIterator> addresses;
if (args.addresses.ok()) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace)) {
- gpr_log(GPR_INFO, "[WRR %p] received update with %" PRIuPTR " addresses",
- this, args.addresses->size());
+ gpr_log(GPR_INFO, "[WRR %p] received update", this);
}
- // Weed out duplicate addresses. Also sort the addresses so that if
- // the set of the addresses don't change, their indexes in the
- // subchannel list don't change, since this avoids unnecessary churn
- // in the picker. Note that this does not ensure that if a given
- // address remains present that it will have the same index; if,
- // for example, an address at the end of the list is replaced with one
- // that sorts much earlier in the list, then all of the addresses in
- // between those two positions will have changed indexes.
- struct AddressLessThan {
- bool operator()(const ServerAddress& address1,
- const ServerAddress& address2) const {
- const grpc_resolved_address& addr1 = address1.address();
- const grpc_resolved_address& addr2 = address2.address();
- if (addr1.len != addr2.len) return addr1.len < addr2.len;
- return memcmp(addr1.addr, addr2.addr, addr1.len) < 0;
+ // Weed out duplicate endpoints. Also sort the endpoints so that if
+ // the set of endpoints doesn't change, their indexes in the endpoint
+ // list don't change, since this avoids unnecessary churn in the
+ // picker. Note that this does not ensure that if a given endpoint
+ // remains present that it will have the same index; if, for example,
+ // an endpoint at the end of the list is replaced with one that sorts
+ // much earlier in the list, then all of the endpoints in between those
+ // two positions will have changed indexes.
+ struct EndpointAddressesLessThan {
+ bool operator()(const EndpointAddresses& endpoint1,
+ const EndpointAddresses& endpoint2) const {
+ // Compare unordered addresses only, not channel args.
+ EndpointAddressSet e1(endpoint1.addresses());
+ EndpointAddressSet e2(endpoint2.addresses());
+ return e1 < e2;
}
};
- std::set<ServerAddress, AddressLessThan> ordered_addresses(
- args.addresses->begin(), args.addresses->end());
+ std::set<EndpointAddresses, EndpointAddressesLessThan> ordered_addresses;
+ (*args.addresses)->ForEach([&](const EndpointAddresses& endpoint) {
+ ordered_addresses.insert(endpoint);
+ });
addresses =
- ServerAddressList(ordered_addresses.begin(), ordered_addresses.end());
+ std::make_shared<EndpointAddressesListIterator>(EndpointAddressesList(
+ ordered_addresses.begin(), ordered_addresses.end()));
} else {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace)) {
gpr_log(GPR_INFO, "[WRR %p] received update with address error: %s", this,
args.addresses.status().ToString().c_str());
}
- // If we already have a subchannel list, then keep using the existing
+ // If we already have an endpoint list, then keep using the existing
// list, but still report back that the update was not accepted.
if (endpoint_list_ != nullptr) return args.addresses.status();
}
- // Create new subchannel list, replacing the previous pending list, if any.
+ // Create new endpoint list, replacing the previous pending list, if any.
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace) &&
latest_pending_endpoint_list_ != nullptr) {
- gpr_log(GPR_INFO, "[WRR %p] replacing previous pending subchannel list %p",
+ gpr_log(GPR_INFO, "[WRR %p] replacing previous pending endpoint list %p",
this, latest_pending_endpoint_list_.get());
}
- latest_pending_endpoint_list_ =
- MakeOrphanable<WrrEndpointList>(Ref(), std::move(addresses), args.args);
+ latest_pending_endpoint_list_ = MakeOrphanable<WrrEndpointList>(
+ RefAsSubclass<WeightedRoundRobin>(), addresses.get(), args.args);
// If the new list is empty, immediately promote it to
// endpoint_list_ and report TRANSIENT_FAILURE.
- // TODO(roth): As part of adding dualstack backend support, we need to
- // also handle the case where the list of addresses for a given
- // endpoint is empty.
if (latest_pending_endpoint_list_->size() == 0) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace) &&
endpoint_list_ != nullptr) {
- gpr_log(GPR_INFO, "[WRR %p] replacing previous subchannel list %p", this,
+ gpr_log(GPR_INFO, "[WRR %p] replacing previous endpoint list %p", this,
endpoint_list_.get());
}
endpoint_list_ = std::move(latest_pending_endpoint_list_);
@@ -1584,18 +1589,18 @@ absl::Status WeightedRoundRobin::UpdateLocked(UpdateArgs args) {
}
RefCountedPtr<WeightedRoundRobin::EndpointWeight>
-WeightedRoundRobin::GetOrCreateWeight(const grpc_resolved_address& address) {
- auto key = grpc_sockaddr_to_uri(&address);
- if (!key.ok()) return nullptr;
+WeightedRoundRobin::GetOrCreateWeight(
+ const std::vector<grpc_resolved_address>& addresses) {
+ EndpointAddressSet key(addresses);
MutexLock lock(&endpoint_weight_map_mu_);
- auto it = endpoint_weight_map_.find(*key);
+ auto it = endpoint_weight_map_.find(key);
if (it != endpoint_weight_map_.end()) {
auto weight = it->second->RefIfNonZero();
if (weight != nullptr) return weight;
}
auto weight = MakeRefCounted<EndpointWeight>(
- Ref(DEBUG_LOCATION, "EndpointWeight"), *key);
- endpoint_weight_map_.emplace(*key, weight.get());
+ RefAsSubclass<WeightedRoundRobin>(DEBUG_LOCATION, "EndpointWeight"), key);
+ endpoint_weight_map_.emplace(key, weight.get());
return weight;
}
@@ -1619,10 +1624,11 @@ void WeightedRoundRobin::WrrEndpointList::WrrEndpoint::OobWatcher::
RefCountedPtr<SubchannelInterface>
WeightedRoundRobin::WrrEndpointList::WrrEndpoint::CreateSubchannel(
- ServerAddress address, const ChannelArgs& args) {
+ const grpc_resolved_address& address, const ChannelArgs& per_address_args,
+ const ChannelArgs& args) {
auto* wrr = policy<WeightedRoundRobin>();
- auto subchannel =
- wrr->channel_control_helper()->CreateSubchannel(std::move(address), args);
+ auto subchannel = wrr->channel_control_helper()->CreateSubchannel(
+ address, per_address_args, args);
// Start OOB watch if configured.
if (wrr->config_->enable_oob_load_report()) {
subchannel->AddDataWatcher(MakeOobBackendMetricWatcher(
@@ -1657,7 +1663,7 @@ void WeightedRoundRobin::WrrEndpointList::WrrEndpoint::OnStateUpdate(
} else if (new_state == GRPC_CHANNEL_READY) {
// If we transition back to READY state, restart the blackout period.
// Skip this if this is the initial notification for this
- // subchannel (which happens whenever we get updated addresses and
+ // endpoint (which happens whenever we get updated addresses and
// create a new endpoint list). Also skip it if the previous state
// was READY (which should never happen in practice, but we've seen
// at least one bug that caused this in the outlier_detection
@@ -1753,7 +1759,7 @@ void WeightedRoundRobin::WrrEndpointList::
}
wrr->channel_control_helper()->UpdateState(
GRPC_CHANNEL_READY, absl::Status(),
- MakeRefCounted<Picker>(wrr->Ref(), this));
+ MakeRefCounted<Picker>(wrr->RefAsSubclass<WeightedRoundRobin>(), this));
} else if (num_connecting_ > 0) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace)) {
gpr_log(GPR_INFO, "[WRR %p] reporting CONNECTING with endpoint list %p",