# murshid / murshid_backend /app /ml /summarizer.py
# devorbit's picture
# Initial deployment - secrets removed
# 26e1c2e
"""
LLM summarizer — extracted from MurshidUIPipeline.ipynb (cells 11-12).
Converts sanitized Wazuh XML rule to a one-sentence behavior summary.
Original file is NOT modified.
"""
from __future__ import annotations
import json
import re
import unicodedata
import torch
# --------------------------------------------------------------------------
# Constants (identical to notebook)
# --------------------------------------------------------------------------
# Generation settings consumed by summarize_one_rule.
# Truncation limit (in tokens) applied when the rendered prompt is tokenized.
MAX_INPUT_TOKENS = 2048
# Maximum number of tokens generated per attempt.
MAX_NEW_TOKENS = 160
# Sampling is switched off; NUM_BEAMS is passed to generate() alongside it.
DO_SAMPLE = False
NUM_BEAMS = 4
# Number of generate-and-validate attempts made for one rule.
MAX_RETRIES = 3
# System prompt sent on every attempt. It fixes the output format
# ({"summary":"..."}), the allowed leading verbs and the 7-18 word length.
SYSTEM_INSTR = (
"You are a cybersecurity expert.\n"
"You will be provided with a Wazuh rule in XML format.\n"
"Write EXACTLY ONE sentence describing the observable event pattern the rule matches.\n\n"
"HARD CONSTRAINTS:\n"
'1) Output must be minified JSON only: {"summary":"..."}\n'
"2) ONE sentence only.\n"
"3) Start with one of: Detects, Monitors, Identifies, Flags, Reports, Tracks, Captures.\n"
"4) Use ONLY facts present in the XML. Describe the observable system event only.\n"
"5) Do NOT infer attacker intent, attack type, or technique.\n"
"6) Do NOT mention MITRE, ATT&CK, or attack technique names unless explicitly present in the XML.\n"
"7) Do NOT use speculative language: likely, potentially, possible, possibly, may indicate, or could indicate.\n"
"8) Length: 7 to 18 words.\n"
"9) SHOULD include a clear event type when possible.\n"
"10) Mention at least ONE concrete indicator if available (event_id, process name, file path,\n"
" registry key, service, protocol/port, URL pattern, command, username, IP).\n"
"If only a single indicator exists, still produce a complete behavior-focused sentence.\n"
)
# Extra instructions appended to the system prompt from the second attempt on.
REPAIR_HINT = (
"Your previous output was rejected.\n"
"Fix it to satisfy ALL constraints:\n"
'- Output MUST be minified JSON only: {"summary":"..."}\n'
"- One sentence only.\n"
"- Keep it behavior-focused.\n"
"- Include at least ONE concrete indicator if present in the XML.\n"
"- Do NOT add any extra text outside JSON.\n"
)
# Leading verbs a summary may start with (mirrors constraint 3 of SYSTEM_INSTR).
VERB_OK = ("Detects", "Monitors", "Identifies", "Flags", "Reports", "Tracks", "Captures")
# Non-greedy: matches from a "{" to the nearest following "}", across newlines.
JSON_OBJ_RE = re.compile(r"\{.*?\}", re.DOTALL)
# Leading "This rule" / "This wazuh rule" / "The rule" / "This alert" intros.
BAD_INTRO_RE = re.compile(
r"^\s*(this\s+(wazuh\s+)?rule|the\s+rule|this\s+alert)\b", re.IGNORECASE
)
# Speculative wording that makes a summary unacceptable.
# NOTE(review): constraint 7 of SYSTEM_INSTR also bans "possibly", "may indicate"
# and "could indicate", none of which this pattern matches - confirm whether
# that gap is intentional.
BAD_INTENT_RE = re.compile(r"\b(likely|potentially|possible|maybe)\b", re.IGNORECASE)
# Content-free phrasings that carry no information about the rule.
GENERIC_RE = re.compile(
r"\b(detects activity|detects suspicious activity|detects potentially suspicious activity|"
r"monitors activity|reports activity|detects an event pattern defined by the rule indicators)\b",
re.IGNORECASE,
)
# --------------------------------------------------------------------------
# Helpers (identical to notebook)
# --------------------------------------------------------------------------
def _build_prompt(rule_xml: str, tokenizer, extra_hint: str = "") -> str:
    """Render the chat prompt for one rule.

    The system message is ``SYSTEM_INSTR``, followed by ``extra_hint`` on a new
    line when a hint is given. The user message carries the rule XML. The result
    is whatever ``tokenizer.apply_chat_template`` returns with tokenization off
    and the generation prompt appended.
    """
    system_text = SYSTEM_INSTR
    if extra_hint:
        system_text = system_text + ("\n" + extra_hint)
    user_text = f"Wazuh rule XML:\n{rule_xml}\n\nReturn JSON only:"
    chat = [
        {"role": "system", "content": system_text},
        {"role": "user", "content": user_text},
    ]
    return tokenizer.apply_chat_template(
        chat, tokenize=False, add_generation_prompt=True
    )
