Spaces:
Running on Zero
Running on Zero
| """ | |
| arabguard/core.py | |
| ================= | |
| Main entry point for the ArabGuard SDK. | |
| Pipeline — strict 3-phase execution | |
| ------------------------------------- | |
| PHASE 1 — NORMALIZATION | |
| │ normalize_and_detect(raw_text, debug=True) | |
| │ → normalized_text, base_score, steps{intent/code/arabic/keyword scores} | |
| │ | |
| PHASE 2 — REGEX (runs on NORMALIZED text only) | |
| │ ArabicRegexSecurityLayer → per-group matching + categorization | |
| │ RegexSecurityLayer → per-group matching + categorization | |
| │ → matched patterns, category labels, regex score bump | |
| │ | |
| PHASE 3 — MARBERT AI (conditional) | |
| │ Activates only when: | |
| │ • 80 ≤ final_score ≤ 120, OR | |
| │ • decision is FLAG or BLOCKED | |
| │ → ai_prediction (0/1), ai_confidence (0.0–1.0) | |
| pipeline_steps schema (forwarded to dashboard) | |
| ----------------------------------------------- | |
| # ─ Phase 1 ────────────────────────────────────────── | |
| "phase_1_normalization": { | |
| "raw_input": str, # original text | |
| "normalized_text": str, # after deobfuscation | |
| "intent_score": int, # sanitize_malicious_code_intent() | |
| "code_score": int, # analyze_code_patterns() | |
| "arabic_kw_score": int, # detect_arabic_injection() | |
| "keyword_score": int, # dangerous keyword scan | |
| "base_score": int, # sum of above (pre-regex) | |
| "pipeline_decision": str, # SAFE|FLAG|BLOCKED from pipeline alone | |
| "transformations": list, # which transforms fired (base64, hex, …) | |
| } | |
| # ─ Phase 2 ────────────────────────────────────────── | |
| "phase_2_regex": { | |
| "ran_on": str, # "normalized_text" | |
| "arabic": { | |
| "fired": bool, | |
| "category": str, # e.g. "ignore_instructions" | |
| "match_count": int, | |
| "matched_patterns":list, # up to 3 truncated pattern strings | |
| }, | |
| "english": { | |
| "fired": bool, | |
| "category": str, | |
| "match_count": int, | |
| "matched_patterns":list, | |
| }, | |
| "regex_score_bump": int, # score added by regex hits | |
| "score_after_regex": int, | |
| "decision_after_regex":str, | |
| } | |
| # ─ Phase 3 ────────────────────────────────────────── | |
| "phase_3_ai": { | |
| "activated": bool, | |
| "reason": str, # why AI was / was not activated | |
| "prediction": int|None, # 0=safe, 1=malicious | |
| "confidence": float|None, # 0.0–1.0 | |
| "label": str|None, # "MALICIOUS"|"SAFE"|None | |
| "score_contribution": int, # score bump from AI (if any) | |
| "decision_after_ai": str, | |
| } | |
| # ─ Final ──────────────────────────────────────────── | |
| "final_score": int, | |
| "final_decision": str, | |
| """ | |
| from __future__ import annotations | |
| import logging | |
| import re | |
| import warnings | |
| from dataclasses import dataclass, field | |
| from typing import Any, Dict, List, Optional, Tuple | |
| from .pipeline import normalize_and_detect | |
| from .security_layers import ( | |
| ArabicRegexSecurityLayer, | |
| RegexSecurityLayer, | |
| CombinedSecurityLayer, | |
| ) | |
logger = logging.getLogger("arabguard.core")

# ── AI dependency check ──────────────────────────────────────────────────────
# torch and transformers are optional. Each is probed separately so the
# warning names the package that is actually missing. The public names
# (torch, AutoTokenizer, AutoModelForSequenceClassification) are always bound,
# to None when the import fails, so later references never raise NameError.
_TRANSFORMERS_AVAILABLE = False
_TORCH_AVAILABLE = False
AutoTokenizer = None  # type: ignore[assignment]
AutoModelForSequenceClassification = None  # type: ignore[assignment]
torch = None  # type: ignore[assignment]

