AfriDataHubBackend / core /analytics.py
rinogeek's picture
Fresh start with LFS for sqlite
d292ba9
"""
Module d'analyse prédictive et de détection d'anomalies pour AfriDataHub
Created by Marino ATOHOUN - AfriDataHub Platform
"""
import numpy as np
import pandas as pd
from datetime import datetime, timedelta
from django.utils import timezone
from django.db.models import Q, Avg, Count
from datasets.models import Dataset, DataPoint, Alert
import logging
logger = logging.getLogger('afridatahub')
class TrendAnalyzer:
"""Analyseur de tendances pour les données temporelles"""
def __init__(self):
self.min_data_points = 3 # Minimum de points pour analyser une tendance
def analyze_dataset_trend(self, dataset_id, days_back=30):
"""Analyse la tendance d'un dataset sur une période donnée"""
try:
dataset = Dataset.objects.get(id=dataset_id)
end_date = timezone.now().date()
start_date = end_date - timedelta(days=days_back)
# Récupérer les données de la période
data_points = DataPoint.objects.filter(
dataset=dataset,
date__gte=start_date,
date__lte=end_date
).order_by('date')
if data_points.count() < self.min_data_points:
return None
# Convertir en DataFrame pour l'analyse
df = pd.DataFrame(list(data_points.values('date', 'value', 'country')))
# Analyser par pays
trends = {}
for country in df['country'].unique():
country_data = df[df['country'] == country].sort_values('date')
if len(country_data) >= self.min_data_points:
trend = self._calculate_trend(country_data['value'].values)
trends[country] = trend
return {
'dataset_id': dataset_id,
'dataset_title': dataset.title,
'period_days': days_back,
'trends': trends,
'analysis_date': timezone.now()
}
except Exception as e:
logger.error(f"Erreur lors de l'analyse de tendance: {str(e)}")
return None
def _calculate_trend(self, values):
"""Calcule la tendance d'une série de valeurs"""
if len(values) < 2:
return {'direction': 'stable', 'strength': 0, 'change_percent': 0}
# Calcul de la pente de régression linéaire
x = np.arange(len(values))
slope, intercept = np.polyfit(x, values, 1)
# Calcul du pourcentage de changement
first_value = values[0]
last_value = values[-1]
change_percent = ((last_value - first_value) / first_value) * 100 if first_value != 0 else 0
# Détermination de la direction
if abs(change_percent) < 5: # Seuil de 5% pour considérer comme stable
direction = 'stable'
elif change_percent > 0:
direction = 'increasing'
else:
direction = 'decreasing'
# Force de la tendance basée sur la corrélation
correlation = np.corrcoef(x, values)[0, 1]
strength = abs(correlation) if not np.isnan(correlation) else 0
return {
'direction': direction,
'strength': strength,
'change_percent': round(change_percent, 2),
'slope': slope,
'correlation': correlation
}
class AnomalyDetector:
"""Détecteur d'anomalies dans les données"""
def __init__(self):
self.z_threshold = 2.5 # Seuil Z-score pour détecter les anomalies
def detect_anomalies(self, dataset_id, country=None):
"""Détecte les anomalies dans un dataset"""
try:
dataset = Dataset.objects.get(id=dataset_id)
# Filtrer par pays si spécifié
query = Q(dataset=dataset)
if country:
query &= Q(country=country)
data_points = DataPoint.objects.filter(query).order_by('date')
if data_points.count() < 10: # Minimum de données pour détecter des anomalies
return []
# Convertir en DataFrame
df = pd.DataFrame(list(data_points.values('id', 'date', 'value', 'country')))
anomalies = []
# Analyser par pays
for country_code in df['country'].unique():
country_data = df[df['country'] == country_code]
country_anomalies = self._detect_country_anomalies(country_data, dataset)
anomalies.extend(country_anomalies)
return anomalies
except Exception as e:
logger.error(f"Erreur lors de la détection d'anomalies: {str(e)}")
return []
def _detect_country_anomalies(self, data, dataset):
"""Détecte les anomalies pour un pays spécifique"""
if len(data) < 10:
return []
values = data['value'].values
mean_val = np.mean(values)
std_val = np.std(values)
if std_val == 0: # Pas de variation
return []
anomalies = []
for _, row in data.iterrows():
z_score = abs((row['value'] - mean_val) / std_val)
if z_score > self.z_threshold:
anomaly_type = 'high' if row['value'] > mean_val else 'low'
anomalies.append({
'data_point_id': row['id'],
'dataset': dataset,
'country': row['country'],
'date': row['date'],
'value': row['value'],
'expected_range': (mean_val - 2*std_val, mean_val + 2*std_val),
'z_score': z_score,
'anomaly_type': anomaly_type,
'severity': 'high' if z_score > 3 else 'medium'
})
return anomalies
class AlertGenerator:
"""Générateur d'alertes automatiques"""
def __init__(self):
self.trend_analyzer = TrendAnalyzer()
self.anomaly_detector = AnomalyDetector()
def generate_trend_alerts(self, dataset_id):
"""Génère des alertes basées sur les tendances"""
trend_analysis = self.trend_analyzer.analyze_dataset_trend(dataset_id)
if not trend_analysis:
return []
alerts = []
dataset = Dataset.objects.get(id=dataset_id)
for country, trend in trend_analysis['trends'].items():
# Alerte pour forte baisse
if trend['direction'] == 'decreasing' and abs(trend['change_percent']) > 20:
alert = self._create_alert(
dataset=dataset,
country=country,
alert_type='trend_down',
severity='high' if abs(trend['change_percent']) > 50 else 'medium',
title=f"Forte baisse détectée - {dataset.title}",
message=f"Baisse de {abs(trend['change_percent']):.1f}% détectée pour {country}",
current_value=trend['change_percent']
)
alerts.append(alert)
# Alerte pour forte hausse
elif trend['direction'] == 'increasing' and trend['change_percent'] > 30:
alert = self._create_alert(
dataset=dataset,
country=country,
alert_type='trend_up',
severity='medium',
title=f"Forte hausse détectée - {dataset.title}",
message=f"Hausse de {trend['change_percent']:.1f}% détectée pour {country}",
current_value=trend['change_percent']
)
alerts.append(alert)
return alerts
def generate_anomaly_alerts(self, dataset_id):
"""Génère des alertes basées sur les anomalies"""
anomalies = self.anomaly_detector.detect_anomalies(dataset_id)
alerts = []
for anomaly in anomalies:
alert = self._create_alert(
dataset=anomaly['dataset'],
country=anomaly['country'],
alert_type='anomaly',
severity=anomaly['severity'],
title=f"Anomalie détectée - {anomaly['dataset'].title}",
message=f"Valeur anormale ({anomaly['value']}) détectée pour {anomaly['country']}",
current_value=anomaly['value'],
threshold_value=anomaly['expected_range'][1] if anomaly['anomaly_type'] == 'high' else anomaly['expected_range'][0]
)
alerts.append(alert)
return alerts
def _create_alert(self, dataset, country, alert_type, severity, title, message, current_value=None, threshold_value=None):
"""Crée une alerte si elle n'existe pas déjà"""
# Vérifier si une alerte similaire existe déjà
existing_alert = Alert.objects.filter(
dataset=dataset,
country=country,
alert_type=alert_type,
is_active=True,
created_at__gte=timezone.now() - timedelta(days=1) # Dans les dernières 24h
).first()
if existing_alert:
return existing_alert
# Créer une nouvelle alerte
alert = Alert.objects.create(
dataset=dataset,
country=country,
alert_type=alert_type,
severity=severity,
title=title,
message=message,
current_value=current_value,
threshold_value=threshold_value
)
logger.info(f"Nouvelle alerte créée: {title} pour {country}")
return alert
class PredictiveAnalyzer:
"""Analyseur prédictif simple"""
def predict_next_values(self, dataset_id, country, days_ahead=30):
"""Prédit les valeurs futures basées sur les tendances historiques"""
try:
dataset = Dataset.objects.get(id=dataset_id)
# Récupérer les données historiques (90 derniers jours)
end_date = timezone.now().date()
start_date = end_date - timedelta(days=90)
data_points = DataPoint.objects.filter(
dataset=dataset,
country=country,
date__gte=start_date,
date__lte=end_date
).order_by('date')
if data_points.count() < 10:
return None
# Convertir en arrays
dates = [dp.date for dp in data_points]
values = [float(dp.value) for dp in data_points]
# Régression linéaire simple
x = np.arange(len(values))
slope, intercept = np.polyfit(x, values, 1)
# Prédictions
predictions = []
last_x = len(values) - 1
for i in range(1, days_ahead + 1):
future_x = last_x + i
predicted_value = slope * future_x + intercept
future_date = end_date + timedelta(days=i)
predictions.append({
'date': future_date,
'predicted_value': max(0, predicted_value), # Éviter les valeurs négatives
'confidence': max(0, 1 - (i / days_ahead)) # Confiance décroissante
})
return {
'dataset_id': dataset_id,
'country': country,
'predictions': predictions,
'trend_slope': slope,
'historical_data_points': len(values)
}
except Exception as e:
logger.error(f"Erreur lors de la prédiction: {str(e)}")
return None
def run_automated_analysis():
"""Exécute l'analyse automatisée pour tous les datasets actifs"""
alert_generator = AlertGenerator()
active_datasets = Dataset.objects.filter(status='active')
total_alerts = 0
for dataset in active_datasets:
try:
# Générer des alertes de tendance
trend_alerts = alert_generator.generate_trend_alerts(dataset.id)
total_alerts += len(trend_alerts)
# Générer des alertes d'anomalie
anomaly_alerts = alert_generator.generate_anomaly_alerts(dataset.id)
total_alerts += len(anomaly_alerts)
except Exception as e:
logger.error(f"Erreur lors de l'analyse du dataset {dataset.id}: {str(e)}")
logger.info(f"Analyse automatisée terminée. {total_alerts} nouvelles alertes générées.")
return total_alerts