""" LLM summarizer — extracted from MurshidUIPipeline.ipynb (cells 11-12). Converts sanitized Wazuh XML rule to a one-sentence behavior summary. Original file is NOT modified. """ from __future__ import annotations import json import re import unicodedata import torch # -------------------------------------------------------------------------- # Constants (identical to notebook) # -------------------------------------------------------------------------- MAX_INPUT_TOKENS = 2048 MAX_NEW_TOKENS = 160 DO_SAMPLE = False NUM_BEAMS = 4 MAX_RETRIES = 3 SYSTEM_INSTR = ( "You are a cybersecurity expert.\n" "You will be provided with a Wazuh rule in XML format.\n" "Write EXACTLY ONE sentence describing the observable event pattern the rule matches.\n\n" "HARD CONSTRAINTS:\n" '1) Output must be minified JSON only: {"summary":"..."}\n' "2) ONE sentence only.\n" "3) Start with one of: Detects, Monitors, Identifies, Flags, Reports, Tracks, Captures.\n" "4) Use ONLY facts present in the XML. Describe the observable system event only.\n" "5) Do NOT infer attacker intent, attack type, or technique.\n" "6) Do NOT mention MITRE, ATT&CK, or attack technique names unless explicitly present in the XML.\n" "7) Do NOT use speculative language: likely, potentially, possible, possibly, may indicate, or could indicate.\n" "8) Length: 7 to 18 words.\n" "9) SHOULD include a clear event type when possible.\n" "10) Mention at least ONE concrete indicator if available (event_id, process name, file path,\n" " registry key, service, protocol/port, URL pattern, command, username, IP).\n" "If only a single indicator exists, still produce a complete behavior-focused sentence.\n" ) REPAIR_HINT = ( "Your previous output was rejected.\n" "Fix it to satisfy ALL constraints:\n" '- Output MUST be minified JSON only: {"summary":"..."}\n' "- One sentence only.\n" "- Keep it behavior-focused.\n" "- Include at least ONE concrete indicator if present in the XML.\n" "- Do NOT add any extra text outside JSON.\n" ) 
# Verbs a summary is allowed to start with (constraint 3 of SYSTEM_INSTR).
VERB_OK = ("Detects", "Monitors", "Identifies", "Flags", "Reports", "Tracks", "Captures")

# Shortest brace-delimited span; used only as a fallback for malformed JSON.
JSON_OBJ_RE = re.compile(r"\{.*?\}", re.DOTALL)
# Meta introductions ("This rule ...") that get stripped from a summary.
BAD_INTRO_RE = re.compile(
    r"^\s*(this\s+(wazuh\s+)?rule|the\s+rule|this\s+alert)\b", re.IGNORECASE
)
# Speculative wording.  Covers every term banned by constraint 7 of
# SYSTEM_INSTR, including "possibly" and "may/could indicate".
BAD_INTENT_RE = re.compile(
    r"\b(likely|potentially|possible|possibly|maybe|(?:may|could)\s+indicate)\b",
    re.IGNORECASE,
)
# Content-free phrasings that say nothing about the matched event.
GENERIC_RE = re.compile(
    r"\b(detects activity|detects suspicious activity|detects potentially suspicious activity|"
    r"monitors activity|reports activity|detects an event pattern defined by the rule indicators)\b",
    re.IGNORECASE,
)

# Last-resort extraction of the "summary" value from a JSON-like blob.
_SUMMARY_FIELD_RE = re.compile(r'"summary"\s*:\s*"([^"]+)"')

# Characters typical of UTF-8 text decoded with a single-byte codec:
# U+00C3 (A tilde), U+00D0 (Eth), U+00D1 (N tilde), U+00E2 (a circumflex).
# NOTE(review): the original list repeated the U+00E2 entry; presumably two
# longer mojibake sequences lost their trailing bytes -- TODO confirm.
_BROKEN_ENCODING_MARKERS = ("\u00c3", "\u00d0", "\u00d1", "\u00e2")

# Substrings (not whole words) hinting that a sentence describes a behavior.
# NOTE: substring matching is deliberately loose; e.g. "ip" also matches
# inside "script".
_BEHAVIOR_KEYWORDS = (
    "create", "delete", "execute", "spawn", "launch", "login", "logon",
    "authentication", "connect", "request", "query", "modify", "registry",
    "process", "command", "file", "service", "ip", "url", "dns", "http",
    "vpn", "account",
)

