# NOTE: the original scrape carried HuggingFace Spaces page residue here
# ("Spaces: / Sleeping / Sleeping") — non-code artifact, not module source.
| import json | |
| import zipfile | |
| import pandas as pd | |
| from collections import Counter | |
| from huggingface_hub import hf_hub_download, list_repo_files | |
| from llama_index.core import Document | |
| from llama_index.core.text_splitter import SentenceSplitter | |
| from my_logging import log_message | |
| from config import CHUNK_SIZE, CHUNK_OVERLAP | |
| # ============================================================================ | |
| # TEXT CHUNKING | |
| # ============================================================================ | |
def chunk_text_document(doc):
    """Split a text document into overlapping chunks via SentenceSplitter.

    Every chunk carries a copy of the source document's metadata plus
    bookkeeping fields: ``chunk_id``, ``total_chunks`` and ``chunk_size``.

    Returns a list of llama_index Document chunks.
    """
    splitter = SentenceSplitter(
        chunk_size=CHUNK_SIZE,
        chunk_overlap=CHUNK_OVERLAP,
        separator=" ",
    )
    pieces = splitter.split_text(doc.text)
    total = len(pieces)
    result = []
    for idx, piece in enumerate(pieces):
        meta = dict(doc.metadata)
        meta["chunk_id"] = idx
        meta["total_chunks"] = total
        meta["chunk_size"] = len(piece)
        result.append(Document(text=piece, metadata=meta))
    return result
| # ============================================================================ | |
| # TABLE PROCESSING | |
| # ============================================================================ | |
def extract_table_metadata(table_text):
    """Derive a short summary and the most frequent key terms of a table.

    Returns a dict with a one-line Russian summary (word count) and up to
    15 key terms: words longer than 3 characters whose lowercase form is
    not in a small Russian stopword set, ranked by frequency.
    (NOTE(review): every stopword is <= 3 chars, so the length filter
    already excludes them — the stopword check is kept for parity.)
    """
    stopwords = {"и", "в", "на", "по", "с", "для", "из", "при", "а", "как", "или", "но", "к", "от"}
    words = table_text.split()
    counts = Counter(
        w for w in words if len(w) > 3 and w.lower() not in stopwords
    )
    return {
        "summary": f"Таблица содержит {len(words)} слов",
        "key_terms": [term for term, _ in counts.most_common(15)],
    }
def create_table_content(table_data):
    """Render a table dict as a plain-text block with Russian labels.

    Emits table number / title / document / section header lines, an
    optional 'Заголовки:' line, then one 'Строка N: ...' line per dict row
    found in ``table_data['data']`` (empty cell values are omitted;
    non-dict rows are skipped but still consume a row number).
    """
    parts = [
        f"Таблица: {table_data.get('table_number', 'Неизвестно')}\n",
        f"Название: {table_data.get('table_title', 'Неизвестно')}\n",
        f"Документ: {table_data.get('document_id', table_data.get('document', 'Неизвестно'))}\n",
        f"Раздел: {table_data.get('section', 'Неизвестно')}\n",
    ]
    headers = table_data.get('headers', [])
    if headers:
        parts.append(f"\nЗаголовки: {' | '.join(headers)}\n")
    rows = table_data.get('data')
    if isinstance(rows, list):
        parts.append("\nДанные таблицы:\n")
        for idx, row in enumerate(rows, start=1):
            if isinstance(row, dict):
                cells = " | ".join(f"{k}: {v}" for k, v in row.items() if v)
                parts.append(f"Строка {idx}: {cells}\n")
    return "".join(parts)
