Iris314's picture
Upload app.py
69dd7d5 verified
"""Gradio application for the smart fridge detector + recipe recommendation pipeline."""
import json
import tempfile
from pathlib import Path
from typing import List, Tuple, Dict, Any
import cv2
import gradio as gr
import numpy as np
from PIL import Image
import pandas as pd
from frige_detect.detect import (
detect_and_generate,
load_roboflow_credentials,
RoboflowCredentials,
)
from recipe_recommendation.main import (
load_recipes,
recommend_recipes,
save_user_profile,
get_feedback,
USER_DATA_DIR,
)
from sklearn.metrics import ndcg_score
import joblib
# ---------------------------------------------------------------------------
# Global resources
# ---------------------------------------------------------------------------
# Location of the Roboflow credentials file, relative to the working directory.
CREDENTIALS_PATH = Path("frige_detect/roboflow_credentials.txt")
# Credentials are loaded once at import time and reused for every detection call.
ROBOFLOW_CREDENTIALS: RoboflowCredentials = load_roboflow_credentials(str(CREDENTIALS_PATH))
# Recipe table loaded once at import time and passed to recommend_recipes() on each run.
RECIPES_DF = load_recipes()
# ---------------------------------------------------------------------------
# Predefined user profiles for examples
# ---------------------------------------------------------------------------
# EXAMPLE_PROFILES = {
# "user_1": {
# "vegetarian_type": "flexible",
# "allergies": "",
# "regions": "North America",
# "calorie_min": 250,
# "calorie_max": 2000,
# "protein_min": 10,
# "protein_max": 160,
# "preferred_main": "",
# "disliked_main": "",
# "cooking_time": 45,
# },
# "user_2": {
# "vegetarian_type": "flexible_vegetarian",
# "allergies": "shrimp",
# "regions": "Asia",
# "calorie_min": 400,
# "calorie_max": 1500,
# "protein_min": 40,
# "protein_max": 120,
# "preferred_main": "tofu",
# "disliked_main": "beef",
# "cooking_time": 60,
# },
# "user_3": {
# "vegetarian_type": "non_vegetarian",
# "allergies": "",
# "regions": "Europe",
# "calorie_min": 500,
# "calorie_max": 2000,
# "protein_min": 80,
# "protein_max": 160,
# "preferred_main": "beef, chicken",
# "disliked_main": "",
# "cooking_time": 45,
# },
# }
# Example profile IDs offered in the UI: a candidate is kept only when its
# user_profile.json already exists under USER_DATA_DIR.
EXAMPLE_IDS = list(
    filter(
        lambda uid: (USER_DATA_DIR / uid / "user_profile.json").exists(),
        ("user_1", "user_2", "user_3", "user_4", "user_5"),
    )
)

# Bundled demo fridge photos selectable from the UI.
EXAMPLE_IMAGES = [
    "frige_detect/demo/t1.jpg",
    "frige_detect/demo/t2.jpg",
    "frige_detect/demo/t3.jpg",
]
# ---------------------------------------------------------------------------
# Helper utilities
# ---------------------------------------------------------------------------
def parse_csv_list(text: str) -> List[str]:
    """Split a comma-separated string into trimmed, non-empty items.

    A falsy ``text`` (empty string or ``None``) yields an empty list.
    """
    items: List[str] = []
    if text:
        for chunk in text.split(","):
            cleaned = chunk.strip()
            if cleaned:
                items.append(cleaned)
    return items
def ensure_numpy_image(image: Any) -> np.ndarray:
    """Return the incoming image as a numpy array.

    numpy arrays are handed back unchanged; PIL images are converted to RGB
    first and then turned into an array.

    Raises:
        ValueError: if ``image`` is ``None`` or is neither a numpy array nor a
            PIL image.
    """
    if image is None:
        raise ValueError("Please upload a fridge photo before running detection.")
    if isinstance(image, np.ndarray):
        return image
    if not isinstance(image, Image.Image):
        raise ValueError("Unsupported image format provided.")
    rgb_view = image.convert("RGB")
    return np.array(rgb_view)
