Marylene committed on
Commit
d2e8069
·
verified ·
1 Parent(s): a0d1aaa

Update quick_deploy_agent.py

Browse files
Files changed (1) hide show
  1. quick_deploy_agent.py +351 -425
quick_deploy_agent.py CHANGED
@@ -1,426 +1,352 @@
1
- # quickstart_agent.py
2
- from __future__ import annotations
3
- import json, re, unicodedata, ast
4
- from typing import List, Dict, Any, Optional
5
- import requests
6
- from smolagents import Tool, CodeAgent, InferenceClientModel
7
- from sentence_transformers import SentenceTransformer, util
8
-
9
- # ---- Mini référentiel COICOP (démo) ----
10
- COICOP_ITEMS = [
11
- {"code": "01.1.4.5.1", "label": "Laits caillés, fromage blanc, petites crèmes fromagères"},
12
- {"code": "01.1.4.5.2", "label": "Fromage à pâte molle et à pâte persillée"},
13
- {"code": "01.1.4.5.3", "label": "Fromage à pâte pressée"},
14
- {"code": "01.1.4.5.4", "label": "Fromage de chèvre"},
15
- {"code": "01.1.4.5.5", "label": "Fromages fondus, râpés, portions"},
16
- {"code": "01.1.1.4", "label": "Pain"},
17
- {"code": "01.1.1.1", "label": "Riz"},
18
- {"code": "01.1.1.3", "label": "Pâtes, couscous et produits similaires"},
19
- ]
20
-
21
- def normalize_txt(s: str) -> str:
22
- if not s: return ""
23
- s = s.upper()
24
- s = "".join(c for c in unicodedata.normalize("NFD", s) if unicodedata.category(c) != "Mn")
25
- s = re.sub(r"[^A-Z0-9% ]+", " ", s)
26
- s = re.sub(r"\s+", " ", s).strip()
27
- return s
28
-
29
- def ean_check_digit_ok(ean: str) -> bool:
30
- digits = re.sub(r"\D", "", ean)
31
- if len(digits) not in (8, 12, 13, 14): return False
32
- total = 0
33
- for i, ch in enumerate(reversed(digits[:-1]), start=1):
34
- n = int(ch); total += n * (3 if i % 2 == 1 else 1)
35
- check = (10 - (total % 10)) % 10
36
- return check == int(digits[-1])
37
-
38
- # ---- ValidateEANTool : tout en local dans forward ----
39
- class ValidateEANTool(Tool):
40
- name, description = "validate_ean", "Valide un EAN/GTIN (clé GS1)."
41
- inputs = {"ean": {"type": "string", "description": "Code EAN/GTIN (8/12/13/14 chiffres)."}}
42
- output_type = "dict" # <— (facultatif)
43
-
44
- def forward(self, ean: str):
45
- import re
46
- digits = re.sub(r"\D", "", ean or "")
47
- if len(digits) not in (8, 12, 13, 14):
48
- return {"valid": False, "normalized": digits}
49
- total = 0
50
- for i, ch in enumerate(reversed(digits[:-1]), start=1):
51
- n = int(ch); total += n * (3 if i % 2 == 1 else 1)
52
- check = (10 - (total % 10)) % 10
53
- return {"valid": check == int(digits[-1]), "normalized": digits}
54
-
55
- # ---- OFFtoCOICOP : accepte off_payload (JSON brut) OU champs séparés ----
56
- class OFFtoCOICOP(Tool):
57
- name, description = "map_off_to_coicop", "Mappe catégories OFF vers COICOP (off_payload ou champs séparés)."
58
- inputs = {
59
- "product_name": {"type":"string", "description":"Nom produit OFF (fr/en).", "nullable": True},
60
- "categories_tags": {"type":"array", "description":"Liste OFF categories_tags.", "nullable": True},
61
- "ingredients_text":{"type":"string","description":"Texte ingrédients.", "nullable": True},
62
- # 👇 NOUVEAU : on peut passer directement la chaîne renvoyée par openfoodfacts_product_by_ean
63
- "off_payload": {"type":"string","description":"Chaîne JSON brute renvoyée par l'étape 2.", "nullable": True},
64
- }
65
- output_type="string"
66
-
67
- # --- utilitaires locaux (pas d'import global pour faciliter l'export Hub) ---
68
- import re as _re, json as _json, ast as _ast
69
- def _normalize_txt(self, s: str) -> str:
70
- import unicodedata, re
71
- if not s: return ""
72
- s = s.upper()
73
- s = "".join(c for c in unicodedata.normalize("NFD", s) if unicodedata.category(c) != "Mn")
74
- s = re.sub(r"[^A-Z0-9% ]+", " ", s)
75
- return re.sub(r"\s+", " ", s).strip()
76
- def _to_list(self, x):
77
- import re
78
- if x is None: return []
79
- if isinstance(x, list): return [str(t).strip() for t in x if str(t).strip()]
80
- if isinstance(x, str):
81
- return [p.strip() for p in re.split(r"[,\|;]", x) if p.strip()]
82
- return [str(x).strip()]
83
- def _safe_parse(self, s):
84
- try:
85
- return self._json.loads(s)
86
- except Exception:
87
- try:
88
- return self._ast.literal_eval(s)
89
- except Exception:
90
- return {}
91
-
92
- # --- règles inchangées ---
93
- RULES = [
94
- (_re.compile(r"\b(CAMEMBERT|BRIE|COULOMMIERS|BLUE CHEESE|ROQUEFORT|GORGONZOLA|MUNSTER|REBLOCHON)\b"),
95
- ("01.1.4.5.2",0.95,"OFF: pâte molle/persillée")),
96
- (_re.compile(r"\b(EMMENTAL|COMTE|CANTAL|MIMOLETTE|GOUDA|EDAM|BEAUFORT|ABONDANCE|SALERS|TOMME|TOME)\b"),
97
- ("01.1.4.5.3",0.90,"OFF: pâte pressée")),
98
- (_re.compile(r"\b(CHEVRE|STE MAURE|CROTTIN|BUCHE|PICODON|PELARDON|BANON)\b"),
99
- ("01.1.4.5.4",0.90,"OFF: chèvre")),
100
- (_re.compile(r"\b(FONDU|FONDUES?|RAPE|RÂPE|PORTIONS?|KIRI|VACHE QUI RIT|CARRE FRAIS|CARR[ÉE] FRAIS)\b"),
101
- ("01.1.4.5.5",0.85,"OFF: fondu/rapé/portions")),
102
- ]
103
-
104
- def forward(self, product_name=None, categories_tags=None, ingredients_text=None, off_payload=None) -> str:
105
- import json
106
- # 1) Si off_payload est fourni, on parse et on remplit les champs
107
- if off_payload and not (product_name or categories_tags or ingredients_text):
108
- data = self._safe_parse(off_payload) or {}
109
- # supporte aussi notre OFFByEAN normalisé (ok/product_name/…)
110
- p_name = data.get("product_name")
111
- c_tags = data.get("categories_tags")
112
- ingr = data.get("ingredients_text")
113
- product_name = p_name if isinstance(p_name, str) else ""
114
- categories_tags = self._to_list(c_tags)
115
- ingredients_text= ingr if isinstance(ingr, str) else ""
116
-
117
- # 2) Normalisation texte
118
- text = " ".join([t for t in [
119
- self._normalize_txt(product_name or ""),
120
- self._normalize_txt(" ".join(self._to_list(categories_tags))),
121
- self._normalize_txt(ingredients_text or "")
122
- ] if t])
123
-
124
- # 3) Application des règles
125
- c=[]
126
- for rx,(code,score,why) in self.RULES:
127
- if rx.search(text): c.append({"code":code,"why":why,"score":score})
128
-
129
- return json.dumps({"candidates": c})
130
-
131
- # ---- OFFByEAN : robuste + sortie normalisée + step3_inputs ----
132
- class OFFByEAN(Tool):
133
- name = "openfoodfacts_product_by_ean"
134
- description = "Open Food Facts /api/v0|v2/product/{ean} (name, brands, categories...)."
135
- inputs = {"ean": {"type": "string", "description": "EAN à interroger sur l'API OFF."}}
136
- output_type = "dict"
137
- requirements = ["requests"]
138
-
139
- def forward(self, ean: str):
140
- import re, requests
141
- from requests.adapters import HTTPAdapter
142
- try:
143
- from urllib3.util.retry import Retry
144
- except Exception:
145
- Retry = None
146
-
147
- def _to_list(x):
148
- if x is None: return []
149
- if isinstance(x, list): return [str(t).strip() for t in x if str(t).strip()]
150
- if isinstance(x, str):
151
- return [p.strip() for p in re.split(r"[,\|;]", x) if p.strip()]
152
- return [str(x).strip()]
153
-
154
- def _first(*vals):
155
- for v in vals:
156
- if isinstance(v, str) and v.strip(): return v.strip()
157
- return ""
158
-
159
- code = re.sub(r"\D", "", ean or "")
160
- if not code:
161
- return {"ok": False, "status": 0, "code": "", "error": "EAN vide"}
162
-
163
- sess = requests.Session()
164
- sess.headers.update({"User-Agent":"insee-coicop-agent/1.0","Accept":"application/json"})
165
- if Retry:
166
- retry = Retry(total=3, backoff_factor=0.5, status_forcelist=[429,500,502,503,504],
167
- allowed_methods=frozenset(["GET"]), raise_on_status=False)
168
- sess.mount("https://", HTTPAdapter(max_retries=retry))
169
-
170
- urls = [
171
- f"https://world.openfoodfacts.org/api/v0/product/{code}.json",
172
- "https://world.openfoodfacts.org/api/v2/product/"
173
- f"{code}?lc=fr&fields=code,product_name,product_name_fr,brands,"
174
- "categories_tags,categories_tags_fr,ingredients_text,ingredients_text_fr,"
175
- "stores,status,status_verbose",
176
- f"https://world.openfoodfacts.net/api/v0/product/{code}.json",
177
- ]
178
-
179
- last_err = None
180
- for u in urls:
181
- try:
182
- r = sess.get(u, timeout=15)
183
- if not r.ok:
184
- last_err = f"HTTP {r.status_code}"
185
- continue
186
- data = r.json()
187
- product = data.get("product")
188
- status = data.get("status", 1 if product else 0)
189
- if status == 1 or product:
190
- p = product or {}
191
- product_name = _first(p.get("product_name_fr"), p.get("product_name"))
192
- categories_tags = p.get("categories_tags_fr") or p.get("categories_tags") or p.get("categories")
193
- categories_tags = _to_list(categories_tags)
194
- ingredients_text = _first(p.get("ingredients_text_fr"), p.get("ingredients_text"))
195
- brands = _first(p.get("brands"), None)
196
- stores = _first(p.get("stores"), None)
197
- return {
198
- "ok": True, "status": status, "status_verbose": data.get("status_verbose"),
199
- "code": code, "used_url": u,
200
- "product_name": product_name,
201
- "categories_tags": categories_tags,
202
- "ingredients_text": ingredients_text,
203
- "brands": brands, "brands_list": _to_list(brands),
204
- "stores": stores, "stores_list": _to_list(stores),
205
- "step3_inputs": {
206
- "product_name": product_name,
207
- "categories_tags": categories_tags,
208
- "ingredients_text": ingredients_text,
209
- },
210
- }
211
- except Exception as e:
212
- last_err = str(e)
213
-
214
- return {"ok": False, "status": 0, "code": code, "error": last_err or "not found"}
215
-
216
-
217
-
218
- # ---- RegexCOICOP : normalisation locale + regex précompilées ----
219
- class RegexCOICOP(Tool):
220
- name, description = "coicop_regex_rules", "Règles regex → candidats COICOP."
221
- inputs = {"text": {"type": "string", "description": "Libellé produit (texte libre) à analyser."}}
222
- output_type = "dict"
223
-
224
- import re as _re
225
- SOFT = _re.compile(r"(?:\b|^)(?:CAMEMB(?:ERT)?|BRIE|COULOMMI(?:ERS?)?|BLEU|ROQUEFORT|GORGONZ(?:OLA)?|REBLOCHON|MUNSTER)(?:\b|$)")
226
- PRESS = _re.compile(r"(?:\b|^)(EMMENTAL|COMTE|CANTAL|MIMOLETTE|GOUDA|EDAM|BEAUFORT|ABONDANCE|SALERS|TOMME|TOME)(?:\b|$)")
227
- GOAT = _re.compile(r"(?:\b|^)(CHEVRE|STE MAURE|CROTTIN|BUCHE|PICODON|PELARDON|BANON)(?:\b|$)")
228
- PROC = _re.compile(r"(?:\b|^)(FONDU(?:ES?)?|FROMAGE FONDU|TOASTINETTES?|VACHE QUI RIT|KIRI|CARRE FRAIS|CARR[ÉE] FRAIS|PORTIONS?)(?:\b|$)|\bRAP[ÉE]?\b")
229
-
230
- @staticmethod
231
- def _normalize_txt(s: str) -> str:
232
- import unicodedata, re
233
- if not s: return ""
234
- s = s.upper()
235
- s = "".join(c for c in unicodedata.normalize("NFD", s) if unicodedata.category(c) != "Mn")
236
- s = re.sub(r"[^A-Z0-9% ]+", " ", s)
237
- return re.sub(r"\s+", " ", s).strip()
238
-
239
- def forward(self, text: str):
240
- import re
241
- s = self._normalize_txt(text); c=[]
242
- if self.SOFT.search(s): c.append({"code":"01.1.4.5.2","why":"pâte molle/persillée","score":0.95})
243
- if self.PRESS.search(s): c.append({"code":"01.1.4.5.3","why":"pâte pressée","score":0.90})
244
- if self.GOAT.search(s): c.append({"code":"01.1.4.5.4","why":"chèvre","score":0.90})
245
- if self.PROC.search(s): c.append({"code":"01.1.4.5.5","why":"fondu/râpé/portions","score":0.85})
246
- if not c and re.search(r"\bFROMAGE\b", s): c.append({"code":"01.1.4.5","why":"générique fromage/laits caillés","score":0.6})
247
- return {"candidates": c}
248
-
249
-
250
- # ---- OFFtoCOICOP : normalisation locale + regex règles ----
251
- class OFFtoCOICOP(Tool):
252
- name, description = "map_off_to_coicop", "Mappe catégories OFF vers COICOP (off_payload ou champs séparés)."
253
- inputs = {
254
- "product_name": {"type":"string", "description":"Nom produit OFF (fr/en).", "nullable": True},
255
- "categories_tags": {"type":"array", "description":"Liste OFF categories_tags.", "nullable": True},
256
- "ingredients_text":{"type":"string","description":"Texte ingrédients.", "nullable": True},
257
- "off_payload": {"type":"string","description":"Chaîne JSON brute renvoyée par l'étape 2.", "nullable": True},
258
- }
259
- output_type="dict"
260
-
261
- import re as _re, json as _json, ast as _ast
262
- def _normalize_txt(self, s: str) -> str:
263
- import unicodedata, re
264
- if not s: return ""
265
- s = s.upper()
266
- s = "".join(c for c in unicodedata.normalize("NFD", s) if unicodedata.category(c) != "Mn")
267
- s = re.sub(r"[^A-Z0-9% ]+", " ", s)
268
- return re.sub(r"\s+", " ", s).strip()
269
- def _to_list(self, x):
270
- import re
271
- if x is None: return []
272
- if isinstance(x, list): return [str(t).strip() for t in x if str(t).strip()]
273
- if isinstance(x, str): return [p.strip() for p in re.split(r"[,\|;]", x) if p.strip()]
274
- return [str(x).strip()]
275
- def _safe_parse(self, s):
276
- try: return self._json.loads(s)
277
- except Exception:
278
- try: return self._ast.literal_eval(s)
279
- except Exception: return {}
280
-
281
- RULES = [
282
- (_re.compile(r"\b(CAMEMBERT|BRIE|COULOMMIERS|BLUE CHEESE|ROQUEFORT|GORGONZOLA|MUNSTER|REBLOCHON)\b"), ("01.1.4.5.2",0.95,"OFF: pâte molle/persillée")),
283
- (_re.compile(r"\b(EMMENTAL|COMTE|CANTAL|MIMOLETTE|GOUDA|EDAM|BEAUFORT|ABONDANCE|SALERS|TOMME|TOME)\b"), ("01.1.4.5.3",0.90,"OFF: pâte pressée")),
284
- (_re.compile(r"\b(CHEVRE|STE MAURE|CROTTIN|BUCHE|PICODON|PELARDON|BANON)\b"), ("01.1.4.5.4",0.90,"OFF: chèvre")),
285
- (_re.compile(r"\b(FONDU|FONDUES?|RAPE|RÂPE|PORTIONS?|KIRI|VACHE QUI RIT|CARRE FRAIS|CARR[ÉE] FRAIS)\b"), ("01.1.4.5.5",0.85,"OFF: fondu/rapé/portions")),
286
- ]
287
-
288
- def forward(self, product_name=None, categories_tags=None, ingredients_text=None, off_payload=None):
289
- if off_payload and not (product_name or categories_tags or ingredients_text):
290
- data = self._safe_parse(off_payload) or {}
291
- product_name = data.get("product_name") or ""
292
- categories_tags = self._to_list(data.get("categories_tags"))
293
- ingredients_text= data.get("ingredients_text") or ""
294
-
295
- text = " ".join([t for t in [
296
- self._normalize_txt(product_name or ""),
297
- self._normalize_txt(" ".join(self._to_list(categories_tags))),
298
- self._normalize_txt(ingredients_text or "")
299
- ] if t])
300
-
301
- c=[]
302
- for rx,(code,score,why) in self.RULES:
303
- if rx.search(text): c.append({"code":code,"why":why,"score":score})
304
-
305
- return {"candidates": c}
306
-
307
-
308
- # ---- SemSim : COICOP embarqué + import lazy du modèle ----
309
- class SemSim(Tool):
310
- name, description = "coicop_semantic_similarity", "Embeddings top-k COICOP."
311
- inputs = {"text":{"type":"string","description":"Texte libellé"},
312
- "topk":{"type":"integer","description":"Nombre de candidats (défaut 5)","nullable":True}}
313
- output_type = "dict"
314
- requirements = ["sentence_transformers", "torch"]
315
-
316
- COICOP_ITEMS = [
317
- {"code": "01.1.4.5.1", "label": "Laits caillés, fromage blanc, petites crèmes fromagères"},
318
- {"code": "01.1.4.5.2", "label": "Fromage à pâte molle et à pâte persillée"},
319
- {"code": "01.1.4.5.3", "label": "Fromage à pâte pressée"},
320
- {"code": "01.1.4.5.4", "label": "Fromage de chèvre"},
321
- {"code": "01.1.4.5.5", "label": "Fromages fondus, râpés, portions"},
322
- {"code": "01.1.1.4", "label": "Pain"},
323
- {"code": "01.1.1.1", "label": "Riz"},
324
- {"code": "01.1.1.3", "label": "Pâtes, couscous et produits similaires"},
325
- ]
326
-
327
- @staticmethod
328
- def _normalize_txt(s: str) -> str:
329
- import unicodedata, re
330
- if not s: return ""
331
- s = s.upper()
332
- s = "".join(c for c in unicodedata.normalize("NFD", s) if unicodedata.category(c) != "Mn")
333
- s = re.sub(r"[^A-Z0-9% ]+", " ", s)
334
- return re.sub(r"\s+", " ", s).strip()
335
-
336
- def forward(self, text: str, topk: int = 5):
337
- from sentence_transformers import SentenceTransformer, util
338
- if not hasattr(self, "_model"):
339
- self._model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")
340
- q = self._normalize_txt(text)
341
- q_emb = self._model.encode([q], normalize_embeddings=True)
342
- labels = [f"{it['code']} {it['label']}" for it in self.COICOP_ITEMS]
343
- L = self._model.encode(labels, normalize_embeddings=True)
344
- sims = util.cos_sim(q_emb, L).tolist()[0]
345
- ranked = sorted(
346
- [{"code": self.COICOP_ITEMS[i]["code"], "label": self.COICOP_ITEMS[i]["label"], "score": float(sims[i])}
347
- for i in range(len(self.COICOP_ITEMS))],
348
- key=lambda x: x["score"], reverse=True
349
- )
350
- return {"candidates": ranked[:max(1,int(topk))]}
351
-
352
-
353
- # ---- Resolve : import local json ----
354
- class Resolve(Tool):
355
- name, description = "resolve_coicop_candidates", "Fusionne candidats → choix final + alternatives + explication."
356
- inputs = {"json_lists": {"type":"array","description":"Liste de JSON (str/dict) d'autres tools."},
357
- "topn":{"type":"integer","description":"Nb d'alternatives (défaut 3)","nullable":True}}
358
- output_type = "dict"
359
-
360
- def forward(self, json_lists, topn: int = 3):
361
- # accepter listes de strings JSON OU de dicts
362
- import json
363
- from typing import Dict, Any
364
- bucket: Dict[str, Dict[str, Any]] = {}
365
- for s in json_lists:
366
- data = s
367
- if isinstance(s, str):
368
- try: data = json.loads(s)
369
- except Exception: data = {}
370
- if not isinstance(data, dict): continue
371
- for c in data.get("candidates", []):
372
- code = c["code"]; score = float(c.get("score", 0.0))
373
- why = c.get("why", "") or c.get("label", "")
374
- if code not in bucket:
375
- bucket[code] = {"code":code,"score":score,"votes":1,"evidences":[why] if why else []}
376
- else:
377
- bucket[code]["score"] = max(bucket[code]["score"], score)
378
- bucket[code]["votes"] += 1
379
- if why: bucket[code]["evidences"].append(why)
380
- for v in bucket.values():
381
- v["score_final"] = v["score"] + 0.05*(v["votes"]-1)
382
- ranked = sorted(bucket.values(), key=lambda x: x["score_final"], reverse=True)
383
- if not ranked: return {"final": None, "alternatives": [], "explanation":"Aucun candidat"}
384
- final = ranked[0]; alts = ranked[1:1+max(0,int(topn))]
385
- exp = f"Choix {final['code']} (score {final['score_final']:.2f}) – votes={final['votes']} – raisons: {', '.join(sorted(set(final['evidences'])))}"
386
- return {"final": final, "alternatives": alts, "explanation": exp}
387
-
388
-
389
-
390
- def build_agent(model_id: str | None = None) -> CodeAgent:
391
- model_id = model_id or "Qwen/Qwen2.5-Coder-7B-Instruct" # léger pour tester
392
- agent = CodeAgent(
393
- tools=[ValidateEANTool(), OFFByEAN(), RegexCOICOP(), OFFtoCOICOP(), SemSim(), Resolve()],
394
- model=InferenceClientModel(model_id=model_id),
395
- add_base_tools=False,
396
- max_steps=6,
397
- verbosity_level=2,
398
- )
399
- return agent
400
-
401
- def parse_result(res):
402
- if isinstance(res, dict): return res
403
- try: return ast.literal_eval(res)
404
- except Exception: return {"raw": res}
405
-
406
- if __name__ == "__main__":
407
- # Remplace par les vraies données si possible - uniquement du test
408
- ean = "3256221112345" # EAN fictif (peut ne pas exister sur OFF)
409
- label = "Camembert au lait cru AOP 250g - ALDI"
410
-
411
- agent = build_agent()
412
- task = f"""
413
- Classe ce produit en COICOP:
414
- EAN: {ean}
415
- Libellé: {label}
416
- Pipeline:
417
- 1) validate_ean(ean)
418
- 2) openfoodfacts_product_by_ean(ean) # si OFF ne trouve pas, on s'appuie sur regex + embeddings
419
- 3) map_off_to_coicop(product_name, categories_tags, ingredients_text)
420
- 4) coicop_regex_rules(text=libellé)
421
- 5) coicop_semantic_similarity(text=libellé, topk=5)
422
- 6) resolve_coicop_candidates([...], topn=3)
423
- Attend un JSON final.
424
- """
425
- out = agent.run(task)
426
  print(parse_result(out))
 
