# rag-app / app.py
# Uploaded by bhavinmatariya ("Upload 13 files", commit 3506c42 verified)
import os
import json
import logging
import gradio as gr
import asyncio
from openai import AsyncOpenAI
from dotenv import load_dotenv
from file_preprocessing import *
from file_preprocessing import _extract_text_async
from image_retrieval import intelligent_search_images, store_image_summaries_batch, process_and_store_image_with_figure_index
from selfHosted_functions import *
# Set up logging
logging.basicConfig(level=logging.WARNING) # Reduced from INFO to WARNING
logger = logging.getLogger(__name__)
logging.getLogger("openai").setLevel(logging.WARNING)
logging.getLogger("httpx").setLevel(logging.WARNING)
#load environment variables
load_dotenv()
client = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"))
# Global variables for RAG state (self-hosted only: FAISS + HF storage)
current_rag_status = {"status": "none", "message": "No document processed yet"}
current_document_name = ""
markdown_kv_enabled = False # Toggle for Markdown-KV (table) conversion
async def search_rag_documents(query: str, top_k: int = 5, method_suffix: str = "") -> list:
    """
    Search the self-hosted FAISS index for documents relevant to *query*.

    Args:
        query (str): The search query string to find relevant documents.
        top_k (int, optional): Maximum number of results to return. Defaults to 5.
        method_suffix (str, optional): Optional suffix for index name.

    Returns:
        list: Document results containing content, source, title, chunk_index,
        and total_chunks information; an empty list if the search fails.
    """
    try:
        results = await search_rag_documents_selfhosted(query, top_k, method_suffix)
    except Exception as e:
        # Search failures degrade to "no context" rather than crashing the chat.
        logger.error(f"Error searching RAG documents: {e}")
        return []
    return results
async def generate_chat_response(user_question: str, chat_history: list) -> str:
    """
    Generate a chatbot response using GPT-4.1-nano with RAG context.

    Retrieves the most relevant document chunks for the user's question and
    feeds them, together with the conversation history, to the model so the
    answer is grounded in the uploaded and processed documents.

    Args:
        user_question (str): The user's question or query.
        chat_history (list): Previous conversation messages in the format:
            [{"role": "user", "content": "..."}, {"role": "assistant", "content": "..."}]

    Returns:
        str: The AI-generated response based on the RAG context and chat
        history, or an error message if processing fails.

    Note:
        - Uses self-hosted FAISS for document search (top 3 chunks)
        - Uses temperature=0.0 for consistent, factual responses
    """
    try:
        # Pull the chunks most relevant to this question.
        docs = await search_rag_documents(user_question, top_k=3)
        context = "\n\n".join(
            f"Source: {doc.get('source', 'Unknown')}\nContent: {doc.get('content', '')}"
            for doc in docs
        ) if docs else ""
        system_message = f"""You are a helpful AI assistant that answers questions based on the provided document context.
Use the following context to answer the user's question. If the context doesn't contain enough information to answer the question, say so politely and ask for clarification.
Context from document:{context} Answer the user's question based on the provided context. Be helpful, accurate, and concise."""
        # Conversation order: system prompt, prior turns, then the new question.
        messages = [{"role": "system", "content": system_message}]
        messages.extend(chat_history)
        messages.append({"role": "user", "content": user_question})
        response = await client.chat.completions.create(
            model="gpt-4.1-nano",
            messages=messages,
            temperature=0.0
        )
        return response.choices[0].message.content.strip()
    except Exception as e:
        logger.error(f"Error generating chat response: {e}")
        return f"Sorry, I encountered an error while processing your question: {str(e)}"
async def generate_chat_response_with_intelligent_images(user_question: str, chat_history: list) -> tuple:
    """
    Generate a chatbot response with intelligent image retrieval support.

    Uses similarity scoring between the user query and stored image summaries
    to select the most relevant image, then blends that image's summary into
    the LLM context alongside the retrieved text chunks. Falls back to the
    plain text-only response path when no relevant image is found or on error.

    Args:
        user_question (str): The user's question or query.
        chat_history (list): List of previous conversation messages.

    Returns:
        tuple: A tuple containing:
            - response_text (str): The AI-generated response based on RAG context
            - retrieved_images (dict): Intelligently retrieved images (empty on fallback)
    """
    try:
        # Try intelligent image search first; bail out to the text-only path
        # if nothing relevant comes back.
        intelligent_images = await intelligent_search_images(user_question, "selfhosted", top_k=1)
        if not intelligent_images:
            logger.info("No relevant images found with intelligent search, falling back to original method")
            return await generate_chat_response(user_question, chat_history), {}
        logger.info(f"Found {len(intelligent_images)} relevant images using intelligent search")
        # Text context from the FAISS index.
        relevant_docs = await search_rag_documents(user_question, top_k=3)
        text_context = "\n\n".join(
            f"Source: {doc.get('source', 'Unknown')}\nContent: {doc.get('content', '')}"
            for doc in relevant_docs
        ) if relevant_docs else ""
        # Image context: fetch the stored summary for each retrieved image.
        from image_retrieval import get_intelligent_retrieval
        retrieval = get_intelligent_retrieval("selfhosted")
        image_summaries_list = []
        for image_id, image_data in intelligent_images.items():
            try:
                summary = await retrieval.get_image_summary(image_id)
                if summary:
                    # Tag the summary with its image ID for clarity in the prompt.
                    image_summaries_list.append(f"Image {image_id}:\n{summary}")
                    logger.info(f"Included image summary for {image_id} in LLM context")
            except Exception as e:
                # A missing summary is non-fatal; the image is still returned.
                logger.warning(f"Could not retrieve summary for {image_id}: {e}")
        image_context = ""
        if image_summaries_list:
            image_context = "\n\n--- Context from Retrieved Images ---\n" + "\n\n".join(image_summaries_list)
        # Stitch the text and image sections into one context block.
        sections = []
        if text_context:
            sections.append(f"--- Context from Document Text ---\n{text_context}")
        if image_context:
            sections.append(image_context)
        full_context = "\n\n".join(sections)
        system_message = f"""You are a helpful AI assistant that answers questions based on the provided document context.
Use the following context to answer the user's question. The context includes both text from the document and descriptions of retrieved images.
If the context doesn't contain enough information to answer the question, say so politely and ask for clarification.
Context:{full_context if full_context else "No context available"}
Answer the user's question based on the provided context. Be helpful, accurate, and concise. When referring to figures or images, use the information from the image context section."""
        # Conversation order: system prompt, prior turns, then the new question.
        messages = [{"role": "system", "content": system_message}]
        messages.extend(chat_history)
        messages.append({"role": "user", "content": user_question})
        response = await client.chat.completions.create(
            model="gpt-4.1-nano",
            messages=messages,
            temperature=0.0
        )
        # Return the answer with its associated images (no duplicate message).
        return response.choices[0].message.content.strip(), intelligent_images
    except Exception as e:
        logger.error(f"Error in intelligent image search: {e}")
        # Any failure falls back to the text-only response path.
        return await generate_chat_response(user_question, chat_history), {}
