"""RAG helper module: LLM/embedding/reranker factories, LLM-context
formatting, source-citation HTML rendering, and the main
question-answering pipeline.

NOTE(review): the original file contained two near-identical copies of
this module pasted back to back, and all HTML tags inside string
literals had been stripped by a text-extraction step (leaving broken
string literals). This is a single, deduplicated reconstruction. The
HTML tags used below are plausible replacements for the lost markup —
confirm them against the original templates. The version of
``generate_sources_html`` that also renders the collected section list
(present only in the second copy) was kept, as the first copy collected
``sections`` but never displayed them.
"""

import logging
import sys
import time

from llama_index.llms.google_genai import GoogleGenAI
from llama_index.llms.openai import OpenAI
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from sentence_transformers import CrossEncoder

from config import AVAILABLE_MODELS, DEFAULT_MODEL, GOOGLE_API_KEY, PROMPT_SIMPLE_POISK
from index_retriever import rerank_nodes
from my_logging import log_message


def get_llm_model(model_name):
    """Return an LLM client for ``model_name`` from ``AVAILABLE_MODELS``.

    Falls back to ``DEFAULT_MODEL`` when the name is unknown, and to a
    hard-coded Gemini client when anything else goes wrong (missing API
    key, unsupported provider, client construction error), so the caller
    always receives a usable LLM object.
    """
    try:
        model_config = AVAILABLE_MODELS.get(model_name)
        if not model_config:
            log_message(f"Модель {model_name} не найдена, использую модель по умолчанию")
            model_config = AVAILABLE_MODELS[DEFAULT_MODEL]

        if not model_config.get("api_key"):
            raise Exception(f"API ключ не найден для модели {model_name}")

        if model_config["provider"] == "google":
            return GoogleGenAI(
                model=model_config["model_name"],
                api_key=model_config["api_key"],
            )
        elif model_config["provider"] == "openai":
            return OpenAI(
                model=model_config["model_name"],
                api_key=model_config["api_key"],
            )
        else:
            raise Exception(f"Неподдерживаемый провайдер: {model_config['provider']}")
    except Exception as e:
        log_message(f"Ошибка создания модели {model_name}: {str(e)}")
        # Last-resort fallback: never propagate, always hand back a client.
        return GoogleGenAI(model="gemini-2.0-flash", api_key=GOOGLE_API_KEY)


def get_embedding_model(model_name="sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2"):
    """Return a HuggingFace embedding model (multilingual MiniLM by default)."""
    return HuggingFaceEmbedding(model_name=model_name)


def get_reranker_model(model_name='cross-encoder/ms-marco-MiniLM-L-12-v2'):
    """Return a cross-encoder reranker model."""
    return CrossEncoder(model_name)


def format_context_for_llm(nodes):
    """Format retrieved nodes into a plain-text context block for the LLM.

    Each node is rendered as ``[ИСТОЧНИК: ...]`` header (built from its
    metadata: section path/id, table or image number) followed by the
    node text. Blocks are joined with blank lines.
    """
    context_parts = []
    for node in nodes:
        metadata = node.metadata if hasattr(node, 'metadata') else {}
        doc_id = metadata.get('document_id', 'Неизвестный документ')

        section_info = ""
        if metadata.get('section_path'):
            section_path = metadata['section_path']
            section_text = metadata.get('section_text', '')
            parent_section = metadata.get('parent_section', '')
            parent_title = metadata.get('parent_title', '')
            # Deep levels get their parent section mentioned for context.
            if metadata.get('level') in ['subsection', 'sub_subsection', 'sub_sub_subsection'] and parent_section and parent_title:
                section_info = f"пункт {section_path} ({section_text}) в разделе {parent_section} ({parent_title})"
            elif section_text:
                section_info = f"пункт {section_path} ({section_text})"
            else:
                section_info = f"пункт {section_path}"
        elif metadata.get('section_id'):
            section_id = metadata['section_id']
            section_text = metadata.get('section_text', '')
            if section_text:
                section_info = f"пункт {section_id} ({section_text})"
            else:
                section_info = f"пункт {section_id}"

        # Tables and images override the section label entirely.
        if metadata.get('type') == 'table' and metadata.get('table_number'):
            table_num = metadata['table_number']
            if not str(table_num).startswith('№'):
                table_num = f"№{table_num}"
            section_info = f"таблица {table_num}"

        if metadata.get('type') == 'image' and metadata.get('image_number'):
            image_num = metadata['image_number']
            if not str(image_num).startswith('№'):
                image_num = f"№{image_num}"
            section_info = f"рисунок {image_num}"

        context_text = node.text if hasattr(node, 'text') else str(node)
        if section_info:
            formatted_context = f"[ИСТОЧНИК: {section_info} документа {doc_id}]\n{context_text}\n"
        else:
            formatted_context = f"[ИСТОЧНИК: документ {doc_id}]\n{context_text}\n"
        context_parts.append(formatted_context)

    return "\n".join(context_parts)


