Spaces:
Sleeping
Sleeping
| """ | |
| Inference pipeline for the Smart Meal Analyzer. | |
| Combines: | |
| - NLP extraction (ingredients text → 8 structured features via gpt-4o-mini) | |
| - CV feature extraction (meal photo → 4 visual features) | |
| * "fast" mode: trained SqueezeNet (local, free) | |
| * "quality" mode: gpt-4o-mini vision (matches training-time labels) | |
| - GBM v3 (calories regression) | |
| - LLM explanation (German, 3-5 sentences) | |
| """ | |
| from __future__ import annotations | |
| import base64 | |
| import io | |
| import json | |
| import os | |
| import pickle | |
| from dataclasses import dataclass | |
| from pathlib import Path | |
| from typing import Literal, Optional | |
| import numpy as np | |
| import pandas as pd | |
| from PIL import Image | |
# Filesystem anchors: the repo root is taken to be two levels above this file.
REPO_ROOT = Path(__file__).resolve().parents[1]
DATA_PROCESSED = REPO_ROOT.joinpath("data", "processed")
MODELS = REPO_ROOT.joinpath("models")

# Label vocabularies for the categorical CV outputs. The classifier path
# indexes into these lists with the argmax of the matching head, so the
# order matters.
CUISINE_CLASSES = [
    "asian",
    "european",
    "american",
    "mexican",
    "mediterranean",
    "other",
]
DISH_CATEGORIES = [
    "main",
    "salad",
    "soup",
    "dessert",
    "snack",
    "drink",
    "other",
]

# Number of output classes per classifier head.
HEADS = {
    "portion_bucket": 5,
    "cuisine_class": 6,
    "visible_fat": 5,
    "dish_category": 7,
}

# Process-wide cache for lazily created clients, models and explainers.
_CACHE: dict = {}

# Load .env if available so OPENAI_API_KEY is picked up automatically.
# python-dotenv is optional: without it the environment is used as-is.
try:
    from dotenv import load_dotenv

    load_dotenv(REPO_ROOT / ".env")
except ImportError:
    pass
def get_openai_client():
    """Return the shared OpenAI client, constructing it on first use.

    The ``openai`` package is imported lazily so that importing this module
    does not require it. The client is stored in ``_CACHE`` under the key
    ``"openai"`` and reused by every later call.
    """
    try:
        return _CACHE["openai"]
    except KeyError:
        from openai import OpenAI

        client = OpenAI()
        _CACHE["openai"] = client
        return client
def get_gbm(variant: str = "v3"):
    """Return the unpickled GBM bundle for *variant*, loading it at most once.

    The bundle is read from ``MODELS / f"gbm_{variant}.pkl"`` and cached in
    ``_CACHE`` under ``f"gbm_{variant}"``.

    NOTE(review): ``pickle.load`` executes arbitrary code from the file; only
    point this at model artifacts you trust.
    """
    cache_key = f"gbm_{variant}"
    if cache_key in _CACHE:
        return _CACHE[cache_key]

    model_path = MODELS / f"gbm_{variant}.pkl"
    with model_path.open("rb") as handle:
        bundle = pickle.load(handle)
    _CACHE[cache_key] = bundle
    return bundle
def get_shap_explainer(variant: str = "v3"):
    """Return a cached SHAP ``TreeExplainer`` for the calorie GBM, or ``None``.

    The explainer is built from ``get_gbm(variant)["model"]``. Any failure
    (shap not installed, model missing, explainer construction error) is
    swallowed and ``None`` is cached, so the attempt is not repeated.
    """
    cache_key = f"shap_{variant}"
    if cache_key not in _CACHE:
        explainer = None
        try:
            import shap

            model = get_gbm(variant)["model"]
            # tree_path_dependent is faster for sklearn GBM and needs no
            # background data; fall back to defaults if the keyword is
            # rejected.
            try:
                explainer = shap.TreeExplainer(
                    model, feature_perturbation="tree_path_dependent"
                )
            except TypeError:
                explainer = shap.TreeExplainer(model)
        except Exception:
            explainer = None
        _CACHE[cache_key] = explainer
    return _CACHE[cache_key]