async def generate_chat_response_with_images(user_question: str, chat_history: list) -> tuple:
    """
    Generate a chatbot response with image retrieval support.

    Searches the self-hosted FAISS index for relevant text chunks and answers
    with GPT-4.1-nano. The images slot of the returned tuple is kept for
    interface compatibility; on this self-hosted path no images are attached
    here (image retrieval happens via the intelligent search flow instead).

    Args:
        user_question (str): The user's question or query.
        chat_history (list): List of previous conversation messages in the format:
            [{"role": "user", "content": "..."}, {"role": "assistant", "content": "..."}]

    Returns:
        tuple: A tuple containing:
            - response_text (str): The AI-generated response based on RAG context
            - retrieved_images (dict): Retrieved images with metadata (empty on
              this path, or on error)

    Note:
        - Uses self-hosted FAISS for text search
        - Searches for top 3 most relevant document chunks
    """
    try:
        # Self-hosted: text search via FAISS; the images dict stays empty here.
        relevant_docs = await search_rag_documents(user_question, top_k=3)
        search_result = {
            "text_chunks": relevant_docs,
            "images": {},
            "total_chunks": len(relevant_docs),
            "total_images": 0,
        }
        chunks = search_result.get("text_chunks")
        context = "\n\n".join(
            f"Source: {doc.get('source', 'Unknown')}\nContent: {doc.get('content', '')}"
            for doc in chunks
        ) if chunks else ""
        system_message = f"""You are a helpful AI assistant that answers questions based on the provided document context.
Use the following context to answer the user's question. If the context doesn't contain enough information to answer the question, say so politely and ask for clarification.
Context from document:{context} Answer the user's question based on the provided context. Be helpful, accurate, and concise."""
        # Conversation order: system prompt, prior turns, then the new question.
        messages = [{"role": "system", "content": system_message}]
        messages.extend(chat_history)
        messages.append({"role": "user", "content": user_question})
        response = await client.chat.completions.create(
            model="gpt-4.1-nano",
            messages=messages,
            temperature=0.0
        )
        return response.choices[0].message.content.strip(), search_result.get("images", {})
    except Exception as e:
        logger.error(f"Error generating chat response with images: {e}")
        return f"Sorry, I encountered an error while processing your question: {str(e)}", {}
def chat_with_rag(user_question: str, chat_history: list):
    """
    Wrapper function for chat interface with image support and summaries.

    Main synchronous entry point for the Gradio chat flow: validates that a
    document has been processed, generates the AI response with intelligent
    image retrieval, updates the chat history, and writes the top image to a
    temporary file so Gradio can display it. The image summary is routed to
    one of two output components depending on the Markdown-KV toggle.

    Args:
        user_question (str): The user's question or query.
        chat_history (list): List of previous conversation messages in the format:
            [{"role": "user", "content": "..."}, {"role": "assistant", "content": "..."}]

    Returns:
        tuple: A 5-tuple wired to Gradio outputs:
            - input_text (str): Empty string to clear the input field
              (NOTE(review): the no-document early return instead places a
              "please upload" message here — confirm that is intentional)
            - updated_chat_history (list): Chat history with the new messages
            - image_path (str | None): Path to a temp file holding the most
              relevant image, or None
            - markdown_summary (str): Image summary, populated only when
              markdown_kv_enabled is True (markdown component)
            - text_summary (str): Image summary, populated only when
              markdown_kv_enabled is False (text component)

    Raises:
        Exception: Logs errors and appends an error message to chat history
        instead of raising.

    Note:
        - Requires current_rag_status["status"] == "success" before answering
        - Temp files are created with delete=False so Gradio can read them;
          they are not cleaned up here
        - Only the first valid retrieved image is displayed (top-1)
    """
    try:
        # Refuse to answer until a document has been processed successfully.
        if current_rag_status["status"] != "success":
            return "Please upload and process a document first before asking questions.", chat_history, None, "", ""
        # Generate response with intelligent images (bridge async -> sync for Gradio).
        response, retrieved_images = asyncio.run(generate_chat_response_with_intelligent_images(user_question, chat_history))
        # Update chat history
        chat_history.append({"role": "user", "content": user_question})
        chat_history.append({"role": "assistant", "content": response})
        # Convert images to display format with summaries
        image_path = None
        image_summary = ""
        if retrieved_images:
            # Get intelligent retrieval instance to fetch summaries
            from image_retrieval import get_intelligent_retrieval
            retrieval = get_intelligent_retrieval("selfhosted")
            # Get the first (and only) image
            for image_id, image_data in retrieved_images.items():
                if 'error' not in image_data and 'base64_data' in image_data:
                    # Create a temporary file with a short path for Gradio
                    import tempfile
                    import base64
                    try:
                        # Decode base64 and create temporary file
                        image_bytes = base64.b64decode(image_data['base64_data'])
                        # Create temp file with short name
                        with tempfile.NamedTemporaryFile(delete=False, suffix='.jpg', prefix='img_') as temp_file:
                            temp_file.write(image_bytes)
                            image_path = temp_file.name
                        # Get image summary (second asyncio.run: we are back in sync context)
                        image_summary = asyncio.run(retrieval.get_image_summary(image_id))
                        if not image_summary:
                            image_summary = f"Image {image_id}: Visual content from document"
                        break  # Only process the first image since we only want top 1
                    except Exception as e:
                        logger.error(f"Error creating temp file for {image_id}: {e}")
                        continue
                else:
                    logger.warning(f"Skipping image {image_id}: {image_data.get('error', 'No base64_data')}")
        # Route image summary to appropriate display component based on Markdown-KV setting
        if markdown_kv_enabled:
            # Display in markdown component for table formatting support
            formatted_summary = image_summary if image_summary else "*No image summary available*"
            return "", chat_history, image_path, formatted_summary, ""
        else:
            # Display in text component for standard summaries
            return "", chat_history, image_path, "", image_summary
    except Exception as e:
        error_msg = f"Error: {str(e)}"
        logger.error(f"Error in chat_with_rag: {str(e)}")
        # Surface the error in the chat transcript rather than raising into Gradio.
        chat_history.append({"role": "user", "content": user_question})
        chat_history.append({"role": "assistant", "content": error_msg})
        return "", chat_history, None, "", ""