def chunk_table_by_rows(doc):
    """Split large table into chunks by rows, preserving headers.

    Parses the text layout produced by create_table_content(): everything
    up to and including the 'Данные таблицы:' marker is treated as the
    header and repeated at the top of every chunk; 'Строка ...' lines are
    the data rows distributed across chunks (with a 2-row overlap between
    consecutive chunks).  Falls back to sentence-based splitting when no
    data rows are found.

    Returns a list of Document chunks whose text is prefixed with an
    enrichment header (table number/title and key terms).
    """
    # Key terms from the full table text, attached to every chunk below.
    table_metadata = extract_table_metadata(doc.text)
    table_num = doc.metadata.get('table_number', 'unknown')
    table_title = doc.metadata.get('table_title', 'unknown')
    # Parse table structure
    lines = doc.text.strip().split('\n')
    # Separate header and data rows
    table_header_lines = []
    data_rows = []
    in_data = False
    for line in lines:
        if line.startswith('Данные таблицы:'):
            in_data = True
            table_header_lines.append(line)
        elif in_data and line.startswith('Строка'):
            data_rows.append(line)
        elif not in_data:
            table_header_lines.append(line)
    table_header = '\n'.join(table_header_lines) + '\n'
    # If no rows, use standard text splitting
    if not data_rows:
        log_message(f" ⚠️ Таблица {table_num}: нет строк данных, использую стандартное разбиение")
        return chunk_text_document(doc)
    log_message(f" 📋 Таблица {table_num}: найдено {len(data_rows)} строк данных")
    # Row-based chunking
    header_size = len(table_header)
    # Per-chunk row budget: total size minus the repeated header and a
    # 300-char reserve for the enrichment prefix added at the end.
    # NOTE(review): can go negative for very large headers, which would
    # put one row per chunk — confirm CHUNK_SIZE comfortably exceeds
    # typical header sizes.
    available_size = CHUNK_SIZE - header_size - 300  # Reserve space for enrichment
    text_chunks = []
    current_chunk_rows = []
    current_size = 0
    for row in data_rows:
        row_size = len(row) + 1  # +1 for the joining newline
        # If adding this row exceeds limit, create chunk
        if current_size + row_size > available_size and current_chunk_rows:
            chunk_text = table_header + '\n'.join(current_chunk_rows)
            text_chunks.append(chunk_text)
            log_message(f" ✂️ Создан чанк: {len(current_chunk_rows)} строк, {len(chunk_text)} символов")
            # Keep last 2 rows for overlap
            overlap_count = min(2, len(current_chunk_rows))
            current_chunk_rows = current_chunk_rows[-overlap_count:]
            current_size = sum(len(r) + 1 for r in current_chunk_rows)
        current_chunk_rows.append(row)
        current_size += row_size
    # Final chunk
    if current_chunk_rows:
        chunk_text = table_header + '\n'.join(current_chunk_rows)
        text_chunks.append(chunk_text)
        log_message(f" ✂️ Последний чанк: {len(current_chunk_rows)} строк, {len(chunk_text)} символов")
    log_message(f" 📊 Таблица {table_num} разделена на {len(text_chunks)} чанков")
    # Create enriched chunks with metadata
    chunked_docs = []
    key_terms = table_metadata.get("key_terms", [])
    for i, chunk_text in enumerate(text_chunks):
        chunk_metadata = doc.metadata.copy()
        chunk_metadata.update({
            "chunk_id": i,
            "total_chunks": len(text_chunks),
            "chunk_size": len(chunk_text),
            "is_chunked": True,  # lets downstream stats tell chunked tables apart
            "key_terms": key_terms
        })
        # Add enrichment prefix (first 10 key terms only)
        terms_str = ', '.join(key_terms[:10]) if key_terms else 'нет'
        enriched_text = f"""[Таблица {table_num}: {table_title}]
[Ключевые термины: {terms_str}]
{chunk_text}"""
        chunked_docs.append(Document(text=enriched_text, metadata=chunk_metadata))
    return chunked_docs
def table_to_document(table_data, document_id=None):
    """Convert a table dict into one or more Documents.

    Returns [] for non-dict or row-less input, a single whole-table
    Document when the rendered text fits into CHUNK_SIZE, otherwise the
    row-based chunks from chunk_table_by_rows().
    """
    if not isinstance(table_data, dict):
        log_message(f"⚠️ ПРОПУЩЕНА: table_data не является словарем")
        return []
    table_num = table_data.get('table_number', 'Неизвестно')
    rows = table_data.get('data', [])
    if not rows:
        log_message(f"⚠️ ПРОПУЩЕНА: Таблица {table_num} - нет данных")
        return []
    # Prefer the explicit argument, then the table's own identifiers.
    doc_id = document_id or table_data.get('document_id') or table_data.get('document', 'Неизвестно')
    section = table_data.get('section', 'Неизвестно')
    content = create_table_content(table_data)
    size = len(content)
    base_doc = Document(
        text=content,
        metadata={
            "type": "table",
            "table_number": table_num,
            "table_title": table_data.get('table_title', 'Неизвестно'),
            "document_id": doc_id,
            "section": section,
            "section_id": section,
            "total_rows": len(rows),
            "content_size": size
        }
    )
    if size <= CHUNK_SIZE:
        log_message(f"✓ Таблица {table_num} | Размер: {size} символов | Строк: {len(rows)}")
        return [base_doc]
    log_message(f"📊 CHUNKING: Таблица {table_num} | Размер: {size} > {CHUNK_SIZE}")
    return chunk_table_by_rows(base_doc)