def get_macro_models(variant: str = "v3") -> dict:
    """Load the optional macro-nutrient GBM bundles for *variant*.

    Looks for ``gbm_{variant}_{macro}.pkl`` under ``MODELS`` for each of
    protein, fat and carb, and unpickles the ones that exist.

    Returns:
        A dict keyed by macro name; empty when no macro model file exists.
        The result is cached under ``f"macros_{variant}"``.
    """
    cache_key = f"macros_{variant}"
    if cache_key not in _CACHE:
        loaded = {}
        for name in ("protein", "fat", "carb"):
            candidate = MODELS / f"gbm_{variant}_{name}.pkl"
            if not candidate.exists():
                continue
            with candidate.open("rb") as handle:
                loaded[name] = pickle.load(handle)
        _CACHE[cache_key] = loaded
    return _CACHE[cache_key]
def get_cv_classifier():
    """Return ``(model, device, transform)`` for the local multi-head classifier.

    On first call the checkpoint ``MODELS / "cv_classifier.pth"`` is loaded
    into a SqueezeNet 1.1 feature extractor with one linear head per entry of
    the checkpoint's ``"heads"`` mapping. The three objects are then cached.

    Raises:
        FileNotFoundError: if the checkpoint file does not exist.
    """
    cache_keys = ("cv_model", "cv_device", "cv_transform")
    if cache_keys[0] in _CACHE:
        return tuple(_CACHE[k] for k in cache_keys)

    import torch
    import torchvision.transforms as T
    from torchvision import models

    checkpoint_path = MODELS / "cv_classifier.pth"
    if not checkpoint_path.exists():
        raise FileNotFoundError(f"No CV classifier at {checkpoint_path}")

    # Prefer Apple MPS, then CUDA, then CPU.
    if torch.backends.mps.is_available():
        device_name = "mps"
    elif torch.cuda.is_available():
        device_name = "cuda"
    else:
        device_name = "cpu"
    device = torch.device(device_name)

    # NOTE(review): weights_only=False unpickles arbitrary objects; only load
    # checkpoints from a trusted source.
    checkpoint = torch.load(checkpoint_path, map_location=device, weights_only=False)
    head_sizes = checkpoint["heads"]

    class MultiHeadSqueezeNet(torch.nn.Module):
        """SqueezeNet 1.1 features + global average pool + one linear layer per head."""

        def __init__(self):
            super().__init__()
            # Attribute names must stay `features` / `pool` / `heads` so the
            # saved state_dict keys line up.
            self.features = models.squeezenet1_1(weights=None).features
            self.pool = torch.nn.AdaptiveAvgPool2d(1)
            self.heads = torch.nn.ModuleDict(
                {name: torch.nn.Linear(512, width) for name, width in head_sizes.items()}
            )

        def forward(self, x):
            pooled = self.pool(self.features(x)).flatten(1)
            return {name: self.heads[name](pooled) for name in head_sizes}

    model = MultiHeadSqueezeNet().to(device)
    model.load_state_dict(checkpoint["state_dict"])
    model.eval()

    preprocessing_steps = [
        T.Resize(256),
        T.CenterCrop(224),
        T.ToTensor(),
        T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ]
    transform = T.Compose(preprocessing_steps)

    _CACHE.update(cv_model=model, cv_device=device, cv_transform=transform)
    return model, device, transform
| # ── NLP ────────────────────────────────────────────────────────────────────── | |
# System prompt for ingredient parsing: asks for the eight structured text
# features as a single JSON object.
NLP_PROMPT = "\n".join((
    'You are a nutrition-aware ingredient parser.',
    'Read ingredients and return ONLY valid JSON with EXACTLY:',
    '{"naive_kcal_per_serving": <int>, "n_ingredients_parsed": <int>,',
    '"has_oil_or_butter": <0|1>, "has_meat": <0|1>, "has_dairy": <0|1>, "has_added_sugar": <0|1>,',
    '"cooking_method_class": "raw"|"baked"|"fried"|"boiled"|"steamed"|"grilled"|"other",',
    '"dietary_flag": "vegan"|"vegetarian"|"omnivore"}',
    'Use full 0-3000 range for naive_kcal_per_serving.',
))