def write_temp_image(image: np.ndarray) -> str:
    """Write an RGB numpy image to a fresh temporary directory as ``upload.jpg``.

    Args:
        image: Image array in RGB channel order.

    Returns:
        Path of the written JPEG file, as a string.

    Raises:
        OSError: if OpenCV reports that the file could not be written.
    """
    # Convert first so a conversion failure does not leave an empty temp dir behind.
    bgr_image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
    temp_dir = Path(tempfile.mkdtemp(prefix="fridge_upload_"))
    temp_path = temp_dir / "upload.jpg"
    # cv2.imwrite signals failure through its return value rather than an exception.
    if not cv2.imwrite(str(temp_path), bgr_image):
        try:
            temp_dir.rmdir()
        except OSError:
            pass
        raise OSError(f"Failed to write uploaded image to {temp_path}")
    return str(temp_path)
def build_user_profile(
    user_id: str,
    vegetarian_type: str,
    allergies: str,
    regions: str,
    calorie_range: Tuple[float, float],
    protein_range: Tuple[float, float],
    preferred_main: str,
    disliked_main: str,
    cooking_time: float,
) -> Dict[str, Any]:
    """
    Build and save a user profile from the current input values.

    The profile is ALWAYS created or overwritten, so users can modify their
    preferences on the fly. Only the ``num_feedback`` counter of an existing
    profile is carried over.

    Args:
        user_id: Profile identifier; surrounding whitespace is stripped. It is
            used as a directory name under ``USER_DATA_DIR``.
        vegetarian_type: Stored under ``diet.vegetarian_type``.
        allergies, regions, preferred_main, disliked_main: Comma-separated
            strings, stored as lists.
        calorie_range, protein_range: ``(min, max)`` pairs, stored as ints.
        cooking_time: Maximum cooking time; a falsy value (0 / None) is stored
            as ``None``.

    Returns:
        The profile dictionary that was saved.

    Raises:
        ValueError: if the user ID is empty or contains path separators.
    """
    user_id = (user_id or "").strip()
    if not user_id:
        raise ValueError("User ID cannot be empty.")
    # The ID comes straight from a UI textbox and becomes a path component:
    # refuse anything that could escape the user data directory.
    if user_id in (".", "..") or "/" in user_id or "\\" in user_id:
        raise ValueError("User ID must not contain path separators.")

    profile_path = USER_DATA_DIR / user_id / "user_profile.json"

    # Preserve feedback count if a readable profile already exists.
    num_feedback = 0
    if profile_path.exists():
        try:
            existing = json.loads(profile_path.read_text(encoding="utf-8"))
        except (OSError, ValueError) as exc:
            # Best effort: an unreadable/corrupt profile is overwritten, but say so.
            print(f"[app] Could not read existing profile for '{user_id}': {exc}")
        else:
            if isinstance(existing, dict):
                num_feedback = existing.get("num_feedback", 0)

    profile = {
        "user_id": user_id,
        "num_feedback": num_feedback,
        "diet": {"vegetarian_type": vegetarian_type},
        "allergies": parse_csv_list(allergies),
        "region_preference": parse_csv_list(regions),
        "nutritional_goals": {
            "calories": {"min": int(calorie_range[0]), "max": int(calorie_range[1])},
            "protein": {"min": int(protein_range[0]), "max": int(protein_range[1])},
        },
        "other_preferences": {
            "preferred_main": parse_csv_list(preferred_main),
            "disliked_main": parse_csv_list(disliked_main),
            "cooking_time_max": int(cooking_time) if cooking_time else None,
        },
    }
    # Always save the profile (create new or overwrite existing)
    save_user_profile(user_id, profile)
    print(f"[app] Profile saved/updated for user '{user_id}'")
    return profile
def summarize_ingredients(
    user_parents: List[str],
    high_conf: List[str],
    low_conf: List[str],
) -> str:
    """Build the Markdown "Ingredient Mapping" summary.

    Mapped parents are always listed (``none`` when empty); the high- and
    low-confidence lines appear only when their lists are non-empty. All names
    are sorted, and low-confidence names are additionally de-duplicated.
    """
    mapped_text = ", ".join(sorted(user_parents)) if user_parents else "none"
    report = [
        "### Ingredient Mapping",
        "- **Mapped parent ingredients:** " + mapped_text,
    ]
    if high_conf:
        confident = sorted(high_conf)
        report.append("- **High confidence detections:** " + ", ".join(confident))
    if low_conf:
        uncertain = sorted(set(low_conf))
        report.append("- **Low confidence detections:** " + ", ".join(uncertain))
    return "\n".join(report)
