import os import json import pickle import tempfile from pathlib import Path from typing import Optional, Dict, Any, List, Tuple import traceback import re from datetime import datetime try: from pydantic import BaseModel, Field HAS_PYDANTIC = True except ImportError: HAS_PYDANTIC = False print("⚠️ Pydantic не установлен, структурированный вывод недоступен") try: import numpy as np import faiss HAS_FAISS = True except ImportError: HAS_FAISS = False print("⚠️ FAISS не установлен, будет использован поиск по ключевым словам") try: import gradio as gr HAS_GRADIO = True except ImportError: HAS_GRADIO = False print("⚠️ Gradio не установлен") from openai import OpenAI # Pydantic модели для структурированного вывода if HAS_PYDANTIC: class SourceInfo(BaseModel): """Информация об источнике""" page: int = Field(description="Номер страницы в отчете") relevance_score: float = Field(description="Оценка релевантности от 0 до 1") content_preview: str = Field(description="Краткое описание содержимого") class ThinkingProcess(BaseModel): """Процесс рассуждений (Chain-of-Thought)""" question_analysis: str = Field(description="Анализ вопроса пользователя") information_found: str = Field(description="Найденная в источниках информация") reasoning_steps: List[str] = Field(description="Шаги логических рассуждений") conclusion: str = Field(description="Выводы на основе анализа") class FinancialAnswer(BaseModel): """Структурированный ответ по финансовой отчетности""" thinking: ThinkingProcess = Field(description="Процесс рассуждений") answer: str = Field(description="Основной ответ на вопрос") confidence: float = Field(description="Уверенность в ответе от 0 до 1") sources: List[SourceInfo] = Field(description="Использованные источники") key_metrics: Optional[Dict[str, Any]] = Field(description="Ключевые числовые показатели", default=None) timestamp: str = Field(default_factory=lambda: datetime.now().isoformat()) class VectorRAGSystem: """RAG система с векторным поиском и резервным режимом""" def __init__(self): self.chunks = [] self.word_index = {} self.faiss_index = None self.metadata = {} self.client = None self.is_initialized = False self.pdf_doc = None self.page_texts = {} # Кеш текстов страниц # Модели и параметры self.embedding_model = "text-embedding-3-large" self.embedding_dim = 3072 self.generation_model = "gpt-4o" self.reranking_model = "gpt-4o-mini" # Параметры поиска self.max_chunks_for_rerank = 15 self.final_chunks_count = 5 self.vector_search_k = 20 # Режим работы self.vector_mode = HAS_FAISS def initialize_with_api_key(self, api_key: str) -> Tuple[str, str]: """Инициализация системы с API ключом""" try: if not api_key.strip(): return "❌ Введите OpenAI API ключ", "" # Инициализация OpenAI клиента self.client = OpenAI(api_key=api_key.strip()) # Загрузка данных if not self.load_data(): return "❌ Ошибка загрузки данных", "" self.is_initialized = True stats = self._generate_stats() return "✅ Векторная RAG система инициализирована", stats except Exception as e: return f"❌ Ошибка инициализации: {str(e)}", "" def load_data(self) -> bool: """Загрузка векторных данных""" try: # Загружаем только векторные данные if self.vector_mode and self.load_vector_data(): return True print("❌ Векторные данные не найдены или не удалось загрузить") return False except Exception as e: print(f"❌ Ошибка загрузки данных: {e}") return False def load_vector_data(self) -> bool: """Загрузка векторных данных""" try: print("🔄 Попытка загрузки векторных данных...") # Файлы векторных данных faiss_file = "chunks_flatip.faiss" metadata_file = "metadata.json" if not all(os.path.exists(f) for f in [faiss_file, metadata_file]): print("📁 Файлы векторных данных не найдены") return False # Загружаем метаданные with open(metadata_file, 'r', encoding='utf-8') as f: metadata_list = json.load(f) # Собираем self.chunks для всех типов элементов self.chunks = [] for i, item in enumerate(metadata_list): # унифицируем идентификатор чанка chunk_id = item.get("chunk_id", item.get("table_id", item.get("img_id", None))) self.chunks.append({ "page": item["page"], "chunk_id": chunk_id, "chunk_index": i, "text": "", # подгрузим из PDF при выдаче "metadata": {} }) # Сохраняем общую статистику self.metadata = {"total_chunks": len(self.chunks)} # Загружаем FAISS-индекс if HAS_FAISS: self.faiss_index = faiss.read_index(faiss_file) # Загружаем PDF для parent-page enrichment pdf_path = "data/Сбер 2023.pdf" if os.path.exists(pdf_path): import fitz # PyMuPDF self.pdf_doc = fitz.open(pdf_path) print(f"✅ PDF загружен: {self.pdf_doc.page_count} страниц") else: print("❌ PDF файл не найден для parent-page enrichment") self.pdf_doc = None print(f"✅ Загружены векторные данные: {len(self.chunks)} чанков") return True except Exception as e: print(f"❌ Ошибка загрузки векторных данных: {e}") return False def get_page_text(self, page_num: int) -> str: """Получение полного текста страницы с кешированием""" if page_num in self.page_texts: return self.page_texts[page_num] try: if not self.pdf_doc or page_num < 1 or page_num > self.pdf_doc.page_count: return "" page = self.pdf_doc[page_num - 1] # PyMuPDF использует 0-based индексы text = page.get_text() # Кешируем текст self.page_texts[page_num] = text return text except Exception as e: print(f"❌ Ошибка получения текста страницы {page_num}: {e}") return "" def _generate_stats(self) -> str: """Генерация статистики системы""" total_chunks = len(self.chunks) mode = "Векторный поиск" if self.vector_mode and self.faiss_index else "Базовый режим" structured_output = "✅ Pydantic" if HAS_PYDANTIC else "❌ Недоступно" pdf_enrichment = "✅ Активен" if self.pdf_doc else "❌ Недоступен" stats = f"""🧠 **Advanced RAG система с Chain-of-Thought готова!** 📊 **Технические характеристики:** - 📦 Векторных эмбеддингов: {total_chunks} - 🔍 Режим поиска: {mode} - 🧠 Модель генерации: {self.generation_model} - 🎯 LLM реранкинг: {self.reranking_model} - 📄 Parent-page enrichment: {pdf_enrichment} - 📋 Структурированный вывод: {structured_output} 🚀 **Архитектурные особенности:** - 📚 **Предобработка** PDF файла (текст и таблицы) через pdfplumber - 🔎 **Векторный поиск** с text-embedding-3-large - 📄 **Parent-page enrichment** через PyMuPDF - 🧠 **LLM реранкинг** для повышения релевантности - 🤔 **Chain-of-Thought** рассуждения - 📋 **JSON Schema** для структурированных ответов - 📊 **Confidence scoring** и детальная аналитика 💡 **Готова к интеллектуальному анализу отчета ПАО Сбербанк 2023!**""" return stats def search(self, query: str, k: int = 20) -> List[Tuple[Dict, float]]: """Основной метод поиска""" if self.vector_mode and self.faiss_index and self.client: return self.vector_search(query, k) else: print("⚠️ Векторный режим отключен") return [] def vector_search(self, query: str, k: int = 20) -> List[Tuple[Dict, float]]: """Векторный поиск по запросу""" if not self.faiss_index or not self.client: print("⚠️ FAISS индекс или OpenAI клиент недоступны") return [] try: # Создаем эмбеддинг для запроса response = self.client.embeddings.create( model=self.embedding_model, input=[query] ) query_embedding = np.array(response.data[0].embedding, dtype=np.float32) query_embedding = query_embedding.reshape(1, -1) # Нормализуем для Inner Product faiss.normalize_L2(query_embedding) # Поиск в FAISS индексе scores, indices = self.faiss_index.search(query_embedding, k) # Формируем результаты с parent-page enrichment results = [] for score, idx in zip(scores[0], indices[0]): if 0 <= idx < len(self.chunks): chunk = self.chunks[idx].copy() # Получаем полный текст страницы для parent-page enrichment page_text = self.get_page_text(chunk["page"]) chunk["text"] = page_text if page_text else chunk["text"] results.append((chunk, float(score))) return results except Exception as e: print(f"❌ Ошибка векторного поиска: {e}") print("⚠️ Переход на поиск без векторов невозможен") return [] def rerank_with_llm(self, query: str, chunks: List[Tuple[Dict, float]]) -> List[Tuple[Dict, float]]: """LLM реранкинг результатов""" if not chunks or not self.client: return chunks try: chunks_to_rerank = chunks[:self.max_chunks_for_rerank] docs_text = "" for i, (chunk, _) in enumerate(chunks_to_rerank): preview = chunk['text'][:300] + "..." if len(chunk['text']) > 300 else chunk['text'] docs_text += f"\nДокумент {i+1} (стр. {chunk['page']}):\n{preview}\n" prompt = f"""Оцени релевантность каждого документа для ответа на вопрос по шкале 1-10. Вопрос: {query} Документы:{docs_text} Инструкции: 1. Оценивай точность и полноту информации для ответа 2. Высшие баллы (8-10) - прямой ответ на вопрос 3. Средние баллы (5-7) - частично релевантная информация 4. Низкие баллы (1-4) - слабо связано с вопросом Верни только числа через запятую (например: 8,6,9,4,7):""" response = self.client.chat.completions.create( model=self.reranking_model, messages=[{"role": "user", "content": prompt}], max_tokens=100, temperature=0 ) scores_text = response.choices[0].message.content.strip() numbers = re.findall(r'\d+\.?\d*', scores_text) scores = [max(0, min(10, float(num))) for num in numbers] reranked = [] for i, (chunk, original_score) in enumerate(chunks): rerank_score = scores[i] if i < len(scores) else 0 reranked.append((chunk, rerank_score)) reranked.sort(key=lambda x: x[1], reverse=True) return reranked except Exception as e: print(f"❌ Ошибка реранкинга: {e}") return chunks def generate_answer(self, query: str, context_chunks: List[Tuple[Dict, float]]) -> str: """Генерация ответа с Chain-of-Thought и структурированным выводом""" if not self.client: return "❌ OpenAI API не настроен" try: # Подготавливаем контекст с метаинформацией context_parts = [] sources_info = [] for i, (chunk, score) in enumerate(context_chunks[:self.final_chunks_count]): text = chunk.get('text', '') clean_text = text.encode('utf-8', errors='ignore').decode('utf-8') # Ограничиваем длину для лучшей обработки if len(clean_text) > 1500: clean_text = clean_text[:1500] + "..." context_parts.append(f"Источник {i+1} (страница {chunk['page']}):\n{clean_text}") sources_info.append({ "page": chunk['page'], "score": float(score), "preview": clean_text[:200] + "..." if len(clean_text) > 200 else clean_text }) context = "\n\n".join(context_parts) clean_query = query.encode('utf-8', errors='ignore').decode('utf-8') # Используем структурированный вывод, если доступен Pydantic if HAS_PYDANTIC: return self._generate_structured_answer(clean_query, context, sources_info) else: return self._generate_simple_answer(clean_query, context) except Exception as e: return f"❌ Ошибка генерации ответа: {str(e)}" def _generate_structured_answer(self, query: str, context: str, sources_info: List[Dict]) -> str: """Генерация структурированного ответа с Chain-of-Thought""" try: # JSON Schema для принуждения к структуре schema = FinancialAnswer.model_json_schema() prompt = f"""Ты - эксперт по анализу финансовых отчетов ПАО Сбербанк. Проанализируй вопрос пользователя используя Chain-of-Thought рассуждения. ВОПРОС: {query} КОНТЕКСТ ИЗ ОТЧЕТА: {context} ИНСТРУКЦИИ ДЛЯ АНАЛИЗА: 1. МЫШЛЕНИЕ (thinking): - Проанализируй, что именно спрашивает пользователь - Определи, какая информация есть в предоставленных источниках - Пройди через логические шаги рассуждений - Сделай выводы на основе найденной информации 2. ОТВЕТ: - Дай четкий и полный ответ на русском языке - Используй конкретные данные и цифры из отчета - Укажи номера страниц при цитировании 3. УВЕРЕННОСТЬ: - Оцени от 0 до 1, насколько уверен в ответе - Учитывай полноту и качество найденной информации 4. ИСТОЧНИКИ: - Для каждого использованного источника укажи релевантность (0-1) - Кратко опиши содержимое Отвечай ТОЛЬКО в формате JSON согласно схеме. Все тексты на русском языке.""" response = self.client.chat.completions.create( model=self.generation_model, messages=[{"role": "user", "content": prompt}], max_tokens=2000, temperature=0.1, response_format={"type": "json_object"} ) json_response = response.choices[0].message.content.strip() # Парсим и валидируем JSON try: parsed_response = json.loads(json_response) validated_response = FinancialAnswer(**parsed_response) return self._format_structured_response(validated_response) except Exception as parse_error: print(f"⚠️ Ошибка парсинга JSON: {parse_error}") return self._generate_simple_answer(query, context) except Exception as e: print(f"⚠️ Ошибка структурированной генерации: {e}") return self._generate_simple_answer(query, context) def _generate_simple_answer(self, query: str, context: str) -> str: """Генерация простого ответа с Chain-of-Thought (fallback)""" prompt = f"""Ты - эксперт по анализу финансовых отчетов. Ответь на вопрос используя Chain-of-Thought рассуждения. ВОПРОС: {query} КОНТЕКСТ ИЗ ОТЧЕТА: {context} ФОРМАТ ОТВЕТА: 🤔 **АНАЛИЗ ВОПРОСА:** [Что именно спрашивает пользователь] 📊 **НАЙДЕННАЯ ИНФОРМАЦИЯ:** [Какие данные есть в источниках] 🔍 **РАССУЖДЕНИЯ:** [Логические шаги анализа] ✅ **ВЫВОДЫ:** [Финальный ответ с конкретными данными] ИНСТРУКЦИИ: - Отвечай только на основе предоставленной информации - Используй конкретные данные и цифры из отчета - Указывай номера страниц при цитировании - Отвечай на русском языке""" response = self.client.chat.completions.create( model=self.generation_model, messages=[{"role": "user", "content": prompt}], max_tokens=1500, temperature=0.1 ) return response.choices[0].message.content.strip() def _format_structured_response(self, response: 'FinancialAnswer') -> str: """Форматирование структурированного ответа для отображения""" formatted = f"""🤔 **ПРОЦЕСС РАССУЖДЕНИЙ:** 📝 **Анализ вопроса:** {response.thinking.question_analysis} 📊 **Найденная информация:** {response.thinking.information_found} 🔍 **Шаги рассуждений:** """ for i, step in enumerate(response.thinking.reasoning_steps, 1): formatted += f"{i}. {step}\n" formatted += f"\n💡 **Выводы:** {response.thinking.conclusion}\n" formatted += f"\n✅ **ФИНАЛЬНЫЙ ОТВЕТ:**\n{response.answer}\n" formatted += f"\n📊 **Уверенность:** {response.confidence:.1%}\n" if response.key_metrics: formatted += f"\n📈 **Ключевые показатели:**\n" for key, value in response.key_metrics.items(): formatted += f"- {key}: {value}\n" formatted += f"\n📚 **Источники:**\n" for i, source in enumerate(response.sources, 1): formatted += f"{i}. Страница {source.page} (релевантность: {source.relevance_score:.1%})\n" formatted += f" {source.content_preview}\n" return formatted def process_query(self, query: str) -> Dict[str, Any]: """Обработка пользовательского запроса""" if not self.is_initialized: return { "answer": "❌ Система не инициализирована. Введите API ключ.", "sources": [], "debug_info": {} } if not query.strip(): return { "answer": "Пожалуйста, введите ваш вопрос.", "sources": [], "debug_info": {} } try: # Поиск search_results = self.search(query, k=self.vector_search_k) if not search_results: return { "answer": "К сожалению, не удалось найти релевантную информацию по вашему вопросу.", "sources": [], "debug_info": {"step": "search", "results_count": 0} } # Реранкинг reranked_results = self.rerank_with_llm(query, search_results) # Генерация ответа answer = self.generate_answer(query, reranked_results) # Подготовка источников sources = [] # Создаем словарь для быстрого поиска search_score по chunk_index search_scores = {} for chunk, score in search_results: search_scores[chunk.get("chunk_index", -1)] = score for chunk, score in reranked_results[:self.final_chunks_count]: sources.append({ "page": chunk["page"], "search_score": search_scores.get(chunk.get("chunk_index", -1), 0), "rerank_score": score, "preview": chunk["text"][:200] + "..." if len(chunk["text"]) > 200 else chunk["text"] }) debug_info = { "search_results": len(search_results), "reranked_results": len(reranked_results), "final_chunks": len(sources), "search_method": "vector" if self.vector_mode else "keyword" } return { "answer": answer, "sources": sources, "debug_info": debug_info } except Exception as e: print(f"❌ Ошибка обработки запроса: {e}") traceback.print_exc() return { "answer": f"❌ Ошибка обработки запроса: {str(e)}", "sources": [], "debug_info": {"error": str(e)} } # Глобальная переменная системы rag_system = VectorRAGSystem() def initialize_system(api_key: str) -> Tuple[str, str]: """Инициализация системы""" return rag_system.initialize_with_api_key(api_key) def ask_question(question: str) -> Tuple[str, str]: """Обработка вопроса""" result = rag_system.process_query(question) answer = result["answer"] # Форматируем информацию об источниках sources_info = "" if result["sources"]: sources_info = "\n📚 **Источники:**\n" for i, source in enumerate(result["sources"], 1): sources_info += f"\n**{i}.** Страница {source['page']} " sources_info += f"(поиск: {source['search_score']:.3f}, " sources_info += f"релевантность: {source['rerank_score']:.1f}/10)\n" sources_info += f"*Превью:* {source['preview']}\n" # Добавляем отладочную информацию if result.get("debug_info"): debug = result["debug_info"] sources_info += f"\n🔍 **Статистика поиска:**\n" sources_info += f"- Метод поиска: {debug.get('search_method', 'unknown')}\n" sources_info += f"- Найдено результатов: {debug.get('search_results', 0)}\n" sources_info += f"- После реранкинга: {debug.get('reranked_results', 0)}\n" sources_info += f"- Использовано в ответе: {debug.get('final_chunks', 0)}\n" return answer, sources_info def create_demo_interface(): """Создание демо интерфейса""" if not HAS_GRADIO: print("❌ Gradio не установлен. Установите: pip install gradio") return None with gr.Blocks( title="Vector RAG Demo - Сбер 2023", theme=gr.themes.Soft(), css=""" .main-header { text-align: center; margin-bottom: 2rem; } .feature-box { background-color: #f8f9fa; padding: 1rem; border-radius: 8px; margin: 1rem 0; } """ ) as demo: gr.Markdown("""

