Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| USLaP Schema Migration — Restore FK-enforced architecture | |
| بِسْمِ اللَّهِ الرَّحْمَٰنِ الرَّحِيمِ | |
| Migrates current 193-table DB → correct schema from create_uslap_db.sql | |
| - Builds `roots` table from root_translations + all A1 root_ids | |
| - Merges 6 A1 tables into unified `entries` with FK to roots | |
| - Migrates all other tables with FK links | |
| - Preserves ALL existing data | |
| - Copies contamination triggers | |
| - Restores PRAGMA foreign_keys = ON | |
| Usage: | |
| python3 Code_files/migrate_to_schema.py check # dry run — show what will happen | |
| python3 Code_files/migrate_to_schema.py migrate # execute migration | |
| """ | |
| import sqlite3 | |
| import sys | |
| import os | |
| import json | |
| from datetime import datetime | |
| OLD_DB = "/Users/mmsetubal/Documents/USLaP workplace/Code_files/uslap_database_v3.db" | |
| NEW_DB = "/Users/mmsetubal/Documents/USLaP workplace/Code_files/uslap_database_v4_migrated.db" | |
| SCHEMA_SQL = "/Users/mmsetubal/Documents/USLaP workplace/Code_files/create_uslap_db.sql" | |
| def connect_old(): | |
| conn = sqlite3.connect(OLD_DB) | |
| conn.row_factory = sqlite3.Row | |
| return conn | |
| def connect_new(): | |
| conn = sqlite3.connect(NEW_DB) | |
| conn.execute("PRAGMA foreign_keys = ON") | |
| conn.execute("PRAGMA journal_mode = WAL") | |
| return conn | |
| # ============================================================================ | |
| # STEP 1: Build roots table from all sources | |
| # ============================================================================ | |
| def build_roots(old, new): | |
| """Collect all unique roots from all tables, merge with root_translations.""" | |
| print("\n[STEP 1] Building roots table...") | |
| roots = {} # root_id -> {root_letters, root_bare, quran_tokens, quran_lemmas, bitig_attested} | |
| # Source 1: root_translations (has Qur'anic data) | |
| rows = old.execute(""" | |
| SELECT root_hyphenated, root_unhyphenated, token_count, form_count, first_ayah | |
| FROM root_translations | |
| """).fetchall() | |
| for r in rows: | |
| # We need to find a root_id for this root | |
| # Check if any entry references it | |
| root_h = r['root_hyphenated'] | |
| root_bare = r['root_unhyphenated'] | |
| roots[root_h] = { | |
| 'root_letters': root_h, | |
| 'root_bare': root_bare, | |
| 'quran_tokens': r['token_count'] or 0, | |
| 'quran_lemmas': r['form_count'] or 0, | |
| 'root_id': None, # will be assigned | |
| 'bitig_attested': False, | |
| 'first_ayah': r['first_ayah'] | |
| } | |
| # Source 2: EN entries (has root_id mapping) | |
| en_roots = old.execute(""" | |
| SELECT DISTINCT root_id, root_letters | |
| FROM a1_entries | |
| WHERE root_id IS NOT NULL AND root_id != '' | |
| """).fetchall() | |
| for r in en_roots: | |
| rl = r['root_letters'] | |
| rid = r['root_id'] | |
| if rl and rl in roots: | |
| roots[rl]['root_id'] = rid | |
| elif rl: | |
| roots[rl] = { | |
| 'root_letters': rl, | |
| 'root_bare': rl.replace('-', '').replace('‑', ''), | |
| 'quran_tokens': 0, | |
| 'quran_lemmas': 0, | |
| 'root_id': rid, | |
| 'bitig_attested': False, | |
| 'first_ayah': None | |
| } | |
| # Source 3: RU entries | |
| ru_roots = old.execute(""" | |
| SELECT DISTINCT корень_id, корневые_буквы | |
| FROM [a1_записи] | |
| WHERE корень_id IS NOT NULL AND корень_id != '' | |
| """).fetchall() | |
| for r in ru_roots: | |
| rl = r['корневые_буквы'] | |
| rid = r['корень_id'] | |
| if rl and rl in roots and not roots[rl]['root_id']: | |
| roots[rl]['root_id'] = rid | |
| elif rl and rl not in roots: | |
| roots[rl] = { | |
| 'root_letters': rl, | |
| 'root_bare': rl.replace('-', '').replace('‑', ''), | |
| 'quran_tokens': 0, | |
| 'quran_lemmas': 0, | |
| 'root_id': rid, | |
| 'bitig_attested': False, | |
| 'first_ayah': None | |
| } | |
| # Source 4: FA entries | |
| fa_roots = old.execute(""" | |
| SELECT DISTINCT [r_she_id_ریشِه_root_id] as root_id, [hor_f_e_r_she_حُروفِ_ریشِه_root_letters] as root_letters | |
| FROM persian_a1_mad_khil | |
| WHERE [r_she_id_ریشِه_root_id] IS NOT NULL AND [r_she_id_ریشِه_root_id] != '' | |
| """).fetchall() | |
| for r in fa_roots: | |
| rl = r['root_letters'] | |
| rid = r['root_id'] | |
| if rl and rl in roots and not roots[rl]['root_id']: | |
| roots[rl]['root_id'] = rid | |
| elif rl and rl not in roots: | |
| roots[rl] = { | |
| 'root_letters': rl, | |
| 'root_bare': rl.replace('-', '').replace('‑', ''), | |
| 'quran_tokens': 0, | |
| 'quran_lemmas': 0, | |
| 'root_id': rid, | |
| 'bitig_attested': False, | |
| 'first_ayah': None | |
| } | |
| # Source 5: Latin entries | |
| lat_roots = old.execute(""" | |
| SELECT DISTINCT root_id, root_letters | |
| FROM latin_a1_entries | |
| WHERE root_id IS NOT NULL AND root_id != '' | |
| """).fetchall() | |
| for r in lat_roots: | |
| rl = r['root_letters'] | |
| rid = r['root_id'] | |
| if rl and rl in roots and not roots[rl]['root_id']: | |
| roots[rl]['root_id'] = rid | |
| elif rl and rl not in roots: | |
| roots[rl] = { | |
| 'root_letters': rl, | |
| 'root_bare': rl.replace('-', '').replace('‑', ''), | |
| 'quran_tokens': 0, | |
| 'quran_lemmas': 0, | |
| 'root_id': rid, | |
| 'bitig_attested': False, | |
| 'first_ayah': None | |
| } | |
| # Source 6: EU entries | |
| eu_roots = old.execute(""" | |
| SELECT DISTINCT root_id, root_letters | |
| FROM european_a1_entries | |
| WHERE root_id IS NOT NULL AND root_id != '' | |
| """).fetchall() | |
| for r in eu_roots: | |
| rl = r['root_letters'] | |
| rid = r['root_id'] | |
| if rl and rl in roots and not roots[rl]['root_id']: | |
| roots[rl]['root_id'] = rid | |
| elif rl and rl not in roots: | |
| roots[rl] = { | |
| 'root_letters': rl, | |
| 'root_bare': rl.replace('-', '').replace('‑', ''), | |
| 'quran_tokens': 0, | |
| 'quran_lemmas': 0, | |
| 'root_id': rid, | |
| 'bitig_attested': False, | |
| 'first_ayah': None | |
| } | |
| # Source 7: Bitig attestation | |
| bitig_roots = old.execute(""" | |
| SELECT DISTINCT root_letters | |
| FROM bitig_a1_entries | |
| WHERE root_letters IS NOT NULL AND root_letters != '' | |
| """).fetchall() | |
| for r in bitig_roots: | |
| rl = r['root_letters'] | |
| if rl and rl in roots: | |
| roots[rl]['bitig_attested'] = True | |
| # Source 8: Names of Allah | |
| allah_roots = old.execute(""" | |
| SELECT allah_id, root_id | |
| FROM a2_names_of_allah | |
| WHERE root_id IS NOT NULL AND root_id != '' | |
| """).fetchall() | |
| for r in allah_roots: | |
| rid = r['root_id'] | |
| # Find matching root_letters | |
| for rl, data in roots.items(): | |
| if data['root_id'] == rid: | |
| break | |
| # Assign root_ids to roots that don't have one | |
| # Use T### for Bitig-only, R### for AA | |
| max_r = 0 | |
| max_t = 0 | |
| for rl, data in roots.items(): | |
| if data['root_id']: | |
| rid = data['root_id'] | |
| if rid.startswith('R') and rid[1:].isdigit(): | |
| max_r = max(max_r, int(rid[1:])) | |
| elif rid.startswith('T') and rid[1:].isdigit(): | |
| max_t = max(max_t, int(rid[1:])) | |
| for rl, data in roots.items(): | |
| if not data['root_id']: | |
| if data['bitig_attested'] and data['quran_tokens'] == 0: | |
| max_t += 1 | |
| data['root_id'] = f"T{max_t:03d}" | |
| else: | |
| max_r += 1 | |
| data['root_id'] = f"R{max_r:03d}" | |
| # Determine root type | |
| for rl, data in roots.items(): | |
| bare = data['root_bare'] | |
| # Count actual letters (not hyphens, dashes) | |
| letters = [c for c in bare if c not in ('-', '‑', ' ', '\u200c')] | |
| n = len(letters) | |
| if n <= 2: | |
| data['root_type'] = 'BILITERAL' | |
| elif n == 3: | |
| data['root_type'] = 'TRILITERAL' | |
| elif n == 4: | |
| data['root_type'] = 'QUADRILITERAL' | |
| else: | |
| data['root_type'] = 'EXTENDED' | |
| # Insert into new DB | |
| inserted = 0 | |
| for rl, data in roots.items(): | |
| try: | |
| new.execute(""" | |
| INSERT INTO roots (root_id, root_letters, root_bare, root_type, | |
| quran_tokens, quran_lemmas, bitig_attested, primary_meaning) | |
| VALUES (?, ?, ?, ?, ?, ?, ?, ?) | |
| """, ( | |
| data['root_id'], data['root_letters'], data['root_bare'], | |
| data['root_type'], data['quran_tokens'], data['quran_lemmas'], | |
| data['bitig_attested'], None # no contaminated meanings | |
| )) | |
| inserted += 1 | |
| except sqlite3.IntegrityError as e: | |
| print(f" SKIP duplicate root_id {data['root_id']} for {rl}: {e}") | |
| new.commit() | |
| print(f" Inserted {inserted} roots. Max R={max_r}, Max T={max_t}") | |
| # Build lookup: root_letters -> root_id | |
| root_lookup = {rl: data['root_id'] for rl, data in roots.items()} | |
| root_id_lookup = {data['root_id']: rl for rl, data in roots.items()} | |
| return root_lookup, root_id_lookup | |
| # ============================================================================ | |
| # STEP 2: Migrate entries (unified table) | |
| # ============================================================================ | |
| def migrate_entries(old, new, root_lookup): | |
| """Merge all A1 tables into unified entries table.""" | |
| print("\n[STEP 2] Migrating entries...") | |
| # We need to track old->new ID mapping for FK references | |
| id_map = {} # (source_table, old_id) -> new_entry_id | |
| entry_id = 0 | |
| # Build set of valid root_ids in new DB | |
| valid_root_ids = set(r[0] for r in new.execute("SELECT root_id FROM roots").fetchall()) | |
| print(f" Valid root_ids in roots table: {len(valid_root_ids)}") | |
| # EN entries | |
| en_rows = old.execute("SELECT * FROM a1_entries ORDER BY entry_id").fetchall() | |
| print(f" EN: {len(en_rows)} entries") | |
| en_orphan_roots = 0 | |
| for r in en_rows: | |
| entry_id += 1 | |
| rid = r['root_id'] | |
| root_letters = r['root_letters'] if 'root_letters' in r.keys() else None | |
| # Resolve: prefer root_letters lookup, fallback to original if in roots, else NULL | |
| if root_letters and root_letters in root_lookup: | |
| rid = root_lookup[root_letters] | |
| elif rid and rid in valid_root_ids: | |
| pass | |
| elif rid: | |
| rid = None | |
| en_orphan_roots += 1 | |
| new.execute(""" | |
| INSERT INTO entries (entry_id, score, en_term, ar_word, root_id, root_letters, | |
| qur_meaning, pattern, allah_name_id, network_id, | |
| phonetic_chain, inversion_type, source_form, foundation_refs, | |
| ds_corridor, decay_level) | |
| VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) | |
| """, ( | |
| entry_id, r['score'], r['en_term'], r['ar_word'], | |
| rid, root_letters, r['qur_meaning'], r['pattern'], | |
| r['allah_name_id'], r['network_id'], r['phonetic_chain'], | |
| r['inversion_type'], r['source_form'], r['foundation_ref'], | |
| r['corridor'] if 'corridor' in r.keys() else None, | |
| r['decay_model'] if 'decay_model' in r.keys() else None | |
| )) | |
| id_map[('en', r['entry_id'])] = entry_id | |
| # RU entries | |
| ru_rows = old.execute("SELECT * FROM [a1_записи] ORDER BY запись_id").fetchall() | |
| print(f" RU: {len(ru_rows)} entries") | |
| ru_orphan_roots = 0 | |
| for r in ru_rows: | |
| entry_id += 1 | |
| rid = r['корень_id'] | |
| root_letters = r['корневые_буквы'] if 'корневые_буквы' in r.keys() else None | |
| # Resolve root_id: prefer lookup by root_letters, fallback to original if valid | |
| if root_letters and root_letters in root_lookup: | |
| rid = root_lookup[root_letters] | |
| elif rid and rid in valid_root_ids: | |
| pass # keep original | |
| elif rid and '+' in str(rid): | |
| rid = None # compound root_ids like R202+R423 — set NULL for now | |
| elif rid and rid not in valid_root_ids: | |
| rid = None # orphan — no matching root | |
| ru_orphan_roots += 1 | |
| new.execute(""" | |
| INSERT INTO entries (entry_id, score, ru_term, ar_word, root_id, root_letters, | |
| qur_meaning, pattern, allah_name_id, network_id, | |
| phonetic_chain, inversion_type, source_form, foundation_refs) | |
| VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) | |
| """, ( | |
| entry_id, r['балл'], r['рус_термин'], r['ар_слово'], | |
| rid, root_letters, r['коранич_значение'], r['паттерн'], | |
| r['имя_аллаха_id'], r['сеть_id'], r['фонетическая_цепь'], | |
| r['тип_инверсии'], r['исходная_форма'], r['основание'] | |
| )) | |
| id_map[('ru', r['запись_id'])] = entry_id | |
| if ru_orphan_roots: | |
| print(f" RU orphan roots (set NULL): {ru_orphan_roots}") | |
| # FA entries | |
| fa_rows = old.execute("SELECT * FROM persian_a1_mad_khil ORDER BY rowid").fetchall() | |
| print(f" FA: {len(fa_rows)} entries") | |
| for r in fa_rows: | |
| entry_id += 1 | |
| keys = r.keys() | |
| fa_term = r['v_zhe_f_rs__واژِهِ_فارسی_persian_term'] if 'v_zhe_f_rs__واژِهِ_فارسی_persian_term' in keys else None | |
| ar_word = r['kalame_a_l__کَلَمِه_اَصلی__عربی___بازنویسی___تَرجُمه__source_word__arabic___transliteration___translation'] if 'kalame_a_l__کَلَمِه_اَصلی__عربی___بازنویسی___تَرجُمه__source_word__arabic___transliteration___translation' in keys else None | |
| root_letters = r['hor_f_e_r_she_حُروفِ_ریشِه_root_letters'] if 'hor_f_e_r_she_حُروفِ_ریشِه_root_letters' in keys else None | |
| rid = r['r_she_id_ریشِه_root_id'] if 'r_she_id_ریشِه_root_id' in keys else None | |
| if root_letters and root_letters in root_lookup: | |
| rid = root_lookup[root_letters] | |
| elif rid and rid in valid_root_ids: | |
| pass | |
| elif rid: | |
| rid = None | |
| score = r['nomre_نُمره_score'] if 'nomre_نُمره_score' in keys else None | |
| new.execute(""" | |
| INSERT INTO entries (entry_id, score, fa_term, ar_word, root_id, root_letters, | |
| qur_meaning, pattern, allah_name_id, network_id, | |
| phonetic_chain, inversion_type, source_form, foundation_refs) | |
| VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) | |
| """, ( | |
| entry_id, score, fa_term, ar_word, rid, root_letters, | |
| r['ma_n__ye_qur__n__مَعنایِ_قُرآنی__عربی___بازنویسی___تَرجُمه__qur_meaning__arabic___transliteration___translation'] if 'ma_n__ye_qur__n__مَعنایِ_قُرآنی__عربی___بازنویسی___تَرجُمه__qur_meaning__arabic___transliteration___translation' in keys else None, | |
| r['olg__اُلگو_pattern'] if 'olg__اُلگو_pattern' in keys else None, | |
| r['esm_e_all_h_id_اِسمِ_الله_allah_name_id'] if 'esm_e_all_h_id_اِسمِ_الله_allah_name_id' in keys else None, | |
| r['shabake_id_شَبَکِه_network_id'] if 'shabake_id_شَبَکِه_network_id' in keys else None, | |
| r['zanj_re__awt__زَنجیرِهِ_صَوتی_phonetic_chain'] if 'zanj_re__awt__زَنجیرِهِ_صَوتی_phonetic_chain' in keys else None, | |
| r['now__e_v_zhg_n__نَوعِ_واژگونی_inversion_type'] if 'now__e_v_zhg_n__نَوعِ_واژگونی_inversion_type' in keys else None, | |
| r['shakl_e_a_l__شَکلِ_اَصلی_source_form'] if 'shakl_e_a_l__شَکلِ_اَصلی_source_form' in keys else None, | |
| r['boniy_n_بُنیان_foundation_ref'] if 'boniy_n_بُنیان_foundation_ref' in keys else None, | |
| )) | |
| old_id = r['madkhal_id_مَدخَل_entry_id'] if 'madkhal_id_مَدخَل_entry_id' in keys else None | |
| id_map[('fa', old_id)] = entry_id | |
| new.commit() | |
| print(f" Total unified entries: {entry_id}") | |
| print(f" ID mappings: {len(id_map)}") | |
| return id_map | |
| # ============================================================================ | |
| # STEP 3: Migrate mechanism tables | |
| # ============================================================================ | |
| def migrate_mechanism(old, new): | |
| """Migrate M1-M5 tables.""" | |
| print("\n[STEP 3] Migrating mechanism tables...") | |
| # M1: phonetic_shifts | |
| rows = old.execute("SELECT * FROM m1_phonetic_shifts").fetchall() | |
| for r in rows: | |
| keys = r.keys() | |
| new.execute(""" | |
| INSERT INTO phonetic_shifts (shift_id, ar_letter, ar_name, en_outputs, ru_outputs, | |
| description, examples, entry_ids, foundation_ref) | |
| VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) | |
| """, (r['shift_id'], r['ar_letter'], r['ar_name'], r['en_outputs'], | |
| r['ru_outputs'] if 'ru_outputs' in keys else None, | |
| r['direction'] if 'direction' in keys else None, | |
| r['examples'] if 'examples' in keys else None, | |
| r['entry_ids'] if 'entry_ids' in keys else None, | |
| r['foundation_ref'] if 'foundation_ref' in keys else None)) | |
| print(f" M1 shifts: {len(rows)}") | |
| # M2: detection_patterns | |
| rows = old.execute("SELECT * FROM m2_detection_patterns").fetchall() | |
| for r in rows: | |
| new.execute(""" | |
| INSERT INTO detection_patterns (pattern_id, name, level, description, triggers, qur_ref, example) | |
| VALUES (?, ?, ?, ?, ?, ?, ?) | |
| """, (r['pattern_id'], r['name'], r['level'], r['description'], | |
| r['triggers'] if 'triggers' in r.keys() else None, | |
| r['qur_ref'] if 'qur_ref' in r.keys() else None, | |
| r['example'] if 'example' in r.keys() else None)) | |
| print(f" M2 detection: {len(rows)}") | |
| # M3: scholars | |
| rows = old.execute("SELECT * FROM m3_scholars").fetchall() | |
| for r in rows: | |
| keys = r.keys() | |
| bp = r['birthplace'] if 'birthplace' in keys else (r['birth_place'] if 'birth_place' in keys else None) | |
| new.execute(""" | |
| INSERT INTO scholars (scholar_id, verified_name, birth_place, identity, role, | |
| achievement, lies_applied, death_fate, status) | |
| VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) | |
| """, (r['scholar_id'], r['verified_name'], bp, | |
| r['identity'], r['role'] if 'role' in keys else None, | |
| r['achievement'] if 'achievement' in keys else None, | |
| r['lies_applied'] if 'lies_applied' in keys else None, | |
| r['death_fate'] if 'death_fate' in keys else None, | |
| r['status'] if 'status' in keys else 'VERIFIED')) | |
| print(f" M3 scholars: {len(rows)}") | |
| # M4: networks | |
| rows = old.execute("SELECT * FROM m4_networks").fetchall() | |
| for r in rows: | |
| keys = r.keys() | |
| new.execute(""" | |
| INSERT INTO networks (network_id, name, title, link_verse, description, mechanism, status) | |
| VALUES (?, ?, ?, ?, ?, ?, ?) | |
| """, (r['network_id'], r['name'], r['title'] if 'title' in keys else None, | |
| r['link_verse'] if 'link_verse' in keys else None, | |
| r['description'], r['mechanism'] if 'mechanism' in keys else None, | |
| r['status'] if 'status' in keys else 'CONFIRMED')) | |
| print(f" M4 networks: {len(rows)}") | |
| new.commit() | |
| # ============================================================================ | |
| # STEP 4: Migrate application tables | |
| # ============================================================================ | |
| def migrate_application(old, new, root_lookup): | |
| """Migrate A2-A6 tables.""" | |
| print("\n[STEP 4] Migrating application tables...") | |
| # Get valid root_ids | |
| valid_root_ids = set(r[0] for r in new.execute("SELECT root_id FROM roots").fetchall()) | |
| # A2: names_of_allah | |
| rows = old.execute("SELECT * FROM a2_names_of_allah").fetchall() | |
| for r in rows: | |
| keys = r.keys() | |
| rid = r['root_id'] if 'root_id' in keys else None | |
| if rid and rid not in valid_root_ids: | |
| rid = None # orphan root_id | |
| new.execute(""" | |
| INSERT INTO names_of_allah (allah_id, arabic_name, transliteration, meaning, | |
| qur_ref, entry_ids, root_id) | |
| VALUES (?, ?, ?, ?, ?, ?, ?) | |
| """, (r['allah_id'], r['arabic_name'], r['transliteration'], | |
| r['meaning'], r['qur_ref'] if 'qur_ref' in keys else None, | |
| r['entry_ids'] if 'entry_ids' in keys else None, rid)) | |
| print(f" A2 names: {len(rows)}") | |
| # A3: quran_refs | |
| rows = old.execute("SELECT * FROM a3_quran_refs").fetchall() | |
| for r in rows: | |
| keys = r.keys() | |
| try: | |
| new.execute(""" | |
| INSERT INTO quran_refs (ref_id, surah, ayah, arabic_text, transliteration, | |
| translation, relevance, entry_ids, layer_ref) | |
| VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) | |
| """, (r['ref_id'], r['surah'], r['ayah'], r['arabic_text'], | |
| r['transliteration'] if 'transliteration' in keys else None, | |
| r['translation'] if 'translation' in keys else None, | |
| r['relevance'] if 'relevance' in keys else None, | |
| r['entry_ids'] if 'entry_ids' in keys else None, | |
| r['layer_ref'] if 'layer_ref' in keys else None)) | |
| except sqlite3.IntegrityError: | |
| pass # skip duplicates | |
| print(f" A3 quran_refs: {len(rows)}") | |
| # A6: country_names | |
| rows = old.execute("SELECT * FROM a6_country_names").fetchall() | |
| for r in rows: | |
| keys = r.keys() | |
| rid = r['root_id'] if 'root_id' in keys else None | |
| if rid and rid not in valid_root_ids: | |
| rid = None | |
| new.execute(""" | |
| INSERT INTO country_names (country_id, country_name, root_id, al_word, | |
| qur_meaning, phonetic_chain, naming_basis, notes) | |
| VALUES (?, ?, ?, ?, ?, ?, ?, ?) | |
| """, (r['country_id'] if 'country_id' in keys else None, | |
| r['country_name'] if 'country_name' in keys else None, | |
| rid, | |
| r['al_word'] if 'al_word' in keys else None, | |
| r['qur_meaning'] if 'qur_meaning' in keys else None, | |
| r['phonetic_chain'] if 'phonetic_chain' in keys else None, | |
| r['naming_basis'] if 'naming_basis' in keys else None, | |
| r['notes'] if 'notes' in keys else None)) | |
| print(f" A6 country: {len(rows)}") | |
| new.commit() | |
| # ============================================================================ | |
| # STEP 5: Migrate child_schema → child_entries | |
| # ============================================================================ | |
| def migrate_child(old, new): | |
| """Migrate child_schema to child_entries.""" | |
| print("\n[STEP 5] Migrating child schema...") | |
| # Temporarily disable FK for child migration (data has evolved beyond original reference tables) | |
| new.execute("PRAGMA foreign_keys = OFF") | |
| # Get valid reference values | |
| valid_nt = set(r[0] for r in new.execute("SELECT nt_code FROM nt_codes").fetchall()) | |
| valid_op = set(r[0] for r in new.execute("SELECT op_code FROM operation_codes").fetchall()) | |
| rows = old.execute("SELECT * FROM child_schema").fetchall() | |
| for r in rows: | |
| keys = r.keys() | |
| nt = r['nt_code'] if 'nt_code' in keys else None | |
| parent = r['parent_op'] if 'parent_op' in keys else None | |
| if nt and nt not in valid_nt: | |
| nt = None # orphan nt_code | |
| if parent and parent not in valid_op: | |
| parent = None # orphan parent_op | |
| new.execute(""" | |
| INSERT INTO child_entries (child_id, shell_name, shell_language, orig_class, | |
| orig_root, orig_lemma, orig_meaning, operation_role, | |
| shell_meaning, inversion_direction, phonetic_chain, | |
| qur_anchors, dp_codes, nt_code, pattern, parent_op, | |
| gate_status, notes) | |
| VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) | |
| """, ( | |
| r['entry_id'] if 'entry_id' in keys else (r['child_id'] if 'child_id' in keys else None), | |
| r['shell_name'] if 'shell_name' in keys else None, | |
| r['shell_language'] if 'shell_language' in keys else None, | |
| r['orig_class'] if 'orig_class' in keys else None, | |
| r['orig_root'] if 'orig_root' in keys else None, | |
| r['orig_lemma'] if 'orig_lemma' in keys else None, | |
| r['orig_meaning'] if 'orig_meaning' in keys else None, | |
| r['operation_role'] if 'operation_role' in keys else None, | |
| r['shell_meaning'] if 'shell_meaning' in keys else None, | |
| r['inversion_direction'] if 'inversion_direction' in keys else None, | |
| r['phonetic_chain'] if 'phonetic_chain' in keys else None, | |
| r['qur_anchors'] if 'qur_anchors' in keys else None, | |
| r['dp_codes'] if 'dp_codes' in keys else None, | |
| nt, | |
| r['pattern'] if 'pattern' in keys else None, | |
| parent, | |
| r['gate_status'] if 'gate_status' in keys else None, | |
| r['notes'] if 'notes' in keys else None | |
| )) | |
| print(f" Child entries: {len(rows)}") | |
| new.commit() | |
| # Re-enable FK | |
| new.execute("PRAGMA foreign_keys = ON") | |
| # ============================================================================ | |
| # STEP 6: Copy tables that stay as-is (Qur'anic, Body, Bitig, Protocol, etc.) | |
| # ============================================================================ | |
| def copy_table_raw(old, new, table_name, new_table_name=None): | |
| """Copy a table from old to new DB as-is (no FK, just data preservation).""" | |
| target = new_table_name or table_name | |
| try: | |
| cols = old.execute(f"PRAGMA table_info([{table_name}])").fetchall() | |
| if not cols: | |
| return 0 | |
| col_names = [c['name'] for c in cols] | |
| col_list = ', '.join([f'[{c}]' for c in col_names]) | |
| # Create table in new DB if it doesn't exist | |
| create_sql = old.execute( | |
| f"SELECT sql FROM sqlite_master WHERE type='table' AND name=?", (table_name,) | |
| ).fetchone() | |
| if create_sql: | |
| sql = create_sql[0] | |
| if new_table_name: | |
| sql = sql.replace(f'CREATE TABLE [{table_name}]', f'CREATE TABLE [{target}]', 1) | |
| sql = sql.replace(f'CREATE TABLE {table_name}', f'CREATE TABLE [{target}]', 1) | |
| try: | |
| new.execute(sql) | |
| except sqlite3.OperationalError: | |
| pass # table already exists | |
| rows = old.execute(f"SELECT {col_list} FROM [{table_name}]").fetchall() | |
| placeholders = ', '.join(['?' for _ in col_names]) | |
| for r in rows: | |
| try: | |
| new.execute(f"INSERT INTO [{target}] ({col_list}) VALUES ({placeholders})", | |
| [r[c] for c in col_names]) | |
| except (sqlite3.IntegrityError, sqlite3.OperationalError): | |
| pass | |
| new.commit() | |
| return len(rows) | |
| except Exception as e: | |
| print(f" ERROR copying {table_name}: {e}") | |
| return 0 | |
| def migrate_remaining(old, new): | |
| """Copy all remaining tables that don't map to the schema.""" | |
| print("\n[STEP 6] Copying remaining tables...") | |
| # Tables already migrated | |
| migrated = { | |
| 'a1_entries', 'a1_записи', 'persian_a1_mad_khil', | |
| 'bitig_a1_entries', 'latin_a1_entries', 'european_a1_entries', | |
| 'a2_names_of_allah', 'a3_quran_refs', 'a6_country_names', | |
| 'child_schema', 'm1_phonetic_shifts', 'm2_detection_patterns', | |
| 'm3_scholars', 'm4_networks', 'root_translations', | |
| # Schema tables (already created by create_uslap_db.sql) | |
| 'languages', 'decay_levels', 'script_corridors', | |
| 'roots', 'entries', 'derivatives', 'cross_refs', 'quran_refs', | |
| 'country_names', 'names_of_allah', 'phonetic_shifts', | |
| 'detection_patterns', 'networks', 'scholars', 'qur_verification', | |
| 'nt_codes', 'operation_codes', 'dp_codes', 'op_codes', | |
| 'child_entries', 'child_entry_links', 'operators', | |
| 'host_civilizations', 'operation_cycles', 'events', | |
| 'intel_reports', 'operator_aliases', 'word_fingerprints', | |
| 'cluster_cache', 'phonetic_mappings', 'engine_queue', | |
| 'change_log', 'sync_status', 'session_index', | |
| # FTS and virtual tables | |
| 'entries_fts', 'term_search', 'term_search_config', | |
| 'term_search_data', 'term_search_docsize', 'term_search_idx', | |
| # SQLite internal | |
| 'sqlite_sequence', | |
| } | |
| all_tables = old.execute( | |
| "SELECT name FROM sqlite_master WHERE type='table' ORDER BY name" | |
| ).fetchall() | |
| copied = 0 | |
| for t in all_tables: | |
| name = t['name'] | |
| if name in migrated or name.startswith('sqlite_'): | |
| continue | |
| count = copy_table_raw(old, new, name) | |
| if count > 0: | |
| print(f" {name}: {count} rows") | |
| copied += 1 | |
| print(f" Copied {copied} additional tables") | |
| # ============================================================================ | |
| # STEP 7: Copy contamination triggers | |
| # ============================================================================ | |
| def migrate_triggers(old, new): | |
| """Copy contamination triggers from old DB.""" | |
| print("\n[STEP 7] Migrating contamination triggers...") | |
| triggers = old.execute( | |
| "SELECT name, sql FROM sqlite_master WHERE type='trigger' AND sql IS NOT NULL" | |
| ).fetchall() | |
| copied = 0 | |
| for t in triggers: | |
| try: | |
| new.execute(t['sql']) | |
| copied += 1 | |
| except (sqlite3.OperationalError, sqlite3.IntegrityError) as e: | |
| # Skip triggers that reference tables not yet in new DB | |
| pass | |
| new.commit() | |
| print(f" Copied {copied}/{len(triggers)} triggers") | |
| # ============================================================================ | |
| # STEP 8: Verify | |
| # ============================================================================ | |
| def verify(new): | |
| """Verify migration.""" | |
| print("\n[STEP 8] Verification...") | |
| cur = new.cursor() | |
| # Check FK enforcement | |
| fk = cur.execute("PRAGMA foreign_keys").fetchone() | |
| print(f" PRAGMA foreign_keys = {fk[0]}") | |
| # Check roots | |
| root_count = cur.execute("SELECT COUNT(*) FROM roots").fetchone()[0] | |
| print(f" roots: {root_count}") | |
| # Check entries | |
| entry_count = cur.execute("SELECT COUNT(*) FROM entries").fetchone()[0] | |
| en_count = cur.execute("SELECT COUNT(*) FROM entries WHERE en_term IS NOT NULL").fetchone()[0] | |
| ru_count = cur.execute("SELECT COUNT(*) FROM entries WHERE ru_term IS NOT NULL").fetchone()[0] | |
| fa_count = cur.execute("SELECT COUNT(*) FROM entries WHERE fa_term IS NOT NULL").fetchone()[0] | |
| print(f" entries: {entry_count} (EN:{en_count}, RU:{ru_count}, FA:{fa_count})") | |
| # Check FK integrity | |
| violations = cur.execute("PRAGMA foreign_key_check").fetchall() | |
| print(f" FK violations: {len(violations)}") | |
| if violations[:5]: | |
| for v in violations[:5]: | |
| print(f" {v}") | |
| # Check table count | |
| tables = cur.execute("SELECT COUNT(*) FROM sqlite_master WHERE type='table'").fetchone()[0] | |
| triggers = cur.execute("SELECT COUNT(*) FROM sqlite_master WHERE type='trigger'").fetchone()[0] | |
| print(f" Total tables: {tables}") | |
| print(f" Total triggers: {triggers}") | |
| # ============================================================================ | |
| # MAIN | |
| # ============================================================================ | |
| def check_mode(): | |
| """Dry run — show what will happen.""" | |
| print("=== DRY RUN ===") | |
| old = connect_old() | |
| tables = old.execute("SELECT COUNT(*) FROM sqlite_master WHERE type='table'").fetchone()[0] | |
| print(f"Current DB: {tables} tables") | |
| en = old.execute("SELECT COUNT(*) FROM a1_entries").fetchone()[0] | |
| ru = old.execute("SELECT COUNT(*) FROM [a1_записи]").fetchone()[0] | |
| fa = old.execute("SELECT COUNT(*) FROM persian_a1_mad_khil").fetchone()[0] | |
| bi = old.execute("SELECT COUNT(*) FROM bitig_a1_entries").fetchone()[0] | |
| la = old.execute("SELECT COUNT(*) FROM latin_a1_entries").fetchone()[0] | |
| eu = old.execute("SELECT COUNT(*) FROM european_a1_entries").fetchone()[0] | |
| print(f"Entries: EN={en} RU={ru} FA={fa} BITIG={bi} LATIN={la} EU={eu}") | |
| print(f"Total to merge into unified entries: {en + ru + fa}") | |
| print(f"Bitig/Latin/EU stay as separate tables (different column structure)") | |
| print(f"\nWill create: {NEW_DB}") | |
| print(f"Using schema: {SCHEMA_SQL}") | |
| print(f"Original DB: UNTOUCHED") | |
| old.close() | |
| def migrate_mode(): | |
| """Execute full migration.""" | |
| print("=== MIGRATION START ===") | |
| print(f"Source: {OLD_DB}") | |
| print(f"Target: {NEW_DB}") | |
| print(f"Schema: {SCHEMA_SQL}") | |
| # Remove existing target | |
| if os.path.exists(NEW_DB): | |
| os.remove(NEW_DB) | |
| # Create new DB with schema | |
| print("\n[STEP 0] Creating new DB from schema...") | |
| new_conn = sqlite3.connect(NEW_DB) | |
| # Read and execute schema SQL (skip lines that need UDFs) | |
| with open(SCHEMA_SQL, 'r') as f: | |
| schema = f.read() | |
| # Remove extract_consonants triggers (need Python UDF) | |
| # We'll add them back later via db_access_layer | |
| lines = schema.split('\n') | |
| clean_lines = [] | |
| skip_until_end = False | |
| for line in lines: | |
| if 'extract_consonants' in line: | |
| skip_until_end = True | |
| if skip_until_end: | |
| if line.strip() == 'END;': | |
| skip_until_end = False | |
| continue | |
| clean_lines.append(line) | |
| clean_schema = '\n'.join(clean_lines) | |
| # Remove CHECK constraints on pattern (data has evolved beyond original enums) | |
| import re | |
| clean_schema = re.sub(r"CHECK\s*\(\s*pattern\s+IN\s*\([^)]+\)\s*\)", "", clean_schema) | |
| clean_schema = re.sub(r"CHECK\s*\(\s*status\s+IN\s*\([^)]+\)\s*\)", "", clean_schema) | |
| clean_schema = re.sub(r"CHECK\s*\(\s*change_type\s+IN\s*\([^)]+\)\s*\)", "", clean_schema) | |
| clean_schema = re.sub(r"CHECK\s*\(\s*sync_direction\s+IN\s*\([^)]+\)\s*\)", "", clean_schema) | |
| # Keep score CHECK and confidence CHECK — those are valid | |
| try: | |
| new_conn.executescript(clean_schema) | |
| print(" Schema created successfully") | |
| except Exception as e: | |
| print(f" Schema error: {e}") | |
| # Try statement by statement | |
| for stmt in clean_schema.split(';'): | |
| stmt = stmt.strip() | |
| if stmt and not stmt.startswith('--'): | |
| try: | |
| new_conn.execute(stmt) | |
| except Exception as e2: | |
| if 'already exists' not in str(e2): | |
| print(f" SKIP: {stmt[:60]}... ({e2})") | |
| new_conn.commit() | |
| new_conn.close() | |
| # Now open both DBs | |
| old = connect_old() | |
| new = connect_new() | |
| # Execute migration steps | |
| root_lookup, root_id_lookup = build_roots(old, new) | |
| id_map = migrate_entries(old, new, root_lookup) | |
| migrate_mechanism(old, new) | |
| migrate_application(old, new, root_lookup) | |
| migrate_child(old, new) | |
| migrate_remaining(old, new) | |
| migrate_triggers(old, new) | |
| verify(new) | |
| old.close() | |
| new.close() | |
| # File size comparison | |
| old_size = os.path.getsize(OLD_DB) | |
| new_size = os.path.getsize(NEW_DB) | |
| print(f"\n=== MIGRATION COMPLETE ===") | |
| print(f"Old DB: {old_size / 1024 / 1024:.1f} MB") | |
| print(f"New DB: {new_size / 1024 / 1024:.1f} MB") | |
| print(f"\nNew DB at: {NEW_DB}") | |
| print(f"Original UNTOUCHED at: {OLD_DB}") | |
| print(f"\nTo activate: rename v4 to v3 (after verification)") | |
| if __name__ == '__main__': | |
| if len(sys.argv) < 2: | |
| print("Usage: python3 migrate_to_schema.py check|migrate") | |
| sys.exit(1) | |
| cmd = sys.argv[1] | |
| if cmd == 'check': | |
| check_mode() | |
| elif cmd == 'migrate': | |
| migrate_mode() | |
| else: | |
| print(f"Unknown command: {cmd}") | |
| sys.exit(1) | |