"""Load text, table and image documents from a Hugging Face dataset repo.

The loaders download JSON/ZIP/CSV files, convert their content into
``llama_index`` ``Document`` objects and, for plain text, split long
documents into chunks.
"""

import json
import zipfile

import pandas as pd
from huggingface_hub import hf_hub_download, list_repo_files
from llama_index.core import Document
from llama_index.core.text_splitter import SentenceSplitter

from config import CHUNK_SIZE, CHUNK_OVERLAP
from my_logging import log_message

# Maximum number of characters kept in a ``chunk_preview`` before "..." is added.
_PREVIEW_LENGTH = 200

# One entry per nesting level of the JSON section tree:
# (level name, key of the node id, key of the node text, key of the child list).
_SECTION_LEVELS = (
    ("section", "section_id", "section_text", "subsections"),
    ("subsection", "subsection_id", "subsection_text", "sub_subsections"),
    (
        "sub_subsection",
        "sub_subsection_id",
        "sub_subsection_text",
        "sub_sub_subsections",
    ),
    (
        "sub_sub_subsection",
        "sub_sub_subsection_id",
        "sub_sub_subsection_text",
        None,
    ),
)

# Columns that load_csv_chunks reads as metadata; never used as the text column.
_CSV_METADATA_COLUMNS = frozenset({"chunk_id", "document_id"})
_CSV_TEXT_MARKERS = ("text", "content", "chunk")


def chunk_document(doc, chunk_size=None, chunk_overlap=None):
    """Split one document into chunk documents with ``SentenceSplitter``.

    Args:
        doc: Document exposing ``text`` and ``metadata``.
        chunk_size: Splitter chunk size; defaults to ``CHUNK_SIZE``.
        chunk_overlap: Splitter overlap; defaults to ``CHUNK_OVERLAP``.

    Returns:
        A list of new ``Document`` objects, one per chunk. Each carries a copy
        of the source metadata plus ``chunk_id``, ``total_chunks``,
        ``chunk_size`` (length of the chunk text in characters) and
        ``original_doc_id`` (``doc.id_`` when present, otherwise ``None``).
    """
    if chunk_size is None:
        chunk_size = CHUNK_SIZE
    if chunk_overlap is None:
        chunk_overlap = CHUNK_OVERLAP

    text_splitter = SentenceSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        separator=" ",
    )
    text_chunks = text_splitter.split_text(doc.text)
    total_chunks = len(text_chunks)
    original_doc_id = getattr(doc, "id_", None)

    chunked_docs = []
    for chunk_id, chunk_text in enumerate(text_chunks):
        # Copy so that chunks do not share (and mutate) the source metadata.
        chunk_metadata = doc.metadata.copy()
        chunk_metadata.update({
            "chunk_id": chunk_id,
            "total_chunks": total_chunks,
            "chunk_size": len(chunk_text),
            "original_doc_id": original_doc_id,
        })
        chunked_docs.append(Document(text=chunk_text, metadata=chunk_metadata))
    return chunked_docs


def _make_chunk_info(doc, chunk_id, doc_type):
    """Build the summary record stored in ``chunk_info`` for one document."""
    text = doc.text
    if len(text) > _PREVIEW_LENGTH:
        preview = text[:_PREVIEW_LENGTH] + "..."
    else:
        preview = text
    return {
        'document_id': doc.metadata.get('document_id', 'unknown'),
        'section_id': doc.metadata.get('section_id', 'unknown'),
        'chunk_id': chunk_id,
        'chunk_size': len(text),
        'chunk_preview': preview,
        'type': doc_type,
    }


def _is_oversized(doc, label, number_key):
    """Return True (and log it) when the document text exceeds CHUNK_SIZE."""
    if len(doc.text) <= CHUNK_SIZE:
        return False
    log_message(
        f"Large {label} found: {doc.metadata.get(number_key, 'unknown')} "
        f"in document {doc.metadata.get('document_id', 'unknown')}, "
        f"size: {len(doc.text)} characters"
    )
    return True


