# EU Pesticides Data Explorer (Streamlit application)
import io
import json
import logging
import os
import tempfile
import zipfile
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional

import pandas as pd
import plotly.express as px
import requests
import streamlit as st
from tenacity import retry, stop_after_attempt, wait_exponential

# Streamlit page configuration — must be the first st.* call in the script.
st.set_page_config(page_title="Pesticide Data Explorer - Optimized", page_icon="🌿", layout="wide")

# Logging configuration: append to a UTF-8 file and mirror to the console.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    handlers=[
        logging.FileHandler("pesticide_app_optimized.log", mode='a', encoding="utf-8"),
        logging.StreamHandler()
    ],
)
logger = logging.getLogger(__name__)
class PesticideDataFetcher:
    """Thin HTTP client for the EU 'sante/pesticides' datalake API.

    Holds a single requests.Session (connection reuse + shared headers)
    and counts every HTTP request in ``api_calls`` for the UI statistics.
    """

    BASE_URL = "https://api.datalake.sante.service.ec.europa.eu/sante/pesticides"
    HEADERS = {
        "Content-Type": "application/json",
        "Cache-Control": "no-cache",
        "User-Agent": "StreamlitPesticideApp/1.2 (compatible; Mozilla/5.0)"
    }

    def __init__(self):
        self.session = requests.Session()
        self.session.headers.update(self.HEADERS)
        # Total HTTP requests issued by this instance (displayed in the UI).
        self.api_calls = 0

    def download_data(self, endpoint: str, params: Dict, stream_large_json: bool = False) -> Optional[Any]:
        """Download *endpoint* and decode the payload according to its content type.

        Args:
            endpoint: Path appended to BASE_URL (must start with '/').
            params: Query-string parameters; ``params['format']`` also drives decoding.
            stream_large_json: When True and the requested format is JSON, stream
                the body to a temporary file before parsing to bound memory usage.

        Returns:
            Parsed JSON (dict/list), text (CSV or fallback), or None on any
            error — errors are logged and swallowed (best-effort fetch).
        """
        url = f"{self.BASE_URL}{endpoint}"
        temp_file_path = None
        try:
            self.api_calls += 1
            logger.info(f"Téléchargement: {url} | Params: {params} | API Call #{self.api_calls}")
            if stream_large_json and params.get('format') == 'json':
                logger.info(f"Mode streaming activé pour {url}")
                with self.session.get(url, params=params, timeout=(10, 300), stream=True) as r:
                    r.raise_for_status()
                    # BUGFIX: stream raw bytes to disk. The previous code decoded
                    # each chunk independently with errors='replace', which
                    # corrupts any multi-byte UTF-8 character split across a
                    # chunk boundary (turned into U+FFFD). Decode once on read.
                    with tempfile.NamedTemporaryFile(mode='wb', delete=False, suffix=".json") as tmp_file:
                        temp_file_path = tmp_file.name
                        logger.info(f"Sauvegarde streamée vers: {temp_file_path}")
                        for chunk in r.iter_content(chunk_size=1024 * 1024):  # 1 MB chunks
                            if chunk:
                                tmp_file.write(chunk)
                        logger.info(f"Sauvegarde streamée terminée: {temp_file_path}")
                logger.info(f"Lecture JSON depuis: {temp_file_path}")
                with open(temp_file_path, 'r', encoding='utf-8', errors='replace') as f:
                    data = json.load(f)
                return data
            else:
                response = self.session.get(url, params=params, timeout=(10, 180))
                response.raise_for_status()
                content_type = response.headers.get('Content-Type', '')
                if 'json' in content_type or params.get('format') == 'json':
                    return response.json()
                elif 'csv' in content_type or params.get('format') == 'csv':
                    return response.text
                elif 'zip' in content_type:
                    # Unpack the first archive member; parse as JSON when the
                    # member's extension says so, otherwise return raw text.
                    with zipfile.ZipFile(io.BytesIO(response.content)) as zf:
                        if not zf.namelist():
                            logger.error(f"Fichier ZIP vide: {url}")
                            return None
                        filename = zf.namelist()[0]
                        with zf.open(filename) as f_zip:
                            content = f_zip.read().decode('utf-8', errors='replace')
                        return json.loads(content) if filename.endswith('.json') else content
                else:
                    logger.warning(f"Type contenu non géré: {content_type} pour {url}. Retour texte.")
                    return response.text
        except requests.exceptions.Timeout as e:
            logger.error(f"Timeout: {url} - {e}")
            return None
        except requests.RequestException as e:
            logger.error(f"Erreur requête: {url} - {e}")
            if hasattr(e, 'response') and e.response is not None:
                logger.error(f"Status: {e.response.status_code}, Réponse: {e.response.text[:200]}...")
            return None
        except json.JSONDecodeError as e:
            logger.error(f"Erreur décodage JSON: {url} - {e}")
            # Dump the first KB of the temp file to help diagnose bad payloads.
            if temp_file_path and os.path.exists(temp_file_path):
                logger.error(f"Contenu début fichier temp ({temp_file_path}):")
                try:
                    with open(temp_file_path, 'r', encoding='utf-8', errors='replace') as f_err:
                        logger.error(f_err.read(1000))
                except Exception as read_err:
                    logger.error(f"Impossible lire fichier temp: {read_err}")
            return None
        except Exception as e:
            logger.error(f"Erreur inattendue (download_data) pour {url}: {e}", exc_info=True)
            return None
        finally:
            # Always remove the streamed temp file, on success and error paths alike.
            if stream_large_json and temp_file_path and os.path.exists(temp_file_path):
                try:
                    os.remove(temp_file_path)
                    logger.info(f"Fichier temp supprimé: {temp_file_path}")
                except OSError as e_os:
                    logger.error(f"Impossible supprimer fichier temp {temp_file_path}: {e_os}")

    def get_products_paginated(self, language: str = 'FR') -> List[Dict]:
        """Fetch the food-product list, following OData-style ``nextLink`` pages.

        Pagination is capped at MAX_PAGES to protect against an endless
        nextLink chain; a warning is logged when the cap truncates data.
        """
        all_products = []
        url_base = f"{self.BASE_URL}/pesticide_residues_products"
        params_initial = {'format': 'json', 'language': language, 'api-version': 'v2.0'}
        current_url = url_base
        page_num = 0
        MAX_PAGES = 30  # safety cap against endless pagination
        while current_url and page_num < MAX_PAGES:
            self.api_calls += 1
            page_num += 1
            # nextLink URLs already embed their query string; only the very
            # first request needs explicit params.
            params_req = params_initial if current_url == url_base else None
            logger.info(f"Produits page {page_num} depuis {current_url} (API global #{self.api_calls})")
            try:
                resp = self.session.get(current_url, params=params_req, timeout=(10, 45))
                resp.raise_for_status()
                data_page = resp.json()
            except requests.RequestException as e:
                logger.error(f"Erreur requête produits page {current_url}: {e}")
                break
            except json.JSONDecodeError as e:
                logger.error(f"Erreur JSON produits page {current_url}: {e}. Reçu: {resp.text[:100]}")
                break
            items = data_page.get('value', []) if isinstance(data_page, dict) else (data_page if isinstance(data_page, list) else [])
            if isinstance(items, list):
                all_products.extend(items)
            current_url = data_page.get('nextLink') if isinstance(data_page, dict) else None
            if not current_url:
                logger.info("Fin pagination produits.")
        if page_num >= MAX_PAGES and current_url:
            logger.warning(f"Limite pagination ({MAX_PAGES}) produits atteinte. Données potentiellement tronquées.")
        logger.info(f"Récupéré {len(all_products)} produits ({page_num} pages).")
        return all_products
