| """ |
| Deterministic policy engine. |
| |
| Given post content + full context (user history, engagement, geo), computes: |
| - violation_type |
| - base_severity (before modifiers) |
| - final_severity (after user-history + engagement + geo overrides) |
| - expected_action |
| |
| No ML — pure keyword/rule matching so evaluation is fully reproducible. |
| """ |
| from __future__ import annotations |
|
|
| import re |
|
|
| from models import ( |
| ActionType, |
| GroundTruth, |
| Severity, |
| SEVERITY_ORDER, |
| ViolationType, |
| ) |
|
|
| |
| |
| |
|
|
| _HARASSMENT_KEYWORDS = [ |
| r"\bkill\b", r"\bmurder\b", r"\bstab\b", r"\bshoot\b", |
| r"\bdead\b.*\byou\b", r"\byou.*\bdead\b", |
| r"\bfind you\b", r"\bwatch your back\b", r"\bmake you regret\b", |
| r"\bhate you\b", r"\bdestroy you\b", |
| ] |
|
|
| _MISINFORMATION_KEYWORDS = [ |
| r"\bvaccines? cause autism\b", r"\b5g.*covid\b", r"\bcovid.*5g\b", |
| r"\bdoctors are hiding\b", r"\bgovernment.*hiding\b", |
| r"\bspread awareness\b.*\bdelete this\b", |
| r"\bponzi scheme\b", r"\bfaked.*audit\b", |
| r"\bradiation weakens\b", |
| ] |
|
|
| _RESTRICTED_KEYWORDS = [ |
| r"\bscam\b", r"\bstealing\b", r"\bcriminals?\b", |
| r"\bproof\b.*\bfaked?\b", r"\bhave proof\b", |
| ] |
|
|
|
|
| def _matches_any(text: str, patterns: list[str]) -> bool: |
| lowered = text.lower() |
| return any(re.search(p, lowered) for p in patterns) |
|
|
|
|
| |
| |
| |
|
|
# Severity levels listed from low to critical; `_bump` and `_drop` pick their
# result from this list by index.
_SEVERITY_LIST = [
    Severity.low,
    Severity.medium,
    Severity.high,
    Severity.critical,
]
|
|
|
|
def _bump(severity: Severity, n: int = 1) -> Severity:
    """Raise *severity* by *n* levels, saturating at both ends of the scale.

    Args:
        severity: The level to start from.
        n: Number of levels to move up (default 1).

    Returns:
        The entry of `_SEVERITY_LIST` that lies *n* positions above
        *severity*, clamped to the first/last entry.

    NOTE(review): assumes `SEVERITY_ORDER[severity]` is that level's index in
    `_SEVERITY_LIST`; `SEVERITY_ORDER` is defined in `models` -- confirm.
    """
    top = len(_SEVERITY_LIST) - 1
    # Clamp on both sides: with only an upper bound, a negative *n* could
    # produce a negative index, which Python resolves from the END of the list
    # (or rejects with IndexError) instead of stopping at the lowest level.
    idx = max(0, min(SEVERITY_ORDER[severity] + n, top))
    return _SEVERITY_LIST[idx]
|
|
|
|
def _drop(severity: Severity, n: int = 1) -> Severity:
    """Lower *severity* by *n* levels, saturating at both ends of the scale.

    Args:
        severity: The level to start from.
        n: Number of levels to move down (default 1).

    Returns:
        The entry of `_SEVERITY_LIST` that lies *n* positions below
        *severity*, clamped to the first/last entry.

    NOTE(review): assumes `SEVERITY_ORDER[severity]` is that level's index in
    `_SEVERITY_LIST`; `SEVERITY_ORDER` is defined in `models` -- confirm.
    """
    top = len(_SEVERITY_LIST) - 1
    # Clamp on both sides: with only a lower bound, a negative *n* could push
    # the index past the end of the list and raise IndexError.
    idx = min(top, max(SEVERITY_ORDER[severity] - n, 0))
    return _SEVERITY_LIST[idx]
