import sqlite3 from datetime import datetime, timedelta import pandas as pd from sklearn.cluster import KMeans from sklearn.pipeline import Pipeline from sklearn.compose import ColumnTransformer from sklearn.feature_extraction.text import TfidfVectorizer from sklearn.preprocessing import StandardScaler, OneHotEncoder from sklearn.metrics import silhouette_score import matplotlib.pyplot as plt import sendgrid from sendgrid.helpers.mail import Mail, Email, To, Content import os from dotenv import load_dotenv # Charger les variabels d'environnement load_dotenv() SENDGRID_API_KEY = os.getenv("SENDGRID_API_KEY") FROM_EMAIL = os.getenv("FROM_EMAIL") RECIPIENT_EMAIL = os.getenv("RECIPIENT_EMAIL") # Chemin vers la DB db_path = "sqlite:///../../database/db_logsv2.db" class SecurityReport: def __init__( self, db_path=db_path, sendgrid_api_key=SENDGRID_API_KEY, from_email=FROM_EMAIL, recipient_email=RECIPIENT_EMAIL, ): self.DB_PATH = db_path self.sendgrid_api_key = sendgrid_api_key self.from_email = from_email self.recipient_email = recipient_email def query_logs(self, day:str = None) -> pd.DataFrame: """ Récupère les logs de la journée sous forme de DataFrame. Cette fonction effectue les opérations suivantes : 1. Détermine les horaires de début et de fin de la journée actuelle. 2. Établit une connexion à la base de données SQLite. 3. Exécute une requête SQL pour récupérer les logs du jour en effectuant des jointures avec les tables `prompt`, `status` et `origin` afin d'obtenir des informations détaillées sur chaque log. 4. Retourne les données sous forme d'un DataFrame pandas. Returns: pd.DataFrame: Un DataFrame contenant les logs de la journée avec les colonnes suivantes : - timestamp: Horodatage du log. - prompt: Texte de la requête. - response: Réponse associée. - status: Statut du log. - origin: Adresse IP de l'utilisateur. """ # Connection à la BDD et requete conn = sqlite3.connect(self.DB_PATH) query = """ SELECT log.timestamp AS timestamp, prompt.prompt AS prompt, prompt.response AS response, status.status AS status, origin.origin AS origin FROM log LEFT JOIN prompt ON log.id_prompt = prompt.id_prompt LEFT JOIN status ON log.id_status = status.id_status LEFT JOIN origin ON log.id_origin = origin.id_origin """ # Initialisation des paramètres params = () # Initialisation des horaires si date présente if day: start_of_day = datetime.combine(day, datetime.min.time()) end_of_day = datetime.combine(day, datetime.max.time()) query += " WHERE log.timestamp BETWEEN ? AND ?" params = (start_of_day, end_of_day) # Logs récupérés au format DataFrame df = pd.read_sql_query(query, conn, params=params) conn.close() df["timestamp"] = df["timestamp"].astype(str) df= df.fillna("unknow") return df def _create_pipeline(self): """ Crée un pipeline de prétraitement des données pour le clustering. Cette fonction met en place un pipeline de transformation des données, qui comprend : 1. Un pipeline spécifique pour les données textuelles, appliquant une vectorisation TF-IDF avec un nombre de caractéristiques limité à 50. 2. Un pipeline pour les données catégorielles, appliquant un encodage One-Hot tout en ignorant les valeurs inconnues lors de la transformation. 3. Une combinaison de ces transformations à l'aide d'un `ColumnTransformer` pour appliquer les transformations appropriées aux bonnes colonnes du dataset. 4. Un pipeline principal qui applique ces transformations et normalise les données avec `StandardScaler` (sans soustraction de la moyenne, car TF-IDF produit des matrices creuses). Returns: sklearn.pipeline.Pipeline: Un pipeline scikit-learn qui prépare les données avant leur utilisation en Machine Learning. """ # Colonnes catégorielles et textuelles categorical_features = ["status"] text_features = ["timestamp", "prompt", "response", "origin"] # Pipeline pour les données textuelles text_pipelines = { feature: Pipeline([("tfidf", TfidfVectorizer(max_features=50, stop_words=None, analyzer="word"))]) for feature in text_features } # Pipeline pour les données catgéorielles cat_pipeline = Pipeline( [("onehot", OneHotEncoder(handle_unknown="ignore", sparse_output=False))] ) # Combinaison des méthodes preprocessor = ColumnTransformer( transformers=[ ("text_" + feature, text_pipelines[feature], feature) for feature in text_features ] + [("cat", cat_pipeline, categorical_features)], remainder="drop" ) # Pipeline principale pipeline = Pipeline( [ ("preprocessor", preprocessor), ("scaler", StandardScaler(with_mean=False)), ] ) return pipeline def clustering_log(self, max_clusters:int=10) -> int: """ Effectue un clustering sur les logs journaliers et détermine le nombre optimal de clusters. Cette fonction réalise les étapes suivantes : 1. **Initialisation** : - Définit les variables pour suivre le meilleur score Silhouette et le nombre optimal de clusters. 2. **Récupération des logs journaliers** : - Charge les logs du jour via `query_daily_logs()`. 3. **Prétraitement des données** : - Applique le pipeline de transformation `_create_pipeline()` pour préparer les logs. 4. **Clustering avec K-Means** : - Teste différentes valeurs de `n_clusters` (de 2 à `max_clusters`). - Entraîne un modèle K-Means et calcule le **score de Silhouette** pour mesurer la qualité du clustering. - Identifie la valeur de `n_clusters` offrant le meilleur score. 5. **Affichage des résultats** : - Affiche les scores pour chaque nombre de clusters testé. - Retourne le nombre optimal de clusters. Args: max_clusters (int, optional): Nombre maximal de clusters à tester. Par défaut, 10. Returns: int: Le nombre optimal de clusters basé sur le meilleur score Silhouette. """ # Initialisation des paramètres best_score = -1 # Score Silhouette le plus élevé trouvé best_n_clusters = 2 # Nombre optimal de clusters # Récupère les logs journaliers logs = self.query_logs() # Prétraitement des logs preprocessor = self._create_pipeline() logs = preprocessor.fit_transform(logs) # Teste plusieurs nombres de clusters pour identifier le meilleur for n_clusters in range(2, max_clusters + 1): self.model = KMeans(n_clusters=n_clusters, random_state=0) self.model.fit(logs) # Calcul du score de Silhouette score = silhouette_score(logs, self.model.labels_) print(f"Nombre de clusters : {n_clusters}, Silhouette Score : {score:.4f}") # Mise à jour du meilleur score et du meilleur nombre de clusters if score > best_score: best_score = score best_n_clusters = n_clusters # Affichage du meilleur nombre de clusters print( f"\nMeilleur nombre de clusters : {best_n_clusters}, Silhouette Score : {best_score:.4f}" ) return best_n_clusters def generate_report(self, logs:pd.DataFrame) -> str: """ Génère un rapport HTML sur les logs journaliers, incluant des statistiques et des résultats de clustering. Cette fonction effectue les étapes suivantes : 1. **Calcul des statistiques** : - Récupère la date actuelle. - Calcule le nombre total de logs. - Effectue un comptage des occurrences de chaque statut dans les logs. - Exécute un clustering sur les logs pour déterminer le nombre de comportements différents. 2. **Construction du rapport HTML** : - Crée une page HTML contenant les statistiques sous forme de texte et de liste. - Ajoute un titre, les informations de répartition des statuts, et le nombre de clusters détectés. - Applique un style simple pour rendre le rapport lisible et structuré. 3. **Retourne le rapport sous forme de chaîne HTML** : - Le rapport est sous forme de code HTML prêt à être envoyé ou affiché. Args: logs (pd.DataFrame): Un DataFrame contenant les logs à analyser, avec au moins une colonne `status`. Returns: str: Un rapport HTML sous forme de chaîne de caractères. """ # Récupération de la date actuelle sous format dd/mm/yyyy date_str = datetime.now().strftime("%d/%m/%Y") # Nombre total de logs total_logs = len(logs) # Comptage des occurrences de chaque statut status_counts = logs["status"].value_counts().to_dict() # Exécution du clustering pour obtenir le nombre de comportements différents détectés n_clusters = self.clustering_log() # Création d'une liste HTML des statuts et de leurs occurrences status_html = "".join( f"
Nombre total de logs : {total_logs}
Répartition des statuts :
🔍 Nombre de comportements différents détectés : {n_clusters}