Spaces:
Sleeping
Sleeping
| import os | |
| import json | |
| import logging | |
| import gradio as gr | |
| import asyncio | |
| from openai import AsyncOpenAI | |
| from dotenv import load_dotenv | |
| from file_preprocessing import * | |
| from file_preprocessing import _extract_text_async | |
| from image_retrieval import intelligent_search_images, store_image_summaries_batch, process_and_store_image_with_figure_index | |
| from selfHosted_functions import * | |
# Set up logging
logging.basicConfig(level=logging.WARNING)  # Reduced from INFO to WARNING
logger = logging.getLogger(__name__)
# Silence noisy third-party HTTP/client loggers
logging.getLogger("openai").setLevel(logging.WARNING)
logging.getLogger("httpx").setLevel(logging.WARNING)

# Load environment variables (expects OPENAI_API_KEY in .env or the process env)
load_dotenv()
client = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"))

# Global variables for RAG state (self-hosted only: FAISS + HF storage)
current_rag_status = {"status": "none", "message": "No document processed yet"}
current_document_name = ""
markdown_kv_enabled = False  # Toggle for Markdown-KV (table) conversion
async def search_rag_documents(query: str, top_k: int = 5, method_suffix: str = "") -> list:
    """
    Search for relevant documents using the self-hosted FAISS index.

    Thin error-isolating wrapper around ``search_rag_documents_selfhosted``:
    any failure is logged and swallowed so callers always receive a list.

    Args:
        query (str): The search query string to find relevant documents.
        top_k (int, optional): Maximum number of results to return. Defaults to 5.
        method_suffix (str, optional): Optional suffix for index name.

    Returns:
        list: Document results containing content, source, title, chunk_index,
        and total_chunks information; an empty list when the search fails.
    """
    try:
        results = await search_rag_documents_selfhosted(query, top_k, method_suffix)
    except Exception as e:
        logger.error(f"Error searching RAG documents: {e}")
        return []
    return results
async def generate_chat_response(user_question: str, chat_history: list) -> str:
    """
    Generate chatbot response using GPT-4.1-nano with RAG context.

    Searches the self-hosted FAISS index for the top 3 chunks relevant to the
    user's question, injects them into the system prompt, and asks the model
    for an answer grounded in that context and the prior conversation.

    Args:
        user_question (str): The user's question or query.
        chat_history (list): Previous conversation messages in the format:
            [{"role": "user", "content": "..."}, {"role": "assistant", "content": "..."}]

    Returns:
        str: The AI-generated response based on the RAG context and chat
        history, or an apologetic error message if processing fails.

    Note:
        - Uses self-hosted FAISS for document search (top 3 chunks).
        - Uses temperature=0.0 for consistent, factual responses.
        - No explicit max_tokens limit is set; the model's default applies.
    """
    try:
        # Search for relevant context
        relevant_docs = await search_rag_documents(user_question, top_k=3)

        # Prepare context from relevant documents
        context = ""
        if relevant_docs:
            context = "\n\n".join([
                f"Source: {doc.get('source', 'Unknown')}\nContent: {doc.get('content', '')}"
                for doc in relevant_docs
            ])

        # Prepare system message (context embedded verbatim)
        system_message = f"""You are a helpful AI assistant that answers questions based on the provided document context.
Use the following context to answer the user's question. If the context doesn't contain enough information to answer the question, say so politely and ask for clarification.
Context from document:{context} Answer the user's question based on the provided context. Be helpful, accurate, and concise."""

        # System prompt, then prior turns, then the current question
        messages = [
            {"role": "system", "content": system_message},
            *chat_history,
            {"role": "user", "content": user_question},
        ]

        # Generate response
        response = await client.chat.completions.create(
            model="gpt-4.1-nano",
            messages=messages,
            temperature=0.0
        )
        return response.choices[0].message.content.strip()
    except Exception as e:
        logger.error(f"Error generating chat response: {e}")
        return f"Sorry, I encountered an error while processing your question: {str(e)}"
async def generate_chat_response_with_intelligent_images(user_question: str, chat_history: list) -> tuple:
    """
    Generate chatbot response with intelligent image retrieval support.

    Runs similarity-based image retrieval (top 1) against stored image
    summaries; when a relevant image is found, both the document-text context
    (top 3 FAISS chunks) and the retrieved image's summary are placed in the
    system prompt. When no image matches, or on any error, falls back to the
    text-only ``generate_chat_response`` path.

    Args:
        user_question (str): The user's question or query.
        chat_history (list): List of previous conversation messages.

    Returns:
        tuple: (response_text, retrieved_images) where retrieved_images is the
        dict returned by intelligent search, or {} on the fallback path.
    """
    try:
        # First, try intelligent image search (top-1 only)
        intelligent_images = await intelligent_search_images(user_question, "selfhosted", top_k=1)
        if not intelligent_images:
            # Fallback to original search if no intelligent images found
            logger.info("No relevant images found with intelligent search, falling back to original method")
            return await generate_chat_response(user_question, chat_history), {}

        logger.info(f"Found {len(intelligent_images)} relevant images using intelligent search")

        # Search for relevant text context
        relevant_docs = await search_rag_documents(user_question, top_k=3)
        text_context = ""
        if relevant_docs:
            text_context = "\n\n".join([
                f"Source: {doc.get('source', 'Unknown')}\nContent: {doc.get('content', '')}"
                for doc in relevant_docs
            ])

        # Fetch image summaries and add to context
        from image_retrieval import get_intelligent_retrieval
        retrieval = get_intelligent_retrieval("selfhosted")
        image_summaries_list = []
        for image_id, image_data in intelligent_images.items():
            try:
                summary = await retrieval.get_image_summary(image_id)
                if summary:
                    # Format image summary with image ID for clarity
                    image_summaries_list.append(f"Image {image_id}:\n{summary}")
                    logger.info(f"Included image summary for {image_id} in LLM context")
            except Exception as e:
                logger.warning(f"Could not retrieve summary for {image_id}: {e}")
        image_context = ""
        if image_summaries_list:
            image_context = "\n\n--- Context from Retrieved Images ---\n" + "\n\n".join(image_summaries_list)

        # Combine text and image context
        full_context = ""
        if text_context:
            full_context = f"--- Context from Document Text ---\n{text_context}"
        if image_context:
            full_context = full_context + "\n\n" + image_context if full_context else image_context

        # Prepare system message
        system_message = f"""You are a helpful AI assistant that answers questions based on the provided document context.
Use the following context to answer the user's question. The context includes both text from the document and descriptions of retrieved images.
If the context doesn't contain enough information to answer the question, say so politely and ask for clarification.
Context:{full_context if full_context else "No context available"}
Answer the user's question based on the provided context. Be helpful, accurate, and concise. When referring to figures or images, use the information from the image context section."""

        # System prompt, then prior turns, then the current question
        messages = [
            {"role": "system", "content": system_message},
            *chat_history,
            {"role": "user", "content": user_question},
        ]

        # Generate response
        response = await client.chat.completions.create(
            model="gpt-4.1-nano",
            messages=messages,
            temperature=0.0
        )
        response_text = response.choices[0].message.content.strip()
        # Return response text and associated images (no duplicate message)
        return response_text, intelligent_images
    except Exception as e:
        logger.error(f"Error in intelligent image search: {e}")
        # Fallback to original method
        return await generate_chat_response(user_question, chat_history), {}
