""" data_loader.py - Chargement et indexation des bases de données EcoALIM, GFLI et PDF CIR. """ from __future__ import annotations import logging import json import re from functools import lru_cache from typing import Dict, List, Optional, Tuple from datasets import load_dataset,DownloadMode import pandas as pd import pdfplumber import config # ============================================================================ # EcoALIM # ============================================================================ @lru_cache(maxsize=1) def load_ecoalim() -> pd.DataFrame: """Charge la base EcoALIM (feuille FR) et renvoie un DataFrame nettoyé.""" def get_ecoalim_df() -> pd.DataFrame: if config.IS_PRODUCTION: print("#############") ecoalim = load_dataset("CCPA-GAIA/ECOALIM",data_files="ecoalim.csv", token=config.HF_KEY,download_mode=DownloadMode.FORCE_REDOWNLOAD) return ecoalim["train"].to_pandas() return pd.read_excel( config.ECOALIM_PATH, sheet_name=config.ECOALIM_SHEET, header=config.ECOALIM_HEADER_ROW, ) df = get_ecoalim_df() # Supprimer les lignes entièrement vides df = df.dropna(subset=[config.ECOALIM_COL_NOM]).reset_index(drop=True) # Normaliser les colonnes pays en minuscules pour faciliter la recherche for col in [config.ECOALIM_COL_PAYS_PROD, config.ECOALIM_COL_PAYS_TRANSFO]: if col in df.columns: df[col] = df[col].astype(str).str.strip().str.lower() return df def _normalize_for_search(text: str) -> str: """Normalise un texte pour la recherche (accents, casse, ponctuation).""" import unicodedata text = text.lower().strip() # Normalize unicode accents nfkd = unicodedata.normalize('NFKD', text) ascii_text = ''.join(c for c in nfkd if not unicodedata.combining(c)) return ascii_text _STOPWORDS_FR = { "de", "du", "des", "la", "le", "les", "d", "l", "a", "au", "aux" } def _tokens_for_search(text: str) -> list[str]: """Découpe un texte en tokens utiles pour une recherche souple.""" text = _normalize_for_search(text) tokens = re.findall(r"[a-z0-9]+", text) return [t for t in tokens if t and t not in _STOPWORDS_FR] def is_name_match(matiere: str, intrant_name: str) -> bool: """ Vérifie si le nom de la matière est une correspondance réelle (mot entier) dans le nom de l'intrant, et non un simple sous-chaîne accidentelle. Ex : "blé" ne matche PAS "blend", mais matche "Blé tendre". """ mat_norm = _normalize_for_search(matiere) int_norm = _normalize_for_search(intrant_name) if mat_norm == int_norm: return True # Le mot de la matière doit apparaître comme mot entier dans l'intrant pattern = r'\b' + re.escape(mat_norm) + r'\b' return bool(re.search(pattern, int_norm)) def search_ecoalim( matiere: str, pays_production: Optional[str] = None, pays_transformation: Optional[str] = None, ) -> pd.DataFrame: """ Cherche dans EcoALIM les lignes correspondant à une matière première. Utilise une recherche intelligente avec priorisation : 1. Nom commence par la matière 2. Mot entier trouvé dans le nom 3. Contient la matière (substring) Retourne un DataFrame filtré et trié par pertinence (peut être vide). """ df = load_ecoalim() matiere_norm = _normalize_for_search(matiere) # Build normalized column for search nom_col = config.ECOALIM_COL_NOM df_norms = df[nom_col].apply(lambda x: _normalize_for_search(str(x)) if pd.notna(x) else "") # Create priority masks mask_starts = df_norms.str.startswith(matiere_norm, na=False) pattern_word = r'\b' + re.escape(matiere_norm) + r'\b' mask_word = df_norms.str.contains(pattern_word, na=False, regex=True) tokens = _tokens_for_search(matiere_norm) mask_tokens = pd.Series(False, index=df.index) if tokens: mask_tokens = df_norms.apply( lambda x: all(t in _tokens_for_search(x) for t in tokens) ) mask_contains = df_norms.str.contains(re.escape(matiere_norm), na=False) # Use best available mask with priority if mask_starts.any(): mask = mask_starts elif mask_word.any(): mask = mask_word elif mask_tokens.any(): mask = mask_tokens elif mask_contains.any(): mask = mask_contains else: return pd.DataFrame(columns=df.columns) if pays_production: pays_prod_low = pays_production.lower().strip() mask_pays = df[config.ECOALIM_COL_PAYS_PROD].str.contains( re.escape(pays_prod_low), na=False ) combined = mask & mask_pays if combined.any(): mask = combined if pays_transformation: pays_transfo_low = pays_transformation.lower().strip() mask_transfo = df[config.ECOALIM_COL_PAYS_TRANSFO].str.contains( re.escape(pays_transfo_low), na=False ) combined = mask & mask_transfo if combined.any(): mask = combined result = df[mask].copy() # Sort by relevance: entries starting with the search term come first if not result.empty: result_norms = result[nom_col].apply(lambda x: _normalize_for_search(str(x))) result["_priority"] = 3 result.loc[result_norms.str.contains(pattern_word, na=False, regex=True), "_priority"] = 1 result.loc[result_norms.str.startswith(matiere_norm, na=False), "_priority"] = 0 result.loc[result_norms.apply(lambda x: all(t in _tokens_for_search(x) for t in tokens)), "_priority"] = 2 # Prefer OS outputs over champ when ties exist result["_os_priority"] = 1 result.loc[result_norms.str.contains("sortie os", na=False), "_os_priority"] = 0 result = result.sort_values(["_priority", "_os_priority"]).drop(columns=["_priority", "_os_priority"]) return result def get_ecoalim_climate_value( matiere: str, pays_production: Optional[str] = None, pays_transformation: Optional[str] = None, ) -> Optional[Tuple[float, str, str]]: """ Retourne (valeur_kg_co2_eq, nom_intrant, source_info) ou None. Unité EcoALIM : kg CO2 eq / kg de produit. """ results = search_ecoalim(matiere, pays_production, pays_transformation) if results.empty: return None # Prendre la première correspondance (ou la plus défavorable si demandé) row = results.iloc[0] val = row.get(config.ECOALIM_COL_CLIMATE) if pd.isna(val): return None nom = row.get(config.ECOALIM_COL_NOM, matiere) return (float(val), str(nom), "ECOALIM") def get_ecoalim_worst_value(matiere: str) -> Optional[Tuple[float, str, str]]: """Retourne la valeur la plus défavorable (max) pour cette matière dans EcoALIM.""" results = search_ecoalim(matiere) if results.empty: return None climate_col = config.ECOALIM_COL_CLIMATE results_valid = results.dropna(subset=[climate_col]) if results_valid.empty: return None idx = results_valid[climate_col].idxmax() row = results_valid.loc[idx] return (float(row[climate_col]), str(row[config.ECOALIM_COL_NOM]), "ECOALIM (valeur la plus défavorable)") # ============================================================================ # GFLI # ============================================================================ @lru_cache(maxsize=1) def load_gfli() -> pd.DataFrame: """Charge la base GFLI (Economic allocation EF3.1).""" def get_glfi_df() -> pd.DataFrame: if config.IS_PRODUCTION: glfi_dataset = load_dataset("CCPA-GAIA/ECOALIM",data_files="glfi.csv", token=config.HF_KEY,download_mode=DownloadMode.FORCE_REDOWNLOAD) return glfi_dataset["train"].to_pandas() return pd.read_excel( config.GFLI_PATH, sheet_name=config.GFLI_SHEET, ) df = get_glfi_df() df = df.dropna(subset=[config.GFLI_COL_PRODUCT]).reset_index(drop=True) return df def _extract_gfli_country(product_name: str) -> Optional[str]: """Extrait le code pays ISO d'un nom de produit GFLI (ex: '.../FR Economic S' -> 'FR').""" m = re.search(r"/([A-Z]{2,3})\s+Economic\s+S", product_name) return m.group(1) if m else None def _extract_gfli_base_name(product_name: str) -> str: """Extrait le nom de base du produit GFLI (sans le code pays).""" m = re.match(r"(.+)/[A-Z]{2,3}\s+Economic\s+S", product_name) return m.group(1).strip() if m else product_name.strip() def search_gfli( matiere: str, country_iso: Optional[str] = None, ) -> pd.DataFrame: """ Recherche dans GFLI par nom de matière (en anglais) et optionnellement par pays ISO. Uses word-boundary matching for better precision. """ logging.info(f"Searching GLFI with args matiere: {matiere}, country_iso: {country_iso}") df = load_gfli() matiere_norm = _normalize_for_search(matiere) prod_col = config.GFLI_COL_PRODUCT df_norms = df[prod_col].apply(lambda x: _normalize_for_search(str(x)) if pd.notna(x) else "") # Strategy 1: starts-with mask = df_norms.str.startswith(matiere_norm, na=False) # Strategy 2: word-boundary match if not mask.any(): pattern_word = r'\b' + re.escape(matiere_norm) + r'\b' mask = df_norms.str.contains(pattern_word, na=False, regex=True) # Strategy 3: token-subset match (souple) if not mask.any(): tokens = _tokens_for_search(matiere_norm) if tokens: mask = df_norms.apply(lambda x: all(t in _tokens_for_search(x) for t in tokens)) # Strategy 4: contains if not mask.any(): mask = df_norms.str.contains(re.escape(matiere_norm), na=False) if country_iso: country_upper = country_iso.upper().strip() mask_country = df[prod_col].str.contains( rf"/{re.escape(country_upper)}\s+Economic\s+S", na=False, regex=True ) # Filtrage strict : si un pays est demandé, ne retourner QUE les résultats de ce pays mask = mask & mask_country logging.info("Masked df: %s", df[mask].head()) return df[mask].copy() def get_gfli_climate_value( matiere: str, country_iso: Optional[str] = None, ) -> Optional[Tuple[float, str, str]]: """ Retourne (valeur_kg_co2_eq_par_tonne, nom_produit, source_info) ou None. Unité GFLI : kg CO2 eq / tonne de produit. """ results = search_gfli(matiere, country_iso) if results.empty: return None row = results.iloc[0] val = row.get(config.GFLI_COL_CLIMATE) if pd.isna(val): return None nom = row.get(config.GFLI_COL_PRODUCT, matiere) return (float(val), str(nom), "GFLI") def get_gfli_worst_value(matiere: str) -> Optional[Tuple[float, str, str]]: """Retourne la valeur la plus défavorable (max) pour cette matière dans GFLI.""" results = search_gfli(matiere) if results.empty: return None climate_col = config.GFLI_COL_CLIMATE results_valid = results.dropna(subset=[climate_col]) if results_valid.empty: return None idx = results_valid[climate_col].idxmax() row = results_valid.loc[idx] return (float(row[climate_col]), str(row[config.GFLI_COL_PRODUCT]), "GFLI (valeur la plus défavorable)") def get_gfli_rer_value(matiere: str) -> Optional[Tuple[float, str, str]]: """Retourne la valeur Mix Européen (RER) dans GFLI.""" return get_gfli_climate_value(matiere, "RER") def get_gfli_glo_value(matiere: str) -> Optional[Tuple[float, str, str]]: """Retourne la valeur Mix Monde (GLO) dans GFLI.""" return get_gfli_climate_value(matiere, "GLO") # ============================================================================ # GFLI - Listes utilitaires # ============================================================================ def get_gfli_base_products() -> List[str]: """Retourne la liste des noms de base de produits uniques dans GFLI.""" df = load_gfli() products = df[config.GFLI_COL_PRODUCT].dropna().unique() base_names = set() for p in products: base_names.add(_extract_gfli_base_name(str(p))) return sorted(base_names) def get_ecoalim_matieres() -> List[str]: """Retourne la liste des matières premières uniques dans EcoALIM.""" df = load_ecoalim() return sorted(df[config.ECOALIM_COL_NOM].dropna().unique().tolist()) # ============================================================================ # Fonctions multi-candidats (pour affichage comparatif) # ============================================================================ def get_top_ecoalim_candidates( matiere: str, pays_production: Optional[str] = None, pays_transformation: Optional[str] = None, top_n: Optional[int] = 8, ) -> List[Dict]: """ Retourne les top N correspondances EcoALIM triées par pertinence, chacune avec nom + valeur impact. """ results = search_ecoalim(matiere, pays_production, pays_transformation) if results.empty: return [] candidates = [] rows = results if top_n is None else results.head(top_n) for _, row in rows.iterrows(): val = row.get(config.ECOALIM_COL_CLIMATE) if pd.notna(val): candidates.append({ "nom": str(row[config.ECOALIM_COL_NOM]), "impact": float(val), "unite": "kg CO2 eq / kg", "source": "ECOALIM", }) return candidates def get_top_gfli_candidates( matiere: str, country_iso: Optional[str] = None, top_n: Optional[int] = 8, ) -> List[Dict]: """ Retourne les top N correspondances GFLI triées par pertinence, chacune avec nom + valeur impact. """ results = search_gfli(matiere, country_iso) if results.empty: return [] candidates = [] rows = results if top_n is None else results.head(top_n) for _, row in rows.iterrows(): val = row.get(config.GFLI_COL_CLIMATE) if pd.notna(val): candidates.append({ "nom": str(row[config.GFLI_COL_PRODUCT]), "impact": float(val), "unite": "kg CO2 eq / tonne", "source": "GFLI", }) return candidates # ============================================================================ # PDF CIR - Catalogue des Matières Premières # ============================================================================ @lru_cache(maxsize=1) def load_pdf_text() -> str: """Charge et retourne le texte complet du PDF CIR.""" full_text = [] if config.IS_PRODUCTION: dataset = load_dataset( "CCPA-GAIA/ECOALIM", data_files=config.PDF_CIR_PATH.split("/")[-1], token=config.HF_KEY ) pdf = dataset["train"][0]["pdf"] # Assuming this pdf object behaves like pdfplumber for page in pdf.pages: text = page.extract_text() if text: full_text.append(text) else: # Keep everything inside `with` with pdfplumber.open(config.PDF_CIR_PATH) as pdf: for page in pdf.pages: text = page.extract_text() if text: full_text.append(text) return "\n\n".join(full_text) def get_pdf_excerpt(max_chars: int = 15000) -> str: """Retourne un extrait du PDF CIR (tronqué si nécessaire) pour envoi au LLM.""" text = load_pdf_text() if len(text) > max_chars: return text[:max_chars] + "\n... [texte tronqué]" return text # ============================================================================ # Logigramme # ============================================================================ @lru_cache(maxsize=1) def load_logigramme() -> dict: """Charge le logigramme JSON.""" with open(config.LOGIGRAMME_PATH, "r", encoding="utf-8") as f: return json.load(f)