# NOTE: removed "Spaces: Sleeping" status banner — a Hugging Face UI copy-paste artifact, not part of the module.
| import os | |
| from collections import defaultdict | |
| import json | |
| import zipfile | |
| import pandas as pd | |
| from huggingface_hub import hf_hub_download, list_repo_files | |
| from llama_index.core import Document | |
| from my_logging import log_message | |
# Per-document overrides for table chunking. Keys are document-id prefixes
# (matched with str.startswith); each maps a table number (or "*" for every
# table in the document) to the chunking method and its parameters.
CUSTOM_TABLE_CONFIGS = {
    "ГОСТ Р 50.05.01-2018": {
        "tables": {
            "№3": {
                "method": "group_by_column",
                "group_column": "Класс герметичности и чувствительности",
            },
            "№Б.1": {
                "method": "group_by_column",
                "group_column": "Класс чувствительности системы контроля",
            },
        },
    },
    "ГОСТ Р 50.06.01-2017": {
        "tables": {
            "№ Б.2": {"method": "split_by_rows"},
        },
    },
    "ГОСТ Р 59023.2-2020": {
        "tables": {
            # Wildcard: every table in this document is kept whole.
            "*": {"method": "group_entire_table"},
        },
    },
    "НП-068-05": {
        "tables": {
            "Таблица 1": {
                "method": "group_by_column",
                "group_column": "Рабочее давление среды, МПа",
            },
            "Таблица 2": {
                "method": "group_by_column",
                "group_column": "Рабочее давление среды, МПа",
            },
            "Таблица Приложения 1": {
                "method": "group_by_column",
                "group_column": "Тип",
            },
        },
    },
    "ГОСТ Р 59023.1-2020": {
        "tables": {
            "№ 1": {"method": "split_by_rows"},
            "№ 2": {"method": "split_by_rows"},
            "№ 3": {"method": "split_by_rows"},
        },
    },
}
def create_meta_info(document_name, section, table_number, table_title, extra_info=""):
    """Build the standard metadata prefix for a table chunk.

    Returns a single comma-separated line (terminated by ``\\n``) naming the
    document, section, table number and table title, with ``extra_info``
    appended when non-empty.
    """
    parts = [
        f'Документ "{document_name}"',
        f'Раздел: {section}',
        f'Номер таблицы: {table_number}',
        f'Название таблицы: {table_title}',
    ]
    if extra_info:
        parts.append(extra_info)
    return ", ".join(parts) + "\n"
def create_chunk_text(meta_info, headers, rows, add_row_numbers=False):
    """Render a table chunk as text.

    The first line is ``meta_info`` followed by the " | "-joined header names;
    each subsequent line is one row rendered as "header: value" pairs
    (missing keys render as empty strings). With ``add_row_numbers`` each row
    line is prefixed with its 1-based row number.
    """
    lines = [f"{meta_info}Заголовки: {' | '.join(headers)}"]
    for number, row in enumerate(rows, start=1):
        cells = " | ".join(f"{h}: {row.get(h, '')}" for h in headers)
        lines.append(f"Строка {number}: {cells}" if add_row_numbers else cells)
    return "\n".join(lines)
def group_by_column_method(table_data, document_name, group_column):
    """Emit one Document per distinct value of ``group_column``.

    Rows sharing the same value in ``group_column`` are bundled into a single
    numbered chunk; rows missing the column fall into an "UNKNOWN" group.
    """
    headers = table_data.get("headers", [])
    section = table_data.get("section", "")
    number = table_data.get("table_number", "")
    title = table_data.get("table_title", "")

    buckets = defaultdict(list)
    for record in table_data.get("data", []):
        buckets[record.get(group_column, "UNKNOWN")].append(record)

    documents = []
    for group_value, members in buckets.items():
        meta = create_meta_info(
            document_name, section, number, title,
            f'Группа по "{group_column}": {group_value}',
        )
        text = create_chunk_text(meta, headers, members, add_row_numbers=True)
        documents.append(Document(
            text=text,
            metadata={
                "type": "table",
                "table_number": number,
                "table_title": title,
                "document_id": document_name,
                "section": section,
                "section_id": section,
                "group_column": group_column,
                "group_value": group_value,
                "total_rows": len(members),
                "processing_method": "group_by_column",
            },
        ))
        log_message(f"Created grouped chunk for {group_column}={group_value}, rows: {len(members)}, length: {len(text)}")
    return documents
