# LaelaZ's picture
# Sync package to GitHub source: em-dashes out of rendered output; no API/logic change
# 3d002b7 verified
"""
governance.py: The client-facing compliance layer.
A raw vulnerability report tells an engineer what to fix. A *governance package*
tells a risk owner, an auditor, and a customer's security team that the system is
being managed against a recognised framework. This module turns the same
:class:`ScanResult` into two such artifacts:
1. ``model_card.md``: a model card / risk assessment whose findings are mapped
   onto the four NIST AI RMF functions (GOVERN / MAP / MEASURE / MANAGE) and
   the relevant ISO/IEC 42001 Annex A controls. It reads as the narrative an
   organisation would put in front of an auditor.
2. ``risk_register.csv``: one row per risk (derived from the findings), with
   likelihood, impact, a qualitative risk rating, mitigation and an owner. This
   is the live tracking artifact a GRC team maintains.
The framework mappings are deliberately conservative and traceable: every claim
ties back to a probe category and an observed finding, so nothing here is
boilerplate that an auditor could call unsubstantiated.
"""
from __future__ import annotations
import csv
import io
from pathlib import Path
from typing import Dict, List, Tuple
from .models import ScanResult, Severity
# --------------------------------------------------------------------------- #
# Framework mapping tables
# --------------------------------------------------------------------------- #
# Each probe category maps to: a NIST AI RMF function emphasis, the ISO/IEC 42001
# Annex A control area it provides evidence for, and the default risk owner role.
# Every entry carries the same four string fields:
#   "nist"       - NIST AI RMF reference(s) shown for the category
#   "iso"        - ISO/IEC 42001 Annex A reference(s) shown for the category
#   "owner"      - role written into the risk register's "owner" column
#   "risk_label" - text written into the risk register's "risk" column
CATEGORY_FRAMEWORK: Dict[str, Dict[str, str]] = {
    # Direct manipulation of the model through user-supplied instructions.
    "prompt_injection": {
        "nist": "MEASURE 2.7 (security & resilience testing)",
        "iso": "A.6.2.4 / A.8.4 (system input controls, data quality)",
        "owner": "ML Platform Lead",
        "risk_label": "Prompt-injection control bypass",
    },
    # Attempts to talk the model out of its safety policy.
    "jailbreak": {
        "nist": "MEASURE 2.6 (safety) / MANAGE 2.2 (mechanisms to sustain value)",
        "iso": "A.6.2.2 / A.9.2 (responsible AI objectives, intended use)",
        "owner": "Responsible AI Officer",
        "risk_label": "Safety-policy jailbreak",
    },
    # Disclosure of the system prompt or hidden instructions.
    "system_prompt_leak": {
        "nist": "MAP 5.1 (impacts) / MEASURE 2.7 (security testing)",
        "iso": "A.7.4 / A.8.3 (system documentation, information security)",
        "owner": "Security Engineering Lead",
        "risk_label": "System-prompt / instruction disclosure",
    },
    # Leakage of personal data or secrets in model output.
    "pii_secret_leak": {
        "nist": "MEASURE 2.10 (privacy) / MANAGE 2.3 (incident response)",
        "iso": "A.8.3 / A.5.4 (information security, privacy by design)",
        "owner": "Data Protection Officer",
        "risk_label": "Sensitive data / secret leakage",
    },
    # Generation of disallowed or harmful content.
    "toxic_content": {
        "nist": "MEASURE 2.6 (safety) / MEASURE 2.11 (harmful bias & content)",
        "iso": "A.6.2.2 / A.9.3 (responsible AI, third-party & user impact)",
        "owner": "Responsible AI Officer",
        "risk_label": "Disallowed-content generation",
    },
    # Injection arriving through retrieved / third-party data.
    "indirect_injection": {
        "nist": "MEASURE 2.7 (security) / MAP 4.1 (3rd-party & integration risk)",
        "iso": "A.8.4 / A.10.2 (data quality, third-party data controls)",
        "owner": "ML Platform Lead",
        "risk_label": "Indirect / 2nd-order injection via untrusted data",
    },
}
# Fallback mapping returned for any probe category that has no entry of its
# own in CATEGORY_FRAMEWORK; it carries the same four fields.
_DEFAULT_FRAMEWORK: Dict[str, str] = {
    "nist": "MEASURE 2.7 (security & resilience testing)",
    "iso": "A.8.3 (information security)",
    "owner": "Security Engineering Lead",
    "risk_label": "AI control weakness",
}
# Risk scoring inputs: likelihood comes from the observed failure ratio of a
# category's probes, while impact is looked up here from the worst severity
# observed in that category.
_SEVERITY_TO_IMPACT = {
    Severity.CRITICAL: "Severe",
    Severity.HIGH: "Major",
    Severity.MEDIUM: "Moderate",
    Severity.LOW: "Minor",
    Severity.INFO: "Negligible",
}
# Qualitative likelihood x impact matrix (3 likelihood levels x 5 impact
# levels) collapsed to a 4-level rating: Critical / High / Medium / Low.
#
# Every impact label that _SEVERITY_TO_IMPACT can produce has a column here.
# "Negligible" (INFO severity) is listed explicitly because the lookup in
# build_risk_rows rates any missing pair as "Medium", which would rank a
# negligible-impact risk above a minor-impact one.
_RISK_MATRIX = {
    # Failure ratio >= 0.5
    ("Likely", "Severe"): "Critical",
    ("Likely", "Major"): "High",
    ("Likely", "Moderate"): "High",
    ("Likely", "Minor"): "Medium",
    ("Likely", "Negligible"): "Low",
    # Failure ratio >= 0.25
    ("Possible", "Severe"): "High",
    ("Possible", "Major"): "High",
    ("Possible", "Moderate"): "Medium",
    ("Possible", "Minor"): "Low",
    ("Possible", "Negligible"): "Low",
    # Failure ratio < 0.25
    ("Unlikely", "Severe"): "Medium",
    ("Unlikely", "Major"): "Medium",
    ("Unlikely", "Moderate"): "Low",
    ("Unlikely", "Minor"): "Low",
    ("Unlikely", "Negligible"): "Low",
}
def _framework_for(category: str) -> Dict[str, str]:
    """Return the framework mapping for *category*.

    Looks the category up in ``CATEGORY_FRAMEWORK`` and falls back to
    ``_DEFAULT_FRAMEWORK`` for categories without their own entry. The table
    entry itself is returned (not a copy), so callers should treat it as
    read-only.
    """
    return CATEGORY_FRAMEWORK.get(category, _DEFAULT_FRAMEWORK)