def _ensure_iterable(value: Any) -> List[str]:
    """Normalise a cell value into a list.

    - ``None`` -> ``[]``
    - ``set`` -> sorted list
    - ``list`` -> returned as-is (same object)
    - ``str`` -> single-element list (a string is one item, not characters)
    - any other iterable -> ``list(value)``
    - a non-iterable scalar (e.g. the NaN float pandas yields for a missing
      cell) -> ``[]`` instead of raising ``TypeError``
    """
    if value is None:
        return []
    if isinstance(value, set):
        return sorted(value)
    if isinstance(value, list):
        return value
    if isinstance(value, str):
        return [value]
    try:
        return list(value)
    except TypeError:
        # Not iterable: treat as "no items" so callers can keep rendering.
        return []
def render_recommendations(df, user_parents=None):
    """
    Render the recommendation list in Markdown with ✅/❌ marks for ingredients.

    Original ingredient names are shown instead of parent names whenever an
    ingredient string contains the parent name; otherwise the parent name is
    used as a fallback.

    Args:
        df: DataFrame of recommended recipes, one recipe per row.
        user_parents: Parent ingredients the user has; a parent found in this
            collection is marked ✅, otherwise ❌. ``None`` means none available.

    Returns:
        ``(markdown, feedback_rows)`` where ``feedback_rows`` holds one dict per
        recipe with ``recipe_name``, ``recipe_id`` and ``full_row``.
    """
    owned = set(user_parents) if user_parents is not None else set()
    parent_columns = ("main_parent", "staple_parent", "other_parent", "seasoning_parent")
    marked_sections = (
        ("main_parent", "Main Ingredients"),
        ("staple_parent", "Staple Ingredients"),
        ("other_parent", "Other Ingredients"),
    )

    feedback_rows = []
    md_lines = []
    # Number by position, not by index label: the frame may keep labels from
    # the full recipe table, and the numbering must line up with the order of
    # feedback_rows (callers number those 1..n).
    for position, (_, row) in enumerate(df.iterrows(), start=1):
        # --- Header line with score ---
        name = row.get("name", "Unknown")
        score = row.get("score", None)
        if score is not None and pd.notna(score):
            md_lines.append(f"**{position}. {name} — score {score:.1f}%**")
        else:
            md_lines.append(f"**{position}. {name}**")

        # --- Region / Cuisine ---
        region = row.get("region", "Unavailable")
        if isinstance(region, (set, list, tuple)):
            region_str = ", ".join(str(x) for x in region)
        else:
            region_str = str(region)
        cuisine = _ensure_iterable(row.get("cuisine_attr"))
        cuisine_str = ", ".join(str(c) for c in cuisine) if cuisine else "Unavailable"
        md_lines.append(f" - Region: {region_str}")
        md_lines.append(f" - Cuisine: {cuisine_str}")

        # --- Nutrition ---
        md_lines.append(f" - Calories: {row.get('calories', 'N/A')}")
        md_lines.append(f" - Protein: {row.get('protein', 'N/A')}")

        # --- Build mapping: parent -> list of original ingredient strings ---
        # Parent lists are read once per row; parents are de-duplicated across
        # categories so one ingredient is not attached to a parent twice.
        parents_by_column = {
            col: _ensure_iterable(row.get(col)) for col in parent_columns
        }
        all_parents = list(
            dict.fromkeys(p for col in parent_columns for p in parents_by_column[col])
        )
        parent_to_ing = {}
        for ing in _ensure_iterable(row.get("ingredients")):
            ing_lower = str(ing).lower()
            for p in all_parents:
                if str(p).lower() in ing_lower:
                    parent_to_ing.setdefault(p, []).append(ing)

        # --- Ingredient categories with ✅/❌ based on parent presence ---
        for key, label in marked_sections:
            parents = parents_by_column[key]
            if not parents:
                continue
            annotated = []
            for p in parents:
                ing_names = parent_to_ing.get(p, [p])  # fallback to parent name if no match
                mark = "✅" if p in owned else "❌"
                annotated.append(f"{', '.join(str(n) for n in ing_names)} {mark}")
            md_lines.append(f" - {label}: {', '.join(annotated)}")

        # --- Seasoning (no marks) ---
        seasoning_parents = parents_by_column["seasoning_parent"]
        if seasoning_parents:
            seasoning_names = []
            for p in seasoning_parents:
                seasoning_names.extend(parent_to_ing.get(p, [p]))
            md_lines.append(f" - Seasoning: {', '.join(str(n) for n in seasoning_names)}")

        md_lines.append("")  # spacing
        feedback_rows.append({
            "recipe_name": name,
            "recipe_id": row.get("recipe_id"),
            "full_row": row.to_dict(),
        })
    return "\n".join(md_lines), feedback_rows
