# IFSUSPENSIONS / src/streamlit_app.py
# Last update: commit b7295de (verified) — "Update src/streamlit_app.py" by MMOON
import streamlit as st
import pandas as pd
import numpy as np
import matplotlib
matplotlib.use('Agg') # Mode non-interactif pour matplotlib
import matplotlib.pyplot as plt
import seaborn as sns
from collections import Counter
import re
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from matplotlib.backends.backend_pdf import PdfPages
from datetime import datetime
import io
import requests
# from wordcloud import WordCloud # Si utilisé
# from sklearn.feature_extraction.text import TfidfVectorizer # Si utilisé
# from sklearn.cluster import KMeans # Si utilisé
# --- Streamlit page configuration ---
st.set_page_config(
    page_title="Analyseur Sécurité Alimentaire IFS",
    page_icon="🛡️",
    layout="wide",  # use the full page width
    initial_sidebar_state="expanded"  # sidebar open by default
)
# --- Custom CSS (optional, for a more modern look) ---
# CSS can be injected here to further customize the appearance, e.g.:
# st.markdown("<style>...</style>", unsafe_allow_html=True)
# --- IFSAnalyzer class ---
# The methods are adapted for Streamlit:
# - they return data (DataFrames, dicts, Plotly/Matplotlib figures);
# - PDF generation writes the file to disk and returns its path.
class IFSAnalyzer:
    """Analyzer for IFS Food certificate-suspension data.

    Loads a suspensions CSV (and optionally the IFS Food V8 checklist CSV),
    filters rows to the 'IFS Food' standard, and exposes thematic, geographic,
    product-scope and requirement-level analyses plus Plotly/Matplotlib chart
    helpers and a multi-page PDF report for the Streamlit UI.
    """

    def __init__(self, locked_file_io, checklist_file_io=None):
        """Load data and pre-compute the cleaned lock-reason column.

        locked_file_io: file-like CSV of suspensions (expects columns such as
            'Lock reason', 'Supplier', 'Country/Region', 'Product scopes').
        checklist_file_io: optional file-like CSV with 'Requirement Number'
            and 'Requirement text (English)' columns.
        """
        self.locked_df = None      # suspensions DataFrame (None until loaded)
        self.checklist_df = None   # checklist DataFrame (None if absent/invalid)
        # Keyword families per non-conformity theme. A theme matches when ANY
        # keyword appears as a substring of the cleaned lock reason; numeric
        # entries are IFS requirement numbers frequently cited for that theme.
        self.themes_keywords_definition = {
            'HYGIENE_PERSONNEL': ['hygien', 'personnel', 'clothing', 'hand', 'wash', 'uniform', 'gmp', 'locker', 'changing room', 'work clothing', 'protective clothes'],
            'HACCP_CCP_OPRP': ['haccp', 'ccp', 'oprp', 'critical', 'control', 'point', 'monitoring', 'hazard analysis', 'validation haccp', '2.3.9.1'],
            'TRACEABILITY': ['traceability', 'trace', 'record', 'batch', 'lot', 'identification', 'tracking', '4.18.1'],
            'ALLERGEN_MANAGEMENT': ['allergen', 'allergy', 'cross-contamination', 'gluten', 'lactose', 'celery', 'mustard', 'wheat', 'egg', '4.19.2'],
            'PEST_CONTROL': ['pest', 'rodent', 'insect', 'trap', 'bait', 'infestation', 'fly', 'mouse', 'rat', 'moth', 'weevil', 'spider', 'cobweb', '4.13.1', '4.13.2'],
            'CLEANING_SANITATION': ['clean', 'sanitation', 'disinfect', 'chemical', 'cleaning plan', 'dirt', 'residue', '4.10.1', '4.10.2'],
            'TEMPERATURE_CONTROL': ['temperature', 'cold', 'heat', 'refrigerat', 'freez', 'thaw', 'cooling'],
            'MAINTENANCE_EQUIPMENT': ['maintenance', 'equipment', 'calibrat', 'repair', 'infrastructure', 'facility', 'structure', 'conveyor', '4.9.1.1', '4.16.5', '4.17.2', '4.17.4'],
            'DOCUMENTATION_RECORDS': ['document', 'procedure', 'record', 'manual', 'specification', 'not documented', '5.1.1', '5.1.2', '5.3.2'],
            'FOREIGN_BODY_CONTAMINATION': ['foreign body', 'foreign material', 'glass', 'metal', 'detect', 'x-ray', 'contaminat', 'wood', 'plastic', '4.12.1', '4.12.2', '4.12.3'],
            'STORAGE_WAREHOUSING': ['storage', 'warehouse', 'stock', 'segregat', 'pallet', 'raw material storage', '4.14.3', '4.14.5'],
            'SUPPLIER_RAW_MATERIAL_CONTROL': ['supplier', 'vendor', 'purchase', 'raw material', 'ingredient', 'packaging material', 'declaration of conformity', 'doc', '4.5.1', '4.5.2'],
            'LABELLING': ['label', 'labelling', 'declaration', 'ingredient list', 'mrl', 'allergen labelling', 'nutritional information', '4.3.1', '4.3.2'],
            'QUANTITY_CONTROL_WEIGHT': ['quantity control', 'weight', 'fill', 'scale', 'metrological', 'underfilling', '5.5.1', '5.5.2', '5.4.1', '5.4.2'],
            'MANAGEMENT_RESPONSIBILITY_CULTURE': ['management', 'responsibilit', 'food safety culture', 'internal audit', 'corrective action', 'training', '1.1.1', '1.1.2', '1.2.1', '5.11.1', '5.11.2', '5.11.3', '5.11.4'],
            'NON_PAYMENT_ADMINISTRATIVE': ['payment', 'invoice', 'pay', 'closure', 'discontinued', 'bankrupt', 'denies access', 'ceased operation', 'fire', 'merged'],
            'INTEGRITY_PROGRAM_ISSUES': ['integrity', 'on-site check', 'ioc', 'unannounced audit', 'integrity on-site check', 'integrity on site check']
        }
        self.load_data(locked_file_io, checklist_file_io)
        if self.locked_df is not None:
            self.clean_lock_reasons()  # derive the 'lock_reason_clean' column
    def load_data(self, locked_file_io, checklist_file_io=None):
        """Read the suspensions CSV and the optional checklist CSV.

        Side effects: sets self.locked_df (filtered to 'IFS Food' rows when a
        'Standard' column exists) and self.checklist_df. Failures are reported
        through Streamlit (st.error / st.warning) instead of raising, and the
        corresponding attribute is reset to None.
        """
        try:
            self.locked_df = pd.read_csv(locked_file_io, encoding='utf-8')
            if 'Standard' in self.locked_df.columns:
                # Keep only IFS Food certificates (case-insensitive substring match).
                self.locked_df = self.locked_df[self.locked_df['Standard'].str.contains('IFS Food', na=False, case=False)]
            if checklist_file_io:
                try:
                    self.checklist_df = pd.read_csv(checklist_file_io, encoding='utf-8')
                    # Both columns are required for requirement-text lookups;
                    # without them the checklist is unusable and is discarded.
                    if 'Requirement Number' not in self.checklist_df.columns or \
                       'Requirement text (English)' not in self.checklist_df.columns:
                        st.warning("Colonnes 'Requirement Number' ou 'Requirement text (English)' manquantes dans la checklist. L'analyse des exigences sera limitée.")
                        self.checklist_df = None
                except Exception as e_checklist:
                    st.error(f"Erreur lors du chargement du fichier checklist : {e_checklist}")
                    self.checklist_df = None
            # When no checklist is provided, requirement analysis falls back to
            # chapter numbers only (see generate_ifs_recommendations_analysis).
        except Exception as e:
            st.error(f"❌ Erreur lors du chargement du fichier des suspensions : {e}")
            self.locked_df = None