def split_by_rows_method(table_data, document_name):
    """Emit one Document per table row (1-based row numbering)."""
    headers = table_data.get("headers", [])
    rows = table_data.get("data", [])
    section = table_data.get("section", "")
    number = table_data.get("table_number", "")
    title = table_data.get("table_title", "")

    documents = []
    for index, record in enumerate(rows, start=1):
        meta = create_meta_info(document_name, section, number, title, f'Строка: {index}')
        text = create_chunk_text(meta, headers, [record])
        documents.append(Document(
            text=text,
            metadata={
                "type": "table",
                "table_number": number,
                "table_title": title,
                "document_id": document_name,
                "section": section,
                "section_id": section,
                "row_number": index,
                "total_rows": len(rows),
                "processing_method": "split_by_rows",
            },
        ))
    log_message(f"Split table {number} into {len(rows)} row chunks")
    return documents
def group_entire_table_method(table_data, document_name):
    """Keep the whole table as a single Document (no row numbering)."""
    headers = table_data.get("headers", [])
    rows = table_data.get("data", [])
    section = table_data.get("section", "")
    number = table_data.get("table_number", "")
    title = table_data.get("table_title", "")

    meta = create_meta_info(document_name, section, number, title)
    text = create_chunk_text(meta, headers, rows)
    document = Document(
        text=text,
        metadata={
            "type": "table",
            "table_number": number,
            "table_title": title,
            "document_id": document_name,
            "section": section,
            "section_id": section,
            "total_rows": len(rows),
            "processing_method": "group_entire_table",
        },
    )
    log_message(f"Grouped entire table {number}, rows: {len(rows)}, length: {len(text)}")
    return [document]
def should_use_custom_processing(document_id, table_number):
    """Look up a custom chunking config for a table.

    Returns ``(use_custom, matched_pattern, method_config)``. A document
    matches when its id starts with a configured pattern; within a pattern an
    exact table-number entry takes precedence over a "*" wildcard. Returns
    ``(False, None, None)`` when nothing matches.
    """
    for pattern, config in CUSTOM_TABLE_CONFIGS.items():
        if not document_id.startswith(pattern):
            continue
        tables = config.get("tables", {})
        method_config = tables.get(table_number, tables.get("*"))
        if method_config is not None:
            return True, pattern, method_config
    return False, None, None
def process_table_with_custom_method(table_data, document_name, method_config):
    """Dispatch a table to the chunking method named in ``method_config``.

    Returns the list of Documents produced by the method, or ``None`` for an
    unknown method name so the caller can fall back to default processing.
    """
    method = method_config.get("method")
    if method == "group_by_column":
        return group_by_column_method(table_data, document_name, method_config.get("group_column"))
    if method == "split_by_rows":
        return split_by_rows_method(table_data, document_name)
    if method == "group_entire_table":
        return group_entire_table_method(table_data, document_name)
    log_message(f"Unknown custom method: {method}, falling back to default processing")
    return None
