# MLRSTREAMLIT / app.py
# MMOON's picture
# Update app.py
# 6365d52 verified
import logging
import json
import io
import zipfile
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any # Tuple n'est pas explicitement utilisé mais bon à garder
import pandas as pd
import requests
import plotly.express as px
import streamlit as st
from tenacity import retry, stop_after_attempt, wait_exponential
# import time # Non utilisé directement
from collections import defaultdict
# import hashlib # Non utilisé directement
import tempfile
import os
# Streamlit page configuration (kept ahead of every other Streamlit call).
st.set_page_config(page_title="Pesticide Data Explorer - Optimized", page_icon="🌿", layout="wide")

# Logging configuration.
# Streamlit re-executes this script on every user interaction, and
# logging.basicConfig() does nothing once the root logger has handlers.
# Guarding on the root handlers avoids building (and opening) a brand-new
# FileHandler on every rerun only to throw it away.
if not logging.getLogger().handlers:
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
        handlers=[
            logging.FileHandler("pesticide_app_optimized.log", mode='a', encoding="utf-8"),
            logging.StreamHandler(),
        ],
    )
logger = logging.getLogger(__name__)
class PesticideDataFetcher:
    """HTTP client for the European Commission pesticides API.

    Holds a ``requests.Session`` pre-loaded with ``HEADERS`` and counts every
    HTTP request it issues in ``api_calls`` (retries included).
    """

    BASE_URL = "https://api.datalake.sante.service.ec.europa.eu/sante/pesticides"
    HEADERS = {
        "Content-Type": "application/json",
        "Cache-Control": "no-cache",
        "User-Agent": "StreamlitPesticideApp/1.2 (compatible; Mozilla/5.0)"
    }

    def __init__(self):
        self.session = requests.Session()
        self.session.headers.update(self.HEADERS)
        self.api_calls = 0

    def download_data(self, endpoint: str, params: Dict, stream_large_json: bool = False) -> Optional[Any]:
        """Download ``endpoint`` and return its decoded payload, or ``None`` on failure.

        Args:
            endpoint: Path appended to ``BASE_URL``.
            params: Query-string parameters; ``params['format']`` steers decoding.
            stream_large_json: When true and ``format`` is ``'json'``, the body is
                streamed to a temporary file before being parsed.

        Returns:
            Parsed JSON (dict/list), raw text for CSV or unknown content types,
            or ``None`` when every attempt failed. Errors are logged, never raised.
        """
        url = f"{self.BASE_URL}{endpoint}"
        try:
            # The retry decorator lives on the helper: it can only retry when an
            # exception actually propagates, which this method must not allow.
            return self._fetch_with_retry(url, params, stream_large_json)
        except requests.exceptions.Timeout as e:
            logger.error(f"Timeout: {url} - {e}")
        except requests.RequestException as e:
            logger.error(f"Erreur requête: {url} - {e}")
            if getattr(e, 'response', None) is not None:
                logger.error(f"Status: {e.response.status_code}, Réponse: {e.response.text[:200]}...")
        except json.JSONDecodeError as e:
            logger.error(f"Erreur décodage JSON: {url} - {e}")
        except Exception as e:
            logger.error(f"Erreur inattendue (download_data) pour {url}: {e}", exc_info=True)
        return None

    @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=2, min=4, max=30), reraise=True)
    def _fetch_with_retry(self, url: str, params: Dict, stream_large_json: bool) -> Optional[Any]:
        """Perform one download attempt; raise on failure so tenacity can retry.

        ``reraise=True`` makes the last underlying exception surface (instead of
        tenacity's ``RetryError``) so ``download_data`` can log it by type.
        """
        self.api_calls += 1
        logger.info(f"Téléchargement: {url} | Params: {params} | API Call #{self.api_calls}")
        if stream_large_json and params.get('format') == 'json':
            return self._download_streamed_json(url, params)
        response = self.session.get(url, params=params, timeout=(10, 180))
        response.raise_for_status()
        return self._decode_response(response, url, params)

    def _download_streamed_json(self, url: str, params: Dict) -> Any:
        """Stream the response body to a temp file, parse it as JSON, then delete the file."""
        logger.info(f"Mode streaming activé pour {url}")
        temp_file_path = None
        try:
            with self.session.get(url, params=params, timeout=(10, 300), stream=True) as r:
                r.raise_for_status()
                # Binary mode: decoding each chunk separately would corrupt any
                # multi-byte UTF-8 character that straddles a chunk boundary.
                with tempfile.NamedTemporaryFile(mode='wb', delete=False, suffix=".json") as tmp_file:
                    temp_file_path = tmp_file.name
                    logger.info(f"Sauvegarde streamée vers: {temp_file_path}")
                    for chunk in r.iter_content(chunk_size=1024 * 1024):  # 1MB chunks
                        if chunk:
                            tmp_file.write(chunk)
            logger.info(f"Sauvegarde streamée terminée: {temp_file_path}")
            logger.info(f"Lecture JSON depuis: {temp_file_path}")
            try:
                # Decode once, over the whole file; invalid bytes are still replaced.
                with open(temp_file_path, 'r', encoding='utf-8', errors='replace') as f:
                    return json.load(f)
            except json.JSONDecodeError:
                self._log_temp_file_head(temp_file_path)
                raise
        finally:
            if temp_file_path and os.path.exists(temp_file_path):
                try:
                    os.remove(temp_file_path)
                    logger.info(f"Fichier temp supprimé: {temp_file_path}")
                except OSError as e_os:
                    logger.error(f"Impossible supprimer fichier temp {temp_file_path}: {e_os}")

    @staticmethod
    def _log_temp_file_head(temp_file_path: str) -> None:
        """Log the first 1000 characters of the temp file to help diagnose bad JSON."""
        logger.error(f"Contenu début fichier temp ({temp_file_path}):")
        try:
            with open(temp_file_path, 'r', encoding='utf-8', errors='replace') as f_err:
                logger.error(f_err.read(1000))
        except OSError as read_err:
            logger.error(f"Impossible lire fichier temp: {read_err}")

    @staticmethod
    def _decode_response(response, url: str, params: Dict) -> Optional[Any]:
        """Turn a successful, fully-read response into JSON data or text."""
        content_type = response.headers.get('Content-Type', '')
        requested_format = params.get('format')
        # ZIP is checked first: an archive must be unpacked whatever format was
        # requested, otherwise the raw archive bytes get fed to the JSON parser.
        if 'zip' in content_type:
            return PesticideDataFetcher._read_first_zip_member(response.content, url)
        if 'json' in content_type or requested_format == 'json':
            return response.json()
        if 'csv' in content_type or requested_format == 'csv':
            return response.text
        logger.warning(f"Type contenu non géré: {content_type} pour {url}. Retour texte.")
        return response.text

    @staticmethod
    def _read_first_zip_member(raw_bytes: bytes, url: str) -> Optional[Any]:
        """Return the first archive member: parsed if it is ``.json``, text otherwise."""
        with zipfile.ZipFile(io.BytesIO(raw_bytes)) as zf:
            members = zf.namelist()
            if not members:
                logger.error(f"Fichier ZIP vide: {url}")
                return None
            filename = members[0]
            with zf.open(filename) as f_zip:
                content = f_zip.read().decode('utf-8', errors='replace')
        return json.loads(content) if filename.endswith('.json') else content

    def get_products_paginated(self, language: str = 'FR') -> List[Dict]:
        """Collect product records by following ``nextLink`` pagination.

        Stops at the first request/JSON error, when no ``nextLink`` is returned,
        or after ``MAX_PAGES`` pages; whatever was gathered so far is returned.
        """
        all_products = []
        url_base = f"{self.BASE_URL}/pesticide_residues_products"
        params_initial = {'format': 'json', 'language': language, 'api-version': 'v2.0'}
        current_url = url_base
        page_num = 0
        MAX_PAGES = 30
        while current_url and page_num < MAX_PAGES:
            self.api_calls += 1
            page_num += 1
            # Query parameters are sent on the first request only; follow-up
            # URLs come from the API's own nextLink and are requested as given.
            params_req = params_initial if current_url == url_base else None
            logger.info(f"Produits page {page_num} depuis {current_url} (API global #{self.api_calls})")
            try:
                resp = self.session.get(current_url, params=params_req, timeout=(10, 45))
                resp.raise_for_status()
                data_page = resp.json()
            except requests.RequestException as e:
                logger.error(f"Erreur requête produits page {current_url}: {e}")
                break
            except json.JSONDecodeError as e:
                logger.error(f"Erreur JSON produits page {current_url}: {e}. Reçu: {resp.text[:100]}")
                break
            if isinstance(data_page, dict):
                items = data_page.get('value', [])
                current_url = data_page.get('nextLink')
            else:
                items = data_page if isinstance(data_page, list) else []
                current_url = None
            if isinstance(items, list):
                all_products.extend(items)
            if not current_url:
                logger.info("Fin pagination produits.")
        if page_num >= MAX_PAGES and current_url:
            logger.warning(f"Limite pagination ({MAX_PAGES}) produits atteinte. Données potentiellement tronquées.")
        logger.info(f"Récupéré {len(all_products)} produits ({page_num} pages).")
        return all_products
