import json
import zipfile
from collections import Counter

import pandas as pd
from huggingface_hub import hf_hub_download, list_repo_files
from llama_index.core import Document
from llama_index.core.text_splitter import SentenceSplitter

from my_logging import log_message
from config import CHUNK_SIZE, CHUNK_OVERLAP

# ============================================================================
# TEXT CHUNKING
# ============================================================================

def chunk_text_document(doc):
    """Split a text Document into overlapping chunks with a sentence splitter.

    Each chunk inherits a copy of the source document's metadata, augmented
    with ``chunk_id``, ``total_chunks`` and ``chunk_size``.

    Args:
        doc: llama_index Document to split.

    Returns:
        list[Document]: one Document per chunk.
    """
    text_splitter = SentenceSplitter(
        chunk_size=CHUNK_SIZE,
        chunk_overlap=CHUNK_OVERLAP,
        separator=" "
    )
    text_chunks = text_splitter.split_text(doc.text)

    chunked_docs = []
    for i, chunk_text in enumerate(text_chunks):
        chunk_metadata = doc.metadata.copy()
        chunk_metadata.update({
            "chunk_id": i,
            "total_chunks": len(text_chunks),
            "chunk_size": len(chunk_text)
        })
        chunked_docs.append(Document(text=chunk_text, metadata=chunk_metadata))
    return chunked_docs

# ============================================================================
# TABLE PROCESSING
# ============================================================================

def extract_table_metadata(table_text):
    """Extract key terms from a table's text for chunk enrichment.

    Filters out short words and Russian stopwords, then takes the 15 most
    frequent remaining terms.

    Returns:
        dict with ``summary`` (word-count string) and ``key_terms`` (list[str]).
    """
    words = table_text.split()
    # Filter stopwords and short words (Russian function words)
    stopwords = {"и", "в", "на", "по", "с", "для", "из", "при", "а", "как",
                 "или", "но", "к", "от"}
    filtered = [w for w in words if len(w) > 3 and w.lower() not in stopwords]
    # Get top 15 most common terms
    common = Counter(filtered).most_common(15)
    key_terms = [w for w, _ in common]
    return {
        "summary": f"Таблица содержит {len(words)} слов",
        "key_terms": key_terms
    }

def create_table_content(table_data):
    """Render a table dict as plain text (header block plus numbered rows).

    Args:
        table_data: dict with optional keys ``document_id``/``document``,
            ``table_number``, ``table_title``, ``section``, ``headers``,
            ``data`` (list of row dicts).

    Returns:
        str: formatted table text.
    """
    doc_id = table_data.get('document_id', table_data.get('document', 'Неизвестно'))
    table_num = table_data.get('table_number', 'Неизвестно')
    table_title = table_data.get('table_title', 'Неизвестно')
    section = table_data.get('section', 'Неизвестно')

    content = f"Таблица: {table_num}\n"
    content += f"Название: {table_title}\n"
    content += f"Документ: {doc_id}\n"
    content += f"Раздел: {section}\n"

    # Add headers
    headers = table_data.get('headers', [])
    if headers:
        content += f"\nЗаголовки: {' | '.join(headers)}\n"

    # Add data rows; falsy cell values are skipped
    if 'data' in table_data and isinstance(table_data['data'], list):
        content += "\nДанные таблицы:\n"
        for row_idx, row in enumerate(table_data['data'], start=1):
            if isinstance(row, dict):
                row_text = " | ".join([f"{k}: {v}" for k, v in row.items() if v])
                content += f"Строка {row_idx}: {row_text}\n"
    return content

