# Source: RAG_AIEXP / documents_prep.py
# Uploaded by MrSimple01 in commit fa02ae1 ("Upload 10 files", verified).
import json
import zipfile
import pandas as pd
from huggingface_hub import hf_hub_download, list_repo_files
from llama_index.core import Document
from llama_index.core.text_splitter import SentenceSplitter
from my_logging import log_message
from config import CHUNK_SIZE, CHUNK_OVERLAP, MAX_CHARS_TABLE, MAX_ROWS_TABLE
import re
def normalize_text(text):
    """Convert the Cyrillic letter ``С`` in welding-type codes to Latin ``C``.

    Two spellings are handled: the hyphenated form (``С-25``) and the
    compact form where the letter is directly followed by a digit
    (``С25``).  Only the letter is replaced; the hyphen and the digits are
    kept as they are.

    Args:
        text: String to normalize.  Falsy values (``None``, ``""``) are
            returned unchanged.

    Returns:
        The text with the Cyrillic ``С`` (U+0421) of those codes replaced by
        the Latin ``C`` (U+0043).
    """
    if not text:
        return text
    # The two letters are visually identical, so the Cyrillic one is spelled
    # as an escape to keep the direction of the conversion unambiguous.
    text = text.replace('\u0421-', 'C-')
    # Compact form: a word-initial Cyrillic С immediately followed by a digit.
    text = re.sub(r'\b\u0421(\d)', r'C\1', text)
    return text
# Cyrillic capitals that have a look-alike Latin capital.  The Cyrillic side
# is written with escapes because the glyph pairs are indistinguishable.
_STEEL_CYRILLIC_TO_LATIN = str.maketrans({
    '\u0425': 'X',  # Cyrillic Ha
    '\u041d': 'H',  # Cyrillic En
    '\u0422': 'T',  # Cyrillic Te
    '\u0421': 'C',  # Cyrillic Es
    '\u0412': 'B',  # Cyrillic Ve
    '\u041a': 'K',  # Cyrillic Ka
    '\u041c': 'M',  # Cyrillic Em
    '\u0410': 'A',  # Cyrillic A
    '\u0420': 'P',  # Cyrillic Er
})

# A steel grade is 1-3 digits followed by one or more "capital letter +
# optional digits" groups (08Х18Н10Т, 12Х18Н9).  An optional welding-wire
# prefix "СВ-" (either alphabet) is matched together with the grade so that a
# wire designation such as СВ-08Х19Н10 is rewritten and counted once.
_STEEL_GRADE_RE = re.compile(
    r'\b(?:[\u0421C][\u0412B]-)?\d{1,3}(?:[A-Z\u0410-\u042f\u0401]\d*)+\b'
)


def normalize_steel_designations(text):
    """Rewrite look-alike Cyrillic letters in steel designations as Latin.

    Only text matching the steel-grade pattern is touched; letters without a
    Latin look-alike are left as they are.

    Args:
        text: Text to scan.  Falsy values are returned unchanged.

    Returns:
        A tuple ``(normalized_text, changes_count, changes_list)`` where
        ``changes_count`` is the number of designations that were actually
        altered and ``changes_list`` holds one ``"original → converted"``
        entry per altered designation.
    """
    if not text:
        return text, 0, []

    changes_list = []

    def _to_latin(match):
        original = match.group(0)
        converted = original.translate(_STEEL_CYRILLIC_TO_LATIN)
        if converted != original:
            # Keep a readable before/after pair for the log output.
            changes_list.append(f"{original} → {converted}")
        return converted

    normalized_text = _STEEL_GRADE_RE.sub(_to_latin, text)
    return normalized_text, len(changes_list), changes_list
