Spaces:
Runtime error
| import os | |
| import numpy as np | |
| import pandas as pd | |
| import traceback | |
| import gradio as gr | |
| # Attempt to import libraries with more robust error handling | |
| try: | |
| import scapy.all as scapy | |
| except ImportError: | |
| scapy = None | |
| try: | |
| import pyshark | |
| except ImportError: | |
| pyshark = None | |
| from sklearn.ensemble import IsolationForest | |
| from sklearn.preprocessing import StandardScaler | |
class NetworkAnomalyDetector:
    """Detect anomalous packets in a network capture using an Isolation Forest.

    Parsing degrades gracefully: Scapy first, then PyShark, then a raw-byte
    fallback that exposes the whole file as a single opaque record, so the
    analysis pipeline never hard-fails on an unreadable capture.
    """

    def __init__(self, huggingface_model=None):
        """
        :param huggingface_model: optional external model; stored but not
            used by the current pipeline (kept for forward compatibility).
        """
        self.huggingface_model = huggingface_model
        self.isolation_forest = IsolationForest(
            contamination=0.1,  # Assume 10% of packets might be anomalous
            random_state=42     # deterministic results across runs
        )
        self.scaler = StandardScaler()

    def parse_pcap_file(self, file_path):
        """
        Parse network packet file with multiple parsing strategies.

        :param file_path: Path to the packet capture file
        :return: DataFrame with columns length / protocol / src_ip / dst_ip /
                 timestamp; empty DataFrame if every strategy fails.
        """
        packet_features = []
        parsing_errors = []

        # Helper method to read raw file bytes as a fallback
        def read_raw_file(path):
            # Last resort: expose the whole file as one record so callers
            # still receive a well-formed DataFrame.
            try:
                with open(path, 'rb') as f:
                    file_contents = f.read()
                return [{
                    'length': len(file_contents),
                    'protocol': 'Unknown',
                    'src_ip': 'Unknown',
                    'dst_ip': 'Unknown',
                    # float literal keeps the timestamp column numeric
                    'timestamp': 0.0
                }]
            except Exception as e:
                parsing_errors.append(f"Raw file reading error: {str(e)}")
                return []

        # Method 1: Scapy parsing
        if scapy is not None:
            try:
                packets = scapy.rdpcap(file_path)
                for packet in packets:
                    features = {
                        'length': len(packet),
                        'protocol': self._extract_protocol_scapy(packet),
                        'src_ip': self._extract_src_ip_scapy(packet),
                        'dst_ip': self._extract_dst_ip_scapy(packet),
                        # Scapy timestamps are EDecimal; cast to float so
                        # the column stays numeric for StandardScaler.
                        'timestamp': float(getattr(packet, 'time', 0))
                    }
                    packet_features.append(features)
                if packet_features:
                    return pd.DataFrame(packet_features)
            except Exception as e:
                parsing_errors.append(f"Scapy parsing error: {str(e)}")

        # Method 2: PyShark parsing (with async handling)
        if pyshark is not None:
            try:
                import asyncio

                async def parse_with_pyshark():
                    capture = pyshark.FileCapture(file_path)
                    local_features = []
                    try:
                        for packet in capture:
                            try:
                                features = {
                                    'length': int(packet.length),
                                    'protocol': self._extract_protocol_pyshark(packet),
                                    'src_ip': self._extract_src_ip_pyshark(packet),
                                    'dst_ip': self._extract_dst_ip_pyshark(packet),
                                    'timestamp': float(packet.sniff_time.timestamp())
                                }
                                local_features.append(features)
                            except Exception as packet_error:
                                parsing_errors.append(f"PyShark packet parsing error: {str(packet_error)}")
                    finally:
                        # Close even when iteration raises so the tshark
                        # subprocess is not leaked.
                        capture.close()
                    return local_features

                # Run the async function
                try:
                    packet_features = asyncio.run(parse_with_pyshark())
                    if packet_features:
                        return pd.DataFrame(packet_features)
                except Exception as async_error:
                    parsing_errors.append(f"PyShark async error: {str(async_error)}")
            except Exception as e:
                parsing_errors.append(f"PyShark parsing error: {str(e)}")

        # Fallback: Raw file reading
        packet_features = read_raw_file(file_path)
        if packet_features:
            # Log parsing errors if any occurred
            if parsing_errors:
                print("Parsing Errors:")
                for error in parsing_errors:
                    print(error)
            return pd.DataFrame(packet_features)

        # If all parsing methods fail
        print("All parsing methods failed. Parsing Errors:")
        for error in parsing_errors:
            print(error)
        return pd.DataFrame()

    def _extract_protocol_scapy(self, packet):
        """Extract protocol from Scapy packet ('Unknown' on any failure)."""
        try:
            if packet.haslayer(scapy.IP):
                return str(packet[scapy.IP].proto)
            return 'Unknown'
        except Exception:
            return 'Unknown'

    def _extract_src_ip_scapy(self, packet):
        """Extract source IP from Scapy packet ('Unknown' on any failure)."""
        try:
            if packet.haslayer(scapy.IP):
                return packet[scapy.IP].src
            return 'Unknown'
        except Exception:
            return 'Unknown'

    def _extract_dst_ip_scapy(self, packet):
        """Extract destination IP from Scapy packet ('Unknown' on any failure)."""
        try:
            if packet.haslayer(scapy.IP):
                return packet[scapy.IP].dst
            return 'Unknown'
        except Exception:
            return 'Unknown'

    def _extract_protocol_pyshark(self, packet):
        """Extract protocol from PyShark packet ('Unknown' on any failure)."""
        try:
            if hasattr(packet, 'transport_layer'):
                return str(packet.transport_layer)
            elif hasattr(packet, 'ip'):
                return str(packet.ip.proto)
            return 'Unknown'
        except Exception:
            return 'Unknown'

    def _extract_src_ip_pyshark(self, packet):
        """Extract source IP from PyShark packet ('Unknown' on any failure)."""
        try:
            if hasattr(packet, 'ip'):
                return packet.ip.src
            return 'Unknown'
        except Exception:
            return 'Unknown'

    def _extract_dst_ip_pyshark(self, packet):
        """Extract destination IP from PyShark packet ('Unknown' on any failure)."""
        try:
            if hasattr(packet, 'ip'):
                return packet.ip.dst
            return 'Unknown'
        except Exception:
            return 'Unknown'

    def analyze_network_file(self, file_path):
        """
        Comprehensive network file analysis with anomaly detection.

        :param file_path: Path to the network packet capture file
        :return: dict with 'summary' ({total_packets,
                 isolation_forest_anomalies}) and 'packets' (DataFrame,
                 augmented with encoded features and an 'is_anomaly' flag).
        """
        # Parse packet file
        packets_df = self.parse_pcap_file(file_path)
        if packets_df.empty:
            return {
                'summary': {
                    'total_packets': 0,
                    'isolation_forest_anomalies': 0
                },
                'packets': packets_df
            }

        # Prepare features for anomaly detection
        feature_columns = ['length', 'timestamp']
        # Handle protocol and IP as categorical features (integer codes)
        packets_df['protocol_encoded'] = pd.Categorical(packets_df['protocol']).codes
        packets_df['src_ip_encoded'] = pd.Categorical(packets_df['src_ip']).codes
        packets_df['dst_ip_encoded'] = pd.Categorical(packets_df['dst_ip']).codes
        feature_columns.extend(['protocol_encoded', 'src_ip_encoded', 'dst_ip_encoded'])

        # astype(float) guards against object-dtype columns (e.g. mixed
        # timestamp types from different parsers) reaching the scaler.
        features = packets_df[feature_columns].astype(float)

        # Scale features
        features_scaled = self.scaler.fit_transform(features)

        # Detect anomalies: IsolationForest labels outliers as -1
        anomaly_labels = self.isolation_forest.fit_predict(features_scaled)
        packets_df['is_anomaly'] = anomaly_labels == -1

        # Create summary (int() so counts are plain, JSON-friendly ints)
        summary = {
            'total_packets': len(packets_df),
            'isolation_forest_anomalies': int(packets_df['is_anomaly'].sum())
        }
        return {
            'summary': summary,
            'packets': packets_df
        }
