""" SQLite task persistence. Tables: tasks(id, title, description, state, created_at, updated_at, payload_json) task_events(id, task_id, ts, kind, message) retries(id, task_id, attempt, error, ts) deployments(id, task_id, target, url, status, ts) sandboxes(id, task_id, sandbox_id, status, ts) provider_usage(id, provider, ts, ok, latency_ms, error) checkpoints(id, task_id, step_index, state_json, ts) """ from __future__ import annotations import json import os import sqlite3 import threading import time import uuid from typing import Any, Dict, List, Optional DB_PATH = os.getenv("TASKS_DB_PATH", os.path.join(os.path.dirname(__file__), "tasks.db")) _LOCK = threading.RLock() SCHEMA = """ CREATE TABLE IF NOT EXISTS tasks ( id TEXT PRIMARY KEY, title TEXT, description TEXT, state TEXT, created_at REAL, updated_at REAL, payload_json TEXT ); CREATE TABLE IF NOT EXISTS task_events ( id INTEGER PRIMARY KEY AUTOINCREMENT, task_id TEXT, ts REAL, kind TEXT, message TEXT ); CREATE INDEX IF NOT EXISTS idx_task_events_task ON task_events(task_id); CREATE TABLE IF NOT EXISTS retries ( id INTEGER PRIMARY KEY AUTOINCREMENT, task_id TEXT, attempt INTEGER, error TEXT, ts REAL ); CREATE TABLE IF NOT EXISTS deployments ( id INTEGER PRIMARY KEY AUTOINCREMENT, task_id TEXT, target TEXT, url TEXT, status TEXT, ts REAL ); CREATE TABLE IF NOT EXISTS sandboxes ( id INTEGER PRIMARY KEY AUTOINCREMENT, task_id TEXT, sandbox_id TEXT, status TEXT, ts REAL ); CREATE TABLE IF NOT EXISTS provider_usage ( id INTEGER PRIMARY KEY AUTOINCREMENT, provider TEXT, ts REAL, ok INTEGER, latency_ms REAL, error TEXT ); CREATE TABLE IF NOT EXISTS checkpoints ( id INTEGER PRIMARY KEY AUTOINCREMENT, task_id TEXT, step_index INTEGER, state_json TEXT, ts REAL ); """ # Valid task states TASK_STATES = [ "queued", "planning", "thinking", "executing", "repairing", "retrying", "deploying", "completed", "failed", ] def _conn() -> sqlite3.Connection: c = sqlite3.connect(DB_PATH, check_same_thread=False, timeout=30.0) c.row_factory = sqlite3.Row c.execute("PRAGMA journal_mode=WAL") return c def init_db() -> None: with _LOCK: c = _conn() try: c.executescript(SCHEMA) c.commit() finally: c.close() # --------------------------------------------------------------------------- # Task CRUD # --------------------------------------------------------------------------- def create_task(title: str, description: str, payload: Optional[Dict[str, Any]] = None) -> str: task_id = uuid.uuid4().hex[:12] now = time.time() with _LOCK: c = _conn() try: c.execute( "INSERT INTO tasks(id, title, description, state, created_at, updated_at, payload_json) VALUES (?,?,?,?,?,?,?)", (task_id, title, description, "queued", now, now, json.dumps(payload or {})), ) c.commit() finally: c.close() log_event(task_id, "create", f"Task created: {title}") return task_id def update_state(task_id: str, state: str) -> None: with _LOCK: c = _conn() try: c.execute("UPDATE tasks SET state=?, updated_at=? WHERE id=?", (state, time.time(), task_id)) c.commit() finally: c.close() log_event(task_id, "state", state) def get_task(task_id: str) -> Optional[Dict[str, Any]]: with _LOCK: c = _conn() try: row = c.execute("SELECT * FROM tasks WHERE id=?", (task_id,)).fetchone() return dict(row) if row else None finally: c.close() def list_tasks(limit: int = 50) -> List[Dict[str, Any]]: with _LOCK: c = _conn() try: rows = c.execute( "SELECT * FROM tasks ORDER BY updated_at DESC LIMIT ?", (limit,) ).fetchall() return [dict(r) for r in rows] finally: c.close() def log_event(task_id: str, kind: str, message: str) -> None: with _LOCK: c = _conn() try: c.execute( "INSERT INTO task_events(task_id, ts, kind, message) VALUES (?,?,?,?)", (task_id, time.time(), kind, message[:8000]), ) c.commit() finally: c.close() def get_events(task_id: str, since_id: int = 0, limit: int = 1000) -> List[Dict[str, Any]]: with _LOCK: c = _conn() try: rows = c.execute( "SELECT * FROM task_events WHERE task_id=? AND id>? ORDER BY id ASC LIMIT ?", (task_id, since_id, limit), ).fetchall() return [dict(r) for r in rows] finally: c.close() def record_retry(task_id: str, attempt: int, error: str) -> None: with _LOCK: c = _conn() try: c.execute( "INSERT INTO retries(task_id, attempt, error, ts) VALUES (?,?,?,?)", (task_id, attempt, error[:4000], time.time()), ) c.commit() finally: c.close() def record_deployment(task_id: str, target: str, url: str, status: str) -> None: with _LOCK: c = _conn() try: c.execute( "INSERT INTO deployments(task_id, target, url, status, ts) VALUES (?,?,?,?,?)", (task_id, target, url, status, time.time()), ) c.commit() finally: c.close() def save_checkpoint(task_id: str, step_index: int, state: Dict[str, Any]) -> None: with _LOCK: c = _conn() try: c.execute( "INSERT INTO checkpoints(task_id, step_index, state_json, ts) VALUES (?,?,?,?)", (task_id, step_index, json.dumps(state)[:64000], time.time()), ) c.commit() finally: c.close() def latest_checkpoint(task_id: str) -> Optional[Dict[str, Any]]: with _LOCK: c = _conn() try: row = c.execute( "SELECT * FROM checkpoints WHERE task_id=? ORDER BY id DESC LIMIT 1", (task_id,) ).fetchone() if not row: return None d = dict(row) d["state"] = json.loads(d["state_json"] or "{}") return d finally: c.close() def record_provider_usage(provider: str, ok: bool, latency_ms: float, error: str = "") -> None: with _LOCK: c = _conn() try: c.execute( "INSERT INTO provider_usage(provider, ts, ok, latency_ms, error) VALUES (?,?,?,?,?)", (provider, time.time(), 1 if ok else 0, latency_ms, error[:1000]), ) c.commit() finally: c.close() init_db()