async def generate_chat_response_with_images(user_question: str, chat_history: list) -> tuple:
    """
    Generate chatbot response with a (legacy) image-aware return shape.

    Kept for interface compatibility: earlier versions retrieved images along
    with text chunks. The current self-hosted implementation performs a
    text-only FAISS search, so the returned image dict is ALWAYS empty here —
    image retrieval is handled by
    ``generate_chat_response_with_intelligent_images`` instead.

    Args:
        user_question (str): The user's question or query.
        chat_history (list): Previous conversation messages in the format:
            [{"role": "user", "content": "..."}, {"role": "assistant", "content": "..."}]

    Returns:
        tuple: A tuple containing:
            - response_text (str): The AI-generated response based on RAG
              context, or an error message on failure.
            - retrieved_images (dict): Always {} in this implementation.

    Note:
        - Uses self-hosted FAISS for text search (top 3 chunks).
        - Uses temperature=0.0 for consistent, factual responses.
    """
    try:
        # Self-hosted: text search via FAISS only; no image lookup on this path
        relevant_docs = await search_rag_documents(user_question, top_k=3)

        # Prepare context from relevant documents
        context = ""
        if relevant_docs:
            context = "\n\n".join([
                f"Source: {doc.get('source', 'Unknown')}\nContent: {doc.get('content', '')}"
                for doc in relevant_docs
            ])

        # Prepare system message
        system_message = f"""You are a helpful AI assistant that answers questions based on the provided document context.
Use the following context to answer the user's question. If the context doesn't contain enough information to answer the question, say so politely and ask for clarification.
Context from document:{context} Answer the user's question based on the provided context. Be helpful, accurate, and concise."""

        # System prompt, then prior turns, then the current question
        messages = [{"role": "system", "content": system_message}]
        messages.extend(chat_history)
        messages.append({"role": "user", "content": user_question})

        # Generate response
        response = await client.chat.completions.create(
            model="gpt-4.1-nano",
            messages=messages,
            temperature=0.0
        )
        response_text = response.choices[0].message.content.strip()
        # No images are retrieved on this path; always return an empty dict
        return response_text, {}
    except Exception as e:
        logger.error(f"Error generating chat response with images: {e}")
        return f"Sorry, I encountered an error while processing your question: {str(e)}", {}
def chat_with_rag(user_question: str, chat_history: list):
    """
    Wrapper function for the Gradio chat interface with image support.

    Validates that a document has been processed, runs the async
    intelligent-image chat pipeline via asyncio.run, appends the exchange to
    chat_history, and prepares the top retrieved image (written to a temp
    file) plus its summary for display.

    Args:
        user_question (str): The user's question or query.
        chat_history (list): Previous conversation messages in the format:
            [{"role": "user", "content": "..."}, {"role": "assistant", "content": "..."}].
            Mutated in place with the new user/assistant turns.

    Returns:
        tuple: A 5-tuple matching the Gradio output components:
            - input_value (str): "" to clear the input box (or a
              prompt-to-upload message when no document is processed yet)
            - chat_history (list): Updated chat history
            - image_path (str | None): Path to a temp file holding the most
              relevant image, or None when no image was retrieved
            - markdown_summary (str): Image summary for the markdown component
              (non-empty only when markdown_kv_enabled is True)
            - text_summary (str): Image summary for the plain-text component
              (non-empty only when markdown_kv_enabled is False)

    Note:
        - Reads globals current_rag_status and markdown_kv_enabled.
        - Decodes base64 image data into a NamedTemporaryFile with
          delete=False, so the temp file is NOT cleaned up automatically.
        - Only the first valid image is displayed (top-1 retrieval upstream).
        - On error, the error message is appended to chat_history and a
          blank 5-tuple is returned.
    """
    try:
        if current_rag_status["status"] != "success":
            # No processed document: show a hint in the input slot, leave history untouched
            return "Please upload and process a document first before asking questions.", chat_history, None, "", ""
        # Generate response with intelligent images (bridge sync Gradio handler -> async pipeline)
        response, retrieved_images = asyncio.run(generate_chat_response_with_intelligent_images(user_question, chat_history))
        # Update chat history
        chat_history.append({"role": "user", "content": user_question})
        chat_history.append({"role": "assistant", "content": response})
        # Convert images to display format with summaries
        image_path = None
        image_summary = ""
        if retrieved_images:
            # Get intelligent retrieval instance to fetch summaries
            from image_retrieval import get_intelligent_retrieval
            retrieval = get_intelligent_retrieval("selfhosted")
            # Get the first (and only) image
            for image_id, image_data in retrieved_images.items():
                if 'error' not in image_data and 'base64_data' in image_data:
                    # Create a temporary file with a short path for Gradio
                    import tempfile
                    import base64
                    try:
                        # Decode base64 and create temporary file
                        image_bytes = base64.b64decode(image_data['base64_data'])
                        # Create temp file with short name
                        with tempfile.NamedTemporaryFile(delete=False, suffix='.jpg', prefix='img_') as temp_file:
                            temp_file.write(image_bytes)
                            image_path = temp_file.name
                        # Get image summary
                        image_summary = asyncio.run(retrieval.get_image_summary(image_id))
                        if not image_summary:
                            image_summary = f"Image {image_id}: Visual content from document"
                        break  # Only process the first image since we only want top 1
                    except Exception as e:
                        logger.error(f"Error creating temp file for {image_id}: {e}")
                        continue
                else:
                    logger.warning(f"Skipping image {image_id}: {image_data.get('error', 'No base64_data')}")
        # Route image summary to appropriate display component based on Markdown-KV setting
        if markdown_kv_enabled:
            # Display in markdown component for table formatting support
            formatted_summary = image_summary if image_summary else "*No image summary available*"
            return "", chat_history, image_path, formatted_summary, ""
        else:
            # Display in text component for standard summaries
            return "", chat_history, image_path, "", image_summary
    except Exception as e:
        error_msg = f"Error: {str(e)}"
        logger.error(f"Error in chat_with_rag: {str(e)}")
        chat_history.append({"role": "user", "content": user_question})
        chat_history.append({"role": "assistant", "content": error_msg})
        return "", chat_history, None, "", ""
