# RAG_AIEXP_01 / documents_prep_1.py
# Author: MrSimple07 — commit f0cb4f3 ("new documents_prep")
import json
import zipfile
import pandas as pd
from huggingface_hub import hf_hub_download, list_repo_files
from llama_index.core import Document
from my_logging import log_message
from llama_index.core.text_splitter import SentenceSplitter
from config import CHUNK_SIZE, CHUNK_OVERLAP
from table_prep import table_to_document, load_table_data
def chunk_document(doc, chunk_size=None, chunk_overlap=None):
    """Split a single Document into sentence-aligned text chunks.

    Args:
        doc: llama_index Document whose ``text`` is to be split.
        chunk_size: Maximum characters per chunk; defaults to config.CHUNK_SIZE.
        chunk_overlap: Overlap between consecutive chunks; defaults to
            config.CHUNK_OVERLAP.

    Returns:
        List of new Document objects, one per chunk.  Each chunk copies the
        source metadata and adds ``chunk_id``, ``total_chunks``,
        ``chunk_size`` and ``original_doc_id``.
    """
    if chunk_size is None:
        chunk_size = CHUNK_SIZE
    if chunk_overlap is None:
        chunk_overlap = CHUNK_OVERLAP

    text_splitter = SentenceSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        separator=" ",
    )
    text_chunks = text_splitter.split_text(doc.text)

    chunked_docs = []
    for i, chunk_text in enumerate(text_chunks):
        chunk_metadata = doc.metadata.copy()
        chunk_metadata.update({
            "chunk_id": i,
            "total_chunks": len(text_chunks),
            "chunk_size": len(chunk_text),
            # getattr covers Document variants without an id_ attribute
            # (idiomatic replacement for the hasattr ternary).
            "original_doc_id": getattr(doc, "id_", None),
        })
        chunked_docs.append(Document(text=chunk_text, metadata=chunk_metadata))
    return chunked_docs