def clean_lock_reasons(self):
if self.locked_df is None or 'Lock reason' not in self.locked_df.columns: return
self.locked_df['lock_reason_clean'] = self.locked_df['Lock reason'].astype(str).fillna('') \
.str.lower() \
.str.replace(r'[\n\r\t]', ' ', regex=True) \
.str.replace(r'[^\w\s\.\-\/\%]', ' ', regex=True) \
.str.replace(r'\s+', ' ', regex=True).str.strip()
    def extract_ifs_chapters(self, text):
        """Extract IFS requirement numbers (e.g. '4.12.1') from a lock-reason text.

        Returns a sorted list of unique chapter strings of the form X.Y or
        X.Y.Z whose top-level chapter is between 1 and 6; empty list for
        missing, non-string or blank input.
        """
        if pd.isna(text) or not isinstance(text, str) or text.strip() == '':
            return []
        # Each pattern captures one requirement number in a different textual context.
        patterns = [
            r'(?:ko|major|cl\.|req\.|requirement(?: item)?|chapter|section|point|§|cl\s+|clause)?\s*(\d\.\d{1,2}(?:\.\d{1,2})?)',  # X.Y or X.Y.Z with optional prefix
            r'(\d\.\d{1,2}(?:\.\d{1,2})?)\s*(?:ko|major|:|-|\(ko\)|\(major\))',  # X.Y or X.Y.Z followed by KO/Major
            r'(\d{1,2})\s*-\s*ko',  # bare number before "- KO" (see NOTE below)
            r'requirement\s+(\d\.\d\.\d)',
            r'cl\s+(\d\.\d+(?:\.\d+)?)',  # e.g. "cl 4.12.1"
            r'§\s*(\d\.\d+(?:\.\d+)?)'  # e.g. "§ 4.13.1"
        ]
        chapters_found = []
        normalized_text = text.lower().replace('\n', ' ').replace('\r', ' ')
        for pattern in patterns:
            matches = re.findall(pattern, normalized_text)
            for match in matches:
                # findall yields plain strings here (each pattern has a single
                # capture group); the tuple branch is defensive handling for
                # multi-group patterns.
                chapter_num = match if isinstance(match, str) else (match[-1] if isinstance(match, tuple) and match[-1] else match[0] if isinstance(match, tuple) and match[0] else None)
                if chapter_num:
                    chapter_num = chapter_num.strip().rstrip('.').strip()
                    # Keep only well-formed X.Y / X.Y.Z numbers.
                    # NOTE(review): this validation discards the bare digits
                    # captured by the r'(\d{1,2})\s*-\s*ko' pattern above, so
                    # that pattern never contributes — confirm whether it is
                    # still wanted.
                    if re.fullmatch(r'\d(\.\d+){1,2}', chapter_num) or re.fullmatch(r'\d\.\d+', chapter_num):
                        main_chapter_part = chapter_num.split('.')[0]
                        # IFS Food V8 chapters run from 1 to 6.
                        if main_chapter_part.isdigit() and 1 <= int(main_chapter_part) <= 6:
                            chapters_found.append(chapter_num)
        return sorted(list(set(chapters_found)))
