import json
import zipfile

import pandas as pd
from huggingface_hub import hf_hub_download, list_repo_files
from llama_index.core import Document
from llama_index.core.text_splitter import SentenceSplitter

from config import CHUNK_SIZE, CHUNK_OVERLAP
from my_logging import log_message
from table_prep import table_to_document, load_table_data


def chunk_document(doc, chunk_size=None, chunk_overlap=None):
    """Split a single Document into sentence-aligned chunks.

    Args:
        doc: llama_index Document to split.
        chunk_size: Maximum chunk size; defaults to config.CHUNK_SIZE.
        chunk_overlap: Overlap between chunks; defaults to config.CHUNK_OVERLAP.

    Returns:
        List of Document chunks. Each carries a copy of the original
        metadata plus chunk_id, total_chunks, chunk_size and original_doc_id.
    """
    if chunk_size is None:
        chunk_size = CHUNK_SIZE
    if chunk_overlap is None:
        chunk_overlap = CHUNK_OVERLAP

    text_splitter = SentenceSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        separator=" "
    )
    text_chunks = text_splitter.split_text(doc.text)

    chunked_docs = []
    for i, chunk_text in enumerate(text_chunks):
        chunk_metadata = doc.metadata.copy()
        chunk_metadata.update({
            "chunk_id": i,
            "total_chunks": len(text_chunks),
            "chunk_size": len(chunk_text),
            "original_doc_id": doc.id_ if hasattr(doc, 'id_') else None
        })
        chunked_docs.append(Document(text=chunk_text, metadata=chunk_metadata))
    return chunked_docs


def _preview(text):
    """Return the first 200 characters of *text*, with an ellipsis when truncated."""
    return text[:200] + "..." if len(text) > 200 else text


def _process_table(doc, all_docs, chunk_info):
    """Record a table Document as-is (tables are never re-chunked here).

    Returns True when the document is a pre-chunked table piece
    (metadata flag 'is_chunked'), False for a whole table.
    """
    is_chunk = doc.metadata.get('is_chunked', False)
    all_docs.append(doc)
    chunk_info.append({
        'document_id': doc.metadata.get('document_id', 'unknown'),
        'section_id': doc.metadata.get('section_id', 'unknown'),
        'chunk_id': doc.metadata.get('chunk_id', 0) if is_chunk else 0,
        'total_chunks': doc.metadata.get('total_chunks', 1) if is_chunk else 1,
        'chunk_size': len(doc.text),
        'chunk_preview': _preview(doc.text),
        'type': 'table',
        'table_number': doc.metadata.get('table_number', 'unknown'),
    })
    return is_chunk


def _process_image(doc, all_docs, chunk_info):
    """Chunk an image Document when its text exceeds CHUNK_SIZE.

    Appends the resulting document(s) to *all_docs* and one info row per
    chunk to *chunk_info*. Returns the number of chunks produced
    (0 when the document was kept whole).
    """
    doc_size = len(doc.text)
    if doc_size > CHUNK_SIZE:
        log_message(f"πŸ“· CHUNKING: Π˜Π·ΠΎΠ±Ρ€Π°ΠΆΠ΅Π½ΠΈΠ΅ {doc.metadata.get('image_number', 'unknown')} | "
                    f"Π Π°Π·ΠΌΠ΅Ρ€: {doc_size} > {CHUNK_SIZE}")
        chunked_docs = chunk_document(doc)
        all_docs.extend(chunked_docs)
        log_message(f" βœ‚οΈ Π Π°Π·Π΄Π΅Π»Π΅Π½ΠΎ Π½Π° {len(chunked_docs)} Ρ‡Π°Π½ΠΊΠΎΠ²")
        for i, chunk_doc in enumerate(chunked_docs):
            chunk_info.append({
                'document_id': chunk_doc.metadata.get('document_id', 'unknown'),
                'section_id': chunk_doc.metadata.get('section_id', 'unknown'),
                'chunk_id': i,
                'total_chunks': len(chunked_docs),
                'chunk_size': len(chunk_doc.text),
                'chunk_preview': _preview(chunk_doc.text),
                'type': 'image',
                'image_number': chunk_doc.metadata.get('image_number', 'unknown'),
            })
        return len(chunked_docs)

    all_docs.append(doc)
    chunk_info.append({
        'document_id': doc.metadata.get('document_id', 'unknown'),
        'section_id': doc.metadata.get('section_id', 'unknown'),
        'chunk_id': 0,
        'total_chunks': 1,
        'chunk_size': doc_size,
        'chunk_preview': _preview(doc.text),
        'type': 'image',
        'image_number': doc.metadata.get('image_number', 'unknown'),
    })
    return 0


