Spaces:
Running
Running
| """Parser agent: raw logs -> structured SecurityEvent.""" | |
| from __future__ import annotations | |
| import json | |
| import re | |
| from datetime import datetime, timezone | |
| from typing import Any | |
| from models.schemas import RawLogIngest, SecurityEvent, Severity | |
# Shared pattern fragments.
#
# _SYSLOG_TS captures the leading "Mon DD HH:MM:SS" stamp of auth.log-style lines.
# _IP_ADDR matches either an IPv6-looking token (hex digits and colons, with at
# least one colon; a dotted IPv4 tail is allowed) or the original dotted-digit
# IPv4 form. The IPv6 alternative is listed first because the IPv4 alternative
# would otherwise stop after the first hextet of a v6 address (e.g. "2001").
_SYSLOG_TS = r"^(?P<ts>\w+\s+\d+\s+\d+:\d+:\d+)"
_IP_ADDR = r"(?:[0-9A-Fa-f:]*:[0-9A-Fa-f:.]+|[\d.]+)"

# sshd: "Failed password for [invalid user ]<user> from <ip>"
_SSH_FAIL = re.compile(
    _SYSLOG_TS
    + r".+sshd.*Failed password for (?:invalid user )?(?P<user>\S+) from (?P<ip>"
    + _IP_ADDR
    + r")"
)

# sshd: "Accepted <method> for <user> from <ip>"
_SSH_OK = re.compile(
    _SYSLOG_TS
    + r".+sshd.*Accepted (?P<method>\S+) for (?P<user>\S+) from (?P<ip>"
    + _IP_ADDR
    + r")"
)

# sudo: captures the USER= target and everything after COMMAND=
_SUDO = re.compile(
    _SYSLOG_TS + r".+sudo.+USER=(?P<target>\S+).+COMMAND=(?P<cmd>.+)"
)

# nginx access line: <ip> ... [<date>] "<method> <path> ..." <status>
_NGINX = re.compile(
    r"^(?P<ip>"
    + _IP_ADDR
    + r")"
    + r'.+\[(?P<date>[^\]]+)\]."(?P<method>\S+)\s+(?P<path>\S+).*" (?P<code>\d+)'
)

# Kubernetes event text containing type=..., reason=... and name=... tokens
_K8S = re.compile(r"type=(?P<type>\S+)\s+reason=(?P<reason>\S+).*name=(?P<name>\S+)")
def _parse_ts_ssh(s: str) -> datetime:
    """Best-effort parse for auth.log style timestamps, which carry no year.

    The stamp ("Mon DD HH:MM:SS") is interpreted as UTC. The current year is
    tried first; if that would place the event more than one day in the future
    (e.g. a December line parsed in January) or is not a valid date (Feb 29),
    the previous year is tried instead.

    Args:
        s: Timestamp text such as ``"Jan  5 10:00:00"``.

    Returns:
        A timezone-aware UTC datetime. Falls back to the current time when the
        text cannot be parsed under either candidate year.
    """
    now = datetime.now(timezone.utc)
    max_future_seconds = 86400  # tolerate modest clock skew between hosts

    for year in (now.year, now.year - 1):
        try:
            parsed = datetime.strptime(f"{year} {s}", "%Y %b %d %H:%M:%S")
        except ValueError:
            # Unparseable text, or a day that does not exist in this year.
            continue
        parsed = parsed.replace(tzinfo=timezone.utc)
        if (parsed - now).total_seconds() <= max_future_seconds:
            return parsed
    return now
