Course_Project / app.py
fruitpicker01's picture
Update app.py
7041fc9 verified
raw
history blame
31.5 kB
import os
import json
import pickle
import tempfile
from pathlib import Path
from typing import Optional, Dict, Any, List, Tuple
import traceback
import re
from datetime import datetime
try:
from pydantic import BaseModel, Field
HAS_PYDANTIC = True
except ImportError:
HAS_PYDANTIC = False
print("⚠️ Pydantic не установлен, структурированный вывод недоступен")
try:
import numpy as np
import faiss
HAS_FAISS = True
except ImportError:
HAS_FAISS = False
print("⚠️ FAISS не установлен, будет использован поиск по ключевым словам")
try:
import gradio as gr
HAS_GRADIO = True
except ImportError:
HAS_GRADIO = False
print("⚠️ Gradio не установлен")
from openai import OpenAI
# Pydantic модели для структурированного вывода
if HAS_PYDANTIC:
class SourceInfo(BaseModel):
"""Информация об источнике"""
page: int = Field(description="Номер страницы в отчете")
relevance_score: float = Field(description="Оценка релевантности от 0 до 1")
content_preview: str = Field(description="Краткое описание содержимого")
class ThinkingProcess(BaseModel):
"""Процесс рассуждений (Chain-of-Thought)"""
question_analysis: str = Field(description="Анализ вопроса пользователя")
information_found: str = Field(description="Найденная в источниках информация")
reasoning_steps: List[str] = Field(description="Шаги логических рассуждений")
conclusion: str = Field(description="Выводы на основе анализа")
class FinancialAnswer(BaseModel):
"""Структурированный ответ по финансовой отчетности"""
thinking: ThinkingProcess = Field(description="Процесс рассуждений")
answer: str = Field(description="Основной ответ на вопрос")
confidence: float = Field(description="Уверенность в ответе от 0 до 1")
sources: List[SourceInfo] = Field(description="Использованные источники")
key_metrics: Optional[Dict[str, Any]] = Field(description="Ключевые числовые показатели", default=None)
timestamp: str = Field(default_factory=lambda: datetime.now().isoformat())
class VectorRAGSystem:
"""RAG система с векторным поиском и резервным режимом"""
def __init__(self):
self.chunks = []
self.word_index = {}
self.faiss_index = None
self.metadata = {}
self.client = None
self.is_initialized = False
self.pdf_doc = None
self.page_texts = {} # Кеш текстов страниц
# Модели и параметры
self.embedding_model = "text-embedding-3-large"
self.embedding_dim = 3072
self.generation_model = "gpt-4o"
self.reranking_model = "gpt-4o-mini"
# Параметры поиска
self.max_chunks_for_rerank = 15
self.final_chunks_count = 5
self.vector_search_k = 20
# Режим работы
self.vector_mode = HAS_FAISS
def initialize_with_api_key(self, api_key: str) -> Tuple[str, str]:
"""Инициализация системы с API ключом"""
try:
if not api_key.strip():
return "❌ Введите OpenAI API ключ", ""
# Инициализация OpenAI клиента
self.client = OpenAI(api_key=api_key.strip())
# Загрузка данных
if not self.load_data():
return "❌ Ошибка загрузки данных", ""
self.is_initialized = True
stats = self._generate_stats()
return "✅ Векторная RAG система инициализирована", stats
except Exception as e:
return f"❌ Ошибка инициализации: {str(e)}", ""
def load_data(self) -> bool:
"""Загрузка векторных данных"""
try:
# Загружаем только векторные данные
if self.vector_mode and self.load_vector_data():
return True
print("❌ Векторные данные не найдены или не удалось загрузить")
return False
except Exception as e:
print(f"❌ Ошибка загрузки данных: {e}")
return False
def load_vector_data(self) -> bool:
"""Загрузка векторных данных"""
try:
print("🔄 Попытка загрузки векторных данных...")
# Файлы векторных данных
faiss_file = "chunks_flatip.faiss"
metadata_file = "metadata.json"
if not all(os.path.exists(f) for f in [faiss_file, metadata_file]):
print("📁 Файлы векторных данных не найдены")
return False
# Загружаем метаданные
with open(metadata_file, 'r', encoding='utf-8') as f:
metadata_list = json.load(f)
# Собираем self.chunks для всех типов элементов
self.chunks = []
for i, item in enumerate(metadata_list):
# унифицируем идентификатор чанка
chunk_id = item.get("chunk_id",
item.get("table_id",
item.get("img_id", None)))
self.chunks.append({
"page": item["page"],
"chunk_id": chunk_id,
"chunk_index": i,
"text": "", # подгрузим из PDF при выдаче
"metadata": {}
})
# Сохраняем общую статистику
self.metadata = {"total_chunks": len(self.chunks)}
# Загружаем FAISS-индекс
if HAS_FAISS:
self.faiss_index = faiss.read_index(faiss_file)
# Загружаем PDF для parent-page enrichment
pdf_path = "data/Сбер 2023.pdf"
if os.path.exists(pdf_path):
import fitz # PyMuPDF
self.pdf_doc = fitz.open(pdf_path)
print(f"✅ PDF загружен: {self.pdf_doc.page_count} страниц")
else:
print("❌ PDF файл не найден для parent-page enrichment")
self.pdf_doc = None
print(f"✅ Загружены векторные данные: {len(self.chunks)} чанков")
return True
except Exception as e:
print(f"❌ Ошибка загрузки векторных данных: {e}")
return False
def get_page_text(self, page_num: int) -> str:
"""Получение полного текста страницы с кешированием"""
if page_num in self.page_texts:
return self.page_texts[page_num]
try:
if not self.pdf_doc or page_num < 1 or page_num > self.pdf_doc.page_count:
return ""
page = self.pdf_doc[page_num - 1] # PyMuPDF использует 0-based индексы
text = page.get_text()
# Кешируем текст
self.page_texts[page_num] = text
return text
except Exception as e:
print(f"❌ Ошибка получения текста страницы {page_num}: {e}")
return ""
def _generate_stats(self) -> str:
"""Генерация статистики системы"""
total_chunks = len(self.chunks)
mode = "Векторный поиск" if self.vector_mode and self.faiss_index else "Базовый режим"
structured_output = "✅ Pydantic" if HAS_PYDANTIC else "❌ Недоступно"
pdf_enrichment = "✅ Активен" if self.pdf_doc else "❌ Недоступен"
stats = f"""🧠 **Advanced RAG система с Chain-of-Thought готова!**
📊 **Технические характеристики:**
- 📦 Векторных эмбеддингов: {total_chunks}
- 🔍 Режим поиска: {mode}
- 🧠 Модель генерации: {self.generation_model}
- 🎯 LLM реранкинг: {self.reranking_model}
- 📄 Parent-page enrichment: {pdf_enrichment}
- 📋 Структурированный вывод: {structured_output}
🚀 **Архитектурные особенности:**
- 📚 **Предобработка** PDF файла (текст и таблицы) через pdfplumber
- 🔎 **Векторный поиск** с text-embedding-3-large
- 📄 **Parent-page enrichment** через PyMuPDF
- 🧠 **LLM реранкинг** для повышения релевантности
- 🤔 **Chain-of-Thought** рассуждения
- 📋 **JSON Schema** для структурированных ответов
- 📊 **Confidence scoring** и детальная аналитика
💡 **Готова к интеллектуальному анализу отчета ПАО Сбербанк 2023!**"""
return stats
def search(self, query: str, k: int = 20) -> List[Tuple[Dict, float]]:
"""Основной метод поиска"""
if self.vector_mode and self.faiss_index and self.client:
return self.vector_search(query, k)
else:
print("⚠️ Векторный режим отключен")
return []
def vector_search(self, query: str, k: int = 20) -> List[Tuple[Dict, float]]:
"""Векторный поиск по запросу"""
if not self.faiss_index or not self.client:
print("⚠️ FAISS индекс или OpenAI клиент недоступны")
return []
try:
# Создаем эмбеддинг для запроса
response = self.client.embeddings.create(
model=self.embedding_model,
input=[query]
)
query_embedding = np.array(response.data[0].embedding, dtype=np.float32)
query_embedding = query_embedding.reshape(1, -1)
# Нормализуем для Inner Product
faiss.normalize_L2(query_embedding)
# Поиск в FAISS индексе
scores, indices = self.faiss_index.search(query_embedding, k)
# Формируем результаты с parent-page enrichment
results = []
for score, idx in zip(scores[0], indices[0]):
if 0 <= idx < len(self.chunks):
chunk = self.chunks[idx].copy()
# Получаем полный текст страницы для parent-page enrichment
page_text = self.get_page_text(chunk["page"])
chunk["text"] = page_text if page_text else chunk["text"]
results.append((chunk, float(score)))
return results
except Exception as e:
print(f"❌ Ошибка векторного поиска: {e}")
print("⚠️ Переход на поиск без векторов невозможен")
return []
def rerank_with_llm(self, query: str, chunks: List[Tuple[Dict, float]]) -> List[Tuple[Dict, float]]:
"""LLM реранкинг результатов"""
if not chunks or not self.client:
return chunks
try:
chunks_to_rerank = chunks[:self.max_chunks_for_rerank]
docs_text = ""
for i, (chunk, _) in enumerate(chunks_to_rerank):
preview = chunk['text'][:300] + "..." if len(chunk['text']) > 300 else chunk['text']
docs_text += f"\nДокумент {i+1} (стр. {chunk['page']}):\n{preview}\n"
prompt = f"""Оцени релевантность каждого документа для ответа на вопрос по шкале 1-10.
Вопрос: {query}
Документы:{docs_text}
Инструкции:
1. Оценивай точность и полноту информации для ответа
2. Высшие баллы (8-10) - прямой ответ на вопрос
3. Средние баллы (5-7) - частично релевантная информация
4. Низкие баллы (1-4) - слабо связано с вопросом
Верни только числа через запятую (например: 8,6,9,4,7):"""
response = self.client.chat.completions.create(
model=self.reranking_model,
messages=[{"role": "user", "content": prompt}],
max_tokens=100,
temperature=0
)
scores_text = response.choices[0].message.content.strip()
numbers = re.findall(r'\d+\.?\d*', scores_text)
scores = [max(0, min(10, float(num))) for num in numbers]
reranked = []
for i, (chunk, original_score) in enumerate(chunks):
rerank_score = scores[i] if i < len(scores) else 0
reranked.append((chunk, rerank_score))
reranked.sort(key=lambda x: x[1], reverse=True)
return reranked
except Exception as e:
print(f"❌ Ошибка реранкинга: {e}")
return chunks
def generate_answer(self, query: str, context_chunks: List[Tuple[Dict, float]]) -> str:
"""Генерация ответа с Chain-of-Thought и структурированным выводом"""
if not self.client:
return "❌ OpenAI API не настроен"
try:
# Подготавливаем контекст с метаинформацией
context_parts = []
sources_info = []
for i, (chunk, score) in enumerate(context_chunks[:self.final_chunks_count]):
text = chunk.get('text', '')
clean_text = text.encode('utf-8', errors='ignore').decode('utf-8')
# Ограничиваем длину для лучшей обработки
if len(clean_text) > 1500:
clean_text = clean_text[:1500] + "..."
context_parts.append(f"Источник {i+1} (страница {chunk['page']}):\n{clean_text}")
sources_info.append({
"page": chunk['page'],
"score": float(score),
"preview": clean_text[:200] + "..." if len(clean_text) > 200 else clean_text
})
context = "\n\n".join(context_parts)
clean_query = query.encode('utf-8', errors='ignore').decode('utf-8')
# Используем структурированный вывод, если доступен Pydantic
if HAS_PYDANTIC:
return self._generate_structured_answer(clean_query, context, sources_info)
else:
return self._generate_simple_answer(clean_query, context)
except Exception as e:
return f"❌ Ошибка генерации ответа: {str(e)}"
def _generate_structured_answer(self, query: str, context: str, sources_info: List[Dict]) -> str:
"""Генерация структурированного ответа с Chain-of-Thought"""
try:
# JSON Schema для принуждения к структуре
schema = FinancialAnswer.model_json_schema()
prompt = f"""Ты - эксперт по анализу финансовых отчетов ПАО Сбербанк. Проанализируй вопрос пользователя используя Chain-of-Thought рассуждения.
ВОПРОС: {query}
КОНТЕКСТ ИЗ ОТЧЕТА:
{context}
ИНСТРУКЦИИ ДЛЯ АНАЛИЗА:
1. МЫШЛЕНИЕ (thinking):
- Проанализируй, что именно спрашивает пользователь
- Определи, какая информация есть в предоставленных источниках
- Пройди через логические шаги рассуждений
- Сделай выводы на основе найденной информации
2. ОТВЕТ:
- Дай четкий и полный ответ на русском языке
- Используй конкретные данные и цифры из отчета
- Укажи номера страниц при цитировании
3. УВЕРЕННОСТЬ:
- Оцени от 0 до 1, насколько уверен в ответе
- Учитывай полноту и качество найденной информации
4. ИСТОЧНИКИ:
- Для каждого использованного источника укажи релевантность (0-1)
- Кратко опиши содержимое
Отвечай ТОЛЬКО в формате JSON согласно схеме. Все тексты на русском языке."""
response = self.client.chat.completions.create(
model=self.generation_model,
messages=[{"role": "user", "content": prompt}],
max_tokens=2000,
temperature=0.1,
response_format={"type": "json_object"}
)
json_response = response.choices[0].message.content.strip()
# Парсим и валидируем JSON
try:
parsed_response = json.loads(json_response)
validated_response = FinancialAnswer(**parsed_response)
return self._format_structured_response(validated_response)
except Exception as parse_error:
print(f"⚠️ Ошибка парсинга JSON: {parse_error}")
return self._generate_simple_answer(query, context)
except Exception as e:
print(f"⚠️ Ошибка структурированной генерации: {e}")
return self._generate_simple_answer(query, context)
def _generate_simple_answer(self, query: str, context: str) -> str:
"""Генерация простого ответа с Chain-of-Thought (fallback)"""
prompt = f"""Ты - эксперт по анализу финансовых отчетов. Ответь на вопрос используя Chain-of-Thought рассуждения.
ВОПРОС: {query}
КОНТЕКСТ ИЗ ОТЧЕТА:
{context}
ФОРМАТ ОТВЕТА:
🤔 **АНАЛИЗ ВОПРОСА:**
[Что именно спрашивает пользователь]
📊 **НАЙДЕННАЯ ИНФОРМАЦИЯ:**
[Какие данные есть в источниках]
🔍 **РАССУЖДЕНИЯ:**
[Логические шаги анализа]
✅ **ВЫВОДЫ:**
[Финальный ответ с конкретными данными]
ИНСТРУКЦИИ:
- Отвечай только на основе предоставленной информации
- Используй конкретные данные и цифры из отчета
- Указывай номера страниц при цитировании
- Отвечай на русском языке"""
response = self.client.chat.completions.create(
model=self.generation_model,
messages=[{"role": "user", "content": prompt}],
max_tokens=1500,
temperature=0.1
)
return response.choices[0].message.content.strip()
def _format_structured_response(self, response: 'FinancialAnswer') -> str:
"""Форматирование структурированного ответа для отображения"""
formatted = f"""🤔 **ПРОЦЕСС РАССУЖДЕНИЙ:**
📝 **Анализ вопроса:** {response.thinking.question_analysis}
📊 **Найденная информация:** {response.thinking.information_found}
🔍 **Шаги рассуждений:**
"""
for i, step in enumerate(response.thinking.reasoning_steps, 1):
formatted += f"{i}. {step}\n"
formatted += f"\n💡 **Выводы:** {response.thinking.conclusion}\n"
formatted += f"\n✅ **ФИНАЛЬНЫЙ ОТВЕТ:**\n{response.answer}\n"
formatted += f"\n📊 **Уверенность:** {response.confidence:.1%}\n"
if response.key_metrics:
formatted += f"\n📈 **Ключевые показатели:**\n"
for key, value in response.key_metrics.items():
formatted += f"- {key}: {value}\n"
formatted += f"\n📚 **Источники:**\n"
for i, source in enumerate(response.sources, 1):
formatted += f"{i}. Страница {source.page} (релевантность: {source.relevance_score:.1%})\n"
formatted += f" {source.content_preview}\n"
return formatted
def process_query(self, query: str) -> Dict[str, Any]:
"""Обработка пользовательского запроса"""
if not self.is_initialized:
return {
"answer": "❌ Система не инициализирована. Введите API ключ.",
"sources": [],
"debug_info": {}
}
if not query.strip():
return {
"answer": "Пожалуйста, введите ваш вопрос.",
"sources": [],
"debug_info": {}
}
try:
# Поиск
search_results = self.search(query, k=self.vector_search_k)
if not search_results:
return {
"answer": "К сожалению, не удалось найти релевантную информацию по вашему вопросу.",
"sources": [],
"debug_info": {"step": "search", "results_count": 0}
}
# Реранкинг
reranked_results = self.rerank_with_llm(query, search_results)
# Генерация ответа
answer = self.generate_answer(query, reranked_results)
# Подготовка источников
sources = []
# Создаем словарь для быстрого поиска search_score по chunk_index
search_scores = {}
for chunk, score in search_results:
search_scores[chunk.get("chunk_index", -1)] = score
for chunk, score in reranked_results[:self.final_chunks_count]:
sources.append({
"page": chunk["page"],
"search_score": search_scores.get(chunk.get("chunk_index", -1), 0),
"rerank_score": score,
"preview": chunk["text"][:200] + "..." if len(chunk["text"]) > 200 else chunk["text"]
})
debug_info = {
"search_results": len(search_results),
"reranked_results": len(reranked_results),
"final_chunks": len(sources),
"search_method": "vector" if self.vector_mode else "keyword"
}
return {
"answer": answer,
"sources": sources,
"debug_info": debug_info
}
except Exception as e:
print(f"❌ Ошибка обработки запроса: {e}")
traceback.print_exc()
return {
"answer": f"❌ Ошибка обработки запроса: {str(e)}",
"sources": [],
"debug_info": {"error": str(e)}
}
# Глобальная переменная системы
rag_system = VectorRAGSystem()
def initialize_system(api_key: str) -> Tuple[str, str]:
"""Инициализация системы"""
return rag_system.initialize_with_api_key(api_key)
def ask_question(question: str) -> Tuple[str, str]:
"""Обработка вопроса"""
result = rag_system.process_query(question)
answer = result["answer"]
# Форматируем информацию об источниках
sources_info = ""
if result["sources"]:
sources_info = "\n📚 **Источники:**\n"
for i, source in enumerate(result["sources"], 1):
sources_info += f"\n**{i}.** Страница {source['page']} "
sources_info += f"(поиск: {source['search_score']:.3f}, "
sources_info += f"релевантность: {source['rerank_score']:.1f}/10)\n"
sources_info += f"*Превью:* {source['preview']}\n"
# Добавляем отладочную информацию
if result.get("debug_info"):
debug = result["debug_info"]
sources_info += f"\n🔍 **Статистика поиска:**\n"
sources_info += f"- Метод поиска: {debug.get('search_method', 'unknown')}\n"
sources_info += f"- Найдено результатов: {debug.get('search_results', 0)}\n"
sources_info += f"- После реранкинга: {debug.get('reranked_results', 0)}\n"
sources_info += f"- Использовано в ответе: {debug.get('final_chunks', 0)}\n"
return answer, sources_info
def create_demo_interface():
"""Создание демо интерфейса"""
if not HAS_GRADIO:
print("❌ Gradio не установлен. Установите: pip install gradio")
return None
with gr.Blocks(
title="Vector RAG Demo - Сбер 2023",
theme=gr.themes.Soft(),
css="""
.main-header { text-align: center; margin-bottom: 2rem; }
.feature-box { background-color: #f8f9fa; padding: 1rem; border-radius: 8px; margin: 1rem 0; }
"""
) as demo:
gr.Markdown("""
<div class="main-header">
<h1>🧠 Advanced RAG with Chain-of-Thought: Анализ отчета Сбера 2023</h1>
<p>Интеллектуальная система с векторным поиском, LLM реранкингом и структурированными рассуждениями</p>
<p><strong>text-embedding-3-large • FAISS • GPT-4o • JSON Schema • Chain-of-Thought • Parent-page enrichment</strong></p>
</div>
""")
with gr.Row():
with gr.Column(scale=1):
gr.Markdown("### ⚙️ Настройка")
api_key_input = gr.Textbox(
label="OpenAI API Key",
placeholder="sk-proj-...",
type="password",
info="Введите ваш OpenAI API ключ"
)
init_btn = gr.Button("🚀 Инициализировать", variant="primary")
status_output = gr.Textbox(
label="Статус",
interactive=False,
lines=2
)
with gr.Column(scale=1):
stats_output = gr.Markdown("### 📊 Ожидание инициализации...")
gr.Markdown("### 💬 Задайте вопрос")
with gr.Row():
question_input = gr.Textbox(
label="Ваш вопрос",
placeholder="Например: Каковы основные финансовые показатели Сбера за 2023 год?",
lines=2,
scale=4
)
ask_btn = gr.Button("🔍 Поиск", variant="primary", scale=1)
with gr.Row():
with gr.Column(scale=2):
answer_output = gr.Textbox(
label="Ответ системы",
lines=12,
interactive=False
)
with gr.Column(scale=1):
sources_output = gr.Textbox(
label="Источники и статистика",
lines=12,
interactive=False
)
# Примеры вопросов
gr.Markdown("""
### 💡 Примеры вопросов:
- Каковы основные финансовые показатели Сбера за 2023 год?
- Какова чистая прибыль банка в 2023 году?
- Расскажите о кредитном портфеле Сбербанка
- Какие технологические инициативы развивает Сбер?
- Каковы показатели рентабельности банка?
- Какие ESG инициативы реализует Сбер?
""")
# Event handlers
init_btn.click(
fn=initialize_system,
inputs=[api_key_input],
outputs=[status_output, stats_output]
)
ask_btn.click(
fn=ask_question,
inputs=[question_input],
outputs=[answer_output, sources_output]
)
question_input.submit(
fn=ask_question,
inputs=[question_input],
outputs=[answer_output, sources_output]
)
return demo
# Запуск для Hugging Face Spaces
demo = create_demo_interface()
if __name__ == "__main__":
if demo:
demo.launch()
else:
print("❌ Не удалось создать интерфейс")