| import json
|
| from pathlib import Path
|
| from typing import Any, Dict, List
|
|
|
| from pydantic import BaseModel
|
|
|
| from core.mcp_protocol import mcp_call
|
|
|
|
|
| class AuditResolution(BaseModel):
|
| verdict: str
|
| risk_score: float
|
| reasoning_steps: List[str]
|
| mcp_trace: str
|
| warning: str = ""
|
|
|
|
|
| class LogicAuditor:
|
| """
|
| Evidence-weighted logic auditor for credential review.
|
|
|
| The auditor treats registry matches as supporting evidence only. It does not
|
| approve a credential solely because an institution exists in ROR or because a
|
| file name contains a trusted-looking keyword.
|
| """
|
|
|
| def __init__(self, blacklist_path: str = "data/fraud_blacklist.json"):
|
| self.blacklist_path = Path(blacklist_path)
|
|
|
| @staticmethod
|
| def _normalize(value: str) -> str:
|
| return " ".join(value.lower().replace("&", "and").split())
|
|
|
| def _load_blacklist_names(self) -> set[str]:
|
| names: set[str] = set()
|
| try:
|
| with self.blacklist_path.open("r", encoding="utf-8") as f:
|
| blacklist_data = json.load(f)
|
| except (OSError, json.JSONDecodeError):
|
| return names
|
|
|
| for entry in blacklist_data.get("blacklist", []):
|
| if entry.get("name"):
|
| names.add(self._normalize(entry["name"]))
|
| for alias in entry.get("aliases", []):
|
| names.add(self._normalize(alias))
|
| return names
|
|
|
| async def audit(self, transcript: Dict[str, Any], profile: Dict[str, Any]) -> AuditResolution:
|
| print("[LOGIC] [Logic-Auditor] Initializing evidence-weighted review...")
|
| call = mcp_call("mcp_logic_audit", {"transcript_id": "...", "context_level": "deep"})
|
|
|
| reasoning_steps: List[str] = []
|
| anomalies: List[tuple[str, float]] = []
|
| warnings: List[str] = []
|
|
|
| inst_name = self._normalize(profile.get("name", ""))
|
| blacklist_names = self._load_blacklist_names()
|
| is_diploma_mill = profile.get("is_diploma_mill", False) or inst_name in blacklist_names
|
| profile_status = profile.get("status", "unknown")
|
|
|
| reasoning_steps.append("Step 0: Checking known diploma-mill and degree-factory indicators.")
|
| if is_diploma_mill or profile_status == "fraudulent":
|
| warning_msg = profile.get("warning") or "DIPLOMA MILL / DEGREE FACTORY DETECTED -- credentials from this institution require hard rejection."
|
| return AuditResolution(
|
| verdict="REJECTED — DIPLOMA MILL / DEGREE FACTORY",
|
| risk_score=100.0,
|
| reasoning_steps=[
|
| *reasoning_steps,
|
| f"Result 0: HARD REJECTION. '{profile.get('name', 'Unknown')}' is flagged by the fraud registry.",
|
| "No approval is issued because the issuing entity is disqualified.",
|
| ],
|
| mcp_trace=call.trace_id,
|
| warning=warning_msg,
|
| )
|
| reasoning_steps.append("Result 0: No exact blacklist or alias match found.")
|
|
|
| reasoning_steps.append("Step 1: Mapping graduation window against institutional lifecycle.")
|
| grad_year = int(transcript.get("graduation_year") or 0)
|
| est_year = profile.get("established_year")
|
| if grad_year > 0 and est_year and grad_year < int(est_year):
|
| anomalies.append(("CRITICAL: Graduation predates the institution founding year.", 55.0))
|
| reasoning_steps.append("Result 1: Temporal violation found.")
|
| elif est_year:
|
| reasoning_steps.append("Result 1: Timeline is internally consistent.")
|
| else:
|
| warnings.append("Founding year unavailable; temporal validation is incomplete.")
|
| reasoning_steps.append("Result 1: Founding year unavailable; timeline needs review.")
|
|
|
| reasoning_steps.append("Step 2: Evaluating registry evidence without granting automatic approval.")
|
| has_ror_id = bool(profile.get("ror_id"))
|
| source = profile.get("source", "none")
|
| match_confidence = float(profile.get("match_confidence") or 0.0)
|
| if has_ror_id and profile_status == "active":
|
| reasoning_steps.append("Result 2: Active ROR presence found as supporting institution-existence evidence.")
|
| elif has_ror_id:
|
| anomalies.append((f"WARNING: ROR status is '{profile_status}', not active.", 30.0))
|
| reasoning_steps.append("Result 2: Registry presence found, but status requires review.")
|
| else:
|
| anomalies.append(("WARNING: No verified registry identifier was resolved.", 35.0))
|
| reasoning_steps.append("Result 2: No ROR identifier available.")
|
|
|
| if source in {"ror", "local_index"} and 0 < match_confidence < 0.80:
|
| anomalies.append(("WARNING: Institution match confidence is below the production threshold.", 25.0))
|
| reasoning_steps.append("Result 2b: Match confidence is low and should be manually reviewed.")
|
|
|
| reasoning_steps.append("Step 3: Checking credential-authenticity evidence.")
|
| if not transcript.get("credential_id") and not transcript.get("signature_verified"):
|
| anomalies.append(("WARNING: No credential ID or cryptographic issuer signature was verified.", 35.0))
|
| reasoning_steps.append("Result 3: Credential authenticity remains unproven.")
|
| else:
|
| reasoning_steps.append("Result 3: Credential-level evidence is present.")
|
|
|
| risk_score = min(100.0, sum(weight for _, weight in anomalies))
|
| if risk_score >= 85:
|
| verdict = "REJECTED"
|
| elif risk_score > 0 or warnings:
|
| verdict = "NEEDS_REVIEW"
|
| else:
|
| verdict = "APPROVED"
|
|
|
| return AuditResolution(
|
| verdict=verdict,
|
| risk_score=risk_score,
|
| reasoning_steps=[*reasoning_steps, *[item for item, _ in anomalies], *warnings],
|
| mcp_trace=call.trace_id,
|
| warning="; ".join(warnings),
|
| )
|
|
|