Spaces:
Running
Running
Made some final touches: integrated and ran the LLMs, verified the judging script, and prepared the README as per requirements.
abb7bf9 | """ | |
| database.py — Vera Message Engine | |
| Embedded SQLite state management matching the exact judge harness contract. | |
| Schema: | |
| contexts — (scope, context_id) composite PK, version-gated upserts | |
| conversations — append-only turn log keyed by conversation_id | |
| """ | |
| import sqlite3 | |
| import json | |
| import os | |
| import logging | |
| from datetime import datetime, timezone | |
| from typing import Optional, Dict, Any, List, Tuple | |
# Module-wide logger; the name is a fixed string rather than __name__.
logger = logging.getLogger("vera.database")

# Location of the SQLite file; the VERA_DB_PATH environment variable
# overrides the default file name.
DB_PATH = os.environ.get("VERA_DB_PATH", "vera_state.db")

# Lazily created singleton connection, managed by get_db() / close_db().
_conn: Optional[sqlite3.Connection] = None
def get_db() -> sqlite3.Connection:
    """Return the singleton database connection, initializing it if needed.

    On the first call this opens ``DB_PATH`` with ``check_same_thread=False``,
    installs ``sqlite3.Row`` as the row factory, applies the WAL /
    synchronous / busy-timeout pragmas, creates the schema, and deletes every
    row from ``contexts`` and ``conversations``.

    The connection is stored in the module-level singleton only after every
    setup step has succeeded. If any step raises, the connection is closed
    and the exception propagates, so a later call retries initialization
    instead of handing out a half-configured connection.

    Returns:
        The shared ``sqlite3.Connection``.

    Raises:
        sqlite3.Error: If connecting, configuring, or creating the schema
            fails.
    """
    global _conn
    if _conn is not None:
        return _conn

    conn = sqlite3.connect(DB_PATH, check_same_thread=False)
    try:
        conn.row_factory = sqlite3.Row
        conn.execute("PRAGMA journal_mode=WAL")
        conn.execute("PRAGMA synchronous=NORMAL")
        conn.execute("PRAGMA busy_timeout=5000")
        _init_schema(conn)
        # NOTE(review): rows persisted by a previous process are discarded
        # here; presumably intentional so every run starts from empty
        # state -- confirm before relying on the file for durability.
        conn.execute("DELETE FROM contexts")
        conn.execute("DELETE FROM conversations")
        conn.commit()
    except Exception:
        # Do not leak the handle or publish a partially set-up connection.
        conn.close()
        raise

    _conn = conn
    logger.info("Database initialized at %s", DB_PATH)
    return _conn
def close_db():
    """Close the singleton connection if one is open and clear the module state.

    Does nothing when no connection has been created.
    """
    global _conn
    if _conn is None:
        return

    _conn.close()
    _conn = None
    logger.info("Database connection closed")
def _init_schema(conn: sqlite3.Connection):
    """Create the ``contexts`` and ``conversations`` tables and the
    conversation-id index on *conn* if they do not exist yet, then commit.
    """
    schema_sql = """
        CREATE TABLE IF NOT EXISTS contexts (
            scope TEXT NOT NULL,
            context_id TEXT NOT NULL,
            version INTEGER NOT NULL DEFAULT 0,
            payload TEXT NOT NULL DEFAULT '{}',
            delivered_at TEXT,
            stored_at TEXT NOT NULL,
            PRIMARY KEY (scope, context_id)
        );
        CREATE TABLE IF NOT EXISTS conversations (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            conversation_id TEXT NOT NULL,
            turn_number INTEGER NOT NULL DEFAULT 0,
            role TEXT NOT NULL,
            message TEXT NOT NULL DEFAULT '',
            timestamp TEXT NOT NULL
        );
        CREATE INDEX IF NOT EXISTS idx_conv_id ON conversations(conversation_id);
    """
    conn.executescript(schema_sql)
    conn.commit()
| # ─── Context CRUD ───────────────────────────────────────────────────────────── | |
def _stale_context_response(current_version: int) -> Dict[str, Any]:
    """Build the rejection dict returned when the incoming version is not newer."""
    return {
        "accepted": False,
        "reason": "stale_version",
        "current_version": current_version,
        "status_code": 409,
    }


