paza-bench / src /telemetry.py
muchai-mercy's picture
update pazabench space
53a73e0
import os
import logging
import uuid
import threading
from contextlib import contextmanager
from datetime import datetime
from functools import wraps
from typing import Any, Callable
# --- Configuration, read once from the environment at import time ---
# Master switch: true only when the variable equals "true" (case-insensitive).
TELEMETRY_ENABLED = os.environ.get("TELEMETRY_ENABLED", "false").lower() == "true"
# Passed as `connection_string` to configure_azure_monitor(); empty means unset.
APPINSIGHTS_PARAM = os.environ.get("APPINSIGHTS_PARAM", "")
# When true, _debug_log() prints telemetry activity to stdout.
TELEMETRY_DEBUG = os.environ.get("TELEMETRY_DEBUG", "false").lower() == "true"
# Taken from OTEL_SERVICE_NAME; None when the variable is not set.
CLOUD_ROLE_NAME = os.environ.get("OTEL_SERVICE_NAME")
# --- Module state, populated by _initialize_telemetry() ---
_telemetry_initialized = False
_tracer = None
_meter = None
_logger = None
# Caches of created metric instruments, keyed by instrument name.
_counters: dict[str, Any] = {}
_histograms: dict[str, Any] = {}
# =============================================================================
# Session Context Management
# =============================================================================
# Thread-local holder for the current `session_id`; each thread sees its own value.
_session_context = threading.local()
def _generate_event_id() -> str:
    """Return a short random identifier: the first 8 hex digits of a UUID4."""
    # The first dash in the canonical UUID text sits at index 8, so the first
    # eight characters of `.hex` and of `str(uuid)` are the same digits.
    random_uuid = uuid.uuid4()
    return random_uuid.hex[:8]
def set_session_context(session_hash: str | None) -> str:
    """Store the session id for the current thread and return it.

    Args:
        session_hash: Session identifier; only its first 12 characters are
            kept. When falsy, an already stored id is reused, otherwise an
            ``anon_<event id>`` placeholder is generated.

    Returns:
        The session id now stored in the thread-local context.
    """
    if session_hash:
        _session_context.session_id = session_hash[:12]
    elif not getattr(_session_context, 'session_id', None):
        # Nothing usable stored yet for this thread: fall back to an anonymous id.
        _session_context.session_id = f"anon_{_generate_event_id()}"
    return _session_context.session_id
def get_session_id() -> str:
    """Return the current thread's session id, creating one if none is stored."""
    stored_id = getattr(_session_context, 'session_id', None)
    if stored_id:
        return stored_id
    # Missing or cleared: let set_session_context() create an anonymous id.
    return set_session_context(None)
def clear_session_context() -> None:
    """Reset the current thread's session id to ``None`` if one was ever set."""
    if not hasattr(_session_context, 'session_id'):
        # Never set on this thread; leave the thread-local untouched.
        return
    _session_context.session_id = None
def _debug_log(message: str) -> None:
    """Print *message* with a ``[TELEMETRY]`` prefix when TELEMETRY_DEBUG is on."""
    if not TELEMETRY_DEBUG:
        return
    print(f"[TELEMETRY] {message}", flush=True)
