Spaces:
Running
Running
| import os | |
| import logging | |
| import uuid | |
| import threading | |
| from contextlib import contextmanager | |
| from datetime import datetime | |
| from functools import wraps | |
| from typing import Any, Callable | |
# Configuration, read once from the process environment at import time.
# The boolean switches are on only for the literal value "true" (any casing).
TELEMETRY_ENABLED = os.getenv("TELEMETRY_ENABLED", "false").lower() == "true"
# Handed to configure_azure_monitor() as its connection string.
APPINSIGHTS_PARAM = os.getenv("APPINSIGHTS_PARAM", "")
# When on, _debug_log() echoes telemetry activity to stdout.
TELEMETRY_DEBUG = os.getenv("TELEMETRY_DEBUG", "false").lower() == "true"
# None when OTEL_SERVICE_NAME is not set; initialisation refuses to run then.
CLOUD_ROLE_NAME = os.getenv("OTEL_SERVICE_NAME")

# Lazily populated by _initialize_telemetry(); all stay at these defaults
# while telemetry is disabled or failed to start.
_telemetry_initialized = False
_tracer = None
_meter = None
_logger = None

# Instrument caches keyed by metric name so each instrument is created once.
_counters: dict[str, Any] = {}
_histograms: dict[str, Any] = {}

# =============================================================================
# Session Context Management
# =============================================================================
# Thread-local holder for the current session id (attribute: session_id).
_session_context = threading.local()
def _generate_event_id() -> str:
    """Return a short random identifier: the first 8 hex digits of a UUID4."""
    # The canonical string form of a UUID starts with the same 8 hex digits
    # as its .hex form, so slicing .hex gives the identical result.
    return uuid.uuid4().hex[:8]
| def set_session_context(session_hash: str | None) -> str: | |
| if session_hash: | |
| _session_context.session_id = session_hash[:12] | |
| else: | |
| if not hasattr(_session_context, 'session_id') or not _session_context.session_id: | |
| _session_context.session_id = f"anon_{_generate_event_id()}" | |
| return _session_context.session_id | |
def get_session_id() -> str:
    """Return this thread's session id, creating an anonymous one if needed."""
    current = getattr(_session_context, "session_id", None)
    if current:
        return current
    # Nothing stored (or it was cleared): let set_session_context mint one.
    return set_session_context(None)
def clear_session_context() -> None:
    """Reset this thread's session id to ``None`` if one was ever set."""
    if not hasattr(_session_context, "session_id"):
        # Never set on this thread; leave the attribute absent.
        return
    _session_context.session_id = None
def _debug_log(message: str) -> None:
    """Print a ``[TELEMETRY]``-prefixed line to stdout when debugging is on.

    Does nothing unless TELEMETRY_DEBUG is true. Output is flushed
    immediately.
    """
    if not TELEMETRY_DEBUG:
        return
    print(f"[TELEMETRY] {message}", flush=True)
def _initialize_telemetry() -> bool:
    """Configure Azure Monitor / OpenTelemetry once and cache the handles.

    On success the module-level ``_tracer``, ``_meter`` and ``_logger`` are
    populated and ``_telemetry_initialized`` is set. Every failure is reported
    through ``logging.warning`` instead of being raised, so telemetry problems
    do not propagate to the caller.

    Returns:
        True if telemetry is (or already was) initialised. False if it is
        disabled, misconfigured, or set-up failed.
    """
    global _telemetry_initialized, _tracer, _meter, _logger

    if _telemetry_initialized:
        return True
    if not TELEMETRY_ENABLED:
        return False

    # Both settings are mandatory; the first missing one aborts with a warning.
    required_settings = (
        (APPINSIGHTS_PARAM, "APPINSIGHTS_PARAM not set. Telemetry disabled."),
        (CLOUD_ROLE_NAME, "OTEL_SERVICE_NAME not set. Telemetry disabled."),
    )
    for setting, complaint in required_settings:
        if not setting:
            logging.warning(complaint)
            return False

    try:
        # Imported lazily so the module still loads when the Azure /
        # OpenTelemetry packages are not installed.
        from azure.monitor.opentelemetry import configure_azure_monitor
        from opentelemetry import trace, metrics

        # CLOUD_ROLE_NAME was itself read from OTEL_SERVICE_NAME, so this only
        # has an effect if the variable was removed after import.
        os.environ.setdefault("OTEL_SERVICE_NAME", CLOUD_ROLE_NAME)
        configure_azure_monitor(
            connection_string=APPINSIGHTS_PARAM,
            logger_name="pazabench",
        )

        _tracer = trace.get_tracer("pazabench.telemetry")
        _meter = metrics.get_meter("pazabench.telemetry")
        _logger = logging.getLogger("pazabench")
        _logger.setLevel(logging.INFO)

        _telemetry_initialized = True
        logging.info(f"Telemetry initialized. Cloud Role Name: {CLOUD_ROLE_NAME}")
        return True
    except ImportError as exc:
        logging.warning(f"Azure Monitor OpenTelemetry packages not installed: {exc}")
        return False
    except Exception as exc:
        logging.warning(f"Failed to initialize telemetry: {exc}")
        return False
