| """Custom handler for Hugging Face Inference Endpoints. |
| |
| Loads an EfficientNet-B0 Keras classifier (`classification_model.h5`) for the |
| Food-101-style food categories defined by `nutritional_database.json` and |
| returns the predicted dish along with calories / protein / fat / carbs. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import base64 |
| import io |
| import json |
| import logging |
| import os |
| from typing import Any, Dict, List, Optional, Union |
| from urllib.request import urlopen |
|
|
| import numpy as np |
| from PIL import Image |
|
|
| os.environ.setdefault("TF_CPP_MIN_LOG_LEVEL", "2") |
| import tensorflow as tf |
|
|
| logger = logging.getLogger(__name__) |
| logging.basicConfig(level=logging.INFO) |
|
|
| IMAGE_SIZE = (224, 224) |
| TOP_K = 5 |
|
|
|
|
| def _to_pil(payload: Any) -> Image.Image: |
| """Best-effort conversion of whatever the endpoint received into a PIL image.""" |
| if isinstance(payload, Image.Image): |
| return payload.convert("RGB") |
|
|
| if isinstance(payload, dict): |
| for key in ("image", "inputs", "url", "data", "bytes", "b64"): |
| if key in payload: |
| return _to_pil(payload[key]) |
| raise ValueError(f"No image-like key found in dict input: {list(payload)}") |
|
|
| if isinstance(payload, (bytes, bytearray)): |
| return Image.open(io.BytesIO(bytes(payload))).convert("RGB") |
|
|
| if isinstance(payload, str): |
| |
| if payload.startswith(("http://", "https://")): |
| with urlopen(payload, timeout=15) as resp: |
| return Image.open(io.BytesIO(resp.read())).convert("RGB") |
| if payload.startswith("data:"): |
| payload = payload.split(",", 1)[-1] |
| try: |
| return Image.open(io.BytesIO(base64.b64decode(payload))).convert("RGB") |
| except Exception as exc: |
| raise ValueError(f"String input is neither a URL nor valid base64: {exc}") |
|
|
| raise TypeError(f"Unsupported input type: {type(payload).__name__}") |
|
|
|
|
| def _preprocess(image: Image.Image) -> np.ndarray: |
| image = image.resize(IMAGE_SIZE, Image.BILINEAR) |
| arr = np.asarray(image, dtype=np.float32) |
| |
| arr = tf.keras.applications.efficientnet.preprocess_input(arr) |
| return np.expand_dims(arr, axis=0) |
|
|
|
|
| def _humanize(label: str) -> str: |
| return label.replace("_", " ").title() |
|
|
|
|
| class EndpointHandler: |
| """Handler invoked by the HF Inference Endpoints toolkit on each request.""" |
|
|
| def __init__(self, path: str = ""): |
| path = path or os.path.dirname(os.path.abspath(__file__)) |
| logger.info("Loading food-recognition model from %s", path) |
|
|
| model_path = os.path.join(path, "classification_model.h5") |
| if not os.path.exists(model_path): |
| raise FileNotFoundError(f"classification_model.h5 not found at {model_path}") |
|
|
| self.model = tf.keras.models.load_model(model_path, compile=False) |
|
|
| nutri_path = os.path.join(path, "nutritional_database.json") |
| with open(nutri_path, "r", encoding="utf-8") as fh: |
| self.nutrition: Dict[str, Dict[str, float]] = json.load(fh) |
|
|
| |
| |
| self.labels: List[str] = list(self.nutrition.keys()) |
|
|
| try: |
| output_shape = self.model.output_shape |
| num_outputs = output_shape[-1] if isinstance(output_shape, tuple) else None |
| if num_outputs is not None and num_outputs != len(self.labels): |
| logger.warning( |
| "Model output size (%s) != number of labels (%s); " |
| "predictions will be truncated/padded to the shorter length.", |
| num_outputs, len(self.labels), |
| ) |
| except Exception: |
| pass |
|
|
| logger.info("Loaded %d food classes", len(self.labels)) |
|
|
| def _nutrition_for(self, label: str) -> Dict[str, Optional[float]]: |
| info = self.nutrition.get(label, {}) |
| return { |
| "calories_per_100g": info.get("calories_per_100g"), |
| "protein_per_100g": info.get("protein_per_100g"), |
| "fat_per_100g": info.get("fat_per_100g"), |
| "carbs_per_100g": info.get("carbs_per_100g"), |
| "fiber_per_100g": info.get("fiber_per_100g"), |
| } |
|
|
| def __call__(self, data: Union[Dict[str, Any], bytes, Image.Image]) -> List[Dict[str, Any]]: |
| |
| |
| if isinstance(data, dict): |
| payload = data.get("inputs", data) |
| params = data.get("parameters", {}) or {} |
| else: |
| payload = data |
| params = {} |
|
|
| top_k = int(params.get("top_k", TOP_K)) |
|
|
| image = _to_pil(payload) |
| batch = _preprocess(image) |
|
|
| preds = self.model.predict(batch, verbose=0) |
| if isinstance(preds, (list, tuple)): |
| preds = preds[0] |
| scores = np.asarray(preds).reshape(-1) |
|
|
| |
| if scores.min() < 0.0 or scores.max() > 1.0 + 1e-3: |
| scores = tf.nn.softmax(scores).numpy() |
|
|
| n = min(len(scores), len(self.labels)) |
| scores = scores[:n] |
| labels = self.labels[:n] |
|
|
| order = np.argsort(scores)[::-1] |
| top_idx = order[: max(1, top_k)] |
|
|
| best_idx = int(top_idx[0]) |
| best_label = labels[best_idx] |
|
|
| return [{ |
| "dish": best_label, |
| "dish_name": _humanize(best_label), |
| "confidence": float(scores[best_idx]), |
| **self._nutrition_for(best_label), |
| "top_predictions": [ |
| { |
| "dish": labels[int(i)], |
| "dish_name": _humanize(labels[int(i)]), |
| "confidence": float(scores[int(i)]), |
| } |
| for i in top_idx |
| ], |
| }] |
|
|