"""RAG Nexus — document analysis, axiom extraction and RAG response generation.

Gradio front-end backed by a SQLite store.  Documents are chunked into
fixed-size character windows, indexed with a TF-IDF vectorizer, and
retrieved by cosine similarity at query time.
"""

import gradio as gr
import os
import sqlite3
import json
import hashlib
import threading
from datetime import datetime
from typing import List, Dict, Any, Tuple, Optional

import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

from utils import (
    process_document, extract_axioms, generate_response, get_embedding,
    compute_similarity, Document, Axiom, ActivityLog
)

# Size (in characters) of each document chunk indexed for retrieval.
CHUNK_SIZE = 500

# Initialize database
DB_PATH = "rag_nexus.db"
conn = sqlite3.connect(DB_PATH, check_same_thread=False)
cursor = conn.cursor()

# Create tables
cursor.execute("""
    CREATE TABLE IF NOT EXISTS documents (
        id TEXT PRIMARY KEY,
        name TEXT,
        content TEXT,
        size INTEGER,
        uploaded_at TEXT,
        chunk_count INTEGER
    )
""")
cursor.execute("""
    CREATE TABLE IF NOT EXISTS axioms (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        doc_id TEXT,
        source TEXT,
        axiom TEXT,
        confidence REAL,
        FOREIGN KEY (doc_id) REFERENCES documents (id)
    )
""")
cursor.execute("""
    CREATE TABLE IF NOT EXISTS activity (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        action TEXT,
        details TEXT,
        timestamp TEXT
    )
""")
conn.commit()

# Thread-local storage for database connections: Gradio handlers may run on
# worker threads, and a sqlite3 connection must not be shared across threads.
thread_local = threading.local()


def get_db() -> sqlite3.Connection:
    """Return this thread's SQLite connection, creating it on first use."""
    if not hasattr(thread_local, 'conn'):
        thread_local.conn = sqlite3.connect(DB_PATH)
    return thread_local.conn


def _chunk_text(text: str) -> List[str]:
    """Split *text* into consecutive CHUNK_SIZE-character windows."""
    return [text[i:i + CHUNK_SIZE] for i in range(0, len(text), CHUNK_SIZE)]


class RAGState:
    """In-memory retrieval index shared by all request handlers."""

    def __init__(self):
        self.vectorizer = TfidfVectorizer(max_features=1000, stop_words='english')
        self.document_chunks: List[str] = []
        self.chunk_metadata: List[Dict[str, Any]] = []
        self.is_initialized = False

    def initialize_models(self):
        """Build the TF-IDF index from documents already in the database.

        Idempotent: a no-op after the first successful call.
        """
        if self.is_initialized:
            return
        conn = get_db()
        cursor = conn.cursor()
        cursor.execute("SELECT id, content FROM documents")
        docs = cursor.fetchall()
        if docs:
            chunks: List[str] = []
            metadata: List[Dict[str, Any]] = []
            for doc_id, content in docs:
                doc_chunks = _chunk_text(content)
                chunks.extend(doc_chunks)
                metadata.extend(
                    {"doc_id": doc_id, "chunk_idx": i}
                    for i in range(len(doc_chunks))
                )
            if chunks:
                self.vectorizer.fit(chunks)
                self.document_chunks = chunks
                self.chunk_metadata = metadata
        self.is_initialized = True


def get_state() -> RAGState:
    """Return the process-wide RAGState singleton (lazily created)."""
    if not hasattr(get_state, 'state'):
        get_state.state = RAGState()
    return get_state.state


def log_activity(action: str, details: Dict[str, Any]):
    """Append one activity row (action + JSON details + ISO timestamp)."""
    conn = get_db()
    cursor = conn.cursor()
    cursor.execute(
        "INSERT INTO activity (action, details, timestamp) VALUES (?, ?, ?)",
        (action, json.dumps(details), datetime.now().isoformat())
    )
    conn.commit()


def get_stats() -> Dict[str, Any]:
    """Return document count, axiom count, and total storage in MB."""
    conn = get_db()
    cursor = conn.cursor()
    cursor.execute("SELECT COUNT(*) FROM documents")
    doc_count = cursor.fetchone()[0]
    cursor.execute("SELECT COUNT(*) FROM axioms")
    axiom_count = cursor.fetchone()[0]
    cursor.execute("SELECT SUM(size) FROM documents")
    storage = cursor.fetchone()[0] or 0  # SUM is NULL on an empty table
    return {
        "doc_count": doc_count,
        "axiom_count": axiom_count,
        "storage_mb": round(storage / 1024 / 1024, 2)
    }