1
+ # quickstart_agent.py
2
+ from __future__ import annotations
3
+ import json, re, unicodedata, ast
4
+ from typing import List, Dict, Any, Optional
5
+ import requests
6
+ from smolagents import Tool, CodeAgent, InferenceClientModel
7
+ from sentence_transformers import SentenceTransformer, util
8
+
9
# ---- Mini COICOP reference (demo subset) ----
# Embedded excerpt of the COICOP nomenclature shared by the demo tools;
# each entry maps a COICOP code to its French label.
COICOP_ITEMS = [
    {"code": "01.1.4.5.1", "label": "Laits caillés, fromage blanc, petites crèmes fromagères"},
    {"code": "01.1.4.5.2", "label": "Fromage à pâte molle et à pâte persillée"},
    {"code": "01.1.4.5.3", "label": "Fromage à pâte pressée"},
    {"code": "01.1.4.5.4", "label": "Fromage de chèvre"},
    {"code": "01.1.4.5.5", "label": "Fromages fondus, râpés, portions"},
    {"code": "01.1.1.4", "label": "Pain"},
    {"code": "01.1.1.1", "label": "Riz"},
    {"code": "01.1.1.3", "label": "Pâtes, couscous et produits similaires"},
]
20
+
21
def normalize_txt(s: str) -> str:
    """Uppercase *s*, strip accents (NFD), keep only A-Z/0-9/%/space, collapse runs of whitespace."""
    if not s:
        return ""
    decomposed = unicodedata.normalize("NFD", s.upper())
    ascii_only = "".join(ch for ch in decomposed if unicodedata.category(ch) != "Mn")
    cleaned = re.sub(r"[^A-Z0-9% ]+", " ", ascii_only)
    return re.sub(r"\s+", " ", cleaned).strip()