def load_example_profile(profile_name: str):
    """Load an example profile from ``USER_DATA_DIR/<profile_name>/user_profile.json``.

    Returns an 11-tuple in the order the UI form expects:
    ``(user_id, vegetarian_type, allergies, regions, cal_min, cal_max,
    pro_min, pro_max, preferred_main, disliked_main, cooking_time)``.
    List fields are joined with commas.

    When no profile is selected, or the file cannot be read/parsed, a fixed
    set of default values is returned instead of raising, so the UI callback
    never fails.
    """
    # NOTE(review): the default vegetarian type "flexible" is not one of the
    # choices offered by the radio in the UI section of this file — confirm
    # which spelling is intended.
    defaults = ("user_custom", "flexible", "", "", 400, 2000, 10, 160, "", "", 45)

    # Dropdown cleared / nothing selected: no file to look up.
    if not profile_name:
        return defaults

    try:
        p = USER_DATA_DIR / profile_name / "user_profile.json"
        data = json.loads(p.read_text(encoding="utf-8"))

        # "or {}" keeps a null section from discarding the whole profile.
        diet = data.get("diet") or {}
        veg_type = diet.get("vegetarian_type") or "flexible"
        allergies = ",".join(data.get("allergies", []) or [])
        regions = ",".join(data.get("region_preference", []) or [])

        ng = data.get("nutritional_goals") or {}
        calories = ng.get("calories") or {}
        protein = ng.get("protein") or {}
        cal_min = int(calories.get("min", 400))
        cal_max = int(calories.get("max", 2000))
        pro_min = int(protein.get("min", 10))
        pro_max = int(protein.get("max", 160))

        op = data.get("other_preferences") or {}
        preferred_main = ",".join(op.get("preferred_main", []) or [])
        disliked_main = ",".join(op.get("disliked_main", []) or [])
        cooking_time = int(op.get("cooking_time_max", 45) or 45)
    except Exception as exc:
        # UI boundary: fall back to defaults, but leave a trace of why.
        print(f"[app] Could not load example profile '{profile_name}': {exc}")
        return defaults

    return (
        profile_name,
        veg_type,
        allergies,
        regions,
        cal_min,
        cal_max,
        pro_min,
        pro_max,
        preferred_main,
        disliked_main,
        cooking_time,
    )
def load_example_image(image_path: str):
    """Return the given example image path unchanged."""
    chosen_path = image_path
    return chosen_path