def load_table_data(repo_id, hf_token, table_data_dir):
    """Load all table data from a HuggingFace dataset repo.

    Downloads every '*.json' file under ``table_data_dir`` and converts
    each payload with table_to_document().  A dict payload with a
    'sheets' list is treated as a multi-sheet workbook: sheets are sorted
    by 'table_number' and converted individually.

    Returns a flat list of table Documents; [] on a top-level failure.
    Per-file errors are logged and the file is skipped.
    """
    log_message("=" * 60)
    log_message("ЗАГРУЗКА ТАБЛИЧНЫХ ДАННЫХ")
    log_message("=" * 60)
    try:
        files = list_repo_files(repo_id=repo_id, repo_type="dataset", token=hf_token)
        table_files = [f for f in files if f.startswith(table_data_dir) and f.endswith('.json')]
        log_message(f"Найдено {len(table_files)} JSON файлов с таблицами")
        table_documents = []
        for file_path in table_files:
            try:
                # NOTE(review): local_dir='' (unlike the download_dir used by
                # the JSON/CSV loaders) makes hf_hub_download place files
                # relative to the current working directory — confirm intended.
                local_path = hf_hub_download(
                    repo_id=repo_id,
                    filename=file_path,
                    local_dir='',
                    repo_type="dataset",
                    token=hf_token
                )
                log_message(f"\nОбработка файла: {file_path}")
                with open(local_path, 'r', encoding='utf-8') as f:
                    table_data = json.load(f)
                # Non-dict payloads (e.g. top-level lists) are silently ignored.
                if isinstance(table_data, dict):
                    document_id = table_data.get('document', 'unknown')
                    # Process sheets if present
                    if 'sheets' in table_data:
                        # Sort so chunks are emitted in table-number order.
                        sorted_sheets = sorted(
                            table_data['sheets'],
                            key=lambda sheet: sheet.get('table_number', '')
                        )
                        for sheet in sorted_sheets:
                            # Propagate the parent document id onto each sheet.
                            sheet['document'] = document_id
                            docs_list = table_to_document(sheet, document_id)
                            table_documents.extend(docs_list)
                    else:
                        docs_list = table_to_document(table_data, document_id)
                        table_documents.extend(docs_list)
            except Exception as e:
                log_message(f"❌ ОШИБКА файла {file_path}: {str(e)}")
                continue
        log_message(f"\n{'='*60}")
        log_message(f"Загружено {len(table_documents)} табличных документов")
        log_message("=" * 60)
        return table_documents
    except Exception as e:
        log_message(f"❌ ОШИБКА загрузки таблиц: {str(e)}")
        return []
| # ============================================================================ | |
| # JSON TEXT DOCUMENTS | |
| # ============================================================================ | |
def extract_section_title(section_text):
    """Return a short, clean title derived from a section's first line.

    Heuristics: a first line under 200 chars that does not end with '.'
    is used verbatim; otherwise the first '.'-delimited sentence is used;
    a long sentence-less line is truncated to 100 chars with an ellipsis.
    Empty/whitespace-only input yields "".
    """
    stripped = section_text.strip()
    if not stripped:
        return ""
    head, _, _ = stripped.partition('\n')
    first_line = head.strip()
    if len(first_line) < 200 and not first_line.endswith('.'):
        return first_line
    sentences = first_line.split('.')
    if len(sentences) > 1:
        return sentences[0].strip()
    return first_line[:100] + "..." if len(first_line) > 100 else first_line
def extract_text_from_json(data, document_id, document_name):
    """Build text Documents from a parsed JSON document structure.

    Walks ``data['sections']`` and each section's 'subsections', creating
    one Document per non-empty text body.  Section metadata records the
    id, a truncated title and the section path; subsection metadata
    additionally records the parent section id.
    """
    documents = []
    for section in data.get('sections', []):
        sec_id = section.get('section_id', 'Unknown')
        sec_text = section.get('section_text', '')
        if sec_text.strip():
            documents.append(Document(
                text=sec_text,
                metadata={
                    "type": "text",
                    "document_id": document_id,
                    "document_name": document_name,
                    "section_id": sec_id,
                    "section_text": extract_section_title(sec_text)[:200],
                    "section_path": sec_id,
                    "level": "section"
                }
            ))
        # One level of nesting only: subsections of this section.
        for sub in section.get('subsections', []):
            sub_id = sub.get('subsection_id', 'Unknown')
            sub_text = sub.get('subsection_text', '')
            if not sub_text.strip():
                continue
            documents.append(Document(
                text=sub_text,
                metadata={
                    "type": "text",
                    "document_id": document_id,
                    "document_name": document_name,
                    "section_id": sub_id,
                    "section_text": extract_section_title(sub_text)[:200],
                    "section_path": f"{sec_id}.{sub_id}",
                    "level": "subsection",
                    "parent_section": sec_id
                }
            ))
    return documents