28
+
29
def ean_check_digit_ok(ean: str) -> bool:
    """Return True if *ean* is a GTIN-8/12/13/14 with a valid GS1 check digit.

    Non-digit characters are stripped before validation; None or empty input
    returns False instead of raising (consistent with ValidateEANTool.forward,
    which also guards with ``ean or ""``).
    """
    digits = re.sub(r"\D", "", ean or "")
    if len(digits) not in (8, 12, 13, 14):
        return False
    total = 0
    # GS1 weighting: alternate 3/1 starting from the digit next to the check digit.
    for i, ch in enumerate(reversed(digits[:-1]), start=1):
        total += int(ch) * (3 if i % 2 == 1 else 1)
    check = (10 - (total % 10)) % 10
    return check == int(digits[-1])
37
+
38
# ---- ValidateEANTool: fully self-contained inside forward ----
class ValidateEANTool(Tool):
    """Validates an EAN/GTIN barcode via its GS1 check digit."""

    name = "validate_ean"
    description = "Valide un EAN/GTIN (clé GS1)."
    inputs = {"ean": {"type": "string", "description": "Code EAN/GTIN (8/12/13/14 chiffres)."}}
    output_type = "dict"  # <— (optional)

    def forward(self, ean: str):
        import re

        digits = re.sub(r"\D", "", ean or "")
        if len(digits) not in (8, 12, 13, 14):
            return {"valid": False, "normalized": digits}
        # GS1 check digit: alternating 3/1 weights from the rightmost data digit.
        weighted = sum(
            int(d) * (3 if pos % 2 == 1 else 1)
            for pos, d in enumerate(reversed(digits[:-1]), start=1)
        )
        expected = (10 - weighted % 10) % 10
        return {"valid": expected == int(digits[-1]), "normalized": digits}
