""" Production SQLite Database — Async via aiosqlite Handles tasks, memory, sessions, events """ import aiosqlite import os import json import time from typing import Optional, List, Dict, Any import structlog log = structlog.get_logger() DB_PATH = os.environ.get("DB_PATH", "/tmp/devin_agent.db") async def get_db() -> aiosqlite.Connection: db = await aiosqlite.connect(DB_PATH) db.row_factory = aiosqlite.Row await db.execute("PRAGMA journal_mode=WAL") await db.execute("PRAGMA foreign_keys=ON") return db async def init_db(): """Initialize all tables.""" log.info("Initializing database", path=DB_PATH) async with aiosqlite.connect(DB_PATH) as db: await db.execute("PRAGMA journal_mode=WAL") await db.execute("PRAGMA foreign_keys=ON") # Tasks table await db.execute(""" CREATE TABLE IF NOT EXISTS tasks ( id TEXT PRIMARY KEY, session_id TEXT, project_id TEXT, goal TEXT NOT NULL, status TEXT DEFAULT 'queued', plan TEXT, result TEXT, error TEXT, metadata TEXT DEFAULT '{}', created_at REAL, started_at REAL, completed_at REAL, retry_count INTEGER DEFAULT 0 ) """) # Task events table await db.execute(""" CREATE TABLE IF NOT EXISTS task_events ( id INTEGER PRIMARY KEY AUTOINCREMENT, task_id TEXT NOT NULL, event_type TEXT NOT NULL, data TEXT DEFAULT '{}', timestamp REAL, FOREIGN KEY (task_id) REFERENCES tasks(id) ) """) # Memory table await db.execute(""" CREATE TABLE IF NOT EXISTS memory ( id INTEGER PRIMARY KEY AUTOINCREMENT, session_id TEXT, project_id TEXT, memory_type TEXT NOT NULL, key TEXT, content TEXT NOT NULL, metadata TEXT DEFAULT '{}', embedding TEXT, created_at REAL, updated_at REAL ) """) # Sessions table await db.execute(""" CREATE TABLE IF NOT EXISTS sessions ( id TEXT PRIMARY KEY, project_id TEXT, user_id TEXT, metadata TEXT DEFAULT '{}', created_at REAL, last_active REAL ) """) # GitHub operations table await db.execute(""" CREATE TABLE IF NOT EXISTS github_ops ( id INTEGER PRIMARY KEY AUTOINCREMENT, task_id TEXT, operation TEXT NOT NULL, repo TEXT, branch TEXT, status TEXT DEFAULT 'pending', result TEXT, created_at REAL ) """) # Indexes await db.execute("CREATE INDEX IF NOT EXISTS idx_tasks_session ON tasks(session_id)") await db.execute("CREATE INDEX IF NOT EXISTS idx_tasks_status ON tasks(status)") await db.execute("CREATE INDEX IF NOT EXISTS idx_events_task ON task_events(task_id)") await db.execute("CREATE INDEX IF NOT EXISTS idx_memory_session ON memory(session_id)") await db.execute("CREATE INDEX IF NOT EXISTS idx_memory_project ON memory(project_id)") await db.execute("CREATE INDEX IF NOT EXISTS idx_memory_type ON memory(memory_type)") await db.commit() log.info("✅ Database initialized") # ─── Task CRUD ───────────────────────────────────────────────────────────────── async def create_task(task_id: str, goal: str, session_id: str = "", project_id: str = "", metadata: dict = {}): async with aiosqlite.connect(DB_PATH) as db: await db.execute(""" INSERT INTO tasks (id, session_id, project_id, goal, status, metadata, created_at) VALUES (?, ?, ?, ?, 'queued', ?, ?) """, (task_id, session_id, project_id, goal, json.dumps(metadata), time.time())) await db.commit() async def update_task_status(task_id: str, status: str, **kwargs): fields = ["status = ?"] values = [status] if status == "executing": fields.append("started_at = ?") values.append(time.time()) if status in ("completed", "failed", "cancelled"): fields.append("completed_at = ?") values.append(time.time()) for k, v in kwargs.items(): if k in ("plan", "result", "error"): fields.append(f"{k} = ?") values.append(v if isinstance(v, str) else json.dumps(v)) elif k == "retry_count": fields.append("retry_count = ?") values.append(v) values.append(task_id) async with aiosqlite.connect(DB_PATH) as db: await db.execute(f"UPDATE tasks SET {', '.join(fields)} WHERE id = ?", values) await db.commit() async def get_task(task_id: str) -> Optional[Dict]: async with aiosqlite.connect(DB_PATH) as db: db.row_factory = aiosqlite.Row async with db.execute("SELECT * FROM tasks WHERE id = ?", (task_id,)) as cursor: row = await cursor.fetchone() if row: d = dict(row) d["metadata"] = json.loads(d.get("metadata") or "{}") d["plan"] = json.loads(d["plan"]) if d.get("plan") else None return d return None async def list_tasks(session_id: str = "", limit: int = 50) -> List[Dict]: async with aiosqlite.connect(DB_PATH) as db: db.row_factory = aiosqlite.Row if session_id: async with db.execute( "SELECT * FROM tasks WHERE session_id = ? ORDER BY created_at DESC LIMIT ?", (session_id, limit) ) as cursor: rows = await cursor.fetchall() else: async with db.execute( "SELECT * FROM tasks ORDER BY created_at DESC LIMIT ?", (limit,) ) as cursor: rows = await cursor.fetchall() return [dict(r) for r in rows] async def save_task_event(task_id: str, event_type: str, data: dict = {}): async with aiosqlite.connect(DB_PATH) as db: await db.execute(""" INSERT INTO task_events (task_id, event_type, data, timestamp) VALUES (?, ?, ?, ?) """, (task_id, event_type, json.dumps(data), time.time())) await db.commit() async def get_task_events(task_id: str) -> List[Dict]: async with aiosqlite.connect(DB_PATH) as db: db.row_factory = aiosqlite.Row async with db.execute( "SELECT * FROM task_events WHERE task_id = ? ORDER BY timestamp ASC", (task_id,) ) as cursor: rows = await cursor.fetchall() return [dict(r) for r in rows] # ─── Memory CRUD ─────────────────────────────────────────────────────────────── async def save_memory( content: str, memory_type: str, session_id: str = "", project_id: str = "", key: str = "", metadata: dict = {} ): now = time.time() async with aiosqlite.connect(DB_PATH) as db: await db.execute(""" INSERT INTO memory (session_id, project_id, memory_type, key, content, metadata, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?) """, (session_id, project_id, memory_type, key, content, json.dumps(metadata), now, now)) await db.commit() async def search_memory(query: str, session_id: str = "", project_id: str = "", limit: int = 20) -> List[Dict]: """Simple keyword search (upgrade to vector search in production).""" async with aiosqlite.connect(DB_PATH) as db: db.row_factory = aiosqlite.Row q = f"%{query}%" if session_id: async with db.execute( "SELECT * FROM memory WHERE session_id = ? AND content LIKE ? ORDER BY updated_at DESC LIMIT ?", (session_id, q, limit) ) as cursor: rows = await cursor.fetchall() elif project_id: async with db.execute( "SELECT * FROM memory WHERE project_id = ? AND content LIKE ? ORDER BY updated_at DESC LIMIT ?", (project_id, q, limit) ) as cursor: rows = await cursor.fetchall() else: async with db.execute( "SELECT * FROM memory WHERE content LIKE ? ORDER BY updated_at DESC LIMIT ?", (q, limit) ) as cursor: rows = await cursor.fetchall() return [dict(r) for r in rows] async def get_project_memory(project_id: str, memory_type: str = "", limit: int = 100) -> List[Dict]: async with aiosqlite.connect(DB_PATH) as db: db.row_factory = aiosqlite.Row if memory_type: async with db.execute( "SELECT * FROM memory WHERE project_id = ? AND memory_type = ? ORDER BY updated_at DESC LIMIT ?", (project_id, memory_type, limit) ) as cursor: rows = await cursor.fetchall() else: async with db.execute( "SELECT * FROM memory WHERE project_id = ? ORDER BY updated_at DESC LIMIT ?", (project_id, limit) ) as cursor: rows = await cursor.fetchall() return [dict(r) for r in rows] async def get_history(session_id: str, limit: int = 50) -> List[Dict]: async with aiosqlite.connect(DB_PATH) as db: db.row_factory = aiosqlite.Row async with db.execute( "SELECT * FROM memory WHERE session_id = ? AND memory_type = 'conversation' ORDER BY created_at DESC LIMIT ?", (session_id, limit) ) as cursor: rows = await cursor.fetchall() return [dict(r) for r in rows]