"""SQLite cache for generated docs."""
import json
import sqlite3
import os
# SQL script run by get_db() on every connection it opens.
# It switches the journal mode to WAL, turns on foreign-key enforcement, and
# creates (if missing) two tables:
#   * repos - one row per (owner, repo) pair, with `info` and `tree` stored as
#     JSON text.
#   * docs  - one row per (repo_id, doc_type) pair, with `content` stored as
#     JSON text; rows are removed with their parent repo (ON DELETE CASCADE).
# An index on docs(repo_id) is also created if missing.
_SCHEMA = """
PRAGMA journal_mode=WAL;
PRAGMA foreign_keys=ON;
CREATE TABLE IF NOT EXISTS repos (
id INTEGER PRIMARY KEY,
owner TEXT NOT NULL,
repo TEXT NOT NULL,
info TEXT NOT NULL, -- JSON
tree TEXT NOT NULL, -- JSON array
created_at TEXT NOT NULL DEFAULT (datetime('now')),
UNIQUE(owner, repo)
);
CREATE TABLE IF NOT EXISTS docs (
id INTEGER PRIMARY KEY,
repo_id INTEGER NOT NULL REFERENCES repos(id) ON DELETE CASCADE,
doc_type TEXT NOT NULL, -- readme | architecture | api
content TEXT NOT NULL, -- JSON
generated_at TEXT NOT NULL DEFAULT (datetime('now')),
UNIQUE(repo_id, doc_type)
);
CREATE INDEX IF NOT EXISTS idx_docs_repo ON docs(repo_id);
"""
def get_db(path: str) -> sqlite3.Connection:
    """Open the SQLite cache at *path* and make sure the schema exists.

    Rows are returned as ``sqlite3.Row`` so callers can index columns by name.

    Args:
        path: Database location handed to ``sqlite3.connect``.

    Returns:
        An open connection with ``_SCHEMA`` applied and committed.

    Raises:
        sqlite3.Error: If connecting or applying the schema fails. When the
            schema step fails, the connection is closed before the error
            propagates.
    """
    con = sqlite3.connect(path)
    try:
        con.row_factory = sqlite3.Row
        con.executescript(_SCHEMA)
        con.commit()
    except Exception:
        # Do not leak the open handle when schema setup fails.
        con.close()
        raise
    return con
def upsert_repo(db, owner: str, repo: str, info: dict, tree: list) -> int:
    """Insert or refresh the cached row for ``owner/repo`` and return its id.

    On a conflict with an existing ``(owner, repo)`` row, ``info`` and
    ``tree`` are overwritten and ``created_at`` is reset to the current time.

    Args:
        db: Open SQLite connection whose rows support lookup by column name
            (e.g. ``row_factory = sqlite3.Row``).
        owner: Repository owner.
        repo: Repository name.
        info: Repository metadata; stored as JSON text.
        tree: Repository file tree; stored as JSON text.

    Returns:
        The primary key of the inserted or updated ``repos`` row.

    Raises:
        sqlite3.Error: If the write fails. The transaction is rolled back, so
            the connection is not left holding an open write transaction.
    """
    params = (owner, repo, json.dumps(info), json.dumps(tree))
    # The connection context manager commits on success and rolls back on
    # error, so a failed INSERT cannot leave a dangling transaction behind.
    with db:
        db.execute(
            """
            INSERT INTO repos (owner, repo, info, tree)
            VALUES (?, ?, ?, ?)
            ON CONFLICT(owner, repo) DO UPDATE SET
                info=excluded.info, tree=excluded.tree, created_at=datetime('now')
            """,
            params,
        )
    row = db.execute(
        "SELECT id FROM repos WHERE owner=? AND repo=?", (owner, repo)
    ).fetchone()
    return row["id"]
def upsert_doc(db, repo_id: int, doc_type: str, content: dict) -> None:
    """Insert or replace the generated document of ``doc_type`` for a repo.

    On a conflict with an existing ``(repo_id, doc_type)`` row, ``content`` is
    overwritten and ``generated_at`` is reset to the current time.

    Args:
        db: Open SQLite connection.
        repo_id: Primary key of the owning ``repos`` row.
        doc_type: Kind of document being stored.
        content: Document payload; stored as JSON text.

    Raises:
        sqlite3.Error: If the write fails, e.g. an integrity error when
            ``repo_id`` does not reference an existing repo and foreign keys
            are enforced. The transaction is rolled back before the error
            propagates.
    """
    params = (repo_id, doc_type, json.dumps(content))
    # Commit on success, roll back on failure: a rejected INSERT must not
    # leave the connection inside an open write transaction.
    with db:
        db.execute(
            """
            INSERT INTO docs (repo_id, doc_type, content)
            VALUES (?, ?, ?)
            ON CONFLICT(repo_id, doc_type) DO UPDATE SET
                content=excluded.content, generated_at=datetime('now')
            """,
            params,
        )
def get_docs(db, owner: str, repo: str) -> dict | None:
row = db.execute("SELECT id FROM repos WHERE owner=? AND repo=?",
(owner, repo)).fetchone()
if not row:
return None
docs = db.execute("SELECT doc_type, content FROM docs WHERE repo_id=?",
(row["id"],)).fetchall()
if not docs:
return None
result = {}
for d in docs:
result[d["doc_type"]] = json.loads(d["content"])
return result
def list_recent(db, limit: int = 10) -> list:
    """List the most recently cached repos, newest first.

    ``created_at`` only has one-second resolution, so ties are broken by
    ``id`` (highest first) to keep the ordering deterministic.

    Args:
        db: Open SQLite connection whose rows support lookup by column name.
        limit: Maximum number of repos to return.

    Returns:
        A list of dicts with keys ``owner``, ``repo``, ``info`` (decoded
        JSON), ``created_at`` and ``doc_count`` (number of docs stored for
        the repo, 0 when there are none).
    """
    rows = db.execute(
        """
        SELECT r.owner, r.repo, r.info, r.created_at,
               COUNT(d.id) as doc_count
        FROM repos r LEFT JOIN docs d ON d.repo_id = r.id
        GROUP BY r.id
        ORDER BY r.created_at DESC, r.id DESC
        LIMIT ?
        """,
        (limit,),
    ).fetchall()
    return [
        {
            "owner": r["owner"],
            "repo": r["repo"],
            "info": json.loads(r["info"]),
            "created_at": r["created_at"],
            "doc_count": r["doc_count"],
        }
        for r in rows
    ]
|