File size: 2,850 Bytes
e9462cd
 
 
fa70564
e9462cd
 
 
 
 
fa70564
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e9462cd
 
fa70564
 
 
 
 
 
e9462cd
fa70564
 
e9462cd
fa70564
 
 
 
 
 
 
 
 
e9462cd
fa70564
 
 
 
 
 
 
 
 
 
 
 
 
 
e9462cd
 
fa70564
e9462cd
fa70564
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
from __future__ import annotations

import json
import os
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional


class LoggingStore:
    """Append-only JSON Lines store for session records and events.

    Two files live under ``root``: ``sessions.jsonl`` receives the
    ``session_start`` / ``session_finalize`` records and ``events.jsonl``
    receives one record per logged event. Each line is one JSON object.
    """

    def __init__(self, root: str = "logs"):
        """Create the store rooted at *root*, creating the directory if needed."""
        self.root = root
        os.makedirs(self.root, exist_ok=True)
        self.sessions_path = os.path.join(self.root, "sessions.jsonl")
        self.events_path = os.path.join(self.root, "events.jsonl")

    def _append(self, path: str, payload: Dict[str, Any]) -> None:
        """Serialize *payload* as one JSON line and append it to *path*.

        Raises whatever ``json.dumps`` raises for a non-serializable payload
        (e.g. ``TypeError``); in that case the file is not opened at all.
        """
        # Serialize before opening so a failed dump never creates or touches
        # the target file.
        line = json.dumps(payload, ensure_ascii=False) + "\n"
        with open(path, "a", encoding="utf-8") as f:
            f.write(line)

    def _now(self) -> str:
        """Return the current UTC time as an ISO-8601 string."""
        return datetime.now(timezone.utc).isoformat()

    def start_session(
        self,
        session_id: str,
        user_id: Optional[str] = None,
        condition: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, Any]:
        """Append a ``session_start`` record to the sessions file and return it.

        A falsy *metadata* (``None`` or an empty dict) is stored as ``{}``.
        """
        record = {
            "session_id": session_id,
            "user_id": user_id,
            "condition": condition,
            "metadata": metadata or {},
            "started_at": self._now(),
            "type": "session_start",
        }
        self._append(self.sessions_path, record)
        return record

    def log_event(
        self,
        session_id: str,
        event_type: str,
        payload: Optional[Dict[str, Any]] = None,
        timestamp: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Append an event record to the events file and return it.

        A falsy *timestamp* is replaced by the current UTC time; a falsy
        *payload* is stored as ``{}``.
        """
        record = {
            "session_id": session_id,
            "event_type": event_type,
            "timestamp": timestamp or self._now(),
            "payload": payload or {},
        }
        self._append(self.events_path, record)
        return record

    def finalize_session(
        self,
        session_id: str,
        summary: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, Any]:
        """Append a ``session_finalize`` record to the sessions file and return it.

        A falsy *summary* is stored as ``{}``.
        """
        record = {
            "session_id": session_id,
            "summary": summary or {},
            "finalized_at": self._now(),
            "type": "session_finalize",
        }
        self._append(self.sessions_path, record)
        return record

    def _read_jsonl(self, path: str) -> List[Dict[str, Any]]:
        """Return every JSON object stored in *path*, one per line.

        Reading is best-effort: a missing file yields ``[]``, and blank
        lines, lines that fail to parse, and lines whose JSON value is not
        an object are skipped.
        """
        rows: List[Dict[str, Any]] = []
        try:
            f = open(path, "r", encoding="utf-8")
        except FileNotFoundError:
            # Nothing has been logged to this file yet.
            return rows
        with f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                try:
                    row = json.loads(line)
                except (ValueError, RecursionError):
                    # json.JSONDecodeError is a ValueError; RecursionError
                    # covers pathologically nested input. Skip the bad line
                    # rather than failing the whole read.
                    continue
                # Callers use dict methods on each row, so a valid-JSON
                # scalar or array line must not be returned.
                if isinstance(row, dict):
                    rows.append(row)
        return rows

    def list_sessions(self) -> List[Dict[str, Any]]:
        """Return all records from the sessions file, in file order."""
        return self._read_jsonl(self.sessions_path)

    def get_session(self, session_id: str) -> Dict[str, Any]:
        """Collect the session records and events whose id is *session_id*.

        Returns a dict with keys ``session_id``, ``records`` (matching rows
        from the sessions file) and ``events`` (matching rows from the events
        file). Both lists are empty when nothing matches.
        """
        sessions = [
            r for r in self._read_jsonl(self.sessions_path)
            if r.get("session_id") == session_id
        ]
        events = [
            r for r in self._read_jsonl(self.events_path)
            if r.get("session_id") == session_id
        ]
        return {"session_id": session_id, "records": sessions, "events": events}