# System prompt for allergen detection: asks for six 0/1 fields as JSON.
ALLERGEN_PROMPT = "\n".join((
    'You detect common food allergens in an ingredient list. Return ONLY valid JSON with EXACTLY these boolean fields (0 or 1):',
    '{"contains_gluten": <0|1>, "contains_lactose": <0|1>, "contains_nuts": <0|1>,',
    '"contains_eggs": <0|1>, "contains_soy": <0|1>, "contains_fish_or_shellfish": <0|1>}',
    'Be precise: 1 only when the ingredient is clearly present (or a clear derivative — wheat flour → gluten, milk → lactose, peanut butter → nuts). 0 if absent or uncertain.',
))
def _call_with_retry(client, **kwargs):
    """Call ``client.chat.completions.create(**kwargs)`` with exponential backoff.

    Up to three attempts are made. After a failed attempt the function sleeps
    0.5 s, then 1.0 s, before trying again. No sleep happens after the final
    attempt: the last exception is re-raised immediately, so callers are not
    delayed once the outcome is already decided.

    Args:
        client: Object exposing ``chat.completions.create``.
        **kwargs: Forwarded unchanged to ``create``.

    Returns:
        Whatever ``create`` returns on the first successful attempt.

    Raises:
        Exception: The exception raised by the third failed attempt.
    """
    import time as _time

    max_attempts = 3
    for attempt in range(max_attempts):
        try:
            return client.chat.completions.create(**kwargs)
        except Exception:
            # Broad on purpose: every failure is treated as retryable here.
            if attempt == max_attempts - 1:
                raise
            _time.sleep(0.5 * (2 ** attempt))
def extract_text_features(text: str) -> dict:
    """Turn a free-text ingredient list into the structured NLP features.

    Empty or whitespace-only input short-circuits to a fixed neutral feature
    dict without contacting the API. Otherwise the first 4000 characters are
    sent to the chat model (``OPENAI_MODEL`` env var, default ``gpt-4o-mini``)
    with ``NLP_PROMPT`` and the JSON reply is returned as parsed.

    Raises:
        Exception: whatever the API call or ``json.loads`` raises; nothing is
            caught here.
    """
    if not (text and text.strip()):
        neutral = {"naive_kcal_per_serving": 400, "n_ingredients_parsed": 0}
        for flag in ("has_oil_or_butter", "has_meat", "has_dairy", "has_added_sugar"):
            neutral[flag] = 0
        neutral["cooking_method_class"] = "other"
        neutral["dietary_flag"] = "omnivore"
        return neutral

    conversation = [
        {"role": "system", "content": NLP_PROMPT},
        {"role": "user", "content": text[:4000]},
    ]
    response = _call_with_retry(
        get_openai_client(),
        model=os.environ.get("OPENAI_MODEL", "gpt-4o-mini"),
        response_format={"type": "json_object"},
        messages=conversation,
        temperature=0.0,
        max_tokens=200,
        seed=42,
    )
    raw_reply = response.choices[0].message.content
    return json.loads(raw_reply)
def detect_allergens(text: str) -> dict:
    """Best-effort allergen detection — used at inference time only, not part of training.

    Returns a dict of six ``contains_*`` fields. Blank input, or any failure
    while calling the model or parsing its reply, yields all zeros. On success
    the model's JSON is layered over the all-zero defaults.
    """
    all_clear = dict.fromkeys(
        (
            "contains_gluten",
            "contains_lactose",
            "contains_nuts",
            "contains_eggs",
            "contains_soy",
            "contains_fish_or_shellfish",
        ),
        0,
    )
    if not (text and text.strip()):
        return all_clear

    try:
        response = _call_with_retry(
            get_openai_client(),
            model=os.environ.get("OPENAI_MODEL", "gpt-4o-mini"),
            response_format={"type": "json_object"},
            messages=[
                {"role": "system", "content": ALLERGEN_PROMPT},
                {"role": "user", "content": text[:4000]},
            ],
            temperature=0.0,
            max_tokens=150,
            seed=42,
        )
        detected = json.loads(response.choices[0].message.content)
        return {**all_clear, **detected}
    except Exception:
        # Deliberately best-effort: never let allergen detection break a prediction.
        return all_clear