54
+
55
+ # ---- OFFtoCOICOP : accepte off_payload (JSON brut) OU champs séparés ----
56
+
57
# ---- OFFByEAN: robust fetch + normalized output + step3_inputs ----
class OFFByEAN(Tool):
    # Queries Open Food Facts for a product by EAN, trying several API
    # endpoints/mirrors in order, and returns a normalized dict (including a
    # pre-bundled "step3_inputs" payload for the COICOP mapping step).
    name = "openfoodfacts_product_by_ean"
    description = "Open Food Facts /api/v0|v2/product/{ean} (name, brands, categories...)."
    inputs = {"ean": {"type": "string", "description": "EAN à interroger sur l'API OFF."}}
    output_type = "dict"
    requirements = ["requests"]

    def forward(self, ean: str):
        # Local imports so the tool stays self-contained when exported to the Hub.
        import re, requests
        from requests.adapters import HTTPAdapter
        try:
            from urllib3.util.retry import Retry
        except Exception:
            Retry = None  # retries silently disabled if urllib3's Retry is unavailable

        def _to_list(x):
            # Coerce None / str / list into a clean list of non-empty strings.
            if x is None: return []
            if isinstance(x, list): return [str(t).strip() for t in x if str(t).strip()]
            if isinstance(x, str):
                return [p.strip() for p in re.split(r"[,\|;]", x) if p.strip()]
            return [str(x).strip()]

        def _first(*vals):
            # First non-blank string among vals, else "".
            for v in vals:
                if isinstance(v, str) and v.strip(): return v.strip()
            return ""

        code = re.sub(r"\D", "", ean or "")
        if not code:
            return {"ok": False, "status": 0, "code": "", "error": "EAN vide"}

        sess = requests.Session()
        sess.headers.update({"User-Agent":"insee-coicop-agent/1.0","Accept":"application/json"})
        if Retry:
            # Retry transient failures (rate limiting / 5xx) on GET requests only.
            retry = Retry(total=3, backoff_factor=0.5, status_forcelist=[429,500,502,503,504],
                          allowed_methods=frozenset(["GET"]), raise_on_status=False)
            sess.mount("https://", HTTPAdapter(max_retries=retry))

        # Endpoints tried in order: v0, then v2 (field-filtered, French locale),
        # then the .net mirror as a last resort.
        urls = [
            f"https://world.openfoodfacts.org/api/v0/product/{code}.json",
            "https://world.openfoodfacts.org/api/v2/product/"
            f"{code}?lc=fr&fields=code,product_name,product_name_fr,brands,"
            "categories_tags,categories_tags_fr,ingredients_text,ingredients_text_fr,"
            "stores,status,status_verbose",
            f"https://world.openfoodfacts.net/api/v0/product/{code}.json",
        ]

        last_err = None
        for u in urls:
            try:
                r = sess.get(u, timeout=15)
                if not r.ok:
                    last_err = f"HTTP {r.status_code}"
                    continue
                data = r.json()
                product = data.get("product")
                # v2 responses may omit "status"; infer it from product presence.
                status = data.get("status", 1 if product else 0)
                if status == 1 or product:
                    p = product or {}
                    # Prefer the French field, fall back to the generic one.
                    product_name = _first(p.get("product_name_fr"), p.get("product_name"))
                    categories_tags = p.get("categories_tags_fr") or p.get("categories_tags") or p.get("categories")
                    categories_tags = _to_list(categories_tags)
                    ingredients_text = _first(p.get("ingredients_text_fr"), p.get("ingredients_text"))
                    brands = _first(p.get("brands"), None)
                    stores = _first(p.get("stores"), None)
                    return {
                        "ok": True, "status": status, "status_verbose": data.get("status_verbose"),
                        "code": code, "used_url": u,
                        "product_name": product_name,
                        "categories_tags": categories_tags,
                        "ingredients_text": ingredients_text,
                        "brands": brands, "brands_list": _to_list(brands),
                        "stores": stores, "stores_list": _to_list(stores),
                        # Ready-to-use kwargs for map_off_to_coicop (pipeline step 3).
                        "step3_inputs": {
                            "product_name": product_name,
                            "categories_tags": categories_tags,
                            "ingredients_text": ingredients_text,
                        },
                    }
            except Exception as e:
                last_err = str(e)  # remember the failure and try the next endpoint

        return {"ok": False, "status": 0, "code": code, "error": last_err or "not found"}