def get_stats_display() -> Tuple[str, str, str]:
    """Format get_stats() for the three analytics Label components.

    Bug fix: get_stats() returns a dict, which cannot be mapped onto the
    three separate Label outputs in the event wiring; a 3-tuple can.
    """
    stats = get_stats()
    return (
        str(stats["doc_count"]),
        str(stats["axiom_count"]),
        f"{stats['storage_mb']}MB",
    )


def load_documents(search: str = ""):
    """Return rows for the documents table, optionally filtered by name.

    The optional *search* parameter is new (default preserves the old
    behavior) and makes the Documents tab search box actually filter.
    """
    conn = get_db()
    cursor = conn.cursor()
    if search:
        cursor.execute(
            "SELECT id, name, size, uploaded_at FROM documents "
            "WHERE name LIKE ? ORDER BY uploaded_at DESC",
            (f"%{search}%",)
        )
    else:
        cursor.execute(
            "SELECT id, name, size, uploaded_at FROM documents ORDER BY uploaded_at DESC"
        )
    docs = cursor.fetchall()
    if not docs:
        return [["No documents found", "", "", ""]]
    return [[doc[1], f"{doc[2]} bytes", doc[3], doc[0]] for doc in docs]


def load_axioms(source_filter: str = ""):
    """Return rows for the axioms table, optionally filtered by document name."""
    conn = get_db()
    cursor = conn.cursor()
    if source_filter:
        cursor.execute("""
            SELECT a.id, a.source, a.axiom, a.confidence, d.name
            FROM axioms a JOIN documents d ON a.doc_id = d.id
            WHERE d.name LIKE ?
            ORDER BY a.confidence DESC
        """, (f"%{source_filter}%",))
    else:
        cursor.execute("""
            SELECT a.id, a.source, a.axiom, a.confidence, d.name
            FROM axioms a JOIN documents d ON a.doc_id = d.id
            ORDER BY a.confidence DESC
        """)
    axioms = cursor.fetchall()
    if not axioms:
        return [["No axioms found", "", "", "", ""]]
    rows = []
    for ax in axioms:
        # Bug fix: only append an ellipsis when the text is actually truncated.
        text = ax[2] if len(ax[2]) <= 100 else ax[2][:100] + "..."
        rows.append([ax[4], ax[1], text, f"{ax[3]:.2f}", str(ax[0])])
    return rows


def _summarize_details(details_json: str) -> str:
    """Render a logged-details JSON blob as a compact 'k=v, ...' string."""
    try:
        details = json.loads(details_json)
    except (TypeError, ValueError):
        return ""
    if not isinstance(details, dict):
        return str(details)
    return ", ".join(f"{k}={v}" for k, v in details.items())


def load_activity():
    """Return the 20 most recent activity rows for the analytics table.

    Bug fix: the old code displayed details.get('description'), a key no
    logging call ever writes, so the Details column was always empty.
    """
    conn = get_db()
    cursor = conn.cursor()
    cursor.execute(
        "SELECT action, details, timestamp FROM activity ORDER BY timestamp DESC LIMIT 20"
    )
    activities = cursor.fetchall()
    if not activities:
        return [["No activity yet", "", ""]]
    return [[act[0], _summarize_details(act[1]), act[2]] for act in activities]


def process_uploaded_files(files: List[str]) -> Tuple[str, str]:
    """Ingest uploaded files: parse, store, extract axioms, index chunks.

    Returns (human-readable status, status icon).  Temporary upload files
    are deleted when processing finishes, whether or not it succeeded.
    """
    if not files:
        return "No files uploaded", "⚠️"

    state = get_state()
    success_count = 0
    total_count = len(files)

    for file_path in files:
        try:
            # Parse the document via the project pipeline.
            doc = process_document(file_path)

            conn = get_db()
            cursor = conn.cursor()
            cursor.execute(
                "INSERT INTO documents (id, name, content, size, uploaded_at, chunk_count) VALUES (?, ?, ?, ?, ?, ?)",
                (doc.id, doc.name, doc.content, doc.size, doc.uploaded_at, doc.chunk_count)
            )

            # Extract and persist axioms for this document.
            axioms = extract_axioms(doc.content, doc.id)
            for axiom in axioms:
                cursor.execute(
                    "INSERT INTO axioms (doc_id, source, axiom, confidence) VALUES (?, ?, ?, ?)",
                    (doc.id, axiom.source, axiom.text, axiom.confidence)
                )
            conn.commit()

            # Add the document's chunks to the in-memory index.
            chunks = _chunk_text(doc.content)
            state.document_chunks.extend(chunks)
            state.chunk_metadata.extend(
                {"doc_id": doc.id, "chunk_idx": i} for i in range(len(chunks))
            )

            log_activity("document_uploaded", {
                "name": doc.name,
                "size": doc.size,
                "chunks": doc.chunk_count
            })
            success_count += 1
        except Exception as e:
            # Best-effort: record the failure and continue with the next file.
            log_activity("upload_failed", {
                "file": os.path.basename(file_path),
                "error": str(e)
            })

    # Refit once after all files (the old code refit inside the loop,
    # re-vectorizing the whole corpus per uploaded file).
    if success_count and state.document_chunks:
        state.vectorizer.fit(state.document_chunks)

    # Clean up temporary files.
    for file_path in files:
        try:
            os.unlink(file_path)
        except OSError:
            pass  # already gone or not removable — nothing useful to do

    icon = "✅" if success_count == total_count else "⚠️"
    return f"Processed {success_count}/{total_count} files", icon