async def auto_process_input(file, youtube_id, web_url) -> tuple:
"""
Automatically process input: extract text and create RAG.
This is the main processing function that handles the complete pipeline from
input (file, YouTube video, or web URL) to a fully functional RAG system.
It supports multiple file types including PDF, PPTX, DOCX, and other formats,
with special handling for documents containing images.
Args:
file: Uploaded file object (can be None). Supported formats:
- PDF: Extracts text and images separately, generates image summaries
- PPTX: Extracts text and images from slides, generates image summaries
- DOCX: Extracts text and images, generates image summaries
- Other formats: Standard text extraction
youtube_id (str): YouTube video ID or URL (can be None or empty).
web_url (str): Web URL to scrape content from (can be None or empty).
Returns:
tuple: A tuple containing:
- extracted_text (str): The processed and enhanced text content
- status_msg (str): Human-readable status message for the UI
Raises:
Exception: Logs errors and returns error message with status.
Note:
- Updates global variables: current_rag_status, current_document_name
- For PDF/PPTX/DOCX: Extracts images, generates GPT summaries, merges with text
- Creates RAG using self-hosted FAISS
- Provides progress updates through callback functions
- Handles file size limits and format validation
Example:
>>> text, status = await auto_process_input(file_obj, None, None)
>>> print(f"Processing result: {status}")
"""
global current_rag_status, current_document_name
try:
# Determine input type and validate
input_type = None
source_name = ""
if file is not None:
input_type = "file"
source_name = os.path.basename(file.name) if hasattr(file, 'name') else "document"
elif youtube_id and youtube_id.strip():
input_type = "youtube"
source_name = f"YouTube Video: {youtube_id.strip()}"
elif web_url and web_url.strip():
input_type = "web"
source_name = f"Web URL: {web_url.strip()}"
else:
return "No input provided", "Please provide a file, YouTube video, or web URL"
# Clear old images and summaries from self-hosted (HF) storage first
logger.info("Clearing old images and summaries from self-hosted storage...")
from hf_storage import clear_all_images, clear_all_summaries
from image_retrieval import clear_image_summaries
await clear_all_images()
await clear_all_summaries()
await clear_image_summaries("selfhosted")
# Update status
current_rag_status = {"status": "processing", "message": f"Extracting text from {input_type}..."}
status_msg = f"🔄 Extracting text from {input_type}..."
# Check if it's a PDF or PPTX file for enhanced processing
if file is not None and hasattr(file, 'name'):
file_ext = os.path.splitext(file.name)[1].lower()
if file_ext == ".pdf":
# Extract images and text separately
from file_preprocessing import extract_images_from_pdf, process_pdf_hybrid, summarize_image_with_gpt, capture_page_images_as_fallback
# Define progress callback to update status
async def update_progress(message):
nonlocal status_msg
status_msg = message
current_rag_status["message"] = message
# Use hybrid method: HTML + PDF images with Gemini
await update_progress("🔄 Processing PDF with hybrid method (HTML + Images with Gemini)...")
extracted_text = await process_pdf_hybrid(file.name, update_progress)
# Extract images separately for display/retrieval
await update_progress("📸 Extracting images from PDF for retrieval...")
images_data = await extract_images_from_pdf(file.name, None, update_progress, "selfhosted")
# Check if no images were found - if so, capture page images as fallback
no_images_message = ""
if not images_data['image_blob_urls']:
await update_progress("📸 No extractable images found, capturing page images as fallback...")
images_data = await capture_page_images_as_fallback(file.name, "selfhosted", update_progress)
if not images_data['image_blob_urls']:
no_images_message = "No images found in this document."
# Step 3: Generate image summaries
image_summaries = {}
figure_metadata_dict = {} # Store figure metadata for index
used_figures = set() # Track used figures for conflict resolution
if images_data['image_blob_urls']:
await update_progress("🤖 Generating image summaries with GPT...")
total_images = len(images_data['image_blob_urls'])
for i, (image_id, blob_url) in enumerate(images_data['image_blob_urls'].items(), 1):
try:
summary = await summarize_image_with_gpt(
blob_url,
image_id,
"selfhosted",
markdown_kv_enabled,
document_text=extracted_text
)
image_summaries[image_id] = summary
# Process with figure index (merge visual elements and store)
from image_retrieval import process_and_store_image_with_figure_index
enhanced_summary, figure_metadata = await process_and_store_image_with_figure_index(
image_id, summary, extracted_text, "selfhosted", used_figures
)
# Update with enhanced summary
image_summaries[image_id] = enhanced_summary
# Store figure metadata if found
if figure_metadata:
figure_metadata_dict[image_id] = figure_metadata
except Exception as e:
logger.error(f"Error summarizing image {image_id}: {e}")
image_summaries[image_id] = f"[Error summarizing image {image_id}: {str(e)}]"
# Store summaries
await update_progress("💾 Storing image summaries for intelligent retrieval...")
await store_image_summaries_batch(image_summaries, "selfhosted", extracted_text, figure_metadata_dict)
# Step 4: Merge text with image summaries
await update_progress("🔗 Merging text with image summaries...")
# Split text by pages and add image summaries
text_sections = extracted_text.split("--- PAGE")
enhanced_sections = []
for i, section in enumerate(text_sections):
if not section.strip():
continue
page_num = i
if page_num in images_data['image_ids_by_page']:
image_ids = images_data['image_ids_by_page'][page_num]
image_summary_text = "\n\nImage Summaries:\n"
for img_id in image_ids:
if img_id in image_summaries:
image_summary_text += f"- {img_id}: {image_summaries[img_id]}\n"
else:
image_summary_text += f"- {img_id}: [Image content from PDF]\n"
enhanced_sections.append(f"--- PAGE{section}{image_summary_text}")
else:
enhanced_sections.append(f"--- PAGE{section}")
final_text = "\n".join(enhanced_sections)
# Create pipeline result format
pipeline_result = {
"status": "success",
"vectorized_ready_text": final_text,
"image_blob_urls": images_data['image_blob_urls'],
"image_summaries": image_summaries,
"total_pages": images_data['total_pages'],
"total_images": len(images_data['image_blob_urls']),
"no_images_message": no_images_message
}
if pipeline_result["status"] == "success":
# Create RAG from the vectorized-ready text
current_document_name = source_name
source_info = source_name
rag_result = await create_rag_from_text_selfhosted(pipeline_result["vectorized_ready_text"], source_info, update_progress)
if rag_result["status"] == "success":
current_rag_status = {"status": "success", "message": f"RAG created successfully (self-hosted) with {pipeline_result['total_images']} images"}
status_msg = """✅ PDF processed successfully (self-hosted)!"""
else:
current_rag_status = {"status": "error", "message": rag_result['message']}
status_msg = f"❌ RAG Creation Failed: {rag_result['message']}"
return pipeline_result["vectorized_ready_text"], status_msg, pipeline_result.get("image_summaries", {}), pipeline_result.get("no_images_message", "")
else:
current_rag_status = {"status": "error", "message": pipeline_result['message']}
return pipeline_result['message'], f"❌ PDF Pipeline Failed: {pipeline_result['message']}", {}, ""
elif file_ext == ".pptx":
# Extract images and text separately
from file_preprocessing import extract_images_from_pptx, process_pptx, summarize_image_with_gpt
# Define progress callback to update status
async def update_progress(message):
nonlocal status_msg
status_msg = message
current_rag_status["message"] = message
# Step 1: Extract images
await update_progress("📸 Extracting images from PPTX...")
images_data = await extract_images_from_pptx(file.name, None, update_progress, "selfhosted")
# Check if no images were found
no_images_message = ""
if not images_data['image_blob_urls']:
no_images_message = "No images found in this document."
# Step 3: Extract text
await update_progress("📄 Extracting text from PPTX...")
extracted_text = await process_pptx(file.name, update_progress)
# Step 4: Generate image summaries
image_summaries = {}
if images_data['image_blob_urls']:
await update_progress("🤖 Generating image summaries with GPT...")
total_images = len(images_data['image_blob_urls'])
figure_metadata_dict = {} # Store figure metadata for index
used_figures = set() # Track used figures for conflict resolution
for i, (image_id, blob_url) in enumerate(images_data['image_blob_urls'].items(), 1):
try:
# Get AI summary
ai_summary = await summarize_image_with_gpt(blob_url, image_id, "selfhosted", markdown_kv_enabled, document_text=extracted_text)
# Process with figure index (merge visual elements and store)
from image_retrieval import process_and_store_image_with_figure_index
enhanced_summary, figure_metadata = await process_and_store_image_with_figure_index(
image_id, ai_summary, extracted_text, "selfhosted", used_figures
)
image_summaries[image_id] = enhanced_summary
# Store figure metadata if found
if figure_metadata:
figure_metadata_dict[image_id] = figure_metadata
except Exception as e:
logger.error(f"Error summarizing image {image_id}: {e}")
image_summaries[image_id] = f"[Error summarizing image {image_id}: {str(e)}]"
# Note: Summaries and figure index are already stored by process_and_store_image_with_figure_index
# But we still call store_image_summaries_batch for backward compatibility
await update_progress("💾 Storing image summaries for intelligent retrieval...")
await store_image_summaries_batch(image_summaries, "selfhosted", extracted_text, figure_metadata_dict)
# Step 4: Merge text with image summaries
await update_progress("🔗 Merging text with image summaries...")
# Split text by slides and add image summaries
text_sections = extracted_text.split("=== SLIDE")
enhanced_sections = []
for i, section in enumerate(text_sections):
if not section.strip():
continue
slide_num = i
if slide_num in images_data['image_ids_by_slide']:
image_ids = images_data['image_ids_by_slide'][slide_num]
image_summary_text = "\n\nImage Summaries:\n"
for img_id in image_ids:
if img_id in image_summaries:
image_summary_text += f"- {img_id}: {image_summaries[img_id]}\n"
else:
image_summary_text += f"- {img_id}: [Image content from PPTX]\n"
enhanced_sections.append(f"=== SLIDE{section}{image_summary_text}")
else:
enhanced_sections.append(f"=== SLIDE{section}")
final_text = "\n".join(enhanced_sections)
# Create pipeline result format
pipeline_result = {
"status": "success",
"vectorized_ready_text": final_text,
"image_blob_urls": images_data['image_blob_urls'],
"image_summaries": image_summaries,
"total_slides": images_data['total_slides'],
"total_images": len(images_data['image_blob_urls']),
"no_images_message": no_images_message
}
if pipeline_result["status"] == "success":
# Create RAG from the vectorized-ready text
current_document_name = source_name
source_info = source_name
rag_result = await create_rag_from_text_selfhosted(pipeline_result["vectorized_ready_text"], source_info, update_progress)
if rag_result["status"] == "success":
current_rag_status = {"status": "success", "message": f"RAG created successfully (self-hosted) with {pipeline_result['total_images']} images"}
status_msg = """✅ PPTX processed successfully (self-hosted)!"""
else:
current_rag_status = {"status": "error", "message": rag_result['message']}
status_msg = f"❌ RAG Creation Failed: {rag_result['message']}"
return pipeline_result["vectorized_ready_text"], status_msg, pipeline_result.get("image_summaries", {}), pipeline_result.get("no_images_message", "")
else:
current_rag_status = {"status": "error", "message": pipeline_result['message']}
return pipeline_result['message'], f"❌ PPTX Pipeline Failed: {pipeline_result['message']}", {}, ""
elif file_ext in [".doc", ".docx"]:
# Extract images and text separately
from file_preprocessing import extract_images_from_docx, process_docx, summarize_image_with_gpt
# Define progress callback to update status
async def update_progress(message):
nonlocal status_msg
status_msg = message
current_rag_status["message"] = message
# Step 1: Extract images
await update_progress("📸 Extracting images from DOCX...")
images_data = await extract_images_from_docx(file.name, None, update_progress, "selfhosted")
# Check if no images were found
no_images_message = ""
if not images_data['image_blob_urls']:
no_images_message = "No images found in this document."
# Step 3: Extract text
await update_progress("📄 Extracting text from DOCX...")
extracted_text = await process_docx(file.name, update_progress)
# Step 4: Generate image summaries
image_summaries = {}
if images_data['image_blob_urls']:
await update_progress("🤖 Generating image summaries with GPT...")
total_images = len(images_data['image_blob_urls'])
figure_metadata_dict = {} # Store figure metadata for index
used_figures = set() # Track used figures for conflict resolution
for i, (image_id, blob_url) in enumerate(images_data['image_blob_urls'].items(), 1):
try:
# Get AI summary
ai_summary = await summarize_image_with_gpt(blob_url, image_id, "selfhosted", markdown_kv_enabled, document_text=extracted_text)
# Process with figure index (merge visual elements and store)
from image_retrieval import process_and_store_image_with_figure_index
enhanced_summary, figure_metadata = await process_and_store_image_with_figure_index(
image_id, ai_summary, extracted_text, "selfhosted", used_figures
)
image_summaries[image_id] = enhanced_summary
# Store figure metadata if found
if figure_metadata:
figure_metadata_dict[image_id] = figure_metadata
except Exception as e:
logger.error(f"Error summarizing image {image_id}: {e}")
image_summaries[image_id] = f"[Error summarizing image {image_id}: {str(e)}]"
# Note: Summaries and figure index are already stored by process_and_store_image_with_figure_index
# But we still call store_image_summaries_batch for backward compatibility
await update_progress("💾 Storing image summaries for intelligent retrieval...")
await store_image_summaries_batch(image_summaries, "selfhosted", extracted_text, figure_metadata_dict)
# Step 4: Merge text with image summaries
await update_progress("🔗 Merging text with image summaries...")
# Add image summaries to the document text
if images_data['image_ids']:
image_summary_text = "\n\nImage Summaries:\n"
for img_id in images_data['image_ids']:
if img_id in image_summaries:
image_summary_text += f"- {img_id}: {image_summaries[img_id]}\n"
else:
image_summary_text += f"- {img_id}: [Image content from DOCX]\n"
final_text = extracted_text + image_summary_text
else:
final_text = extracted_text
# Create pipeline result format
pipeline_result = {
"status": "success",
"vectorized_ready_text": final_text,
"image_blob_urls": images_data['image_blob_urls'],
"image_summaries": image_summaries,
"total_paragraphs": images_data['total_paragraphs'],
"total_images": len(images_data['image_blob_urls']),
"no_images_message": no_images_message
}
if pipeline_result["status"] == "success":
# Create RAG from the vectorized-ready text
current_document_name = source_name
source_info = source_name
rag_result = await create_rag_from_text_selfhosted(pipeline_result["vectorized_ready_text"], source_info, update_progress)
if rag_result["status"] == "success":
current_rag_status = {"status": "success", "message": f"RAG created successfully (self-hosted) with {pipeline_result['total_images']} images"}
status_msg = """✅ DOCX processed successfully (self-hosted)!"""
else:
current_rag_status = {"status": "error", "message": rag_result['message']}
status_msg = f"❌ RAG Creation Failed: {rag_result['message']}"
return pipeline_result["vectorized_ready_text"], status_msg, pipeline_result.get("image_summaries", {}), pipeline_result.get("no_images_message", "")
else:
current_rag_status = {"status": "error", "message": pipeline_result['message']}
return pipeline_result['message'], f"❌ DOCX Pipeline Failed: {pipeline_result['message']}", {}, ""
# Handle web URL processing specially to get both text and image summaries
image_summaries = {}
display_text = ""
if web_url and web_url.strip():
try:
# Get the full result from process_web_url to access both text and image summaries
from file_preprocessing import process_web_url
web_result = await process_web_url(web_url.strip(), "selfhosted")
if isinstance(web_result, dict):
extracted_text = web_result["text"].strip() # Full content for RAG
display_text = web_result.get("display_text", extracted_text).strip() # Clean text for UI
image_summaries = web_result.get("image_summaries", {})
else:
extracted_text = web_result.strip()
display_text = extracted_text
image_summaries = {}
except Exception as e:
logger.error(f"Error processing web URL: {e}")
extracted_text = f"Error processing URL: {str(e)}"
display_text = extracted_text
image_summaries = {}
else:
# For non-web URLs, use the original processing
extracted_text = await _extract_text_async(file, youtube_id, web_url, "selfhosted")
display_text = extracted_text
if extracted_text.startswith("Error"):
current_rag_status = {"status": "error", "message": extracted_text}
return extracted_text, f"❌ {extracted_text}", {}, ""
# Update status
current_rag_status = {"status": "processing", "message": "Creating RAG (self-hosted)..."}
status_msg = "🤖 Creating RAG (self-hosted)..."
# Create RAG with progress callback
current_document_name = source_name
source_info = source_name
# Define progress callback to update status
async def update_progress(message):
nonlocal status_msg
status_msg = message
current_rag_status["message"] = message
rag_result = await create_rag_from_text_selfhosted(extracted_text, source_info, update_progress)
if rag_result["status"] == "success":
current_rag_status = {"status": "success", "message": "RAG created successfully (self-hosted)"}
status_msg = f"""✅ {input_type.title()} processed successfully (self-hosted)!"""
else:
current_rag_status = {"status": "error", "message": rag_result['message']}
status_msg = f"❌ RAG Creation Failed: {rag_result['message']}"
# Return appropriate no_images_message based on input type
no_images_message = ""
if not image_summaries:
if web_url and web_url.strip():
no_images_message = "No images found on this web page."
elif youtube_id and youtube_id.strip():
no_images_message = "No images found in this YouTube video."
else:
no_images_message = "No images found in this document."
return display_text, status_msg, image_summaries, no_images_message
except Exception as e:
error_msg = f"Error processing {input_type if 'input_type' in locals() else 'input'}: {str(e)}"
current_rag_status = {"status": "error", "message": error_msg}
return error_msg, f"❌ {error_msg}", {}, ""
def auto_process_input_sync(file, youtube_id, web_url):
    """
    Synchronous wrapper around the async auto_process_input pipeline.

    Bridges Gradio's synchronous event handlers to the async processing
    function by driving it with asyncio.run().

    Args:
        file: Uploaded file object (may be None).
        youtube_id (str): YouTube video ID or URL (may be None or empty).
        web_url (str): Web URL to scrape content from (may be None or empty).

    Returns:
        tuple: (extracted_text, status_msg, image_summaries, no_images_message)
            exactly as produced by auto_process_input.

    Note:
        asyncio.run() creates a fresh event loop per call, so this must not
        be invoked from a context that already has a running event loop.

    Example:
        >>> text, status, images, no_images = auto_process_input_sync(file_obj, None, None)
        >>> print(f"Sync processing result: {status}")
    """
    pipeline = auto_process_input(file, youtube_id, web_url)
    return asyncio.run(pipeline)
