# RAG_AIEXP_01 / table_prep.py
# NOTE(review): the lines below are Hugging Face file-viewer page chrome that
# leaked into the source (author: MrSimple07; commit message: "a new way with
# keywords"; revision 90e6b4c; size 11 kB). Kept as comments so the file parses.
from collections import defaultdict
import json
from huggingface_hub import hf_hub_download, list_repo_files
from llama_index.core import Document
from my_logging import log_message
def create_table_content(table_data):
    """Render one table record as human-readable text.

    Emits a header (table number, title, document, section), then the
    column headers, then each dict row as "key: value" pairs (empty
    values skipped; non-dict rows skipped but still counted).
    """
    fallback = 'Неизвестно'
    parts = [
        f"Таблица: {table_data.get('table_number', fallback)}\n",
        f"Название: {table_data.get('table_title', fallback)}\n",
        f"Документ: {table_data.get('document_id', table_data.get('document', fallback))}\n",
        f"Раздел: {table_data.get('section', fallback)}\n",
    ]

    column_names = table_data.get('headers', [])
    if column_names:
        parts.append(f"\nЗаголовки: {' | '.join(column_names)}\n")

    rows = table_data.get('data')
    if isinstance(rows, list):
        parts.append("\nДанные таблицы:\n")
        for number, row in enumerate(rows, start=1):
            if isinstance(row, dict):
                cells = " | ".join(f"{key}: {value}" for key, value in row.items() if value)
                parts.append(f"Строка {number}: {cells}\n")

    return "".join(parts)
from llama_index.core.text_splitter import SentenceSplitter
from config import CHUNK_SIZE, CHUNK_OVERLAP
# Chunking for oversized tables: split the rendered text and enrich each chunk
# with table-level keywords so chunks stay searchable on their own.
def chunk_table_document(doc, chunk_size=None, chunk_overlap=None):
    """Split an oversized table Document into context-enriched chunks.

    Every chunk inherits the table's metadata plus the materials / key
    terms extracted from the full table, and its text is prefixed with a
    table-level context header so each chunk is retrievable on its own.
    Defaults for size/overlap come from config when not given.
    """
    effective_size = CHUNK_SIZE if chunk_size is None else chunk_size
    effective_overlap = CHUNK_OVERLAP if chunk_overlap is None else chunk_overlap

    # Extract keywords/materials once from the full table text; they are
    # copied onto every chunk below.
    table_info = extract_table_metadata(doc.text)

    splitter = SentenceSplitter(
        chunk_size=effective_size,
        chunk_overlap=effective_overlap,
        separator="\n"
    )
    pieces = splitter.split_text(doc.text)

    # Context header shared by all chunks (first 10 materials/terms only).
    header = (
        f"[Таблица {doc.metadata.get('table_number')}: {doc.metadata.get('table_title')}]\n"
        f"[Материалы в таблице: {', '.join(table_info.get('materials', [])[:10])}]\n"
        f"[Ключевые термины: {', '.join(table_info.get('key_terms', [])[:10])}]\n"
    )

    enriched_docs = []
    for index, piece in enumerate(pieces):
        meta = dict(doc.metadata)
        meta.update({
            "chunk_id": index,
            "total_chunks": len(pieces),
            "chunk_size": len(piece),  # size of the raw chunk, before the header is prepended
            "is_chunked": True,
            "materials": table_info.get("materials", []),
            "key_terms": table_info.get("key_terms", []),
            "table_summary": table_info.get("summary", ""),
        })
        enriched_docs.append(Document(text=header + piece, metadata=meta))
    return enriched_docs
