# Synesthesia/runtime/memory_store.py
# Ashiedu's picture
# Sync unified workbench
# 0490201 verified
import json
import os
import sqlite3
from pathlib import Path
from typing import Any, Dict, List, Optional, Sequence, Union
class MemoryStore:
"""
Python wrapper for the SQLite database used by the Synesthesia Conductor.
Matches the schema in extensions/synesthesia-conductor/internal/memory/store.go.
"""
def __init__(self, db_path=None):
if db_path is None:
db_path = os.getenv("DATABASE_PATH", "./data/orchestration.db")
self.db_path = Path(db_path)
self.db_path.parent.mkdir(parents=True, exist_ok=True)
self.conn = sqlite3.connect(str(self.db_path))
self.conn.row_factory = sqlite3.Row
self.conn.execute("PRAGMA foreign_keys = ON;")
self._create_tables()
def _create_tables(self):
schema = """
CREATE TABLE IF NOT EXISTS tasks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
task_id TEXT UNIQUE NOT NULL,
description TEXT,
status TEXT DEFAULT 'pending',
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS task_events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
task_id TEXT NOT NULL,
event_type TEXT NOT NULL,
payload TEXT,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (task_id) REFERENCES tasks(task_id)
);
CREATE TABLE IF NOT EXISTS flow_runs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
flow_name TEXT NOT NULL,
status TEXT NOT NULL,
metadata TEXT,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS agent_decisions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
agent_name TEXT NOT NULL,
task_id TEXT,
decision TEXT NOT NULL,
reasoning TEXT,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS improvement_proposals (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT NOT NULL,
description TEXT,
status TEXT DEFAULT 'pending',
failure_ref TEXT,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS jules_sessions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
session_name TEXT UNIQUE NOT NULL,
task_id TEXT,
status TEXT DEFAULT 'active',
pr_url TEXT,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
last_polled_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_task_events_task_id ON task_events(task_id);
CREATE INDEX IF NOT EXISTS idx_task_events_type ON task_events(event_type);
CREATE INDEX IF NOT EXISTS idx_flow_runs_flow_name ON flow_runs(flow_name);
CREATE INDEX IF NOT EXISTS idx_agent_decisions_task_id ON agent_decisions(task_id);
CREATE INDEX IF NOT EXISTS idx_improvement_status ON improvement_proposals(status);
CREATE INDEX IF NOT EXISTS idx_jules_sessions_status ON jules_sessions(status);
CREATE INDEX IF NOT EXISTS idx_jules_sessions_task_id ON jules_sessions(task_id);
-- Optimized for SearchSimilarFailures
CREATE INDEX IF NOT EXISTS idx_task_events_created_at ON task_events(created_at);
CREATE INDEX IF NOT EXISTS idx_flow_runs_created_at ON flow_runs(created_at);
"""
self.conn.executescript(schema)
self.conn.commit()
def close(self):
if self.conn:
self.conn.close()
self.conn = None
def record_task(self, task_id, description, status):
"""Creates or updates a task record."""
query = """
INSERT INTO tasks (task_id, description, status)
VALUES (?, ?, ?)
ON CONFLICT(task_id) DO UPDATE SET
status = excluded.status,
updated_at = CURRENT_TIMESTAMP
"""
self.conn.execute(query, (task_id, description, status))
self.conn.commit()
def record_task_event(self, task_id, event_type, payload):
"""Records an event for a task with the given payload."""
payload_json = json.dumps(payload)
query = "INSERT INTO task_events (task_id, event_type, payload) VALUES (?, ?, ?)"
self.conn.execute(query, (task_id, event_type, payload_json))
self.conn.commit()
def record_flow_run(self, flow_name, status, metadata):
"""Records a flow execution with the given status and metadata."""
metadata_json = json.dumps(metadata)
query = "INSERT INTO flow_runs (flow_name, status, metadata) VALUES (?, ?, ?)"
self.conn.execute(query, (flow_name, status, metadata_json))
self.conn.commit()
def record_agent_decision(self, agent_name, task_id, decision, reasoning):
"""Records a decision made by an agent."""
query = """
INSERT INTO agent_decisions (agent_name, task_id, decision, reasoning)
VALUES (?, ?, ?, ?)
"""
self.conn.execute(query, (agent_name, task_id, decision, reasoning))
self.conn.commit()
def record_improvement_proposal(self, title, description, failure_ref):
"""Records a new improvement proposal."""
query = """
INSERT INTO improvement_proposals (title, description, failure_ref, status)
VALUES (?, ?, ?, 'pending')
"""
self.conn.execute(query, (title, description, failure_ref))
self.conn.commit()
def record_jules_session(self, session_name, task_id):
"""Records a new Jules session."""
query = """
INSERT INTO jules_sessions (session_name, task_id, status)
VALUES (?, ?, 'active')
ON CONFLICT(session_name) DO UPDATE SET
task_id = excluded.task_id,
last_polled_at = CURRENT_TIMESTAMP
"""
self.conn.execute(query, (session_name, task_id))
self.conn.commit()
def get_task(self, task_id):
"""Retrieves a task by ID."""
query = "SELECT status, description FROM tasks WHERE task_id = ?"
row = self.conn.execute(query, (task_id,)).fetchone()
if row:
return dict(row)
return None
def get_task_history(self, task_id):
"""Retrieves all events for a specific task, ordered by creation time."""
query = """
SELECT id, task_id, event_type, payload, created_at
FROM task_events
WHERE task_id = ?
ORDER BY created_at ASC
"""
rows = self.conn.execute(query, (task_id,)).fetchall()
return [dict(row) for row in rows]
def get_agent_decisions(self, task_id=None):
"""Retrieves agent decisions, optionally filtered by task ID."""
if task_id:
query = """
SELECT id, agent_name, task_id, decision, reasoning, created_at
FROM agent_decisions
WHERE task_id = ?
ORDER BY created_at ASC
"""
rows = self.conn.execute(query, (task_id,)).fetchall()
else:
query = """
SELECT id, agent_name, task_id, decision, reasoning, created_at
FROM agent_decisions
ORDER BY created_at ASC
"""
rows = self.conn.execute(query).fetchall()
return [dict(row) for row in rows]
def get_improvement_proposals(self, status=None):
"""Retrieves improvement proposals, optionally filtered by status."""
if status:
query = """
SELECT id, title, description, status, failure_ref, created_at
FROM improvement_proposals
WHERE status = ?
ORDER BY created_at DESC
"""
rows = self.conn.execute(query, (status,)).fetchall()
else:
query = """
SELECT id, title, description, status, failure_ref, created_at
FROM improvement_proposals
ORDER BY created_at DESC
"""
rows = self.conn.execute(query).fetchall()
return [dict(row) for row in rows]
def search_similar_failures(self, description):
"""
Searches for similar failure events using keyword matching.
Returns up to 10 matching failures ordered by recency.
"""
query = """
SELECT te.id, te.task_id, te.payload as description, COALESCE(fr.flow_name, 'unknown') as flow_name, te.created_at
FROM task_events te
LEFT JOIN flow_runs fr ON te.created_at BETWEEN
datetime(fr.created_at, '-5 minutes') AND datetime(fr.created_at, '+5 minutes')
WHERE te.event_type = 'failure'
AND te.payload LIKE ?
ORDER BY te.created_at DESC
LIMIT 10
"""
like_pattern = f"%{description}%"
rows = self.conn.execute(query, (like_pattern,)).fetchall()
return [dict(row) for row in rows]
def get_active_jules_sessions(self):
"""Retrieves all active Jules sessions."""
query = """
SELECT id, session_name, task_id, status, pr_url, created_at, last_polled_at
FROM jules_sessions
WHERE status = 'active'
ORDER BY created_at ASC
"""
rows = self.conn.execute(query).fetchall()
return [dict(row) for row in rows]
def get_jules_session(self, session_name):
"""Retrieves a Jules session by name."""
query = """
SELECT id, session_name, task_id, status, pr_url, created_at, last_polled_at
FROM jules_sessions
WHERE session_name = ?
"""
row = self.conn.execute(query, (session_name,)).fetchone()
if row:
return dict(row)
return None
def update_jules_session_status(self, session_name, status):
"""Updates the status of a Jules session."""
query = """
UPDATE jules_sessions
SET status = ?, last_polled_at = CURRENT_TIMESTAMP
WHERE session_name = ?
"""
self.conn.execute(query, (status, session_name))
self.conn.commit()
def update_jules_session_pr_url(self, session_name, pr_url):
"""Updates the PR URL and status of a completed Jules session."""
query = """
UPDATE jules_sessions
SET pr_url = ?, status = 'completed', last_polled_at = CURRENT_TIMESTAMP
WHERE session_name = ?
"""
self.conn.execute(query, (pr_url, session_name))
self.conn.commit()
def update_jules_session_failed(self, session_name):
"""Marks a Jules session as failed."""
query = """
UPDATE jules_sessions
SET status = 'failed', last_polled_at = CURRENT_TIMESTAMP
WHERE session_name = ?
"""
self.conn.execute(query, (session_name,))
self.conn.commit()