def _process_text(doc, all_docs, chunk_info):
    """Chunk a text Document when its text exceeds CHUNK_SIZE.

    Appends the resulting document(s) to *all_docs* and one info row per
    chunk to *chunk_info*. Returns the number of chunks produced
    (0 when the document was kept whole).
    """
    doc_size = len(doc.text)
    if doc_size > CHUNK_SIZE:
        log_message(f"πŸ“ CHUNKING: ВСкст ΠΈΠ· '{doc.metadata.get('document_id', 'unknown')}' | "
                    f"Π Π°Π·ΠΌΠ΅Ρ€: {doc_size} > {CHUNK_SIZE}")
        chunked_docs = chunk_document(doc)
        all_docs.extend(chunked_docs)
        log_message(f" βœ‚οΈ Π Π°Π·Π΄Π΅Π»Π΅Π½ Π½Π° {len(chunked_docs)} Ρ‡Π°Π½ΠΊΠΎΠ²")
        for i, chunk_doc in enumerate(chunked_docs):
            chunk_info.append({
                'document_id': chunk_doc.metadata.get('document_id', 'unknown'),
                'section_id': chunk_doc.metadata.get('section_id', 'unknown'),
                'chunk_id': i,
                'total_chunks': len(chunked_docs),
                'chunk_size': len(chunk_doc.text),
                'chunk_preview': _preview(chunk_doc.text),
                'type': 'text'
            })
        return len(chunked_docs)

    all_docs.append(doc)
    chunk_info.append({
        'document_id': doc.metadata.get('document_id', 'unknown'),
        'section_id': doc.metadata.get('section_id', 'unknown'),
        'chunk_id': 0,
        'total_chunks': 1,
        'chunk_size': doc_size,
        'chunk_preview': _preview(doc.text),
        'type': 'text'
    })
    return 0


