""" Conversation Memory System for ScottzillaSystems Self-contained — no external APIs needed Uses SQLite + sentence-transformers for local embeddings Replaces the fraudulent MemPalace project with a real working system. """ import os import json import sqlite3 import hashlib from datetime import datetime from typing import List, Dict, Optional try: from sentence_transformers import SentenceTransformer EMBEDDING_MODEL = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2') USE_VECTOR_SEARCH = True except Exception: EMBEDDING_MODEL = None USE_VECTOR_SEARCH = False print("[Memory] sentence-transformers not available, using text search fallback") class ConversationMemory: def __init__(self, db_path: str = "./memory_db/conversations.db", user_id: str = "scottzilla"): self.db_path = db_path self.user_id = user_id os.makedirs(os.path.dirname(db_path), exist_ok=True) self.conn = sqlite3.connect(db_path) self._init_db() def _init_db(self): self.conn.execute(""" CREATE TABLE IF NOT EXISTS memories ( id INTEGER PRIMARY KEY AUTOINCREMENT, user_id TEXT NOT NULL, thread_id TEXT NOT NULL, timestamp TEXT NOT NULL, role TEXT NOT NULL, content TEXT NOT NULL, content_hash TEXT NOT NULL, metadata TEXT, embedding BLOB ) """) self.conn.execute("CREATE INDEX IF NOT EXISTS idx_user_thread ON memories(user_id, thread_id, timestamp)") self.conn.execute("CREATE INDEX IF NOT EXISTS idx_content ON memories(content)") self.conn.commit() def _get_embedding(self, text: str) -> Optional[bytes]: if not USE_VECTOR_SEARCH or EMBEDDING_MODEL is None: return None try: embedding = EMBEDDING_MODEL.encode(text, convert_to_numpy=True) return embedding.tobytes() except Exception as e: print(f"[Memory] Embedding error: {e}") return None def save_message(self, role: str, content: str, thread_id: str, metadata: Optional[Dict] = None) -> Dict: timestamp = datetime.utcnow().isoformat() content_hash = hashlib.sha256(content.encode()).hexdigest()[:16] embedding = self._get_embedding(content) meta_json = json.dumps(metadata or {}) cursor = self.conn.execute( "INSERT INTO memories (user_id, thread_id, timestamp, role, content, content_hash, metadata, embedding) VALUES (?, ?, ?, ?, ?, ?, ?, ?)", (self.user_id, thread_id, timestamp, role, content, content_hash, meta_json, embedding) ) self.conn.commit() return {"id": cursor.lastrowid, "thread_id": thread_id, "timestamp": timestamp, "role": role, "content": content, "metadata": metadata or {}} def save_conversation(self, messages: List[Dict], thread_id: str, title: Optional[str] = None) -> List[Dict]: results = [] for msg in messages: result = self.save_message( role=msg.get("role", "unknown"), content=msg.get("content", ""), thread_id=thread_id, metadata={"title": title, **msg.get("metadata", {})} ) results.append(result) return results def get_thread(self, thread_id: str, limit: int = 1000) -> List[Dict]: cursor = self.conn.execute( "SELECT id, timestamp, role, content, metadata FROM memories WHERE user_id = ? AND thread_id = ? ORDER BY timestamp ASC LIMIT ?", (self.user_id, thread_id, limit) ) rows = cursor.fetchall() return [{"id": row[0], "timestamp": row[1], "role": row[2], "content": row[3], "metadata": json.loads(row[4])} for row in rows] def search(self, query: str, thread_id: Optional[str] = None, limit: int = 20) -> List[Dict]: if thread_id: cursor = self.conn.execute( "SELECT id, timestamp, role, content, metadata FROM memories WHERE user_id = ? AND thread_id = ? AND content LIKE ? ORDER BY timestamp DESC LIMIT ?", (self.user_id, thread_id, f"%{query}%", limit) ) rows = cursor.fetchall() return [{"id": r[0], "timestamp": r[1], "role": r[2], "content": r[3], "metadata": json.loads(r[4])} for r in rows] else: cursor = self.conn.execute( "SELECT id, timestamp, role, content, metadata, thread_id FROM memories WHERE user_id = ? AND content LIKE ? ORDER BY timestamp DESC LIMIT ?", (self.user_id, f"%{query}%", limit) ) rows = cursor.fetchall() return [{"id": r[0], "timestamp": r[1], "role": r[2], "content": r[3], "metadata": json.loads(r[4]), "thread_id": r[5]} for r in rows] def get_all_threads(self) -> List[Dict]: cursor = self.conn.execute( "SELECT thread_id, COUNT(*) as msg_count, MIN(timestamp) as started, MAX(timestamp) as last_msg FROM memories WHERE user_id = ? GROUP BY thread_id ORDER BY last_msg DESC", (self.user_id,) ) rows = cursor.fetchall() return [{"thread_id": row[0], "message_count": row[1], "started": row[2], "last_message": row[3]} for row in rows] def export_to_json(self, filepath: str, thread_id: Optional[str] = None): if thread_id: memories = self.get_thread(thread_id) else: cursor = self.conn.execute("SELECT id, timestamp, role, content, metadata, thread_id FROM memories WHERE user_id = ? ORDER BY timestamp", (self.user_id,)) memories = [{"id": r[0], "timestamp": r[1], "role": r[2], "content": r[3], "metadata": json.loads(r[4]), "thread_id": r[5]} for r in cursor.fetchall()] with open(filepath, 'w') as f: json.dump(memories, f, indent=2) print(f"[Memory] Exported {len(memories)} memories to {filepath}") def export_to_markdown(self, filepath: str, thread_id: str): memories = self.get_thread(thread_id) with open(filepath, 'w') as f: f.write(f"# Conversation: {thread_id}\n\n*Exported: {datetime.utcnow().isoformat()}*\n\n---\n\n") for mem in memories: role = mem.get("role", "unknown") timestamp = mem.get("timestamp", "unknown") content = mem.get("content", "") emoji = "👤" if role == "user" else "🤖" if role == "assistant" else "📝" f.write(f"### {emoji} {role.title()} *({timestamp})*\n\n{content}\n\n---\n\n") print(f"[Memory] Exported conversation to {filepath}") def close(self): self.conn.close()