def load_json_documents(repo_id, hf_token, json_files_dir, download_dir):
    """Load and chunk JSON text documents from a HuggingFace dataset repo.

    Handles both ZIP archives of JSON files and standalone JSON files
    under ``json_files_dir``.  Each JSON payload is expected to carry a
    'document_metadata' dict plus a 'sections' tree (see
    extract_text_from_json).  All extracted documents are then chunked via
    process_documents_with_chunking().

    Returns (chunked_documents, chunk_info); ([], []) on a top-level
    failure.  Per-file errors are logged and the file is skipped.
    """
    log_message("=" * 60)
    log_message("ЗАГРУЗКА JSON ДОКУМЕНТОВ")
    log_message("=" * 60)
    try:
        files = list_repo_files(repo_id=repo_id, repo_type="dataset", token=hf_token)
        zip_files = [f for f in files if f.startswith(json_files_dir) and f.endswith('.zip')]
        json_files = [f for f in files if f.startswith(json_files_dir) and f.endswith('.json')]
        log_message(f"Найдено {len(zip_files)} ZIP файлов и {len(json_files)} JSON файлов")
        all_documents = []
        # Process ZIP files
        for zip_file_path in zip_files:
            try:
                log_message(f"Загружаю ZIP: {zip_file_path}")
                local_zip_path = hf_hub_download(
                    repo_id=repo_id,
                    filename=zip_file_path,
                    local_dir=download_dir,
                    repo_type="dataset",
                    token=hf_token
                )
                docs_before = len(all_documents)  # to report this ZIP's own count
                with zipfile.ZipFile(local_zip_path, 'r') as zip_ref:
                    json_files_in_zip = [f for f in zip_ref.namelist()
                                         if f.endswith('.json') and not f.startswith('__MACOSX')]
                    for json_file in json_files_in_zip:
                        with zip_ref.open(json_file) as f:
                            json_data = json.load(f)
                        metadata = json_data.get('document_metadata', {})
                        doc_id = metadata.get('document_id', 'unknown')
                        doc_name = metadata.get('document_name', 'unknown')
                        docs = extract_text_from_json(json_data, doc_id, doc_name)
                        all_documents.extend(docs)
                # BUGFIX: previously logged the cumulative total across all
                # ZIPs instead of the count extracted from this ZIP.
                log_message(f"Извлечено документов из ZIP: {len(all_documents) - docs_before}")
            except Exception as e:
                log_message(f"❌ ОШИБКА ZIP {zip_file_path}: {str(e)}")
                continue
        # Process direct JSON files
        for file_path in json_files:
            try:
                local_path = hf_hub_download(
                    repo_id=repo_id,
                    filename=file_path,
                    local_dir=download_dir,
                    repo_type="dataset",
                    token=hf_token
                )
                with open(local_path, 'r', encoding='utf-8') as f:
                    json_data = json.load(f)
                metadata = json_data.get('document_metadata', {})
                doc_id = metadata.get('document_id', 'unknown')
                doc_name = metadata.get('document_name', 'unknown')
                docs = extract_text_from_json(json_data, doc_id, doc_name)
                all_documents.extend(docs)
            except Exception as e:
                log_message(f"❌ ОШИБКА JSON {file_path}: {str(e)}")
                continue
        log_message(f"Всего загружено {len(all_documents)} текстовых документов")
        # Chunk all documents
        chunked_documents, chunk_info = process_documents_with_chunking(all_documents)
        log_message(f"После chunking: {len(chunked_documents)} чанков")
        log_message("=" * 60)
        return chunked_documents, chunk_info
    except Exception as e:
        log_message(f"❌ ОШИБКА загрузки JSON: {str(e)}")
        return [], []
