diff options
Diffstat (limited to 'src/core/ext/xds/xds_endpoint.cc')
-rw-r--r-- | src/core/ext/xds/xds_endpoint.cc | 144 |
1 files changed, 95 insertions, 49 deletions
diff --git a/src/core/ext/xds/xds_endpoint.cc b/src/core/ext/xds/xds_endpoint.cc index 26aa7e789f..761d54969e 100644 --- a/src/core/ext/xds/xds_endpoint.cc +++ b/src/core/ext/xds/xds_endpoint.cc @@ -50,18 +50,35 @@ #include "src/core/lib/address_utils/sockaddr_utils.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/debug/trace.h" +#include "src/core/lib/gpr/string.h" +#include "src/core/lib/gprpp/env.h" #include "src/core/lib/gprpp/validation_errors.h" #include "src/core/lib/iomgr/resolved_address.h" +// IWYU pragma: no_include "absl/meta/type_traits.h" + namespace grpc_core { +namespace { + +// TODO(roth): Remove this once dualstack support is stable. +bool XdsDualstackEndpointsEnabled() { + auto value = GetEnv("GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS"); + if (!value.has_value()) return false; + bool parsed_value; + bool parse_succeeded = gpr_parse_bool_value(value->c_str(), &parsed_value); + return parse_succeeded && parsed_value; +} + +} // namespace + // // XdsEndpointResource // std::string XdsEndpointResource::Priority::Locality::ToString() const { std::vector<std::string> endpoint_strings; - for (const ServerAddress& endpoint : endpoints) { + for (const EndpointAddresses& endpoint : endpoints) { endpoint_strings.emplace_back(endpoint.ToString()); } return absl::StrCat("{name=", name->AsHumanReadableString(), @@ -125,8 +142,9 @@ std::string XdsEndpointResource::ToString() const { priority_strings.emplace_back( absl::StrCat("priority ", i, ": ", priority.ToString())); } - return absl::StrCat("priorities=[", absl::StrJoin(priority_strings, ", "), - "], drop_config=", drop_config->ToString()); + return absl::StrCat( + "priorities=[", absl::StrJoin(priority_strings, ", "), "], drop_config=", + drop_config == nullptr ? "<null>" : drop_config->ToString()); } // @@ -144,13 +162,46 @@ void MaybeLogClusterLoadAssignment( envoy_config_endpoint_v3_ClusterLoadAssignment_getmsgdef( context.symtab); char buf[10240]; - upb_TextEncode(cla, msg_type, nullptr, 0, buf, sizeof(buf)); + upb_TextEncode(reinterpret_cast<const upb_Message*>(cla), msg_type, nullptr, + 0, buf, sizeof(buf)); gpr_log(GPR_DEBUG, "[xds_client %p] ClusterLoadAssignment: %s", context.client, buf); } } -absl::optional<ServerAddress> ServerAddressParse( +absl::optional<grpc_resolved_address> ParseCoreAddress( + const envoy_config_core_v3_Address* address, ValidationErrors* errors) { + if (address == nullptr) { + errors->AddError("field not present"); + return absl::nullopt; + } + ValidationErrors::ScopedField field(errors, ".socket_address"); + const envoy_config_core_v3_SocketAddress* socket_address = + envoy_config_core_v3_Address_socket_address(address); + if (socket_address == nullptr) { + errors->AddError("field not present"); + return absl::nullopt; + } + std::string address_str = UpbStringToStdString( + envoy_config_core_v3_SocketAddress_address(socket_address)); + uint32_t port; + { + ValidationErrors::ScopedField field(errors, ".port_value"); + port = envoy_config_core_v3_SocketAddress_port_value(socket_address); + if (GPR_UNLIKELY(port >> 16) != 0) { + errors->AddError("invalid port"); + return absl::nullopt; + } + } + auto addr = StringToSockaddr(address_str, port); + if (!addr.ok()) { + errors->AddError(addr.status().message()); + return absl::nullopt; + } + return *addr; +} + +absl::optional<EndpointAddresses> EndpointAddressesParse( const envoy_config_endpoint_v3_LbEndpoint* lb_endpoint, ValidationErrors* errors) { // health_status @@ -172,7 +223,7 @@ absl::optional<ServerAddress> ServerAddressParse( } } // endpoint - grpc_resolved_address grpc_address; + std::vector<grpc_resolved_address> addresses; { ValidationErrors::ScopedField field(errors, ".endpoint"); const envoy_config_endpoint_v3_Endpoint* endpoint = @@ -181,43 +232,34 @@ absl::optional<ServerAddress> ServerAddressParse( errors->AddError("field not present"); return absl::nullopt; } - ValidationErrors::ScopedField field2(errors, ".address"); - const envoy_config_core_v3_Address* address = - envoy_config_endpoint_v3_Endpoint_address(endpoint); - if (address == nullptr) { - errors->AddError("field not present"); - return absl::nullopt; - } - ValidationErrors::ScopedField field3(errors, ".socket_address"); - const envoy_config_core_v3_SocketAddress* socket_address = - envoy_config_core_v3_Address_socket_address(address); - if (socket_address == nullptr) { - errors->AddError("field not present"); - return absl::nullopt; - } - std::string address_str = UpbStringToStdString( - envoy_config_core_v3_SocketAddress_address(socket_address)); - uint32_t port; { - ValidationErrors::ScopedField field(errors, ".port_value"); - port = envoy_config_core_v3_SocketAddress_port_value(socket_address); - if (GPR_UNLIKELY(port >> 16) != 0) { - errors->AddError("invalid port"); - return absl::nullopt; - } + ValidationErrors::ScopedField field(errors, ".address"); + auto address = ParseCoreAddress( + envoy_config_endpoint_v3_Endpoint_address(endpoint), errors); + if (address.has_value()) addresses.push_back(*address); } - auto addr = StringToSockaddr(address_str, port); - if (!addr.ok()) { - errors->AddError(addr.status().message()); - } else { - grpc_address = *addr; + if (XdsDualstackEndpointsEnabled()) { + size_t size; + auto* additional_addresses = + envoy_config_endpoint_v3_Endpoint_additional_addresses(endpoint, + &size); + for (size_t i = 0; i < size; ++i) { + ValidationErrors::ScopedField field( + errors, absl::StrCat(".additional_addresses[", i, "].address")); + auto address = ParseCoreAddress( + envoy_config_endpoint_v3_Endpoint_AdditionalAddress_address( + additional_addresses[i]), + errors); + if (address.has_value()) addresses.push_back(*address); + } } } - // Convert to ServerAddress. - return ServerAddress(grpc_address, - ChannelArgs() - .Set(GRPC_ARG_ADDRESS_WEIGHT, weight) - .Set(GRPC_ARG_XDS_HEALTH_STATUS, status->status())); + if (addresses.empty()) return absl::nullopt; + // Convert to EndpointAddresses. + return EndpointAddresses( + addresses, ChannelArgs() + .Set(GRPC_ARG_ADDRESS_WEIGHT, weight) + .Set(GRPC_ARG_XDS_HEALTH_STATUS, status->status())); } struct ParsedLocality { @@ -277,16 +319,17 @@ absl::optional<ParsedLocality> LocalityParse( for (size_t i = 0; i < size; ++i) { ValidationErrors::ScopedField field(errors, absl::StrCat(".lb_endpoints[", i, "]")); - auto address = ServerAddressParse(lb_endpoints[i], errors); - if (address.has_value()) { - bool inserted = address_set->insert(address->address()).second; - if (!inserted) { - errors->AddError(absl::StrCat( - "duplicate endpoint address \"", - grpc_sockaddr_to_uri(&address->address()).value_or("<unknown>"), - "\"")); + auto endpoint = EndpointAddressesParse(lb_endpoints[i], errors); + if (endpoint.has_value()) { + for (const auto& address : endpoint->addresses()) { + bool inserted = address_set->insert(address).second; + if (!inserted) { + errors->AddError(absl::StrCat( + "duplicate endpoint address \"", + grpc_sockaddr_to_uri(&address).value_or("<unknown>"), "\"")); + } } - parsed_locality.locality.endpoints.push_back(std::move(*address)); + parsed_locality.locality.endpoints.push_back(std::move(*endpoint)); } } // priority @@ -406,7 +449,6 @@ absl::StatusOr<std::shared_ptr<const XdsEndpointResource>> EdsResourceParse( } } // policy - eds_resource->drop_config = MakeRefCounted<XdsEndpointResource::DropConfig>(); const auto* policy = envoy_config_endpoint_v3_ClusterLoadAssignment_policy( cluster_load_assignment); if (policy != nullptr) { @@ -415,6 +457,10 @@ absl::StatusOr<std::shared_ptr<const XdsEndpointResource>> EdsResourceParse( const auto* const* drop_overload = envoy_config_endpoint_v3_ClusterLoadAssignment_Policy_drop_overloads( policy, &drop_size); + if (drop_size > 0) { + eds_resource->drop_config = + MakeRefCounted<XdsEndpointResource::DropConfig>(); + } for (size_t i = 0; i < drop_size; ++i) { ValidationErrors::ScopedField field( &errors, absl::StrCat(".drop_overloads[", i, "]")); |