Spaces:
Sleeping
Sleeping
| import logging | |
| import sys | |
| from llama_index.llms.google_genai import GoogleGenAI | |
| from llama_index.llms.openai import OpenAI | |
| from llama_index.embeddings.huggingface import HuggingFaceEmbedding | |
| from sentence_transformers import CrossEncoder | |
| from config import AVAILABLE_MODELS, DEFAULT_MODEL, GOOGLE_API_KEY | |
| import time | |
| from index_retriever import rerank_nodes | |
| from my_logging import log_message | |
| from config import PROMPT_SIMPLE_POISK | |
| from config import QUERY_EXPANSION_PROMPT | |
| from documents_prep import normalize_text, normalize_steel_designations | |
# Maps a designation that may appear in a user query to the extra search terms
# that enhance_query_with_keywords() appends to that query.
# NOTE(review): keys look like steel grade designations and values like product
# forms for that grade - confirm with the data owner.
KEYWORD_EXPANSIONS = {
    "08X18H10T": [
        "Листы",
        "Трубы",
        "Поковки",
        "Крепежные изделия",
        "Сортовой прокат",
        "Отливки",
    ],
    "12X18H10T": [
        "Листы",
        "Поковки",
        "Сортовой прокат",
    ],
    "10X17H13M2T": [
        "Трубы",
        "Арматура",
        "Поковки",
        "Фланцы",
    ],
    "20X23H18": [
        "Листы",
        "Сортовой прокат",
        "Поковки",
    ],
    "03X17H14M3": [
        "Трубы",
        "Листы",
        "Проволока",
    ],
    "СВ-08X19H10": [
        "Сварочная проволока",
        "Сварка",
        "Сварочные материалы",
    ],
}
def get_llm_model(model_name):
    """Build the LLM client configured under ``model_name``.

    The configuration is looked up in ``AVAILABLE_MODELS``; an unknown name
    falls back to the ``DEFAULT_MODEL`` entry. Supported providers are
    ``"google"`` (``GoogleGenAI``) and ``"openai"`` (``OpenAI``).

    Any failure (missing API key, unsupported provider, missing config key,
    client construction error) is logged and answered with a hard-coded
    ``gemini-2.0-flash`` client built from ``GOOGLE_API_KEY``, so this
    function does not propagate configuration errors to the caller.
    """
    try:
        config = AVAILABLE_MODELS.get(model_name)
        if not config:
            log_message(f"Модель {model_name} не найдена, использую модель по умолчанию")
            config = AVAILABLE_MODELS[DEFAULT_MODEL]

        # Guard clauses: raise inside the try so the except below logs the
        # reason and switches to the fallback client.
        if not config.get("api_key"):
            raise Exception(f"API ключ не найден для модели {model_name}")

        provider = config["provider"]
        if provider != "google" and provider != "openai":
            raise Exception(f"Неподдерживаемый провайдер: {config['provider']}")

        client_cls = GoogleGenAI if provider == "google" else OpenAI
        return client_cls(
            model=config["model_name"],
            api_key=config["api_key"],
        )
    except Exception as e:
        log_message(f"Ошибка создания модели {model_name}: {str(e)}")
        return GoogleGenAI(model="gemini-2.0-flash", api_key=GOOGLE_API_KEY)
def get_embedding_model(model_name="sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2"):
    """Return a ``HuggingFaceEmbedding`` instance for ``model_name``."""
    embedding = HuggingFaceEmbedding(model_name=model_name)
    return embedding
def get_reranker_model(model_name='cross-encoder/ms-marco-MiniLM-L-12-v2'):
    """Return a ``CrossEncoder`` instance for ``model_name``."""
    reranker = CrossEncoder(model_name)
    return reranker
