JARVIS / memory.py
Khanna, Videh Rakesh Rakesh
feat: security hardening, transcription mode, 15 new features, Gradio UI
0191dfa
"""JARVIS Memory System β€” persistent SQLite-backed memory with cloud sync."""
import os
import re
import sqlite3
import json
import asyncio
import logging
from contextlib import contextmanager
from datetime import datetime
from pathlib import Path
# Module-level logger shared by every helper in this file.
_log = logging.getLogger("jarvis.memory")

# The SQLite database file lives next to this module.
DB_PATH = Path(__file__).parent / "jarvis_memory.db"

# Encryption key from environment (optional — if not set, no encryption)
_ENCRYPTION_KEY = os.environ.get("JARVIS_DB_KEY", "")

# Validate encryption key at startup — reject unsafe characters to prevent SQL injection.
# fullmatch() is used instead of match() with a trailing "$": "$" also matches just
# before a final newline, so a key such as "abc\n" would otherwise pass validation.
if _ENCRYPTION_KEY and not re.fullmatch(r'[a-zA-Z0-9_\-+=/.]+', _ENCRYPTION_KEY):
    _log.error("JARVIS_DB_KEY contains unsafe characters. Only alphanumeric, _, -, +, =, /, . allowed.")
    raise SystemExit("Invalid JARVIS_DB_KEY \u2014 contains unsafe characters")
def get_db():
    """Open and configure a new connection to the memory database.

    The connection uses ``sqlite3.Row`` as its row factory (so rows can be
    turned into dicts), waits up to 10 seconds on a locked database, and is
    switched to WAL journaling.  When ``JARVIS_DB_KEY`` is set, a
    ``PRAGMA key`` is issued first.

    Returns:
        An open ``sqlite3.Connection``.  The caller is responsible for
        closing it (see ``_db()``).

    Raises:
        sqlite3.Error: if the database cannot be opened or configured.  The
            half-configured connection is closed before the error propagates.
    """
    conn = sqlite3.connect(str(DB_PATH), timeout=10)
    try:
        conn.row_factory = sqlite3.Row
        if _ENCRYPTION_KEY:
            # Use hex-encoded key to avoid SQL injection via single quotes.
            hex_key = _ENCRYPTION_KEY.encode().hex()
            try:
                # The key must be supplied before any other statement touches
                # the database: SQLCipher cannot read an encrypted file (not
                # even to change the journal mode) until it has been keyed.
                conn.execute(f"PRAGMA key=\"x'{hex_key}'\"")
            except sqlite3.Error as exc:
                # Best effort: a build without encryption support may reject
                # the pragma.  Record it instead of hiding it completely.
                # NOTE(review): a stock SQLite build may instead ignore the
                # unknown pragma silently, leaving the file unencrypted.
                _log.debug("PRAGMA key was rejected: %s", exc)
        conn.execute("PRAGMA journal_mode=WAL")
    except BaseException:
        # Do not leak the connection when configuration fails.
        conn.close()
        raise
    return conn
@contextmanager
def _db():
    """Yield a fresh database connection and always close it afterwards.

    The connection comes from ``get_db()``.  Nothing is committed here:
    callers that write must call ``commit()`` themselves before the ``with``
    block ends.
    """
    connection = get_db()
    try:
        yield connection
    finally:
        # Runs on normal exit and when the with-body raises.
        connection.close()
def init_db():
    """Create every table and index used by the memory system if missing.

    All statements use ``IF NOT EXISTS``, so calling this repeatedly is safe.
    The connection is obtained through ``_db()`` so that it is closed even
    when the schema script fails part-way.

    Raises:
        sqlite3.Error: if the database cannot be opened or the script fails.
    """
    with _db() as conn:
        conn.executescript("""
            CREATE TABLE IF NOT EXISTS conversations (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                title TEXT,
                created_at TEXT DEFAULT (datetime('now')),
                updated_at TEXT DEFAULT (datetime('now'))
            );
            CREATE TABLE IF NOT EXISTS messages (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                conversation_id INTEGER,
                role TEXT NOT NULL,
                content TEXT NOT NULL,
                created_at TEXT DEFAULT (datetime('now')),
                FOREIGN KEY (conversation_id) REFERENCES conversations(id)
            );
            CREATE TABLE IF NOT EXISTS memories (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                category TEXT NOT NULL,
                key TEXT NOT NULL,
                value TEXT NOT NULL,
                created_at TEXT DEFAULT (datetime('now')),
                updated_at TEXT DEFAULT (datetime('now')),
                UNIQUE(category, key)
            );
            CREATE TABLE IF NOT EXISTS deferred_tasks (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                title TEXT NOT NULL,
                description TEXT DEFAULT '',
                task_type TEXT DEFAULT 'general',
                status TEXT DEFAULT 'pending',
                priority INTEGER DEFAULT 0,
                created_at TEXT DEFAULT (datetime('now')),
                updated_at TEXT DEFAULT (datetime('now')),
                completed_at TEXT,
                metadata TEXT DEFAULT '{}'
            );
            CREATE INDEX IF NOT EXISTS idx_messages_conv ON messages(conversation_id);
            CREATE INDEX IF NOT EXISTS idx_memories_cat ON memories(category);
            CREATE INDEX IF NOT EXISTS idx_tasks_status ON deferred_tasks(status);
            CREATE TABLE IF NOT EXISTS command_analytics (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                command_type TEXT NOT NULL,
                tool_name TEXT DEFAULT '',
                query_text TEXT DEFAULT '',
                hour_of_day INTEGER DEFAULT 0,
                day_of_week INTEGER DEFAULT 0,
                user_id TEXT DEFAULT 'default',
                created_at TEXT DEFAULT (datetime('now'))
            );
            CREATE INDEX IF NOT EXISTS idx_analytics_type ON command_analytics(command_type);
            CREATE INDEX IF NOT EXISTS idx_analytics_hour ON command_analytics(hour_of_day);
        """)
        conn.commit()