def _looks_broken_encoding(s: str) -> bool:
return any(m in s for m in ("Ã", "Ð", "Ñ", "â", "â")) if s else False
def _try_extract_json_summary(text: str) -> str | None:
t = (text or "").strip()
if not t:
return None
if t.startswith("{") and '"summary"' in t:
try:
obj = json.loads(t)
if isinstance(obj, dict) and isinstance(obj.get("summary"), str):
return obj["summary"].strip()
except Exception:
pass
m = JSON_OBJ_RE.search(t)
if m and '"summary"' in m.group(0):
blob = m.group(0)
try:
obj = json.loads(blob)
if isinstance(obj, dict) and isinstance(obj.get("summary"), str):
return obj["summary"].strip()
except Exception:
m2 = re.search(r'"summary"\s*:\s*"([^"]+)"', blob)
if m2:
return m2.group(1).strip()
return None
def _normalize_one_sentence(s: str) -> str:
    """Normalize a raw summary into one sentence opening with an approved verb.

    Steps, in order:
      * collapse whitespace and apply NFKC normalization;
      * strip a leading "This rule" / "The rule" / "This alert" intro
        (``BAD_INTRO_RE``) plus any ``:,-`` punctuation left behind;
      * make the sentence start with one of ``VERB_OK``: an approved verb that
        is already there in another letter case is re-capitalised, otherwise
        "Detects " is prepended and the first character lowercased;
      * cut at the first ``.``, ``!`` or ``?`` followed by whitespace or end of
        string, or append a period when there is none;
      * collapse a repeated leading "Detects".

    Args:
        s: Candidate summary text; ``None`` or empty is accepted.

    Returns:
        The normalized sentence, or "" when nothing usable remains.
    """
    s = re.sub(r"\s+", " ", (s or "").strip()).strip()
    s = unicodedata.normalize("NFKC", s)
    if not s:
        return ""

    if BAD_INTRO_RE.match(s):
        s = BAD_INTRO_RE.sub("", s).lstrip(":,- ").strip()
        if not s:
            return ""

    if not s.startswith(VERB_OK):
        first_word = s.split(" ", 1)[0]
        if first_word.capitalize() in VERB_OK:
            # "This rule detects ..." leaves "detects ..." after the intro is
            # stripped; prefixing would yield "Detects detects ...", so fix the
            # case of the existing verb instead.
            s = first_word.capitalize() + s[len(first_word):]
        elif len(s) > 1:
            s = "Detects " + s[0].lower() + s[1:]
        else:
            # A single stray character is not a usable sentence.
            return ""

    terminator = re.search(r"[.!?](?:\s|$)", s)
    s = s[: terminator.end()].strip() if terminator else s + "."
    s = re.sub(r"^(Detects\s+)+", "Detects ", s).strip()
    return re.sub(r"\s+", " ", s).strip()
