"""Build LlamaIndex Documents from parsed spreadsheet tables.

Each table dict is rendered into a searchable text block; tables whose
rendered content exceeds CHUNK_SIZE are split into row-based chunks that
all repeat the table header, so every chunk stays independently
searchable.
"""

import re

from llama_index.core.text_splitter import SentenceSplitter  # noqa: F401 — kept: may be used by importers of this module
from llama_index.core import Document

from config import CHUNK_SIZE, CHUNK_OVERLAP
from my_logging import log_message


def normalize_table_number(table_num, section):
    """Normalize a table number for consistent retrieval.

    Strips the common 'Таблица'/'№' prefixes and, for appendix
    ("Приложение") sections, appends the section name so appendix tables
    do not collide with identically numbered tables in the main body.

    Returns 'Неизвестно' when the number is missing or unknown.
    """
    if not table_num or table_num == 'Неизвестно':
        return 'Неизвестно'

    # Clean up common prefixes
    tn = str(table_num).replace('Таблица', '').replace('№', '').strip()

    # Add section context for appendix tables
    # ('приложение' in lower() also covers the capitalized form)
    if section and 'приложение' in str(section).lower():
        return f"№{tn} ({section})"

    return f"№{tn}"


def create_table_content(table_data):
    """Create formatted text content optimized for semantic search.

    Returns a tuple ``(content, normalized_table_number)``. The content
    starts with a searchable header (document, section, table number,
    title, sheet), followed by the column names and one text line per
    data row. Empty / 'nan' cell values are skipped.
    """
    doc_id = (
        table_data.get('document_id')
        or table_data.get('document')
        or table_data.get('Обозначение документа')
        or 'Неизвестно'
    )
    table_num = table_data.get('table_number', 'Неизвестно')
    table_title = table_data.get('table_title', 'Неизвестно')
    section = (
        table_data.get('section')
        or table_data.get('Раздел документа')
        or 'Неизвестно'
    )
    sheet_name = table_data.get('sheet_name', '')

    # Enhanced table number with appendix context
    normalized_num = normalize_table_number(table_num, section)
    if 'Приложени' in str(section):
        # Extract the appendix number so e.g. table №1 of appendix 2
        # is distinguishable from table №1 of appendix 3.
        appendix_match = re.search(r'Приложени[ея]\s*(\d+)', str(section))
        if appendix_match:
            appendix_num = appendix_match.group(1)
            normalized_num = f"{normalized_num} Приложения {appendix_num}"

    # Build searchable header
    content = f"Документ: {doc_id}\n"
    content += f"Раздел: {section}\n"
    content += f"Таблица: {normalized_num}\n"
    content += f"Название: {table_title}\n"
    if sheet_name:
        content += f"Лист: {sheet_name}\n"
    content += "\n"

    headers = table_data.get('headers', [])
    if headers:
        header_str = ' | '.join(str(h) for h in headers)
        content += f"Колонки: {header_str}\n\n"

    # CRITICAL: preserve searchable row identifiers
    if isinstance(table_data.get('data'), list):
        for row in table_data['data']:
            if isinstance(row, dict):
                # Keep every non-empty key-value pair as "key: value"
                row_parts = [
                    f"{k}: {v}"
                    for k, v in row.items()
                    if v and str(v).strip() and str(v) != 'nan'
                ]
                if row_parts:
                    content += ' | '.join(row_parts) + "\n"
            elif isinstance(row, list):
                row_str = ' | '.join(
                    str(v) for v in row
                    if v and str(v).strip() and str(v) != 'nan'
                )
                if row_str:
                    content += row_str + "\n"

    return content, normalized_num


