smart-meal-analyzer / src /inference.py
saettsam's picture
Deploy runtime: app.py + src/inference.py + 5 models + tfidf vocab + README
371911a verified
"""
Inference pipeline for the Smart Meal Analyzer.
Combines:
- NLP extraction (ingredients text → 8 structured features via gpt-4o-mini)
- CV feature extraction (meal photo → 4 visual features)
  * "fast" mode (cv_mode="cnn"): trained SqueezeNet (local, free)
  * "quality" mode (cv_mode="llm"): gpt-4o-mini vision (matches training-time labels)
- GBM v3 (calories regression)
- LLM explanation (German, 3-5 sentences)
"""
from __future__ import annotations
import base64
import io
import json
import os
import pickle
from dataclasses import dataclass
from pathlib import Path
from typing import Literal, Optional
import numpy as np
import pandas as pd
from PIL import Image
# Filesystem layout: the repository root is taken to be the directory one level
# above the directory containing this file.
REPO_ROOT = Path(__file__).resolve().parent.parent
DATA_PROCESSED = REPO_ROOT.joinpath("data", "processed")
MODELS = REPO_ROOT.joinpath("models")

# Label vocabularies. The local CNN path maps an argmax index through these
# lists, so their order matters.
CUISINE_CLASSES = [
    "asian",
    "european",
    "american",
    "mexican",
    "mediterranean",
    "other",
]
DISH_CATEGORIES = [
    "main",
    "salad",
    "soup",
    "dessert",
    "snack",
    "drink",
    "other",
]

# Number of classes for each CV head.
HEADS = {
    "portion_bucket": 5,
    "cuisine_class": len(CUISINE_CLASSES),
    "visible_fat": 5,
    "dish_category": len(DISH_CATEGORIES),
}

# Process-wide memo for lazily created clients, models and explainers.
_CACHE: dict = dict()
# Load REPO_ROOT/.env if python-dotenv is installed, so OPENAI_API_KEY is picked
# up automatically. When the package is missing the import error is ignored and
# the process environment is used as-is.
try:
    from dotenv import load_dotenv
    load_dotenv(REPO_ROOT / ".env")
except ImportError:
    pass
def get_openai_client():
    """Return the shared OpenAI client, creating it on the first call.

    The ``openai`` import is deferred until a client is actually needed, and
    the instance is memoised in ``_CACHE`` under the key ``"openai"``.
    """
    if "openai" in _CACHE:
        return _CACHE["openai"]

    from openai import OpenAI

    client = OpenAI()
    _CACHE["openai"] = client
    return client
def get_gbm(variant: str = "v3"):
    """Return the unpickled GBM bundle stored at ``models/gbm_{variant}.pkl``.

    The file is read once per variant and memoised in ``_CACHE``. Callers in
    this module index the bundle with ``"model"`` and ``"features"``.
    """
    cache_key = f"gbm_{variant}"
    if cache_key in _CACHE:
        return _CACHE[cache_key]

    # NOTE(review): pickle.load executes code embedded in the file; the model
    # files are presumably trusted build artefacts -- confirm.
    bundle_path = MODELS / f"gbm_{variant}.pkl"
    with bundle_path.open("rb") as fh:
        bundle = pickle.load(fh)
    _CACHE[cache_key] = bundle
    return bundle
def get_shap_explainer(variant: str = "v3"):
    """Return a cached ``shap.TreeExplainer`` for the calorie GBM, or ``None``.

    ``None`` is returned (and cached, so the attempt is not repeated) when
    shap cannot be imported, the GBM bundle cannot be loaded, or the explainer
    cannot be constructed.
    """
    cache_key = f"shap_{variant}"
    if cache_key in _CACHE:
        return _CACHE[cache_key]

    explainer = None
    try:
        import shap

        model = get_gbm(variant)["model"]
        try:
            # tree_path_dependent is faster for sklearn GBM and needs no background data
            explainer = shap.TreeExplainer(model, feature_perturbation="tree_path_dependent")
        except TypeError:
            # The keyword was rejected; retry with the explainer's defaults.
            explainer = shap.TreeExplainer(model)
    except Exception:
        explainer = None

    _CACHE[cache_key] = explainer
    return explainer
