openenv-content-moderation / server /policy_engine.py
ThejasRao's picture
Upload folder using huggingface_hub
0f8b6aa verified
"""
Deterministic policy engine.
Given post content + full context (user history, engagement, geo), computes:
- violation_type
- base_severity (before modifiers)
- final_severity (after user-history + engagement + geo overrides)
- expected_action
No ML — pure keyword/rule matching so evaluation is fully reproducible.
"""
from __future__ import annotations
import re
from models import (
ActionType,
GroundTruth,
Severity,
SEVERITY_ORDER,
ViolationType,
)
# ---------------------------------------------------------------------------
# Keyword banks
# ---------------------------------------------------------------------------
# All patterns below are regular expressions written in lowercase; the matcher
# in this module lowercases the post text before searching.

# Threats and targeted abuse.
_HARASSMENT_KEYWORDS = [
    r"\bkill\b",
    r"\bmurder\b",
    r"\bstab\b",
    r"\bshoot\b",
    r"\bdead\b.*\byou\b",
    r"\byou.*\bdead\b",
    r"\bfind you\b",
    r"\bwatch your back\b",
    r"\bmake you regret\b",
    r"\bhate you\b",
    r"\bdestroy you\b",
]

# Claims treated as misinformation.
_MISINFORMATION_KEYWORDS = [
    r"\bvaccines? cause autism\b",
    r"\b5g.*covid\b",
    r"\bcovid.*5g\b",
    r"\bdoctors are hiding\b",
    r"\bgovernment.*hiding\b",
    r"\bspread awareness\b.*\bdelete this\b",
    r"\bponzi scheme\b",
    r"\bfaked.*audit\b",
    r"\bradiation weakens\b",
]

# Accusation-style wording that is classified as "restricted" content.
_RESTRICTED_KEYWORDS = [
    r"\bscam\b",
    r"\bstealing\b",
    r"\bcriminals?\b",
    r"\bproof\b.*\bfaked?\b",
    r"\bhave proof\b",
]
def _matches_any(text: str, patterns: list[str]) -> bool:
    """Return True when at least one regex in *patterns* is found in *text*.

    The text is lowercased once before searching, so patterns need to be
    written in lowercase to match. Each pattern is applied with
    ``re.search``; an empty pattern list yields False.
    """
    haystack = text.lower()
    for pattern in patterns:
        if re.search(pattern, haystack) is not None:
            return True
    return False
# ---------------------------------------------------------------------------
# Severity ladder helpers
# ---------------------------------------------------------------------------
# Severities from least to most severe. The helpers below index this list with
# ranks taken from SEVERITY_ORDER, so the two are assumed to agree
# (low -> 0 ... critical -> 3) -- TODO confirm against models.SEVERITY_ORDER.
_SEVERITY_LIST = [Severity.low, Severity.medium, Severity.high, Severity.critical]


def _bump(severity: Severity, n: int = 1) -> Severity:
    """Return *severity* raised by *n* levels, clamped to the ladder.

    Args:
        severity: Starting severity.
        n: Number of levels to move up. A negative value moves down.

    Returns:
        The resulting severity, never below the first nor above the last
        entry of ``_SEVERITY_LIST``.
    """
    top = len(_SEVERITY_LIST) - 1
    # Clamp at both ends: without the lower bound a negative ``n`` would
    # produce a negative index, which Python resolves from the end of the list.
    idx = max(0, min(SEVERITY_ORDER[severity] + n, top))
    return _SEVERITY_LIST[idx]


def _drop(severity: Severity, n: int = 1) -> Severity:
    """Return *severity* lowered by *n* levels, clamped to the ladder.

    Args:
        severity: Starting severity.
        n: Number of levels to move down. A negative value moves up.

    Returns:
        The resulting severity, never below the first nor above the last
        entry of ``_SEVERITY_LIST``.
    """
    top = len(_SEVERITY_LIST) - 1
    # Clamp at both ends: without the upper bound a negative ``n`` would
    # index past the end of the list and raise IndexError.
    idx = min(top, max(SEVERITY_ORDER[severity] - n, 0))
    return _SEVERITY_LIST[idx]