def process_documents_with_chunking(documents):
    """Route Documents through type-specific chunking and collect statistics.

    Tables pass through untouched; images and texts larger than CHUNK_SIZE
    are split via chunk_document(). Returns (all_chunked_docs, chunk_info)
    where chunk_info holds one summary dict per emitted document/chunk.

    Fix vs. earlier revision: the "whole images" summary line used
    ``image_count - (image_chunks_count > 0)``, i.e. subtracted a boolean,
    which was wrong whenever more than one image got chunked; we now count
    chunked images explicitly.
    """
    all_chunked_docs = []
    chunk_info = []
    table_count = 0
    table_chunks_count = 0
    image_count = 0
    images_chunked = 0        # number of image docs that were split
    image_chunks_count = 0    # total chunks produced from images
    text_chunks_count = 0

    for doc in documents:
        doc_type = doc.metadata.get('type', 'text')
        if doc_type == 'table':
            if _process_table(doc, all_chunked_docs, chunk_info):
                table_chunks_count += 1
            else:
                table_count += 1
        elif doc_type == 'image':
            image_count += 1
            produced = _process_image(doc, all_chunked_docs, chunk_info)
            if produced:
                images_chunked += 1
                image_chunks_count += produced
        else:
            # Anything without an explicit type is treated as plain text.
            text_chunks_count += _process_text(doc, all_chunked_docs, chunk_info)

    log_message(f"\n{'='*60}")
    log_message(f"Π˜Π’ΠžΠ“Πž ΠžΠ‘Π ΠΠ‘ΠžΠ’ΠΠΠž Π”ΠžΠšΠ£ΠœΠ•ΠΠ’ΠžΠ’:")
    log_message(f" β€’ Π’Π°Π±Π»ΠΈΡ†Ρ‹ (Ρ†Π΅Π»Ρ‹Π΅): {table_count}")
    log_message(f" β€’ Π’Π°Π±Π»ΠΈΡ†Ρ‹ (Ρ‡Π°Π½ΠΊΠΈ): {table_chunks_count}")
    log_message(f" β€’ Π˜Π·ΠΎΠ±Ρ€Π°ΠΆΠ΅Π½ΠΈΡ (Ρ†Π΅Π»Ρ‹Π΅): {image_count - images_chunked}")
    log_message(f" β€’ Π˜Π·ΠΎΠ±Ρ€Π°ΠΆΠ΅Π½ΠΈΡ (Ρ‡Π°Π½ΠΊΠΈ): {image_chunks_count}")
    log_message(f" β€’ ВСкстовыС Ρ‡Π°Π½ΠΊΠΈ: {text_chunks_count}")
    log_message(f" β€’ ВсСго Π΄ΠΎΠΊΡƒΠΌΠ΅Π½Ρ‚ΠΎΠ²: {len(all_chunked_docs)}")
    log_message(f"{'='*60}\n")

    return all_chunked_docs, chunk_info


# (level name, id key, text key, children key) for each nesting depth of the
# JSON section hierarchy; the deepest level has no children.
_SECTION_LEVELS = (
    ("section", "section_id", "section_text", "subsections"),
    ("subsection", "subsection_id", "subsection_text", "sub_subsections"),
    ("sub_subsection", "sub_subsection_id", "sub_subsection_text", "sub_sub_subsections"),
    ("sub_sub_subsection", "sub_sub_subsection_id", "sub_sub_subsection_text", None),
)


def _walk_section(node, depth, parent_path, parent_id, parent_title,
                  document_id, document_name, documents):
    """Emit a Document for *node* (if it has text) and recurse into children.

    Replaces four copy-pasted nesting levels with one table-driven walker;
    metadata layout per level is unchanged: top-level sections carry no
    parent_* keys, deeper levels record parent_section/parent_title.
    """
    level, id_key, text_key, children_key = _SECTION_LEVELS[depth]
    node_id = node.get(id_key, 'Unknown')
    node_text = node.get(text_key, '')
    title = extract_section_title(node_text)
    path = f"{node_id}" if depth == 0 else f"{parent_path}.{node_id}"

    if node_text.strip():
        metadata = {
            "type": "text",
            "document_id": document_id,
            "document_name": document_name,
            "section_id": node_id,
            "section_text": title[:200],
            "section_path": path,
            "level": level,
        }
        if depth > 0:
            metadata["parent_section"] = parent_id
            metadata["parent_title"] = parent_title[:100]
        documents.append(Document(text=node_text, metadata=metadata))

    # Children are visited even when the node's own text is empty,
    # matching the original per-level behavior.
    if children_key and children_key in node:
        for child in node[children_key]:
            _walk_section(child, depth + 1, path, node_id, title,
                          document_id, document_name, documents)


def extract_text_from_json(data, document_id, document_name):
    """Flatten the hierarchical 'sections' tree of a parsed JSON document.

    Returns one llama_index Document per non-empty section node, down to
    four nesting levels (section .. sub_sub_subsection).
    """
    documents = []
    for section in data.get('sections', []):
        _walk_section(section, 0, None, None, None,
                      document_id, document_name, documents)
    return documents