def analyze_themes(self):
if self.locked_df is None or 'lock_reason_clean' not in self.locked_df.columns: return {}, {}
theme_counts = {theme: 0 for theme in self.themes_keywords_definition}
theme_details = {theme: [] for theme in self.themes_keywords_definition}
for index, row in self.locked_df.iterrows():
reason_text = row['lock_reason_clean']
original_reason = row['Lock reason']
supplier = row['Supplier']
country = row.get('Country/Region', 'N/A')
for theme, keywords in self.themes_keywords_definition.items():
if any(keyword in reason_text for keyword in keywords):
theme_counts[theme] += 1
theme_details[theme].append({
"reason": original_reason,
"supplier": supplier,
"country": country
})
return theme_counts, theme_details
def geographic_analysis(self):
if self.locked_df is None or 'Country/Region' not in self.locked_df.columns: return None
return self.locked_df.groupby('Country/Region').size().sort_values(ascending=False).reset_index(name='total_suspensions')
def clean_product_scopes(self, scope_text):
if pd.isna(scope_text): return []
scope_text = str(scope_text)
raw_scopes = re.split(r'[,;\s"\'’`]|et|\/|&|\.\s', scope_text) # Ajout de guillemets et point suivi d'espace
cleaned_scopes = []
for scope in raw_scopes:
scope = scope.strip().replace('"', '').replace("'", "")
if not scope or not scope.isdigit(): continue # Ignorer si vide ou non numérique après nettoyage
num = int(scope)
if 1 <= num <= 11:
cleaned_scopes.append(str(num))
elif num > 1000: # ex: 2005, 2007, 2009, 2010
potential_scope_2 = str(num % 100) # Pour 10, 11
potential_scope_1 = str(num % 10) # Pour 1-9
if potential_scope_2 in ['10', '11']:
cleaned_scopes.append(potential_scope_2)
elif potential_scope_1 in [str(i) for i in range(1,10)]:
cleaned_scopes.append(potential_scope_1)
return list(set(cleaned_scopes))
def product_scope_analysis(self):
if self.locked_df is None or 'Product scopes' not in self.locked_df.columns: return None
all_scopes = []
for scopes_text in self.locked_df['Product scopes'].dropna():
cleaned = self.clean_product_scopes(scopes_text)
all_scopes.extend(cleaned)
return Counter(all_scopes)
def chapter_frequency_analysis(self):
if self.locked_df is None or 'Lock reason' not in self.locked_df.columns: return Counter()
all_chapters = []
for reason in self.locked_df['Lock reason'].dropna():
all_chapters.extend(self.extract_ifs_chapters(reason))
return Counter(all_chapters)
    def analyze_audit_types(self):
        """Classify suspensions by the audit context that triggered them.

        Searches the concatenation of 'Lock reason' and 'Lock history'
        (lower-cased) for keyword families and returns
        (counts_per_type, examples_per_type), where each example entry holds
        up to 5 sample rows plus the top-5 affected countries for that type.
        Returns ({}, {}) when no data is loaded.
        """
        if self.locked_df is None:
            return {}, {}
        # Keyword families (French + English variants) per audit context.
        audit_keywords = {
            'INTEGRITY_PROGRAM_IP': ['integrity program', 'integrity', 'programme intégrité', 'programme integrity','onsite check', 'on site check', 'on-site check', 'on-site integrity check', 'ioc', 'i.o.c', 'ip audit', 'integrity audit', 'spot check', 'unannounced audit', 'audit inopiné', 'control inopiné', 'ifs integrity'],
            'SURVEILLANCE_FOLLOW_UP': ['surveillance', 'surveillance audit', 'follow up audit', 'follow-up', 'suivi', 'corrective action'],
            'COMPLAINT_WITHDRAWAL': ['complaint', 'réclamation', 'plainte', 'customer complaint', 'withdrawal', 'retrait', 'recall'],
            'RECERTIFICATION_RENEWAL': ['recertification', 'renewal', 'renouvellement', 're-certification', 'renewal audit']
        }
        audit_analysis = {audit_type: 0 for audit_type in audit_keywords}
        audit_examples = {audit_type: {'examples': [], 'countries': Counter()} for audit_type in audit_keywords}
        for index, row in self.locked_df.iterrows():
            # Search both the reason and the history fields, case-insensitively.
            text_to_search = (str(row.get('Lock reason', '')) + " " + str(row.get('Lock history', ''))).lower()
            for audit_type, keywords in audit_keywords.items():
                if any(keyword in text_to_search for keyword in keywords):
                    audit_analysis[audit_type] += 1
                    if len(audit_examples[audit_type]['examples']) < 5:  # cap the sample size per type
                        audit_examples[audit_type]['examples'].append({
                            'Supplier': row.get('Supplier', 'N/A'),
                            'Country/Region': row.get('Country/Region', 'N/A'),
                            'Lock reason': row.get('Lock reason', 'N/A')
                        })
                    audit_examples[audit_type]['countries'][row.get('Country/Region', 'N/A')] += 1
        # Reduce the country tallies to the five most frequent per audit type.
        for audit_type in audit_examples:
            audit_examples[audit_type]['countries'] = dict(audit_examples[audit_type]['countries'].most_common(5))
        return audit_analysis, audit_examples
