"""Load text sections, tables and image descriptions from a Hugging Face
dataset repository and turn them into retrieval-ready ``Document`` chunks."""

import json
import re
import zipfile

import pandas as pd
from huggingface_hub import hf_hub_download, list_repo_files
from llama_index.core import Document
from llama_index.core.text_splitter import SentenceSplitter

from my_logging import log_message

# Configuration
CHUNK_SIZE = 1500
CHUNK_OVERLAP = 128

# Characters reserved for the row-range footer when sizing table chunks.
_FOOTER_RESERVE = 100

# Matches "приложение N" / "приложения А" etc. in a lower-cased section name.
_APPENDIX_RE = re.compile(r'приложени[еия]\s*(\d+|[а-яА-Я])')

# String renderings of a cell that carry no information.
_BLANK_CELL_STRINGS = frozenset({'', 'nan', 'none'})

# Encodings tried, in order, for JSON files found inside ZIP archives.
# 'utf-8-sig' also decodes plain UTF-8 and strips a leading BOM.
_ZIP_JSON_ENCODINGS = ('utf-8-sig', 'utf-16', 'windows-1251')


def chunk_text_documents(documents):
    """Split text documents into sentence-aware chunks.

    Each resulting node gets ``chunk_id``, ``total_chunks`` (both relative to
    its source document) and ``chunk_size`` (character count) added to its
    metadata.

    Args:
        documents: Iterable of ``Document`` objects to split.

    Returns:
        A flat list with the nodes of all documents, in input order.
    """
    splitter = SentenceSplitter(
        chunk_size=CHUNK_SIZE,
        chunk_overlap=CHUNK_OVERLAP,
    )

    chunked = []
    for doc in documents:
        nodes = splitter.get_nodes_from_documents([doc])
        total = len(nodes)
        for position, node in enumerate(nodes):
            node.metadata.update({
                'chunk_id': position,
                'total_chunks': total,
                'chunk_size': len(node.text),
            })
            chunked.append(node)

    # Log statistics
    if chunked:
        sizes = [len(node.text) for node in chunked]
        avg_size = sum(sizes) / len(sizes)
        log_message(f"✓ Text: {len(documents)} docs → {len(chunked)} chunks")
        log_message(f" Size stats: avg={avg_size:.0f}, min={min(sizes)}, max={max(sizes)} chars")

    return chunked


def _build_table_identifier(table_num, section):
    """Return the table number, suffixed with the appendix label if the
    section name mentions an appendix ("Приложение ...")."""
    match = _APPENDIX_RE.search(section.lower())
    if match:
        return f"{table_num} Приложение {match.group(1).upper()}"
    return table_num