def load_json_documents(repo_id, hf_token, json_files_dir, download_dir):
    """Download JSON sources (ZIP archives and plain .json files) from a
    HuggingFace dataset repo, parse them into Documents and chunk them.

    Returns:
        (chunked_documents, chunk_info); ([], []) on a top-level failure.
        Per-file errors are logged and skipped.
    """
    log_message("ΠΠ°Ρ‡ΠΈΠ½Π°ΡŽ Π·Π°Π³Ρ€ΡƒΠ·ΠΊΡƒ JSON Π΄ΠΎΠΊΡƒΠΌΠ΅Π½Ρ‚ΠΎΠ²")
    try:
        files = list_repo_files(repo_id=repo_id, repo_type="dataset", token=hf_token)
        zip_files = [f for f in files if f.startswith(json_files_dir) and f.endswith('.zip')]
        json_files = [f for f in files if f.startswith(json_files_dir) and f.endswith('.json')]
        log_message(f"НайдСно {len(zip_files)} ZIP Ρ„Π°ΠΉΠ»ΠΎΠ² ΠΈ {len(json_files)} прямых JSON Ρ„Π°ΠΉΠ»ΠΎΠ²")

        all_documents = []

        for zip_file_path in zip_files:
            try:
                log_message(f"Π—Π°Π³Ρ€ΡƒΠΆΠ°ΡŽ ZIP Π°Ρ€Ρ…ΠΈΠ²: {zip_file_path}")
                local_zip_path = hf_hub_download(
                    repo_id=repo_id,
                    filename=zip_file_path,
                    local_dir=download_dir,
                    repo_type="dataset",
                    token=hf_token
                )
                documents = extract_zip_and_process_json(local_zip_path)
                all_documents.extend(documents)
                log_message(f"Π˜Π·Π²Π»Π΅Ρ‡Π΅Π½ΠΎ {len(documents)} Π΄ΠΎΠΊΡƒΠΌΠ΅Π½Ρ‚ΠΎΠ² ΠΈΠ· ZIP Π°Ρ€Ρ…ΠΈΠ²Π° {zip_file_path}")
            except Exception as e:
                log_message(f"Ошибка ΠΎΠ±Ρ€Π°Π±ΠΎΡ‚ΠΊΠΈ ZIP Ρ„Π°ΠΉΠ»Π° {zip_file_path}: {str(e)}")
                continue

        for file_path in json_files:
            try:
                log_message(f"ΠžΠ±Ρ€Π°Π±Π°Ρ‚Ρ‹Π²Π°ΡŽ прямой JSON Ρ„Π°ΠΉΠ»: {file_path}")
                local_path = hf_hub_download(
                    repo_id=repo_id,
                    filename=file_path,
                    local_dir=download_dir,
                    repo_type="dataset",
                    token=hf_token
                )
                with open(local_path, 'r', encoding='utf-8') as f:
                    json_data = json.load(f)
                document_metadata = json_data.get('document_metadata', {})
                document_id = document_metadata.get('document_id', 'unknown')
                document_name = document_metadata.get('document_name', 'unknown')
                documents = extract_text_from_json(json_data, document_id, document_name)
                all_documents.extend(documents)
                log_message(f"Π˜Π·Π²Π»Π΅Ρ‡Π΅Π½ΠΎ {len(documents)} Π΄ΠΎΠΊΡƒΠΌΠ΅Π½Ρ‚ΠΎΠ² ΠΈΠ· {file_path}")
            except Exception as e:
                log_message(f"Ошибка ΠΎΠ±Ρ€Π°Π±ΠΎΡ‚ΠΊΠΈ Ρ„Π°ΠΉΠ»Π° {file_path}: {str(e)}")
                continue

        log_message(f"ВсСго создано {len(all_documents)} исходных Π΄ΠΎΠΊΡƒΠΌΠ΅Π½Ρ‚ΠΎΠ² ΠΈΠ· JSON Ρ„Π°ΠΉΠ»ΠΎΠ²")

        # Process documents through chunking function
        chunked_documents, chunk_info = process_documents_with_chunking(all_documents)
        log_message(f"ПослС chunking ΠΏΠΎΠ»ΡƒΡ‡Π΅Π½ΠΎ {len(chunked_documents)} Ρ‡Π°Π½ΠΊΠΎΠ² ΠΈΠ· JSON Π΄Π°Π½Π½Ρ‹Ρ…")
        return chunked_documents, chunk_info
    except Exception as e:
        log_message(f"Ошибка Π·Π°Π³Ρ€ΡƒΠ·ΠΊΠΈ JSON Π΄ΠΎΠΊΡƒΠΌΠ΅Π½Ρ‚ΠΎΠ²: {str(e)}")
        return [], []


