| |
| """ |
| pluto/tracer.py — Logging & trace system for pipeline execution. |
| |
| Maintains an in-memory log of all events and can export as JSON. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import json |
| import time |
| from dataclasses import dataclass, field |
| from typing import Any |
|
|
|
|
| @dataclass |
| class TraceEvent: |
| timestamp: float |
| event: str |
| payload: dict[str, Any] |
|
|
|
|
| class Tracer: |
| """Collects structured trace events throughout a pipeline run.""" |
|
|
| def __init__(self) -> None: |
| self.events: list[TraceEvent] = [] |
| self.search_queries: list[str] = [] |
| self.docs_opened: set[str] = set() |
| self.modes_used: dict[str, int] = {} |
| self.models_used: set[str] = set() |
| self.chunks_processed: int = 0 |
| self._start = time.perf_counter() |
|
|
| def log(self, event: str, payload: dict[str, Any] | None = None) -> None: |
| payload = payload or {} |
| self.events.append(TraceEvent( |
| timestamp=time.perf_counter() - self._start, |
| event=event, |
| payload=payload, |
| )) |
|
|
| |
| if event == "dispatch": |
| mode = payload.get("mode", "") |
| model = payload.get("model_id", "") |
| self.modes_used[mode] = self.modes_used.get(mode, 0) + 1 |
| self.models_used.add(model) |
|
|
| def record_search(self, query: str) -> None: |
| self.search_queries.append(query) |
|
|
| def record_doc_opened(self, doc_id: str) -> None: |
| self.docs_opened.add(doc_id) |
|
|
| def record_chunk_processed(self) -> None: |
| self.chunks_processed += 1 |
|
|
| def to_json(self) -> list[dict]: |
| return [ |
| { |
| "t": round(e.timestamp, 4), |
| "event": e.event, |
| "payload": e.payload, |
| } |
| for e in self.events |
| ] |
|
|
| def elapsed(self) -> float: |
| return round(time.perf_counter() - self._start, 3) |
|
|