def chunk_table_by_rows(table_data, doc_id, rows_per_chunk=10, max_chars=2000):
    """Chunk a table by rows, with a character-budget fallback.

    Rows are grouped until either ``rows_per_chunk`` rows are collected or
    adding the next row would exceed the character budget
    (``max_chars`` minus the header and a fixed footer reserve). A single
    row that is larger than the budget on its own is split field-by-field
    via ``_split_large_row``.

    Args:
        table_data: Dict with optional keys ``headers``, ``data`` (the rows),
            ``table_number``, ``table_title`` and ``section``.
        doc_id: Identifier of the document the table belongs to.
        rows_per_chunk: Maximum number of rows per chunk.
        max_chars: Target upper bound for the chunk text size.

    Returns:
        List of ``Document`` chunks; empty if the table has no rows.
    """
    # ``or`` guards against explicit JSON nulls, which ``.get(key, default)``
    # would pass through as None and crash on ``.lower()`` / iteration.
    headers = table_data.get('headers') or []
    rows = table_data.get('data') or []
    table_num = str(table_data.get('table_number', 'unknown')).strip()
    table_title = table_data.get('table_title') or ''
    section = table_data.get('section') or ''

    table_identifier = _build_table_identifier(table_num, section)

    if not rows:
        return []

    total_rows = len(rows)
    log_message(f" 📊 Processing: {doc_id} - {table_identifier} ({total_rows} rows)")

    # Build base header (compact version)
    base_header = f"ДОКУМЕНТ: {doc_id} | ТАБЛИЦА: {table_identifier}\n"
    if table_title:
        base_header += f"НАЗВАНИЕ: {table_title}\n"
    base_header += f"{'='*60}\n"
    if headers:
        # Truncate long headers so the header does not eat the whole budget.
        header_str = ' | '.join(str(h)[:30] for h in headers)
        base_header += f"ЗАГОЛОВКИ: {header_str}\n\n"

    # Space left for row text once header and footer are accounted for.
    available_space = max_chars - len(base_header) - _FOOTER_RESERVE

    chunks = []
    batch = []
    batch_size = 0

    def flush_batch():
        """Turn the pending rows into one chunk and reset the batch."""
        nonlocal batch, batch_size
        if not batch:
            return
        # The chunk number always equals the number of chunks emitted so far.
        # A chunk is "complete" only when it holds every row of the table.
        chunks.append(_create_chunk(
            base_header, batch, table_identifier, doc_id, table_num,
            table_title, section, total_rows, len(chunks),
            len(batch) == total_rows,
        ))
        batch = []
        batch_size = 0

    for position, row in enumerate(rows):
        row_idx = position + 1
        row_text = format_single_row(row, row_idx)
        row_size = len(row_text)

        # Case 1: Single row exceeds max - split it internally
        if row_size > available_space:
            flush_batch()
            log_message(f" ⚠ Row {row_idx} too large ({row_size} chars), splitting...")
            split_chunks = _split_large_row(
                row, row_idx, base_header, available_space, table_identifier,
                doc_id, table_num, table_title, section, total_rows,
                len(chunks),
            )
            chunks.extend(split_chunks)
            log_message(f" → Created {len(split_chunks)} chunks from row {row_idx}")
            continue

        # Case 2: Adding this row would exceed limit - flush current batch
        if batch and batch_size + row_size > available_space:
            flush_batch()

        # Case 3: Add row to current batch
        batch.append({'row': row, 'idx': row_idx, 'text': row_text})
        log_message(f" + Row {row_idx} ({row_size} chars) added to chunk {len(chunks)}")
        batch_size += row_size

        # Flush if we hit target row count
        if len(batch) >= rows_per_chunk:
            flush_batch()

    # Flush remaining rows
    flush_batch()

    log_message(f" Created {len(chunks)} chunks from {total_rows} rows")
    return chunks


def _table_chunk_metadata(content, doc_id, table_num, table_identifier,
                          table_title, section, row_start, row_end,
                          total_rows, chunk_num, is_complete):
    """Build the metadata dict shared by all table chunks.

    ``row_start``/``row_end`` are the 1-based row numbers shown in the text;
    the stored ``row_start`` is one less.
    NOTE(review): this looks like a 0-based, half-open range - confirm with
    the consumers of this metadata.
    """
    return {
        'type': 'table',
        'document_id': doc_id,
        'table_number': table_num,
        'table_identifier': table_identifier,
        'table_title': table_title,
        'section': section,
        'chunk_id': chunk_num,
        'row_start': row_start - 1,
        'row_end': row_end,
        'total_rows': total_rows,
        'chunk_size': len(content),
        'is_complete_table': is_complete,
    }


def _create_chunk(base_header, batch, table_identifier, doc_id, table_num,
                  table_title, section, total_rows, chunk_num, is_complete):
    """Create a table chunk from a non-empty batch of formatted rows.

    Args:
        base_header: Header text placed at the top of the chunk.
        batch: Non-empty list of dicts with at least ``idx`` (1-based row
            number) and ``text`` (formatted row).
        is_complete: Whether the chunk contains the whole table; if not, a
            row-range footer is appended.

    Returns:
        A ``Document`` whose text also embeds the metadata, for retrieval.
    """
    row_start = batch[0]['idx']
    row_end = batch[-1]['idx']

    parts = [base_header, "ДАННЫЕ:\n"]
    parts.extend(item['text'] for item in batch)

    # Add footer with row info
    if not is_complete:
        parts.append(f"\n[Строки {row_start}-{row_end} из {total_rows}]")

    # Embed all metadata in the text for better retrieval
    parts.append(
        "\n\n--- МЕТАДАННЫЕ ---\n"
        f"Документ: {doc_id}\n"
        f"Таблица: {table_identifier}\n"
        f"Название таблицы: {table_title}\n"
        f"Раздел: {section}\n"
        f"Строки: {row_start}-{row_end} из {total_rows}\n"
    )
    content = "".join(parts)

    metadata = _table_chunk_metadata(
        content, doc_id, table_num, table_identifier, table_title, section,
        row_start, row_end, total_rows, chunk_num, is_complete,
    )
    metadata['rows_in_chunk'] = len(batch)
    return Document(text=content, metadata=metadata)


