""" Serveur MCP CEREMA — Tools pour les données foncières et friches. Ce module définit les 5 fonctions-tools exposées via Gradio MCP : 1. rechercher_friches : recherche de friches sur un territoire 2. statistiques_prix_foncier : prix et volumes de transactions immobilières 3. evolution_prix : évolution temporelle des prix fonciers 4. statistiques_friches : statistiques agrégées de friches multi-échelle 5. diagnostic_foncier_territoire : vision combinée friches + marché foncier """ import pandas as pd from data_loader import ( load_dv3f, load_friches, get_departement_from_commune, get_friches_agg, get_all_echelles_for_commune, get_epci_for_commune, get_region_for_commune, get_region_for_departement, REG_NAMES, PERIODES_CONSTRUCTION, ) # ============================================================================= # Helpers # ============================================================================= def _format_prix(val: float | None) -> str: """Formate un prix en euros lisible.""" if val is None or pd.isna(val): return "non disponible" if val >= 1_000_000: return f"{val/1_000_000:,.2f} M€".replace(",", " ") if val >= 1_000: return f"{val:,.0f} €".replace(",", " ") return f"{val:.0f} €" def _format_pxm2(val: float | None) -> str: """Formate un prix au m².""" if val is None or pd.isna(val): return "non disponible" return f"{val:,.0f} €/m²".replace(",", " ") def _format_surface(val: float | None, unite: str = "m²") -> str: """Formate une surface.""" if val is None or pd.isna(val): return "non disponible" return f"{val:,.0f} {unite}".replace(",", " ") def _safe_int(val) -> int | None: """Convertit en int si possible, None sinon.""" try: if pd.isna(val): return None return int(val) except (ValueError, TypeError): return None def _safe_float(val) -> float | None: """Convertit en float si possible, None sinon.""" try: if pd.isna(val): return None return float(val) except (ValueError, TypeError): return None def _get_dv3f_row(echelle: str, code: str, annee: int) -> pd.Series | None: """Récupère une ligne DV3F par index.""" df = load_dv3f() try: row = df.loc[(echelle, code, annee)] if isinstance(row, pd.DataFrame): row = row.iloc[0] return row except KeyError: return None def _get_dv3f_series(echelle: str, code: str) -> pd.DataFrame | None: """Récupère toutes les années pour un territoire.""" df = load_dv3f() try: result = df.loc[(echelle, code)] if isinstance(result, pd.Series): result = result.to_frame().T return result except KeyError: return None def _find_best_echelle(commune: str = "", departement: str = "", annee: int = 2024): """Trouve la meilleure échelle disponible avec fallback. Retourne (echelle, code, libelle, row) ou None. """ df = load_dv3f() # 1. Essayer la commune if commune: row = _get_dv3f_row("communes", commune, annee) if row is not None: nb = _safe_int(row.get("nbtrans_cod111")) if nb is not None and nb >= 5: return ("communes", commune, row.get("libelle", commune), row) # Commune trouvée mais pas assez de transactions pour les prix # On retourne quand même la commune mais on signalera le fallback si besoin dep = departement or get_departement_from_commune(commune) row_dep = _get_dv3f_row("departements", dep, annee) if row_dep is not None: return ("departements", dep, row_dep.get("libelle", dep), row_dep, "communes", commune, row) else: # Commune non trouvée, essayer le département dep = departement or get_departement_from_commune(commune) row_dep = _get_dv3f_row("departements", dep, annee) if row_dep is not None: return ("departements", dep, row_dep.get("libelle", dep), row_dep) # 2. Essayer le département if departement: row = _get_dv3f_row("departements", departement, annee) if row is not None: return ("departements", departement, row.get("libelle", departement), row) return None # ============================================================================= # Tool 1 : Recherche de friches # ============================================================================= def rechercher_friches( commune: str = "", departement: str = "", type_friche: str = "", surface_min: float = 0, statut: str = "", ) -> str: """Recherche des friches disponibles sur un territoire donné. Interroge la base Cartofriches du CEREMA (inventaire national des friches) pour trouver les friches correspondant aux critères. Utile pour la politique de Zéro Artificialisation Nette (ZAN) : identifier les terrains déjà artificialisés qui peuvent être réhabilités plutôt que de consommer de nouveaux espaces naturels. Args: commune: Code INSEE de la commune (ex: "13055" pour Marseille, "75056" pour Paris). Optionnel. departement: Code du département (ex: "13", "75", "59"). Optionnel. type_friche: Type de friche à rechercher. Valeurs possibles : "industrielle", "habitat", "commerciale", "ferroviaire", "militaire", "hospitalière", "logistique", "agro-industrielle", "équipement public", "carrière ou mine". Optionnel. surface_min: Surface minimale de la friche en mètres carrés (ex: 5000 pour 0.5 ha). Défaut : 0. statut: Statut de la friche. Valeurs possibles : "sans projet", "avec projet", "potentielle", "reconvertie". Optionnel. Returns: Texte structuré décrivant les friches trouvées, avec pour chacune : nom, type, surface, statut, pollution, zonage urbanisme, et commune. Si aucun résultat, un message explicite. """ gdf = load_friches() mask = pd.Series([True] * len(gdf), index=gdf.index) # Filtres if commune: mask &= gdf["comm_insee"] == commune if departement: mask &= gdf["dep"] == departement if type_friche: type_mapping = { "industrielle": "friche industrielle", "habitat": "friche d'habitat", "commerciale": "friche commerciale", "ferroviaire": "friche ferroviaire", "militaire": "friche militaire", "hospitalière": "friche hospitalière", "logistique": "friche logistique", "agro-industrielle": "friche agro-industrielle", "équipement public": "friche d'équipement public", "carrière ou mine": "friche carrière ou mine", } type_val = type_mapping.get(type_friche.lower(), type_friche) mask &= gdf["site_type"].str.lower() == type_val.lower() if surface_min > 0: mask &= pd.to_numeric(gdf["site_surface"], errors="coerce") >= surface_min if statut: statut_mapping = { "sans projet": "friche sans projet", "avec projet": "friche avec projet", "potentielle": "friche potentielle", "reconvertie": "friche reconvertie", } statut_val = statut_mapping.get(statut.lower(), statut) mask &= gdf["site_statut"].str.lower() == statut_val.lower() results = gdf[mask] if len(results) == 0: filters_desc = [] if commune: filters_desc.append(f"commune {commune}") if departement: filters_desc.append(f"département {departement}") if type_friche: filters_desc.append(f"type '{type_friche}'") if surface_min > 0: filters_desc.append(f"surface ≥ {surface_min:,.0f} m²") if statut: filters_desc.append(f"statut '{statut}'") return f"Aucune friche trouvée avec les critères : {', '.join(filters_desc)}. Essayez d'élargir la recherche (retirer un filtre ou chercher au niveau département)." # Construire la réponse total = len(results) lines = [] territory = "" if commune: commune_name = results.iloc[0]["comm_nom"] if len(results) > 0 else commune territory = f"la commune de {commune_name} ({commune})" elif departement: territory = f"le département {departement}" else: territory = "le territoire recherché" lines.append(f"## Friches trouvées sur {territory}") lines.append(f"**{total} friche(s) trouvée(s)**" + (f" (les 30 premières sont affichées)" if total > 30 else "")) lines.append("") # Statistiques résumées (sur TOUS les résultats, pas seulement les 30 premiers) surfaces = pd.to_numeric(results["site_surface"], errors="coerce") lines.append(f"**Surface totale** : {surfaces.sum()/10000:,.1f} hectares") lines.append(f"**Surface médiane** : {surfaces.median()/10000:,.2f} hectares") lines.append("") # Répartition par statut (sur TOUS les résultats) statut_counts = results["site_statut"].value_counts() lines.append("**Répartition par statut** :") for s, c in statut_counts.items(): lines.append(f"- {s} : {c}") lines.append("") # Limiter à 30 résultats pour le détail results_display = results.head(30) # Liste détaillée lines.append("### Détail des friches") lines.append("") for _, row in results_display.iterrows(): surface_val = _safe_float(row.get("site_surface")) surface_ha = f"{surface_val/10000:,.2f} ha" if surface_val else "non renseignée" surface_m2 = f"{surface_val:,.0f} m²" if surface_val else "" pollution = row.get("sol_pollution_existe", "inconnu") if pd.isna(pollution): pollution = "inconnu" zonage = row.get("urba_zone_type", "non renseigné") if pd.isna(zonage): zonage = "non renseigné" bati_etat = row.get("bati_etat", "inconnu") if pd.isna(bati_etat): bati_etat = "inconnu" site_type = row.get("site_type", "inconnu") if pd.isna(site_type): site_type = "inconnu" lines.append(f"**{row.get('site_nom', 'Sans nom')}**") lines.append(f"- Commune : {row.get('comm_nom', '?')} ({row.get('comm_insee', '?')})") lines.append(f"- Type : {site_type}") lines.append(f"- Surface : {surface_ha} ({surface_m2})") lines.append(f"- Statut : {row.get('site_statut', '?')}") lines.append(f"- Pollution sol : {pollution}") lines.append(f"- Zonage urbanisme : {zonage}") lines.append(f"- État du bâti : {bati_etat}") lines.append("") return "\n".join(lines) # ============================================================================= # Tool 2 : Statistiques de prix foncier # ============================================================================= def statistiques_prix_foncier( commune: str = "", departement: str = "", type_bien: str = "tous", annee: str = "2024", ) -> str: """Récupère les statistiques de prix foncier (transactions immobilières) sur un territoire. Utilise les données DV3F du CEREMA (Demande de Valeurs Foncières) qui compilent l'ensemble des transactions immobilières en France depuis 2010. Fournit les prix médians, prix au m², et volumes de transactions pour les maisons et appartements. Si la commune est trop petite (moins de 5 transactions), les statistiques du département sont automatiquement fournies en complément. Args: commune: Code INSEE de la commune (ex: "13055" pour Marseille). Optionnel. departement: Code du département (ex: "13", "75"). Optionnel. Si ni commune ni département ne sont fournis, un message d'erreur est retourné. type_bien: Type de bien immobilier. Valeurs : "tous" (défaut), "maison", "appartement". annee: Année des statistiques (de 2010 à 2024). Défaut : "2024". Returns: Texte structuré avec le nombre de transactions, le prix médian, le prix au m² (quartiles Q25, médian, Q75), et la surface médiane. Inclut aussi la ventilation par période de construction (avant 1914, 1914-1947, etc.) pour identifier le potentiel de rénovation énergétique (biens anciens vs neufs). """ if not commune and not departement: return "Veuillez fournir au moins un code INSEE de commune ou un code département pour obtenir des statistiques de prix foncier." try: annee_int = int(annee) except ValueError: return f"L'année '{annee}' n'est pas valide. Veuillez fournir une année entre 2010 et 2024." if annee_int < 2010 or annee_int > 2024: return f"L'année {annee_int} est hors de la période couverte (2010–2024)." result = _find_best_echelle(commune, departement, annee_int) if result is None: return f"Aucune donnée trouvée pour le territoire demandé (commune={commune}, département={departement}, année={annee})." # Gérer le cas du fallback avec données commune commune_row = None commune_code = None if len(result) == 7: echelle, code, libelle, row, _, commune_code, commune_row = result fallback = True else: echelle, code, libelle, row = result fallback = echelle == "departements" and commune != "" lines = [] # Titre if echelle == "communes": lines.append(f"## Prix foncier à {libelle} ({code}) — {annee_int}") elif echelle == "departements": lines.append(f"## Prix foncier dans le département {libelle} ({code}) — {annee_int}") if commune and commune_row is not None: nb_trans = _safe_int(commune_row.get("nbtrans_cod1")) lines.append(f"\n> **Note** : La commune {commune} a seulement {nb_trans or 0} transaction(s) en {annee_int}, ce qui est insuffisant pour des statistiques de prix fiables. Les données du département sont présentées en complément.") elif commune: lines.append(f"\n> **Note** : La commune {commune} n'a pas été trouvée dans les données. Les données du département sont présentées.") else: lines.append(f"## Prix foncier — {libelle} ({code}) — {annee_int}") lines.append("") # Nombre total de transactions nb_total = _safe_int(row.get("nbtrans_cod1")) nb_maisons = _safe_int(row.get("nbtrans_cod111")) nb_apparts = _safe_int(row.get("nbtrans_cod121")) nb_terrains = _safe_int(row.get("nbtrans_cod2")) lines.append("### Volume de transactions") lines.append(f"- Total mutations : **{nb_total:,}**".replace(",", " ") if nb_total else "- Total mutations : non disponible") lines.append(f"- Maisons : **{nb_maisons:,}**".replace(",", " ") if nb_maisons else "- Maisons : 0") lines.append(f"- Appartements : **{nb_apparts:,}**".replace(",", " ") if nb_apparts else "- Appartements : 0") lines.append(f"- Terrains (non bâti) : **{nb_terrains:,}**".replace(",", " ") if nb_terrains else "- Terrains : 0") lines.append("") # Maisons if type_bien in ("tous", "maison"): lines.append("### Maisons") prix_med = _safe_float(row.get("valeurfonc_median_cod111")) prix_q25 = _safe_float(row.get("valeurfonc_q25_cod111")) prix_q75 = _safe_float(row.get("valeurfonc_q75_cod111")) pxm2_med = _safe_float(row.get("pxm2_median_cod111")) pxm2_q25 = _safe_float(row.get("pxm2_q25_cod111")) pxm2_q75 = _safe_float(row.get("pxm2_q75_cod111")) surf_med = _safe_float(row.get("sbati_median_cod111")) lines.append(f"- Prix médian : **{_format_prix(prix_med)}** (Q25: {_format_prix(prix_q25)}, Q75: {_format_prix(prix_q75)})") lines.append(f"- Prix au m² médian : **{_format_pxm2(pxm2_med)}** (Q25: {_format_pxm2(pxm2_q25)}, Q75: {_format_pxm2(pxm2_q75)})") lines.append(f"- Surface médiane : **{_format_surface(surf_med)}**") lines.append("") # Par période lines.append("**Transactions par période de construction** :") for p in ["mp1", "mp2", "mp3", "mp4", "mp5", "mpx"]: nb = _safe_int(row.get(f"nbtrans_{p}")) if nb and nb > 0: prix = _safe_float(row.get(f"valeurfonc_median_{p}")) pxm2 = _safe_float(row.get(f"pxm2_median_{p}")) periode_label = PERIODES_CONSTRUCTION[p] lines.append(f"- {periode_label} : {nb} transactions, prix médian {_format_prix(prix)}, {_format_pxm2(pxm2)}") lines.append("") # Appartements if type_bien in ("tous", "appartement"): lines.append("### Appartements") prix_med = _safe_float(row.get("valeurfonc_median_cod121")) prix_q25 = _safe_float(row.get("valeurfonc_q25_cod121")) prix_q75 = _safe_float(row.get("valeurfonc_q75_cod121")) pxm2_med = _safe_float(row.get("pxm2_median_cod121")) pxm2_q25 = _safe_float(row.get("pxm2_q25_cod121")) pxm2_q75 = _safe_float(row.get("pxm2_q75_cod121")) surf_med = _safe_float(row.get("sbati_median_cod121")) if nb_apparts and nb_apparts > 0: lines.append(f"- Prix médian : **{_format_prix(prix_med)}** (Q25: {_format_prix(prix_q25)}, Q75: {_format_prix(prix_q75)})") lines.append(f"- Prix au m² médian : **{_format_pxm2(pxm2_med)}** (Q25: {_format_pxm2(pxm2_q25)}, Q75: {_format_pxm2(pxm2_q75)})") lines.append(f"- Surface médiane : **{_format_surface(surf_med)}**") lines.append("") # Par période lines.append("**Transactions par période de construction** :") for p in ["ap1", "ap2", "ap3", "ap4", "ap5", "apx"]: nb = _safe_int(row.get(f"nbtrans_{p}")) if nb and nb > 0: prix = _safe_float(row.get(f"valeurfonc_median_{p}")) pxm2 = _safe_float(row.get(f"pxm2_median_{p}")) periode_label = PERIODES_CONSTRUCTION[p] lines.append(f"- {periode_label} : {nb} transactions, prix médian {_format_prix(prix)}, {_format_pxm2(pxm2)}") else: lines.append("Pas de transactions d'appartements sur ce territoire pour cette année.") lines.append("") # Source lines.append("---") lines.append(f"*Source : DV3F (CEREMA/DGFiP), échelle {echelle}, année {annee_int}.*") return "\n".join(lines) # ============================================================================= # Tool 3 : Évolution des prix # ============================================================================= def evolution_prix( commune: str = "", departement: str = "", type_bien: str = "maison", ) -> str: """Retourne l'évolution des prix fonciers sur un territoire de 2010 à 2024. Permet de visualiser la tendance du marché immobilier sur 15 ans : évolution du prix médian, du prix au m², et du volume de transactions. Utile pour les études de marché et l'analyse des dynamiques territoriales liées à l'artificialisation. Args: commune: Code INSEE de la commune (ex: "13055"). Optionnel. departement: Code du département (ex: "13"). Optionnel. Au moins un des deux est requis. type_bien: Type de bien : "maison" (défaut) ou "appartement". Returns: Tableau année par année avec le nombre de transactions, le prix médian, et le prix au m² médian. Inclut le calcul de l'évolution entre la première et la dernière année disponible. """ if not commune and not departement: return "Veuillez fournir au moins un code INSEE de commune ou un code département." # Déterminer l'échelle echelle = "communes" if commune else "departements" code = commune if commune else departement data = _get_dv3f_series(echelle, code) if data is None: if commune: dep = departement or get_departement_from_commune(commune) data = _get_dv3f_series("departements", dep) if data is None: return f"Aucune donnée trouvée pour la commune {commune} ni le département {dep}." echelle = "departements" code = dep else: return f"Aucune donnée trouvée pour le département {departement}." # Récupérer le libellé libelle = data.iloc[0].get("libelle", code) if "libelle" in data.columns else code # Colonnes selon le type de bien if type_bien.lower() == "appartement": col_nb = "nbtrans_cod121" col_prix = "valeurfonc_median_cod121" col_pxm2 = "pxm2_median_cod121" bien_label = "Appartements" else: col_nb = "nbtrans_cod111" col_prix = "valeurfonc_median_cod111" col_pxm2 = "pxm2_median_cod111" bien_label = "Maisons" lines = [] if echelle == "communes": lines.append(f"## Évolution des prix — {bien_label} à {libelle} ({code})") else: lines.append(f"## Évolution des prix — {bien_label} dans le département {libelle} ({code})") lines.append("") # Construire le tableau lines.append("| Année | Nb transactions | Prix médian | Prix/m² médian |") lines.append("|-------|-----------------|-------------|----------------|") first_prix = None last_prix = None first_pxm2 = None last_pxm2 = None # data index is annee for annee_val in sorted(data.index): row = data.loc[annee_val] if isinstance(row, pd.DataFrame): row = row.iloc[0] nb = _safe_int(row.get(col_nb)) prix = _safe_float(row.get(col_prix)) pxm2 = _safe_float(row.get(col_pxm2)) if prix is not None and first_prix is None: first_prix = prix first_pxm2 = pxm2 if prix is not None: last_prix = prix last_pxm2 = pxm2 nb_str = f"{nb:,}".replace(",", " ") if nb is not None else "-" prix_str = _format_prix(prix) if prix else "-" pxm2_str = _format_pxm2(pxm2) if pxm2 else "-" lines.append(f"| {annee_val} | {nb_str} | {prix_str} | {pxm2_str} |") lines.append("") # Évolution if first_prix and last_prix: evol_prix = ((last_prix - first_prix) / first_prix) * 100 lines.append(f"**Évolution du prix médian** : {evol_prix:+.1f}% sur la période") if first_pxm2 and last_pxm2: evol_pxm2 = ((last_pxm2 - first_pxm2) / first_pxm2) * 100 lines.append(f"**Évolution du prix/m²** : {evol_pxm2:+.1f}% sur la période") lines.append("") lines.append("---") lines.append(f"*Source : DV3F (CEREMA/DGFiP), échelle {echelle}.*") return "\n".join(lines) # ============================================================================= # Tool 4 : Statistiques agrégées de friches (multi-échelle) # ============================================================================= def _format_friches_agg(stats: dict, echelle_label: str, territoire_label: str) -> str: """Formate les statistiques agrégées de friches en texte lisible.""" lines = [] lines.append(f"## Statistiques des friches — {territoire_label}") lines.append(f"*Échelle : {echelle_label}*") lines.append("") nb = int(stats.get("nb_friches", 0)) if nb == 0: lines.append("**Aucune friche recensée** dans la base Cartofriches pour ce territoire.") return "\n".join(lines) # Chiffres clés lines.append("### Chiffres clés") lines.append(f"- **Nombre de friches** : {nb:,}".replace(",", " ")) nb_communes = stats.get("nb_communes", 0) if isinstance(nb_communes, (int, float)) and nb_communes > 1: lines.append(f"- **Communes concernées** : {int(nb_communes):,}".replace(",", " ")) surf_tot = stats.get("surface_totale_ha", 0) lines.append(f"- **Surface totale** : {surf_tot:,.1f} hectares ({surf_tot/100:,.2f} km²)".replace(",", " ")) lines.append(f"- **Surface médiane** : {stats.get('surface_mediane_ha', 0):,.2f} hectares".replace(",", " ")) lines.append(f"- **Surface moyenne** : {stats.get('surface_moyenne_ha', 0):,.2f} hectares".replace(",", " ")) lines.append("") # Statuts lines.append("### Répartition par statut") for statut_key, statut_label in [ ("nb_potentielle", "Friche potentielle"), ("nb_sans_projet", "Friche sans projet"), ("nb_avec_projet", "Friche avec projet"), ("nb_reconvertie", "Friche reconvertie"), ]: count = int(stats.get(statut_key, 0)) pct = round(100 * count / nb, 1) if nb > 0 else 0 lines.append(f"- {statut_label} : **{count}** ({pct}%)") lines.append("") # Mobilisables nb_mob = int(stats.get("nb_mobilisables", 0)) surf_mob = stats.get("surface_mobilisable_ha", 0) lines.append("### Potentiel mobilisable (ZAN)") lines.append(f"- **Friches mobilisables** (sans projet + potentielles) : **{nb_mob}** friches") lines.append(f"- **Surface mobilisable** : **{surf_mob:,.1f} hectares**".replace(",", " ")) lines.append("") # Qualité nb_poll = int(stats.get("nb_polluees", 0)) nb_u = int(stats.get("nb_zone_u", 0)) pct_u = stats.get("pct_zone_u", 0) lines.append("### Caractéristiques") lines.append(f"- Pollution avérée ou supposée : {nb_poll} ({round(100*nb_poll/nb, 1) if nb else 0}%)") lines.append(f"- En zone urbanisée (U) : {nb_u} ({pct_u}%) — requalifiables sans artificialisation") lines.append("") # Types principaux top_types = stats.get("top_types", {}) if top_types and isinstance(top_types, dict): lines.append("### Principaux types de friches") for type_name, count in sorted(top_types.items(), key=lambda x: -x[1]): lines.append(f"- {type_name} : {int(count)}") lines.append("") lines.append("---") lines.append("*Source : Cartofriches (CEREMA), statistiques pré-agrégées.*") return "\n".join(lines) def statistiques_friches( commune: str = "", epci: str = "", departement: str = "", region: str = "", echelle: str = "", ) -> str: """Fournit les statistiques agrégées des friches à différentes échelles territoriales. Retourne le nombre de friches, la surface totale et mobilisable, la répartition par statut et type, le taux de pollution et le pourcentage en zone urbanisée. Les données sont pré-agrégées pour des réponses instantanées. Fonctionne à 5 échelles : commune, EPCI (intercommunalité), département, région, national. Si une commune est fournie, l'outil retourne aussi automatiquement les statistiques aux échelles supérieures (EPCI, département, région) pour permettre la comparaison. Args: commune: Code INSEE de la commune (ex: "13055" pour Marseille). Optionnel. epci: Code SIREN de l'EPCI / intercommunalité (ex: "200054807" pour Aix-Marseille-Provence). Optionnel. departement: Code du département (ex: "13", "59"). Optionnel. region: Code de la région (ex: "93" pour PACA, "32" pour Hauts-de-France). Optionnel. echelle: Forcer une échelle spécifique : "commune", "epci", "departement", "region", "national". Si vide, l'échelle est déduite des paramètres fournis. Mettre "national" pour les statistiques France entière. Returns: Statistiques structurées : nombre de friches, surface totale et mobilisable, répartition par statut/type, pollution, zone urbanisée. Si une commune est fournie, inclut aussi les statistiques EPCI, département et région pour comparaison. """ # Cas national if echelle == "national" or (not commune and not epci and not departement and not region): if not echelle: return ("Veuillez fournir au moins un territoire (commune, EPCI, département, région) " "ou préciser echelle='national' pour les statistiques France entière.") stats = get_friches_agg("national") if stats: return _format_friches_agg(stats, "National", "France entière") return "Erreur : données nationales non disponibles." # Si commune fournie → retourner multi-échelle pour comparaison if commune: infos = get_all_echelles_for_commune(commune) lines_parts = [] # Commune stats_comm = get_friches_agg("commune", commune) if stats_comm: lines_parts.append(_format_friches_agg( stats_comm, "Commune", f"{infos.get('commune_nom', '')} ({commune})" )) else: lines_parts.append(f"## Friches — {infos.get('commune_nom', commune)} ({commune})\n\nAucune friche recensée dans cette commune.") # EPCI epci_code = infos.get("epci", "") if epci_code and epci_code != "ZZZZZZZZZ": stats_epci = get_friches_agg("epci", epci_code) if stats_epci: lines_parts.append(_format_friches_agg( stats_epci, "EPCI (intercommunalité)", f"{infos.get('epci_nom', epci_code)}" )) # Département dep = infos.get("departement", "") if dep: stats_dep = get_friches_agg("departement", dep) if stats_dep: lines_parts.append(_format_friches_agg( stats_dep, "Département", f"Département {dep}" )) # Région reg = infos.get("region", "") if reg: stats_reg = get_friches_agg("region", reg) if stats_reg: lines_parts.append(_format_friches_agg( stats_reg, "Région", f"{infos.get('region_nom', reg)}" )) return "\n\n---\n\n".join(lines_parts) # EPCI seul if epci: stats = get_friches_agg("epci", epci) if stats: return _format_friches_agg(stats, "EPCI (intercommunalité)", stats.get("libelle", epci)) return f"Aucune donnée de friches pour l'EPCI {epci}." # Département seul if departement: stats = get_friches_agg("departement", departement) if stats: return _format_friches_agg(stats, "Département", f"Département {departement}") return f"Aucune donnée de friches pour le département {departement}." # Région seule if region: stats = get_friches_agg("region", region) if stats: reg_nom = REG_NAMES.get(region, region) return _format_friches_agg(stats, "Région", reg_nom) return f"Aucune donnée de friches pour la région {region}." return "Paramètres insuffisants. Fournissez un code commune, EPCI, département ou région." # ============================================================================= # Tool 5 : Diagnostic foncier territorial (multi-échelle) # ============================================================================= def diagnostic_foncier_territoire( commune: str = "", epci: str = "", departement: str = "", region: str = "", ) -> str: """Fournit un diagnostic foncier complet d'un territoire en croisant friches et marché immobilier. Outil de synthèse qui combine les données Cartofriches (inventaire des friches) et DV3F (transactions immobilières) pour donner une vision globale de la situation foncière d'un territoire. Particulièrement utile dans le cadre du Zéro Artificialisation Nette (ZAN) pour identifier le potentiel de requalification et contextualiser avec le marché local. Fonctionne à toutes les échelles : commune, EPCI, département, région. Args: commune: Code INSEE de la commune (ex: "13055" pour Marseille). Optionnel. epci: Code SIREN de l'EPCI (ex: "200054807"). Optionnel. departement: Code du département (ex: "13"). Optionnel. region: Code de la région (ex: "93" pour PACA). Optionnel. Returns: Diagnostic structuré en 3 parties : 1. Inventaire des friches (nombre, surface, types, statuts) 2. Marché foncier local (prix, volumes, tendance) 3. Analyse croisée (contextualisation friches / marché) """ if not commune and not epci and not departement and not region: return "Veuillez fournir au moins un territoire (commune, EPCI, département ou région)." lines = [] # --- Déterminer l'échelle et le territoire --- if commune: friches_echelle, friches_code = "commune", commune dv3f_echelle, dv3f_code = "communes", commune infos = get_all_echelles_for_commune(commune) nom_territoire = f"{infos.get('commune_nom', commune)} ({commune})" dep_code = infos.get("departement", get_departement_from_commune(commune)) elif epci: friches_echelle, friches_code = "epci", epci dv3f_echelle, dv3f_code = "epci", epci stats_temp = get_friches_agg("epci", epci) nom_territoire = stats_temp.get("libelle", epci) if stats_temp else epci dep_code = "" elif departement: friches_echelle, friches_code = "departement", departement dv3f_echelle, dv3f_code = "departements", departement nom_territoire = f"Département {departement}" dep_code = departement else: # region friches_echelle, friches_code = "region", region dv3f_echelle, dv3f_code = "regions", region nom_territoire = REG_NAMES.get(region, f"Région {region}") dep_code = "" lines.append(f"## Diagnostic foncier — {nom_territoire}") lines.append("") # --- Partie 1 : Friches (depuis les agrégations pré-calculées) --- lines.append("### 1. Inventaire des friches (Cartofriches)") lines.append("") stats = get_friches_agg(friches_echelle, friches_code) if stats is None or int(stats.get("nb_friches", 0)) == 0: lines.append("**Aucune friche recensée** dans la base Cartofriches pour ce territoire.") lines.append("> Cela ne signifie pas qu'il n'y a pas de friches, mais qu'aucune n'a été inventoriée.") nb_friches = 0 else: nb_friches = int(stats["nb_friches"]) nb_communes = stats.get("nb_communes", 0) lines.append(f"- **Nombre de friches** : {nb_friches:,}".replace(",", " ")) if isinstance(nb_communes, (int, float)) and nb_communes > 1: lines.append(f"- **Communes concernées** : {int(nb_communes):,}".replace(",", " ")) lines.append(f"- **Surface totale** : {stats['surface_totale_ha']:,.1f} hectares".replace(",", " ")) lines.append(f"- **Surface médiane** : {stats['surface_mediane_ha']:,.2f} hectares".replace(",", " ")) lines.append("") # Par statut lines.append("**Par statut** :") for key, label in [("nb_potentielle", "friche potentielle"), ("nb_sans_projet", "friche sans projet"), ("nb_avec_projet", "friche avec projet"), ("nb_reconvertie", "friche reconvertie")]: count = int(stats.get(key, 0)) if count > 0: pct = round(100 * count / nb_friches, 0) lines.append(f"- {label} : {count} ({pct:.0f}%)") lines.append("") # Types top_types = stats.get("top_types", {}) if top_types and isinstance(top_types, dict): lines.append("**Principaux types** :") for t, c in sorted(top_types.items(), key=lambda x: -x[1])[:5]: lines.append(f"- {t} : {int(c)}") lines.append("") # Pollution et zone U nb_poll = int(stats.get("nb_polluees", 0)) nb_u = int(stats.get("nb_zone_u", 0)) pct_u = stats.get("pct_zone_u", 0) lines.append(f"**Pollution** : {nb_poll} friche(s) avec pollution avérée ou supposée ({round(100*nb_poll/nb_friches)}%)") lines.append(f"**En zone urbanisée (U)** : {nb_u} ({pct_u}%) — requalifiables sans artificialisation") lines.append("") # --- Partie 2 : Marché foncier --- lines.append("### 2. Marché foncier (DV3F)") lines.append("") annee = 2024 pxm2_maisons = None if commune: result = _find_best_echelle(commune, dep_code, annee) elif departement: result = _find_best_echelle("", departement, annee) elif epci: row = _get_dv3f_row("epci", epci, annee) result = ("epci", epci, nom_territoire, row) if row is not None else None elif region: row = _get_dv3f_row("regions", region, annee) result = ("regions", region, nom_territoire, row) if row is not None else None else: result = None if result is None: lines.append("Aucune donnée de marché foncier disponible pour ce territoire.") else: if len(result) == 7: ech, ech_code, ech_libelle, row, _, _, _ = result else: ech, ech_code, ech_libelle, row = result if ech == "departements" and commune: lines.append(f"> Données au niveau département ({ech_libelle}) — commune trop petite pour des statistiques fiables.") lines.append("") nb_total = _safe_int(row.get("nbtrans_cod1")) nb_maisons = _safe_int(row.get("nbtrans_cod111")) nb_apparts = _safe_int(row.get("nbtrans_cod121")) lines.append(f"**Année {annee}** :") lines.append(f"- Transactions totales : {nb_total:,}".replace(",", " ") if nb_total else "- Transactions totales : non disponible") lines.append("") pxm2_maisons = _safe_float(row.get("pxm2_median_cod111")) prix_maisons = _safe_float(row.get("valeurfonc_median_cod111")) if nb_maisons and nb_maisons > 0: lines.append(f"**Maisons** ({nb_maisons:,} transactions) :".replace(",", " ")) lines.append(f"- Prix médian : {_format_prix(prix_maisons)}") lines.append(f"- Prix/m² médian : {_format_pxm2(pxm2_maisons)}") pxm2_apparts = _safe_float(row.get("pxm2_median_cod121")) prix_apparts = _safe_float(row.get("valeurfonc_median_cod121")) if nb_apparts and nb_apparts > 0: lines.append(f"\n**Appartements** ({nb_apparts:,} transactions) :".replace(",", " ")) lines.append(f"- Prix médian : {_format_prix(prix_apparts)}") lines.append(f"- Prix/m² médian : {_format_pxm2(pxm2_apparts)}") # Tendance row_2020 = _get_dv3f_row(ech, ech_code, 2020) if row_2020 is not None: pxm2_2020 = _safe_float(row_2020.get("pxm2_median_cod111")) if pxm2_2020 and pxm2_maisons: evol = ((pxm2_maisons - pxm2_2020) / pxm2_2020) * 100 lines.append(f"\n**Tendance** : prix/m² maisons {evol:+.1f}% entre 2020 et 2024") lines.append("") # --- Partie 3 : Analyse croisée --- lines.append("### 3. Analyse croisée pour le ZAN") lines.append("") if stats and nb_friches > 0 and result is not None: nb_mob = int(stats.get("nb_mobilisables", 0)) surf_mob = stats.get("surface_mobilisable_ha", 0) lines.append(f"- **Gisement de friches mobilisables** (sans projet + potentielles) : {nb_mob} friches, {surf_mob:,.1f} ha".replace(",", " ")) if nb_mob > 0: if pxm2_maisons: lines.append(f"- **Contexte marché** : le prix/m² local ({_format_pxm2(pxm2_maisons)} pour les maisons) permet de contextualiser le coût de réhabilitation") lines.append(f"- **Potentiel ZAN** : ces {nb_mob} friches représentent du foncier déjà artificialisé, mobilisable pour éviter la consommation de nouveaux espaces naturels ou agricoles") else: lines.append("- Toutes les friches du territoire ont déjà un projet ou ont été reconverties.") elif nb_friches == 0: lines.append("Pas de friches inventoriées sur ce territoire — le diagnostic ZAN nécessiterait un inventaire local complémentaire.") else: lines.append("Données de marché foncier indisponibles — le croisement friches/marché n'est pas possible.") lines.append("") lines.append("---") lines.append("*Sources : Cartofriches (CEREMA) + DV3F (CEREMA/DGFiP).*") return "\n".join(lines)