def chunk_text_documents(documents):
    """Split documents into sentence-based chunks and normalize steel grades.

    Every document is split with a ``SentenceSplitter`` configured from
    ``CHUNK_SIZE`` and ``CHUNK_OVERLAP``.  The text of each chunk is passed
    through ``normalize_steel_designations`` and the chunk metadata receives
    ``chunk_id`` (position within its document), ``total_chunks`` and
    ``chunk_size`` (length of the normalized text).

    Args:
        documents: Iterable of documents accepted by
            ``SentenceSplitter.get_nodes_from_documents``.

    Returns:
        A flat list with the chunks of all documents, in input order.
    """
    splitter = SentenceSplitter(chunk_size=CHUNK_SIZE, chunk_overlap=CHUNK_OVERLAP)

    normalized_total = 0
    affected_chunks = 0
    chunked = []

    for doc in documents:
        nodes = splitter.get_nodes_from_documents([doc])
        node_count = len(nodes)
        for position, node in enumerate(nodes):
            node.text, changes, _ = normalize_steel_designations(node.text)
            if changes > 0:
                affected_chunks += 1
                normalized_total += changes
            node.metadata.update({
                'chunk_id': position,
                'total_chunks': node_count,
                'chunk_size': len(node.text),
            })
            chunked.append(node)

    # Statistics are only reported when at least one chunk was produced.
    if chunked:
        sizes = [len(node.text) for node in chunked]
        avg_size = sum(sizes) / len(sizes)
        min_size = min(sizes)
        max_size = max(sizes)
        log_message(f"✓ Text: {len(documents)} docs → {len(chunked)} chunks")
        log_message(f" Size stats: avg={avg_size:.0f}, min={min_size}, max={max_size} chars")
        log_message(f" Steel designation normalization:")
        log_message(f" - Chunks with changes: {affected_chunks}/{len(chunked)}")
        log_message(f" - Total steel grades normalized: {normalized_total}")
        if affected_chunks > 0:
            log_message(f" - Avg per affected chunk: {normalized_total/affected_chunks:.1f}")
        else:
            log_message(" - No normalizations needed")
        log_message("="*60)
    return chunked
# Table-number placeholders that mean "this table has no number".
_MISSING_TABLE_NUMBERS = ('-', '', 'unknown', 'nan')

# Applied to lower-cased text: the word "приложение" in one of three endings,
# an optional "№" and the appendix number.
_APPENDIX_RE = re.compile(r'приложени[еия]\s*[№]?\s*(\d+)')


def _table_text(value):
    """Return *value* as a string, mapping ``None`` to an empty string."""
    return '' if value is None else str(value)


def _resolve_table_identifier(table_num_clean, table_title, section, sheet_name):
    """Choose the identifier used for a table in chunk text and metadata."""
    section_lower = section.lower()
    if table_num_clean in _MISSING_TABLE_NUMBERS:
        # No usable number: fall back to appendix, then title, then section.
        if 'приложени' in sheet_name.lower() or 'приложени' in section_lower:
            match = _APPENDIX_RE.search((sheet_name + ' ' + section).lower())
            return f"Приложение {match.group(1)}" if match else "Приложение"
        if table_title:
            return ' '.join(table_title.split()[:5])
        return section.split(',')[0] if section else "БезНомера"
    if 'приложени' in section_lower:
        match = _APPENDIX_RE.search(section_lower)
        if match:
            return f"{table_num_clean} Приложение {match.group(1)}"
    return table_num_clean


def _normalize_table_rows(rows):
    """Normalize steel designations in the values of every dict row.

    Returns ``(normalized_rows, total_changes, rows_with_changes, examples)``.
    Non-dict rows are passed through unchanged.
    """
    normalized_rows = []
    total_changes = 0
    rows_with_changes = 0
    examples = []
    for row in rows:
        if not isinstance(row, dict):
            normalized_rows.append(row)
            continue
        normalized_row = {}
        row_changes = 0
        for key, value in row.items():
            normalized_row[key], changes, change_list = normalize_steel_designations(str(value))
            if changes > 0:
                row_changes += changes
                examples.extend(change_list)
        if row_changes:
            rows_with_changes += 1
            total_changes += row_changes
        normalized_rows.append(normalized_row)
    return normalized_rows, total_changes, rows_with_changes, examples


