from __future__ import annotations

from dataclasses import dataclass, asdict
from typing import Any, Dict

from .config import HIGH_RISK_BLOCK_THRESHOLD, MEDIUM_RISK_REDACT_THRESHOLD
from .risk_model import RiskResult


@dataclass
class PolicyDecision:
    """Outcome of evaluating a RiskResult against the policy thresholds."""

    allow: bool
    redact_output: bool
    reason: str
    risk_score: float
    flags: Dict[str, bool]


# Ordered (flag, base reason) pairs: the first flag that is set blocks the
# request outright. Order preserves the precedence of the original checks.
_FLAG_BLOCK_REASONS = [
    ("ssrf_attempt", "Blocked: SSRF attempt"),
    ("malicious_url", "Blocked: Malicious URL pattern"),
    ("jailbreak_like", "Blocked: Jailbreak or prompt injection attempt"),
    ("prompt_injection", "Blocked: Jailbreak or prompt injection attempt"),
    ("exfiltration_like", "Blocked: Data exfiltration attempt"),
    ("data_theft_like", "Blocked: Data theft or competitive intelligence"),
]


def _build_detailed_reason(reason: str, risk_obj: RiskResult) -> str:
    """Extend a base reason with the most specific detection message, if any."""
    if risk_obj.reasons:
        # Prefer the first reason string that names what was detected.
        for r in risk_obj.reasons:
            if r and "detected" in r.lower():
                return reason + ": " + r
    return reason


def decide_from_risk(risk: RiskResult) -> PolicyDecision:
    """Map a RiskResult to a PolicyDecision: hard-block flags first, then
    score thresholds (block / redact / allow)."""
    score = risk.score
| if risk.flags.get("ssrf_attempt"): |
| detailed_reason = _build_detailed_reason("Blocked: SSRF attempt", risk) |
| return PolicyDecision( |
| allow=False, |
| redact_output=True, |
| reason=detailed_reason, |
| risk_score=score, |
| flags=risk.flags, |
| ) |
| if risk.flags.get("malicious_url"): |
| detailed_reason = _build_detailed_reason("Blocked: Malicious URL pattern", risk) |
| return PolicyDecision( |
| allow=False, |
| redact_output=True, |
| reason=detailed_reason, |
| risk_score=score, |
| flags=risk.flags, |
| ) |
| if risk.flags.get("jailbreak_like") or risk.flags.get("prompt_injection"): |
| detailed_reason = _build_detailed_reason("Blocked: Jailbreak or prompt injection attempt", risk) |
| return PolicyDecision( |
| allow=False, |
| redact_output=True, |
| reason=detailed_reason, |
| risk_score=score, |
| flags=risk.flags, |
| ) |
| if risk.flags.get("exfiltration_like"): |
| detailed_reason = _build_detailed_reason("Blocked: Data exfiltration attempt", risk) |
| return PolicyDecision( |
| allow=False, |
| redact_output=True, |
| reason=detailed_reason, |
| risk_score=score, |
| flags=risk.flags, |
| ) |
| if risk.flags.get("data_theft_like"): |
| detailed_reason = _build_detailed_reason("Blocked: Data theft or competitive intelligence", risk) |
| return PolicyDecision( |
| allow=False, |
| redact_output=True, |
| reason=detailed_reason, |
| risk_score=score, |
| flags=risk.flags, |
| ) |

    if score >= HIGH_RISK_BLOCK_THRESHOLD:
        # Name the most specific threat pattern available for the audit trail.
        threat_info = ""
        if risk.flags.get("operational_disguise"):
            threat_info = "operational disguise (credential theft)"
        elif risk.flags.get("code_extraction"):
            threat_info = "code extraction attempt"
        elif risk.flags.get("enumeration_pattern"):
            threat_info = "enumeration attempt"
        elif risk.flags.get("unverified_authority_claim"):
            threat_info = "unverified authority claim"

        if threat_info:
            detailed_reason = f"Blocked: {threat_info} (risk {score:.2f})"
        else:
            first_reason = risk.reasons[0] if risk.reasons else "security policy violation"
            detailed_reason = f"Blocked: high risk ({score:.2f}) - {first_reason}"

        return PolicyDecision(
            allow=False,
            redact_output=True,
            reason=detailed_reason,
            risk_score=score,
            flags=risk.flags,
        )

    if score >= MEDIUM_RISK_REDACT_THRESHOLD:
        return PolicyDecision(
            allow=True,
            redact_output=True,
            reason=f"Allowed with redaction: medium risk ({score:.2f})",
            risk_score=score,
            flags=risk.flags,
        )

    return PolicyDecision(
        allow=True,
        redact_output=False,
        reason=f"Allowed: low risk ({score:.2f})",
        risk_score=score,
        flags=risk.flags,
    )
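

# Illustrative usage (a sketch, not executed at import time). It assumes
# RiskResult can be built with score/flags/reasons keyword arguments, which
# is inferred from how decide_from_risk reads those attributes and is not
# verified against risk_model:
#
#     risk = RiskResult(
#         score=0.95,
#         flags={"ssrf_attempt": True},
#         reasons=["SSRF detected: URL resolves to an internal address"],
#     )
#     decision = decide_from_risk(risk)
#     # decision.allow is False, decision.redact_output is True, and
#     # decision.reason is "Blocked: SSRF attempt: SSRF detected: ..."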


def decision_to_dict(d: PolicyDecision) -> Dict[str, Any]:
    """Serialize a PolicyDecision to a plain dict (e.g., for JSON audit logs)."""
    return asdict(d)
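

if __name__ == "__main__":
    # Minimal smoke test of the low-risk path. Assumption: RiskResult is a
    # dataclass taking score/flags/reasons keywords (inferred from attribute
    # access above; adjust if risk_model defines a different constructor).
    sample = RiskResult(score=0.05, flags={}, reasons=[])
    print(decision_to_dict(decide_from_risk(sample)))
    # Expected, assuming 0.05 falls below MEDIUM_RISK_REDACT_THRESHOLD:
    # {"allow": True, "redact_output": False,
    #  "reason": "Allowed: low risk (0.05)", "risk_score": 0.05, "flags": {}}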