Spaces:
Sleeping
Sleeping
| import json | |
| import zipfile | |
| import pandas as pd | |
| from huggingface_hub import hf_hub_download, list_repo_files | |
| from llama_index.core import Document | |
| from llama_index.core.text_splitter import SentenceSplitter | |
| from my_logging import log_message | |
| from config import CHUNK_SIZE, CHUNK_OVERLAP, MAX_CHARS_TABLE, MAX_ROWS_TABLE | |
| import re | |
| def normalize_text(text): | |
| if not text: | |
| return text | |
| # Replace Cyrillic 'C' with Latin 'С' (U+0421) | |
| # This is for welding types like C-25 -> С-25 | |
| text = text.replace('С-', 'C') | |
| text = re.sub(r'\bС(\d)', r'С\1', text) | |
| return text | |
| def normalize_steel_designations(text): | |
| if not text: | |
| return text, 0, [] | |
| import re | |
| changes_count = 0 | |
| changes_list = [] | |
| # Mapping of Cyrillic to Latin for steel designations | |
| replacements = { | |
| 'Х': 'X', | |
| 'Н': 'H', | |
| 'Т': 'T', | |
| 'С': 'C', | |
| 'В': 'B', | |
| 'К': 'K', | |
| 'М': 'M', | |
| 'А': 'A', | |
| 'Р': 'P', | |
| } | |
| # Pattern: starts with digits, then letters+digits (steel grade pattern) | |
| # Examples: 08Х18Н10Т, 12Х18Н9, 10Н17Н13М2Т, СВ-08Х19Н10 | |
| pattern = r'\b\d{1,3}(?:[A-ZА-ЯЁ]\d*)+\b' | |
| # Also match welding wire patterns like СВ-08Х19Н10 | |
| pattern_wire = r'\b[СC][ВB]-\d{1,3}(?:[A-ZА-ЯЁ]\d*)+\b' | |
| def replace_in_steel_grade(match): | |
| nonlocal changes_count, changes_list | |
| original = match.group(0) | |
| converted = ''.join(replacements.get(ch, ch) for ch in original) | |
| if converted != original: | |
| changes_count += 1 | |
| changes_list.append(f"{original} → {converted}") | |
| return converted | |
| normalized_text = re.sub(pattern, replace_in_steel_grade, text) | |
| normalized_text = re.sub(pattern_wire, replace_in_steel_grade, normalized_text) | |
| return normalized_text, changes_count, changes_list | |
| def chunk_text_documents(documents): | |
| text_splitter = SentenceSplitter( | |
| chunk_size=CHUNK_SIZE, | |
| chunk_overlap=CHUNK_OVERLAP | |
| ) | |
| total_normalizations = 0 | |
| chunks_with_changes = 0 | |
| chunked = [] | |
| for doc in documents: | |
| chunks = text_splitter.get_nodes_from_documents([doc]) | |
| for i, chunk in enumerate(chunks): | |
| original_text = chunk.text | |
| chunk.text, changes, change_list = normalize_steel_designations(chunk.text) | |
| if changes > 0: | |
| chunks_with_changes += 1 | |
| total_normalizations += changes | |
| chunk.metadata.update({ | |
| 'chunk_id': i, | |
| 'total_chunks': len(chunks), | |
| 'chunk_size': len(chunk.text) | |
| }) | |
| chunked.append(chunk) | |
| # Log statistics | |
| if chunked: | |
| avg_size = sum(len(c.text) for c in chunked) / len(chunked) | |
| min_size = min(len(c.text) for c in chunked) | |
| max_size = max(len(c.text) for c in chunked) | |
| log_message(f"✓ Text: {len(documents)} docs → {len(chunked)} chunks") | |
| log_message(f" Size stats: avg={avg_size:.0f}, min={min_size}, max={max_size} chars") | |
| log_message(f" Steel designation normalization:") | |
| log_message(f" - Chunks with changes: {chunks_with_changes}/{len(chunked)}") | |
| log_message(f" - Total steel grades normalized: {total_normalizations}") | |
| log_message(f" - Avg per affected chunk: {total_normalizations/chunks_with_changes:.1f}" if chunks_with_changes > 0 else " - No normalizations needed") | |
| log_message("="*60) | |
| return chunked | |
| def chunk_table_by_content(table_data, doc_id, max_chars=MAX_CHARS_TABLE, max_rows=MAX_ROWS_TABLE): | |
| headers = table_data.get('headers', []) | |
| rows = table_data.get('data', []) | |
| table_num = table_data.get('table_number', 'unknown') | |
| table_title = table_data.get('table_title', '') | |
| section = table_data.get('section', '') | |
| sheet_name = table_data.get('sheet_name', '') | |
| # Apply steel designation normalization to title and section | |
| table_title, title_changes, title_list = normalize_steel_designations(str(table_title)) | |
| section, section_changes, section_list = normalize_steel_designations(section) | |
| table_num_clean = str(table_num).strip() | |
| import re | |
| if table_num_clean in ['-', '', 'unknown', 'nan']: | |
| if 'приложени' in sheet_name.lower() or 'приложени' in section.lower(): | |
| appendix_match = re.search(r'приложени[еия]\s*[№]?\s*(\d+)', | |
| (sheet_name + ' ' + section).lower()) | |
| if appendix_match: | |
| appendix_num = appendix_match.group(1) | |
| table_identifier = f"Приложение {appendix_num}" | |
| else: | |
| table_identifier = "Приложение" | |
| else: | |
| if table_title: | |
| first_words = ' '.join(table_title.split()[:5]) | |
| table_identifier = f"{first_words}" | |
| else: | |
| table_identifier = section.split(',')[0] if section else "БезНомера" | |
| else: | |
| if 'приложени' in section.lower(): | |
| appendix_match = re.search(r'приложени[еия]\s*[№]?\s*(\d+)', section.lower()) | |
| if appendix_match: | |
| appendix_num = appendix_match.group(1) | |
| table_identifier = f"{table_num_clean} Приложение {appendix_num}" | |
| else: | |
| table_identifier = table_num_clean | |
| else: | |
| table_identifier = table_num_clean | |
| if not rows: | |
| return [] | |
| log_message(f" 📊 Processing: {doc_id} - {table_identifier} ({len(rows)} rows)") | |
| # Normalize all row content (including steel designations) | |
| normalized_rows = [] | |
| total_row_changes = 0 | |
| rows_with_changes = 0 | |
| all_row_changes = [] | |
| for row in rows: | |
| if isinstance(row, dict): | |
| normalized_row = {} | |
| row_had_changes = False | |
| for k, v in row.items(): | |
| normalized_val, changes, change_list = normalize_steel_designations(str(v)) | |
| normalized_row[k] = normalized_val | |
| if changes > 0: | |
| total_row_changes += changes | |
| row_had_changes = True | |
| all_row_changes.extend(change_list) # NEW | |
| if row_had_changes: | |
| rows_with_changes += 1 | |
| normalized_rows.append(normalized_row) | |
| else: | |
| normalized_rows.append(row) | |
| # Log normalization stats with examples | |
| if total_row_changes > 0 or title_changes > 0 or section_changes > 0: | |
| log_message(f" Steel normalization: title={title_changes}, section={section_changes}, " | |
| f"rows={rows_with_changes}/{len(rows)} ({total_row_changes} total)") | |
| if title_list: | |
| log_message(f" Title changes: {', '.join(title_list[:3])}") | |
| if section_list: | |
| log_message(f" Section changes: {', '.join(section_list[:3])}") | |
| if all_row_changes: | |
| log_message(f" Row examples: {', '.join(all_row_changes[:5])}") | |
| base_content = format_table_header(doc_id, table_identifier, table_num, | |
| table_title, section, headers, | |
| sheet_name) | |
| base_size = len(base_content) | |
| available_space = max_chars - base_size - 200 | |
| # If entire table fits, return as one chunk | |
| full_rows_content = format_table_rows([{**row, '_idx': i+1} | |
| for i, row in enumerate(normalized_rows)]) | |
| if base_size + len(full_rows_content) <= max_chars and len(normalized_rows) <= max_rows: | |
| content = base_content + full_rows_content + format_table_footer(table_identifier, doc_id) | |
| metadata = { | |
| 'type': 'table', | |
| 'document_id': doc_id, | |
| 'table_number': table_num_clean if table_num_clean not in ['-', 'unknown'] else table_identifier, | |
| 'table_identifier': table_identifier, | |
| 'table_title': table_title, | |
| 'section': section, | |
| 'sheet_name': sheet_name, | |
| 'total_rows': len(normalized_rows), | |
| 'chunk_size': len(content), | |
| 'is_complete_table': True, | |
| 'keywords': f"{doc_id} {table_identifier} {table_title} {section} сталь материал" | |
| } | |
| log_message(f" Single chunk: {len(content)} chars, {len(normalized_rows)} rows") | |
| return [Document(text=content, metadata=metadata)] | |
| chunks = [] | |
| current_rows = [] | |
| current_size = 0 | |
| chunk_num = 0 | |
| for i, row in enumerate(normalized_rows): | |
| row_text = format_single_row(row, i + 1) | |
| row_size = len(row_text) | |
| should_split = (current_size + row_size > available_space or | |
| len(current_rows) >= max_rows) and current_rows | |
| if should_split: | |
| content = base_content + format_table_rows(current_rows) | |
| content += f"\n\nСтроки {current_rows[0]['_idx']}-{current_rows[-1]['_idx']} из {len(normalized_rows)}\n" | |
| content += format_table_footer(table_identifier, doc_id) | |
| metadata = { | |
| 'type': 'table', | |
| 'document_id': doc_id, | |
| 'table_number': table_num_clean if table_num_clean not in ['-', 'unknown'] else table_identifier, | |
| 'table_identifier': table_identifier, | |
| 'table_title': table_title, | |
| 'section': section, | |
| 'sheet_name': sheet_name, | |
| 'chunk_id': chunk_num, | |
| 'row_start': current_rows[0]['_idx'] - 1, | |
| 'row_end': current_rows[-1]['_idx'], | |
| 'total_rows': len(normalized_rows), | |
| 'chunk_size': len(content), | |
| 'is_complete_table': False, | |
| 'keywords': f"{doc_id} {table_identifier} {table_title} {section} сталь материал" | |
| } | |
| chunks.append(Document(text=content, metadata=metadata)) | |
| log_message(f" Chunk {chunk_num + 1}: {len(content)} chars, {len(current_rows)} rows") | |
| chunk_num += 1 | |
| current_rows = [] | |
| current_size = 0 | |
| row_copy = row.copy() if isinstance(row, dict) else {'data': row} | |
| row_copy['_idx'] = i + 1 | |
| current_rows.append(row_copy) | |
| current_size += row_size | |
| if current_rows: | |
| content = base_content + format_table_rows(current_rows) | |
| content += f"\n\nСтроки {current_rows[0]['_idx']}-{current_rows[-1]['_idx']} из {len(normalized_rows)}\n" | |
| content += format_table_footer(table_identifier, doc_id) | |
| metadata = { | |
| 'type': 'table', | |
| 'document_id': doc_id, | |
| 'table_number': table_num_clean if table_num_clean not in ['-', 'unknown'] else table_identifier, | |
| 'table_identifier': table_identifier, | |
| 'table_title': table_title, | |
| 'section': section, | |
| 'sheet_name': sheet_name, | |
| 'chunk_id': chunk_num, | |
| 'row_start': current_rows[0]['_idx'] - 1, | |
| 'row_end': current_rows[-1]['_idx'], | |
| 'total_rows': len(normalized_rows), | |
| 'chunk_size': len(content), | |
| 'is_complete_table': False, | |
| 'keywords': f"{doc_id} {table_identifier} {table_title} {section} сталь материал" | |
| } | |
| chunks.append(Document(text=content, metadata=metadata)) | |
| log_message(f" Chunk {chunk_num + 1}: {len(content)} chars, {len(current_rows)} rows") | |
| return chunks | |
| def format_table_header(doc_id, table_identifier, table_num, table_title, section, headers, sheet_name=''): | |
| content = f"ТАБЛИЦА {normalize_text(table_identifier)} из документа {doc_id}\n" | |
| # Add multiple searchable identifiers | |
| if table_num and table_num not in ['-', 'unknown']: | |
| content += f"НОМЕР ТАБЛИЦЫ: {normalize_text(table_num)}\n" | |
| if sheet_name: | |
| content += f"ЛИСТ: {sheet_name}\n" | |
| if table_title: | |
| content += f"НАЗВАНИЕ: {normalize_text(table_title)}\n" | |
| if section: | |
| content += f"РАЗДЕЛ: {section}\n" | |
| content += f"КЛЮЧЕВЫЕ СЛОВА: материалы стали марки стандарты {doc_id}\n" | |
| content += f"{'='*70}\n" | |
| if headers: | |
| # Normalize headers too | |
| normalized_headers = [normalize_text(str(h)) for h in headers] | |
| header_str = ' | '.join(normalized_headers) | |
| content += f"ЗАГОЛОВКИ: {header_str}\n\n" | |
| content += "ДАННЫЕ:\n" | |
| return content | |
| def format_single_row(row, idx): | |
| if isinstance(row, dict): | |
| parts = [f"{k}: {v}" for k, v in row.items() | |
| if v and str(v).strip() and str(v).lower() not in ['nan', 'none', '']] | |
| if parts: | |
| return f"{idx}. {' | '.join(parts)}\n" | |
| elif isinstance(row, list): | |
| parts = [str(v) for v in row if v and str(v).strip() and str(v).lower() not in ['nan', 'none', '']] | |
| if parts: | |
| return f"{idx}. {' | '.join(parts)}\n" | |
| return "" | |
| def format_table_rows(rows): | |
| content = "" | |
| for row in rows: | |
| idx = row.get('_idx', 0) | |
| content += format_single_row(row, idx) | |
| return content | |
| def format_table_footer(table_identifier, doc_id): | |
| return f"\n{'='*70}\nКОНЕЦ ТАБЛИЦЫ {table_identifier} ИЗ {doc_id}\n" | |
| def load_json_documents(repo_id, hf_token, json_dir): | |
| import zipfile | |
| import tempfile | |
| import os | |
| log_message("Loading JSON documents...") | |
| files = list_repo_files(repo_id=repo_id, repo_type="dataset", token=hf_token) | |
| json_files = [f for f in files if f.startswith(json_dir) and f.endswith('.json')] | |
| zip_files = [f for f in files if f.startswith(json_dir) and f.endswith('.zip')] | |
| log_message(f"Found {len(json_files)} JSON files and {len(zip_files)} ZIP files") | |
| documents = [] | |
| stats = {'success': 0, 'failed': 0, 'empty': 0} | |
| for file_path in json_files: | |
| try: | |
| log_message(f" Loading: {file_path}") | |
| local_path = hf_hub_download( | |
| repo_id=repo_id, | |
| filename=file_path, | |
| repo_type="dataset", | |
| token=hf_token | |
| ) | |
| docs = extract_sections_from_json(local_path) | |
| if docs: | |
| documents.extend(docs) | |
| stats['success'] += 1 | |
| log_message(f" ✓ Extracted {len(docs)} sections") | |
| else: | |
| stats['empty'] += 1 | |
| log_message(f" ⚠ No sections found") | |
| except Exception as e: | |
| stats['failed'] += 1 | |
| log_message(f" ✗ Error: {e}") | |
| for zip_path in zip_files: | |
| try: | |
| log_message(f" Processing ZIP: {zip_path}") | |
| local_zip = hf_hub_download( | |
| repo_id=repo_id, | |
| filename=zip_path, | |
| repo_type="dataset", | |
| token=hf_token, | |
| force_download=True | |
| ) | |
| with zipfile.ZipFile(local_zip, 'r') as zf: | |
| json_files_in_zip = [f for f in zf.namelist() | |
| if f.endswith('.json') | |
| and not f.startswith('__MACOSX') | |
| and not f.startswith('.') | |
| and not '._' in f] | |
| log_message(f" Found {len(json_files_in_zip)} JSON files in ZIP") | |
| for json_file in json_files_in_zip: | |
| try: | |
| file_content = zf.read(json_file) | |
| # Skip if file is too small | |
| if len(file_content) < 10: | |
| log_message(f" ✗ Skipping: {json_file} (file too small)") | |
| stats['failed'] += 1 | |
| continue | |
| try: | |
| text_content = file_content.decode('utf-8') | |
| except UnicodeDecodeError: | |
| try: | |
| text_content = file_content.decode('utf-8-sig') | |
| except UnicodeDecodeError: | |
| try: | |
| text_content = file_content.decode('utf-16') | |
| except UnicodeDecodeError: | |
| try: | |
| text_content = file_content.decode('windows-1251') | |
| except UnicodeDecodeError: | |
| log_message(f" ✗ Skipping: {json_file} (encoding failed)") | |
| stats['failed'] += 1 | |
| continue | |
| # Validate JSON structure | |
| if not text_content.strip().startswith('{') and not text_content.strip().startswith('['): | |
| log_message(f" ✗ Skipping: {json_file} (not valid JSON)") | |
| stats['failed'] += 1 | |
| continue | |
| with tempfile.NamedTemporaryFile(mode='w', delete=False, | |
| suffix='.json', encoding='utf-8') as tmp: | |
| tmp.write(text_content) | |
| tmp_path = tmp.name | |
| docs = extract_sections_from_json(tmp_path) | |
| if docs: | |
| documents.extend(docs) | |
| stats['success'] += 1 | |
| log_message(f" ✓ {json_file}: {len(docs)} sections") | |
| else: | |
| stats['empty'] += 1 | |
| log_message(f" ⚠ {json_file}: No sections") | |
| os.unlink(tmp_path) | |
| except json.JSONDecodeError as e: | |
| stats['failed'] += 1 | |
| log_message(f" ✗ {json_file}: Invalid JSON") | |
| except Exception as e: | |
| stats['failed'] += 1 | |
| log_message(f" ✗ {json_file}: {str(e)[:100]}") | |
| except Exception as e: | |
| log_message(f" ✗ Error with ZIP: {e}") | |
| log_message(f"="*60) | |
| log_message(f"JSON Loading Stats:") | |
| log_message(f" Success: {stats['success']}") | |
| log_message(f" Empty: {stats['empty']}") | |
| log_message(f" Failed: {stats['failed']}") | |
| log_message(f"="*60) | |
| return documents | |
| def extract_sections_from_json(json_path): | |
| documents = [] | |
| try: | |
| with open(json_path, 'r', encoding='utf-8') as f: | |
| data = json.load(f) | |
| doc_id = data.get('document_metadata', {}).get('document_id', 'unknown') | |
| # Extract all section levels | |
| for section in data.get('sections', []): | |
| if section.get('section_text', '').strip(): | |
| documents.append(Document( | |
| text=section['section_text'], | |
| metadata={ | |
| 'type': 'text', | |
| 'document_id': doc_id, | |
| 'section_id': section.get('section_id', '') | |
| } | |
| )) | |
| # Subsections | |
| for subsection in section.get('subsections', []): | |
| if subsection.get('subsection_text', '').strip(): | |
| documents.append(Document( | |
| text=subsection['subsection_text'], | |
| metadata={ | |
| 'type': 'text', | |
| 'document_id': doc_id, | |
| 'section_id': subsection.get('subsection_id', '') | |
| } | |
| )) | |
| # Sub-subsections | |
| for sub_sub in subsection.get('sub_subsections', []): | |
| if sub_sub.get('sub_subsection_text', '').strip(): | |
| documents.append(Document( | |
| text=sub_sub['sub_subsection_text'], | |
| metadata={ | |
| 'type': 'text', | |
| 'document_id': doc_id, | |
| 'section_id': sub_sub.get('sub_subsection_id', '') | |
| } | |
| )) | |
| except Exception as e: | |
| log_message(f"Error extracting from {json_path}: {e}") | |
| return documents | |
| def load_table_documents(repo_id, hf_token, table_dir): | |
| log_message("Loading tables...") | |
| log_message("="*60) | |
| files = list_repo_files(repo_id=repo_id, repo_type="dataset", token=hf_token) | |
| table_files = [f for f in files if f.startswith(table_dir) and (f.endswith('.json') or f.endswith('.xlsx') or f.endswith('.xls'))] | |
| all_chunks = [] | |
| tables_processed = 0 | |
| for file_path in table_files: | |
| try: | |
| local_path = hf_hub_download( | |
| repo_id=repo_id, | |
| filename=file_path, | |
| repo_type="dataset", | |
| token=hf_token | |
| ) | |
| # Convert Excel to JSON if needed | |
| if file_path.endswith(('.xlsx', '.xls')): | |
| from converters.converter import convert_single_excel_to_json | |
| import tempfile | |
| import os | |
| with tempfile.TemporaryDirectory() as temp_dir: | |
| json_path = convert_single_excel_to_json(local_path, temp_dir) | |
| local_path = json_path | |
| with open(local_path, 'r', encoding='utf-8') as f: | |
| data = json.load(f) | |
| file_doc_id = data.get('document_id', data.get('document', 'unknown')) | |
| for sheet in data.get('sheets', []): | |
| sheet_doc_id = sheet.get('document_id', sheet.get('document', file_doc_id)) | |
| tables_processed += 1 | |
| chunks = chunk_table_by_content(sheet, sheet_doc_id, | |
| max_chars=MAX_CHARS_TABLE, | |
| max_rows=MAX_ROWS_TABLE) | |
| all_chunks.extend(chunks) | |
| except Exception as e: | |
| log_message(f"Error loading {file_path}: {e}") | |
| log_message(f"✓ Loaded {len(all_chunks)} table chunks from {tables_processed} tables") | |
| log_message("="*60) | |
| return all_chunks | |
| def load_image_documents(repo_id, hf_token, image_dir): | |
| log_message("Loading images...") | |
| files = list_repo_files(repo_id=repo_id, repo_type="dataset", token=hf_token) | |
| csv_files = [f for f in files if f.startswith(image_dir) and (f.endswith('.csv') or f.endswith('.xlsx') or f.endswith('.xls'))] | |
| documents = [] | |
| for file_path in csv_files: | |
| try: | |
| local_path = hf_hub_download( | |
| repo_id=repo_id, | |
| filename=file_path, | |
| repo_type="dataset", | |
| token=hf_token | |
| ) | |
| # Convert Excel to CSV if needed | |
| if file_path.endswith(('.xlsx', '.xls')): | |
| from converters.converter import convert_single_excel_to_csv | |
| import tempfile | |
| import os | |
| with tempfile.TemporaryDirectory() as temp_dir: | |
| csv_path = convert_single_excel_to_csv(local_path, temp_dir) | |
| local_path = csv_path | |
| df = pd.read_csv(local_path) | |
| for _, row in df.iterrows(): | |
| content = f"Документ: {row.get('Обозначение документа', 'unknown')}\n" | |
| content += f"Рисунок: {row.get('№ Изображения', 'unknown')}\n" | |
| content += f"Название: {row.get('Название изображения', '')}\n" | |
| content += f"Описание: {row.get('Описание изображение', '')}\n" | |
| content += f"Раздел: {row.get('Раздел документа', '')}\n" | |
| chunk_size = len(content) | |
| documents.append(Document( | |
| text=content, | |
| metadata={ | |
| 'type': 'image', | |
| 'document_id': str(row.get('Обозначение документа', 'unknown')), | |
| 'image_number': str(row.get('№ Изображения', 'unknown')), | |
| 'section': str(row.get('Раздел документа', '')), | |
| 'chunk_size': chunk_size | |
| } | |
| )) | |
| except Exception as e: | |
| log_message(f"Error loading {file_path}: {e}") | |
| if documents: | |
| avg_size = sum(d.metadata['chunk_size'] for d in documents) / len(documents) | |
| log_message(f"✓ Loaded {len(documents)} images (avg size: {avg_size:.0f} chars)") | |
| return documents | |
| def load_all_documents(repo_id, hf_token, json_dir, table_dir, image_dir): | |
| """Main loader - combines all document types""" | |
| log_message("="*60) | |
| log_message("STARTING DOCUMENT LOADING") | |
| log_message("="*60) | |
| # Load text sections | |
| text_docs = load_json_documents(repo_id, hf_token, json_dir) | |
| text_chunks = chunk_text_documents(text_docs) | |
| # Load tables (already chunked) | |
| table_chunks = load_table_documents(repo_id, hf_token, table_dir) | |
| # Load images (no chunking needed) | |
| image_docs = load_image_documents(repo_id, hf_token, image_dir) | |
| all_docs = text_chunks + table_chunks + image_docs | |
| log_message("="*60) | |
| log_message(f"TOTAL DOCUMENTS: {len(all_docs)}") | |
| log_message(f" Text chunks: {len(text_chunks)}") | |
| log_message(f" Table chunks: {len(table_chunks)}") | |
| log_message(f" Images: {len(image_docs)}") | |
| log_message("="*60) | |
| return all_docs |