Spaces:
Sleeping
Sleeping
| import os | |
| import re | |
| import io | |
| import html | |
| import fitz | |
| import ezdxf | |
| import base64 | |
| import asyncio | |
| import logging | |
| import openpyxl | |
| import tempfile | |
| import requests | |
| import pandas as pd | |
| from bs4 import BeautifulSoup | |
| from dotenv import load_dotenv | |
| from spire.presentation import * | |
| from openai import AsyncOpenAI, OpenAI | |
| from spire.presentation.common import * | |
| from audio_extract import extract_audio | |
| from ezdxf.addons.drawing import pymupdf | |
| from urllib.parse import urljoin, urlparse | |
| from youtube_transcript_api import YouTubeTranscriptApi | |
| from ezdxf.addons.drawing import Frontend, RenderContext, layout, config | |
| from system_prompt import SYSTEM_PROMPT, IMAGE_SUMMARY_PROMPT, TABLE_DETECTION_PROMPT, TABLE_TO_MARKDOWN_PROMPT, SIMPLE_IMAGE_SUMMARY_PROMPT, DXF_ANALYSIS_PROMPT | |
# Try to import Gemini, but don't fail if not available — Gemini features are optional.
try:
    import google.generativeai as genai
    GEMINI_AVAILABLE = True
except ImportError:
    GEMINI_AVAILABLE = False
    # Logger not yet initialized at import time, use print instead
    print("Warning: Google Generative AI (Gemini) not available. Install with: pip install google-generativeai")
# Cache for Gemini model to avoid reconfiguring on every page
_gemini_model_cache = None
# Load environment variables (OPENAI_API_KEY, GEMINI_API_KEY, ...) from a .env file
load_dotenv()
# Async client for concurrent page/image calls; sync client for blocking uploads
client = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"))
sync_client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
# File extensions accepted by the processing pipeline
SUPPORTED_FILES = [".pdf", ".doc", ".docx", ".txt", ".pptx", ".xlsx", ".mp3", ".wav", ".mp4", ".dxf", ".jpg", ".jpeg", ".png"]
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Reduce OpenAI logging
logging.getLogger("openai").setLevel(logging.WARNING)
logging.getLogger("httpx").setLevel(logging.WARNING)
# Suppress ezdxf warnings (common and harmless)
logging.getLogger("ezdxf").setLevel(logging.ERROR)
def upload_file_to_openai(file_path: str) -> str:
    """Upload a local file to OpenAI for use with the Assistants API.

    Args:
        file_path: Path of the file on disk.

    Returns:
        str: The OpenAI file ID of the uploaded file.

    Raises:
        Exception: If opening or uploading the file fails.
    """
    try:
        with open(file_path, "rb") as file:
            file_response = sync_client.files.create(
                file=file,
                purpose="assistants"
            )
        file_id = file_response.id
        # BUG FIX: the original log line was truncated and never logged the ID
        logger.info(f"Successfully uploaded file. File ID: {file_id}")
        return file_id
    except Exception as e:
        logger.error(f"Failed to upload file: {e}")
        # Chain the original exception so the root cause is preserved
        raise Exception(f"Failed to upload file: {str(e)}") from e
async def extract_native_text_from_pdf_page(doc, page_num: int) -> str:
    """Extract native (embedded) text from a single PDF page.

    Tries several PyMuPDF extraction formats in order and keeps the first
    result that is at least 80% readable characters. Returns "" when no
    usable text is found or when the text looks corrupted, which signals
    the caller to fall back to OCR.

    Args:
        doc: Open PyMuPDF document object.
        page_num: 0-indexed page number.

    Returns:
        str: Cleaned page text, or "" to trigger OCR.
    """
    try:
        page = doc.load_page(page_num)
        # Try multiple text extraction methods with different encoding options
        text_methods = [
            lambda: page.get_text("text"),
            lambda: page.get_text("dict"),
            lambda: page.get_text("html"),
            lambda: page.get_text("xml"),
        ]
        best_text = ""
        for method in text_methods:
            try:
                result = method()
                # Handle different return types
                if isinstance(result, str):
                    text = result
                elif isinstance(result, dict):
                    # Extract text from dictionary format (blocks -> lines -> spans)
                    text = ""
                    for block in result.get("blocks", []):
                        if "lines" in block:
                            for line in block["lines"]:
                                for span in line.get("spans", []):
                                    text += span.get("text", "")
                                text += "\n"
                else:
                    # Unexpected return type from this method; try the next one
                    continue
                # Clean up the text
                if text and text.strip():
                    # Decode HTML entities (for Japanese characters and other Unicode)
                    text = html.unescape(text)
                    # Remove HTML tags if present (the "html"/"xml" methods return markup)
                    text = re.sub(r'<[^>]+>', '', text)
                    # Remove excessive whitespace and normalize
                    text = re.sub(r'\s+', ' ', text.strip())
                    # Check if this text looks better (more readable characters)
                    # Count printable characters including Japanese, Chinese, Korean, etc.
                    readable_chars = sum(1 for c in text if c.isprintable() or c.isspace())
                    total_chars = len(text)
                    if total_chars > 0 and readable_chars / total_chars > 0.8:  # At least 80% readable
                        best_text = text
                        break
            except Exception as e:
                logger.debug(f"Text extraction method failed: {e}")
                continue
        # If we still have garbled text, fall back to OCR
        if best_text and len(best_text.strip()) > 10:
            # Use the universal corruption detection function
            if detect_corrupted_text(best_text):
                logger.info(f"Page {page_num} has corrupted text, will use OCR")
                return ""  # Return empty to trigger OCR
        return best_text
    except Exception as e:
        logger.error(f"Error extracting native text from page {page_num}: {e}")
        return ""
def detect_corrupted_text(text: str) -> bool:
    """Heuristically decide whether extracted text is corrupted.

    Works for any language (English, Japanese, Chinese, etc.) by scoring
    several ratios over the input: non-printable characters, dominance of a
    single printable character, characters outside common Unicode ranges,
    whitespace density, and punctuation density.

    Args:
        text (str): Text to analyze for corruption

    Returns:
        bool: True if text appears corrupted, False otherwise
    """
    if not text or len(text) < 10:
        return False
    total = len(text)
    # Characters that are neither printable nor whitespace
    non_printable_ratio = sum(not c.isprintable() and not c.isspace() for c in text) / total
    # How dominant is the single most frequent printable character?
    freq = {}
    for ch in text:
        if ch.isprintable():
            freq[ch] = freq.get(ch, 0) + 1
    max_char_ratio = (max(freq.values()) / total) if freq else 0
    # Character classes that usually indicate encoding problems
    unusual_patterns = (
        r'[\u0000-\u001F\u007F-\u009F]',
        r'[^\x20-\x7E\u3040-\u309F\u30A0-\u30FF\u4E00-\u9FAF\uAC00-\uD7AF\u0400-\u04FF\u0100-\u017F\u0180-\u024F]',  # Characters outside normal ranges
    )
    unusual_hits = sum(len(re.findall(pattern, text)) for pattern in unusual_patterns)
    unusual_ratio = unusual_hits / total if total > 0 else 0
    # Whitespace and punctuation densities
    whitespace_ratio = sum(c.isspace() for c in text) / total
    special_char_ratio = sum(not c.isalnum() and not c.isspace() for c in text) / total
    is_corrupted = (non_printable_ratio > 0.05 or
                    max_char_ratio > 0.5 or
                    unusual_ratio > 0.1 or
                    whitespace_ratio > 0.6 or
                    special_char_ratio > 0.7)
    if is_corrupted:
        logger.info(f"Detected corrupted text: non-printable={non_printable_ratio:.2f}, max_char={max_char_ratio:.2f}, unusual={unusual_ratio:.2f}, whitespace={whitespace_ratio:.2f}, special={special_char_ratio:.2f}")
    return is_corrupted
async def validate_pdf_file(file_path: str) -> bool:
    """Return True when the PDF at *file_path* opens cleanly and has pages."""
    try:
        with fitz.open(file_path) as doc:
            if len(doc) == 0:
                logger.warning("PDF file has 0 pages")
                return False
            # Loading the first page catches corruption that open() alone misses
            doc.load_page(0)
            return True
    except fitz.FileDataError as e:
        logger.error(f"PDF file is corrupted or broken: {e}")
        return False
    except Exception as e:
        logger.error(f"Error validating PDF file: {e}")
        return False
async def render_page_to_base64(doc, page_num: int) -> str:
    """Render one PDF page to a base64-encoded PNG string (150 DPI)."""
    try:
        pixmap = doc.load_page(page_num).get_pixmap(dpi=150)
        png_bytes = pixmap.tobytes("png")
        return base64.b64encode(png_bytes).decode("utf-8")
    except Exception as e:
        logger.error(f"Error rendering page {page_num}: {e}")
        raise
async def extract_html_from_pdf_page(doc, page_num: int) -> str:
    """
    Extract HTML from a PDF page, preserving table structure, colors, and formatting.

    The HTML representation keeps table structure and borders, colors, text
    formatting (bold, italic, etc.), and layout information that a plain-text
    extraction would lose.

    Args:
        doc: PyMuPDF document object
        page_num: Page number (0-indexed)

    Returns:
        str: HTML content of the page, prefixed with a page marker; ""
             when the page yields nothing or extraction fails.
    """
    try:
        raw_html = doc.load_page(page_num).get_text("html")
        if not raw_html:
            return ""
        # Decode HTML entities, then tag the content with its 1-based page number
        decoded = html.unescape(raw_html)
        return f"<!-- PAGE {page_num + 1} -->\n{decoded}"
    except Exception as e:
        logger.error(f"Error extracting HTML from page {page_num}: {e}")
        return ""
async def extract_html_from_pdf(pdf_path: str, progress_callback=None) -> dict:
    """
    Extract HTML from all PDF pages.

    Args:
        pdf_path: Path to PDF file
        progress_callback: Optional async callback for progress updates

    Returns:
        dict: 1-based page numbers mapped to HTML content (pages with no
              extractable HTML are omitted); {} on failure.
    """
    try:
        html_pages = {}
        # BUG FIX: use a context manager so the document is closed even when
        # a page-level extraction or the progress callback raises.
        with fitz.open(pdf_path) as doc:
            page_count = len(doc)
            for i in range(page_count):
                if progress_callback:
                    await progress_callback(f"📄 Extracting HTML from page {i+1} of {page_count}")
                html_content = await extract_html_from_pdf_page(doc, i)
                if html_content:
                    html_pages[i + 1] = html_content
        return html_pages
    except Exception as e:
        logger.error(f"Error extracting HTML from PDF: {e}")
        return {}
# Cache for the Gemini 3 model used for video analysis (set by configure_gemini3)
_gemini3_model_cache = None
def configure_gemini():
    """Configure and return a Gemini model, caching it for reuse.

    Resolution order: a previously cached model, then the preferred model
    names (checked against the account's available models when the list can
    be fetched), then the first available model as a last resort.

    Returns:
        A configured genai.GenerativeModel instance.

    Raises:
        Exception: If the SDK is missing, GEMINI_API_KEY is absent, or no
            model can be configured at all.
    """
    global _gemini_model_cache
    # Return cached model if available
    if _gemini_model_cache is not None:
        return _gemini_model_cache
    if not GEMINI_AVAILABLE:
        raise Exception("Gemini SDK not installed. Install with: pip install google-generativeai")
    gemini_api_key = os.getenv("GEMINI_API_KEY")
    if not gemini_api_key:
        raise Exception("GEMINI_API_KEY not found in environment variables")
    genai.configure(api_key=gemini_api_key)
    # First, get the list of available models (only once — this is an API call)
    try:
        available_models = [m.name for m in genai.list_models() if 'generateContent' in m.supported_generation_methods]
    except Exception as e:
        logger.warning(f"Could not list available models: {e}")
        available_models = []
    # Try models in order of preference, but only ones that are actually available
    preferred_models = [
        'gemini-1.5-flash',
        'gemini-1.5-pro',
        'gemini-pro',
        'models/gemini-1.5-flash',
        'models/gemini-1.5-pro',
        'models/gemini-pro'
    ]
    # Try preferred models first if they're in the available list
    for model_name in preferred_models:
        # Check if model is available (with or without 'models/' prefix)
        model_variants = [model_name, f"models/{model_name}", model_name.replace("models/", "")]
        is_available = any(variant in available_models for variant in model_variants)
        if is_available or not available_models:  # Try anyway if we couldn't list models
            try:
                model = genai.GenerativeModel(model_name)
                logger.info(f"Successfully configured Gemini model: {model_name}")
                _gemini_model_cache = model  # Cache the model
                return model
            except Exception as e:
                logger.debug(f"Failed to configure model {model_name}: {e}")
                continue
    # If all preferred models fail, try the first available model
    if available_models:
        try:
            first_model = available_models[0]
            # Remove 'models/' prefix if present
            model_name = first_model.replace('models/', '')
            model = genai.GenerativeModel(model_name)
            logger.info(f"Using first available Gemini model: {model_name}")
            _gemini_model_cache = model  # Cache the model
            return model
        except Exception as e:
            logger.error(f"Failed to use first available model {first_model}: {e}")
    # Nothing worked: give the caller the list we saw for diagnostics
    raise Exception(f"Failed to configure any Gemini model. Available models: {available_models}")
