Spaces:
Sleeping
Sleeping
import os
import time
import math
import re
import tiktoken
import logging
import asyncio
from openai import AsyncOpenAI
from dotenv import load_dotenv

# Pull OPENAI_API_KEY (and any other settings) from a local .env file, if present.
load_dotenv()

# Shared async OpenAI client used by the embedding helpers below.
client = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"))

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
| def chunk_text( | |
| text: str, chunk_size: int = 8000, overlap: int = 400, max_chunks: int | None = None, use_tokens: bool = True) -> list[str]: | |
| """ | |
| Robust chunker with safety guards and fallback. | |
| - If tiktoken is available and use_tokens=True: chunk by tokens. | |
| - Otherwise: chunk by characters. | |
| - Ensures forward progress even with bad params. | |
| - For small files, reduces overlap to avoid duplicate content. | |
| """ | |
| if not text or not isinstance(text, str): | |
| return [] | |
| # 1) Sanitize control chars that sometimes appear in OCR/Aspose output | |
| # Keep \n and \t; remove other C0 control chars. | |
| text = re.sub(r"[^\S\r\n]+", " ", text) # collapse runs of spaces | |
| text = re.sub(r"[\x00-\x08\x0B\x0C\x0E-\x1F]", "", text).strip() | |
| # 2) Guard parameters | |
| if chunk_size <= 0: | |
| chunk_size = 1000 | |
| if overlap < 0: | |
| overlap = 0 | |
| if overlap >= chunk_size: | |
| # auto-fix: keep 20% overlap | |
| overlap = max(0, int(chunk_size * 0.2)) | |
| # 3) For very small files, return as single chunk to avoid duplicates | |
| text_length = len(text) | |
| if text_length < chunk_size: # If file is smaller than one chunk | |
| logger.info(f"Very small file detected ({text_length} chars), returning as single chunk") | |
| return [text] | |
| # 4) For small files, reduce overlap to avoid duplicate content | |
| if text_length < chunk_size * 2: # If file is smaller than 2 chunks | |
| overlap = min(overlap, max(0, int(text_length * 0.1))) # Reduce overlap to 10% of file size | |
| logger.info(f"Small file detected ({text_length} chars), reduced overlap to {overlap}") | |
| # 3) Try token-based chunking | |
| tokens = None | |
| encoding = None | |
| token_mode = False | |
| t0 = time.time() | |
| if use_tokens: | |
| try: | |
| import tiktoken | |
| encoding = tiktoken.get_encoding("cl100k_base") | |
| tokens = encoding.encode(text) | |
| token_mode = True | |
| except Exception: | |
| token_mode = False # fall back to char mode | |
| chunks: list[str] = [] | |
| if token_mode: | |
| # Safety: compute theoretical max chunk count to avoid infinite loops | |
| step = chunk_size - overlap | |
| if step <= 0: | |
| step = max(1, chunk_size // 2) # should not happen due to guard above | |
| theoretical = math.ceil(max(1, len(tokens)) / step) | |
| hard_cap = min(theoretical + 2, 20000) # absolute safety cap | |
| if max_chunks is None: | |
| max_chunks = hard_cap | |
| else: | |
| max_chunks = min(max_chunks, hard_cap) | |
| start = 0 | |
| made_progress = True | |
| count = 0 | |
| while start < len(tokens) and count < max_chunks and made_progress: | |
| end = min(start + chunk_size, len(tokens)) | |
| chunk_tokens = tokens[start:end] | |
| chunk_text = encoding.decode(chunk_tokens).strip() | |
| if chunk_text: | |
| chunks.append(chunk_text) | |
| prev_start = start | |
| start = end - overlap | |
| # Guarantee forward progress | |
| if start <= prev_start: | |
| start = prev_start + 1 | |
| count += 1 | |
| else: | |
| # Character-based fallback | |
| step = chunk_size - overlap | |
| if step <= 0: | |
| step = max(1, chunk_size // 2) | |
| theoretical = math.ceil(max(1, len(text)) / step) | |
| hard_cap = min(theoretical + 2, 20000) | |
| if max_chunks is None: | |
| max_chunks = hard_cap | |
| else: | |
| max_chunks = min(max_chunks, hard_cap) | |
| start = 0 | |
| count = 0 | |
| while start < len(text) and count < max_chunks: | |
| end = min(start + chunk_size, len(text)) | |
| chunk = text[start:end].strip() | |
| if chunk: | |
| chunks.append(chunk) | |
| prev_start = start | |
| start = end - overlap | |
| if start <= prev_start: | |
| start = prev_start + 1 | |
| count += 1 | |
| t1 = time.time() | |
| return chunks | |
def validate_chunk_sizes(chunks, max_tokens=8000):
    """Validate that all chunks are within token limits.

    Oversized chunks are re-split via chunk_text(); if token counting fails
    for any reason, the input is returned unchanged as a best-effort result.
    """
    try:
        encoding = tiktoken.get_encoding("cl100k_base")
        checked = []
        for idx, piece in enumerate(chunks):
            token_count = len(encoding.encode(piece))
            if token_count <= max_tokens:
                checked.append(piece)
                continue
            logger.warning(f"Chunk {idx+1} exceeds {max_tokens} tokens ({token_count} tokens). Splitting...")
            # Re-chunk the oversized piece down to the token budget.
            checked.extend(chunk_text(piece, chunk_size=max_tokens, overlap=200))
        return checked
    except Exception as e:
        logger.error(f"Error validating chunk sizes: {e}")
        return chunks  # Return original chunks if validation fails
async def generate_embeddings_batch(texts, progress_callback=None, batch_size=100) -> list:
    """Embed `texts` with text-embedding-3-small in batches.

    Falls back to per-item requests when a batch call fails; items that fail
    individually get a zero vector so output length always matches input.
    Raises on unexpected top-level errors.
    """
    try:
        # Normalize a bare string into a one-element list.
        if isinstance(texts, str):
            texts = [texts]
        if progress_callback:
            await progress_callback("Generating embeddings...")
        embeddings = []
        n_batches = math.ceil(len(texts) / batch_size)
        for offset in range(0, len(texts), batch_size):
            batch = texts[offset:offset + batch_size]
            batch_no = offset // batch_size + 1
            if progress_callback:
                await progress_callback(f"Generating embeddings batch {batch_no}/{n_batches}...")
            try:
                response = await client.embeddings.create(
                    model="text-embedding-3-small",
                    input=batch,
                )
                embeddings.extend(item.embedding for item in response.data)
                # Small delay to avoid rate limiting.
                await asyncio.sleep(0.1)
            except Exception as e:
                logger.error(f"Error generating batch {batch_no} embeddings: {e}")
                # Batch failed: retry each text on its own.
                for single in batch:
                    try:
                        response = await client.embeddings.create(
                            model="text-embedding-3-small",
                            input=[single],
                        )
                        embeddings.append(response.data[0].embedding)
                    except Exception as individual_error:
                        logger.error(f"Error generating individual embedding: {individual_error}")
                        # Zero vector keeps positions aligned (model emits 1536 dims).
                        embeddings.append([0.0] * 1536)
        return embeddings
    except Exception as e:
        logger.error(f"Error generating batch embeddings: {e}")
        raise Exception(f"Error generating batch embeddings: {str(e)}")
def extract_visual_elements_from_text(document_text: str) -> dict:
    """
    Extract Visual Elements sections from document text with page context.

    Splits the text on "--- PAGE N ---" markers, harvests figures per page,
    treats any preamble before the first marker as page 1, then sweeps the
    whole document once more for anything missed.

    Args:
        document_text: Full document text content

    Returns:
        Dict mapping figure identifiers to dict with: description, page_number, figure_number
        Format: {"figure_5": {"description": "...", "page_number": 2, "figure_number": 5}}
    """
    try:
        found: dict = {}
        if not document_text:
            return found
        # re.split with a capture group yields: [preamble, num, body, num, body, ...]
        parts = re.split(r'---\s*PAGE\s*(\d+)\s*---', document_text)
        page_no = None
        for idx, part in enumerate(parts):
            if idx % 2 == 1:
                # Odd slots hold the captured page numbers.
                page_no = int(part)
            elif idx > 0 and page_no is not None:
                # Even slots (past the preamble) hold page bodies.
                for key, data in _extract_figures_from_page_content(part, page_no).items():
                    found.setdefault(key, data)  # keep the first extraction
        # Preamble before the first marker is attributed to page 1.
        if parts:
            for key, data in _extract_figures_from_page_content(parts[0], 1).items():
                found.setdefault(key, data)
        # Final document-wide sweep for figures the page passes missed.
        _extract_figures_from_full_document(document_text, found)
        logger.info(f"Total visual elements extracted: {len(found)}")
        return found
    except Exception as e:
        logger.error(f"Error extracting visual elements from text: {e}")
        return {}
def _extract_figures_from_page_content(page_content: str, page_number: int) -> dict:
    """
    Extract figures from a single page's content.

    Works in two passes: first a cascade of patterns against "Visual Elements"
    sections (or the whole page when no such section exists), then a looser
    pass for direct "Figure N:" / "Fig. N" references anywhere on the page.
    Pattern order matters: the first pattern to capture a figure number wins;
    later matches for the same number never overwrite it.

    Args:
        page_content: Text content of a single page
        page_number: Page number

    Returns:
        Dict mapping figure keys (e.g. "figure_5") to figure data dicts with
        keys: description, page_number, figure_number, title.
    """
    figures = {}
    # Pattern 1: Find "Visual Elements" sections
    visual_sections_patterns = [
        r'\*\*Visual Elements.*?\*\*:(.*?)(?=\n--- PAGE|\n\*\*[A-Z]|$)',
        r'Visual Elements[:\-](.*?)(?=\n--- PAGE|\n\*\*[A-Z]|$)',
        r'\*\*Visual.*?Elements.*?\*\*:(.*?)(?=\n--- PAGE|\n\*\*[A-Z]|$)'
    ]
    visual_sections = []
    for pattern in visual_sections_patterns:
        matches = re.findall(pattern, page_content, re.DOTALL | re.IGNORECASE)
        visual_sections.extend(matches)
    # If no visual sections, search entire page content
    if not visual_sections:
        visual_sections = [page_content]
    for section in visual_sections:
        # Enhanced patterns to extract full figure descriptions, ordered from
        # most specific (markdown "**Figure N:**") to loosest ("Fig.N ...").
        figure_patterns = [
            r'\*\*Figure\s*(\d+):\s*([^\*]+?)(?=\*\*Figure|\*\*Table|\*\*Chart|\*\*|$)',
            r'図\s*(\d+)[:\-]\s*([^\*\n]+?)(?=図|\*\*|$)',
            r'Figure\s*(\d+)\s*[:\-]\s*([^\*\n]+?)(?=Figure|\*\*|$)',
            r'\*\*Figure\s*(\d+)[:\-]\s*\*\*([^\*]+?)(?=\*\*Figure|\*\*Table|\*\*Chart|\*\*|$)',
            # Pattern for numbered list format: "1. **Figure 5:** ..."
            r'(?:^|\n)\s*\d+\.\s*\*\*Figure\s*(\d+)[:\-]\s*\*\*([^\*]+?)(?=\*\*Figure|\*\*Table|\*\*Chart|\*\*|$)',
            # Pattern for bullet points: "* **Figure 5:** ..."
            r'(?:^|\n)\s*\*\s*\*\*Figure\s*(\d+)[:\-]\s*\*\*([^\*]+?)(?=\*\*Figure|\*\*Table|\*\*Chart|\*\*|$)',
            # Pattern for abbreviated format: "Fig. 1", "Fig. 2", etc. (with markdown)
            r'\*\*Fig\.\s*(\d+)[:\-]\s*\*\*([^\*]+?)(?=\*\*Fig|\*\*Figure|\*\*Table|\*\*Chart|\*\*|$)',
            r'\*\*Fig\s*(\d+)[:\-]\s*\*\*([^\*]+?)(?=\*\*Fig|\*\*Figure|\*\*Table|\*\*Chart|\*\*|$)',
            # Pattern for abbreviated format: "Fig. 1:", "Fig. 2 -", "Fig.1 Title", "Fig. 5 Title" etc. (without markdown)
            r'Fig\.\s*(\d+)\s*[:\-]?\s*([^\n\r]+(?:\n[^\n\r]*?){0,15}?)(?=\n\s*(?:Fig|Figure|\*\*|$))',
            r'Fig\s*(\d+)\s*[:\-]?\s*([^\n\r]+(?:\n[^\n\r]*?){0,15}?)(?=\n\s*(?:Fig|Figure|\*\*|$))',
            # Pattern for "Fig.1" (no space after period, no colon)
            r'Fig\.(\d+)\s+([^\n\r]+(?:\n[^\n\r]*?){0,15}?)(?=\n\s*(?:Fig|Figure|\*\*|$))'
        ]
        for pattern in figure_patterns:
            matches = re.findall(pattern, section, re.DOTALL | re.IGNORECASE | re.MULTILINE)
            for fig_num, description in matches:
                fig_num_int = int(fig_num)
                fig_key = f"figure_{fig_num_int}"
                # Collapse blank lines and repeated spaces in the capture.
                clean_description = re.sub(r'\n\s*\n+', '\n', description.strip())
                clean_description = re.sub(r' +', ' ', clean_description)
                if fig_key not in figures:
                    figures[fig_key] = {
                        "description": clean_description,
                        "page_number": page_number,
                        "figure_number": fig_num_int,
                        "title": ""
                    }
                    logger.info(f"Extracted figure {fig_num_int} from page {page_number}")
    # Also look for direct figure references in page text
    direct_figure_patterns = [
        r'(Figure\s*\d+)[:\-]\s*([^\n\r]+(?:\n[^\n\r]*?){0,10}?)(?=\n\s*(?:Figure|\*\*|$))',
        r'(図\s*\d+)[:\-]\s*([^\n\r]+(?:\n[^\n\r]*?){0,10}?)(?=\n\s*(?:図|\*\*|$))',
        # Pattern for abbreviated format: "Fig. 1", "Fig. 2", "Fig.1 Title", "Fig. 5 Title" etc.
        r'(Fig\.\s*\d+)[:\-]?\s*([^\n\r]+(?:\n[^\n\r]*?){0,15}?)(?=\n\s*(?:Fig|Figure|\*\*|$))',
        r'(Fig\s*\d+)[:\-]?\s*([^\n\r]+(?:\n[^\n\r]*?){0,15}?)(?=\n\s*(?:Fig|Figure|\*\*|$))',
        # Pattern for "Fig.1 Title" (no space after period, no colon)
        r'(Fig\.\d+)\s+([^\n\r]+(?:\n[^\n\r]*?){0,15}?)(?=\n\s*(?:Fig|Figure|\*\*|$))'
    ]
    for pattern in direct_figure_patterns:
        matches = re.findall(pattern, page_content, re.MULTILINE | re.IGNORECASE)
        for figure_ref, description in matches:
            # The whole reference (e.g. "Fig. 3") is captured; pull the number out.
            fig_num_match = re.search(r'(\d+)', figure_ref)
            if fig_num_match:
                fig_num = int(fig_num_match.group(1))
                fig_key = f"figure_{fig_num}"
                if fig_key not in figures:  # Don't overwrite detailed descriptions
                    clean_description = re.sub(r'\n\s*\n+', '\n', description.strip())
                    clean_description = re.sub(r' +', ' ', clean_description)
                    figures[fig_key] = {
                        "description": f"{figure_ref}: {clean_description}",
                        "page_number": page_number,
                        "figure_number": fig_num,
                        "title": ""
                    }
                    logger.info(f"Extracted direct figure reference {fig_num} from page {page_number}")
    return figures
| def _extract_figures_from_full_document(document_text: str, visual_elements: dict): | |
| """ | |
| Fallback: Extract any missed figures from entire document without page context. | |
| Args: | |
| document_text: Full document text | |
| visual_elements: Existing visual elements dict to update | |
| """ | |
| # Look for any figure references we might have missed | |
| figure_ref_pattern = r'(?:^|\n)\s*(?:Figure|Fig\.|Fig|図)\s*(\d+)[:\-]' | |
| matches = re.finditer(figure_ref_pattern, document_text, re.MULTILINE | re.IGNORECASE) | |
| for match in matches: | |
| fig_num = int(match.group(1)) | |
| fig_key = f"figure_{fig_num}" | |
| # Only add if not already found | |
| if fig_key not in visual_elements: | |
| # Try to extract description after the figure reference | |
| start_pos = match.end() | |
| # Look for next figure or end of section | |
| end_match = re.search(r'(?:Figure|図)\s*\d+[:\-]', document_text[start_pos:], re.IGNORECASE) | |
| if end_match: | |
| description = document_text[start_pos:start_pos + end_match.start()].strip() | |
| else: | |
| # Take next 500 chars | |
| description = document_text[start_pos:start_pos + 500].strip() | |
| if description: | |
| clean_description = re.sub(r'\n\s*\n+', '\n', description) | |
| clean_description = re.sub(r' +', ' ', clean_description) | |
| visual_elements[fig_key] = { | |
| "description": clean_description[:200], # Limit length | |
| "page_number": None, # Unknown | |
| "figure_number": fig_num, | |
| "title": "" | |
| } | |
| logger.info(f"Extracted fallback figure reference: {fig_key}") | |
def match_image_to_figure(image_id: str, visual_elements: dict, used_figures: set | None = None) -> tuple:
    """
    Match image ID to figure description from visual elements using page-based matching.
    Enhanced with conflict resolution to prevent duplicate mappings.

    Strategies, tried in order:
      1. Exact page match — figures whose recorded page equals the image's
         page, picked by image index (image1 -> first figure on page, ...).
      2. Reuse — if every figure on the image's page is already claimed,
         reuse the last figure on that page (handles one figure shared by
         several images on the same page).
      3. Proximity fallback — only for figures with an unknown page number,
         matched by closeness of figure number to page + image index.

    Args:
        image_id: Image identifier (e.g., "page2_image1")
        visual_elements: Dict of extracted visual elements (with page_number in each entry)
        used_figures: Set of figure numbers already matched (for conflict
            resolution); mutated in place when a match is claimed so callers
            can share it across a sequence of images.

    Returns:
        Tuple of (figure_number, figure_description) or (None, "") if no match
    """
    try:
        if used_figures is None:
            used_figures = set()
        # Extract page number from image_id
        page_match = re.search(r'page(\d+)', image_id)
        if not page_match:
            return (None, "")
        image_page_num = int(page_match.group(1))
        # Extract image index from image_id (defaults to 1 when absent)
        img_match = re.search(r'image(\d+)', image_id)
        img_index = int(img_match.group(1)) if img_match else 1
        # Strategy 1: Match by actual page number (most accurate)
        page_matches = []
        all_figures_on_page = []  # Track all figures on this page (used or not)
        for fig_key, fig_data in visual_elements.items():
            # Handle both old format (string) and new format (dict)
            if isinstance(fig_data, dict):
                fig_page_num = fig_data.get("page_number")
                fig_num = fig_data.get("figure_number")
                description = fig_data.get("description", "")
            else:
                # Old format - try to extract from string
                fig_num_match = re.search(r'figure_(\d+)', fig_key)
                if not fig_num_match:
                    continue
                fig_num = int(fig_num_match.group(1))
                description = fig_data if isinstance(fig_data, str) else ""
                fig_page_num = None  # Unknown for old format
            # Exact page match is best
            if fig_page_num is not None and fig_page_num == image_page_num:
                # Track all figures on this page
                all_figures_on_page.append((fig_num, fig_key, description))
                # Skip if already used (conflict resolution) - but we'll check later if we can reuse
                if fig_num not in used_figures:
                    page_matches.append((fig_num, fig_key, description, img_index))
        # If we have page matches, use image index to select the right one
        if page_matches:
            # Sort by figure number, then use image index
            page_matches.sort(key=lambda x: x[0])  # Sort by figure number
            # If multiple figures on same page, match by image index order
            if len(page_matches) >= img_index:
                best_match = page_matches[img_index - 1]  # image1 = first figure, image2 = second, etc.
            else:
                best_match = page_matches[0]  # Fallback to first match
            # Mark as used
            used_figures.add(best_match[0])
            try:
                logger.info(f"Matched {image_id} (page {image_page_num}, image {img_index}) to Figure {best_match[0]} on page {image_page_num}")
            except UnicodeEncodeError:
                logger.info(f"Matched {image_id} to figure (Unicode characters present)")
            return (best_match[0], best_match[2])
        # Solution 1: Relax conflict resolution for same-page images
        # If no unused figures on this page, but all figures on page are used,
        # allow reusing the last figure for remaining images on the same page
        if not page_matches and all_figures_on_page:
            # Check if all figures on this page are already used
            all_used = all(fig_num in used_figures for fig_num, _, _ in all_figures_on_page)
            if all_used:
                # All figures on this page are used, but we have more images on this page
                # Allow reusing the last figure (or first if only one) for remaining images
                all_figures_on_page.sort(key=lambda x: x[0])  # Sort by figure number
                # Use the last figure on the page for remaining images
                # This handles the case: 1 figure, multiple images on same page
                best_match = all_figures_on_page[-1]  # Last figure on page
                fig_num, fig_key, description = best_match
                # Don't mark as used again (it's already used)
                # But allow this match since it's on the same page
                try:
                    logger.info(f"Matched {image_id} (page {image_page_num}, image {img_index}) to Figure {fig_num} on page {image_page_num} (reused - all figures on page already used)")
                except UnicodeEncodeError:
                    logger.info(f"Matched {image_id} to figure (reused, Unicode characters present)")
                return (fig_num, description)
        # Strategy 2: Fallback - if no page match, try proximity (only if page numbers unknown)
        potential_figures = []
        for fig_key, fig_data in visual_elements.items():
            if isinstance(fig_data, dict):
                fig_num = fig_data.get("figure_number")
                description = fig_data.get("description", "")
                fig_page_num = fig_data.get("page_number")
            else:
                fig_num_match = re.search(r'figure_(\d+)', fig_key)
                if not fig_num_match:
                    continue
                fig_num = int(fig_num_match.group(1))
                description = fig_data if isinstance(fig_data, str) else ""
                fig_page_num = None
            if fig_num is None:
                continue
            # Only use fallback if page number is unknown AND not already used
            if fig_page_num is None and fig_num not in used_figures:
                distance = abs(fig_num - (image_page_num + img_index))
                if distance <= 2:  # Tighter threshold
                    potential_figures.append((fig_num, fig_key, description, distance))
        if potential_figures:
            potential_figures.sort(key=lambda x: x[3])  # Sort by distance
            best_match = potential_figures[0]
            used_figures.add(best_match[0])
            logger.warning(f"Using fallback matching for {image_id} to Figure {best_match[0]} (page number unknown)")
            return (best_match[0], best_match[2])
        return (None, "")
    except Exception as e:
        logger.error(f"Error matching image {image_id} to figure: {e}")
        return (None, "")
async def merge_visual_elements_with_ai_summary(image_id: str, ai_summary: str, document_text: str, used_figures: set | None = None) -> tuple:
    """
    Merge extracted visual elements with AI-generated image summary.
    Enhanced to handle page numbers and conflict resolution.

    Args:
        image_id: Image identifier
        ai_summary: AI-generated summary
        document_text: Full document text
        used_figures: Set of figure numbers already matched (for conflict
            resolution); mutated in place by the matcher so callers can share
            it across images.

    Returns:
        Tuple of (enhanced_summary, figure_metadata_dict)
        figure_metadata_dict contains: figure_number, figure_description, page_number, or None if no match
    """
    try:
        # Run the regex-heavy extraction in a worker thread so it does not
        # block the event loop.
        try:
            # FIX: asyncio.get_event_loop() is deprecated inside a running
            # coroutine; get_running_loop() is the supported accessor here.
            loop = asyncio.get_running_loop()
            visual_elements = await loop.run_in_executor(None, extract_visual_elements_from_text, document_text)
        except Exception as e:
            # Fallback to synchronous if executor fails
            logger.warning(f"Failed to run extract_visual_elements_from_text in executor: {e}, using sync")
            visual_elements = extract_visual_elements_from_text(document_text)
        if not visual_elements:
            logger.info(f"No visual elements found for {image_id}, using AI summary only")
            return (ai_summary, None)
        # Initialize used_figures if not provided
        if used_figures is None:
            used_figures = set()
        # Try to match image to figure description with conflict resolution
        figure_num, matched_description = match_image_to_figure(image_id, visual_elements, used_figures)
        # Get page number from visual elements if available
        page_number = None
        if figure_num is not None:
            fig_key = f"figure_{figure_num}"
            if fig_key in visual_elements:
                fig_data = visual_elements[fig_key]
                if isinstance(fig_data, dict):
                    page_number = fig_data.get("page_number")
        figure_metadata = None
        if matched_description and figure_num is not None:
            # Create enhanced summary combining both sources
            try:
                enhanced_summary = f"""**Figure {figure_num} - Document Description:**
{matched_description}
**AI Visual Analysis:**
{ai_summary}"""
                # Ensure the summary can be encoded properly
                enhanced_summary.encode('utf-8')
                # Create figure metadata
                figure_metadata = {
                    "figure_number": figure_num,
                    "figure_key": f"figure_{figure_num}",
                    "figure_description": matched_description,
                    "image_id": image_id,
                    "page_number": page_number
                }
                logger.info(f"Enhanced summary created for {image_id} -> Figure {figure_num} (page {page_number}) using document + AI")
                return (enhanced_summary, figure_metadata)
            except UnicodeEncodeError as ue:
                logger.warning(f"Unicode encoding issue for {image_id}, using AI summary only: {ue}")
                return (ai_summary, None)
        else:
            logger.info(f"No matching figure found for {image_id}, using AI summary only")
            return (ai_summary, None)
    except Exception as e:
        logger.error(f"Error merging visual elements for {image_id}: {e}")
        return (ai_summary, None)