"""JARVIS Memory System — persistent SQLite-backed memory with cloud sync.""" import os import re import sqlite3 import json import asyncio import logging from contextlib import contextmanager from datetime import datetime from pathlib import Path _log = logging.getLogger("jarvis.memory") DB_PATH = Path(__file__).parent / "jarvis_memory.db" # Encryption key from environment (optional — if not set, no encryption) _ENCRYPTION_KEY = os.environ.get("JARVIS_DB_KEY", "") # Validate encryption key at startup — reject unsafe characters to prevent SQL injection if _ENCRYPTION_KEY and not re.match(r'^[a-zA-Z0-9_\-+=/.]+$', _ENCRYPTION_KEY): _log.error("JARVIS_DB_KEY contains unsafe characters. Only alphanumeric, _, -, +, =, /, . allowed.") raise SystemExit("Invalid JARVIS_DB_KEY — contains unsafe characters") def get_db(): conn = sqlite3.connect(str(DB_PATH), timeout=10) conn.row_factory = sqlite3.Row conn.execute("PRAGMA journal_mode=WAL") if _ENCRYPTION_KEY: try: # Use hex-encoded key to avoid SQL injection via single quotes hex_key = _ENCRYPTION_KEY.encode().hex() conn.execute(f"PRAGMA key=\"x'{hex_key}'\"") except Exception: pass # Standard sqlite3 doesn't support PRAGMA key return conn @contextmanager def _db(): """Context manager that guarantees connection is closed.""" conn = get_db() try: yield conn finally: conn.close() def init_db(): conn = get_db() conn.executescript(""" CREATE TABLE IF NOT EXISTS conversations ( id INTEGER PRIMARY KEY AUTOINCREMENT, title TEXT, created_at TEXT DEFAULT (datetime('now')), updated_at TEXT DEFAULT (datetime('now')) ); CREATE TABLE IF NOT EXISTS messages ( id INTEGER PRIMARY KEY AUTOINCREMENT, conversation_id INTEGER, role TEXT NOT NULL, content TEXT NOT NULL, created_at TEXT DEFAULT (datetime('now')), FOREIGN KEY (conversation_id) REFERENCES conversations(id) ); CREATE TABLE IF NOT EXISTS memories ( id INTEGER PRIMARY KEY AUTOINCREMENT, category TEXT NOT NULL, key TEXT NOT NULL, value TEXT NOT NULL, created_at TEXT DEFAULT (datetime('now')), updated_at TEXT DEFAULT (datetime('now')), UNIQUE(category, key) ); CREATE TABLE IF NOT EXISTS deferred_tasks ( id INTEGER PRIMARY KEY AUTOINCREMENT, title TEXT NOT NULL, description TEXT DEFAULT '', task_type TEXT DEFAULT 'general', status TEXT DEFAULT 'pending', priority INTEGER DEFAULT 0, created_at TEXT DEFAULT (datetime('now')), updated_at TEXT DEFAULT (datetime('now')), completed_at TEXT, metadata TEXT DEFAULT '{}' ); CREATE INDEX IF NOT EXISTS idx_messages_conv ON messages(conversation_id); CREATE INDEX IF NOT EXISTS idx_memories_cat ON memories(category); CREATE INDEX IF NOT EXISTS idx_tasks_status ON deferred_tasks(status); CREATE TABLE IF NOT EXISTS command_analytics ( id INTEGER PRIMARY KEY AUTOINCREMENT, command_type TEXT NOT NULL, tool_name TEXT DEFAULT '', query_text TEXT DEFAULT '', hour_of_day INTEGER DEFAULT 0, day_of_week INTEGER DEFAULT 0, user_id TEXT DEFAULT 'default', created_at TEXT DEFAULT (datetime('now')) ); CREATE INDEX IF NOT EXISTS idx_analytics_type ON command_analytics(command_type); CREATE INDEX IF NOT EXISTS idx_analytics_hour ON command_analytics(hour_of_day); """) conn.commit() conn.close() class Memory: def __init__(self): init_db() self._cloud = None def _get_cloud(self): if self._cloud is None: try: from cloud_db import CloudDB self._cloud = CloudDB() except Exception: pass return self._cloud def _sync_to_cloud(self, category: str, key: str, value: str, user_id: str = "default"): """Best-effort async sync of a memory item to cloud.""" cloud = self._get_cloud() if cloud is None: return try: loop = asyncio.get_event_loop() if loop.is_running(): asyncio.ensure_future(cloud.save_memory(user_id, category, key, value)) else: loop.run_until_complete(cloud.save_memory(user_id, category, key, value)) except RuntimeError: try: asyncio.run(cloud.save_memory(user_id, category, key, value)) except Exception as e: _log.debug(f"Cloud memory sync failed: {e}") def save_memory(self, category: str, key: str, value: str): with _db() as conn: conn.execute( """INSERT INTO memories (category, key, value) VALUES (?, ?, ?) ON CONFLICT(category, key) DO UPDATE SET value=excluded.value, updated_at=datetime('now')""", (category, key, value), ) conn.commit() self._sync_to_cloud(category, key, value) def get_memories(self, category: str = None) -> list[dict]: with _db() as conn: if category: rows = conn.execute( "SELECT * FROM memories WHERE category=? ORDER BY updated_at DESC", (category,), ).fetchall() else: rows = conn.execute( "SELECT * FROM memories ORDER BY updated_at DESC" ).fetchall() return [dict(r) for r in rows] def search_memories(self, query: str) -> list[dict]: with _db() as conn: rows = conn.execute( "SELECT * FROM memories WHERE value LIKE ? OR key LIKE ? ORDER BY updated_at DESC", (f"%{query}%", f"%{query}%"), ).fetchall() return [dict(r) for r in rows] def delete_memory(self, memory_id: int): with _db() as conn: conn.execute("DELETE FROM memories WHERE id=?", (memory_id,)) conn.commit() # Conversation management def create_conversation(self, title: str = "New Conversation") -> int: with _db() as conn: cur = conn.execute( "INSERT INTO conversations (title) VALUES (?)", (title,) ) conv_id = cur.lastrowid conn.commit() return conv_id def add_message(self, conversation_id: int, role: str, content: str): with _db() as conn: conn.execute( "INSERT INTO messages (conversation_id, role, content) VALUES (?, ?, ?)", (conversation_id, role, content), ) conn.execute( "UPDATE conversations SET updated_at=datetime('now') WHERE id=?", (conversation_id,), ) conn.commit() def get_messages(self, conversation_id: int, limit: int = 50) -> list[dict]: with _db() as conn: rows = conn.execute( """SELECT role, content, created_at FROM messages WHERE conversation_id=? ORDER BY id DESC LIMIT ?""", (conversation_id, limit), ).fetchall() return [dict(r) for r in reversed(rows)] def get_conversations(self, limit: int = 20) -> list[dict]: with _db() as conn: rows = conn.execute( "SELECT * FROM conversations ORDER BY updated_at DESC LIMIT ?", (limit,), ).fetchall() return [dict(r) for r in rows] def get_context_summary(self) -> str: """Get a summary of stored memories for system prompt injection.""" memories = self.get_memories() if not memories: return "" lines = ["\n[LONG-TERM MEMORY]"] for m in memories[:20]: lines.append(f"- [{m['category']}] {m['key']}: {m['value']}") return "\n".join(lines) # ── Deferred Task Queue ────────────────────────────────── def add_task(self, title: str, description: str = "", task_type: str = "general", priority: int = 0, metadata: dict = None) -> int: with _db() as conn: cur = conn.execute( """INSERT INTO deferred_tasks (title, description, task_type, priority, metadata) VALUES (?, ?, ?, ?, ?)""", (title, description, task_type, priority, json.dumps(metadata or {})), ) task_id = cur.lastrowid conn.commit() return task_id def get_pending_tasks(self) -> list[dict]: with _db() as conn: rows = conn.execute( """SELECT * FROM deferred_tasks WHERE status='pending' ORDER BY priority DESC, created_at ASC""" ).fetchall() return [dict(r) for r in rows] def get_task(self, task_id: int) -> dict | None: with _db() as conn: row = conn.execute( "SELECT * FROM deferred_tasks WHERE id=?", (task_id,) ).fetchone() return dict(row) if row else None def update_task_status(self, task_id: int, status: str) -> bool: with _db() as conn: completed_at = datetime.now().isoformat() if status == "completed" else None conn.execute( """UPDATE deferred_tasks SET status=?, updated_at=datetime('now'), completed_at=COALESCE(?, completed_at) WHERE id=?""", (status, completed_at, task_id), ) conn.commit() return True def delete_task(self, task_id: int) -> bool: with _db() as conn: conn.execute("DELETE FROM deferred_tasks WHERE id=?", (task_id,)) conn.commit() return True def get_all_tasks(self, include_completed: bool = False) -> list[dict]: with _db() as conn: if include_completed: rows = conn.execute( "SELECT * FROM deferred_tasks ORDER BY status ASC, priority DESC, created_at ASC" ).fetchall() else: rows = conn.execute( """SELECT * FROM deferred_tasks WHERE status != 'completed' ORDER BY priority DESC, created_at ASC""" ).fetchall() return [dict(r) for r in rows] def get_pending_tasks_summary(self) -> str: """Get a formatted summary of pending tasks for greeting.""" tasks = self.get_pending_tasks() if not tasks: return "" lines = [f"You have {len(tasks)} pending task(s):"] for t in tasks: prio = {2: "HIGH", 1: "MED", 0: ""}.get(t["priority"], "") prio_str = f" [{prio}]" if prio else "" lines.append(f" • {t['title']}{prio_str}") if t["description"]: lines.append(f" {t['description'][:80]}") return "\n".join(lines) # ── Command Analytics ──────────────────────────────────── def track_command(self, command_type: str, tool_name: str = "", query_text: str = "", user_id: str = "default"): """Record a command for pattern analysis.""" now = datetime.now() with _db() as conn: conn.execute( """INSERT INTO command_analytics (command_type, tool_name, query_text, hour_of_day, day_of_week, user_id) VALUES (?, ?, ?, ?, ?, ?)""", (command_type, tool_name, query_text[:200], now.hour, now.weekday(), user_id), ) conn.commit() def get_command_patterns(self, user_id: str = "default", days: int = 30) -> dict: """Analyze command usage patterns for habit learning.""" with _db() as conn: # Most used tools top_tools = conn.execute( """SELECT tool_name, COUNT(*) as cnt FROM command_analytics WHERE user_id=? AND tool_name != '' AND created_at >= datetime('now', ?) GROUP BY tool_name ORDER BY cnt DESC LIMIT 10""", (user_id, f"-{days} days"), ).fetchall() # Peak usage hours peak_hours = conn.execute( """SELECT hour_of_day, COUNT(*) as cnt FROM command_analytics WHERE user_id=? AND created_at >= datetime('now', ?) GROUP BY hour_of_day ORDER BY cnt DESC LIMIT 5""", (user_id, f"-{days} days"), ).fetchall() # Total commands total = conn.execute( """SELECT COUNT(*) FROM command_analytics WHERE user_id=? AND created_at >= datetime('now', ?)""", (user_id, f"-{days} days"), ).fetchone()[0] return { "total_commands": total, "top_tools": [{"tool": r["tool_name"], "count": r["cnt"]} for r in top_tools], "peak_hours": [{"hour": r["hour_of_day"], "count": r["cnt"]} for r in peak_hours], }