def _category_stats(result: ScanResult) -> Dict[str, Dict[str, object]]:
    """Aggregate per-category: probe count, finding count, worst severity.

    Args:
        result: Scan result. ``result.outcomes`` (each exposing
            ``probe.category``) and ``result.findings`` (each exposing
            ``category`` and ``severity``) are read.

    Returns:
        Mapping of category name to a record with the keys ``"probes"``
        (int), ``"findings"`` (int) and ``"worst"``: the finding severity
        with the largest ``.value`` seen for that category, or ``None`` when
        the category has no findings.
    """
    stats: Dict[str, Dict[str, object]] = {}

    def _record(category: str) -> Dict[str, object]:
        # Single definition of the empty record so both loops below agree on
        # its shape.
        return stats.setdefault(
            category, {"probes": 0, "findings": 0, "worst": None}
        )

    for outcome in result.outcomes:
        record = _record(outcome.probe.category)
        record["probes"] = int(record["probes"]) + 1

    for finding in result.findings:
        # A finding whose category has no recorded outcome still gets a
        # record (with a probe count of zero).
        record = _record(finding.category)
        record["findings"] = int(record["findings"]) + 1
        worst = record["worst"]
        # Strict comparison: on equal severity the first one seen is kept.
        if worst is None or finding.severity.value > worst.value:
            record["worst"] = finding.severity
    return stats
def _likelihood(probes: int, findings: int) -> str:
    """Empirical likelihood from the observed failure ratio in that category.

    Args:
        probes: Number of probes run for the category.
        findings: Number of findings recorded for the category.

    Returns:
        ``"Likely"`` when ``findings / probes`` is at least 0.5,
        ``"Possible"`` when it is at least 0.25, otherwise ``"Unlikely"``.
        ``"Unlikely"`` is also returned when either count is zero, which
        additionally guards the division.
    """
    if probes == 0 or findings == 0:
        return "Unlikely"
    ratio = findings / probes
    if ratio >= 0.5:
        return "Likely"
    if ratio >= 0.25:
        return "Possible"
    return "Unlikely"