141
+
142
+
143
+
144
# ---- RegexCOICOP: local normalization + precompiled regexes ----
class RegexCOICOP(Tool):
    """Applies keyword regex rules to a free-text label to produce COICOP candidates."""

    name = "coicop_regex_rules"
    description = "Règles regex → candidats COICOP."
    inputs = {"text": {"type": "string", "description": "Libellé produit (texte libre) à analyser."}}
    output_type = "dict"

    # Patterns compiled once at class-definition time.
    import re as _re
    SOFT = _re.compile(r"(?:\b|^)(?:CAMEMB(?:ERT)?|BRIE|COULOMMI(?:ERS?)?|BLEU|ROQUEFORT|GORGONZ(?:OLA)?|REBLOCHON|MUNSTER)(?:\b|$)")
    PRESS = _re.compile(r"(?:\b|^)(EMMENTAL|COMTE|CANTAL|MIMOLETTE|GOUDA|EDAM|BEAUFORT|ABONDANCE|SALERS|TOMME|TOME)(?:\b|$)")
    GOAT = _re.compile(r"(?:\b|^)(CHEVRE|STE MAURE|CROTTIN|BUCHE|PICODON|PELARDON|BANON)(?:\b|$)")
    PROC = _re.compile(r"(?:\b|^)(FONDU(?:ES?)?|FROMAGE FONDU|TOASTINETTES?|VACHE QUI RIT|KIRI|CARRE FRAIS|CARR[ÉE] FRAIS|PORTIONS?)(?:\b|$)|\bRAP[ÉE]?\b")

    @staticmethod
    def _normalize_txt(s: str) -> str:
        """Uppercase, de-accent (NFD) and keep only A-Z/0-9/%/space."""
        import unicodedata, re
        if not s:
            return ""
        stripped = "".join(
            ch for ch in unicodedata.normalize("NFD", s.upper())
            if unicodedata.category(ch) != "Mn"
        )
        stripped = re.sub(r"[^A-Z0-9% ]+", " ", stripped)
        return re.sub(r"\s+", " ", stripped).strip()

    def forward(self, text: str):
        import re
        norm = self._normalize_txt(text)
        # (pattern, COICOP code, reason, confidence) — evaluated in this order.
        rules = (
            (self.SOFT, "01.1.4.5.2", "pâte molle/persillée", 0.95),
            (self.PRESS, "01.1.4.5.3", "pâte pressée", 0.90),
            (self.GOAT, "01.1.4.5.4", "chèvre", 0.90),
            (self.PROC, "01.1.4.5.5", "fondu/râpé/portions", 0.85),
        )
        candidates = [
            {"code": code, "why": why, "score": score}
            for rx, code, why, score in rules
            if rx.search(norm)
        ]
        # Generic fallback when nothing specific matched but it is clearly cheese.
        if not candidates and re.search(r"\bFROMAGE\b", norm):
            candidates.append({"code": "01.1.4.5", "why": "générique fromage/laits caillés", "score": 0.6})
        return {"candidates": candidates}
