Spaces:
Sleeping
Sleeping
| import json | |
| import zipfile | |
| import pandas as pd | |
| from huggingface_hub import hf_hub_download, list_repo_files | |
| from llama_index.core import Document | |
| from llama_index.core.text_splitter import SentenceSplitter | |
| from my_logging import log_message | |
| from config import CHUNK_SIZE, CHUNK_OVERLAP | |
| import os | |
def load_json_documents(repo_id, hf_token, json_files_dir, download_dir):
    """Download ZIP archives of sectioned JSON documents from a HF dataset
    repo and convert every section into a Document via process_text_section.

    Returns a (documents, chunk_info) pair of parallel lists; both are
    empty when listing or downloading fails. Per-file parse errors are
    logged and skipped without aborting the whole load.
    """
    log_message(f"Загрузка JSON документов из {json_files_dir}")
    documents, chunk_info = [], []
    try:
        repo_files = list_repo_files(repo_id, token=hf_token)
        archives = [
            name for name in repo_files
            if name.startswith(json_files_dir) and name.endswith('.zip')
        ]
        log_message(f"Найдено {len(archives)} ZIP файлов")
        for archive_name in archives:
            local_zip = hf_hub_download(
                repo_id=repo_id,
                filename=archive_name,
                token=hf_token,
                repo_type="dataset",
                local_dir=download_dir,
            )
            log_message(f"Обрабатываю архив: {archive_name}")
            with zipfile.ZipFile(local_zip, 'r') as archive:
                # Skip macOS resource-fork entries bundled into the archive.
                members = [
                    m for m in archive.namelist()
                    if m.endswith('.json') and not m.startswith('__MACOSX')
                ]
                log_message(f"Найдено {len(members)} JSON файлов в архиве")
                for member in members:
                    try:
                        with archive.open(member) as fh:
                            payload = json.load(fh)
                        doc_id = payload.get('document_id', os.path.basename(member))
                        sections = payload.get('sections', [])
                        log_message(f"Обработка документа {doc_id}: {len(sections)} разделов")
                        for section in sections:
                            doc, info = process_text_section(section, doc_id)
                            if doc:
                                documents.append(doc)
                                chunk_info.append(info)
                    except Exception as e:
                        log_message(f"Ошибка при обработке {member}: {str(e)}")
        log_message(f"Загружено {len(documents)} текстовых документов")
        return documents, chunk_info
    except Exception as e:
        log_message(f"Ошибка загрузки JSON: {str(e)}")
        return [], []
def process_text_section(section, doc_id):
    """Turn one section dict into a (Document, chunk_info) pair.

    The section's title (`section_text`) and body (`section_content`) are
    concatenated into the document text. Returns (None, None) when the
    section carries no text at all.

    Fix: the original built `metadata` and `chunk_info` as two
    byte-identical dict literals; build the dict once and hand out an
    independent copy instead.
    """
    section_text = section.get('section_text', '')
    section_content = section.get('section_content', '')
    full_text = f"{section_text}\n{section_content}".strip()
    if not full_text:
        return None, None
    metadata = {
        'document_id': doc_id,
        'section_id': section.get('section_id', 'unknown'),
        'section_path': section.get('section_path', ''),
        'section_text': section_text,
        'parent_section': section.get('parent_section', ''),
        'parent_title': section.get('parent_title', ''),
        'level': section.get('level', 'section'),
        'type': 'text',
        'chunk_text': full_text,
    }
    doc = Document(text=full_text, metadata=metadata)
    # Shallow copy keeps the original contract of two separate dicts, so
    # mutating one later cannot silently change the other.
    chunk_info = dict(metadata)
    return doc, chunk_info
def load_table_data(repo_id, hf_token, table_data_dir):
    """Fetch per-table JSON files from a HF dataset repo and build table
    Documents via create_table_document.

    Returns the list of documents; empty on listing failure. Individual
    file errors are logged and skipped.
    """
    log_message(f"Загрузка табличных данных из {table_data_dir}")
    documents = []
    try:
        all_files = list_repo_files(repo_id, token=hf_token)
        table_files = [
            name for name in all_files
            if name.startswith(table_data_dir) and name.endswith('.json')
        ]
        log_message(f"Найдено {len(table_files)} табличных JSON файлов")
        for name in table_files:
            try:
                local_path = hf_hub_download(
                    repo_id=repo_id,
                    filename=name,
                    token=hf_token,
                    repo_type="dataset",
                )
                with open(local_path, 'r', encoding='utf-8') as fh:
                    table_data = json.load(fh)
                doc = create_table_document(table_data)
                if doc:
                    documents.append(doc)
            except Exception as e:
                log_message(f"Ошибка при обработке таблицы {name}: {str(e)}")
        log_message(f"Загружено {len(documents)} табличных документов")
        return documents
    except Exception as e:
        log_message(f"Ошибка загрузки таблиц: {str(e)}")
        return []
