# RAG_AIEXP_01 / table_prep.py
# (Hugging Face file-viewer chrome removed; original commit: "table processing
# + new version of np104", hash 5884230, 14.2 kB)
import os
from collections import defaultdict
import json
import zipfile
import pandas as pd
from huggingface_hub import hf_hub_download, list_repo_files
from llama_index.core import Document
from my_logging import log_message
# Per-document chunking overrides for table processing.
# Top-level keys are document-id *prefixes* (matched via str.startswith in
# should_use_custom_processing). Each "tables" dict maps a table number —
# or "*" as a wildcard for every table in the document — to the chunking
# method and its parameters:
#   group_by_column    — one chunk per distinct value of "group_column"
#   split_by_rows      — one chunk per row
#   group_entire_table — the whole table as a single chunk
CUSTOM_TABLE_CONFIGS = {
    "ГОСТ Р 50.05.01-2018": {
        "tables": {
            "№3": {"method": "group_by_column", "group_column": "Класс герметичности и чувствительности"},
            "№Б.1": {"method": "group_by_column", "group_column": "Класс чувствительности системы контроля"}
        }
    },
    "ГОСТ Р 50.06.01-2017": {
        "tables": {
            "№ Б.2": {"method": "split_by_rows"}
        }
    },
    "ГОСТ Р 59023.2-2020": {
        "tables": {
            "*": {"method": "group_entire_table"}  # wildcard: applies to every table in this document
        }
    },
    "НП-068-05": {
        "tables": {
            "Таблица 1": {"method": "group_by_column", "group_column": "Рабочее давление среды, МПа"},
            "Таблица 2": {"method": "group_by_column", "group_column": "Рабочее давление среды, МПа"},
            "Таблица Приложения 1": {"method": "group_by_column", "group_column": "Тип"}
        }
    },
    "ГОСТ Р 59023.1-2020": {
        "tables": {
            "№ 1": {"method": "split_by_rows"},
            "№ 2": {"method": "split_by_rows"},
            "№ 3": {"method": "split_by_rows"}
        }
    }
}
def create_meta_info(document_name, section, table_number, table_title, extra_info=""):
    """Build the standard metadata prefix line for a table chunk.

    Returns a Russian-language header line (terminated by a newline) naming
    the document, section, table number and title; *extra_info*, when
    non-empty, is appended as one more comma-separated field.
    """
    line = (
        f'Документ "{document_name}", Раздел: {section}, '
        f'Номер таблицы: {table_number}, Название таблицы: {table_title}'
    )
    if extra_info:
        line = f'{line}, {extra_info}'
    return line + '\n'
def create_chunk_text(meta_info, headers, rows, add_row_numbers=False):
    """Render a table chunk: meta line + header line, then one line per row.

    Each row line joins "header: value" pairs with " | "; missing keys
    render as empty strings. With *add_row_numbers*, rows are prefixed
    "Строка <n>: " (1-based).
    """
    lines = [meta_info + "Заголовки: " + " | ".join(headers)]
    for row_no, row in enumerate(rows, start=1):
        body = " | ".join(f"{h}: {row.get(h, '')}" for h in headers)
        lines.append(f"Строка {row_no}: {body}" if add_row_numbers else body)
    return "\n".join(lines)
def group_by_column_method(table_data, document_name, group_column):
    """Emit one Document per distinct value of *group_column*.

    Rows sharing the same value in *group_column* are rendered together
    (with row numbers) into a single chunk; rows missing the column fall
    into the "UNKNOWN" bucket. Returns the list of Documents.
    """
    headers = table_data.get("headers", [])
    rows = table_data.get("data", [])
    section = table_data.get("section", "")
    table_number = table_data.get("table_number", "")
    table_title = table_data.get("table_title", "")

    buckets = defaultdict(list)
    for row in rows:
        buckets[row.get(group_column, "UNKNOWN")].append(row)

    documents = []
    for group_value, bucket in buckets.items():
        meta_info = create_meta_info(
            document_name, section, table_number, table_title,
            f'Группа по "{group_column}": {group_value}')
        chunk_text = create_chunk_text(meta_info, headers, bucket, add_row_numbers=True)
        documents.append(Document(
            text=chunk_text,
            metadata={
                "type": "table",
                "table_number": table_number,
                "table_title": table_title,
                "document_id": document_name,
                "section": section,
                "section_id": section,  # mirrors section, as in the other methods
                "group_column": group_column,
                "group_value": group_value,
                "total_rows": len(bucket),
                "processing_method": "group_by_column"
            }
        ))
        log_message(f"Created grouped chunk for {group_column}={group_value}, rows: {len(bucket)}, length: {len(chunk_text)}")
    return documents