def _ensure_initialized(func: Callable) -> Callable:
    """Decorate ``func`` so it runs only once telemetry is initialised.

    The wrapper attempts initialisation on demand. If telemetry still cannot
    be initialised, the call is skipped and ``None`` is returned.

    Args:
        func: The callable to guard.

    Returns:
        A wrapper that keeps ``func``'s name, docstring and signature
        metadata (via ``functools.wraps``).
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        # _initialize_telemetry() returns True exactly when the initialised
        # flag ends up set, so one combined check covers both steps.
        if not _telemetry_initialized and not _initialize_telemetry():
            return None
        return func(*args, **kwargs)

    return wrapper
| # ============================================================================= | |
| # Custom Events - Track user interactions and behavior | |
| # ============================================================================= | |
def track_event(
    name: str,
    properties: dict[str, Any] | None = None,
    parent_event_id: str | None = None
) -> str | None:
    """Record a custom event and return its generated event id.

    Each event carries ``event_id``, ``session_id`` and a UTC ``timestamp``
    (ISO 8601 with a trailing ``Z``). Caller-supplied ``properties`` are
    merged last and therefore override those built-in keys on collision.

    Args:
        name: Event name, sent as the ``microsoft.custom_event.name``
            attribute of the log record.
        properties: Optional extra attributes for the event.
        parent_event_id: Optional id of a related earlier event, recorded as
            ``parent_event_id``.

    Returns:
        The new event id. It is returned even when no telemetry logger is
        configured, in which case only the debug log sees the event.
    """
    # Local import keeps this block independent of the module's import list.
    from datetime import timezone

    event_id = _generate_event_id()
    session_id = get_session_id()

    # datetime.utcnow() is deprecated since Python 3.12. Take an aware UTC
    # time and drop tzinfo so isoformat() gives the same "<naive iso>Z"
    # text as before, not "+00:00Z".
    now_utc = datetime.now(timezone.utc).replace(tzinfo=None)

    all_properties: dict[str, Any] = {
        "event_id": event_id,
        "session_id": session_id,
        "timestamp": now_utc.isoformat() + "Z",
    }
    if parent_event_id:
        all_properties["parent_event_id"] = parent_event_id
    if properties:
        all_properties.update(properties)

    props_str = ", ".join(f"{k}={v}" for k, v in all_properties.items())
    _debug_log(f"EVENT: {name} | {props_str}")

    if not _logger:
        return event_id

    # NOTE(review): presumably the Azure Monitor exporter uses this key to
    # classify the record as a custom event — confirm against exporter docs.
    extra = {"microsoft.custom_event.name": name}
    extra.update(all_properties)
    _logger.info(f"Event: {name}", extra=extra)
    return event_id
def track_page_view(page_name: str, properties: dict[str, Any] | None = None) -> None:
    """Record a ``page_view`` event for ``page_name``.

    Args:
        page_name: Stored under the ``page_name`` key.
        properties: Optional extra attributes. They are merged after
            ``page_name`` and so override it on key collision.
    """
    payload = {"page_name": page_name, **(properties or {})}
    track_event("page_view", payload)
def track_filter_change(
    filter_type: str,
    values: list[str] | None,
    tab: str | None = None
) -> None:
    """Record a ``filter_change`` event describing the selected values.

    Args:
        filter_type: Which filter changed.
        values: Current selection. The full length is reported as
            ``values_count``, but only the first 10 entries are joined
            (comma-separated) into ``values``.
        tab: Optional tab name, included only when truthy.
    """
    selected = values if values else []
    payload = {
        "filter_type": filter_type,
        "values_count": len(selected),
        "values": ",".join(selected[:10]),
    }
    if tab:
        payload["tab"] = tab
    track_event("filter_change", payload)
def track_language_interest(
    language: str,
    context: str,
    properties: dict[str, Any] | None = None
) -> None:
    """Record a ``language_interest`` event.

    Args:
        language: Stored under ``language``.
        context: Stored under ``context``.
        properties: Optional extra attributes, merged last so they override
            the two keys above on collision.
    """
    payload = {"language": language, "context": context, **(properties or {})}
    track_event("language_interest", payload)
def track_model_interest(
    model: str,
    context: str,
    properties: dict[str, Any] | None = None
) -> None:
    """Record a ``model_interest`` event.

    Args:
        model: Stored under ``model``.
        context: Stored under ``context``.
        properties: Optional extra attributes, merged last so they override
            the two keys above on collision.
    """
    payload = {"model": model, "context": context, **(properties or {})}
    track_event("model_interest", payload)
def track_visualization_view(
    chart_name: str,
    properties: dict[str, Any] | None = None,
    languages: list[str] | None = None,
    models: list[str] | None = None
) -> str | None:
    """Record a ``visualization_view`` event and return what track_event returns.

    Args:
        chart_name: Stored under ``chart_name``.
        properties: Optional extra attributes, merged last.
        languages: Optional list. The first 10 are joined into ``languages``
            and the full length is stored as a string in ``languages_count``.
        models: Optional list, handled like ``languages`` under the
            ``models`` / ``models_count`` keys.
    """
    event_properties: dict[str, Any] = {"chart_name": chart_name}

    # Both selections are summarised the same way; empty or None is skipped.
    for label, items in (("languages", languages), ("models", models)):
        if items:
            event_properties[label] = ",".join(items[:10])
            event_properties[f"{label}_count"] = str(len(items))

    if properties:
        event_properties.update(properties)
    return track_event("visualization_view", event_properties)
def track_submission_attempt(
    dataset_name: str,
    success: bool,
    error_message: str | None = None
) -> None:
    """Record a ``submission_attempt`` event.

    Args:
        dataset_name: Stored under ``dataset_name``.
        success: Outcome flag, stored as its ``str()`` form.
        error_message: Optional failure text. Only its first 500 characters
            are recorded, and only when truthy.
    """
    payload = {"dataset_name": dataset_name, "success": str(success)}
    if error_message:
        payload["error_message"] = error_message[:500]
    track_event("submission_attempt", payload)
def track_accordion_toggle(accordion_name: str, is_open: bool) -> None:
    """Record an ``accordion_toggle`` event with action ``open`` or ``close``."""
    action = "open" if is_open else "close"
    track_event(
        "accordion_toggle",
        {"accordion_name": accordion_name, "action": action},
    )
def track_metric_tab_view(metric: str) -> None:
    """Record a ``metric_tab_view`` event for the given metric."""
    payload = {"metric": metric}
    track_event("metric_tab_view", payload)
def track_view_mode_change(view_mode: str) -> None:
    """Record a ``view_mode_change`` event for the given view mode."""
    payload = {"view_mode": view_mode}
    track_event("view_mode_change", payload)
def track_region_filter(regions: list[str] | None) -> None:
    """Record a region selection as an ``african_region`` filter change.

    Delegates to track_filter_change with the tab fixed to ``asr_metrics``.
    """
    track_filter_change(
        filter_type="african_region",
        values=regions,
        tab="asr_metrics",
    )
| # ============================================================================= | |
| # Metrics - Track numeric measurements | |
| # ============================================================================= | |
def _get_counter(name: str, description: str = "") -> Any:
    """Return the cached counter called ``name``, creating it on first use.

    Returns ``None`` when no meter is configured. New counters are created
    with unit ``"1"``.
    """
    if not _meter:
        return None
    if name in _counters:
        return _counters[name]

    instrument = _meter.create_counter(name, description=description, unit="1")
    _counters[name] = instrument
    return instrument
def _get_histogram(name: str, description: str = "", unit: str = "ms") -> Any:
    """Return the cached histogram called ``name``, creating it on first use.

    Returns ``None`` when no meter is configured. ``unit`` is only applied
    when the histogram is first created.
    """
    if not _meter:
        return None
    if name in _histograms:
        return _histograms[name]

    instrument = _meter.create_histogram(name, description=description, unit=unit)
    _histograms[name] = instrument
    return instrument
def track_metric(
    name: str,
    value: float,
    attributes: dict[str, str] | None = None,
    metric_type: str = "histogram"
) -> None:
    """Record ``value`` on the counter or histogram called ``name``.

    Args:
        name: Instrument name.
        value: Measurement. It is truncated with ``int()`` for counters and
            recorded unchanged for histograms.
        attributes: Optional attributes attached to the measurement.
        metric_type: ``"counter"`` selects a counter. Any other value selects
            a histogram.

    Does nothing when the instrument is unavailable (no meter configured).
    """
    attrs = attributes or {}

    if metric_type == "counter":
        instrument = _get_counter(name, f"Counter for {name}")
        if instrument:
            instrument.add(int(value), attrs)
        return

    instrument = _get_histogram(name, f"Histogram for {name}")
    if instrument:
        instrument.record(value, attrs)
def increment_counter(name: str, attributes: dict[str, str] | None = None) -> None:
    """Add 1 to the counter called ``name`` by delegating to track_metric."""
    track_metric(name, 1, attributes, metric_type="counter")
| # ============================================================================= | |
| # Error Tracking - Track exceptions and errors | |
| # ============================================================================= | |
def track_error(
    exception: Exception | None = None,
    properties: dict[str, Any] | None = None,
    context: str | None = None
) -> None:
    """Record an error as an internal span named ``error_<context>``.

    The error is always written to the debug log. A span is emitted only
    when a tracer is configured.

    Args:
        exception: Optional exception. It is recorded on the span, and its
            type name and message (first 500 characters) are added as
            attributes.
        properties: Optional extra attributes. The dict is copied, so the
            caller's object is never modified.
        context: Optional label, used in the span name (``unknown`` when
            missing) and stored as the ``context`` attribute.
    """
    exc_info = f"{type(exception).__name__}: {exception}" if exception else "No exception"
    _debug_log(f"ERROR: context={context} | {exc_info}")

    if not _tracer:
        return

    # Imported here so the module loads without opentelemetry installed.
    from opentelemetry.trace import SpanKind

    # Work on a copy: the keys added below must not leak into the caller's dict.
    error_properties: dict[str, Any] = dict(properties) if properties else {}
    if context:
        error_properties["context"] = context

    with _tracer.start_as_current_span(
        f"error_{context or 'unknown'}",
        kind=SpanKind.INTERNAL
    ) as span:
        if exception:
            span.record_exception(exception)
            error_properties["exception_type"] = type(exception).__name__
            error_properties["exception_message"] = str(exception)[:500]
        # Every attribute is stringified and stored under an "error." prefix.
        for key, value in error_properties.items():
            span.set_attribute(f"error.{key}", str(value))
| # ============================================================================= | |
| # Tracing - Track operations with spans | |
| # ============================================================================= | |
| def start_span(name: str, attributes: dict[str, str] | None = None): | |
| if not _tracer or not _telemetry_initialized: | |
| yield None | |
| return | |
| with _tracer.start_as_current_span(name) as span: | |
| if attributes: | |
| for key, value in attributes.items(): | |
| span.set_attribute(key, str(value)) | |
| yield span | |
| # ============================================================================= | |
| # Utility Functions | |
| # ============================================================================= | |
def is_telemetry_enabled() -> bool:
    """Report whether telemetry has actually been initialised.

    This reflects the initialisation flag, not the TELEMETRY_ENABLED setting.
    """
    return _telemetry_initialized
def get_telemetry_status() -> dict[str, Any]:
    """Return a snapshot of the telemetry configuration for diagnostics.

    The connection string itself is never exposed, only whether it is set.
    """
    return dict(
        enabled=TELEMETRY_ENABLED,
        initialized=_telemetry_initialized,
        connection_string_set=bool(APPINSIGHTS_PARAM),
        cloud_role_name=CLOUD_ROLE_NAME,
    )
| # ============================================================================= | |
| # Convenience wrapper for tracking in Gradio callbacks | |
| # ============================================================================= | |
def with_telemetry(event_name: str, **event_props):
    """Build a decorator that records an event before each call.

    Args:
        event_name: Name of the event recorded on every invocation.
        **event_props: Fixed properties attached to that event.

    Returns:
        A decorator. The decorated function first calls
        ``track_event(event_name, event_props)`` and then runs normally,
        returning its own result.
    """
    def decorator(func: Callable) -> Callable:
        # wraps() keeps the callback's name, docstring and __wrapped__, so
        # signature introspection sees the real parameters, not *args/**kwargs.
        @wraps(func)
        def wrapper(*args, **kwargs):
            track_event(event_name, event_props)
            return func(*args, **kwargs)

        return wrapper

    return decorator
def init_session_from_request(request) -> str:
    """Derive the session id from a request object and bind it to this thread.

    ``request.session_hash`` is preferred, with ``request.username`` as the
    fallback. Missing attributes and a falsy ``request`` are tolerated, and
    set_session_context then picks or keeps an id on its own.

    Returns:
        The session id reported by set_session_context.
    """
    if not request:
        return set_session_context(None)

    identifier = getattr(request, "session_hash", None) or getattr(request, "username", None)
    return set_session_context(identifier)
def track_page_load(page_name: str, request=None) -> str | None:
    """Bind the session from ``request`` (if given) and record a page view.

    Args:
        page_name: Stored under the ``page_name`` key of the ``page_view``
            event.
        request: Optional request object passed to init_session_from_request.

    Returns:
        The event id of the recorded ``page_view`` event.
    """
    if request:
        init_session_from_request(request)
    # Call track_event directly: track_page_view() returns nothing, so
    # returning its result always gave None despite the declared return type.
    return track_event("page_view", {"page_name": page_name})
# Initialise eagerly at import time: none of the track_* helpers visible in
# this module start telemetry on their own. Failures are logged, not raised.
if TELEMETRY_ENABLED:
    _initialize_telemetry()