174
+
175
+
176
# ---- OFFtoCOICOP: local normalization + regex rules ----
class OFFtoCOICOP(Tool):
    # Maps Open Food Facts fields (or the raw JSON payload returned by step 2)
    # to COICOP candidate codes via keyword regex rules.
    name, description = "map_off_to_coicop", "Mappe catégories OFF vers COICOP (off_payload ou champs séparés)."
    inputs = {
        "product_name": {"type":"string", "description":"Nom produit OFF (fr/en).", "nullable": True},
        "categories_tags": {"type":"array", "description":"Liste OFF categories_tags.", "nullable": True},
        "ingredients_text":{"type":"string","description":"Texte ingrédients.", "nullable": True},
        "off_payload": {"type":"string","description":"Chaîne JSON brute renvoyée par l'étape 2.", "nullable": True},
    }
    output_type="dict"

    # Modules bound as class attributes (self._json etc.) so the tool remains
    # self-contained when exported to the Hub — no module-level imports needed.
    import re as _re, json as _json, ast as _ast
    def _normalize_txt(self, s: str) -> str:
        # Uppercase, strip accents (NFD), keep only A-Z/0-9/%/space.
        import unicodedata, re
        if not s: return ""
        s = s.upper()
        s = "".join(c for c in unicodedata.normalize("NFD", s) if unicodedata.category(c) != "Mn")
        s = re.sub(r"[^A-Z0-9% ]+", " ", s)
        return re.sub(r"\s+", " ", s).strip()
    def _to_list(self, x):
        # Coerce None / str / list into a list of non-empty strings.
        import re
        if x is None: return []
        if isinstance(x, list): return [str(t).strip() for t in x if str(t).strip()]
        if isinstance(x, str): return [p.strip() for p in re.split(r"[,\|;]", x) if p.strip()]
        return [str(x).strip()]
    def _safe_parse(self, s):
        # Parse as JSON first, then as a Python literal; {} on total failure.
        try: return self._json.loads(s)
        except Exception:
            try: return self._ast.literal_eval(s)
            except Exception: return {}

    # (compiled pattern, (COICOP code, confidence score, human-readable reason))
    RULES = [
        (_re.compile(r"\b(CAMEMBERT|BRIE|COULOMMIERS|BLUE CHEESE|ROQUEFORT|GORGONZOLA|MUNSTER|REBLOCHON)\b"), ("01.1.4.5.2",0.95,"OFF: pâte molle/persillée")),
        (_re.compile(r"\b(EMMENTAL|COMTE|CANTAL|MIMOLETTE|GOUDA|EDAM|BEAUFORT|ABONDANCE|SALERS|TOMME|TOME)\b"), ("01.1.4.5.3",0.90,"OFF: pâte pressée")),
        (_re.compile(r"\b(CHEVRE|STE MAURE|CROTTIN|BUCHE|PICODON|PELARDON|BANON)\b"), ("01.1.4.5.4",0.90,"OFF: chèvre")),
        (_re.compile(r"\b(FONDU|FONDUES?|RAPE|RÂPE|PORTIONS?|KIRI|VACHE QUI RIT|CARRE FRAIS|CARR[ÉE] FRAIS)\b"), ("01.1.4.5.5",0.85,"OFF: fondu/rapé/portions")),
    ]

    def forward(self, product_name=None, categories_tags=None, ingredients_text=None, off_payload=None):
        # When only the raw payload is supplied, extract the individual fields from it.
        if off_payload and not (product_name or categories_tags or ingredients_text):
            data = self._safe_parse(off_payload) or {}
            product_name = data.get("product_name") or ""
            categories_tags = self._to_list(data.get("categories_tags"))
            ingredients_text= data.get("ingredients_text") or ""

        # Normalize and concatenate every available text field into one haystack.
        text = " ".join([t for t in [
            self._normalize_txt(product_name or ""),
            self._normalize_txt(" ".join(self._to_list(categories_tags))),
            self._normalize_txt(ingredients_text or "")
        ] if t])

        # Each matching rule contributes one candidate.
        c=[]
        for rx,(code,score,why) in self.RULES:
            if rx.search(text): c.append({"code":code,"why":why,"score":score})

        return {"candidates": c}
