"""Streamlit explorer for the EU Pesticides open-data API (MRL records).

Downloads reference data (active substances, MRL records, food products)
from the European Commission "datalake" API, indexes it in memory, and
renders an interactive Streamlit UI with filtering, tables, CSV export
and Plotly visualizations.  User-facing strings are in French.
"""

import io
import json
import logging
import os
import tempfile
import zipfile
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional

import pandas as pd
import plotly.express as px
import requests
import streamlit as st
from tenacity import retry, stop_after_attempt, wait_exponential

# Streamlit page configuration (must be the first Streamlit call).
st.set_page_config(page_title="Pesticide Data Explorer - Optimized", page_icon="🌿", layout="wide")

# Logging: file + console, appended across runs.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    handlers=[
        logging.FileHandler("pesticide_app_optimized.log", mode='a', encoding="utf-8"),
        logging.StreamHandler()
    ],
)
logger = logging.getLogger(__name__)


class PesticideDataFetcher:
    """Thin HTTP client for the EU pesticides datalake API.

    Handles retries, streaming of very large JSON payloads through a
    temporary file, ZIP unpacking, and OData-style paginated listings.
    """

    BASE_URL = "https://api.datalake.sante.service.ec.europa.eu/sante/pesticides"
    HEADERS = {
        "Content-Type": "application/json",
        "Cache-Control": "no-cache",
        "User-Agent": "StreamlitPesticideApp/1.2 (compatible; Mozilla/5.0)"
    }

    def __init__(self):
        # One shared session so keep-alive and the default headers apply
        # to every request made by this fetcher.
        self.session = requests.Session()
        self.session.headers.update(self.HEADERS)
        self.api_calls = 0  # simple counter, surfaced in the UI stats

    @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=2, min=4, max=30))
    def download_data(self, endpoint: str, params: Dict, stream_large_json: bool = False) -> Optional[Any]:
        """Download one endpoint and return parsed JSON, raw text, or None.

        Args:
            endpoint: Path appended to ``BASE_URL`` (must start with '/').
            params: Query-string parameters; ``params['format']`` drives parsing.
            stream_large_json: When True and ``format == 'json'``, stream the
                response body to a temporary file before parsing so very large
                payloads are not held twice in memory.

        Returns:
            Parsed JSON object, decoded text, or ``None`` on any handled error
            (timeouts, HTTP errors, JSON decode failures).
        """
        url = f"{self.BASE_URL}{endpoint}"
        temp_file_path = None
        try:
            self.api_calls += 1
            logger.info(f"Téléchargement: {url} | Params: {params} | API Call #{self.api_calls}")
            if stream_large_json and params.get('format') == 'json':
                logger.info(f"Mode streaming activé pour {url}")
                with self.session.get(url, params=params, timeout=(10, 300), stream=True) as r:
                    r.raise_for_status()
                    # BUGFIX: write raw bytes instead of decoding each chunk.
                    # Decoding per 1MB chunk could split a multi-byte UTF-8
                    # sequence at a chunk boundary, corrupting characters and
                    # potentially breaking the JSON parse below.
                    with tempfile.NamedTemporaryFile(mode='w+b', delete=False, suffix=".json") as tmp_file:
                        temp_file_path = tmp_file.name
                        logger.info(f"Sauvegarde streamée vers: {temp_file_path}")
                        for chunk in r.iter_content(chunk_size=1024 * 1024):  # 1MB chunks
                            if chunk:
                                tmp_file.write(chunk)
                logger.info(f"Sauvegarde streamée terminée: {temp_file_path}")
                logger.info(f"Lecture JSON depuis: {temp_file_path}")
                # Decode once over the complete file, keeping the original
                # 'replace' tolerance for stray invalid bytes.
                with open(temp_file_path, 'r', encoding='utf-8', errors='replace') as f:
                    data = json.load(f)
                return data
            else:
                response = self.session.get(url, params=params, timeout=(10, 180))
                response.raise_for_status()
                content_type = response.headers.get('Content-Type', '')
                if 'json' in content_type or params.get('format') == 'json':
                    return response.json()
                elif 'csv' in content_type or params.get('format') == 'csv':
                    return response.text
                elif 'zip' in content_type:
                    # Unpack in memory; only the first archive member is used.
                    with zipfile.ZipFile(io.BytesIO(response.content)) as zf:
                        if not zf.namelist():
                            logger.error(f"Fichier ZIP vide: {url}")
                            return None
                        filename = zf.namelist()[0]
                        with zf.open(filename) as f_zip:
                            content = f_zip.read().decode('utf-8', errors='replace')
                            return json.loads(content) if filename.endswith('.json') else content
                else:
                    logger.warning(f"Type contenu non géré: {content_type} pour {url}. Retour texte.")
                    return response.text
        except requests.exceptions.Timeout as e:
            logger.error(f"Timeout: {url} - {e}")
            return None
        except requests.RequestException as e:
            logger.error(f"Erreur requête: {url} - {e}")
            if hasattr(e, 'response') and e.response is not None:
                logger.error(f"Status: {e.response.status_code}, Réponse: {e.response.text[:200]}...")
            return None
        except json.JSONDecodeError as e:
            logger.error(f"Erreur décodage JSON: {url} - {e}")
            # Log the start of the temp file to help diagnose malformed payloads.
            if temp_file_path and os.path.exists(temp_file_path):
                logger.error(f"Contenu début fichier temp ({temp_file_path}):")
                try:
                    with open(temp_file_path, 'r', encoding='utf-8') as f_err:
                        logger.error(f_err.read(1000))
                except Exception as read_err:
                    logger.error(f"Impossible lire fichier temp: {read_err}")
            return None
        except Exception as e:
            logger.error(f"Erreur inattendue (download_data) pour {url}: {e}", exc_info=True)
            return None
        finally:
            # Always remove the temp file, even when parsing failed.
            if stream_large_json and temp_file_path and os.path.exists(temp_file_path):
                try:
                    os.remove(temp_file_path)
                    logger.info(f"Fichier temp supprimé: {temp_file_path}")
                except OSError as e_os:
                    logger.error(f"Impossible supprimer fichier temp {temp_file_path}: {e_os}")

    def get_products_paginated(self, language: str = 'FR') -> List[Dict]:
        """Fetch the full food-product list, following ``nextLink`` pages.

        Args:
            language: Language code passed to the API (default 'FR').

        Returns:
            Flat list of product dicts; possibly truncated when the
            MAX_PAGES safety cap is reached.
        """
        all_products = []
        url_base = f"{self.BASE_URL}/pesticide_residues_products"
        params_initial = {'format': 'json', 'language': language, 'api-version': 'v2.0'}
        current_url = url_base
        page_num = 0
        MAX_PAGES = 30  # hard safety cap against runaway pagination
        while current_url and page_num < MAX_PAGES:
            self.api_calls += 1
            page_num += 1
            # nextLink URLs already embed the query string; only the first
            # request needs explicit params.
            params_req = params_initial if current_url == url_base else None
            logger.info(f"Produits page {page_num} depuis {current_url} (API global #{self.api_calls})")
            try:
                resp = self.session.get(current_url, params=params_req, timeout=(10, 45))
                resp.raise_for_status()
                data_page = resp.json()
            except requests.RequestException as e:
                logger.error(f"Erreur requête produits page {current_url}: {e}")
                break
            except json.JSONDecodeError as e:
                logger.error(f"Erreur JSON produits page {current_url}: {e}. Reçu: {resp.text[:100]}")
                break
            # Accept both the OData envelope ({'value': [...]}) and a bare list.
            items = data_page.get('value', []) if isinstance(data_page, dict) else (data_page if isinstance(data_page, list) else [])
            if isinstance(items, list):
                all_products.extend(items)
            current_url = data_page.get('nextLink') if isinstance(data_page, dict) else None
            if not current_url:
                logger.info("Fin pagination produits.")
        if page_num >= MAX_PAGES and current_url:
            logger.warning(f"Limite pagination ({MAX_PAGES}) produits atteinte. Données potentiellement tronquées.")
        logger.info(f"Récupéré {len(all_products)} produits ({page_num} pages).")
        return all_products


