Spaces:
Runtime error
Runtime error
| import os | |
| import joblib | |
| import pefile | |
| import numpy as np | |
| import pandas as pd | |
| import streamlit as st | |
| import hashlib | |
| import traceback | |
| from sklearn.ensemble import RandomForestClassifier | |
| from sklearn.model_selection import train_test_split | |
| from sklearn.metrics import accuracy_score, recall_score | |
| # Chemin vers le modèle sauvegardé | |
| MODEL_PATH = 'random_forest_model.pkl' | |
| # Fonction pour entraîner et sauvegarder le modèle | |
| def train_and_save_model(): | |
| """Entraîner et sauvegarder le modèle si nécessaire.""" | |
| st.write("Aucun modèle trouvé. Entraînement en cours...") | |
| # Chargement des données | |
| data = pd.read_csv("DatasetmalwareExtrait.csv") | |
| # Traitement des données | |
| X = data.drop(['legitimate'], axis=1) | |
| y = data['legitimate'] | |
| # Entraînement du modèle | |
| X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42) | |
| model = RandomForestClassifier( | |
| n_estimators=196, | |
| random_state=42, | |
| criterion="gini", | |
| max_depth=25, | |
| min_samples_split=4, | |
| min_samples_leaf=1 | |
| ) | |
| model.fit(X_train, y_train) | |
| # Évaluation du modèle | |
| y_pred = model.predict(X_test) | |
| accuracy = accuracy_score(y_test, y_pred) | |
| recall = recall_score(y_test, y_pred, average='weighted') | |
| st.write(f"Précision du modèle supervisé : {accuracy:.3f}") | |
| st.write(f"Rappel du modèle supervisé : {recall:.3f}") | |
| # Sauvegarde du modèle | |
| joblib.dump(model, MODEL_PATH) | |
| st.write(f"Modèle sauvegardé sous : {MODEL_PATH}") | |
| return model | |
| # Chargement ou entraînement du modèle | |
| if os.path.exists(MODEL_PATH): | |
| st.write("Chargement du modèle existant...") | |
| model = joblib.load(MODEL_PATH) | |
| else: | |
| model = train_and_save_model() | |
| # Fonction pour calculer le hash d'un fichier | |
| def calculate_file_hash(file_path): | |
| """Calculer le hash SHA-256 du fichier.""" | |
| sha256_hash = hashlib.sha256() | |
| with open(file_path, "rb") as f: | |
| for byte_block in iter(lambda: f.read(4096), b""): | |
| sha256_hash.update(byte_block) | |
| return sha256_hash.hexdigest() | |
| # Fonction pour extraire les attributs PE | |
| def extract_pe_attributes(file_path): | |
| """Extraction avancée des attributs du fichier PE.""" | |
| try: | |
| pe = pefile.PE(file_path) | |
| attributes = { | |
| 'AddressOfEntryPoint': pe.OPTIONAL_HEADER.AddressOfEntryPoint, | |
| 'MajorLinkerVersion': pe.OPTIONAL_HEADER.MajorLinkerVersion, | |
| 'MajorImageVersion': pe.OPTIONAL_HEADER.MajorImageVersion, | |
| 'MajorOperatingSystemVersion': pe.OPTIONAL_HEADER.MajorOperatingSystemVersion, | |
| 'DllCharacteristics': pe.OPTIONAL_HEADER.DllCharacteristics, | |
| 'SizeOfStackReserve': pe.OPTIONAL_HEADER.SizeOfStackReserve, | |
| 'NumberOfSections': pe.FILE_HEADER.NumberOfSections, | |
| 'ResourceSize': pe.OPTIONAL_HEADER.DATA_DIRECTORY[2].Size | |
| } | |
| return attributes | |
| except Exception as e: | |
| st.error(f"Erreur de traitement du fichier {file_path}: {str(e)}") | |
| return {"Erreur": str(e)} | |
| # Fonction de prédiction | |
| def predict_malware(file): | |
| """Prédiction de malware avec gestion d'erreurs.""" | |
| if model is None: | |
| return "Erreur : Modèle non chargé" | |
| try: | |
| # Sauvegarde temporaire du fichier | |
| temp_file = f"temp_{file.name}" | |
| with open(temp_file, "wb") as f: | |
| f.write(file.read()) | |
| # Extraire les attributs du fichier | |
| attributes = extract_pe_attributes(temp_file) | |
| if "Erreur" in attributes: | |
| return attributes["Erreur"] | |
| # Convertir en DataFrame | |
| df = pd.DataFrame([attributes]) | |
| # Prédiction | |
| prediction = model.predict(df) | |
| proba = model.predict_proba(df)[0] | |
| # Résultat avec probabilité | |
| if prediction[0] == 1: | |
| result = f"🚨 MALWARE (Probabilité: {proba[1] * 100:.2f}%)" | |
| else: | |
| result = f"✅ Fichier Légitime (Probabilité: {proba[0] * 100:.2f}%)" | |
| # Suppression du fichier temporaire | |
| os.remove(temp_file) | |
| return result | |
| except Exception as e: | |
| return f"Erreur d'analyse : {str(e)}" | |
| # Interface Streamlit | |
| st.title("🛡️ Détecteur de Malwares") | |
| st.write("Téléchargez un fichier exécutable pour analyser s'il est légitime ou un malware.") | |
| uploaded_file = st.file_uploader("Télécharger un fichier exécutable (.exe, .dll, .sys)", type=["exe", "dll", "sys"]) | |
| if uploaded_file is not None: | |
| st.write("Analyse en cours...") | |
| result = predict_malware(uploaded_file) | |
| st.success(result) | |