def upsert_context(
    scope: str,
    context_id: str,
    version: int,
    payload: Dict[str, Any],
    delivered_at: Optional[str] = None,
) -> Dict[str, Any]:
    """
    Version-gated context upsert keyed by (scope, context_id).

    - If the key is new -> insert, return accepted=True.
    - If incoming version > stored version -> replace, return accepted=True.
    - If incoming version <= stored version -> return accepted=False with
      status_code 409 and the stored version.

    The write itself carries the version guard (``WHERE excluded.version >
    contexts.version``), so a newer row cannot be overwritten by an older one
    even if another writer commits between the initial check and the write.

    Args:
        scope: Context scope; first half of the primary key.
        context_id: Context identifier; second half of the primary key.
        version: Incoming version; must be strictly greater than the stored
            version to be accepted.
        payload: Dict stored as JSON text.
        delivered_at: Optional delivery timestamp string stored as given.

    Returns:
        On success: ``{"accepted": True, "ack_id", "stored_at",
        "status_code": 200}``. On rejection: ``{"accepted": False,
        "reason": "stale_version", "current_version", "status_code": 409}``.

    Raises:
        TypeError: If *payload* is not JSON-serializable.
    """
    db = get_db()
    # UTC timestamp with millisecond precision and a trailing "Z".
    now = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z"
    select_version = "SELECT version FROM contexts WHERE scope = ? AND context_id = ?"

    # Fast path: reject stale or duplicate versions without attempting a write.
    row = db.execute(select_version, (scope, context_id)).fetchone()
    if row is not None and version <= row["version"]:
        return _stale_context_response(row["version"])

    payload_json = json.dumps(payload, ensure_ascii=False)
    cursor = db.execute(
        """
        INSERT INTO contexts (scope, context_id, version, payload, delivered_at, stored_at)
        VALUES (?, ?, ?, ?, ?, ?)
        ON CONFLICT(scope, context_id) DO UPDATE SET
            version = excluded.version,
            payload = excluded.payload,
            delivered_at = excluded.delivered_at,
            stored_at = excluded.stored_at
        WHERE excluded.version > contexts.version
        """,
        (scope, context_id, version, payload_json, delivered_at, now),
    )
    db.commit()

    if cursor.rowcount == 0:
        # The guarded update was skipped: a newer or equal version was stored
        # after the check above. Report the version that is stored now.
        row = db.execute(select_version, (scope, context_id)).fetchone()
        return _stale_context_response(row["version"] if row is not None else version)

    ack_id = f"ack_{context_id}_v{version}"
    return {
        "accepted": True,
        "ack_id": ack_id,
        "stored_at": now,
        "status_code": 200,
    }
def get_context(scope: str, context_id: str) -> Optional[Dict[str, Any]]:
    """Fetch the context stored under (scope, context_id).

    Returns a dict with ``scope``, ``context_id``, ``version`` and the
    JSON-decoded ``payload``, or ``None`` when no such row exists.
    """
    cursor = get_db().execute(
        "SELECT * FROM contexts WHERE scope = ? AND context_id = ?",
        (scope, context_id),
    )
    record = cursor.fetchone()
    if record is not None:
        entry: Dict[str, Any] = {
            column: record[column] for column in ("scope", "context_id", "version")
        }
        entry["payload"] = json.loads(record["payload"])
        return entry
    return None
def get_all_contexts_by_scope(scope: str) -> List[Dict[str, Any]]:
    """Return every context stored under *scope*.

    Each entry holds ``context_id``, ``version`` and the JSON-decoded
    ``payload``; the list is empty when the scope has no rows.
    """
    connection = get_db()
    query = "SELECT context_id, version, payload FROM contexts WHERE scope = ?"
    entries: List[Dict[str, Any]] = []
    for record in connection.execute(query, (scope,)).fetchall():
        entries.append(
            {
                "context_id": record["context_id"],
                "version": record["version"],
                "payload": json.loads(record["payload"]),
            }
        )
    return entries
def count_contexts() -> Dict[str, int]:
    """Return the number of stored contexts per scope.

    The four known scopes (category, merchant, customer, trigger) are always
    present in the result, defaulting to 0; any other scope found in the
    table is added with its own count.
    """
    totals = dict.fromkeys(("category", "merchant", "customer", "trigger"), 0)
    grouped = get_db().execute(
        "SELECT scope, COUNT(*) as cnt FROM contexts GROUP BY scope"
    ).fetchall()
    totals.update((record["scope"], record["cnt"]) for record in grouped)
    return totals
def wipe_all():
    """Delete every row from both tables and commit (teardown)."""
    connection = get_db()
    for statement in ("DELETE FROM contexts", "DELETE FROM conversations"):
        connection.execute(statement)
    connection.commit()
    logger.info("All state wiped (teardown)")
| # ─── Conversation CRUD ──────────────────────────────────────────────────────── | |
def append_turn(
    conversation_id: str,
    turn_number: int,
    role: str,
    message: str,
):
    """Insert one turn row for *conversation_id* and commit.

    The row is stamped with the current UTC time, formatted with millisecond
    precision and a trailing ``Z``.
    """
    # isoformat() renders UTC as "+00:00"; swap it for the "Z" suffix.
    stamp = (
        datetime.now(timezone.utc)
        .isoformat(timespec="milliseconds")
        .replace("+00:00", "Z")
    )
    values = (conversation_id, turn_number, role, message, stamp)
    connection = get_db()
    connection.execute(
        """
        INSERT INTO conversations (conversation_id, turn_number, role, message, timestamp)
        VALUES (?, ?, ?, ?, ?)
        """,
        values,
    )
    connection.commit()
def get_conversation(conversation_id: str) -> List[Dict[str, Any]]:
    """Return the logged turns of a conversation.

    Turns are ordered by ``turn_number`` and then by insertion id. Each entry
    holds ``turn_number``, ``role``, ``message`` and ``timestamp``; the list
    is empty when the conversation has no turns.
    """
    columns = ("turn_number", "role", "message", "timestamp")
    cursor = get_db().execute(
        """
        SELECT turn_number, role, message, timestamp
        FROM conversations
        WHERE conversation_id = ?
        ORDER BY turn_number ASC, id ASC
        """,
        (conversation_id,),
    )
    turns: List[Dict[str, Any]] = []
    for record in cursor.fetchall():
        turns.append({column: record[column] for column in columns})
    return turns
def conversation_exists(conversation_id: str) -> bool:
    """Return True when at least one turn is stored for *conversation_id*."""
    cursor = get_db().execute(
        "SELECT 1 FROM conversations WHERE conversation_id = ? LIMIT 1",
        (conversation_id,),
    )
    return cursor.fetchone() is not None