#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
amr_algorithm_repair.py — Phase 1 interactive repair tool for algorithm
root_map and ayah_map gaps.

Wraps the audit's Rule G (distinctive vocabulary completeness) output for a
single algorithm and presents an approval interface. The user reviews each
candidate root, approves or rejects it, and approved roots are written to
algorithm_root_map via uslap_handler.write_entry() so all 5 write-defence
layers fire.

NOTE(review): execute_root_additions() in this file issues its INSERT directly
through sqlite3 and never imports uslap_handler — confirm whether the handler
pipeline is still the intended write path.

The tool also identifies ayah_map gaps for Rule B — declared roots that are
not attested at the mapped ayat because the ayah_map doesn't cover the ayat
where they actually fire. Those are presented separately for ayah_map
extension.

Zero LLM. All candidate data comes from quran_word_roots + the audit engine.
All writes go through the handler pipeline.

Usage:
    python3 amr_algorithm_repair.py ALG-NUH-ARK-FLOOD                           # interactive
    python3 amr_algorithm_repair.py ALG-NUH-ARK-FLOOD --dry-run                 # show plan, no writes
    python3 amr_algorithm_repair.py ALG-NUH-ARK-FLOOD --auto-approve --dry-run  # approve all, show plan
    python3 amr_algorithm_repair.py --list-flagged                              # show all flagged rows