| # ============================================================================ | |
| # IMAGE DATA | |
| # ============================================================================ | |
def load_image_data(repo_id, hf_token, image_data_dir):
    """Load image metadata documents from CSV files in the dataset repo.

    Each CSV row becomes one Document of type 'image' whose text is a
    formatted description block built from the Russian column names
    ('№ Изображения', 'Название изображения', 'Описание изображение',
    'Обозначение документа', 'Раздел документа').

    Returns a list of Documents; [] on a top-level failure.  Per-file
    errors are logged and the file is skipped.
    """
    log_message("=" * 60)
    log_message("ЗАГРУЗКА ДАННЫХ ИЗОБРАЖЕНИЙ")
    log_message("=" * 60)
    try:
        files = list_repo_files(repo_id=repo_id, repo_type="dataset", token=hf_token)
        image_files = [f for f in files if f.startswith(image_data_dir) and f.endswith('.csv')]
        log_message(f"Найдено {len(image_files)} CSV файлов с изображениями")
        image_documents = []
        for file_path in image_files:
            try:
                # NOTE(review): local_dir='' downloads relative to the current
                # working directory (other loaders use download_dir) — confirm.
                local_path = hf_hub_download(
                    repo_id=repo_id,
                    filename=file_path,
                    local_dir='',
                    repo_type="dataset",
                    token=hf_token
                )
                df = pd.read_csv(local_path)
                log_message(f"Загружено {len(df)} изображений из {file_path}")
                for _, row in df.iterrows():
                    # 'Описание изображение' mirrors the source CSV header
                    # (apparent typo in the data schema — do not "fix" here).
                    content = f"Изображение: {row.get('№ Изображения', 'Неизвестно')}\n"
                    content += f"Название: {row.get('Название изображения', 'Неизвестно')}\n"
                    content += f"Описание: {row.get('Описание изображение', 'Неизвестно')}\n"
                    content += f"Документ: {row.get('Обозначение документа', 'Неизвестно')}\n"
                    content += f"Раздел: {row.get('Раздел документа', 'Неизвестно')}\n"
                    doc = Document(
                        text=content,
                        metadata={
                            "type": "image",
                            # str() guards against NaN/numeric cells from pandas.
                            "image_number": str(row.get('№ Изображения', 'unknown')),
                            "image_title": str(row.get('Название изображения', 'unknown')),
                            "document_id": str(row.get('Обозначение документа', 'unknown')),
                            "section": str(row.get('Раздел документа', 'unknown'))
                        }
                    )
                    image_documents.append(doc)
            except Exception as e:
                log_message(f"❌ ОШИБКА файла {file_path}: {str(e)}")
                continue
        log_message(f"Загружено {len(image_documents)} документов изображений")
        log_message("=" * 60)
        return image_documents
    except Exception as e:
        log_message(f"❌ ОШИБКА загрузки изображений: {str(e)}")
        return []
| # ============================================================================ | |
| # DOCUMENT PROCESSING WITH CHUNKING | |
| # ============================================================================ | |
def _chunk_preview(text):
    """Return the first 200 characters of *text*, adding '...' only when truncated."""
    return text[:200] + "..." if len(text) > 200 else text


def _chunk_info_entry(doc, chunk_id, doc_type, **extra):
    """Build one chunk_info summary dict (ids, size, preview) for *doc*."""
    entry = {
        'document_id': doc.metadata.get('document_id', 'unknown'),
        'section_id': doc.metadata.get('section_id', 'unknown'),
        'chunk_id': chunk_id,
        'chunk_size': len(doc.text),
        'chunk_preview': _chunk_preview(doc.text),
        'type': doc_type,
    }
    entry.update(extra)
    return entry


