Source code for opentelemetry.sdk.trace.export

# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations

import logging
import sys
import typing
from enum import Enum
from os import environ, linesep

from opentelemetry.context import (
    _SUPPRESS_INSTRUMENTATION_KEY,
    Context,
    attach,
    detach,
    set_value,
)
from opentelemetry.metrics import MeterProvider, get_meter_provider
from opentelemetry.sdk._shared_internal import BatchProcessor, ProcessorMetrics
from opentelemetry.sdk.environment_variables import (
    OTEL_BSP_EXPORT_TIMEOUT,
    OTEL_BSP_MAX_EXPORT_BATCH_SIZE,
    OTEL_BSP_MAX_QUEUE_SIZE,
    OTEL_BSP_SCHEDULE_DELAY,
)
from opentelemetry.sdk.trace import ReadableSpan, Span, SpanProcessor
from opentelemetry.semconv._incubating.attributes.otel_attributes import (
    OtelComponentTypeValues,
)

_DEFAULT_SCHEDULE_DELAY_MILLIS = 5000
_DEFAULT_MAX_EXPORT_BATCH_SIZE = 512
_DEFAULT_EXPORT_TIMEOUT_MILLIS = 30000
_DEFAULT_MAX_QUEUE_SIZE = 2048
_ENV_VAR_INT_VALUE_ERROR_MESSAGE = (
    "Unable to parse value for %s as integer. Defaulting to %s."
)

logger = logging.getLogger(__name__)


class SpanExportResult(Enum):
    """Outcome that a `SpanExporter` reports for a call to ``export``."""

    # The batch was exported.
    SUCCESS = 0
    # The batch was not exported.
    FAILURE = 1
class SpanExporter:
    """Interface for exporting spans.

    Implement this interface in services that want to export recorded spans
    in their own format.

    To export data an exporter MUST be registered to the
    :class:`opentelemetry.sdk.trace.Tracer` using a `SimpleSpanProcessor` or
    a `BatchSpanProcessor`.

    The methods defined here have no body beyond their docstrings, so this
    base class returns ``None`` from each of them; concrete exporters are
    expected to override them.
    """

    def export(
        self, spans: typing.Sequence[ReadableSpan]
    ) -> SpanExportResult:  # pyright: ignore[reportReturnType]
        """Export a batch of telemetry data.

        Args:
            spans: The sequence of `opentelemetry.trace.Span` objects to be
                exported.

        Returns:
            The result of the export.
        """

    def shutdown(self) -> None:
        """Shut down the exporter.

        Called when the SDK is shut down.
        """

    def force_flush(
        self, timeout_millis: int = 30000
    ) -> bool:  # pyright: ignore[reportReturnType]
        """Hint that pending exports should be completed promptly.

        The export of any spans the exporter has received prior to this call
        SHOULD be completed as soon as possible, preferably before returning
        from this method.

        Args:
            timeout_millis: Time budget for the flush, in milliseconds.
        """
class SimpleSpanProcessor(SpanProcessor):
    """Simple SpanProcessor implementation.

    SimpleSpanProcessor is an implementation of `SpanProcessor` that passes
    ended spans directly to the configured `SpanExporter`, one span per
    ``export`` call.
    """

    def __init__(
        self,
        span_exporter: SpanExporter,
        *,
        meter_provider: MeterProvider | None = None,
    ):
        """Create the processor.

        Args:
            span_exporter: Exporter that receives every sampled span as soon
                as it ends.
            meter_provider: Provider handed to the processor's
                `ProcessorMetrics`. When omitted (or falsy) the result of
                `get_meter_provider()` is used instead.
        """
        self.span_exporter = span_exporter
        self._metrics = ProcessorMetrics(
            "traces",
            OtelComponentTypeValues.SIMPLE_SPAN_PROCESSOR,
            meter_provider or get_meter_provider(),
        )

    def on_start(
        self, span: Span, parent_context: Context | None = None
    ) -> None:
        """Do nothing; this processor only acts when a span ends."""

    def _on_ending(self, span: Span) -> None:
        """Do nothing; this processor only acts when a span has ended."""

    def on_end(self, span: ReadableSpan) -> None:
        """Export ``span`` immediately if it is sampled.

        Spans without a context, or whose trace flags are not sampled, are
        ignored. Exceptions raised by the exporter are logged and passed to
        the processor metrics rather than propagated to the caller.

        Args:
            span: The span that has just ended.
        """
        if not (span.context and span.context.trace_flags.sampled):
            return
        # The suppress-instrumentation key stays set in the current context
        # for the duration of the export and the metric update.
        token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True))
        error: Exception | None = None
        try:
            self.span_exporter.export((span,))
        # pylint: disable=broad-exception-caught
        except Exception as err:
            error = err
            logger.exception("Exception while exporting Span.")
        finally:
            # Nested so the attached context is always detached, even if
            # recording the metric raises.
            try:
                self._metrics.finish_items(1, error)
            finally:
                detach(token)

    def shutdown(self) -> None:
        """Shut down the wrapped exporter."""
        self.span_exporter.shutdown()

    def force_flush(self, timeout_millis: int = 30000) -> bool:
        """Report success immediately.

        ``on_end`` calls the exporter synchronously, so this processor holds
        no spans of its own to flush.

        Args:
            timeout_millis: Accepted for interface compatibility; unused.

        Returns:
            Always ``True``.
        """
        # pylint: disable=unused-argument
        return True
