""" RAG основной pipeline """ from typing import List, Dict from langchain_openai import ChatOpenAI from langchain_core.messages import HumanMessage, SystemMessage import base64 import os from pathlib import Path from config import ( OPENAI_API_KEY, OPENAI_MODEL, TEMPERATURE, MAX_TOKENS, LANGUAGE, CHROMA_DB_PATH ) class VisualMultimodalRAG: """ RAG - подготовительный этап: 1. Кодирует изображение в base64 и отправляет в gpt-4o-mini 2. Получает описание изображения 3. Сохраняет описание в векторное хранилище """ def __init__(self, api_key: str = None, debug: bool = True): api_key = api_key or OPENAI_API_KEY self.debug = debug self.llm = ChatOpenAI( model_name=OPENAI_MODEL, api_key=api_key, temperature=TEMPERATURE, max_tokens=MAX_TOKENS, ) self.language = LANGUAGE self.visual_summaries_log = [] if self.debug: print(f"VisualMultimodalRAG with {OPENAI_MODEL}") def _debug_print(self, label: str, data: any): """Debug""" if self.debug: print(f"\nDEBUG [{label}]:") if isinstance(data, (list, dict)): print(f" Type: {type(data).__name__}") print(f" Content: {str(data)[:300]}...") else: print(f" {data}") def _image_to_base64(self, image_path: str) -> str: """Конвертирует изображение в base64""" try: with open(image_path, 'rb') as image_file: image_data = base64.b64encode(image_file.read()).decode('utf-8') return image_data except Exception as e: print(f"Error converting image to base64: {e}") return None def analyze_image_visually(self, image_path: str, image_idx: int) -> str: """ Отправляет в модель изображение для суммаризации """ if not os.path.exists(image_path): return f"[Image {image_idx}: File not found - {image_path}]" try: image_base64 = self._image_to_base64(image_path) if not image_base64: return f"[Image {image_idx}: Error converting to base64]" file_ext = Path(image_path).suffix.lower() media_type_map = { '.jpg': 'image/jpeg', '.jpeg': 'image/jpeg', '.png': 'image/png', '.gif': 'image/gif', '.webp': 'image/webp' } media_type = media_type_map.get(file_ext, 'image/png') print(f" Analyzing image {image_idx}...") message = HumanMessage( content=[ { "type": "image_url", "image_url": { "url": f"data:{media_type};base64,{image_base64}", }, }, { "type": "text", "text": f"""Ты - ассистент по сбору и обобщению информации. Проанализируй изображение. По результатам анализа предоставь информацию: 1. Что изображено на картинке - основные объекты и элементы 2. Тип данных и содержимое - числа, графики, зависимости. 3. Назначение изображения - для чего оно представлено и что отображает 4. Связь с текстом Будь краток и содержателен. Фокусируйся на визуальной информации. Результат:""" } ], ) response = self.llm.invoke([message]) analysis = response.content.strip() if self.debug: self._debug_print(f"Image {image_idx} Visual Analysis", analysis) print(f" Image {image_idx} analyzed successfully") return analysis except Exception as e: error_msg = f"[Image {image_idx}: Vision analysis failed - {str(e)}]" print(f" Error analyzing image {image_idx}: {e}") return error_msg def analyze_images_visually(self, images: List[Dict]) -> List[Dict]: """ Считывает изображения и отправляет на анализ """ visual_analyses = [] for idx, image in enumerate(images): image_path = image.get('path', '') if not image_path: print(f" Image {idx}: No path") continue visual_analysis = self.analyze_image_visually(image_path, idx) visual_analyses.append({ 'type': 'image_visual', 'image_index': idx, 'image_path': image_path, 'visual_analysis': visual_analysis, 'ocr_text': image.get('ocr_text', '') }) return visual_analyses def summarize_text_chunks(self, text: str, chunk_size: int = 1500) -> List[Dict]: """ Отправляет куски текста на суммаризацию """ chunks = [] text_chunks = self._chunk_text(text, chunk_size=chunk_size, overlap=300) self._debug_print("Text Chunking", f"Created {len(text_chunks)} chunks") for idx, chunk in enumerate(text_chunks): if len(chunk.strip()) < 50: continue try: prompt = f"""Ты - ассистент по обобщению и суммаризации информации. Проанализируй и суммаризируй следующий кусок текста. Выдели основные моменты, факты и идеи. Будь краток. Текст : {chunk} Результат:""" message = HumanMessage(content=prompt) response = self.llm.invoke([message]) summary = response.content.strip() chunks.append({ 'type': 'text_chunk', 'chunk_index': len(chunks), 'original_text': chunk[:500], 'summary': summary, 'chunk_length': len(chunk) }) if self.debug: self._debug_print(f"Text Chunk {len(chunks)-1} Summary", summary) except Exception as e: print(f"Error summarizing text chunk: {e}") return chunks def summarize_tables(self, tables: List[Dict]) -> List[Dict]: """ Отправляет таблицы на суммаризацию """ summaries = [] for idx, table in enumerate(tables): table_content = table.get('content', '') if not table_content or len(table_content.strip()) < 10: continue try: prompt = f"""Ты - ассистент по обобщению и суммаризации информации. Проанализируй и суммаризируй следующию таблицу. Выдели основные моменты, числа, и значения строк/колонок. Будь краток. Таблица: {table_content} Результат:""" message = HumanMessage(content=prompt) response = self.llm.invoke([message]) summary = response.content.strip() summaries.append({ 'type': 'table', 'table_index': idx, 'original_content': table_content[:500], 'summary': summary, 'table_length': len(table_content) }) if self.debug: self._debug_print(f"Table {idx} Summary", summary) except Exception as e: print(f"Error summarizing table {idx}: {e}") return summaries def process_and_store_document( self, text: str, images: List[Dict], tables: List[Dict], vector_store, doc_id: str ) -> Dict: """ Основной pipeline анализирует и сохраняет документы в хранилище """ print(f"PROCESSING ANALYSIS: {doc_id}") results = { 'doc_id': doc_id, 'image_visual_analyses': [], 'text_summaries': [], 'table_summaries': [], 'total_stored': 0 } print(f"\n VISUAL IMAGE ANALYSIS ({len(images)} )") image_analyses = self.analyze_images_visually(images) results['image_visual_analyses'] = image_analyses image_docs = { 'text': ' | '.join([ f"Image {a['image_index']}: {a['visual_analysis']}" for a in image_analyses ]), 'images': [], 'tables': [] } for analysis in image_analyses: print(f" Image {analysis['image_index']}") print(f" Path: {analysis['image_path']}") print(f" Analysis: {analysis['visual_analysis'][:100]}...") if image_analyses: try: vector_store.add_documents( image_docs, f"{doc_id}_images_visual" ) results['total_stored'] += len(image_analyses) print(f" Stored {len(image_analyses)} imagу analyses") except Exception as e: print(f"Error storing image analyses: {e}") print(f"\n TEXT CHUNK SUMMARIZATION") text_summaries = self.summarize_text_chunks(text) results['text_summaries'] = text_summaries text_docs = { 'text': ' | '.join([f"Chunk {s['chunk_index']}: {s['summary']}" for s in text_summaries]), 'images': [], 'tables': [] } for summary in text_summaries: print(f" Chunk {summary['chunk_index']}: {summary['summary'][:50]}...") if text_summaries: try: vector_store.add_documents( text_docs, f"{doc_id}_text_chunks" ) results['total_stored'] += len(text_summaries) print(f" Stored {len(text_summaries)} text chunk summaries") except Exception as e: print(f" Error text summaries: {e}") print(f"\n TABLE SUMMARIZATION ({len(tables)}") table_summaries = self.summarize_tables(tables) results['table_summaries'] = table_summaries table_docs = { 'text': ' | '.join([f"Table {s['table_index']}: {s['summary']}" for s in table_summaries]), 'images': [], 'tables': [] } for summary in table_summaries: print(f" Table {summary['table_index']}: {summary['summary'][:50]}...") if table_summaries: try: vector_store.add_documents( table_docs, f"{doc_id}_tables" ) results['total_stored'] += len(table_summaries) print(f" Stored {len(table_summaries)} table summaries") except Exception as e: print(f" Error storing table summaries: {e}") print(f" STORAGE SUMMARY") print(f" Images analyzed: {len(image_analyses)}") print(f" Text chunks summarized: {len(text_summaries)}") print(f" Tables summarized: {len(table_summaries)}") print(f" Total items stored in vector: {results['total_stored']}") self.visual_summaries_log.append(results) return results def _chunk_text(self, text: str, chunk_size: int = 1500, overlap: int = 300) -> List[str]: chunks = [] start = 0 while start < len(text): end = start + chunk_size chunks.append(text[start:end]) start = end - overlap return chunks def get_visual_summaries_log(self) -> List[Dict]: return self.visual_summaries_log class AnsweringRAG: """ RAG - работа с ответом на запрос: 1. Поиск в векторном хранилище 2. Анализ результатов 3. Предоставление ответа """ def __init__(self, api_key: str = None, debug: bool = True): api_key = api_key or OPENAI_API_KEY self.debug = debug self.llm = ChatOpenAI( model_name=OPENAI_MODEL, api_key=api_key, temperature=TEMPERATURE, max_tokens=MAX_TOKENS, ) self.language = LANGUAGE self.answer_log = [] if self.debug: print(" AnsweringRAG initialized") def _debug_print(self, label: str, data: any): """Debug""" if self.debug: print(f"\n🔍 DEBUG [{label}]:") if isinstance(data, (list, dict)): print(f" Type: {type(data).__name__}") print(f" Content: {str(data)[:300]}...") else: print(f" {data}") def analyze_and_answer( self, question: str, search_results: List[Dict] ) -> Dict: """ Проанализируй найденные документов и на основе их предоставь ответ на вопрос пользователя Ответ: { 'question': user question, 'answer': detailed answer, 'sources_used': number of sources, 'confidence': low/medium/high, 'search_results': original search results } """ print(f"ANALYZING QUESTION & GENERATING ANSWER") print(f"\n Question: {question}") print(f" Search Results: {len(search_results)}") if not search_results: print(f" No search results found!") answer = f"""Релевантная информация в документах отсутствует: "{question}" """ result = { 'question': question, 'answer': answer, 'sources_used': 0, 'confidence': 'low', 'search_results': [] } self.answer_log.append(result) return result context_parts = [] for idx, result in enumerate(search_results, 1): content = result.get('content', '') metadata = result.get('metadata', {}) content_type = result.get('type', 'unknown') distance = result.get('distance', 0) relevance = 1 - distance if distance else 0 context_parts.append(f""" [Source {idx} - {content_type.upper()} (relevance: {relevance:.1%})] {content}""") full_context = "\n".join(context_parts) self._debug_print("Context Prepared", f"{len(context_parts)} sources") analysis_prompt = f"""Ты - ассистент по анализу документов и ответов на вопросы по ним. ВОПРОС: "{question}" РЕЛЕВАНТНАЯ ИНФОРМАЦИЯ: {full_context} ИНСТРУКЦИИ: 1. Проанализируй предоставленный контент 2. Выдели информацию имеющую отношение к вопросу 3. Предоставь понятный и исчерпывающий ответ 4. Если контент полностью не отвечает на вопрос предосавь информацию которая доступна в контенте 5. Построй свой ответ опираясь на ключевые моменты Ответ:""" print(f"\n Analyzing search results...") print(f" Context size: {len(full_context)} chars") print(f" Sources: {len(search_results)}") try: message = HumanMessage(content=analysis_prompt) response = self.llm.invoke([message]) answer = response.content.strip() confidence = self._estimate_confidence(len(search_results), answer) print(f" Answer generated successfully") print(f" Confidence: {confidence}") print(f" Answer length: {len(answer)} chars") result = { 'question': question, 'answer': answer, 'sources_used': len(search_results), 'confidence': confidence, 'search_results': search_results } self.answer_log.append(result) return result except Exception as e: print(f" Error generating answer: {e}") answer = f"Error while analyzing the search results." result = { 'question': question, 'answer': answer, 'sources_used': len(search_results), 'confidence': 'low', 'error': str(e), 'search_results': search_results } self.answer_log.append(result) return result def _estimate_confidence(self, sources_count: int, answer: str) -> str: """Уверенность в ответе на основании найденных источников информации""" answer_length = len(answer) if sources_count >= 3 and answer_length > 500: return "high" elif sources_count >= 2 and answer_length > 200: return "medium" else: return "low"