def chunk_table_by_rows(doc):
    """Split a large table Document into chunks by rows, preserving headers.

    The table header block (everything up to and including the
    'Данные таблицы:' marker) is repeated at the top of every chunk; the last
    2 rows of each chunk are carried over into the next one as overlap. Each
    chunk gets an enrichment prefix with the table title and key terms.

    Falls back to plain sentence-based chunking when no data rows are found.

    Returns:
        list[Document]: enriched chunk documents.
    """
    # Extract metadata
    table_metadata = extract_table_metadata(doc.text)
    table_num = doc.metadata.get('table_number', 'unknown')
    table_title = doc.metadata.get('table_title', 'unknown')

    # Parse table structure: separate header lines from data-row lines
    lines = doc.text.strip().split('\n')
    table_header_lines = []
    data_rows = []
    in_data = False
    for line in lines:
        if line.startswith('Данные таблицы:'):
            in_data = True
            table_header_lines.append(line)
        elif in_data and line.startswith('Строка'):
            data_rows.append(line)
        elif not in_data:
            table_header_lines.append(line)
    table_header = '\n'.join(table_header_lines) + '\n'

    # If no rows, use standard text splitting
    if not data_rows:
        log_message(f" ⚠️ Таблица {table_num}: нет строк данных, использую стандартное разбиение")
        return chunk_text_document(doc)

    log_message(f" 📋 Таблица {table_num}: найдено {len(data_rows)} строк данных")

    # Row-based chunking: header + enrichment prefix must also fit in CHUNK_SIZE
    header_size = len(table_header)
    available_size = CHUNK_SIZE - header_size - 300  # Reserve space for enrichment

    text_chunks = []
    current_chunk_rows = []
    current_size = 0
    for row in data_rows:
        row_size = len(row) + 1  # +1 for the joining newline
        # If adding this row exceeds limit, flush the current chunk
        if current_size + row_size > available_size and current_chunk_rows:
            chunk_text = table_header + '\n'.join(current_chunk_rows)
            text_chunks.append(chunk_text)
            log_message(f" ✂️ Создан чанк: {len(current_chunk_rows)} строк, {len(chunk_text)} символов")
            # Keep last 2 rows for overlap
            overlap_count = min(2, len(current_chunk_rows))
            current_chunk_rows = current_chunk_rows[-overlap_count:]
            current_size = sum(len(r) + 1 for r in current_chunk_rows)
        current_chunk_rows.append(row)
        current_size += row_size

    # Final chunk
    if current_chunk_rows:
        chunk_text = table_header + '\n'.join(current_chunk_rows)
        text_chunks.append(chunk_text)
        log_message(f" ✂️ Последний чанк: {len(current_chunk_rows)} строк, {len(chunk_text)} символов")

    log_message(f" 📊 Таблица {table_num} разделена на {len(text_chunks)} чанков")

    # Create enriched chunks with metadata
    chunked_docs = []
    key_terms = table_metadata.get("key_terms", [])
    for i, chunk_text in enumerate(text_chunks):
        chunk_metadata = doc.metadata.copy()
        chunk_metadata.update({
            "chunk_id": i,
            "total_chunks": len(text_chunks),
            "chunk_size": len(chunk_text),
            "is_chunked": True,
            "key_terms": key_terms
        })
        # Add enrichment prefix (title + up to 10 key terms)
        terms_str = ', '.join(key_terms[:10]) if key_terms else 'нет'
        enriched_text = f"""[Таблица {table_num}: {table_title}]
[Ключевые термины: {terms_str}]
{chunk_text}"""
        chunked_docs.append(Document(text=enriched_text, metadata=chunk_metadata))
    return chunked_docs

def table_to_document(table_data, document_id=None):
    """Convert a table dict into one or more Documents, chunking when large.

    Args:
        table_data: table dict (see ``create_table_content``).
        document_id: optional override for the owning document id; falls back
            to ``table_data['document_id']`` / ``table_data['document']``.

    Returns:
        list[Document]: empty list when the input is not a dict or has no rows;
        a single whole-table Document when it fits in CHUNK_SIZE; otherwise
        row-based chunks.
    """
    if not isinstance(table_data, dict):
        log_message("⚠️ ПРОПУЩЕНА: table_data не является словарем")
        return []

    doc_id = document_id or table_data.get('document_id') or table_data.get('document', 'Неизвестно')
    table_num = table_data.get('table_number', 'Неизвестно')
    table_title = table_data.get('table_title', 'Неизвестно')
    section = table_data.get('section', 'Неизвестно')

    table_rows = table_data.get('data', [])
    if not table_rows:
        log_message(f"⚠️ ПРОПУЩЕНА: Таблица {table_num} - нет данных")
        return []

    content = create_table_content(table_data)
    content_size = len(content)

    base_doc = Document(
        text=content,
        metadata={
            "type": "table",
            "table_number": table_num,
            "table_title": table_title,
            "document_id": doc_id,
            "section": section,
            "section_id": section,
            "total_rows": len(table_rows),
            "content_size": content_size
        }
    )

    # Chunk if needed
    if content_size > CHUNK_SIZE:
        log_message(f"📊 CHUNKING: Таблица {table_num} | Размер: {content_size} > {CHUNK_SIZE}")
        return chunk_table_by_rows(base_doc)
    else:
        log_message(f"✓ Таблица {table_num} | Размер: {content_size} символов | Строк: {len(table_rows)}")
        return [base_doc]

