import gradio as gr import os import tempfile import base64 from typing import List, Tuple, Optional import json from pathlib import Path # Import our modules from src.document_processor import DocumentProcessor from src.vector_store import VectorStore from src.llm_handler import LLMHandler from src.utils import setup_directories, get_file_icon from config import Config # Initialize configuration config = Config() # Setup directories setup_directories() # Initialize components print("🚀 Initializing Smart RAG API components...") document_processor = DocumentProcessor() vector_store = VectorStore(document_processor.embedding_model) llm_handler = LLMHandler() # Load existing vector store try: vector_store.load(config.VECTOR_STORE_DIR) print(f"✅ Loaded existing vector store with {len(vector_store.chunks)} documents") except: print("📝 Starting with empty vector store") # Global state for uploaded files uploaded_files = [] def process_uploaded_file(file_path: str) -> Tuple[str, str]: """Process uploaded file and return status message and file info""" try: if file_path is None: return "❌ No file uploaded", "" file_name = Path(file_path).name file_extension = Path(file_path).suffix.lower() # Check file size file_size = os.path.getsize(file_path) if file_size > config.MAX_FILE_SIZE: return f"❌ File too large. Maximum size: {config.MAX_FILE_SIZE/1024/1024:.1f}MB", "" # Process document print(f"📄 Processing {file_name}...") chunks = document_processor.process_document(file_path, file_extension) if not chunks: return "❌ No text content found in the file", "" # Generate file ID file_id = f"file_{len(uploaded_files)}" # Add to vector store vector_store.add_documents(chunks, file_id, file_name) # Save vector store vector_store.save(config.VECTOR_STORE_DIR) # Track uploaded file file_info = { 'id': file_id, 'name': file_name, 'type': file_extension, 'chunks': len(chunks), 'size': file_size } uploaded_files.append(file_info) # Create status message icon = get_file_icon(file_extension) status_msg = f"✅ Successfully processed: {file_name}" file_details = f""" {icon} **{file_name}** - Type: {file_extension.upper()} - Size: {file_size/1024:.1f} KB - Chunks created: {len(chunks)} - File ID: {file_id} """ return status_msg, file_details except Exception as e: error_msg = f"❌ Error processing file: {str(e)}" print(error_msg) return error_msg, "" def answer_question(question: str, image_input=None) -> Tuple[str, str, str]: """Answer question based on uploaded documents""" try: if not question.strip(): return "❌ Please enter a question", "", "" if len(vector_store.chunks) == 0: return "❌ No documents uploaded yet. Please upload a document first.", "", "" # Handle image input if provided processed_question = question if image_input is not None: try: # Convert image to base64 and extract text import tempfile with tempfile.NamedTemporaryFile(delete=False, suffix='.png') as tmp_file: image_input.save(tmp_file.name) # Extract text from image with open(tmp_file.name, 'rb') as img_file: ocr_text = document_processor.extract_text_from_image(img_file.read()) os.unlink(tmp_file.name) if ocr_text.strip(): processed_question = f"{question}\n\nImage content: {ocr_text}" except Exception as e: print(f"Image processing error: {e}") # Search vector store search_results = vector_store.search(processed_question, k=5) if not search_results: return "❌ No relevant information found in uploaded documents", "", "" # Extract context and sources contexts = [result['text'] for result in search_results] sources = [result['metadata'] for result in search_results] # Generate answer answer = llm_handler.generate_answer(question, contexts) # Format context context_display = "\n\n".join([ f"**Context {i+1}** (Score: {result['score']:.3f}):\n{result['text'][:300]}..." for i, result in enumerate(search_results[:3]) ]) # Format sources sources_display = "\n".join([ f"• **{source['filename']}** (Chunk {source['chunk_index']})" for source in sources[:3] ]) return answer, context_display, sources_display except Exception as e: error_msg = f"❌ Error generating answer: {str(e)}" print(error_msg) return error_msg, "", "" def get_uploaded_files_status(): """Get status of all uploaded files""" if not uploaded_files: return "📭 No files uploaded yet" status = f"📚 **{len(uploaded_files)} files uploaded** ({len(vector_store.chunks)} total chunks)\n\n" for file_info in uploaded_files: icon = get_file_icon(file_info['type']) status += f"{icon} **{file_info['name']}** ({file_info['chunks']} chunks)\n" return status def clear_all_documents(): """Clear all uploaded documents""" global uploaded_files try: # Reset vector store vector_store.reset() # Clear uploaded files list uploaded_files = [] # Save empty vector store vector_store.save(config.VECTOR_STORE_DIR) return "✅ All documents cleared successfully", "📭 No files uploaded" except Exception as e: return f"❌ Error clearing documents: {str(e)}", get_uploaded_files_status() # Custom CSS custom_css = """ .gradio-container { max-width: 1200px !important; } .file-upload-area { border: 2px dashed #ccc; border-radius: 10px; padding: 20px; text-align: center; transition: border-color 0.3s ease; } .file-upload-area:hover { border-color: #007bff; } .status-success { color: #28a745; font-weight: bold; } .status-error { color: #dc3545; font-weight: bold; } .answer-box { background: #f8f9fa; border-left: 4px solid #007bff; padding: 15px; border-radius: 5px; margin: 10px 0; } .context-box { background: #fff3cd; border-left: 4px solid #ffc107; padding: 15px; border-radius: 5px; margin: 10px 0; max-height: 300px; overflow-y: auto; } .sources-box { background: #d4edda; border-left: 4px solid #28a745; padding: 15px; border-radius: 5px; margin: 10px 0; } """ # Create Gradio interface with gr.Blocks(css=custom_css, title="Smart RAG API", theme=gr.themes.Soft()) as demo: # Header gr.Markdown(""" # 🤖 Smart RAG API ### Intelligent Document Q&A System Upload documents (PDF, DOCX, TXT, Images, CSV, SQLite) and ask questions about their content! **Supported formats**: PDF, Word, Text, Images (with OCR), CSV, SQLite databases """) with gr.Row(): # Left Column - File Upload with gr.Column(scale=1): gr.Markdown("## 📤 Upload Documents") file_input = gr.File( label="Choose File", file_types=[".pdf", ".docx", ".txt", ".jpg", ".jpeg", ".png", ".csv", ".db"], type="filepath" ) upload_btn = gr.Button("📄 Process Document", variant="primary", size="lg") upload_status = gr.Markdown("📭 No files uploaded yet") file_details = gr.Markdown("") gr.Markdown("---") # File Management with gr.Row(): refresh_btn = gr.Button("🔄 Refresh Status", size="sm") clear_btn = gr.Button("🗑️ Clear All", size="sm", variant="secondary") # Right Column - Question Answering with gr.Column(scale=2): gr.Markdown("## ❓ Ask Questions") question_input = gr.Textbox( label="Your Question", placeholder="What is this document about?", lines=2 ) image_input = gr.Image( label="Upload Image (Optional)", type="pil", height=150 ) ask_btn = gr.Button("🔍 Get Answer", variant="primary", size="lg") # Results gr.Markdown("### 💡 Answer") answer_output = gr.Markdown( value="Ask a question to see the answer here...", elem_classes=["answer-box"] ) with gr.Accordion("📋 Context & Sources", open=False): with gr.Row(): with gr.Column(): gr.Markdown("**📄 Context Used:**") context_output = gr.Markdown(elem_classes=["context-box"]) with gr.Column(): gr.Markdown("**📚 Sources:**") sources_output = gr.Markdown(elem_classes=["sources-box"]) # Example Questions gr.Markdown(""" ## 💡 Example Questions Try asking questions like: - "What is the main topic of this document?" - "Summarize the key points" - "What are the important dates mentioned?" - "Who are the people mentioned in the document?" - "What are the financial figures?" """) # Sample Files with gr.Accordion("📁 Sample Files for Testing", open=False): gr.Markdown(""" You can test the system with these types of documents: - **PDF**: Research papers, reports, invoices - **Word**: Documents, proposals, contracts - **Text**: Plain text files, logs, notes - **Images**: Screenshots, scanned documents, diagrams - **CSV**: Data tables, spreadsheets - **Database**: SQLite files with structured data """) # Event handlers upload_btn.click( fn=process_uploaded_file, inputs=[file_input], outputs=[upload_status, file_details] ) ask_btn.click( fn=answer_question, inputs=[question_input, image_input], outputs=[answer_output, context_output, sources_output] ) refresh_btn.click( fn=get_uploaded_files_status, outputs=[upload_status] ) clear_btn.click( fn=clear_all_documents, outputs=[upload_status, file_details] ) # Auto-refresh status on file input change file_input.change( fn=lambda: get_uploaded_files_status(), outputs=[upload_status] ) # Launch configuration if __name__ == "__main__": print("🚀 Launching Smart RAG API...") demo.launch( server_name="0.0.0.0", server_port=7860, share=True, # Creates public link show_error=True, show_tips=True, enable_queue=True )