def get_macro_models(variant: str = "v3") -> dict:
    """Return the optional macro-nutrient GBM bundles for *variant*.

    Looks for ``models/gbm_{variant}_{macro}.pkl`` for each of protein, fat
    and carb, and returns a dict keyed by macro name containing only the
    files that exist (``{}`` when none do). The result is memoised in
    ``_CACHE``.
    """
    cache_key = f"macros_{variant}"
    if cache_key not in _CACHE:
        loaded = {}
        for target in ("protein", "fat", "carb"):
            bundle_path = MODELS / f"gbm_{variant}_{target}.pkl"
            if not bundle_path.exists():
                continue
            with bundle_path.open("rb") as fh:
                loaded[target] = pickle.load(fh)
        _CACHE[cache_key] = loaded
    return _CACHE[cache_key]
def get_cv_classifier():
    """Return ``(model, device, transform)`` for the local SqueezeNet classifier.

    Everything is built once from ``models/cv_classifier.pth`` and memoised in
    ``_CACHE``. The checkpoint is read as a mapping with a ``"heads"`` entry
    (head name -> number of classes) and a ``"state_dict"`` entry for the
    network defined below.

    Raises:
        FileNotFoundError: if the checkpoint file does not exist.
    """
    if "cv_model" in _CACHE:
        return _CACHE["cv_model"], _CACHE["cv_device"], _CACHE["cv_transform"]

    import torch
    import torchvision.transforms as T
    from torchvision import models

    ckpt_path = MODELS / "cv_classifier.pth"
    if not ckpt_path.exists():
        raise FileNotFoundError(f"No CV classifier at {ckpt_path}")

    # Device preference: Apple MPS first, then CUDA, otherwise CPU.
    if torch.backends.mps.is_available():
        device_name = "mps"
    elif torch.cuda.is_available():
        device_name = "cuda"
    else:
        device_name = "cpu"
    device = torch.device(device_name)

    # NOTE(review): weights_only=False performs a full unpickle of the
    # checkpoint; it is presumably a trusted artefact -- confirm.
    checkpoint = torch.load(ckpt_path, map_location=device, weights_only=False)
    head_sizes = checkpoint["heads"]

    class Net(torch.nn.Module):
        """SqueezeNet 1.1 feature extractor with one linear layer per head."""

        def __init__(self):
            super().__init__()
            # Attribute names must match the keys stored in the state dict.
            self.features = models.squeezenet1_1(weights=None).features
            self.pool = torch.nn.AdaptiveAvgPool2d(1)
            self.heads = torch.nn.ModuleDict(
                {name: torch.nn.Linear(512, n_classes) for name, n_classes in head_sizes.items()}
            )

        def forward(self, x):
            pooled = self.pool(self.features(x)).flatten(1)
            return {name: self.heads[name](pooled) for name in head_sizes}

    model = Net().to(device)
    model.load_state_dict(checkpoint["state_dict"])
    model.eval()

    preprocessing = [
        T.Resize(256),
        T.CenterCrop(224),
        T.ToTensor(),
        T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ]
    transform = T.Compose(preprocessing)

    _CACHE["cv_model"] = model
    _CACHE["cv_device"] = device
    _CACHE["cv_transform"] = transform
    return model, device, transform
