| from __future__ import annotations |
|
|
| import json |
| import os |
| from datetime import datetime, timezone |
| from typing import Any, Dict, List, Optional |
|
|
|
|
class LoggingStore:
    """Append-only JSON Lines store for session records and events.

    Two files are kept under ``root``:

    * ``sessions.jsonl`` -- ``session_start`` and ``session_finalize`` records.
    * ``events.jsonl`` -- one record per logged event.

    Each record is written as a single JSON object on its own line.
    """

    def __init__(self, root: str = "logs"):
        """Create the store, making the ``root`` directory if it is missing.

        Args:
            root: Directory that holds the two ``.jsonl`` files.
        """
        self.root = root
        os.makedirs(self.root, exist_ok=True)
        self.sessions_path = os.path.join(self.root, "sessions.jsonl")
        self.events_path = os.path.join(self.root, "events.jsonl")

    def _append(self, path: str, payload: Dict[str, Any]) -> None:
        """Serialize ``payload`` as one JSON line and append it to ``path``.

        Raises:
            TypeError: If ``payload`` contains a value ``json.dumps`` cannot
                serialize. Serialization happens before the file is opened,
                so nothing is written in that case.
        """
        line = json.dumps(payload, ensure_ascii=False) + "\n"
        with open(path, "a", encoding="utf-8") as f:
            f.write(line)

    def _now(self) -> str:
        """Return the current UTC time as an ISO 8601 string."""
        return datetime.now(timezone.utc).isoformat()

    def start_session(
        self,
        session_id: str,
        user_id: Optional[str] = None,
        condition: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, Any]:
        """Append a ``session_start`` record to the sessions file.

        Args:
            session_id: Identifier stored on the record.
            user_id: Optional user identifier, stored as given.
            condition: Optional condition label, stored as given.
            metadata: Optional extra data; ``None`` (or any falsy value) is
                stored as an empty dict.

        Returns:
            The record that was written.
        """
        record = {
            "session_id": session_id,
            "user_id": user_id,
            "condition": condition,
            "metadata": metadata or {},
            "started_at": self._now(),
            "type": "session_start",
        }
        self._append(self.sessions_path, record)
        return record

    def log_event(
        self,
        session_id: str,
        event_type: str,
        payload: Optional[Dict[str, Any]] = None,
        timestamp: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Append an event record to the events file.

        Args:
            session_id: Identifier of the session the event belongs to.
            event_type: Label stored under ``"event_type"``.
            payload: Optional event data; falsy values are stored as ``{}``.
            timestamp: Optional caller-supplied timestamp string; when falsy
                the current UTC time is used instead.

        Returns:
            The record that was written.
        """
        record = {
            "session_id": session_id,
            "event_type": event_type,
            "timestamp": timestamp or self._now(),
            "payload": payload or {},
        }
        self._append(self.events_path, record)
        return record

    def finalize_session(
        self,
        session_id: str,
        summary: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, Any]:
        """Append a ``session_finalize`` record to the sessions file.

        Args:
            session_id: Identifier stored on the record.
            summary: Optional summary data; falsy values are stored as ``{}``.

        Returns:
            The record that was written.
        """
        record = {
            "session_id": session_id,
            "summary": summary or {},
            "finalized_at": self._now(),
            "type": "session_finalize",
        }
        self._append(self.sessions_path, record)
        return record

    def _read_jsonl(self, path: str) -> List[Dict[str, Any]]:
        """Read every JSON-object line from ``path``.

        Reading is best-effort: blank lines, lines that fail to parse, and
        lines whose JSON value is not an object are skipped rather than
        raising.

        Returns:
            The parsed dicts in file order, or ``[]`` if the file is missing.
        """
        rows: List[Dict[str, Any]] = []
        try:
            # Open directly instead of checking os.path.exists first, so a
            # file removed between the check and the open cannot raise.
            f = open(path, "r", encoding="utf-8")
        except FileNotFoundError:
            return rows
        with f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                try:
                    row = json.loads(line)
                except (ValueError, RecursionError):
                    # ValueError covers json.JSONDecodeError; RecursionError
                    # covers pathologically nested input. Skip the bad line.
                    continue
                # Valid JSON that is not an object (e.g. ``42`` or ``[1]``)
                # would break callers that use ``row.get(...)``.
                if isinstance(row, dict):
                    rows.append(row)
        return rows

    def list_sessions(self) -> List[Dict[str, Any]]:
        """Return every record in the sessions file, in file order."""
        return self._read_jsonl(self.sessions_path)

    def get_session(self, session_id: str) -> Dict[str, Any]:
        """Collect all session records and events for ``session_id``.

        Returns:
            A dict with keys ``"session_id"``, ``"records"`` (matching rows
            from the sessions file) and ``"events"`` (matching rows from the
            events file). Both lists are empty when nothing matches.
        """
        sessions = [
            r for r in self._read_jsonl(self.sessions_path)
            if r.get("session_id") == session_id
        ]
        events = [
            r for r in self._read_jsonl(self.events_path)
            if r.get("session_id") == session_id
        ]
        return {"session_id": session_id, "records": sessions, "events": events}
|
|