# RAG_AIEXP_01 / documents_prep.py
# Author: MrSimple07
# Commit d3d0d1e: "fixing the json zip file reading"
import json
import pandas as pd
import os
import zipfile
from huggingface_hub import hf_hub_download, list_repo_files
from llama_index.core import Document
import logging
logger = logging.getLogger(__name__)
def log_message(message):
    """Log *message* at INFO level and also print it to stdout (flushed).

    The duplicate print keeps messages visible in environments where the
    logging config swallows INFO records (e.g. HF Spaces container logs).
    """
    logger.info(message)
    print(message, flush=True)
class DocumentsPreparation:
    """Download JSON / table / image data from a Hugging Face dataset repo
    and convert it into ``llama_index`` ``Document`` objects for RAG indexing.

    Three modalities are supported, each living in its own repo directory:
    structured text JSONs (optionally zipped), table JSONs (optionally
    zipped), and image-description CSVs.
    """

    # (level name, child-list key) for each depth of the section hierarchy.
    # Node ids/texts live under "<level>_id" / "<level>_text" keys.
    _SECTION_LEVELS = (
        ("section", "subsections"),
        ("subsection", "sub_subsections"),
        ("sub_subsection", "sub_sub_subsections"),
        ("sub_sub_subsection", None),  # deepest level, no children
    )

    def __init__(self, repo_id, hf_token):
        """
        Args:
            repo_id: Hugging Face dataset repository id.
            hf_token: access token used for all Hub requests.
        """
        self.repo_id = repo_id
        self.hf_token = hf_token
        self.json_files_dir = "JSON"                    # structured text documents
        self.table_data_dir = "Табличные данные_JSON"   # tabular data (Russian dir name)
        self.image_data_dir = "Изображения"             # image-description CSVs
        self.download_dir = "rag_files"                 # local download cache

    # ------------------------------------------------------------------ #
    # Hub helpers
    # ------------------------------------------------------------------ #
    def _download(self, filename):
        """Download one repo file into the local cache; return the local path."""
        return hf_hub_download(
            repo_id=self.repo_id,
            filename=filename,
            local_dir=self.download_dir,
            repo_type="dataset",
            token=self.hf_token,
        )

    def _list_files(self):
        """Return every file path in the dataset repository."""
        return list_repo_files(repo_id=self.repo_id, repo_type="dataset", token=self.hf_token)

    @staticmethod
    def _json_names_in_zip(zip_ref):
        """Names of real JSON members of an open zip (macOS metadata excluded)."""
        return [
            f for f in zip_ref.namelist()
            if f.endswith('.json') and not f.startswith('__MACOSX')
        ]

    # ------------------------------------------------------------------ #
    # Structured text documents
    # ------------------------------------------------------------------ #
    def _collect_section_documents(self, node, depth, parent_meta, documents):
        """Recursively turn one hierarchy node and its children into Documents.

        ``parent_meta`` carries the ancestor ids; children inherit them even
        when the current node has no text of its own (matches the original
        behavior of descending regardless of blank parent text).
        """
        level, child_key = self._SECTION_LEVELS[depth]
        meta = dict(parent_meta)
        meta[f"{level}_id"] = node.get(f"{level}_id", 'Unknown')
        text = node.get(f"{level}_text", '')
        if text.strip():
            documents.append(Document(text=text, metadata={**meta, "level": level}))
        if child_key is not None and child_key in node:
            for child in node[child_key]:
                self._collect_section_documents(child, depth + 1, meta, documents)

    def extract_text_from_json(self, data, document_id, document_name):
        """Flatten one structured document JSON into a list of Documents.

        Walks sections → subsections → sub_subsections → sub_sub_subsections;
        every node with non-blank text becomes one Document whose metadata
        records all ancestor ids plus its hierarchy ``level``.
        """
        documents = []
        base_meta = {
            "type": "text",
            "document_id": document_id,
            "document_name": document_name,
        }
        if isinstance(data, dict):
            for section in data.get('sections', []):
                self._collect_section_documents(section, 0, base_meta, documents)
        return documents

    def _json_to_documents(self, json_data):
        """Convert one parsed document JSON (with document_metadata) into Documents."""
        document_metadata = json_data.get('document_metadata', {})
        return self.extract_text_from_json(
            json_data,
            document_metadata.get('document_id', 'unknown'),
            document_metadata.get('document_name', 'unknown'),
        )

    def extract_zip_and_process_json(self, zip_path):
        """Extract ZIP file and process JSON files inside."""
        documents = []
        try:
            with zipfile.ZipFile(zip_path, 'r') as zip_ref:
                json_files = self._json_names_in_zip(zip_ref)
                log_message(f"Найдено {len(json_files)} JSON файлов в архиве")
                for json_file in json_files:
                    try:
                        log_message(f"Обрабатываю файл из архива: {json_file}")
                        with zip_ref.open(json_file) as f:
                            json_data = json.load(f)
                        docs = self._json_to_documents(json_data)
                        documents.extend(docs)
                        log_message(f"Извлечено {len(docs)} документов из {json_file}")
                    except Exception as e:
                        # One bad archive member must not abort the rest.
                        log_message(f"Ошибка обработки файла {json_file}: {str(e)}")
                        continue
        except Exception as e:
            log_message(f"Ошибка извлечения ZIP архива {zip_path}: {str(e)}")
        return documents

    def load_json_documents(self):
        """Download and parse every text-document JSON (zipped or direct)."""
        log_message("Начинаю загрузку JSON документов")
        try:
            files = self._list_files()
            zip_files = [f for f in files if f.startswith(self.json_files_dir) and f.endswith('.zip')]
            json_files = [f for f in files if f.startswith(self.json_files_dir) and f.endswith('.json')]
            log_message(f"Найдено {len(zip_files)} ZIP файлов и {len(json_files)} прямых JSON файлов")
            all_documents = []
            # ZIP archives first, then any loose JSON files as a fallback.
            for zip_file_path in zip_files:
                try:
                    log_message(f"Загружаю ZIP архив: {zip_file_path}")
                    all_documents.extend(
                        self.extract_zip_and_process_json(self._download(zip_file_path))
                    )
                except Exception as e:
                    log_message(f"Ошибка обработки ZIP файла {zip_file_path}: {str(e)}")
                    continue
            for file_path in json_files:
                try:
                    log_message(f"Обрабатываю прямой JSON файл: {file_path}")
                    local_path = self._download(file_path)
                    with open(local_path, 'r', encoding='utf-8') as f:
                        json_data = json.load(f)
                    documents = self._json_to_documents(json_data)
                    all_documents.extend(documents)
                    log_message(f"Извлечено {len(documents)} документов из {file_path}")
                except Exception as e:
                    log_message(f"Ошибка обработки файла {file_path}: {str(e)}")
                    continue
            log_message(f"Всего создано {len(all_documents)} текстовых документов")
            return all_documents
        except Exception as e:
            log_message(f"Ошибка загрузки JSON документов: {str(e)}")
            return []

    # ------------------------------------------------------------------ #
    # Tables
    # ------------------------------------------------------------------ #
    def table_to_document(self, table_data, document_id=None):
        """Render one table JSON as a plain-text Document.

        Fix: the original crashed on non-dict input (``.get`` on a non-dict
        and an UnboundLocalError on ``doc_id`` in the metadata build) — such
        payloads are reachable via the list branch of the callers. Non-dict
        values are now stringified into a minimal table Document instead.
        """
        if not isinstance(table_data, dict):
            return Document(
                text=str(table_data),
                metadata={
                    "type": "table",
                    "table_number": "unknown",
                    "table_title": "unknown",
                    "document_id": document_id or "unknown",
                    "section": "unknown",
                },
            )
        # Explicit document_id wins; otherwise fall back to the payload keys.
        doc_id = document_id or table_data.get('document_id', table_data.get('document', 'Неизвестно'))
        lines = [
            f"Таблица: {table_data.get('table_number', 'Неизвестно')}",
            f"Название: {table_data.get('table_title', 'Неизвестно')}",
            f"Документ: {doc_id}",
            f"Раздел: {table_data.get('section', 'Неизвестно')}",
        ]
        rows = table_data.get('data')
        if isinstance(rows, list):
            for row in rows:
                if isinstance(row, dict):
                    lines.append(" | ".join(f"{k}: {v}" for k, v in row.items()))
        return Document(
            text="\n".join(lines) + "\n",
            metadata={
                "type": "table",
                "table_number": table_data.get('table_number', 'unknown'),
                "table_title": table_data.get('table_title', 'unknown'),
                "document_id": doc_id or table_data.get('document_id', table_data.get('document', 'unknown')),
                "section": table_data.get('section', 'unknown'),
            },
        )

    def _table_json_to_documents(self, table_data):
        """Turn one parsed table JSON (dict, dict-with-sheets, or list) into Documents.

        Shared by the ZIP and direct-file paths, which previously duplicated
        this dispatch verbatim.
        """
        documents = []
        if isinstance(table_data, dict):
            document_id = table_data.get('document', 'unknown')
            if 'sheets' in table_data:
                for sheet in table_data['sheets']:
                    sheet['document'] = document_id  # propagate owning document id
                    documents.append(self.table_to_document(sheet, document_id))
            else:
                documents.append(self.table_to_document(table_data, document_id))
        elif isinstance(table_data, list):
            for table_json in table_data:
                documents.append(self.table_to_document(table_json))
        return documents

    def extract_zip_and_process_tables(self, zip_path):
        """Extract ZIP file and process table JSON files inside."""
        documents = []
        try:
            with zipfile.ZipFile(zip_path, 'r') as zip_ref:
                json_files = self._json_names_in_zip(zip_ref)
                log_message(f"Найдено {len(json_files)} JSON файлов таблиц в архиве")
                for json_file in json_files:
                    try:
                        log_message(f"Обрабатываю файл таблицы из архива: {json_file}")
                        with zip_ref.open(json_file) as f:
                            table_data = json.load(f)
                        documents.extend(self._table_json_to_documents(table_data))
                    except Exception as e:
                        log_message(f"Ошибка обработки файла таблицы {json_file}: {str(e)}")
                        continue
        except Exception as e:
            log_message(f"Ошибка извлечения ZIP архива таблиц {zip_path}: {str(e)}")
        return documents

    def load_table_documents(self):
        """Download and parse every tabular-data JSON (zipped or direct)."""
        log_message("Начинаю загрузку табличных данных")
        try:
            files = self._list_files()
            zip_files = [f for f in files if f.startswith(self.table_data_dir) and f.endswith('.zip')]
            table_files = [f for f in files if f.startswith(self.table_data_dir) and f.endswith('.json')]
            log_message(f"Найдено {len(zip_files)} ZIP файлов с таблицами и {len(table_files)} прямых JSON файлов")
            table_documents = []
            for zip_file_path in zip_files:
                try:
                    log_message(f"Загружаю ZIP архив таблиц: {zip_file_path}")
                    table_documents.extend(
                        self.extract_zip_and_process_tables(self._download(zip_file_path))
                    )
                except Exception as e:
                    log_message(f"Ошибка обработки ZIP файла таблиц {zip_file_path}: {str(e)}")
                    continue
            for file_path in table_files:
                try:
                    log_message(f"Обрабатываю прямой файл таблицы: {file_path}")
                    local_path = self._download(file_path)
                    with open(local_path, 'r', encoding='utf-8') as f:
                        table_data = json.load(f)
                    table_documents.extend(self._table_json_to_documents(table_data))
                except Exception as e:
                    log_message(f"Ошибка обработки файла {file_path}: {str(e)}")
                    continue
            log_message(f"Создано {len(table_documents)} документов из таблиц")
            return table_documents
        except Exception as e:
            log_message(f"Ошибка загрузки табличных данных: {str(e)}")
            return []

    # ------------------------------------------------------------------ #
    # Images
    # ------------------------------------------------------------------ #
    def load_image_documents(self):
        """Download image-description CSVs and turn every row into a Document.

        CSV column names (including the 'Описание изображение' spelling) are
        runtime keys of the source data and must not be changed.
        """
        log_message("Начинаю загрузку данных изображений")
        try:
            files = self._list_files()
            image_files = [f for f in files if f.startswith(self.image_data_dir) and f.endswith('.csv')]
            log_message(f"Найдено {len(image_files)} CSV файлов с изображениями")
            image_documents = []
            for file_path in image_files:
                try:
                    log_message(f"Обрабатываю файл изображений: {file_path}")
                    local_path = self._download(file_path)
                    df = pd.read_csv(local_path)
                    log_message(f"Загружено {len(df)} записей изображений из файла {file_path}")
                    for _, row in df.iterrows():
                        content = (
                            f"Изображение: {row.get('№ Изображения', 'Неизвестно')}\n"
                            f"Название: {row.get('Название изображения', 'Неизвестно')}\n"
                            f"Описание: {row.get('Описание изображение', 'Неизвестно')}\n"
                            f"Документ: {row.get('Обозначение документа', 'Неизвестно')}\n"
                            f"Раздел: {row.get('Раздел документа', 'Неизвестно')}\n"
                            f"Файл: {row.get('Файл изображения', 'Неизвестно')}\n"
                        )
                        image_documents.append(Document(
                            text=content,
                            metadata={
                                "type": "image",
                                "image_number": row.get('№ Изображения', 'unknown'),
                                "document_id": row.get('Обозначение документа', 'unknown'),
                                "file_path": row.get('Файл изображения', 'unknown'),
                                "section": row.get('Раздел документа', 'unknown'),
                            },
                        ))
                except Exception as e:
                    log_message(f"Ошибка обработки файла {file_path}: {str(e)}")
                    continue
            log_message(f"Создано {len(image_documents)} документов из изображений")
            return image_documents
        except Exception as e:
            log_message(f"Ошибка загрузки данных изображений: {str(e)}")
            return []

    # ------------------------------------------------------------------ #
    # Entry point
    # ------------------------------------------------------------------ #
    def prepare_all_documents(self):
        """Load every modality (text, tables, images) and return one combined list."""
        log_message("Подготовка всех документов")
        all_documents = []
        all_documents.extend(self.load_json_documents())
        all_documents.extend(self.load_table_documents())
        all_documents.extend(self.load_image_documents())
        log_message(f"Всего подготовлено {len(all_documents)} документов")
        return all_documents