"""
Conversation Memory System - SQLite-backed conversation tracker with text search and Markdown export.
Designed for offline use with optional sentence-transformers for semantic search.
"""
|
|
| import sqlite3 |
| import json |
| import os |
| from pathlib import Path |
| from datetime import datetime |
| import gradio as gr |
|
|
# Persistent storage location. /data is the Hugging Face Spaces persistent
# volume; on machines where it is missing or not writable (local dev, CI),
# fall back to a relative ./data directory instead of crashing at import.
try:
    DB_PATH = Path("/data/conversations.db")
    DB_PATH.parent.mkdir(parents=True, exist_ok=True)
except OSError:  # includes PermissionError on read-only / non-Spaces hosts
    DB_PATH = Path("data/conversations.db")
    DB_PATH.parent.mkdir(parents=True, exist_ok=True)
|
|
| |
# Optional semantic-search support. Loading the model can fail for reasons
# other than a missing package: SentenceTransformer() downloads weights on
# first use and raises OSError / network errors when offline. Since this app
# is designed for offline use, degrade to text-only search on ANY load
# failure rather than crashing at import.
try:
    from sentence_transformers import SentenceTransformer
    import numpy as np
    EMBED_MODEL = SentenceTransformer("all-MiniLM-L6-v2")
    HAS_EMBEDDINGS = True
except ImportError:
    HAS_EMBEDDINGS = False
except Exception:  # model download/load failure (e.g. no network): best-effort
    HAS_EMBEDDINGS = False
