#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
amr_algorithm_audit.py - Phase 0 READ-ONLY audit of algorithm_registry.

Runs the Unit 3C hard rules against every NAMED (non-block-placeholder)
algorithm in algorithm_registry, producing a per-algorithm verdict and
a fleet-wide summary. ZERO writes. ZERO LLM. Pure SQL analysis.

Purpose:
    Before extending the algorithm layer (Phase 2), audit the existing
    86 named rows to discover which ones have:
      - sparse root_maps (roots declared in algorithm_root_map but not
        attested at the algorithm's ayat in quran_word_roots; Rule B)
      - primary_ayah pointing outside the ayah_map
      - non-canonical role values
      - algo_id that doesn't reference a Qur'anic anchor
    These are the prerequisites for any extension work.

Hard rules audited:

    Rule A - NAME FORMAT
        For non-composite rows: algo_id must reference a
        Qur'anic-anchored term (named figure, named surah, named
        concept, or BINARY-pair structural keyword). The check scans
        the tokens of algo_id; algo_name is not inspected.
        For composite rows (is_composite=1): exempt; composites may
        name historical instantiations of Qur'anic patterns.

    Rule B - ROOT COVERAGE (the strictest check)
        For every root in algorithm_root_map, verify that the root has
        at least one token in quran_word_roots within an ayah covered
        by this algorithm's ayah_map. Missing roots are the single most
        important finding: they reveal algorithms whose declared root
        signatures are not attested at their own ayat.
        Verdicts:
          - all roots attested           -> PASS
          - at least 75% attested        -> WARN
          - otherwise (or empty maps)    -> FAIL

    Rule C - PRIMARY_AYAH ALIGNMENT
        primary_ayah must fall within at least one ayah_map range for
        this algorithm. A primary_ayah outside the ayat is a pointer
        error, not a content error.

    Rule D - ROLE VOCABULARY
        Every role value in algorithm_root_map must be in
        {PRIMARY, SUPPORT, BINARY_A, BINARY_B}. Other values are
        schema drift.

    Rule E - FLEET STATUS
        Fleet-wide tally of status and quf_pass values. Not a per-row
        check; reported as summary only.

    Rule F - PATTERN SPECIFICITY (added Session 42)
        For every root in algorithm_root_map, compute the
        "algorithm presence ratio": the fraction of named algorithms
        that declare this root. A root with presence >= 15% has low
        discriminating power (it appears in many algorithms, so it
        cannot be a distinctive marker of any single one). A root
        with presence >= 30% is layer-level dilution.
        For each algorithm:
          - At least one root must be distinctive (ratio < 0.15).
          - If zero distinctive roots    -> FAIL.
          - If minority distinctive (< 50%) -> WARN.
          - Else                         -> PASS.
        This catches root_maps that are populated with layer-common
        particles only. Measured AGAINST THE NAMED ALGORITHM CORPUS,
        not against the full Qur'anic token count: a root can be
        Qur'an-frequent but still algorithm-distinctive.

    Rule G - DISTINCTIVE VOCABULARY COMPLETENESS (added Session 42)
        For each algorithm with an ayah_map, find content roots that
        fire at the mapped ayat with high DENSITY CONCENTRATION but
        are not declared in the algorithm's root_map.
        A root qualifies as a "missing marker" iff ALL of:
          (a) local_count at ayat >= 2
          (b) local_count / total_qur_tokens >= 0.10
              (>= 10% of the root's total Qur'anic occurrences happen
              at this algorithm's ayat: the root is concentrated
              at these ayat, not merely present. Tuned from 0.15
              after a spot-check showed 0.15 missed ف-ل-ك at NUH.)
          (c) algorithm-layer presence ratio < 0.15 (distinctive
              within the named algorithm corpus)
          (d) root is not already in the algorithm's root_map
        The density-concentration gate filters out grammatical
        particles and ubiquitous function words (م-ن, ل-أ, إ-ن,
        ق-و-ل, أ-ل-ه, etc.) which have thousands of Qur'anic tokens
        and would otherwise flood the results. Only genuine pattern
        markers pass all four gates.
        Verdicts:
          - missing_count > declared_count                -> FAIL
          - missing_count >= max(3, declared_count // 2)  -> WARN
          - else                                          -> PASS

Usage:
    python3 amr_algorithm_audit.py                          # full audit, stdout
    python3 amr_algorithm_audit.py --algo ALG-NUH-ARK-FLOOD
    python3 amr_algorithm_audit.py --class OPERATOR
    python3 amr_algorithm_audit.py --verdict FAIL           # only failing rows
    python3 amr_algorithm_audit.py --summary-only           # no per-row detail
    python3 amr_algorithm_audit.py --save report.txt        # save full output
"""
from __future__ import annotations
import argparse
import os
import re
import sqlite3
import sys
from collections import Counter
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
DB_PATH = os.path.join(SCRIPT_DIR, "uslap_database_v3.db")
BLOCK_ALGO_PATTERN = re.compile(r"^ALG-SURAH-\d{3}-BLOCK-\d{3}-\d{3}$")
SURAH_ALGO_PATTERN = re.compile(r"^ALG-SURAH-(\d{3})-[A-Z]+$")
BINARY_ALGO_PATTERN = re.compile(r"^ALG-BINARY-[A-Z\-]+$")
CANONICAL_ROLES = frozenset({"PRIMARY", "SUPPORT", "BINARY_A", "BINARY_B"})
# ─────────────────────────────────────────────────────────────────────
# Qur'anic anchor tokens for Rule A.
# Not exhaustive: just wide enough to cover the 86 named rows that
# exist right now. Any row that doesn't hit one of these gets a WARN
# and goes on the human-review list.
# ─────────────────────────────────────────────────────────────────────
QURANIC_FIGURES = frozenset({
"PHARAOH", "FIRAWN", "QARUN", "HAMAN", "IBLIS",
"NUH", "IBRAHIM", "LUT", "MUSA", "YUSUF", "KHIDR",
"SULAYMAN", "BILQIS", "MARYAM", "THAMUD", "SAMIRI",
"AD", "HUD", "DHUL", "QARNAYN", "BANI", "ISRAIL",
"YUNUS", "AYYUB", "ZAKARIYA", "YAHYA", "ISA", "IDRIS",
"ADAM", "HAWA", "NIMROD", "KAHF", "QARIAH",
})
QURANIC_SURAH_NAMES = frozenset({
"FATIHA", "BAQARAH", "IMRAN", "NISA", "MAIDA", "ANAM",
"ARAF", "ANFAL", "TAWBAH", "HUD", "YUSUF", "RAD",
"IBRAHIM", "HIJR", "NAHL", "ISRA", "KAHF", "MARYAM",
"TAHA", "ANBIYA", "HAJJ", "MUMINUN", "NUR", "FURQAN",
"SHUARA", "NAML", "QASAS", "ANKABUT", "RUM", "LUQMAN",
"SAJDAH", "AHZAB", "SABA", "FATIR", "YASIN", "YA-SIN",
"SAFFAT", "SAD", "ZUMAR", "GHAFIR", "FUSSILAT", "SHURA",
"ZUKHRUF", "DUKHAN", "JATHIYAH", "AHQAF", "MUHAMMAD",
"FATH", "HUJURAT", "QAF", "DHARIYAT", "TUR", "NAJM",
"QAMAR", "RAHMAN", "WAQIAH", "HADID", "MUJADILAH",
"HASHR", "MUMTAHINAH", "SAFF", "JUMUAH", "MUNAFIQUN",
"TAGHABUN", "TALAQ", "TAHRIM", "MULK", "QALAM", "HAQQAH",
"MAARIJ", "JINN", "MUZZAMMIL", "MUDDATHTHIR", "QIYAMAH",
"INSAN", "MURSALAT", "NABA", "NAZIAT", "ABASA", "TAKWIR",
"INFITAR", "MUTAFFIFIN", "INSHIQAQ", "BURUJ", "TARIQ",
"ALA", "GHASHIYAH", "FAJR", "BALAD", "SHAMS", "LAYL",
"DUHA", "SHARH", "TIN", "ALAQ", "QADR", "BAYYINAH",
"ZALZALAH", "ADIYAT", "ASR", "HUMAZAH", "FIL", "QURAYSH",
"MAUN", "KAWTHAR", "KAFIRUN", "NASR", "MASAD", "IKHLAS",
"FALAQ", "NAS",
})
QURANIC_CONCEPT_KEYWORDS = frozenset({
"HAYAT", "MAWT", "HIDAYA", "DALAL", "NOUR", "ZULUMAT",
"HAQQ", "BATIL", "NAHAR", "SAMA", "BASAR", "AKHIRA",
"DUNYA", "GARDEN", "FIRE", "BOOK", "WATER", "DENIAL",
"COVENANT", "BREAKING", "CYCLE", "CREATION", "ALTERATION",
"MIZAN", "RIBA", "WASIYYA", "KAYL", "IDOL", "REFUTATION",
"ARK", "FLOOD", "NAQA", "RAQABA", "THRONE", "POWER",
"HUMAN", "SALE", "SCHEME", "REFUSAL", "DECEPTION", "FRAUD",
"WEALTH", "DESTRUCTION", "FAMILY", "HOSTILE", "TOWER",
"EXTRACTION", "PROTECTION", "VULNERABLE", "INHERITANCE",
"DISTRIBUTION", "TRAFFICKING", "PROGENY", "DEPLOYMENT",
"CONCEPTION", "SENT", "DOWN", "AZIZ", "HAKIM", "CLOSING",
"CASCADE", "HISTORICAL", "WARNING", "KINDLING", "PROVISION",
"LIFE", "DEATH", "GUIDANCE", "LIGHT", "DARKNESS", "TRUTH",
"FALSEHOOD", "NIGHT", "DAY", "HEARING", "SIGHT", "NASL",
"HARTH", "OPERATOR", "COUNTER", "WAR", "BOTH", "SIDES",
"DENIAL", "REFRAIN",
})
ALLOWED_COMPOSITE_SUBJECTS = frozenset({
"RADHANITE", "HABASHA", "WASATANIYYAH", "NETWORK",
"COMPOSITE", "PERSECUTION", "OPERATION",
})
# ─────────────────────────────────────────────────────────────────────
# DB HELPERS
# ─────────────────────────────────────────────────────────────────────
def _connect() -> sqlite3.Connection:
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA query_only=ON")  # enforce the read-only contract; switching to WAL would modify the DB file
return conn
def _parse_q_anchor(q: Optional[str]) -> Optional[Tuple[int, int]]:
"""Parse 'Q28:4' -> (28, 4). None on failure."""
if not q:
return None
m = re.match(r"^Q?(\d{1,3}):(\d{1,4})$", q.strip())
if not m:
return None
return (int(m.group(1)), int(m.group(2)))
# ─────────────────────────────────────────────────────────────────────
# RULE CHECKERS
# ─────────────────────────────────────────────────────────────────────
def _check_name_format(row: sqlite3.Row) -> Dict[str, Any]:
"""Rule A: name references a Qur'anic anchor."""
algo_id = row["algo_id"] or ""
algo_name = row["algo_name"] or ""
is_composite = bool(row["is_composite"])
if is_composite:
return {
"verdict": "PASS",
"reason": "composite row: exempt from Rule A (allows historical subject label)",
}
# SURAH-NNN-KEYWORD pattern: validate the surah number (the keyword itself is not checked)
sm = SURAH_ALGO_PATTERN.match(algo_id)
if sm:
surah_num = int(sm.group(1))
if 1 <= surah_num <= 114:
return {
"verdict": "PASS",
"reason": f"surah-anchored algorithm (surah {surah_num})",
}
return {
"verdict": "FAIL",
"reason": f"surah number {surah_num} out of range (1..114)",
}
# BINARY-XXX-YYY pattern: structural binary pair
if BINARY_ALGO_PATTERN.match(algo_id):
return {"verdict": "PASS", "reason": "binary-pair structural algorithm"}
# Token scan: split algo_id on dashes and underscores, check each token
tokens = [t for t in re.split(r"[-_]", algo_id.upper()) if t and t != "ALG"]
hits = []
for t in tokens:
if t in QURANIC_FIGURES:
hits.append(("figure", t))
elif t in QURANIC_SURAH_NAMES:
hits.append(("surah_name", t))
elif t in QURANIC_CONCEPT_KEYWORDS:
hits.append(("concept", t))
if hits:
return {
"verdict": "PASS",
"reason": f"{len(hits)} Qur'anic anchor token(s) matched",
"matched_tokens": hits,
}
# Check if ALL tokens are composite-only (and is_composite is false,
# because if it were true we'd have returned earlier)
composite_only = [t for t in tokens if t in ALLOWED_COMPOSITE_SUBJECTS]
if composite_only and len(composite_only) == len([t for t in tokens if t]):
return {
"verdict": "WARN",
"reason": f"name uses composite-only tokens {composite_only} but is_composite=0",
"matched_tokens": [("composite_only", t) for t in composite_only],
}
return {
"verdict": "WARN",
"reason": "no Qur'anic anchor token matched in algo_id",
"tokens_tried": tokens,
}
def _check_root_coverage(conn: sqlite3.Connection, algo_id: str) -> Dict[str, Any]:
"""Rule B: every root_map root must be attested at one of the ayat."""
rm_rows = conn.execute(
"SELECT root_letters, role FROM algorithm_root_map WHERE algo_id = ?",
(algo_id,),
).fetchall()
am_rows = conn.execute(
"SELECT surah, ayah_start, ayah_end FROM algorithm_ayah_map WHERE algo_id = ?",
(algo_id,),
).fetchall()
if not rm_rows:
return {
"verdict": "FAIL",
"reason": "algorithm has NO root_map entries",
"total_roots": 0, "present": 0, "missing": 0,
"missing_list": [], "present_list": [],
}
if not am_rows:
return {
"verdict": "FAIL",
"reason": "algorithm has NO ayah_map entries; cannot verify coverage",
"total_roots": len(rm_rows), "present": 0, "missing": len(rm_rows),
"missing_list": [(r["root_letters"], r["role"]) for r in rm_rows],
"present_list": [],
}
present: List[Tuple[str, str]] = []
missing: List[Tuple[str, str]] = []
for rm in rm_rows:
root = rm["root_letters"]
role = rm["role"] or ""
found = False
for am in am_rows:
surah = am["surah"]
ayah_start = am["ayah_start"]
ayah_end = am["ayah_end"] if am["ayah_end"] is not None else ayah_start
cnt = conn.execute(
"SELECT COUNT(*) FROM quran_word_roots "
"WHERE root = ? AND surah = ? AND ayah BETWEEN ? AND ?",
(root, surah, ayah_start, ayah_end),
).fetchone()[0]
if cnt > 0:
found = True
break
if found:
present.append((root, role))
else:
missing.append((root, role))
total = len(rm_rows)
hit = len(present)
if hit == total:
return {
"verdict": "PASS",
"reason": f"all {total} root(s) attested within ayah_map",
"total_roots": total, "present": hit, "missing": 0,
"missing_list": [], "present_list": present,
}
if hit >= total * 0.75 and missing:
return {
"verdict": "WARN",
"reason": f"{len(missing)} of {total} root(s) not attested within ayah_map",
"total_roots": total, "present": hit, "missing": len(missing),
"missing_list": missing, "present_list": present,
}
return {
"verdict": "FAIL",
"reason": f"{len(missing)} of {total} root(s) not attested; "
f"coverage below 75%",
"total_roots": total, "present": hit, "missing": len(missing),
"missing_list": missing, "present_list": present,
}
def _check_primary_ayah(
conn: sqlite3.Connection, algo_id: str, primary_ayah: Optional[str],
) -> Dict[str, Any]:
"""Rule C: primary_ayah must be in ayah_map."""
parsed = _parse_q_anchor(primary_ayah)
if parsed is None:
return {
"verdict": "FAIL",
"reason": f"primary_ayah unparseable: {primary_ayah!r}",
}
p_surah, p_ayah = parsed
am_rows = conn.execute(
"SELECT surah, ayah_start, ayah_end FROM algorithm_ayah_map WHERE algo_id = ?",
(algo_id,),
).fetchall()
if not am_rows:
return {
"verdict": "FAIL",
"reason": "no ayah_map to compare against",
}
for am in am_rows:
end = am["ayah_end"] if am["ayah_end"] is not None else am["ayah_start"]
if am["surah"] == p_surah and am["ayah_start"] <= p_ayah <= end:
return {
"verdict": "PASS",
"reason": f"primary_ayah Q{p_surah}:{p_ayah} is in ayah_map",
}
return {
"verdict": "FAIL",
"reason": f"primary_ayah Q{p_surah}:{p_ayah} is NOT in any ayah_map range",
}
def _check_role_vocabulary(conn: sqlite3.Connection, algo_id: str) -> Dict[str, Any]:
"""Rule D: every role must be in the canonical set."""
rows = conn.execute(
"SELECT DISTINCT role FROM algorithm_root_map WHERE algo_id = ?",
(algo_id,),
).fetchall()
if not rows:
return {"verdict": "PASS", "reason": "no roles (empty root_map)"}
seen = {r["role"] for r in rows if r["role"]}
bad = sorted(seen - CANONICAL_ROLES)
if not bad:
return {
"verdict": "PASS",
"reason": f"roles in canonical set: {sorted(seen)}",
}
return {
"verdict": "FAIL",
"reason": f"non-canonical role value(s): {bad}",
"bad_roles": bad,
}
# ─────────────────────────────────────────────────────────────────────
# RULE F - PATTERN SPECIFICITY (Session 42)
# ─────────────────────────────────────────────────────────────────────
# Dilution thresholds (fraction of named algorithms that declare a root):
# >= 0.30 SEVERE DILUTION (root is function-word-level common)
# >= 0.15 MILD DILUTION (root has low discriminating power)
# < 0.15 DISTINCTIVE (root can serve as a pattern marker)
#
# An algorithm passes Rule F iff at least one declared root is
# distinctive. If zero distinctive roots, the algorithm has no pattern
# marker and the audit hard-fails it.
SEVERE_DILUTION_THRESHOLD = 0.30
MILD_DILUTION_THRESHOLD = 0.15
def _compute_root_dilution_map(conn: sqlite3.Connection) -> Dict[str, float]:
"""Pre-compute presence ratios for all roots in algorithm_root_map.
Uses NAMED algorithms only as the corpus (block placeholders are
excluded because they are by construction full-span indexing stubs
that would artificially boost every root's ratio).
Returns {root_letters: presence_ratio}.
"""
# Block placeholders have algo_ids matching 'ALG-SURAH-NNN-BLOCK-NNN-NNN'.
# LIKE pattern: each '_' matches a single character.
named_count = conn.execute(
"SELECT COUNT(*) FROM algorithm_registry "
"WHERE algo_id NOT LIKE 'ALG-SURAH-___-BLOCK-___-___'"
).fetchone()[0]
if named_count == 0:
return {}
rows = conn.execute(
"""
SELECT root_letters, COUNT(DISTINCT algo_id) AS algo_count
FROM algorithm_root_map
WHERE algo_id IN (
SELECT algo_id FROM algorithm_registry
WHERE algo_id NOT LIKE 'ALG-SURAH-___-BLOCK-___-___'
)
AND root_letters IS NOT NULL
AND root_letters != ''
GROUP BY root_letters
"""
).fetchall()
return {
r["root_letters"]: r["algo_count"] / named_count
for r in rows
}
def _check_pattern_specificity(
conn: sqlite3.Connection,
algo_id: str,
dilution_map: Dict[str, float],
) -> Dict[str, Any]:
"""Rule F: at least one declared root must be distinctive."""
rm_rows = conn.execute(
"SELECT root_letters FROM algorithm_root_map WHERE algo_id = ?",
(algo_id,),
).fetchall()
if not rm_rows:
return {"verdict": "FAIL", "reason": "no root_map entries"}
ratios: List[Tuple[str, float]] = []
severe_roots: List[Tuple[str, float]] = []
mild_roots: List[Tuple[str, float]] = []
distinctive_roots: List[Tuple[str, float]] = []
for r in rm_rows:
root = r["root_letters"]
if not root:
continue
ratio = dilution_map.get(root, 0.0)
ratios.append((root, ratio))
if ratio >= SEVERE_DILUTION_THRESHOLD:
severe_roots.append((root, ratio))
mild_roots.append((root, ratio)) # severe is a subset of mild
elif ratio >= MILD_DILUTION_THRESHOLD:
mild_roots.append((root, ratio))
else:
distinctive_roots.append((root, ratio))
total = len(ratios)
distinctive_count = len(distinctive_roots)
mild_count = len(mild_roots)
severe_count = len(severe_roots)
ratios_sorted = sorted(ratios, key=lambda x: -x[1])
if distinctive_count == 0:
return {
"verdict": "FAIL",
"reason": (
f"NO distinctive marker: all {total} root(s) have "
f"presence ratio >= {MILD_DILUTION_THRESHOLD:.2f}"
),
"total_roots": total,
"distinctive": 0,
"mild_diluted": mild_count,
"severe_diluted": severe_count,
"worst_offenders": ratios_sorted[:8],
"distinctive_list": [],
}
if distinctive_count < max(1, total / 2):
return {
"verdict": "WARN",
"reason": (
f"minority distinctive: only {distinctive_count}/{total} "
f"root(s) below dilution threshold "
f"{MILD_DILUTION_THRESHOLD:.2f}"
),
"total_roots": total,
"distinctive": distinctive_count,
"mild_diluted": mild_count,
"severe_diluted": severe_count,
"worst_offenders": ratios_sorted[:8],
"distinctive_list": distinctive_roots,
}
return {
"verdict": "PASS",
"reason": (
f"{distinctive_count}/{total} distinctive; "
f"{mild_count} mild + {severe_count} severe diluted"
),
"total_roots": total,
"distinctive": distinctive_count,
"mild_diluted": mild_count,
"severe_diluted": severe_count,
"distinctive_list": distinctive_roots,
}
# ─────────────────────────────────────────────────────────────────────
# RULE G - DISTINCTIVE VOCABULARY COMPLETENESS (Session 42)
# ─────────────────────────────────────────────────────────────────────
# "What distinctive roots fire at the algorithm's own ayat that the
# algorithm's root_map does NOT declare?"
#
# A root counts as a candidate-missing-marker if:
# (a) it appears at least 2 times in the algorithm's ayah_map span,
# (b) at least 10% of its total Qur'anic tokens fall within that span,
# (c) its algorithm-corpus presence ratio is < 0.15 (distinctive), and
# (d) it is not already declared in the root_map.
DISTINCTIVE_RATIO_THRESHOLD = 0.15  # algorithm-layer presence ratio
MIN_LOCAL_COUNT_FOR_MARKER = 2  # minimum local occurrences at the ayat
DENSITY_CONCENTRATION_THRESHOLD = 0.10  # >= 10% of root's total Qur'anic tokens at the ayat
# Tuned from 0.15 to 0.10 after Session 42 spot-check:
# 0.15 missed ف-ل-ك (ark, 0.11), ش-ه-و (desire, 0.13), ن-ف-ق (0.12).
# 0.10 catches all content-word markers with zero function-word noise
# (function words sit at 0.01-0.03 concentration, roughly 3-10x below the threshold).
def _compute_root_global_count_map(conn: sqlite3.Connection) -> Dict[str, int]:
"""Precompute total Qur'anic token count per root, from quran_word_roots.
Used by Rule G to filter grammatical particles and ubiquitous
function words out of the "missing distinctive marker" candidate
set via a density-concentration test.
"""
rows = conn.execute(
"SELECT root, COUNT(*) AS cnt FROM quran_word_roots "
"WHERE root IS NOT NULL AND root != '' GROUP BY root"
).fetchall()
return {r["root"]: r["cnt"] for r in rows}
def _check_distinctive_gap(
conn: sqlite3.Connection,
algo_id: str,
dilution_map: Dict[str, float],
global_count_map: Dict[str, int],
) -> Dict[str, Any]:
"""Rule G: distinctive roots at the ayat but missing from root_map.
A root qualifies as a missing marker iff:
(a) local_count >= MIN_LOCAL_COUNT_FOR_MARKER
(b) local_count / total_qur_tokens >= DENSITY_CONCENTRATION_THRESHOLD
(c) algorithm-layer presence ratio < DISTINCTIVE_RATIO_THRESHOLD
(d) root is not already in declared root_map
"""
declared = set()
for r in conn.execute(
"SELECT root_letters FROM algorithm_root_map WHERE algo_id = ?",
(algo_id,),
):
if r["root_letters"]:
declared.add(r["root_letters"])
am_rows = conn.execute(
"SELECT surah, ayah_start, ayah_end FROM algorithm_ayah_map WHERE algo_id = ?",
(algo_id,),
).fetchall()
if not am_rows:
return {
"verdict": "PASS",
"reason": "no ayah_map to scan",
"declared_count": len(declared),
"missing_count": 0,
"top_missing": [],
}
# Aggregate local root counts across every mapped ayah range
local_counts: Dict[str, int] = {}
for am in am_rows:
surah = am["surah"]
ayah_start = am["ayah_start"]
ayah_end = am["ayah_end"] if am["ayah_end"] is not None else ayah_start
rows = conn.execute(
"""
SELECT root, COUNT(*) AS cnt
FROM quran_word_roots
WHERE surah = ?
AND ayah BETWEEN ? AND ?
AND root IS NOT NULL
AND root != ''
GROUP BY root
""",
(surah, ayah_start, ayah_end),
).fetchall()
for r in rows:
root = r["root"]
cnt = r["cnt"] or 0
if root:
local_counts[root] = local_counts.get(root, 0) + cnt
if not local_counts:
return {
"verdict": "PASS",
"reason": "no roots attested at ayah_map ranges",
"declared_count": len(declared),
"missing_count": 0,
"top_missing": [],
}
# Apply all four gates
missing: List[Tuple[str, int, float, int, float]] = []
# each entry: (root, local_cnt, layer_ratio, global_cnt, concentration)
for root, cnt in local_counts.items():
if root in declared:
continue
if cnt < MIN_LOCAL_COUNT_FOR_MARKER:
continue
global_cnt = global_count_map.get(root, 0)
if global_cnt <= 0:
continue
concentration = cnt / global_cnt
if concentration < DENSITY_CONCENTRATION_THRESHOLD:
continue
layer_ratio = dilution_map.get(root, 0.0)
if layer_ratio >= DISTINCTIVE_RATIO_THRESHOLD:
continue
missing.append((root, cnt, layer_ratio, global_cnt, concentration))
# Sort: highest density concentration first, then highest local count
missing.sort(key=lambda x: (-x[4], -x[1]))
declared_count = max(1, len(declared))
n = len(missing)
if n == 0:
return {
"verdict": "PASS",
"reason": "root_map covers all distinctive markers at the ayat",
"declared_count": len(declared),
"missing_count": 0,
"top_missing": [],
}
warn_threshold = max(3, declared_count // 2)
if n > declared_count:
return {
"verdict": "FAIL",
"reason": (
f"{n} distinctive root(s) fire at the ayat but are NOT in "
f"root_map (more missing than declared: {n} > {declared_count})"
),
"declared_count": len(declared),
"missing_count": n,
"top_missing": missing[:20],
}
if n >= warn_threshold:
return {
"verdict": "WARN",
"reason": (
f"{n} distinctive root(s) fire at the ayat but are NOT in "
f"root_map (declared: {declared_count})"
),
"declared_count": len(declared),
"missing_count": n,
"top_missing": missing[:20],
}
return {
"verdict": "PASS",
"reason": f"only {n} minor distinctive gap(s)",
"declared_count": len(declared),
"missing_count": n,
"top_missing": missing,
}
# ─────────────────────────────────────────────────────────────────────
# MAIN AUDIT
# ─────────────────────────────────────────────────────────────────────
def audit_algorithm(
conn: sqlite3.Connection,
row: sqlite3.Row,
dilution_map: Optional[Dict[str, float]] = None,
global_count_map: Optional[Dict[str, int]] = None,
) -> Dict[str, Any]:
"""Run all 6 per-row rules for one algorithm row.
dilution_map enables Rule F (pattern specificity).
global_count_map + dilution_map enables Rule G (distinctive gap).
Either may be None, in which case the corresponding rules are skipped
(Rule G needs both maps).
"""
algo_id = row["algo_id"]
result: Dict[str, Any] = {
"algo_id": algo_id,
"algo_name": row["algo_name"],
"algo_class": row["algo_class"],
"primary_ayah": row["primary_ayah"],
"is_composite": bool(row["is_composite"]),
"status": row["status"],
"quf_pass": row["quf_pass"],
"root_count": row["root_count"] or 0,
"ayah_count": row["ayah_count"] or 0,
"checks": {},
}
result["checks"]["A_name_format"] = _check_name_format(row)
result["checks"]["B_root_coverage"] = _check_root_coverage(conn, algo_id)
result["checks"]["C_primary_ayah"] = _check_primary_ayah(conn, algo_id, row["primary_ayah"])
result["checks"]["D_role_vocabulary"] = _check_role_vocabulary(conn, algo_id)
if dilution_map is not None:
result["checks"]["F_pattern_specificity"] = _check_pattern_specificity(
conn, algo_id, dilution_map
)
if global_count_map is not None and dilution_map is not None:
result["checks"]["G_distinctive_gap"] = _check_distinctive_gap(
conn, algo_id, dilution_map, global_count_map
)
verdicts = [c["verdict"] for c in result["checks"].values()]
if any(v == "FAIL" for v in verdicts):
result["overall"] = "FAIL"
elif any(v == "WARN" for v in verdicts):
result["overall"] = "WARN"
else:
result["overall"] = "PASS"
return result
def collect_named_algorithms(
conn: sqlite3.Connection,
algo_id_filter: Optional[str] = None,
class_filter: Optional[str] = None,
) -> List[sqlite3.Row]:
"""Return the list of rows we're auditing."""
rows = conn.execute("SELECT * FROM algorithm_registry ORDER BY algo_class, algo_id").fetchall()
out = []
for r in rows:
if BLOCK_ALGO_PATTERN.match(r["algo_id"] or ""):
continue
if algo_id_filter and r["algo_id"] != algo_id_filter:
continue
if class_filter and (r["algo_class"] or "").upper() != class_filter.upper():
continue
out.append(r)
return out
# ─────────────────────────────────────────────────────────────────────
# RENDERERS
# ─────────────────────────────────────────────────────────────────────
def render_row(result: Dict[str, Any], show_detail: bool = True) -> str:
"""Render one algorithm's audit result."""
lines: List[str] = []
hdr = (
f"[{result['overall']:<4}] {result['algo_id']:<38} "
f"{result['algo_class'] or '?':<13} Q={result['primary_ayah'] or '-':<10} "
f"roots={result['root_count']:>3} ayat={result['ayah_count']:>3}"
)
lines.append(hdr)
if not show_detail:
return "\n".join(lines)
if result.get("algo_name"):
name = result["algo_name"]
if len(name) > 80:
name = name[:77] + "..."
lines.append(f" name: {name}")
lines.append(f" status={result['status'] or '-'} quf_pass={result['quf_pass'] or '-'} composite={result['is_composite']}")
for rule_key, check in result["checks"].items():
v = check["verdict"]
reason = check.get("reason", "")
lines.append(f" {rule_key:<22} [{v:<4}] {reason}")
if rule_key == "B_root_coverage" and check.get("missing_list"):
for r, role in check["missing_list"]:
lines.append(f" ↳ MISSING: {r:<12} role={role}")
if rule_key == "F_pattern_specificity":
worst = check.get("worst_offenders") or []
if worst:
for root, ratio in worst[:5]:
marker = "SEVERE" if ratio >= SEVERE_DILUTION_THRESHOLD else (
"mild" if ratio >= MILD_DILUTION_THRESHOLD else "distinct"
)
lines.append(
f" ↳ {root:<12} ratio={ratio:.2f} ({marker})"
)
distinctives = check.get("distinctive_list") or []
if distinctives and v != "FAIL":
roots_only = ", ".join(r for r, _ in distinctives[:6])
lines.append(f" ↳ distinctive markers: {roots_only}")
if rule_key == "G_distinctive_gap":
top = check.get("top_missing") or []
for entry in top[:20]:
# tuple: (root, local_cnt, layer_ratio, global_cnt, concentration)
if len(entry) >= 5:
root, cnt, layer_ratio, gcnt, conc = entry
lines.append(
f" ↳ MISSING: {root:<12} local={cnt:<3} "
f"global={gcnt:<4} conc={conc:.2f} layer_r={layer_ratio:.2f}"
)
else:
# legacy 3-tuple fallback
root, cnt, ratio = entry[0], entry[1], entry[2]
lines.append(
f" ↳ MISSING: {root:<12} local_cnt={cnt:<3} "
f"ratio={ratio:.2f}"
)
return "\n".join(lines)
def render_summary(results: List[Dict[str, Any]], fleet_stats: Dict[str, Any]) -> str:
"""Render fleet summary."""
lines: List[str] = []
lines.append("")
lines.append("=" * 72)
lines.append("PHASE 0 AUDIT - SUMMARY")
lines.append("=" * 72)
lines.append(f" audited: {len(results)} named algorithm rows")
counts = Counter(r["overall"] for r in results)
for verdict in ("PASS", "WARN", "FAIL"):
lines.append(f" {verdict:<6} {counts.get(verdict, 0)}")
lines.append("")
lines.append(" BY CLASS:")
by_class: Dict[str, Counter] = {}
for r in results:
cls = r["algo_class"] or "?"
by_class.setdefault(cls, Counter())[r["overall"]] += 1
for cls in sorted(by_class):
cc = by_class[cls]
total = sum(cc.values())
lines.append(
f" {cls:<14} total={total:>3} "
f"pass={cc.get('PASS', 0):>3} warn={cc.get('WARN', 0):>3} fail={cc.get('FAIL', 0):>3}"
)
lines.append("")
lines.append(" BY RULE (failures count):")
rule_fail = Counter()
rule_warn = Counter()
for r in results:
for rk, c in r["checks"].items():
if c["verdict"] == "FAIL":
rule_fail[rk] += 1
elif c["verdict"] == "WARN":
rule_warn[rk] += 1
for rk in (
"A_name_format",
"B_root_coverage",
"C_primary_ayah",
"D_role_vocabulary",
"F_pattern_specificity",
"G_distinctive_gap",
):
lines.append(
f" {rk:<24} fail={rule_fail.get(rk, 0):>3} warn={rule_warn.get(rk, 0):>3}"
)
lines.append("")
lines.append(" RULE E - FLEET STATUS (quf_pass + status):")
for k, v in fleet_stats.items():
lines.append(f" {k:<30} {v}")
lines.append("=" * 72)
lines.append("")
# Quick action list
fails = [r for r in results if r["overall"] == "FAIL"]
if fails:
lines.append(f"FAIL list ({len(fails)}):")
for r in fails:
lines.append(f" {r['algo_id']}")
warns = [r for r in results if r["overall"] == "WARN"]
if warns:
lines.append("")
lines.append(f"WARN list ({len(warns)}):")
for r in warns:
lines.append(f" {r['algo_id']}")
lines.append("")
return "\n".join(lines)
def collect_fleet_stats(conn: sqlite3.Connection) -> Dict[str, Any]:
"""Rule E: fleet-wide status and quf_pass tallies."""
stats: Dict[str, Any] = {}
for row in conn.execute(
"SELECT COALESCE(status, '(null)') as s, COUNT(*) FROM algorithm_registry GROUP BY s"
):
stats[f"status = {row[0]}"] = row[1]
for row in conn.execute(
"SELECT COALESCE(quf_pass, '(null)') as q, COUNT(*) FROM algorithm_registry GROUP BY q"
):
stats[f"quf_pass = {row[0]}"] = row[1]
return stats
# ─────────────────────────────────────────────────────────────────────
# CLI
# ─────────────────────────────────────────────────────────────────────
def main(argv: List[str]) -> int:
p = argparse.ArgumentParser(prog="amr_algorithm_audit")
p.add_argument("--algo", default=None, help="audit one algorithm by algo_id")
p.add_argument("--class", dest="class_filter", default=None,
help="filter by algo_class (OPERATOR, NARRATIVE, ...)")
p.add_argument("--verdict", default=None, choices=["PASS", "WARN", "FAIL"],
help="only show rows with this overall verdict")
p.add_argument("--summary-only", action="store_true",
help="omit per-row detail, print summary only")
p.add_argument("--save", default=None, metavar="PATH",
help="save full report to PATH (stdout output is unchanged)")
args = p.parse_args(argv)
conn = _connect()
try:
dilution_map = _compute_root_dilution_map(conn)
global_count_map = _compute_root_global_count_map(conn)
targets = collect_named_algorithms(conn, algo_id_filter=args.algo,
class_filter=args.class_filter)
results = [
audit_algorithm(
conn, row,
dilution_map=dilution_map,
global_count_map=global_count_map,
)
for row in targets
]
fleet_stats = collect_fleet_stats(conn)
out_lines: List[str] = []
out_lines.append("amr_algorithm_audit.py - Phase 0 Report")
out_lines.append(f"Generated: {datetime.now().isoformat(timespec='seconds')}")
out_lines.append(f"DB: {DB_PATH}")
out_lines.append("")
if not args.summary_only:
out_lines.append("-" * 72)
out_lines.append("PER-ROW AUDIT")
out_lines.append("-" * 72)
show = results
if args.verdict:
show = [r for r in results if r["overall"] == args.verdict]
for r in show:
out_lines.append(render_row(r, show_detail=True))
out_lines.append("")
out_lines.append(render_summary(results, fleet_stats))
text = "\n".join(out_lines)
print(text)
if args.save:
with open(args.save, "w", encoding="utf-8") as f:
f.write(text)
print(f"(full report saved to {args.save})", file=sys.stderr)  # keep stdout identical to the saved report
return 0
finally:
conn.close()
if __name__ == "__main__":
raise SystemExit(main(sys.argv[1:]))