def run_pipeline(
    image,
    user_id,
    vegetarian_type,
    allergies,
    regions,
    calorie_min,
    calorie_max,
    protein_min,
    protein_max,
    preferred_main,
    disliked_main,
    cooking_time,
):
    """
    Main pipeline function.

    Runs detection on the uploaded photo, ALWAYS creates/updates the user
    profile from the current input values, then asks for recommendations.

    Returns a 7-tuple:
    ``(annotated image path, detection payload JSON string, ingredient/goal
    summary Markdown, recommendation Markdown, dropdown update, feedback rows,
    status message)``.
    Any exception is caught and reported through the summary and status
    fields, with the remaining outputs emptied.
    """
    import time

    try:
        rgb_image = ensure_numpy_image(image)
        upload_file = Path(write_temp_image(rgb_image))
        try:
            temp_dir = Path(tempfile.mkdtemp(prefix="fridge_outputs_"))
            output_json = temp_dir / "recipe_input.json"
            output_image = temp_dir / "annotated_image.jpg"
            detection_result = detect_and_generate(
                image_path=str(upload_file),
                credentials=ROBOFLOW_CREDENTIALS,
                conf_threshold=0.4,
                overlap_threshold=0.3,
                conf_split=0.7,
                output_json=str(output_json),
                output_image=str(output_image),
            )
        finally:
            # Clean up the upload even when detection raises. rmdir only
            # succeeds on an empty directory, so nothing else can be removed.
            upload_file.unlink(missing_ok=True)
            try:
                upload_file.parent.rmdir()
            except OSError:
                pass

        detection_payload = detection_result.get("recipe_json") if detection_result else None
        if not detection_payload:
            raise ValueError("No detection result returned. Please try again with a clearer photo.")
        ingredients = detection_payload.get("ingredients", [])
        if not isinstance(ingredients, list) or len(ingredients) == 0:
            raise ValueError("No ingredients detected. Please try again with a clearer photo.")

        # Always create/update user profile with current UI values
        profile = build_user_profile(
            user_id,
            vegetarian_type,
            allergies,
            regions,
            (calorie_min, calorie_max),
            (protein_min, protein_max),
            preferred_main,
            disliked_main,
            cooking_time,
        )
        # Use the normalised ID the profile was saved under, so the
        # recommender looks up the same profile that was just written.
        user_id = profile["user_id"]

        # NOTE(review): the reason for this pause is not visible in this file;
        # presumably it lets the profile write settle — confirm before removing.
        time.sleep(1)

        detection_payload_json = json.dumps(detection_payload, ensure_ascii=False, indent=2)
        ml_top, user_parents, high_conf, low_conf = recommend_recipes(
            detection_payload,
            user_id,
            RECIPES_DF,
            topk=5,
        )

        if not ml_top.empty:
            # Work on a copy so adding "score" never writes into a frame the
            # recommender may still share.
            ml_top = ml_top.copy()
            # First usable score column wins; scale it to 0-100 relative to the best row.
            for score_col in ["rank_score", "pred", "match_score", "ml_score"]:
                if score_col in ml_top.columns and ml_top[score_col].notna().any():
                    max_val = ml_top[score_col].max()
                    if max_val > 0:
                        ml_top["score"] = ml_top[score_col] / max_val * 100
                    break

        ingredient_summary = summarize_ingredients(user_parents, high_conf, low_conf)
        goals = profile["nutritional_goals"]
        protein_min_val = goals["protein"]["min"]
        protein_max_val = goals["protein"]["max"]
        calorie_min_val = goals["calories"]["min"]
        calorie_max_val = goals["calories"]["max"]
        goals_summary = (
            f"### Nutritional Goals\n"
            f"- Calories: {calorie_min_val} - {calorie_max_val} kcal\n"
            f"- Protein: {protein_min_val} - {protein_max_val} g"
        )
        ingredient_summary = goals_summary + "\n\n" + ingredient_summary

        recommendation_md, feedback_rows = render_recommendations(ml_top, user_parents)
        dropdown_choices = [
            f"{idx + 1}. {row.get('recipe_name', 'Recipe')}" for idx, row in enumerate(feedback_rows)
        ]
        # Success message about profile creation/update
        profile_status = f"✓ Profile '{user_id}' has been saved/updated with your current preferences."
        return (
            str(output_image),
            detection_payload_json,
            ingredient_summary,
            recommendation_md,
            gr.update(choices=dropdown_choices, value=None),
            feedback_rows,
            profile_status,
        )
    except Exception as exc:
        # UI boundary: surface the problem in the Markdown outputs instead of raising.
        return (
            None,
            None,
            f"⚠️ {exc}",
            "",
            gr.update(choices=[], value=None),
            [],
            f"⚠️ {exc}",
        )
def record_feedback(selected_recipe: str, user_id: str, feedback_rows: List[Dict[str, Any]]):
    """Record positive feedback for the recipe picked in the dropdown.

    Args:
        selected_recipe: Dropdown label of the form ``"<n>. <recipe name>"``;
            ``n`` is the 1-based position in ``feedback_rows``.
        user_id: Profile identifier; surrounding whitespace is stripped so it
            matches the ID the profile was saved under.
        feedback_rows: Rows produced by the last pipeline run.

    Returns:
        A status message for the UI. Validation and persistence problems are
        reported through this message rather than raised.
    """
    if not selected_recipe:
        return "Please select a recipe before submitting feedback."
    user_id = (user_id or "").strip()
    if not user_id:
        return "Please provide a valid user ID."
    if not feedback_rows:
        return "No recommendation data available. Run the pipeline first."

    try:
        index = int(selected_recipe.split(".")[0]) - 1
    except (ValueError, IndexError):
        return "Unable to parse the selected recipe."
    if index < 0 or index >= len(feedback_rows):
        return "Selected recipe is out of range."

    recipe_row = feedback_rows[index]
    try:
        get_feedback(user_id, recipe_row)
        # Keep the feedback counter in the stored profile in step.
        profile_path = USER_DATA_DIR / user_id / "user_profile.json"
        if profile_path.exists():
            data = json.loads(profile_path.read_text(encoding="utf-8"))
            data["num_feedback"] = data.get("num_feedback", 0) + 1
            save_user_profile(user_id, data)
    except Exception as exc:
        # UI boundary: report the failure in the status area.
        return f"⚠️ Could not record feedback: {exc}"
    return f"✓ Feedback recorded for {recipe_row.get('recipe_name', 'selected recipe')}!"