def download_all_data() -> Dict[str, Any]:
    """Fetch all reference datasets with a Streamlit progress status.

    Returns a dict with keys 'substances' (id -> name), 'mrls' (list of
    dicts), 'products' (list of dicts), 'product_dict' (id -> name) and
    'stats' (counts, API-call total, download timestamp).
    """
    fetcher = PesticideDataFetcher()
    results: Dict[str, Any] = {'substances': {}, 'mrls': [], 'products': [], 'product_dict': {}, 'stats': {}}
    with st.status("Initialisation du téléchargement...", expanded=True) as status_bar:
        # 1) Active substances: build the id -> name lookup table.
        status_bar.update(label="📥 Substances actives...")
        raw_subst = fetcher.download_data("/active_substances/download", {"format": "json", "api-version": "v2.0"})
        if raw_subst:
            subst_rows = raw_subst.get('value', []) if isinstance(raw_subst, dict) else raw_subst
            if isinstance(subst_rows, list):
                results['substances'] = {
                    row['substance_id']: row['substance_name']
                    for row in subst_rows
                    if isinstance(row, dict) and row.get('substance_id') and row.get('substance_name')
                }
            logger.info(f"✓ {len(results['substances'])} substances.")
        # 2) MRL records: large payload, streamed to disk before parsing.
        status_bar.update(label="📥 Enregistrements LMR (volumineux)...")
        raw_mrls = fetcher.download_data("/pesticide_residues_mrls/download",
                                         {"format": "json", "language": "FR", "api-version": "v2.0"},
                                         stream_large_json=True)
        if raw_mrls:
            mrl_rows = raw_mrls.get('value', []) if isinstance(raw_mrls, dict) else raw_mrls
            if isinstance(mrl_rows, list):
                results['mrls'] = [row for row in mrl_rows if isinstance(row, dict)]
            logger.info(f"✓ {len(results['mrls'])} LMRs.")
        # 3) Food products: paginated endpoint.
        status_bar.update(label="📥 Produits alimentaires...")
        product_rows = fetcher.get_products_paginated(language='FR')
        if isinstance(product_rows, list):
            results['products'] = product_rows
            results['product_dict'] = {
                row['product_id']: row['product_name']
                for row in product_rows
                if isinstance(row, dict) and row.get('product_id') and row.get('product_name')
            }
            logger.info(f"✓ {len(results['products'])} produits.")
        results['stats'] = {'api_calls': fetcher.api_calls, 'substances_count': len(results['substances']),
                            'mrls_count': len(results['mrls']), 'products_count': len(results['products']),
                            'download_time': datetime.now().strftime('%d/%m/%Y %H:%M:%S')}
        status_bar.update(label=f"✅ Données chargées! ({results['stats']['download_time']})", state="complete", expanded=False)
    return results