# Unified APIs for MCP exposure
async def create_rag_from_text_unified(text: str, source_info: str, provider: str = "selfhosted", progress_callback=None):
    """
    Build a RAG index from raw text using the self-hosted FAISS backend.

    Args:
        text (str): The text content to create RAG from.
        source_info (str): Information about the source of the text
            (filename, URL, etc.).
        provider (str): Ignored; kept for API compatibility. Self-hosted only.
        progress_callback: Optional callback invoked with progress updates.

    Returns:
        dict: Result dictionary with status and message; failures are
            reported in the dict rather than raised.
    """
    try:
        result = await create_rag_from_text_selfhosted(text, source_info, progress_callback)
    except Exception as exc:
        logger.error(f"Error in create_rag_from_text: {exc}")
        return {"status": "error", "message": str(exc)}
    return result
async def search_rag_documents_unified(query: str, top_k: int = 5, provider: str = "selfhosted") -> list:
    """
    Query the self-hosted FAISS index for relevant document chunks.

    Args:
        query (str): The search query string.
        top_k (int): Maximum number of results to return.
        provider (str): Ignored; kept for API compatibility. Self-hosted only.

    Returns:
        list: Document results with content, source, title, chunk_index,
            total_chunks; an empty list on failure.
    """
    try:
        hits = await search_rag_documents_selfhosted(query, top_k)
    except Exception as exc:
        logger.error(f"Error in search_rag_documents: {exc}")
        return []
    return hits