# Substrings hinting that a sentence names a concrete indicator.
_INDICATOR_KEYWORDS = (
    ".exe", ".dll", ".ps1", ".bat", ".cmd", "powershell", "cmd.exe", "reg.exe",
    "rundll32", "svchost", "registry", "temp", "system32", "event_id", "http",
    "dns", "ip", "url", "port", "key",
)

# Canned sentences used by _rescue_finalize for very short summaries.  The
# short tool names are matched as whole words so that "registry" or "cmdlet"
# are not rewritten into a reg.exe / cmd.exe execution claim.
_RESCUE_TEMPLATES = (
    (re.compile(r"powershell"), "Detects powershell.exe process execution."),
    (re.compile(r"\bcmd(?:\.exe)?\b"), "Detects cmd.exe process execution."),
    (re.compile(r"\breg(?:\.exe)?\b"), "Detects reg.exe process execution."),
    (re.compile(r"svchost"), "Detects svchost.exe process execution."),
)

_FALLBACK_SUMMARY = "Detects rule-matched behavior."


# --------------------------------------------------------------------------
# Helpers (ported from the notebook)
# --------------------------------------------------------------------------
def _build_prompt(rule_xml: str, tokenizer, extra_hint: str = "") -> str:
    """Render the chat prompt for one rule.

    ``extra_hint`` (e.g. REPAIR_HINT) is appended to the system instruction
    after a newline when non-empty.  The prompt is returned as text, with the
    generation prompt added by the tokenizer's chat template.
    """
    system_text = SYSTEM_INSTR + (("\n" + extra_hint) if extra_hint else "")
    user_text = f"Wazuh rule XML:\n{rule_xml}\n\nReturn JSON only:"
    messages = [
        {"role": "system", "content": system_text},
        {"role": "user", "content": user_text},
    ]
    return tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)


def _looks_broken_encoding(s: str) -> bool:
    """Return True if ``s`` contains a character typical of mojibake."""
    return bool(s) and any(marker in s for marker in _BROKEN_ENCODING_MARKERS)


def _summary_from_obj(obj: object) -> str | None:
    """Return the stripped ``summary`` string of a decoded JSON dict, if any."""
    if isinstance(obj, dict) and isinstance(obj.get("summary"), str):
        return obj["summary"].strip()
    return None


def _try_extract_json_summary(text: str) -> str | None:
    """Extract the ``summary`` string from raw model output.

    Tries, in order:
      1. decoding a JSON object starting at each ``{`` in the text (this
         tolerates prose around the object and braces inside the summary);
      2. a regex on the first brace-delimited span, for malformed JSON.

    Returns None when the text is empty, never mentions ``"summary"``, or no
    string value can be recovered.
    """
    t = (text or "").strip()
    if not t or '"summary"' not in t:
        return None

    decoder = json.JSONDecoder()
    start = t.find("{")
    while start != -1:
        try:
            obj, _end = decoder.raw_decode(t, start)
        except (ValueError, RecursionError):
            # Not a JSON object at this brace (JSONDecodeError is a
            # ValueError); keep scanning.
            pass
        else:
            found = _summary_from_obj(obj)
            if found is not None:
                return found
        start = t.find("{", start + 1)

    blob_match = JSON_OBJ_RE.search(t)
    if blob_match and '"summary"' in blob_match.group(0):
        field = _SUMMARY_FIELD_RE.search(blob_match.group(0))
        if field:
            return field.group(1).strip()
    return None


def _decapitalize(s: str) -> str:
    """Lower-case the first letter unless the first word has inner capitals.

    Keeps acronyms and mixed-case names ("SSH", "PowerShell") intact while
    still turning "Failed logon" into "failed logon".
    """
    head = s.partition(" ")[0]
    if any(ch.isupper() for ch in head[1:]):
        return s
    return s[0].lower() + s[1:]


def _with_leading_verb(s: str) -> str:
    """Make ``s`` start with an allowed verb.

    * Already starts with one of VERB_OK: returned unchanged.
    * First word is an allowed verb in the wrong case ("detects"): the verb
      is re-cased instead of prepending a second one.
    * Otherwise "Detects " is prepended; strings of at most one character
      yield "" so the caller can apply its own fallback.
    """
    if s.startswith(VERB_OK):
        return s
    head, _sep, tail = s.partition(" ")
    for verb in VERB_OK:
        if head.lower() == verb.lower():
            return f"{verb} {tail}" if tail else verb
    if len(s) <= 1:
        return ""
    return "Detects " + _decapitalize(s)


