"""Loaders that turn HuggingFace-dataset artifacts (zipped JSON sections,
table JSON, image JSON, CSV chunk dumps) into llama-index ``Document``
objects, plus a chunking pass for oversized text documents.

All user-facing log messages are intentionally kept in Russian to match
the rest of the project.
"""

import json
import os
import zipfile

import pandas as pd
from huggingface_hub import hf_hub_download, list_repo_files
from llama_index.core import Document
from llama_index.core.text_splitter import SentenceSplitter

from my_logging import log_message
from config import CHUNK_SIZE, CHUNK_OVERLAP


def load_json_documents(repo_id, hf_token, json_files_dir, download_dir):
    """Download every ``*.zip`` under *json_files_dir* in the HF dataset repo,
    extract the JSON documents inside, and convert each section to a Document.

    Returns:
        (documents, chunk_info): list of Documents and a parallel list of
        per-section metadata dicts. Both are empty on a top-level failure.
    """
    log_message(f"Загрузка JSON документов из {json_files_dir}")
    documents = []
    chunk_info = []
    try:
        files = list_repo_files(repo_id, token=hf_token)
        zip_files = [
            f for f in files
            if f.startswith(json_files_dir) and f.endswith('.zip')
        ]
        log_message(f"Найдено {len(zip_files)} ZIP файлов")

        for zip_file in zip_files:
            zip_path = hf_hub_download(
                repo_id=repo_id,
                filename=zip_file,
                token=hf_token,
                repo_type="dataset",
                local_dir=download_dir
            )
            log_message(f"Обрабатываю архив: {zip_file}")

            with zipfile.ZipFile(zip_path, 'r') as zip_ref:
                # Skip macOS resource-fork entries that zipfile also lists.
                json_files = [
                    f for f in zip_ref.namelist()
                    if f.endswith('.json') and not f.startswith('__MACOSX')
                ]
                log_message(f"Найдено {len(json_files)} JSON файлов в архиве")

                for json_file in json_files:
                    try:
                        with zip_ref.open(json_file) as f:
                            json_data = json.load(f)

                        doc_id = json_data.get(
                            'document_id', os.path.basename(json_file)
                        )
                        sections = json_data.get('sections', [])
                        log_message(
                            f"Обработка документа {doc_id}: {len(sections)} разделов"
                        )

                        for section in sections:
                            doc, info = process_text_section(section, doc_id)
                            if doc:
                                documents.append(doc)
                                chunk_info.append(info)
                    except Exception as e:
                        # Best-effort: one bad JSON must not abort the archive.
                        log_message(f"Ошибка при обработке {json_file}: {str(e)}")

        log_message(f"Загружено {len(documents)} текстовых документов")
        return documents, chunk_info
    except Exception as e:
        log_message(f"Ошибка загрузки JSON: {str(e)}")
        return [], []


def process_text_section(section, doc_id):
    """Build a text Document (and a matching info dict) from one section dict.

    Returns ``(None, None)`` when the section carries no text at all.
    """
    section_id = section.get('section_id', 'unknown')
    section_path = section.get('section_path', '')
    section_text = section.get('section_text', '')
    section_content = section.get('section_content', '')
    parent_section = section.get('parent_section', '')
    parent_title = section.get('parent_title', '')
    level = section.get('level', 'section')

    full_text = f"{section_text}\n{section_content}".strip()
    if not full_text:
        return None, None

    # Document metadata and the returned chunk_info are intentionally the
    # same shape; build once and reuse.
    metadata = {
        'document_id': doc_id,
        'section_id': section_id,
        'section_path': section_path,
        'section_text': section_text,
        'parent_section': parent_section,
        'parent_title': parent_title,
        'level': level,
        'type': 'text',
        'chunk_text': full_text
    }
    doc = Document(text=full_text, metadata=metadata)
    chunk_info = dict(metadata)
    return doc, chunk_info


def load_table_data(repo_id, hf_token, table_data_dir):
    """Download every table ``*.json`` under *table_data_dir* and convert each
    to one Document (or several, when the table is large and gets chunked).

    Returns a flat list of Documents; empty on a top-level failure.
    """
    log_message(f"Загрузка табличных данных из {table_data_dir}")
    documents = []
    try:
        files = list_repo_files(repo_id, token=hf_token)
        json_files = [
            f for f in files
            if f.startswith(table_data_dir) and f.endswith('.json')
        ]
        log_message(f"Найдено {len(json_files)} табличных JSON файлов")

        for json_file in json_files:
            try:
                file_path = hf_hub_download(
                    repo_id=repo_id,
                    filename=json_file,
                    token=hf_token,
                    repo_type="dataset"
                )
                with open(file_path, 'r', encoding='utf-8') as f:
                    table_data = json.load(f)

                doc = create_table_document(table_data)
                if doc:
                    # BUGFIX: create_table_document returns a *list* of
                    # Documents for large chunked tables; appending the list
                    # itself nested it inside `documents` and broke the
                    # downstream chunking pass (lists have no .metadata).
                    if isinstance(doc, list):
                        documents.extend(doc)
                    else:
                        documents.append(doc)
            except Exception as e:
                log_message(f"Ошибка при обработке таблицы {json_file}: {str(e)}")

        log_message(f"Загружено {len(documents)} табличных документов")
        return documents
    except Exception as e:
        log_message(f"Ошибка загрузки таблиц: {str(e)}")
        return []