def generate_sources_html(nodes, chunks_df=None):
    """Render the list of sources behind an answer as an HTML fragment.

    Nodes are grouped so that each source is shown once:
    tables by (document, table number), images by (document, image number),
    everything else by (document, section path or section id). The first node
    seen for a group supplies the metadata used for rendering.

    Args:
        nodes: iterable of objects exposing a ``metadata`` mapping. Objects
            without one (or with ``None``) are treated as having no metadata.
        chunks_df: optional DataFrame. When it has ``document_id`` and
            ``file_link`` columns, text sources get a link to the first
            matching row's ``file_link``.

    Returns:
        str: one HTML ``<div>`` containing a card per source.

    All metadata values are HTML-escaped before interpolation: they come from
    document content and may contain ``<``, ``&`` or quotes.
    """
    # Local import: the module itself does not import ``html``.
    from html import escape

    table_types = ('table', 'table_row')

    def numbered_heading(icon, label, color, number, title, safe_doc_id):
        """Heading (plus optional title line) for a table or image card."""
        if number and number != 'unknown':
            number_text = str(number)
            if not number_text.startswith('№'):
                number_text = f"№{number_text}"
            fragments = [
                f"<h4 style='margin: 0 0 10px 0; color: {color};'>"
                f"{icon} {label} {escape(number_text)} - {safe_doc_id}</h4>"
            ]
            if title and title != 'unknown':
                fragments.append(
                    "<p style='margin: 5px 0; color: #a0aec0; font-size: 14px;'>"
                    f"{escape(str(title))}</p>"
                )
            return fragments
        # No usable number: still show which document the item belongs to.
        return [f"<h4 style='margin: 0 0 10px 0; color: {color};'>{icon} {label} - {safe_doc_id}</h4>"]

    # --- group nodes into unique sources (first occurrence wins) -------------
    sources = {}
    for node in nodes:
        metadata = getattr(node, 'metadata', None) or {}
        doc_type = metadata.get('type', 'text')
        doc_id = metadata.get('document_id', 'unknown')

        if doc_type in table_types:
            key = f"{doc_id}_table_{metadata.get('table_number', 'unknown')}"
        elif doc_type == 'image':
            key = f"{doc_id}_image_{metadata.get('image_number', 'unknown')}"
        else:
            section_key = metadata.get('section_path', '') or metadata.get('section_id', '')
            key = f"{doc_id}_text_{section_key}"

        sources.setdefault(key, (doc_id, doc_type, metadata))

    # --- render ---------------------------------------------------------------
    parts = [
        "<div style='background-color: #2d3748; color: white; padding: 20px; "
        "border-radius: 10px; max-height: 400px; overflow-y: auto;'>",
        "<h3 style='color: #63b3ed; margin-top: 0;'>Источники:</h3>",
    ]

    for doc_id, doc_type, metadata in sources.values():
        safe_doc_id = escape(str(doc_id))
        # Any type that is neither a table nor an image is shown as a text
        # source, mirroring the grouping rule above.
        is_text = doc_type not in table_types and doc_type != 'image'

        parts.append(
            "<div style='margin-bottom: 15px; padding: 15px; border: 1px solid #4a5568; "
            "border-radius: 8px; background-color: #1a202c;'>"
        )

        if doc_type in table_types:
            parts.extend(numbered_heading(
                '📊', 'Таблица', '#68d391',
                metadata.get('table_number', 'unknown'),
                metadata.get('table_title', ''),
                safe_doc_id,
            ))
        elif doc_type == 'image':
            parts.extend(numbered_heading(
                '🖼️', 'Изображение', '#fbb6ce',
                metadata.get('image_number', 'unknown'),
                metadata.get('image_title', ''),
                safe_doc_id,
            ))
        else:
            parts.append(f"<h4 style='margin: 0 0 10px 0; color: #63b3ed;'>📄 {safe_doc_id}</h4>")

        if (
            is_text
            and chunks_df is not None
            and 'file_link' in chunks_df.columns
            and 'document_id' in chunks_df.columns
        ):
            doc_rows = chunks_df[chunks_df['document_id'] == doc_id]
            if not doc_rows.empty:
                file_link = doc_rows.iloc[0]['file_link']
                # Skip empty / NaN cells instead of rendering a bogus link.
                if isinstance(file_link, str) and file_link:
                    parts.append(
                        f"<a href='{escape(file_link, quote=True)}' target='_blank' "
                        "style='color: #68d391; text-decoration: none; font-size: 14px; "
                        "display: inline-block; margin-top: 10px;'>🔗 Ссылка на документ</a><br>"
                    )

        parts.append("</div>")

    parts.append("</div>")
    return "".join(parts)