def _initialize_telemetry() -> bool:
    """Configure Azure Monitor OpenTelemetry once and cache tracer/meter/logger.

    Returns:
        ``True`` when telemetry is (or already was) initialized; ``False`` when
        it is disabled, a required setting is missing, the Azure/OpenTelemetry
        packages cannot be imported, or configuration raises.
    """
    global _telemetry_initialized, _tracer, _meter, _logger

    if _telemetry_initialized:
        return True
    if not TELEMETRY_ENABLED:
        return False

    # Both settings are mandatory; warn about the first one that is missing.
    required_settings = (
        (APPINSIGHTS_PARAM, "APPINSIGHTS_PARAM not set. Telemetry disabled."),
        (CLOUD_ROLE_NAME, "OTEL_SERVICE_NAME not set. Telemetry disabled."),
    )
    for setting_value, warning_text in required_settings:
        if not setting_value:
            logging.warning(warning_text)
            return False

    try:
        # Imported lazily so the module still loads without these packages.
        from azure.monitor.opentelemetry import configure_azure_monitor
        from opentelemetry import trace, metrics

        os.environ.setdefault("OTEL_SERVICE_NAME", CLOUD_ROLE_NAME)
        configure_azure_monitor(
            connection_string=APPINSIGHTS_PARAM,
            logger_name="pazabench",
        )

        _tracer = trace.get_tracer("pazabench.telemetry")
        _meter = metrics.get_meter("pazabench.telemetry")

        event_logger = logging.getLogger("pazabench")
        _logger = event_logger
        event_logger.setLevel(logging.INFO)

        _telemetry_initialized = True
        logging.info(f"Telemetry initialized. Cloud Role Name: {CLOUD_ROLE_NAME}")
        return True
    except ImportError as import_error:
        logging.warning(f"Azure Monitor OpenTelemetry packages not installed: {import_error}")
        return False
    except Exception as init_error:
        # Telemetry is best-effort: report the failure and carry on without it.
        logging.warning(f"Failed to initialize telemetry: {init_error}")
        return False
def _ensure_initialized(func: Callable) -> Callable:
    """Decorator: run *func* only when telemetry is initialized.

    The wrapper attempts initialization on demand. If telemetry is still not
    initialized afterwards, it returns ``None`` without calling *func*.
    """
    @wraps(func)
    def guarded(*args, **kwargs):
        ready = _telemetry_initialized
        if not ready:
            _initialize_telemetry()
            # Re-read the flag: initialization may have just flipped it.
            ready = _telemetry_initialized
        return func(*args, **kwargs) if ready else None
    return guarded
# =============================================================================
# Custom Events - Track user interactions and behavior
# =============================================================================
@_ensure_initialized
def track_event(
    name: str,
    properties: dict[str, Any] | None = None,
    parent_event_id: str | None = None
) -> str | None:
    """Record a custom event and return its generated event id.

    Args:
        name: Event name; sent as the ``microsoft.custom_event.name`` log
            attribute.
        properties: Optional extra attributes. They are merged last, so a key
            named ``event_id``, ``session_id`` or ``timestamp`` overrides the
            generated value.
        parent_event_id: Optional id of a related earlier event; stored under
            ``parent_event_id`` when truthy.

    Returns:
        The generated event id, or ``None`` when the ``_ensure_initialized``
        guard skips the call because telemetry is not initialized.
    """
    # Function-scope import: the module header only imports ``datetime`` itself.
    from datetime import timezone

    event_id = _generate_event_id()
    session_id = get_session_id()

    # ``datetime.utcnow()`` is deprecated since Python 3.12. Take an aware UTC
    # "now" and drop tzinfo so the text keeps its ``<naive ISO-8601>Z`` shape.
    utc_now = datetime.now(timezone.utc).replace(tzinfo=None)

    all_properties: dict[str, Any] = {
        "event_id": event_id,
        "session_id": session_id,
        "timestamp": utc_now.isoformat() + "Z",
    }
    if parent_event_id:
        all_properties["parent_event_id"] = parent_event_id
    if properties:
        all_properties.update(properties)

    props_str = ", ".join(f"{k}={v}" for k, v in all_properties.items())
    _debug_log(f"EVENT: {name} | {props_str}")

    if not _logger:
        return event_id

    # ``logging`` raises KeyError when an ``extra`` key matches an existing
    # LogRecord attribute (``name``, ``module``, ``args``, ...) or is
    # ``message``/``asctime``. Prefix such keys instead of letting a telemetry
    # call crash the caller.
    reserved_keys = set(vars(logging.makeLogRecord({}))) | {"message", "asctime"}
    extra: dict[str, Any] = {"microsoft.custom_event.name": name}
    for key, value in all_properties.items():
        extra[f"prop_{key}" if key in reserved_keys else key] = value

    _logger.info(f"Event: {name}", extra=extra)
    return event_id