def create_table_document(table_data):
    """Convert one table-JSON dict into a Document.

    Small tables (< ~2000 estimated tokens) become a single Document; larger
    ones are delegated to :func:`create_chunked_table_document`, which may
    return a list. Returns ``None`` for tables with no data rows.
    """
    doc_id = table_data.get('document_id', 'unknown')
    table_number = table_data.get('table_number', 'unknown')
    table_title = table_data.get('table_title', '')
    section = table_data.get('section', '')
    headers = table_data.get('headers', [])
    data = table_data.get('data', [])

    if not data:
        return None

    token_count = estimate_tokens(str(table_data))
    if token_count < 2000:
        text = format_table_as_text(table_number, table_title, section, headers, data)
        metadata = {
            'document_id': doc_id,
            'table_number': table_number,
            'table_title': table_title,
            'section': section,
            'type': 'table',
            'headers': str(headers),
            'row_count': len(data)
        }
        return Document(text=text, metadata=metadata)
    return create_chunked_table_document(
        doc_id, table_number, table_title, section, headers, data
    )


def create_chunked_table_document(doc_id, table_number, table_title, section,
                                  headers, data, rows_per_chunk=30):
    """Split a large table into Documents of *rows_per_chunk* rows each.

    Returns a single Document when exactly one chunk results, otherwise the
    list of chunk Documents (callers must handle both shapes).
    """
    chunks = []
    for i in range(0, len(data), rows_per_chunk):
        chunk_rows = data[i:i + rows_per_chunk]
        text = format_table_as_text(
            table_number, table_title, section, headers, chunk_rows,
            chunk_info=f"строки {i+1}-{i+len(chunk_rows)}"
        )
        metadata = {
            'document_id': doc_id,
            'table_number': table_number,
            'table_title': table_title,
            'section': section,
            'type': 'table',
            'headers': str(headers),
            'chunk_index': i // rows_per_chunk,
            'row_start': i,
            'row_end': i + len(chunk_rows),
            'row_count': len(chunk_rows)
        }
        chunks.append(Document(text=text, metadata=metadata))
    return chunks[0] if len(chunks) == 1 else chunks


def format_table_as_text(table_number, table_title, section, headers, data,
                         chunk_info=""):
    """Render a table (number, title, section, headers, rows) as plain text.

    Only the first 100 rows of *data* are rendered; cells are joined with
    ``" | "``.
    """
    text_parts = [f"Таблица {table_number}"]
    if table_title:
        text_parts.append(f"Название: {table_title}")
    if section:
        text_parts.append(f"Раздел: {section}")
    if chunk_info:
        text_parts.append(f"({chunk_info})")
    text_parts.append(f"\nЗаголовки: {', '.join(headers)}")
    text_parts.append("\nДанные:")
    for row in data[:100]:
        text_parts.append(" | ".join(str(cell) for cell in row))
    return "\n".join(text_parts)


def load_image_data(repo_id, hf_token, image_data_dir):
    """Download every image-description ``*.json`` under *image_data_dir* and
    convert each to a Document. Returns an empty list on a top-level failure.
    """
    log_message(f"Загрузка данных изображений из {image_data_dir}")
    documents = []
    try:
        files = list_repo_files(repo_id, token=hf_token)
        json_files = [
            f for f in files
            if f.startswith(image_data_dir) and f.endswith('.json')
        ]
        log_message(f"Найдено {len(json_files)} JSON файлов изображений")

        for json_file in json_files:
            try:
                file_path = hf_hub_download(
                    repo_id=repo_id,
                    filename=json_file,
                    token=hf_token,
                    repo_type="dataset"
                )
                with open(file_path, 'r', encoding='utf-8') as f:
                    image_data = json.load(f)

                doc = create_image_document(image_data)
                if doc:
                    documents.append(doc)
            except Exception as e:
                log_message(f"Ошибка при обработке изображения {json_file}: {str(e)}")

        log_message(f"Загружено {len(documents)} документов изображений")
        return documents
    except Exception as e:
        log_message(f"Ошибка загрузки изображений: {str(e)}")
        return []


def create_image_document(image_data):
    """Build a Document from one image-description dict (number, title,
    section, textual description)."""
    doc_id = image_data.get('document_id', 'unknown')
    image_number = image_data.get('image_number', 'unknown')
    image_title = image_data.get('image_title', '')
    image_description = image_data.get('image_description', '')
    section = image_data.get('section', '')

    text_parts = [f"Рисунок {image_number}"]
    if image_title:
        text_parts.append(f"Название: {image_title}")
    if section:
        text_parts.append(f"Раздел: {section}")
    if image_description:
        text_parts.append(f"Описание: {image_description}")
    text = "\n".join(text_parts)

    metadata = {
        'document_id': doc_id,
        'image_number': image_number,
        'image_title': image_title,
        'section': section,
        'type': 'image'
    }
    return Document(text=text, metadata=metadata)


