RAG_AIEXP_01 / table_prep.py
MrSimple07's picture
table prep changed
aa38fcf
raw
history blame
13.3 kB
from collections import defaultdict
import json
from huggingface_hub import hf_hub_download, list_repo_files
from llama_index.core import Document
from my_logging import log_message
def create_table_content(table_data):
"""Create formatted content from table data"""
doc_id = table_data.get('document_id', table_data.get('document', 'Неизвестно'))
table_num = table_data.get('table_number', 'Неизвестно')
table_title = table_data.get('table_title', 'Неизвестно')
section = table_data.get('section', 'Неизвестно')
content = f"Таблица: {table_num}\n"
content += f"Название: {table_title}\n"
content += f"Документ: {doc_id}\n"
content += f"Раздел: {section}\n"
headers = table_data.get('headers', [])
if headers:
content += f"\nЗаголовки: {' | '.join(headers)}\n"
if 'data' in table_data and isinstance(table_data['data'], list):
content += "\nДанные таблицы:\n"
for row_idx, row in enumerate(table_data['data'], start=1):
if isinstance(row, dict):
row_text = " | ".join([f"{k}: {v}" for k, v in row.items() if v])
content += f"Строка {row_idx}: {row_text}\n"
return content
from llama_index.core.text_splitter import SentenceSplitter
from config import CHUNK_SIZE, CHUNK_OVERLAP
def extract_table_metadata(table_text: str) -> dict:
words = table_text.split()
unique_words = set(words)
from collections import Counter
stopwords = {"и", "в", "на", "по", "с", "для", "из", "при", "а", "как", "или", "но", "к", "от"}
filtered = [w for w in words if len(w) > 3 and w.lower() not in stopwords]
common = Counter(filtered).most_common(15)
key_terms = [w for w, _ in common]
return {
"summary": f"Таблица содержит около {len(words)} слов и {len(unique_words)} уникальных терминов.",
"materials": [], # if you want to extract material names, hook in regex or LLM here
"key_terms": key_terms
}
def chunk_table_document(doc, chunk_size=None, chunk_overlap=None):
if chunk_size is None:
chunk_size = CHUNK_SIZE
if chunk_overlap is None:
chunk_overlap = CHUNK_OVERLAP
# Extract critical metadata from table before chunking
table_metadata = extract_table_metadata(doc.text)
table_num = doc.metadata.get('table_number', 'unknown')
table_title = doc.metadata.get('table_title', 'unknown')
doc_id = doc.metadata.get('document_id', 'unknown')
section = doc.metadata.get('section', 'unknown')
# Parse table structure from your create_table_content format
lines = doc.text.strip().split('\n')
# Find where data rows start
table_header_lines = []
data_rows = []
in_data = False
for line in lines:
if line.startswith('Данные таблицы:'):
in_data = True
table_header_lines.append(line)
elif in_data and line.startswith('Строка'):
data_rows.append(line)
elif not in_data:
table_header_lines.append(line)
table_header = '\n'.join(table_header_lines) + '\n'
if not data_rows:
log_message(f" ⚠️ Таблица {table_num}: нет строк данных, использую стандартное разбиение")
text_splitter = SentenceSplitter(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
separator="\n"
)
text_chunks = text_splitter.split_text(doc.text)
log_message(f" 📊 Стандартное разбиение: {len(text_chunks)} чанков")
else:
# Row-based chunking
log_message(f" 📋 Таблица {table_num}: найдено {len(data_rows)} строк данных")
header_size = len(table_header)
# Reserve space for enrichment prefix
available_size = chunk_size - header_size - 300
text_chunks = []
current_chunk_rows = []
current_size = 0
for row in data_rows:
row_size = len(row) + 1
# Check if adding this row exceeds limit
if current_size + row_size > available_size and current_chunk_rows:
# Create chunk
chunk_text = table_header + '\n'.join(current_chunk_rows)
text_chunks.append(chunk_text)
log_message(f" ✂️ Чанк создан: {len(current_chunk_rows)} строк, {len(chunk_text)} символов")
# Overlap: keep last 2 rows
overlap_count = min(2, len(current_chunk_rows))
current_chunk_rows = current_chunk_rows[-overlap_count:]
current_size = sum(len(r) + 1 for r in current_chunk_rows)
current_chunk_rows.append(row)
current_size += row_size
# Final chunk
if current_chunk_rows:
chunk_text = table_header + '\n'.join(current_chunk_rows)
text_chunks.append(chunk_text)
log_message(f" ✂️ Последний чанк: {len(current_chunk_rows)} строк, {len(chunk_text)} символов")
log_message(f" 📊 Таблица {table_num} разделена на {len(text_chunks)} чанков")
# Create enriched chunks
chunked_docs = []
materials = table_metadata.get("materials", [])
key_terms = table_metadata.get("key_terms", [])
for i, chunk_text in enumerate(text_chunks):
chunk_metadata = doc.metadata.copy()
chunk_metadata.update({
"chunk_id": i,
"total_chunks": len(text_chunks),
"chunk_size": len(chunk_text),
"is_chunked": True,
"materials": materials,
"key_terms": key_terms,
"table_summary": table_metadata.get("summary", "")
})
# Enrichment prefix
materials_str = ', '.join(materials[:10]) if materials else 'нет'
terms_str = ', '.join(key_terms[:10]) if key_terms else 'нет'
enriched_text = f"""[Таблица {table_num}: {table_title}]
[Материалы в таблице: {materials_str}]
[Ключевые термины: {terms_str}]
{chunk_text}"""
log_message(f" ✓ Чанк {i+1}/{len(text_chunks)}: "
f"размер={len(enriched_text)}, "
f"материалов={len(materials)}, "
f"терминов={len(key_terms)}")
chunked_doc = Document(
text=enriched_text,
metadata=chunk_metadata
)
chunked_docs.append(chunked_doc)
return chunked_docs
def table_to_document(table_data, document_id=None):
if not isinstance(table_data, dict):
log_message(f"⚠️ ПРОПУЩЕНА: table_data не является словарем")
return []
doc_id = document_id or table_data.get('document_id') or table_data.get('document', 'Неизвестно')
table_num = table_data.get('table_number', 'Неизвестно')
table_title = table_data.get('table_title', 'Неизвестно')
section = table_data.get('section', 'Неизвестно')
table_rows = table_data.get('data', [])
if not table_rows or len(table_rows) == 0:
log_message(f"⚠️ ПРОПУЩЕНА: Таблица {table_num} из '{doc_id}' - нет данных в 'data'")
return []
content = create_table_content(table_data)
content_size = len(content)
row_count = len(table_rows)
base_doc = Document(
text=content,
metadata={
"type": "table",
"table_number": table_num,
"table_title": table_title,
"document_id": doc_id,
"section": section,
"section_id": section,
"total_rows": row_count,
"content_size": content_size
}
)
if content_size > CHUNK_SIZE:
log_message(f"📊 CHUNKING: Таблица {table_num} из '{doc_id}' | "
f"Размер: {content_size} > {CHUNK_SIZE} | Строк: {row_count}")
chunked_docs = chunk_table_document(base_doc)
log_message(f" ✂️ Разделена на {len(chunked_docs)} чанков")
for i, chunk_doc in enumerate(chunked_docs):
log_message(f" Чанк {i+1}: {chunk_doc.metadata['chunk_size']} символов")
return chunked_docs
else:
log_message(f"✓ ДОБАВЛЕНА: Таблица {table_num} из документа '{doc_id}' | "
f"Размер: {content_size} символов | Строк: {row_count}")
return [base_doc]
def load_table_data(repo_id, hf_token, table_data_dir):
log_message("=" * 60)
log_message("НАЧАЛО ЗАГРУЗКИ ТАБЛИЧНЫХ ДАННЫХ")
log_message("=" * 60)
try:
files = list_repo_files(repo_id=repo_id, repo_type="dataset", token=hf_token)
table_files = [f for f in files if f.startswith(table_data_dir) and f.endswith('.json')]
log_message(f"Найдено {len(table_files)} JSON файлов с таблицами")
table_documents = []
stats = {
'total_tables': 0,
'total_size': 0,
'by_document': defaultdict(lambda: {'count': 0, 'size': 0})
}
for file_path in table_files:
try:
local_path = hf_hub_download(
repo_id=repo_id,
filename=file_path,
local_dir='',
repo_type="dataset",
token=hf_token
)
log_message(f"\nОбработка файла: {file_path}")
with open(local_path, 'r', encoding='utf-8') as f:
table_data = json.load(f)
if isinstance(table_data, dict):
document_id = table_data.get('document', 'unknown')
if 'sheets' in table_data:
sorted_sheets = sorted(
table_data['sheets'],
key=lambda sheet: sheet.get('table_number', '') # or use 'table_number'
)
for sheet in sorted_sheets:
sheet['document'] = document_id
docs_list = table_to_document(sheet, document_id)
table_documents.extend(docs_list)
for doc in docs_list:
stats['total_tables'] += 1
size = doc.metadata.get('content_size', 0)
stats['total_size'] += size
stats['by_document'][document_id]['count'] += 1
stats['by_document'][document_id]['size'] += size
else:
docs_list = table_to_document(table_data, document_id)
table_documents.extend(docs_list)
for doc in docs_list:
stats['total_tables'] += 1
size = doc.metadata.get('content_size', 0)
stats['total_size'] += size
stats['by_document'][document_id]['count'] += 1
stats['by_document'][document_id]['size'] += size
except Exception as e:
log_message(f"❌ ОШИБКА файла {file_path}: {str(e)}")
continue
# Log summary statistics
log_message("\n" + "=" * 60)
log_message("СТАТИСТИКА ПО ТАБЛИЦАМ")
log_message("=" * 60)
log_message(f"Всего таблиц добавлено: {stats['total_tables']}")
log_message(f"Общий размер: {stats['total_size']:,} символов")
log_message(f"Средний размер таблицы: {stats['total_size'] // stats['total_tables'] if stats['total_tables'] > 0 else 0:,} символов")
log_message("\nПо документам:")
for doc_id, doc_stats in sorted(stats['by_document'].items()):
log_message(f" • {doc_id}: {doc_stats['count']} таблиц, "
f"{doc_stats['size']:,} символов")
log_message("=" * 60)
return table_documents
except Exception as e:
log_message(f"❌ КРИТИЧЕСКАЯ ОШИБКА загрузки табличных данных: {str(e)}")
return []