# ── NLP ──────────────────────────────────────────────────────────────────────
# System prompt used by extract_text_features(): asks the model for a single
# JSON object holding the eight text-derived features (a calorie guess, an
# ingredient count, four 0/1 flags, a cooking method and a dietary flag).
NLP_PROMPT = """You are a nutrition-aware ingredient parser.
Read ingredients and return ONLY valid JSON with EXACTLY:
{"naive_kcal_per_serving": <int>, "n_ingredients_parsed": <int>,
"has_oil_or_butter": <0|1>, "has_meat": <0|1>, "has_dairy": <0|1>, "has_added_sugar": <0|1>,
"cooking_method_class": "raw"|"baked"|"fried"|"boiled"|"steamed"|"grilled"|"other",
"dietary_flag": "vegan"|"vegetarian"|"omnivore"}
Use full 0-3000 range for naive_kcal_per_serving."""
# System prompt used by detect_allergens(): asks for six 0/1 allergen flags and
# instructs the model to answer 0 when an allergen is absent or uncertain.
ALLERGEN_PROMPT = """You detect common food allergens in an ingredient list. Return ONLY valid JSON with EXACTLY these boolean fields (0 or 1):
{"contains_gluten": <0|1>, "contains_lactose": <0|1>, "contains_nuts": <0|1>,
"contains_eggs": <0|1>, "contains_soy": <0|1>, "contains_fish_or_shellfish": <0|1>}
Be precise: 1 only when the ingredient is clearly present (or a clear derivative — wheat flour → gluten, milk → lactose, peanut butter → nuts). 0 if absent or uncertain."""
def _call_with_retry(client, **kwargs):
    """Call ``client.chat.completions.create(**kwargs)`` with up to three attempts.

    Exponential backoff is applied between attempts: 0.5 s after the first
    failure and 1.0 s after the second. No wait happens after the final
    failure, because nothing is retried afterwards and sleeping would only
    delay the error.

    Args:
        client: Object exposing ``chat.completions.create``.
        **kwargs: Forwarded unchanged to ``create``.

    Returns:
        Whatever ``create`` returns on the first successful attempt.

    Raises:
        Exception: The exception raised by the last failed attempt.
    """
    import time as _time

    max_attempts = 3
    for attempt in range(max_attempts):
        try:
            return client.chat.completions.create(**kwargs)
        except Exception:
            if attempt == max_attempts - 1:
                # Out of attempts: surface the error immediately.
                raise
            _time.sleep(0.5 * (2 ** attempt))
def extract_text_features(text: str) -> dict:
    """Turn an ingredient text into the structured text-feature dict.

    Empty or whitespace-only input short-circuits to a fixed default (400 kcal,
    zero ingredients, all flags 0, cooking method "other", dietary flag
    "omnivore") without calling the LLM. Otherwise the first 4000 characters
    are sent with ``NLP_PROMPT`` and the JSON reply is parsed and returned
    as-is; it is not validated against the expected keys.
    """
    if not (text and text.strip()):
        return {
            "naive_kcal_per_serving": 400,
            "n_ingredients_parsed": 0,
            "has_oil_or_butter": 0,
            "has_meat": 0,
            "has_dairy": 0,
            "has_added_sugar": 0,
            "cooking_method_class": "other",
            "dietary_flag": "omnivore",
        }

    conversation = [
        {"role": "system", "content": NLP_PROMPT},
        {"role": "user", "content": text[:4000]},
    ]
    reply = _call_with_retry(
        get_openai_client(),
        model=os.environ.get("OPENAI_MODEL", "gpt-4o-mini"),
        response_format={"type": "json_object"},
        messages=conversation,
        temperature=0.0,
        max_tokens=200,
        seed=42,
    )
    raw_json = reply.choices[0].message.content
    return json.loads(raw_json)
def detect_allergens(text: str) -> dict:
    """Best-effort allergen detection for an ingredient text (inference only).

    Returns a dict of six ``contains_*`` flags. Empty input, or any exception
    while calling the LLM or parsing its reply, yields all-zero flags, so a
    failed lookup cannot be told apart from "no allergens found". On success
    the parsed reply is overlaid on the all-zero defaults.
    """
    no_allergens = dict.fromkeys(
        (
            "contains_gluten",
            "contains_lactose",
            "contains_nuts",
            "contains_eggs",
            "contains_soy",
            "contains_fish_or_shellfish",
        ),
        0,
    )
    if not (text and text.strip()):
        return no_allergens

    try:
        reply = _call_with_retry(
            get_openai_client(),
            model=os.environ.get("OPENAI_MODEL", "gpt-4o-mini"),
            response_format={"type": "json_object"},
            messages=[
                {"role": "system", "content": ALLERGEN_PROMPT},
                {"role": "user", "content": text[:4000]},
            ],
            temperature=0.0,
            max_tokens=150,
            seed=42,
        )
        detected = json.loads(reply.choices[0].message.content)
        return {**no_allergens, **detected}
    except Exception:
        return no_allergens
