MCP_CO2 / src /clients /base_carbone.py
Qdonnars's picture
MCP Server ADEME Environnement - Initial release
aac4780
"""Client API Base Carbone - Facteurs d'émission ADEME."""
import httpx
from typing import Any
BASE_URL = "https://data.ademe.fr/data-fair/api/v1/datasets/base-carboner"
class BaseCarboneClient:
"""Client async pour l'API Base Carbone."""
def __init__(self, timeout: float = 30.0):
self.timeout = timeout
self._client: httpx.AsyncClient | None = None
async def _get_client(self) -> httpx.AsyncClient:
if self._client is None:
self._client = httpx.AsyncClient(timeout=self.timeout)
return self._client
async def close(self):
if self._client:
await self._client.aclose()
self._client = None
async def search(
self,
query: str,
size: int = 10,
type_ligne: str = "Elément",
statut: str | None = None,
category: str | None = None
) -> dict[str, Any]:
"""Recherche de facteurs d'émission dans Base Carbone.
Args:
query: Terme de recherche
size: Nombre de résultats (max 100)
type_ligne: "Elément" ou "Poste" (défaut: Elément)
statut: Filtrer par statut ("Valide générique", "Archivé", etc.)
category: Filtrer par catégorie (chemin partiel)
Returns:
Dict avec total et results
"""
client = await self._get_client()
params = {
"q": query,
"size": min(size, 100)
}
# Construire les filtres
filters = []
if type_ligne:
filters.append(f"Type_Ligne:{type_ligne}")
if statut:
filters.append(f"Statut_de_l'élément:\"{statut}\"")
if category:
filters.append(f"Code_de_la_catégorie:*{category}*")
if filters:
params["qs"] = " AND ".join(filters)
response = await client.get(f"{BASE_URL}/lines", params=params)
response.raise_for_status()
return response.json()
async def get_by_id(self, element_id: str) -> dict[str, Any] | None:
"""Récupère un facteur d'émission par son identifiant.
Args:
element_id: Identifiant de l'élément
Returns:
Données du FE ou None
"""
client = await self._get_client()
params = {
"qs": f"Identifiant_de_l'élément:{element_id}",
"size": 10 # Peut avoir plusieurs lignes (postes)
}
response = await client.get(f"{BASE_URL}/lines", params=params)
response.raise_for_status()
data = response.json()
if data.get("results"):
return data["results"]
return None
async def get_categories(self, parent: str | None = None) -> list[dict[str, Any]]:
"""Liste les catégories disponibles.
Args:
parent: Chemin de catégorie parent pour filtrer
Returns:
Liste des catégories avec leur compte
"""
client = await self._get_client()
params = {
"field": "Code_de_la_catégorie",
"size": 200
}
if parent:
params["qs"] = f"Code_de_la_catégorie:*{parent}*"
response = await client.get(f"{BASE_URL}/values_agg", params=params)
response.raise_for_status()
data = response.json()
return [
{"category": item["value"], "count": item["total"]}
for item in data.get("aggs", [])
]
async def search_transport(
self,
vehicle_type: str | None = None,
fuel_type: str | None = None,
size: int = 20
) -> dict[str, Any]:
"""Recherche de FE transport.
Args:
vehicle_type: Type de véhicule (VUL, Poids lourd, Voiture, etc.)
fuel_type: Type de carburant (Diesel, Essence, Électrique, etc.)
size: Nombre de résultats
Returns:
Facteurs d'émission transport
"""
query_parts = ["transport"]
if vehicle_type:
query_parts.append(vehicle_type)
if fuel_type:
query_parts.append(fuel_type)
return await self.search(
query=" ".join(query_parts),
size=size,
category="Transport"
)
async def search_energy(
self,
energy_type: str | None = None,
size: int = 20
) -> dict[str, Any]:
"""Recherche de FE énergie.
Args:
energy_type: Type d'énergie (Gaz naturel, Électricité, Fioul, etc.)
size: Nombre de résultats
Returns:
Facteurs d'émission énergie
"""
query = energy_type or "énergie"
return await self.search(
query=query,
size=size,
category="Combustibles"
)
async def search_waste(
self,
waste_type: str | None = None,
treatment: str | None = None,
size: int = 20
) -> dict[str, Any]:
"""Recherche de FE déchets.
Args:
waste_type: Type de déchet (alimentaire, plastique, etc.)
treatment: Traitement (compost, incinération, stockage, etc.)
size: Nombre de résultats
Returns:
Facteurs d'émission déchets
"""
query_parts = ["déchet"]
if waste_type:
query_parts.append(waste_type)
if treatment:
query_parts.append(treatment)
return await self.search(
query=" ".join(query_parts),
size=size,
category="Traitement des déchets"
)
async def search_equipment(
self,
equipment_type: str | None = None,
size: int = 20
) -> dict[str, Any]:
"""Recherche de FE équipements.
Args:
equipment_type: Type d'équipement (ordinateur, téléphone, etc.)
size: Nombre de résultats
Returns:
Facteurs d'émission équipements
"""
query = equipment_type or "équipement informatique"
return await self.search(
query=query,
size=size,
category="Achats de biens"
)
# Champs principaux Base Carbone pour référence
BASE_CARBONE_FIELDS = {
"identite": [
"Identifiant_de_l'élément",
"Nom_base_français",
"Nom_base_anglais",
"Nom_attribut_français",
"Nom_frontière_français",
"Code_de_la_catégorie",
"Type_de_l'élément",
"Type_Ligne",
],
"valeurs": [
"Total_poste_non_décomposé",
"Unité_français",
"Unité_anglais",
],
"decomposition_gaz": [
"CO2f",
"CO2b",
"CH4f",
"CH4b",
"N2O",
"Autres_GES",
],
"qualite": [
"Incertitude",
"Qualité",
"Qualité_M",
"Qualité_TeR",
"Qualité_C",
"Qualité_TiR",
"Qualité_GR",
"Qualité_P",
"Transparence",
],
"methodologie": [
"Source",
"Programme",
"Url_du_programme",
"Contributeur",
"Autres_Contributeurs",
"Structure",
"Commentaire_français",
"Réglementations",
],
"validite": [
"Statut_de_l'élément",
"Date_de_création",
"Date_de_modification",
"Période_de_validité",
],
"localisation": [
"Localisation_géographique",
"Sous-localisation_géographique_français",
],
}