import os
import traceback

import numpy as np
import pandas as pd
import gradio as gr

# Optional packet-parsing backends. The detector degrades gracefully
# (raw-bytes fallback) when either library is unavailable, so a failed
# import is recorded as None rather than treated as fatal.
try:
    import scapy.all as scapy
except ImportError:
    scapy = None

try:
    import pyshark
except ImportError:
    pyshark = None

from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler


class NetworkAnomalyDetector:
    """Detect statistically unusual packets in a capture file.

    Packets are parsed with Scapy when available, then PyShark, then a
    raw-bytes fallback; anomalies are flagged with an Isolation Forest
    over length, timestamp, and encoded protocol/IP features.
    """

    def __init__(self, huggingface_model=None):
        # Reserved hook for a future model-based classifier; unused for now.
        self.huggingface_model = huggingface_model
        self.isolation_forest = IsolationForest(
            contamination=0.1,  # assume ~10% of packets might be anomalous
            random_state=42,
        )
        self.scaler = StandardScaler()

    def parse_pcap_file(self, file_path):
        """
        Parse a network packet file with multiple parsing strategies.

        Tries Scapy first, then PyShark, then falls back to summarizing
        the raw file bytes as a single pseudo-packet.

        :param file_path: Path to the packet capture file
        :return: DataFrame with packet features (empty if everything fails)
        """
        parsing_errors = []

        packet_features = self._parse_with_scapy(file_path, parsing_errors)
        if packet_features:
            return pd.DataFrame(packet_features)

        packet_features = self._parse_with_pyshark(file_path, parsing_errors)
        if packet_features:
            return pd.DataFrame(packet_features)

        # Fallback: raw file reading (one row describing the whole file).
        packet_features = self._read_raw_file(file_path, parsing_errors)
        if packet_features:
            # Surface errors from the parsers that were tried and failed.
            if parsing_errors:
                print("Parsing Errors:")
                for error in parsing_errors:
                    print(error)
            return pd.DataFrame(packet_features)

        # Every strategy failed, including the raw-bytes fallback.
        print("All parsing methods failed. Parsing Errors:")
        for error in parsing_errors:
            print(error)
        return pd.DataFrame()

    def _parse_with_scapy(self, file_path, parsing_errors):
        """Parse with Scapy; return [] when unavailable or on any error."""
        if scapy is None:
            return []
        features = []
        try:
            for packet in scapy.rdpcap(file_path):
                features.append({
                    'length': len(packet),
                    'protocol': self._extract_protocol_scapy(packet),
                    'src_ip': self._extract_src_ip_scapy(packet),
                    'dst_ip': self._extract_dst_ip_scapy(packet),
                    # Not every layer type exposes .time; default to 0.
                    'timestamp': getattr(packet, 'time', 0),
                })
        except Exception as e:
            parsing_errors.append(f"Scapy parsing error: {str(e)}")
            return []
        return features

    def _parse_with_pyshark(self, file_path, parsing_errors):
        """Parse with PyShark; return [] when unavailable or on any error.

        PyShark drives a tshark subprocess via asyncio, so the capture is
        consumed inside asyncio.run() and closed in a finally block to
        avoid leaking the subprocess when iteration raises.
        """
        if pyshark is None:
            return []
        import asyncio

        async def collect_features():
            capture = pyshark.FileCapture(file_path)
            local_features = []
            try:
                for packet in capture:
                    try:
                        local_features.append({
                            'length': int(packet.length),
                            'protocol': self._extract_protocol_pyshark(packet),
                            'src_ip': self._extract_src_ip_pyshark(packet),
                            'dst_ip': self._extract_dst_ip_pyshark(packet),
                            'timestamp': float(packet.sniff_time.timestamp()),
                        })
                    except Exception as packet_error:
                        # Skip individual malformed packets but keep going.
                        parsing_errors.append(
                            f"PyShark packet parsing error: {str(packet_error)}"
                        )
            finally:
                # Always release the tshark subprocess, even on failure.
                capture.close()
            return local_features

        try:
            return asyncio.run(collect_features())
        except Exception as async_error:
            parsing_errors.append(f"PyShark async error: {str(async_error)}")
            return []

    def _read_raw_file(self, file_path, parsing_errors):
        """Last-resort parser: describe the raw file as one pseudo-packet."""
        try:
            with open(file_path, 'rb') as f:
                file_contents = f.read()
            return [{
                'length': len(file_contents),
                'protocol': 'Unknown',
                'src_ip': 'Unknown',
                'dst_ip': 'Unknown',
                'timestamp': 0,
            }]
        except Exception as e:
            parsing_errors.append(f"Raw file reading error: {str(e)}")
            return []

    def _extract_protocol_scapy(self, packet):
        """Extract the IP protocol number from a Scapy packet."""
        try:
            if packet.haslayer(scapy.IP):
                return str(packet[scapy.IP].proto)
            return 'Unknown'
        except Exception:
            return 'Unknown'

    def _extract_src_ip_scapy(self, packet):
        """Extract the source IP from a Scapy packet."""
        try:
            if packet.haslayer(scapy.IP):
                return packet[scapy.IP].src
            return 'Unknown'
        except Exception:
            return 'Unknown'

    def _extract_dst_ip_scapy(self, packet):
        """Extract the destination IP from a Scapy packet."""
        try:
            if packet.haslayer(scapy.IP):
                return packet[scapy.IP].dst
            return 'Unknown'
        except Exception:
            return 'Unknown'

    def _extract_protocol_pyshark(self, packet):
        """Extract the transport/IP protocol from a PyShark packet."""
        try:
            if hasattr(packet, 'transport_layer'):
                return str(packet.transport_layer)
            elif hasattr(packet, 'ip'):
                return str(packet.ip.proto)
            return 'Unknown'
        except Exception:
            return 'Unknown'

    def _extract_src_ip_pyshark(self, packet):
        """Extract the source IP from a PyShark packet."""
        try:
            if hasattr(packet, 'ip'):
                return packet.ip.src
            return 'Unknown'
        except Exception:
            return 'Unknown'

    def _extract_dst_ip_pyshark(self, packet):
        """Extract the destination IP from a PyShark packet."""
        try:
            if hasattr(packet, 'ip'):
                return packet.ip.dst
            return 'Unknown'
        except Exception:
            return 'Unknown'

    def analyze_network_file(self, file_path):
        """
        Comprehensive network file analysis with anomaly detection.

        :param file_path: Path to the network packet capture file
        :return: Dict with 'summary' stats and the annotated 'packets'
                 DataFrame (an 'is_anomaly' boolean column is added)
        """
        packets_df = self.parse_pcap_file(file_path)

        if packets_df.empty:
            return {
                'summary': {
                    'total_packets': 0,
                    'isolation_forest_anomalies': 0,
                },
                'packets': packets_df,
            }

        feature_columns = ['length', 'timestamp']

        # Encode categorical columns as integer codes so the model can
        # consume them alongside the numeric features.
        for column in ('protocol', 'src_ip', 'dst_ip'):
            encoded_column = f'{column}_encoded'
            packets_df[encoded_column] = pd.Categorical(packets_df[column]).codes
            feature_columns.append(encoded_column)

        # Standardize so no single feature dominates the forest splits.
        features_scaled = self.scaler.fit_transform(packets_df[feature_columns])

        # IsolationForest labels outliers with -1 and inliers with +1.
        anomaly_labels = self.isolation_forest.fit_predict(features_scaled)
        packets_df['is_anomaly'] = anomaly_labels == -1

        summary = {
            'total_packets': len(packets_df),
            # int() keeps the summary plain-Python (avoids numpy scalars).
            'isolation_forest_anomalies': int(packets_df['is_anomaly'].sum()),
        }

        return {
            'summary': summary,
            'packets': packets_df,
        }


