RAG_AIEXP_01 / documents_prep.py
MrSimple07's picture
chunk size = 2048 + rows=15
2b217eb
raw
history blame
23.8 kB
import json
import zipfile
import pandas as pd
from huggingface_hub import hf_hub_download, list_repo_files
from llama_index.core import Document
from llama_index.core.text_splitter import SentenceSplitter
from my_logging import log_message
# Configuration
# Both values are handed to SentenceSplitter in chunk_text_documents.
# NOTE(review): the unit (tokens vs. characters) is defined by SentenceSplitter,
# not by this file — confirm against the llama_index documentation.
CHUNK_SIZE = 1500  # passed as SentenceSplitter(chunk_size=...)
CHUNK_OVERLAP = 128  # passed as SentenceSplitter(chunk_overlap=...)
def chunk_text_documents(documents):
    """Split each document with SentenceSplitter and tag the resulting chunks.

    Every chunk's metadata is updated with its position inside its source
    document (``chunk_id``), the number of chunks that document produced
    (``total_chunks``) and the chunk's text length (``chunk_size``).

    Args:
        documents: Documents to split; each one is split on its own.

    Returns:
        A flat list with the chunks of all documents, in input order.
    """
    splitter = SentenceSplitter(
        chunk_size=CHUNK_SIZE,
        chunk_overlap=CHUNK_OVERLAP
    )

    result = []
    for document in documents:
        nodes = splitter.get_nodes_from_documents([document])
        node_count = len(nodes)
        for position, node in enumerate(nodes):
            node.metadata.update({
                'chunk_id': position,
                'total_chunks': node_count,
                'chunk_size': len(node.text)
            })
            result.append(node)

    # Nothing to summarise when no chunk was produced.
    if not result:
        return result

    sizes = [len(node.text) for node in result]
    avg_size = sum(sizes) / len(result)
    min_size = min(sizes)
    max_size = max(sizes)
    log_message(f"✓ Text: {len(documents)} docs → {len(result)} chunks")
    log_message(f" Size stats: avg={avg_size:.0f}, min={min_size}, max={max_size} chars")
    return result