def chunk_table_by_content(table_data, doc_id, max_chars=MAX_CHARS_TABLE, max_rows=MAX_ROWS_TABLE):
    """Turn one table into one or more ``Document`` chunks.

    The table is emitted as a single chunk when its full text fits into
    ``max_chars`` and it has at most ``max_rows`` rows.  Otherwise rows are
    grouped greedily; every chunk repeats the table header and ends with a
    row-range line and the table footer.

    Args:
        table_data: Mapping read with the keys ``headers``, ``data``,
            ``table_number``, ``table_title``, ``section`` and ``sheet_name``.
            ``None`` values for the text fields are treated as empty.
        doc_id: Identifier of the document the table belongs to.
        max_chars: Character budget for one chunk.
        max_rows: Maximum number of rows in one chunk.

    Returns:
        A list of ``Document`` objects; empty when the table has no rows.
    """
    headers = table_data.get('headers', [])
    rows = table_data.get('data', [])
    table_num = table_data.get('table_number', 'unknown')
    # JSON nulls arrive as None; coerce them so the .lower()/.split() calls
    # below cannot fail and a missing title does not become the text "None".
    table_title = _table_text(table_data.get('table_title', ''))
    section = _table_text(table_data.get('section', ''))
    sheet_name = _table_text(table_data.get('sheet_name', ''))

    table_title, title_changes, title_list = normalize_steel_designations(table_title)
    section, section_changes, section_list = normalize_steel_designations(section)

    table_num_clean = 'unknown' if table_num is None else str(table_num).strip()
    has_table_number = table_num_clean not in _MISSING_TABLE_NUMBERS
    table_identifier = _resolve_table_identifier(table_num_clean, table_title, section, sheet_name)

    if not rows:
        return []

    log_message(f" 📊 Processing: {doc_id} - {table_identifier} ({len(rows)} rows)")

    normalized_rows, total_row_changes, rows_with_changes, all_row_changes = _normalize_table_rows(rows)
    total_rows = len(normalized_rows)

    if total_row_changes > 0 or title_changes > 0 or section_changes > 0:
        log_message(f" Steel normalization: title={title_changes}, section={section_changes}, "
                    f"rows={rows_with_changes}/{len(rows)} ({total_row_changes} total)")
        if title_list:
            log_message(f" Title changes: {', '.join(title_list[:3])}")
        if section_list:
            log_message(f" Section changes: {', '.join(section_list[:3])}")
        if all_row_changes:
            log_message(f" Row examples: {', '.join(all_row_changes[:5])}")

    # The cleaned string form of the number is passed on, so the header
    # formatter always receives text even when the JSON stored a number.
    base_content = format_table_header(doc_id, table_identifier, table_num_clean,
                                       table_title, section, headers,
                                       sheet_name)
    base_size = len(base_content)
    # 200 characters are kept in reserve for the row-range line and footer.
    available_space = max_chars - base_size - 200

    # Attach the 1-based row number once.  Non-dict rows are wrapped in a
    # dict so both the single-chunk and the multi-chunk path can format them.
    indexed_rows = []
    for position, row in enumerate(normalized_rows, start=1):
        indexed = row.copy() if isinstance(row, dict) else {'data': row}
        indexed['_idx'] = position
        indexed_rows.append(indexed)

    # Fields shared by every chunk; a table without a number is recorded
    # under its identifier.
    shared_metadata = {
        'type': 'table',
        'document_id': doc_id,
        'table_number': table_num_clean if has_table_number else table_identifier,
        'table_identifier': table_identifier,
        'table_title': table_title,
        'section': section,
        'sheet_name': sheet_name,
    }
    keywords = f"{doc_id} {table_identifier} {table_title} {section} сталь материал"

    full_rows_content = format_table_rows(indexed_rows)
    if base_size + len(full_rows_content) <= max_chars and total_rows <= max_rows:
        content = base_content + full_rows_content + format_table_footer(table_identifier, doc_id)
        metadata = {
            **shared_metadata,
            'total_rows': total_rows,
            'chunk_size': len(content),
            'is_complete_table': True,
            'keywords': keywords,
        }
        log_message(f" Single chunk: {len(content)} chars, {total_rows} rows")
        return [Document(text=content, metadata=metadata)]

    def build_partial_chunk(chunk_rows, chunk_id):
        """Create the Document for one group of consecutive rows."""
        first_idx = chunk_rows[0]['_idx']
        last_idx = chunk_rows[-1]['_idx']
        content = base_content + format_table_rows(chunk_rows)
        content += f"\n\nСтроки {first_idx}-{last_idx} из {total_rows}\n"
        content += format_table_footer(table_identifier, doc_id)
        metadata = {
            **shared_metadata,
            'chunk_id': chunk_id,
            'row_start': first_idx - 1,
            'row_end': last_idx,
            'total_rows': total_rows,
            'chunk_size': len(content),
            'is_complete_table': False,
            'keywords': keywords,
        }
        log_message(f" Chunk {chunk_id + 1}: {len(content)} chars, {len(chunk_rows)} rows")
        return Document(text=content, metadata=metadata)

    chunks = []
    current_rows = []
    current_size = 0
    for position, (row, indexed) in enumerate(zip(normalized_rows, indexed_rows), start=1):
        row_size = len(format_single_row(row, position))
        overflow = current_size + row_size > available_space or len(current_rows) >= max_rows
        # A chunk is only closed when it already holds a row, so an oversized
        # row still ends up in a chunk of its own.
        if overflow and current_rows:
            chunks.append(build_partial_chunk(current_rows, len(chunks)))
            current_rows = []
            current_size = 0
        current_rows.append(indexed)
        current_size += row_size

    if current_rows:
        chunks.append(build_partial_chunk(current_rows, len(chunks)))
    return chunks