| # ── CV: vision-LLM ─────────────────────────────────────────────────────────── | |
# System prompt for the vision-LLM path: asks for the four visual features as
# a single JSON object.
CV_LLM_PROMPT = "\n".join((
    'You are a food-image analyst.',
    'Look at the meal photo and return ONLY valid JSON with EXACTLY:',
    '{"portion_bucket": <int 1-5>,',
    '"cuisine_class": "asian"|"european"|"american"|"mexican"|"mediterranean"|"other",',
    '"visible_fat": <int 1-5>,',
    '"dish_category": "main"|"salad"|"soup"|"dessert"|"snack"|"drink"|"other"}',
    'Use full 1-5 range for portion_bucket and visible_fat.',
))
def cv_features_from_llm(image: Image.Image) -> dict:
    """Extract the four visual features with the vision LLM, with fallbacks.

    The image is copied, shrunk to fit 512x512, re-encoded as JPEG and sent as
    a base64 data URL to the model named by ``OPENAI_VISION_MODEL`` (default
    ``gpt-4o-mini``).

    Fallback chain on any failure of the LLM path — including failure to
    construct the OpenAI client, e.g. when no API key is configured:
      1. the local CNN via ``cv_features_from_classifier``;
      2. a fixed neutral feature dict if the CNN is unavailable too.

    Args:
        image: The meal photo.

    Returns:
        A dict with ``portion_bucket``, ``cuisine_class``, ``visible_fat`` and
        ``dish_category``.
    """
    thumb = image.copy()
    thumb.thumbnail((512, 512))
    buf = io.BytesIO()
    thumb.convert("RGB").save(buf, format="JPEG", quality=85)
    img_b64 = base64.b64encode(buf.getvalue()).decode("ascii")

    try:
        # Client creation lives inside the try block: it is the step that
        # fails when OpenAI is not configured, and that is exactly the case
        # the CNN fallback exists for.
        client = get_openai_client()
        resp = _call_with_retry(
            client,
            model=os.environ.get("OPENAI_VISION_MODEL", "gpt-4o-mini"),
            response_format={"type": "json_object"},
            messages=[
                {"role": "system", "content": CV_LLM_PROMPT},
                {"role": "user", "content": [
                    {"type": "text", "text": "Analyze this meal photo."},
                    {"type": "image_url",
                     "image_url": {"url": f"data:image/jpeg;base64,{img_b64}", "detail": "low"}},
                ]},
            ],
            temperature=0.0, max_tokens=200, seed=42,
        )
        features = json.loads(resp.choices[0].message.content)
        if not isinstance(features, dict):
            # Callers use .get() on the result; anything else is a failed reply.
            raise ValueError("vision model did not return a JSON object")
        return features
    except Exception:
        # Fall back to the local CNN so the app still works without OpenAI
        try:
            return cv_features_from_classifier(image)
        except Exception:
            return {"portion_bucket": 3, "cuisine_class": "other",
                    "visible_fat": 3, "dish_category": "other"}
def cv_features_from_classifier(image: Image.Image) -> dict:
    """Extract the four visual features with the local CNN.

    The image is converted to RGB, run through the cached transform and model
    from ``get_cv_classifier()`` as a batch of one, and each head's argmax is
    mapped to its label: ordinal heads become 1-based integers, categorical
    heads index into ``CUISINE_CLASSES`` / ``DISH_CATEGORIES``.
    """
    import torch

    model, device, transform = get_cv_classifier()
    batch = transform(image.convert("RGB")).unsqueeze(0).to(device)
    with torch.no_grad():
        logits = model(batch)

    def winner(head: str) -> int:
        """Index of the highest-scoring class for *head*."""
        return int(logits[head].argmax(1).item())

    features = {}
    features["portion_bucket"] = winner("portion_bucket") + 1
    features["cuisine_class"] = CUISINE_CLASSES[winner("cuisine_class")]
    features["visible_fat"] = winner("visible_fat") + 1
    features["dish_category"] = DISH_CATEGORIES[winner("dish_category")]
    return features
