Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| # -*- coding: utf-8 -*- | |
| """ | |
| amr_algorithm_audit.py — Phase 0 READ-ONLY audit of algorithm_registry. | |
| Runs the Unit 3C hard rules against every NAMED (non-block-placeholder) | |
| algorithm in algorithm_registry, producing a per-algorithm verdict and | |
| a fleet-wide summary. ZERO writes. ZERO LLM. Pure SQL analysis. | |
| Purpose: | |
| Before extending the algorithm layer (Phase 2), audit the existing | |
| 86 named rows to discover which ones have: | |
| - sparse root_maps (roots declared in Rule B but not attested at | |
| the algorithm's ayat in quran_word_roots) | |
| - primary_ayah pointing outside the ayah_map | |
| - non-canonical role values | |
| - algo_name that doesn't reference a Qur'anic anchor | |
| These are the prerequisites for any extension work. | |
| Hard rules audited: | |
| Rule A β NAME FORMAT | |
| For non-composite rows: algo_name / algo_id must reference a | |
| Qur'anic-anchored term (named figure, named surah, named | |
| concept, or BINARY-pair structural keyword). | |
| For composite rows (is_composite=1): exempt; composites may | |
| name historical instantiations of Qur'anic patterns. | |
| Rule B β ROOT COVERAGE (the strictest check) | |
| For every root in algorithm_root_map, verify that the root has | |
| at least one token in quran_word_roots within an ayah covered | |
| by this algorithm's ayah_map. Missing roots are the single most | |
| important finding β they reveal algorithms whose declared root | |
| signatures are not attested at their own ayat. | |
| Rule C β PRIMARY_AYAH ALIGNMENT | |
| primary_ayah must fall within at least one ayah_map range for | |
| this algorithm. A primary_ayah outside the ayat is a pointer | |
| error, not a content error. | |
| Rule D β ROLE VOCABULARY | |
| Every role value in algorithm_root_map must be in | |
| {PRIMARY, SUPPORT, BINARY_A, BINARY_B}. Other values are | |
| schema drift. | |
| Rule E β FLEET STATUS | |
| Fleet-wide tally of status and quf_pass values. Not a per-row | |
| check; reported as summary only. | |
| Rule F β PATTERN SPECIFICITY (added Session 42) | |
| For every root in algorithm_root_map, compute the | |
| "algorithm presence ratio" β the fraction of named algorithms | |
| that declare this root. A root with presence >= 15% has low | |
| discriminating power (it appears in many algorithms, so it | |
| cannot be a distinctive marker of any single one). A root | |
| with presence >= 30% is layer-level dilution. | |
| For each algorithm: | |
| - At least one root must be distinctive (ratio < 0.15). | |
| - If zero distinctive roots → FAIL. | |
| - If minority distinctive (< 50%) → WARN. | |
| - Else → PASS. | |
| This catches root_maps that are populated with layer-common | |
| particles only. Measured AGAINST THE NAMED ALGORITHM CORPUS, | |
| not against the full Qur'anic token count β a root can be | |
| Qur'an-frequent but still algorithm-distinctive. | |
| Rule G β DISTINCTIVE VOCABULARY COMPLETENESS (added Session 42) | |
| For each algorithm with an ayah_map, find content roots that | |
| fire at the mapped ayat with high DENSITY CONCENTRATION but | |
| are not declared in the algorithm's root_map. | |
| A root qualifies as a "missing marker" iff ALL of: | |
| (a) local_count at ayat >= 2 | |
| (b) local_count / total_qur_tokens >= 0.10 | |
| (≥10% of the root's total Qur'anic occurrences happen | |
| at this algorithm's ayat — the root is concentrated | |
| at these ayat, not merely present. Tuned from 0.15 | |
| after spot-check showed 0.15 missed Ω-Ω-Ω at NUH.) | |
| (c) algorithm-layer presence ratio < 0.15 (distinctive | |
| within the named algorithm corpus) | |
| (d) root is not already in the algorithm's root_map | |
| The density-concentration gate filters out grammatical | |
| particles and ubiquitous function words (Ω -Ω, Ω-Ψ£, Ψ₯-Ω, | |
| Ω-Ω-Ω, Ψ£-Ω-Ω, etc.) which have thousands of Qur'anic tokens | |
| and would otherwise flood the results. Only genuine pattern | |
| markers pass all four gates. | |
| Verdicts: | |
| - missing_count > declared_count → FAIL | |
| - missing_count >= max(3, declared/2) → WARN | |
| - else → PASS | |
| Usage: | |
| python3 amr_algorithm_audit.py # full audit, stdout | |
| python3 amr_algorithm_audit.py --algo ALG-NUH-ARK-FLOOD | |
| python3 amr_algorithm_audit.py --class OPERATOR | |
| python3 amr_algorithm_audit.py --verdict FAIL # only failing rows | |
| python3 amr_algorithm_audit.py --summary-only # no per-row detail | |
| python3 amr_algorithm_audit.py --save report.txt # save full output | |
| """ | |
| from __future__ import annotations | |
| import argparse | |
| import os | |
| import re | |
| import sqlite3 | |
| import sys | |
| from collections import Counter | |
| from datetime import datetime | |
| from typing import Any, Dict, Iterable, List, Optional, Tuple | |
# Directory that holds this script; the database path is resolved against it.
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
# SQLite database file opened by _connect().
DB_PATH = os.path.join(SCRIPT_DIR, "uslap_database_v3.db")
# Block-placeholder ids (ALG-SURAH-NNN-BLOCK-NNN-NNN); rows matching this are
# skipped by collect_named_algorithms().
BLOCK_ALGO_PATTERN = re.compile(r"^ALG-SURAH-\d{3}-BLOCK-\d{3}-\d{3}$")
# Surah-anchored ids (ALG-SURAH-NNN-KEYWORD); group 1 is the surah number.
SURAH_ALGO_PATTERN = re.compile(r"^ALG-SURAH-(\d{3})-[A-Z]+$")
# Binary-pair ids (ALG-BINARY-...).
BINARY_ALGO_PATTERN = re.compile(r"^ALG-BINARY-[A-Z\-]+$")
# Role values accepted by Rule D.
CANONICAL_ROLES = frozenset({"PRIMARY", "SUPPORT", "BINARY_A", "BINARY_B"})
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Qur'anic anchor tokens for Rule A. | |
| # Not exhaustive β just wide enough to cover the 86 named rows that | |
| # exist right now. Any row that doesn't hit one of these gets a WARN | |
| # and goes on the human-review list. | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
# Named figures / peoples; a token match is reported as a "figure" hit by
# _check_name_format().
QURANIC_FIGURES = frozenset({
    "PHARAOH", "FIRAWN", "QARUN", "HAMAN", "IBLIS",
    "NUH", "IBRAHIM", "LUT", "MUSA", "YUSUF", "KHIDR",
    "SULAYMAN", "BILQIS", "MARYAM", "THAMUD", "SAMIRI",
    "AD", "HUD", "DHUL", "QARNAYN", "BANI", "ISRAIL",
    "YUNUS", "AYYUB", "ZAKARIYA", "YAHYA", "ISA", "IDRIS",
    "ADAM", "HAWA", "NIMROD", "KAHF", "QARIAH",
})
# Surah names; a token match is reported as a "surah_name" hit.
# NOTE(review): _check_name_format() splits algo_id on "-" and "_", so the
# hyphenated entry "YA-SIN" can never equal a single token there; it is kept
# in case other consumers test whole names against this set.
QURANIC_SURAH_NAMES = frozenset({
    "FATIHA", "BAQARAH", "IMRAN", "NISA", "MAIDA", "ANAM",
    "ARAF", "ANFAL", "TAWBAH", "HUD", "YUSUF", "RAD",
    "IBRAHIM", "HIJR", "NAHL", "ISRA", "KAHF", "MARYAM",
    "TAHA", "ANBIYA", "HAJJ", "MUMINUN", "NUR", "FURQAN",
    "SHUARA", "NAML", "QASAS", "ANKABUT", "RUM", "LUQMAN",
    "SAJDAH", "AHZAB", "SABA", "FATIR", "YASIN", "YA-SIN",
    "SAFFAT", "SAD", "ZUMAR", "GHAFIR", "FUSSILAT", "SHURA",
    "ZUKHRUF", "DUKHAN", "JATHIYAH", "AHQAF", "MUHAMMAD",
    "FATH", "HUJURAT", "QAF", "DHARIYAT", "TUR", "NAJM",
    "QAMAR", "RAHMAN", "WAQIAH", "HADID", "MUJADILAH",
    "HASHR", "MUMTAHINAH", "SAFF", "JUMUAH", "MUNAFIQUN",
    "TAGHABUN", "TALAQ", "TAHRIM", "MULK", "QALAM", "HAQQAH",
    "MAARIJ", "JINN", "MUZZAMMIL", "MUDDATHTHIR", "QIYAMAH",
    "INSAN", "MURSALAT", "NABA", "NAZIAT", "ABASA", "TAKWIR",
    "INFITAR", "MUTAFFIFIN", "INSHIQAQ", "BURUJ", "TARIQ",
    "ALA", "GHASHIYAH", "FAJR", "BALAD", "SHAMS", "LAYL",
    "DUHA", "SHARH", "TIN", "ALAQ", "QADR", "BAYYINAH",
    "ZALZALAH", "ADIYAT", "ASR", "HUMAZAH", "FIL", "QURAYSH",
    "MAUN", "KAWTHAR", "KAFIRUN", "NASR", "MASAD", "IKHLAS",
    "FALAQ", "NAS",
})
# Concept / structural keywords; a token match is reported as a "concept" hit.
# ("DENIAL" used to be listed twice; the duplicate entry was dropped.)
QURANIC_CONCEPT_KEYWORDS = frozenset({
    "HAYAT", "MAWT", "HIDAYA", "DALAL", "NOUR", "ZULUMAT",
    "HAQQ", "BATIL", "NAHAR", "SAMA", "BASAR", "AKHIRA",
    "DUNYA", "GARDEN", "FIRE", "BOOK", "WATER", "DENIAL",
    "COVENANT", "BREAKING", "CYCLE", "CREATION", "ALTERATION",
    "MIZAN", "RIBA", "WASIYYA", "KAYL", "IDOL", "REFUTATION",
    "ARK", "FLOOD", "NAQA", "RAQABA", "THRONE", "POWER",
    "HUMAN", "SALE", "SCHEME", "REFUSAL", "DECEPTION", "FRAUD",
    "WEALTH", "DESTRUCTION", "FAMILY", "HOSTILE", "TOWER",
    "EXTRACTION", "PROTECTION", "VULNERABLE", "INHERITANCE",
    "DISTRIBUTION", "TRAFFICKING", "PROGENY", "DEPLOYMENT",
    "CONCEPTION", "SENT", "DOWN", "AZIZ", "HAKIM", "CLOSING",
    "CASCADE", "HISTORICAL", "WARNING", "KINDLING", "PROVISION",
    "LIFE", "DEATH", "GUIDANCE", "LIGHT", "DARKNESS", "TRUTH",
    "FALSEHOOD", "NIGHT", "DAY", "HEARING", "SIGHT", "NASL",
    "HARTH", "OPERATOR", "COUNTER", "WAR", "BOTH", "SIDES",
    "REFRAIN",
})
# Subject labels that are only expected on composite rows; a non-composite
# id made up solely of these tokens gets a WARN from _check_name_format().
ALLOWED_COMPOSITE_SUBJECTS = frozenset({
    "RADHANITE", "HABASHA", "WASATANIYYAH", "NETWORK",
    "COMPOSITE", "PERSECUTION", "OPERATION",
})
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # DB HELPERS | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _connect() -> sqlite3.Connection: | |
| conn = sqlite3.connect(DB_PATH) | |
| conn.row_factory = sqlite3.Row | |
| conn.execute("PRAGMA journal_mode=WAL") | |
| return conn | |
| def _parse_q_anchor(q: Optional[str]) -> Optional[Tuple[int, int]]: | |
| """Parse 'Q28:4' β (28, 4). None on failure.""" | |
| if not q: | |
| return None | |
| m = re.match(r"^Q?(\d{1,3}):(\d{1,4})$", q.strip()) | |
| if not m: | |
| return None | |
| return (int(m.group(1)), int(m.group(2))) | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # RULE CHECKERS | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _check_name_format(row: sqlite3.Row) -> Dict[str, Any]: | |
| """Rule A β name references a Qur'anic anchor.""" | |
| algo_id = row["algo_id"] or "" | |
| algo_name = row["algo_name"] or "" | |
| is_composite = bool(row["is_composite"]) | |
| if is_composite: | |
| return { | |
| "verdict": "PASS", | |
| "reason": "composite row β exempt from Rule A (allows historical subject label)", | |
| } | |
| # SURAH-NNN-KEYWORD pattern β validate surah number + keyword | |
| sm = SURAH_ALGO_PATTERN.match(algo_id) | |
| if sm: | |
| surah_num = int(sm.group(1)) | |
| if 1 <= surah_num <= 114: | |
| return { | |
| "verdict": "PASS", | |
| "reason": f"surah-anchored algorithm (surah {surah_num})", | |
| } | |
| return { | |
| "verdict": "FAIL", | |
| "reason": f"surah number {surah_num} out of range (1..114)", | |
| } | |
| # BINARY-XXX-YYY pattern β structural binary pair | |
| if BINARY_ALGO_PATTERN.match(algo_id): | |
| return {"verdict": "PASS", "reason": "binary-pair structural algorithm"} | |
| # Token scan β split algo_id on dashes, check each token | |
| tokens = [t for t in re.split(r"[-_]", algo_id.upper()) if t and t != "ALG"] | |
| hits = [] | |
| for t in tokens: | |
| if t in QURANIC_FIGURES: | |
| hits.append(("figure", t)) | |
| elif t in QURANIC_SURAH_NAMES: | |
| hits.append(("surah_name", t)) | |
| elif t in QURANIC_CONCEPT_KEYWORDS: | |
| hits.append(("concept", t)) | |
| if hits: | |
| return { | |
| "verdict": "PASS", | |
| "reason": f"{len(hits)} Qur'anic anchor token(s) matched", | |
| "matched_tokens": hits, | |
| } | |
| # Check if ALL tokens are composite-only (and is_composite is false β | |
| # because if it were true we'd have passed earlier) | |
| composite_only = [t for t in tokens if t in ALLOWED_COMPOSITE_SUBJECTS] | |
| if composite_only and len(composite_only) == len([t for t in tokens if t]): | |
| return { | |
| "verdict": "WARN", | |
| "reason": f"name uses composite-only tokens {composite_only} but is_composite=0", | |
| "matched_tokens": [("composite_only", t) for t in composite_only], | |
| } | |
| return { | |
| "verdict": "WARN", | |
| "reason": "no Qur'anic anchor token matched in algo_id", | |
| "tokens_tried": tokens, | |
| } | |
| def _check_root_coverage(conn: sqlite3.Connection, algo_id: str) -> Dict[str, Any]: | |
| """Rule B β every root_map root must be attested at one of the ayat.""" | |
| rm_rows = conn.execute( | |
| "SELECT root_letters, role FROM algorithm_root_map WHERE algo_id = ?", | |
| (algo_id,), | |
| ).fetchall() | |
| am_rows = conn.execute( | |
| "SELECT surah, ayah_start, ayah_end FROM algorithm_ayah_map WHERE algo_id = ?", | |
| (algo_id,), | |
| ).fetchall() | |
| if not rm_rows: | |
| return { | |
| "verdict": "FAIL", | |
| "reason": "algorithm has NO root_map entries", | |
| "total_roots": 0, "present": 0, "missing": 0, | |
| "missing_list": [], "present_list": [], | |
| } | |
| if not am_rows: | |
| return { | |
| "verdict": "FAIL", | |
| "reason": "algorithm has NO ayah_map entries β cannot verify coverage", | |
| "total_roots": len(rm_rows), "present": 0, "missing": len(rm_rows), | |
| "missing_list": [(r["root_letters"], r["role"]) for r in rm_rows], | |
| "present_list": [], | |
| } | |
| present: List[Tuple[str, str]] = [] | |
| missing: List[Tuple[str, str]] = [] | |
| for rm in rm_rows: | |
| root = rm["root_letters"] | |
| role = rm["role"] or "" | |
| found = False | |
| for am in am_rows: | |
| surah = am["surah"] | |
| ayah_start = am["ayah_start"] | |
| ayah_end = am["ayah_end"] if am["ayah_end"] is not None else ayah_start | |
| cnt = conn.execute( | |
| "SELECT COUNT(*) FROM quran_word_roots " | |
| "WHERE root = ? AND surah = ? AND ayah BETWEEN ? AND ?", | |
| (root, surah, ayah_start, ayah_end), | |
| ).fetchone()[0] | |
| if cnt > 0: | |
| found = True | |
| break | |
| if found: | |
| present.append((root, role)) | |
| else: | |
| missing.append((root, role)) | |
| total = len(rm_rows) | |
| hit = len(present) | |
| if hit == total: | |
| return { | |
| "verdict": "PASS", | |
| "reason": f"all {total} root(s) attested within ayah_map", | |
| "total_roots": total, "present": hit, "missing": 0, | |
| "missing_list": [], "present_list": present, | |
| } | |
| if hit >= total * 0.75 and missing: | |
| return { | |
| "verdict": "WARN", | |
| "reason": f"{len(missing)} of {total} root(s) not attested within ayah_map", | |
| "total_roots": total, "present": hit, "missing": len(missing), | |
| "missing_list": missing, "present_list": present, | |
| } | |
| return { | |
| "verdict": "FAIL", | |
| "reason": f"{len(missing)} of {total} root(s) not attested β " | |
| f"coverage below 75%", | |
| "total_roots": total, "present": hit, "missing": len(missing), | |
| "missing_list": missing, "present_list": present, | |
| } | |
def _check_primary_ayah(
    conn: sqlite3.Connection, algo_id: str, primary_ayah: Optional[str],
) -> Dict[str, Any]:
    """Rule C: verify that primary_ayah lies inside one of the ayah_map ranges.

    FAILs when the anchor cannot be parsed, when the algorithm has no
    ayah_map rows, or when no range in the anchor's surah contains the
    ayah. A NULL ayah_end is treated as a single-ayah range.
    """
    anchor = _parse_q_anchor(primary_ayah)
    if anchor is None:
        return {
            "verdict": "FAIL",
            "reason": f"primary_ayah unparseable: {primary_ayah!r}",
        }
    target_surah, target_ayah = anchor

    spans = conn.execute(
        "SELECT surah, ayah_start, ayah_end FROM algorithm_ayah_map WHERE algo_id = ?",
        (algo_id,),
    ).fetchall()
    if not spans:
        return {
            "verdict": "FAIL",
            "reason": "no ayah_map to compare against",
        }

    covered = any(
        span["surah"] == target_surah
        and span["ayah_start"]
        <= target_ayah
        <= (span["ayah_start"] if span["ayah_end"] is None else span["ayah_end"])
        for span in spans
    )
    if covered:
        return {
            "verdict": "PASS",
            "reason": f"primary_ayah Q{target_surah}:{target_ayah} is in ayah_map",
        }
    return {
        "verdict": "FAIL",
        "reason": f"primary_ayah Q{target_surah}:{target_ayah} is NOT in any ayah_map range",
    }