def format_table_header(doc_id, table_identifier, table_num, table_title, section, headers, sheet_name=''):
    """Build the text header that precedes the rows of a table chunk.

    Args:
        doc_id: Identifier of the document the table belongs to.
        table_identifier: Identifier shown on the first line.
        table_num: Table number; may be a string or a number.  ``None`` and
            the placeholders ``''``, ``'-'``, ``'unknown'`` and ``'nan'``
            suppress the number line.
        table_title: Table title; the title line is omitted when empty.
        section: Section name; the section line is omitted when empty.
        headers: Column headers; the headers line is omitted when empty.
        sheet_name: Sheet name; the sheet line is omitted when empty.

    Returns:
        The header text, ending with the ``ДАННЫЕ:`` line.
    """
    # normalize_text works on strings only, while table numbers and titles
    # read from JSON may be numbers or None - convert before normalizing.
    table_num_text = '' if table_num is None else str(table_num).strip()
    title_text = '' if table_title is None else str(table_title)

    parts = [f"ТАБЛИЦА {normalize_text(str(table_identifier))} из документа {doc_id}\n"]
    if table_num_text not in ('', '-', 'unknown', 'nan'):
        parts.append(f"НОМЕР ТАБЛИЦЫ: {normalize_text(table_num_text)}\n")
    if sheet_name:
        parts.append(f"ЛИСТ: {sheet_name}\n")
    if title_text:
        parts.append(f"НАЗВАНИЕ: {normalize_text(title_text)}\n")
    if section:
        parts.append(f"РАЗДЕЛ: {section}\n")
    parts.append(f"КЛЮЧЕВЫЕ СЛОВА: материалы стали марки стандарты {doc_id}\n")
    parts.append(f"{'='*70}\n")
    if headers:
        header_str = ' | '.join(normalize_text(str(h)) for h in headers)
        parts.append(f"ЗАГОЛОВКИ: {header_str}\n\n")
    parts.append("ДАННЫЕ:\n")
    return ''.join(parts)
# String forms (lower-cased) that mark a cell as empty.
_EMPTY_CELL_MARKERS = ('nan', 'none', '')