def process_documents_with_chunking(documents):
    """Prepare documents for indexing, chunking oversized ones by type.

    Tables pass through untouched (whether pre-chunked or whole); image and
    text documents larger than CHUNK_SIZE are split via chunk_document().

    Args:
        documents: Iterable of llama_index Document objects with a
            ``type`` metadata field ('table' / 'image' / default 'text').

    Returns:
        Tuple ``(processed_documents, chunk_info)`` where ``chunk_info`` is
        a list of per-chunk summary dicts (ids, sizes, 200-char previews).
    """

    def _preview(text):
        # First 200 characters, with an ellipsis marker for longer texts.
        return text[:200] + "..." if len(text) > 200 else text

    all_chunked_docs = []
    chunk_info = []
    table_count = 0          # tables passed through whole
    table_chunks_count = 0   # tables that arrived already chunked
    image_count = 0          # image documents seen
    images_split = 0         # image documents actually split (was missing: see stats fix below)
    image_chunks_count = 0   # chunks produced from split images
    text_chunks_count = 0    # chunks produced from split text documents

    for doc in documents:
        doc_type = doc.metadata.get('type', 'text')
        is_already_chunked = doc.metadata.get('is_chunked', False)

        if doc_type == 'table':
            # Tables are never re-chunked here; record them as-is.
            all_chunked_docs.append(doc)
            if is_already_chunked:
                table_chunks_count += 1
                chunk_info.append({
                    'document_id': doc.metadata.get('document_id', 'unknown'),
                    'section_id': doc.metadata.get('section_id', 'unknown'),
                    'chunk_id': doc.metadata.get('chunk_id', 0),
                    'total_chunks': doc.metadata.get('total_chunks', 1),
                    'chunk_size': len(doc.text),
                    'chunk_preview': _preview(doc.text),
                    'type': 'table',
                    'table_number': doc.metadata.get('table_number', 'unknown')
                })
            else:
                table_count += 1
                chunk_info.append({
                    'document_id': doc.metadata.get('document_id', 'unknown'),
                    'section_id': doc.metadata.get('section_id', 'unknown'),
                    'chunk_id': 0,
                    'chunk_size': len(doc.text),
                    'chunk_preview': _preview(doc.text),
                    'type': 'table',
                    'table_number': doc.metadata.get('table_number', 'unknown')
                })
        elif doc_type == 'image':
            image_count += 1
            doc_size = len(doc.text)
            if doc_size > CHUNK_SIZE:
                log_message(f"📷 CHUNKING: Изображение {doc.metadata.get('image_number', 'unknown')} | "
                            f"Размер: {doc_size} > {CHUNK_SIZE}")
                chunked_docs = chunk_document(doc)
                images_split += 1
                image_chunks_count += len(chunked_docs)
                all_chunked_docs.extend(chunked_docs)
                log_message(f" ✂️ Разделено на {len(chunked_docs)} чанков")
                for i, chunk_doc in enumerate(chunked_docs):
                    chunk_info.append({
                        'document_id': chunk_doc.metadata.get('document_id', 'unknown'),
                        'section_id': chunk_doc.metadata.get('section_id', 'unknown'),
                        'chunk_id': i,
                        'chunk_size': len(chunk_doc.text),
                        'chunk_preview': _preview(chunk_doc.text),
                        'type': 'image',
                        'image_number': chunk_doc.metadata.get('image_number', 'unknown')
                    })
            else:
                all_chunked_docs.append(doc)
                chunk_info.append({
                    'document_id': doc.metadata.get('document_id', 'unknown'),
                    'section_id': doc.metadata.get('section_id', 'unknown'),
                    'chunk_id': 0,
                    'chunk_size': doc_size,
                    'chunk_preview': _preview(doc.text),
                    'type': 'image',
                    'image_number': doc.metadata.get('image_number', 'unknown')
                })
        else:
            doc_size = len(doc.text)
            if doc_size > CHUNK_SIZE:
                log_message(f"📝 CHUNKING: Текст из '{doc.metadata.get('document_id', 'unknown')}' | "
                            f"Размер: {doc_size} > {CHUNK_SIZE}")
                chunked_docs = chunk_document(doc)
                text_chunks_count += len(chunked_docs)
                all_chunked_docs.extend(chunked_docs)
                log_message(f" ✂️ Разделен на {len(chunked_docs)} чанков")
                for i, chunk_doc in enumerate(chunked_docs):
                    chunk_info.append({
                        'document_id': chunk_doc.metadata.get('document_id', 'unknown'),
                        'section_id': chunk_doc.metadata.get('section_id', 'unknown'),
                        'chunk_id': i,
                        'chunk_size': len(chunk_doc.text),
                        'chunk_preview': _preview(chunk_doc.text),
                        'type': 'text'
                    })
            else:
                all_chunked_docs.append(doc)
                chunk_info.append({
                    'document_id': doc.metadata.get('document_id', 'unknown'),
                    'section_id': doc.metadata.get('section_id', 'unknown'),
                    'chunk_id': 0,
                    'chunk_size': doc_size,
                    'chunk_preview': _preview(doc.text),
                    'type': 'text'
                })

    log_message(f"\n{'='*60}")
    log_message(f"ИТОГО ОБРАБОТАНО ДОКУМЕНТОВ:")
    log_message(f" • Таблицы (целые): {table_count}")
    log_message(f" • Таблицы (чанки): {table_chunks_count}")
    # Bug fix: the original subtracted the boolean (image_chunks_count > 0),
    # which under-counted by at most one no matter how many images were
    # split.  Subtract the actual number of split images instead.
    log_message(f" • Изображения (целые): {image_count - images_split}")
    log_message(f" • Изображения (чанки): {image_chunks_count}")
    log_message(f" • Текстовые чанки: {text_chunks_count}")
    log_message(f" • Всего документов: {len(all_chunked_docs)}")
    log_message(f"{'='*60}\n")
    return all_chunked_docs, chunk_info
