# RAG_AIEXP_01 / documents_prep.py
# NOTE: cleaned Hugging Face web-UI residue from the file header
# ("MrSimple07's picture / new documents_prep / raw / history blame / 14.6 kB",
# commit f0cb4f3) — that text was not valid Python.
import json
import zipfile
import pandas as pd
from huggingface_hub import hf_hub_download, list_repo_files
from llama_index.core import Document
from llama_index.core.text_splitter import SentenceSplitter
from my_logging import log_message
from config import CHUNK_SIZE, CHUNK_OVERLAP
import os
def load_json_documents(repo_id, hf_token, json_files_dir, download_dir):
    """Download ZIP archives of section JSON files from a HF dataset repo
    and convert every section into a Document.

    Parameters:
        repo_id: HuggingFace dataset repository id.
        hf_token: auth token for the repository.
        json_files_dir: repo path prefix that holds the ZIP archives.
        download_dir: local directory for downloaded archives.

    Returns:
        (documents, chunk_info) — parallel lists; both empty when the repo
        listing or a download fails at the top level. Per-file failures
        inside an archive are logged and skipped.
    """
    log_message(f"Загрузка JSON документов из {json_files_dir}")
    documents, chunk_info = [], []
    try:
        archive_names = [
            name for name in list_repo_files(repo_id, token=hf_token)
            if name.startswith(json_files_dir) and name.endswith('.zip')
        ]
        log_message(f"Найдено {len(archive_names)} ZIP файлов")
        for archive_name in archive_names:
            local_zip = hf_hub_download(
                repo_id=repo_id,
                filename=archive_name,
                token=hf_token,
                repo_type="dataset",
                local_dir=download_dir,
            )
            log_message(f"Обрабатываю архив: {archive_name}")
            with zipfile.ZipFile(local_zip) as archive:
                # Skip macOS resource-fork entries packed into the ZIP.
                members = [
                    name for name in archive.namelist()
                    if name.endswith('.json') and not name.startswith('__MACOSX')
                ]
                log_message(f"Найдено {len(members)} JSON файлов в архиве")
                for member in members:
                    try:
                        with archive.open(member) as handle:
                            payload = json.load(handle)
                        doc_id = payload.get('document_id', os.path.basename(member))
                        sections = payload.get('sections', [])
                        log_message(f"Обработка документа {doc_id}: {len(sections)} разделов")
                        for section in sections:
                            doc, info = process_text_section(section, doc_id)
                            if doc:
                                documents.append(doc)
                                chunk_info.append(info)
                    except Exception as e:
                        log_message(f"Ошибка при обработке {member}: {str(e)}")
        log_message(f"Загружено {len(documents)} текстовых документов")
        return documents, chunk_info
    except Exception as e:
        log_message(f"Ошибка загрузки JSON: {str(e)}")
        return [], []
def process_text_section(section, doc_id):
    """Convert one section dict from a document JSON into a Document.

    Parameters:
        section: dict with optional keys 'section_id', 'section_path',
            'section_text', 'section_content', 'parent_section',
            'parent_title', 'level'.
        doc_id: identifier of the parent document.

    Returns:
        (Document, chunk_info_dict), or (None, None) when the section has
        no text content at all.
    """
    section_text = section.get('section_text', '')
    section_content = section.get('section_content', '')
    full_text = f"{section_text}\n{section_content}".strip()
    if not full_text:
        return None, None
    # The Document metadata and the returned chunk_info were previously two
    # hand-written identical dicts — build the dict once and return a copy
    # so the two can never silently drift apart.
    metadata = {
        'document_id': doc_id,
        'section_id': section.get('section_id', 'unknown'),
        'section_path': section.get('section_path', ''),
        'section_text': section_text,
        'parent_section': section.get('parent_section', ''),
        'parent_title': section.get('parent_title', ''),
        'level': section.get('level', 'section'),
        'type': 'text',
        'chunk_text': full_text
    }
    doc = Document(
        text=full_text,
        metadata=metadata
    )
    return doc, dict(metadata)
