| | import sqlite3
|
| | from datetime import datetime, timedelta
|
| | import pandas as pd
|
| | from sklearn.cluster import KMeans
|
| | from sklearn.pipeline import Pipeline
|
| | from sklearn.compose import ColumnTransformer
|
| | from sklearn.feature_extraction.text import TfidfVectorizer
|
| | from sklearn.preprocessing import StandardScaler, OneHotEncoder
|
| | from sklearn.metrics import silhouette_score
|
| | import matplotlib.pyplot as plt
|
| | import sendgrid
|
| | from sendgrid.helpers.mail import Mail, Email, To, Content
|
| | import os
|
| | from dotenv import load_dotenv
|
| |
|
| |
|
| | load_dotenv()
|
| | SENDGRID_API_KEY = os.getenv("SENDGRID_API_KEY")
|
| | FROM_EMAIL = os.getenv("FROM_EMAIL")
|
| | RECIPIENT_EMAIL = os.getenv("RECIPIENT_EMAIL")
|
| |
|
| |
|
| | db_path = "sqlite:///../../database/db_logsv2.db"
|
| |
|
| |
|
| | class SecurityReport:
|
| | def __init__(
|
| | self,
|
| | db_path=db_path,
|
| | sendgrid_api_key=SENDGRID_API_KEY,
|
| | from_email=FROM_EMAIL,
|
| | recipient_email=RECIPIENT_EMAIL,
|
| | ):
|
| | self.DB_PATH = db_path
|
| | self.sendgrid_api_key = sendgrid_api_key
|
| | self.from_email = from_email
|
| | self.recipient_email = recipient_email
|
| |
|
| | def query_logs(self, day:str = None) -> pd.DataFrame:
|
| | """
|
| | Récupère les logs de la journée sous forme de DataFrame.
|
| |
|
| | Cette fonction effectue les opérations suivantes :
|
| | 1. Détermine les horaires de début et de fin de la journée actuelle.
|
| | 2. Établit une connexion à la base de données SQLite.
|
| | 3. Exécute une requête SQL pour récupérer les logs du jour en effectuant des jointures
|
| | avec les tables `prompt`, `status` et `origin` afin d'obtenir des informations
|
| | détaillées sur chaque log.
|
| | 4. Retourne les données sous forme d'un DataFrame pandas.
|
| |
|
| | Returns:
|
| | pd.DataFrame: Un DataFrame contenant les logs de la journée avec les colonnes suivantes :
|
| | - timestamp: Horodatage du log.
|
| | - prompt: Texte de la requête.
|
| | - response: Réponse associée.
|
| | - status: Statut du log.
|
| | - origin: Adresse IP de l'utilisateur.
|
| | """
|
| |
|
| |
|
| | conn = sqlite3.connect(self.DB_PATH)
|
| | query = """
|
| | SELECT
|
| | log.timestamp AS timestamp,
|
| | prompt.prompt AS prompt,
|
| | prompt.response AS response,
|
| | status.status AS status,
|
| | origin.origin AS origin
|
| | FROM log
|
| | LEFT JOIN prompt ON log.id_prompt = prompt.id_prompt
|
| | LEFT JOIN status ON log.id_status = status.id_status
|
| | LEFT JOIN origin ON log.id_origin = origin.id_origin
|
| | """
|
| |
|
| | params = ()
|
| |
|
| |
|
| | if day:
|
| | start_of_day = datetime.combine(day, datetime.min.time())
|
| | end_of_day = datetime.combine(day, datetime.max.time())
|
| | query += " WHERE log.timestamp BETWEEN ? AND ?"
|
| | params = (start_of_day, end_of_day)
|
| |
|
| |
|
| | df = pd.read_sql_query(query, conn, params=params)
|
| | conn.close()
|
| | df["timestamp"] = df["timestamp"].astype(str)
|
| | df= df.fillna("unknow")
|
| |
|
| | return df
|
| |
|
| | def _create_pipeline(self):
|
| | """
|
| | Crée un pipeline de prétraitement des données pour le clustering.
|
| |
|
| | Cette fonction met en place un pipeline de transformation des données, qui comprend :
|
| | 1. Un pipeline spécifique pour les données textuelles, appliquant une vectorisation TF-IDF
|
| | avec un nombre de caractéristiques limité à 50.
|
| | 2. Un pipeline pour les données catégorielles, appliquant un encodage One-Hot tout en
|
| | ignorant les valeurs inconnues lors de la transformation.
|
| | 3. Une combinaison de ces transformations à l'aide d'un `ColumnTransformer` pour appliquer
|
| | les transformations appropriées aux bonnes colonnes du dataset.
|
| | 4. Un pipeline principal qui applique ces transformations et normalise les données avec
|
| | `StandardScaler` (sans soustraction de la moyenne, car TF-IDF produit des matrices creuses).
|
| |
|
| | Returns:
|
| | sklearn.pipeline.Pipeline: Un pipeline scikit-learn qui prépare les données
|
| | avant leur utilisation en Machine Learning.
|
| | """
|
| |
|
| |
|
| | categorical_features = ["status"]
|
| | text_features = ["timestamp", "prompt", "response", "origin"]
|
| |
|
| |
|
| | text_pipelines = {
|
| | feature: Pipeline([("tfidf", TfidfVectorizer(max_features=50, stop_words=None, analyzer="word"))])
|
| | for feature in text_features
|
| | }
|
| |
|
| |
|
| | cat_pipeline = Pipeline(
|
| | [("onehot", OneHotEncoder(handle_unknown="ignore", sparse_output=False))]
|
| | )
|
| |
|
| |
|
| | preprocessor = ColumnTransformer(
|
| | transformers=[
|
| | ("text_" + feature, text_pipelines[feature], feature)
|
| | for feature in text_features
|
| | ] + [("cat", cat_pipeline, categorical_features)],
|
| | remainder="drop"
|
| | )
|
| |
|
| |
|
| | pipeline = Pipeline(
|
| | [
|
| | ("preprocessor", preprocessor),
|
| | ("scaler", StandardScaler(with_mean=False)),
|
| | ]
|
| | )
|
| |
|
| | return pipeline
|
| |
|
| | def clustering_log(self, max_clusters:int=10) -> int:
|
| | """
|
| | Effectue un clustering sur les logs journaliers et détermine le nombre optimal de clusters.
|
| |
|
| | Cette fonction réalise les étapes suivantes :
|
| | 1. **Initialisation** :
|
| | - Définit les variables pour suivre le meilleur score Silhouette et le nombre optimal de clusters.
|
| | 2. **Récupération des logs journaliers** :
|
| | - Charge les logs du jour via `query_daily_logs()`.
|
| | 3. **Prétraitement des données** :
|
| | - Applique le pipeline de transformation `_create_pipeline()` pour préparer les logs.
|
| | 4. **Clustering avec K-Means** :
|
| | - Teste différentes valeurs de `n_clusters` (de 2 à `max_clusters`).
|
| | - Entraîne un modèle K-Means et calcule le **score de Silhouette** pour mesurer la qualité du clustering.
|
| | - Identifie la valeur de `n_clusters` offrant le meilleur score.
|
| | 5. **Affichage des résultats** :
|
| | - Affiche les scores pour chaque nombre de clusters testé.
|
| | - Retourne le nombre optimal de clusters.
|
| |
|
| | Args:
|
| | max_clusters (int, optional): Nombre maximal de clusters à tester. Par défaut, 10.
|
| |
|
| | Returns:
|
| | int: Le nombre optimal de clusters basé sur le meilleur score Silhouette.
|
| | """
|
| |
|
| |
|
| | best_score = -1
|
| | best_n_clusters = 2
|
| |
|
| |
|
| | logs = self.query_logs()
|
| |
|
| |
|
| | preprocessor = self._create_pipeline()
|
| | logs = preprocessor.fit_transform(logs)
|
| |
|
| |
|
| | for n_clusters in range(2, max_clusters + 1):
|
| | self.model = KMeans(n_clusters=n_clusters, random_state=0)
|
| | self.model.fit(logs)
|
| |
|
| |
|
| | score = silhouette_score(logs, self.model.labels_)
|
| | print(f"Nombre de clusters : {n_clusters}, Silhouette Score : {score:.4f}")
|
| |
|
| |
|
| | if score > best_score:
|
| | best_score = score
|
| | best_n_clusters = n_clusters
|
| |
|
| |
|
| | print(
|
| | f"\nMeilleur nombre de clusters : {best_n_clusters}, Silhouette Score : {best_score:.4f}"
|
| | )
|
| |
|
| | return best_n_clusters
|
| |
|
| | def generate_report(self, logs:pd.DataFrame) -> str:
|
| | """
|
| | Génère un rapport HTML sur les logs journaliers, incluant des statistiques et des résultats de clustering.
|
| |
|
| | Cette fonction effectue les étapes suivantes :
|
| | 1. **Calcul des statistiques** :
|
| | - Récupère la date actuelle.
|
| | - Calcule le nombre total de logs.
|
| | - Effectue un comptage des occurrences de chaque statut dans les logs.
|
| | - Exécute un clustering sur les logs pour déterminer le nombre de comportements différents.
|
| | 2. **Construction du rapport HTML** :
|
| | - Crée une page HTML contenant les statistiques sous forme de texte et de liste.
|
| | - Ajoute un titre, les informations de répartition des statuts, et le nombre de clusters détectés.
|
| | - Applique un style simple pour rendre le rapport lisible et structuré.
|
| | 3. **Retourne le rapport sous forme de chaîne HTML** :
|
| | - Le rapport est sous forme de code HTML prêt à être envoyé ou affiché.
|
| |
|
| | Args:
|
| | logs (pd.DataFrame): Un DataFrame contenant les logs à analyser, avec au moins une colonne `status`.
|
| |
|
| | Returns:
|
| | str: Un rapport HTML sous forme de chaîne de caractères.
|
| | """
|
| |
|
| |
|
| | date_str = datetime.now().strftime("%d/%m/%Y")
|
| |
|
| |
|
| | total_logs = len(logs)
|
| |
|
| |
|
| | status_counts = logs["status"].value_counts().to_dict()
|
| |
|
| |
|
| | n_clusters = self.clustering_log()
|
| |
|
| |
|
| | status_html = "".join(
|
| | f"<li><strong>{status}:</strong> {count}</li>"
|
| | for status, count in status_counts.items()
|
| | )
|
| |
|
| |
|
| | report = f"""
|
| | <html>
|
| | <head>
|
| | <style>
|
| | body {{
|
| | font-family: Arial, sans-serif;
|
| | color: #333;
|
| | line-height: 1.6;
|
| | }}
|
| | .container {{
|
| | max-width: 600px;
|
| | margin: 20px auto;
|
| | padding: 20px;
|
| | border: 1px solid #ddd;
|
| | border-radius: 8px;
|
| | background-color: #f9f9f9;
|
| | }}
|
| | h2 {{
|
| | background-color: #007BFF;
|
| | color: white;
|
| | padding: 10px;
|
| | border-radius: 5px;
|
| | text-align: center;
|
| | }}
|
| | ul {{
|
| | list-style-type: none;
|
| | padding: 0;
|
| | }}
|
| | li {{
|
| | padding: 5px 0;
|
| | }}
|
| | .footer {{
|
| | margin-top: 20px;
|
| | font-size: 12px;
|
| | text-align: center;
|
| | color: #777;
|
| | }}
|
| | </style>
|
| | </head>
|
| | <body>
|
| | <div class="container">
|
| | <h2>📋 Rapport de Sécurité - {date_str}</h2>
|
| | <p><u><strong>Nombre total de logs :</strong></u> {total_logs}</p>
|
| | <p><u><strong>Répartition des statuts :</strong></u></p>
|
| | <ul>
|
| | {status_html}
|
| | </ul>
|
| | <p><u><strong>🔍 Nombre de comportements différents détectés :</strong></u> {n_clusters}</p>
|
| | <div class="footer">
|
| | Rapport généré avec amour et passion par le système de surveillance. 🫶 🛡️
|
| | </div>
|
| | </div>
|
| | </body>
|
| | </html>
|
| | """
|
| |
|
| | return report
|
| |
|
| | def send_email(self, subject, body):
|
| | """
|
| | Envoie un email en utilisant l'API SendGrid.
|
| |
|
| | Cette fonction réalise les étapes suivantes :
|
| | 1. **Initialisation de l'API SendGrid** :
|
| | - Utilise la clé API de SendGrid (`self.sendgrid_api_key`) pour configurer l'accès à l'API.
|
| | 2. **Préparation du contenu de l'email** :
|
| | - Définit l'expéditeur (`from_email`), le destinataire (`to_email`), le sujet (`subject`)
|
| | et le corps de l'email (`body`), qui est en format HTML.
|
| | 3. **Envoi de l'email** :
|
| | - Envoie l'email via l'API SendGrid en utilisant la méthode `send.post`.
|
| | 4. **Gestion des erreurs** :
|
| | - Si l'envoi échoue, un message d'erreur est affiché.
|
| |
|
| | Args:
|
| | subject (str): Le sujet de l'email.
|
| | body (str): Le contenu de l'email en format HTML.
|
| |
|
| | Returns:
|
| | None: Si l'email est envoyé avec succès, aucun retour n'est généré,
|
| | sinon un message d'erreur est imprimé.
|
| | """
|
| |
|
| |
|
| | sg = sendgrid.SendGridAPIClient(api_key=self.sendgrid_api_key)
|
| |
|
| |
|
| | from_email = Email(self.from_email)
|
| | to_email = To(self.recipient_email)
|
| | content = Content("text/html", body)
|
| |
|
| |
|
| | mail = Mail(from_email, to_email, subject, content)
|
| |
|
| | try:
|
| |
|
| | response = sg.client.mail.send.post(request_body=mail.get())
|
| | print(f"Email envoyé avec succès: {response.status_code}")
|
| | except Exception as e:
|
| |
|
| | print(f"Erreur, email non-envoyé: {e}")
|
| |
|
| | def run_report(self):
|
| | """
|
| | Exécute le rapport journalier de sécurité et l'envoie par email.
|
| |
|
| | Cette fonction réalise les étapes suivantes :
|
| | 1. **Récupération des logs journaliers** :
|
| | - Utilise la méthode `query_daily_logs()` pour obtenir les logs du jour à analyser.
|
| | 2. **Génération du rapport** :
|
| | - Utilise la méthode `generate_report()` pour créer un rapport HTML contenant les statistiques et autres informations pertinentes sur les logs.
|
| | 3. **Envoi du rapport par email** :
|
| | - Utilise la méthode `send_email()` pour envoyer l'email avec le rapport généré en pièce jointe dans le corps du message.
|
| |
|
| | Returns:
|
| | None: Cette fonction n'a pas de valeur de retour. Elle exécute des actions (générer et envoyer un rapport).
|
| | """
|
| |
|
| |
|
| | logs = self.query_logs()
|
| |
|
| |
|
| | report = self.generate_report(logs)
|
| |
|
| |
|
| | self.send_email(subject="Rapport de sécurité journalier", body=report)
|
| |
|