def deduplicate_nodes(nodes):
    """Drop repeated nodes, keeping the first occurrence of each identity.

    A node's identity is a string derived from its ``metadata``:

    * table / table_row: document id + table identifier (falls back to the
      table number) + one of, in order of preference: the ``complete`` marker
      when ``is_complete_table`` is set, the row range, the chunk id, or an
      8-character MD5 digest of the first 100 characters of ``node.text``;
    * image: document id + image number;
    * anything else: document id + section id + chunk id (default ``0``).

    Returns a new list in the original order.
    """
    import hashlib

    def identity_of(node):
        meta = node.metadata
        doc_id = meta.get('document_id', '')
        kind = meta.get('type', 'text')

        if kind == 'image':
            return f"{doc_id}|image|{meta.get('image_number', '')}"

        if kind != 'table' and kind != 'table_row':
            # Text: section id + chunk id is expected to be unique.
            return f"{doc_id}|text|{meta.get('section_id', '')}|{meta.get('chunk_id', 0)}"

        table_number = meta.get('table_number', '')
        prefix = f"{doc_id}|table|{meta.get('table_identifier', table_number)}"

        if meta.get('is_complete_table', False):
            return f"{prefix}|complete"

        # The row range tells chunks of the same table apart.
        first_row = meta.get('row_start', '')
        last_row = meta.get('row_end', '')
        if first_row != '' and last_row != '':
            return f"{prefix}|rows_{first_row}_{last_row}"

        chunk_id = meta.get('chunk_id', '')
        if chunk_id != '':
            return f"{prefix}|chunk_{chunk_id}"

        # Last resort: fingerprint the beginning of the content.
        digest = hashlib.md5(node.text[:100].encode()).hexdigest()[:8]
        return f"{prefix}|{digest}"

    known = set()
    kept = []
    for candidate in nodes:
        identity = identity_of(candidate)
        if identity in known:
            continue
        known.add(identity)
        kept.append(candidate)
    return kept
def enhance_query_with_keywords(query):
    """Append expansion terms for every known keyword found in ``query``.

    Each key of ``KEYWORD_EXPANSIONS`` is matched case-insensitively as a
    substring of the query. The expansion phrases of all matched keys are
    split into words, de-duplicated, and appended to the query separated by
    spaces.

    Words are de-duplicated in first-seen order (via dict keys) rather than
    through a ``set``: set iteration order for strings varies between
    interpreter runs, which made the generated query - and therefore
    retrieval - non-reproducible.

    Returns:
        str: the enhanced query, or the original query when nothing matched.
    """
    query_upper = query.upper()
    keywords_found = []
    context_words = {}  # used as an insertion-ordered set

    for keyword, expansions in KEYWORD_EXPANSIONS.items():
        if keyword.upper() not in query_upper:
            continue
        context = ' '.join(expansions)
        keywords_found.append(keyword)
        context_words.update(dict.fromkeys(context.split()))
        log_message(f" Found keyword '{keyword}': added context '{context}'")

    if not context_words:
        return query

    unique_context = ' '.join(context_words)
    log_message(f"Enhanced query with keywords: {', '.join(keywords_found)}")
    log_message(f"Added context: {unique_context[:100]}...")
    return f"{query} {unique_context}"
def get_repository_stats(repo_id, hf_token, json_dir, table_dir, image_dir):
    """Count the data files of a Hugging Face dataset repository.

    Files are classified by path prefix and extension:

    * text:   under ``json_dir``, ending in ``.json`` or ``.zip``;
    * tables: under ``table_dir``, ending in ``.json``, ``.xlsx`` or ``.xls``;
    * images: under ``image_dir``, ending in ``.csv``, ``.xlsx`` or ``.xls``.

    Returns:
        dict with ``text_files``, ``table_files``, ``image_files`` and
        ``total_files``. Any error is logged and reported as all zeros.
    """
    try:
        from huggingface_hub import list_repo_files

        repo_files = list_repo_files(repo_id=repo_id, repo_type="dataset", token=hf_token)

        def count_matching(directory, suffixes):
            # str.endswith accepts a tuple of alternatives.
            return sum(
                1 for path in repo_files
                if path.startswith(directory) and path.endswith(suffixes)
            )

        spreadsheet_suffixes = ('.xlsx', '.xls')
        text_count = count_matching(json_dir, ('.json', '.zip'))
        table_count = count_matching(table_dir, ('.json',) + spreadsheet_suffixes)
        image_count = count_matching(image_dir, ('.csv',) + spreadsheet_suffixes)

        stats = {
            'text_files': text_count,
            'table_files': table_count,
            'image_files': image_count,
            'total_files': text_count + table_count + image_count,
        }
        log_message(f"Repository stats: {stats}")
        return stats
    except Exception as e:
        log_message(f"Error getting repository stats: {e}")
        return dict(text_files=0, table_files=0, image_files=0, total_files=0)