def chunk_table_by_rows(table_data, doc_id, rows_per_chunk=10, max_chars=2000):
    """
    Chunk a table by rows, bounded by a row count and a character budget.

    Rows are accumulated into a batch that is flushed into one chunk when
    either ``rows_per_chunk`` rows were collected or adding the next row would
    exceed the character budget. A single row whose formatted text is larger
    than the budget is handed to ``_split_large_row`` and split by cell.

    Args:
        table_data: Dict describing the table. Keys read here: ``headers``,
            ``data`` (the rows), ``table_number``, ``table_title``, ``section``.
        doc_id: Identifier of the document the table belongs to.
        rows_per_chunk: Maximum number of rows placed in one chunk.
        max_chars: Character budget; the per-chunk space for rows is
            ``max_chars`` minus the header length minus 100 reserved characters.

    Returns:
        A list of chunk documents; empty when the table has no rows.
    """
    import re

    headers = table_data.get('headers', [])
    rows = table_data.get('data', [])
    table_num = str(table_data.get('table_number', 'unknown')).strip()
    table_title = table_data.get('table_title', '')
    # A missing or null section must not crash the .lower() call below.
    section = table_data.get('section') or ''
    section_lower = str(section).lower()

    # Section-aware identifier: tables inside an appendix get the appendix
    # number/letter appended so equal table numbers stay distinguishable.
    table_identifier = table_num
    if 'приложени' in section_lower:
        appendix_match = re.search(r'приложени[еия]\s*(\d+|[а-яА-Я])', section_lower)
        if appendix_match:
            appendix_num = appendix_match.group(1).upper()
            table_identifier = f"{table_num} Приложение {appendix_num}"

    if not rows:
        return []

    total_rows = len(rows)
    log_message(f" 📊 Processing: {doc_id} - {table_identifier} ({total_rows} rows)")

    # Build base header (compact version)
    base_header = f"ДОКУМЕНТ: {doc_id} | ТАБЛИЦА: {table_identifier}\n"
    if table_title:
        base_header += f"НАЗВАНИЕ: {table_title}\n"
    base_header += f"{'='*60}\n"
    if headers:
        # Truncate long headers so the repeated header stays compact.
        header_str = ' | '.join(str(h)[:30] for h in headers)
        base_header += f"ЗАГОЛОВКИ: {header_str}\n\n"

    # Space left for row text once the header and a fixed footer allowance
    # are subtracted from the budget.
    footer_size = 100
    available_space = max_chars - len(base_header) - footer_size

    def build_chunk(batch, number, is_complete):
        """Create one chunk from a batch using the table-wide fields."""
        return _create_chunk(
            base_header, batch, table_identifier,
            doc_id, table_num, table_title, section,
            total_rows, number, is_complete
        )

    chunks = []
    current_batch = []
    current_size = 0
    chunk_num = 0

    for i, row in enumerate(rows):
        row_text = format_single_row(row, i + 1)
        row_size = len(row_text)

        # Case 1: a single row exceeds the budget - split it internally.
        if row_size > available_space:
            # Flush the pending batch first so row order is preserved.
            if current_batch:
                chunks.append(build_chunk(current_batch, chunk_num, False))
                chunk_num += 1
                current_batch = []
                current_size = 0

            log_message(f" ⚠ Row {i+1} too large ({row_size} chars), splitting...")
            split_chunks = _split_large_row(
                row, i + 1, base_header, available_space,
                table_identifier, doc_id, table_num, table_title,
                section, total_rows, chunk_num
            )
            chunks.extend(split_chunks)
            log_message(f" → Created {len(split_chunks)} chunks from row {i+1}")
            chunk_num += len(split_chunks)
            continue

        # Case 2: adding this row would exceed the budget - flush the batch.
        # Never the complete table: the current row still has to be placed.
        if current_size + row_size > available_space and current_batch:
            chunks.append(build_chunk(current_batch, chunk_num, False))
            chunk_num += 1
            current_batch = []
            current_size = 0

        # Case 3: add the row to the current batch.
        current_batch.append({'row': row, 'idx': i + 1, 'text': row_text})
        log_message(f" + Row {i+1} ({row_size} chars) added to chunk {chunk_num}")
        current_size += row_size

        # Flush when the target row count is reached. If this is the last row
        # and nothing was emitted before, the chunk holds the whole table.
        if len(current_batch) >= rows_per_chunk:
            is_last_row = i == total_rows - 1
            chunks.append(build_chunk(current_batch, chunk_num, is_last_row and not chunks))
            chunk_num += 1
            current_batch = []
            current_size = 0

    # Flush remaining rows; the table is complete only if this is the sole chunk.
    if current_batch:
        chunks.append(build_chunk(current_batch, chunk_num, not chunks))

    log_message(f" Created {len(chunks)} chunks from {total_rows} rows")
    return chunks
def _create_chunk(base_header, batch, table_identifier, doc_id,
                  table_num, table_title, section, total_rows,
                  chunk_num, is_complete):
    """Assemble one table chunk (text plus metadata) from a batch of rows.

    Args:
        base_header: Text placed at the top of the chunk.
        batch: Non-empty list of dicts with at least ``'text'`` (formatted row)
            and ``'idx'`` (row number) keys.
        table_identifier, doc_id, table_num, table_title, section: Table
            descriptors copied into both the text and the metadata.
        total_rows: Number of rows in the whole table.
        chunk_num: Stored in the metadata as ``chunk_id``.
        is_complete: When true, the "rows X-Y of N" footer is omitted.

    Returns:
        A Document whose text embeds the metadata summary as well.
    """
    first_idx = batch[0]['idx']
    last_idx = batch[-1]['idx']

    pieces = [base_header + "ДАННЫЕ:\n"]
    pieces.extend(entry['text'] for entry in batch)

    # Footer with row info is only needed for partial tables.
    if not is_complete:
        pieces.append(f"\n[Строки {first_idx}-{last_idx} из {total_rows}]")

    # The metadata is repeated inside the text so retrieval can match on it.
    pieces.append("\n\n--- МЕТАДАННЫЕ ---\n")
    pieces.append(f"Документ: {doc_id}\n")
    pieces.append(f"Таблица: {table_identifier}\n")
    pieces.append(f"Название таблицы: {table_title}\n")
    pieces.append(f"Раздел: {section}\n")
    pieces.append(f"Строки: {first_idx}-{last_idx} из {total_rows}\n")
    text = "".join(pieces)

    return Document(
        text=text,
        metadata={
            'type': 'table',
            'document_id': doc_id,
            'table_number': table_num,
            'table_identifier': table_identifier,
            'table_title': table_title,
            'section': section,
            'chunk_id': chunk_num,
            # row_start is stored one lower than the first row number.
            'row_start': first_idx - 1,
            'row_end': last_idx,
            'total_rows': total_rows,
            'chunk_size': len(text),
            'is_complete_table': is_complete,
            'rows_in_chunk': len(batch)
        }
    )