def _split_large_row(row, row_idx, base_header, max_size, table_identifier,
                     doc_id, table_num, table_title, section, total_rows,
                     base_chunk_num):
    """Split a single oversized row into several chunks, field by field.

    Dict rows are split by key; other rows are treated as sequences and
    their fields are named ``col_<index>``. Fields are packed greedily up to
    ``max_size`` characters; a single field larger than ``max_size`` is kept
    whole in its own chunk. Unlike ``format_single_row``, blank fields are
    not filtered out here.

    Returns:
        List of ``Document`` chunks numbered from ``base_chunk_num``.
    """
    if isinstance(row, dict):
        fields = list(row.items())
    else:
        fields = [(f"col_{i}", value) for i, value in enumerate(row)]

    chunks = []
    pending = []
    pending_size = 0

    def emit(is_last):
        """Create one chunk from the pending fields."""
        part_index = len(chunks)
        content = (
            base_header
            + "ДАННЫЕ:\n"
            + f"Строка {row_idx} (часть {part_index + 1}):\n"
            + "".join(pending)
        )
        if not is_last:
            content += f"\n[Строка {row_idx} из {total_rows} - продолжается]"
        chunks.append(_create_chunk_from_text(
            content, doc_id, table_num, table_identifier, table_title,
            section, row_idx, row_idx, total_rows,
            base_chunk_num + part_index,
        ))

    for key, value in fields:
        field_text = f"{key}: {value}\n"
        field_size = len(field_text)

        if pending and pending_size + field_size > max_size:
            emit(is_last=False)
            pending = []
            pending_size = 0

        pending.append(field_text)
        pending_size += field_size

    # Flush remaining
    if pending:
        emit(is_last=True)

    return chunks


def _create_chunk_from_text(content, doc_id, table_num, table_identifier,
                            table_title, section, row_start, row_end,
                            total_rows, chunk_num):
    """Create a table chunk from pre-built text; never marked as complete."""
    metadata = _table_chunk_metadata(
        content, doc_id, table_num, table_identifier, table_title, section,
        row_start, row_end, total_rows, chunk_num, False,
    )
    return Document(text=content, metadata=metadata)


def _is_blank_cell(value):
    """Return True if a table cell carries no information.

    Blank means ``None``, an empty container, or a value whose string form
    is empty / whitespace / ``nan`` / ``none`` (case-insensitive). Numeric
    zero and ``False`` are real values and are NOT blank.
    """
    if value is None:
        return True
    if isinstance(value, (list, tuple, dict, set)) and not value:
        return True
    return str(value).strip().lower() in _BLANK_CELL_STRINGS


def format_single_row(row, idx):
    """Format one table row as ``"<idx>. a | b | c\\n"``.

    Dict rows render as ``key: value`` pairs, list/tuple rows as bare
    values. Blank cells are skipped.

    Returns:
        The formatted line, or ``""`` if the row has no non-blank cells or
        is of an unsupported type.
    """
    if isinstance(row, dict):
        parts = [f"{k}: {v}" for k, v in row.items() if not _is_blank_cell(v)]
    elif isinstance(row, (list, tuple)):
        parts = [str(v) for v in row if not _is_blank_cell(v)]
    else:
        parts = []

    if not parts:
        return ""
    return f"{idx}. {' | '.join(parts)}\n"


