"""
Enhanced RAG System - Visual Image Analysis

Sends base64-encoded images directly to a vision-capable chat model for visual
analysis (not just OCR), summarizes text chunks and tables, and stores the
results in a vector store. A second class turns vector-store search results
into an answer.
"""
from typing import Any, Dict, List, Optional
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage
import base64
import os
from pathlib import Path

from config import (
    OPENAI_API_KEY,
    OPENAI_MODEL,
    TEMPERATURE,
    MAX_TOKENS,
    LANGUAGE,
    CHROMA_DB_PATH
)

# Model used when the caller does not pass ``model_name``.
# NOTE(review): config.OPENAI_MODEL is imported but this module has never used
# it; confirm whether it should replace this hard-coded default.
_DEFAULT_MODEL = "gpt-4o-mini"

# File extension -> MIME type used in the data URL sent to the model.
_MEDIA_TYPES = {
    '.jpg': 'image/jpeg',
    '.jpeg': 'image/jpeg',
    '.png': 'image/png',
    '.gif': 'image/gif',
    '.webp': 'image/webp'
}
_FALLBACK_MEDIA_TYPE = 'image/png'

# Chunks whose stripped length is below this are not worth an LLM call.
_MIN_CHUNK_CHARS = 50

# Upper bound for the overlap between consecutive text chunks.
_MAX_CHUNK_OVERLAP = 300


def _print_debug(label: str, data: Any) -> None:
    """Print a labelled debug record; lists and dicts are truncated to 300 chars."""
    print(f"\n🔍 DEBUG [{label}]:")
    if isinstance(data, (list, dict)):
        print(f"   Type: {type(data).__name__}")
        print(f"   Content: {str(data)[:300]}...")
    else:
        print(f"   {data}")


def _relevance_from_distance(distance: Optional[float]) -> float:
    """
    Convert a vector-store distance into a relevance score (1 - distance).

    A missing distance (None) yields 0. A distance of exactly 0 yields 1; the
    previous truthiness check reported such perfect matches as 0.
    """
    if distance is None:
        return 0.0
    return 1 - distance


