File size: 5,627 Bytes
8b3905d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
"""Parser agent: raw logs -> structured SecurityEvent."""

from __future__ import annotations

import json
import re
from datetime import datetime, timedelta, timezone
from typing import Any

from models.schemas import RawLogIngest, SecurityEvent, Severity


_SSH_FAIL = re.compile(
    r"^(?P<ts>\w+\s+\d+\s+\d+:\d+:\d+).+sshd.*Failed password for (?:invalid user )?(?P<user>\S+) from (?P<ip>[\d.]+)"
)
_SSH_OK = re.compile(
    r"^(?P<ts>\w+\s+\d+\s+\d+:\d+:\d+).+sshd.*Accepted (?P<method>\S+) for (?P<user>\S+) from (?P<ip>[\d.]+)"
)
_SUDO = re.compile(
    r"^(?P<ts>\w+\s+\d+\s+\d+:\d+:\d+).+sudo.+USER=(?P<target>\S+).+COMMAND=(?P<cmd>.+)"
)
_NGINX = re.compile(
    r'^(?P<ip>[\d.]+).+\[(?P<date>[^\]]+)\]."(?P<method>\S+)\s+(?P<path>\S+).*" (?P<code>\d+)'
)
_K8S = re.compile(r"type=(?P<type>\S+)\s+reason=(?P<reason>\S+).*name=(?P<name>\S+)")


def _parse_ts_ssh(s: str) -> datetime:
    """Best-effort parse for auth.log style timestamps (current year)."""
    now = datetime.now(timezone.utc)
    try:
        dt = datetime.strptime(f"{now.year} {s}", "%Y %b %d %H:%M:%S")
        return dt.replace(tzinfo=timezone.utc)
    except ValueError:
        return now