def load_table_documents(repo_id, hf_token, table_dir):
    """Download every ``.json`` table file under ``table_dir`` and chunk it.

    Each file is expected to hold a ``sheets`` list; every sheet is chunked
    with ``chunk_table_by_rows``. A failure in one file is logged and does
    not stop the others.

    Returns:
        List of table chunk ``Document`` objects.
    """
    log_message("Loading tables...")

    files = list_repo_files(repo_id=repo_id, repo_type="dataset", token=hf_token)
    table_files = [
        f for f in files
        if f.startswith(table_dir) and f.endswith('.json')
    ]

    all_chunks = []
    for file_path in table_files:
        try:
            local_path = hf_hub_download(
                repo_id=repo_id,
                filename=file_path,
                repo_type="dataset",
                token=hf_token,
            )
            with open(local_path, 'r', encoding='utf-8') as f:
                data = json.load(f)

            file_doc_id = data.get('document_id', data.get('document', 'unknown'))

            for sheet in data.get('sheets', []):
                # A sheet may override the file-level document id.
                sheet_doc_id = sheet.get('document_id', sheet.get('document', file_doc_id))
                chunks = chunk_table_by_rows(sheet, sheet_doc_id, max_chars=3072)
                all_chunks.extend(chunks)
                log_message(f" 📄 {sheet_doc_id}: {len(chunks)} chunks")
        except Exception as e:
            # Best-effort: one broken file must not abort the whole load.
            log_message(f"Error loading {file_path}: {e}")

    log_message(f"✓ Loaded {len(all_chunks)} table chunks")
    return all_chunks


def _is_zip_json_member(name):
    """Return True for real JSON members, skipping macOS/hidden artefacts."""
    return (
        name.endswith('.json')
        and not name.startswith('__MACOSX')
        and not name.startswith('.')
        and '._' not in name
    )


def _decode_json_bytes(raw):
    """Decode raw bytes that should contain a JSON document.

    Encodings from ``_ZIP_JSON_ENCODINGS`` are tried in order; the first one
    that decodes AND yields text starting with ``{`` or ``[`` wins.

    Returns:
        ``(text, None)`` on success, otherwise ``(None, reason)`` where
        ``reason`` is ``"encoding failed"`` (nothing decoded) or
        ``"not valid JSON"`` (decoded, but does not look like JSON).
    """
    decoded_any = False
    for encoding in _ZIP_JSON_ENCODINGS:
        try:
            text = raw.decode(encoding)
        except UnicodeDecodeError:
            continue
        decoded_any = True
        if text.lstrip().startswith(('{', '[')):
            return text, None
    return None, ('not valid JSON' if decoded_any else 'encoding failed')


def _load_zip_member(zf, member, documents, stats):
    """Extract sections from one JSON member of an open ZIP archive.

    Appends the extracted sections to ``documents`` and updates the
    ``success`` / ``empty`` / ``failed`` counters in ``stats`` in place.
    Never raises: every failure is logged and counted as ``failed``.
    """
    try:
        raw = zf.read(member)

        # Skip if file is too small
        if len(raw) < 10:
            log_message(f" ✗ Skipping: {member} (file too small)")
            stats['failed'] += 1
            return

        text, problem = _decode_json_bytes(raw)
        if text is None:
            log_message(f" ✗ Skipping: {member} ({problem})")
            stats['failed'] += 1
            return

        # Parse in memory: no temp file to clean up, and malformed JSON
        # surfaces here as JSONDecodeError instead of being swallowed.
        data = json.loads(text)
        docs = _extract_sections_from_data(data, member)
        if docs:
            documents.extend(docs)
            stats['success'] += 1
            log_message(f" ✓ {member}: {len(docs)} sections")
        else:
            stats['empty'] += 1
            log_message(f" ⚠ {member}: No sections")
    except json.JSONDecodeError:
        stats['failed'] += 1
        log_message(f" ✗ {member}: Invalid JSON")
    except Exception as e:
        stats['failed'] += 1
        log_message(f" ✗ {member}: {str(e)[:100]}")


