"""
Lightweight observability and KPI tracking.
Provides:
- run IDs for each search execution
- structured JSON log events
- per-run counters/attributes
- in-memory snapshots for API health checks
"""
import json
import time
import uuid
from dataclasses import dataclass, field
from datetime import datetime
from threading import Lock
from typing import Any, Dict, List, Optional
@dataclass
class RunState:
    """State container for one tracked run.

    ``started_at`` / ``ended_at`` are float epoch seconds (the tracker fills
    them from ``time.time()``).
    """

    run_id: str
    component: str
    started_at: float
    params: Dict[str, Any] = field(default_factory=dict)
    counters: Dict[str, float] = field(default_factory=dict)
    attributes: Dict[str, Any] = field(default_factory=dict)
    ended_at: Optional[float] = None  # None while the run is still in progress
    status: str = "running"

    def to_summary(self) -> Dict[str, Any]:
        """Serialize a stable summary payload for UI/API output.

        Timestamps are rendered as naive local-time ISO-8601 strings via
        ``datetime.fromtimestamp``. For an unfinished run ``ended_at`` is
        ``None`` and ``duration_seconds`` is measured up to "now".

        ``params``, ``counters`` and ``attributes`` are returned as shallow
        copies. Later updates to this run therefore never show up in, or
        mutate underneath, a summary that was already handed out (for example
        while a caller is JSON-serializing it).

        Returns:
            Dict with keys ``run_id``, ``component``, ``status``,
            ``started_at``, ``ended_at``, ``duration_seconds`` (rounded to
            2 decimals), ``params``, ``counters`` and ``attributes``.
        """
        started_at = float(self.started_at)
        if self.ended_at is not None:
            end_ts = float(self.ended_at)
            ended_at_iso: Optional[str] = datetime.fromtimestamp(end_ts).isoformat()
        else:
            end_ts = time.time()
            ended_at_iso = None
        # Clamp at zero in case the wall clock stepped backwards during the run.
        duration = max(0.0, end_ts - started_at)
        return {
            "run_id": self.run_id,
            "component": self.component,
            "status": self.status,
            "started_at": datetime.fromtimestamp(started_at).isoformat(),
            "ended_at": ended_at_iso,
            "duration_seconds": round(duration, 2),
            # Copies decouple the summary from the still-mutable run state.
            "params": dict(self.params),
            "counters": dict(self.counters),
            "attributes": dict(self.attributes),
        }
class ObservabilityTracker:
    """Thread-safe in-memory tracker with structured event logging.

    Active runs live in a dict keyed by run id. Summaries of finished runs are
    appended to a history list (oldest first) that is capped at
    ``max_history`` entries. All reads and writes of that shared state happen
    under one ``threading.Lock``. Event lines are printed outside the lock.
    """

    def __init__(self, max_history: int = 200):
        self.max_history = max_history
        self._active_runs: Dict[str, RunState] = {}
        self._history: List[Dict[str, Any]] = []
        self._lock = Lock()

    def start_run(self, component: str, params: Optional[Dict[str, Any]] = None) -> str:
        """Start a new run and return its run id.

        Args:
            component: Name of the component performing the run.
            params: Optional run parameters. They are copied, so later changes
                made by the caller do not leak into the tracked run.

        Returns:
            A 12-character hex id (prefix of a random UUID4).
        """
        run_id = uuid.uuid4().hex[:12]
        state = RunState(
            run_id=run_id,
            component=component,
            started_at=time.time(),
            params=dict(params) if params else {},
        )
        with self._lock:
            self._active_runs[run_id] = state
        self.event(run_id, "run_started", component=component)
        return run_id

    def event(self, run_id: str, event: str, **fields: Any) -> None:
        """Emit a structured JSON log line for a run event.

        The line is printed to stdout prefixed with ``[OBS]``. Extra ``fields``
        are merged after the standard keys, so a ``timestamp`` field replaces
        the generated one. Values JSON cannot encode are stringified via
        ``default=str``.
        """
        payload = {
            "timestamp": datetime.now().isoformat(),
            "run_id": run_id,
            "event": event,
            **fields,
        }
        try:
            print(f"[OBS] {json.dumps(payload, ensure_ascii=False, default=str)}")
        except Exception:
            # Deliberate best-effort: logging must never break the tracked
            # work. Possible causes include non-string dict keys, circular
            # references, or a stdout that cannot encode the text. Fall back
            # to a minimal plain line.
            print(f"[OBS] run_id={run_id} event={event}")

    def incr(self, run_id: str, key: str, value: float = 1.0) -> None:
        """Add ``value`` to counter ``key`` of an active run.

        This is a silent no-op when ``run_id`` is unknown or already finished.
        """
        with self._lock:
            state = self._active_runs.get(run_id)
            if state is None:
                return
            state.counters[key] = float(state.counters.get(key, 0.0) + value)

    def set_attr(self, run_id: str, key: str, value: Any) -> None:
        """Set or overwrite an attribute on the active run.

        This is a silent no-op when ``run_id`` is unknown or already finished.
        """
        with self._lock:
            state = self._active_runs.get(run_id)
            if state is None:
                return
            state.attributes[key] = value

    def end_run(self, run_id: str, status: str = "ok", **final_attrs: Any) -> Dict[str, Any]:
        """Finish an active run and return its summary.

        Args:
            run_id: Id returned by :meth:`start_run`.
            status: Final status recorded on the run.
            **final_attrs: Extra attributes merged into the run's attributes.

        Returns:
            The run summary, which is also appended to the bounded history.
            Returns ``{}`` if the run is unknown or was already ended.
        """
        with self._lock:
            state = self._active_runs.pop(run_id, None)
            if state is None:
                return {}
            state.ended_at = time.time()
            state.status = status
            if final_attrs:
                state.attributes.update(final_attrs)
            summary = state.to_summary()
            self._history.append(summary)
            # Drop the oldest entries with one slice deletion instead of
            # repeated O(n) pop(0) calls. Clamping at 0 makes a non-positive
            # max_history keep no history rather than raising IndexError.
            excess = len(self._history) - max(0, self.max_history)
            if excess > 0:
                del self._history[:excess]
        self.event(run_id, "run_finished", status=status, duration_seconds=summary.get("duration_seconds", 0))
        return summary

    def snapshot(self, recent_limit: int = 10) -> Dict[str, Any]:
        """Return a compact snapshot for health endpoints.

        Args:
            recent_limit: Maximum number of most recent finished-run summaries
                to include (default 10). Values <= 0 yield an empty list.

        Returns:
            Dict with ``active_runs`` (count of running runs), ``recent_runs``
            (newest finished summaries, oldest first) and
            ``currently_running`` (summaries of the active runs).
        """
        with self._lock:
            active_summaries = [state.to_summary() for state in self._active_runs.values()]
            # Guard explicitly: a [-0:] slice would return the whole history.
            recent = self._history[-recent_limit:] if recent_limit > 0 else []
        return {
            "active_runs": len(active_summaries),
            "recent_runs": recent,
            "currently_running": active_summaries,
        }
# Module-level shared tracker instance; importers use this rather than
# constructing their own so all runs land in one history.
observability_tracker: ObservabilityTracker = ObservabilityTracker()
|