from collections import defaultdict import json from huggingface_hub import hf_hub_download, list_repo_files from llama_index.core import Document from my_logging import log_message def create_table_content(table_data): """Create formatted content from table data""" doc_id = table_data.get('document_id', table_data.get('document', 'Неизвестно')) table_num = table_data.get('table_number', 'Неизвестно') table_title = table_data.get('table_title', 'Неизвестно') section = table_data.get('section', 'Неизвестно') content = f"Таблица: {table_num}\n" content += f"Название: {table_title}\n" content += f"Документ: {doc_id}\n" content += f"Раздел: {section}\n" headers = table_data.get('headers', []) if headers: content += f"\nЗаголовки: {' | '.join(headers)}\n" if 'data' in table_data and isinstance(table_data['data'], list): content += "\nДанные таблицы:\n" for row_idx, row in enumerate(table_data['data'], start=1): if isinstance(row, dict): row_text = " | ".join([f"{k}: {v}" for k, v in row.items() if v]) content += f"Строка {row_idx}: {row_text}\n" return content from llama_index.core.text_splitter import SentenceSplitter from config import CHUNK_SIZE, CHUNK_OVERLAP # In table_prep.py - replace chunk_table_document function def chunk_table_document(doc, chunk_size=None, chunk_overlap=None): if chunk_size is None: chunk_size = CHUNK_SIZE if chunk_overlap is None: chunk_overlap = CHUNK_OVERLAP # Extract critical metadata from table before chunking table_metadata = extract_table_metadata(doc.text) text_splitter = SentenceSplitter( chunk_size=chunk_size, chunk_overlap=chunk_overlap, separator="\n" ) text_chunks = text_splitter.split_text(doc.text) chunked_docs = [] for i, chunk_text in enumerate(text_chunks): chunk_metadata = doc.metadata.copy() # Add extracted keywords/materials to each chunk chunk_metadata.update({ "chunk_id": i, "total_chunks": len(text_chunks), "chunk_size": len(chunk_text), "is_chunked": True, "materials": table_metadata.get("materials", []), # All materials from table "key_terms": table_metadata.get("key_terms", []), # Technical terms "table_summary": table_metadata.get("summary", "") # Brief table description }) # Enrich chunk text with context from full table enriched_text = f"""[Таблица {doc.metadata.get('table_number')}: {doc.metadata.get('table_title')}] [Материалы в таблице: {', '.join(table_metadata.get('materials', [])[:10])}] [Ключевые термины: {', '.join(table_metadata.get('key_terms', [])[:10])}] {chunk_text}""" chunked_doc = Document( text=enriched_text, metadata=chunk_metadata ) chunked_docs.append(chunked_doc) return chunked_docs def extract_table_metadata(table_text): """Extract searchable metadata from table content""" import re # Extract material codes (e.g., 08Х18Н10Т) material_pattern = r'\b\d{2}[ХНТМКВБА]+\d{1,2}[ХНТМКВБА]*\d*\b' materials = list(set(re.findall(material_pattern, table_text, re.IGNORECASE))) # Extract GOST standards gost_pattern = r'ГОСТ\s+[РЕН\s]*\d+[\.\-\d]*' gosts = list(set(re.findall(gost_pattern, table_text, re.IGNORECASE))) # Extract class/category codes class_pattern = r'\b\d[АБВСI]+[IVX]+[a-z]*\b' classes = list(set(re.findall(class_pattern, table_text, re.IGNORECASE))) # Extract common technical terms tech_terms = [] keywords = ['контроль', 'испытание', 'сертификат', 'качество', 'план', 'полуфабрикат', 'оборудование', 'арматура', 'деталь'] for keyword in keywords: if keyword.lower() in table_text.lower(): tech_terms.append(keyword) # Create brief summary lines = table_text.split('\n')[:5] summary = ' '.join([l.strip() for l in lines if l.strip()])[:200] return { "materials": materials, "gosts": gosts, "classes": classes, "key_terms": tech_terms + gosts, "summary": summary } def table_to_document(table_data, document_id=None): if not isinstance(table_data, dict): log_message(f"⚠️ ПРОПУЩЕНА: table_data не является словарем") return [] doc_id = document_id or table_data.get('document_id') or table_data.get('document', 'Неизвестно') table_num = table_data.get('table_number', 'Неизвестно') table_title = table_data.get('table_title', 'Неизвестно') section = table_data.get('section', 'Неизвестно') table_rows = table_data.get('data', []) if not table_rows or len(table_rows) == 0: log_message(f"⚠️ ПРОПУЩЕНА: Таблица {table_num} из '{doc_id}' - нет данных в 'data'") return [] content = create_table_content(table_data) content_size = len(content) row_count = len(table_rows) base_doc = Document( text=content, metadata={ "type": "table", "table_number": table_num, "table_title": table_title, "document_id": doc_id, "section": section, "section_id": section, "total_rows": row_count, "content_size": content_size } ) if content_size > CHUNK_SIZE: log_message(f"📊 CHUNKING: Таблица {table_num} из '{doc_id}' | " f"Размер: {content_size} > {CHUNK_SIZE} | Строк: {row_count}") chunked_docs = chunk_table_document(base_doc) log_message(f" ✂️ Разделена на {len(chunked_docs)} чанков") for i, chunk_doc in enumerate(chunked_docs): log_message(f" Чанк {i+1}: {chunk_doc.metadata['chunk_size']} символов") return chunked_docs else: log_message(f"✓ ДОБАВЛЕНА: Таблица {table_num} из документа '{doc_id}' | " f"Размер: {content_size} символов | Строк: {row_count}") return [base_doc] def load_table_data(repo_id, hf_token, table_data_dir): log_message("=" * 60) log_message("НАЧАЛО ЗАГРУЗКИ ТАБЛИЧНЫХ ДАННЫХ") log_message("=" * 60) try: files = list_repo_files(repo_id=repo_id, repo_type="dataset", token=hf_token) table_files = [f for f in files if f.startswith(table_data_dir) and f.endswith('.json')] log_message(f"Найдено {len(table_files)} JSON файлов с таблицами") table_documents = [] stats = { 'total_tables': 0, 'total_size': 0, 'by_document': defaultdict(lambda: {'count': 0, 'size': 0}) } for file_path in table_files: try: local_path = hf_hub_download( repo_id=repo_id, filename=file_path, local_dir='', repo_type="dataset", token=hf_token ) log_message(f"\nОбработка файла: {file_path}") with open(local_path, 'r', encoding='utf-8') as f: table_data = json.load(f) if isinstance(table_data, dict): document_id = table_data.get('document', 'unknown') if 'sheets' in table_data: sorted_sheets = sorted( table_data['sheets'], key=lambda sheet: sheet.get('table_number', '') # or use 'table_number' ) for sheet in sorted_sheets: sheet['document'] = document_id docs_list = table_to_document(sheet, document_id) table_documents.extend(docs_list) for doc in docs_list: stats['total_tables'] += 1 size = doc.metadata.get('content_size', 0) stats['total_size'] += size stats['by_document'][document_id]['count'] += 1 stats['by_document'][document_id]['size'] += size else: docs_list = table_to_document(table_data, document_id) table_documents.extend(docs_list) for doc in docs_list: stats['total_tables'] += 1 size = doc.metadata.get('content_size', 0) stats['total_size'] += size stats['by_document'][document_id]['count'] += 1 stats['by_document'][document_id]['size'] += size except Exception as e: log_message(f"❌ ОШИБКА файла {file_path}: {str(e)}") continue # Log summary statistics log_message("\n" + "=" * 60) log_message("СТАТИСТИКА ПО ТАБЛИЦАМ") log_message("=" * 60) log_message(f"Всего таблиц добавлено: {stats['total_tables']}") log_message(f"Общий размер: {stats['total_size']:,} символов") log_message(f"Средний размер таблицы: {stats['total_size'] // stats['total_tables'] if stats['total_tables'] > 0 else 0:,} символов") log_message("\nПо документам:") for doc_id, doc_stats in sorted(stats['by_document'].items()): log_message(f" • {doc_id}: {doc_stats['count']} таблиц, " f"{doc_stats['size']:,} символов") log_message("=" * 60) return table_documents except Exception as e: log_message(f"❌ КРИТИЧЕСКАЯ ОШИБКА загрузки табличных данных: {str(e)}") return []