# RAG_AIEXP_01 / table_chunks.py
# MrSimple07's picture
# added the sheet name + - table number handling + 4500, 20
# 07e9959
import json
import os
from pathlib import Path
from config import TABLE_DATA_DIR, MAX_CHARS_TABLE, MAX_ROWS_TABLE
def normalize_text(text):
    """Apply the module's two fixed character substitutions to *text*.

    Falsy input (``None`` or an empty string) is returned unchanged.
    Otherwise a literal ``str.replace`` runs first and a regex substitution
    on a word-initial letter followed by a digit runs second.
    """
    import re

    if text:
        replaced = text.replace('С-', 'C')
        text = re.sub(r'\bС(\d)', r'С\1', replaced)
    return text
def format_table_header(doc_id, table_identifier, table_num, table_title, section, headers):
    """Build the text preamble that opens every table chunk.

    Args:
        doc_id: Document identifier shown on the first line.
        table_identifier: Table label; passed through ``normalize_text``.
        table_num: Accepted for interface compatibility; not used here.
        table_title: Optional title; the title line is skipped when falsy.
        section: Optional section name; the section line is skipped when falsy.
        headers: Optional iterable of column headers, joined with ``' | '``.

    Returns:
        The header text, ending with the data-section marker line.
    """
    rule = '=' * 70
    pieces = [f"ТАБЛИЦА {normalize_text(table_identifier)} из документа {doc_id}\n"]

    if table_title:
        pieces.append(f"НАЗВАНИЕ: {normalize_text(table_title)}\n")
    if section:
        pieces.append(f"РАЗДЕЛ: {section}\n")

    pieces.append(f"{rule}\n")

    if headers:
        joined_headers = ' | '.join(map(str, headers))
        pieces.append(f"ЗАГОЛОВКИ: {joined_headers}\n\n")

    pieces.append("ДАННЫЕ:\n")
    return ''.join(pieces)
def _is_blank_cell(value):
    """Return True when *value* should be treated as an empty table cell."""
    if isinstance(value, (int, float)) and not isinstance(value, bool):
        # 0 and 0.0 are legitimate table values; only NaN counts as blank.
        return str(value).lower() == 'nan'
    if not value:
        return True
    # Compare the stripped text so padded placeholders like ' NaN ' are caught.
    return str(value).strip().lower() in ('', 'nan', 'none')


def format_single_row(row, idx):
    """Render one table row as a numbered, pipe-separated line.

    Args:
        row: A dict (rendered as ``key: value`` pairs) or a list/tuple
            (rendered as bare values). Any other type yields ``""``.
        idx: Row number printed in front of the row.

    Returns:
        ``"<idx>. <cell> | <cell> ...\\n"``, or ``""`` when the row has no
        non-blank cells. Blank cells are ``None``, empty/whitespace strings,
        other falsy non-numeric values, and the placeholders ``nan``/``none``
        (case-insensitive). Numeric zero is kept.
    """
    if isinstance(row, dict):
        # '_idx' is the bookkeeping key that carries the row number between
        # helpers in this module; it is not a table column.
        parts = [
            f"{key}: {value}"
            for key, value in row.items()
            if key != '_idx' and not _is_blank_cell(value)
        ]
    elif isinstance(row, (list, tuple)):
        parts = [str(value) for value in row if not _is_blank_cell(value)]
    else:
        return ""

    if not parts:
        return ""
    return f"{idx}. {' | '.join(parts)}\n"
def format_table_rows(rows):
    """Render a sequence of rows into one text block.

    Args:
        rows: Iterable of rows. A dict row may carry its row number under
            the ``'_idx'`` key (default ``0``). Non-dict rows, such as
            lists, are numbered ``0`` because they have nowhere to carry
            the key.

    Returns:
        The concatenation of ``format_single_row`` for every row.
    """
    rendered = []
    for row in rows:
        if isinstance(row, dict):
            idx = row.get('_idx', 0)
            # '_idx' is bookkeeping, not a table column: keep it out of the
            # rendered text.
            cells = {key: value for key, value in row.items() if key != '_idx'}
        else:
            # Lists have no .get(); pass them through instead of crashing.
            idx = 0
            cells = row
        rendered.append(format_single_row(cells, idx))
    return ''.join(rendered)
def format_table_footer(table_identifier, doc_id):
    """Return the closing marker placed after a chunk's rows.

    The marker is a blank line, a 70-character ``=`` rule, and a line that
    names the table and the document.
    """
    separator = '=' * 70
    closing_line = f"КОНЕЦ ТАБЛИЦЫ {table_identifier} ИЗ {doc_id}"
    return "\n" + separator + "\n" + closing_line + "\n"
def _resolve_table_identifier(table_num_clean, section):
    """Return the table label, extended with the appendix designator found in *section*."""
    import re

    section_lower = section.lower()
    if 'приложени' not in section_lower:
        return table_num_clean
    appendix_match = re.search(r'приложени[еия]\s*(\d+|[а-яА-Я])', section_lower)
    if appendix_match is None:
        return table_num_clean
    return f"{table_num_clean} Приложение {appendix_match.group(1).upper()}"


def _build_partial_chunk(base_content, numbered_rows, total_rows, table_identifier, doc_id):
    """Assemble one chunk of a split table from ``(row_number, row_text)`` pairs."""
    first_idx = numbered_rows[0][0]
    last_idx = numbered_rows[-1][0]
    body = ''.join(text for _, text in numbered_rows)
    return (
        base_content
        + body
        + f"\n\nСтроки {first_idx}-{last_idx} из {total_rows}\n"
        + format_table_footer(table_identifier, doc_id)
    )