def load_table_data(repo_id, hf_token, table_data_dir):
    """Download per-table JSON files from the repo and build Documents.

    Parameters:
        repo_id: HuggingFace dataset repository id.
        hf_token: auth token for the repository.
        table_data_dir: repo path prefix holding the table JSON files.

    Returns:
        A flat list of table Documents (large tables contribute several
        chunk Documents). Empty list when the repo listing fails;
        individual file failures are logged and skipped.
    """
    log_message(f"Загрузка табличных данных из {table_data_dir}")
    documents = []
    try:
        files = list_repo_files(repo_id, token=hf_token)
        json_files = [f for f in files if f.startswith(table_data_dir) and f.endswith('.json')]
        log_message(f"Найдено {len(json_files)} табличных JSON файлов")
        for json_file in json_files:
            try:
                file_path = hf_hub_download(
                    repo_id=repo_id,
                    filename=json_file,
                    token=hf_token,
                    repo_type="dataset"
                )
                with open(file_path, 'r', encoding='utf-8') as f:
                    table_data = json.load(f)
                doc = create_table_document(table_data)
                # BUG FIX: create_table_document returns a *list* of chunk
                # Documents for large tables; the original append() nested
                # that list inside `documents`, and downstream code that
                # reads .metadata then crashed on the list. Flatten here.
                if isinstance(doc, list):
                    documents.extend(doc)
                elif doc:
                    documents.append(doc)
            except Exception as e:
                log_message(f"Ошибка при обработке таблицы {json_file}: {str(e)}")
        log_message(f"Загружено {len(documents)} табличных документов")
        return documents
    except Exception as e:
        log_message(f"Ошибка загрузки таблиц: {str(e)}")
        return []
def create_table_document(table_data):
    """Build a Document (or a list of chunked Documents) from a table JSON.

    Tables whose estimated token count is under 2000 become a single
    Document; larger tables are delegated to create_chunked_table_document,
    which may return a list. Returns None when the table has no data rows.
    """
    data = table_data.get('data', [])
    if not data:
        return None

    doc_id = table_data.get('document_id', 'unknown')
    table_number = table_data.get('table_number', 'unknown')
    table_title = table_data.get('table_title', '')
    section = table_data.get('section', '')
    headers = table_data.get('headers', [])

    # Size gate: estimate tokens over the whole raw record.
    if estimate_tokens(str(table_data)) >= 2000:
        return create_chunked_table_document(
            doc_id, table_number, table_title, section, headers, data
        )

    body = format_table_as_text(table_number, table_title, section, headers, data)
    return Document(
        text=body,
        metadata={
            'document_id': doc_id,
            'table_number': table_number,
            'table_title': table_title,
            'section': section,
            'type': 'table',
            'headers': str(headers),
            'row_count': len(data),
        },
    )
def create_chunked_table_document(doc_id, table_number, table_title, section, headers, data, rows_per_chunk=30):
    """Split a large table into windows of `rows_per_chunk` rows.

    Each window becomes its own Document annotated with chunk position
    (chunk_index, row_start, row_end). Returns the lone Document when a
    single chunk covers the table, otherwise the full list of chunks.
    """
    chunks = []
    for start in range(0, len(data), rows_per_chunk):
        window = data[start:start + rows_per_chunk]
        end = start + len(window)
        body = format_table_as_text(
            table_number,
            table_title,
            section,
            headers,
            window,
            chunk_info=f"строки {start+1}-{end}",
        )
        chunks.append(Document(
            text=body,
            metadata={
                'document_id': doc_id,
                'table_number': table_number,
                'table_title': table_title,
                'section': section,
                'type': 'table',
                'headers': str(headers),
                'chunk_index': start // rows_per_chunk,
                'row_start': start,
                'row_end': end,
                'row_count': len(window),
            },
        ))
    if len(chunks) == 1:
        return chunks[0]
    return chunks