def generate_ifs_recommendations_analysis(self):
if self.locked_df is None or self.checklist_df is None: return None
chapter_counts = self.chapter_frequency_analysis()
if not chapter_counts: return None
recommendations = []
for chapter, count in chapter_counts.most_common():
norm_chapter = chapter.replace("KO ", "").strip()
req_text_series = self.checklist_df[self.checklist_df['Requirement Number'].astype(str).str.strip() == norm_chapter]['Requirement text (English)']
req_text = req_text_series.iloc[0] if not req_text_series.empty else "Texte de l'exigence non trouvé."
# if len(req_text) > 250: req_text = req_text[:247] + "..." # Tronquer pour affichage résumé
recommendations.append({'chapter': chapter, 'count': count, 'requirement_text': req_text})
return recommendations
    def cross_analysis_scope_themes(self):
        """Cross-tabulate product scopes against a subset of NC themes.

        Returns a pivot DataFrame (rows: 'Scope N' labels, columns: theme
        shortcodes, values: co-occurrence counts), or None when inputs are
        missing or nothing matched. Note: a theme match is evaluated on the
        suspension text alone, so it is counted once for EACH scope listed on
        that suspension.
        """
        if self.locked_df is None or 'Product scopes' not in self.locked_df.columns or 'lock_reason_clean' not in self.locked_df.columns:
            return None
        # Shortcodes keep the heatmap column labels readable.
        themes_for_cross = {
            'HYGIENE': self.themes_keywords_definition['HYGIENE_PERSONNEL'], 'HACCP': self.themes_keywords_definition['HACCP_CCP_OPRP'],
            'TRACE': self.themes_keywords_definition['TRACEABILITY'], 'ALLERGEN': self.themes_keywords_definition['ALLERGEN_MANAGEMENT'],
            'CLEAN': self.themes_keywords_definition['CLEANING_SANITATION'], 'MAINT': self.themes_keywords_definition['MAINTENANCE_EQUIPMENT'],
            'LABEL': self.themes_keywords_definition['LABELLING'], 'PEST': self.themes_keywords_definition['PEST_CONTROL']
        }
        scope_theme_data = []
        for idx, row in self.locked_df.iterrows():
            scopes_text, reason_text = row['Product scopes'], row['lock_reason_clean']
            if pd.notna(scopes_text) and pd.notna(reason_text) and reason_text:
                for scope in self.clean_product_scopes(scopes_text):
                    for theme, keywords in themes_for_cross.items():
                        if any(kw in reason_text for kw in keywords):
                            scope_theme_data.append({'scope': f"Scope {scope}", 'theme': theme})
        if not scope_theme_data:
            return None
        # aggfunc='size' counts occurrences of each (scope, theme) pair.
        return pd.DataFrame(scope_theme_data).pivot_table(index='scope', columns='theme', aggfunc='size', fill_value=0)
    # --- Plotly chart helpers (interactive dashboard) ---
    def _create_plotly_bar_chart(self, data_dict, title, orientation='v', xaxis_title="", yaxis_title="", color='royalblue', height=400):
        """Return a Plotly bar chart for a {label: count} dict.

        orientation: 'v' (labels on x) or 'h' (labels on y, largest on top).
        Returns an empty Figure for empty/None input.
        """
        if not data_dict:
            return go.Figure()
        if orientation == 'h':
            y_data = list(data_dict.keys())
            x_data = list(data_dict.values())
        else:
            x_data = list(data_dict.keys())
            y_data = list(data_dict.values())
        # Annotate each bar with its numeric value, whichever axis carries it.
        fig = go.Figure(go.Bar(x=x_data, y=y_data, orientation=orientation, marker_color=color, text=x_data if orientation=='h' else y_data, textposition='auto'))
        fig.update_layout(
            title={'text': title, 'x':0.5, 'font': {'size': 16}},
            xaxis_title=xaxis_title,
            yaxis_title=yaxis_title,
            height=height,
            margin=dict(l=10, r=10, t=50, b=10),
            # Reversed y-axis keeps the biggest bar at the top in horizontal mode.
            yaxis=dict(autorange="reversed") if orientation == 'h' else {},
            # Transparent backgrounds blend with the Streamlit theme.
            plot_bgcolor='rgba(0,0,0,0)', paper_bgcolor='rgba(0,0,0,0)'
        )
        return fig
    def _create_plotly_choropleth_map(self, geo_data_df, title, height=500):
        """Return a Plotly choropleth of suspensions per country.

        geo_data_df: output of geographic_analysis() with 'Country/Region'
        and 'total_suspensions' columns. Country names must be recognizable
        by Plotly's 'country names' location mode. Returns an empty Figure
        for missing/empty input.
        """
        if geo_data_df is None or geo_data_df.empty:
            return go.Figure()
        fig = px.choropleth(geo_data_df, locations="Country/Region",
                            locationmode='country names',  # match on country-name strings
                            color="total_suspensions",
                            hover_name="Country/Region",
                            color_continuous_scale=px.colors.sequential.Plasma,
                            title=title,
                            height=height)
        fig.update_layout(
            title={'x':0.5, 'font': {'size': 16}},
            geo=dict(showframe=False, showcoastlines=False, projection_type='equirectangular'),
            margin=dict(l=10, r=10, t=50, b=10),
            plot_bgcolor='rgba(0,0,0,0)', paper_bgcolor='rgba(0,0,0,0)'
        )
        return fig
    def _create_plotly_heatmap(self, pivot_matrix, title, height=500):
        """Return a Plotly heatmap for a pivot table (e.g. scope x theme).

        Cell values are shown in-cell (text_auto). Returns an empty Figure
        for missing/empty input.
        """
        if pivot_matrix is None or pivot_matrix.empty:
            return go.Figure()
        fig = px.imshow(pivot_matrix, text_auto=True, aspect="auto", color_continuous_scale='YlGnBu', title=title, height=height)
        fig.update_layout(
            title={'x':0.5, 'font': {'size': 16}},
            margin=dict(l=10, r=10, t=50, b=10),
            xaxis=dict(tickangle=45),  # slanted labels avoid overlap
            plot_bgcolor='rgba(0,0,0,0)', paper_bgcolor='rgba(0,0,0,0)'
        )
        return fig
    # --- PDF report helpers (Matplotlib, non-interactive 'Agg' backend) ---
    def _add_text_to_pdf_page(self, fig, text_lines, start_y=0.95, line_height=0.035, font_size=9, title="", title_font_size=14, max_chars_per_line=100):
        """Render wrapped text lines onto a Matplotlib figure used as a PDF page.

        Lines starting with a section emoji are drawn bold and slightly
        larger; blank lines add extra vertical spacing. Returns False when
        the page runs out of vertical room, True otherwise.
        """
        ax = fig.gca()
        ax.clear()
        ax.axis('off')
        if title:
            ax.text(0.5, start_y, title, ha='center', va='top', fontsize=title_font_size, fontweight='bold')
            start_y -= (line_height * 2.5)
        current_y = start_y
        for line in text_lines:
            # Hard-wrap long lines at a fixed character count.
            wrapped_lines = [line[i:i+max_chars_per_line] for i in range(0, len(line), max_chars_per_line)]
            for wrapped_line in wrapped_lines:
                if current_y < 0.05: return False  # page full
                # Emoji-prefixed section headers get bold emphasis.
                fw = 'bold' if line.startswith(tuple(["🎯","📊","🌍","🏭","📋","🔍"])) else 'normal'
                fs = font_size + 1 if fw == 'bold' else font_size
                ax.text(0.03, current_y, wrapped_line, ha='left', va='top', fontsize=fs, fontweight=fw)
                current_y -= line_height
            if not line.strip(): current_y -= (line_height * 0.5)
        return True
