abcj / monitor /packet_sniffer.py
theghostcmd's picture
Upload 18 files
5765e13 verified
Raw
History Blame Contribute Delete
2.44 kB
import threading
import queue
import time
from scapy.all import sniff, IP, TCP, UDP, ICMP
from collections import deque
import logging
logging.basicConfig(filename='logs/sniffer.log', level=logging.ERROR,
format='%(asctime)s - %(levelname)s - %(message)s')
class PacketSniffer(threading.Thread):
def __init__(self, packet_queue, interface=None, pcap_buffer_size=10000):
super().__init__()
self.packet_queue = packet_queue
self.interface = interface
self.running = True
self.daemon = True
self.raw_packets = deque(maxlen=pcap_buffer_size) # store raw packets for PCAP
self.lock = threading.Lock()
def run(self):
try:
sniff(iface=self.interface, prn=self._process_packet, store=0, stop_filter=self._stop_filter)
except Exception as e:
logging.error(f"Sniffer error: {e}")
def _process_packet(self, packet):
if not self.running:
return
# Store raw packet for PCAP export
with self.lock:
self.raw_packets.append(packet)
try:
if IP in packet:
src = packet[IP].src
dst = packet[IP].dst
proto = packet[IP].proto
size = len(packet)
ts = time.time()
port = None
if TCP in packet:
port = packet[TCP].dport
proto_name = "TCP"
elif UDP in packet:
port = packet[UDP].dport
proto_name = "UDP"
elif ICMP in packet:
proto_name = "ICMP"
else:
proto_name = "OTHER"
packet_info = {
'timestamp': ts,
'src_ip': src,
'dst_ip': dst,
'protocol': proto_name,
'port': port,
'size': size
}
self.packet_queue.put(packet_info)
except Exception as e:
logging.error(f"Error processing packet: {e}")
def _stop_filter(self, packet):
return not self.running
def get_pcap_buffer(self):
with self.lock:
return list(self.raw_packets)
def stop(self):
self.running = False