def configure_gemini3():
    """Configure Gemini 3 Pro for video frame analysis, caching the instance.

    Each candidate model name is smoke-tested with a trivial generation
    request before being accepted and cached.

    Returns:
        A working genai.GenerativeModel for Gemini 3 Pro.

    Raises:
        Exception: If the SDK/API key is missing or no candidate model works.
    """
    global _gemini3_model_cache
    # Reuse the cached instance when we already configured one
    if _gemini3_model_cache is not None:
        return _gemini3_model_cache
    if not GEMINI_AVAILABLE:
        raise Exception("Gemini SDK not installed. Install with: pip install google-generativeai")
    api_key = os.getenv("GEMINI_API_KEY")
    if not api_key:
        raise Exception("GEMINI_API_KEY not found in environment variables")
    genai.configure(api_key=api_key)
    # Candidate naming conventions for Gemini 3 Pro, in preference order
    for candidate in ('gemini-3-pro-preview',):
        try:
            candidate_model = genai.GenerativeModel(candidate)
            # Smoke-test the model with a simple request before trusting it
            candidate_model.generate_content("Hello")
            logger.info(f"Successfully configured Gemini 3 Pro: {candidate}")
            _gemini3_model_cache = candidate_model
            return candidate_model
        except Exception as e:
            logger.debug(f"Failed to configure {candidate}: {e}")
    # No candidate worked: raise a clear, actionable error
    raise Exception(
        "Gemini 3 Pro model is not available. "
        "Please ensure you have access to Gemini 3 Pro in your API account. "
        "Check https://ai.google.dev/ for model availability."
    )
async def process_hybrid_with_gemini(html_content: str, page_image_base64: str, user_query: str = None, page_num: int = None) -> str:
    """
    Process hybrid format (HTML + PDF image) with Gemini.

    This combines:
    - HTML content (table structure, colors, formatting)
    - PDF page image (visual information, graphs, layout)

    Args:
        html_content: HTML content from PDF page
        page_image_base64: Base64-encoded PNG of the PDF page
        user_query: Optional user query for context
        page_num: Optional page number for reference

    Returns:
        str: Processed text content from Gemini, or a bracketed fallback
             message (plus a truncated HTML echo) when processing fails.
    """
    try:
        if not GEMINI_AVAILABLE:
            raise Exception("Gemini SDK not available")
        model = configure_gemini()
        # Prepare prompt (plain string — it contains no interpolations)
        prompt = """Analyze this PDF page content. You are receiving:
1. HTML content (preserves table structure, colors, and formatting)
2. Page image (preserves visual information like graphs, images, and layout)
Please extract and combine information from both sources to provide a comprehensive understanding of the page content.
Focus on:
- Table data and structure from HTML
- Color information from HTML
- Visual elements (graphs, charts, images) from the page image
- Overall layout and structure
- Text content from both sources
"""
        # BUG FIX: `if page_num:` dropped page number 0; test against None instead
        if page_num is not None:
            prompt += f"Page Number: {page_num}\n\n"
        if user_query:
            prompt += f"User query context: {user_query}\n\n"
        # Limit HTML length to avoid token limits
        html_preview = html_content[:8000]
        if len(html_content) > 8000:
            html_preview += "\n\n[HTML content truncated for length...]"
        prompt += "\nHTML Content:\n" + html_preview
        # Prepare content parts
        content_parts = [prompt]
        # Add image if available
        if page_image_base64:
            try:
                image_data = base64.b64decode(page_image_base64)
                content_parts.append({
                    "mime_type": "image/png",
                    "data": image_data
                })
            except Exception as e:
                logger.warning(f"Could not decode image for Gemini: {e}")
        # Run the blocking SDK call off the event loop
        response = await asyncio.to_thread(model.generate_content, content_parts)
        return response.text if hasattr(response, 'text') else str(response)
    except Exception as e:
        logger.error(f"Error processing hybrid content with Gemini: {e}")
        # Fallback: return HTML content if Gemini fails
        if html_content:
            return f"[Hybrid processing with Gemini failed: {str(e)}]\n\nHTML Content:\n{html_content[:2000]}"
        return f"[Error processing with Gemini: {str(e)}]"
async def extract_page_with_llm(base64_img: str, page_num: int) -> str:
    """Run GPT vision OCR on a base64 page image and return tagged page text."""
    try:
        image_url = f"data:image/png;base64,{base64_img}"
        completion = await client.chat.completions.create(
            model="gpt-4.1-mini",
            messages=[
                {"role": "system", "content": SYSTEM_PROMPT},
                {"role": "user", "content": [
                    {"type": "image_url", "image_url": {"url": image_url}}
                ]},
            ],
            temperature=0.0,
        )
        page_text = completion.choices[0].message.content.strip()
        return f"--- PAGE {page_num} ---\n{page_text}\n"
    except Exception as e:
        logger.error(f"Error processing page {page_num} with LLM: {e}")
        return f"[ERROR on page {page_num}: {str(e)}]"
