from __future__ import annotations

import json, re, unicodedata, ast, os
from typing import List, Dict, Any, Optional

import requests
from smolagents import Tool, CodeAgent, InferenceClientModel
from sentence_transformers import SentenceTransformer, util

# --- Runtime config via env vars (with safe defaults for a Space) ---
HF_TIMEOUT = int(os.getenv("HF_TIMEOUT", "180"))         # 180s instead of 60s
HF_MAX_TOKENS = int(os.getenv("HF_MAX_TOKENS", "384"))   # slightly shorter generations
AGENT_MAX_STEPS = int(os.getenv("AGENT_MAX_STEPS", "6"))

# Order: one preferred model, then two fast, publicly available fallbacks
FALLBACK_MODELS = [
    os.getenv("HF_MODEL_ID") or "meta-llama/Meta-Llama-3.1-8B-Instruct",
    "Qwen/Qwen2.5-7B-Instruct",
    "HuggingFaceH4/zephyr-7b-beta",
]
# ---- Mini COICOP reference list (demo) ----
COICOP_ITEMS = [
    {"code": "01.1.4.5.1", "label": "Laits caillés, fromage blanc, petites crèmes fromagères"},
    {"code": "01.1.4.5.2", "label": "Fromage à pâte molle et à pâte persillée"},
    {"code": "01.1.4.5.3", "label": "Fromage à pâte pressée"},
    {"code": "01.1.4.5.4", "label": "Fromage de chèvre"},
    {"code": "01.1.4.5.5", "label": "Fromages fondus, râpés, portions"},
    {"code": "01.1.1.4", "label": "Pain"},
    {"code": "01.1.1.1", "label": "Riz"},
    {"code": "01.1.1.3", "label": "Pâtes, couscous et produits similaires"},
]

# ✅ Map code -> label (including a label for the generic code)
CODE_TO_LABEL = {it["code"]: it["label"] for it in COICOP_ITEMS}
CODE_TO_LABEL.setdefault("01.1.4.5", "Fromages (générique)")
def normalize_txt(s: str) -> str:
    if not s: return ""
    s = s.upper()
    s = "".join(c for c in unicodedata.normalize("NFD", s) if unicodedata.category(c) != "Mn")
    s = re.sub(r"[^A-Z0-9% ]+", " ", s)
    s = re.sub(r"\s+", " ", s).strip()
    return s

def ean_check_digit_ok(ean: str) -> bool:
    digits = re.sub(r"\D", "", ean)
    if len(digits) not in (8, 12, 13, 14): return False
    total = 0
    for i, ch in enumerate(reversed(digits[:-1]), start=1):
        n = int(ch); total += n * (3 if i % 2 == 1 else 1)
    check = (10 - (total % 10)) % 10
    return check == int(digits[-1])
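# Worked example (illustrative only, not used by the pipeline): for the EAN-13 "4006381333931",
# the first 12 digits weighted 3,1,3,1,... from the right sum to 89, so the expected check digit is
# (10 - 89 % 10) % 10 = 1, which matches the last digit. The ValidateEANTool below applies the
# same GS1 mod-10 rule.
#   ean_check_digit_ok("4006381333931")  # -> True
#   ean_check_digit_ok("4006381333930")  # -> False (wrong check digit)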
# ---- ValidateEANTool ----
class ValidateEANTool(Tool):
    name, description = "validate_ean", "Valide un EAN/GTIN (clé GS1)."
    inputs = {"ean": {"type": "string", "description": "Code EAN/GTIN (8/12/13/14 chiffres)."}}
    output_type = "object"

    def forward(self, ean: str):
        digits = re.sub(r"\D", "", ean or "")
        if len(digits) not in (8, 12, 13, 14):
            return {"valid": False, "normalized": digits}
        total = 0
        for i, ch in enumerate(reversed(digits[:-1]), start=1):
            n = int(ch); total += n * (3 if i % 2 == 1 else 1)
        check = (10 - (total % 10)) % 10
        return {"valid": check == int(digits[-1]), "normalized": digits}
# ---- OFFByEAN ----
class OFFByEAN(Tool):
    name = "openfoodfacts_product_by_ean"
    description = "Open Food Facts /api/v0|v2/product/{ean} (name, brands, categories...)."
    inputs = {"ean": {"type": "string", "description": "EAN à interroger sur l'API OFF."}}
    output_type = "object"
    requirements = ["requests"]

    def forward(self, ean: str):
        import re, json
        from requests.adapters import HTTPAdapter
        try:
            from urllib3.util.retry import Retry
        except Exception:
            Retry = None

        def _to_list(x):
            if x is None: return []
            if isinstance(x, list): return [str(t).strip() for t in x if str(t).strip()]
            if isinstance(x, str):
                return [p.strip() for p in re.split(r"[,\|;]", x) if p.strip()]
            return [str(x).strip()]

        def _first(*vals):
            for v in vals:
                if isinstance(v, str) and v.strip(): return v.strip()
            return ""

        code = re.sub(r"\D", "", ean or "")
        if not code:
            return {"ok": False, "status": 0, "code": "", "error": "EAN vide"}

        sess = requests.Session()
        sess.headers.update({"User-Agent": "insee-coicop-agent/1.0", "Accept": "application/json"})
        if Retry:
            retry = Retry(total=3, backoff_factor=0.5, status_forcelist=[429, 500, 502, 503, 504],
                          allowed_methods=frozenset(["GET"]), raise_on_status=False)
            sess.mount("https://", HTTPAdapter(max_retries=retry))

        urls = [
            f"https://world.openfoodfacts.org/api/v0/product/{code}.json",
            "https://world.openfoodfacts.org/api/v2/product/"
            f"{code}?lc=fr&fields=code,product_name,product_name_fr,brands,labels_tags,"
            "categories_tags,categories_tags_fr,categories_hierarchy,ingredients,ingredients_text,"
            "ingredients_text_fr,ingredients_text_en,allergens,allergens_tags,traces,traces_tags,"
            "stores,status,status_verbose",
            f"https://world.openfoodfacts.net/api/v0/product/{code}.json",
        ]

        last_err = None
        for u in urls:
            try:
                r = sess.get(u, timeout=15)
                if not r.ok:
                    last_err = f"HTTP {r.status_code}"
                    continue
                data = r.json()
                product = data.get("product")
                status = data.get("status", 1 if product else 0)
                if status == 1 or product:
                    p = product or {}
                    product_name = _first(p.get("product_name_fr"), p.get("product_name"))
                    categories_tags = p.get("categories_tags_fr") or p.get("categories_tags") or p.get("categories")
                    categories_tags = _to_list(categories_tags)
                    categories_hierarchy = _to_list(p.get("categories_hierarchy"))
                    # Ingredients: free text + structured list
                    ingredients_text = _first(p.get("ingredients_text_fr"), p.get("ingredients_text_en"), p.get("ingredients_text"))
                    ingredients_list = []
                    if isinstance(p.get("ingredients"), list):
                        for it in p["ingredients"]:
                            txt = it.get("text") or it.get("id") or ""
                            if txt: ingredients_list.append(str(txt).strip())
                    allergens = _first(p.get("allergens"), None)
                    allergens_tags = _to_list(p.get("allergens_tags"))
                    traces = _first(p.get("traces"), None)  # e.g. "lait, noisettes"
                    traces_tags = _to_list(p.get("traces_tags"))
                    labels_tags = _to_list(p.get("labels_tags"))
                    brands = _first(p.get("brands"), None)
                    stores = _first(p.get("stores"), None)
                    return {
                        "ok": True, "status": status, "status_verbose": data.get("status_verbose"),
                        "code": code, "used_url": u,
                        "product_name": product_name,
                        "categories_tags": categories_tags,
                        "categories_hierarchy": categories_hierarchy,
                        "ingredients_text": ingredients_text,
                        "ingredients_list": ingredients_list,
                        "allergens": allergens,
                        "allergens_tags": allergens_tags,
                        "traces": traces,
                        "traces_tags": traces_tags,
                        "labels_tags": labels_tags,
                        "brands": brands, "brands_list": _to_list(brands),
                        "stores": stores, "stores_list": _to_list(stores),
                        # Inputs already packaged for step 3
                        "step3_inputs": {
                            "product_name": product_name,
                            "categories_tags": categories_tags,
                            "ingredients_text": ingredients_text,
                            "ingredients_list": ingredients_list,
                            "traces": traces,
                            "traces_tags": traces_tags,
                        },
                    }
            except Exception as e:
                last_err = str(e)
        return {"ok": False, "status": 0, "code": code, "error": last_err or "not found"}
# ---- RegexCOICOP ----
class RegexCOICOP(Tool):
    name, description = "coicop_regex_rules", "Règles regex → candidats COICOP."
    inputs = {"text": {"type": "string", "description": "Libellé produit (texte libre) à analyser."}}
    output_type = "object"

    import re as _re
    SOFT = _re.compile(r"(?:\b|^)(?:CAMEMB(?:ERT)?|BRIE|COULOMMI(?:ERS?)?|BLEU|ROQUEFORT|GORGONZOLA|REBLOCHON|MUNSTER)(?:\b|$)")
    PRESS = _re.compile(r"(?:\b|^)(EMMENTAL|COMTE|CANTAL|MIMOLETTE|GOUDA|EDAM|BEAUFORT|ABONDANCE|SALERS|TOMME|TOME)(?:\b|$)")
    GOAT = _re.compile(r"(?:\b|^)(CHEVRE|STE MAURE|CROTTIN|BUCHE|PICODON|PELARDON|BANON)(?:\b|$)")
    PROC = _re.compile(r"(?:\b|^)(FONDU(?:ES?)?|FROMAGE FONDU|TOASTINETTES?|VACHE QUI RIT|KIRI|CARRE FRAIS|CARR[ÉE] FRAIS|PORTIONS?)(?:\b|$)|\bRAP[ÉE]?\b")

    def _normalize_txt(self, s: str) -> str:
        import unicodedata, re
        if not s: return ""
        s = s.upper()
        s = "".join(c for c in unicodedata.normalize("NFD", s) if unicodedata.category(c) != "Mn")
        s = re.sub(r"[^A-Z0-9% ]+", " ", s)
        return re.sub(r"\s+", " ", s).strip()

    def forward(self, text: str):
        import re
        s = self._normalize_txt(text); c = []
        if self.SOFT.search(s): c.append({"code": "01.1.4.5.2", "why": "pâte molle/persillée", "score": 0.95})
        if self.PRESS.search(s): c.append({"code": "01.1.4.5.3", "why": "pâte pressée", "score": 0.90})
        if self.GOAT.search(s): c.append({"code": "01.1.4.5.4", "why": "chèvre", "score": 0.90})
        if self.PROC.search(s): c.append({"code": "01.1.4.5.5", "why": "fondu/râpé/portions", "score": 0.85})
        if not c and re.search(r"\bFROMAGE\b", s): c.append({"code": "01.1.4.5", "why": "générique fromage/laits caillés", "score": 0.6})
        if not c and re.search(r"\bCR[ÉE]MEUX\b", s): c.append({"code": "01.1.4.5.1", "why": "mot-clé 'crémeux' (laits caillés/crèmes fromagères)", "score": 0.55})
        return {"candidates": c}
# ---- OFFtoCOICOP ----
class OFFtoCOICOP(Tool):
    name, description = "map_off_to_coicop", "Mappe catégories OFF vers COICOP (off_payload ou champs séparés)."
    inputs = {
        "product_name": {"type": "string", "description": "Nom produit OFF (fr/en).", "nullable": True},
        "categories_tags": {"type": "array", "description": "Liste OFF categories_tags.", "nullable": True},
        "ingredients_text": {"type": "string", "description": "Texte ingrédients.", "nullable": True},
        "ingredients_list": {"type": "array", "description": "Liste structurée des ingrédients (strings).", "nullable": True},
        "traces": {"type": "string", "description": "Champ traces (fr).", "nullable": True},
        "traces_tags": {"type": "array", "description": "Tags de traces.", "nullable": True},
        # 🔧 IMPORTANT: an object is allowed here (dict or string)
        "off_payload": {"type": "object", "description": "Sortie brute de l'étape 2 (dict OU string).", "nullable": True},
    }
    output_type = "object"

    import re as _re, json as _json, ast as _ast

    def _normalize_txt(self, s: str) -> str:
        import unicodedata, re
        if not s: return ""
        s = s.upper()
        s = "".join(c for c in unicodedata.normalize("NFD", s) if unicodedata.category(c) != "Mn")
        s = re.sub(r"[^A-Z0-9% ]+", " ", s)
        return re.sub(r"\s+", " ", s).strip()

    def _to_list(self, x):
        import re
        if x is None: return []
        if isinstance(x, list): return [str(t).strip() for t in x if str(t).strip()]
        if isinstance(x, str): return [p.strip() for p in re.split(r"[,\|;]", x) if p.strip()]
        return [str(x).strip()]

    def _safe_parse(self, x):
        # Accept a dict as-is; otherwise try JSON, then literal_eval
        if isinstance(x, dict): return x
        if not isinstance(x, str): return {}
        try: return self._json.loads(x)
        except Exception:
            try: return self._ast.literal_eval(x)
            except Exception: return {}

    # --- keyword patterns per cheese family
    SOFT = _re.compile(r"\b(CAMEMBERT|BRIE|COULOMMIERS|BLUE CHEESE|ROQUEFORT|GORGONZOLA|MUNSTER|REBLOCHON)\b")
    PRESS = _re.compile(r"\b(EMMENTAL|COMTE|CANTAL|MIMOLETTE|GOUDA|EDAM|BEAUFORT|ABONDANCE|SALERS|TOMME|TOME)\b")
    GOAT = _re.compile(r"\b(CHEVRE|CH[ÈE]VRE|STE MAURE|CROTTIN|BUCHE|BUCHETTE|PICODON|PELARDON|BANON)\b")
    PROC = _re.compile(r"\b(FONDU|FONDUES?|RAPE|RÂPE|PORTIONS?|KIRI|VACHE QUI RIT|CARRE FRAIS|CARR[ÉE] FRAIS|TOASTINETTES?)\b")
    GENERIC_FROMAGE = _re.compile(r"\bFROMAGE[S]?\b")
    CREMEUX = _re.compile(r"\bCR[ÉE]MEUX\b")
    EN_CHEESE = _re.compile(r"\bCHEESE(S)?\b")

    # --- removal of "traces" clauses
    _TRACES_BLOCK = _re.compile(
        r"(PEUT\s+CONTENIR\s+DES\s+TRACES\s+DE\s+[^.;\)\]]+)|"
        r"(MAY\s+CONTAIN\s+TRACES\s+OF\s+[^.;\)\]]+)|"
        r"(\bTRACES?\s+DE\s+[^.;\)\]]+)",
        _re.I
    )

    def _without_traces(self, s: str) -> str:
        if not s: return ""
        return self._TRACES_BLOCK.sub(" ", s)

    def _mk(self, code, base, why, source):
        boost = {"name": 0.05, "cat": 0.04, "ing_no_traces": 0.03, "ing": 0.01}.get(source, 0.0)
        return {"code": code, "score": round(base + boost, 4), "why": f"{why} (source:{source})"}

    def _pad_min3(self, ranked, hint_is_cheese=False):
        # Deterministic padding to guarantee >= 3 candidates without duplicates
        fallback_order = ["01.1.4.5.2", "01.1.4.5.3", "01.1.4.5.5", "01.1.4.5.1", "01.1.4.5"]
        present = {r["code"] for r in ranked}
        for code in fallback_order:
            if len(ranked) >= 3: break
            if code in present: continue
            why = "fallback générique fromage" if hint_is_cheese else "fallback faible (peu d'indices)"
            base = 0.52 if hint_is_cheese else 0.48
            ranked.append({"code": code, "score": base, "why": why})
            present.add(code)
        return ranked[:3]
    def forward(self, product_name=None, categories_tags=None, ingredients_text=None,
                ingredients_list=None, traces=None, traces_tags=None, off_payload=None):
        # Hydrate from off_payload if needed (dict OR string), including step3_inputs
        if off_payload and not (product_name or categories_tags or ingredients_text or ingredients_list or traces or traces_tags):
            data = self._safe_parse(off_payload) or {}
            step3 = data.get("step3_inputs") or {}
            product_name = data.get("product_name") or step3.get("product_name") or ""
            categories_tags = self._to_list(data.get("categories_tags") or step3.get("categories_tags"))
            ingredients_text = data.get("ingredients_text") or step3.get("ingredients_text") or ""
            ingredients_list = self._to_list(data.get("ingredients_list"))
            traces = data.get("traces") or step3.get("traces") or ""
            traces_tags = self._to_list(data.get("traces_tags") or step3.get("traces_tags"))

        # Normalization
        name = self._normalize_txt(product_name or "")
        cats_raw = " ".join(self._to_list(categories_tags))
        cats = self._normalize_txt(cats_raw)
        ingt = self._normalize_txt(ingredients_text or "")
        ingt_no_tr = self._normalize_txt(self._without_traces(ingredients_text or ""))
        ing_list = [self._normalize_txt(x) for x in self._to_list(ingredients_list)]
        ing_join = " ".join(ing_list)
        ing_join_no_tr = self._normalize_txt(self._without_traces(ing_join))

        # Broad "cheese" hint
        hint_is_cheese = (
            bool(self.GENERIC_FROMAGE.search(name) or self.GENERIC_FROMAGE.search(cats) or self.EN_CHEESE.search(cats))
            or ("EN:CHEESES" in cats or "FR:FROMAGES" in cats or "FROMAGES" in cats)
        )

        c = []
        # 1) Product name & categories (strong signal)
        if self.SOFT.search(name) or self.SOFT.search(cats):
            c.append(self._mk("01.1.4.5.2", 0.90, "OFF: pâte molle/persillée", "name" if self.SOFT.search(name) else "cat"))
        if self.PRESS.search(name) or self.PRESS.search(cats):
            c.append(self._mk("01.1.4.5.3", 0.87, "OFF: pâte pressée", "name" if self.PRESS.search(name) else "cat"))
        if self.GOAT.search(name) or self.GOAT.search(cats):
            c.append(self._mk("01.1.4.5.4", 0.88, "OFF: chèvre", "name" if self.GOAT.search(name) else "cat"))
        if self.PROC.search(name) or self.PROC.search(cats):
            c.append(self._mk("01.1.4.5.5", 0.86, "OFF: fondu/râpé/portions", "name" if self.PROC.search(name) else "cat"))
        # 2) Ingredients WITHOUT "traces" clauses (medium signal)
        if self.SOFT.search(ingt_no_tr) or self.SOFT.search(ing_join_no_tr):
            c.append(self._mk("01.1.4.5.2", 0.84, "Ingrédients (sans traces): pâte molle/persillée", "ing_no_traces"))
        if self.PRESS.search(ingt_no_tr) or self.PRESS.search(ing_join_no_tr):
            c.append(self._mk("01.1.4.5.3", 0.82, "Ingrédients (sans traces): pâte pressée", "ing_no_traces"))
        if self.GOAT.search(ingt_no_tr) or self.GOAT.search(ing_join_no_tr):
            c.append(self._mk("01.1.4.5.4", 0.83, "Ingrédients (sans traces): chèvre", "ing_no_traces"))
        if self.PROC.search(ingt_no_tr) or self.PROC.search(ing_join_no_tr):
            c.append(self._mk("01.1.4.5.5", 0.80, "Ingrédients (sans traces): fondu/râpé/portions", "ing_no_traces"))
        # 3) Raw ingredients (weak signal; no goat trigger here)
        if self.SOFT.search(ingt) or self.SOFT.search(ing_join):
            c.append(self._mk("01.1.4.5.2", 0.78, "Ingrédients: pâte molle/persillée", "ing"))
        if self.PRESS.search(ingt) or self.PRESS.search(ing_join):
            c.append(self._mk("01.1.4.5.3", 0.76, "Ingrédients: pâte pressée", "ing"))
        if self.PROC.search(ingt) or self.PROC.search(ing_join):
            c.append(self._mk("01.1.4.5.5", 0.74, "Ingrédients: fondu/râpé/portions", "ing"))
        # 4) Generic candidates if nothing obvious
        if not c and (hint_is_cheese or self.GENERIC_FROMAGE.search(name) or self.GENERIC_FROMAGE.search(cats) or self.CREMEUX.search(name)):
            # propose the generic cheese code + 2 likely families
            c.extend([
                {"code": "01.1.4.5", "score": 0.62, "why": "OFF: générique fromage"},
                {"code": "01.1.4.5.2", "score": 0.60, "why": "fallback fromage (molle/persillée)"},
                {"code": "01.1.4.5.3", "score": 0.59, "why": "fallback fromage (pressée)"},
            ])

        # Deduplicate / aggregate
        bucket = {}
        for ci in c:
            code = ci["code"]
            if code not in bucket:
                bucket[code] = {**ci, "why_list": [ci.get("why", "")]}
            else:
                if ci["score"] > bucket[code]["score"]:
                    bucket[code].update({"score": ci["score"], "why": ci.get("why", "")})
                bucket[code]["why_list"].append(ci.get("why", ""))
        ranked = sorted(bucket.values(), key=lambda x: x["score"], reverse=True)
        # 🎯 Always AT LEAST 3 candidates (padded if necessary)
        if len(ranked) < 3:
            ranked = self._pad_min3(ranked, hint_is_cheese=hint_is_cheese)
        return {"candidates": ranked[:3]}
# ---- SemSim ----
class SemSim(Tool):
    name, description = "coicop_semantic_similarity", "Embeddings → top-k COICOP."
    inputs = {"text": {"type": "string", "description": "Texte libellé"},
              "topk": {"type": "integer", "description": "Nombre de candidats (défaut 5)", "nullable": True}}
    output_type = "object"
    requirements = ["sentence_transformers", "torch"]
    COICOP_ITEMS = COICOP_ITEMS

    def _normalize_txt(self, s: str) -> str:
        import unicodedata, re
        if not s: return ""
        s = s.upper()
        s = "".join(c for c in unicodedata.normalize("NFD", s) if unicodedata.category(c) != "Mn")
        s = re.sub(r"[^A-Z0-9% ]+", " ", s)
        return re.sub(r"\s+", " ", s).strip()

    def forward(self, text: str, topk: int = 5):
        # Lazily load and cache the embedding model on first use
        if not hasattr(self, "_model"):
            self._model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")
        q = self._normalize_txt(text)
        q_emb = self._model.encode([q], normalize_embeddings=True)
        labels = [f"{it['code']} {it['label']}" for it in self.COICOP_ITEMS]
        L = self._model.encode(labels, normalize_embeddings=True)
        sims = util.cos_sim(q_emb, L).tolist()[0]
        ranked = sorted(
            [{"code": self.COICOP_ITEMS[i]["code"], "label": self.COICOP_ITEMS[i]["label"], "score": float(sims[i])}
             for i in range(len(self.COICOP_ITEMS))],
            key=lambda x: x["score"], reverse=True
        )
        return {"candidates": ranked[:max(1, int(topk or 5))]}
# ---- Web tools (search & fetch) ----
class WebSearch(Tool):
    name = "web_search"
    description = "Recherche web légère (DuckDuckGo HTML). Entrée: query (fr/en). Retour: top résultats avec titre, url, snippet."
    inputs = {"query": {"type": "string", "description": "Requête de recherche web."}}
    output_type = "object"
    requirements = ["requests"]

    def forward(self, query: str):
        import html
        sess = requests.Session()
        sess.headers.update({"User-Agent": "insee-coicop-agent/1.0"})
        try:
            r = sess.get("https://duckduckgo.com/html/", params={"q": query, "kl": "fr-fr"}, timeout=15)
            r.raise_for_status()
        except Exception as e:
            return {"ok": False, "error": str(e), "results": []}
        # Very simple parsing, with no heavy dependency
        text = r.text
        # Results appear as <a class="result__a" href="...">Title</a>
        results = []
        for m in re.finditer(r'<a[^>]+class="result__a"[^>]+href="([^"]+)"[^>]*>(.*?)</a>', text, re.I | re.S):
            url = html.unescape(m.group(1))
            title = re.sub("<.*?>", "", html.unescape(m.group(2))).strip()
            # Snippet
            snip_m = re.search(r'<a[^>]+class="result__a"[^>]+href="{}"[^>]*>.*?</a>.*?<a[^>]+class="result__snippet"[^>]*>(.*?)</a>'.format(re.escape(m.group(1))), text, re.I | re.S)
            snippet = ""
            if snip_m:
                snippet = re.sub("<.*?>", "", html.unescape(snip_m.group(1))).strip()
            if title and url:
                results.append({"title": title, "url": url, "snippet": snippet})
            if len(results) >= 8:
                break
        return {"ok": True, "query": query, "results": results}
class WebGet(Tool):
    name = "web_get"
    description = "Télécharge une page web et renvoie un texte brut nettoyé (limité à ~50k chars)."
    inputs = {"url": {"type": "string", "description": "URL http(s) à lire."}}
    output_type = "object"
    requirements = ["requests", "beautifulsoup4"]

    def forward(self, url: str):
        import html
        text_out = ""
        try:
            r = requests.get(url, headers={"User-Agent": "insee-coicop-agent/1.0"}, timeout=20)
            if not r.ok:
                return {"ok": False, "status": r.status_code, "url": url, "text": ""}
            content = r.text
            try:
                from bs4 import BeautifulSoup
                soup = BeautifulSoup(content, "html.parser")
                # strip scripts/styles/nav
                for tag in soup(["script", "style", "noscript", "header", "footer", "nav", "form", "aside"]):
                    tag.decompose()
                text_out = soup.get_text(separator=" ")
            except Exception:
                # raw fallback: strip tags with regexes
                text_out = re.sub(r"<script.*?</script>|<style.*?</style>", " ", content, flags=re.S | re.I)
                text_out = re.sub(r"<[^>]+>", " ", text_out)
            text_out = re.sub(r"\s+", " ", text_out).strip()
            if len(text_out) > 50000:
                text_out = text_out[:50000]
            return {"ok": True, "url": url, "text": text_out}
        except Exception as e:
            return {"ok": False, "url": url, "error": str(e), "text": ""}
# ---- MergeCandidatesTool ----
class MergeCandidatesTool(Tool):
    name = "merge_candidates"
    description = ("Fusionne des listes de candidats COICOP (dédupe par code, prend le score max, "
                   "agrège les justifs) et garantit min_k éléments avec padding neutre.")
    inputs = {
        "candidates_lists": {"type": "array", "description": "Liste de dicts {'candidates':[...]} venant d'autres outils."},
        "min_k": {"type": "integer", "description": "Taille minimale de la liste fusionnée (défaut 3).", "nullable": True},
        "fallback_bias": {"type": "string", "description": "Indice métier pour le padding (ex: 'cheese' ou '').", "nullable": True},
        "score_cap": {"type": "number", "description": "Clip des scores à [0, score_cap] (défaut 1.0).", "nullable": True},
    }
    output_type = "object"

    def forward(self, candidates_lists, min_k: int = 3, fallback_bias: str = "", score_cap: float = 1.0):
        # Normalize min_k once (the input is declared nullable, so it may arrive as None)
        min_k = max(1, int(min_k or 3))
        # 1) Collect
        if not isinstance(candidates_lists, list):
            return {"candidates": []}
        bucket = {}  # code -> {code, score, votes, why_list}
        for obj in candidates_lists:
            if not isinstance(obj, dict):
                continue
            for c in obj.get("candidates", []):
                code = c.get("code")
                if not code:
                    continue
                score = float(c.get("score", c.get("score_final", 0.0)))
                if score_cap is not None:
                    score = max(0.0, min(float(score_cap), score))
                why = c.get("why", "") or c.get("label", "")
                if code not in bucket:
                    bucket[code] = {"code": code, "score": score, "votes": 1, "why_list": [why] if why else []}
                else:
                    # Keep the best score, increment the votes, aggregate the reasons
                    if score > bucket[code]["score"]:
                        bucket[code]["score"] = score
                    bucket[code]["votes"] += 1
                    if why:
                        bucket[code]["why_list"].append(why)
        merged = list(bucket.values())
        # 2) Primary sort by score, then by votes
        merged.sort(key=lambda x: (x["score"], x["votes"]), reverse=True)

        # 3) Padding if fewer than min_k
        def _fallback_order(bias: str):
            # Neutral but reasonable order for cheeses (currently the same order regardless of bias)
            base = ["01.1.4.5.2", "01.1.4.5.3", "01.1.4.5.5", "01.1.4.5.1", "01.1.4.5"]
            return base if (bias or "").lower() == "cheese" else base

        if len(merged) < min_k:
            present = {m["code"] for m in merged}
            for code in _fallback_order(fallback_bias):
                if len(merged) >= min_k:
                    break
                if code in present:
                    continue
                merged.append({
                    "code": code,
                    "score": 0.5 if (fallback_bias or "").lower() == "cheese" else 0.48,
                    "votes": 0,
                    "why_list": ["padding fallback"]
                })
                present.add(code)
        # 4) Final shaping (synthetic "why")
        out = []
        for m in merged[:min_k]:
            why = ", ".join(sorted(set([w for w in m.get("why_list", []) if w])))
            if not why:
                why = "fusion (pas d'explications)"
            out.append({"code": m["code"], "score": m["score"], "votes": m["votes"], "why": why})
        return {"candidates": out}
# ---- Resolve ----
class Resolve(Tool):
    name, description = "resolve_coicop_candidates", "Fusionne candidats → choix final + alternatives + explication."
    inputs = {"json_lists": {"type": "array", "description": "Liste de JSON (str/dict) d'autres tools."},
              "topn": {"type": "integer", "description": "Nb d'alternatives (défaut 3)", "nullable": True}}
    output_type = "object"

    def _fallback_min3(self):
        # neutral order and modest scores (with labels)
        base = [
            {"code": "01.1.4.5.2", "label": CODE_TO_LABEL.get("01.1.4.5.2", ""),
             "score_final": 0.50, "votes": 0, "evidences": ["fallback (aucune évidence)"]},
            {"code": "01.1.4.5.3", "label": CODE_TO_LABEL.get("01.1.4.5.3", ""),
             "score_final": 0.49, "votes": 0, "evidences": ["fallback (aucune évidence)"]},
            {"code": "01.1.4.5.5", "label": CODE_TO_LABEL.get("01.1.4.5.5", ""),
             "score_final": 0.48, "votes": 0, "evidences": ["fallback (aucune évidence)"]},
        ]
        return base

    def forward(self, json_lists, topn: int = 3):
        import json
        from typing import Dict, Any
        bucket: Dict[str, Dict[str, Any]] = {}
        # Tolerate a bare list of candidate dicts instead of a list of {"candidates": [...]}
        if isinstance(json_lists, list) and json_lists and isinstance(json_lists[0], dict) and "code" in json_lists[0]:
            json_lists = [{"candidates": json_lists}]
        for s in json_lists:
            data = s
            if isinstance(s, str):
                try: data = json.loads(s)
                except Exception: data = {}
            if not isinstance(data, dict):
                continue
            for c in data.get("candidates", []):
                code = c.get("code")
                if not code:
                    continue
                score = float(c.get("score", c.get("score_final", 0.0)))
                why = c.get("why", "") or c.get("label", "")
                # ✅ label via the mapping (falls back to any label already present)
                label = CODE_TO_LABEL.get(code, c.get("label", ""))
                if code not in bucket:
                    bucket[code] = {
                        "code": code,
                        "label": label,
                        "score": score,
                        "votes": 1,
                        "evidences": [why] if why else []
                    }
                else:
                    bucket[code]["score"] = max(bucket[code]["score"], score)
                    bucket[code]["votes"] += 1
                    if why:
                        bucket[code]["evidences"].append(why)
                    # keep a label if one was missing
                    if not bucket[code].get("label"):
                        bucket[code]["label"] = label
        if not bucket:
            # 🔁 Global fallback if nothing at all could be aggregated (with labels)
            ranked = self._fallback_min3()
            final = ranked[0]
            alts = ranked[1:]
            exp = "Aucun candidat issu des outils; retour d’un fallback générique (aucune évidence trouvée)."
            return {"final": final, "alternatives": alts, "candidates_top": ranked, "explanation": exp}
        # Final score: best raw score plus a small bonus per extra vote
        for v in bucket.values():
            v["score_final"] = v["score"] + 0.05 * (v["votes"] - 1)
        ranked = sorted(bucket.values(), key=lambda x: x["score_final"], reverse=True)
        # Merged top list: always at least 3 entries
        min_top = max(3, topn if isinstance(topn, int) and topn > 0 else 3)
        if len(ranked) < min_top:
            # top up with a small fallback, without duplicates (with labels)
            already = {r["code"] for r in ranked}
            for fb in self._fallback_min3():
                if len(ranked) >= min_top:
                    break
                if fb["code"] in already:
                    continue
                ranked.append(fb)
        # Final selection
        final = ranked[0]
        alts = ranked[1:min_top]
        # Ensure a label is present (does not affect the scoring)
        final.setdefault("label", CODE_TO_LABEL.get(final["code"], ""))
        for a in alts:
            a.setdefault("label", CODE_TO_LABEL.get(a["code"], ""))
        ev = final.get("evidences", [])
        exp = (
            f"Choix {final['code']} (score {final['score_final']:.2f}) – votes={final.get('votes',0)} – raisons: {', '.join(sorted(set(ev)))}"
            if ev else
            f"Choix {final['code']} (score {final['score_final']:.2f}) – fallback partiel."
        )
        # candidates_top with guaranteed labels
        candidates_top = []
        for r in ranked[:min_top]:
            r.setdefault("label", CODE_TO_LABEL.get(r["code"], ""))
            candidates_top.append(r)
        return {"final": final, "alternatives": alts, "candidates_top": candidates_top, "explanation": exp}
# ---- build_agent ----
def build_agent(model_id: str | None = None) -> CodeAgent:
    mid = model_id or FALLBACK_MODELS[0]
    model = InferenceClientModel(
        model_id=mid,
        temperature=0.2,
        max_tokens=HF_MAX_TOKENS,
        timeout=HF_TIMEOUT,   # ⬅️ increased timeout
        top_p=0.95,
    )
    agent = CodeAgent(
        tools=[ValidateEANTool(), OFFByEAN(), RegexCOICOP(), OFFtoCOICOP(), SemSim(),
               WebSearch(), WebGet(),
               MergeCandidatesTool(), Resolve()],
        model=model,
        add_base_tools=False,
        max_steps=AGENT_MAX_STEPS,  # ⬅️ fewer steps = fewer tokens and lower latency
        verbosity_level=1,          # ⬅️ shorter logs = fewer output tokens
    )
    return agent
# ---- run task with fallback ----
def run_task_with_fallback(task: str):
    errors = []
    for mid in [m for m in FALLBACK_MODELS if m]:
        try:
            agent = build_agent(mid)
            return agent.run(task)
        except Exception as e:
            errors.append(f"{mid}: {type(e).__name__}: {e}")
            # try the next model
            continue
    # If every model failed, return a clean JSON object rather than crashing
    return {
        "final": None,
        "alternatives": [],
        "candidates_top": [],
        "explanation": "LLM backend indisponible (timeouts).",
        "errors": errors,
    }

def parse_result(res):
    # The agent may return a dict or a dict-like string; fall back to the raw value
    if isinstance(res, dict): return res
    try: return ast.literal_eval(res)
    except Exception: return {"raw": res}
if __name__ == "__main__":
    ean = "3256221112345"  # fictional EAN
    label = "Les p'tits crémeux – Aldi – 216 g"
    agent = build_agent()
    task = f"""\
Classe ce produit en COICOP:
EAN: {ean}
Libellé: {label}
Outils autorisés :
- validate_ean
- openfoodfacts_product_by_ean
- map_off_to_coicop
- coicop_regex_rules
- coicop_semantic_similarity
- merge_candidates
- resolve_coicop_candidates
- python_interpreter  # UNIQUEMENT pour lignes simples d’assignation ou d’appel d’outil
Règles STRICTES d’écriture de code :
- Aucune structure de contrôle Python : pas de if, else, for, while, try, with, def, class.
- Aucun print, aucun logging, aucune concaténation multi-ligne.
- Chaque bloc de code contient une seule instruction Python, sur une seule ligne.
- Commencer par définir deux variables :
  1) EAN_STR = "{ean}"
  2) LBL = \"\"\"{label}\"\"\"
- Pour tous les outils qui prennent le libellé, utiliser LBL.
- La fonction validate_ean renvoie un dictionnaire avec les clés 'valid' et 'normalized'. Ne pas la traiter comme un booléen directement.
Règles STRICTES de sortie :
- Terminer par un unique objet JSON valide en appelant final_answer avec cet objet.
- Ne pas ajouter de texte en dehors de l’objet JSON final.
- Ne pas utiliser de backticks.
- Le JSON final doit contenir les clés : final, alternatives, candidates_top, explanation.
Branchements (décision prise sans écrire de if en code) :
- MODE AVEC EAN si EAN_STR n’est pas "N/A" ET si validate_ean(EAN_STR) renvoie valid = True ET si l’appel OpenFoodFacts renvoie ok = True.
- Sinon, MODE SANS EAN.
Pipeline — MODE AVEC EAN :
1) v = validate_ean(EAN_STR)
2) off = openfoodfacts_product_by_ean(EAN_STR)
3) offmap = map_off_to_coicop(off_payload=off)
4) rx = coicop_regex_rules(text=LBL)
5) sem = coicop_semantic_similarity(text=LBL, topk=5)
6) merged = merge_candidates(candidates_lists=[offmap, rx, sem], min_k=3, fallback_bias="cheese")
7) res = resolve_coicop_candidates(json_lists=[merged], topn=3)
→ Appeler immédiatement final_answer avec res.
Pipeline — MODE SANS EAN :
1) rx = coicop_regex_rules(text=LBL)
2) sem = coicop_semantic_similarity(text=LBL, topk=5)
3) merged = merge_candidates(candidates_lists=[rx, sem], min_k=3, fallback_bias="cheese")
4) res = resolve_coicop_candidates(json_lists=[merged], topn=3)
→ Appeler immédiatement final_answer avec res.
Contraintes d’usage :
- Utiliser python_interpreter uniquement pour des lignes uniques d’assignation ou d’appel d’outil (ex: var = tool(args) ou tool(args)).
- Ne créer aucun fichier et ne faire aucune entrée/sortie externe.
"""
    # out = agent.run(task)
    out = run_task_with_fallback(task)
    print(parse_result(out))