async def extract_text_async_unified(file_path=None, youtube_id=None, web_url=None, provider: str = "selfhosted"):
    """
    Unified API to extract text from a file, YouTube video, or web URL.

    Despite the `provider` parameter (kept for API compatibility), extraction
    always runs through the self-hosted pipeline — the previous docstring's
    AWS/Azure claims did not match the implementation.

    Args:
        file_path (str, optional): Path to a local file to extract text from.
        youtube_id (str, optional): YouTube video ID or URL.
        web_url (str, optional): Web URL to scrape and extract text from.
        provider (str): Ignored; kept for API compatibility. Self-hosted only.

    Returns:
        str: Extracted text content, or a string starting with "Error" if
            extraction fails (errors are never raised to the caller).

    Example:
        >>> text = await extract_text_async_unified("/path/to/document.pdf")
        >>> print(f"Extracted {len(text)} characters")
    """
    try:
        file_obj = None
        if file_path and os.path.exists(file_path):
            # _extract_text_async expects a file-like object exposing .name
            # and .size (as Gradio uploads do), so wrap the plain path.
            class MockFile:
                def __init__(self, path):
                    self.name = path
                    self.size = os.path.getsize(path)
            file_obj = MockFile(file_path)
        return await _extract_text_async(file_obj, youtube_id, web_url, "selfhosted")
    except Exception as e:
        # Mirror the rest of the module: return an "Error..." string rather
        # than raising, so MCP callers always receive text.
        return f"Error: {str(e)}"
