# Hugging Face Space upload header (uploader: ScottzillaSystems, commit fafcc25 verified)
# — page residue converted to a comment so this file is valid Python.
#!/usr/bin/env python3
"""
Conversation Memory System - SQLite-backed conversation tracker with text search and Markdown export.
Designed for offline use with optional sentence-transformers for semantic search.
"""
import json
import os
import sqlite3
from datetime import datetime
from pathlib import Path
from typing import Optional

import gradio as gr
# Persistent database location; /data is the writable volume on Hugging Face Spaces.
DB_PATH = Path("/data/conversations.db")
DB_PATH.parent.mkdir(parents=True, exist_ok=True)

# Optional semantic search: enabled only when sentence-transformers (and numpy)
# are installed. All embedding code paths are gated on HAS_EMBEDDINGS.
try:
    from sentence_transformers import SentenceTransformer
    import numpy as np

    # NOTE(review): model is loaded at import time; the first run may try to
    # download weights — confirm this is acceptable for the "fully offline" claim.
    EMBED_MODEL = SentenceTransformer("all-MiniLM-L6-v2")
    HAS_EMBEDDINGS = True
except ImportError:
    HAS_EMBEDDINGS = False
class ConversationMemory:
    """SQLite-backed store for conversation threads and messages.

    Messages belong to threads; every write refreshes the owning thread's
    ``updated_at``. Plain LIKE-based text search is always available;
    cosine-similarity semantic search works only when the optional
    sentence-transformers stack was imported at module load
    (module-level ``HAS_EMBEDDINGS`` / ``EMBED_MODEL`` / ``np``).
    """

    def __init__(self, db_path: str):
        """Remember *db_path* and make sure the schema exists."""
        self.db_path = db_path
        self._init_db()

    def _init_db(self) -> None:
        """Create tables and indexes idempotently (safe on every start)."""
        with sqlite3.connect(self.db_path) as conn:
            conn.executescript("""
            CREATE TABLE IF NOT EXISTS threads (
                thread_id TEXT PRIMARY KEY,
                title TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            );
            CREATE TABLE IF NOT EXISTS messages (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                thread_id TEXT NOT NULL,
                role TEXT NOT NULL CHECK(role IN ('user','assistant','system','agent-zero')),
                content TEXT NOT NULL,
                source TEXT DEFAULT 'user',
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (thread_id) REFERENCES threads(thread_id)
            );
            CREATE INDEX IF NOT EXISTS idx_messages_thread ON messages(thread_id);
            CREATE INDEX IF NOT EXISTS idx_threads_updated ON threads(updated_at);
            """)
            if HAS_EMBEDDINGS:
                # Embedding table is only created when the model is available.
                conn.execute("""
                CREATE TABLE IF NOT EXISTS embeddings (
                    message_id INTEGER PRIMARY KEY,
                    embedding BLOB,
                    FOREIGN KEY (message_id) REFERENCES messages(id)
                )
                """)
            conn.commit()

    def save_message(self, role: str, content: str,
                     thread_id: Optional[str] = None,
                     source: str = "user") -> str:
        """Insert one message, creating its thread on demand.

        Returns the thread id; when *thread_id* is None a timestamp-based id
        is generated (second resolution — rapid calls share a thread).
        """
        if thread_id is None:
            thread_id = f"thread-{datetime.now().strftime('%Y%m%d-%H%M%S')}"
        with sqlite3.connect(self.db_path) as conn:
            conn.execute(
                "INSERT OR IGNORE INTO threads (thread_id, title) VALUES (?, ?)",
                (thread_id, thread_id))
            cur = conn.execute(
                "INSERT INTO messages (thread_id, role, content, source) VALUES (?,?,?,?)",
                (thread_id, role, content, source))
            conn.execute(
                "UPDATE threads SET updated_at = CURRENT_TIMESTAMP WHERE thread_id = ?",
                (thread_id,))
            if HAS_EMBEDDINGS:
                # Embed only a 512-char prefix to keep encoding cheap for long messages.
                emb = EMBED_MODEL.encode(content[:512])
                conn.execute(
                    "INSERT INTO embeddings (message_id, embedding) VALUES (?,?)",
                    (cur.lastrowid, emb.tobytes()))
            # Fix: single commit so message + thread bump + embedding land atomically
            # (original committed twice, leaving a window with a message but no embedding).
            conn.commit()
        return thread_id

    def search_text(self, query: str, limit: int = 20):
        """Substring (LIKE) search over message content, newest first.

        NOTE: '%' and '_' in *query* act as SQL wildcards (not escaped).
        """
        with sqlite3.connect(self.db_path) as conn:
            return conn.execute(
                "SELECT thread_id, role, content, created_at FROM messages "
                "WHERE content LIKE ? ORDER BY created_at DESC LIMIT ?",
                (f"%{query}%", limit)
            ).fetchall()

    def search_semantic(self, query: str, limit: int = 10):
        """Cosine-similarity search over the 500 most recent embedded messages.

        Returns (thread_id, role, content[:500], created_at) tuples, best match
        first, or a single sentinel row when embeddings are unavailable.
        """
        if not HAS_EMBEDDINGS:
            return [("error", "system", "Install sentence-transformers for semantic search", "")]
        q_emb = EMBED_MODEL.encode(query)
        q_norm = np.linalg.norm(q_emb)  # hoisted: invariant across candidates
        scored = []
        with sqlite3.connect(self.db_path) as conn:
            rows = conn.execute("""
                SELECT m.thread_id, m.role, m.content, m.created_at, e.embedding
                FROM messages m JOIN embeddings e ON m.id = e.message_id
                ORDER BY m.created_at DESC LIMIT 500
            """).fetchall()
        for tid, mrole, content, ts, blob in rows:
            # SentenceTransformer.encode emits float32; decode the stored bytes likewise.
            emb = np.frombuffer(blob, dtype=np.float32)
            score = np.dot(q_emb, emb) / (q_norm * np.linalg.norm(emb) + 1e-8)
            scored.append((score, tid, mrole, content[:500], ts))
        scored.sort(key=lambda r: r[0], reverse=True)
        return [(tid, mrole, content, ts) for _, tid, mrole, content, ts in scored[:limit]]

    def list_threads(self, limit: int = 50):
        """Most recently updated threads with per-thread message counts.

        *limit* generalizes the previously hard-coded LIMIT 50 (default unchanged).
        """
        with sqlite3.connect(self.db_path) as conn:
            return conn.execute(
                "SELECT thread_id, title, created_at, updated_at, "
                "(SELECT COUNT(*) FROM messages WHERE thread_id = t.thread_id) as msg_count "
                "FROM threads t ORDER BY updated_at DESC LIMIT ?",
                (limit,)
            ).fetchall()

    def get_thread(self, thread_id: str):
        """All messages of *thread_id* as (role, content, source, created_at), oldest first."""
        with sqlite3.connect(self.db_path) as conn:
            return conn.execute(
                "SELECT role, content, source, created_at FROM messages "
                "WHERE thread_id = ? ORDER BY created_at",
                (thread_id,)
            ).fetchall()

    def export_markdown(self, thread_id: str) -> str:
        """Render one thread as a Markdown document (full message bodies)."""
        parts = [f"# Thread: {thread_id}\n\n"]
        for role, content, source, ts in self.get_thread(thread_id):
            parts.append(f"## {role} ({source}) - {ts}\n\n{content}\n\n---\n\n")
        return "".join(parts)

    def export_all(self) -> str:
        """Render EVERY thread as one combined Markdown document.

        Fix: previously routed through list_threads(), whose LIMIT 50 silently
        dropped older threads from the supposedly complete export.
        Message bodies are truncated to 500 chars in this combined view.
        """
        with sqlite3.connect(self.db_path) as conn:
            threads = conn.execute(
                "SELECT thread_id, title, created_at, updated_at, "
                "(SELECT COUNT(*) FROM messages WHERE thread_id = t.thread_id) "
                "FROM threads t ORDER BY updated_at DESC"
            ).fetchall()
        parts = ["# All Conversations\n\n"]
        for tid, title, created, updated, count in threads:
            parts.append(f"## {title} ({count} msgs)\n")
            parts.append(f"Created: {created} | Updated: {updated}\n\n")
            for role, content, source, ts in self.get_thread(tid):
                parts.append(f"### {role} ({source}) - {ts}\n\n{content[:500]}...\n\n")
            parts.append("---\n\n")
        return "".join(parts)