class Memory:
def __init__(self):
init_db()
self._cloud = None
def _get_cloud(self):
if self._cloud is None:
try:
from cloud_db import CloudDB
self._cloud = CloudDB()
except Exception:
pass
return self._cloud
def _sync_to_cloud(self, category: str, key: str, value: str, user_id: str = "default"):
"""Best-effort async sync of a memory item to cloud."""
cloud = self._get_cloud()
if cloud is None:
return
try:
loop = asyncio.get_event_loop()
if loop.is_running():
asyncio.ensure_future(cloud.save_memory(user_id, category, key, value))
else:
loop.run_until_complete(cloud.save_memory(user_id, category, key, value))
except RuntimeError:
try:
asyncio.run(cloud.save_memory(user_id, category, key, value))
except Exception as e:
_log.debug(f"Cloud memory sync failed: {e}")
def save_memory(self, category: str, key: str, value: str):
with _db() as conn:
conn.execute(
"""INSERT INTO memories (category, key, value)
VALUES (?, ?, ?)
ON CONFLICT(category, key) DO UPDATE SET
value=excluded.value, updated_at=datetime('now')""",
(category, key, value),
)
conn.commit()
self._sync_to_cloud(category, key, value)
def get_memories(self, category: str = None) -> list[dict]:
with _db() as conn:
if category:
rows = conn.execute(
"SELECT * FROM memories WHERE category=? ORDER BY updated_at DESC",
(category,),
).fetchall()
else:
rows = conn.execute(
"SELECT * FROM memories ORDER BY updated_at DESC"
).fetchall()
return [dict(r) for r in rows]
def search_memories(self, query: str) -> list[dict]:
with _db() as conn:
rows = conn.execute(
"SELECT * FROM memories WHERE value LIKE ? OR key LIKE ? ORDER BY updated_at DESC",
(f"%{query}%", f"%{query}%"),
).fetchall()
return [dict(r) for r in rows]
def delete_memory(self, memory_id: int):
with _db() as conn:
conn.execute("DELETE FROM memories WHERE id=?", (memory_id,))
conn.commit()
# Conversation management
def create_conversation(self, title: str = "New Conversation") -> int:
with _db() as conn:
cur = conn.execute(
"INSERT INTO conversations (title) VALUES (?)", (title,)
)
conv_id = cur.lastrowid
conn.commit()
return conv_id
def add_message(self, conversation_id: int, role: str, content: str):
with _db() as conn:
conn.execute(
"INSERT INTO messages (conversation_id, role, content) VALUES (?, ?, ?)",
(conversation_id, role, content),
)
conn.execute(
"UPDATE conversations SET updated_at=datetime('now') WHERE id=?",
(conversation_id,),
)
conn.commit()
def get_messages(self, conversation_id: int, limit: int = 50) -> list[dict]:
with _db() as conn:
rows = conn.execute(
"""SELECT role, content, created_at FROM messages
WHERE conversation_id=? ORDER BY id DESC LIMIT ?""",
(conversation_id, limit),
).fetchall()
return [dict(r) for r in reversed(rows)]
def get_conversations(self, limit: int = 20) -> list[dict]:
with _db() as conn:
rows = conn.execute(
"SELECT * FROM conversations ORDER BY updated_at DESC LIMIT ?",
(limit,),
).fetchall()
return [dict(r) for r in rows]
def get_context_summary(self) -> str:
"""Get a summary of stored memories for system prompt injection."""
memories = self.get_memories()
if not memories:
return ""
lines = ["\n[LONG-TERM MEMORY]"]
for m in memories[:20]:
lines.append(f"- [{m['category']}] {m['key']}: {m['value']}")
return "\n".join(lines)
# ── Deferred Task Queue ──────────────────────────────────
def add_task(self, title: str, description: str = "", task_type: str = "general",
priority: int = 0, metadata: dict = None) -> int:
with _db() as conn:
cur = conn.execute(
"""INSERT INTO deferred_tasks (title, description, task_type, priority, metadata)
VALUES (?, ?, ?, ?, ?)""",
(title, description, task_type, priority, json.dumps(metadata or {})),
)
task_id = cur.lastrowid
conn.commit()
return task_id
def get_pending_tasks(self) -> list[dict]:
with _db() as conn:
rows = conn.execute(
"""SELECT * FROM deferred_tasks WHERE status='pending'
ORDER BY priority DESC, created_at ASC"""
).fetchall()
return [dict(r) for r in rows]
def get_task(self, task_id: int) -> dict | None:
with _db() as conn:
row = conn.execute(
"SELECT * FROM deferred_tasks WHERE id=?", (task_id,)
).fetchone()
return dict(row) if row else None
def update_task_status(self, task_id: int, status: str) -> bool:
with _db() as conn:
completed_at = datetime.now().isoformat() if status == "completed" else None
conn.execute(
"""UPDATE deferred_tasks SET status=?, updated_at=datetime('now'),
completed_at=COALESCE(?, completed_at) WHERE id=?""",
(status, completed_at, task_id),
)
conn.commit()
return True
def delete_task(self, task_id: int) -> bool:
with _db() as conn:
conn.execute("DELETE FROM deferred_tasks WHERE id=?", (task_id,))
conn.commit()
return True
def get_all_tasks(self, include_completed: bool = False) -> list[dict]:
with _db() as conn:
if include_completed:
rows = conn.execute(
"SELECT * FROM deferred_tasks ORDER BY status ASC, priority DESC, created_at ASC"
).fetchall()
else:
rows = conn.execute(
"""SELECT * FROM deferred_tasks WHERE status != 'completed'
ORDER BY priority DESC, created_at ASC"""
).fetchall()
return [dict(r) for r in rows]
def get_pending_tasks_summary(self) -> str:
"""Get a formatted summary of pending tasks for greeting."""
tasks = self.get_pending_tasks()
if not tasks:
return ""
lines = [f"You have {len(tasks)} pending task(s):"]
for t in tasks:
prio = {2: "HIGH", 1: "MED", 0: ""}.get(t["priority"], "")
prio_str = f" [{prio}]" if prio else ""
lines.append(f" β€’ {t['title']}{prio_str}")
if t["description"]:
lines.append(f" {t['description'][:80]}")
return "\n".join(lines)
# ── Command Analytics ────────────────────────────────────
def track_command(self, command_type: str, tool_name: str = "", query_text: str = "", user_id: str = "default"):
"""Record a command for pattern analysis."""
now = datetime.now()
with _db() as conn:
conn.execute(
"""INSERT INTO command_analytics (command_type, tool_name, query_text,
hour_of_day, day_of_week, user_id)
VALUES (?, ?, ?, ?, ?, ?)""",
(command_type, tool_name, query_text[:200], now.hour, now.weekday(), user_id),
)
conn.commit()
def get_command_patterns(self, user_id: str = "default", days: int = 30) -> dict:
"""Analyze command usage patterns for habit learning."""
with _db() as conn:
# Most used tools
top_tools = conn.execute(
"""SELECT tool_name, COUNT(*) as cnt FROM command_analytics
WHERE user_id=? AND tool_name != '' AND created_at >= datetime('now', ?)
GROUP BY tool_name ORDER BY cnt DESC LIMIT 10""",
(user_id, f"-{days} days"),
).fetchall()
# Peak usage hours
peak_hours = conn.execute(
"""SELECT hour_of_day, COUNT(*) as cnt FROM command_analytics
WHERE user_id=? AND created_at >= datetime('now', ?)
GROUP BY hour_of_day ORDER BY cnt DESC LIMIT 5""",
(user_id, f"-{days} days"),
).fetchall()
# Total commands
total = conn.execute(
"""SELECT COUNT(*) FROM command_analytics
WHERE user_id=? AND created_at >= datetime('now', ?)""",
(user_id, f"-{days} days"),
).fetchone()[0]
return {
"total_commands": total,
"top_tools": [{"tool": r["tool_name"], "count": r["cnt"]} for r in top_tools],
"peak_hours": [{"hour": r["hour_of_day"], "count": r["cnt"]} for r in peak_hours],
}