# Synchronous wrappers for MCP exposure
def create_rag_from_text_sync(text: str, source_info: str, provider: str = "selfhosted"):
    """
    Synchronous wrapper for create_rag_from_text_unified.

    Args:
        text (str): The text content to create RAG from.
        source_info (str): Information about the source of the text.
        provider (str): Ignored; kept for API compatibility. Self-hosted only.
            (The previous docstring's azure/aws claim did not match the
            signature default or the backend.)

    Returns:
        dict: Result dictionary containing status and message.
    """
    return asyncio.run(create_rag_from_text_unified(text, source_info, provider))
def search_rag_documents_sync(query: str, top_k: int = 5, provider: str = "selfhosted"):
    """
    Synchronous wrapper for search_rag_documents_unified.

    Args:
        query (str): The search query string.
        top_k (int): Maximum number of results to return.
        provider (str): Ignored; kept for API compatibility. Self-hosted only.
            (The previous docstring's azure/aws claim did not match the
            signature default or the backend.)

    Returns:
        list: A list of document results (empty on failure).
    """
    return asyncio.run(search_rag_documents_unified(query, top_k, provider))
def extract_text_sync(file_path=None, youtube_id=None, web_url=None, provider: str = "selfhosted"):
    """
    Synchronous wrapper for extract_text_async_unified.

    Provides a synchronous interface to the async text-extraction function so
    it can be wired to Gradio's synchronous handlers and exposed over MCP.
    Despite the `provider` parameter (kept for API compatibility), extraction
    always runs through the self-hosted pipeline.

    Args:
        file_path (str, optional): Path to the file to extract text from.
        youtube_id (str, optional): YouTube video ID or URL.
        web_url (str, optional): Web URL to scrape and extract text from.
        provider (str): Ignored; kept for API compatibility. Self-hosted only.

    Returns:
        str: Extracted text content, or a string starting with "Error" if
            extraction fails (errors are never raised to the caller).

    Note:
        - Uses asyncio.run(), so it must not be called from a running loop.
        - Primarily used for MCP server exposure and testing.

    Example:
        >>> text = extract_text_sync("/path/to/document.pdf")
        >>> print(f"Extracted {len(text)} characters")
    """
    return asyncio.run(extract_text_async_unified(file_path, youtube_id, web_url, provider))
def clear_s3_images_sync():
    """
    Synchronously clear all uploaded images from the S3 bucket.

    Wraps the async aws_functions.clear_images_from_s3 helper with
    asyncio.run() so it can be wired to Gradio's synchronous handlers.

    Returns:
        dict: Result dictionary containing:
            - status (str): "success" or "error"
            - message (str): Success message or error description

    Note:
        - Requires AWS credentials to be configured in environment variables.
        - Clears images from pdf_images, pptx_images, docx_images, and
          web_images folders.

    Example:
        >>> result = clear_s3_images_sync()
        >>> print(f"Image clearing: {result['status']}")
    """
    try:
        from aws_functions import clear_images_from_s3
        cleared = asyncio.run(clear_images_from_s3())
    except Exception as e:
        # Import failures and runtime errors are both reported, not raised.
        return {"status": "error", "message": f"Error clearing S3 images: {str(e)}"}
    if not cleared:
        return {"status": "error", "message": "Failed to clear images from S3 bucket"}
    return {"status": "success", "message": "Successfully cleared all images from S3 bucket"}