| async def auto_process_input(file, youtube_id, web_url) -> tuple: | |
| """ | |
| Automatically process input: extract text and create RAG. | |
| This is the main processing function that handles the complete pipeline from | |
| input (file, YouTube video, or web URL) to a fully functional RAG system. | |
| It supports multiple file types including PDF, PPTX, DOCX, and other formats, | |
| with special handling for documents containing images. | |
| Args: | |
| file: Uploaded file object (can be None). Supported formats: | |
| - PDF: Extracts text and images separately, generates image summaries | |
| - PPTX: Extracts text and images from slides, generates image summaries | |
| - DOCX: Extracts text and images, generates image summaries | |
| - Other formats: Standard text extraction | |
| youtube_id (str): YouTube video ID or URL (can be None or empty). | |
| web_url (str): Web URL to scrape content from (can be None or empty). | |
| Returns: | |
| tuple: A tuple containing: | |
| - extracted_text (str): The processed and enhanced text content | |
| - status_msg (str): Human-readable status message for the UI | |
| Raises: | |
| Exception: Logs errors and returns error message with status. | |
| Note: | |
| - Updates global variables: current_rag_status, current_document_name | |
| - For PDF/PPTX/DOCX: Extracts images, generates GPT summaries, merges with text | |
| - Creates RAG using self-hosted FAISS | |
| - Provides progress updates through callback functions | |
| - Handles file size limits and format validation | |
| Example: | |
| >>> text, status = await auto_process_input(file_obj, None, None) | |
| >>> print(f"Processing result: {status}") | |
| """ | |
| global current_rag_status, current_document_name | |
| try: | |
| # Determine input type and validate | |
| input_type = None | |
| source_name = "" | |
| if file is not None: | |
| input_type = "file" | |
| source_name = os.path.basename(file.name) if hasattr(file, 'name') else "document" | |
| elif youtube_id and youtube_id.strip(): | |
| input_type = "youtube" | |
| source_name = f"YouTube Video: {youtube_id.strip()}" | |
| elif web_url and web_url.strip(): | |
| input_type = "web" | |
| source_name = f"Web URL: {web_url.strip()}" | |
| else: | |
| return "No input provided", "Please provide a file, YouTube video, or web URL" | |
| # Clear old images and summaries from self-hosted (HF) storage first | |
| logger.info("Clearing old images and summaries from self-hosted storage...") | |
| from hf_storage import clear_all_images, clear_all_summaries | |
| from image_retrieval import clear_image_summaries | |
| await clear_all_images() | |
| await clear_all_summaries() | |
| await clear_image_summaries("selfhosted") | |
| # Update status | |
| current_rag_status = {"status": "processing", "message": f"Extracting text from {input_type}..."} | |
| status_msg = f"🔄 Extracting text from {input_type}..." | |
| # Check if it's a PDF or PPTX file for enhanced processing | |
| if file is not None and hasattr(file, 'name'): | |
| file_ext = os.path.splitext(file.name)[1].lower() | |
| if file_ext == ".pdf": | |
| # Extract images and text separately | |
| from file_preprocessing import extract_images_from_pdf, process_pdf_hybrid, summarize_image_with_gpt, capture_page_images_as_fallback | |
| # Define progress callback to update status | |
| async def update_progress(message): | |
| nonlocal status_msg | |
| status_msg = message | |
| current_rag_status["message"] = message | |
| # Use hybrid method: HTML + PDF images with Gemini | |
| await update_progress("🔄 Processing PDF with hybrid method (HTML + Images with Gemini)...") | |
| extracted_text = await process_pdf_hybrid(file.name, update_progress) | |
| # Extract images separately for display/retrieval | |
| await update_progress("📸 Extracting images from PDF for retrieval...") | |
| images_data = await extract_images_from_pdf(file.name, None, update_progress, "selfhosted") | |
| # Check if no images were found - if so, capture page images as fallback | |
| no_images_message = "" | |
| if not images_data['image_blob_urls']: | |
| await update_progress("📸 No extractable images found, capturing page images as fallback...") | |
| images_data = await capture_page_images_as_fallback(file.name, "selfhosted", update_progress) | |
| if not images_data['image_blob_urls']: | |
| no_images_message = "No images found in this document." | |
| # Step 3: Generate image summaries | |
| image_summaries = {} | |
| figure_metadata_dict = {} # Store figure metadata for index | |
| used_figures = set() # Track used figures for conflict resolution | |
| if images_data['image_blob_urls']: | |
| await update_progress("🤖 Generating image summaries with GPT...") | |
| total_images = len(images_data['image_blob_urls']) | |
| for i, (image_id, blob_url) in enumerate(images_data['image_blob_urls'].items(), 1): | |
| try: | |
| summary = await summarize_image_with_gpt( | |
| blob_url, | |
| image_id, | |
| "selfhosted", | |
| markdown_kv_enabled, | |
| document_text=extracted_text | |
| ) | |
| image_summaries[image_id] = summary | |
| # Process with figure index (merge visual elements and store) | |
| from image_retrieval import process_and_store_image_with_figure_index | |
| enhanced_summary, figure_metadata = await process_and_store_image_with_figure_index( | |
| image_id, summary, extracted_text, "selfhosted", used_figures | |
| ) | |
| # Update with enhanced summary | |
| image_summaries[image_id] = enhanced_summary | |
| # Store figure metadata if found | |
| if figure_metadata: | |
| figure_metadata_dict[image_id] = figure_metadata | |
| except Exception as e: | |
| logger.error(f"Error summarizing image {image_id}: {e}") | |
| image_summaries[image_id] = f"[Error summarizing image {image_id}: {str(e)}]" | |
| # Store summaries | |
| await update_progress("💾 Storing image summaries for intelligent retrieval...") | |
| await store_image_summaries_batch(image_summaries, "selfhosted", extracted_text, figure_metadata_dict) | |
| # Step 4: Merge text with image summaries | |
| await update_progress("🔗 Merging text with image summaries...") | |
| # Split text by pages and add image summaries | |
| text_sections = extracted_text.split("--- PAGE") | |
| enhanced_sections = [] | |
| for i, section in enumerate(text_sections): | |
| if not section.strip(): | |
| continue | |
| page_num = i | |
| if page_num in images_data['image_ids_by_page']: | |
| image_ids = images_data['image_ids_by_page'][page_num] | |
| image_summary_text = "\n\nImage Summaries:\n" | |
| for img_id in image_ids: | |
| if img_id in image_summaries: | |
| image_summary_text += f"- {img_id}: {image_summaries[img_id]}\n" | |
| else: | |
| image_summary_text += f"- {img_id}: [Image content from PDF]\n" | |
| enhanced_sections.append(f"--- PAGE{section}{image_summary_text}") | |
| else: | |
| enhanced_sections.append(f"--- PAGE{section}") | |
| final_text = "\n".join(enhanced_sections) | |
| # Create pipeline result format | |
| pipeline_result = { | |
| "status": "success", | |
| "vectorized_ready_text": final_text, | |
| "image_blob_urls": images_data['image_blob_urls'], | |
| "image_summaries": image_summaries, | |
| "total_pages": images_data['total_pages'], | |
| "total_images": len(images_data['image_blob_urls']), | |
| "no_images_message": no_images_message | |
| } | |
| if pipeline_result["status"] == "success": | |
| # Create RAG from the vectorized-ready text | |
| current_document_name = source_name | |
| source_info = source_name | |
| rag_result = await create_rag_from_text_selfhosted(pipeline_result["vectorized_ready_text"], source_info, update_progress) | |
| if rag_result["status"] == "success": | |
| current_rag_status = {"status": "success", "message": f"RAG created successfully (self-hosted) with {pipeline_result['total_images']} images"} | |
| status_msg = """✅ PDF processed successfully (self-hosted)!""" | |
| else: | |
| current_rag_status = {"status": "error", "message": rag_result['message']} | |
| status_msg = f"❌ RAG Creation Failed: {rag_result['message']}" | |
| return pipeline_result["vectorized_ready_text"], status_msg, pipeline_result.get("image_summaries", {}), pipeline_result.get("no_images_message", "") | |
| else: | |
| current_rag_status = {"status": "error", "message": pipeline_result['message']} | |
| return pipeline_result['message'], f"❌ PDF Pipeline Failed: {pipeline_result['message']}", {}, "" | |
| elif file_ext == ".pptx": | |
| # Extract images and text separately | |
| from file_preprocessing import extract_images_from_pptx, process_pptx, summarize_image_with_gpt | |
| # Define progress callback to update status | |
| async def update_progress(message): | |
| nonlocal status_msg | |
| status_msg = message | |
| current_rag_status["message"] = message | |
| # Step 1: Extract images | |
| await update_progress("📸 Extracting images from PPTX...") | |
| images_data = await extract_images_from_pptx(file.name, None, update_progress, "selfhosted") | |
| # Check if no images were found | |
| no_images_message = "" | |
| if not images_data['image_blob_urls']: | |
| no_images_message = "No images found in this document." | |
| # Step 3: Extract text | |
| await update_progress("📄 Extracting text from PPTX...") | |
| extracted_text = await process_pptx(file.name, update_progress) | |
| # Step 4: Generate image summaries | |
| image_summaries = {} | |
| if images_data['image_blob_urls']: | |
| await update_progress("🤖 Generating image summaries with GPT...") | |
| total_images = len(images_data['image_blob_urls']) | |
| figure_metadata_dict = {} # Store figure metadata for index | |
| used_figures = set() # Track used figures for conflict resolution | |
| for i, (image_id, blob_url) in enumerate(images_data['image_blob_urls'].items(), 1): | |
| try: | |
| # Get AI summary | |
| ai_summary = await summarize_image_with_gpt(blob_url, image_id, "selfhosted", markdown_kv_enabled, document_text=extracted_text) | |
| # Process with figure index (merge visual elements and store) | |
| from image_retrieval import process_and_store_image_with_figure_index | |
| enhanced_summary, figure_metadata = await process_and_store_image_with_figure_index( | |
| image_id, ai_summary, extracted_text, "selfhosted", used_figures | |
| ) | |
| image_summaries[image_id] = enhanced_summary | |
| # Store figure metadata if found | |
| if figure_metadata: | |
| figure_metadata_dict[image_id] = figure_metadata | |
| except Exception as e: | |
| logger.error(f"Error summarizing image {image_id}: {e}") | |
| image_summaries[image_id] = f"[Error summarizing image {image_id}: {str(e)}]" | |
| # Note: Summaries and figure index are already stored by process_and_store_image_with_figure_index | |
| # But we still call store_image_summaries_batch for backward compatibility | |
| await update_progress("💾 Storing image summaries for intelligent retrieval...") | |
| await store_image_summaries_batch(image_summaries, "selfhosted", extracted_text, figure_metadata_dict) | |
| # Step 4: Merge text with image summaries | |
| await update_progress("🔗 Merging text with image summaries...") | |
| # Split text by slides and add image summaries | |
| text_sections = extracted_text.split("=== SLIDE") | |
| enhanced_sections = [] | |
| for i, section in enumerate(text_sections): | |
| if not section.strip(): | |
| continue | |
| slide_num = i | |
| if slide_num in images_data['image_ids_by_slide']: | |
| image_ids = images_data['image_ids_by_slide'][slide_num] | |
| image_summary_text = "\n\nImage Summaries:\n" | |
| for img_id in image_ids: | |
| if img_id in image_summaries: | |
| image_summary_text += f"- {img_id}: {image_summaries[img_id]}\n" | |
| else: | |
| image_summary_text += f"- {img_id}: [Image content from PPTX]\n" | |
| enhanced_sections.append(f"=== SLIDE{section}{image_summary_text}") | |
| else: | |
| enhanced_sections.append(f"=== SLIDE{section}") | |
| final_text = "\n".join(enhanced_sections) | |
| # Create pipeline result format | |
| pipeline_result = { | |
| "status": "success", | |
| "vectorized_ready_text": final_text, | |
| "image_blob_urls": images_data['image_blob_urls'], | |
| "image_summaries": image_summaries, | |
| "total_slides": images_data['total_slides'], | |
| "total_images": len(images_data['image_blob_urls']), | |
| "no_images_message": no_images_message | |
| } | |
| if pipeline_result["status"] == "success": | |
| # Create RAG from the vectorized-ready text | |
| current_document_name = source_name | |
| source_info = source_name | |
| rag_result = await create_rag_from_text_selfhosted(pipeline_result["vectorized_ready_text"], source_info, update_progress) | |
| if rag_result["status"] == "success": | |
| current_rag_status = {"status": "success", "message": f"RAG created successfully (self-hosted) with {pipeline_result['total_images']} images"} | |
| status_msg = """✅ PPTX processed successfully (self-hosted)!""" | |
| else: | |
| current_rag_status = {"status": "error", "message": rag_result['message']} | |
| status_msg = f"❌ RAG Creation Failed: {rag_result['message']}" | |
| return pipeline_result["vectorized_ready_text"], status_msg, pipeline_result.get("image_summaries", {}), pipeline_result.get("no_images_message", "") | |
| else: | |
| current_rag_status = {"status": "error", "message": pipeline_result['message']} | |
| return pipeline_result['message'], f"❌ PPTX Pipeline Failed: {pipeline_result['message']}", {}, "" | |
| elif file_ext in [".doc", ".docx"]: | |
| # Extract images and text separately | |
| from file_preprocessing import extract_images_from_docx, process_docx, summarize_image_with_gpt | |
| # Define progress callback to update status | |
| async def update_progress(message): | |
| nonlocal status_msg | |
| status_msg = message | |
| current_rag_status["message"] = message | |
| # Step 1: Extract images | |
| await update_progress("📸 Extracting images from DOCX...") | |
| images_data = await extract_images_from_docx(file.name, None, update_progress, "selfhosted") | |
| # Check if no images were found | |
| no_images_message = "" | |
| if not images_data['image_blob_urls']: | |
| no_images_message = "No images found in this document." | |
| # Step 3: Extract text | |
| await update_progress("📄 Extracting text from DOCX...") | |
| extracted_text = await process_docx(file.name, update_progress) | |
| # Step 4: Generate image summaries | |
| image_summaries = {} | |
| if images_data['image_blob_urls']: | |
| await update_progress("🤖 Generating image summaries with GPT...") | |
| total_images = len(images_data['image_blob_urls']) | |
| figure_metadata_dict = {} # Store figure metadata for index | |
| used_figures = set() # Track used figures for conflict resolution | |
| for i, (image_id, blob_url) in enumerate(images_data['image_blob_urls'].items(), 1): | |
| try: | |
| # Get AI summary | |
| ai_summary = await summarize_image_with_gpt(blob_url, image_id, "selfhosted", markdown_kv_enabled, document_text=extracted_text) | |
| # Process with figure index (merge visual elements and store) | |
| from image_retrieval import process_and_store_image_with_figure_index | |
| enhanced_summary, figure_metadata = await process_and_store_image_with_figure_index( | |
| image_id, ai_summary, extracted_text, "selfhosted", used_figures | |
| ) | |
| image_summaries[image_id] = enhanced_summary | |
| # Store figure metadata if found | |
| if figure_metadata: | |
| figure_metadata_dict[image_id] = figure_metadata | |
| except Exception as e: | |
| logger.error(f"Error summarizing image {image_id}: {e}") | |
| image_summaries[image_id] = f"[Error summarizing image {image_id}: {str(e)}]" | |
| # Note: Summaries and figure index are already stored by process_and_store_image_with_figure_index | |
| # But we still call store_image_summaries_batch for backward compatibility | |
| await update_progress("💾 Storing image summaries for intelligent retrieval...") | |
| await store_image_summaries_batch(image_summaries, "selfhosted", extracted_text, figure_metadata_dict) | |
| # Step 4: Merge text with image summaries | |
| await update_progress("🔗 Merging text with image summaries...") | |
| # Add image summaries to the document text | |
| if images_data['image_ids']: | |
| image_summary_text = "\n\nImage Summaries:\n" | |
| for img_id in images_data['image_ids']: | |
| if img_id in image_summaries: | |
| image_summary_text += f"- {img_id}: {image_summaries[img_id]}\n" | |
| else: | |
| image_summary_text += f"- {img_id}: [Image content from DOCX]\n" | |
| final_text = extracted_text + image_summary_text | |
| else: | |
| final_text = extracted_text | |
| # Create pipeline result format | |
| pipeline_result = { | |
| "status": "success", | |
| "vectorized_ready_text": final_text, | |
| "image_blob_urls": images_data['image_blob_urls'], | |
| "image_summaries": image_summaries, | |
| "total_paragraphs": images_data['total_paragraphs'], | |
| "total_images": len(images_data['image_blob_urls']), | |
| "no_images_message": no_images_message | |
| } | |
| if pipeline_result["status"] == "success": | |
| # Create RAG from the vectorized-ready text | |
| current_document_name = source_name | |
| source_info = source_name | |
| rag_result = await create_rag_from_text_selfhosted(pipeline_result["vectorized_ready_text"], source_info, update_progress) | |
| if rag_result["status"] == "success": | |
| current_rag_status = {"status": "success", "message": f"RAG created successfully (self-hosted) with {pipeline_result['total_images']} images"} | |
| status_msg = """✅ DOCX processed successfully (self-hosted)!""" | |
| else: | |
| current_rag_status = {"status": "error", "message": rag_result['message']} | |
| status_msg = f"❌ RAG Creation Failed: {rag_result['message']}" | |
| return pipeline_result["vectorized_ready_text"], status_msg, pipeline_result.get("image_summaries", {}), pipeline_result.get("no_images_message", "") | |
| else: | |
| current_rag_status = {"status": "error", "message": pipeline_result['message']} | |
| return pipeline_result['message'], f"❌ DOCX Pipeline Failed: {pipeline_result['message']}", {}, "" | |
| # Handle web URL processing specially to get both text and image summaries | |
| image_summaries = {} | |
| display_text = "" | |
| if web_url and web_url.strip(): | |
| try: | |
| # Get the full result from process_web_url to access both text and image summaries | |
| from file_preprocessing import process_web_url | |
| web_result = await process_web_url(web_url.strip(), "selfhosted") | |
| if isinstance(web_result, dict): | |
| extracted_text = web_result["text"].strip() # Full content for RAG | |
| display_text = web_result.get("display_text", extracted_text).strip() # Clean text for UI | |
| image_summaries = web_result.get("image_summaries", {}) | |
| else: | |
| extracted_text = web_result.strip() | |
| display_text = extracted_text | |
| image_summaries = {} | |
| except Exception as e: | |
| logger.error(f"Error processing web URL: {e}") | |
| extracted_text = f"Error processing URL: {str(e)}" | |
| display_text = extracted_text | |
| image_summaries = {} | |
| else: | |
| # For non-web URLs, use the original processing | |
| extracted_text = await _extract_text_async(file, youtube_id, web_url, "selfhosted") | |
| display_text = extracted_text | |
| if extracted_text.startswith("Error"): | |
| current_rag_status = {"status": "error", "message": extracted_text} | |
| return extracted_text, f"❌ {extracted_text}", {}, "" | |
| # Update status | |
| current_rag_status = {"status": "processing", "message": "Creating RAG (self-hosted)..."} | |
| status_msg = "🤖 Creating RAG (self-hosted)..." | |
| # Create RAG with progress callback | |
| current_document_name = source_name | |
| source_info = source_name | |
| # Define progress callback to update status | |
| async def update_progress(message): | |
| nonlocal status_msg | |
| status_msg = message | |
| current_rag_status["message"] = message | |
| rag_result = await create_rag_from_text_selfhosted(extracted_text, source_info, update_progress) | |
| if rag_result["status"] == "success": | |
| current_rag_status = {"status": "success", "message": "RAG created successfully (self-hosted)"} | |
| status_msg = f"""✅ {input_type.title()} processed successfully (self-hosted)!""" | |
| else: | |
| current_rag_status = {"status": "error", "message": rag_result['message']} | |
| status_msg = f"❌ RAG Creation Failed: {rag_result['message']}" | |
| # Return appropriate no_images_message based on input type | |
| no_images_message = "" | |
| if not image_summaries: | |
| if web_url and web_url.strip(): | |
| no_images_message = "No images found on this web page." | |
| elif youtube_id and youtube_id.strip(): | |
| no_images_message = "No images found in this YouTube video." | |
| else: | |
| no_images_message = "No images found in this document." | |
| return display_text, status_msg, image_summaries, no_images_message | |
| except Exception as e: | |
| error_msg = f"Error processing {input_type if 'input_type' in locals() else 'input'}: {str(e)}" | |
| current_rag_status = {"status": "error", "message": error_msg} | |
| return error_msg, f"❌ {error_msg}", {}, "" | |
def auto_process_input_sync(file, youtube_id, web_url):
    """
    Blocking bridge to the async ``auto_process_input`` pipeline.

    Gradio's event handlers in this app are synchronous, so this helper
    drives the coroutine to completion with ``asyncio.run()`` and hands
    back the pipeline's results unchanged.

    Args:
        file: Uploaded file object (may be None).
        youtube_id (str): YouTube video ID or URL (may be None or empty).
        web_url (str): Web URL to scrape content from (may be None or empty).

    Returns:
        tuple: Exactly what ``auto_process_input`` produces:
            - extracted_text (str): Processed/enhanced text content
            - status_msg (str): Human-readable status message for the UI
            - image_summaries (dict): Image-id -> summary mapping
            - no_images_message (str): Message shown when no images were found

    Example:
        >>> text, status, images, no_images = auto_process_input_sync(file_obj, None, None)
        >>> print(f"Sync processing result: {status}")
    """
    pipeline = auto_process_input(file, youtube_id, web_url)
    return asyncio.run(pipeline)
