""" Module d'analyse des données agricoles et calcul des risques """ import pandas as pd from config import OPTIONAL_GROUP_COLS, REQUIRED_COLUMNS, RISK_LEVELS class AgricultureAnalyzer: """Classe responsable de l'analyse des données agricoles""" def __init__(self, data=None): self.df = data self.risk_analysis = None def set_data(self, data): """Définit les données à analyser""" self.df = data def analyze_data(self): """Analyse des données et calcul des risques""" if self.df is None or len(self.df) == 0: print("❌ Pas de données à analyser") return "Erreur: Aucune donnée chargée" try: print(f"🔄 Début de l'analyse sur {len(self.df)} enregistrements...") # Analyse générale general_stats = self._calculate_general_stats() # Analyse des herbicides herbicide_stats = self._calculate_herbicide_stats() # Calcul de l'analyse des risques self.calculate_risk_analysis() print("✅ Analyse terminée avec succès") return general_stats, herbicide_stats except Exception as e: print(f"❌ Erreur lors de l'analyse: {str(e)}") return None, None def _calculate_general_stats(self): """Calcule les statistiques générales""" return { 'total_parcelles': self.df['numparcell'].nunique(), 'total_interventions': len(self.df), 'surface_totale': self.df['surfparc'].sum(), 'surface_moyenne': self.df['surfparc'].mean(), 'periode': f"{self.df['millesime'].min()} - {self.df['millesime'].max()}" } def _calculate_herbicide_stats(self): """Calcule les statistiques sur les herbicides""" if 'familleprod' in self.df.columns: herbicides_df = self.df[self.df['familleprod'] == 'Herbicides'].copy() return { 'nb_interventions_herbicides': len(herbicides_df), 'pourcentage_herbicides': (len(herbicides_df) / len(self.df)) * 100, 'parcelles_traitees': herbicides_df['numparcell'].nunique() } else: return { 'nb_interventions_herbicides': 0, 'pourcentage_herbicides': 0, 'parcelles_traitees': 0 } def calculate_risk_analysis(self): """Calcule l'analyse des risques par parcelle""" try: print("🔄 Calcul de l'analyse des risques...") # Vérifier les colonnes nécessaires required_group_cols = ['numparcell', 'surfparc'] # Construire la liste des colonnes de groupement disponibles group_cols = [col for col in required_group_cols if col in self.df.columns] group_cols.extend([col for col in OPTIONAL_GROUP_COLS if col in self.df.columns]) if len(group_cols) < 2: print(f"❌ Colonnes insuffisantes pour le groupement: {group_cols}") self.risk_analysis = pd.DataFrame() return # Construire l'agrégation selon les colonnes disponibles agg_dict = self._build_aggregation_dict() if not agg_dict: print("❌ Aucune colonne disponible pour l'agrégation") self.risk_analysis = pd.DataFrame() return # Groupement des données par parcelle risk_analysis = self.df.groupby(group_cols).agg(agg_dict).round(2) # Ajout des quantités d'herbicides spécifiques risk_analysis = self._add_herbicide_quantities(risk_analysis, group_cols) # Renommage des colonnes risk_analysis = self._rename_columns(risk_analysis, agg_dict) # Calcul de l'IFT approximatif risk_analysis = self._calculate_ift(risk_analysis, group_cols) # Classification du risque risk_analysis['Risque_adventice'] = risk_analysis.apply(self._classify_risk, axis=1) # Tri par risque risk_analysis = self._sort_by_risk(risk_analysis) self.risk_analysis = risk_analysis print(f"✅ Analyse des risques terminée: {len(self.risk_analysis)} parcelles analysées") except Exception as e: print(f"❌ Erreur lors du calcul des risques: {str(e)}") self.risk_analysis = pd.DataFrame() def _build_aggregation_dict(self): """Construit le dictionnaire d'agrégation selon les colonnes disponibles""" agg_dict = {} if 'familleprod' in self.df.columns: agg_dict['familleprod'] = lambda x: (x == 'Herbicides').sum() if 'libevenem' in self.df.columns: agg_dict['libevenem'] = lambda x: len(x.unique()) if 'produit' in self.df.columns: agg_dict['produit'] = lambda x: len(x.unique()) if 'quantitetot' in self.df.columns: agg_dict['quantitetot'] = 'sum' return agg_dict def _add_herbicide_quantities(self, risk_analysis, group_cols): """Ajoute les quantités d'herbicides spécifiques""" if 'familleprod' in self.df.columns and 'quantitetot' in self.df.columns: herbicides_df = self.df[self.df['familleprod'] == 'Herbicides'] if len(herbicides_df) > 0: herbicide_quantities = herbicides_df.groupby(group_cols)['quantitetot'].sum().fillna(0) risk_analysis['Quantite_herbicides'] = herbicide_quantities.reindex(risk_analysis.index, fill_value=0) else: risk_analysis['Quantite_herbicides'] = 0 else: risk_analysis['Quantite_herbicides'] = 0 return risk_analysis def _rename_columns(self, risk_analysis, agg_dict): """Renomme les colonnes de façon sécurisée""" new_column_names = {} if 'familleprod' in agg_dict: new_column_names['familleprod'] = 'Nb_herbicides' if 'libevenem' in agg_dict: new_column_names['libevenem'] = 'Diversite_evenements' if 'produit' in agg_dict: new_column_names['produit'] = 'Diversite_produits' if 'quantitetot' in agg_dict: new_column_names['quantitetot'] = 'Quantite_totale' return risk_analysis.rename(columns=new_column_names) def _calculate_ift(self, risk_analysis, group_cols): """Calcule l'IFT approximatif""" if 'surfparc' in group_cols: risk_analysis['IFT_herbicide_approx'] = ( risk_analysis['Quantite_herbicides'] / risk_analysis.index.get_level_values('surfparc') ).round(2) else: risk_analysis['IFT_herbicide_approx'] = 0 return risk_analysis def _classify_risk(self, row): """Classification du risque pour une parcelle""" ift = row.get('IFT_herbicide_approx', 0) nb_herb = row.get('Nb_herbicides', 0) if ift == 0 and nb_herb == 0: return 'TRÈS FAIBLE' elif ift < 1 and nb_herb <= 1: return 'FAIBLE' elif ift < 3 and nb_herb <= 3: return 'MODÉRÉ' elif ift < 5 and nb_herb <= 5: return 'ÉLEVÉ' else: return 'TRÈS ÉLEVÉ' def _sort_by_risk(self, risk_analysis): """Trie les résultats par niveau de risque""" risk_order = {r: i for i, r in enumerate(RISK_LEVELS)} risk_analysis['Risk_Score'] = risk_analysis['Risque_adventice'].map(risk_order) return risk_analysis.sort_values(['Risk_Score', 'IFT_herbicide_approx']) def get_summary_stats(self): """Retourne les statistiques de résumé avec gestion d'erreur""" try: if self.df is None: return "❌ Aucune donnée disponible" # Statistiques générales avec gestion d'erreur try: total_parcelles = self.df['numparcell'].nunique() total_interventions = len(self.df) surface_totale = self.df['surfparc'].sum() surface_moyenne = self.df['surfparc'].mean() periode_min = self.df['millesime'].min() periode_max = self.df['millesime'].max() stats_text = f""" ## 📊 Statistiques Générales - **Nombre total de parcelles**: {total_parcelles} - **Nombre d'interventions**: {total_interventions:,} - **Surface totale**: {surface_totale:.2f} hectares - **Surface moyenne par parcelle**: {surface_moyenne:.2f} hectares - **Période**: {periode_min} - {periode_max} ## 🧪 Analyse Herbicides """ except Exception as e: print(f"❌ Erreur dans les statistiques générales: {e}") stats_text = """ ## 📊 Statistiques Générales ❌ Erreur lors du calcul des statistiques générales ## 🧪 Analyse Herbicides """ # Analyse des herbicides avec gestion d'erreur try: if 'familleprod' in self.df.columns: herbicides_df = self.df[self.df['familleprod'] == 'Herbicides'] if len(herbicides_df) > 0: nb_herbicides = len(herbicides_df) pct_herbicides = (nb_herbicides/len(self.df)*100) parcelles_traitees = herbicides_df['numparcell'].nunique() if 'produit' in herbicides_df.columns: produits_uniques = herbicides_df['produit'].nunique() stats_text += f""" - **Interventions herbicides**: {nb_herbicides} ({pct_herbicides:.1f}%) - **Parcelles traitées**: {parcelles_traitees} - **Produits herbicides différents**: {produits_uniques} """ else: stats_text += f""" - **Interventions herbicides**: {nb_herbicides} ({pct_herbicides:.1f}%) - **Parcelles traitées**: {parcelles_traitees} """ else: stats_text += "\n- **Aucune intervention herbicide détectée**" else: stats_text += "\n- **Données d'herbicides non disponibles**" except Exception as e: print(f"❌ Erreur dans l'analyse des herbicides: {e}") stats_text += "\n❌ Erreur lors de l'analyse des herbicides" # Analyse des risques avec gestion d'erreur try: if self.risk_analysis is not None and len(self.risk_analysis) > 0: risk_distribution = self.risk_analysis['Risque_adventice'].value_counts() stats_text += f""" ## 🎯 Répartition des Risques Adventices """ for risk_level in RISK_LEVELS: if risk_level in risk_distribution: count = risk_distribution[risk_level] pct = (count / len(self.risk_analysis)) * 100 stats_text += f"- **{risk_level}**: {count} parcelles ({pct:.1f}%)\n" else: stats_text += "\n\n❌ Analyse des risques non disponible" except Exception as e: print(f"❌ Erreur dans l'analyse des risques: {e}") stats_text += "\n\n❌ Erreur lors de l'analyse des risques" return stats_text except Exception as e: print(f"❌ Erreur critique dans get_summary_stats: {e}") return "❌ Erreur critique lors de la génération des statistiques" def get_low_risk_recommendations(self): """Retourne les recommandations pour les parcelles à faible risque avec gestion d'erreur""" try: if self.risk_analysis is None or len(self.risk_analysis) == 0: return "❌ Analyse des risques non disponible" try: low_risk = self.risk_analysis[ self.risk_analysis['Risque_adventice'].isin(['TRÈS FAIBLE', 'FAIBLE']) ].head(10) if len(low_risk) == 0: return """## 🌾 Recommandations pour Cultures Sensibles ❌ Aucune parcelle à faible risque trouvée. 💡 **Suggestion**: Considérez une rotation plus longue ou des techniques alternatives pour réduire la pression adventice.""" recommendations = "## 🌾 TOP 10 - Parcelles Recommandées pour Cultures Sensibles (Pois, Haricot)\n\n" for idx, row in low_risk.iterrows(): try: if isinstance(idx, tuple) and len(idx) >= 4: parcelle, nom, culture, surface = idx[:4] else: # Fallback si l'index n'est pas un tuple de 4 éléments parcelle = str(idx) nom = "N/A" culture = "N/A" surface = row.get('surfparc', 0) if hasattr(row, 'get') else 0 # Vérification des valeurs avec fallbacks risque = row.get('Risque_adventice', 'N/A') if hasattr(row, 'get') else 'N/A' ift = row.get('IFT_herbicide_approx', 0) if hasattr(row, 'get') else 0 nb_herb = row.get('Nb_herbicides', 0) if hasattr(row, 'get') else 0 # Conversion sécurisée pour les formats try: surface_formatted = f"{float(surface):.2f}" if surface != "N/A" else "N/A" except (ValueError, TypeError): surface_formatted = str(surface) try: ift_formatted = f"{float(ift):.2f}" if ift != "N/A" else "N/A" except (ValueError, TypeError): ift_formatted = str(ift) recommendations += f""" **Parcelle {parcelle}** ({nom}) - Culture actuelle: {culture} - Surface: {surface_formatted} ha - Niveau de risque: {risque} - IFT herbicide: {ift_formatted} - Nombre d'herbicides: {nb_herb} --- """ except Exception as e: print(f"❌ Erreur lors du traitement d'une parcelle: {e}") recommendations += f""" **Parcelle {str(idx)}** ❌ Erreur lors du traitement des données de cette parcelle --- """ return recommendations except Exception as e: print(f"❌ Erreur lors de la génération des recommandations: {e}") return """## 🌾 Recommandations pour Cultures Sensibles ❌ Erreur lors de la génération des recommandations. 💡 **Suggestion**: Vérifiez la qualité des données et relancez l'analyse.""" except Exception as e: print(f"❌ Erreur critique dans get_low_risk_recommendations: {e}") return "❌ Erreur critique lors de la génération des recommandations" def get_risk_analysis(self): """Retourne l'analyse des risques""" return self.risk_analysis