# Create Gradio interface
def create_gradio_app():
    """
    Create and configure the Gradio web application interface.

    Builds the full web UI for the RAG chat application: a "Create RAG" tab
    (file upload, YouTube/web-URL inputs, processing status, extracted text
    and image-description panels), a "Chat" tab (message history plus a
    most-relevant-image panel), and a hidden tab that exposes core functions
    for MCP (Model Context Protocol) server integration.

    Returns:
        gr.Blocks: A configured Gradio Blocks application with:
            - File upload interface supporting multiple formats
            - YouTube video and web URL input fields
            - Self-hosted RAG (FAISS + HF storage)
            - Real-time processing status display
            - Interactive chat interface with message history
            - Most-relevant-image display with download capability
            - Hidden MCP-exposed functions for external integration

    Supported File Types:
        - Documents: PDF, DOC, DOCX, TXT, PPTX, XLSX
        - Media: MP3, WAV, MP4
        - Images: JPG, JPEG, PNG
        - CAD: DXF

    Example:
        >>> app = create_gradio_app()
        >>> app.launch(server_name="0.0.0.0", server_port=7860)
    """
    # Fixed minimum height for the upload widget so the layout does not
    # jump when a file is added.
    css = """
    #file-upload {
        min-height: 255px !important;
    }
    """
    with gr.Blocks(
        title="Document Chat Assistant",
        theme=gr.themes.Soft(),
        css=css
    ) as app:
        with gr.Tab("Create RAG"):
            with gr.Row():
                with gr.Column(scale=2):
                    # Toggle read by the image pipeline via the module-level
                    # markdown_kv_enabled global.
                    markdown_kv_toggle = gr.Checkbox(
                        label="🔄 Enable Markdown-KV (Table) Conversion",
                        value=False,
                        info="When enabled, table images will be converted to markdown format"
                    )
                with gr.Column(scale=1):
                    gr.Markdown(
                        """
                        **⚠️ File size limit:** Maximum 20 MB per file
                        **🚀 Auto Processing:** Upload, paste YouTube or Web URL!
                        """
                    )
            with gr.Row():
                with gr.Column():
                    with gr.Row():
                        with gr.Column(scale=2):
                            file_input = gr.File(
                                label="📁 Upload Document",
                                file_types=[".pdf", ".doc", ".docx", ".txt", ".pptx", ".xlsx", ".mp3", ".wav", ".mp4", ".dxf", ".jpg", ".jpeg", ".png"],
                                height=255,
                                elem_id="file-upload"
                            )
                        with gr.Column(scale=1):
                            youtube_input = gr.Textbox(
                                label="🎥 YouTube Video",
                                placeholder="Enter YouTube URL or ID",
                                lines=1,
                            )
                            web_url_input = gr.Textbox(
                                label="🌐 Web URL",
                                placeholder="Enter web URL",
                                lines=1,
                            )
                            clear_file_btn = gr.Button(
                                "🗑️ Clear All",
                                variant="secondary",
                                size="lg",
                            )
            # Status area; doc_status is hidden and only feeds the status
            # text reused elsewhere in the app.
            notification_display = gr.Markdown("")
            doc_status = gr.Textbox(
                label="Current Document",
                value="No document processed yet",
                interactive=False,
                lines=2,
                visible=False
            )
            with gr.Row():
                with gr.Column():
                    gr.Markdown("### 📄 Extracted Text")
                    extracted_text_display = gr.Textbox(
                        label="Document Content",
                        lines=10,
                        max_lines=15,
                        show_copy_button=True,
                        container=True,
                        interactive=False,
                        placeholder="Extracted text will appear here after processing...",
                    )
                with gr.Column():
                    gr.Markdown("### 🖼️ Image Descriptions")
                    image_descriptions_display = gr.Textbox(
                        label="AI-Generated Image Summaries",
                        lines=10,
                        max_lines=15,
                        show_copy_button=True,
                        container=True,
                        interactive=False,
                        placeholder="Image descriptions will appear here after processing...",
                    )
        with gr.Tab("Chat"):
            with gr.Row():
                with gr.Column(scale=2):
                    gr.Markdown("### 💬 Chat with Your Document")
                    chatbot = gr.Chatbot(
                        label="Chat History",
                        height=550,
                        type="messages",
                        allow_tags=False
                    )
                    msg_input = gr.Textbox(
                        label="Your Question",
                        placeholder="Ask a question about your uploaded document...",
                        lines=3,
                    )
                    with gr.Row():
                        send_btn = gr.Button(
                            "Send",
                            variant="primary",
                            size="lg",
                        )
                        clear_chat_btn = gr.Button(
                            "🗑️ Clear Chat",
                            variant="secondary",
                            size="lg"
                        )
                with gr.Column(scale=1):
                    gr.Markdown("### 📸 Most Relevant Image")
                    image_display = gr.Image(
                        label="",
                        height=250,
                        show_label=False,
                        container=True,
                        show_download_button=True,
                    )
                    gr.Markdown("### 📝 AI-Generated Summary")
                    # Two interchangeable summary panels: the markdown one is
                    # shown when Markdown-KV conversion is enabled, the plain
                    # textbox otherwise (visibility swapped by the toggle).
                    image_summary_markdown = gr.Markdown(
                        value="*Image summary will appear here...*",
                        visible=False
                    )
                    image_summary_text = gr.Textbox(
                        label="",
                        lines=20,
                        max_lines=20,
                        show_copy_button=True,
                        container=True,
                        interactive=False,
                        placeholder="Image summary will appear here...",
                        visible=True
                    )
        # Event handlers for automatic input processing