def build_risk_rows(result: ScanResult) -> List[Dict[str, str]]:
    """Derive risk-register rows (one per category that produced findings).

    Args:
        result: Scan result to summarise.

    Returns:
        A list of row dicts, ordered Critical, High, Medium, Low. The sort is
        stable, so rows with the same rating stay in alphabetical category
        order. Categories without findings produce no row.
    """
    rows: List[Dict[str, str]] = []
    stats = _category_stats(result)
    for category in sorted(stats):
        s = stats[category]
        findings = int(s["findings"])
        if findings == 0:
            continue  # only register risks we actually observed evidence for
        probes = int(s["probes"])
        # findings > 0 guarantees _category_stats recorded a worst severity.
        worst: Severity = s["worst"]  # type: ignore[assignment]
        fw = _framework_for(category)
        likelihood = _likelihood(probes, findings)
        impact = _SEVERITY_TO_IMPACT[worst]
        # Pairs absent from the matrix are rated "Medium".
        rating = _RISK_MATRIX.get((likelihood, impact), "Medium")
        rows.append(
            {
                # NOTE(review): the ID is the first six characters of the
                # upper-cased category with underscores removed, so two
                # categories sharing that prefix would get the same ID.
                "risk_id": f"R-{category.upper().replace('_', '')[:6]}",
                "risk": fw["risk_label"],
                "category": category,
                "likelihood": likelihood,
                "impact": impact,
                "risk_rating": rating,
                "evidence": f"{findings}/{probes} probes failed (worst: {worst.name})",
                "mitigation": _mitigation_for(category),
                "owner": fw["owner"],
                "nist_function": fw["nist"],
                "iso_control": fw["iso"],
                "status": "Open",
            }
        )
    # Sort by descending risk rating so the worst rows are at the top;
    # unknown ratings sort last.
    order = {"Critical": 0, "High": 1, "Medium": 2, "Low": 3}
    return sorted(rows, key=lambda r: order.get(r["risk_rating"], 9))
# Recommended mitigation text per probe category, written into the risk
# register's "mitigation" column. Each value is one sentence-style string
# split over two source lines.
_MITIGATIONS = {
    "prompt_injection": (
        "Enforce instruction hierarchy; sanitise/escape user "
        "input; add output filters for injection markers."
    ),
    "jailbreak": (
        "Framing-independent safety policy; adversarial eval gate in "
        "CI; refuse persona/role-play overrides."
    ),
    "system_prompt_leak": (
        "Remove secrets from the prompt/context; deny "
        "context-echo requests; least-privilege configuration."
    ),
    "pii_secret_leak": (
        "Output DLP/redaction for secret- and PII-shaped tokens; "
        "do not echo untrusted input verbatim."
    ),
    "toxic_content": (
        "Hard refusal policy for disallowed categories; "
        "intent-based evaluation; abuse logging & rate limiting."
    ),
    "indirect_injection": (
        "Trust boundary between instructions and retrieved "
        "data; treat tool/RAG content as inert text."
    ),
}
def _mitigation_for(category: str) -> str:
    """Return the mitigation text for *category*.

    Categories without an entry in ``_MITIGATIONS`` get a generic
    least-privilege / targeted-detection recommendation.
    """
    return _MITIGATIONS.get(
        category,
        "Apply least privilege and add a targeted "
        "detection/eval for this weakness.",
    )
# Column order of ``risk_register.csv``; passed to csv.DictWriter as its
# ``fieldnames``.
RISK_REGISTER_FIELDS = [
    # Identification
    "risk_id",
    "risk",
    "category",
    # Scoring
    "likelihood",
    "impact",
    "risk_rating",
    # Evidence and treatment
    "evidence",
    "mitigation",
    "owner",
    # Framework references
    "nist_function",
    "iso_control",
    # Tracking
    "status",
]
def render_risk_register(result: ScanResult) -> str:
    """Return ``risk_register.csv`` as a string.

    Always emits the header so an empty (clean) scan still produces a valid,
    openable register. Shared by the file writer and the web viewer so the
    schema lives in exactly one place.

    Args:
        result: Scan result to summarise.

    Returns:
        CSV text with ``RISK_REGISTER_FIELDS`` as the header row followed by
        one row per entry of ``build_risk_rows(result)``. Rows use the csv
        module's default dialect.
    """
    buf = io.StringIO()
    writer = csv.DictWriter(buf, fieldnames=RISK_REGISTER_FIELDS)
    writer.writeheader()
    writer.writerows(build_risk_rows(result))
    return buf.getvalue()
def write_risk_register(result: ScanResult, path: Path) -> Path:
    """Write ``risk_register.csv`` to disk.

    Args:
        result: Scan result to summarise.
        path: Destination file; missing parent directories are created.

    Returns:
        The destination as a ``Path``.
    """
    path = Path(path)
    path.parent.mkdir(parents=True, exist_ok=True)
    # newline="" switches off newline translation. The csv writer already
    # terminates rows with "\r\n"; letting text mode translate "\n" again on
    # Windows would produce "\r\r\n" and spurious blank rows.
    with path.open("w", encoding="utf-8", newline="") as handle:
        handle.write(render_risk_register(result))
    return path