async def summarize_image_with_gpt(image_blob_url: str, image_id: str, storage_provider: str = "azure", markdown_kv_enabled: bool = False, document_text: str = "") -> str:
    """Generate AI-powered summary of an image using GPT-4 Vision.

    This function analyzes images and creates text summaries that can be used for:
    - Document search and retrieval
    - RAG (Retrieval-Augmented Generation) systems
    - Table conversion to markdown format (when markdown_kv_enabled=True)

    Args:
        image_blob_url: URL of the image stored in cloud storage
        image_id: Unique identifier for the image
        storage_provider: Cloud storage provider ("azure" or "aws") — kept for
            interface compatibility; retrieval actually goes through hf_storage
        markdown_kv_enabled: If True, converts tables to markdown; otherwise creates simple summaries
        document_text: Optional document text for context

    Returns:
        str: AI-generated summary of the image content, or a bracketed
            fallback string when retrieval/summarization fails.
    """
    try:
        # Download image from self-hosted (HF) storage
        from hf_storage import retrieve_image
        # Infer the storage folder from the blob URL (defaults to pdf_images)
        folder_prefix = "pdf_images"
        if "pdf_images" in image_blob_url:
            folder_prefix = "pdf_images"
        elif "pptx_images" in image_blob_url:
            folder_prefix = "pptx_images"
        elif "docx_images" in image_blob_url:
            folder_prefix = "docx_images"
        elif "web_images" in image_blob_url:
            folder_prefix = "web_images"
        # Prefer the image id embedded in the URL (first ".jpg" path segment)
        image_id_from_url = image_id
        if "/" in image_blob_url:
            parts = image_blob_url.split("/")
            for part in parts:
                if part.endswith(".jpg"):
                    image_id_from_url = part.replace(".jpg", "")
                    break
        logger.info(f"Downloading image from HF persistent storage: {image_id_from_url}")
        image_result = await retrieve_image(image_id_from_url, folder_prefix)
        if image_result and "base64_data" in image_result:
            image_data = base64.b64decode(image_result["base64_data"])
            logger.info(f"Successfully downloaded {len(image_data)} bytes from HF persistent storage")
        else:
            logger.warning(f"Image not found in HF persistent storage: {image_id_from_url}")
            return f"[Image Summary for {image_id}: Visual content extracted from document]"
        # Re-encode for the OpenAI vision API (data URL payload)
        base64_img = base64.b64encode(image_data).decode("utf-8")
        # If markdown_kv_enabled is False, use the standard summary approach
        if not markdown_kv_enabled:
            response = await client.chat.completions.create(
                model="gpt-4.1-mini",
                messages=[
                    {"role": "system", "content": IMAGE_SUMMARY_PROMPT},
                    {"role": "user", "content": [
                        {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{base64_img}"}}
                    ]}
                ],
                temperature=0.0,
            )
            # Check if response is valid and has choices
            if not response or not response.choices or len(response.choices) == 0:
                logger.error(f"No valid response for image {image_id}")
                return f"[Image Summary for {image_id}: Visual content extracted from document - No response from GPT]"
            summary = response.choices[0].message.content.strip()
            logger.info(f"Generated detailed summary for image {image_id}")
            return summary
        # If markdown_kv_enabled is True, check if image contains a table
        else:
            logger.info(f"Markdown-KV mode enabled. Detecting if image {image_id} contains a table...")
            # Step 1: Detect if the image contains a table
            detection_response = await client.chat.completions.create(
                model="gpt-4.1-mini",
                messages=[
                    {"role": "system", "content": TABLE_DETECTION_PROMPT},
                    {"role": "user", "content": [
                        {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{base64_img}"}}
                    ]}
                ],
                temperature=0.0,
            )
            if not detection_response or not detection_response.choices or len(detection_response.choices) == 0:
                logger.error(f"No valid detection response for image {image_id}")
                return f"[Image Summary for {image_id}: Visual content extracted from document - No response from GPT]"
            detection_result = detection_response.choices[0].message.content.strip().upper()
            logger.info(f"Image {image_id} detection result: {detection_result}")
            # Step 2: Process based on detection result
            if "TABLE" in detection_result:
                # Convert table to markdown format
                logger.info(f"Converting table image {image_id} to markdown format...")
                markdown_response = await client.chat.completions.create(
                    model="gpt-4.1-mini",
                    messages=[
                        {"role": "system", "content": TABLE_TO_MARKDOWN_PROMPT},
                        {"role": "user", "content": [
                            {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{base64_img}"}}
                        ]}
                    ],
                    temperature=0.0,
                )
                if not markdown_response or not markdown_response.choices or len(markdown_response.choices) == 0:
                    logger.error(f"No valid markdown conversion response for image {image_id}")
                    return f"[Image Summary for {image_id}: Table detected but conversion failed]"
                markdown_table = markdown_response.choices[0].message.content.strip()
                logger.info(f"Successfully converted table image {image_id} to markdown format")
                return f"{markdown_table}"
            else:
                # Generate simple summary for normal images
                logger.info(f"Generating simple summary for normal image {image_id}...")
                summary_response = await client.chat.completions.create(
                    model="gpt-4.1-mini",
                    messages=[
                        {"role": "system", "content": IMAGE_SUMMARY_PROMPT},
                        {"role": "user", "content": [
                            {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{base64_img}"}}
                        ]}
                    ],
                    temperature=0.0,
                )
                if not summary_response or not summary_response.choices or len(summary_response.choices) == 0:
                    logger.error(f"No valid summary response for image {image_id}")
                    return f"[Image Summary for {image_id}: Visual content extracted from document - No response from GPT]"
                simple_summary = summary_response.choices[0].message.content.strip()
                # Enhance with document visual elements if available
                if document_text:
                    try:
                        from shared_utilities import merge_visual_elements_with_ai_summary
                        enhanced_summary, figure_metadata = await merge_visual_elements_with_ai_summary(image_id, simple_summary, document_text)
                        logger.info(f"Generated enhanced summary for image {image_id} using document text + AI")
                        # Store figure metadata if available (will be handled by caller)
                        # For now, just return the enhanced summary
                        return enhanced_summary
                    except Exception as e:
                        # Enhancement is best-effort; fall back to the plain AI summary
                        logger.warning(f"Failed to enhance summary for {image_id}: {e}")
                        logger.info(f"Falling back to AI summary only for {image_id}")
                logger.info(f"Generated simple summary for image {image_id}")
                return simple_summary
    except Exception as e:
        logger.error(f"Error summarizing image {image_id}: {e}")
        # Return a simple fallback summary instead of an error message
        return f"Visual content extracted from document - contains images, diagrams, or other visual elements"
async def extract_images_from_pdf(pdf_path: str, bbox_dict=None, progress_callback=None, storage_provider="azure") -> dict:
    """Extract ONLY images from a PDF and upload them to HF persistent storage.

    Filters out unsupported formats, tiny/empty images, and duplicates
    (by MD5 of the raw bytes, scoped to this call), then resizes and
    re-encodes each kept image as JPEG before uploading.

    Args:
        pdf_path: Path to PDF file
        bbox_dict: Optional dict mapping image_id to bounding box (x0, y0, x1, y1)
            Example: {"page1_image1": (10, 10, 200, 200)}
        progress_callback: Optional async callback for progress updates
        storage_provider: Kept for interface compatibility (storage is self-hosted)

    Returns:
        dict with image_blob_urls, image_ids_by_page, image_bboxes,
        image_info, and total_pages.

    Raises:
        Exception: If the PDF cannot be processed at all.
    """
    try:
        import hashlib  # hoisted out of the per-image loop
        from PIL import Image
        doc = fitz.open(pdf_path)
        extracted_data = {
            "image_blob_urls": {},
            "image_ids_by_page": {},
            # BUG FIX: these two keys were written below but never initialized,
            # so every bbox write raised KeyError (silently swallowed by the
            # debug-level except) and no bbox/info was ever recorded.
            "image_bboxes": {},
            "image_info": {},
            "total_pages": len(doc)
        }
        # Self-hosted: HF persistent storage (no client init needed)
        # Allowed image formats
        ALLOWED_FORMATS = ['jpeg', 'jpg', 'png']
        total_images_found = 0
        total_images_processed = 0
        # BUG FIX: duplicate tracking is now per-call. The previous function
        # attribute persisted across calls, so identical images in *different*
        # PDFs were wrongly skipped and the set grew without bound.
        seen_hashes = set()
        for page_num in range(len(doc)):
            page = doc.load_page(page_num)
            image_list = page.get_images(full=True)
            if not image_list:
                if progress_callback:
                    await progress_callback(f"📄 Page {page_num+1}: No images found")
                else:
                    logger.info(f"Page {page_num+1}: No images found")
                continue
            page_images_found = len(image_list)
            total_images_found += page_images_found
            if progress_callback:
                await progress_callback(f"📸 Page {page_num+1}: Found {page_images_found} images")
            else:
                print(f"Page {page_num+1}: Found {page_images_found} images")
            page_image_ids = []
            for img_index, img_info in enumerate(image_list):
                xref = img_info[0]
                base_image = doc.extract_image(xref)
                image_bytes = base_image["image"]
                image_ext = base_image["ext"].lower()
                # Check image format compatibility
                if image_ext not in ALLOWED_FORMATS:
                    if progress_callback:
                        await progress_callback(f"⏭️ Skipping image {img_index+1} on page {page_num+1} (format: {image_ext})")
                    else:
                        logger.info(f"Skipping image on page {page_num+1} with format: {image_ext} (only jpg, jpeg, png allowed)")
                    continue
                # Convert image to RGB if necessary
                image = Image.open(io.BytesIO(image_bytes))
                if image.mode != 'RGB':
                    image = image.convert('RGB')
                # Filter out small/empty images (less than 50x50 pixels or very small file size)
                if image.width < 50 or image.height < 50 or len(image_bytes) < 5000:
                    if progress_callback:
                        await progress_callback(f"⏭️ Skipping small/empty image {img_index+1} on page {page_num+1} ({image.width}x{image.height}, {len(image_bytes)} bytes)")
                    else:
                        print(f"Skipping small/empty image on page {page_num+1}: {image.width}x{image.height}, {len(image_bytes)} bytes")
                    continue
                # Check for duplicate images by comparing image hash
                image_hash = hashlib.md5(image_bytes).hexdigest()
                if image_hash in seen_hashes:
                    if progress_callback:
                        await progress_callback(f"⏭️ Skipping duplicate image {img_index+1} on page {page_num+1}")
                    else:
                        print(f"Skipping duplicate image on page {page_num+1}")
                    continue
                seen_hashes.add(image_hash)
                # Generate unique image ID
                image_id = f"page{page_num+1}_image{img_index+1}"
                page_image_ids.append(image_id)
                total_images_processed += 1
                # Get image bounding box on the page (for caption linking);
                # the page is already loaded above — no need to reload it.
                try:
                    image_rects = page.get_image_rects(xref)
                    if image_rects:
                        rect = image_rects[0]  # usually the only placement
                        image_rect = (rect.x0, rect.y0, rect.x1, rect.y1)
                        extracted_data["image_bboxes"][image_id] = image_rect
                        # Store detailed image info for caption linking
                        extracted_data["image_info"][image_id] = {
                            'bbox': image_rect,
                            'page': page_num + 1,
                            'index': img_index + 1,
                            'width': image.width,
                            'height': image.height
                        }
                except Exception as e:
                    logger.debug(f"Could not get bounding box for image {image_id}: {e}")
                if progress_callback:
                    await progress_callback(f"🖼️ Processing image {total_images_processed}: {image_id} ({image.width}x{image.height})")
                else:
                    print(f"Processing image {total_images_processed}: {image_id} ({image.width}x{image.height})")
                # Apply bounding box cropping if provided
                bbox = bbox_dict.get(image_id) if bbox_dict else None
                if bbox:
                    image = await crop_image_with_bounding_box(image, bbox)
                # Resize image to reasonable size for UI display (max 800px width/height)
                max_size = 800
                if image.width > max_size or image.height > max_size:
                    image.thumbnail((max_size, max_size), Image.Resampling.LANCZOS)
                # Convert to JPEG bytes for blob storage with quality optimization
                img_buffer = io.BytesIO()
                image.save(img_buffer, format='JPEG', quality=85, optimize=True)
                jpeg_bytes = img_buffer.getvalue()
                # TESTING: Save image locally for inspection
                try:
                    pdf_name = os.path.splitext(os.path.basename(pdf_path))[0]
                    local_images_dir = os.path.join(os.path.dirname(pdf_path), f"{pdf_name}_extracted_images")
                    os.makedirs(local_images_dir, exist_ok=True)
                    local_image_path = os.path.join(local_images_dir, f"{image_id}.jpg")
                    with open(local_image_path, 'wb') as f:
                        f.write(jpeg_bytes)
                    logger.info(f"TESTING: Saved image locally: {local_image_path}")
                except Exception as local_e:
                    logger.warning(f"Failed to save image locally: {local_e}")
                # Store image in self-hosted (HF) storage
                try:
                    from hf_storage import store_image
                    success = await store_image(image_id, jpeg_bytes, folder_prefix="pdf_images")
                    if success:
                        blob_url = f"hf://pdf_images/{image_id}.jpg"
                        extracted_data["image_blob_urls"][image_id] = blob_url
                        logger.info(f"Uploaded image to HF persistent storage: pdf_images/{image_id}.jpg")
                    else:
                        logger.error(f"Failed to upload image {image_id} to HF persistent storage")
                except Exception as e:
                    logger.error(f"Failed to upload image {image_id} to HF storage: {e}")
                    continue
            # Store image IDs for this page
            if page_image_ids:
                extracted_data["image_ids_by_page"][page_num+1] = page_image_ids
        # Final summary
        if progress_callback:
            await progress_callback(f"✅ Image extraction complete: {total_images_processed}/{total_images_found} images processed")
        doc.close()
        return extracted_data
    except Exception as e:
        logger.error(f"Error extracting images from PDF: {e}")
        raise Exception(f"Error extracting images from PDF: {str(e)}")
async def process_pdf(file_path: str, progress_callback=None) -> str:
    """Process PDF pages with native text extraction priority and OCR fallback.

    Three-pass strategy:
      1. Try native (embedded) text extraction on every page.
      2. Pages with little/no native text are rendered to images and sent to
         LLM-based OCR.
      3. Native-text pages are re-checked for corrupted extraction output and,
         if corrupted, re-processed with OCR as well.

    Args:
        file_path: Path to the PDF file on disk.
        progress_callback: Optional async callable receiving human-readable
            progress strings.

    Returns:
        str: Concatenated per-page text, each page prefixed with
        "--- PAGE n ---" (or an "[ERROR on page n: ...]" marker).

    Raises:
        Exception: If the PDF is invalid, empty, or processing fails entirely.
    """
    try:
        if not await validate_pdf_file(file_path):
            raise Exception("The uploaded PDF file is corrupted or cannot be opened. Please upload a valid PDF file.")
        doc = fitz.open(file_path)
        page_count = len(doc)
        if page_count == 0:
            doc.close()
            raise Exception("The PDF file contains no pages.")
        logger.info(f"Processing PDF with {page_count} pages")
        # First, try to extract native text from all pages
        native_text_results = []  # one slot per page: text, None (needs OCR), or error marker
        pages_needing_ocr = []  # page indices (0-based) with insufficient native text
        all_extracted_text = []  # raw native text only; collected but not used downstream
        for i in range(page_count):
            try:
                if progress_callback:
                    await progress_callback(f"📄 Extracting native text from page {i+1} of {page_count}")
                print(f"Extracting native text from page {i+1} of {page_count}")
                native_text = await extract_native_text_from_pdf_page(doc, i)
                # Heuristic: more than 50 stripped characters counts as "real" native text
                if native_text and len(native_text.strip()) > 50:
                    native_text_results.append(f"--- PAGE {i+1} ---\n{native_text}\n")
                    all_extracted_text.append(native_text)
                else:
                    # Page needs OCR processing
                    print(f"Page {i+1} needs OCR processing")
                    pages_needing_ocr.append(i)
                    native_text_results.append(None)
            except Exception as e:
                # Extraction failure also routes the page to OCR
                logger.error(f"Error processing page {i+1}: {e}")
                pages_needing_ocr.append(i)
                native_text_results.append(None)
        # Process pages that need OCR
        if pages_needing_ocr:
            if progress_callback:
                await progress_callback(f"🔍 Using OCR for {len(pages_needing_ocr)} pages with insufficient native text")
            print(f"🔍 Using OCR for {len(pages_needing_ocr)} pages with insufficient native text")
            # Process OCR pages sequentially with progress updates
            ocr_results = []
            for idx, page_num in enumerate(pages_needing_ocr):
                try:
                    if progress_callback:
                        await progress_callback(f"🔄 OCR processing page {page_num+1} ({idx+1}/{len(pages_needing_ocr)})")
                    print(f"🔄 OCR processing page {page_num+1} ({idx+1}/{len(pages_needing_ocr)})")
                    base64_img = await render_page_to_base64(doc, page_num)
                    result = await extract_page_with_llm(base64_img, page_num + 1)
                    ocr_results.append(result)
                    if progress_callback:
                        await progress_callback(f"✅ Page {page_num+1} OCR completed ({idx+1}/{len(pages_needing_ocr)})")
                    print(f"✅ Page {page_num+1} OCR completed ({idx+1}/{len(pages_needing_ocr)})")
                except Exception as e:
                    logger.error(f"Error processing OCR for page {page_num+1}: {e}")
                    ocr_results.append(f"[ERROR on page {page_num+1}: {str(e)}]")
                    if progress_callback:
                        await progress_callback(f"❌ Page {page_num+1} OCR failed ({idx+1}/{len(pages_needing_ocr)})")
                    print(f"❌ Page {page_num+1} OCR failed ({idx+1}/{len(pages_needing_ocr)})")
            # Replace None results with OCR results.
            # pages_needing_ocr and ocr_results are parallel lists in page order,
            # so consuming ocr_results sequentially fills the None slots correctly.
            ocr_index = 0
            for i, result in enumerate(native_text_results):
                if result is None:
                    if ocr_index < len(ocr_results):
                        # NOTE(review): ocr_results only ever holds strings here
                        # (exceptions are converted to error strings above), so this
                        # isinstance guard is defensive and currently unreachable.
                        if isinstance(ocr_results[ocr_index], Exception):
                            native_text_results[i] = f"[ERROR on page {i+1}: {str(ocr_results[ocr_index])}]"
                        else:
                            native_text_results[i] = ocr_results[ocr_index]
                        ocr_index += 1
                    else:
                        native_text_results[i] = f"[ERROR on page {i+1}: OCR processing failed]"
        doc.close()
        # Post-process: Check for corrupted text and re-process with OCR
        # Only check pages that used native text extraction (not already OCR'd)
        if progress_callback:
            await progress_callback("🔍 Checking for corrupted text and applying OCR fixes...")
        print("Checking for corrupted text and applying OCR fixes...")
        final_processed_results = []
        pages_to_reprocess = []
        for i, result in enumerate(native_text_results):
            # Skip corruption check for pages that were already OCR'd
            if i in pages_needing_ocr:
                # This page already went through OCR, use the result as-is
                final_processed_results.append(result)
            elif result and not result.startswith("[ERROR"):
                # Extract the actual text content (remove page header)
                page_content = result.replace(f"--- PAGE {i+1} ---\n", "").strip()
                if detect_corrupted_text(page_content):
                    logger.info(f"Page {i+1} has corrupted text, will reprocess with OCR")
                    pages_to_reprocess.append(i)
                    final_processed_results.append(None)  # Placeholder
                else:
                    final_processed_results.append(result)
            else:
                final_processed_results.append(result)
        # Re-process corrupted pages with OCR
        if pages_to_reprocess:
            if progress_callback:
                await progress_callback(f"🔍 Re-processing {len(pages_to_reprocess)} corrupted pages with OCR...")
            print(f"Re-processing {len(pages_to_reprocess)} corrupted pages with OCR...")
            # Re-open document for OCR processing (it was closed after the first pass)
            doc = fitz.open(file_path)
            # Process corrupted pages sequentially with progress updates
            ocr_results = []
            for idx, page_num in enumerate(pages_to_reprocess):
                try:
                    if progress_callback:
                        await progress_callback(f"🔄 OCR reprocessing corrupted page {page_num+1} ({idx+1}/{len(pages_to_reprocess)})")
                    print(f"🔄 OCR reprocessing corrupted page {page_num+1} ({idx+1}/{len(pages_to_reprocess)})")
                    base64_img = await render_page_to_base64(doc, page_num)
                    result = await extract_page_with_llm(base64_img, page_num + 1)
                    ocr_results.append(result)
                    if progress_callback:
                        await progress_callback(f"✅ Corrupted page {page_num+1} OCR completed ({idx+1}/{len(pages_to_reprocess)})")
                    print(f"✅ Corrupted page {page_num+1} OCR completed ({idx+1}/{len(pages_to_reprocess)})")
                except Exception as e:
                    logger.error(f"Error reprocessing OCR for corrupted page {page_num+1}: {e}")
                    ocr_results.append(f"[ERROR on page {page_num+1}: {str(e)}]")
                    if progress_callback:
                        await progress_callback(f"❌ Corrupted page {page_num+1} OCR failed ({idx+1}/{len(pages_to_reprocess)})")
                    print(f"❌ Corrupted page {page_num+1} OCR failed ({idx+1}/{len(pages_to_reprocess)})")
            # Replace None results with OCR results
            ocr_index = 0
            for i, result in enumerate(final_processed_results):
                if result is None:
                    if ocr_index < len(ocr_results):
                        # NOTE(review): see above — ocr_results holds strings only.
                        if isinstance(ocr_results[ocr_index], Exception):
                            final_processed_results[i] = f"[ERROR on page {i+1}: {str(ocr_results[ocr_index])}]"
                        else:
                            final_processed_results[i] = ocr_results[ocr_index]
                        ocr_index += 1
                    else:
                        final_processed_results[i] = f"[ERROR on page {i+1}: OCR processing failed]"
            doc.close()
        # Filter out None results and join
        final_results = [result for result in final_processed_results if result is not None]
        if not final_results:
            return "No text content could be extracted from the PDF file."
        return "\n".join(final_results)
    except Exception as e:
        logger.error(f"Error processing PDF: {e}")
        raise Exception(f"Error processing PDF file: {str(e)}")
async def process_pdf_hybrid(file_path: str, progress_callback=None) -> str:
    """
    Process PDF using hybrid method: HTML + PDF images with Gemini.

    This method combines:
    - HTML conversion (table structure, colors, formatting)
    - PDF image conversion (visual information, graphs, layout)
    Both are passed to Gemini for comprehensive analysis.

    Falls back to standard `process_pdf` when Gemini is unavailable or the
    hybrid path fails outright, and falls back per-page to native text
    extraction when a single page fails.

    Args:
        file_path: Path to PDF file
        progress_callback: Optional callback for progress updates

    Returns:
        str: Processed text content combining HTML and image information

    Raises:
        Exception: If both hybrid and fallback standard processing fail.
    """
    try:
        if not await validate_pdf_file(file_path):
            raise Exception("The uploaded PDF file is corrupted or cannot be opened.")
        doc = fitz.open(file_path)
        page_count = len(doc)
        if page_count == 0:
            doc.close()
            raise Exception("The PDF file contains no pages.")
        logger.info(f"Processing PDF with hybrid method: {page_count} pages")
        print(f"Processing PDF with hybrid method: {page_count} pages")
        # Check if Gemini is available; without it the hybrid path cannot run.
        if not GEMINI_AVAILABLE:
            logger.warning("Gemini not available, falling back to standard PDF processing")
            doc.close()
            return await process_pdf(file_path, progress_callback)
        all_results = []
        for i in range(page_count):
            try:
                if progress_callback:
                    await progress_callback(f"🔄 Hybrid processing page {i+1} of {page_count} (HTML + Image)")
                # Step 1: Extract HTML from page
                html_content = await extract_html_from_pdf_page(doc, i)
                # Step 2: Render page as image
                page_image_base64 = await render_page_to_base64(doc, i)
                # Step 3: Process both with Gemini
                if html_content or page_image_base64:
                    processed_content = await process_hybrid_with_gemini(
                        html_content or "",
                        page_image_base64,
                        None,  # No user query during processing
                        i + 1  # Page number
                    )
                    all_results.append(f"--- PAGE {i+1} ---\n{processed_content}\n")
                else:
                    all_results.append(f"--- PAGE {i+1} ---\n[No content extracted]\n")
                if progress_callback:
                    await progress_callback(f"✅ Page {i+1} hybrid processing completed")
            except Exception as e:
                logger.error(f"Error processing page {i+1} with hybrid method: {e}")
                # Fallback to standard extraction for this page
                try:
                    native_text = await extract_native_text_from_pdf_page(doc, i)
                    if native_text and len(native_text.strip()) > 50:
                        all_results.append(f"--- PAGE {i+1} ---\n{native_text}\n")
                    else:
                        all_results.append(f"--- PAGE {i+1} ---\n[Error: {str(e)}]\n")
                except Exception as fallback_error:
                    # BUGFIX: was a bare `except:` that also swallowed
                    # KeyboardInterrupt/SystemExit and hid the fallback failure.
                    logger.error(f"Native text fallback failed for page {i+1}: {fallback_error}")
                    all_results.append(f"--- PAGE {i+1} ---\n[Error: {str(e)}]\n")
        doc.close()
        if not all_results:
            return "No content could be extracted from the PDF file."
        return "\n".join(all_results)
    except Exception as e:
        logger.error(f"Error processing PDF with hybrid method: {e}")
        # Fallback to standard processing
        try:
            logger.info("Falling back to standard PDF processing")
            return await process_pdf(file_path, progress_callback)
        except Exception as fallback_error:
            # BUGFIX: was a bare `except:`; narrowed and logged so the
            # secondary failure is not silently discarded.
            logger.error(f"Standard PDF fallback also failed: {fallback_error}")
            raise Exception(f"Error processing PDF file: {str(e)}")
async def capture_page_images_as_fallback(pdf_path: str, storage_provider: str = "azure", progress_callback=None) -> dict:
    """Capture PDF pages as images when no extractable images are found.

    This fallback mechanism handles PDFs that contain visual content but don't have
    extractable image objects. It renders each page as a high-quality image and stores
    them for AI analysis and retrieval.

    Common use cases:
    - Scanned documents where entire pages are images
    - PDFs with vector graphics that aren't stored as raster images
    - Documents with embedded content in non-standard formats

    Args:
        pdf_path: Path to the PDF file
        storage_provider: Cloud storage provider ("azure" or "aws").
            NOTE(review): not referenced in the body — storage is hard-wired to
            HF persistent storage here; parameter kept for interface compatibility.
        progress_callback: Optional callback function for progress updates

    Returns:
        dict: Contains image_blob_urls, image_ids_by_page, and total_pages
    """
    try:
        from PIL import Image
        doc = fitz.open(pdf_path)
        extracted_data = {
            "image_blob_urls": {},      # image_id -> hf:// URL (only successful uploads)
            "image_ids_by_page": {},    # 1-based page number -> [image_id]
            "total_pages": len(doc)
        }
        # Self-hosted: HF persistent storage (no client init needed)
        total_images_processed = 0
        for page_num in range(len(doc)):
            try:
                page = doc.load_page(page_num)
                # Render page as image (150 dpi balances quality vs. size)
                pix = page.get_pixmap(dpi=150)
                img_bytes = pix.tobytes("png")
                # Convert to PIL Image
                image = Image.open(io.BytesIO(img_bytes))
                if image.mode != 'RGB':
                    image = image.convert('RGB')
                # Generate unique image ID (one image per page in this fallback)
                image_id = f"page{page_num+1}_image1"
                extracted_data["image_ids_by_page"][page_num+1] = [image_id]
                total_images_processed += 1
                if progress_callback:
                    await progress_callback(f"🖼️ Capturing page {page_num+1} as image: {image_id} ({image.width}x{image.height})")
                else:
                    print(f"Capturing page {page_num+1} as image: {image_id} ({image.width}x{image.height})")
                # Resize image to reasonable size for UI display (max 800px width/height)
                max_size = 800
                if image.width > max_size or image.height > max_size:
                    image.thumbnail((max_size, max_size), Image.Resampling.LANCZOS)
                # Convert to JPEG bytes for blob storage with quality optimization
                img_buffer = io.BytesIO()
                image.save(img_buffer, format='JPEG', quality=85, optimize=True)
                img_buffer.seek(0)
                jpeg_bytes = img_buffer.getvalue()
                # Upload to self-hosted (HF) storage; failures are logged but do
                # not abort the remaining pages (best-effort semantics).
                try:
                    from hf_storage import store_image
                    success = await store_image(image_id, jpeg_bytes, folder_prefix="pdf_images")
                    if success:
                        blob_url = f"hf://pdf_images/{image_id}.jpg"
                        extracted_data["image_blob_urls"][image_id] = blob_url
                        logger.info(f"Uploaded page image to HF persistent storage: {image_id}")
                    else:
                        logger.error(f"Failed to upload page image {image_id} to HF storage")
                except Exception as e:
                    logger.error(f"Failed to upload page image {image_id} to HF storage: {e}")
                    continue
            except Exception as e:
                # Skip the failing page; the rest of the document is still captured.
                logger.error(f"Error capturing page {page_num+1} as image: {e}")
                continue
        # Final summary
        if progress_callback:
            await progress_callback(f"✅ Page image capture complete: {total_images_processed} pages captured as images")
        doc.close()
        return extracted_data
    except Exception as e:
        logger.error(f"Error capturing page images as fallback: {e}")
        raise Exception(f"Error capturing page images as fallback: {str(e)}")
async def extract_images_from_docx(docx_path: str, bbox_dict=None, progress_callback=None, storage_provider="azure") -> dict:
    """Extract ONLY images from DOCX with blob storage integration (NO LOCAL STORAGE).

    Args:
        docx_path: Path to DOCX file
        bbox_dict: Optional dict mapping image_id to bounding box (x0, y0, x1, y1)
                   Example: {"docx_image1": (10, 10, 200, 200)}
        progress_callback: Optional callback for progress updates
        storage_provider: Kept for interface compatibility; uploads go to HF
            persistent storage in this deployment.

    Returns:
        dict with image_blob_urls and image_ids
    """
    try:
        from docx import Document
        from PIL import Image
        import hashlib
        doc = Document(docx_path)
        extracted_data = {
            "image_blob_urls": {},  # image_id -> hf:// URL (successful uploads only)
            "image_ids": [],
            "total_paragraphs": len(doc.paragraphs)
        }
        # Self-hosted: HF persistent storage (no client init needed)
        # Allowed image formats as reported by PIL ('jpeg' covers .jpg files).
        # BUGFIX: format detection now uses PIL instead of imghdr, which was
        # deprecated in Python 3.11 and removed in 3.13.
        ALLOWED_FORMATS = ('jpeg', 'png')
        # BUGFIX: duplicate detection is now scoped to this call. It was
        # previously cached on the function object (`_seen_hashes`), so an image
        # already seen in one document was silently skipped in every later
        # document processed by the same worker.
        seen_hashes = set()
        # Extract images from document relationships
        image_counter = 0
        total_images_found = 0
        total_images_processed = 0
        # Count total images first (for progress reporting)
        for rel in doc.part.rels:
            if "image" in doc.part.rels[rel].target_ref:
                total_images_found += 1
        if progress_callback:
            await progress_callback(f"📸 DOCX: Found {total_images_found} images")
        else:
            print(f"DOCX: Found {total_images_found} images")
        for rel in doc.part.rels:
            if "image" in doc.part.rels[rel].target_ref:
                try:
                    image_part = doc.part.rels[rel].target_part
                    image_bytes = image_part.blob
                    # Decode with PIL and read the container format from it
                    try:
                        pil_image = Image.open(io.BytesIO(image_bytes))
                        image_format = (pil_image.format or "").lower()
                    except Exception:
                        pil_image = None
                        image_format = None
                    # Check image format compatibility
                    if pil_image is None or image_format not in ALLOWED_FORMATS:
                        if progress_callback:
                            await progress_callback(f"⏭️ Skipping DOCX image (format: {image_format})")
                        else:
                            logger.info(f"Skipping DOCX image with format: {image_format} (only jpg, jpeg, png allowed)")
                        continue
                    # Convert image to RGB if necessary
                    if pil_image.mode != 'RGB':
                        pil_image = pil_image.convert('RGB')
                    # Filter out small/empty images (less than 50x50 pixels or very small file size)
                    if pil_image.width < 50 or pil_image.height < 50 or len(image_bytes) < 5000:
                        if progress_callback:
                            await progress_callback(f"⏭️ Skipping small/empty DOCX image ({pil_image.width}x{pil_image.height}, {len(image_bytes)} bytes)")
                        else:
                            print(f"Skipping small/empty DOCX image: {pil_image.width}x{pil_image.height}, {len(image_bytes)} bytes")
                        continue
                    # Check for duplicate images (within this document) by content hash
                    image_hash = hashlib.md5(image_bytes).hexdigest()
                    if image_hash in seen_hashes:
                        if progress_callback:
                            await progress_callback(f"⏭️ Skipping duplicate DOCX image")
                        else:
                            print(f"Skipping duplicate DOCX image")
                        continue
                    seen_hashes.add(image_hash)
                    # Generate unique image ID
                    image_id = f"docx_image{image_counter+1}"
                    extracted_data["image_ids"].append(image_id)
                    image_counter += 1
                    total_images_processed += 1
                    if progress_callback:
                        await progress_callback(f"🖼️ Processing DOCX image {total_images_processed}: {image_id} ({pil_image.width}x{pil_image.height})")
                    else:
                        print(f"Processing DOCX image {total_images_processed}: {image_id} ({pil_image.width}x{pil_image.height})")
                    # Apply bounding box cropping if provided
                    bbox = bbox_dict.get(image_id) if bbox_dict else None
                    if bbox:
                        pil_image = await crop_image_with_bounding_box(pil_image, bbox)
                    # Resize image to reasonable size for UI display (max 800px width/height)
                    max_size = 800
                    if pil_image.width > max_size or pil_image.height > max_size:
                        pil_image.thumbnail((max_size, max_size), Image.Resampling.LANCZOS)
                    # Convert to JPEG bytes for blob storage with quality optimization
                    img_buffer = io.BytesIO()
                    pil_image.save(img_buffer, format='JPEG', quality=85, optimize=True)
                    img_buffer.seek(0)
                    jpeg_bytes = img_buffer.getvalue()
                    # Store image in self-hosted (HF) storage
                    try:
                        from hf_storage import store_image
                        success = await store_image(image_id, jpeg_bytes, folder_prefix="docx_images")
                        if success:
                            blob_url = f"hf://docx_images/{image_id}.jpg"
                            extracted_data["image_blob_urls"][image_id] = blob_url
                            logger.info(f"Uploaded image to HF persistent storage: docx_images/{image_id}.jpg")
                        else:
                            logger.error(f"Failed to upload image {image_id} to HF persistent storage")
                    except Exception as e:
                        logger.error(f"Failed to upload image {image_id} to HF storage: {e}")
                        continue
                except Exception as e:
                    logger.error(f"Error processing image {rel}: {e}")
                    continue
        # Final summary
        if progress_callback:
            await progress_callback(f"✅ DOCX image extraction complete: {total_images_processed}/{total_images_found} images processed")
        if not extracted_data["image_ids"]:
            if progress_callback:
                await progress_callback("📄 No images found in DOCX document")
            else:
                logger.info("No images found in document")
        return extracted_data
    except Exception as e:
        logger.error(f"Error extracting images from DOCX: {e}")
        raise Exception(f"Error extracting images from DOCX: {str(e)}")
async def process_docx(file_path: str, progress_callback=None) -> str:
    """Extract plain text from a DOCX file (paragraphs, then table rows).

    Table rows are rendered as cell texts joined with " | ". Returns a
    placeholder message when the document holds no text at all.
    """
    try:
        if progress_callback:
            await progress_callback("📄 Processing DOCX file...")
        else:
            print("Processing DOCX file...")
        from docx import Document
        document = Document(file_path)
        # Paragraph text first, skipping blank paragraphs
        chunks = [para.text.strip() for para in document.paragraphs if para.text.strip()]
        # Then every table, one " | "-joined line per non-empty row
        for tbl in document.tables:
            for tbl_row in tbl.rows:
                cell_texts = [cell.text.strip() for cell in tbl_row.cells if cell.text.strip()]
                if cell_texts:
                    chunks.append(" | ".join(cell_texts))
        combined = "\n".join(chunks)
        if not combined.strip():
            return "No text content found in the DOCX document."
        return combined
    except Exception as e:
        logger.error(f"Error processing DOCX file: {e}")
        raise Exception(f"Error processing DOCX file: {str(e)}")
async def process_txt(file_path: str) -> str:
    """Read a UTF-8 text file and return its full contents as a string."""
    with open(file_path, encoding="utf-8") as handle:
        return handle.read()
async def extract_images_from_pptx(pptx_path: str, bbox_dict=None, progress_callback=None, storage_provider="azure") -> dict:
    """Extract ONLY images from PPTX with blob storage integration (NO LOCAL STORAGE).

    Args:
        pptx_path: Path to PPTX file
        bbox_dict: Optional dict mapping image_id to bounding box (x0, y0, x1, y1)
                   Example: {"slide1_image1": (10, 10, 200, 200)}
        progress_callback: Optional callback for progress updates
        storage_provider: Kept for interface compatibility; uploads go to HF
            persistent storage in this deployment.

    Returns:
        dict with image_blob_urls and image_ids_by_slide
    """
    try:
        from pptx import Presentation
        from PIL import Image
        import hashlib
        prs = Presentation(pptx_path)
        extracted_data = {
            "image_blob_urls": {},     # image_id -> hf:// URL (successful uploads only)
            "image_ids_by_slide": {},  # 1-based slide number -> [image_id, ...]
            "total_slides": len(prs.slides)
        }
        # Self-hosted: HF persistent storage (no client init needed)
        # Only these extensions are extracted (shared by the count and process loops)
        ALLOWED_EXTS = ('jpg', 'jpeg', 'png')
        # BUGFIX: duplicate detection is now scoped to this call. It was
        # previously cached on the function object (`_seen_hashes`), so an image
        # already seen in one presentation was silently skipped in every later
        # presentation processed by the same worker.
        seen_hashes = set()
        total_images_found = 0
        total_images_processed = 0
        # Count total images first (for progress reporting)
        for slide_num, slide in enumerate(prs.slides):
            for shape in slide.shapes:
                if hasattr(shape, "image"):
                    if shape.image.ext.lower() in ALLOWED_EXTS:
                        total_images_found += 1
        if progress_callback:
            await progress_callback(f"📸 PPTX: Found {total_images_found} images across {len(prs.slides)} slides")
        else:
            print(f"PPTX: Found {total_images_found} images across {len(prs.slides)} slides")
        for slide_num, slide in enumerate(prs.slides):
            # Extract images from slide
            slide_image_ids = []
            image_counter = 0
            for shape in slide.shapes:
                if hasattr(shape, "image"):
                    try:
                        image = shape.image
                        # Only extract jpg, jpeg, and png images
                        if image.ext.lower() in ALLOWED_EXTS:
                            image_bytes = image.blob
                            # Generate unique image ID
                            image_id = f"slide{slide_num+1}_image{image_counter+1}"
                            slide_image_ids.append(image_id)
                            image_counter += 1
                            total_images_processed += 1
                            if progress_callback:
                                await progress_callback(f"🖼️ Processing PPTX image {total_images_processed}: {image_id}")
                            else:
                                print(f"Processing PPTX image {total_images_processed}: {image_id}")
                            # Convert image to RGB if necessary
                            pil_image = Image.open(io.BytesIO(image_bytes))
                            if pil_image.mode != 'RGB':
                                pil_image = pil_image.convert('RGB')
                            # Filter out small/empty images (less than 50x50 pixels or very small file size)
                            if pil_image.width < 50 or pil_image.height < 50 or len(image_bytes) < 5000:
                                if progress_callback:
                                    await progress_callback(f"⏭️ Skipping small/empty PPTX image on slide {slide_num+1} ({pil_image.width}x{pil_image.height}, {len(image_bytes)} bytes)")
                                else:
                                    print(f"Skipping small/empty PPTX image on slide {slide_num+1}: {pil_image.width}x{pil_image.height}, {len(image_bytes)} bytes")
                                continue
                            # Check for duplicate images (within this presentation) by content hash
                            image_hash = hashlib.md5(image_bytes).hexdigest()
                            if image_hash in seen_hashes:
                                if progress_callback:
                                    await progress_callback(f"⏭️ Skipping duplicate PPTX image on slide {slide_num+1}")
                                else:
                                    print(f"Skipping duplicate PPTX image on slide {slide_num+1}")
                                continue
                            seen_hashes.add(image_hash)
                            # Apply bounding box cropping if provided
                            bbox = bbox_dict.get(image_id) if bbox_dict else None
                            if bbox:
                                pil_image = await crop_image_with_bounding_box(pil_image, bbox)
                            # Resize image to reasonable size for UI display (max 800px width/height)
                            max_size = 800
                            if pil_image.width > max_size or pil_image.height > max_size:
                                pil_image.thumbnail((max_size, max_size), Image.Resampling.LANCZOS)
                            # Convert to JPEG bytes for blob storage with quality optimization
                            img_buffer = io.BytesIO()
                            pil_image.save(img_buffer, format='JPEG', quality=85, optimize=True)
                            img_buffer.seek(0)
                            jpeg_bytes = img_buffer.getvalue()
                            # Store image in cloud storage
                            try:
                                from hf_storage import store_image
                                success = await store_image(image_id, jpeg_bytes, folder_prefix="pptx_images")
                                if success:
                                    blob_url = f"hf://pptx_images/{image_id}.jpg"
                                    extracted_data["image_blob_urls"][image_id] = blob_url
                                    logger.info(f"Uploaded image to HF persistent storage: pptx_images/{image_id}.jpg")
                                else:
                                    logger.error(f"Failed to upload image {image_id} to HF persistent storage")
                            except Exception as e:
                                logger.error(f"Failed to upload image {image_id} to HF storage: {e}")
                                continue
                    except Exception as e:
                        logger.error(f"Error processing image on slide {slide_num+1}: {e}")
                        continue
            # Store image IDs for this slide
            if slide_image_ids:
                extracted_data["image_ids_by_slide"][slide_num+1] = slide_image_ids
            else:
                if progress_callback:
                    await progress_callback(f"📄 Slide {slide_num+1}: No images found")
                else:
                    logger.info(f"Slide {slide_num+1}: No images found")
        # Final summary
        if progress_callback:
            await progress_callback(f"✅ PPTX image extraction complete: {total_images_processed}/{total_images_found} images processed")
        return extracted_data
    except Exception as e:
        logger.error(f"Error extracting images from PPTX: {e}")
        raise Exception(f"Error extracting images from PPTX: {str(e)}")
async def process_pptx(file_path: str, progress_callback=None) -> str:
    """Extract all text from a PPTX deck: shape text, table rows, speaker notes.

    Each slide becomes a "=== SLIDE n ===" section; slides with nothing to show
    get a "[No text content]" placeholder. Sections are joined with blank lines.
    """
    try:
        from pptx import Presentation
        # Load the presentation
        deck = Presentation(file_path)
        total = len(deck.slides)
        logger.info(f"Processing PPTX with {total} slides")
        sections = []
        for idx, current_slide in enumerate(deck.slides):
            if progress_callback:
                await progress_callback(f"📊 Processing slide {idx+1} of {total}")
            else:
                print(f"Processing slide {idx+1} of {total}")
            fragments = []
            for shp in current_slide.shapes:
                # Plain shape text
                if hasattr(shp, "text") and shp.text.strip():
                    fragments.append(shp.text.strip())
                # Table content, one " | "-joined line per non-empty row
                if shp.has_table:
                    for tbl_row in shp.table.rows:
                        joined = " | ".join(cell.text for cell in tbl_row.cells)
                        if joined.strip():
                            fragments.append(joined)
            # Speaker notes, when present
            if current_slide.has_notes_slide:
                notes_frame = current_slide.notes_slide.notes_text_frame
                if notes_frame:
                    fragments.append("--- Notes ---")
                    fragments.extend(
                        para.text.strip() for para in notes_frame.paragraphs if para.text.strip()
                    )
            body = "\n".join(fragments) if fragments else "[No text content]"
            sections.append(f"=== SLIDE {idx + 1} ===\n{body}")
        return "\n\n".join(sections)
    except Exception as e:
        logger.error(f"Error processing PPTX file: {e}")
        raise Exception(f"Error processing PPTX file: {str(e)}")
def convert_dataframe_to_markdown_kv(data: pd.DataFrame, sheet_name: str) -> str:
    """Convert pandas DataFrame to markdown key-value format.

    Args:
        data: pandas DataFrame. The frame is NOT modified.
        sheet_name: Name of the sheet

    Returns:
        str: Markdown formatted text with key-value pairs for each record
    """
    if data.empty:
        return f"# {sheet_name}\n\n*No data found in this sheet.*\n"
    # BUGFIX: column labels were previously stripped in place
    # (`data.columns = data.columns.str.strip()`), mutating the caller's
    # DataFrame. Build a local list of cleaned labels instead; str() also
    # makes this work for non-string column labels (e.g. ints).
    columns = [str(col).strip() for col in data.columns]
    # Build markdown output
    output = [f"# {sheet_name}\n"]
    # Convert each row to a record
    for record_num, (_, row) in enumerate(data.iterrows(), start=1):
        output.append(f"## Record {record_num}\n")
        output.append("```\n")
        # Add each column as key-value pair (zip pairs cleaned labels with
        # the row's values positionally, in column order)
        for col_label, raw_value in zip(columns, row):
            # Handle NaN, None, and empty values
            if pd.isna(raw_value) or raw_value == "":
                value = ""
            else:
                value = str(raw_value).strip()
            output.append(f"{col_label}: {value}\n")
        output.append("```\n")
    return "".join(output)
async def process_excel(file_path: str, markdown_kv: bool = False, storage_provider: str = "azure") -> tuple:
    """Process Excel file and extract all images and data from all sheets.

    Two passes over the workbook: first the cell data of every sheet (via
    pandas), then any embedded images (via openpyxl), which are summarized
    with an LLM. Sheet-level and image-level failures are recorded inline as
    "[ERROR ...]" entries rather than aborting the whole file.

    Args:
        file_path: Path to Excel file
        markdown_kv: If True, converts cell data to markdown key-value format
            and uploads images to HF storage before summarizing them
        storage_provider: Cloud storage provider ("azure", "aws", or
            "selfhosted"); forwarded to summarize_image_with_gpt

    Returns:
        tuple: (extracted_text: str, image_summaries: dict)

    Raises:
        Exception: If the workbook cannot be opened or processing fails entirely.
    """
    try:
        extracted_text = []
        image_summaries = {}  # image_id -> LLM summary (markdown_kv path only)
        pxl_doc = openpyxl.load_workbook(file_path)
        # Collect all text data
        all_text_data = []  # NOTE(review): mirrors extracted_text but is never read afterwards
        for sheet_name in pxl_doc.sheetnames:
            print(f"Processing sheet: {sheet_name}")
            sheet = pxl_doc[sheet_name]
            try:
                # Cell data is read via pandas (re-opens the file per sheet)
                data = pd.read_excel(file_path, sheet_name=sheet_name)
                if not data.empty:
                    if markdown_kv:
                        # Convert DataFrame to markdown key-value format
                        sheet_data = convert_dataframe_to_markdown_kv(data, sheet_name)
                        extracted_text.append(sheet_data)
                        all_text_data.append(sheet_data)
                        print(f"Extracted data from sheet: {sheet_name} (markdown-kv format)")
                    else:
                        # Use original format
                        sheet_data = f"=== SHEET: {sheet_name} ===\n{data.to_string()}\n"
                        extracted_text.append(sheet_data)
                        all_text_data.append(sheet_data)
                        print(f"Extracted data from sheet: {sheet_name}")
            except Exception as e:
                logger.error(f"Error extracting data from sheet {sheet_name}: {e}")
                extracted_text.append(f"[ERROR extracting data from sheet {sheet_name}: {str(e)}]")
        # Self-hosted: HF storage used when markdown_kv is enabled (no client init needed)
        # Process images
        for sheet_name in pxl_doc.sheetnames:
            sheet = pxl_doc[sheet_name]
            # NOTE(review): `_images` and `_data()` are private openpyxl APIs;
            # they may break on openpyxl upgrades — confirm against pinned version.
            if sheet._images:
                print(f"Found {len(sheet._images)} images in sheet: {sheet_name}")
                for idx, image in enumerate(sheet._images):
                    try:
                        print(f"Processing image {idx + 1} from sheet {sheet_name}")
                        img_data = image._data()
                        image_id = f"{sheet_name}_image_{idx + 1}"
                        if markdown_kv:
                            # Upload image to cloud storage and use summarize_image_with_gpt
                            from PIL import Image
                            import io
                            # Convert image to PIL Image
                            pil_image = Image.open(io.BytesIO(img_data))
                            if pil_image.mode != 'RGB':
                                pil_image = pil_image.convert('RGB')
                            # Resize if too large
                            max_size = 800
                            if pil_image.width > max_size or pil_image.height > max_size:
                                pil_image.thumbnail((max_size, max_size), Image.Resampling.LANCZOS)
                            # Convert to JPEG bytes
                            img_buffer = io.BytesIO()
                            pil_image.save(img_buffer, format='JPEG', quality=85, optimize=True)
                            img_buffer.seek(0)
                            jpeg_bytes = img_buffer.getvalue()
                            # Upload to self-hosted (HF) storage; blob_url stays None on failure
                            blob_url = None
                            try:
                                from hf_storage import store_image
                                success = await store_image(image_id, jpeg_bytes, folder_prefix="xlsx_images")
                                if success:
                                    blob_url = f"hf://xlsx_images/{image_id}.jpg"
                                    logger.info(f"Uploaded image to HF persistent storage: xlsx_images/{image_id}.jpg")
                                else:
                                    logger.error(f"Failed to upload image {image_id} to HF persistent storage")
                                    blob_url = None
                            except Exception as e:
                                logger.error(f"Failed to upload image {image_id} to HF storage: {e}")
                                blob_url = None
                            # Generate summary with markdown-kv support
                            if blob_url:
                                try:
                                    summary = await summarize_image_with_gpt(blob_url, image_id, storage_provider, markdown_kv_enabled=True)
                                    image_summaries[image_id] = summary
                                    extracted_text.append(f"=== IMAGE {idx + 1} FROM SHEET: {sheet_name} ===\n{summary}\n")
                                except Exception as e:
                                    logger.error(f"Error summarizing image {image_id} with GPT: {e}")
                                    extracted_text.append(f"[ERROR processing image {idx + 1} from sheet {sheet_name}: {str(e)}]")
                            else:
                                # Fallback to extract_page_with_llm if upload failed
                                base64_img = base64.b64encode(img_data).decode("utf-8")
                                result = await extract_page_with_llm(base64_img, image_id)
                                extracted_text.append(f"=== IMAGE {idx + 1} FROM SHEET: {sheet_name} ===\n{result}\n")
                        else:
                            # Use extract_page_with_llm (original behavior)
                            base64_img = base64.b64encode(img_data).decode("utf-8")
                            result = await extract_page_with_llm(base64_img, image_id)
                            extracted_text.append(f"=== IMAGE {idx + 1} FROM SHEET: {sheet_name} ===\n{result}\n")
                    except Exception as e:
                        logger.error(f"Error processing image {idx + 1} from sheet {sheet_name}: {e}")
                        extracted_text.append(f"[ERROR processing image {idx + 1} from sheet {sheet_name}: {str(e)}]")
        pxl_doc.close()
        if not extracted_text:
            return "No data or images found in the Excel file.", {}
        return "\n".join(extracted_text), image_summaries
    except Exception as e:
        logger.error(f"Error processing Excel file: {e}")
        raise Exception(f"Error processing Excel file: {str(e)}")
async def process_csv(file_path: str) -> str:
    """Process a CSV file and return its full contents as text.

    Tries pandas with several common encodings first; falls back to the
    stdlib ``csv`` module (ignoring undecodable bytes) when pandas fails.

    Args:
        file_path: Path to the CSV file.

    Returns:
        str: The CSV contents rendered via ``DataFrame.to_string()``, or a
            message when the file contains no data.

    Raises:
        Exception: If the file cannot be read by any strategy.
    """
    def _read_with_csv_module() -> pd.DataFrame:
        # Last-resort reader: stdlib csv with undecodable bytes dropped.
        # Shared by both fallback paths below (was previously duplicated).
        import csv
        with open(file_path, 'r', encoding='utf-8', errors='ignore') as csvfile:
            return pd.DataFrame(list(csv.reader(csvfile)))

    try:
        data = None
        try:
            # Try a sequence of common encodings before giving up on pandas.
            for encoding in ('utf-8', 'latin-1', 'cp1252', 'iso-8859-1'):
                try:
                    data = pd.read_csv(file_path, encoding=encoding)
                    logger.info(f"Successfully read CSV with encoding: {encoding}")
                    break
                except UnicodeDecodeError:
                    continue
            if data is None:
                # No encoding worked cleanly; fall back to the csv module.
                data = _read_with_csv_module()
        except Exception as e:
            logger.warning(f"Error reading CSV with pandas: {e}, trying csv module")
            try:
                data = _read_with_csv_module()
            except Exception as e2:
                logger.error(f"Error reading CSV with csv module: {e2}")
                raise Exception(f"Error reading CSV file: {str(e2)}")
        if data is not None and not data.empty:
            # Extract all data as a single string block.
            logger.info(f"Extracted {len(data)} rows from CSV file")
            return f"=== CSV DATA ===\n{data.to_string()}\n"
        return "No data found in the CSV file."
    except Exception as e:
        logger.error(f"Error processing CSV file: {e}")
        raise Exception(f"Error processing CSV file: {str(e)}")
async def process_audio(file_path: str, progress_callback=None) -> str:
    """
    Transcribe audio file using Google Gemini.
    Args:
        file_path: Path to the audio file to transcribe
        progress_callback: Optional callback for progress updates
    Returns:
        str: Transcribed text
    Raises:
        Exception: If the Gemini SDK/API key is missing or transcription fails.
    """
    try:
        if not GEMINI_AVAILABLE:
            raise Exception("Gemini SDK not available. Install with: pip install google-generativeai")
        gemini_api_key = os.getenv("GEMINI_API_KEY")
        if not gemini_api_key:
            raise Exception("GEMINI_API_KEY not found in environment variables")
        genai.configure(api_key=gemini_api_key)
        if progress_callback:
            await progress_callback("📤 Uploading audio to Gemini...")
        # Determine MIME type based on file extension
        file_ext = os.path.splitext(file_path)[1].lower()
        mime_types = {
            '.mp3': 'audio/mp3',
            '.wav': 'audio/wav',
            '.flac': 'audio/flac',
            '.aac': 'audio/aac',
            '.ogg': 'audio/ogg',
            '.m4a': 'audio/mp4',
            '.wma': 'audio/x-ms-wma',
            '.webm': 'audio/webm',
            '.amr': 'audio/amr'
        }
        mime_type = mime_types.get(file_ext, 'audio/mpeg')
        # Upload audio using Files API
        uploaded_file = genai.upload_file(
            path=file_path,
            mime_type=mime_type
        )
        if uploaded_file:
            logger.info(f"✅ Successfully uploaded audio to Gemini Files API: {uploaded_file.name}")
        if progress_callback:
            await progress_callback("🔄 Transcribing audio with Gemini...")
        # Poll until the Files API has finished processing the upload.
        # Fix: use asyncio.sleep instead of time.sleep — this is an async
        # function, and time.sleep would block the whole event loop.
        while uploaded_file.state.name == "PROCESSING":
            if progress_callback:
                await progress_callback("⏳ Waiting for audio processing...")
            await asyncio.sleep(2)
            uploaded_file = genai.get_file(uploaded_file.name)
        if uploaded_file.state.name == "FAILED":
            raise Exception(f"Audio processing failed: {uploaded_file.state.name}")
        # Use Gemini to transcribe the audio
        model = configure_gemini()
        prompt = """Transcribe this audio file accurately.
Follow these guidelines:
- Transcribe all spoken words exactly as heard
- Preserve the original language of the speech
- Include speaker labels if multiple speakers are detected (e.g., Speaker 1:, Speaker 2:)
- Include timestamps in format [MM:SS] at natural breaks or speaker changes
- Preserve punctuation and sentence structure
- Note any significant non-speech sounds in brackets (e.g., [music], [applause])
- If the audio is in Japanese, transcribe in Japanese
- Maintain the natural flow and structure of the conversation
"""
        # Generate response in a worker thread — the SDK call is blocking.
        def generate_sync():
            return model.generate_content([prompt, uploaded_file])
        response = await asyncio.to_thread(generate_sync)
        # Clean up uploaded file (best-effort; narrowed from a bare except)
        try:
            genai.delete_file(uploaded_file.name)
        except Exception:
            pass
        if progress_callback:
            await progress_callback("✅ Audio transcription completed!")
        if hasattr(response, 'text'):
            return response.text
        else:
            return str(response)
    except Exception as e:
        logger.error(f"Error processing audio file with Gemini: {e}")
        raise Exception(f"Error processing audio file: {str(e)}")
async def extract_video_frames(file_path: str, interval_seconds: float = 4.0, progress_callback=None) -> list:
    """
    Extract frames from video at specified intervals.
    Args:
        file_path: Path to video file
        interval_seconds: Interval in seconds between frames (default: 4.0 for ~3-5 sec range)
        progress_callback: Optional callback for progress updates
    Returns:
        list: List of PIL Image objects representing extracted frames
    Raises:
        Exception: If OpenCV is unavailable or the video cannot be read.
    """
    try:
        # Try to import cv2 (optional dependency)
        try:
            import cv2
        except ImportError:
            raise Exception("OpenCV (cv2) is required for video frame extraction. Install with: pip install opencv-python")
        from PIL import Image
        frames = []
        cap = cv2.VideoCapture(file_path)
        if not cap.isOpened():
            raise Exception(f"Could not open video file: {file_path}")
        try:
            fps = cap.get(cv2.CAP_PROP_FPS)
            total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            duration = total_frames / fps if fps > 0 else 0
            if progress_callback:
                await progress_callback(f"📹 Extracting frames from video (duration: {duration:.1f}s, interval: {interval_seconds}s)...")
            # max(1, ...) guards against a zero interval (e.g. very low fps or
            # a sub-frame interval), which would crash the modulo test below.
            # Default to 30 fps when the container does not report a rate.
            frame_interval = max(1, int(fps * interval_seconds)) if fps > 0 else max(1, int(30 * interval_seconds))
            frame_count = 0
            extracted_count = 0
            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                # Extract frame at intervals
                if frame_count % frame_interval == 0:
                    # Convert BGR (OpenCV native) to RGB for PIL
                    frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                    pil_image = Image.fromarray(frame_rgb)
                    frames.append(pil_image)
                    extracted_count += 1
                    if progress_callback:
                        timestamp = frame_count / fps if fps > 0 else frame_count / 30
                        await progress_callback(f"📸 Extracted frame {extracted_count} at {timestamp:.1f}s")
                frame_count += 1
        finally:
            # Always release the capture, even if reading/decoding fails.
            cap.release()
        if progress_callback:
            await progress_callback(f"✅ Extracted {len(frames)} frames from video")
        return frames
    except Exception as e:
        logger.error(f"Error extracting video frames: {e}")
        raise Exception(f"Error extracting video frames: {str(e)}")
async def analyze_video_frames_with_gemini(frames: list, progress_callback=None) -> str:
    """
    Analyze video frames using Gemini 3 Pro and generate a visual summary.

    Frames are sent in batches of 10; each batch gets its own description,
    and the per-batch descriptions are then condensed into one final summary.
    If the final summarization step fails, the concatenated per-batch
    descriptions are returned instead.

    Args:
        frames: List of PIL Image objects to analyze.
        progress_callback: Optional async callback for progress updates.

    Returns:
        str: The generated visual summary text.

    Raises:
        Exception: If the Gemini SDK is unavailable or analysis fails.
    """
    try:
        if not GEMINI_AVAILABLE:
            raise Exception("Gemini SDK not available. Install with: pip install google-generativeai")
        if not frames:
            return "No frames extracted from video."
        from PIL import Image
        # Use Gemini 3 Pro specifically (configured elsewhere in this module)
        model = configure_gemini3()
        if progress_callback:
            await progress_callback(f"🤖 Analyzing {len(frames)} frames with Gemini 3 Pro...")
        import io
        frame_descriptions = []
        batch_size = 10  # Process 10 frames at a time to stay within request limits
        for i in range(0, len(frames), batch_size):
            batch = frames[i:i+batch_size]
            content_parts = []
            # Add text prompt describing which slice of frames this batch covers
            frame_nums = f"frames {i+1}-{min(i+len(batch), len(frames))}"
            prompt = f"""Analyze these video frames ({frame_nums} of {len(frames)} total frames).
Describe what you see in detail, including:
- Main subjects, objects, or people
- Actions or activities happening
- Setting or environment
- Any text, diagrams, or important visual elements
- Overall scene context
Provide a comprehensive description for this batch of frames."""
            content_parts.append(prompt)
            # Add images to content
            for frame in batch:
                # Resize if too large (copy first so the caller's frames are untouched)
                max_size = 1536
                if frame.width > max_size or frame.height > max_size:
                    frame = frame.copy()
                    frame.thumbnail((max_size, max_size), Image.Resampling.LANCZOS)
                # Convert to JPEG bytes for Gemini
                img_buffer = io.BytesIO()
                frame.save(img_buffer, format='JPEG', quality=90)
                img_buffer.seek(0)
                # Add as PIL Image (SDK accepts PIL images as content parts)
                content_parts.append(Image.open(img_buffer))
            try:
                # Generate content using Gemini 3 Pro; the SDK call is blocking,
                # so run it in a worker thread to keep the event loop free.
                def generate_batch():
                    return model.generate_content(
                        content_parts,
                        generation_config={
                            'temperature': 0.4,
                            'max_output_tokens': 2048,
                        }
                    )
                response = await asyncio.to_thread(generate_batch)
                if response and hasattr(response, 'text') and response.text:
                    frame_descriptions.append(f"Frames {frame_nums}:\n{response.text}\n")
                    if progress_callback:
                        await progress_callback(f"✅ Analyzed batch {i//batch_size + 1}/{(len(frames)-1)//batch_size + 1}")
                else:
                    logger.warning(f"No text in response for batch {i//batch_size + 1}")
                    frame_descriptions.append(f"Frames {frame_nums}: [No response generated]\n")
            except Exception as e:
                # A failed batch is recorded inline rather than aborting the run
                logger.warning(f"Error analyzing batch {i//batch_size + 1}: {e}")
                frame_descriptions.append(f"Frames {frame_nums}: [Analysis error: {str(e)}]\n")
        # Generate final summary from all frame descriptions
        if progress_callback:
            await progress_callback("📝 Generating final visual summary with Gemini 3 Pro...")
        if not frame_descriptions:
            return "Could not analyze any video frames."
        all_descriptions = "\n\n".join(frame_descriptions)
        summary_prompt = f"""Based on the following frame-by-frame analysis of a video, create a comprehensive visual summary:
{all_descriptions}
Please provide:
1. A concise overall summary of what the video shows
2. Key visual elements, scenes, or moments
3. Main subjects or topics visible in the video
4. Any important details that appear across multiple frames
Format the summary in a clear, organized manner."""
        try:
            def generate_summary():
                return model.generate_content(
                    summary_prompt,
                    generation_config={
                        'temperature': 0.4,
                        'max_output_tokens': 2048,
                    }
                )
            summary_response = await asyncio.to_thread(generate_summary)
            if summary_response and hasattr(summary_response, 'text') and summary_response.text:
                visual_summary = summary_response.text
            else:
                visual_summary = "\n\n".join(frame_descriptions)
        except Exception as e:
            # Summarization failure falls back to the raw batch descriptions
            logger.warning(f"Error generating final summary: {e}")
            visual_summary = "\n\n".join(frame_descriptions)
        if progress_callback:
            await progress_callback("✅ Visual summary generated successfully with Gemini 3 Pro")
        return visual_summary
    except Exception as e:
        logger.error(f"Error analyzing video frames with Gemini 3 Pro: {e}")
        raise Exception(f"Error analyzing video frames with Gemini 3 Pro: {str(e)}")
async def process_video(file_path: str, progress_callback=None) -> dict:
    """
    Process video: extract audio for transcription and frames for visual analysis.
    Args:
        file_path: Path to video file
        progress_callback: Optional callback for progress updates
    Returns:
        dict: Contains 'transcript' (str), 'visual_summary' (str) and
            'frame_count' (int)
    Raises:
        Exception: If audio extraction or transcription fails (visual
            analysis failures are reported inside 'visual_summary' instead).
    """
    temp_dir = tempfile.gettempdir()
    # PID + hash keeps concurrent runs from clobbering each other's temp file.
    audio_path = os.path.join(temp_dir, f"temp_audio_{os.getpid()}_{hash(file_path) % 10000}.mp3")
    try:
        # Step 1: Extract and transcribe audio
        if progress_callback:
            await progress_callback("🎵 Extracting audio from video...")
        extract_audio(input_path=file_path, output_path=audio_path)
        transcript = await process_audio(audio_path)
        # Step 2: Extract video frames at 3-5 second intervals
        frames = []
        visual_summary = ""
        try:
            # Extract frames (using 4 second interval for ~3-5 sec range)
            frames = await extract_video_frames(file_path, interval_seconds=4.0, progress_callback=progress_callback)
            # Step 3: Analyze frames with Gemini (best-effort)
            if frames and GEMINI_AVAILABLE:
                visual_summary = await analyze_video_frames_with_gemini(frames, progress_callback=progress_callback)
            elif frames:
                visual_summary = f"Extracted {len(frames)} frames from video, but Gemini is not available for analysis."
            else:
                visual_summary = "No frames could be extracted from the video."
        except Exception as e:
            # Visual analysis is non-fatal; the transcript is still returned.
            logger.warning(f"Error processing video frames: {e}")
            visual_summary = f"Visual analysis failed: {str(e)}"
        return {
            "transcript": transcript,
            "visual_summary": visual_summary,
            "frame_count": len(frames)
        }
    except Exception as e:
        logger.error(f"Error processing video file: {e}")
        raise Exception(f"Error processing video file: {str(e)}")
    finally:
        # Clean up the temporary audio file even when extraction or
        # transcription raised (previously it leaked on the error path).
        try:
            if os.path.exists(audio_path):
                os.unlink(audio_path)
        except Exception as e:
            logger.warning(f"Failed to delete temporary audio file {audio_path}: {e}")
async def extract_text_from_dxf(file_path: str) -> str:
    """Collect every piece of textual content from a DXF drawing.

    Walks the modelspace for TEXT, MTEXT, INSERT (including attached
    attributes and text inside the referenced block definition), DIMENSION
    and LEADER entities, and returns their contents as labelled lines.
    On failure, returns an error description instead of raising.
    """
    try:
        logger.info(f"Extracting text from DXF file: {file_path}")
        drawing = ezdxf.readfile(file_path)
        modelspace = drawing.modelspace()
        collected = []
        # Single-line text entities keep their string on the dxf namespace
        for entity in modelspace.query("TEXT"):
            value = getattr(entity.dxf, 'text', None)
            if value:
                collected.append(f"TEXT: {value}")
        # Multi-line text entities expose .text on the entity itself
        for entity in modelspace.query("MTEXT"):
            value = getattr(entity, 'text', None)
            if value:
                collected.append(f"MTEXT: {value}")
        # Block references: record the block name, its attached attributes,
        # and any text defined inside the block definition itself
        for insert in modelspace.query("INSERT"):
            collected.append(f"BLOCK: {getattr(insert.dxf, 'name', 'Unknown Block')}")
            for attrib in insert.attribs:
                value = getattr(attrib.dxf, 'text', None)
                if value:
                    collected.append(f"  ATTRIB: {value}")
            definition = insert.block()
            if definition is not None:
                for entity in definition.query("TEXT"):
                    value = getattr(entity.dxf, 'text', None)
                    if value:
                        collected.append(f"  BLOCK_TEXT: {value}")
                for entity in definition.query("MTEXT"):
                    value = getattr(entity, 'text', None)
                    if value:
                        collected.append(f"  BLOCK_MTEXT: {value}")
        # Dimensions may carry measurement text
        for entity in modelspace.query("DIMENSION"):
            value = getattr(entity.dxf, 'text', None)
            if value:
                collected.append(f"DIMENSION: {value}")
        # Leaders may carry annotation text
        for entity in modelspace.query("LEADER"):
            value = getattr(entity.dxf, 'annotation', None)
            if value:
                collected.append(f"LEADER: {value}")
        if collected:
            logger.info(f"Successfully extracted {len(collected)} text elements from DXF")
            return "\n".join(collected)
        logger.warning("No text found in DXF file")
        return "No text content found in DXF file"
    except Exception as e:
        logger.error(f"Error extracting text from DXF: {e}")
        return f"Error extracting text from DXF: {str(e)}"
async def process_dxf(file_path: str, progress_callback=None, custom_prompt: str = None) -> str:
    """
    Process DXF file with complete pipeline:
    1. Extract text using ezdxf
    2. Convert to PDF and upload to OpenAI
    3. Generate detailed report using LLM with file_id
    4. Combine text + report for embeddings
    5. Store in vector database
    Args:
        file_path: Path to DXF file
        progress_callback: Optional callback for progress updates
        custom_prompt: Optional custom system prompt (uses default DXF_ANALYSIS_PROMPT if not provided)
    Returns:
        str: The extracted DXF text concatenated with the LLM-generated report.
    Raises:
        Exception: If any step of the pipeline fails.
    """
    # Resources tracked for cleanup in the finally block at the end.
    temp_pdf_path = None
    file_id = None
    async def _update(msg: str):
        # Forward a progress message to the caller's callback, if provided.
        if progress_callback:
            await progress_callback(msg)
    try:
        await _update("🔄 Processing DXF file with LLM analysis pipeline...")
        logger.info("Processing DXF file with complete pipeline")
        # Step 1: Extract text from DXF using ezdxf
        await _update("📐 Step 1: Extracting text from DXF...")
        logger.info("Step 1: Extracting text from DXF")
        extracted_text = await extract_text_from_dxf(file_path)
        # Step 2: Convert DXF to PDF
        await _update("📄 Step 2: Converting DXF to PDF...")
        logger.info("Step 2: Converting DXF to PDF")
        temp_dir = tempfile.gettempdir()
        # PID + hash keeps concurrent conversions from clobbering each other.
        temp_pdf_path = os.path.join(temp_dir, f"temp_dxf_{os.getpid()}_{hash(file_path) % 10000}.pdf")
        doc = ezdxf.readfile(file_path)
        msp = doc.modelspace()
        # Create render context
        ctx = RenderContext(doc)
        # Create backend for PyMuPDF (for PDF)
        backend = pymupdf.PyMuPdfBackend()
        # Configure appearance: black entities on a white background
        cfg = config.Configuration(
            background_policy=config.BackgroundPolicy.WHITE,
            color_policy=config.ColorPolicy.BLACK
        )
        # Create frontend with the configuration
        frontend = Frontend(ctx, backend, config=cfg)
        # Draw the layout (modelspace) into the backend
        frontend.draw_layout(msp)
        # Define a page layout (A4 portrait, 20 mm margins)
        page = layout.Page(210, 297, layout.Units.mm, margins=layout.Margins.all(20))
        # Get PDF as bytes
        pdf_bytes = backend.get_pdf_bytes(page)
        # Write bytes to file
        with open(temp_pdf_path, "wb") as f:
            f.write(pdf_bytes)
        await _update("✅ DXF to PDF conversion completed")
        logger.info("DXF to PDF conversion completed")
        print(f'DXF to PDF conversion completed', temp_pdf_path)
        # Step 3: Upload the PDF file to OpenAI
        await _update("📤 Step 3: Uploading PDF to OpenAI...")
        logger.info("Step 3: Uploading PDF to OpenAI")
        file_id = upload_file_to_openai(temp_pdf_path)
        # Step 4: Generate detailed report using LLM with extracted text + file_id
        await _update("🤖 Step 4: Generating detailed report with LLM...")
        logger.info("Step 4: Generating detailed report with LLM")
        # Use custom prompt if provided (and non-blank), otherwise use default
        system_prompt = custom_prompt if (custom_prompt and custom_prompt.strip()) else DXF_ANALYSIS_PROMPT
        # Prepare enhanced prompt with extracted text
        enhanced_prompt = f"""You are analyzing a DXF CAD file. First, here is the extracted text content from the file: {extracted_text} Now, using both the extracted text above AND the visual PDF representation of the file, follow the system prompt instructions to generate the JSON object and detailed narrative explanation."""
        messages = [
            {
                "role": "system",
                "content": system_prompt
            },
            {
                "role": "user",
                "content": [
                    {
                        "type": "text",
                        "text": enhanced_prompt
                    },
                    {
                        "type": "file",
                        "file": {
                            "file_id": file_id
                        }
                    }
                ]
            }
        ]
        # Use sync client for chat completion, in a worker thread so the
        # blocking HTTP call does not stall the event loop.
        response = await asyncio.to_thread(
            sync_client.chat.completions.create,
            model="gpt-4.1-mini",
            messages=messages,
            temperature=0.0
        )
        # Extract the analysis result (detailed report)
        llm_report = response.choices[0].message.content.strip()
        # Step 5: Combine extracted text + LLM report for embeddings
        await _update("✅ Step 5: Combining text and report for embeddings...")
        logger.info("Step 5: Combining text and report for embeddings")
        # NOTE(review): the two parts are joined with no separator between
        # them — confirm this is intended for the downstream embeddings.
        combined_content = f"""{extracted_text}{llm_report}"""
        await _update("✅ DXF processing completed successfully!")
        logger.info(f"Successfully processed DXF file. Combined content length: {len(combined_content)} characters")
        return combined_content
    except Exception as e:
        logger.error(f"Error processing DXF file: {e}")
        raise Exception(f"Error processing DXF file: {str(e)}")
    finally:
        # Clean up - delete the uploaded file from OpenAI (best-effort)
        if file_id:
            try:
                await asyncio.to_thread(sync_client.files.delete, file_id)
                logger.info(f"Successfully deleted uploaded file: {file_id}")
            except Exception as e:
                logger.warning(f"Failed to delete uploaded file {file_id}: {e}")
        # Clean up temporary PDF file (best-effort)
        if temp_pdf_path and os.path.exists(temp_pdf_path):
            try:
                os.unlink(temp_pdf_path)
                logger.info(f"Successfully deleted temporary PDF: {temp_pdf_path}")
            except Exception as e:
                logger.warning(f"Failed to delete temporary PDF file {temp_pdf_path}: {e}")
async def process_image(file_path: str) -> str:
    """Process image files (JPG, PNG) using LLM vision (default: OpenAI).

    Reads the file, base64-encodes it, and hands it to the page-extraction
    LLM with a label derived from the file extension.
    """
    try:
        # Read and encode the image in one pass
        with open(file_path, "rb") as img_file:
            base64_img = base64.b64encode(img_file.read()).decode("utf-8")
        # Derive a label from the extension for context
        file_ext = os.path.splitext(file_path)[1].lower()
        image_type = "image" if file_ext in [".jpg", ".jpeg"] else "PNG image"
        # Standalone images get analyzed for content by the vision LLM
        return await extract_page_with_llm(base64_img, f"{image_type}_file")
    except Exception as e:
        logger.error(f"Error processing image file: {e}")
        raise Exception(f"Error processing image file: {str(e)}")
async def youtube_video(video_id: str) -> str:
    """Fetch the transcript of a YouTube video and return it as plain text."""
    try:
        fetched = YouTubeTranscriptApi().fetch(video_id)
        # Newer API versions expose snippet objects; join their text parts.
        if hasattr(fetched, 'snippets'):
            return ' '.join(snippet.text for snippet in fetched.snippets)
        # Fall back to the raw representation for older return shapes.
        return str(fetched)
    except Exception as e:
        logger.error(f"Error processing YouTube video: {e}")
        raise Exception(f"Error processing YouTube video: {str(e)}")
async def process_web_url(url: str, storage_provider: str = "azure") -> dict:
    """Fetch a web page, extract its text and images, and build RAG content.

    Downloads the page, extracts visible text, discovers candidate image
    URLs (from <img>/<source> tags, inline styles and linked CSS), stores
    JPEG/PNG images of at least 15KB in HF persistent storage, and
    summarizes them with GPT.

    Args:
        url: The web page URL to process.
        storage_provider: Storage backend hint passed to the image summarizer.

    Returns:
        dict: {"text": RAG content including image info and summaries,
            "display_text": clean page text for the UI,
            "image_summaries": {image_id: summary},
            "image_blob_urls": {image_id: blob_url}}

    Raises:
        Exception: If the page cannot be fetched or processed.
    """
    try:
        # Fetch the webpage using requests with browser-like headers
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.5',
            'Accept-Encoding': 'gzip, deflate',
            'Connection': 'keep-alive',
        }
        resp = requests.get(url, timeout=10, headers=headers)
        resp.raise_for_status()
        soup = BeautifulSoup(resp.content, "html.parser")
        # Extract text content using BeautifulSoup
        text_content = soup.get_text(separator=' ', strip=True)
        # Self-hosted: HF persistent storage (no client init needed)
        # Extract image URLs using images_url.py logic
        def is_valid_url(url):
            # True when the URL has both a scheme and a network location
            parsed = urlparse(url)
            return bool(parsed.scheme) and bool(parsed.netloc)
        def extract_from_html(soup, base_url):
            # Collect absolute image URLs referenced directly in the markup
            urls = set()
            # 1. <img>, <picture>, <source> tags
            for tag in soup.find_all(("img", "source")):
                # Check lazy-load attributes before the plain src
                for attr in ("data-srcset", "data-src", "srcset", "src"):
                    v = tag.get(attr)
                    if v:
                        # if srcset has comma list, take largest (last entry)
                        if attr in ("srcset", "data-srcset"):
                            parts = [p.strip().split(" ")[0] for p in v.split(",")]
                            v2 = parts[-1]
                        else:
                            v2 = v
                        full = urljoin(base_url, v2)
                        if is_valid_url(full):
                            urls.add(full)
                        break
            # 2. inline styles – background-image
            bg_re = re.compile(r"background(?:-image)?\s*:\s*url\((['\"]?)(.*?)\1\)")
            for tag in soup.find_all(style=True):
                style = tag["style"]
                m = bg_re.search(style)
                if m:
                    u = m.group(2)
                    full = urljoin(base_url, u)
                    if is_valid_url(full):
                        urls.add(full)
            return urls
        def extract_from_css(css_text, base_url):
            # Collect image URLs referenced from CSS url(...) declarations
            urls = set()
            # search url(...) patterns
            for match in re.finditer(r"url\((['\"]?)(.*?)\1\)", css_text):
                u = match.group(2)
                full = urljoin(base_url, u)
                if is_valid_url(full):
                    urls.add(full)
            return urls
        def fetch_css_links(soup, base_url):
            # Absolute URLs of all linked stylesheets on the page
            links = []
            for link in soup.find_all("link", rel="stylesheet"):
                href = link.get("href")
                if href:
                    full = urljoin(base_url, href)
                    links.append(full)
            return links
        # Extract image URLs from HTML
        image_urls = extract_from_html(soup, url)
        # Fetch CSS files and parse them for additional image URLs
        css_links = fetch_css_links(soup, url)
        for css_url in css_links:
            try:
                cr = requests.get(css_url, timeout=10, headers=headers)
                cr.raise_for_status()
                more_urls = extract_from_css(cr.text, css_url)
                image_urls.update(more_urls)
            except Exception as e:
                # A failed stylesheet is not fatal; just skip it
                logger.debug(f"CSS fetch failed: {css_url}, {e}")
        logger.info(f"Found {len(image_urls)} candidate image URLs")
        # Process images
        image_descriptions = []
        image_counter = 0
        image_blob_urls = {}
        for img_url in image_urls:
            try:
                # Download image header first to check type and size cheaply
                head_response = requests.head(img_url, timeout=10, headers=headers)
                head_response.raise_for_status()
                # Check content type - only JPG and PNG
                content_type = head_response.headers.get('content-type', '').lower()
                if not (content_type.startswith("image/jpeg") or content_type.startswith("image/png")):
                    logger.debug(f"Skipping unsupported format: {content_type}")
                    continue
                # Check image size (skip very small images - likely icons)
                content_length = head_response.headers.get('content-length')
                if content_length and int(content_length) < 15000:  # Less than 15KB
                    logger.debug(f"Skipping small image (likely icon): {content_length} bytes")
                    continue
                # Download the full image
                img_response = requests.get(img_url, timeout=10, stream=True, headers=headers)
                img_response.raise_for_status()
                # Double-check content type from actual response (HEAD may lie)
                actual_content_type = img_response.headers.get('content-type', '').lower()
                if not (actual_content_type.startswith("image/jpeg") or actual_content_type.startswith("image/png")):
                    logger.debug(f"Skipping unsupported format in response: {actual_content_type}")
                    continue
                # Create unique image ID
                image_counter += 1
                image_id = f"web_image_{image_counter:03d}"
                # Get image data
                image_data = img_response.content
                # Upload to self-hosted (HF) storage
                try:
                    from hf_storage import store_image
                    success = await store_image(image_id, image_data, folder_prefix="web_images")
                    if success:
                        blob_url = f"hf://web_images/{image_id}.jpg"
                        image_blob_urls[image_id] = blob_url
                        logger.info(f"Uploaded web image to HF persistent storage: web_images/{image_id}.jpg")
                    else:
                        logger.error(f"Failed to upload web image {image_id} to HF persistent storage")
                except Exception as upload_error:
                    logger.error(f"Failed to upload image {image_id}: {upload_error}")
                    continue
                # Create image description with image ID
                img_desc = f"[IMAGE: Web image from {url}] (Image ID: {image_id})"
                image_descriptions.append(img_desc)
                logger.info(f"Processed web image: {image_id}")
            except Exception as img_error:
                # One bad image should not abort the whole page
                logger.warning(f"Could not download image {img_url}: {img_error}")
                continue
        # Generate image summaries using GPT if images were found
        image_summaries = {}
        if image_blob_urls:
            logger.info("Generating image summaries with GPT...")
            for image_id, blob_url in image_blob_urls.items():
                try:
                    # Import markdown_kv_enabled from app module (runtime flag)
                    from app import markdown_kv_enabled
                    image_summaries[image_id] = await summarize_image_with_gpt(blob_url, image_id, storage_provider, markdown_kv_enabled)
                except Exception as e:
                    logger.error(f"Error summarizing image {image_id}: {e}")
                    image_summaries[image_id] = f"[Error summarizing image {image_id}: {str(e)}]"
        # Prepare content for RAG (includes image information and summaries for search)
        rag_content = text_content
        if image_descriptions:
            # Create image IDs list for embedding
            image_ids = list(image_blob_urls.keys())
            images_section = f"[Images: {', '.join(image_ids)}]"
            # Add image references to the content for RAG search
            rag_content = text_content + "\n\n--- IMAGES FOUND ---\n" + "\n".join(image_descriptions)
            rag_content += f"\n\n{images_section}"
            # Add image summaries for better search capabilities
            if image_summaries:
                rag_content += "\n\nImage Summaries:"
                for img_id, summary in image_summaries.items():
                    rag_content += f"\n- {img_id}: {summary}"
        # Return both versions: clean text for display, full content for RAG
        return {
            "text": rag_content,  # Full content with image info for RAG
            "display_text": text_content,  # Clean text for UI display
            "image_summaries": image_summaries,
            "image_blob_urls": image_blob_urls
        }
    except Exception as e:
        logger.error(f"Error processing URL: {e}")
        raise Exception(f"Error processing URL: {str(e)}")
def _extract_youtube_video_id(raw_id):
    """Return the bare YouTube video ID contained in *raw_id*.

    Accepts either a plain video ID (returned unchanged) or a full
    YouTube URL (watch, shorts, youtu.be, embed). Returns None when the
    value looks like a YouTube URL but no ID can be parsed from it.
    """
    if "youtube.com" not in raw_id and "youtu.be" not in raw_id:
        # Not a URL: assume the caller already passed a bare video ID.
        return raw_id
    patterns = [
        r'(?:youtube\.com\/watch\?v=|youtube\.com\/shorts\/|youtu\.be\/)([a-zA-Z0-9_-]+)',
        r'youtube\.com\/embed\/([a-zA-Z0-9_-]+)',
    ]
    for pattern in patterns:
        match = re.search(pattern, raw_id)
        if match:
            return match.group(1)
    return None


def _file_size_mb(file):
    """Best-effort size of the uploaded *file* in MB (0 when unknown).

    Tries the in-memory ``size`` attribute first, then the on-disk size
    of ``file.name``. Any failure is logged and treated as "unknown" so
    the size-limit check is skipped rather than blocking the upload.
    """
    try:
        if hasattr(file, 'size'):
            return file.size / (1024 * 1024)
        if hasattr(file, 'name'):
            return os.path.getsize(file.name) / (1024 * 1024)
    except Exception as e:
        logger.warning(f"Could not check file size: {e}")
    return 0


async def _extract_from_file(path, ext):
    """Dispatch *path* to the extractor matching *ext* (already validated).

    Raises whatever the underlying extractor raises; the caller converts
    exceptions into user-facing error strings.
    """
    if ext == ".pdf":
        return await process_pdf(path)
    if ext in (".doc", ".docx"):
        return await process_docx(path)
    if ext == ".txt":
        return await process_txt(path)
    if ext == ".pptx":
        return await process_pptx(path)
    if ext == ".xlsx":
        # process_excel returns (text, extra); only the text is used here.
        text, _ = await process_excel(path)
        return text
    if ext in (".mp3", ".wav"):
        return await process_audio(path)
    if ext == ".mp4":
        video_result = await process_video(path)
        if not isinstance(video_result, dict):
            # Fallback for the legacy plain-string return format.
            return video_result
        transcript = video_result.get("transcript", "")
        visual_summary = video_result.get("visual_summary", "")
        if visual_summary:
            return f"=== VIDEO TRANSCRIPT ===\n\n{transcript}\n\n=== VISUAL SUMMARY (Based on Video Frames) ===\n\n{visual_summary}"
        return transcript
    if ext == ".dxf":
        return await process_dxf(path)
    # Only the image extensions (.jpg/.jpeg/.png) remain after the
    # SUPPORTED_FILES validation, so no unreachable "unsupported" branch.
    return await process_image(path)


async def _extract_text_async(file, video_id, web_url, storage_provider="azure"):
    """Async extraction dispatcher for the three supported input kinds.

    Inputs are checked in priority order: web URL, then YouTube video,
    then uploaded file; the first non-empty one wins.

    Args:
        file: Uploaded file object (Gradio-style; has at least a ``.name``
            path). May be None when a URL/video is supplied instead.
        video_id: YouTube video ID or full YouTube URL (may be empty/None).
        web_url: Web page URL to scrape (may be empty/None).
        storage_provider: Backend used to persist extracted images
            (default "azure").

    Returns:
        str: Extracted text, or a human-readable "Error: ..." message.
            Errors are returned rather than raised so the UI can display
            them directly.
    """
    if web_url and web_url.strip():
        try:
            result = await process_web_url(web_url.strip(), storage_provider)
            # process_web_url returns a dict (text + image metadata) or a
            # plain string on the legacy path; normalize to stripped text.
            if isinstance(result, dict):
                return result["text"].strip()
            return result.strip()
        except Exception as e:
            return f"Error processing URL: {str(e)}"

    if video_id and video_id.strip():
        try:
            video_id_clean = _extract_youtube_video_id(video_id.strip())
            if video_id_clean is None:
                return "Error: Invalid YouTube URL format"
            return await youtube_video(video_id_clean)
        except Exception as e:
            return f"Error processing YouTube video: {str(e)}"

    if file is None:
        return "Error: Please provide either a file, YouTube video ID, or web URL"

    # Enforce the 20 MB upload limit (skipped when the size is unknown).
    file_size_mb = _file_size_mb(file)
    if file_size_mb > 20:
        return f"Error: File size ({file_size_mb:.2f} MB) exceeds the maximum allowed size of 20 MB. Please upload a smaller file."

    ext = os.path.splitext(file.name)[1].lower()
    if ext not in SUPPORTED_FILES:
        return f"Error: Unsupported file type {ext}. Supported types: {', '.join(SUPPORTED_FILES)}"

    try:
        return await _extract_from_file(file.name, ext)
    except Exception as e:
        return f"Error processing file: {str(e)}"


def extract_text_from_file(file, video_id, web_url, storage_provider="azure"):
    """Synchronous Gradio entry point; delegates to the async extractor.

    Returns the extracted text, or an "Error: ..." string on failure.
    """
    try:
        # Spin up a fresh event loop per call (Gradio invokes this from a
        # plain sync thread, so there is no running loop to conflict with).
        return asyncio.run(_extract_text_async(file, video_id, web_url, storage_provider))
    except Exception as e:
        return f"Error: {str(e)}"