diff options
Diffstat (limited to 'src/python/grpcio_observability/grpc_observability/_gcp_observability.py')
-rw-r--r-- | src/python/grpcio_observability/grpc_observability/_gcp_observability.py | 108 |
1 files changed, 24 insertions, 84 deletions
diff --git a/src/python/grpcio_observability/grpc_observability/_gcp_observability.py b/src/python/grpcio_observability/grpc_observability/_gcp_observability.py index d62653bcd5..797c4797d0 100644 --- a/src/python/grpcio_observability/grpc_observability/_gcp_observability.py +++ b/src/python/grpcio_observability/grpc_observability/_gcp_observability.py @@ -13,20 +13,15 @@ # limitations under the License. from __future__ import annotations -from dataclasses import dataclass -from dataclasses import field import logging -import threading import time -from typing import Any, Mapping, Optional +from typing import Any import grpc -from grpc_observability import _cyobservability # pytype: disable=pyi-error -from grpc_observability._open_census_exporter import CENSUS_UPLOAD_INTERVAL_SECS -from grpc_observability._open_census_exporter import OpenCensusExporter -from opencensus.trace import execution_context -from opencensus.trace import span_context as span_context_module -from opencensus.trace import trace_options as trace_options_module + +# pytype: disable=pyi-error +from grpc_observability import _cyobservability +from grpc_observability import _observability_config _LOGGER = logging.getLogger(__name__) @@ -56,42 +51,6 @@ GRPC_STATUS_CODE_TO_STRING = { grpc.StatusCode.DATA_LOSS: "DATA_LOSS", } -GRPC_SPAN_CONTEXT = "grpc_span_context" - - -@dataclass -class GcpObservabilityPythonConfig: - _singleton = None - _lock: threading.RLock = threading.RLock() - project_id: str = "" - stats_enabled: bool = False - tracing_enabled: bool = False - labels: Optional[Mapping[str, str]] = field(default_factory=dict) - sampling_rate: Optional[float] = 0.0 - - @staticmethod - def get(): - with GcpObservabilityPythonConfig._lock: - if GcpObservabilityPythonConfig._singleton is None: - GcpObservabilityPythonConfig._singleton = ( - GcpObservabilityPythonConfig() - ) - return GcpObservabilityPythonConfig._singleton - - def set_configuration( - self, - project_id: str, - sampling_rate: Optional[float] = 0.0, - labels: Optional[Mapping[str, str]] = None, - tracing_enabled: bool = False, - stats_enabled: bool = False, - ) -> None: - self.project_id = project_id - self.stats_enabled = stats_enabled - self.tracing_enabled = tracing_enabled - self.labels = labels - self.sampling_rate = sampling_rate - # pylint: disable=no-self-use class GCPOpenCensusObservability(grpc._observability.ObservabilityPlugin): @@ -108,25 +67,22 @@ class GCPOpenCensusObservability(grpc._observability.ObservabilityPlugin): exporter: Exporter used to export data. """ - config: GcpObservabilityPythonConfig + config: _observability_config.GcpObservabilityConfig exporter: "grpc_observability.Exporter" - use_open_census_exporter: bool def __init__(self, exporter: "grpc_observability.Exporter" = None): self.exporter = None - self.config = GcpObservabilityPythonConfig.get() - self.use_open_census_exporter = False - config_valid = _cyobservability.set_gcp_observability_config( - self.config - ) - if not config_valid: - raise ValueError("Invalid configuration") + self.config = None + try: + self.config = _observability_config.read_config() + _cyobservability.activate_config(self.config) + except Exception as e: # pylint: disable=broad-except + raise ValueError(f"Reading configuration failed with: {e}") if exporter: self.exporter = exporter else: - self.exporter = OpenCensusExporter(self.config) - self.use_open_census_exporter = True + raise ValueError(f"Please provide an exporter!") if self.config.tracing_enabled: self.set_tracing(True) @@ -156,31 +112,18 @@ class GCPOpenCensusObservability(grpc._observability.ObservabilityPlugin): # TODO(xuanwn): explicit synchronization # https://github.com/grpc/grpc/issues/33262 time.sleep(_cyobservability.CENSUS_EXPORT_BATCH_INTERVAL_SECS) - if self.use_open_census_exporter: - # Sleep so StackDriver can upload data to GCP. - time.sleep(CENSUS_UPLOAD_INTERVAL_SECS) self.set_tracing(False) self.set_stats(False) _cyobservability.observability_deinit() grpc._observability.observability_deinit() def create_client_call_tracer( - self, method_name: bytes + self, method_name: bytes, target: bytes ) -> ClientCallTracerCapsule: - grpc_span_context = execution_context.get_opencensus_attr( - GRPC_SPAN_CONTEXT + trace_id = b"TRACE_ID" + capsule = _cyobservability.create_client_call_tracer( + method_name, target, trace_id ) - if grpc_span_context: - trace_id = grpc_span_context.trace_id.encode("utf8") - parent_span_id = grpc_span_context.span_id.encode("utf8") - capsule = _cyobservability.create_client_call_tracer( - method_name, trace_id, parent_span_id - ) - else: - trace_id = span_context_module.generate_trace_id().encode("utf8") - capsule = _cyobservability.create_client_call_tracer( - method_name, trace_id - ) return capsule def create_server_call_tracer_factory( @@ -197,19 +140,16 @@ class GCPOpenCensusObservability(grpc._observability.ObservabilityPlugin): def save_trace_context( self, trace_id: str, span_id: str, is_sampled: bool ) -> None: - trace_options = trace_options_module.TraceOptions(0) - trace_options.set_enabled(is_sampled) - span_context = span_context_module.SpanContext( - trace_id=trace_id, - span_id=span_id, - trace_options=trace_options, - ) - execution_context.set_opencensus_attr(GRPC_SPAN_CONTEXT, span_context) + pass def record_rpc_latency( - self, method: str, rpc_latency: float, status_code: grpc.StatusCode + self, + method: str, + target: str, + rpc_latency: float, + status_code: grpc.StatusCode, ) -> None: status_code = GRPC_STATUS_CODE_TO_STRING.get(status_code, "UNKNOWN") _cyobservability._record_rpc_latency( - self.exporter, method, rpc_latency, status_code + self.exporter, method, target, rpc_latency, status_code ) |