"""SQLite-backed persistence layer for RECON: session memory (turns, positions,
contradictions) and the critic verdict log used to build the paper leaderboard."""
import sqlite3
import json
import os
from datetime import datetime
from src.state import SessionContext, SessionUpdate, Claim
# Default location: <repo_root>/data/sessions.db (one level above this src/ package).
_DEFAULT_DB = os.path.join(os.path.dirname(os.path.dirname(__file__)), "data", "sessions.db")
# SESSION_DB_PATH env var overrides the default — e.g. to point at a writable
# directory in a deployed environment.
DB_PATH = os.environ.get("SESSION_DB_PATH", _DEFAULT_DB)
def _get_conn() -> sqlite3.Connection:
    """Open a connection to the session DB with name-addressable rows."""
    connection = sqlite3.connect(DB_PATH)
    # sqlite3.Row lets callers do row["column"] instead of positional indexing.
    connection.row_factory = sqlite3.Row
    return connection
def init_db() -> None:
    """Create tables and lookup indexes if they don't exist. Call once at app startup.

    All DDL is idempotent (IF NOT EXISTS), so calling this against an existing
    database is safe and leaves existing data untouched.
    """
    with _get_conn() as conn:
        conn.execute("""
            CREATE TABLE IF NOT EXISTS sessions (
                session_id TEXT PRIMARY KEY,
                created_at TEXT NOT NULL,
                updated_at TEXT NOT NULL
            )
        """)
        conn.execute("""
            CREATE TABLE IF NOT EXISTS session_turns (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                session_id TEXT NOT NULL,
                query TEXT NOT NULL,
                position TEXT NOT NULL,
                claim_json TEXT NOT NULL,      -- JSON list of Claim dicts
                contradictions TEXT NOT NULL,  -- JSON list of strings
                created_at TEXT NOT NULL,
                FOREIGN KEY (session_id) REFERENCES sessions(session_id)
            )
        """)
        conn.execute("""
            CREATE TABLE IF NOT EXISTS verdict_log (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                created_at TEXT NOT NULL,
                query TEXT NOT NULL,
                verdict TEXT NOT NULL,         -- PASS/STALE/CONTRADICTED/INSUFFICIENT/FORCED_PASS
                mean_age_months REAL,          -- mean age of retrieved papers in months
                retry_count INTEGER,
                papers_json TEXT NOT NULL,     -- JSON list of {title, year, citation_count, paper_id}
                critic_notes TEXT,
                session_id TEXT
            )
        """)
        # load_session / export_session_md filter session_turns by session_id;
        # without this index every lookup is a full table scan.
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_session_turns_session_id "
            "ON session_turns(session_id)"
        )
        # query_verdict_log filters on verdict and orders by created_at.
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_verdict_log_verdict_created "
            "ON verdict_log(verdict, created_at)"
        )
        conn.commit()
