RAG_AIEXP_01 / documents_prep.py
MrSimple07's picture
new documents_prep
9160af0
raw
history blame
27.4 kB
import json
import zipfile
import pandas as pd
from collections import Counter
from huggingface_hub import hf_hub_download, list_repo_files
from llama_index.core import Document
from llama_index.core.text_splitter import SentenceSplitter
from my_logging import log_message
from config import CHUNK_SIZE, CHUNK_OVERLAP
# ============================================================================
# TEXT CHUNKING
# ============================================================================
def chunk_text_document(doc):
    """Split a text document into overlapping chunks with SentenceSplitter.

    Every chunk inherits a copy of the source document's metadata plus
    bookkeeping fields: chunk_id, total_chunks and chunk_size.
    """
    splitter = SentenceSplitter(
        chunk_size=CHUNK_SIZE,
        chunk_overlap=CHUNK_OVERLAP,
        separator=" ",
    )
    pieces = splitter.split_text(doc.text)
    total = len(pieces)

    result = []
    for idx, piece in enumerate(pieces):
        meta = dict(doc.metadata)
        meta.update(chunk_id=idx, total_chunks=total, chunk_size=len(piece))
        result.append(Document(text=piece, metadata=meta))
    return result
# ============================================================================
# TABLE PROCESSING
# ============================================================================
def extract_table_metadata(table_text):
    """Derive enrichment metadata (word-count summary + key terms) from table text."""
    tokens = table_text.split()
    # Russian stopwords; tokens of length <= 3 are dropped regardless,
    # so only the longer stopwords in this set actually matter.
    stopwords = {"и", "в", "на", "по", "с", "для", "из", "при", "а", "как", "или", "но", "к", "от"}
    candidates = (t for t in tokens if len(t) > 3 and t.lower() not in stopwords)
    # Top 15 most frequent remaining terms (case-sensitive counting).
    top_terms = [term for term, _ in Counter(candidates).most_common(15)]
    return {
        "summary": f"Таблица содержит {len(tokens)} слов",
        "key_terms": top_terms,
    }
def create_table_content(table_data):
    """Render a parsed table dict as a plain-text block.

    Layout: identity lines (number, title, document, section), optional
    header line, then one 'Строка N: ...' line per dict row. Non-dict
    rows are skipped but still consume a row number.
    """
    document = table_data.get('document_id', table_data.get('document', 'Неизвестно'))
    number = table_data.get('table_number', 'Неизвестно')
    title = table_data.get('table_title', 'Неизвестно')
    section_name = table_data.get('section', 'Неизвестно')

    parts = [
        f"Таблица: {number}\n",
        f"Название: {title}\n",
        f"Документ: {document}\n",
        f"Раздел: {section_name}\n",
    ]

    headers = table_data.get('headers', [])
    if headers:
        parts.append(f"\nЗаголовки: {' | '.join(headers)}\n")

    rows = table_data.get('data')
    if isinstance(rows, list):
        parts.append("\nДанные таблицы:\n")
        for row_num, row in enumerate(rows, start=1):
            if isinstance(row, dict):
                # Falsy cell values (empty strings, None, 0) are omitted.
                cells = " | ".join(f"{k}: {v}" for k, v in row.items() if v)
                parts.append(f"Строка {row_num}: {cells}\n")

    return "".join(parts)