| # Unified APIs for MCP exposure | |
async def create_rag_from_text_unified(text: str, source_info: str, provider: str = "selfhosted", progress_callback=None):
    """
    Build a RAG index from raw text via the self-hosted FAISS backend.

    Args:
        text (str): The text content to index.
        source_info (str): Origin of the text (filename, URL, etc.).
        provider (str): Accepted only for API compatibility; the value is
            ignored because this deployment is self-hosted only.
        progress_callback: Optional async callable receiving progress strings.

    Returns:
        dict: Result dictionary with ``status`` and ``message`` keys;
            on any exception, ``{"status": "error", "message": ...}``.
    """
    try:
        result = await create_rag_from_text_selfhosted(text, source_info, progress_callback)
    except Exception as exc:
        logger.error(f"Error in create_rag_from_text: {exc}")
        return {"status": "error", "message": str(exc)}
    return result
async def search_rag_documents_unified(query: str, top_k: int = 5, provider: str = "selfhosted") -> list:
    """
    Search the self-hosted FAISS index for documents relevant to a query.

    Args:
        query (str): The search query string.
        top_k (int): Maximum number of results to return.
        provider (str): Accepted only for API compatibility; the value is
            ignored because this deployment is self-hosted only.

    Returns:
        list: Document results with content, source, title, chunk_index,
            and total_chunks; an empty list on failure.
    """
    try:
        hits = await search_rag_documents_selfhosted(query, top_k)
    except Exception as exc:
        logger.error(f"Error in search_rag_documents: {exc}")
        return []
    return hits
