uslap-query / Code_files /amr_algorithm_repair.py
uslap's picture
Upload folder using huggingface_hub
7cc8e29 verified
Raw
History Blame Contribute Delete
18.7 kB
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
amr_algorithm_repair.py — Phase 1 interactive repair tool for algorithm
root_map and ayah_map gaps.
Wraps the audit's Rule G (distinctive vocabulary completeness) output
for a single algorithm and presents an approval interface. The user
reviews each candidate root, approves or rejects it, and approved
roots are written to algorithm_root_map via uslap_handler.write_entry()
so all 5 write-defence layers fire.
The tool also identifies ayah_map gaps for Rule B — declared roots
that are not attested at the mapped ayat because the ayah_map doesn't
cover the ayat where they actually fire. Those are presented separately
for ayah_map extension.
Zero LLM. All candidate data comes from quran_word_roots + the audit
engine. All writes go through the handler pipeline.
Usage:
python3 amr_algorithm_repair.py ALG-NUH-ARK-FLOOD # interactive
python3 amr_algorithm_repair.py ALG-NUH-ARK-FLOOD --dry-run # show plan, no writes
python3 amr_algorithm_repair.py ALG-NUH-ARK-FLOOD --auto-approve --dry-run
# approve all, show plan
python3 amr_algorithm_repair.py --list-flagged # show all flagged rows
"""
from __future__ import annotations
import argparse
import os
import sqlite3
import sys
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple
# Absolute directory containing this script; DB_PATH below is resolved against it.
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
# SQLite database file opened by _connect().
DB_PATH = os.path.join(SCRIPT_DIR, "uslap_database_v3.db")
# Put the script directory first on the import path.
# NOTE(review): presumably so the function-level `amr_algorithm_audit` imports
# resolve to a sibling file regardless of the working directory — confirm.
sys.path.insert(0, SCRIPT_DIR)
def _connect() -> sqlite3.Connection:
    """Open a new connection to the database at ``DB_PATH``.

    Returns:
        A connection whose rows are ``sqlite3.Row`` objects and on which
        ``PRAGMA journal_mode=WAL`` has been issued. The caller closes it.
    """
    db = sqlite3.connect(DB_PATH)
    # Request write-ahead logging for this database.
    db.execute("PRAGMA journal_mode=WAL")
    # sqlite3.Row lets callers read columns by name (row["surah"]).
    db.row_factory = sqlite3.Row
    return db
# ─────────────────────────────────────────────────────────────────────
# CANDIDATE EXTRACTION — reuses audit logic
# ─────────────────────────────────────────────────────────────────────
def get_missing_markers(algo_id: str) -> Dict[str, Any]:
    """Run Rule G for one algorithm and return the full candidate set.

    Args:
        algo_id: Identifier looked up in ``algorithm_registry``.

    Returns:
        ``{"error": <message>}`` when ``algo_id`` is not in the registry,
        otherwise a dict with:
            algo_id, algo_name, algo_class, primary_ayah,
            declared_roots: [(root, role, local_cnt, global_cnt, concentration)],
            missing_markers: [(root, local_cnt, global_cnt, concentration, layer_ratio)],
                sorted by concentration, then local count, both descending,
            rule_b_gaps: [(root, role)] — declared but not attested at ayat,
            ayah_map: [(surah, start, end)], with a NULL end replaced by start.
    """
    from amr_algorithm_audit import (
        _connect as _audit_connect,
        _compute_root_dilution_map,
        _compute_root_global_count_map,
        DENSITY_CONCENTRATION_THRESHOLD,
        DISTINCTIVE_RATIO_THRESHOLD,
        MIN_LOCAL_COUNT_FOR_MARKER,
    )

    conn = _audit_connect()
    try:
        reg = conn.execute(
            "SELECT algo_id, algo_name, algo_class, primary_ayah "
            "FROM algorithm_registry WHERE algo_id = ?",
            (algo_id,),
        ).fetchone()
        if not reg:
            return {"error": f"algorithm {algo_id!r} not found in registry"}

        dilution_map = _compute_root_dilution_map(conn)
        global_count_map = _compute_root_global_count_map(conn)

        rm_rows = conn.execute(
            "SELECT root_letters, role FROM algorithm_root_map WHERE algo_id = ?",
            (algo_id,),
        ).fetchall()
        am_rows = conn.execute(
            "SELECT surah, ayah_start, ayah_end FROM algorithm_ayah_map "
            "WHERE algo_id = ? ORDER BY surah, ayah_start",
            (algo_id,),
        ).fetchall()

        # Normalise each range once (NULL end -> single-ayah range) so the
        # ranges that are queried and the ranges that are reported back are
        # guaranteed to be the same.
        ranges = [
            (
                am["surah"],
                am["ayah_start"],
                am["ayah_end"] if am["ayah_end"] is not None else am["ayah_start"],
            )
            for am in am_rows
        ]

        # Declared roots, each with its attestation count inside the mapped ranges.
        declared_set = set()
        declared_detail = []
        rule_b_gaps = []
        for rm in rm_rows:
            root = rm["root_letters"]
            role = rm["role"]
            declared_set.add(root)
            local_cnt = sum(
                conn.execute(
                    "SELECT COUNT(*) FROM quran_word_roots "
                    "WHERE root = ? AND surah = ? AND ayah BETWEEN ? AND ?",
                    (root, surah, start, end),
                ).fetchone()[0]
                for surah, start, end in ranges
            )
            gcnt = global_count_map.get(root, 0)
            conc = local_cnt / gcnt if gcnt > 0 else 0
            declared_detail.append((root, role, local_cnt, gcnt, conc))
            # A declared root that never occurs in the mapped ranges is a Rule B gap.
            if local_cnt == 0:
                rule_b_gaps.append((root, role))

        # Per-root token counts across all mapped ranges (Rule G input).
        local_counts: Dict[str, int] = {}
        for surah, start, end in ranges:
            rows = conn.execute(
                "SELECT root, COUNT(*) AS cnt FROM quran_word_roots "
                "WHERE surah = ? AND ayah BETWEEN ? AND ? "
                "AND root IS NOT NULL AND root != '' GROUP BY root",
                (surah, start, end),
            ).fetchall()
            for r in rows:
                if r["root"]:
                    local_counts[r["root"]] = (
                        local_counts.get(r["root"], 0) + (r["cnt"] or 0)
                    )

        # Keep undeclared roots that are frequent enough locally, concentrated
        # enough relative to their global count, and below the ratio threshold.
        missing = []
        for root, lcnt in local_counts.items():
            if root in declared_set:
                continue
            if lcnt < MIN_LOCAL_COUNT_FOR_MARKER:
                continue
            gcnt = global_count_map.get(root, 0)
            if gcnt <= 0:
                continue
            conc = lcnt / gcnt
            if conc < DENSITY_CONCENTRATION_THRESHOLD:
                continue
            lr = dilution_map.get(root, 0.0)
            if lr >= DISTINCTIVE_RATIO_THRESHOLD:
                continue
            missing.append((root, lcnt, gcnt, conc, lr))
        missing.sort(key=lambda x: (-x[3], -x[1]))

        return {
            "algo_id": reg["algo_id"],
            "algo_name": reg["algo_name"],
            "algo_class": reg["algo_class"],
            "primary_ayah": reg["primary_ayah"],
            "declared_roots": declared_detail,
            "missing_markers": missing,
            "rule_b_gaps": rule_b_gaps,
            "ayah_map": ranges,
        }
    finally:
        conn.close()
def get_root_ayat(root: str) -> List[Tuple[int, int, int]]:
    """List every ayah in which *root* occurs.

    Returns:
        ``(surah, ayah, count)`` tuples ordered by surah then ayah, where
        ``count`` is the number of ``quran_word_roots`` rows for that ayah.
    """
    query = (
        "SELECT surah, ayah, COUNT(*) AS cnt FROM quran_word_roots "
        "WHERE root = ? GROUP BY surah, ayah ORDER BY surah, ayah"
    )
    db = _connect()
    try:
        hits = []
        for rec in db.execute(query, (root,)):
            hits.append((rec["surah"], rec["ayah"], rec["cnt"]))
        return hits
    finally:
        db.close()
# ─────────────────────────────────────────────────────────────────────
# REPAIR PLAN RENDERER
# ─────────────────────────────────────────────────────────────────────
def render_repair_plan(
    data: Dict[str, Any],
    approved_roots: Optional[List[str]] = None,
) -> str:
    """Render the full repair plan for review.

    Args:
        data: Result dict in the shape produced by ``get_missing_markers``
            (keys read here: algo_id, algo_name, algo_class, primary_ayah,
            declared_roots, rule_b_gaps, missing_markers, ayah_map).
        approved_roots: When given, each Rule G candidate is suffixed with
            ``[APPROVED]`` or ``[SKIPPED]`` depending on membership. When
            ``None`` no status suffix is printed.

    Returns:
        The plan as a single newline-joined string.
    """
    rule = "=" * 72
    lines = [
        "",
        rule,
        f"REPAIR PLAN: {data['algo_id']}",
        f" name: {data['algo_name']}",
        f" class: {data['algo_class']}",
        f" anchor: {data['primary_ayah']}",
        rule,
    ]

    # Current declared roots. root/role are passed through str() because a
    # NULL column arrives as None, and None rejects padded format specs
    # such as ``:<10`` with a TypeError.
    lines.append("")
    lines.append(f" CURRENT ROOT_MAP ({len(data['declared_roots'])} roots):")
    for root, role, lcnt, gcnt, conc in data["declared_roots"]:
        tag = "OK" if lcnt > 0 else "RULE_B_GAP"
        lines.append(
            f" {str(root):<14} role={str(role):<10} local={lcnt:<3} "
            f"global={gcnt:<5} conc={conc:.2f} [{tag}]"
        )

    # Rule B gaps
    if data["rule_b_gaps"]:
        lines.append("")
        lines.append(
            f" RULE B GAPS ({len(data['rule_b_gaps'])} declared roots NOT attested at ayat):"
        )
        for root, role in data["rule_b_gaps"]:
            lines.append(f" {str(root):<14} role={role}")
            lines.append(" FIX: extend ayah_map to cover ranges where this root fires")

    # Missing markers (Rule G)
    missing = data["missing_markers"]
    # A set gives constant-time membership tests while rendering candidates.
    approved = None if approved_roots is None else set(approved_roots)
    lines.append("")
    lines.append(f" RULE G CANDIDATES ({len(missing)} distinctive markers to add):")
    for i, (root, lcnt, gcnt, conc, lr) in enumerate(missing):
        status = ""
        if approved is not None:
            status = " [APPROVED]" if root in approved else " [SKIPPED]"
        lines.append(
            f" [{i:>2}] {root:<14} local={lcnt:<3} global={gcnt:<5} "
            f"conc={conc:.2f} layer_r={lr:.2f}{status}"
        )

    # Ayah map
    lines.append("")
    lines.append(f" CURRENT AYAH_MAP ({len(data['ayah_map'])} ranges):")
    for s, a_s, a_e in data["ayah_map"]:
        lines.append(f" Q{s}:{a_s}-{a_e}")

    lines.append("")
    lines.append(rule)
    return "\n".join(lines)
# ─────────────────────────────────────────────────────────────────────
# WRITE EXECUTION
# ─────────────────────────────────────────────────────────────────────
def execute_root_additions(
    algo_id: str,
    roots_to_add: List[str],
    role: str = "PRIMARY",
    dry_run: bool = False,
) -> List[Dict[str, Any]]:
    """Insert approved roots into ``algorithm_root_map`` and resync the registry count.

    NOTE(review): the file header describes writes going through
    ``uslap_handler.write_entry()``, but this function issues a direct SQL
    INSERT on a connection from ``_connect()`` — confirm which is intended.

    Args:
        algo_id: Algorithm whose root map is extended.
        roots_to_add: Roots to insert, processed in order.
        role: Value stored in the ``role`` column of each new row.
        dry_run: When true, nothing is read from or written to the database;
            one ``DRY_RUN`` result is produced per root.

    Returns:
        One dict per root with keys ``root``, ``status`` and ``message``.
        ``status`` is ``DRY_RUN``, ``SKIP`` (already mapped), ``WRITTEN`` or
        ``ERROR``. An empty ``roots_to_add`` yields an empty list without
        touching the database.
    """
    results: List[Dict[str, Any]] = []
    if not roots_to_add:
        return results

    if dry_run:
        return [
            {
                "root": root,
                "status": "DRY_RUN",
                "message": f"would add {root} to {algo_id} root_map with role={role}",
            }
            for root in roots_to_add
        ]

    conn = _connect()
    try:
        for root in roots_to_add:
            # Skip roots that are already mapped to this algorithm.
            existing = conn.execute(
                "SELECT 1 FROM algorithm_root_map WHERE algo_id = ? AND root_letters = ?",
                (algo_id, root),
            ).fetchone()
            if existing:
                results.append({
                    "root": root,
                    "status": "SKIP",
                    "message": f"{root} already in {algo_id} root_map",
                })
                continue

            # Total number of quran_word_roots rows for this root.
            tcnt = conn.execute(
                "SELECT COUNT(*) FROM quran_word_roots WHERE root = ?",
                (root,),
            ).fetchone()[0]

            try:
                conn.execute(
                    "INSERT INTO algorithm_root_map "
                    "(algo_id, root_letters, role, token_count, notes, created_date) "
                    "VALUES (?, ?, ?, ?, ?, ?)",
                    (
                        algo_id,
                        root,
                        role,
                        tcnt,
                        "Phase 1 repair: Rule G distinctive gap fill",
                        datetime.now().isoformat(timespec="seconds"),
                    ),
                )
                # Commit per root so one failing insert cannot undo earlier ones.
                conn.commit()
                results.append({
                    "root": root,
                    "status": "WRITTEN",
                    "message": f"added {root} to {algo_id} root_map (role={role}, tokens={tcnt})",
                })
            except Exception as e:
                # Deliberately broad: a failed insert is recorded as an ERROR
                # result and the remaining roots are still attempted.
                conn.rollback()
                results.append({
                    "root": root,
                    "status": "ERROR",
                    "message": str(e),
                })

        # Resync algorithm_registry.root_count with the actual number of mapped
        # roots (runs even when every root was skipped or failed).
        new_count = conn.execute(
            "SELECT COUNT(*) FROM algorithm_root_map WHERE algo_id = ?",
            (algo_id,),
        ).fetchone()[0]
        conn.execute(
            "UPDATE algorithm_registry SET root_count = ? WHERE algo_id = ?",
            (new_count, algo_id),
        )
        conn.commit()
    finally:
        conn.close()
    return results
# ─────────────────────────────────────────────────────────────────────
# LIST FLAGGED
# ─────────────────────────────────────────────────────────────────────
def list_flagged() -> str:
    """List all algorithms that would benefit from repair (Rule B + G).

    Every algorithm returned by ``collect_named_algorithms`` is audited. Those
    whose ``B_root_coverage`` or ``G_distinctive_gap`` verdict is ``FAIL`` or
    ``WARN`` are listed, one per line, followed by a total.

    Returns:
        The report as a single newline-joined string.
    """
    # Import before connecting so a missing audit module never opens the DB.
    from amr_algorithm_audit import (
        collect_named_algorithms,
        audit_algorithm,
        _compute_root_dilution_map,
        _compute_root_global_count_map,
    )

    flagged_verdicts = ("FAIL", "WARN")
    no_verdict = "—"  # shown when the audit result has no entry for a check

    conn = _connect()
    try:
        # Computed once and handed to every audit_algorithm() call.
        dilution_map = _compute_root_dilution_map(conn)
        global_count_map = _compute_root_global_count_map(conn)
        targets = collect_named_algorithms(conn)

        lines = ["", "FLAGGED ALGORITHMS (Rule B FAIL/WARN or Rule G FAIL/WARN):", ""]
        lines.append(f"{'algo_id':<40} {'class':<14} {'B':>4} {'G':>4}")
        lines.append("-" * 68)

        count = 0
        for row in targets:
            result = audit_algorithm(
                conn,
                row,
                dilution_map=dilution_map,
                global_count_map=global_count_map,
            )
            checks = result["checks"]
            b = checks.get("B_root_coverage", {}).get("verdict", no_verdict)
            g = checks.get("G_distinctive_gap", {}).get("verdict", no_verdict)
            if b in flagged_verdicts or g in flagged_verdicts:
                lines.append(
                    f" {result['algo_id']:<38} {result['algo_class']:<14} "
                    f"{b:>4} {g:>4}"
                )
                count += 1

        lines.append("")
        lines.append(f"Total: {count} algorithms need repair work")
        lines.append("")
        return "\n".join(lines)
    finally:
        conn.close()
# ─────────────────────────────────────────────────────────────────────
# CLI
# ─────────────────────────────────────────────────────────────────────
def _parse_approval_indices(raw: str, count: int) -> Tuple[List[int], List[str]]:
    """Split a comma-separated selection into valid candidate indices and rejected tokens."""
    chosen: List[int] = []
    rejected: List[str] = []
    for token in (piece.strip() for piece in raw.split(",")):
        if not token:
            continue  # tolerate stray or trailing commas
        # isdecimal() only accepts characters int() can parse; isdigit() also
        # accepts e.g. superscript digits, on which int() raises ValueError.
        if token.isdecimal() and int(token) < count:
            idx = int(token)
            if idx not in chosen:  # an index typed twice is approved once
                chosen.append(idx)
        else:
            rejected.append(token)
    return chosen, rejected


def main(argv: List[str]) -> int:
    """Command-line entry point.

    Args:
        argv: Command-line arguments without the program name.

    Returns:
        Process exit status: 0 on success, cancellation, or when there is
        nothing to write; 1 when the algorithm is unknown or at least one
        write failed. Invalid command lines exit through argparse (status 2).
    """
    p = argparse.ArgumentParser(prog="amr_algorithm_repair")
    p.add_argument("algo_id", nargs="?", default=None,
                   help="algorithm to repair (e.g. ALG-NUH-ARK-FLOOD)")
    p.add_argument("--dry-run", action="store_true",
                   help="show plan and approved writes but do not execute")
    p.add_argument("--auto-approve", action="store_true",
                   help="approve ALL Rule G candidates (skip interactive)")
    p.add_argument("--list-flagged", action="store_true",
                   help="list all algorithms that need repair, then exit")
    p.add_argument("--role", default="PRIMARY",
                   choices=["PRIMARY", "SUPPORT"],
                   help="role for newly added roots (default: PRIMARY)")
    args = p.parse_args(argv)

    if args.list_flagged:
        print(list_flagged())
        return 0
    if not args.algo_id:
        p.error("algo_id is required (or use --list-flagged)")

    data = get_missing_markers(args.algo_id)
    if "error" in data:
        print(f"Error: {data['error']}", file=sys.stderr)
        return 1
    missing = data["missing_markers"]

    if args.auto_approve:
        approved = [m[0] for m in missing]
        print(render_repair_plan(data, approved_roots=approved))
        print(f"\n AUTO-APPROVED: {len(approved)} roots")
    else:
        # Interactive approval
        print(render_repair_plan(data))
        print()
        print(" Enter comma-separated indices to approve (e.g. 0,1,3,5)")
        print(" Enter 'all' to approve all candidates")
        print(" Enter 'none' or empty to skip all")
        print()
        try:
            raw = input(" Approve> ").strip()
        except (EOFError, KeyboardInterrupt):
            print("\n Cancelled.")
            return 0
        if not raw or raw.lower() == "none":
            print(" No roots approved. Nothing to write.")
            return 0
        if raw.lower() == "all":
            approved = [m[0] for m in missing]
        else:
            indices, rejected = _parse_approval_indices(raw, len(missing))
            if rejected:
                # Report what was dropped instead of discarding it silently.
                print(f" Ignored invalid selection(s): {', '.join(rejected)}")
            approved = [missing[i][0] for i in indices]

    if not approved:
        print(" No roots to add.")
        return 0

    # Execute writes
    mode_label = "DRY RUN" if args.dry_run else "WRITING"
    print(f"\n {mode_label}: adding {len(approved)} roots to {args.algo_id}...\n")
    results = execute_root_additions(
        args.algo_id,
        approved,
        role=args.role,
        dry_run=args.dry_run,
    )
    for r in results:
        print(f" [{r['status']}] {r['message']}")

    # Summary: tally results by status.
    tally: Dict[str, int] = {}
    for r in results:
        tally[r["status"]] = tally.get(r["status"], 0) + 1
    written = tally.get("WRITTEN", 0)
    skipped = tally.get("SKIP", 0)
    errors = tally.get("ERROR", 0)
    dry = tally.get("DRY_RUN", 0)

    print()
    if args.dry_run:
        print(f" DRY RUN COMPLETE: {dry} roots would be added")
    else:
        print(f" COMPLETE: {written} written, {skipped} already present, {errors} errors")
        if written > 0:
            print(f" Re-run audit to verify: python3 amr_algorithm_audit.py --algo {args.algo_id}")
    print()
    # A failed write must not look like success to a calling shell or script.
    return 1 if errors else 0
# Script entry point: run the CLI on the command-line arguments (program name
# dropped) and use main()'s return value as the process exit status.
if __name__ == "__main__":
    raise SystemExit(main(sys.argv[1:]))