# ---------------------------------------------------------------------------
# Gradio UI definition
# ---------------------------------------------------------------------------
# config checker
# def check_config():
# profile_path = USER_DATA_DIR / "user_custom" / "user_profile.json"
# if not profile_path.exists():
# print("⚠️ No profile found for user_custom yet")
# return
# with open(profile_path) as f:
# profile = json.load(f)
# from recipe_recommendation.main import normalize_user_profile, prepare_recipes_df
# profile = normalize_user_profile(profile)
# ng = profile['nutritional_goals']
# cal = ng['calories']
# pro = ng['protein']
# print("\n" + "="*60)
# print("⚙️ USER CONFIG CHECK")
# print("="*60)
# print(f"Calories: {cal['min']} - {cal['max']}")
# print(f"Protein: {pro['min']} - {pro['max']}g")
# df = prepare_recipes_df(RECIPES_DF.copy())
# # Test how many pass
# passed = df[(df['calories'] >= cal['min']) & (df['calories'] <= cal['max']) &
# (df['protein'] >= pro['min']) & (df['protein'] <= pro['max'])]
# print(f"\nRecipes matching your ranges: {len(passed)}/{len(df)} ({len(passed)/len(df)*100:.1f}%)")
# if len(passed) == 0:
# print("\n❌ NO RECIPES match your settings!")
# print(f"Try: Calories 200-1500, Protein 10-120")
# else:
# print(f"\n✅ OK - showing sample:")
# for _, r in passed.head(3).iterrows():
# print(f" - {r['name'][:40]}: {r['calories']:.0f} cal, {r['protein']:.0f}g")
# print("="*60 + "\n")
# check_config()
def split_ranges(calorie_range, protein_range):
    """Flatten two ``(min, max)`` pairs into ``(cal_min, cal_max, pro_min, pro_max)``."""
    (low_cal, high_cal), (low_pro, high_pro) = calorie_range, protein_range
    return low_cal, high_cal, low_pro, high_pro
