"""
Conversation Memory System for ScottzillaSystems
Self-contained — no external APIs needed
Uses SQLite + sentence-transformers for local embeddings
Replaces the fraudulent MemPalace project with a real working system.
"""
import hashlib
import json
import os
import sqlite3
from datetime import datetime, timezone
from typing import Dict, List, Optional
# Optional dependency: try to load a local embedding model at import time.
# Any failure (missing package, model download error, ...) downgrades the
# module to plain text search rather than aborting the import.
try:
    from sentence_transformers import SentenceTransformer
    EMBEDDING_MODEL = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')
    USE_VECTOR_SEARCH = True
except Exception:
    USE_VECTOR_SEARCH = False
    EMBEDDING_MODEL = None
    print("[Memory] sentence-transformers not available, using text search fallback")
class ConversationMemory:
    """SQLite-backed store for conversation messages, grouped by thread.

    Every message is one row in the ``memories`` table, stamped with this
    instance's ``user_id``.  When the module-level embedding model loaded
    successfully, an embedding blob is stored alongside each message; all
    retrieval methods below use plain SQL text matching.
    """

    def __init__(self, db_path: str = "./memory_db/conversations.db", user_id: str = "scottzilla"):
        """Open (and create, if necessary) the SQLite database at *db_path*.

        Args:
            db_path: Path of the SQLite file; parent directories are created.
            user_id: Owner id stamped on every row this instance writes.
        """
        self.db_path = db_path
        self.user_id = user_id
        parent = os.path.dirname(db_path)
        if parent:  # a bare filename has no parent dir; makedirs("") raises
            os.makedirs(parent, exist_ok=True)
        self.conn = sqlite3.connect(db_path)
        self._init_db()

    def __enter__(self) -> "ConversationMemory":
        """Allow ``with ConversationMemory(...) as mem:`` usage."""
        return self

    def __exit__(self, exc_type, exc, tb) -> bool:
        """Close the connection on ``with``-block exit; never swallow exceptions."""
        self.close()
        return False

    def _init_db(self):
        """Create the memories table and its indexes (idempotent)."""
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS memories (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                user_id TEXT NOT NULL,
                thread_id TEXT NOT NULL,
                timestamp TEXT NOT NULL,
                role TEXT NOT NULL,
                content TEXT NOT NULL,
                content_hash TEXT NOT NULL,
                metadata TEXT,
                embedding BLOB
            )
        """)
        self.conn.execute("CREATE INDEX IF NOT EXISTS idx_user_thread ON memories(user_id, thread_id, timestamp)")
        # NOTE(review): a btree index cannot accelerate the LIKE '%q%' scans
        # that search() performs; kept only for schema compatibility.
        self.conn.execute("CREATE INDEX IF NOT EXISTS idx_content ON memories(content)")
        self.conn.commit()

    @staticmethod
    def _row_to_memory(row, include_thread: bool = False) -> Dict:
        """Convert an (id, timestamp, role, content, metadata[, thread_id]) row to a dict."""
        memory = {
            "id": row[0],
            "timestamp": row[1],
            "role": row[2],
            "content": row[3],
            "metadata": json.loads(row[4]),
        }
        if include_thread:
            memory["thread_id"] = row[5]
        return memory

    @staticmethod
    def _now_iso() -> str:
        """Current UTC time as a naive ISO-8601 string.

        Same format ``datetime.utcnow().isoformat()`` produced (keeps stored
        timestamps lexicographically comparable with existing rows), but via
        the non-deprecated API — ``utcnow`` is deprecated since Python 3.12.
        """
        return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

    def _get_embedding(self, text: str) -> Optional[bytes]:
        """Return *text*'s embedding as raw bytes, or None when disabled or failing."""
        if not USE_VECTOR_SEARCH or EMBEDDING_MODEL is None:
            return None
        try:
            embedding = EMBEDDING_MODEL.encode(text, convert_to_numpy=True)
            return embedding.tobytes()
        except Exception as e:
            # A failed embedding must never lose the message itself.
            print(f"[Memory] Embedding error: {e}")
            return None

    def save_message(self, role: str, content: str, thread_id: str, metadata: Optional[Dict] = None) -> Dict:
        """Persist one message and return the stored record (without the embedding)."""
        timestamp = self._now_iso()
        content_hash = hashlib.sha256(content.encode()).hexdigest()[:16]
        embedding = self._get_embedding(content)
        meta_json = json.dumps(metadata or {})
        cursor = self.conn.execute(
            "INSERT INTO memories (user_id, thread_id, timestamp, role, content, content_hash, metadata, embedding) VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
            (self.user_id, thread_id, timestamp, role, content, content_hash, meta_json, embedding)
        )
        self.conn.commit()
        return {"id": cursor.lastrowid, "thread_id": thread_id, "timestamp": timestamp, "role": role, "content": content, "metadata": metadata or {}}

    def save_conversation(self, messages: List[Dict], thread_id: str, title: Optional[str] = None) -> List[Dict]:
        """Save a list of ``{"role", "content", "metadata"}`` dicts into one thread."""
        return [
            self.save_message(
                role=msg.get("role", "unknown"),
                content=msg.get("content", ""),
                thread_id=thread_id,
                # Per-message metadata wins over the shared title on key clash.
                metadata={"title": title, **msg.get("metadata", {})},
            )
            for msg in messages
        ]

    def get_thread(self, thread_id: str, limit: int = 1000) -> List[Dict]:
        """Return up to *limit* messages of one thread, oldest first."""
        cursor = self.conn.execute(
            "SELECT id, timestamp, role, content, metadata FROM memories WHERE user_id = ? AND thread_id = ? ORDER BY timestamp ASC LIMIT ?",
            (self.user_id, thread_id, limit)
        )
        return [self._row_to_memory(row) for row in cursor.fetchall()]

    def search(self, query: str, thread_id: Optional[str] = None, limit: int = 20) -> List[Dict]:
        """Substring search over message content, newest first.

        Restricted to one thread when *thread_id* is given; cross-thread hits
        additionally carry a ``thread_id`` key.  NOTE: *query* is embedded in
        a LIKE pattern, so '%' and '_' inside it act as SQL wildcards.
        """
        pattern = f"%{query}%"
        if thread_id:
            cursor = self.conn.execute(
                "SELECT id, timestamp, role, content, metadata FROM memories WHERE user_id = ? AND thread_id = ? AND content LIKE ? ORDER BY timestamp DESC LIMIT ?",
                (self.user_id, thread_id, pattern, limit)
            )
            return [self._row_to_memory(r) for r in cursor.fetchall()]
        cursor = self.conn.execute(
            "SELECT id, timestamp, role, content, metadata, thread_id FROM memories WHERE user_id = ? AND content LIKE ? ORDER BY timestamp DESC LIMIT ?",
            (self.user_id, pattern, limit)
        )
        return [self._row_to_memory(r, include_thread=True) for r in cursor.fetchall()]

    def get_all_threads(self) -> List[Dict]:
        """Summarize every thread for this user (message count, first/last timestamps)."""
        cursor = self.conn.execute(
            "SELECT thread_id, COUNT(*) as msg_count, MIN(timestamp) as started, MAX(timestamp) as last_msg FROM memories WHERE user_id = ? GROUP BY thread_id ORDER BY last_msg DESC",
            (self.user_id,)
        )
        return [{"thread_id": row[0], "message_count": row[1], "started": row[2], "last_message": row[3]} for row in cursor.fetchall()]

    def export_to_json(self, filepath: str, thread_id: Optional[str] = None):
        """Dump one thread (or, with no *thread_id*, all memories) to a JSON file."""
        if thread_id:
            memories = self.get_thread(thread_id)
        else:
            cursor = self.conn.execute(
                "SELECT id, timestamp, role, content, metadata, thread_id FROM memories WHERE user_id = ? ORDER BY timestamp",
                (self.user_id,)
            )
            memories = [self._row_to_memory(r, include_thread=True) for r in cursor.fetchall()]
        # Explicit UTF-8: message content may be non-ASCII, and the platform
        # default encoding is not UTF-8 everywhere (e.g. cp1252 on Windows).
        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump(memories, f, indent=2)
        print(f"[Memory] Exported {len(memories)} memories to {filepath}")

    def export_to_markdown(self, filepath: str, thread_id: str):
        """Render one thread as a Markdown transcript at *filepath*."""
        memories = self.get_thread(thread_id)
        # UTF-8 is required here: the role emoji below are non-ASCII and
        # would raise UnicodeEncodeError under a cp1252-style default codec.
        with open(filepath, 'w', encoding='utf-8') as f:
            f.write(f"# Conversation: {thread_id}\n\n*Exported: {self._now_iso()}*\n\n---\n\n")
            for mem in memories:
                role = mem.get("role", "unknown")
                timestamp = mem.get("timestamp", "unknown")
                content = mem.get("content", "")
                emoji = "👤" if role == "user" else "🤖" if role == "assistant" else "📝"
                f.write(f"### {emoji} {role.title()} *({timestamp})*\n\n{content}\n\n---\n\n")
        print(f"[Memory] Exported conversation to {filepath}")

    def close(self):
        """Close the underlying SQLite connection."""
        self.conn.close()