AfriDataHubBackend / core /data_integrators.py
rinogeek's picture
Fresh start with LFS for sqlite
d292ba9
"""
Intégrateurs de données externes pour AfriDataHub
Created by Marino ATOHOUN - AfriDataHub Platform
"""
import requests
import json
import logging
from datetime import datetime, timedelta
from django.conf import settings
from django.utils import timezone
from datasets.models import Dataset, DataPoint, Alert
logger = logging.getLogger('afridatahub')
AFRICAN_COUNTRIES = [
'DZ', 'AO', 'BJ', 'BW', 'BF', 'BI', 'CV', 'CM', 'CF', 'TD',
'KM', 'CG', 'CD', 'DJ', 'EG', 'GQ', 'ER', 'SZ', 'ET', 'GA',
'GM', 'GH', 'GN', 'GW', 'CI', 'KE', 'LS', 'LR', 'LY', 'MG',
'MW', 'ML', 'MR', 'MU', 'MA', 'MZ', 'NA', 'NE', 'NG', 'RW',
'ST', 'SN', 'SC', 'SL', 'SO', 'ZA', 'SS', 'SD', 'TZ', 'TG',
'TN', 'UG', 'ZM', 'ZW'
]
class BaseDataIntegrator:
"""Classe de base pour les intégrateurs de données"""
def __init__(self):
self.session = requests.Session()
self.session.headers.update({
'User-Agent': 'AfriDataHub/1.0 (https://afridatahub.com)'
})
def create_or_update_dataset(self, title, description, domain, source_name, source_url):
"""Crée ou met à jour un dataset"""
dataset, created = Dataset.objects.get_or_create(
title=title,
defaults={
'description': description,
'domain': domain,
'source_name': source_name,
'source_url': source_url,
'status': 'active'
}
)
if not created:
dataset.last_updated = timezone.now()
dataset.save()
return dataset
def save_data_point(self, dataset, country, value, date, unit='', extra_info=None):
"""Sauvegarde un point de données"""
if extra_info is None:
extra_info = {}
data_point, created = DataPoint.objects.update_or_create(
dataset=dataset,
country=country,
date=date,
defaults={
'value': value,
'unit': unit,
'extra_info': extra_info
}
)
return data_point, created
class FAODataIntegrator(BaseDataIntegrator):
"""Intégrateur pour les données FAO (agriculture)"""
def __init__(self):
super().__init__()
self.base_url = "http://www.fao.org/faostat/api/v1"
self.api_key = settings.FAO_API_KEY
def fetch_crop_production_data(self):
"""Récupère les données de production agricole"""
try:
# Créer le dataset
dataset = self.create_or_update_dataset(
title="Production agricole africaine (FAO)",
description="Données de production des principales cultures en Afrique",
domain="agriculture",
source_name="FAO",
source_url="http://www.fao.org/faostat/"
)
# Données d'exemple (en attendant l'API réelle)
import random
sample_data = []
crops = ['Maïs', 'Blé', 'Cacao', 'Café', 'Riz', 'Manioc']
for country in AFRICAN_COUNTRIES:
crop = random.choice(crops)
sample_data.append({
'country': country,
'crop': crop,
'production': random.randint(10000, 5000000),
'date': '2023-01-01'
})
created_count = 0
for item in sample_data:
date = datetime.strptime(item['date'], '%Y-%m-%d').date()
data_point, created = self.save_data_point(
dataset=dataset,
country=item['country'],
value=item['production'],
date=date,
unit='tonnes',
extra_info={'crop': item['crop']}
)
if created:
created_count += 1
logger.info(f"FAO: {created_count} nouveaux points de données créés")
return created_count
except Exception as e:
logger.error(f"Erreur lors de l'intégration FAO: {str(e)}")
return 0
class WorldBankDataIntegrator(BaseDataIntegrator):
"""Intégrateur pour les données de la Banque Mondiale"""
def __init__(self):
super().__init__()
self.base_url = "https://api.worldbank.org/v2"
def fetch_economic_indicators(self):
"""Récupère les indicateurs économiques"""
try:
# Dataset PIB
gdp_dataset = self.create_or_update_dataset(
title="PIB par habitant (Banque Mondiale)",
description="Produit Intérieur Brut par habitant des pays africains",
domain="economy",
source_name="Banque Mondiale",
source_url="https://data.worldbank.org/"
)
# Dataset Énergie
energy_dataset = self.create_or_update_dataset(
title="Accès à l'électricité (Banque Mondiale)",
description="Pourcentage de la population ayant accès à l'électricité",
domain="energy",
source_name="Banque Mondiale",
source_url="https://data.worldbank.org/"
)
# Données d'exemple
import random
gdp_data = []
energy_data = []
for country in AFRICAN_COUNTRIES:
gdp_data.append({
'country': country,
'gdp_per_capita': random.randint(500, 15000),
'date': '2023-01-01'
})
energy_data.append({
'country': country,
'electricity_access': random.randint(10, 100),
'date': '2023-01-01'
})
created_count = 0
# Sauvegarder les données PIB
for item in gdp_data:
date = datetime.strptime(item['date'], '%Y-%m-%d').date()
data_point, created = self.save_data_point(
dataset=gdp_dataset,
country=item['country'],
value=item['gdp_per_capita'],
date=date,
unit='USD'
)
if created:
created_count += 1
# Sauvegarder les données d'énergie
for item in energy_data:
date = datetime.strptime(item['date'], '%Y-%m-%d').date()
data_point, created = self.save_data_point(
dataset=energy_dataset,
country=item['country'],
value=item['electricity_access'],
date=date,
unit='%'
)
if created:
created_count += 1
logger.info(f"World Bank: {created_count} nouveaux points de données créés")
return created_count
except Exception as e:
logger.error(f"Erreur lors de l'intégration World Bank: {str(e)}")
return 0
class WHODataIntegrator(BaseDataIntegrator):
"""Intégrateur pour les données OMS (santé)"""
def __init__(self):
super().__init__()
self.base_url = "https://ghoapi.azureedge.net/api"
def fetch_health_indicators(self):
"""Récupère les indicateurs de santé"""
try:
# Dataset Espérance de vie
life_expectancy_dataset = self.create_or_update_dataset(
title="Espérance de vie (OMS)",
description="Espérance de vie à la naissance dans les pays africains",
domain="health",
source_name="OMS",
source_url="https://www.who.int/"
)
# Dataset Mortalité infantile
infant_mortality_dataset = self.create_or_update_dataset(
title="Mortalité infantile (OMS)",
description="Taux de mortalité infantile pour 1000 naissances vivantes",
domain="health",
source_name="OMS",
source_url="https://www.who.int/"
)
# Données d'exemple
import random
life_expectancy_data = []
infant_mortality_data = []
for country in AFRICAN_COUNTRIES:
life_expectancy_data.append({
'country': country,
'life_expectancy': round(random.uniform(50, 80), 1),
'date': '2023-01-01'
})
infant_mortality_data.append({
'country': country,
'infant_mortality': random.randint(10, 100),
'date': '2023-01-01'
})
created_count = 0
# Sauvegarder les données d'espérance de vie
for item in life_expectancy_data:
date = datetime.strptime(item['date'], '%Y-%m-%d').date()
data_point, created = self.save_data_point(
dataset=life_expectancy_dataset,
country=item['country'],
value=item['life_expectancy'],
date=date,
unit='années'
)
if created:
created_count += 1
# Sauvegarder les données de mortalité infantile
for item in infant_mortality_data:
date = datetime.strptime(item['date'], '%Y-%m-%d').date()
data_point, created = self.save_data_point(
dataset=infant_mortality_dataset,
country=item['country'],
value=item['infant_mortality'],
date=date,
unit='pour 1000'
)
if created:
created_count += 1
logger.info(f"WHO: {created_count} nouveaux points de données créés")
return created_count
except Exception as e:
logger.error(f"Erreur lors de l'intégration WHO: {str(e)}")
return 0
class WeatherDataIntegrator(BaseDataIntegrator):
"""Intégrateur pour les données météorologiques"""
def __init__(self):
super().__init__()
self.api_key = settings.OPENWEATHER_API_KEY
self.base_url = "https://api.openweathermap.org/data/2.5"
def fetch_weather_data(self):
"""Récupère les données météorologiques actuelles"""
try:
# Dataset Température
temperature_dataset = self.create_or_update_dataset(
title="Température moyenne (OpenWeatherMap)",
description="Température moyenne quotidienne dans les capitales africaines",
domain="weather",
source_name="OpenWeatherMap",
source_url="https://openweathermap.org/"
)
# Dataset Précipitations
rainfall_dataset = self.create_or_update_dataset(
title="Précipitations (OpenWeatherMap)",
description="Précipitations quotidiennes dans les capitales africaines",
domain="weather",
source_name="OpenWeatherMap",
source_url="https://openweathermap.org/"
)
# Données d'exemple (simulation météo)
import random
created_count = 0
today = timezone.now().date()
for country in AFRICAN_COUNTRIES:
# Température simulée
temp = random.uniform(15, 35) # Entre 15 et 35°C
data_point, created = self.save_data_point(
dataset=temperature_dataset,
country=country,
value=round(temp, 1),
date=today,
unit='°C'
)
if created:
created_count += 1
# Précipitations simulées
rainfall = random.uniform(0, 20) # Entre 0 et 20mm
data_point, created = self.save_data_point(
dataset=rainfall_dataset,
country=country,
value=round(rainfall, 1),
date=today,
unit='mm'
)
if created:
created_count += 1
logger.info(f"Weather: {created_count} nouveaux points de données créés")
return created_count
except Exception as e:
logger.error(f"Erreur lors de l'intégration Weather: {str(e)}")
return 0
def run_all_integrators():
"""Exécute tous les intégrateurs de données"""
total_created = 0
integrators = [
FAODataIntegrator(),
WorldBankDataIntegrator(),
WHODataIntegrator(),
WeatherDataIntegrator(),
]
for integrator in integrators:
try:
if hasattr(integrator, 'fetch_crop_production_data'):
total_created += integrator.fetch_crop_production_data()
elif hasattr(integrator, 'fetch_economic_indicators'):
total_created += integrator.fetch_economic_indicators()
elif hasattr(integrator, 'fetch_health_indicators'):
total_created += integrator.fetch_health_indicators()
elif hasattr(integrator, 'fetch_weather_data'):
total_created += integrator.fetch_weather_data()
except Exception as e:
logger.error(f"Erreur avec l'intégrateur {integrator.__class__.__name__}: {str(e)}")
logger.info(f"Total: {total_created} nouveaux points de données créés")
return total_created