MCP_CO2 / src /tools.py
Qdonnars's picture
v2: Architecture simplifiée - 3 outils progressifs
7815898
"""Outils MCP v2 pour l'API ADEME (AGRIBALYSE + Base Carbone).
Architecture simplifiée en 3 outils :
1. search - Exploration large (id + nom uniquement)
2. compare - Éléments différenciants pour choisir entre FE
3. details - Méthodologie complète d'un FE
"""
import json
from typing import Any
import asyncio
from .clients import AgribalyseClient, BaseCarboneClient
# Clients globaux
_agribalyse_client: AgribalyseClient | None = None
_base_carbone_client: BaseCarboneClient | None = None
def _get_agribalyse() -> AgribalyseClient:
global _agribalyse_client
if _agribalyse_client is None:
_agribalyse_client = AgribalyseClient()
return _agribalyse_client
def _get_base_carbone() -> BaseCarboneClient:
global _base_carbone_client
if _base_carbone_client is None:
_base_carbone_client = BaseCarboneClient()
return _base_carbone_client
# ============================================================================
# FORMATTERS - Différents niveaux de détail
# ============================================================================
def _format_minimal_agribalyse(item: dict[str, Any]) -> dict[str, Any]:
"""Format minimal pour search : id + name seulement."""
return {
"id": f"agri_{item.get('Code_CIQUAL', '')}",
"name": item.get("Nom_du_Produit_en_Français", ""),
}
def _format_minimal_base_carbone(item: dict[str, Any]) -> dict[str, Any]:
"""Format minimal pour search : id + name seulement."""
name_parts = [item.get("Nom_base_français", "")]
if item.get("Nom_attribut_français"):
name_parts.append(item.get("Nom_attribut_français"))
if item.get("Nom_frontière_français"):
name_parts.append(f"({item.get('Nom_frontière_français')})")
element_id = item.get("Identifiant_de_l'élément", "")
return {
"id": f"bc_{element_id}",
"name": " - ".join(filter(None, name_parts)),
}
def _format_compare_agribalyse(item: dict[str, Any]) -> dict[str, Any]:
"""Format comparaison pour compare : éléments différenciants."""
return {
"id": f"agri_{item.get('Code_CIQUAL', '')}",
"name": item.get("Nom_du_Produit_en_Français", ""),
"value": item.get("Changement_climatique", 0),
"unit": "kgCO2e/kg",
"uncertainty": None, # AGRIBALYSE n'a pas d'incertitude directe
"dqr": item.get("DQR"),
"source": "AGRIBALYSE 3.1",
"category": item.get("Groupe_d'aliment", ""),
"status": "valid",
}
def _format_compare_base_carbone(item: dict[str, Any]) -> dict[str, Any]:
"""Format comparaison pour compare : éléments différenciants."""
name_parts = [item.get("Nom_base_français", "")]
if item.get("Nom_attribut_français"):
name_parts.append(item.get("Nom_attribut_français"))
if item.get("Nom_frontière_français"):
name_parts.append(f"({item.get('Nom_frontière_français')})")
# Extraire la date de validité
validity = item.get("Période_de_validité", "")
# Mapper le statut
status_map = {
"Valide générique": "valid",
"Valide spécifique": "valid",
"Archivé": "archived",
}
status = status_map.get(item.get("Statut_de_l'élément", ""), "unknown")
element_id = item.get("Identifiant_de_l'élément", "")
return {
"id": f"bc_{element_id}",
"name": " - ".join(filter(None, name_parts)),
"value": item.get("Total_poste_non_décomposé", 0),
"unit": item.get("Unité_français", ""),
"uncertainty": item.get("Incertitude"),
"dqr": item.get("Qualité"),
"date": validity,
"source": item.get("Source") or item.get("Contributeur", "ADEME"),
"category": item.get("Code_de_la_catégorie", "").split(" > ")[0] if item.get("Code_de_la_catégorie") else "",
"status": status,
}
def _format_full_agribalyse(item: dict[str, Any]) -> dict[str, Any]:
"""Format complet pour details : tout."""
return {
"id": f"agri_{item.get('Code_CIQUAL', '')}",
"name": item.get("Nom_du_Produit_en_Français", ""),
"name_en": item.get("LCI_Name", ""),
"value": item.get("Changement_climatique", 0),
"unit": "kgCO2e/kg",
"category": item.get("Groupe_d'aliment", ""),
"subcategory": item.get("Sous-groupe_d'aliment", ""),
"quality": {
"dqr": item.get("DQR"),
"ef_score": item.get("Score_unique_EF"),
},
"climate_breakdown": {
"fossil": item.get("Changement_climatique_-_émissions_fossiles"),
"biogenic": item.get("Changement_climatique_-_émissions_biogéniques"),
"land_use_change": item.get("Changement_climatique_-_émissions_liées_au_changement_d'affectation_des_sols"),
},
"other_impacts": {
"acidification": item.get("Acidification_terrestre_et_eaux_douces"),
"eutrophication_marine": item.get("Eutrophisation_marine"),
"eutrophication_freshwater": item.get("Eutrophisation_eaux_douces"),
"land_use": item.get("Utilisation_du_sol"),
"water_use": item.get("Épuisement_des_ressources_eau"),
"particulate_matter": item.get("Particules_fines"),
},
"context": {
"delivery": item.get("Livraison"),
"preparation": item.get("Préparation"),
"packaging": item.get("Approche_emballage_"),
"by_plane": item.get("code_avion", False),
},
"source": "AGRIBALYSE 3.1 - ADEME",
}
def _format_full_base_carbone(item: dict[str, Any], posts: list[dict] = None) -> dict[str, Any]:
"""Format complet pour details : tout."""
name_parts = [item.get("Nom_base_français", "")]
if item.get("Nom_attribut_français"):
name_parts.append(item.get("Nom_attribut_français"))
if item.get("Nom_frontière_français"):
name_parts.append(f"({item.get('Nom_frontière_français')})")
element_id = item.get("Identifiant_de_l'élément", "")
result = {
"id": f"bc_{element_id}",
"name": " - ".join(filter(None, name_parts)),
"name_en": item.get("Nom_base_anglais", ""),
"value": item.get("Total_poste_non_décomposé", 0),
"unit": item.get("Unité_français", ""),
"category": item.get("Code_de_la_catégorie", ""),
"location": item.get("Localisation_géographique", ""),
"quality": {
"uncertainty_percent": item.get("Incertitude"),
"dqr": item.get("Qualité"),
"transparency": item.get("Transparence"),
"indicators": {
"M": item.get("Qualité_M"),
"TeR": item.get("Qualité_TeR"),
"C": item.get("Qualité_C"),
"TiR": item.get("Qualité_TiR"),
"GR": item.get("Qualité_GR"),
"P": item.get("Qualité_P"),
},
},
"methodology": {
"source": item.get("Source"),
"program": item.get("Programme"),
"program_url": item.get("Url_du_programme"),
"contributor": item.get("Contributeur"),
"other_contributors": item.get("Autres_Contributeurs"),
"structure": item.get("Structure"),
"comment": item.get("Commentaire_français"),
"regulations": item.get("Réglementations"),
},
"gas_breakdown": {
"co2_fossil": item.get("CO2f"),
"co2_biogenic": item.get("CO2b"),
"ch4_fossil": item.get("CH4f"),
"ch4_biogenic": item.get("CH4b"),
"n2o": item.get("N2O"),
"other_ghg": item.get("Autres_GES"),
},
"validity": {
"status": item.get("Statut_de_l'élément"),
"created": item.get("Date_de_création"),
"modified": item.get("Date_de_modification"),
"period": item.get("Période_de_validité"),
},
}
# Ajouter la décomposition par poste si disponible
if posts:
result["post_breakdown"] = [
{
"post": p.get("Nom_poste_français"),
"type": p.get("Type_poste"),
"value": p.get("Total_poste_non_décomposé"),
}
for p in posts
]
return result
# ============================================================================
# OUTIL 1 : SEARCH - Exploration large
# ============================================================================
async def search(
query: str,
source: str = "all",
size: int = 20
) -> str:
"""Recherche de facteurs d'émission - retourne uniquement ID et nom.
Première étape pour identifier les FE pertinents avant d'approfondir.
Recherche dans AGRIBALYSE (alimentation) et/ou Base Carbone (transport,
énergie, déchets, équipements).
Args:
query: Terme de recherche (ex: "boeuf", "camion diesel", "gaz naturel")
source: Base à interroger
- "all" : cherche dans les 2 bases (défaut)
- "agribalyse" : alimentation uniquement
- "base_carbone" : transport, énergie, déchets, équipements
size: Nombre max de résultats par source (défaut: 20, max: 50)
Returns:
JSON avec id et nom uniquement pour chaque résultat.
Utiliser compare() ensuite pour les éléments différenciants,
puis details() pour la méthodologie complète.
Example:
search("camion livraison")
search("boeuf", source="agribalyse")
search("électricité", source="base_carbone", size=30)
"""
results = []
counts = {"agribalyse": 0, "base_carbone": 0}
size = min(size, 50)
try:
tasks = []
if source in ["all", "agribalyse"]:
async def search_agri():
client = _get_agribalyse()
data = await client.search(query, size=size)
items = []
for item in data.get("results", []):
items.append(_format_minimal_agribalyse(item))
return "agribalyse", items
tasks.append(search_agri())
if source in ["all", "base_carbone"]:
async def search_bc():
client = _get_base_carbone()
data = await client.search(query, size=size)
items = []
for item in data.get("results", []):
if item.get("Type_Ligne") == "Elément":
items.append(_format_minimal_base_carbone(item))
return "base_carbone", items
tasks.append(search_bc())
# Exécuter en parallèle
if tasks:
results_raw = await asyncio.gather(*tasks)
for src, items in results_raw:
results.extend(items)
counts[src] = len(items)
return json.dumps({
"total": len(results),
"counts": {k: v for k, v in counts.items() if v > 0},
"results": results
}, ensure_ascii=False, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, ensure_ascii=False)
# ============================================================================
# OUTIL 2 : COMPARE - Éléments différenciants
# ============================================================================
async def compare(
ids: str
) -> str:
"""Compare plusieurs facteurs d'émission - retourne les éléments différenciants.
Deuxième étape après search() pour choisir entre les FE candidats.
Retourne valeur, unité, incertitude, date, source et statut.
Args:
ids: Liste d'identifiants séparés par des virgules.
Format: "agri_XXXXX" pour AGRIBALYSE, "bc_XXXXX" pour Base Carbone.
Exemple: "bc_28022,bc_28023,bc_28024"
Returns:
JSON avec tableau comparatif et résumé (min, max, plus récent).
Example:
compare("bc_28022,bc_28023,bc_28024")
compare("agri_6101,agri_25033")
"""
try:
id_list = [id.strip() for id in ids.split(",")]
agri_ids = [id[5:] for id in id_list if id.startswith("agri_")]
bc_ids = [id[3:] for id in id_list if id.startswith("bc_")]
results = []
# Rechercher dans AGRIBALYSE
if agri_ids:
client = _get_agribalyse()
for agri_id in agri_ids:
item = await client.get_by_code(int(agri_id))
if item:
results.append(_format_compare_agribalyse(item))
# Rechercher dans Base Carbone
if bc_ids:
client = _get_base_carbone()
for bc_id in bc_ids:
items = await client.get_by_id(bc_id)
if items:
for item in items:
if item.get("Type_Ligne") == "Elément":
results.append(_format_compare_base_carbone(item))
break
# Calculer le résumé
summary = {}
if results:
values = [(r["id"], r["value"]) for r in results if r.get("value") is not None]
if values:
min_item = min(values, key=lambda x: x[1])
max_item = max(values, key=lambda x: x[1])
summary["lowest_impact"] = {"id": min_item[0], "value": min_item[1]}
summary["highest_impact"] = {"id": max_item[0], "value": max_item[1]}
if len(values) >= 2:
diff = max_item[1] - min_item[1]
diff_pct = (diff / min_item[1] * 100) if min_item[1] > 0 else 0
summary["spread"] = {
"absolute": round(diff, 4),
"percent": round(diff_pct, 1)
}
return json.dumps({
"count": len(results),
"comparison": results,
"summary": summary
}, ensure_ascii=False, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, ensure_ascii=False)
# ============================================================================
# OUTIL 3 : DETAILS - Méthodologie complète
# ============================================================================
async def details(
ids: str
) -> str:
"""Détails méthodologiques complets d'un ou plusieurs facteurs d'émission.
Troisième étape pour comprendre la méthodologie, les hypothèses et
la fiabilité d'un FE. Retourne toutes les informations disponibles :
incertitude, décomposition par gaz, commentaires, réglementations, etc.
Args:
ids: Liste d'identifiants séparés par des virgules (1-5 max recommandé).
Format: "agri_XXXXX" pour AGRIBALYSE, "bc_XXXXX" pour Base Carbone.
Returns:
JSON avec tous les détails méthodologiques pour chaque FE.
Example:
details("bc_28022")
details("agri_6101,agri_25033")
"""
try:
id_list = [id.strip() for id in ids.split(",")][:5] # Max 5
agri_ids = [id[5:] for id in id_list if id.startswith("agri_")]
bc_ids = [id[3:] for id in id_list if id.startswith("bc_")]
results = []
# AGRIBALYSE
if agri_ids:
client = _get_agribalyse()
for agri_id in agri_ids:
item = await client.get_by_code(int(agri_id))
if item:
results.append(_format_full_agribalyse(item))
# Base Carbone (avec décomposition par poste)
if bc_ids:
client = _get_base_carbone()
for bc_id in bc_ids:
items = await client.get_by_id(bc_id)
if items:
main_item = None
posts = []
for item in items:
if item.get("Type_Ligne") == "Elément":
main_item = item
elif item.get("Type_Ligne") == "Poste":
posts.append(item)
if main_item:
results.append(_format_full_base_carbone(main_item, posts))
return json.dumps({
"count": len(results),
"details": results
}, ensure_ascii=False, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, ensure_ascii=False)
# Liste des outils pour export
ALL_TOOLS = [
search,
compare,
details,
]