# Source: recipe_recommendation/src/coldstart.py (update by Iris314, commit c81cc13)
import os
import ast
import json
import random
import pandas as pd
import numpy as np
from tqdm import tqdm
import warnings
from .candidate import coarse_rank_candidates, hard_filter, rule_generate_candidates
from .feature import build_features
from .io import load_recipes_csv, load_ingredient_map
# Load the recipe table location and the ingredient ontology once at import time.
# NOTE(review): load_recipes_csv() appears to return a *path* — it is passed to
# pd.read_csv() inside cold_start_ranker() — not a DataFrame; confirm in src/io.py.
RECIPES_PATH = load_recipes_csv()
INGREDIENT_MAP = load_ingredient_map()
# Parent/child lookup tables of the ingredient hierarchy. Neither is referenced
# again in this module; presumably re-exported for use elsewhere — verify.
PARENTS = INGREDIENT_MAP["parents"]
CHILDREN = INGREDIENT_MAP["children"]
def parse_list(x):
    """Convert a stringified list into a Python list safely.

    Returns ``[]`` for NaN/None/empty-string input or on any parse failure.
    Lists pass through unchanged; tuples are converted to lists. Note that
    ``ast.literal_eval`` may yield a non-list (e.g. ``"5"`` -> ``5``), which
    is returned as-is, matching the original behavior.
    """
    # Collection checks MUST come before pd.isna(): pd.isna on a list/tuple
    # returns an elementwise ndarray whose truth value raises ValueError.
    if isinstance(x, list):
        return x
    if isinstance(x, tuple):
        return list(x)
    if pd.isna(x) or x == "":
        return []
    try:
        return ast.literal_eval(x)
    except Exception:
        # Unparseable string (or non-string scalar): treat as "no items".
        return []
def parse_set(x):
    """Convert a stringified collection into a Python set safely.

    Returns ``set()`` for NaN/None/empty-string input. Sets pass through;
    lists/tuples are converted. Strings are parsed with ``ast.literal_eval``;
    an unparseable string becomes a one-element set of its stripped text,
    and any other scalar becomes a one-element set.
    """
    # Collection checks MUST come before pd.isna(): pd.isna on a list/tuple
    # returns an elementwise ndarray whose truth value raises ValueError.
    if isinstance(x, set):
        return x
    if isinstance(x, (list, tuple)):
        return set(x)
    if pd.isna(x) or x == "":
        return set()
    if isinstance(x, str):
        try:
            v = ast.literal_eval(x)
            if isinstance(v, (list, tuple, set)):
                return set(v)
            return {v}
        except Exception:
            # Plain token like "beef": keep it as a single-item set.
            return {x.strip()}
    return {x}
def _parents_pool_from_df(df: pd.DataFrame):
cols = ["main_parent", "staple_parent", "other_parent", "seasoning_parent"]
pool = set()
for c in cols:
if c in df.columns:
for s in df[c]:
pool |= set(s) if isinstance(s, (set, list, tuple)) else set()
return sorted(pool)
def sample_user_parents(parents_pool,
                        user_profile=None,
                        prev_inventory=None,
                        min_items=3, max_items=10,
                        keep_ratio=0.6, reset_interval=20, round_idx=0):
    """Simulate one round of a user's pantry.

    Parents the profile forbids or dislikes are excluded; preferred mains
    are sampled with 3x weight. Unless the round is a reset round
    (``round_idx % reset_interval == 0``), roughly ``keep_ratio`` of the
    previous inventory is carried over, then weighted sampling tops the
    pantry up toward a random target size in [min_items, max_items].
    Returns the pantry as a list (duplicate draws collapse in the set, so
    the result can be smaller than the target).
    """
    profile = user_profile or {}
    prefs = profile.get("other_preferences", {})
    liked = set(prefs.get("preferred_main", []))
    disliked = set(prefs.get("disliked_main", []))
    blocked = set(profile.get("forbidden_parents", [])) | disliked

    candidates, candidate_weights = [], []
    for parent in parents_pool:
        if parent in blocked:
            continue
        candidates.append(parent)
        candidate_weights.append(3.0 if parent in liked else 1.0)
    if not candidates:
        # Everything was blocked: fall back to the raw pool, uniform weights.
        candidates = list(parents_pool)
        candidate_weights = [1.0] * len(candidates)

    pantry = set()
    is_reset_round = (round_idx % reset_interval == 0)
    if prev_inventory and not is_reset_round:
        carried = list(prev_inventory)
        random.shuffle(carried)
        n_keep = max(0, int(len(carried) * keep_ratio))
        pantry.update(carried[:n_keep])

    target_size = random.randint(min_items, max_items)
    n_new = max(0, target_size - len(pantry))
    for _ in range(min(n_new, len(candidates))):
        pick = random.choices(range(len(candidates)), weights=candidate_weights, k=1)[0]
        pantry.add(candidates[pick])
    return list(pantry)
