Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| Build SQLite database from USLaP_Final_Data_Consolidated_Master_v3.xlsx | |
| Preserves UTF-8 Arabic text exactly as stored. | |
| Creates one table per primary sheet. | |
| Adds cross-reference table linking ENTRY_ID → PARENT_OP → DP_REF. | |
| Now includes PROTOCOL_CORRECTIONS and SCHOLAR_WARNINGS tables. | |
| """ | |
| import sqlite3 | |
| import openpyxl | |
| import re | |
| from pathlib import Path | |
| def clean_column_name(col): | |
| """Convert Excel column header to valid SQLite column name.""" | |
| if col is None: | |
| return "unknown" | |
| # Remove non-alphanumeric, replace spaces with underscore | |
| col = str(col).strip() | |
| col = re.sub(r'[^\w\s]', '', col) # Remove punctuation | |
| col = re.sub(r'\s+', '_', col) # Replace spaces with underscore | |
| col = col.lower() | |
| if not col: | |
| return "unknown" | |
| return col | |
| def extract_parent_ops(parent_op_str): | |
| """Extract individual parent operations from strings like 'UMD-RL1 + UMD-ST1'""" | |
| if not parent_op_str: | |
| return [] | |
| # Split by various separators | |
| ops = re.split(r'[·,\+\s]+', str(parent_op_str)) | |
| # Filter for operation codes (UMD- prefix) | |
| parent_ops = [] | |
| for op in ops: | |
| op = op.strip() | |
| if op and re.match(r'^UMD-', op): | |
| parent_ops.append(op) | |
| return parent_ops | |
| def extract_dp_codes(dp_string): | |
| """Extract individual DP codes from strings like 'DP08 · DP07 · DP11 · DP15'""" | |
| if not dp_string: | |
| return [] | |
| # Split by various separators | |
| codes = re.split(r'[·,\s]+', str(dp_string)) | |
| # Filter for DP codes (start with DP followed by digits or word) | |
| dp_codes = [] | |
| for code in codes: | |
| code = code.strip() | |
| if code and (re.match(r'^DP\d+', code) or re.match(r'^DP-', code)): | |
| dp_codes.append(code) | |
| return dp_codes | |
| def create_tables(conn, cursor): | |
| """Create tables for each primary sheet.""" | |
| # Create UMD_OPERATIONS table | |
| cursor.execute(''' | |
| CREATE TABLE IF NOT EXISTS umd_operations ( | |
| op_id TEXT PRIMARY KEY, | |
| op_name TEXT, | |
| op_class TEXT, | |
| qur_primary TEXT, | |
| qur_secondary TEXT, | |
| op_structure TEXT, | |
| founding_instances TEXT, | |
| dp_always_active TEXT, | |
| gate_shortcut TEXT, | |
| np_layer TEXT, | |
| darvo_active TEXT, | |
| notes TEXT, | |
| status TEXT, | |
| last_updated TEXT | |
| ) | |
| ''') | |
| # Create CHILD_SCHEMA table | |
| cursor.execute(''' | |
| CREATE TABLE IF NOT EXISTS child_schema ( | |
| entry_id TEXT PRIMARY KEY, | |
| shell_name TEXT, | |
| shell_language TEXT, | |
| orig_class TEXT, | |
| orig_root TEXT, | |
| orig_lemma TEXT, | |
| orig_meaning TEXT, | |
| operation_role TEXT, | |
| shell_meaning TEXT, | |
| inversion_direction TEXT, | |
| phonetic_chain TEXT, | |
| qur_anchors TEXT, | |
| dp_codes TEXT, | |
| nt_code TEXT, | |
| pattern TEXT, | |
| parent_op TEXT, | |
| gate_status TEXT, | |
| notes TEXT | |
| ) | |
| ''') | |
| # Create DP_REGISTER table | |
| cursor.execute(''' | |
| CREATE TABLE IF NOT EXISTS dp_register ( | |
| dp_code TEXT PRIMARY KEY, | |
| name TEXT, | |
| class TEXT, | |
| trigger TEXT, | |
| mechanism TEXT, | |
| qur_anchor TEXT, | |
| example TEXT, | |
| distinct_from TEXT, | |
| protocol_note TEXT, | |
| status TEXT | |
| ) | |
| ''') | |
| # Create ATT_TERMS table | |
| cursor.execute(''' | |
| CREATE TABLE IF NOT EXISTS att_terms ( | |
| term_id TEXT PRIMARY KEY, | |
| arabic TEXT, | |
| transliteration TEXT, | |
| translation TEXT, | |
| root TEXT, | |
| qur_anchor TEXT, | |
| function_in_lattice TEXT, | |
| umd_op_ref TEXT, | |
| dp_ref TEXT, | |
| inversion_type TEXT, | |
| notes TEXT | |
| ) | |
| ''') | |
| # Create PHONETIC_REVERSAL table | |
| cursor.execute(''' | |
| CREATE TABLE IF NOT EXISTS phonetic_reversal ( | |
| shift_code TEXT PRIMARY KEY, | |
| from_modern TEXT, | |
| to_orig TEXT, | |
| class TEXT, | |
| mechanism TEXT, | |
| attested_example TEXT, | |
| entry_ref TEXT, | |
| reliability TEXT, | |
| notes TEXT, | |
| status TEXT | |
| ) | |
| ''') | |
| # Create SESSION_INDEX table | |
| cursor.execute(''' | |
| CREATE TABLE IF NOT EXISTS session_index ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| item_type TEXT, | |
| entry_id TEXT, | |
| description TEXT, | |
| sheet TEXT, | |
| status TEXT, | |
| notes TEXT | |
| ) | |
| ''') | |
| # Create PROTOCOL_CORRECTIONS table | |
| cursor.execute(''' | |
| CREATE TABLE IF NOT EXISTS protocol_corrections ( | |
| correction_id TEXT PRIMARY KEY, | |
| type TEXT, | |
| old_form TEXT, | |
| new_form TEXT, | |
| authority TEXT, | |
| scope TEXT, | |
| date TEXT | |
| ) | |
| ''') | |
| # Create SCHOLAR_WARNINGS table | |
| cursor.execute(''' | |
| CREATE TABLE IF NOT EXISTS scholar_warnings ( | |
| sc_code TEXT PRIMARY KEY, | |
| scholar TEXT, | |
| arabic_name TEXT, | |
| origin TEXT, | |
| century TEXT, | |
| primary_work TEXT, | |
| warning_content TEXT, | |
| key_contribution_to_lattice TEXT | |
| ) | |
| ''') | |
| # Create cross-reference table | |
| cursor.execute(''' | |
| CREATE TABLE IF NOT EXISTS cross_reference ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| entry_id TEXT, | |
| parent_op TEXT, | |
| dp_ref TEXT, | |
| FOREIGN KEY (entry_id) REFERENCES child_schema (entry_id), | |
| FOREIGN KEY (parent_op) REFERENCES umd_operations (op_id) | |
| ) | |
| ''') | |
| conn.commit() | |
| print("Created all tables") | |
| def import_sheet_data(conn, cursor, wb, sheet_name, table_name): | |
| """Import data from an Excel sheet to SQLite table.""" | |
| ws = wb[sheet_name] | |
| rows = list(ws.iter_rows(values_only=True)) | |
| if not rows: | |
| print(f" No data in {sheet_name}") | |
| return 0 | |
| # Find header row based on sheet-specific patterns | |
| header_row = None | |
| data_start = 0 | |
| # Special handling for each sheet based on debug output | |
| if sheet_name == "DP_REGISTER": | |
| # Row 0: title, Row 1: description, Row 2: headers | |
| if len(rows) > 2: | |
| header_row = rows[2] | |
| data_start = 3 | |
| elif sheet_name == "PHONETIC_REVERSAL": | |
| # Row 0: title, Row 1: description, Row 2: zone header, Row 3: headers | |
| if len(rows) > 3: | |
| header_row = rows[3] | |
| data_start = 4 | |
| elif sheet_name == "UMD_OPERATIONS": | |
| # Row 0: title, Row 1: description, Row 2: headers | |
| if len(rows) > 2: | |
| header_row = rows[2] | |
| data_start = 3 | |
| elif sheet_name == "CHILD_SCHEMA": | |
| # Row 0: title, Row 1: description, Row 2: headers | |
| if len(rows) > 2: | |
| header_row = rows[2] | |
| data_start = 3 | |
| elif sheet_name == "ATT_TERMS": | |
| # Row 0: title, Row 1: description, Row 2: headers | |
| if len(rows) > 2: | |
| header_row = rows[2] | |
| data_start = 3 | |
| elif sheet_name == "SESSION_INDEX": | |
| # Row 0: title, Row 1: description, Row 2: headers | |
| if len(rows) > 2: | |
| header_row = rows[2] | |
| data_start = 3 | |
| elif sheet_name == "PROTOCOL_CORRECTIONS": | |
| # Row 0: title, Row 1: description, Row 2: headers | |
| if len(rows) > 2: | |
| header_row = rows[2] | |
| data_start = 3 | |
| elif sheet_name == "SCHOLAR_WARNINGS": | |
| # Row 0: title, Row 1: description, Row 2: headers | |
| if len(rows) > 2: | |
| header_row = rows[2] | |
| data_start = 3 | |
| else: | |
| # Generic fallback: find row with typical column names | |
| for i, row in enumerate(rows): | |
| if row and any(isinstance(cell, str) and ('ID' in cell or 'NAME' in cell or 'CODE' in cell) for cell in row): | |
| header_row = row | |
| data_start = i + 1 | |
| break | |
| if header_row is None: | |
| print(f" Could not find headers in {sheet_name}") | |
| return 0 | |
| # Clean column names | |
| col_names = [] | |
| for cell in header_row: | |
| col_name = clean_column_name(cell) | |
| # Handle edge case where title row got mistaken as header | |
| if col_name and 'uslap' in col_name and any(x in col_name for x in ['title', 'header', 'master']): | |
| # This looks like a title, not a column name | |
| col_names.append(clean_column_name(str(header_row.index(cell)))) | |
| else: | |
| col_names.append(col_name) | |
| # Create placeholders for SQL | |
| placeholders = ', '.join(['?' for _ in col_names]) | |
| col_list = ', '.join(col_names) | |
| # Prepare insert statement | |
| insert_sql = f"INSERT OR REPLACE INTO {table_name} ({col_list}) VALUES ({placeholders})" | |
| # Insert data rows | |
| count = 0 | |
| for i in range(data_start, len(rows)): | |
| row = rows[i] | |
| # Skip empty rows or rows where first cell is None | |
| if not row or row[0] is None: | |
| continue | |
| # Ensure row has same length as columns | |
| row_data = list(row) | |
| while len(row_data) < len(col_names): | |
| row_data.append(None) | |
| row_data = row_data[:len(col_names)] | |
| try: | |
| cursor.execute(insert_sql, row_data) | |
| count += 1 | |
| except Exception as e: | |
| print(f" Error inserting row {i} in {sheet_name}: {e}") | |
| print(f" Row data: {row_data}") | |
| conn.commit() | |
| print(f" Imported {count} rows into {table_name}") | |
| return count | |
| def build_cross_reference(conn, cursor): | |
| """Build cross-reference table from CHILD_SCHEMA data.""" | |
| cursor.execute("DELETE FROM cross_reference") | |
| # Get all child schema entries | |
| cursor.execute("SELECT entry_id, parent_op, dp_codes FROM child_schema") | |
| rows = cursor.fetchall() | |
| count = 0 | |
| for entry_id, parent_op_str, dp_codes_str in rows: | |
| if not entry_id or entry_id == 'ENTRY_ID': | |
| continue | |
| # Extract individual parent operations (e.g., "UMD-RL1 + UMD-ST1" -> ["UMD-RL1", "UMD-ST1"]) | |
| parent_ops = extract_parent_ops(parent_op_str) | |
| # Extract individual DP codes | |
| dp_codes = extract_dp_codes(dp_codes_str) | |
| # Insert one row per parent_op per dp_code | |
| for parent_op in parent_ops: | |
| # Verify parent_op exists in umd_operations | |
| cursor.execute("SELECT op_id FROM umd_operations WHERE op_id = ?", (parent_op,)) | |
| if not cursor.fetchone(): | |
| print(f" Warning: Parent operation {parent_op} not found in umd_operations for entry {entry_id}") | |
| continue | |
| for dp_code in dp_codes: | |
| cursor.execute( | |
| "INSERT INTO cross_reference (entry_id, parent_op, dp_ref) VALUES (?, ?, ?)", | |
| (entry_id, parent_op, dp_code) | |
| ) | |
| count += 1 | |
| conn.commit() | |
| print(f" Built {count} cross-reference entries") | |
| return count | |
| def main(): | |
| excel_path = "USLaP_Final_Data_Consolidated_Master_v3.xlsx" | |
| db_path = "uslap_database_v3.db" | |
| print(f"Opening Excel file: {excel_path}") | |
| wb = openpyxl.load_workbook(excel_path, read_only=True, data_only=True) | |
| print(f"Creating SQLite database: {db_path}") | |
| conn = sqlite3.connect(db_path) | |
| cursor = conn.cursor() | |
| # Enable foreign keys | |
| cursor.execute("PRAGMA foreign_keys = ON") | |
| # Drop all existing tables to recreate fresh | |
| print("Dropping existing tables...") | |
| tables_to_drop = [ | |
| "umd_operations", "child_schema", "dp_register", "att_terms", | |
| "phonetic_reversal", "session_index", "protocol_corrections", | |
| "scholar_warnings", "cross_reference" | |
| ] | |
| for table in tables_to_drop: | |
| try: | |
| cursor.execute(f"DROP TABLE IF EXISTS {table}") | |
| except Exception as e: | |
| print(f" Warning: Could not drop {table}: {e}") | |
| conn.commit() | |
| # Create tables | |
| create_tables(conn, cursor) | |
| # Import data from each primary sheet | |
| primary_sheets = [ | |
| ("UMD_OPERATIONS", "umd_operations"), | |
| ("CHILD_SCHEMA", "child_schema"), | |
| ("DP_REGISTER", "dp_register"), | |
| ("ATT_TERMS", "att_terms"), | |
| ("PHONETIC_REVERSAL", "phonetic_reversal"), | |
| ("SESSION_INDEX", "session_index"), | |
| ("PROTOCOL_CORRECTIONS", "protocol_corrections"), | |
| ("SCHOLAR_WARNINGS", "scholar_warnings") | |
| ] | |
| total_rows = 0 | |
| for excel_sheet, db_table in primary_sheets: | |
| if excel_sheet in wb.sheetnames: | |
| print(f"\nImporting {excel_sheet} -> {db_table}") | |
| count = import_sheet_data(conn, cursor, wb, excel_sheet, db_table) | |
| total_rows += count | |
| else: | |
| print(f"\nSheet {excel_sheet} not found in Excel file") | |
| # Build cross-reference table | |
| print("\nBuilding cross-reference table...") | |
| cross_ref_count = build_cross_reference(conn, cursor) | |
| # Create indexes for faster searching | |
| print("\nCreating indexes...") | |
| cursor.execute("CREATE INDEX IF NOT EXISTS idx_child_schema_entry_id ON child_schema(entry_id)") | |
| cursor.execute("CREATE INDEX IF NOT EXISTS idx_child_schema_parent_op ON child_schema(parent_op)") | |
| cursor.execute("CREATE INDEX IF NOT EXISTS idx_cross_ref_entry_id ON cross_reference(entry_id)") | |
| cursor.execute("CREATE INDEX IF NOT EXISTS idx_cross_ref_dp_ref ON cross_reference(dp_ref)") | |
| cursor.execute("CREATE INDEX IF NOT EXISTS idx_umd_operations_op_id ON umd_operations(op_id)") | |
| cursor.execute("CREATE INDEX IF NOT EXISTS idx_dp_register_dp_code ON dp_register(dp_code)") | |
| cursor.execute("CREATE INDEX IF NOT EXISTS idx_protocol_corrections_id ON protocol_corrections(correction_id)") | |
| cursor.execute("CREATE INDEX IF NOT EXISTS idx_scholar_warnings_sc_code ON scholar_warnings(sc_code)") | |
| conn.commit() | |
| # Print statistics | |
| print(f"\n=== Import Summary ===") | |
| print(f"Total rows imported: {total_rows}") | |
| print(f"Cross-reference entries: {cross_ref_count}") | |
| # Verify data counts | |
| for table in ["umd_operations", "child_schema", "dp_register", "att_terms", | |
| "phonetic_reversal", "session_index", "protocol_corrections", | |
| "scholar_warnings", "cross_reference"]: | |
| cursor.execute(f"SELECT COUNT(*) FROM {table}") | |
| count = cursor.fetchone()[0] | |
| print(f"{table}: {count} rows") | |
| wb.close() | |
| conn.close() | |
| print(f"\nDatabase created successfully: {db_path}") | |
| if __name__ == "__main__": | |
| main() |