def chunk_table_by_rows(doc):
    """Split large table into chunks by rows, preserving headers.

    Expects doc.text in the layout produced by create_table_content: a
    header block, a 'Данные таблицы:' marker line, then 'Строка N: ...'
    data lines. Each output chunk repeats the full header block and
    overlaps the previous chunk by up to 2 rows. Chunks are prefixed
    with table identity and key terms extracted from the table text.

    Returns a list of Document objects; falls back to plain sentence
    splitting when no data rows are found.
    """
    # Key terms are computed once from the whole table and attached to every chunk
    table_metadata = extract_table_metadata(doc.text)
    table_num = doc.metadata.get('table_number', 'unknown')
    table_title = doc.metadata.get('table_title', 'unknown')
    # Parse table structure
    lines = doc.text.strip().split('\n')
    # Separate header lines from data-row lines; the 'Данные таблицы:'
    # marker itself stays in the header block so it is repeated per chunk
    table_header_lines = []
    data_rows = []
    in_data = False
    for line in lines:
        if line.startswith('Данные таблицы:'):
            in_data = True
            table_header_lines.append(line)
        elif in_data and line.startswith('Строка'):
            data_rows.append(line)
        elif not in_data:
            table_header_lines.append(line)
    table_header = '\n'.join(table_header_lines) + '\n'
    # If no rows, use standard text splitting
    if not data_rows:
        log_message(f" ⚠️ Таблица {table_num}: нет строк данных, использую стандартное разбиение")
        return chunk_text_document(doc)
    log_message(f" 📋 Таблица {table_num}: найдено {len(data_rows)} строк данных")
    # Row-based chunking: budget is CHUNK_SIZE minus the repeated header
    # and a 300-char reserve for the enrichment prefix added below
    header_size = len(table_header)
    available_size = CHUNK_SIZE - header_size - 300  # Reserve space for enrichment
    text_chunks = []
    current_chunk_rows = []
    current_size = 0
    for row in data_rows:
        # +1 accounts for the newline used when rows are joined
        row_size = len(row) + 1
        # If adding this row exceeds limit, flush the current chunk first.
        # NOTE(review): a single row longer than available_size still yields
        # an oversized chunk — rows are never split internally.
        if current_size + row_size > available_size and current_chunk_rows:
            chunk_text = table_header + '\n'.join(current_chunk_rows)
            text_chunks.append(chunk_text)
            log_message(f" ✂️ Создан чанк: {len(current_chunk_rows)} строк, {len(chunk_text)} символов")
            # Keep last 2 rows so consecutive chunks overlap
            overlap_count = min(2, len(current_chunk_rows))
            current_chunk_rows = current_chunk_rows[-overlap_count:]
            current_size = sum(len(r) + 1 for r in current_chunk_rows)
        current_chunk_rows.append(row)
        current_size += row_size
    # Final chunk with whatever rows remain
    if current_chunk_rows:
        chunk_text = table_header + '\n'.join(current_chunk_rows)
        text_chunks.append(chunk_text)
        log_message(f" ✂️ Последний чанк: {len(current_chunk_rows)} строк, {len(chunk_text)} символов")
    log_message(f" 📊 Таблица {table_num} разделена на {len(text_chunks)} чанков")
    # Wrap each chunk in a Document with bookkeeping metadata and an
    # enrichment prefix (table id/title + top-10 key terms) to aid retrieval
    chunked_docs = []
    key_terms = table_metadata.get("key_terms", [])
    for i, chunk_text in enumerate(text_chunks):
        chunk_metadata = doc.metadata.copy()
        chunk_metadata.update({
            "chunk_id": i,
            "total_chunks": len(text_chunks),
            "chunk_size": len(chunk_text),
            "is_chunked": True,
            "key_terms": key_terms
        })
        # Add enrichment prefix
        terms_str = ', '.join(key_terms[:10]) if key_terms else 'нет'
        enriched_text = f"""[Таблица {table_num}: {table_title}]
[Ключевые термины: {terms_str}]
{chunk_text}"""
        chunked_docs.append(Document(text=enriched_text, metadata=chunk_metadata))
    return chunked_docs
def table_to_document(table_data, document_id=None):
    """Convert one table dict into a list of Documents.

    Returns [] for invalid or empty input, a single whole-table Document
    when the rendered text fits within CHUNK_SIZE, otherwise row-based
    chunks produced by chunk_table_by_rows.
    """
    if not isinstance(table_data, dict):
        log_message("⚠️ ПРОПУЩЕНА: table_data не является словарем")
        return []

    number = table_data.get('table_number', 'Неизвестно')
    rows = table_data.get('data', [])
    if not rows:
        log_message(f"⚠️ ПРОПУЩЕНА: Таблица {number} - нет данных")
        return []

    identifier = (document_id
                  or table_data.get('document_id')
                  or table_data.get('document', 'Неизвестно'))
    title = table_data.get('table_title', 'Неизвестно')
    section_name = table_data.get('section', 'Неизвестно')

    text = create_table_content(table_data)
    size = len(text)
    whole_doc = Document(
        text=text,
        metadata={
            "type": "table",
            "table_number": number,
            "table_title": title,
            "document_id": identifier,
            "section": section_name,
            # section_id mirrors section so tables align with text-doc metadata
            "section_id": section_name,
            "total_rows": len(rows),
            "content_size": size,
        },
    )

    if size > CHUNK_SIZE:
        log_message(f"📊 CHUNKING: Таблица {number} | Размер: {size} > {CHUNK_SIZE}")
        return chunk_table_by_rows(whole_doc)
    log_message(f"✓ Таблица {number} | Размер: {size} символов | Строк: {len(rows)}")
    return [whole_doc]