def format_table_as_text(table_number, table_title, section, headers, data, chunk_info=""):
    """Render a table as plain text suitable for embedding.

    Parameters:
        table_number: table identifier shown on the first line.
        table_title: optional human-readable title.
        section: optional section name.
        headers: column headers; each is coerced to str before joining.
        data: sequence of rows (each a sequence of cells).
        chunk_info: optional note appended in parentheses (e.g. row range).

    Returns:
        A newline-joined string. Only the first 100 rows are rendered,
        keeping any single Document's text bounded.
    """
    text_parts = [f"Таблица {table_number}"]
    if table_title:
        text_parts.append(f"Название: {table_title}")
    if section:
        text_parts.append(f"Раздел: {section}")
    if chunk_info:
        text_parts.append(f"({chunk_info})")
    # Robustness fix: headers parsed from JSON may contain numbers;
    # str.join() raises TypeError on non-str items, so coerce each one.
    text_parts.append(f"\nЗаголовки: {', '.join(str(h) for h in headers)}")
    text_parts.append("\nДанные:")
    for row in data[:100]:
        text_parts.append(" | ".join(str(cell) for cell in row))
    return "\n".join(text_parts)
def load_image_data(repo_id, hf_token, image_data_dir):
    """Download image-description JSON files and build one Document each.

    Parameters:
        repo_id: HuggingFace dataset repository id.
        hf_token: auth token for the repository.
        image_data_dir: repo path prefix holding the image JSON files.

    Returns:
        A list of image Documents; an empty list if the repo listing
        fails. Individual file failures are logged and skipped.
    """
    log_message(f"Загрузка данных изображений из {image_data_dir}")
    collected = []
    try:
        all_files = list_repo_files(repo_id, token=hf_token)
        json_names = [
            name for name in all_files
            if name.startswith(image_data_dir) and name.endswith('.json')
        ]
        log_message(f"Найдено {len(json_names)} JSON файлов изображений")
        for json_file in json_names:
            try:
                local_path = hf_hub_download(
                    repo_id=repo_id,
                    filename=json_file,
                    token=hf_token,
                    repo_type="dataset"
                )
                with open(local_path, 'r', encoding='utf-8') as fh:
                    payload = json.load(fh)
                document = create_image_document(payload)
                if document:
                    collected.append(document)
            except Exception as e:
                log_message(f"Ошибка при обработке изображения {json_file}: {str(e)}")
        log_message(f"Загружено {len(collected)} документов изображений")
        return collected
    except Exception as e:
        log_message(f"Ошибка загрузки изображений: {str(e)}")
        return []
def create_image_document(image_data):
    """Build a Document describing one figure from its JSON record.

    The text carries the figure number plus whichever of title, section
    and description are present; the description appears only in the
    text, not in the metadata.
    """
    image_number = image_data.get('image_number', 'unknown')
    image_title = image_data.get('image_title', '')
    section = image_data.get('section', '')
    description = image_data.get('image_description', '')

    lines = [f"Рисунок {image_number}"]
    if image_title:
        lines.append(f"Название: {image_title}")
    if section:
        lines.append(f"Раздел: {section}")
    if description:
        lines.append(f"Описание: {description}")

    return Document(
        text="\n".join(lines),
        metadata={
            'document_id': image_data.get('document_id', 'unknown'),
            'image_number': image_number,
            'image_title': image_title,
            'section': section,
            'type': 'image',
        },
    )
def load_csv_chunks(repo_id, hf_token, chunks_filename, download_dir):
    """Download a CSV of pre-made chunks and convert its rows to Documents.

    Parameters:
        repo_id: HuggingFace dataset repository id.
        hf_token: auth token for the repository.
        chunks_filename: path of the CSV inside the repo.
        download_dir: local directory for the downloaded file.

    Returns:
        (documents, df) on success, ([], None) on any failure.
    """
    log_message(f"Загрузка CSV чанков из {chunks_filename}")
    try:
        csv_path = hf_hub_download(
            repo_id=repo_id,
            filename=chunks_filename,
            token=hf_token,
            repo_type="dataset",
            local_dir=download_dir
        )
        df = pd.read_csv(csv_path)
        log_message(f"Загружено {len(df)} строк из CSV")
        documents = []
        for _, row in df.iterrows():
            text = row.get('chunk_text', '')
            # BUG FIX: pandas returns float NaN for empty CSV cells, and
            # NaN is truthy — the original `if text:` let NaN through and
            # built a Document with a non-string text. Require a real,
            # non-empty string.
            if not isinstance(text, str) or not text:
                continue
            metadata = {
                'document_id': row.get('document_id', 'unknown'),
                'section_id': row.get('section_id', 'unknown'),
                'section_path': row.get('section_path', ''),
                'type': 'text'
            }
            documents.append(Document(text=text, metadata=metadata))
        log_message(f"Создано {len(documents)} документов из CSV")
        return documents, df
    except Exception as e:
        log_message(f"Ошибка загрузки CSV: {str(e)}")
        return [], None
