# NOTE(review): the original file began with capture/export residue
# ("Spaces:" / "Sleeping" / "Sleeping") — not Python code. Preserved here as a
# comment so the module parses.
import json
import os
import re
from pathlib import Path

from config import TABLE_DATA_DIR, MAX_CHARS_TABLE, MAX_ROWS_TABLE
def normalize_text(text):
    """Normalize Cyrillic/Latin marker spellings in *text*.

    Replaces the Cyrillic "С-" prefix with a Latin "C", dropping the hyphen
    (e.g. "С-300" -> "C300").  Falsy input (None, "") is returned unchanged.

    NOTE(review): the re.sub below is a no-op — the pattern and the
    replacement both use the same Cyrillic "С" (U+0421).  It was probably
    meant to map Latin "C<digit>" to Cyrillic or vice versa; confirm the
    intent before changing it.  Kept as-is to preserve behavior.
    """
    if not text:
        return text
    text = text.replace('С-', 'C')
    # No-op today (see docstring note); left in place to preserve behavior.
    text = re.sub(r'\bС(\d)', r'С\1', text)
    return text
def format_table_header(doc_id, table_identifier, table_num, table_title, section, headers):
    """Build the header banner that opens every table chunk.

    Note: ``table_num`` is accepted for interface compatibility but is not
    used here — the display name comes from ``table_identifier``.
    """
    pieces = [f"ТАБЛИЦА {normalize_text(table_identifier)} из документа {doc_id}\n"]
    if table_title:
        pieces.append(f"НАЗВАНИЕ: {normalize_text(table_title)}\n")
    if section:
        pieces.append(f"РАЗДЕЛ: {section}\n")
    pieces.append(f"{'='*70}\n")
    if headers:
        joined_headers = ' | '.join(str(h) for h in headers)
        pieces.append(f"ЗАГОЛОВКИ: {joined_headers}\n\n")
    pieces.append("ДАННЫЕ:\n")
    return ''.join(pieces)
def format_single_row(row, idx):
    """Render one table row as a numbered, ' | '-separated text line.

    Returns an empty string when the row holds no meaningful cells or is
    neither a dict nor a list.
    """
    def _keep(value):
        # Drop falsy cells, whitespace-only cells, and textual null markers.
        text = str(value)
        return bool(value) and bool(text.strip()) and text.lower() not in ('nan', 'none', '')

    if isinstance(row, dict):
        cells = [f"{key}: {val}" for key, val in row.items() if _keep(val)]
    elif isinstance(row, list):
        cells = [str(val) for val in row if _keep(val)]
    else:
        return ""
    if not cells:
        return ""
    return f"{idx}. {' | '.join(cells)}\n"
def format_table_rows(rows):
    """Concatenate rendered lines for *rows* (dicts carrying an '_idx' key).

    Rows without an '_idx' key are numbered 0, matching the original
    fallback.
    """
    return ''.join(format_single_row(row, row.get('_idx', 0)) for row in rows)
def format_table_footer(table_identifier, doc_id):
    """Build the closing banner marking the end of a table's chunk."""
    separator = '=' * 70
    return f"\n{separator}\nКОНЕЦ ТАБЛИЦЫ {table_identifier} ИЗ {doc_id}\n"
