ayushKishor's picture
Add Pluto memory layer and pipeline fixes
23cdeed
# -*- coding: utf-8 -*-
"""
pluto/tracer.py — Logging & trace system for pipeline execution.
Maintains an in-memory log of all events and can export as JSON.
"""
from __future__ import annotations
import json
import time
from dataclasses import dataclass, field
from typing import Any
@dataclass
class TraceEvent:
timestamp: float
event: str
payload: dict[str, Any]
class Tracer:
"""Collects structured trace events throughout a pipeline run."""
def __init__(self) -> None:
self.events: list[TraceEvent] = []
self.search_queries: list[str] = []
self.docs_opened: set[str] = set()
self.modes_used: dict[str, int] = {}
self.models_used: set[str] = set()
self.chunks_processed: int = 0
self._start = time.perf_counter()
def log(self, event: str, payload: dict[str, Any] | None = None) -> None:
payload = payload or {}
self.events.append(TraceEvent(
timestamp=time.perf_counter() - self._start,
event=event,
payload=payload,
))
# Auto-track mode / model usage from dispatch events
if event == "dispatch":
mode = payload.get("mode", "")
model = payload.get("model_id", "")
self.modes_used[mode] = self.modes_used.get(mode, 0) + 1
self.models_used.add(model)
def record_search(self, query: str) -> None:
self.search_queries.append(query)
def record_doc_opened(self, doc_id: str) -> None:
self.docs_opened.add(doc_id)
def record_chunk_processed(self) -> None:
self.chunks_processed += 1
def to_json(self) -> list[dict]:
return [
{
"t": round(e.timestamp, 4),
"event": e.event,
"payload": e.payload,
}
for e in self.events
]
def elapsed(self) -> float:
return round(time.perf_counter() - self._start, 3)