# NOTE(review): this file was received with its line structure collapsed.
# Prompt and log strings below keep exactly the characters and spacing that
# were received (including what look like mis-encoded emoji). Confirm the
# intended line breaks and glyphs against the upstream copy before editing them.
from typing import List, Dict, Optional
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage
import base64
import os
from pathlib import Path
from config import (
    OPENAI_API_KEY,
    OPENAI_MODEL,
    TEMPERATURE,
    MAX_TOKENS,
    LANGUAGE,
    CHROMA_DB_PATH
)


def _print_debug(label: str, data: object) -> None:
    """Print a labelled debug dump of *data*.

    Lists and dicts are shown as their type name plus the first 300
    characters of ``str(data)``; anything else is printed as-is.
    """
    print(f"\nšŸ” DEBUG [{label}]:")
    if isinstance(data, (list, dict)):
        print(f" Type: {type(data).__name__}")
        print(f" Content: {str(data)[:300]}...")
    else:
        print(f" {data}")


def _relevance_from_distance(distance):
    """Turn a search-result distance into a relevance value.

    Returns ``1 - distance``. A missing distance (``None``) maps to 0.
    The check is ``is None`` rather than truthiness so that a distance of
    exactly 0 yields a relevance of 1 instead of being treated as missing.
    """
    if distance is None:
        return 0
    return 1 - distance


