from __future__ import annotations

import json
from pathlib import Path
from typing import Any, Dict, List, Optional

import aiosqlite

from .models import RunRecord, RunSummary

# DDL executed, in order, by ``SQLiteRunStore.init``. Every statement uses
# ``IF NOT EXISTS`` so running it against an existing database is harmless.
_SCHEMA_STATEMENTS = (
    """
    CREATE TABLE IF NOT EXISTS runs (
        run_id TEXT PRIMARY KEY,
        mode TEXT NOT NULL,
        status TEXT NOT NULL,
        created_at TEXT NOT NULL,
        ended_at TEXT NOT NULL,
        title TEXT,
        input_summary TEXT,
        config_json TEXT NOT NULL,
        sealed_at TEXT NOT NULL
    );
    """,
    "CREATE INDEX IF NOT EXISTS runs_mode_created_at ON runs(mode, created_at DESC);",
    "CREATE INDEX IF NOT EXISTS runs_created_at ON runs(created_at DESC);",
    """
    CREATE TABLE IF NOT EXISTS run_messages (
        run_id TEXT NOT NULL,
        message_index INTEGER NOT NULL,
        role TEXT NOT NULL,
        persona_label TEXT,
        content TEXT NOT NULL,
        timestamp TEXT,
        PRIMARY KEY (run_id, message_index),
        FOREIGN KEY (run_id) REFERENCES runs(run_id) ON DELETE CASCADE
    );
    """,
    """
    CREATE TABLE IF NOT EXISTS run_analyses (
        run_id TEXT NOT NULL,
        analysis_key TEXT NOT NULL,
        schema_version TEXT,
        prompt_version TEXT,
        result_json TEXT NOT NULL,
        PRIMARY KEY (run_id, analysis_key),
        FOREIGN KEY (run_id) REFERENCES runs(run_id) ON DELETE CASCADE
    );
    """,
    """
    CREATE TABLE IF NOT EXISTS run_persona_snapshots (
        run_id TEXT NOT NULL,
        role TEXT NOT NULL,
        persona_id TEXT,
        persona_version_id TEXT,
        snapshot_json TEXT NOT NULL,
        PRIMARY KEY (run_id, role),
        FOREIGN KEY (run_id) REFERENCES runs(run_id) ON DELETE CASCADE
    );
    """,
)