class PesticideInterface:
    """Streamlit UI: loads reference data once, then renders search + charts."""

    def __init__(self):
        # Reference data is fetched eagerly; main() stores the instance in
        # st.session_state so this only runs once per browser session.
        self.data = download_all_data()
        self._create_indexes()

    def _create_indexes(self):
        """Build lookup indexes: MRL rows grouped by product id, name -> id map."""
        self.mrls_by_product = defaultdict(list)
        for mrl_item in self.data.get('mrls', []):
            if isinstance(mrl_item, dict) and mrl_item.get('product_id'):
                self.mrls_by_product[mrl_item['product_id']].append(mrl_item)
        self.product_choices = {
            p_item['product_name']: p_item['product_id']
            for p_item in self.data.get('products', [])
            if isinstance(p_item, dict) and p_item.get('product_name') and p_item.get('product_id')
        }
        logger.info(f"Index créés: {len(self.mrls_by_product)} produits avec LMR.")

    def get_product_details(self, product_names_sel: List[str], future_only_flag: bool = False) -> pd.DataFrame:
        """Return a display-ready DataFrame of MRL rows for the selected products.

        Args:
            product_names_sel: Product display names (keys of product_choices).
            future_only_flag: When True, keep only rows whose application date
                falls strictly within the next 180 days.

        Returns:
            DataFrame with columns Produit / Substance / Valeur LMR /
            Date d'application / Lien Règlement (sorted by product, then date
            descending), or an empty DataFrame when nothing matches.
        """
        sel_ids = [self.product_choices[name] for name in product_names_sel if name in self.product_choices]
        if not sel_ids:
            return pd.DataFrame()
        mrls_list = [mrl for pid_sel in sel_ids for mrl in self.mrls_by_product.get(pid_sel, [])]
        if not mrls_list:
            return pd.DataFrame()
        df_data = pd.DataFrame(mrls_list)
        if df_data.empty:
            return pd.DataFrame()
        df_data["Substance"] = df_data["pesticide_residue_id"].map(self.data.get('substances', {})).fillna("Inconnue")
        df_data["Produit"] = df_data["product_id"].map(self.data.get('product_dict', {})).fillna("Inconnu")

        def format_reg_link(row_data):
            # Markdown link when a plausible http(s) URL is present, else the bare number.
            url_val, num_val = row_data.get("regulation_url"), row_data.get("regulation_number", "N/A")
            return f"[{num_val}]({url_val})" if pd.notna(url_val) and str(url_val).strip().lower().startswith('http') else num_val

        df_data["Lien Règlement"] = df_data.apply(format_reg_link, axis=1)
        df_data["Date d'application"] = pd.to_datetime(df_data.get("entry_into_force_date"), errors="coerce")
        if future_only_flag:
            # Compare everything in UTC; source dates may be naive or tz-aware.
            ts_now_utc = pd.Timestamp.now(tz='UTC')
            df_dates_col = df_data["Date d'application"].copy()
            if df_dates_col.dt.tz is None:
                df_dates_col = df_dates_col.dt.tz_localize('UTC', ambiguous='NaT', nonexistent='NaT')
            else:
                df_dates_col = df_dates_col.dt.tz_convert('UTC')
            ts_future_utc = ts_now_utc + timedelta(days=180)
            df_data = df_data[(df_dates_col.notna()) & (df_dates_col > ts_now_utc) & (df_dates_col <= ts_future_utc)]
            if df_data.empty:
                return pd.DataFrame()
        df_data["Valeur LMR"] = pd.to_numeric(df_data.get("mrl_value"), errors='coerce')
        cols_final = [c for c in ["Produit", "Substance", "Valeur LMR", "Date d'application", "Lien Règlement"] if c in df_data.columns]
        df_data = df_data[cols_final].copy()
        sort_order_cols = ["Produit"]
        sort_asc = [True]
        if "Date d'application" in df_data.columns:
            sort_order_cols.append("Date d'application")
            sort_asc.append(False)  # newest regulation changes first within a product
        df_data = df_data.sort_values(by=sort_order_cols, ascending=sort_asc, na_position='last')
        return df_data

    def create_interface(self):
        """Render the main page: metrics, product search, results table, export."""
        st.title("🌿 EU Pesticides Database Explorer")
        app_stats = self.data.get('stats', {})
        m_col1, m_col2, m_col3, m_col4 = st.columns(4)
        with m_col1:
            st.metric("📦 Produits", f"{app_stats.get('products_count', 0):,}")
        with m_col2:
            st.metric("🧪 Substances", f"{app_stats.get('substances_count', 0):,}")
        with m_col3:
            st.metric("📊 Enregistrements LMR", f"{app_stats.get('mrls_count', 0):,}")
        with m_col4:
            st.metric("📞 Appels API", app_stats.get('api_calls', 0))
        st.caption(f"Données de référence chargées ({app_stats.get('download_time', 'N/A')}).")
        st.markdown("---")
        ui_col1, ui_col2 = st.columns([3, 1])
        with ui_col1:
            opts_prods = sorted(list(self.product_choices.keys()))
            sel_prods_names = st.multiselect("🔍 Sélectionnez produit(s)", options=opts_prods, placeholder="Commencez à taper...")
        with ui_col2:
            sel_future_only = st.checkbox("📅 Changements futurs (6 mois)", value=False, help="Nouveaux LMR ou modifications prévues.")
        if sel_prods_names:
            df_res = self.get_product_details(sel_prods_names, sel_future_only)
            if df_res.empty:
                info_msg = "Aucun changement LMR prévu." if sel_future_only else "Aucune donnée LMR trouvée."
                st.info(f"{info_msg} pour la sélection actuelle.")
            else:
                st.markdown("### 📊 Résultats des LMR")
                df_lmr_num = df_res[df_res["Valeur LMR"].notna()]
                disp_col1, disp_col2 = st.columns(2)
                with disp_col1:
                    st.metric("Lignes affichées", len(df_res))
                with disp_col2:
                    st.metric("Substances uniques", df_lmr_num["Substance"].nunique() if not df_lmr_num.empty else 0)
                with st.expander("⚙️ Options d'affichage", expanded=False):
                    opt_show_low_mrl = st.checkbox("Inclure LMR < 0.01 mg/kg", value=True)
                    opts_sort = [c for c in ["Produit", "Substance", "Valeur LMR", "Date d'application"] if c in df_res.columns]
                    opt_sort_by = None
                    if opts_sort:
                        def_sort_idx_opt = opts_sort.index("Date d'application") if "Date d'application" in opts_sort else 0
                        opt_sort_by = st.selectbox("Trier par", opts_sort, index=def_sort_idx_opt)
                    opt_sort_dir = st.radio("Ordre", ["Croissant", "Décroissant"], horizontal=True, index=1 if opt_sort_by == "Date d'application" else 0)
                df_view = df_res.copy()
                if not opt_show_low_mrl and "Valeur LMR" in df_view.columns:
                    df_view = df_view[df_view["Valeur LMR"] >= 0.01]
                if opt_sort_by and opt_sort_by in df_view.columns:
                    df_view = df_view.sort_values(opt_sort_by, ascending=(opt_sort_dir == "Croissant"), na_position='last')
                st.dataframe(df_view, use_container_width=True, hide_index=True,
                             column_config={
                                 "Valeur LMR": st.column_config.NumberColumn("LMR (mg/kg)", format="%.4f", help="Limite Maximale de Résidus"),
                                 # BUGFIX: DateColumn's `format` takes a momentJS
                                 # pattern ("DD/MM/YYYY"), not a strftime one
                                 # ("%d/%m/%Y") — the old value rendered wrongly.
                                 "Date d'application": st.column_config.DateColumn("Application", format="DD/MM/YYYY"),
                                 "Lien Règlement": st.column_config.TextColumn("Règlement")
                             })
                if not df_view.empty:
                    self.create_visualizations(df_view)
                fname_prods = "_".join(sel_prods_names[:2]).replace(" ", "_").replace("/", "_") + ("_etc" if len(sel_prods_names) > 2 else "")
                fname_csv = f"lmr_{fname_prods}_{datetime.now().strftime('%Y%m%d%H%M')}.csv"
                st.download_button("📥 Export CSV", df_view.to_csv(index=False).encode('utf-8'), fname_csv, "text/csv")
        else:
            st.info("👆 Sélectionnez un ou plusieurs produits pour afficher leurs LMR.")

    def create_visualizations(self, df: pd.DataFrame):
        """Render the three chart tabs: timeline, distribution, top substances."""
        st.markdown("### 🎨 Visualisations")
        tabs_viz = st.tabs(["📈 Évolution Temporelle", "📊 Distribution", "🏆 Top Substances"])
        # Work on a copy restricted to rows with a numeric MRL value.
        df_plot = df[df["Valeur LMR"].notna()].copy()
        if df_plot.empty:
            st.warning("Aucune donnée LMR numérique valide pour les graphiques.")
            return
        with tabs_viz[0]:
            if "Date d'application" in df_plot.columns and df_plot["Date d'application"].notna().any():
                df_plot_time = df_plot[df_plot["Date d'application"].notna()].sort_values("Date d'application")
                if not df_plot_time.empty:
                    fig_scatter = px.scatter(df_plot_time, x="Date d'application", y="Valeur LMR", color="Substance", size="Valeur LMR",
                                             hover_data=["Produit", "Lien Règlement"], title="Évolution LMR (axe Y log)", log_y=True)
                    fig_scatter.update_layout(legend_title_text='Substance', legend=dict(orientation="h", yanchor="bottom", y=1.02, xanchor="right", x=1))
                    st.plotly_chart(fig_scatter, use_container_width=True)
                else:
                    st.info("Pas de données datées pour ce graphique.")
            else:
                st.info("'Date d'application' manquante ou vide.")
        with tabs_viz[1]:
            # Hoisted: nunique() was computed three times before.
            n_products = df_plot["Produit"].nunique()
            # Colour by product only when there are few (1-9) distinct products.
            color_by_prod = "Produit" if 0 < n_products < 10 else None
            fig_hist_dist = px.histogram(df_plot, x="Valeur LMR", nbins=30, title="Distribution des LMR (axe X log)", log_x=True,
                                         labels={"Valeur LMR": "LMR (mg/kg)"}, color=color_by_prod)
            st.plotly_chart(fig_hist_dist, use_container_width=True)
            if n_products > 1:
                fig_box_dist = px.box(df_plot, x="Produit", y="Valeur LMR", title="LMR par Produit (axe Y log)", log_y=True,
                                      color="Produit", points="outliers")
                st.plotly_chart(fig_box_dist, use_container_width=True)
        with tabs_viz[2]:
            if "Substance" in df_plot.columns:
                df_top_subs = (df_plot.groupby("Substance")["Valeur LMR"]
                               .agg(['max', 'count', 'mean']).rename(columns=str.capitalize)
                               .sort_values('Max', ascending=False).head(15).reset_index())
                if not df_top_subs.empty:
                    fig_bar_top = px.bar(df_top_subs, y="Substance", x='Max', orientation='h', title="Top 15 Substances (LMR max)",
                                         labels={'Max': 'LMR max (mg/kg)'}, hover_data={'Count': True, 'Mean': ':.4f'})
                    fig_bar_top.update_layout(yaxis={'categoryorder': 'total ascending'})
                    st.plotly_chart(fig_bar_top, use_container_width=True)
                else:
                    st.info("Pas assez de données pour le Top Substances.")
            else:
                st.info("'Substance' manquante pour ce graphique.")