class VisualMultimodalRAG:
    """Summarise a document's images, text and tables with an LLM and store them.

    Images are sent to the chat model as base64 data URLs, text is split into
    overlapping chunks and summarised, tables are summarised one by one, and
    the joined summaries are handed to a caller-supplied vector store.
    """

    # File suffix -> MIME type used in the data URL; unknown suffixes fall
    # back to 'image/png' at the lookup site.
    _MEDIA_TYPES = {
        '.jpg': 'image/jpeg',
        '.jpeg': 'image/jpeg',
        '.png': 'image/png',
        '.gif': 'image/gif',
        '.webp': 'image/webp',
    }

    def __init__(self, api_key: Optional[str] = None, debug: bool = True):
        """Create the chat client.

        Args:
            api_key: API key; falls back to ``OPENAI_API_KEY`` from config
                when falsy.
            debug: When true, debug output is printed.
        """
        api_key = api_key or OPENAI_API_KEY
        self.debug = debug
        # NOTE(review): OPENAI_MODEL is imported from config but the model is
        # pinned here -- confirm which one is intended.
        self.llm = ChatOpenAI(
            model_name="gpt-4o-mini",
            api_key=api_key,
            temperature=TEMPERATURE,
            max_tokens=MAX_TOKENS,
        )
        self.language = LANGUAGE
        self.visual_summaries_log = []
        if self.debug:
            print("āœ… VisualMultimodalRAG initialized")

    def _debug_print(self, label: str, data: object):
        """Print a labelled debug dump of *data* when ``self.debug`` is set."""
        if self.debug:
            _print_debug(label, data)

    def _image_to_base64(self, image_path: str) -> Optional[str]:
        """Read *image_path* and return its contents base64-encoded as text.

        Returns:
            The base64 string, or ``None`` if the file could not be read
            (the error is printed).
        """
        try:
            with open(image_path, 'rb') as image_file:
                return base64.b64encode(image_file.read()).decode('utf-8')
        except (OSError, TypeError, ValueError) as e:
            print(f"Error converting image to base64: {e}")
            return None

    def analyze_image_visually(self, image_path: str, image_idx: int) -> str:
        """Ask the chat model to describe one image.

        Args:
            image_path: Path of the image file.
            image_idx: Index used in log lines and in placeholder results.

        Returns:
            The model's analysis text, or a bracketed placeholder string when
            the file is missing, cannot be encoded, or the model call fails.
        """
        if not os.path.exists(image_path):
            return f"[Image {image_idx}: File not found - {image_path}]"

        try:
            image_base64 = self._image_to_base64(image_path)
            if not image_base64:
                return f"[Image {image_idx}: Could not convert to base64]"

            file_ext = Path(image_path).suffix.lower()
            media_type = self._MEDIA_TYPES.get(file_ext, 'image/png')

            print(f"šŸ” Analyzing image {image_idx} visually (as {media_type})...")

            prompt_text = (
                "You are assistant for analyzing and aggregating information. "
                "Analyze this image. "
                "Provide a visual analysis that includes: "
                "1. Main objects and element "
                "2. Data/Content - Any numbers, text, charts, graphs "
                "3. What this image is showing or representing "
                "4. Important patterns, trends, or information "
                "5. How image relates to document content "
                "Be brief and meaningful. "
                "Focus on visual information that cannot be extracted from text. "
                f"Response on {self.language}. "
                "Analysis:"
            )

            # Create message with image
            message = HumanMessage(
                content=[
                    {
                        "type": "image_url",
                        "image_url": {
                            "url": f"data:{media_type};base64,{image_base64}",
                        },
                    },
                    {
                        "type": "text",
                        "text": prompt_text,
                    },
                ],
            )

            response = self.llm.invoke([message])
            analysis = response.content.strip()

            # _debug_print is itself a no-op unless self.debug is set.
            self._debug_print(f"Image {image_idx} Visual Analysis", analysis)

            print(f"āœ… Image {image_idx} analyzed successfully")
            return analysis
        except Exception as e:
            # Best effort: one failing image must not abort the whole batch,
            # so the failure is returned as the analysis text.
            error_msg = f"[Image {image_idx}: Vision analysis failed - {str(e)}]"
            print(f"āŒ Error analyzing image {image_idx}: {e}")
            return error_msg

    def analyze_images_visually(self, images: List[Dict]) -> List[Dict]:
        """Run :meth:`analyze_image_visually` over every image that has a path.

        Args:
            images: Dicts read via ``'path'`` and, optionally, ``'ocr_text'``.

        Returns:
            One dict per analysed image with keys ``type``, ``image_index``,
            ``image_path``, ``visual_analysis`` and ``ocr_text``. Entries
            without a path are skipped (a warning is printed).
        """
        visual_analyses = []
        for idx, image in enumerate(images):
            image_path = image.get('path', '')
            if not image_path:
                print(f"āš ļø Image {idx}: No path provided")
                continue

            visual_analysis = self.analyze_image_visually(image_path, idx)
            visual_analyses.append({
                'type': 'image_visual',
                'image_index': idx,
                'image_path': image_path,
                'visual_analysis': visual_analysis,
                'ocr_text': image.get('ocr_text', '')  # Keep OCR as backup
            })
        return visual_analyses

    def summarize_text_chunks(self, text: str, chunk_size: int = 1500) -> List[Dict]:
        """Split *text* into overlapping chunks and summarise each one.

        Chunks whose stripped length is under 50 characters are skipped. A
        chunk whose model call fails is reported and skipped.

        Args:
            text: Text to summarise; ``None`` is treated as empty.
            chunk_size: Maximum characters per chunk.

        Returns:
            One dict per summarised chunk with keys ``type``, ``chunk_index``,
            ``original_text`` (first 500 characters), ``summary`` and
            ``chunk_length``.
        """
        chunks = []
        # The overlap is 300 characters, clamped below chunk_size so that a
        # small chunk_size cannot stall the chunker.
        text_chunks = self._chunk_text(
            text or "",
            chunk_size=chunk_size,
            overlap=min(300, chunk_size - 1),
        )
        self._debug_print("Text Chunking", f"Created {len(text_chunks)} chunks")

        for chunk in text_chunks:
            if len(chunk.strip()) < 50:
                continue

            try:
                prompt = (
                    f"Summarize this text chunk in {self.language}. "
                    "Be brief and meaningful. "
                    "Extract key points, facts, and main ideas. "
                    f"Text Chunk: {chunk} "
                    "Summary:"
                )
                message = HumanMessage(content=prompt)
                response = self.llm.invoke([message])
                summary = response.content.strip()

                chunks.append({
                    'type': 'text_chunk',
                    # Index among summarised chunks, not among all chunks.
                    'chunk_index': len(chunks),
                    'original_text': chunk[:500],
                    'summary': summary,
                    'chunk_length': len(chunk)
                })

                self._debug_print(f"Text Chunk {len(chunks)-1} Summary", summary)
            except Exception as e:
                print(f"Error summarizing text chunk: {e}")

        return chunks

    def summarize_tables(self, tables: List[Dict]) -> List[Dict]:
        """Summarise each table's ``'content'`` with the chat model.

        Tables with missing content, or whose stripped content is under 10
        characters, are skipped. A table whose model call fails is reported
        and skipped.

        Returns:
            One dict per summarised table with keys ``type``, ``table_index``,
            ``original_content`` (first 500 characters), ``summary`` and
            ``table_length``.
        """
        summaries = []
        for idx, table in enumerate(tables):
            table_content = table.get('content', '')
            if not table_content or len(table_content.strip()) < 10:
                continue

            try:
                prompt = (
                    "Analyze and summarize this table/structured data in "
                    f"{self.language}. "
                    "Extract key insights, row/column meanings, and important figures. "
                    "Be brief and meaningful. "
                    f"Table Content: {table_content} "
                    "Summary:"
                )
                message = HumanMessage(content=prompt)
                response = self.llm.invoke([message])
                summary = response.content.strip()

                summaries.append({
                    'type': 'table',
                    'table_index': idx,
                    'original_content': table_content[:500],
                    'summary': summary,
                    'table_length': len(table_content)
                })

                self._debug_print(f"Table {idx} Summary", summary)
            except Exception as e:
                print(f"Error summarizing table {idx}: {e}")

        return summaries

    def _store_section(
        self,
        vector_store,
        joined_text: str,
        store_id: str,
        count: int,
        stored_label: str,
        error_label: str
    ) -> int:
        """Store one section's joined summaries; return how many items were stored.

        Nothing is stored (and 0 is returned) when *count* is 0 or when
        ``vector_store.add_documents`` raises; the error is printed.
        """
        if not count:
            return 0

        payload = {'text': joined_text, 'images': [], 'tables': []}
        try:
            vector_store.add_documents(payload, store_id)
        except Exception as e:
            print(f"āŒ Error storing {error_label}: {e}")
            return 0

        print(f"āœ… Stored {count} {stored_label}")
        return count

    def process_and_store_document(
        self,
        text: str,
        images: List[Dict],
        tables: List[Dict],
        vector_store,
        doc_id: str
    ) -> Dict:
        """Analyse images, text and tables of one document and store the results.

        Args:
            text: Document text; ``None`` is treated as empty.
            images: Image dicts (see :meth:`analyze_images_visually`);
                ``None`` is treated as empty.
            tables: Table dicts (see :meth:`summarize_tables`); ``None`` is
                treated as empty.
            vector_store: Object exposing ``add_documents(doc_dict, doc_id)``.
            doc_id: Identifier; the three sections are stored under
                ``{doc_id}_images_visual``, ``{doc_id}_text_chunks`` and
                ``{doc_id}_tables``.

        Returns:
            Dict with ``doc_id``, ``image_visual_analyses``, ``text_summaries``,
            ``table_summaries`` and ``total_stored``. The same dict is appended
            to ``self.visual_summaries_log``.
        """
        images = images or []
        tables = tables or []
        text = text or ""

        heavy_rule = '=' * 70
        light_rule = '─' * 70

        print(f"\n{heavy_rule}")
        print(f"PROCESSING WITH VISUAL IMAGE ANALYSIS: {doc_id}")
        print(f"{heavy_rule}")

        results = {
            'doc_id': doc_id,
            'image_visual_analyses': [],
            'text_summaries': [],
            'table_summaries': [],
            'total_stored': 0
        }

        # --- images ---------------------------------------------------
        print(f"\nšŸ–¼ļø VISUAL IMAGE ANALYSIS (gpt-4o vision) ({len(images)} total)")
        print(f"{light_rule}")

        image_analyses = self.analyze_images_visually(images)
        results['image_visual_analyses'] = image_analyses

        for analysis in image_analyses:
            print(f" āœ… Image {analysis['image_index']} (visual analysis)")
            print(f" Path: {analysis['image_path']}")
            print(f" Analysis: {analysis['visual_analysis'][:100]}...")

        results['total_stored'] += self._store_section(
            vector_store,
            ' | '.join(
                f"Image {a['image_index']}: {a['visual_analysis']}"
                for a in image_analyses
            ),
            f"{doc_id}_images_visual",
            len(image_analyses),
            "image visual analyses",
            "image analyses",
        )

        # --- text -----------------------------------------------------
        print("\nšŸ“ TEXT CHUNK SUMMARIZATION")
        print(f"{light_rule}")

        text_summaries = self.summarize_text_chunks(text)
        results['text_summaries'] = text_summaries

        for summary in text_summaries:
            print(f" āœ… Chunk {summary['chunk_index']}: {summary['summary'][:50]}...")

        results['total_stored'] += self._store_section(
            vector_store,
            ' | '.join(
                f"Chunk {s['chunk_index']}: {s['summary']}"
                for s in text_summaries
            ),
            f"{doc_id}_text_chunks",
            len(text_summaries),
            "text chunk summaries",
            "text summaries",
        )

        # --- tables ---------------------------------------------------
        print(f"\nšŸ“‹ TABLE SUMMARIZATION ({len(tables)} total)")
        print(f"{light_rule}")

        table_summaries = self.summarize_tables(tables)
        results['table_summaries'] = table_summaries

        for summary in table_summaries:
            print(f" āœ… Table {summary['table_index']}: {summary['summary'][:50]}...")

        results['total_stored'] += self._store_section(
            vector_store,
            ' | '.join(
                f"Table {s['table_index']}: {s['summary']}"
                for s in table_summaries
            ),
            f"{doc_id}_tables",
            len(table_summaries),
            "table summaries",
            "table summaries",
        )

        # --- recap ----------------------------------------------------
        print(f"\n{heavy_rule}")
        print("šŸ“Š STORAGE SUMMARY")
        print(f"{heavy_rule}")
        print(f" Images analyzed visually & stored: {len(image_analyses)}")
        print(f" Text chunks summarized & stored: {len(text_summaries)}")
        print(f" Tables summarized & stored: {len(table_summaries)}")
        print(f" Total items stored in vector: {results['total_stored']}")
        print(f"{heavy_rule}")

        self.visual_summaries_log.append(results)
        return results

    def _chunk_text(self, text: str, chunk_size: int = 1500, overlap: int = 300) -> List[str]:
        """Split *text* into windows of *chunk_size* characters.

        Consecutive windows start ``chunk_size - overlap`` characters apart.
        Splitting stops as soon as a window reaches the end of the text, so
        no trailing chunk made only of already-covered characters is emitted.

        Raises:
            ValueError: If *text* is non-empty and ``overlap >= chunk_size``
                (the window could never advance).
        """
        if not text:
            return []

        step = chunk_size - overlap
        if step <= 0:
            raise ValueError(
                f"overlap ({overlap}) must be smaller than chunk_size ({chunk_size})"
            )

        chunks = []
        text_length = len(text)
        start = 0
        while start < text_length:
            end = start + chunk_size
            chunks.append(text[start:end])
            if end >= text_length:
                # This window already covers the tail of the text.
                break
            start += step
        return chunks

    def get_visual_summaries_log(self) -> List[Dict]:
        """Return the list of result dicts recorded by processed documents."""
        return self.visual_summaries_log