async def extract_text_async_unified(file_path=None, youtube_id=None, web_url=None, provider: str = "selfhosted"):
    """
    Unified API to extract text from a file, a YouTube video, or a web URL.

    Args:
        file_path (str, optional): Path to a local file to extract text from.
            Silently ignored if the path does not exist on disk.
        youtube_id (str, optional): YouTube video ID or URL.
        web_url (str, optional): Web URL to scrape and extract text from.
        provider (str): Kept for API compatibility only; the value is ignored
            and the self-hosted backend is always used. Defaults to "selfhosted".

    Returns:
        str: Extracted text content, or an "Error: ..." message if extraction fails.

    Example:
        >>> text = await extract_text_async_unified("/path/to/document.pdf")
        >>> print(f"Extracted {len(text)} characters")
    """
    try:
        # _extract_text_async expects a file-like object exposing .name and
        # .size (as Gradio uploads do), so wrap a bare path in a minimal
        # stand-in object.
        file_obj = None
        if file_path and os.path.exists(file_path):
            class _PathFile:
                def __init__(self, path):
                    self.name = path
                    self.size = os.path.getsize(path)
            file_obj = _PathFile(file_path)
        return await _extract_text_async(file_obj, youtube_id, web_url, "selfhosted")
    except Exception as e:
        return f"Error: {str(e)}"
