"""Gradio application for the smart fridge detector + recipe recommendation pipeline.""" import json import tempfile from pathlib import Path from typing import List, Tuple, Dict, Any import cv2 import gradio as gr import numpy as np from PIL import Image import pandas as pd from frige_detect.detect import ( detect_and_generate, load_roboflow_credentials, RoboflowCredentials, ) from recipe_recommendation.main import ( load_recipes, recommend_recipes, save_user_profile, get_feedback, USER_DATA_DIR, ) from sklearn.metrics import ndcg_score import joblib # --------------------------------------------------------------------------- # Global resources # --------------------------------------------------------------------------- CREDENTIALS_PATH = Path("frige_detect/roboflow_credentials.txt") ROBOFLOW_CREDENTIALS: RoboflowCredentials = load_roboflow_credentials(str(CREDENTIALS_PATH)) RECIPES_DF = load_recipes() # --------------------------------------------------------------------------- # Predefined user profiles for examples # --------------------------------------------------------------------------- # EXAMPLE_PROFILES = { # "user_1": { # "vegetarian_type": "flexible", # "allergies": "", # "regions": "North America", # "calorie_min": 250, # "calorie_max": 2000, # "protein_min": 10, # "protein_max": 160, # "preferred_main": "", # "disliked_main": "", # "cooking_time": 45, # }, # "user_2": { # "vegetarian_type": "flexible_vegetarian", # "allergies": "shrimp", # "regions": "Asia", # "calorie_min": 400, # "calorie_max": 1500, # "protein_min": 40, # "protein_max": 120, # "preferred_main": "tofu", # "disliked_main": "beef", # "cooking_time": 60, # }, # "user_3": { # "vegetarian_type": "non_vegetarian", # "allergies": "", # "regions": "Europe", # "calorie_min": 500, # "calorie_max": 2000, # "protein_min": 80, # "protein_max": 160, # "preferred_main": "beef, chicken", # "disliked_main": "", # "cooking_time": 45, # }, # } EXAMPLE_IDS = [ uid for uid in ("user_1", "user_2", 
"user_3", "user_4", "user_5") if (USER_DATA_DIR / uid / "user_profile.json").exists() ] # Predefined example images EXAMPLE_IMAGES = [ "frige_detect/demo/t1.jpg", "frige_detect/demo/t2.jpg", "frige_detect/demo/t3.jpg", ] # --------------------------------------------------------------------------- # Helper utilities # --------------------------------------------------------------------------- def parse_csv_list(text: str) -> List[str]: if not text: return [] parts = [item.strip() for item in text.split(",") if item.strip()] return parts def ensure_numpy_image(image: Any) -> np.ndarray: """Convert incoming image (PIL or numpy) to RGB numpy array.""" if image is None: raise ValueError("Please upload a fridge photo before running detection.") if isinstance(image, np.ndarray): return image if isinstance(image, Image.Image): return np.array(image.convert("RGB")) raise ValueError("Unsupported image format provided.") def write_temp_image(image: np.ndarray) -> str: """Write numpy image to a temporary file and return the path.""" temp_dir = Path(tempfile.mkdtemp(prefix="fridge_upload_")) temp_path = temp_dir / "upload.jpg" bgr_image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR) cv2.imwrite(str(temp_path), bgr_image) return str(temp_path) def build_user_profile( user_id: str, vegetarian_type: str, allergies: str, regions: str, calorie_range: Tuple[float, float], protein_range: Tuple[float, float], preferred_main: str, disliked_main: str, cooking_time: float, ) -> Dict[str, Any]: """ Build and save user profile. This function ALWAYS creates or overwrites the profile with the current input values, enabling users to modify preferences on-the-fly. 
""" user_id = user_id.strip() if not user_id: raise ValueError("User ID cannot be empty.") profile_dir = USER_DATA_DIR / user_id profile_path = profile_dir / "user_profile.json" # Preserve feedback count if profile exists num_feedback = 0 if profile_path.exists(): try: existing = json.loads(profile_path.read_text(encoding="utf-8")) num_feedback = existing.get("num_feedback", 0) except Exception: pass profile = { "user_id": user_id, "num_feedback": num_feedback, "diet": {"vegetarian_type": vegetarian_type}, "allergies": parse_csv_list(allergies), "region_preference": parse_csv_list(regions), "nutritional_goals": { "calories": {"min": int(calorie_range[0]), "max": int(calorie_range[1])}, "protein": {"min": int(protein_range[0]), "max": int(protein_range[1])}, }, "other_preferences": { "preferred_main": parse_csv_list(preferred_main), "disliked_main": parse_csv_list(disliked_main), "cooking_time_max": int(cooking_time) if cooking_time else None, }, } # Always save the profile (create new or overwrite existing) save_user_profile(user_id, profile) print(f"[app] Profile saved/updated for user '{user_id}'") return profile def summarize_ingredients( user_parents: List[str], high_conf: List[str], low_conf: List[str], ) -> str: lines = ["### Ingredient Mapping"] if user_parents: lines.append("- **Mapped parent ingredients:** " + ", ".join(sorted(user_parents))) else: lines.append("- **Mapped parent ingredients:** none") if high_conf: lines.append("- **High confidence detections:** " + ", ".join(sorted(high_conf))) if low_conf: lines.append("- **Low confidence detections:** " + ", ".join(sorted(set(low_conf)))) return "\n".join(lines) def _ensure_iterable(value: Any) -> List[str]: if value is None: return [] if isinstance(value, set): return sorted(value) if isinstance(value, list): return value if isinstance(value, str): return [value] return list(value) def render_recommendations(df, user_parents=None): """ Render the top-k recommendation list in Markdown format with ✅/❌ 
marks for ingredients. Uses original ingredient names instead of parent names. """ if user_parents is None: user_parents = set() else: user_parents = set(user_parents) feedback_rows = [] md_lines = [] for i, row in df.iterrows(): # --- Header line with score --- name = row.get("name", "Unknown") score = row.get("score", None) if score is not None: line = f"**{i+1}. {name} — score {score:.1f}%**" else: line = f"**{i+1}. {name}**" md_lines.append(line) # --- Region / Cuisine --- region = row.get("region", "Unavailable") if isinstance(region, (set, list, tuple)): region_str = ", ".join(str(x) for x in region) else: region_str = str(region) cuisine = _ensure_iterable(row.get("cuisine_attr")) cuisine_str = ", ".join(cuisine) if cuisine else "Unavailable" md_lines.append(f" - Region: {region_str}") md_lines.append(f" - Cuisine: {cuisine_str}") # --- Nutrition --- calories = row.get("calories", "N/A") protein = row.get("protein", "N/A") md_lines.append(f" - Calories: {calories}") md_lines.append(f" - Protein: {protein}") # --- Build mapping: parent -> list of original ingredient strings --- ingredient_list = _ensure_iterable(row.get("ingredients")) parent_to_ing = {} for ing in ingredient_list: ing_lower = ing.lower() for parent_cat in ["main_parent", "staple_parent", "other_parent", "seasoning_parent"]: parents = _ensure_iterable(row.get(parent_cat)) for p in parents: if p in ing_lower: parent_to_ing.setdefault(p, []).append(ing) # --- Ingredient categories with ✅/❌ based on parent presence --- for key, label in [ ("main_parent", "Main Ingredients"), ("staple_parent", "Staple Ingredients"), ("other_parent", "Other Ingredients") ]: parents = _ensure_iterable(row.get(key)) if parents: annotated = [] for p in parents: ing_names = parent_to_ing.get(p, [p]) # fallback to parent name if no match mark = "✅" if p in user_parents else "❌" annotated.append(f"{', '.join(ing_names)} {mark}") md_lines.append(f" - {label}: {', '.join(annotated)}") # --- Seasoning (no marks) --- 
seasoning_parents = _ensure_iterable(row.get("seasoning_parent")) if seasoning_parents: seasoning_names = [] for p in seasoning_parents: seasoning_names.extend(parent_to_ing.get(p, [p])) md_lines.append(f" - Seasoning: {', '.join(seasoning_names)}") md_lines.append("") # spacing feedback_rows.append({ "recipe_name": name, "recipe_id": row.get("recipe_id"), "full_row": row.to_dict(), }) return "\n".join(md_lines), feedback_rows def load_example_profile(profile_name: str): """Load example user profile from recipe_recommendation/user_data//user_profile.json""" try: p = USER_DATA_DIR / profile_name / "user_profile.json" data = json.loads(p.read_text(encoding="utf-8")) veg_type = (data.get("diet", {}).get("vegetarian_type") or "flexible") allergies = ",".join(data.get("allergies", []) or []) regions = ",".join(data.get("region_preference", []) or []) ng = data.get("nutritional_goals", {}) cal_min = int(ng.get("calories", {}).get("min", 400)) cal_max = int(ng.get("calories", {}).get("max", 2000)) pro_min = int(ng.get("protein", {}).get("min", 10)) pro_max = int(ng.get("protein", {}).get("max", 160)) op = data.get("other_preferences", {}) preferred_main = ",".join(op.get("preferred_main", []) or []) disliked_main = ",".join(op.get("disliked_main", []) or []) cooking_time = int(op.get("cooking_time_max", 45) or 45) return ( profile_name, veg_type, allergies, regions, cal_min, cal_max, pro_min, pro_max, preferred_main, disliked_main, cooking_time, ) except Exception as exc: return ("user_custom", "flexible", "", "", 400, 2000, 10, 160, "", "", 45) def load_example_image(image_path: str): """Load an example image.""" return image_path def run_pipeline( image, user_id, vegetarian_type, allergies, regions, calorie_min, calorie_max, protein_min, protein_max, preferred_main, disliked_main, cooking_time, ): """ Main pipeline function. This ALWAYS creates/updates the user profile based on current input values, then runs detection and recommendation. 
""" try: rgb_image = ensure_numpy_image(image) upload_path = write_temp_image(rgb_image) temp_dir = Path(tempfile.mkdtemp(prefix="fridge_outputs_")) output_json = temp_dir / "recipe_input.json" output_image = temp_dir / "annotated_image.jpg" detection_result = detect_and_generate( image_path=upload_path, credentials=ROBOFLOW_CREDENTIALS, conf_threshold=0.4, overlap_threshold=0.3, conf_split=0.7, output_json=str(output_json), output_image=str(output_image), ) Path(upload_path).unlink(missing_ok=True) detection_payload = detection_result.get("recipe_json") if not detection_payload: raise ValueError("No detection result returned. Please try again with a clearer photo.") ingredients = detection_payload.get("ingredients", []) if not isinstance(ingredients, list) or len(ingredients) == 0: raise ValueError("No ingredients detected. Please try again with a clearer photo.") #2: Always create/update user profile with current UI values profile = build_user_profile( user_id, vegetarian_type, allergies, regions, (calorie_min, calorie_max), (protein_min, protein_max), preferred_main, disliked_main, cooking_time, ) import time time.sleep(1) detection_payload = detection_result["recipe_json"] detection_payload_json = json.dumps(detection_payload, ensure_ascii=False, indent=2) ml_top, user_parents, high_conf, low_conf = recommend_recipes( detection_payload, user_id, RECIPES_DF, topk=5, ) if not ml_top.empty: for score_col in ["rank_score", "pred", "match_score", "ml_score"]: if score_col in ml_top.columns and ml_top[score_col].notna().any(): max_val = ml_top[score_col].max() if max_val > 0: ml_top["score"] = ml_top[score_col] / max_val * 100 break ingredient_summary = summarize_ingredients(user_parents, high_conf, low_conf) protein_min_val = profile["nutritional_goals"]["protein"]["min"] protein_max_val = profile["nutritional_goals"]["protein"]["max"] calorie_min_val = profile["nutritional_goals"]["calories"]["min"] calorie_max_val = profile["nutritional_goals"]["calories"]["max"] 
goals_summary = ( f"### Nutritional Goals\n" f"- Calories: {calorie_min_val} - {calorie_max_val} kcal\n" f"- Protein: {protein_min_val} - {protein_max_val} g" ) ingredient_summary = goals_summary + "\n\n" + ingredient_summary recommendation_md, feedback_rows = render_recommendations(ml_top, user_parents) dropdown_choices = [ f"{idx + 1}. {row.get('recipe_name', 'Recipe')}" for idx, row in enumerate(feedback_rows) ] status = "" if feedback_rows else "No recipes available for feedback yet." # Add success message about profile creation/update profile_status = f"✓ Profile '{user_id}' has been saved/updated with your current preferences." return ( str(output_image), detection_payload_json, ingredient_summary, recommendation_md, gr.update(choices=dropdown_choices, value=None), feedback_rows, profile_status, ) except Exception as exc: return ( None, None, f"⚠️ {exc}", "", gr.update(choices=[], value=None), [], f"⚠️ {exc}", ) def record_feedback(selected_recipe: str, user_id: str, feedback_rows: List[Dict[str, Any]]): if not selected_recipe: return "Please select a recipe before submitting feedback." if not user_id: return "Please provide a valid user ID." if not feedback_rows: return "No recommendation data available. Run the pipeline first." try: index = int(selected_recipe.split(".")[0]) - 1 except (ValueError, IndexError): return "Unable to parse the selected recipe." if index < 0 or index >= len(feedback_rows): return "Selected recipe is out of range." recipe_row = feedback_rows[index] get_feedback(user_id, recipe_row) profile_path = USER_DATA_DIR / user_id / "user_profile.json" if profile_path.exists(): data = json.loads(profile_path.read_text(encoding="utf-8")) data["num_feedback"] = data.get("num_feedback", 0) + 1 save_user_profile(user_id, data) return f"✓ Feedback recorded for {recipe_row.get('recipe_name', 'selected recipe')}!" 
# ---------------------------------------------------------------------------
# Gradio UI definition
# ---------------------------------------------------------------------------

# config checker
# def check_config():
#     profile_path = USER_DATA_DIR / "user_custom" / "user_profile.json"
#     if not profile_path.exists():
#         print("⚠️ No profile found for user_custom yet")
#         return
#     with open(profile_path) as f:
#         profile = json.load(f)
#     from recipe_recommendation.main import normalize_user_profile, prepare_recipes_df
#     profile = normalize_user_profile(profile)
#     ng = profile['nutritional_goals']
#     cal = ng['calories']
#     pro = ng['protein']
#     print("\n" + "="*60)
#     print("⚙️ USER CONFIG CHECK")
#     print("="*60)
#     print(f"Calories: {cal['min']} - {cal['max']}")
#     print(f"Protein: {pro['min']} - {pro['max']}g")
#     df = prepare_recipes_df(RECIPES_DF.copy())
#     # Test how many pass
#     passed = df[(df['calories'] >= cal['min']) & (df['calories'] <= cal['max']) &
#                 (df['protein'] >= pro['min']) & (df['protein'] <= pro['max'])]
#     print(f"\nRecipes matching your ranges: {len(passed)}/{len(df)} ({len(passed)/len(df)*100:.1f}%)")
#     if len(passed) == 0:
#         print("\n❌ NO RECIPES match your settings!")
#         print(f"Try: Calories 200-1500, Protein 10-120")
#     else:
#         print(f"\n✅ OK - showing sample:")
#         for _, r in passed.head(3).iterrows():
#             print(f" - {r['name'][:40]}: {r['calories']:.0f} cal, {r['protein']:.0f}g")
#     print("="*60 + "\n")
# check_config()


def split_ranges(calorie_range, protein_range):
    """Unpack two ``(min, max)`` pairs into ``(cal_min, cal_max, pro_min, pro_max)``."""
    cal_min, cal_max = calorie_range
    pro_min, pro_max = protein_range
    return cal_min, cal_max, pro_min, pro_max


# Dropdown label -> image path. Looking the label up directly avoids parsing
# the number back out of the label text, which raised on any unexpected value.
_EXAMPLE_IMAGE_CHOICES = {
    f"Image {i + 1}: {img}": img for i, img in enumerate(EXAMPLE_IMAGES)
}

# NOTE(review): the original indentation of this section was lost, so the
# nesting of widgets inside the row/columns below is a reconstruction —
# confirm the intended layout (in particular where the run button and the
# result widgets sit).
with gr.Blocks(title="Smart Fridge Recipe Assistant", theme=gr.themes.Soft()) as demo:
    gr.Markdown(
        """
        # Smart Fridge Recipe Assistant

        **Notice:** For non-example profiles, online cold start may take **3-5 minutes** to run.
        We recommend starting with the **Example Profiles** and **Example Fridge Images** for a faster experience.

        **How to use:**
        1. (Optional) Select an example profile and/or image from dropdowns
        2. Modify any preferences in the form - your profile will be saved automatically when you click Analyze
        3. Upload or select a fridge image
        4. Click "Analyze fridge & recommend recipes"
        """
    )

    with gr.Row():
        with gr.Column(scale=1):
            gr.Markdown("### Quick Start Examples")
            profile_selector = gr.Dropdown(
                label="Choose a predefined user profile",
                choices=EXAMPLE_IDS,
                value=None,
            )
            image_selector = gr.Dropdown(
                label="Choose an example fridge image",
                choices=list(_EXAMPLE_IMAGE_CHOICES),
                value=None,
            )
            image_input = gr.Image(
                label="Fridge photo (upload or use example)",
                type="pil",
                height=350,
            )
            detection_json = gr.JSON(label="Detection payload")
            annotated_output = gr.Image(label="Annotated detection", height=350)

        with gr.Column(scale=1):
            gr.Markdown("### User Preferences (auto-saved on each run)")
            user_id_box = gr.Textbox(
                label="User ID (will create new profile if doesn't exist)",
                value="user_custom",
                placeholder="e.g. my_new_profile",
            )
            vegetarian_radio = gr.Radio(
                [
                    "flexible_vegetarian",
                    "vegetarian",
                    "vegan",
                    "non_vegetarian",
                ],
                label="Vegetarian preference",
                value="non_vegetarian",
            )
            allergies_box = gr.Textbox(
                label="Allergies (comma separated)",
                placeholder="peanut, shrimp",
            )
            regions_box = gr.Textbox(
                label="Preferred regions (comma separated)",
                placeholder="Asia, Europe",
            )
            calorie_min_slider = gr.Slider(0, 4000, value=400, step=50, label="Min Calories")
            calorie_max_slider = gr.Slider(0, 4000, value=2000, step=50, label="Max Calories")
            protein_min_slider = gr.Slider(
                minimum=0,
                maximum=250,
                value=10,
                step=5,
                label="Protein Min (g)",
            )
            protein_max_slider = gr.Slider(
                minimum=0,
                maximum=250,
                value=160,
                step=5,
                label="Protein Max (g)",
            )
            preferred_box = gr.Textbox(
                label="Preferred main ingredients",
                placeholder="chicken, tofu",
            )
            disliked_box = gr.Textbox(
                label="Disliked main ingredients",
                placeholder="lamb",
            )
            cooking_slider = gr.Slider(
                minimum=0,
                maximum=180,
                value=45,
                step=5,
                label="Max cooking time (minutes)",
            )
            run_button = gr.Button("Analyze fridge & recommend recipes", variant="primary")

    ingredient_md = gr.Markdown()
    recommendation_md = gr.Markdown()
    feedback_dropdown = gr.Dropdown(label="Select a recipe for positive feedback", choices=[])
    feedback_button = gr.Button("Save feedback")
    feedback_status = gr.Markdown()
    # Holds the feedback rows from the latest pipeline run between events.
    feedback_state = gr.State([])

    # Connect profile selector
    profile_selector.change(
        fn=load_example_profile,
        inputs=[profile_selector],
        outputs=[
            user_id_box,
            vegetarian_radio,
            allergies_box,
            regions_box,
            calorie_min_slider,
            calorie_max_slider,
            protein_min_slider,
            protein_max_slider,
            preferred_box,
            disliked_box,
            cooking_slider,
        ],
    )

    # Connect image selector
    def select_image(choice):
        """Return the example image path for a dropdown label, or None if unknown/cleared."""
        return _EXAMPLE_IMAGE_CHOICES.get(choice)

    image_selector.change(
        fn=select_image,
        inputs=[image_selector],
        outputs=[image_input],
    )

    run_button.click(
        fn=run_pipeline,
        inputs=[
            image_input,
            user_id_box,
            vegetarian_radio,
            allergies_box,
            regions_box,
            calorie_min_slider,
            calorie_max_slider,
            protein_min_slider,
            protein_max_slider,
            preferred_box,
            disliked_box,
            cooking_slider,
        ],
        outputs=[
            annotated_output,
            detection_json,
            ingredient_md,
            recommendation_md,
            feedback_dropdown,
            feedback_state,
            feedback_status,
        ],
    )

    feedback_button.click(
        fn=record_feedback,
        inputs=[feedback_dropdown, user_id_box, feedback_state],
        outputs=feedback_status,
    )


if __name__ == "__main__":
    # share=True asks Gradio to create a public share link for the app.
    demo.launch(share=True)