def generate_sources_html(nodes, chunks_df=None):
    """Render an HTML "Источники" panel for the retrieved nodes.

    Nodes are grouped (deduplicated) per document/table/image; text
    sources also show the set of cited sections and, when ``chunks_df``
    provides a ``file_link`` column, a link to the source document.

    NOTE(review): the exact HTML markup was lost in extraction; tags
    below are a minimal reconstruction — confirm against the original.
    """
    html = "<div class='sources'>"
    html += "<div><b>Источники:</b></div>"

    # Group nodes by document to avoid duplicates.
    sources_by_doc = {}
    for i, node in enumerate(nodes):
        metadata = node.metadata if hasattr(node, 'metadata') else {}
        doc_type = metadata.get('type', 'text')
        doc_id = metadata.get('document_id', 'unknown')
        section_id = metadata.get('section_id', '')
        section_text = metadata.get('section_text', '')
        section_path = metadata.get('section_path', '')

        # Create a unique key for grouping.
        if doc_type == 'table':
            table_num = metadata.get('table_number', 'unknown')
            key = f"{doc_id}_table_{table_num}"
        elif doc_type == 'image':
            image_num = metadata.get('image_number', 'unknown')
            key = f"{doc_id}_image_{image_num}"
        else:
            # For text documents, group by section path or section id.
            section_key = section_path if section_path else section_id
            key = f"{doc_id}_text_{section_key}"

        if key not in sources_by_doc:
            sources_by_doc[key] = {
                'doc_id': doc_id,
                'doc_type': doc_type,
                'metadata': metadata,
                'sections': set()
            }

        # Add section information.
        if section_path:
            sources_by_doc[key]['sections'].add(f"пункт {section_path}")
        elif section_id and section_id != 'unknown':
            sources_by_doc[key]['sections'].add(f"пункт {section_id}")

    # Generate HTML for each unique source.
    for source_info in sources_by_doc.values():
        metadata = source_info['metadata']
        doc_type = source_info['doc_type']
        doc_id = source_info['doc_id']

        html += "<div class='source-item'>"
        if doc_type == 'text':
            html += f"<div><b>📄 {doc_id}</b></div>"
            # Show all sections for this document.
            if source_info['sections']:
                sections_text = ", ".join(sorted(source_info['sections']))
                html += f"<div><i>{sections_text}</i></div>"
        elif doc_type == 'table' or doc_type == 'table_row':
            table_num = metadata.get('table_number', 'unknown')
            table_title = metadata.get('table_title', '')
            if table_num and table_num != 'unknown':
                if not str(table_num).startswith('№'):
                    table_num = f"№{table_num}"
                html += f"<div><b>📊 Таблица {table_num} - {doc_id}</b></div>"
                if table_title and table_title != 'unknown':
                    html += f"<div><i>{table_title}</i></div>"
            else:
                html += f"<div><b>📊 Таблица - {doc_id}</b></div>"
        elif doc_type == 'image':
            image_num = metadata.get('image_number', 'unknown')
            image_title = metadata.get('image_title', '')
            section = metadata.get('section', '')
            if image_num and image_num != 'unknown':
                if not str(image_num).startswith('№'):
                    image_num = f"№{image_num}"
                html += f"<div><b>🖼️ Изображение {image_num} - {doc_id}</b></div>"
                if image_title and image_title != 'unknown':
                    html += f"<div><i>{image_title}</i></div>"
                if section and section != 'unknown':
                    html += f"<div><i>Раздел: {section}</i></div>"
            else:
                html += f"<div><b>🖼️ Изображение - {doc_id}</b></div>"

        # Add file link if available.
        if chunks_df is not None and 'file_link' in chunks_df.columns and doc_type == 'text':
            doc_rows = chunks_df[chunks_df['document_id'] == doc_id]
            if not doc_rows.empty:
                file_link = doc_rows.iloc[0]['file_link']
                html += f"<a href='{file_link}' target='_blank'>🔗 Ссылка на документ</a>"

        html += "</div>"

    html += "</div>"
    return html


