MCP_data_foncier / server.py
Qdonnars's picture
Upload folder using huggingface_hub
dff63a3 verified
"""
Serveur MCP CEREMA — Tools pour les données foncières et friches.
Ce module définit les 5 fonctions-tools exposées via Gradio MCP :
1. rechercher_friches : recherche de friches sur un territoire
2. statistiques_prix_foncier : prix et volumes de transactions immobilières
3. evolution_prix : évolution temporelle des prix fonciers
4. statistiques_friches : statistiques agrégées de friches multi-échelle
5. diagnostic_foncier_territoire : vision combinée friches + marché foncier
"""
import pandas as pd
from data_loader import (
load_dv3f,
load_friches,
get_departement_from_commune,
get_friches_agg,
get_all_echelles_for_commune,
get_epci_for_commune,
get_region_for_commune,
get_region_for_departement,
REG_NAMES,
PERIODES_CONSTRUCTION,
)
# =============================================================================
# Helpers
# =============================================================================
def _format_prix(val: float | None) -> str:
"""Formate un prix en euros lisible."""
if val is None or pd.isna(val):
return "non disponible"
if val >= 1_000_000:
return f"{val/1_000_000:,.2f} M€".replace(",", " ")
if val >= 1_000:
return f"{val:,.0f} €".replace(",", " ")
return f"{val:.0f} €"
def _format_pxm2(val: float | None) -> str:
"""Formate un prix au m²."""
if val is None or pd.isna(val):
return "non disponible"
return f"{val:,.0f} €/m²".replace(",", " ")
def _format_surface(val: float | None, unite: str = "m²") -> str:
"""Formate une surface."""
if val is None or pd.isna(val):
return "non disponible"
return f"{val:,.0f} {unite}".replace(",", " ")
def _safe_int(val) -> int | None:
"""Convertit en int si possible, None sinon."""
try:
if pd.isna(val):
return None
return int(val)
except (ValueError, TypeError):
return None
def _safe_float(val) -> float | None:
"""Convertit en float si possible, None sinon."""
try:
if pd.isna(val):
return None
return float(val)
except (ValueError, TypeError):
return None
def _get_dv3f_row(echelle: str, code: str, annee: int) -> pd.Series | None:
"""Récupère une ligne DV3F par index."""
df = load_dv3f()
try:
row = df.loc[(echelle, code, annee)]
if isinstance(row, pd.DataFrame):
row = row.iloc[0]
return row
except KeyError:
return None
def _get_dv3f_series(echelle: str, code: str) -> pd.DataFrame | None:
"""Récupère toutes les années pour un territoire."""
df = load_dv3f()
try:
result = df.loc[(echelle, code)]
if isinstance(result, pd.Series):
result = result.to_frame().T
return result
except KeyError:
return None
def _find_best_echelle(commune: str = "", departement: str = "", annee: int = 2024):
"""Trouve la meilleure échelle disponible avec fallback.
Retourne (echelle, code, libelle, row) ou None.
"""
df = load_dv3f()
# 1. Essayer la commune
if commune:
row = _get_dv3f_row("communes", commune, annee)
if row is not None:
nb = _safe_int(row.get("nbtrans_cod111"))
if nb is not None and nb >= 5:
return ("communes", commune, row.get("libelle", commune), row)
# Commune trouvée mais pas assez de transactions pour les prix
# On retourne quand même la commune mais on signalera le fallback si besoin
dep = departement or get_departement_from_commune(commune)
row_dep = _get_dv3f_row("departements", dep, annee)
if row_dep is not None:
return ("departements", dep, row_dep.get("libelle", dep), row_dep,
"communes", commune, row)
else:
# Commune non trouvée, essayer le département
dep = departement or get_departement_from_commune(commune)
row_dep = _get_dv3f_row("departements", dep, annee)
if row_dep is not None:
return ("departements", dep, row_dep.get("libelle", dep), row_dep)
# 2. Essayer le département
if departement:
row = _get_dv3f_row("departements", departement, annee)
if row is not None:
return ("departements", departement, row.get("libelle", departement), row)
return None
# =============================================================================
# Tool 1 : Recherche de friches
# =============================================================================
def rechercher_friches(
commune: str = "",
departement: str = "",
type_friche: str = "",
surface_min: float = 0,
statut: str = "",
) -> str:
"""Recherche des friches disponibles sur un territoire donné.
Interroge la base Cartofriches du CEREMA (inventaire national des friches) pour
trouver les friches correspondant aux critères. Utile pour la politique de Zéro
Artificialisation Nette (ZAN) : identifier les terrains déjà artificialisés qui
peuvent être réhabilités plutôt que de consommer de nouveaux espaces naturels.
Args:
commune: Code INSEE de la commune (ex: "13055" pour Marseille, "75056" pour Paris). Optionnel.
departement: Code du département (ex: "13", "75", "59"). Optionnel.
type_friche: Type de friche à rechercher. Valeurs possibles : "industrielle", "habitat",
"commerciale", "ferroviaire", "militaire", "hospitalière", "logistique",
"agro-industrielle", "équipement public", "carrière ou mine". Optionnel.
surface_min: Surface minimale de la friche en mètres carrés (ex: 5000 pour 0.5 ha). Défaut : 0.
statut: Statut de la friche. Valeurs possibles : "sans projet", "avec projet",
"potentielle", "reconvertie". Optionnel.
Returns:
Texte structuré décrivant les friches trouvées, avec pour chacune : nom, type, surface,
statut, pollution, zonage urbanisme, et commune. Si aucun résultat, un message explicite.
"""
gdf = load_friches()
mask = pd.Series([True] * len(gdf), index=gdf.index)
# Filtres
if commune:
mask &= gdf["comm_insee"] == commune
if departement:
mask &= gdf["dep"] == departement
if type_friche:
type_mapping = {
"industrielle": "friche industrielle",
"habitat": "friche d'habitat",
"commerciale": "friche commerciale",
"ferroviaire": "friche ferroviaire",
"militaire": "friche militaire",
"hospitalière": "friche hospitalière",
"logistique": "friche logistique",
"agro-industrielle": "friche agro-industrielle",
"équipement public": "friche d'équipement public",
"carrière ou mine": "friche carrière ou mine",
}
type_val = type_mapping.get(type_friche.lower(), type_friche)
mask &= gdf["site_type"].str.lower() == type_val.lower()
if surface_min > 0:
mask &= pd.to_numeric(gdf["site_surface"], errors="coerce") >= surface_min
if statut:
statut_mapping = {
"sans projet": "friche sans projet",
"avec projet": "friche avec projet",
"potentielle": "friche potentielle",
"reconvertie": "friche reconvertie",
}
statut_val = statut_mapping.get(statut.lower(), statut)
mask &= gdf["site_statut"].str.lower() == statut_val.lower()
results = gdf[mask]
if len(results) == 0:
filters_desc = []
if commune:
filters_desc.append(f"commune {commune}")
if departement:
filters_desc.append(f"département {departement}")
if type_friche:
filters_desc.append(f"type '{type_friche}'")
if surface_min > 0:
filters_desc.append(f"surface ≥ {surface_min:,.0f} m²")
if statut:
filters_desc.append(f"statut '{statut}'")
return f"Aucune friche trouvée avec les critères : {', '.join(filters_desc)}. Essayez d'élargir la recherche (retirer un filtre ou chercher au niveau département)."
# Construire la réponse
total = len(results)
lines = []
territory = ""
if commune:
commune_name = results.iloc[0]["comm_nom"] if len(results) > 0 else commune
territory = f"la commune de {commune_name} ({commune})"
elif departement:
territory = f"le département {departement}"
else:
territory = "le territoire recherché"
lines.append(f"## Friches trouvées sur {territory}")
lines.append(f"**{total} friche(s) trouvée(s)**" + (f" (les 30 premières sont affichées)" if total > 30 else ""))
lines.append("")
# Statistiques résumées (sur TOUS les résultats, pas seulement les 30 premiers)
surfaces = pd.to_numeric(results["site_surface"], errors="coerce")
lines.append(f"**Surface totale** : {surfaces.sum()/10000:,.1f} hectares")
lines.append(f"**Surface médiane** : {surfaces.median()/10000:,.2f} hectares")
lines.append("")
# Répartition par statut (sur TOUS les résultats)
statut_counts = results["site_statut"].value_counts()
lines.append("**Répartition par statut** :")
for s, c in statut_counts.items():
lines.append(f"- {s} : {c}")
lines.append("")
# Limiter à 30 résultats pour le détail
results_display = results.head(30)
# Liste détaillée
lines.append("### Détail des friches")
lines.append("")
for _, row in results_display.iterrows():
surface_val = _safe_float(row.get("site_surface"))
surface_ha = f"{surface_val/10000:,.2f} ha" if surface_val else "non renseignée"
surface_m2 = f"{surface_val:,.0f} m²" if surface_val else ""
pollution = row.get("sol_pollution_existe", "inconnu")
if pd.isna(pollution):
pollution = "inconnu"
zonage = row.get("urba_zone_type", "non renseigné")
if pd.isna(zonage):
zonage = "non renseigné"
bati_etat = row.get("bati_etat", "inconnu")
if pd.isna(bati_etat):
bati_etat = "inconnu"
site_type = row.get("site_type", "inconnu")
if pd.isna(site_type):
site_type = "inconnu"
lines.append(f"**{row.get('site_nom', 'Sans nom')}**")
lines.append(f"- Commune : {row.get('comm_nom', '?')} ({row.get('comm_insee', '?')})")
lines.append(f"- Type : {site_type}")
lines.append(f"- Surface : {surface_ha} ({surface_m2})")
lines.append(f"- Statut : {row.get('site_statut', '?')}")
lines.append(f"- Pollution sol : {pollution}")
lines.append(f"- Zonage urbanisme : {zonage}")
lines.append(f"- État du bâti : {bati_etat}")
lines.append("")
return "\n".join(lines)
# =============================================================================
# Tool 2 : Statistiques de prix foncier
# =============================================================================
def statistiques_prix_foncier(
commune: str = "",
departement: str = "",
type_bien: str = "tous",
annee: str = "2024",
) -> str:
"""Récupère les statistiques de prix foncier (transactions immobilières) sur un territoire.
Utilise les données DV3F du CEREMA (Demande de Valeurs Foncières) qui compilent
l'ensemble des transactions immobilières en France depuis 2010. Fournit les prix
médians, prix au m², et volumes de transactions pour les maisons et appartements.
Si la commune est trop petite (moins de 5 transactions), les statistiques du département
sont automatiquement fournies en complément.
Args:
commune: Code INSEE de la commune (ex: "13055" pour Marseille). Optionnel.
departement: Code du département (ex: "13", "75"). Optionnel. Si ni commune ni département
ne sont fournis, un message d'erreur est retourné.
type_bien: Type de bien immobilier. Valeurs : "tous" (défaut), "maison", "appartement".
annee: Année des statistiques (de 2010 à 2024). Défaut : "2024".
Returns:
Texte structuré avec le nombre de transactions, le prix médian, le prix au m²
(quartiles Q25, médian, Q75), et la surface médiane. Inclut aussi la ventilation
par période de construction (avant 1914, 1914-1947, etc.) pour identifier le
potentiel de rénovation énergétique (biens anciens vs neufs).
"""
if not commune and not departement:
return "Veuillez fournir au moins un code INSEE de commune ou un code département pour obtenir des statistiques de prix foncier."
try:
annee_int = int(annee)
except ValueError:
return f"L'année '{annee}' n'est pas valide. Veuillez fournir une année entre 2010 et 2024."
if annee_int < 2010 or annee_int > 2024:
return f"L'année {annee_int} est hors de la période couverte (2010–2024)."
result = _find_best_echelle(commune, departement, annee_int)
if result is None:
return f"Aucune donnée trouvée pour le territoire demandé (commune={commune}, département={departement}, année={annee})."
# Gérer le cas du fallback avec données commune
commune_row = None
commune_code = None
if len(result) == 7:
echelle, code, libelle, row, _, commune_code, commune_row = result
fallback = True
else:
echelle, code, libelle, row = result
fallback = echelle == "departements" and commune != ""
lines = []
# Titre
if echelle == "communes":
lines.append(f"## Prix foncier à {libelle} ({code}) — {annee_int}")
elif echelle == "departements":
lines.append(f"## Prix foncier dans le département {libelle} ({code}) — {annee_int}")
if commune and commune_row is not None:
nb_trans = _safe_int(commune_row.get("nbtrans_cod1"))
lines.append(f"\n> **Note** : La commune {commune} a seulement {nb_trans or 0} transaction(s) en {annee_int}, ce qui est insuffisant pour des statistiques de prix fiables. Les données du département sont présentées en complément.")
elif commune:
lines.append(f"\n> **Note** : La commune {commune} n'a pas été trouvée dans les données. Les données du département sont présentées.")
else:
lines.append(f"## Prix foncier — {libelle} ({code}) — {annee_int}")
lines.append("")
# Nombre total de transactions
nb_total = _safe_int(row.get("nbtrans_cod1"))
nb_maisons = _safe_int(row.get("nbtrans_cod111"))
nb_apparts = _safe_int(row.get("nbtrans_cod121"))
nb_terrains = _safe_int(row.get("nbtrans_cod2"))
lines.append("### Volume de transactions")
lines.append(f"- Total mutations : **{nb_total:,}**".replace(",", " ") if nb_total else "- Total mutations : non disponible")
lines.append(f"- Maisons : **{nb_maisons:,}**".replace(",", " ") if nb_maisons else "- Maisons : 0")
lines.append(f"- Appartements : **{nb_apparts:,}**".replace(",", " ") if nb_apparts else "- Appartements : 0")
lines.append(f"- Terrains (non bâti) : **{nb_terrains:,}**".replace(",", " ") if nb_terrains else "- Terrains : 0")
lines.append("")
# Maisons
if type_bien in ("tous", "maison"):
lines.append("### Maisons")
prix_med = _safe_float(row.get("valeurfonc_median_cod111"))
prix_q25 = _safe_float(row.get("valeurfonc_q25_cod111"))
prix_q75 = _safe_float(row.get("valeurfonc_q75_cod111"))
pxm2_med = _safe_float(row.get("pxm2_median_cod111"))
pxm2_q25 = _safe_float(row.get("pxm2_q25_cod111"))
pxm2_q75 = _safe_float(row.get("pxm2_q75_cod111"))
surf_med = _safe_float(row.get("sbati_median_cod111"))
lines.append(f"- Prix médian : **{_format_prix(prix_med)}** (Q25: {_format_prix(prix_q25)}, Q75: {_format_prix(prix_q75)})")
lines.append(f"- Prix au m² médian : **{_format_pxm2(pxm2_med)}** (Q25: {_format_pxm2(pxm2_q25)}, Q75: {_format_pxm2(pxm2_q75)})")
lines.append(f"- Surface médiane : **{_format_surface(surf_med)}**")
lines.append("")
# Par période
lines.append("**Transactions par période de construction** :")
for p in ["mp1", "mp2", "mp3", "mp4", "mp5", "mpx"]:
nb = _safe_int(row.get(f"nbtrans_{p}"))
if nb and nb > 0:
prix = _safe_float(row.get(f"valeurfonc_median_{p}"))
pxm2 = _safe_float(row.get(f"pxm2_median_{p}"))
periode_label = PERIODES_CONSTRUCTION[p]
lines.append(f"- {periode_label} : {nb} transactions, prix médian {_format_prix(prix)}, {_format_pxm2(pxm2)}")
lines.append("")
# Appartements
if type_bien in ("tous", "appartement"):
lines.append("### Appartements")
prix_med = _safe_float(row.get("valeurfonc_median_cod121"))
prix_q25 = _safe_float(row.get("valeurfonc_q25_cod121"))
prix_q75 = _safe_float(row.get("valeurfonc_q75_cod121"))
pxm2_med = _safe_float(row.get("pxm2_median_cod121"))
pxm2_q25 = _safe_float(row.get("pxm2_q25_cod121"))
pxm2_q75 = _safe_float(row.get("pxm2_q75_cod121"))
surf_med = _safe_float(row.get("sbati_median_cod121"))
if nb_apparts and nb_apparts > 0:
lines.append(f"- Prix médian : **{_format_prix(prix_med)}** (Q25: {_format_prix(prix_q25)}, Q75: {_format_prix(prix_q75)})")
lines.append(f"- Prix au m² médian : **{_format_pxm2(pxm2_med)}** (Q25: {_format_pxm2(pxm2_q25)}, Q75: {_format_pxm2(pxm2_q75)})")
lines.append(f"- Surface médiane : **{_format_surface(surf_med)}**")
lines.append("")
# Par période
lines.append("**Transactions par période de construction** :")
for p in ["ap1", "ap2", "ap3", "ap4", "ap5", "apx"]:
nb = _safe_int(row.get(f"nbtrans_{p}"))
if nb and nb > 0:
prix = _safe_float(row.get(f"valeurfonc_median_{p}"))
pxm2 = _safe_float(row.get(f"pxm2_median_{p}"))
periode_label = PERIODES_CONSTRUCTION[p]
lines.append(f"- {periode_label} : {nb} transactions, prix médian {_format_prix(prix)}, {_format_pxm2(pxm2)}")
else:
lines.append("Pas de transactions d'appartements sur ce territoire pour cette année.")
lines.append("")
# Source
lines.append("---")
lines.append(f"*Source : DV3F (CEREMA/DGFiP), échelle {echelle}, année {annee_int}.*")
return "\n".join(lines)
# =============================================================================
# Tool 3 : Évolution des prix
# =============================================================================
def evolution_prix(
commune: str = "",
departement: str = "",
type_bien: str = "maison",
) -> str:
"""Retourne l'évolution des prix fonciers sur un territoire de 2010 à 2024.
Permet de visualiser la tendance du marché immobilier sur 15 ans : évolution du prix
médian, du prix au m², et du volume de transactions. Utile pour les études de marché
et l'analyse des dynamiques territoriales liées à l'artificialisation.
Args:
commune: Code INSEE de la commune (ex: "13055"). Optionnel.
departement: Code du département (ex: "13"). Optionnel. Au moins un des deux est requis.
type_bien: Type de bien : "maison" (défaut) ou "appartement".
Returns:
Tableau année par année avec le nombre de transactions, le prix médian, et le
prix au m² médian. Inclut le calcul de l'évolution entre la première et la dernière
année disponible.
"""
if not commune and not departement:
return "Veuillez fournir au moins un code INSEE de commune ou un code département."
# Déterminer l'échelle
echelle = "communes" if commune else "departements"
code = commune if commune else departement
data = _get_dv3f_series(echelle, code)
if data is None:
if commune:
dep = departement or get_departement_from_commune(commune)
data = _get_dv3f_series("departements", dep)
if data is None:
return f"Aucune donnée trouvée pour la commune {commune} ni le département {dep}."
echelle = "departements"
code = dep
else:
return f"Aucune donnée trouvée pour le département {departement}."
# Récupérer le libellé
libelle = data.iloc[0].get("libelle", code) if "libelle" in data.columns else code
# Colonnes selon le type de bien
if type_bien.lower() == "appartement":
col_nb = "nbtrans_cod121"
col_prix = "valeurfonc_median_cod121"
col_pxm2 = "pxm2_median_cod121"
bien_label = "Appartements"
else:
col_nb = "nbtrans_cod111"
col_prix = "valeurfonc_median_cod111"
col_pxm2 = "pxm2_median_cod111"
bien_label = "Maisons"
lines = []
if echelle == "communes":
lines.append(f"## Évolution des prix — {bien_label} à {libelle} ({code})")
else:
lines.append(f"## Évolution des prix — {bien_label} dans le département {libelle} ({code})")
lines.append("")
# Construire le tableau
lines.append("| Année | Nb transactions | Prix médian | Prix/m² médian |")
lines.append("|-------|-----------------|-------------|----------------|")
first_prix = None
last_prix = None
first_pxm2 = None
last_pxm2 = None
# data index is annee
for annee_val in sorted(data.index):
row = data.loc[annee_val]
if isinstance(row, pd.DataFrame):
row = row.iloc[0]
nb = _safe_int(row.get(col_nb))
prix = _safe_float(row.get(col_prix))
pxm2 = _safe_float(row.get(col_pxm2))
if prix is not None and first_prix is None:
first_prix = prix
first_pxm2 = pxm2
if prix is not None:
last_prix = prix
last_pxm2 = pxm2
nb_str = f"{nb:,}".replace(",", " ") if nb is not None else "-"
prix_str = _format_prix(prix) if prix else "-"
pxm2_str = _format_pxm2(pxm2) if pxm2 else "-"
lines.append(f"| {annee_val} | {nb_str} | {prix_str} | {pxm2_str} |")
lines.append("")
# Évolution
if first_prix and last_prix:
evol_prix = ((last_prix - first_prix) / first_prix) * 100
lines.append(f"**Évolution du prix médian** : {evol_prix:+.1f}% sur la période")
if first_pxm2 and last_pxm2:
evol_pxm2 = ((last_pxm2 - first_pxm2) / first_pxm2) * 100
lines.append(f"**Évolution du prix/m²** : {evol_pxm2:+.1f}% sur la période")
lines.append("")
lines.append("---")
lines.append(f"*Source : DV3F (CEREMA/DGFiP), échelle {echelle}.*")
return "\n".join(lines)
# =============================================================================
# Tool 4 : Statistiques agrégées de friches (multi-échelle)
# =============================================================================
def _format_friches_agg(stats: dict, echelle_label: str, territoire_label: str) -> str:
"""Formate les statistiques agrégées de friches en texte lisible."""
lines = []
lines.append(f"## Statistiques des friches — {territoire_label}")
lines.append(f"*Échelle : {echelle_label}*")
lines.append("")
nb = int(stats.get("nb_friches", 0))
if nb == 0:
lines.append("**Aucune friche recensée** dans la base Cartofriches pour ce territoire.")
return "\n".join(lines)
# Chiffres clés
lines.append("### Chiffres clés")
lines.append(f"- **Nombre de friches** : {nb:,}".replace(",", " "))
nb_communes = stats.get("nb_communes", 0)
if isinstance(nb_communes, (int, float)) and nb_communes > 1:
lines.append(f"- **Communes concernées** : {int(nb_communes):,}".replace(",", " "))
surf_tot = stats.get("surface_totale_ha", 0)
lines.append(f"- **Surface totale** : {surf_tot:,.1f} hectares ({surf_tot/100:,.2f} km²)".replace(",", " "))
lines.append(f"- **Surface médiane** : {stats.get('surface_mediane_ha', 0):,.2f} hectares".replace(",", " "))
lines.append(f"- **Surface moyenne** : {stats.get('surface_moyenne_ha', 0):,.2f} hectares".replace(",", " "))
lines.append("")
# Statuts
lines.append("### Répartition par statut")
for statut_key, statut_label in [
("nb_potentielle", "Friche potentielle"),
("nb_sans_projet", "Friche sans projet"),
("nb_avec_projet", "Friche avec projet"),
("nb_reconvertie", "Friche reconvertie"),
]:
count = int(stats.get(statut_key, 0))
pct = round(100 * count / nb, 1) if nb > 0 else 0
lines.append(f"- {statut_label} : **{count}** ({pct}%)")
lines.append("")
# Mobilisables
nb_mob = int(stats.get("nb_mobilisables", 0))
surf_mob = stats.get("surface_mobilisable_ha", 0)
lines.append("### Potentiel mobilisable (ZAN)")
lines.append(f"- **Friches mobilisables** (sans projet + potentielles) : **{nb_mob}** friches")
lines.append(f"- **Surface mobilisable** : **{surf_mob:,.1f} hectares**".replace(",", " "))
lines.append("")
# Qualité
nb_poll = int(stats.get("nb_polluees", 0))
nb_u = int(stats.get("nb_zone_u", 0))
pct_u = stats.get("pct_zone_u", 0)
lines.append("### Caractéristiques")
lines.append(f"- Pollution avérée ou supposée : {nb_poll} ({round(100*nb_poll/nb, 1) if nb else 0}%)")
lines.append(f"- En zone urbanisée (U) : {nb_u} ({pct_u}%) — requalifiables sans artificialisation")
lines.append("")
# Types principaux
top_types = stats.get("top_types", {})
if top_types and isinstance(top_types, dict):
lines.append("### Principaux types de friches")
for type_name, count in sorted(top_types.items(), key=lambda x: -x[1]):
lines.append(f"- {type_name} : {int(count)}")
lines.append("")
lines.append("---")
lines.append("*Source : Cartofriches (CEREMA), statistiques pré-agrégées.*")
return "\n".join(lines)
def statistiques_friches(
commune: str = "",
epci: str = "",
departement: str = "",
region: str = "",
echelle: str = "",
) -> str:
"""Fournit les statistiques agrégées des friches à différentes échelles territoriales.
Retourne le nombre de friches, la surface totale et mobilisable, la répartition par
statut et type, le taux de pollution et le pourcentage en zone urbanisée. Les données
sont pré-agrégées pour des réponses instantanées.
Fonctionne à 5 échelles : commune, EPCI (intercommunalité), département, région, national.
Si une commune est fournie, l'outil retourne aussi automatiquement les statistiques
aux échelles supérieures (EPCI, département, région) pour permettre la comparaison.
Args:
commune: Code INSEE de la commune (ex: "13055" pour Marseille). Optionnel.
epci: Code SIREN de l'EPCI / intercommunalité (ex: "200054807" pour Aix-Marseille-Provence).
Optionnel.
departement: Code du département (ex: "13", "59"). Optionnel.
region: Code de la région (ex: "93" pour PACA, "32" pour Hauts-de-France). Optionnel.
echelle: Forcer une échelle spécifique : "commune", "epci", "departement", "region",
"national". Si vide, l'échelle est déduite des paramètres fournis. Mettre
"national" pour les statistiques France entière.
Returns:
Statistiques structurées : nombre de friches, surface totale et mobilisable,
répartition par statut/type, pollution, zone urbanisée. Si une commune est fournie,
inclut aussi les statistiques EPCI, département et région pour comparaison.
"""
# Cas national
if echelle == "national" or (not commune and not epci and not departement and not region):
if not echelle:
return ("Veuillez fournir au moins un territoire (commune, EPCI, département, région) "
"ou préciser echelle='national' pour les statistiques France entière.")
stats = get_friches_agg("national")
if stats:
return _format_friches_agg(stats, "National", "France entière")
return "Erreur : données nationales non disponibles."
# Si commune fournie → retourner multi-échelle pour comparaison
if commune:
infos = get_all_echelles_for_commune(commune)
lines_parts = []
# Commune
stats_comm = get_friches_agg("commune", commune)
if stats_comm:
lines_parts.append(_format_friches_agg(
stats_comm, "Commune",
f"{infos.get('commune_nom', '')} ({commune})"
))
else:
lines_parts.append(f"## Friches — {infos.get('commune_nom', commune)} ({commune})\n\nAucune friche recensée dans cette commune.")
# EPCI
epci_code = infos.get("epci", "")
if epci_code and epci_code != "ZZZZZZZZZ":
stats_epci = get_friches_agg("epci", epci_code)
if stats_epci:
lines_parts.append(_format_friches_agg(
stats_epci, "EPCI (intercommunalité)",
f"{infos.get('epci_nom', epci_code)}"
))
# Département
dep = infos.get("departement", "")
if dep:
stats_dep = get_friches_agg("departement", dep)
if stats_dep:
lines_parts.append(_format_friches_agg(
stats_dep, "Département",
f"Département {dep}"
))
# Région
reg = infos.get("region", "")
if reg:
stats_reg = get_friches_agg("region", reg)
if stats_reg:
lines_parts.append(_format_friches_agg(
stats_reg, "Région",
f"{infos.get('region_nom', reg)}"
))
return "\n\n---\n\n".join(lines_parts)
# EPCI seul
if epci:
stats = get_friches_agg("epci", epci)
if stats:
return _format_friches_agg(stats, "EPCI (intercommunalité)",
stats.get("libelle", epci))
return f"Aucune donnée de friches pour l'EPCI {epci}."
# Département seul
if departement:
stats = get_friches_agg("departement", departement)
if stats:
return _format_friches_agg(stats, "Département",
f"Département {departement}")
return f"Aucune donnée de friches pour le département {departement}."
# Région seule
if region:
stats = get_friches_agg("region", region)
if stats:
reg_nom = REG_NAMES.get(region, region)
return _format_friches_agg(stats, "Région", reg_nom)
return f"Aucune donnée de friches pour la région {region}."
return "Paramètres insuffisants. Fournissez un code commune, EPCI, département ou région."
# =============================================================================
# Tool 5 : Diagnostic foncier territorial (multi-échelle)
# =============================================================================
def diagnostic_foncier_territoire(
commune: str = "",
epci: str = "",
departement: str = "",
region: str = "",
) -> str:
"""Fournit un diagnostic foncier complet d'un territoire en croisant friches et marché immobilier.
Outil de synthèse qui combine les données Cartofriches (inventaire des friches) et
DV3F (transactions immobilières) pour donner une vision globale de la situation
foncière d'un territoire. Particulièrement utile dans le cadre du Zéro Artificialisation
Nette (ZAN) pour identifier le potentiel de requalification et contextualiser avec
le marché local.
Fonctionne à toutes les échelles : commune, EPCI, département, région.
Args:
commune: Code INSEE de la commune (ex: "13055" pour Marseille). Optionnel.
epci: Code SIREN de l'EPCI (ex: "200054807"). Optionnel.
departement: Code du département (ex: "13"). Optionnel.
region: Code de la région (ex: "93" pour PACA). Optionnel.
Returns:
Diagnostic structuré en 3 parties :
1. Inventaire des friches (nombre, surface, types, statuts)
2. Marché foncier local (prix, volumes, tendance)
3. Analyse croisée (contextualisation friches / marché)
"""
if not commune and not epci and not departement and not region:
return "Veuillez fournir au moins un territoire (commune, EPCI, département ou région)."
lines = []
# --- Déterminer l'échelle et le territoire ---
if commune:
friches_echelle, friches_code = "commune", commune
dv3f_echelle, dv3f_code = "communes", commune
infos = get_all_echelles_for_commune(commune)
nom_territoire = f"{infos.get('commune_nom', commune)} ({commune})"
dep_code = infos.get("departement", get_departement_from_commune(commune))
elif epci:
friches_echelle, friches_code = "epci", epci
dv3f_echelle, dv3f_code = "epci", epci
stats_temp = get_friches_agg("epci", epci)
nom_territoire = stats_temp.get("libelle", epci) if stats_temp else epci
dep_code = ""
elif departement:
friches_echelle, friches_code = "departement", departement
dv3f_echelle, dv3f_code = "departements", departement
nom_territoire = f"Département {departement}"
dep_code = departement
else: # region
friches_echelle, friches_code = "region", region
dv3f_echelle, dv3f_code = "regions", region
nom_territoire = REG_NAMES.get(region, f"Région {region}")
dep_code = ""
lines.append(f"## Diagnostic foncier — {nom_territoire}")
lines.append("")
# --- Partie 1 : Friches (depuis les agrégations pré-calculées) ---
lines.append("### 1. Inventaire des friches (Cartofriches)")
lines.append("")
stats = get_friches_agg(friches_echelle, friches_code)
if stats is None or int(stats.get("nb_friches", 0)) == 0:
lines.append("**Aucune friche recensée** dans la base Cartofriches pour ce territoire.")
lines.append("> Cela ne signifie pas qu'il n'y a pas de friches, mais qu'aucune n'a été inventoriée.")
nb_friches = 0
else:
nb_friches = int(stats["nb_friches"])
nb_communes = stats.get("nb_communes", 0)
lines.append(f"- **Nombre de friches** : {nb_friches:,}".replace(",", " "))
if isinstance(nb_communes, (int, float)) and nb_communes > 1:
lines.append(f"- **Communes concernées** : {int(nb_communes):,}".replace(",", " "))
lines.append(f"- **Surface totale** : {stats['surface_totale_ha']:,.1f} hectares".replace(",", " "))
lines.append(f"- **Surface médiane** : {stats['surface_mediane_ha']:,.2f} hectares".replace(",", " "))
lines.append("")
# Par statut
lines.append("**Par statut** :")
for key, label in [("nb_potentielle", "friche potentielle"), ("nb_sans_projet", "friche sans projet"),
("nb_avec_projet", "friche avec projet"), ("nb_reconvertie", "friche reconvertie")]:
count = int(stats.get(key, 0))
if count > 0:
pct = round(100 * count / nb_friches, 0)
lines.append(f"- {label} : {count} ({pct:.0f}%)")
lines.append("")
# Types
top_types = stats.get("top_types", {})
if top_types and isinstance(top_types, dict):
lines.append("**Principaux types** :")
for t, c in sorted(top_types.items(), key=lambda x: -x[1])[:5]:
lines.append(f"- {t} : {int(c)}")
lines.append("")
# Pollution et zone U
nb_poll = int(stats.get("nb_polluees", 0))
nb_u = int(stats.get("nb_zone_u", 0))
pct_u = stats.get("pct_zone_u", 0)
lines.append(f"**Pollution** : {nb_poll} friche(s) avec pollution avérée ou supposée ({round(100*nb_poll/nb_friches)}%)")
lines.append(f"**En zone urbanisée (U)** : {nb_u} ({pct_u}%) — requalifiables sans artificialisation")
lines.append("")
# --- Partie 2 : Marché foncier ---
lines.append("### 2. Marché foncier (DV3F)")
lines.append("")
annee = 2024
pxm2_maisons = None
if commune:
result = _find_best_echelle(commune, dep_code, annee)
elif departement:
result = _find_best_echelle("", departement, annee)
elif epci:
row = _get_dv3f_row("epci", epci, annee)
result = ("epci", epci, nom_territoire, row) if row is not None else None
elif region:
row = _get_dv3f_row("regions", region, annee)
result = ("regions", region, nom_territoire, row) if row is not None else None
else:
result = None
if result is None:
lines.append("Aucune donnée de marché foncier disponible pour ce territoire.")
else:
if len(result) == 7:
ech, ech_code, ech_libelle, row, _, _, _ = result
else:
ech, ech_code, ech_libelle, row = result
if ech == "departements" and commune:
lines.append(f"> Données au niveau département ({ech_libelle}) — commune trop petite pour des statistiques fiables.")
lines.append("")
nb_total = _safe_int(row.get("nbtrans_cod1"))
nb_maisons = _safe_int(row.get("nbtrans_cod111"))
nb_apparts = _safe_int(row.get("nbtrans_cod121"))
lines.append(f"**Année {annee}** :")
lines.append(f"- Transactions totales : {nb_total:,}".replace(",", " ") if nb_total else "- Transactions totales : non disponible")
lines.append("")
pxm2_maisons = _safe_float(row.get("pxm2_median_cod111"))
prix_maisons = _safe_float(row.get("valeurfonc_median_cod111"))
if nb_maisons and nb_maisons > 0:
lines.append(f"**Maisons** ({nb_maisons:,} transactions) :".replace(",", " "))
lines.append(f"- Prix médian : {_format_prix(prix_maisons)}")
lines.append(f"- Prix/m² médian : {_format_pxm2(pxm2_maisons)}")
pxm2_apparts = _safe_float(row.get("pxm2_median_cod121"))
prix_apparts = _safe_float(row.get("valeurfonc_median_cod121"))
if nb_apparts and nb_apparts > 0:
lines.append(f"\n**Appartements** ({nb_apparts:,} transactions) :".replace(",", " "))
lines.append(f"- Prix médian : {_format_prix(prix_apparts)}")
lines.append(f"- Prix/m² médian : {_format_pxm2(pxm2_apparts)}")
# Tendance
row_2020 = _get_dv3f_row(ech, ech_code, 2020)
if row_2020 is not None:
pxm2_2020 = _safe_float(row_2020.get("pxm2_median_cod111"))
if pxm2_2020 and pxm2_maisons:
evol = ((pxm2_maisons - pxm2_2020) / pxm2_2020) * 100
lines.append(f"\n**Tendance** : prix/m² maisons {evol:+.1f}% entre 2020 et 2024")
lines.append("")
# --- Partie 3 : Analyse croisée ---
lines.append("### 3. Analyse croisée pour le ZAN")
lines.append("")
if stats and nb_friches > 0 and result is not None:
nb_mob = int(stats.get("nb_mobilisables", 0))
surf_mob = stats.get("surface_mobilisable_ha", 0)
lines.append(f"- **Gisement de friches mobilisables** (sans projet + potentielles) : {nb_mob} friches, {surf_mob:,.1f} ha".replace(",", " "))
if nb_mob > 0:
if pxm2_maisons:
lines.append(f"- **Contexte marché** : le prix/m² local ({_format_pxm2(pxm2_maisons)} pour les maisons) permet de contextualiser le coût de réhabilitation")
lines.append(f"- **Potentiel ZAN** : ces {nb_mob} friches représentent du foncier déjà artificialisé, mobilisable pour éviter la consommation de nouveaux espaces naturels ou agricoles")
else:
lines.append("- Toutes les friches du territoire ont déjà un projet ou ont été reconverties.")
elif nb_friches == 0:
lines.append("Pas de friches inventoriées sur ce territoire — le diagnostic ZAN nécessiterait un inventaire local complémentaire.")
else:
lines.append("Données de marché foncier indisponibles — le croisement friches/marché n'est pas possible.")
lines.append("")
lines.append("---")
lines.append("*Sources : Cartofriches (CEREMA) + DV3F (CEREMA/DGFiP).*")
return "\n".join(lines)