ConverTA / backend /storage /sqlite_run_store.py
MikelWL's picture
Fix: support older aiosqlite without execute_fetch*
d2c64aa
from __future__ import annotations
import json
from pathlib import Path
from typing import Any, Dict, List, Optional
import aiosqlite
from .models import RunRecord, RunSummary
class SQLiteRunStore:
def __init__(self, db_path: str):
self.db_path = str(db_path)
async def init(self) -> None:
Path(self.db_path).expanduser().resolve().parent.mkdir(parents=True, exist_ok=True)
async with aiosqlite.connect(self.db_path) as db:
await db.execute("PRAGMA foreign_keys=ON;")
await db.execute(
"""
CREATE TABLE IF NOT EXISTS runs (
run_id TEXT PRIMARY KEY,
mode TEXT NOT NULL,
status TEXT NOT NULL,
created_at TEXT NOT NULL,
ended_at TEXT NOT NULL,
title TEXT,
input_summary TEXT,
config_json TEXT NOT NULL,
sealed_at TEXT NOT NULL
);
"""
)
await db.execute(
"CREATE INDEX IF NOT EXISTS runs_mode_created_at ON runs(mode, created_at DESC);"
)
await db.execute(
"CREATE INDEX IF NOT EXISTS runs_created_at ON runs(created_at DESC);"
)
await db.execute(
"""
CREATE TABLE IF NOT EXISTS run_messages (
run_id TEXT NOT NULL,
message_index INTEGER NOT NULL,
role TEXT NOT NULL,
persona_label TEXT,
content TEXT NOT NULL,
timestamp TEXT,
PRIMARY KEY (run_id, message_index),
FOREIGN KEY (run_id) REFERENCES runs(run_id) ON DELETE CASCADE
);
"""
)
await db.execute(
"""
CREATE TABLE IF NOT EXISTS run_analyses (
run_id TEXT NOT NULL,
analysis_key TEXT NOT NULL,
schema_version TEXT,
prompt_version TEXT,
result_json TEXT NOT NULL,
PRIMARY KEY (run_id, analysis_key),
FOREIGN KEY (run_id) REFERENCES runs(run_id) ON DELETE CASCADE
);
"""
)
await db.execute(
"""
CREATE TABLE IF NOT EXISTS run_persona_snapshots (
run_id TEXT NOT NULL,
role TEXT NOT NULL,
persona_id TEXT,
persona_version_id TEXT,
snapshot_json TEXT NOT NULL,
PRIMARY KEY (run_id, role),
FOREIGN KEY (run_id) REFERENCES runs(run_id) ON DELETE CASCADE
);
"""
)
await db.commit()
async def save_sealed_run(self, record: RunRecord) -> None:
Path(self.db_path).expanduser().resolve().parent.mkdir(parents=True, exist_ok=True)
async with aiosqlite.connect(self.db_path) as db:
await db.execute("PRAGMA foreign_keys=ON;")
await db.execute("BEGIN;")
try:
await db.execute(
"""
INSERT INTO runs (
run_id, mode, status, created_at, ended_at,
title, input_summary, config_json, sealed_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?);
""",
(
record.run_id,
record.mode,
record.status,
record.created_at,
record.ended_at,
record.title,
record.input_summary,
json.dumps(record.config, ensure_ascii=False),
record.sealed_at,
),
)
if record.messages:
await db.executemany(
"""
INSERT INTO run_messages (
run_id, message_index, role, persona_label, content, timestamp
) VALUES (?, ?, ?, ?, ?, ?);
""",
[
(
record.run_id,
int(msg.get("index")),
str(msg.get("role") or ""),
msg.get("persona"),
str(msg.get("content") or ""),
msg.get("timestamp"),
)
for msg in record.messages
if isinstance(msg.get("index"), int)
],
)
if record.analyses:
await db.executemany(
"""
INSERT INTO run_analyses (
run_id, analysis_key, schema_version, prompt_version, result_json
) VALUES (?, ?, ?, ?, ?);
""",
[
(
record.run_id,
key,
(val or {}).get("schema_version"),
(val or {}).get("analysis_prompt_version"),
json.dumps(val or {}, ensure_ascii=False),
)
for key, val in record.analyses.items()
],
)
if record.persona_snapshots:
await db.executemany(
"""
INSERT INTO run_persona_snapshots (
run_id, role, persona_id, persona_version_id, snapshot_json
) VALUES (?, ?, ?, ?, ?);
""",
[
(
record.run_id,
role,
(snap or {}).get("persona_id"),
(snap or {}).get("persona_version_id"),
json.dumps((snap or {}).get("snapshot") or {}, ensure_ascii=False),
)
for role, snap in record.persona_snapshots.items()
],
)
await db.commit()
except Exception:
await db.execute("ROLLBACK;")
raise
async def list_runs(
self,
*,
mode: Optional[str] = None,
limit: int = 50,
offset: int = 0,
) -> List[RunSummary]:
limit = max(1, min(int(limit), 200))
offset = max(0, int(offset))
async with aiosqlite.connect(self.db_path) as db:
await db.execute("PRAGMA foreign_keys=ON;")
if mode:
cursor = await db.execute(
"""
SELECT run_id, mode, status, created_at, ended_at, title, input_summary
FROM runs
WHERE mode = ?
ORDER BY created_at DESC
LIMIT ? OFFSET ?;
""",
(mode, limit, offset),
)
else:
cursor = await db.execute(
"""
SELECT run_id, mode, status, created_at, ended_at, title, input_summary
FROM runs
ORDER BY created_at DESC
LIMIT ? OFFSET ?;
""",
(limit, offset),
)
rows = await cursor.fetchall()
return [
RunSummary(
run_id=row[0],
mode=row[1],
status=row[2],
created_at=row[3],
ended_at=row[4],
title=row[5],
input_summary=row[6],
)
for row in rows
]
async def get_run(self, run_id: str) -> Optional[RunRecord]:
async with aiosqlite.connect(self.db_path) as db:
await db.execute("PRAGMA foreign_keys=ON;")
cursor = await db.execute(
"""
SELECT run_id, mode, status, created_at, ended_at, title, input_summary, config_json, sealed_at
FROM runs
WHERE run_id = ?;
""",
(run_id,),
)
row = await cursor.fetchone()
if not row:
return None
cursor = await db.execute(
"""
SELECT message_index, role, persona_label, content, timestamp
FROM run_messages
WHERE run_id = ?
ORDER BY message_index ASC;
""",
(run_id,),
)
message_rows = await cursor.fetchall()
cursor = await db.execute(
"""
SELECT analysis_key, result_json
FROM run_analyses
WHERE run_id = ?;
""",
(run_id,),
)
analysis_rows = await cursor.fetchall()
cursor = await db.execute(
"""
SELECT role, persona_id, persona_version_id, snapshot_json
FROM run_persona_snapshots
WHERE run_id = ?;
""",
(run_id,),
)
snapshot_rows = await cursor.fetchall()
messages: List[Dict[str, Any]] = []
for (idx, role, persona, content, ts) in message_rows:
messages.append(
{
"index": idx,
"role": role,
"persona": persona,
"content": content,
"timestamp": ts,
}
)
analyses: Dict[str, Dict[str, Any]] = {}
for (analysis_key, result_json) in analysis_rows:
try:
analyses[str(analysis_key)] = json.loads(result_json or "{}")
except Exception:
analyses[str(analysis_key)] = {}
persona_snapshots: Dict[str, Dict[str, Any]] = {}
for (role, persona_id, persona_version_id, snapshot_json) in snapshot_rows:
try:
snapshot = json.loads(snapshot_json or "{}")
except Exception:
snapshot = {}
persona_snapshots[str(role)] = {
"persona_id": persona_id,
"persona_version_id": persona_version_id,
"snapshot": snapshot,
}
try:
config = json.loads(row[7] or "{}")
except Exception:
config = {}
return RunRecord(
run_id=row[0],
mode=row[1],
status=row[2],
created_at=row[3],
ended_at=row[4],
title=row[5],
input_summary=row[6],
config=config,
sealed_at=row[8],
messages=messages,
analyses=analyses,
persona_snapshots=persona_snapshots,
)