| """ |
| Conversation Memory System for ScottzillaSystems |
| Self-contained — no external APIs needed |
| Uses SQLite + sentence-transformers for local embeddings |
| Replaces the fraudulent MemPalace project with a real working system. |
| """ |
|
|
import hashlib
import json
import os
import sqlite3
from datetime import datetime, timezone
from typing import Dict, List, Optional
|
|
# Optional local-embedding support. Start from the "disabled" state and only
# switch it on once both the import and the model construction have succeeded;
# any failure along the way leaves the text-search fallback in place.
EMBEDDING_MODEL = None
USE_VECTOR_SEARCH = False
try:
    from sentence_transformers import SentenceTransformer

    EMBEDDING_MODEL = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')
    USE_VECTOR_SEARCH = True
except Exception:
    print("[Memory] sentence-transformers not available, using text search fallback")
|
|
|
|
class ConversationMemory:
    """SQLite-backed store for one user's conversation messages.

    Every message is a row in the ``memories`` table, scoped by ``user_id``
    and grouped by ``thread_id``. When the module-level embedding model is
    available, an embedding of the content is stored with each row; the
    lookups in this class use substring matching on the content.

    The instance can be used as a context manager, which closes the
    connection on exit.
    """

    def __init__(self, db_path: str = "./memory_db/conversations.db", user_id: str = "scottzilla"):
        """Open (creating if necessary) the database at *db_path* for *user_id*."""
        self.db_path = db_path
        self.user_id = user_id
        parent_dir = os.path.dirname(db_path)
        # A bare filename or ":memory:" has no directory component, and
        # os.makedirs("") raises FileNotFoundError.
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)
        self.conn = sqlite3.connect(db_path)
        self._init_db()

    def __enter__(self) -> "ConversationMemory":
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        self.close()

    def _init_db(self) -> None:
        """Create the ``memories`` table and its indexes if they do not exist."""
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS memories (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                user_id TEXT NOT NULL,
                thread_id TEXT NOT NULL,
                timestamp TEXT NOT NULL,
                role TEXT NOT NULL,
                content TEXT NOT NULL,
                content_hash TEXT NOT NULL,
                metadata TEXT,
                embedding BLOB
            )
        """)
        self.conn.execute("CREATE INDEX IF NOT EXISTS idx_user_thread ON memories(user_id, thread_id, timestamp)")
        self.conn.execute("CREATE INDEX IF NOT EXISTS idx_content ON memories(content)")
        self.conn.commit()

    @staticmethod
    def _utc_now_iso() -> str:
        """Return the current UTC time as a naive ISO-8601 string."""
        # Drop tzinfo so the text has the same shape as the previously used
        # datetime.utcnow().isoformat(); stored timestamps are compared as
        # strings, so the format must not change between rows.
        return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

    @staticmethod
    def _like_pattern(query: str) -> str:
        """Build a ``%query%`` LIKE pattern with wildcards in *query* escaped.

        Must be paired with ``ESCAPE '\\'`` in the SQL statement.
        """
        escaped = query.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
        return f"%{escaped}%"

    @staticmethod
    def _row_to_dict(row) -> Dict:
        """Convert an ``(id, timestamp, role, content, metadata[, thread_id])`` row."""
        record = {
            "id": row[0],
            "timestamp": row[1],
            "role": row[2],
            "content": row[3],
            # The metadata column is nullable; treat NULL/empty as "no metadata".
            "metadata": json.loads(row[4]) if row[4] else {},
        }
        if len(row) > 5:
            record["thread_id"] = row[5]
        return record

    def _get_embedding(self, text: str) -> Optional[bytes]:
        """Return the raw bytes of the embedding for *text*, or ``None``.

        ``None`` is returned when vector search is disabled or when encoding
        fails (the error is printed, not raised).
        """
        if not USE_VECTOR_SEARCH or EMBEDDING_MODEL is None:
            return None
        try:
            embedding = EMBEDDING_MODEL.encode(text, convert_to_numpy=True)
            return embedding.tobytes()
        except Exception as e:
            print(f"[Memory] Embedding error: {e}")
            return None

    def save_message(self, role: str, content: str, thread_id: str, metadata: Optional[Dict] = None) -> Dict:
        """Insert one message into *thread_id* and return the stored record.

        The returned dict has the keys ``id``, ``thread_id``, ``timestamp``,
        ``role``, ``content`` and ``metadata``.
        """
        metadata = metadata or {}
        timestamp = self._utc_now_iso()
        content_hash = hashlib.sha256(content.encode()).hexdigest()[:16]
        embedding = self._get_embedding(content)
        cursor = self.conn.execute(
            "INSERT INTO memories (user_id, thread_id, timestamp, role, content, content_hash, metadata, embedding) VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
            (self.user_id, thread_id, timestamp, role, content, content_hash, json.dumps(metadata), embedding),
        )
        self.conn.commit()
        return {
            "id": cursor.lastrowid,
            "thread_id": thread_id,
            "timestamp": timestamp,
            "role": role,
            "content": content,
            "metadata": metadata,
        }

    def save_conversation(self, messages: List[Dict], thread_id: str, title: Optional[str] = None) -> List[Dict]:
        """Save each message dict in *messages* to *thread_id*, in order.

        Missing ``role``/``content`` default to ``"unknown"``/``""``. *title*
        is stored in each message's metadata under ``"title"``; a message's
        own metadata takes precedence on key collisions.
        """
        return [
            self.save_message(
                role=msg.get("role", "unknown"),
                content=msg.get("content", ""),
                thread_id=thread_id,
                # `or {}` also covers an explicit "metadata": None, which
                # would otherwise fail when unpacked with **.
                metadata={"title": title, **(msg.get("metadata") or {})},
            )
            for msg in messages
        ]

    def get_thread(self, thread_id: str, limit: int = 1000) -> List[Dict]:
        """Return up to *limit* messages of *thread_id*, oldest first."""
        cursor = self.conn.execute(
            # id breaks ties between rows that share a timestamp.
            "SELECT id, timestamp, role, content, metadata FROM memories WHERE user_id = ? AND thread_id = ? ORDER BY timestamp ASC, id ASC LIMIT ?",
            (self.user_id, thread_id, limit),
        )
        return [self._row_to_dict(row) for row in cursor.fetchall()]

    def search(self, query: str, thread_id: Optional[str] = None, limit: int = 20) -> List[Dict]:
        """Return up to *limit* messages whose content contains *query*, newest first.

        *query* is matched literally (``%`` and ``_`` are not wildcards). When
        *thread_id* is given the search is restricted to that thread;
        otherwise all of the user's threads are searched and each result also
        carries a ``thread_id`` key.
        """
        pattern = self._like_pattern(query)
        if thread_id:
            sql = "SELECT id, timestamp, role, content, metadata FROM memories WHERE user_id = ? AND thread_id = ? AND content LIKE ? ESCAPE '\\' ORDER BY timestamp DESC, id DESC LIMIT ?"
            params = (self.user_id, thread_id, pattern, limit)
        else:
            sql = "SELECT id, timestamp, role, content, metadata, thread_id FROM memories WHERE user_id = ? AND content LIKE ? ESCAPE '\\' ORDER BY timestamp DESC, id DESC LIMIT ?"
            params = (self.user_id, pattern, limit)
        return [self._row_to_dict(row) for row in self.conn.execute(sql, params).fetchall()]

    def get_all_threads(self) -> List[Dict]:
        """Return one summary per thread of this user, most recently active first.

        Each summary has ``thread_id``, ``message_count``, ``started`` and
        ``last_message``.
        """
        cursor = self.conn.execute(
            "SELECT thread_id, COUNT(*) as msg_count, MIN(timestamp) as started, MAX(timestamp) as last_msg FROM memories WHERE user_id = ? GROUP BY thread_id ORDER BY last_msg DESC",
            (self.user_id,),
        )
        return [
            {"thread_id": row[0], "message_count": row[1], "started": row[2], "last_message": row[3]}
            for row in cursor.fetchall()
        ]

    def export_to_json(self, filepath: str, thread_id: Optional[str] = None):
        """Write messages to *filepath* as JSON.

        With *thread_id*, only that thread is exported; otherwise every
        message of this user is exported, each with its ``thread_id``.
        """
        if thread_id:
            memories = self.get_thread(thread_id)
        else:
            cursor = self.conn.execute(
                "SELECT id, timestamp, role, content, metadata, thread_id FROM memories WHERE user_id = ? ORDER BY timestamp, id",
                (self.user_id,),
            )
            memories = [self._row_to_dict(row) for row in cursor.fetchall()]
        with open(filepath, 'w', encoding="utf-8") as f:
            json.dump(memories, f, indent=2)
        print(f"[Memory] Exported {len(memories)} memories to {filepath}")

    def export_to_markdown(self, filepath: str, thread_id: str):
        """Write *thread_id* to *filepath* as a Markdown transcript."""
        memories = self.get_thread(thread_id)
        # Explicit UTF-8: the transcript contains emoji, which the platform
        # default encoding cannot always represent.
        with open(filepath, 'w', encoding="utf-8") as f:
            f.write(f"# Conversation: {thread_id}\n\n*Exported: {self._utc_now_iso()}*\n\n---\n\n")
            for mem in memories:
                role = mem.get("role", "unknown")
                timestamp = mem.get("timestamp", "unknown")
                content = mem.get("content", "")
                emoji = "👤" if role == "user" else "🤖" if role == "assistant" else "📝"
                f.write(f"### {emoji} {role.title()} *({timestamp})*\n\n{content}\n\n---\n\n")
        print(f"[Memory] Exported conversation to {filepath}")

    def close(self):
        """Close the underlying SQLite connection."""
        self.conn.close()
|
|