Spaces:
Running
Running
| import re | |
| import os | |
| from app.core.logging import get_logger | |
| logger = get_logger(__name__) | |
class GuardClassifier:
    """Scope/safety gate for incoming chat queries.

    Integrates a fine-tuned DistilBERT sequence classifier aiming for
    strictly relevant inputs (>=0.70 confidence threshold). If the
    fine-tuned model path does not exist (or torch/transformers are not
    installed), it falls back to permissive regex heuristics that only
    block known prompt-injection patterns.
    """

    # Compiled lazily on first use and cached on the class --
    # cheaper than recompiling the pattern list per call.
    _INJECTION_PATTERNS: list = []

    def __init__(self, model_path: str = "fine_tuning/guard_classifier/model"):
        """Attempt to load the fine-tuned model; degrade gracefully otherwise.

        Args:
            model_path: Directory containing the saved tokenizer + model.
        """
        self.model_path = model_path
        self._model = None
        self._tokenizer = None
        # Only attempt the (heavy) ML load when the directory exists and
        # actually contains files.
        if os.path.exists(self.model_path) and os.listdir(self.model_path):
            try:
                # Lazy imports -- torch/transformers are not installed in CI
                # or the production API image.
                import torch  # noqa: F811
                from transformers import AutoTokenizer, AutoModelForSequenceClassification
                logger.info("Loading GuardClassifier model from %s", self.model_path)
                self._tokenizer = AutoTokenizer.from_pretrained(self.model_path)
                self._model = AutoModelForSequenceClassification.from_pretrained(self.model_path)
                self._model.eval()
            except ImportError:
                logger.warning("torch/transformers not installed, falling back to rule-based guard.")
                self._model = None
            except Exception as e:
                logger.warning("Failed to load DistilBERT Guard model, falling back to rule-based: %s", e)
                self._model = None
        else:
            logger.info("GuardClassifier model path not found, falling back to rule-based.")

    def is_safe_and_relevant(self, query: str) -> bool:
        """Wrapper to maintain existing pipeline signature."""
        safe, score = self.is_in_scope(query)
        return safe

    def is_in_scope(self, text: str) -> tuple[bool, float]:
        """Classify *text* as in-scope or out-of-scope.

        Returns:
            (is_in_scope, confidence_score).
            Threshold: 0.70 -- below threshold means out of scope.
            The rule-based fallback reports 1.0 (allowed) or 0.0 (blocked).
        """
        if not self._model or not self._tokenizer:
            result = self._rule_based_check(text)
            return (result, 1.0 if result else 0.0)
        try:
            import torch
            inputs = self._tokenizer(text, return_tensors="pt", truncation=True, padding=True, max_length=128)
            with torch.no_grad():
                outputs = self._model(**inputs)
            probs = torch.softmax(outputs.logits, dim=-1)[0]
            # Index 1 is treated as the "in scope" class -- assumes the
            # fine-tuning label order; TODO confirm against training config.
            in_scope_prob = probs[1].item()
            is_in_scope = in_scope_prob >= 0.70
            return (is_in_scope, float(in_scope_prob))
        except Exception as e:
            # Inference failures degrade to the permissive rule set rather
            # than rejecting the query outright.
            logger.warning("Inference error, reverting to rules: %s", e)
            result = self._rule_based_check(text)
            return (result, 1.0 if result else 0.0)

    @classmethod
    def _build_patterns(cls) -> list:
        """Compile and cache all injection-detection regexes.

        Fix: declared as a @classmethod so the compiled patterns are cached
        once on the class. Previously the bare ``cls`` parameter received
        the *instance* (no decorator), so ``cls._INJECTION_PATTERNS = ...``
        created a per-instance attribute and every instance recompiled its
        own copy of the full pattern list.
        """
        if cls._INJECTION_PATTERNS:
            return cls._INJECTION_PATTERNS
        raw = [
            # -- Classic prompt injection ------------------------------------
            r"ignore\s+(all\s+)?(previous|prior|above|earlier)\s+(instructions?|prompts?|rules?|context)",
            r"disregard\s+(all\s+)?(previous|prior|above)\s+(instructions?|prompts?|rules?|context)",
            r"forget\s+(everything|all\s+(previous|prior|your))",
            r"override\s+(your\s+)?(instructions?|rules?|directives?|constraints?)",
            r"bypass\s+your\s+(restrictions?|safety|filters?|rules?|instructions?)",
            r"(do\s+not\s+follow|stop\s+following)\s+(your\s+)?(instructions?|rules?|guidelines?)",
            # -- System prompt extraction ------------------------------------
            r"(repeat|print|output|reveal|show|display|dump|share)\s+(your\s+)?(system\s+)?(prompt|instructions?|rules?|directives?|constraints?|message)",
            r"what\s+(are|were)\s+your\s+(instructions?|rules?|system\s+prompt|directives?)",
            r"(tell|show)\s+me\s+(your\s+)?(system|initial|original|hidden|secret)\s+(prompt|instructions?|message)",
            r"\bsystem\s+message\b",
            # -- Role / persona jailbreaks -----------------------------------
            r"you\s+are\s+now\s+(a\s+|an\s+)?(?!(darshan|assistant))",
            r"(pretend|act|behave)\s+(like|as\s+if)\s+you\s+(are|have\s+no|don.t\s+have)",
            r"(pretend|imagine|assume|suppose)\s+you\s+(are|were)\s+(a\s+|an\s+)?(?!(darshan))",
            r"roleplay\s+as",
            r"(simulate|impersonate)\s+(a\s+|an\s+)?(different|other|unrestricted|evil|jailbroken)",
            r"(act|respond)\s+as\s+if\s+you\s+(have\s+no|don.t\s+have)\s+(restrictions?|rules?|guidelines?|filters?|safety)",
            r"you\s+(have\s+no|don.t\s+have)\s+(restrictions?|rules?|limits?|filters?)",
            r"\bdan\s+(mode|prompt|jailbreak)\b",
            r"developer\s+mode",
            r"jailbreak\b",
            r"unrestricted\s+(mode|access|version|ai)",
            r"no\s+filter(s|ed)?\s+(mode|version|response)",
            # -- Hypothetical / simulation bypass (meta-instruction targeted only)
            # Note: kept narrow on purpose -- Darshan has security/infosec repos
            # and visitors may legitimately ask about prompt injection, exploits,
            # bypass techniques, etc. as topics. These patterns only fire when
            # they are clearly attempts to change the *bot's behaviour*, not
            # discuss a topic.
            r"in\s+a\s+(simulation|hypothetical|imaginary|alternate)\s+(scenario|world|universe).{0,30}(no\s+rules?|no\s+restrictions?|you\s+can)",
            r"(act|respond|behave).{0,20}as\s+if.{0,20}(no\s+restrictions?|no\s+rules?|unrestricted|jailbroken)",
            # -- User private-info extraction --------------------------------
            r"(what|share|give|show|tell).{0,20}(user.{0,10})?(email|phone|address|password|credit.?card|ssn|date.of.birth|location|ip.?address)",
            r"(collect|store|log|extract|retrieve|access).{0,20}(user|visitor|personal)\s+(data|info|information|details)",
            r"(do\s+you\s+have|can\s+you\s+access).{0,20}(my|the\s+user.s?)\s+(email|phone|data|address|password)",
            # -- Reputation / defamation attacks -----------------------------
            r"(say|write|tell|claim|state)\s+(that\s+)?darshan\s+(is|was|has\s+been).{0,40}(bad|stupid|incompetent|fraud|liar|criminal|terrible|fake|cheat)",
            r"(make|portray|describe)\s+darshan.{0,20}(negatively|badly|unfavorably|as\s+a\s+(fraud|liar|failure))",
            r"write\s+a\s+(negative|bad|false|defamatory|fake).{0,20}(review|statement|claim).{0,20}(about|of)\s+darshan",
            r"(discredit|slander|defame|insult|mock)\s+darshan",
            # -- Instruction injection via delimiters ------------------------
            r"<\|\s*(system|user|assistant|im_start|im_end)\s*\|>",
            r"<<\s*sys\s*>>",
            r"\[\s*inst\s*\]",
            r"---\s*system\s*---",
            r"#+\s*system\s*prompt",
            r"#+\s*new\s+instructions?",
            # -- Training-data poisoning signals -----------------------------
            r"(add|inject|insert|plant|embed)\s+(this|the\s+following|text|instructions?)\s+(into|in)\s+(your\s+)?(training|context|memory|knowledge)",
            r"remember\s+(this|the\s+following)\s+(for\s+(future|all|every)|always)",
            r"from\s+now\s+on\s+(you\s+)?(must|will|should|always)",
            r"update\s+your\s+(instructions?|rules?|behaviour|system\s+prompt)",
        ]
        cls._INJECTION_PATTERNS = [re.compile(p, re.IGNORECASE) for p in raw]
        return cls._INJECTION_PATTERNS

    def _rule_based_check(self, text: str) -> bool:
        """Block on any known injection pattern; permissive otherwise."""
        for pattern in self._build_patterns():
            if pattern.search(text):
                return False
        return True