import logging import sys from llama_index.llms.google_genai import GoogleGenAI from llama_index.llms.openai import OpenAI from llama_index.embeddings.huggingface import HuggingFaceEmbedding from sentence_transformers import CrossEncoder from config import AVAILABLE_MODELS, DEFAULT_MODEL, GOOGLE_API_KEY import time from index_retriever import rerank_nodes from my_logging import log_message from config import PROMPT_SIMPLE_POISK from config import QUERY_EXPANSION_PROMPT from documents_prep import normalize_text, normalize_steel_designations KEYWORD_EXPANSIONS = { "08X18H10T": ["Листы", "Трубы", "Поковки", "Крепежные изделия", "Сортовой прокат", "Отливки"], "12X18H10T": ["Листы", "Поковки", "Сортовой прокат"], "10X17H13M2T": ["Трубы", "Арматура", "Поковки", "Фланцы"], "20X23H18": ["Листы", "Сортовой прокат", "Поковки"], "03X17H14M3": ["Трубы", "Листы", "Проволока"], "СВ-08X19H10": ["Сварочная проволока", "Сварка", "Сварочные материалы"], } def get_llm_model(model_name): try: model_config = AVAILABLE_MODELS.get(model_name) if not model_config: log_message(f"Модель {model_name} не найдена, использую модель по умолчанию") model_config = AVAILABLE_MODELS[DEFAULT_MODEL] if not model_config.get("api_key"): raise Exception(f"API ключ не найден для модели {model_name}") if model_config["provider"] == "google": return GoogleGenAI( model=model_config["model_name"], api_key=model_config["api_key"] ) elif model_config["provider"] == "openai": return OpenAI( model=model_config["model_name"], api_key=model_config["api_key"] ) else: raise Exception(f"Неподдерживаемый провайдер: {model_config['provider']}") except Exception as e: log_message(f"Ошибка создания модели {model_name}: {str(e)}") return GoogleGenAI(model="gemini-2.0-flash", api_key=GOOGLE_API_KEY) def get_embedding_model(model_name="sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2"): return HuggingFaceEmbedding(model_name=model_name) def get_reranker_model(model_name='cross-encoder/ms-marco-MiniLM-L-12-v2'): return CrossEncoder(model_name) def generate_sources_html(nodes, chunks_df=None): html = "
" html += "

Источники:

" sources_by_doc = {} for i, node in enumerate(nodes): metadata = node.metadata if hasattr(node, 'metadata') else {} doc_type = metadata.get('type', 'text') doc_id = metadata.get('document_id', 'unknown') if doc_type == 'table' or doc_type == 'table_row': table_num = metadata.get('table_number', 'unknown') key = f"{doc_id}_table_{table_num}" elif doc_type == 'image': image_num = metadata.get('image_number', 'unknown') key = f"{doc_id}_image_{image_num}" else: section_path = metadata.get('section_path', '') section_id = metadata.get('section_id', '') section_key = section_path if section_path else section_id key = f"{doc_id}_text_{section_key}" if key not in sources_by_doc: sources_by_doc[key] = { 'doc_id': doc_id, 'doc_type': doc_type, 'metadata': metadata, 'sections': set() } if doc_type not in ['table', 'table_row', 'image']: section_path = metadata.get('section_path', '') section_id = metadata.get('section_id', '') if section_path: sources_by_doc[key]['sections'].add(f"пункт {section_path}") elif section_id and section_id != 'unknown': sources_by_doc[key]['sections'].add(f"пункт {section_id}") for source_info in sources_by_doc.values(): metadata = source_info['metadata'] doc_type = source_info['doc_type'] doc_id = source_info['doc_id'] html += f"
" if doc_type == 'text': html += f"

📄 {doc_id}

" elif doc_type == 'table' or doc_type == 'table_row': table_num = metadata.get('table_number', 'unknown') table_title = metadata.get('table_title', '') if table_num and table_num != 'unknown': if not str(table_num).startswith('№'): table_num = f"№{table_num}" html += f"

📊 Таблица {table_num} - {doc_id}

" if table_title and table_title != 'unknown': html += f"

{table_title}

" else: html += f"

📊 Таблица - {doc_id}

" elif doc_type == 'image': image_num = metadata.get('image_number', 'unknown') image_title = metadata.get('image_title', '') if image_num and image_num != 'unknown': if not str(image_num).startswith('№'): image_num = f"№{image_num}" html += f"

🖼️ Изображение {image_num} - {doc_id}

" if image_title and image_title != 'unknown': html += f"

{image_title}

