"""Build llama_index ``Document`` objects from a Hugging Face dataset repo.

Three kinds of source files are read from the repository: hierarchical JSON
texts, JSON tables, and CSV files describing images.
"""

import json
import logging
import os
import zipfile

import pandas as pd
from huggingface_hub import hf_hub_download, list_repo_files
from llama_index.core import Document

logger = logging.getLogger(__name__)


def log_message(message):
    """Log *message* at INFO level and echo it to stdout with a flush."""
    logger.info(message)
    print(message, flush=True)


class DocumentsPreparation:
    """Download dataset files and convert them into ``Document`` objects.

    Files are listed and downloaded with ``repo_type="dataset"`` using the
    given token. Files are selected by path prefix: ``json_files_dir`` for
    text JSON (or ZIP archives of it), ``table_data_dir`` for table JSON (or
    ZIP archives of it) and ``image_data_dir`` for image CSV files. Downloads
    are stored under ``download_dir``.
    """

    # Hierarchy levels of a text JSON, outermost first. For a level ``X`` the
    # node keys are ``X_id`` and ``X_text``; its children live under the key
    # ``<next level>s`` (e.g. ``subsections``).
    _TEXT_LEVELS = (
        "section",
        "subsection",
        "sub_subsection",
        "sub_sub_subsection",
    )

    def __init__(self, repo_id, hf_token):
        """Store the repository id, the access token and the directory names."""
        self.repo_id = repo_id
        self.hf_token = hf_token
        self.json_files_dir = "JSON"
        self.table_data_dir = "Табличные данные_JSON"
        self.image_data_dir = "Изображения"
        self.download_dir = "rag_files"

    # ------------------------------------------------------------------
    # Shared helpers
    # ------------------------------------------------------------------

    def _list_repo_files(self):
        """Return the paths of all files in the dataset repository."""
        return list_repo_files(
            repo_id=self.repo_id,
            repo_type="dataset",
            token=self.hf_token,
        )

    def _download(self, filename):
        """Download *filename* from the repository and return its local path."""
        return hf_hub_download(
            repo_id=self.repo_id,
            filename=filename,
            local_dir=self.download_dir,
            repo_type="dataset",
            token=self.hf_token,
        )

    @staticmethod
    def _select(files, directory, suffix):
        """Return the paths that start with *directory* and end with *suffix*."""
        return [
            path for path in files
            if path.startswith(directory) and path.endswith(suffix)
        ]

    @staticmethod
    def _zip_json_members(zip_ref):
        """Return the ``.json`` members of an open ZIP, skipping ``__MACOSX``."""
        return [
            name for name in zip_ref.namelist()
            if name.endswith('.json') and not name.startswith('__MACOSX')
        ]

    @staticmethod
    def _read_json_file(path):
        """Parse the JSON file at *path*.

        ``utf-8-sig`` is used so that a file starting with a UTF-8 BOM parses
        the same way it does when read as bytes from a ZIP archive.
        """
        with open(path, 'r', encoding='utf-8-sig') as handle:
            return json.load(handle)

    # ------------------------------------------------------------------
    # Text JSON
    # ------------------------------------------------------------------

    def extract_text_from_json(self, data, document_id, document_name):
        """Turn the nested sections of a parsed text JSON into documents.

        Walks ``data['sections']`` and, below each section, ``subsections``,
        ``sub_subsections`` and ``sub_sub_subsections``. Every node whose text
        is a non-blank string yields one ``Document`` whose metadata holds the
        ids of the node and all of its ancestors plus the node's ``level``.
        Documents are produced parent first, then the parent's children.

        Args:
            data: Parsed JSON object. Anything that is not a dict yields no
                documents.
            document_id: Value stored as ``document_id`` in every metadata.
            document_name: Value stored as ``document_name`` in every metadata.

        Returns:
            A list of ``Document`` objects (possibly empty).
        """
        documents = []
        if not isinstance(data, dict):
            return documents
        base_metadata = {
            "type": "text",
            "document_id": document_id,
            "document_name": document_name,
        }
        self._collect_text_nodes(
            data.get('sections'), 0, {}, base_metadata, documents
        )
        return documents

    def _collect_text_nodes(self, nodes, depth, ancestor_ids, base_metadata,
                            documents):
        """Append documents for *nodes* at hierarchy *depth* and recurse."""
        if not isinstance(nodes, list):
            return
        level = self._TEXT_LEVELS[depth]
        for node in nodes:
            if not isinstance(node, dict):
                continue
            node_ids = dict(ancestor_ids)
            node_ids[f"{level}_id"] = node.get(f"{level}_id", 'Unknown')

            text = node.get(f"{level}_text", '')
            # A null / non-string text is treated like an empty one instead
            # of raising and aborting the whole file.
            if isinstance(text, str) and text.strip():
                documents.append(Document(
                    text=text,
                    metadata={**base_metadata, **node_ids, "level": level},
                ))

            if depth + 1 < len(self._TEXT_LEVELS):
                children_key = f"{self._TEXT_LEVELS[depth + 1]}s"
                self._collect_text_nodes(
                    node.get(children_key), depth + 1, node_ids,
                    base_metadata, documents,
                )

    def _documents_from_json_data(self, json_data):
        """Read ``document_metadata`` from *json_data* and extract its texts."""
        document_metadata = json_data.get('document_metadata') or {}
        document_id = document_metadata.get('document_id', 'unknown')
        document_name = document_metadata.get('document_name', 'unknown')
        return self.extract_text_from_json(json_data, document_id, document_name)

    def extract_zip_and_process_json(self, zip_path):
        """Read every text JSON inside the ZIP at *zip_path*.

        A failure in one member is logged and the member is skipped; a failure
        opening the archive is logged and whatever was collected is returned.

        Returns:
            A list of ``Document`` objects (possibly empty).
        """
        documents = []
        try:
            with zipfile.ZipFile(zip_path, 'r') as zip_ref:
                json_files = self._zip_json_members(zip_ref)
                log_message(f"Найдено {len(json_files)} JSON файлов в архиве")

                for json_file in json_files:
                    try:
                        log_message(f"Обрабатываю файл из архива: {json_file}")
                        with zip_ref.open(json_file) as member:
                            json_data = json.load(member)
                        docs = self._documents_from_json_data(json_data)
                        documents.extend(docs)
                        log_message(
                            f"Извлечено {len(docs)} документов из {json_file}"
                        )
                    except Exception as e:
                        log_message(
                            f"Ошибка обработки файла {json_file}: {e}"
                        )
        except Exception as e:
            log_message(f"Ошибка извлечения ZIP архива {zip_path}: {e}")
        return documents

    def load_json_documents(self):
        """Load all text documents found under ``json_files_dir``.

        ZIP archives are processed first, then plain ``.json`` files. Errors
        for a single file are logged and the file is skipped; if listing the
        repository fails, the error is logged and ``[]`` is returned.
        """
        log_message("Начинаю загрузку JSON документов")
        try:
            files = self._list_repo_files()
            zip_files = self._select(files, self.json_files_dir, '.zip')
            json_files = self._select(files, self.json_files_dir, '.json')
            log_message(
                f"Найдено {len(zip_files)} ZIP файлов и "
                f"{len(json_files)} прямых JSON файлов"
            )

            all_documents = []

            for zip_file_path in zip_files:
                try:
                    log_message(f"Загружаю ZIP архив: {zip_file_path}")
                    local_zip_path = self._download(zip_file_path)
                    all_documents.extend(
                        self.extract_zip_and_process_json(local_zip_path)
                    )
                except Exception as e:
                    log_message(
                        f"Ошибка обработки ZIP файла {zip_file_path}: {e}"
                    )

            for file_path in json_files:
                try:
                    log_message(f"Обрабатываю прямой JSON файл: {file_path}")
                    json_data = self._read_json_file(self._download(file_path))
                    documents = self._documents_from_json_data(json_data)
                    all_documents.extend(documents)
                    log_message(
                        f"Извлечено {len(documents)} документов из {file_path}"
                    )
                except Exception as e:
                    log_message(f"Ошибка обработки файла {file_path}: {e}")

            log_message(
                f"Всего создано {len(all_documents)} текстовых документов"
            )
            return all_documents
        except Exception as e:
            log_message(f"Ошибка загрузки JSON документов: {e}")
            return []

    # ------------------------------------------------------------------
    # Tables
    # ------------------------------------------------------------------

    def table_to_document(self, table_data, document_id=None):
        """Render one table dict as a ``Document``.

        The text consists of four header lines (table number, title, document,
        section) followed by one ``key: value | key: value`` line per dict row
        of ``table_data['data']`` (used only when it is a list).

        The document id is the first non-empty value among *document_id*,
        ``table_data['document_id']`` and ``table_data['document']``. When
        none is available the text shows ``Неизвестно`` and the metadata
        stores ``unknown``, matching the defaults of the other fields.

        Args:
            table_data: Parsed table object; must be a dict.
            document_id: Optional id that overrides the table's own keys.

        Returns:
            A ``Document`` with ``type`` set to ``"table"`` in its metadata.

        Raises:
            TypeError: If *table_data* is not a dict.
        """
        if not isinstance(table_data, dict):
            raise TypeError(
                "table_data must be a dict, got "
                f"{type(table_data).__name__}"
            )

        resolved_id = (
            document_id
            or table_data.get('document_id')
            or table_data.get('document')
        )

        lines = [
            f"Таблица: {table_data.get('table_number', 'Неизвестно')}",
            f"Название: {table_data.get('table_title', 'Неизвестно')}",
            f"Документ: {resolved_id or 'Неизвестно'}",
            f"Раздел: {table_data.get('section', 'Неизвестно')}",
        ]
        rows = table_data.get('data')
        if isinstance(rows, list):
            lines.extend(
                " | ".join(f"{key}: {value}" for key, value in row.items())
                for row in rows
                if isinstance(row, dict)
            )

        return Document(
            # Every line, including the last one, ends with a newline.
            text="".join(f"{line}\n" for line in lines),
            metadata={
                "type": "table",
                "table_number": table_data.get('table_number', 'unknown'),
                "table_title": table_data.get('table_title', 'unknown'),
                "document_id": resolved_id or 'unknown',
                "section": table_data.get('section', 'unknown'),
            },
        )

    def _documents_from_table_data(self, table_data):
        """Convert one parsed table JSON payload into documents.

        A dict with a ``sheets`` key yields one document per sheet, any other
        dict yields one document, and a list yields one document per entry.
        Entries that are not dicts are logged and skipped so that a single
        malformed entry does not discard the rest of the file.
        """
        document_id = None
        if isinstance(table_data, dict):
            # None (rather than a placeholder string) when the key is absent,
            # so table_to_document can fall back to the table's own id keys.
            document_id = table_data.get('document')
            if 'sheets' in table_data:
                sheets = table_data['sheets']
                tables = sheets if isinstance(sheets, list) else []
            else:
                tables = [table_data]
        elif isinstance(table_data, list):
            tables = table_data
        else:
            tables = []

        documents = []
        for table in tables:
            if not isinstance(table, dict):
                log_message(
                    "Пропускаю запись таблицы неподдерживаемого типа: "
                    f"{type(table).__name__}"
                )
                continue
            documents.append(self.table_to_document(table, document_id))
        return documents

    def extract_zip_and_process_tables(self, zip_path):
        """Read every table JSON inside the ZIP at *zip_path*.

        A failure in one member is logged and the member is skipped; a failure
        opening the archive is logged and whatever was collected is returned.

        Returns:
            A list of ``Document`` objects (possibly empty).
        """
        documents = []
        try:
            with zipfile.ZipFile(zip_path, 'r') as zip_ref:
                json_files = self._zip_json_members(zip_ref)
                log_message(
                    f"Найдено {len(json_files)} JSON файлов таблиц в архиве"
                )

                for json_file in json_files:
                    try:
                        log_message(
                            f"Обрабатываю файл таблицы из архива: {json_file}"
                        )
                        with zip_ref.open(json_file) as member:
                            table_data = json.load(member)
                        documents.extend(
                            self._documents_from_table_data(table_data)
                        )
                    except Exception as e:
                        log_message(
                            f"Ошибка обработки файла таблицы {json_file}: {e}"
                        )
        except Exception as e:
            log_message(f"Ошибка извлечения ZIP архива таблиц {zip_path}: {e}")
        return documents

    def load_table_documents(self):
        """Load all table documents found under ``table_data_dir``.

        ZIP archives are processed first, then plain ``.json`` files. Errors
        for a single file are logged and the file is skipped; if listing the
        repository fails, the error is logged and ``[]`` is returned.
        """
        log_message("Начинаю загрузку табличных данных")
        try:
            files = self._list_repo_files()
            zip_files = self._select(files, self.table_data_dir, '.zip')
            table_files = self._select(files, self.table_data_dir, '.json')
            log_message(
                f"Найдено {len(zip_files)} ZIP файлов с таблицами и "
                f"{len(table_files)} прямых JSON файлов"
            )

            table_documents = []

            for zip_file_path in zip_files:
                try:
                    log_message(f"Загружаю ZIP архив таблиц: {zip_file_path}")
                    local_zip_path = self._download(zip_file_path)
                    table_documents.extend(
                        self.extract_zip_and_process_tables(local_zip_path)
                    )
                except Exception as e:
                    log_message(
                        f"Ошибка обработки ZIP файла таблиц {zip_file_path}: {e}"
                    )

            for file_path in table_files:
                try:
                    log_message(f"Обрабатываю прямой файл таблицы: {file_path}")
                    table_data = self._read_json_file(self._download(file_path))
                    table_documents.extend(
                        self._documents_from_table_data(table_data)
                    )
                except Exception as e:
                    log_message(f"Ошибка обработки файла {file_path}: {e}")

            log_message(f"Создано {len(table_documents)} документов из таблиц")
            return table_documents
        except Exception as e:
            log_message(f"Ошибка загрузки табличных данных: {e}")
            return []

    # ------------------------------------------------------------------
    # Images
    # ------------------------------------------------------------------

    @staticmethod
    def _cell(row, column, default):
        """Return ``row[column]``, or *default* if it is missing or empty.

        ``row.get`` alone only covers a missing column; an empty CSV cell is
        read by pandas as NaN and would otherwise be rendered as ``nan``.
        Numpy scalars are converted to native Python values.
        NOTE(review): the conversion assumes downstream metadata consumers
        prefer plain ``int``/``float``/``bool`` — confirm against the index.
        """
        value = row.get(column, default)
        if pd.api.types.is_scalar(value) and pd.isna(value):
            return default
        if hasattr(value, 'item'):
            value = value.item()
        return value

    def load_image_documents(self):
        """Load image descriptions from the CSV files under ``image_data_dir``.

        Each CSV row becomes one ``Document`` whose text lists the image
        number, title, description, document, section and file, and whose
        metadata carries the number, document id, file path and section.
        Errors for a single file are logged and the file is skipped; if
        listing the repository fails, the error is logged and ``[]`` is
        returned.
        """
        log_message("Начинаю загрузку данных изображений")
        try:
            files = self._list_repo_files()
            image_files = self._select(files, self.image_data_dir, '.csv')
            log_message(f"Найдено {len(image_files)} CSV файлов с изображениями")

            image_documents = []
            cell = self._cell
            for file_path in image_files:
                try:
                    log_message(f"Обрабатываю файл изображений: {file_path}")
                    df = pd.read_csv(self._download(file_path))
                    log_message(
                        f"Загружено {len(df)} записей изображений "
                        f"из файла {file_path}"
                    )

                    for _, row in df.iterrows():
                        content = (
                            f"Изображение: {cell(row, '№ Изображения', 'Неизвестно')}\n"
                            f"Название: {cell(row, 'Название изображения', 'Неизвестно')}\n"
                            f"Описание: {cell(row, 'Описание изображение', 'Неизвестно')}\n"
                            f"Документ: {cell(row, 'Обозначение документа', 'Неизвестно')}\n"
                            f"Раздел: {cell(row, 'Раздел документа', 'Неизвестно')}\n"
                            f"Файл: {cell(row, 'Файл изображения', 'Неизвестно')}\n"
                        )
                        image_documents.append(Document(
                            text=content,
                            metadata={
                                "type": "image",
                                "image_number": cell(row, '№ Изображения', 'unknown'),
                                "document_id": cell(row, 'Обозначение документа', 'unknown'),
                                "file_path": cell(row, 'Файл изображения', 'unknown'),
                                "section": cell(row, 'Раздел документа', 'unknown'),
                            },
                        ))
                except Exception as e:
                    log_message(f"Ошибка обработки файла {file_path}: {e}")

            log_message(
                f"Создано {len(image_documents)} документов из изображений"
            )
            return image_documents
        except Exception as e:
            log_message(f"Ошибка загрузки данных изображений: {e}")
            return []

    # ------------------------------------------------------------------
    # Entry point
    # ------------------------------------------------------------------

    def prepare_all_documents(self):
        """Return text, table and image documents, concatenated in that order."""
        log_message("Подготовка всех документов")
        all_documents = []
        all_documents.extend(self.load_json_documents())
        all_documents.extend(self.load_table_documents())
        all_documents.extend(self.load_image_documents())
        log_message(f"Всего подготовлено {len(all_documents)} документов")
        return all_documents