SmartRescue / src /security /security_report.py
maxenceLIOGIER's picture
Upload folder using huggingface_hub
c642aa9 verified
raw
history blame
15.5 kB
import sqlite3
from datetime import datetime, timedelta
import pandas as pd
from sklearn.cluster import KMeans
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.metrics import silhouette_score
import matplotlib.pyplot as plt
import sendgrid
from sendgrid.helpers.mail import Mail, Email, To, Content
import os
from dotenv import load_dotenv
# Charger les variabels d'environnement
load_dotenv()
SENDGRID_API_KEY = os.getenv("SENDGRID_API_KEY")
FROM_EMAIL = os.getenv("FROM_EMAIL")
RECIPIENT_EMAIL = os.getenv("RECIPIENT_EMAIL")
# Chemin vers la DB
db_path = "sqlite:///../../database/db_logsv2.db"
class SecurityReport:
def __init__(
self,
db_path=db_path,
sendgrid_api_key=SENDGRID_API_KEY,
from_email=FROM_EMAIL,
recipient_email=RECIPIENT_EMAIL,
):
self.DB_PATH = db_path
self.sendgrid_api_key = sendgrid_api_key
self.from_email = from_email
self.recipient_email = recipient_email
def query_logs(self, day:str = None) -> pd.DataFrame:
"""
Récupère les logs de la journée sous forme de DataFrame.
Cette fonction effectue les opérations suivantes :
1. Détermine les horaires de début et de fin de la journée actuelle.
2. Établit une connexion à la base de données SQLite.
3. Exécute une requête SQL pour récupérer les logs du jour en effectuant des jointures
avec les tables `prompt`, `status` et `origin` afin d'obtenir des informations
détaillées sur chaque log.
4. Retourne les données sous forme d'un DataFrame pandas.
Returns:
pd.DataFrame: Un DataFrame contenant les logs de la journée avec les colonnes suivantes :
- timestamp: Horodatage du log.
- prompt: Texte de la requête.
- response: Réponse associée.
- status: Statut du log.
- origin: Adresse IP de l'utilisateur.
"""
# Connection à la BDD et requete
conn = sqlite3.connect(self.DB_PATH)
query = """
SELECT
log.timestamp AS timestamp,
prompt.prompt AS prompt,
prompt.response AS response,
status.status AS status,
origin.origin AS origin
FROM log
LEFT JOIN prompt ON log.id_prompt = prompt.id_prompt
LEFT JOIN status ON log.id_status = status.id_status
LEFT JOIN origin ON log.id_origin = origin.id_origin
"""
# Initialisation des paramètres
params = ()
# Initialisation des horaires si date présente
if day:
start_of_day = datetime.combine(day, datetime.min.time())
end_of_day = datetime.combine(day, datetime.max.time())
query += " WHERE log.timestamp BETWEEN ? AND ?"
params = (start_of_day, end_of_day)
# Logs récupérés au format DataFrame
df = pd.read_sql_query(query, conn, params=params)
conn.close()
df["timestamp"] = df["timestamp"].astype(str)
df= df.fillna("unknow")
return df
def _create_pipeline(self):
"""
Crée un pipeline de prétraitement des données pour le clustering.
Cette fonction met en place un pipeline de transformation des données, qui comprend :
1. Un pipeline spécifique pour les données textuelles, appliquant une vectorisation TF-IDF
avec un nombre de caractéristiques limité à 50.
2. Un pipeline pour les données catégorielles, appliquant un encodage One-Hot tout en
ignorant les valeurs inconnues lors de la transformation.
3. Une combinaison de ces transformations à l'aide d'un `ColumnTransformer` pour appliquer
les transformations appropriées aux bonnes colonnes du dataset.
4. Un pipeline principal qui applique ces transformations et normalise les données avec
`StandardScaler` (sans soustraction de la moyenne, car TF-IDF produit des matrices creuses).
Returns:
sklearn.pipeline.Pipeline: Un pipeline scikit-learn qui prépare les données
avant leur utilisation en Machine Learning.
"""
# Colonnes catégorielles et textuelles
categorical_features = ["status"]
text_features = ["timestamp", "prompt", "response", "origin"]
# Pipeline pour les données textuelles
text_pipelines = {
feature: Pipeline([("tfidf", TfidfVectorizer(max_features=50, stop_words=None, analyzer="word"))])
for feature in text_features
}
# Pipeline pour les données catgéorielles
cat_pipeline = Pipeline(
[("onehot", OneHotEncoder(handle_unknown="ignore", sparse_output=False))]
)
# Combinaison des méthodes
preprocessor = ColumnTransformer(
transformers=[
("text_" + feature, text_pipelines[feature], feature)
for feature in text_features
] + [("cat", cat_pipeline, categorical_features)],
remainder="drop"
)
# Pipeline principale
pipeline = Pipeline(
[
("preprocessor", preprocessor),
("scaler", StandardScaler(with_mean=False)),
]
)
return pipeline
def clustering_log(self, max_clusters:int=10) -> int:
"""
Effectue un clustering sur les logs journaliers et détermine le nombre optimal de clusters.
Cette fonction réalise les étapes suivantes :
1. **Initialisation** :
- Définit les variables pour suivre le meilleur score Silhouette et le nombre optimal de clusters.
2. **Récupération des logs journaliers** :
- Charge les logs du jour via `query_daily_logs()`.
3. **Prétraitement des données** :
- Applique le pipeline de transformation `_create_pipeline()` pour préparer les logs.
4. **Clustering avec K-Means** :
- Teste différentes valeurs de `n_clusters` (de 2 à `max_clusters`).
- Entraîne un modèle K-Means et calcule le **score de Silhouette** pour mesurer la qualité du clustering.
- Identifie la valeur de `n_clusters` offrant le meilleur score.
5. **Affichage des résultats** :
- Affiche les scores pour chaque nombre de clusters testé.
- Retourne le nombre optimal de clusters.
Args:
max_clusters (int, optional): Nombre maximal de clusters à tester. Par défaut, 10.
Returns:
int: Le nombre optimal de clusters basé sur le meilleur score Silhouette.
"""
# Initialisation des paramètres
best_score = -1 # Score Silhouette le plus élevé trouvé
best_n_clusters = 2 # Nombre optimal de clusters
# Récupère les logs journaliers
logs = self.query_logs()
# Prétraitement des logs
preprocessor = self._create_pipeline()
logs = preprocessor.fit_transform(logs)
# Teste plusieurs nombres de clusters pour identifier le meilleur
for n_clusters in range(2, max_clusters + 1):
self.model = KMeans(n_clusters=n_clusters, random_state=0)
self.model.fit(logs)
# Calcul du score de Silhouette
score = silhouette_score(logs, self.model.labels_)
print(f"Nombre de clusters : {n_clusters}, Silhouette Score : {score:.4f}")
# Mise à jour du meilleur score et du meilleur nombre de clusters
if score > best_score:
best_score = score
best_n_clusters = n_clusters
# Affichage du meilleur nombre de clusters
print(
f"\nMeilleur nombre de clusters : {best_n_clusters}, Silhouette Score : {best_score:.4f}"
)
return best_n_clusters
def generate_report(self, logs:pd.DataFrame) -> str:
"""
Génère un rapport HTML sur les logs journaliers, incluant des statistiques et des résultats de clustering.
Cette fonction effectue les étapes suivantes :
1. **Calcul des statistiques** :
- Récupère la date actuelle.
- Calcule le nombre total de logs.
- Effectue un comptage des occurrences de chaque statut dans les logs.
- Exécute un clustering sur les logs pour déterminer le nombre de comportements différents.
2. **Construction du rapport HTML** :
- Crée une page HTML contenant les statistiques sous forme de texte et de liste.
- Ajoute un titre, les informations de répartition des statuts, et le nombre de clusters détectés.
- Applique un style simple pour rendre le rapport lisible et structuré.
3. **Retourne le rapport sous forme de chaîne HTML** :
- Le rapport est sous forme de code HTML prêt à être envoyé ou affiché.
Args:
logs (pd.DataFrame): Un DataFrame contenant les logs à analyser, avec au moins une colonne `status`.
Returns:
str: Un rapport HTML sous forme de chaîne de caractères.
"""
# Récupération de la date actuelle sous format dd/mm/yyyy
date_str = datetime.now().strftime("%d/%m/%Y")
# Nombre total de logs
total_logs = len(logs)
# Comptage des occurrences de chaque statut
status_counts = logs["status"].value_counts().to_dict()
# Exécution du clustering pour obtenir le nombre de comportements différents détectés
n_clusters = self.clustering_log()
# Création d'une liste HTML des statuts et de leurs occurrences
status_html = "".join(
f"<li><strong>{status}:</strong> {count}</li>"
for status, count in status_counts.items()
)
# Construction du rapport HTML
report = f"""
<html>
<head>
<style>
body {{
font-family: Arial, sans-serif;
color: #333;
line-height: 1.6;
}}
.container {{
max-width: 600px;
margin: 20px auto;
padding: 20px;
border: 1px solid #ddd;
border-radius: 8px;
background-color: #f9f9f9;
}}
h2 {{
background-color: #007BFF;
color: white;
padding: 10px;
border-radius: 5px;
text-align: center;
}}
ul {{
list-style-type: none;
padding: 0;
}}
li {{
padding: 5px 0;
}}
.footer {{
margin-top: 20px;
font-size: 12px;
text-align: center;
color: #777;
}}
</style>
</head>
<body>
<div class="container">
<h2>📋 Rapport de Sécurité - {date_str}</h2>
<p><u><strong>Nombre total de logs :</strong></u> {total_logs}</p>
<p><u><strong>Répartition des statuts :</strong></u></p>
<ul>
{status_html}
</ul>
<p><u><strong>🔍 Nombre de comportements différents détectés :</strong></u> {n_clusters}</p>
<div class="footer">
Rapport généré avec amour et passion par le système de surveillance. 🫶 🛡️
</div>
</div>
</body>
</html>
"""
return report
def send_email(self, subject, body):
"""
Envoie un email en utilisant l'API SendGrid.
Cette fonction réalise les étapes suivantes :
1. **Initialisation de l'API SendGrid** :
- Utilise la clé API de SendGrid (`self.sendgrid_api_key`) pour configurer l'accès à l'API.
2. **Préparation du contenu de l'email** :
- Définit l'expéditeur (`from_email`), le destinataire (`to_email`), le sujet (`subject`)
et le corps de l'email (`body`), qui est en format HTML.
3. **Envoi de l'email** :
- Envoie l'email via l'API SendGrid en utilisant la méthode `send.post`.
4. **Gestion des erreurs** :
- Si l'envoi échoue, un message d'erreur est affiché.
Args:
subject (str): Le sujet de l'email.
body (str): Le contenu de l'email en format HTML.
Returns:
None: Si l'email est envoyé avec succès, aucun retour n'est généré,
sinon un message d'erreur est imprimé.
"""
# Initialisation de l'API SendGrid
sg = sendgrid.SendGridAPIClient(api_key=self.sendgrid_api_key)
# Création des objets pour l'expéditeur, le destinataire et le contenu
from_email = Email(self.from_email)
to_email = To(self.recipient_email)
content = Content("text/html", body)
# Création de l'objet Mail avec les informations nécessaires
mail = Mail(from_email, to_email, subject, content)
try:
# Envoi de l'email via l'API SendGrid
response = sg.client.mail.send.post(request_body=mail.get())
print(f"Email envoyé avec succès: {response.status_code}")
except Exception as e:
# Si une erreur survient, affichage du message d'erreur
print(f"Erreur, email non-envoyé: {e}")
def run_report(self):
"""
Exécute le rapport journalier de sécurité et l'envoie par email.
Cette fonction réalise les étapes suivantes :
1. **Récupération des logs journaliers** :
- Utilise la méthode `query_daily_logs()` pour obtenir les logs du jour à analyser.
2. **Génération du rapport** :
- Utilise la méthode `generate_report()` pour créer un rapport HTML contenant les statistiques et autres informations pertinentes sur les logs.
3. **Envoi du rapport par email** :
- Utilise la méthode `send_email()` pour envoyer l'email avec le rapport généré en pièce jointe dans le corps du message.
Returns:
None: Cette fonction n'a pas de valeur de retour. Elle exécute des actions (générer et envoyer un rapport).
"""
# Récupérer les logs de la journée
logs = self.query_logs()
# Générer un rapport à partir des logs récupérés
report = self.generate_report(logs)
# Envoi du rapport par email
self.send_email(subject="Rapport de sécurité journalier", body=report)