@st.cache_data(ttl=86400, show_spinner="Chargement initial des données de référence...")
def download_all_data() -> Dict[str, Any]:
    """Download all reference datasets once per day (Streamlit-cached).

    Returns:
        Dict with keys: ``substances`` (id -> name), ``mrls`` (list of dicts),
        ``products`` (list of dicts), ``product_dict`` (id -> name) and
        ``stats`` (counters + download timestamp for the UI).
    """
    fetcher = PesticideDataFetcher()
    results = {'substances': {}, 'mrls': [], 'products': [], 'product_dict': {}, 'stats': {}}
    with st.status("Initialisation du téléchargement...", expanded=True) as status_bar:
        # 1) Active substances: build the id -> name lookup.
        status_bar.update(label="📥 Substances actives...")
        data_subst = fetcher.download_data("/active_substances/download", {"format": "json", "api-version": "v2.0"})
        if data_subst:
            list_s = data_subst.get('value', []) if isinstance(data_subst, dict) else data_subst
            if isinstance(list_s, list):
                results['substances'] = {
                    i['substance_id']: i['substance_name']
                    for i in list_s
                    if isinstance(i, dict) and i.get('substance_id') and i.get('substance_name')
                }
                logger.info(f"✓ {len(results['substances'])} substances.")
        # 2) MRL records: very large payload, streamed through a temp file.
        status_bar.update(label="📥 Enregistrements LMR (volumineux)...")
        data_mrls = fetcher.download_data(
            "/pesticide_residues_mrls/download",
            {"format": "json", "language": "FR", "api-version": "v2.0"},
            stream_large_json=True)
        if data_mrls:
            list_m = data_mrls.get('value', []) if isinstance(data_mrls, dict) else data_mrls
            if isinstance(list_m, list):
                results['mrls'] = [i for i in list_m if isinstance(i, dict)]
                logger.info(f"✓ {len(results['mrls'])} LMRs.")
        # 3) Food products: paginated listing.
        status_bar.update(label="📥 Produits alimentaires...")
        list_prods = fetcher.get_products_paginated(language='FR')
        if isinstance(list_prods, list):
            results['products'] = list_prods
            results['product_dict'] = {
                p['product_id']: p['product_name']
                for p in list_prods
                if isinstance(p, dict) and p.get('product_id') and p.get('product_name')
            }
            logger.info(f"✓ {len(results['products'])} produits.")
        results['stats'] = {
            'api_calls': fetcher.api_calls,
            'substances_count': len(results['substances']),
            'mrls_count': len(results['mrls']),
            'products_count': len(results['products']),
            'download_time': datetime.now().strftime('%d/%m/%Y %H:%M:%S')
        }
        status_bar.update(label=f"✅ Données chargées! ({results['stats']['download_time']})",
                          state="complete", expanded=False)
    return results