def _looks_truncated(s: str) -> bool:
return not s or s.strip().endswith(("(", ":", " -", ","))
def _has_behavior_signal(s: str) -> bool:
kws = ["create","delete","execute","spawn","launch","login","logon","authentication",
"connect","request","query","modify","registry","process","command","file",
"service","ip","url","dns","http","vpn","account"]
return any(k in s.lower() for k in kws)
def _has_indicator_signal(s: str) -> bool:
kws = [".exe",".dll",".ps1",".bat",".cmd","powershell","cmd.exe","reg.exe","rundll32",
"svchost","registry","temp","system32","event_id","http","dns","ip","url","port","key"]
return any(k in s.lower() for k in kws)
def _is_bad(s: str) -> bool:
    """Return True when a cleaned summary fails any acceptance check.

    A summary is rejected when it is empty, opens with a banned intro, uses
    speculative or generic wording, looks mis-encoded or truncated, falls
    outside 7-18 words, has no behavior keyword, or still contains JSON
    wrapper text.
    """
    if not s:
        return True
    if BAD_INTRO_RE.match(s) or BAD_INTENT_RE.search(s) or GENERIC_RE.search(s):
        return True
    if _looks_broken_encoding(s) or _looks_truncated(s):
        return True

    word_count = len(s.split())
    if not 7 <= word_count <= 18:
        return True
    if not _has_behavior_signal(s):
        return True

    # Leftover JSON wrapper means the payload was not unwrapped cleanly.
    starts_like_json = s.startswith("{") and "summary" in s
    embeds_json_key = '"summary"' in s and "{" in s
    return bool(starts_like_json or embeds_json_key)
def _is_catastrophic(s: str) -> bool:
    """Return True when ``s`` is unusable even as a fallback candidate.

    That is: empty, mis-encoded, truncated, or shorter than three words.
    """
    if not s:
        return True
    if _looks_broken_encoding(s) or _looks_truncated(s):
        return True
    return len(s.split()) < 3
def _score(s: str) -> int:
    """Score a candidate summary from 0 to 10; higher is better.

    Points: 3 for a 7-18 word length, 3 for a behavior keyword, 2 for an
    indicator keyword, 1 for avoiding generic phrasing, 1 for avoiding
    speculative wording.
    """
    word_count = len(s.split())
    points = 0
    if 7 <= word_count <= 18:
        points += 3
    if _has_behavior_signal(s):
        points += 3
    if _has_indicator_signal(s):
        points += 2
    if not GENERIC_RE.search(s):
        points += 1
    if not BAD_INTENT_RE.search(s):
        points += 1
    return points
def _rescue_finalize(s: str) -> str:
    """Force a last-resort candidate into an acceptable-looking sentence.

    Used when no attempt produced a usable summary. The text is normalized,
    hedging words are removed, overly short or truncated sentences are padded
    or replaced, an approved leading verb is enforced, and the result is capped
    at 18 words and terminated with a period.

    Args:
        s: Best available text (cleaned summary, extracted summary or raw output).

    Returns:
        A single sentence; "Detects rule-matched behavior." when nothing is left
        after normalization.
    """
    s = _normalize_one_sentence(s)
    if not s:
        return "Detects rule-matched behavior."

    # Drop a trailing hedged clause first, then any remaining hedge words.
    s = re.sub(r",\s*(possibly|potentially|maybe|may)\b.*$", "", s, flags=re.IGNORECASE).strip()
    s = re.sub(r"\b(possibly|potentially|maybe|may)\b", "", s, flags=re.IGNORECASE)
    s = re.sub(r"\s+", " ", s).strip()

    if len(s.split()) < 7:
        low = s.lower()
        for tool in ("powershell", "cmd", "reg", "svchost"):
            # Whole-word match only: a plain substring test turned "registry"
            # into a claim about reg.exe (and "cmdline" into cmd.exe), which
            # invents a fact that is not in the rule.
            if re.search(rf"\b{tool}\b", low):
                s = f"Detects {tool}.exe process execution."
                break
        else:
            # No known tool named: pad the short sentence instead.
            s = s.rstrip(".") + " matching rule indicators."

    if _looks_truncated(s):
        s = s.rstrip(".") + " matching rule indicators."

    if not s.startswith(VERB_OK):
        s = "Detects " + s[0].lower() + s[1:] if len(s) > 1 else "Detects rule-matched behavior."

    words = s.split()
    if len(words) > 18:
        s = " ".join(words[:18]).rstrip(".") + "."

    if not s.endswith("."):
        s = s + "."
    return re.sub(r"\s+", " ", s).strip()