" if chunks_df is not None and 'file_link' in chunks_df.columns and doc_type == 'text': doc_rows = chunks_df[chunks_df['document_id'] == doc_id] if not doc_rows.empty: file_link = doc_rows.iloc[0]['file_link'] html += f"🔗 Ссылка на документ
" html += "
" html += "
" return html def deduplicate_nodes(nodes): """Deduplicate retrieved nodes based on content and metadata""" seen = set() unique_nodes = [] for node in nodes: doc_id = node.metadata.get('document_id', '') node_type = node.metadata.get('type', 'text') if node_type == 'table' or node_type == 'table_row': table_num = node.metadata.get('table_number', '') table_identifier = node.metadata.get('table_identifier', table_num) # Use row range to distinguish table chunks row_start = node.metadata.get('row_start', '') row_end = node.metadata.get('row_end', '') is_complete = node.metadata.get('is_complete_table', False) if is_complete: identifier = f"{doc_id}|table|{table_identifier}|complete" elif row_start != '' and row_end != '': identifier = f"{doc_id}|table|{table_identifier}|rows_{row_start}_{row_end}" else: # Fallback: use chunk_id if available chunk_id = node.metadata.get('chunk_id', '') if chunk_id != '': identifier = f"{doc_id}|table|{table_identifier}|chunk_{chunk_id}" else: # Last resort: hash first 100 chars of content import hashlib content_hash = hashlib.md5(node.text[:100].encode()).hexdigest()[:8] identifier = f"{doc_id}|table|{table_identifier}|{content_hash}" elif node_type == 'image': img_num = node.metadata.get('image_number', '') identifier = f"{doc_id}|image|{img_num}" else: # text section_id = node.metadata.get('section_id', '') chunk_id = node.metadata.get('chunk_id', 0) # For text, section_id + chunk_id should be unique identifier = f"{doc_id}|text|{section_id}|{chunk_id}" if identifier not in seen: seen.add(identifier) unique_nodes.append(node) return unique_nodes def enhance_query_with_keywords(query): query_upper = query.upper() added_context = [] keywords_found = [] for keyword, expansions in KEYWORD_EXPANSIONS.items(): keyword_upper = keyword.upper() if keyword_upper in query_upper: context = ' '.join(expansions) added_context.append(context) keywords_found.append(keyword) log_message(f" Found keyword '{keyword}': added context '{context}'") if added_context: unique_context = ' '.join(set(' '.join(added_context).split())) enhanced = f"{query} {unique_context}" log_message(f"Enhanced query with keywords: {', '.join(keywords_found)}") log_message(f"Added context: {unique_context[:100]}...") return enhanced return f"{query}" def get_repository_stats(repo_id, hf_token, json_dir, table_dir, image_dir): """Get statistics about documents in the repository""" try: from huggingface_hub import list_repo_files files = list_repo_files(repo_id=repo_id, repo_type="dataset", token=hf_token) # Count JSON text files json_files = [f for f in files if f.startswith(json_dir) and f.endswith('.json')] zip_files = [f for f in files if f.startswith(json_dir) and f.endswith('.zip')] # Count table files table_files = [f for f in files if f.startswith(table_dir) and (f.endswith('.json') or f.endswith('.xlsx') or f.endswith('.xls'))] # Count image files image_files = [f for f in files if f.startswith(image_dir) and (f.endswith('.csv') or f.endswith('.xlsx') or f.endswith('.xls'))] stats = { 'text_files': len(json_files) + len(zip_files), 'table_files': len(table_files), 'image_files': len(image_files), 'total_files': len(json_files) + len(zip_files) + len(table_files) + len(image_files) } log_message(f"Repository stats: {stats}") return stats except Exception as e: log_message(f"Error getting repository stats: {e}") return {'text_files': 0, 'table_files': 0, 'image_files': 0, 'total_files': 0} def format_stats_display(stats): """Format statistics for display""" return f"""📊 **Статистика базы данных:** 📝 Текстовые документы (JSON): **{stats['text_files']}** 📊 Табличные данные: **{stats['table_files']}** 🖼️ Изображения: **{stats['image_files']}** ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 📦 Всего файлов: **{stats['total_files']}** """ def merge_table_chunks(chunk_info): merged = {} for chunk in chunk_info: doc_type = chunk.get('type', 'text') doc_id = chunk.get('document_id', 'unknown') if doc_type == 'table' or doc_type == 'table_row': table_num = chunk.get('table_number', '') key = f"{doc_id}_{table_num}" if key not in merged: merged[key] = { 'document_id': doc_id, 'type': 'table', 'table_number': table_num, 'section_id': chunk.get('section_id', 'unknown'), 'chunk_text': chunk.get('chunk_text', '') } else: merged[key]['chunk_text'] += '\n' + chunk.get('chunk_text', '') else: unique_key = f"{doc_id}_{chunk.get('section_id', '')}_{chunk.get('chunk_id', 0)}" merged[unique_key] = chunk return list(merged.values()) def create_chunks_display_html(chunk_info): if not chunk_info: return "
Нет данных о чанках
" merged_chunks = merge_table_chunks(chunk_info) html = "
" html += f"

Найдено релевантных чанков: {len(merged_chunks)}

" for i, chunk in enumerate(merged_chunks): bg_color = "#f8f9fa" if i % 2 == 0 else "#e9ecef" section_display = get_section_display(chunk) formatted_content = get_formatted_content(chunk) html += f"""
Документ: {chunk['document_id']}
Раздел: {section_display}
Содержание:
{formatted_content}
""" html += "
" return html def get_section_display(chunk): section_path = chunk.get('section_path', '') section_id = chunk.get('section_id', 'unknown') doc_type = chunk.get('type', 'text') if doc_type == 'table' and chunk.get('table_number'): table_num = chunk.get('table_number') if not str(table_num).startswith('№'): table_num = f"№{table_num}" return f"таблица {table_num}" if doc_type == 'image' and chunk.get('image_number'): image_num = chunk.get('image_number') if not str(image_num).startswith('№'): image_num = f"№{image_num}" return f"рисунок {image_num}" if section_path: return section_path elif section_id and section_id != 'unknown': return section_id return section_id def get_formatted_content(chunk): document_id = chunk.get('document_id', 'unknown') section_path = chunk.get('section_path', '') section_id = chunk.get('section_id', 'unknown') section_text = chunk.get('section_text', '') parent_section = chunk.get('parent_section', '') parent_title = chunk.get('parent_title', '') level = chunk.get('level', '') chunk_text = chunk.get('chunk_text', '') doc_type = chunk.get('type', 'text') # For text documents if level in ['subsection', 'sub_subsection', 'sub_sub_subsection'] and parent_section: current_section = section_path if section_path else section_id parent_info = f"{parent_section} ({parent_title})" if parent_title else parent_section return f"В разделе {parent_info} в документе {document_id}, пункт {current_section}: {chunk_text}" else: current_section = section_path if section_path else section_id clean_text = chunk_text if section_text and chunk_text.startswith(section_text): section_title = section_text elif chunk_text.startswith(f"{current_section} "): clean_text = chunk_text[len(f"{current_section} "):].strip() section_title = section_text if section_text else f"{current_section} {clean_text.split('.')[0] if '.' in clean_text else clean_text[:50]}" else: section_title = section_text if section_text else current_section return f"В разделе {current_section} в документе {document_id}, пункт {section_title}: {clean_text}" def answer_question(question, query_engine, reranker, current_model, chunks_df=None, rerank_top_k=20): normalized_question = normalize_text(question) normalized_question_2, query_changes, change_list = normalize_steel_designations(question) enhanced_question = enhance_query_with_keywords(normalized_question_2) try: llm = get_llm_model(current_model) expansion_prompt = QUERY_EXPANSION_PROMPT.format(original_query=enhanced_question) expanded_queries = llm.complete(expansion_prompt).text.strip() enhanced_question = f"{enhanced_question} {expanded_queries}" log_message(f"LLM expanded query: {expanded_queries[:200]}...") except Exception as e: log_message(f"Query expansion failed: {e}, using keyword-only enhancement") if change_list: log_message(f"Query changes: {', '.join(change_list)}") if change_list: log_message(f"Query changes: {', '.join(change_list)}") if query_engine is None: return "
Система не инициализирована
", "", "" try: start_time = time.time() retrieved_nodes = query_engine.retriever.retrieve(enhanced_question) log_message(f"user query: {question}") log_message(f"after steel normalization: {normalized_question_2}") log_message(f"enhanced query: {enhanced_question}") unique_retrieved = deduplicate_nodes(retrieved_nodes) log_message(f"RETRIEVED: unique {len(unique_retrieved)} nodes") for i, node in enumerate(unique_retrieved): node_type = node.metadata.get('type', 'text') doc_id = node.metadata.get('document_id', 'N/A') if node_type == 'table': table_num = node.metadata.get('table_number', 'N/A') table_id = node.metadata.get('table_identifier', 'N/A') table_title = node.metadata.get('table_title', 'N/A') content_preview = node.text[:200].replace('\n', ' ') log_message(f" [{i+1}] {doc_id} - Table {table_num} | ID: {table_id}") log_message(f" Title: {table_title[:80]}") log_message(f" Content: {content_preview}...") else: section = node.metadata.get('section_id', 'N/A') log_message(f" [{i+1}] {doc_id} - Text section {section}") log_message(f"UNIQUE NODES: {len(unique_retrieved)} nodes") reranked_nodes = rerank_nodes(enhanced_question, unique_retrieved, reranker, top_k=rerank_top_k) response = query_engine.query(enhanced_question) end_time = time.time() processing_time = end_time - start_time log_message(f"Обработка завершена за {processing_time:.2f}с") sources_html = generate_sources_html(reranked_nodes, chunks_df) answer_with_time = f"""

Ответ (Модель: {current_model}):

{response.response}
Время обработки: {processing_time:.2f} секунд
""" log_message(f"Model Answer: {response.response}") chunk_info = [] for node in reranked_nodes: metadata = node.metadata if hasattr(node, 'metadata') else {} chunk_info.append({ 'document_id': metadata.get('document_id', 'unknown'), 'section_id': metadata.get('section_id', 'unknown'), 'section_path': metadata.get('section_path', ''), 'section_text': metadata.get('section_text', ''), 'type': metadata.get('type', 'text'), 'table_number': metadata.get('table_number', ''), 'image_number': metadata.get('image_number', ''), 'chunk_size': len(node.text), 'chunk_text': node.text }) from app import create_chunks_display_html chunks_html = create_chunks_display_html(chunk_info) return answer_with_time, sources_html, chunks_html except Exception as e: log_message(f"Ошибка: {str(e)}") error_msg = f"
Ошибка: {str(e)}
" return error_msg, "", ""