import json import math import time import hashlib from pathlib import Path from datetime import datetime, timezone from typing import Any, Dict, List, Tuple import gradio as gr from huggingface_hub import HfApi, hf_hub_download # ----------------------------------------------------------------------------- # CROVIA — CEP Terminal v2.0.0 (PRISM CONSOLE + OMISSION ORACLE) # Custom HTML/CSS/JS frontend, Gradio backend + events # Global Standard Release - EU AI Act Compliance # ----------------------------------------------------------------------------- CEP_DATASET_ID = "Crovia/cep-capsules" OMISSIONS_DATASET_ID = "Crovia/global-ai-training-omissions" OPEN_DEMO_MODE = True ENGINE_VERSION = "2.0.0" _CAPSULE_LIST_CACHE = {"ts": 0.0, "items": []} _CAPSULE_LIST_TTL_SEC = 300 # 5 min _OMISSIONS_CACHE = {"ts": 0.0, "data": {}} # ============================================================================= # OMISSION ORACLE v2.0.0 - Shadow Score Engine # ============================================================================= NECESSITY_CANON = { "NEC#1": {"name": "Missing data provenance", "severity": 75, "category": "provenance"}, "NEC#2": {"name": "Missing license attribution", "severity": 80, "category": "license"}, "NEC#7": {"name": "Missing usage scope", "severity": 45, "category": "scope"}, "NEC#10": {"name": "Missing temporal validity", "severity": 40, "category": "validity"}, "NEC#13": {"name": "Missing accountable entity", "severity": 70, "category": "identity"}, } def _load_omissions_ranking() -> Dict[str, Any]: """Load global omissions ranking from HuggingFace.""" now = time.time() if (now - _OMISSIONS_CACHE["ts"]) < _CAPSULE_LIST_TTL_SEC and _OMISSIONS_CACHE["data"]: return _OMISSIONS_CACHE["data"] try: path = hf_hub_download( repo_id=OMISSIONS_DATASET_ID, filename="EVIDENCE.json", repo_type="dataset", ) with open(path, "r", encoding="utf-8") as f: data = json.load(f) _OMISSIONS_CACHE["ts"] = now _OMISSIONS_CACHE["data"] = data return data except Exception: return {} def analyze_model_shadow_score(model_data: Dict[str, Any]) -> Dict[str, Any]: """ Omission Oracle: Analyze model for NEC# violations and compute Shadow Score. """ model_id = model_data.get("id", model_data.get("model_id", "unknown")) license_val = model_data.get("license", model_data.get("cardData", {}).get("license")) datasets = model_data.get("datasets", model_data.get("cardData", {}).get("datasets", [])) author = model_data.get("author", "") tags = model_data.get("tags", []) violations = [] # NEC#1: Missing data provenance if not datasets and "fine-tuned" not in " ".join(tags).lower(): violations.append({ "id": "NEC#1", "name": NECESSITY_CANON["NEC#1"]["name"], "severity": NECESSITY_CANON["NEC#1"]["severity"], "reason": "No training data provenance declared" }) # NEC#2: Missing license attribution if not license_val: violations.append({ "id": "NEC#2", "name": NECESSITY_CANON["NEC#2"]["name"], "severity": NECESSITY_CANON["NEC#2"]["severity"], "reason": "No license attribution declared" }) # NEC#7: Missing usage scope has_intended_use = any("intended" in t.lower() or "use" in t.lower() for t in tags) if not has_intended_use: violations.append({ "id": "NEC#7", "name": NECESSITY_CANON["NEC#7"]["name"], "severity": NECESSITY_CANON["NEC#7"]["severity"], "reason": "No intended use or usage scope declared" }) # NEC#13: Missing accountable entity if not author: violations.append({ "id": "NEC#13", "name": NECESSITY_CANON["NEC#13"]["name"], "severity": NECESSITY_CANON["NEC#13"]["severity"], "reason": "No accountable author or organization" }) # Compute Shadow Score (100 = perfect, 0 = critical) total_severity = sum(v["severity"] for v in violations) shadow_score = max(0, min(100, 100 - int(total_severity * 0.15))) # Determine badge if shadow_score >= 90: badge, badge_color = "GOLD", "#F59E0B" elif shadow_score >= 75: badge, badge_color = "SILVER", "#94A3B8" elif shadow_score >= 60: badge, badge_color = "BRONZE", "#D97706" else: badge, badge_color = "UNVERIFIED", "#EF4444" # Severity classification if not violations: severity = "CLEAN" elif shadow_score >= 75: severity = "LOW" elif shadow_score >= 60: severity = "MEDIUM" elif shadow_score >= 40: severity = "HIGH" else: severity = "CRITICAL" return { "model_id": model_id, "shadow_score": shadow_score, "badge": badge, "badge_color": badge_color, "severity": severity, "violations": violations, "violation_count": len(violations), "license": license_val or "not declared", "author": author or "unknown", "analyzed_at": _nowz(), "oracle_version": ENGINE_VERSION, } def _nowz() -> str: return datetime.now(timezone.utc).isoformat().replace("+00:00", "Z") def _canonical_json_bytes(obj: dict) -> bytes: return json.dumps(obj, sort_keys=True, separators=(",", ":"), ensure_ascii=False).encode("utf-8") def _sha256_hex(b: bytes) -> str: return hashlib.sha256(b).hexdigest() def _short(s: str, n: int = 16) -> str: if not isinstance(s, str) or not s: return "" return s[:n] + "…" def _list_capsules() -> List[str]: now = time.time() if (now - _CAPSULE_LIST_CACHE["ts"]) < _CAPSULE_LIST_TTL_SEC and _CAPSULE_LIST_CACHE["items"]: return _CAPSULE_LIST_CACHE["items"] items: List[str] = [] try: files = HfApi().list_repo_files(repo_id=CEP_DATASET_ID, repo_type="dataset") for f in files: if f.endswith(".json") and f.startswith("CEP-") and not f.lower().endswith("index.json"): items.append(Path(f).stem) items = sorted(set(items))[:350] except Exception: items = [] _CAPSULE_LIST_CACHE["ts"] = now _CAPSULE_LIST_CACHE["items"] = items return items def fetch_capsule(cep_id: str) -> Dict[str, Any]: path = hf_hub_download( repo_id=CEP_DATASET_ID, filename=f"{cep_id}.json", repo_type="dataset", ) with open(path, "r", encoding="utf-8") as f: return json.load(f) def compute_trust(checks: Dict[str, bool]) -> Tuple[str, str, float]: missing = [k for k, v in checks.items() if not v] if not missing: return ("GREEN", "Fully anchored: evidence + signature + hashchain bound.", 1.0) if missing == ["hashchain_root"]: return ("YELLOW", "Evidence present, but not bound to a verifiable training run (missing hashchain).", 0.62) if missing == ["signature"]: return ("YELLOW", "Evidence present, but missing signature metadata (publisher not authenticated).", 0.58) if "evidence" in missing: return ("RED", "No evidence nodes inside capsule (cannot inspect annex/payout anchors).", 0.18) return ("RED", "Incomplete evidence: missing critical verification anchors.", 0.28) def make_terminal_and_inspector(capsule: Dict[str, Any], cep_id: str) -> Dict[str, Any]: terminal: List[str] = [] inspector: List[str] = [] schema = capsule.get("schema", "unknown") period = capsule.get("period", "unknown") model = capsule.get("model", {}) model_id = model.get("model_id", "unknown-model") if isinstance(model, dict) else "unknown-model" evidence = capsule.get("evidence", {}) if not isinstance(evidence, dict): evidence = {} meta = capsule.get("meta", {}) if isinstance(capsule.get("meta"), dict) else {} hashchain_root = meta.get("hashchain_sha256", "") hashchain_present = isinstance(hashchain_root, str) and bool(hashchain_root.strip()) sig_present = "signature" in capsule cap_sha = _sha256_hex(_canonical_json_bytes(capsule)) checks = { "schema": isinstance(schema, str) and schema != "unknown", "period": isinstance(period, str) and len(period) >= 4, "model_id": isinstance(model_id, str) and model_id != "unknown-model", "evidence": isinstance(evidence, dict) and len(evidence) > 0, "signature": bool(sig_present), "hashchain_root": bool(hashchain_present), } trust_level, trust_reason, trust_score = compute_trust(checks) terminal.append("CROVIA // PRISM CONSOLE v1") terminal.append(f"capsule {cep_id}") terminal.append(f"timestamp {_nowz()}") terminal.append(f"mode {'OPEN DEMO (light checks)' if OPEN_DEMO_MODE else 'FULL'}") terminal.append("") terminal.append("[STRUCTURE]") terminal.append(f"schema {schema}") terminal.append(f"model {model_id}") terminal.append(f"period {period}") terminal.append(f"evidence {len(evidence)} nodes") terminal.append("") terminal.append("[TRUST]") terminal.append(f"level {trust_level}") terminal.append(f"reason {trust_reason}") terminal.append("") terminal.append("[INTEGRITY]") terminal.append(f"capsule_sha256 {cap_sha}") terminal.append(f"signature {'present' if sig_present else 'missing'}") terminal.append(f"hashchain {('sha256:' + _short(hashchain_root, 16)) if hashchain_present else 'missing'}") terminal.append("") proof = f"crovia:v1;m={model_id};p={period};h={cap_sha};u=hf://{CEP_DATASET_ID}/{cep_id}.json" terminal.append("[PROOF STRING]") terminal.append(proof) terminal.append("") if trust_level != "GREEN": terminal.append("[WHY THIS MATTERS]") if not hashchain_present: terminal.append("- Not bound to a verifiable training run (no hashchain anchor).") if not sig_present: terminal.append("- Publisher not authenticated (no signature metadata).") if len(evidence) == 0: terminal.append("- No inspectable annex/payout nodes in evidence.") terminal.append("") inspector.append("INSPECTOR (real checks)") for k in ["schema", "period", "model_id", "evidence", "signature", "hashchain_root"]: inspector.append(f"- {k:13s} : {'OK' if checks[k] else 'FAIL'}") inspector.append("") inspector.append("TRUST SIGNAL") inspector.append(f"- level : {trust_level}") inspector.append(f"- score : {trust_score:.2f}") inspector.append(f"- note : {trust_reason}") inspector.append("") inspector.append("EVIDENCE NODES (first 18)") if evidence: for ek, ev in list(evidence.items())[:18]: if isinstance(ev, dict): sha = ev.get("sha256", "") url = ev.get("url", "") path = ev.get("path", "") line = f"- {ek}" if sha: line += f" | sha256:{_short(str(sha), 16)}" if url: line += f" | url:{str(url)[:88]}" if path: line += f" | path:{str(path)[:88]}" inspector.append(line) else: inspector.append(f"- {ek} | (non-object)") else: inspector.append("- (none)") nodes = list(evidence.keys()) anchors = ["receipts", "payouts", "signature", "hashchain"] for a in anchors: if a not in nodes: nodes.append(a) return { "cep_id": cep_id, "schema": schema, "period": period, "model_id": model_id, "trust_level": trust_level, "trust_reason": trust_reason, "trust_score": trust_score, "capsule_sha256": cap_sha, "signature_present": sig_present, "hashchain_present": hashchain_present, "hashchain_sha256": hashchain_root if hashchain_present else "", "proof": proof, "terminal": "\n".join(terminal), "inspector": "\n".join(inspector), "nodes": nodes, } def inspect_payload(cep_id: str) -> str: cep_id = (cep_id or "").strip() if not cep_id: return json.dumps({"error": "empty"}, ensure_ascii=False) try: cap = fetch_capsule(cep_id) except Exception as e: return json.dumps( { "error": "fetch_failed", "cep_id": cep_id, "detail": f"{type(e).__name__}: {e}", }, ensure_ascii=False, ) payload = make_terminal_and_inspector(cap, cep_id) return json.dumps(payload, ensure_ascii=False) CSS = r""" """ UI_HTML = r"""
Public inspection of AI training evidence capsules. No simulations. No assumptions. Only what exists.
Loading…
—