"""

from __future__ import annotations

import argparse
import os
import sqlite3
import sys
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple

SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
DB_PATH = os.path.join(SCRIPT_DIR, "uslap_database_v3.db")

sys.path.insert(0, SCRIPT_DIR)


def _connect() -> sqlite3.Connection:
    """Open DB_PATH with name-addressable rows and WAL journaling enabled."""
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    conn.execute("PRAGMA journal_mode=WAL")
    return conn


# ─────────────────────────────────────────────────────────────────────
# CANDIDATE EXTRACTION — reuses audit logic
# ─────────────────────────────────────────────────────────────────────

def _resolved_ranges(am_rows) -> List[Tuple[Any, Any, Any]]:
    """Return (surah, start, end) per ayah_map row; a NULL end means one ayah."""
    return [
        (
            am["surah"],
            am["ayah_start"],
            am["ayah_end"] if am["ayah_end"] is not None else am["ayah_start"],
        )
        for am in am_rows
    ]


def get_missing_markers(algo_id: str) -> Dict[str, Any]:
    """Run Rule G for one algorithm and return the full candidate set.

    Returns dict with:
        algo_id, algo_name, algo_class, primary_ayah,
        declared_roots: [(root, role, local_cnt, global_cnt, concentration)],
        missing_markers: [(root, local_cnt, global_cnt, concentration, layer_ratio)],
        rule_b_gaps: [(root, role)] — declared but not attested at ayat,
        ayah_map: [(surah, start, end)],

    If the algorithm is not in algorithm_registry the dict holds a single
    "error" key instead.
    """
    # Imported lazily so this module can be loaded without the audit engine.
    from amr_algorithm_audit import (
        _connect as _audit_connect,
        _compute_root_dilution_map,
        _compute_root_global_count_map,
        DENSITY_CONCENTRATION_THRESHOLD,
        DISTINCTIVE_RATIO_THRESHOLD,
        MIN_LOCAL_COUNT_FOR_MARKER,
    )

    conn = _audit_connect()
    try:
        reg = conn.execute(
            "SELECT algo_id, algo_name, algo_class, primary_ayah "
            "FROM algorithm_registry WHERE algo_id = ?",
            (algo_id,),
        ).fetchone()
        if not reg:
            return {"error": f"algorithm {algo_id!r} not found in registry"}

        dilution_map = _compute_root_dilution_map(conn)
        global_count_map = _compute_root_global_count_map(conn)

        rm_rows = conn.execute(
            "SELECT root_letters, role FROM algorithm_root_map WHERE algo_id = ?",
            (algo_id,),
        ).fetchall()
        am_rows = conn.execute(
            "SELECT surah, ayah_start, ayah_end FROM algorithm_ayah_map "
            "WHERE algo_id = ? ORDER BY surah, ayah_start",
            (algo_id,),
        ).fetchall()

        # Resolve every range once so the counting queries below and the
        # ayah_map reported to the caller always agree on the end ayah.
        ranges = _resolved_ranges(am_rows)

        # Declared roots with local attestation data
        declared_set = set()
        declared_detail = []
        rule_b_gaps = []
        for rm in rm_rows:
            root, role = rm["root_letters"], rm["role"]
            declared_set.add(root)
            local_cnt = sum(
                conn.execute(
                    "SELECT COUNT(*) FROM quran_word_roots "
                    "WHERE root = ? AND surah = ? AND ayah BETWEEN ? AND ?",
                    (root, surah, start, end),
                ).fetchone()[0]
                for surah, start, end in ranges
            )
            gcnt = global_count_map.get(root, 0)
            conc = local_cnt / gcnt if gcnt > 0 else 0
            declared_detail.append((root, role, local_cnt, gcnt, conc))
            if local_cnt == 0:
                # Declared but never attested inside the mapped ayat.
                rule_b_gaps.append((root, role))

        # Missing markers (Rule G logic): tally every root seen in the ranges
        local_counts: Dict[str, int] = {}
        for surah, start, end in ranges:
            rows = conn.execute(
                "SELECT root, COUNT(*) AS cnt FROM quran_word_roots "
                "WHERE surah = ? AND ayah BETWEEN ? AND ? "
                "AND root IS NOT NULL AND root != '' GROUP BY root",
                (surah, start, end),
            ).fetchall()
            for r in rows:
                if r["root"]:
                    local_counts[r["root"]] = (
                        local_counts.get(r["root"], 0) + (r["cnt"] or 0)
                    )

        missing = []
        for root, lcnt in local_counts.items():
            if root in declared_set:
                continue
            if lcnt < MIN_LOCAL_COUNT_FOR_MARKER:
                continue
            gcnt = global_count_map.get(root, 0)
            if gcnt <= 0:
                continue
            conc = lcnt / gcnt
            if conc < DENSITY_CONCENTRATION_THRESHOLD:
                continue
            lr = dilution_map.get(root, 0.0)
            if lr >= DISTINCTIVE_RATIO_THRESHOLD:
                continue
            missing.append((root, lcnt, gcnt, conc, lr))

        # Highest concentration first, then highest local count.
        missing.sort(key=lambda x: (-x[3], -x[1]))

        return {
            "algo_id": reg["algo_id"],
            "algo_name": reg["algo_name"],
            "algo_class": reg["algo_class"],
            "primary_ayah": reg["primary_ayah"],
            "declared_roots": declared_detail,
            "missing_markers": missing,
            "rule_b_gaps": rule_b_gaps,
            "ayah_map": ranges,
        }
    finally:
        conn.close()


def get_root_ayat(root: str) -> List[Tuple[int, int, int]]:
    """Return all (surah, ayah, count) where this root fires, sorted."""
    conn = _connect()
    try:
        rows = conn.execute(
            "SELECT surah, ayah, COUNT(*) AS cnt FROM quran_word_roots "
            "WHERE root = ? GROUP BY surah, ayah ORDER BY surah, ayah",
            (root,),
        ).fetchall()
        return [(r["surah"], r["ayah"], r["cnt"]) for r in rows]
    finally:
        conn.close()


# ─────────────────────────────────────────────────────────────────────
# REPAIR PLAN RENDERER
# ─────────────────────────────────────────────────────────────────────

def render_repair_plan(
    data: Dict[str, Any],
    approved_roots: Optional[List[str]] = None,
) -> str:
    """Render the full repair plan for review.

    Args:
        data: a successful result dict from get_missing_markers().
        approved_roots: when not None, each Rule G candidate is tagged
            [APPROVED] or [SKIPPED] by membership; when None no tag is shown.

    Returns:
        The plan as a single newline-joined string.
    """
    # Set lookup instead of a list scan per candidate; None still means "no tags".
    approved = set(approved_roots) if approved_roots is not None else None

    lines = [
        "",
        "=" * 72,
        f"REPAIR PLAN: {data['algo_id']}",
        f" name: {data['algo_name']}",
        f" class: {data['algo_class']}",
        f" anchor: {data['primary_ayah']}",
        "=" * 72,
    ]

    # Current declared roots
    lines.append("")
    lines.append(f" CURRENT ROOT_MAP ({len(data['declared_roots'])} roots):")
    for root, role, lcnt, gcnt, conc in data["declared_roots"]:
        tag = "OK" if lcnt > 0 else "RULE_B_GAP"
        # !s keeps a None role printable; a bare width spec on None raises TypeError.
        lines.append(
            f" {root:<14} role={role!s:<10} local={lcnt:<3} "
            f"global={gcnt:<5} conc={conc:.2f} [{tag}]"
        )

    # Rule B gaps
    if data["rule_b_gaps"]:
        lines.append("")
        lines.append(
            f" RULE B GAPS ({len(data['rule_b_gaps'])} declared roots NOT attested at ayat):"
        )
        for root, role in data["rule_b_gaps"]:
            lines.append(f" {root:<14} role={role}")
            lines.append(" FIX: extend ayah_map to cover ranges where this root fires")

    # Missing markers (Rule G)
    missing = data["missing_markers"]
    lines.append("")
    lines.append(f" RULE G CANDIDATES ({len(missing)} distinctive markers to add):")
    for i, (root, lcnt, gcnt, conc, lr) in enumerate(missing):
        status = ""
        if approved is not None:
            status = " [APPROVED]" if root in approved else " [SKIPPED]"
        lines.append(
            f" [{i:>2}] {root:<14} local={lcnt:<3} global={gcnt:<5} "
            f"conc={conc:.2f} layer_r={lr:.2f}{status}"
        )

    # Ayah map
    lines.append("")
    lines.append(f" CURRENT AYAH_MAP ({len(data['ayah_map'])} ranges):")
    for s, a_s, a_e in data["ayah_map"]:
        lines.append(f" Q{s}:{a_s}-{a_e}")

    lines.append("")
    lines.append("=" * 72)
    return "\n".join(lines)
# ─────────────────────────────────────────────────────────────────────
# WRITE EXECUTION
# ─────────────────────────────────────────────────────────────────────

def execute_root_additions(
    algo_id: str,
    roots_to_add: List[str],
    role: str = "PRIMARY",
    dry_run: bool = False,
) -> List[Dict[str, Any]]:
    """Write approved roots to algorithm_root_map.

    Each root is inserted and committed on its own, so one failing root does
    not undo the others. After the loop, algorithm_registry.root_count is
    re-synchronised with the actual number of root_map rows for the algorithm.

    NOTE(review): the INSERT is issued directly through sqlite3; no handler
    module is called from this function — confirm that is intended.

    Args:
        algo_id: algorithm whose root_map is extended.
        roots_to_add: roots to insert; an empty list is a no-op.
        role: value stored in the role column for every new row.
        dry_run: when True, nothing is opened or written; the result only
            describes what would be added.

    Returns:
        One dict per input root with keys "root", "status" (DRY_RUN, SKIP,
        WRITTEN or ERROR) and "message".
    """
    results: List[Dict[str, Any]] = []
    if not roots_to_add:
        return results

    if dry_run:
        return [
            {
                "root": root,
                "status": "DRY_RUN",
                "message": f"would add {root} to {algo_id} root_map with role={role}",
            }
            for root in roots_to_add
        ]

    conn = _connect()
    try:
        for root in roots_to_add:
            # Check not already present
            existing = conn.execute(
                "SELECT 1 FROM algorithm_root_map WHERE algo_id = ? AND root_letters = ?",
                (algo_id, root),
            ).fetchone()
            if existing:
                results.append({
                    "root": root,
                    "status": "SKIP",
                    "message": f"{root} already in {algo_id} root_map",
                })
                continue

            # Get token count for this root
            tcnt = conn.execute(
                "SELECT COUNT(*) FROM quran_word_roots WHERE root = ?",
                (root,),
            ).fetchone()[0]

            try:
                conn.execute(
                    "INSERT INTO algorithm_root_map "
                    "(algo_id, root_letters, role, token_count, notes, created_date) "
                    "VALUES (?, ?, ?, ?, ?, ?)",
                    (
                        algo_id,
                        root,
                        role,
                        tcnt,
                        "Phase 1 repair: Rule G distinctive gap fill",
                        datetime.now().isoformat(timespec="seconds"),
                    ),
                )
                conn.commit()
            except sqlite3.Error as e:
                # Report the failure for this root and carry on with the rest.
                conn.rollback()
                results.append({
                    "root": root,
                    "status": "ERROR",
                    "message": str(e),
                })
            else:
                results.append({
                    "root": root,
                    "status": "WRITTEN",
                    "message": f"added {root} to {algo_id} root_map (role={role}, tokens={tcnt})",
                })

        # Update root_count in algorithm_registry (same connection; every
        # per-root transaction above is already committed or rolled back).
        new_count = conn.execute(
            "SELECT COUNT(*) FROM algorithm_root_map WHERE algo_id = ?",
            (algo_id,),
        ).fetchone()[0]
        conn.execute(
            "UPDATE algorithm_registry SET root_count = ? WHERE algo_id = ?",
            (new_count, algo_id),
        )
        conn.commit()
    finally:
        conn.close()

    return results


# ─────────────────────────────────────────────────────────────────────
# LIST FLAGGED
# ─────────────────────────────────────────────────────────────────────

def list_flagged() -> str:
    """List all algorithms that would benefit from repair (Rule B + G).

    Runs the audit over every algorithm returned by collect_named_algorithms()
    and keeps those whose B_root_coverage or G_distinctive_gap verdict is FAIL
    or WARN. Returns the report as one newline-joined string.
    """
    from amr_algorithm_audit import (
        collect_named_algorithms,
        audit_algorithm,
        _compute_root_dilution_map,
        _compute_root_global_count_map,
    )

    conn = _connect()
    try:
        dilution_map = _compute_root_dilution_map(conn)
        global_count_map = _compute_root_global_count_map(conn)
        targets = collect_named_algorithms(conn)

        lines = ["", "FLAGGED ALGORITHMS (Rule B FAIL/WARN or Rule G FAIL/WARN):", ""]
        lines.append(f"{'algo_id':<40} {'class':<14} {'B':>4} {'G':>4}")
        lines.append("-" * 68)

        count = 0
        for row in targets:
            result = audit_algorithm(
                conn, row,
                dilution_map=dilution_map,
                global_count_map=global_count_map,
            )
            b = result["checks"].get("B_root_coverage", {}).get("verdict", "—")
            g = result["checks"].get("G_distinctive_gap", {}).get("verdict", "—")
            if b in ("FAIL", "WARN") or g in ("FAIL", "WARN"):
                # !s keeps a None algo_class printable instead of raising.
                lines.append(
                    f" {result['algo_id']:<38} {result['algo_class']!s:<14} "
                    f"{b:>4} {g:>4}"
                )
                count += 1

        lines.append("")
        lines.append(f"Total: {count} algorithms need repair work")
        lines.append("")
        return "\n".join(lines)
    finally:
        conn.close()


# ─────────────────────────────────────────────────────────────────────
# CLI
# ─────────────────────────────────────────────────────────────────────

def _parse_approval_indices(raw: str, n_candidates: int) -> Tuple[List[int], List[str]]:
    """Split a comma-separated selection into (valid unique indices, rejected tokens).

    Valid indices keep their first-seen order. Empty tokens (e.g. from a
    trailing comma) are ignored rather than reported.
    """
    chosen: List[int] = []
    rejected: List[str] = []
    seen = set()
    for token in raw.split(","):
        token = token.strip()
        if not token:
            continue
        # isdecimal(), not isdigit(): isdigit() also accepts characters such
        # as "²" that int() cannot convert.
        if not token.isdecimal():
            rejected.append(token)
            continue
        idx = int(token)
        if idx >= n_candidates:
            rejected.append(token)
            continue
        if idx not in seen:
            seen.add(idx)
            chosen.append(idx)
    return chosen, rejected


def main(argv: List[str]) -> int:
    """CLI entry point.

    Returns the process exit status: 0 on success or when nothing was
    written, 1 when the algorithm is unknown or any write reported ERROR.
    """
    p = argparse.ArgumentParser(prog="amr_algorithm_repair")
    p.add_argument("algo_id", nargs="?", default=None,
                   help="algorithm to repair (e.g. ALG-NUH-ARK-FLOOD)")
    p.add_argument("--dry-run", action="store_true",
                   help="show plan and approved writes but do not execute")
    p.add_argument("--auto-approve", action="store_true",
                   help="approve ALL Rule G candidates (skip interactive)")
    p.add_argument("--list-flagged", action="store_true",
                   help="list all algorithms that need repair, then exit")
    p.add_argument("--role", default="PRIMARY",
                   choices=["PRIMARY", "SUPPORT"],
                   help="role for newly added roots (default: PRIMARY)")
    args = p.parse_args(argv)

    if args.list_flagged:
        print(list_flagged())
        return 0

    if not args.algo_id:
        p.error("algo_id is required (or use --list-flagged)")

    data = get_missing_markers(args.algo_id)
    if "error" in data:
        print(f"Error: {data['error']}", file=sys.stderr)
        return 1

    missing = data["missing_markers"]

    if args.auto_approve:
        approved = [m[0] for m in missing]
        print(render_repair_plan(data, approved_roots=approved))
        print(f"\n AUTO-APPROVED: {len(approved)} roots")
    else:
        # Interactive approval
        print(render_repair_plan(data))
        if not missing:
            # Nothing to choose from, so do not prompt.
            print(" No roots to add.")
            return 0
        print()
        print(" Enter comma-separated indices to approve (e.g. 0,1,3,5)")
        print(" Enter 'all' to approve all candidates")
        print(" Enter 'none' or empty to skip all")
        print()
        try:
            raw = input(" Approve> ").strip()
        except (EOFError, KeyboardInterrupt):
            print("\n Cancelled.")
            return 0

        if not raw or raw.lower() == "none":
            print(" No roots approved. Nothing to write.")
            return 0

        if raw.lower() == "all":
            approved = [m[0] for m in missing]
        else:
            indices, rejected = _parse_approval_indices(raw, len(missing))
            if rejected:
                print(f" Ignored invalid selection(s): {', '.join(rejected)}")
            approved = [missing[i][0] for i in indices]

    if not approved:
        print(" No roots to add.")
        return 0

    # Execute writes
    mode_label = "DRY RUN" if args.dry_run else "WRITING"
    print(f"\n {mode_label}: adding {len(approved)} roots to {args.algo_id}...\n")

    results = execute_root_additions(
        args.algo_id, approved,
        role=args.role,
        dry_run=args.dry_run,
    )

    for r in results:
        print(f" [{r['status']}] {r['message']}")

    # Summary
    written = sum(1 for r in results if r["status"] == "WRITTEN")
    skipped = sum(1 for r in results if r["status"] == "SKIP")
    errors = sum(1 for r in results if r["status"] == "ERROR")
    dry = sum(1 for r in results if r["status"] == "DRY_RUN")

    print()
    if args.dry_run:
        print(f" DRY RUN COMPLETE: {dry} roots would be added")
    else:
        print(f" COMPLETE: {written} written, {skipped} already present, {errors} errors")
        if written > 0:
            print(f" Re-run audit to verify: python3 amr_algorithm_audit.py --algo {args.algo_id}")
    print()

    # Surface failed writes to the calling shell instead of always exiting 0.
    return 1 if errors else 0


if __name__ == "__main__":
    raise SystemExit(main(sys.argv[1:]))