# Gradio UI: intro text, a two-column input area (examples + photo on the left,
# preference form on the right), result panels, and the event wiring.
# NOTE(review): the nesting of the layout containers below was reconstructed
# (the source had lost its indentation) — confirm which components belong
# inside each Row/Column against the intended layout.
with gr.Blocks(title="Smart Fridge Recipe Assistant", theme=gr.themes.Soft()) as demo:
    gr.Markdown(
        """
# Smart Fridge Recipe Assistant
**Notice:** For non-example profiles, online cold start may take **3-5 minutes** to run.
We recommend starting with the **Example Profiles** and **Example Fridge Images** for a faster experience.
**How to use:**
1. (Optional) Select an example profile and/or image from dropdowns
2. Modify any preferences in the form - your profile will be saved automatically when you click Analyze
3. Upload or select a fridge image
4. Click "Analyze fridge & recommend recipes"
"""
    )
    with gr.Row():
        # Left column: example pickers, photo input and detection outputs.
        with gr.Column(scale=1):
            gr.Markdown("### Quick Start Examples")
            profile_selector = gr.Dropdown(
                label="Choose a predefined user profile",
                choices=EXAMPLE_IDS,
                value=None,
            )
            image_selector = gr.Dropdown(
                label="Choose an example fridge image",
                choices=[f"Image {i+1}: {img}" for i, img in enumerate(EXAMPLE_IMAGES)],
                value=None,
            )
            image_input = gr.Image(
                label="Fridge photo (upload or use example)",
                type="pil",
                height=350,
            )
            detection_json = gr.JSON(label="Detection payload")
            annotated_output = gr.Image(label="Annotated detection", height=350)
        # Right column: the preference form whose values are passed to run_pipeline.
        with gr.Column(scale=1):
            gr.Markdown("### User Preferences (auto-saved on each run)")
            user_id_box = gr.Textbox(
                label="User ID (will create new profile if doesn't exist)",
                value="user_custom",
                placeholder="e.g. my_new_profile",
            )
            # NOTE(review): load_example_profile() falls back to "flexible",
            # which is not among these choices — confirm the intended value set.
            vegetarian_radio = gr.Radio(
                [
                    "flexible_vegetarian",
                    "vegetarian",
                    "vegan",
                    "non_vegetarian",
                ],
                label="Vegetarian preference",
                value="non_vegetarian",
            )
            allergies_box = gr.Textbox(
                label="Allergies (comma separated)",
                placeholder="peanut, shrimp",
            )
            regions_box = gr.Textbox(
                label="Preferred regions (comma separated)",
                placeholder="Asia, Europe",
            )
            calorie_min_slider = gr.Slider(0, 4000, value=400, step=50, label="Min Calories")
            calorie_max_slider = gr.Slider(0, 4000, value=2000, step=50, label="Max Calories")
            protein_min_slider = gr.Slider(
                minimum=0,
                maximum=250,
                value=10,
                step=5,
                label="Protein Min (g)",
            )
            protein_max_slider = gr.Slider(
                minimum=0,
                maximum=250,
                value=160,
                step=5,
                label="Protein Max (g)",
            )
            preferred_box = gr.Textbox(
                label="Preferred main ingredients",
                placeholder="chicken, tofu",
            )
            disliked_box = gr.Textbox(
                label="Disliked main ingredients",
                placeholder="lamb",
            )
            cooking_slider = gr.Slider(
                minimum=0,
                maximum=180,
                value=45,
                step=5,
                label="Max cooking time (minutes)",
            )
            run_button = gr.Button("Analyze fridge & recommend recipes", variant="primary")
    # Result panels and feedback controls.
    ingredient_md = gr.Markdown()
    recommendation_md = gr.Markdown()
    feedback_dropdown = gr.Dropdown(label="Select a recipe for positive feedback", choices=[])
    feedback_button = gr.Button("Save feedback")
    feedback_status = gr.Markdown()
    # Holds the feedback rows returned by run_pipeline so record_feedback can read them.
    feedback_state = gr.State([])
    # Connect profile selector: the outputs order must match the tuple
    # returned by load_example_profile.
    profile_selector.change(
        fn=load_example_profile,
        inputs=[profile_selector],
        outputs=[
            user_id_box,
            vegetarian_radio,
            allergies_box,
            regions_box,
            calorie_min_slider,
            calorie_max_slider,
            protein_min_slider,
            protein_max_slider,
            preferred_box,
            disliked_box,
            cooking_slider,
        ],
    )

    # Connect image selector
    def select_image(choice):
        """Map a dropdown label ("Image <n>: <path>") back to its EXAMPLE_IMAGES entry."""
        if choice:
            # Recover the 1-based number from the text before the first colon.
            idx = int(choice.split(":")[0].replace("Image ", "")) - 1
            return EXAMPLE_IMAGES[idx]
        return None

    image_selector.change(
        fn=select_image,
        inputs=[image_selector],
        outputs=[image_input],
    )
    # Inputs follow run_pipeline's parameter order; outputs follow the order
    # of the tuple it returns.
    run_button.click(
        fn=run_pipeline,
        inputs=[
            image_input,
            user_id_box,
            vegetarian_radio,
            allergies_box,
            regions_box,
            calorie_min_slider,
            calorie_max_slider,
            protein_min_slider,
            protein_max_slider,
            preferred_box,
            disliked_box,
            cooking_slider,
        ],
        outputs=[
            annotated_output,
            detection_json,
            ingredient_md,
            recommendation_md,
            feedback_dropdown,
            feedback_state,
            feedback_status,
        ],
    )
    feedback_button.click(
        fn=record_feedback,
        inputs=[feedback_dropdown, user_id_box, feedback_state],
        outputs=feedback_status,
    )
# Launch the app only when this file is run as a script.
if __name__ == "__main__":
    # share=True asks Gradio for a public share link in addition to the local server.
    demo.launch(share=True)