# NOTE: recovered from a Hugging Face Spaces page whose status banner read
# "Runtime error" (see the launch() fix at the bottom of this file).
| import gradio as gr | |
| import os | |
| import tempfile | |
| import base64 | |
| from typing import List, Tuple, Optional | |
| import json | |
| from pathlib import Path | |
| # Import our modules | |
| from src.document_processor import DocumentProcessor | |
| from src.vector_store import VectorStore | |
| from src.llm_handler import LLMHandler | |
| from src.utils import setup_directories, get_file_icon | |
| from config import Config | |
# ---------------------------------------------------------------------------
# Application bootstrap: configuration, working directories, core components,
# and module-level state shared by all Gradio callbacks below.
# ---------------------------------------------------------------------------
config = Config()

setup_directories()

print("π Initializing Smart RAG API components...")
document_processor = DocumentProcessor()
vector_store = VectorStore(document_processor.embedding_model)
llm_handler = LLMHandler()

# Restore a previously persisted vector store if one exists; a missing or
# corrupt store is non-fatal and we simply start empty.
try:
    vector_store.load(config.VECTOR_STORE_DIR)
    print(f"β Loaded existing vector store with {len(vector_store.chunks)} documents")
except Exception:
    # Bug fix: was a bare `except:`, which also swallows SystemExit and
    # KeyboardInterrupt. `Exception` keeps the best-effort load behavior
    # without masking interpreter-level signals.
    print("π Starting with empty vector store")

# Registry of files ingested in this process (list of metadata dicts).
uploaded_files = []
def process_uploaded_file(file_path: str) -> Tuple[str, str]:
    """Ingest one uploaded document into the vector store.

    Validates the file size, chunks the document, indexes the chunks,
    persists the store, and records the file in ``uploaded_files``.

    Returns:
        A ``(status_message, file_details_markdown)`` pair for the UI.
    """
    try:
        if file_path is None:
            return "β No file uploaded", ""

        path = Path(file_path)
        name = path.name
        ext = path.suffix.lower()

        # Enforce the configured upload-size ceiling before doing any work.
        size_bytes = os.path.getsize(file_path)
        if size_bytes > config.MAX_FILE_SIZE:
            return f"β File too large. Maximum size: {config.MAX_FILE_SIZE/1024/1024:.1f}MB", ""

        print(f"π Processing {name}...")
        chunks = document_processor.process_document(file_path, ext)
        if not chunks:
            return "β No text content found in the file", ""

        # Index the new chunks and persist the store immediately so the
        # data survives a restart.
        file_id = f"file_{len(uploaded_files)}"
        vector_store.add_documents(chunks, file_id, name)
        vector_store.save(config.VECTOR_STORE_DIR)

        uploaded_files.append({
            'id': file_id,
            'name': name,
            'type': ext,
            'chunks': len(chunks),
            'size': size_bytes,
        })

        icon = get_file_icon(ext)
        details = f"""
{icon} **{name}**
- Type: {ext.upper()}
- Size: {size_bytes/1024:.1f} KB
- Chunks created: {len(chunks)}
- File ID: {file_id}
"""
        return f"β Successfully processed: {name}", details

    except Exception as e:
        error_msg = f"β Error processing file: {str(e)}"
        print(error_msg)
        return error_msg, ""
