Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| # -*- coding: utf-8 -*- | |
| """ | |
| amr_algorithm_repair.py — Phase 1 interactive repair tool for algorithm | |
| root_map and ayah_map gaps. | |
| Wraps the audit's Rule G (distinctive vocabulary completeness) output | |
| for a single algorithm and presents an approval interface. The user | |
| reviews each candidate root, approves or rejects it, and approved | |
| roots are written to algorithm_root_map via uslap_handler.write_entry() | |
| so all 5 write-defence layers fire. | |
| The tool also identifies ayah_map gaps for Rule B — declared roots | |
| that are not attested at the mapped ayat because the ayah_map doesn't | |
| cover the ayat where they actually fire. Those are presented separately | |
| for ayah_map extension. | |
| Zero LLM. All candidate data comes from quran_word_roots + the audit | |
| engine. All writes go through the handler pipeline. | |
| Usage: | |
| python3 amr_algorithm_repair.py ALG-NUH-ARK-FLOOD # interactive | |
| python3 amr_algorithm_repair.py ALG-NUH-ARK-FLOOD --dry-run # show plan, no writes | |
| python3 amr_algorithm_repair.py ALG-NUH-ARK-FLOOD --auto-approve --dry-run | |
| # approve all, show plan | |
| python3 amr_algorithm_repair.py --list-flagged # show all flagged rows | |
| """ | |
| from __future__ import annotations | |
| import argparse | |
| import os | |
| import sqlite3 | |
| import sys | |
| from datetime import datetime | |
| from typing import Any, Dict, List, Optional, Tuple | |
# Absolute directory that contains this script.
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
# SQLite database file, expected to sit next to the script; opened by _connect().
DB_PATH = os.path.join(SCRIPT_DIR, "uslap_database_v3.db")
# Put the script directory first on the import path — presumably so the
# function-level `from amr_algorithm_audit import ...` statements resolve to
# the module beside this file (TODO confirm where amr_algorithm_audit lives).
sys.path.insert(0, SCRIPT_DIR)
def _connect() -> sqlite3.Connection:
    """Open the database at DB_PATH.

    The returned connection yields ``sqlite3.Row`` rows (columns addressable
    by name) and has ``PRAGMA journal_mode=WAL`` issued on it. The caller is
    responsible for closing it.
    """
    db = sqlite3.connect(DB_PATH)
    db.row_factory = sqlite3.Row
    db.execute("PRAGMA journal_mode=WAL")
    return db
