""" Module d'analyse prédictive et de détection d'anomalies pour AfriDataHub Created by Marino ATOHOUN - AfriDataHub Platform """ import numpy as np import pandas as pd from datetime import datetime, timedelta from django.utils import timezone from django.db.models import Q, Avg, Count from datasets.models import Dataset, DataPoint, Alert import logging logger = logging.getLogger('afridatahub') class TrendAnalyzer: """Analyseur de tendances pour les données temporelles""" def __init__(self): self.min_data_points = 3 # Minimum de points pour analyser une tendance def analyze_dataset_trend(self, dataset_id, days_back=30): """Analyse la tendance d'un dataset sur une période donnée""" try: dataset = Dataset.objects.get(id=dataset_id) end_date = timezone.now().date() start_date = end_date - timedelta(days=days_back) # Récupérer les données de la période data_points = DataPoint.objects.filter( dataset=dataset, date__gte=start_date, date__lte=end_date ).order_by('date') if data_points.count() < self.min_data_points: return None # Convertir en DataFrame pour l'analyse df = pd.DataFrame(list(data_points.values('date', 'value', 'country'))) # Analyser par pays trends = {} for country in df['country'].unique(): country_data = df[df['country'] == country].sort_values('date') if len(country_data) >= self.min_data_points: trend = self._calculate_trend(country_data['value'].values) trends[country] = trend return { 'dataset_id': dataset_id, 'dataset_title': dataset.title, 'period_days': days_back, 'trends': trends, 'analysis_date': timezone.now() } except Exception as e: logger.error(f"Erreur lors de l'analyse de tendance: {str(e)}") return None def _calculate_trend(self, values): """Calcule la tendance d'une série de valeurs""" if len(values) < 2: return {'direction': 'stable', 'strength': 0, 'change_percent': 0} # Calcul de la pente de régression linéaire x = np.arange(len(values)) slope, intercept = np.polyfit(x, values, 1) # Calcul du pourcentage de changement first_value = values[0] last_value = values[-1] change_percent = ((last_value - first_value) / first_value) * 100 if first_value != 0 else 0 # Détermination de la direction if abs(change_percent) < 5: # Seuil de 5% pour considérer comme stable direction = 'stable' elif change_percent > 0: direction = 'increasing' else: direction = 'decreasing' # Force de la tendance basée sur la corrélation correlation = np.corrcoef(x, values)[0, 1] strength = abs(correlation) if not np.isnan(correlation) else 0 return { 'direction': direction, 'strength': strength, 'change_percent': round(change_percent, 2), 'slope': slope, 'correlation': correlation } class AnomalyDetector: """Détecteur d'anomalies dans les données""" def __init__(self): self.z_threshold = 2.5 # Seuil Z-score pour détecter les anomalies def detect_anomalies(self, dataset_id, country=None): """Détecte les anomalies dans un dataset""" try: dataset = Dataset.objects.get(id=dataset_id) # Filtrer par pays si spécifié query = Q(dataset=dataset) if country: query &= Q(country=country) data_points = DataPoint.objects.filter(query).order_by('date') if data_points.count() < 10: # Minimum de données pour détecter des anomalies return [] # Convertir en DataFrame df = pd.DataFrame(list(data_points.values('id', 'date', 'value', 'country'))) anomalies = [] # Analyser par pays for country_code in df['country'].unique(): country_data = df[df['country'] == country_code] country_anomalies = self._detect_country_anomalies(country_data, dataset) anomalies.extend(country_anomalies) return anomalies except Exception as e: logger.error(f"Erreur lors de la détection d'anomalies: {str(e)}") return [] def _detect_country_anomalies(self, data, dataset): """Détecte les anomalies pour un pays spécifique""" if len(data) < 10: return [] values = data['value'].values mean_val = np.mean(values) std_val = np.std(values) if std_val == 0: # Pas de variation return [] anomalies = [] for _, row in data.iterrows(): z_score = abs((row['value'] - mean_val) / std_val) if z_score > self.z_threshold: anomaly_type = 'high' if row['value'] > mean_val else 'low' anomalies.append({ 'data_point_id': row['id'], 'dataset': dataset, 'country': row['country'], 'date': row['date'], 'value': row['value'], 'expected_range': (mean_val - 2*std_val, mean_val + 2*std_val), 'z_score': z_score, 'anomaly_type': anomaly_type, 'severity': 'high' if z_score > 3 else 'medium' }) return anomalies class AlertGenerator: """Générateur d'alertes automatiques""" def __init__(self): self.trend_analyzer = TrendAnalyzer() self.anomaly_detector = AnomalyDetector() def generate_trend_alerts(self, dataset_id): """Génère des alertes basées sur les tendances""" trend_analysis = self.trend_analyzer.analyze_dataset_trend(dataset_id) if not trend_analysis: return [] alerts = [] dataset = Dataset.objects.get(id=dataset_id) for country, trend in trend_analysis['trends'].items(): # Alerte pour forte baisse if trend['direction'] == 'decreasing' and abs(trend['change_percent']) > 20: alert = self._create_alert( dataset=dataset, country=country, alert_type='trend_down', severity='high' if abs(trend['change_percent']) > 50 else 'medium', title=f"Forte baisse détectée - {dataset.title}", message=f"Baisse de {abs(trend['change_percent']):.1f}% détectée pour {country}", current_value=trend['change_percent'] ) alerts.append(alert) # Alerte pour forte hausse elif trend['direction'] == 'increasing' and trend['change_percent'] > 30: alert = self._create_alert( dataset=dataset, country=country, alert_type='trend_up', severity='medium', title=f"Forte hausse détectée - {dataset.title}", message=f"Hausse de {trend['change_percent']:.1f}% détectée pour {country}", current_value=trend['change_percent'] ) alerts.append(alert) return alerts def generate_anomaly_alerts(self, dataset_id): """Génère des alertes basées sur les anomalies""" anomalies = self.anomaly_detector.detect_anomalies(dataset_id) alerts = [] for anomaly in anomalies: alert = self._create_alert( dataset=anomaly['dataset'], country=anomaly['country'], alert_type='anomaly', severity=anomaly['severity'], title=f"Anomalie détectée - {anomaly['dataset'].title}", message=f"Valeur anormale ({anomaly['value']}) détectée pour {anomaly['country']}", current_value=anomaly['value'], threshold_value=anomaly['expected_range'][1] if anomaly['anomaly_type'] == 'high' else anomaly['expected_range'][0] ) alerts.append(alert) return alerts def _create_alert(self, dataset, country, alert_type, severity, title, message, current_value=None, threshold_value=None): """Crée une alerte si elle n'existe pas déjà""" # Vérifier si une alerte similaire existe déjà existing_alert = Alert.objects.filter( dataset=dataset, country=country, alert_type=alert_type, is_active=True, created_at__gte=timezone.now() - timedelta(days=1) # Dans les dernières 24h ).first() if existing_alert: return existing_alert # Créer une nouvelle alerte alert = Alert.objects.create( dataset=dataset, country=country, alert_type=alert_type, severity=severity, title=title, message=message, current_value=current_value, threshold_value=threshold_value ) logger.info(f"Nouvelle alerte créée: {title} pour {country}") return alert class PredictiveAnalyzer: """Analyseur prédictif simple""" def predict_next_values(self, dataset_id, country, days_ahead=30): """Prédit les valeurs futures basées sur les tendances historiques""" try: dataset = Dataset.objects.get(id=dataset_id) # Récupérer les données historiques (90 derniers jours) end_date = timezone.now().date() start_date = end_date - timedelta(days=90) data_points = DataPoint.objects.filter( dataset=dataset, country=country, date__gte=start_date, date__lte=end_date ).order_by('date') if data_points.count() < 10: return None # Convertir en arrays dates = [dp.date for dp in data_points] values = [float(dp.value) for dp in data_points] # Régression linéaire simple x = np.arange(len(values)) slope, intercept = np.polyfit(x, values, 1) # Prédictions predictions = [] last_x = len(values) - 1 for i in range(1, days_ahead + 1): future_x = last_x + i predicted_value = slope * future_x + intercept future_date = end_date + timedelta(days=i) predictions.append({ 'date': future_date, 'predicted_value': max(0, predicted_value), # Éviter les valeurs négatives 'confidence': max(0, 1 - (i / days_ahead)) # Confiance décroissante }) return { 'dataset_id': dataset_id, 'country': country, 'predictions': predictions, 'trend_slope': slope, 'historical_data_points': len(values) } except Exception as e: logger.error(f"Erreur lors de la prédiction: {str(e)}") return None def run_automated_analysis(): """Exécute l'analyse automatisée pour tous les datasets actifs""" alert_generator = AlertGenerator() active_datasets = Dataset.objects.filter(status='active') total_alerts = 0 for dataset in active_datasets: try: # Générer des alertes de tendance trend_alerts = alert_generator.generate_trend_alerts(dataset.id) total_alerts += len(trend_alerts) # Générer des alertes d'anomalie anomaly_alerts = alert_generator.generate_anomaly_alerts(dataset.id) total_alerts += len(anomaly_alerts) except Exception as e: logger.error(f"Erreur lors de l'analyse du dataset {dataset.id}: {str(e)}") logger.info(f"Analyse automatisée terminée. {total_alerts} nouvelles alertes générées.") return total_alerts