Spaces:
Runtime error
Runtime error
| import streamlit as st | |
| import pandas as pd | |
| import numpy as np | |
| from sklearn.ensemble import RandomForestClassifier | |
| from sklearn.model_selection import train_test_split | |
| from sklearn.preprocessing import StandardScaler | |
| from sklearn.metrics import classification_report | |
| import joblib | |
| class NetworkAnalyzer: | |
| def __init__(self): | |
| self.model = RandomForestClassifier(n_estimators=100, random_state=42) | |
| self.scaler = StandardScaler() | |
| def preprocess_data(self, data): | |
| """ | |
| Preprocess network traffic data | |
| Features: packet_size, protocol_type, duration, src_bytes, dst_bytes | |
| """ | |
| # Convert categorical features | |
| data['protocol_type'] = pd.Categorical(data['protocol_type']).codes | |
| # Scale numerical features | |
| numeric_features = ['packet_size', 'duration', 'src_bytes', 'dst_bytes'] | |
| data[numeric_features] = self.scaler.fit_transform(data[numeric_features]) | |
| return data | |
| def train(self, X, y): | |
| """Train the model on network traffic data""" | |
| self.model.fit(X, y) | |
| def predict(self, X): | |
| """Make predictions on new traffic data""" | |
| return self.model.predict(X) | |
| def save_model(self, path): | |
| """Save the trained model""" | |
| joblib.dump(self.model, path) | |
| def load_model(self, path): | |
| """Load a trained model""" | |
| self.model = joblib.load(path) | |
| def main(): | |
| st.title("Network Traffic Analysis Model") | |
| # File upload | |
| uploaded_file = st.file_uploader("Upload network traffic data (CSV)", type=['csv']) | |
| if uploaded_file is not None: | |
| # Load and preprocess data | |
| data = pd.read_csv(uploaded_file) | |
| # Initialize model | |
| analyzer = NetworkAnalyzer() | |
| # Split features and target | |
| X = data.drop('is_anomaly', axis=1) | |
| y = data['is_anomaly'] | |
| # Preprocess data | |
| X_processed = analyzer.preprocess_data(X) | |
| # Split data | |
| X_train, X_test, y_train, y_test = train_test_split( | |
| X_processed, y, test_size=0.2, random_state=42 | |
| ) | |
| # Train model | |
| if st.button('Train Model'): | |
| analyzer.train(X_train, y_train) | |
| # Make predictions | |
| y_pred = analyzer.predict(X_test) | |
| # Display results | |
| st.write("Model Performance:") | |
| st.text(classification_report(y_test, y_pred)) | |
| # Save model | |
| analyzer.save_model('network_analyzer.joblib') | |
| st.success("Model trained and saved successfully!") | |
| # Load saved model | |
| if st.button('Load Saved Model'): | |
| analyzer.load_model('network_analyzer.joblib') | |
| st.success("Model loaded successfully!") | |
| # Make predictions on new data | |
| if st.button('Analyze New Traffic'): | |
| predictions = analyzer.predict(X_test) | |
| st.write("Analysis Results:") | |
| st.write(pd.DataFrame({ | |
| 'Prediction': ['Normal' if p == 0 else 'Anomaly' for p in predictions] | |
| })) | |
| if __name__ == '__main__': | |
| main() |