import streamlit as st import pandas as pd import numpy as np from sklearn.ensemble import RandomForestClassifier from sklearn.model_selection import train_test_split from sklearn.preprocessing import StandardScaler from sklearn.metrics import classification_report import joblib class NetworkAnalyzer: def __init__(self): self.model = RandomForestClassifier(n_estimators=100, random_state=42) self.scaler = StandardScaler() def preprocess_data(self, data): """ Preprocess network traffic data Features: packet_size, protocol_type, duration, src_bytes, dst_bytes """ # Convert categorical features data['protocol_type'] = pd.Categorical(data['protocol_type']).codes # Scale numerical features numeric_features = ['packet_size', 'duration', 'src_bytes', 'dst_bytes'] data[numeric_features] = self.scaler.fit_transform(data[numeric_features]) return data def train(self, X, y): """Train the model on network traffic data""" self.model.fit(X, y) def predict(self, X): """Make predictions on new traffic data""" return self.model.predict(X) def save_model(self, path): """Save the trained model""" joblib.dump(self.model, path) def load_model(self, path): """Load a trained model""" self.model = joblib.load(path) def main(): st.title("Network Traffic Analysis Model") # File upload uploaded_file = st.file_uploader("Upload network traffic data (CSV)", type=['csv']) if uploaded_file is not None: # Load and preprocess data data = pd.read_csv(uploaded_file) # Initialize model analyzer = NetworkAnalyzer() # Split features and target X = data.drop('is_anomaly', axis=1) y = data['is_anomaly'] # Preprocess data X_processed = analyzer.preprocess_data(X) # Split data X_train, X_test, y_train, y_test = train_test_split( X_processed, y, test_size=0.2, random_state=42 ) # Train model if st.button('Train Model'): analyzer.train(X_train, y_train) # Make predictions y_pred = analyzer.predict(X_test) # Display results st.write("Model Performance:") st.text(classification_report(y_test, y_pred)) # Save model analyzer.save_model('network_analyzer.joblib') st.success("Model trained and saved successfully!") # Load saved model if st.button('Load Saved Model'): analyzer.load_model('network_analyzer.joblib') st.success("Model loaded successfully!") # Make predictions on new data if st.button('Analyze New Traffic'): predictions = analyzer.predict(X_test) st.write("Analysis Results:") st.write(pd.DataFrame({ 'Prediction': ['Normal' if p == 0 else 'Anomaly' for p in predictions] })) if __name__ == '__main__': main()