# RAG_AIEXP_01 / table_prep.py
# Commit 6370d73 — "added sheet_name" (author: MrSimple07)
import re

from llama_index.core import Document
from llama_index.core.text_splitter import SentenceSplitter

from config import CHUNK_SIZE, CHUNK_OVERLAP
from my_logging import log_message
def normalize_table_number(table_num, section):
    """Normalize a raw table number to the canonical '№<n>' form.

    Strips common 'Таблица' / '№' prefixes and, for tables located in an
    appendix ('приложение' in the section label), appends the section so
    appendix tables sharing a number stay distinguishable at retrieval time.

    Args:
        table_num: raw table number (any type; falsy / 'Неизвестно' allowed).
        section: section label the table belongs to (any type; may be None).

    Returns:
        'Неизвестно' when the number is missing, '№<n> (<section>)' for
        appendix tables, '№<n>' otherwise.
    """
    if not table_num or table_num == 'Неизвестно':
        return 'Неизвестно'
    # Clean up common prefixes
    tn = str(table_num).replace('Таблица', '').replace('№', '').strip()
    # Case-insensitive appendix check; the original tested 'Приложение' and
    # a lowered 'приложение' separately — lowering once covers both.
    if section and 'приложение' in str(section).lower():
        return f"№{tn} ({section})"
    return f"№{tn}"
def create_table_content(table_data):
    """Build the plain-text rendering of one table for semantic search.

    Layout: a searchable header (document / section / table number / title /
    optional sheet), an optional 'Колонки:' column line, then one line per
    data row with empty / whitespace-only / 'nan' cells dropped.

    Args:
        table_data: dict for one extracted table; recognized keys include
            'document_id'/'document'/'Обозначение документа',
            'table_number', 'table_title', 'section'/'Раздел документа',
            'sheet_name', 'headers' and 'data' (rows as dicts or lists).

    Returns:
        (content, normalized_table_number) tuple.
    """
    doc_id = (
        table_data.get('document_id') or
        table_data.get('document') or
        table_data.get('Обозначение документа') or
        'Неизвестно'
    )
    table_num = table_data.get('table_number', 'Неизвестно')
    table_title = table_data.get('table_title', 'Неизвестно')
    section = (
        table_data.get('section') or
        table_data.get('Раздел документа') or
        'Неизвестно'
    )
    sheet_name = table_data.get('sheet_name', '')

    # Enhanced table number with appendix context so queries like
    # "таблица 1 приложения 3" can match.
    normalized_num = normalize_table_number(table_num, section)
    if 'Приложени' in str(section):
        appendix_match = re.search(r'Приложени[ея]\s*(\d+)', str(section))
        if appendix_match:
            normalized_num = f"{normalized_num} Приложения {appendix_match.group(1)}"

    # Assemble via list + join instead of repeated string += (quadratic).
    parts = [
        f"Документ: {doc_id}\n",
        f"Раздел: {section}\n",
        f"Таблица: {normalized_num}\n",
        f"Название: {table_title}\n",
    ]
    if sheet_name:
        parts.append(f"Лист: {sheet_name}\n")
    parts.append("\n")

    headers = table_data.get('headers', [])
    if headers:
        header_str = ' | '.join(str(h) for h in headers)
        parts.append(f"Колонки: {header_str}\n\n")

    # CRITICAL: preserve searchable row identifiers — one
    # "key: value | ..." line per row, skipping empty/'nan' cells.
    if isinstance(table_data.get('data'), list):
        for row in table_data['data']:
            if isinstance(row, dict):
                cells = [
                    f"{k}: {v}"
                    for k, v in row.items()
                    if v and str(v).strip() and str(v) != 'nan'
                ]
                if cells:
                    parts.append(' | '.join(cells) + "\n")
            elif isinstance(row, list):
                row_str = ' | '.join(
                    str(v) for v in row
                    if v and str(v).strip() and str(v) != 'nan'
                )
                if row_str:
                    parts.append(row_str + "\n")
    return ''.join(parts), normalized_num
