Spaces:
Sleeping
Sleeping
| """ | |
| RAG основной pipeline | |
| """ | |
import base64
import os
from pathlib import Path
from typing import Any, Dict, List, Optional

from langchain_core.messages import HumanMessage, SystemMessage
from langchain_openai import ChatOpenAI

from config import (
    OPENAI_API_KEY, OPENAI_MODEL, TEMPERATURE, MAX_TOKENS,
    LANGUAGE, CHROMA_DB_PATH
)
class VisualMultimodalRAG:
    """
    RAG preparation stage:

    1. Encodes an image to base64 and sends it to the multimodal LLM
    2. Receives a textual description of the image
    3. Stores the description in the vector store

    Also chunks/summarizes plain text and tables so that everything
    about a document ends up searchable in the vector store.
    """

    def __init__(self, api_key: Optional[str] = None, debug: bool = True):
        """
        Args:
            api_key: OpenAI API key; falls back to config.OPENAI_API_KEY.
            debug: when True, print verbose progress and debug output.
        """
        api_key = api_key or OPENAI_API_KEY
        self.debug = debug
        self.llm = ChatOpenAI(
            model_name=OPENAI_MODEL,
            api_key=api_key,
            temperature=TEMPERATURE,
            max_tokens=MAX_TOKENS,
        )
        self.language = LANGUAGE
        # One results dict per processed document
        # (appended by process_and_store_document).
        self.visual_summaries_log = []
        if self.debug:
            print(f"VisualMultimodalRAG with {OPENAI_MODEL}")

    def _debug_print(self, label: str, data: Any) -> None:
        """Print a labelled debug dump of *data*, truncated to 300 chars."""
        # BUG FIX: annotation was the builtin `any`, not `typing.Any`.
        if self.debug:
            print(f"\nDEBUG [{label}]:")
            if isinstance(data, (list, dict)):
                print(f" Type: {type(data).__name__}")
                print(f" Content: {str(data)[:300]}...")
            else:
                print(f" {data}")

    def _image_to_base64(self, image_path: str) -> Optional[str]:
        """Read *image_path* and return its base64 text, or None on failure."""
        # BUG FIX: return annotation was `str` although None is returned on error.
        try:
            with open(image_path, 'rb') as image_file:
                return base64.b64encode(image_file.read()).decode('utf-8')
        except Exception as e:
            print(f"Error converting image to base64: {e}")
            return None

    def analyze_image_visually(self, image_path: str, image_idx: int) -> str:
        """
        Send one image to the multimodal LLM for summarization.

        Never raises: when the file is missing, cannot be encoded, or the
        LLM call fails, a bracketed placeholder string is returned instead.
        """
        if not os.path.exists(image_path):
            return f"[Image {image_idx}: File not found - {image_path}]"
        try:
            image_base64 = self._image_to_base64(image_path)
            if not image_base64:
                return f"[Image {image_idx}: Error converting to base64]"
            # Map the file extension to a MIME type for the data: URL;
            # default to PNG for unknown extensions.
            file_ext = Path(image_path).suffix.lower()
            media_type_map = {
                '.jpg': 'image/jpeg',
                '.jpeg': 'image/jpeg',
                '.png': 'image/png',
                '.gif': 'image/gif',
                '.webp': 'image/webp'
            }
            media_type = media_type_map.get(file_ext, 'image/png')
            print(f" Analyzing image {image_idx}...")
            message = HumanMessage(
                content=[
                    {
                        "type": "image_url",
                        "image_url": {
                            "url": f"data:{media_type};base64,{image_base64}",
                        },
                    },
                    {
                        "type": "text",
                        "text": f"""Ты - ассистент по сбору и обобщению информации. Проанализируй изображение.
По результатам анализа предоставь информацию:
1. Что изображено на картинке - основные объекты и элементы
2. Тип данных и содержимое - числа, графики, зависимости.
3. Назначение изображения - для чего оно представлено и что отображает
4. Связь с текстом
Будь краток и содержателен. Фокусируйся на визуальной информации.
Результат:"""
                    }
                ],
            )
            response = self.llm.invoke([message])
            analysis = response.content.strip()
            if self.debug:
                self._debug_print(f"Image {image_idx} Visual Analysis", analysis)
            print(f" Image {image_idx} analyzed successfully")
            return analysis
        except Exception as e:
            error_msg = f"[Image {image_idx}: Vision analysis failed - {str(e)}]"
            print(f" Error analyzing image {image_idx}: {e}")
            return error_msg

    def analyze_images_visually(self, images: List[Dict]) -> List[Dict]:
        """
        Analyze a list of image dicts (each expected to carry a 'path' key,
        optionally 'ocr_text'). Entries without a path are skipped.

        Returns:
            One record per analyzed image with the visual analysis text.
        """
        visual_analyses = []
        for idx, image in enumerate(images):
            image_path = image.get('path', '')
            if not image_path:
                print(f" Image {idx}: No path")
                continue
            visual_analysis = self.analyze_image_visually(image_path, idx)
            visual_analyses.append({
                'type': 'image_visual',
                'image_index': idx,
                'image_path': image_path,
                'visual_analysis': visual_analysis,
                'ocr_text': image.get('ocr_text', '')
            })
        return visual_analyses

    def summarize_text_chunks(self, text: str, chunk_size: int = 1500) -> List[Dict]:
        """
        Split *text* into overlapping chunks and summarize each via the LLM.

        Chunks shorter than 50 characters (after stripping) are skipped;
        per-chunk LLM failures are logged and skipped, not raised.
        """
        chunks = []
        text_chunks = self._chunk_text(text, chunk_size=chunk_size, overlap=300)
        self._debug_print("Text Chunking", f"Created {len(text_chunks)} chunks")
        for idx, chunk in enumerate(text_chunks):
            if len(chunk.strip()) < 50:
                continue
            try:
                prompt = f"""Ты - ассистент по обобщению и суммаризации информации. Проанализируй и суммаризируй следующий кусок текста.
Выдели основные моменты, факты и идеи. Будь краток.
Текст :
{chunk}
Результат:"""
                message = HumanMessage(content=prompt)
                response = self.llm.invoke([message])
                summary = response.content.strip()
                chunks.append({
                    'type': 'text_chunk',
                    'chunk_index': len(chunks),
                    'original_text': chunk[:500],
                    'summary': summary,
                    'chunk_length': len(chunk)
                })
                if self.debug:
                    self._debug_print(f"Text Chunk {len(chunks)-1} Summary", summary)
            except Exception as e:
                print(f"Error summarizing text chunk: {e}")
        return chunks

    def summarize_tables(self, tables: List[Dict]) -> List[Dict]:
        """
        Summarize each table dict (expects a 'content' key) via the LLM.

        Tables with fewer than 10 characters of content are skipped;
        per-table LLM failures are logged and skipped, not raised.
        """
        summaries = []
        for idx, table in enumerate(tables):
            table_content = table.get('content', '')
            if not table_content or len(table_content.strip()) < 10:
                continue
            try:
                prompt = f"""Ты - ассистент по обобщению и суммаризации информации. Проанализируй и суммаризируй следующию таблицу.
Выдели основные моменты, числа, и значения строк/колонок. Будь краток.
Таблица:
{table_content}
Результат:"""
                message = HumanMessage(content=prompt)
                response = self.llm.invoke([message])
                summary = response.content.strip()
                summaries.append({
                    'type': 'table',
                    'table_index': idx,
                    'original_content': table_content[:500],
                    'summary': summary,
                    'table_length': len(table_content)
                })
                if self.debug:
                    self._debug_print(f"Table {idx} Summary", summary)
            except Exception as e:
                print(f"Error summarizing table {idx}: {e}")
        return summaries

    def process_and_store_document(
        self,
        text: str,
        images: List[Dict],
        tables: List[Dict],
        vector_store,
        doc_id: str
    ) -> Dict:
        """
        Main pipeline: analyze images, text and tables, then store the
        combined summaries in *vector_store* under ids derived from *doc_id*.

        Args:
            text: full document text.
            images: image dicts (each with a 'path' key).
            tables: table dicts (each with a 'content' key).
            vector_store: object exposing add_documents(docs, doc_id).
            doc_id: base identifier used for the stored documents.

        Returns:
            Results dict with all analyses/summaries and 'total_stored'.
        """
        print(f"PROCESSING ANALYSIS: {doc_id}")
        results = {
            'doc_id': doc_id,
            'image_visual_analyses': [],
            'text_summaries': [],
            'table_summaries': [],
            'total_stored': 0
        }
        # --- 1. Visual analysis of images --------------------------------
        print(f"\n VISUAL IMAGE ANALYSIS ({len(images)} )")
        image_analyses = self.analyze_images_visually(images)
        results['image_visual_analyses'] = image_analyses
        image_docs = {
            'text': ' | '.join([
                f"Image {a['image_index']}: {a['visual_analysis']}"
                for a in image_analyses
            ]),
            'images': [],
            'tables': []
        }
        for analysis in image_analyses:
            print(f" Image {analysis['image_index']}")
            print(f" Path: {analysis['image_path']}")
            print(f" Analysis: {analysis['visual_analysis'][:100]}...")
        if image_analyses:
            try:
                vector_store.add_documents(
                    image_docs,
                    f"{doc_id}_images_visual"
                )
                results['total_stored'] += len(image_analyses)
                # BUG FIX: message contained a Cyrillic 'у' ("imagу analyses").
                print(f" Stored {len(image_analyses)} image analyses")
            except Exception as e:
                print(f"Error storing image analyses: {e}")
        # --- 2. Text chunk summarization ---------------------------------
        print(f"\n TEXT CHUNK SUMMARIZATION")
        text_summaries = self.summarize_text_chunks(text)
        results['text_summaries'] = text_summaries
        text_docs = {
            'text': ' | '.join([f"Chunk {s['chunk_index']}: {s['summary']}"
                                for s in text_summaries]),
            'images': [],
            'tables': []
        }
        for summary in text_summaries:
            print(f" Chunk {summary['chunk_index']}: {summary['summary'][:50]}...")
        if text_summaries:
            try:
                vector_store.add_documents(
                    text_docs,
                    f"{doc_id}_text_chunks"
                )
                results['total_stored'] += len(text_summaries)
                print(f" Stored {len(text_summaries)} text chunk summaries")
            except Exception as e:
                print(f" Error text summaries: {e}")
        # --- 3. Table summarization --------------------------------------
        # BUG FIX: the format string was missing its closing parenthesis.
        print(f"\n TABLE SUMMARIZATION ({len(tables)})")
        table_summaries = self.summarize_tables(tables)
        results['table_summaries'] = table_summaries
        table_docs = {
            'text': ' | '.join([f"Table {s['table_index']}: {s['summary']}"
                                for s in table_summaries]),
            'images': [],
            'tables': []
        }
        for summary in table_summaries:
            print(f" Table {summary['table_index']}: {summary['summary'][:50]}...")
        if table_summaries:
            try:
                vector_store.add_documents(
                    table_docs,
                    f"{doc_id}_tables"
                )
                results['total_stored'] += len(table_summaries)
                print(f" Stored {len(table_summaries)} table summaries")
            except Exception as e:
                print(f" Error storing table summaries: {e}")
        # --- Final report -------------------------------------------------
        print(f" STORAGE SUMMARY")
        print(f" Images analyzed: {len(image_analyses)}")
        print(f" Text chunks summarized: {len(text_summaries)}")
        print(f" Tables summarized: {len(table_summaries)}")
        print(f" Total items stored in vector: {results['total_stored']}")
        self.visual_summaries_log.append(results)
        return results

    def _chunk_text(self, text: str, chunk_size: int = 1500, overlap: int = 300) -> List[str]:
        """
        Split *text* into chunks of *chunk_size* characters, each overlapping
        the previous one by *overlap* characters.
        """
        if chunk_size <= 0:
            raise ValueError("chunk_size must be positive")
        # BUG FIX: with overlap >= chunk_size the original loop never
        # advanced (start = end - overlap <= start) and spun forever.
        # Clamp the step to at least 1 so it always terminates.
        step = max(chunk_size - overlap, 1)
        chunks = []
        start = 0
        while start < len(text):
            chunks.append(text[start:start + chunk_size])
            start += step
        return chunks

    def get_visual_summaries_log(self) -> List[Dict]:
        """Return the accumulated per-document processing results."""
        return self.visual_summaries_log