def chunk_table_by_content(table_data, doc_id, max_chars=MAX_CHARS_TABLE, max_rows=MAX_ROWS_TABLE):
    """Split one table (a sheet dict) into self-describing text chunks.

    Each chunk carries the full header/footer banners plus a contiguous run
    of rendered rows; chunks are capped at roughly ``max_chars`` characters
    and ``max_rows`` rows.  Returns a list of chunk strings ([] when the
    table has no data rows).

    BUGFIXES vs. the previous version:
    - list rows no longer crash the single-chunk path (the old code did
      ``{**row, ...}``, a TypeError for lists, even though the multi-chunk
      path supported them);
    - the internal '_idx' bookkeeping key no longer leaks into the exported
      text (each row is rendered from the original row, with the index passed
      separately);
    - row sizes are measured on the exact text that is emitted, so the
      ``max_chars`` budget is accounted consistently.
    """
    headers = table_data.get('headers', [])
    rows = table_data.get('data', [])
    table_num = table_data.get('table_number', 'unknown')
    table_title = table_data.get('table_title', '')
    section = table_data.get('section', '')
    table_num_clean = str(table_num).strip()
    table_title_normalized = normalize_text(str(table_title))
    # Tables that live in an appendix get the appendix label in their id.
    table_identifier = table_num_clean
    if 'приложени' in section.lower():
        appendix_match = re.search(r'приложени[еия]\s*(\d+|[а-яА-Я])', section.lower())
        if appendix_match:
            appendix_num = appendix_match.group(1).upper()
            table_identifier = f"{table_num_clean} Приложение {appendix_num}"
    if not rows:
        return []
    base_content = format_table_header(doc_id, table_identifier, table_num, table_title_normalized, section, headers)
    base_size = len(base_content)
    # Reserve headroom for the footer and the row-range note.
    available_space = max_chars - base_size - 200
    # Render every row exactly once, paired with its 1-based index.
    rendered = [(i + 1, format_single_row(row, i + 1)) for i, row in enumerate(rows)]
    # If the whole table fits the budget, emit a single chunk.
    full_rows_content = ''.join(text for _, text in rendered)
    if base_size + len(full_rows_content) <= max_chars and len(rows) <= max_rows:
        return [base_content + full_rows_content + format_table_footer(table_identifier, doc_id)]
    # Otherwise greedily pack consecutive rows into budget-sized chunks.
    chunks = []
    current = []       # (index, rendered_text) pairs in the pending chunk
    current_size = 0

    def _flush():
        # Emit the pending rows as one chunk (no-op when nothing is pending).
        nonlocal current, current_size
        if not current:
            return
        content = base_content + ''.join(text for _, text in current)
        content += f"\n\nСтроки {current[0][0]}-{current[-1][0]} из {len(rows)}\n"
        content += format_table_footer(table_identifier, doc_id)
        chunks.append(content)
        current = []
        current_size = 0

    for idx, text in rendered:
        if current and (current_size + len(text) > available_space or len(current) >= max_rows):
            _flush()
        current.append((idx, text))
        current_size += len(text)
    _flush()
    return chunks
def export_table_chunks():
    """Export every table JSON under TABLE_DATA_DIR as chunked .txt files.

    Reads each ``*.json`` file, chunks every sheet via
    ``chunk_table_by_content``, and writes all chunks for a file into one
    ``.txt`` under ``table_chunks_export/``.  Progress and a summary are
    printed to stdout; per-file errors are reported but do not stop the run.
    """
    out_dir = Path("table_chunks_export")
    out_dir.mkdir(exist_ok=True)
    src_dir = Path(TABLE_DATA_DIR)
    if not src_dir.exists():
        print(f"❌ Directory not found: {TABLE_DATA_DIR}")
        return
    json_files = list(src_dir.glob("*.json"))
    print(f"📁 Found {len(json_files)} JSON files in {TABLE_DATA_DIR}")
    total_chunks = 0
    total_files = 0
    for json_file in json_files:
        try:
            print(f"\n📄 Processing: {json_file.name}")
            with open(json_file, 'r', encoding='utf-8') as fh:
                payload = json.load(fh)
            # Fall back from per-sheet ids to the document-level id.
            doc_id = payload.get('document_id', payload.get('document', 'unknown'))
            collected = []
            for sheet in payload.get('sheets', []):
                sheet_doc_id = sheet.get('document_id', sheet.get('document', doc_id))
                collected.extend(chunk_table_by_content(sheet, sheet_doc_id))
            if not collected:
                print(f" ⚠️ No chunks found in {json_file.name}")
                continue
            # One .txt per source JSON, named after its stem.
            output_filename = json_file.stem + ".txt"
            output_path = out_dir / output_filename
            with open(output_path, 'w', encoding='utf-8') as fh:
                for i, chunk in enumerate(collected):
                    fh.write(f"\n{'#'*70}\n")
                    fh.write(f"CHUNK {i+1} of {len(collected)}\n")
                    fh.write(f"{'#'*70}\n\n")
                    fh.write(chunk)
                    fh.write("\n\n")
            print(f" ✓ Saved: {output_filename} with {len(collected)} chunks")
            total_chunks += len(collected)
            total_files += 1
        except Exception as e:
            print(f" ❌ Error processing {json_file.name}: {e}")
    print(f"\n{'='*60}")
    print(f"✅ Export complete!")
    print(f" Total files created: {total_files}")
    print(f" Total chunks exported: {total_chunks}")
    print(f" Output directory: {out_dir.absolute()}")
    print(f"{'='*60}")
# Script entry point: run the full export when invoked directly.
if __name__ == "__main__":
    export_table_chunks()