# ── CV: vision-LLM ───────────────────────────────────────────────────────────
# System prompt used by cv_features_from_llm(): asks the vision model for the
# four visual features (two 1-5 integers and two categorical labels) as JSON.
CV_LLM_PROMPT = """You are a food-image analyst.
Look at the meal photo and return ONLY valid JSON with EXACTLY:
{"portion_bucket": <int 1-5>,
"cuisine_class": "asian"|"european"|"american"|"mexican"|"mediterranean"|"other",
"visible_fat": <int 1-5>,
"dish_category": "main"|"salad"|"soup"|"dessert"|"snack"|"drink"|"other"}
Use full 1-5 range for portion_bucket and visible_fat."""
def cv_features_from_llm(image: Image.Image) -> dict:
    """Extract the four visual features with the vision LLM, never raising.

    The image is copied, shrunk to fit 512x512, JPEG-encoded and sent as a
    base64 data URL together with ``CV_LLM_PROMPT``. The parsed JSON reply is
    returned as-is.

    If anything in that path fails -- including creating the OpenAI client
    (e.g. no API key configured), encoding the image, the request itself or
    parsing the reply -- the local CNN is tried instead. If that fails too, a
    neutral default is returned.

    Args:
        image: The meal photo.

    Returns:
        Dict with ``portion_bucket``, ``cuisine_class``, ``visible_fat`` and
        ``dish_category``.
    """
    try:
        # Client creation is inside the try block on purpose: it is the step
        # that fails when OpenAI is not configured, and that is exactly the
        # situation the CNN fallback exists for.
        client = get_openai_client()

        img = image.copy()
        img.thumbnail((512, 512))
        buf = io.BytesIO()
        img.convert("RGB").save(buf, format="JPEG", quality=85)
        img_b64 = base64.b64encode(buf.getvalue()).decode("ascii")

        resp = _call_with_retry(
            client,
            model=os.environ.get("OPENAI_VISION_MODEL", "gpt-4o-mini"),
            response_format={"type": "json_object"},
            messages=[
                {"role": "system", "content": CV_LLM_PROMPT},
                {"role": "user", "content": [
                    {"type": "text", "text": "Analyze this meal photo."},
                    {"type": "image_url",
                     "image_url": {"url": f"data:image/jpeg;base64,{img_b64}", "detail": "low"}},
                ]},
            ],
            temperature=0.0, max_tokens=200, seed=42,
        )
        return json.loads(resp.choices[0].message.content)
    except Exception:
        # Fall back to the local CNN so the app still works without OpenAI
        try:
            return cv_features_from_classifier(image)
        except Exception:
            return {"portion_bucket": 3, "cuisine_class": "other",
                    "visible_fat": 3, "dish_category": "other"}
def cv_features_from_classifier(image: Image.Image) -> dict:
    """Extract the four visual features with the local SqueezeNet classifier.

    The image is converted to RGB, preprocessed with the cached transform and
    run through the model without gradient tracking. For each head the argmax
    index is taken; the two ordinal heads are shifted by one (index 0 becomes
    bucket 1) and the two categorical heads are mapped through
    ``CUISINE_CLASSES`` / ``DISH_CATEGORIES``.
    """
    import torch

    model, device, transform = get_cv_classifier()
    batch = transform(image.convert("RGB")).unsqueeze(0).to(device)
    with torch.no_grad():
        logits = model(batch)

    head_names = ("portion_bucket", "cuisine_class", "visible_fat", "dish_category")
    winner = {name: int(logits[name].argmax(1).item()) for name in head_names}

    return {
        "portion_bucket": winner["portion_bucket"] + 1,
        "cuisine_class": CUISINE_CLASSES[winner["cuisine_class"]],
        "visible_fat": winner["visible_fat"] + 1,
        "dish_category": DISH_CATEGORIES[winner["dish_category"]],
    }
