""" Inference pipeline for the Smart Meal Analyzer. Combines: - NLP extraction (ingredients text → 8 structured features via gpt-4o-mini) - CV feature extraction (meal photo → 4 visual features) * "fast" mode: trained SqueezeNet (local, free) * "quality" mode: gpt-4o-mini vision (matches training-time labels) - GBM v3 (calories regression) - LLM explanation (German, 3-5 sentences) """ from __future__ import annotations import base64 import io import json import os import pickle from dataclasses import dataclass from pathlib import Path from typing import Literal, Optional import numpy as np import pandas as pd from PIL import Image REPO_ROOT = Path(__file__).resolve().parents[1] DATA_PROCESSED = REPO_ROOT / "data" / "processed" MODELS = REPO_ROOT / "models" CUISINE_CLASSES = ["asian", "european", "american", "mexican", "mediterranean", "other"] DISH_CATEGORIES = ["main", "salad", "soup", "dessert", "snack", "drink", "other"] HEADS = {"portion_bucket": 5, "cuisine_class": 6, "visible_fat": 5, "dish_category": 7} _CACHE: dict = {} # Load .env if available so OPENAI_API_KEY is picked up automatically. try: from dotenv import load_dotenv load_dotenv(REPO_ROOT / ".env") except ImportError: pass def get_openai_client(): if "openai" not in _CACHE: from openai import OpenAI _CACHE["openai"] = OpenAI() return _CACHE["openai"] def get_gbm(variant: str = "v3"): key = f"gbm_{variant}" if key not in _CACHE: with open(MODELS / f"gbm_{variant}.pkl", "rb") as f: _CACHE[key] = pickle.load(f) return _CACHE[key] def get_shap_explainer(variant: str = "v3"): """Cache a SHAP TreeExplainer for the calorie GBM. Returns None if shap unavailable.""" key = f"shap_{variant}" if key in _CACHE: return _CACHE[key] try: import shap bundle = get_gbm(variant) # tree_path_dependent is faster for sklearn GBM and needs no background data try: explainer = shap.TreeExplainer(bundle["model"], feature_perturbation="tree_path_dependent") except TypeError: explainer = shap.TreeExplainer(bundle["model"]) _CACHE[key] = explainer return explainer except Exception: _CACHE[key] = None return None def get_macro_models(variant: str = "v3") -> dict: """Optional: load macro-target GBM bundles (protein, fat, carb). Returns {} if not present.""" key = f"macros_{variant}" if key in _CACHE: return _CACHE[key] macros = {} for macro in ("protein", "fat", "carb"): path = MODELS / f"gbm_{variant}_{macro}.pkl" if path.exists(): with open(path, "rb") as f: macros[macro] = pickle.load(f) _CACHE[key] = macros return macros def get_cv_classifier(): if "cv_model" in _CACHE: return _CACHE["cv_model"], _CACHE["cv_device"], _CACHE["cv_transform"] import torch import torchvision.transforms as T from torchvision import models ckpt = MODELS / "cv_classifier.pth" if not ckpt.exists(): raise FileNotFoundError(f"No CV classifier at {ckpt}") device = torch.device("mps" if torch.backends.mps.is_available() else "cuda" if torch.cuda.is_available() else "cpu") state = torch.load(ckpt, map_location=device, weights_only=False) H = state["heads"] class Net(torch.nn.Module): def __init__(self): super().__init__() backbone = models.squeezenet1_1(weights=None) self.features = backbone.features self.pool = torch.nn.AdaptiveAvgPool2d(1) self.heads = torch.nn.ModuleDict({h: torch.nn.Linear(512, n) for h, n in H.items()}) def forward(self, x): f = self.pool(self.features(x)).flatten(1) return {h: self.heads[h](f) for h in H} model = Net().to(device) model.load_state_dict(state["state_dict"]) model.eval() transform = T.Compose([ T.Resize(256), T.CenterCrop(224), T.ToTensor(), T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]), ]) _CACHE["cv_model"] = model _CACHE["cv_device"] = device _CACHE["cv_transform"] = transform return model, device, transform # ── NLP ────────────────────────────────────────────────────────────────────── NLP_PROMPT = """You are a nutrition-aware ingredient parser. Read ingredients and return ONLY valid JSON with EXACTLY: {"naive_kcal_per_serving": , "n_ingredients_parsed": , "has_oil_or_butter": <0|1>, "has_meat": <0|1>, "has_dairy": <0|1>, "has_added_sugar": <0|1>, "cooking_method_class": "raw"|"baked"|"fried"|"boiled"|"steamed"|"grilled"|"other", "dietary_flag": "vegan"|"vegetarian"|"omnivore"} Use full 0-3000 range for naive_kcal_per_serving.""" ALLERGEN_PROMPT = """You detect common food allergens in an ingredient list. Return ONLY valid JSON with EXACTLY these boolean fields (0 or 1): {"contains_gluten": <0|1>, "contains_lactose": <0|1>, "contains_nuts": <0|1>, "contains_eggs": <0|1>, "contains_soy": <0|1>, "contains_fish_or_shellfish": <0|1>} Be precise: 1 only when the ingredient is clearly present (or a clear derivative — wheat flour → gluten, milk → lactose, peanut butter → nuts). 0 if absent or uncertain.""" def _call_with_retry(client, **kwargs): """Wrap an OpenAI chat completion with simple exponential backoff (up to 3 tries).""" import time as _time last_err = None for attempt in range(3): try: return client.chat.completions.create(**kwargs) except Exception as e: last_err = e _time.sleep(0.5 * (2 ** attempt)) raise last_err def extract_text_features(text: str) -> dict: if not text or not text.strip(): return {"naive_kcal_per_serving": 400, "n_ingredients_parsed": 0, "has_oil_or_butter": 0, "has_meat": 0, "has_dairy": 0, "has_added_sugar": 0, "cooking_method_class": "other", "dietary_flag": "omnivore"} client = get_openai_client() resp = _call_with_retry(client, model=os.environ.get("OPENAI_MODEL", "gpt-4o-mini"), response_format={"type": "json_object"}, messages=[ {"role": "system", "content": NLP_PROMPT}, {"role": "user", "content": text[:4000]}, ], temperature=0.0, max_tokens=200, seed=42, ) return json.loads(resp.choices[0].message.content) def detect_allergens(text: str) -> dict: """Best-effort allergen detection — used at inference time only, not part of training.""" default = {"contains_gluten": 0, "contains_lactose": 0, "contains_nuts": 0, "contains_eggs": 0, "contains_soy": 0, "contains_fish_or_shellfish": 0} if not text or not text.strip(): return default try: client = get_openai_client() resp = _call_with_retry(client, model=os.environ.get("OPENAI_MODEL", "gpt-4o-mini"), response_format={"type": "json_object"}, messages=[ {"role": "system", "content": ALLERGEN_PROMPT}, {"role": "user", "content": text[:4000]}, ], temperature=0.0, max_tokens=150, seed=42, ) return {**default, **json.loads(resp.choices[0].message.content)} except Exception: return default # ── CV: vision-LLM ─────────────────────────────────────────────────────────── CV_LLM_PROMPT = """You are a food-image analyst. Look at the meal photo and return ONLY valid JSON with EXACTLY: {"portion_bucket": , "cuisine_class": "asian"|"european"|"american"|"mexican"|"mediterranean"|"other", "visible_fat": , "dish_category": "main"|"salad"|"soup"|"dessert"|"snack"|"drink"|"other"} Use full 1-5 range for portion_bucket and visible_fat.""" def cv_features_from_llm(image: Image.Image) -> dict: """Vision-LLM CV path with retry + CNN-fallback on failure.""" client = get_openai_client() img = image.copy() img.thumbnail((512, 512)) buf = io.BytesIO() img.convert("RGB").save(buf, format="JPEG", quality=85) img_b64 = base64.b64encode(buf.getvalue()).decode("ascii") try: resp = _call_with_retry(client, model=os.environ.get("OPENAI_VISION_MODEL", "gpt-4o-mini"), response_format={"type": "json_object"}, messages=[ {"role": "system", "content": CV_LLM_PROMPT}, {"role": "user", "content": [ {"type": "text", "text": "Analyze this meal photo."}, {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{img_b64}", "detail": "low"}}, ]}, ], temperature=0.0, max_tokens=200, seed=42, ) return json.loads(resp.choices[0].message.content) except Exception: # Fall back to the local CNN so the app still works without OpenAI try: return cv_features_from_classifier(image) except Exception: return {"portion_bucket": 3, "cuisine_class": "other", "visible_fat": 3, "dish_category": "other"} def cv_features_from_classifier(image: Image.Image) -> dict: import torch model, device, transform = get_cv_classifier() x = transform(image.convert("RGB")).unsqueeze(0).to(device) with torch.no_grad(): out = model(x) return { "portion_bucket": int(out["portion_bucket"].argmax(1).item()) + 1, "cuisine_class": CUISINE_CLASSES[int(out["cuisine_class"].argmax(1).item())], "visible_fat": int(out["visible_fat"].argmax(1).item()) + 1, "dish_category": DISH_CATEGORIES[int(out["dish_category"].argmax(1).item())], } # ── Build feature row + predict ────────────────────────────────────────────── @dataclass class Prediction: predicted_calories: float text_features: dict cv_features: dict cv_mode: str inputs: dict top_drivers: list # global feature importance (per model) macros: dict = None # {"protein_g": float, "fat_g": float, "carb_g": float} allergens: dict = None # {"contains_gluten": 0|1, ...} top_drivers_shap: list = None # per-instance SHAP contributions in kcal: [(name, shap_kcal, value), ...] shap_base_value: float = None # model's average prediction (intercept-like reference) FOOD_TYPES = ["Homemade food", "Restaurant food", "Packaged food", "Raw vegetables and fruits", "Others"] def _compute_shap(X: pd.DataFrame, feats: list, row: dict) -> tuple[Optional[list], Optional[float]]: """Return (top5 SHAP drivers, expected_value) for a single-row prediction, or (None, None) on failure. Newer numpy/shap combos return values with various shapes (n_features,), (1, n_features), (1, n_features, 1), or wrapper objects — we flatten defensively. """ explainer = get_shap_explainer("v3") if explainer is None: return None, None try: sv_arr = np.asarray(explainer.shap_values(X), dtype=float) sv_row = sv_arr.reshape(-1) if sv_arr.size == len(feats) else sv_arr.squeeze().reshape(-1) if sv_row.size != len(feats): return None, None top5 = np.argsort(np.abs(sv_row))[::-1][:5] drivers = [(str(feats[i]), float(sv_row[i]), float(row[feats[i]])) for i in top5] ev_arr = np.asarray(getattr(explainer, "expected_value", 0), dtype=float).flatten() base = float(ev_arr[0]) if ev_arr.size > 0 else 0.0 return drivers, base except Exception: return None, None def predict(image: Optional[Image.Image], ingredients_text: str, food_type: str = "Homemade food", cv_mode: Literal["cnn", "llm"] = "llm") -> Prediction: text_features = extract_text_features(ingredients_text or "") if image is None: cv = {"portion_bucket": 3, "cuisine_class": "other", "visible_fat": 3, "dish_category": "other"} cv_mode_used = "default (no image)" elif cv_mode == "llm": cv = cv_features_from_llm(image) cv_mode_used = "Vision-LLM (gpt-4o-mini)" else: cv = cv_features_from_classifier(image) cv_mode_used = "CNN (SqueezeNet)" bundle = get_gbm("v3") feats = bundle["features"] row = {f: 0.0 for f in feats} # baseline row["n_ingredients_raw"] = float(text_features.get("n_ingredients_parsed", 0)) row["log_n_ingredients"] = float(np.log1p(row["n_ingredients_raw"])) row["ing_len"] = float(len(ingredients_text or "")) # food_type one-hot if food_type not in FOOD_TYPES: food_type = "Others" for ft in FOOD_TYPES[1:]: # drop_first col = f"food_{ft}" if col in row: row[col] = 1 if ft == food_type else 0 # TF-IDF: leave at 0 (the trained TF-IDF vocabulary isn't accessible at runtime — # this is a known limitation, the inference path emphasizes structured features) # NLP features for c in ["has_oil_or_butter", "has_meat", "has_dairy", "has_added_sugar"]: row[c] = int(bool(text_features.get(c, 0))) for c in ["naive_kcal_per_serving", "n_ingredients_parsed"]: if c in row: row[c] = float(text_features.get(c, 0)) cm = str(text_features.get("cooking_method_class", "other")) df_cm = str(text_features.get("dietary_flag", "omnivore")) for c in ["baked", "fried", "boiled", "steamed", "grilled", "other"]: col = f"cooking_method_class_{c}" if col in row: row[col] = 1 if cm == c else 0 for c in ["vegetarian", "vegan"]: col = f"dietary_flag_{c}" if col in row: row[col] = 1 if df_cm == c else 0 # CV features row["portion_bucket"] = float(cv.get("portion_bucket", 3)) row["visible_fat"] = float(cv.get("visible_fat", 3)) cu = str(cv.get("cuisine_class", "other")) dc = str(cv.get("dish_category", "other")) for c in CUISINE_CLASSES[1:]: col = f"cuisine_class_{c}" if col in row: row[col] = 1 if cu == c else 0 for c in DISH_CATEGORIES[1:]: col = f"dish_category_{c}" if col in row: row[col] = 1 if dc == c else 0 X = pd.DataFrame([row])[feats] cal = float(np.round(bundle["model"].predict(X)[0])) importances = bundle["model"].feature_importances_ top_idx = np.argsort(importances)[::-1][:5] top_drivers = [(feats[i], float(importances[i]), float(row[feats[i]])) for i in top_idx] # Per-instance SHAP contributions (kcal-units). Falls back gracefully if shap unavailable. top_drivers_shap, shap_base = _compute_shap(X, feats, row) # Predict macros (protein, fat, carb) — uses same feature row, separate models macro_bundles = get_macro_models("v3") macros = {} for macro_name, macro_bundle in macro_bundles.items(): macro_X = pd.DataFrame([row])[macro_bundle["features"]] macros[f"{macro_name}_g"] = float(max(0.0, np.round(macro_bundle["model"].predict(macro_X)[0], 1))) # Allergen detection (separate LLM call, fails gracefully) allergens = detect_allergens(ingredients_text or "") return Prediction( predicted_calories=cal, text_features=text_features, cv_features=cv, cv_mode=cv_mode_used, inputs={"food_type": food_type, "ingredients": ingredients_text}, top_drivers=top_drivers, macros=macros, allergens=allergens, top_drivers_shap=top_drivers_shap, shap_base_value=shap_base, ) # ── German explanation ─────────────────────────────────────────────────────── EXPLANATION_PROMPT_WITH_IMAGE = """Du erklärst eine Kalorien-Schätzung aus einem maschinellen Lernmodell auf Deutsch. Wichtig: - Berechne KEINEN neuen Wert. Nutze EXAKT den vorgegebenen Wert. - Antwort als JSON mit dem Schlüssel "answer". - 3-5 kurze Sätze. - Nimm Bezug auf mindestens ein visuelles UND ein textuelles Merkmal. - Erwähne genau eine Unsicherheit oder Limitation. - Keine Markdown-Formatierung.""" EXPLANATION_PROMPT_TEXT_ONLY = """Du erklärst eine Kalorien-Schätzung aus einem maschinellen Lernmodell auf Deutsch. Wichtig: - Berechne KEINEN neuen Wert. Nutze EXAKT den vorgegebenen Wert. - Antwort als JSON mit dem Schlüssel "answer". - 3-5 kurze Sätze. - Es wurde KEIN Foto hochgeladen — beziehe dich AUSSCHLIESSLICH auf textuelle Merkmale aus den Zutaten. - Erwähne NIEMALS visuelle Aspekte wie Portionsgröße, Aussehen, sichtbares Fett, Farbe oder Garmethode-aus-dem-Bild. - Erwähne genau eine Unsicherheit oder Limitation (z.B. "ohne Foto fehlt die Portionsschätzung"). - Keine Markdown-Formatierung.""" def explain(pred: Prediction, ingredients_excerpt: str = "") -> str: image_was_provided = "no image" not in pred.cv_mode.lower() summary = { "predicted_calories_kcal": pred.predicted_calories, "predicted_macros": pred.macros or {}, "food_type": pred.inputs.get("food_type"), "text_flags_true": [k for k, v in pred.text_features.items() if v == 1 or v is True], "text_naive_kcal_estimate": pred.text_features.get("naive_kcal_per_serving"), "top_features": [d[0] for d in pred.top_drivers], } if image_was_provided: summary["cv_mode"] = pred.cv_mode summary["cv_features"] = pred.cv_features system_prompt = EXPLANATION_PROMPT_WITH_IMAGE else: summary["image_provided"] = False system_prompt = EXPLANATION_PROMPT_TEXT_ONLY user_msg = "Schätzung & Merkmale:\n" + json.dumps(summary, ensure_ascii=False, indent=2) if ingredients_excerpt: user_msg += "\n\nZutaten:\n" + ingredients_excerpt[:600] client = get_openai_client() resp = client.chat.completions.create( model=os.environ.get("OPENAI_MODEL", "gpt-4o-mini"), response_format={"type": "json_object"}, messages=[ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_msg}, ], temperature=0.4, max_tokens=300, ) return json.loads(resp.choices[0].message.content)["answer"]