def load_table_data(repo_id, hf_token, table_data_dir):
    """Load all table JSON files from a HuggingFace dataset repo.

    Files with a ``sheets`` list are processed sheet-by-sheet (sorted by
    ``table_number``); otherwise the whole JSON is treated as one table.
    Per-file errors are logged and skipped.

    Returns:
        list[Document]: all table documents, [] on a top-level failure.
    """
    log_message("=" * 60)
    log_message("ЗАГРУЗКА ТАБЛИЧНЫХ ДАННЫХ")
    log_message("=" * 60)
    try:
        files = list_repo_files(repo_id=repo_id, repo_type="dataset", token=hf_token)
        table_files = [f for f in files if f.startswith(table_data_dir) and f.endswith('.json')]
        log_message(f"Найдено {len(table_files)} JSON файлов с таблицами")

        table_documents = []
        for file_path in table_files:
            try:
                local_path = hf_hub_download(
                    repo_id=repo_id,
                    filename=file_path,
                    local_dir='',  # NOTE(review): '' resolves to CWD — confirm intended
                    repo_type="dataset",
                    token=hf_token
                )
                log_message(f"\nОбработка файла: {file_path}")
                with open(local_path, 'r', encoding='utf-8') as f:
                    table_data = json.load(f)

                if isinstance(table_data, dict):
                    document_id = table_data.get('document', 'unknown')
                    # Process sheets if present
                    if 'sheets' in table_data:
                        sorted_sheets = sorted(
                            table_data['sheets'],
                            key=lambda sheet: sheet.get('table_number', '')
                        )
                        for sheet in sorted_sheets:
                            sheet['document'] = document_id
                            docs_list = table_to_document(sheet, document_id)
                            table_documents.extend(docs_list)
                    else:
                        docs_list = table_to_document(table_data, document_id)
                        table_documents.extend(docs_list)
            except Exception as e:
                log_message(f"❌ ОШИБКА файла {file_path}: {str(e)}")
                continue

        log_message(f"\n{'='*60}")
        log_message(f"Загружено {len(table_documents)} табличных документов")
        log_message("=" * 60)
        return table_documents
    except Exception as e:
        log_message(f"❌ ОШИБКА загрузки таблиц: {str(e)}")
        return []

# ============================================================================
# JSON TEXT DOCUMENTS
# ============================================================================

def extract_section_title(section_text):
    """Extract a short, clean title from a section's text.

    Uses the first line when it looks like a heading (short, no trailing
    period); otherwise the first sentence, truncated to 100 chars if needed.
    """
    if not section_text.strip():
        return ""
    first_line = section_text.strip().split('\n')[0].strip()
    if len(first_line) < 200 and not first_line.endswith('.'):
        return first_line
    sentences = first_line.split('.')
    if len(sentences) > 1:
        return sentences[0].strip()
    return first_line[:100] + "..." if len(first_line) > 100 else first_line

def extract_text_from_json(data, document_id, document_name):
    """Build text Documents from a parsed JSON document structure.

    Walks ``data['sections']`` and each section's ``subsections`` (one level),
    emitting one Document per non-empty text, with hierarchy metadata
    (``section_path``, ``level``, ``parent_section``).

    Returns:
        list[Document]: empty when ``sections`` is absent.
    """
    documents = []
    if 'sections' not in data:
        return documents

    for section in data['sections']:
        section_id = section.get('section_id', 'Unknown')
        section_text = section.get('section_text', '')
        if section_text.strip():
            section_title = extract_section_title(section_text)
            doc = Document(
                text=section_text,
                metadata={
                    "type": "text",
                    "document_id": document_id,
                    "document_name": document_name,
                    "section_id": section_id,
                    "section_text": section_title[:200],
                    "section_path": section_id,
                    "level": "section"
                }
            )
            documents.append(doc)

        # Process subsections (one level deep)
        if 'subsections' in section:
            for subsection in section['subsections']:
                subsection_id = subsection.get('subsection_id', 'Unknown')
                subsection_text = subsection.get('subsection_text', '')
                if subsection_text.strip():
                    subsection_title = extract_section_title(subsection_text)
                    doc = Document(
                        text=subsection_text,
                        metadata={
                            "type": "text",
                            "document_id": document_id,
                            "document_name": document_name,
                            "section_id": subsection_id,
                            "section_text": subsection_title[:200],
                            "section_path": f"{section_id}.{subsection_id}",
                            "level": "subsection",
                            "parent_section": section_id
                        }
                    )
                    documents.append(doc)
    return documents

