Spaces:
Runtime error
Runtime error
| import sqlite3 | |
| import json | |
| import os | |
| from pathlib import Path | |
| class MemoryStore: | |
| """ | |
| Python wrapper for the SQLite database used by the Synesthesia Conductor. | |
| Matches the schema in extensions/synesthesia-conductor/internal/memory/store.go. | |
| """ | |
| def __init__(self, db_path=None): | |
| if db_path is None: | |
| db_path = os.getenv("DATABASE_PATH", "./data/orchestration.db") | |
| self.db_path = Path(db_path) | |
| self.db_path.parent.mkdir(parents=True, exist_ok=True) | |
| self.conn = sqlite3.connect(str(self.db_path)) | |
| self.conn.row_factory = sqlite3.Row | |
| self.conn.execute("PRAGMA foreign_keys = ON;") | |
| self._create_tables() | |
| def _create_tables(self): | |
| schema = """ | |
| CREATE TABLE IF NOT EXISTS tasks ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| task_id TEXT UNIQUE NOT NULL, | |
| description TEXT, | |
| status TEXT DEFAULT 'pending', | |
| created_at DATETIME DEFAULT CURRENT_TIMESTAMP, | |
| updated_at DATETIME DEFAULT CURRENT_TIMESTAMP | |
| ); | |
| CREATE TABLE IF NOT EXISTS task_events ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| task_id TEXT NOT NULL, | |
| event_type TEXT NOT NULL, | |
| payload TEXT, | |
| created_at DATETIME DEFAULT CURRENT_TIMESTAMP, | |
| FOREIGN KEY (task_id) REFERENCES tasks(task_id) | |
| ); | |
| CREATE TABLE IF NOT EXISTS flow_runs ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| flow_name TEXT NOT NULL, | |
| status TEXT NOT NULL, | |
| metadata TEXT, | |
| created_at DATETIME DEFAULT CURRENT_TIMESTAMP | |
| ); | |
| CREATE TABLE IF NOT EXISTS agent_decisions ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| agent_name TEXT NOT NULL, | |
| task_id TEXT, | |
| decision TEXT NOT NULL, | |
| reasoning TEXT, | |
| created_at DATETIME DEFAULT CURRENT_TIMESTAMP | |
| ); | |
| CREATE TABLE IF NOT EXISTS improvement_proposals ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| title TEXT NOT NULL, | |
| description TEXT, | |
| status TEXT DEFAULT 'pending', | |
| failure_ref TEXT, | |
| created_at DATETIME DEFAULT CURRENT_TIMESTAMP | |
| ); | |
| CREATE TABLE IF NOT EXISTS jules_sessions ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| session_name TEXT UNIQUE NOT NULL, | |
| task_id TEXT, | |
| status TEXT DEFAULT 'active', | |
| pr_url TEXT, | |
| created_at DATETIME DEFAULT CURRENT_TIMESTAMP, | |
| last_polled_at DATETIME DEFAULT CURRENT_TIMESTAMP | |
| ); | |
| CREATE INDEX IF NOT EXISTS idx_task_events_task_id ON task_events(task_id); | |
| CREATE INDEX IF NOT EXISTS idx_task_events_type ON task_events(event_type); | |
| CREATE INDEX IF NOT EXISTS idx_flow_runs_flow_name ON flow_runs(flow_name); | |
| CREATE INDEX IF NOT EXISTS idx_agent_decisions_task_id ON agent_decisions(task_id); | |
| CREATE INDEX IF NOT EXISTS idx_improvement_status ON improvement_proposals(status); | |
| CREATE INDEX IF NOT EXISTS idx_jules_sessions_status ON jules_sessions(status); | |
| CREATE INDEX IF NOT EXISTS idx_jules_sessions_task_id ON jules_sessions(task_id); | |
| -- Optimized for SearchSimilarFailures | |
| CREATE INDEX IF NOT EXISTS idx_task_events_created_at ON task_events(created_at); | |
| CREATE INDEX IF NOT EXISTS idx_flow_runs_created_at ON flow_runs(created_at); | |
| """ | |
| self.conn.executescript(schema) | |
| self.conn.commit() | |
| def close(self): | |
| if self.conn: | |
| self.conn.close() | |
| self.conn = None | |
| def record_task(self, task_id, description, status): | |
| """Creates or updates a task record.""" | |
| query = """ | |
| INSERT INTO tasks (task_id, description, status) | |
| VALUES (?, ?, ?) | |
| ON CONFLICT(task_id) DO UPDATE SET | |
| status = excluded.status, | |
| updated_at = CURRENT_TIMESTAMP | |
| """ | |
| self.conn.execute(query, (task_id, description, status)) | |
| self.conn.commit() | |
| def record_task_event(self, task_id, event_type, payload): | |
| """Records an event for a task with the given payload.""" | |
| payload_json = json.dumps(payload) | |
| query = "INSERT INTO task_events (task_id, event_type, payload) VALUES (?, ?, ?)" | |
| self.conn.execute(query, (task_id, event_type, payload_json)) | |
| self.conn.commit() | |
| def record_flow_run(self, flow_name, status, metadata): | |
| """Records a flow execution with the given status and metadata.""" | |
| metadata_json = json.dumps(metadata) | |
| query = "INSERT INTO flow_runs (flow_name, status, metadata) VALUES (?, ?, ?)" | |
| self.conn.execute(query, (flow_name, status, metadata_json)) | |
| self.conn.commit() | |
| def record_agent_decision(self, agent_name, task_id, decision, reasoning): | |
| """Records a decision made by an agent.""" | |
| query = """ | |
| INSERT INTO agent_decisions (agent_name, task_id, decision, reasoning) | |
| VALUES (?, ?, ?, ?) | |
| """ | |
| self.conn.execute(query, (agent_name, task_id, decision, reasoning)) | |
| self.conn.commit() | |
| def record_improvement_proposal(self, title, description, failure_ref): | |
| """Records a new improvement proposal.""" | |
| query = """ | |
| INSERT INTO improvement_proposals (title, description, failure_ref, status) | |
| VALUES (?, ?, ?, 'pending') | |
| """ | |
| self.conn.execute(query, (title, description, failure_ref)) | |
| self.conn.commit() | |
| def record_jules_session(self, session_name, task_id): | |
| """Records a new Jules session.""" | |
| query = """ | |
| INSERT INTO jules_sessions (session_name, task_id, status) | |
| VALUES (?, ?, 'active') | |
| ON CONFLICT(session_name) DO UPDATE SET | |
| task_id = excluded.task_id, | |
| last_polled_at = CURRENT_TIMESTAMP | |
| """ | |
| self.conn.execute(query, (session_name, task_id)) | |
| self.conn.commit() | |
| def get_task(self, task_id): | |
| """Retrieves a task by ID.""" | |
| query = "SELECT status, description FROM tasks WHERE task_id = ?" | |
| row = self.conn.execute(query, (task_id,)).fetchone() | |
| if row: | |
| return dict(row) | |
| return None | |
| def get_task_history(self, task_id): | |
| """Retrieves all events for a specific task, ordered by creation time.""" | |
| query = """ | |
| SELECT id, task_id, event_type, payload, created_at | |
| FROM task_events | |
| WHERE task_id = ? | |
| ORDER BY created_at ASC | |
| """ | |
| rows = self.conn.execute(query, (task_id,)).fetchall() | |
| return [dict(row) for row in rows] | |
| def get_agent_decisions(self, task_id=None): | |
| """Retrieves agent decisions, optionally filtered by task ID.""" | |
| if task_id: | |
| query = """ | |
| SELECT id, agent_name, task_id, decision, reasoning, created_at | |
| FROM agent_decisions | |
| WHERE task_id = ? | |
| ORDER BY created_at ASC | |
| """ | |
| rows = self.conn.execute(query, (task_id,)).fetchall() | |
| else: | |
| query = """ | |
| SELECT id, agent_name, task_id, decision, reasoning, created_at | |
| FROM agent_decisions | |
| ORDER BY created_at ASC | |
| """ | |
| rows = self.conn.execute(query).fetchall() | |
| return [dict(row) for row in rows] | |
| def get_improvement_proposals(self, status=None): | |
| """Retrieves improvement proposals, optionally filtered by status.""" | |
| if status: | |
| query = """ | |
| SELECT id, title, description, status, failure_ref, created_at | |
| FROM improvement_proposals | |
| WHERE status = ? | |
| ORDER BY created_at DESC | |
| """ | |
| rows = self.conn.execute(query, (status,)).fetchall() | |
| else: | |
| query = """ | |
| SELECT id, title, description, status, failure_ref, created_at | |
| FROM improvement_proposals | |
| ORDER BY created_at DESC | |
| """ | |
| rows = self.conn.execute(query).fetchall() | |
| return [dict(row) for row in rows] | |
| def search_similar_failures(self, description): | |
| """ | |
| Searches for similar failure events using keyword matching. | |
| Returns up to 10 matching failures ordered by recency. | |
| """ | |
| query = """ | |
| SELECT te.id, te.task_id, te.payload as description, COALESCE(fr.flow_name, 'unknown') as flow_name, te.created_at | |
| FROM task_events te | |
| LEFT JOIN flow_runs fr ON te.created_at BETWEEN | |
| datetime(fr.created_at, '-5 minutes') AND datetime(fr.created_at, '+5 minutes') | |
| WHERE te.event_type = 'failure' | |
| AND te.payload LIKE ? | |
| ORDER BY te.created_at DESC | |
| LIMIT 10 | |
| """ | |
| like_pattern = f"%{description}%" | |
| rows = self.conn.execute(query, (like_pattern,)).fetchall() | |
| return [dict(row) for row in rows] | |
| def get_active_jules_sessions(self): | |
| """Retrieves all active Jules sessions.""" | |
| query = """ | |
| SELECT id, session_name, task_id, status, pr_url, created_at, last_polled_at | |
| FROM jules_sessions | |
| WHERE status = 'active' | |
| ORDER BY created_at ASC | |
| """ | |
| rows = self.conn.execute(query).fetchall() | |
| return [dict(row) for row in rows] | |
| def get_jules_session(self, session_name): | |
| """Retrieves a Jules session by name.""" | |
| query = """ | |
| SELECT id, session_name, task_id, status, pr_url, created_at, last_polled_at | |
| FROM jules_sessions | |
| WHERE session_name = ? | |
| """ | |
| row = self.conn.execute(query, (session_name,)).fetchone() | |
| if row: | |
| return dict(row) | |
| return None | |
| def update_jules_session_status(self, session_name, status): | |
| """Updates the status of a Jules session.""" | |
| query = """ | |
| UPDATE jules_sessions | |
| SET status = ?, last_polled_at = CURRENT_TIMESTAMP | |
| WHERE session_name = ? | |
| """ | |
| self.conn.execute(query, (status, session_name)) | |
| self.conn.commit() | |
| def update_jules_session_pr_url(self, session_name, pr_url): | |
| """Updates the PR URL and status of a completed Jules session.""" | |
| query = """ | |
| UPDATE jules_sessions | |
| SET pr_url = ?, status = 'completed', last_polled_at = CURRENT_TIMESTAMP | |
| WHERE session_name = ? | |
| """ | |
| self.conn.execute(query, (pr_url, session_name)) | |
| self.conn.commit() | |
| def update_jules_session_failed(self, session_name): | |
| """Marks a Jules session as failed.""" | |
| query = """ | |
| UPDATE jules_sessions | |
| SET status = 'failed', last_polled_at = CURRENT_TIMESTAMP | |
| WHERE session_name = ? | |
| """ | |
| self.conn.execute(query, (session_name,)) | |
| self.conn.commit() | |