File size: 5,139 Bytes
5dc68a0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
"""
Lightweight observability and KPI tracking.

Provides:
- run IDs for each search execution
- structured JSON log events
- per-run counters/attributes
- in-memory snapshots for API health checks
"""

import json
import time
import uuid
from dataclasses import dataclass, field
from datetime import datetime
from threading import Lock
from typing import Any, Dict, List, Optional


@dataclass
class RunState:
    """State container for one tracked run.

    Instances are mutated in place while the run is active (counters,
    attributes, end time, status). ``to_summary`` produces a detached,
    serializable view so callers never hold references to the live dicts.
    """

    run_id: str
    component: str
    started_at: float  # epoch seconds (time.time())
    params: Dict[str, Any] = field(default_factory=dict)
    counters: Dict[str, float] = field(default_factory=dict)
    attributes: Dict[str, Any] = field(default_factory=dict)
    ended_at: Optional[float] = None  # None while the run is still in progress
    status: str = "running"

    def to_summary(self) -> Dict[str, Any]:
        """Serialize a stable summary payload for UI/API output.

        Timestamps are rendered as naive local-time ISO-8601 strings
        (``datetime.fromtimestamp`` without a tz). For a run that has not
        ended, ``ended_at`` is ``None`` and the duration is measured up to
        the current time. Durations are clamped at 0 (wall-clock jumps)
        and rounded to two decimals.

        Returns:
            A new dict. ``params``/``counters``/``attributes`` are shallow
            copies, so later mutation of the run (e.g. from another thread)
            does not change an already-returned summary. Nested mutable
            values inside those dicts are still shared.
        """
        started_at = float(self.started_at)
        if self.ended_at is None:
            # Still running: report elapsed time so far.
            end_ts = time.time()
            ended_at_iso = None
        else:
            end_ts = float(self.ended_at)
            ended_at_iso = datetime.fromtimestamp(end_ts).isoformat()

        duration = max(0.0, end_ts - started_at)

        return {
            "run_id": self.run_id,
            "component": self.component,
            "status": self.status,
            "started_at": datetime.fromtimestamp(started_at).isoformat(),
            "ended_at": ended_at_iso,
            "duration_seconds": round(duration, 2),
            # Copies: the live dicts keep changing while a run is active.
            "params": dict(self.params),
            "counters": dict(self.counters),
            "attributes": dict(self.attributes),
        }


class ObservabilityTracker:
    """Thread-safe in-memory tracker with structured event logging.

    Active runs are kept in ``_active_runs`` until ``end_run`` moves their
    summary into the bounded ``_history`` list. Shared state is only read
    or mutated while holding ``_lock``; JSON event lines are printed
    outside the lock.
    """

    def __init__(self, max_history: int = 200):
        """Create a tracker.

        Args:
            max_history: Maximum number of finished-run summaries retained;
                the oldest are discarded first. Values <= 0 retain none.
        """
        self.max_history = max_history
        self._active_runs: Dict[str, RunState] = {}
        self._history: List[Dict[str, Any]] = []
        self._lock = Lock()

    def start_run(self, component: str, params: Optional[Dict[str, Any]] = None) -> str:
        """Start a new run and return its run id.

        Args:
            component: Name of the component executing the run.
            params: Optional run parameters; copied so later changes to the
                caller's dict do not alter the recorded run.

        Returns:
            A 12-character hex run id (prefix of a random UUID4).
        """
        run_id = uuid.uuid4().hex[:12]
        state = RunState(
            run_id=run_id,
            component=component,
            started_at=time.time(),
            params=dict(params) if params else {},
        )
        with self._lock:
            self._active_runs[run_id] = state
        self.event(run_id, "run_started", component=component)
        return run_id

    def event(self, run_id: str, event: str, **fields: Any) -> None:
        """Emit a structured JSON log line for a run event.

        The line is printed to stdout prefixed with ``[OBS]``. Extra
        ``fields`` are merged after the standard keys, so a field named
        ``timestamp`` would override the generated one. Values that are not
        JSON-serializable are stringified via ``default=str``.
        """
        payload = {
            "timestamp": datetime.now().isoformat(),
            "run_id": run_id,
            "event": event,
            **fields,
        }
        try:
            print(f"[OBS] {json.dumps(payload, ensure_ascii=False, default=str)}")
        except Exception:
            # Best-effort logging: serialization (e.g. circular refs) or
            # printing (e.g. console encoding) must never break the caller.
            print(f"[OBS] run_id={run_id} event={event}")

    def incr(self, run_id: str, key: str, value: float = 1.0) -> None:
        """Increment a numeric counter for a run; no-op for unknown runs."""
        with self._lock:
            state = self._active_runs.get(run_id)
            if state is None:
                return
            state.counters[key] = float(state.counters.get(key, 0.0) + value)

    def set_attr(self, run_id: str, key: str, value: Any) -> None:
        """Set or overwrite an attribute on the active run; no-op for unknown runs."""
        with self._lock:
            state = self._active_runs.get(run_id)
            if state is None:
                return
            state.attributes[key] = value

    def end_run(self, run_id: str, status: str = "ok", **final_attrs: Any) -> Dict[str, Any]:
        """Finish an active run and return its summary.

        Args:
            run_id: Id returned by ``start_run``.
            status: Final status recorded on the run.
            **final_attrs: Attributes merged into the run before summarizing.

        Returns:
            The run summary (also appended to history), or ``{}`` if the run
            id is unknown or already finished.
        """
        with self._lock:
            state = self._active_runs.pop(run_id, None)
            if state is None:
                return {}

            state.ended_at = time.time()
            state.status = status
            if final_attrs:
                state.attributes.update(final_attrs)
            summary = state.to_summary()
            self._history.append(summary)
            self._trim_history()

        self.event(run_id, "run_finished", status=status, duration_seconds=summary.get("duration_seconds", 0))
        return summary

    def _trim_history(self) -> None:
        """Drop the oldest summaries beyond ``max_history``; caller must hold ``_lock``."""
        # One slice deletion instead of repeated O(n) pop(0); clamping at 0
        # also avoids popping from an empty list when max_history < 0.
        excess = len(self._history) - max(0, self.max_history)
        if excess > 0:
            del self._history[:excess]

    def snapshot(self, recent_limit: int = 10) -> Dict[str, Any]:
        """Return a compact snapshot for health endpoints.

        Args:
            recent_limit: How many of the most recent finished runs to
                include (default 10). Values <= 0 include none.

        Returns:
            Dict with ``active_runs`` (count), ``recent_runs`` (oldest to
            newest) and ``currently_running`` (summaries of active runs).
        """
        with self._lock:
            active_summaries = [state.to_summary() for state in self._active_runs.values()]
            # Guard needed: history[-0:] would return the whole list.
            recent = self._history[-recent_limit:] if recent_limit > 0 else []

        return {
            "active_runs": len(active_summaries),
            "recent_runs": recent,
            "currently_running": active_summaries,
        }


# Module-level shared tracker instance (keeps up to 200 finished-run summaries).
observability_tracker = ObservabilityTracker(max_history=200)