WebPass / webpass /network_monitor.py
ag235772's picture
Initial Release: WebPass V2 with Steganography, Crypto Vault, and Cloud Toggle
136c0f7
# network_monitor.py
import time
from scapy.sendrecv import AsyncSniffer
from scapy.all import IP, TCP, UDP, DNS
def process_packet(pkt):
"""Processes a Scapy packet into a simple dictionary."""
d = {"timestamp": pkt.time}
if pkt.haslayer(IP):
ip = pkt[IP]
d.update(src=ip.src, dst=ip.dst, ttl=ip.ttl, length=len(pkt))
if pkt.haslayer(TCP):
t = pkt[TCP]
d.update(
proto=6, src_port=t.sport, dst_port=t.dport, flags=str(t.flags),
info=f"TCP {ip.src}:{t.sport}{ip.dst}:{t.dport}"
)
elif pkt.haslayer(UDP):
u = pkt[UDP]
d.update(proto=17, src_port=u.sport, dst_port=u.dport, flags="-")
if pkt.haslayer(DNS) and hasattr(pkt[DNS], 'qd') and pkt[DNS].qd is not None:
qname = pkt[DNS].qd.qname
# qname can be bytes, decode safely
qname_str = qname.decode(errors='ignore') if isinstance(qname, bytes) else str(qname)
d.update(proto="DNS", info=f"DNS Query for {qname_str}")
else:
d.update(info=f"UDP {ip.src}:{u.sport}{ip.dst}:{u.dport}")
else:
d.setdefault("proto", ip.proto)
else:
d.update(src="N/A", dst="N/A", proto="Other", length=len(pkt))
return d
def start_packet_capture(app, socketio, interface=None, filter_expr="ip"):
"""
Sniffs packets in a background thread, adds them to the app's deque,
and emits every packet to connected clients.
"""
def _handle(pkt):
try:
data = process_packet(pkt)
except Exception:
return # Ignore packets that cause processing errors
# 1. Store the packet in the historical backlog
app.captured_packets.append(data)
# 2. Emit EVERY packet to the live feed (filter removed)
socketio.emit("new_packet", data)
sniffer = AsyncSniffer(
iface=interface,
filter=filter_expr,
prn=_handle,
store=False
)
sniffer.start()