import json
import os
import sqlite3
from pathlib import Path
from typing import Any, Dict, List, Optional


class MemoryStore:
    """
    Python wrapper for the SQLite database used by the Synesthesia Conductor.
    Matches the schema in extensions/synesthesia-conductor/internal/memory/store.go.

    The store owns a single ``sqlite3`` connection (``self.conn``) with
    ``sqlite3.Row`` as row factory and foreign-key enforcement switched on.
    Every write runs in its own transaction that is committed on success and
    rolled back if the statement raises. Read methods return plain ``dict``
    objects (or lists of them).

    The store can be used as a context manager; the connection is closed on
    exit.
    """

    def __init__(self, db_path=None):
        """Open (creating if necessary) the database and ensure the schema.

        Args:
            db_path: Path of the SQLite file. When ``None``, the
                ``DATABASE_PATH`` environment variable is used, falling back
                to ``./data/orchestration.db``. The parent directory is
                created if it does not exist.
        """
        if db_path is None:
            db_path = os.getenv("DATABASE_PATH", "./data/orchestration.db")
        self.db_path = Path(db_path)
        self.db_path.parent.mkdir(parents=True, exist_ok=True)
        self.conn: Optional[sqlite3.Connection] = sqlite3.connect(str(self.db_path))
        self.conn.row_factory = sqlite3.Row
        # SQLite does not enforce foreign keys unless asked to, per connection.
        self.conn.execute("PRAGMA foreign_keys = ON;")
        self._create_tables()

    def __enter__(self):
        """Return the store itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the connection when leaving a ``with`` block."""
        self.close()

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _connection(self) -> sqlite3.Connection:
        """Return the live connection or raise if the store was closed."""
        if self.conn is None:
            raise sqlite3.ProgrammingError("Cannot operate on a closed database.")
        return self.conn

    def _write(self, query: str, params: tuple = ()) -> None:
        """Execute one write statement, committing on success.

        Using the connection as a context manager rolls the implicit
        transaction back when the statement raises, so a failed write never
        leaves a transaction dangling on the shared connection.
        """
        conn = self._connection()
        with conn:
            conn.execute(query, params)

    def _fetch_all(self, query: str, params: tuple = ()) -> List[Dict[str, Any]]:
        """Run a SELECT and return every row as a plain dict."""
        cursor = self._connection().execute(query, params)
        return [dict(row) for row in cursor.fetchall()]

    def _fetch_one(self, query: str, params: tuple = ()) -> Optional[Dict[str, Any]]:
        """Run a SELECT and return the first row as a dict, or ``None``."""
        row = self._connection().execute(query, params).fetchone()
        return dict(row) if row else None

    def _create_tables(self):
        """Create all tables and indexes if they do not exist yet."""
        schema = """
        CREATE TABLE IF NOT EXISTS tasks (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            task_id TEXT UNIQUE NOT NULL,
            description TEXT,
            status TEXT DEFAULT 'pending',
            created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
            updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
        );

        CREATE TABLE IF NOT EXISTS task_events (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            task_id TEXT NOT NULL,
            event_type TEXT NOT NULL,
            payload TEXT,
            created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (task_id) REFERENCES tasks(task_id)
        );

        CREATE TABLE IF NOT EXISTS flow_runs (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            flow_name TEXT NOT NULL,
            status TEXT NOT NULL,
            metadata TEXT,
            created_at DATETIME DEFAULT CURRENT_TIMESTAMP
        );

        CREATE TABLE IF NOT EXISTS agent_decisions (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            agent_name TEXT NOT NULL,
            task_id TEXT,
            decision TEXT NOT NULL,
            reasoning TEXT,
            created_at DATETIME DEFAULT CURRENT_TIMESTAMP
        );

        CREATE TABLE IF NOT EXISTS improvement_proposals (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            title TEXT NOT NULL,
            description TEXT,
            status TEXT DEFAULT 'pending',
            failure_ref TEXT,
            created_at DATETIME DEFAULT CURRENT_TIMESTAMP
        );

        CREATE TABLE IF NOT EXISTS jules_sessions (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            session_name TEXT UNIQUE NOT NULL,
            task_id TEXT,
            status TEXT DEFAULT 'active',
            pr_url TEXT,
            created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
            last_polled_at DATETIME DEFAULT CURRENT_TIMESTAMP
        );

        CREATE INDEX IF NOT EXISTS idx_task_events_task_id ON task_events(task_id);
        CREATE INDEX IF NOT EXISTS idx_task_events_type ON task_events(event_type);
        CREATE INDEX IF NOT EXISTS idx_flow_runs_flow_name ON flow_runs(flow_name);
        CREATE INDEX IF NOT EXISTS idx_agent_decisions_task_id ON agent_decisions(task_id);
        CREATE INDEX IF NOT EXISTS idx_improvement_status ON improvement_proposals(status);
        CREATE INDEX IF NOT EXISTS idx_jules_sessions_status ON jules_sessions(status);
        CREATE INDEX IF NOT EXISTS idx_jules_sessions_task_id ON jules_sessions(task_id);

        -- Optimized for SearchSimilarFailures
        CREATE INDEX IF NOT EXISTS idx_task_events_created_at ON task_events(created_at);
        CREATE INDEX IF NOT EXISTS idx_flow_runs_created_at ON flow_runs(created_at);
        """
        conn = self._connection()
        conn.executescript(schema)
        conn.commit()

    # ------------------------------------------------------------------
    # Lifecycle
    # ------------------------------------------------------------------

    def close(self):
        """Close the connection. Safe to call more than once."""
        if self.conn:
            self.conn.close()
            self.conn = None

    # ------------------------------------------------------------------
    # Writers
    # ------------------------------------------------------------------

    def record_task(self, task_id, description, status):
        """Creates or updates a task record.

        On a ``task_id`` conflict only ``status`` and ``updated_at`` are
        updated; the stored ``description`` is left as it was.
        """
        query = """
            INSERT INTO tasks (task_id, description, status)
            VALUES (?, ?, ?)
            ON CONFLICT(task_id) DO UPDATE SET
                status = excluded.status,
                updated_at = CURRENT_TIMESTAMP
        """
        self._write(query, (task_id, description, status))

    def record_task_event(self, task_id, event_type, payload):
        """Records an event for a task with the given payload.

        ``payload`` is serialized with ``json.dumps`` before being stored.

        Raises:
            sqlite3.IntegrityError: if ``task_id`` does not reference an
                existing row in ``tasks`` (foreign keys are enforced).
        """
        payload_json = json.dumps(payload)
        query = "INSERT INTO task_events (task_id, event_type, payload) VALUES (?, ?, ?)"
        self._write(query, (task_id, event_type, payload_json))

    def record_flow_run(self, flow_name, status, metadata):
        """Records a flow execution with the given status and metadata.

        ``metadata`` is serialized with ``json.dumps`` before being stored.
        """
        metadata_json = json.dumps(metadata)
        query = "INSERT INTO flow_runs (flow_name, status, metadata) VALUES (?, ?, ?)"
        self._write(query, (flow_name, status, metadata_json))

    def record_agent_decision(self, agent_name, task_id, decision, reasoning):
        """Records a decision made by an agent."""
        query = """
            INSERT INTO agent_decisions (agent_name, task_id, decision, reasoning)
            VALUES (?, ?, ?, ?)
        """
        self._write(query, (agent_name, task_id, decision, reasoning))

    def record_improvement_proposal(self, title, description, failure_ref):
        """Records a new improvement proposal with status ``'pending'``."""
        query = """
            INSERT INTO improvement_proposals (title, description, failure_ref, status)
            VALUES (?, ?, ?, 'pending')
        """
        self._write(query, (title, description, failure_ref))

    def record_jules_session(self, session_name, task_id):
        """Records a new Jules session.

        A new session is inserted as ``'active'``. If ``session_name`` already
        exists, its ``task_id`` and ``last_polled_at`` are refreshed and its
        status is left unchanged.
        """
        query = """
            INSERT INTO jules_sessions (session_name, task_id, status)
            VALUES (?, ?, 'active')
            ON CONFLICT(session_name) DO UPDATE SET
                task_id = excluded.task_id,
                last_polled_at = CURRENT_TIMESTAMP
        """
        self._write(query, (session_name, task_id))

    # ------------------------------------------------------------------
    # Readers
    # ------------------------------------------------------------------

    def get_task(self, task_id):
        """Retrieves a task by ID.

        Returns:
            A dict with keys ``status`` and ``description``, or ``None`` when
            no such task exists.
        """
        query = "SELECT status, description FROM tasks WHERE task_id = ?"
        return self._fetch_one(query, (task_id,))

    def get_task_history(self, task_id):
        """Retrieves all events for a specific task, ordered by creation time.

        ``payload`` is returned as the stored JSON text, not decoded.
        """
        # created_at has one-second resolution, so id breaks ties to keep
        # events from the same second in insertion order.
        query = """
            SELECT id, task_id, event_type, payload, created_at
            FROM task_events
            WHERE task_id = ?
            ORDER BY created_at ASC, id ASC
        """
        return self._fetch_all(query, (task_id,))

    def get_agent_decisions(self, task_id=None):
        """Retrieves agent decisions, optionally filtered by task ID.

        A falsy ``task_id`` (``None`` or ``""``) means "no filter". Results
        are ordered oldest first.
        """
        if task_id:
            query = """
                SELECT id, agent_name, task_id, decision, reasoning, created_at
                FROM agent_decisions
                WHERE task_id = ?
                ORDER BY created_at ASC, id ASC
            """
            return self._fetch_all(query, (task_id,))
        query = """
            SELECT id, agent_name, task_id, decision, reasoning, created_at
            FROM agent_decisions
            ORDER BY created_at ASC, id ASC
        """
        return self._fetch_all(query)

    def get_improvement_proposals(self, status=None):
        """Retrieves improvement proposals, optionally filtered by status.

        A falsy ``status`` means "no filter". Results are ordered newest
        first.
        """
        if status:
            query = """
                SELECT id, title, description, status, failure_ref, created_at
                FROM improvement_proposals
                WHERE status = ?
                ORDER BY created_at DESC, id DESC
            """
            return self._fetch_all(query, (status,))
        query = """
            SELECT id, title, description, status, failure_ref, created_at
            FROM improvement_proposals
            ORDER BY created_at DESC, id DESC
        """
        return self._fetch_all(query)

    def search_similar_failures(self, description):
        """
        Searches for similar failure events using keyword matching.
        Returns up to 10 matching failures ordered by recency.

        ``description`` is matched as a literal substring of the stored
        payload text: ``%`` and ``_`` in it are escaped so they are not
        treated as LIKE wildcards.

        Each result has keys ``id``, ``task_id``, ``description`` (the raw
        payload text), ``flow_name`` (``'unknown'`` when no flow run falls in
        the time window) and ``created_at``.

        NOTE: the join pairs an event with every flow run created within
        five minutes of it, so one event can appear in several result rows.
        """
        query = """
            SELECT
                te.id,
                te.task_id,
                te.payload as description,
                COALESCE(fr.flow_name, 'unknown') as flow_name,
                te.created_at
            FROM task_events te
            LEFT JOIN flow_runs fr
                ON te.created_at BETWEEN datetime(fr.created_at, '-5 minutes')
                                     AND datetime(fr.created_at, '+5 minutes')
            WHERE te.event_type = 'failure'
              AND te.payload LIKE ? ESCAPE '\\'
            ORDER BY te.created_at DESC, te.id DESC
            LIMIT 10
        """
        # Escape the escape character first, then the two LIKE wildcards.
        needle = (
            str(description)
            .replace("\\", "\\\\")
            .replace("%", "\\%")
            .replace("_", "\\_")
        )
        like_pattern = f"%{needle}%"
        return self._fetch_all(query, (like_pattern,))

    def get_active_jules_sessions(self):
        """Retrieves all active Jules sessions, oldest first."""
        query = """
            SELECT id, session_name, task_id, status, pr_url, created_at, last_polled_at
            FROM jules_sessions
            WHERE status = 'active'
            ORDER BY created_at ASC, id ASC
        """
        return self._fetch_all(query)

    def get_jules_session(self, session_name):
        """Retrieves a Jules session by name, or ``None`` if it is unknown."""
        query = """
            SELECT id, session_name, task_id, status, pr_url, created_at, last_polled_at
            FROM jules_sessions
            WHERE session_name = ?
        """
        return self._fetch_one(query, (session_name,))

    # ------------------------------------------------------------------
    # Jules session updates
    # ------------------------------------------------------------------

    def update_jules_session_status(self, session_name, status):
        """Updates the status of a Jules session and refreshes ``last_polled_at``."""
        query = """
            UPDATE jules_sessions
            SET status = ?, last_polled_at = CURRENT_TIMESTAMP
            WHERE session_name = ?
        """
        self._write(query, (status, session_name))

    def update_jules_session_pr_url(self, session_name, pr_url):
        """Updates the PR URL and status of a completed Jules session.

        The status is set to ``'completed'``.
        """
        query = """
            UPDATE jules_sessions
            SET pr_url = ?, status = 'completed', last_polled_at = CURRENT_TIMESTAMP
            WHERE session_name = ?
        """
        self._write(query, (pr_url, session_name))

    def update_jules_session_failed(self, session_name):
        """Marks a Jules session as failed."""
        query = """
            UPDATE jules_sessions
            SET status = 'failed', last_polled_at = CURRENT_TIMESTAMP
            WHERE session_name = ?
        """
        self._write(query, (session_name,))