Cyber_Security / app.py
shaheerawan3's picture
Create app.py
6aca930 verified
import streamlit as st
import pandas as pd
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import classification_report
import joblib
class NetworkAnalyzer:
    """Random-forest anomaly classifier bundled with a feature scaler.

    ``model`` is a ``RandomForestClassifier`` (100 trees, ``random_state=42``)
    and ``scaler`` is a ``StandardScaler`` used on the numeric traffic columns.
    """

    def __init__(self):
        self.model = RandomForestClassifier(n_estimators=100, random_state=42)
        self.scaler = StandardScaler()

    def preprocess_data(self, data, fit=True):
        """Return a preprocessed copy of network traffic data.

        Expected columns: packet_size, protocol_type, duration, src_bytes,
        dst_bytes. The input frame is left untouched.

        Args:
            data: DataFrame holding the columns listed above.
            fit: When True (default, the historical behaviour) the scaler is
                re-fitted on ``data`` before transforming. Pass False to reuse
                the statistics learned by an earlier fit, e.g. when scoring
                unseen traffic.

        Returns:
            A new DataFrame with ``protocol_type`` replaced by integer
            category codes and the numeric columns scaled.
        """
        # Work on a copy so the caller's frame is not silently overwritten.
        processed = data.copy()

        # Convert categorical features.
        # NOTE(review): the codes are derived from the values present in this
        # frame only, so the same protocol may map to a different code in
        # another dataset — confirm whether a fixed mapping is needed.
        processed['protocol_type'] = pd.Categorical(processed['protocol_type']).codes

        # Scale numerical features
        numeric_features = ['packet_size', 'duration', 'src_bytes', 'dst_bytes']
        scale = self.scaler.fit_transform if fit else self.scaler.transform
        processed[numeric_features] = scale(processed[numeric_features])
        return processed

    def train(self, X, y):
        """Train the model on network traffic data"""
        self.model.fit(X, y)

    def predict(self, X):
        """Make predictions on new traffic data"""
        return self.model.predict(X)

    def save_model(self, path):
        """Save the trained model to ``path`` with joblib.

        NOTE(review): only ``self.model`` is written; the fitted scaler is not
        persisted alongside it.
        """
        joblib.dump(self.model, path)

    def load_model(self, path):
        """Load a trained model from ``path``, replacing ``self.model``."""
        self.model = joblib.load(path)
def main():
    """Streamlit page: upload labelled traffic data, then train, load or apply the model.

    The uploaded CSV must contain an ``is_anomaly`` target column; the
    remaining columns are passed to ``NetworkAnalyzer.preprocess_data``.
    Data is split 80/20 with a fixed seed, so the held-out set is the same
    on every rerun for a given file.
    """
    st.title("Network Traffic Analysis Model")

    # File upload
    uploaded_file = st.file_uploader("Upload network traffic data (CSV)", type=['csv'])
    if uploaded_file is None:
        return

    # Load data and make sure the target column is present before using it.
    data = pd.read_csv(uploaded_file)
    if 'is_anomaly' not in data.columns:
        st.error("Uploaded CSV must contain an 'is_anomaly' column.")
        return

    # Streamlit re-executes this script on every widget interaction. Keeping
    # the analyzer in session state lets a model that was trained or loaded
    # by one button click still be available when another button is clicked.
    if 'analyzer' not in st.session_state:
        st.session_state['analyzer'] = NetworkAnalyzer()
        st.session_state['model_ready'] = False
    analyzer = st.session_state['analyzer']

    # Split features and target
    X = data.drop('is_anomaly', axis=1)
    y = data['is_anomaly']

    # Preprocess data
    X_processed = analyzer.preprocess_data(X)

    # Split data
    X_train, X_test, y_train, y_test = train_test_split(
        X_processed, y, test_size=0.2, random_state=42
    )

    # Train model
    if st.button('Train Model'):
        analyzer.train(X_train, y_train)
        st.session_state['model_ready'] = True

        # Evaluate on the held-out split
        y_pred = analyzer.predict(X_test)
        st.write("Model Performance:")
        st.text(classification_report(y_test, y_pred))

        # Save model
        analyzer.save_model('network_analyzer.joblib')
        st.success("Model trained and saved successfully!")

    # Load saved model
    if st.button('Load Saved Model'):
        try:
            analyzer.load_model('network_analyzer.joblib')
        except FileNotFoundError:
            st.error("No saved model found. Train a model first.")
        else:
            st.session_state['model_ready'] = True
            st.success("Model loaded successfully!")

    # Make predictions on new data
    if st.button('Analyze New Traffic'):
        # Predicting with an unfitted estimator raises, so refuse early.
        if not st.session_state.get('model_ready', False):
            st.warning("Train or load a model before analyzing traffic.")
            return
        predictions = analyzer.predict(X_test)
        st.write("Analysis Results:")
        st.write(pd.DataFrame({
            'Prediction': ['Normal' if p == 0 else 'Anomaly' for p in predictions]
        }))
# Launch the app only when this file is run as a script, not when imported.
if __name__ == "__main__":
    main()