import os
import json
import pickle
import tempfile
from pathlib import Path
from typing import Optional, Dict, Any, List, Tuple
import traceback
import re
from datetime import datetime
try:
from pydantic import BaseModel, Field
HAS_PYDANTIC = True
except ImportError:
HAS_PYDANTIC = False
print("⚠️ Pydantic не установлен, структурированный вывод недоступен")
try:
import numpy as np
import faiss
HAS_FAISS = True
except ImportError:
HAS_FAISS = False
print("⚠️ FAISS не установлен, будет использован поиск по ключевым словам")
try:
import gradio as gr
HAS_GRADIO = True
except ImportError:
HAS_GRADIO = False
print("⚠️ Gradio не установлен")
from openai import OpenAI
# Pydantic модели для структурированного вывода
if HAS_PYDANTIC:
class SourceInfo(BaseModel):
"""Информация об источнике"""
page: int = Field(description="Номер страницы в отчете")
relevance_score: float = Field(description="Оценка релевантности от 0 до 1")
content_preview: str = Field(description="Краткое описание содержимого")
class ThinkingProcess(BaseModel):
"""Процесс рассуждений (Chain-of-Thought)"""
question_analysis: str = Field(description="Анализ вопроса пользователя")
information_found: str = Field(description="Найденная в источниках информация")
reasoning_steps: List[str] = Field(description="Шаги логических рассуждений")
conclusion: str = Field(description="Выводы на основе анализа")
class FinancialAnswer(BaseModel):
"""Структурированный ответ по финансовой отчетности"""
thinking: ThinkingProcess = Field(description="Процесс рассуждений")
answer: str = Field(description="Основной ответ на вопрос")
confidence: float = Field(description="Уверенность в ответе от 0 до 1")
sources: List[SourceInfo] = Field(description="Использованные источники")
key_metrics: Optional[Dict[str, Any]] = Field(description="Ключевые числовые показатели", default=None)
timestamp: str = Field(default_factory=lambda: datetime.now().isoformat())
class VectorRAGSystem:
"""RAG система с векторным поиском и резервным режимом"""
def __init__(self):
self.chunks = []
self.word_index = {}
self.faiss_index = None
self.metadata = {}
self.client = None
self.is_initialized = False
self.pdf_doc = None
self.page_texts = {} # Кеш текстов страниц
# Модели и параметры
self.embedding_model = "text-embedding-3-large"
self.embedding_dim = 3072
self.generation_model = "gpt-4o"
self.reranking_model = "gpt-4o-mini"
# Параметры поиска
self.max_chunks_for_rerank = 15
self.final_chunks_count = 5
self.vector_search_k = 20
# Режим работы
self.vector_mode = HAS_FAISS
def initialize_with_api_key(self, api_key: str) -> Tuple[str, str]:
"""Инициализация системы с API ключом"""
try:
if not api_key.strip():
return "❌ Введите OpenAI API ключ", ""
# Инициализация OpenAI клиента
self.client = OpenAI(api_key=api_key.strip())
# Загрузка данных
if not self.load_data():
return "❌ Ошибка загрузки данных", ""
self.is_initialized = True
stats = self._generate_stats()
return "✅ Векторная RAG система инициализирована", stats
except Exception as e:
return f"❌ Ошибка инициализации: {str(e)}", ""
def load_data(self) -> bool:
"""Загрузка векторных данных"""
try:
# Загружаем только векторные данные
if self.vector_mode and self.load_vector_data():
return True
print("❌ Векторные данные не найдены или не удалось загрузить")
return False
except Exception as e:
print(f"❌ Ошибка загрузки данных: {e}")
return False
def load_vector_data(self) -> bool:
"""Загрузка векторных данных и сохранение полной metadata_list с caption."""
try:
print("🔄 Попытка загрузки векторных данных...")
faiss_file = "chunks_flatip.faiss"
metadata_file = "metadata.json"
if not all(os.path.exists(f) for f in [faiss_file, metadata_file]):
print("📁 Векторные файлы не найдены")
return False
# 1) Читаем весь список метаданных, сохраняем его
with open(metadata_file, 'r', encoding='utf-8') as f:
metadata_list = json.load(f)
self.metadata_list = metadata_list
# 2) Строим self.chunks, сохраняя каждый item целиком
self.chunks = []
for i, item in enumerate(metadata_list):
chunk_id = item.get("chunk_id",
item.get("table_id",
item.get("img_id", None)))
self.chunks.append({
"page": item["page"],
"chunk_id": chunk_id,
"chunk_index": i,
"text": "", # заполним в vector_search
"metadata": item # здесь есть caption, type и т.д.
})
# 3) Загружаем FAISS-индекс
if HAS_FAISS:
self.faiss_index = faiss.read_index(faiss_file)
# 4) Загружаем PDF для parent-page enrichment
pdf_path = "data/Сбер 2023.pdf"
if os.path.exists(pdf_path):
import fitz
self.pdf_doc = fitz.open(pdf_path)
print(f"✅ PDF загружен: {self.pdf_doc.page_count} страниц")
else:
print("❌ PDF не найден для enrichment")
self.pdf_doc = None
print(f"✅ Загружены векторы: {len(self.chunks)} чанков")
return True
except Exception as e:
print(f"❌ Ошибка load_vector_data: {e}")
return False
def get_page_text(self, page_num: int) -> str:
"""Получение полного текста страницы с кешированием"""
if page_num in self.page_texts:
return self.page_texts[page_num]
try:
if not self.pdf_doc or page_num < 1 or page_num > self.pdf_doc.page_count:
return ""
page = self.pdf_doc[page_num - 1] # PyMuPDF использует 0-based индексы
text = page.get_text()
# Кешируем текст
self.page_texts[page_num] = text
return text
except Exception as e:
print(f"❌ Ошибка получения текста страницы {page_num}: {e}")
return ""
def _generate_stats(self) -> str:
"""Генерация статистики системы"""
total_chunks = len(self.chunks)
mode = "Векторный поиск" if self.vector_mode and self.faiss_index else "Базовый режим"
structured_output = "✅ Pydantic" if HAS_PYDANTIC else "❌ Недоступно"
pdf_enrichment = "✅ Активен" if self.pdf_doc else "❌ Недоступен"
stats = f"""🧠 **Advanced RAG система с Chain-of-Thought готова!**
📊 **Технические характеристики:**
- 📦 Векторных эмбеддингов: {total_chunks}
- 🔍 Режим поиска: {mode}
- 🧠 Модель генерации: {self.generation_model}
- 🎯 LLM реранкинг: {self.reranking_model}
- 📄 Parent-page enrichment: {pdf_enrichment}
- 📋 Структурированный вывод: {structured_output}
🚀 **Архитектурные особенности:**
- 📚 **Предобработка** PDF файла (текст и таблицы) через pdfplumber
- 🔎 **Векторный поиск** с text-embedding-3-large
- 📄 **Parent-page enrichment** через PyMuPDF
- 🧠 **LLM реранкинг** для повышения релевантности
- 🤔 **Chain-of-Thought** рассуждения
- 📋 **JSON Schema** для структурированных ответов
- 📊 **Confidence scoring** и детальная аналитика
💡 **Готова к интеллектуальному анализу отчета ПАО Сбербанк 2023!**"""
return stats
def search(self, query: str, k: int = 20) -> List[Tuple[Dict, float]]:
"""Основной метод поиска"""
if self.vector_mode and self.faiss_index and self.client:
return self.vector_search(query, k)
else:
print("⚠️ Векторный режим отключен")
return []
def vector_search(self, query: str, k: int = 20) -> List[Tuple[Dict, float]]:
"""Векторный поиск + enrichment с caption из metadata_list."""
try:
response = self.client.embeddings.create(
model=self.embedding_model,
input=[query]
)
q_emb = np.array(response.data[0].embedding, dtype=np.float32).reshape(1, -1)
faiss.normalize_L2(q_emb)
scores, indices = self.faiss_index.search(q_emb, k)
results = []
for score, idx in zip(scores[0], indices[0]):
if 0 <= idx < len(self.chunks):
record = self.chunks[idx].copy()
meta_item = self.metadata_list[idx]
# базовый текст страницы
page_text = self.get_page_text(record["page"]) or ""
# если это картинка и есть caption — добавляем его сверху
if meta_item.get("type") == "image" and meta_item.get("caption"):
caption = meta_item["caption"]
record["text"] = caption + "\n\n" + page_text
else:
record["text"] = page_text
results.append((record, float(score)))
return results
except Exception as e:
print(f"❌ Ошибка vector_search: {e}")
return []
def rerank_with_llm(self, query: str, chunks: List[Tuple[Dict, float]]) -> List[Tuple[Dict, float]]:
"""LLM реранкинг результатов"""
if not chunks or not self.client:
return chunks
try:
chunks_to_rerank = chunks[:self.max_chunks_for_rerank]
docs_text = ""
for i, (chunk, _) in enumerate(chunks_to_rerank):
preview = chunk['text'][:300] + "..." if len(chunk['text']) > 300 else chunk['text']
docs_text += f"\nДокумент {i+1} (стр. {chunk['page']}):\n{preview}\n"
prompt = f"""Оцени релевантность каждого документа для ответа на вопрос по шкале 1-10.
Вопрос: {query}
Документы:{docs_text}
Инструкции:
1. Оценивай точность и полноту информации для ответа
2. Высшие баллы (8-10) - прямой ответ на вопрос
3. Средние баллы (5-7) - частично релевантная информация
4. Низкие баллы (1-4) - слабо связано с вопросом
Верни только числа через запятую (например: 8,6,9,4,7):"""
response = self.client.chat.completions.create(
model=self.reranking_model,
messages=[{"role": "user", "content": prompt}],
max_tokens=100,
temperature=0
)
scores_text = response.choices[0].message.content.strip()
numbers = re.findall(r'\d+\.?\d*', scores_text)
scores = [max(0, min(10, float(num))) for num in numbers]
reranked = []
for i, (chunk, original_score) in enumerate(chunks):
rerank_score = scores[i] if i < len(scores) else 0
reranked.append((chunk, rerank_score))
reranked.sort(key=lambda x: x[1], reverse=True)
return reranked
except Exception as e:
print(f"❌ Ошибка реранкинга: {e}")
return chunks
def generate_answer(self, query: str, context_chunks: List[Tuple[Dict, float]]) -> str:
"""Генерация ответа с Chain-of-Thought и структурированным выводом"""
if not self.client:
return "❌ OpenAI API не настроен"
try:
# Подготавливаем контекст с метаинформацией
context_parts = []
sources_info = []
for i, (chunk, score) in enumerate(context_chunks[:self.final_chunks_count]):
text = chunk.get('text', '')
clean_text = text.encode('utf-8', errors='ignore').decode('utf-8')
# Ограничиваем длину для лучшей обработки
if len(clean_text) > 1500:
clean_text = clean_text[:1500] + "..."
context_parts.append(f"Источник {i+1} (страница {chunk['page']}):\n{clean_text}")
sources_info.append({
"page": chunk['page'],
"score": float(score),
"preview": clean_text[:200] + "..." if len(clean_text) > 200 else clean_text
})
context = "\n\n".join(context_parts)
clean_query = query.encode('utf-8', errors='ignore').decode('utf-8')
# Используем структурированный вывод, если доступен Pydantic
if HAS_PYDANTIC:
return self._generate_structured_answer(clean_query, context, sources_info)
else:
return self._generate_simple_answer(clean_query, context)
except Exception as e:
return f"❌ Ошибка генерации ответа: {str(e)}"
def _generate_structured_answer(self, query: str, context: str, sources_info: List[Dict]) -> str:
"""Генерация структурированного ответа с Chain-of-Thought"""
try:
# JSON Schema для принуждения к структуре
schema = FinancialAnswer.model_json_schema()
prompt = f"""Ты — эксперт по анализу финансовых отчетов ПАО Сбербанк. Проанализируй вопрос пользователя, используя Chain-of-Thought.
ВОПРОС: {query}
КОНТЕКСТ ИЗ ОТЧЕТА:
{context}
Отвечай строго JSON со следующими полями:
{{
"thinking": {{
"question_analysis": "<анализ вопроса>",
"information_found": "<что найдено в источниках>",
"reasoning_steps": ["<шаг 1>", "<шаг 2>", "..."],
"conclusion": "<краткий вывод>"
}},
"answer": "<основной ответ на русском языке>",
"confidence": <число от 0 до 1>,
"sources": [
{{
"page": <номер страницы>,
"relevance_score": <0–1>,
"content_preview": "<превью текста>"
}},
…
]
}}"""
response = self.client.chat.completions.create(
model=self.generation_model,
messages=[{"role": "user", "content": prompt}],
max_tokens=2000,
temperature=0.1,
response_format={"type": "json_object"}
)
json_response = response.choices[0].message.content.strip()
print("Raw JSON response:", json_response)
# Парсим и валидируем JSON
parsed = json.loads(json_response)
validated = FinancialAnswer(**parsed)
return self._format_structured_response(validated)
except Exception as e:
print(f"⚠️ Ошибка в _generate_structured_answer: {e}")
return self._generate_simple_answer(query, context)
def _generate_simple_answer(self, query: str, context: str) -> str:
"""Генерация простого ответа с Chain-of-Thought (fallback)"""
prompt = f"""Ты - эксперт по анализу финансовых отчетов. Ответь на вопрос, используя Chain-of-Thought рассуждения.
ВОПРОС: {query}
КОНТЕКСТ ИЗ ОТЧЕТА:
{context}
ФОРМАТ ОТВЕТА:
🤔 **АНАЛИЗ ВОПРОСА:**
[Что именно спрашивает пользователь]
📊 **НАЙДЕННАЯ ИНФОРМАЦИЯ:**
[Какие данные есть в источниках]
🔍 **РАССУЖДЕНИЯ:**
[Логические шаги анализа]
✅ **ВЫВОДЫ:**
[Финальный ответ с конкретными данными]
ИНСТРУКЦИИ:
- Отвечай только на основе предоставленной информации
- Используй конкретные данные и цифры из отчета
- Указывай номера страниц при цитировании
- Отвечай на русском языке"""
response = self.client.chat.completions.create(
model=self.generation_model,
messages=[{"role": "user", "content": prompt}],
max_tokens=1500,
temperature=0.1
)
return response.choices[0].message.content.strip()
def _format_structured_response(self, response: 'FinancialAnswer') -> str:
"""Форматирование структурированного ответа для отображения"""
formatted = f"""🤔 **ПРОЦЕСС РАССУЖДЕНИЙ:**
📝 **Анализ вопроса:** {response.thinking.question_analysis}
📊 **Найденная информация:** {response.thinking.information_found}
🔍 **Шаги рассуждений:**
"""
for i, step in enumerate(response.thinking.reasoning_steps, 1):
formatted += f"{i}. {step}\n"
formatted += f"\n💡 **Выводы:** {response.thinking.conclusion}\n"
formatted += f"\n✅ **ФИНАЛЬНЫЙ ОТВЕТ:**\n{response.answer}\n"
formatted += f"\n📊 **Уверенность:** {response.confidence:.1%}\n"
if response.key_metrics:
formatted += f"\n📈 **Ключевые показатели:**\n"
for key, value in response.key_metrics.items():
formatted += f"- {key}: {value}\n"
formatted += f"\n📚 **Источники:**\n"
for i, source in enumerate(response.sources, 1):
formatted += f"{i}. Страница {source.page} (релевантность: {source.relevance_score:.1%})\n"
formatted += f" {source.content_preview}\n"
return formatted
def process_query(self, query: str) -> Dict[str, Any]:
"""Обработка пользовательского запроса"""
if not self.is_initialized:
return {
"answer": "❌ Система не инициализирована. Введите API ключ.",
"sources": [],
"debug_info": {}
}
if not query.strip():
return {
"answer": "Пожалуйста, введите ваш вопрос.",
"sources": [],
"debug_info": {}
}
try:
# Поиск
search_results = self.search(query, k=self.vector_search_k)
if not search_results:
return {
"answer": "К сожалению, не удалось найти релевантную информацию по вашему вопросу.",
"sources": [],
"debug_info": {"step": "search", "results_count": 0}
}
# Реранкинг
reranked_results = self.rerank_with_llm(query, search_results)
# Генерация ответа
answer = self.generate_answer(query, reranked_results)
# Подготовка источников
sources = []
# Создаем словарь для быстрого поиска search_score по chunk_index
search_scores = {}
for chunk, score in search_results:
search_scores[chunk.get("chunk_index", -1)] = score
for chunk, score in reranked_results[:self.final_chunks_count]:
sources.append({
"page": chunk["page"],
"search_score": search_scores.get(chunk.get("chunk_index", -1), 0),
"rerank_score": score,
"preview": chunk["text"][:200] + "..." if len(chunk["text"]) > 200 else chunk["text"]
})
debug_info = {
"search_results": len(search_results),
"reranked_results": len(reranked_results),
"final_chunks": len(sources),
"search_method": "vector" if self.vector_mode else "keyword"
}
return {
"answer": answer,
"sources": sources,
"debug_info": debug_info
}
except Exception as e:
print(f"❌ Ошибка обработки запроса: {e}")
traceback.print_exc()
return {
"answer": f"❌ Ошибка обработки запроса: {str(e)}",
"sources": [],
"debug_info": {"error": str(e)}
}
# Глобальная переменная системы
rag_system = VectorRAGSystem()
def initialize_system(api_key: str) -> Tuple[str, str]:
"""Инициализация системы"""
return rag_system.initialize_with_api_key(api_key)
def ask_question(question: str) -> Tuple[str, str]:
"""Обработка вопроса"""
result = rag_system.process_query(question)
answer = result["answer"]
# Форматируем информацию об источниках
sources_info = ""
if result["sources"]:
sources_info = "\n📚 **Источники:**\n"
for i, source in enumerate(result["sources"], 1):
sources_info += f"\n**{i}.** Страница {source['page']} "
sources_info += f"(поиск: {source['search_score']:.3f}, "
sources_info += f"релевантность: {source['rerank_score']:.1f}/10)\n"
sources_info += f"*Превью:* {source['preview']}\n"
# Добавляем отладочную информацию
if result.get("debug_info"):
debug = result["debug_info"]
sources_info += f"\n🔍 **Статистика поиска:**\n"
sources_info += f"- Метод поиска: {debug.get('search_method', 'unknown')}\n"
sources_info += f"- Найдено результатов: {debug.get('search_results', 0)}\n"
sources_info += f"- После реранкинга: {debug.get('reranked_results', 0)}\n"
sources_info += f"- Использовано в ответе: {debug.get('final_chunks', 0)}\n"
return answer, sources_info
def create_demo_interface():
"""Создание демо интерфейса"""
if not HAS_GRADIO:
print("❌ Gradio не установлен. Установите: pip install gradio")
return None
with gr.Blocks(
title="Vector RAG Demo - Сбер 2023",
theme=gr.themes.Soft(),
css="""
.main-header { text-align: center; margin-bottom: 2rem; }
.feature-box { background-color: #f8f9fa; padding: 1rem; border-radius: 8px; margin: 1rem 0; }
"""
) as demo:
gr.Markdown("""
🧠 Advanced RAG with Chain-of-Thought: Анализ отчета Сбера 2023
Интеллектуальная система с векторным поиском, LLM реранкингом и структурированными рассуждениями
text-embedding-3-large • FAISS • GPT-4o • JSON Schema • Chain-of-Thought • Parent-page enrichment
""")
with gr.Row():
with gr.Column(scale=1):
gr.Markdown("### ⚙️ Настройка")
api_key_input = gr.Textbox(
label="OpenAI API Key",
placeholder="sk-proj-...",
type="password",
info="Введите ваш OpenAI API ключ"
)
init_btn = gr.Button("🚀 Инициализировать", variant="primary")
status_output = gr.Textbox(
label="Статус",
interactive=False,
lines=2
)
with gr.Column(scale=1):
stats_output = gr.Markdown("### 📊 Ожидание инициализации...")
gr.Markdown("### 💬 Задайте вопрос")
with gr.Row():
question_input = gr.Textbox(
label="Ваш вопрос",
placeholder="Например: Каковы основные финансовые показатели Сбера за 2023 год?",
lines=2,
scale=4
)
ask_btn = gr.Button("🔍 Поиск", variant="primary", scale=1)
with gr.Row():
with gr.Column(scale=2):
answer_output = gr.Textbox(
label="Ответ системы",
lines=12,
interactive=False
)
with gr.Column(scale=1):
sources_output = gr.Textbox(
label="Источники и статистика",
lines=12,
interactive=False
)
# Примеры вопросов
gr.Markdown("""
### 💡 Примеры вопросов:
- Каковы основные финансовые показатели Сбера за 2023 год?
- Какова чистая прибыль банка в 2023 году?
- Расскажите о кредитном портфеле Сбербанка
- Какие технологические инициативы развивает Сбер?
- Каковы показатели рентабельности банка?
- Какие ESG инициативы реализует Сбер?
""")
# Event handlers
init_btn.click(
fn=initialize_system,
inputs=[api_key_input],
outputs=[status_output, stats_output]
)
ask_btn.click(
fn=ask_question,
inputs=[question_input],
outputs=[answer_output, sources_output]
)
question_input.submit(
fn=ask_question,
inputs=[question_input],
outputs=[answer_output, sources_output]
)
return demo
# Запуск для Hugging Face Spaces
demo = create_demo_interface()
if __name__ == "__main__":
if demo:
demo.launch()
else:
print("❌ Не удалось создать интерфейс")