Spaces:
Sleeping
Sleeping
| import json | |
| import zipfile | |
| import pandas as pd | |
| from huggingface_hub import hf_hub_download, list_repo_files | |
| from llama_index.core import Document | |
| from my_logging import log_message | |
| from llama_index.core.text_splitter import SentenceSplitter | |
| from config import CHUNK_SIZE, CHUNK_OVERLAP | |
| from table_prep import table_to_document, load_table_data | |
def chunk_document(doc, chunk_size=None, chunk_overlap=None):
    """Split a single Document into sentence-aware text chunks.

    Args:
        doc: source llama_index Document; its ``.text`` is split and its
            ``.metadata`` is copied into every resulting chunk.
        chunk_size: maximum chunk size; defaults to ``config.CHUNK_SIZE``.
        chunk_overlap: overlap between consecutive chunks; defaults to
            ``config.CHUNK_OVERLAP``.

    Returns:
        list[Document]: one Document per chunk, carrying the original
        metadata plus ``chunk_id``, ``total_chunks``, ``chunk_size`` and
        ``original_doc_id`` bookkeeping fields.
    """
    if chunk_size is None:
        chunk_size = CHUNK_SIZE
    if chunk_overlap is None:
        chunk_overlap = CHUNK_OVERLAP
    text_splitter = SentenceSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        separator=" "
    )
    text_chunks = text_splitter.split_text(doc.text)
    chunked_docs = []
    for i, chunk_text in enumerate(text_chunks):
        chunk_metadata = doc.metadata.copy()
        chunk_metadata.update({
            "chunk_id": i,
            "total_chunks": len(text_chunks),
            "chunk_size": len(chunk_text),
            # getattr with a default replaces the hasattr/ternary dance
            "original_doc_id": getattr(doc, "id_", None)
        })
        chunked_docs.append(Document(
            text=chunk_text,
            metadata=chunk_metadata
        ))
    return chunked_docs
def _preview(text):
    """Return the first 200 characters of *text*, with an ellipsis if truncated."""
    return text[:200] + "..." if len(text) > 200 else text