# ── Build feature row + predict ──────────────────────────────────────────────
@dataclass
class Prediction:
    """Result of one ``predict()`` call.

    The last four fields default to ``None`` and are therefore annotated as
    ``Optional``.
    """

    predicted_calories: float  # rounded calorie estimate from the GBM
    text_features: dict  # structured features extracted from the ingredient text
    cv_features: dict  # visual features (portion, cuisine, fat, dish category)
    cv_mode: str  # human-readable label of the CV path that was selected
    inputs: dict  # echo of the caller's food_type and ingredients
    top_drivers: list  # global feature importance (per model)
    macros: Optional[dict] = None  # {"protein_g": float, "fat_g": float, "carb_g": float}
    allergens: Optional[dict] = None  # {"contains_gluten": 0|1, ...}
    # per-instance SHAP contributions in kcal: [(name, shap_kcal, value), ...]
    top_drivers_shap: Optional[list] = None
    # model's average prediction (intercept-like reference)
    shap_base_value: Optional[float] = None
# Accepted values for predict()'s food_type argument; predict() maps any other
# value to "Others".
FOOD_TYPES = ["Homemade food", "Restaurant food", "Packaged food", "Raw vegetables and fruits", "Others"]
def _compute_shap(X: pd.DataFrame, feats: list, row: dict) -> tuple[Optional[list], Optional[float]]:
    """Compute the five largest per-instance SHAP contributions for one row.

    Returns ``(drivers, base_value)`` where ``drivers`` is a list of
    ``(feature_name, shap_value, feature_value)`` tuples ordered by absolute
    SHAP value, and ``base_value`` is the first entry of the explainer's
    ``expected_value`` (0.0 if it is empty). Returns ``(None, None)`` when no
    explainer is available, when the number of SHAP values does not match the
    number of features, or when anything raises.

    shap may hand back shapes such as (n_features,), (1, n_features) or
    (1, n_features, 1); flattening to 1-D covers all of them.
    """
    explainer = get_shap_explainer("v3")
    if explainer is None:
        return None, None

    try:
        raw_values = explainer.shap_values(X)
        contributions = np.asarray(raw_values, dtype=float).reshape(-1)
        if contributions.size != len(feats):
            return None, None

        # Indices of the five largest contributions by magnitude, biggest first.
        ranked = np.argsort(np.abs(contributions))[::-1][:5]
        drivers = []
        for idx in ranked:
            name = feats[idx]
            drivers.append((str(name), float(contributions[idx]), float(row[name])))

        expected = np.asarray(getattr(explainer, "expected_value", 0), dtype=float).flatten()
        base = float(expected[0]) if expected.size > 0 else 0.0
        return drivers, base
    except Exception:
        return None, None
def _as_float(value, default: float) -> float:
    """Coerce an LLM-supplied value to float; use *default* if it is None or malformed."""
    try:
        return float(value)
    except (TypeError, ValueError):
        return float(default)


def _set_one_hot(row: dict, prefix: str, categories, active: str) -> None:
    """Set each ``{prefix}_{category}`` column that already exists in *row* to 1 or 0."""
    for category in categories:
        col = f"{prefix}_{category}"
        if col in row:
            row[col] = 1 if category == active else 0


