#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ amr_algorithm_audit.py — Phase 0 READ-ONLY audit of algorithm_registry. Runs the Unit 3C hard rules against every NAMED (non-block-placeholder) algorithm in algorithm_registry, producing a per-algorithm verdict and a fleet-wide summary. ZERO writes. ZERO LLM. Pure SQL analysis. Purpose: Before extending the algorithm layer (Phase 2), audit the existing 86 named rows to discover which ones have: - sparse root_maps (roots declared in Rule B but not attested at the algorithm's ayat in quran_word_roots) - primary_ayah pointing outside the ayah_map - non-canonical role values - algo_name that doesn't reference a Qur'anic anchor These are the prerequisites for any extension work. Hard rules audited: Rule A — NAME FORMAT For non-composite rows: algo_name / algo_id must reference a Qur'anic-anchored term (named figure, named surah, named concept, or BINARY-pair structural keyword). For composite rows (is_composite=1): exempt; composites may name historical instantiations of Qur'anic patterns. Rule B — ROOT COVERAGE (the strictest check) For every root in algorithm_root_map, verify that the root has at least one token in quran_word_roots within an ayah covered by this algorithm's ayah_map. Missing roots are the single most important finding — they reveal algorithms whose declared root signatures are not attested at their own ayat. Rule C — PRIMARY_AYAH ALIGNMENT primary_ayah must fall within at least one ayah_map range for this algorithm. A primary_ayah outside the ayat is a pointer error, not a content error. Rule D — ROLE VOCABULARY Every role value in algorithm_root_map must be in {PRIMARY, SUPPORT, BINARY_A, BINARY_B}. Other values are schema drift. Rule E — FLEET STATUS Fleet-wide tally of status and quf_pass values. Not a per-row check; reported as summary only. Rule F — PATTERN SPECIFICITY (added Session 42) For every root in algorithm_root_map, compute the "algorithm presence ratio" — the fraction of named algorithms that declare this root. A root with presence >= 15% has low discriminating power (it appears in many algorithms, so it cannot be a distinctive marker of any single one). A root with presence >= 30% is layer-level dilution. For each algorithm: - At least one root must be distinctive (ratio < 0.15). - If zero distinctive roots → FAIL. - If minority distinctive (< 50%) → WARN. - Else → PASS. This catches root_maps that are populated with layer-common particles only. Measured AGAINST THE NAMED ALGORITHM CORPUS, not against the full Qur'anic token count — a root can be Qur'an-frequent but still algorithm-distinctive. Rule G — DISTINCTIVE VOCABULARY COMPLETENESS (added Session 42) For each algorithm with an ayah_map, find content roots that fire at the mapped ayat with high DENSITY CONCENTRATION but are not declared in the algorithm's root_map. A root qualifies as a "missing marker" iff ALL of: (a) local_count at ayat >= 2 (b) local_count / total_qur_tokens >= 0.10 (≥10% of the root's total Qur'anic occurrences happen at this algorithm's ayat — the root is concentrated at these ayat, not merely present. Tuned from 0.15 after spot-check showed 0.15 missed ف-ل-ك at NUH.) (c) algorithm-layer presence ratio < 0.15 (distinctive within the named algorithm corpus) (d) root is not already in the algorithm's root_map The density-concentration gate filters out grammatical particles and ubiquitous function words (م-ن, ل-أ, إ-ن, ق-و-ل, أ-ل-ه, etc.) which have thousands of Qur'anic tokens and would otherwise flood the results. Only genuine pattern markers pass all four gates. Verdicts: - missing_count > declared_count → FAIL - missing_count >= max(3, declared/2) → WARN - else → PASS Usage: python3 amr_algorithm_audit.py # full audit, stdout python3 amr_algorithm_audit.py --algo ALG-NUH-ARK-FLOOD python3 amr_algorithm_audit.py --class OPERATOR python3 amr_algorithm_audit.py --verdict FAIL # only failing rows python3 amr_algorithm_audit.py --summary-only # no per-row detail python3 amr_algorithm_audit.py --save report.txt # save full output """ from __future__ import annotations import argparse import os import re import sqlite3 import sys from collections import Counter from datetime import datetime from typing import Any, Dict, Iterable, List, Optional, Tuple SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) DB_PATH = os.path.join(SCRIPT_DIR, "uslap_database_v3.db") BLOCK_ALGO_PATTERN = re.compile(r"^ALG-SURAH-\d{3}-BLOCK-\d{3}-\d{3}$") SURAH_ALGO_PATTERN = re.compile(r"^ALG-SURAH-(\d{3})-[A-Z]+$") BINARY_ALGO_PATTERN = re.compile(r"^ALG-BINARY-[A-Z\-]+$") CANONICAL_ROLES = frozenset({"PRIMARY", "SUPPORT", "BINARY_A", "BINARY_B"}) # ───────────────────────────────────────────────────────────────────── # Qur'anic anchor tokens for Rule A. # Not exhaustive — just wide enough to cover the 86 named rows that # exist right now. Any row that doesn't hit one of these gets a WARN # and goes on the human-review list. # ───────────────────────────────────────────────────────────────────── QURANIC_FIGURES = frozenset({ "PHARAOH", "FIRAWN", "QARUN", "HAMAN", "IBLIS", "NUH", "IBRAHIM", "LUT", "MUSA", "YUSUF", "KHIDR", "SULAYMAN", "BILQIS", "MARYAM", "THAMUD", "SAMIRI", "AD", "HUD", "DHUL", "QARNAYN", "BANI", "ISRAIL", "YUNUS", "AYYUB", "ZAKARIYA", "YAHYA", "ISA", "IDRIS", "ADAM", "HAWA", "NIMROD", "KAHF", "QARIAH", }) QURANIC_SURAH_NAMES = frozenset({ "FATIHA", "BAQARAH", "IMRAN", "NISA", "MAIDA", "ANAM", "ARAF", "ANFAL", "TAWBAH", "HUD", "YUSUF", "RAD", "IBRAHIM", "HIJR", "NAHL", "ISRA", "KAHF", "MARYAM", "TAHA", "ANBIYA", "HAJJ", "MUMINUN", "NUR", "FURQAN", "SHUARA", "NAML", "QASAS", "ANKABUT", "RUM", "LUQMAN", "SAJDAH", "AHZAB", "SABA", "FATIR", "YASIN", "YA-SIN", "SAFFAT", "SAD", "ZUMAR", "GHAFIR", "FUSSILAT", "SHURA", "ZUKHRUF", "DUKHAN", "JATHIYAH", "AHQAF", "MUHAMMAD", "FATH", "HUJURAT", "QAF", "DHARIYAT", "TUR", "NAJM", "QAMAR", "RAHMAN", "WAQIAH", "HADID", "MUJADILAH", "HASHR", "MUMTAHINAH", "SAFF", "JUMUAH", "MUNAFIQUN", "TAGHABUN", "TALAQ", "TAHRIM", "MULK", "QALAM", "HAQQAH", "MAARIJ", "JINN", "MUZZAMMIL", "MUDDATHTHIR", "QIYAMAH", "INSAN", "MURSALAT", "NABA", "NAZIAT", "ABASA", "TAKWIR", "INFITAR", "MUTAFFIFIN", "INSHIQAQ", "BURUJ", "TARIQ", "ALA", "GHASHIYAH", "FAJR", "BALAD", "SHAMS", "LAYL", "DUHA", "SHARH", "TIN", "ALAQ", "QADR", "BAYYINAH", "ZALZALAH", "ADIYAT", "ASR", "HUMAZAH", "FIL", "QURAYSH", "MAUN", "KAWTHAR", "KAFIRUN", "NASR", "MASAD", "IKHLAS", "FALAQ", "NAS", }) QURANIC_CONCEPT_KEYWORDS = frozenset({ "HAYAT", "MAWT", "HIDAYA", "DALAL", "NOUR", "ZULUMAT", "HAQQ", "BATIL", "NAHAR", "SAMA", "BASAR", "AKHIRA", "DUNYA", "GARDEN", "FIRE", "BOOK", "WATER", "DENIAL", "COVENANT", "BREAKING", "CYCLE", "CREATION", "ALTERATION", "MIZAN", "RIBA", "WASIYYA", "KAYL", "IDOL", "REFUTATION", "ARK", "FLOOD", "NAQA", "RAQABA", "THRONE", "POWER", "HUMAN", "SALE", "SCHEME", "REFUSAL", "DECEPTION", "FRAUD", "WEALTH", "DESTRUCTION", "FAMILY", "HOSTILE", "TOWER", "EXTRACTION", "PROTECTION", "VULNERABLE", "INHERITANCE", "DISTRIBUTION", "TRAFFICKING", "PROGENY", "DEPLOYMENT", "CONCEPTION", "SENT", "DOWN", "AZIZ", "HAKIM", "CLOSING", "CASCADE", "HISTORICAL", "WARNING", "KINDLING", "PROVISION", "LIFE", "DEATH", "GUIDANCE", "LIGHT", "DARKNESS", "TRUTH", "FALSEHOOD", "NIGHT", "DAY", "HEARING", "SIGHT", "NASL", "HARTH", "OPERATOR", "COUNTER", "WAR", "BOTH", "SIDES", "DENIAL", "REFRAIN", }) ALLOWED_COMPOSITE_SUBJECTS = frozenset({ "RADHANITE", "HABASHA", "WASATANIYYAH", "NETWORK", "COMPOSITE", "PERSECUTION", "OPERATION", }) # ───────────────────────────────────────────────────────────────────── # DB HELPERS # ───────────────────────────────────────────────────────────────────── def _connect() -> sqlite3.Connection: conn = sqlite3.connect(DB_PATH) conn.row_factory = sqlite3.Row conn.execute("PRAGMA journal_mode=WAL") return conn def _parse_q_anchor(q: Optional[str]) -> Optional[Tuple[int, int]]: """Parse 'Q28:4' → (28, 4). None on failure.""" if not q: return None m = re.match(r"^Q?(\d{1,3}):(\d{1,4})$", q.strip()) if not m: return None return (int(m.group(1)), int(m.group(2))) # ───────────────────────────────────────────────────────────────────── # RULE CHECKERS # ───────────────────────────────────────────────────────────────────── def _check_name_format(row: sqlite3.Row) -> Dict[str, Any]: """Rule A — name references a Qur'anic anchor.""" algo_id = row["algo_id"] or "" algo_name = row["algo_name"] or "" is_composite = bool(row["is_composite"]) if is_composite: return { "verdict": "PASS", "reason": "composite row — exempt from Rule A (allows historical subject label)", } # SURAH-NNN-KEYWORD pattern — validate surah number + keyword sm = SURAH_ALGO_PATTERN.match(algo_id) if sm: surah_num = int(sm.group(1)) if 1 <= surah_num <= 114: return { "verdict": "PASS", "reason": f"surah-anchored algorithm (surah {surah_num})", } return { "verdict": "FAIL", "reason": f"surah number {surah_num} out of range (1..114)", } # BINARY-XXX-YYY pattern — structural binary pair if BINARY_ALGO_PATTERN.match(algo_id): return {"verdict": "PASS", "reason": "binary-pair structural algorithm"} # Token scan — split algo_id on dashes, check each token tokens = [t for t in re.split(r"[-_]", algo_id.upper()) if t and t != "ALG"] hits = [] for t in tokens: if t in QURANIC_FIGURES: hits.append(("figure", t)) elif t in QURANIC_SURAH_NAMES: hits.append(("surah_name", t)) elif t in QURANIC_CONCEPT_KEYWORDS: hits.append(("concept", t)) if hits: return { "verdict": "PASS", "reason": f"{len(hits)} Qur'anic anchor token(s) matched", "matched_tokens": hits, } # Check if ALL tokens are composite-only (and is_composite is false — # because if it were true we'd have passed earlier) composite_only = [t for t in tokens if t in ALLOWED_COMPOSITE_SUBJECTS] if composite_only and len(composite_only) == len([t for t in tokens if t]): return { "verdict": "WARN", "reason": f"name uses composite-only tokens {composite_only} but is_composite=0", "matched_tokens": [("composite_only", t) for t in composite_only], } return { "verdict": "WARN", "reason": "no Qur'anic anchor token matched in algo_id", "tokens_tried": tokens, } def _check_root_coverage(conn: sqlite3.Connection, algo_id: str) -> Dict[str, Any]: """Rule B — every root_map root must be attested at one of the ayat.""" rm_rows = conn.execute( "SELECT root_letters, role FROM algorithm_root_map WHERE algo_id = ?", (algo_id,), ).fetchall() am_rows = conn.execute( "SELECT surah, ayah_start, ayah_end FROM algorithm_ayah_map WHERE algo_id = ?", (algo_id,), ).fetchall() if not rm_rows: return { "verdict": "FAIL", "reason": "algorithm has NO root_map entries", "total_roots": 0, "present": 0, "missing": 0, "missing_list": [], "present_list": [], } if not am_rows: return { "verdict": "FAIL", "reason": "algorithm has NO ayah_map entries — cannot verify coverage", "total_roots": len(rm_rows), "present": 0, "missing": len(rm_rows), "missing_list": [(r["root_letters"], r["role"]) for r in rm_rows], "present_list": [], } present: List[Tuple[str, str]] = [] missing: List[Tuple[str, str]] = [] for rm in rm_rows: root = rm["root_letters"] role = rm["role"] or "" found = False for am in am_rows: surah = am["surah"] ayah_start = am["ayah_start"] ayah_end = am["ayah_end"] if am["ayah_end"] is not None else ayah_start cnt = conn.execute( "SELECT COUNT(*) FROM quran_word_roots " "WHERE root = ? AND surah = ? AND ayah BETWEEN ? AND ?", (root, surah, ayah_start, ayah_end), ).fetchone()[0] if cnt > 0: found = True break if found: present.append((root, role)) else: missing.append((root, role)) total = len(rm_rows) hit = len(present) if hit == total: return { "verdict": "PASS", "reason": f"all {total} root(s) attested within ayah_map", "total_roots": total, "present": hit, "missing": 0, "missing_list": [], "present_list": present, } if hit >= total * 0.75 and missing: return { "verdict": "WARN", "reason": f"{len(missing)} of {total} root(s) not attested within ayah_map", "total_roots": total, "present": hit, "missing": len(missing), "missing_list": missing, "present_list": present, } return { "verdict": "FAIL", "reason": f"{len(missing)} of {total} root(s) not attested — " f"coverage below 75%", "total_roots": total, "present": hit, "missing": len(missing), "missing_list": missing, "present_list": present, } def _check_primary_ayah( conn: sqlite3.Connection, algo_id: str, primary_ayah: Optional[str], ) -> Dict[str, Any]: """Rule C — primary_ayah must be in ayah_map.""" parsed = _parse_q_anchor(primary_ayah) if parsed is None: return { "verdict": "FAIL", "reason": f"primary_ayah unparseable: {primary_ayah!r}", } p_surah, p_ayah = parsed am_rows = conn.execute( "SELECT surah, ayah_start, ayah_end FROM algorithm_ayah_map WHERE algo_id = ?", (algo_id,), ).fetchall() if not am_rows: return { "verdict": "FAIL", "reason": "no ayah_map to compare against", } for am in am_rows: end = am["ayah_end"] if am["ayah_end"] is not None else am["ayah_start"] if am["surah"] == p_surah and am["ayah_start"] <= p_ayah <= end: return { "verdict": "PASS", "reason": f"primary_ayah Q{p_surah}:{p_ayah} is in ayah_map", } return { "verdict": "FAIL", "reason": f"primary_ayah Q{p_surah}:{p_ayah} is NOT in any ayah_map range", } def _check_role_vocabulary(conn: sqlite3.Connection, algo_id: str) -> Dict[str, Any]: """Rule D — every role must be in the canonical set.""" rows = conn.execute( "SELECT DISTINCT role FROM algorithm_root_map WHERE algo_id = ?", (algo_id,), ).fetchall() if not rows: return {"verdict": "PASS", "reason": "no roles (empty root_map)"} seen = {r["role"] for r in rows if r["role"]} bad = sorted(seen - CANONICAL_ROLES) if not bad: return { "verdict": "PASS", "reason": f"roles in canonical set: {sorted(seen)}", } return { "verdict": "FAIL", "reason": f"non-canonical role value(s): {bad}", "bad_roles": bad, } # ───────────────────────────────────────────────────────────────────── # RULE F — PATTERN SPECIFICITY (Session 42) # ───────────────────────────────────────────────────────────────────── # Dilution thresholds (fraction of named algorithms that declare a root): # >= 0.30 SEVERE DILUTION (root is function-word-level common) # >= 0.15 MILD DILUTION (root has low discriminating power) # < 0.15 DISTINCTIVE (root can serve as a pattern marker) # # An algorithm passes Rule F iff at least one declared root is # distinctive. If zero distinctive roots, the algorithm has no pattern # marker and the audit hard-fails it. SEVERE_DILUTION_THRESHOLD = 0.30 MILD_DILUTION_THRESHOLD = 0.15 def _compute_root_dilution_map(conn: sqlite3.Connection) -> Dict[str, float]: """Pre-compute presence ratios for all roots in algorithm_root_map. Uses NAMED algorithms only as the corpus (block placeholders are excluded because they are by construction full-span indexing stubs that would artificially boost every root's ratio). Returns {root_letters: presence_ratio}. """ # Block placeholders have algo_ids matching 'ALG-SURAH-NNN-BLOCK-NNN-NNN'. # LIKE pattern: each '_' matches a single character. named_count = conn.execute( "SELECT COUNT(*) FROM algorithm_registry " "WHERE algo_id NOT LIKE 'ALG-SURAH-___-BLOCK-___-___'" ).fetchone()[0] if named_count == 0: return {} rows = conn.execute( """ SELECT root_letters, COUNT(DISTINCT algo_id) AS algo_count FROM algorithm_root_map WHERE algo_id IN ( SELECT algo_id FROM algorithm_registry WHERE algo_id NOT LIKE 'ALG-SURAH-___-BLOCK-___-___' ) AND root_letters IS NOT NULL AND root_letters != '' GROUP BY root_letters """ ).fetchall() return { r["root_letters"]: r["algo_count"] / named_count for r in rows } def _check_pattern_specificity( conn: sqlite3.Connection, algo_id: str, dilution_map: Dict[str, float], ) -> Dict[str, Any]: """Rule F — at least one declared root must be distinctive.""" rm_rows = conn.execute( "SELECT root_letters FROM algorithm_root_map WHERE algo_id = ?", (algo_id,), ).fetchall() if not rm_rows: return {"verdict": "FAIL", "reason": "no root_map entries"} ratios: List[Tuple[str, float]] = [] severe_roots: List[Tuple[str, float]] = [] mild_roots: List[Tuple[str, float]] = [] distinctive_roots: List[Tuple[str, float]] = [] for r in rm_rows: root = r["root_letters"] if not root: continue ratio = dilution_map.get(root, 0.0) ratios.append((root, ratio)) if ratio >= SEVERE_DILUTION_THRESHOLD: severe_roots.append((root, ratio)) mild_roots.append((root, ratio)) # severe is a subset of mild elif ratio >= MILD_DILUTION_THRESHOLD: mild_roots.append((root, ratio)) else: distinctive_roots.append((root, ratio)) total = len(ratios) distinctive_count = len(distinctive_roots) mild_count = len(mild_roots) severe_count = len(severe_roots) ratios_sorted = sorted(ratios, key=lambda x: -x[1]) if distinctive_count == 0: return { "verdict": "FAIL", "reason": ( f"NO distinctive marker — all {total} root(s) have " f"presence ratio >= {MILD_DILUTION_THRESHOLD:.2f}" ), "total_roots": total, "distinctive": 0, "mild_diluted": mild_count, "severe_diluted": severe_count, "worst_offenders": ratios_sorted[:8], "distinctive_list": [], } if distinctive_count < max(1, total / 2): return { "verdict": "WARN", "reason": ( f"minority distinctive — only {distinctive_count}/{total} " f"root(s) below dilution threshold " f"{MILD_DILUTION_THRESHOLD:.2f}" ), "total_roots": total, "distinctive": distinctive_count, "mild_diluted": mild_count, "severe_diluted": severe_count, "worst_offenders": ratios_sorted[:8], "distinctive_list": distinctive_roots, } return { "verdict": "PASS", "reason": ( f"{distinctive_count}/{total} distinctive; " f"{mild_count} mild + {severe_count} severe diluted" ), "total_roots": total, "distinctive": distinctive_count, "mild_diluted": mild_count, "severe_diluted": severe_count, "distinctive_list": distinctive_roots, } # ───────────────────────────────────────────────────────────────────── # RULE G — DISTINCTIVE VOCABULARY COMPLETENESS (Session 42) # ───────────────────────────────────────────────────────────────────── # "What distinctive roots fire at the algorithm's own ayat that the # algorithm's root_map does NOT declare?" # # A root counts as a candidate-missing-marker if: # (a) it appears at least 2 times in the algorithm's ayah_map span, # (b) its algorithm-corpus presence ratio is < 0.15 (distinctive), and # (c) it is not already declared in the root_map. DISTINCTIVE_RATIO_THRESHOLD = 0.15 # algorithm-layer presence ratio MIN_LOCAL_COUNT_FOR_MARKER = 2 # minimum local occurrences at the ayat DENSITY_CONCENTRATION_THRESHOLD = 0.10 # ≥10% of root's total Qur'anic tokens at the ayat # Tuned from 0.15 to 0.10 after Session 42 spot-check: # 0.15 missed ف-ل-ك (ark, 0.11), ش-ه-و (desire, 0.13), ن-ف-ق (0.12). # 0.10 catches all content-word markers with zero function-word noise # (function words sit at 0.01-0.03 concentration — 10× below threshold). def _compute_root_global_count_map(conn: sqlite3.Connection) -> Dict[str, int]: """Precompute total Qur'anic token count per root, from quran_word_roots. Used by Rule G to filter grammatical particles and ubiquitous function words out of the "missing distinctive marker" candidate set via a density-concentration test. """ rows = conn.execute( "SELECT root, COUNT(*) AS cnt FROM quran_word_roots " "WHERE root IS NOT NULL AND root != '' GROUP BY root" ).fetchall() return {r["root"]: r["cnt"] for r in rows} def _check_distinctive_gap( conn: sqlite3.Connection, algo_id: str, dilution_map: Dict[str, float], global_count_map: Dict[str, int], ) -> Dict[str, Any]: """Rule G — distinctive roots at the ayat but missing from root_map. A root qualifies as a missing marker iff: (a) local_count >= MIN_LOCAL_COUNT_FOR_MARKER (b) local_count / total_qur_tokens >= DENSITY_CONCENTRATION_THRESHOLD (c) algorithm-layer presence ratio < DISTINCTIVE_RATIO_THRESHOLD (d) root is not already in declared root_map """ declared = set() for r in conn.execute( "SELECT root_letters FROM algorithm_root_map WHERE algo_id = ?", (algo_id,), ): if r["root_letters"]: declared.add(r["root_letters"]) am_rows = conn.execute( "SELECT surah, ayah_start, ayah_end FROM algorithm_ayah_map WHERE algo_id = ?", (algo_id,), ).fetchall() if not am_rows: return { "verdict": "PASS", "reason": "no ayah_map to scan", "declared_count": len(declared), "missing_count": 0, "top_missing": [], } # Aggregate local root counts across every mapped ayah range local_counts: Dict[str, int] = {} for am in am_rows: surah = am["surah"] ayah_start = am["ayah_start"] ayah_end = am["ayah_end"] if am["ayah_end"] is not None else ayah_start rows = conn.execute( """ SELECT root, COUNT(*) AS cnt FROM quran_word_roots WHERE surah = ? AND ayah BETWEEN ? AND ? AND root IS NOT NULL AND root != '' GROUP BY root """, (surah, ayah_start, ayah_end), ).fetchall() for r in rows: root = r["root"] cnt = r["cnt"] or 0 if root: local_counts[root] = local_counts.get(root, 0) + cnt if not local_counts: return { "verdict": "PASS", "reason": "no roots attested at ayah_map ranges", "declared_count": len(declared), "missing_count": 0, "top_missing": [], } # Apply all four gates missing: List[Tuple[str, int, float, int, float]] = [] # each entry: (root, local_cnt, layer_ratio, global_cnt, concentration) for root, cnt in local_counts.items(): if root in declared: continue if cnt < MIN_LOCAL_COUNT_FOR_MARKER: continue global_cnt = global_count_map.get(root, 0) if global_cnt <= 0: continue concentration = cnt / global_cnt if concentration < DENSITY_CONCENTRATION_THRESHOLD: continue layer_ratio = dilution_map.get(root, 0.0) if layer_ratio >= DISTINCTIVE_RATIO_THRESHOLD: continue missing.append((root, cnt, layer_ratio, global_cnt, concentration)) # Sort: highest density concentration first, then highest local count missing.sort(key=lambda x: (-x[4], -x[1])) declared_count = max(1, len(declared)) n = len(missing) if n == 0: return { "verdict": "PASS", "reason": "root_map covers all distinctive markers at the ayat", "declared_count": len(declared), "missing_count": 0, "top_missing": [], } warn_threshold = max(3, declared_count // 2) if n > declared_count: return { "verdict": "FAIL", "reason": ( f"{n} distinctive root(s) fire at the ayat but are NOT in " f"root_map (more missing than declared: {n} > {declared_count})" ), "declared_count": len(declared), "missing_count": n, "top_missing": missing[:20], } if n >= warn_threshold: return { "verdict": "WARN", "reason": ( f"{n} distinctive root(s) fire at the ayat but are NOT in " f"root_map (declared: {declared_count})" ), "declared_count": len(declared), "missing_count": n, "top_missing": missing[:20], } return { "verdict": "PASS", "reason": f"only {n} minor distinctive gap(s)", "declared_count": len(declared), "missing_count": n, "top_missing": missing, } # ───────────────────────────────────────────────────────────────────── # MAIN AUDIT # ───────────────────────────────────────────────────────────────────── def audit_algorithm( conn: sqlite3.Connection, row: sqlite3.Row, dilution_map: Optional[Dict[str, float]] = None, global_count_map: Optional[Dict[str, int]] = None, ) -> Dict[str, Any]: """Run all 6 per-row rules for one algorithm row. dilution_map enables Rule F (pattern specificity). global_count_map + dilution_map enables Rule G (distinctive gap). Either may be None — the corresponding rules are skipped. """ algo_id = row["algo_id"] result: Dict[str, Any] = { "algo_id": algo_id, "algo_name": row["algo_name"], "algo_class": row["algo_class"], "primary_ayah": row["primary_ayah"], "is_composite": bool(row["is_composite"]), "status": row["status"], "quf_pass": row["quf_pass"], "root_count": row["root_count"] or 0, "ayah_count": row["ayah_count"] or 0, "checks": {}, } result["checks"]["A_name_format"] = _check_name_format(row) result["checks"]["B_root_coverage"] = _check_root_coverage(conn, algo_id) result["checks"]["C_primary_ayah"] = _check_primary_ayah(conn, algo_id, row["primary_ayah"]) result["checks"]["D_role_vocabulary"] = _check_role_vocabulary(conn, algo_id) if dilution_map is not None: result["checks"]["F_pattern_specificity"] = _check_pattern_specificity( conn, algo_id, dilution_map ) if global_count_map is not None: result["checks"]["G_distinctive_gap"] = _check_distinctive_gap( conn, algo_id, dilution_map, global_count_map ) verdicts = [c["verdict"] for c in result["checks"].values()] if any(v == "FAIL" for v in verdicts): result["overall"] = "FAIL" elif any(v == "WARN" for v in verdicts): result["overall"] = "WARN" else: result["overall"] = "PASS" return result def collect_named_algorithms( conn: sqlite3.Connection, algo_id_filter: Optional[str] = None, class_filter: Optional[str] = None, ) -> List[sqlite3.Row]: """Return the list of rows we're auditing.""" rows = conn.execute("SELECT * FROM algorithm_registry ORDER BY algo_class, algo_id").fetchall() out = [] for r in rows: if BLOCK_ALGO_PATTERN.match(r["algo_id"] or ""): continue if algo_id_filter and r["algo_id"] != algo_id_filter: continue if class_filter and (r["algo_class"] or "").upper() != class_filter.upper(): continue out.append(r) return out # ───────────────────────────────────────────────────────────────────── # RENDERERS # ───────────────────────────────────────────────────────────────────── def render_row(result: Dict[str, Any], show_detail: bool = True) -> str: """Render one algorithm's audit result.""" lines: List[str] = [] hdr = ( f"[{result['overall']:<4}] {result['algo_id']:<38} " f"{result['algo_class']:<13} Q={result['primary_ayah'] or '-':<10} " f"roots={result['root_count']:>3} ayat={result['ayah_count']:>3}" ) lines.append(hdr) if not show_detail: return "\n".join(lines) if result.get("algo_name"): name = result["algo_name"] if len(name) > 80: name = name[:77] + "..." lines.append(f" name: {name}") lines.append(f" status={result['status'] or '-'} quf_pass={result['quf_pass'] or '-'} composite={result['is_composite']}") for rule_key, check in result["checks"].items(): v = check["verdict"] reason = check.get("reason", "") lines.append(f" {rule_key:<22} [{v:<4}] {reason}") if rule_key == "B_root_coverage" and check.get("missing_list"): for r, role in check["missing_list"]: lines.append(f" ↳ MISSING: {r:<12} role={role}") if rule_key == "F_pattern_specificity": worst = check.get("worst_offenders") or [] if worst: for root, ratio in worst[:5]: marker = "SEVERE" if ratio >= SEVERE_DILUTION_THRESHOLD else ( "mild" if ratio >= MILD_DILUTION_THRESHOLD else "distinct" ) lines.append( f" ↳ {root:<12} ratio={ratio:.2f} ({marker})" ) distinctives = check.get("distinctive_list") or [] if distinctives and v != "FAIL": roots_only = ", ".join(r for r, _ in distinctives[:6]) lines.append(f" ↳ distinctive markers: {roots_only}") if rule_key == "G_distinctive_gap": top = check.get("top_missing") or [] for entry in top[:20]: # tuple: (root, local_cnt, layer_ratio, global_cnt, concentration) if len(entry) >= 5: root, cnt, layer_ratio, gcnt, conc = entry lines.append( f" ↳ MISSING: {root:<12} local={cnt:<3} " f"global={gcnt:<4} conc={conc:.2f} layer_r={layer_ratio:.2f}" ) else: # legacy 3-tuple fallback root, cnt, ratio = entry[0], entry[1], entry[2] lines.append( f" ↳ MISSING: {root:<12} local_cnt={cnt:<3} " f"ratio={ratio:.2f}" ) return "\n".join(lines) def render_summary(results: List[Dict[str, Any]], fleet_stats: Dict[str, Any]) -> str: """Render fleet summary.""" lines: List[str] = [] lines.append("") lines.append("=" * 72) lines.append("PHASE 0 AUDIT — SUMMARY") lines.append("=" * 72) lines.append(f" audited: {len(results)} named algorithm rows") counts = Counter(r["overall"] for r in results) for verdict in ("PASS", "WARN", "FAIL"): lines.append(f" {verdict:<6} {counts.get(verdict, 0)}") lines.append("") lines.append(" BY CLASS:") by_class: Dict[str, Counter] = {} for r in results: cls = r["algo_class"] or "?" by_class.setdefault(cls, Counter())[r["overall"]] += 1 for cls in sorted(by_class): cc = by_class[cls] total = sum(cc.values()) lines.append( f" {cls:<14} total={total:>3} " f"pass={cc.get('PASS', 0):>3} warn={cc.get('WARN', 0):>3} fail={cc.get('FAIL', 0):>3}" ) lines.append("") lines.append(" BY RULE (failures count):") rule_fail = Counter() rule_warn = Counter() for r in results: for rk, c in r["checks"].items(): if c["verdict"] == "FAIL": rule_fail[rk] += 1 elif c["verdict"] == "WARN": rule_warn[rk] += 1 for rk in ( "A_name_format", "B_root_coverage", "C_primary_ayah", "D_role_vocabulary", "F_pattern_specificity", "G_distinctive_gap", ): lines.append( f" {rk:<24} fail={rule_fail.get(rk, 0):>3} warn={rule_warn.get(rk, 0):>3}" ) lines.append("") lines.append(" RULE E — FLEET STATUS (quf_pass + status):") for k, v in fleet_stats.items(): lines.append(f" {k:<30} {v}") lines.append("=" * 72) lines.append("") # Quick action list fails = [r for r in results if r["overall"] == "FAIL"] if fails: lines.append(f"FAIL list ({len(fails)}):") for r in fails: lines.append(f" {r['algo_id']}") warns = [r for r in results if r["overall"] == "WARN"] if warns: lines.append("") lines.append(f"WARN list ({len(warns)}):") for r in warns: lines.append(f" {r['algo_id']}") lines.append("") return "\n".join(lines) def collect_fleet_stats(conn: sqlite3.Connection) -> Dict[str, Any]: """Rule E — fleet-wide status and quf_pass tallies.""" stats: Dict[str, Any] = {} for row in conn.execute( "SELECT COALESCE(status, '(null)') as s, COUNT(*) FROM algorithm_registry GROUP BY s" ): stats[f"status = {row[0]}"] = row[1] for row in conn.execute( "SELECT COALESCE(quf_pass, '(null)') as q, COUNT(*) FROM algorithm_registry GROUP BY q" ): stats[f"quf_pass = {row[0]}"] = row[1] return stats # ───────────────────────────────────────────────────────────────────── # CLI # ───────────────────────────────────────────────────────────────────── def main(argv: List[str]) -> int: p = argparse.ArgumentParser(prog="amr_algorithm_audit") p.add_argument("--algo", default=None, help="audit one algorithm by algo_id") p.add_argument("--class", dest="class_filter", default=None, help="filter by algo_class (OPERATOR, NARRATIVE, ...)") p.add_argument("--verdict", default=None, choices=[None, "PASS", "WARN", "FAIL"], help="only show rows with this overall verdict") p.add_argument("--summary-only", action="store_true", help="omit per-row detail, print summary only") p.add_argument("--save", default=None, metavar="PATH", help="save full report to PATH (stdout output is unchanged)") args = p.parse_args(argv) conn = _connect() try: dilution_map = _compute_root_dilution_map(conn) global_count_map = _compute_root_global_count_map(conn) targets = collect_named_algorithms(conn, algo_id_filter=args.algo, class_filter=args.class_filter) results = [ audit_algorithm( conn, row, dilution_map=dilution_map, global_count_map=global_count_map, ) for row in targets ] fleet_stats = collect_fleet_stats(conn) out_lines: List[str] = [] out_lines.append(f"amr_algorithm_audit.py — Phase 0 Report") out_lines.append(f"Generated: {datetime.now().isoformat(timespec='seconds')}") out_lines.append(f"DB: {DB_PATH}") out_lines.append("") if not args.summary_only: out_lines.append("-" * 72) out_lines.append("PER-ROW AUDIT") out_lines.append("-" * 72) show = results if args.verdict: show = [r for r in results if r["overall"] == args.verdict] for r in show: out_lines.append(render_row(r, show_detail=True)) out_lines.append("") out_lines.append(render_summary(results, fleet_stats)) text = "\n".join(out_lines) print(text) if args.save: with open(args.save, "w", encoding="utf-8") as f: f.write(text) print(f"(full report saved to {args.save})") return 0 finally: conn.close() if __name__ == "__main__": raise SystemExit(main(sys.argv[1:]))