| import threading
|
| import queue
|
| import time
|
| from scapy.all import sniff, IP, TCP, UDP, ICMP
|
| from collections import deque
|
| import logging
|
|
|
| logging.basicConfig(filename='logs/sniffer.log', level=logging.ERROR,
|
| format='%(asctime)s - %(levelname)s - %(message)s')
|
|
|
| class PacketSniffer(threading.Thread):
|
| def __init__(self, packet_queue, interface=None, pcap_buffer_size=10000):
|
| super().__init__()
|
| self.packet_queue = packet_queue
|
| self.interface = interface
|
| self.running = True
|
| self.daemon = True
|
| self.raw_packets = deque(maxlen=pcap_buffer_size)
|
| self.lock = threading.Lock()
|
|
|
| def run(self):
|
| try:
|
| sniff(iface=self.interface, prn=self._process_packet, store=0, stop_filter=self._stop_filter)
|
| except Exception as e:
|
| logging.error(f"Sniffer error: {e}")
|
|
|
| def _process_packet(self, packet):
|
| if not self.running:
|
| return
|
|
|
| with self.lock:
|
| self.raw_packets.append(packet)
|
| try:
|
| if IP in packet:
|
| src = packet[IP].src
|
| dst = packet[IP].dst
|
| proto = packet[IP].proto
|
| size = len(packet)
|
| ts = time.time()
|
| port = None
|
| if TCP in packet:
|
| port = packet[TCP].dport
|
| proto_name = "TCP"
|
| elif UDP in packet:
|
| port = packet[UDP].dport
|
| proto_name = "UDP"
|
| elif ICMP in packet:
|
| proto_name = "ICMP"
|
| else:
|
| proto_name = "OTHER"
|
|
|
| packet_info = {
|
| 'timestamp': ts,
|
| 'src_ip': src,
|
| 'dst_ip': dst,
|
| 'protocol': proto_name,
|
| 'port': port,
|
| 'size': size
|
| }
|
| self.packet_queue.put(packet_info)
|
| except Exception as e:
|
| logging.error(f"Error processing packet: {e}")
|
|
|
| def _stop_filter(self, packet):
|
| return not self.running
|
|
|
| def get_pcap_buffer(self):
|
| with self.lock:
|
| return list(self.raw_packets)
|
|
|
| def stop(self):
|
| self.running = False |