@st.cache_data(ttl=86400, show_spinner="Chargement initial des données de référence...")
def download_all_data() -> Dict[str, Any]:
    """Fetch substances, MRL records and products, and bundle them in one dict.

    The result is cached by Streamlit for 86400 seconds. Returned keys:
    ``substances`` (id -> name), ``mrls`` (list of dicts), ``products`` (list),
    ``product_dict`` (id -> name) and ``stats`` (counters and download time).
    """

    def unwrap(payload):
        # A dict payload carries its rows under 'value'; anything else is used as is.
        if isinstance(payload, dict):
            return payload.get('value', [])
        return payload

    client = PesticideDataFetcher()
    results = {'substances': {}, 'mrls': [], 'products': [], 'product_dict': {}, 'stats': {}}

    with st.status("Initialisation du téléchargement...", expanded=True) as progress:
        # --- Active substances ---
        progress.update(label="📥 Substances actives...")
        raw_substances = client.download_data(
            "/active_substances/download",
            {"format": "json", "api-version": "v2.0"},
        )
        substance_rows = unwrap(raw_substances) if raw_substances else None
        if isinstance(substance_rows, list):
            names_by_id = {}
            for row in substance_rows:
                if isinstance(row, dict) and row.get('substance_id') and row.get('substance_name'):
                    names_by_id[row['substance_id']] = row['substance_name']
            results['substances'] = names_by_id
        logger.info(f"✓ {len(results['substances'])} substances.")

        # --- MRL records (large download, streamed) ---
        progress.update(label="📥 Enregistrements LMR (volumineux)...")
        raw_mrls = client.download_data(
            "/pesticide_residues_mrls/download",
            {"format": "json", "language": "FR", "api-version": "v2.0"},
            stream_large_json=True,
        )
        mrl_rows = unwrap(raw_mrls) if raw_mrls else None
        if isinstance(mrl_rows, list):
            results['mrls'] = list(filter(lambda row: isinstance(row, dict), mrl_rows))
        logger.info(f"✓ {len(results['mrls'])} LMRs.")

        # --- Food products ---
        progress.update(label="📥 Produits alimentaires...")
        product_rows = client.get_products_paginated(language='FR')
        if isinstance(product_rows, list):
            results['products'] = product_rows
            label_by_id = {}
            for row in product_rows:
                if isinstance(row, dict) and row.get('product_id') and row.get('product_name'):
                    label_by_id[row['product_id']] = row['product_name']
            results['product_dict'] = label_by_id
        logger.info(f"✓ {len(results['products'])} produits.")

        # --- Summary shown in the UI ---
        results['stats'] = {
            'api_calls': client.api_calls,
            'substances_count': len(results['substances']),
            'mrls_count': len(results['mrls']),
            'products_count': len(results['products']),
            'download_time': datetime.now().strftime('%d/%m/%Y %H:%M:%S'),
        }
        progress.update(
            label=f"✅ Données chargées! ({results['stats']['download_time']})",
            state="complete",
            expanded=False,
        )
    return results