def load_json_documents(repo_id, hf_token, json_files_dir, download_dir):
    """Load JSON documents (direct and inside ZIPs) from a HuggingFace repo.

    Extracts section/subsection Documents from every JSON, then chunks them
    via ``process_documents_with_chunking``. Per-file errors are logged and
    skipped.

    Returns:
        tuple[list[Document], list[dict]]: chunked documents and chunk info;
        ([], []) on a top-level failure.
    """
    log_message("=" * 60)
    log_message("ЗАГРУЗКА JSON ДОКУМЕНТОВ")
    log_message("=" * 60)
    try:
        files = list_repo_files(repo_id=repo_id, repo_type="dataset", token=hf_token)
        zip_files = [f for f in files if f.startswith(json_files_dir) and f.endswith('.zip')]
        json_files = [f for f in files if f.startswith(json_files_dir) and f.endswith('.json')]
        log_message(f"Найдено {len(zip_files)} ZIP файлов и {len(json_files)} JSON файлов")

        all_documents = []

        # Process ZIP files
        for zip_file_path in zip_files:
            try:
                log_message(f"Загружаю ZIP: {zip_file_path}")
                local_zip_path = hf_hub_download(
                    repo_id=repo_id,
                    filename=zip_file_path,
                    local_dir=download_dir,
                    repo_type="dataset",
                    token=hf_token
                )
                docs_before = len(all_documents)  # for a correct per-ZIP count
                with zipfile.ZipFile(local_zip_path, 'r') as zip_ref:
                    json_files_in_zip = [f for f in zip_ref.namelist()
                                         if f.endswith('.json') and not f.startswith('__MACOSX')]
                    for json_file in json_files_in_zip:
                        with zip_ref.open(json_file) as f:
                            json_data = json.load(f)
                        metadata = json_data.get('document_metadata', {})
                        doc_id = metadata.get('document_id', 'unknown')
                        doc_name = metadata.get('document_name', 'unknown')
                        docs = extract_text_from_json(json_data, doc_id, doc_name)
                        all_documents.extend(docs)
                # BUGFIX: previously logged the cumulative total, not this ZIP's count
                log_message(f"Извлечено документов из ZIP: {len(all_documents) - docs_before}")
            except Exception as e:
                log_message(f"❌ ОШИБКА ZIP {zip_file_path}: {str(e)}")
                continue

        # Process direct JSON files
        for file_path in json_files:
            try:
                local_path = hf_hub_download(
                    repo_id=repo_id,
                    filename=file_path,
                    local_dir=download_dir,
                    repo_type="dataset",
                    token=hf_token
                )
                with open(local_path, 'r', encoding='utf-8') as f:
                    json_data = json.load(f)
                metadata = json_data.get('document_metadata', {})
                doc_id = metadata.get('document_id', 'unknown')
                doc_name = metadata.get('document_name', 'unknown')
                docs = extract_text_from_json(json_data, doc_id, doc_name)
                all_documents.extend(docs)
            except Exception as e:
                log_message(f"❌ ОШИБКА JSON {file_path}: {str(e)}")
                continue

        log_message(f"Всего загружено {len(all_documents)} текстовых документов")

        # Chunk all documents
        chunked_documents, chunk_info = process_documents_with_chunking(all_documents)
        log_message(f"После chunking: {len(chunked_documents)} чанков")
        log_message("=" * 60)
        return chunked_documents, chunk_info
    except Exception as e:
        log_message(f"❌ ОШИБКА загрузки JSON: {str(e)}")
        return [], []

# ============================================================================
# IMAGE DATA
# ============================================================================