def _create_matplotlib_figure_for_pdf(self, data_dict_or_df, title, x_label="", y_label="", chart_type='barh', top_n=10, color='skyblue', xtick_rotation=0, ytick_fontsize=8):
if not data_dict_or_df and not isinstance(data_dict_or_df, pd.DataFrame) : return None
fig, ax = plt.subplots(figsize=(10, 6.5))
items, values = [], []
if isinstance(data_dict_or_df, (Counter, dict)):
sorted_data = dict(sorted(data_dict_or_df.items(), key=lambda item: item[1], reverse=True)[:top_n])
items = [str(k).replace('_',' ').replace('MANAGEMENT','MGMT').replace('RESPONSIBILITY','RESP.')[:30] for k in sorted_data.keys()] # Tronquer labels longs
values = list(sorted_data.values())
elif isinstance(data_dict_or_df, pd.DataFrame):
df_top = data_dict_or_df.head(top_n)
if 'Country/Region' in df_top.columns and 'total_suspensions' in df_top.columns:
items = df_top['Country/Region'].tolist()
values = df_top['total_suspensions'].tolist()
chart_type = 'bar'
elif 'chapter' in df_top.columns and 'count' in df_top.columns and 'requirement_text' in df_top.columns:
items = [f"{row['chapter']}\n({row['requirement_text'][:35]}...)" if row['requirement_text'] != "Texte de l'exigence non trouvé." else row['chapter'] for index, row in df_top.iterrows()]
values = df_top['count'].tolist()
chart_type = 'bar'
else: # Cas générique
if not df_top.empty:
items = df_top.index.astype(str).tolist() if len(df_top.columns) == 1 else df_top.iloc[:,0].astype(str).tolist()
values = df_top.iloc[:,0].tolist() if len(df_top.columns) == 1 else df_top.iloc[:,1].tolist()
if not items or not values or all(v == 0 for v in values): return None
if chart_type == 'barh':
ax.barh(items, values, color=color, edgecolor='grey')
ax.set_yticklabels(items, fontsize=ytick_fontsize)
ax.invert_yaxis()
ax.set_xlabel(x_label if x_label else 'Nombre de cas', fontsize=10)
for i, v_ in enumerate(values): ax.text(v_ + (max(values)*0.01), i, str(v_), va='center', fontsize=8)
ax.set_xlim(0, max(values) * 1.12 if values else 1)
elif chart_type == 'bar':
bars = ax.bar(items, values, color=color, edgecolor='grey')
ax.set_xticklabels(items, rotation=xtick_rotation, ha='right' if xtick_rotation > 0 else 'center', fontsize=ytick_fontsize)
ax.set_ylabel(y_label if y_label else 'Nombre de cas', fontsize=10)
for bar in bars:
yval = bar.get_height()
ax.text(bar.get_x() + bar.get_width()/2.0, yval + (max(values)*0.01), int(yval), ha='center', va='bottom', fontsize=8)
ax.set_ylim(0, max(values) * 1.12 if values else 1)
ax.set_title(title, fontsize=13, fontweight='bold', pad=15)
ax.grid(axis='x' if chart_type == 'barh' else 'y', linestyle='--', alpha=0.7)
sns.despine(left=True, bottom=True)
plt.tight_layout(pad=1.5)
return fig
    def export_report_to_pdf(self, filename='IFS_Analysis_Report.pdf'):
        """Write a multi-page PDF report to `filename` and return its path.

        Pages: cover with headline metrics, bar charts (themes, countries,
        scopes, chapters/requirements), an optional theme-vs-scope heatmap,
        detailed text pages, and a requirement-detail page when the checklist
        is available. Returns None (after st.error) on failure.
        """
        if self.locked_df is None:
            return None
        try:
            with PdfPages(filename) as pdf:
                total_suspensions = len(self.locked_df)
                if total_suspensions == 0:
                    # Degenerate case: a single page stating there is no data.
                    fig = plt.figure(figsize=(8.5, 11)); self._add_text_to_pdf_page(fig, ["Aucune donnée."], title="Rapport"); pdf.savefig(fig); plt.close(fig); return filename
                # Page 1: cover listing the source files and headline figures.
                fig = plt.figure(figsize=(8.5, 11))
                title_page_text = [ f"Date: {datetime.now().strftime('%d/%m/%Y %H:%M')}", "",
                    f"Source Suspensions: {st.session_state.get('locked_file_name_original', 'N/A')}",
                    f"Source Checklist: {st.session_state.get('checklist_file_name_original', 'Non fournie')}", "",
                    "📊 VUE D'ENSEMBLE",
                    f" • Total suspensions IFS Food: {total_suspensions}",
                    f" • Avec motifs: {self.locked_df['Lock reason'].notna().sum()} ({self.locked_df['Lock reason'].notna().sum()/total_suspensions*100:.1f}%)" ]
                audit_s, _ = self.analyze_audit_types(); total_as = sum(audit_s.values())
                title_page_text.append(f" • Liées à audits spécifiques: {total_as} ({total_as/total_suspensions*100:.1f}%)")
                self._add_text_to_pdf_page(fig, title_page_text, title="Rapport d'Analyse IFS Food Safety"); pdf.savefig(fig, bbox_inches='tight'); plt.close(fig)
                # Bar-chart pages (each helper returns None when nothing to plot).
                tc, _ = self.analyze_themes(); fig_t = self._create_matplotlib_figure_for_pdf(tc, 'Top 10 Thèmes NC', color='indianred', ytick_fontsize=7);
                if fig_t: pdf.savefig(fig_t, bbox_inches='tight'); plt.close(fig_t)
                gs = self.geographic_analysis(); fig_g = self._create_matplotlib_figure_for_pdf(gs, 'Top 10 Pays', chart_type='bar', color='lightseagreen', xtick_rotation=30, ytick_fontsize=7);
                if fig_g: pdf.savefig(fig_g, bbox_inches='tight'); plt.close(fig_g)
                sc = self.product_scope_analysis(); sc_plot = {f"Sc {k}": v for k,v in sc.items()}; fig_s = self._create_matplotlib_figure_for_pdf(sc_plot, 'Top 10 Product Scopes', color='cornflowerblue', ytick_fontsize=7);
                if fig_s: pdf.savefig(fig_s, bbox_inches='tight'); plt.close(fig_s)
                # Requirements chart: prefer checklist-enriched data, fall back
                # to bare chapter numbers when no checklist was loaded.
                reco = self.generate_ifs_recommendations_analysis()
                if reco:
                    df_reco = pd.DataFrame(reco)
                    fig_c = self._create_matplotlib_figure_for_pdf(df_reco, 'Top 10 Exigences IFS', chart_type='bar', color='gold', xtick_rotation=30, ytick_fontsize=6);
                else:
                    cc_direct = self.chapter_frequency_analysis()
                    fig_c = self._create_matplotlib_figure_for_pdf(cc_direct, 'Top 10 Chapitres IFS (Numéros)', chart_type='bar', color='gold', xtick_rotation=30, ytick_fontsize=7);
                if fig_c: pdf.savefig(fig_c, bbox_inches='tight'); plt.close(fig_c)
                # Heatmap page: themes vs the most-impacted product scopes.
                cpm = self.cross_analysis_scope_themes()
                if cpm is not None and not cpm.empty:
                    top_n = min(8, len(cpm.index)); scope_tots = cpm.sum(axis=1).sort_values(ascending=False)
                    cpm_f = cpm.loc[scope_tots.head(top_n).index] if len(cpm.index) > top_n else cpm
                    if not cpm_f.empty:
                        fig_h, ax_h = plt.subplots(figsize=(10, max(5, len(cpm_f.index)*0.6)))
                        sns.heatmap(cpm_f, annot=True, cmap="YlGnBu", fmt='d', ax=ax_h, annot_kws={"size":7});
                        ax_h.set_title('Corrélations: Thèmes vs Scopes (Top)', fontsize=13, pad=15)
                        # NOTE(review): Matplotlib's tick_params() does not document
                        # an 'ha' keyword — confirm this call does not raise on the
                        # Matplotlib version in use.
                        ax_h.tick_params(axis='x', labelsize=8, rotation=30, ha='right'); ax_h.tick_params(axis='y', labelsize=8)
                        plt.tight_layout(pad=1.5); pdf.savefig(fig_h, bbox_inches='tight'); plt.close(fig_h)
                # Detailed text pages rendered from stream-writing generators.
                # NOTE(review): generate_detailed_theme_analysis and
                # generate_audit_analysis_report are not defined in this part of
                # the file — confirm they exist elsewhere in the class.
                for gen_func, title_str, lh, fs, mcpl in [
                    (self.generate_detailed_theme_analysis, "Analyse Thématique Détaillée", 0.03, 8, 110),
                    (self.generate_audit_analysis_report, "Analyse des Types d'Audits", 0.03, 8, 110)
                ]:
                    fig = plt.figure(figsize=(8.5, 11)); s_io = io.StringIO(); gen_func(stream=s_io)
                    self._add_text_to_pdf_page(fig, s_io.getvalue().splitlines(), title=title_str, line_height=lh, font_size=fs, max_chars_per_line=mcpl)
                    pdf.savefig(fig, bbox_inches='tight'); plt.close(fig)
                if reco:
                    # Final page: the 20 most-cited requirements with full text.
                    fig = plt.figure(figsize=(8.5, 11))
                    req_tl = ["Note: Texte de l'exigence de la checklist IFS Food v8 (si fournie).\n"]
                    for r_ in sorted(reco, key=lambda x: x['count'], reverse=True)[:20]:
                        req_tl.extend([f"📋 Chap {r_['chapter']} ({r_['count']} mentions)", f" Txt: {r_['requirement_text']}", ""])
                    self._add_text_to_pdf_page(fig, req_tl, title="Détail Exigences IFS", line_height=0.028, font_size=7, max_chars_per_line=120)
                    pdf.savefig(fig, bbox_inches='tight'); plt.close(fig)
            return filename
        except Exception as e:
            st.error(f"❌ Erreur majeure PDF: {e}")
            # No plain-text fallback in Streamlit; the error is surfaced to the user.
            return None
