import os import logging import uuid import threading from contextlib import contextmanager from datetime import datetime from functools import wraps from typing import Any, Callable TELEMETRY_ENABLED = os.environ.get("TELEMETRY_ENABLED", "false").lower() == "true" APPINSIGHTS_PARAM = os.environ.get("APPINSIGHTS_PARAM", "") TELEMETRY_DEBUG = os.environ.get("TELEMETRY_DEBUG", "false").lower() == "true" CLOUD_ROLE_NAME = os.environ.get("OTEL_SERVICE_NAME") _telemetry_initialized = False _tracer = None _meter = None _logger = None _counters: dict[str, Any] = {} _histograms: dict[str, Any] = {} # ============================================================================= # Session Context Management # ============================================================================= _session_context = threading.local() def _generate_event_id() -> str: return str(uuid.uuid4())[:8] def set_session_context(session_hash: str | None) -> str: if session_hash: _session_context.session_id = session_hash[:12] else: if not hasattr(_session_context, 'session_id') or not _session_context.session_id: _session_context.session_id = f"anon_{_generate_event_id()}" return _session_context.session_id def get_session_id() -> str: if not hasattr(_session_context, 'session_id') or not _session_context.session_id: return set_session_context(None) return _session_context.session_id def clear_session_context() -> None: if hasattr(_session_context, 'session_id'): _session_context.session_id = None def _debug_log(message: str) -> None: if TELEMETRY_DEBUG: print(f"[TELEMETRY] {message}", flush=True) def _initialize_telemetry() -> bool: global _telemetry_initialized, _tracer, _meter, _logger if _telemetry_initialized: return True if not TELEMETRY_ENABLED: return False if not APPINSIGHTS_PARAM: logging.warning("APPINSIGHTS_PARAM not set. Telemetry disabled.") return False if not CLOUD_ROLE_NAME: logging.warning("OTEL_SERVICE_NAME not set. 
Telemetry disabled.") return False try: from azure.monitor.opentelemetry import configure_azure_monitor from opentelemetry import trace, metrics os.environ.setdefault("OTEL_SERVICE_NAME", CLOUD_ROLE_NAME) configure_azure_monitor( connection_string=APPINSIGHTS_PARAM, logger_name="pazabench", ) _tracer = trace.get_tracer("pazabench.telemetry") _meter = metrics.get_meter("pazabench.telemetry") _logger = logging.getLogger("pazabench") _logger.setLevel(logging.INFO) _telemetry_initialized = True logging.info(f"Telemetry initialized. Cloud Role Name: {CLOUD_ROLE_NAME}") return True except ImportError as e: logging.warning(f"Azure Monitor OpenTelemetry packages not installed: {e}") return False except Exception as e: logging.warning(f"Failed to initialize telemetry: {e}") return False def _ensure_initialized(func: Callable) -> Callable: @wraps(func) def wrapper(*args, **kwargs): if not _telemetry_initialized: _initialize_telemetry() if not _telemetry_initialized: return None return func(*args, **kwargs) return wrapper # ============================================================================= # Custom Events - Track user interactions and behavior # ============================================================================= @_ensure_initialized def track_event( name: str, properties: dict[str, Any] | None = None, parent_event_id: str | None = None ) -> str | None: event_id = _generate_event_id() session_id = get_session_id() all_properties: dict[str, Any] = { "event_id": event_id, "session_id": session_id, "timestamp": datetime.utcnow().isoformat() + "Z", } if parent_event_id: all_properties["parent_event_id"] = parent_event_id if properties: all_properties.update(properties) props_str = ", ".join(f"{k}={v}" for k, v in all_properties.items()) _debug_log(f"EVENT: {name} | {props_str}") if not _logger: return event_id extra = {"microsoft.custom_event.name": name} extra.update(all_properties) _logger.info(f"Event: {name}", extra=extra) return event_id 
@_ensure_initialized
def track_page_view(page_name: str, properties: dict[str, Any] | None = None) -> None:
    """Emit a ``page_view`` event for ``page_name`` with optional extra properties."""
    payload = {"page_name": page_name}
    if properties:
        payload.update(properties)
    track_event("page_view", payload)


@_ensure_initialized
def track_filter_change(
    filter_type: str,
    values: list[str] | None,
    tab: str | None = None
) -> None:
    """Emit a ``filter_change`` event.

    Records the filter type, the number of selected values and the first
    ten values joined with commas; ``tab`` is added only when truthy.
    """
    selected = values or []
    payload = {
        "filter_type": filter_type,
        "values_count": len(selected),
        "values": ",".join(selected[:10]),
    }
    if tab:
        payload["tab"] = tab
    track_event("filter_change", payload)