try:
    import torch as _torch
except ImportError as _e:
    logger.warning(
        "torch not found (%s) \u2014 AI layer will be disabled. "
        "Install: pip install torch", _e,
    )
else:
    _TORCH_AVAILABLE = True
    torch = _torch
    logger.debug("torch %s imported", _torch.__version__)

try:
    from transformers import (
        AutoTokenizer as _AT,
        AutoModelForSequenceClassification as _AM,
    )
except ImportError as _e:
    logger.warning(
        "transformers not found (%s) \u2014 AI layer will be disabled. "
        "Install: pip install transformers scipy", _e,
    )
else:
    AutoTokenizer = _AT  # type: ignore[assignment]
    AutoModelForSequenceClassification = _AM  # type: ignore[assignment]
    _TRANSFORMERS_AVAILABLE = True
    logger.debug("transformers imported")

# The AI layer needs both packages; one alone is not enough.
AI_DEPS_AVAILABLE: bool = _TRANSFORMERS_AVAILABLE and _TORCH_AVAILABLE
| # ───────────────────────────────────────────────────────────────────────────── | |
| # PATTERN → CATEGORY MAP (for readable dashboard labels) | |
| # ───────────────────────────────────────────────────────────────────────────── | |
| # Map each security_layers group attribute → human-readable category label | |
| _ARABIC_GROUP_LABELS: Dict[str, str] = { | |
| "basic_ignore_patterns": "Ignore / Cancel Instructions", | |
| "arabic_role_change_patterns": "Role Change / Hijack", | |
| "arabic_system_access_patterns": "System Access / Prompt Leak", | |
| "arabic_jailbreak_patterns": "Jailbreak Trigger", | |
| "arabic_sensitive_info_patterns":"Sensitive Information Request", | |
| "arabic_adversarial_patterns": "Adversarial Manipulation", | |
| "arabic_force_answer_patterns": "Force-Answer Attempt", | |
| } | |
| _ENGLISH_GROUP_LABELS: Dict[str, str] = { | |
| "ignore_patterns": "Ignore / Override Instructions", | |
| "role_change_patterns": "Role Change / Hijack", | |
| "system_access_patterns": "System Access", | |
| "prompt_leaking_patterns": "Prompt Leak", | |
| "jailbreak_patterns": "Jailbreak Trigger", | |
| "context_manipulation": "Context Manipulation", | |
| "sensitive_info_patterns": "Sensitive Information", | |
| "adversarial_patterns": "Adversarial Manipulation", | |
| "stealthy_patterns": "Stealthy Injection", | |
| "exfiltration_patterns":"Data Exfiltration", | |
| "multi_turn_patterns": "Multi-Turn Attack", | |
| "obfuscation_patterns": "Obfuscation", | |
| "encoding_patterns": "Encoding Attack", | |
| } | |
| def _categorize_match( | |
| pattern: str, | |
| layer_instance: Any, | |
| group_labels: Dict[str, str], | |
| ) -> str: | |
| """ | |
| Walk the layer's named pattern groups to find which group contains | |
| ``pattern``, then return the human-readable category label. | |
| Falls back to "Unknown Pattern" if not found. | |
| """ | |
| for attr, label in group_labels.items(): | |
| group = getattr(layer_instance, attr, []) | |
| if pattern in group: | |
| return label | |
| return "Unknown Pattern" | |
| def _truncate_pattern(p: str, maxlen: int = 60) -> str: | |
| """Truncate a raw regex string for safe dashboard display.""" | |
| if len(p) <= maxlen: | |
| return p | |
| return p[:maxlen] + "β¦" | |
| def _detect_transformations(raw: str, normalized: str) -> List[str]: | |
| """ | |
| Compare raw vs normalized text and report which transforms were applied. | |
| Used to populate pipeline_steps.phase_1_normalization.transformations. | |
| """ | |
| transforms: List[str] = [] | |
| # Base64 decode | |
| if re.search(r"[A-Za-z0-9+/=]{12,}", raw): | |
| if normalized != raw: | |
| transforms.append("base64_decode") | |
| # Hex decode | |
| if re.search(r"\b[0-9a-fA-F]{8,}\b", raw): | |
| transforms.append("hex_decode") | |
| # Unicode normalization (NFKC) | |
| import unicodedata | |
| if unicodedata.normalize("NFKC", raw) != raw: | |
| transforms.append("unicode_nfkc") | |
| # HTML entities | |
| import html as _html | |
| if _html.unescape(raw) != raw: | |
| transforms.append("html_unescape") | |
| # Split-letter merging (heuristic: single chars separated by spaces) | |
| if re.search(r"(?:\b[A-Za-z]\b\s+){3,}", raw): | |
| transforms.append("split_letter_merge") | |
| # Excessive char repetition | |
| if re.search(r"(.)\1{3,}", raw): | |
| transforms.append("repetition_collapse") | |
| # Arabic normalization (different alef forms etc.) | |
| arabic_variants = re.compile(r"[Ψ’Ψ£Ψ₯Ω±]") | |
| if arabic_variants.search(raw): | |
| transforms.append("arabic_normalize") | |
| return transforms if transforms else ["none"] | |
| # ───────────────────────────────────────────────────────────────────────────── | |
| # GUARD RESULT DATACLASS | |
| # ───────────────────────────────────────────────────────────────────────────── | |
@dataclass
class GuardResult:
    """
    Full analysis result returned by :meth:`ArabGuard.analyze`.

    decision              "SAFE" | "FLAG" | "BLOCKED"
    score                 0–300
    is_blocked            decision == "BLOCKED"
    is_flagged            decision in {"FLAG", "BLOCKED"}
    normalized_text       text after full deobfuscation pipeline
    matched_pattern       first regex match, or None
    all_matched_patterns  all matched regex strings
    pipeline_steps        rich per-phase breakdown (see module docstring)
    reason                human-readable explanation
    ai_confidence         MARBERT confidence 0.0–1.0, None if AI not used
    ai_prediction         0=safe, 1=malicious, None if AI not used

    Truthiness: an instance is truthy when the text was NOT flagged, so
    ``if result:`` reads as "if safe".
    """

    decision: str
    score: int
    is_blocked: bool
    is_flagged: bool
    normalized_text: str
    matched_pattern: Optional[str] = None
    # Mutable defaults go through default_factory so instances never share them.
    all_matched_patterns: List[str] = field(default_factory=list)
    pipeline_steps: Dict[str, Any] = field(default_factory=dict)
    reason: str = ""
    ai_confidence: Optional[float] = None
    ai_prediction: Optional[int] = None

    def __bool__(self) -> bool:
        """Return True when the analysed text was neither flagged nor blocked."""
        return not self.is_flagged

    def to_dict(self) -> Dict[str, Any]:
        """Return the result as a plain dict; containers are not copied."""
        return {
            "decision": self.decision,
            "score": self.score,
            "is_blocked": self.is_blocked,
            "is_flagged": self.is_flagged,
            "normalized_text": self.normalized_text,
            "matched_pattern": self.matched_pattern,
            "all_matched_patterns": self.all_matched_patterns,
            "pipeline_steps": self.pipeline_steps,
            "reason": self.reason,
            "ai_confidence": self.ai_confidence,
            "ai_prediction": self.ai_prediction,
        }