def split_by_rows_method(table_data, document_name):
    """Emit one Document per table row, each tagged with its 1-based row number."""
    headers = table_data.get("headers", [])
    rows = table_data.get("data", [])
    section = table_data.get("section", "")
    table_number = table_data.get("table_number", "")
    table_title = table_data.get("table_title", "")

    documents = []
    for row_no, row in enumerate(rows, start=1):
        meta_info = create_meta_info(document_name, section, table_number, table_title, f'Строка: {row_no}')
        chunk_text = create_chunk_text(meta_info, headers, [row])
        documents.append(Document(
            text=chunk_text,
            metadata={
                "type": "table",
                "table_number": table_number,
                "table_title": table_title,
                "document_id": document_name,
                "section": section,
                "section_id": section,  # mirrors section, as in the other methods
                "row_number": row_no,
                "total_rows": len(rows),
                "processing_method": "split_by_rows"
            }
        ))
    log_message(f"Split table {table_number} into {len(rows)} row chunks")
    return documents
def group_entire_table_method(table_data, document_name):
    """Render the whole table (all rows, no row numbers) as a single Document."""
    headers = table_data.get("headers", [])
    rows = table_data.get("data", [])
    section = table_data.get("section", "")
    table_number = table_data.get("table_number", "")
    table_title = table_data.get("table_title", "")

    meta_info = create_meta_info(document_name, section, table_number, table_title)
    chunk_text = create_chunk_text(meta_info, headers, rows)
    document = Document(
        text=chunk_text,
        metadata={
            "type": "table",
            "table_number": table_number,
            "table_title": table_title,
            "document_id": document_name,
            "section": section,
            "section_id": section,  # mirrors section, as in the other methods
            "total_rows": len(rows),
            "processing_method": "group_entire_table"
        }
    )
    log_message(f"Grouped entire table {table_number}, rows: {len(rows)}, length: {len(chunk_text)}")
    return [document]
def should_use_custom_processing(document_id, table_number):
    """Look up whether a (document, table) pair has a custom chunking config.

    Document ids are matched by prefix against CUSTOM_TABLE_CONFIGS keys.
    Returns (True, matched_prefix, method_config) on a hit — an exact
    table-number entry takes precedence over a "*" wildcard — otherwise
    (False, None, None).
    """
    for doc_prefix, config in CUSTOM_TABLE_CONFIGS.items():
        if not document_id.startswith(doc_prefix):
            continue
        tables = config.get("tables", {})
        if table_number not in tables and "*" not in tables:
            continue
        return True, doc_prefix, tables.get(table_number, tables.get("*"))
    return False, None, None
def process_table_with_custom_method(table_data, document_name, method_config):
    """Dispatch *table_data* to the handler named by method_config["method"].

    Returns the handler's Document list, or None for an unknown method so
    the caller can fall back to default processing.
    """
    method = method_config.get("method")
    if method == "group_by_column":
        return group_by_column_method(table_data, document_name, method_config.get("group_column"))
    if method == "split_by_rows":
        return split_by_rows_method(table_data, document_name)
    if method == "group_entire_table":
        return group_entire_table_method(table_data, document_name)
    log_message(f"Unknown custom method: {method}, falling back to default processing")
    return None