def generate_rag_response(query: str, use_axioms: bool, use_context: bool) -> Tuple[str, str]:
    """Answer *query*, optionally grounded in retrieved chunks and axioms.

    Returns (response text, newline-joined retrieval summary) for the
    Generate tab's two output components.
    """
    if not query.strip():
        return "Please enter a query", ""

    state = get_state()
    state.initialize_models()

    # Retrieve context via TF-IDF cosine similarity.
    context = ""
    retrieved_docs: List[str] = []
    if use_context and state.document_chunks:
        try:
            query_vec = state.vectorizer.transform([query])
            doc_vecs = state.vectorizer.transform(state.document_chunks)
            similarities = cosine_similarity(query_vec, doc_vecs).flatten()

            # Take the top 3 chunks, best first.
            top_indices = np.argsort(similarities)[-3:][::-1]
            for idx in top_indices:
                if similarities[idx] > 0.1:  # similarity floor to drop noise
                    chunk = state.document_chunks[idx]
                    doc_id = state.chunk_metadata[idx]["doc_id"]
                    conn = get_db()
                    cursor = conn.cursor()
                    cursor.execute("SELECT name FROM documents WHERE id = ?", (doc_id,))
                    row = cursor.fetchone()
                    # Document row may have been deleted since indexing.
                    doc_name = row[0] if row else doc_id
                    context += f"\n\n--- From {doc_name} ---\n{chunk}"
                    retrieved_docs.append(f"{doc_name} (similarity: {similarities[idx]:.2f})")
        except Exception:
            # Retrieval is best-effort: answer without context rather than fail.
            context = ""
            retrieved_docs = ["No relevant context found"]

    # Pick a random sample of stored axioms.
    axioms: List[str] = []
    if use_axioms:
        conn = get_db()
        cursor = conn.cursor()
        cursor.execute("SELECT axiom FROM axioms ORDER BY RANDOM() LIMIT 5")
        axioms = [row[0] for row in cursor.fetchall()]

    # Generate the final response via the project pipeline.
    response = generate_response(query, context, axioms)

    log_activity("response_generated", {
        "query": query[:100],
        "used_axioms": use_axioms,
        "used_context": use_context
    })

    context_info = "\n".join(retrieved_docs) if retrieved_docs else "No context retrieved"
    return response, context_info


def clear_all_data() -> Tuple[str, str]:
    """Delete every document, axiom and activity row and reset the index."""
    conn = get_db()
    cursor = conn.cursor()
    cursor.execute("DELETE FROM documents")
    cursor.execute("DELETE FROM axioms")
    cursor.execute("DELETE FROM activity")
    conn.commit()

    # Reset the in-memory retrieval state to match the now-empty store.
    state = get_state()
    state.document_chunks = []
    state.chunk_metadata = []

    log_activity("data_cleared", {"all": True})
    return "All data cleared successfully", "✅"


def export_axioms() -> Tuple[str, str]:
    """Dump every stored axiom to a timestamped JSON file in the CWD."""
    conn = get_db()
    cursor = conn.cursor()
    cursor.execute("""
        SELECT d.name as document, a.source, a.axiom, a.confidence
        FROM axioms a JOIN documents d ON a.doc_id = d.id
    """)
    axioms = [
        {"document": row[0], "source": row[1], "axiom": row[2], "confidence": row[3]}
        for row in cursor.fetchall()
    ]

    if not axioms:
        return "No axioms to export", "⚠️"

    filename = f"axioms_export_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
    with open(filename, 'w') as f:
        json.dump(axioms, f, indent=2)

    log_activity("axioms_exported", {"count": len(axioms), "file": filename})
    # Bug fix: report the actual export filename (was a garbled placeholder).
    return f"Exported {len(axioms)} axioms to {filename}", "✅"


