uslap-query / Code_files /archive /migrate_to_schema.py
uslap's picture
Upload folder using huggingface_hub
7cc8e29 verified
Raw
History Blame Contribute Delete
36.7 kB
#!/usr/bin/env python3
"""
USLaP Schema Migration — Restore FK-enforced architecture
بِسْمِ اللَّهِ الرَّحْمَٰنِ الرَّحِيمِ
Migrates current 193-table DB → correct schema from create_uslap_db.sql
- Builds `roots` table from root_translations + all A1 root_ids
- Merges 6 A1 tables into unified `entries` with FK to roots
- Migrates all other tables with FK links
- Preserves ALL existing data
- Copies contamination triggers
- Restores PRAGMA foreign_keys = ON
Usage:
python3 Code_files/migrate_to_schema.py check # dry run — show what will happen
python3 Code_files/migrate_to_schema.py migrate # execute migration
"""
import sqlite3
import sys
import os
import json
from datetime import datetime
OLD_DB = "/Users/mmsetubal/Documents/USLaP workplace/Code_files/uslap_database_v3.db"
NEW_DB = "/Users/mmsetubal/Documents/USLaP workplace/Code_files/uslap_database_v4_migrated.db"
SCHEMA_SQL = "/Users/mmsetubal/Documents/USLaP workplace/Code_files/create_uslap_db.sql"
def connect_old():
conn = sqlite3.connect(OLD_DB)
conn.row_factory = sqlite3.Row
return conn
def connect_new():
conn = sqlite3.connect(NEW_DB)
conn.execute("PRAGMA foreign_keys = ON")
conn.execute("PRAGMA journal_mode = WAL")
return conn
# ============================================================================
# STEP 1: Build roots table from all sources
# ============================================================================
def build_roots(old, new):
"""Collect all unique roots from all tables, merge with root_translations."""
print("\n[STEP 1] Building roots table...")
roots = {} # root_id -> {root_letters, root_bare, quran_tokens, quran_lemmas, bitig_attested}
# Source 1: root_translations (has Qur'anic data)
rows = old.execute("""
SELECT root_hyphenated, root_unhyphenated, token_count, form_count, first_ayah
FROM root_translations
""").fetchall()
for r in rows:
# We need to find a root_id for this root
# Check if any entry references it
root_h = r['root_hyphenated']
root_bare = r['root_unhyphenated']
roots[root_h] = {
'root_letters': root_h,
'root_bare': root_bare,
'quran_tokens': r['token_count'] or 0,
'quran_lemmas': r['form_count'] or 0,
'root_id': None, # will be assigned
'bitig_attested': False,
'first_ayah': r['first_ayah']
}
# Source 2: EN entries (has root_id mapping)
en_roots = old.execute("""
SELECT DISTINCT root_id, root_letters
FROM a1_entries
WHERE root_id IS NOT NULL AND root_id != ''
""").fetchall()
for r in en_roots:
rl = r['root_letters']
rid = r['root_id']
if rl and rl in roots:
roots[rl]['root_id'] = rid
elif rl:
roots[rl] = {
'root_letters': rl,
'root_bare': rl.replace('-', '').replace('‑', ''),
'quran_tokens': 0,
'quran_lemmas': 0,
'root_id': rid,
'bitig_attested': False,
'first_ayah': None
}
# Source 3: RU entries
ru_roots = old.execute("""
SELECT DISTINCT корень_id, корневые_буквы
FROM [a1_записи]
WHERE корень_id IS NOT NULL AND корень_id != ''
""").fetchall()
for r in ru_roots:
rl = r['корневые_буквы']
rid = r['корень_id']
if rl and rl in roots and not roots[rl]['root_id']:
roots[rl]['root_id'] = rid
elif rl and rl not in roots:
roots[rl] = {
'root_letters': rl,
'root_bare': rl.replace('-', '').replace('‑', ''),
'quran_tokens': 0,
'quran_lemmas': 0,
'root_id': rid,
'bitig_attested': False,
'first_ayah': None
}
# Source 4: FA entries
fa_roots = old.execute("""
SELECT DISTINCT [r_she_id_ریشِه_root_id] as root_id, [hor_f_e_r_she_حُروفِ_ریشِه_root_letters] as root_letters
FROM persian_a1_mad_khil
WHERE [r_she_id_ریشِه_root_id] IS NOT NULL AND [r_she_id_ریشِه_root_id] != ''
""").fetchall()
for r in fa_roots:
rl = r['root_letters']
rid = r['root_id']
if rl and rl in roots and not roots[rl]['root_id']:
roots[rl]['root_id'] = rid
elif rl and rl not in roots:
roots[rl] = {
'root_letters': rl,
'root_bare': rl.replace('-', '').replace('‑', ''),
'quran_tokens': 0,
'quran_lemmas': 0,
'root_id': rid,
'bitig_attested': False,
'first_ayah': None
}
# Source 5: Latin entries
lat_roots = old.execute("""
SELECT DISTINCT root_id, root_letters
FROM latin_a1_entries
WHERE root_id IS NOT NULL AND root_id != ''
""").fetchall()
for r in lat_roots:
rl = r['root_letters']
rid = r['root_id']
if rl and rl in roots and not roots[rl]['root_id']:
roots[rl]['root_id'] = rid
elif rl and rl not in roots:
roots[rl] = {
'root_letters': rl,
'root_bare': rl.replace('-', '').replace('‑', ''),
'quran_tokens': 0,
'quran_lemmas': 0,
'root_id': rid,
'bitig_attested': False,
'first_ayah': None
}
# Source 6: EU entries
eu_roots = old.execute("""
SELECT DISTINCT root_id, root_letters
FROM european_a1_entries
WHERE root_id IS NOT NULL AND root_id != ''
""").fetchall()
for r in eu_roots:
rl = r['root_letters']
rid = r['root_id']
if rl and rl in roots and not roots[rl]['root_id']:
roots[rl]['root_id'] = rid
elif rl and rl not in roots:
roots[rl] = {
'root_letters': rl,
'root_bare': rl.replace('-', '').replace('‑', ''),
'quran_tokens': 0,
'quran_lemmas': 0,
'root_id': rid,
'bitig_attested': False,
'first_ayah': None
}
# Source 7: Bitig attestation
bitig_roots = old.execute("""
SELECT DISTINCT root_letters
FROM bitig_a1_entries
WHERE root_letters IS NOT NULL AND root_letters != ''
""").fetchall()
for r in bitig_roots:
rl = r['root_letters']
if rl and rl in roots:
roots[rl]['bitig_attested'] = True
# Source 8: Names of Allah
allah_roots = old.execute("""
SELECT allah_id, root_id
FROM a2_names_of_allah
WHERE root_id IS NOT NULL AND root_id != ''
""").fetchall()
for r in allah_roots:
rid = r['root_id']
# Find matching root_letters
for rl, data in roots.items():
if data['root_id'] == rid:
break
# Assign root_ids to roots that don't have one
# Use T### for Bitig-only, R### for AA
max_r = 0
max_t = 0
for rl, data in roots.items():
if data['root_id']:
rid = data['root_id']
if rid.startswith('R') and rid[1:].isdigit():
max_r = max(max_r, int(rid[1:]))
elif rid.startswith('T') and rid[1:].isdigit():
max_t = max(max_t, int(rid[1:]))
for rl, data in roots.items():
if not data['root_id']:
if data['bitig_attested'] and data['quran_tokens'] == 0:
max_t += 1
data['root_id'] = f"T{max_t:03d}"
else:
max_r += 1
data['root_id'] = f"R{max_r:03d}"
# Determine root type
for rl, data in roots.items():
bare = data['root_bare']
# Count actual letters (not hyphens, dashes)
letters = [c for c in bare if c not in ('-', '‑', ' ', '\u200c')]
n = len(letters)
if n <= 2:
data['root_type'] = 'BILITERAL'
elif n == 3:
data['root_type'] = 'TRILITERAL'
elif n == 4:
data['root_type'] = 'QUADRILITERAL'
else:
data['root_type'] = 'EXTENDED'
# Insert into new DB
inserted = 0
for rl, data in roots.items():
try:
new.execute("""
INSERT INTO roots (root_id, root_letters, root_bare, root_type,
quran_tokens, quran_lemmas, bitig_attested, primary_meaning)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""", (
data['root_id'], data['root_letters'], data['root_bare'],
data['root_type'], data['quran_tokens'], data['quran_lemmas'],
data['bitig_attested'], None # no contaminated meanings
))
inserted += 1
except sqlite3.IntegrityError as e:
print(f" SKIP duplicate root_id {data['root_id']} for {rl}: {e}")
new.commit()
print(f" Inserted {inserted} roots. Max R={max_r}, Max T={max_t}")
# Build lookup: root_letters -> root_id
root_lookup = {rl: data['root_id'] for rl, data in roots.items()}
root_id_lookup = {data['root_id']: rl for rl, data in roots.items()}
return root_lookup, root_id_lookup
# ============================================================================
# STEP 2: Migrate entries (unified table)
# ============================================================================
def migrate_entries(old, new, root_lookup):
"""Merge all A1 tables into unified entries table."""
print("\n[STEP 2] Migrating entries...")
# We need to track old->new ID mapping for FK references
id_map = {} # (source_table, old_id) -> new_entry_id
entry_id = 0
# Build set of valid root_ids in new DB
valid_root_ids = set(r[0] for r in new.execute("SELECT root_id FROM roots").fetchall())
print(f" Valid root_ids in roots table: {len(valid_root_ids)}")
# EN entries
en_rows = old.execute("SELECT * FROM a1_entries ORDER BY entry_id").fetchall()
print(f" EN: {len(en_rows)} entries")
en_orphan_roots = 0
for r in en_rows:
entry_id += 1
rid = r['root_id']
root_letters = r['root_letters'] if 'root_letters' in r.keys() else None
# Resolve: prefer root_letters lookup, fallback to original if in roots, else NULL
if root_letters and root_letters in root_lookup:
rid = root_lookup[root_letters]
elif rid and rid in valid_root_ids:
pass
elif rid:
rid = None
en_orphan_roots += 1
new.execute("""
INSERT INTO entries (entry_id, score, en_term, ar_word, root_id, root_letters,
qur_meaning, pattern, allah_name_id, network_id,
phonetic_chain, inversion_type, source_form, foundation_refs,
ds_corridor, decay_level)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
entry_id, r['score'], r['en_term'], r['ar_word'],
rid, root_letters, r['qur_meaning'], r['pattern'],
r['allah_name_id'], r['network_id'], r['phonetic_chain'],
r['inversion_type'], r['source_form'], r['foundation_ref'],
r['corridor'] if 'corridor' in r.keys() else None,
r['decay_model'] if 'decay_model' in r.keys() else None
))
id_map[('en', r['entry_id'])] = entry_id
# RU entries
ru_rows = old.execute("SELECT * FROM [a1_записи] ORDER BY запись_id").fetchall()
print(f" RU: {len(ru_rows)} entries")
ru_orphan_roots = 0
for r in ru_rows:
entry_id += 1
rid = r['корень_id']
root_letters = r['корневые_буквы'] if 'корневые_буквы' in r.keys() else None
# Resolve root_id: prefer lookup by root_letters, fallback to original if valid
if root_letters and root_letters in root_lookup:
rid = root_lookup[root_letters]
elif rid and rid in valid_root_ids:
pass # keep original
elif rid and '+' in str(rid):
rid = None # compound root_ids like R202+R423 — set NULL for now
elif rid and rid not in valid_root_ids:
rid = None # orphan — no matching root
ru_orphan_roots += 1
new.execute("""
INSERT INTO entries (entry_id, score, ru_term, ar_word, root_id, root_letters,
qur_meaning, pattern, allah_name_id, network_id,
phonetic_chain, inversion_type, source_form, foundation_refs)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
entry_id, r['балл'], r['рус_термин'], r['ар_слово'],
rid, root_letters, r['коранич_значение'], r['паттерн'],
r['имя_аллаха_id'], r['сеть_id'], r['фонетическая_цепь'],
r['тип_инверсии'], r['исходная_форма'], r['основание']
))
id_map[('ru', r['запись_id'])] = entry_id
if ru_orphan_roots:
print(f" RU orphan roots (set NULL): {ru_orphan_roots}")
# FA entries
fa_rows = old.execute("SELECT * FROM persian_a1_mad_khil ORDER BY rowid").fetchall()
print(f" FA: {len(fa_rows)} entries")
for r in fa_rows:
entry_id += 1
keys = r.keys()
fa_term = r['v_zhe_f_rs__واژِهِ_فارسی_persian_term'] if 'v_zhe_f_rs__واژِهِ_فارسی_persian_term' in keys else None
ar_word = r['kalame_a_l__کَلَمِه_اَصلی__عربی___بازنویسی___تَرجُمه__source_word__arabic___transliteration___translation'] if 'kalame_a_l__کَلَمِه_اَصلی__عربی___بازنویسی___تَرجُمه__source_word__arabic___transliteration___translation' in keys else None
root_letters = r['hor_f_e_r_she_حُروفِ_ریشِه_root_letters'] if 'hor_f_e_r_she_حُروفِ_ریشِه_root_letters' in keys else None
rid = r['r_she_id_ریشِه_root_id'] if 'r_she_id_ریشِه_root_id' in keys else None
if root_letters and root_letters in root_lookup:
rid = root_lookup[root_letters]
elif rid and rid in valid_root_ids:
pass
elif rid:
rid = None
score = r['nomre_نُمره_score'] if 'nomre_نُمره_score' in keys else None
new.execute("""
INSERT INTO entries (entry_id, score, fa_term, ar_word, root_id, root_letters,
qur_meaning, pattern, allah_name_id, network_id,
phonetic_chain, inversion_type, source_form, foundation_refs)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
entry_id, score, fa_term, ar_word, rid, root_letters,
r['ma_n__ye_qur__n__مَعنایِ_قُرآنی__عربی___بازنویسی___تَرجُمه__qur_meaning__arabic___transliteration___translation'] if 'ma_n__ye_qur__n__مَعنایِ_قُرآنی__عربی___بازنویسی___تَرجُمه__qur_meaning__arabic___transliteration___translation' in keys else None,
r['olg__اُلگو_pattern'] if 'olg__اُلگو_pattern' in keys else None,
r['esm_e_all_h_id_اِسمِ_الله_allah_name_id'] if 'esm_e_all_h_id_اِسمِ_الله_allah_name_id' in keys else None,
r['shabake_id_شَبَکِه_network_id'] if 'shabake_id_شَبَکِه_network_id' in keys else None,
r['zanj_re__awt__زَنجیرِهِ_صَوتی_phonetic_chain'] if 'zanj_re__awt__زَنجیرِهِ_صَوتی_phonetic_chain' in keys else None,
r['now__e_v_zhg_n__نَوعِ_واژگونی_inversion_type'] if 'now__e_v_zhg_n__نَوعِ_واژگونی_inversion_type' in keys else None,
r['shakl_e_a_l__شَکلِ_اَصلی_source_form'] if 'shakl_e_a_l__شَکلِ_اَصلی_source_form' in keys else None,
r['boniy_n_بُنیان_foundation_ref'] if 'boniy_n_بُنیان_foundation_ref' in keys else None,
))
old_id = r['madkhal_id_مَدخَل_entry_id'] if 'madkhal_id_مَدخَل_entry_id' in keys else None
id_map[('fa', old_id)] = entry_id
new.commit()
print(f" Total unified entries: {entry_id}")
print(f" ID mappings: {len(id_map)}")
return id_map
# ============================================================================
# STEP 3: Migrate mechanism tables
# ============================================================================
def migrate_mechanism(old, new):
"""Migrate M1-M5 tables."""
print("\n[STEP 3] Migrating mechanism tables...")
# M1: phonetic_shifts
rows = old.execute("SELECT * FROM m1_phonetic_shifts").fetchall()
for r in rows:
keys = r.keys()
new.execute("""
INSERT INTO phonetic_shifts (shift_id, ar_letter, ar_name, en_outputs, ru_outputs,
description, examples, entry_ids, foundation_ref)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (r['shift_id'], r['ar_letter'], r['ar_name'], r['en_outputs'],
r['ru_outputs'] if 'ru_outputs' in keys else None,
r['direction'] if 'direction' in keys else None,
r['examples'] if 'examples' in keys else None,
r['entry_ids'] if 'entry_ids' in keys else None,
r['foundation_ref'] if 'foundation_ref' in keys else None))
print(f" M1 shifts: {len(rows)}")
# M2: detection_patterns
rows = old.execute("SELECT * FROM m2_detection_patterns").fetchall()
for r in rows:
new.execute("""
INSERT INTO detection_patterns (pattern_id, name, level, description, triggers, qur_ref, example)
VALUES (?, ?, ?, ?, ?, ?, ?)
""", (r['pattern_id'], r['name'], r['level'], r['description'],
r['triggers'] if 'triggers' in r.keys() else None,
r['qur_ref'] if 'qur_ref' in r.keys() else None,
r['example'] if 'example' in r.keys() else None))
print(f" M2 detection: {len(rows)}")
# M3: scholars
rows = old.execute("SELECT * FROM m3_scholars").fetchall()
for r in rows:
keys = r.keys()
bp = r['birthplace'] if 'birthplace' in keys else (r['birth_place'] if 'birth_place' in keys else None)
new.execute("""
INSERT INTO scholars (scholar_id, verified_name, birth_place, identity, role,
achievement, lies_applied, death_fate, status)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (r['scholar_id'], r['verified_name'], bp,
r['identity'], r['role'] if 'role' in keys else None,
r['achievement'] if 'achievement' in keys else None,
r['lies_applied'] if 'lies_applied' in keys else None,
r['death_fate'] if 'death_fate' in keys else None,
r['status'] if 'status' in keys else 'VERIFIED'))
print(f" M3 scholars: {len(rows)}")
# M4: networks
rows = old.execute("SELECT * FROM m4_networks").fetchall()
for r in rows:
keys = r.keys()
new.execute("""
INSERT INTO networks (network_id, name, title, link_verse, description, mechanism, status)
VALUES (?, ?, ?, ?, ?, ?, ?)
""", (r['network_id'], r['name'], r['title'] if 'title' in keys else None,
r['link_verse'] if 'link_verse' in keys else None,
r['description'], r['mechanism'] if 'mechanism' in keys else None,
r['status'] if 'status' in keys else 'CONFIRMED'))
print(f" M4 networks: {len(rows)}")
new.commit()
# ============================================================================
# STEP 4: Migrate application tables
# ============================================================================
def migrate_application(old, new, root_lookup):
"""Migrate A2-A6 tables."""
print("\n[STEP 4] Migrating application tables...")
# Get valid root_ids
valid_root_ids = set(r[0] for r in new.execute("SELECT root_id FROM roots").fetchall())
# A2: names_of_allah
rows = old.execute("SELECT * FROM a2_names_of_allah").fetchall()
for r in rows:
keys = r.keys()
rid = r['root_id'] if 'root_id' in keys else None
if rid and rid not in valid_root_ids:
rid = None # orphan root_id
new.execute("""
INSERT INTO names_of_allah (allah_id, arabic_name, transliteration, meaning,
qur_ref, entry_ids, root_id)
VALUES (?, ?, ?, ?, ?, ?, ?)
""", (r['allah_id'], r['arabic_name'], r['transliteration'],
r['meaning'], r['qur_ref'] if 'qur_ref' in keys else None,
r['entry_ids'] if 'entry_ids' in keys else None, rid))
print(f" A2 names: {len(rows)}")
# A3: quran_refs
rows = old.execute("SELECT * FROM a3_quran_refs").fetchall()
for r in rows:
keys = r.keys()
try:
new.execute("""
INSERT INTO quran_refs (ref_id, surah, ayah, arabic_text, transliteration,
translation, relevance, entry_ids, layer_ref)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (r['ref_id'], r['surah'], r['ayah'], r['arabic_text'],
r['transliteration'] if 'transliteration' in keys else None,
r['translation'] if 'translation' in keys else None,
r['relevance'] if 'relevance' in keys else None,
r['entry_ids'] if 'entry_ids' in keys else None,
r['layer_ref'] if 'layer_ref' in keys else None))
except sqlite3.IntegrityError:
pass # skip duplicates
print(f" A3 quran_refs: {len(rows)}")
# A6: country_names
rows = old.execute("SELECT * FROM a6_country_names").fetchall()
for r in rows:
keys = r.keys()
rid = r['root_id'] if 'root_id' in keys else None
if rid and rid not in valid_root_ids:
rid = None
new.execute("""
INSERT INTO country_names (country_id, country_name, root_id, al_word,
qur_meaning, phonetic_chain, naming_basis, notes)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""", (r['country_id'] if 'country_id' in keys else None,
r['country_name'] if 'country_name' in keys else None,
rid,
r['al_word'] if 'al_word' in keys else None,
r['qur_meaning'] if 'qur_meaning' in keys else None,
r['phonetic_chain'] if 'phonetic_chain' in keys else None,
r['naming_basis'] if 'naming_basis' in keys else None,
r['notes'] if 'notes' in keys else None))
print(f" A6 country: {len(rows)}")
new.commit()
# ============================================================================
# STEP 5: Migrate child_schema → child_entries
# ============================================================================
def migrate_child(old, new):
"""Migrate child_schema to child_entries."""
print("\n[STEP 5] Migrating child schema...")
# Temporarily disable FK for child migration (data has evolved beyond original reference tables)
new.execute("PRAGMA foreign_keys = OFF")
# Get valid reference values
valid_nt = set(r[0] for r in new.execute("SELECT nt_code FROM nt_codes").fetchall())
valid_op = set(r[0] for r in new.execute("SELECT op_code FROM operation_codes").fetchall())
rows = old.execute("SELECT * FROM child_schema").fetchall()
for r in rows:
keys = r.keys()
nt = r['nt_code'] if 'nt_code' in keys else None
parent = r['parent_op'] if 'parent_op' in keys else None
if nt and nt not in valid_nt:
nt = None # orphan nt_code
if parent and parent not in valid_op:
parent = None # orphan parent_op
new.execute("""
INSERT INTO child_entries (child_id, shell_name, shell_language, orig_class,
orig_root, orig_lemma, orig_meaning, operation_role,
shell_meaning, inversion_direction, phonetic_chain,
qur_anchors, dp_codes, nt_code, pattern, parent_op,
gate_status, notes)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
r['entry_id'] if 'entry_id' in keys else (r['child_id'] if 'child_id' in keys else None),
r['shell_name'] if 'shell_name' in keys else None,
r['shell_language'] if 'shell_language' in keys else None,
r['orig_class'] if 'orig_class' in keys else None,
r['orig_root'] if 'orig_root' in keys else None,
r['orig_lemma'] if 'orig_lemma' in keys else None,
r['orig_meaning'] if 'orig_meaning' in keys else None,
r['operation_role'] if 'operation_role' in keys else None,
r['shell_meaning'] if 'shell_meaning' in keys else None,
r['inversion_direction'] if 'inversion_direction' in keys else None,
r['phonetic_chain'] if 'phonetic_chain' in keys else None,
r['qur_anchors'] if 'qur_anchors' in keys else None,
r['dp_codes'] if 'dp_codes' in keys else None,
nt,
r['pattern'] if 'pattern' in keys else None,
parent,
r['gate_status'] if 'gate_status' in keys else None,
r['notes'] if 'notes' in keys else None
))
print(f" Child entries: {len(rows)}")
new.commit()
# Re-enable FK
new.execute("PRAGMA foreign_keys = ON")
# ============================================================================
# STEP 6: Copy tables that stay as-is (Qur'anic, Body, Bitig, Protocol, etc.)
# ============================================================================
def copy_table_raw(old, new, table_name, new_table_name=None):
"""Copy a table from old to new DB as-is (no FK, just data preservation)."""
target = new_table_name or table_name
try:
cols = old.execute(f"PRAGMA table_info([{table_name}])").fetchall()
if not cols:
return 0
col_names = [c['name'] for c in cols]
col_list = ', '.join([f'[{c}]' for c in col_names])
# Create table in new DB if it doesn't exist
create_sql = old.execute(
f"SELECT sql FROM sqlite_master WHERE type='table' AND name=?", (table_name,)
).fetchone()
if create_sql:
sql = create_sql[0]
if new_table_name:
sql = sql.replace(f'CREATE TABLE [{table_name}]', f'CREATE TABLE [{target}]', 1)
sql = sql.replace(f'CREATE TABLE {table_name}', f'CREATE TABLE [{target}]', 1)
try:
new.execute(sql)
except sqlite3.OperationalError:
pass # table already exists
rows = old.execute(f"SELECT {col_list} FROM [{table_name}]").fetchall()
placeholders = ', '.join(['?' for _ in col_names])
for r in rows:
try:
new.execute(f"INSERT INTO [{target}] ({col_list}) VALUES ({placeholders})",
[r[c] for c in col_names])
except (sqlite3.IntegrityError, sqlite3.OperationalError):
pass
new.commit()
return len(rows)
except Exception as e:
print(f" ERROR copying {table_name}: {e}")
return 0
def migrate_remaining(old, new):
"""Copy all remaining tables that don't map to the schema."""
print("\n[STEP 6] Copying remaining tables...")
# Tables already migrated
migrated = {
'a1_entries', 'a1_записи', 'persian_a1_mad_khil',
'bitig_a1_entries', 'latin_a1_entries', 'european_a1_entries',
'a2_names_of_allah', 'a3_quran_refs', 'a6_country_names',
'child_schema', 'm1_phonetic_shifts', 'm2_detection_patterns',
'm3_scholars', 'm4_networks', 'root_translations',
# Schema tables (already created by create_uslap_db.sql)
'languages', 'decay_levels', 'script_corridors',
'roots', 'entries', 'derivatives', 'cross_refs', 'quran_refs',
'country_names', 'names_of_allah', 'phonetic_shifts',
'detection_patterns', 'networks', 'scholars', 'qur_verification',
'nt_codes', 'operation_codes', 'dp_codes', 'op_codes',
'child_entries', 'child_entry_links', 'operators',
'host_civilizations', 'operation_cycles', 'events',
'intel_reports', 'operator_aliases', 'word_fingerprints',
'cluster_cache', 'phonetic_mappings', 'engine_queue',
'change_log', 'sync_status', 'session_index',
# FTS and virtual tables
'entries_fts', 'term_search', 'term_search_config',
'term_search_data', 'term_search_docsize', 'term_search_idx',
# SQLite internal
'sqlite_sequence',
}
all_tables = old.execute(
"SELECT name FROM sqlite_master WHERE type='table' ORDER BY name"
).fetchall()
copied = 0
for t in all_tables:
name = t['name']
if name in migrated or name.startswith('sqlite_'):
continue
count = copy_table_raw(old, new, name)
if count > 0:
print(f" {name}: {count} rows")
copied += 1
print(f" Copied {copied} additional tables")
# ============================================================================
# STEP 7: Copy contamination triggers
# ============================================================================
def migrate_triggers(old, new):
"""Copy contamination triggers from old DB."""
print("\n[STEP 7] Migrating contamination triggers...")
triggers = old.execute(
"SELECT name, sql FROM sqlite_master WHERE type='trigger' AND sql IS NOT NULL"
).fetchall()
copied = 0
for t in triggers:
try:
new.execute(t['sql'])
copied += 1
except (sqlite3.OperationalError, sqlite3.IntegrityError) as e:
# Skip triggers that reference tables not yet in new DB
pass
new.commit()
print(f" Copied {copied}/{len(triggers)} triggers")
# ============================================================================
# STEP 8: Verify
# ============================================================================
def verify(new):
"""Verify migration."""
print("\n[STEP 8] Verification...")
cur = new.cursor()
# Check FK enforcement
fk = cur.execute("PRAGMA foreign_keys").fetchone()
print(f" PRAGMA foreign_keys = {fk[0]}")
# Check roots
root_count = cur.execute("SELECT COUNT(*) FROM roots").fetchone()[0]
print(f" roots: {root_count}")
# Check entries
entry_count = cur.execute("SELECT COUNT(*) FROM entries").fetchone()[0]
en_count = cur.execute("SELECT COUNT(*) FROM entries WHERE en_term IS NOT NULL").fetchone()[0]
ru_count = cur.execute("SELECT COUNT(*) FROM entries WHERE ru_term IS NOT NULL").fetchone()[0]
fa_count = cur.execute("SELECT COUNT(*) FROM entries WHERE fa_term IS NOT NULL").fetchone()[0]
print(f" entries: {entry_count} (EN:{en_count}, RU:{ru_count}, FA:{fa_count})")
# Check FK integrity
violations = cur.execute("PRAGMA foreign_key_check").fetchall()
print(f" FK violations: {len(violations)}")
if violations[:5]:
for v in violations[:5]:
print(f" {v}")
# Check table count
tables = cur.execute("SELECT COUNT(*) FROM sqlite_master WHERE type='table'").fetchone()[0]
triggers = cur.execute("SELECT COUNT(*) FROM sqlite_master WHERE type='trigger'").fetchone()[0]
print(f" Total tables: {tables}")
print(f" Total triggers: {triggers}")
# ============================================================================
# MAIN
# ============================================================================
def check_mode():
"""Dry run — show what will happen."""
print("=== DRY RUN ===")
old = connect_old()
tables = old.execute("SELECT COUNT(*) FROM sqlite_master WHERE type='table'").fetchone()[0]
print(f"Current DB: {tables} tables")
en = old.execute("SELECT COUNT(*) FROM a1_entries").fetchone()[0]
ru = old.execute("SELECT COUNT(*) FROM [a1_записи]").fetchone()[0]
fa = old.execute("SELECT COUNT(*) FROM persian_a1_mad_khil").fetchone()[0]
bi = old.execute("SELECT COUNT(*) FROM bitig_a1_entries").fetchone()[0]
la = old.execute("SELECT COUNT(*) FROM latin_a1_entries").fetchone()[0]
eu = old.execute("SELECT COUNT(*) FROM european_a1_entries").fetchone()[0]
print(f"Entries: EN={en} RU={ru} FA={fa} BITIG={bi} LATIN={la} EU={eu}")
print(f"Total to merge into unified entries: {en + ru + fa}")
print(f"Bitig/Latin/EU stay as separate tables (different column structure)")
print(f"\nWill create: {NEW_DB}")
print(f"Using schema: {SCHEMA_SQL}")
print(f"Original DB: UNTOUCHED")
old.close()
def migrate_mode():
"""Execute full migration."""
print("=== MIGRATION START ===")
print(f"Source: {OLD_DB}")
print(f"Target: {NEW_DB}")
print(f"Schema: {SCHEMA_SQL}")
# Remove existing target
if os.path.exists(NEW_DB):
os.remove(NEW_DB)
# Create new DB with schema
print("\n[STEP 0] Creating new DB from schema...")
new_conn = sqlite3.connect(NEW_DB)
# Read and execute schema SQL (skip lines that need UDFs)
with open(SCHEMA_SQL, 'r') as f:
schema = f.read()
# Remove extract_consonants triggers (need Python UDF)
# We'll add them back later via db_access_layer
lines = schema.split('\n')
clean_lines = []
skip_until_end = False
for line in lines:
if 'extract_consonants' in line:
skip_until_end = True
if skip_until_end:
if line.strip() == 'END;':
skip_until_end = False
continue
clean_lines.append(line)
clean_schema = '\n'.join(clean_lines)
# Remove CHECK constraints on pattern (data has evolved beyond original enums)
import re
clean_schema = re.sub(r"CHECK\s*\(\s*pattern\s+IN\s*\([^)]+\)\s*\)", "", clean_schema)
clean_schema = re.sub(r"CHECK\s*\(\s*status\s+IN\s*\([^)]+\)\s*\)", "", clean_schema)
clean_schema = re.sub(r"CHECK\s*\(\s*change_type\s+IN\s*\([^)]+\)\s*\)", "", clean_schema)
clean_schema = re.sub(r"CHECK\s*\(\s*sync_direction\s+IN\s*\([^)]+\)\s*\)", "", clean_schema)
# Keep score CHECK and confidence CHECK — those are valid
try:
new_conn.executescript(clean_schema)
print(" Schema created successfully")
except Exception as e:
print(f" Schema error: {e}")
# Try statement by statement
for stmt in clean_schema.split(';'):
stmt = stmt.strip()
if stmt and not stmt.startswith('--'):
try:
new_conn.execute(stmt)
except Exception as e2:
if 'already exists' not in str(e2):
print(f" SKIP: {stmt[:60]}... ({e2})")
new_conn.commit()
new_conn.close()
# Now open both DBs
old = connect_old()
new = connect_new()
# Execute migration steps
root_lookup, root_id_lookup = build_roots(old, new)
id_map = migrate_entries(old, new, root_lookup)
migrate_mechanism(old, new)
migrate_application(old, new, root_lookup)
migrate_child(old, new)
migrate_remaining(old, new)
migrate_triggers(old, new)
verify(new)
old.close()
new.close()
# File size comparison
old_size = os.path.getsize(OLD_DB)
new_size = os.path.getsize(NEW_DB)
print(f"\n=== MIGRATION COMPLETE ===")
print(f"Old DB: {old_size / 1024 / 1024:.1f} MB")
print(f"New DB: {new_size / 1024 / 1024:.1f} MB")
print(f"\nNew DB at: {NEW_DB}")
print(f"Original UNTOUCHED at: {OLD_DB}")
print(f"\nTo activate: rename v4 to v3 (after verification)")
if __name__ == '__main__':
if len(sys.argv) < 2:
print("Usage: python3 migrate_to_schema.py check|migrate")
sys.exit(1)
cmd = sys.argv[1]
if cmd == 'check':
check_mode()
elif cmd == 'migrate':
migrate_mode()
else:
print(f"Unknown command: {cmd}")
sys.exit(1)