| # Synchronous wrappers for MCP exposure | |
def create_rag_from_text_sync(text: str, source_info: str, provider: str = "selfhosted"):
    """
    Synchronous wrapper for create_rag_from_text_unified.

    Runs the async unified API to completion with ``asyncio.run()`` so it can
    be exposed through synchronous interfaces (Gradio/MCP).

    Args:
        text (str): The text content to create RAG from.
        source_info (str): Information about the source of the text.
        provider (str): Kept for API compatibility only; ignored downstream
            (self-hosted backend is always used). Defaults to "selfhosted".

    Returns:
        dict: Result dictionary containing status and message.
    """
    return asyncio.run(create_rag_from_text_unified(text, source_info, provider))
def search_rag_documents_sync(query: str, top_k: int = 5, provider: str = "selfhosted"):
    """
    Synchronous wrapper for search_rag_documents_unified.

    Runs the async unified API to completion with ``asyncio.run()`` so it can
    be exposed through synchronous interfaces (Gradio/MCP).

    Args:
        query (str): The search query string.
        top_k (int): Maximum number of results to return.
        provider (str): Kept for API compatibility only; ignored downstream
            (self-hosted backend is always used). Defaults to "selfhosted".

    Returns:
        list: A list of document results (empty list on failure).
    """
    return asyncio.run(search_rag_documents_unified(query, top_k, provider))
def extract_text_sync(file_path=None, youtube_id=None, web_url=None, provider: str = "selfhosted"):
    """
    Synchronous wrapper for extract_text_async_unified.

    This function provides a synchronous interface to the async text
    extraction pipeline, making it compatible with Gradio's synchronous
    event handlers and MCP (Model Context Protocol) exposure.

    Args:
        file_path (str, optional): Path to the file to extract text from.
        youtube_id (str, optional): YouTube video ID or URL.
        web_url (str, optional): Web URL to scrape and extract text from.
        provider (str): Kept for API compatibility only; ignored downstream
            (self-hosted backend is always used). Defaults to "selfhosted".

    Returns:
        str: Extracted text content, or an "Error: ..." message if extraction fails.

    Note:
        - The unified API wraps file_path in a file-like object before extraction.
        - Primarily used for MCP server exposure and testing.

    Example:
        >>> text = extract_text_sync("/path/to/document.pdf")
        >>> print(f"Extracted {len(text)} characters")
    """
    return asyncio.run(extract_text_async_unified(file_path, youtube_id, web_url, provider))
def clear_s3_images_sync():
    """
    Synchronously clear all uploaded images from the S3 bucket.

    Bridges the async ``clear_images_from_s3`` helper into Gradio's
    synchronous event-handler world via ``asyncio.run()``.

    Returns:
        dict: Result dictionary containing:
            - status (str): "success" or "error"
            - message (str): Success message or error description

    Note:
        - Requires AWS credentials to be configured in environment variables.
        - Clears images from the pdf_images, pptx_images, docx_images,
          and web_images folders.

    Example:
        >>> result = clear_s3_images_sync()
        >>> print(f"Image clearing: {result['status']}")
    """
    try:
        from aws_functions import clear_images_from_s3
        cleared = asyncio.run(clear_images_from_s3())
    except Exception as e:
        return {"status": "error", "message": f"Error clearing S3 images: {str(e)}"}
    # Only the import and the awaited call can raise; the helper reports
    # success/failure via its boolean return value.
    if cleared:
        return {"status": "success", "message": "Successfully cleared all images from S3 bucket"}
    return {"status": "error", "message": "Failed to clear images from S3 bucket"}
