Source code for grpc_observability._open_telemetry_observability

# Copyright 2023 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import time
from typing import Any, Iterable, Optional

import grpc

# pytype: disable=pyi-error
from grpc_observability import _cyobservability
from grpc_observability._open_telemetry_exporter import (
    _OpenTelemetryExporterDelegator,
)
from grpc_observability._open_telemetry_plugin import OpenTelemetryPlugin
from grpc_observability._open_telemetry_plugin import _OpenTelemetryPlugin

_LOGGER = logging.getLogger(__name__)

ClientCallTracerCapsule = Any  # it appears only once in the function signature
ServerCallTracerFactoryCapsule = (
    Any  # it appears only once in the function signature
)
grpc_observability = Any  # grpc_observability.py imports this module.

GRPC_STATUS_CODE_TO_STRING = {
    grpc.StatusCode.OK: "OK",
    grpc.StatusCode.CANCELLED: "CANCELLED",
    grpc.StatusCode.UNKNOWN: "UNKNOWN",
    grpc.StatusCode.INVALID_ARGUMENT: "INVALID_ARGUMENT",
    grpc.StatusCode.DEADLINE_EXCEEDED: "DEADLINE_EXCEEDED",
    grpc.StatusCode.NOT_FOUND: "NOT_FOUND",
    grpc.StatusCode.ALREADY_EXISTS: "ALREADY_EXISTS",
    grpc.StatusCode.PERMISSION_DENIED: "PERMISSION_DENIED",
    grpc.StatusCode.UNAUTHENTICATED: "UNAUTHENTICATED",
    grpc.StatusCode.RESOURCE_EXHAUSTED: "RESOURCE_EXHAUSTED",
    grpc.StatusCode.FAILED_PRECONDITION: "FAILED_PRECONDITION",
    grpc.StatusCode.ABORTED: "ABORTED",
    grpc.StatusCode.OUT_OF_RANGE: "OUT_OF_RANGE",
    grpc.StatusCode.UNIMPLEMENTED: "UNIMPLEMENTED",
    grpc.StatusCode.INTERNAL: "INTERNAL",
    grpc.StatusCode.UNAVAILABLE: "UNAVAILABLE",
    grpc.StatusCode.DATA_LOSS: "DATA_LOSS",
}


# pylint: disable=no-self-use
[docs]class OpenTelemetryObservability(grpc._observability.ObservabilityPlugin): """OpenTelemetry based plugin implementation. This is class is part of an EXPERIMENTAL API. Args: plugin: OpenTelemetryPlugin to enable. """ exporter: "grpc_observability.Exporter" plugins: Iterable[OpenTelemetryPlugin] def __init__( self, *, plugins: Optional[Iterable[OpenTelemetryPlugin]] = None, ): _plugins = [] if plugins: for plugin in plugins: _plugins.append(_OpenTelemetryPlugin(plugin)) self.exporter = _OpenTelemetryExporterDelegator(_plugins) try: _cyobservability.activate_stats() self.set_stats(True) except Exception as e: # pylint: disable=broad-except raise ValueError(f"Activate observability metrics failed with: {e}") def __enter__(self): try: _cyobservability.cyobservability_init(self.exporter) # TODO(xuanwn): Use specific exceptons except Exception as e: # pylint: disable=broad-except _LOGGER.exception("Initiate observability failed with: %s", e) grpc._observability.observability_init(self) return self def __exit__(self, exc_type, exc_val, exc_tb) -> None: self.exit() def exit(self) -> None: # Sleep so we don't loss any data. If we shutdown export thread # immediately after exit, it's possible that core didn't call RecordEnd # in callTracer, and all data recorded by calling RecordEnd will be # lost. # CENSUS_EXPORT_BATCH_INTERVAL_SECS: The time equals to the time in # AwaitNextBatchLocked. # TODO(xuanwn): explicit synchronization # https://github.com/grpc/grpc/issues/33262 time.sleep(_cyobservability.CENSUS_EXPORT_BATCH_INTERVAL_SECS) self.set_tracing(False) self.set_stats(False) _cyobservability.observability_deinit() grpc._observability.observability_deinit()
[docs] def create_client_call_tracer( self, method_name: bytes, target: bytes ) -> ClientCallTracerCapsule: trace_id = b"TRACE_ID" capsule = _cyobservability.create_client_call_tracer( method_name, target, trace_id ) return capsule
[docs] def create_server_call_tracer_factory( self, ) -> ServerCallTracerFactoryCapsule: capsule = _cyobservability.create_server_call_tracer_factory_capsule() return capsule
[docs] def delete_client_call_tracer( self, client_call_tracer: ClientCallTracerCapsule ) -> None: _cyobservability.delete_client_call_tracer(client_call_tracer)
[docs] def save_trace_context( self, trace_id: str, span_id: str, is_sampled: bool ) -> None: pass
[docs] def record_rpc_latency( self, method: str, target: str, rpc_latency: float, status_code: grpc.StatusCode, ) -> None: status_code = GRPC_STATUS_CODE_TO_STRING.get(status_code, "UNKNOWN") _cyobservability._record_rpc_latency( self.exporter, method, target, rpc_latency, status_code )