def _split_large_row(row, row_idx, base_header, max_size,
                     table_identifier, doc_id, table_num,
                     table_title, section, total_rows, base_chunk_num):
    """Split one oversized row into several chunks, cell by cell.

    Cells are rendered as ``"key: value"`` lines (``col_<n>`` keys for
    non-dict rows) and packed into parts; a new part starts when adding the
    next line would push the accumulated line length past ``max_size``.

    Args:
        row: The row, either a dict or an iterable of cell values.
        row_idx: Row number used in the text and metadata.
        base_header: Text placed at the top of every part.
        max_size: Limit on the summed length of the cell lines in one part.
        table_identifier, doc_id, table_num, table_title, section, total_rows:
            Forwarded to ``_create_chunk_from_text``.
        base_chunk_num: Chunk number of the first part; later parts count up.

    Returns:
        The list of chunk documents created for this row.
    """
    if isinstance(row, dict):
        cells = list(row.items())
    else:
        cells = [(f"col_{pos}", cell) for pos, cell in enumerate(row)]

    parts = []

    def emit(lines, continued):
        """Turn the collected cell lines into the next part of the row."""
        part_index = len(parts)
        body = base_header + "ДАННЫЕ:\n"
        body += f"Строка {row_idx} (часть {part_index + 1}):\n"
        body += "".join(lines)
        if continued:
            body += f"\n[Строка {row_idx} из {total_rows} - продолжается]"
        parts.append(_create_chunk_from_text(
            body, doc_id, table_num, table_identifier,
            table_title, section, row_idx, row_idx,
            total_rows, base_chunk_num + part_index
        ))

    pending = []
    pending_size = 0
    for key, value in cells:
        line = f"{key}: {value}\n"
        line_size = len(line)

        # Close the current part before it would overflow; a part always
        # keeps at least one line, even if that line alone is too long.
        if pending_size + line_size > max_size and pending:
            emit(pending, continued=True)
            pending = []
            pending_size = 0

        pending.append(line)
        pending_size += line_size

    # The last part carries no "continued" marker.
    if pending:
        emit(pending, continued=False)

    return parts
def _create_chunk_from_text(content, doc_id, table_num, table_identifier,
                            table_title, section, row_start, row_end,
                            total_rows, chunk_num):
    """Wrap already-built chunk text in a Document with table metadata.

    The chunk is always marked as an incomplete table, and ``row_start`` is
    stored one lower than the value passed in.
    """
    return Document(
        text=content,
        metadata=dict(
            type='table',
            document_id=doc_id,
            table_number=table_num,
            table_identifier=table_identifier,
            table_title=table_title,
            section=section,
            chunk_id=chunk_num,
            row_start=row_start - 1,
            row_end=row_end,
            total_rows=total_rows,
            chunk_size=len(content),
            is_complete_table=False,
        ),
    )
def format_single_row(row, idx):
    """Format one table row as a single numbered text line.

    Dict rows render as ``"<idx>. key: value | key: value\\n"``; list or tuple
    rows render as ``"<idx>. value | value\\n"``. Blank cells are skipped:
    ``None``, empty or whitespace-only strings, and the textual sentinels
    ``nan`` / ``none`` (case-insensitive, surrounding whitespace ignored).
    Numeric cells are kept even when they are zero, because ``0`` is data.

    Args:
        row: A dict, list or tuple of cell values.
        idx: Row number printed at the start of the line.

    Returns:
        The formatted line, or ``""`` when the row has no non-blank cells or
        is of an unsupported type.
    """
    def has_content(value):
        # Falsy non-numbers (None, '', empty containers) are blanks, while
        # 0 / 0.0 / False are real values and must not be dropped.
        if not value and not isinstance(value, (int, float)):
            return False
        return str(value).strip().lower() not in ('', 'nan', 'none')

    if isinstance(row, dict):
        parts = [f"{k}: {v}" for k, v in row.items() if has_content(v)]
    elif isinstance(row, (list, tuple)):
        parts = [str(v) for v in row if has_content(v)]
    else:
        return ""

    if not parts:
        return ""
    return f"{idx}. {' | '.join(parts)}\n"