def _normalize_one_sentence(s: str) -> str:
    """Clean a raw summary into a single sentence starting with an allowed verb.

    Collapses whitespace, applies NFKC normalization, strips a leading
    "This rule ..." style introduction, ensures a leading verb, keeps only the
    first sentence (or appends "." when there is no terminator) and collapses
    a repeated leading "Detects".  Returns "" when nothing usable remains.
    """
    s = re.sub(r"\s+", " ", (s or "").strip()).strip()
    s = unicodedata.normalize("NFKC", s)
    if not s:
        return ""

    if BAD_INTRO_RE.match(s):
        s = BAD_INTRO_RE.sub("", s).lstrip(":,- ").strip()
        if not s:
            return ""

    s = _with_leading_verb(s)
    if not s:
        return ""

    # A sentence ends at the first ., ! or ? followed by whitespace or the end
    # of the text, so dots inside names such as "cmd.exe" do not cut it short.
    terminator = re.search(r"[.!?](?:\s|$)", s)
    s = s[: terminator.end()].strip() if terminator else s + "."
    s = re.sub(r"^(Detects\s+)+", "Detects ", s).strip()
    return re.sub(r"\s+", " ", s).strip()


def _looks_truncated(s: str) -> bool:
    """Return True for empty text or text ending in a dangling separator."""
    return not s or s.strip().endswith(("(", ":", " -", ","))


def _has_behavior_signal(s: str) -> bool:
    """Return True if ``s`` contains any behavior keyword (substring match)."""
    lowered = s.lower()
    return any(keyword in lowered for keyword in _BEHAVIOR_KEYWORDS)


def _has_indicator_signal(s: str) -> bool:
    """Return True if ``s`` contains any indicator keyword (substring match)."""
    lowered = s.lower()
    return any(keyword in lowered for keyword in _INDICATOR_KEYWORDS)


def _is_bad(s: str) -> bool:
    """Return True if a cleaned summary should be rejected and retried.

    Rejects empty text, meta introductions, speculative or generic wording,
    mojibake, truncated text, word counts outside 7..18, text without a
    behavior keyword, and text that still looks like the JSON wrapper.
    """
    if not s or BAD_INTRO_RE.match(s) or BAD_INTENT_RE.search(s) or GENERIC_RE.search(s):
        return True
    if _looks_broken_encoding(s) or _looks_truncated(s):
        return True
    word_count = len(s.split())
    if word_count < 7 or word_count > 18 or not _has_behavior_signal(s):
        return True
    looks_like_json = (s.startswith("{") and "summary" in s) or ('"summary"' in s and "{" in s)
    return bool(looks_like_json)


def _is_catastrophic(s: str) -> bool:
    """Return True if ``s`` is unusable even as a best-effort fallback."""
    return not s or _looks_broken_encoding(s) or _looks_truncated(s) or len(s.split()) < 3


def _score(s: str) -> int:
    """Score a summary from 0 to 10; higher is better.

    3 points for a 7..18 word count, 3 for a behavior keyword, 2 for an
    indicator keyword, 1 for not being generic, 1 for not being speculative.
    """
    word_count = len(s.split())
    return (
        (3 if 7 <= word_count <= 18 else 0)
        + (3 if _has_behavior_signal(s) else 0)
        + (2 if _has_indicator_signal(s) else 0)
        + (1 if not GENERIC_RE.search(s) else 0)
        + (1 if not BAD_INTENT_RE.search(s) else 0)
    )


