conversation-memory / memory_system.py
ScottzillaSystems's picture
Add self-contained conversation memory system
ab886a2 verified
"""
Conversation Memory System for ScottzillaSystems
Self-contained — no external APIs needed
Uses SQLite + sentence-transformers for local embeddings
Replaces the earlier MemPalace prototype with a working implementation.
"""
import hashlib
import json
import os
import sqlite3
from datetime import datetime, timezone
from typing import Dict, List, Optional
# Optional dependency: try to load a local sentence-embedding model once at
# import time.  Any failure (package missing, model files unavailable, etc.)
# downgrades the whole module to plain SQL LIKE text search — the broad
# `except Exception` is deliberate: this is a best-effort feature probe.
try:
    from sentence_transformers import SentenceTransformer

    # Loaded eagerly so every ConversationMemory instance shares one model.
    EMBEDDING_MODEL = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')
    USE_VECTOR_SEARCH = True
except Exception:
    EMBEDDING_MODEL = None
    USE_VECTOR_SEARCH = False
    print("[Memory] sentence-transformers not available, using text search fallback")
class ConversationMemory:
    """SQLite-backed conversation memory scoped to a single user.

    Each message is one row in the ``memories`` table keyed by
    ``(user_id, thread_id)``.  When sentence-transformers is available an
    embedding blob is stored alongside each message, although retrieval
    currently relies on SQL ``LIKE`` substring matching only.

    Instances can be used as context managers; the database connection is
    closed when the ``with`` block exits.
    """

    def __init__(self, db_path: str = "./memory_db/conversations.db", user_id: str = "scottzilla"):
        """Open (creating if necessary) the database at *db_path*.

        Args:
            db_path: Path to the SQLite file.  Parent directories are
                created on demand; a bare filename or ``:memory:`` is
                also accepted.
            user_id: Owner recorded on every row written by this instance.
        """
        self.db_path = db_path
        self.user_id = user_id
        parent = os.path.dirname(db_path)
        # Guard: dirname is "" for bare filenames and ":memory:", and
        # os.makedirs("") raises FileNotFoundError.
        if parent:
            os.makedirs(parent, exist_ok=True)
        self.conn = sqlite3.connect(db_path)
        self._init_db()

    def __enter__(self) -> "ConversationMemory":
        """Enable ``with ConversationMemory(...) as mem:`` usage."""
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        """Close the connection on context-manager exit."""
        self.close()

    def _init_db(self) -> None:
        """Create the ``memories`` table and its indexes if absent."""
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS memories (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                user_id TEXT NOT NULL,
                thread_id TEXT NOT NULL,
                timestamp TEXT NOT NULL,
                role TEXT NOT NULL,
                content TEXT NOT NULL,
                content_hash TEXT NOT NULL,
                metadata TEXT,
                embedding BLOB
            )
        """)
        # Composite index covers thread fetches and the GROUP BY in
        # get_all_threads().
        self.conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_user_thread ON memories(user_id, thread_id, timestamp)"
        )
        # NOTE(review): a btree index cannot accelerate the '%...%' LIKE
        # patterns used by search(); kept for schema compatibility.
        self.conn.execute("CREATE INDEX IF NOT EXISTS idx_content ON memories(content)")
        self.conn.commit()

    def _get_embedding(self, text: str) -> Optional[bytes]:
        """Encode *text* with the module-level embedding model.

        Returns:
            The raw numpy ``tobytes()`` buffer of the embedding, or
            ``None`` when vector search is unavailable or encoding fails.
        """
        if not USE_VECTOR_SEARCH or EMBEDDING_MODEL is None:
            return None
        try:
            embedding = EMBEDDING_MODEL.encode(text, convert_to_numpy=True)
            return embedding.tobytes()
        except Exception as e:
            # Best effort: a failed embedding must never lose the message.
            print(f"[Memory] Embedding error: {e}")
            return None

    @staticmethod
    def _row_to_dict(row, with_thread: bool = False) -> Dict:
        """Convert an ``(id, timestamp, role, content, metadata[, thread_id])``
        row tuple into the dict shape returned by the public methods."""
        record = {
            "id": row[0],
            "timestamp": row[1],
            "role": row[2],
            "content": row[3],
            # Tolerate NULL metadata from rows written outside save_message().
            "metadata": json.loads(row[4]) if row[4] else {},
        }
        if with_thread:
            record["thread_id"] = row[5]
        return record

    def save_message(self, role: str, content: str, thread_id: str, metadata: Optional[Dict] = None) -> Dict:
        """Persist one message and return its stored representation.

        Args:
            role: Speaker tag (e.g. ``"user"`` or ``"assistant"``).
            content: Message text; embedded when vector search is on.
            thread_id: Conversation the message belongs to.
            metadata: Optional JSON-serializable extras.

        Returns:
            Dict with ``id``, ``thread_id``, ``timestamp`` (ISO-8601 UTC),
            ``role``, ``content`` and ``metadata``.
        """
        # Timezone-aware replacement for the deprecated datetime.utcnow().
        timestamp = datetime.now(timezone.utc).isoformat()
        # Short digest stored for future de-duplication; not queried yet.
        content_hash = hashlib.sha256(content.encode()).hexdigest()[:16]
        embedding = self._get_embedding(content)
        meta_json = json.dumps(metadata or {})
        cursor = self.conn.execute(
            "INSERT INTO memories (user_id, thread_id, timestamp, role, content, content_hash, metadata, embedding) "
            "VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
            (self.user_id, thread_id, timestamp, role, content, content_hash, meta_json, embedding),
        )
        self.conn.commit()
        return {
            "id": cursor.lastrowid,
            "thread_id": thread_id,
            "timestamp": timestamp,
            "role": role,
            "content": content,
            "metadata": metadata or {},
        }

    def save_conversation(self, messages: List[Dict], thread_id: str, title: Optional[str] = None) -> List[Dict]:
        """Persist a list of message dicts into one thread.

        Each message may carry ``role``, ``content`` and ``metadata`` keys;
        *title* is merged into every message's metadata (a per-message
        ``title`` key, if present, wins).
        """
        results = []
        for msg in messages:
            result = self.save_message(
                role=msg.get("role", "unknown"),
                content=msg.get("content", ""),
                thread_id=thread_id,
                metadata={"title": title, **msg.get("metadata", {})},
            )
            results.append(result)
        return results

    def get_thread(self, thread_id: str, limit: int = 1000) -> List[Dict]:
        """Return up to *limit* messages of a thread in chronological order."""
        cursor = self.conn.execute(
            "SELECT id, timestamp, role, content, metadata FROM memories "
            "WHERE user_id = ? AND thread_id = ? ORDER BY timestamp ASC LIMIT ?",
            (self.user_id, thread_id, limit),
        )
        return [self._row_to_dict(row) for row in cursor.fetchall()]

    def search(self, query: str, thread_id: Optional[str] = None, limit: int = 20) -> List[Dict]:
        """Substring search over stored messages, newest first.

        When *thread_id* is given the search is restricted to that thread;
        otherwise results span all threads and include a ``thread_id`` key.

        Note: *query* is interpolated into a LIKE pattern, so SQL wildcards
        (``%``/``_``) inside it act as wildcards.
        """
        pattern = f"%{query}%"
        if thread_id is not None:
            cursor = self.conn.execute(
                "SELECT id, timestamp, role, content, metadata FROM memories "
                "WHERE user_id = ? AND thread_id = ? AND content LIKE ? "
                "ORDER BY timestamp DESC LIMIT ?",
                (self.user_id, thread_id, pattern, limit),
            )
            return [self._row_to_dict(r) for r in cursor.fetchall()]
        cursor = self.conn.execute(
            "SELECT id, timestamp, role, content, metadata, thread_id FROM memories "
            "WHERE user_id = ? AND content LIKE ? ORDER BY timestamp DESC LIMIT ?",
            (self.user_id, pattern, limit),
        )
        return [self._row_to_dict(r, with_thread=True) for r in cursor.fetchall()]

    def get_all_threads(self) -> List[Dict]:
        """Summarize every thread for this user, most recently active first."""
        cursor = self.conn.execute(
            "SELECT thread_id, COUNT(*) as msg_count, MIN(timestamp) as started, "
            "MAX(timestamp) as last_msg FROM memories WHERE user_id = ? "
            "GROUP BY thread_id ORDER BY last_msg DESC",
            (self.user_id,),
        )
        return [
            {"thread_id": row[0], "message_count": row[1], "started": row[2], "last_message": row[3]}
            for row in cursor.fetchall()
        ]

    def export_to_json(self, filepath: str, thread_id: Optional[str] = None):
        """Export one thread (or all memories) as a JSON array to *filepath*."""
        if thread_id:
            memories = self.get_thread(thread_id)
        else:
            cursor = self.conn.execute(
                "SELECT id, timestamp, role, content, metadata, thread_id FROM memories "
                "WHERE user_id = ? ORDER BY timestamp",
                (self.user_id,),
            )
            memories = [self._row_to_dict(r, with_thread=True) for r in cursor.fetchall()]
        # Explicit UTF-8 so exports are portable regardless of the platform's
        # default locale encoding.
        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump(memories, f, indent=2, ensure_ascii=False)
        print(f"[Memory] Exported {len(memories)} memories to {filepath}")

    def export_to_markdown(self, filepath: str, thread_id: str):
        """Export one thread to *filepath* as a readable Markdown transcript."""
        memories = self.get_thread(thread_id)
        # UTF-8 is required here: the role emoji below are not representable
        # in legacy locale encodings (e.g. cp1252 on Windows).
        with open(filepath, 'w', encoding='utf-8') as f:
            f.write(
                f"# Conversation: {thread_id}\n\n"
                f"*Exported: {datetime.now(timezone.utc).isoformat()}*\n\n---\n\n"
            )
            for mem in memories:
                role = mem.get("role", "unknown")
                timestamp = mem.get("timestamp", "unknown")
                content = mem.get("content", "")
                emoji = "👤" if role == "user" else "🤖" if role == "assistant" else "📝"
                f.write(f"### {emoji} {role.title()} *({timestamp})*\n\n{content}\n\n---\n\n")
        print(f"[Memory] Exported conversation to {filepath}")

    def close(self):
        """Close the underlying SQLite connection."""
        self.conn.close()