def table_to_document(table_data, document_id=None):
    """Convert one table dict into a list of Documents.

    Tables matched by CUSTOM_TABLE_CONFIGS are chunked by their configured
    method; every other table becomes a single default Document. Non-dict
    input yields an empty list.
    """
    if not isinstance(table_data, dict):
        return []

    doc_id = document_id or table_data.get('document_id', table_data.get('document', 'Неизвестно'))
    table_num = table_data.get('table_number', 'Неизвестно')

    # Custom processing takes precedence; fall through to the default
    # rendering only when it is not configured or produced nothing.
    use_custom, _pattern, method_config = should_use_custom_processing(doc_id, table_num)
    if use_custom:
        log_message(f"Using custom processing for table {table_num} in document {doc_id}")
        custom_docs = process_table_with_custom_method(table_data, doc_id, method_config)
        if custom_docs:
            return custom_docs

    table_title = table_data.get('table_title', 'Неизвестно')
    section = table_data.get('section', 'Неизвестно')
    text = f"Таблица: {table_num}\nНазвание: {table_title}\nДокумент: {doc_id}\nРаздел: {section}\n"

    metadata = {
        "type": "table",
        "table_number": table_num,
        "table_title": table_title,
        "document_id": doc_id,
        "section": section,
        "section_id": section,
        "processing_method": "default",
    }

    rows = table_data.get('data')
    if isinstance(rows, list):
        # Render every dict row as "key: value" pairs; non-dict rows are
        # skipped but still counted in total_rows (matches original behavior).
        text += "\nДанные таблицы:\n"
        for index, row in enumerate(rows):
            if isinstance(row, dict):
                pairs = " | ".join(f"{k}: {v}" for k, v in row.items())
                text += f"Строка {index + 1}: {pairs}\n"
        metadata["total_rows"] = len(rows)

    return [Document(text=text, metadata=metadata)]
def _convert_table_entry(entry, check_document_id, explicit_document_id=None):
    """Log when an entry has a custom config, then convert it to Documents.

    ``check_document_id`` is the id used for the custom-config lookup (and the
    log line); ``explicit_document_id`` is forwarded to table_to_document and
    may be None so that entry-level id fallbacks still apply.
    """
    table_num = entry.get('table_number', 'Неизвестно')
    use_custom, _, _ = should_use_custom_processing(check_document_id, table_num)
    if use_custom:
        log_message(f"Skipping default processing for custom table {table_num} in {check_document_id}")
    return table_to_document(entry, explicit_document_id)


def load_table_data(repo_id, hf_token, table_data_dir):
    """Download table JSON files from a HF dataset repo and build Documents.

    Scans the dataset repo for ``*.json`` files under ``table_data_dir``,
    downloads each, and converts its content (a single table dict, a dict
    with a "sheets" list, or a list of table dicts) via table_to_document.
    Per-file errors are logged and skipped; any top-level error returns [].
    """
    log_message("Начинаю загрузку табличных данных")
    try:
        files = list_repo_files(repo_id=repo_id, repo_type="dataset", token=hf_token)
        table_files = [
            f for f in files
            if f.startswith(table_data_dir) and f.endswith('.json')
        ]
        log_message(f"Найдено {len(table_files)} JSON файлов с таблицами")

        table_documents = []
        for file_path in table_files:
            try:
                log_message(f"Обрабатываю файл: {file_path}")
                local_path = hf_hub_download(
                    repo_id=repo_id,
                    filename=file_path,
                    # NOTE(review): empty local_dir downloads relative to the
                    # current working directory — confirm this is intended.
                    local_dir='',
                    repo_type="dataset",
                    token=hf_token
                )
                with open(local_path, 'r', encoding='utf-8') as f:
                    table_data = json.load(f)

                if isinstance(table_data, dict):
                    document_id = table_data.get('document', 'unknown')
                    if 'sheets' in table_data:
                        # Multi-sheet file: tag each sheet with the parent
                        # document id before converting.
                        for sheet in table_data['sheets']:
                            sheet['document'] = document_id
                            table_documents.extend(
                                _convert_table_entry(sheet, document_id, document_id))
                    else:
                        table_documents.extend(
                            _convert_table_entry(table_data, document_id, document_id))
                elif isinstance(table_data, list):
                    for table_json in table_data:
                        document_id = table_json.get('document', 'unknown')
                        # No explicit id here: table_to_document falls back to
                        # the entry's own 'document_id'/'document' fields.
                        table_documents.extend(
                            _convert_table_entry(table_json, document_id))
            except Exception as e:
                log_message(f"Ошибка обработки файла {file_path}: {str(e)}")
                continue

        log_message(f"Создано {len(table_documents)} документов из таблиц")
        return table_documents
    except Exception as e:
        log_message(f"Ошибка загрузки табличных данных: {str(e)}")
        return []