import streamlit as st import pandas as pd import numpy as np import matplotlib matplotlib.use('Agg') # Mode non-interactif pour matplotlib import matplotlib.pyplot as plt import seaborn as sns from collections import Counter import re import plotly.express as px import plotly.graph_objects as go from plotly.subplots import make_subplots from matplotlib.backends.backend_pdf import PdfPages from datetime import datetime import io import requests # from wordcloud import WordCloud # Si utilisé # from sklearn.feature_extraction.text import TfidfVectorizer # Si utilisé # from sklearn.cluster import KMeans # Si utilisé # --- Configuration de la Page Streamlit --- st.set_page_config( page_title="Analyseur Sécurité Alimentaire IFS", page_icon="🛡️", layout="wide", # Utilise toute la largeur de la page initial_sidebar_state="expanded" # Sidebar ouverte par défaut ) # --- Styles CSS Personnalisés (Optionnel, pour un look plus moderne) --- # Vous pouvez ajouter du CSS ici pour personnaliser davantage l'apparence. # Exemple : st.markdown("", unsafe_allow_html=True) # Pour l'instant, nous allons nous concentrer sur la structure. # --- Classe IFSAnalyzer (copiez-collez votre classe ici) --- # Assurez-vous que les méthodes de la classe sont adaptées pour Streamlit : # - Elles devraient retourner des données (DataFrames, dicts, figures Plotly/Matplotlib) # - La génération PDF devrait enregistrer le fichier et retourner son chemin. class IFSAnalyzer: def __init__(self, locked_file_io, checklist_file_io=None): self.locked_df = None self.checklist_df = None self.themes_keywords_definition = { 'HYGIENE_PERSONNEL': ['hygien', 'personnel', 'clothing', 'hand', 'wash', 'uniform', 'gmp', 'locker', 'changing room', 'work clothing', 'protective clothes'], 'HACCP_CCP_OPRP': ['haccp', 'ccp', 'oprp', 'critical', 'control', 'point', 'monitoring', 'hazard analysis', 'validation haccp', '2.3.9.1'], 'TRACEABILITY': ['traceability', 'trace', 'record', 'batch', 'lot', 'identification', 'tracking', '4.18.1'], 'ALLERGEN_MANAGEMENT': ['allergen', 'allergy', 'cross-contamination', 'gluten', 'lactose', 'celery', 'mustard', 'wheat', 'egg', '4.19.2'], 'PEST_CONTROL': ['pest', 'rodent', 'insect', 'trap', 'bait', 'infestation', 'fly', 'mouse', 'rat', 'moth', 'weevil', 'spider', 'cobweb', '4.13.1', '4.13.2'], 'CLEANING_SANITATION': ['clean', 'sanitation', 'disinfect', 'chemical', 'cleaning plan', 'dirt', 'residue', '4.10.1', '4.10.2'], 'TEMPERATURE_CONTROL': ['temperature', 'cold', 'heat', 'refrigerat', 'freez', 'thaw', 'cooling'], 'MAINTENANCE_EQUIPMENT': ['maintenance', 'equipment', 'calibrat', 'repair', 'infrastructure', 'facility', 'structure', 'conveyor', '4.9.1.1', '4.16.5', '4.17.2', '4.17.4'], 'DOCUMENTATION_RECORDS': ['document', 'procedure', 'record', 'manual', 'specification', 'not documented', '5.1.1', '5.1.2', '5.3.2'], 'FOREIGN_BODY_CONTAMINATION': ['foreign body', 'foreign material', 'glass', 'metal', 'detect', 'x-ray', 'contaminat', 'wood', 'plastic', '4.12.1', '4.12.2', '4.12.3'], 'STORAGE_WAREHOUSING': ['storage', 'warehouse', 'stock', 'segregat', 'pallet', 'raw material storage', '4.14.3', '4.14.5'], 'SUPPLIER_RAW_MATERIAL_CONTROL': ['supplier', 'vendor', 'purchase', 'raw material', 'ingredient', 'packaging material', 'declaration of conformity', 'doc', '4.5.1', '4.5.2'], 'LABELLING': ['label', 'labelling', 'declaration', 'ingredient list', 'mrl', 'allergen labelling', 'nutritional information', '4.3.1', '4.3.2'], 'QUANTITY_CONTROL_WEIGHT': ['quantity control', 'weight', 'fill', 'scale', 'metrological', 'underfilling', '5.5.1', '5.5.2', '5.4.1', '5.4.2'], 'MANAGEMENT_RESPONSIBILITY_CULTURE': ['management', 'responsibilit', 'food safety culture', 'internal audit', 'corrective action', 'training', '1.1.1', '1.1.2', '1.2.1', '5.11.1', '5.11.2', '5.11.3', '5.11.4'], 'NON_PAYMENT_ADMINISTRATIVE': ['payment', 'invoice', 'pay', 'closure', 'discontinued', 'bankrupt', 'denies access', 'ceased operation', 'fire', 'merged'], 'INTEGRITY_PROGRAM_ISSUES': ['integrity', 'on-site check', 'ioc', 'unannounced audit', 'integrity on-site check', 'integrity on site check'] } self.load_data(locked_file_io, checklist_file_io) if self.locked_df is not None: self.clean_lock_reasons() def load_data(self, locked_file_io, checklist_file_io=None): try: self.locked_df = pd.read_csv(locked_file_io, encoding='utf-8') # print(f"✅ Données de suspension chargées: {len(self.locked_df)} lignes initiales.") if 'Standard' in self.locked_df.columns: self.locked_df = self.locked_df[self.locked_df['Standard'].str.contains('IFS Food', na=False, case=False)] # print(f"📋 Après filtrage IFS Food: {len(self.locked_df)} lignes.") if checklist_file_io: try: self.checklist_df = pd.read_csv(checklist_file_io, encoding='utf-8') # print(f"✅ Checklist IFS chargée: {len(self.checklist_df)} exigences.") if 'Requirement Number' not in self.checklist_df.columns or \ 'Requirement text (English)' not in self.checklist_df.columns: st.warning("Colonnes 'Requirement Number' ou 'Requirement text (English)' manquantes dans la checklist. L'analyse des exigences sera limitée.") self.checklist_df = None except Exception as e_checklist: st.error(f"Erreur lors du chargement du fichier checklist : {e_checklist}") self.checklist_df = None # else: # print("ℹ️ Fichier checklist non fourni. L'analyse des exigences sera basée sur les numéros de chapitre uniquement.") except Exception as e: st.error(f"❌ Erreur lors du chargement du fichier des suspensions : {e}") self.locked_df = None def clean_lock_reasons(self): if self.locked_df is None or 'Lock reason' not in self.locked_df.columns: return self.locked_df['lock_reason_clean'] = self.locked_df['Lock reason'].astype(str).fillna('') \ .str.lower() \ .str.replace(r'[\n\r\t]', ' ', regex=True) \ .str.replace(r'[^\w\s\.\-\/\%]', ' ', regex=True) \ .str.replace(r'\s+', ' ', regex=True).str.strip() def extract_ifs_chapters(self, text): if pd.isna(text) or not isinstance(text, str) or text.strip() == '': return [] patterns = [ r'(?:ko|major|cl\.|req\.|requirement(?: item)?|chapter|section|point|§|cl\s+|clause)?\s*(\d\.\d{1,2}(?:\.\d{1,2})?)', # X.Y ou X.Y.Z avec préfixes r'(\d\.\d{1,2}(?:\.\d{1,2})?)\s*(?:ko|major|:|-|\(ko\)|\(major\))', # X.Y ou X.Y.Z suivi de KO/Major r'(\d{1,2})\s*-\s*ko', # Ex: 5.11.3 - KO (capture le numéro seul) r'requirement\s+(\d\.\d\.\d)', r'cl\s+(\d\.\d+(?:\.\d+)?)', # Ex: cl 4.12.1 r'§\s*(\d\.\d+(?:\.\d+)?)' # Ex: § 4.13.1 ] chapters_found = [] normalized_text = text.lower().replace('\n', ' ').replace('\r', ' ') for pattern in patterns: matches = re.findall(pattern, normalized_text) for match in matches: chapter_num = match if isinstance(match, str) else (match[-1] if isinstance(match, tuple) and match[-1] else match[0] if isinstance(match, tuple) and match[0] else None) if chapter_num: chapter_num = chapter_num.strip().rstrip('.').strip() if re.fullmatch(r'\d(\.\d+){1,2}', chapter_num) or re.fullmatch(r'\d\.\d+', chapter_num) : main_chapter_part = chapter_num.split('.')[0] if main_chapter_part.isdigit() and 1 <= int(main_chapter_part) <= 6: chapters_found.append(chapter_num) return sorted(list(set(chapters_found))) def analyze_themes(self): if self.locked_df is None or 'lock_reason_clean' not in self.locked_df.columns: return {}, {} theme_counts = {theme: 0 for theme in self.themes_keywords_definition} theme_details = {theme: [] for theme in self.themes_keywords_definition} for index, row in self.locked_df.iterrows(): reason_text = row['lock_reason_clean'] original_reason = row['Lock reason'] supplier = row['Supplier'] country = row.get('Country/Region', 'N/A') for theme, keywords in self.themes_keywords_definition.items(): if any(keyword in reason_text for keyword in keywords): theme_counts[theme] += 1 theme_details[theme].append({ "reason": original_reason, "supplier": supplier, "country": country }) return theme_counts, theme_details def geographic_analysis(self): if self.locked_df is None or 'Country/Region' not in self.locked_df.columns: return None return self.locked_df.groupby('Country/Region').size().sort_values(ascending=False).reset_index(name='total_suspensions') def clean_product_scopes(self, scope_text): if pd.isna(scope_text): return [] scope_text = str(scope_text) raw_scopes = re.split(r'[,;\s"\'’`]|et|\/|&|\.\s', scope_text) # Ajout de guillemets et point suivi d'espace cleaned_scopes = [] for scope in raw_scopes: scope = scope.strip().replace('"', '').replace("'", "") if not scope or not scope.isdigit(): continue # Ignorer si vide ou non numérique après nettoyage num = int(scope) if 1 <= num <= 11: cleaned_scopes.append(str(num)) elif num > 1000: # ex: 2005, 2007, 2009, 2010 potential_scope_2 = str(num % 100) # Pour 10, 11 potential_scope_1 = str(num % 10) # Pour 1-9 if potential_scope_2 in ['10', '11']: cleaned_scopes.append(potential_scope_2) elif potential_scope_1 in [str(i) for i in range(1,10)]: cleaned_scopes.append(potential_scope_1) return list(set(cleaned_scopes)) def product_scope_analysis(self): if self.locked_df is None or 'Product scopes' not in self.locked_df.columns: return None all_scopes = [] for scopes_text in self.locked_df['Product scopes'].dropna(): cleaned = self.clean_product_scopes(scopes_text) all_scopes.extend(cleaned) return Counter(all_scopes) def chapter_frequency_analysis(self): if self.locked_df is None or 'Lock reason' not in self.locked_df.columns: return Counter() all_chapters = [] for reason in self.locked_df['Lock reason'].dropna(): all_chapters.extend(self.extract_ifs_chapters(reason)) return Counter(all_chapters) def analyze_audit_types(self): if self.locked_df is None: return {}, {} audit_keywords = { 'INTEGRITY_PROGRAM_IP': ['integrity program', 'integrity', 'programme intégrité', 'programme integrity','onsite check', 'on site check', 'on-site check', 'on-site integrity check', 'ioc', 'i.o.c', 'ip audit', 'integrity audit', 'spot check', 'unannounced audit', 'audit inopiné', 'control inopiné', 'ifs integrity'], 'SURVEILLANCE_FOLLOW_UP': ['surveillance', 'surveillance audit', 'follow up audit', 'follow-up', 'suivi', 'corrective action'], 'COMPLAINT_WITHDRAWAL': ['complaint', 'réclamation', 'plainte', 'customer complaint', 'withdrawal', 'retrait', 'recall'], 'RECERTIFICATION_RENEWAL': ['recertification', 'renewal', 'renouvellement', 're-certification', 'renewal audit'] } audit_analysis = {audit_type: 0 for audit_type in audit_keywords} audit_examples = {audit_type: {'examples': [], 'countries': Counter()} for audit_type in audit_keywords} for index, row in self.locked_df.iterrows(): text_to_search = (str(row.get('Lock reason', '')) + " " + str(row.get('Lock history', ''))).lower() for audit_type, keywords in audit_keywords.items(): if any(keyword in text_to_search for keyword in keywords): audit_analysis[audit_type] += 1 if len(audit_examples[audit_type]['examples']) < 5: # Plus d'exemples audit_examples[audit_type]['examples'].append({ 'Supplier': row.get('Supplier', 'N/A'), 'Country/Region': row.get('Country/Region', 'N/A'), 'Lock reason': row.get('Lock reason', 'N/A') }) audit_examples[audit_type]['countries'][row.get('Country/Region', 'N/A')] += 1 for audit_type in audit_examples: audit_examples[audit_type]['countries'] = dict(audit_examples[audit_type]['countries'].most_common(5)) return audit_analysis, audit_examples def generate_ifs_recommendations_analysis(self): if self.locked_df is None or self.checklist_df is None: return None chapter_counts = self.chapter_frequency_analysis() if not chapter_counts: return None recommendations = [] for chapter, count in chapter_counts.most_common(): norm_chapter = chapter.replace("KO ", "").strip() req_text_series = self.checklist_df[self.checklist_df['Requirement Number'].astype(str).str.strip() == norm_chapter]['Requirement text (English)'] req_text = req_text_series.iloc[0] if not req_text_series.empty else "Texte de l'exigence non trouvé." # if len(req_text) > 250: req_text = req_text[:247] + "..." # Tronquer pour affichage résumé recommendations.append({'chapter': chapter, 'count': count, 'requirement_text': req_text}) return recommendations def cross_analysis_scope_themes(self): if self.locked_df is None or 'Product scopes' not in self.locked_df.columns or 'lock_reason_clean' not in self.locked_df.columns: return None themes_for_cross = { 'HYGIENE': self.themes_keywords_definition['HYGIENE_PERSONNEL'], 'HACCP': self.themes_keywords_definition['HACCP_CCP_OPRP'], 'TRACE': self.themes_keywords_definition['TRACEABILITY'], 'ALLERGEN': self.themes_keywords_definition['ALLERGEN_MANAGEMENT'], 'CLEAN': self.themes_keywords_definition['CLEANING_SANITATION'], 'MAINT': self.themes_keywords_definition['MAINTENANCE_EQUIPMENT'], 'LABEL': self.themes_keywords_definition['LABELLING'], 'PEST': self.themes_keywords_definition['PEST_CONTROL'] } scope_theme_data = [] for idx, row in self.locked_df.iterrows(): scopes_text, reason_text = row['Product scopes'], row['lock_reason_clean'] if pd.notna(scopes_text) and pd.notna(reason_text) and reason_text: for scope in self.clean_product_scopes(scopes_text): for theme, keywords in themes_for_cross.items(): if any(kw in reason_text for kw in keywords): scope_theme_data.append({'scope': f"Scope {scope}", 'theme': theme}) if not scope_theme_data: return None return pd.DataFrame(scope_theme_data).pivot_table(index='scope', columns='theme', aggfunc='size', fill_value=0) def _create_plotly_bar_chart(self, data_dict, title, orientation='v', xaxis_title="", yaxis_title="", color='royalblue', height=400): if not data_dict : return go.Figure() if orientation == 'h': y_data = list(data_dict.keys()) x_data = list(data_dict.values()) else: x_data = list(data_dict.keys()) y_data = list(data_dict.values()) fig = go.Figure(go.Bar(x=x_data, y=y_data, orientation=orientation, marker_color=color, text=x_data if orientation=='h' else y_data, textposition='auto')) fig.update_layout( title={'text': title, 'x':0.5, 'font': {'size': 16}}, xaxis_title=xaxis_title, yaxis_title=yaxis_title, height=height, margin=dict(l=10, r=10, t=50, b=10), yaxis=dict(autorange="reversed") if orientation == 'h' else {}, plot_bgcolor='rgba(0,0,0,0)', paper_bgcolor='rgba(0,0,0,0)' ) return fig def _create_plotly_choropleth_map(self, geo_data_df, title, height=500): if geo_data_df is None or geo_data_df.empty: return go.Figure() fig = px.choropleth(geo_data_df, locations="Country/Region", locationmode='country names', # S'assurer que les noms de pays sont compatibles color="total_suspensions", hover_name="Country/Region", color_continuous_scale=px.colors.sequential.Plasma, title=title, height=height) fig.update_layout( title={'x':0.5, 'font': {'size': 16}}, geo=dict(showframe=False, showcoastlines=False, projection_type='equirectangular'), margin=dict(l=10, r=10, t=50, b=10), plot_bgcolor='rgba(0,0,0,0)', paper_bgcolor='rgba(0,0,0,0)' ) return fig def _create_plotly_heatmap(self, pivot_matrix, title, height=500): if pivot_matrix is None or pivot_matrix.empty: return go.Figure() fig = px.imshow(pivot_matrix, text_auto=True, aspect="auto", color_continuous_scale='YlGnBu', title=title, height=height) fig.update_layout( title={'x':0.5, 'font': {'size': 16}}, margin=dict(l=10, r=10, t=50, b=10), xaxis=dict(tickangle=45), plot_bgcolor='rgba(0,0,0,0)', paper_bgcolor='rgba(0,0,0,0)' ) return fig # --- Méthodes pour le rapport PDF (utilisant Matplotlib) --- def _add_text_to_pdf_page(self, fig, text_lines, start_y=0.95, line_height=0.035, font_size=9, title="", title_font_size=14, max_chars_per_line=100): ax = fig.gca() ax.clear() ax.axis('off') if title: ax.text(0.5, start_y, title, ha='center', va='top', fontsize=title_font_size, fontweight='bold') start_y -= (line_height * 2.5) current_y = start_y for line in text_lines: wrapped_lines = [line[i:i+max_chars_per_line] for i in range(0, len(line), max_chars_per_line)] for wrapped_line in wrapped_lines: if current_y < 0.05: return False # Page pleine fw = 'bold' if line.startswith(tuple(["🎯","📊","🌍","🏭","📋","🔍"])) else 'normal' fs = font_size + 1 if fw == 'bold' else font_size ax.text(0.03, current_y, wrapped_line, ha='left', va='top', fontsize=fs, fontweight=fw) current_y -= line_height if not line.strip(): current_y -= (line_height * 0.5) return True def _create_matplotlib_figure_for_pdf(self, data_dict_or_df, title, x_label="", y_label="", chart_type='barh', top_n=10, color='skyblue', xtick_rotation=0, ytick_fontsize=8): if not data_dict_or_df and not isinstance(data_dict_or_df, pd.DataFrame) : return None fig, ax = plt.subplots(figsize=(10, 6.5)) items, values = [], [] if isinstance(data_dict_or_df, (Counter, dict)): sorted_data = dict(sorted(data_dict_or_df.items(), key=lambda item: item[1], reverse=True)[:top_n]) items = [str(k).replace('_',' ').replace('MANAGEMENT','MGMT').replace('RESPONSIBILITY','RESP.')[:30] for k in sorted_data.keys()] # Tronquer labels longs values = list(sorted_data.values()) elif isinstance(data_dict_or_df, pd.DataFrame): df_top = data_dict_or_df.head(top_n) if 'Country/Region' in df_top.columns and 'total_suspensions' in df_top.columns: items = df_top['Country/Region'].tolist() values = df_top['total_suspensions'].tolist() chart_type = 'bar' elif 'chapter' in df_top.columns and 'count' in df_top.columns and 'requirement_text' in df_top.columns: items = [f"{row['chapter']}\n({row['requirement_text'][:35]}...)" if row['requirement_text'] != "Texte de l'exigence non trouvé." else row['chapter'] for index, row in df_top.iterrows()] values = df_top['count'].tolist() chart_type = 'bar' else: # Cas générique if not df_top.empty: items = df_top.index.astype(str).tolist() if len(df_top.columns) == 1 else df_top.iloc[:,0].astype(str).tolist() values = df_top.iloc[:,0].tolist() if len(df_top.columns) == 1 else df_top.iloc[:,1].tolist() if not items or not values or all(v == 0 for v in values): return None if chart_type == 'barh': ax.barh(items, values, color=color, edgecolor='grey') ax.set_yticklabels(items, fontsize=ytick_fontsize) ax.invert_yaxis() ax.set_xlabel(x_label if x_label else 'Nombre de cas', fontsize=10) for i, v_ in enumerate(values): ax.text(v_ + (max(values)*0.01), i, str(v_), va='center', fontsize=8) ax.set_xlim(0, max(values) * 1.12 if values else 1) elif chart_type == 'bar': bars = ax.bar(items, values, color=color, edgecolor='grey') ax.set_xticklabels(items, rotation=xtick_rotation, ha='right' if xtick_rotation > 0 else 'center', fontsize=ytick_fontsize) ax.set_ylabel(y_label if y_label else 'Nombre de cas', fontsize=10) for bar in bars: yval = bar.get_height() ax.text(bar.get_x() + bar.get_width()/2.0, yval + (max(values)*0.01), int(yval), ha='center', va='bottom', fontsize=8) ax.set_ylim(0, max(values) * 1.12 if values else 1) ax.set_title(title, fontsize=13, fontweight='bold', pad=15) ax.grid(axis='x' if chart_type == 'barh' else 'y', linestyle='--', alpha=0.7) sns.despine(left=True, bottom=True) plt.tight_layout(pad=1.5) return fig def export_report_to_pdf(self, filename='IFS_Analysis_Report.pdf'): if self.locked_df is None: return None try: with PdfPages(filename) as pdf: # print(f"📄 Génération du rapport PDF: {filename}") total_suspensions = len(self.locked_df) if total_suspensions == 0: fig = plt.figure(figsize=(8.5, 11)); self._add_text_to_pdf_page(fig, ["Aucune donnée."], title="Rapport"); pdf.savefig(fig); plt.close(fig); return filename # Page 1: Couverture fig = plt.figure(figsize=(8.5, 11)) title_page_text = [ f"Date: {datetime.now().strftime('%d/%m/%Y %H:%M')}", "", f"Source Suspensions: {st.session_state.get('locked_file_name_original', 'N/A')}", f"Source Checklist: {st.session_state.get('checklist_file_name_original', 'Non fournie')}", "", "📊 VUE D'ENSEMBLE", f" • Total suspensions IFS Food: {total_suspensions}", f" • Avec motifs: {self.locked_df['Lock reason'].notna().sum()} ({self.locked_df['Lock reason'].notna().sum()/total_suspensions*100:.1f}%)" ] audit_s, _ = self.analyze_audit_types(); total_as = sum(audit_s.values()) title_page_text.append(f" • Liées à audits spécifiques: {total_as} ({total_as/total_suspensions*100:.1f}%)") self._add_text_to_pdf_page(fig, title_page_text, title="Rapport d'Analyse IFS Food Safety"); pdf.savefig(fig, bbox_inches='tight'); plt.close(fig) # Graphiques tc, _ = self.analyze_themes(); fig_t = self._create_matplotlib_figure_for_pdf(tc, 'Top 10 Thèmes NC', color='indianred', ytick_fontsize=7); if fig_t: pdf.savefig(fig_t, bbox_inches='tight'); plt.close(fig_t) gs = self.geographic_analysis(); fig_g = self._create_matplotlib_figure_for_pdf(gs, 'Top 10 Pays', chart_type='bar', color='lightseagreen', xtick_rotation=30, ytick_fontsize=7); if fig_g: pdf.savefig(fig_g, bbox_inches='tight'); plt.close(fig_g) sc = self.product_scope_analysis(); sc_plot = {f"Sc {k}": v for k,v in sc.items()}; fig_s = self._create_matplotlib_figure_for_pdf(sc_plot, 'Top 10 Product Scopes', color='cornflowerblue', ytick_fontsize=7); if fig_s: pdf.savefig(fig_s, bbox_inches='tight'); plt.close(fig_s) reco = self.generate_ifs_recommendations_analysis() if reco: df_reco = pd.DataFrame(reco) fig_c = self._create_matplotlib_figure_for_pdf(df_reco, 'Top 10 Exigences IFS', chart_type='bar', color='gold', xtick_rotation=30, ytick_fontsize=6); else: cc_direct = self.chapter_frequency_analysis() fig_c = self._create_matplotlib_figure_for_pdf(cc_direct, 'Top 10 Chapitres IFS (Numéros)', chart_type='bar', color='gold', xtick_rotation=30, ytick_fontsize=7); if fig_c: pdf.savefig(fig_c, bbox_inches='tight'); plt.close(fig_c) # Heatmap cpm = self.cross_analysis_scope_themes() if cpm is not None and not cpm.empty: top_n = min(8, len(cpm.index)); scope_tots = cpm.sum(axis=1).sort_values(ascending=False) cpm_f = cpm.loc[scope_tots.head(top_n).index] if len(cpm.index) > top_n else cpm if not cpm_f.empty: fig_h, ax_h = plt.subplots(figsize=(10, max(5, len(cpm_f.index)*0.6))) sns.heatmap(cpm_f, annot=True, cmap="YlGnBu", fmt='d', ax=ax_h, annot_kws={"size":7}); ax_h.set_title('Corrélations: Thèmes vs Scopes (Top)', fontsize=13, pad=15) ax_h.tick_params(axis='x', labelsize=8, rotation=30, ha='right'); ax_h.tick_params(axis='y', labelsize=8) plt.tight_layout(pad=1.5); pdf.savefig(fig_h, bbox_inches='tight'); plt.close(fig_h) # Pages Texte Détaillées for gen_func, title_str, lh, fs, mcpl in [ (self.generate_detailed_theme_analysis, "Analyse Thématique Détaillée", 0.03, 8, 110), (self.generate_audit_analysis_report, "Analyse des Types d'Audits", 0.03, 8, 110) ]: fig = plt.figure(figsize=(8.5, 11)); s_io = io.StringIO(); gen_func(stream=s_io) self._add_text_to_pdf_page(fig, s_io.getvalue().splitlines(), title=title_str, line_height=lh, font_size=fs, max_chars_per_line=mcpl) pdf.savefig(fig, bbox_inches='tight'); plt.close(fig) if reco: fig = plt.figure(figsize=(8.5, 11)) req_tl = ["Note: Texte de l'exigence de la checklist IFS Food v8 (si fournie).\n"] for r_ in sorted(reco, key=lambda x: x['count'], reverse=True)[:20]: req_tl.extend([f"📋 Chap {r_['chapter']} ({r_['count']} mentions)", f" Txt: {r_['requirement_text']}", ""]) self._add_text_to_pdf_page(fig, req_tl, title="Détail Exigences IFS", line_height=0.028, font_size=7, max_chars_per_line=120) pdf.savefig(fig, bbox_inches='tight'); plt.close(fig) # print(f"✅ Rapport PDF complet généré: {filename}") return filename except Exception as e: st.error(f"❌ Erreur majeure PDF: {e}") return None # Pas de fallback txt dans Streamlit, l'erreur est affichée # --- Fonctions Utilitaires pour Streamlit --- @st.cache_data # Mise en cache des données téléchargées pour éviter de recharger def load_csv_data(uploaded_file): if uploaded_file is not None: try: return pd.read_csv(uploaded_file) except Exception as e: st.error(f"Erreur lors de la lecture du fichier CSV : {e}") return None return None @st.cache_resource # Mise en cache de l'objet analyseur def get_analyzer(locked_data_io, checklist_data_io): return IFSAnalyzer(locked_data_io, checklist_data_io) def download_checklist_from_github_st(url="https://raw.githubusercontent.com/M00N69/Action-plan/main/Guide%20Checklist_IFS%20Food%20V%208%20-%20CHECKLIST.csv"): try: response = requests.get(url, timeout=10) # Ajout d'un timeout response.raise_for_status() return io.StringIO(response.text) except requests.exceptions.RequestException as e: st.warning(f"Impossible de télécharger la checklist depuis GitHub ({e}). L'analyse des exigences sera limitée.") return None # --- Interface Streamlit --- def main(): st.title("🛡️ Analyseur de Sécurité Alimentaire IFS") st.markdown("Téléversez votre fichier de suspensions IFS pour générer une analyse détaillée et des visualisations.") # --- Sidebar pour les options --- with st.sidebar: st.header("⚙️ Options d'Analyse") locked_file_uploaded = st.file_uploader("1. Fichier des suspensions IFS (.csv)", type="csv", key="locked_uploader") st.markdown("---") checklist_source = st.radio( "2. Source de la Checklist IFS Food V8:", ("Utiliser celle de GitHub (Recommandé)", "Téléverser ma checklist", "Ne pas utiliser de checklist"), key="checklist_source_radio" ) checklist_file_uploaded = None if checklist_source == "Téléverser ma checklist": checklist_file_uploaded = st.file_uploader("Téléversez votre fichier checklist (.csv)", type="csv", key="checklist_uploader") # --- Logique principale --- if locked_file_uploaded is not None: # Stocker les noms de fichiers originaux pour le rapport PDF st.session_state.locked_file_name_original = locked_file_uploaded.name if checklist_file_uploaded: st.session_state.checklist_file_name_original = checklist_file_uploaded.name elif checklist_source == "Utiliser celle de GitHub (Recommandé)": st.session_state.checklist_file_name_original = "Checklist IFS Food V8 (GitHub)" else: st.session_state.checklist_file_name_original = "Non fournie" locked_data_io = io.BytesIO(locked_file_uploaded.getvalue()) # Convertir en BytesIO pour l'analyseur checklist_data_io = None if checklist_source == "Téléverser ma checklist" and checklist_file_uploaded is not None: checklist_data_io = io.BytesIO(checklist_file_uploaded.getvalue()) elif checklist_source == "Utiliser celle de GitHub (Recommandé)": # Ne pas mettre en cache le téléchargement lui-même, mais le résultat si nécessaire checklist_data_io = download_checklist_from_github_st() # Utiliser BytesIO pour l'analyseur analyzer = get_analyzer(locked_data_io, checklist_data_io) if analyzer.locked_df is not None and not analyzer.locked_df.empty: st.success(f"Fichier '{locked_file_uploaded.name}' chargé et analysé. {len(analyzer.locked_df)} suspensions IFS Food trouvées.") display_dashboard_tabs(analyzer) # Afficher les onglets avec les résultats # Bouton pour générer le rapport PDF st.sidebar.markdown("---") if st.sidebar.button("📄 Générer le Rapport PDF Complet", key="pdf_button"): with st.spinner("Génération du rapport PDF en cours... Veuillez patienter."): pdf_path = analyzer.export_report_to_pdf() # La méthode gère l'enregistrement if pdf_path: with open(pdf_path, "rb") as pdf_file: st.sidebar.download_button( label="📥 Télécharger le Rapport PDF", data=pdf_file, file_name="Analyse_IFS_Suspensions_Report.pdf", mime="application/pdf" ) st.sidebar.success("Rapport PDF généré !") else: st.sidebar.error("Erreur lors de la création du rapport PDF.") else: st.error("Aucune donnée IFS Food n'a été trouvée dans le fichier ou après filtrage. Veuillez vérifier votre fichier.") else: st.info("Veuillez téléverser un fichier CSV des suspensions IFS pour commencer l'analyse.") st.sidebar.markdown("---") st.sidebar.markdown("Développé avec ❤️ par IA") def display_dashboard_tabs(analyzer): tab_titles = [ "📊 Vue d'Ensemble", "🌍 Géographie", "🏷️ Thèmes Détaillés", "📋 Exigences IFS", "🕵️ Types d'Audits", "🔗 Analyse Croisée" ] tab1, tab2, tab3, tab4, tab5, tab6 = st.tabs(tab_titles) with tab1: # Vue d'Ensemble st.header("📊 Vue d'Ensemble des Suspensions") col1, col2 = st.columns(2) total_suspensions = len(analyzer.locked_df) with_reasons_count = analyzer.locked_df['Lock reason'].notna().sum() audit_analysis_summary, _ = analyzer.analyze_audit_types() total_audit_special = sum(audit_analysis_summary.values()) with col1: st.metric("Total Suspensions IFS Food", total_suspensions) st.metric("Avec Motifs Documentés", f"{with_reasons_count} ({with_reasons_count/total_suspensions*100:.1f}%)" if total_suspensions > 0 else "0") with col2: st.metric("Liées à Audits Spécifiques (IP, etc.)", f"{total_audit_special} ({total_audit_special/total_suspensions*100:.1f}%)" if total_suspensions > 0 else "0") # Vous pouvez ajouter d'autres métriques ici st.markdown("---") st.subheader("Visualisations Clés") # Thèmes theme_counts, _ = analyzer.analyze_themes() if theme_counts: top_themes = dict(sorted(theme_counts.items(), key=lambda x:x[1], reverse=True)[:10]) top_themes_clean = {k.replace('_',' ').replace('MANAGEMENT','MGMT').replace('RESPONSIBILITY','RESP.'):v for k,v in top_themes.items() if v > 0} if top_themes_clean: st.plotly_chart(analyzer._create_plotly_bar_chart(top_themes_clean, "Top 10 Thèmes de Non-Conformités", orientation='h', color='indianred', height=450), use_container_width=True) # Product Scopes scope_counts = analyzer.product_scope_analysis() if scope_counts: top_scopes = dict(scope_counts.most_common(10)) top_scopes_clean = {f"Scope {k}": v for k, v in top_scopes.items() if v > 0} if top_scopes_clean: st.plotly_chart(analyzer._create_plotly_bar_chart(top_scopes_clean, "Top 10 Product Scopes Impactés", orientation='h', color='cornflowerblue', height=450), use_container_width=True) with tab2: # Géographie st.header("🌍 Analyse Géographique") geo_stats_df = analyzer.geographic_analysis() if geo_stats_df is not None and not geo_stats_df.empty: st.plotly_chart(analyzer._create_plotly_choropleth_map(geo_stats_df, "Suspensions par Pays"), use_container_width=True) st.markdown("---") st.subheader("Tableau des Suspensions par Pays (Top 15)") st.dataframe(geo_stats_df.head(15), use_container_width=True) else: st.info("Données géographiques non disponibles.") with tab3: # Thèmes Détaillés st.header("🏷️ Analyse Thématique Détaillée") theme_counts, theme_details = analyzer.analyze_themes() for theme, count in sorted(theme_counts.items(), key=lambda x: x[1], reverse=True): if count > 0: with st.expander(f"{theme.replace('_', ' ')} ({count} cas)", expanded=False): st.markdown(f"**Exemples de motifs (jusqu'à 3) pour le thème : {theme.replace('_', ' ')}**") for i, detail in enumerate(theme_details[theme][:3]): st.markdown(f"**Cas {i+1} (Fournisseur: {detail['supplier']}, Pays: {detail['country']})**") st.caption(f"{detail['reason'][:500]}...") # Afficher une partie du motif st.markdown("---") # Ajouter les pays les plus touchés pour ce thème theme_keywords_current_theme = analyzer.themes_keywords_definition.get(theme, []) if theme_keywords_current_theme: theme_mask = analyzer.locked_df['lock_reason_clean'].str.contains('|'.join(theme_keywords_current_theme), case=False, na=False, regex=True) if theme_mask.sum() > 0: theme_countries_df = analyzer.locked_df[theme_mask]['Country/Region'].value_counts().reset_index() theme_countries_df.columns = ['Pays', 'Nombre de cas'] if not theme_countries_df.empty: st.markdown("**Pays les plus affectés par ce thème :**") st.dataframe(theme_countries_df.head(5), use_container_width=True) with tab4: # Exigences IFS st.header("📋 Analyse des Exigences IFS") recommendations = analyzer.generate_ifs_recommendations_analysis() if recommendations and analyzer.checklist_df is not None: st.info("Les textes des exigences proviennent de la checklist IFS Food V8. Les numéros sont extraits des motifs de suspension.") df_reco = pd.DataFrame(recommendations) df_reco_sorted = df_reco.sort_values(by='count', ascending=False) # Graphique des exigences les plus citées top_reco_chart = df_reco_sorted.head(10).copy() # Pour le graphique # Raccourcir les labels pour le graphique top_reco_chart['display_label'] = top_reco_chart.apply(lambda row: f"{row['chapter']} ({row['requirement_text'][:25]}...)", axis=1) reco_chart_data = pd.Series(top_reco_chart['count'].values, index=top_reco_chart['display_label']).to_dict() st.plotly_chart(analyzer._create_plotly_bar_chart(reco_chart_data, "Top 10 Exigences IFS Citées", orientation='v', color='gold', height=500), use_container_width=True) st.markdown("---") st.subheader("Détail des Exigences Citées") for index, row in df_reco_sorted.iterrows(): with st.expander(f"Exigence {row['chapter']} ({row['count']} mentions)", expanded=False): st.markdown(f"**Texte de l'exigence :**") st.markdown(f"> {row['requirement_text']}") elif recommendations: # Pas de checklist mais des chapitres extraits st.warning("Checklist non chargée. Affichage des numéros de chapitres uniquement.") df_reco = pd.DataFrame(recommendations) df_reco_sorted = df_reco.sort_values(by='count', ascending=False) chapter_counts_dict = pd.Series(df_reco_sorted['count'].values, index=df_reco_sorted['chapter']).to_dict() st.plotly_chart(analyzer._create_plotly_bar_chart(chapter_counts_dict, "Top Chapitres IFS Cités (Numéros)", orientation='v', color='gold', height=500), use_container_width=True) st.dataframe(df_reco_sorted, use_container_width=True) else: st.info("Aucune exigence IFS spécifique n'a pu être extraite des motifs ou la checklist n'est pas disponible.") with tab5: # Types d'Audits st.header("🕵️ Analyse par Types d'Audits") audit_analysis, audit_examples = analyzer.analyze_audit_types() if audit_analysis: # Graphique audit_analysis_clean = {k.replace('_', ' '):v for k,v in audit_analysis.items() if v > 0} if audit_analysis_clean: st.plotly_chart(analyzer._create_plotly_bar_chart(audit_analysis_clean, "Répartition par Type d'Audit Spécifique", color='darkorange', height=400), use_container_width=True) st.markdown("---") st.subheader("Détails et Exemples par Type d'Audit") for audit_type, count in sorted(audit_analysis.items(), key=lambda x: x[1], reverse=True): if count > 0: with st.expander(f"{audit_type.replace('_', ' ')} ({count} cas)", expanded=False): st.markdown(f"**Exemples (jusqu'à 3) pour : {audit_type.replace('_', ' ')}**") for i, ex_data in enumerate(audit_examples[audit_type]['examples'][:3]): st.markdown(f"**Cas {i+1} (Fournisseur: {ex_data.get('Supplier', 'N/A')}, Pays: {ex_data.get('Country/Region', 'N/A')})**") st.caption(f"{ex_data.get('Lock reason', 'N/A')[:500]}...") st.markdown("---") countries_data = audit_examples[audit_type]['countries'] if countries_data: st.markdown(f"**Répartition géographique (Top 5 pays) pour ce type d'audit :** {', '.join([f'{c} ({n})' for c, n in countries_data.items()])}") else: st.info("Aucune donnée sur les types d'audits disponible.") with tab6: # Analyse Croisée st.header("🔗 Analyse Croisée : Thèmes vs Product Scopes") cross_pivot_matrix = analyzer.cross_analysis_scope_themes() if cross_pivot_matrix is not None and not cross_pivot_matrix.empty: # Pour la lisibilité, on peut filtrer les scopes avec peu de données ou transposer # Limiter le nombre de scopes affichés dans la heatmap pour la lisibilité top_n_scopes_for_heatmap = min(15, len(cross_pivot_matrix.index)) if len(cross_pivot_matrix.index) > top_n_scopes_for_heatmap: scope_totals = cross_pivot_matrix.sum(axis=1).sort_values(ascending=False) top_scopes_names = scope_totals.head(top_n_scopes_for_heatmap).index cross_pivot_matrix_filtered = cross_pivot_matrix.loc[top_scopes_names] else: cross_pivot_matrix_filtered = cross_pivot_matrix if not cross_pivot_matrix_filtered.empty: st.plotly_chart(analyzer._create_plotly_heatmap(cross_pivot_matrix_filtered, "Fréquence des Thèmes de NC par Product Scope (Top Scopes)", height=max(500, len(cross_pivot_matrix_filtered.index) * 40)), use_container_width=True) st.markdown("---") st.subheader("Tableau de Corrélation Complet (Scopes vs Thèmes)") st.dataframe(cross_pivot_matrix.style.background_gradient(cmap='YlGnBu', axis=None), use_container_width=True) # axis=None pour colorer toute la cellule else: st.info("Pas assez de données pour afficher la heatmap après filtrage.") else: st.info("Données insuffisantes pour l'analyse croisée Thèmes vs Product Scopes.") # --- Exécution de l'application --- if __name__ == "__main__": main()