"""Policy layer: convert risk-model output into allow/redact/block decisions."""

from __future__ import annotations

from dataclasses import asdict, dataclass
from typing import Any, Dict

from .config import HIGH_RISK_BLOCK_THRESHOLD, MEDIUM_RISK_REDACT_THRESHOLD
from .risk_model import RiskResult
@dataclass
class PolicyDecision:
    """Outcome of applying security policy to a scored request."""

    allow: bool
    redact_output: bool
    reason: str
    risk_score: float
    flags: Dict[str, bool]
def _build_detailed_reason(reason: str, risk_obj: RiskResult) -> str:
    """Append the most specific risk reason (one mentioning 'detected'), if any."""
    for r in risk_obj.reasons:
        if r and "detected" in r.lower():
            return f"{reason}: {r}"
    return reason


def decide_from_risk(risk: RiskResult) -> PolicyDecision:
    """Map a RiskResult onto a PolicyDecision via hard-block flags and score thresholds."""
    score = risk.score
    # Immediate hard blocks for specific threat flags, irrespective of score.
    # Rules are checked in order; the first matching flag wins.
    hard_block_rules = (
        ("ssrf_attempt", "Blocked: SSRF attempt"),
        ("malicious_url", "Blocked: Malicious URL pattern"),
        ("jailbreak_like", "Blocked: Jailbreak or prompt injection attempt"),
        ("prompt_injection", "Blocked: Jailbreak or prompt injection attempt"),
        ("exfiltration_like", "Blocked: Data exfiltration attempt"),
        ("data_theft_like", "Blocked: Data theft or competitive intelligence"),
    )
    for flag_name, base_reason in hard_block_rules:
        if risk.flags.get(flag_name):
            return PolicyDecision(
                allow=False,
                redact_output=True,
                reason=_build_detailed_reason(base_reason, risk),
                risk_score=score,
                flags=risk.flags,
            )
    if score >= HIGH_RISK_BLOCK_THRESHOLD:
        # Score-based block: surface the most specific threat signal available.
        threat_labels = (
            ("operational_disguise", "operational disguise (credential theft)"),
            ("code_extraction", "code extraction attempt"),
            ("enumeration_pattern", "enumeration attempt"),
            ("unverified_authority_claim", "unverified authority claim"),
        )
        threat_info = next(
            (label for flag_name, label in threat_labels if risk.flags.get(flag_name)),
            "",
        )
        if threat_info:
            detailed_reason = f"Blocked: {threat_info} (risk {score:.2f})"
        else:
            fallback = risk.reasons[0] if risk.reasons else "security policy violation"
            detailed_reason = f"Blocked: high risk ({score:.2f}) - {fallback}"
        return PolicyDecision(
            allow=False,
            redact_output=True,
            reason=detailed_reason,
            risk_score=score,
            flags=risk.flags,
        )
    if score >= MEDIUM_RISK_REDACT_THRESHOLD:
        return PolicyDecision(
            allow=True,
            redact_output=True,
            reason=f"Allowed with redaction: medium risk ({score:.2f})",
            risk_score=score,
            flags=risk.flags,
        )

    return PolicyDecision(
        allow=True,
        redact_output=False,
        reason=f"Allowed: low risk ({score:.2f})",
        risk_score=score,
        flags=risk.flags,
    )
def decision_to_dict(d: PolicyDecision) -> Dict[str, Any]:
    """Serialize a PolicyDecision to a plain dict (e.g., for JSON logging)."""
    return asdict(d)
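

if __name__ == "__main__":
    # Usage sketch (illustrative only). RiskResult's constructor is not shown
    # in this module, so a SimpleNamespace stand-in exposing the three
    # attributes read above (score, flags, reasons) is used; the flag name,
    # score, and reason string below are hypothetical test values, not real
    # risk-model output. Run via `python -m <package>.<module>` so the
    # relative imports at the top resolve.
    from types import SimpleNamespace

    fake_risk = SimpleNamespace(
        score=0.92,
        flags={"ssrf_attempt": True},
        reasons=["SSRF pattern detected in tool arguments"],
    )
    decision = decide_from_risk(fake_risk)  # hard-blocked by the ssrf_attempt flag
    print(decision_to_dict(decision))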