232
+
233
+
234
# ---- SemSim: embedded COICOP list + lazy model import ----
class SemSim(Tool):
    """Sentence-embedding similarity between a label and the COICOP reference -> top-k candidates."""

    name, description = "coicop_semantic_similarity", "Embeddings → top-k COICOP."
    inputs = {"text":{"type":"string","description":"Texte libellé"},
              "topk":{"type":"integer","description":"Nombre de candidats (défaut 5)","nullable":True}}
    output_type = "dict"
    requirements = ["sentence_transformers", "torch"]

    # Embedded COICOP excerpt so the tool is self-contained when exported.
    COICOP_ITEMS = [
        {"code": "01.1.4.5.1", "label": "Laits caillés, fromage blanc, petites crèmes fromagères"},
        {"code": "01.1.4.5.2", "label": "Fromage à pâte molle et à pâte persillée"},
        {"code": "01.1.4.5.3", "label": "Fromage à pâte pressée"},
        {"code": "01.1.4.5.4", "label": "Fromage de chèvre"},
        {"code": "01.1.4.5.5", "label": "Fromages fondus, râpés, portions"},
        {"code": "01.1.1.4", "label": "Pain"},
        {"code": "01.1.1.1", "label": "Riz"},
        {"code": "01.1.1.3", "label": "Pâtes, couscous et produits similaires"},
    ]

    @staticmethod
    def _normalize_txt(s: str) -> str:
        """Uppercase, de-accent (NFD) and keep only A-Z/0-9/%/space."""
        import unicodedata, re
        if not s: return ""
        s = s.upper()
        s = "".join(c for c in unicodedata.normalize("NFD", s) if unicodedata.category(c) != "Mn")
        s = re.sub(r"[^A-Z0-9% ]+", " ", s)
        return re.sub(r"\s+", " ", s).strip()

    def forward(self, text: str, topk: int = 5):
        """Return the top-k COICOP items ranked by cosine similarity to *text*.

        The model AND the reference-label embeddings are computed lazily on the
        first call and cached: the labels never change, so re-encoding them on
        every call (as before) was pure wasted work.
        """
        from sentence_transformers import SentenceTransformer, util
        if not hasattr(self, "_model"):
            self._model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")
            labels = [f"{it['code']} {it['label']}" for it in self.COICOP_ITEMS]
            self._label_embs = self._model.encode(labels, normalize_embeddings=True)
        q = self._normalize_txt(text)
        q_emb = self._model.encode([q], normalize_embeddings=True)
        sims = util.cos_sim(q_emb, self._label_embs).tolist()[0]
        ranked = sorted(
            [{"code": it["code"], "label": it["label"], "score": float(sims[i])}
             for i, it in enumerate(self.COICOP_ITEMS)],
            key=lambda x: x["score"], reverse=True
        )
        # topk is clamped to at least 1 so an accidental 0/negative still returns something.
        return {"candidates": ranked[:max(1, int(topk))]}