| # ───────────────────────────────────────────────────────────────────── | |
| # CANDIDATE EXTRACTION — reuses audit logic | |
| # ───────────────────────────────────────────────────────────────────── | |
def get_missing_markers(algo_id: str) -> Dict[str, Any]:
    """Run Rule G for one algorithm and return the full candidate set.

    Args:
        algo_id: identifier looked up in ``algorithm_registry``.

    Returns:
        ``{"error": "..."}`` when ``algo_id`` is not in the registry.
        Otherwise a dict with:
            algo_id, algo_name, algo_class, primary_ayah,
            declared_roots: [(root, role, local_cnt, global_cnt, concentration)],
            missing_markers: [(root, local_cnt, global_cnt, concentration, layer_ratio)],
                sorted by concentration, then local count, both descending,
            rule_b_gaps: [(root, role)] — declared but not attested at ayat,
            ayah_map: [(surah, start, end)], with ``end`` falling back to
                ``start`` when the stored ``ayah_end`` is NULL.
    """
    from amr_algorithm_audit import (
        _connect as _audit_connect,
        _compute_root_dilution_map,
        _compute_root_global_count_map,
        DENSITY_CONCENTRATION_THRESHOLD,
        DISTINCTIVE_RATIO_THRESHOLD,
        MIN_LOCAL_COUNT_FOR_MARKER,
    )

    conn = _audit_connect()
    try:
        reg = conn.execute(
            "SELECT algo_id, algo_name, algo_class, primary_ayah "
            "FROM algorithm_registry WHERE algo_id = ?",
            (algo_id,),
        ).fetchone()
        if not reg:
            return {"error": f"algorithm {algo_id!r} not found in registry"}

        dilution_map = _compute_root_dilution_map(conn)
        global_count_map = _compute_root_global_count_map(conn)

        rm_rows = conn.execute(
            "SELECT root_letters, role FROM algorithm_root_map WHERE algo_id = ?",
            (algo_id,),
        ).fetchall()
        am_rows = conn.execute(
            "SELECT surah, ayah_start, ayah_end FROM algorithm_ayah_map "
            "WHERE algo_id = ? ORDER BY surah, ayah_start",
            (algo_id,),
        ).fetchall()

        # Resolve each range once so the ranges that are queried and the
        # ranges that are reported can never disagree (a NULL ayah_end means
        # a single-ayah range).
        ranges = [
            (
                am["surah"],
                am["ayah_start"],
                am["ayah_end"] if am["ayah_end"] is not None else am["ayah_start"],
            )
            for am in am_rows
        ]

        # Token count per root inside the mapped ranges: one grouped query per
        # range. Ranges are summed independently, so overlapping ranges count
        # their shared ayat more than once.
        local_counts: Dict[str, int] = {}
        for surah, start, end in ranges:
            rows = conn.execute(
                "SELECT root, COUNT(*) AS cnt FROM quran_word_roots "
                "WHERE surah = ? AND ayah BETWEEN ? AND ? "
                "AND root IS NOT NULL AND root != '' GROUP BY root",
                (surah, start, end),
            ).fetchall()
            for r in rows:
                if r["root"]:
                    local_counts[r["root"]] = local_counts.get(r["root"], 0) + (r["cnt"] or 0)

        # Declared roots reuse the counts gathered above instead of issuing a
        # separate COUNT query per (root, range) pair. An empty or NULL
        # root_letters value is never attested and so shows up as a Rule B gap.
        declared_set = set()
        declared_detail = []
        rule_b_gaps = []
        for rm in rm_rows:
            root = rm["root_letters"]
            role = rm["role"]
            declared_set.add(root)
            local_cnt = local_counts.get(root, 0)
            gcnt = global_count_map.get(root, 0)
            conc = local_cnt / gcnt if gcnt > 0 else 0.0
            declared_detail.append((root, role, local_cnt, gcnt, conc))
            if local_cnt == 0:
                rule_b_gaps.append((root, role))

        # Missing markers (Rule G logic): undeclared roots that are frequent
        # enough locally, concentrated enough in these ranges, and whose
        # dilution ratio is below the distinctive threshold.
        missing = []
        for root, lcnt in local_counts.items():
            if root in declared_set:
                continue
            if lcnt < MIN_LOCAL_COUNT_FOR_MARKER:
                continue
            gcnt = global_count_map.get(root, 0)
            if gcnt <= 0:
                continue
            conc = lcnt / gcnt
            if conc < DENSITY_CONCENTRATION_THRESHOLD:
                continue
            lr = dilution_map.get(root, 0.0)
            if lr >= DISTINCTIVE_RATIO_THRESHOLD:
                continue
            missing.append((root, lcnt, gcnt, conc, lr))
        missing.sort(key=lambda x: (-x[3], -x[1]))

        return {
            "algo_id": reg["algo_id"],
            "algo_name": reg["algo_name"],
            "algo_class": reg["algo_class"],
            "primary_ayah": reg["primary_ayah"],
            "declared_roots": declared_detail,
            "missing_markers": missing,
            "rule_b_gaps": rule_b_gaps,
            "ayah_map": ranges,
        }
    finally:
        conn.close()
def get_root_ayat(root: str) -> List[Tuple[int, int, int]]:
    """List every ayah in quran_word_roots that contains *root*.

    Each entry is ``(surah, ayah, count)``, where ``count`` is the number of
    matching rows in that ayah; entries are ordered by surah, then ayah.
    """
    query = (
        "SELECT surah, ayah, COUNT(*) AS cnt FROM quran_word_roots "
        "WHERE root = ? GROUP BY surah, ayah ORDER BY surah, ayah"
    )
    db = _connect()
    try:
        hits = []
        for rec in db.execute(query, (root,)):
            hits.append((rec["surah"], rec["ayah"], rec["cnt"]))
        return hits
    finally:
        db.close()