def format_stats_display(stats):
    """Format the repository statistics as a Markdown text block.

    ``stats`` must provide ``text_files``, ``table_files``, ``image_files``
    and ``total_files``.
    """
    rows = (
        "📊 **Статистика базы данных:**",
        f"📝 Текстовые документы (JSON): **{stats['text_files']}**",
        f"📊 Табличные данные: **{stats['table_files']}**",
        f"🖼️ Изображения: **{stats['image_files']}**",
        "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━",
        f"📦 Всего файлов: **{stats['total_files']}**",
    )
    # Every row, including the last one, is terminated by a newline.
    return "\n".join(rows) + "\n"
def merge_table_chunks(chunk_info):
    """Merge chunks of the same table into one entry; keep other chunks.

    Table chunks (``type`` of ``'table'`` or ``'table_row'``) that share a
    document id and table number are combined into a single dict of type
    ``'table'`` whose ``chunk_text`` is the newline-joined text of its parts.

    Non-table chunks are passed through unchanged. Two of them are treated as
    the same chunk only when both carry an explicit ``chunk_id`` and agree on
    document id, section id and chunk id (the later one replaces the earlier,
    keeping the earlier position). A chunk without ``chunk_id`` is always kept:
    defaulting the id would make unrelated chunks - for example every image of
    a document - overwrite one another.

    Args:
        chunk_info: iterable of chunk dicts.

    Returns:
        list of chunk dicts in first-seen order.
    """
    merged = {}
    for position, chunk in enumerate(chunk_info):
        doc_type = chunk.get('type', 'text')
        doc_id = chunk.get('document_id', 'unknown')

        # Tuple keys with a leading tag keep table entries and plain chunks
        # in separate namespaces, so they can never collide.
        if doc_type in ('table', 'table_row'):
            table_num = chunk.get('table_number', '')
            key = ('table', doc_id, table_num)
            entry = merged.get(key)
            if entry is None:
                merged[key] = {
                    'document_id': doc_id,
                    'type': 'table',
                    'table_number': table_num,
                    'section_id': chunk.get('section_id', 'unknown'),
                    'chunk_text': chunk.get('chunk_text', ''),
                }
            else:
                entry['chunk_text'] += '\n' + chunk.get('chunk_text', '')
        elif 'chunk_id' in chunk:
            key = ('chunk', doc_id, chunk.get('section_id', ''), chunk['chunk_id'])
            merged[key] = chunk
        else:
            # No identifier available: the position makes the key unique.
            merged[('position', position)] = chunk

    return list(merged.values())
def create_chunks_display_html(chunk_info):
    """Render the retrieved chunks as a scrollable HTML list.

    Table chunks are first merged with ``merge_table_chunks``; every resulting
    chunk becomes a card with its document id, its section label
    (``get_section_display``) and its formatted text
    (``get_formatted_content``).

    Document id, section label and content are HTML-escaped: they originate
    from document text, where characters such as ``<`` and ``&`` are ordinary
    content and must not be interpreted as markup.

    Args:
        chunk_info: list of chunk dicts; ``None`` or empty yields a
            "no data" placeholder.

    Returns:
        str: an HTML fragment.
    """
    # Local import: the module itself does not import ``html``.
    from html import escape

    if not chunk_info:
        return "<div style='padding: 20px; text-align: center; color: black;'>Нет данных о чанках</div>"

    merged_chunks = merge_table_chunks(chunk_info)

    parts = [
        "<div style='max-height: 500px; overflow-y: auto; padding: 10px; color: black;'>",
        f"<h4 style='color: black;'>Найдено релевантных чанков: {len(merged_chunks)}</h4>",
    ]

    for i, chunk in enumerate(merged_chunks):
        bg_color = "#f8f9fa" if i % 2 == 0 else "#e9ecef"  # zebra striping
        document_id = escape(str(chunk.get('document_id', 'unknown')))
        section_display = escape(str(get_section_display(chunk)))
        formatted_content = escape(str(get_formatted_content(chunk)))

        parts.append(f"""
        <div style='background-color: {bg_color}; padding: 10px; margin: 5px 0; border-radius: 5px; border-left: 4px solid #007bff; color: black;'>
            <strong style='color: black;'>Документ:</strong> <span style='color: black;'>{document_id}</span><br>
            <strong style='color: black;'>Раздел:</strong> <span style='color: black;'>{section_display}</span><br>
            <strong style='color: black;'>Содержание:</strong><br>
            <div style='background-color: white; padding: 8px; margin-top: 5px; border-radius: 3px; font-family: monospace; font-size: 12px; color: black; max-height: 200px; overflow-y: auto;'>
                {formatted_content}
            </div>
        </div>
        """)

    parts.append("</div>")
    return "".join(parts)
