# Copyright 2023 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import time
from typing import Any, Iterable, Optional
import grpc
# pytype: disable=pyi-error
from grpc_observability import _cyobservability
from grpc_observability._open_telemetry_exporter import (
_OpenTelemetryExporterDelegator,
)
from grpc_observability._open_telemetry_plugin import OpenTelemetryPlugin
from grpc_observability._open_telemetry_plugin import _OpenTelemetryPlugin
_LOGGER = logging.getLogger(__name__)
ClientCallTracerCapsule = Any # it appears only once in the function signature
ServerCallTracerFactoryCapsule = (
Any # it appears only once in the function signature
)
grpc_observability = Any # grpc_observability.py imports this module.
GRPC_STATUS_CODE_TO_STRING = {
grpc.StatusCode.OK: "OK",
grpc.StatusCode.CANCELLED: "CANCELLED",
grpc.StatusCode.UNKNOWN: "UNKNOWN",
grpc.StatusCode.INVALID_ARGUMENT: "INVALID_ARGUMENT",
grpc.StatusCode.DEADLINE_EXCEEDED: "DEADLINE_EXCEEDED",
grpc.StatusCode.NOT_FOUND: "NOT_FOUND",
grpc.StatusCode.ALREADY_EXISTS: "ALREADY_EXISTS",
grpc.StatusCode.PERMISSION_DENIED: "PERMISSION_DENIED",
grpc.StatusCode.UNAUTHENTICATED: "UNAUTHENTICATED",
grpc.StatusCode.RESOURCE_EXHAUSTED: "RESOURCE_EXHAUSTED",
grpc.StatusCode.FAILED_PRECONDITION: "FAILED_PRECONDITION",
grpc.StatusCode.ABORTED: "ABORTED",
grpc.StatusCode.OUT_OF_RANGE: "OUT_OF_RANGE",
grpc.StatusCode.UNIMPLEMENTED: "UNIMPLEMENTED",
grpc.StatusCode.INTERNAL: "INTERNAL",
grpc.StatusCode.UNAVAILABLE: "UNAVAILABLE",
grpc.StatusCode.DATA_LOSS: "DATA_LOSS",
}
# pylint: disable=no-self-use
[docs]class OpenTelemetryObservability(grpc._observability.ObservabilityPlugin):
"""OpenTelemetry based plugin implementation.
This is class is part of an EXPERIMENTAL API.
Args:
plugin: OpenTelemetryPlugin to enable.
"""
exporter: "grpc_observability.Exporter"
plugins: Iterable[OpenTelemetryPlugin]
def __init__(
self,
*,
plugins: Optional[Iterable[OpenTelemetryPlugin]] = None,
):
_plugins = []
if plugins:
for plugin in plugins:
_plugins.append(_OpenTelemetryPlugin(plugin))
self.exporter = _OpenTelemetryExporterDelegator(_plugins)
try:
_cyobservability.activate_stats()
self.set_stats(True)
except Exception as e: # pylint: disable=broad-except
raise ValueError(f"Activate observability metrics failed with: {e}")
def __enter__(self):
try:
_cyobservability.cyobservability_init(self.exporter)
# TODO(xuanwn): Use specific exceptons
except Exception as e: # pylint: disable=broad-except
_LOGGER.exception("Initiate observability failed with: %s", e)
grpc._observability.observability_init(self)
return self
def __exit__(self, exc_type, exc_val, exc_tb) -> None:
self.exit()
def exit(self) -> None:
# Sleep so we don't loss any data. If we shutdown export thread
# immediately after exit, it's possible that core didn't call RecordEnd
# in callTracer, and all data recorded by calling RecordEnd will be
# lost.
# CENSUS_EXPORT_BATCH_INTERVAL_SECS: The time equals to the time in
# AwaitNextBatchLocked.
# TODO(xuanwn): explicit synchronization
# https://github.com/grpc/grpc/issues/33262
time.sleep(_cyobservability.CENSUS_EXPORT_BATCH_INTERVAL_SECS)
self.set_tracing(False)
self.set_stats(False)
_cyobservability.observability_deinit()
grpc._observability.observability_deinit()
[docs] def create_client_call_tracer(
self, method_name: bytes, target: bytes
) -> ClientCallTracerCapsule:
trace_id = b"TRACE_ID"
capsule = _cyobservability.create_client_call_tracer(
method_name, target, trace_id
)
return capsule
[docs] def create_server_call_tracer_factory(
self,
) -> ServerCallTracerFactoryCapsule:
capsule = _cyobservability.create_server_call_tracer_factory_capsule()
return capsule
[docs] def delete_client_call_tracer(
self, client_call_tracer: ClientCallTracerCapsule
) -> None:
_cyobservability.delete_client_call_tracer(client_call_tracer)
[docs] def save_trace_context(
self, trace_id: str, span_id: str, is_sampled: bool
) -> None:
pass
[docs] def record_rpc_latency(
self,
method: str,
target: str,
rpc_latency: float,
status_code: grpc.StatusCode,
) -> None:
status_code = GRPC_STATUS_CODE_TO_STRING.get(status_code, "UNKNOWN")
_cyobservability._record_rpc_latency(
self.exporter, method, target, rpc_latency, status_code
)