| from __future__ import annotations | |
| import json | |
| from pathlib import Path | |
| from typing import Any, Dict, List, Optional | |
| import aiosqlite | |
| from .models import RunRecord, RunSummary | |
class SQLiteRunStore:
    """Persist sealed runs in a SQLite database through ``aiosqlite``.

    Every public coroutine opens its own connection to ``db_path`` and turns
    on foreign-key enforcement for it (SQLite scopes that pragma to the
    connection), so the ``ON DELETE CASCADE`` clauses of the child tables are
    honoured on each connection this store opens.
    """

    # Columns read for a run summary; shared by ``list_runs``.
    _SUMMARY_COLUMNS = "run_id, mode, status, created_at, ended_at, title, input_summary"

    # DDL executed, in order, by ``init``. Every statement is idempotent
    # (``IF NOT EXISTS``), so ``init`` may be called repeatedly.
    _SCHEMA_STATEMENTS = (
        """
        CREATE TABLE IF NOT EXISTS runs (
            run_id TEXT PRIMARY KEY,
            mode TEXT NOT NULL,
            status TEXT NOT NULL,
            created_at TEXT NOT NULL,
            ended_at TEXT NOT NULL,
            title TEXT,
            input_summary TEXT,
            config_json TEXT NOT NULL,
            sealed_at TEXT NOT NULL
        );
        """,
        "CREATE INDEX IF NOT EXISTS runs_mode_created_at ON runs(mode, created_at DESC);",
        "CREATE INDEX IF NOT EXISTS runs_created_at ON runs(created_at DESC);",
        """
        CREATE TABLE IF NOT EXISTS run_messages (
            run_id TEXT NOT NULL,
            message_index INTEGER NOT NULL,
            role TEXT NOT NULL,
            persona_label TEXT,
            content TEXT NOT NULL,
            timestamp TEXT,
            PRIMARY KEY (run_id, message_index),
            FOREIGN KEY (run_id) REFERENCES runs(run_id) ON DELETE CASCADE
        );
        """,
        """
        CREATE TABLE IF NOT EXISTS run_analyses (
            run_id TEXT NOT NULL,
            analysis_key TEXT NOT NULL,
            schema_version TEXT,
            prompt_version TEXT,
            result_json TEXT NOT NULL,
            PRIMARY KEY (run_id, analysis_key),
            FOREIGN KEY (run_id) REFERENCES runs(run_id) ON DELETE CASCADE
        );
        """,
        """
        CREATE TABLE IF NOT EXISTS run_persona_snapshots (
            run_id TEXT NOT NULL,
            role TEXT NOT NULL,
            persona_id TEXT,
            persona_version_id TEXT,
            snapshot_json TEXT NOT NULL,
            PRIMARY KEY (run_id, role),
            FOREIGN KEY (run_id) REFERENCES runs(run_id) ON DELETE CASCADE
        );
        """,
    )

    def __init__(self, db_path: str):
        """Remember the database location; no I/O happens here.

        Args:
            db_path: Path of the SQLite file. It is coerced with ``str()``,
                so path-like objects are accepted as well.
        """
        self.db_path = str(db_path)

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    def _ensure_parent_dir(self) -> None:
        """Create the directory that will contain the database file."""
        parent = Path(self.db_path).expanduser().resolve().parent
        parent.mkdir(parents=True, exist_ok=True)

    @staticmethod
    def _loads_or_empty(raw: Optional[str]) -> Any:
        """Decode a stored JSON column, falling back to ``{}``.

        ``None`` and the empty string decode as ``{}``. Text that cannot be
        decoded also yields ``{}`` so that one corrupt column does not make
        the whole run unreadable. Only decode-related errors are absorbed;
        anything else (e.g. ``MemoryError``) propagates.
        """
        try:
            return json.loads(raw or "{}")
        except (TypeError, ValueError, RecursionError):
            # json.JSONDecodeError and UnicodeDecodeError are ValueErrors.
            return {}

    @staticmethod
    def _message_rows(record: RunRecord) -> List[tuple]:
        """Build ``run_messages`` parameter tuples from ``record.messages``.

        Messages whose ``"index"`` is not an ``int`` are skipped. Missing
        ``role``/``content`` values are stored as empty strings.
        """
        return [
            (
                record.run_id,
                int(msg.get("index")),
                str(msg.get("role") or ""),
                msg.get("persona"),
                str(msg.get("content") or ""),
                msg.get("timestamp"),
            )
            for msg in (record.messages or [])
            if isinstance(msg.get("index"), int)
        ]

    @staticmethod
    def _analysis_rows(record: RunRecord) -> List[tuple]:
        """Build ``run_analyses`` parameter tuples from ``record.analyses``."""
        rows: List[tuple] = []
        for key, val in (record.analyses or {}).items():
            payload = val or {}
            rows.append(
                (
                    record.run_id,
                    key,
                    payload.get("schema_version"),
                    payload.get("analysis_prompt_version"),
                    json.dumps(payload, ensure_ascii=False),
                )
            )
        return rows

    @staticmethod
    def _snapshot_rows(record: RunRecord) -> List[tuple]:
        """Build ``run_persona_snapshots`` tuples from ``record.persona_snapshots``."""
        rows: List[tuple] = []
        for role, snap in (record.persona_snapshots or {}).items():
            entry = snap or {}
            rows.append(
                (
                    record.run_id,
                    role,
                    entry.get("persona_id"),
                    entry.get("persona_version_id"),
                    json.dumps(entry.get("snapshot") or {}, ensure_ascii=False),
                )
            )
        return rows

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    async def init(self) -> None:
        """Create the database directory, tables and indexes if missing."""
        self._ensure_parent_dir()
        async with aiosqlite.connect(self.db_path) as db:
            await db.execute("PRAGMA foreign_keys=ON;")
            for statement in self._SCHEMA_STATEMENTS:
                await db.execute(statement)
            await db.commit()

    async def save_sealed_run(self, record: RunRecord) -> None:
        """Insert a run and all of its child rows in a single transaction.

        The run row, its messages, analyses and persona snapshots are either
        all committed or all rolled back.

        Args:
            record: The run to persist. ``config``, each analysis payload and
                each persona ``snapshot`` are stored as JSON text.

        Raises:
            Exception: Whatever the database layer or ``json.dumps`` raises
                (for example an integrity error for a duplicate ``run_id``);
                the transaction is rolled back before re-raising.
        """
        self._ensure_parent_dir()

        # Serialise everything before touching the database so the write
        # transaction only spans the actual inserts.
        run_row = (
            record.run_id,
            record.mode,
            record.status,
            record.created_at,
            record.ended_at,
            record.title,
            record.input_summary,
            json.dumps(record.config, ensure_ascii=False),
            record.sealed_at,
        )
        message_rows = self._message_rows(record)
        analysis_rows = self._analysis_rows(record)
        snapshot_rows = self._snapshot_rows(record)

        async with aiosqlite.connect(self.db_path) as db:
            await db.execute("PRAGMA foreign_keys=ON;")
            await db.execute("BEGIN;")
            try:
                await db.execute(
                    """
                    INSERT INTO runs (
                        run_id, mode, status, created_at, ended_at,
                        title, input_summary, config_json, sealed_at
                    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?);
                    """,
                    run_row,
                )
                if message_rows:
                    await db.executemany(
                        """
                        INSERT INTO run_messages (
                            run_id, message_index, role, persona_label, content, timestamp
                        ) VALUES (?, ?, ?, ?, ?, ?);
                        """,
                        message_rows,
                    )
                if analysis_rows:
                    await db.executemany(
                        """
                        INSERT INTO run_analyses (
                            run_id, analysis_key, schema_version, prompt_version, result_json
                        ) VALUES (?, ?, ?, ?, ?);
                        """,
                        analysis_rows,
                    )
                if snapshot_rows:
                    await db.executemany(
                        """
                        INSERT INTO run_persona_snapshots (
                            run_id, role, persona_id, persona_version_id, snapshot_json
                        ) VALUES (?, ?, ?, ?, ?);
                        """,
                        snapshot_rows,
                    )
                await db.commit()
            except Exception:
                # Use the connection's rollback() rather than a raw
                # "ROLLBACK;" statement: SQLite may already have rolled the
                # transaction back on its own (disk full, I/O error, ...),
                # in which case the raw statement fails with "no transaction
                # is active" and would hide the original exception.
                await db.rollback()
                raise

    async def list_runs(
        self,
        *,
        mode: Optional[str] = None,
        limit: int = 50,
        offset: int = 0,
    ) -> List[RunSummary]:
        """Return one page of run summaries, newest first.

        Args:
            mode: When truthy, only runs with this exact ``mode`` are listed.
            limit: Page size; clamped to the range 1..200.
            offset: Number of rows to skip; negative values are treated as 0.

        Returns:
            Summaries ordered by ``created_at`` descending. ``run_id`` breaks
            ties so that consecutive pages neither repeat nor skip runs that
            share a timestamp.
        """
        limit = max(1, min(int(limit), 200))
        offset = max(0, int(offset))

        # Only a constant fragment is interpolated; every value is bound.
        where_clause = "WHERE mode = ?" if mode else ""
        params = (mode, limit, offset) if mode else (limit, offset)
        query = f"""
            SELECT {self._SUMMARY_COLUMNS}
            FROM runs
            {where_clause}
            ORDER BY created_at DESC, run_id DESC
            LIMIT ? OFFSET ?;
        """

        async with aiosqlite.connect(self.db_path) as db:
            await db.execute("PRAGMA foreign_keys=ON;")
            cursor = await db.execute(query, params)
            rows = await cursor.fetchall()

        return [
            RunSummary(
                run_id=run_id,
                mode=run_mode,
                status=status,
                created_at=created_at,
                ended_at=ended_at,
                title=title,
                input_summary=input_summary,
            )
            for (
                run_id,
                run_mode,
                status,
                created_at,
                ended_at,
                title,
                input_summary,
            ) in rows
        ]

    async def get_run(self, run_id: str) -> Optional[RunRecord]:
        """Load a complete run, or return ``None`` if ``run_id`` is unknown.

        Messages come back ordered by ``message_index``. JSON columns that
        cannot be decoded are replaced by ``{}`` instead of failing the load.
        """
        async with aiosqlite.connect(self.db_path) as db:
            await db.execute("PRAGMA foreign_keys=ON;")

            cursor = await db.execute(
                """
                SELECT run_id, mode, status, created_at, ended_at, title, input_summary, config_json, sealed_at
                FROM runs
                WHERE run_id = ?;
                """,
                (run_id,),
            )
            run_row = await cursor.fetchone()
            if run_row is None:
                return None

            cursor = await db.execute(
                """
                SELECT message_index, role, persona_label, content, timestamp
                FROM run_messages
                WHERE run_id = ?
                ORDER BY message_index ASC;
                """,
                (run_id,),
            )
            message_rows = await cursor.fetchall()

            cursor = await db.execute(
                """
                SELECT analysis_key, result_json
                FROM run_analyses
                WHERE run_id = ?;
                """,
                (run_id,),
            )
            analysis_rows = await cursor.fetchall()

            cursor = await db.execute(
                """
                SELECT role, persona_id, persona_version_id, snapshot_json
                FROM run_persona_snapshots
                WHERE run_id = ?;
                """,
                (run_id,),
            )
            snapshot_rows = await cursor.fetchall()

        messages: List[Dict[str, Any]] = [
            {
                "index": idx,
                "role": role,
                "persona": persona,
                "content": content,
                "timestamp": ts,
            }
            for (idx, role, persona, content, ts) in message_rows
        ]

        analyses: Dict[str, Dict[str, Any]] = {
            str(analysis_key): self._loads_or_empty(result_json)
            for (analysis_key, result_json) in analysis_rows
        }

        persona_snapshots: Dict[str, Dict[str, Any]] = {
            str(role): {
                "persona_id": persona_id,
                "persona_version_id": persona_version_id,
                "snapshot": self._loads_or_empty(snapshot_json),
            }
            for (role, persona_id, persona_version_id, snapshot_json) in snapshot_rows
        }

        return RunRecord(
            run_id=run_row[0],
            mode=run_row[1],
            status=run_row[2],
            created_at=run_row[3],
            ended_at=run_row[4],
            title=run_row[5],
            input_summary=run_row[6],
            config=self._loads_or_empty(run_row[7]),
            sealed_at=run_row[8],
            messages=messages,
            analyses=analyses,
            persona_snapshots=persona_snapshots,
        )