| # ───────────────────────────────────────────────────────────────────── | |
| # REPAIR PLAN RENDERER | |
| # ───────────────────────────────────────────────────────────────────── | |
def render_repair_plan(
    data: Dict[str, Any],
    approved_roots: Optional[List[str]] = None,
) -> str:
    """Render the full repair plan for review.

    Args:
        data: a successful result dict from get_missing_markers(); the keys
            read here are algo_id, algo_name, algo_class, primary_ayah,
            declared_roots, rule_b_gaps, missing_markers and ayah_map.
        approved_roots: when given, every Rule G candidate is tagged
            ``[APPROVED]`` or ``[SKIPPED]`` according to membership; when
            ``None`` no tag is shown.

    Returns:
        The plan as one newline-joined string.
    """
    rule = "=" * 72
    # Set lookup instead of scanning the list once per candidate row.
    approved = None if approved_roots is None else set(approved_roots)

    lines = [
        "",
        rule,
        f"REPAIR PLAN: {data['algo_id']}",
        f" name: {data['algo_name']}",
        f" class: {data['algo_class']}",
        f" anchor: {data['primary_ayah']}",
        rule,
    ]

    # Current declared roots
    lines.append("")
    lines.append(f" CURRENT ROOT_MAP ({len(data['declared_roots'])} roots):")
    for root, role, lcnt, gcnt, conc in data["declared_roots"]:
        tag = "OK" if lcnt > 0 else "RULE_B_GAP"
        # str(): a None root/role (NULL column) does not support an alignment
        # format spec and would raise TypeError; render it as "None", the
        # same way the Rule B gap lines below print a None role.
        lines.append(
            f" {str(root):<14} role={str(role):<10} local={lcnt:<3} "
            f"global={gcnt:<5} conc={conc:.2f} [{tag}]"
        )

    # Rule B gaps
    if data["rule_b_gaps"]:
        lines.append("")
        lines.append(f" RULE B GAPS ({len(data['rule_b_gaps'])} declared roots NOT attested at ayat):")
        for root, role in data["rule_b_gaps"]:
            lines.append(f" {str(root):<14} role={role}")
        lines.append(" FIX: extend ayah_map to cover ranges where this root fires")

    # Missing markers (Rule G)
    missing = data["missing_markers"]
    lines.append("")
    lines.append(f" RULE G CANDIDATES ({len(missing)} distinctive markers to add):")
    for i, (root, lcnt, gcnt, conc, lr) in enumerate(missing):
        status = ""
        if approved is not None:
            status = " [APPROVED]" if root in approved else " [SKIPPED]"
        lines.append(
            f" [{i:>2}] {root:<14} local={lcnt:<3} global={gcnt:<5} "
            f"conc={conc:.2f} layer_r={lr:.2f}{status}"
        )

    # Ayah map
    lines.append("")
    lines.append(f" CURRENT AYAH_MAP ({len(data['ayah_map'])} ranges):")
    for s, a_s, a_e in data["ayah_map"]:
        lines.append(f" Q{s}:{a_s}-{a_e}")

    lines.append("")
    lines.append(rule)
    return "\n".join(lines)