| # ── Build feature row + predict ────────────────────────────────────────────── | |
@dataclass
class Prediction:
    """Result of one ``predict`` call.

    The ``@dataclass`` decorator is required: ``predict`` constructs this
    class with keyword arguments, which a plain annotated class would reject.
    """

    predicted_calories: float  # rounded calorie estimate from the GBM
    text_features: dict  # structured features extracted from the ingredient text
    cv_features: dict  # visual features (portion, cuisine, fat, dish category)
    cv_mode: str  # human-readable label of the CV path that was selected
    inputs: dict  # echo of the user inputs (food_type, ingredients)
    top_drivers: list  # global feature importance (per model)
    macros: Optional[dict] = None  # {"protein_g": float, "fat_g": float, "carb_g": float}
    allergens: Optional[dict] = None  # {"contains_gluten": 0|1, ...}
    top_drivers_shap: Optional[list] = None  # per-instance SHAP contributions in kcal: [(name, shap_kcal, value), ...]
    shap_base_value: Optional[float] = None  # model's average prediction (intercept-like reference)
# Accepted food_type values. The first entry is the baseline that gets no
# one-hot column (the feature builder iterates FOOD_TYPES[1:]).
FOOD_TYPES = [
    "Homemade food",
    "Restaurant food",
    "Packaged food",
    "Raw vegetables and fruits",
    "Others",
]
def _compute_shap(X: pd.DataFrame, feats: list, row: dict) -> tuple[Optional[list], Optional[float]]:
    """Compute per-instance SHAP drivers for a single-row prediction.

    Args:
        X: One-row feature frame passed to the explainer.
        feats: Feature names, in the column order of *X*.
        row: Mapping from feature name to the value used for this prediction.

    Returns:
        ``(drivers, base_value)`` where ``drivers`` holds up to five
        ``(feature_name, shap_value, feature_value)`` tuples ordered by
        descending absolute SHAP value, and ``base_value`` is the first
        element of the explainer's ``expected_value`` (0.0 if absent).
        ``(None, None)`` when no explainer is available, the SHAP output does
        not have one value per feature, or anything raises.
    """
    unavailable = (None, None)
    explainer = get_shap_explainer("v3")
    if explainer is None:
        return unavailable

    try:
        # shap may return (n_features,), (1, n_features), (1, n_features, 1)
        # or a wrapper object depending on versions — flatten defensively.
        contributions = np.asarray(explainer.shap_values(X), dtype=float).reshape(-1)
        if contributions.size != len(feats):
            return unavailable

        ranked = np.argsort(np.abs(contributions))[::-1][:5]
        drivers = []
        for idx in ranked:
            name = feats[idx]
            drivers.append((str(name), float(contributions[idx]), float(row[name])))

        expected = np.asarray(getattr(explainer, "expected_value", 0), dtype=float).flatten()
        base_value = float(expected[0]) if expected.size > 0 else 0.0
        return drivers, base_value
    except Exception:
        return unavailable
def _coerce_float(value, default: float) -> float:
    """Return ``float(value)``, or *default* when the value is missing or malformed."""
    try:
        return float(value)
    except (TypeError, ValueError):
        return float(default)


def _set_one_hot(row: dict, prefix: str, categories, active: str) -> None:
    """Set ``{prefix}_{category}`` to 1/0 for each category that is already a key of *row*."""
    for category in categories:
        column = f"{prefix}_{category}"
        if column in row:
            row[column] = 1 if category == active else 0


def _build_feature_row(feats, text_features: dict, cv: dict,
                       ingredients_text: str, food_type: str) -> dict:
    """Assemble the single-row feature dict the GBM models consume.

    Every name in *feats* starts at 0.0. Features that are not filled in
    below stay at zero — notably the TF-IDF columns, because the trained
    TF-IDF vocabulary isn't accessible at runtime (a known limitation; the
    inference path emphasizes structured features).
    """
    row = {f: 0.0 for f in feats}

    # Baseline text statistics.
    row["n_ingredients_raw"] = _coerce_float(text_features.get("n_ingredients_parsed", 0), 0.0)
    row["log_n_ingredients"] = float(np.log1p(row["n_ingredients_raw"]))
    row["ing_len"] = float(len(ingredients_text or ""))

    # food_type one-hot; the first category is the dropped baseline.
    _set_one_hot(row, "food", FOOD_TYPES[1:], food_type)

    # NLP features.
    for flag in ("has_oil_or_butter", "has_meat", "has_dairy", "has_added_sugar"):
        row[flag] = int(bool(text_features.get(flag, 0)))
    for numeric in ("naive_kcal_per_serving", "n_ingredients_parsed"):
        if numeric in row:
            row[numeric] = _coerce_float(text_features.get(numeric, 0), 0.0)
    _set_one_hot(row, "cooking_method_class",
                 ("baked", "fried", "boiled", "steamed", "grilled", "other"),
                 str(text_features.get("cooking_method_class", "other")))
    _set_one_hot(row, "dietary_flag", ("vegetarian", "vegan"),
                 str(text_features.get("dietary_flag", "omnivore")))

    # CV features; a null or non-numeric model reply falls back to the mid bucket.
    row["portion_bucket"] = _coerce_float(cv.get("portion_bucket", 3), 3.0)
    row["visible_fat"] = _coerce_float(cv.get("visible_fat", 3), 3.0)
    _set_one_hot(row, "cuisine_class", CUISINE_CLASSES[1:], str(cv.get("cuisine_class", "other")))
    _set_one_hot(row, "dish_category", DISH_CATEGORIES[1:], str(cv.get("dish_category", "other")))
    return row