def parse_raw(ingest: RawLogIngest) -> SecurityEvent:
    """Parse one raw log line into a structured ``SecurityEvent``.

    Recognizers are tried in order and the first match wins:

    1. JSON object (``ingest.source == "json"`` or the line starts with
       ``{``): fields are copied from the object. ``severity`` falls back to
       INFO when it is missing or not a valid ``Severity`` value.
    2. sshd "Failed password"          -> ``auth.ssh_failed``  (MEDIUM)
    3. sshd "Accepted ..."             -> ``auth.ssh_success`` (LOW)
    4. sudo ``USER=... COMMAND=...``   -> ``privilege.sudo``   (HIGH)
    5. nginx access log                -> ``web.request``      (HIGH for 401/403, else INFO)
    6. ``type=/reason=/name=`` text    -> ``k8s.event``        (MEDIUM)
    7. anything else                   -> ``generic.syslog``   (INFO, message capped at 512 chars)

    Lines that look like JSON but fail to decode, or decode to something
    other than an object, fall through to the text recognizers.

    Args:
        ingest: Raw line, its source tag and optional metadata. The metadata
            may carry:
            - ``host``: for text formats, it takes precedence over the
              per-format default host. For JSON, it is used only when the
              object has no ``host``.
            - ``source_ip``: used for sudo and generic lines, whose text
              carries no address.

    Returns:
        The parsed event. Its ``raw`` always holds the source tag and the
        whitespace-stripped line.
    """
    line = ingest.raw_line.strip()
    meta = ingest.metadata or {}
    # Exactly one event is built per call, so the provenance dict can be shared.
    raw = {"source": ingest.source, "line": line}

    if ingest.source == "json" or line.startswith("{"):
        try:
            data = json.loads(line)
        except json.JSONDecodeError:
            data = None
        # json.loads also yields lists, strings and numbers, which have no
        # .get(); only objects are handled here, anything else falls through.
        if isinstance(data, dict):
            try:
                sev = Severity(str(data.get("severity", "info")).lower())
            except ValueError:
                sev = Severity.INFO
            return SecurityEvent(
                timestamp=_parse_iso(data.get("timestamp")),
                event_type=str(data.get("event_type", "generic")),
                source_ip=data.get("source_ip"),
                host=str(data.get("host", meta.get("host", "unknown"))),
                severity=sev,
                message=str(data.get("message", line)),
                raw=raw,
                normalized=data,
            )

    m = _SSH_FAIL.search(line)
    if m:
        return SecurityEvent(
            timestamp=_parse_ts_ssh(m.group("ts")),
            event_type="auth.ssh_failed",
            source_ip=m.group("ip"),
            host=meta.get("host", "edge-01"),
            severity=Severity.MEDIUM,
            message=f"Failed SSH for {m.group('user')} from {m.group('ip')}",
            raw=raw,
            normalized={"user": m.group("user")},
        )

    m = _SSH_OK.search(line)
    if m:
        return SecurityEvent(
            timestamp=_parse_ts_ssh(m.group("ts")),
            event_type="auth.ssh_success",
            source_ip=m.group("ip"),
            host=meta.get("host", "edge-01"),
            severity=Severity.LOW,
            message=f"Accepted SSH for {m.group('user')} via {m.group('method')}",
            raw=raw,
            normalized={"user": m.group("user"), "method": m.group("method")},
        )

    m = _SUDO.search(line)
    if m:
        return SecurityEvent(
            timestamp=_parse_ts_ssh(m.group("ts")),
            event_type="privilege.sudo",
            # The sudo line itself carries no client address.
            source_ip=meta.get("source_ip"),
            host=meta.get("host", "edge-01"),
            severity=Severity.HIGH,
            message=f"sudo escalation to {m.group('target')}: {m.group('cmd')[:120]}",
            raw=raw,
            normalized={"target_user": m.group("target"), "command": m.group("cmd")},
        )

    m = _NGINX.search(line)
    if m:
        code = int(m.group("code"))
        # Authentication/authorization rejections are treated as notable.
        sev = Severity.HIGH if code in (401, 403) else Severity.INFO
        return SecurityEvent(
            timestamp=_parse_nginx_date(m.group("date")),
            event_type="web.request",
            source_ip=m.group("ip"),
            host=meta.get("host", "web-01"),
            severity=sev,
            message=f"{m.group('method')} {m.group('path')} -> {code}",
            raw=raw,
            normalized={"path": m.group("path"), "status": code},
        )

    m = _K8S.search(line)
    if m:
        return SecurityEvent(
            # The matched text has no timestamp, so ingestion time is used.
            timestamp=datetime.now(timezone.utc),
            event_type="k8s.event",
            source_ip=None,
            host=meta.get("host", "k8s-api"),
            severity=Severity.MEDIUM,
            message=f"K8s {m.group('type')} {m.group('reason')} {m.group('name')}",
            raw=raw,
            normalized={"k8s_type": m.group("type"), "reason": m.group("reason")},
        )

    # Fallback: keep the (truncated) line as the message.
    return SecurityEvent(
        timestamp=datetime.now(timezone.utc),
        event_type="generic.syslog",
        source_ip=meta.get("source_ip"),
        host=meta.get("host", "unknown"),
        severity=Severity.INFO,
        message=line[:512],
        raw=raw,
        normalized={},
    )


def _parse_iso(ts: Any) -> datetime:
    if ts is None:
        return datetime.now(timezone.utc)
    if isinstance(ts, datetime):
        return ts if ts.tzinfo else ts.replace(tzinfo=timezone.utc)
    try:
        return datetime.fromisoformat(str(ts).replace("Z", "+00:00"))
    except ValueError:
        return datetime.now(timezone.utc)


def _parse_nginx_date(s: str) -> datetime:
    try:
        return datetime.strptime(s.split()[0], "%d/%b/%Y:%H:%M:%S").replace(tzinfo=timezone.utc)
    except (ValueError, IndexError):
        return datetime.now(timezone.utc)