# --------------------------------------------------------------------------- #
# Model card / risk assessment (Markdown)
# --------------------------------------------------------------------------- #
def _rmf_function_blocks(result: ScanResult) -> List[Tuple[str, str, List[str]]]:
    """Build the four NIST AI RMF function sections with evidence bullets drawn
    from the actual scan.

    Args:
        result: Scan result to summarise.

    Returns:
        Four ``(name, description, bullets)`` tuples in the order GOVERN,
        MAP, MEASURE, MANAGE. ``bullets`` is a list of plain sentences; the
        caller adds the Markdown list markers.
    """
    stats = _category_stats(result)
    sc = result.severity_counts()
    total_findings = result.total_findings

    govern = [
        "An AI risk management process is in place: this assessment is produced "
        "by an automated, repeatable security scan run as a release gate.",
        f"Risk register maintained with {len(build_risk_rows(result))} tracked "
        "risk item(s), each with a named accountable owner.",
        "Roles assigned per risk (Responsible AI Officer, Security Engineering "
        "Lead, Data Protection Officer, ML Platform Lead).",
    ]

    map_fn = [
        f"System context: target identifier `{result.target}`; "
        f"{result.total_probes} adversarial probes across "
        f"{len(stats)} risk categories.",
        "Threat surface mapped to OWASP LLM Top 10 (LLM01 prompt injection, "
        "LLM02 insecure output, LLM06 sensitive-information disclosure, "
        "LLM07 system-prompt leakage).",
        "Indirect/third-party data risks are explicitly scoped via retrieved-"
        "content (RAG/tool) injection probes.",
    ]

    measure = [
        f"Quantitative result: {total_findings} finding(s); overall probe "
        f"pass rate {result.pass_rate:.0%}.",
        "Severity distribution: "
        f"Critical {sc['CRITICAL']}, High {sc['HIGH']}, "
        f"Medium {sc['MEDIUM']}, Low {sc['LOW']}.",
        "Each finding carries reproducible evidence (the exact probe and model "
        "response) enabling independent verification.",
    ]

    manage: List[str] = []
    highest = result.highest_severity()
    # Compare against None explicitly: whether a Severity member is truthy
    # depends on how the enum is defined, which is not visible from here.
    if highest is not None and highest.value >= Severity.HIGH.value:
        manage.append(
            f"Open high-severity exposure (max severity {highest.name}); "
            "treat as release-blocking until mitigated or formally accepted."
        )
    else:
        manage.append(
            "No high-severity exposure detected in this run; maintain "
            "continuous monitoring as the model and prompts evolve."
        )
    manage.extend(
        [
            "Mitigations are prioritised by risk rating in the risk register; "
            "high/critical items are remediated before deployment.",
            "This scan is wired into CI to re-measure on every change, providing "
            "ongoing assurance rather than a point-in-time snapshot.",
        ]
    )

    return [
        ("GOVERN", "Culture, accountability and process for AI risk.", govern),
        ("MAP", "Context, intended use and risk identification.", map_fn),
        ("MEASURE", "Quantitative & qualitative assessment of identified risks.", measure),
        ("MANAGE", "Prioritisation, response and ongoing monitoring.", manage),
    ]