def load_csv_chunks(repo_id, hf_token, chunks_filename, download_dir):
    """Download a pre-chunked CSV from the dataset repo and convert each row
    with a non-empty ``chunk_text`` into a Document.

    Returns:
        (documents, df): the Documents and the raw DataFrame, or
        ``([], None)`` on failure.
    """
    log_message(f"Загрузка CSV чанков из {chunks_filename}")
    try:
        csv_path = hf_hub_download(
            repo_id=repo_id,
            filename=chunks_filename,
            token=hf_token,
            repo_type="dataset",
            local_dir=download_dir
        )
        df = pd.read_csv(csv_path)
        log_message(f"Загружено {len(df)} строк из CSV")

        documents = []
        for _, row in df.iterrows():
            metadata = {
                'document_id': row.get('document_id', 'unknown'),
                'section_id': row.get('section_id', 'unknown'),
                'section_path': row.get('section_path', ''),
                'type': 'text'
            }
            text = row.get('chunk_text', '')
            # BUGFIX: empty CSV cells come back as NaN (a truthy float);
            # only accept real non-empty strings as document text.
            if isinstance(text, str) and text:
                documents.append(Document(text=text, metadata=metadata))

        log_message(f"Создано {len(documents)} документов из CSV")
        return documents, df
    except Exception as e:
        log_message(f"Ошибка загрузки CSV: {str(e)}")
        return [], None


def process_documents_with_chunking(documents):
    """Split oversized text Documents with a SentenceSplitter; pass table and
    image Documents through untouched.

    Returns:
        (chunked_documents, chunk_info): the final Document list and a
        parallel list of summary dicts from :func:`create_chunk_info`.
    """
    log_message(f"Чанкинг {len(documents)} документов")
    text_splitter = SentenceSplitter(
        chunk_size=CHUNK_SIZE,
        chunk_overlap=CHUNK_OVERLAP,
        separator=" ",
        backup_separators=["\n", ".", "!", "?"]
    )

    chunked_documents = []
    chunk_info = []
    for doc in documents:
        doc_type = doc.metadata.get('type', 'text')
        if doc_type == 'table':
            # Defensive: a chunked table may arrive as a list of Documents.
            if isinstance(doc, list):
                chunked_documents.extend(doc)
                for d in doc:
                    chunk_info.append(create_chunk_info(d))
            else:
                chunked_documents.append(doc)
                chunk_info.append(create_chunk_info(doc))
        elif doc_type == 'image':
            chunked_documents.append(doc)
            chunk_info.append(create_chunk_info(doc))
        else:
            token_count = estimate_tokens(doc.text)
            if token_count <= CHUNK_SIZE:
                chunked_documents.append(doc)
                chunk_info.append(create_chunk_info(doc))
            else:
                nodes = text_splitter.get_nodes_from_documents([doc])
                for node in nodes:
                    # BUGFIX: copy the metadata dict so sibling chunks do not
                    # share (and cross-mutate) one metadata object.
                    new_doc = Document(
                        text=node.text,
                        metadata=dict(doc.metadata)
                    )
                    chunked_documents.append(new_doc)
                    chunk_info.append(create_chunk_info(new_doc))

    log_message(f"Получено {len(chunked_documents)} чанков после обработки")
    return chunked_documents, chunk_info


def create_chunk_info(doc):
    """Summarize a Document into a flat dict: common fields plus type-specific
    ones (table / image / text section). ``chunk_text`` is truncated to 500
    characters.
    """
    metadata = doc.metadata
    info = {
        'document_id': metadata.get('document_id', 'unknown'),
        'type': metadata.get('type', 'text'),
        'chunk_text': doc.text[:500]
    }
    doc_type = metadata.get('type')
    if doc_type == 'table':
        info['table_number'] = metadata.get('table_number', 'unknown')
        info['table_title'] = metadata.get('table_title', '')
        info['section'] = metadata.get('section', '')
    elif doc_type == 'image':
        info['image_number'] = metadata.get('image_number', 'unknown')
        info['image_title'] = metadata.get('image_title', '')
        info['section'] = metadata.get('section', '')
    else:
        info['section_id'] = metadata.get('section_id', 'unknown')
        info['section_path'] = metadata.get('section_path', '')
        info['section_text'] = metadata.get('section_text', '')
        info['parent_section'] = metadata.get('parent_section', '')
        info['parent_title'] = metadata.get('parent_title', '')
        info['level'] = metadata.get('level', 'section')
    return info


def estimate_tokens(text):
    """Rough token estimate: whitespace word count times 1.3 (returns float)."""
    return len(text.split()) * 1.3