def main():
    """Application entry point: sidebar controls + lazily initialised interface."""
    with st.sidebar:
        st.header("EU Pesticides Explorer")
        st.caption("Version Optimisée")
        st.markdown("Analyse des LMR de pesticides dans l'UE. Données via API de la Commission Européenne.")
        reload_clicked = st.button("🔄 Forcer MAJ Données", key="sidebar_btn_reload", help="Efface le cache et recharge tout.")
        if reload_clicked:
            # Drop cached data and the session-scoped interface, then restart the script.
            st.cache_data.clear()
            if 'pesticide_app_interface' in st.session_state:
                del st.session_state.pesticide_app_interface
            st.rerun()
        st.markdown("---")
    # Build the interface once per session; reruns reuse the stored instance.
    if 'pesticide_app_interface' not in st.session_state:
        logger.info("Initialisation PesticideInterface (session_state)...")
        with st.spinner("Préparation de l'application et chargement des données de référence..."):
            st.session_state.pesticide_app_interface = PesticideInterface()
        logger.info("PesticideInterface initialisée.")
    iface = st.session_state.pesticide_app_interface
    iface.create_interface()
    app_data_stats = iface.data.get('stats', {})
    if app_data_stats.get('download_time'):
        st.sidebar.caption(f"Données chargées le: {app_data_stats['download_time']}")
    else:
        st.sidebar.caption("Statut chargement données non disponible.")


if __name__ == "__main__":
    main()