import os import ast import json import random import pandas as pd import numpy as np from tqdm import tqdm import warnings from .candidate import coarse_rank_candidates, hard_filter, rule_generate_candidates from .feature import build_features from .io import load_recipes_csv, load_ingredient_map RECIPES_PATH = load_recipes_csv() INGREDIENT_MAP = load_ingredient_map() PARENTS = INGREDIENT_MAP["parents"] CHILDREN = INGREDIENT_MAP["children"] def parse_list(x): """Convert a stringified list into a Python list safely.""" if pd.isna(x) or x == "": return [] if isinstance(x, list): return x try: return ast.literal_eval(x) except Exception: return [] def parse_set(x): """Convert a stringified collection into a Python set safely.""" if pd.isna(x) or x == "": return set() if isinstance(x, set): return x if isinstance(x, (list, tuple)): return set(x) if isinstance(x, str): try: v = ast.literal_eval(x) if isinstance(v, (list, tuple, set)): return set(v) return {v} except Exception: return {x.strip()} return {x} def _parents_pool_from_df(df: pd.DataFrame): cols = ["main_parent", "staple_parent", "other_parent", "seasoning_parent"] pool = set() for c in cols: if c in df.columns: for s in df[c]: pool |= set(s) if isinstance(s, (set, list, tuple)) else set() return sorted(pool) def sample_user_parents(parents_pool, user_profile=None, prev_inventory=None, min_items=3, max_items=10, keep_ratio=0.6, reset_interval=20, round_idx=0): liked = set((user_profile or {}).get("other_preferences", {}).get("preferred_main", [])) disliked = set((user_profile or {}).get("other_preferences", {}).get("disliked_main", [])) forbidden = set((user_profile or {}).get("forbidden_parents", [])) | disliked pool, weights = [], [] for p in parents_pool: if p in forbidden: continue w = 3.0 if p in liked else 1.0 pool.append(p); weights.append(w) if not pool: pool, weights = parents_pool[:], [1.0] * len(parents_pool) inventory = set() force_reset = (round_idx % reset_interval == 0) if prev_inventory and not force_reset: prev_list = list(prev_inventory); random.shuffle(prev_list) keep_k = max(0, int(len(prev_list) * keep_ratio)) inventory |= set(prev_list[:keep_k]) k = random.randint(min_items, max_items) remain = max(0, k - len(inventory)) for _ in range(min(remain, len(pool))): idx = random.choices(range(len(pool)), weights=weights, k=1)[0] inventory.add(pool[idx]) return list(inventory) def _weighted_pick3(indexes, scores, temperature=1.0): idxs = list(indexes) scs = np.array(scores, dtype=float) if np.any(scs < 0): scs = scs - scs.min() if scs.sum() == 0: scs = np.ones_like(scs) picks = [] for _ in range(min(3, len(idxs))): probs = np.exp(scs / max(temperature, 1e-6)) probs = probs / probs.sum() choice = np.random.choice(len(idxs), p=probs) picks.append(idxs[choice]) idxs.pop(choice) scs = np.delete(scs, choice) if len(idxs) == 0: break return picks # ---------- Main cold-start ---------- # ---------- Main cold-start ---------- def cold_start_ranker(user_id: str, n_rounds: int = 1000, topn_coarse: int = 5000, topk_rule: int = 3, batch_size: int = 5000, switch_interval: int = 100): """ Cold-start data generation for learning-to-rank. Top-5 selection prioritizes user pantry coverage deterministically: 1. Fully covered recipes first (missing_count == 0) 2. Then few missing (esp. staple/other) 3. Heavy penalty for missing main ingredients. """ base_dir = os.path.join("recipe_recommendation", "user_data", user_id) if not os.path.exists(base_dir): base_dir = os.path.join("recipe_recommendation", "input_user_data", user_id) if not os.path.exists(base_dir): raise FileNotFoundError( f"❌ User profile not found for '{user_id}' in either 'recipe_recommendation/user_data' or 'recipe_recommendation/input_user_data'." ) print(f"[cold_start_ranker] Using base_dir = {base_dir}") profile_path = os.path.join(base_dir, "user_profile.json") features_path = os.path.join(base_dir, "user_features_rank.csv") if os.path.exists(features_path): print(f"[cold_start] Features already exist at {features_path}") return features_path with open(profile_path, "r", encoding="utf-8") as f: user_profile = json.load(f) # Load and parse recipes df_all = pd.read_csv(RECIPES_PATH) to_set = ["main_parent", "staple_parent", "other_parent", "seasoning_parent", "cuisine_attr"] to_list = ["ingredients"] for c in to_set: if c in df_all.columns: df_all[c] = df_all[c].apply(parse_set) for c in to_list: if c in df_all.columns: df_all[c] = df_all[c].apply(parse_list) # Step 1 hard filter if hard_filter is not None: try: before = len(df_all) mask = df_all.apply(lambda r: hard_filter(r.to_dict(), user_profile), axis=1) df_all = df_all[mask] after = len(df_all) print(f"[cold_start] Step1 hard filter applied: {before} -> {after}") except Exception as e: warnings.warn(f"[cold_start] hard_filter failed, skip. err={e}") n_chunks = (len(df_all) // batch_size) + 1 chunks = np.array_split(df_all, n_chunks) parents_pool = _parents_pool_from_df(df_all) rows = [] prev_inventory = None for i in tqdm(range(n_rounds), desc="Cold-start rounds"): chunk_id = (i // switch_interval) % n_chunks df_chunk = chunks[chunk_id].copy() # pantry sampling user_parents = sample_user_parents( parents_pool, user_profile=user_profile, prev_inventory=prev_inventory, round_idx=i ) prev_inventory = user_parents # Step 2: coarse recall coarse_list = coarse_rank_candidates( recipes=df_chunk.to_dict(orient="records"), user_parents=user_parents, user_profile=user_profile, top_n=min(topn_coarse, len(df_chunk)) ) if not coarse_list: continue coarse_df = pd.DataFrame(coarse_list) # Step 3: rule rerank → Top-5 candidates (just for selecting the 5) rule_df = rule_generate_candidates( coarse_df, user_parents=user_parents, user_profile=user_profile ) if rule_df.empty or len(rule_df) < topk_rule: continue top5 = rule_df.head(topk_rule).copy() # ===== Deterministic scoring with feasibility + region + soft constraints ===== user_set = set(user_parents) scored_candidates = [] # Nutrition goals (from profile) ng = user_profile.get("nutritional_goals", {}) cal_min = ng.get("calories", {}).get("min", 0) cal_max = ng.get("calories", {}).get("max", 1e9) pro_min = ng.get("protein", {}).get("min", 0) pro_max = ng.get("protein", {}).get("max", 1e9) # Preferences liked = set(user_profile.get("other_preferences", {}).get("preferred_main", [])) disliked = set(user_profile.get("other_preferences", {}).get("disliked_main", [])) max_cooking_time = user_profile.get("other_preferences", {}).get("cooking_time_max", None) for idx, row in top5.iterrows(): main_set = set(row.get("main_parent", set())) staple_set = set(row.get("staple_parent", set())) other_set = set(row.get("other_parent", set())) main_total = len(main_set) staple_total = len(staple_set) main_match = len(main_set & user_set) staple_match = len(staple_set & user_set) # === 1) Feasibility check === total_needed = max(1, main_total + staple_total) total_have = main_match + staple_match coverage_ratio = total_have / total_needed if coverage_ratio < 0.5: continue # === 2) Region preference === region_score = 1.0 if row.get("region_match", 0) else 0.0 # === 3) Cooking time soft constraint === time_val = row.get("minutes", None) time_score = 0.0 if max_cooking_time and time_val is not None: try: t_val = float(time_val) t_max = float(max_cooking_time) lower_bound = 0.8 * t_max upper_bound = 1.2 * t_max if lower_bound <= t_val <= upper_bound: time_score = 1.0 else: deviation = abs(t_val - t_max) / t_max time_score = max(0.0, 1.0 - deviation) except (TypeError, ValueError): time_score = 0.0 else: time_score = 1.0 # === 4) Calories soft constraint === cal_val = row.get("calories", None) cal_score = 1.0 if cal_val is not None and cal_min < cal_max: try: c_val = float(cal_val) cal_center = 0.5 * (cal_min + cal_max) tol = 0.3 * cal_center lower_bound = cal_center - tol upper_bound = cal_center + tol if lower_bound <= c_val <= upper_bound: cal_score = 1.0 else: deviation = abs(c_val - cal_center) / cal_center cal_score = max(0.0, 1.0 - deviation) except (TypeError, ValueError): cal_score = 0.0 # === 4b) Protein soft constraint === protein_val = row.get("protein", None) protein_score = 1.0 if protein_val is not None and pro_min < pro_max: try: p_val = float(protein_val) pro_center = 0.5 * (pro_min + pro_max) tol = 0.2 * pro_center lower_bound = pro_center - tol upper_bound = pro_center + tol if lower_bound <= p_val <= upper_bound: protein_score = 1.0 else: deviation = abs(p_val - pro_center) / pro_center protein_score = max(0.0, 1.0 - deviation) except (TypeError, ValueError): protein_score = 0.0 # === 5) Liked / Disliked main === like_bonus = 1.0 if main_set & liked else 0.0 dislike_penalty = 1.0 if main_set & disliked else 0.0 # === 6) Final scoring === score = ( 0.5 * coverage_ratio + 0.15 * region_score + 0.1 * time_score + 0.1 * cal_score + 0.05 * protein_score + 0.05 * like_bonus - 0.05 * dislike_penalty ) scored_candidates.append((idx, score)) # Sort and pick top3 for relevance scored_candidates.sort(key=lambda x: x[1], reverse=True) picked_idxs = [idx for idx, _ in scored_candidates[:3]] # relevance labels 3 / 2 / 1 labels = {idx: 0 for idx in top5.index} if len(picked_idxs) > 0: labels[picked_idxs[0]] = 3 if len(picked_idxs) > 1: labels[picked_idxs[1]] = 2 if len(picked_idxs) > 2: labels[picked_idxs[2]] = 1 # build features for all 5 candidates for idx, row in top5.iterrows(): up = set(user_parents) main_set = set(row.get("main_parent", set())) staple_set = set(row.get("staple_parent", set())) other_set = set(row.get("other_parent", set())) recipe_dict = { "main": main_set, "staple": staple_set, "other": other_set, "seasoning": set(row.get("seasoning_parent", set())), "matched_main": len(main_set & up), "matched_staple": len(staple_set & up), "matched_other": len(other_set & up), "calories": row.get("calories", 0), "protein": row.get("protein", 0), "fat": row.get("fat", 0), "region": row.get("region", ""), "cuisine_attr": row.get("cuisine_attr", []), "ingredients": row.get("ingredients", []), "minutes": row.get("minutes", None), } feats = build_features(recipe_dict, user_profile) feats["relevance"] = float(labels[idx]) feats["qid"] = int(i) rows.append(feats) out = pd.DataFrame(rows) if "qid" not in out.columns or out.empty: print(f"[cold_start] No valid training data generated for {user_id}, skipping save.") return None valid_qids = out.groupby("qid").size() keep_qids = valid_qids[valid_qids > 1].index out = out[out["qid"].isin(keep_qids)].reset_index(drop=True) os.makedirs(base_dir, exist_ok=True) out_path = os.path.join(base_dir, "user_features_rank.csv") out.to_csv(out_path, index=False) print(f"[cold_start] Saved {len(out)} rows to {out_path}") return out_path if __name__ == "__main__": cold_start_ranker( user_id="user_1", n_rounds=10000, topn_coarse=20000, topk_rule=5, coverage_penalty=0.15, temperature=0.5 )