def _has_cell_content(value):
    """Tell whether a cell value should appear in the formatted row."""
    # Numbers are kept even when they are zero: 0 is real table data.
    # A float NaN still counts as empty.
    if isinstance(value, (int, float)) and not isinstance(value, bool):
        return str(value).lower() != 'nan'
    if not value:
        return False
    text = str(value)
    return bool(text.strip()) and text.lower() not in _EMPTY_CELL_MARKERS


def format_single_row(row, idx):
    """Format one table row as a numbered, pipe-separated text line.

    Args:
        row: A dict (rendered as ``key: value`` pairs) or a list (rendered as
            plain values).  Empty cells are skipped.  The bookkeeping key
            ``_idx`` of dict rows is not rendered, because the row number is
            already given by ``idx``.
        idx: Row number placed in front of the line.

    Returns:
        ``"<idx>. a | b | ...\\n"``, or an empty string when the row has no
        non-empty cells or is neither a dict nor a list.
    """
    if isinstance(row, dict):
        parts = [f"{k}: {v}" for k, v in row.items()
                 if k != '_idx' and _has_cell_content(v)]
    elif isinstance(row, list):
        parts = [str(v) for v in row if _has_cell_content(v)]
    else:
        return ""
    if not parts:
        return ""
    return f"{idx}. {' | '.join(parts)}\n"
def format_table_rows(rows):
    """Concatenate the formatted lines of several rows.

    Each row is formatted with ``format_single_row`` using the number stored
    under its ``_idx`` key (``0`` when the key is absent).

    Args:
        rows: Iterable of dict rows.

    Returns:
        The formatted lines joined into one string.
    """
    return "".join(format_single_row(row, row.get('_idx', 0)) for row in rows)
def format_table_footer(table_identifier, doc_id):
    """Return the closing marker of a table chunk.

    The footer is a blank line, a rule of 70 ``=`` characters and a line
    naming the table identifier and the document.
    """
    rule = '=' * 70
    return f"\n{rule}\nКОНЕЦ ТАБЛИЦЫ {table_identifier} ИЗ {doc_id}\n"
def _decode_json_bytes(raw):
    """Decode the bytes of a JSON file, or return ``None`` if nothing fits.

    ``utf-8-sig`` is tried first so that a UTF-8 byte-order mark is removed
    instead of surviving as a leading U+FEFF character.  UTF-16 is only
    attempted when its byte-order mark is present, because decoding arbitrary
    bytes as UTF-16 often "succeeds" with garbage and would hide a
    Windows-1251 file.
    """
    if raw.startswith((b'\xff\xfe', b'\xfe\xff')):
        encodings = ('utf-16',)
    else:
        encodings = ('utf-8-sig', 'windows-1251')
    for encoding in encodings:
        try:
            return raw.decode(encoding)
        except UnicodeDecodeError:
            continue
    return None


def _extract_sections_from_text(text_content):
    """Run ``extract_sections_from_json`` on JSON text via a temporary file.

    The temporary file is removed even when writing or extraction fails.
    """
    import os
    import tempfile

    tmp = tempfile.NamedTemporaryFile(mode='w', delete=False,
                                      suffix='.json', encoding='utf-8')
    try:
        with tmp:
            tmp.write(text_content)
        return extract_sections_from_json(tmp.name)
    finally:
        os.unlink(tmp.name)