| # ───────────────────────────────────────────────────────────────────── | |
| # WRITE EXECUTION | |
| # ───────────────────────────────────────────────────────────────────── | |
def execute_root_additions(
    algo_id: str,
    roots_to_add: List[str],
    role: str = "PRIMARY",
    dry_run: bool = False,
) -> List[Dict[str, Any]]:
    """Add approved roots to algorithm_root_map and resync the registry count.

    NOTE(review): the module docstring states that writes go through
    uslap_handler.write_entry(); this function issues INSERT/UPDATE statements
    directly on the SQLite connection. Confirm whether the handler pipeline
    should be used here instead.

    Args:
        algo_id: algorithm whose root_map is extended.
        roots_to_add: roots to insert; an empty list is a no-op.
        role: value stored in the ``role`` column of each new row.
        dry_run: when true, nothing is read or written; one ``DRY_RUN``
            result is produced per requested root.

    Returns:
        One dict per requested root with keys ``root``, ``status``
        (``DRY_RUN``, ``SKIP``, ``WRITTEN`` or ``ERROR``) and ``message``.
    """
    if not roots_to_add:
        return []

    if dry_run:
        return [
            {
                "root": root,
                "status": "DRY_RUN",
                "message": f"would add {root} to {algo_id} root_map with role={role}",
            }
            for root in roots_to_add
        ]

    results: List[Dict[str, Any]] = []
    conn = _connect()
    try:
        for root in roots_to_add:
            # Check not already present
            existing = conn.execute(
                "SELECT 1 FROM algorithm_root_map WHERE algo_id = ? AND root_letters = ?",
                (algo_id, root),
            ).fetchone()
            if existing:
                results.append({
                    "root": root,
                    "status": "SKIP",
                    "message": f"{root} already in {algo_id} root_map",
                })
                continue

            # Whole-corpus token count for this root, stored with the new row.
            tcnt = conn.execute(
                "SELECT COUNT(*) FROM quran_word_roots WHERE root = ?",
                (root,),
            ).fetchone()[0]

            # Each root is committed on its own so that one failing insert
            # does not discard the roots already written.
            try:
                conn.execute(
                    "INSERT INTO algorithm_root_map "
                    "(algo_id, root_letters, role, token_count, notes, created_date) "
                    "VALUES (?, ?, ?, ?, ?, ?)",
                    (algo_id, root, role, tcnt,
                     "Phase 1 repair: Rule G distinctive gap fill",
                     datetime.now().isoformat(timespec="seconds")),
                )
                conn.commit()
            except sqlite3.Error as e:
                conn.rollback()
                results.append({
                    "root": root,
                    "status": "ERROR",
                    "message": str(e),
                })
            else:
                results.append({
                    "root": root,
                    "status": "WRITTEN",
                    "message": f"added {root} to {algo_id} root_map (role={role}, tokens={tcnt})",
                })

        # Resync root_count in algorithm_registry on the same connection; this
        # runs even when every root was skipped or failed.
        new_count = conn.execute(
            "SELECT COUNT(*) FROM algorithm_root_map WHERE algo_id = ?",
            (algo_id,),
        ).fetchone()[0]
        conn.execute(
            "UPDATE algorithm_registry SET root_count = ? WHERE algo_id = ?",
            (new_count, algo_id),
        )
        conn.commit()
    finally:
        conn.close()
    return results
| # ───────────────────────────────────────────────────────────────────── | |
| # LIST FLAGGED | |
| # ───────────────────────────────────────────────────────────────────── | |
def list_flagged() -> str:
    """List all algorithms that would benefit from repair (Rule B + G).

    Audits every row returned by ``collect_named_algorithms`` and keeps those
    whose ``B_root_coverage`` or ``G_distinctive_gap`` verdict is ``FAIL`` or
    ``WARN``.

    Returns:
        A newline-joined table of flagged algorithms followed by a total.
    """
    conn = _connect()
    try:
        from amr_algorithm_audit import (
            collect_named_algorithms,
            audit_algorithm,
            _compute_root_dilution_map,
            _compute_root_global_count_map,
        )

        # Computed once and passed to every audit_algorithm() call.
        dilution_map = _compute_root_dilution_map(conn)
        global_count_map = _compute_root_global_count_map(conn)
        targets = collect_named_algorithms(conn)

        lines = ["", "FLAGGED ALGORITHMS (Rule B FAIL/WARN or Rule G FAIL/WARN):", ""]
        lines.append(f"{'algo_id':<40} {'class':<14} {'B':>4} {'G':>4}")
        lines.append("-" * 68)

        count = 0
        for row in targets:
            result = audit_algorithm(conn, row, dilution_map=dilution_map,
                                     global_count_map=global_count_map)
            # NOTE(review): the "β" fallback looks like a mis-encoded dash
            # placeholder — confirm the intended character.
            b = result["checks"].get("B_root_coverage", {}).get("verdict", "β")
            g = result["checks"].get("G_distinctive_gap", {}).get("verdict", "β")
            if b in ("FAIL", "WARN") or g in ("FAIL", "WARN"):
                # str(): a None value does not support an alignment format
                # spec and would raise TypeError.
                lines.append(
                    f" {str(result['algo_id']):<38} {str(result['algo_class']):<14} "
                    f"{str(b):>4} {str(g):>4}"
                )
                count += 1

        lines.append("")
        lines.append(f"Total: {count} algorithms need repair work")
        lines.append("")
        return "\n".join(lines)
    finally:
        conn.close()