def predict(image: Optional[Image.Image], ingredients_text: str,
            food_type: str = "Homemade food",
            cv_mode: Literal["cnn", "llm"] = "llm") -> Prediction:
    """Run the full inference pipeline for one meal.

    Steps: text features via LLM, visual features via the chosen CV path,
    calorie prediction with the v3 GBM, global and SHAP drivers, optional
    macro predictions, and allergen detection.

    Args:
        image: Meal photo, or ``None`` to use neutral visual defaults.
        ingredients_text: Free-text ingredient list (may be empty or ``None``).
        food_type: One of ``FOOD_TYPES``; anything else is treated as "Others".
        cv_mode: ``"llm"`` for the vision LLM; any other value uses the local CNN.

    Returns:
        A populated ``Prediction``.
    """
    text_features = extract_text_features(ingredients_text or "")

    if image is None:
        cv = {"portion_bucket": 3, "cuisine_class": "other", "visible_fat": 3, "dish_category": "other"}
        cv_mode_used = "default (no image)"
    elif cv_mode == "llm":
        # NOTE(review): cv_features_from_llm may silently fall back to the CNN
        # or to defaults; the label below still reports the LLM path.
        cv = cv_features_from_llm(image)
        cv_mode_used = "Vision-LLM (gpt-4o-mini)"
    else:
        cv = cv_features_from_classifier(image)
        cv_mode_used = "CNN (SqueezeNet)"

    bundle = get_gbm("v3")
    feats = bundle["features"]
    if food_type not in FOOD_TYPES:
        food_type = "Others"
    row = _build_feature_row(feats, text_features, cv, ingredients_text, food_type)

    X = pd.DataFrame([row])[feats]
    cal = float(np.round(bundle["model"].predict(X)[0]))

    importances = bundle["model"].feature_importances_
    top_idx = np.argsort(importances)[::-1][:5]
    top_drivers = [(feats[i], float(importances[i]), float(row[feats[i]])) for i in top_idx]

    # Per-instance SHAP contributions (kcal-units). Falls back gracefully if shap unavailable.
    top_drivers_shap, shap_base = _compute_shap(X, feats, row)

    # Predict macros (protein, fat, carb) — uses same feature row, separate models.
    # reindex() zero-fills any macro feature that is absent from the calorie
    # feature row instead of raising KeyError, matching how unknown calorie
    # features default to 0.0.
    macros = {}
    row_frame = pd.DataFrame([row])
    for macro_name, macro_bundle in get_macro_models("v3").items():
        macro_X = row_frame.reindex(columns=list(macro_bundle["features"]), fill_value=0.0)
        macros[f"{macro_name}_g"] = float(max(0.0, np.round(macro_bundle["model"].predict(macro_X)[0], 1)))

    # Allergen detection (separate LLM call, fails gracefully)
    allergens = detect_allergens(ingredients_text or "")

    return Prediction(
        predicted_calories=cal,
        text_features=text_features,
        cv_features=cv,
        cv_mode=cv_mode_used,
        inputs={"food_type": food_type, "ingredients": ingredients_text},
        top_drivers=top_drivers,
        macros=macros,
        allergens=allergens,
        top_drivers_shap=top_drivers_shap,
        shap_base_value=shap_base,
    )