def predict(image: Optional[Image.Image], ingredients_text: str,
            food_type: str = "Homemade food",
            cv_mode: Literal["cnn", "llm"] = "llm") -> Prediction:
    """Run the full pipeline for one meal and return a ``Prediction``.

    Steps: extract text features, extract visual features (neutral defaults
    when no image is given), assemble a single feature row in the column order
    stored in the GBM bundle, predict calories, compute global and per-instance
    drivers, predict macros with any available macro models, and detect
    allergens.

    Args:
        image: Meal photo, or ``None`` to use neutral visual defaults.
        ingredients_text: Free-text ingredient list; ``None``/empty is allowed.
        food_type: One of ``FOOD_TYPES``; any other value is treated as "Others".
        cv_mode: ``"llm"`` for the vision LLM, anything else for the local CNN.

    Returns:
        A populated ``Prediction``.
    """
    text_features = extract_text_features(ingredients_text or "")

    if image is None:
        cv = {"portion_bucket": 3, "cuisine_class": "other", "visible_fat": 3, "dish_category": "other"}
        cv_mode_used = "default (no image)"
    elif cv_mode == "llm":
        # NOTE(review): cv_features_from_llm may silently fall back to the CNN
        # or to defaults; this label does not reflect that.
        cv = cv_features_from_llm(image)
        cv_mode_used = "Vision-LLM (gpt-4o-mini)"
    else:
        cv = cv_features_from_classifier(image)
        cv_mode_used = "CNN (SqueezeNet)"

    bundle = get_gbm("v3")
    model = bundle["model"]
    feats = bundle["features"]
    # Every model column starts at 0; columns not touched below stay at 0.
    # That includes the TF-IDF columns, which the inference path does not
    # populate (known limitation; structured features are emphasised instead).
    row = {f: 0.0 for f in feats}

    # Baseline features. LLM output is coerced defensively so that a null or
    # non-numeric value falls back to the default instead of crashing.
    row["n_ingredients_raw"] = _as_float(text_features.get("n_ingredients_parsed"), 0.0)
    row["log_n_ingredients"] = float(np.log1p(row["n_ingredients_raw"]))
    row["ing_len"] = float(len(ingredients_text or ""))

    # One-hot blocks. All categories are offered and only columns the model
    # actually has are written, so the encoding is correct regardless of which
    # category was dropped as the reference level during training.
    if food_type not in FOOD_TYPES:
        food_type = "Others"
    _set_one_hot(row, "food", FOOD_TYPES, food_type)

    # NLP features
    for flag in ("has_oil_or_butter", "has_meat", "has_dairy", "has_added_sugar"):
        row[flag] = int(bool(text_features.get(flag, 0)))
    for numeric in ("naive_kcal_per_serving", "n_ingredients_parsed"):
        if numeric in row:
            row[numeric] = _as_float(text_features.get(numeric), 0.0)
    _set_one_hot(
        row, "cooking_method_class",
        ("raw", "baked", "fried", "boiled", "steamed", "grilled", "other"),
        str(text_features.get("cooking_method_class", "other")),
    )
    _set_one_hot(
        row, "dietary_flag",
        ("vegan", "vegetarian", "omnivore"),
        str(text_features.get("dietary_flag", "omnivore")),
    )

    # CV features
    row["portion_bucket"] = _as_float(cv.get("portion_bucket"), 3.0)
    row["visible_fat"] = _as_float(cv.get("visible_fat"), 3.0)
    _set_one_hot(row, "cuisine_class", CUISINE_CLASSES, str(cv.get("cuisine_class", "other")))
    _set_one_hot(row, "dish_category", DISH_CATEGORIES, str(cv.get("dish_category", "other")))

    X = pd.DataFrame([row])[feats]
    cal = float(np.round(model.predict(X)[0]))

    # Global importance: the five most important model features with this row's values.
    importances = model.feature_importances_
    top_idx = np.argsort(importances)[::-1][:5]
    top_drivers = [(feats[i], float(importances[i]), float(row[feats[i]])) for i in top_idx]

    # Per-instance SHAP contributions (kcal-units). Falls back gracefully if shap unavailable.
    top_drivers_shap, shap_base = _compute_shap(X, feats, row)

    # Predict macros (protein, fat, carb) -- uses same feature row, separate models.
    # Negative predictions are clamped to 0.
    macros = {}
    for macro_name, macro_bundle in get_macro_models("v3").items():
        macro_X = pd.DataFrame([row])[macro_bundle["features"]]
        grams = np.round(macro_bundle["model"].predict(macro_X)[0], 1)
        macros[f"{macro_name}_g"] = float(max(0.0, grams))

    # Allergen detection (separate LLM call, fails gracefully)
    allergens = detect_allergens(ingredients_text or "")

    return Prediction(
        predicted_calories=cal,
        text_features=text_features,
        cv_features=cv,
        cv_mode=cv_mode_used,
        inputs={"food_type": food_type, "ingredients": ingredients_text},
        top_drivers=top_drivers,
        macros=macros,
        allergens=allergens,
        top_drivers_shap=top_drivers_shap,
        shap_base_value=shap_base,
    )