def process_documents_with_chunking(documents):
    """Chunk long text documents and pass tables and images through intact.

    Documents whose metadata ``type`` is ``'table'`` or ``'image'`` are never
    split; oversized ones are only logged and counted. Any other document is
    treated as text and split with ``chunk_document`` when its text is longer
    than ``CHUNK_SIZE`` characters.

    NOTE(review): the threshold here is a character count, while the splitter
    presumably interprets ``CHUNK_SIZE`` in tokens - confirm this is intended.

    Args:
        documents: Iterable of documents exposing ``text`` and ``metadata``.

    Returns:
        A tuple ``(all_chunked_docs, chunk_info)``: the resulting documents and
        one summary dict per resulting document (ids, size, preview, type).
    """
    all_chunked_docs = []
    chunk_info = []
    table_count = 0
    image_count = 0
    text_chunks_count = 0
    large_tables_count = 0
    large_images_count = 0

    for doc in documents:
        doc_type = doc.metadata.get('type', 'text')

        if doc_type == 'table':
            table_count += 1
            if _is_oversized(doc, "table", 'table_number'):
                large_tables_count += 1
            all_chunked_docs.append(doc)
            chunk_info.append(_make_chunk_info(doc, 0, 'table'))
            continue

        if doc_type == 'image':
            image_count += 1
            if _is_oversized(doc, "image description", 'image_number'):
                large_images_count += 1
            all_chunked_docs.append(doc)
            chunk_info.append(_make_chunk_info(doc, 0, 'image'))
            continue

        # Everything else is handled (and reported) as plain text.
        if len(doc.text) > CHUNK_SIZE:
            chunked_docs = chunk_document(doc)
            all_chunked_docs.extend(chunked_docs)
            # Only chunks produced by an actual split are counted here.
            text_chunks_count += len(chunked_docs)
            chunk_info.extend(
                _make_chunk_info(chunk_doc, chunk_id, 'text')
                for chunk_id, chunk_doc in enumerate(chunked_docs)
            )
        else:
            all_chunked_docs.append(doc)
            chunk_info.append(_make_chunk_info(doc, 0, 'text'))

    log_message("=== PROCESSING STATISTICS ===")
    log_message(f"Total tables processed: {table_count}")
    log_message(f"Large tables (>{CHUNK_SIZE} chars): {large_tables_count}")
    log_message(f"Total images processed: {image_count}")
    log_message(f"Large images (>{CHUNK_SIZE} chars): {large_images_count}")
    log_message(f"Total text chunks created: {text_chunks_count}")
    log_message(f"Total documents after processing: {len(all_chunked_docs)}")

    return all_chunked_docs, chunk_info


def _collect_level_documents(node, depth, document_id, document_name,
                             parent=None):
    """Convert one node of the section tree, and its descendants, to documents.

    Args:
        node: Dict describing the section at nesting level ``depth``.
        depth: Index into ``_SECTION_LEVELS`` (0 is a top-level section).
        document_id: Identifier stored in every document's metadata.
        document_name: Name stored in every document's metadata.
        parent: ``None`` for a top-level section, otherwise a dict with the
            parent's ``id``, ``title`` and ``path``.

    Returns:
        Documents in pre-order: the node itself (if it has non-blank text)
        followed by the documents of each child.
    """
    level, id_key, text_key, children_key = _SECTION_LEVELS[depth]

    node_id = node.get(id_key, 'Unknown')
    # ``or ''`` also covers an explicit JSON null, which would otherwise
    # raise on ``.strip()`` and abort processing of the whole file.
    node_text = node.get(text_key) or ''
    node_title = extract_section_title(node_text)

    if parent is None:
        node_path = f"{node_id}"
    else:
        node_path = f"{parent['path']}.{node_id}"

    documents = []
    if node_text.strip():
        metadata = {
            "type": "text",
            "document_id": document_id,
            "document_name": document_name,
            "section_id": node_id,
            "section_text": node_title[:200],
            "section_path": node_path,
            "level": level,
        }
        if parent is not None:
            metadata["parent_section"] = parent['id']
            metadata["parent_title"] = parent['title'][:100]
        documents.append(Document(text=node_text, metadata=metadata))

    # Children are visited even when the node itself has no text.
    if children_key is not None:
        this_node = {'id': node_id, 'title': node_title, 'path': node_path}
        for child in node.get(children_key) or []:
            documents.extend(
                _collect_level_documents(
                    child, depth + 1, document_id, document_name, this_node
                )
            )
    return documents