| # ───────────────────────────────────────────────────────────────────────────── | |
| # MAIN CLASS | |
| # ───────────────────────────────────────────────────────────────────────────── | |
class ArabGuard:
    """
    Multi-layer Arabic/English prompt-injection and jailbreak detector.

    Detection pipeline — 3 strict phases
    -------------------------------------
    Phase 1  Normalization
        Deobfuscates the raw text, runs keyword / intent / code scoring.
        Produces: normalized_text, base_score, preliminary decision.
    Phase 2  Regex (on normalized text)
        Runs Arabic and English regex layers on the NORMALIZED text.
        Per-group categorization is stored in pipeline_steps.
        Produces: matched patterns, regex score bump, updated decision.
    Phase 3  MARBERT AI (conditional)
        Activates only when: 80 ≤ score ≤ 120 OR decision is FLAG/BLOCKED.
        Produces: ai_prediction, ai_confidence, final decision.

    Parameters
    ----------
    use_ai : bool
        Enable MARBERT AI layer. Default ``True``.
        Falls back to ``False`` gracefully if deps are missing or the model
        cannot be loaded.
    ai_model_name : str
        HuggingFace model id. Default ``"d12o6aa/ArabGuard"``.
    block_on_flag : bool
        Treat FLAG as BLOCKED (strict mode). Default ``False``.
    custom_score_threshold : Optional[int]
        Override for the BLOCKED threshold. It is applied to the Phase 1
        base score only.
    device : Optional[str]
        ``"cpu"`` | ``"cuda"`` | ``"mps"`` | ``None`` (auto-detect).
    """

    def __init__(
        self,
        use_ai: bool = True,
        ai_model_name: str = "d12o6aa/ArabGuard",
        block_on_flag: bool = False,
        custom_score_threshold: Optional[int] = None,
        device: Optional[str] = None,
    ):
        self.block_on_flag = block_on_flag
        self.custom_score_threshold = custom_score_threshold
        self.ai_model_name = ai_model_name

        # Regex layers
        self._arabic = ArabicRegexSecurityLayer()
        self._english = RegexSecurityLayer()
        self._combined = CombinedSecurityLayer()

        # AI model state — always defined even when disabled
        self._tokenizer: Any = None
        self._model: Any = None
        self._device: Optional[str] = None

        if use_ai and not AI_DEPS_AVAILABLE:
            warnings.warn(
                "ArabGuard: use_ai=True but transformers/torch are not installed. "
                "AI layer disabled. "
                f"(transformers={_TRANSFORMERS_AVAILABLE}, torch={_TORCH_AVAILABLE}) "
                "Fix: pip install 'arabguard[ai]'",
                RuntimeWarning,
                stacklevel=2,
            )
            self.use_ai = False
        else:
            self.use_ai = use_ai

        if self.use_ai:
            self._load_ai_model(device)

    # ── AI model setup ───────────────────────────────────────────────────────

    def _load_ai_model(self, device: Optional[str] = None) -> None:
        """
        Load the MARBERT classifier from Hugging Face Hub.

        When ``device`` is None it is chosen in the order cuda, mps, cpu.
        Any failure is reported as a RuntimeWarning and disables the AI layer
        (``use_ai`` becomes False and the model state is reset) instead of
        raising, so regex + pipeline analysis keeps working.
        """
        try:
            if device is None:
                if torch.cuda.is_available():
                    device = "cuda"
                elif hasattr(torch.backends, "mps") and torch.backends.mps.is_available():
                    device = "mps"
                else:
                    device = "cpu"
            self._device = device
            logger.info(
                "Loading AI model '%s' → device='%s' …",
                self.ai_model_name, self._device,
            )
            self._tokenizer = AutoTokenizer.from_pretrained(
                self.ai_model_name, use_fast=True,
            )
            self._model = AutoModelForSequenceClassification.from_pretrained(
                self.ai_model_name,
            )
            self._model.to(self._device)
            self._model.eval()
            logger.info(
                "AI model ready — device=%s params=%s",
                self._device,
                f"{sum(p.numel() for p in self._model.parameters()):,}",
            )
        except Exception as exc:
            # Deliberately broad: whatever goes wrong while loading must
            # degrade to "AI off", never crash construction.
            warnings.warn(
                f"ArabGuard: failed to load model '{self.ai_model_name}': {exc}. "
                "AI layer disabled — regex+pipeline will still run.",
                RuntimeWarning,
                stacklevel=3,  # attribute the warning to the caller of ArabGuard(...)
            )
            logger.error("AI model load failed: %s", exc, exc_info=True)
            self.use_ai = False
            self._tokenizer = None
            self._model = None
            self._device = None

    # ── AI inference ─────────────────────────────────────────────────────────

    def _ai_predict(self, text: str) -> Tuple[int, float]:
        """
        Run MARBERT inference on ``text``.

        Returns (prediction, confidence)
            prediction : 0 = safe, 1 = malicious
            confidence : softmax probability of the predicted class

        Returns ``(0, 0.0)`` when the AI layer is disabled, the model is not
        loaded, or inference raises. The failure is reported as a
        RuntimeWarning rather than propagated. ``analyze`` treats a
        confidence of 0.0 as "no verdict".
        """
        if not self.use_ai or self._model is None:
            return 0, 0.0
        try:
            inputs = self._tokenizer(
                text,
                return_tensors="pt",
                truncation=True,
                max_length=512,
                padding=True,
            )
            inputs = {k: v.to(self._device) for k, v in inputs.items()}
            with torch.no_grad():
                logits = self._model(**inputs).logits
            probs = torch.softmax(logits, dim=-1)
            prediction = int(torch.argmax(probs, dim=-1).item())
            confidence = float(probs[0, prediction].item())
            logger.debug(
                "_ai_predict pred=%d conf=%.3f text=%r",
                prediction, confidence, text[:60],
            )
            return prediction, confidence
        except Exception as exc:
            warnings.warn(
                f"ArabGuard: AI inference failed: {exc}. Defaulting to safe.",
                RuntimeWarning,
                stacklevel=2,
            )
            logger.warning("AI inference error: %s", exc)
            return 0, 0.0

    # ── Public API ───────────────────────────────────────────────────────────

    def check(self, text: str) -> bool:
        """Fast boolean: True = safe, False = blocked/flagged."""
        return not self.analyze(text).is_flagged

    def analyze(self, text: str) -> GuardResult:
        """
        Full 3-phase analysis.

        Returns a GuardResult whose ``pipeline_steps`` dict contains one
        nested section per phase, suitable for dashboard display.
        Non-string input is converted with ``str()`` first.
        """
        if not isinstance(text, str):
            text = str(text)

        # ── PHASE 1 — NORMALIZATION ─────────────────────────────────────────
        normalized, base_score, p1_decision, raw_steps = normalize_and_detect(
            text, debug=True
        )

        # Apply custom score threshold before regex: it both promotes to
        # BLOCKED at or above the threshold and demotes a pipeline BLOCKED
        # that falls below it.
        if self.custom_score_threshold is not None:
            if base_score >= self.custom_score_threshold:
                p1_decision = "BLOCKED"
            elif p1_decision == "BLOCKED":
                p1_decision = "FLAG"

        phase1: Dict[str, Any] = {
            "raw_input": text,
            "normalized_text": normalized,
            "intent_score": raw_steps.get("intent_score", 0),
            "code_score": raw_steps.get("code_score", 0),
            "arabic_kw_score": raw_steps.get("arabic_score", 0),
            "keyword_score": raw_steps.get("keyword_score", 0),
            "base_score": base_score,
            "pipeline_decision": p1_decision,
            "transformations": _detect_transformations(text, normalized),
        }
        score = base_score
        decision = p1_decision

        # ── PHASE 2 — REGEX (on normalized text only) ───────────────────────
        ar_all_matches, ar_first, ar_summary = self._scan_layer(
            self._arabic, _ARABIC_GROUP_LABELS, normalized
        )
        en_all_matches, en_first, en_summary = self._scan_layer(
            self._english, _ENGLISH_GROUP_LABELS, normalized
        )

        # Order-preserving de-duplication across both layers.
        all_matched: List[str] = list(dict.fromkeys(ar_all_matches + en_all_matches))
        first_match: Optional[str] = ar_first or en_first

        # Any regex hit escalates to BLOCKED with a score of at least 130,
        # unless Phase 1 already blocked. This is the net effect of the former
        # SAFE→FLAG(85)→BLOCKED(130) staging, with the same total bump.
        regex_score_bump = 0
        if first_match and decision != "BLOCKED":
            regex_score_bump = max(0, 130 - score)
            score = max(score, 130)
            decision = "BLOCKED"

        phase2: Dict[str, Any] = {
            "ran_on": "normalized_text",
            "arabic": ar_summary,
            "english": en_summary,
            "regex_score_bump": regex_score_bump,
            "score_after_regex": score,
            "decision_after_regex": decision,
        }

        # ── PHASE 3 — MARBERT AI (conditional) ──────────────────────────────
        # Activation: 80 ≤ score ≤ 120, OR decision is FLAG / BLOCKED.
        ai_prediction: Optional[int] = None
        ai_confidence: Optional[float] = None
        ai_score_bump = 0
        in_borderline = 80 <= score <= 120
        needs_confirm = decision in {"FLAG", "BLOCKED"}
        should_use_ai = self.use_ai and (in_borderline or needs_confirm)

        if should_use_ai:
            activation_reason = (
                f"score={score} in [80,120]" if in_borderline
                else f"decision={decision} requires confirmation"
            )
        elif not self.use_ai:
            # use_ai is also False when the caller disabled it or the model
            # failed to load, not only when packages are missing.
            activation_reason = (
                "AI disabled (use_ai=False, or dependencies/model unavailable)"
            )
        else:
            activation_reason = (
                f"score={score} outside [80,120] and decision={decision} — skipped"
            )

        if should_use_ai:
            ai_prediction, ai_confidence = self._ai_predict(normalized)
            if ai_confidence is None or ai_confidence <= 0.0:
                # (0, 0.0) is _ai_predict's failure fallback; a softmax winner
                # is never 0.0. Treat it as "no verdict": a broken model must
                # not downgrade a FLAG to SAFE, nor be shown as a SAFE label.
                ai_prediction = None
                ai_confidence = None
                activation_reason += " — inference unavailable, no AI verdict applied"
            elif ai_prediction == 1:
                if ai_confidence >= 0.75:
                    prev_score = score
                    score = max(score, 130)
                    ai_score_bump = score - prev_score
                    decision = "BLOCKED"
                    logger.info(
                        "AI → BLOCKED conf=%.3f score=%d text=%r",
                        ai_confidence, score, text[:60],
                    )
                elif ai_confidence >= 0.55:
                    # Never weaken an existing FLAG/BLOCKED; only lift SAFE.
                    if decision == "SAFE":
                        decision = "FLAG"
                    prev_score = score
                    score = max(score, 85)
                    ai_score_bump = score - prev_score
            elif decision == "FLAG" and ai_confidence < 0.35:
                # AI says safe → may downgrade FLAG (never BLOCKED).
                # NOTE(review): ai_confidence is the probability of the
                # predicted class, so with a 2-class head it is always >= 0.5
                # and this branch cannot fire for a genuine prediction.
                # Confirm whether the intended test was P(malicious) < 0.35.
                decision = "SAFE"
                score = min(score, 60)
                logger.debug("AI downgraded FLAG → SAFE conf=%.3f", ai_confidence)

        phase3: Dict[str, Any] = {
            "activated": should_use_ai,
            "reason": activation_reason,
            "prediction": ai_prediction,
            "confidence": round(ai_confidence, 4) if ai_confidence is not None else None,
            "label": (
                "MALICIOUS" if ai_prediction == 1
                else "SAFE" if ai_prediction == 0
                else None
            ),
            "score_contribution": ai_score_bump,
            "decision_after_ai": decision,
        }

        # ── BLOCK-ON-FLAG + FINALIZE ────────────────────────────────────────
        if self.block_on_flag and decision == "FLAG":
            decision = "BLOCKED"
        final_score = min(score, 300)

        pipeline_steps: Dict[str, Any] = {
            "phase_1_normalization": phase1,
            "phase_2_regex": phase2,
            "phase_3_ai": phase3,
            "final_score": final_score,
            "final_decision": decision,
        }

        reason = self._build_reason(
            decision, final_score,
            first_match, phase1,
            phase2, phase3,
        )
        logger.debug(
            "analyze() → %s score=%d ai_conf=%s",
            decision, final_score,
            f"{ai_confidence:.3f}" if ai_confidence is not None else "N/A",
        )

        return GuardResult(
            decision=decision,
            score=final_score,
            is_blocked=decision == "BLOCKED",
            is_flagged=decision in {"FLAG", "BLOCKED"},
            normalized_text=normalized,
            matched_pattern=first_match,
            all_matched_patterns=all_matched,
            pipeline_steps=pipeline_steps,
            reason=reason,
            ai_confidence=ai_confidence,
            ai_prediction=ai_prediction,
        )

    def batch_check(self, texts: List[str]) -> List[bool]:
        """Check a list of texts. Returns True for each safe text."""
        return [self.check(t) for t in texts]

    def batch_analyze(self, texts: List[str]) -> List[GuardResult]:
        """Analyze a list of texts. Returns one GuardResult per input."""
        return [self.analyze(t) for t in texts]

    # ── Internal helpers ─────────────────────────────────────────────────────

    @staticmethod
    def _scan_layer(
        layer: Any,
        group_labels: Dict[str, str],
        text: str,
    ) -> Tuple[List[str], Optional[str], Dict[str, Any]]:
        """
        Run one regex layer on ``text`` and summarise its hits.

        Returns ``(all_matches, first_match, summary)`` where ``summary`` is
        the per-language dict stored under ``phase_2_regex`` (fired, category,
        match_count, and up to 3 truncated pattern strings).
        """
        all_matches: List[str] = layer.get_all_matches(text)
        first: Optional[str] = layer.get_matched_pattern(text)
        summary: Dict[str, Any] = {
            "fired": bool(first),
            # An em dash is the placeholder when nothing matched.
            "category": (
                _categorize_match(first, layer, group_labels) if first else "—"
            ),
            "match_count": len(all_matches),
            "matched_patterns": [_truncate_pattern(p) for p in all_matches[:3]],
        }
        return all_matches, first, summary

    @staticmethod
    def _build_reason(
        decision: str,
        score: int,
        match: Optional[str],
        phase1: Dict[str, Any],
        phase2: Dict[str, Any],
        phase3: Dict[str, Any],
    ) -> str:
        """
        Compose a human-readable explanation from all three phases.

        For SAFE the text is a single sentence, extended when the AI layer
        ran and also labelled the input SAFE. Otherwise it lists, in order,
        the Phase 1 score contributions, the Phase 2 regex categories and
        first match, and the Phase 3 AI label, joined by single spaces.
        """
        if decision == "SAFE":
            base = f"No threats detected (score={score}/300)."
            if phase3.get("activated") and phase3.get("label") == "SAFE":
                base += f" AI confirms safe (confidence={phase3['confidence']:.2f})."
            return base

        parts: List[str] = [f"Decision: {decision} | Score: {score}/300."]

        # Phase 1 contributions
        if phase1.get("intent_score", 0) > 0:
            parts.append(f"[P1] Malicious code intent (+{phase1['intent_score']}).")
        if phase1.get("arabic_kw_score", 0) > 0:
            parts.append(f"[P1] Arabic injection keyword (+{phase1['arabic_kw_score']}).")
        if phase1.get("code_score", 0) > 0:
            parts.append(f"[P1] Suspicious code pattern (+{phase1['code_score']}).")
        if phase1.get("keyword_score", 0) > 0:
            parts.append(f"[P1] Dangerous keywords (+{phase1['keyword_score']}).")

        # Phase 2 contributions
        ar = phase2.get("arabic", {})
        en = phase2.get("english", {})
        if ar.get("fired"):
            parts.append(f"[P2-AR] {ar['category']} ({ar['match_count']} pattern(s) matched).")
        if en.get("fired"):
            parts.append(f"[P2-EN] {en['category']} ({en['match_count']} pattern(s) matched).")
        if match:
            short = _truncate_pattern(match, 70)
            parts.append(f"[P2] First match: {short}")

        # Phase 3 contribution
        if phase3.get("activated") and phase3.get("label"):
            conf = phase3.get("confidence") or 0.0
            label = phase3["label"]
            parts.append(f"[P3-AI] {label} (confidence={conf:.2f}).")

        return " ".join(parts)

    def __repr__(self) -> str:
        ai = f"enabled on {self._device}" if self.use_ai else "disabled"
        return (
            f"ArabGuard(use_ai={ai}, "
            f"block_on_flag={self.block_on_flag}, "
            f"model={self.ai_model_name!r})"
        )