Spaces:
Sleeping
Sleeping
| """ | |
| data_loader.py - Chargement et indexation des bases de données EcoALIM, GFLI et PDF CIR. | |
| """ | |
| from __future__ import annotations | |
| import logging | |
| import json | |
| import re | |
| from functools import lru_cache | |
| from typing import Dict, List, Optional, Tuple | |
| from datasets import load_dataset,DownloadMode | |
| import pandas as pd | |
| import pdfplumber | |
| import config | |
| # ============================================================================ | |
| # EcoALIM | |
| # ============================================================================ | |
| def load_ecoalim() -> pd.DataFrame: | |
| """Charge la base EcoALIM (feuille FR) et renvoie un DataFrame nettoyé.""" | |
| def get_ecoalim_df() -> pd.DataFrame: | |
| if config.IS_PRODUCTION: | |
| print("#############") | |
| ecoalim = load_dataset("CCPA-GAIA/ECOALIM",data_files="ecoalim.csv", token=config.HF_KEY,download_mode=DownloadMode.FORCE_REDOWNLOAD) | |
| return ecoalim["train"].to_pandas() | |
| return pd.read_excel( | |
| config.ECOALIM_PATH, | |
| sheet_name=config.ECOALIM_SHEET, | |
| header=config.ECOALIM_HEADER_ROW, | |
| ) | |
| df = get_ecoalim_df() | |
| # Supprimer les lignes entièrement vides | |
| df = df.dropna(subset=[config.ECOALIM_COL_NOM]).reset_index(drop=True) | |
| # Normaliser les colonnes pays en minuscules pour faciliter la recherche | |
| for col in [config.ECOALIM_COL_PAYS_PROD, config.ECOALIM_COL_PAYS_TRANSFO]: | |
| if col in df.columns: | |
| df[col] = df[col].astype(str).str.strip().str.lower() | |
| return df | |
| def _normalize_for_search(text: str) -> str: | |
| """Normalise un texte pour la recherche (accents, casse, ponctuation).""" | |
| import unicodedata | |
| text = text.lower().strip() | |
| # Normalize unicode accents | |
| nfkd = unicodedata.normalize('NFKD', text) | |
| ascii_text = ''.join(c for c in nfkd if not unicodedata.combining(c)) | |
| return ascii_text | |
| _STOPWORDS_FR = { | |
| "de", "du", "des", "la", "le", "les", "d", "l", "a", "au", "aux" | |
| } | |
| def _tokens_for_search(text: str) -> list[str]: | |
| """Découpe un texte en tokens utiles pour une recherche souple.""" | |
| text = _normalize_for_search(text) | |
| tokens = re.findall(r"[a-z0-9]+", text) | |
| return [t for t in tokens if t and t not in _STOPWORDS_FR] | |
| def is_name_match(matiere: str, intrant_name: str) -> bool: | |
| """ | |
| Vérifie si le nom de la matière est une correspondance réelle (mot entier) | |
| dans le nom de l'intrant, et non un simple sous-chaîne accidentelle. | |
| Ex : "blé" ne matche PAS "blend", mais matche "Blé tendre". | |
| """ | |
| mat_norm = _normalize_for_search(matiere) | |
| int_norm = _normalize_for_search(intrant_name) | |
| if mat_norm == int_norm: | |
| return True | |
| # Le mot de la matière doit apparaître comme mot entier dans l'intrant | |
| pattern = r'\b' + re.escape(mat_norm) + r'\b' | |
| return bool(re.search(pattern, int_norm)) | |
| def search_ecoalim( | |
| matiere: str, | |
| pays_production: Optional[str] = None, | |
| pays_transformation: Optional[str] = None, | |
| ) -> pd.DataFrame: | |
| """ | |
| Cherche dans EcoALIM les lignes correspondant à une matière première. | |
| Utilise une recherche intelligente avec priorisation : | |
| 1. Nom commence par la matière | |
| 2. Mot entier trouvé dans le nom | |
| 3. Contient la matière (substring) | |
| Retourne un DataFrame filtré et trié par pertinence (peut être vide). | |
| """ | |
| df = load_ecoalim() | |
| matiere_norm = _normalize_for_search(matiere) | |
| # Build normalized column for search | |
| nom_col = config.ECOALIM_COL_NOM | |
| df_norms = df[nom_col].apply(lambda x: _normalize_for_search(str(x)) if pd.notna(x) else "") | |
| # Create priority masks | |
| mask_starts = df_norms.str.startswith(matiere_norm, na=False) | |
| pattern_word = r'\b' + re.escape(matiere_norm) + r'\b' | |
| mask_word = df_norms.str.contains(pattern_word, na=False, regex=True) | |
| tokens = _tokens_for_search(matiere_norm) | |
| mask_tokens = pd.Series(False, index=df.index) | |
| if tokens: | |
| mask_tokens = df_norms.apply( | |
| lambda x: all(t in _tokens_for_search(x) for t in tokens) | |
| ) | |
| mask_contains = df_norms.str.contains(re.escape(matiere_norm), na=False) | |
| # Use best available mask with priority | |
| if mask_starts.any(): | |
| mask = mask_starts | |
| elif mask_word.any(): | |
| mask = mask_word | |
| elif mask_tokens.any(): | |
| mask = mask_tokens | |
| elif mask_contains.any(): | |
| mask = mask_contains | |
| else: | |
| return pd.DataFrame(columns=df.columns) | |
| if pays_production: | |
| pays_prod_low = pays_production.lower().strip() | |
| mask_pays = df[config.ECOALIM_COL_PAYS_PROD].str.contains( | |
| re.escape(pays_prod_low), na=False | |
| ) | |
| combined = mask & mask_pays | |
| if combined.any(): | |
| mask = combined | |
| if pays_transformation: | |
| pays_transfo_low = pays_transformation.lower().strip() | |
| mask_transfo = df[config.ECOALIM_COL_PAYS_TRANSFO].str.contains( | |
| re.escape(pays_transfo_low), na=False | |
| ) | |
| combined = mask & mask_transfo | |
| if combined.any(): | |
| mask = combined | |
| result = df[mask].copy() | |
| # Sort by relevance: entries starting with the search term come first | |
| if not result.empty: | |
| result_norms = result[nom_col].apply(lambda x: _normalize_for_search(str(x))) | |
| result["_priority"] = 3 | |
| result.loc[result_norms.str.contains(pattern_word, na=False, regex=True), "_priority"] = 1 | |
| result.loc[result_norms.str.startswith(matiere_norm, na=False), "_priority"] = 0 | |
| result.loc[result_norms.apply(lambda x: all(t in _tokens_for_search(x) for t in tokens)), "_priority"] = 2 | |
| # Prefer OS outputs over champ when ties exist | |
| result["_os_priority"] = 1 | |
| result.loc[result_norms.str.contains("sortie os", na=False), "_os_priority"] = 0 | |
| result = result.sort_values(["_priority", "_os_priority"]).drop(columns=["_priority", "_os_priority"]) | |
| return result | |
| def get_ecoalim_climate_value( | |
| matiere: str, | |
| pays_production: Optional[str] = None, | |
| pays_transformation: Optional[str] = None, | |
| ) -> Optional[Tuple[float, str, str]]: | |
| """ | |
| Retourne (valeur_kg_co2_eq, nom_intrant, source_info) ou None. | |
| Unité EcoALIM : kg CO2 eq / kg de produit. | |
| """ | |
| results = search_ecoalim(matiere, pays_production, pays_transformation) | |
| if results.empty: | |
| return None | |
| # Prendre la première correspondance (ou la plus défavorable si demandé) | |
| row = results.iloc[0] | |
| val = row.get(config.ECOALIM_COL_CLIMATE) | |
| if pd.isna(val): | |
| return None | |
| nom = row.get(config.ECOALIM_COL_NOM, matiere) | |
| return (float(val), str(nom), "ECOALIM") | |
| def get_ecoalim_worst_value(matiere: str) -> Optional[Tuple[float, str, str]]: | |
| """Retourne la valeur la plus défavorable (max) pour cette matière dans EcoALIM.""" | |
| results = search_ecoalim(matiere) | |
| if results.empty: | |
| return None | |
| climate_col = config.ECOALIM_COL_CLIMATE | |
| results_valid = results.dropna(subset=[climate_col]) | |
| if results_valid.empty: | |
| return None | |
| idx = results_valid[climate_col].idxmax() | |
| row = results_valid.loc[idx] | |
| return (float(row[climate_col]), str(row[config.ECOALIM_COL_NOM]), "ECOALIM (valeur la plus défavorable)") | |
| # ============================================================================ | |
| # GFLI | |
| # ============================================================================ | |
| def load_gfli() -> pd.DataFrame: | |
| """Charge la base GFLI (Economic allocation EF3.1).""" | |
| def get_glfi_df() -> pd.DataFrame: | |
| if config.IS_PRODUCTION: | |
| glfi_dataset = load_dataset("CCPA-GAIA/ECOALIM",data_files="glfi.csv", token=config.HF_KEY,download_mode=DownloadMode.FORCE_REDOWNLOAD) | |
| return glfi_dataset["train"].to_pandas() | |
| return pd.read_excel( | |
| config.GFLI_PATH, | |
| sheet_name=config.GFLI_SHEET, | |
| ) | |
| df = get_glfi_df() | |
| df = df.dropna(subset=[config.GFLI_COL_PRODUCT]).reset_index(drop=True) | |
| return df | |
| def _extract_gfli_country(product_name: str) -> Optional[str]: | |
| """Extrait le code pays ISO d'un nom de produit GFLI (ex: '.../FR Economic S' -> 'FR').""" | |
| m = re.search(r"/([A-Z]{2,3})\s+Economic\s+S", product_name) | |
| return m.group(1) if m else None | |
| def _extract_gfli_base_name(product_name: str) -> str: | |
| """Extrait le nom de base du produit GFLI (sans le code pays).""" | |
| m = re.match(r"(.+)/[A-Z]{2,3}\s+Economic\s+S", product_name) | |
| return m.group(1).strip() if m else product_name.strip() | |
| def search_gfli( | |
| matiere: str, | |
| country_iso: Optional[str] = None, | |
| ) -> pd.DataFrame: | |
| """ | |
| Recherche dans GFLI par nom de matière (en anglais) et optionnellement par pays ISO. | |
| Uses word-boundary matching for better precision. | |
| """ | |
| logging.info(f"Searching GLFI with args matiere: {matiere}, country_iso: {country_iso}") | |
| df = load_gfli() | |
| matiere_norm = _normalize_for_search(matiere) | |
| prod_col = config.GFLI_COL_PRODUCT | |
| df_norms = df[prod_col].apply(lambda x: _normalize_for_search(str(x)) if pd.notna(x) else "") | |
| # Strategy 1: starts-with | |
| mask = df_norms.str.startswith(matiere_norm, na=False) | |
| # Strategy 2: word-boundary match | |
| if not mask.any(): | |
| pattern_word = r'\b' + re.escape(matiere_norm) + r'\b' | |
| mask = df_norms.str.contains(pattern_word, na=False, regex=True) | |
| # Strategy 3: token-subset match (souple) | |
| if not mask.any(): | |
| tokens = _tokens_for_search(matiere_norm) | |
| if tokens: | |
| mask = df_norms.apply(lambda x: all(t in _tokens_for_search(x) for t in tokens)) | |
| # Strategy 4: contains | |
| if not mask.any(): | |
| mask = df_norms.str.contains(re.escape(matiere_norm), na=False) | |
| if country_iso: | |
| country_upper = country_iso.upper().strip() | |
| mask_country = df[prod_col].str.contains( | |
| rf"/{re.escape(country_upper)}\s+Economic\s+S", na=False, regex=True | |
| ) | |
| # Filtrage strict : si un pays est demandé, ne retourner QUE les résultats de ce pays | |
| mask = mask & mask_country | |
| logging.info("Masked df: %s", df[mask].head()) | |
| return df[mask].copy() | |
| def get_gfli_climate_value( | |
| matiere: str, | |
| country_iso: Optional[str] = None, | |
| ) -> Optional[Tuple[float, str, str]]: | |
| """ | |
| Retourne (valeur_kg_co2_eq_par_tonne, nom_produit, source_info) ou None. | |
| Unité GFLI : kg CO2 eq / tonne de produit. | |
| """ | |
| results = search_gfli(matiere, country_iso) | |
| if results.empty: | |
| return None | |
| row = results.iloc[0] | |
| val = row.get(config.GFLI_COL_CLIMATE) | |
| if pd.isna(val): | |
| return None | |
| nom = row.get(config.GFLI_COL_PRODUCT, matiere) | |
| return (float(val), str(nom), "GFLI") | |
| def get_gfli_worst_value(matiere: str) -> Optional[Tuple[float, str, str]]: | |
| """Retourne la valeur la plus défavorable (max) pour cette matière dans GFLI.""" | |
| results = search_gfli(matiere) | |
| if results.empty: | |
| return None | |
| climate_col = config.GFLI_COL_CLIMATE | |
| results_valid = results.dropna(subset=[climate_col]) | |
| if results_valid.empty: | |
| return None | |
| idx = results_valid[climate_col].idxmax() | |
| row = results_valid.loc[idx] | |
| return (float(row[climate_col]), str(row[config.GFLI_COL_PRODUCT]), "GFLI (valeur la plus défavorable)") | |
| def get_gfli_rer_value(matiere: str) -> Optional[Tuple[float, str, str]]: | |
| """Retourne la valeur Mix Européen (RER) dans GFLI.""" | |
| return get_gfli_climate_value(matiere, "RER") | |
| def get_gfli_glo_value(matiere: str) -> Optional[Tuple[float, str, str]]: | |
| """Retourne la valeur Mix Monde (GLO) dans GFLI.""" | |
| return get_gfli_climate_value(matiere, "GLO") | |
| # ============================================================================ | |
| # GFLI - Listes utilitaires | |
| # ============================================================================ | |
| def get_gfli_base_products() -> List[str]: | |
| """Retourne la liste des noms de base de produits uniques dans GFLI.""" | |
| df = load_gfli() | |
| products = df[config.GFLI_COL_PRODUCT].dropna().unique() | |
| base_names = set() | |
| for p in products: | |
| base_names.add(_extract_gfli_base_name(str(p))) | |
| return sorted(base_names) | |
| def get_ecoalim_matieres() -> List[str]: | |
| """Retourne la liste des matières premières uniques dans EcoALIM.""" | |
| df = load_ecoalim() | |
| return sorted(df[config.ECOALIM_COL_NOM].dropna().unique().tolist()) | |
| # ============================================================================ | |
| # Fonctions multi-candidats (pour affichage comparatif) | |
| # ============================================================================ | |
| def get_top_ecoalim_candidates( | |
| matiere: str, | |
| pays_production: Optional[str] = None, | |
| pays_transformation: Optional[str] = None, | |
| top_n: Optional[int] = 8, | |
| ) -> List[Dict]: | |
| """ | |
| Retourne les top N correspondances EcoALIM triées par pertinence, | |
| chacune avec nom + valeur impact. | |
| """ | |
| results = search_ecoalim(matiere, pays_production, pays_transformation) | |
| if results.empty: | |
| return [] | |
| candidates = [] | |
| rows = results if top_n is None else results.head(top_n) | |
| for _, row in rows.iterrows(): | |
| val = row.get(config.ECOALIM_COL_CLIMATE) | |
| if pd.notna(val): | |
| candidates.append({ | |
| "nom": str(row[config.ECOALIM_COL_NOM]), | |
| "impact": float(val), | |
| "unite": "kg CO2 eq / kg", | |
| "source": "ECOALIM", | |
| }) | |
| return candidates | |
| def get_top_gfli_candidates( | |
| matiere: str, | |
| country_iso: Optional[str] = None, | |
| top_n: Optional[int] = 8, | |
| ) -> List[Dict]: | |
| """ | |
| Retourne les top N correspondances GFLI triées par pertinence, | |
| chacune avec nom + valeur impact. | |
| """ | |
| results = search_gfli(matiere, country_iso) | |
| if results.empty: | |
| return [] | |
| candidates = [] | |
| rows = results if top_n is None else results.head(top_n) | |
| for _, row in rows.iterrows(): | |
| val = row.get(config.GFLI_COL_CLIMATE) | |
| if pd.notna(val): | |
| candidates.append({ | |
| "nom": str(row[config.GFLI_COL_PRODUCT]), | |
| "impact": float(val), | |
| "unite": "kg CO2 eq / tonne", | |
| "source": "GFLI", | |
| }) | |
| return candidates | |
| # ============================================================================ | |
| # PDF CIR - Catalogue des Matières Premières | |
| # ============================================================================ | |
| def load_pdf_text() -> str: | |
| """Charge et retourne le texte complet du PDF CIR.""" | |
| full_text = [] | |
| if config.IS_PRODUCTION: | |
| dataset = load_dataset( | |
| "CCPA-GAIA/ECOALIM", | |
| data_files=config.PDF_CIR_PATH.split("/")[-1], | |
| token=config.HF_KEY | |
| ) | |
| pdf = dataset["train"][0]["pdf"] | |
| # Assuming this pdf object behaves like pdfplumber | |
| for page in pdf.pages: | |
| text = page.extract_text() | |
| if text: | |
| full_text.append(text) | |
| else: | |
| # Keep everything inside `with` | |
| with pdfplumber.open(config.PDF_CIR_PATH) as pdf: | |
| for page in pdf.pages: | |
| text = page.extract_text() | |
| if text: | |
| full_text.append(text) | |
| return "\n\n".join(full_text) | |
| def get_pdf_excerpt(max_chars: int = 15000) -> str: | |
| """Retourne un extrait du PDF CIR (tronqué si nécessaire) pour envoi au LLM.""" | |
| text = load_pdf_text() | |
| if len(text) > max_chars: | |
| return text[:max_chars] + "\n... [texte tronqué]" | |
| return text | |
| # ============================================================================ | |
| # Logigramme | |
| # ============================================================================ | |
| def load_logigramme() -> dict: | |
| """Charge le logigramme JSON.""" | |
| with open(config.LOGIGRAMME_PATH, "r", encoding="utf-8") as f: | |
| return json.load(f) | |