def extract_text_from_json(data, document_id, document_name):
    """Flatten the ``sections`` tree of a parsed JSON file into documents.

    Up to four nesting levels are read (sections, subsections,
    sub_subsections, sub_sub_subsections). Every node with non-blank text
    becomes one ``Document`` whose metadata records its id, a short title, a
    dotted ``section_path``, its ``level`` and, below the top level, the
    parent's id and title.

    Args:
        data: Parsed JSON content; expected to be a dict with a ``sections``
            list. Anything else yields no documents.
        document_id: Identifier stored in every document's metadata.
        document_name: Name stored in every document's metadata.

    Returns:
        A list of ``Document`` objects in document order.
    """
    documents = []
    if not isinstance(data, dict):
        return documents

    for section in data.get('sections') or []:
        documents.extend(
            _collect_level_documents(section, 0, document_id, document_name)
        )
    return documents


def _documents_from_json_data(json_data):
    """Read ``document_metadata`` from parsed JSON and extract its documents."""
    document_metadata = json_data.get('document_metadata', {})
    document_id = document_metadata.get('document_id', 'unknown')
    document_name = document_metadata.get('document_name', 'unknown')
    return extract_text_from_json(json_data, document_id, document_name)


def load_json_documents(repo_id, hf_token, json_files_dir, download_dir):
    """Download JSON documents (plain or zipped) and return them chunked.

    Files of the dataset repo whose path starts with ``json_files_dir`` and
    ends with ``.zip`` or ``.json`` are downloaded into ``download_dir``.
    A failure on a single file is logged and that file is skipped.

    Args:
        repo_id: Hugging Face dataset repository id.
        hf_token: Access token passed to the Hub calls.
        json_files_dir: Path prefix of the files to load.
        download_dir: Local directory for downloaded files.

    Returns:
        The ``(chunked_documents, chunk_info)`` tuple produced by
        ``process_documents_with_chunking``, or ``([], [])`` if the overall
        load fails.
    """
    log_message("Начинаю загрузку JSON документов")

    try:
        files = list_repo_files(
            repo_id=repo_id, repo_type="dataset", token=hf_token
        )
        in_scope = [f for f in files if f.startswith(json_files_dir)]
        zip_files = [f for f in in_scope if f.endswith('.zip')]
        json_files = [f for f in in_scope if f.endswith('.json')]

        log_message(f"Найдено {len(zip_files)} ZIP файлов и {len(json_files)} прямых JSON файлов")

        all_documents = []

        for zip_file_path in zip_files:
            try:
                log_message(f"Загружаю ZIP архив: {zip_file_path}")
                local_zip_path = hf_hub_download(
                    repo_id=repo_id,
                    filename=zip_file_path,
                    local_dir=download_dir,
                    repo_type="dataset",
                    token=hf_token,
                )
                all_documents.extend(
                    extract_zip_and_process_json(local_zip_path)
                )
            except Exception as e:
                # Best effort: one broken archive must not stop the others.
                log_message(f"Ошибка обработки ZIP файла {zip_file_path}: {str(e)}")
                continue

        for file_path in json_files:
            try:
                log_message(f"Обрабатываю прямой JSON файл: {file_path}")
                local_path = hf_hub_download(
                    repo_id=repo_id,
                    filename=file_path,
                    local_dir=download_dir,
                    repo_type="dataset",
                    token=hf_token,
                )
                with open(local_path, 'r', encoding='utf-8') as f:
                    json_data = json.load(f)

                documents = _documents_from_json_data(json_data)
                all_documents.extend(documents)
                log_message(f"Извлечено {len(documents)} документов из {file_path}")
            except Exception as e:
                # Best effort: one broken file must not stop the others.
                log_message(f"Ошибка обработки файла {file_path}: {str(e)}")
                continue

        chunked_documents, chunk_info = process_documents_with_chunking(
            all_documents
        )

        log_message(f"Всего создано {len(all_documents)} исходных документов")
        log_message(f"После chunking получено {len(chunked_documents)} чанков")

        return chunked_documents, chunk_info

    except Exception as e:
        log_message(f"Ошибка загрузки JSON документов: {str(e)}")
        return [], []