def table_to_document(table_data, document_id=None):
    """Convert one table dict into a list of Documents.

    Tables matched by CUSTOM_TABLE_CONFIGS are delegated to their custom
    processor (falling back to the default path if it yields nothing).
    The default path renders the table row-by-row into a single Document.
    Non-dict input yields an empty list.

    NOTE(review): non-dict rows in 'data' are skipped in the rendered text
    but still advance the row counter and are included in total_rows —
    preserved from the original behavior.
    """
    if not isinstance(table_data, dict):
        return []

    doc_id = document_id or table_data.get('document_id', table_data.get('document', 'Неизвестно'))
    table_num = table_data.get('table_number', 'Неизвестно')

    # Custom processing takes precedence when configured for this table.
    use_custom, doc_pattern, method_config = should_use_custom_processing(doc_id, table_num)
    if use_custom:
        log_message(f"Using custom processing for table {table_num} in document {doc_id}")
        custom_docs = process_table_with_custom_method(table_data, doc_id, method_config)
        if custom_docs:
            return custom_docs

    # Default processing for tables not in the custom config.
    table_title = table_data.get('table_title', 'Неизвестно')
    section = table_data.get('section', 'Неизвестно')
    header_content = f"Таблица: {table_num}\nНазвание: {table_title}\nДокумент: {doc_id}\nРаздел: {section}\n"

    rows = table_data.get('data')
    if isinstance(rows, list):
        parts = [header_content, "\nДанные таблицы:\n"]
        for position, row in enumerate(rows, start=1):
            if isinstance(row, dict):
                cells = " | ".join(f"{k}: {v}" for k, v in row.items())
                parts.append(f"Строка {position}: {cells}\n")
        return [Document(
            text="".join(parts),
            metadata={
                "type": "table",
                "table_number": table_num,
                "table_title": table_title,
                "document_id": doc_id,
                "section": section,
                "section_id": section,
                "total_rows": len(rows),
                "processing_method": "default"
            }
        )]

    # No usable 'data' list: emit just the header block.
    return [Document(
        text=header_content,
        metadata={
            "type": "table",
            "table_number": table_num,
            "table_title": table_title,
            "document_id": doc_id,
            "section": section,
            "section_id": section,
            "processing_method": "default"
        }
    )]
def load_table_data(repo_id, hf_token, table_data_dir):
    """Download every table JSON under *table_data_dir* in a HF dataset repo
    and convert each table into Document chunks via table_to_document().

    Args:
        repo_id: Hugging Face dataset repository id.
        hf_token: access token passed to both Hub API calls.
        table_data_dir: path prefix inside the repo that holds the table JSONs.

    Returns:
        A flat list of Documents; an empty list if listing the repo fails.
        Errors in individual files are logged and the file is skipped.
    """
    log_message("Начинаю загрузку табличных данных")
    table_files = []
    try:
        files = list_repo_files(repo_id=repo_id, repo_type="dataset", token=hf_token)
        for file in files:
            # Keep only .json files inside the table-data directory prefix.
            if file.startswith(table_data_dir) and file.endswith('.json'):
                table_files.append(file)
        log_message(f"Найдено {len(table_files)} JSON файлов с таблицами")
        table_documents = []
        for file_path in table_files:
            try:
                log_message(f"Обрабатываю файл: {file_path}")
                local_path = hf_hub_download(
                    repo_id=repo_id,
                    filename=file_path,
                    local_dir='',  # NOTE(review): empty local_dir is unusual — confirm the intended download location
                    repo_type="dataset",
                    token=hf_token
                )
                with open(local_path, 'r', encoding='utf-8') as f:
                    table_data = json.load(f)
                if isinstance(table_data, dict):
                    document_id = table_data.get('document', 'unknown')
                    if 'sheets' in table_data:
                        # Multi-sheet file: each sheet is an independent table.
                        for sheet in table_data['sheets']:
                            sheet['document'] = document_id
                            # Pre-check only affects logging; table_to_document()
                            # re-checks and applies the custom method itself.
                            table_num = sheet.get('table_number', 'Неизвестно')
                            use_custom, _, _ = should_use_custom_processing(document_id, table_num)
                            if use_custom:
                                log_message(f"Skipping default processing for custom table {table_num} in {document_id}")
                            docs_list = table_to_document(sheet, document_id)
                            table_documents.extend(docs_list)
                    else:
                        # Single-table file.
                        table_num = table_data.get('table_number', 'Неизвестно')
                        use_custom, _, _ = should_use_custom_processing(document_id, table_num)
                        if use_custom:
                            log_message(f"Skipping default processing for custom table {table_num} in {document_id}")
                        docs_list = table_to_document(table_data, document_id)
                        table_documents.extend(docs_list)
                elif isinstance(table_data, list):
                    # Top-level list: a sequence of independent table dicts.
                    for table_json in table_data:
                        document_id = table_json.get('document', 'unknown')
                        table_num = table_json.get('table_number', 'Неизвестно')
                        use_custom, _, _ = should_use_custom_processing(document_id, table_num)
                        if use_custom:
                            log_message(f"Skipping default processing for custom table {table_num} in {document_id}")
                        docs_list = table_to_document(table_json)
                        table_documents.extend(docs_list)
            except Exception as e:
                # Best-effort: a broken or undownloadable file must not abort the whole load.
                log_message(f"Ошибка обработки файла {file_path}: {str(e)}")
                continue
        log_message(f"Создано {len(table_documents)} документов из таблиц")
        return table_documents
    except Exception as e:
        log_message(f"Ошибка загрузки табличных данных: {str(e)}")
        return []