def initialize_app() -> str:
    """Warm up the retrieval index; returns a status string for the UI."""
    state = get_state()
    state.initialize_models()
    return "✅ Models initialized"


# Create Gradio interface
with gr.Blocks() as demo:
    gr.Markdown(
        """
        # 🔮 RAG Nexus
        ### Intelligent Document Analysis & Axiom Extraction System
        **Built with anycoder** | [View on Hugging Face](https://huggingface.co/spaces/akhaliq/anycoder)
        """
    )

    # Status bar
    with gr.Row():
        status_text = gr.Textbox("Initializing...", label="System Status", scale=4)
        init_btn = gr.Button("🔄 Reinitialize", scale=1)

    # Tabs
    with gr.Tabs() as tabs:
        # Upload Tab
        with gr.TabItem("📤 Upload", id="upload"):
            gr.Markdown("### Upload Documents for Analysis")
            file_output = gr.File(
                label="Drop files here or click to browse",
                file_count="multiple",
                file_types=[".txt", ".md", ".pdf", ".doc", ".docx"]
            )
            upload_btn = gr.Button("🚀 Process Files", variant="primary")
            upload_status = gr.Textbox(label="Upload Status", interactive=False)
            with gr.Accordion("📋 Upload Queue", open=False):
                upload_queue = gr.Dataframe(
                    headers=["File", "Status", "Size (bytes)"],
                    datatype=["str", "str", "number"],
                    label="Processed Files"
                )

        # Documents Tab
        with gr.TabItem("📚 Documents", id="documents"):
            gr.Markdown("### Indexed Documents")
            with gr.Row():
                doc_search = gr.Textbox(
                    placeholder="Search documents...",
                    label="Search",
                    scale=3
                )
                clear_docs_btn = gr.Button("🗑️ Clear All", variant="stop", scale=1)
            documents_table = gr.Dataframe(
                headers=["Name", "Size", "Uploaded", "ID"],
                datatype=["str", "str", "str", "str"],
                label="Documents",
                wrap=True
            )
            # Bug fix: pass the search term through so filtering actually works
            # (the old lambda discarded it).
            doc_search.change(
                fn=load_documents,
                inputs=doc_search,
                outputs=documents_table,
                api_visibility="private"
            )

        # Axioms Tab
        with gr.TabItem("⚡ Axioms", id="axioms"):
            gr.Markdown("### Extracted Axioms")
            with gr.Row():
                axiom_search = gr.Textbox(
                    placeholder="Search axioms...",
                    label="Search",
                    scale=2
                )
                axiom_filter = gr.Dropdown(
                    choices=[],
                    label="Filter by Document",
                    scale=1
                )
                export_axioms_btn = gr.Button("💾 Export JSON", scale=1)
            axioms_table = gr.Dataframe(
                headers=["Document", "Source", "Axiom", "Confidence", "ID"],
                datatype=["str", "str", "str", "number", "str"],
                label="Axioms",
                wrap=True
            )
            export_status = gr.Textbox(label="Export Status", interactive=False)

        # Generate Tab
        with gr.TabItem("🤖 Generate", id="generate"):
            gr.Markdown("### Intelligent Response Generation")
            query_input = gr.Textbox(
                label="Enter your query",
                placeholder="Ask anything about your documents... (e.g., 'What are the fundamental principles based on the uploaded documents?')",
                lines=4,
                max_lines=8
            )
            with gr.Row():
                use_axioms = gr.Checkbox(label="Use Axioms", value=True)
                use_context = gr.Checkbox(label="Use Context (RAG)", value=True)
            generate_btn = gr.Button("🚀 Generate Response", variant="primary")
            with gr.Group():
                response_output = gr.Markdown(
                    label="Generated Response",
                    show_copy_button=True
                )
            with gr.Accordion("📚 Retrieved Context & Axioms", open=False):
                context_output = gr.Textbox(
                    label="Retrieved Documents",
                    lines=5,
                    interactive=False
                )
            query_stats = gr.Textbox(
                label="Query Statistics",
                interactive=False,
                visible=False
            )

        # Analytics Tab
        with gr.TabItem("📊 Analytics", id="analytics"):
            gr.Markdown("### System Analytics")
            with gr.Row():
                with gr.Column():
                    doc_count_label = gr.Label(value="0", label="📄 Documents", show_label=True)
                with gr.Column():
                    axiom_count_label = gr.Label(value="0", label="⚡ Axioms", show_label=True)
                with gr.Column():
                    storage_label = gr.Label(value="0MB", label="💾 Storage Used", show_label=True)
            with gr.Accordion("📈 Recent Activity", open=True):
                activity_log = gr.Dataframe(
                    headers=["Action", "Details", "Timestamp"],
                    datatype=["str", "str", "str"],
                    label="Activity Log",
                    wrap=True,
                    max_height=300
                )

    # Event handlers
    init_btn.click(
        fn=initialize_app,
        outputs=status_text,
        api_visibility="private"
    )

    # Upload events
    def process_and_update(files):
        """Process uploads and build the queue table for the Upload tab.

        Bug fix: file names/sizes are captured BEFORE processing, because
        process_uploaded_files() deletes the temporary files when done —
        the old code always showed 0 bytes.
        """
        if not files:
            return "No files selected", []
        file_info = [
            (os.path.basename(f), os.path.getsize(f) if os.path.exists(f) else 0)
            for f in files
        ]
        status, icon = process_uploaded_files(files)
        queue_data = [[name, "✅ Processed", size] for name, size in file_info]
        return f"{icon} {status}", queue_data

    upload_btn.click(
        fn=process_and_update,
        inputs=file_output,
        outputs=[upload_status, upload_queue],
        api_visibility="private"
    ).then(
        fn=load_documents,
        outputs=documents_table
    ).then(
        fn=lambda: load_axioms(),
        outputs=axioms_table
    ).then(
        fn=get_stats_display,
        outputs=[doc_count_label, axiom_count_label, storage_label]
    ).then(
        fn=load_activity,
        outputs=activity_log
    )

    # Documents tab events
    def refresh_documents():
        """Reload the documents table when the active tab changes."""
        return load_documents()

    tabs.change(
        fn=refresh_documents,
        outputs=documents_table,
        api_visibility="private"
    )

    clear_docs_btn.click(
        fn=clear_all_data,
        outputs=[status_text],
        api_visibility="private"
    ).then(
        fn=load_documents,
        outputs=documents_table
    ).then(
        fn=lambda: load_axioms(),
        outputs=axioms_table
    ).then(
        fn=get_stats_display,
        outputs=[doc_count_label, axiom_count_label, storage_label]
    )

    # Axioms tab events
    def update_axiom_filter():
        """Refresh the document-name choices in the axiom filter dropdown."""
        conn = get_db()
        cursor = conn.cursor()
        cursor.execute("SELECT DISTINCT name FROM documents")
        docs = [row[0] for row in cursor.fetchall()]
        return gr.Dropdown(choices=[""] + docs)

    tabs.change(
        fn=update_axiom_filter,
        outputs=axiom_filter,
        api_visibility="private"
    )

    axiom_filter.change(
        fn=lambda filter_val: load_axioms(filter_val or ""),
        inputs=axiom_filter,
        outputs=axioms_table,
        api_visibility="private"
    )

    export_axioms_btn.click(
        fn=export_axioms,
        outputs=[export_status],
        api_visibility="private"
    )

    # Generate tab events
    generate_btn.click(
        fn=generate_rag_response,
        inputs=[query_input, use_axioms, use_context],
        outputs=[response_output, context_output],
        api_visibility="private"
    ).then(
        fn=load_activity,
        outputs=activity_log
    )

    # Load initial data
    demo.load(
        fn=initialize_app,
        outputs=status_text,
        api_visibility="private"
    ).then(
        fn=load_documents,
        outputs=documents_table
    ).then(
        fn=lambda: load_axioms(),
        outputs=axioms_table
    ).then(
        fn=get_stats_display,
        outputs=[doc_count_label, axiom_count_label, storage_label]
    ).then(
        fn=load_activity,
        outputs=activity_log
    ).then(
        fn=update_axiom_filter,
        outputs=axiom_filter
    )

# Launch with Gradio 6 theme
# NOTE(review): in mainline Gradio, `theme=` belongs to gr.Blocks() rather
# than launch(), and `footer_links`/`api_visibility` are not standard launch
# kwargs — confirm against the Gradio version pinned for this deployment.
demo.launch(
    theme=gr.themes.Soft(
        primary_hue="indigo",
        secondary_hue="violet",
        neutral_hue="slate",
        font=gr.themes.GoogleFont("Inter"),
        text_size="lg",
        spacing_size="lg",
        radius_size="md"
    ).set(
        button_primary_background_fill="*primary_600",
        button_primary_background_fill_hover="*primary_700",
        block_title_text_weight="600",
        block_background_fill="*neutral_50"
    ),
    footer_links=[{"label": "Built with anycoder", "url": "https://huggingface.co/spaces/akhaliq/anycoder"}],
    show_error=True,
    max_threads=40
)