def extract_section_title(section_text):
    """Derive a short title from the first line of a section's text.

    The first non-blank-stripped line is returned as is when it is shorter
    than 200 characters and does not end with a period. Otherwise the text
    before the first period is returned; if the line contains no period it is
    truncated to 100 characters plus "...".

    NOTE(review): splitting on every "." means a numbered heading that ends
    with a period, such as "1.2. Title.", yields "1" - confirm whether that is
    acceptable for the data in use.

    Args:
        section_text: Section text; ``None`` or blank text yields ``""``.

    Returns:
        The derived title string (possibly empty).
    """
    if not section_text or not section_text.strip():
        return ""

    first_line = section_text.strip().split('\n')[0].strip()

    if len(first_line) < 200 and not first_line.endswith('.'):
        return first_line

    # Otherwise fall back to the first "sentence" of the line.
    sentences = first_line.split('.')
    if len(sentences) > 1:
        return sentences[0].strip()

    if len(first_line) > 100:
        return first_line[:100] + "..."
    return first_line


def extract_zip_and_process_json(zip_path):
    """Extract documents from every JSON file inside a ZIP archive.

    Entries whose name starts with ``__MACOSX`` are ignored. A failure on one
    entry is logged and that entry is skipped; a failure opening the archive
    is logged and whatever was collected so far is returned.

    Args:
        zip_path: Path of the local ZIP archive.

    Returns:
        A list of ``Document`` objects (possibly empty).
    """
    documents = []

    try:
        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            json_files = [
                name for name in zip_ref.namelist()
                if name.endswith('.json') and not name.startswith('__MACOSX')
            ]

            log_message(f"Найдено {len(json_files)} JSON файлов в архиве")

            for json_file in json_files:
                try:
                    log_message(f"Обрабатываю файл из архива: {json_file}")

                    with zip_ref.open(json_file) as f:
                        json_data = json.load(f)

                    docs = _documents_from_json_data(json_data)
                    documents.extend(docs)
                    log_message(f"Извлечено {len(docs)} документов из {json_file}")
                except Exception as e:
                    # Best effort: keep going with the remaining entries.
                    log_message(f"Ошибка обработки файла {json_file}: {str(e)}")
                    continue

    except Exception as e:
        log_message(f"Ошибка извлечения ZIP архива {zip_path}: {str(e)}")

    return documents


def _format_table_row(row):
    """Render one table row as text, or return None for unsupported rows."""
    if isinstance(row, dict):
        return " | ".join([f"{k}: {v}" for k, v in row.items()])
    if isinstance(row, (list, tuple)):
        # Positional rows have no column names; keep the cell values at least.
        return " | ".join(str(cell) for cell in row)
    return None


def table_to_document(table_data, document_id=None):
    """Convert one table description into a list of documents.

    Args:
        table_data: Dict with optional ``table_number``, ``table_title``,
            ``section``, ``document_id``/``document`` and ``data`` (a list of
            rows). Any non-dict value yields an empty list.
        document_id: Overrides the document id found in ``table_data`` when
            truthy.

    Returns:
        A list with a single ``Document`` of type ``"table"`` (or an empty
        list). When ``data`` is a list, the text contains one line per dict or
        list/tuple row and the metadata includes ``total_rows``; otherwise the
        text is just the table header.
    """
    if not isinstance(table_data, dict):
        return []

    doc_id = document_id or table_data.get(
        'document_id', table_data.get('document', 'Неизвестно')
    )
    table_num = table_data.get('table_number', 'Неизвестно')
    table_title = table_data.get('table_title', 'Неизвестно')
    section = table_data.get('section', 'Неизвестно')

    header_content = f"Таблица: {table_num}\nНазвание: {table_title}\nДокумент: {doc_id}\nРаздел: {section}\n"

    metadata = {
        "type": "table",
        "table_number": table_num,
        "table_title": table_title,
        "document_id": doc_id,
        "section": section,
        "section_id": section,
    }

    rows = table_data.get('data')
    if isinstance(rows, list):
        parts = [header_content, "\nДанные таблицы:\n"]
        for row_idx, row in enumerate(rows):
            row_text = _format_table_row(row)
            if row_text is not None:
                parts.append(f"Строка {row_idx + 1}: {row_text}\n")
        text = "".join(parts)
        metadata["total_rows"] = len(rows)
    else:
        text = header_content

    return [Document(text=text, metadata=metadata)]