def create_table_document(table_data):
    """Build a Document from one table's JSON record.

    Small tables (estimated under 2000 tokens of raw JSON) become a single
    Document; larger ones are delegated to create_chunked_table_document,
    which may return a list of Documents. Returns None when the table has
    no data rows.
    """
    doc_id = table_data.get('document_id', 'unknown')
    table_number = table_data.get('table_number', 'unknown')
    table_title = table_data.get('table_title', '')
    section = table_data.get('section', '')
    headers = table_data.get('headers', [])
    rows = table_data.get('data', [])
    if not rows:
        return None
    # Size the whole raw record, not just the rendered text, to decide
    # whether row-wise chunking is needed.
    if estimate_tokens(str(table_data)) >= 2000:
        return create_chunked_table_document(
            doc_id, table_number, table_title, section, headers, rows
        )
    body = format_table_as_text(table_number, table_title, section, headers, rows)
    return Document(
        text=body,
        metadata={
            'document_id': doc_id,
            'table_number': table_number,
            'table_title': table_title,
            'section': section,
            'type': 'table',
            'headers': str(headers),
            'row_count': len(rows),
        },
    )
def create_chunked_table_document(doc_id, table_number, table_title, section, headers, data, rows_per_chunk=30):
    """Split a large table into Documents of rows_per_chunk rows each.

    Every chunk repeats the table header/title context and records its row
    window in metadata. Returns the lone Document when only one chunk is
    produced, otherwise the list of chunk Documents.
    """
    chunks = []
    for start in range(0, len(data), rows_per_chunk):
        window = data[start:start + rows_per_chunk]
        end = start + len(window)
        body = format_table_as_text(
            table_number,
            table_title,
            section,
            headers,
            window,
            chunk_info=f"строки {start+1}-{end}",
        )
        chunks.append(Document(
            text=body,
            metadata={
                'document_id': doc_id,
                'table_number': table_number,
                'table_title': table_title,
                'section': section,
                'type': 'table',
                'headers': str(headers),
                'chunk_index': start // rows_per_chunk,
                'row_start': start,
                'row_end': end,
                'row_count': len(window),
            },
        ))
    return chunks[0] if len(chunks) == 1 else chunks
| def format_table_as_text(table_number, table_title, section, headers, data, chunk_info=""): | |
| text_parts = [] | |
| text_parts.append(f"Таблица {table_number}") | |
| if table_title: | |
| text_parts.append(f"Название: {table_title}") | |
| if section: | |
| text_parts.append(f"Раздел: {section}") | |
| if chunk_info: | |
| text_parts.append(f"({chunk_info})") | |
| text_parts.append(f"\nЗаголовки: {', '.join(headers)}") | |
| text_parts.append("\nДанные:") | |
| for row in data[:100]: | |
| row_text = " | ".join([str(cell) for cell in row]) | |
| text_parts.append(row_text) | |
| return "\n".join(text_parts) | |
def load_image_data(repo_id, hf_token, image_data_dir):
    """Fetch image-description JSON files from a HF dataset repo and build
    image Documents via create_image_document.

    Returns the list of documents; empty on listing failure. Individual
    file errors are logged and skipped.
    """
    log_message(f"Загрузка данных изображений из {image_data_dir}")
    documents = []
    try:
        all_files = list_repo_files(repo_id, token=hf_token)
        image_files = [
            name for name in all_files
            if name.startswith(image_data_dir) and name.endswith('.json')
        ]
        log_message(f"Найдено {len(image_files)} JSON файлов изображений")
        for name in image_files:
            try:
                local_path = hf_hub_download(
                    repo_id=repo_id,
                    filename=name,
                    token=hf_token,
                    repo_type="dataset",
                )
                with open(local_path, 'r', encoding='utf-8') as fh:
                    image_data = json.load(fh)
                doc = create_image_document(image_data)
                if doc:
                    documents.append(doc)
            except Exception as e:
                log_message(f"Ошибка при обработке изображения {name}: {str(e)}")
        log_message(f"Загружено {len(documents)} документов изображений")
        return documents
    except Exception as e:
        log_message(f"Ошибка загрузки изображений: {str(e)}")
        return []
def create_image_document(image_data):
    """Build a searchable text Document describing one image from its JSON
    record (number, title, section, free-text description)."""
    doc_id = image_data.get('document_id', 'unknown')
    image_number = image_data.get('image_number', 'unknown')
    image_title = image_data.get('image_title', '')
    description = image_data.get('image_description', '')
    section = image_data.get('section', '')
    lines = [f"Рисунок {image_number}"]
    if image_title:
        lines.append(f"Название: {image_title}")
    if section:
        lines.append(f"Раздел: {section}")
    if description:
        lines.append(f"Описание: {description}")
    return Document(
        text="\n".join(lines),
        metadata={
            'document_id': doc_id,
            'image_number': image_number,
            'image_title': image_title,
            'section': section,
            'type': 'image',
        },
    )