def load_table_documents(repo_id, hf_token, table_dir):
    """Download every table JSON under ``table_dir`` and chunk its sheets.

    Each file is expected to hold a ``sheets`` list; every sheet is chunked
    with ``chunk_table_by_rows`` using a 3072-character budget. A failure on
    one file is logged and the remaining files are still processed.

    Args:
        repo_id: Hugging Face dataset repository id.
        hf_token: Access token used for listing and downloading.
        table_dir: Path prefix selecting the table files in the repository.

    Returns:
        A list with the table chunks of all files.
    """
    log_message("Loading tables...")

    repo_files = list_repo_files(repo_id=repo_id, repo_type="dataset", token=hf_token)
    table_files = [
        name for name in repo_files
        if name.startswith(table_dir) and name.endswith('.json')
    ]

    all_chunks = []
    for file_path in table_files:
        try:
            local_path = hf_hub_download(
                repo_id=repo_id,
                filename=file_path,
                repo_type="dataset",
                token=hf_token
            )
            with open(local_path, 'r', encoding='utf-8') as handle:
                payload = json.load(handle)

            # File-level id is the fallback for sheets without their own id.
            file_doc_id = payload.get('document_id', payload.get('document', 'unknown'))

            for sheet in payload.get('sheets', []):
                fallback_id = sheet.get('document', file_doc_id)
                sheet_doc_id = sheet.get('document_id', fallback_id)

                sheet_chunks = chunk_table_by_rows(sheet, sheet_doc_id, max_chars=3072)
                all_chunks.extend(sheet_chunks)
                log_message(f" 📄 {sheet_doc_id}: {len(sheet_chunks)} chunks")
        except Exception as e:
            log_message(f"Error loading {file_path}: {e}")

    log_message(f"✓ Loaded {len(all_chunks)} table chunks")
    return all_chunks
def _decode_json_bytes(raw_bytes):
    """Decode raw bytes into text that looks like a JSON document.

    Encodings are tried in order and the first one that both decodes and
    yields text starting with ``{`` or ``[`` wins. ``utf-8-sig`` goes first
    because it reads plain UTF-8 and also strips a leading BOM, which would
    otherwise remain in the text and fail the JSON-start check.

    Returns:
        ``(text, None)`` on success, otherwise ``(None, reason)`` where
        reason is ``'encoding failed'`` (nothing decoded) or
        ``'not valid JSON'`` (decoded, but never looked like JSON).
    """
    decoded_any = False
    for encoding in ('utf-8-sig', 'utf-16', 'windows-1251'):
        try:
            text = raw_bytes.decode(encoding)
        except UnicodeDecodeError:
            continue
        decoded_any = True
        # A wrong encoding can decode "successfully" into garbage; only
        # accept a result that actually starts like JSON.
        if text.lstrip().startswith(('{', '[')):
            return text, None
    return None, ('not valid JSON' if decoded_any else 'encoding failed')


