aboutsummaryrefslogtreecommitdiff
path: root/src/python/grpcio_observability/grpc_observability/_gcp_observability.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/python/grpcio_observability/grpc_observability/_gcp_observability.py')
-rw-r--r--src/python/grpcio_observability/grpc_observability/_gcp_observability.py108
1 files changed, 24 insertions, 84 deletions
diff --git a/src/python/grpcio_observability/grpc_observability/_gcp_observability.py b/src/python/grpcio_observability/grpc_observability/_gcp_observability.py
index d62653bcd5..797c4797d0 100644
--- a/src/python/grpcio_observability/grpc_observability/_gcp_observability.py
+++ b/src/python/grpcio_observability/grpc_observability/_gcp_observability.py
@@ -13,20 +13,15 @@
# limitations under the License.
from __future__ import annotations
-from dataclasses import dataclass
-from dataclasses import field
import logging
-import threading
import time
-from typing import Any, Mapping, Optional
+from typing import Any
import grpc
-from grpc_observability import _cyobservability # pytype: disable=pyi-error
-from grpc_observability._open_census_exporter import CENSUS_UPLOAD_INTERVAL_SECS
-from grpc_observability._open_census_exporter import OpenCensusExporter
-from opencensus.trace import execution_context
-from opencensus.trace import span_context as span_context_module
-from opencensus.trace import trace_options as trace_options_module
+
+# pytype: disable=pyi-error
+from grpc_observability import _cyobservability
+from grpc_observability import _observability_config
_LOGGER = logging.getLogger(__name__)
@@ -56,42 +51,6 @@ GRPC_STATUS_CODE_TO_STRING = {
grpc.StatusCode.DATA_LOSS: "DATA_LOSS",
}
-GRPC_SPAN_CONTEXT = "grpc_span_context"
-
-
-@dataclass
-class GcpObservabilityPythonConfig:
- _singleton = None
- _lock: threading.RLock = threading.RLock()
- project_id: str = ""
- stats_enabled: bool = False
- tracing_enabled: bool = False
- labels: Optional[Mapping[str, str]] = field(default_factory=dict)
- sampling_rate: Optional[float] = 0.0
-
- @staticmethod
- def get():
- with GcpObservabilityPythonConfig._lock:
- if GcpObservabilityPythonConfig._singleton is None:
- GcpObservabilityPythonConfig._singleton = (
- GcpObservabilityPythonConfig()
- )
- return GcpObservabilityPythonConfig._singleton
-
- def set_configuration(
- self,
- project_id: str,
- sampling_rate: Optional[float] = 0.0,
- labels: Optional[Mapping[str, str]] = None,
- tracing_enabled: bool = False,
- stats_enabled: bool = False,
- ) -> None:
- self.project_id = project_id
- self.stats_enabled = stats_enabled
- self.tracing_enabled = tracing_enabled
- self.labels = labels
- self.sampling_rate = sampling_rate
-
# pylint: disable=no-self-use
class GCPOpenCensusObservability(grpc._observability.ObservabilityPlugin):
@@ -108,25 +67,22 @@ class GCPOpenCensusObservability(grpc._observability.ObservabilityPlugin):
exporter: Exporter used to export data.
"""
- config: GcpObservabilityPythonConfig
+ config: _observability_config.GcpObservabilityConfig
exporter: "grpc_observability.Exporter"
- use_open_census_exporter: bool
def __init__(self, exporter: "grpc_observability.Exporter" = None):
self.exporter = None
- self.config = GcpObservabilityPythonConfig.get()
- self.use_open_census_exporter = False
- config_valid = _cyobservability.set_gcp_observability_config(
- self.config
- )
- if not config_valid:
- raise ValueError("Invalid configuration")
+ self.config = None
+ try:
+ self.config = _observability_config.read_config()
+ _cyobservability.activate_config(self.config)
+ except Exception as e: # pylint: disable=broad-except
+ raise ValueError(f"Reading configuration failed with: {e}")
if exporter:
self.exporter = exporter
else:
- self.exporter = OpenCensusExporter(self.config)
- self.use_open_census_exporter = True
+ raise ValueError(f"Please provide an exporter!")
if self.config.tracing_enabled:
self.set_tracing(True)
@@ -156,31 +112,18 @@ class GCPOpenCensusObservability(grpc._observability.ObservabilityPlugin):
# TODO(xuanwn): explicit synchronization
# https://github.com/grpc/grpc/issues/33262
time.sleep(_cyobservability.CENSUS_EXPORT_BATCH_INTERVAL_SECS)
- if self.use_open_census_exporter:
- # Sleep so StackDriver can upload data to GCP.
- time.sleep(CENSUS_UPLOAD_INTERVAL_SECS)
self.set_tracing(False)
self.set_stats(False)
_cyobservability.observability_deinit()
grpc._observability.observability_deinit()
def create_client_call_tracer(
- self, method_name: bytes
+ self, method_name: bytes, target: bytes
) -> ClientCallTracerCapsule:
- grpc_span_context = execution_context.get_opencensus_attr(
- GRPC_SPAN_CONTEXT
+ trace_id = b"TRACE_ID"
+ capsule = _cyobservability.create_client_call_tracer(
+ method_name, target, trace_id
)
- if grpc_span_context:
- trace_id = grpc_span_context.trace_id.encode("utf8")
- parent_span_id = grpc_span_context.span_id.encode("utf8")
- capsule = _cyobservability.create_client_call_tracer(
- method_name, trace_id, parent_span_id
- )
- else:
- trace_id = span_context_module.generate_trace_id().encode("utf8")
- capsule = _cyobservability.create_client_call_tracer(
- method_name, trace_id
- )
return capsule
def create_server_call_tracer_factory(
@@ -197,19 +140,16 @@ class GCPOpenCensusObservability(grpc._observability.ObservabilityPlugin):
def save_trace_context(
self, trace_id: str, span_id: str, is_sampled: bool
) -> None:
- trace_options = trace_options_module.TraceOptions(0)
- trace_options.set_enabled(is_sampled)
- span_context = span_context_module.SpanContext(
- trace_id=trace_id,
- span_id=span_id,
- trace_options=trace_options,
- )
- execution_context.set_opencensus_attr(GRPC_SPAN_CONTEXT, span_context)
+ pass
def record_rpc_latency(
- self, method: str, rpc_latency: float, status_code: grpc.StatusCode
+ self,
+ method: str,
+ target: str,
+ rpc_latency: float,
+ status_code: grpc.StatusCode,
) -> None:
status_code = GRPC_STATUS_CODE_TO_STRING.get(status_code, "UNKNOWN")
_cyobservability._record_rpc_latency(
- self.exporter, method, rpc_latency, status_code
+ self.exporter, method, target, rpc_latency, status_code
)