"""Deterministic policy post-processing for model verdicts. The LLM does extraction (find code spans, find competitor mentions). The policy lookup ("is this language in the allow-list?") is a set-membership check that doesn't need a 1.2B-parameter model. This module runs that lookup deterministically over the model's already-extracted matches. Stage 1 scope: code and competitor only. Other categories are pass-through. """ from __future__ import annotations from copy import deepcopy from schema.models import Verdict # Map scanner_name → category whose matches it filters. _CODE_SCANNERS = frozenset({"Code", "BanCode"}) _COMPETITOR_SCANNERS = frozenset({"BanCompetitors"}) def _find_policy(applied_policies: list[dict], scanner_names: frozenset) -> dict | None: for policy in applied_policies: if policy.get("scanner_name") in scanner_names: return policy return None def _filter_code_matches(matches: list, policy: dict | None) -> list: """Drop matches whose kind is in the Code allow-list. BanCode keeps all.""" if policy is None: return matches scanner = policy.get("scanner_name") if scanner == "BanCode": return matches if scanner == "Code": allow = {lang.lower() for lang in policy.get("scanner_params", {}).get("languages") or []} if not allow: return matches return [m for m in matches if (m.kind or "").lower() not in allow] return matches def _filter_competitor_matches(matches: list, policy: dict | None) -> list: """Keep only matches whose text matches a configured competitor (case-insensitive).""" if policy is None: return matches competitors = {c.lower() for c in policy.get("scanner_params", {}).get("competitors") or []} if not competitors: return matches out = [] for m in matches: text_lc = (m.text or "").strip().lower() if text_lc in competitors: out.append(m) continue # Permissive substring match: "Salesforce CRM" should match competitor "Salesforce" if any(c in text_lc for c in competitors): out.append(m) return out def apply_policy_postprocess( verdict: Verdict, applied_policies: list[dict] ) -> Verdict: """Return a new Verdict with code/competitor matches filtered against the request's policy and `matched` / `overall_blocked` recomputed. The model's emitted matches[] are treated as the extraction; the policy rule is applied here deterministically. """ new_categories = [] for category in verdict.categories: new_cat = category.model_copy(deep=True) if category.name == "code": policy = _find_policy(applied_policies, _CODE_SCANNERS) new_cat.matches = _filter_code_matches(new_cat.matches, policy) new_cat.matched = bool(new_cat.matches) elif category.name == "competitor": policy = _find_policy(applied_policies, _COMPETITOR_SCANNERS) new_cat.matches = _filter_competitor_matches(new_cat.matches, policy) new_cat.matched = bool(new_cat.matches) new_categories.append(new_cat) overall_blocked = any(c.matched for c in new_categories) return verdict.model_copy( update={ "categories": new_categories, "overall_blocked": overall_blocked, }, deep=True, )