def answer_question(question: str, image_input=None) -> Tuple[str, str, str]:
    """Answer a question against the indexed documents.

    If an image is attached, OCR text is extracted and appended to the
    question before retrieval and generation.

    Args:
        question: The user's natural-language question.
        image_input: Optional PIL image (from the Gradio ``Image`` widget).

    Returns:
        A ``(answer, context_markdown, sources_markdown)`` triple.
    """
    try:
        if not question.strip():
            return "β Please enter a question", "", ""
        if len(vector_store.chunks) == 0:
            return "β No documents uploaded yet. Please upload a document first.", "", ""

        # Optionally fold OCR text from the attached image into the query.
        processed_question = question
        if image_input is not None:
            # Fix: removed the redundant local `import tempfile` (already
            # imported at module level).
            tmp_path = None
            try:
                with tempfile.NamedTemporaryFile(delete=False, suffix='.png') as tmp_file:
                    tmp_path = tmp_file.name
                image_input.save(tmp_path)
                with open(tmp_path, 'rb') as img_file:
                    ocr_text = document_processor.extract_text_from_image(img_file.read())
                if ocr_text.strip():
                    processed_question = f"{question}\n\nImage content: {ocr_text}"
            except Exception as e:
                # Best-effort: an OCR failure must not block answering the
                # text question on its own.
                print(f"Image processing error: {e}")
            finally:
                # Bug fix: the temp file previously leaked whenever OCR
                # raised, because os.unlink() sat on the success path only.
                if tmp_path and os.path.exists(tmp_path):
                    os.unlink(tmp_path)

        # Retrieve the top-5 most relevant chunks.
        search_results = vector_store.search(processed_question, k=5)
        if not search_results:
            return "β No relevant information found in uploaded documents", "", ""

        contexts = [result['text'] for result in search_results]
        sources = [result['metadata'] for result in search_results]

        # Bug fix: the raw `question` was passed here, silently discarding
        # the OCR text that was extracted above specifically to augment it.
        answer = llm_handler.generate_answer(processed_question, contexts)

        # Show at most the top-3 contexts/sources in the UI.
        context_display = "\n\n".join([
            f"**Context {i+1}** (Score: {result['score']:.3f}):\n{result['text'][:300]}..."
            for i, result in enumerate(search_results[:3])
        ])
        sources_display = "\n".join([
            f"β’ **{source['filename']}** (Chunk {source['chunk_index']})"
            for source in sources[:3]
        ])

        return answer, context_display, sources_display

    except Exception as e:
        error_msg = f"β Error generating answer: {str(e)}"
        print(error_msg)
        return error_msg, "", ""
def get_uploaded_files_status():
    """Render a markdown summary of every uploaded file for the UI."""
    if not uploaded_files:
        return "π No files uploaded yet"

    # Header line (with a trailing blank line), then one bullet per file.
    parts = [
        f"π **{len(uploaded_files)} files uploaded** ({len(vector_store.chunks)} total chunks)\n"
    ]
    for info in uploaded_files:
        parts.append(f"{get_file_icon(info['type'])} **{info['name']}** ({info['chunks']} chunks)")
    return "\n".join(parts) + "\n"
def clear_all_documents():
    """Wipe the vector store and the uploaded-file registry.

    Returns a ``(status_message, files_status_markdown)`` pair for the UI.
    """
    global uploaded_files
    try:
        vector_store.reset()                         # drop all indexed chunks
        uploaded_files = []                          # forget file metadata
        vector_store.save(config.VECTOR_STORE_DIR)   # persist the now-empty store
        return "β All documents cleared successfully", "π No files uploaded"
    except Exception as e:
        # On failure, report the error and show whatever state remains.
        return f"β Error clearing documents: {str(e)}", get_uploaded_files_status()