def process_input_and_update_status(file, youtube_id, web_url, chat_history):
"""
Process input and return status only (no text preview).
This function handles the automatic processing of user inputs (file upload,
YouTube URL, or web URL) and updates the UI with processing status.
It clears the chat history when new input is provided and provides
real-time feedback on the processing progress.
Args:
file: Uploaded file object (can be None).
youtube_id (str): YouTube video ID or URL (can be None or empty).
web_url (str): Web URL to scrape content from (can be None or empty).
chat_history (list): Current chat history (will be cleared for new input).
Returns:
tuple: A tuple containing:
- status_msg (str): Processing status message for display
- doc_status_msg (str): Document status message for chat interface
- cleared_chat (list): Empty chat history (cleared for new input)
- empty_images (list): Empty image gallery (cleared for new input)
- extracted_text (str): Extracted text content
- image_descriptions (str): Formatted image descriptions
Note:
- Validates that at least one input is provided
- Clears chat history when new input is processed
- Updates global variables for document status
- Provides user-friendly status messages with emojis
- Handles error states and provides appropriate feedback
- Formats image descriptions for display
"""
# Check if any input is provided
if file is None and (not youtube_id or not youtube_id.strip()) and (not web_url or not web_url.strip()):
return "No input provided", "No document processed yet", chat_history, None, "", "", "", ""
# Clear chat history when new input is provided
cleared_chat = []
# Process input
extracted_text, status_msg, image_summaries, no_images_message = auto_process_input_sync(file, youtube_id, web_url)
# Format image descriptions for display
image_descriptions = ""
if image_summaries:
image_descriptions = "AI-Generated Image Descriptions:\n\n"
for image_id, description in image_summaries.items():
image_descriptions += f"🖼️ {image_id}:\n{description}\n\n"
elif no_images_message:
image_descriptions = no_images_message
else:
image_descriptions = "No images found in this document."
# Update document status
global current_document_name, current_rag_status
if current_rag_status["status"] == "success":
doc_status_msg = f"✅ Ready: {current_document_name}"
elif current_rag_status["status"] == "processing":
doc_status_msg = f"🔄 Processing: {current_document_name}"
elif current_rag_status["status"] == "error":
doc_status_msg = f"❌ Error: {current_rag_status['message']}"
else:
doc_status_msg = "No document processed yet"
# Keep AI-Generated Summary empty during document processing - only populate during chat
return status_msg, doc_status_msg, cleared_chat, None, "", "", extracted_text, image_descriptions
# Clear function
def clear_all_inputs():
"""
Clear all input fields and reset the interface.
This function resets all input fields (file upload, YouTube URL,
web URL) and clears the chat history, image display, extracted text,
and image descriptions, providing a clean slate for new document processing.
Returns:
tuple: A tuple containing:
- None: Cleared file input
- "": Empty YouTube input string
- "": Empty web URL input string
- []: Empty chat history list
- None: Clear image display
- "": Clear image summary display
- "": Empty extracted text
- "": Empty image descriptions
Note:
This function is called when the "Clear All" button is clicked,
providing a quick way to reset the entire interface.
Example:
>>> file, youtube, web, chat, image, summary, text, descriptions = clear_all_inputs()
>>> print(f"Cleared: file={file}, youtube='{youtube}', web='{web}'")
"""
# Reset global state
global current_rag_status, current_document_name
current_rag_status = {"status": "none", "message": "No document processed yet"}
current_document_name = ""
# Reset all components to initial state
return None, "", "", [], None, "", "", "", ""
# Event handlers for all input types
file_input.change(
fn=process_input_and_update_status,
inputs=[file_input, youtube_input, web_url_input, chatbot],
outputs=[notification_display, doc_status, chatbot, image_display, image_summary_markdown, image_summary_text, extracted_text_display, image_descriptions_display],
show_progress=True,
show_api=False
)
youtube_input.change(
fn=process_input_and_update_status,
inputs=[file_input, youtube_input, web_url_input, chatbot],
outputs=[notification_display, doc_status, chatbot, image_display, image_summary_markdown, image_summary_text, extracted_text_display, image_descriptions_display],
show_progress=True,
show_api=False
)
web_url_input.change(
fn=process_input_and_update_status,
inputs=[file_input, youtube_input, web_url_input, chatbot],
outputs=[notification_display, doc_status, chatbot, image_display, image_summary_markdown, image_summary_text, extracted_text_display, image_descriptions_display],
show_progress=True,
show_api=False
)
# Markdown-KV toggle handler
def update_markdown_kv_toggle(enabled):
global markdown_kv_enabled
markdown_kv_enabled = enabled
status = "ENABLED" if enabled else "DISABLED"
status_msg = f"Markdown-KV (Table) Conversion {status}. {'Table images will be converted to markdown format.' if enabled else 'Standard image summaries will be used.'}"
# Return: status message, markdown visibility, text visibility
return (
status_msg,
gr.update(visible=enabled), # markdown component
gr.update(visible=not enabled) # text component
)
markdown_kv_toggle.change(
fn=update_markdown_kv_toggle,
inputs=[markdown_kv_toggle],
outputs=[notification_display, image_summary_markdown, image_summary_text],
show_api=False
)
# Clear button handler
clear_file_btn.click(
fn=clear_all_inputs,
inputs=[],
outputs=[file_input, youtube_input, web_url_input, chatbot, image_display, image_summary_markdown, image_summary_text, extracted_text_display, image_descriptions_display],
show_api=False
)
# Event handlers for chat interface
def clear_chat():
"""
Clear the chat history and image display.
This function resets the chat interface by clearing the conversation
history and removing any displayed images and summaries, while keeping the current
document processing state intact.
Returns:
tuple: A tuple containing:
- []: Empty chat history list
- None: Clear image display
- "": Clear image summary display
Note:
- Only clears the chat interface, not the document processing state
- Called when the "Clear Chat" button is clicked
- Preserves the current RAG system and document status
Example:
>>> chat, image, summary = clear_chat()
>>> print(f"Chat cleared: {len(chat)} messages")
"""
return [], None, "", ""
        # Chat functionality: the Send button and textbox submit share the
        # same handler and output set.
        send_btn.click(
            fn=chat_with_rag,
            inputs=[msg_input, chatbot],
            outputs=[msg_input, chatbot, image_display, image_summary_markdown, image_summary_text],
            show_progress=True,
            show_api=False
        )
        msg_input.submit(
            fn=chat_with_rag,
            inputs=[msg_input, chatbot],
            outputs=[msg_input, chatbot, image_display, image_summary_markdown, image_summary_text],
            show_progress=True,
            show_api=False
        )
        clear_chat_btn.click(
            fn=clear_chat,
            inputs=[],
            outputs=[chatbot, image_display, image_summary_markdown, image_summary_text],
            show_api=False
        )
        # Add core functions for MCP exposure (hidden from UI)
        with gr.Tab("Core Functions", visible=False):
            # These interfaces exist only so the functions are exported over
            # the MCP server; the components are never shown in the UI.
            # NOTE(review): the Provider dropdowns still offer "azure"/"aws"
            # with default "azure", but the backing functions ignore the
            # value (self-hosted only) — confirm whether MCP clients rely on
            # these choices before changing them.
            # Unified extract_text_async API
            gr.Interface(
                fn=extract_text_sync,
                inputs=[
                    gr.Textbox(label="File Path", visible=False),
                    gr.Textbox(label="YouTube ID", visible=False),
                    gr.Textbox(label="Web URL", visible=False),
                    gr.Dropdown(label="Provider", choices=["azure", "aws"], value="azure", visible=False)
                ],
                outputs=gr.Textbox(label="Extracted Text", visible=False),
                title="extract_text_async",
                description="Extract text from files, YouTube videos, or web URLs using specified provider",
                api_name="extract_text_async"
            )
            # Unified create_rag_from_text API
            gr.Interface(
                fn=create_rag_from_text_sync,
                inputs=[
                    gr.Textbox(label="Text", visible=False),
                    gr.Textbox(label="Source Info", visible=False),
                    gr.Dropdown(label="Provider", choices=["azure", "aws"], value="azure", visible=False)
                ],
                outputs=gr.JSON(label="RAG Result", visible=False),
                title="create_rag_from_text",
                description="Build RAG from text (self-hosted FAISS)",
                api_name="create_rag_from_text"
            )
            # Unified search_rag_documents API
            gr.Interface(
                fn=search_rag_documents_sync,
                inputs=[
                    gr.Textbox(label="Query", visible=False),
                    gr.Slider(label="Top K", minimum=1, maximum=20, value=5, visible=False),
                    gr.Dropdown(label="Provider", choices=["azure", "aws"], value="azure", visible=False)
                ],
                outputs=gr.JSON(label="Search Results", visible=False),
                title="search_rag_documents",
                description="Search RAG documents (self-hosted FAISS)",
                api_name="search_rag_documents"
            )
    return app
# Launch the app
if __name__ == "__main__":
    app = create_gradio_app()
    # Launch with MCP server enabled so the hidden "Core Functions" tab is
    # reachable by external tools; share=True also creates a public link.
    app.launch(
        mcp_server=True,
        server_name="0.0.0.0",
        server_port=7860,
        share=True
    )