"""SQLite cache for generated docs.""" import json import sqlite3 import os _SCHEMA = """ PRAGMA journal_mode=WAL; PRAGMA foreign_keys=ON; CREATE TABLE IF NOT EXISTS repos ( id INTEGER PRIMARY KEY, owner TEXT NOT NULL, repo TEXT NOT NULL, info TEXT NOT NULL, -- JSON tree TEXT NOT NULL, -- JSON array created_at TEXT NOT NULL DEFAULT (datetime('now')), UNIQUE(owner, repo) ); CREATE TABLE IF NOT EXISTS docs ( id INTEGER PRIMARY KEY, repo_id INTEGER NOT NULL REFERENCES repos(id) ON DELETE CASCADE, doc_type TEXT NOT NULL, -- readme | architecture | api content TEXT NOT NULL, -- JSON generated_at TEXT NOT NULL DEFAULT (datetime('now')), UNIQUE(repo_id, doc_type) ); CREATE INDEX IF NOT EXISTS idx_docs_repo ON docs(repo_id); """ def get_db(path: str) -> sqlite3.Connection: con = sqlite3.connect(path) con.row_factory = sqlite3.Row con.executescript(_SCHEMA) con.commit() return con def upsert_repo(db, owner: str, repo: str, info: dict, tree: list) -> int: db.execute(""" INSERT INTO repos (owner, repo, info, tree) VALUES (?, ?, ?, ?) ON CONFLICT(owner, repo) DO UPDATE SET info=excluded.info, tree=excluded.tree, created_at=datetime('now') """, (owner, repo, json.dumps(info), json.dumps(tree))) db.commit() row = db.execute("SELECT id FROM repos WHERE owner=? AND repo=?", (owner, repo)).fetchone() return row["id"] def upsert_doc(db, repo_id: int, doc_type: str, content: dict): db.execute(""" INSERT INTO docs (repo_id, doc_type, content) VALUES (?, ?, ?) ON CONFLICT(repo_id, doc_type) DO UPDATE SET content=excluded.content, generated_at=datetime('now') """, (repo_id, doc_type, json.dumps(content))) db.commit() def get_docs(db, owner: str, repo: str) -> dict | None: row = db.execute("SELECT id FROM repos WHERE owner=? AND repo=?", (owner, repo)).fetchone() if not row: return None docs = db.execute("SELECT doc_type, content FROM docs WHERE repo_id=?", (row["id"],)).fetchall() if not docs: return None result = {} for d in docs: result[d["doc_type"]] = json.loads(d["content"]) return result def list_recent(db, limit: int = 10) -> list: rows = db.execute(""" SELECT r.owner, r.repo, r.info, r.created_at, COUNT(d.id) as doc_count FROM repos r LEFT JOIN docs d ON d.repo_id = r.id GROUP BY r.id ORDER BY r.created_at DESC LIMIT ? """, (limit,)).fetchall() return [{"owner": r["owner"], "repo": r["repo"], "info": json.loads(r["info"]), "created_at": r["created_at"], "doc_count": r["doc_count"]} for r in rows]