277
+
278
+
279
# ---- Resolve: local json import ----
class Resolve(Tool):
    """Fuses candidate lists from the other tools into one final choice plus alternatives."""

    name, description = "resolve_coicop_candidates", "Fusionne candidats → choix final + alternatives + explication."
    inputs = {"json_lists": {"type":"array","description":"Liste de JSON (str/dict) d'autres tools."},
              "topn":{"type":"integer","description":"Nb d'alternatives (défaut 3)","nullable":True}}
    output_type = "dict"

    def forward(self, json_lists, topn: int = 3):
        """Merge candidates per code: keep the max score, add +0.05 per extra vote.

        Accepts a mixed list of JSON strings and/or dicts, each shaped
        {"candidates": [{"code", "score", "why"/"label"}, ...]}.
        Returns {"final", "alternatives", "explanation"}; "final" is None when
        no candidate was found.
        """
        import json
        from typing import Dict, Any
        bucket: Dict[str, Dict[str, Any]] = {}
        for s in json_lists:
            data = s
            if isinstance(s, str):
                try: data = json.loads(s)
                except Exception: data = {}
            if not isinstance(data, dict): continue
            for c in data.get("candidates", []):
                # Robustness: skip malformed candidates instead of raising KeyError.
                if not isinstance(c, dict) or not c.get("code"):
                    continue
                code = c["code"]; score = float(c.get("score", 0.0))
                why = c.get("why", "") or c.get("label", "")
                if code not in bucket:
                    bucket[code] = {"code":code,"score":score,"votes":1,"evidences":[why] if why else []}
                else:
                    bucket[code]["score"] = max(bucket[code]["score"], score)
                    bucket[code]["votes"] += 1
                    if why: bucket[code]["evidences"].append(why)
        # Agreement bonus: each additional tool voting for a code adds 0.05.
        for v in bucket.values():
            v["score_final"] = v["score"] + 0.05*(v["votes"]-1)
        ranked = sorted(bucket.values(), key=lambda x: x["score_final"], reverse=True)
        if not ranked: return {"final": None, "alternatives": [], "explanation":"Aucun candidat"}
        final = ranked[0]; alts = ranked[1:1+max(0,int(topn))]
        exp = f"Choix {final['code']} (score {final['score_final']:.2f}) – votes={final['votes']} – raisons: {', '.join(sorted(set(final['evidences'])))}"
        return {"final": final, "alternatives": alts, "explanation": exp}
313
+
314
+
315
+
316
def build_agent(model_id: str | None = None) -> CodeAgent:
    """Assemble the CodeAgent wired with the full COICOP tool belt."""
    chosen_model = model_id or "Qwen/Qwen2.5-Coder-7B-Instruct"  # lightweight default for testing
    toolbox = [ValidateEANTool(), OFFByEAN(), RegexCOICOP(), OFFtoCOICOP(), SemSim(), Resolve()]
    return CodeAgent(
        tools=toolbox,
        model=InferenceClientModel(model_id=chosen_model),
        add_base_tools=False,
        max_steps=6,
        verbosity_level=2,
    )
326
+
327
def parse_result(res):
    """Coerce an agent result into structured data.

    Dicts pass through unchanged. Strings are parsed as JSON first — the task
    explicitly asks the agent for "un JSON final", and ast.literal_eval alone
    cannot read JSON's true/false/null — then as a Python literal. Anything
    unparseable is wrapped as {"raw": res}.
    """
    if isinstance(res, dict):
        return res
    try:
        return json.loads(res)
    except Exception:
        pass
    try:
        return ast.literal_eval(res)
    except Exception:
        return {"raw": res}
331
+
332
if __name__ == "__main__":
    # Demo run only — replace with real data when available.
    ean = "3256221112345"  # fictitious EAN (may not exist on OFF)
    label = "Camembert au lait cru AOP 250g - ALDI"

    agent = build_agent()
    # Prompt describing the 6-step pipeline the agent should follow (kept in
    # French: it is runtime input to the model, not a code comment).
    task = f"""
Classe ce produit en COICOP:
EAN: {ean}
Libellé: {label}
Pipeline:
1) validate_ean(ean)
2) openfoodfacts_product_by_ean(ean)  # si OFF ne trouve pas, on s'appuie sur regex + embeddings
3) map_off_to_coicop(product_name, categories_tags, ingredients_text)
4) coicop_regex_rules(text=libellé)
5) coicop_semantic_similarity(text=libellé, topk=5)
6) resolve_coicop_candidates([...], topn=3)
Attend un JSON final.
"""
    out = agent.run(task)
    print(parse_result(out))