class AnsweringRAG:
    """Build an answer to a question from vector-search results with an LLM."""

    def __init__(self, api_key: Optional[str] = None, debug: bool = True):
        """Create the chat client.

        Args:
            api_key: API key; falls back to ``OPENAI_API_KEY`` from config
                when falsy.
            debug: When true, debug output is printed.
        """
        api_key = api_key or OPENAI_API_KEY
        self.debug = debug
        # NOTE(review): OPENAI_MODEL is imported from config but the model is
        # pinned here -- confirm which one is intended.
        self.llm = ChatOpenAI(
            model_name="gpt-4o-mini",
            api_key=api_key,
            temperature=TEMPERATURE,
            max_tokens=MAX_TOKENS,
        )
        self.language = LANGUAGE
        self.answer_log = []
        if self.debug:
            print("āœ… AnsweringRAG initialized")

    def _debug_print(self, label: str, data: object):
        """Print a labelled debug dump of *data* when ``self.debug`` is set."""
        if self.debug:
            _print_debug(label, data)

    def analyze_and_answer(
        self,
        question: str,
        search_results: List[Dict]
    ) -> Dict:
        """Answer *question* from *search_results* using the chat model.

        Each search result is read via ``'content'``, ``'type'`` and
        ``'distance'``. Every outcome (no results, success, model error) is
        appended to ``self.answer_log`` and returned.

        Returns:
            Dict with ``question``, ``answer``, ``sources_used``,
            ``confidence`` and ``search_results``; an ``error`` key is added
            when the model call fails.
        """
        rule = '=' * 70
        print(f"\n{rule}")
        print("ANALYZING QUESTION & GENERATING ANSWER")
        print(f"{rule}")
        print(f"\nā“ Question: {question}")
        print(f"šŸ“Š Search Results Found: {len(search_results)}")

        if not search_results:
            print("āš ļø No search results found!")
            answer = f'No relevant information in the document to answer question: "{question}" '
            result = {
                'question': question,
                'answer': answer,
                'sources_used': 0,
                'confidence': 'low',
                'search_results': []
            }
            self.answer_log.append(result)
            return result

        context_parts = []
        for idx, result in enumerate(search_results, 1):
            content = result.get('content', '')
            content_type = result.get('type', 'unknown')
            # .get() without a default: a missing distance stays None and maps
            # to relevance 0, while a real distance of 0 maps to relevance 1.
            relevance = _relevance_from_distance(result.get('distance'))
            context_parts.append(
                f" [Source {idx} - {content_type.upper()} "
                f"(relevance: {relevance:.1%})] {content}"
            )

        full_context = "\n".join(context_parts)
        self._debug_print(
            "Context Prepared",
            f"{len(context_parts)} sources, {len(full_context)} chars"
        )

        analysis_prompt = (
            "You are a helpful assistant analyzing document content to answer "
            "user questions. "
            f"USER QUESTION: {question} "
            f"RELEVANT CONTENT FROM DOCUMENT: {full_context} "
            "INSTRUCTIONS: "
            "1. Analyze the provided content carefully "
            "2. Extract information relevant to the question "
            f"3. Synthesize a clear, comprehensive answer in {self.language} "
            "4. If the content doesn't fully answer the question, explain what "
            "information is available "
            "5. Be specific and cite the content when relevant "
            "6. Structure your answer clearly with key points "
            "ANSWER:"
        )

        print("\nšŸ” Analyzing search results...")
        print(f" Context size: {len(full_context)} characters")
        print(f" Sources: {len(search_results)}")

        try:
            message = HumanMessage(content=analysis_prompt)
            response = self.llm.invoke([message])
            answer = response.content.strip()

            confidence = self._estimate_confidence(len(search_results), answer)

            print("āœ… Answer generated successfully")
            print(f" Confidence: {confidence}")
            print(f" Answer length: {len(answer)} characters")

            result = {
                'question': question,
                'answer': answer,
                'sources_used': len(search_results),
                'confidence': confidence,
                'search_results': search_results
            }
            self.answer_log.append(result)
            return result
        except Exception as e:
            # Best effort: report the failure in the result instead of raising.
            print(f"āŒ Error generating answer: {e}")
            answer = "I encountered an error while analyzing the search results. Please try again."
            result = {
                'question': question,
                'answer': answer,
                'sources_used': len(search_results),
                'confidence': 'low',
                'error': str(e),
                'search_results': search_results
            }
            self.answer_log.append(result)
            return result

    def _estimate_confidence(self, sources_count: int, answer: str) -> str:
        """Grade confidence from the source count and the answer length.

        Returns ``"high"`` for at least 3 sources and more than 500
        characters, ``"medium"`` for at least 2 sources and more than 200
        characters, otherwise ``"low"``.
        """
        answer_length = len(answer)
        if sources_count >= 3 and answer_length > 500:
            return "high"
        if sources_count >= 2 and answer_length > 200:
            return "medium"
        return "low"

    def get_answer_with_sources(
        self,
        question: str,
        search_results: List[Dict]
    ) -> Dict:
        """Answer *question* and attach a display-ready list of the sources.

        Returns:
            The dict from :meth:`analyze_and_answer` with an added
            ``formatted_sources`` list; each entry has ``index`` (1-based),
            ``type``, ``content`` and ``relevance``.
        """
        result = self.analyze_and_answer(question, search_results)

        result['formatted_sources'] = [
            {
                'index': idx,
                'type': source.get('type', 'unknown'),
                'content': source.get('content', ''),
                'relevance': _relevance_from_distance(source.get('distance'))
            }
            for idx, source in enumerate(result['search_results'], 1)
        ]
        return result

    def get_answer_log(self) -> List[Dict]:
        """Return the list of result dicts recorded by answered questions."""
        return self.answer_log

    def print_answer_with_sources(self, result: Dict, max_source_length: int = 300):
        """Print the answer and, when present, its formatted sources.

        Args:
            result: Dict with ``question``, ``confidence`` and ``answer``;
                ``formatted_sources`` is printed when present and non-empty.
            max_source_length: Number of leading characters of each source's
                content to print.
        """
        heavy_rule = '=' * 70
        light_rule = '-' * 70

        print(f"\n{heavy_rule}")
        print(f"ANSWER TO: {result['question']}")
        print(f"{heavy_rule}")
        print(f"\nšŸ“ ANSWER (Confidence: {result['confidence'].upper()}):")
        print(f"{light_rule}")
        print(result['answer'])
        print(f"{light_rule}")

        if result.get('formatted_sources'):
            print(f"\nšŸ“š SOURCES USED ({len(result['formatted_sources'])} total):")
            for source in result['formatted_sources']:
                print(f"\n[Source {source['index']} - {source['type'].upper()} ({source['relevance']:.0%} relevant)]")
                print(f"{source['content'][:max_source_length]}...")

        print(f"\n{heavy_rule}")