import os
from collections import defaultdict
import json
import zipfile
import pandas as pd
from huggingface_hub import hf_hub_download, list_repo_files
from llama_index.core import Document
from my_logging import log_message

# Per-document chunking overrides. Keys are document-id *prefixes* (matched
# with str.startswith in should_use_custom_processing); each maps a table
# number to a processing method. The special table key "*" applies the
# method to every table of that document.
CUSTOM_TABLE_CONFIGS = {
    "ГОСТ Р 50.05.01-2018": {
        "tables": {
            "№3": {"method": "group_by_column", "group_column": "Класс герметичности и чувствительности"},
            "№Б.1": {"method": "group_by_column", "group_column": "Класс чувствительности системы контроля"}
        }
    },
    "ГОСТ Р 50.06.01-2017": {
        "tables": {
            "№ Б.2": {"method": "split_by_rows"}
        }
    },
    "ГОСТ Р 59023.2-2020": {
        "tables": {
            "*": {"method": "group_entire_table"}  # All tables
        }
    },
    "НП-068-05": {
        "tables": {
            "Таблица 1": {"method": "group_by_column", "group_column": "Рабочее давление среды, МПа"},
            "Таблица 2": {"method": "group_by_column", "group_column": "Рабочее давление среды, МПа"},
            "Таблица Приложения 1": {"method": "group_by_column", "group_column": "Тип"}
        }
    },
    "ГОСТ Р 59023.1-2020": {
        "tables": {
            "№ 1": {"method": "split_by_rows"},
            "№ 2": {"method": "split_by_rows"},
            "№ 3": {"method": "split_by_rows"}
        }
    }
}


def create_meta_info(document_name, section, table_number, table_title, extra_info=""):
    """Build the standard one-line metadata preamble for a table chunk.

    Returns a newline-terminated string identifying the source document,
    section, table number and title, optionally extended with ``extra_info``
    (e.g. a group or row marker).
    """
    base_info = f'Документ "{document_name}", Раздел: {section}, Номер таблицы: {table_number}, Название таблицы: {table_title}'
    if extra_info:
        base_info += f', {extra_info}'
    return base_info + '\n'


def create_chunk_text(meta_info, headers, rows, add_row_numbers=False):
    """Render a chunk: meta line, a header line, then one line per row.

    Each row line is "header: value | header: value | ..." in ``headers``
    order; missing keys render as empty values. When ``add_row_numbers`` is
    True each row line is prefixed with its 1-based index.
    """
    header_line = " | ".join(headers)
    chunk_lines = [meta_info + "Заголовки: " + header_line]
    for i, row in enumerate(rows, start=1):
        row_text = " | ".join([f"{h}: {row.get(h, '')}" for h in headers])
        if add_row_numbers:
            chunk_lines.append(f"Строка {i}: {row_text}")
        else:
            chunk_lines.append(row_text)
    return "\n".join(chunk_lines)


def group_by_column_method(table_data, document_name, group_column):
    """Split a table into one Document per distinct value of ``group_column``.

    Rows whose ``group_column`` key is absent are grouped under "UNKNOWN".
    Returns a list of llama_index Documents.
    """
    documents = []
    headers = table_data.get("headers", [])
    rows = table_data.get("data", [])
    section = table_data.get("section", "")
    table_number = table_data.get("table_number", "")
    table_title = table_data.get("table_title", "")

    grouped = defaultdict(list)
    for row in rows:
        key = row.get(group_column, "UNKNOWN")
        grouped[key].append(row)

    for group_value, group_rows in grouped.items():
        meta_info = create_meta_info(
            document_name, section, table_number, table_title,
            f'Группа по "{group_column}": {group_value}'
        )
        chunk_text = create_chunk_text(meta_info, headers, group_rows, add_row_numbers=True)
        doc = Document(
            text=chunk_text,
            metadata={
                "type": "table",
                "table_number": table_number,
                "table_title": table_title,
                "document_id": document_name,
                "section": section,
                "section_id": section,
                "group_column": group_column,
                "group_value": group_value,
                "total_rows": len(group_rows),
                "processing_method": "group_by_column"
            }
        )
        documents.append(doc)
        log_message(f"Created grouped chunk for {group_column}={group_value}, rows: {len(group_rows)}, length: {len(chunk_text)}")
    return documents


