import os
import re
from typing import ClassVar

from app.core.logging import get_logger

logger = get_logger(__name__)


class GuardClassifier:
    """
    Scope/safety gate for incoming user queries.

    Primary path: a fine-tuned DistilBERT sequence classifier loaded from
    ``model_path``; a query is considered in scope only when the positive
    class probability meets the 0.70 confidence threshold. If the fine-tuned
    model path does not exist (or torch/transformers are not installed),
    it falls back to permissive regex heuristics that only block known
    prompt-injection patterns.
    """

    # Single source of truth for the in-scope confidence cutoff.
    _THRESHOLD: ClassVar[float] = 0.70

    # Lazily compiled & cached injection regexes, shared by all instances.
    # NOTE: compiled on the first _build_patterns() call, not at class load.
    _INJECTION_PATTERNS: ClassVar[list[re.Pattern]] = []

    def __init__(self, model_path: str = "fine_tuning/guard_classifier/model"):
        """Try to load the fine-tuned model; degrade to rules on any failure.

        Args:
            model_path: Directory holding the fine-tuned tokenizer + model.
        """
        self.model_path = model_path
        self._model = None
        self._tokenizer = None
        if os.path.exists(self.model_path) and os.listdir(self.model_path):
            try:
                # Lazy imports — torch/transformers are not installed in CI
                # or the prod API image, where the rule-based fallback runs.
                import torch  # noqa: F401, F811
                from transformers import AutoTokenizer, AutoModelForSequenceClassification

                logger.info("Loading GuardClassifier model from %s", self.model_path)
                self._tokenizer = AutoTokenizer.from_pretrained(self.model_path)
                self._model = AutoModelForSequenceClassification.from_pretrained(self.model_path)
                self._model.eval()  # inference mode: disable dropout etc.
            except ImportError:
                logger.warning("torch/transformers not installed, falling back to rule-based guard.")
                self._model = None
            except Exception as e:
                # Any load failure (corrupt weights, version mismatch, ...)
                # must not take the guard down — fall back to rules.
                logger.warning("Failed to load DistilBERT Guard model, falling back to rule-based: %s", e)
                self._model = None
        else:
            logger.info("GuardClassifier model path not found, falling back to rule-based.")

    def is_safe_and_relevant(self, query: str) -> bool:
        """Boolean wrapper kept to maintain the existing pipeline signature."""
        in_scope, _ = self.is_in_scope(query)
        return in_scope

    def is_in_scope(self, text: str) -> tuple[bool, float]:
        """Classify *text* as in/out of scope.

        Returns:
            ``(is_in_scope, confidence_score)``. Threshold: 0.70 — below the
            threshold the query is out of scope. The rule-based fallback
            reports a degenerate confidence of 1.0 (pass) or 0.0 (blocked).
        """
        if not self._model or not self._tokenizer:
            result = self._rule_based_check(text)
            return (result, 1.0 if result else 0.0)
        try:
            import torch

            inputs = self._tokenizer(text, return_tensors="pt", truncation=True, padding=True, max_length=128)
            with torch.no_grad():
                outputs = self._model(**inputs)
            probs = torch.softmax(outputs.logits, dim=-1)[0]
            in_scope_prob = probs[1].item()  # index 1 == positive ("in scope") class
            is_in_scope = in_scope_prob >= self._THRESHOLD
            return (is_in_scope, float(in_scope_prob))
        except Exception as e:
            # Inference errors must never crash the pipeline; degrade to the
            # permissive rule-based check instead.
            logger.warning("Inference error, reverting to rules: %s", e)
            result = self._rule_based_check(text)
            return (result, 1.0 if result else 0.0)

    @classmethod
    def _build_patterns(cls) -> list:
        """Compile and cache all injection-detection regexes (first call only)."""
        if cls._INJECTION_PATTERNS:
            return cls._INJECTION_PATTERNS
        raw = [
            # ── Classic prompt injection ──────────────────────────────────────
            r"ignore\s+(all\s+)?(previous|prior|above|earlier)\s+(instructions?|prompts?|rules?|context)",
            r"disregard\s+(all\s+)?(previous|prior|above)\s+(instructions?|prompts?|rules?|context)",
            r"forget\s+(everything|all\s+(previous|prior|your))",
            r"override\s+(your\s+)?(instructions?|rules?|directives?|constraints?)",
            r"bypass\s+your\s+(restrictions?|safety|filters?|rules?|instructions?)",
            r"(do\s+not\s+follow|stop\s+following)\s+(your\s+)?(instructions?|rules?|guidelines?)",
            # ── System prompt extraction ──────────────────────────────────────
            r"(repeat|print|output|reveal|show|display|dump|share)\s+(your\s+)?(system\s+)?(prompt|instructions?|rules?|directives?|constraints?|message)",
            r"what\s+(are|were)\s+your\s+(instructions?|rules?|system\s+prompt|directives?)",
            r"(tell|show)\s+me\s+(your\s+)?(system|initial|original|hidden|secret)\s+(prompt|instructions?|message)",
            r"\bsystem\s+message\b",
            # ── Role / persona jailbreaks ─────────────────────────────────────
            r"you\s+are\s+now\s+(a\s+|an\s+)?(?!(darshan|assistant))",
            r"(pretend|act|behave)\s+(like|as\s+if)\s+you\s+(are|have\s+no|don.t\s+have)",
            r"(pretend|imagine|assume|suppose)\s+you\s+(are|were)\s+(a\s+|an\s+)?(?!(darshan))",
            r"roleplay\s+as",
            r"(simulate|impersonate)\s+(a\s+|an\s+)?(different|other|unrestricted|evil|jailbroken)",
            r"(act|respond)\s+as\s+if\s+you\s+(have\s+no|don.t\s+have)\s+(restrictions?|rules?|guidelines?|filters?|safety)",
            r"you\s+(have\s+no|don.t\s+have)\s+(restrictions?|rules?|limits?|filters?)",
            r"\bdan\s+(mode|prompt|jailbreak)\b",
            r"developer\s+mode",
            r"jailbreak\b",
            r"unrestricted\s+(mode|access|version|ai)",
            r"no\s+filter(s|ed)?\s+(mode|version|response)",
            # ── Hypothetical / simulation bypass (meta-instruction targeted only) ─────
            # Note: kept narrow on purpose — Darshan has security/infosec repos and
            # visitors may legitimately ask about prompt injection, exploits, bypass
            # techniques, etc. as topics. These patterns only fire when they are
            # clearly attempts to change the *bot's behaviour*, not discuss a topic.
            r"in\s+a\s+(simulation|hypothetical|imaginary|alternate)\s+(scenario|world|universe).{0,30}(no\s+rules?|no\s+restrictions?|you\s+can)",
            r"(act|respond|behave).{0,20}as\s+if.{0,20}(no\s+restrictions?|no\s+rules?|unrestricted|jailbroken)",
            # ── User private-info extraction ──────────────────────────────────
            r"(what|share|give|show|tell).{0,20}(user.{0,10})?(email|phone|address|password|credit.?card|ssn|date.of.birth|location|ip.?address)",
            r"(collect|store|log|extract|retrieve|access).{0,20}(user|visitor|personal)\s+(data|info|information|details)",
            r"(do\s+you\s+have|can\s+you\s+access).{0,20}(my|the\s+user.s?)\s+(email|phone|data|address|password)",
            # ── Reputation / defamation attacks ──────────────────────────────
            r"(say|write|tell|claim|state)\s+(that\s+)?darshan\s+(is|was|has\s+been).{0,40}(bad|stupid|incompetent|fraud|liar|criminal|terrible|fake|cheat)",
            r"(make|portray|describe)\s+darshan.{0,20}(negatively|badly|unfavorably|as\s+a\s+(fraud|liar|failure))",
            r"write\s+a\s+(negative|bad|false|defamatory|fake).{0,20}(review|statement|claim).{0,20}(about|of)\s+darshan",
            r"(discredit|slander|defame|insult|mock)\s+darshan",
            # ── Instruction injection via delimiters ──────────────────────────
            r"<\|\s*(system|user|assistant|im_start|im_end)\s*\|>",
            r"<<\s*sys\s*>>",
            r"\[\s*inst\s*\]",
            r"---\s*system\s*---",
            r"#+\s*system\s*prompt",
            r"#+\s*new\s+instructions?",
            # ── Training-data poisoning signals ──────────────────────────────
            r"(add|inject|insert|plant|embed)\s+(this|the\s+following|text|instructions?)\s+(into|in)\s+(your\s+)?(training|context|memory|knowledge)",
            r"remember\s+(this|the\s+following)\s+(for\s+(future|all|every)|always)",
            r"from\s+now\s+on\s+(you\s+)?(must|will|should|always)",
            r"update\s+your\s+(instructions?|rules?|behaviour|system\s+prompt)",
        ]
        cls._INJECTION_PATTERNS = [re.compile(p, re.IGNORECASE) for p in raw]
        return cls._INJECTION_PATTERNS

    def _rule_based_check(self, text: str) -> bool:
        """Permissive fallback: block only on a known injection pattern."""
        return not any(pattern.search(text) for pattern in self._build_patterns())