File size: 2,179 Bytes
e2a55ab
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
import json
import time
import hashlib
from dataclasses import dataclass, asdict
from typing import Any, Dict, Optional, List

def utc_ms() -> int:
    return int(time.time() * 1000)

def stable_hash(obj: Any) -> str:
    # Deterministic hash for replay/debug. Avoid huge payloads.
    s = json.dumps(obj, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(s.encode("utf-8")).hexdigest()[:16]

@dataclass
class Event:
    run_id: str
    t_sim: int
    t_wall_ms: int
    event_type: str

    agent_id: Optional[str] = None
    role: Optional[str] = None
    model_key: Optional[str] = None

    payload: Dict[str, Any] = None
    latency_ms: Optional[int] = None

    tokens: Dict[str, Any] = None
    cost: Dict[str, Any] = None

    state_hash: Optional[str] = None

class EventStore:
    """
    In-memory event store (append-only). Exports to JSONL.
    For long horizons, this can be sharded/streamed to disk.
    """
    def __init__(self, run_id: str):
        self.run_id = run_id
        self.events: List[Event] = []

    def emit(
        self,
        t_sim: int,
        event_type: str,
        agent_id: Optional[str] = None,
        role: Optional[str] = None,
        model_key: Optional[str] = None,
        payload: Optional[Dict[str, Any]] = None,
        latency_ms: Optional[int] = None,
        tokens: Optional[Dict[str, Any]] = None,
        cost: Optional[Dict[str, Any]] = None,
        state_obj_for_hash: Optional[Any] = None,
    ):
        e = Event(
            run_id=self.run_id,
            t_sim=t_sim,
            t_wall_ms=utc_ms(),
            event_type=event_type,
            agent_id=agent_id,
            role=role,
            model_key=model_key,
            payload=payload or {},
            latency_ms=latency_ms,
            tokens=tokens or {},
            cost=cost or {},
            state_hash=stable_hash(state_obj_for_hash) if state_obj_for_hash is not None else None,
        )
        self.events.append(e)

    def to_jsonl(self) -> str:
        lines = []
        for e in self.events:
            d = asdict(e)
            lines.append(json.dumps(d, ensure_ascii=False))
        return "\n".join(lines)