def chunk_table_by_content(table_data, doc_id, max_chars=MAX_CHARS_TABLE, max_rows=MAX_ROWS_TABLE):
    """Split one table into text chunks that respect size and row limits.

    Args:
        table_data: Dict describing a table. Keys read: ``'headers'``,
            ``'data'`` (the rows), ``'table_number'``, ``'table_title'``
            and ``'section'``. Missing or ``None`` values fall back to
            empty defaults (``'unknown'`` for the table number).
        doc_id: Document identifier written into each header and footer.
        max_chars: Character limit checked when deciding whether the whole
            table fits in a single chunk, and used to derive the per-chunk
            row budget.
        max_rows: Maximum number of rows per chunk.

    Returns:
        A list of chunk strings; empty when the table has no rows. A table
        that fits entirely is returned as one chunk without a row-range
        line. Otherwise every chunk repeats the header and states which
        rows it covers.
    """
    rows = table_data.get('data') or []
    if not rows:
        return []

    # ``or`` also covers keys that are present but null in the JSON, which
    # would otherwise crash on .lower() or render as the literal 'None'.
    headers = table_data.get('headers') or []
    table_title = table_data.get('table_title') or ''
    section = str(table_data.get('section') or '')
    table_num = table_data.get('table_number')
    if table_num is None:
        table_num = 'unknown'

    table_num_clean = str(table_num).strip()
    table_identifier = _resolve_table_identifier(table_num_clean, section)

    base_content = format_table_header(
        doc_id, table_identifier, table_num, normalize_text(str(table_title)), section, headers
    )
    base_size = len(base_content)
    # 200 characters are held back from the row budget; presumably headroom
    # for the row-range line and footer appended to each chunk.
    available_space = max_chars - base_size - 200

    # Render each row exactly once from the original row object. This keeps
    # list rows working, keeps bookkeeping keys out of the text, and makes
    # the measured size equal to the emitted size.
    numbered_rows = [
        (number, format_single_row(row, number))
        for number, row in enumerate(rows, start=1)
    ]

    # If the entire table fits, return it as one chunk.
    full_rows_content = ''.join(text for _, text in numbered_rows)
    if base_size + len(full_rows_content) <= max_chars and len(rows) <= max_rows:
        return [base_content + full_rows_content + format_table_footer(table_identifier, doc_id)]

    chunks = []
    pending = []
    pending_size = 0
    for number, text in numbered_rows:
        over_budget = pending_size + len(text) > available_space
        # Never flush an empty buffer: an oversized row still gets its own chunk.
        if pending and (over_budget or len(pending) >= max_rows):
            chunks.append(
                _build_partial_chunk(base_content, pending, len(rows), table_identifier, doc_id)
            )
            pending = []
            pending_size = 0
        pending.append((number, text))
        pending_size += len(text)

    if pending:
        chunks.append(
            _build_partial_chunk(base_content, pending, len(rows), table_identifier, doc_id)
        )
    return chunks
def export_table_chunks():
    """Export the chunks of every table JSON file to a text file.

    Creates ``table_chunks_export`` in the working directory, reads each
    ``*.json`` file in ``TABLE_DATA_DIR``, chunks every entry of its
    ``'sheets'`` list, and writes all chunks of one JSON file into one
    ``<stem>.txt`` file under numbered banners. Errors in a single file are
    printed and do not stop the run. A summary is printed at the end.
    """
    export_root = Path("table_chunks_export")
    export_root.mkdir(exist_ok=True)

    source_root = Path(TABLE_DATA_DIR)
    if not source_root.exists():
        print(f"❌ Directory not found: {TABLE_DATA_DIR}")
        return

    json_paths = list(source_root.glob("*.json"))
    print(f"📁 Found {len(json_paths)} JSON files in {TABLE_DATA_DIR}")

    chunk_total = 0
    file_total = 0
    banner = '#' * 70

    for json_path in json_paths:
        try:
            print(f"\n📄 Processing: {json_path.name}")
            with open(json_path, 'r', encoding='utf-8') as src:
                payload = json.load(src)

            # File-level document id; each sheet may override it below.
            fallback_doc_id = payload.get('document_id', payload.get('document', 'unknown'))

            file_chunks = []
            for sheet in payload.get('sheets', []):
                sheet_doc_id = sheet.get('document_id', sheet.get('document', fallback_doc_id))
                file_chunks += chunk_table_by_content(sheet, sheet_doc_id)

            if not file_chunks:
                print(f" ⚠️ No chunks found in {json_path.name}")
                continue

            # The text file reuses the JSON file's name without the extension.
            txt_name = json_path.stem + ".txt"
            chunk_count = len(file_chunks)
            with open(export_root / txt_name, 'w', encoding='utf-8') as dst:
                for number, chunk in enumerate(file_chunks, start=1):
                    dst.write(
                        f"\n{banner}\nCHUNK {number} of {chunk_count}\n{banner}\n\n{chunk}\n\n"
                    )

            print(f" ✓ Saved: {txt_name} with {chunk_count} chunks")
            chunk_total += chunk_count
            file_total += 1
        except Exception as e:
            print(f" ❌ Error processing {json_path.name}: {e}")

    summary_rule = '=' * 60
    print(f"\n{summary_rule}")
    print("✅ Export complete!")
    print(f" Total files created: {file_total}")
    print(f" Total chunks exported: {chunk_total}")
    print(f" Output directory: {export_root.absolute()}")
    print(f"{summary_rule}")
# Run the export only when this file is executed as a script, not on import.
if __name__ == "__main__":
    export_table_chunks()