def load_table_data(repo_id, hf_token, table_data_dir, download_dir=''):
    """Download table JSON files and convert them into table documents.

    Each file may be a dict describing one table, a dict with a ``sheets``
    list of tables, or a list of tables. A failure on a single file is logged
    and that file is skipped.

    Args:
        repo_id: Hugging Face dataset repository id.
        hf_token: Access token passed to the Hub calls.
        table_data_dir: Path prefix of the ``.json`` files to load.
        download_dir: Local directory for downloaded files; the default ``''``
            keeps the previous hard-coded behavior.

    Returns:
        A list of table ``Document`` objects, or ``[]`` if the overall load
        fails.
    """
    log_message("Начинаю загрузку табличных данных")

    try:
        files = list_repo_files(
            repo_id=repo_id, repo_type="dataset", token=hf_token
        )
        table_files = [
            name for name in files
            if name.startswith(table_data_dir) and name.endswith('.json')
        ]

        log_message(f"Найдено {len(table_files)} JSON файлов с таблицами")

        table_documents = []
        for file_path in table_files:
            try:
                log_message(f"Обрабатываю файл: {file_path}")
                local_path = hf_hub_download(
                    repo_id=repo_id,
                    filename=file_path,
                    local_dir=download_dir,
                    repo_type="dataset",
                    token=hf_token,
                )

                with open(local_path, 'r', encoding='utf-8') as f:
                    table_data = json.load(f)

                # table_to_document returns a list, hence extend (not append).
                if isinstance(table_data, dict):
                    document_id = table_data.get('document', 'unknown')

                    if 'sheets' in table_data:
                        for sheet in table_data['sheets']:
                            # Propagate the file-level document id to the sheet.
                            sheet['document'] = document_id
                            table_documents.extend(
                                table_to_document(sheet, document_id)
                            )
                    else:
                        table_documents.extend(
                            table_to_document(table_data, document_id)
                        )
                elif isinstance(table_data, list):
                    for table_json in table_data:
                        table_documents.extend(table_to_document(table_json))

            except Exception as e:
                # Best effort: one broken file must not stop the others.
                log_message(f"Ошибка обработки файла {file_path}: {str(e)}")
                continue

        log_message(f"Создано {len(table_documents)} документов из таблиц")
        return table_documents

    except Exception as e:
        log_message(f"Ошибка загрузки табличных данных: {str(e)}")
        return []


def _image_row_to_document(row):
    """Build an image ``Document`` from one row of the image CSV."""
    section_value = row.get('Раздел документа', 'Неизвестно')

    # Column names are used exactly as they appear in the CSV header,
    # including the (sic) typo in 'Описание изображение'.
    content = "".join([
        f"Изображение: {row.get('№ Изображения', 'Неизвестно')}\n",
        f"Название: {row.get('Название изображения', 'Неизвестно')}\n",
        f"Описание: {row.get('Описание изображение', 'Неизвестно')}\n",
        f"Документ: {row.get('Обозначение документа', 'Неизвестно')}\n",
        f"Раздел: {section_value}\n",
        f"Файл: {row.get('Файл изображения', 'Неизвестно')}\n",
    ])

    return Document(
        text=content,
        metadata={
            "type": "image",
            "image_number": str(row.get('№ Изображения', 'unknown')),
            "image_title": str(row.get('Название изображения', 'unknown')),
            "image_description": str(row.get('Описание изображение', 'unknown')),
            "document_id": str(row.get('Обозначение документа', 'unknown')),
            "file_path": str(row.get('Файл изображения', 'unknown')),
            "section": str(section_value),
            "section_id": str(section_value),
        },
    )