@_ensure_initialized
def track_language_interest(
    language: str,
    context: str,
    properties: dict[str, Any] | None = None
) -> None:
    """Emit a ``language_interest`` event for ``language`` in ``context``."""
    payload = {"language": language, "context": context}
    if properties:
        payload.update(properties)
    track_event("language_interest", payload)


@_ensure_initialized
def track_model_interest(
    model: str,
    context: str,
    properties: dict[str, Any] | None = None
) -> None:
    """Emit a ``model_interest`` event for ``model`` in ``context``."""
    payload = {"model": model, "context": context}
    if properties:
        payload.update(properties)
    track_event("model_interest", payload)


@_ensure_initialized
def track_visualization_view(
    chart_name: str,
    properties: dict[str, Any] | None = None,
    languages: list[str] | None = None,
    models: list[str] | None = None
) -> str | None:
    """Emit a ``visualization_view`` event and return its event id.

    For each of ``languages`` and ``models`` that is non-empty, the first
    ten entries (comma-joined) and the total count (as a string) are
    recorded. ``properties`` is merged last and wins on key collisions.
    """
    payload: dict[str, Any] = {"chart_name": chart_name}

    if languages:
        payload["languages"] = ",".join(languages[:10])
        payload["languages_count"] = str(len(languages))
    if models:
        payload["models"] = ",".join(models[:10])
        payload["models_count"] = str(len(models))
    if properties:
        payload.update(properties)

    return track_event("visualization_view", payload)


@_ensure_initialized
def track_submission_attempt(
    dataset_name: str,
    success: bool,
    error_message: str | None = None
) -> None:
    """Emit a ``submission_attempt`` event.

    ``success`` is recorded as its string form; a truthy ``error_message``
    is recorded truncated to 500 characters.
    """
    payload = {
        "dataset_name": dataset_name,
        "success": str(success),
    }
    if error_message:
        payload["error_message"] = error_message[:500]
    track_event("submission_attempt", payload)


@_ensure_initialized
def track_accordion_toggle(accordion_name: str, is_open: bool) -> None:
    """Emit an ``accordion_toggle`` event with action ``open`` or ``close``."""
    action = "open" if is_open else "close"
    track_event(
        "accordion_toggle",
        {"accordion_name": accordion_name, "action": action},
    )


@_ensure_initialized
def track_metric_tab_view(metric: str) -> None:
    """Emit a ``metric_tab_view`` event for ``metric``."""
    track_event("metric_tab_view", {"metric": metric})


@_ensure_initialized
def track_view_mode_change(view_mode: str) -> None:
    """Emit a ``view_mode_change`` event for ``view_mode``."""
    track_event("view_mode_change", {"view_mode": view_mode})


@_ensure_initialized
def track_region_filter(regions: list[str] | None) -> None:
    """Emit a ``filter_change`` event of type ``african_region`` on the ``asr_metrics`` tab."""
    track_filter_change("african_region", regions, tab="asr_metrics")


# =============================================================================
# Metrics - Track numeric measurements
# =============================================================================

def _get_counter(name: str, description: str = "") -> Any:
    """Return the cached counter called ``name``, creating it on first use.

    Returns ``None`` when no meter is available.
    """
    if not _meter:
        return None
    try:
        return _counters[name]
    except KeyError:
        created = _meter.create_counter(name, description=description, unit="1")
        _counters[name] = created
        return created


def _get_histogram(name: str, description: str = "", unit: str = "ms") -> Any:
    """Return the cached histogram called ``name``, creating it on first use.

    Returns ``None`` when no meter is available.
    """
    if not _meter:
        return None
    try:
        return _histograms[name]
    except KeyError:
        created = _meter.create_histogram(name, description=description, unit=unit)
        _histograms[name] = created
        return created


@_ensure_initialized
def track_metric(
    name: str,
    value: float,
    attributes: dict[str, str] | None = None,
    metric_type: str = "histogram"
) -> None:
    """Record ``value`` on the counter or histogram called ``name``.

    Args:
        name: Instrument name.
        value: Measurement; converted with ``int()`` for counters.
        attributes: Optional attributes attached to the measurement.
        metric_type: ``"counter"`` selects a counter; any other value
            selects a histogram.
    """
    attrs = attributes or {}

    if metric_type == "counter":
        instrument = _get_counter(name, f"Counter for {name}")
        if instrument:
            instrument.add(int(value), attrs)
        return

    instrument = _get_histogram(name, f"Histogram for {name}")
    if instrument:
        instrument.record(value, attrs)


