logs / network_monitor.py
Samyak000's picture
Update network_monitor.py
be03cbf verified
"""network_monitor.py – Real-time network packet scanning for Kavach SIEM.
Uses psutil to surface actual OS-level TCP/UDP connections and enriches
them with heuristic threat analysis.
"""
import psutil
import time
import random
import uuid
from typing import Optional
# ── Threat intelligence constants ────────────────────────────────────────────
# Ports commonly associated with RATs, backdoors, botnets and attack tools
SUSPICIOUS_PORTS = {
4444, 4445, 4446, 1337, 31337, 8888, 9999,
12345, 12346, 6667, 6668, 6669, # IRC / botnet C2
1080, 3128, 8118, # open proxies
4899, # Radmin
3333, 5555, 7777, 14444, 45700, # crypto-mining pools
6200, 7000, 9095, 54321, 65535, # miscellaneous RAT ports
}
# Typical dev / legit service ports – used to suppress false positives
SAFE_DST_PORTS = {
80, 443, 53, 22, 25, 587, 465, 993, 995,
8000, 8001, 3000, 3001, 5173, 5174, # local dev servers
3306, 5432, 27017, 6379, 6380, # databases (local)
11211, # memcached (local)
}
# Threat-pattern definitions (deterministic scoring)
THREAT_PATTERNS = {
"suspicious_port": {"severity": "CRITICAL", "score": 0.90,
"type": "Suspicious Port",
"desc": "Connection to known-malicious port"},
"dns_tunneling": {"severity": "CRITICAL", "score": 0.88,
"type": "DNS Tunneling",
"desc": "Oversized DNS query – possible data exfiltration via DNS"},
"syn_flood": {"severity": "CRITICAL", "score": 0.95,
"type": "SYN Flood",
"desc": "High-frequency SYN/RST pattern indicative of DoS"},
"data_exfil": {"severity": "HIGH", "score": 0.85,
"type": "Data Exfiltration",
"desc": "Large outbound payload to external host"},
"high_frequency": {"severity": "HIGH", "score": 0.80,
"type": "High Frequency",
"desc": "Abnormal connection rate from single source"},
"large_payload": {"severity": "MEDIUM", "score": 0.60,
"type": "Large Payload",
"desc": "Packet size exceeds expected threshold"},
"port_scan": {"severity": "HIGH", "score": 0.75,
"type": "Port Scan",
"desc": "Sequential port access pattern detected"},
}
_INTERNAL_PREFIXES = ("10.", "172.16.", "172.17.", "172.18.", "172.19.",
"172.20.", "172.21.", "172.22.", "172.23.", "172.24.",
"172.25.", "172.26.", "172.27.", "172.28.", "172.29.",
"172.30.", "172.31.", "192.168.", "127.")
# In-process window for connection-rate tracking {src_ip: [timestamps]}
_conn_rate: dict[str, list[float]] = {}
# Track SYN/RST flag counts per source IP to detect real floods
_flag_rate: dict[str, list[float]] = {}
# ── Helpers ───────────────────────────────────────────────────────────────────
def _is_external(ip: str) -> bool:
return not any(ip.startswith(p) for p in _INTERNAL_PREFIXES)
def _purge_old_rate_entries(window: float = 10.0) -> None:
now = time.time()
for store in (_conn_rate, _flag_rate):
for key in list(store.keys()):
store[key] = [t for t in store[key] if now - t < window]
if not store[key]:
del store[key]
# ── Real connection reader ────────────────────────────────────────────────────
def get_real_connections() -> list[dict]:
"""Return a snapshot of current ESTABLISHED / LISTEN connections via psutil."""
results = []
try:
for conn in psutil.net_connections(kind="inet"):
# Keep only active, actionable connection states.
# TIME_WAIT/LISTEN/CLOSE_WAIT are very noisy and inflate log volume.
if conn.status not in ("ESTABLISHED", "SYN_SENT"):
continue
if not conn.laddr or not conn.raddr:
continue
src_ip = conn.laddr.ip or "127.0.0.1"
src_port = conn.laddr.port
dst_ip = conn.raddr.ip
dst_port = conn.raddr.port
proc_name = None
if conn.pid:
try:
proc_name = psutil.Process(conn.pid).name()
except Exception:
pass
results.append({
"src_ip": src_ip,
"src_port": src_port,
"dst_ip": dst_ip,
"dst_port": dst_port,
"protocol": "TCP" if getattr(conn, "type", None) and
conn.type.name == "SOCK_STREAM" else "UDP",
"status": conn.status,
"pid": conn.pid,
"process": proc_name,
})
except Exception:
pass
return results
# ── Threat analyser ───────────────────────────────────────────────────────────
def analyze_packet(packet: dict) -> list[dict]:
"""Run heuristic checks and return a list of matched threat records."""
threats: list[dict] = []
dst_port = packet.get("dst_port", 0)
byte_len = packet.get("bytes", 0)
src_ip = packet.get("src_ip", "")
dst_ip = packet.get("dst_ip", "")
state = packet.get("state", "")
size_ok = bool(packet.get("size_trustworthy", False))
# 1. Suspicious destination port
if dst_port in SUSPICIOUS_PORTS and _is_external(dst_ip):
t = dict(THREAT_PATTERNS["suspicious_port"])
t["detail"] = f"Port {dst_port} associated with RAT/backdoor/botnet"
threats.append(t)
# 2. Oversized DNS (possible tunneling)
if size_ok and dst_port == 53 and _is_external(dst_ip) and byte_len > 1500:
t = dict(THREAT_PATTERNS["dns_tunneling"])
t["detail"] = f"DNS payload {byte_len} B (normal ≀512 B)"
threats.append(t)
# 3. Large outbound payload to external host
if size_ok and byte_len > 40_000 and _is_external(dst_ip):
t = dict(THREAT_PATTERNS["data_exfil"])
t["detail"] = f"{byte_len:,} B outbound to {dst_ip}"
threats.append(t)
# 4. Oversized generic payload
elif size_ok and byte_len > 50_000:
t = dict(THREAT_PATTERNS["large_payload"])
t["detail"] = f"Payload {byte_len:,} B above 50 KB threshold"
threats.append(t)
# 5. SYN flood based on connection state bursts
if state == "SYN_SENT":
_flag_rate.setdefault(src_ip, []).append(time.time())
if len(_flag_rate.get(src_ip, [])) >= 25:
t = dict(THREAT_PATTERNS["syn_flood"])
t["detail"] = (f"{len(_flag_rate[src_ip])} SYN_SENT connections "
f"in 10 s from {src_ip}")
threats.append(t)
# 6. Connection-rate anomaly (raise threshold; suppress local DNS chatter)
_purge_old_rate_entries()
_conn_rate.setdefault(src_ip, []).append(time.time())
if len(_conn_rate[src_ip]) > 80 and not (dst_port == 53 and not _is_external(dst_ip)):
t = dict(THREAT_PATTERNS["high_frequency"])
t["detail"] = f"{len(_conn_rate[src_ip])} connections in 10 s from {src_ip}"
threats.append(t)
return threats
# ── Packet snapshot generator ─────────────────────────────────────────────────
def generate_packet_snapshot() -> Optional[dict]:
"""
Build a single real packet event from current psutil network connections.
Returns None when no suitable real connection is available.
"""
real_conns = get_real_connections()
if not real_conns:
return None
conn = random.choice(real_conns)
pkt: dict = {
"id": str(uuid.uuid4()),
"timestamp": time.time(),
"src_ip": conn["src_ip"],
"src_port": conn["src_port"],
"dst_ip": conn["dst_ip"],
"dst_port": conn["dst_port"],
"protocol": conn["protocol"],
# psutil net_connections does not expose packet byte size / ttl / tcp flags.
# Keep placeholders to avoid fabricating telemetry.
"bytes": 0,
"ttl": None,
"flags": None,
"state": conn.get("status", "UNKNOWN"),
"size_trustworthy": False,
"pid": conn.get("pid"),
"process": conn.get("process"),
"synthetic": False,
}
threats = analyze_packet(pkt)
pkt["threats"] = threats
pkt["is_suspicious"] = bool(threats)
pkt["max_score"] = max((t["score"] for t in threats), default=0.0)
pkt["severity"] = threats[0]["severity"] if threats else "INFO"
# Tag so the frontend can label it [REAL] or [SIM]
pkt["source_tag"] = "REAL" if not pkt.get("synthetic", True) else "SIM"
return pkt