import logging
import sys
from llama_index.llms.google_genai import GoogleGenAI
from llama_index.llms.openai import OpenAI
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from sentence_transformers import CrossEncoder
from config import AVAILABLE_MODELS, DEFAULT_MODEL, GOOGLE_API_KEY
import time
from index_retriever import rerank_nodes
from my_logging import log_message
from config import PROMPT_SIMPLE_POISK
def get_llm_model(model_name):
try:
model_config = AVAILABLE_MODELS.get(model_name)
if not model_config:
log_message(f"Модель {model_name} не найдена, использую модель по умолчанию")
model_config = AVAILABLE_MODELS[DEFAULT_MODEL]
if not model_config.get("api_key"):
raise Exception(f"API ключ не найден для модели {model_name}")
if model_config["provider"] == "google":
return GoogleGenAI(
model=model_config["model_name"],
api_key=model_config["api_key"]
)
elif model_config["provider"] == "openai":
return OpenAI(
model=model_config["model_name"],
api_key=model_config["api_key"]
)
else:
raise Exception(f"Неподдерживаемый провайдер: {model_config['provider']}")
except Exception as e:
log_message(f"Ошибка создания модели {model_name}: {str(e)}")
return GoogleGenAI(model="gemini-2.0-flash", api_key=GOOGLE_API_KEY)
def get_embedding_model(model_name="sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2"):
return HuggingFaceEmbedding(model_name=model_name)
def get_reranker_model(model_name='cross-encoder/ms-marco-MiniLM-L-12-v2'):
return CrossEncoder(model_name)
def generate_sources_html(nodes, chunks_df=None):
html = "
"
html += "
Источники:
"
sources_by_doc = {}
for i, node in enumerate(nodes):
metadata = node.metadata if hasattr(node, 'metadata') else {}
doc_type = metadata.get('type', 'text')
doc_id = metadata.get('document_id', 'unknown')
if doc_type == 'table' or doc_type == 'table_row':
table_num = metadata.get('table_number', 'unknown')
key = f"{doc_id}_table_{table_num}"
elif doc_type == 'image':
image_num = metadata.get('image_number', 'unknown')
key = f"{doc_id}_image_{image_num}"
else:
section_path = metadata.get('section_path', '')
section_id = metadata.get('section_id', '')
section_key = section_path if section_path else section_id
key = f"{doc_id}_text_{section_key}"
if key not in sources_by_doc:
sources_by_doc[key] = {
'doc_id': doc_id,
'doc_type': doc_type,
'metadata': metadata,
'sections': set()
}
if doc_type not in ['table', 'table_row', 'image']:
section_path = metadata.get('section_path', '')
section_id = metadata.get('section_id', '')
if section_path:
sources_by_doc[key]['sections'].add(f"пункт {section_path}")
elif section_id and section_id != 'unknown':
sources_by_doc[key]['sections'].add(f"пункт {section_id}")
for source_info in sources_by_doc.values():
metadata = source_info['metadata']
doc_type = source_info['doc_type']
doc_id = source_info['doc_id']
html += f"
"
if doc_type == 'text':
html += f"
📄 {doc_id}
"
elif doc_type == 'table' or doc_type == 'table_row':
table_num = metadata.get('table_number', 'unknown')
table_title = metadata.get('table_title', '')
if table_num and table_num != 'unknown':
if not str(table_num).startswith('№'):
table_num = f"№{table_num}"
html += f"
📊 Таблица {table_num} - {doc_id}
"
if table_title and table_title != 'unknown':
html += f"
{table_title}
"
else:
html += f"
📊 Таблица - {doc_id}
"
elif doc_type == 'image':
image_num = metadata.get('image_number', 'unknown')
image_title = metadata.get('image_title', '')
if image_num and image_num != 'unknown':
if not str(image_num).startswith('№'):
image_num = f"№{image_num}"
html += f"
🖼️ Изображение {image_num} - {doc_id}
"
if image_title and image_title != 'unknown':
html += f"
{image_title}
"
if chunks_df is not None and 'file_link' in chunks_df.columns and doc_type == 'text':
doc_rows = chunks_df[chunks_df['document_id'] == doc_id]
if not doc_rows.empty:
file_link = doc_rows.iloc[0]['file_link']
html += f"
🔗 Ссылка на документ"
html += "
"
html += "
"
return html
def deduplicate_nodes(nodes):
"""Deduplicate retrieved nodes based on content and metadata"""
seen = set()
unique_nodes = []
for node in nodes:
doc_id = node.metadata.get('document_id', '')
node_type = node.metadata.get('type', 'text')
if node_type == 'table' or node_type == 'table_row':
table_num = node.metadata.get('table_number', '')
table_identifier = node.metadata.get('table_identifier', table_num)
# Use row range to distinguish table chunks
row_start = node.metadata.get('row_start', '')
row_end = node.metadata.get('row_end', '')
is_complete = node.metadata.get('is_complete_table', False)
if is_complete:
identifier = f"{doc_id}|table|{table_identifier}|complete"
elif row_start != '' and row_end != '':
identifier = f"{doc_id}|table|{table_identifier}|rows_{row_start}_{row_end}"
else:
# Fallback: use chunk_id if available
chunk_id = node.metadata.get('chunk_id', '')
if chunk_id != '':
identifier = f"{doc_id}|table|{table_identifier}|chunk_{chunk_id}"
else:
# Last resort: hash first 100 chars of content
import hashlib
content_hash = hashlib.md5(node.text[:100].encode()).hexdigest()[:8]
identifier = f"{doc_id}|table|{table_identifier}|{content_hash}"
elif node_type == 'image':
img_num = node.metadata.get('image_number', '')
identifier = f"{doc_id}|image|{img_num}"
else: # text
section_id = node.metadata.get('section_id', '')
chunk_id = node.metadata.get('chunk_id', 0)
# For text, section_id + chunk_id should be unique
identifier = f"{doc_id}|text|{section_id}|{chunk_id}"
if identifier not in seen:
seen.add(identifier)
unique_nodes.append(node)
return unique_nodes
def debug_search_tables(vector_index, search_term="С-25"):
"""Debug function to find all tables containing a specific term"""
all_nodes = list(vector_index.docstore.docs.values())
matching = []
for node in all_nodes:
if node.metadata.get('type') == 'table':
text = node.get_content()
if search_term in text or search_term in node.metadata.get('table_title', ''):
matching.append({
'doc_id': node.metadata.get('document_id'),
'table_num': node.metadata.get('table_number'),
'title': node.metadata.get('table_title', '')[:100]
})
log_message(f"\n{'='*60}")
log_message(f"DEBUG: Found {len(matching)} tables containing '{search_term}'")
for m in matching:
log_message(f" • {m['doc_id']} - Table {m['table_num']}: {m['title']}")
log_message(f"{'='*60}\n")
return matching
from documents_prep import normalize_text, normalize_steel_designations
def answer_question(question, query_engine, reranker, current_model, chunks_df=None, rerank_top_k=20):
normalized_question = normalize_text(question)
log_message(f"Normalized question: {normalized_question}")
normalized_question_2, query_changes, change_list = normalize_steel_designations(question) # FIX: 3 values
log_message(f"After steel normalization: {normalized_question_2}")
if change_list:
log_message(f"Query changes: {', '.join(change_list)}")
if query_engine is None:
return "