import os import joblib import pefile import numpy as np import pandas as pd import streamlit as st import hashlib import traceback from sklearn.ensemble import RandomForestClassifier from sklearn.model_selection import train_test_split from sklearn.metrics import accuracy_score, recall_score # Chemin vers le modèle sauvegardé MODEL_PATH = 'random_forest_model.pkl' # Fonction pour entraîner et sauvegarder le modèle def train_and_save_model(): """Entraîner et sauvegarder le modèle si nécessaire.""" st.write("Aucun modèle trouvé. Entraînement en cours...") # Chargement des données data = pd.read_csv("DatasetmalwareExtrait.csv") # Traitement des données X = data.drop(['legitimate'], axis=1) y = data['legitimate'] # Entraînement du modèle X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42) model = RandomForestClassifier( n_estimators=196, random_state=42, criterion="gini", max_depth=25, min_samples_split=4, min_samples_leaf=1 ) model.fit(X_train, y_train) # Évaluation du modèle y_pred = model.predict(X_test) accuracy = accuracy_score(y_test, y_pred) recall = recall_score(y_test, y_pred, average='weighted') st.write(f"Précision du modèle supervisé : {accuracy:.3f}") st.write(f"Rappel du modèle supervisé : {recall:.3f}") # Sauvegarde du modèle joblib.dump(model, MODEL_PATH) st.write(f"Modèle sauvegardé sous : {MODEL_PATH}") return model # Chargement ou entraînement du modèle if os.path.exists(MODEL_PATH): st.write("Chargement du modèle existant...") model = joblib.load(MODEL_PATH) else: model = train_and_save_model() # Fonction pour calculer le hash d'un fichier def calculate_file_hash(file_path): """Calculer le hash SHA-256 du fichier.""" sha256_hash = hashlib.sha256() with open(file_path, "rb") as f: for byte_block in iter(lambda: f.read(4096), b""): sha256_hash.update(byte_block) return sha256_hash.hexdigest() # Fonction pour extraire les attributs PE def extract_pe_attributes(file_path): """Extraction avancée des attributs du fichier PE.""" try: pe = pefile.PE(file_path) attributes = { 'AddressOfEntryPoint': pe.OPTIONAL_HEADER.AddressOfEntryPoint, 'MajorLinkerVersion': pe.OPTIONAL_HEADER.MajorLinkerVersion, 'MajorImageVersion': pe.OPTIONAL_HEADER.MajorImageVersion, 'MajorOperatingSystemVersion': pe.OPTIONAL_HEADER.MajorOperatingSystemVersion, 'DllCharacteristics': pe.OPTIONAL_HEADER.DllCharacteristics, 'SizeOfStackReserve': pe.OPTIONAL_HEADER.SizeOfStackReserve, 'NumberOfSections': pe.FILE_HEADER.NumberOfSections, 'ResourceSize': pe.OPTIONAL_HEADER.DATA_DIRECTORY[2].Size } return attributes except Exception as e: st.error(f"Erreur de traitement du fichier {file_path}: {str(e)}") return {"Erreur": str(e)} # Fonction de prédiction def predict_malware(file): """Prédiction de malware avec gestion d'erreurs.""" if model is None: return "Erreur : Modèle non chargé" try: # Sauvegarde temporaire du fichier temp_file = f"temp_{file.name}" with open(temp_file, "wb") as f: f.write(file.read()) # Extraire les attributs du fichier attributes = extract_pe_attributes(temp_file) if "Erreur" in attributes: return attributes["Erreur"] # Convertir en DataFrame df = pd.DataFrame([attributes]) # Prédiction prediction = model.predict(df) proba = model.predict_proba(df)[0] # Résultat avec probabilité if prediction[0] == 1: result = f"🚨 MALWARE (Probabilité: {proba[1] * 100:.2f}%)" else: result = f"✅ Fichier Légitime (Probabilité: {proba[0] * 100:.2f}%)" # Suppression du fichier temporaire os.remove(temp_file) return result except Exception as e: return f"Erreur d'analyse : {str(e)}" # Interface Streamlit st.title("🛡️ Détecteur de Malwares") st.write("Téléchargez un fichier exécutable pour analyser s'il est légitime ou un malware.") uploaded_file = st.file_uploader("Télécharger un fichier exécutable (.exe, .dll, .sys)", type=["exe", "dll", "sys"]) if uploaded_file is not None: st.write("Analyse en cours...") result = predict_malware(uploaded_file) st.success(result)