def load_image_data(repo_id, hf_token, image_data_dir):
    """Load image metadata from CSV files in a HuggingFace dataset repo.

    Each CSV row becomes one Document whose text aggregates the image number,
    title, description, document and section columns (Russian headers).
    Per-file errors are logged and skipped.

    Returns:
        list[Document]: image documents, [] on a top-level failure.
    """
    log_message("=" * 60)
    log_message("ЗАГРУЗКА ДАННЫХ ИЗОБРАЖЕНИЙ")
    log_message("=" * 60)
    try:
        files = list_repo_files(repo_id=repo_id, repo_type="dataset", token=hf_token)
        image_files = [f for f in files if f.startswith(image_data_dir) and f.endswith('.csv')]
        log_message(f"Найдено {len(image_files)} CSV файлов с изображениями")

        image_documents = []
        for file_path in image_files:
            try:
                local_path = hf_hub_download(
                    repo_id=repo_id,
                    filename=file_path,
                    local_dir='',  # NOTE(review): '' resolves to CWD — confirm intended
                    repo_type="dataset",
                    token=hf_token
                )
                df = pd.read_csv(local_path)
                log_message(f"Загружено {len(df)} изображений из {file_path}")
                for _, row in df.iterrows():
                    content = f"Изображение: {row.get('№ Изображения', 'Неизвестно')}\n"
                    content += f"Название: {row.get('Название изображения', 'Неизвестно')}\n"
                    content += f"Описание: {row.get('Описание изображение', 'Неизвестно')}\n"
                    content += f"Документ: {row.get('Обозначение документа', 'Неизвестно')}\n"
                    content += f"Раздел: {row.get('Раздел документа', 'Неизвестно')}\n"
                    doc = Document(
                        text=content,
                        metadata={
                            "type": "image",
                            "image_number": str(row.get('№ Изображения', 'unknown')),
                            "image_title": str(row.get('Название изображения', 'unknown')),
                            "document_id": str(row.get('Обозначение документа', 'unknown')),
                            "section": str(row.get('Раздел документа', 'unknown'))
                        }
                    )
                    image_documents.append(doc)
            except Exception as e:
                log_message(f"❌ ОШИБКА файла {file_path}: {str(e)}")
                continue

        log_message(f"Загружено {len(image_documents)} документов изображений")
        log_message("=" * 60)
        return image_documents
    except Exception as e:
        log_message(f"❌ ОШИБКА загрузки изображений: {str(e)}")
        return []

# ============================================================================
# DOCUMENT PROCESSING WITH CHUNKING
# ============================================================================