@_ensure_initialized
def track_page_view(
    page_name: str,
    properties: dict[str, Any] | None = None
) -> str | None:
    """Record a ``page_view`` event for *page_name*.

    Args:
        page_name: Stored under the ``page_name`` property.
        properties: Optional extra properties, merged over the default one.

    Returns:
        The value returned by ``track_event`` (the event id, or ``None``).
        Previously this was discarded, so ``track_page_load``, which returns
        this function's result, could never return an event id.
    """
    event_properties: dict[str, Any] = {"page_name": page_name}
    if properties:
        event_properties.update(properties)
    return track_event("page_view", event_properties)
@_ensure_initialized
def track_filter_change(
    filter_type: str,
    values: list[str] | None,
    tab: str | None = None
) -> None:
    """Record a ``filter_change`` event.

    Args:
        filter_type: Name of the filter that changed.
        values: Selected values; the total count is recorded, but only the
            first 10 values are joined into the ``values`` property.
        tab: Optional tab name, added as ``tab`` when truthy.
    """
    selected = values if values else []
    properties = dict(
        filter_type=filter_type,
        values_count=len(selected),
        values=",".join(selected[:10]),
    )
    if tab:
        properties["tab"] = tab
    track_event("filter_change", properties)
@_ensure_initialized
def track_language_interest(
    language: str,
    context: str,
    properties: dict[str, Any] | None = None
) -> None:
    """Record a ``language_interest`` event.

    Args:
        language: Stored under the ``language`` property.
        context: Stored under the ``context`` property.
        properties: Optional extra properties, merged over the defaults.
    """
    event_properties = dict(language=language, context=context)
    if properties:
        event_properties.update(properties)
    track_event("language_interest", event_properties)
@_ensure_initialized
def track_model_interest(
    model: str,
    context: str,
    properties: dict[str, Any] | None = None
) -> None:
    """Record a ``model_interest`` event.

    Args:
        model: Stored under the ``model`` property.
        context: Stored under the ``context`` property.
        properties: Optional extra properties, merged over the defaults.
    """
    event_properties = dict(model=model, context=context)
    if properties:
        event_properties.update(properties)
    track_event("model_interest", event_properties)
@_ensure_initialized
def track_visualization_view(
    chart_name: str,
    properties: dict[str, Any] | None = None,
    languages: list[str] | None = None,
    models: list[str] | None = None
) -> str | None:
    """Record a ``visualization_view`` event and return its event id.

    Args:
        chart_name: Stored under the ``chart_name`` property.
        properties: Optional extra properties, merged last.
        languages: Optional languages shown; the first 10 are joined into
            ``languages`` and the full count goes into ``languages_count``.
        models: Optional models shown; handled the same way as *languages*.

    Returns:
        Whatever ``track_event`` returns for the emitted event.
    """
    event_properties: dict[str, Any] = {"chart_name": chart_name}
    # Languages first, then models, so the property order stays stable.
    for label, items in (("languages", languages), ("models", models)):
        if not items:
            continue
        event_properties[label] = ",".join(items[:10])
        event_properties[f"{label}_count"] = str(len(items))
    if properties:
        event_properties.update(properties)
    return track_event("visualization_view", event_properties)
@_ensure_initialized
def track_submission_attempt(
    dataset_name: str,
    success: bool,
    error_message: str | None = None
) -> None:
    """Record a ``submission_attempt`` event.

    Args:
        dataset_name: Stored under the ``dataset_name`` property.
        success: Outcome flag, stored as its string form.
        error_message: Optional error text; truncated to 500 characters.
    """
    properties = dict(dataset_name=dataset_name, success=str(success))
    if error_message:
        truncated_message = error_message[:500]
        properties["error_message"] = truncated_message
    track_event("submission_attempt", properties)
