Spaces:
Sleeping
Sleeping
| import json | |
| import pandas as pd | |
| import os | |
| import zipfile | |
| from huggingface_hub import hf_hub_download, list_repo_files | |
| from llama_index.core import Document | |
| import logging | |
# Module-level logger; handlers/level are expected to be configured by the
# hosting application (log_message below also mirrors output to stdout).
logger = logging.getLogger(__name__)
def log_message(message):
    """Emit *message* through two sinks: the module logger (INFO level) and
    stdout.

    The print is flushed immediately so messages appear in real time in
    buffered environments (e.g. container / Spaces logs).
    """
    logger.info(message)
    print(message, flush=True)
class DocumentsPreparation:
    """Build llama-index ``Document`` objects from a Hugging Face dataset repo.

    The repo is expected to hold three kinds of sources, each in a fixed
    sub-directory:

    * ``JSON`` — hierarchical document JSON (loose files or ZIP archives),
    * ``Табличные данные_JSON`` — table data as JSON (loose or zipped)
      ("tabular data" in Russian),
    * ``Изображения`` — CSV files describing images ("images" in Russian).

    Files are fetched with ``hf_hub_download`` and converted into plain-text
    ``Document`` objects tagged with ``type`` metadata: "text", "table" or
    "image".  All loaders are best-effort: a broken file is logged and
    skipped, never aborts the whole run.
    """

    # Nesting levels of the document-JSON section tree, outermost first.
    # Each entry is (container key, node key prefix): a node at level i keeps
    # its id/text under "<prefix>_id" / "<prefix>_text" and its children under
    # the container key of level i + 1.
    _LEVELS = (
        ("sections", "section"),
        ("subsections", "subsection"),
        ("sub_subsections", "sub_subsection"),
        ("sub_sub_subsections", "sub_sub_subsection"),
    )

    def __init__(self, repo_id, hf_token):
        """
        Args:
            repo_id: Hugging Face dataset repo id, e.g. ``"user/dataset"``.
            hf_token: access token used for listing and downloading files.
        """
        self.repo_id = repo_id
        self.hf_token = hf_token
        # Directory names inside the dataset repo (fixed by the dataset layout).
        self.json_files_dir = "JSON"
        self.table_data_dir = "Табличные данные_JSON"
        self.image_data_dir = "Изображения"
        # Local target directory for hf_hub_download.
        self.download_dir = "rag_files"

    # ------------------------------------------------------------------ #
    # Shared helpers
    # ------------------------------------------------------------------ #

    def _repo_files(self):
        """Return the list of all file paths in the dataset repo."""
        return list_repo_files(
            repo_id=self.repo_id, repo_type="dataset", token=self.hf_token
        )

    def _download(self, filename):
        """Download one repo file into ``self.download_dir``; return local path."""
        return hf_hub_download(
            repo_id=self.repo_id,
            filename=filename,
            local_dir=self.download_dir,
            repo_type="dataset",
            token=self.hf_token,
        )

    @staticmethod
    def _json_names_in_zip(zip_ref):
        """List ``.json`` member names in an open ZipFile, skipping macOS
        resource-fork entries (``__MACOSX/...``)."""
        return [
            f for f in zip_ref.namelist()
            if f.endswith('.json') and not f.startswith('__MACOSX')
        ]

    # ------------------------------------------------------------------ #
    # Hierarchical document JSON -> text Documents
    # ------------------------------------------------------------------ #

    def extract_text_from_json(self, data, document_id, document_name):
        """Flatten the section tree of one document JSON into Documents.

        Walks up to four nesting levels (sections -> subsections ->
        sub_subsections -> sub_sub_subsections) and emits one ``Document``
        per node with non-blank text, carrying the full chain of ancestor
        ids plus a ``level`` marker in its metadata.

        Args:
            data: parsed document JSON (dict, optionally with "sections").
            document_id: id recorded in every produced Document's metadata.
            document_name: name recorded in every produced Document's metadata.

        Returns:
            list of ``Document`` objects (possibly empty).
        """
        documents = []
        base_ids = {"document_id": document_id, "document_name": document_name}
        self._collect_nodes(data, 0, base_ids, documents)
        return documents

    def _collect_nodes(self, node, depth, inherited, documents):
        """Recursive worker for :meth:`extract_text_from_json`.

        Args:
            node: current tree node (the whole JSON at depth 0).
            depth: index into ``_LEVELS`` selecting which children to visit.
            inherited: id metadata accumulated from ancestor nodes.
            documents: output list, appended to in place.
        """
        if depth >= len(self._LEVELS):
            return
        container_key, prefix = self._LEVELS[depth]
        if container_key not in node:
            return
        for child in node[container_key]:
            # Extend the ancestor id chain with this node's own id.
            ids = dict(inherited)
            ids[f"{prefix}_id"] = child.get(f"{prefix}_id", 'Unknown')
            text = child.get(f"{prefix}_text", '')
            # Only whitespace-free-nonempty text becomes a Document; children
            # are still visited even when this node's text is blank.
            if text.strip():
                documents.append(Document(
                    text=text,
                    metadata={"type": "text", **ids, "level": prefix},
                ))
            self._collect_nodes(child, depth + 1, ids, documents)

    def _documents_from_json(self, json_data):
        """Read document-level metadata from one parsed payload and flatten it."""
        meta = json_data.get('document_metadata', {})
        return self.extract_text_from_json(
            json_data,
            meta.get('document_id', 'unknown'),
            meta.get('document_name', 'unknown'),
        )

    def extract_zip_and_process_json(self, zip_path):
        """Extract ZIP file and process JSON files inside.

        Returns a list of Documents; broken members are logged and skipped.
        """
        documents = []
        try:
            with zipfile.ZipFile(zip_path, 'r') as zip_ref:
                json_files = self._json_names_in_zip(zip_ref)
                log_message(f"Найдено {len(json_files)} JSON файлов в архиве")
                for json_file in json_files:
                    try:
                        log_message(f"Обрабатываю файл из архива: {json_file}")
                        with zip_ref.open(json_file) as f:
                            json_data = json.load(f)
                        docs = self._documents_from_json(json_data)
                        documents.extend(docs)
                        log_message(f"Извлечено {len(docs)} документов из {json_file}")
                    except Exception as e:
                        log_message(f"Ошибка обработки файла {json_file}: {str(e)}")
                        continue
        except Exception as e:
            log_message(f"Ошибка извлечения ZIP архива {zip_path}: {str(e)}")
        return documents

    def load_json_documents(self):
        """Load text Documents from the ``JSON`` directory of the repo.

        ZIP archives are processed first, then any loose ``.json`` files
        (fallback for unarchived uploads).

        Returns:
            list of Documents; empty list on unrecoverable errors.
        """
        log_message("Начинаю загрузку JSON документов")
        try:
            files = self._repo_files()
            zip_files = [f for f in files
                         if f.startswith(self.json_files_dir) and f.endswith('.zip')]
            json_files = [f for f in files
                          if f.startswith(self.json_files_dir) and f.endswith('.json')]
            log_message(f"Найдено {len(zip_files)} ZIP файлов и {len(json_files)} прямых JSON файлов")
            all_documents = []
            for zip_file_path in zip_files:
                try:
                    log_message(f"Загружаю ZIP архив: {zip_file_path}")
                    local_zip_path = self._download(zip_file_path)
                    all_documents.extend(
                        self.extract_zip_and_process_json(local_zip_path)
                    )
                except Exception as e:
                    log_message(f"Ошибка обработки ZIP файла {zip_file_path}: {str(e)}")
                    continue
            for file_path in json_files:
                try:
                    log_message(f"Обрабатываю прямой JSON файл: {file_path}")
                    local_path = self._download(file_path)
                    with open(local_path, 'r', encoding='utf-8') as f:
                        json_data = json.load(f)
                    documents = self._documents_from_json(json_data)
                    all_documents.extend(documents)
                    log_message(f"Извлечено {len(documents)} документов из {file_path}")
                except Exception as e:
                    log_message(f"Ошибка обработки файла {file_path}: {str(e)}")
                    continue
            log_message(f"Всего создано {len(all_documents)} текстовых документов")
            return all_documents
        except Exception as e:
            log_message(f"Ошибка загрузки JSON документов: {str(e)}")
            return []

    # ------------------------------------------------------------------ #
    # Table JSON -> table Documents
    # ------------------------------------------------------------------ #

    def table_to_document(self, table_data, document_id=None):
        """Render one table description into a single plain-text Document.

        Args:
            table_data: dict with keys like "table_number", "table_title",
                "section" and a "data" list of row dicts.  A non-dict value
                is tolerated and yields a placeholder document (the original
                code raised UnboundLocalError on ``doc_id`` in that case).
            document_id: overrides any document id found in ``table_data``.

        Returns:
            Document with ``type="table"`` metadata.
        """
        if not isinstance(table_data, dict):
            # Bug fix: doc_id used to be referenced while unbound here.
            table_data = {}
        doc_id = document_id or table_data.get(
            'document_id', table_data.get('document', 'Неизвестно')
        )
        table_num = table_data.get('table_number', 'Неизвестно')
        table_title = table_data.get('table_title', 'Неизвестно')
        section = table_data.get('section', 'Неизвестно')
        lines = [
            f"Таблица: {table_num}",
            f"Название: {table_title}",
            f"Документ: {doc_id}",
            f"Раздел: {section}",
        ]
        rows = table_data.get('data')
        if isinstance(rows, list):
            # Each row dict is flattened into "key: value | key: value" form.
            for row in rows:
                if isinstance(row, dict):
                    lines.append(" | ".join(f"{k}: {v}" for k, v in row.items()))
        content = "\n".join(lines) + "\n"
        return Document(
            text=content,
            metadata={
                "type": "table",
                "table_number": table_data.get('table_number', 'unknown'),
                "table_title": table_data.get('table_title', 'unknown'),
                # Fall through to the raw payload only if doc_id is falsy.
                "document_id": doc_id or table_data.get(
                    'document_id', table_data.get('document', 'unknown')
                ),
                "section": table_data.get('section', 'unknown'),
            }
        )

    def _table_documents_from_data(self, table_data):
        """Convert one parsed table-JSON payload (dict or list) into Documents.

        A dict payload is either a single table, or a workbook holding a
        "sheets" list (each sheet becomes its own Document and inherits the
        workbook's document id).  A list payload is a plain sequence of
        tables.  Anything else yields an empty list.
        """
        documents = []
        if isinstance(table_data, dict):
            document_id = table_data.get('document', 'unknown')
            if 'sheets' in table_data:
                for sheet in table_data['sheets']:
                    # Propagate the workbook-level document id onto the sheet.
                    sheet['document'] = document_id
                    documents.append(self.table_to_document(sheet, document_id))
            else:
                documents.append(self.table_to_document(table_data, document_id))
        elif isinstance(table_data, list):
            for table_json in table_data:
                documents.append(self.table_to_document(table_json))
        return documents

    def extract_zip_and_process_tables(self, zip_path):
        """Extract ZIP file and process table JSON files inside.

        Returns a list of Documents; broken members are logged and skipped.
        """
        documents = []
        try:
            with zipfile.ZipFile(zip_path, 'r') as zip_ref:
                json_files = self._json_names_in_zip(zip_ref)
                log_message(f"Найдено {len(json_files)} JSON файлов таблиц в архиве")
                for json_file in json_files:
                    try:
                        log_message(f"Обрабатываю файл таблицы из архива: {json_file}")
                        with zip_ref.open(json_file) as f:
                            table_data = json.load(f)
                        documents.extend(self._table_documents_from_data(table_data))
                    except Exception as e:
                        log_message(f"Ошибка обработки файла таблицы {json_file}: {str(e)}")
                        continue
        except Exception as e:
            log_message(f"Ошибка извлечения ZIP архива таблиц {zip_path}: {str(e)}")
        return documents

    def load_table_documents(self):
        """Load table Documents from the table-data directory of the repo.

        ZIP archives first, then loose ``.json`` files as a fallback.

        Returns:
            list of Documents; empty list on unrecoverable errors.
        """
        log_message("Начинаю загрузку табличных данных")
        try:
            files = self._repo_files()
            zip_files = [f for f in files
                         if f.startswith(self.table_data_dir) and f.endswith('.zip')]
            table_files = [f for f in files
                           if f.startswith(self.table_data_dir) and f.endswith('.json')]
            log_message(f"Найдено {len(zip_files)} ZIP файлов с таблицами и {len(table_files)} прямых JSON файлов")
            table_documents = []
            for zip_file_path in zip_files:
                try:
                    log_message(f"Загружаю ZIP архив таблиц: {zip_file_path}")
                    local_zip_path = self._download(zip_file_path)
                    table_documents.extend(
                        self.extract_zip_and_process_tables(local_zip_path)
                    )
                except Exception as e:
                    log_message(f"Ошибка обработки ZIP файла таблиц {zip_file_path}: {str(e)}")
                    continue
            for file_path in table_files:
                try:
                    log_message(f"Обрабатываю прямой файл таблицы: {file_path}")
                    local_path = self._download(file_path)
                    with open(local_path, 'r', encoding='utf-8') as f:
                        table_data = json.load(f)
                    table_documents.extend(self._table_documents_from_data(table_data))
                except Exception as e:
                    log_message(f"Ошибка обработки файла {file_path}: {str(e)}")
                    continue
            log_message(f"Создано {len(table_documents)} документов из таблиц")
            return table_documents
        except Exception as e:
            log_message(f"Ошибка загрузки табличных данных: {str(e)}")
            return []

    # ------------------------------------------------------------------ #
    # Image CSV -> image Documents
    # ------------------------------------------------------------------ #

    def load_image_documents(self):
        """Load image-description Documents from CSV files in the images dir.

        Every CSV row becomes one Document of type "image".  Column headers
        are Russian and fixed by the dataset ("№ Изображения" = image number,
        "Обозначение документа" = document designation, etc.); missing
        columns fall back to placeholder values.

        Returns:
            list of Documents; empty list on unrecoverable errors.
        """
        log_message("Начинаю загрузку данных изображений")
        try:
            files = self._repo_files()
            image_files = [f for f in files
                           if f.startswith(self.image_data_dir) and f.endswith('.csv')]
            log_message(f"Найдено {len(image_files)} CSV файлов с изображениями")
            image_documents = []
            for file_path in image_files:
                try:
                    log_message(f"Обрабатываю файл изображений: {file_path}")
                    local_path = self._download(file_path)
                    df = pd.read_csv(local_path)
                    log_message(f"Загружено {len(df)} записей изображений из файла {file_path}")
                    for _, row in df.iterrows():
                        content = (
                            f"Изображение: {row.get('№ Изображения', 'Неизвестно')}\n"
                            f"Название: {row.get('Название изображения', 'Неизвестно')}\n"
                            f"Описание: {row.get('Описание изображение', 'Неизвестно')}\n"
                            f"Документ: {row.get('Обозначение документа', 'Неизвестно')}\n"
                            f"Раздел: {row.get('Раздел документа', 'Неизвестно')}\n"
                            f"Файл: {row.get('Файл изображения', 'Неизвестно')}\n"
                        )
                        image_documents.append(Document(
                            text=content,
                            metadata={
                                "type": "image",
                                "image_number": row.get('№ Изображения', 'unknown'),
                                "document_id": row.get('Обозначение документа', 'unknown'),
                                "file_path": row.get('Файл изображения', 'unknown'),
                                "section": row.get('Раздел документа', 'unknown'),
                            }
                        ))
                except Exception as e:
                    log_message(f"Ошибка обработки файла {file_path}: {str(e)}")
                    continue
            log_message(f"Создано {len(image_documents)} документов из изображений")
            return image_documents
        except Exception as e:
            log_message(f"Ошибка загрузки данных изображений: {str(e)}")
            return []

    # ------------------------------------------------------------------ #
    # Entry point
    # ------------------------------------------------------------------ #

    def prepare_all_documents(self):
        """Load and concatenate all three sources: text, table and image.

        Returns:
            combined list of Documents in load order (text, table, image).
        """
        log_message("Подготовка всех документов")
        all_documents = []
        all_documents.extend(self.load_json_documents())
        all_documents.extend(self.load_table_documents())
        all_documents.extend(self.load_image_documents())
        log_message(f"Всего подготовлено {len(all_documents)} документов")
        return all_documents