def analyze_network_file(file_path):
    """
    Wrapper function to analyze network file and handle Gradio interface requirements.

    :param file_path: path from the Gradio File component; may be None when
        no file was uploaded.
    :return: (summary_text, packets_dataframe, error_trace) — error_trace is
        None on success; on failure the first element is an error message.
    """
    try:
        # Gradio passes None when no file is selected; guard before
        # os.path.exists, which raises TypeError on None.
        if not file_path or not os.path.exists(file_path):
            return "Error: File not found", None, "File does not exist."

        # Initialize detector
        detector = NetworkAnomalyDetector()

        # Analyze network file
        results = detector.analyze_network_file(file_path)

        # Prepare summary text
        summary_text = f"""
Anomaly Detection Results:
- Total Packets: {results['summary']['total_packets']}
- Anomalous Packets: {results['summary']['isolation_forest_anomalies']}
"""
        # Return results for display
        return summary_text, results['packets'], None
    except Exception as e:
        error_trace = traceback.format_exc()
        return f"Error: {str(e)}", None, error_trace
def create_gradio_interface():
    """
    Create Gradio interface for Network Anomaly Detector.

    :return: a gr.Blocks demo wired to analyze_network_file.
    """
    with gr.Blocks(title="Network Anomaly Detector") as demo:
        gr.Markdown("# 🌐 Network Anomaly Detector")
        gr.Markdown("Upload a network packet capture file (PCAP) for anomaly analysis.")

        with gr.Row():
            # Also accept .pcapng/.cap — the parsers handle them identically.
            file_input = gr.File(
                label="Upload PCAP File",
                type="filepath",
                file_types=['.pcap', '.pcapng', '.cap', '.pkt']
            )
            analyze_button = gr.Button("Analyze Network File", variant="primary")

        # Outputs
        summary_output = gr.Textbox(label="Analysis Summary", lines=5)
        results_dataframe = gr.DataFrame(label="Packet Details")
        # Must be visible: the click handler routes tracebacks here, and an
        # invisible textbox would silently swallow every error message.
        error_output = gr.Textbox(label="Error Trace", visible=True)

        # Event handlers
        analyze_button.click(
            fn=analyze_network_file,
            inputs=[file_input],
            outputs=[summary_output, results_dataframe, error_output]
        )

        # Explanatory text about the tool
        gr.Markdown("""
        ### How it works:
        1. Upload a PCAP (packet capture) file
        2. Click "Analyze Network File"
        3. View summary of total packets and detected anomalies
        4. Explore detailed packet information
        #### Anomaly Detection Techniques:
        - Uses Isolation Forest algorithm
        - Analyzes packet length, timestamp, protocol, and IP addresses
        - Highlights statistically unusual network traffic
        """)
    return demo
def main():
    """Entry point: verify hard dependencies, then launch the Gradio app."""
    # Hard requirements — the app cannot run without these.
    try:
        import pandas
        import numpy
        import sklearn
        import gradio
    except ImportError as e:
        print(f"Missing required library: {e}")
        print("Please install: pip install pandas numpy scikit-learn gradio")
        return

    # scapy/pyshark are optional: the top-of-module try/except guards set
    # them to None and parse_pcap_file degrades gracefully, so only warn.
    if scapy is None:
        print("Warning: scapy not installed; Scapy parsing disabled.")
    if pyshark is None:
        print("Warning: pyshark not installed; PyShark parsing disabled.")

    # Launch Gradio app
    demo = create_gradio_interface()
    demo.launch(
        server_name="0.0.0.0",  # Make accessible on local network
        share=True,             # Optional: create a public shareable link
        debug=True              # Show detailed errors
    )

if __name__ == "__main__":
    main()