def load_json_documents(repo_id, hf_token, json_dir):
    """Load text sections from the JSON and ZIP files of a dataset repo.

    Files below ``json_dir`` ending in ``.json`` are downloaded and parsed
    directly.  Files ending in ``.zip`` are downloaded (always afresh) and
    every ``.json`` member that is not macOS metadata or a hidden file is
    parsed.  Per-file problems are logged and counted; they never abort the
    whole load.

    Args:
        repo_id: Hugging Face dataset repository id.
        hf_token: Access token passed to the hub calls.
        json_dir: Path prefix inside the repository.

    Returns:
        The list of documents extracted from all files.
    """
    log_message("Loading JSON documents...")
    files = list_repo_files(repo_id=repo_id, repo_type="dataset", token=hf_token)
    json_files = [f for f in files if f.startswith(json_dir) and f.endswith('.json')]
    zip_files = [f for f in files if f.startswith(json_dir) and f.endswith('.zip')]
    log_message(f"Found {len(json_files)} JSON files and {len(zip_files)} ZIP files")

    documents = []
    stats = {'success': 0, 'failed': 0, 'empty': 0}

    for file_path in json_files:
        try:
            log_message(f" Loading: {file_path}")
            local_path = hf_hub_download(
                repo_id=repo_id,
                filename=file_path,
                repo_type="dataset",
                token=hf_token
            )
            docs = extract_sections_from_json(local_path)
            if docs:
                documents.extend(docs)
                stats['success'] += 1
                log_message(f" ✓ Extracted {len(docs)} sections")
            else:
                stats['empty'] += 1
                log_message(f" ⚠ No sections found")
        except Exception as e:
            stats['failed'] += 1
            log_message(f" ✗ Error: {e}")

    for zip_path in zip_files:
        try:
            log_message(f" Processing ZIP: {zip_path}")
            local_zip = hf_hub_download(
                repo_id=repo_id,
                filename=zip_path,
                repo_type="dataset",
                token=hf_token,
                force_download=True
            )
            with zipfile.ZipFile(local_zip, 'r') as zf:
                json_files_in_zip = [f for f in zf.namelist()
                                     if f.endswith('.json')
                                     and not f.startswith('__MACOSX')
                                     and not f.startswith('.')
                                     and '._' not in f]
                log_message(f" Found {len(json_files_in_zip)} JSON files in ZIP")

                for json_file in json_files_in_zip:
                    try:
                        file_content = zf.read(json_file)
                        # Anything shorter than 10 bytes is treated as unusable.
                        if len(file_content) < 10:
                            log_message(f" ✗ Skipping: {json_file} (file too small)")
                            stats['failed'] += 1
                            continue

                        text_content = _decode_json_bytes(file_content)
                        if text_content is None:
                            log_message(f" ✗ Skipping: {json_file} (encoding failed)")
                            stats['failed'] += 1
                            continue

                        # Cheap shape check before the file is parsed.
                        if not text_content.strip().startswith(('{', '[')):
                            log_message(f" ✗ Skipping: {json_file} (not valid JSON)")
                            stats['failed'] += 1
                            continue

                        docs = _extract_sections_from_text(text_content)
                        if docs:
                            documents.extend(docs)
                            stats['success'] += 1
                            log_message(f" ✓ {json_file}: {len(docs)} sections")
                        else:
                            stats['empty'] += 1
                            log_message(f" ⚠ {json_file}: No sections")
                    except json.JSONDecodeError as e:
                        stats['failed'] += 1
                        log_message(f" ✗ {json_file}: Invalid JSON")
                    except Exception as e:
                        stats['failed'] += 1
                        log_message(f" ✗ {json_file}: {str(e)[:100]}")
        except Exception as e:
            log_message(f" ✗ Error with ZIP: {e}")

    log_message(f"="*60)
    log_message(f"JSON Loading Stats:")
    log_message(f" Success: {stats['success']}")
    log_message(f" Empty: {stats['empty']}")
    log_message(f" Failed: {stats['failed']}")
    log_message(f"="*60)
    return documents
# One entry per nesting level: (text key, id key, key of the child list).
_SECTION_LEVELS = (
    ('section_text', 'section_id', 'subsections'),
    ('subsection_text', 'subsection_id', 'sub_subsections'),
    ('sub_subsection_text', 'sub_subsection_id', None),
)