| def _check_role_vocabulary(conn: sqlite3.Connection, algo_id: str) -> Dict[str, Any]: | |
| """Rule D β every role must be in the canonical set.""" | |
| rows = conn.execute( | |
| "SELECT DISTINCT role FROM algorithm_root_map WHERE algo_id = ?", | |
| (algo_id,), | |
| ).fetchall() | |
| if not rows: | |
| return {"verdict": "PASS", "reason": "no roles (empty root_map)"} | |
| seen = {r["role"] for r in rows if r["role"]} | |
| bad = sorted(seen - CANONICAL_ROLES) | |
| if not bad: | |
| return { | |
| "verdict": "PASS", | |
| "reason": f"roles in canonical set: {sorted(seen)}", | |
| } | |
| return { | |
| "verdict": "FAIL", | |
| "reason": f"non-canonical role value(s): {bad}", | |
| "bad_roles": bad, | |
| } | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # RULE F β PATTERN SPECIFICITY (Session 42) | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Dilution thresholds (fraction of named algorithms that declare a root): | |
| # >= 0.30 SEVERE DILUTION (root is function-word-level common) | |
| # >= 0.15 MILD DILUTION (root has low discriminating power) | |
| # < 0.15 DISTINCTIVE (root can serve as a pattern marker) | |
| # | |
| # An algorithm passes Rule F iff at least one declared root is | |
| # distinctive. If zero distinctive roots, the algorithm has no pattern | |
| # marker and the audit hard-fails it. | |
# Presence ratio at or above which a root is reported as SEVERE dilution.
SEVERE_DILUTION_THRESHOLD = 0.30
# Presence ratio at or above which a root is reported as mild dilution;
# roots below it count as distinctive for Rule F.
MILD_DILUTION_THRESHOLD = 0.15
| def _compute_root_dilution_map(conn: sqlite3.Connection) -> Dict[str, float]: | |
| """Pre-compute presence ratios for all roots in algorithm_root_map. | |
| Uses NAMED algorithms only as the corpus (block placeholders are | |
| excluded because they are by construction full-span indexing stubs | |
| that would artificially boost every root's ratio). | |
| Returns {root_letters: presence_ratio}. | |
| """ | |
| # Block placeholders have algo_ids matching 'ALG-SURAH-NNN-BLOCK-NNN-NNN'. | |
| # LIKE pattern: each '_' matches a single character. | |
| named_count = conn.execute( | |
| "SELECT COUNT(*) FROM algorithm_registry " | |
| "WHERE algo_id NOT LIKE 'ALG-SURAH-___-BLOCK-___-___'" | |
| ).fetchone()[0] | |
| if named_count == 0: | |
| return {} | |
| rows = conn.execute( | |
| """ | |
| SELECT root_letters, COUNT(DISTINCT algo_id) AS algo_count | |
| FROM algorithm_root_map | |
| WHERE algo_id IN ( | |
| SELECT algo_id FROM algorithm_registry | |
| WHERE algo_id NOT LIKE 'ALG-SURAH-___-BLOCK-___-___' | |
| ) | |
| AND root_letters IS NOT NULL | |
| AND root_letters != '' | |
| GROUP BY root_letters | |
| """ | |
| ).fetchall() | |
| return { | |
| r["root_letters"]: r["algo_count"] / named_count | |
| for r in rows | |
| } | |
| def _check_pattern_specificity( | |
| conn: sqlite3.Connection, | |
| algo_id: str, | |
| dilution_map: Dict[str, float], | |
| ) -> Dict[str, Any]: | |
| """Rule F β at least one declared root must be distinctive.""" | |
| rm_rows = conn.execute( | |
| "SELECT root_letters FROM algorithm_root_map WHERE algo_id = ?", | |
| (algo_id,), | |
| ).fetchall() | |
| if not rm_rows: | |
| return {"verdict": "FAIL", "reason": "no root_map entries"} | |
| ratios: List[Tuple[str, float]] = [] | |
| severe_roots: List[Tuple[str, float]] = [] | |
| mild_roots: List[Tuple[str, float]] = [] | |
| distinctive_roots: List[Tuple[str, float]] = [] | |
| for r in rm_rows: | |
| root = r["root_letters"] | |
| if not root: | |
| continue | |
| ratio = dilution_map.get(root, 0.0) | |
| ratios.append((root, ratio)) | |
| if ratio >= SEVERE_DILUTION_THRESHOLD: | |
| severe_roots.append((root, ratio)) | |
| mild_roots.append((root, ratio)) # severe is a subset of mild | |
| elif ratio >= MILD_DILUTION_THRESHOLD: | |
| mild_roots.append((root, ratio)) | |
| else: | |
| distinctive_roots.append((root, ratio)) | |
| total = len(ratios) | |
| distinctive_count = len(distinctive_roots) | |
| mild_count = len(mild_roots) | |
| severe_count = len(severe_roots) | |
| ratios_sorted = sorted(ratios, key=lambda x: -x[1]) | |
| if distinctive_count == 0: | |
| return { | |
| "verdict": "FAIL", | |
| "reason": ( | |
| f"NO distinctive marker β all {total} root(s) have " | |
| f"presence ratio >= {MILD_DILUTION_THRESHOLD:.2f}" | |
| ), | |
| "total_roots": total, | |
| "distinctive": 0, | |
| "mild_diluted": mild_count, | |
| "severe_diluted": severe_count, | |
| "worst_offenders": ratios_sorted[:8], | |
| "distinctive_list": [], | |
| } | |
| if distinctive_count < max(1, total / 2): | |
| return { | |
| "verdict": "WARN", | |
| "reason": ( | |
| f"minority distinctive β only {distinctive_count}/{total} " | |
| f"root(s) below dilution threshold " | |
| f"{MILD_DILUTION_THRESHOLD:.2f}" | |
| ), | |
| "total_roots": total, | |
| "distinctive": distinctive_count, | |
| "mild_diluted": mild_count, | |
| "severe_diluted": severe_count, | |
| "worst_offenders": ratios_sorted[:8], | |
| "distinctive_list": distinctive_roots, | |
| } | |
| return { | |
| "verdict": "PASS", | |
| "reason": ( | |
| f"{distinctive_count}/{total} distinctive; " | |
| f"{mild_count} mild + {severe_count} severe diluted" | |
| ), | |
| "total_roots": total, | |
| "distinctive": distinctive_count, | |
| "mild_diluted": mild_count, | |
| "severe_diluted": severe_count, | |
| "distinctive_list": distinctive_roots, | |
| } | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # RULE G β DISTINCTIVE VOCABULARY COMPLETENESS (Session 42) | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # "What distinctive roots fire at the algorithm's own ayat that the | |
| # algorithm's root_map does NOT declare?" | |
| # | |
| # A root counts as a candidate-missing-marker if: | |
| # (a) it appears at least 2 times in the algorithm's ayah_map span, | |
| # (b) its algorithm-corpus presence ratio is < 0.15 (distinctive), and | |
| # (c) it is not already declared in the root_map. | |
DISTINCTIVE_RATIO_THRESHOLD = 0.15  # algorithm-layer presence ratio; roots below this are distinctive
MIN_LOCAL_COUNT_FOR_MARKER = 2  # minimum local occurrences at the ayat
DENSITY_CONCENTRATION_THRESHOLD = 0.10  # >= 10% of the root's total Qur'anic tokens must fall at the ayat
| # Tuned from 0.15 to 0.10 after Session 42 spot-check: | |
| # 0.15 missed Ω-Ω-Ω (ark, 0.11), Ψ΄-Ω-Ω (desire, 0.13), Ω-Ω-Ω (0.12). | |
| # 0.10 catches all content-word markers with zero function-word noise | |
| # (function words sit at 0.01-0.03 concentration β 10Γ below threshold). | |
| def _compute_root_global_count_map(conn: sqlite3.Connection) -> Dict[str, int]: | |
| """Precompute total Qur'anic token count per root, from quran_word_roots. | |
| Used by Rule G to filter grammatical particles and ubiquitous | |
| function words out of the "missing distinctive marker" candidate | |
| set via a density-concentration test. | |
| """ | |
| rows = conn.execute( | |
| "SELECT root, COUNT(*) AS cnt FROM quran_word_roots " | |
| "WHERE root IS NOT NULL AND root != '' GROUP BY root" | |
| ).fetchall() | |
| return {r["root"]: r["cnt"] for r in rows} | |
| def _check_distinctive_gap( | |
| conn: sqlite3.Connection, | |
| algo_id: str, | |
| dilution_map: Dict[str, float], | |
| global_count_map: Dict[str, int], | |
| ) -> Dict[str, Any]: | |
| """Rule G β distinctive roots at the ayat but missing from root_map. | |
| A root qualifies as a missing marker iff: | |
| (a) local_count >= MIN_LOCAL_COUNT_FOR_MARKER | |
| (b) local_count / total_qur_tokens >= DENSITY_CONCENTRATION_THRESHOLD | |
| (c) algorithm-layer presence ratio < DISTINCTIVE_RATIO_THRESHOLD | |
| (d) root is not already in declared root_map | |
| """ | |
| declared = set() | |
| for r in conn.execute( | |
| "SELECT root_letters FROM algorithm_root_map WHERE algo_id = ?", | |
| (algo_id,), | |
| ): | |
| if r["root_letters"]: | |
| declared.add(r["root_letters"]) | |
| am_rows = conn.execute( | |
| "SELECT surah, ayah_start, ayah_end FROM algorithm_ayah_map WHERE algo_id = ?", | |
| (algo_id,), | |
| ).fetchall() | |
| if not am_rows: | |
| return { | |
| "verdict": "PASS", | |
| "reason": "no ayah_map to scan", | |
| "declared_count": len(declared), | |
| "missing_count": 0, | |
| "top_missing": [], | |
| } | |
| # Aggregate local root counts across every mapped ayah range | |
| local_counts: Dict[str, int] = {} | |
| for am in am_rows: | |
| surah = am["surah"] | |
| ayah_start = am["ayah_start"] | |
| ayah_end = am["ayah_end"] if am["ayah_end"] is not None else ayah_start | |
| rows = conn.execute( | |
| """ | |
| SELECT root, COUNT(*) AS cnt | |
| FROM quran_word_roots | |
| WHERE surah = ? | |
| AND ayah BETWEEN ? AND ? | |
| AND root IS NOT NULL | |
| AND root != '' | |
| GROUP BY root | |
| """, | |
| (surah, ayah_start, ayah_end), | |
| ).fetchall() | |
| for r in rows: | |
| root = r["root"] | |
| cnt = r["cnt"] or 0 | |
| if root: | |
| local_counts[root] = local_counts.get(root, 0) + cnt | |
| if not local_counts: | |
| return { | |
| "verdict": "PASS", | |
| "reason": "no roots attested at ayah_map ranges", | |
| "declared_count": len(declared), | |
| "missing_count": 0, | |
| "top_missing": [], | |
| } | |
| # Apply all four gates | |
| missing: List[Tuple[str, int, float, int, float]] = [] | |
| # each entry: (root, local_cnt, layer_ratio, global_cnt, concentration) | |
| for root, cnt in local_counts.items(): | |
| if root in declared: | |
| continue | |
| if cnt < MIN_LOCAL_COUNT_FOR_MARKER: | |
| continue | |
| global_cnt = global_count_map.get(root, 0) | |
| if global_cnt <= 0: | |
| continue | |
| concentration = cnt / global_cnt | |
| if concentration < DENSITY_CONCENTRATION_THRESHOLD: | |
| continue | |
| layer_ratio = dilution_map.get(root, 0.0) | |
| if layer_ratio >= DISTINCTIVE_RATIO_THRESHOLD: | |
| continue | |
| missing.append((root, cnt, layer_ratio, global_cnt, concentration)) | |
| # Sort: highest density concentration first, then highest local count | |
| missing.sort(key=lambda x: (-x[4], -x[1])) | |
| declared_count = max(1, len(declared)) | |
| n = len(missing) | |
| if n == 0: | |
| return { | |
| "verdict": "PASS", | |
| "reason": "root_map covers all distinctive markers at the ayat", | |
| "declared_count": len(declared), | |
| "missing_count": 0, | |
| "top_missing": [], | |
| } | |
| warn_threshold = max(3, declared_count // 2) | |
| if n > declared_count: | |
| return { | |
| "verdict": "FAIL", | |
| "reason": ( | |
| f"{n} distinctive root(s) fire at the ayat but are NOT in " | |
| f"root_map (more missing than declared: {n} > {declared_count})" | |
| ), | |
| "declared_count": len(declared), | |
| "missing_count": n, | |
| "top_missing": missing[:20], | |
| } | |
| if n >= warn_threshold: | |
| return { | |
| "verdict": "WARN", | |
| "reason": ( | |
| f"{n} distinctive root(s) fire at the ayat but are NOT in " | |
| f"root_map (declared: {declared_count})" | |
| ), | |
| "declared_count": len(declared), | |
| "missing_count": n, | |
| "top_missing": missing[:20], | |
| } | |
| return { | |
| "verdict": "PASS", | |
| "reason": f"only {n} minor distinctive gap(s)", | |
| "declared_count": len(declared), | |
| "missing_count": n, | |
| "top_missing": missing, | |
| } | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # MAIN AUDIT | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
def audit_algorithm(
    conn: sqlite3.Connection,
    row: sqlite3.Row,
    dilution_map: Optional[Dict[str, float]] = None,
    global_count_map: Optional[Dict[str, int]] = None,
) -> Dict[str, Any]:
    """Run the per-row rules (A, B, C, D and optionally F, G) for one algorithm.

    Args:
        conn: Open database connection used by the rule checkers.
        row: One algorithm_registry row.
        dilution_map: Presence ratios per root. Enables Rule F; when None,
            Rules F and G are both skipped.
        global_count_map: Qur'an-wide token counts per root. Together with
            ``dilution_map`` it enables Rule G; when None, Rule G is skipped.

    Returns:
        Dict echoing the row's identifying columns, a "checks" mapping of
        rule key to that rule's result, and "overall": FAIL if any check
        failed, else WARN if any check warned, else PASS.
    """
    algo_id = row["algo_id"]
    checks: Dict[str, Any] = {
        "A_name_format": _check_name_format(row),
        "B_root_coverage": _check_root_coverage(conn, algo_id),
        "C_primary_ayah": _check_primary_ayah(conn, algo_id, row["primary_ayah"]),
        "D_role_vocabulary": _check_role_vocabulary(conn, algo_id),
    }
    if dilution_map is not None:
        checks["F_pattern_specificity"] = _check_pattern_specificity(
            conn, algo_id, dilution_map
        )
    # Rule G reads both maps, so it is guarded on both explicitly: a None
    # dilution_map must never reach _check_distinctive_gap.
    if dilution_map is not None and global_count_map is not None:
        checks["G_distinctive_gap"] = _check_distinctive_gap(
            conn, algo_id, dilution_map, global_count_map
        )

    # The worst verdict across all checks becomes the overall verdict.
    seen_verdicts = {check["verdict"] for check in checks.values()}
    if "FAIL" in seen_verdicts:
        overall = "FAIL"
    elif "WARN" in seen_verdicts:
        overall = "WARN"
    else:
        overall = "PASS"

    return {
        "algo_id": algo_id,
        "algo_name": row["algo_name"],
        "algo_class": row["algo_class"],
        "primary_ayah": row["primary_ayah"],
        "is_composite": bool(row["is_composite"]),
        "status": row["status"],
        "quf_pass": row["quf_pass"],
        "root_count": row["root_count"] or 0,
        "ayah_count": row["ayah_count"] or 0,
        "checks": checks,
        "overall": overall,
    }
def collect_named_algorithms(
    conn: sqlite3.Connection,
    algo_id_filter: Optional[str] = None,
    class_filter: Optional[str] = None,
) -> List[sqlite3.Row]:
    """Fetch the algorithm_registry rows that should be audited.

    Rows come back ordered by algo_class, then algo_id. Block placeholders
    (ids matching BLOCK_ALGO_PATTERN) are always dropped. ``algo_id_filter``
    keeps only the exact id; ``class_filter`` keeps only rows of that class,
    compared case-insensitively.
    """
    wanted_class = class_filter.upper() if class_filter else None

    def _selected(rec: sqlite3.Row) -> bool:
        if BLOCK_ALGO_PATTERN.match(rec["algo_id"] or ""):
            return False
        if algo_id_filter and rec["algo_id"] != algo_id_filter:
            return False
        if wanted_class is not None and (rec["algo_class"] or "").upper() != wanted_class:
            return False
        return True

    registry = conn.execute("SELECT * FROM algorithm_registry ORDER BY algo_class, algo_id").fetchall()
    return [rec for rec in registry if _selected(rec)]
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # RENDERERS | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
def render_row(result: Dict[str, Any], show_detail: bool = True) -> str:
    """Format one audit result as text.

    Args:
        result: A dict as produced by audit_algorithm().
        show_detail: When False only the one-line header is returned.

    Returns:
        The header line, optionally followed by the name, the status line
        and one line per rule check, with extra lines listing Rule B
        missing roots, Rule F offenders / markers and Rule G gaps.

    A missing algo_class is shown as '?' and a missing root in Rule B's
    missing list as '?', so NULL columns cannot break the width formatting.
    """
    lines: List[str] = []
    hdr = (
        f"[{result['overall']:<4}] {result['algo_id']:<38} "
        f"{result['algo_class'] or '?':<13} Q={result['primary_ayah'] or '-':<10} "
        f"roots={result['root_count']:>3} ayat={result['ayah_count']:>3}"
    )
    lines.append(hdr)
    if not show_detail:
        return "\n".join(lines)

    if result.get("algo_name"):
        name = result["algo_name"]
        if len(name) > 80:
            # Keep long names on one line.
            name = name[:77] + "..."
        lines.append(f" name: {name}")
    lines.append(f" status={result['status'] or '-'} quf_pass={result['quf_pass'] or '-'} composite={result['is_composite']}")

    for rule_key, check in result["checks"].items():
        v = check["verdict"]
        reason = check.get("reason", "")
        lines.append(f" {rule_key:<22} [{v:<4}] {reason}")

        if rule_key == "B_root_coverage" and check.get("missing_list"):
            for r, role in check["missing_list"]:
                # root_letters may be NULL in the database; None does not
                # accept a width format spec, so substitute a placeholder.
                lines.append(f" β³ MISSING: {r or '?':<12} role={role}")

        if rule_key == "F_pattern_specificity":
            worst = check.get("worst_offenders") or []
            for root, ratio in worst[:5]:
                marker = "SEVERE" if ratio >= SEVERE_DILUTION_THRESHOLD else (
                    "mild" if ratio >= MILD_DILUTION_THRESHOLD else "distinct"
                )
                lines.append(
                    f" β³ {root:<12} ratio={ratio:.2f} ({marker})"
                )
            distinctives = check.get("distinctive_list") or []
            if distinctives and v != "FAIL":
                roots_only = ", ".join(r for r, _ in distinctives[:6])
                lines.append(f" β³ distinctive markers: {roots_only}")

        if rule_key == "G_distinctive_gap":
            top = check.get("top_missing") or []
            for entry in top[:20]:
                # tuple: (root, local_cnt, layer_ratio, global_cnt, concentration)
                if len(entry) >= 5:
                    root, cnt, layer_ratio, gcnt, conc = entry
                    lines.append(
                        f" β³ MISSING: {root:<12} local={cnt:<3} "
                        f"global={gcnt:<4} conc={conc:.2f} layer_r={layer_ratio:.2f}"
                    )
                else:
                    # legacy 3-tuple fallback
                    root, cnt, ratio = entry[0], entry[1], entry[2]
                    lines.append(
                        f" β³ MISSING: {root:<12} local_cnt={cnt:<3} "
                        f"ratio={ratio:.2f}"
                    )
    return "\n".join(lines)
def render_summary(results: List[Dict[str, Any]], fleet_stats: Dict[str, Any]) -> str:
    """Format the fleet-wide summary that follows the per-row report.

    Sections, in order: overall verdict counts, counts per algo_class
    (a missing class is shown as '?'), FAIL / WARN counts per rule, the
    Rule E fleet statistics passed in ``fleet_stats``, then the ids of the
    failing rows and of the warning rows.
    """
    bar = "=" * 72
    out: List[str] = [
        "",
        bar,
        "PHASE 0 AUDIT β SUMMARY",
        bar,
        f" audited: {len(results)} named algorithm rows",
    ]

    overall = Counter(res["overall"] for res in results)
    out.extend(f" {verdict:<6} {overall.get(verdict, 0)}" for verdict in ("PASS", "WARN", "FAIL"))

    out += ["", " BY CLASS:"]
    per_class: Dict[str, Counter] = {}
    for res in results:
        per_class.setdefault(res["algo_class"] or "?", Counter())[res["overall"]] += 1
    for cls in sorted(per_class):
        tally = per_class[cls]
        out.append(
            f" {cls:<14} total={sum(tally.values()):>3} "
            f"pass={tally.get('PASS', 0):>3} warn={tally.get('WARN', 0):>3} fail={tally.get('FAIL', 0):>3}"
        )

    out += ["", " BY RULE (failures count):"]
    # Keyed by (rule, verdict); absent pairs read back as 0.
    per_rule = Counter(
        (rule_key, check["verdict"])
        for res in results
        for rule_key, check in res["checks"].items()
    )
    rule_order = (
        "A_name_format",
        "B_root_coverage",
        "C_primary_ayah",
        "D_role_vocabulary",
        "F_pattern_specificity",
        "G_distinctive_gap",
    )
    for rk in rule_order:
        out.append(
            f" {rk:<24} fail={per_rule[(rk, 'FAIL')]:>3} warn={per_rule[(rk, 'WARN')]:>3}"
        )

    out += ["", " RULE E β FLEET STATUS (quf_pass + status):"]
    out.extend(f" {k:<30} {v}" for k, v in fleet_stats.items())
    out += [bar, ""]

    # Quick action lists.
    failing = [res["algo_id"] for res in results if res["overall"] == "FAIL"]
    if failing:
        out.append(f"FAIL list ({len(failing)}):")
        out.extend(f" {algo_id}" for algo_id in failing)
    warning = [res["algo_id"] for res in results if res["overall"] == "WARN"]
    if warning:
        out.append("")
        out.append(f"WARN list ({len(warning)}):")
        out.extend(f" {algo_id}" for algo_id in warning)
    out.append("")
    return "\n".join(out)
def collect_fleet_stats(conn: sqlite3.Connection) -> Dict[str, Any]:
    """Rule E: tally algorithm_registry rows by status and by quf_pass.

    Args:
        conn: Open SQLite connection exposing an ``algorithm_registry``
            table with ``status`` and ``quf_pass`` columns.

    Returns:
        A dict whose keys look like ``"status = <value>"`` and
        ``"quf_pass = <value>"`` and whose values are row counts. NULL
        column values are grouped under the literal ``(null)``.
    """
    tallies: Dict[str, Any] = {}
    status_rows = conn.execute(
        "SELECT COALESCE(status, '(null)') as s, COUNT(*) FROM algorithm_registry GROUP BY s"
    )
    tallies.update((f"status = {value}", count) for value, count in status_rows)
    quf_rows = conn.execute(
        "SELECT COALESCE(quf_pass, '(null)') as q, COUNT(*) FROM algorithm_registry GROUP BY q"
    )
    tallies.update((f"quf_pass = {value}", count) for value, count in quf_rows)
    return tallies
# ─────────────────────────────────────────────────────────────────────
# CLI
# ─────────────────────────────────────────────────────────────────────
def main(argv: List[str]) -> int:
    """CLI entry point: audit the selected algorithms and print the report.

    Args:
        argv: Command-line arguments without the program name.

    Returns:
        0 once the report has been printed (and saved, if ``--save`` was
        given). argparse raises ``SystemExit`` itself for ``--help`` and
        for invalid arguments.
    """
    p = argparse.ArgumentParser(prog="amr_algorithm_audit")
    p.add_argument("--algo", default=None, help="audit one algorithm by algo_id")
    p.add_argument("--class", dest="class_filter", default=None,
                   help="filter by algo_class (OPERATOR, NARRATIVE, ...)")
    # None is not listed in choices: it could never be matched from the
    # command line and only leaked into the usage text. default=None is
    # still what "no filter" looks like.
    p.add_argument("--verdict", default=None, choices=["PASS", "WARN", "FAIL"],
                   help="only show rows with this overall verdict")
    p.add_argument("--summary-only", action="store_true",
                   help="omit per-row detail, print summary only")
    p.add_argument("--save", default=None, metavar="PATH",
                   help="save full report to PATH (stdout output is unchanged)")
    args = p.parse_args(argv)

    conn = _connect()
    try:
        # Fleet-wide lookup maps are computed once and shared by every
        # per-algorithm audit.
        dilution_map = _compute_root_dilution_map(conn)
        global_count_map = _compute_root_global_count_map(conn)
        targets = collect_named_algorithms(conn, algo_id_filter=args.algo,
                                           class_filter=args.class_filter)
        results = [
            audit_algorithm(
                conn, row,
                dilution_map=dilution_map,
                global_count_map=global_count_map,
            )
            for row in targets
        ]
        fleet_stats = collect_fleet_stats(conn)

        out_lines: List[str] = []
        out_lines.append("amr_algorithm_audit.py β Phase 0 Report")
        out_lines.append(f"Generated: {datetime.now().isoformat(timespec='seconds')}")
        out_lines.append(f"DB: {DB_PATH}")
        out_lines.append("")
        if not args.summary_only:
            out_lines.append("-" * 72)
            out_lines.append("PER-ROW AUDIT")
            out_lines.append("-" * 72)
            # --verdict narrows only the per-row section; the summary
            # below always covers every audited row.
            show = results
            if args.verdict:
                show = [r for r in results if r["overall"] == args.verdict]
            for r in show:
                out_lines.append(render_row(r, show_detail=True))
                out_lines.append("")
        out_lines.append(render_summary(results, fleet_stats))

        text = "\n".join(out_lines)
        print(text)
        if args.save:
            with open(args.save, "w", encoding="utf-8") as f:
                f.write(text)
            # The notice goes to stderr so stdout stays exactly the report,
            # as the --save help text promises.
            print(f"(full report saved to {args.save})", file=sys.stderr)
        return 0
    finally:
        conn.close()
# Script entry point: pass the CLI arguments (minus the program name) to
# main() and use its return value as the process exit status.
if __name__ == "__main__":
    sys.exit(main(sys.argv[1:]))