def extract_section_title(section_text):
    """Derive a short human-readable title from a section's raw text.

    Heuristic: a short first line that doesn't end with a period is taken
    as a heading; otherwise the first sentence of the first line is used,
    truncated to 100 chars as a last resort. Returns "" for blank input.
    """
    if not section_text.strip():
        return ""
    lines = section_text.strip().split('\n')
    first_line = lines[0].strip()
    if len(first_line) < 200 and not first_line.endswith('.'):
        return first_line
    # Otherwise, extract first sentence
    sentences = first_line.split('.')
    if len(sentences) > 1:
        return sentences[0].strip()
    return first_line[:100] + "..." if len(first_line) > 100 else first_line


def extract_zip_and_process_json(zip_path):
    """Parse every JSON file inside the archive at *zip_path* into Documents.

    macOS resource-fork entries (__MACOSX) are skipped; per-file errors are
    logged and skipped. Returns the accumulated list of Documents.
    """
    documents = []
    try:
        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            zip_files = zip_ref.namelist()
            json_files = [f for f in zip_files
                          if f.endswith('.json') and not f.startswith('__MACOSX')]
            log_message(f"НайдСно {len(json_files)} JSON Ρ„Π°ΠΉΠ»ΠΎΠ² Π² Π°Ρ€Ρ…ΠΈΠ²Π΅")
            for json_file in json_files:
                try:
                    log_message(f"ΠžΠ±Ρ€Π°Π±Π°Ρ‚Ρ‹Π²Π°ΡŽ Ρ„Π°ΠΉΠ» ΠΈΠ· Π°Ρ€Ρ…ΠΈΠ²Π°: {json_file}")
                    with zip_ref.open(json_file) as f:
                        json_data = json.load(f)
                    document_metadata = json_data.get('document_metadata', {})
                    document_id = document_metadata.get('document_id', 'unknown')
                    document_name = document_metadata.get('document_name', 'unknown')
                    docs = extract_text_from_json(json_data, document_id, document_name)
                    documents.extend(docs)
                    log_message(f"Π˜Π·Π²Π»Π΅Ρ‡Π΅Π½ΠΎ {len(docs)} Π΄ΠΎΠΊΡƒΠΌΠ΅Π½Ρ‚ΠΎΠ² ΠΈΠ· {json_file}")
                except Exception as e:
                    log_message(f"Ошибка ΠΎΠ±Ρ€Π°Π±ΠΎΡ‚ΠΊΠΈ Ρ„Π°ΠΉΠ»Π° {json_file}: {str(e)}")
                    continue
    except Exception as e:
        log_message(f"Ошибка извлСчСния ZIP Π°Ρ€Ρ…ΠΈΠ²Π° {zip_path}: {str(e)}")
    return documents