def analyze_network_file(file_path):
    """
    Wrapper function to analyze a network file for the Gradio interface.

    :param file_path: Path to the uploaded capture file
    :return: (summary text, packets DataFrame or None, error trace or None)
    """
    try:
        if not os.path.exists(file_path):
            return "Error: File not found", None, "File does not exist."

        detector = NetworkAnomalyDetector()
        results = detector.analyze_network_file(file_path)

        summary_text = f"""
        Anomaly Detection Results:
        - Total Packets: {results['summary']['total_packets']}
        - Anomalous Packets: {results['summary']['isolation_forest_anomalies']}
        """

        return summary_text, results['packets'], None

    except Exception as e:
        # Return the traceback through the third output instead of raising,
        # so the UI stays responsive on bad input.
        error_trace = traceback.format_exc()
        return f"Error: {str(e)}", None, error_trace


def create_gradio_interface():
    """
    Create the Gradio Blocks interface for the Network Anomaly Detector.

    :return: The constructed (not yet launched) gr.Blocks app
    """
    with gr.Blocks(title="Network Anomaly Detector") as demo:
        gr.Markdown("# 🌐 Network Anomaly Detector")
        gr.Markdown("Upload a network packet capture file (PCAP) for anomaly analysis.")

        with gr.Row():
            file_input = gr.File(
                label="Upload PCAP File",
                type="filepath",
                file_types=['.pcap', '.pkt'],
            )
            analyze_button = gr.Button("Analyze Network File", variant="primary")

        # Outputs
        summary_output = gr.Textbox(label="Analysis Summary", lines=5)
        results_dataframe = gr.DataFrame(label="Packet Details")
        # NOTE(review): hidden by default, so the returned traceback is not
        # shown to the user — confirm whether this is intentional.
        error_output = gr.Textbox(label="Error Trace", visible=False)

        # Event handlers
        analyze_button.click(
            fn=analyze_network_file,
            inputs=[file_input],
            outputs=[summary_output, results_dataframe, error_output],
        )

        gr.Markdown("""
        ### How it works:
        1. Upload a PCAP (packet capture) file
        2. Click "Analyze Network File"
        3. View summary of total packets and detected anomalies
        4. Explore detailed packet information

        #### Anomaly Detection Techniques:
        - Uses Isolation Forest algorithm
        - Analyzes packet length, timestamp, protocol, and IP addresses
        - Highlights statistically unusual network traffic
        """)

    return demo


def main():
    """Entry point: verify hard dependencies and launch the Gradio app."""
    # scapy and pyshark are optional (handled at import time with a raw-file
    # fallback), so only the libraries the app cannot run without are checked.
    try:
        import pandas  # noqa: F401
        import numpy  # noqa: F401
        import sklearn  # noqa: F401
        import gradio  # noqa: F401
    except ImportError as e:
        print(f"Missing required library: {e}")
        print("Please install: pip install scapy pyshark pandas numpy scikit-learn gradio")
        return

    demo = create_gradio_interface()
    demo.launch(
        server_name="0.0.0.0",  # make accessible on the local network
        share=True,             # optional: create a public shareable link
        debug=True,             # show detailed errors
    )


if __name__ == "__main__":
    main()