def extract_text_from_json(data, document_id, document_name):
    """Flatten a hierarchical document JSON into a list of Documents.

    The JSON nests up to four levels: ``sections`` -> ``subsections`` ->
    ``sub_subsections`` -> ``sub_sub_subsections``.  A node at each level
    carries ``<label>_id`` / ``<label>_text`` keys.  Nodes with non-blank
    text become Documents; children are walked regardless of whether their
    parent produced a Document.

    Args:
        data: Parsed JSON dict (expects an optional 'sections' list).
        document_id: Id stored in every Document's metadata.
        document_name: Name stored in every Document's metadata.

    Returns:
        List of Document objects with hierarchical metadata
        (section_path like "1.2.3", level label, parent references).
    """
    # Level labels in nesting order; the children of a node at depth d
    # live under the key f"{levels[d + 1]}s" (e.g. "subsections").
    levels = ("section", "subsection", "sub_subsection", "sub_sub_subsection")
    documents = []

    def _walk(node, depth, parent_path, parent_id, parent_title):
        label = levels[depth]
        node_id = node.get(f"{label}_id", 'Unknown')
        node_text = node.get(f"{label}_text", '')
        node_title = extract_section_title(node_text)
        # Top-level sections start the dotted path; deeper nodes extend it.
        node_path = f"{node_id}" if depth == 0 else f"{parent_path}.{node_id}"

        if node_text.strip():
            metadata = {
                "type": "text",
                "document_id": document_id,
                "document_name": document_name,
                "section_id": node_id,
                "section_text": node_title[:200],
                "section_path": node_path,
                "level": label,
            }
            if depth > 0:
                # Only nested levels carry parent references (top-level
                # sections had none in the original flat implementation).
                metadata["parent_section"] = parent_id
                metadata["parent_title"] = parent_title[:100]
            documents.append(Document(text=node_text, metadata=metadata))

        if depth + 1 < len(levels):
            # "or []" also tolerates an explicit null child list.
            for child in node.get(f"{levels[depth + 1]}s") or []:
                _walk(child, depth + 1, node_path, node_id, node_title)

    for section in data.get('sections') or []:
        _walk(section, 0, "", "", "")

    return documents
def load_json_documents(repo_id, hf_token, json_files_dir, download_dir):
    """Download JSON documents (ZIP archives and direct files) from a
    Hugging Face dataset repo, parse them and chunk the result.

    Args:
        repo_id: Dataset repository id on the Hugging Face Hub.
        hf_token: Access token for the repository.
        json_files_dir: Repo path prefix to scan for .zip / .json files.
        download_dir: Local directory for downloads.

    Returns:
        Tuple ``(chunked_documents, chunk_info)``; ``([], [])`` on a
        top-level failure.  Per-file failures are logged and skipped.
    """
    log_message("Начинаю загрузку JSON документов")
    try:
        repo_files = list_repo_files(repo_id=repo_id, repo_type="dataset", token=hf_token)
        in_dir = [name for name in repo_files if name.startswith(json_files_dir)]
        zip_entries = [name for name in in_dir if name.endswith('.zip')]
        json_entries = [name for name in in_dir if name.endswith('.json')]
        log_message(f"Найдено {len(zip_entries)} ZIP файлов и {len(json_entries)} прямых JSON файлов")

        collected = []

        for archive_name in zip_entries:
            try:
                log_message(f"Загружаю ZIP архив: {archive_name}")
                archive_path = hf_hub_download(
                    repo_id=repo_id,
                    filename=archive_name,
                    local_dir=download_dir,
                    repo_type="dataset",
                    token=hf_token,
                )
                extracted = extract_zip_and_process_json(archive_path)
                collected.extend(extracted)
                log_message(f"Извлечено {len(extracted)} документов из ZIP архива {archive_name}")
            except Exception as e:
                log_message(f"Ошибка обработки ZIP файла {archive_name}: {str(e)}")
                continue

        for json_name in json_entries:
            try:
                log_message(f"Обрабатываю прямой JSON файл: {json_name}")
                local_json = hf_hub_download(
                    repo_id=repo_id,
                    filename=json_name,
                    local_dir=download_dir,
                    repo_type="dataset",
                    token=hf_token,
                )
                with open(local_json, 'r', encoding='utf-8') as fh:
                    payload = json.load(fh)
                meta = payload.get('document_metadata', {})
                parsed = extract_text_from_json(
                    payload,
                    meta.get('document_id', 'unknown'),
                    meta.get('document_name', 'unknown'),
                )
                collected.extend(parsed)
                log_message(f"Извлечено {len(parsed)} документов из {json_name}")
            except Exception as e:
                log_message(f"Ошибка обработки файла {json_name}: {str(e)}")
                continue

        log_message(f"Всего создано {len(collected)} исходных документов из JSON файлов")

        # Split oversized documents into retrieval-sized chunks.
        chunked_documents, chunk_info = process_documents_with_chunking(collected)
        log_message(f"После chunking получено {len(chunked_documents)} чанков из JSON данных")
        return chunked_documents, chunk_info
    except Exception as e:
        log_message(f"Ошибка загрузки JSON документов: {str(e)}")
        return [], []