# --- Streamlit utility functions ---
@st.cache_data  # cache parsed CSVs so reruns do not re-read the upload
def load_csv_data(uploaded_file):
    """Parse an uploaded CSV into a DataFrame; return None on failure or no file.

    NOTE(review): not referenced by main(), which feeds uploads to the
    analyzer via BytesIO directly — confirm whether this helper is still needed.
    """
    if uploaded_file is not None:
        try:
            return pd.read_csv(uploaded_file)
        except Exception as e:
            st.error(f"Erreur lors de la lecture du fichier CSV : {e}")
            return None
    return None
@st.cache_resource  # cache the analyzer object across Streamlit reruns
def get_analyzer(locked_data_io, checklist_data_io):
    """Build (or reuse a cached) IFSAnalyzer for the given file-like inputs.

    NOTE(review): st.cache_resource must hash its arguments; confirm the
    BytesIO/StringIO objects passed here are hashable by Streamlit, otherwise
    pass the raw bytes/str and wrap them inside the analyzer instead.
    """
    return IFSAnalyzer(locked_data_io, checklist_data_io)
def download_checklist_from_github_st(url="https://raw.githubusercontent.com/M00N69/Action-plan/main/Guide%20Checklist_IFS%20Food%20V%208%20-%20CHECKLIST.csv"):
    """Fetch the IFS Food V8 checklist CSV from GitHub.

    Returns a StringIO with the CSV text, or None (after st.warning) when the
    download fails — the app then degrades to chapter-number-only analysis.
    """
    try:
        response = requests.get(url, timeout=10)  # bounded wait so the UI stays responsive
        response.raise_for_status()
        return io.StringIO(response.text)
    except requests.exceptions.RequestException as e:
        st.warning(f"Impossible de télécharger la checklist depuis GitHub ({e}). L'analyse des exigences sera limitée.")
        return None
# --- Streamlit user interface ---
def main():
    """Entry point: sidebar inputs, analysis dashboard, and PDF export button."""
    st.title("🛡️ Analyseur de Sécurité Alimentaire IFS")
    st.markdown("Téléversez votre fichier de suspensions IFS pour générer une analyse détaillée et des visualisations.")
    # --- Sidebar: input file and checklist source selection ---
    with st.sidebar:
        st.header("⚙️ Options d'Analyse")
        locked_file_uploaded = st.file_uploader("1. Fichier des suspensions IFS (.csv)", type="csv", key="locked_uploader")
        st.markdown("---")
        checklist_source = st.radio(
            "2. Source de la Checklist IFS Food V8:",
            ("Utiliser celle de GitHub (Recommandé)", "Téléverser ma checklist", "Ne pas utiliser de checklist"),
            key="checklist_source_radio"
        )
        checklist_file_uploaded = None
        if checklist_source == "Téléverser ma checklist":
            checklist_file_uploaded = st.file_uploader("Téléversez votre fichier checklist (.csv)", type="csv", key="checklist_uploader")
    # --- Main flow: run the analysis once a suspensions file is provided ---
    if locked_file_uploaded is not None:
        # Remember original file names so the PDF cover page can cite them.
        st.session_state.locked_file_name_original = locked_file_uploaded.name
        if checklist_file_uploaded:
            st.session_state.checklist_file_name_original = checklist_file_uploaded.name
        elif checklist_source == "Utiliser celle de GitHub (Recommandé)":
            st.session_state.checklist_file_name_original = "Checklist IFS Food V8 (GitHub)"
        else:
            st.session_state.checklist_file_name_original = "Non fournie"
        locked_data_io = io.BytesIO(locked_file_uploaded.getvalue())  # file-like copy for the analyzer
        checklist_data_io = None
        if checklist_source == "Téléverser ma checklist" and checklist_file_uploaded is not None:
            checklist_data_io = io.BytesIO(checklist_file_uploaded.getvalue())
        elif checklist_source == "Utiliser celle de GitHub (Recommandé)":
            # StringIO of the downloaded text, or None if the download failed.
            checklist_data_io = download_checklist_from_github_st()
        analyzer = get_analyzer(locked_data_io, checklist_data_io)
        if analyzer.locked_df is not None and not analyzer.locked_df.empty:
            st.success(f"Fichier '{locked_file_uploaded.name}' chargé et analysé. {len(analyzer.locked_df)} suspensions IFS Food trouvées.")
            display_dashboard_tabs(analyzer)  # render the result tabs
            # On-demand PDF report generation from the sidebar.
            st.sidebar.markdown("---")
            if st.sidebar.button("📄 Générer le Rapport PDF Complet", key="pdf_button"):
                with st.spinner("Génération du rapport PDF en cours... Veuillez patienter."):
                    pdf_path = analyzer.export_report_to_pdf()  # writes the file, returns its path
                if pdf_path:
                    with open(pdf_path, "rb") as pdf_file:
                        st.sidebar.download_button(
                            label="📥 Télécharger le Rapport PDF",
                            data=pdf_file,
                            file_name="Analyse_IFS_Suspensions_Report.pdf",
                            mime="application/pdf"
                        )
                    st.sidebar.success("Rapport PDF généré !")
                else:
                    st.sidebar.error("Erreur lors de la création du rapport PDF.")
        else:
            st.error("Aucune donnée IFS Food n'a été trouvée dans le fichier ou après filtrage. Veuillez vérifier votre fichier.")
    else:
        st.info("Veuillez téléverser un fichier CSV des suspensions IFS pour commencer l'analyse.")
    st.sidebar.markdown("---")
    st.sidebar.markdown("Développé avec ❤️ par IA")