# Singleton store shared by every UI callback below.
memory = ConversationMemory(str(DB_PATH))

# ---------------------------------------------------------------- Gradio UI
with gr.Blocks(title="Conversation Memory", theme=gr.themes.Soft()) as demo:
    gr.Markdown("""# 💾 Conversation Memory System
**SQLite-backed** | **Text + Semantic Search** | **Markdown Export** | **Fully Offline**
""")
    with gr.Tabs():
        with gr.TabItem("💬 Save"):
            role = gr.Dropdown(["user", "assistant", "system", "agent-zero"], label="Role", value="user")
            content = gr.Textbox(label="Message", lines=4, placeholder="Enter conversation message...")
            thread_id = gr.Textbox(label="Thread ID (optional)", placeholder="auto-generated if blank")
            source = gr.Textbox(label="Source", value="user")
            save_btn = gr.Button("Save Message")
            save_status = gr.Textbox(label="Status")

            def save(role, content, thread_id, source):
                """Persist one message and report the thread it landed in."""
                # Fix: previously an all-blank message was stored; reject it instead.
                if not content or not content.strip():
                    return "Nothing saved: message is empty"
                tid = memory.save_message(role, content, thread_id or None, source or "user")
                return f"Saved to thread: {tid}"

            save_btn.click(save, [role, content, thread_id, source], save_status)

        with gr.TabItem("🔍 Search"):
            search_query = gr.Textbox(label="Search Query")
            search_mode = gr.Radio(["Text", "Semantic"], label="Mode", value="Text")
            search_btn = gr.Button("Search")
            search_results = gr.Dataframe(
                headers=["Thread", "Role", "Content", "Time"],
                label="Results"
            )

            def do_search(query, mode):
                """Dispatch to text or semantic search; rows trimmed for display."""
                # Fix: an empty query LIKE-matched every stored message.
                if not query or not query.strip():
                    return []
                if mode == "Text":
                    results = memory.search_text(query)
                else:
                    results = memory.search_semantic(query)
                return [[r[0], r[1], r[2][:300], r[3]] for r in results]

            search_btn.click(do_search, [search_query, search_mode], search_results)

        with gr.TabItem("📋 Threads"):
            threads_list = gr.Dataframe(
                headers=["Thread ID", "Title", "Created", "Updated", "Messages"],
                label="All Threads"
            )
            refresh_btn = gr.Button("Refresh")
            refresh_btn.click(lambda: memory.list_threads(), None, threads_list)
            thread_detail_id = gr.Textbox(label="Thread ID")
            show_thread_btn = gr.Button("Show Thread")
            thread_content = gr.Markdown(label="Thread Content")

            def show_thread(tid):
                """Render one thread's messages as Markdown."""
                if not tid:
                    return "Enter a thread ID"
                msgs = memory.get_thread(tid)
                return "\n\n".join(f"**{r[0]}** ({r[2]}) - {r[3]}\n\n{r[1]}" for r in msgs)

            show_thread_btn.click(show_thread, thread_detail_id, thread_content)

        with gr.TabItem("📤 Export"):
            export_thread_id = gr.Textbox(label="Thread ID (or 'all' for everything)")
            export_btn = gr.Button("Export to Markdown")
            export_output = gr.Markdown(label="Exported Markdown")

            def do_export(tid):
                """Export one thread, or everything when tid == 'all'."""
                # Fix: a blank id previously produced an empty "thread" document.
                if not tid:
                    return "Enter a thread ID (or 'all')"
                if tid == "all":
                    return memory.export_all()
                return memory.export_markdown(tid)

            export_btn.click(do_export, export_thread_id, export_output)

# 0.0.0.0:7860 is the standard bind for a Hugging Face Space container.
demo.queue().launch(server_name="0.0.0.0", server_port=7860)