def process_documents_with_chunking(documents):
    """Route each Document through chunking according to its ``type`` metadata.

    Tables are always passed through whole (either pre-chunked upstream,
    flagged by ``is_chunked``, or kept intact).  Images and plain text are
    split with ``chunk_document()`` when their text exceeds ``CHUNK_SIZE``.

    Args:
        documents: list of llama_index Documents whose metadata carries
            ``type`` ('table' / 'image' / anything else treated as text).

    Returns:
        tuple(list, list): (all resulting documents, per-chunk info dicts
        used for reporting and inspection).
    """
    all_chunked_docs = []
    chunk_info = []
    table_count = 0            # tables passed through whole
    table_chunks_count = 0     # tables already chunked upstream
    image_count = 0            # total image documents seen
    chunked_image_count = 0    # image documents that had to be split
    image_chunks_count = 0     # chunks produced from oversized images
    text_chunks_count = 0      # chunks produced from oversized text docs
    for doc in documents:
        doc_type = doc.metadata.get('type', 'text')
        is_already_chunked = doc.metadata.get('is_chunked', False)
        if doc_type == 'table':
            # Tables are never re-chunked here, only counted and recorded.
            all_chunked_docs.append(doc)
            if is_already_chunked:
                table_chunks_count += 1
                chunk_info.append({
                    'document_id': doc.metadata.get('document_id', 'unknown'),
                    'section_id': doc.metadata.get('section_id', 'unknown'),
                    'chunk_id': doc.metadata.get('chunk_id', 0),
                    'total_chunks': doc.metadata.get('total_chunks', 1),
                    'chunk_size': len(doc.text),
                    'chunk_preview': _preview(doc.text),
                    'type': 'table',
                    'table_number': doc.metadata.get('table_number', 'unknown')
                })
            else:
                table_count += 1
                chunk_info.append({
                    'document_id': doc.metadata.get('document_id', 'unknown'),
                    'section_id': doc.metadata.get('section_id', 'unknown'),
                    'chunk_id': 0,
                    'chunk_size': len(doc.text),
                    'chunk_preview': _preview(doc.text),
                    'type': 'table',
                    'table_number': doc.metadata.get('table_number', 'unknown')
                })
        elif doc_type == 'image':
            image_count += 1
            doc_size = len(doc.text)
            if doc_size > CHUNK_SIZE:
                log_message(f"📷 CHUNKING: Изображение {doc.metadata.get('image_number', 'unknown')} | "
                            f"Размер: {doc_size} > {CHUNK_SIZE}")
                chunked_docs = chunk_document(doc)
                chunked_image_count += 1
                image_chunks_count += len(chunked_docs)
                all_chunked_docs.extend(chunked_docs)
                log_message(f" ✂️ Разделено на {len(chunked_docs)} чанков")
                for i, chunk_doc in enumerate(chunked_docs):
                    chunk_info.append({
                        'document_id': chunk_doc.metadata.get('document_id', 'unknown'),
                        'section_id': chunk_doc.metadata.get('section_id', 'unknown'),
                        'chunk_id': i,
                        'chunk_size': len(chunk_doc.text),
                        'chunk_preview': _preview(chunk_doc.text),
                        'type': 'image',
                        'image_number': chunk_doc.metadata.get('image_number', 'unknown')
                    })
            else:
                all_chunked_docs.append(doc)
                chunk_info.append({
                    'document_id': doc.metadata.get('document_id', 'unknown'),
                    'section_id': doc.metadata.get('section_id', 'unknown'),
                    'chunk_id': 0,
                    'chunk_size': doc_size,
                    'chunk_preview': _preview(doc.text),
                    'type': 'image',
                    'image_number': doc.metadata.get('image_number', 'unknown')
                })
        else:
            doc_size = len(doc.text)
            if doc_size > CHUNK_SIZE:
                log_message(f"📝 CHUNKING: Текст из '{doc.metadata.get('document_id', 'unknown')}' | "
                            f"Размер: {doc_size} > {CHUNK_SIZE}")
                chunked_docs = chunk_document(doc)
                text_chunks_count += len(chunked_docs)
                all_chunked_docs.extend(chunked_docs)
                log_message(f" ✂️ Разделен на {len(chunked_docs)} чанков")
                for i, chunk_doc in enumerate(chunked_docs):
                    chunk_info.append({
                        'document_id': chunk_doc.metadata.get('document_id', 'unknown'),
                        'section_id': chunk_doc.metadata.get('section_id', 'unknown'),
                        'chunk_id': i,
                        'chunk_size': len(chunk_doc.text),
                        'chunk_preview': _preview(chunk_doc.text),
                        'type': 'text'
                    })
            else:
                all_chunked_docs.append(doc)
                chunk_info.append({
                    'document_id': doc.metadata.get('document_id', 'unknown'),
                    'section_id': doc.metadata.get('section_id', 'unknown'),
                    'chunk_id': 0,
                    'chunk_size': doc_size,
                    'chunk_preview': _preview(doc.text),
                    'type': 'text'
                })
    log_message(f"\n{'='*60}")
    log_message(f"ИТОГО ОБРАБОТАНО ДОКУМЕНТОВ:")
    log_message(f" • Таблицы (целые): {table_count}")
    log_message(f" • Таблицы (чанки): {table_chunks_count}")
    # BUG FIX: the old code subtracted a boolean (image_chunks_count > 0),
    # i.e. at most 1, so the "whole images" number was wrong whenever more
    # than one image was split.  Subtract the actual count of split images.
    log_message(f" • Изображения (целые): {image_count - chunked_image_count}")
    log_message(f" • Изображения (чанки): {image_chunks_count}")
    log_message(f" • Текстовые чанки: {text_chunks_count}")
    log_message(f" • Всего документов: {len(all_chunked_docs)}")
    log_message(f"{'='*60}\n")
    return all_chunked_docs, chunk_info
