| """ |
| LLM summarizer — extracted from MurshidUIPipeline.ipynb (cells 11-12). |
| Converts sanitized Wazuh XML rule to a one-sentence behavior summary. |
| Original file is NOT modified. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import json |
| import re |
| import unicodedata |
|
|
| import torch |
|
|
| |
| |
| |
# Generation settings consumed by ``summarize_one_rule``.
MAX_INPUT_TOKENS: int = 2048  # tokenizer ``max_length`` used when truncating the prompt
MAX_NEW_TOKENS: int = 160  # ``max_new_tokens`` handed to ``model.generate``
DO_SAMPLE: bool = False  # ``do_sample`` flag handed to ``model.generate``
NUM_BEAMS: int = 4  # ``num_beams`` handed to ``model.generate``
MAX_RETRIES: int = 3  # total number of generation attempts per rule
|
|
# System message sent on every attempt. The pieces are joined without a
# separator, so each piece carries its own trailing newline(s).
SYSTEM_INSTR: str = "".join(
    [
        "You are a cybersecurity expert.\n",
        "You will be provided with a Wazuh rule in XML format.\n",
        "Write EXACTLY ONE sentence describing the observable event pattern the rule matches.\n\n",
        "HARD CONSTRAINTS:\n",
        '1) Output must be minified JSON only: {"summary":"..."}\n',
        "2) ONE sentence only.\n",
        "3) Start with one of: Detects, Monitors, Identifies, Flags, Reports, Tracks, Captures.\n",
        "4) Use ONLY facts present in the XML. Describe the observable system event only.\n",
        "5) Do NOT infer attacker intent, attack type, or technique.\n",
        "6) Do NOT mention MITRE, ATT&CK, or attack technique names unless explicitly present in the XML.\n",
        "7) Do NOT use speculative language: likely, potentially, possible, possibly, may indicate, or could indicate.\n",
        "8) Length: 7 to 18 words.\n",
        "9) SHOULD include a clear event type when possible.\n",
        "10) Mention at least ONE concrete indicator if available (event_id, process name, file path,\n",
        " registry key, service, protocol/port, URL pattern, command, username, IP).\n",
        "If only a single indicator exists, still produce a complete behavior-focused sentence.\n",
    ]
)
|
|
# Extra instructions appended to the system message on the second and later
# attempts (see ``summarize_one_rule``).
REPAIR_HINT: str = "".join(
    [
        "Your previous output was rejected.\n",
        "Fix it to satisfy ALL constraints:\n",
        '- Output MUST be minified JSON only: {"summary":"..."}\n',
        "- One sentence only.\n",
        "- Keep it behavior-focused.\n",
        "- Include at least ONE concrete indicator if present in the XML.\n",
        "- Do NOT add any extra text outside JSON.\n",
    ]
)
|
|
# Verbs a finished summary may start with (same list as constraint 3 of the
# system prompt).
VERB_OK = ("Detects", "Monitors", "Identifies", "Flags", "Reports", "Tracks", "Captures")

# Shortest "{...}" span in a piece of text; DOTALL lets it cross newlines.
JSON_OBJ_RE = re.compile(r"\{.*?\}", re.DOTALL)

# Meta-introductions ("This rule ...", "The rule ...", "This alert ...") at
# the very start of a sentence.
BAD_INTRO_RE = re.compile(
    r"^\s*(this\s+(wazuh\s+)?rule|the\s+rule|this\s+alert)\b", re.IGNORECASE
)

# Speculative wording. Covers every phrase banned by constraint 7 of the
# system prompt: the earlier pattern omitted "possibly" (which
# ``\bpossible\b`` does not match), "may indicate" and "could indicate", so
# summaries using them passed validation.
BAD_INTENT_RE = re.compile(
    r"\b(likely|potentially|possible|possibly|maybe|(?:may|could)\s+indicate)\b",
    re.IGNORECASE,
)

# Content-free stock phrases that say nothing about the matched event.
GENERIC_RE = re.compile(
    r"\b(detects activity|detects suspicious activity|detects potentially suspicious activity|"
    r"monitors activity|reports activity|detects an event pattern defined by the rule indicators)\b",
    re.IGNORECASE,
)
|
|
|
|
| |
| |
| |
|
|
def _build_prompt(rule_xml: str, tokenizer, extra_hint: str = "") -> str:
    """Render the chat prompt for one generation attempt.

    The system message is ``SYSTEM_INSTR``; when *extra_hint* is non-empty it
    is appended after a newline. The user message embeds *rule_xml*. The
    return value is whatever ``tokenizer.apply_chat_template`` produces with
    ``tokenize=False`` and ``add_generation_prompt=True``.
    """
    system_text = SYSTEM_INSTR
    if extra_hint:
        system_text = system_text + ("\n" + extra_hint)
    user_text = f"Wazuh rule XML:\n{rule_xml}\n\nReturn JSON only:"
    chat = [
        {"role": "system", "content": system_text},
        {"role": "user", "content": user_text},
    ]
    return tokenizer.apply_chat_template(chat, tokenize=False, add_generation_prompt=True)
|
|
|
|
| def _looks_broken_encoding(s: str) -> bool: |
| return any(m in s for m in ("Ã", "Ð", "Ñ", "â", "â")) if s else False |
|
|
|
|
| def _try_extract_json_summary(text: str) -> str | None: |
| t = (text or "").strip() |
| if not t: |
| return None |
| if t.startswith("{") and '"summary"' in t: |
| try: |
| obj = json.loads(t) |
| if isinstance(obj, dict) and isinstance(obj.get("summary"), str): |
| return obj["summary"].strip() |
| except Exception: |
| pass |
| m = JSON_OBJ_RE.search(t) |
| if m and '"summary"' in m.group(0): |
| blob = m.group(0) |
| try: |
| obj = json.loads(blob) |
| if isinstance(obj, dict) and isinstance(obj.get("summary"), str): |
| return obj["summary"].strip() |
| except Exception: |
| m2 = re.search(r'"summary"\s*:\s*"([^"]+)"', blob) |
| if m2: |
| return m2.group(1).strip() |
| return None |
|
|
|
|
| def _normalize_one_sentence(s: str) -> str: |
| s = re.sub(r"\s+", " ", (s or "").strip()).strip() |
| s = unicodedata.normalize("NFKC", s) |
| if not s: |
| return "" |
| if BAD_INTRO_RE.match(s): |
| s = BAD_INTRO_RE.sub("", s).lstrip(":,- ").strip() |
| if not s: |
| return "" |
| if not any(s.startswith(v) for v in VERB_OK): |
| s = "Detects " + (s[0].lower() + s[1:]) if len(s) > 1 else "" |
| if not s: |
| return "" |
| m = re.search(r"[.!?](?:\s|$)", s) |
| s = s[: m.end()].strip() if m else s + "." |
| s = re.sub(r"^(Detects\s+)+", "Detects ", s).strip() |
| return re.sub(r"\s+", " ", s).strip() |
|
|
|
|
| def _looks_truncated(s: str) -> bool: |
| return not s or s.strip().endswith(("(", ":", " -", ",")) |
|
|
|
|
| def _has_behavior_signal(s: str) -> bool: |
| kws = ["create","delete","execute","spawn","launch","login","logon","authentication", |
| "connect","request","query","modify","registry","process","command","file", |
| "service","ip","url","dns","http","vpn","account"] |
| return any(k in s.lower() for k in kws) |
|
|
|
|
| def _has_indicator_signal(s: str) -> bool: |
| kws = [".exe",".dll",".ps1",".bat",".cmd","powershell","cmd.exe","reg.exe","rundll32", |
| "svchost","registry","temp","system32","event_id","http","dns","ip","url","port","key"] |
| return any(k in s.lower() for k in kws) |
|
|
|
|
| def _is_bad(s: str) -> bool: |
| if not s or BAD_INTRO_RE.match(s) or BAD_INTENT_RE.search(s) or GENERIC_RE.search(s): |
| return True |
| if _looks_broken_encoding(s) or _looks_truncated(s): |
| return True |
| wc = len(s.split()) |
| if wc < 7 or wc > 18 or not _has_behavior_signal(s): |
| return True |
| return bool((s.startswith("{") and "summary" in s) or ('"summary"' in s and "{" in s)) |
|
|
|
|
| def _is_catastrophic(s: str) -> bool: |
| return not s or _looks_broken_encoding(s) or _looks_truncated(s) or len(s.split()) < 3 |
|
|
|
|
def _score(s: str) -> int:
    """Rate a candidate summary from 0 to 10; higher is better.

    Points: 3 for a word count of 7-18, 3 for a behaviour keyword, 2 for a
    concrete-indicator keyword, 1 for no generic stock phrase, 1 for no
    speculative wording.
    """
    points = 0
    if 7 <= len(s.split()) <= 18:
        points += 3
    if _has_behavior_signal(s):
        points += 3
    if _has_indicator_signal(s):
        points += 2
    if not GENERIC_RE.search(s):
        points += 1
    if not BAD_INTENT_RE.search(s):
        points += 1
    return points
|
|
|
|
def _rescue_finalize(s: str) -> str:
    """Turn whatever text is left into an acceptable-looking summary as a last resort.

    The text is normalised with ``_normalize_one_sentence``; speculative words
    (possibly / potentially / maybe / may) are removed, including a trailing
    clause that starts with one of them after a comma. A result shorter than
    7 words is either replaced by a stock "<tool>.exe process execution"
    sentence when a known tool name is present, or padded with
    " matching rule indicators.". The result is forced to start with an
    allowed verb, capped at 18 words and terminated with a period.

    Returns "Detects rule-matched behavior." when normalisation leaves nothing.
    """
    s = _normalize_one_sentence(s)
    if not s:
        return "Detects rule-matched behavior."
    s = re.sub(r",\s*(possibly|potentially|maybe|may)\b.*$", "", s, flags=re.IGNORECASE).strip()
    s = re.sub(r"\b(possibly|potentially|maybe|may)\b", "", s, flags=re.IGNORECASE)
    s = re.sub(r"\s+", " ", s).strip()
    if len(s.split()) < 7:
        low = s.lower()
        for tool in ("powershell", "cmd", "reg", "svchost"):
            # Match the tool as a whole token (optionally with ".exe"). A bare
            # substring test made "registry" or "regular" trigger the reg.exe
            # sentence, inventing a process the text never mentioned.
            if re.search(rf"\b{tool}(?:\.exe)?\b", low):
                s = f"Detects {tool}.exe process execution."
                break
        else:
            # No known tool name: pad the short sentence instead.
            s = s.rstrip(".") + " matching rule indicators."
    if _looks_truncated(s):
        s = s.rstrip(".") + " matching rule indicators."
    if not any(s.startswith(v) for v in VERB_OK):
        s = "Detects " + s[0].lower() + s[1:] if len(s) > 1 else "Detects rule-matched behavior."
    words = s.split()
    if len(words) > 18:
        s = " ".join(words[:18]).rstrip(".") + "."
    return re.sub(r"\s+", " ", s if s.endswith(".") else s + ".").strip()
|
|
|
|
| |
| |
| |
|
|
def summarize_one_rule(rule_xml: str, model, tokenizer, device: str | None = None) -> str:
    """Generate a one-sentence summary for a sanitized Wazuh rule XML string.

    Up to ``MAX_RETRIES`` generation attempts are made; from the second
    attempt on, ``REPAIR_HINT`` is appended to the system message. Each raw
    output is parsed for a JSON ``summary`` and normalised. The first
    candidate that passes ``_is_bad`` is returned. Otherwise the
    highest-scoring non-catastrophic candidate is returned, and failing that
    the last output is pushed through ``_rescue_finalize``.

    Args:
        rule_xml: The rule XML embedded in the user message.
        model: Object exposing ``generate(**inputs, ...)``.
        tokenizer: Object exposing ``apply_chat_template``, ``__call__``,
            ``decode``, ``pad_token_id`` and ``eos_token_id``.
        device: Device the tokenized inputs are moved to. Defaults to
            ``"cuda"`` when available, else ``"cpu"``.

    Returns:
        The chosen summary sentence.
    """
    if device is None:
        device = "cuda" if torch.cuda.is_available() else "cpu"

    # Compare against None explicitly: token id 0 is a legitimate id and must
    # not be discarded by a truthiness test.
    pad_id = tokenizer.pad_token_id
    if pad_id is None:
        pad_id = tokenizer.eos_token_id
    eos_id = tokenizer.eos_token_id
    if eos_id is None:
        eos_id = pad_id

    best: str | None = None
    best_any: str | None = None
    last_raw = ""
    last_cleaned = ""

    # NOTE(review): attempts 2 and 3 build the same prompt; with sampling
    # disabled they presumably decode the same text — confirm the third
    # attempt is worth its cost.
    for attempt in range(1, MAX_RETRIES + 1):
        prompt = _build_prompt(
            rule_xml, tokenizer, extra_hint=REPAIR_HINT if attempt >= 2 else ""
        )
        inputs = tokenizer(
            prompt, return_tensors="pt", truncation=True, max_length=MAX_INPUT_TOKENS
        ).to(device)

        with torch.no_grad():
            outputs = model.generate(
                **inputs,
                max_new_tokens=MAX_NEW_TOKENS,
                do_sample=DO_SAMPLE,
                num_beams=NUM_BEAMS,
                pad_token_id=pad_id,
                eos_token_id=eos_id,
                repetition_penalty=1.05,
                no_repeat_ngram_size=3,
            )

        # Decode only the tokens after the prompt.
        prompt_len = inputs["input_ids"].shape[1]
        raw = tokenizer.decode(outputs[0][prompt_len:], skip_special_tokens=True).strip()
        last_raw = raw

        parsed = _try_extract_json_summary(raw)
        if parsed is None:
            continue

        cleaned = _normalize_one_sentence(parsed)
        last_cleaned = cleaned

        # Remember the best imperfect-but-usable candidate as a fallback.
        if cleaned and not _is_catastrophic(cleaned):
            if best_any is None or _score(cleaned) > _score(best_any):
                best_any = cleaned

        if not _is_bad(cleaned):
            best = cleaned
            break

    if best is None:
        if best_any:
            # best_any is only ever set to a non-empty, non-catastrophic value.
            best = best_any
        else:
            src = last_cleaned or _try_extract_json_summary(last_raw) or last_raw
            best = _rescue_finalize(src)

    return best
|
|