class BatchSpanProcessor(SpanProcessor):
    """Batch span processor implementation.

    `BatchSpanProcessor` is an implementation of `SpanProcessor` that
    batches ended spans and pushes them to the configured `SpanExporter`.

    `BatchSpanProcessor` is configurable with the following environment
    variables which correspond to constructor parameters:

    - :envvar:`OTEL_BSP_SCHEDULE_DELAY`
    - :envvar:`OTEL_BSP_MAX_QUEUE_SIZE`
    - :envvar:`OTEL_BSP_MAX_EXPORT_BATCH_SIZE`
    - :envvar:`OTEL_BSP_EXPORT_TIMEOUT`

    All the logic for emitting spans, shutting down etc. resides in the
    `BatchProcessor` class; this class resolves configuration and delegates.
    """

    def __init__(
        self,
        span_exporter: SpanExporter,
        max_queue_size: int | None = None,
        schedule_delay_millis: float | None = None,
        max_export_batch_size: int | None = None,
        export_timeout_millis: float | None = None,
        *,
        meter_provider: MeterProvider | None = None,
    ):
        """Resolve the configuration and build the underlying processor.

        Every size/delay argument left as ``None`` is taken from its
        environment variable, or from the module default when the variable
        is unset or not an integer.

        Args:
            span_exporter: Exporter handed to the `BatchProcessor`.
            max_queue_size: Queue size; also reported as the metrics
                capacity.
            schedule_delay_millis: Delay setting, in milliseconds.
            max_export_batch_size: Batch size setting.
            export_timeout_millis: Timeout setting, in milliseconds.
            meter_provider: Provider for the processor metrics; falls back
                to `get_meter_provider()` when omitted (or falsy).

        Raises:
            ValueError: If the resolved sizes or delay fail
                `_validate_arguments`.
        """
        if max_queue_size is None:
            max_queue_size = BatchSpanProcessor._default_max_queue_size()

        if schedule_delay_millis is None:
            schedule_delay_millis = (
                BatchSpanProcessor._default_schedule_delay_millis()
            )

        if max_export_batch_size is None:
            max_export_batch_size = (
                BatchSpanProcessor._default_max_export_batch_size()
            )

        # Not used. No way currently to pass timeout to export.
        if export_timeout_millis is None:
            export_timeout_millis = (
                BatchSpanProcessor._default_export_timeout_millis()
            )

        BatchSpanProcessor._validate_arguments(
            max_queue_size, schedule_delay_millis, max_export_batch_size
        )

        metrics = ProcessorMetrics(
            "traces",
            OtelComponentTypeValues.BATCHING_SPAN_PROCESSOR,
            meter_provider or get_meter_provider(),
            capacity=max_queue_size,
        )
        self._batch_processor = BatchProcessor(
            span_exporter,
            schedule_delay_millis,
            max_export_batch_size,
            export_timeout_millis,
            max_queue_size,
            "Span",
            metrics,
        )

    # Added for backward compatibility. Not recommended to directly
    # access/use underlying exporter.
    @property
    def span_exporter(self):
        """The exporter held by the underlying `BatchProcessor`."""
        return self._batch_processor._exporter  # pylint: disable=protected-access

    def on_start(
        self, span: Span, parent_context: Context | None = None
    ) -> None:
        """Do nothing; spans are only handled once they end."""

    def _on_ending(self, span: Span) -> None:
        """Do nothing; spans are only handled once they have ended."""

    def on_end(self, span: ReadableSpan) -> None:
        """Hand a sampled span to the batch processor.

        Spans without a context, or whose trace flags are not sampled, are
        ignored.
        """
        context = span.context
        if context and context.trace_flags.sampled:
            self._batch_processor.emit(span)

    def shutdown(self):
        """Shut down the underlying batch processor and return its result."""
        return self._batch_processor.shutdown()

    def force_flush(self, timeout_millis: typing.Optional[int] = None) -> bool:
        """Delegate the flush request to the underlying batch processor."""
        return self._batch_processor.force_flush(timeout_millis)

    @staticmethod
    def _int_from_env(env_name, fallback):
        """Read ``env_name`` as an integer, using ``fallback`` when needed.

        ``fallback`` is returned when the variable is unset, and also, after
        logging the parse failure, when its value is not a valid integer.
        """
        try:
            return int(environ.get(env_name, fallback))
        except ValueError:
            logger.exception(
                _ENV_VAR_INT_VALUE_ERROR_MESSAGE, env_name, fallback
            )
            return fallback

    @staticmethod
    def _default_max_queue_size():
        """Queue size from ``OTEL_BSP_MAX_QUEUE_SIZE`` or the default."""
        return BatchSpanProcessor._int_from_env(
            OTEL_BSP_MAX_QUEUE_SIZE, _DEFAULT_MAX_QUEUE_SIZE
        )

    @staticmethod
    def _default_schedule_delay_millis():
        """Schedule delay from ``OTEL_BSP_SCHEDULE_DELAY`` or the default."""
        return BatchSpanProcessor._int_from_env(
            OTEL_BSP_SCHEDULE_DELAY, _DEFAULT_SCHEDULE_DELAY_MILLIS
        )

    @staticmethod
    def _default_max_export_batch_size():
        """Batch size from ``OTEL_BSP_MAX_EXPORT_BATCH_SIZE`` or the default."""
        return BatchSpanProcessor._int_from_env(
            OTEL_BSP_MAX_EXPORT_BATCH_SIZE, _DEFAULT_MAX_EXPORT_BATCH_SIZE
        )

    @staticmethod
    def _default_export_timeout_millis():
        """Export timeout from ``OTEL_BSP_EXPORT_TIMEOUT`` or the default."""
        return BatchSpanProcessor._int_from_env(
            OTEL_BSP_EXPORT_TIMEOUT, _DEFAULT_EXPORT_TIMEOUT_MILLIS
        )

    @staticmethod
    def _validate_arguments(
        max_queue_size, schedule_delay_millis, max_export_batch_size
    ):
        """Reject non-positive settings and a batch larger than the queue.

        Raises:
            ValueError: On the first check that fails, in the order queue
                size, schedule delay, batch size, batch-vs-queue size.
        """
        if max_queue_size <= 0:
            raise ValueError("max_queue_size must be a positive integer.")

        if schedule_delay_millis <= 0:
            raise ValueError("schedule_delay_millis must be positive.")

        if max_export_batch_size <= 0:
            raise ValueError(
                "max_export_batch_size must be a positive integer."
            )

        if max_export_batch_size > max_queue_size:
            raise ValueError(
                "max_export_batch_size must be less than or equal to max_queue_size."
            )