🧠 Advanced RAG with Chain-of-Thought: Анализ отчета Сбера 2023

Интеллектуальная система с векторным поиском, LLM реранкингом и структурированными рассуждениями

text-embedding-3-large • FAISS • GPT-4o • JSON Schema • Chain-of-Thought • Parent-page enrichment

""") with gr.Row(): with gr.Column(scale=1): gr.Markdown("### ⚙️ Настройка") api_key_input = gr.Textbox( label="OpenAI API Key", placeholder="sk-proj-...", type="password", info="Введите ваш OpenAI API ключ" ) init_btn = gr.Button("🚀 Инициализировать", variant="primary") status_output = gr.Textbox( label="Статус", interactive=False, lines=2 ) with gr.Column(scale=1): stats_output = gr.Markdown("### 📊 Ожидание инициализации...") gr.Markdown("### 💬 Задайте вопрос") with gr.Row(): question_input = gr.Textbox( label="Ваш вопрос", placeholder="Например: Каковы основные финансовые показатели Сбера за 2023 год?", lines=2, scale=4 ) ask_btn = gr.Button("🔍 Поиск", variant="primary", scale=1) with gr.Row(): with gr.Column(scale=2): answer_output = gr.Textbox( label="Ответ системы", lines=12, interactive=False ) with gr.Column(scale=1): sources_output = gr.Textbox( label="Источники и статистика", lines=12, interactive=False ) # Примеры вопросов gr.Markdown(""" ### 💡 Примеры вопросов: - Каковы основные финансовые показатели Сбера за 2023 год? - Какова чистая прибыль банка в 2023 году? - Расскажите о кредитном портфеле Сбербанка - Какие технологические инициативы развивает Сбер? - Каковы показатели рентабельности банка? - Какие ESG инициативы реализует Сбер? """) # Event handlers init_btn.click( fn=initialize_system, inputs=[api_key_input], outputs=[status_output, stats_output] ) ask_btn.click( fn=ask_question, inputs=[question_input], outputs=[answer_output, sources_output] ) question_input.submit( fn=ask_question, inputs=[question_input], outputs=[answer_output, sources_output] ) return demo # Запуск для Hugging Face Spaces demo = create_demo_interface() if __name__ == "__main__": if demo: demo.launch() else: print("❌ Не удалось создать интерфейс")