class PesticideInterface:
    """Streamlit front end over the reference data from ``download_all_data``.

    Builds lookup indexes once at construction, then renders the product
    selector, the MRL table, the charts and the CSV export.
    """

    def __init__(self):
        self.data = download_all_data()
        self._create_indexes()

    def _create_indexes(self):
        """Index MRL records by product id and map product names to ids."""
        self.mrls_by_product = defaultdict(list)
        for mrl_item in self.data.get('mrls', []):
            if isinstance(mrl_item, dict) and mrl_item.get('product_id'):
                self.mrls_by_product[mrl_item['product_id']].append(mrl_item)
        self.product_choices = {
            p_item['product_name']: p_item['product_id']
            for p_item in self.data.get('products', [])
            if isinstance(p_item, dict) and p_item.get('product_name') and p_item.get('product_id')
        }
        logger.info(f"Index créés: {len(self.mrls_by_product)} produits avec LMR.")

    def get_product_details(self, product_names_sel: List[str], future_only_flag: bool = False) -> pd.DataFrame:
        """Build the display table of MRL records for the selected products.

        Args:
            product_names_sel: Product names as offered by ``product_choices``;
                unknown names are ignored.
            future_only_flag: Keep only records whose application date lies
                strictly after now and within the next 180 days.

        Returns:
            A DataFrame with columns ``Produit``, ``Substance``, ``Valeur LMR``,
            ``Date d'application`` and ``Lien Règlement``, sorted by product then
            most recent date first; an empty DataFrame when nothing matches.
        """
        sel_ids = [self.product_choices[name] for name in product_names_sel if name in self.product_choices]
        if not sel_ids:
            return pd.DataFrame()
        mrls_list = [mrl for pid_sel in sel_ids for mrl in self.mrls_by_product.get(pid_sel, [])]
        if not mrls_list:
            return pd.DataFrame()
        df_data = pd.DataFrame(mrls_list)
        if df_data.empty:
            return pd.DataFrame()

        # Records are free-form dicts: make sure every column read below exists
        # so a missing key yields "unknown"/NaT/NaN rows instead of a KeyError.
        for required in ("pesticide_residue_id", "product_id", "entry_into_force_date", "mrl_value"):
            if required not in df_data.columns:
                df_data[required] = None

        df_data["Substance"] = df_data["pesticide_residue_id"].map(self.data.get('substances', {})).fillna("Inconnue")
        df_data["Produit"] = df_data["product_id"].map(self.data.get('product_dict', {})).fillna("Inconnu")

        def format_reg_link(row_data):
            # Markdown-style link when a usable http(s) URL exists, bare number otherwise.
            url_val, num_val = row_data.get("regulation_url"), row_data.get("regulation_number", "N/A")
            has_url = pd.notna(url_val) and str(url_val).strip().lower().startswith('http')
            return f"[{num_val}]({url_val})" if has_url else num_val

        df_data["Lien Règlement"] = df_data.apply(format_reg_link, axis=1)
        df_data["Date d'application"] = pd.to_datetime(df_data["entry_into_force_date"], errors="coerce")

        if future_only_flag:
            # Compare in UTC: utc=True treats naive dates as UTC and converts
            # aware ones, so no per-dtype timezone branching is needed.
            dates_utc = pd.to_datetime(df_data["entry_into_force_date"], errors="coerce", utc=True)
            ts_now_utc = pd.Timestamp.now(tz='UTC')
            ts_future_utc = ts_now_utc + timedelta(days=180)
            in_window = dates_utc.notna() & (dates_utc > ts_now_utc) & (dates_utc <= ts_future_utc)
            # .copy() so the column assignment below does not write into a slice.
            df_data = df_data[in_window].copy()
            if df_data.empty:
                return pd.DataFrame()

        df_data["Valeur LMR"] = pd.to_numeric(df_data["mrl_value"], errors='coerce')
        cols_final = ["Produit", "Substance", "Valeur LMR", "Date d'application", "Lien Règlement"]
        df_data = df_data[cols_final].copy()
        return df_data.sort_values(
            by=["Produit", "Date d'application"], ascending=[True, False], na_position='last'
        )

    def create_interface(self):
        """Render the main page: metrics, product selection, table, charts, export."""
        st.title("🌿 EU Pesticides Database Explorer")
        app_stats = self.data.get('stats', {})
        m_col1, m_col2, m_col3, m_col4 = st.columns(4)
        with m_col1:
            st.metric("📦 Produits", f"{app_stats.get('products_count', 0):,}")
        with m_col2:
            st.metric("🧪 Substances", f"{app_stats.get('substances_count', 0):,}")
        with m_col3:
            st.metric("📊 Enregistrements LMR", f"{app_stats.get('mrls_count', 0):,}")
        with m_col4:
            st.metric("📞 Appels API", app_stats.get('api_calls', 0))
        st.caption(f"Données de référence chargées ({app_stats.get('download_time', 'N/A')}).")
        st.markdown("---")

        ui_col1, ui_col2 = st.columns([3, 1])
        with ui_col1:
            opts_prods = sorted(self.product_choices)
            sel_prods_names = st.multiselect("🔍 Sélectionnez produit(s)", options=opts_prods, placeholder="Commencez à taper...")
        with ui_col2:
            sel_future_only = st.checkbox("📅 Changements futurs (6 mois)", value=False, help="Nouveaux LMR ou modifications prévues.")

        if not sel_prods_names:
            st.info("👆 Sélectionnez un ou plusieurs produits pour afficher leurs LMR.")
            return

        df_res = self.get_product_details(sel_prods_names, sel_future_only)
        if df_res.empty:
            # No trailing full stop here: the sentence continues in the f-string.
            info_msg = "Aucun changement LMR prévu" if sel_future_only else "Aucune donnée LMR trouvée"
            st.info(f"{info_msg} pour la sélection actuelle.")
            return

        st.markdown("### 📊 Résultats des LMR")
        df_lmr_num = df_res[df_res["Valeur LMR"].notna()]
        disp_col1, disp_col2 = st.columns(2)
        with disp_col1:
            st.metric("Lignes affichées", len(df_res))
        with disp_col2:
            st.metric("Substances uniques", df_lmr_num["Substance"].nunique() if not df_lmr_num.empty else 0)

        opt_sort_by = None
        opt_sort_dir = "Croissant"
        with st.expander("⚙️ Options d'affichage", expanded=False):
            opt_show_low_mrl = st.checkbox("Inclure LMR < 0.01 mg/kg", value=True)
            opts_sort = [c for c in ["Produit", "Substance", "Valeur LMR", "Date d'application"] if c in df_res.columns]
            if opts_sort:
                def_sort_idx_opt = opts_sort.index("Date d'application") if "Date d'application" in opts_sort else 0
                opt_sort_by = st.selectbox("Trier par", opts_sort, index=def_sort_idx_opt)
                opt_sort_dir = st.radio("Ordre", ["Croissant", "Décroissant"], horizontal=True,
                                        index=1 if opt_sort_by == "Date d'application" else 0)

        df_view = df_res.copy()
        if not opt_show_low_mrl and "Valeur LMR" in df_view.columns:
            df_view = df_view[df_view["Valeur LMR"] >= 0.01]
        if opt_sort_by and opt_sort_by in df_view.columns:
            df_view = df_view.sort_values(opt_sort_by, ascending=(opt_sort_dir == "Croissant"), na_position='last')

        st.dataframe(df_view, use_container_width=True, hide_index=True,
                     column_config={
                         "Valeur LMR": st.column_config.NumberColumn("LMR (mg/kg)", format="%.4f", help="Limite Maximale de Résidus"),
                         "Date d'application": st.column_config.DateColumn("Application", format="%d/%m/%Y"),  # Standard French date format
                         "Lien Règlement": st.column_config.TextColumn("Règlement")
                     })
        if not df_view.empty:
            self.create_visualizations(df_view)

        # File name built from the first two selected products, "_etc" if more.
        fname_prods = "_".join(sel_prods_names[:2]).replace(" ", "_").replace("/", "_") + ("_etc" if len(sel_prods_names) > 2 else "")
        fname_csv = f"lmr_{fname_prods}_{datetime.now().strftime('%Y%m%d%H%M')}.csv"
        st.download_button("📥 Export CSV", df_view.to_csv(index=False).encode('utf-8'), fname_csv, "text/csv")

    def create_visualizations(self, df: pd.DataFrame):
        """Render the three chart tabs for the rows of ``df`` that have a numeric MRL."""
        st.markdown("### 🎨 Visualisations")
        tabs_viz = st.tabs(["📈 Évolution Temporelle", "📊 Distribution", "🏆 Top Substances"])
        # Work on a copy restricted to rows with a numeric MRL value.
        df_plot = df[df["Valeur LMR"].notna()].copy()
        if df_plot.empty:
            st.warning("Aucune donnée LMR numérique valide pour les graphiques.")
            return

        with tabs_viz[0]:
            if "Date d'application" in df_plot.columns and df_plot["Date d'application"].notna().any():
                df_plot_time = df_plot[df_plot["Date d'application"].notna()].sort_values("Date d'application")
                if not df_plot_time.empty:
                    fig_scatter = px.scatter(df_plot_time, x="Date d'application", y="Valeur LMR", color="Substance", size="Valeur LMR",
                                             hover_data=["Produit", "Lien Règlement"], title="Évolution LMR (axe Y log)", log_y=True)
                    fig_scatter.update_layout(legend_title_text='Substance',
                                              legend=dict(orientation="h", yanchor="bottom", y=1.02, xanchor="right", x=1))
                    st.plotly_chart(fig_scatter, use_container_width=True)
                else:
                    st.info("Pas de données datées pour ce graphique.")
            else:
                st.info("'Date d'application' manquante ou vide.")

        with tabs_viz[1]:
            # .nunique() already returns a plain int.
            product_count = df_plot["Produit"].nunique()
            # Colour by product only while the legend stays readable.
            color_by_prod = "Produit" if 0 < product_count < 10 else None
            fig_hist_dist = px.histogram(df_plot, x="Valeur LMR", nbins=30, title="Distribution des LMR (axe X log)", log_x=True,
                                         labels={"Valeur LMR": "LMR (mg/kg)"}, color=color_by_prod)
            st.plotly_chart(fig_hist_dist, use_container_width=True)
            if product_count > 1:
                fig_box_dist = px.box(df_plot, x="Produit", y="Valeur LMR", title="LMR par Produit (axe Y log)", log_y=True,
                                      color="Produit", points="outliers")
                st.plotly_chart(fig_box_dist, use_container_width=True)

        with tabs_viz[2]:
            if "Substance" in df_plot.columns:
                df_top_subs = (df_plot.groupby("Substance")["Valeur LMR"]
                               .agg(['max', 'count', 'mean']).rename(columns=str.capitalize)
                               .sort_values('Max', ascending=False).head(15).reset_index())
                if not df_top_subs.empty:
                    fig_bar_top = px.bar(df_top_subs, y="Substance", x='Max', orientation='h', title="Top 15 Substances (LMR max)",
                                         labels={'Max': 'LMR max (mg/kg)'}, hover_data={'Count': True, 'Mean': ':.4f'})
                    fig_bar_top.update_layout(yaxis={'categoryorder': 'total ascending'})
                    st.plotly_chart(fig_bar_top, use_container_width=True)
                else:
                    st.info("Pas assez de données pour le Top Substances.")
            else:
                st.info("'Substance' manquante pour ce graphique.")
