import logging from datetime import datetime, timedelta from typing import Dict, List, Optional, Any import pandas as pd import requests import plotly.express as px import streamlit as st from tenacity import retry, stop_after_attempt, wait_exponential # Configuration Streamlit st.set_page_config(page_title="Pesticide Data Explorer", page_icon="🌿", layout="wide") # Configuration logging logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s", handlers=[logging.FileHandler("pesticide_app.log", encoding="utf-8"), logging.StreamHandler()], ) logger = logging.getLogger(__name__) class PesticideDataFetcher: BASE_URL = "https://api.datalake.sante.service.ec.europa.eu/sante/pesticides" HEADERS = { "Content-Type": "application/json", "Cache-Control": "no-cache", "User-Agent": "Mozilla/5.0" } def __init__(self): self.session = requests.Session() self.session.headers.update(self.HEADERS) @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10)) def fetch_data(self, url: str, params: Optional[Dict] = None) -> Dict[str, Any]: """Effectue une requête GET avec gestion des erreurs améliorée""" try: response = self.session.get(url, params=params, timeout=15) response.raise_for_status() return response.json() except requests.RequestException as e: logger.error(f"Erreur API : {e}") if "timeout" in str(e).lower(): logger.warning("Timeout - tentative de reconnexion...") return {} def get_mrls(self, product_ids: Optional[List[int]] = None) -> List[Dict]: """Récupère les LMR pour une liste de produits""" all_mrls = [] if not product_ids: url = f"{self.BASE_URL}/pesticide_residues_mrls?format=json&api-version=v2.0" response = self.fetch_data(url) return response.get("value", []) for product_id in product_ids: url = f"{self.BASE_URL}/pesticide_residues_mrls?format=json&api-version=v2.0" params = {"product_id": product_id} response = self.fetch_data(url, params=params) if response and "value" in response: all_mrls.extend(response["value"]) return all_mrls @st.cache_data(ttl=3600) # Cache augmenté à 1 heure def get_products() -> List[Dict]: fetcher = PesticideDataFetcher() url = f"{fetcher.BASE_URL}/pesticide_residues_products?format=json&language=FR&api-version=v2.0" all_products = [] while url: response = fetcher.fetch_data(url) if not response or "value" not in response: break all_products.extend(response["value"]) url = response.get("nextLink") return all_products @st.cache_data(ttl=3600) def get_all_substances() -> Dict[int, str]: fetcher = PesticideDataFetcher() url = f"{fetcher.BASE_URL}/active_substances?format=json&api-version=v2.0" all_substances = {} while url: response = fetcher.fetch_data(url) if not response or "value" not in response: break for item in response["value"]: if item.get("substance_id") and item.get("substance_name"): all_substances[item["substance_id"]] = item["substance_name"] url = response.get("nextLink") return all_substances class PesticideInterface: def __init__(self): self.fetcher = PesticideDataFetcher() self.products = get_products() self.product_choices = {p['product_name']: p['product_id'] for p in self.products} self.substances = get_all_substances() def get_product_details(self, product_names: List[str], future_only: bool = False) -> pd.DataFrame: product_ids = [self.product_choices[name] for name in product_names] all_mrls = self.fetcher.get_mrls(product_ids) df = pd.DataFrame(all_mrls) if df.empty: if future_only: st.info("Aucun changement de LMR prévu dans les 6 prochains mois pour les produits sélectionnés.") return df # Log des données pour debug logger.info(f"Nombre total d'entrées : {len(df)}") logger.info(f"Colonnes disponibles : {df.columns.tolist()}") # Enrichir les données df["Substance"] = df["pesticide_residue_id"].map(self.substances) # Log des substances non trouvées missing_substances = df[df["Substance"].isna()]["pesticide_residue_id"].unique() if len(missing_substances) > 0: logger.warning(f"Substances non trouvées: {missing_substances}") # Remplacer les NA par "Inconnu" seulement après le log df["Substance"] = df["Substance"].fillna("Inconnu") df["Règlement"] = df.apply( lambda x: f'{x["regulation_number"]}' if pd.notna(x["regulation_url"]) else x["regulation_number"], axis=1 ) # Conversion des dates df["Date d'application"] = pd.to_datetime(df["entry_into_force_date"], errors="coerce") # Filtrer pour les 6 prochains mois si demandé if future_only: now = datetime.now() future_date = now + timedelta(days=180) future_df = df[ (df["Date d'application"] > now) & (df["Date d'application"] <= future_date) ] if future_df.empty: st.info(f"🔍 Aucun changement de LMR prévu entre le {now.strftime('%d/%m/%Y')} et le {future_date.strftime('%d/%m/%Y')} pour les produits sélectionnés.") return pd.DataFrame() # Retourne un DataFrame vide df = future_df # Renommer et convertir les colonnes df = df.rename(columns={"mrl_value": "Valeur LMR"}) # Conversion explicite en nombre flottant df["Valeur LMR"] = pd.to_numeric(df["Valeur LMR"], errors='coerce') # Sélection finale des colonnes columns = ["Substance", "Valeur LMR", "Date d'application", "Règlement"] df = df[columns].sort_values("Date d'application", ascending=False) return df def create_interface(self): st.title("🌿 Base de données des pesticides de l'UE") col1, col2 = st.columns([3, 1]) with col1: product_names = st.multiselect( "Sélectionnez un ou plusieurs produits", list(self.product_choices.keys()) ) with col2: future_only = st.checkbox("Uniquement les 6 prochains mois", value=False) if st.button("Afficher les données"): if not product_names: st.warning("Veuillez sélectionner au moins un produit.") return df = self.get_product_details(product_names, future_only) if df.empty: return # Le message d'info a déjà été affiché dans get_product_details else: if future_only: st.markdown("### Changements de LMR prévus dans les 6 prochains mois") else: st.markdown("### Tableau des LMR") # Formatage amélioré du tableau df_display = df.copy() df_display["Date d'application"] = df_display["Date d'application"].dt.strftime('%d/%m/%Y') # Fonction de formatage personnalisée def format_value(val): if pd.isna(val): return '-' elif isinstance(val, (int, float)): return f"{val:.3f}" return val styled_df = df_display.style.format({ 'Valeur LMR': format_value, }).hide_index() # Ajouter un résumé des changements if not df.empty: nb_changes = len(df) st.info(f"📊 {nb_changes} entrée{'s' if nb_changes > 1 else ''} trouvée{'s' if nb_changes > 1 else ''}.") st.markdown(""" """, unsafe_allow_html=True) st.markdown(styled_df.to_html(escape=False), unsafe_allow_html=True) # Créer les visualisations self.create_visualizations(df) def create_visualizations(self, df: pd.DataFrame): """Crée les visualisations des données""" # Graphique d'évolution des LMR fig1 = px.scatter( df, x="Date d'application", y="Valeur LMR", color="Substance", title="Évolution des LMR dans le temps", hover_data=["Règlement"] ) st.plotly_chart(fig1, use_container_width=True) def main(): interface = PesticideInterface() interface.create_interface() if __name__ == "__main__": main()