def load_json_documents(repo_id, hf_token, json_dir):
    """Load text sections from JSON files and ZIP archives in a dataset repo.

    Plain ``.json`` files under ``json_dir`` are downloaded and passed to
    ``extract_sections_from_json``. ``.zip`` files are opened and every JSON
    member (macOS resource entries and dot-files excluded) is decoded,
    written to a temporary UTF-8 file and extracted the same way. Failures
    are logged and counted; they never abort the whole load.

    Args:
        repo_id: Hugging Face dataset repository id.
        hf_token: Access token used for listing and downloading.
        json_dir: Path prefix selecting the files in the repository.

    Returns:
        A list with the section documents of all files.
    """
    import os
    import tempfile

    log_message("Loading JSON documents...")
    files = list_repo_files(repo_id=repo_id, repo_type="dataset", token=hf_token)
    json_files = [f for f in files if f.startswith(json_dir) and f.endswith('.json')]
    zip_files = [f for f in files if f.startswith(json_dir) and f.endswith('.zip')]
    log_message(f"Found {len(json_files)} JSON files and {len(zip_files)} ZIP files")

    documents = []
    stats = {'success': 0, 'failed': 0, 'empty': 0}

    for file_path in json_files:
        try:
            log_message(f" Loading: {file_path}")
            local_path = hf_hub_download(
                repo_id=repo_id,
                filename=file_path,
                repo_type="dataset",
                token=hf_token
            )
            docs = extract_sections_from_json(local_path)
            if docs:
                documents.extend(docs)
                stats['success'] += 1
                log_message(f" ✓ Extracted {len(docs)} sections")
            else:
                stats['empty'] += 1
                log_message(f" ⚠ No sections found")
        except Exception as e:
            stats['failed'] += 1
            log_message(f" ✗ Error: {e}")

    for zip_path in zip_files:
        try:
            log_message(f" Processing ZIP: {zip_path}")
            local_zip = hf_hub_download(
                repo_id=repo_id,
                filename=zip_path,
                repo_type="dataset",
                token=hf_token
            )
            with zipfile.ZipFile(local_zip, 'r') as zf:
                json_files_in_zip = [
                    f for f in zf.namelist()
                    if f.endswith('.json')
                    and not f.startswith('__MACOSX')
                    and not f.startswith('.')
                    and '._' not in f
                ]
                log_message(f" Found {len(json_files_in_zip)} JSON files in ZIP")

                for json_file in json_files_in_zip:
                    try:
                        file_content = zf.read(json_file)

                        # Skip if file is too small to hold a document.
                        if len(file_content) < 10:
                            log_message(f" ✗ Skipping: {json_file} (file too small)")
                            stats['failed'] += 1
                            continue

                        text_content, failure = _decode_json_bytes(file_content)
                        if text_content is None:
                            log_message(f" ✗ Skipping: {json_file} ({failure})")
                            stats['failed'] += 1
                            continue

                        # extract_sections_from_json takes a path, so the
                        # normalised text goes through a temp file that is
                        # removed even when writing or extraction fails.
                        tmp = tempfile.NamedTemporaryFile(
                            mode='w', delete=False,
                            suffix='.json', encoding='utf-8'
                        )
                        try:
                            with tmp:
                                tmp.write(text_content)
                            docs = extract_sections_from_json(tmp.name)
                        finally:
                            os.unlink(tmp.name)

                        if docs:
                            documents.extend(docs)
                            stats['success'] += 1
                            log_message(f" ✓ {json_file}: {len(docs)} sections")
                        else:
                            stats['empty'] += 1
                            log_message(f" ⚠ {json_file}: No sections")
                    except json.JSONDecodeError:
                        stats['failed'] += 1
                        log_message(f" ✗ {json_file}: Invalid JSON")
                    except Exception as e:
                        stats['failed'] += 1
                        log_message(f" ✗ {json_file}: {str(e)[:100]}")
        except Exception as e:
            log_message(f" ✗ Error with ZIP: {e}")

    log_message("=" * 60)
    log_message("JSON Loading Stats:")
    log_message(f" Success: {stats['success']}")
    log_message(f" Empty: {stats['empty']}")
    log_message(f" Failed: {stats['failed']}")
    log_message(f" Total sections: {len(documents)}")
    log_message("=" * 60)
    return documents
def extract_sections_from_json(json_path):
    """Extract section, subsection and sub-subsection texts from a JSON file.

    The file is read as UTF-8 with an optional BOM. Every node whose text
    field is a non-blank string becomes one Document tagged with the
    document id (``document_metadata.document_id``, default ``'unknown'``)
    and the node's own id. Null or missing text fields and child lists are
    treated as empty instead of aborting the file.

    Args:
        json_path: Path of the JSON file to read.

    Returns:
        The list of extracted Documents. On any error the error is logged
        and the documents collected so far are returned.
    """
    documents = []

    def add_document(node, text_key, id_key, doc_id):
        """Append a Document for ``node`` when it carries non-blank text."""
        text = node.get(text_key)
        if isinstance(text, str) and text.strip():
            documents.append(Document(
                text=text,
                metadata={
                    'type': 'text',
                    'document_id': doc_id,
                    'section_id': node.get(id_key, '')
                }
            ))

    try:
        # utf-8-sig reads plain UTF-8 too and drops a leading BOM, which
        # json.load would otherwise reject.
        with open(json_path, 'r', encoding='utf-8-sig') as f:
            data = json.load(f)

        doc_id = (data.get('document_metadata') or {}).get('document_id', 'unknown')

        # Walk all three section levels; `or []` tolerates explicit nulls.
        for section in data.get('sections') or []:
            add_document(section, 'section_text', 'section_id', doc_id)

            for subsection in section.get('subsections') or []:
                add_document(subsection, 'subsection_text', 'subsection_id', doc_id)

                for sub_sub in subsection.get('sub_subsections') or []:
                    add_document(sub_sub, 'sub_subsection_text', 'sub_subsection_id', doc_id)
    except Exception as e:
        log_message(f"Error extracting from {json_path}: {e}")

    return documents