|
|
class ConversationMemory:
    """SQLite-backed store for conversation threads and messages.

    Provides substring search, optional embedding-based semantic search
    (only when the module-level ``HAS_EMBEDDINGS`` flag is true), and
    Markdown export. Each public method opens its own short-lived
    connection, so instances are cheap and hold no open handles.
    """

    def __init__(self, db_path: str):
        """Open (creating and initialising if needed) the database at *db_path*."""
        self.db_path = db_path
        self._init_db()

    def _init_db(self):
        """Create tables and indexes if they do not already exist (idempotent)."""
        with sqlite3.connect(self.db_path) as conn:
            conn.executescript("""
                CREATE TABLE IF NOT EXISTS threads (
                    thread_id TEXT PRIMARY KEY,
                    title TEXT,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                );
                CREATE TABLE IF NOT EXISTS messages (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    thread_id TEXT NOT NULL,
                    role TEXT NOT NULL CHECK(role IN ('user','assistant','system','agent-zero')),
                    content TEXT NOT NULL,
                    source TEXT DEFAULT 'user',
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    FOREIGN KEY (thread_id) REFERENCES threads(thread_id)
                );
                CREATE INDEX IF NOT EXISTS idx_messages_thread ON messages(thread_id);
                CREATE INDEX IF NOT EXISTS idx_threads_updated ON threads(updated_at);
            """)
            if HAS_EMBEDDINGS:
                # Embedding sidecar table only exists when the model loaded.
                conn.execute("""
                    CREATE TABLE IF NOT EXISTS embeddings (
                        message_id INTEGER PRIMARY KEY,
                        embedding BLOB,
                        FOREIGN KEY (message_id) REFERENCES messages(id)
                    )
                """)
            conn.commit()

    def save_message(self, role: str, content: str, thread_id: str = None, source: str = "user"):
        """Persist one message, creating its thread row on first use.

        Args:
            role: one of 'user', 'assistant', 'system', 'agent-zero'
                (enforced by the CHECK constraint on ``messages.role``).
            content: message text.
            thread_id: target thread; a timestamp-based id is generated
                when None.
            source: free-form origin tag stored alongside the message.

        Returns:
            The thread id the message was stored under.
        """
        if thread_id is None:
            thread_id = f"thread-{datetime.now().strftime('%Y%m%d-%H%M%S')}"
        with sqlite3.connect(self.db_path) as conn:
            conn.execute("INSERT OR IGNORE INTO threads (thread_id, title) VALUES (?, ?)",
                         (thread_id, thread_id))
            cur = conn.execute("INSERT INTO messages (thread_id, role, content, source) VALUES (?,?,?,?)",
                               (thread_id, role, content, source))
            conn.execute("UPDATE threads SET updated_at = CURRENT_TIMESTAMP WHERE thread_id = ?", (thread_id,))
            if HAS_EMBEDDINGS:
                # Encode before committing so message + embedding land (or roll
                # back) together in one transaction; the original committed the
                # message first, leaving it embedding-less if encode() raised.
                # Content is truncated to keep encoding cheap.
                emb = EMBED_MODEL.encode(content[:512])
                conn.execute("INSERT INTO embeddings (message_id, embedding) VALUES (?,?)",
                             (cur.lastrowid, emb.tobytes()))
            conn.commit()
        return thread_id

    def search_text(self, query: str, limit: int = 20):
        """Substring search over message content (SQLite LIKE, newest first).

        ``id DESC`` tiebreaker: CURRENT_TIMESTAMP has one-second resolution,
        so messages saved within the same second would otherwise sort in an
        undefined order.
        """
        with sqlite3.connect(self.db_path) as conn:
            return conn.execute(
                "SELECT thread_id, role, content, created_at FROM messages "
                "WHERE content LIKE ? ORDER BY created_at DESC, id DESC LIMIT ?",
                (f"%{query}%", limit)
            ).fetchall()

    def search_semantic(self, query: str, limit: int = 10):
        """Cosine-similarity search over the 500 most recent embedded messages.

        Returns (thread_id, role, content[:500], created_at) tuples, best
        match first, or a single explanatory row when embeddings are
        unavailable.
        """
        if not HAS_EMBEDDINGS:
            return [("error", "system", "Install sentence-transformers for semantic search", "")]
        q_emb = EMBED_MODEL.encode(query)
        q_norm = np.linalg.norm(q_emb)  # hoisted: invariant across rows
        with sqlite3.connect(self.db_path) as conn:
            rows = conn.execute("""
                SELECT m.thread_id, m.role, m.content, m.created_at, e.embedding
                FROM messages m JOIN embeddings e ON m.id = e.message_id
                ORDER BY m.created_at DESC LIMIT 500
            """).fetchall()
        scored = []
        for tid, role, content, created_at, blob in rows:
            # Stored blobs were written via ndarray.tobytes(); assumes the
            # model emits float32 vectors — TODO confirm for custom models.
            emb = np.frombuffer(blob, dtype=np.float32)
            # Cosine similarity; epsilon guards against zero-norm vectors.
            score = np.dot(q_emb, emb) / (q_norm * np.linalg.norm(emb) + 1e-8)
            scored.append((score, tid, role, content[:500], created_at))
        scored.sort(key=lambda item: item[0], reverse=True)
        return [(t, r, c, ts) for _, t, r, c, ts in scored[:limit]]

    def list_threads(self):
        """Return up to 50 threads, most recently updated first.

        Each row: (thread_id, title, created_at, updated_at, message count).
        """
        with sqlite3.connect(self.db_path) as conn:
            return conn.execute(
                "SELECT thread_id, title, created_at, updated_at, (SELECT COUNT(*) FROM messages WHERE thread_id = t.thread_id) as msg_count FROM threads t ORDER BY updated_at DESC LIMIT 50"
            ).fetchall()

    def get_thread(self, thread_id: str):
        """Return a thread's messages in insertion order.

        ``id`` tiebreaker keeps same-second messages in true insertion order
        (CURRENT_TIMESTAMP is only second-resolution).
        """
        with sqlite3.connect(self.db_path) as conn:
            return conn.execute(
                "SELECT role, content, source, created_at FROM messages WHERE thread_id = ? ORDER BY created_at, id",
                (thread_id,)
            ).fetchall()

    def export_markdown(self, thread_id: str) -> str:
        """Render one thread as a Markdown document (header only if empty/unknown)."""
        msgs = self.get_thread(thread_id)
        md = f"# Thread: {thread_id}\n\n"
        for role, content, source, ts in msgs:
            md += f"## {role} ({source}) - {ts}\n\n{content}\n\n---\n\n"
        return md

    def export_all(self) -> str:
        """Render all listed threads as one Markdown document.

        Message bodies are truncated to 500 characters; only the 50 threads
        returned by list_threads() are included.
        """
        threads = self.list_threads()
        md = "# All Conversations\n\n"
        for tid, title, created, updated, count in threads:
            md += f"## {title} ({count} msgs)\n"
            md += f"Created: {created} | Updated: {updated}\n\n"
            for role, content, source, ts in self.get_thread(tid):
                md += f"### {role} ({source}) - {ts}\n\n{content[:500]}...\n\n"
            md += "---\n\n"
        return md