def load_json_documents(repo_id, hf_token, json_dir):
    """Load section documents from ``.json`` files and ``.zip`` archives
    located under ``json_dir`` in the dataset repository.

    Failures of individual files, archives or archive members are logged
    and counted; they never abort the load.

    Returns:
        List of section ``Document`` objects (not yet chunked).
    """
    log_message("Loading JSON documents...")

    files = list_repo_files(repo_id=repo_id, repo_type="dataset", token=hf_token)
    json_files = [f for f in files if f.startswith(json_dir) and f.endswith('.json')]
    zip_files = [f for f in files if f.startswith(json_dir) and f.endswith('.zip')]

    log_message(f"Found {len(json_files)} JSON files and {len(zip_files)} ZIP files")

    documents = []
    stats = {'success': 0, 'failed': 0, 'empty': 0}

    for file_path in json_files:
        try:
            log_message(f" Loading: {file_path}")
            local_path = hf_hub_download(
                repo_id=repo_id,
                filename=file_path,
                repo_type="dataset",
                token=hf_token,
            )

            docs = extract_sections_from_json(local_path)
            if docs:
                documents.extend(docs)
                stats['success'] += 1
                log_message(f" ✓ Extracted {len(docs)} sections")
            else:
                stats['empty'] += 1
                log_message(f" ⚠ No sections found")
        except Exception as e:
            stats['failed'] += 1
            log_message(f" ✗ Error: {e}")

    for zip_path in zip_files:
        try:
            log_message(f" Processing ZIP: {zip_path}")
            local_zip = hf_hub_download(
                repo_id=repo_id,
                filename=zip_path,
                repo_type="dataset",
                token=hf_token,
            )

            with zipfile.ZipFile(local_zip, 'r') as zf:
                members = [name for name in zf.namelist() if _is_zip_json_member(name)]
                log_message(f" Found {len(members)} JSON files in ZIP")

                for member in members:
                    _load_zip_member(zf, member, documents, stats)
        except Exception as e:
            log_message(f" ✗ Error with ZIP: {e}")

    separator = "=" * 60
    log_message(separator)
    log_message("JSON Loading Stats:")
    log_message(f" Success: {stats['success']}")
    log_message(f" Empty: {stats['empty']}")
    log_message(f" Failed: {stats['failed']}")
    log_message(f" Total sections: {len(documents)}")
    log_message(separator)

    return documents


def _extract_sections_from_data(data, source):
    """Walk parsed JSON and collect sections, subsections and
    sub-subsections that have non-blank text.

    Args:
        data: Parsed JSON; expected to be a dict with ``document_metadata``
            and ``sections``.
        source: Name used in the error log message.

    Returns:
        List of ``Document`` objects. If the structure is malformed the
        error is logged and whatever was collected so far is returned.
    """
    documents = []

    def collect(node, text_key, id_key):
        # ``or ''`` tolerates an explicit null text field.
        text = node.get(text_key) or ''
        if text.strip():
            documents.append(Document(
                text=text,
                metadata={
                    'type': 'text',
                    'document_id': doc_id,
                    'section_id': node.get(id_key, ''),
                },
            ))

    try:
        doc_id = data.get('document_metadata', {}).get('document_id', 'unknown')

        # Extract all section levels
        for section in data.get('sections', []):
            collect(section, 'section_text', 'section_id')

            for subsection in section.get('subsections', []):
                collect(subsection, 'subsection_text', 'subsection_id')

                for sub_sub in subsection.get('sub_subsections', []):
                    collect(sub_sub, 'sub_subsection_text', 'sub_subsection_id')
    except Exception as e:
        # Best-effort: keep what was extracted before the malformed node.
        log_message(f"Error extracting from {source}: {e}")

    return documents


def extract_sections_from_json(json_path):
    """Extract sections from a single UTF-8 JSON file.

    Returns:
        List of ``Document`` objects; empty if the file cannot be read or
        parsed (the error is logged, not raised).
    """
    try:
        with open(json_path, 'r', encoding='utf-8') as f:
            data = json.load(f)
    except Exception as e:
        log_message(f"Error extracting from {json_path}: {e}")
        return []

    return _extract_sections_from_data(data, json_path)