def load_table_data(repo_id, hf_token, table_data_dir):
    """Load all table data from HuggingFace repo.

    Lists the dataset repo, downloads every .json file under
    table_data_dir and converts each table (or each sheet of a
    multi-sheet file) into Documents via table_to_document.

    Returns a flat list of Documents; per-file errors are logged and
    skipped, and a repo-level failure returns [].
    """
    log_message("=" * 60)
    log_message("ЗАГРУЗКА ТАБЛИЧНЫХ ДАННЫХ")
    log_message("=" * 60)
    try:
        files = list_repo_files(repo_id=repo_id, repo_type="dataset", token=hf_token)
        table_files = [f for f in files if f.startswith(table_data_dir) and f.endswith('.json')]
        log_message(f"Найдено {len(table_files)} JSON файлов с таблицами")
        table_documents = []
        for file_path in table_files:
            try:
                # NOTE(review): local_dir='' downloads relative to the current
                # working directory — confirm this is intended (other loaders
                # take an explicit download_dir parameter).
                local_path = hf_hub_download(
                    repo_id=repo_id,
                    filename=file_path,
                    local_dir='',
                    repo_type="dataset",
                    token=hf_token
                )
                log_message(f"\nОбработка файла: {file_path}")
                with open(local_path, 'r', encoding='utf-8') as f:
                    table_data = json.load(f)
                if isinstance(table_data, dict):
                    document_id = table_data.get('document', 'unknown')
                    # Multi-sheet files: one table per sheet, processed in order
                    if 'sheets' in table_data:
                        # NOTE(review): table_number is sorted as a string, so
                        # ordering is lexicographic ('10' < '2') — verify the
                        # numbering format makes this acceptable.
                        sorted_sheets = sorted(
                            table_data['sheets'],
                            key=lambda sheet: sheet.get('table_number', '')
                        )
                        for sheet in sorted_sheets:
                            # Propagate the file-level document id onto each sheet
                            sheet['document'] = document_id
                            docs_list = table_to_document(sheet, document_id)
                            table_documents.extend(docs_list)
                    else:
                        docs_list = table_to_document(table_data, document_id)
                        table_documents.extend(docs_list)
            except Exception as e:
                # Best-effort: a broken file must not abort the whole load
                log_message(f"❌ ОШИБКА файла {file_path}: {str(e)}")
                continue
        log_message(f"\n{'='*60}")
        log_message(f"Загружено {len(table_documents)} табличных документов")
        log_message("=" * 60)
        return table_documents
    except Exception as e:
        log_message(f"❌ ОШИБКА загрузки таблиц: {str(e)}")
        return []
# ============================================================================
# JSON TEXT DOCUMENTS
# ============================================================================
def extract_section_title(section_text):
    """Return a short title derived from the first line of *section_text*.

    A short first line that doesn't end like a sentence is used verbatim;
    otherwise the first sentence is taken, or the line is truncated to
    100 characters with an ellipsis.
    """
    stripped = section_text.strip()
    if not stripped:
        return ""
    first_line = stripped.split('\n')[0].strip()

    # Short, non-sentence-like line: already a good title.
    if len(first_line) < 200 and not first_line.endswith('.'):
        return first_line

    # Otherwise prefer the first sentence if the line contains any period.
    pieces = first_line.split('.')
    if len(pieces) > 1:
        return pieces[0].strip()

    # Long single sentence without periods: hard-truncate.
    if len(first_line) > 100:
        return first_line[:100] + "..."
    return first_line
