data / analyzer.py
Tracy André
updated
d9372eb
raw
history blame
16 kB
"""
Module d'analyse des données agricoles et calcul des risques
"""
import pandas as pd
from config import OPTIONAL_GROUP_COLS, REQUIRED_COLUMNS, RISK_LEVELS
class AgricultureAnalyzer:
"""Classe responsable de l'analyse des données agricoles"""
def __init__(self, data=None):
self.df = data
self.risk_analysis = None
def set_data(self, data):
"""Définit les données à analyser"""
self.df = data
def analyze_data(self):
"""Analyse des données et calcul des risques"""
if self.df is None or len(self.df) == 0:
print("❌ Pas de données à analyser")
return "Erreur: Aucune donnée chargée"
try:
print(f"🔄 Début de l'analyse sur {len(self.df)} enregistrements...")
# Analyse générale
general_stats = self._calculate_general_stats()
# Analyse des herbicides
herbicide_stats = self._calculate_herbicide_stats()
# Calcul de l'analyse des risques
self.calculate_risk_analysis()
print("✅ Analyse terminée avec succès")
return general_stats, herbicide_stats
except Exception as e:
print(f"❌ Erreur lors de l'analyse: {str(e)}")
return None, None
def _calculate_general_stats(self):
"""Calcule les statistiques générales"""
return {
'total_parcelles': self.df['numparcell'].nunique(),
'total_interventions': len(self.df),
'surface_totale': self.df['surfparc'].sum(),
'surface_moyenne': self.df['surfparc'].mean(),
'periode': f"{self.df['millesime'].min()} - {self.df['millesime'].max()}"
}
def _calculate_herbicide_stats(self):
"""Calcule les statistiques sur les herbicides"""
if 'familleprod' in self.df.columns:
herbicides_df = self.df[self.df['familleprod'] == 'Herbicides'].copy()
return {
'nb_interventions_herbicides': len(herbicides_df),
'pourcentage_herbicides': (len(herbicides_df) / len(self.df)) * 100,
'parcelles_traitees': herbicides_df['numparcell'].nunique()
}
else:
return {
'nb_interventions_herbicides': 0,
'pourcentage_herbicides': 0,
'parcelles_traitees': 0
}
def calculate_risk_analysis(self):
"""Calcule l'analyse des risques par parcelle"""
try:
print("🔄 Calcul de l'analyse des risques...")
# Vérifier les colonnes nécessaires
required_group_cols = ['numparcell', 'surfparc']
# Construire la liste des colonnes de groupement disponibles
group_cols = [col for col in required_group_cols if col in self.df.columns]
group_cols.extend([col for col in OPTIONAL_GROUP_COLS if col in self.df.columns])
if len(group_cols) < 2:
print(f"❌ Colonnes insuffisantes pour le groupement: {group_cols}")
self.risk_analysis = pd.DataFrame()
return
# Construire l'agrégation selon les colonnes disponibles
agg_dict = self._build_aggregation_dict()
if not agg_dict:
print("❌ Aucune colonne disponible pour l'agrégation")
self.risk_analysis = pd.DataFrame()
return
# Groupement des données par parcelle
risk_analysis = self.df.groupby(group_cols).agg(agg_dict).round(2)
# Ajout des quantités d'herbicides spécifiques
risk_analysis = self._add_herbicide_quantities(risk_analysis, group_cols)
# Renommage des colonnes
risk_analysis = self._rename_columns(risk_analysis, agg_dict)
# Calcul de l'IFT approximatif
risk_analysis = self._calculate_ift(risk_analysis, group_cols)
# Classification du risque
risk_analysis['Risque_adventice'] = risk_analysis.apply(self._classify_risk, axis=1)
# Tri par risque
risk_analysis = self._sort_by_risk(risk_analysis)
self.risk_analysis = risk_analysis
print(f"✅ Analyse des risques terminée: {len(self.risk_analysis)} parcelles analysées")
except Exception as e:
print(f"❌ Erreur lors du calcul des risques: {str(e)}")
self.risk_analysis = pd.DataFrame()
def _build_aggregation_dict(self):
"""Construit le dictionnaire d'agrégation selon les colonnes disponibles"""
agg_dict = {}
if 'familleprod' in self.df.columns:
agg_dict['familleprod'] = lambda x: (x == 'Herbicides').sum()
if 'libevenem' in self.df.columns:
agg_dict['libevenem'] = lambda x: len(x.unique())
if 'produit' in self.df.columns:
agg_dict['produit'] = lambda x: len(x.unique())
if 'quantitetot' in self.df.columns:
agg_dict['quantitetot'] = 'sum'
return agg_dict
def _add_herbicide_quantities(self, risk_analysis, group_cols):
"""Ajoute les quantités d'herbicides spécifiques"""
if 'familleprod' in self.df.columns and 'quantitetot' in self.df.columns:
herbicides_df = self.df[self.df['familleprod'] == 'Herbicides']
if len(herbicides_df) > 0:
herbicide_quantities = herbicides_df.groupby(group_cols)['quantitetot'].sum().fillna(0)
risk_analysis['Quantite_herbicides'] = herbicide_quantities.reindex(risk_analysis.index, fill_value=0)
else:
risk_analysis['Quantite_herbicides'] = 0
else:
risk_analysis['Quantite_herbicides'] = 0
return risk_analysis
def _rename_columns(self, risk_analysis, agg_dict):
"""Renomme les colonnes de façon sécurisée"""
new_column_names = {}
if 'familleprod' in agg_dict:
new_column_names['familleprod'] = 'Nb_herbicides'
if 'libevenem' in agg_dict:
new_column_names['libevenem'] = 'Diversite_evenements'
if 'produit' in agg_dict:
new_column_names['produit'] = 'Diversite_produits'
if 'quantitetot' in agg_dict:
new_column_names['quantitetot'] = 'Quantite_totale'
return risk_analysis.rename(columns=new_column_names)
def _calculate_ift(self, risk_analysis, group_cols):
"""Calcule l'IFT approximatif"""
if 'surfparc' in group_cols:
risk_analysis['IFT_herbicide_approx'] = (
risk_analysis['Quantite_herbicides'] /
risk_analysis.index.get_level_values('surfparc')
).round(2)
else:
risk_analysis['IFT_herbicide_approx'] = 0
return risk_analysis
def _classify_risk(self, row):
"""Classification du risque pour une parcelle"""
ift = row.get('IFT_herbicide_approx', 0)
nb_herb = row.get('Nb_herbicides', 0)
if ift == 0 and nb_herb == 0:
return 'TRÈS FAIBLE'
elif ift < 1 and nb_herb <= 1:
return 'FAIBLE'
elif ift < 3 and nb_herb <= 3:
return 'MODÉRÉ'
elif ift < 5 and nb_herb <= 5:
return 'ÉLEVÉ'
else:
return 'TRÈS ÉLEVÉ'
def _sort_by_risk(self, risk_analysis):
"""Trie les résultats par niveau de risque"""
risk_order = {r: i for i, r in enumerate(RISK_LEVELS)}
risk_analysis['Risk_Score'] = risk_analysis['Risque_adventice'].map(risk_order)
return risk_analysis.sort_values(['Risk_Score', 'IFT_herbicide_approx'])
def get_summary_stats(self):
"""Retourne les statistiques de résumé avec gestion d'erreur"""
try:
if self.df is None:
return "❌ Aucune donnée disponible"
# Statistiques générales avec gestion d'erreur
try:
total_parcelles = self.df['numparcell'].nunique()
total_interventions = len(self.df)
surface_totale = self.df['surfparc'].sum()
surface_moyenne = self.df['surfparc'].mean()
periode_min = self.df['millesime'].min()
periode_max = self.df['millesime'].max()
stats_text = f"""
## 📊 Statistiques Générales
- **Nombre total de parcelles**: {total_parcelles}
- **Nombre d'interventions**: {total_interventions:,}
- **Surface totale**: {surface_totale:.2f} hectares
- **Surface moyenne par parcelle**: {surface_moyenne:.2f} hectares
- **Période**: {periode_min} - {periode_max}
## 🧪 Analyse Herbicides
"""
except Exception as e:
print(f"❌ Erreur dans les statistiques générales: {e}")
stats_text = """
## 📊 Statistiques Générales
❌ Erreur lors du calcul des statistiques générales
## 🧪 Analyse Herbicides
"""
# Analyse des herbicides avec gestion d'erreur
try:
if 'familleprod' in self.df.columns:
herbicides_df = self.df[self.df['familleprod'] == 'Herbicides']
if len(herbicides_df) > 0:
nb_herbicides = len(herbicides_df)
pct_herbicides = (nb_herbicides/len(self.df)*100)
parcelles_traitees = herbicides_df['numparcell'].nunique()
if 'produit' in herbicides_df.columns:
produits_uniques = herbicides_df['produit'].nunique()
stats_text += f"""
- **Interventions herbicides**: {nb_herbicides} ({pct_herbicides:.1f}%)
- **Parcelles traitées**: {parcelles_traitees}
- **Produits herbicides différents**: {produits_uniques}
"""
else:
stats_text += f"""
- **Interventions herbicides**: {nb_herbicides} ({pct_herbicides:.1f}%)
- **Parcelles traitées**: {parcelles_traitees}
"""
else:
stats_text += "\n- **Aucune intervention herbicide détectée**"
else:
stats_text += "\n- **Données d'herbicides non disponibles**"
except Exception as e:
print(f"❌ Erreur dans l'analyse des herbicides: {e}")
stats_text += "\n❌ Erreur lors de l'analyse des herbicides"
# Analyse des risques avec gestion d'erreur
try:
if self.risk_analysis is not None and len(self.risk_analysis) > 0:
risk_distribution = self.risk_analysis['Risque_adventice'].value_counts()
stats_text += f"""
## 🎯 Répartition des Risques Adventices
"""
for risk_level in RISK_LEVELS:
if risk_level in risk_distribution:
count = risk_distribution[risk_level]
pct = (count / len(self.risk_analysis)) * 100
stats_text += f"- **{risk_level}**: {count} parcelles ({pct:.1f}%)\n"
else:
stats_text += "\n\n❌ Analyse des risques non disponible"
except Exception as e:
print(f"❌ Erreur dans l'analyse des risques: {e}")
stats_text += "\n\n❌ Erreur lors de l'analyse des risques"
return stats_text
except Exception as e:
print(f"❌ Erreur critique dans get_summary_stats: {e}")
return "❌ Erreur critique lors de la génération des statistiques"
def get_low_risk_recommendations(self):
"""Retourne les recommandations pour les parcelles à faible risque avec gestion d'erreur"""
try:
if self.risk_analysis is None or len(self.risk_analysis) == 0:
return "❌ Analyse des risques non disponible"
try:
low_risk = self.risk_analysis[
self.risk_analysis['Risque_adventice'].isin(['TRÈS FAIBLE', 'FAIBLE'])
].head(10)
if len(low_risk) == 0:
return """## 🌾 Recommandations pour Cultures Sensibles
❌ Aucune parcelle à faible risque trouvée.
💡 **Suggestion**: Considérez une rotation plus longue ou des techniques alternatives pour réduire la pression adventice."""
recommendations = "## 🌾 TOP 10 - Parcelles Recommandées pour Cultures Sensibles (Pois, Haricot)\n\n"
for idx, row in low_risk.iterrows():
try:
if isinstance(idx, tuple) and len(idx) >= 4:
parcelle, nom, culture, surface = idx[:4]
else:
# Fallback si l'index n'est pas un tuple de 4 éléments
parcelle = str(idx)
nom = "N/A"
culture = "N/A"
surface = row.get('surfparc', 0) if hasattr(row, 'get') else 0
# Vérification des valeurs avec fallbacks
risque = row.get('Risque_adventice', 'N/A') if hasattr(row, 'get') else 'N/A'
ift = row.get('IFT_herbicide_approx', 0) if hasattr(row, 'get') else 0
nb_herb = row.get('Nb_herbicides', 0) if hasattr(row, 'get') else 0
# Conversion sécurisée pour les formats
try:
surface_formatted = f"{float(surface):.2f}" if surface != "N/A" else "N/A"
except (ValueError, TypeError):
surface_formatted = str(surface)
try:
ift_formatted = f"{float(ift):.2f}" if ift != "N/A" else "N/A"
except (ValueError, TypeError):
ift_formatted = str(ift)
recommendations += f"""
**Parcelle {parcelle}** ({nom})
- Culture actuelle: {culture}
- Surface: {surface_formatted} ha
- Niveau de risque: {risque}
- IFT herbicide: {ift_formatted}
- Nombre d'herbicides: {nb_herb}
---
"""
except Exception as e:
print(f"❌ Erreur lors du traitement d'une parcelle: {e}")
recommendations += f"""
**Parcelle {str(idx)}**
❌ Erreur lors du traitement des données de cette parcelle
---
"""
return recommendations
except Exception as e:
print(f"❌ Erreur lors de la génération des recommandations: {e}")
return """## 🌾 Recommandations pour Cultures Sensibles
❌ Erreur lors de la génération des recommandations.
💡 **Suggestion**: Vérifiez la qualité des données et relancez l'analyse."""
except Exception as e:
print(f"❌ Erreur critique dans get_low_risk_recommendations: {e}")
return "❌ Erreur critique lors de la génération des recommandations"
def get_risk_analysis(self):
"""Retourne l'analyse des risques"""
return self.risk_analysis