def parse_raw(ingest: RawLogIngest) -> SecurityEvent:
    """Turn one raw log line into a structured ``SecurityEvent``.

    Formats are tried in a fixed order and the first match wins:
    JSON object, failed SSH login, accepted SSH login, sudo, nginx access
    line, Kubernetes event text. Anything else becomes ``generic.syslog``.

    Args:
        ingest: Carries ``raw_line``, ``source`` and optional ``metadata``.
            ``metadata`` may be empty/None; ``host`` and ``source_ip`` are
            read from it as fallbacks.

    Returns:
        A ``SecurityEvent``; the stripped line and source are kept under ``raw``.
    """
    line = ingest.raw_line.strip()
    meta = ingest.metadata or {}
    # Only one event is ever built per call, so a single dict can be shared
    # by whichever branch returns.
    raw = {"source": ingest.source, "line": line}

    if ingest.source == "json" or line.startswith("{"):
        try:
            data = json.loads(line)
        except json.JSONDecodeError:
            data = None
        # Valid JSON that is not an object (list, number, string, null) has no
        # fields to read; treat it like plain text instead of calling .get on it.
        if isinstance(data, dict):
            sev_raw = str(data.get("severity", "info")).lower()
            try:
                sev = Severity(sev_raw)
            except ValueError:
                sev = Severity.INFO
            return SecurityEvent(
                timestamp=_parse_iso(data.get("timestamp")),
                event_type=str(data.get("event_type", "generic")),
                source_ip=data.get("source_ip"),
                host=str(data.get("host", meta.get("host", "unknown"))),
                severity=sev,
                message=str(data.get("message", line)),
                raw=raw,
                normalized=data,
            )

    m = _SSH_FAIL.search(line)
    if m:
        return SecurityEvent(
            timestamp=_parse_ts_ssh(m.group("ts")),
            event_type="auth.ssh_failed",
            source_ip=m.group("ip"),
            host=meta.get("host", "edge-01"),
            severity=Severity.MEDIUM,
            message=f"Failed SSH for {m.group('user')} from {m.group('ip')}",
            raw=raw,
            normalized={"user": m.group("user")},
        )

    m = _SSH_OK.search(line)
    if m:
        return SecurityEvent(
            timestamp=_parse_ts_ssh(m.group("ts")),
            event_type="auth.ssh_success",
            source_ip=m.group("ip"),
            host=meta.get("host", "edge-01"),
            severity=Severity.LOW,
            message=f"Accepted SSH for {m.group('user')} via {m.group('method')}",
            raw=raw,
            normalized={"user": m.group("user"), "method": m.group("method")},
        )

    m = _SUDO.search(line)
    if m:
        return SecurityEvent(
            timestamp=_parse_ts_ssh(m.group("ts")),
            event_type="privilege.sudo",
            # sudo lines carry no address; use the caller-supplied one if any.
            source_ip=meta.get("source_ip"),
            host=meta.get("host", "edge-01"),
            severity=Severity.HIGH,
            # The message is truncated; the full command stays in `normalized`.
            message=f"sudo escalation to {m.group('target')}: {m.group('cmd')[:120]}",
            raw=raw,
            normalized={"target_user": m.group("target"), "command": m.group("cmd")},
        )

    m = _NGINX.search(line)
    if m:
        code = int(m.group("code"))
        # Only 401/403 responses are raised above INFO.
        sev = Severity.HIGH if code in (401, 403) else Severity.INFO
        return SecurityEvent(
            timestamp=_parse_nginx_date(m.group("date")),
            event_type="web.request",
            source_ip=m.group("ip"),
            host=meta.get("host", "web-01"),
            severity=sev,
            message=f"{m.group('method')} {m.group('path')} -> {code}",
            raw=raw,
            normalized={"path": m.group("path"), "status": code},
        )

    m = _K8S.search(line)
    if m:
        return SecurityEvent(
            # The matched text has no timestamp field, so ingestion time is used.
            timestamp=datetime.now(timezone.utc),
            event_type="k8s.event",
            source_ip=None,
            host=meta.get("host", "k8s-api"),
            severity=Severity.MEDIUM,
            message=f"K8s {m.group('type')} {m.group('reason')} {m.group('name')}",
            raw=raw,
            normalized={"k8s_type": m.group("type"), "reason": m.group("reason")},
        )

    # Fallback: nothing recognised the line.
    return SecurityEvent(
        timestamp=datetime.now(timezone.utc),
        event_type="generic.syslog",
        source_ip=meta.get("source_ip"),
        host=meta.get("host", "unknown"),
        severity=Severity.INFO,
        message=line[:512],
        raw=raw,
        normalized={},
    )
def _parse_iso(ts: Any) -> datetime:
    """Coerce a JSON ``timestamp`` value into a timezone-aware datetime.

    Args:
        ts: ``None``, a ``datetime``, or any value whose ``str()`` is an
            ISO-8601 timestamp. A ``Z`` suffix is accepted as UTC.

    Returns:
        A timezone-aware datetime. Values without an offset are labelled as
        UTC; values with an offset keep it. ``None`` and unparseable input
        yield the current UTC time.
    """
    if ts is None:
        return datetime.now(timezone.utc)

    if isinstance(ts, datetime):
        parsed = ts
    else:
        try:
            # "Z" is rewritten because older fromisoformat() rejects it.
            parsed = datetime.fromisoformat(str(ts).strip().replace("Z", "+00:00"))
        except ValueError:
            return datetime.now(timezone.utc)

    # Never hand back a naive datetime: mixing naive and aware values breaks
    # comparisons and arithmetic downstream.
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed
def _parse_nginx_date(s: str) -> datetime:
    """Parse the bracketed date of an nginx access line into UTC.

    Args:
        s: Text such as ``"10/Oct/2023:13:55:36 -0700"``. The numeric offset
            is honoured when present; without one the time is taken as UTC.

    Returns:
        A timezone-aware UTC datetime, or the current UTC time when the text
        cannot be parsed.
    """
    text = s.strip()

    # Preferred form: date plus numeric UTC offset.
    try:
        return datetime.strptime(text, "%d/%b/%Y:%H:%M:%S %z").astimezone(timezone.utc)
    except ValueError:
        pass

    # Fallback: only the first token is a usable date; assume UTC.
    try:
        return datetime.strptime(text.split()[0], "%d/%b/%Y:%H:%M:%S").replace(tzinfo=timezone.utc)
    except (ValueError, IndexError):
        return datetime.now(timezone.utc)