# ── German explanation ───────────────────────────────────────────────────────
# German system prompt used by explain() when a photo was provided. It tells
# the model to: not compute a new value and use exactly the given one; reply
# as JSON with the key "answer"; write 3-5 short sentences; refer to at least
# one visual and one textual feature; mention exactly one uncertainty or
# limitation; and avoid Markdown formatting.
EXPLANATION_PROMPT_WITH_IMAGE = """Du erklärst eine Kalorien-Schätzung aus einem maschinellen Lernmodell auf Deutsch.
Wichtig:
- Berechne KEINEN neuen Wert. Nutze EXAKT den vorgegebenen Wert.
- Antwort als JSON mit dem Schlüssel "answer".
- 3-5 kurze Sätze.
- Nimm Bezug auf mindestens ein visuelles UND ein textuelles Merkmal.
- Erwähne genau eine Unsicherheit oder Limitation.
- Keine Markdown-Formatierung."""
# German system prompt used by explain() when no photo was uploaded. Same rules
# as the with-image prompt, except that the model must refer exclusively to
# textual features from the ingredients and must never mention visual aspects
# (portion size, appearance, visible fat, colour, cooking method seen in the
# image). The suggested limitation is that the portion estimate is missing
# without a photo.
EXPLANATION_PROMPT_TEXT_ONLY = """Du erklärst eine Kalorien-Schätzung aus einem maschinellen Lernmodell auf Deutsch.
Wichtig:
- Berechne KEINEN neuen Wert. Nutze EXAKT den vorgegebenen Wert.
- Antwort als JSON mit dem Schlüssel "answer".
- 3-5 kurze Sätze.
- Es wurde KEIN Foto hochgeladen — beziehe dich AUSSCHLIESSLICH auf textuelle Merkmale aus den Zutaten.
- Erwähne NIEMALS visuelle Aspekte wie Portionsgröße, Aussehen, sichtbares Fett, Farbe oder Garmethode-aus-dem-Bild.
- Erwähne genau eine Unsicherheit oder Limitation (z.B. "ohne Foto fehlt die Portionsschätzung").
- Keine Markdown-Formatierung."""
def explain(pred: Prediction, ingredients_excerpt: str = "") -> str:
    """Ask the LLM for a short German explanation of a prediction.

    A compact JSON summary of the prediction is sent together with one of the
    two explanation prompts, chosen by whether ``pred.cv_mode`` contains
    "no image". Visual features are included only when an image was provided.
    The request goes through ``_call_with_retry`` like the other LLM calls in
    this module.

    Args:
        pred: The prediction to explain.
        ingredients_excerpt: Optional ingredient text; the first 600
            characters are appended to the user message.

    Returns:
        The value of the ``"answer"`` key in the model's JSON reply.

    Raises:
        KeyError: If the reply has no ``"answer"`` key.
        Exception: Whatever the API call or JSON parsing raises after the
            retries are exhausted.
    """
    image_was_provided = "no image" not in pred.cv_mode.lower()

    # Only 0/1 flags belong in "text_flags_true". The two numeric fields are
    # excluded explicitly; otherwise a meal with exactly one parsed ingredient
    # (or a 1 kcal estimate) would be reported as a true flag.
    numeric_fields = ("naive_kcal_per_serving", "n_ingredients_parsed")
    flags_true = [
        k for k, v in pred.text_features.items()
        if k not in numeric_fields and v == 1  # True == 1, so booleans match too
    ]

    summary = {
        "predicted_calories_kcal": pred.predicted_calories,
        "predicted_macros": pred.macros or {},
        "food_type": pred.inputs.get("food_type"),
        "text_flags_true": flags_true,
        "text_naive_kcal_estimate": pred.text_features.get("naive_kcal_per_serving"),
        "top_features": [d[0] for d in pred.top_drivers],
    }
    if image_was_provided:
        summary["cv_mode"] = pred.cv_mode
        summary["cv_features"] = pred.cv_features
        system_prompt = EXPLANATION_PROMPT_WITH_IMAGE
    else:
        summary["image_provided"] = False
        system_prompt = EXPLANATION_PROMPT_TEXT_ONLY

    user_msg = "Schätzung & Merkmale:\n" + json.dumps(summary, ensure_ascii=False, indent=2)
    if ingredients_excerpt:
        user_msg += "\n\nZutaten:\n" + ingredients_excerpt[:600]

    client = get_openai_client()
    resp = _call_with_retry(
        client,
        model=os.environ.get("OPENAI_MODEL", "gpt-4o-mini"),
        response_format={"type": "json_object"},
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_msg},
        ],
        temperature=0.4, max_tokens=300,
    )
    return json.loads(resp.choices[0].message.content)["answer"]