# One entry per nesting depth: (children_key, id_key, text_key, level_name).
# The JSON schema uses a distinct key prefix at every level, so the four
# formerly copy-pasted loops collapse into one recursive walk over this table.
_SECTION_LEVELS = [
    ("sections", "section_id", "section_text", "section"),
    ("subsections", "subsection_id", "subsection_text", "subsection"),
    ("sub_subsections", "sub_subsection_id", "sub_subsection_text", "sub_subsection"),
    ("sub_sub_subsections", "sub_sub_subsection_id", "sub_sub_subsection_text", "sub_sub_subsection"),
]


def _collect_section_documents(node, depth, parent_path, parent_id, parent_title,
                               document_id, document_name, documents):
    """Depth-first walk of one nesting level; appends Documents in place.

    A Document is emitted for every node whose text is non-blank; children
    are always visited, even when the parent's own text is empty.
    """
    if depth >= len(_SECTION_LEVELS):
        return
    children_key, id_key, text_key, level_name = _SECTION_LEVELS[depth]
    for child in node.get(children_key, []):
        child_id = child.get(id_key, 'Unknown')
        child_text = child.get(text_key, '')
        child_title = extract_section_title(child_text)
        # Top-level sections start the dotted path; deeper levels extend it.
        child_path = f"{child_id}" if depth == 0 else f"{parent_path}.{child_id}"
        if child_text.strip():
            metadata = {
                "type": "text",
                "document_id": document_id,
                "document_name": document_name,
                "section_id": child_id,
                "section_text": child_title[:200],
                "section_path": child_path,
                "level": level_name,
            }
            if depth > 0:
                # Only nested levels carry parent linkage, as before.
                metadata["parent_section"] = parent_id
                metadata["parent_title"] = parent_title[:100]
            documents.append(Document(text=child_text, metadata=metadata))
        _collect_section_documents(child, depth + 1, child_path, child_id,
                                   child_title, document_id, document_name,
                                   documents)


def extract_text_from_json(data, document_id, document_name):
    """Flatten the hierarchical section JSON into a flat list of Documents.

    Args:
        data: parsed JSON with an optional 'sections' tree (up to four
            nesting levels: sections / subsections / sub_subsections /
            sub_sub_subsections).
        document_id: identifier stored in every Document's metadata.
        document_name: human-readable name stored in every Document's metadata.

    Returns:
        list[Document]: one Document per non-empty (sub)section, in the
        same depth-first order as the source tree.
    """
    documents = []
    _collect_section_documents(data, 0, "", None, "", document_id,
                               document_name, documents)
    return documents
