vijaym's picture
Upload folder using huggingface_hub
434e2be verified
"""Deterministic policy post-processing for model verdicts.
The LLM does extraction (find code spans, find competitor mentions). The
policy lookup ("is this language in the allow-list?") is a set-membership
check that doesn't need a 1.2B-parameter model. This module runs that
lookup deterministically over the model's already-extracted matches.
Stage 1 scope: code and competitor only. Other categories are pass-through.
"""
from __future__ import annotations
from copy import deepcopy
from schema.models import Verdict
# Scanner names (the value of a policy's "scanner_name" key) whose policy
# governs each post-processed category: the first set is looked up for the
# "code" category, the second for the "competitor" category.
_CODE_SCANNERS = frozenset({"Code", "BanCode"})
_COMPETITOR_SCANNERS = frozenset({"BanCompetitors"})
def _find_policy(applied_policies: list[dict], scanner_names: frozenset) -> dict | None:
    """Return the first applied policy handled by one of *scanner_names*.

    Args:
        applied_policies: Policy dicts in request order; each is probed for
            its ``"scanner_name"`` key.
        scanner_names: Scanner names that qualify a policy for selection.

    Returns:
        The first policy dict whose ``"scanner_name"`` is in
        *scanner_names*, or ``None`` when no policy qualifies.
    """
    candidates = (
        entry
        for entry in applied_policies
        if entry.get("scanner_name") in scanner_names
    )
    # Lazily evaluated, so scanning stops at the first hit.
    return next(candidates, None)
def _filter_code_matches(matches: list, policy: dict | None) -> list:
    """Drop code matches whose ``kind`` is on the ``Code`` policy's allow-list.

    Args:
        matches: Extracted code matches. Each must expose a ``kind``
            attribute; it is compared case-insensitively and a falsy
            ``kind`` is treated as the empty string.
        policy: The applied code policy, or ``None`` when there is none.

    Returns:
        *matches* itself (same object) when there is no policy, when the
        policy's scanner is anything other than ``"Code"`` (``"BanCode"``
        keeps every match), or when the allow-list is empty. Otherwise a
        new list containing only the matches whose ``kind`` is not
        allow-listed.
    """
    if policy is None or policy.get("scanner_name") != "Code":
        return matches

    # ``scanner_params`` may be present but null; ``.get(key, {})`` would
    # hand back ``None`` in that case and the chained ``.get`` would raise.
    params = policy.get("scanner_params") or {}
    allow = {
        lang.lower()
        for lang in params.get("languages") or []
        if isinstance(lang, str)  # tolerate null / non-string entries
    }
    if not allow:
        return matches
    return [m for m in matches if (m.kind or "").lower() not in allow]
def _filter_competitor_matches(matches: list, policy: dict | None) -> list:
    """Keep only matches whose text mentions a configured competitor.

    Matching is case-insensitive and permissive: a competitor name only has
    to appear as a substring of the match text, so "Salesforce CRM" matches
    the competitor "Salesforce".

    Args:
        matches: Extracted competitor matches. Each must expose a ``text``
            attribute; a falsy ``text`` is treated as the empty string.
        policy: The applied competitor policy, or ``None`` when there is
            none.

    Returns:
        *matches* itself (same object) when there is no policy or no usable
        competitor name is configured; otherwise a new list with the
        matches that mention at least one competitor.
    """
    if policy is None:
        return matches

    # ``scanner_params`` may be present but null; guard before chaining.
    params = policy.get("scanner_params") or {}
    # Normalize names and discard blank / non-string entries. A blank name
    # is a substring of every text and would silently keep every match.
    competitors = {
        name.strip().lower()
        for name in params.get("competitors") or []
        if isinstance(name, str) and name.strip()
    }
    if not competitors:
        return matches

    kept = []
    for m in matches:
        text_lc = (m.text or "").lower()
        # The substring test also covers exact equality.
        if any(name in text_lc for name in competitors):
            kept.append(m)
    return kept
def apply_policy_postprocess(
    verdict: Verdict, applied_policies: list[dict]
) -> Verdict:
    """Build a copy of *verdict* with policy rules applied to its matches.

    The matches emitted by the model are taken as the extraction result.
    For the "code" and "competitor" categories they are filtered against
    the matching policy in *applied_policies* and ``matched`` is recomputed
    from whatever survives. Every other category is deep-copied unchanged.
    ``overall_blocked`` is then recomputed as "any category matched".

    Args:
        verdict: The model verdict to post-process; it is not mutated.
        applied_policies: Policy dicts that were applied to the request.

    Returns:
        A new ``Verdict`` carrying the filtered categories and the
        recomputed ``overall_blocked`` flag.
    """
    # (category name, scanner names governing it, filter to apply)
    rules = (
        ("code", _CODE_SCANNERS, _filter_code_matches),
        ("competitor", _COMPETITOR_SCANNERS, _filter_competitor_matches),
    )

    rewritten = []
    for source_cat in verdict.categories:
        clone = source_cat.model_copy(deep=True)
        for cat_name, scanners, keep in rules:
            if source_cat.name != cat_name:
                continue
            # Look the policy up only for categories that are filtered.
            active_policy = _find_policy(applied_policies, scanners)
            clone.matches = keep(clone.matches, active_policy)
            clone.matched = bool(clone.matches)
            break
        rewritten.append(clone)

    changes = {
        "categories": rewritten,
        "overall_blocked": any(cat.matched for cat in rewritten),
    }
    return verdict.model_copy(update=changes, deep=True)