def render_model_card(result: ScanResult) -> str:
    """Return ``model_card.md`` (risk assessment & model card) as Markdown.

    The document has five numbered sections: executive summary, NIST AI RMF
    mapping, control coverage by category, prioritised risks & mitigations,
    and assurance & monitoring.

    Args:
        result: Scan result to summarise.

    Returns:
        The Markdown text, lines joined with ``"\\n"``.
    """
    sc = result.severity_counts()
    stats = _category_stats(result)
    highest = result.highest_severity()
    # Compare against None explicitly: whether a Severity member is truthy
    # depends on how the enum is defined, which is not visible from here.
    has_findings = highest is not None

    lines: List[str] = [
        "# AI System Risk Assessment & Model Card",
        "",
        # The metadata lines end in TWO spaces: that is a Markdown hard line
        # break. With a single trailing space the fields would be rendered as
        # one run-on paragraph.
        f"**Target system:** `{result.target}`  ",
        f"**Assessment date:** {result.finished_at}  ",
        f"**Scanner version:** {result.scanner_version or 'n/a'}  ",
        f"**Overall result:** {result.total_findings} finding(s), "
        f"pass rate {result.pass_rate:.0%}  ",
        f"**Highest severity:** {highest.name if has_findings else 'None'}",
        "",
        "> This document is the governance artifact accompanying an automated "
        "LLM security scan. Findings are mapped to the **NIST AI Risk "
        "Management Framework (AI RMF 1.0)** core functions and **ISO/IEC "
        "42001:2023** Annex A controls to support audit and assurance.",
        "",
        "## 1. Executive summary",
        "",
        "| Severity | Findings |",
        "|----------|----------|",
        f"| Critical | {sc['CRITICAL']} |",
        f"| High | {sc['HIGH']} |",
        f"| Medium | {sc['MEDIUM']} |",
        f"| Low | {sc['LOW']} |",
        f"| **Total** | **{result.total_findings}** |",
        "",
    ]

    if has_findings and highest.value >= Severity.HIGH.value:
        lines.append(
            "**Deployment recommendation:** Do **not** promote to production "
            f"until the {sc['CRITICAL']} critical and {sc['HIGH']} high "
            "finding(s) are remediated or have a documented, signed-off risk "
            "acceptance."
        )
    else:
        lines.append(
            "**Deployment recommendation:** No high-severity blockers in this "
            "run. Proceed with standard change-management and keep the scan in "
            "CI for continuous assurance."
        )

    lines += ["", "## 2. NIST AI RMF mapping", ""]
    for name, desc, bullets in _rmf_function_blocks(result):
        lines.append(f"### {name}: {desc}")
        lines.append("")
        for b in bullets:
            lines.append(f"- {b}")
        lines.append("")

    lines += [
        "## 3. Control coverage by category",
        "",
        "| Category | OWASP | Probes | Findings | Worst severity | NIST function | ISO/IEC 42001 control |",
        "|----------|-------|-------:|---------:|----------------|---------------|------------------------|",
    ]
    # OWASP tag per category; when several outcomes share a category the last
    # one seen wins.
    cat_owasp = {o.probe.category: o.probe.owasp for o in result.outcomes}
    # stable category order
    for category in sorted(stats):
        s = stats[category]
        fw = _framework_for(category)
        worst: Severity = s["worst"]  # type: ignore[assignment]
        worst_name = worst.name if worst is not None else "-"
        lines.append(
            f"| {category} | {cat_owasp.get(category, '-') or '-'} | "
            f"{int(s['probes'])} | {int(s['findings'])} | {worst_name} | "
            f"{fw['nist']} | {fw['iso']} |"
        )

    lines += [
        "",
        "## 4. Prioritised risks & mitigations",
        "",
    ]
    rows = build_risk_rows(result)
    if rows:
        lines += [
            "| Risk ID | Risk | Rating | Likelihood | Impact | Mitigation | Owner |",
            "|---------|------|--------|------------|--------|------------|-------|",
        ]
        for r in rows:
            lines.append(
                f"| {r['risk_id']} | {r['risk']} | {r['risk_rating']} | "
                f"{r['likelihood']} | {r['impact']} | {r['mitigation']} | "
                f"{r['owner']} |"
            )
    else:
        lines.append("_No risks identified in this run._")

    lines += [
        "",
        "## 5. Assurance & monitoring",
        "",
        "- This assessment is reproducible: re-running the scanner against the "
        "same target reproduces these results.",
        "- The scan is integrated into CI and fails the build on critical "
        "findings, enforcing the control continuously (NIST MANAGE; ISO/IEC "
        "42001 A.6.2.6 operational controls).",
        "- The accompanying `risk_register.csv` is the live tracking artifact "
        "for the GRC function.",
        "",
        "_Disclaimer: automated scanning establishes a security baseline and "
        "evidence trail; it complements, but does not replace, human red-teaming "
        "and a full risk assessment._",
        "",
    ]
    # The trailing "" element makes the document end with a newline.
    return "\n".join(lines)
def write_model_card(result: ScanResult, path: Path) -> Path:
    """Write ``model_card.md`` to disk.

    Args:
        result: Scan result to summarise.
        path: Destination file; missing parent directories are created.

    Returns:
        The destination as a ``Path``.
    """
    path = Path(path)
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(render_model_card(result), encoding="utf-8")
    return path
def write_governance_package(result: ScanResult, out_dir: Path) -> Dict[str, Path]:
    """Write both governance artifacts; return their paths.

    Args:
        result: Scan result to summarise.
        out_dir: Directory that receives ``model_card.md`` and
            ``risk_register.csv``. The individual writers create it if it is
            missing.

    Returns:
        ``{"model_card": <path>, "risk_register": <path>}``.
    """
    out_dir = Path(out_dir)
    return {
        "model_card": write_model_card(result, out_dir / "model_card.md"),
        "risk_register": write_risk_register(result, out_dir / "risk_register.csv"),
    }