RasffDBv1 / app2.py
MMOON's picture
Rename app.py to app2.py
b790d78 verified
import streamlit as st
import pandas as pd
import sqlite3
import plotly.express as px
import plotly.graph_objects as go
from io import BytesIO
import datetime
from st_aggrid import AgGrid, GridOptionsBuilder
import altair as alt
import os
import requests
# === CONFIGURATION STREAMLIT ===
st.set_page_config(
page_title="RASFF Alerts Dashboard",
page_icon="🚨",
layout="wide",
initial_sidebar_state="expanded"
)
# Custom CSS pour améliorer l'interface
st.markdown("""
<style>
.main-header {
font-size: 2.5rem;
color: #1E88E5;
font-weight: 700;
}
.sub-header {
font-size: 1.5rem;
color: #004D40;
font-weight: 600;
}
.card {
padding: 1.5rem;
border-radius: 10px;
box-shadow: 0 4px 6px rgba(0, 0, 0, 0.1);
background-color: white;
margin-bottom: 1rem;
}
.info-box {
background-color: #E3F2FD;
padding: 1rem;
border-radius: 5px;
border-left: 5px solid #1E88E5;
}
.success-box {
background-color: #E8F5E9;
padding: 1rem;
border-radius: 5px;
border-left: 5px solid #4CAF50;
}
.warning-box {
background-color: #FFF8E1;
padding: 1rem;
border-radius: 5px;
border-left: 5px solid #FFB300;
}
.error-box {
background-color: #FFEBEE;
padding: 1rem;
border-radius: 5px;
border-left: 5px solid #F44336;
}
.stButton>button {
width: 100%;
border-radius: 5px;
height: 3rem;
font-weight: 600;
}
</style>
""", unsafe_allow_html=True)
# === CONFIGURATION BASE DE DONNÉES ===
DB_PATH = "rasff_data.db"
DB_URL = "https://raw.githubusercontent.com/M00N69/RASFFDB/main/rasff_data.db"
CURRENT_YEAR = datetime.datetime.now().year
CURRENT_WEEK = datetime.datetime.now().isocalendar()[1]
WEEKS = list(range(1, 53))
# Configuration pour le cache
cache_ttl = 3600 # 1 heure
# === FONCTIONS ===
def download_db():
"""Télécharge et remplace la base de données depuis GitHub"""
with st.spinner("📥 Téléchargement de la base de données..."):
response = requests.get(DB_URL)
response.raise_for_status()
with open(DB_PATH, "wb") as file:
file.write(response.content)
st.success("✔️ Base de données téléchargée avec succès!")
st.cache_data.clear() # Efface le cache obsolète
# Recréer la table si nécessaire
create_database()
# Forcer le rechargement avec st.stop()
if 'reloaded' not in st.session_state:
st.session_state['reloaded'] = True
st.stop()
def create_database():
"""Crée la base de données si elle n'existe pas"""
if not os.path.exists(DB_PATH):
st.error("Base de données non trouvée. Veuillez la télécharger manuellement.")
return
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS rasff_notifications (
reference TEXT PRIMARY KEY,
category TEXT,
type TEXT,
subject TEXT,
date DATETIME,
notifying_country TEXT,
classification TEXT,
risk_decision TEXT,
distribution TEXT,
forAttention TEXT,
forFollowUp TEXT,
operator TEXT,
origin TEXT,
hazards TEXT
)
""")
conn.commit()
conn.close()
def check_database_structure():
"""Vérifie la structure de la base de données et affiche des informations"""
if not os.path.exists(DB_PATH):
st.error("⚠️ Base de données non trouvée.")
return
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
# Vérifier si la table existe
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='rasff_notifications';")
table_exists = cursor.fetchone()
if not table_exists:
st.error("⚠️ La table 'rasff_notifications' n'existe pas dans la base de données.")
conn.close()
return
# Afficher les colonnes
cursor.execute("PRAGMA table_info(rasff_notifications);")
columns = cursor.fetchall()
columns_df = pd.DataFrame(columns, columns=["cid", "name", "type", "notnull", "dflt_value", "pk"])
st.subheader("Structure de la table")
st.dataframe(columns_df)
# Vérifier le nombre d'enregistrements
cursor.execute("SELECT COUNT(*) FROM rasff_notifications")
count = cursor.fetchone()[0]
st.metric("Nombre total d'enregistrements", count)
if count == 0:
st.warning("⚠️ La table est vide.")
conn.close()
@st.cache_data(ttl=cache_ttl)
def get_clean_dataframe():
"""Récupère et nettoie les données de la base SQLite affichée dans Débogage"""
st.cache_data.clear() # Efface le cache pour forcer le rechargement
# Vérifier si le fichier existe
if not os.path.exists(DB_PATH):
st.error("⚠️ Base de données non trouvée.")
return pd.DataFrame()
# Connexion à la base détectée dans Débogage
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
# Vérifier si la table existe
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='rasff_notifications';")
table_exists = cursor.fetchone()
if not table_exists:
st.error("⚠️ La table 'rasff_notifications' n'existe pas dans la base de données.")
return pd.DataFrame()
# Charger les données de 2024 et 2025
df = pd.read_sql("SELECT * FROM rasff_notifications WHERE strftime('%Y', date) IN ('2024', '2025')", conn)
conn.close()
if df.empty:
st.warning("⚠️ Aucune donnée disponible pour 2024 et 2025.")
return pd.DataFrame()
# Corriger le format des dates
if 'date' in df.columns:
df["date"] = pd.to_datetime(df["date"], errors="coerce")
df = df.dropna(subset=["date"])
else:
st.error("⚠️ La colonne 'date' est absente.")
return pd.DataFrame()
st.write("📅 Années présentes dans le DataFrame après chargement :", df["date"].dt.year.unique())
return df
def create_interactive_table(df):
"""Crée une table interactive avec AgGrid"""
if df.empty:
st.warning("⚠️ Aucune donnée à afficher.")
return
# Formater la date pour l'affichage
display_df = df.copy()
display_df["date"] = display_df["date"].dt.strftime("%Y-%m-%d")
gb = GridOptionsBuilder.from_dataframe(display_df)
gb.configure_pagination(paginationAutoPageSize=False, paginationPageSize=15)
gb.configure_column("date", headerName="Date", type=["dateColumnFilter"], sortable=True)
gb.configure_column("reference", headerName="Référence", sortable=True, filter=True)
gb.configure_column("notifying_country", headerName="Pays notifiant", sortable=True, filter=True)
gb.configure_column("origin", headerName="Pays d'origine", sortable=True, filter=True)
gb.configure_column("category", headerName="Catégorie", sortable=True, filter=True)
gb.configure_column("subject", headerName="Produit", sortable=True, filter=True)
gb.configure_column("hazards", headerName="Substance", sortable=True, filter=True)
gb.configure_column("classification", headerName="Type de danger", sortable=True, filter=True)
gb.configure_column("risk_decision", headerName="Décision de risque", sortable=True, filter=True)
gb.configure_selection('single')
gridOptions = gb.build()
grid_response = AgGrid(
display_df,
gridOptions=gridOptions,
data_return_mode='AS_INPUT',
update_mode='MODEL_CHANGED',
fit_columns_on_grid_load=False,
theme='streamlit',
enable_enterprise_modules=False,
height=400,
width='100%',
reload_data=False
)
return grid_response
def create_monthly_chart(df, selected_year):
"""Crée un graphique interactif des alertes par mois"""
if df.empty:
return None
# Filter by year
year_df = df[df["date"].dt.year == selected_year]
if year_df.empty:
return None
# Count alerts by month
monthly_counts = year_df["date"].dt.month.value_counts().sort_index()
month_names = {
1: "Janvier", 2: "Février", 3: "Mars", 4: "Avril", 5: "Mai", 6: "Juin",
7: "Juillet", 8: "Août", 9: "Septembre", 10: "Octobre", 11: "Novembre", 12: "Décembre"
}
monthly_data = pd.DataFrame({
'Mois': [month_names[m] for m in monthly_counts.index],
'Nombre': monthly_counts.values,
'Mois_num': monthly_counts.index # Pour trier correctement
})
# Trier par numéro de mois
monthly_data = monthly_data.sort_values('Mois_num')
# Create bar chart with Plotly
fig = px.bar(
monthly_data,
x='Mois',
y='Nombre',
title=f"Alertes par mois en {selected_year}",
labels={'Nombre': "Nombre d'alertes"},
color='Nombre',
color_continuous_scale='Blues'
)
fig.update_layout(
xaxis_title="Mois",
yaxis_title="Nombre d'alertes",
height=400,
template="plotly_white"
)
return fig
def create_country_chart(df, selected_year):
"""Crée un graphique des alertes par pays d'origine"""
if df.empty:
return None
# Filter by year
year_df = df[df["date"].dt.year == selected_year]
if year_df.empty:
return None
# Get top 10 countries - filtrer les valeurs nulles
top_countries = year_df["origin"].dropna().value_counts().nlargest(10)
country_data = pd.DataFrame({
'Pays': top_countries.index,
'Nombre': top_countries.values
})
# Create horizontal bar chart with Plotly
fig = px.bar(
country_data,
y='Pays',
x='Nombre',
title=f"Top 10 pays d'origine des alertes en {selected_year}",
orientation='h',
color='Nombre',
color_continuous_scale='Reds'
)
fig.update_layout(
yaxis=dict(autorange="reversed"),
height=400,
template="plotly_white"
)
return fig
def create_category_chart(df, selected_year):
"""Crée un graphique des alertes par catégorie de produit"""
if df.empty:
return None
# Filter by year
year_df = df[df["date"].dt.year == selected_year]
if year_df.empty:
return None
# Get top 10 categories - filtrer les valeurs nulles
top_categories = year_df["category"].dropna().value_counts().nlargest(10)
category_data = pd.DataFrame({
'Catégorie': top_categories.index,
'Nombre': top_categories.values
})
# Create pie chart with Plotly
fig = px.pie(
category_data,
names='Catégorie',
values='Nombre',
title=f"Répartition par catégorie de produit en {selected_year}",
hole=0.4
)
fig.update_layout(
height=400,
template="plotly_white"
)
return fig
def create_hazard_chart(df, selected_year):
"""Crée un graphique des alertes par type de danger"""
if df.empty:
return None
# Filter by year
year_df = df[df["date"].dt.year == selected_year]
if year_df.empty:
return None
# Get hazard categories - filtrer les valeurs nulles
hazard_counts = year_df["classification"].dropna().value_counts()
hazard_data = pd.DataFrame({
'Type': hazard_counts.index,
'Nombre': hazard_counts.values
})
# Create treemap with Plotly
fig = px.treemap(
hazard_data,
path=['Type'],
values='Nombre',
title=f"Types de dangers signalés en {selected_year}",
color='Nombre',
color_continuous_scale='Greens'
)
fig.update_layout(
height=400,
template="plotly_white"
)
return fig
def create_risk_decision_chart(df, selected_year):
"""Crée un graphique des alertes par décision de risque"""
if df.empty:
return None
# Filter by year
year_df = df[df["date"].dt.year == selected_year]
if year_df.empty:
return None
# Get risk decisions - filtrer les valeurs nulles
risk_counts = year_df["risk_decision"].dropna().value_counts()
risk_data = pd.DataFrame({
'Décision': risk_counts.index,
'Nombre': risk_counts.values
})
# Create bar chart with Plotly
fig = px.bar(
risk_data,
x='Décision',
y='Nombre',
title=f"Décisions de risque en {selected_year}",
color='Nombre',
color_continuous_scale='Oranges'
)
fig.update_layout(
xaxis_title="Décision de risque",
yaxis_title="Nombre d'alertes",
height=400,
template="plotly_white"
)
return fig
# === INTERFACE STREAMLIT ===
# Sidebar
with st.sidebar:
st.image("https://riskplaza.com/wp-content/uploads/2017/10/Riskplaza-RASFF-portal.png", width=200)
st.markdown("## 🔍 À propos")
st.markdown("""
Cette application permet de suivre les alertes RASFF (Rapid Alert System for Food and Feed)
de l'Union Européenne concernant la sécurité alimentaire.
Les données sont téléchargées depuis le site de diffusion officiel.
""")
# Bouton pour télécharger la base de données
if st.button("Télécharger la base de données"):
download_db()
create_database() # Créer la table après le téléchargement
# Bouton pour forcer la mise à jour de la base
if st.button("🔄 Forcer la mise à jour de la base"):
download_db()
create_database()
st.cache_data.clear()
st.session_state['reloaded'] = True
st.stop() # Arrête l'exécution pour déclencher un reload automatique
# Afficher le chemin et la taille de la base de données
st.write(f"Chemin de la base de données : {DB_PATH}")
if os.path.exists(DB_PATH):
st.write(f"Taille de la base de données : {os.path.getsize(DB_PATH) / 1024:.2f} KB")
# Vérifier les années disponibles dans la base
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute("SELECT DISTINCT strftime('%Y', date) FROM rasff_notifications")
years_in_db = cursor.fetchall()
conn.close()
st.write(f"Années disponibles dans la base : {[year[0] for year in years_in_db]}")
# Vérifier les tables existantes dans la base
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
tables = cursor.fetchall()
conn.close()
st.write(f"Tables disponibles : {[table[0] for table in tables]}")
# Vérifier les enregistrements de 2025
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute("SELECT COUNT(*), MIN(date), MAX(date) FROM rasff_notifications WHERE strftime('%Y', date) = '2025'")
count_2025, min_date, max_date = cursor.fetchone()
conn.close()
st.write(f"Nombre d'enregistrements pour 2025 : {count_2025}")
st.write(f"Dates disponibles pour 2025 : De {min_date} à {max_date}")
# Main content
st.markdown('<h1 class="main-header">🚨 Tableau de bord RASFF</h1>', unsafe_allow_html=True)
# Tabs for different views
tab1, tab2, tab3 = st.tabs(["📊 Dashboard", "📋 Données brutes", "🔧 Débogage"])
with tab1:
# Dashboard view
df = get_clean_dataframe()
st.cache_data.clear() # Efface le cache avant de charger les données
if not df.empty:
st.write("📊 Aperçu des données chargées :")
st.write(df.head())
col1, col2, col3, col4 = st.columns(4)
with col1:
total_alerts = len(df)
st.metric("Total des alertes", f"{total_alerts:,}")
with col2:
current_year_alerts = len(df[df["date"].dt.year == CURRENT_YEAR]) if 'date' in df.columns else 0
last_year_alerts = len(df[df["date"].dt.year == CURRENT_YEAR - 1])
delta = current_year_alerts - last_year_alerts
delta_percent = (delta / last_year_alerts * 100) if last_year_alerts > 0 else 0
st.metric(f"Alertes {CURRENT_YEAR}", f"{current_year_alerts:,}", f"{delta_percent:.1f}%")
with col3:
current_month = datetime.datetime.now().month
current_month_alerts = len(df[(df["date"].dt.year == CURRENT_YEAR) & (df["date"].dt.month == current_month)])
st.metric(f"Alertes du mois", f"{current_month_alerts:,}")
with col4:
countries_count = df["origin"].dropna().nunique()
st.metric("Pays concernés", f"{countries_count:,}")
# Year selector
years = sorted(df["date"].dt.year.unique())
if years: # Vérifier que years n'est pas vide
selected_year = st.selectbox("📅 Sélectionnez une année", years, index=len(years) - 1)
# Charts row 1
col1, col2 = st.columns(2)
with col1:
monthly_chart = create_monthly_chart(df, selected_year)
if monthly_chart:
st.plotly_chart(monthly_chart, use_container_width=True)
else:
st.info(f"Pas de données disponibles pour {selected_year}")
with col2:
country_chart = create_country_chart(df, selected_year)
if country_chart:
st.plotly_chart(country_chart, use_container_width=True)
else:
st.info(f"Pas de données disponibles pour {selected_year}")
# Charts row 2
col1, col2 = st.columns(2)
with col1:
category_chart = create_category_chart(df, selected_year)
if category_chart:
st.plotly_chart(category_chart, use_container_width=True)
else:
st.info(f"Pas de données disponibles pour {selected_year}")
with col2:
# Nouveau graphique pour risk_decision
risk_chart = create_risk_decision_chart(df, selected_year)
if risk_chart:
st.plotly_chart(risk_chart, use_container_width=True)
else:
st.info(f"Pas de données disponibles pour {selected_year}")
# Troisième ligne de graphiques
col1, col2 = st.columns(2)
with col1:
hazard_chart = create_hazard_chart(df, selected_year)
if hazard_chart:
st.plotly_chart(hazard_chart, use_container_width=True)
else:
st.info(f"Pas de données disponibles pour {selected_year}")
else:
st.warning("⚠️ Aucune année trouvée dans les données.")
else:
st.warning("⚠️ Aucune donnée en base. Veuillez mettre à jour la base de données.")
st.info("👈 Utilisez le menu latéral pour mettre à jour les données.")
with tab2:
# Raw data view
st.markdown('<h2 class="sub-header">📋 Données brutes</h2>', unsafe_allow_html=True)
df = get_clean_dataframe()
st.cache_data.clear() # Forcer le rechargement des données
if not df.empty:
# Filtres interactifs
col1, col2, col3 = st.columns(3)
with col1:
years = sorted(df["date"].dt.year.unique())
if years: # Vérifier que years n'est pas vide
selected_year = st.selectbox("📅 Année", years, index=len(years) - 1, key="year_filter")
# Apply year filter
df_filtered = df[df["date"].dt.year == selected_year]
else:
st.warning("⚠️ Aucune année trouvée dans les données.")
df_filtered = df
with col2:
# Filtrer les valeurs nulles
countries = ["Tous"] + sorted(df_filtered["origin"].dropna().unique())
selected_country = st.selectbox("🌍 Pays d'origine", countries, key="country_filter")
# Apply country filter
if selected_country != "Tous":
df_filtered = df_filtered[df_filtered["origin"] == selected_country]
with col3:
# Filtrer les valeurs nulles
categories = ["Tous"] + sorted(df_filtered["category"].dropna().unique())
selected_category = st.selectbox("📦 Catégorie de produit", categories, key="category_filter")
# Apply category filter
if selected_category != "Tous":
df_filtered = df_filtered[df_filtered["category"] == selected_category]
# Show interactive table
grid_response = create_interactive_table(df_filtered)
# Export options
col1, col2 = st.columns(2)
with col1:
if st.button("📥 Exporter en CSV"):
csv = df_filtered.to_csv(index=False).encode('utf-8')
st.download_button(
label="📥 Télécharger CSV",
data=csv,
file_name=f"rasff_data_{selected_year if 'selected_year' in locals() else 'all'}.csv",
mime="text/csv",
)
with col2:
if st.button("📥 Exporter en Excel"):
output = BytesIO()
with pd.ExcelWriter(output, engine='xlsxwriter') as writer:
df_filtered.to_excel(writer, sheet_name='RASFF_Data', index=False)
excel_data = output.getvalue()
st.download_button(
label="📥 Télécharger Excel",
data=excel_data,
file_name=f"rasff_data_{selected_year if 'selected_year' in locals() else 'all'}.xlsx",
mime="application/vnd.ms-excel",
)
# Vérifier les filtres actifs
st.write("Filtres actifs dans 'Données brutes':")
st.write(f"Année sélectionnée : {selected_year}")
st.write(f"Pays sélectionné : {selected_country}")
st.write(f"Catégorie sélectionnée : {selected_category}")
else:
st.warning("⚠️ Aucune donnée en base. Veuillez mettre à jour la base de données.")
st.info("👈 Utilisez le menu latéral pour mettre à jour les données.")
with tab3:
st.markdown('<h2 class="sub-header">🔧 Débogage de la base de données</h2>', unsafe_allow_html=True)
if st.button("Vérifier la structure de la base"):
check_database_structure()
if st.button("Effacer le cache"):
st.cache_data.clear()
st.success("Cache effacé avec succès! Les données seront rechargées.")
st.session_state['reloaded'] = True
st.stop() # Arrête l'exécution pour déclencher un reload automatique
# Footer
st.markdown("---")
st.markdown("Développé avec ❤️ pour la surveillance des alertes alimentaires")