def load_json_documents(repo_id, hf_token, json_files_dir, download_dir):
    """Download JSON documents (ZIP archives and direct files) from a HF
    dataset repo, convert them to Documents, and run them through chunking.

    Args:
        repo_id: Hugging Face dataset repo id.
        hf_token: access token for the repo.
        json_files_dir: path prefix inside the repo to scan.
        download_dir: local directory for downloaded files.

    Returns:
        tuple(list, list): (chunked documents, chunk info records);
        ([], []) on any top-level failure.
    """
    log_message("Начинаю загрузку JSON документов")
    try:
        repo_files = list_repo_files(repo_id=repo_id, repo_type="dataset", token=hf_token)
        in_dir = [f for f in repo_files if f.startswith(json_files_dir)]
        archives = [f for f in in_dir if f.endswith('.zip')]
        plain_jsons = [f for f in in_dir if f.endswith('.json')]
        log_message(f"Найдено {len(archives)} ZIP файлов и {len(plain_jsons)} прямых JSON файлов")
        collected = []
        # Pass 1: ZIP archives, each possibly containing many JSON files.
        for archive_path in archives:
            try:
                log_message(f"Загружаю ZIP архив: {archive_path}")
                local_zip = hf_hub_download(
                    repo_id=repo_id,
                    filename=archive_path,
                    local_dir=download_dir,
                    repo_type="dataset",
                    token=hf_token
                )
                extracted = extract_zip_and_process_json(local_zip)
                collected.extend(extracted)
                log_message(f"Извлечено {len(extracted)} документов из ZIP архива {archive_path}")
            except Exception as e:
                log_message(f"Ошибка обработки ZIP файла {archive_path}: {str(e)}")
                continue
        # Pass 2: standalone JSON files.
        for json_path in plain_jsons:
            try:
                log_message(f"Обрабатываю прямой JSON файл: {json_path}")
                local_file = hf_hub_download(
                    repo_id=repo_id,
                    filename=json_path,
                    local_dir=download_dir,
                    repo_type="dataset",
                    token=hf_token
                )
                with open(local_file, 'r', encoding='utf-8') as fh:
                    payload = json.load(fh)
                meta = payload.get('document_metadata', {})
                docs = extract_text_from_json(
                    payload,
                    meta.get('document_id', 'unknown'),
                    meta.get('document_name', 'unknown')
                )
                collected.extend(docs)
                log_message(f"Извлечено {len(docs)} документов из {json_path}")
            except Exception as e:
                log_message(f"Ошибка обработки файла {json_path}: {str(e)}")
                continue
        log_message(f"Всего создано {len(collected)} исходных документов из JSON файлов")
        # Chunk everything that was collected before returning it.
        chunked_documents, chunk_info = process_documents_with_chunking(collected)
        log_message(f"После chunking получено {len(chunked_documents)} чанков из JSON данных")
        return chunked_documents, chunk_info
    except Exception as e:
        log_message(f"Ошибка загрузки JSON документов: {str(e)}")
        return [], []
def extract_section_title(section_text):
    """Heuristically pull a short title out of a section's text.

    The first line is used verbatim when it looks like a heading (shorter
    than 200 chars and no trailing period); otherwise the first sentence
    of that line is used, falling back to a 100-char truncated prefix.
    """
    stripped = section_text.strip()
    if not stripped:
        return ""
    heading = stripped.split('\n')[0].strip()
    if len(heading) < 200 and not heading.endswith('.'):
        return heading
    # Fall back to the first sentence of the line, if there is one.
    head, sep, _ = heading.partition('.')
    if sep:
        return head.strip()
    return heading[:100] + "..." if len(heading) > 100 else heading
def extract_zip_and_process_json(zip_path):
    """Open a ZIP archive and convert every JSON file inside into Documents.

    Per-file parse errors are logged and skipped; an unreadable archive
    yields an empty list.
    """
    collected = []
    try:
        with zipfile.ZipFile(zip_path, 'r') as archive:
            # Skip macOS resource-fork entries that sneak into archives.
            members = [
                name for name in archive.namelist()
                if name.endswith('.json') and not name.startswith('__MACOSX')
            ]
            log_message(f"Найдено {len(members)} JSON файлов в архиве")
            for member in members:
                try:
                    log_message(f"Обрабатываю файл из архива: {member}")
                    with archive.open(member) as fh:
                        payload = json.load(fh)
                    meta = payload.get('document_metadata', {})
                    extracted = extract_text_from_json(
                        payload,
                        meta.get('document_id', 'unknown'),
                        meta.get('document_name', 'unknown')
                    )
                    collected.extend(extracted)
                    log_message(f"Извлечено {len(extracted)} документов из {member}")
                except Exception as e:
                    log_message(f"Ошибка обработки файла {member}: {str(e)}")
                    continue
    except Exception as e:
        log_message(f"Ошибка извлечения ZIP архива {zip_path}: {str(e)}")
    return collected