@_ensure_initialized
def track_accordion_toggle(accordion_name: str, is_open: bool) -> None:
    """Record an ``accordion_toggle`` event with action ``open`` or ``close``."""
    if is_open:
        action = "open"
    else:
        action = "close"
    toggle_properties = {"accordion_name": accordion_name, "action": action}
    track_event("accordion_toggle", toggle_properties)
@_ensure_initialized
def track_metric_tab_view(metric: str) -> None:
    """Record a ``metric_tab_view`` event for the given metric name."""
    tab_properties = dict(metric=metric)
    track_event("metric_tab_view", tab_properties)
@_ensure_initialized
def track_view_mode_change(view_mode: str) -> None:
    """Record a ``view_mode_change`` event for the given view mode."""
    mode_properties = dict(view_mode=view_mode)
    track_event("view_mode_change", mode_properties)
@_ensure_initialized
def track_region_filter(regions: list[str] | None) -> None:
    """Record a change of the ``african_region`` filter on the ``asr_metrics`` tab."""
    track_filter_change(
        filter_type="african_region",
        values=regions,
        tab="asr_metrics",
    )
# =============================================================================
# Metrics - Track numeric measurements
# =============================================================================
def _get_counter(name: str, description: str = "") -> Any:
    """Return the cached counter called *name*, creating it on first use.

    Returns ``None`` when no meter is available.
    """
    if not _meter:
        return None
    try:
        return _counters[name]
    except KeyError:
        # First request for this name: create the instrument and cache it.
        _counters[name] = _meter.create_counter(
            name,
            description=description,
            unit="1",
        )
        return _counters[name]
def _get_histogram(name: str, description: str = "", unit: str = "ms") -> Any:
    """Return the cached histogram called *name*, creating it on first use.

    Returns ``None`` when no meter is available. *unit* is only used when the
    histogram is created.
    """
    if not _meter:
        return None
    try:
        return _histograms[name]
    except KeyError:
        # First request for this name: create the instrument and cache it.
        _histograms[name] = _meter.create_histogram(
            name,
            description=description,
            unit=unit,
        )
        return _histograms[name]
@_ensure_initialized
def track_metric(
    name: str,
    value: float,
    attributes: dict[str, str] | None = None,
    metric_type: str = "histogram"
) -> None:
    """Record a numeric measurement on a counter or a histogram.

    Args:
        name: Instrument name.
        value: Measurement; converted with ``int()`` before being added to a
            counter, recorded unchanged on a histogram.
        attributes: Optional attributes attached to the measurement.
        metric_type: ``"counter"`` selects a counter; any other value selects
            a histogram.
    """
    attrs = attributes or {}

    if metric_type == "counter":
        instrument = _get_counter(name, f"Counter for {name}")
        if instrument:
            instrument.add(int(value), attrs)
        return

    instrument = _get_histogram(name, f"Histogram for {name}")
    if instrument:
        instrument.record(value, attrs)
@_ensure_initialized
def increment_counter(name: str, attributes: dict[str, str] | None = None) -> None:
    """Add 1 to the counter called *name*, with optional attributes."""
    track_metric(name, 1, attributes, metric_type="counter")
    return None
# =============================================================================
# Error Tracking - Track exceptions and errors
# =============================================================================
@_ensure_initialized
def track_error(
    exception: Exception | None = None,
    properties: dict[str, Any] | None = None,
    context: str | None = None
) -> None:
    """Record an error as a dedicated span named ``error_<context>``.

    Args:
        exception: Optional exception; recorded on the span, with its type
            name and message (truncated to 500 characters) as attributes.
        properties: Optional extra attributes. The dict is copied, never
            mutated.
        context: Optional label for where the error happened; used in the
            span name (``unknown`` when omitted) and stored as an attribute.
    """
    exc_info = f"{type(exception).__name__}: {exception}" if exception else "No exception"
    _debug_log(f"ERROR: context={context} | {exc_info}")

    if not _tracer:
        return

    from opentelemetry.trace import SpanKind

    # Work on a copy: the keys added below must not leak into the caller's
    # dict (the previous ``properties or {}`` aliased it).
    error_properties: dict[str, Any] = dict(properties) if properties else {}
    if context:
        error_properties["context"] = context

    with _tracer.start_as_current_span(
        f"error_{context or 'unknown'}",
        kind=SpanKind.INTERNAL
    ) as span:
        if exception:
            span.record_exception(exception)
            error_properties["exception_type"] = type(exception).__name__
            error_properties["exception_message"] = str(exception)[:500]

        # Every attribute is stringified and stored under the ``error.`` prefix.
        for key, value in error_properties.items():
            span.set_attribute(f"error.{key}", str(value))