def chunk_table_document(doc, chunk_size=None, chunk_overlap=None):
    """Split a large table Document into row-based chunk Documents.

    Every chunk repeats the table header (everything up to and including
    the "Колонки:" line) so each chunk is independently searchable; the
    last 2 data rows of a chunk are repeated at the start of the next
    one for context overlap.

    NOTE: ``chunk_overlap`` is accepted for interface compatibility, but
    the overlap is fixed at 2 rows regardless of its value.

    Returns ``[doc]`` unchanged for tables too small to need splitting.
    """
    if chunk_size is None:
        chunk_size = CHUNK_SIZE
    if chunk_overlap is None:
        chunk_overlap = CHUNK_OVERLAP

    table_num = doc.metadata.get('table_number', 'unknown')
    doc_id = doc.metadata.get('document_id', 'unknown')
    section = doc.metadata.get('section', 'Неизвестно')
    # Prefer the id already stored by table_to_document (built from the
    # normalized table number); rebuild only as a fallback so chunk
    # metadata stays consistent with the parent document.
    full_table_id = doc.metadata.get(
        'full_table_id', f"{doc_id} | {section} | {table_num}"
    )

    lines = doc.text.strip().split('\n')

    # Find where data rows start: right after the "Колонки:" line and
    # the blank line that follows it.
    data_start_idx = 0
    for i, line in enumerate(lines):
        if line.startswith('Колонки:'):
            data_start_idx = i + 2  # Skip header and blank line
            break

    table_header = '\n'.join(lines[:data_start_idx])
    data_rows = lines[data_start_idx:]

    if not data_rows or len(doc.text) < chunk_size * 1.5:
        log_message(f" 📊 {full_table_id}: малая таблица, без разбиения")
        return [doc]

    log_message(f" 📋 {full_table_id}: {len(data_rows)} строк → chunking")

    header_size = len(table_header)
    # Reserve 100 chars of slack for the joining newlines; floor at 1 so
    # an oversized header cannot make the budget negative.
    available_size = max(chunk_size - header_size - 100, 1)

    text_chunks = []
    current_chunk_rows = []
    current_size = 0

    for row in data_rows:
        row_size = len(row) + 1  # +1 for the newline
        if current_size + row_size > available_size and current_chunk_rows:
            chunk_text = table_header + '\n' + '\n'.join(current_chunk_rows)
            text_chunks.append(chunk_text)
            # Keep last 2 rows for overlap with the next chunk
            overlap_count = min(2, len(current_chunk_rows))
            current_chunk_rows = current_chunk_rows[-overlap_count:]
            current_size = sum(len(r) + 1 for r in current_chunk_rows)
        current_chunk_rows.append(row)
        current_size += row_size

    if current_chunk_rows:
        chunk_text = table_header + '\n' + '\n'.join(current_chunk_rows)
        text_chunks.append(chunk_text)

    log_message(f" ✂️ {full_table_id} → {len(text_chunks)} чанков")

    chunked_docs = []
    for i, chunk_text in enumerate(text_chunks):
        chunk_metadata = doc.metadata.copy()
        chunk_metadata.update({
            "chunk_id": i,
            "total_chunks": len(text_chunks),
            "chunk_size": len(chunk_text),
            "is_chunked": True,
            "full_table_id": full_table_id,
            "table_number_normalized": doc.metadata.get('table_number_normalized')
        })
        chunked_docs.append(Document(text=chunk_text, metadata=chunk_metadata))

    return chunked_docs


def table_to_document(table_data, document_id=None):
    """Convert one table dict to a list of Documents with complete metadata.

    The sheet-level document id stored inside ``table_data`` wins over
    the ``document_id`` argument. Tables whose rendered content exceeds
    CHUNK_SIZE are split via :func:`chunk_table_document`.

    Returns an empty list for non-dict input or tables without data.

    NOTE(review): the original file defined this function twice; the
    accidental second copy shadowed the first and silently dropped the
    ``sheet_name`` metadata field. This merged version keeps it.
    """
    if not isinstance(table_data, dict):
        return []

    # Sheet-level document id takes precedence over the caller-supplied one
    sheet_doc_id = (
        table_data.get('document_id')
        or table_data.get('document')
        or table_data.get('Обозначение документа')
    )
    doc_id = sheet_doc_id or document_id or 'Неизвестно'

    table_num = table_data.get('table_number', 'Неизвестно')
    table_title = table_data.get('table_title', 'Неизвестно')
    section = table_data.get('section', table_data.get('Раздел документа', 'Неизвестно'))
    sheet_name = table_data.get('sheet_name', '')
    table_rows = table_data.get('data', [])

    if not table_rows:
        log_message(f"⚠️ Таблица {table_num} ({doc_id}) пропущена: нет данных")
        return []

    content, normalized_num = create_table_content(table_data)
    content_size = len(content)

    base_doc = Document(
        text=content,
        metadata={
            "type": "table",
            "table_number": table_num,
            "table_number_normalized": normalized_num,
            "table_title": table_title,
            "document_id": doc_id,
            "section": section,
            "section_id": section,
            "sheet_name": sheet_name,
            "total_rows": len(table_rows),
            "content_size": content_size,
            "full_table_id": f"{doc_id} | {section} | {normalized_num}"
        }
    )

    if content_size > CHUNK_SIZE:
        log_message(f"📊 CHUNKING: {doc_id} | {normalized_num} | {content_size} > {CHUNK_SIZE}")
        return chunk_table_document(base_doc)

    log_message(f"✓ {doc_id} | {normalized_num} ({content_size} символов)")
    return [base_doc]