|
|
| |
| memory = ConversationMemory(str(DB_PATH)) |
|
|
| |
# --- Gradio UI --------------------------------------------------------------
# Declarative layout: component creation order determines on-screen order,
# so the statement sequence below is user-visible behavior.
with gr.Blocks(title="Conversation Memory", theme=gr.themes.Soft()) as demo:
    gr.Markdown("""# ๐พ Conversation Memory System
    **SQLite-backed** | **Text + Semantic Search** | **Markdown Export** | **Fully Offline**
    """)

    with gr.Tabs():
        # Tab 1: save a single message into a (possibly new) thread.
        with gr.TabItem("๐ฌ Save"):
            role = gr.Dropdown(["user", "assistant", "system", "agent-zero"], label="Role", value="user")
            content = gr.Textbox(label="Message", lines=4, placeholder="Enter conversation message...")
            thread_id = gr.Textbox(label="Thread ID (optional)", placeholder="auto-generated if blank")
            source = gr.Textbox(label="Source", value="user")
            save_btn = gr.Button("Save Message")
            save_status = gr.Textbox(label="Status")

            def save(role, content, thread_id, source):
                """Persist the message; empty thread_id means auto-generate."""
                # `thread_id or None` maps the empty textbox string to None.
                tid = memory.save_message(role, content, thread_id or None, source)
                return f"Saved to thread: {tid}"

            save_btn.click(save, [role, content, thread_id, source], save_status)

        # Tab 2: text (LIKE) or semantic (embedding) search over all messages.
        with gr.TabItem("๐ Search"):
            search_query = gr.Textbox(label="Search Query")
            search_mode = gr.Radio(["Text", "Semantic"], label="Mode", value="Text")
            search_btn = gr.Button("Search")
            search_results = gr.Dataframe(
                headers=["Thread", "Role", "Content", "Time"],
                label="Results"
            )

            def do_search(query, mode):
                """Dispatch to the chosen search backend; truncate content for display."""
                if mode == "Text":
                    results = memory.search_text(query)
                else:
                    # Falls back to a single explanatory row when embeddings
                    # are unavailable (see ConversationMemory.search_semantic).
                    results = memory.search_semantic(query)
                return [[r[0], r[1], r[2][:300], r[3]] for r in results]

            search_btn.click(do_search, [search_query, search_mode], search_results)

        # Tab 3: browse threads and inspect one thread's messages.
        with gr.TabItem("๐ Threads"):
            threads_list = gr.Dataframe(
                headers=["Thread ID", "Title", "Created", "Updated", "Messages"],
                label="All Threads"
            )
            refresh_btn = gr.Button("Refresh")
            refresh_btn.click(lambda: memory.list_threads(), None, threads_list)

            thread_detail_id = gr.Textbox(label="Thread ID")
            show_thread_btn = gr.Button("Show Thread")
            thread_content = gr.Markdown(label="Thread Content")

            def show_thread(tid):
                """Render one thread's messages as Markdown for the panel below."""
                if not tid: return "Enter a thread ID"
                msgs = memory.get_thread(tid)
                # Rows are (role, content, source, created_at).
                return "\n\n".join(f"**{r[0]}** ({r[2]}) - {r[3]}\n\n{r[1]}" for r in msgs)

            show_thread_btn.click(show_thread, thread_detail_id, thread_content)

        # Tab 4: Markdown export of one thread, or of everything via "all".
        with gr.TabItem("๐ค Export"):
            export_thread_id = gr.Textbox(label="Thread ID (or 'all' for everything)")
            export_btn = gr.Button("Export to Markdown")
            export_output = gr.Markdown(label="Exported Markdown")

            def do_export(tid):
                """Export one thread, or every thread when tid == 'all'."""
                if tid == "all":
                    return memory.export_all()
                return memory.export_markdown(tid)

            export_btn.click(do_export, export_thread_id, export_output)
|
|
| demo.queue().launch(server_name="0.0.0.0", server_port=7860) |