def load_image_data(repo_id, hf_token, image_data_dir, download_dir=''):
    """Download image-description CSV files and convert rows into documents.

    A failure on a single file is logged and that file is skipped.

    Args:
        repo_id: Hugging Face dataset repository id.
        hf_token: Access token passed to the Hub calls.
        image_data_dir: Path prefix of the ``.csv`` files to load.
        download_dir: Local directory for downloaded files; the default ``''``
            keeps the previous hard-coded behavior.

    Returns:
        A list of image ``Document`` objects (one per CSV row), or ``[]`` if
        the overall load fails.
    """
    log_message("Начинаю загрузку данных изображений")

    try:
        files = list_repo_files(
            repo_id=repo_id, repo_type="dataset", token=hf_token
        )
        image_files = [
            name for name in files
            if name.startswith(image_data_dir) and name.endswith('.csv')
        ]

        log_message(f"Найдено {len(image_files)} CSV файлов с изображениями")

        image_documents = []
        for file_path in image_files:
            try:
                log_message(f"Обрабатываю файл изображений: {file_path}")
                local_path = hf_hub_download(
                    repo_id=repo_id,
                    filename=file_path,
                    local_dir=download_dir,
                    repo_type="dataset",
                    token=hf_token,
                )

                df = pd.read_csv(local_path)
                log_message(f"Загружено {len(df)} записей изображений из файла {file_path}")

                for _, row in df.iterrows():
                    image_documents.append(_image_row_to_document(row))

            except Exception as e:
                # Best effort: one broken file must not stop the others.
                log_message(f"Ошибка обработки файла {file_path}: {str(e)}")
                continue

        log_message(f"Создано {len(image_documents)} документов из изображений")
        return image_documents

    except Exception as e:
        log_message(f"Ошибка загрузки данных изображений: {str(e)}")
        return []


def _pick_text_column(columns):
    """Choose the CSV column holding chunk text.

    The first column whose lower-cased name contains "text", "content" or
    "chunk" wins, skipping the metadata columns ``chunk_id`` and
    ``document_id`` (otherwise "chunk_id" would match the "chunk" marker and
    ids would be indexed as text). Falls back to the first column.
    """
    for col in columns:
        name = str(col).lower()  # labels are not guaranteed to be strings
        if name in _CSV_METADATA_COLUMNS:
            continue
        if any(marker in name for marker in _CSV_TEXT_MARKERS):
            return col
    return columns[0]


def load_csv_chunks(repo_id, hf_token, chunks_filename, download_dir):
    """Download a CSV of pre-built chunks and convert rows into documents.

    Args:
        repo_id: Hugging Face dataset repository id.
        hf_token: Access token passed to the Hub call.
        chunks_filename: Path of the CSV file inside the repository.
        download_dir: Local directory for the downloaded file.

    Returns:
        A tuple ``(documents, chunks_df)``: one text ``Document`` per row
        (metadata ``chunk_id`` falls back to the row position and
        ``document_id`` to ``'unknown'``) and the loaded DataFrame. On any
        failure the error is logged and ``([], None)`` is returned.
    """
    log_message("Загружаю данные чанков из CSV")

    try:
        chunks_csv_path = hf_hub_download(
            repo_id=repo_id,
            filename=chunks_filename,
            local_dir=download_dir,
            repo_type="dataset",
            token=hf_token,
        )

        chunks_df = pd.read_csv(chunks_csv_path)
        log_message(f"Загружено {len(chunks_df)} чанков из CSV")

        text_column = _pick_text_column(chunks_df.columns)
        log_message(f"Использую колонку: {text_column}")

        documents = []
        for position, (_, row) in enumerate(chunks_df.iterrows()):
            documents.append(
                Document(
                    text=str(row[text_column]),
                    metadata={
                        "chunk_id": row.get('chunk_id', position),
                        "document_id": row.get('document_id', 'unknown'),
                        "type": "text",
                    },
                )
            )

        log_message(f"Создано {len(documents)} текстовых документов из CSV")
        return documents, chunks_df

    except Exception as e:
        log_message(f"Ошибка загрузки CSV данных: {str(e)}")
        return [], None