import os import json import logging import gradio as gr import asyncio from openai import AsyncOpenAI from dotenv import load_dotenv from file_preprocessing import * from file_preprocessing import _extract_text_async from image_retrieval import intelligent_search_images, store_image_summaries_batch, process_and_store_image_with_figure_index from selfHosted_functions import * # Set up logging logging.basicConfig(level=logging.WARNING) # Reduced from INFO to WARNING logger = logging.getLogger(__name__) logging.getLogger("openai").setLevel(logging.WARNING) logging.getLogger("httpx").setLevel(logging.WARNING) #load environment variables load_dotenv() client = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY")) # Global variables for RAG state (self-hosted only: FAISS + HF storage) current_rag_status = {"status": "none", "message": "No document processed yet"} current_document_name = "" markdown_kv_enabled = False # Toggle for Markdown-KV (table) conversion async def search_rag_documents(query: str, top_k: int = 5, method_suffix: str = "") -> list: """ Search for relevant documents using the self-hosted FAISS index. Args: query (str): The search query string to find relevant documents. top_k (int, optional): Maximum number of results to return. Defaults to 5. method_suffix (str, optional): Optional suffix for index name. Returns: list: A list of document results containing content, source, title, chunk_index, and total_chunks information. """ try: return await search_rag_documents_selfhosted(query, top_k, method_suffix) except Exception as e: logger.error(f"Error searching RAG documents: {e}") return [] async def generate_chat_response(user_question: str, chat_history: list) -> str: """ Generate chatbot response using GPT-4.1-nano with RAG context. This function creates an AI-powered response by first searching for relevant document chunks using the user's question, then using those chunks as context for the GPT-4.1-nano model to generate a comprehensive answer. 
The response is based on the uploaded and processed documents. Args: user_question (str): The user's question or query. chat_history (list): List of previous conversation messages in the format: [{"role": "user", "content": "..."}, {"role": "assistant", "content": "..."}] Returns: str: The AI-generated response based on the RAG context and chat history. Returns an error message if processing fails. Raises: Exception: Logs errors and returns error message instead of raising. Note: - Uses self-hosted FAISS for document search - Searches for top 3 most relevant document chunks - Uses temperature=0.0 for consistent, factual responses - Limited to 1000 max tokens for concise responses Example: >>> chat_history = [{"role": "user", "content": "What is machine learning?"}] >>> response = await generate_chat_response("How does it work?", chat_history) >>> print(response) """ try: # Search for relevant context relevant_docs = await search_rag_documents(user_question, top_k=3) # Prepare context from relevant documents context = "" if relevant_docs: context = "\n\n".join([ f"Source: {doc.get('source', 'Unknown')}\nContent: {doc.get('content', '')}" for doc in relevant_docs ]) # Prepare system message system_message = f"""You are a helpful AI assistant that answers questions based on the provided document context. Use the following context to answer the user's question. If the context doesn't contain enough information to answer the question, say so politely and ask for clarification. Context from document:{context} Answer the user's question based on the provided context. 
Be helpful, accurate, and concise.""" # Prepare messages messages = [{"role": "system", "content": system_message}] # Add chat history for message in chat_history: messages.append(message) # Add current question messages.append({"role": "user", "content": user_question}) # Generate response response = await client.chat.completions.create( model="gpt-4.1-nano", messages=messages, temperature=0.0 ) return response.choices[0].message.content.strip() except Exception as e: logger.error(f"Error generating chat response: {e}") return f"Sorry, I encountered an error while processing your question: {str(e)}" async def generate_chat_response_with_intelligent_images(user_question: str, chat_history: list) -> tuple: """ Generate chatbot response with intelligent image retrieval support. This enhanced version uses intelligent image retrieval to show only the most relevant images based on similarity scoring between user query and image summaries. Args: user_question (str): The user's question or query. chat_history (list): List of previous conversation messages. 
Returns: tuple: A tuple containing: - response_text (str): The AI-generated response based on RAG context - retrieved_images (dict): Dictionary of intelligently retrieved images """ try: # First, try intelligent image search intelligent_images = await intelligent_search_images(user_question, "selfhosted", top_k=1) if intelligent_images: # Use intelligent image search results logger.info(f"Found {len(intelligent_images)} relevant images using intelligent search") # Search for relevant text context relevant_docs = await search_rag_documents(user_question, top_k=3) # Prepare context from relevant documents text_context = "" if relevant_docs: text_context = "\n\n".join([ f"Source: {doc.get('source', 'Unknown')}\nContent: {doc.get('content', '')}" for doc in relevant_docs ]) # Fetch image summaries and add to context image_context = "" if intelligent_images: from image_retrieval import get_intelligent_retrieval retrieval = get_intelligent_retrieval("selfhosted") image_summaries_list = [] for image_id, image_data in intelligent_images.items(): try: # Get image summary summary = await retrieval.get_image_summary(image_id) if summary: # Format image summary with image ID for clarity image_summaries_list.append(f"Image {image_id}:\n{summary}") logger.info(f"Included image summary for {image_id} in LLM context") except Exception as e: logger.warning(f"Could not retrieve summary for {image_id}: {e}") if image_summaries_list: image_context = "\n\n--- Context from Retrieved Images ---\n" + "\n\n".join(image_summaries_list) # Combine text and image context full_context = "" if text_context: full_context = f"--- Context from Document Text ---\n{text_context}" if image_context: if full_context: full_context += "\n\n" + image_context else: full_context = image_context # Prepare system message system_message = f"""You are a helpful AI assistant that answers questions based on the provided document context. Use the following context to answer the user's question. 
The context includes both text from the document and descriptions of retrieved images. If the context doesn't contain enough information to answer the question, say so politely and ask for clarification. Context:{full_context if full_context else "No context available"} Answer the user's question based on the provided context. Be helpful, accurate, and concise. When referring to figures or images, use the information from the image context section.""" # Prepare messages messages = [{"role": "system", "content": system_message}] # Add chat history for message in chat_history: messages.append(message) # Add current question messages.append({"role": "user", "content": user_question}) # Generate response response = await client.chat.completions.create( model="gpt-4.1-nano", messages=messages, temperature=0.0 ) response_text = response.choices[0].message.content.strip() # Return response text and associated images (no duplicate message) return response_text, intelligent_images else: # Fallback to original search if no intelligent images found logger.info("No relevant images found with intelligent search, falling back to original method") return await generate_chat_response(user_question, chat_history), {} except Exception as e: logger.error(f"Error in intelligent image search: {e}") # Fallback to original method return await generate_chat_response(user_question, chat_history), {} async def generate_chat_response_with_images(user_question: str, chat_history: list) -> tuple: """ Generate chatbot response with image retrieval support. This enhanced version of the chat response function not only searches for relevant text chunks but also retrieves associated images from the documents. It's particularly useful for documents containing visual content like diagrams, charts, or illustrations that can provide additional context for the user's question. Args: user_question (str): The user's question or query. 
chat_history (list): List of previous conversation messages in the format: [{"role": "user", "content": "..."}, {"role": "assistant", "content": "..."}] Returns: tuple: A tuple containing: - response_text (str): The AI-generated response based on RAG context - retrieved_images (dict): Dictionary of retrieved images with metadata Raises: Exception: Logs errors and returns error message with empty images dict. Note: - Uses self-hosted FAISS for text search and HF storage for image retrieval - Images are returned as base64-encoded data with metadata - Searches for top 3 most relevant document chunks Example: >>> chat_history = [] >>> response, images = await generate_chat_response_with_images("Show me the diagram", chat_history) >>> print(f"Response: {response}, Images found: {len(images)}") """ try: # Search for relevant context with images # Self-hosted: text search via FAISS; images via intelligent search if needed relevant_docs = await search_rag_documents(user_question, top_k=3) search_result = {"text_chunks": relevant_docs, "images": {}, "total_chunks": len(relevant_docs), "total_images": 0} # Prepare context from relevant documents context = "" if search_result.get("text_chunks"): context = "\n\n".join([ f"Source: {doc.get('source', 'Unknown')}\nContent: {doc.get('content', '')}" for doc in search_result["text_chunks"] ]) # Prepare system message system_message = f"""You are a helpful AI assistant that answers questions based on the provided document context. Use the following context to answer the user's question. If the context doesn't contain enough information to answer the question, say so politely and ask for clarification. Context from document:{context} Answer the user's question based on the provided context. 
Be helpful, accurate, and concise.""" # Prepare messages messages = [{"role": "system", "content": system_message}] # Add chat history for message in chat_history: messages.append(message) # Add current question messages.append({"role": "user", "content": user_question}) # Generate response response = await client.chat.completions.create( model="gpt-4.1-nano", messages=messages, temperature=0.0 ) response_text = response.choices[0].message.content.strip() # Return response text and associated images return response_text, search_result.get("images", {}) except Exception as e: logger.error(f"Error generating chat response with images: {e}") return f"Sorry, I encountered an error while processing your question: {str(e)}", {} def chat_with_rag(user_question: str, chat_history: list): """ Wrapper function for chat interface with image support and summaries. This function serves as the main interface for the chat functionality, handling the complete flow from user question to response with image display. It validates that a document has been processed, generates the AI response with image retrieval, updates the chat history, and formats images with their summaries for display in the Gradio interface. Args: user_question (str): The user's question or query. chat_history (list): List of previous conversation messages in the format: [{"role": "user", "content": "..."}, {"role": "assistant", "content": "..."}] Returns: tuple: A tuple containing: - empty_string (str): Empty string to clear the input field - updated_chat_history (list): Updated chat history with new messages - image_path (str): Path to the most relevant image file - image_summary (str): AI-generated summary of the image Raises: Exception: Logs errors and adds error message to chat history. 
Note: - Validates that a document has been successfully processed - Creates temporary files for image display in Gradio - Handles base64 image decoding and file creation - Retrieves and includes image summaries for each image - Updates global chat history state - Returns empty string to clear the input field after processing Example: >>> chat_history = [] >>> empty_input, history, image, summary = chat_with_rag("What does this chart show?", chat_history) >>> print(f"Chat updated: {len(history)} messages, image: {image}, summary: {summary}") """ try: if current_rag_status["status"] != "success": return "Please upload and process a document first before asking questions.", chat_history, None, "", "" # Generate response with intelligent images response, retrieved_images = asyncio.run(generate_chat_response_with_intelligent_images(user_question, chat_history)) # Update chat history chat_history.append({"role": "user", "content": user_question}) chat_history.append({"role": "assistant", "content": response}) # Convert images to display format with summaries image_path = None image_summary = "" if retrieved_images: # Get intelligent retrieval instance to fetch summaries from image_retrieval import get_intelligent_retrieval retrieval = get_intelligent_retrieval("selfhosted") # Get the first (and only) image for image_id, image_data in retrieved_images.items(): if 'error' not in image_data and 'base64_data' in image_data: # Create a temporary file with a short path for Gradio import tempfile import base64 try: # Decode base64 and create temporary file image_bytes = base64.b64decode(image_data['base64_data']) # Create temp file with short name with tempfile.NamedTemporaryFile(delete=False, suffix='.jpg', prefix='img_') as temp_file: temp_file.write(image_bytes) image_path = temp_file.name # Get image summary image_summary = asyncio.run(retrieval.get_image_summary(image_id)) if not image_summary: image_summary = f"Image {image_id}: Visual content from document" break # Only 
process the first image since we only want top 1 except Exception as e: logger.error(f"Error creating temp file for {image_id}: {e}") continue else: logger.warning(f"Skipping image {image_id}: {image_data.get('error', 'No base64_data')}") # Route image summary to appropriate display component based on Markdown-KV setting if markdown_kv_enabled: # Display in markdown component for table formatting support formatted_summary = image_summary if image_summary else "*No image summary available*" return "", chat_history, image_path, formatted_summary, "" else: # Display in text component for standard summaries return "", chat_history, image_path, "", image_summary except Exception as e: error_msg = f"Error: {str(e)}" logger.error(f"Error in chat_with_rag: {str(e)}") chat_history.append({"role": "user", "content": user_question}) chat_history.append({"role": "assistant", "content": error_msg}) return "", chat_history, None, "", "" async def auto_process_input(file, youtube_id, web_url) -> tuple: """ Automatically process input: extract text and create RAG. This is the main processing function that handles the complete pipeline from input (file, YouTube video, or web URL) to a fully functional RAG system. It supports multiple file types including PDF, PPTX, DOCX, and other formats, with special handling for documents containing images. Args: file: Uploaded file object (can be None). Supported formats: - PDF: Extracts text and images separately, generates image summaries - PPTX: Extracts text and images from slides, generates image summaries - DOCX: Extracts text and images, generates image summaries - Other formats: Standard text extraction youtube_id (str): YouTube video ID or URL (can be None or empty). web_url (str): Web URL to scrape content from (can be None or empty). 
Returns: tuple: A tuple containing: - extracted_text (str): The processed and enhanced text content - status_msg (str): Human-readable status message for the UI Raises: Exception: Logs errors and returns error message with status. Note: - Updates global variables: current_rag_status, current_document_name - For PDF/PPTX/DOCX: Extracts images, generates GPT summaries, merges with text - Creates RAG using self-hosted FAISS - Provides progress updates through callback functions - Handles file size limits and format validation Example: >>> text, status = await auto_process_input(file_obj, None, None) >>> print(f"Processing result: {status}") """ global current_rag_status, current_document_name try: # Determine input type and validate input_type = None source_name = "" if file is not None: input_type = "file" source_name = os.path.basename(file.name) if hasattr(file, 'name') else "document" elif youtube_id and youtube_id.strip(): input_type = "youtube" source_name = f"YouTube Video: {youtube_id.strip()}" elif web_url and web_url.strip(): input_type = "web" source_name = f"Web URL: {web_url.strip()}" else: return "No input provided", "Please provide a file, YouTube video, or web URL" # Clear old images and summaries from self-hosted (HF) storage first logger.info("Clearing old images and summaries from self-hosted storage...") from hf_storage import clear_all_images, clear_all_summaries from image_retrieval import clear_image_summaries await clear_all_images() await clear_all_summaries() await clear_image_summaries("selfhosted") # Update status current_rag_status = {"status": "processing", "message": f"Extracting text from {input_type}..."} status_msg = f"🔄 Extracting text from {input_type}..." 
# Check if it's a PDF or PPTX file for enhanced processing if file is not None and hasattr(file, 'name'): file_ext = os.path.splitext(file.name)[1].lower() if file_ext == ".pdf": # Extract images and text separately from file_preprocessing import extract_images_from_pdf, process_pdf_hybrid, summarize_image_with_gpt, capture_page_images_as_fallback # Define progress callback to update status async def update_progress(message): nonlocal status_msg status_msg = message current_rag_status["message"] = message # Use hybrid method: HTML + PDF images with Gemini await update_progress("🔄 Processing PDF with hybrid method (HTML + Images with Gemini)...") extracted_text = await process_pdf_hybrid(file.name, update_progress) # Extract images separately for display/retrieval await update_progress("📸 Extracting images from PDF for retrieval...") images_data = await extract_images_from_pdf(file.name, None, update_progress, "selfhosted") # Check if no images were found - if so, capture page images as fallback no_images_message = "" if not images_data['image_blob_urls']: await update_progress("📸 No extractable images found, capturing page images as fallback...") images_data = await capture_page_images_as_fallback(file.name, "selfhosted", update_progress) if not images_data['image_blob_urls']: no_images_message = "No images found in this document." 
# Step 3: Generate image summaries image_summaries = {} figure_metadata_dict = {} # Store figure metadata for index used_figures = set() # Track used figures for conflict resolution if images_data['image_blob_urls']: await update_progress("🤖 Generating image summaries with GPT...") total_images = len(images_data['image_blob_urls']) for i, (image_id, blob_url) in enumerate(images_data['image_blob_urls'].items(), 1): try: summary = await summarize_image_with_gpt( blob_url, image_id, "selfhosted", markdown_kv_enabled, document_text=extracted_text ) image_summaries[image_id] = summary # Process with figure index (merge visual elements and store) from image_retrieval import process_and_store_image_with_figure_index enhanced_summary, figure_metadata = await process_and_store_image_with_figure_index( image_id, summary, extracted_text, "selfhosted", used_figures ) # Update with enhanced summary image_summaries[image_id] = enhanced_summary # Store figure metadata if found if figure_metadata: figure_metadata_dict[image_id] = figure_metadata except Exception as e: logger.error(f"Error summarizing image {image_id}: {e}") image_summaries[image_id] = f"[Error summarizing image {image_id}: {str(e)}]" # Store summaries await update_progress("💾 Storing image summaries for intelligent retrieval...") await store_image_summaries_batch(image_summaries, "selfhosted", extracted_text, figure_metadata_dict) # Step 4: Merge text with image summaries await update_progress("🔗 Merging text with image summaries...") # Split text by pages and add image summaries text_sections = extracted_text.split("--- PAGE") enhanced_sections = [] for i, section in enumerate(text_sections): if not section.strip(): continue page_num = i if page_num in images_data['image_ids_by_page']: image_ids = images_data['image_ids_by_page'][page_num] image_summary_text = "\n\nImage Summaries:\n" for img_id in image_ids: if img_id in image_summaries: image_summary_text += f"- {img_id}: {image_summaries[img_id]}\n" else: 
image_summary_text += f"- {img_id}: [Image content from PDF]\n" enhanced_sections.append(f"--- PAGE{section}{image_summary_text}") else: enhanced_sections.append(f"--- PAGE{section}") final_text = "\n".join(enhanced_sections) # Create pipeline result format pipeline_result = { "status": "success", "vectorized_ready_text": final_text, "image_blob_urls": images_data['image_blob_urls'], "image_summaries": image_summaries, "total_pages": images_data['total_pages'], "total_images": len(images_data['image_blob_urls']), "no_images_message": no_images_message } if pipeline_result["status"] == "success": # Create RAG from the vectorized-ready text current_document_name = source_name source_info = source_name rag_result = await create_rag_from_text_selfhosted(pipeline_result["vectorized_ready_text"], source_info, update_progress) if rag_result["status"] == "success": current_rag_status = {"status": "success", "message": f"RAG created successfully (self-hosted) with {pipeline_result['total_images']} images"} status_msg = """✅ PDF processed successfully (self-hosted)!""" else: current_rag_status = {"status": "error", "message": rag_result['message']} status_msg = f"❌ RAG Creation Failed: {rag_result['message']}" return pipeline_result["vectorized_ready_text"], status_msg, pipeline_result.get("image_summaries", {}), pipeline_result.get("no_images_message", "") else: current_rag_status = {"status": "error", "message": pipeline_result['message']} return pipeline_result['message'], f"❌ PDF Pipeline Failed: {pipeline_result['message']}", {}, "" elif file_ext == ".pptx": # Extract images and text separately from file_preprocessing import extract_images_from_pptx, process_pptx, summarize_image_with_gpt # Define progress callback to update status async def update_progress(message): nonlocal status_msg status_msg = message current_rag_status["message"] = message # Step 1: Extract images await update_progress("📸 Extracting images from PPTX...") images_data = await 
extract_images_from_pptx(file.name, None, update_progress, "selfhosted") # Check if no images were found no_images_message = "" if not images_data['image_blob_urls']: no_images_message = "No images found in this document." # Step 3: Extract text await update_progress("📄 Extracting text from PPTX...") extracted_text = await process_pptx(file.name, update_progress) # Step 4: Generate image summaries image_summaries = {} if images_data['image_blob_urls']: await update_progress("🤖 Generating image summaries with GPT...") total_images = len(images_data['image_blob_urls']) figure_metadata_dict = {} # Store figure metadata for index used_figures = set() # Track used figures for conflict resolution for i, (image_id, blob_url) in enumerate(images_data['image_blob_urls'].items(), 1): try: # Get AI summary ai_summary = await summarize_image_with_gpt(blob_url, image_id, "selfhosted", markdown_kv_enabled, document_text=extracted_text) # Process with figure index (merge visual elements and store) from image_retrieval import process_and_store_image_with_figure_index enhanced_summary, figure_metadata = await process_and_store_image_with_figure_index( image_id, ai_summary, extracted_text, "selfhosted", used_figures ) image_summaries[image_id] = enhanced_summary # Store figure metadata if found if figure_metadata: figure_metadata_dict[image_id] = figure_metadata except Exception as e: logger.error(f"Error summarizing image {image_id}: {e}") image_summaries[image_id] = f"[Error summarizing image {image_id}: {str(e)}]" # Note: Summaries and figure index are already stored by process_and_store_image_with_figure_index # But we still call store_image_summaries_batch for backward compatibility await update_progress("💾 Storing image summaries for intelligent retrieval...") await store_image_summaries_batch(image_summaries, "selfhosted", extracted_text, figure_metadata_dict) # Step 4: Merge text with image summaries await update_progress("🔗 Merging text with image summaries...") # Split 
text by slides and add image summaries text_sections = extracted_text.split("=== SLIDE") enhanced_sections = [] for i, section in enumerate(text_sections): if not section.strip(): continue slide_num = i if slide_num in images_data['image_ids_by_slide']: image_ids = images_data['image_ids_by_slide'][slide_num] image_summary_text = "\n\nImage Summaries:\n" for img_id in image_ids: if img_id in image_summaries: image_summary_text += f"- {img_id}: {image_summaries[img_id]}\n" else: image_summary_text += f"- {img_id}: [Image content from PPTX]\n" enhanced_sections.append(f"=== SLIDE{section}{image_summary_text}") else: enhanced_sections.append(f"=== SLIDE{section}") final_text = "\n".join(enhanced_sections) # Create pipeline result format pipeline_result = { "status": "success", "vectorized_ready_text": final_text, "image_blob_urls": images_data['image_blob_urls'], "image_summaries": image_summaries, "total_slides": images_data['total_slides'], "total_images": len(images_data['image_blob_urls']), "no_images_message": no_images_message } if pipeline_result["status"] == "success": # Create RAG from the vectorized-ready text current_document_name = source_name source_info = source_name rag_result = await create_rag_from_text_selfhosted(pipeline_result["vectorized_ready_text"], source_info, update_progress) if rag_result["status"] == "success": current_rag_status = {"status": "success", "message": f"RAG created successfully (self-hosted) with {pipeline_result['total_images']} images"} status_msg = """✅ PPTX processed successfully (self-hosted)!""" else: current_rag_status = {"status": "error", "message": rag_result['message']} status_msg = f"❌ RAG Creation Failed: {rag_result['message']}" return pipeline_result["vectorized_ready_text"], status_msg, pipeline_result.get("image_summaries", {}), pipeline_result.get("no_images_message", "") else: current_rag_status = {"status": "error", "message": pipeline_result['message']} return pipeline_result['message'], f"❌ PPTX Pipeline 
Failed: {pipeline_result['message']}", {}, "" elif file_ext in [".doc", ".docx"]: # Extract images and text separately from file_preprocessing import extract_images_from_docx, process_docx, summarize_image_with_gpt # Define progress callback to update status async def update_progress(message): nonlocal status_msg status_msg = message current_rag_status["message"] = message # Step 1: Extract images await update_progress("📸 Extracting images from DOCX...") images_data = await extract_images_from_docx(file.name, None, update_progress, "selfhosted") # Check if no images were found no_images_message = "" if not images_data['image_blob_urls']: no_images_message = "No images found in this document." # Step 3: Extract text await update_progress("📄 Extracting text from DOCX...") extracted_text = await process_docx(file.name, update_progress) # Step 4: Generate image summaries image_summaries = {} if images_data['image_blob_urls']: await update_progress("🤖 Generating image summaries with GPT...") total_images = len(images_data['image_blob_urls']) figure_metadata_dict = {} # Store figure metadata for index used_figures = set() # Track used figures for conflict resolution for i, (image_id, blob_url) in enumerate(images_data['image_blob_urls'].items(), 1): try: # Get AI summary ai_summary = await summarize_image_with_gpt(blob_url, image_id, "selfhosted", markdown_kv_enabled, document_text=extracted_text) # Process with figure index (merge visual elements and store) from image_retrieval import process_and_store_image_with_figure_index enhanced_summary, figure_metadata = await process_and_store_image_with_figure_index( image_id, ai_summary, extracted_text, "selfhosted", used_figures ) image_summaries[image_id] = enhanced_summary # Store figure metadata if found if figure_metadata: figure_metadata_dict[image_id] = figure_metadata except Exception as e: logger.error(f"Error summarizing image {image_id}: {e}") image_summaries[image_id] = f"[Error summarizing image {image_id}: 
{str(e)}]" # Note: Summaries and figure index are already stored by process_and_store_image_with_figure_index # But we still call store_image_summaries_batch for backward compatibility await update_progress("💾 Storing image summaries for intelligent retrieval...") await store_image_summaries_batch(image_summaries, "selfhosted", extracted_text, figure_metadata_dict) # Step 4: Merge text with image summaries await update_progress("🔗 Merging text with image summaries...") # Add image summaries to the document text if images_data['image_ids']: image_summary_text = "\n\nImage Summaries:\n" for img_id in images_data['image_ids']: if img_id in image_summaries: image_summary_text += f"- {img_id}: {image_summaries[img_id]}\n" else: image_summary_text += f"- {img_id}: [Image content from DOCX]\n" final_text = extracted_text + image_summary_text else: final_text = extracted_text # Create pipeline result format pipeline_result = { "status": "success", "vectorized_ready_text": final_text, "image_blob_urls": images_data['image_blob_urls'], "image_summaries": image_summaries, "total_paragraphs": images_data['total_paragraphs'], "total_images": len(images_data['image_blob_urls']), "no_images_message": no_images_message } if pipeline_result["status"] == "success": # Create RAG from the vectorized-ready text current_document_name = source_name source_info = source_name rag_result = await create_rag_from_text_selfhosted(pipeline_result["vectorized_ready_text"], source_info, update_progress) if rag_result["status"] == "success": current_rag_status = {"status": "success", "message": f"RAG created successfully (self-hosted) with {pipeline_result['total_images']} images"} status_msg = """✅ DOCX processed successfully (self-hosted)!""" else: current_rag_status = {"status": "error", "message": rag_result['message']} status_msg = f"❌ RAG Creation Failed: {rag_result['message']}" return pipeline_result["vectorized_ready_text"], status_msg, pipeline_result.get("image_summaries", {}), 
pipeline_result.get("no_images_message", "") else: current_rag_status = {"status": "error", "message": pipeline_result['message']} return pipeline_result['message'], f"❌ DOCX Pipeline Failed: {pipeline_result['message']}", {}, "" # Handle web URL processing specially to get both text and image summaries image_summaries = {} display_text = "" if web_url and web_url.strip(): try: # Get the full result from process_web_url to access both text and image summaries from file_preprocessing import process_web_url web_result = await process_web_url(web_url.strip(), "selfhosted") if isinstance(web_result, dict): extracted_text = web_result["text"].strip() # Full content for RAG display_text = web_result.get("display_text", extracted_text).strip() # Clean text for UI image_summaries = web_result.get("image_summaries", {}) else: extracted_text = web_result.strip() display_text = extracted_text image_summaries = {} except Exception as e: logger.error(f"Error processing web URL: {e}") extracted_text = f"Error processing URL: {str(e)}" display_text = extracted_text image_summaries = {} else: # For non-web URLs, use the original processing extracted_text = await _extract_text_async(file, youtube_id, web_url, "selfhosted") display_text = extracted_text if extracted_text.startswith("Error"): current_rag_status = {"status": "error", "message": extracted_text} return extracted_text, f"❌ {extracted_text}", {}, "" # Update status current_rag_status = {"status": "processing", "message": "Creating RAG (self-hosted)..."} status_msg = "🤖 Creating RAG (self-hosted)..." 
# Create RAG with progress callback current_document_name = source_name source_info = source_name # Define progress callback to update status async def update_progress(message): nonlocal status_msg status_msg = message current_rag_status["message"] = message rag_result = await create_rag_from_text_selfhosted(extracted_text, source_info, update_progress) if rag_result["status"] == "success": current_rag_status = {"status": "success", "message": "RAG created successfully (self-hosted)"} status_msg = f"""✅ {input_type.title()} processed successfully (self-hosted)!""" else: current_rag_status = {"status": "error", "message": rag_result['message']} status_msg = f"❌ RAG Creation Failed: {rag_result['message']}" # Return appropriate no_images_message based on input type no_images_message = "" if not image_summaries: if web_url and web_url.strip(): no_images_message = "No images found on this web page." elif youtube_id and youtube_id.strip(): no_images_message = "No images found in this YouTube video." else: no_images_message = "No images found in this document." return display_text, status_msg, image_summaries, no_images_message except Exception as e: error_msg = f"Error processing {input_type if 'input_type' in locals() else 'input'}: {str(e)}" current_rag_status = {"status": "error", "message": error_msg} return error_msg, f"❌ {error_msg}", {}, "" def auto_process_input_sync(file, youtube_id, web_url): """ Synchronous wrapper for auto_process_input. This function provides a synchronous interface to the async auto_process_input function, making it compatible with Gradio's synchronous event handlers. It runs the async function using asyncio.run() and returns the same results. Args: file: Uploaded file object (can be None). youtube_id (str): YouTube video ID or URL (can be None or empty). web_url (str): Web URL to scrape content from (can be None or empty). 
    Returns:
        tuple: A tuple containing:
            - extracted_text (str): The processed and enhanced text content
            - status_msg (str): Human-readable status message for the UI
            - image_summaries (dict): Dictionary of image summaries
            - no_images_message (str): Message if no images were found

    Note:
        This is a convenience wrapper that allows the async auto_process_input
        function to be used in synchronous contexts like Gradio event handlers.
        asyncio.run() starts a fresh event loop per call, so this must not be
        invoked from inside an already-running loop.

    Example:
        >>> text, status, images, no_images = auto_process_input_sync(file_obj, None, None)
        >>> print(f"Sync processing result: {status}")
    """
    return asyncio.run(auto_process_input(file, youtube_id, web_url))


# Unified APIs for MCP exposure
async def create_rag_from_text_unified(text: str, source_info: str, provider: str = "selfhosted", progress_callback=None):
    """
    Create RAG from text using self-hosted FAISS.

    Args:
        text (str): The text content to create RAG from.
        source_info (str): Information about the source of the text (filename, URL, etc.).
        provider (str): Ignored; kept for API compatibility. Self-hosted only.
        progress_callback: Optional callback function for progress updates.

    Returns:
        dict: Result dictionary with status and message. On failure, the
        exception is logged and returned as {"status": "error", "message": ...}
        rather than raised.
    """
    try:
        return await create_rag_from_text_selfhosted(text, source_info, progress_callback)
    except Exception as e:
        logger.error(f"Error in create_rag_from_text: {e}")
        return {"status": "error", "message": str(e)}


async def search_rag_documents_unified(query: str, top_k: int = 5, provider: str = "selfhosted") -> list:
    """
    Search RAG documents using self-hosted FAISS.

    Args:
        query (str): The search query string.
        top_k (int): Maximum number of results to return.
        provider (str): Ignored; kept for API compatibility. Self-hosted only.

    Returns:
        list: Document results with content, source, title, chunk_index,
        total_chunks. Returns an empty list (and logs) on failure.
    """
    try:
        return await search_rag_documents_selfhosted(query, top_k)
    except Exception as e:
        logger.error(f"Error in search_rag_documents: {e}")
        return []


async def extract_text_async_unified(file_path=None, youtube_id=None, web_url=None, provider: str = "selfhosted"):
    """
    Unified API to extract text from files, YouTube videos, or web URLs.

    Extraction always runs through the self-hosted pipeline; the ``provider``
    argument is accepted only for backward API compatibility and is not
    forwarded.

    Args:
        file_path (str, optional): Path to the file to extract text from.
        youtube_id (str, optional): YouTube video ID or URL to extract text from.
        web_url (str, optional): Web URL to scrape and extract text from.
        provider (str): Ignored; kept for API compatibility. Defaults to "selfhosted".

    Returns:
        str: Extracted text content, or an "Error: ..." string if extraction fails.

    Example:
        >>> text = await extract_text_async_unified("/path/to/document.pdf")
        >>> print(f"Extracted {len(text)} characters")
    """
    try:
        # Create a mock file object for file_path if provided
        file_obj = None
        if file_path and os.path.exists(file_path):
            class MockFile:
                # Minimal stand-in exposing only the attributes the
                # extraction pipeline reads from an uploaded file object.
                def __init__(self, path):
                    self.name = path
                    self.size = os.path.getsize(path)
            file_obj = MockFile(file_path)

        return await _extract_text_async(file_obj, youtube_id, web_url, "selfhosted")
    except Exception as e:
        return f"Error: {str(e)}"


# Synchronous wrappers for MCP exposure
def create_rag_from_text_sync(text: str, source_info: str, provider: str = "selfhosted"):
    """
    Synchronous wrapper for create_rag_from_text_unified.

    Args:
        text (str): The text content to create RAG from.
        source_info (str): Information about the source of the text.
        provider (str): Ignored downstream; kept for API compatibility.
            Defaults to "selfhosted".

    Returns:
        dict: Result dictionary containing status and message.
    """
    return asyncio.run(create_rag_from_text_unified(text, source_info, provider))


def search_rag_documents_sync(query: str, top_k: int = 5, provider: str = "selfhosted"):
    """
    Synchronous wrapper for search_rag_documents_unified.

    Args:
        query (str): The search query string.
        top_k (int): Maximum number of results to return.
        provider (str): Ignored downstream; kept for API compatibility.
            Defaults to "selfhosted".

    Returns:
        list: A list of document results.
    """
    return asyncio.run(search_rag_documents_unified(query, top_k, provider))


def extract_text_sync(file_path=None, youtube_id=None, web_url=None, provider: str = "selfhosted"):
    """
    Synchronous wrapper for extract_text_async_unified.

    This function provides a synchronous interface to the async text extraction
    function, making it compatible with Gradio's synchronous event handlers and
    MCP (Model Context Protocol) exposure.

    Args:
        file_path (str, optional): Path to the file to extract text from.
        youtube_id (str, optional): YouTube video ID or URL to extract text from.
        web_url (str, optional): Web URL to scrape and extract text from.
        provider (str): Ignored downstream; kept for API compatibility.
            Defaults to "selfhosted".

    Returns:
        str: Extracted text content, or an "Error: ..." string if extraction fails.

    Note:
        - Uses asyncio.run() to execute the async extraction function
        - Primarily used for MCP server exposure and testing

    Example:
        >>> text = extract_text_sync("/path/to/document.pdf")
        >>> print(f"Extracted {len(text)} characters")
    """
    return asyncio.run(extract_text_async_unified(file_path, youtube_id, web_url, provider))


def clear_s3_images_sync():
    """
    Synchronous wrapper for clearing S3 images.

    This function provides a synchronous interface to clear all images from S3,
    making it compatible with Gradio's synchronous event handlers.
    Returns:
        dict: Result dictionary containing:
            - status (str): "success" or "error"
            - message (str): Success message or error description

    Note:
        - Uses the AWS S3 client to clear all images from the bucket
        - Requires AWS credentials to be configured in environment variables
        - Clears images from pdf_images, pptx_images, docx_images, and web_images folders

    Example:
        >>> result = clear_s3_images_sync()
        >>> print(f"Image clearing: {result['status']}")
    """
    try:
        # Imported lazily so the module still loads when AWS extras are absent
        from aws_functions import clear_images_from_s3
        success = asyncio.run(clear_images_from_s3())
        if success:
            return {"status": "success", "message": "Successfully cleared all images from S3 bucket"}
        else:
            return {"status": "error", "message": "Failed to clear images from S3 bucket"}
    except Exception as e:
        return {"status": "error", "message": f"Error clearing S3 images: {str(e)}"}


# Create Gradio interface
def create_gradio_app():
    """
    Create and configure the Gradio web application interface.

    This function builds a comprehensive web interface for the RAG chat
    application, including file upload, YouTube/URL input, chat interface,
    and image display. It also exposes core functions for MCP
    (Model Context Protocol) server integration.

    Returns:
        gr.Blocks: A configured Gradio Blocks application with:
            - File upload interface supporting multiple formats
            - YouTube video and web URL input fields
            - Self-hosted RAG (FAISS + HF storage)
            - Real-time processing status display
            - Interactive chat interface with message history
            - Image display for retrieved document images
            - Hidden MCP-exposed functions for external integration

    Features:
        - Responsive design with custom CSS styling
        - Automatic input processing on file/URL change
        - Progress indicators and status updates
        - Image display with preview and download capabilities
        - Chat history management with clear functionality
        - MCP server compatibility for external tool integration

    Supported File Types:
        - Documents: PDF, DOC, DOCX, TXT, PPTX, XLSX
        - Media: MP3, WAV, MP4
        - Images: JPG, JPEG, PNG
        - CAD: DXF

    Example:
        >>> app = create_gradio_app()
        >>> app.launch(server_name="0.0.0.0", server_port=7860)
    """
    css = """
    #file-upload {
        min-height: 255px !important;
    }
    """

    with gr.Blocks(
        title="Document Chat Assistant",
        theme=gr.themes.Soft(),
        css=css
    ) as app:
        with gr.Tab("Create RAG"):
            with gr.Row():
                with gr.Column(scale=2):
                    markdown_kv_toggle = gr.Checkbox(
                        label="🔄 Enable Markdown-KV (Table) Conversion",
                        value=False,
                        info="When enabled, table images will be converted to markdown format"
                    )
                with gr.Column(scale=1):
                    gr.Markdown(
                        """
                        **⚠️ File size limit:** Maximum 20 MB per file

                        **🚀 Auto Processing:** Upload, paste YouTube or Web URL!
                        """
                    )
            with gr.Row():
                with gr.Column():
                    with gr.Row():
                        with gr.Column(scale=2):
                            file_input = gr.File(
                                label="📁 Upload Document",
                                file_types=[".pdf", ".doc", ".docx", ".txt", ".pptx", ".xlsx", ".mp3", ".wav", ".mp4", ".dxf", ".jpg", ".jpeg", ".png"],
                                height=255,
                                elem_id="file-upload"
                            )
                        with gr.Column(scale=1):
                            youtube_input = gr.Textbox(
                                label="🎥 YouTube Video",
                                placeholder="Enter YouTube URL or ID",
                                lines=1,
                            )
                            web_url_input = gr.Textbox(
                                label="🌐 Web URL",
                                placeholder="Enter web URL",
                                lines=1,
                            )
                            clear_file_btn = gr.Button(
                                "🗑️ Clear All",
                                variant="secondary",
                                size="lg",
                            )

            # NOTE(review): placed at tab level — confirm original layout nesting
            notification_display = gr.Markdown("")
            doc_status = gr.Textbox(
                label="Current Document",
                value="No document processed yet",
                interactive=False,
                lines=2,
                visible=False
            )

            with gr.Row():
                with gr.Column():
                    gr.Markdown("### 📄 Extracted Text")
                    extracted_text_display = gr.Textbox(
                        label="Document Content",
                        lines=10,
                        max_lines=15,
                        show_copy_button=True,
                        container=True,
                        interactive=False,
                        placeholder="Extracted text will appear here after processing...",
                    )
                with gr.Column():
                    gr.Markdown("### 🖼️ Image Descriptions")
                    image_descriptions_display = gr.Textbox(
                        label="AI-Generated Image Summaries",
                        lines=10,
                        max_lines=15,
                        show_copy_button=True,
                        container=True,
                        interactive=False,
                        placeholder="Image descriptions will appear here after processing...",
                    )

        with gr.Tab("Chat"):
            with gr.Row():
                with gr.Column(scale=2):
                    gr.Markdown("### 💬 Chat with Your Document")
                    chatbot = gr.Chatbot(
                        label="Chat History",
                        height=550,
                        type="messages",
                        allow_tags=False
                    )
                    msg_input = gr.Textbox(
                        label="Your Question",
                        placeholder="Ask a question about your uploaded document...",
                        lines=3,
                    )
                    with gr.Row():
                        send_btn = gr.Button(
                            "Send",
                            variant="primary",
                            size="lg",
                        )
                        clear_chat_btn = gr.Button(
                            "🗑️ Clear Chat",
                            variant="secondary",
                            size="lg"
                        )
                with gr.Column(scale=1):
                    gr.Markdown("### 📸 Most Relevant Image")
                    image_display = gr.Image(
                        label="",
                        height=250,
                        show_label=False,
                        container=True,
                        show_download_button=True,
                    )
                    gr.Markdown("### 📝 AI-Generated Summary")
                    # Two summary widgets: markdown (Markdown-KV mode) and plain
                    # text (default). The toggle handler flips their visibility.
                    image_summary_markdown = gr.Markdown(
                        value="*Image summary will appear here...*",
                        visible=False
                    )
                    image_summary_text = gr.Textbox(
                        label="",
                        lines=20,
                        max_lines=20,
                        show_copy_button=True,
                        container=True,
                        interactive=False,
                        placeholder="Image summary will appear here...",
                        visible=True
                    )

        # Event handlers for automatic input processing
        def process_input_and_update_status(file, youtube_id, web_url, chat_history):
            """
            Process input and return status only (no text preview).

            This function handles the automatic processing of user inputs (file
            upload, YouTube URL, or web URL) and updates the UI with processing
            status. It clears the chat history when new input is provided and
            provides real-time feedback on the processing progress.

            Args:
                file: Uploaded file object (can be None).
                youtube_id (str): YouTube video ID or URL (can be None or empty).
                web_url (str): Web URL to scrape content from (can be None or empty).
                chat_history (list): Current chat history (will be cleared for new input).

            Returns:
                tuple: An 8-tuple matching the registered outputs:
                    - status_msg (str): Processing status message (notification area)
                    - doc_status_msg (str): Document status message for chat interface
                    - cleared_chat (list): Empty chat history (cleared for new input)
                    - None: Clears the relevant-image display
                    - "" (str): Clears the markdown image summary
                    - "" (str): Clears the text image summary
                    - extracted_text (str): Extracted text content
                    - image_descriptions (str): Formatted image descriptions

            Note:
                - Validates that at least one input is provided
                - Clears chat history when new input is processed
                - Updates global variables for document status
                - Handles error states and provides appropriate feedback
            """
            # Check if any input is provided
            if file is None and (not youtube_id or not youtube_id.strip()) and (not web_url or not web_url.strip()):
                # Early exit keeps the existing chat history untouched
                return "No input provided", "No document processed yet", chat_history, None, "", "", "", ""

            # Clear chat history when new input is provided
            cleared_chat = []

            # Process input
            extracted_text, status_msg, image_summaries, no_images_message = auto_process_input_sync(file, youtube_id, web_url)

            # Format image descriptions for display
            image_descriptions = ""
            if image_summaries:
                image_descriptions = "AI-Generated Image Descriptions:\n\n"
                for image_id, description in image_summaries.items():
                    image_descriptions += f"🖼️ {image_id}:\n{description}\n\n"
            elif no_images_message:
                image_descriptions = no_images_message
            else:
                image_descriptions = "No images found in this document."

            # Update document status
            global current_document_name, current_rag_status
            if current_rag_status["status"] == "success":
                doc_status_msg = f"✅ Ready: {current_document_name}"
            elif current_rag_status["status"] == "processing":
                doc_status_msg = f"🔄 Processing: {current_document_name}"
            elif current_rag_status["status"] == "error":
                doc_status_msg = f"❌ Error: {current_rag_status['message']}"
            else:
                doc_status_msg = "No document processed yet"

            # Keep AI-Generated Summary empty during document processing - only populate during chat
            return status_msg, doc_status_msg, cleared_chat, None, "", "", extracted_text, image_descriptions

        # Clear function
        def clear_all_inputs():
            """
            Clear all input fields and reset the interface.

            This function resets all input fields (file upload, YouTube URL, web
            URL) and clears the chat history, image display, extracted text, and
            image descriptions, providing a clean slate for new document
            processing. It also resets the module-level RAG state.

            Returns:
                tuple: A 9-tuple matching the registered outputs:
                    - None: Cleared file input
                    - "": Empty YouTube input string
                    - "": Empty web URL input string
                    - []: Empty chat history list
                    - None: Cleared image display
                    - "": Cleared markdown image summary
                    - "": Cleared text image summary
                    - "": Empty extracted text
                    - "": Empty image descriptions

            Note:
                This function is called when the "Clear All" button is clicked,
                providing a quick way to reset the entire interface.
            """
            # Reset global state
            global current_rag_status, current_document_name
            current_rag_status = {"status": "none", "message": "No document processed yet"}
            current_document_name = ""

            # Reset all components to initial state
            return None, "", "", [], None, "", "", "", ""

        # Event handlers for all input types
        file_input.change(
            fn=process_input_and_update_status,
            inputs=[file_input, youtube_input, web_url_input, chatbot],
            outputs=[notification_display, doc_status, chatbot, image_display, image_summary_markdown, image_summary_text, extracted_text_display, image_descriptions_display],
            show_progress=True,
            show_api=False
        )
        youtube_input.change(
            fn=process_input_and_update_status,
            inputs=[file_input, youtube_input, web_url_input, chatbot],
            outputs=[notification_display, doc_status, chatbot, image_display, image_summary_markdown, image_summary_text, extracted_text_display, image_descriptions_display],
            show_progress=True,
            show_api=False
        )
        web_url_input.change(
            fn=process_input_and_update_status,
            inputs=[file_input, youtube_input, web_url_input, chatbot],
            outputs=[notification_display, doc_status, chatbot, image_display, image_summary_markdown, image_summary_text, extracted_text_display, image_descriptions_display],
            show_progress=True,
            show_api=False
        )

        # Markdown-KV toggle handler
        def update_markdown_kv_toggle(enabled):
            # Flips the module-level flag and swaps which summary widget is visible
            global markdown_kv_enabled
            markdown_kv_enabled = enabled
            status = "ENABLED" if enabled else "DISABLED"
            status_msg = f"Markdown-KV (Table) Conversion {status}. {'Table images will be converted to markdown format.' if enabled else 'Standard image summaries will be used.'}"
            # Return: status message, markdown visibility, text visibility
            return (
                status_msg,
                gr.update(visible=enabled),  # markdown component
                gr.update(visible=not enabled)  # text component
            )

        markdown_kv_toggle.change(
            fn=update_markdown_kv_toggle,
            inputs=[markdown_kv_toggle],
            outputs=[notification_display, image_summary_markdown, image_summary_text],
            show_api=False
        )

        # Clear button handler
        clear_file_btn.click(
            fn=clear_all_inputs,
            inputs=[],
            outputs=[file_input, youtube_input, web_url_input, chatbot, image_display, image_summary_markdown, image_summary_text, extracted_text_display, image_descriptions_display],
            show_api=False
        )

        # Event handlers for chat interface
        def clear_chat():
            """
            Clear the chat history and image display.

            This function resets the chat interface by clearing the conversation
            history and removing any displayed images and summaries, while
            keeping the current document processing state intact.

            Returns:
                tuple: A 4-tuple matching the registered outputs:
                    - []: Empty chat history list
                    - None: Cleared image display
                    - "": Cleared markdown image summary
                    - "": Cleared text image summary

            Note:
                - Only clears the chat interface, not the document processing state
                - Called when the "Clear Chat" button is clicked
                - Preserves the current RAG system and document status
            """
            return [], None, "", ""

        # Chat functionality
        # NOTE(review): chat_with_rag is defined elsewhere in this module
        # (outside this view); it consumes (message, history) and drives the
        # image display/summary widgets.
        send_btn.click(
            fn=chat_with_rag,
            inputs=[msg_input, chatbot],
            outputs=[msg_input, chatbot, image_display, image_summary_markdown, image_summary_text],
            show_progress=True,
            show_api=False
        )
        msg_input.submit(
            fn=chat_with_rag,
            inputs=[msg_input, chatbot],
            outputs=[msg_input, chatbot, image_display, image_summary_markdown, image_summary_text],
            show_progress=True,
            show_api=False
        )
        clear_chat_btn.click(
            fn=clear_chat,
            inputs=[],
            outputs=[chatbot, image_display, image_summary_markdown, image_summary_text],
            show_api=False
        )

        # Add core functions for MCP exposure (hidden from UI)
        with gr.Tab("Core Functions", visible=False):
            # These functions are only for MCP exposure, not visible in UI
            # NOTE(review): the Provider dropdowns still offer "azure"/"aws"
            # with default "azure", but the sync wrappers ignore provider and
            # run self-hosted only — confirm before relying on this argument.

            # Unified extract_text_async API
            gr.Interface(
                fn=extract_text_sync,
                inputs=[
                    gr.Textbox(label="File Path", visible=False),
                    gr.Textbox(label="YouTube ID", visible=False),
                    gr.Textbox(label="Web URL", visible=False),
                    gr.Dropdown(label="Provider", choices=["azure", "aws"], value="azure", visible=False)
                ],
                outputs=gr.Textbox(label="Extracted Text", visible=False),
                title="extract_text_async",
                description="Extract text from files, YouTube videos, or web URLs using specified provider",
                api_name="extract_text_async"
            )

            # Unified create_rag_from_text API
            gr.Interface(
                fn=create_rag_from_text_sync,
                inputs=[
                    gr.Textbox(label="Text", visible=False),
                    gr.Textbox(label="Source Info", visible=False),
                    gr.Dropdown(label="Provider", choices=["azure", "aws"], value="azure", visible=False)
                ],
                outputs=gr.JSON(label="RAG Result", visible=False),
                title="create_rag_from_text",
                description="Build RAG from text (self-hosted FAISS)",
                api_name="create_rag_from_text"
            )

            # Unified search_rag_documents API
            gr.Interface(
                fn=search_rag_documents_sync,
                inputs=[
                    gr.Textbox(label="Query", visible=False),
                    gr.Slider(label="Top K", minimum=1, maximum=20, value=5, visible=False),
                    gr.Dropdown(label="Provider", choices=["azure", "aws"], value="azure", visible=False)
                ],
                outputs=gr.JSON(label="Search Results", visible=False),
                title="search_rag_documents",
                description="Search RAG documents (self-hosted FAISS)",
                api_name="search_rag_documents"
            )

    return app


# Launch the app
if __name__ == "__main__":
    app = create_gradio_app()
    # Launch with MCP server enabled
    app.launch(
        mcp_server=True,
        server_name="0.0.0.0",
        server_port=7860,
        share=True
    )