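"""RAG Nexus — a Gradio app for document upload, TF-IDF retrieval,
axiom extraction, and retrieval-augmented response generation.

Persistence is a local SQLite database; document processing helpers
live in the companion `utils` module.
"""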
import gradio as gr
import os
import sqlite3
import json
from datetime import datetime
from typing import List, Dict, Any, Tuple
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
import threading
from utils import process_document, extract_axioms, generate_response
# Initialize database (this module-level connection is used only for schema setup)
DB_PATH = "rag_nexus.db"
conn = sqlite3.connect(DB_PATH, check_same_thread=False)
cursor = conn.cursor()

# Create tables
cursor.execute("""
    CREATE TABLE IF NOT EXISTS documents (
        id TEXT PRIMARY KEY,
        name TEXT,
        content TEXT,
        size INTEGER,
        uploaded_at TEXT,
        chunk_count INTEGER
    )
""")
cursor.execute("""
    CREATE TABLE IF NOT EXISTS axioms (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        doc_id TEXT,
        source TEXT,
        axiom TEXT,
        confidence REAL,
        FOREIGN KEY (doc_id) REFERENCES documents (id)
    )
""")
cursor.execute("""
    CREATE TABLE IF NOT EXISTS activity (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        action TEXT,
        details TEXT,
        timestamp TEXT
    )
""")
conn.commit()
# Thread-local storage for database connections
thread_local = threading.local()
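# SQLite connections may only be used from the thread that created them;
# Gradio runs event handlers on a worker thread pool, so each thread gets
# its own connection instead of sharing the module-level one.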
def get_db():
    """Get a thread-local database connection."""
    if not hasattr(thread_local, 'conn'):
        thread_local.conn = sqlite3.connect(DB_PATH)
    return thread_local.conn
class RAGState:
    def __init__(self):
        self.vectorizer = TfidfVectorizer(max_features=1000, stop_words='english')
        self.document_chunks = []
        self.chunk_metadata = []
        self.is_initialized = False

    def initialize_models(self):
        """Load existing documents and fit the TF-IDF vectorizer on their chunks."""
        if not self.is_initialized:
            conn = get_db()
            cursor = conn.cursor()
            cursor.execute("SELECT id, content FROM documents")
            docs = cursor.fetchall()
            if docs:
                chunks = []
                metadata = []
                for doc_id, content in docs:
                    # Split each document into fixed-size 500-character chunks
                    doc_chunks = [content[i:i + 500] for i in range(0, len(content), 500)]
                    chunks.extend(doc_chunks)
                    metadata.extend([{"doc_id": doc_id, "chunk_idx": i} for i in range(len(doc_chunks))])
                if chunks:
                    self.vectorizer.fit(chunks)
                    self.document_chunks = chunks
                    self.chunk_metadata = metadata
            self.is_initialized = True
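# A lightweight singleton: the state object is stashed as an attribute on the
# function itself, so every handler shares one vectorizer and chunk store.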
def get_state():
    """Get the global application state."""
    if not hasattr(get_state, 'state'):
        get_state.state = RAGState()
    return get_state.state
def log_activity(action: str, details: Dict[str, Any]):
    """Log an activity record to the database."""
    conn = get_db()
    cursor = conn.cursor()
    cursor.execute(
        "INSERT INTO activity (action, details, timestamp) VALUES (?, ?, ?)",
        (action, json.dumps(details), datetime.now().isoformat())
    )
    conn.commit()
def get_stats():
    """Get system statistics as three display strings (documents, axioms, storage)."""
    conn = get_db()
    cursor = conn.cursor()
    cursor.execute("SELECT COUNT(*) FROM documents")
    doc_count = cursor.fetchone()[0]
    cursor.execute("SELECT COUNT(*) FROM axioms")
    axiom_count = cursor.fetchone()[0]
    cursor.execute("SELECT SUM(size) FROM documents")
    storage = cursor.fetchone()[0] or 0
    # Return a tuple so the values map onto the three analytics labels
    return str(doc_count), str(axiom_count), f"{round(storage / 1024 / 1024, 2)} MB"
def load_documents(search: str = ""):
    """Load all documents, optionally filtered by a name substring."""
    conn = get_db()
    cursor = conn.cursor()
    if search:
        cursor.execute(
            "SELECT id, name, size, uploaded_at FROM documents WHERE name LIKE ? ORDER BY uploaded_at DESC",
            (f"%{search}%",)
        )
    else:
        cursor.execute("SELECT id, name, size, uploaded_at FROM documents ORDER BY uploaded_at DESC")
    docs = cursor.fetchall()
    if not docs:
        return [["No documents found", "", "", ""]]
    return [[doc[1], f"{doc[2]} bytes", doc[3], doc[0]] for doc in docs]
def load_axioms(source_filter: str = ""):
    """Load axioms with an optional document-name filter."""
    conn = get_db()
    cursor = conn.cursor()
    if source_filter:
        cursor.execute("""
            SELECT a.id, a.source, a.axiom, a.confidence, d.name
            FROM axioms a
            JOIN documents d ON a.doc_id = d.id
            WHERE d.name LIKE ?
            ORDER BY a.confidence DESC
        """, (f"%{source_filter}%",))
    else:
        cursor.execute("""
            SELECT a.id, a.source, a.axiom, a.confidence, d.name
            FROM axioms a
            JOIN documents d ON a.doc_id = d.id
            ORDER BY a.confidence DESC
        """)
    axioms = cursor.fetchall()
    if not axioms:
        return [["No axioms found", "", "", "", ""]]
    # Truncate long axiom text for display; only add an ellipsis when needed
    return [
        [ax[4], ax[1], ax[2][:100] + ("..." if len(ax[2]) > 100 else ""), f"{ax[3]:.2f}", str(ax[0])]
        for ax in axioms
    ]
def load_activity():
    """Load the 20 most recent activity records."""
    conn = get_db()
    cursor = conn.cursor()
    cursor.execute("SELECT action, details, timestamp FROM activity ORDER BY timestamp DESC LIMIT 20")
    activities = cursor.fetchall()
    if not activities:
        return [["No activity yet", "", ""]]
    # Details are stored as JSON; show a compact key=value summary
    # (the records never contain a 'description' key)
    return [
        [act[0], ", ".join(f"{k}={v}" for k, v in json.loads(act[1]).items()), act[2]]
        for act in activities
    ]
def process_uploaded_files(files: List[str]) -> Tuple[str, str]:
    """Process uploaded files and return a status message and icon."""
    if not files:
        return "No files uploaded", "⚠️"
    state = get_state()
    success_count = 0
    total_count = len(files)
    for file_path in files:
        try:
            # Process document
            doc = process_document(file_path)
            # Save to database
            conn = get_db()
            cursor = conn.cursor()
            cursor.execute(
                "INSERT INTO documents (id, name, content, size, uploaded_at, chunk_count) VALUES (?, ?, ?, ?, ?, ?)",
                (doc.id, doc.name, doc.content, doc.size, doc.uploaded_at, doc.chunk_count)
            )
            # Extract axioms
            axioms = extract_axioms(doc.content, doc.id)
            for axiom in axioms:
                cursor.execute(
                    "INSERT INTO axioms (doc_id, source, axiom, confidence) VALUES (?, ?, ?, ?)",
                    (doc.id, axiom.source, axiom.text, axiom.confidence)
                )
            conn.commit()
            # Update the in-memory chunk store
            chunks = [doc.content[i:i + 500] for i in range(0, len(doc.content), 500)]
            state.document_chunks.extend(chunks)
            state.chunk_metadata.extend([{"doc_id": doc.id, "chunk_idx": i} for i in range(len(chunks))])
            log_activity("document_uploaded", {
                "name": doc.name,
                "size": doc.size,
                "chunks": doc.chunk_count
            })
            success_count += 1
        except Exception as e:
            log_activity("upload_failed", {
                "file": os.path.basename(file_path),
                "error": str(e)
            })
    # Refit the vectorizer once after all files are ingested rather than per
    # file; refitting changes the vocabulary, so query-time transforms must
    # always use the latest fit
    if state.document_chunks:
        state.vectorizer.fit(state.document_chunks)
    # Clean up temporary files
    for file_path in files:
        try:
            os.unlink(file_path)
        except OSError:
            pass
    return f"Processed {success_count}/{total_count} files", "✅" if success_count == total_count else "⚠️"
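# Retrieval: the query and all stored chunks are projected into the fitted
# TF-IDF space, ranked by cosine similarity, and the top 3 chunks above a
# 0.1 threshold are concatenated into the prompt context.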
def generate_rag_response(query: str, use_axioms: bool, use_context: bool) -> Tuple[str, str]:
    """Generate a response using RAG."""
    if not query.strip():
        return "Please enter a query", ""
    state = get_state()
    state.initialize_models()
    # Retrieve context
    context = ""
    retrieved_docs = []
    if use_context and state.document_chunks:
        try:
            query_vec = state.vectorizer.transform([query])
            doc_vecs = state.vectorizer.transform(state.document_chunks)
            similarities = cosine_similarity(query_vec, doc_vecs).flatten()
            # Get top 3 chunks
            top_indices = np.argsort(similarities)[-3:][::-1]
            for idx in top_indices:
                if similarities[idx] > 0.1:
                    chunk = state.document_chunks[idx]
                    doc_id = state.chunk_metadata[idx]["doc_id"]
                    conn = get_db()
                    cursor = conn.cursor()
                    cursor.execute("SELECT name FROM documents WHERE id = ?", (doc_id,))
                    doc_name = cursor.fetchone()[0]
                    context += f"\n\n--- From {doc_name} ---\n{chunk}"
                    retrieved_docs.append(f"{doc_name} (similarity: {similarities[idx]:.2f})")
        except Exception:
            context = ""
            retrieved_docs = ["No relevant context found"]
    # Get axioms
    axioms = []
    if use_axioms:
        conn = get_db()
        cursor = conn.cursor()
        cursor.execute("SELECT axiom FROM axioms ORDER BY RANDOM() LIMIT 5")
        axioms = [row[0] for row in cursor.fetchall()]
    # Generate response
    response = generate_response(query, context, axioms)
    # Log activity
    log_activity("response_generated", {
        "query": query[:100],
        "used_axioms": use_axioms,
        "used_context": use_context
    })
    # Format context info
    context_info = "\n".join(retrieved_docs) if retrieved_docs else "No context retrieved"
    return response, context_info
def clear_all_data():
    """Clear all data from the database and reset in-memory state."""
    conn = get_db()
    cursor = conn.cursor()
    cursor.execute("DELETE FROM documents")
    cursor.execute("DELETE FROM axioms")
    cursor.execute("DELETE FROM activity")
    conn.commit()
    # Reset state
    state = get_state()
    state.document_chunks = []
    state.chunk_metadata = []
    log_activity("data_cleared", {"all": True})
    # Return a single string, matching the single status output it is wired to
    return "✅ All data cleared successfully"
def export_axioms():
    """Export all axioms as a JSON file."""
    conn = get_db()
    cursor = conn.cursor()
    cursor.execute("""
        SELECT d.name as document, a.source, a.axiom, a.confidence
        FROM axioms a
        JOIN documents d ON a.doc_id = d.id
    """)
    axioms = [
        {"document": row[0], "source": row[1], "axiom": row[2], "confidence": row[3]}
        for row in cursor.fetchall()
    ]
    if not axioms:
        return "⚠️ No axioms to export"
    filename = f"axioms_export_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
    with open(filename, 'w') as f:
        json.dump(axioms, f, indent=2)
    log_activity("axioms_exported", {"count": len(axioms), "file": filename})
    # Return a single string, matching the single export-status output
    return f"✅ Exported {len(axioms)} axioms to {filename}"
# Initialize app state on load
def initialize_app():
    state = get_state()
    state.initialize_models()
    return "✅ Models initialized"
# Create Gradio interface
with gr.Blocks() as demo:
    gr.Markdown(
        """
        # 🔮 RAG Nexus
        ### Intelligent Document Analysis & Axiom Extraction System
        **Built with anycoder** | [View on Hugging Face](https://huggingface.co/spaces/akhaliq/anycoder)
        """
    )
    # Status bar
    with gr.Row():
        status_text = gr.Textbox("Initializing...", label="System Status", scale=4)
        init_btn = gr.Button("🔄 Reinitialize", scale=1)
    # Tabs
    with gr.Tabs() as tabs:
        # Upload Tab
        with gr.TabItem("📤 Upload", id="upload"):
            gr.Markdown("### Upload Documents for Analysis")
            file_output = gr.File(
                label="Drop files here or click to browse",
                file_count="multiple",
                file_types=[".txt", ".md", ".pdf", ".doc", ".docx"]
            )
            upload_btn = gr.Button("🚀 Process Files", variant="primary")
            upload_status = gr.Textbox(label="Upload Status", interactive=False)
            with gr.Accordion("📋 Upload Queue", open=False):
                upload_queue = gr.Dataframe(
                    headers=["File", "Status", "Size (bytes)"],
                    datatype=["str", "str", "number"],
                    label="Processed Files"
                )
        # Documents Tab
        with gr.TabItem("📚 Documents", id="documents"):
            gr.Markdown("### Indexed Documents")
            with gr.Row():
                doc_search = gr.Textbox(
                    placeholder="Search documents...",
                    label="Search",
                    scale=3
                )
                clear_docs_btn = gr.Button("🗑️ Clear All", variant="stop", scale=1)
            documents_table = gr.Dataframe(
                headers=["Name", "Size", "Uploaded", "ID"],
                datatype=["str", "str", "str", "str"],
                label="Documents",
                wrap=True
            )
            # Pass the search text through so the box actually filters results
            doc_search.change(
                fn=load_documents,
                inputs=doc_search,
                outputs=documents_table,
                api_visibility="private"
            )
        # Axioms Tab
        with gr.TabItem("⚡ Axioms", id="axioms"):
            gr.Markdown("### Extracted Axioms")
            with gr.Row():
                axiom_search = gr.Textbox(
                    placeholder="Search axioms...",
                    label="Search",
                    scale=2
                )
                axiom_filter = gr.Dropdown(
                    choices=[],
                    label="Filter by Document",
                    scale=1
                )
                export_axioms_btn = gr.Button("💾 Export JSON", scale=1)
            axioms_table = gr.Dataframe(
                headers=["Document", "Source", "Axiom", "Confidence", "ID"],
                datatype=["str", "str", "str", "number", "str"],
                label="Axioms",
                wrap=True
            )
            export_status = gr.Textbox(label="Export Status", interactive=False)
        # Generate Tab
        with gr.TabItem("🤖 Generate", id="generate"):
            gr.Markdown("### Intelligent Response Generation")
            query_input = gr.Textbox(
                label="Enter your query",
                placeholder="Ask anything about your documents... (e.g., 'What are the fundamental principles based on the uploaded documents?')",
                lines=4,
                max_lines=8
            )
            with gr.Row():
                use_axioms = gr.Checkbox(label="Use Axioms", value=True)
                use_context = gr.Checkbox(label="Use Context (RAG)", value=True)
            generate_btn = gr.Button("🚀 Generate Response", variant="primary")
            with gr.Group():
                response_output = gr.Markdown(
                    label="Generated Response",
                    show_copy_button=True
                )
            with gr.Accordion("📚 Retrieved Context & Axioms", open=False):
                context_output = gr.Textbox(
                    label="Retrieved Documents",
                    lines=5,
                    interactive=False
                )
            query_stats = gr.Textbox(
                label="Query Statistics",
                interactive=False,
                visible=False
            )
        # Analytics Tab
        with gr.TabItem("📊 Analytics", id="analytics"):
            gr.Markdown("### System Analytics")
            with gr.Row():
                with gr.Column():
                    doc_count_label = gr.Label(value="0", label="📄 Documents", show_label=True)
                with gr.Column():
                    axiom_count_label = gr.Label(value="0", label="⚡ Axioms", show_label=True)
                with gr.Column():
                    storage_label = gr.Label(value="0 MB", label="💾 Storage Used", show_label=True)
            with gr.Accordion("📈 Recent Activity", open=True):
                activity_log = gr.Dataframe(
                    headers=["Action", "Details", "Timestamp"],
                    datatype=["str", "str", "str"],
                    label="Activity Log",
                    wrap=True,
                    max_height=300
                )
    # Event handlers
    init_btn.click(
        fn=initialize_app,
        outputs=status_text,
        api_visibility="private"
    )

    # Upload events
    def process_and_update(files):
        if not files:
            return "No files selected", []
        # Capture names and sizes before processing, since the temporary
        # files are deleted once they have been ingested
        queue_data = [
            [os.path.basename(f), os.path.getsize(f) if os.path.exists(f) else 0]
            for f in files
        ]
        status, icon = process_uploaded_files(files)
        queue_table = [[name, "✅ Processed", size] for name, size in queue_data]
        return f"{icon} {status}", queue_table

    upload_btn.click(
        fn=process_and_update,
        inputs=file_output,
        outputs=[upload_status, upload_queue],
        api_visibility="private"
    ).then(
        fn=load_documents,
        outputs=documents_table
    ).then(
        fn=lambda: load_axioms(),
        outputs=axioms_table
    ).then(
        fn=get_stats,
        outputs=[doc_count_label, axiom_count_label, storage_label]
    ).then(
        fn=load_activity,
        outputs=activity_log
    )
    # Documents tab events
    tabs.change(
        fn=load_documents,
        outputs=documents_table,
        api_visibility="private"
    )

    clear_docs_btn.click(
        fn=clear_all_data,
        outputs=[status_text],
        api_visibility="private"
    ).then(
        fn=load_documents,
        outputs=documents_table
    ).then(
        fn=lambda: load_axioms(),
        outputs=axioms_table
    ).then(
        fn=get_stats,
        outputs=[doc_count_label, axiom_count_label, storage_label]
    )
    # Axioms tab events
    def update_axiom_filter():
        conn = get_db()
        cursor = conn.cursor()
        cursor.execute("SELECT DISTINCT name FROM documents")
        docs = [row[0] for row in cursor.fetchall()]
        return gr.Dropdown(choices=[""] + docs)

    tabs.change(
        fn=update_axiom_filter,
        outputs=axiom_filter,
        api_visibility="private"
    )

    axiom_filter.change(
        fn=lambda filter_val: load_axioms(filter_val or ""),
        inputs=axiom_filter,
        outputs=axioms_table,
        api_visibility="private"
    )
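    # The axiom search box is otherwise unwired; a minimal hook that reuses
    # load_axioms's document-name filter so typing narrows the table
    axiom_search.change(
        fn=lambda search: load_axioms(search or ""),
        inputs=axiom_search,
        outputs=axioms_table,
        api_visibility="private"
    )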
    export_axioms_btn.click(
        fn=export_axioms,
        outputs=[export_status],
        api_visibility="private"
    )
    # Generate tab events
    generate_btn.click(
        fn=generate_rag_response,
        inputs=[query_input, use_axioms, use_context],
        outputs=[response_output, context_output],
        api_visibility="private"
    ).then(
        fn=load_activity,
        outputs=activity_log
    )

    # Load initial data
    demo.load(
        fn=initialize_app,
        outputs=status_text,
        api_visibility="private"
    ).then(
        fn=load_documents,
        outputs=documents_table
    ).then(
        fn=lambda: load_axioms(),
        outputs=axioms_table
    ).then(
        fn=get_stats,
        outputs=[doc_count_label, axiom_count_label, storage_label]
    ).then(
        fn=load_activity,
        outputs=activity_log
    ).then(
        fn=update_axiom_filter,
        outputs=axiom_filter
    )
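# Note: `theme` and `footer_links` as launch() arguments, and the
# `api_visibility` keyword on event listeners, follow the Gradio 6 API this
# file targets; on Gradio 4/5 the theme would instead be passed to
# gr.Blocks(theme=...).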
# Launch with the Gradio 6 theme
demo.launch(
    theme=gr.themes.Soft(
        primary_hue="indigo",
        secondary_hue="violet",
        neutral_hue="slate",
        font=gr.themes.GoogleFont("Inter"),
        text_size="lg",
        spacing_size="lg",
        radius_size="md"
    ).set(
        button_primary_background_fill="*primary_600",
        button_primary_background_fill_hover="*primary_700",
        block_title_text_weight="600",
        block_background_fill="*neutral_50"
    ),
    footer_links=[{"label": "Built with anycoder", "url": "https://huggingface.co/spaces/akhaliq/anycoder"}],
    show_error=True,
    max_threads=40
)