def get_section_display(chunk):
    """Return a short label telling where in the document a chunk comes from.

    * table chunk with a table number  -> ``"таблица №<n>"``
    * image chunk with an image number -> ``"рисунок №<n>"``
    * otherwise the section path when set, else the section id
      (``'unknown'`` when the chunk has none).

    A number that already starts with ``№`` is not prefixed again.
    """
    kind = chunk.get('type', 'text')

    numbered_kinds = (
        ('table', 'table_number', 'таблица'),
        ('image', 'image_number', 'рисунок'),
    )
    for expected_kind, number_field, label in numbered_kinds:
        if kind != expected_kind:
            continue
        number = chunk.get(number_field)
        if number:
            shown = number if str(number).startswith('№') else f"№{number}"
            return f"{label} {shown}"

    # Prefer the full section path; fall back to the bare section id.
    path = chunk.get('section_path', '')
    return path if path else chunk.get('section_id', 'unknown')
def get_formatted_content(chunk):
    """Build the descriptive one-line text shown for a chunk.

    For a chunk whose ``level`` is a subsection level and that names a parent
    section, the text mentions the parent section (with its title when
    available), the document and the current section, followed by the chunk
    text.

    Otherwise the text mentions the current section, the document and a
    section title, followed by the chunk text. The title is ``section_text``
    when present; when the chunk text starts with ``"<section> "`` that prefix
    is stripped from the shown text and, lacking ``section_text``, the title
    is derived from the text up to the first ``.`` (or its first 50
    characters).
    """
    doc = chunk.get('document_id', 'unknown')
    body = chunk.get('chunk_text', '')
    heading = chunk.get('section_text', '')
    parent = chunk.get('parent_section', '')

    path = chunk.get('section_path', '')
    section = path if path else chunk.get('section_id', 'unknown')

    nested_levels = ('subsection', 'sub_subsection', 'sub_sub_subsection')
    if chunk.get('level', '') in nested_levels and parent:
        parent_title = chunk.get('parent_title', '')
        parent_label = f"{parent} ({parent_title})" if parent_title else parent
        return f"В разделе {parent_label} в документе {doc}, пункт {section}: {body}"

    section_prefix = f"{section} "
    if heading and body.startswith(heading):
        title = heading
    elif body.startswith(section_prefix):
        # Remove the leading section number from the displayed text.
        body = body[len(section_prefix):].strip()
        if heading:
            title = heading
        elif '.' in body:
            title = f"{section} {body.split('.')[0]}"
        else:
            title = f"{section} {body[:50]}"
    else:
        title = heading if heading else section

    return f"В разделе {section} в документе {doc}, пункт {title}: {body}"