def display_dashboard_tabs(analyzer):
    """Render the six analysis tabs of the dashboard for an IFSAnalyzer.

    Args:
        analyzer: An IFSAnalyzer instance whose ``locked_df`` has already been
            loaded and is non-empty (the caller checks this before calling).

    Side effects:
        Writes Streamlit widgets (tabs, metrics, Plotly charts, dataframes)
        to the page. Returns nothing.
    """
    tab_titles = [
        "📊 Vue d'Ensemble", "🌍 Géographie", "🏷️ Thèmes Détaillés",
        "📋 Exigences IFS", "🕵️ Types d'Audits", "🔗 Analyse Croisée"
    ]
    tab1, tab2, tab3, tab4, tab5, tab6 = st.tabs(tab_titles)

    with tab1:  # Overview
        st.header("📊 Vue d'Ensemble des Suspensions")
        col1, col2 = st.columns(2)
        total_suspensions = len(analyzer.locked_df)
        with_reasons_count = analyzer.locked_df['Lock reason'].notna().sum()
        audit_analysis_summary, _ = analyzer.analyze_audit_types()
        total_audit_special = sum(audit_analysis_summary.values())
        with col1:
            st.metric("Total Suspensions IFS Food", total_suspensions)
            # Ternary guards against division by zero on an empty dataframe.
            st.metric("Avec Motifs Documentés", f"{with_reasons_count} ({with_reasons_count/total_suspensions*100:.1f}%)" if total_suspensions > 0 else "0")
        with col2:
            st.metric("Liées à Audits Spécifiques (IP, etc.)", f"{total_audit_special} ({total_audit_special/total_suspensions*100:.1f}%)" if total_suspensions > 0 else "0")
            # Additional metrics can be added here.
        st.markdown("---")
        st.subheader("Visualisations Clés")
        # Top non-conformity themes (labels shortened for chart readability).
        theme_counts, _ = analyzer.analyze_themes()
        if theme_counts:
            top_themes = dict(sorted(theme_counts.items(), key=lambda x: x[1], reverse=True)[:10])
            top_themes_clean = {k.replace('_', ' ').replace('MANAGEMENT', 'MGMT').replace('RESPONSIBILITY', 'RESP.'): v for k, v in top_themes.items() if v > 0}
            if top_themes_clean:
                st.plotly_chart(analyzer._create_plotly_bar_chart(top_themes_clean, "Top 10 Thèmes de Non-Conformités", orientation='h', color='indianred', height=450), use_container_width=True)
        # Most impacted product scopes.
        scope_counts = analyzer.product_scope_analysis()
        if scope_counts:
            top_scopes = dict(scope_counts.most_common(10))
            top_scopes_clean = {f"Scope {k}": v for k, v in top_scopes.items() if v > 0}
            if top_scopes_clean:
                st.plotly_chart(analyzer._create_plotly_bar_chart(top_scopes_clean, "Top 10 Product Scopes Impactés", orientation='h', color='cornflowerblue', height=450), use_container_width=True)

    with tab2:  # Geography
        st.header("🌍 Analyse Géographique")
        geo_stats_df = analyzer.geographic_analysis()
        if geo_stats_df is not None and not geo_stats_df.empty:
            st.plotly_chart(analyzer._create_plotly_choropleth_map(geo_stats_df, "Suspensions par Pays"), use_container_width=True)
            st.markdown("---")
            st.subheader("Tableau des Suspensions par Pays (Top 15)")
            st.dataframe(geo_stats_df.head(15), use_container_width=True)
        else:
            st.info("Données géographiques non disponibles.")

    with tab3:  # Detailed themes
        st.header("🏷️ Analyse Thématique Détaillée")
        theme_counts, theme_details = analyzer.analyze_themes()
        for theme, count in sorted(theme_counts.items(), key=lambda x: x[1], reverse=True):
            if count > 0:
                with st.expander(f"{theme.replace('_', ' ')} ({count} cas)", expanded=False):
                    st.markdown(f"**Exemples de motifs (jusqu'à 3) pour le thème : {theme.replace('_', ' ')}**")
                    for i, detail in enumerate(theme_details[theme][:3]):
                        st.markdown(f"**Cas {i+1} (Fournisseur: {detail['supplier']}, Pays: {detail['country']})**")
                        # str() guards the [:500] slice against a non-string reason.
                        st.caption(f"{str(detail['reason'])[:500]}...")  # show only a snippet of the reason
                        st.markdown("---")
                    # Countries most affected by this theme.
                    theme_keywords_current_theme = analyzer.themes_keywords_definition.get(theme, [])
                    if theme_keywords_current_theme:
                        # FIX: escape each keyword before joining into an alternation.
                        # Entries such as '2.3.9.1' contain '.' regex metacharacters
                        # that would otherwise match ANY character (e.g. '2x3y9z1'),
                        # inflating the per-theme country counts.
                        theme_pattern = '|'.join(re.escape(kw) for kw in theme_keywords_current_theme)
                        theme_mask = analyzer.locked_df['lock_reason_clean'].str.contains(theme_pattern, case=False, na=False, regex=True)
                        if theme_mask.sum() > 0:
                            theme_countries_df = analyzer.locked_df[theme_mask]['Country/Region'].value_counts().reset_index()
                            theme_countries_df.columns = ['Pays', 'Nombre de cas']
                            if not theme_countries_df.empty:
                                st.markdown("**Pays les plus affectés par ce thème :**")
                                st.dataframe(theme_countries_df.head(5), use_container_width=True)

    with tab4:  # IFS requirements
        st.header("📋 Analyse des Exigences IFS")
        recommendations = analyzer.generate_ifs_recommendations_analysis()
        if recommendations and analyzer.checklist_df is not None:
            st.info("Les textes des exigences proviennent de la checklist IFS Food V8. Les numéros sont extraits des motifs de suspension.")
            df_reco = pd.DataFrame(recommendations)
            df_reco_sorted = df_reco.sort_values(by='count', ascending=False)
            # Chart of the most cited requirements; labels truncated to 25 chars
            # so the axis stays readable.
            top_reco_chart = df_reco_sorted.head(10).copy()
            top_reco_chart['display_label'] = top_reco_chart.apply(lambda row: f"{row['chapter']} ({row['requirement_text'][:25]}...)", axis=1)
            reco_chart_data = pd.Series(top_reco_chart['count'].values, index=top_reco_chart['display_label']).to_dict()
            st.plotly_chart(analyzer._create_plotly_bar_chart(reco_chart_data, "Top 10 Exigences IFS Citées", orientation='v', color='gold', height=500), use_container_width=True)
            st.markdown("---")
            st.subheader("Détail des Exigences Citées")
            for index, row in df_reco_sorted.iterrows():
                with st.expander(f"Exigence {row['chapter']} ({row['count']} mentions)", expanded=False):
                    st.markdown(f"**Texte de l'exigence :**")
                    st.markdown(f"> {row['requirement_text']}")
        elif recommendations:  # Chapter numbers extracted but no checklist loaded
            st.warning("Checklist non chargée. Affichage des numéros de chapitres uniquement.")
            df_reco = pd.DataFrame(recommendations)
            df_reco_sorted = df_reco.sort_values(by='count', ascending=False)
            chapter_counts_dict = pd.Series(df_reco_sorted['count'].values, index=df_reco_sorted['chapter']).to_dict()
            st.plotly_chart(analyzer._create_plotly_bar_chart(chapter_counts_dict, "Top Chapitres IFS Cités (Numéros)", orientation='v', color='gold', height=500), use_container_width=True)
            st.dataframe(df_reco_sorted, use_container_width=True)
        else:
            st.info("Aucune exigence IFS spécifique n'a pu être extraite des motifs ou la checklist n'est pas disponible.")

    with tab5:  # Audit types
        st.header("🕵️ Analyse par Types d'Audits")
        audit_analysis, audit_examples = analyzer.analyze_audit_types()
        if audit_analysis:
            audit_analysis_clean = {k.replace('_', ' '): v for k, v in audit_analysis.items() if v > 0}
            if audit_analysis_clean:
                st.plotly_chart(analyzer._create_plotly_bar_chart(audit_analysis_clean, "Répartition par Type d'Audit Spécifique", color='darkorange', height=400), use_container_width=True)
            st.markdown("---")
            st.subheader("Détails et Exemples par Type d'Audit")
            for audit_type, count in sorted(audit_analysis.items(), key=lambda x: x[1], reverse=True):
                if count > 0:
                    with st.expander(f"{audit_type.replace('_', ' ')} ({count} cas)", expanded=False):
                        st.markdown(f"**Exemples (jusqu'à 3) pour : {audit_type.replace('_', ' ')}**")
                        for i, ex_data in enumerate(audit_examples[audit_type]['examples'][:3]):
                            st.markdown(f"**Cas {i+1} (Fournisseur: {ex_data.get('Supplier', 'N/A')}, Pays: {ex_data.get('Country/Region', 'N/A')})**")
                            # FIX: str() guards the slice — 'Lock reason' can be NaN
                            # (a float), which would make [:500] raise TypeError.
                            st.caption(f"{str(ex_data.get('Lock reason', 'N/A'))[:500]}...")
                            st.markdown("---")
                        countries_data = audit_examples[audit_type]['countries']
                        if countries_data:
                            # NOTE(review): the label says "Top 5" but ALL entries are
                            # joined here; presumably analyze_audit_types() already
                            # limits 'countries' to 5 — confirm against that method.
                            st.markdown(f"**Répartition géographique (Top 5 pays) pour ce type d'audit :** {', '.join([f'{c} ({n})' for c, n in countries_data.items()])}")
        else:
            st.info("Aucune donnée sur les types d'audits disponible.")

    with tab6:  # Cross analysis: themes vs product scopes
        st.header("🔗 Analyse Croisée : Thèmes vs Product Scopes")
        cross_pivot_matrix = analyzer.cross_analysis_scope_themes()
        if cross_pivot_matrix is not None and not cross_pivot_matrix.empty:
            # Limit the heatmap to the 15 busiest scopes for readability;
            # the full matrix is still shown as a table below.
            top_n_scopes_for_heatmap = min(15, len(cross_pivot_matrix.index))
            if len(cross_pivot_matrix.index) > top_n_scopes_for_heatmap:
                scope_totals = cross_pivot_matrix.sum(axis=1).sort_values(ascending=False)
                top_scopes_names = scope_totals.head(top_n_scopes_for_heatmap).index
                cross_pivot_matrix_filtered = cross_pivot_matrix.loc[top_scopes_names]
            else:
                cross_pivot_matrix_filtered = cross_pivot_matrix
            if not cross_pivot_matrix_filtered.empty:
                st.plotly_chart(analyzer._create_plotly_heatmap(cross_pivot_matrix_filtered, "Fréquence des Thèmes de NC par Product Scope (Top Scopes)", height=max(500, len(cross_pivot_matrix_filtered.index) * 40)), use_container_width=True)
                st.markdown("---")
                st.subheader("Tableau de Corrélation Complet (Scopes vs Thèmes)")
                # axis=None applies the gradient over the whole table, not per column.
                st.dataframe(cross_pivot_matrix.style.background_gradient(cmap='YlGnBu', axis=None), use_container_width=True)
            else:
                st.info("Pas assez de données pour afficher la heatmap après filtrage.")
        else:
            st.info("Données insuffisantes pour l'analyse croisée Thèmes vs Product Scopes.")
# --- Application entry point ---
# Run the dashboard only when this file is executed directly
# (e.g. `streamlit run src/streamlit_app.py`), not when imported.
if __name__ == "__main__":
    main()