# ---------------------------------------------------------------------------
# PolicyEngine
# ---------------------------------------------------------------------------
class PolicyEngine:
    """Rule-based engine that derives the ground-truth label for a post.

    The class keeps no instance state: each result depends only on the
    arguments given to :meth:`evaluate` and on the module-level keyword banks.
    """

    def evaluate(
        self,
        content: str,
        geo: str,
        user_history: list[str],
        engagement: dict[str, int],
    ) -> GroundTruth:
        """Classify a post and compute its severity and expected action.

        Args:
            content: Text of the post.
            geo: Region code; only ``"EU"`` and ``"IN"`` change the result.
            user_history: History entries for the author; entries containing
                the word "violation" (any case) count as prior violations.
            engagement: Engagement counters; their values are summed.

        Returns:
            A ``GroundTruth`` carrying the violation type, the severity
            before and after modifiers, and the expected action.
        """
        violation_type = self._detect_violation(content)
        starting = self._base_severity(violation_type)
        # Modifiers run for every violation type, including "safe".
        adjusted = self._apply_modifiers(
            starting, violation_type, user_history, engagement, geo
        )
        return GroundTruth(
            violation_type=violation_type,
            base_severity=starting,
            final_severity=adjusted,
            expected_action=self._map_action(violation_type, adjusted),
        )

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _detect_violation(self, content: str) -> ViolationType:
        """Return the first violation type whose keyword bank matches.

        Banks are tried in a fixed priority order -- harassment, then
        misinformation, then restricted -- and ``safe`` is returned when
        none of them matches.
        """
        ordered_banks = (
            (_HARASSMENT_KEYWORDS, ViolationType.harassment),
            (_MISINFORMATION_KEYWORDS, ViolationType.misinformation),
            (_RESTRICTED_KEYWORDS, ViolationType.restricted),
        )
        for bank, label in ordered_banks:
            if _matches_any(content, bank):
                return label
        return ViolationType.safe

    def _base_severity(self, violation: ViolationType) -> Severity:
        """Return the starting severity assigned to *violation*.

        Raises:
            KeyError: If *violation* is not one of the four known types.
        """
        defaults = {
            ViolationType.harassment: Severity.high,
            ViolationType.misinformation: Severity.high,
            ViolationType.restricted: Severity.medium,
            ViolationType.safe: Severity.low,
        }
        return defaults[violation]

    def _apply_modifiers(
        self,
        severity: Severity,
        violation: ViolationType,
        user_history: list[str],
        engagement: dict[str, int],
        geo: str,
    ) -> Severity:
        """Adjust *severity* for user history, engagement and region.

        The three adjustments are applied in that order, each working on the
        output of the previous one.
        """
        # --- User history: count entries that mention "violation". ---
        offences = 0
        for entry in user_history:
            if "violation" in entry.lower():
                offences += 1

        if offences >= 2:
            severity = _bump(severity)
        elif offences == 1 and SEVERITY_ORDER[severity] < 3:
            # A single prior violation raises the level only while the
            # current rank is below 3.
            severity = _bump(severity)

        # --- Engagement: wider reach spreads harm faster. ---
        reach = sum(engagement.values())
        if reach > 10_000:
            severity = _bump(severity)
        elif reach > 1_000 and SEVERITY_ORDER[severity] < 2:
            # Moderate reach raises the level only while the current rank
            # is below 2.
            severity = _bump(severity)

        # --- Region-specific override comes last. ---
        return self._geo_override(severity, violation, geo)

    def _geo_override(
        self, severity: Severity, violation: ViolationType, geo: str
    ) -> Severity:
        """Apply the region-specific adjustment, if any.

        * ``"EU"``: misinformation and restricted content go up one level.
        * ``"IN"``: restricted content goes down one level.
        * Any other value (e.g. US): severity is returned unchanged.
        """
        stricter_in_eu = (ViolationType.misinformation, ViolationType.restricted)
        if geo == "EU" and violation in stricter_in_eu:
            return _bump(severity)
        if geo == "IN" and violation == ViolationType.restricted:
            return _drop(severity)
        return severity

    def _map_action(
        self, violation: ViolationType, severity: Severity
    ) -> ActionType:
        """Translate a violation type and final severity into an action.

        * safe: always allow.
        * harassment: remove at high or above, otherwise flag.
        * misinformation: remove at high or above, flag at medium,
          otherwise allow.
        * restricted: flag at high or above, escalate at medium,
          otherwise allow.
        * anything else: allow.
        """
        if violation == ViolationType.safe:
            return ActionType.allow

        rank = SEVERITY_ORDER[severity]
        high_or_worse = rank >= SEVERITY_ORDER[Severity.high]
        exactly_medium = rank == SEVERITY_ORDER[Severity.medium]

        if violation == ViolationType.harassment:
            return ActionType.remove if high_or_worse else ActionType.flag

        if violation == ViolationType.misinformation:
            if high_or_worse:
                return ActionType.remove
            return ActionType.flag if exactly_medium else ActionType.allow

        if violation == ViolationType.restricted:
            if high_or_worse:
                return ActionType.flag
            return ActionType.escalate if exactly_medium else ActionType.allow

        return ActionType.allow