def process_documents_with_chunking(documents):
    """Route documents through chunking and collect per-chunk bookkeeping.

    Tables are passed through as-is (they are chunked upstream by
    ``table_to_document``); images and text are chunked here when they exceed
    CHUNK_SIZE. Logs a per-type summary at the end.

    Returns:
        tuple[list[Document], list[dict]]: (all output documents, chunk_info
        records with ids, sizes and a 200-char preview).
    """
    all_chunked_docs = []
    chunk_info = []
    stats = {
        'text_chunks': 0,
        'table_whole': 0,
        'table_chunks': 0,
        'image_whole': 0,
        'image_chunks': 0
    }

    for doc in documents:
        doc_type = doc.metadata.get('type', 'text')
        is_already_chunked = doc.metadata.get('is_chunked', False)
        doc_size = len(doc.text)

        # Tables - already chunked or whole
        if doc_type == 'table':
            if is_already_chunked:
                stats['table_chunks'] += 1
            else:
                stats['table_whole'] += 1
            all_chunked_docs.append(doc)
            chunk_info.append({
                'document_id': doc.metadata.get('document_id', 'unknown'),
                'section_id': doc.metadata.get('section_id', 'unknown'),
                'chunk_id': doc.metadata.get('chunk_id', 0),
                'total_chunks': doc.metadata.get('total_chunks', 1),
                'chunk_size': doc_size,
                'chunk_preview': doc.text[:200] + "..." if doc_size > 200 else doc.text,
                'type': 'table',
                'table_number': doc.metadata.get('table_number', 'unknown')
            })

        # Images - chunk if too large
        elif doc_type == 'image':
            if doc_size > CHUNK_SIZE:
                log_message(f"📷 CHUNKING: Изображение {doc.metadata.get('image_number')} | Размер: {doc_size}")
                chunked_docs = chunk_text_document(doc)
                stats['image_chunks'] += len(chunked_docs)
                all_chunked_docs.extend(chunked_docs)
                for i, chunk_doc in enumerate(chunked_docs):
                    chunk_info.append({
                        'document_id': chunk_doc.metadata.get('document_id', 'unknown'),
                        'section_id': chunk_doc.metadata.get('section_id', 'unknown'),
                        'chunk_id': i,
                        'chunk_size': len(chunk_doc.text),
                        'chunk_preview': chunk_doc.text[:200] + "...",
                        'type': 'image',
                        'image_number': chunk_doc.metadata.get('image_number', 'unknown')
                    })
            else:
                stats['image_whole'] += 1
                all_chunked_docs.append(doc)
                chunk_info.append({
                    'document_id': doc.metadata.get('document_id', 'unknown'),
                    'section_id': doc.metadata.get('section_id', 'unknown'),
                    'chunk_id': 0,
                    'chunk_size': doc_size,
                    'chunk_preview': doc.text[:200] + "...",
                    'type': 'image',
                    'image_number': doc.metadata.get('image_number', 'unknown')
                })

        # Text - chunk if too large
        else:
            if doc_size > CHUNK_SIZE:
                log_message(f"📝 CHUNKING: Текст '{doc.metadata.get('document_id')}' | Размер: {doc_size}")
                chunked_docs = chunk_text_document(doc)
                stats['text_chunks'] += len(chunked_docs)
                all_chunked_docs.extend(chunked_docs)
                for i, chunk_doc in enumerate(chunked_docs):
                    chunk_info.append({
                        'document_id': chunk_doc.metadata.get('document_id', 'unknown'),
                        'section_id': chunk_doc.metadata.get('section_id', 'unknown'),
                        'chunk_id': i,
                        'chunk_size': len(chunk_doc.text),
                        'chunk_preview': chunk_doc.text[:200] + "...",
                        'type': 'text'
                    })
            else:
                all_chunked_docs.append(doc)
                chunk_info.append({
                    'document_id': doc.metadata.get('document_id', 'unknown'),
                    'section_id': doc.metadata.get('section_id', 'unknown'),
                    'chunk_id': 0,
                    'chunk_size': doc_size,
                    'chunk_preview': doc.text[:200] + "...",
                    'type': 'text'
                })

    # Log summary
    log_message(f"\n{'='*60}")
    log_message("ИТОГОВАЯ СТАТИСТИКА:")
    log_message(f" • Текстовые чанки: {stats['text_chunks']}")
    log_message(f" • Таблицы (целые): {stats['table_whole']}")
    log_message(f" • Таблицы (чанки): {stats['table_chunks']}")
    log_message(f" • Изображения (целые): {stats['image_whole']}")
    log_message(f" • Изображения (чанки): {stats['image_chunks']}")
    log_message(f" • ВСЕГО ДОКУМЕНТОВ: {len(all_chunked_docs)}")
    log_message(f"{'='*60}\n")

    return all_chunked_docs, chunk_info

# ============================================================================
# CSV CHUNKS (Legacy support)
# ============================================================================

def load_csv_chunks(repo_id, hf_token, chunks_filename, download_dir):
    """Load pre-chunked data from a CSV file (legacy support).

    Picks the first column whose name contains 'text', 'content' or 'chunk'
    (falling back to the first column) as the chunk text.

    Returns:
        tuple[list[Document], pandas.DataFrame | None]: documents and the raw
        DataFrame; ([], None) on failure.
    """
    log_message("Загрузка данных из CSV")
    try:
        chunks_csv_path = hf_hub_download(
            repo_id=repo_id,
            filename=chunks_filename,
            local_dir=download_dir,
            repo_type="dataset",
            token=hf_token
        )
        chunks_df = pd.read_csv(chunks_csv_path)
        log_message(f"Загружено {len(chunks_df)} чанков из CSV")

        # Find text column
        text_column = None
        for col in chunks_df.columns:
            if any(keyword in col.lower() for keyword in ['text', 'content', 'chunk']):
                text_column = col
                break
        if text_column is None:
            text_column = chunks_df.columns[0]

        documents = []
        for i, (_, row) in enumerate(chunks_df.iterrows()):
            doc = Document(
                text=str(row[text_column]),
                metadata={
                    "chunk_id": row.get('chunk_id', i),
                    "document_id": row.get('document_id', 'unknown'),
                    "type": "text"
                }
            )
            documents.append(doc)

        log_message(f"Создано {len(documents)} документов из CSV")
        return documents, chunks_df
    except Exception as e:
        log_message(f"❌ ОШИБКА загрузки CSV: {str(e)}")
        return [], None