def split_by_rows_method(table_data, document_name):
    """Split a table into one Document per row.

    Returns a list of llama_index Documents, one for each entry of the
    table's "data" list.
    """
    documents = []
    headers = table_data.get("headers", [])
    rows = table_data.get("data", [])
    section = table_data.get("section", "")
    table_number = table_data.get("table_number", "")
    table_title = table_data.get("table_title", "")

    for i, row in enumerate(rows, start=1):
        meta_info = create_meta_info(
            document_name, section, table_number, table_title, f'Строка: {i}'
        )
        chunk_text = create_chunk_text(meta_info, headers, [row])
        doc = Document(
            text=chunk_text,
            metadata={
                "type": "table",
                "table_number": table_number,
                "table_title": table_title,
                "document_id": document_name,
                "section": section,
                "section_id": section,
                "row_number": i,
                "total_rows": len(rows),
                "processing_method": "split_by_rows"
            }
        )
        documents.append(doc)
    log_message(f"Split table {table_number} into {len(rows)} row chunks")
    return documents


def group_entire_table_method(table_data, document_name):
    """Emit the whole table as a single Document chunk."""
    headers = table_data.get("headers", [])
    rows = table_data.get("data", [])
    section = table_data.get("section", "")
    table_number = table_data.get("table_number", "")
    table_title = table_data.get("table_title", "")

    meta_info = create_meta_info(document_name, section, table_number, table_title)
    chunk_text = create_chunk_text(meta_info, headers, rows)
    doc = Document(
        text=chunk_text,
        metadata={
            "type": "table",
            "table_number": table_number,
            "table_title": table_title,
            "document_id": document_name,
            "section": section,
            "section_id": section,
            "total_rows": len(rows),
            "processing_method": "group_entire_table"
        }
    )
    log_message(f"Grouped entire table {table_number}, rows: {len(rows)}, length: {len(chunk_text)}")
    return [doc]


def should_use_custom_processing(document_id, table_number):
    """Look up a custom processing config for (document, table).

    Matches ``document_id`` against CUSTOM_TABLE_CONFIGS keys by prefix.
    Returns ``(True, matched_prefix, method_config)`` on a hit — an exact
    table-number entry wins over the "*" wildcard — otherwise
    ``(False, None, None)``.
    """
    for doc_pattern, config in CUSTOM_TABLE_CONFIGS.items():
        if document_id.startswith(doc_pattern):
            tables_config = config.get("tables", {})
            # Exact table-number match, or a document-wide "*" wildcard.
            if table_number in tables_config or "*" in tables_config:
                return True, doc_pattern, tables_config.get(table_number, tables_config.get("*"))
    return False, None, None


def process_table_with_custom_method(table_data, document_name, method_config):
    """Dispatch a table to the configured custom chunker.

    Returns the resulting Document list, or None for an unknown method so
    the caller can fall back to default processing.
    """
    method = method_config.get("method")
    if method == "group_by_column":
        group_column = method_config.get("group_column")
        return group_by_column_method(table_data, document_name, group_column)
    elif method == "split_by_rows":
        return split_by_rows_method(table_data, document_name)
    elif method == "group_entire_table":
        return group_entire_table_method(table_data, document_name)
    else:
        log_message(f"Unknown custom method: {method}, falling back to default processing")
        return None


