File size: 3,803 Bytes
124ea58
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
"""
Preprocessing pipeline for logs, JSON alerts, and raw text.
Prepares input for narrative generation and entity extraction.
"""

import json
import re
from typing import Any


def parse_input(raw: str) -> dict[str, Any]:
    """
    Parse user input (text or JSON) into a normalized structure.

    The input is stripped of surrounding whitespace and then tried as JSON.
    Anything that cannot be decoded as JSON is treated as plain text / log
    lines instead of raising.

    Args:
        raw: Raw user input; may be JSON, log lines, or free text.

    Returns:
        dict with keys:
            type: "empty", "json" or "text".
            content: the stripped input ("" when empty).
            lines: one string per item. For a JSON list, dict items are
                re-serialized with json.dumps and other items use str();
                for a JSON dict the single entry is the stripped input;
                for text, the non-blank lines with whitespace trimmed.
            parsed: the decoded JSON value (only present when type is "json").
    """
    text = raw.strip()
    if not text:
        return {"type": "empty", "content": "", "lines": []}

    # Try JSON first
    try:
        parsed = json.loads(text)
    except (ValueError, RecursionError):
        # ValueError covers json.JSONDecodeError as well as other decoder
        # failures (e.g. an integer literal longer than the interpreter's
        # int/str conversion limit); RecursionError is raised for
        # excessively nested brackets. In every case the input is simply
        # not usable as JSON, so fall through to plain-text handling.
        pass
    else:
        if isinstance(parsed, list):
            lines = [
                json.dumps(item) if isinstance(item, dict) else str(item)
                for item in parsed
            ]
        elif isinstance(parsed, dict):
            lines = [text]
        else:
            lines = [str(parsed)]
        return {
            "type": "json",
            "content": text,
            "parsed": parsed,
            "lines": lines,
        }

    # Treat as plain text / log lines, dropping blank lines
    lines = [stripped for ln in text.splitlines() if (stripped := ln.strip())]
    return {
        "type": "text",
        "content": text,
        "lines": lines,
    }


# Patterns are applied in this order; extract_iocs() groups its results by
# pattern, not by position in the text.
_IOC_PATTERNS = (
    # IPv4 address, each octet limited to 0-255
    re.compile(
        r"\b(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}"
        r"(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\b"
    ),
    # MD5 (32 hex characters)
    re.compile(r"\b[a-fA-F0-9]{32}\b"),
    # SHA256 (64 hex characters)
    re.compile(r"\b[a-fA-F0-9]{64}\b"),
    # Domains (simple): dot-separated labels ending in an alphabetic TLD
    re.compile(r"\b(?:[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]{2,}\b"),
    # CVE IDs (case-insensitive)
    re.compile(r"CVE-\d{4}-\d{4,}", re.I),
    # MITRE technique IDs (T#### or T####.###). The lookarounds stop the
    # pattern from firing inside longer alphanumeric tokens such as
    # "PORT8080" or a compact timestamp like "20240101T123456Z".
    re.compile(r"(?<![A-Za-z0-9])T\d{4}(?:\.\d{3})?(?![A-Za-z0-9])"),
)


def extract_iocs(text: str) -> list[str]:
    """Extract potential IOCs from free text using regex.

    Recognized indicator types: IPv4 addresses, MD5 and SHA256 hex digests,
    domain names, CVE IDs and MITRE technique IDs. Full URLs are not
    extracted as such; only their host name is picked up by the domain
    pattern. The domain pattern is deliberately simple, so other dotted
    names (for example file names) can match as well.

    Args:
        text: Text to scan.

    Returns:
        Unique matches, grouped by indicator type in the order listed above
        and, within a type, in order of first appearance.
    """
    matches = (
        m.group()
        for pattern in _IOC_PATTERNS
        for m in pattern.finditer(text)
    )
    # dict.fromkeys de-duplicates while keeping first-seen order.
    return list(dict.fromkeys(matches))


def parse_alert(alert: dict[str, Any]) -> str:
    """Render an alert dict as one flat ``key=value | key=value`` string.

    Scalar values become ``key=value`` directly. Dict and list values are
    expanded through flatten_json, using the top-level key as the prefix.
    """
    fragments: list[str] = []
    for field, value in alert.items():
        if not isinstance(value, (dict, list)):
            fragments.append(f"{field}={value}")
            continue
        # Nested container: expand into one entry per leaf value.
        fragments += flatten_json(value, field)
    return " | ".join(fragments)


def preprocess_for_model(raw: str) -> str:
    """
    Full preprocessing pipeline: parse logs/JSON/alerts -> single text string.
    Use as model input.

    Args:
        raw: Raw user input (JSON or plain text).

    Returns:
        "" for empty input. For a JSON object, the flattened alert string
        from parse_alert. For a JSON list, one line per item: dict items go
        through parse_alert, other items through str(). For any other JSON
        value and for plain text, the "content" value from parse_input.
    """
    parsed = parse_input(raw)
    if parsed["type"] == "empty":
        return ""

    if parsed["type"] == "json":
        payload = parsed.get("parsed")
        if isinstance(payload, dict):
            return parse_alert(payload)
        if isinstance(payload, list):
            return "\n".join(
                parse_alert(item) if isinstance(item, dict) else str(item)
                for item in payload
            )

    # Plain text, or a JSON scalar: pass the content through unchanged.
    return parsed["content"]


def flatten_json(obj: Any, prefix: str = "") -> list[str]:
    """Flatten a JSON-like value into ``path=value`` strings.

    Dict keys are joined to the prefix with "." and list positions are
    appended as "[i]". A leaf is rendered as "prefix=value", or just
    str(value) when the prefix is empty. Empty dicts and lists produce no
    entries.
    """
    if isinstance(obj, dict):
        children = (
            (f"{prefix}.{name}" if prefix else name, child)
            for name, child in obj.items()
        )
    elif isinstance(obj, list):
        children = (
            (f"{prefix}[{idx}]" if prefix else f"[{idx}]", child)
            for idx, child in enumerate(obj)
        )
    else:
        # Leaf value: nothing left to descend into.
        return [f"{prefix}={obj}" if prefix else str(obj)]

    return [
        entry
        for path, child in children
        for entry in flatten_json(child, path)
    ]