def load_csv_chunks(repo_id, hf_token, chunks_filename, download_dir):
    """Download a CSV of pre-chunked text from a HF dataset repo and build
    one Document per non-empty row.

    Returns (documents, dataframe); ([], None) on any failure.

    Fix: pd.read_csv yields NaN (a truthy float) for empty cells, so the
    original `if text:` admitted NaN rows and built Document(text=nan).
    Require a non-empty string instead.
    """
    log_message(f"Загрузка CSV чанков из {chunks_filename}")
    try:
        csv_path = hf_hub_download(
            repo_id=repo_id,
            filename=chunks_filename,
            token=hf_token,
            repo_type="dataset",
            local_dir=download_dir,
        )
        df = pd.read_csv(csv_path)
        log_message(f"Загружено {len(df)} строк из CSV")
        documents = []
        for _, row in df.iterrows():
            text = row.get('chunk_text', '')
            # NaN and non-string cells carry no usable chunk text — skip.
            if not (isinstance(text, str) and text):
                continue
            metadata = {
                'document_id': row.get('document_id', 'unknown'),
                'section_id': row.get('section_id', 'unknown'),
                'section_path': row.get('section_path', ''),
                'type': 'text',
            }
            documents.append(Document(text=text, metadata=metadata))
        log_message(f"Создано {len(documents)} документов из CSV")
        return documents, df
    except Exception as e:
        log_message(f"Ошибка загрузки CSV: {str(e)}")
        return [], None
def process_documents_with_chunking(documents):
    """Split oversized text Documents into sentence chunks; pass table and
    image Documents (and pre-chunked table lists) through untouched.

    Returns (chunked_documents, chunk_info) parallel lists.

    Fixes: the original read `doc.metadata` BEFORE its `isinstance(doc,
    list)` check, so the lists that create_chunked_table_document returns
    (and load_table_data appends as-is) crashed with AttributeError and the
    list branch was unreachable. The list check now comes first. Split
    chunks also shared one mutable metadata dict; each chunk now gets its
    own copy.
    """
    log_message(f"Чанкинг {len(documents)} документов")
    text_splitter = SentenceSplitter(
        chunk_size=CHUNK_SIZE,
        chunk_overlap=CHUNK_OVERLAP,
        separator=" ",
        backup_separators=["\n", ".", "!", "?"],
    )
    chunked_documents = []
    chunk_info = []

    def _keep(d):
        # Record a document unchanged together with its chunk bookkeeping.
        chunked_documents.append(d)
        chunk_info.append(create_chunk_info(d))

    for doc in documents:
        if isinstance(doc, list):
            # Pre-chunked table: a list of Documents, keep each member.
            for d in doc:
                _keep(d)
            continue
        doc_type = doc.metadata.get('type', 'text')
        if doc_type in ('table', 'image'):
            _keep(doc)
        elif estimate_tokens(doc.text) <= CHUNK_SIZE:
            # Already small enough — no splitting required.
            _keep(doc)
        else:
            for node in text_splitter.get_nodes_from_documents([doc]):
                # Copy metadata so sibling chunks don't share one dict.
                _keep(Document(text=node.text, metadata=dict(doc.metadata)))
    log_message(f"Получено {len(chunked_documents)} чанков после обработки")
    return chunked_documents, chunk_info
def create_chunk_info(doc):
    """Summarize a Document into a flat bookkeeping dict.

    Always records document_id, type, and a text preview capped at 500
    characters; the remaining fields depend on the document type (table,
    image, or text section).
    """
    md = doc.metadata
    info = {
        'document_id': md.get('document_id', 'unknown'),
        'type': md.get('type', 'text'),
        'chunk_text': doc.text[:500],  # preview only
    }
    kind = md.get('type')
    if kind == 'table':
        info.update(
            table_number=md.get('table_number', 'unknown'),
            table_title=md.get('table_title', ''),
            section=md.get('section', ''),
        )
    elif kind == 'image':
        info.update(
            image_number=md.get('image_number', 'unknown'),
            image_title=md.get('image_title', ''),
            section=md.get('section', ''),
        )
    else:
        info.update(
            section_id=md.get('section_id', 'unknown'),
            section_path=md.get('section_path', ''),
            section_text=md.get('section_text', ''),
            parent_section=md.get('parent_section', ''),
            parent_title=md.get('parent_title', ''),
            level=md.get('level', 'section'),
        )
    return info
def estimate_tokens(text):
    """Rough token estimate: whitespace-split word count scaled by 1.3."""
    word_count = len(text.split())
    return 1.3 * word_count