@_ensure_initialized
def increment_counter(name: str, attributes: dict[str, str] | None = None) -> None:
    """Add 1 to the counter called ``name``."""
    track_metric(name, 1, attributes, metric_type="counter")
# =============================================================================
# Error Tracking - Track exceptions and errors
# =============================================================================

@_ensure_initialized
def track_error(
    exception: Exception | None = None,
    properties: dict[str, Any] | None = None,
    context: str | None = None
) -> None:
    """Record an error as a span named ``error_<context>``.

    Args:
        exception: Exception to record on the span, if any. Its type name
            and message (truncated to 500 characters) are also stored as
            attributes.
        properties: Extra values stored as ``error.<key>`` span attributes
            (stringified). The caller's dict is not modified.
        context: Short label for where the error happened; used in the
            span name (``unknown`` when falsy) and stored as an attribute.
    """
    exc_info = f"{type(exception).__name__}: {exception}" if exception else "No exception"
    _debug_log(f"ERROR: context={context} | {exc_info}")

    if not _tracer:
        return

    from opentelemetry.trace import SpanKind

    # Copy so the keys added below never leak into the caller's dict.
    error_properties = dict(properties) if properties else {}
    if context:
        error_properties["context"] = context

    with _tracer.start_as_current_span(
        f"error_{context or 'unknown'}",
        kind=SpanKind.INTERNAL
    ) as span:
        if exception:
            span.record_exception(exception)
            error_properties["exception_type"] = type(exception).__name__
            error_properties["exception_message"] = str(exception)[:500]

        for key, value in error_properties.items():
            span.set_attribute(f"error.{key}", str(value))


# =============================================================================
# Tracing - Track operations with spans
# =============================================================================

@contextmanager
def start_span(name: str, attributes: dict[str, str] | None = None):
    """Context manager that yields a span called ``name``.

    Yields ``None`` when telemetry is not initialized; unlike the tracking
    functions, this does not itself attempt initialization. Attribute
    values are stored in their string form.
    """
    if not _tracer or not _telemetry_initialized:
        yield None
        return

    with _tracer.start_as_current_span(name) as span:
        if attributes:
            for key, value in attributes.items():
                span.set_attribute(key, str(value))
        yield span


# =============================================================================
# Utility Functions
# =============================================================================

def is_telemetry_enabled() -> bool:
    """Return True once telemetry has been successfully initialized."""
    return _telemetry_initialized


def get_telemetry_status() -> dict[str, Any]:
    """Return a snapshot of the telemetry configuration and state.

    Keys: ``enabled``, ``initialized``, ``connection_string_set`` (a bool;
    the connection string itself is not exposed) and ``cloud_role_name``.
    """
    return {
        "enabled": TELEMETRY_ENABLED,
        "initialized": _telemetry_initialized,
        "connection_string_set": bool(APPINSIGHTS_PARAM),
        "cloud_role_name": CLOUD_ROLE_NAME,
    }
# =============================================================================
# Convenience wrapper for tracking in Gradio callbacks
# =============================================================================

def with_telemetry(event_name: str, **event_props):
    """Return a decorator that tracks ``event_name`` before each call.

    The keyword arguments are sent as the event's properties. The wrapped
    function's arguments and return value pass through unchanged.
    """
    def decorator(func: Callable) -> Callable:
        @wraps(func)
        def wrapper(*args, **kwargs):
            track_event(event_name, event_props)
            return func(*args, **kwargs)
        return wrapper
    return decorator


def init_session_from_request(request) -> str:
    """Set the session context from ``request`` and return the session id.

    Uses the request's ``session_hash`` attribute, falling back to its
    ``username`` attribute. A falsy request, or one with neither
    attribute set, leaves the choice of id to ``set_session_context``.
    """
    session_hash = None
    if request:
        session_hash = getattr(request, 'session_hash', None)
        if not session_hash:
            session_hash = getattr(request, 'username', None)
    return set_session_context(session_hash)


def track_page_load(page_name: str, request=None) -> str | None:
    """Track a page view, first binding the session to ``request`` if given.

    Returns:
        The id of the emitted ``page_view`` event, or ``None`` when
        telemetry is not initialized.
    """
    if request:
        init_session_from_request(request)
    # Emit the page_view event through track_event directly:
    # track_page_view() returns nothing, which made this function always
    # return None despite its declared ``str | None`` result.
    return track_event("page_view", {"page_name": page_name})


# Initialize eagerly at import time when telemetry is enabled.
if TELEMETRY_ENABLED:
    _initialize_telemetry()