MLRSTREAMLIT / appv1.py
MMOON's picture
Rename app.py to appv1.py
d96ab49 verified
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any
import pandas as pd
import requests
import plotly.express as px
import streamlit as st
from tenacity import retry, stop_after_attempt, wait_exponential
# Configuration Streamlit
st.set_page_config(page_title="Pesticide Data Explorer", page_icon="🌿", layout="wide")
# Configuration logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
handlers=[logging.FileHandler("pesticide_app.log", encoding="utf-8"), logging.StreamHandler()],
)
logger = logging.getLogger(__name__)
class PesticideDataFetcher:
BASE_URL = "https://api.datalake.sante.service.ec.europa.eu/sante/pesticides"
HEADERS = {
"Content-Type": "application/json",
"Cache-Control": "no-cache",
"User-Agent": "Mozilla/5.0"
}
def __init__(self):
self.session = requests.Session()
self.session.headers.update(self.HEADERS)
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10))
def fetch_data(self, url: str, params: Optional[Dict] = None) -> Dict[str, Any]:
"""Effectue une requête GET avec gestion des erreurs améliorée"""
try:
response = self.session.get(url, params=params, timeout=15)
response.raise_for_status()
return response.json()
except requests.RequestException as e:
logger.error(f"Erreur API : {e}")
if "timeout" in str(e).lower():
logger.warning("Timeout - tentative de reconnexion...")
return {}
def get_mrls(self, product_ids: Optional[List[int]] = None) -> List[Dict]:
"""Récupère les LMR pour une liste de produits"""
all_mrls = []
if not product_ids:
url = f"{self.BASE_URL}/pesticide_residues_mrls?format=json&api-version=v2.0"
response = self.fetch_data(url)
return response.get("value", [])
for product_id in product_ids:
url = f"{self.BASE_URL}/pesticide_residues_mrls?format=json&api-version=v2.0"
params = {"product_id": product_id}
response = self.fetch_data(url, params=params)
if response and "value" in response:
all_mrls.extend(response["value"])
return all_mrls
@st.cache_data(ttl=3600) # Cache augmenté à 1 heure
def get_products() -> List[Dict]:
fetcher = PesticideDataFetcher()
url = f"{fetcher.BASE_URL}/pesticide_residues_products?format=json&language=FR&api-version=v2.0"
all_products = []
while url:
response = fetcher.fetch_data(url)
if not response or "value" not in response:
break
all_products.extend(response["value"])
url = response.get("nextLink")
return all_products
@st.cache_data(ttl=3600)
def get_all_substances() -> Dict[int, str]:
fetcher = PesticideDataFetcher()
url = f"{fetcher.BASE_URL}/active_substances?format=json&api-version=v2.0"
all_substances = {}
while url:
response = fetcher.fetch_data(url)
if not response or "value" not in response:
break
for item in response["value"]:
if item.get("substance_id") and item.get("substance_name"):
all_substances[item["substance_id"]] = item["substance_name"]
url = response.get("nextLink")
return all_substances
class PesticideInterface:
def __init__(self):
self.fetcher = PesticideDataFetcher()
self.products = get_products()
self.product_choices = {p['product_name']: p['product_id'] for p in self.products}
self.substances = get_all_substances()
def get_product_details(self, product_names: List[str], future_only: bool = False) -> pd.DataFrame:
product_ids = [self.product_choices[name] for name in product_names]
all_mrls = self.fetcher.get_mrls(product_ids)
df = pd.DataFrame(all_mrls)
if df.empty:
if future_only:
st.info("Aucun changement de LMR prévu dans les 6 prochains mois pour les produits sélectionnés.")
return df
# Log des données pour debug
logger.info(f"Nombre total d'entrées : {len(df)}")
logger.info(f"Colonnes disponibles : {df.columns.tolist()}")
# Enrichir les données
df["Substance"] = df["pesticide_residue_id"].map(self.substances)
# Log des substances non trouvées
missing_substances = df[df["Substance"].isna()]["pesticide_residue_id"].unique()
if len(missing_substances) > 0:
logger.warning(f"Substances non trouvées: {missing_substances}")
# Remplacer les NA par "Inconnu" seulement après le log
df["Substance"] = df["Substance"].fillna("Inconnu")
df["Règlement"] = df.apply(
lambda x: f'<a href="{x["regulation_url"]}" target="_blank">{x["regulation_number"]}</a>'
if pd.notna(x["regulation_url"]) else x["regulation_number"],
axis=1
)
# Conversion des dates
df["Date d'application"] = pd.to_datetime(df["entry_into_force_date"], errors="coerce")
# Filtrer pour les 6 prochains mois si demandé
if future_only:
now = datetime.now()
future_date = now + timedelta(days=180)
future_df = df[
(df["Date d'application"] > now) &
(df["Date d'application"] <= future_date)
]
if future_df.empty:
st.info(f"🔍 Aucun changement de LMR prévu entre le {now.strftime('%d/%m/%Y')} et le {future_date.strftime('%d/%m/%Y')} pour les produits sélectionnés.")
return pd.DataFrame() # Retourne un DataFrame vide
df = future_df
# Renommer et convertir les colonnes
df = df.rename(columns={"mrl_value": "Valeur LMR"})
# Conversion explicite en nombre flottant
df["Valeur LMR"] = pd.to_numeric(df["Valeur LMR"], errors='coerce')
# Sélection finale des colonnes
columns = ["Substance", "Valeur LMR", "Date d'application", "Règlement"]
df = df[columns].sort_values("Date d'application", ascending=False)
return df
def create_interface(self):
st.title("🌿 Base de données des pesticides de l'UE")
col1, col2 = st.columns([3, 1])
with col1:
product_names = st.multiselect(
"Sélectionnez un ou plusieurs produits",
list(self.product_choices.keys())
)
with col2:
future_only = st.checkbox("Uniquement les 6 prochains mois", value=False)
if st.button("Afficher les données"):
if not product_names:
st.warning("Veuillez sélectionner au moins un produit.")
return
df = self.get_product_details(product_names, future_only)
if df.empty:
return # Le message d'info a déjà été affiché dans get_product_details
else:
if future_only:
st.markdown("### Changements de LMR prévus dans les 6 prochains mois")
else:
st.markdown("### Tableau des LMR")
# Formatage amélioré du tableau
df_display = df.copy()
df_display["Date d'application"] = df_display["Date d'application"].dt.strftime('%d/%m/%Y')
# Fonction de formatage personnalisée
def format_value(val):
if pd.isna(val):
return '-'
elif isinstance(val, (int, float)):
return f"{val:.3f}"
return val
styled_df = df_display.style.format({
'Valeur LMR': format_value,
}).hide_index()
# Ajouter un résumé des changements
if not df.empty:
nb_changes = len(df)
st.info(f"📊 {nb_changes} entrée{'s' if nb_changes > 1 else ''} trouvée{'s' if nb_changes > 1 else ''}.")
st.markdown("""
<style>
table {
border-collapse: collapse;
margin: 25px 0;
font-size: 0.9em;
font-family: sans-serif;
min-width: 400px;
box-shadow: 0 0 20px rgba(0, 0, 0, 0.15);
}
table thead tr {
background-color: #009879;
color: #ffffff;
text-align: left;
}
table th,
table td {
padding: 12px 15px;
border: 1px solid #ddd;
}
table tbody tr {
border-bottom: 1px solid #dddddd;
}
table tbody tr:nth-of-type(even) {
background-color: #f3f3f3;
}
</style>
""", unsafe_allow_html=True)
st.markdown(styled_df.to_html(escape=False), unsafe_allow_html=True)
# Créer les visualisations
self.create_visualizations(df)
def create_visualizations(self, df: pd.DataFrame):
"""Crée les visualisations des données"""
# Graphique d'évolution des LMR
fig1 = px.scatter(
df,
x="Date d'application",
y="Valeur LMR",
color="Substance",
title="Évolution des LMR dans le temps",
hover_data=["Règlement"]
)
st.plotly_chart(fig1, use_container_width=True)
def main():
interface = PesticideInterface()
interface.create_interface()
if __name__ == "__main__":
main()