def extract_text_from_json(data, document_id, document_name):
    """Build text Documents from the section tree of a parsed JSON document.

    Walks top-level sections and their direct subsections (one level
    deep only). Entries with blank text are skipped. Each Document's
    metadata records its position via section_id / section_path / level.
    """
    docs = []
    for section in data.get('sections', []):
        sec_id = section.get('section_id', 'Unknown')
        sec_text = section.get('section_text', '')
        if sec_text.strip():
            docs.append(Document(
                text=sec_text,
                metadata={
                    "type": "text",
                    "document_id": document_id,
                    "document_name": document_name,
                    "section_id": sec_id,
                    # Truncated title used for display/retrieval context
                    "section_text": extract_section_title(sec_text)[:200],
                    "section_path": sec_id,
                    "level": "section",
                },
            ))
        for sub in section.get('subsections', []):
            sub_id = sub.get('subsection_id', 'Unknown')
            sub_text = sub.get('subsection_text', '')
            if not sub_text.strip():
                continue
            docs.append(Document(
                text=sub_text,
                metadata={
                    "type": "text",
                    "document_id": document_id,
                    "document_name": document_name,
                    "section_id": sub_id,
                    "section_text": extract_section_title(sub_text)[:200],
                    "section_path": f"{sec_id}.{sub_id}",
                    "level": "subsection",
                    "parent_section": sec_id,
                },
            ))
    return docs
def load_json_documents(repo_id, hf_token, json_files_dir, download_dir):
    """Load JSON documents from HuggingFace repo.

    Handles both .zip archives of JSON files and bare .json files under
    json_files_dir. Extracted section documents are then chunked via
    process_documents_with_chunking.

    Returns (chunked_documents, chunk_info); ([], []) on repo-level failure.
    Per-file errors are logged and skipped.
    """
    log_message("=" * 60)
    log_message("ЗАГРУЗКА JSON ДОКУМЕНТОВ")
    log_message("=" * 60)
    try:
        files = list_repo_files(repo_id=repo_id, repo_type="dataset", token=hf_token)
        zip_files = [f for f in files if f.startswith(json_files_dir) and f.endswith('.zip')]
        json_files = [f for f in files if f.startswith(json_files_dir) and f.endswith('.json')]
        log_message(f"Найдено {len(zip_files)} ZIP файлов и {len(json_files)} JSON файлов")
        all_documents = []
        # Process ZIP files
        for zip_file_path in zip_files:
            try:
                log_message(f"Загружаю ZIP: {zip_file_path}")
                local_zip_path = hf_hub_download(
                    repo_id=repo_id,
                    filename=zip_file_path,
                    local_dir=download_dir,
                    repo_type="dataset",
                    token=hf_token
                )
                with zipfile.ZipFile(local_zip_path, 'r') as zip_ref:
                    # Skip macOS resource-fork entries bundled in the archive
                    json_files_in_zip = [f for f in zip_ref.namelist()
                                         if f.endswith('.json') and not f.startswith('__MACOSX')]
                    for json_file in json_files_in_zip:
                        with zip_ref.open(json_file) as f:
                            json_data = json.load(f)
                        metadata = json_data.get('document_metadata', {})
                        doc_id = metadata.get('document_id', 'unknown')
                        doc_name = metadata.get('document_name', 'unknown')
                        docs = extract_text_from_json(json_data, doc_id, doc_name)
                        all_documents.extend(docs)
                # NOTE(review): this logs the cumulative total across all ZIPs
                # processed so far, not the per-archive count.
                log_message(f"Извлечено документов из ZIP: {len(all_documents)}")
            except Exception as e:
                log_message(f"❌ ОШИБКА ZIP {zip_file_path}: {str(e)}")
                continue
        # Process direct JSON files
        for file_path in json_files:
            try:
                local_path = hf_hub_download(
                    repo_id=repo_id,
                    filename=file_path,
                    local_dir=download_dir,
                    repo_type="dataset",
                    token=hf_token
                )
                with open(local_path, 'r', encoding='utf-8') as f:
                    json_data = json.load(f)
                metadata = json_data.get('document_metadata', {})
                doc_id = metadata.get('document_id', 'unknown')
                doc_name = metadata.get('document_name', 'unknown')
                docs = extract_text_from_json(json_data, doc_id, doc_name)
                all_documents.extend(docs)
            except Exception as e:
                log_message(f"❌ ОШИБКА JSON {file_path}: {str(e)}")
                continue
        log_message(f"Всего загружено {len(all_documents)} текстовых документов")
        # Chunk all documents
        chunked_documents, chunk_info = process_documents_with_chunking(all_documents)
        log_message(f"После chunking: {len(chunked_documents)} чанков")
        log_message("=" * 60)
        return chunked_documents, chunk_info
    except Exception as e:
        log_message(f"❌ ОШИБКА загрузки JSON: {str(e)}")
        return [], []
