Spaces:
Sleeping
Sleeping
| """ | |
| app_optimized.py β Recipe Recommender Multimodal Demo (optimised build) | |
| Hugging Face Space | CPU-only | Gradio 4.44 | |
| Optimisation notes: | |
| β’ CNN + LLM load lazily on first use (lru_cache + threading.Lock). | |
| β’ Two-phase UX: Phase 1 (<3 s) = ingredients + recipe table; | |
| Phase 2 (~30 s) = LLM narration, user-triggered. | |
| β’ gr.HTML ingredient panel β real images OR coloured text badges. | |
| β’ Pipeline transparency panel β query, scores, per-stage timing. | |
| β’ gr.Examples β 5 predefined text queries for instant demos. | |
| """ | |
| # ββ stdlib ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| import base64 | |
| import functools | |
| import json | |
| import os | |
| import threading | |
| import time | |
| from pathlib import Path | |
| # ββ third-party βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| import faiss | |
| import gradio as gr | |
| # ββ Patch 1: gradio_client 0.6.x β bool JSON-Schema values cause TypeError βββ | |
| import gradio_client.utils as _gcu | |
| _orig_get_type = _gcu.get_type | |
| _orig_jstpt = _gcu._json_schema_to_python_type | |
| def _safe_get_type(schema): | |
| if not isinstance(schema, dict): | |
| return "Any" | |
| return _orig_get_type(schema) | |
| def _safe_jstpt(schema, defs=None): | |
| if not isinstance(schema, dict): | |
| return "Any" | |
| return _orig_jstpt(schema, defs) | |
| _gcu.get_type = _safe_get_type | |
| _gcu._json_schema_to_python_type = _safe_jstpt | |
| # ββ Patch 2: Starlette >=1.0 changed TemplateResponse(name, ctx) β (req, name) β | |
| import starlette.templating as _st | |
| _orig_TemplateResponse = _st.Jinja2Templates.TemplateResponse | |
| def _compat_TemplateResponse(self, *args, **kwargs): | |
| # Old API (Starlette <1.0): TemplateResponse(name: str, context: dict, ...) | |
| # New API (Starlette >=1.0): TemplateResponse(request, name: str, context=...) | |
| if args and isinstance(args[0], str): | |
| name = args[0] | |
| context = args[1] if len(args) > 1 else kwargs.pop("context", {}) | |
| request = context.get("request") | |
| return _orig_TemplateResponse(self, request, name, context=context, **kwargs) | |
| return _orig_TemplateResponse(self, *args, **kwargs) | |
| _st.Jinja2Templates.TemplateResponse = _compat_TemplateResponse | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| import pandas as pd | |
| import torch | |
| import torchvision.models as models | |
| import torchvision.transforms as T | |
| from huggingface_hub import hf_hub_download | |
| try: | |
| from llama_cpp import Llama | |
| _LLAMA_AVAILABLE = True | |
| except ImportError: | |
| Llama = None # type: ignore[assignment, misc] | |
| _LLAMA_AVAILABLE = False | |
| print("llama-cpp-python not available β LLM disabled") | |
| from PIL import Image | |
| from rapidfuzz import process as rfprocess | |
| from sentence_transformers import SentenceTransformer | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # CONFIG | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| HF_USERNAME = os.environ.get("HF_USERNAME", "ramonsj11") | |
| HF_SPACE_NAME = os.environ.get("HF_SPACE_NAME", "ProyectoFinal_recetas") | |
| CNN_REPO = f"{HF_USERNAME}/recipe-ingredient-classifier" | |
| LLM_REPO = f"{HF_USERNAME}/recipe-llm-gguf" | |
| EMBED_MODEL = "sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2" | |
| _EMBED_SHORT = "multilingual-MiniLM-L12-v2 Β· 384-dim" | |
| DIETARY_CHOICES = ["any", "vegetarian", "vegan", "gluten-free", "dairy-free"] | |
| SPEED_CHOICES = ["any", "fast", "medium", "slow"] | |
| # Pastel palette for missing-ingredient badges | |
| _BADGE_COLORS = [ | |
| "#FFB3B3", "#B3D9FF", "#B3FFB3", "#FFD9B3", | |
| "#E8B3FF", "#B3FFE8", "#FFE8B3", "#D9B3FF", | |
| ] | |
| _GREY = Image.new("RGB", (200, 200), color=(210, 210, 210)) | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # STARTUP ARTIFACTS β FAISS + embeddings (fast, always needed) | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| print("Loading FAISS indexβ¦") | |
| faiss_index = faiss.read_index("recipe_faiss.index") | |
| print("Loading dataframeβ¦") | |
| df = pd.read_parquet("df_final_embeddings.parquet").reset_index(drop=True) | |
| with open("ingredient_catalog.json") as _f: | |
| ingredient_catalog: dict[str, str] = json.load(_f) | |
| try: | |
| with open("class_labels.json") as _f: | |
| class_labels: dict[str, str] = json.load(_f) | |
| print(f" class_labels.json: {len(class_labels)} classes") | |
| except FileNotFoundError: | |
| class_labels = {} | |
| print(" class_labels.json not found β CNN disabled") | |
| NUM_CLASSES = len(class_labels) | |
| _catalog_keys = list(ingredient_catalog.keys()) | |
| # Column-name compatibility β prefer Spanish column if present | |
| if "ingredient_text_es" in df.columns: | |
| INGR_COL = "ingredient_text_es" | |
| elif "ingredient_text" in df.columns: | |
| INGR_COL = "ingredient_text" | |
| else: | |
| INGR_COL = "ingredients_text_processed" | |
| DIETARY_COL = "dietary_profile" if "dietary_profile" in df.columns else "dietary_profile_updated" | |
| CUISINE_COL = "cuisine_list" if "cuisine_list" in df.columns else "cuisine" | |
| DISH_TYPE_COLS = [c for c in ("course_list", "course", "category", "subcategory") if c in df.columns] | |
| print(f" {len(df):,} recipes | ingr_col={INGR_COL} | dietary_col={DIETARY_COL}") | |
| print("Loading SentenceTransformerβ¦") | |
| # Multilingual model β supports Spanish and English queries (384-dim, same as before) | |
| embedding_model = SentenceTransformer(EMBED_MODEL) | |
| print("Startup artifacts ready β CNN + LLM will load on first use.") | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # OPTIMISATION 1 + 2 β lru_cache lazy loaders with thread-safe getters | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| _cnn_lock = threading.Lock() | |
| _llm_lock = threading.Lock() | |
| _cnn_tf = T.Compose([ | |
| T.Resize(256), | |
| T.CenterCrop(224), | |
| T.ToTensor(), | |
| T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]), | |
| ]) | |
| def _load_cnn_cached() -> torch.nn.Module: | |
| """Download weights + build model exactly once; result cached in-process.""" | |
| if NUM_CLASSES == 0: | |
| raise RuntimeError("class_labels.json not found β CNN unavailable") | |
| weights_path = hf_hub_download(repo_id=CNN_REPO, filename="efficientnet_ingredients.pth") | |
| mdl = models.efficientnet_b0(weights=None) | |
| mdl.classifier[1] = torch.nn.Linear(1280, NUM_CLASSES) | |
| mdl.load_state_dict(torch.load(weights_path, map_location="cpu")) | |
| mdl.eval() | |
| return mdl | |
| def _load_llm_cached() -> "Llama": | |
| """Download GGUF + initialise Llama exactly once; result cached in-process.""" | |
| if not _LLAMA_AVAILABLE: | |
| raise RuntimeError("llama-cpp-python not installed β LLM unavailable") | |
| gguf_path = hf_hub_download(repo_id=LLM_REPO, filename="tinyllama-recipes-q4.gguf") | |
| return Llama(model_path=gguf_path, n_ctx=2048, n_threads=4, verbose=False) | |
| def get_cnn() -> torch.nn.Module: | |
| """Thread-safe lazy getter β safe to call from concurrent Gradio requests.""" | |
| with _cnn_lock: | |
| return _load_cnn_cached() | |
| def get_llm() -> Llama: | |
| """Thread-safe lazy getter β safe to call from concurrent Gradio requests.""" | |
| with _llm_lock: | |
| return _load_llm_cached() | |
| def _cnn_loaded() -> bool: | |
| return _load_cnn_cached.cache_info().currsize > 0 | |
| def _llm_loaded() -> bool: | |
| return _load_llm_cached.cache_info().currsize > 0 | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # FUNCTION 1 β ingredient classification (lazy CNN) | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def classify_ingredients(image: Image.Image) -> list[tuple[str, float]]: | |
| """Return top-10 [(ingredient_name, confidence)] from a PIL image.""" | |
| model = get_cnn() | |
| tensor = _cnn_tf(image.convert("RGB")).unsqueeze(0) | |
| with torch.no_grad(): | |
| probs = torch.softmax(model(tensor), dim=1)[0] | |
| top10 = torch.topk(probs, 10) | |
| return [ | |
| (class_labels.get(str(i.item()), f"class_{i.item()}"), s.item()) | |
| for i, s in zip(top10.indices, top10.values) | |
| ] | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # FUNCTION 5 β ingredient image lookup | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def get_ingredient_image(name: str) -> str | None: | |
| """Fuzzy-match name against catalog (threshold 72); return path or None.""" | |
| hit = rfprocess.extractOne(name.lower(), _catalog_keys) | |
| if hit and hit[1] >= 72: | |
| return ingredient_catalog[hit[0]] | |
| return None | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # FUNCTION 2 β recipe retrieval (now also returns query string + scores) | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _parse_dietary(raw) -> list[str]: | |
| if isinstance(raw, list): | |
| return [str(x).lower() for x in raw] | |
| try: | |
| return [str(x).lower() for x in json.loads(raw)] | |
| except Exception: | |
| return [str(raw).lower()] | |
| def _stringify(val) -> str: | |
| if isinstance(val, list): | |
| return ", ".join(str(x) for x in val) | |
| try: | |
| return ", ".join(str(x) for x in json.loads(val)) | |
| except Exception: | |
| return str(val) if pd.notna(val) else "" | |
| def _choice_values_from_columns(frame: pd.DataFrame, columns: list[str], limit: int = 40) -> list[str]: | |
| values: set[str] = set() | |
| for col in columns: | |
| for raw in frame[col].dropna().head(20000): | |
| text = _stringify(raw) if "_list" in col else str(raw) | |
| for item in text.split(","): | |
| item = item.strip() | |
| if item and item.lower() not in {"nan", "none", "[]"}: | |
| values.add(item) | |
| return ["any"] + sorted(values)[:limit] | |
| DISH_TYPE_CHOICES = _choice_values_from_columns(df, DISH_TYPE_COLS) | |
| def _contains_choice(raw, choice: str) -> bool: | |
| if choice == "any": | |
| return True | |
| return choice.lower() in _stringify(raw).lower() | |
| def _text_blob(row: dict) -> str: | |
| parts = [ | |
| row.get("recipe_title", ""), | |
| row.get(INGR_COL, ""), | |
| row.get("ingredients_text_processed", ""), | |
| row.get("directions_text", ""), | |
| row.get("description", ""), | |
| row.get("category", ""), | |
| row.get("subcategory", ""), | |
| row.get("course", ""), | |
| row.get("course_list", ""), | |
| ] | |
| return " ".join(_stringify(p).lower() for p in parts if p is not None) | |
| def _ingredient_overlap(query_terms: list[str], row: dict) -> float: | |
| terms = [t.lower().strip() for t in query_terms if t and t.strip()] | |
| if not terms: | |
| return 0.0 | |
| blob = _text_blob(row) | |
| return sum(1 for term in terms if term in blob) / len(terms) | |
| def _has_dish_image(row: dict) -> float: | |
| path = row.get("dish_image_path") or row.get("image_path") or "" | |
| return 1.0 if path and Path(path).exists() else 0.0 | |
| class MLPReranker(torch.nn.Module): | |
| """Small deterministic MLP over retrieval/filter features.""" | |
| def __init__(self): | |
| super().__init__() | |
| self.net = torch.nn.Sequential( | |
| torch.nn.Linear(7, 8), | |
| torch.nn.ReLU(), | |
| torch.nn.Linear(8, 1), | |
| ) | |
| self._init_reasonable_weights() | |
| def _init_reasonable_weights(self) -> None: | |
| with torch.no_grad(): | |
| first: torch.nn.Linear = self.net[0] # type: ignore[assignment] | |
| second: torch.nn.Linear = self.net[2] # type: ignore[assignment] | |
| first.weight.zero_() | |
| first.bias.zero_() | |
| for i in range(7): | |
| first.weight[i, i] = 1.0 | |
| first.weight[7] = torch.tensor([0.8, 1.0, 0.5, 0.35, 0.25, 0.45, 0.2]) | |
| second.weight[:] = torch.tensor([[1.5, 1.2, 0.7, 0.5, 0.35, 0.75, 0.25, 1.0]]) | |
| second.bias.zero_() | |
| def score(self, features: list[list[float]]) -> list[float]: | |
| if not features: | |
| return [] | |
| tensor = torch.tensor(features, dtype=torch.float32) | |
| return self.net(tensor).squeeze(-1).tolist() | |
| reranker = MLPReranker() | |
| def rerank_recipes( | |
| cands: pd.DataFrame, | |
| ingredients: list[str], | |
| dietary_filter: str, | |
| speed_filter: str, | |
| dish_type_filter: str, | |
| ) -> pd.DataFrame: | |
| rows = cands.to_dict(orient="records") | |
| features: list[list[float]] = [] | |
| for row in rows: | |
| features.append([ | |
| float(row.get("_score", 0.0)), | |
| _ingredient_overlap(ingredients, row), | |
| 1.0 if dietary_filter == "any" or _contains_choice(row.get(DIETARY_COL, ""), dietary_filter) else 0.0, | |
| 1.0 if speed_filter == "any" or str(row.get("cook_speed", "")).lower() == speed_filter.lower() else 0.0, | |
| 1.0 if dish_type_filter == "any" or any(_contains_choice(row.get(col, ""), dish_type_filter) for col in DISH_TYPE_COLS) else 0.0, | |
| 1.0 if any(term.lower() in str(row.get("recipe_title", "")).lower() for term in ingredients) else 0.0, | |
| _has_dish_image(row), | |
| ]) | |
| ranked = cands.copy() | |
| ranked["_rerank_score"] = reranker.score(features) | |
| return ranked.sort_values("_rerank_score", ascending=False) | |
| def retrieve_recipes( | |
| ingredients: list[str], | |
| dietary_filter: str = "any", | |
| speed_filter: str = "any", | |
| dish_type_filter: str = "any", | |
| k: int = 5, | |
| ) -> tuple[list[dict], str, list[float]]: | |
| """Returns (recipe_dicts, query_text, reranker_scores).""" | |
| query = "ingredients: " + ", ".join(ingredients) | |
| emb = embedding_model.encode([query], normalize_embeddings=True).astype("float32") | |
| dists, idxs = faiss_index.search(emb, 50) | |
| cands = df.iloc[idxs[0]].copy() | |
| cands["_score"] = dists[0] | |
| if dietary_filter != "any": | |
| mask = cands[DIETARY_COL].apply(lambda v: dietary_filter.lower() in _parse_dietary(v)) | |
| cands = cands[mask] | |
| if speed_filter != "any" and "cook_speed" in cands.columns: | |
| cands = cands[cands["cook_speed"].str.lower() == speed_filter.lower()] | |
| if dish_type_filter != "any" and DISH_TYPE_COLS: | |
| mask = cands.apply( | |
| lambda row: any(_contains_choice(row.get(col, ""), dish_type_filter) for col in DISH_TYPE_COLS), | |
| axis=1, | |
| ) | |
| cands = cands[mask] | |
| ranked = rerank_recipes(cands, ingredients, dietary_filter, speed_filter, dish_type_filter) | |
| top = ranked.head(k) | |
| scores = top["_rerank_score"].tolist() | |
| return top.to_dict(orient="records"), query, scores | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # FUNCTION 3 β streaming LLM narration (lazy LLM) | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _narration_prompt(row: dict) -> str: | |
| title = row.get("recipe_title", "Unknown recipe") | |
| ingr = row.get(INGR_COL) or row.get("ingredients_text_processed", "") | |
| # ingredient_text is space-separated in this dataset; convert for readability | |
| ingr = ingr.replace(" ", ", ") if " " in ingr and "," not in ingr else ingr | |
| dirs = row.get("directions_text", "")[:800] | |
| dietary = _stringify(row.get(DIETARY_COL) or row.get("dietary_profile_updated", "")) | |
| return ( | |
| "<|system|>\n" | |
| "You are a helpful cooking assistant. Narrate recipes clearly and engagingly.\n</s>\n" | |
| "<|user|>\n" | |
| "Please narrate this recipe in a friendly way:\n" | |
| f"Title: {title}\nIngredients: {ingr}\nInstructions: {dirs}\nDietary: {dietary}\n</s>\n" | |
| "<|assistant|>\n" | |
| ) | |
| def build_recipe_detail_md(row: dict | None) -> str: | |
| if not row: | |
| return "Select a recipe to see ingredients and procedure." | |
| title = row.get("recipe_title", "Recipe") | |
| ingredients = row.get(INGR_COL) or row.get("ingredients_text_processed", "") | |
| ingredients = ingredients.replace(" ", ", ") if " " in ingredients and "," not in ingredients else ingredients | |
| directions = row.get("directions_text", "") or row.get("directions", "") | |
| cuisine = _stringify(row.get(CUISINE_COL, "")) | |
| dietary = _stringify(row.get(DIETARY_COL, "")) | |
| speed = row.get("cook_speed", "") | |
| meta = " Β· ".join(str(x) for x in [cuisine, dietary, speed] if str(x).strip()) | |
| return ( | |
| f"### {title}\n\n" | |
| f"{meta}\n\n" | |
| f"**Ingredients**\n\n{ingredients or 'Not available'}\n\n" | |
| f"**Procedure**\n\n{directions or 'Not available'}" | |
| ) | |
| def generate_recipe(recipe_row: dict | None): | |
| """Generator β streams growing narration string; shows gr.Info on first LLM load.""" | |
| if not recipe_row: | |
| yield "Select a recipe from the table above, then click 'Narrate'." | |
| return | |
| if not _llm_loaded(): | |
| gr.Info("Loading language model for the first time (~25 s) β please waitβ¦") | |
| model = get_llm() | |
| accumulated = "" | |
| for chunk in model(_narration_prompt(recipe_row), max_tokens=512, temperature=0.7, stream=True): | |
| accumulated += chunk["choices"][0]["text"] | |
| yield accumulated | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # FUNCTION 4 β chat about the active recipe (lazy LLM) | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def chat_about_recipe( | |
| message: str, | |
| history: list[list[str | None]], | |
| recipe_state: dict | None, | |
| ) -> tuple[list, str]: | |
| if not message.strip(): | |
| return history, "" | |
| if recipe_state: | |
| title = recipe_state.get("recipe_title", "a recipe") | |
| ingr = recipe_state.get(INGR_COL, "") | |
| sys_msg = ( | |
| f"The user is asking about '{title}'.\nIngredients: {ingr}\n" | |
| "Answer only questions related to this recipe." | |
| ) | |
| else: | |
| sys_msg = "You are a helpful cooking assistant." | |
| if not _llm_loaded(): | |
| gr.Info("Loading language model for the first time (~25 s) β please waitβ¦") | |
| model = get_llm() | |
| prompt = ( | |
| f"<|system|>\n{sys_msg}\n</s>\n" | |
| f"<|user|>\n{message}\n</s>\n" | |
| "<|assistant|>\n" | |
| ) | |
| reply = model(prompt, max_tokens=300, temperature=0.7, stream=False)["choices"][0]["text"].strip() | |
| return history + [[message, reply]], "" | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # FIX β ingredient HTML panel (image card OR coloured text badge) | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _img_to_b64(path: str) -> str | None: | |
| """Encode a local image as a base64 data-URI for inline HTML embedding.""" | |
| try: | |
| ext = Path(path).suffix.lstrip(".").lower() | |
| mime = "image/jpeg" if ext in ("jpg", "jpeg") else f"image/{ext}" | |
| with open(path, "rb") as fh: | |
| b64 = base64.b64encode(fh.read()).decode() | |
| return f"data:{mime};base64,{b64}" | |
| except Exception: | |
| return None | |
| def build_ingredient_html(top_ingr: list[tuple[str, float]]) -> str: | |
| """ | |
| Returns an HTML string for gr.HTML. | |
| - Ingredient WITH catalog image β thumbnail card (base64 inline src). | |
| - Ingredient WITHOUT image β coloured text badge (no grey placeholder). | |
| """ | |
| cards: list[str] = [] | |
| for i, (name, conf) in enumerate(top_ingr): | |
| pct = f"{conf * 100:.1f}%" | |
| color = _BADGE_COLORS[i % len(_BADGE_COLORS)] | |
| path = get_ingredient_image(name) | |
| src = _img_to_b64(path) if path and Path(path).exists() else None | |
| if src: | |
| cards.append( | |
| f'<div style="text-align:center;margin:6px;width:110px">' | |
| f'<img src="{src}" style="width:100px;height:100px;' | |
| f'object-fit:cover;border-radius:10px;border:1px solid #ddd">' | |
| f'<div style="font-size:12px;margin-top:3px;color:#333">{name}</div>' | |
| f'<div style="font-size:11px;color:#888">{pct}</div>' | |
| f'</div>' | |
| ) | |
| else: | |
| # Coloured badge β no grey rectangle | |
| cards.append( | |
| f'<div style="text-align:center;margin:6px;width:110px;' | |
| f'display:flex;flex-direction:column;align-items:center;justify-content:center">' | |
| f'<span style="background:{color};padding:6px 12px;border-radius:14px;' | |
| f'font-size:13px;font-weight:500;display:inline-block">π₯¬ {name}</span>' | |
| f'<div style="font-size:11px;color:#888;margin-top:4px">{pct}</div>' | |
| f'</div>' | |
| ) | |
| return ( | |
| '<div style="display:flex;flex-wrap:wrap;gap:4px;' | |
| 'padding:8px;min-height:60px;align-items:flex-start">' | |
| + "".join(cards) | |
| + "</div>" | |
| ) | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # DISH GALLERY | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def build_dish_gallery(recipes: list[dict]) -> list[tuple[Image.Image, str]]: | |
| items: list[tuple[Image.Image, str]] = [] | |
| for row in recipes: | |
| path = row.get("dish_image_path") or row.get("image_path") or "" | |
| if path and Path(path).exists(): | |
| try: | |
| img = Image.open(path).convert("RGB").resize((300, 200)) | |
| except Exception: | |
| img = _GREY | |
| else: | |
| img = _GREY | |
| items.append((img, row.get("recipe_title", "Recipe"))) | |
| return items | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # OPTIMISATION 3 β TWO-PHASE SEARCH HANDLER | |
| # Phase 1 (this function, <3 s): CNN + FAISS β panels A, B, debug | |
| # Phase 2 (narrate_btn click, ~30 s): LLM narration on demand | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def find_recipes( | |
| image: Image.Image | None, | |
| text_query: str, | |
| dietary: str, | |
| speed: str, | |
| dish_type: str, | |
| progress=gr.Progress(), | |
| ): | |
| """ | |
| Outputs (8): | |
| search_status, ingr_html, dish_gallery, recipe_df, | |
| recipe_detail, recipe_state, results_state, pipeline_debug | |
| """ | |
| t_total = time.perf_counter() | |
| if image is None and not (text_query or "").strip(): | |
| raise gr.Error("Please upload a photo or type ingredient names.") | |
| # OPTIMISATION 5 β debug dict filled throughout this function | |
| debug: dict = { | |
| "models": { | |
| "cnn": f"EfficientNet-B0 ({NUM_CLASSES} classes)", | |
| "embed": f"{_EMBED_SHORT} (cosine)", | |
| "reranker": "MLP reranker over FAISS score + overlap + filters + image signal", | |
| "llm": "TinyLlama-1.1B-Chat Q4_K_M (lazy β loads on Narrate)", | |
| }, | |
| "query": "", | |
| "reranker_scores": {}, | |
| "timing_ms": {}, | |
| } | |
| # ββ Phase 1a: classify image or parse text ββββββββββββββββββββββββββββββββ | |
| top_ingr: list[tuple[str, float]] | |
| if image is not None: | |
| if not _cnn_loaded(): | |
| gr.Info("Loading ingredient classifier for the first time (~15 s)β¦") | |
| progress(0.15, desc="Running ingredient classifierβ¦") | |
| t0 = time.perf_counter() | |
| top_ingr = classify_ingredients(image) | |
| debug["timing_ms"]["cnn_ms"] = round((time.perf_counter() - t0) * 1000) | |
| names = [n for n, _ in top_ingr[:5]] | |
| else: | |
| names = [s.strip() for s in text_query.split(",") if s.strip()] | |
| top_ingr = [(n, 1.0) for n in names[:10]] | |
| debug["timing_ms"]["cnn_ms"] = 0 # not used for text input | |
| if not names: | |
| raise gr.Error("Could not extract any ingredient names from the input.") | |
| # ββ Phase 1b: embed + FAISS retrieval ββββββββββββββββββββββββββββββββββββ | |
| progress(0.45, desc="Searching 64k recipesβ¦") | |
| t0 = time.perf_counter() | |
| recipes, query_text, scores = retrieve_recipes(names, dietary, speed, dish_type) | |
| debug["timing_ms"]["faiss_ms"] = round((time.perf_counter() - t0) * 1000) | |
| debug["query"] = query_text | |
| debug["reranker_scores"] = { | |
| r.get("recipe_title", f"recipe_{i}"): round(float(s), 4) | |
| for i, (r, s) in enumerate(zip(recipes, scores)) | |
| } | |
| if not recipes: | |
| raise gr.Error( | |
| "No recipes matched those filters. " | |
| "Try setting Dietary and/or Cook speed to 'any'." | |
| ) | |
| # ββ Phase 1c: build result panels ββββββββββββββββββββββββββββββββββββββββ | |
| progress(0.75, desc="Building result panelsβ¦") | |
| t0 = time.perf_counter() | |
| ingr_html_str = build_ingredient_html(top_ingr) | |
| dish_gal = build_dish_gallery(recipes) | |
| display = [ | |
| { | |
| "Title": r.get("recipe_title", ""), | |
| "Cuisine": _stringify(r.get(CUISINE_COL, "")), | |
| "Type": _stringify(r.get("course_list", r.get("course", r.get("category", "")))), | |
| "Speed": r.get("cook_speed", ""), | |
| "Dietary": _stringify(r.get(DIETARY_COL, "")), | |
| } | |
| for r in recipes | |
| ] | |
| recipe_df_data = pd.DataFrame(display) | |
| recipe_detail = build_recipe_detail_md(recipes[0]) | |
| debug["timing_ms"]["render_ms"] = round((time.perf_counter() - t0) * 1000) | |
| debug["timing_ms"]["phase1_total_ms"] = round((time.perf_counter() - t_total) * 1000) | |
| elapsed_s = debug["timing_ms"]["phase1_total_ms"] / 1000 | |
| faiss_s = debug["timing_ms"]["faiss_ms"] / 1000 | |
| status = ( | |
| f"**Phase 1 complete** β found **{len(recipes)}** recipes " | |
| f"(FAISS {faiss_s:.2f}s Β· total {elapsed_s:.2f}s) " | |
| f"| Click a row or dish image to select, then press **Narrate** for AI narration." | |
| ) | |
| progress(1.0, desc="Phase 1 done β ") | |
| return ( | |
| status, | |
| ingr_html_str, | |
| dish_gal, | |
| recipe_df_data, | |
| recipe_detail, | |
| recipes[0], | |
| recipes, | |
| debug, | |
| ) | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # SELECTION HANDLERS | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def select_from_df(evt: gr.SelectData, results: list[dict]) -> tuple[dict, str, str]: | |
| if not results: | |
| return None, "", "" | |
| row_idx = evt.index[0] if isinstance(evt.index, (list, tuple)) else 0 | |
| row = results[min(row_idx, len(results) - 1)] | |
| return row, f"Selected: **{row.get('recipe_title', 'Recipe')}**", build_recipe_detail_md(row) | |
| def select_from_gallery(evt: gr.SelectData, results: list[dict]) -> tuple[dict, str, str]: | |
| if not results: | |
| return None, "", "" | |
| idx = min(int(evt.index), len(results) - 1) | |
| row = results[idx] | |
| return row, f"Selected: **{row.get('recipe_title', 'Recipe')}**", build_recipe_detail_md(row) | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # PIPELINE DIAGRAM β Tab 2 | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| PIPELINE_MD = """\ | |
| ## How the pipeline works | |
| ``` | |
| ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| β USER INPUT β | |
| β Photo ββORββ Text query ββORββ Ingredient list β | |
| ββββββββββββββββ¬ββββββββββββββββββββββββββββββββββββββββββββ | |
| β | |
| ββββββββββΌβββββββββββββββββββββββββββββββββββββββββββ | |
| β EfficientNet-B0 (lazy β loads on first photo) β | |
| β image β top-10 ingredient predictions β | |
| β Fruits-360 + Recipe Ingredients Dataset ~150 cls β | |
| ββββββββββ¬βββββββββββββββββββββββββββββββββββββββββββ | |
| β ingredient name list | |
| ββββββββββΌβββββββββββββββββββββββββββββββββββββββββββ | |
| β multilingual-MiniLM-L12-v2 Β· 384-dim β | |
| β "ingredients: tomato, onion, β¦" β float32 vec β | |
| ββββββββββ¬βββββββββββββββββββββββββββββββββββββββββββ | |
| β | |
| ββββββββββΌβββββββββββββββββββββββββββββββββββββββββββ | |
| β FAISS IndexFlatIP Β· 64k recipes β | |
| β top-50 β filters: diet + speed + dish type β | |
| ββββββββββ¬βββββββββββββββββββββββββββββββββββββββββββ | |
| β | |
| ββββββββββΌβββββββββββββββββββββββββββββββββββββββββββ | |
| β MLP reranker β | |
| β FAISS score + ingredient overlap + filter match β | |
| ββββββββββ¬βββββββββββββββββββββββββββββββββββββββββββ | |
| β β PHASE 1 COMPLETE (< 3 s) | |
| ββββββββββΌβββββββββββββββββββββββββββββββββββββββββββ | |
| β Top-5 recipe cards β | |
| β (title Β· cuisine Β· dietary tags Β· cook speed) β | |
| ββββββββββ¬βββββββββββββββββββββββββββββββββββββββββββ | |
| β user clicks "Narrate" β PHASE 2 | |
| ββββββββββΌβββββββββββββββββββββββββββββββββββββββββββ | |
| β TinyLlama-1.1B-Chat Q4_K_M (lazy β ~25 s load)β | |
| β Streams friendly narration Β· ~30 s on CPU β | |
| ββββββββββ¬βββββββββββββββββββββββββββββββββββββββββββ | |
| β user types in chat | |
| ββββββββββΌβββββββββββββββββββββββββββββββββββββββββββ | |
| β Chat mode: recipe injected as system context β | |
| βββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| ``` | |
| ### Lazy loading strategy | |
| | Component | Loads at | Approx. time | | |
| |---|---|---| | |
| | FAISS index + dataframe | App startup | ~2 s | | |
| | SentenceTransformer | App startup | ~3 s | | |
| | EfficientNet-B0 | First photo upload | ~10 s (once) | | |
| | TinyLlama GGUF | First "Narrate" or Chat | ~25 s (once) | | |
| After first load each model is cached in-process for all subsequent requests. | |
| ### Per-stage latency (free-tier CPU, post-load) | |
| | Step | Time | | |
| |---|---| | |
| | CNN classification | < 1 s | | |
| | MiniLM embedding | < 0.5 s | | |
| | FAISS top-50 search + filters + MLP rerank | < 1.5 s | | |
| | **Phase 1 total** | **< 3 s** | | |
| | LLM narration (512 tokens) | 25β40 s | | |
| | Chat reply (300 tokens) | 15β25 s | | |
| """ | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # UI | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| with gr.Blocks(title="Recipe Recommender (optimised)", theme=gr.themes.Soft()) as demo: | |
| gr.Markdown("# π³ Recipe Recommender β Multimodal AI Demo") | |
| gr.Markdown( | |
| "FAISS index and embedding model ready β " | |
| "| CNN and LLM load **on first use** (once only, then cached)." | |
| ) | |
| # Shared state | |
| recipe_state = gr.State(None) # currently selected recipe dict | |
| results_state = gr.State([]) # all retrieved recipe dicts | |
| with gr.Tabs(): | |
| # ββ TAB 1 β Find recipes βββββββββββββββββββββββββββββββββββββββββββββββ | |
| with gr.Tab("Find recipes"): | |
| with gr.Row(equal_height=False): | |
| # ββ LEFT COLUMN: inputs + examples ββββββββββββββββββββββββββββ | |
| with gr.Column(scale=1, min_width=300): | |
| img_input = gr.Image( | |
| label="Photo of your ingredient (optional)", | |
| type="pil", | |
| height=220, | |
| ) | |
| text_input = gr.Textbox( | |
| label="Or describe ingredients / craving", | |
| placeholder="tomato, onion, garlic, basil or quick vegan pastaβ¦", | |
| lines=2, | |
| ) | |
| dietary_dd = gr.Dropdown( | |
| label="Dietary preference", | |
| choices=DIETARY_CHOICES, | |
| value="any", | |
| ) | |
| speed_dd = gr.Dropdown( | |
| label="Cook speed", | |
| choices=SPEED_CHOICES, | |
| value="any", | |
| ) | |
| dish_type_dd = gr.Dropdown( | |
| label="Dish type", | |
| choices=DISH_TYPE_CHOICES, | |
| value="any", | |
| ) | |
| find_btn = gr.Button("Find Recipes π", variant="primary", size="lg") | |
| # OPTIMISATION 4 β pre-loaded text examples | |
| gr.Examples( | |
| examples=[ | |
| ["tomato, mozzarella, basil", "vegetarian", "any", "any"], | |
| ["chicken, garlic, lemon", "any", "medium", "any"], | |
| ["oats, banana, honey", "vegan", "any", "any"], | |
| ["pasta, eggs, bacon, parmesan", "any", "medium", "any"], | |
| ["black beans, corn, avocado", "vegan", "any", "any"], | |
| ], | |
| inputs=[text_input, dietary_dd, speed_dd, dish_type_dd], | |
| label="Try an example", | |
| examples_per_page=5, | |
| cache_examples=False, | |
| ) | |
| # ββ RIGHT COLUMN: results ββββββββββββββββββββββββββββββββββββββ | |
| with gr.Column(scale=2): | |
| search_status = gr.Markdown( | |
| "Upload a photo **or** type ingredients, then click **Find Recipes**." | |
| ) | |
| # Panel A β detected ingredients (HTML: thumbnail OR coloured badge) | |
| with gr.Accordion("Detected ingredients", open=True): | |
| ingr_html = gr.HTML( | |
| value='<p style="color:#aaa;padding:8px;font-size:13px">β</p>' | |
| ) | |
| # Panel B β top recipes (Phase 1 output) | |
| with gr.Accordion("Top recipes", open=True): | |
| dish_gallery = gr.Gallery( | |
| label="Dish images β click to select a recipe", | |
| columns=5, | |
| height=190, | |
| object_fit="cover", | |
| show_label=True, | |
| allow_preview=False, | |
| ) | |
| recipe_df = gr.Dataframe( | |
| headers=["Title", "Cuisine", "Type", "Speed", "Dietary"], | |
| interactive=False, | |
| wrap=True, | |
| row_count=(5, "fixed"), | |
| ) | |
| # Panel C β selected recipe details + LLM narration | |
| with gr.Accordion( | |
| "Recipe procedure and ingredients", | |
| open=True, | |
| ): | |
| recipe_detail_md = gr.Markdown( | |
| "Select a recipe to see ingredients and procedure." | |
| ) | |
| narrate_btn = gr.Button( | |
| "Narrate selected recipe βΆ", variant="secondary" | |
| ) | |
| narration_box = gr.Textbox( | |
| lines=12, | |
| interactive=False, | |
| placeholder=( | |
| "Select a recipe in the table above, " | |
| "then click 'Narrate selected recipe'β¦" | |
| ), | |
| show_copy_button=True, | |
| label="", | |
| ) | |
| # Panel D β chat about active recipe | |
| with gr.Accordion( | |
| "Chat about this recipe [TinyLlama Β· ~20 s per reply]", | |
| open=True, | |
| ): | |
| chatbot = gr.Chatbot(height=300, bubble_full_width=False) | |
| with gr.Row(): | |
| chat_input = gr.Textbox( | |
| placeholder="Ask me anything about this recipeβ¦", | |
| show_label=False, | |
| scale=5, | |
| ) | |
| chat_btn = gr.Button("Send β©", scale=1, variant="primary") | |
| clear_btn = gr.Button("Clear chat", size="sm") | |
| # OPTIMISATION 5 β Pipeline transparency panel | |
| with gr.Accordion("Pipeline transparency", open=False): | |
| gr.Markdown( | |
| "_Query embedding text, MLP reranker scores, " | |
| "and per-stage timing for the last request._" | |
| ) | |
| pipeline_debug_json = gr.JSON(label="", value={}) | |
| # ββ TAB 2 β How it works βββββββββββββββββββββββββββββββββββββββββββββββ | |
| with gr.Tab("How it works"): | |
| gr.Markdown(PIPELINE_MD) | |
| # ββ EVENT HANDLERS ββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Phase 1 search β 7 outputs (added ingr_html, pipeline_debug) | |
| find_btn.click( | |
| fn=find_recipes, | |
| inputs=[img_input, text_input, dietary_dd, speed_dd, dish_type_dd], | |
| outputs=[ | |
| search_status, | |
| ingr_html, | |
| dish_gallery, | |
| recipe_df, | |
| recipe_detail_md, | |
| recipe_state, | |
| results_state, | |
| pipeline_debug_json, | |
| ], | |
| ) | |
| # Select recipe via dataframe row click | |
| recipe_df.select( | |
| fn=select_from_df, | |
| inputs=[results_state], | |
| outputs=[recipe_state, search_status, recipe_detail_md], | |
| ) | |
| # Select recipe via dish gallery click | |
| dish_gallery.select( | |
| fn=select_from_gallery, | |
| inputs=[results_state], | |
| outputs=[recipe_state, search_status, recipe_detail_md], | |
| ) | |
| # Phase 2 β streaming narration (lazy LLM) | |
| narrate_btn.click( | |
| fn=generate_recipe, | |
| inputs=[recipe_state], | |
| outputs=[narration_box], | |
| ) | |
| # Chat β button or Enter key | |
| chat_btn.click( | |
| fn=chat_about_recipe, | |
| inputs=[chat_input, chatbot, recipe_state], | |
| outputs=[chatbot, chat_input], | |
| ) | |
| chat_input.submit( | |
| fn=chat_about_recipe, | |
| inputs=[chat_input, chatbot, recipe_state], | |
| outputs=[chatbot, chat_input], | |
| ) | |
| clear_btn.click(fn=lambda: ([], ""), outputs=[chatbot, chat_input]) | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # LAUNCH | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| demo.queue(max_size=3) | |
| demo.launch() | |