| # ── German explanation ─────────────────────────────────────────────────────── | |
# Rules shared by both German explanation prompts (first five lines).
_EXPLANATION_PROMPT_HEAD = (
    'Du erklärst eine Kalorien-Schätzung aus einem maschinellen Lernmodell auf Deutsch.',
    'Wichtig:',
    '- Berechne KEINEN neuen Wert. Nutze EXAKT den vorgegebenen Wert.',
    '- Antwort als JSON mit dem Schlüssel "answer".',
    '- 3-5 kurze Sätze.',
)

# System prompt used when a photo was provided: the explanation must cite at
# least one visual and one textual feature.
EXPLANATION_PROMPT_WITH_IMAGE = "\n".join(_EXPLANATION_PROMPT_HEAD + (
    '- Nimm Bezug auf mindestens ein visuelles UND ein textuelles Merkmal.',
    '- Erwähne genau eine Unsicherheit oder Limitation.',
    '- Keine Markdown-Formatierung.',
))

# System prompt used when no photo was provided: visual aspects must not be
# mentioned at all.
EXPLANATION_PROMPT_TEXT_ONLY = "\n".join(_EXPLANATION_PROMPT_HEAD + (
    '- Es wurde KEIN Foto hochgeladen — beziehe dich AUSSCHLIESSLICH auf textuelle Merkmale aus den Zutaten.',
    '- Erwähne NIEMALS visuelle Aspekte wie Portionsgröße, Aussehen, sichtbares Fett, Farbe oder Garmethode-aus-dem-Bild.',
    '- Erwähne genau eine Unsicherheit oder Limitation (z.B. "ohne Foto fehlt die Portionsschätzung").',
    '- Keine Markdown-Formatierung.',
))
def explain(pred: Prediction, ingredients_excerpt: str = "") -> str:
    """Ask the chat model for a short German explanation of a prediction.

    A compact summary of *pred* is serialized to JSON and sent with one of two
    system prompts, depending on whether a photo was provided (detected from
    the ``"no image"`` marker in ``pred.cv_mode``). Visual features are only
    included in the summary when a photo was provided.

    Args:
        pred: The prediction to explain.
        ingredients_excerpt: Optional ingredient text; the first 600
            characters are appended to the user message.

    Returns:
        The ``"answer"`` field of the model's JSON reply.

    Raises:
        KeyError: if the reply has no ``"answer"`` key.
        Exception: whatever the API call or ``json.loads`` raises after the
            retries are exhausted.
    """
    image_was_provided = "no image" not in pred.cv_mode.lower()

    # Only the boolean has_* fields are flags. Numeric fields such as
    # n_ingredients_parsed can legitimately equal 1 and must not be reported
    # as "true flags".
    flags_true = [
        k for k, v in pred.text_features.items()
        if str(k).startswith("has_") and v == 1
    ]

    summary = {
        "predicted_calories_kcal": pred.predicted_calories,
        "predicted_macros": pred.macros or {},
        "food_type": pred.inputs.get("food_type"),
        "text_flags_true": flags_true,
        "text_naive_kcal_estimate": pred.text_features.get("naive_kcal_per_serving"),
        "top_features": [d[0] for d in pred.top_drivers],
    }
    if image_was_provided:
        summary["cv_mode"] = pred.cv_mode
        summary["cv_features"] = pred.cv_features
        system_prompt = EXPLANATION_PROMPT_WITH_IMAGE
    else:
        summary["image_provided"] = False
        system_prompt = EXPLANATION_PROMPT_TEXT_ONLY

    user_msg = "Schätzung & Merkmale:\n" + json.dumps(summary, ensure_ascii=False, indent=2)
    if ingredients_excerpt:
        user_msg += "\n\nZutaten:\n" + ingredients_excerpt[:600]

    client = get_openai_client()
    # Use the same retry wrapper as every other chat call in this module.
    resp = _call_with_retry(
        client,
        model=os.environ.get("OPENAI_MODEL", "gpt-4o-mini"),
        response_format={"type": "json_object"},
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_msg},
        ],
        temperature=0.4, max_tokens=300,
    )
    return json.loads(resp.choices[0].message.content)["answer"]