def extract_table_metadata(table_text):
    """Extract searchable metadata from table content.

    Returns a dict with:
        materials: alloy/steel grade codes (e.g. 08Х18Н10Т)
        gosts:     GOST standard references (e.g. "ГОСТ 5632-72")
        classes:   class/category codes
        key_terms: matched technical keywords plus the GOST references
        summary:   first non-empty lines of the table, capped at 200 chars
    """
    import re

    def _unique(seq):
        # Deduplicate while keeping first-occurrence order; the previous
        # list(set(...)) made the output order nondeterministic between runs.
        return list(dict.fromkeys(seq))

    # Material grade codes, e.g. 08Х18Н10Т. The final [ХНТМКВБА]* allows a
    # grade to end in a letter — without it the documented example itself
    # failed to match (the old pattern forced the code to end in a digit).
    material_pattern = r'\b\d{2}[ХНТМКВБА]+\d{1,2}[ХНТМКВБА]*\d*[ХНТМКВБА]*\b'
    materials = _unique(re.findall(material_pattern, table_text, re.IGNORECASE))

    # GOST standard references, e.g. "ГОСТ 5632-72" or "ГОСТ Р 52857".
    gost_pattern = r'ГОСТ\s+[РЕН\s]*\d+[\.\-\d]*'
    gosts = _unique(re.findall(gost_pattern, table_text, re.IGNORECASE))

    # Class/category codes: digit, class letters, Roman-numeral part.
    class_pattern = r'\b\d[АБВСI]+[IVX]+[a-z]*\b'
    classes = _unique(re.findall(class_pattern, table_text, re.IGNORECASE))

    # Common technical terms present anywhere in the text (case-insensitive).
    keywords = ['контроль', 'испытание', 'сертификат', 'качество', 'план',
                'полуфабрикат', 'оборудование', 'арматура', 'деталь']
    lowered = table_text.lower()
    tech_terms = [keyword for keyword in keywords if keyword in lowered]

    # Brief summary from the first five lines, truncated to 200 characters.
    lines = table_text.split('\n')[:5]
    summary = ' '.join(line.strip() for line in lines if line.strip())[:200]

    return {
        "materials": materials,
        "gosts": gosts,
        "classes": classes,
        "key_terms": tech_terms + gosts,
        "summary": summary,
    }
def table_to_document(table_data, document_id=None):
    """Convert one table record into LlamaIndex Document(s).

    Returns a single-element list for tables whose rendered text fits in
    CHUNK_SIZE, a list of enriched chunks for oversized tables, or an
    empty list for malformed or empty input (logged and skipped).
    """
    if not isinstance(table_data, dict):
        log_message("⚠️ ПРОПУЩЕНА: table_data не является словарем")
        return []

    doc_id = document_id or table_data.get('document_id') or table_data.get('document', 'Неизвестно')
    table_num = table_data.get('table_number', 'Неизвестно')
    table_title = table_data.get('table_title', 'Неизвестно')
    section = table_data.get('section', 'Неизвестно')

    table_rows = table_data.get('data', [])
    if not table_rows or len(table_rows) == 0:
        log_message(f"⚠️ ПРОПУЩЕНА: Таблица {table_num} из '{doc_id}' - нет данных в 'data'")
        return []

    content = create_table_content(table_data)
    content_size = len(content)
    row_count = len(table_rows)

    base_doc = Document(
        text=content,
        metadata={
            "type": "table",
            "table_number": table_num,
            "table_title": table_title,
            "document_id": doc_id,
            "section": section,
            "section_id": section,  # mirrors `section`; kept for downstream filters
            "total_rows": row_count,
            "content_size": content_size,
        },
    )

    # Small table: index it whole.
    if content_size <= CHUNK_SIZE:
        log_message(f"✓ ДОБАВЛЕНА: Таблица {table_num} из документа '{doc_id}' | "
                    f"Размер: {content_size} символов | Строк: {row_count}")
        return [base_doc]

    # Oversized table: split into enriched chunks and log each one.
    log_message(f"📊 CHUNKING: Таблица {table_num} из '{doc_id}' | "
                f"Размер: {content_size} > {CHUNK_SIZE} | Строк: {row_count}")
    chunked_docs = chunk_table_document(base_doc)
    log_message(f" ✂️ Разделена на {len(chunked_docs)} чанков")
    for index, chunk_doc in enumerate(chunked_docs):
        log_message(f" Чанк {index+1}: {chunk_doc.metadata['chunk_size']} символов")
    return chunked_docs