def process_documents_with_chunking(documents):
    """Split oversized text Documents into chunks; pass tables and images
    through untouched.

    Accepts output of the load_* functions, where a table entry may be
    either a single Document or a list of pre-chunked Documents (see
    create_chunked_table_document).

    Returns:
        (chunked_documents, chunk_info): a flat list of Documents and a
        parallel list of summary dicts built by create_chunk_info.
    """
    log_message(f"Чанкинг {len(documents)} документов")
    text_splitter = SentenceSplitter(
        chunk_size=CHUNK_SIZE,
        chunk_overlap=CHUNK_OVERLAP,
        separator=" ",
        backup_separators=["\n", ".", "!", "?"]
    )
    chunked_documents = []
    chunk_info = []
    for doc in documents:
        # BUG FIX: a chunked table arrives as a *list* of Documents; the
        # original code read doc.metadata before its isinstance check, so
        # that branch was unreachable and lists raised AttributeError.
        if isinstance(doc, list):
            chunked_documents.extend(doc)
            for d in doc:
                chunk_info.append(create_chunk_info(d))
            continue
        doc_type = doc.metadata.get('type', 'text')
        if doc_type in ('table', 'image'):
            # Tables/images are already sized appropriately upstream.
            chunked_documents.append(doc)
            chunk_info.append(create_chunk_info(doc))
        else:
            token_count = estimate_tokens(doc.text)
            if token_count <= CHUNK_SIZE:
                chunked_documents.append(doc)
                chunk_info.append(create_chunk_info(doc))
            else:
                nodes = text_splitter.get_nodes_from_documents([doc])
                for node in nodes:
                    # Copy the metadata so sibling chunks don't share
                    # (and can't mutate) one dict.
                    new_doc = Document(
                        text=node.text,
                        metadata=dict(doc.metadata)
                    )
                    chunked_documents.append(new_doc)
                    chunk_info.append(create_chunk_info(new_doc))
    log_message(f"Получено {len(chunked_documents)} чанков после обработки")
    return chunked_documents, chunk_info
def create_chunk_info(doc):
    """Summarize a Document into a plain dict for chunk bookkeeping.

    Every summary carries the document id, the chunk type and the first
    500 characters of the text; type-specific fields are added for
    'table' and 'image' chunks, and section fields for everything else.
    """
    md = doc.metadata
    summary = {
        'document_id': md.get('document_id', 'unknown'),
        'type': md.get('type', 'text'),
        'chunk_text': doc.text[:500],
    }
    kind = md.get('type')
    if kind == 'table':
        extra = {
            'table_number': md.get('table_number', 'unknown'),
            'table_title': md.get('table_title', ''),
            'section': md.get('section', ''),
        }
    elif kind == 'image':
        extra = {
            'image_number': md.get('image_number', 'unknown'),
            'image_title': md.get('image_title', ''),
            'section': md.get('section', ''),
        }
    else:
        extra = {
            'section_id': md.get('section_id', 'unknown'),
            'section_path': md.get('section_path', ''),
            'section_text': md.get('section_text', ''),
            'parent_section': md.get('parent_section', ''),
            'parent_title': md.get('parent_title', ''),
            'level': md.get('level', 'section'),
        }
    summary.update(extra)
    return summary
def estimate_tokens(text):
    """Rough token estimate: whitespace-separated word count times 1.3."""
    word_count = len(text.split())
    return word_count * 1.3