| # Create Gradio interface | |
def create_gradio_app():
    """
    Create and configure the Gradio web application interface.
    This function builds a comprehensive web interface for the RAG chat application,
    including file upload, YouTube/URL input, chat interface, and image display.
    It also exposes core functions for MCP (Model Context Protocol) server integration.
    Returns:
        gr.Blocks: A configured Gradio Blocks application with:
            - File upload interface supporting multiple formats
            - YouTube video and web URL input fields
            - Self-hosted RAG (FAISS + HF storage)
            - Real-time processing status display
            - Interactive chat interface with message history
            - Most-relevant-image display for retrieved document images
            - Hidden MCP-exposed functions for external integration
    Features:
        - Responsive design with custom CSS styling
        - Automatic input processing on file/URL change
        - Progress indicators and status updates
        - Image display with preview and download capabilities
        - Chat history management with clear functionality
        - MCP server compatibility for external tool integration
    Supported File Types:
        - Documents: PDF, DOC, DOCX, TXT, PPTX, XLSX
        - Media: MP3, WAV, MP4
        - Images: JPG, JPEG, PNG
        - CAD: DXF
    Example:
        >>> app = create_gradio_app()
        >>> app.launch(server_name="0.0.0.0", server_port=7860)
    """
    # Custom CSS: enlarge the file-upload drop zone so it lines up with the
    # URL inputs displayed beside it.
    css = """
    #file-upload {
        min-height: 255px !important;
    }
    """
    with gr.Blocks(
        title="Document Chat Assistant",
        theme=gr.themes.Soft(),
        css=css
    ) as app:
        with gr.Tab("Create RAG"):
            with gr.Row():
                with gr.Column(scale=2):
                    # Drives the module-level markdown_kv_enabled flag via
                    # update_markdown_kv_toggle (registered below).
                    markdown_kv_toggle = gr.Checkbox(
                        label="🔄 Enable Markdown-KV (Table) Conversion",
                        value=False,
                        info="When enabled, table images will be converted to markdown format"
                    )
                with gr.Column(scale=1):
                    gr.Markdown(
                        """
                        **⚠️ File size limit:** Maximum 20 MB per file
                        **🚀 Auto Processing:** Upload, paste YouTube or Web URL!
                        """
                    )
            with gr.Row():
                with gr.Column():
                    with gr.Row():
                        with gr.Column(scale=2):
                            file_input = gr.File(
                                label="📁 Upload Document",
                                file_types=[".pdf", ".doc", ".docx", ".txt", ".pptx", ".xlsx", ".mp3", ".wav", ".mp4", ".dxf", ".jpg", ".jpeg", ".png"],
                                height=255,
                                elem_id="file-upload"
                            )
                        with gr.Column(scale=1):
                            youtube_input = gr.Textbox(
                                label="🎥 YouTube Video",
                                placeholder="Enter YouTube URL or ID",
                                lines=1,
                            )
                            web_url_input = gr.Textbox(
                                label="🌐 Web URL",
                                placeholder="Enter web URL",
                                lines=1,
                            )
                            clear_file_btn = gr.Button(
                                "🗑️ Clear All",
                                variant="secondary",
                                size="lg",
                            )
                    # Inline status area updated by the change handlers below.
                    notification_display = gr.Markdown("")
                    # Hidden textbox tracking which document is currently loaded;
                    # kept invisible but wired as a handler output.
                    doc_status = gr.Textbox(
                        label="Current Document",
                        value="No document processed yet",
                        interactive=False,
                        lines=2,
                        visible=False
                    )
            with gr.Row():
                with gr.Column():
                    gr.Markdown("### 📄 Extracted Text")
                    extracted_text_display = gr.Textbox(
                        label="Document Content",
                        lines=10,
                        max_lines=15,
                        show_copy_button=True,
                        container=True,
                        interactive=False,
                        placeholder="Extracted text will appear here after processing...",
                    )
                with gr.Column():
                    gr.Markdown("### 🖼️ Image Descriptions")
                    image_descriptions_display = gr.Textbox(
                        label="AI-Generated Image Summaries",
                        lines=10,
                        max_lines=15,
                        show_copy_button=True,
                        container=True,
                        interactive=False,
                        placeholder="Image descriptions will appear here after processing...",
                    )
        with gr.Tab("Chat"):
            with gr.Row():
                with gr.Column(scale=2):
                    gr.Markdown("### 💬 Chat with Your Document")
                    chatbot = gr.Chatbot(
                        label="Chat History",
                        height=550,
                        type="messages",
                        allow_tags=False
                    )
                    msg_input = gr.Textbox(
                        label="Your Question",
                        placeholder="Ask a question about your uploaded document...",
                        lines=3,
                    )
                    with gr.Row():
                        send_btn = gr.Button(
                            "Send",
                            variant="primary",
                            size="lg",
                        )
                        clear_chat_btn = gr.Button(
                            "🗑️ Clear Chat",
                            variant="secondary",
                            size="lg"
                        )
                with gr.Column(scale=1):
                    gr.Markdown("### 📸 Most Relevant Image")
                    image_display = gr.Image(
                        label="",
                        height=250,
                        show_label=False,
                        container=True,
                        show_download_button=True,
                    )
                    gr.Markdown("### 📝 AI-Generated Summary")
                    # Two alternative summary views: markdown (shown when the
                    # Markdown-KV toggle is on) and plain text (the default).
                    image_summary_markdown = gr.Markdown(
                        value="*Image summary will appear here...*",
                        visible=False
                    )
                    image_summary_text = gr.Textbox(
                        label="",
                        lines=20,
                        max_lines=20,
                        show_copy_button=True,
                        container=True,
                        interactive=False,
                        placeholder="Image summary will appear here...",
                        visible=True
                    )
        # Event handlers for automatic input processing
        def process_input_and_update_status(file, youtube_id, web_url, chat_history):
            """
            Process input and return status only (no text preview).
            This function handles the automatic processing of user inputs (file upload,
            YouTube URL, or web URL) and updates the UI with processing status.
            It clears the chat history when new input is provided and provides
            real-time feedback on the processing progress.
            Args:
                file: Uploaded file object (can be None).
                youtube_id (str): YouTube video ID or URL (can be None or empty).
                web_url (str): Web URL to scrape content from (can be None or empty).
                chat_history (list): Current chat history (will be cleared for new input).
            Returns:
                tuple: An 8-tuple matching this handler's output components:
                    - status_msg (str): Processing status for notification_display
                    - doc_status_msg (str): Document status for doc_status
                    - cleared_chat (list): Emptied chat history for chatbot
                    - None: Cleared image_display
                    - "": Cleared image_summary_markdown
                    - "": Cleared image_summary_text
                    - extracted_text (str): Extracted text content
                    - image_descriptions (str): Formatted image descriptions
            Note:
                - Validates that at least one input is provided
                - Clears chat history when new input is processed
                - Reads the module-level RAG status to build the document message
                - Handles error states and provides appropriate feedback
            """
            # Check if any input is provided
            if file is None and (not youtube_id or not youtube_id.strip()) and (not web_url or not web_url.strip()):
                return "No input provided", "No document processed yet", chat_history, None, "", "", "", ""
            # Clear chat history when new input is provided
            cleared_chat = []
            # Process input (blocking; runs the async pipeline to completion)
            extracted_text, status_msg, image_summaries, no_images_message = auto_process_input_sync(file, youtube_id, web_url)
            # Format image descriptions for display
            image_descriptions = ""
            if image_summaries:
                image_descriptions = "AI-Generated Image Descriptions:\n\n"
                for image_id, description in image_summaries.items():
                    image_descriptions += f"🖼️ {image_id}:\n{description}\n\n"
            elif no_images_message:
                image_descriptions = no_images_message
            else:
                image_descriptions = "No images found in this document."
            # Update document status from the module-level RAG state
            global current_document_name, current_rag_status
            if current_rag_status["status"] == "success":
                doc_status_msg = f"✅ Ready: {current_document_name}"
            elif current_rag_status["status"] == "processing":
                doc_status_msg = f"🔄 Processing: {current_document_name}"
            elif current_rag_status["status"] == "error":
                doc_status_msg = f"❌ Error: {current_rag_status['message']}"
            else:
                doc_status_msg = "No document processed yet"
            # Keep AI-Generated Summary empty during document processing - only populate during chat
            return status_msg, doc_status_msg, cleared_chat, None, "", "", extracted_text, image_descriptions
        # Clear function
        def clear_all_inputs():
            """
            Clear all input fields and reset the interface.
            This function resets all input fields (file upload, YouTube URL,
            web URL) and clears the chat history, image display, extracted text,
            and image descriptions, providing a clean slate for new document
            processing. It also resets the module-level RAG state.
            Returns:
                tuple: A 9-tuple matching the clear handler's outputs:
                    - None: Cleared file input
                    - "": Empty YouTube input string
                    - "": Empty web URL input string
                    - []: Empty chat history list
                    - None: Cleared image display
                    - "": Cleared markdown summary
                    - "": Cleared text summary
                    - "": Empty extracted text
                    - "": Empty image descriptions
            Note:
                This function is called when the "Clear All" button is clicked,
                providing a quick way to reset the entire interface.
            """
            # Reset global state
            global current_rag_status, current_document_name
            current_rag_status = {"status": "none", "message": "No document processed yet"}
            current_document_name = ""
            # Reset all components to initial state
            return None, "", "", [], None, "", "", "", ""
        # Event handlers for all input types: any of the three inputs changing
        # re-runs the full processing pipeline.
        file_input.change(
            fn=process_input_and_update_status,
            inputs=[file_input, youtube_input, web_url_input, chatbot],
            outputs=[notification_display, doc_status, chatbot, image_display, image_summary_markdown, image_summary_text, extracted_text_display, image_descriptions_display],
            show_progress=True,
            show_api=False
        )
        youtube_input.change(
            fn=process_input_and_update_status,
            inputs=[file_input, youtube_input, web_url_input, chatbot],
            outputs=[notification_display, doc_status, chatbot, image_display, image_summary_markdown, image_summary_text, extracted_text_display, image_descriptions_display],
            show_progress=True,
            show_api=False
        )
        web_url_input.change(
            fn=process_input_and_update_status,
            inputs=[file_input, youtube_input, web_url_input, chatbot],
            outputs=[notification_display, doc_status, chatbot, image_display, image_summary_markdown, image_summary_text, extracted_text_display, image_descriptions_display],
            show_progress=True,
            show_api=False
        )
        # Markdown-KV toggle handler
        def update_markdown_kv_toggle(enabled):
            """Flip the module-level markdown_kv_enabled flag and swap the
            summary view between the markdown and plain-text components."""
            global markdown_kv_enabled
            markdown_kv_enabled = enabled
            status = "ENABLED" if enabled else "DISABLED"
            status_msg = f"Markdown-KV (Table) Conversion {status}. {'Table images will be converted to markdown format.' if enabled else 'Standard image summaries will be used.'}"
            # Return: status message, markdown visibility, text visibility
            return (
                status_msg,
                gr.update(visible=enabled),  # markdown component
                gr.update(visible=not enabled)  # text component
            )
        markdown_kv_toggle.change(
            fn=update_markdown_kv_toggle,
            inputs=[markdown_kv_toggle],
            outputs=[notification_display, image_summary_markdown, image_summary_text],
            show_api=False
        )
        # Clear button handler
        clear_file_btn.click(
            fn=clear_all_inputs,
            inputs=[],
            outputs=[file_input, youtube_input, web_url_input, chatbot, image_display, image_summary_markdown, image_summary_text, extracted_text_display, image_descriptions_display],
            show_api=False
        )
        # Event handlers for chat interface
        def clear_chat():
            """
            Clear the chat history and image display.
            This function resets the chat interface by clearing the conversation
            history and removing any displayed images and summaries, while keeping
            the current document processing state intact.
            Returns:
                tuple: A 4-tuple matching the clear-chat handler's outputs:
                    - []: Empty chat history list
                    - None: Cleared image display
                    - "": Cleared markdown summary
                    - "": Cleared text summary
            Note:
                - Only clears the chat interface, not the document processing state
                - Called when the "Clear Chat" button is clicked
                - Preserves the current RAG system and document status
            """
            return [], None, "", ""
        # Chat functionality (chat_with_rag is defined earlier in this module)
        send_btn.click(
            fn=chat_with_rag,
            inputs=[msg_input, chatbot],
            outputs=[msg_input, chatbot, image_display, image_summary_markdown, image_summary_text],
            show_progress=True,
            show_api=False
        )
        msg_input.submit(
            fn=chat_with_rag,
            inputs=[msg_input, chatbot],
            outputs=[msg_input, chatbot, image_display, image_summary_markdown, image_summary_text],
            show_progress=True,
            show_api=False
        )
        clear_chat_btn.click(
            fn=clear_chat,
            inputs=[],
            outputs=[chatbot, image_display, image_summary_markdown, image_summary_text],
            show_api=False
        )
        # Add core functions for MCP exposure (hidden from UI)
        with gr.Tab("Core Functions", visible=False):
            # These functions are only for MCP exposure, not visible in UI
            # Unified extract_text_async API
            gr.Interface(
                fn=extract_text_sync,
                inputs=[
                    gr.Textbox(label="File Path", visible=False),
                    gr.Textbox(label="YouTube ID", visible=False),
                    gr.Textbox(label="Web URL", visible=False),
                    # NOTE(review): choices list azure/aws but this deployment is
                    # self-hosted only; the provider value is ignored downstream.
                    gr.Dropdown(label="Provider", choices=["azure", "aws"], value="azure", visible=False)
                ],
                outputs=gr.Textbox(label="Extracted Text", visible=False),
                title="extract_text_async",
                description="Extract text from files, YouTube videos, or web URLs using specified provider",
                api_name="extract_text_async"
            )
            # Unified create_rag_from_text API
            gr.Interface(
                fn=create_rag_from_text_sync,
                inputs=[
                    gr.Textbox(label="Text", visible=False),
                    gr.Textbox(label="Source Info", visible=False),
                    # NOTE(review): provider value is ignored downstream (self-hosted only).
                    gr.Dropdown(label="Provider", choices=["azure", "aws"], value="azure", visible=False)
                ],
                outputs=gr.JSON(label="RAG Result", visible=False),
                title="create_rag_from_text",
                description="Build RAG from text (self-hosted FAISS)",
                api_name="create_rag_from_text"
            )
            # Unified search_rag_documents API
            gr.Interface(
                fn=search_rag_documents_sync,
                inputs=[
                    gr.Textbox(label="Query", visible=False),
                    gr.Slider(label="Top K", minimum=1, maximum=20, value=5, visible=False),
                    # NOTE(review): provider value is ignored downstream (self-hosted only).
                    gr.Dropdown(label="Provider", choices=["azure", "aws"], value="azure", visible=False)
                ],
                outputs=gr.JSON(label="Search Results", visible=False),
                title="search_rag_documents",
                description="Search RAG documents (self-hosted FAISS)",
                api_name="search_rag_documents"
            )
    return app
# Launch the app
if __name__ == "__main__":
    # Build the UI and serve it with the MCP server enabled so the hidden
    # core functions are reachable by external tools.
    demo = create_gradio_app()
    demo.launch(
        share=True,
        mcp_server=True,
        server_name="0.0.0.0",
        server_port=7860,
    )