def load_table_data(repo_id, hf_token, table_data_dir):
    """Download table JSON files from a HF dataset repo and build Documents.

    Scans `table_data_dir` inside the dataset `repo_id` for *.json files,
    converts each table (multi-sheet or single) via table_to_document,
    logs per-document statistics, and returns the flat Document list.
    Per-file errors are logged and skipped; any top-level failure returns [].
    """
    log_message("=" * 60)
    log_message("НАЧАЛО ЗАГРУЗКИ ТАБЛИЧНЫХ ДАННЫХ")
    log_message("=" * 60)
    try:
        files = list_repo_files(repo_id=repo_id, repo_type="dataset", token=hf_token)
        table_files = [f for f in files if f.startswith(table_data_dir) and f.endswith('.json')]
        log_message(f"Найдено {len(table_files)} JSON файлов с таблицами")

        table_documents = []
        stats = {
            'total_tables': 0,
            'total_size': 0,
            'by_document': defaultdict(lambda: {'count': 0, 'size': 0})
        }

        def _collect(source_data, document_id):
            # Convert one table dict, append the resulting docs and fold
            # their sizes into the running stats. (This accumulation loop
            # was previously duplicated verbatim in both branches below.)
            docs_list = table_to_document(source_data, document_id)
            table_documents.extend(docs_list)
            for doc in docs_list:
                stats['total_tables'] += 1
                size = doc.metadata.get('content_size', 0)
                stats['total_size'] += size
                stats['by_document'][document_id]['count'] += 1
                stats['by_document'][document_id]['size'] += size

        for file_path in table_files:
            try:
                local_path = hf_hub_download(
                    repo_id=repo_id,
                    filename=file_path,
                    # NOTE(review): empty local_dir looks suspicious — confirm
                    # the intended download/cache location.
                    local_dir='',
                    repo_type="dataset",
                    token=hf_token
                )
                log_message(f"\nОбработка файла: {file_path}")
                with open(local_path, 'r', encoding='utf-8') as f:
                    table_data = json.load(f)

                if isinstance(table_data, dict):
                    document_id = table_data.get('document', 'unknown')
                    if 'sheets' in table_data:
                        # Process sheets in table-number order. Assumes the
                        # 'table_number' values compare consistently (a mix of
                        # ints and strings would raise TypeError) — TODO confirm.
                        sorted_sheets = sorted(
                            table_data['sheets'],
                            key=lambda sheet: sheet.get('table_number', '')
                        )
                        for sheet in sorted_sheets:
                            sheet['document'] = document_id
                            _collect(sheet, document_id)
                    else:
                        _collect(table_data, document_id)
            except Exception as e:
                log_message(f"❌ ОШИБКА файла {file_path}: {str(e)}")
                continue

        # Log summary statistics
        log_message("\n" + "=" * 60)
        log_message("СТАТИСТИКА ПО ТАБЛИЦАМ")
        log_message("=" * 60)
        log_message(f"Всего таблиц добавлено: {stats['total_tables']}")
        log_message(f"Общий размер: {stats['total_size']:,} символов")
        log_message(f"Средний размер таблицы: {stats['total_size'] // stats['total_tables'] if stats['total_tables'] > 0 else 0:,} символов")
        log_message("\nПо документам:")
        for doc_id, doc_stats in sorted(stats['by_document'].items()):
            log_message(f" • {doc_id}: {doc_stats['count']} таблиц, "
                        f"{doc_stats['size']:,} символов")
        log_message("=" * 60)
        return table_documents
    except Exception as e:
        log_message(f"❌ КРИТИЧЕСКАЯ ОШИБКА загрузки табличных данных: {str(e)}")
        return []