def answer_question(question, query_engine, reranker, current_model, chunks_df=None):
    """Run the full RAG pipeline for ``question``.

    Retrieves nodes, reranks them, builds the LLM context, queries the
    engine, and returns a 3-tuple ``(answer_html, sources_html,
    chunks_html)``.

    BUGFIX(review): the original returned only 2 values on both error
    paths while returning 3 on success, which would crash any caller
    unpacking three values; all paths now return 3 values.
    """
    if query_engine is None:
        return "Система не инициализирована", "", ""
    try:
        log_message(f"Получен вопрос: {question}")
        start_time = time.time()

        # Извлечение узлов
        retrieved_nodes = query_engine.retriever.retrieve(question)
        log_message(f"Извлечено {len(retrieved_nodes)} узлов")

        # ДЕТАЛЬНОЕ ЛОГИРОВАНИЕ ИСТОЧНИКОВ
        log_message("=== ДЕТАЛЬНАЯ ИНФОРМАЦИЯ О НАЙДЕННЫХ УЗЛАХ ===")
        for i, node in enumerate(retrieved_nodes):
            log_message(f"Узел {i+1}:")
            log_message(f"  Документ: {node.metadata.get('document_id', 'unknown')}")
            log_message(f"  Тип: {node.metadata.get('type', 'unknown')}")
            log_message(f"  Раздел: {node.metadata.get('section_id', 'unknown')}")
            log_message(f"  Текст (первые 200 символов): {node.text[:200]}...")
            log_message(f"  Метаданные: {node.metadata}")

        # Переранжировка
        reranked_nodes = rerank_nodes(question, retrieved_nodes, reranker, top_k=10)
        log_message("=== УЗЛЫ ПОСЛЕ ПЕРЕРАНЖИРОВКИ ===")
        for i, node in enumerate(reranked_nodes):
            log_message(f"Переранжированный узел {i+1}:")
            log_message(f"  Документ: {node.metadata.get('document_id', 'unknown')}")
            log_message(f"  Тип: {node.metadata.get('type', 'unknown')}")
            log_message(f"  Раздел: {node.metadata.get('section_id', 'unknown')}")
            log_message(f"  Полный текст: {node.text}")

        formatted_context = format_context_for_llm(reranked_nodes)
        log_message(f"ПОЛНЫЙ КОНТЕКСТ ДЛЯ LLM:\n{formatted_context}")

        enhanced_question = f"""
Контекст из базы данных:
{formatted_context}

Вопрос пользователя: {question}"""

        response = query_engine.query(enhanced_question)
        log_message(f"ОТВЕТ LLM: {response.response}")

        end_time = time.time()
        processing_time = end_time - start_time
        log_message(f"Обработка завершена за {processing_time:.2f} секунд")

        sources_html = generate_sources_html(reranked_nodes, chunks_df)

        # NOTE(review): original answer markup was lost in extraction;
        # reconstructed minimally — confirm against the original template.
        answer_with_time = f"""<div>
<b>Ответ (Модель: {current_model}):</b>
<p>{response.response}</p>
<i>Время обработки: {processing_time:.2f} секунд</i>
</div>"""

        chunk_info = []
        for node in reranked_nodes:
            section_id = node.metadata.get('section_id', node.metadata.get('section', 'unknown'))
            chunk_info.append({
                'document_id': node.metadata.get('document_id', 'unknown'),
                'section_id': section_id,
                'chunk_size': len(node.text),
                'chunk_text': node.text
            })

        # Local import, presumably to avoid a circular import with app.py
        # — TODO confirm.
        from app import create_chunks_display_html
        chunks_html = create_chunks_display_html(chunk_info)

        return answer_with_time, sources_html, chunks_html
    except Exception as e:
        log_message(f"Ошибка обработки вопроса: {str(e)}")
        error_msg = f"Ошибка обработки вопроса: {str(e)}"
        return error_msg, "", ""