aboutsummaryrefslogtreecommitdiff
path: root/src/core/ext/xds/xds_endpoint.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/ext/xds/xds_endpoint.cc')
-rw-r--r--src/core/ext/xds/xds_endpoint.cc144
1 files changed, 95 insertions, 49 deletions
diff --git a/src/core/ext/xds/xds_endpoint.cc b/src/core/ext/xds/xds_endpoint.cc
index 26aa7e789f..761d54969e 100644
--- a/src/core/ext/xds/xds_endpoint.cc
+++ b/src/core/ext/xds/xds_endpoint.cc
@@ -50,18 +50,35 @@
#include "src/core/lib/address_utils/sockaddr_utils.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/debug/trace.h"
+#include "src/core/lib/gpr/string.h"
+#include "src/core/lib/gprpp/env.h"
#include "src/core/lib/gprpp/validation_errors.h"
#include "src/core/lib/iomgr/resolved_address.h"
+// IWYU pragma: no_include "absl/meta/type_traits.h"
+
namespace grpc_core {
+namespace {
+
+// TODO(roth): Remove this once dualstack support is stable.
+bool XdsDualstackEndpointsEnabled() {
+ auto value = GetEnv("GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS");
+ if (!value.has_value()) return false;
+ bool parsed_value;
+ bool parse_succeeded = gpr_parse_bool_value(value->c_str(), &parsed_value);
+ return parse_succeeded && parsed_value;
+}
+
+} // namespace
+
//
// XdsEndpointResource
//
std::string XdsEndpointResource::Priority::Locality::ToString() const {
std::vector<std::string> endpoint_strings;
- for (const ServerAddress& endpoint : endpoints) {
+ for (const EndpointAddresses& endpoint : endpoints) {
endpoint_strings.emplace_back(endpoint.ToString());
}
return absl::StrCat("{name=", name->AsHumanReadableString(),
@@ -125,8 +142,9 @@ std::string XdsEndpointResource::ToString() const {
priority_strings.emplace_back(
absl::StrCat("priority ", i, ": ", priority.ToString()));
}
- return absl::StrCat("priorities=[", absl::StrJoin(priority_strings, ", "),
- "], drop_config=", drop_config->ToString());
+ return absl::StrCat(
+ "priorities=[", absl::StrJoin(priority_strings, ", "), "], drop_config=",
+ drop_config == nullptr ? "<null>" : drop_config->ToString());
}
//
@@ -144,13 +162,46 @@ void MaybeLogClusterLoadAssignment(
envoy_config_endpoint_v3_ClusterLoadAssignment_getmsgdef(
context.symtab);
char buf[10240];
- upb_TextEncode(cla, msg_type, nullptr, 0, buf, sizeof(buf));
+ upb_TextEncode(reinterpret_cast<const upb_Message*>(cla), msg_type, nullptr,
+ 0, buf, sizeof(buf));
gpr_log(GPR_DEBUG, "[xds_client %p] ClusterLoadAssignment: %s",
context.client, buf);
}
}
-absl::optional<ServerAddress> ServerAddressParse(
+absl::optional<grpc_resolved_address> ParseCoreAddress(
+ const envoy_config_core_v3_Address* address, ValidationErrors* errors) {
+ if (address == nullptr) {
+ errors->AddError("field not present");
+ return absl::nullopt;
+ }
+ ValidationErrors::ScopedField field(errors, ".socket_address");
+ const envoy_config_core_v3_SocketAddress* socket_address =
+ envoy_config_core_v3_Address_socket_address(address);
+ if (socket_address == nullptr) {
+ errors->AddError("field not present");
+ return absl::nullopt;
+ }
+ std::string address_str = UpbStringToStdString(
+ envoy_config_core_v3_SocketAddress_address(socket_address));
+ uint32_t port;
+ {
+ ValidationErrors::ScopedField field(errors, ".port_value");
+ port = envoy_config_core_v3_SocketAddress_port_value(socket_address);
+ if (GPR_UNLIKELY(port >> 16) != 0) {
+ errors->AddError("invalid port");
+ return absl::nullopt;
+ }
+ }
+ auto addr = StringToSockaddr(address_str, port);
+ if (!addr.ok()) {
+ errors->AddError(addr.status().message());
+ return absl::nullopt;
+ }
+ return *addr;
+}
+
+absl::optional<EndpointAddresses> EndpointAddressesParse(
const envoy_config_endpoint_v3_LbEndpoint* lb_endpoint,
ValidationErrors* errors) {
// health_status
@@ -172,7 +223,7 @@ absl::optional<ServerAddress> ServerAddressParse(
}
}
// endpoint
- grpc_resolved_address grpc_address;
+ std::vector<grpc_resolved_address> addresses;
{
ValidationErrors::ScopedField field(errors, ".endpoint");
const envoy_config_endpoint_v3_Endpoint* endpoint =
@@ -181,43 +232,34 @@ absl::optional<ServerAddress> ServerAddressParse(
errors->AddError("field not present");
return absl::nullopt;
}
- ValidationErrors::ScopedField field2(errors, ".address");
- const envoy_config_core_v3_Address* address =
- envoy_config_endpoint_v3_Endpoint_address(endpoint);
- if (address == nullptr) {
- errors->AddError("field not present");
- return absl::nullopt;
- }
- ValidationErrors::ScopedField field3(errors, ".socket_address");
- const envoy_config_core_v3_SocketAddress* socket_address =
- envoy_config_core_v3_Address_socket_address(address);
- if (socket_address == nullptr) {
- errors->AddError("field not present");
- return absl::nullopt;
- }
- std::string address_str = UpbStringToStdString(
- envoy_config_core_v3_SocketAddress_address(socket_address));
- uint32_t port;
{
- ValidationErrors::ScopedField field(errors, ".port_value");
- port = envoy_config_core_v3_SocketAddress_port_value(socket_address);
- if (GPR_UNLIKELY(port >> 16) != 0) {
- errors->AddError("invalid port");
- return absl::nullopt;
- }
+ ValidationErrors::ScopedField field(errors, ".address");
+ auto address = ParseCoreAddress(
+ envoy_config_endpoint_v3_Endpoint_address(endpoint), errors);
+ if (address.has_value()) addresses.push_back(*address);
}
- auto addr = StringToSockaddr(address_str, port);
- if (!addr.ok()) {
- errors->AddError(addr.status().message());
- } else {
- grpc_address = *addr;
+ if (XdsDualstackEndpointsEnabled()) {
+ size_t size;
+ auto* additional_addresses =
+ envoy_config_endpoint_v3_Endpoint_additional_addresses(endpoint,
+ &size);
+ for (size_t i = 0; i < size; ++i) {
+ ValidationErrors::ScopedField field(
+ errors, absl::StrCat(".additional_addresses[", i, "].address"));
+ auto address = ParseCoreAddress(
+ envoy_config_endpoint_v3_Endpoint_AdditionalAddress_address(
+ additional_addresses[i]),
+ errors);
+ if (address.has_value()) addresses.push_back(*address);
+ }
}
}
- // Convert to ServerAddress.
- return ServerAddress(grpc_address,
- ChannelArgs()
- .Set(GRPC_ARG_ADDRESS_WEIGHT, weight)
- .Set(GRPC_ARG_XDS_HEALTH_STATUS, status->status()));
+ if (addresses.empty()) return absl::nullopt;
+ // Convert to EndpointAddresses.
+ return EndpointAddresses(
+ addresses, ChannelArgs()
+ .Set(GRPC_ARG_ADDRESS_WEIGHT, weight)
+ .Set(GRPC_ARG_XDS_HEALTH_STATUS, status->status()));
}
struct ParsedLocality {
@@ -277,16 +319,17 @@ absl::optional<ParsedLocality> LocalityParse(
for (size_t i = 0; i < size; ++i) {
ValidationErrors::ScopedField field(errors,
absl::StrCat(".lb_endpoints[", i, "]"));
- auto address = ServerAddressParse(lb_endpoints[i], errors);
- if (address.has_value()) {
- bool inserted = address_set->insert(address->address()).second;
- if (!inserted) {
- errors->AddError(absl::StrCat(
- "duplicate endpoint address \"",
- grpc_sockaddr_to_uri(&address->address()).value_or("<unknown>"),
- "\""));
+ auto endpoint = EndpointAddressesParse(lb_endpoints[i], errors);
+ if (endpoint.has_value()) {
+ for (const auto& address : endpoint->addresses()) {
+ bool inserted = address_set->insert(address).second;
+ if (!inserted) {
+ errors->AddError(absl::StrCat(
+ "duplicate endpoint address \"",
+ grpc_sockaddr_to_uri(&address).value_or("<unknown>"), "\""));
+ }
}
- parsed_locality.locality.endpoints.push_back(std::move(*address));
+ parsed_locality.locality.endpoints.push_back(std::move(*endpoint));
}
}
// priority
@@ -406,7 +449,6 @@ absl::StatusOr<std::shared_ptr<const XdsEndpointResource>> EdsResourceParse(
}
}
// policy
- eds_resource->drop_config = MakeRefCounted<XdsEndpointResource::DropConfig>();
const auto* policy = envoy_config_endpoint_v3_ClusterLoadAssignment_policy(
cluster_load_assignment);
if (policy != nullptr) {
@@ -415,6 +457,10 @@ absl::StatusOr<std::shared_ptr<const XdsEndpointResource>> EdsResourceParse(
const auto* const* drop_overload =
envoy_config_endpoint_v3_ClusterLoadAssignment_Policy_drop_overloads(
policy, &drop_size);
+ if (drop_size > 0) {
+ eds_resource->drop_config =
+ MakeRefCounted<XdsEndpointResource::DropConfig>();
+ }
for (size_t i = 0; i < drop_size; ++i) {
ValidationErrors::ScopedField field(
&errors, absl::StrCat(".drop_overloads[", i, "]"));