def extract_section_title(section_text):
    """Derive a short title from the opening line of a section's text.

    A first line shorter than 200 characters that does not end with a
    period is used verbatim.  Otherwise the text before the first period
    is taken as the title; if there is no period at all, a 100-character
    prefix with an ellipsis is returned.
    """
    stripped = section_text.strip()
    if not stripped:
        return ""

    head = stripped.split('\n')[0].strip()

    # Short, period-free lines are already title-like.
    if len(head) < 200 and not head.endswith('.'):
        return head

    # Otherwise use the "first sentence": everything before the first '.'.
    before, sep, _ = head.partition('.')
    if sep:
        return before.strip()

    if len(head) > 100:
        return head[:100] + "..."
    return head
def extract_zip_and_process_json(zip_path):
    """Open a downloaded ZIP archive and parse every JSON file inside.

    macOS resource-fork entries (``__MACOSX``) are skipped.  Failures on
    individual entries are logged and skipped; an unreadable archive
    yields an empty list.

    Args:
        zip_path: Local filesystem path to the .zip archive.

    Returns:
        List of Document objects collected from all parsable JSON entries.
    """
    documents = []
    try:
        with zipfile.ZipFile(zip_path, 'r') as archive:
            entries = [
                name for name in archive.namelist()
                if name.endswith('.json') and not name.startswith('__MACOSX')
            ]
            log_message(f"Найдено {len(entries)} JSON файлов в архиве")
            for entry in entries:
                try:
                    log_message(f"Обрабатываю файл из архива: {entry}")
                    with archive.open(entry) as handle:
                        payload = json.load(handle)
                    meta = payload.get('document_metadata', {})
                    parsed = extract_text_from_json(
                        payload,
                        meta.get('document_id', 'unknown'),
                        meta.get('document_name', 'unknown'),
                    )
                    documents.extend(parsed)
                    log_message(f"Извлечено {len(parsed)} документов из {entry}")
                except Exception as e:
                    log_message(f"Ошибка обработки файла {entry}: {str(e)}")
                    continue
    except Exception as e:
        log_message(f"Ошибка извлечения ZIP архива {zip_path}: {str(e)}")
    return documents