def load_image_data(repo_id, hf_token, image_data_dir, download_dir=''):
    """Load image-description CSVs from a HuggingFace dataset repo.

    Args:
        repo_id: HF dataset repo id.
        hf_token: HF access token.
        image_data_dir: repo path prefix to scan for .csv files.
        download_dir: local directory passed to hf_hub_download; the default
            '' (current directory) preserves the previously hard-coded value.

    Returns:
        List of image Documents; [] on a top-level failure.
    """
    log_message("ΠΠ°Ρ‡ΠΈΠ½Π°ΡŽ Π·Π°Π³Ρ€ΡƒΠ·ΠΊΡƒ Π΄Π°Π½Π½Ρ‹Ρ… ΠΈΠ·ΠΎΠ±Ρ€Π°ΠΆΠ΅Π½ΠΈΠΉ")
    try:
        files = list_repo_files(repo_id=repo_id, repo_type="dataset", token=hf_token)
        image_files = [f for f in files
                       if f.startswith(image_data_dir) and f.endswith('.csv')]
        log_message(f"НайдСно {len(image_files)} CSV Ρ„Π°ΠΉΠ»ΠΎΠ² с изобраТСниями")

        image_documents = []
        for file_path in image_files:
            try:
                log_message(f"ΠžΠ±Ρ€Π°Π±Π°Ρ‚Ρ‹Π²Π°ΡŽ Ρ„Π°ΠΉΠ» ΠΈΠ·ΠΎΠ±Ρ€Π°ΠΆΠ΅Π½ΠΈΠΉ: {file_path}")
                local_path = hf_hub_download(
                    repo_id=repo_id,
                    filename=file_path,
                    local_dir=download_dir,
                    repo_type="dataset",
                    token=hf_token
                )
                df = pd.read_csv(local_path)
                log_message(f"Π—Π°Π³Ρ€ΡƒΠΆΠ΅Π½ΠΎ {len(df)} записСй ΠΈΠ·ΠΎΠ±Ρ€Π°ΠΆΠ΅Π½ΠΈΠΉ ΠΈΠ· Ρ„Π°ΠΉΠ»Π° {file_path}")
                # Column names are the Russian headers of the source CSV.
                for _, row in df.iterrows():
                    section_value = row.get('Π Π°Π·Π΄Π΅Π» Π΄ΠΎΠΊΡƒΠΌΠ΅Π½Ρ‚Π°', 'НСизвСстно')
                    content = f"Π˜Π·ΠΎΠ±Ρ€Π°ΠΆΠ΅Π½ΠΈΠ΅: {row.get('β„– Π˜Π·ΠΎΠ±Ρ€Π°ΠΆΠ΅Π½ΠΈΡ', 'НСизвСстно')}\n"
                    content += f"НазваниС: {row.get('НазваниС изобраТСния', 'НСизвСстно')}\n"
                    # NOTE: 'ОписаниС ΠΈΠ·ΠΎΠ±Ρ€Π°ΠΆΠ΅Π½ΠΈΠ΅' is a typo present in the CSV
                    # header itself — do not "fix" the key here.
                    content += f"ОписаниС: {row.get('ОписаниС ΠΈΠ·ΠΎΠ±Ρ€Π°ΠΆΠ΅Π½ΠΈΠ΅', 'НСизвСстно')}\n"
                    content += f"Π”ΠΎΠΊΡƒΠΌΠ΅Π½Ρ‚: {row.get('ΠžΠ±ΠΎΠ·Π½Π°Ρ‡Π΅Π½ΠΈΠ΅ Π΄ΠΎΠΊΡƒΠΌΠ΅Π½Ρ‚Π°', 'НСизвСстно')}\n"
                    content += f"Π Π°Π·Π΄Π΅Π»: {section_value}\n"
                    content += f"Π€Π°ΠΉΠ»: {row.get('Π€Π°ΠΉΠ» изобраТСния', 'НСизвСстно')}\n"
                    doc = Document(
                        text=content,
                        metadata={
                            "type": "image",
                            "image_number": str(row.get('β„– Π˜Π·ΠΎΠ±Ρ€Π°ΠΆΠ΅Π½ΠΈΡ', 'unknown')),
                            "image_title": str(row.get('НазваниС изобраТСния', 'unknown')),
                            "image_description": str(row.get('ОписаниС ΠΈΠ·ΠΎΠ±Ρ€Π°ΠΆΠ΅Π½ΠΈΠ΅', 'unknown')),
                            "document_id": str(row.get('ΠžΠ±ΠΎΠ·Π½Π°Ρ‡Π΅Π½ΠΈΠ΅ Π΄ΠΎΠΊΡƒΠΌΠ΅Π½Ρ‚Π°', 'unknown')),
                            "file_path": str(row.get('Π€Π°ΠΉΠ» изобраТСния', 'unknown')),
                            "section": str(section_value),
                            "section_id": str(section_value)
                        }
                    )
                    image_documents.append(doc)
            except Exception as e:
                log_message(f"Ошибка ΠΎΠ±Ρ€Π°Π±ΠΎΡ‚ΠΊΠΈ Ρ„Π°ΠΉΠ»Π° {file_path}: {str(e)}")
                continue

        log_message(f"Π‘ΠΎΠ·Π΄Π°Π½ΠΎ {len(image_documents)} Π΄ΠΎΠΊΡƒΠΌΠ΅Π½Ρ‚ΠΎΠ² ΠΈΠ· ΠΈΠ·ΠΎΠ±Ρ€Π°ΠΆΠ΅Π½ΠΈΠΉ")
        return image_documents
    except Exception as e:
        log_message(f"Ошибка Π·Π°Π³Ρ€ΡƒΠ·ΠΊΠΈ Π΄Π°Π½Π½Ρ‹Ρ… ΠΈΠ·ΠΎΠ±Ρ€Π°ΠΆΠ΅Π½ΠΈΠΉ: {str(e)}")
        return []