def load_image_data(repo_id, hf_token, image_data_dir):
    """Build one Document per image record from the CSV files in the repo.

    Every ``.csv`` file under ``image_data_dir`` is downloaded and read with
    pandas; each row becomes a Document whose text and metadata describe the
    image. A failing file is logged and skipped; a failure outside the
    per-file handling is logged and yields an empty list.

    Args:
        repo_id: Hugging Face dataset repository id.
        hf_token: Access token used for listing and downloading.
        image_data_dir: Path prefix selecting the CSV files in the repository.

    Returns:
        The list of image Documents, or ``[]`` on a top-level error.
    """
    log_message("Начинаю загрузку данных изображений")

    image_files = []
    try:
        repo_files = list_repo_files(repo_id=repo_id, repo_type="dataset", token=hf_token)
        image_files.extend(
            name for name in repo_files
            if name.startswith(image_data_dir) and name.endswith('.csv')
        )
        log_message(f"Найдено {len(image_files)} CSV файлов с изображениями")

        image_documents = []
        for file_path in image_files:
            try:
                log_message(f"Обрабатываю файл изображений: {file_path}")
                local_path = hf_hub_download(
                    repo_id=repo_id,
                    filename=file_path,
                    local_dir='',
                    repo_type="dataset",
                    token=hf_token
                )
                frame = pd.read_csv(local_path)
                log_message(f"Загружено {len(frame)} записей изображений из файла {file_path}")

                # Rows are read using the column names exactly as they
                # appear in the CSV files.
                for _, record in frame.iterrows():
                    section_value = record.get('Раздел документа', 'Неизвестно')

                    text_lines = [
                        f"Изображение: {record.get('№ Изображения', 'Неизвестно')}\n",
                        f"Название: {record.get('Название изображения', 'Неизвестно')}\n",
                        # The description column name contains a typo.
                        f"Описание: {record.get('Описание изображение', 'Неизвестно')}\n",
                        f"Документ: {record.get('Обозначение документа', 'Неизвестно')}\n",
                        f"Раздел: {section_value}\n",
                        f"Файл: {record.get('Файл изображения', 'Неизвестно')}\n",
                    ]

                    image_documents.append(Document(
                        text="".join(text_lines),
                        metadata={
                            "type": "image",
                            "image_number": str(record.get('№ Изображения', 'unknown')),
                            "image_title": str(record.get('Название изображения', 'unknown')),
                            "image_description": str(record.get('Описание изображение', 'unknown')),
                            "document_id": str(record.get('Обозначение документа', 'unknown')),
                            "file_path": str(record.get('Файл изображения', 'unknown')),
                            "section": str(section_value),
                            "section_id": str(section_value)
                        }
                    ))
            except Exception as e:
                # One bad file must not stop the remaining files.
                log_message(f"Ошибка обработки файла {file_path}: {str(e)}")

        log_message(f"Создано {len(image_documents)} документов из изображений")
        return image_documents
    except Exception as e:
        log_message(f"Ошибка загрузки данных изображений: {str(e)}")
        return []
def load_all_documents(repo_id, hf_token, json_dir, table_dir, image_dir):
    """Load text, table and image documents and return them as one list.

    Text sections are loaded and then chunked; tables arrive already chunked
    from their loader; image documents are used as loaded.

    Args:
        repo_id: Hugging Face dataset repository id.
        hf_token: Access token passed to every loader.
        json_dir: Repository path prefix of the text JSON/ZIP files.
        table_dir: Repository path prefix of the table JSON files.
        image_dir: Repository path prefix of the image CSV files.

    Returns:
        Text chunks, then table chunks, then image documents, concatenated.
    """
    banner = "=" * 60

    log_message(banner)
    log_message("STARTING DOCUMENT LOADING")
    log_message(banner)

    # Text sections need sentence-level chunking after loading.
    text_chunks = chunk_text_documents(load_json_documents(repo_id, hf_token, json_dir))

    # Tables come back already chunked.
    table_chunks = load_table_documents(repo_id, hf_token, table_dir)

    # Image records are used one document per record.
    image_docs = load_image_data(repo_id, hf_token, image_dir)

    combined = text_chunks + table_chunks + image_docs

    log_message(banner)
    log_message(f"TOTAL DOCUMENTS: {len(combined)}")
    log_message(f" Text chunks: {len(text_chunks)}")
    log_message(f" Table chunks: {len(table_chunks)}")
    log_message(f" Images: {len(image_docs)}")
    log_message(banner)

    return combined