def load_image_data(repo_id, hf_token, image_data_dir, download_dir=''):
    """Download image-description CSV files from a Hugging Face dataset
    repo and convert every row into a searchable text Document.

    Args:
        repo_id: Dataset repository id on the Hugging Face Hub.
        hf_token: Access token for the repository.
        image_data_dir: Repo path prefix to scan for .csv files.
        download_dir: Local directory for downloads.  Defaults to '' to
            preserve the previous hard-coded behavior; new callers can
            pass a real directory for consistency with the other loaders.

    Returns:
        List of Document objects (empty list on failure).
    """
    log_message("Начинаю загрузку данных изображений")
    try:
        repo_files = list_repo_files(repo_id=repo_id, repo_type="dataset", token=hf_token)
        image_files = [
            name for name in repo_files
            if name.startswith(image_data_dir) and name.endswith('.csv')
        ]
        log_message(f"Найдено {len(image_files)} CSV файлов с изображениями")

        image_documents = []
        for file_path in image_files:
            try:
                log_message(f"Обрабатываю файл изображений: {file_path}")
                local_path = hf_hub_download(
                    repo_id=repo_id,
                    filename=file_path,
                    local_dir=download_dir,
                    repo_type="dataset",
                    token=hf_token,
                )
                df = pd.read_csv(local_path)
                log_message(f"Загружено {len(df)} записей изображений из файла {file_path}")

                # Column names follow the source CSV (Russian headers);
                # 'Описание изображение' is a typo present in the data
                # itself and must be kept as-is.
                for _, row in df.iterrows():
                    section_value = row.get('Раздел документа', 'Неизвестно')
                    content = (
                        f"Изображение: {row.get('№ Изображения', 'Неизвестно')}\n"
                        f"Название: {row.get('Название изображения', 'Неизвестно')}\n"
                        f"Описание: {row.get('Описание изображение', 'Неизвестно')}\n"
                        f"Документ: {row.get('Обозначение документа', 'Неизвестно')}\n"
                        f"Раздел: {section_value}\n"
                        f"Файл: {row.get('Файл изображения', 'Неизвестно')}\n"
                    )
                    image_documents.append(Document(
                        text=content,
                        metadata={
                            "type": "image",
                            "image_number": str(row.get('№ Изображения', 'unknown')),
                            "image_title": str(row.get('Название изображения', 'unknown')),
                            "image_description": str(row.get('Описание изображение', 'unknown')),
                            "document_id": str(row.get('Обозначение документа', 'unknown')),
                            "file_path": str(row.get('Файл изображения', 'unknown')),
                            "section": str(section_value),
                            "section_id": str(section_value),
                        },
                    ))
            except Exception as e:
                log_message(f"Ошибка обработки файла {file_path}: {str(e)}")
                continue

        log_message(f"Создано {len(image_documents)} документов из изображений")
        return image_documents
    except Exception as e:
        log_message(f"Ошибка загрузки данных изображений: {str(e)}")
        return []
def load_csv_chunks(repo_id, hf_token, chunks_filename, download_dir):
    """Download a pre-chunked CSV from a Hugging Face dataset repo and
    wrap each row in a Document.

    The text column is auto-detected: the first column whose name
    contains 'text', 'content' or 'chunk' (case-insensitive), falling
    back to the first column.

    Args:
        repo_id: Dataset repository id on the Hugging Face Hub.
        hf_token: Access token for the repository.
        chunks_filename: Path of the CSV file inside the repo.
        download_dir: Local directory for the download.

    Returns:
        Tuple ``(documents, dataframe)``; ``([], None)`` on failure.
    """
    log_message("Загружаю данные чанков из CSV")
    try:
        chunks_csv_path = hf_hub_download(
            repo_id=repo_id,
            filename=chunks_filename,
            local_dir=download_dir,
            repo_type="dataset",
            token=hf_token,
        )
        chunks_df = pd.read_csv(chunks_csv_path)
        log_message(f"Загружено {len(chunks_df)} чанков из CSV")

        text_column = next(
            (col for col in chunks_df.columns
             if any(tag in col.lower() for tag in ('text', 'content', 'chunk'))),
            chunks_df.columns[0],
        )
        log_message(f"Использую колонку: {text_column}")

        documents = []
        for position, (_, row) in enumerate(chunks_df.iterrows()):
            documents.append(Document(
                text=str(row[text_column]),
                metadata={
                    # Fall back to the row position when no chunk_id column.
                    "chunk_id": row.get('chunk_id', position),
                    "document_id": row.get('document_id', 'unknown'),
                    "type": "text",
                },
            ))
        log_message(f"Создано {len(documents)} текстовых документов из CSV")
        return documents, chunks_df
    except Exception as e:
        log_message(f"Ошибка загрузки CSV данных: {str(e)}")
        return [], None