| # ───────────────────────────────────────────────────────────────────── | |
| # CLI | |
| # ───────────────────────────────────────────────────────────────────── | |
def _parse_index_selection(raw: str, limit: int) -> Tuple[List[int], List[str]]:
    """Split a comma-separated index list into valid indices and rejected tokens.

    Valid indices are decimal integers in ``[0, limit)``; they are returned in
    input order with duplicates removed. Everything else that is non-empty is
    returned in the rejected list.
    """
    chosen: List[int] = []
    rejected: List[str] = []
    for token in raw.split(","):
        token = token.strip()
        if not token:
            continue
        # isdecimal(), not isdigit(): isdigit() also accepts characters such
        # as superscript digits that int() cannot parse.
        if token.isdecimal() and 0 <= int(token) < limit:
            idx = int(token)
            if idx not in chosen:
                chosen.append(idx)
        else:
            rejected.append(token)
    return chosen, rejected


def main(argv: List[str]) -> int:
    """Command-line entry point.

    Args:
        argv: command-line arguments without the program name.

    Returns:
        Process exit status: 0 on success or when nothing was approved,
        1 when the algorithm is not found or at least one write failed.
        Argument errors exit through argparse (status 2).
    """
    p = argparse.ArgumentParser(prog="amr_algorithm_repair")
    p.add_argument("algo_id", nargs="?", default=None,
                   help="algorithm to repair (e.g. ALG-NUH-ARK-FLOOD)")
    p.add_argument("--dry-run", action="store_true",
                   help="show plan and approved writes but do not execute")
    p.add_argument("--auto-approve", action="store_true",
                   help="approve ALL Rule G candidates (skip interactive)")
    p.add_argument("--list-flagged", action="store_true",
                   help="list all algorithms that need repair, then exit")
    p.add_argument("--role", default="PRIMARY",
                   choices=["PRIMARY", "SUPPORT"],
                   help="role for newly added roots (default: PRIMARY)")
    args = p.parse_args(argv)

    if args.list_flagged:
        print(list_flagged())
        return 0

    if not args.algo_id:
        p.error("algo_id is required (or use --list-flagged)")

    data = get_missing_markers(args.algo_id)
    if "error" in data:
        print(f"Error: {data['error']}", file=sys.stderr)
        return 1

    missing = data["missing_markers"]

    if args.auto_approve:
        approved = [m[0] for m in missing]
        print(render_repair_plan(data, approved_roots=approved))
        print(f"\n AUTO-APPROVED: {len(approved)} roots")
    else:
        # Interactive approval
        print(render_repair_plan(data))
        print()
        print(" Enter comma-separated indices to approve (e.g. 0,1,3,5)")
        print(" Enter 'all' to approve all candidates")
        print(" Enter 'none' or empty to skip all")
        print()
        try:
            raw = input(" Approve> ").strip()
        except (EOFError, KeyboardInterrupt):
            print("\n Cancelled.")
            return 0

        if not raw or raw.lower() == "none":
            print(" No roots approved. Nothing to write.")
            return 0

        if raw.lower() == "all":
            approved = [m[0] for m in missing]
        else:
            indices, rejected = _parse_index_selection(raw, len(missing))
            if rejected:
                # Tell the user what was dropped instead of ignoring it silently.
                print(f" Ignored invalid entries: {', '.join(rejected)}")
            approved = [missing[i][0] for i in indices]

    if not approved:
        print(" No roots to add.")
        return 0

    # Execute writes
    mode_label = "DRY RUN" if args.dry_run else "WRITING"
    print(f"\n {mode_label}: adding {len(approved)} roots to {args.algo_id}...\n")
    results = execute_root_additions(
        args.algo_id,
        approved,
        role=args.role,
        dry_run=args.dry_run,
    )
    for r in results:
        print(f" [{r['status']}] {r['message']}")

    # Summary
    written = sum(1 for r in results if r["status"] == "WRITTEN")
    skipped = sum(1 for r in results if r["status"] == "SKIP")
    errors = sum(1 for r in results if r["status"] == "ERROR")
    dry = sum(1 for r in results if r["status"] == "DRY_RUN")
    print()
    if args.dry_run:
        print(f" DRY RUN COMPLETE: {dry} roots would be added")
    else:
        print(f" COMPLETE: {written} written, {skipped} already present, {errors} errors")
        if written > 0:
            print(f" Re-run audit to verify: python3 amr_algorithm_audit.py --algo {args.algo_id}")
    print()
    # Failed writes must be visible to calling scripts, not only in the text.
    return 1 if errors else 0
# Script entry point: forward the CLI arguments (minus the program name) to
# main() and use its return value as the process exit status.
if __name__ == "__main__":
    sys.exit(main(sys.argv[1:]))