class PesticideInterface:
    """Builds in-memory indexes over the reference data and renders the UI."""

    def __init__(self):
        self.data = download_all_data()
        self._create_indexes()

    def _create_indexes(self):
        """Build lookup indexes: MRLs grouped by product id, and name -> id choices."""
        self.mrls_by_product = defaultdict(list)
        for mrl_item in self.data.get('mrls', []):
            if isinstance(mrl_item, dict) and mrl_item.get('product_id'):
                self.mrls_by_product[mrl_item['product_id']].append(mrl_item)
        self.product_choices = {
            p_item['product_name']: p_item['product_id']
            for p_item in self.data.get('products', [])
            if isinstance(p_item, dict) and p_item.get('product_name') and p_item.get('product_id')
        }
        logger.info(f"Index créés: {len(self.mrls_by_product)} produits avec LMR.")

    def get_product_details(self, product_names_sel: List[str], future_only_flag: bool = False) -> pd.DataFrame:
        """Build the MRL result table for the selected product names.

        Args:
            product_names_sel: Product display names chosen in the UI.
            future_only_flag: When True, keep only MRLs whose entry-into-force
                date lies within the next 180 days.

        Returns:
            DataFrame with columns Produit / Substance / Valeur LMR /
            Date d'application / Lien Règlement (empty frame when no match).
        """
        sel_ids = [self.product_choices[name] for name in product_names_sel if name in self.product_choices]
        if not sel_ids:
            return pd.DataFrame()
        mrls_list = [mrl for pid_sel in sel_ids for mrl in self.mrls_by_product.get(pid_sel, [])]
        if not mrls_list:
            return pd.DataFrame()
        df_data = pd.DataFrame(mrls_list)
        if df_data.empty:
            return pd.DataFrame()
        # Map API ids to human-readable names; unknown ids get placeholders.
        df_data["Substance"] = df_data["pesticide_residue_id"].map(self.data.get('substances', {})).fillna("Inconnue")
        df_data["Produit"] = df_data["product_id"].map(self.data.get('product_dict', {})).fillna("Inconnu")

        def format_reg_link(row_data):
            # Render a markdown link only when the URL looks valid.
            url_val, num_val = row_data.get("regulation_url"), row_data.get("regulation_number", "N/A")
            return f"[{num_val}]({url_val})" if pd.notna(url_val) and str(url_val).strip().lower().startswith('http') else num_val

        df_data["Lien Règlement"] = df_data.apply(format_reg_link, axis=1)
        df_data["Date d'application"] = pd.to_datetime(df_data.get("entry_into_force_date"), errors="coerce")
        if future_only_flag:
            # Compare everything in UTC; naive dates are assumed UTC.
            # NOTE(review): source timezone is not documented — confirm UTC assumption.
            ts_now_utc = pd.Timestamp.now(tz='UTC')
            df_dates_col = df_data["Date d'application"].copy()
            if df_dates_col.dt.tz is None:
                df_dates_col = df_dates_col.dt.tz_localize('UTC', ambiguous='NaT', nonexistent='NaT')
            else:
                df_dates_col = df_dates_col.dt.tz_convert('UTC')
            ts_future_utc = ts_now_utc + timedelta(days=180)
            df_data = df_data[
                (df_dates_col.notna()) & (df_dates_col > ts_now_utc) & (df_dates_col <= ts_future_utc)
            ]
        if df_data.empty:
            return pd.DataFrame()
        df_data["Valeur LMR"] = pd.to_numeric(df_data.get("mrl_value"), errors='coerce')
        cols_final = [c for c in ["Produit", "Substance", "Valeur LMR", "Date d'application", "Lien Règlement"]
                      if c in df_data.columns]
        df_data = df_data[cols_final].copy()
        # Default sort: product ascending, then most recent application date first.
        sort_order_cols = ["Produit"]
        sort_asc = [True]
        if "Date d'application" in df_data.columns:
            sort_order_cols.append("Date d'application")
            sort_asc.append(False)
        df_data = df_data.sort_values(by=sort_order_cols, ascending=sort_asc, na_position='last')
        return df_data

    def create_interface(self):
        """Render the main page: metrics header, product selector, results, charts."""
        st.title("🌿 EU Pesticides Database Explorer")
        app_stats = self.data.get('stats', {})
        m_col1, m_col2, m_col3, m_col4 = st.columns(4)
        with m_col1:
            st.metric("📦 Produits", f"{app_stats.get('products_count', 0):,}")
        with m_col2:
            st.metric("🧪 Substances", f"{app_stats.get('substances_count', 0):,}")
        with m_col3:
            st.metric("📊 Enregistrements LMR", f"{app_stats.get('mrls_count', 0):,}")
        with m_col4:
            st.metric("📞 Appels API", app_stats.get('api_calls', 0))
        st.caption(f"Données de référence chargées ({app_stats.get('download_time', 'N/A')}).")
        st.markdown("---")
        ui_col1, ui_col2 = st.columns([3, 1])
        with ui_col1:
            opts_prods = sorted(list(self.product_choices.keys()))
            sel_prods_names = st.multiselect("🔍 Sélectionnez produit(s)", options=opts_prods,
                                             placeholder="Commencez à taper...")
        with ui_col2:
            sel_future_only = st.checkbox("📅 Changements futurs (6 mois)", value=False,
                                          help="Nouveaux LMR ou modifications prévues.")
        if sel_prods_names:
            df_res = self.get_product_details(sel_prods_names, sel_future_only)
            if df_res.empty:
                info_msg = "Aucun changement LMR prévu." if sel_future_only else "Aucune donnée LMR trouvée."
                st.info(f"{info_msg} pour la sélection actuelle.")
            else:
                st.markdown("### 📊 Résultats des LMR")
                df_lmr_num = df_res[df_res["Valeur LMR"].notna()]
                disp_col1, disp_col2 = st.columns(2)
                with disp_col1:
                    st.metric("Lignes affichées", len(df_res))
                with disp_col2:
                    st.metric("Substances uniques",
                              df_lmr_num["Substance"].nunique() if not df_lmr_num.empty else 0)
                with st.expander("⚙️ Options d'affichage", expanded=False):
                    opt_show_low_mrl = st.checkbox("Inclure LMR < 0.01 mg/kg", value=True)
                    opts_sort = [c for c in ["Produit", "Substance", "Valeur LMR", "Date d'application"]
                                 if c in df_res.columns]
                    opt_sort_by = None
                    if opts_sort:
                        def_sort_idx_opt = opts_sort.index("Date d'application") if "Date d'application" in opts_sort else 0
                        opt_sort_by = st.selectbox("Trier par", opts_sort, index=def_sort_idx_opt)
                    # Default to descending order when sorting by date.
                    opt_sort_dir = st.radio("Ordre", ["Croissant", "Décroissant"], horizontal=True,
                                            index=1 if opt_sort_by == "Date d'application" else 0)
                df_view = df_res.copy()
                if not opt_show_low_mrl and "Valeur LMR" in df_view.columns:
                    df_view = df_view[df_view["Valeur LMR"] >= 0.01]
                if opt_sort_by and opt_sort_by in df_view.columns:
                    df_view = df_view.sort_values(opt_sort_by,
                                                  ascending=(opt_sort_dir == "Croissant"),
                                                  na_position='last')
                st.dataframe(df_view, use_container_width=True, hide_index=True, column_config={
                    "Valeur LMR": st.column_config.NumberColumn("LMR (mg/kg)", format="%.4f",
                                                                help="Limite Maximale de Résidus"),
                    # Standard French date format.
                    "Date d'application": st.column_config.DateColumn("Application", format="%d/%m/%Y"),
                    "Lien Règlement": st.column_config.TextColumn("Règlement")
                })
                if not df_view.empty:
                    # Only chart when rows remain after the display filters.
                    self.create_visualizations(df_view)
                fname_prods = "_".join(sel_prods_names[:2]).replace(" ", "_").replace("/", "_") + \
                    ("_etc" if len(sel_prods_names) > 2 else "")
                fname_csv = f"lmr_{fname_prods}_{datetime.now().strftime('%Y%m%d%H%M')}.csv"
                st.download_button("📥 Export CSV", df_view.to_csv(index=False).encode('utf-8'),
                                   fname_csv, "text/csv")
        else:
            st.info("👆 Sélectionnez un ou plusieurs produits pour afficher leurs LMR.")

    def create_visualizations(self, df: pd.DataFrame):
        """Render the three chart tabs (timeline, distribution, top substances)."""
        st.markdown("### 🎨 Visualisations")
        tabs_viz = st.tabs(["📈 Évolution Temporelle", "📊 Distribution", "🏆 Top Substances"])
        # Work on a copy restricted to numeric MRL values.
        df_plot = df[df["Valeur LMR"].notna()].copy()
        if df_plot.empty:
            st.warning("Aucune donnée LMR numérique valide pour les graphiques.")
            return
        with tabs_viz[0]:
            if "Date d'application" in df_plot.columns and df_plot["Date d'application"].notna().any():
                df_plot_time = df_plot[df_plot["Date d'application"].notna()].sort_values("Date d'application")
                if not df_plot_time.empty:
                    fig_scatter = px.scatter(df_plot_time, x="Date d'application", y="Valeur LMR",
                                             color="Substance", size="Valeur LMR",
                                             hover_data=["Produit", "Lien Règlement"],
                                             title="Évolution LMR (axe Y log)", log_y=True)
                    fig_scatter.update_layout(legend_title_text='Substance',
                                              legend=dict(orientation="h", yanchor="bottom", y=1.02,
                                                          xanchor="right", x=1))
                    st.plotly_chart(fig_scatter, use_container_width=True)
                else:
                    st.info("Pas de données datées pour ce graphique.")
            else:
                st.info("'Date d'application' manquante ou vide.")
        with tabs_viz[1]:
            # .nunique() returns a plain int — no .item() needed.
            color_by_prod = "Produit" if df_plot["Produit"].nunique() < 10 and df_plot["Produit"].nunique() > 0 else None
            fig_hist_dist = px.histogram(df_plot, x="Valeur LMR", nbins=30,
                                         title="Distribution des LMR (axe X log)", log_x=True,
                                         labels={"Valeur LMR": "LMR (mg/kg)"}, color=color_by_prod)
            st.plotly_chart(fig_hist_dist, use_container_width=True)
            if df_plot["Produit"].nunique() > 1:
                fig_box_dist = px.box(df_plot, x="Produit", y="Valeur LMR",
                                      title="LMR par Produit (axe Y log)", log_y=True,
                                      color="Produit", points="outliers")
                st.plotly_chart(fig_box_dist, use_container_width=True)
        with tabs_viz[2]:
            if "Substance" in df_plot.columns:
                df_top_subs = (df_plot.groupby("Substance")["Valeur LMR"]
                               .agg(['max', 'count', 'mean']).rename(columns=str.capitalize)
                               .sort_values('Max', ascending=False).head(15).reset_index())
                if not df_top_subs.empty:
                    fig_bar_top = px.bar(df_top_subs, y="Substance", x='Max', orientation='h',
                                         title="Top 15 Substances (LMR max)",
                                         labels={'Max': 'LMR max (mg/kg)'},
                                         hover_data={'Count': True, 'Mean': ':.4f'})
                    fig_bar_top.update_layout(yaxis={'categoryorder': 'total ascending'})
                    st.plotly_chart(fig_bar_top, use_container_width=True)
                else:
                    st.info("Pas assez de données pour le Top Substances.")
            else:
                st.info("'Substance' manquante pour ce graphique.")


