Spaces:
Sleeping
Sleeping
| import os, shutil | |
| # Désactiver les analytics Gradio dès le débu | |
| os.environ["GRADIO_ANALYTICS_ENABLED"] = "False" | |
| # shutil.rmtree(os.path.expanduser("~/.cache/huggingface/datasets"), ignore_errors=True) | |
| import gradio as gr | |
| import pandas as pd | |
| import numpy as np | |
| import matplotlib.pyplot as plt | |
| import seaborn as sns | |
| import plotly.express as px | |
| import plotly.graph_objects as go | |
| from plotly.subplots import make_subplots | |
| import warnings | |
| from datasets import load_dataset | |
| import pandas as pd | |
| from huggingface_hub import HfApi, hf_hub_download | |
| import urllib.parse | |
| warnings.filterwarnings('ignore') | |
| # Configuration Hugging Face | |
| hf_token = os.environ.get("HF_TOKEN") | |
| dataset_id = "HackathonCRA/2024" | |
| # Configuration des graphiques | |
| plt.style.use('default') | |
| sns.set_palette("husl") | |
| class AgricultureAnalyzer: | |
| def __init__(self): | |
| self.df = None | |
| self.risk_analysis = None | |
| def load_data(self): | |
| """Charge les données du dataset Hugging Face""" | |
| print("🔄 Chargement des données depuis Hugging Face...") | |
| print(f"📋 Dataset ID: {dataset_id}") | |
| print(f"📋 Token disponible: {'Oui' if hf_token else 'Non'}") | |
| self.df = None | |
| # 1) Tentative de chargement direct via datasets.load_dataset | |
| try: | |
| dataset = load_dataset( | |
| dataset_id, | |
| split="train", | |
| token=hf_token, | |
| trust_remote_code=True, | |
| ) | |
| print(f"📊 Dataset chargé: {len(dataset)} exemples") | |
| try: | |
| self.df = dataset.to_pandas() | |
| print("✅ Conversion to_pandas() réussie") | |
| except Exception as pandas_error: | |
| print(f"❌ Erreur to_pandas(): {pandas_error}") | |
| print("🔄 Tentative de conversion manuelle...") | |
| data_list = [] | |
| for i, item in enumerate(dataset): | |
| data_list.append(item) | |
| if i < 5: | |
| print(f"📋 Exemple {i}: {list(item.keys())}") | |
| self.df = pd.DataFrame(data_list) | |
| print(f"✅ Conversion manuelle réussie: {len(self.df)} lignes") | |
| except Exception as e: | |
| print(f"❌ Erreur lors du chargement depuis Hugging Face: {str(e)}") | |
| print(f"❌ Type d'erreur: {type(e).__name__}") | |
| # 2) Fallback: récupérer directement les fichiers du repo (csv/parquet/tsv/json) | |
| fallback_msg = self._fallback_load_from_repo_files() | |
| if self.df is None: | |
| return f"❌ Erreur lors du chargement du dataset : {str(e)} | Fallback: {fallback_msg}" | |
| # Si on n'a toujours pas de dataframe, arrêter | |
| if self.df is None: | |
| return "❌ Impossible de charger les données" | |
| print(f"📊 Données chargées: {len(self.df)} lignes") | |
| print(f"📊 Colonnes disponibles: {list(self.df.columns)}") | |
| # Nettoyage et validation | |
| required_columns = ["numparcell", "surfparc", "millesime"] | |
| missing_cols = [col for col in required_columns if col not in self.df.columns] | |
| if missing_cols: | |
| print(f"❌ Colonnes manquantes: {missing_cols}") | |
| self.df = None | |
| return f"❌ Colonnes manquantes: {missing_cols}" | |
| # Nettoyage | |
| initial_len = len(self.df) | |
| self.df = self.df.dropna(subset=required_columns) | |
| print(f"📊 Avant nettoyage: {initial_len} lignes") | |
| print(f"📊 Après nettoyage: {len(self.df)} lignes") | |
| def _fallback_load_from_repo_files(self): | |
| """Fallback pour charger les données en téléchargeant directement les fichiers du repo HF.""" | |
| try: | |
| print("🔄 Tentative de chargement alternatif via fichiers du dépôt Hugging Face...") | |
| api = HfApi() | |
| files = api.list_repo_files(repo_id=dataset_id, repo_type="dataset", token=hf_token) | |
| if not files: | |
| print("❌ Aucun fichier dans le dépôt") | |
| return "Aucun fichier trouvé dans le dépôt." | |
| data_files = [ | |
| f for f in files if f.lower().endswith((".parquet", ".csv", ".tsv", ".json")) | |
| ] | |
| if not data_files: | |
| print("❌ Aucun fichier de données exploitable (csv/tsv/parquet/json)") | |
| return "Aucun fichier exploitable (csv/tsv/parquet/json)." | |
| # Priorité: parquet > csv > tsv > json | |
| for ext in [".parquet", ".csv", ".tsv", ".json"]: | |
| selected = [f for f in data_files if f.lower().endswith(ext)] | |
| if selected: | |
| chosen_ext = ext | |
| selected_files = selected | |
| break | |
| print(f"📂 Fichiers détectés ({chosen_ext}): {selected_files[:5]}{' ...' if len(selected_files) > 5 else ''}") | |
| local_paths = [] | |
| for f in selected_files: | |
| local_path = hf_hub_download( | |
| repo_id=dataset_id, | |
| repo_type="dataset", | |
| filename=f, | |
| token=hf_token, | |
| ) | |
| local_paths.append(local_path) | |
| frames = [] | |
| if chosen_ext == ".parquet": | |
| for p in local_paths: | |
| frames.append(pd.read_parquet(p)) | |
| elif chosen_ext == ".csv": | |
| for p in local_paths: | |
| frames.append(pd.read_csv(p)) | |
| elif chosen_ext == ".tsv": | |
| for p in local_paths: | |
| frames.append(pd.read_csv(p, sep="\t")) | |
| elif chosen_ext == ".json": | |
| for p in local_paths: | |
| try: | |
| frames.append(pd.read_json(p, lines=True)) | |
| except Exception: | |
| frames.append(pd.read_json(p)) | |
| self.df = pd.concat(frames, ignore_index=True) if len(frames) > 1 else frames[0] | |
| print(f"✅ Fallback réussi: {len(self.df)} lignes chargées depuis les fichiers du dépôt") | |
| return None | |
| except Exception as e: | |
| print(f"❌ Fallback échoué: {e}") | |
| # Dernier recours: fichier local d'exemple | |
| sample_path = os.path.join(os.path.dirname(__file__), "sample_data.csv") | |
| if os.path.exists(sample_path): | |
| try: | |
| self.df = pd.read_csv(sample_path) | |
| print(f"✅ Chargement du fichier local 'sample_data.csv' ({len(self.df)} lignes)") | |
| return "Chargement via fichier local de secours." | |
| except Exception as e2: | |
| print(f"❌ Échec du chargement du fichier local: {e2}") | |
| return f"Fallback échoué: {e}" | |
| def analyze_data(self): | |
| """Analyse des données et calcul des risques""" | |
| if self.df is None or len(self.df) == 0: | |
| print("❌ Pas de données à analyser") | |
| return "Erreur: Aucune donnée chargée" | |
| try: | |
| print(f"🔄 Début de l'analyse sur {len(self.df)} enregistrements...") | |
| # Analyse générale | |
| general_stats = { | |
| 'total_parcelles': self.df['numparcell'].nunique(), | |
| 'total_interventions': len(self.df), | |
| 'surface_totale': self.df['surfparc'].sum(), | |
| 'surface_moyenne': self.df['surfparc'].mean(), | |
| 'periode': f"{self.df['millesime'].min()} - {self.df['millesime'].max()}" | |
| } | |
| # Analyse des herbicides | |
| if 'familleprod' in self.df.columns: | |
| herbicides_df = self.df[self.df['familleprod'] == 'Herbicides'].copy() | |
| herbicide_stats = { | |
| 'nb_interventions_herbicides': len(herbicides_df), | |
| 'pourcentage_herbicides': (len(herbicides_df) / len(self.df)) * 100, | |
| 'parcelles_traitees': herbicides_df['numparcell'].nunique() | |
| } | |
| else: | |
| herbicide_stats = { | |
| 'nb_interventions_herbicides': 0, | |
| 'pourcentage_herbicides': 0, | |
| 'parcelles_traitees': 0 | |
| } | |
| # Calcul de l'analyse des risques | |
| self.calculate_risk_analysis() | |
| print("✅ Analyse terminée avec succès") | |
| return general_stats, herbicide_stats | |
| except Exception as e: | |
| print(f"❌ Erreur lors de l'analyse: {str(e)}") | |
| return None, None | |
| def calculate_risk_analysis(self): | |
| """Calcule l'analyse des risques par parcelle""" | |
| try: | |
| print("🔄 Calcul de l'analyse des risques...") | |
| # Vérifier les colonnes nécessaires | |
| required_group_cols = ['numparcell', 'surfparc'] | |
| optional_group_cols = ['nomparc', 'libelleusag'] | |
| # Construire la liste des colonnes de groupement disponibles | |
| group_cols = [col for col in required_group_cols if col in self.df.columns] | |
| group_cols.extend([col for col in optional_group_cols if col in self.df.columns]) | |
| if len(group_cols) < 2: | |
| print(f"❌ Colonnes insuffisantes pour le groupement: {group_cols}") | |
| self.risk_analysis = pd.DataFrame() | |
| return | |
| # Construire l'agrégation selon les colonnes disponibles | |
| agg_dict = {} | |
| if 'familleprod' in self.df.columns: | |
| agg_dict['familleprod'] = lambda x: (x == 'Herbicides').sum() | |
| if 'libevenem' in self.df.columns: | |
| agg_dict['libevenem'] = lambda x: len(x.unique()) | |
| if 'produit' in self.df.columns: | |
| agg_dict['produit'] = lambda x: len(x.unique()) | |
| if 'quantitetot' in self.df.columns: | |
| agg_dict['quantitetot'] = 'sum' | |
| if not agg_dict: | |
| print("❌ Aucune colonne disponible pour l'agrégation") | |
| self.risk_analysis = pd.DataFrame() | |
| return | |
| # Groupement des données par parcelle | |
| risk_analysis = self.df.groupby(group_cols).agg(agg_dict).round(2) | |
| # Quantités d'herbicides spécifiques (seulement si les colonnes existent) | |
| if 'familleprod' in self.df.columns and 'quantitetot' in self.df.columns: | |
| herbicides_df = self.df[self.df['familleprod'] == 'Herbicides'] | |
| if len(herbicides_df) > 0: | |
| herbicide_quantities = herbicides_df.groupby(group_cols)['quantitetot'].sum().fillna(0) | |
| risk_analysis['Quantite_herbicides'] = herbicide_quantities.reindex(risk_analysis.index, fill_value=0) | |
| else: | |
| risk_analysis['Quantite_herbicides'] = 0 | |
| else: | |
| risk_analysis['Quantite_herbicides'] = 0 | |
| # Renommer les colonnes de façon sécurisée | |
| new_column_names = {} | |
| if 'familleprod' in agg_dict: | |
| new_column_names['familleprod'] = 'Nb_herbicides' | |
| if 'libevenem' in agg_dict: | |
| new_column_names['libevenem'] = 'Diversite_evenements' | |
| if 'produit' in agg_dict: | |
| new_column_names['produit'] = 'Diversite_produits' | |
| if 'quantitetot' in agg_dict: | |
| new_column_names['quantitetot'] = 'Quantite_totale' | |
| risk_analysis = risk_analysis.rename(columns=new_column_names) | |
| # Calcul de l'IFT approximatif | |
| if 'surfparc' in group_cols: | |
| risk_analysis['IFT_herbicide_approx'] = (risk_analysis['Quantite_herbicides'] / | |
| risk_analysis.index.get_level_values('surfparc')).round(2) | |
| else: | |
| risk_analysis['IFT_herbicide_approx'] = 0 | |
| # Classification du risque | |
| def classify_risk(row): | |
| ift = row.get('IFT_herbicide_approx', 0) | |
| nb_herb = row.get('Nb_herbicides', 0) | |
| if ift == 0 and nb_herb == 0: | |
| return 'TRÈS FAIBLE' | |
| elif ift < 1 and nb_herb <= 1: | |
| return 'FAIBLE' | |
| elif ift < 3 and nb_herb <= 3: | |
| return 'MODÉRÉ' | |
| elif ift < 5 and nb_herb <= 5: | |
| return 'ÉLEVÉ' | |
| else: | |
| return 'TRÈS ÉLEVÉ' | |
| risk_analysis['Risque_adventice'] = risk_analysis.apply(classify_risk, axis=1) | |
| # Tri par risque | |
| risk_order = ['TRÈS FAIBLE', 'FAIBLE', 'MODÉRÉ', 'ÉLEVÉ', 'TRÈS ÉLEVÉ'] | |
| risk_analysis['Risk_Score'] = risk_analysis['Risque_adventice'].map({r: i for i, r in enumerate(risk_order)}) | |
| self.risk_analysis = risk_analysis.sort_values(['Risk_Score', 'IFT_herbicide_approx']) | |
| print(f"✅ Analyse des risques terminée: {len(self.risk_analysis)} parcelles analysées") | |
| except Exception as e: | |
| print(f"❌ Erreur lors du calcul des risques: {str(e)}") | |
| self.risk_analysis = pd.DataFrame() | |
| def get_summary_stats(self): | |
| """Retourne les statistiques de résumé""" | |
| if self.df is None: | |
| return "Aucune donnée disponible" | |
| stats_text = f""" | |
| ## 📊 Statistiques Générales | |
| - **Nombre total de parcelles**: {self.df['numparcell'].nunique()} | |
| - **Nombre d'interventions**: {len(self.df):,} | |
| - **Surface totale**: {self.df['surfparc'].sum():.2f} hectares | |
| - **Surface moyenne par parcelle**: {self.df['surfparc'].mean():.2f} hectares | |
| - **Période**: {self.df['millesime'].min()} - {self.df['millesime'].max()} | |
| ## 🧪 Analyse Herbicides | |
| """ | |
| herbicides_df = self.df[self.df['familleprod'] == 'Herbicides'] | |
| if len(herbicides_df) > 0: | |
| stats_text += f""" | |
| - **Interventions herbicides**: {len(herbicides_df)} ({(len(herbicides_df)/len(self.df)*100):.1f}%) | |
| - **Parcelles traitées**: {herbicides_df['numparcell'].nunique()} | |
| - **Produits herbicides différents**: {herbicides_df['produit'].nunique()} | |
| """ | |
| if self.risk_analysis is not None: | |
| risk_distribution = self.risk_analysis['Risque_adventice'].value_counts() | |
| stats_text += f""" | |
| ## 🎯 Répartition des Risques Adventices | |
| """ | |
| for risk_level in ['TRÈS FAIBLE', 'FAIBLE', 'MODÉRÉ', 'ÉLEVÉ', 'TRÈS ÉLEVÉ']: | |
| if risk_level in risk_distribution: | |
| count = risk_distribution[risk_level] | |
| pct = (count / len(self.risk_analysis)) * 100 | |
| stats_text += f"- **{risk_level}**: {count} parcelles ({pct:.1f}%)\n" | |
| return stats_text | |
| def get_low_risk_recommendations(self): | |
| """Retourne les recommandations pour les parcelles à faible risque""" | |
| if self.risk_analysis is None: | |
| return "Analyse des risques non disponible" | |
| low_risk = self.risk_analysis[ | |
| self.risk_analysis['Risque_adventice'].isin(['TRÈS FAIBLE', 'FAIBLE']) | |
| ].head(10) | |
| recommendations = "## 🌾 TOP 10 - Parcelles Recommandées pour Cultures Sensibles (Pois, Haricot)\n\n" | |
| for idx, row in low_risk.iterrows(): | |
| if isinstance(idx, tuple) and len(idx) >= 4: | |
| parcelle, nom, culture, surface = idx[:4] | |
| else: | |
| # Fallback si l'index n'est pas un tuple de 4 éléments | |
| parcelle = str(idx) | |
| nom = "N/A" | |
| culture = "N/A" | |
| surface = row.get('surfparc', 0) if 'surfparc' in row else 0 | |
| recommendations += f""" | |
| **Parcelle {parcelle}** ({nom}) | |
| - Culture actuelle: {culture} | |
| - Surface: {surface:.2f} ha | |
| - Niveau de risque: {row['Risque_adventice']} | |
| - IFT herbicide: {row['IFT_herbicide_approx']:.2f} | |
| - Nombre d'herbicides: {row.get('Nb_herbicides', 0)} | |
| --- | |
| """ | |
| return recommendations | |
| def create_risk_visualization(self): | |
| """Crée la visualisation des risques""" | |
| if self.risk_analysis is None or len(self.risk_analysis) == 0: | |
| # Créer un graphique vide avec message d'erreur | |
| fig = px.scatter(title="❌ Aucune donnée d'analyse des risques disponible") | |
| fig.add_annotation(text="Veuillez charger les données d'abord", | |
| xref="paper", yref="paper", x=0.5, y=0.5, showarrow=False) | |
| return fig | |
| risk_df = self.risk_analysis.reset_index() | |
| # Vérifier quelles colonnes sont disponibles pour hover_data | |
| available_hover_cols = [] | |
| for col in ['nomparc', 'libelleusag']: | |
| if col in risk_df.columns: | |
| available_hover_cols.append(col) | |
| fig = px.scatter(risk_df, | |
| x='surfparc', | |
| y='IFT_herbicide_approx', | |
| color='Risque_adventice', | |
| size='Nb_herbicides', | |
| hover_data=available_hover_cols if available_hover_cols else None, | |
| color_discrete_map={ | |
| 'TRÈS FAIBLE': 'green', | |
| 'FAIBLE': 'lightgreen', | |
| 'MODÉRÉ': 'orange', | |
| 'ÉLEVÉ': 'red', | |
| 'TRÈS ÉLEVÉ': 'darkred' | |
| }, | |
| title="🎯 Analyse du Risque Adventice par Parcelle", | |
| labels={ | |
| 'surfparc': 'Surface de la parcelle (ha)', | |
| 'IFT_herbicide_approx': 'IFT Herbicide (approximatif)', | |
| 'Risque_adventice': 'Niveau de risque' | |
| }) | |
| fig.update_layout(width=800, height=600, title_font_size=16) | |
| return fig | |
| def create_culture_analysis(self): | |
| """Analyse par type de culture""" | |
| if self.df is None or len(self.df) == 0: | |
| # Créer un graphique vide avec message d'erreur | |
| fig = px.pie(title="❌ Aucune donnée disponible") | |
| fig.add_annotation(text="Veuillez charger les données d'abord", | |
| xref="paper", yref="paper", x=0.5, y=0.5, showarrow=False) | |
| return fig | |
| culture_counts = self.df['libelleusag'].value_counts() | |
| fig = px.pie(values=culture_counts.values, | |
| names=culture_counts.index, | |
| title="🌱 Répartition des Cultures") | |
| fig.update_layout(width=700, height=500) | |
| return fig | |
| def create_risk_distribution(self): | |
| """Distribution des niveaux de risque""" | |
| if self.risk_analysis is None or len(self.risk_analysis) == 0: | |
| # Créer un graphique vide avec message d'erreur | |
| fig = px.bar(title="❌ Aucune analyse des risques disponible") | |
| fig.add_annotation(text="Veuillez charger les données d'abord", | |
| xref="paper", yref="paper", x=0.5, y=0.5, showarrow=False) | |
| return fig | |
| risk_counts = self.risk_analysis['Risque_adventice'].value_counts() | |
| fig = px.bar(x=risk_counts.index, | |
| y=risk_counts.values, | |
| color=risk_counts.index, | |
| color_discrete_map={ | |
| 'TRÈS FAIBLE': 'green', | |
| 'FAIBLE': 'lightgreen', | |
| 'MODÉRÉ': 'orange', | |
| 'ÉLEVÉ': 'red', | |
| 'TRÈS ÉLEVÉ': 'darkred' | |
| }, | |
| title="📊 Distribution des Niveaux de Risque Adventice", | |
| labels={'x': 'Niveau de risque', 'y': 'Nombre de parcelles'}) | |
| fig.update_layout(width=700, height=500, showlegend=False) | |
| return fig | |
| def get_available_years(self): | |
| """Retourne la liste des années disponibles dans les données""" | |
| if self.df is None or len(self.df) == 0: | |
| return [] | |
| years = sorted(self.df['millesime'].dropna().unique()) | |
| return [int(year) for year in years if pd.notna(year)] | |
| def filter_data_by_year(self, year): | |
| """Filtre les données par année""" | |
| if self.df is None or year is None: | |
| return None | |
| year_data = self.df[self.df['millesime'] == year].copy() | |
| return year_data | |
| def get_year_summary_stats(self, year): | |
| """Retourne les statistiques de résumé pour une année spécifique""" | |
| year_data = self.filter_data_by_year(year) | |
| if year_data is None or len(year_data) == 0: | |
| return f"❌ Aucune donnée disponible pour l'année {year}" | |
| stats_text = f""" | |
| ## 📊 Statistiques pour l'année {year} | |
| - **Nombre de parcelles**: {year_data['numparcell'].nunique()} | |
| - **Nombre d'interventions**: {len(year_data):,} | |
| - **Surface totale**: {year_data['surfparc'].sum():.2f} hectares | |
| - **Surface moyenne par parcelle**: {year_data['surfparc'].mean():.2f} hectares | |
| ## 🧪 Analyse Herbicides | |
| """ | |
| if 'familleprod' in year_data.columns: | |
| herbicides_df = year_data[year_data['familleprod'] == 'Herbicides'] | |
| if len(herbicides_df) > 0: | |
| stats_text += f""" | |
| - **Interventions herbicides**: {len(herbicides_df)} ({(len(herbicides_df)/len(year_data)*100):.1f}%) | |
| - **Parcelles traitées**: {herbicides_df['numparcell'].nunique()} | |
| - **Produits herbicides différents**: {herbicides_df['produit'].nunique()} | |
| """ | |
| else: | |
| stats_text += "\n- **Aucune intervention herbicide cette année**" | |
| return stats_text | |
| def create_year_culture_analysis(self, year): | |
| """Analyse par type de culture pour une année spécifique""" | |
| year_data = self.filter_data_by_year(year) | |
| if year_data is None or len(year_data) == 0: | |
| fig = px.pie(title=f"❌ Aucune donnée disponible pour {year}") | |
| fig.add_annotation(text=f"Aucune donnée pour l'année {year}", | |
| xref="paper", yref="paper", x=0.5, y=0.5, showarrow=False) | |
| return fig | |
| if 'libelleusag' not in year_data.columns: | |
| fig = px.pie(title=f"❌ Données de culture manquantes pour {year}") | |
| fig.add_annotation(text="Colonne 'libelleusag' manquante", | |
| xref="paper", yref="paper", x=0.5, y=0.5, showarrow=False) | |
| return fig | |
| culture_counts = year_data['libelleusag'].value_counts() | |
| fig = px.pie(values=culture_counts.values, | |
| names=culture_counts.index, | |
| title=f"🌱 Répartition des Cultures - {year}") | |
| fig.update_layout(width=700, height=500) | |
| return fig | |
| def create_year_interventions_timeline(self, year): | |
| """Crée un graphique temporel des interventions pour une année""" | |
| year_data = self.filter_data_by_year(year) | |
| if year_data is None or len(year_data) == 0: | |
| fig = px.bar(title=f"❌ Aucune donnée disponible pour {year}") | |
| fig.add_annotation(text=f"Aucune donnée pour l'année {year}", | |
| xref="paper", yref="paper", x=0.5, y=0.5, showarrow=False) | |
| return fig | |
| if 'datedebut' not in year_data.columns: | |
| # Fallback: graphique des types d'événements | |
| if 'libevenem' in year_data.columns: | |
| event_counts = year_data['libevenem'].value_counts() | |
| fig = px.bar(x=event_counts.index, | |
| y=event_counts.values, | |
| title=f"📊 Types d'Interventions - {year}", | |
| labels={'x': 'Type d\'intervention', 'y': 'Nombre'}) | |
| fig.update_layout(width=800, height=500) | |
| fig.update_xaxis(tickangle=45) | |
| return fig | |
| else: | |
| fig = px.bar(title=f"❌ Données d'intervention manquantes pour {year}") | |
| fig.add_annotation(text="Colonnes de dates manquantes", | |
| xref="paper", yref="paper", x=0.5, y=0.5, showarrow=False) | |
| return fig | |
| # Convertir les dates et créer le graphique temporel | |
| try: | |
| year_data['datedebut_parsed'] = pd.to_datetime(year_data['datedebut'], | |
| format='%d/%m/%y', errors='coerce') | |
| year_data['mois'] = year_data['datedebut_parsed'].dt.month | |
| monthly_counts = year_data.groupby('mois').size().reset_index() | |
| monthly_counts.columns = ['mois', 'nb_interventions'] | |
| # Ajouter les noms des mois | |
| month_names = {1: 'Jan', 2: 'Fév', 3: 'Mar', 4: 'Avr', 5: 'Mai', 6: 'Jun', | |
| 7: 'Jul', 8: 'Aoû', 9: 'Sep', 10: 'Oct', 11: 'Nov', 12: 'Déc'} | |
| monthly_counts['mois_nom'] = monthly_counts['mois'].map(month_names) | |
| fig = px.bar(monthly_counts, | |
| x='mois_nom', | |
| y='nb_interventions', | |
| title=f"📅 Répartition Mensuelle des Interventions - {year}", | |
| labels={'mois_nom': 'Mois', 'nb_interventions': 'Nombre d\'interventions'}) | |
| fig.update_layout(width=800, height=500) | |
| return fig | |
| except Exception as e: | |
| print(f"❌ Erreur lors de la création du graphique temporel: {e}") | |
| # Fallback vers le graphique des événements | |
| if 'libevenem' in year_data.columns: | |
| event_counts = year_data['libevenem'].value_counts() | |
| fig = px.bar(x=event_counts.index, | |
| y=event_counts.values, | |
| title=f"📊 Types d'Interventions - {year}", | |
| labels={'x': 'Type d\'intervention', 'y': 'Nombre'}) | |
| fig.update_layout(width=800, height=500) | |
| fig.update_xaxis(tickangle=45) | |
| return fig | |
| else: | |
| fig = px.bar(title=f"❌ Erreur lors du traitement des données {year}") | |
| return fig | |
| def create_year_herbicide_analysis(self, year): | |
| """Analyse des herbicides pour une année spécifique""" | |
| year_data = self.filter_data_by_year(year) | |
| if year_data is None or len(year_data) == 0: | |
| fig = px.bar(title=f"❌ Aucune donnée disponible pour {year}") | |
| fig.add_annotation(text=f"Aucune donnée pour l'année {year}", | |
| xref="paper", yref="paper", x=0.5, y=0.5, showarrow=False) | |
| return fig | |
| if 'familleprod' not in year_data.columns: | |
| fig = px.bar(title=f"❌ Données de produits manquantes pour {year}") | |
| fig.add_annotation(text="Colonne 'familleprod' manquante", | |
| xref="paper", yref="paper", x=0.5, y=0.5, showarrow=False) | |
| return fig | |
| herbicides_df = year_data[year_data['familleprod'] == 'Herbicides'] | |
| if len(herbicides_df) == 0: | |
| fig = px.bar(title=f"✅ Aucun herbicide utilisé en {year}") | |
| fig.add_annotation(text=f"Aucune intervention herbicide en {year}", | |
| xref="paper", yref="paper", x=0.5, y=0.5, showarrow=False) | |
| return fig | |
| if 'produit' in herbicides_df.columns: | |
| herbicide_counts = herbicides_df['produit'].value_counts().head(10) | |
| fig = px.bar(x=herbicide_counts.values, | |
| y=herbicide_counts.index, | |
| orientation='h', | |
| title=f"🧪 Top 10 Herbicides Utilisés - {year}", | |
| labels={'x': 'Nombre d\'utilisations', 'y': 'Produit'}) | |
| fig.update_layout(width=800, height=500) | |
| return fig | |
| else: | |
| fig = px.bar(title=f"❌ Détails des produits manquants pour {year}") | |
| fig.add_annotation(text="Colonne 'produit' manquante", | |
| xref="paper", yref="paper", x=0.5, y=0.5, showarrow=False) | |
| return fig | |
| # Initialisation de l'analyseur | |
| analyzer = AgricultureAnalyzer() | |
| analyzer.load_data() | |
| analyzer.analyze_data() # Analyse des données après chargement | |
| # Interface Gradio | |
| def create_interface(): | |
| with gr.Blocks(title="🌾 Analyse Adventices Agricoles CRA", theme=gr.themes.Soft()) as demo: | |
| gr.Markdown(""" | |
| # 🌾 Analyse des Adventices Agricoles - CRA Bretagne | |
| **Objectif**: Anticiper et réduire la pression des adventices dans les parcelles agricoles bretonnes | |
| Cette application analyse les données historiques pour identifier les parcelles les plus adaptées | |
| à la culture de plantes sensibles comme le pois ou le haricot. | |
| """) | |
| with gr.Tabs(): | |
| with gr.TabItem("📊 Vue d'ensemble"): | |
| gr.Markdown("## Statistiques générales des données agricoles") | |
| stats_output = gr.Markdown(analyzer.get_summary_stats()) | |
| with gr.Row(): | |
| culture_plot = gr.Plot(analyzer.create_culture_analysis()) | |
| risk_dist_plot = gr.Plot(analyzer.create_risk_distribution()) | |
| with gr.TabItem("🎯 Analyse des Risques"): | |
| gr.Markdown("## Cartographie des risques adventices par parcelle") | |
| risk_plot = gr.Plot(analyzer.create_risk_visualization()) | |
| gr.Markdown(""" | |
| **Interprétation du graphique**: | |
| - **Axe X**: Surface de la parcelle (hectares) | |
| - **Axe Y**: IFT Herbicide approximatif | |
| - **Couleur**: Niveau de risque adventice | |
| - **Taille**: Nombre d'herbicides utilisés | |
| Les parcelles vertes (risque faible) sont idéales pour les cultures sensibles. | |
| """) | |
| with gr.TabItem("🌾 Recommandations"): | |
| reco_output = gr.Markdown(analyzer.get_low_risk_recommendations()) | |
| gr.Markdown(""" | |
| ## 💡 Conseils pour la gestion des adventices | |
| ### Parcelles à Très Faible Risque (Vertes) | |
| - ✅ **Idéales pour pois et haricot** | |
| - ✅ Historique d'usage herbicide minimal | |
| - ✅ Pression adventice faible attendue | |
| ### Parcelles à Faible Risque (Vert clair) | |
| - ⚠️ Surveillance légère recommandée | |
| - ✅ Conviennent aux cultures sensibles avec précautions | |
| ### Parcelles à Risque Modéré/Élevé (Orange/Rouge) | |
| - ❌ Éviter pour cultures sensibles | |
| - 🔍 Rotation nécessaire avant implantation | |
| - 📈 Surveillance renforcée des adventices | |
| ### Stratégies alternatives | |
| - **Rotation longue**: 3-4 ans avant cultures sensibles | |
| - **Cultures intermédiaires**: CIPAN pour réduire la pression | |
| - **Techniques mécaniques**: Hersage, binage | |
| - **Biostimulants**: Renforcement naturel des cultures | |
| """) | |
| with gr.TabItem("ℹ️ À propos"): | |
| gr.Markdown(""" | |
| ## 🎯 Méthodologie | |
| Cette analyse se base sur : | |
| ### Calcul de l'IFT (Indice de Fréquence de Traitement) | |
| - **IFT ≈ Quantité appliquée / Surface de parcelle** | |
| - Indicateur de l'intensité des traitements herbicides | |
| ### Classification des risques | |
| - **TRÈS FAIBLE**: IFT = 0, aucun herbicide | |
| - **FAIBLE**: IFT < 1, usage minimal | |
| - **MODÉRÉ**: IFT < 3, usage modéré | |
| - **ÉLEVÉ**: IFT < 5, usage important | |
| - **TRÈS ÉLEVÉ**: IFT ≥ 5, usage intensif | |
| ### Données analysées | |
| - **Source**: Station Expérimentale de Kerguéhennec | |
| - **Période**: Campagne 2025 | |
| - **Variables**: Interventions, produits, quantités, surfaces | |
| --- | |
| **Développé pour le Hackathon CRA Bretagne** 🏆 | |
| *Application d'aide à la décision pour une agriculture durable* | |
| """) | |
| # Bouton de rafraîchissement | |
| refresh_btn = gr.Button("🔄 Actualiser les données", variant="secondary") | |
| def refresh_data(): | |
| analyzer.load_data() | |
| analyzer.analyze_data() # Recalculer l'analyse après rechargement | |
| return ( | |
| analyzer.get_summary_stats(), | |
| analyzer.create_culture_analysis(), | |
| analyzer.create_risk_distribution(), | |
| analyzer.create_risk_visualization(), | |
| analyzer.get_low_risk_recommendations() | |
| ) | |
| refresh_btn.click( | |
| refresh_data, | |
| outputs=[stats_output, culture_plot, risk_dist_plot, risk_plot, reco_output] | |
| ) | |
| return demo | |
| # Lancement de l'application | |
| if __name__ == "__main__": | |
| demo = create_interface() | |
| # Configuration pour Hugging Face Spaces | |
| demo.launch( | |
| server_name="0.0.0.0", | |
| server_port=7860, | |
| share=False # Pas besoin de share sur HF Spaces | |
| ) | |