Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| USLaP Migration Script: Excel โ SQLite relational schema | |
| Extends build_database_v3.py to populate the full USLaP lattice database. | |
| This script: | |
| 1. Creates uslap_lattice.db with the full relational schema (create_uslap_db.sql) | |
| 2. Reads data from structured Excel sheets (skip EXCEL_DATA_CONSOLIDATED) | |
| 3. Normalizes data into the relational schema with proper foreign keys | |
| 4. Registers Python UDF extract_consonants() from USLaP_Engine.py | |
| 5. Generates word_fingerprints table for O(log n) cluster expansion | |
| 6. Uses transaction safety with rollback on error | |
| 7. Creates backup of existing databases | |
| ุจูุณูู ู ุงูููููู ุงูุฑููุญูู ููฐูู ุงูุฑููุญููู ู | |
| """ | |
| import sqlite3 | |
| import openpyxl | |
| import re | |
| import sys | |
| import os | |
| import shutil | |
| import json | |
| from datetime import datetime | |
| from pathlib import Path | |
| # Add the current directory to sys.path to import USLaP_Engine | |
| sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) | |
| # ============================================================================ | |
| # GLOBAL CONFIGURATION | |
| # ============================================================================ | |
| EXCEL_PATH = "USLaP_Final_Data_Consolidated_Master_v3.xlsx" | |
| DB_PATH = "Code_files/uslap_lattice.db" | |
| SCHEMA_PATH = "Code_files/create_uslap_db.sql" | |
| BACKUP_DIR = "Code_files/backups" | |
| # Sheets to migrate (structured data only, skip EXCEL_DATA_CONSOLIDATED) | |
| SHEETS_TO_MIGRATE = [ | |
| "A1_ENTRIES", | |
| "A1_ะะะะะกะ", | |
| "PERSIAN_A1_MADฤKHIL", | |
| "BITIG_A1_ENTRIES", | |
| "CHILD_SCHEMA", | |
| "A4_DERIVATIVES", | |
| "A5_CROSS_REFS", | |
| "A3_QURAN_REFS", | |
| "M1_PHONETIC_SHIFTS", | |
| "M2_DETECTION_PATTERNS", | |
| "M4_NETWORKS", | |
| "M3_SCHOLARS", | |
| "M5_QUR_VERIFICATION", | |
| ] | |
| # ============================================================================ | |
| # UTILITY FUNCTIONS | |
| # ============================================================================ | |
| def clean_column_name(col): | |
| """Convert Excel column header to valid SQLite column name.""" | |
| if col is None: | |
| return "unknown" | |
| col = str(col).strip() | |
| col = re.sub(r'[^\w\s]', '', col) # Remove punctuation | |
| col = re.sub(r'\s+', '_', col) # Replace spaces with underscore | |
| col = col.lower() | |
| if not col: | |
| return "unknown" | |
| return col | |
| def find_header_row(ws, sheet_name): | |
| """Find the header row in an Excel sheet based on patterns.""" | |
| rows = list(ws.iter_rows(values_only=True)) | |
| # Special handling for each sheet based on debug output | |
| if sheet_name == "CHILD_SCHEMA": | |
| # Row 0: title, Row 1: description, Row 2: headers | |
| if len(rows) > 2: | |
| return rows[2], 3 | |
| elif sheet_name == "PHONETIC_REVERSAL": | |
| # Row 0: title, Row 1: description, Row 2: zone header, Row 3: headers | |
| if len(rows) > 3: | |
| return rows[3], 4 | |
| elif sheet_name in ["UMD_OPERATIONS", "DP_REGISTER", "ATT_TERMS", | |
| "SESSION_INDEX", "PROTOCOL_CORRECTIONS", | |
| "SCHOLAR_WARNINGS", "A1_ENTRIES"]: | |
| # Row 0: title/headers (most sheets) | |
| if len(rows) > 0: | |
| return rows[0], 1 | |
| else: | |
| # Generic fallback: find row with typical column names | |
| for i, row in enumerate(rows): | |
| if row and any(isinstance(cell, str) and ('ID' in cell or 'NAME' in cell or 'CODE' in cell or 'TERM' in cell) for cell in row): | |
| return row, i + 1 | |
| return None, 0 | |
| def backup_database(db_path): | |
| """Create a timestamped backup of existing database.""" | |
| if not os.path.exists(db_path): | |
| return None | |
| os.makedirs(BACKUP_DIR, exist_ok=True) | |
| timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') | |
| backup_path = os.path.join(BACKUP_DIR, f"{os.path.basename(db_path)}_backup_{timestamp}.db") | |
| shutil.copy2(db_path, backup_path) | |
| print(f" Created backup: {backup_path}") | |
| return backup_path | |
| # ============================================================================ | |
| # EXTRACT_CONSONANTS UDF (from USLaP_Engine.py) | |
| # ============================================================================ | |
| def extract_consonants(word): | |
| """ | |
| Python UDF for SQLite: Extract consonant skeleton from a word. | |
| Must be registered before any operations on word_fingerprints table. | |
| This is the same logic as PhoneticReversal.extract_consonants() in USLaP_Engine.py. | |
| """ | |
| if not word: | |
| return "" | |
| # Import the function directly from USLaP_Engine if available | |
| try: | |
| from USLaP_Engine import PhoneticReversal | |
| # Create a minimal instance just to use the method | |
| # We'll implement the logic directly here to avoid dependencies | |
| pass | |
| except ImportError: | |
| pass | |
| # Direct implementation from USLaP_Engine.py | |
| vowels = set('aeiou') | |
| result = [] | |
| i = 0 | |
| word_lower = word.lower() | |
| while i < len(word_lower): | |
| digraph = word_lower[i:i+2] if i + 1 < len(word_lower) else '' | |
| if digraph in ('sh', 'ch', 'gh', 'th', 'ph', 'wh', 'qu'): | |
| result.append(digraph) | |
| i += 2 | |
| elif word_lower[i] not in vowels: | |
| result.append(word_lower[i]) | |
| i += 1 | |
| else: | |
| i += 1 | |
| return ''.join(result) | |
| # ============================================================================ | |
| # MIGRATION FUNCTIONS FOR EACH SHEET | |
| # ============================================================================ | |
| def migrate_a1_entries(conn, cursor, wb): | |
| """Migrate A1_ENTRIES sheet to entries table.""" | |
| print(" Migrating A1_ENTRIES...") | |
| if "A1_ENTRIES" not in wb.sheetnames: | |
| print(" Sheet A1_ENTRIES not found") | |
| return 0 | |
| ws = wb["A1_ENTRIES"] | |
| rows = list(ws.iter_rows(values_only=True)) | |
| if not rows or len(rows) < 2: | |
| print(" No data in A1_ENTRIES") | |
| return 0 | |
| # First row is headers | |
| headers = [clean_column_name(cell) for cell in rows[0]] | |
| count = 0 | |
| for i, row in enumerate(rows[1:], start=1): | |
| if not any(row): | |
| continue | |
| row_dict = {} | |
| for j, cell in enumerate(row): | |
| if j < len(headers): | |
| row_dict[headers[j]] = cell | |
| # Map to entries table schema | |
| entry_data = { | |
| 'entry_id': row_dict.get('entry_id'), | |
| 'score': row_dict.get('score'), | |
| 'en_term': row_dict.get('en_term'), | |
| 'ar_word': row_dict.get('ar_word'), | |
| 'root_letters': row_dict.get('root_letters'), | |
| 'qur_meaning': row_dict.get('qur_meaning'), | |
| 'pattern': row_dict.get('pattern'), | |
| 'allah_name_id': row_dict.get('allah_name_id'), | |
| 'network_id': row_dict.get('network_id'), | |
| 'phonetic_chain': row_dict.get('phonetic_chain'), | |
| 'inversion_type': row_dict.get('inversion_type'), | |
| 'source_form': row_dict.get('source_form'), | |
| 'foundation_refs': row_dict.get('foundation_ref'), | |
| } | |
| # Extract root ID from root_letters or root_id | |
| root_id = row_dict.get('root_id') | |
| if root_id: | |
| entry_data['root_id'] = root_id | |
| # Extract DS corridor from foundation_ref | |
| foundation_ref = row_dict.get('foundation_ref', '') | |
| ds_match = re.search(r'DS\d+', str(foundation_ref)) | |
| if ds_match: | |
| entry_data['ds_corridor'] = ds_match.group(0) | |
| # Insert into entries table | |
| try: | |
| cursor.execute(''' | |
| INSERT INTO entries ( | |
| entry_id, score, en_term, ar_word, root_id, root_letters, | |
| qur_meaning, pattern, allah_name_id, network_id, phonetic_chain, | |
| inversion_type, source_form, foundation_refs, ds_corridor | |
| ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) | |
| ''', ( | |
| entry_data['entry_id'], entry_data['score'], entry_data['en_term'], | |
| entry_data['ar_word'], entry_data.get('root_id'), entry_data['root_letters'], | |
| entry_data['qur_meaning'], entry_data['pattern'], entry_data['allah_name_id'], | |
| entry_data['network_id'], entry_data['phonetic_chain'], entry_data['inversion_type'], | |
| entry_data['source_form'], entry_data['foundation_refs'], entry_data.get('ds_corridor') | |
| )) | |
| count += 1 | |
| except Exception as e: | |
| print(f" Error inserting row {i}: {e}") | |
| print(f" Data: {entry_data}") | |
| print(f" Migrated {count} entries") | |
| return count | |
| def migrate_a1_zapisi(conn, cursor, wb): | |
| """Migrate A1_ะะะะะกะ sheet to entries table (Russian entries).""" | |
| print(" Migrating A1_ะะะะะกะ (Russian entries)...") | |
| if "A1_ะะะะะกะ" not in wb.sheetnames: | |
| print(" Sheet A1_ะะะะะกะ not found") | |
| return 0 | |
| ws = wb["A1_ะะะะะกะ"] | |
| rows = list(ws.iter_rows(values_only=True)) | |
| if not rows or len(rows) < 2: | |
| print(" No data in A1_ะะะะะกะ") | |
| return 0 | |
| # First row is headers (Russian column names) | |
| headers = [clean_column_name(cell) for cell in rows[0]] | |
| count = 0 | |
| for i, row in enumerate(rows[1:], start=1): | |
| if not any(row): | |
| continue | |
| row_dict = {} | |
| for j, cell in enumerate(row): | |
| if j < len(headers): | |
| row_dict[headers[j]] = cell | |
| # Map Russian column names to entries table schema | |
| entry_data = { | |
| 'entry_id': row_dict.get('ะทะฐะฟะธัั_id'), | |
| 'score': row_dict.get('ะฑะฐะปะป'), | |
| 'ru_term': row_dict.get('ััั_ัะตัะผะธะฝ'), | |
| 'ar_word': row_dict.get('ะฐั_ัะปะพะฒะพ'), | |
| 'root_letters': row_dict.get('ะบะพัะฝะตะฒัะต_ะฑัะบะฒั'), | |
| 'qur_meaning': row_dict.get('ะบะพัะฐะฝะธั_ะทะฝะฐัะตะฝะธะต'), | |
| 'pattern': row_dict.get('ะฟะฐััะตัะฝ'), | |
| 'allah_name_id': row_dict.get('ะธะผั_ะฐะปะปะฐั ะฐ_id'), | |
| 'network_id': row_dict.get('ัะตัั_id'), | |
| 'phonetic_chain': row_dict.get('ัะพะฝะตัะธัะตัะบะฐั_ัะตะฟั'), | |
| 'inversion_type': row_dict.get('ัะธะฟ_ะธะฝะฒะตััะธะธ'), | |
| 'source_form': row_dict.get('ะธัั ะพะดะฝะฐั_ัะพัะผะฐ'), | |
| 'foundation_refs': row_dict.get('ะพัะฝะพะฒะฐะฝะธะต'), | |
| } | |
| # Extract root ID | |
| root_id = row_dict.get('ะบะพัะตะฝั_id') | |
| if root_id: | |
| entry_data['root_id'] = root_id | |
| # Insert into entries table (Russian term goes to ru_term) | |
| try: | |
| cursor.execute(''' | |
| INSERT INTO entries ( | |
| entry_id, score, ru_term, ar_word, root_id, root_letters, | |
| qur_meaning, pattern, allah_name_id, network_id, phonetic_chain, | |
| inversion_type, source_form, foundation_refs | |
| ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) | |
| ''', ( | |
| entry_data['entry_id'], entry_data['score'], entry_data['ru_term'], | |
| entry_data['ar_word'], entry_data.get('root_id'), entry_data['root_letters'], | |
| entry_data['qur_meaning'], entry_data['pattern'], entry_data['allah_name_id'], | |
| entry_data['network_id'], entry_data['phonetic_chain'], entry_data['inversion_type'], | |
| entry_data['source_form'], entry_data['foundation_refs'] | |
| )) | |
| count += 1 | |
| except Exception as e: | |
| print(f" Error inserting row {i}: {e}") | |
| print(f" Data: {entry_data}") | |
| print(f" Migrated {count} Russian entries") | |
| return count | |
| def migrate_persian_a1_madakhil(conn, cursor, wb): | |
| """Migrate PERSIAN_A1_MADฤKHIL sheet to entries table (Persian entries).""" | |
| print(" Migrating PERSIAN_A1_MADฤKHIL (Persian entries)...") | |
| if "PERSIAN_A1_MADฤKHIL" not in wb.sheetnames: | |
| print(" Sheet PERSIAN_A1_MADฤKHIL not found") | |
| return 0 | |
| ws = wb["PERSIAN_A1_MADฤKHIL"] | |
| rows = list(ws.iter_rows(values_only=True)) | |
| if not rows or len(rows) < 2: | |
| print(" No data in PERSIAN_A1_MADฤKHIL") | |
| return 0 | |
| # First row has complex headers with Persian and English | |
| headers = [clean_column_name(cell) for cell in rows[0]] | |
| count = 0 | |
| for i, row in enumerate(rows[1:], start=1): | |
| if not any(row): | |
| continue | |
| row_dict = {} | |
| for j, cell in enumerate(row): | |
| if j < len(headers): | |
| row_dict[headers[j]] = cell | |
| # Map to entries table schema | |
| entry_data = { | |
| 'entry_id': row_dict.get('madkhal_identry_id'), | |
| 'score': row_dict.get('nomrescore'), | |
| 'fa_term': row_dict.get('vazhe_farsipersian_term'), | |
| 'source_word': row_dict.get('kalame_aslisource_word'), | |
| 'root_letters': row_dict.get('horuf_e_risheroot_letters'), | |
| 'qur_meaning': row_dict.get('mana_ye_quraniquot_meaning'), | |
| 'pattern': row_dict.get('olgu_pattern'), | |
| 'allah_name_id': row_dict.get('esm_e_allah_idallah_name_id'), | |
| 'network_id': row_dict.get('shabake_idnetwork_id'), | |
| 'phonetic_chain': row_dict.get('zanjire_sawtiphoneic_chain'), | |
| 'inversion_type': row_dict.get('now_e_vazhguniinversion_type'), | |
| 'source_form': row_dict.get('shakl_e_aslisource_form'), | |
| 'foundation_refs': row_dict.get('boniyanfoundation_ref'), | |
| } | |
| # Extract root ID | |
| root_id = row_dict.get('rishe_idroot_id') | |
| if root_id: | |
| entry_data['root_id'] = root_id | |
| # Use Persian term for fa_term and source_word for ar_word | |
| try: | |
| cursor.execute(''' | |
| INSERT INTO entries ( | |
| entry_id, score, fa_term, ar_word, root_id, root_letters, | |
| qur_meaning, pattern, allah_name_id, network_id, phonetic_chain, | |
| inversion_type, source_form, foundation_refs | |
| ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) | |
| ''', ( | |
| entry_data['entry_id'], entry_data['score'], entry_data['fa_term'], | |
| entry_data['source_word'], entry_data.get('root_id'), entry_data['root_letters'], | |
| entry_data['qur_meaning'], entry_data['pattern'], entry_data['allah_name_id'], | |
| entry_data['network_id'], entry_data['phonetic_chain'], entry_data['inversion_type'], | |
| entry_data['source_form'], entry_data['foundation_refs'] | |
| )) | |
| count += 1 | |
| except Exception as e: | |
| print(f" Error inserting row {i}: {e}") | |
| print(f" Data: {entry_data}") | |
| print(f" Migrated {count} Persian entries") | |
| return count | |
| def migrate_bitig_a1_entries(conn, cursor, wb): | |
| """Migrate BITIG_A1_ENTRIES sheet to entries table (ORIG2 entries).""" | |
| print(" Migrating BITIG_A1_ENTRIES (ORIG2 entries)...") | |
| if "BITIG_A1_ENTRIES" not in wb.sheetnames: | |
| print(" Sheet BITIG_A1_ENTRIES not found") | |
| return 0 | |
| ws = wb["BITIG_A1_ENTRIES"] | |
| rows = list(ws.iter_rows(values_only=True)) | |
| if not rows or len(rows) < 2: | |
| print(" No data in BITIG_A1_ENTRIES") | |
| return 0 | |
| headers = [clean_column_name(cell) for cell in rows[0]] | |
| count = 0 | |
| for i, row in enumerate(rows[1:], start=1): | |
| if not any(row): | |
| continue | |
| row_dict = {} | |
| for j, cell in enumerate(row): | |
| if j < len(headers): | |
| row_dict[headers[j]] = cell | |
| # Map to entries table schema | |
| entry_data = { | |
| 'entry_id': row_dict.get('entry_id'), | |
| 'score': row_dict.get('score'), | |
| 'en_term': row_dict.get('orig2_term'), # ORIG2 term goes to en_term | |
| 'orig2_script': row_dict.get('orig2_script'), | |
| 'root_letters': row_dict.get('root_letters'), | |
| 'phonetic_chain': row_dict.get('phonetic_chain'), | |
| 'semantic_field': row_dict.get('semantic_field'), | |
| 'dispersal_range': row_dict.get('dispersal_range'), | |
| 'status': row_dict.get('status'), | |
| 'notes': row_dict.get('notes'), | |
| } | |
| # For ORIG2 entries, we need to handle specially | |
| # Mark as ORIG2 in foundation_refs | |
| foundation_refs = f"ORIG2: {row_dict.get('kashgari_attestation', '')}" | |
| try: | |
| cursor.execute(''' | |
| INSERT INTO entries ( | |
| entry_id, score, en_term, ar_word, root_letters, | |
| phonetic_chain, foundation_refs, notes | |
| ) VALUES (?, ?, ?, ?, ?, ?, ?, ?) | |
| ''', ( | |
| entry_data['entry_id'], entry_data['score'], entry_data['en_term'], | |
| entry_data.get('orig2_script'), entry_data['root_letters'], | |
| entry_data['phonetic_chain'], foundation_refs, entry_data['notes'] | |
| )) | |
| count += 1 | |
| except Exception as e: | |
| print(f" Error inserting row {i}: {e}") | |
| print(f" Data: {entry_data}") | |
| print(f" Migrated {count} ORIG2 entries") | |
| return count | |
| def migrate_child_schema(conn, cursor, wb): | |
| """Migrate CHILD_SCHEMA sheet to child_entries table.""" | |
| print(" Migrating CHILD_SCHEMA...") | |
| if "CHILD_SCHEMA" not in wb.sheetnames: | |
| print(" Sheet CHILD_SCHEMA not found") | |
| return 0 | |
| ws = wb["CHILD_SCHEMA"] | |
| rows = list(ws.iter_rows(values_only=True)) | |
| if not rows or len(rows) < 4: | |
| print(" No data in CHILD_SCHEMA") | |
| return 0 | |
| # Row 2 is headers (0: title, 1: description, 2: headers) | |
| headers = [clean_column_name(cell) for cell in rows[2]] | |
| count = 0 | |
| for i, row in enumerate(rows[3:], start=3): | |
| if not any(row): | |
| continue | |
| row_dict = {} | |
| for j, cell in enumerate(row): | |
| if j < len(headers): | |
| row_dict[headers[j]] = cell | |
| # Map to child_entries table schema | |
| child_data = { | |
| 'child_id': row_dict.get('entry_id'), | |
| 'shell_name': row_dict.get('shell_name'), | |
| 'shell_language': row_dict.get('shell_language'), | |
| 'orig_class': row_dict.get('orig_class'), | |
| 'orig_root': row_dict.get('orig_root'), | |
| 'orig_lemma': row_dict.get('orig_lemma'), | |
| 'orig_meaning': row_dict.get('orig_meaning'), | |
| 'operation_role': row_dict.get('operation_role'), | |
| 'shell_meaning': row_dict.get('shell_meaning'), | |
| 'inversion_direction': row_dict.get('inversion_direction'), | |
| 'phonetic_chain': row_dict.get('phonetic_chain'), | |
| 'qur_anchors': row_dict.get('qur_anchors'), | |
| 'dp_codes': row_dict.get('dp_codes'), | |
| 'nt_code': row_dict.get('nt_code'), | |
| 'pattern': row_dict.get('pattern'), | |
| 'parent_op': row_dict.get('parent_op'), | |
| 'gate_status': row_dict.get('gate_status'), | |
| 'notes': row_dict.get('notes'), | |
| } | |
| # Clean up pattern field | |
| pattern = child_data['pattern'] | |
| if pattern and '+' in pattern: | |
| pattern = pattern.split('+')[0] | |
| child_data['pattern'] = pattern | |
| try: | |
| cursor.execute(''' | |
| INSERT INTO child_entries ( | |
| child_id, shell_name, shell_language, orig_class, orig_root, | |
| orig_lemma, orig_meaning, operation_role, shell_meaning, | |
| inversion_direction, phonetic_chain, qur_anchors, dp_codes, | |
| nt_code, pattern, parent_op, gate_status, notes | |
| ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) | |
| ''', ( | |
| child_data['child_id'], child_data['shell_name'], child_data['shell_language'], | |
| child_data['orig_class'], child_data['orig_root'], child_data['orig_lemma'], | |
| child_data['orig_meaning'], child_data['operation_role'], child_data['shell_meaning'], | |
| child_data['inversion_direction'], child_data['phonetic_chain'], child_data['qur_anchors'], | |
| child_data['dp_codes'], child_data['nt_code'], child_data['pattern'], | |
| child_data['parent_op'], child_data['gate_status'], child_data['notes'] | |
| )) | |
| count += 1 | |
| except Exception as e: | |
| print(f" Error inserting row {i}: {e}") | |
| print(f" Data: {child_data}") | |
| print(f" Migrated {count} child entries") | |
| return count | |
| def migrate_a4_derivatives(conn, cursor, wb): | |
| """Migrate A4_DERIVATIVES sheet to derivatives table.""" | |
| print(" Migrating A4_DERIVATIVES...") | |
| if "A4_DERIVATIVES" not in wb.sheetnames: | |
| print(" Sheet A4_DERIVATIVES not found") | |
| return 0 | |
| ws = wb["A4_DERIVATIVES"] | |
| rows = list(ws.iter_rows(values_only=True)) | |
| if not rows or len(rows) < 2: | |
| print(" No data in A4_DERIVATIVES") | |
| return 0 | |
| headers = [clean_column_name(cell) for cell in rows[0]] | |
| count = 0 | |
| for i, row in enumerate(rows[1:], start=1): | |
| if not any(row): | |
| continue | |
| row_dict = {} | |
| for j, cell in enumerate(row): | |
| if j < len(headers): | |
| row_dict[headers[j]] = cell | |
| derivative_data = { | |
| 'derivative_id': row_dict.get('deriv_id'), | |
| 'entry_id': row_dict.get('entry_id'), | |
| 'en_term': row_dict.get('en_term'), | |
| 'derivative': row_dict.get('derivative'), | |
| 'link_type': row_dict.get('link_type'), | |
| } | |
| try: | |
| cursor.execute(''' | |
| INSERT INTO derivatives ( | |
| derivative_id, entry_id, derivative_term, language, link_type | |
| ) VALUES (?, ?, ?, 'en', ?) | |
| ''', ( | |
| derivative_data['derivative_id'], derivative_data['entry_id'], | |
| derivative_data['derivative'], derivative_data['link_type'] | |
| )) | |
| count += 1 | |
| except Exception as e: | |
| print(f" Error inserting row {i}: {e}") | |
| print(f" Data: {derivative_data}") | |
| print(f" Migrated {count} derivatives") | |
| return count | |
| def migrate_a5_cross_refs(conn, cursor, wb): | |
| """Migrate A5_CROSS_REFS sheet to cross_refs table.""" | |
| print(" Migrating A5_CROSS_REFS...") | |
| if "A5_CROSS_REFS" not in wb.sheetnames: | |
| print(" Sheet A5_CROSS_REFS not found") | |
| return 0 | |
| ws = wb["A5_CROSS_REFS"] | |
| rows = list(ws.iter_rows(values_only=True)) | |
| if not rows or len(rows) < 2: | |
| print(" No data in A5_CROSS_REFS") | |
| return 0 | |
| headers = [clean_column_name(cell) for cell in rows[0]] | |
| count = 0 | |
| for i, row in enumerate(rows[1:], start=1): | |
| if not any(row): | |
| continue | |
| row_dict = {} | |
| for j, cell in enumerate(row): | |
| if j < len(headers): | |
| row_dict[headers[j]] = cell | |
| crossref_data = { | |
| 'xref_id': row_dict.get('xref_id'), | |
| 'from_id': row_dict.get('from_id'), | |
| 'to_id': row_dict.get('to_id'), | |
| 'link_type': row_dict.get('link_type'), | |
| 'description': row_dict.get('description'), | |
| 'layer_ref': row_dict.get('layer_ref'), | |
| } | |
| try: | |
| cursor.execute(''' | |
| INSERT INTO cross_refs ( | |
| xref_id, from_entry_id, to_entry_id, link_type, description, layer_ref | |
| ) VALUES (?, ?, ?, ?, ?, ?) | |
| ''', ( | |
| crossref_data['xref_id'], crossref_data['from_id'], | |
| crossref_data['to_id'], crossref_data['link_type'], | |
| crossref_data['description'], crossref_data['layer_ref'] | |
| )) | |
| count += 1 | |
| except Exception as e: | |
| print(f" Error inserting row {i}: {e}") | |
| print(f" Data: {crossref_data}") | |
| print(f" Migrated {count} cross-references") | |
| return count | |
| def migrate_a3_quran_refs(conn, cursor, wb): | |
| """Migrate A3_QURAN_REFS sheet to quran_refs table.""" | |
| print(" Migrating A3_QURAN_REFS...") | |
| if "A3_QURAN_REFS" not in wb.sheetnames: | |
| print(" Sheet A3_QURAN_REFS not found") | |
| return 0 | |
| ws = wb["A3_QURAN_REFS"] | |
| rows = list(ws.iter_rows(values_only=True)) | |
| if not rows or len(rows) < 2: | |
| print(" No data in A3_QURAN_REFS") | |
| return 0 | |
| headers = [clean_column_name(cell) for cell in rows[0]] | |
| count = 0 | |
| for i, row in enumerate(rows[1:], start=1): | |
| if not any(row): | |
| continue | |
| row_dict = {} | |
| for j, cell in enumerate(row): | |
| if j < len(headers): | |
| row_dict[headers[j]] = cell | |
| quran_data = { | |
| 'ref_id': row_dict.get('ref_id'), | |
| 'surah': row_dict.get('surah'), | |
| 'ayah': row_dict.get('ayah'), | |
| 'arabic_text': row_dict.get('arabic_text'), | |
| 'relevance': row_dict.get('relevance'), | |
| 'entry_ids': row_dict.get('entry_ids'), | |
| 'network_id': row_dict.get('network_id'), | |
| 'layer_ref': row_dict.get('layer_ref'), | |
| 'qv_id': row_dict.get('qv_id'), | |
| } | |
| try: | |
| cursor.execute(''' | |
| INSERT INTO quran_refs ( | |
| ref_id, surah, ayah, arabic_text, relevance, | |
| entry_ids, network_id, layer_ref, qv_id | |
| ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) | |
| ''', ( | |
| quran_data['ref_id'], quran_data['surah'], quran_data['ayah'], | |
| quran_data['arabic_text'], quran_data['relevance'], | |
| quran_data['entry_ids'], quran_data['network_id'], | |
| quran_data['layer_ref'], quran_data['qv_id'] | |
| )) | |
| count += 1 | |
| except Exception as e: | |
| print(f" Error inserting row {i}: {e}") | |
| print(f" Data: {quran_data}") | |
| print(f" Migrated {count} Qur'an references") | |
| return count | |
| def migrate_m1_phonetic_shifts(conn, cursor, wb): | |
| """Migrate M1_PHONETIC_SHIFTS sheet to phonetic_shifts table.""" | |
| print(" Migrating M1_PHONETIC_SHIFTS...") | |
| if "M1_PHONETIC_SHIFTS" not in wb.sheetnames: | |
| print(" Sheet M1_PHONETIC_SHIFTS not found") | |
| return 0 | |
| ws = wb["M1_PHONETIC_SHIFTS"] | |
| rows = list(ws.iter_rows(values_only=True)) | |
| if not rows or len(rows) < 2: | |
| print(" No data in M1_PHONETIC_SHIFTS") | |
| return 0 | |
| headers = [clean_column_name(cell) for cell in rows[0]] | |
| count = 0 | |
| for i, row in enumerate(rows[1:], start=1): | |
| if not any(row): | |
| continue | |
| row_dict = {} | |
| for j, cell in enumerate(row): | |
| if j < len(headers): | |
| row_dict[headers[j]] = cell | |
| shift_data = { | |
| 'shift_id': row_dict.get('shift_id'), | |
| 'ar_letter': row_dict.get('ar_letter'), | |
| 'ar_name': row_dict.get('ar_name'), | |
| 'en_outputs': row_dict.get('en_outputs'), | |
| 'direction': row_dict.get('direction'), | |
| 'examples': row_dict.get('examples'), | |
| 'entry_ids': row_dict.get('entry_ids'), | |
| 'foundation_ref': row_dict.get('foundation_ref'), | |
| 'ru_outputs': row_dict.get('ru_outputs'), | |
| } | |
| try: | |
| cursor.execute(''' | |
| INSERT INTO phonetic_shifts ( | |
| shift_id, ar_letter, ar_name, en_outputs, direction, | |
| examples, entry_ids, foundation_ref, ru_outputs | |
| ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) | |
| ''', ( | |
| shift_data['shift_id'], shift_data['ar_letter'], shift_data['ar_name'], | |
| shift_data['en_outputs'], shift_data['direction'], shift_data['examples'], | |
| shift_data['entry_ids'], shift_data['foundation_ref'], shift_data['ru_outputs'] | |
| )) | |
| count += 1 | |
| except Exception as e: | |
| print(f" Error inserting row {i}: {e}") | |
| print(f" Data: {shift_data}") | |
| print(f" Migrated {count} phonetic shifts") | |
| return count | |
| def migrate_m2_detection_patterns(conn, cursor, wb): | |
| """Migrate M2_DETECTION_PATTERNS sheet to detection_patterns table.""" | |
| print(" Migrating M2_DETECTION_PATTERNS...") | |
| if "M2_DETECTION_PATTERNS" not in wb.sheetnames: | |
| print(" Sheet M2_DETECTION_PATTERNS not found") | |
| return 0 | |
| ws = wb["M2_DETECTION_PATTERNS"] | |
| rows = list(ws.iter_rows(values_only=True)) | |
| if not rows or len(rows) < 2: | |
| print(" No data in M2_DETECTION_PATTERNS") | |
| return 0 | |
| headers = [clean_column_name(cell) for cell in rows[0]] | |
| count = 0 | |
| for i, row in enumerate(rows[1:], start=1): | |
| if not any(row): | |
| continue | |
| row_dict = {} | |
| for j, cell in enumerate(row): | |
| if j < len(headers): | |
| row_dict[headers[j]] = cell | |
| pattern_data = { | |
| 'pattern_id': row_dict.get('pattern_id'), | |
| 'name': row_dict.get('name'), | |
| 'level': row_dict.get('level'), | |
| 'description': row_dict.get('description'), | |
| 'triggers': row_dict.get('triggers'), | |
| 'qur_ref': row_dict.get('qur_ref'), | |
| 'example': row_dict.get('example'), | |
| 'entry_ids': row_dict.get('entry_ids'), | |
| 'foundation_ref': row_dict.get('foundation_ref'), | |
| } | |
| try: | |
| cursor.execute(''' | |
| INSERT INTO detection_patterns ( | |
| pattern_id, name, level, description, triggers, | |
| qur_ref, example, entry_ids, foundation_ref | |
| ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) | |
| ''', ( | |
| pattern_data['pattern_id'], pattern_data['name'], pattern_data['level'], | |
| pattern_data['description'], pattern_data['triggers'], pattern_data['qur_ref'], | |
| pattern_data['example'], pattern_data['entry_ids'], pattern_data['foundation_ref'] | |
| )) | |
| count += 1 | |
| except Exception as e: | |
| print(f" Error inserting row {i}: {e}") | |
| print(f" Data: {pattern_data}") | |
| print(f" Migrated {count} detection patterns") | |
| return count | |
| def migrate_m4_networks(conn, cursor, wb): | |
| """Migrate M4_NETWORKS sheet to networks table.""" | |
| print(" Migrating M4_NETWORKS...") | |
| if "M4_NETWORKS" not in wb.sheetnames: | |
| print(" Sheet M4_NETWORKS not found") | |
| return 0 | |
| ws = wb["M4_NETWORKS"] | |
| rows = list(ws.iter_rows(values_only=True)) | |
| if not rows or len(rows) < 2: | |
| print(" No data in M4_NETWORKS") | |
| return 0 | |
| headers = [clean_column_name(cell) for cell in rows[0]] | |
| count = 0 | |
| for i, row in enumerate(rows[1:], start=1): | |
| if not any(row): | |
| continue | |
| row_dict = {} | |
| for j, cell in enumerate(row): | |
| if j < len(headers): | |
| row_dict[headers[j]] = cell | |
| network_data = { | |
| 'network_id': row_dict.get('network_id'), | |
| 'name': row_dict.get('name'), | |
| 'title': row_dict.get('title'), | |
| 'link_verse': row_dict.get('link_verse'), | |
| 'description': row_dict.get('description'), | |
| 'mechanism': row_dict.get('mechanism'), | |
| 'entry_ids': row_dict.get('entry_ids'), | |
| 'status': row_dict.get('status'), | |
| 'foundation_ref': row_dict.get('foundation_ref'), | |
| } | |
| try: | |
| cursor.execute(''' | |
| INSERT INTO networks ( | |
| network_id, name, title, link_verse, description, | |
| mechanism, entry_ids, status, foundation_ref | |
| ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) | |
| ''', ( | |
| network_data['network_id'], network_data['name'], network_data['title'], | |
| network_data['link_verse'], network_data['description'], network_data['mechanism'], | |
| network_data['entry_ids'], network_data['status'], network_data['foundation_ref'] | |
| )) | |
| count += 1 | |
| except Exception as e: | |
| print(f" Error inserting row {i}: {e}") | |
| print(f" Data: {network_data}") | |
| print(f" Migrated {count} networks") | |
| return count | |
| def migrate_m3_scholars(conn, cursor, wb): | |
| """Migrate M3_SCHOLARS sheet to scholars table.""" | |
| print(" Migrating M3_SCHOLARS...") | |
| if "M3_SCHOLARS" not in wb.sheetnames: | |
| print(" Sheet M3_SCHOLARS not found") | |
| return 0 | |
| ws = wb["M3_SCHOLARS"] | |
| rows = list(ws.iter_rows(values_only=True)) | |
| if not rows or len(rows) < 2: | |
| print(" No data in M3_SCHOLARS") | |
| return 0 | |
| headers = [clean_column_name(cell) for cell in rows[0]] | |
| count = 0 | |
| for i, row in enumerate(rows[1:], start=1): | |
| if not any(row): | |
| continue | |
| row_dict = {} | |
| for j, cell in enumerate(row): | |
| if j < len(headers): | |
| row_dict[headers[j]] = cell | |
| scholar_data = { | |
| 'scholar_id': row_dict.get('scholar_id'), | |
| 'verified_name': row_dict.get('verified_name'), | |
| 'birth_place': row_dict.get('birthplace'), | |
| 'identity': row_dict.get('identity'), | |
| 'role': row_dict.get('role'), | |
| 'achievement': row_dict.get('achievement'), | |
| 'lies_applied': row_dict.get('lies_applied'), | |
| 'entry_ids': row_dict.get('entry_ids'), | |
| } | |
| try: | |
| cursor.execute(''' | |
| INSERT INTO scholars ( | |
| scholar_id, verified_name, birth_place, identity, | |
| role, achievement, lies_applied, entry_ids | |
| ) VALUES (?, ?, ?, ?, ?, ?, ?, ?) | |
| ''', ( | |
| scholar_data['scholar_id'], scholar_data['verified_name'], | |
| scholar_data['birth_place'], scholar_data['identity'], | |
| scholar_data['role'], scholar_data['achievement'], | |
| scholar_data['lies_applied'], scholar_data['entry_ids'] | |
| )) | |
| count += 1 | |
| except Exception as e: | |
| print(f" Error inserting row {i}: {e}") | |
| print(f" Data: {scholar_data}") | |
| print(f" Migrated {count} scholars") | |
| return count | |
| def migrate_m5_qur_verification(conn, cursor, wb): | |
| """Migrate M5_QUR_VERIFICATION sheet to qur_verification table.""" | |
| print(" Migrating M5_QUR_VERIFICATION...") | |
| if "M5_QUR_VERIFICATION" not in wb.sheetnames: | |
| print(" Sheet M5_QUR_VERIFICATION not found") | |
| return 0 | |
| ws = wb["M5_QUR_VERIFICATION"] | |
| rows = list(ws.iter_rows(values_only=True)) | |
| if not rows or len(rows) < 2: | |
| print(" No data in M5_QUR_VERIFICATION") | |
| return 0 | |
| headers = [clean_column_name(cell) for cell in rows[0]] | |
| count = 0 | |
| for i, row in enumerate(rows[1:], start=1): | |
| if not any(row): | |
| continue | |
| row_dict = {} | |
| for j, cell in enumerate(row): | |
| if j < len(headers): | |
| row_dict[headers[j]] = cell | |
| qv_data = { | |
| 'qv_id': row_dict.get('qv_id'), | |
| 'name': row_dict.get('name'), | |
| 'mechanism': row_dict.get('mechanism'), | |
| 'description': row_dict.get('description'), | |
| 'markers': row_dict.get('markers'), | |
| 'qur_refs': row_dict.get('qur_refs'), | |
| 'contrast_refs': row_dict.get('contrast_refs'), | |
| 'foundation_ref': row_dict.get('foundation_ref'), | |
| } | |
| try: | |
| cursor.execute(''' | |
| INSERT INTO qur_verification ( | |
| qv_id, name, mechanism, description, markers, | |
| qur_refs, contrast_refs, foundation_ref | |
| ) VALUES (?, ?, ?, ?, ?, ?, ?, ?) | |
| ''', ( | |
| qv_data['qv_id'], qv_data['name'], qv_data['mechanism'], | |
| qv_data['description'], qv_data['markers'], qv_data['qur_refs'], | |
| qv_data['contrast_refs'], qv_data['foundation_ref'] | |
| )) | |
| count += 1 | |
| except Exception as e: | |
| print(f" Error inserting row {i}: {e}") | |
| print(f" Data: {qv_data}") | |
| print(f" Migrated {count} Qur'an verification mechanisms") | |
| return count | |
| def extract_and_insert_roots(conn, cursor): | |
| """Extract unique roots from all entries and insert into roots table.""" | |
| print(" Extracting roots from entries...") | |
| # First, collect all root_id values already present in entries | |
| cursor.execute(''' | |
| SELECT DISTINCT root_id | |
| FROM entries | |
| WHERE root_id IS NOT NULL AND root_id != '' | |
| ''') | |
| existing_root_ids = {row[0] for row in cursor.fetchall()} | |
| # For each existing root_id, ensure it exists in roots table | |
| count_existing = 0 | |
| for root_id in existing_root_ids: | |
| cursor.execute('SELECT 1 FROM roots WHERE root_id = ?', (root_id,)) | |
| if not cursor.fetchone(): | |
| # Root doesn't exist, create a minimal entry | |
| cursor.execute(''' | |
| INSERT INTO roots (root_id, root_letters, root_bare, notes) | |
| VALUES (?, ?, ?, ?) | |
| ''', (root_id, f"Root {root_id}", root_id, "Created during migration for existing entries")) | |
| count_existing += 1 | |
| # Get all unique root_letters from entries that don't have root_id yet | |
| cursor.execute(''' | |
| SELECT DISTINCT root_letters | |
| FROM entries | |
| WHERE root_letters IS NOT NULL AND root_letters != '' | |
| AND (root_id IS NULL OR root_id = '') | |
| ''') | |
| root_rows = cursor.fetchall() | |
| count_new = 0 | |
| for root_letters, in root_rows: | |
| if not root_letters: | |
| continue | |
| # Generate root_id (R{count+1:03d}) | |
| # But first check if we already have a root with these letters | |
| cursor.execute('SELECT root_id FROM roots WHERE root_letters = ?', (root_letters,)) | |
| existing = cursor.fetchone() | |
| if existing: | |
| root_id = existing[0] | |
| else: | |
| # Count existing roots to generate new ID | |
| cursor.execute('SELECT COUNT(*) FROM roots') | |
| root_count = cursor.fetchone()[0] | |
| root_id = f"R{root_count + 1:03d}" | |
| # Create bare root (without hyphens) | |
| root_bare = re.sub(r'[\-\s]', '', root_letters) | |
| # Count entries with this root | |
| cursor.execute('SELECT COUNT(*) FROM entries WHERE root_letters = ?', (root_letters,)) | |
| entry_count = cursor.fetchone()[0] | |
| # Try to extract root type from pattern | |
| root_type = 'TRILITERAL' if len(root_bare) == 3 else 'QUADRILITERAL' if len(root_bare) == 4 else 'OTHER' | |
| # Insert into roots table | |
| try: | |
| cursor.execute(''' | |
| INSERT INTO roots ( | |
| root_id, root_letters, root_bare, root_type, notes | |
| ) VALUES (?, ?, ?, ?, ?) | |
| ''', ( | |
| root_id, root_letters, root_bare, root_type, | |
| f"Extracted from {entry_count} entries during migration" | |
| )) | |
| count_new += 1 | |
| except Exception as e: | |
| print(f" Error inserting root {root_letters}: {e}") | |
| continue | |
| # Update entries with the root_id | |
| cursor.execute(''' | |
| UPDATE entries | |
| SET root_id = ? | |
| WHERE root_letters = ? AND (root_id IS NULL OR root_id = '') | |
| ''', (root_id, root_letters)) | |
| print(f" Created {count_existing} roots for existing IDs, {count_new} new roots from root_letters") | |
| return count_existing + count_new | |
| def create_word_fingerprints(conn, cursor): | |
| """Create word_fingerprints entries for all searchable terms.""" | |
| print(" Creating word_fingerprints...") | |
| # The triggers should have already created fingerprints when entries were inserted | |
| # But let's verify and create any missing ones | |
| cursor.execute('SELECT COUNT(*) FROM word_fingerprints') | |
| count = cursor.fetchone()[0] | |
| print(f" {count} fingerprints already created by triggers") | |
| return count | |
| # ============================================================================ | |
| # MAIN MIGRATION FUNCTION | |
| # ============================================================================ | |
| def main(): | |
| print("\n" + "โ" * 70) | |
| print(" USLaP Migration: Excel โ SQLite Relational Database") | |
| print(" ุจูุณูู ู ุงูููููู ุงูุฑููุญูู ููฐูู ุงูุฑููุญููู ู") | |
| print("โ" * 70) | |
| # Check if files exist | |
| if not os.path.exists(EXCEL_PATH): | |
| print(f"\nโ ERROR: Excel file not found: {EXCEL_PATH}") | |
| sys.exit(1) | |
| if not os.path.exists(SCHEMA_PATH): | |
| print(f"\nโ ERROR: Schema file not found: {SCHEMA_PATH}") | |
| sys.exit(1) | |
| # Backup existing database if it exists | |
| if os.path.exists(DB_PATH): | |
| backup_path = backup_database(DB_PATH) | |
| if backup_path: | |
| print(f"\nโ Backed up existing database to: {backup_path}") | |
| # Remove the existing database file to start fresh | |
| os.remove(DB_PATH) | |
| print(f" Removed existing database file to start fresh") | |
| # Load Excel workbook | |
| print(f"\n๐ Loading Excel file: {EXCEL_PATH}") | |
| try: | |
| wb = openpyxl.load_workbook(EXCEL_PATH, read_only=True, data_only=True) | |
| print(f" Found {len(wb.sheetnames)} sheets") | |
| except Exception as e: | |
| print(f"\nโ ERROR: Failed to load Excel file: {e}") | |
| sys.exit(1) | |
| # Create database and execute schema | |
| print(f"\n๐๏ธ Creating database: {DB_PATH}") | |
| try: | |
| conn = sqlite3.connect(DB_PATH) | |
| cursor = conn.cursor() | |
| # Enable foreign keys | |
| cursor.execute("PRAGMA foreign_keys = ON") | |
| # Read and execute schema | |
| with open(SCHEMA_PATH, 'r', encoding='utf-8') as f: | |
| schema_sql = f.read() | |
| print(" Executing schema...") | |
| cursor.executescript(schema_sql) | |
| # Register extract_consonants UDF (CRITICAL: must be done before any inserts) | |
| print(" Registering extract_consonants() UDF...") | |
| conn.create_function("extract_consonants", 1, extract_consonants) | |
| # Disable foreign keys during data migration to avoid constraint violations | |
| cursor.execute("PRAGMA foreign_keys = OFF") | |
| # Begin transaction | |
| conn.execute("BEGIN TRANSACTION") | |
| # Migrate each sheet | |
| print("\n๐ Migrating data...") | |
| total_counts = {} | |
| # Order matters: create entries first, then related tables | |
| total_counts['A1_ENTRIES'] = migrate_a1_entries(conn, cursor, wb) | |
| total_counts['A1_ะะะะะกะ'] = migrate_a1_zapisi(conn, cursor, wb) | |
| total_counts['PERSIAN_A1_MADฤKHIL'] = migrate_persian_a1_madakhil(conn, cursor, wb) | |
| total_counts['BITIG_A1_ENTRIES'] = migrate_bitig_a1_entries(conn, cursor, wb) | |
| # Extract and insert roots (must be done before other tables that reference roots) | |
| total_counts['roots'] = extract_and_insert_roots(conn, cursor) | |
| # Migrate remaining tables | |
| total_counts['CHILD_SCHEMA'] = migrate_child_schema(conn, cursor, wb) | |
| total_counts['A4_DERIVATIVES'] = migrate_a4_derivatives(conn, cursor, wb) | |
| total_counts['A5_CROSS_REFS'] = migrate_a5_cross_refs(conn, cursor, wb) | |
| total_counts['A3_QURAN_REFS'] = migrate_a3_quran_refs(conn, cursor, wb) | |
| total_counts['M1_PHONETIC_SHIFTS'] = migrate_m1_phonetic_shifts(conn, cursor, wb) | |
| total_counts['M2_DETECTION_PATTERNS'] = migrate_m2_detection_patterns(conn, cursor, wb) | |
| total_counts['M4_NETWORKS'] = migrate_m4_networks(conn, cursor, wb) | |
| total_counts['M3_SCHOLARS'] = migrate_m3_scholars(conn, cursor, wb) | |
| total_counts['M5_QUR_VERIFICATION'] = migrate_m5_qur_verification(conn, cursor, wb) | |
| # Create word fingerprints (triggers should have done this, but verify) | |
| total_counts['word_fingerprints'] = create_word_fingerprints(conn, cursor) | |
| # Commit transaction | |
| conn.commit() | |
| # Re-enable foreign keys and check constraints | |
| cursor.execute("PRAGMA foreign_keys = ON") | |
| # Generate statistics | |
| print("\n๐ Migration Statistics:") | |
| print("โ" * 40) | |
| cursor.execute("SELECT COUNT(*) FROM entries") | |
| entries_count = cursor.fetchone()[0] | |
| print(f" Total entries: {entries_count}") | |
| cursor.execute("SELECT COUNT(*) FROM roots") | |
| roots_count = cursor.fetchone()[0] | |
| print(f" Total roots: {roots_count}") | |
| cursor.execute("SELECT COUNT(*) FROM child_entries") | |
| child_count = cursor.fetchone()[0] | |
| print(f" Child entries: {child_count}") | |
| cursor.execute("SELECT COUNT(*) FROM word_fingerprints") | |
| fingerprints_count = cursor.fetchone()[0] | |
| print(f" Word fingerprints: {fingerprints_count}") | |
| cursor.execute("SELECT COUNT(*) FROM engine_queue") | |
| queue_count = cursor.fetchone()[0] | |
| print(f" Engine queue items: {queue_count}") | |
| # Verify foreign keys | |
| print("\n๐ Verifying foreign key constraints...") | |
| cursor.execute("PRAGMA foreign_key_check") | |
| fk_errors = cursor.fetchall() | |
| if fk_errors: | |
| print("โ Foreign key errors found:") | |
| for error in fk_errors: | |
| print(f" Table: {error[0]}, Row: {error[1]}, Referenced: {error[2]}, FK Index: {error[3]}") | |
| else: | |
| print("โ All foreign key constraints satisfied") | |
| # Close workbook and connection | |
| wb.close() | |
| conn.close() | |
| print("\n" + "โ" * 70) | |
| print("โ MIGRATION COMPLETED SUCCESSFULLY") | |
| print(f"โ Database: {DB_PATH}") | |
| print("โ" * 70) | |
| # Print next steps | |
| print("\n๐ Next steps:") | |
| print(" 1. Test the database with: sqlite3 uslap_lattice.db '.tables'") | |
| print(" 2. Verify data integrity with queries") | |
| print(" 3. Update db_access_layer.py to use the new schema") | |
| print(" 4. Run USLaP_Engine.py to test cluster expansion") | |
| except Exception as e: | |
| print(f"\nโ ERROR during migration: {e}") | |
| import traceback | |
| traceback.print_exc() | |
| # Rollback if connection is still open | |
| try: | |
| conn.rollback() | |
| conn.close() | |
| except: | |
| pass | |
| print("\nโ ๏ธ Migration failed. Database has been rolled back.") | |
| sys.exit(1) | |
| if __name__ == "__main__": | |
| main() |