"""network_monitor.py – Real-time network packet scanning for Kavach SIEM. Uses psutil to surface actual OS-level TCP/UDP connections and enriches them with heuristic threat analysis. """ import psutil import time import random import uuid from typing import Optional # ── Threat intelligence constants ──────────────────────────────────────────── # Ports commonly associated with RATs, backdoors, botnets and attack tools SUSPICIOUS_PORTS = { 4444, 4445, 4446, 1337, 31337, 8888, 9999, 12345, 12346, 6667, 6668, 6669, # IRC / botnet C2 1080, 3128, 8118, # open proxies 4899, # Radmin 3333, 5555, 7777, 14444, 45700, # crypto-mining pools 6200, 7000, 9095, 54321, 65535, # miscellaneous RAT ports } # Typical dev / legit service ports – used to suppress false positives SAFE_DST_PORTS = { 80, 443, 53, 22, 25, 587, 465, 993, 995, 8000, 8001, 3000, 3001, 5173, 5174, # local dev servers 3306, 5432, 27017, 6379, 6380, # databases (local) 11211, # memcached (local) } # Threat-pattern definitions (deterministic scoring) THREAT_PATTERNS = { "suspicious_port": {"severity": "CRITICAL", "score": 0.90, "type": "Suspicious Port", "desc": "Connection to known-malicious port"}, "dns_tunneling": {"severity": "CRITICAL", "score": 0.88, "type": "DNS Tunneling", "desc": "Oversized DNS query – possible data exfiltration via DNS"}, "syn_flood": {"severity": "CRITICAL", "score": 0.95, "type": "SYN Flood", "desc": "High-frequency SYN/RST pattern indicative of DoS"}, "data_exfil": {"severity": "HIGH", "score": 0.85, "type": "Data Exfiltration", "desc": "Large outbound payload to external host"}, "high_frequency": {"severity": "HIGH", "score": 0.80, "type": "High Frequency", "desc": "Abnormal connection rate from single source"}, "large_payload": {"severity": "MEDIUM", "score": 0.60, "type": "Large Payload", "desc": "Packet size exceeds expected threshold"}, "port_scan": {"severity": "HIGH", "score": 0.75, "type": "Port Scan", "desc": "Sequential port access pattern detected"}, } _INTERNAL_PREFIXES = ("10.", "172.16.", "172.17.", "172.18.", "172.19.", "172.20.", "172.21.", "172.22.", "172.23.", "172.24.", "172.25.", "172.26.", "172.27.", "172.28.", "172.29.", "172.30.", "172.31.", "192.168.", "127.") # In-process window for connection-rate tracking {src_ip: [timestamps]} _conn_rate: dict[str, list[float]] = {} # Track SYN/RST flag counts per source IP to detect real floods _flag_rate: dict[str, list[float]] = {} # ── Helpers ─────────────────────────────────────────────────────────────────── def _is_external(ip: str) -> bool: return not any(ip.startswith(p) for p in _INTERNAL_PREFIXES) def _purge_old_rate_entries(window: float = 10.0) -> None: now = time.time() for store in (_conn_rate, _flag_rate): for key in list(store.keys()): store[key] = [t for t in store[key] if now - t < window] if not store[key]: del store[key] # ── Real connection reader ──────────────────────────────────────────────────── def get_real_connections() -> list[dict]: """Return a snapshot of current ESTABLISHED / LISTEN connections via psutil.""" results = [] try: for conn in psutil.net_connections(kind="inet"): # Keep only active, actionable connection states. # TIME_WAIT/LISTEN/CLOSE_WAIT are very noisy and inflate log volume. if conn.status not in ("ESTABLISHED", "SYN_SENT"): continue if not conn.laddr or not conn.raddr: continue src_ip = conn.laddr.ip or "127.0.0.1" src_port = conn.laddr.port dst_ip = conn.raddr.ip dst_port = conn.raddr.port proc_name = None if conn.pid: try: proc_name = psutil.Process(conn.pid).name() except Exception: pass results.append({ "src_ip": src_ip, "src_port": src_port, "dst_ip": dst_ip, "dst_port": dst_port, "protocol": "TCP" if getattr(conn, "type", None) and conn.type.name == "SOCK_STREAM" else "UDP", "status": conn.status, "pid": conn.pid, "process": proc_name, }) except Exception: pass return results # ── Threat analyser ─────────────────────────────────────────────────────────── def analyze_packet(packet: dict) -> list[dict]: """Run heuristic checks and return a list of matched threat records.""" threats: list[dict] = [] dst_port = packet.get("dst_port", 0) byte_len = packet.get("bytes", 0) src_ip = packet.get("src_ip", "") dst_ip = packet.get("dst_ip", "") state = packet.get("state", "") size_ok = bool(packet.get("size_trustworthy", False)) # 1. Suspicious destination port if dst_port in SUSPICIOUS_PORTS and _is_external(dst_ip): t = dict(THREAT_PATTERNS["suspicious_port"]) t["detail"] = f"Port {dst_port} associated with RAT/backdoor/botnet" threats.append(t) # 2. Oversized DNS (possible tunneling) if size_ok and dst_port == 53 and _is_external(dst_ip) and byte_len > 1500: t = dict(THREAT_PATTERNS["dns_tunneling"]) t["detail"] = f"DNS payload {byte_len} B (normal ≤512 B)" threats.append(t) # 3. Large outbound payload to external host if size_ok and byte_len > 40_000 and _is_external(dst_ip): t = dict(THREAT_PATTERNS["data_exfil"]) t["detail"] = f"{byte_len:,} B outbound to {dst_ip}" threats.append(t) # 4. Oversized generic payload elif size_ok and byte_len > 50_000: t = dict(THREAT_PATTERNS["large_payload"]) t["detail"] = f"Payload {byte_len:,} B above 50 KB threshold" threats.append(t) # 5. SYN flood based on connection state bursts if state == "SYN_SENT": _flag_rate.setdefault(src_ip, []).append(time.time()) if len(_flag_rate.get(src_ip, [])) >= 25: t = dict(THREAT_PATTERNS["syn_flood"]) t["detail"] = (f"{len(_flag_rate[src_ip])} SYN_SENT connections " f"in 10 s from {src_ip}") threats.append(t) # 6. Connection-rate anomaly (raise threshold; suppress local DNS chatter) _purge_old_rate_entries() _conn_rate.setdefault(src_ip, []).append(time.time()) if len(_conn_rate[src_ip]) > 80 and not (dst_port == 53 and not _is_external(dst_ip)): t = dict(THREAT_PATTERNS["high_frequency"]) t["detail"] = f"{len(_conn_rate[src_ip])} connections in 10 s from {src_ip}" threats.append(t) return threats # ── Packet snapshot generator ───────────────────────────────────────────────── def generate_packet_snapshot() -> Optional[dict]: """ Build a single real packet event from current psutil network connections. Returns None when no suitable real connection is available. """ real_conns = get_real_connections() if not real_conns: return None conn = random.choice(real_conns) pkt: dict = { "id": str(uuid.uuid4()), "timestamp": time.time(), "src_ip": conn["src_ip"], "src_port": conn["src_port"], "dst_ip": conn["dst_ip"], "dst_port": conn["dst_port"], "protocol": conn["protocol"], # psutil net_connections does not expose packet byte size / ttl / tcp flags. # Keep placeholders to avoid fabricating telemetry. "bytes": 0, "ttl": None, "flags": None, "state": conn.get("status", "UNKNOWN"), "size_trustworthy": False, "pid": conn.get("pid"), "process": conn.get("process"), "synthetic": False, } threats = analyze_packet(pkt) pkt["threats"] = threats pkt["is_suspicious"] = bool(threats) pkt["max_score"] = max((t["score"] for t in threats), default=0.0) pkt["severity"] = threats[0]["severity"] if threats else "INFO" # Tag so the frontend can label it [REAL] or [SIM] pkt["source_tag"] = "REAL" if not pkt.get("synthetic", True) else "SIM" return pkt