AI Developer Agent
AI Developer Agent v1.0 backend
763ef0d
"""
SQLite task persistence.
Tables:
tasks(id, title, description, state, created_at, updated_at, payload_json)
task_events(id, task_id, ts, kind, message)
retries(id, task_id, attempt, error, ts)
deployments(id, task_id, target, url, status, ts)
sandboxes(id, task_id, sandbox_id, status, ts)
provider_usage(id, provider, ts, ok, latency_ms, error)
checkpoints(id, task_id, step_index, state_json, ts)
"""
from __future__ import annotations
import json
import os
import sqlite3
import threading
import time
import uuid
from typing import Any, Dict, List, Optional
DB_PATH = os.getenv("TASKS_DB_PATH", os.path.join(os.path.dirname(__file__), "tasks.db"))
_LOCK = threading.RLock()
SCHEMA = """
CREATE TABLE IF NOT EXISTS tasks (
id TEXT PRIMARY KEY,
title TEXT,
description TEXT,
state TEXT,
created_at REAL,
updated_at REAL,
payload_json TEXT
);
CREATE TABLE IF NOT EXISTS task_events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
task_id TEXT,
ts REAL,
kind TEXT,
message TEXT
);
CREATE INDEX IF NOT EXISTS idx_task_events_task ON task_events(task_id);
CREATE TABLE IF NOT EXISTS retries (
id INTEGER PRIMARY KEY AUTOINCREMENT,
task_id TEXT,
attempt INTEGER,
error TEXT,
ts REAL
);
CREATE TABLE IF NOT EXISTS deployments (
id INTEGER PRIMARY KEY AUTOINCREMENT,
task_id TEXT,
target TEXT,
url TEXT,
status TEXT,
ts REAL
);
CREATE TABLE IF NOT EXISTS sandboxes (
id INTEGER PRIMARY KEY AUTOINCREMENT,
task_id TEXT,
sandbox_id TEXT,
status TEXT,
ts REAL
);
CREATE TABLE IF NOT EXISTS provider_usage (
id INTEGER PRIMARY KEY AUTOINCREMENT,
provider TEXT,
ts REAL,
ok INTEGER,
latency_ms REAL,
error TEXT
);
CREATE TABLE IF NOT EXISTS checkpoints (
id INTEGER PRIMARY KEY AUTOINCREMENT,
task_id TEXT,
step_index INTEGER,
state_json TEXT,
ts REAL
);
"""
# Valid task states
TASK_STATES = [
"queued", "planning", "thinking", "executing",
"repairing", "retrying", "deploying", "completed", "failed",
]
def _conn() -> sqlite3.Connection:
c = sqlite3.connect(DB_PATH, check_same_thread=False, timeout=30.0)
c.row_factory = sqlite3.Row
c.execute("PRAGMA journal_mode=WAL")
return c
def init_db() -> None:
with _LOCK:
c = _conn()
try:
c.executescript(SCHEMA)
c.commit()
finally:
c.close()
# ---------------------------------------------------------------------------
# Task CRUD
# ---------------------------------------------------------------------------
def create_task(title: str, description: str, payload: Optional[Dict[str, Any]] = None) -> str:
task_id = uuid.uuid4().hex[:12]
now = time.time()
with _LOCK:
c = _conn()
try:
c.execute(
"INSERT INTO tasks(id, title, description, state, created_at, updated_at, payload_json) VALUES (?,?,?,?,?,?,?)",
(task_id, title, description, "queued", now, now, json.dumps(payload or {})),
)
c.commit()
finally:
c.close()
log_event(task_id, "create", f"Task created: {title}")
return task_id
def update_state(task_id: str, state: str) -> None:
with _LOCK:
c = _conn()
try:
c.execute("UPDATE tasks SET state=?, updated_at=? WHERE id=?", (state, time.time(), task_id))
c.commit()
finally:
c.close()
log_event(task_id, "state", state)
def get_task(task_id: str) -> Optional[Dict[str, Any]]:
with _LOCK:
c = _conn()
try:
row = c.execute("SELECT * FROM tasks WHERE id=?", (task_id,)).fetchone()
return dict(row) if row else None
finally:
c.close()
def list_tasks(limit: int = 50) -> List[Dict[str, Any]]:
with _LOCK:
c = _conn()
try:
rows = c.execute(
"SELECT * FROM tasks ORDER BY updated_at DESC LIMIT ?", (limit,)
).fetchall()
return [dict(r) for r in rows]
finally:
c.close()
def log_event(task_id: str, kind: str, message: str) -> None:
with _LOCK:
c = _conn()
try:
c.execute(
"INSERT INTO task_events(task_id, ts, kind, message) VALUES (?,?,?,?)",
(task_id, time.time(), kind, message[:8000]),
)
c.commit()
finally:
c.close()
def get_events(task_id: str, since_id: int = 0, limit: int = 1000) -> List[Dict[str, Any]]:
with _LOCK:
c = _conn()
try:
rows = c.execute(
"SELECT * FROM task_events WHERE task_id=? AND id>? ORDER BY id ASC LIMIT ?",
(task_id, since_id, limit),
).fetchall()
return [dict(r) for r in rows]
finally:
c.close()
def record_retry(task_id: str, attempt: int, error: str) -> None:
with _LOCK:
c = _conn()
try:
c.execute(
"INSERT INTO retries(task_id, attempt, error, ts) VALUES (?,?,?,?)",
(task_id, attempt, error[:4000], time.time()),
)
c.commit()
finally:
c.close()
def record_deployment(task_id: str, target: str, url: str, status: str) -> None:
with _LOCK:
c = _conn()
try:
c.execute(
"INSERT INTO deployments(task_id, target, url, status, ts) VALUES (?,?,?,?,?)",
(task_id, target, url, status, time.time()),
)
c.commit()
finally:
c.close()
def save_checkpoint(task_id: str, step_index: int, state: Dict[str, Any]) -> None:
with _LOCK:
c = _conn()
try:
c.execute(
"INSERT INTO checkpoints(task_id, step_index, state_json, ts) VALUES (?,?,?,?)",
(task_id, step_index, json.dumps(state)[:64000], time.time()),
)
c.commit()
finally:
c.close()
def latest_checkpoint(task_id: str) -> Optional[Dict[str, Any]]:
with _LOCK:
c = _conn()
try:
row = c.execute(
"SELECT * FROM checkpoints WHERE task_id=? ORDER BY id DESC LIMIT 1", (task_id,)
).fetchone()
if not row:
return None
d = dict(row)
d["state"] = json.loads(d["state_json"] or "{}")
return d
finally:
c.close()
def record_provider_usage(provider: str, ok: bool, latency_ms: float, error: str = "") -> None:
with _LOCK:
c = _conn()
try:
c.execute(
"INSERT INTO provider_usage(provider, ts, ok, latency_ms, error) VALUES (?,?,?,?,?)",
(provider, time.time(), 1 if ok else 0, latency_ms, error[:1000]),
)
c.commit()
finally:
c.close()
init_db()