|
|
|
|
| |
| |
| |
|
|
class PolicyEngine:
    """Rule-based evaluator that derives the expected moderation outcome for a post."""

    def evaluate(
        self,
        content: str,
        geo: str,
        user_history: list[str],
        engagement: dict[str, int],
    ) -> GroundTruth:
        """Run the full pipeline: detect, grade, adjust, and map to an action.

        Args:
            content: Post text to classify.
            geo: Region code; only "EU" and "IN" alter the result.
            user_history: History entries; those containing "violation"
                (case-insensitive) are counted as prior violations.
            engagement: Mapping whose values are summed into a single
                engagement total.

        Returns:
            A `GroundTruth` holding the violation type, the severity before
            and after modifiers, and the expected action.
        """
        violation_type = self._detect_violation(content)
        base = self._base_severity(violation_type)
        final = self._apply_modifiers(
            base, violation_type, user_history, engagement, geo
        )
        return GroundTruth(
            violation_type=violation_type,
            base_severity=base,
            final_severity=final,
            expected_action=self._map_action(violation_type, final),
        )

    def _detect_violation(self, content: str) -> ViolationType:
        """Return the first violation type whose patterns match *content*.

        Categories are tested in a fixed order (harassment, misinformation,
        restricted), so a post matching several categories gets the earliest
        one. Content matching nothing is `ViolationType.safe`.
        """
        ordered_checks = (
            (_HARASSMENT_KEYWORDS, ViolationType.harassment),
            (_MISINFORMATION_KEYWORDS, ViolationType.misinformation),
            (_RESTRICTED_KEYWORDS, ViolationType.restricted),
        )
        for patterns, label in ordered_checks:
            if _matches_any(content, patterns):
                return label
        return ViolationType.safe

    def _base_severity(self, violation: ViolationType) -> Severity:
        """Return the starting severity for *violation*, before any modifiers.

        Raises:
            KeyError: If *violation* is not one of the four known types.
        """
        severity_by_violation = {
            ViolationType.harassment: Severity.high,
            ViolationType.misinformation: Severity.high,
            ViolationType.restricted: Severity.medium,
            ViolationType.safe: Severity.low,
        }
        return severity_by_violation[violation]

    def _apply_modifiers(
        self,
        severity: Severity,
        violation: ViolationType,
        user_history: list[str],
        engagement: dict[str, int],
        geo: str,
    ) -> Severity:
        """Adjust *severity* for user history, engagement, then geography.

        The three adjustments are applied in that order, each working on the
        result of the previous one.
        """
        # --- User history: count entries that mention "violation". ---
        offence_count = 0
        for entry in user_history:
            if "violation" in entry.lower():
                offence_count += 1

        if offence_count >= 2:
            severity = _bump(severity)
        elif offence_count == 1 and SEVERITY_ORDER[severity] < 3:
            # NOTE(review): `_bump` already saturates at the last entry of
            # `_SEVERITY_LIST`, so this `< 3` guard looks redundant if rank 3
            # is the top level -- confirm the intended rule.
            severity = _bump(severity)

        # --- Engagement: sum of all values in the mapping. ---
        reach = sum(engagement.values())
        if reach > 10_000:
            severity = _bump(severity)
        elif reach > 1_000 and SEVERITY_ORDER[severity] < 2:
            # Mid-range engagement only bumps severities whose rank is below
            # 2 (presumably "high" -- confirm against `SEVERITY_ORDER`).
            severity = _bump(severity)

        # --- Geography is applied last. ---
        return self._geo_override(severity, violation, geo)

    def _geo_override(
        self, severity: Severity, violation: ViolationType, geo: str
    ) -> Severity:
        """Apply the region-specific adjustment for *geo*.

        "EU" raises misinformation and restricted content by one level; "IN"
        lowers restricted content by one level. Any other combination leaves
        *severity* unchanged. The comparison with *geo* is case-sensitive.
        """
        raised_in_eu = (ViolationType.misinformation, ViolationType.restricted)
        if geo == "EU" and violation in raised_in_eu:
            return _bump(severity)
        if geo == "IN" and violation == ViolationType.restricted:
            return _drop(severity)
        return severity

    def _map_action(
        self, violation: ViolationType, severity: Severity
    ) -> ActionType:
        """Translate a violation type and final severity into an action.

        Mapping (severity compared by its `SEVERITY_ORDER` rank):
            safe            -> allow
            harassment      -> remove at high or above, otherwise flag
            misinformation  -> remove at high or above, flag at medium,
                               otherwise allow
            restricted      -> flag at high or above, escalate at medium,
                               otherwise allow
            anything else   -> allow
        """
        if violation == ViolationType.safe:
            return ActionType.allow

        rank = SEVERITY_ORDER[severity]
        high_rank = SEVERITY_ORDER[Severity.high]
        medium_rank = SEVERITY_ORDER[Severity.medium]

        if violation == ViolationType.harassment:
            return ActionType.remove if rank >= high_rank else ActionType.flag

        if violation == ViolationType.misinformation:
            if rank >= high_rank:
                return ActionType.remove
            return ActionType.flag if rank == medium_rank else ActionType.allow

        if violation == ViolationType.restricted:
            if rank >= high_rank:
                return ActionType.flag
            return ActionType.escalate if rank == medium_rank else ActionType.allow

        # Unknown violation types fall through to the permissive default.
        return ActionType.allow
|
|