def main():
    """Draw the sidebar, build the interface once per session and render the page."""
    state = st.session_state

    with st.sidebar:
        st.header("EU Pesticides Explorer")
        st.caption("Version Optimisée")
        st.markdown("Analyse des LMR de pesticides dans l'UE. Données via API de la Commission Européenne.")
        reload_requested = st.button(
            "🔄 Forcer MAJ Données",
            key="sidebar_btn_reload",
            help="Efface le cache et recharge tout.",
        )
        if reload_requested:
            # Drop both the cached download and the per-session interface,
            # then restart the script so everything is rebuilt.
            st.cache_data.clear()
            state.pop('pesticide_app_interface', None)
            st.rerun()
        st.markdown("---")

    if 'pesticide_app_interface' not in state:
        logger.info("Initialisation PesticideInterface (session_state)...")
        with st.spinner("Préparation de l'application et chargement des données de référence..."):
            state.pesticide_app_interface = PesticideInterface()
        logger.info("PesticideInterface initialisée.")

    interface = state.pesticide_app_interface
    interface.create_interface()

    loaded_at = interface.data.get('stats', {}).get('download_time')
    if loaded_at:
        st.sidebar.caption(f"Données chargées le: {loaded_at}")
    else:
        st.sidebar.caption("Statut chargement données non disponible.")


if __name__ == "__main__":
    main()