# =============================================================================
# Tracing - Track operations with spans
# =============================================================================
@contextmanager
def start_span(name: str, attributes: dict[str, str] | None = None):
    """Context manager yielding a span called *name*, or ``None`` without telemetry.

    Args:
        name: Span name.
        attributes: Optional attributes; each value is stringified before
            being set on the span.

    Yields:
        The started span, or ``None`` when there is no tracer or telemetry is
        not initialized.
    """
    tracing_available = bool(_tracer) and bool(_telemetry_initialized)
    if tracing_available:
        with _tracer.start_as_current_span(name) as span:
            for attr_key, attr_value in (attributes or {}).items():
                span.set_attribute(attr_key, str(attr_value))
            yield span
    else:
        yield None
# =============================================================================
# Utility Functions
# =============================================================================
def is_telemetry_enabled() -> bool:
    """Return the module's initialized flag (not the TELEMETRY_ENABLED setting)."""
    initialized = _telemetry_initialized
    return initialized
def get_telemetry_status() -> dict[str, Any]:
    """Return a snapshot of the telemetry configuration and state.

    Keys: ``enabled``, ``initialized``, ``connection_string_set`` (a boolean,
    so the connection string itself is not included) and ``cloud_role_name``.
    """
    status: dict[str, Any] = {}
    status["enabled"] = TELEMETRY_ENABLED
    status["initialized"] = _telemetry_initialized
    status["connection_string_set"] = bool(APPINSIGHTS_PARAM)
    status["cloud_role_name"] = CLOUD_ROLE_NAME
    return status
# =============================================================================
# Convenience wrapper for tracking in Gradio callbacks
# =============================================================================
def with_telemetry(event_name: str, **event_props):
    """Decorator factory: emit *event_name* before each call of the function.

    Args:
        event_name: Name passed to ``track_event``.
        **event_props: Properties passed along with every emitted event.

    Returns:
        A decorator; the wrapped function's return value is passed through.
    """
    def decorator(func: Callable) -> Callable:
        @wraps(func)
        def instrumented(*args, **kwargs):
            # The event is sent first, then the wrapped callable runs.
            track_event(event_name, event_props)
            result = func(*args, **kwargs)
            return result
        return instrumented
    return decorator
def init_session_from_request(request) -> str:
    """Set the session context from *request* and return the session id.

    Uses ``request.session_hash`` when truthy, otherwise ``request.username``.
    A falsy *request*, or one with neither value, passes ``None`` to
    ``set_session_context``.
    """
    if not request:
        return set_session_context(None)
    identifier = (
        getattr(request, 'session_hash', None)
        or getattr(request, 'username', None)
    )
    return set_session_context(identifier)
def track_page_load(page_name: str, request=None) -> str | None:
    """Initialize the session from *request* (if given) and record a page view.

    Returns:
        Whatever ``track_page_view`` returns for *page_name*.
    """
    if request:
        init_session_from_request(request)
    page_view_result = track_page_view(page_name)
    return page_view_result
# Try to initialize at import time when telemetry is enabled; functions
# wrapped with _ensure_initialized retry later if this attempt fails.
if TELEMETRY_ENABLED:
    _initialize_telemetry()