def extract_sections_from_json(json_path):
    """Extract the text of sections, subsections and sub-subsections.

    One ``Document`` is created for every entry whose text is non-blank.  The
    metadata holds ``type='text'``, the ``document_id`` taken from
    ``document_metadata`` (``'unknown'`` when absent) and, under
    ``section_id``, the id of the entry at its own level.  Documents appear
    in depth-first order: an entry first, then its children.

    Any error is logged; the documents collected up to that point are
    returned.

    Args:
        json_path: Path of the JSON file.

    Returns:
        The list of extracted documents (possibly empty).
    """
    documents = []

    def collect(entries, depth):
        text_key, id_key, children_key = _SECTION_LEVELS[depth]
        # A JSON null in place of a list is treated like an empty list.
        for entry in entries or []:
            # A JSON null in place of the text is treated like empty text.
            text = entry.get(text_key) or ''
            if text.strip():
                documents.append(Document(
                    text=text,
                    metadata={
                        'type': 'text',
                        'document_id': doc_id,
                        'section_id': entry.get(id_key, '')
                    }
                ))
            if children_key is not None:
                collect(entry.get(children_key), depth + 1)

    try:
        # utf-8-sig reads plain UTF-8 as well and drops a byte-order mark,
        # which json.load would otherwise reject.
        with open(json_path, 'r', encoding='utf-8-sig') as f:
            data = json.load(f)
        doc_id = (data.get('document_metadata') or {}).get('document_id', 'unknown')
        collect(data.get('sections'), 0)
    except Exception as e:
        log_message(f"Error extracting from {json_path}: {e}")
    return documents
def _read_table_json(local_path, is_excel):
    """Return the parsed table JSON for a downloaded file.

    Excel files are converted to JSON first.  The converted file is read
    while the temporary directory still exists: leaving the ``with`` block
    deletes the directory together with everything written into it.
    """
    if not is_excel:
        with open(local_path, 'r', encoding='utf-8') as f:
            return json.load(f)

    import tempfile
    from converters.converter import convert_single_excel_to_json

    with tempfile.TemporaryDirectory() as temp_dir:
        # NOTE(review): presumably the converter writes its output into
        # temp_dir and returns that path - confirm against the converter.
        json_path = convert_single_excel_to_json(local_path, temp_dir)
        with open(json_path, 'r', encoding='utf-8') as f:
            return json.load(f)


def load_table_documents(repo_id, hf_token, table_dir):
    """Load all tables below ``table_dir`` and return them as chunks.

    ``.json`` files are read directly, ``.xlsx``/``.xls`` files are converted
    to JSON first.  Every entry of the file's ``sheets`` list is chunked with
    ``chunk_table_by_content``.  A file that fails to load is logged and
    skipped.

    Args:
        repo_id: Hugging Face dataset repository id.
        hf_token: Access token passed to the hub calls.
        table_dir: Path prefix inside the repository.

    Returns:
        The list of table chunks of all files.
    """
    log_message("Loading tables...")
    log_message("="*60)
    files = list_repo_files(repo_id=repo_id, repo_type="dataset", token=hf_token)
    table_files = [f for f in files
                   if f.startswith(table_dir) and f.endswith(('.json', '.xlsx', '.xls'))]

    all_chunks = []
    tables_processed = 0
    for file_path in table_files:
        try:
            local_path = hf_hub_download(
                repo_id=repo_id,
                filename=file_path,
                repo_type="dataset",
                token=hf_token
            )
            data = _read_table_json(local_path, file_path.endswith(('.xlsx', '.xls')))

            file_doc_id = data.get('document_id', data.get('document', 'unknown'))
            for sheet in data.get('sheets', []):
                # A sheet may carry its own document id; otherwise the id of
                # the file is used.
                sheet_doc_id = sheet.get('document_id', sheet.get('document', file_doc_id))
                tables_processed += 1
                chunks = chunk_table_by_content(sheet, sheet_doc_id,
                                                max_chars=MAX_CHARS_TABLE,
                                                max_rows=MAX_ROWS_TABLE)
                all_chunks.extend(chunks)
        except Exception as e:
            log_message(f"Error loading {file_path}: {e}")

    log_message(f"✓ Loaded {len(all_chunks)} table chunks from {tables_processed} tables")
    log_message("="*60)
    return all_chunks
