MYLPOIDS / appV0.py
MMOON's picture
Rename app.py to appV0.py
c91adda verified
import streamlit as st
import pandas as pd
# numpy n'est plus utilisé -> import os # Gardé pour os.unlink
import io
import re
import base64
import os
from datetime import datetime
import tempfile
import pdfplumber
import plotly.express as px
import plotly.graph_objects as go
import logging # Ajout pour un meilleur logging d'erreurs
# Configuration du logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# --- Configuration de la Page Streamlit ---
st.set_page_config(
page_title="Extracteur de Rapports de Pesées",
page_icon="⚖️",
layout="wide",
initial_sidebar_state="expanded"
)
# --- Styles CSS Personnalisés ---
st.markdown("""
<style>
.main-header { font-size: 2.5rem; color: #1E88E5; font-weight: 700; margin-bottom: 1rem; }
.sub-header { font-size: 1.5rem; color: #0D47A1; font-weight: 600; margin-top: 1.5rem; margin-bottom: 1rem; }
.card { background-color: #f9f9f9; border-radius: 10px; padding: 20px; box-shadow: 0 4px 6px rgba(0, 0, 0, 0.1); margin-bottom: 1rem; }
.highlight { background-color: #e3f2fd; padding: 10px; border-radius: 5px; margin: 10px 0; }
.success-message { background-color: #d4edda; color: #155724; padding: 10px; border-radius: 5px; margin: 10px 0; }
.info-text { color: #555; font-size: 0.9rem; }
.metric-card { background-color: #f0f8ff; border-left: 4px solid #1E88E5; padding: 15px; margin: 10px 0; border-radius: 5px; }
</style>
""", unsafe_allow_html=True)
# --- Constantes (Regex et autres) ---
# Regex pour extraire les informations clés
REGEX_PRODUCT_NAME = r'^([A-Z0-9\s]+)' # Début de ligne, majuscules, chiffres, espaces
REGEX_DLUO = r'DLUO\s+(\d{2}\.\d{2}\.\d{2})'
REGEX_DATETIME = r'(\d{2}/\d{2}/\d{4}\s+-\s+\d{2}:\d{2})'
REGEX_WEIGHTS = r'(\d+)\s*\.{5,}\s*([\d,\.]+)\s*g' # Au moins 5 points pour séparer
REGEX_MEAN = r'x̄\s+\d+[,\.]?\d*\s*%\s+([\d,\.]+)\s*g'
REGEX_STD = r's\s+\d+[,\.]?\d*\s*%\s+([\d,\.]+)\s*g'
REGEX_MIN = r'Min\s+\d+[,\.]?\d*\s*%\s+([\d,\.]+)\s*g'
REGEX_MAX = r'Max\s+\d+[,\.]?\d*\s*%\s+([\d,\.]+)\s*g'
REGEX_NOMINAL = r'Nominal\s+([\d,\.]+)\s*g'
REGEX_REPORT_ID = r'([A-Z0-9]+)\s*/\s*\d{2}/\d{2}/\d{4}' # ID avant la date
DEFAULT_REPORT_ID_PREFIX = "ID_Unknown_"
# --- Fonctions Utilitaires ---
def safe_extract_group(match, group_index=1, default=None):
"""Extrait un groupe d'une correspondance regex en toute sécurité."""
if match:
try:
return match.group(group_index).strip()
except IndexError:
return default
return default
def safe_convert_float(value_str, default=None):
"""Convertit une chaîne en float en toute sécurité (gère ',', '.', None)."""
if value_str is None:
return default
try:
# Remplace la virgule par un point pour la conversion
return float(value_str.replace(',', '.'))
except (ValueError, AttributeError):
return default
def extract_data_from_pdf(pdf_file_path):
"""
Extrait les données structurées d'un fichier PDF de rapport de pesée.
Args:
pdf_file_path (str): Chemin vers le fichier PDF temporaire.
Returns:
dict: Un dictionnaire contenant:
- success (bool): True si l'extraction réussit, False sinon.
- data (pd.DataFrame): DataFrame des pesées individuelles (si succès).
- metadata (dict): Métadonnées extraites du rapport (si succès).
- error (str): Message d'erreur (si échec).
"""
try:
text_content = ""
with pdfplumber.open(pdf_file_path) as pdf:
# Vérification si le PDF contient des pages
if not pdf.pages:
return {"success": False, "error": "Le fichier PDF ne contient aucune page."}
# Robustesse: Vérifie si extract_text retourne du texte
for page in pdf.pages:
text = page.extract_text()
if text: # Ne concatène que si du texte est extrait
text_content += text + "\n"
else:
logging.warning(f"Aucun texte extrait de la page {page.page_number} du fichier {os.path.basename(pdf_file_path)}")
if not text_content:
return {"success": False, "error": "Aucun contenu textuel n'a pu être extrait du PDF."}
# --- Extraction des Métadonnées ---
product_name = safe_extract_group(re.search(REGEX_PRODUCT_NAME, text_content, re.MULTILINE), default="Non trouvé")
dluo = safe_extract_group(re.search(REGEX_DLUO, text_content), default="Non trouvé")
date_time = safe_extract_group(re.search(REGEX_DATETIME, text_content), default="Non trouvé")
report_id = safe_extract_group(re.search(REGEX_REPORT_ID, text_content), default=f"{DEFAULT_REPORT_ID_PREFIX}{datetime.now().strftime('%Y%m%d%H%M%S')}")
# --- Extraction des Statistiques ---
nominal_weight_str = safe_extract_group(re.search(REGEX_NOMINAL, text_content))
mean_weight_str = safe_extract_group(re.search(REGEX_MEAN, text_content))
std_weight_str = safe_extract_group(re.search(REGEX_STD, text_content))
min_weight_str = safe_extract_group(re.search(REGEX_MIN, text_content))
max_weight_str = safe_extract_group(re.search(REGEX_MAX, text_content))
# Conversion sécurisée en float
nominal_weight = safe_convert_float(nominal_weight_str)
mean_weight = safe_convert_float(mean_weight_str)
std_weight = safe_convert_float(std_weight_str)
min_weight = safe_convert_float(min_weight_str)
max_weight = safe_convert_float(max_weight_str)
# --- Extraction des Pesées Individuelles ---
weights_matches = re.findall(REGEX_WEIGHTS, text_content)
weights_data = []
for match in weights_matches:
try:
sample_num = int(match[0])
weight = safe_convert_float(match[1])
if weight is not None: # N'ajoute que si le poids est valide
weights_data.append({"Échantillon": sample_num, "Poids (g)": weight})
except (ValueError, IndexError):
logging.warning(f"Impossible de traiter la ligne de poids : {match}")
continue # Passe à la ligne suivante
if not weights_data:
return {
"success": False,
"error": "Aucune donnée de pesée individuelle valide trouvée dans le PDF."
}
# --- Création du DataFrame ---
weights_df = pd.DataFrame(weights_data)
# Ajout des métadonnées à chaque ligne
weights_df["Produit"] = product_name
weights_df["DLUO"] = dluo
weights_df["Date_Heure"] = date_time
weights_df["ID_Rapport"] = report_id
weights_df["Poids_Nominal"] = nominal_weight
weights_df["Poids_Moyen_Rapport"] = mean_weight # Renommé pour clarté
weights_df["Ecart_Type_Rapport"] = std_weight # Renommé pour clarté
weights_df["Poids_Min_Rapport"] = min_weight # Renommé pour clarté
weights_df["Poids_Max_Rapport"] = max_weight # Renommé pour clarté
# --- Calculs Dérivés ---
# Correction: Utilisation de 'is not None' pour vérifier l'existence du poids nominal
if nominal_weight is not None and nominal_weight != 0:
weights_df["Déviation_g"] = weights_df["Poids (g)"] - nominal_weight
weights_df["Déviation_%"] = (weights_df["Déviation_g"] / nominal_weight) * 100
else:
# Si pas de poids nominal, on ne peut pas calculer la déviation
weights_df["Déviation_g"] = None
weights_df["Déviation_%"] = None
metadata = {
"product_name": product_name, "dluo": dluo, "date_time": date_time,
"report_id": report_id, "nominal_weight": nominal_weight,
"mean_weight": mean_weight, "std_weight": std_weight,
"min_weight": min_weight, "max_weight": max_weight,
"num_samples": len(weights_df)
}
return {"success": True, "data": weights_df, "metadata": metadata}
except pdfplumber.pdfa.PDFSyntaxError:
logging.error(f"Erreur de syntaxe PDF dans le fichier {os.path.basename(pdf_file_path)}.")
return {"success": False, "error": f"Erreur de syntaxe PDF dans le fichier {os.path.basename(pdf_file_path)}. Le fichier est peut-être corrompu."}
except Exception as e:
logging.error(f"Erreur inattendue lors de l'extraction du PDF {os.path.basename(pdf_file_path)}: {e}", exc_info=True)
return {"success": False, "error": f"Erreur inattendue lors de l'extraction: {str(e)}"}
def get_download_link(df, filename, link_text):
"""
Génère un lien de téléchargement HTML pour un DataFrame (CSV ou Excel).
Args:
df (pd.DataFrame): Le DataFrame à télécharger.
filename (str): Le nom du fichier (incluant .csv ou .xlsx).
link_text (str): Le texte à afficher pour le lien.
Returns:
str: La balise HTML <a> pour le téléchargement, ou une chaîne vide si erreur.
"""
if not isinstance(df, pd.DataFrame) or df.empty:
return "" # Ne génère pas de lien si le DataFrame est vide
try:
if filename.lower().endswith('.csv'):
data = df.to_csv(index=False, encoding='utf-8') # Spécifier l'encodage
mime_type = 'text/csv'
b64 = base64.b64encode(data.encode('utf-8')).decode('utf-8') # Encoder en bytes avant b64
elif filename.lower().endswith('.xlsx'):
buffer = io.BytesIO()
with pd.ExcelWriter(buffer, engine='xlsxwriter') as writer:
df.to_excel(writer, index=False, sheet_name='Données')
data_bytes = buffer.getvalue()
mime_type = 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'
b64 = base64.b64encode(data_bytes).decode('utf-8')
else:
st.error("Format de fichier non supporté pour le téléchargement.")
return ""
href = f'<a href="data:{mime_type};base64,{b64}" download="{filename}">{link_text}</a>'
return href
except Exception as e:
st.error(f"Erreur lors de la génération du lien de téléchargement pour {filename}: {str(e)}")
logging.error(f"Erreur get_download_link pour {filename}: {e}", exc_info=True)
return ""
# --- Initialisation de Session State ---
# Utilisation de clés descriptives
if 'app_data' not in st.session_state:
st.session_state.app_data = {
"all_data": pd.DataFrame(), # Données historiques + nouvelles données
"current_batch_data": pd.DataFrame(), # Données du dernier lot traité
"last_extraction_summary": [] # Résumé des derniers fichiers traités
}
# --- Interface Utilisateur (Sidebar & Pages) ---
st.sidebar.markdown('<div class="main-header">⚖️ Extracteur de Rapports</div>', unsafe_allow_html=True)
page = st.sidebar.radio("Navigation", ["Extraction des PDF", "Analyse des Données"])
# =======================================
# PAGE 1: EXTRACTION DES PDF
# =======================================
if page == "Extraction des PDF":
st.markdown('<div class="main-header">Extraction des Données de Rapports PDF</div>', unsafe_allow_html=True)
st.markdown("""
<div class="card">
<p>Chargez un ou plusieurs fichiers PDF contenant des rapports de pesées. L'application tentera d'extraire automatiquement les informations pertinentes.</p>
<p class="info-text">Assurez-vous que les PDF sont basés sur du texte (pas des images scannées sans OCR) et suivent un format cohérent pour de meilleurs résultats.</p>
</div>
""", unsafe_allow_html=True)
# --- Section de Chargement ---
st.markdown('<div class="sub-header">📤 Chargement des Rapports PDF</div>', unsafe_allow_html=True)
uploaded_files = st.file_uploader(
"Sélectionnez un ou plusieurs rapports PDF",
type="pdf",
accept_multiple_files=True,
help="Vous pouvez glisser-déposer des fichiers ici."
)
process_button = st.button("Lancer l'Extraction", type="primary", disabled=not uploaded_files)
# --- Traitement de l'Extraction ---
if uploaded_files and process_button:
progress_bar = st.progress(0)
status_text = st.empty()
status_text.info("⏳ Démarrage du traitement...")
extracted_data_list = []
extraction_summary = []
# Initialise processed_ids avec les IDs existants ou un set vide
if "ID_Rapport" in st.session_state.app_data["all_data"].columns:
processed_ids = set(st.session_state.app_data["all_data"]["ID_Rapport"].unique())
else:
processed_ids = set()
newly_added_data = []
for i, uploaded_file in enumerate(uploaded_files):
file_name = uploaded_file.name
progress = (i + 1) / len(uploaded_files)
status_text.info(f"Traitement du fichier {i+1}/{len(uploaded_files)}: {file_name}")
tmp_path = None # Initialisation pour le bloc finally
# Utilisation de fichier temporaire pour pdfplumber
try:
# Crée un fichier temporaire sécurisé
with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp_file:
tmp_file.write(uploaded_file.getvalue())
tmp_path = tmp_file.name
# Extraction (le fichier est fermé après le 'with')
result = extract_data_from_pdf(tmp_path)
except Exception as e:
# Erreur lors de la création/écriture du fichier temporaire
result = {"success": False, "error": f"Erreur de gestion de fichier: {str(e)}"}
logging.error(f"Erreur de fichier temporaire pour {file_name}: {e}", exc_info=True)
finally:
# Suppression du fichier temporaire s'il a été créé
if tmp_path and os.path.exists(tmp_path):
try:
os.unlink(tmp_path)
except Exception as e_unlink:
logging.warning(f"Impossible de supprimer le fichier temporaire {tmp_path}: {e_unlink}")
# Traitement du résultat
if result["success"]:
report_id = result["metadata"]["report_id"]
# Vérification simple pour éviter les doublons basés sur l'ID
# Aussi, ne pas considérer les ID générés comme des doublons fiables
if not report_id.startswith(DEFAULT_REPORT_ID_PREFIX) and report_id in processed_ids:
extraction_summary.append({
"filename": file_name, "status": "ℹ️ Info",
"metadata": result["metadata"], "outcome": "Doublon (ID déjà présent)"
})
else:
extracted_data_list.append(result["data"])
newly_added_data.append(result["data"]) # Garder trace de ce qui est nouveau dans ce batch
if not report_id.startswith(DEFAULT_REPORT_ID_PREFIX):
processed_ids.add(report_id) # Ajouter l'ID aux IDs traités s'il est valide
status_msg = "✅ Succès" if not report_id.startswith(DEFAULT_REPORT_ID_PREFIX) else "⚠️ Attention"
outcome_msg = "Ajouté" if not report_id.startswith(DEFAULT_REPORT_ID_PREFIX) else "Ajouté (ID non trouvé/généré)"
extraction_summary.append({
"filename": file_name, "status": status_msg,
"metadata": result["metadata"], "outcome": outcome_msg
})
else:
extraction_summary.append({
"filename": file_name, "status": "❌ Échec",
"error": result["error"], "outcome": "Non ajouté"
})
progress_bar.progress(progress)
status_text.success("Traitement terminé !")
st.session_state.app_data["last_extraction_summary"] = extraction_summary
# --- Mise à Jour et Affichage des Résultats ---
if newly_added_data: # Si de NOUVELLES données ont été ajoutées
current_batch_df = pd.concat(newly_added_data, ignore_index=True)
st.session_state.app_data["current_batch_data"] = current_batch_df
# Mise à jour des données globales
if not st.session_state.app_data["all_data"].empty:
st.session_state.app_data["all_data"] = pd.concat(
[st.session_state.app_data["all_data"], current_batch_df],
ignore_index=True
)
# Optionnel: Supprimer les doublons stricts (même ID et même échantillon)
# Ceci est une sécurité, la logique précédente devrait déjà éviter les doublons de rapport
if "ID_Rapport" in st.session_state.app_data["all_data"].columns and "Échantillon" in st.session_state.app_data["all_data"].columns:
st.session_state.app_data["all_data"].drop_duplicates(subset=["ID_Rapport", "Échantillon"], keep='last', inplace=True)
else:
st.session_state.app_data["all_data"] = current_batch_df
st.markdown('<div class="success-message">✅ Extraction terminée. De nouvelles données ont été ajoutées.</div>', unsafe_allow_html=True)
elif extraction_summary: # Si on a traité des fichiers mais rien ajouté (doublons, erreurs)
st.info("Traitement terminé. Aucune nouvelle donnée unique n'a été ajoutée (vérifiez le résumé ci-dessous).")
st.session_state.app_data["current_batch_data"] = pd.DataFrame() # Vider le batch courant
else: # Normalement impossible si process_button était cliqué avec des fichiers
st.error("Aucun fichier n'a été traité.")
# Afficher le résumé des rapports traités
st.markdown('<div class="sub-header">📊 Résumé des Rapports Traités</div>', unsafe_allow_html=True)
for result in extraction_summary:
color = "green" if "Succès" in result["status"] else "orange" if "Info" in result["status"] or "Attention" in result["status"] else "red"
icon = result["status"].split(" ")[0] # Prend l'emoji
status_text_display = result['status'].split(" ", 1)[1] # Prend le texte après l'emoji
with st.expander(f"{icon} {result['filename']} - {status_text_display} - ({result['outcome']})", expanded=(color != "green")):
if "metadata" in result:
metadata = result["metadata"]
nom_w = metadata.get('nominal_weight', 'N/A')
nom_w_display = f"{nom_w} g" if isinstance(nom_w, (int, float)) else nom_w
mean_w = metadata.get('mean_weight', 'N/A')
mean_w_display = f"{mean_w} g" if isinstance(mean_w, (int, float)) else mean_w
std_w = metadata.get('std_weight', 'N/A')
std_w_display = f"{std_w} g" if isinstance(std_w, (int, float)) else std_w
min_w = metadata.get('min_weight', 'N/A')
min_w_display = f"{min_w} g" if isinstance(min_w, (int, float)) else min_w
max_w = metadata.get('max_weight', 'N/A')
max_w_display = f"{max_w} g" if isinstance(max_w, (int, float)) else max_w
st.markdown(f"""
<div class="highlight">
<strong>Produit:</strong> {metadata.get('product_name', 'N/A')}<br>
<strong>DLUO:</strong> {metadata.get('dluo', 'N/A')}<br>
<strong>Date/Heure:</strong> {metadata.get('date_time', 'N/A')}<br>
<strong>ID Rapport:</strong> {metadata.get('report_id', 'N/A')}<br>
<strong>Poids Nominal:</strong> {nom_w_display}<br>
<strong>Stats Rapport (Moy/Std/Min/Max):</strong>
{mean_w_display} / {std_w_display} /
{min_w_display} / {max_w_display}<br>
<strong>Échantillons Extraits:</strong> {metadata.get('num_samples', 'N/A')}
</div>
""", unsafe_allow_html=True)
if "error" in result:
st.error(f"Détail de l'erreur : {result['error']}")
# --- Visualisation du Dernier Batch ---
current_data = st.session_state.app_data["current_batch_data"]
if not current_data.empty:
st.markdown('<div class="sub-header">📈 Visualisation du Dernier Batch Ajouté</div>', unsafe_allow_html=True)
col1, col2 = st.columns(2)
with col1:
fig_hist = px.histogram(
current_data, x="Poids (g)", color="Produit", marginal="box",
title="Distribution des Poids (Dernier Batch)", template="plotly_white"
)
fig_hist.update_layout(xaxis_title="Poids (g)", yaxis_title="Nombre d'échantillons", legend_title="Produit")
st.plotly_chart(fig_hist, use_container_width=True)
with col2:
if "Déviation_%" in current_data.columns and not current_data["Déviation_%"].isnull().all():
fig_dev = px.scatter(
current_data, x="Échantillon", y="Déviation_%", color="Produit",
title="Déviation / Nominal (%) (Dernier Batch)", template="plotly_white",
labels={"Déviation_%": "Déviation (%)", "Échantillon": "Numéro d'Échantillon"}
)
# Ajoute une ligne à zéro pour référence
fig_dev.add_hline(y=0, line_dash="dash", line_color="red", annotation_text="Nominal")
st.plotly_chart(fig_dev, use_container_width=True)
else:
st.info("Le calcul de la déviation (%) nécessite un poids nominal valide.")
# Afficher le tableau des données du dernier batch
st.markdown('<div class="sub-header">📋 Données du Dernier Batch Ajouté</div>', unsafe_allow_html=True)
st.dataframe(current_data, use_container_width=True)
# Liens de téléchargement pour le dernier batch
st.markdown('<div class="sub-header">💾 Téléchargement du Dernier Batch</div>', unsafe_allow_html=True)
col_dl1, col_dl2 = st.columns(2)
with col_dl1:
csv_link = get_download_link(current_data, f"batch_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv", "📥 Télécharger en CSV")
st.markdown(csv_link, unsafe_allow_html=True)
with col_dl2:
excel_link = get_download_link(current_data, f"batch_{datetime.now().strftime('%Y%m%d_%H%M%S')}.xlsx", "📥 Télécharger en Excel")
st.markdown(excel_link, unsafe_allow_html=True)
# =======================================
# PAGE 2: ANALYSE DES DONNÉES
# =======================================
elif page == "Analyse des Données":
st.markdown('<div class="main-header">Analyse des Données Historiques</div>', unsafe_allow_html=True)
# --- Chargement Optionnel de Données Historiques ---
if st.session_state.app_data["all_data"].empty:
st.markdown("""
<div class="card">
<p>Aucune donnée disponible. Vous pouvez :</p>
<ul>
<li>Retourner à la page <b>Extraction des PDF</b> pour charger des rapports.</li>
<li>Charger un fichier CSV ou Excel contenant des données précédemment extraites.</li>
</ul>
</div>
""", unsafe_allow_html=True)
st.markdown('<div class="sub-header">📤 Chargement Optionnel de Données Historiques</div>', unsafe_allow_html=True)
uploaded_hist_file = st.file_uploader(
"Chargez un fichier CSV ou Excel",
type=["csv", "xlsx"],
help="Le fichier doit contenir les colonnes attendues (Produit, Poids (g), Date_Heure, etc.)"
)
if uploaded_hist_file:
try:
if uploaded_hist_file.name.lower().endswith('.csv'):
hist_data = pd.read_csv(uploaded_hist_file)
else:
hist_data = pd.read_excel(uploaded_hist_file)
# Validation simple des colonnes essentielles
required_cols = ["Produit", "Poids (g)", "Date_Heure"]
if all(col in hist_data.columns for col in required_cols):
st.session_state.app_data["all_data"] = hist_data
# Vider le 'dernier batch' car on a chargé l'historique
st.session_state.app_data["current_batch_data"] = pd.DataFrame()
st.session_state.app_data["last_extraction_summary"] = []
st.success("Données historiques chargées avec succès!")
# Correction: Utilisation de st.rerun() au lieu de st.experimental_rerun()
st.rerun()
else:
missing_cols = [col for col in required_cols if col not in hist_data.columns]
st.error(f"Le fichier chargé ne contient pas les colonnes requises: {', '.join(missing_cols)}")
except Exception as e:
st.error(f"Erreur lors du chargement ou de la lecture du fichier: {str(e)}")
logging.error(f"Erreur chargement historique: {e}", exc_info=True)
# --- Affichage et Analyse si des Données existent ---
if not st.session_state.app_data["all_data"].empty:
all_data = st.session_state.app_data["all_data"].copy() # Travailler sur une copie
# Tentative de conversion de la colonne Date_Heure (robuste)
if "Date_Heure" in all_data.columns:
try:
# Extrait la partie date avant le " - " et gère les erreurs
date_part = all_data["Date_Heure"].astype(str).str.split(" - ", n=1, expand=True)[0]
# Convertit en datetime, mettant NaT si le format est incorrect
all_data["Date"] = pd.to_datetime(date_part, format="%d/%m/%Y", errors='coerce').dt.date
if all_data["Date"].isnull().any():
st.warning("Certaines valeurs dans 'Date_Heure' n'ont pas pu être converties au format JJ/MM/AAAA. Ces lignes pourraient être exclues du filtrage par date.")
except Exception as e:
st.error(f"Erreur lors de la tentative de conversion de 'Date_Heure' en dates: {e}")
if "Date" not in all_data.columns: # Crée la colonne même si erreur
all_data["Date"] = None
else:
st.warning("La colonne 'Date_Heure' est manquante. Le filtrage par date ne sera pas disponible.")
all_data["Date"] = None
# --- Métriques Clés ---
st.markdown('<div class="sub-header">📊 Métriques Clés (Globales)</div>', unsafe_allow_html=True)
col1, col2, col3, col4 = st.columns(4)
with col1:
st.metric("Nb. Échantillons Total", len(all_data))
with col2:
st.metric("Nb. Produits Uniques", all_data["Produit"].nunique())
with col3:
# Compte les sessions uniques basées sur l'ID de rapport si disponible, sinon Date_Heure
unique_sessions = all_data["ID_Rapport"].nunique() if "ID_Rapport" in all_data.columns and all_data["ID_Rapport"].notna().any() else all_data["Date_Heure"].nunique()
st.metric("Nb. Rapports/Sessions", unique_sessions)
with col4:
if "Déviation_%" in all_data.columns and not all_data["Déviation_%"].isnull().all():
avg_deviation = all_data["Déviation_%"].mean()
st.metric("Déviation Moyenne (%)", f"{avg_deviation:.2f}%")
else:
st.metric("Déviation Moyenne (%)", "N/A")
# --- Filtres ---
# Correction de la syntaxe : Assurez-vous que la chaîne est correctement terminée
st.markdown('<div class="sub-header">🔍 Filtres pour l\'Analyse</div>', unsafe_allow_html=True)
col_f1, col_f2 = st.columns(2)
with col_f1:
products = sorted(all_data["Produit"].unique())
# Gère le cas où il n'y a aucun produit
if products:
selected_products = st.multiselect("Filtrer par Produit", options=products, default=products)
else:
selected_products = []
st.info("Aucun produit trouvé dans les données.")
with col_f2:
# Vérifie que la colonne Date existe et contient des dates valides
if "Date" in all_data.columns and all_data["Date"].notna().any():
valid_dates = sorted(all_data["Date"].dropna().unique())
min_date = min(valid_dates)
max_date = max(valid_dates)
# Assure que min_date n'est pas postérieur à max_date
default_start = min_date
default_end = max_date
if min_date > max_date:
default_start, default_end = max_date, min_date # Swap si nécessaire (peu probable mais sûr)
selected_date_range = st.date_input(
"Filtrer par Plage de Dates",
value=(default_start, default_end),
min_value=default_start,
max_value=default_end,
key="date_filter"
)
else:
selected_date_range = None
st.info("Filtrage par date non disponible (colonne 'Date' manquante ou invalide).")
# --- Application des Filtres ---
filtered_data = all_data.copy()
if selected_products:
filtered_data = filtered_data[filtered_data["Produit"].isin(selected_products)]
# Applique le filtre de date seulement si la colonne Date existe,
# que la plage est valide et qu'elle contient des dates non-NaT
if selected_date_range and len(selected_date_range) == 2 and "Date" in filtered_data.columns and filtered_data["Date"].notna().any():
start_date, end_date = selected_date_range
# Comparaison sécurisée avec les dates (ignore les NaT)
filtered_data = filtered_data[
(filtered_data["Date"].notna()) &
(filtered_data["Date"] >= start_date) &
(filtered_data["Date"] <= end_date)
]
# --- Affichage si données filtrées ---
if not filtered_data.empty:
st.markdown(f'<p class="info-text">Affichage de {len(filtered_data)} échantillons après filtrage.</p>', unsafe_allow_html=True)
# --- Visualisations Avancées ---
st.markdown('<div class="sub-header">📊 Visualisations Avancées</div>', unsafe_allow_html=True)
tab1, tab2, tab3 = st.tabs(["📈 Distribution & Histogrammes", "⏱️ Analyse Temporelle", "⚖️ Comparaison Produits"])
with tab1:
st.markdown("#### Distribution des Poids par Produit (Boîtes à Moustaches)")
if not filtered_data.empty:
fig_box = px.box(
filtered_data, x="Produit", y="Poids (g)", color="Produit", points=False, # 'all' peut être lent
title="Distribution des Poids par Produit Filtré", template="plotly_white"
)
fig_box.update_layout(xaxis_title="Produit", yaxis_title="Poids (g)", showlegend=False)
st.plotly_chart(fig_box, use_container_width=True)
else:
st.info("Aucune donnée à afficher pour la distribution.")
st.markdown("#### Histogramme Détaillé par Produit")
unique_filtered_products = sorted(filtered_data["Produit"].unique())
if unique_filtered_products:
histo_product = st.selectbox(
"Choisir un produit pour l'histogramme détaillé",
options=unique_filtered_products,
key="histo_select"
)
if histo_product:
product_data = filtered_data[filtered_data["Produit"] == histo_product]
if not product_data.empty:
fig_hist_detail = px.histogram(
product_data, x="Poids (g)", marginal="rug", nbins=30,
title=f"Histogramme des Poids - {histo_product}", template="plotly_white"
)
# Ajout ligne nominale si disponible et valide
if "Poids_Nominal" in product_data.columns and not product_data["Poids_Nominal"].isnull().all():
nominal_val = product_data["Poids_Nominal"].dropna().iloc[0] # Prend la première valeur non nulle
if pd.notna(nominal_val):
fig_hist_detail.add_vline(
x=nominal_val, line_dash="dash", line_color="red",
annotation_text=f"Nominal: {nominal_val}g", annotation_position="top right"
)
st.plotly_chart(fig_hist_detail, use_container_width=True)
else:
st.info(f"Aucune donnée pour le produit sélectionné : {histo_product}")
else:
st.info("Aucun produit disponible pour l'histogramme après filtrage.")
with tab2:
st.markdown("#### Évolution Temporelle des Poids Moyens")
# Vérifie la présence de dates valides dans les données filtrées
if "Date" in filtered_data.columns and filtered_data["Date"].notna().any():
# Agréger par date et produit
try:
time_data = filtered_data.dropna(subset=["Date", "Produit", "Poids (g)"]).groupby(["Date", "Produit"], observed=True).agg( # observed=True pour éviter les combinaisons vides
Poids_Moyen=("Poids (g)", "mean"),
Ecart_Type=("Poids (g)", "std"),
Nb_Echantillons=("Poids (g)", "count")
).reset_index()
if not time_data.empty:
fig_line = px.line(
time_data, x="Date", y="Poids_Moyen", color="Produit",
markers=True, error_y="Ecart_Type",
title="Évolution des Poids Moyens (± Écart Type) dans le Temps",
template="plotly_white", labels={"Poids_Moyen": "Poids Moyen (g)"}
)
st.plotly_chart(fig_line, use_container_width=True)
else:
st.info("Pas assez de données valides (Date, Produit, Poids) pour l'analyse temporelle.")
except Exception as e_agg:
st.error(f"Erreur lors de l'agrégation des données temporelles: {e_agg}")
logging.error(f"Erreur agrégation temporelle: {e_agg}", exc_info=True)
time_data = pd.DataFrame() # Assurer que time_data existe mais est vide
st.markdown("#### Heatmap des Poids Moyens par Produit et Date")
if not time_data.empty and time_data['Date'].nunique() > 1 and time_data['Produit'].nunique() > 0: # Besoin d'au moins 2 dates pour une heatmap utile
try:
# Créer une matrice pivot pour la heatmap
heatmap_pivot = time_data.pivot_table(index="Produit", columns="Date", values="Poids_Moyen")
if not heatmap_pivot.empty:
fig_heatmap = px.imshow(
heatmap_pivot, aspect="auto", # Ajuste l'aspect
color_continuous_scale="RdBu_r", # Rouge = bas, Bleu = haut
title="Heatmap des Poids Moyens",
template="plotly_white"
)
# Amélioration de la taille de la heatmap
fig_heatmap.update_layout(height=max(400, len(heatmap_pivot.index) * 50 + 100)) # Hauteur dynamique + marge
fig_heatmap.update_layout(
xaxis_title="Date", yaxis_title="Produit",
coloraxis_colorbar_title="Poids Moyen (g)"
)
st.plotly_chart(fig_heatmap, use_container_width=True)
else:
st.info("Pas assez de données variées (produits/dates) pour générer une heatmap.")
except Exception as e_heatmap:
st.warning(f"Impossible de générer la heatmap: {e_heatmap}")
logging.warning(f"Erreur heatmap: {e_heatmap}", exc_info=True)
else:
st.info("Pas assez de données (dates ou produits distincts) pour générer une heatmap pertinente.")
else:
st.info("Analyse temporelle non disponible (colonne 'Date' invalide ou manquante dans les données filtrées).")
with tab3:
st.markdown("#### Comparaison des Performances Produits")
# Vérifie la présence de déviation valide
if "Déviation_%" in filtered_data.columns and filtered_data["Déviation_%"].notna().any():
st.markdown("##### Déviation Moyenne (%) par Produit")
try:
dev_stats = filtered_data.dropna(subset=["Produit", "Déviation_%"]).groupby("Produit", observed=True).agg(
Déviation_Moyenne=("Déviation_%", "mean"),
Écart_Type_Déviation=("Déviation_%", "std")
).reset_index()
if not dev_stats.empty:
fig_bar_dev = px.bar(
dev_stats, x="Produit", y="Déviation_Moyenne", error_y="Écart_Type_Déviation",
color="Produit", title="Déviation Moyenne / Nominal (%) par Produit",
template="plotly_white", labels={"Déviation_Moyenne": "Déviation Moyenne (%)"}
)
fig_bar_dev.update_layout(showlegend=False)
st.plotly_chart(fig_bar_dev, use_container_width=True)
else:
st.info("Aucune donnée de déviation valide pour le graphique à barres.")
except Exception as e_dev_bar:
st.error(f"Erreur lors du calcul des statistiques de déviation: {e_dev_bar}")
logging.error(f"Erreur barre déviation: {e_dev_bar}", exc_info=True)
dev_stats = pd.DataFrame()
# --- Radar Chart Amélioré ---
st.markdown("##### Radar Chart Comparatif")
num_unique_products = filtered_data["Produit"].nunique()
if num_unique_products > 1:
try:
# Calculer les métriques agrégées par produit, en ignorant les NaN pour chaque métrique
radar_metrics = filtered_data.groupby("Produit", observed=True).agg(
Poids_Moyen=("Poids (g)", "mean"),
Poids_Std=("Poids (g)", "std"),
Abs_Dev_Moyenne=("Déviation_%", lambda x: x.abs().mean()), # Moyenne de la valeur absolue
Dev_Std=("Déviation_%", "std")
).reset_index()
# Définir les catégories pour le radar
categories_map = {
"Poids_Moyen": "Précision Poids",
"Poids_Std": "Stabilité Poids",
"Abs_Dev_Moyenne": "Conformité Nominale",
"Dev_Std": "Consistance Conformité"
}
categories = list(categories_map.values())
# Préparer les données pour la normalisation, gérant les NaN et inversant si nécessaire
df_to_normalize = pd.DataFrame(index=radar_metrics.index)
df_to_normalize["Précision Poids"] = radar_metrics["Poids_Moyen"]
df_to_normalize["Stabilité Poids"] = -radar_metrics["Poids_Std"] # Négatif car moins = mieux
df_to_normalize["Conformité Nominale"] = -radar_metrics["Abs_Dev_Moyenne"] # Négatif car moins = mieux
df_to_normalize["Consistance Conformité"] = -radar_metrics["Dev_Std"] # Négatif car moins = mieux
# Remplacer les NaN restants *après* l'inversion (ex: std d'un seul point est NaN)
# On remplace par une valeur qui sera "mauvaise" après normalisation (la minimale)
for col in df_to_normalize.columns:
if df_to_normalize[col].isnull().any():
min_val = df_to_normalize[col].min() # Le min après inversion est la "pire" performance
df_to_normalize[col].fillna(min_val, inplace=True)
# Normalisation Min-Max (0 à 1, où 1 est le "meilleur")
normalized_df = df_to_normalize.apply(
lambda x: (x - x.min()) / (x.max() - x.min()) if (x.max() - x.min()) > 1e-9 else 0.5, # Gère division par zéro ou quasi-zéro
axis=0
)
normalized_df["Produit"] = radar_metrics["Produit"] # Rajouter le produit
# Créer la figure Radar
fig_radar = go.Figure()
theta_cats = categories + categories[:1] # Pour fermer le polygone
for i, row in normalized_df.iterrows():
values = row[categories].tolist()
values += values[:1] # Fermer le polygone
fig_radar.add_trace(go.Scatterpolar(
r=values,
theta=theta_cats,
fill='toself',
name=row["Produit"]
))
fig_radar.update_layout(
polar=dict(radialaxis=dict(visible=True, range=[0, 1])),
title="Comparaison Radar des Produits (Normalisé 0-1, plus proche de 1 = mieux)",
template="plotly_white",
legend_title="Produit"
)
st.plotly_chart(fig_radar, use_container_width=True)
except Exception as e_radar:
st.error(f"Erreur lors de la génération du radar chart: {e_radar}")
logging.error(f"Erreur radar chart: {e_radar}", exc_info=True)
else:
st.info("Au moins deux produits sont nécessaires dans les données filtrées pour générer le radar chart comparatif.")
else:
st.warning("Comparaison basée sur la déviation non disponible (colonne 'Déviation_%' manquante ou vide dans les données filtrées).")
# --- Affichage Table Données Filtrées ---
st.markdown('<div class="sub-header">📋 Données Filtrées Détaillées</div>', unsafe_allow_html=True)
# Affiche le DataFrame sans la colonne 'Date' ajoutée techniquement
st.dataframe(filtered_data.drop(columns=["Date"], errors='ignore'), use_container_width=True)
# --- Téléchargement Données Filtrées ---
st.markdown('<div class="sub-header">💾 Téléchargement des Données Filtrées</div>', unsafe_allow_html=True)
col_dl_hist1, col_dl_hist2 = st.columns(2)
# Exclut également la colonne Date du téléchargement
df_to_download = filtered_data.drop(columns=["Date"], errors='ignore')
with col_dl_hist1:
csv_link_hist = get_download_link(df_to_download, "donnees_filtrees.csv", "📥 Télécharger en CSV")
st.markdown(csv_link_hist, unsafe_allow_html=True)
with col_dl_hist2:
excel_link_hist = get_download_link(df_to_download, "donnees_filtrees.xlsx", "📥 Télécharger en Excel")
st.markdown(excel_link_hist, unsafe_allow_html=True)
elif not st.session_state.app_data["all_data"].empty: # Si on a des données globales mais rien après filtrage
st.warning("Aucune donnée ne correspond aux filtres sélectionnés.")
# Le cas où all_data est vide est géré au début de la page
# --- Pied de Page ---
st.markdown("---")
st.markdown("""
<div style="text-align: center; margin-top: 20px; color: #888; font-size: 0.9em;">
<p>⚖️ Extracteur & Analyseur de Rapports de Pesées | Version 1.2</p>
<p>Développé avec Streamlit & Plotly</p>
</div>
""", unsafe_allow_html=True)