def main():
    """App entry point: sidebar controls plus a session-cached interface instance."""
    with st.sidebar:
        st.header("EU Pesticides Explorer")
        st.caption("Version Optimisée")
        st.markdown("Analyse des LMR de pesticides dans l'UE. Données via API de la Commission Européenne.")
        if st.button("🔄 Forcer MAJ Données", key="sidebar_btn_reload",
                     help="Efface le cache et recharge tout."):
            # Clear both the data cache and the cached interface, then rerun.
            st.cache_data.clear()
            if 'pesticide_app_interface' in st.session_state:
                del st.session_state.pesticide_app_interface
            st.rerun()
        st.markdown("---")
    if 'pesticide_app_interface' not in st.session_state:
        logger.info("Initialisation PesticideInterface (session_state)...")
        with st.spinner("Préparation de l'application et chargement des données de référence..."):
            st.session_state.pesticide_app_interface = PesticideInterface()
        logger.info("PesticideInterface initialisée.")
    st.session_state.pesticide_app_interface.create_interface()
    app_data_stats = st.session_state.pesticide_app_interface.data.get('stats', {})
    if app_data_stats.get('download_time'):
        st.sidebar.caption(f"Données chargées le: {app_data_stats['download_time']}")
    else:
        st.sidebar.caption("Statut chargement données non disponible.")


if __name__ == "__main__":
    main()