# Custom CSS injected into the Gradio page: widens the container, styles the
# drag-and-drop upload area, and colors the answer / context / sources panels
# (classes referenced below via `elem_classes`).
custom_css = """
.gradio-container {
max-width: 1200px !important;
}
.file-upload-area {
border: 2px dashed #ccc;
border-radius: 10px;
padding: 20px;
text-align: center;
transition: border-color 0.3s ease;
}
.file-upload-area:hover {
border-color: #007bff;
}
.status-success {
color: #28a745;
font-weight: bold;
}
.status-error {
color: #dc3545;
font-weight: bold;
}
.answer-box {
background: #f8f9fa;
border-left: 4px solid #007bff;
padding: 15px;
border-radius: 5px;
margin: 10px 0;
}
.context-box {
background: #fff3cd;
border-left: 4px solid #ffc107;
padding: 15px;
border-radius: 5px;
margin: 10px 0;
max-height: 300px;
overflow-y: auto;
}
.sources-box {
background: #d4edda;
border-left: 4px solid #28a745;
padding: 15px;
border-radius: 5px;
margin: 10px 0;
}
"""
# ---------------------------------------------------------------------------
# Gradio UI. Layout: a header, then a two-column row (upload on the left,
# Q&A on the right), example questions, a sample-files accordion, and the
# event wiring that connects buttons to the callbacks defined above.
# Widget creation order determines layout, so this block is left as-is.
# ---------------------------------------------------------------------------
with gr.Blocks(css=custom_css, title="Smart RAG API", theme=gr.themes.Soft()) as demo:
    # Page header.
    gr.Markdown("""
# π€ Smart RAG API
### Intelligent Document Q&A System
Upload documents (PDF, DOCX, TXT, Images, CSV, SQLite) and ask questions about their content!
**Supported formats**: PDF, Word, Text, Images (with OCR), CSV, SQLite databases
""")

    with gr.Row():
        # Left column -- file upload and management.
        with gr.Column(scale=1):
            gr.Markdown("## π€ Upload Documents")
            file_input = gr.File(
                label="Choose File",
                file_types=[".pdf", ".docx", ".txt", ".jpg", ".jpeg", ".png", ".csv", ".db"],
                type="filepath"
            )
            upload_btn = gr.Button("π Process Document", variant="primary", size="lg")
            upload_status = gr.Markdown("π No files uploaded yet")
            file_details = gr.Markdown("")
            gr.Markdown("---")
            # File management controls.
            with gr.Row():
                refresh_btn = gr.Button("π Refresh Status", size="sm")
                clear_btn = gr.Button("ποΈ Clear All", size="sm", variant="secondary")

        # Right column -- question answering.
        with gr.Column(scale=2):
            gr.Markdown("## β Ask Questions")
            question_input = gr.Textbox(
                label="Your Question",
                placeholder="What is this document about?",
                lines=2
            )
            # Optional image whose OCR text augments the question.
            image_input = gr.Image(
                label="Upload Image (Optional)",
                type="pil",
                height=150
            )
            ask_btn = gr.Button("π Get Answer", variant="primary", size="lg")
            # Results area.
            gr.Markdown("### π‘ Answer")
            answer_output = gr.Markdown(
                value="Ask a question to see the answer here...",
                elem_classes=["answer-box"]
            )
            with gr.Accordion("π Context & Sources", open=False):
                with gr.Row():
                    with gr.Column():
                        gr.Markdown("**π Context Used:**")
                        context_output = gr.Markdown(elem_classes=["context-box"])
                    with gr.Column():
                        gr.Markdown("**π Sources:**")
                        sources_output = gr.Markdown(elem_classes=["sources-box"])

    # Example questions.
    gr.Markdown("""
## π‘ Example Questions
Try asking questions like:
- "What is the main topic of this document?"
- "Summarize the key points"
- "What are the important dates mentioned?"
- "Who are the people mentioned in the document?"
- "What are the financial figures?"
""")

    # Sample files for testing.
    with gr.Accordion("π Sample Files for Testing", open=False):
        gr.Markdown("""
You can test the system with these types of documents:
- **PDF**: Research papers, reports, invoices
- **Word**: Documents, proposals, contracts
- **Text**: Plain text files, logs, notes
- **Images**: Screenshots, scanned documents, diagrams
- **CSV**: Data tables, spreadsheets
- **Database**: SQLite files with structured data
""")

    # Event wiring: buttons -> callbacks defined earlier in this module.
    upload_btn.click(
        fn=process_uploaded_file,
        inputs=[file_input],
        outputs=[upload_status, file_details]
    )
    ask_btn.click(
        fn=answer_question,
        inputs=[question_input, image_input],
        outputs=[answer_output, context_output, sources_output]
    )
    refresh_btn.click(
        fn=get_uploaded_files_status,
        outputs=[upload_status]
    )
    clear_btn.click(
        fn=clear_all_documents,
        outputs=[upload_status, file_details]
    )
    # Refresh the status panel whenever the file selection changes.
    file_input.change(
        fn=lambda: get_uploaded_files_status(),
        outputs=[upload_status]
    )
| # Launch configuration | |
if __name__ == "__main__":
    print("π Launching Smart RAG API...")
    # Bug fix: `show_tips` and `enable_queue` were removed from
    # `Blocks.launch()` in Gradio 4.x and raise `TypeError` at startup --
    # the likely cause of the Space's "Runtime error" banner.  Queueing is
    # now enabled via `demo.queue()` before launch.
    demo.queue()
    demo.launch(
        server_name="0.0.0.0",  # listen on all interfaces (container-friendly)
        server_port=7860,       # Hugging Face Spaces' conventional port
        share=True,             # create a public gradio.live link
        show_error=True,        # surface tracebacks in the UI
    )