def load_image_data(repo_id, hf_token, image_data_dir):
    """Build one ``Document`` per image record from the ``.csv`` files
    under ``image_data_dir``.

    A failure in one CSV file is logged and skipped; a failure outside the
    per-file loop (e.g. listing the repository) is logged and yields ``[]``.

    Returns:
        List of image ``Document`` objects.
    """
    log_message("Начинаю загрузку данных изображений")

    try:
        files = list_repo_files(repo_id=repo_id, repo_type="dataset", token=hf_token)
        image_files = [
            name for name in files
            if name.startswith(image_data_dir) and name.endswith('.csv')
        ]

        log_message(f"Найдено {len(image_files)} CSV файлов с изображениями")

        image_documents = []
        for file_path in image_files:
            try:
                log_message(f"Обрабатываю файл изображений: {file_path}")
                # NOTE(review): unlike the other loaders this passes
                # local_dir='' - presumably to download into the working
                # directory; confirm this is intended.
                local_path = hf_hub_download(
                    repo_id=repo_id,
                    filename=file_path,
                    local_dir='',
                    repo_type="dataset",
                    token=hf_token,
                )

                df = pd.read_csv(local_path)
                log_message(f"Загружено {len(df)} записей изображений из файла {file_path}")

                # Column names below must match the CSV headers exactly;
                # 'Описание изображение' is spelled that way in the data.
                for _, row in df.iterrows():
                    section_value = row.get('Раздел документа', 'Неизвестно')

                    content = (
                        f"Изображение: {row.get('№ Изображения', 'Неизвестно')}\n"
                        f"Название: {row.get('Название изображения', 'Неизвестно')}\n"
                        f"Описание: {row.get('Описание изображение', 'Неизвестно')}\n"
                        f"Документ: {row.get('Обозначение документа', 'Неизвестно')}\n"
                        f"Раздел: {section_value}\n"
                        f"Файл: {row.get('Файл изображения', 'Неизвестно')}\n"
                    )

                    image_documents.append(Document(
                        text=content,
                        metadata={
                            "type": "image",
                            "image_number": str(row.get('№ Изображения', 'unknown')),
                            "image_title": str(row.get('Название изображения', 'unknown')),
                            "image_description": str(row.get('Описание изображение', 'unknown')),
                            "document_id": str(row.get('Обозначение документа', 'unknown')),
                            "file_path": str(row.get('Файл изображения', 'unknown')),
                            "section": str(section_value),
                            "section_id": str(section_value),
                        },
                    ))
            except Exception as e:
                log_message(f"Ошибка обработки файла {file_path}: {str(e)}")
                continue

        log_message(f"Создано {len(image_documents)} документов из изображений")
        return image_documents

    except Exception as e:
        log_message(f"Ошибка загрузки данных изображений: {str(e)}")
        return []


def load_all_documents(repo_id, hf_token, json_dir, table_dir, image_dir):
    """Main loader - combines all document types.

    Text sections are loaded and then chunked; tables arrive already
    chunked; image records are used as-is.

    Returns:
        Text chunks, table chunks and image documents, concatenated in
        that order.
    """
    separator = "=" * 60
    log_message(separator)
    log_message("STARTING DOCUMENT LOADING")
    log_message(separator)

    # Load text sections
    text_docs = load_json_documents(repo_id, hf_token, json_dir)
    text_chunks = chunk_text_documents(text_docs)

    # Load tables (already chunked)
    table_chunks = load_table_documents(repo_id, hf_token, table_dir)

    # Load images (no chunking needed)
    image_docs = load_image_data(repo_id, hf_token, image_dir)

    all_docs = text_chunks + table_chunks + image_docs

    log_message(separator)
    log_message(f"TOTAL DOCUMENTS: {len(all_docs)}")
    log_message(f" Text chunks: {len(text_chunks)}")
    log_message(f" Table chunks: {len(table_chunks)}")
    log_message(f" Images: {len(image_docs)}")
    log_message(separator)

    return all_docs