class ConsoleSpanExporter(SpanExporter):
    """Implementation of :class:`SpanExporter` that prints spans to the console.

    This class can be used for diagnostic purposes. Each exported span is
    rendered by ``formatter`` and written to ``out``, which is ``sys.stdout``
    unless another stream is supplied.
    """

    def __init__(
        self,
        service_name: str | None = None,
        out: typing.IO = sys.stdout,
        formatter: typing.Callable[[ReadableSpan], str] = lambda span: (
            span.to_json() + linesep
        ),
    ):
        """Store the output stream, formatter and service name.

        Args:
            service_name: Stored on the instance; not otherwise used by this
                class.
            out: Stream the formatted spans are written to.
            formatter: Callable turning a span into the text to write. The
                default emits ``span.to_json()`` followed by a line
                separator.
        """
        self.service_name = service_name
        self.formatter = formatter
        self.out = out

    def export(self, spans: typing.Sequence[ReadableSpan]) -> SpanExportResult:
        """Write every span to the output stream, then flush it once.

        Args:
            spans: The spans to print.

        Returns:
            Always `SpanExportResult.SUCCESS`.
        """
        for item in spans:
            rendered = self.formatter(item)
            self.out.write(rendered)
        self.out.flush()
        return SpanExportResult.SUCCESS

    def force_flush(self, timeout_millis: int = 30000) -> bool:
        """Report success immediately; ``export`` already flushes the stream.

        Args:
            timeout_millis: Accepted for interface compatibility; unused.

        Returns:
            Always ``True``.
        """
        return True