# --------------------------------------------------------------------------
# Public API
# --------------------------------------------------------------------------
def summarize_one_rule(rule_xml: str, model, tokenizer, device: str | None = None) -> str:
    """Generate a one-sentence summary for a sanitized Wazuh rule XML string.

    Up to ``MAX_RETRIES`` generations are made. The first uses the plain system
    prompt; later ones append ``REPAIR_HINT``. The first output that passes
    ``_is_bad`` is returned. Otherwise the highest-scoring non-catastrophic
    candidate is used, and failing that the last available text is pushed
    through ``_rescue_finalize``.

    Args:
        rule_xml: Sanitized rule XML to summarize.
        model: Object exposing ``generate(**inputs, ...)``.
        tokenizer: Tokenizer exposing ``apply_chat_template``, ``__call__``,
            ``decode``, ``pad_token_id`` and ``eos_token_id``.
        device: Device the tokenized inputs are moved to. Defaults to "cuda"
            when ``torch.cuda.is_available()`` and "cpu" otherwise.

    Returns:
        The summary sentence.
    """
    if device is None:
        device = "cuda" if torch.cuda.is_available() else "cpu"

    # Compare against None explicitly: token id 0 is a valid id, and the
    # previous `a or b` form silently discarded it.
    pad_id = tokenizer.pad_token_id
    if pad_id is None:
        pad_id = tokenizer.eos_token_id
    eos_id = tokenizer.eos_token_id
    if eos_id is None:
        eos_id = pad_id

    best: str | None = None      # first candidate that passes every check
    best_any: str | None = None  # best-scoring non-catastrophic candidate
    last_raw = ""
    last_cleaned = ""

    for attempt in range(1, MAX_RETRIES + 1):
        prompt = _build_prompt(
            rule_xml, tokenizer, extra_hint=REPAIR_HINT if attempt >= 2 else ""
        )
        inputs = tokenizer(
            prompt, return_tensors="pt", truncation=True, max_length=MAX_INPUT_TOKENS
        ).to(device)
        with torch.no_grad():
            outputs = model.generate(
                **inputs,
                max_new_tokens=MAX_NEW_TOKENS,
                do_sample=DO_SAMPLE,
                num_beams=NUM_BEAMS,
                pad_token_id=pad_id,
                eos_token_id=eos_id,
                repetition_penalty=1.05,
                no_repeat_ngram_size=3,
            )
        # Decode only the newly generated tokens, not the echoed prompt.
        prompt_len = inputs["input_ids"].shape[1]
        raw = tokenizer.decode(outputs[0][prompt_len:], skip_special_tokens=True).strip()
        last_raw = raw

        parsed = _try_extract_json_summary(raw)
        if parsed is None:
            continue

        cleaned = _normalize_one_sentence(parsed)
        last_cleaned = cleaned
        if cleaned and not _is_catastrophic(cleaned):
            if best_any is None or _score(cleaned) > _score(best_any):
                best_any = cleaned
        if not _is_bad(cleaned):
            best = cleaned
            break

    if best is None:
        if best_any and not _is_catastrophic(best_any):
            best = best_any
        else:
            src = last_cleaned or _try_extract_json_summary(last_raw) or last_raw
            best = _rescue_finalize(src)
    return best