def process_documents_with_chunking(documents):
    """Process all documents, splitting oversized ones into chunks.

    - 'table' documents pass through unchanged (row-based chunking, if
      any, already happened in chunk_table_by_rows); stats distinguish
      whole tables from pre-chunked ones via the 'is_chunked' flag.
    - 'image' and 'text' documents larger than CHUNK_SIZE are split with
      chunk_text_document(); smaller ones pass through whole.

    Returns (all_chunked_docs, chunk_info): the flat document list and one
    summary dict per emitted document.

    BUGFIX: previews for image/text documents previously appended '...'
    unconditionally, even when the text was shorter than 200 characters
    (the table branch already truncated conditionally); now consistent.
    """
    all_chunked_docs = []
    chunk_info = []
    stats = {
        'text_chunks': 0,
        'table_whole': 0,
        'table_chunks': 0,
        'image_whole': 0,
        'image_chunks': 0
    }
    for doc in documents:
        doc_type = doc.metadata.get('type', 'text')
        doc_size = len(doc.text)
        # Tables - already chunked or whole
        if doc_type == 'table':
            if doc.metadata.get('is_chunked', False):
                stats['table_chunks'] += 1
            else:
                stats['table_whole'] += 1
            all_chunked_docs.append(doc)
            chunk_info.append(_chunk_info_entry(
                doc,
                doc.metadata.get('chunk_id', 0),
                'table',
                total_chunks=doc.metadata.get('total_chunks', 1),
                table_number=doc.metadata.get('table_number', 'unknown'),
            ))
        # Images - chunk if too large
        elif doc_type == 'image':
            if doc_size > CHUNK_SIZE:
                log_message(f"📷 CHUNKING: Изображение {doc.metadata.get('image_number')} | Размер: {doc_size}")
                chunked_docs = chunk_text_document(doc)
                stats['image_chunks'] += len(chunked_docs)
                all_chunked_docs.extend(chunked_docs)
                for i, chunk_doc in enumerate(chunked_docs):
                    chunk_info.append(_chunk_info_entry(
                        chunk_doc, i, 'image',
                        image_number=chunk_doc.metadata.get('image_number', 'unknown'),
                    ))
            else:
                stats['image_whole'] += 1
                all_chunked_docs.append(doc)
                chunk_info.append(_chunk_info_entry(
                    doc, 0, 'image',
                    image_number=doc.metadata.get('image_number', 'unknown'),
                ))
        # Text - chunk if too large
        else:
            if doc_size > CHUNK_SIZE:
                log_message(f"📝 CHUNKING: Текст '{doc.metadata.get('document_id')}' | Размер: {doc_size}")
                chunked_docs = chunk_text_document(doc)
                stats['text_chunks'] += len(chunked_docs)
                all_chunked_docs.extend(chunked_docs)
                for i, chunk_doc in enumerate(chunked_docs):
                    chunk_info.append(_chunk_info_entry(chunk_doc, i, 'text'))
            else:
                # NOTE: whole (unsplit) text documents are intentionally not
                # counted in stats — matches the original behavior.
                all_chunked_docs.append(doc)
                chunk_info.append(_chunk_info_entry(doc, 0, 'text'))
    # Log summary
    log_message(f"\n{'='*60}")
    log_message("ИТОГОВАЯ СТАТИСТИКА:")
    log_message(f" • Текстовые чанки: {stats['text_chunks']}")
    log_message(f" • Таблицы (целые): {stats['table_whole']}")
    log_message(f" • Таблицы (чанки): {stats['table_chunks']}")
    log_message(f" • Изображения (целые): {stats['image_whole']}")
    log_message(f" • Изображения (чанки): {stats['image_chunks']}")
    log_message(f" • ВСЕГО ДОКУМЕНТОВ: {len(all_chunked_docs)}")
    log_message(f"{'='*60}\n")
    return all_chunked_docs, chunk_info
| # ============================================================================ | |
| # CSV CHUNKS (Legacy support) | |
| # ============================================================================ | |
def load_csv_chunks(repo_id, hf_token, chunks_filename, download_dir):
    """Load pre-chunked data from a CSV in the dataset repo (legacy).

    The text column is the first whose name contains 'text', 'content' or
    'chunk' (case-insensitive), falling back to the first column.

    Returns (documents, dataframe); ([], None) on failure.
    """
    log_message("Загрузка данных из CSV")
    try:
        csv_path = hf_hub_download(
            repo_id=repo_id,
            filename=chunks_filename,
            local_dir=download_dir,
            repo_type="dataset",
            token=hf_token
        )
        chunks_df = pd.read_csv(csv_path)
        log_message(f"Загружено {len(chunks_df)} чанков из CSV")
        # Heuristic choice of the text-bearing column.
        text_column = next(
            (c for c in chunks_df.columns
             if any(k in c.lower() for k in ('text', 'content', 'chunk'))),
            chunks_df.columns[0]
        )
        documents = []
        for i, (_, row) in enumerate(chunks_df.iterrows()):
            documents.append(Document(
                text=str(row[text_column]),
                metadata={
                    "chunk_id": row.get('chunk_id', i),
                    "document_id": row.get('document_id', 'unknown'),
                    "type": "text"
                }
            ))
        log_message(f"Создано {len(documents)} документов из CSV")
        return documents, chunks_df
    except Exception as e:
        log_message(f"❌ ОШИБКА загрузки CSV: {str(e)}")
        return [], None