File size: 1,969 Bytes
23cdeed
66ad25b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# -*- coding: utf-8 -*-
"""
pluto/tracer.py — Logging & trace system for pipeline execution.

Maintains an in-memory log of all events and can export as JSON.
"""

from __future__ import annotations

import json
import time
from dataclasses import dataclass, field
from typing import Any


@dataclass
class TraceEvent:
    timestamp: float
    event: str
    payload: dict[str, Any]


class Tracer:
    """Collects structured trace events throughout a pipeline run."""

    def __init__(self) -> None:
        self.events: list[TraceEvent] = []
        self.search_queries: list[str] = []
        self.docs_opened: set[str] = set()
        self.modes_used: dict[str, int] = {}
        self.models_used: set[str] = set()
        self.chunks_processed: int = 0
        self._start = time.perf_counter()

    def log(self, event: str, payload: dict[str, Any] | None = None) -> None:
        payload = payload or {}
        self.events.append(TraceEvent(
            timestamp=time.perf_counter() - self._start,
            event=event,
            payload=payload,
        ))

        # Auto-track mode / model usage from dispatch events
        if event == "dispatch":
            mode = payload.get("mode", "")
            model = payload.get("model_id", "")
            self.modes_used[mode] = self.modes_used.get(mode, 0) + 1
            self.models_used.add(model)

    def record_search(self, query: str) -> None:
        self.search_queries.append(query)

    def record_doc_opened(self, doc_id: str) -> None:
        self.docs_opened.add(doc_id)

    def record_chunk_processed(self) -> None:
        self.chunks_processed += 1

    def to_json(self) -> list[dict]:
        return [
            {
                "t": round(e.timestamp, 4),
                "event": e.event,
                "payload": e.payload,
            }
            for e in self.events
        ]

    def elapsed(self) -> float:
        return round(time.perf_counter() - self._start, 3)