def chunk_table_document(doc, chunk_size=None, chunk_overlap=None):
    """Split one formatted table Document into header-carrying chunks.

    The text produced by create_table_content() starts with a header block
    ending at the 'Колонки:' line, followed by one line per data row.  Each
    emitted chunk repeats that header so every chunk stays independently
    searchable, and consecutive chunks share the last 2 data rows as overlap.

    Args:
        doc: Document whose .text is a formatted table and whose .metadata
            carries table_number / document_id / section.
        chunk_size: target chunk size in characters; defaults to CHUNK_SIZE.
        chunk_overlap: defaults to CHUNK_OVERLAP.  NOTE(review): currently
            unused — the row overlap is hard-coded to 2 rows per boundary.

    Returns:
        [doc] unchanged for small tables, otherwise a list of chunk
        Documents with chunk bookkeeping merged into their metadata.
    """
    if chunk_size is None:
        chunk_size = CHUNK_SIZE
    if chunk_overlap is None:
        chunk_overlap = CHUNK_OVERLAP
    table_num = doc.metadata.get('table_number', 'unknown')
    doc_id = doc.metadata.get('document_id', 'unknown')
    section = doc.metadata.get('section', 'Неизвестно')
    full_table_id = f"{doc_id} | {section} | {table_num}"
    lines = doc.text.strip().split('\n')
    # Data rows start two lines after 'Колонки:' (the column line itself plus
    # the blank separator).  If no column line exists, everything is treated
    # as data and chunks carry no repeated header.
    data_start_idx = 0
    for i, line in enumerate(lines):
        if line.startswith('Колонки:'):
            data_start_idx = i + 2  # Skip header and blank line
            break
    table_header = '\n'.join(lines[:data_start_idx])
    data_rows = lines[data_start_idx:]
    # Small tables are indexed whole; chunking only pays off once the text is
    # comfortably larger than one chunk.
    if not data_rows or len(doc.text) < chunk_size * 1.5:
        log_message(f" 📊 {full_table_id}: малая таблица, без разбиения")
        return [doc]
    log_message(f" 📋 {full_table_id}: {len(data_rows)} строк → chunking")
    header_size = len(table_header)
    # Reserve room for the repeated header plus a small safety margin.
    available_size = chunk_size - header_size - 100
    text_chunks = []
    current_chunk_rows = []
    current_size = 0
    for row in data_rows:
        row_size = len(row) + 1  # +1 for the joining newline
        if current_size + row_size > available_size and current_chunk_rows:
            text_chunks.append(table_header + '\n' + '\n'.join(current_chunk_rows))
            # Keep last 2 rows for overlap with the next chunk
            overlap_count = min(2, len(current_chunk_rows))
            current_chunk_rows = current_chunk_rows[-overlap_count:]
            current_size = sum(len(r) + 1 for r in current_chunk_rows)
        current_chunk_rows.append(row)
        current_size += row_size
    if current_chunk_rows:
        text_chunks.append(table_header + '\n' + '\n'.join(current_chunk_rows))
    # FIX: the original log line concatenated the table id and chunk count
    # with no separator ("...№3" + "4 чанков"); add the arrow used elsewhere.
    log_message(f" ✂️ {full_table_id} → {len(text_chunks)} чанков")
    chunked_docs = []
    for i, chunk_text in enumerate(text_chunks):
        chunk_metadata = doc.metadata.copy()
        chunk_metadata.update({
            "chunk_id": i,
            "total_chunks": len(text_chunks),
            "chunk_size": len(chunk_text),
            "is_chunked": True,
            "full_table_id": full_table_id,
            "table_number_normalized": doc.metadata.get('table_number_normalized')
        })
        chunked_docs.append(Document(text=chunk_text, metadata=chunk_metadata))
    return chunked_docs
