"""Parser agent: raw logs -> structured SecurityEvent.""" from __future__ import annotations import json import re from datetime import datetime, timezone from typing import Any from models.schemas import RawLogIngest, SecurityEvent, Severity _SSH_FAIL = re.compile( r"^(?P\w+\s+\d+\s+\d+:\d+:\d+).+sshd.*Failed password for (?:invalid user )?(?P\S+) from (?P[\d.]+)" ) _SSH_OK = re.compile( r"^(?P\w+\s+\d+\s+\d+:\d+:\d+).+sshd.*Accepted (?P\S+) for (?P\S+) from (?P[\d.]+)" ) _SUDO = re.compile( r"^(?P\w+\s+\d+\s+\d+:\d+:\d+).+sudo.+USER=(?P\S+).+COMMAND=(?P.+)" ) _NGINX = re.compile( r'^(?P[\d.]+).+\[(?P[^\]]+)\]."(?P\S+)\s+(?P\S+).*" (?P\d+)' ) _K8S = re.compile(r"type=(?P\S+)\s+reason=(?P\S+).*name=(?P\S+)") def _parse_ts_ssh(s: str) -> datetime: """Best-effort parse for auth.log style timestamps (current year).""" now = datetime.now(timezone.utc) try: dt = datetime.strptime(f"{now.year} {s}", "%Y %b %d %H:%M:%S") return dt.replace(tzinfo=timezone.utc) except ValueError: return now def parse_raw(ingest: RawLogIngest) -> SecurityEvent: line = ingest.raw_line.strip() meta = ingest.metadata or {} if ingest.source == "json" or line.startswith("{"): try: data = json.loads(line) sev_raw = str(data.get("severity", "info")).lower() try: sev = Severity(sev_raw) except ValueError: sev = Severity.INFO return SecurityEvent( timestamp=_parse_iso(data.get("timestamp")), event_type=str(data.get("event_type", "generic")), source_ip=data.get("source_ip"), host=str(data.get("host", meta.get("host", "unknown"))), severity=sev, message=str(data.get("message", line)), raw={"source": ingest.source, "line": line}, normalized=data, ) except json.JSONDecodeError: pass m = _SSH_FAIL.search(line) if m: return SecurityEvent( timestamp=_parse_ts_ssh(m.group("ts")), event_type="auth.ssh_failed", source_ip=m.group("ip"), host=meta.get("host", "edge-01"), severity=Severity.MEDIUM, message=f"Failed SSH for {m.group('user')} from {m.group('ip')}", raw={"source": ingest.source, "line": line}, normalized={"user": m.group("user")}, ) m = _SSH_OK.search(line) if m: return SecurityEvent( timestamp=_parse_ts_ssh(m.group("ts")), event_type="auth.ssh_success", source_ip=m.group("ip"), host=meta.get("host", "edge-01"), severity=Severity.LOW, message=f"Accepted SSH for {m.group('user')} via {m.group('method')}", raw={"source": ingest.source, "line": line}, normalized={"user": m.group("user"), "method": m.group("method")}, ) m = _SUDO.search(line) if m: return SecurityEvent( timestamp=_parse_ts_ssh(m.group("ts")), event_type="privilege.sudo", source_ip=meta.get("source_ip"), host=meta.get("host", "edge-01"), severity=Severity.HIGH, message=f"sudo escalation to {m.group('target')}: {m.group('cmd')[:120]}", raw={"source": ingest.source, "line": line}, normalized={"target_user": m.group("target"), "command": m.group("cmd")}, ) m = _NGINX.search(line) if m: code = int(m.group("code")) sev = Severity.HIGH if code in (401, 403) else Severity.INFO return SecurityEvent( timestamp=_parse_nginx_date(m.group("date")), event_type="web.request", source_ip=m.group("ip"), host=meta.get("host", "web-01"), severity=sev, message=f"{m.group('method')} {m.group('path')} -> {code}", raw={"source": ingest.source, "line": line}, normalized={"path": m.group("path"), "status": code}, ) m = _K8S.search(line) if m: return SecurityEvent( timestamp=datetime.now(timezone.utc), event_type="k8s.event", source_ip=None, host=meta.get("host", "k8s-api"), severity=Severity.MEDIUM, message=f"K8s {m.group('type')} {m.group('reason')} {m.group('name')}", raw={"source": ingest.source, "line": line}, normalized={"k8s_type": m.group("type"), "reason": m.group("reason")}, ) return SecurityEvent( timestamp=datetime.now(timezone.utc), event_type="generic.syslog", source_ip=meta.get("source_ip"), host=meta.get("host", "unknown"), severity=Severity.INFO, message=line[:512], raw={"source": ingest.source, "line": line}, normalized={}, ) def _parse_iso(ts: Any) -> datetime: if ts is None: return datetime.now(timezone.utc) if isinstance(ts, datetime): return ts if ts.tzinfo else ts.replace(tzinfo=timezone.utc) try: return datetime.fromisoformat(str(ts).replace("Z", "+00:00")) except ValueError: return datetime.now(timezone.utc) def _parse_nginx_date(s: str) -> datetime: try: return datetime.strptime(s.split()[0], "%d/%b/%Y:%H:%M:%S").replace(tzinfo=timezone.utc) except (ValueError, IndexError): return datetime.now(timezone.utc)