class SQLiteRunStore:
    """Persist and query sealed run records in a SQLite database file.

    A run is stored as one row in ``runs`` plus child rows in
    ``run_messages``, ``run_analyses`` and ``run_persona_snapshots``.
    Every public method opens its own connection and closes it before
    returning.
    """

    def __init__(self, db_path: str) -> None:
        """Remember where the database lives; no I/O happens here."""
        self.db_path = str(db_path)

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _connect_target(self) -> str:
        """Return the path string handed to ``aiosqlite.connect``.

        A leading ``~`` is expanded so the connection opens the same file
        whose parent directory ``_ensure_parent_dir`` creates. Any other
        value is passed through untouched.
        """
        raw = self.db_path
        if raw.startswith("~"):
            return str(Path(raw).expanduser())
        return raw

    def _ensure_parent_dir(self) -> None:
        """Create the directory that will contain the database file."""
        parent = Path(self.db_path).expanduser().resolve().parent
        parent.mkdir(parents=True, exist_ok=True)

    def _connect(self):
        """Return a new, not-yet-entered aiosqlite connection object."""
        return aiosqlite.connect(self._connect_target())

    @staticmethod
    def _load_json(raw: Any) -> Any:
        """Decode a stored JSON column, falling back to ``{}``.

        Empty/``NULL`` values and values that cannot be decoded both
        yield an empty dict.
        """
        try:
            return json.loads(raw or "{}")
        except (TypeError, ValueError):
            # json.JSONDecodeError is a ValueError subclass.
            return {}

    @staticmethod
    def _message_rows(record: RunRecord) -> List[tuple]:
        """Build ``run_messages`` parameter tuples for ``record``.

        Messages whose ``"index"`` is not an ``int`` are skipped.
        """
        return [
            (
                record.run_id,
                int(msg.get("index")),
                str(msg.get("role") or ""),
                msg.get("persona"),
                str(msg.get("content") or ""),
                msg.get("timestamp"),
            )
            for msg in (record.messages or ())
            if isinstance(msg.get("index"), int)
        ]

    @staticmethod
    def _analysis_rows(record: RunRecord) -> List[tuple]:
        """Build ``run_analyses`` parameter tuples for ``record``."""
        if not record.analyses:
            return []
        return [
            (
                record.run_id,
                key,
                (val or {}).get("schema_version"),
                (val or {}).get("analysis_prompt_version"),
                json.dumps(val or {}, ensure_ascii=False),
            )
            for key, val in record.analyses.items()
        ]

    @staticmethod
    def _snapshot_rows(record: RunRecord) -> List[tuple]:
        """Build ``run_persona_snapshots`` parameter tuples for ``record``."""
        if not record.persona_snapshots:
            return []
        return [
            (
                record.run_id,
                role,
                (snap or {}).get("persona_id"),
                (snap or {}).get("persona_version_id"),
                json.dumps((snap or {}).get("snapshot") or {}, ensure_ascii=False),
            )
            for role, snap in record.persona_snapshots.items()
        ]

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    async def init(self) -> None:
        """Create the database directory, tables and indexes if missing."""
        self._ensure_parent_dir()
        async with self._connect() as db:
            await db.execute("PRAGMA foreign_keys=ON;")
            for statement in _SCHEMA_STATEMENTS:
                await db.execute(statement)
            await db.commit()

    async def save_sealed_run(self, record: RunRecord) -> None:
        """Insert ``record`` and all of its child rows in one transaction.

        Either every row is committed or none is: on any error the
        transaction is rolled back and the original exception is
        re-raised (for example when ``run_id`` already exists).
        """
        self._ensure_parent_dir()

        # Serialise everything up front so a bad payload fails before the
        # database is touched.
        run_row = (
            record.run_id,
            record.mode,
            record.status,
            record.created_at,
            record.ended_at,
            record.title,
            record.input_summary,
            json.dumps(record.config, ensure_ascii=False),
            record.sealed_at,
        )
        message_rows = self._message_rows(record)
        analysis_rows = self._analysis_rows(record)
        snapshot_rows = self._snapshot_rows(record)

        async with self._connect() as db:
            await db.execute("PRAGMA foreign_keys=ON;")
            await db.execute("BEGIN;")
            try:
                await db.execute(
                    """
                    INSERT INTO runs (
                        run_id, mode, status, created_at, ended_at,
                        title, input_summary, config_json, sealed_at
                    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?);
                    """,
                    run_row,
                )
                if message_rows:
                    await db.executemany(
                        """
                        INSERT INTO run_messages (
                            run_id, message_index, role, persona_label,
                            content, timestamp
                        ) VALUES (?, ?, ?, ?, ?, ?);
                        """,
                        message_rows,
                    )
                if analysis_rows:
                    await db.executemany(
                        """
                        INSERT INTO run_analyses (
                            run_id, analysis_key, schema_version,
                            prompt_version, result_json
                        ) VALUES (?, ?, ?, ?, ?);
                        """,
                        analysis_rows,
                    )
                if snapshot_rows:
                    await db.executemany(
                        """
                        INSERT INTO run_persona_snapshots (
                            run_id, role, persona_id, persona_version_id,
                            snapshot_json
                        ) VALUES (?, ?, ?, ?, ?);
                        """,
                        snapshot_rows,
                    )
                await db.commit()
            except Exception:
                # rollback() is a no-op when no transaction is open, so it
                # cannot replace the real error with "no transaction is
                # active" the way a raw ROLLBACK statement can.
                await db.rollback()
                raise

    async def list_runs(
        self,
        *,
        mode: Optional[str] = None,
        limit: int = 50,
        offset: int = 0,
    ) -> List[RunSummary]:
        """Return run summaries ordered by ``created_at``, newest first.

        Args:
            mode: When truthy, only runs with this mode are returned.
            limit: Page size; clamped to the range 1..200.
            offset: Number of rows to skip; negative values become 0.
        """
        limit = max(1, min(int(limit), 200))
        offset = max(0, int(offset))

        # Only a constant fragment is interpolated; values stay bound.
        if mode:
            where_clause = "WHERE mode = ?"
            params: tuple = (mode, limit, offset)
        else:
            where_clause = ""
            params = (limit, offset)

        query = f"""
            SELECT run_id, mode, status, created_at, ended_at, title, input_summary
            FROM runs
            {where_clause}
            ORDER BY created_at DESC
            LIMIT ? OFFSET ?;
        """

        async with self._connect() as db:
            await db.execute("PRAGMA foreign_keys=ON;")
            cursor = await db.execute(query, params)
            rows = await cursor.fetchall()

        return [
            RunSummary(
                run_id=row[0],
                mode=row[1],
                status=row[2],
                created_at=row[3],
                ended_at=row[4],
                title=row[5],
                input_summary=row[6],
            )
            for row in rows
        ]

    async def get_run(self, run_id: str) -> Optional[RunRecord]:
        """Load one run with its messages, analyses and persona snapshots.

        Returns ``None`` when no row in ``runs`` has the given ``run_id``.
        JSON columns that cannot be decoded are replaced by ``{}``.
        """
        async with self._connect() as db:
            await db.execute("PRAGMA foreign_keys=ON;")

            cursor = await db.execute(
                """
                SELECT run_id, mode, status, created_at, ended_at, title,
                       input_summary, config_json, sealed_at
                FROM runs
                WHERE run_id = ?;
                """,
                (run_id,),
            )
            row = await cursor.fetchone()
            if not row:
                return None

            cursor = await db.execute(
                """
                SELECT message_index, role, persona_label, content, timestamp
                FROM run_messages
                WHERE run_id = ?
                ORDER BY message_index ASC;
                """,
                (run_id,),
            )
            message_rows = await cursor.fetchall()

            cursor = await db.execute(
                """
                SELECT analysis_key, result_json
                FROM run_analyses
                WHERE run_id = ?;
                """,
                (run_id,),
            )
            analysis_rows = await cursor.fetchall()

            cursor = await db.execute(
                """
                SELECT role, persona_id, persona_version_id, snapshot_json
                FROM run_persona_snapshots
                WHERE run_id = ?;
                """,
                (run_id,),
            )
            snapshot_rows = await cursor.fetchall()

        messages: List[Dict[str, Any]] = [
            {
                "index": idx,
                "role": role,
                "persona": persona,
                "content": content,
                "timestamp": ts,
            }
            for (idx, role, persona, content, ts) in message_rows
        ]

        analyses: Dict[str, Dict[str, Any]] = {
            str(analysis_key): self._load_json(result_json)
            for (analysis_key, result_json) in analysis_rows
        }

        persona_snapshots: Dict[str, Dict[str, Any]] = {
            str(role): {
                "persona_id": persona_id,
                "persona_version_id": persona_version_id,
                "snapshot": self._load_json(snapshot_json),
            }
            for (role, persona_id, persona_version_id, snapshot_json) in snapshot_rows
        }

        return RunRecord(
            run_id=row[0],
            mode=row[1],
            status=row[2],
            created_at=row[3],
            ended_at=row[4],
            title=row[5],
            input_summary=row[6],
            config=self._load_json(row[7]),
            sealed_at=row[8],
            messages=messages,
            analyses=analyses,
            persona_snapshots=persona_snapshots,
        )