| """Deterministic policy post-processing for model verdicts. |
| |
| The LLM does extraction (find code spans, find competitor mentions). The |
| policy lookup ("is this language in the allow-list?") is a set-membership |
| check that doesn't need a 1.2B-parameter model. This module runs that |
| lookup deterministically over the model's already-extracted matches. |
| |
| Stage 1 scope: code and competitor only. Other categories are pass-through. |
| """ |
| from __future__ import annotations |
|
|
| from copy import deepcopy |
|
|
| from schema.models import Verdict |
|
|
|
|
| |
# Scanner names whose policy entry is applied to the "code" category.
_CODE_SCANNERS = frozenset(("Code", "BanCode"))
# Scanner names whose policy entry is applied to the "competitor" category.
_COMPETITOR_SCANNERS = frozenset(("BanCompetitors",))
|
|
|
|
def _find_policy(applied_policies: list[dict], scanner_names: frozenset) -> dict | None:
    """Return the first policy whose ``scanner_name`` is in *scanner_names*.

    Policies are examined in list order; ``None`` is returned when no entry
    qualifies (including when *applied_policies* is empty).
    """
    candidates = (
        entry
        for entry in applied_policies
        if entry.get("scanner_name") in scanner_names
    )
    # The generator is lazy, so scanning stops at the first hit.
    return next(candidates, None)
|
|
|
|
def _filter_code_matches(matches: list, policy: dict | None) -> list:
    """Drop matches whose kind is in the Code allow-list. BanCode keeps all.

    Args:
        matches: Extracted code matches; each item must expose a ``kind``
            attribute (a string or ``None``).
        policy: The applied code policy, or ``None`` when none is configured.

    Returns:
        *matches* itself when there is nothing to filter against (no policy,
        a scanner other than ``"Code"``, or an empty/missing ``languages``
        list); otherwise a new list without the matches whose ``kind`` equals
        an allowed language, compared case-insensitively.
    """
    # Only the "Code" scanner carries an allow-list; "BanCode" and any other
    # scanner name leave the extraction untouched.
    if policy is None or policy.get("scanner_name") != "Code":
        return matches

    # `scanner_params` may be present but null; `.get(key, {})` would hand
    # back None in that case, so fall back with `or` instead.
    params = policy.get("scanner_params") or {}
    allow = {
        lang.lower()
        for lang in params.get("languages") or []
        if isinstance(lang, str)  # ignore malformed (non-string) entries
    }
    if not allow:
        return matches
    return [m for m in matches if (m.kind or "").lower() not in allow]
|
|
|
|
def _filter_competitor_matches(matches: list, policy: dict | None) -> list:
    """Keep only matches whose text names a configured competitor.

    Comparison is case-insensitive and ignores surrounding whitespace on both
    the match text and the configured names. A match is kept when any
    configured competitor occurs as a substring of its text (which also
    covers an exact match).

    Args:
        matches: Extracted competitor matches; each item must expose a
            ``text`` attribute (a string or ``None``).
        policy: The applied competitor policy, or ``None`` when none is
            configured.

    Returns:
        *matches* itself when there is no policy or no usable competitor
        names; otherwise a new list with the matching items in their
        original order.
    """
    if policy is None:
        return matches

    # `scanner_params` may be present but null; `.get(key, {})` would hand
    # back None in that case, so fall back with `or` instead.
    params = policy.get("scanner_params") or {}
    competitors = {
        name.strip().lower()
        for name in params.get("competitors") or []
        # Skip malformed entries and blank names: an empty string is a
        # substring of every text and would keep every match.
        if isinstance(name, str) and name.strip()
    }
    if not competitors:
        return matches

    kept = []
    for m in matches:
        text_lc = (m.text or "").strip().lower()
        if any(name in text_lc for name in competitors):
            kept.append(m)
    return kept
|
|
|
|
def apply_policy_postprocess(
    verdict: Verdict, applied_policies: list[dict]
) -> Verdict:
    """Build a copy of *verdict* with policy rules applied deterministically.

    The matches emitted by the model are taken as the extraction. For the
    "code" and "competitor" categories they are filtered against the matching
    entry in *applied_policies*, and the category's ``matched`` flag is reset
    to whether any match survived. Every other category is deep-copied
    unchanged. ``overall_blocked`` is then recomputed as "any category
    matched".

    Args:
        verdict: The model's verdict; it is not modified.
        applied_policies: Policy dicts for the request, each looked up by its
            ``scanner_name``.

    Returns:
        A deep copy of *verdict* carrying the rebuilt categories and the
        recomputed ``overall_blocked``.
    """
    # (category name, scanner names that configure it, filter to apply)
    rules = (
        ("code", _CODE_SCANNERS, _filter_code_matches),
        ("competitor", _COMPETITOR_SCANNERS, _filter_competitor_matches),
    )

    rebuilt = []
    for source_cat in verdict.categories:
        clone = source_cat.model_copy(deep=True)
        for label, scanners, keep in rules:
            if source_cat.name != label:
                continue
            active_policy = _find_policy(applied_policies, scanners)
            clone.matches = keep(clone.matches, active_policy)
            clone.matched = bool(clone.matches)
            break
        rebuilt.append(clone)

    changes = {
        "categories": rebuilt,
        "overall_blocked": any(cat.matched for cat in rebuilt),
    }
    return verdict.model_copy(update=changes, deep=True)
|
|