def load_csv_chunks(repo_id, hf_token, chunks_filename, download_dir):
    """Load pre-chunked text rows from a CSV stored in a HF dataset repo.

    The text column is picked heuristically (name containing 'text',
    'content' or 'chunk'); the first column is the fallback.

    Returns:
        (documents, dataframe); ([], None) on failure.
    """
    log_message("Π—Π°Π³Ρ€ΡƒΠΆΠ°ΡŽ Π΄Π°Π½Π½Ρ‹Π΅ Ρ‡Π°Π½ΠΊΠΎΠ² ΠΈΠ· CSV")
    try:
        chunks_csv_path = hf_hub_download(
            repo_id=repo_id,
            filename=chunks_filename,
            local_dir=download_dir,
            repo_type="dataset",
            token=hf_token
        )
        chunks_df = pd.read_csv(chunks_csv_path)
        log_message(f"Π—Π°Π³Ρ€ΡƒΠΆΠ΅Π½ΠΎ {len(chunks_df)} Ρ‡Π°Π½ΠΊΠΎΠ² ΠΈΠ· CSV")

        text_column = next(
            (col for col in chunks_df.columns
             if 'text' in col.lower() or 'content' in col.lower() or 'chunk' in col.lower()),
            chunks_df.columns[0]
        )
        log_message(f"Π˜ΡΠΏΠΎΠ»ΡŒΠ·ΡƒΡŽ ΠΊΠΎΠ»ΠΎΠ½ΠΊΡƒ: {text_column}")

        documents = []
        for i, (_, row) in enumerate(chunks_df.iterrows()):
            documents.append(Document(
                text=str(row[text_column]),
                metadata={
                    "chunk_id": row.get('chunk_id', i),
                    "document_id": row.get('document_id', 'unknown'),
                    "type": "text"
                }
            ))
        log_message(f"Π‘ΠΎΠ·Π΄Π°Π½ΠΎ {len(documents)} тСкстовых Π΄ΠΎΠΊΡƒΠΌΠ΅Π½Ρ‚ΠΎΠ² ΠΈΠ· CSV")
        return documents, chunks_df
    except Exception as e:
        log_message(f"Ошибка Π·Π°Π³Ρ€ΡƒΠ·ΠΊΠΈ CSV Π΄Π°Π½Π½Ρ‹Ρ…: {str(e)}")
        return [], None