# ============================================================================
# IMAGE DATA
# ============================================================================
def load_image_data(repo_id, hf_token, image_data_dir):
    """Load image description CSVs from a HF dataset repo into Documents.

    One Document is created per CSV row; its text repeats the key
    columns in a readable layout and its metadata stores stringified
    identifiers. Per-file errors are logged and skipped; a repo-level
    failure returns [].
    """
    log_message("=" * 60)
    log_message("ЗАГРУЗКА ДАННЫХ ИЗОБРАЖЕНИЙ")
    log_message("=" * 60)
    try:
        repo_files = list_repo_files(repo_id=repo_id, repo_type="dataset", token=hf_token)
        csv_files = [f for f in repo_files if f.startswith(image_data_dir) and f.endswith('.csv')]
        log_message(f"Найдено {len(csv_files)} CSV файлов с изображениями")

        docs = []
        for path in csv_files:
            try:
                downloaded = hf_hub_download(
                    repo_id=repo_id,
                    filename=path,
                    local_dir='',
                    repo_type="dataset",
                    token=hf_token
                )
                frame = pd.read_csv(downloaded)
                log_message(f"Загружено {len(frame)} изображений из {path}")
                for _, record in frame.iterrows():
                    text = (
                        f"Изображение: {record.get('№ Изображения', 'Неизвестно')}\n"
                        f"Название: {record.get('Название изображения', 'Неизвестно')}\n"
                        f"Описание: {record.get('Описание изображение', 'Неизвестно')}\n"
                        f"Документ: {record.get('Обозначение документа', 'Неизвестно')}\n"
                        f"Раздел: {record.get('Раздел документа', 'Неизвестно')}\n"
                    )
                    docs.append(Document(
                        text=text,
                        metadata={
                            "type": "image",
                            "image_number": str(record.get('№ Изображения', 'unknown')),
                            "image_title": str(record.get('Название изображения', 'unknown')),
                            "document_id": str(record.get('Обозначение документа', 'unknown')),
                            "section": str(record.get('Раздел документа', 'unknown')),
                        },
                    ))
            except Exception as e:
                log_message(f"❌ ОШИБКА файла {path}: {str(e)}")
                continue

        log_message(f"Загружено {len(docs)} документов изображений")
        log_message("=" * 60)
        return docs
    except Exception as e:
        log_message(f"❌ ОШИБКА загрузки изображений: {str(e)}")
        return []
# ============================================================================
# DOCUMENT PROCESSING WITH CHUNKING
# ============================================================================
def _chunk_preview(text):
    """First 200 chars of *text*, with an ellipsis only when actually truncated."""
    return text[:200] + "..." if len(text) > 200 else text


def _chunk_info_entry(doc, chunk_id, doc_type, **extra):
    """Build one chunk_info record with the fields shared by all document types."""
    entry = {
        'document_id': doc.metadata.get('document_id', 'unknown'),
        'section_id': doc.metadata.get('section_id', 'unknown'),
        'chunk_id': chunk_id,
        'chunk_size': len(doc.text),
        'chunk_preview': _chunk_preview(doc.text),
        'type': doc_type,
    }
    entry.update(extra)
    return entry