def table_to_document(table_data, document_id=None):
    """Convert table data to Document with complete metadata.

    NOTE(review): this function is redefined later in this module; Python
    keeps the later definition, so this copy — the one that adds the
    'sheet_name' metadata field — is currently shadowed dead code.  The
    duplicate should be reconciled and removed.

    Args:
        table_data: dict for one extracted table; expected keys include
            'table_number', 'table_title', 'data' (list of rows) and an
            optional document id under 'document_id' / 'document' /
            'Обозначение документа' — TODO confirm full schema with producer.
        document_id: fallback identifier used when the table carries none.

    Returns:
        List of Document objects (chunked when the formatted content exceeds
        CHUNK_SIZE), or [] for non-dict input / empty tables.
    """
    if not isinstance(table_data, dict):
        return []
    # FIXED: Extract sheet-level document_id first
    sheet_doc_id = (
        table_data.get('document_id') or
        table_data.get('document') or
        table_data.get('Обозначение документа')
    )
    # Use sheet doc_id if available, otherwise use passed document_id
    doc_id = sheet_doc_id or document_id or 'Неизвестно'
    table_num = table_data.get('table_number', 'Неизвестно')
    table_title = table_data.get('table_title', 'Неизвестно')
    section = table_data.get('section', table_data.get('Раздел документа', 'Неизвестно'))
    sheet_name = table_data.get('sheet_name', '')
    table_rows = table_data.get('data', [])
    if not table_rows:
        log_message(f"⚠️ Таблица {table_num} ({doc_id}) пропущена: нет данных")
        return []
    content, normalized_num = create_table_content(table_data)
    content_size = len(content)
    base_doc = Document(
        text=content,
        metadata={
            "type": "table",
            "table_number": table_num,
            "table_number_normalized": normalized_num,
            "table_title": table_title,
            "document_id": doc_id,
            "section": section,
            # section stored under a second key — presumably for retriever
            # filters expecting 'section_id'; verify against index consumers.
            "section_id": section,
            "sheet_name": sheet_name,
            "total_rows": len(table_rows),
            "content_size": content_size,
            "full_table_id": f"{doc_id} | {section} | {normalized_num}"
        }
    )
    if content_size > CHUNK_SIZE:
        log_message(f"📊 CHUNKING: {doc_id} | {normalized_num} | {content_size} > {CHUNK_SIZE}")
        return chunk_table_document(base_doc)
    else:
        log_message(f"✓ {doc_id} | {normalized_num} ({content_size} символов)")
        return [base_doc]
def table_to_document(table_data, document_id=None):
    """Convert table data to Document with proper metadata.

    NOTE(review): the module defines table_to_document twice; at import time
    this later definition wins.  The earlier (shadowed) copy had been updated
    to carry 'sheet_name' metadata while this one was not — that field is
    restored here so the effective definition matches, and the duplicate
    should eventually be deleted.

    Args:
        table_data: dict for one extracted table; expected keys include
            'table_number', 'table_title', 'data' (list of rows) and an
            optional document id under 'document_id' / 'document' /
            'Обозначение документа' — TODO confirm full schema with producer.
        document_id: fallback identifier used when the table carries none.

    Returns:
        List of Document objects (chunked when the formatted content exceeds
        CHUNK_SIZE), or [] for non-dict input / empty tables.
    """
    if not isinstance(table_data, dict):
        return []
    # Prefer the document id carried on the sheet itself, then the
    # caller-supplied id, then the 'unknown' placeholder.
    sheet_doc_id = (
        table_data.get('document_id') or
        table_data.get('document') or
        table_data.get('Обозначение документа')
    )
    doc_id = sheet_doc_id or document_id or 'Неизвестно'
    table_num = table_data.get('table_number', 'Неизвестно')
    table_title = table_data.get('table_title', 'Неизвестно')
    section = table_data.get('section', table_data.get('Раздел документа', 'Неизвестно'))
    # FIX: restored from the shadowed first definition ("added sheet_name").
    sheet_name = table_data.get('sheet_name', '')
    table_rows = table_data.get('data', [])
    if not table_rows:
        log_message(f"⚠️ Таблица {table_num} ({doc_id}) пропущена: нет данных")
        return []
    content, normalized_num = create_table_content(table_data)
    content_size = len(content)
    base_doc = Document(
        text=content,
        metadata={
            "type": "table",
            "table_number": table_num,
            "table_number_normalized": normalized_num,
            "table_title": table_title,
            "document_id": doc_id,
            "section": section,
            "section_id": section,
            "sheet_name": sheet_name,
            "total_rows": len(table_rows),
            "content_size": content_size,
            "full_table_id": f"{doc_id} | {section} | {normalized_num}"
        }
    )
    if content_size > CHUNK_SIZE:
        log_message(f"📊 CHUNKING: {doc_id} | {normalized_num} | {content_size} > {CHUNK_SIZE}")
        return chunk_table_document(base_doc)
    log_message(f"✓ {doc_id} | {normalized_num} ({content_size} символов)")
    return [base_doc]