def _weighted_pick3(indexes, scores, temperature=1.0):
idxs = list(indexes)
scs = np.array(scores, dtype=float)
if np.any(scs < 0):
scs = scs - scs.min()
if scs.sum() == 0:
scs = np.ones_like(scs)
picks = []
for _ in range(min(3, len(idxs))):
probs = np.exp(scs / max(temperature, 1e-6))
probs = probs / probs.sum()
choice = np.random.choice(len(idxs), p=probs)
picks.append(idxs[choice])
idxs.pop(choice)
scs = np.delete(scs, choice)
if len(idxs) == 0:
break
return picks
# ---------- Main cold-start ----------
def cold_start_ranker(user_id: str,
                      n_rounds: int = 1000,
                      topn_coarse: int = 5000,
                      topk_rule: int = 3,
                      batch_size: int = 5000,
                      switch_interval: int = 100):
    """
    Generate cold-start training data for a learning-to-rank model.

    For each of ``n_rounds`` simulated sessions: sample a synthetic pantry
    for the user, coarse-recall candidates from the active recipe chunk,
    rule-rerank them, keep the top ``topk_rule``, then deterministically
    re-score those by pantry coverage, region match, and soft constraints
    on cooking time, calories, protein and liked/disliked mains. The best
    three get relevance labels 3/2/1, the rest 0, and features for all
    kept candidates are appended with a per-round ``qid`` group id.

    Parameters
    ----------
    user_id : str
        Directory name under ``recipe_recommendation/user_data`` (or the
        ``input_user_data`` fallback) containing ``user_profile.json``.
    n_rounds : int
        Number of simulated rounds (ranking groups / qids).
    topn_coarse : int
        Maximum candidates kept by the coarse recall step.
    topk_rule : int
        Candidates kept after rule reranking (size of each labeled group).
    batch_size : int
        Approximate recipe-chunk size; the catalog is split into chunks.
    switch_interval : int
        Rounds between rotating to the next recipe chunk.

    Returns
    -------
    str | None
        Path to the saved ``user_features_rank.csv``, or None when no
        valid training groups were produced.

    Raises
    ------
    FileNotFoundError
        If the user directory is missing from both candidate locations.
    """
    # Resolve the user directory, preferring user_data over input_user_data.
    base_dir = os.path.join("recipe_recommendation", "user_data", user_id)
    if not os.path.exists(base_dir):
        base_dir = os.path.join("recipe_recommendation", "input_user_data", user_id)
    if not os.path.exists(base_dir):
        raise FileNotFoundError(
            f"❌ User profile not found for '{user_id}' in either 'recipe_recommendation/user_data' or 'recipe_recommendation/input_user_data'."
        )
    print(f"[cold_start_ranker] Using base_dir = {base_dir}")
    profile_path = os.path.join(base_dir, "user_profile.json")
    features_path = os.path.join(base_dir, "user_features_rank.csv")
    # Idempotent: if features were already generated, reuse them.
    if os.path.exists(features_path):
        print(f"[cold_start] Features already exist at {features_path}")
        return features_path
    with open(profile_path, "r", encoding="utf-8") as f:
        user_profile = json.load(f)
    # Load recipes; set/list columns are stored as stringified literals in
    # the CSV, so parse them back into Python collections.
    df_all = pd.read_csv(RECIPES_PATH)
    to_set = ["main_parent", "staple_parent", "other_parent", "seasoning_parent", "cuisine_attr"]
    to_list = ["ingredients"]
    for c in to_set:
        if c in df_all.columns:
            df_all[c] = df_all[c].apply(parse_set)
    for c in to_list:
        if c in df_all.columns:
            df_all[c] = df_all[c].apply(parse_list)
    # Step 1: hard filter — drop recipes violating hard constraints.
    # NOTE(review): the exact rules live in candidate.hard_filter; the
    # `is not None` guard suggests the import may be optional — confirm.
    if hard_filter is not None:
        try:
            before = len(df_all)
            mask = df_all.apply(lambda r: hard_filter(r.to_dict(), user_profile), axis=1)
            df_all = df_all[mask]
            after = len(df_all)
            print(f"[cold_start] Step1 hard filter applied: {before} -> {after}")
        except Exception as e:
            # Best-effort: a broken filter must not abort data generation.
            warnings.warn(f"[cold_start] hard_filter failed, skip. err={e}")
    # Split the catalog into chunks; each round processes one chunk and the
    # active chunk rotates every `switch_interval` rounds.
    n_chunks = (len(df_all) // batch_size) + 1
    chunks = np.array_split(df_all, n_chunks)
    parents_pool = _parents_pool_from_df(df_all)
    rows = []
    prev_inventory = None
    for i in tqdm(range(n_rounds), desc="Cold-start rounds"):
        chunk_id = (i // switch_interval) % n_chunks
        df_chunk = chunks[chunk_id].copy()
        # Pantry sampling; partially carries over the previous round's items.
        user_parents = sample_user_parents(
            parents_pool,
            user_profile=user_profile,
            prev_inventory=prev_inventory,
            round_idx=i
        )
        prev_inventory = user_parents
        # Step 2: coarse recall over the active chunk.
        coarse_list = coarse_rank_candidates(
            recipes=df_chunk.to_dict(orient="records"),
            user_parents=user_parents,
            user_profile=user_profile,
            top_n=min(topn_coarse, len(df_chunk))
        )
        if not coarse_list:
            continue
        coarse_df = pd.DataFrame(coarse_list)
        # Step 3: rule rerank → Top-5 candidates (just for selecting the 5)
        rule_df = rule_generate_candidates(
            coarse_df,
            user_parents=user_parents,
            user_profile=user_profile
        )
        # Need a full group of topk_rule candidates; otherwise skip the round.
        if rule_df.empty or len(rule_df) < topk_rule:
            continue
        top5 = rule_df.head(topk_rule).copy()
        # ===== Deterministic scoring with feasibility + region + soft constraints =====
        user_set = set(user_parents)
        scored_candidates = []
        # Nutrition goals (from profile); missing bounds degenerate to [0, 1e9].
        ng = user_profile.get("nutritional_goals", {})
        cal_min = ng.get("calories", {}).get("min", 0)
        cal_max = ng.get("calories", {}).get("max", 1e9)
        pro_min = ng.get("protein", {}).get("min", 0)
        pro_max = ng.get("protein", {}).get("max", 1e9)
        # Preferences
        liked = set(user_profile.get("other_preferences", {}).get("preferred_main", []))
        disliked = set(user_profile.get("other_preferences", {}).get("disliked_main", []))
        max_cooking_time = user_profile.get("other_preferences", {}).get("cooking_time_max", None)
        for idx, row in top5.iterrows():
            main_set = set(row.get("main_parent", set()))
            staple_set = set(row.get("staple_parent", set()))
            other_set = set(row.get("other_parent", set()))
            main_total = len(main_set)
            staple_total = len(staple_set)
            main_match = len(main_set & user_set)
            staple_match = len(staple_set & user_set)
            # === 1) Feasibility check ===
            # Require the pantry to cover at least half of main+staple needs.
            total_needed = max(1, main_total + staple_total)
            total_have = main_match + staple_match
            coverage_ratio = total_have / total_needed
            if coverage_ratio < 0.5:
                continue
            # === 2) Region preference ===
            # NOTE(review): `region_match` is presumably attached by the
            # coarse/rule stages — confirm in candidate.py.
            region_score = 1.0 if row.get("region_match", 0) else 0.0
            # === 3) Cooking time soft constraint ===
            # Full score within ±20% of the user's max; decays linearly with
            # relative deviation outside that band.
            time_val = row.get("minutes", None)
            time_score = 0.0
            if max_cooking_time and time_val is not None:
                try:
                    t_val = float(time_val)
                    t_max = float(max_cooking_time)
                    lower_bound = 0.8 * t_max
                    upper_bound = 1.2 * t_max
                    if lower_bound <= t_val <= upper_bound:
                        time_score = 1.0
                    else:
                        deviation = abs(t_val - t_max) / t_max
                        time_score = max(0.0, 1.0 - deviation)
                except (TypeError, ValueError):
                    time_score = 0.0
            else:
                # No time limit configured (or no time data): no penalty.
                time_score = 1.0
            # === 4) Calories soft constraint ===
            # Full score within ±30% of the goal-range midpoint; linear decay
            # with relative deviation beyond that.
            cal_val = row.get("calories", None)
            cal_score = 1.0
            if cal_val is not None and cal_min < cal_max:
                try:
                    c_val = float(cal_val)
                    cal_center = 0.5 * (cal_min + cal_max)
                    tol = 0.3 * cal_center
                    lower_bound = cal_center - tol
                    upper_bound = cal_center + tol
                    if lower_bound <= c_val <= upper_bound:
                        cal_score = 1.0
                    else:
                        deviation = abs(c_val - cal_center) / cal_center
                        cal_score = max(0.0, 1.0 - deviation)
                except (TypeError, ValueError):
                    cal_score = 0.0
            # === 4b) Protein soft constraint ===
            # Same shape as calories but with a tighter ±20% tolerance band.
            protein_val = row.get("protein", None)
            protein_score = 1.0
            if protein_val is not None and pro_min < pro_max:
                try:
                    p_val = float(protein_val)
                    pro_center = 0.5 * (pro_min + pro_max)
                    tol = 0.2 * pro_center
                    lower_bound = pro_center - tol
                    upper_bound = pro_center + tol
                    if lower_bound <= p_val <= upper_bound:
                        protein_score = 1.0
                    else:
                        deviation = abs(p_val - pro_center) / pro_center
                        protein_score = max(0.0, 1.0 - deviation)
                except (TypeError, ValueError):
                    protein_score = 0.0
            # === 5) Liked / Disliked main ===
            like_bonus = 1.0 if main_set & liked else 0.0
            dislike_penalty = 1.0 if main_set & disliked else 0.0
            # === 6) Final scoring ===
            # Weighted blend; pantry coverage dominates at 0.5.
            score = (
                0.5 * coverage_ratio +
                0.15 * region_score +
                0.1 * time_score +
                0.1 * cal_score +
                0.05 * protein_score +
                0.05 * like_bonus -
                0.05 * dislike_penalty
            )
            scored_candidates.append((idx, score))
        # Sort and pick top3 for relevance
        scored_candidates.sort(key=lambda x: x[1], reverse=True)
        picked_idxs = [idx for idx, _ in scored_candidates[:3]]
        # relevance labels 3 / 2 / 1 (everything else stays 0)
        labels = {idx: 0 for idx in top5.index}
        if len(picked_idxs) > 0:
            labels[picked_idxs[0]] = 3
        if len(picked_idxs) > 1:
            labels[picked_idxs[1]] = 2
        if len(picked_idxs) > 2:
            labels[picked_idxs[2]] = 1
        # Build features for ALL kept candidates (labeled and unlabeled alike),
        # so each qid group contains both positives and negatives.
        for idx, row in top5.iterrows():
            up = set(user_parents)
            main_set = set(row.get("main_parent", set()))
            staple_set = set(row.get("staple_parent", set()))
            other_set = set(row.get("other_parent", set()))
            # NOTE(review): this dict is the contract expected by
            # feature.build_features — keep keys in sync with that module.
            recipe_dict = {
                "main": main_set,
                "staple": staple_set,
                "other": other_set,
                "seasoning": set(row.get("seasoning_parent", set())),
                "matched_main": len(main_set & up),
                "matched_staple": len(staple_set & up),
                "matched_other": len(other_set & up),
                "calories": row.get("calories", 0),
                "protein": row.get("protein", 0),
                "fat": row.get("fat", 0),
                "region": row.get("region", ""),
                "cuisine_attr": row.get("cuisine_attr", []),
                "ingredients": row.get("ingredients", []),
                "minutes": row.get("minutes", None),
            }
            feats = build_features(recipe_dict, user_profile)
            feats["relevance"] = float(labels[idx])
            feats["qid"] = int(i)  # round index doubles as the ranking group id
            rows.append(feats)
    out = pd.DataFrame(rows)
    if "qid" not in out.columns or out.empty:
        print(f"[cold_start] No valid training data generated for {user_id}, skipping save.")
        return None
    # Drop degenerate groups: a qid with a single row is useless for ranking.
    valid_qids = out.groupby("qid").size()
    keep_qids = valid_qids[valid_qids > 1].index
    out = out[out["qid"].isin(keep_qids)].reset_index(drop=True)
    os.makedirs(base_dir, exist_ok=True)
    out_path = os.path.join(base_dir, "user_features_rank.csv")
    out.to_csv(out_path, index=False)
    print(f"[cold_start] Saved {len(out)} rows to {out_path}")
    return out_path
if __name__ == "__main__":
    # Bug fix: the original call passed `coverage_penalty` and `temperature`,
    # which cold_start_ranker() does not accept — it raised TypeError before
    # doing any work. Only supported keyword arguments are passed now.
    cold_start_ranker(
        user_id="user_1",
        n_rounds=10000,
        topn_coarse=20000,
        topk_rule=5,
    )