def load_image_data(repo_id, hf_token, image_data_dir):
    """Download image-description CSV files from a HF dataset repo and turn
    each row into an 'image' Document.

    Args:
        repo_id: Hugging Face dataset repo id.
        hf_token: access token for the repo.
        image_data_dir: path prefix inside the repo where CSVs live.

    Returns:
        list[Document]: one Document per CSV row; [] on top-level failure.
    """
    log_message("Начинаю загрузку данных изображений")
    csv_paths = []
    try:
        for repo_file in list_repo_files(repo_id=repo_id, repo_type="dataset", token=hf_token):
            if repo_file.startswith(image_data_dir) and repo_file.endswith('.csv'):
                csv_paths.append(repo_file)
        log_message(f"Найдено {len(csv_paths)} CSV файлов с изображениями")
        docs = []
        for csv_path in csv_paths:
            try:
                log_message(f"Обрабатываю файл изображений: {csv_path}")
                local_csv = hf_hub_download(
                    repo_id=repo_id,
                    filename=csv_path,
                    # NOTE(review): empty local_dir looks odd compared to the
                    # other loaders, which pass a download dir — confirm intended.
                    local_dir='',
                    repo_type="dataset",
                    token=hf_token
                )
                frame = pd.read_csv(local_csv)
                log_message(f"Загружено {len(frame)} записей изображений из файла {csv_path}")
                # Column names below match the CSV headers exactly, including
                # the known typo in 'Описание изображение'.
                for _, record in frame.iterrows():
                    section_value = record.get('Раздел документа', 'Неизвестно')
                    body = (
                        f"Изображение: {record.get('№ Изображения', 'Неизвестно')}\n"
                        f"Название: {record.get('Название изображения', 'Неизвестно')}\n"
                        f"Описание: {record.get('Описание изображение', 'Неизвестно')}\n"
                        f"Документ: {record.get('Обозначение документа', 'Неизвестно')}\n"
                        f"Раздел: {section_value}\n"
                        f"Файл: {record.get('Файл изображения', 'Неизвестно')}\n"
                    )
                    docs.append(Document(
                        text=body,
                        metadata={
                            "type": "image",
                            "image_number": str(record.get('№ Изображения', 'unknown')),
                            "image_title": str(record.get('Название изображения', 'unknown')),
                            "image_description": str(record.get('Описание изображение', 'unknown')),
                            "document_id": str(record.get('Обозначение документа', 'unknown')),
                            "file_path": str(record.get('Файл изображения', 'unknown')),
                            "section": str(section_value),
                            "section_id": str(section_value)
                        }
                    ))
            except Exception as e:
                log_message(f"Ошибка обработки файла {csv_path}: {str(e)}")
                continue
        log_message(f"Создано {len(docs)} документов из изображений")
        return docs
    except Exception as e:
        log_message(f"Ошибка загрузки данных изображений: {str(e)}")
        return []
def load_csv_chunks(repo_id, hf_token, chunks_filename, download_dir):
    """Download a pre-chunked CSV from a HF dataset repo and wrap every row
    in a text Document.

    The text column is auto-detected by name ('text'/'content'/'chunk'
    substring, case-insensitive), falling back to the first column.

    Returns:
        tuple(list, DataFrame | None): documents and the raw dataframe;
        ([], None) on failure.
    """
    log_message("Загружаю данные чанков из CSV")
    try:
        local_csv = hf_hub_download(
            repo_id=repo_id,
            filename=chunks_filename,
            local_dir=download_dir,
            repo_type="dataset",
            token=hf_token
        )
        frame = pd.read_csv(local_csv)
        log_message(f"Загружено {len(frame)} чанков из CSV")
        chosen = None
        for column in frame.columns:
            lowered = column.lower()
            if 'text' in lowered or 'content' in lowered or 'chunk' in lowered:
                chosen = column
                break
        if chosen is None:
            chosen = frame.columns[0]
        log_message(f"Использую колонку: {chosen}")
        docs = []
        for position, (_, record) in enumerate(frame.iterrows()):
            docs.append(Document(
                text=str(record[chosen]),
                metadata={
                    # Row position doubles as the chunk id when absent.
                    "chunk_id": record.get('chunk_id', position),
                    "document_id": record.get('document_id', 'unknown'),
                    "type": "text"
                }
            ))
        log_message(f"Создано {len(docs)} текстовых документов из CSV")
        return docs, frame
    except Exception as e:
        log_message(f"Ошибка загрузки CSV данных: {str(e)}")
        return [], None