def table_to_document(table_data, document_id=None):
    """Convert one table dict into a list of Documents.

    Tables matching CUSTOM_TABLE_CONFIGS are routed to their custom
    chunker; everything else gets the default "one chunk per table"
    rendering. Non-dict input yields an empty list.
    """
    if not isinstance(table_data, dict):
        return []

    doc_id = document_id or table_data.get('document_id', table_data.get('document', 'Неизвестно'))
    table_num = table_data.get('table_number', 'Неизвестно')

    # Custom processing takes precedence over the default rendering.
    use_custom, doc_pattern, method_config = should_use_custom_processing(doc_id, table_num)
    if use_custom:
        log_message(f"Using custom processing for table {table_num} in document {doc_id}")
        custom_docs = process_table_with_custom_method(table_data, doc_id, method_config)
        if custom_docs:
            return custom_docs

    # Default processing for tables not in the custom config (or when the
    # custom method was unknown / produced nothing).
    table_title = table_data.get('table_title', 'Неизвестно')
    section = table_data.get('section', 'Неизвестно')
    header_content = f"Таблица: {table_num}\nНазвание: {table_title}\nДокумент: {doc_id}\nРаздел: {section}\n"

    if 'data' in table_data and isinstance(table_data['data'], list):
        table_content = header_content + "\nДанные таблицы:\n"
        for row_idx, row in enumerate(table_data['data']):
            # Non-dict rows are silently skipped, matching prior behavior.
            if isinstance(row, dict):
                row_text = " | ".join([f"{k}: {v}" for k, v in row.items()])
                table_content += f"Строка {row_idx + 1}: {row_text}\n"
        doc = Document(
            text=table_content,
            metadata={
                "type": "table",
                "table_number": table_num,
                "table_title": table_title,
                "document_id": doc_id,
                "section": section,
                "section_id": section,
                "total_rows": len(table_data['data']),
                "processing_method": "default"
            }
        )
        return [doc]
    else:
        doc = Document(
            text=header_content,
            metadata={
                "type": "table",
                "table_number": table_num,
                "table_title": table_title,
                "document_id": doc_id,
                "section": section,
                "section_id": section,
                "processing_method": "default"
            }
        )
        return [doc]


def _log_if_custom(document_id, table_num):
    """Log that a table will bypass default processing.

    Pure logging: the actual custom dispatch happens inside
    table_to_document. Factored out of load_table_data where this check
    was duplicated three times.
    """
    use_custom, _, _ = should_use_custom_processing(document_id, table_num)
    if use_custom:
        log_message(f"Skipping default processing for custom table {table_num} in {document_id}")


def load_table_data(repo_id, hf_token, table_data_dir):
    """Download table JSON files from a HF dataset repo and build Documents.

    Scans ``table_data_dir`` in ``repo_id`` for .json files; each file may
    contain a single table dict, a dict with a "sheets" list, or a list of
    table dicts. Per-file errors are logged and skipped; a top-level error
    returns an empty list.
    """
    log_message("Начинаю загрузку табличных данных")
    try:
        files = list_repo_files(repo_id=repo_id, repo_type="dataset", token=hf_token)
        table_files = [
            file for file in files
            if file.startswith(table_data_dir) and file.endswith('.json')
        ]
        log_message(f"Найдено {len(table_files)} JSON файлов с таблицами")

        table_documents = []
        for file_path in table_files:
            try:
                log_message(f"Обрабатываю файл: {file_path}")
                local_path = hf_hub_download(
                    repo_id=repo_id,
                    filename=file_path,
                    # NOTE(review): local_dir='' forces download into the
                    # current working directory rather than the HF cache —
                    # confirm this is intentional.
                    local_dir='',
                    repo_type="dataset",
                    token=hf_token
                )
                with open(local_path, 'r', encoding='utf-8') as f:
                    table_data = json.load(f)

                if isinstance(table_data, dict):
                    document_id = table_data.get('document', 'unknown')
                    if 'sheets' in table_data:
                        # Multi-sheet file: tag each sheet with the parent
                        # document id, then convert sheet by sheet.
                        for sheet in table_data['sheets']:
                            sheet['document'] = document_id
                            _log_if_custom(document_id, sheet.get('table_number', 'Неизвестно'))
                            table_documents.extend(table_to_document(sheet, document_id))
                    else:
                        _log_if_custom(document_id, table_data.get('table_number', 'Неизвестно'))
                        table_documents.extend(table_to_document(table_data, document_id))
                elif isinstance(table_data, list):
                    for table_json in table_data:
                        document_id = table_json.get('document', 'unknown')
                        _log_if_custom(document_id, table_json.get('table_number', 'Неизвестно'))
                        table_documents.extend(table_to_document(table_json))
            except Exception as e:
                log_message(f"Ошибка обработки файла {file_path}: {str(e)}")
                continue

        log_message(f"Создано {len(table_documents)} документов из таблиц")
        return table_documents
    except Exception as e:
        log_message(f"Ошибка загрузки табличных данных: {str(e)}")
        return []