Spaces:
Sleeping
Sleeping
| """ | |
| Module d'analyse prédictive et de détection d'anomalies pour AfriDataHub | |
| Created by Marino ATOHOUN - AfriDataHub Platform | |
| """ | |
| import numpy as np | |
| import pandas as pd | |
| from datetime import datetime, timedelta | |
| from django.utils import timezone | |
| from django.db.models import Q, Avg, Count | |
| from datasets.models import Dataset, DataPoint, Alert | |
| import logging | |
| logger = logging.getLogger('afridatahub') | |
class TrendAnalyzer:
    """Trend analyzer for time-series dataset values, grouped by country."""

    def __init__(self):
        # Minimum number of observations required before a trend is computed.
        self.min_data_points = 3

    def analyze_dataset_trend(self, dataset_id, days_back=30):
        """Analyze the per-country trend of a dataset over a look-back window.

        Args:
            dataset_id: Primary key of the Dataset to analyze.
            days_back: Size of the look-back window in days (default 30).

        Returns:
            A dict with dataset metadata and a per-country trend mapping,
            or None when there is not enough data or an error occurred
            (errors are logged, never raised to the caller).
        """
        try:
            dataset = Dataset.objects.get(id=dataset_id)
            end_date = timezone.now().date()
            start_date = end_date - timedelta(days=days_back)

            # Fetch the data points inside the analysis window, oldest first.
            data_points = DataPoint.objects.filter(
                dataset=dataset,
                date__gte=start_date,
                date__lte=end_date
            ).order_by('date')

            if data_points.count() < self.min_data_points:
                return None

            # Work in pandas to group the observations per country.
            df = pd.DataFrame(list(data_points.values('date', 'value', 'country')))

            trends = {}
            for country in df['country'].unique():
                country_data = df[df['country'] == country].sort_values('date')
                if len(country_data) >= self.min_data_points:
                    trends[country] = self._calculate_trend(country_data['value'].values)

            return {
                'dataset_id': dataset_id,
                'dataset_title': dataset.title,
                'period_days': days_back,
                'trends': trends,
                'analysis_date': timezone.now()
            }
        except Exception as e:
            logger.error(f"Erreur lors de l'analyse de tendance: {str(e)}")
            return None

    def _calculate_trend(self, values):
        """Compute the trend of a 1-D sequence of numeric values.

        Returns a dict with keys: direction ('stable'|'increasing'|
        'decreasing'), strength (|Pearson r| vs. time), change_percent,
        slope and correlation. The schema is now identical on every code
        path (the original short-circuit omitted slope/correlation).
        """
        if len(values) < 2:
            # Fix: return the same keys as the main path so callers can
            # rely on a stable result schema.
            return {
                'direction': 'stable',
                'strength': 0,
                'change_percent': 0,
                'slope': 0.0,
                'correlation': 0.0,
            }

        # Linear-regression slope over the sample index.
        x = np.arange(len(values))
        slope, intercept = np.polyfit(x, values, 1)

        # Percent change between the first and last observation; guard
        # against division by zero when the series starts at 0.
        first_value = values[0]
        last_value = values[-1]
        change_percent = ((last_value - first_value) / first_value) * 100 if first_value != 0 else 0

        # An absolute change below 5% is considered stable.
        if abs(change_percent) < 5:
            direction = 'stable'
        elif change_percent > 0:
            direction = 'increasing'
        else:
            direction = 'decreasing'

        # Trend strength is the absolute correlation of values with time
        # (NaN — e.g. for a constant series — is treated as no trend).
        correlation = np.corrcoef(x, values)[0, 1]
        strength = abs(correlation) if not np.isnan(correlation) else 0

        return {
            'direction': direction,
            'strength': strength,
            'change_percent': round(change_percent, 2),
            'slope': slope,
            'correlation': correlation
        }