class AnsweringRAG:
    """
    RAG answering stage:

    1. Search the vector store (done by the caller)
    2. Analyze the retrieved results
    3. Produce an answer with a confidence estimate
    """

    def __init__(self, api_key: Optional[str] = None, debug: bool = True):
        """
        Args:
            api_key: OpenAI API key; falls back to config.OPENAI_API_KEY.
            debug: when True, print verbose progress and debug output.
        """
        api_key = api_key or OPENAI_API_KEY
        self.debug = debug
        self.llm = ChatOpenAI(
            model_name=OPENAI_MODEL,
            api_key=api_key,
            temperature=TEMPERATURE,
            max_tokens=MAX_TOKENS,
        )
        self.language = LANGUAGE
        # One result dict per answered question (see analyze_and_answer).
        self.answer_log = []
        if self.debug:
            print(" AnsweringRAG initialized")

    def _debug_print(self, label: str, data: Any) -> None:
        """Print a labelled debug dump of *data*, truncated to 300 chars."""
        # BUG FIX: annotation was the builtin `any`, not `typing.Any`.
        if self.debug:
            print(f"\n🔍 DEBUG [{label}]:")
            if isinstance(data, (list, dict)):
                print(f" Type: {type(data).__name__}")
                print(f" Content: {str(data)[:300]}...")
            else:
                print(f" {data}")

    def analyze_and_answer(
        self,
        question: str,
        search_results: List[Dict]
    ) -> Dict:
        """
        Analyze the retrieved documents and answer the user's question.

        Args:
            question: the user's question.
            search_results: hits from the vector store; each dict may carry
                'content', 'metadata', 'type' and 'distance' keys.

        Returns:
            {
                'question': user question,
                'answer': detailed answer,
                'sources_used': number of sources,
                'confidence': low/medium/high,
                'search_results': original search results
            }
            (plus an 'error' key when the LLM call fails).
        """
        print(f"ANALYZING QUESTION & GENERATING ANSWER")
        print(f"\n Question: {question}")
        print(f" Search Results: {len(search_results)}")
        # No hits: return a low-confidence "nothing relevant" answer.
        if not search_results:
            print(f" No search results found!")
            answer = f"""Релевантная информация в документах отсутствует: "{question}"
"""
            result = {
                'question': question,
                'answer': answer,
                'sources_used': 0,
                'confidence': 'low',
                'search_results': []
            }
            self.answer_log.append(result)
            return result
        # Build the context block, one labelled source per hit.
        context_parts = []
        for idx, hit in enumerate(search_results, 1):
            content = hit.get('content', '')
            content_type = hit.get('type', 'unknown')
            distance = hit.get('distance', 0)
            # BUG FIX: `1 - distance if distance else 0` reported 0%
            # relevance for a perfect match (distance == 0 is falsy).
            relevance = 1 - distance if distance is not None else 0
            context_parts.append(f"""
[Source {idx} - {content_type.upper()} (relevance: {relevance:.1%})]
{content}""")
        full_context = "\n".join(context_parts)
        self._debug_print("Context Prepared", f"{len(context_parts)} sources")
        analysis_prompt = f"""Ты - ассистент по анализу документов и ответов на вопросы по ним.
ВОПРОС:
"{question}"
РЕЛЕВАНТНАЯ ИНФОРМАЦИЯ:
{full_context}
ИНСТРУКЦИИ:
1. Проанализируй предоставленный контент
2. Выдели информацию имеющую отношение к вопросу
3. Предоставь понятный и исчерпывающий ответ
4. Если контент полностью не отвечает на вопрос предосавь информацию которая доступна в контенте
5. Построй свой ответ опираясь на ключевые моменты
Ответ:"""
        print(f"\n Analyzing search results...")
        print(f" Context size: {len(full_context)} chars")
        print(f" Sources: {len(search_results)}")
        try:
            message = HumanMessage(content=analysis_prompt)
            response = self.llm.invoke([message])
            answer = response.content.strip()
            confidence = self._estimate_confidence(len(search_results), answer)
            print(f" Answer generated successfully")
            print(f" Confidence: {confidence}")
            print(f" Answer length: {len(answer)} chars")
            result = {
                'question': question,
                'answer': answer,
                'sources_used': len(search_results),
                'confidence': confidence,
                'search_results': search_results
            }
            self.answer_log.append(result)
            return result
        except Exception as e:
            print(f" Error generating answer: {e}")
            answer = f"Error while analyzing the search results."
            result = {
                'question': question,
                'answer': answer,
                'sources_used': len(search_results),
                'confidence': 'low',
                'error': str(e),
                'search_results': search_results
            }
            self.answer_log.append(result)
            return result

    def _estimate_confidence(self, sources_count: int, answer: str) -> str:
        """Heuristic confidence from the number of sources and answer length."""
        answer_length = len(answer)
        if sources_count >= 3 and answer_length > 500:
            return "high"
        elif sources_count >= 2 and answer_length > 200:
            return "medium"
        else:
            return "low"