def _read_image_table(local_path, is_excel):
    """Return the image description table of a downloaded file as a DataFrame.

    Excel files are converted to CSV first.  The converted file is read while
    the temporary directory still exists: leaving the ``with`` block deletes
    the directory together with everything written into it.
    """
    if not is_excel:
        return pd.read_csv(local_path)

    import tempfile
    from converters.converter import convert_single_excel_to_csv

    with tempfile.TemporaryDirectory() as temp_dir:
        # NOTE(review): presumably the converter writes its output into
        # temp_dir and returns that path - confirm against the converter.
        csv_path = convert_single_excel_to_csv(local_path, temp_dir)
        return pd.read_csv(csv_path)


def load_image_documents(repo_id, hf_token, image_dir):
    """Load image descriptions below ``image_dir`` as documents.

    ``.csv`` files are read directly, ``.xlsx``/``.xls`` files are converted
    to CSV first.  Every table row becomes one ``Document`` whose text lists
    the document designation, image number, title, description and section.
    A file that fails to load is logged and skipped.

    Args:
        repo_id: Hugging Face dataset repository id.
        hf_token: Access token passed to the hub calls.
        image_dir: Path prefix inside the repository.

    Returns:
        The list of image documents of all files.
    """
    log_message("Loading images...")
    files = list_repo_files(repo_id=repo_id, repo_type="dataset", token=hf_token)
    csv_files = [f for f in files
                 if f.startswith(image_dir) and f.endswith(('.csv', '.xlsx', '.xls'))]

    documents = []
    for file_path in csv_files:
        try:
            local_path = hf_hub_download(
                repo_id=repo_id,
                filename=file_path,
                repo_type="dataset",
                token=hf_token
            )
            df = _read_image_table(local_path, file_path.endswith(('.xlsx', '.xls')))

            for _, row in df.iterrows():
                content = f"Документ: {row.get('Обозначение документа', 'unknown')}\n"
                content += f"Рисунок: {row.get('№ Изображения', 'unknown')}\n"
                content += f"Название: {row.get('Название изображения', '')}\n"
                content += f"Описание: {row.get('Описание изображение', '')}\n"
                content += f"Раздел: {row.get('Раздел документа', '')}\n"
                chunk_size = len(content)
                documents.append(Document(
                    text=content,
                    metadata={
                        'type': 'image',
                        'document_id': str(row.get('Обозначение документа', 'unknown')),
                        'image_number': str(row.get('№ Изображения', 'unknown')),
                        'section': str(row.get('Раздел документа', '')),
                        'chunk_size': chunk_size
                    }
                ))
        except Exception as e:
            log_message(f"Error loading {file_path}: {e}")

    if documents:
        avg_size = sum(d.metadata['chunk_size'] for d in documents) / len(documents)
        log_message(f"✓ Loaded {len(documents)} images (avg size: {avg_size:.0f} chars)")
    return documents
def load_all_documents(repo_id, hf_token, json_dir, table_dir, image_dir):
    """Load text, table and image documents and return them as one list.

    Text sections are loaded and then chunked; tables are returned already
    chunked by their loader; image documents are used as loaded.  The result
    keeps that order: text chunks, table chunks, images.
    """
    separator = "="*60
    log_message(separator)
    log_message("STARTING DOCUMENT LOADING")
    log_message(separator)

    # Text sections need an explicit chunking step after loading.
    text_chunks = chunk_text_documents(load_json_documents(repo_id, hf_token, json_dir))
    table_chunks = load_table_documents(repo_id, hf_token, table_dir)
    image_docs = load_image_documents(repo_id, hf_token, image_dir)

    all_docs = [*text_chunks, *table_chunks, *image_docs]

    log_message(separator)
    log_message(f"TOTAL DOCUMENTS: {len(all_docs)}")
    log_message(f" Text chunks: {len(text_chunks)}")
    log_message(f" Table chunks: {len(table_chunks)}")
    log_message(f" Images: {len(image_docs)}")
    log_message(separator)
    return all_docs