Spaces:
Sleeping
Sleeping
| """ | |
| Lazy-loading inference wrapper for the GRPO attacker/defender checkpoints. | |
| Models are loaded from Hugging Face on first use and cached in-process. | |
| A status_callback(msg) is fired at each stage so the UI can show live | |
| "Loading attacker model…" / "Loading defender model…" messages. | |
| """ | |
| from __future__ import annotations | |
| import re | |
| import threading | |
| from typing import Callable, Optional | |
| ATTACKER_REPO = "RapidOrc121/Incident-Response-attacker" | |
| DEFENDER_REPO = "RapidOrc121/Incident-response-defender" | |
| ATTACKS = ["PHISH", "BRUTEFORCE", "DRIVEBY", "RANSOM", "SQLI", "RCE", "LPE", "SUPPLYCHAIN"] | |
| DEFENSES = ["MFA", "PATCH", "EDR", "BACKUP", "WAF", "LEASTPRIV", "SBOM", "ROTATEKEYS"] | |
| _ATK_RE = re.compile(r"ATTACK:\s*(PHISH|BRUTEFORCE|DRIVEBY|RANSOM|SQLI|RCE|LPE|SUPPLYCHAIN)\b", re.IGNORECASE) | |
| _DEF_RE = re.compile(r"DEFEND:\s*(MFA|PATCH|EDR|BACKUP|WAF|LEASTPRIV|SBOM|ROTATEKEYS)\b", re.IGNORECASE) | |
| _lock = threading.Lock() | |
| _cache: dict = {} # keys: "attacker", "defender" | |
| _hf_available: Optional[bool] = None | |
| def _check_hf() -> bool: | |
| """Return True if transformers+torch are available for inference.""" | |
| global _hf_available | |
| if _hf_available is not None: | |
| return _hf_available | |
| try: | |
| import torch # noqa: F401 | |
| from transformers import pipeline # noqa: F401 | |
| _hf_available = True | |
| except ImportError: | |
| _hf_available = False | |
| return _hf_available | |
| BASE_MODEL = "unsloth/qwen2.5-0.5b-instruct-unsloth-bnb-4bit" | |
| def _load_model(adapter_repo: str, role: str, cb: Callable[[str], None]): | |
| """ | |
| Load the base Qwen2.5-0.5B (4-bit) model then apply the LoRA adapter. | |
| Both repos are PEFT adapter checkpoints — no standalone config.json. | |
| """ | |
| from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig, pipeline | |
| from peft import PeftModel | |
| import torch | |
| cb(f"Loading base model ({BASE_MODEL})…") | |
| use_gpu = torch.cuda.is_available() | |
| bnb_cfg = BitsAndBytesConfig(load_in_4bit=True) if use_gpu else None | |
| tok = AutoTokenizer.from_pretrained(BASE_MODEL) | |
| base = AutoModelForCausalLM.from_pretrained( | |
| BASE_MODEL, | |
| quantization_config=bnb_cfg, | |
| device_map="auto" if use_gpu else None, | |
| torch_dtype=torch.float16 if use_gpu else torch.float32, | |
| low_cpu_mem_usage=True, | |
| ) | |
| cb(f"Base model loaded. Applying {role} LoRA adapter from {adapter_repo}…") | |
| model = PeftModel.from_pretrained(base, adapter_repo) | |
| model.eval() | |
| cb(f"{role.capitalize()} adapter merged. Building pipeline…") | |
| pipe = pipeline( | |
| "text-generation", | |
| model=model, | |
| tokenizer=tok, | |
| max_new_tokens=48, | |
| do_sample=True, | |
| temperature=0.4, | |
| ) | |
| cb(f"{role.capitalize()} model ready.") | |
| return pipe | |
| def get_model(role: str, cb: Callable[[str], None]): | |
| """Return cached pipeline for role ('attacker'|'defender'), loading if needed.""" | |
| with _lock: | |
| if role not in _cache: | |
| repo = ATTACKER_REPO if role == "attacker" else DEFENDER_REPO | |
| _cache[role] = _load_model(repo, role, cb) | |
| else: | |
| cb(f"{role.capitalize()} model already loaded.") | |
| return _cache[role] | |
| def _parse(text: str, role: str) -> str: | |
| pat = _ATK_RE if role == "attacker" else _DEF_RE | |
| m = pat.search(text or "") | |
| return m.group(1).upper() if m else "" | |
| def generate_action(role: str, prompt: str, cb: Callable[[str], None]) -> str: | |
| """ | |
| Run inference for one role using the GRPO checkpoint. | |
| Returns the raw action string e.g. "ATTACK: PHISH" or "DEFEND: MFA", | |
| or "" if the model output could not be parsed. | |
| """ | |
| if not _check_hf(): | |
| cb("transformers/torch not installed — cannot run model inference.") | |
| return "" | |
| pipe = get_model(role, cb) | |
| cb(f"Running {role} inference…") | |
| output = pipe(prompt, return_full_text=False) | |
| raw = output[0]["generated_text"] if output else "" | |
| action = _parse(raw, role) | |
| if not action: | |
| cb(f"Could not parse {role} output: {raw!r:.80}") | |
| else: | |
| prefix = "ATTACK" if role == "attacker" else "DEFEND" | |
| cb(f"Model chose: {prefix}: {action}") | |
| return f"{'ATTACK' if role == 'attacker' else 'DEFEND'}: {action}" if action else "" | |