def process_documents_with_chunking(documents):
    """Process all documents and chunk oversized text/image documents.

    Tables arrive already chunked (or whole) from table_to_document and
    are passed through unchanged; text and image documents larger than
    CHUNK_SIZE are split with chunk_text_document.

    Fix over previous version: chunk_preview no longer appends "..." to
    texts that are 200 characters or shorter (the table branch already
    truncated conditionally; image/text branches did not).

    Returns (all_chunked_docs, chunk_info) where chunk_info is a list
    of per-chunk summary dicts used for logging/inspection.
    """
    all_chunked_docs = []
    chunk_info = []
    stats = {
        'text_chunks': 0,
        'table_whole': 0,
        'table_chunks': 0,
        'image_whole': 0,
        'image_chunks': 0
    }

    for doc in documents:
        doc_type = doc.metadata.get('type', 'text')
        doc_size = len(doc.text)

        # Tables - already chunked upstream, just record them
        if doc_type == 'table':
            if doc.metadata.get('is_chunked', False):
                stats['table_chunks'] += 1
            else:
                stats['table_whole'] += 1
            all_chunked_docs.append(doc)
            chunk_info.append(_chunk_info_entry(
                doc,
                doc.metadata.get('chunk_id', 0),
                'table',
                total_chunks=doc.metadata.get('total_chunks', 1),
                table_number=doc.metadata.get('table_number', 'unknown'),
            ))

        # Images - chunk if too large
        elif doc_type == 'image':
            if doc_size > CHUNK_SIZE:
                log_message(f"📷 CHUNKING: Изображение {doc.metadata.get('image_number')} | Размер: {doc_size}")
                chunked_docs = chunk_text_document(doc)
                stats['image_chunks'] += len(chunked_docs)
                all_chunked_docs.extend(chunked_docs)
                for i, chunk_doc in enumerate(chunked_docs):
                    chunk_info.append(_chunk_info_entry(
                        chunk_doc, i, 'image',
                        image_number=chunk_doc.metadata.get('image_number', 'unknown'),
                    ))
            else:
                stats['image_whole'] += 1
                all_chunked_docs.append(doc)
                chunk_info.append(_chunk_info_entry(
                    doc, 0, 'image',
                    image_number=doc.metadata.get('image_number', 'unknown'),
                ))

        # Text - chunk if too large
        else:
            if doc_size > CHUNK_SIZE:
                log_message(f"📝 CHUNKING: Текст '{doc.metadata.get('document_id')}' | Размер: {doc_size}")
                chunked_docs = chunk_text_document(doc)
                stats['text_chunks'] += len(chunked_docs)
                all_chunked_docs.extend(chunked_docs)
                for i, chunk_doc in enumerate(chunked_docs):
                    chunk_info.append(_chunk_info_entry(chunk_doc, i, 'text'))
            else:
                all_chunked_docs.append(doc)
                chunk_info.append(_chunk_info_entry(doc, 0, 'text'))

    # Log summary
    log_message(f"\n{'='*60}")
    log_message("ИТОГОВАЯ СТАТИСТИКА:")
    log_message(f" • Текстовые чанки: {stats['text_chunks']}")
    log_message(f" • Таблицы (целые): {stats['table_whole']}")
    log_message(f" • Таблицы (чанки): {stats['table_chunks']}")
    log_message(f" • Изображения (целые): {stats['image_whole']}")
    log_message(f" • Изображения (чанки): {stats['image_chunks']}")
    log_message(f" • ВСЕГО ДОКУМЕНТОВ: {len(all_chunked_docs)}")
    log_message(f"{'='*60}\n")
    return all_chunked_docs, chunk_info
# ============================================================================
# CSV CHUNKS (Legacy support)
# ============================================================================
def load_csv_chunks(repo_id, hf_token, chunks_filename, download_dir):
    """Legacy loader: read pre-chunked rows from a CSV in a HF dataset repo.

    Picks the first column whose name contains 'text', 'content' or
    'chunk' as the text column (falling back to the first column) and
    creates one Document per row. Returns (documents, dataframe), or
    ([], None) on failure.
    """
    log_message("Загрузка данных из CSV")
    try:
        csv_path = hf_hub_download(
            repo_id=repo_id,
            filename=chunks_filename,
            local_dir=download_dir,
            repo_type="dataset",
            token=hf_token
        )
        frame = pd.read_csv(csv_path)
        log_message(f"Загружено {len(frame)} чанков из CSV")

        # Locate the column holding the chunk text; default to the first one.
        text_column = next(
            (c for c in frame.columns
             if any(k in c.lower() for k in ('text', 'content', 'chunk'))),
            frame.columns[0],
        )

        docs = []
        for i, (_, record) in enumerate(frame.iterrows()):
            docs.append(Document(
                text=str(record[text_column]),
                metadata={
                    # Row index is the fallback id when the CSV lacks the column
                    "chunk_id": record.get('chunk_id', i),
                    "document_id": record.get('document_id', 'unknown'),
                    "type": "text"
                },
            ))
        log_message(f"Создано {len(docs)} документов из CSV")
        return docs, frame
    except Exception as e:
        log_message(f"❌ ОШИБКА загрузки CSV: {str(e)}")
        return [], None