def _rescue_finalize(s: str) -> str:
    """Turn arbitrary text into an acceptable-looking summary sentence.

    Used when no attempt produced a usable summary.  Normalizes the text,
    removes speculative words, pads summaries shorter than 7 words (with a
    canned sentence when a known tool name is present, otherwise with
    " matching rule indicators."), ensures a leading verb, caps the length at
    18 words and guarantees a trailing period.  Never returns an empty string.
    """
    s = _normalize_one_sentence(s)
    if not s:
        return _FALLBACK_SUMMARY

    # Drop a trailing speculative clause first, then any remaining hedge word.
    s = re.sub(r",\s*(possibly|potentially|maybe|may)\b.*$", "", s, flags=re.IGNORECASE).strip()
    s = re.sub(r"\b(possibly|potentially|maybe|may)\b", "", s, flags=re.IGNORECASE)
    s = re.sub(r"\s+", " ", s).strip()

    if len(s.split()) < 7:
        lowered = s.lower()
        for pattern, replacement in _RESCUE_TEMPLATES:
            if pattern.search(lowered):
                s = replacement
                break
        else:
            s = s.rstrip(".") + " matching rule indicators."

    if _looks_truncated(s):
        s = s.rstrip(".") + " matching rule indicators."

    s = _with_leading_verb(s) or _FALLBACK_SUMMARY

    words = s.split()
    if len(words) > 18:
        s = " ".join(words[:18]).rstrip(".") + "."
    return re.sub(r"\s+", " ", s if s.endswith(".") else s + ".").strip()


# --------------------------------------------------------------------------
# Public API
# --------------------------------------------------------------------------
def summarize_one_rule(rule_xml: str, model, tokenizer, device: str | None = None) -> str:
    """Generate a one-sentence summary for a sanitized Wazuh rule XML string.

    Runs up to MAX_RETRIES generations.  The first attempt uses the plain
    system prompt; later attempts append REPAIR_HINT.  The first summary that
    passes ``_is_bad`` is returned.  Otherwise the best-scoring non-catastrophic
    summary is returned, and if there is none the last output is passed
    through ``_rescue_finalize``.

    Args:
        rule_xml: Rule XML text inserted into the user message.
        model: Object exposing ``generate(**inputs, ...)``.
        tokenizer: Object exposing ``apply_chat_template``, ``__call__``,
            ``decode``, ``pad_token_id`` and ``eos_token_id``.
        device: Device the encoded inputs are moved to.  Defaults to "cuda"
            when available, else "cpu".

    Returns:
        A non-empty summary sentence.
    """
    if device is None:
        device = "cuda" if torch.cuda.is_available() else "cpu"

    # Compare against None explicitly: 0 is a valid token id and must not be
    # replaced by the other token's id.
    pad_id = tokenizer.pad_token_id
    if pad_id is None:
        pad_id = tokenizer.eos_token_id
    eos_id = tokenizer.eos_token_id
    if eos_id is None:
        eos_id = pad_id

    best: str | None = None
    best_any: str | None = None
    best_any_score = -1
    last_raw = ""
    last_cleaned = ""

    # NOTE(review): with DO_SAMPLE False, attempts 2 and 3 send the same
    # prompt and presumably yield the same output -- confirm whether the third
    # attempt is useful.
    for attempt in range(1, MAX_RETRIES + 1):
        prompt = _build_prompt(
            rule_xml, tokenizer, extra_hint=REPAIR_HINT if attempt >= 2 else ""
        )
        # NOTE(review): truncation may cut the end of an over-long prompt,
        # including the generation prompt, depending on the tokenizer's
        # truncation side -- TODO confirm.
        inputs = tokenizer(
            prompt, return_tensors="pt", truncation=True, max_length=MAX_INPUT_TOKENS
        ).to(device)
        prompt_len = inputs["input_ids"].shape[1]

        with torch.no_grad():
            outputs = model.generate(
                **inputs,
                max_new_tokens=MAX_NEW_TOKENS,
                do_sample=DO_SAMPLE,
                num_beams=NUM_BEAMS,
                pad_token_id=pad_id,
                eos_token_id=eos_id,
                repetition_penalty=1.05,
                no_repeat_ngram_size=3,
            )

        # Decode only the newly generated tokens, not the echoed prompt.
        raw = tokenizer.decode(outputs[0][prompt_len:], skip_special_tokens=True).strip()
        last_raw = raw

        parsed = _try_extract_json_summary(raw)
        if parsed is None:
            continue

        cleaned = _normalize_one_sentence(parsed)
        last_cleaned = cleaned
        if not cleaned or _is_catastrophic(cleaned):
            continue

        score = _score(cleaned)
        if best_any is None or score > best_any_score:
            best_any, best_any_score = cleaned, score

        if not _is_bad(cleaned):
            best = cleaned
            break

    if best is not None:
        return best
    if best_any:
        # Only non-catastrophic candidates are ever stored in best_any.
        return best_any
    src = last_cleaned or _try_extract_json_summary(last_raw) or last_raw
    return _rescue_finalize(src)