class AnomalyDetector:
    """Z-score based anomaly detector for dataset values."""

    def __init__(self):
        # Z-score above which a value is flagged as anomalous.
        self.z_threshold = 2.5

    def detect_anomalies(self, dataset_id, country=None):
        """Detect anomalous data points of a dataset, per country.

        Args:
            dataset_id: Primary key of the Dataset to scan.
            country: Optional country code to restrict the scan.

        Returns:
            A list of anomaly dicts (possibly empty). Errors are logged
            and yield an empty list instead of raising.
        """
        try:
            dataset = Dataset.objects.get(id=dataset_id)

            # Optionally narrow the scan to a single country.
            query = Q(dataset=dataset)
            if country:
                query &= Q(country=country)

            data_points = DataPoint.objects.filter(query).order_by('date')
            if data_points.count() < 10:  # Too few points for statistics.
                return []

            df = pd.DataFrame(list(data_points.values('id', 'date', 'value', 'country')))

            anomalies = []
            for country_code in df['country'].unique():
                country_data = df[df['country'] == country_code]
                anomalies.extend(self._detect_country_anomalies(country_data, dataset))
            return anomalies
        except Exception as e:
            logger.error(f"Erreur lors de la détection d'anomalies: {str(e)}")
            return []

    def _detect_country_anomalies(self, data, dataset):
        """Flag rows of one country whose |z-score| exceeds the threshold.

        Args:
            data: DataFrame with 'id', 'date', 'value', 'country' columns.
            dataset: Dataset instance, attached to each anomaly record.

        Returns:
            A list of anomaly dicts; empty when there are fewer than 10
            rows or the values have no variation.
        """
        if len(data) < 10:
            return []

        values = data['value'].values
        mean_val = np.mean(values)
        std_val = np.std(values)
        if std_val == 0:  # No variation: nothing can be anomalous.
            return []

        anomalies = []
        for _, row in data.iterrows():
            z_score = abs((row['value'] - mean_val) / std_val)
            if z_score > self.z_threshold:
                anomaly_type = 'high' if row['value'] > mean_val else 'low'
                anomalies.append({
                    'data_point_id': row['id'],
                    'dataset': dataset,
                    'country': row['country'],
                    'date': row['date'],
                    'value': row['value'],
                    # Fix: report the range implied by the configured
                    # threshold (was hard-coded to +/-2*std while the
                    # detection threshold is z_threshold = 2.5).
                    'expected_range': (mean_val - self.z_threshold * std_val,
                                       mean_val + self.z_threshold * std_val),
                    'z_score': z_score,
                    'anomaly_type': anomaly_type,
                    'severity': 'high' if z_score > 3 else 'medium'
                })
        return anomalies
class AlertGenerator:
    """Produces alerts from trend analyses and anomaly detections."""

    def __init__(self):
        self.trend_analyzer = TrendAnalyzer()
        self.anomaly_detector = AnomalyDetector()

    def generate_trend_alerts(self, dataset_id):
        """Create alerts for strong per-country trends of one dataset.

        Returns the list of Alert objects (existing or newly created);
        empty when no trend analysis is available.
        """
        analysis = self.trend_analyzer.analyze_dataset_trend(dataset_id)
        if not analysis:
            return []

        dataset = Dataset.objects.get(id=dataset_id)
        generated = []
        for country, trend in analysis['trends'].items():
            pct = trend['change_percent']
            direction = trend['direction']

            if direction == 'decreasing' and abs(pct) > 20:
                # Strong drop: severity scales with magnitude (>50% is high).
                generated.append(self._create_alert(
                    dataset=dataset,
                    country=country,
                    alert_type='trend_down',
                    severity='high' if abs(pct) > 50 else 'medium',
                    title=f"Forte baisse détectée - {dataset.title}",
                    message=f"Baisse de {abs(pct):.1f}% détectée pour {country}",
                    current_value=pct
                ))
            elif direction == 'increasing' and pct > 30:
                # Strong rise: always medium severity.
                generated.append(self._create_alert(
                    dataset=dataset,
                    country=country,
                    alert_type='trend_up',
                    severity='medium',
                    title=f"Forte hausse détectée - {dataset.title}",
                    message=f"Hausse de {pct:.1f}% détectée pour {country}",
                    current_value=pct
                ))
        return generated

    def generate_anomaly_alerts(self, dataset_id):
        """Create one alert per anomaly detected in the dataset."""
        generated = []
        for anomaly in self.anomaly_detector.detect_anomalies(dataset_id):
            # Threshold is the breached side of the expected range.
            breached = (anomaly['expected_range'][1]
                        if anomaly['anomaly_type'] == 'high'
                        else anomaly['expected_range'][0])
            generated.append(self._create_alert(
                dataset=anomaly['dataset'],
                country=anomaly['country'],
                alert_type='anomaly',
                severity=anomaly['severity'],
                title=f"Anomalie détectée - {anomaly['dataset'].title}",
                message=f"Valeur anormale ({anomaly['value']}) détectée pour {anomaly['country']}",
                current_value=anomaly['value'],
                threshold_value=breached
            ))
        return generated

    def _create_alert(self, dataset, country, alert_type, severity, title, message, current_value=None, threshold_value=None):
        """Create an alert, deduplicating against the last 24 hours.

        If a similar active alert already exists for this dataset/country/
        type within the last day, that alert is returned unchanged.
        """
        recent = Alert.objects.filter(
            dataset=dataset,
            country=country,
            alert_type=alert_type,
            is_active=True,
            created_at__gte=timezone.now() - timedelta(days=1)
        ).first()
        if recent:
            return recent

        fresh = Alert.objects.create(
            dataset=dataset,
            country=country,
            alert_type=alert_type,
            severity=severity,
            title=title,
            message=message,
            current_value=current_value,
            threshold_value=threshold_value
        )
        logger.info(f"Nouvelle alerte créée: {title} pour {country}")
        return fresh