def log_verdict(
    query: str,
    verdict: str,
    papers: list,
    critic_notes: str = "",
    mean_age_months: float = 0.0,
    retry_count: int = 0,
    session_id: str = "",
) -> None:
    """
    Log every critic verdict to verdict_log for leaderboard generation.

    Called from synthesizer_node after every completed pipeline run.
    Each row represents one query where the critic fired a specific verdict,
    along with the papers that were retrieved and evaluated.

    This is the raw data that generates the real, query-driven superseded
    paper leaderboard — not a pre-written document.

    Never raises: any failure (including a broken DB) is logged as a warning
    and swallowed so verdict logging can never break the pipeline.

    Args:
        query: The original research question
        verdict: PASS / STALE / CONTRADICTED / INSUFFICIENT / FORCED_PASS
        papers: List of Paper dataclass objects retrieved for this query
        critic_notes: Human-readable explanation from the critic
        mean_age_months: Mean age of retrieved papers in months
        retry_count: Number of retries before this verdict
        session_id: Session UUID (optional, for traceability)
    """
    now = datetime.utcnow().isoformat()

    # Serialise paper metadata — just what's needed for the leaderboard.
    # getattr-with-default tolerates partially populated Paper objects.
    papers_data = []
    for p in papers[:8]:  # cap at 8 — same as synthesizer display limit
        try:
            papers_data.append({
                "title": getattr(p, "title", "") or "",
                "year": getattr(p, "year", 0) or 0,
                "citation_count": getattr(p, "citation_count", 0) or 0,
                "paper_id": getattr(p, "paper_id", "") or "",
                "authors": (getattr(p, "authors", []) or [])[:2],
                "hybrid_score": round(getattr(p, "hybrid_score", 0.0) or 0.0, 4),
            })
        except Exception:
            # One malformed paper must not block logging of the rest.
            continue

    try:
        with _get_conn() as conn:
            conn.execute(
                """
                INSERT INTO verdict_log
                (created_at, query, verdict, mean_age_months,
                 retry_count, papers_json, critic_notes, session_id)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    now,
                    query,
                    verdict,
                    round(mean_age_months, 1),
                    retry_count,
                    json.dumps(papers_data),
                    critic_notes or "",
                    session_id or "",
                ),
            )
            conn.commit()
    except Exception as e:
        # Non-fatal — never let logging break the pipeline.
        import logging
        # Lazy %s args: the message is only rendered if the record is emitted.
        logging.getLogger(__name__).warning("verdict_log insert failed: %s", e)
def query_verdict_log(
    verdict_filter: list[str] | None = None,
    min_count: int = 1,
    limit: int = 500,
) -> list[dict]:
    """
    Query the verdict log for leaderboard generation.

    Args:
        verdict_filter: List of verdicts to include, e.g. ['STALE', 'CONTRADICTED'].
            None or empty = all verdicts.
        min_count: Accepted for interface compatibility but NOT applied here —
            this function returns raw log rows, so per-paper frequency
            thresholds must be applied by the caller that aggregates rows
            into the leaderboard.
        limit: Max rows to return from the log.

    Returns:
        List of raw verdict_log rows as dicts, newest first.
    """
    with _get_conn() as conn:
        if verdict_filter:
            # One "?" per verdict: only placeholder characters are interpolated
            # into the SQL; the verdict values themselves are bound parameters.
            placeholders = ",".join("?" * len(verdict_filter))
            rows = conn.execute(
                f"""
                SELECT * FROM verdict_log
                WHERE verdict IN ({placeholders})
                ORDER BY created_at DESC
                LIMIT ?
                """,
                (*verdict_filter, limit),
            ).fetchall()
        else:
            rows = conn.execute(
                "SELECT * FROM verdict_log ORDER BY created_at DESC LIMIT ?",
                (limit,),
            ).fetchall()
        return [dict(r) for r in rows]
def load_session(session_id: str) -> SessionContext:
    """
    Load prior positions and contradictions for this session.

    Returns an empty SessionContext if the session doesn't exist yet.
    """
    with _get_conn() as conn:
        turn_rows = conn.execute(
            """
            SELECT query, position, contradictions
            FROM session_turns
            WHERE session_id = ?
            ORDER BY created_at ASC
            """,
            (session_id,)
        ).fetchall()
        if not turn_rows:
            return SessionContext()

        # Each row stores its contradictions as a JSON-encoded list of strings;
        # flatten them all into one chronological list.
        all_contradictions: list = []
        for row in turn_rows:
            all_contradictions.extend(json.loads(row["contradictions"]))

        return SessionContext(
            prior_positions=[row["position"] for row in turn_rows],
            prior_queries=[row["query"] for row in turn_rows],
            flagged_contradictions=all_contradictions,
        )
def save_turn(session_id: str, update: SessionUpdate) -> None:
    """
    Persist one completed turn (query + synthesized position + claims).

    Creates the session row if it doesn't exist.
    """
    timestamp = datetime.utcnow().isoformat()

    # Flatten each Claim into a plain dict so the turn is JSON-serialisable.
    serialised_claims = []
    for claim in update.claim_confidences:
        serialised_claims.append({
            "text": claim.text,
            "source_title": claim.source_title,
            "source_year": claim.source_year,
            "confidence": claim.confidence,
            "flagged": claim.flagged,
        })
    claim_json = json.dumps(serialised_claims)
    contradictions_json = json.dumps(update.contradictions_found)

    with _get_conn() as conn:
        # Upsert the session header row
        conn.execute(
            """
            INSERT INTO sessions (session_id, created_at, updated_at)
            VALUES (?, ?, ?)
            ON CONFLICT(session_id) DO UPDATE SET updated_at = excluded.updated_at
            """,
            (session_id, timestamp, timestamp),
        )
        # Insert the turn
        conn.execute(
            """
            INSERT INTO session_turns
            (session_id, query, position, claim_json, contradictions, created_at)
            VALUES (?, ?, ?, ?, ?, ?)
            """,
            (session_id, update.query, update.position,
             claim_json, contradictions_json, timestamp),
        )
        conn.commit()
def export_session_md(session_id: str) -> str:
    """
    Export the full session as a markdown research note.

    Returns the markdown string (not saved to disk here — caller decides).
    """
    with _get_conn() as conn:
        header_row = conn.execute(
            "SELECT created_at FROM sessions WHERE session_id = ?",
            (session_id,)
        ).fetchone()
        turn_rows = conn.execute(
            """
            SELECT query, position, claim_json, contradictions, created_at
            FROM session_turns
            WHERE session_id = ?
            ORDER BY created_at ASC
            """,
            (session_id,)
        ).fetchall()

    if header_row is None:
        return "# Session not found\n"

    # Accumulate the note line-by-line; joined with "\n" at the end.
    md = [
        "# RECON Research Session",
        f"**Session ID:** `{session_id}` ",
        f"**Started:** {header_row['created_at']} ",
        f"**Turns:** {len(turn_rows)}",
        "",
        "---",
        "",
    ]
    for turn_number, row in enumerate(turn_rows, start=1):
        claim_dicts = json.loads(row["claim_json"])
        contradiction_items = json.loads(row["contradictions"])

        md.append(f"## Turn {turn_number}: {row['query']}")
        md.append("")
        md.append("### Position")
        md.append(row["position"])
        md.append("")

        if claim_dicts:
            md.append("### Claims")
            md.append("")
            for claim in claim_dicts:
                warn = " ⚠️" if claim["flagged"] else ""
                md.append(
                    f"- **[{claim['confidence'].upper()}]** {claim['text']} "
                    f"— *{claim['source_title']} ({claim['source_year']})*{warn}"
                )
            md.append("")

        if contradiction_items:
            md.append("### Contradictions flagged")
            md.append("")
            for item in contradiction_items:
                md.append(f"- {item}")
            md.append("")

        md.append("---")
        md.append("")

    return "\n".join(md)
def delete_session(session_id: str) -> None:
    """Hard delete a session and all its turns."""
    key = (session_id,)
    with _get_conn() as conn:
        # Remove child rows first, then the header row.
        conn.execute("DELETE FROM session_turns WHERE session_id = ?", key)
        conn.execute("DELETE FROM sessions WHERE session_id = ?", key)
        conn.commit()