RAG_AIEXP_01 / utils.py
MrSimple07's picture
Merge branch 'main' of https://huggingface.co/spaces/MrSimple01/RAG_AIEXP_01
19e03d0
raw
history blame
26.5 kB
import logging
import sys
from llama_index.llms.google_genai import GoogleGenAI
from llama_index.llms.openai import OpenAI
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from sentence_transformers import CrossEncoder
from config import AVAILABLE_MODELS, DEFAULT_MODEL, GOOGLE_API_KEY
import time
from index_retriever import rerank_nodes
from my_logging import log_message
from config import PROMPT_SIMPLE_POISK
def get_llm_model(model_name):
try:
model_config = AVAILABLE_MODELS.get(model_name)
if not model_config:
log_message(f"Модель {model_name} не найдена, использую модель по умолчанию")
model_config = AVAILABLE_MODELS[DEFAULT_MODEL]
if not model_config.get("api_key"):
raise Exception(f"API ключ не найден для модели {model_name}")
if model_config["provider"] == "google":
return GoogleGenAI(
model=model_config["model_name"],
api_key=model_config["api_key"]
)
elif model_config["provider"] == "openai":
return OpenAI(
model=model_config["model_name"],
api_key=model_config["api_key"]
)
else:
raise Exception(f"Неподдерживаемый провайдер: {model_config['provider']}")
except Exception as e:
log_message(f"Ошибка создания модели {model_name}: {str(e)}")
return GoogleGenAI(model="gemini-2.0-flash", api_key=GOOGLE_API_KEY)
def get_embedding_model(model_name="sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2"):
return HuggingFaceEmbedding(model_name=model_name)
def get_reranker_model(model_name='cross-encoder/ms-marco-MiniLM-L-12-v2'):
return CrossEncoder(model_name)
def format_context_for_llm(nodes):
context_parts = []
for node in nodes:
metadata = node.metadata if hasattr(node, 'metadata') else {}
doc_id = metadata.get('document_id', 'Неизвестный документ')
section_info = ""
if metadata.get('section_path'):
section_path = metadata['section_path']
section_text = metadata.get('section_text', '')
parent_section = metadata.get('parent_section', '')
parent_title = metadata.get('parent_title', '')
if metadata.get('level') in ['subsection', 'sub_subsection', 'sub_sub_subsection'] and parent_section and parent_title:
section_info = f"пункт {section_path} ({section_text}) в разделе {parent_section} ({parent_title})"
elif section_text:
section_info = f"пункт {section_path} ({section_text})"
else:
section_info = f"пункт {section_path}"
elif metadata.get('section_id'):
section_id = metadata['section_id']
section_text = metadata.get('section_text', '')
if section_text:
section_info = f"пункт {section_id} ({section_text})"
else:
section_info = f"пункт {section_id}"
if metadata.get('type') == 'table' and metadata.get('table_number'):
table_num = metadata['table_number']
if not str(table_num).startswith('№'):
table_num = f"№{table_num}"
section_info = f"таблица {table_num}"
if metadata.get('type') == 'image' and metadata.get('image_number'):
image_num = metadata['image_number']
if not str(image_num).startswith('№'):
image_num = f"№{image_num}"
section_info = f"рисунок {image_num}"
context_text = node.text if hasattr(node, 'text') else str(node)
if section_info:
formatted_context = f"[ИСТОЧНИК: {section_info} документа {doc_id}]\n{context_text}\n"
else:
formatted_context = f"[ИСТОЧНИК: документ {doc_id}]\n{context_text}\n"
context_parts.append(formatted_context)
return "\n".join(context_parts)
def generate_sources_html(nodes, chunks_df=None):
html = "<div style='background-color: #2d3748; color: white; padding: 20px; border-radius: 10px; max-height: 400px; overflow-y: auto;'>"
html += "<h3 style='color: #63b3ed; margin-top: 0;'>Источники:</h3>"
# Group nodes by document to avoid duplicates
sources_by_doc = {}
for i, node in enumerate(nodes):
metadata = node.metadata if hasattr(node, 'metadata') else {}
doc_type = metadata.get('type', 'text')
doc_id = metadata.get('document_id', 'unknown')
section_id = metadata.get('section_id', '')
section_text = metadata.get('section_text', '')
section_path = metadata.get('section_path', '')
# Create a unique key for grouping
if doc_type == 'table':
table_num = metadata.get('table_number', 'unknown')
key = f"{doc_id}_table_{table_num}"
elif doc_type == 'image':
image_num = metadata.get('image_number', 'unknown')
key = f"{doc_id}_image_{image_num}"
else:
# For text documents, group by section path or section id
section_key = section_path if section_path else section_id
key = f"{doc_id}_text_{section_key}"
if key not in sources_by_doc:
sources_by_doc[key] = {
'doc_id': doc_id,
'doc_type': doc_type,
'metadata': metadata,
'sections': set()
}
# Add section information
if section_path:
sources_by_doc[key]['sections'].add(f"пункт {section_path}")
elif section_id and section_id != 'unknown':
sources_by_doc[key]['sections'].add(f"пункт {section_id}")
# Generate HTML for each unique source
for source_info in sources_by_doc.values():
metadata = source_info['metadata']
doc_type = source_info['doc_type']
doc_id = source_info['doc_id']
html += f"<div style='margin-bottom: 15px; padding: 15px; border: 1px solid #4a5568; border-radius: 8px; background-color: #1a202c;'>"
if doc_type == 'text':
html += f"<h4 style='margin: 0 0 10px 0; color: #63b3ed;'>📄 {doc_id}</h4>"
elif doc_type == 'table' or doc_type == 'table_row':
table_num = metadata.get('table_number', 'unknown')
table_title = metadata.get('table_title', '')
if table_num and table_num != 'unknown':
if not str(table_num).startswith('№'):
table_num = f"№{table_num}"
html += f"<h4 style='margin: 0 0 10px 0; color: #68d391;'>📊 Таблица {table_num} - {doc_id}</h4>"
if table_title and table_title != 'unknown':
html += f"<p style='margin: 5px 0; color: #a0aec0; font-size: 14px;'>{table_title}</p>"
else:
html += f"<h4 style='margin: 0 0 10px 0; color: #68d391;'>📊 Таблица - {doc_id}</h4>"
elif doc_type == 'image':
image_num = metadata.get('image_number', 'unknown')
image_title = metadata.get('image_title', '')
section = metadata.get('section', '')
if image_num and image_num != 'unknown':
if not str(image_num).startswith('№'):
image_num = f"№{image_num}"
html += f"<h4 style='margin: 0 0 10px 0; color: #fbb6ce;'>🖼️ Изображение {image_num} - {doc_id}</h4>"
if image_title and image_title != 'unknown':
html += f"<p style='margin: 5px 0; color: #a0aec0; font-size: 14px;'>{image_title}</p>"
if section and section != 'unknown':
html += f"<p style='margin: 5px 0; color: #a0aec0; font-size: 12px;'>Раздел: {section}</p>"
else:
html += f"<h4 style='margin: 0 0 10px 0; color: #fbb6ce;'>🖼️ Изображение - {doc_id}</h4>"
# Add file link if available
if chunks_df is not None and 'file_link' in chunks_df.columns and doc_type == 'text':
doc_rows = chunks_df[chunks_df['document_id'] == doc_id]
if not doc_rows.empty:
file_link = doc_rows.iloc[0]['file_link']
html += f"<a href='{file_link}' target='_blank' style='color: #68d391; text-decoration: none; font-size: 14px; display: inline-block; margin-top: 10px;'>🔗 Ссылка на документ</a><br>"
html += "</div>"
html += "</div>"
return html
def answer_question(question, query_engine, reranker, current_model, chunks_df=None):
if query_engine is None:
return "<div style='background-color: #e53e3e; color: white; padding: 20px; border-radius: 10px;'>Система не инициализирована</div>", ""
try:
log_message(f"Получен вопрос: {question}")
start_time = time.time()
# Извлечение узлов
retrieved_nodes = query_engine.retriever.retrieve(question)
log_message(f"Извлечено {len(retrieved_nodes)} узлов")
# ДЕТАЛЬНОЕ ЛОГИРОВАНИЕ ИСТОЧНИКОВ
log_message("=== ДЕТАЛЬНАЯ ИНФОРМАЦИЯ О НАЙДЕННЫХ УЗЛАХ ===")
for i, node in enumerate(retrieved_nodes):
log_message(f"Узел {i+1}:")
log_message(f" Документ: {node.metadata.get('document_id', 'unknown')}")
log_message(f" Тип: {node.metadata.get('type', 'unknown')}")
log_message(f" Раздел: {node.metadata.get('section_id', 'unknown')}")
log_message(f" Текст (первые 200 символов): {node.text[:200]}...")
log_message(f" Метаданные: {node.metadata}")
# Переранжировка
reranked_nodes = rerank_nodes(question, retrieved_nodes, reranker, top_k=10)
log_message("=== УЗЛЫ ПОСЛЕ ПЕРЕРАНЖИРОВКИ ===")
for i, node in enumerate(reranked_nodes):
log_message(f"Переранжированный узел {i+1}:")
log_message(f" Документ: {node.metadata.get('document_id', 'unknown')}")
log_message(f" Тип: {node.metadata.get('type', 'unknown')}")
log_message(f" Раздел: {node.metadata.get('section_id', 'unknown')}")
log_message(f" Полный текст: {node.text}")
formatted_context = format_context_for_llm(reranked_nodes)
log_message(f"ПОЛНЫЙ КОНТЕКСТ ДЛЯ LLM:\n{formatted_context}")
enhanced_question = f"""
Контекст из базы данных:
{formatted_context}
Вопрос пользователя: {question}"""
response = query_engine.query(enhanced_question)
log_message(f"ОТВЕТ LLM: {response.response}")
end_time = time.time()
processing_time = end_time - start_time
log_message(f"Обработка завершена за {processing_time:.2f} секунд")
sources_html = generate_sources_html(reranked_nodes, chunks_df)
answer_with_time = f"""<div style='background-color: #2d3748; color: white; padding: 20px; border-radius: 10px; margin-bottom: 10px;'>
<h3 style='color: #63b3ed; margin-top: 0;'>Ответ (Модель: {current_model}):</h3>
<div style='line-height: 1.6; font-size: 16px;'>{response.response}</div>
<div style='margin-top: 15px; padding-top: 10px; border-top: 1px solid #4a5568; font-size: 14px; color: #a0aec0;'>
Время обработки: {processing_time:.2f} секунд
</div>
</div>"""
chunk_info = []
for node in reranked_nodes:
section_id = node.metadata.get('section_id', node.metadata.get('section', 'unknown'))
chunk_info.append({
'document_id': node.metadata.get('document_id', 'unknown'),
'section_id': section_id,
'chunk_size': len(node.text),
'chunk_text': node.text
})
from app import create_chunks_display_html
chunks_html = create_chunks_display_html(chunk_info)
return answer_with_time, sources_html, chunks_html
except Exception as e:
log_message(f"Ошибка обработки вопроса: {str(e)}")
error_msg = f"<div style='background-color: #e53e3e; color: white; padding: 20px; border-radius: 10px;'>Ошибка обработки вопроса: {str(e)}</div>"
return error_msg, ""
import logging
import sys
from llama_index.llms.google_genai import GoogleGenAI
from llama_index.llms.openai import OpenAI
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from sentence_transformers import CrossEncoder
from config import AVAILABLE_MODELS, DEFAULT_MODEL, GOOGLE_API_KEY
import time
from index_retriever import rerank_nodes
from my_logging import log_message
from config import PROMPT_SIMPLE_POISK
def get_llm_model(model_name):
try:
model_config = AVAILABLE_MODELS.get(model_name)
if not model_config:
log_message(f"Модель {model_name} не найдена, использую модель по умолчанию")
model_config = AVAILABLE_MODELS[DEFAULT_MODEL]
if not model_config.get("api_key"):
raise Exception(f"API ключ не найден для модели {model_name}")
if model_config["provider"] == "google":
return GoogleGenAI(
model=model_config["model_name"],
api_key=model_config["api_key"]
)
elif model_config["provider"] == "openai":
return OpenAI(
model=model_config["model_name"],
api_key=model_config["api_key"]
)
else:
raise Exception(f"Неподдерживаемый провайдер: {model_config['provider']}")
except Exception as e:
log_message(f"Ошибка создания модели {model_name}: {str(e)}")
return GoogleGenAI(model="gemini-2.0-flash", api_key=GOOGLE_API_KEY)
def get_embedding_model(model_name="sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2"):
return HuggingFaceEmbedding(model_name=model_name)
def get_reranker_model(model_name='cross-encoder/ms-marco-MiniLM-L-12-v2'):
return CrossEncoder(model_name)
def format_context_for_llm(nodes):
context_parts = []
for node in nodes:
metadata = node.metadata if hasattr(node, 'metadata') else {}
doc_id = metadata.get('document_id', 'Неизвестный документ')
section_info = ""
if metadata.get('section_path'):
section_path = metadata['section_path']
section_text = metadata.get('section_text', '')
parent_section = metadata.get('parent_section', '')
parent_title = metadata.get('parent_title', '')
if metadata.get('level') in ['subsection', 'sub_subsection', 'sub_sub_subsection'] and parent_section and parent_title:
section_info = f"пункт {section_path} ({section_text}) в разделе {parent_section} ({parent_title})"
elif section_text:
section_info = f"пункт {section_path} ({section_text})"
else:
section_info = f"пункт {section_path}"
elif metadata.get('section_id'):
section_id = metadata['section_id']
section_text = metadata.get('section_text', '')
if section_text:
section_info = f"пункт {section_id} ({section_text})"
else:
section_info = f"пункт {section_id}"
if metadata.get('type') == 'table' and metadata.get('table_number'):
table_num = metadata['table_number']
if not str(table_num).startswith('№'):
table_num = f"№{table_num}"
section_info = f"таблица {table_num}"
if metadata.get('type') == 'image' and metadata.get('image_number'):
image_num = metadata['image_number']
if not str(image_num).startswith('№'):
image_num = f"№{image_num}"
section_info = f"рисунок {image_num}"
context_text = node.text if hasattr(node, 'text') else str(node)
if section_info:
formatted_context = f"[ИСТОЧНИК: {section_info} документа {doc_id}]\n{context_text}\n"
else:
formatted_context = f"[ИСТОЧНИК: документ {doc_id}]\n{context_text}\n"
context_parts.append(formatted_context)
return "\n".join(context_parts)
def generate_sources_html(nodes, chunks_df=None):
html = "<div style='background-color: #2d3748; color: white; padding: 20px; border-radius: 10px; max-height: 400px; overflow-y: auto;'>"
html += "<h3 style='color: #63b3ed; margin-top: 0;'>Источники:</h3>"
# Group nodes by document to avoid duplicates
sources_by_doc = {}
for i, node in enumerate(nodes):
metadata = node.metadata if hasattr(node, 'metadata') else {}
doc_type = metadata.get('type', 'text')
doc_id = metadata.get('document_id', 'unknown')
section_id = metadata.get('section_id', '')
section_text = metadata.get('section_text', '')
section_path = metadata.get('section_path', '')
# Create a unique key for grouping
if doc_type == 'table':
table_num = metadata.get('table_number', 'unknown')
key = f"{doc_id}_table_{table_num}"
elif doc_type == 'image':
image_num = metadata.get('image_number', 'unknown')
key = f"{doc_id}_image_{image_num}"
else:
# For text documents, group by section path or section id
section_key = section_path if section_path else section_id
key = f"{doc_id}_text_{section_key}"
if key not in sources_by_doc:
sources_by_doc[key] = {
'doc_id': doc_id,
'doc_type': doc_type,
'metadata': metadata,
'sections': set()
}
# Add section information
if section_path:
sources_by_doc[key]['sections'].add(f"пункт {section_path}")
elif section_id and section_id != 'unknown':
sources_by_doc[key]['sections'].add(f"пункт {section_id}")
# Generate HTML for each unique source
for source_info in sources_by_doc.values():
metadata = source_info['metadata']
doc_type = source_info['doc_type']
doc_id = source_info['doc_id']
html += f"<div style='margin-bottom: 15px; padding: 15px; border: 1px solid #4a5568; border-radius: 8px; background-color: #1a202c;'>"
if doc_type == 'text':
html += f"<h4 style='margin: 0 0 10px 0; color: #63b3ed;'>📄 {doc_id}</h4>"
# Show all sections for this document
if source_info['sections']:
sections_text = ", ".join(sorted(source_info['sections']))
html += f"<p style='margin: 5px 0; color: #a0aec0; font-size: 14px;'>{sections_text}</p>"
elif doc_type == 'table' or doc_type == 'table_row':
table_num = metadata.get('table_number', 'unknown')
table_title = metadata.get('table_title', '')
if table_num and table_num != 'unknown':
if not str(table_num).startswith('№'):
table_num = f"№{table_num}"
html += f"<h4 style='margin: 0 0 10px 0; color: #68d391;'>📊 Таблица {table_num} - {doc_id}</h4>"
if table_title and table_title != 'unknown':
html += f"<p style='margin: 5px 0; color: #a0aec0; font-size: 14px;'>{table_title}</p>"
else:
html += f"<h4 style='margin: 0 0 10px 0; color: #68d391;'>📊 Таблица - {doc_id}</h4>"
elif doc_type == 'image':
image_num = metadata.get('image_number', 'unknown')
image_title = metadata.get('image_title', '')
section = metadata.get('section', '')
if image_num and image_num != 'unknown':
if not str(image_num).startswith('№'):
image_num = f"№{image_num}"
html += f"<h4 style='margin: 0 0 10px 0; color: #fbb6ce;'>🖼️ Изображение {image_num} - {doc_id}</h4>"
if image_title and image_title != 'unknown':
html += f"<p style='margin: 5px 0; color: #a0aec0; font-size: 14px;'>{image_title}</p>"
if section and section != 'unknown':
html += f"<p style='margin: 5px 0; color: #a0aec0; font-size: 12px;'>Раздел: {section}</p>"
else:
html += f"<h4 style='margin: 0 0 10px 0; color: #fbb6ce;'>🖼️ Изображение - {doc_id}</h4>"
# Add file link if available
if chunks_df is not None and 'file_link' in chunks_df.columns and doc_type == 'text':
doc_rows = chunks_df[chunks_df['document_id'] == doc_id]
if not doc_rows.empty:
file_link = doc_rows.iloc[0]['file_link']
html += f"<a href='{file_link}' target='_blank' style='color: #68d391; text-decoration: none; font-size: 14px; display: inline-block; margin-top: 10px;'>🔗 Ссылка на документ</a><br>"
html += "</div>"
html += "</div>"
return html
def answer_question(question, query_engine, reranker, current_model, chunks_df=None):
if query_engine is None:
return "<div style='background-color: #e53e3e; color: white; padding: 20px; border-radius: 10px;'>Система не инициализирована</div>", ""
try:
log_message(f"Получен вопрос: {question}")
start_time = time.time()
# Извлечение узлов
retrieved_nodes = query_engine.retriever.retrieve(question)
log_message(f"Извлечено {len(retrieved_nodes)} узлов")
# ДЕТАЛЬНОЕ ЛОГИРОВАНИЕ ИСТОЧНИКОВ
log_message("=== ДЕТАЛЬНАЯ ИНФОРМАЦИЯ О НАЙДЕННЫХ УЗЛАХ ===")
for i, node in enumerate(retrieved_nodes):
log_message(f"Узел {i+1}:")
log_message(f" Документ: {node.metadata.get('document_id', 'unknown')}")
log_message(f" Тип: {node.metadata.get('type', 'unknown')}")
log_message(f" Раздел: {node.metadata.get('section_id', 'unknown')}")
log_message(f" Текст (первые 200 символов): {node.text[:200]}...")
log_message(f" Метаданные: {node.metadata}")
# Переранжировка
reranked_nodes = rerank_nodes(question, retrieved_nodes, reranker, top_k=10)
log_message("=== УЗЛЫ ПОСЛЕ ПЕРЕРАНЖИРОВКИ ===")
for i, node in enumerate(reranked_nodes):
log_message(f"Переранжированный узел {i+1}:")
log_message(f" Документ: {node.metadata.get('document_id', 'unknown')}")
log_message(f" Тип: {node.metadata.get('type', 'unknown')}")
log_message(f" Раздел: {node.metadata.get('section_id', 'unknown')}")
log_message(f" Полный текст: {node.text}")
formatted_context = format_context_for_llm(reranked_nodes)
log_message(f"ПОЛНЫЙ КОНТЕКСТ ДЛЯ LLM:\n{formatted_context}")
enhanced_question = f"""
Контекст из базы данных:
{formatted_context}
Вопрос пользователя: {question}"""
response = query_engine.query(enhanced_question)
log_message(f"ОТВЕТ LLM: {response.response}")
end_time = time.time()
processing_time = end_time - start_time
log_message(f"Обработка завершена за {processing_time:.2f} секунд")
sources_html = generate_sources_html(reranked_nodes, chunks_df)
answer_with_time = f"""<div style='background-color: #2d3748; color: white; padding: 20px; border-radius: 10px; margin-bottom: 10px;'>
<h3 style='color: #63b3ed; margin-top: 0;'>Ответ (Модель: {current_model}):</h3>
<div style='line-height: 1.6; font-size: 16px;'>{response.response}</div>
<div style='margin-top: 15px; padding-top: 10px; border-top: 1px solid #4a5568; font-size: 14px; color: #a0aec0;'>
Время обработки: {processing_time:.2f} секунд
</div>
</div>"""
chunk_info = []
for node in reranked_nodes:
section_id = node.metadata.get('section_id', node.metadata.get('section', 'unknown'))
chunk_info.append({
'document_id': node.metadata.get('document_id', 'unknown'),
'section_id': section_id,
'chunk_size': len(node.text),
'chunk_text': node.text
})
from app import create_chunks_display_html
chunks_html = create_chunks_display_html(chunk_info)
return answer_with_time, sources_html, chunks_html
except Exception as e:
log_message(f"Ошибка обработки вопроса: {str(e)}")
error_msg = f"<div style='background-color: #e53e3e; color: white; padding: 20px; border-radius: 10px;'>Ошибка обработки вопроса: {str(e)}</div>"
return error_msg, ""