def answer_question(question, query_engine, reranker, current_model, chunks_df=None, rerank_top_k=20):
    """Answer a user question and return the three HTML panels of the UI.

    Pipeline:
      1. normalise steel designations in the question;
      2. append keyword expansions (``enhance_query_with_keywords``);
      3. best effort: ask the LLM for a query expansion and append it
         (a failure here is logged and the keyword-enhanced query is used);
      4. retrieve, de-duplicate and rerank nodes for the sources/chunks panels;
      5. ask ``query_engine`` for the answer.

    Args:
        question: the user's question.
        query_engine: engine exposing ``retriever.retrieve`` and ``query``;
            ``None`` means the system is not initialised.
        reranker: model passed to ``rerank_nodes``.
        current_model: model name used for query expansion and shown in the
            answer header.
        chunks_df: optional DataFrame forwarded to ``generate_sources_html``.
        rerank_top_k: number of nodes kept after reranking.

    Returns:
        tuple ``(answer_html, sources_html, chunks_html)``. On error the first
        element is an error panel and the other two are empty strings.
    """
    # Local import: the module itself does not import ``html``.
    from html import escape

    # Check first: without an engine nothing below can succeed, and the query
    # expansion step would otherwise spend an LLM call for nothing.
    if query_engine is None:
        return "<div style='background-color: #e53e3e; color: white; padding: 20px; border-radius: 10px;'>Система не инициализирована</div>", "", ""

    normalized_question_2, _query_changes, change_list = normalize_steel_designations(question)
    enhanced_question = enhance_query_with_keywords(normalized_question_2)

    try:
        llm = get_llm_model(current_model)
        expansion_prompt = QUERY_EXPANSION_PROMPT.format(original_query=enhanced_question)
        expanded_queries = llm.complete(expansion_prompt).text.strip()
        enhanced_question = f"{enhanced_question} {expanded_queries}"
        log_message(f"LLM expanded query: {expanded_queries[:200]}...")
    except Exception as e:
        log_message(f"Query expansion failed: {e}, using keyword-only enhancement")

    if change_list:
        log_message(f"Query changes: {', '.join(change_list)}")

    try:
        start_time = time.time()
        retrieved_nodes = query_engine.retriever.retrieve(enhanced_question)
        log_message(f"user query: {question}")
        log_message(f"after steel normalization: {normalized_question_2}")
        log_message(f"enhanced query: {enhanced_question}")

        unique_retrieved = deduplicate_nodes(retrieved_nodes)
        log_message(f"RETRIEVED: unique {len(unique_retrieved)} nodes")

        for position, node in enumerate(unique_retrieved, start=1):
            node_type = node.metadata.get('type', 'text')
            doc_id = node.metadata.get('document_id', 'N/A')
            if node_type == 'table':
                table_num = node.metadata.get('table_number', 'N/A')
                table_id = node.metadata.get('table_identifier', 'N/A')
                table_title = node.metadata.get('table_title', 'N/A')
                content_preview = node.text[:200].replace('\n', ' ')
                log_message(f" [{position}] {doc_id} - Table {table_num} | ID: {table_id}")
                # str(): a non-string title must not abort the whole answer.
                log_message(f" Title: {str(table_title)[:80]}")
                log_message(f" Content: {content_preview}...")
            else:
                section = node.metadata.get('section_id', 'N/A')
                log_message(f" [{position}] {doc_id} - Text section {section}")

        log_message(f"UNIQUE NODES: {len(unique_retrieved)} nodes")

        reranked_nodes = rerank_nodes(enhanced_question, unique_retrieved, reranker,
                                      top_k=rerank_top_k)

        # NOTE(review): the answer is produced by query_engine.query(), which
        # is called independently of the reranked list shown as sources -
        # confirm that both are meant to use the same nodes.
        response = query_engine.query(enhanced_question)

        end_time = time.time()
        processing_time = end_time - start_time
        log_message(f"Обработка завершена за {processing_time:.2f}с")

        sources_html = generate_sources_html(reranked_nodes, chunks_df)

        answer_with_time = f"""<div style='background-color: #2d3748; color: white; padding: 20px; border-radius: 10px; margin-bottom: 10px;'>
        <h3 style='color: #63b3ed; margin-top: 0;'>Ответ (Модель: {current_model}):</h3>
        <div style='line-height: 1.6; font-size: 16px;'>{response.response}</div>
        <div style='margin-top: 15px; padding-top: 10px; border-top: 1px solid #4a5568; font-size: 14px; color: #a0aec0;'>
        Время обработки: {processing_time:.2f} секунд
        </div>
        </div>"""
        log_message(f"Model Answer: {response.response}")

        chunk_info = []
        for node in reranked_nodes:
            metadata = node.metadata if hasattr(node, 'metadata') else {}
            chunk_info.append({
                'document_id': metadata.get('document_id', 'unknown'),
                'section_id': metadata.get('section_id', 'unknown'),
                'section_path': metadata.get('section_path', ''),
                'section_text': metadata.get('section_text', ''),
                'type': metadata.get('type', 'text'),
                'table_number': metadata.get('table_number', ''),
                'image_number': metadata.get('image_number', ''),
                'chunk_size': len(node.text),
                'chunk_text': node.text,
            })

        # Use the function defined in this module; importing it from ``app``
        # here would import the application module from inside a request.
        chunks_html = create_chunks_display_html(chunk_info)
        return answer_with_time, sources_html, chunks_html
    except Exception as e:
        log_message(f"Ошибка: {str(e)}")
        # Exception text may contain characters such as '<' (e.g. "<class ...>").
        error_msg = f"<div style='background-color: #e53e3e; color: white; padding: 20px; border-radius: 10px;'>Ошибка: {escape(str(e))}</div>"
        return error_msg, "", ""