class PredictiveAnalyzer:
    """Simple forecaster based on linear extrapolation of recent history."""

    def predict_next_values(self, dataset_id, country, days_ahead=30):
        """Predict future daily values for one country of a dataset.

        Fits an ordinary least-squares line over the last 90 days of
        observations and extrapolates it `days_ahead` days forward.

        Args:
            dataset_id: Primary key of the Dataset.
            country: Country code to forecast.
            days_ahead: Number of future days to predict (default 30).

        Returns:
            A dict with the prediction list, trend slope and the number
            of historical points used, or None on too little data or
            error (errors are logged, never raised).
        """
        try:
            dataset = Dataset.objects.get(id=dataset_id)

            # Historical window: the last 90 days.
            end_date = timezone.now().date()
            start_date = end_date - timedelta(days=90)

            data_points = DataPoint.objects.filter(
                dataset=dataset,
                country=country,
                date__gte=start_date,
                date__lte=end_date
            ).order_by('date')

            if data_points.count() < 10:
                return None

            # Fix: drop the unused `dates` list the original also built.
            values = [float(dp.value) for dp in data_points]

            # Ordinary least-squares line over the sample index.
            x = np.arange(len(values))
            slope, intercept = np.polyfit(x, values, 1)

            predictions = []
            last_x = len(values) - 1
            for i in range(1, days_ahead + 1):
                future_x = last_x + i
                predicted_value = slope * future_x + intercept
                predictions.append({
                    'date': end_date + timedelta(days=i),
                    # Clamp to zero: the tracked metrics are non-negative.
                    'predicted_value': max(0, predicted_value),
                    # Linearly decaying confidence, reaching 0 at the horizon.
                    'confidence': max(0, 1 - (i / days_ahead))
                })

            return {
                'dataset_id': dataset_id,
                'country': country,
                'predictions': predictions,
                'trend_slope': slope,
                'historical_data_points': len(values)
            }
        except Exception as e:
            logger.error(f"Erreur lors de la prédiction: {str(e)}")
            return None
def run_automated_analysis():
    """Run trend and anomaly alerting over every active dataset.

    Per-dataset failures are logged and skipped so one bad dataset
    cannot abort the whole run.

    Returns:
        Total number of alerts produced across all datasets.
    """
    generator = AlertGenerator()
    total_alerts = 0

    for dataset in Dataset.objects.filter(status='active'):
        try:
            # Count each stage as it completes, so trend alerts are kept
            # even if the anomaly stage fails for this dataset.
            total_alerts += len(generator.generate_trend_alerts(dataset.id))
            total_alerts += len(generator.generate_anomaly_alerts(dataset.id))
        except Exception as e:
            logger.error(f"Erreur lors de l'analyse du dataset {dataset.id}: {str(e)}")

    logger.info(f"Analyse automatisée terminée. {total_alerts} nouvelles alertes générées.")
    return total_alerts