class VisualMultimodalRAG:
    """
    RAG ingestion pipeline that:
    1. Sends images as base64 data URLs to the chat model for visual analysis
    2. Summarizes text chunks and tables with the same model
    3. Stores the analyses and summaries in a caller-supplied vector store
    4. Keeps a log of every processed document
    """

    def __init__(
        self,
        api_key: Optional[str] = None,
        debug: bool = True,
        model_name: Optional[str] = None
    ):
        """
        Args:
            api_key: OpenAI API key; falls back to config.OPENAI_API_KEY.
            debug: When True, print debug records and an init message.
            model_name: Chat model to use; defaults to "gpt-4o-mini".
                NOTE(review): image analysis needs a model that accepts image
                input - confirm this for any model passed here.
        """
        api_key = api_key or OPENAI_API_KEY
        self.debug = debug
        self.model_name = model_name or _DEFAULT_MODEL

        self.llm = ChatOpenAI(
            model_name=self.model_name,
            api_key=api_key,
            temperature=TEMPERATURE,
            max_tokens=MAX_TOKENS,
        )

        self.language = LANGUAGE
        self.visual_summaries_log = []

        if self.debug:
            # Report the model actually configured (the old message always
            # said "gpt-4o" even though "gpt-4o-mini" was in use).
            print(f"✅ VisualMultimodalRAG initialized with {self.model_name} (vision model)")

    def _debug_print(self, label: str, data: Any):
        """Print debug information when debug mode is enabled."""
        if self.debug:
            _print_debug(label, data)

    def _image_to_base64(self, image_path: str) -> Optional[str]:
        """
        Read an image file and return its content as a base64 string.

        Returns None (after printing the error) if the file cannot be read.
        """
        try:
            with open(image_path, 'rb') as image_file:
                return base64.b64encode(image_file.read()).decode('utf-8')
        except OSError as e:
            print(f"Error converting image to base64: {e}")
            return None

    def analyze_image_visually(self, image_path: str, image_idx: int) -> str:
        """
        Send the actual image (as a base64 data URL) to the chat model and
        return its textual analysis.

        The prompt asks for a description of visible objects, data/text,
        purpose, key insights and connections to the document.

        Args:
            image_path: Path of the image file on disk.
            image_idx: Index used in log messages and in error placeholders.

        Returns:
            The model's analysis, or a bracketed placeholder string
            ("[Image N: ...]") when the file is missing, cannot be encoded,
            or the model call fails. This method does not raise for those
            cases.
        """
        if not os.path.exists(image_path):
            return f"[Image {image_idx}: File not found - {image_path}]"

        try:
            image_base64 = self._image_to_base64(image_path)
            if not image_base64:
                return f"[Image {image_idx}: Could not convert to base64]"

            # Unknown extensions are sent as PNG.
            file_ext = Path(image_path).suffix.lower()
            media_type = _MEDIA_TYPES.get(file_ext, _FALLBACK_MEDIA_TYPE)

            print(f"🔍 Analyzing image {image_idx} visually (as {media_type})...")

            message = HumanMessage(
                content=[
                    {
                        "type": "image_url",
                        "image_url": {
                            "url": f"data:{media_type};base64,{image_base64}",
                        },
                    },
                    {
                        "type": "text",
                        "text": f"""Analyze this image in detail in {self.language}.

Provide a comprehensive visual analysis including:
1. **What you see** - Main objects, elements, structure
2. **Data/Content** - Any numbers, text, charts, graphs
3. **Purpose** - What this image is showing or representing
4. **Key insights** - Important patterns, trends, or information
5. **Connections** - How this relates to document content

Be specific and detailed. Focus on visual information that cannot be extracted from text alone.

Analysis:"""
                    }
                ],
            )

            response = self.llm.invoke([message])
            analysis = response.content.strip()

            if self.debug:
                self._debug_print(f"Image {image_idx} Visual Analysis", analysis)

            print(f"✅ Image {image_idx} analyzed successfully")
            return analysis

        except Exception as e:
            # Boundary handler: one failing image must not abort the batch.
            error_msg = f"[Image {image_idx}: Vision analysis failed - {str(e)}]"
            print(f"❌ Error analyzing image {image_idx}: {e}")
            return error_msg

    def analyze_images_visually(self, images: List[Dict]) -> List[Dict]:
        """
        Run visual analysis on every image dict that has a 'path'.

        Args:
            images: Dicts with a 'path' key and an optional 'ocr_text' key.

        Returns:
            One dict per analyzed image with keys 'type', 'image_index'
            (position in ``images``), 'image_path', 'visual_analysis' and
            'ocr_text'. Images without a path are skipped with a warning.
        """
        visual_analyses = []

        for idx, image in enumerate(images):
            image_path = image.get('path', '')

            if not image_path:
                print(f"⚠️ Image {idx}: No path provided")
                continue

            visual_analyses.append({
                'type': 'image_visual',
                'image_index': idx,
                'image_path': image_path,
                'visual_analysis': self.analyze_image_visually(image_path, idx),
                'ocr_text': image.get('ocr_text', '')  # Keep OCR as backup
            })

        return visual_analyses

    def summarize_text_chunks(self, text: str, chunk_size: int = 1500) -> List[Dict]:
        """
        Split ``text`` into overlapping chunks and summarize each with the LLM.

        Args:
            text: Full document text.
            chunk_size: Maximum characters per chunk (must be positive).

        Returns:
            One dict per summarized chunk with keys 'type', 'chunk_index'
            (position among the summarized chunks), 'original_text' (first
            500 chars), 'summary' and 'chunk_length'. Chunks shorter than 50
            stripped characters and chunks whose LLM call fails are omitted.

        Raises:
            ValueError: If ``chunk_size`` is not positive.
        """
        chunks = []
        # 300 chars at the default size; scaled down for small chunk sizes so
        # the overlap can never reach chunk_size (which used to loop forever).
        overlap = min(_MAX_CHUNK_OVERLAP, chunk_size // 5)
        text_chunks = self._chunk_text(text, chunk_size=chunk_size, overlap=overlap)

        self._debug_print("Text Chunking", f"Created {len(text_chunks)} chunks")

        for chunk in text_chunks:
            if len(chunk.strip()) < _MIN_CHUNK_CHARS:
                continue

            try:
                prompt = f"""Summarize this text chunk in {self.language}.
Keep it concise. Extract key points, facts, and main ideas.

Text Chunk:
{chunk}

Summary (2-3 sentences maximum):"""

                message = HumanMessage(content=prompt)
                response = self.llm.invoke([message])
                summary = response.content.strip()

                chunks.append({
                    'type': 'text_chunk',
                    'chunk_index': len(chunks),
                    'original_text': chunk[:500],
                    'summary': summary,
                    'chunk_length': len(chunk)
                })

                if self.debug:
                    self._debug_print(f"Text Chunk {len(chunks)-1} Summary", summary)

            except Exception as e:
                # Best effort: skip this chunk and continue with the rest.
                print(f"Error summarizing text chunk: {e}")

        return chunks

    def summarize_tables(self, tables: List[Dict]) -> List[Dict]:
        """
        Summarize each table individually with the LLM.

        Args:
            tables: Dicts whose 'content' key holds the table as text.

        Returns:
            One dict per summarized table with keys 'type', 'table_index'
            (position in ``tables``), 'original_content' (first 500 chars),
            'summary' and 'table_length'. Tables with fewer than 10 stripped
            characters and tables whose LLM call fails are omitted.
        """
        summaries = []

        for idx, table in enumerate(tables):
            table_content = table.get('content', '')

            if not table_content or len(table_content.strip()) < 10:
                continue

            try:
                prompt = f"""Analyze and summarize this table/structured data in {self.language}.
Extract key insights, row/column meanings, and important figures.

Table Content:
{table_content}

Summary (2-3 sentences maximum):"""

                message = HumanMessage(content=prompt)
                response = self.llm.invoke([message])
                summary = response.content.strip()

                summaries.append({
                    'type': 'table',
                    'table_index': idx,
                    'original_content': table_content[:500],
                    'summary': summary,
                    'table_length': len(table_content)
                })

                if self.debug:
                    self._debug_print(f"Table {idx} Summary", summary)

            except Exception as e:
                # Best effort: skip this table and continue with the rest.
                print(f"Error summarizing table {idx}: {e}")

        return summaries

    def _store_joined(
        self,
        vector_store,
        entries: List[str],
        store_id: str,
        stored_label: str,
        error_label: str
    ) -> int:
        """
        Store ``entries`` as one ' | '-joined text document under ``store_id``.

        Returns the number of entries stored: 0 when ``entries`` is empty or
        when ``vector_store.add_documents`` raises (the error is printed).
        """
        if not entries:
            return 0

        payload = {
            'text': ' | '.join(entries),
            'images': [],
            'tables': []
        }
        try:
            vector_store.add_documents(payload, store_id)
        except Exception as e:
            print(f"❌ Error storing {error_label}: {e}")
            return 0

        print(f"✅ Stored {len(entries)} {stored_label}")
        return len(entries)

    def process_and_store_document(
        self,
        text: str,
        images: List[Dict],
        tables: List[Dict],
        vector_store,
        doc_id: str
    ) -> Dict:
        """
        Analyze images, text and tables of one document and store the results.

        Each category is stored as a single joined document via
        ``vector_store.add_documents(payload, id)`` with the ids
        ``{doc_id}_images_visual``, ``{doc_id}_text_chunks`` and
        ``{doc_id}_tables``. Storage errors are printed, not raised.

        Args:
            text: Full document text.
            images: Image dicts (see ``analyze_images_visually``).
            tables: Table dicts (see ``summarize_tables``).
            vector_store: Object exposing ``add_documents(payload, id)``.
            doc_id: Identifier used to build the stored document ids.

        Returns:
            Dict with 'doc_id', 'image_visual_analyses', 'text_summaries',
            'table_summaries' and 'total_stored' (items stored successfully).
            The same dict is appended to ``visual_summaries_log``.
        """
        banner = '=' * 70
        rule = '─' * 70

        print(f"\n{banner}")
        print(f"PROCESSING WITH VISUAL IMAGE ANALYSIS: {doc_id}")
        print(banner)

        results = {
            'doc_id': doc_id,
            'image_visual_analyses': [],
            'text_summaries': [],
            'table_summaries': [],
            'total_stored': 0
        }

        # 1. Analyze images visually and store the analyses
        print(f"\n🖼️ VISUAL IMAGE ANALYSIS ({self.model_name} vision) ({len(images)} total)")
        print(rule)

        image_analyses = self.analyze_images_visually(images)
        results['image_visual_analyses'] = image_analyses

        for analysis in image_analyses:
            print(f"  ✅ Image {analysis['image_index']} (visual analysis)")
            print(f"     Path: {analysis['image_path']}")
            print(f"     Analysis: {analysis['visual_analysis'][:100]}...")

        results['total_stored'] += self._store_joined(
            vector_store,
            [f"Image {a['image_index']}: {a['visual_analysis']}" for a in image_analyses],
            f"{doc_id}_images_visual",
            "image visual analyses",
            "image analyses"
        )

        # 2. Summarize and store text chunks
        print("\n📝 TEXT CHUNK SUMMARIZATION")
        print(rule)

        text_summaries = self.summarize_text_chunks(text)
        results['text_summaries'] = text_summaries

        for summary in text_summaries:
            print(f"  ✅ Chunk {summary['chunk_index']}: {summary['summary'][:50]}...")

        results['total_stored'] += self._store_joined(
            vector_store,
            [f"Chunk {s['chunk_index']}: {s['summary']}" for s in text_summaries],
            f"{doc_id}_text_chunks",
            "text chunk summaries",
            "text summaries"
        )

        # 3. Summarize and store tables
        print(f"\n📋 TABLE SUMMARIZATION ({len(tables)} total)")
        print(rule)

        table_summaries = self.summarize_tables(tables)
        results['table_summaries'] = table_summaries

        for summary in table_summaries:
            print(f"  ✅ Table {summary['table_index']}: {summary['summary'][:50]}...")

        results['total_stored'] += self._store_joined(
            vector_store,
            [f"Table {s['table_index']}: {s['summary']}" for s in table_summaries],
            f"{doc_id}_tables",
            "table summaries",
            "table summaries"
        )

        # 4. Summary statistics
        print(f"\n{banner}")
        print("📊 STORAGE SUMMARY")
        print(banner)
        print(f"  Images analyzed visually & stored: {len(image_analyses)}")
        print(f"  Text chunks summarized & stored: {len(text_summaries)}")
        print(f"  Tables summarized & stored: {len(table_summaries)}")
        print(f"  Total items stored in vector: {results['total_stored']}")
        print(banner)

        self.visual_summaries_log.append(results)
        return results

    def _chunk_text(self, text: str, chunk_size: int = 1500, overlap: int = 300) -> List[str]:
        """
        Split ``text`` into chunks of at most ``chunk_size`` characters, each
        starting ``chunk_size - overlap`` characters after the previous one.

        Chunking stops as soon as a chunk reaches the end of the text, so no
        trailing chunk that is fully contained in the previous one is emitted.

        Raises:
            ValueError: If ``chunk_size`` is not positive, or ``overlap`` is
                negative or not smaller than ``chunk_size`` (the start
                position would never advance).
        """
        if chunk_size <= 0:
            raise ValueError("chunk_size must be positive")
        if overlap < 0 or overlap >= chunk_size:
            raise ValueError("overlap must satisfy 0 <= overlap < chunk_size")

        chunks = []
        step = chunk_size - overlap
        start = 0
        while start < len(text):
            end = start + chunk_size
            chunks.append(text[start:end])
            if end >= len(text):
                break
            start += step
        return chunks

    def get_visual_summaries_log(self) -> List[Dict]:
        """Return the list of result dicts from every processed document."""
        return self.visual_summaries_log


class AnsweringRAG:
    """
    RAG answering stage that:
    1. Receives vector-store search results from the caller
    2. Builds a context prompt from them
    3. Asks the chat model for an answer and logs the outcome
    """

    def __init__(
        self,
        api_key: Optional[str] = None,
        debug: bool = True,
        model_name: Optional[str] = None
    ):
        """
        Args:
            api_key: OpenAI API key; falls back to config.OPENAI_API_KEY.
            debug: When True, print debug records and an init message.
            model_name: Chat model to use; defaults to "gpt-4o-mini".
        """
        api_key = api_key or OPENAI_API_KEY
        self.debug = debug
        self.model_name = model_name or _DEFAULT_MODEL

        self.llm = ChatOpenAI(
            model_name=self.model_name,
            api_key=api_key,
            temperature=TEMPERATURE,
            max_tokens=MAX_TOKENS,
        )

        self.language = LANGUAGE
        self.answer_log = []

        if self.debug:
            print("✅ AnsweringRAG initialized with answer generation")

    def _debug_print(self, label: str, data: Any):
        """Print debug information when debug mode is enabled."""
        if self.debug:
            _print_debug(label, data)

    def analyze_and_answer(
        self,
        question: str,
        search_results: List[Dict]
    ) -> Dict:
        """
        Build a context from ``search_results`` and ask the LLM to answer.

        Args:
            question: The user's question.
            search_results: Dicts read via the keys 'content', 'type' and
                'distance' (each optional).

        Returns:
            {
                'question': user question,
                'answer': generated answer or a fallback message,
                'sources_used': number of search results,
                'confidence': 'low' / 'medium' / 'high',
                'search_results': the search results passed in,
                'error': error text (only when the LLM call failed)
            }
            The same dict is appended to ``answer_log``.
        """
        print(f"\n{'='*70}")
        print("ANALYZING QUESTION & GENERATING ANSWER")
        print(f"{'='*70}")

        print(f"\n❓ Question: {question}")
        print(f"📊 Search Results Found: {len(search_results)}")

        if not search_results:
            print("⚠️ No search results found!")
            answer = f"""I could not find relevant information in the document to answer your question: "{question}"

Try:
- Using different keywords
- Breaking the question into smaller parts
- Asking about other topics in the document"""

            result = {
                'question': question,
                'answer': answer,
                'sources_used': 0,
                'confidence': 'low',
                'search_results': []
            }
            self.answer_log.append(result)
            return result

        # Build context from search results
        context_parts = []
        for idx, result in enumerate(search_results, 1):
            content = result.get('content', '')
            content_type = result.get('type', 'unknown')
            relevance = _relevance_from_distance(result.get('distance'))

            context_parts.append(f"""
[Source {idx} - {content_type.upper()} (relevance: {relevance:.1%})]
{content}""")

        full_context = "\n".join(context_parts)

        self._debug_print("Context Prepared", f"{len(context_parts)} sources, {len(full_context)} chars")

        analysis_prompt = f"""You are a helpful assistant analyzing document content to answer user questions.

USER QUESTION:
"{question}"

RELEVANT CONTENT FROM DOCUMENT:
{full_context}

INSTRUCTIONS:
1. Analyze the provided content carefully
2. Extract information relevant to the question
3. Synthesize a clear, comprehensive answer in {self.language}
4. If the content doesn't fully answer the question, explain what information is available
5. Be specific and cite the content when relevant
6. Structure your answer clearly with key points

ANSWER:"""

        print("\n🔍 Analyzing search results...")
        print(f"   Context size: {len(full_context)} characters")
        print(f"   Sources: {len(search_results)}")

        try:
            message = HumanMessage(content=analysis_prompt)
            response = self.llm.invoke([message])
            answer = response.content.strip()

            confidence = self._estimate_confidence(len(search_results), answer)

            print("✅ Answer generated successfully")
            print(f"   Confidence: {confidence}")
            print(f"   Answer length: {len(answer)} characters")

            result = {
                'question': question,
                'answer': answer,
                'sources_used': len(search_results),
                'confidence': confidence,
                'search_results': search_results
            }
            self.answer_log.append(result)
            return result

        except Exception as e:
            # Boundary handler: report the failure in the result, do not raise.
            print(f"❌ Error generating answer: {e}")
            answer = "I encountered an error while analyzing the search results. Please try again."

            result = {
                'question': question,
                'answer': answer,
                'sources_used': len(search_results),
                'confidence': 'low',
                'error': str(e),
                'search_results': search_results
            }
            self.answer_log.append(result)
            return result

    def _estimate_confidence(self, sources_count: int, answer: str) -> str:
        """
        Heuristic confidence from source count and answer length.

        Returns "high" for >= 3 sources and > 500 chars, "medium" for
        >= 2 sources and > 200 chars, otherwise "low".
        """
        answer_length = len(answer)

        if sources_count >= 3 and answer_length > 500:
            return "high"
        if sources_count >= 2 and answer_length > 200:
            return "medium"
        return "low"

    def get_answer_with_sources(
        self,
        question: str,
        search_results: List[Dict]
    ) -> Dict:
        """
        Answer the question and attach display-ready source entries.

        Returns the dict from ``analyze_and_answer`` with an added
        'formatted_sources' list; each entry has 'index' (1-based), 'type',
        'content' and 'relevance' (1 - distance, or 0 when no distance).
        """
        result = self.analyze_and_answer(question, search_results)

        result['formatted_sources'] = [
            {
                'index': idx,
                'type': source.get('type', 'unknown'),
                'content': source.get('content', ''),
                'relevance': _relevance_from_distance(source.get('distance'))
            }
            for idx, source in enumerate(result['search_results'], 1)
        ]
        return result

    def get_answer_log(self) -> List[Dict]:
        """Return the list of result dicts from every answered question."""
        return self.answer_log

    def print_answer_with_sources(self, result: Dict, max_source_length: int = 300):
        """
        Pretty-print an answer and, if present, its formatted sources.

        Args:
            result: Dict as returned by ``get_answer_with_sources`` (or
                ``analyze_and_answer``; sources are then not printed).
            max_source_length: Number of characters of each source to show.
        """
        print(f"\n{'='*70}")
        print(f"ANSWER TO: {result['question']}")
        print(f"{'='*70}")

        print(f"\n📝 ANSWER (Confidence: {result['confidence'].upper()}):")
        print(f"{'-'*70}")
        print(result['answer'])
        print(f"{'-'*70}")

        if result.get('formatted_sources'):
            print(f"\n📚 SOURCES USED ({len(result['formatted_sources'])} total):")
            for source in result['formatted_sources']:
                print(f"\n[Source {source['index']} - {source['type'].upper()} ({source['relevance']:.0%} relevant)]")
                print(f"{source['content'][:max_source_length]}...")

        print(f"\n{'='*70}")