diff --git "a/app.py" "b/app.py" new file mode 100644--- /dev/null +++ "b/app.py" @@ -0,0 +1,3726 @@ +""" +app.py — Wardrobe Assistant API (v2) + +Changes from v1: + - _item_store (in-memory list) replaced by SQLite via db.py + - All outfit scoring routed through scoring.py (strategic v2 model) + - _gap_suggestions now analyses the actual wardrobe instead of hardcoding items + - /feedback endpoint added for preference data collection + - Score responses include human-readable reason + tip from scoring.py +""" + +from __future__ import annotations + +import base64 +import io +import json +import os +import re +import tempfile +import threading +import traceback +import time +import uuid +import html as html_lib +from dataclasses import replace +from concurrent.futures import ThreadPoolExecutor, TimeoutError as FuturesTimeout +from contextlib import asynccontextmanager +from datetime import datetime, timezone +from pathlib import Path +from typing import Any, NoReturn +from urllib.error import HTTPError +from urllib.parse import parse_qs, quote_plus, unquote, urlencode, urljoin, urlparse +from urllib.request import Request, urlopen + +import requests +from fastapi import Body, FastAPI, File, HTTPException, Query, Response, UploadFile +from fastapi.middleware.cors import CORSMiddleware +from PIL import Image, ImageDraw, ImageFont, ImageOps + +from db import ( + init_db, + item_insert, + item_get_all, + item_get, + item_update, + item_delete, + feedback_record, + cache_get, + cache_set, + cache_purge_expired, +) + +from scoring import ( + score_pair_full, + extract_base_color, + extract_style, + extract_fit, + extract_pattern, + extract_season, + _NEUTRALS, +) +from fashion_ai import get_recommendation_service +from scraper import Recommendation as ScraperRecommendation +from scraper import ( + build_search_urls_from_query as build_nike_search_urls_from_query, + build_search_urls_from_recommendation as build_nike_search_urls_from_recommendation, + 
extract_product_summaries as extract_nike_product_summaries, +) +from zalando_scraper import ( + build_zalando_search_url, + build_zalando_search_urls_from_request, + extract_product_summaries as extract_zalando_product_summaries, +) + + +def _now_iso() -> str: + return datetime.now(timezone.utc).isoformat() + + +def _norm(value: Any) -> str: + return str(value or "").strip().lower() + + +MATCHING_RESULT_CACHE: dict[str, dict[str, Any]] = {} +MATCHING_RESULT_CACHE_LOCK = threading.Lock() +MATCHING_RESULT_CACHE_MAX = int(os.getenv("MATCHING_RESULT_CACHE_MAX", "500")) +MATCHING_RESULT_CACHE_TTL_SECONDS = int(os.getenv("MATCHING_RESULT_CACHE_TTL_SECONDS", "86400")) + + +def _matching_cache_storage_key(key: str) -> str: + return f"matching:{key}" + + +def _normalize_cache_category(value: Any) -> str: + category = _norm(value) + if category in {"topwear", "bottomwear", "others"}: + return category + return "" + + +def _extract_cache_user_id(payload: dict[str, Any], wardrobe_items: list[dict[str, Any]]) -> str: + payload_user = str(payload.get("user_id") or "").strip() + if payload_user: + return payload_user + + for item in wardrobe_items: + candidate = str(item.get("user_id") or "").strip() + if candidate: + return candidate + + return "anonymous" + + +def _build_matching_cache_key( + user_id: str, + category: str, + occasion: str, + wardrobe_hash: str, + lock_signature: str = "", +) -> str: + return "|".join([ + user_id.strip() or "anonymous", + _normalize_cache_category(category), + _norm(occasion) or "casual", + wardrobe_hash.strip(), + lock_signature.strip(), + ]) + + +def _matching_cache_get(key: str) -> dict[str, Any] | None: + with MATCHING_RESULT_CACHE_LOCK: + cached = MATCHING_RESULT_CACHE.get(key) + if isinstance(cached, dict): + return json.loads(json.dumps(cached)) + + try: + persisted = cache_get(_matching_cache_storage_key(key)) + except Exception as exc: + print(f"[matching-cache] db read failed key={key} reason={exc!r}") + return None + + if not 
isinstance(persisted, dict): + return None + + snapshot = json.loads(json.dumps(persisted)) + with MATCHING_RESULT_CACHE_LOCK: + MATCHING_RESULT_CACHE[key] = snapshot + while len(MATCHING_RESULT_CACHE) > MATCHING_RESULT_CACHE_MAX: + oldest_key = next(iter(MATCHING_RESULT_CACHE)) + MATCHING_RESULT_CACHE.pop(oldest_key, None) + + return json.loads(json.dumps(snapshot)) + + +def _matching_cache_set(key: str, payload: dict[str, Any]) -> None: + snapshot = json.loads(json.dumps(payload)) + + with MATCHING_RESULT_CACHE_LOCK: + MATCHING_RESULT_CACHE[key] = snapshot + while len(MATCHING_RESULT_CACHE) > MATCHING_RESULT_CACHE_MAX: + oldest_key = next(iter(MATCHING_RESULT_CACHE)) + MATCHING_RESULT_CACHE.pop(oldest_key, None) + + try: + cache_set( + _matching_cache_storage_key(key), + snapshot, + ttl_seconds=MATCHING_RESULT_CACHE_TTL_SECONDS, + ) + except Exception as exc: + print(f"[matching-cache] db write failed key={key} reason={exc!r}") + + +def _build_lock_signature_from_payload(payload: dict[str, Any]) -> str: + explicit = str(payload.get("lock_signature") or "").strip() + if explicit: + return explicit + + top_selected_raw = payload.get("top_selected") + bottom_selected_raw = payload.get("bottom_selected") + other_selected_raw = payload.get("other_selected") + + top_id = str(top_selected_raw.get("id") or "").strip() if isinstance(top_selected_raw, dict) else "" + bottom_id = str(bottom_selected_raw.get("id") or "").strip() if isinstance(bottom_selected_raw, dict) else "" + other_id = str(other_selected_raw.get("id") or "").strip() if isinstance(other_selected_raw, dict) else "" + + if not top_id and not bottom_id and not other_id: + return "" + + return f"top:{top_id or '-'}|bottom:{bottom_id or '-'}|other:{other_id or '-'}" + + +def _infer_type(category: str) -> str: + n = _norm(category) + if any(keyword in n for keyword in ["shirt", "tee", "top", "kurta", "blouse", "hoodie", "sweater", "blazer", "jacket", "polo"]): + return "topwear" + if any(keyword in n for 
keyword in ["jean", "pant", "trouser", "short", "skirt", "jogger", "palazzo", "chino"]): + return "bottomwear" + return "others" + + +def _normalize_wardrobe_item(item: dict[str, Any]) -> dict[str, Any]: + description = item.get("description") if isinstance(item.get("description"), dict) else {} + category = str(item.get("category") or description.get("category") or description.get("type") or "Unknown") + item_type = str(item.get("type") or description.get("type") or _infer_type(category)) + return { + "id": item.get("id") or str(uuid.uuid4()), + "image_url": item.get("image_url") or "", + "type": item_type, + "category": category, + "color": str(item.get("color") or description.get("color") or "Unknown"), + "pattern": str(item.get("pattern") or description.get("pattern") or "Solid"), + "fabric": str(item.get("fabric") or description.get("fabric") or "Unknown"), + "fit": str(item.get("fit") or description.get("fit") or "Unknown"), + "season": str(item.get("season") or description.get("season") or "All-Season"), + "style": str(item.get("style") or description.get("occasion") or description.get("style") or "casual"), + "occasion": str(item.get("occasion") or description.get("occasion") or "casual"), + "description": description, + } + + +def _build_outfit_payload(scored: dict[str, Any], top: dict[str, Any] | None, bottom: dict[str, Any] | None, rank: int, other: dict[str, Any] | None) -> dict[str, Any]: + payload = { + **scored, + "rank": rank, + "top": top, + "bottom": bottom, + } + if other is not None: + payload["other"] = other + return payload + + +def _gap_suggestions(wardrobe: list[dict[str, Any]], occasion: str) -> list[dict[str, Any]]: + tops = sum(1 for item in wardrobe if _norm(item.get("type")) == "topwear") + bottoms = sum(1 for item in wardrobe if _norm(item.get("type")) == "bottomwear") + suggestions: list[dict[str, Any]] = [] + if tops == 0: + suggestions.append({"focus": "topwear", "suggestion": "Add a versatile topwear staple", "reason": "No topwear 
def _save_scraper_json_payload(prefix: str, payload: dict[str, Any]) -> str:
    """Write ``payload`` as pretty-printed JSON under ``SCRAPER_OUTPUT_DIR``.

    The file is named ``{prefix}_{UTC timestamp}.json``. The timestamp only has
    one-second resolution, so when that name is already taken a numeric suffix
    (``_1``, ``_2``, ...) is appended instead of overwriting the earlier file.

    Args:
        prefix: Leading part of the file name.
        payload: JSON-serialisable data to persist.

    Returns:
        The path of the file that was written, as a string.

    Raises:
        TypeError / ValueError: If ``payload`` cannot be serialised to JSON
            (raised before any file is created).
        OSError: If the directory or file cannot be created.
    """
    SCRAPER_OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
    timestamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
    stem = f"{prefix}_{timestamp}"

    # Serialise up front so a bad payload never leaves a truncated file behind.
    serialized = json.dumps(payload, ensure_ascii=True, indent=2)

    attempt = 0
    while True:
        suffix = f"_{attempt}" if attempt else ""
        file_path = SCRAPER_OUTPUT_DIR / f"{stem}{suffix}.json"
        try:
            # "x" = exclusive creation: fails instead of clobbering an existing file.
            with file_path.open("x", encoding="utf-8") as handle:
                handle.write(serialized)
        except FileExistsError:
            attempt += 1
            continue
        return str(file_path)
prompt: str, max_tokens: int) -> str: + api_key = _nvidia_api_key() + if not api_key: + raise RuntimeError(NVIDIA_API_KEY_MISSING_DETAIL) + + last_error: Exception | None = None + for model_id in _candidate_text_model_ids(primary_model_id): + current_max_tokens = max_tokens + while current_max_tokens <= NVIDIA_REASONING_MAX_TOKENS: + payload = { + "model": model_id, + "messages": [ + { + "role": "user", + "content": prompt, + } + ], + "max_tokens": current_max_tokens, + "temperature": NVIDIA_TEMPERATURE, + "top_p": NVIDIA_TOP_P, + "stream": True, + "include_reasoning": False, + "chat_template_kwargs": {"enable_thinking": NVIDIA_ENABLE_THINKING}, + } + headers = { + "Authorization": f"Bearer {api_key}", + "Accept": "text/event-stream", + "Content-Type": "application/json", + } + + last_error = None + for attempt in range(NVIDIA_MAX_RETRIES + 1): + try: + response = requests.post( + NVIDIA_INVOKE_URL, + headers=headers, + json=payload, + timeout=NVIDIA_TIMEOUT_SECONDS, + ) + if response.status_code in {429, 500, 502, 503, 504}: + raise NvidiaGatewayError( + f"NVIDIA API transient failure {response.status_code}: {response.text[:500]}", + status_code=503 if response.status_code == 429 else 502, + ) + if response.status_code >= 400: + raise NvidiaGatewayError( + f"NVIDIA API request failed with {response.status_code}: {response.text[:500]}", + status_code=502, + ) + try: + return _extract_streamed_nvidia_text(response) + except NvidiaPayloadError as stream_exc: + # Some providers occasionally terminate SSE without final text; retry once using non-stream mode. 
+ if "stream ended without returning any content" not in str(stream_exc).lower(): + raise + + non_stream_payload = {**payload, "stream": False} + non_stream_headers = { + "Authorization": f"Bearer {api_key}", + "Accept": "application/json", + "Content-Type": "application/json", + } + non_stream_response = requests.post( + NVIDIA_INVOKE_URL, + headers=non_stream_headers, + json=non_stream_payload, + timeout=NVIDIA_TIMEOUT_SECONDS, + ) + if non_stream_response.status_code in {429, 500, 502, 503, 504}: + raise NvidiaGatewayError( + f"NVIDIA API transient failure {non_stream_response.status_code}: {non_stream_response.text[:500]}", + status_code=503 if non_stream_response.status_code == 429 else 502, + ) + if non_stream_response.status_code >= 400: + raise NvidiaGatewayError( + f"NVIDIA API request failed with {non_stream_response.status_code}: {non_stream_response.text[:500]}", + status_code=502, + ) + try: + parsed_payload = non_stream_response.json() + except ValueError as exc: + raise NvidiaPayloadError( + "NVIDIA non-stream planner response was not valid JSON." 
+ ) from exc + return _extract_nvidia_text(parsed_payload) + except NvidiaTokenLimitError as exc: + last_error = exc + break + except (requests.RequestException, NvidiaGatewayError) as exc: + last_error = exc + if _is_degraded_function_error(exc): + break + if attempt >= NVIDIA_MAX_RETRIES: + break + time.sleep(NVIDIA_RETRY_BACKOFF_SECONDS * (attempt + 1)) + except NvidiaPayloadError: + raise + + if isinstance(last_error, NvidiaTokenLimitError) and current_max_tokens < NVIDIA_REASONING_MAX_TOKENS: + current_max_tokens = min(current_max_tokens * 2, NVIDIA_REASONING_MAX_TOKENS) + continue + if _is_degraded_function_error(last_error or Exception()): + print(f"[nvidia] model degraded, trying fallback model: {model_id}") + break + if isinstance(last_error, NvidiaGatewayError): + raise last_error + if isinstance(last_error, requests.RequestException): + raise NvidiaGatewayError(f"NVIDIA API request failed: {last_error}", status_code=502) from last_error + if last_error is not None: + raise last_error + break + + if isinstance(last_error, Exception): + raise NvidiaGatewayError( + f"NVIDIA API request failed on all configured models ({', '.join(_candidate_text_model_ids(primary_model_id))}): {last_error}", + status_code=502, + ) from last_error + raise NvidiaGatewayError( + "NVIDIA API request failed after exhausting reasoning token budget.", + status_code=502, + ) + + +def run_kimi_text_inference(prompt: str, max_tokens: int = KIMI_MAX_TOKENS) -> str: + return _run_text_inference_with_model(KIMI_MODEL_ID, prompt, max_tokens) + + +def _normalize_store_name(value: str | None) -> str: + normalized = _norm(value) + if normalized in {"zalando", "nike"}: + return normalized + return "nike" + + +def _build_store_query_from_recommendation(recommendation: ScraperRecommendation, occasion: str = "") -> str: + parts = [ + str(recommendation.gender or "").strip(), + str(recommendation.color or "").strip(), + str(recommendation.category or "").strip(), + str(occasion or "").strip(), + 
def _build_store_search_urls_from_recommendation(
    recommendation: ScraperRecommendation,
    store: str,
    occasion: str = "",
) -> list[str]:
    """Build the search URL list for ``recommendation`` on the given store.

    The store name is normalised first. For Zalando a single URL is built from
    the query composed by ``_build_store_query_from_recommendation``; every
    other (normalised) store is delegated to the Nike URL builder.
    """
    store_key = _normalize_store_name(store)
    if store_key != "zalando":
        return build_nike_search_urls_from_recommendation(recommendation, store=store_key)

    zalando_query = _build_store_query_from_recommendation(recommendation, occasion=occasion)
    zalando_url = build_zalando_search_url(query=zalando_query, gender=recommendation.gender)
    return [zalando_url]
def _wardrobe_metadata_snapshot(limit: int = 30) -> dict[str, Any]:
    """Summarise the stored wardrobe for use in planner prompts.

    Every item returned by ``item_get_all()`` is normalised, then:

    * ``total_items`` is the number of normalised items,
    * ``items`` holds the first ``limit`` items reduced to a fixed field set,
    * ``counts`` maps ``"<type>|<occasion>"`` (both normalised, ``"unknown"``
      when missing) to the number of items in that combination.
    """
    wardrobe = [_normalize_wardrobe_item(raw) for raw in item_get_all()]

    counts: dict[str, int] = {}
    for entry in wardrobe:
        slot = _norm(entry.get("type") or "unknown")
        occasion = _norm(entry.get("occasion") or "unknown")
        bucket = f"{slot}|{occasion}"
        counts[bucket] = counts.get(bucket, 0) + 1

    exported_fields = ("id", "type", "category", "color", "pattern", "fit", "season", "style", "occasion")
    exported_items = [
        {field: entry.get(field) for field in exported_fields}
        for entry in wardrobe[:limit]
    ]

    return {
        "total_items": len(wardrobe),
        "items": exported_items,
        "counts": counts,
    }
\"{preferences}\"\n" + f"filters: {json.dumps(filters, ensure_ascii=True)}\n" + f"max_products: {max_products if isinstance(max_products, int) and max_products > 0 else 'uncapped'}\n" + f"planning_context: {json.dumps(planning_context, ensure_ascii=True)}\n" + f"wardrobe_snapshot: {json.dumps(wardrobe_snapshot, ensure_ascii=True)}\n\n" + "---\n\n" + "EXECUTION RULES (Hard Constraints):\n\n" + "1. SLOT LOCK: target_category is immutable. If \"topwear\", NEVER output bottomwear categories (pants, shorts, joggers) and vice versa.\n\n" + "2. OCCASION VETTING: For formal | interview | work | wedding | client_meeting:\n" + " BLOCK: hoodie, sweatshirt, joggers, shorts, tank, tights, leggings, crop-top, sports-bra-as-outerwear\n" + " ALLOW: shirt, polo, blazer_outerwear, sweater, sweatshirt_structured (only if minimal branding)\n\n" + "3. PRIORITY HIERARCHY:\n" + " First: allowed_categories from planning_context\n" + " Second: color_shortlist from planning_context\n" + " Third: reference_item_ids from planning_context (anchor reasoning here)\n\n" + "4. COLOR LOGIC: Select ONE color from color_shortlist that:\n" + " - Complements majority of wardrobe_snapshot bottoms\n" + " - Suits occasion formality\n" + " - Has highest non-conflict score with existing wardrobe\n\n" + "5. 
def _recover_scraper_plan_from_text(
    model_text: str,
    planning_context: dict[str, Any],
    occasion: str,
    gender: str,
) -> dict[str, Any]:
    """Salvage a shopping plan from a planner reply that is not clean JSON.

    If ``parse_json_from_text`` yields a non-empty dict, that dict is returned
    unchanged. Otherwise the reply is scanned for the first allowed category
    and shortlisted color it mentions (falling back to the first entry of each
    list), for a gender word, and for a ``"query": "..."`` fragment.

    Args:
        model_text: Raw text returned by the planner model.
        planning_context: Planner context; ``allowed_categories``,
            ``color_shortlist``, ``style_direction``, ``reference_item_ids``
            and ``resolved_target_category`` are read from it.
        occasion: Occasion used when a query has to be composed.
        gender: Caller-supplied gender, used unless the reply names one.

    Returns:
        The recovered plan, or ``{}`` when the text is empty or no
        category/color/query could be determined.
    """
    text = str(model_text or "").strip()
    if not text:
        return {}

    parsed = parse_json_from_text(text)
    if isinstance(parsed, dict) and parsed:
        return parsed

    # `or []` tolerates a context that carries an explicit None for these keys.
    allowed = [
        str(value)
        for value in (planning_context.get("allowed_categories") or [])
        if str(value).strip()
    ]
    colors = [
        str(value)
        for value in (planning_context.get("color_shortlist") or [])
        if str(value).strip()
    ]

    lowered = text.lower()

    category = next((candidate for candidate in allowed if candidate.lower() in lowered), "")
    if not category and allowed:
        category = allowed[0]

    color = next((candidate for candidate in colors if candidate.lower() in lowered), "")
    if not color and colors:
        color = colors[0]

    # Gender words must match as whole words: a bare substring test for "men"
    # also fires on "recommend", "garment", "complement", ... and would override
    # the caller's gender for almost any prose reply.
    inferred_gender = _normalize_scraper_gender(gender)
    if re.search(r"\bwomen(?:'?s)?(?:wear)?\b", lowered):
        inferred_gender = "women"
    elif re.search(r"\bmen(?:'?s)?(?:wear)?\b", lowered):
        inferred_gender = "men"
    elif re.search(r"\bunisex\b", lowered):
        inferred_gender = "unisex"

    query = ""
    query_match = re.search(r'"query"\s*:\s*"([^"]+)"', text, flags=re.IGNORECASE)
    if query_match:
        query = query_match.group(1).strip()
    if not query and category and color:
        query = _build_planned_query(
            inferred_gender,
            color,
            category,
            occasion,
            str(planning_context.get("style_direction") or "occasion-aligned"),
        )

    if not category or not color or not query:
        return {}

    return {
        "target_category": planning_context.get("resolved_target_category", "topwear"),
        "color": color,
        "category": category,
        "gender": inferred_gender,
        "style_direction": planning_context.get("style_direction", "occasion-aligned"),
        "reference_item_ids": planning_context.get("reference_item_ids", []),
        "query": query,
        "reason": "Recovered Kimi planner output from semi-structured response.",
        "source": "kimi",
    }
}, +} + +_PROMPT_TO_OCCASION_HINTS: dict[str, set[str]] = { + "formal": {"formal", "interview", "office", "work", "business", "meeting", "wedding"}, + "party": {"party", "festive", "diwali", "celebration", "date", "ethnic"}, + "sports": {"sports", "gym", "workout", "training", "running", "run", "active"}, + "casual": {"casual", "daily", "everyday", "weekend", "outing"}, +} + +_PROMPT_COLOR_TERMS = [ + "black", "white", "navy", "blue", "grey", "gray", "beige", "olive", "green", "brown", + "khaki", "cream", "maroon", "charcoal", "tan", +] + + +def _infer_structured_request_from_prompt(user_prompt: str) -> dict[str, Any]: + normalized = _norm(user_prompt) + if not normalized: + return { + "target_category": "both", + "occasion": "", + "gender": "", + "preferred_colors": [], + "include_keywords": [], + "exclude_keywords": [], + } + + target_category = "both" + top_hits = sum(1 for token in _PROMPT_TO_TARGET_HINTS["topwear"] if token in normalized) + bottom_hits = sum(1 for token in _PROMPT_TO_TARGET_HINTS["bottomwear"] if token in normalized) + if top_hits > bottom_hits and top_hits > 0: + target_category = "topwear" + elif bottom_hits > top_hits and bottom_hits > 0: + target_category = "bottomwear" + + occasion = "" + for bucket, tokens in _PROMPT_TO_OCCASION_HINTS.items(): + if any(token in normalized for token in tokens): + occasion = bucket + break + + gender = "" + if any(token in normalized for token in {" men", "male", "man", " mens"}): + gender = "men" + elif any(token in normalized for token in {" women", "female", "woman", " womens"}): + gender = "women" + elif "unisex" in normalized: + gender = "unisex" + + preferred_colors: list[str] = [] + for color in _PROMPT_COLOR_TERMS: + if color in normalized and color not in preferred_colors: + preferred_colors.append(color) + + include_keywords: list[str] = [] + for keyword in ["formal", "structured", "minimal", "smart", "elegant", "tailored"]: + if keyword in normalized and keyword not in include_keywords: + 
include_keywords.append(keyword) + + exclude_keywords: list[str] = [] + for keyword in ["hoodie", "oversized", "ripped", "distressed", "athleisure"]: + if f"avoid {keyword}" in normalized or f"no {keyword}" in normalized or f"without {keyword}" in normalized: + exclude_keywords.append(keyword) + + return { + "target_category": target_category, + "occasion": occasion, + "gender": gender, + "preferred_colors": preferred_colors, + "include_keywords": include_keywords, + "exclude_keywords": exclude_keywords, + } + + +def _occasion_bucket(value: str) -> str: + normalized = _norm(value) + if any(token in normalized for token in {"formal", "interview", "office", "work", "business", "wedding", "meeting"}): + return "formal" + if any(token in normalized for token in {"party", "festive", "diwali", "celebration", "ethnic", "date"}): + return "party" + if any(token in normalized for token in {"sports", "gym", "active", "training", "run", "running"}): + return "sports" + return "casual" + + +def _top_terms(values: list[str], limit: int = 6) -> list[str]: + counts: dict[str, int] = {} + for value in values: + key = _norm(value) + if not key or key == "unknown": + continue + counts[key] = counts.get(key, 0) + 1 + return [key for key, _ in sorted(counts.items(), key=lambda pair: pair[1], reverse=True)[:limit]] + + +def _rank_color_resonance( + slot_colors: dict[str, list[str]], + reference_slot: str, + preferred_colors: list[str], + occasion_bucket: str, +) -> list[dict[str, Any]]: + reference_counts: dict[str, int] = {} + global_counts: dict[str, int] = {} + + for raw_color in slot_colors.get(reference_slot, []): + normalized = extract_base_color(raw_color or "") + if not normalized or normalized == "unknown": + continue + reference_counts[normalized] = reference_counts.get(normalized, 0) + 1 + + for raw_color in slot_colors.get("topwear", []) + slot_colors.get("bottomwear", []): + normalized = extract_base_color(raw_color or "") + if not normalized or normalized == "unknown": + 
continue + global_counts[normalized] = global_counts.get(normalized, 0) + 1 + + preferred_set = { + extract_base_color(value or "") + for value in preferred_colors + if extract_base_color(value or "") + } + + neutral_colors = ["navy", "black", "white", "grey", "beige"] + candidate_pool: list[str] = [] + for candidate in [ + *preferred_colors, + *[key for key, _ in sorted(reference_counts.items(), key=lambda pair: pair[1], reverse=True)], + *[key for key, _ in sorted(global_counts.items(), key=lambda pair: pair[1], reverse=True)], + *neutral_colors, + ]: + normalized = extract_base_color(candidate or "") + if not normalized or normalized == "unknown" or normalized in candidate_pool: + continue + candidate_pool.append(normalized) + + formal_boost_colors = {"navy", "black", "white", "grey", "charcoal", "beige"} + sports_boost_colors = {"black", "white", "grey", "navy", "blue", "red", "green"} + + ranked: list[dict[str, Any]] = [] + for color in candidate_pool: + reference_count = reference_counts.get(color, 0) + global_count = global_counts.get(color, 0) + preferred_bonus = 2 if color in preferred_set else 0 + occasion_bonus = 0 + if occasion_bucket == "formal" and color in formal_boost_colors: + occasion_bonus = 1 + elif occasion_bucket == "sports" and color in sports_boost_colors: + occasion_bonus = 1 + + score = (reference_count * 3) + global_count + preferred_bonus + occasion_bonus + ranked.append( + { + "color": color, + "score": score, + "reference_count": reference_count, + "global_count": global_count, + "preferred": color in preferred_set, + } + ) + + ranked.sort( + key=lambda item: ( + int(item.get("score") or 0), + int(item.get("reference_count") or 0), + int(item.get("global_count") or 0), + ), + reverse=True, + ) + return ranked + + +SCRAPER_CATEGORY_POLICY: dict[str, dict[str, list[str]]] = { + "topwear": { + "formal": ["shirt", "polo", "jacket"], + "party": ["shirt", "jacket", "polo"], + "sports": ["jersey", "t-shirt", "hoodie"], + "casual": ["shirt", 
"t-shirt", "polo", "jacket", "hoodie"], + }, + "bottomwear": { + "formal": ["trousers", "pants"], + "party": ["trousers", "pants", "jeans"], + "sports": ["joggers", "shorts", "tights", "leggings"], + "casual": ["jeans", "pants", "shorts", "joggers", "trousers"], + }, +} + +SCRAPER_FORMAL_DISALLOWED = { + "hoodie", "sweatshirt", "joggers", "shorts", "tank top", "tights", "leggings", +} + +SCRAPER_RELEVANCE_EXCLUDE_TOKENS = { + "sock", "socks", "trunk", "trunks", "boxer", "brief", "briefs", "underwear", + "bra", "bralette", "panty", "panties", "bikini", "swim", "swimsuit", + "belt", "cap", "hat", "beanie", "wallet", "bag", "backpack", "watch", + "shoe", "sneaker", "boot", "sandals", "slippers", +} + +SCRAPER_CATEGORY_KEYWORDS: dict[str, set[str]] = { + "shirt": {"shirt", "formal shirt", "oxford", "button-down", "button up"}, + "polo": {"polo"}, + "jacket": {"jacket", "blazer", "suit jacket", "sport coat", "coat"}, + "t-shirt": {"t-shirt", "tee", "crew neck"}, + "hoodie": {"hoodie"}, + "trousers": {"trouser", "trousers", "tailored"}, + "pants": {"pant", "pants", "chino"}, + "jeans": {"jeans", "denim"}, + "shorts": {"shorts"}, + "joggers": {"jogger", "joggers"}, +} + + +def _allowed_categories(target_category: str, occasion_bucket: str) -> list[str]: + target = target_category if target_category in {"topwear", "bottomwear"} else "topwear" + categories = list(SCRAPER_CATEGORY_POLICY.get(target, {}).get(occasion_bucket, [])) + if not categories: + categories = list(SCRAPER_CATEGORY_POLICY[target]["casual"]) + if occasion_bucket == "formal": + categories = [category for category in categories if category not in SCRAPER_FORMAL_DISALLOWED] + return categories + + +def _resolve_target_category(requested_target: str, wardrobe_snapshot: dict[str, Any]) -> str: + if requested_target in {"topwear", "bottomwear"}: + return requested_target + + counts = wardrobe_snapshot.get("counts") if isinstance(wardrobe_snapshot.get("counts"), dict) else {} + top_count = sum(value for key, 
def _is_relevant_scraped_product(
    product: dict[str, Any],
    target_slot: str,
    planned_category: str,
    occasion_bucket: str,
) -> bool:
    """Decide whether a scraped product fits the planned slot and category.

    The product's name and link are checked (as normalised text) against, in
    order: the accessory/underwear exclusion list, the keywords of the planned
    category, the generic terms of the target slot, and - for formal
    occasions - a list of casual/athletic terms.

    Exclusion tokens are matched against whole words, also accepting a plural
    ("sneakers", "watches") or "-wear" form ("swimwear"). Matching them as raw
    substrings would reject legitimate garments such as "baggy jeans" (bag),
    "bootcut jeans" (boot) or any text containing "brand" (bra).

    Args:
        product: Scraped product; ``name`` and ``item_link`` are inspected.
        target_slot: ``"topwear"`` or ``"bottomwear"``; other values skip the
            slot-term check.
        planned_category: Category chosen by the planner (e.g. ``"shirt"``).
        occasion_bucket: Occasion bucket; ``"formal"`` enables the extra veto.

    Returns:
        ``True`` when the product passes every filter, else ``False``.
    """
    text = _product_text_for_relevance(product)
    if not text:
        return False

    for word in re.findall(r"[a-z0-9]+", text):
        variants = {word}
        for suffix in ("s", "es", "wear"):
            if word.endswith(suffix):
                variants.add(word[: -len(suffix)])
        if not variants.isdisjoint(SCRAPER_RELEVANCE_EXCLUDE_TOKENS):
            return False

    planned = _norm(planned_category)
    # Unknown categories fall back to matching the category name itself.
    planned_keywords = SCRAPER_CATEGORY_KEYWORDS.get(planned, {planned} if planned else set())
    if planned_keywords and not any(keyword in text for keyword in planned_keywords):
        return False

    if target_slot == "topwear":
        topwear_terms = {"shirt", "polo", "blazer", "jacket", "coat", "t-shirt", "tee", "hoodie"}
        if not any(term in text for term in topwear_terms):
            return False

    if target_slot == "bottomwear":
        bottom_terms = {"trousers", "pants", "jeans", "joggers", "shorts"}
        if not any(term in text for term in bottom_terms):
            return False

    if occasion_bucket == "formal":
        formal_blocked = {"t-shirt", "tee", "hoodie", "jogger", "shorts", "sport"}
        if any(token in text for token in formal_blocked):
            return False

    return True
dict[str, Any], + wardrobe_snapshot: dict[str, Any], + target_category: str, + occasion: str, +) -> dict[str, Any]: + product_slot = target_category if target_category in {"topwear", "bottomwear"} else "topwear" + matching_slot = _complementary_slot(product_slot) + + wardrobe_items = [ + _normalize_wardrobe_item(item) + for item in (wardrobe_snapshot.get("items") or []) + if isinstance(item, dict) + ] + matching_items = [item for item in wardrobe_items if _norm(item.get("type")) == matching_slot] + + product_name = str(product.get("name") or "Suggested product").strip() + product_category = str(query_plan.get("category") or product_slot).strip() or product_slot + product_color = str(query_plan.get("color") or "unknown").strip() or "unknown" + product_style = str(query_plan.get("style_direction") or query_plan.get("occasion_bucket") or occasion or "casual").strip() or "casual" + + product_stub = { + "id": str(product.get("item_link") or product_name or "product"), + "type": product_slot, + "category": product_category, + "color": product_color, + "pattern": "solid", + "fabric": "unknown", + "fit": product_style, + "style": product_style, + "occasion": str(query_plan.get("occasion_bucket") or occasion or "casual").strip() or "casual", + "season": "all-season", + } + + scored_matches: list[dict[str, Any]] = [] + for candidate in matching_items: + if product_slot == "topwear": + scored = score_pair_full(product_stub, candidate, occasion) + else: + scored = score_pair_full(candidate, product_stub, occasion) + scored_matches.append( + { + "id": candidate.get("id"), + "type": candidate.get("type"), + "category": candidate.get("category"), + "color": candidate.get("color"), + "score": scored.get("score", 0), + "reason": scored.get("reason", ""), + } + ) + + scored_matches.sort(key=lambda entry: (int(entry.get("score") or 0), str(entry.get("color") or "")), reverse=True) + matched_garments = scored_matches[:3] + + if matched_garments: + matched_labels = 
def _enrich_scraper_products_with_matches(
    products: list[dict[str, Any]],
    query_plan: dict[str, Any],
    wardrobe_snapshot: dict[str, Any],
    target_category: str,
    occasion: str,
) -> list[dict[str, Any]]:
    """Attach wardrobe match context to every scraped product.

    Each dict in ``products`` is copied and merged with the result of
    ``_build_product_match_context`` for that product; on a key clash the
    match-context value wins. Entries that are not dicts are dropped. The
    input list and its dicts are not modified.
    """

    def _with_match_context(entry: dict[str, Any]) -> dict[str, Any]:
        match_context = _build_product_match_context(
            product=entry,
            query_plan=query_plan,
            wardrobe_snapshot=wardrobe_snapshot,
            target_category=target_category,
            occasion=occasion,
        )
        return {**entry, **match_context}

    return [_with_match_context(entry) for entry in products if isinstance(entry, dict)]
resolved_target == "topwear" else "topwear" + + slot_colors: dict[str, list[str]] = {"topwear": [], "bottomwear": [], "others": []} + slot_categories: dict[str, list[str]] = {"topwear": [], "bottomwear": [], "others": []} + for raw_item in items: + if not isinstance(raw_item, dict): + continue + slot = _norm(raw_item.get("type")) + slot_key = slot if slot in slot_colors else "others" + slot_colors[slot_key].append(str(raw_item.get("color") or "")) + slot_categories[slot_key].append(str(raw_item.get("category") or "")) + + preferred_colors = [str(value) for value in (filters.get("preferred_colors") or []) if str(value).strip()] + color_resonance_scores = _rank_color_resonance( + slot_colors=slot_colors, + reference_slot=reference_slot, + preferred_colors=preferred_colors, + occasion_bucket=occasion_bucket, + ) + color_shortlist = [ + str(entry.get("color") or "").strip() + for entry in color_resonance_scores + if str(entry.get("color") or "").strip() + ][:6] + + allowed = _allowed_categories(resolved_target, occasion_bucket) + style_hint = "formal-smart" if occasion_bucket == "formal" else "occasion-aligned" + if occasion_bucket == "party": + style_hint = "elevated-party" + if occasion_bucket == "sports": + style_hint = "performance-athletic" + + def reference_score(item: dict[str, Any]) -> int: + score = 0 + slot = _norm(item.get("type")) + if slot == reference_slot: + score += 4 + category = _norm(item.get("category")) + if category and category != "unknown": + score += 2 + color = extract_base_color(item.get("color") or "") + if color and color != "unknown": + score += 2 + item_style = _norm(item.get("style") or item.get("occasion") or "") + if occasion_bucket == "formal" and any(token in item_style for token in {"formal", "work", "office", "business"}): + score += 3 + if occasion_bucket == "party" and any(token in item_style for token in {"party", "festive", "ethnic"}): + score += 3 + if occasion_bucket == "casual" and "casual" in item_style: + score += 2 + 
return score + + ranked_reference_items = [item for item in items if isinstance(item, dict)] + ranked_reference_items.sort(key=reference_score, reverse=True) + reference_item_ids = [str(item.get("id") or "") for item in ranked_reference_items if str(item.get("id") or "").strip()][:4] + + return { + "requested_target_category": requested_target_category, + "resolved_target_category": resolved_target, + "occasion_bucket": occasion_bucket, + "gender_preference": _normalize_scraper_gender(gender), + "allowed_categories": allowed, + "color_shortlist": color_shortlist[:6], + "color_resonance_scores": color_resonance_scores[:8], + "style_direction": style_hint, + "reference_slot": reference_slot, + "reference_item_ids": reference_item_ids, + "reference_items": [ + { + "id": item.get("id"), + "type": item.get("type"), + "category": item.get("category"), + "color": item.get("color"), + "style": item.get("style"), + "occasion": item.get("occasion"), + } + for item in ranked_reference_items[:6] + ], + "slot_dominant_categories": { + "topwear": _top_terms(slot_categories.get("topwear", []), limit=4), + "bottomwear": _top_terms(slot_categories.get("bottomwear", []), limit=4), + }, + "slot_dominant_colors": { + "topwear": _top_terms(slot_colors.get("topwear", []), limit=4), + "bottomwear": _top_terms(slot_colors.get("bottomwear", []), limit=4), + }, + } + + +def _normalize_planned_category(raw_value: Any, allowed: list[str]) -> str: + normalized = _norm(raw_value) + if normalized: + for category in allowed: + if normalized == category or normalized in category or category in normalized: + return category + return allowed[0] if allowed else "shirt" + + +def _extract_explicit_category_from_prompt(user_prompt: str, allowed: list[str]) -> str | None: + normalized_prompt = _norm(user_prompt) + if not normalized_prompt: + return None + + # Map common synonyms to policy categories. 
+ synonym_map: dict[str, str] = { + "blazer": "jacket", + "sport coat": "jacket", + "suit jacket": "jacket", + "tee": "t-shirt", + "tshirt": "t-shirt", + "trouser": "trousers", + "pant": "pants", + } + + allowed_set = {str(value).strip().lower() for value in allowed if str(value).strip()} + + # Direct allowed-category mention takes highest priority. + for category in allowed: + normalized_category = str(category).strip().lower() + if normalized_category and normalized_category in normalized_prompt: + return category + + # Then check synonym map against allowed categories. + for phrase, mapped_category in synonym_map.items(): + if phrase in normalized_prompt and mapped_category in allowed_set: + for category in allowed: + if str(category).strip().lower() == mapped_category: + return category + + return None + + +def _normalize_planned_color(raw_value: Any, color_shortlist: list[str]) -> str: + normalized = extract_base_color(raw_value or "") + if normalized and normalized in color_shortlist: + return normalized + if normalized: + for candidate in color_shortlist: + if candidate in normalized or normalized in candidate: + return candidate + return "" + + +def _resolve_color_fallback(color_shortlist: list[str], color_resonance_scores: list[dict[str, Any]]) -> str: + for entry in color_resonance_scores: + color = str(entry.get("color") or "").strip() + if color: + return color + if color_shortlist: + return color_shortlist[0] + return "black" + + +def _normalize_reference_ids( + raw_ids: Any, + valid_ids: list[str], + fallback_ids: list[str], + limit: int = 4, +) -> list[str]: + valid_set = {value for value in valid_ids if value} + normalized: list[str] = [] + if isinstance(raw_ids, list): + for value in raw_ids: + item_id = str(value or "").strip() + if not item_id or item_id in normalized or item_id not in valid_set: + continue + normalized.append(item_id) + if len(normalized) >= limit: + return normalized + + for item_id in fallback_ids: + if item_id and item_id not 
in normalized: + normalized.append(item_id) + if len(normalized) >= limit: + break + return normalized + + +def _build_planned_query( + gender: str | None, + color: str, + category: str, + occasion: str, + style_direction: str, +) -> str: + parts = [ + str(gender or "").strip(), + color, + style_direction, + category, + f"for {occasion.strip()}" if occasion.strip() else "", + ] + return " ".join(part for part in parts if part).strip() + + +def _fallback_scraper_plan( + planning_context: dict[str, Any], + occasion: str, + gender: str, + reason: str, +) -> dict[str, Any]: + allowed = [str(value) for value in planning_context.get("allowed_categories", []) if str(value).strip()] + color_shortlist = [str(value) for value in planning_context.get("color_shortlist", []) if str(value).strip()] + style_direction = str(planning_context.get("style_direction") or "occasion-aligned") + resolved_target = str(planning_context.get("resolved_target_category") or "topwear") + + plan_gender = _normalize_scraper_gender(gender) + category = allowed[0] if allowed else "shirt" + color = color_shortlist[0] if color_shortlist else "black" + query = _build_planned_query(plan_gender, color, category, occasion, style_direction) + + return { + "target_category": resolved_target, + "color": color, + "category": category, + "gender": plan_gender, + "style_direction": style_direction, + "reference_item_ids": planning_context.get("reference_item_ids", []), + "query": query, + "reason": reason, + "source": "fallback", + } + + +def _generate_scraper_plan_with_kimi( + occasion: str, + gender: str, + preferences: str, + user_prompt: str, + target_category: str, + filters: dict[str, Any], + max_products: int | None, + store: str, + strict_kimi: bool = False, +) -> dict[str, Any]: + wardrobe_snapshot = _wardrobe_metadata_snapshot() + requested_target = _normalize_target_category(target_category) + safe_filters = filters if isinstance(filters, dict) else {} + planning_context = 
_build_scraper_planning_context( + wardrobe_snapshot=wardrobe_snapshot, + requested_target_category=requested_target, + occasion=occasion, + gender=gender, + filters=safe_filters, + ) + + prompt = _build_scraper_plan_prompt( + occasion=occasion, + gender=gender, + preferences=preferences, + user_prompt=user_prompt, + target_category=requested_target, + filters=safe_filters, + wardrobe_snapshot=wardrobe_snapshot, + planning_context=planning_context, + max_products=max_products, + store=store, + ) + + plan_source = "kimi" + plan_error: str | None = None + try: + model_text = run_kimi_text_inference(prompt, max_tokens=KIMI_MAX_TOKENS) + parsed = _recover_scraper_plan_from_text( + model_text=model_text, + planning_context=planning_context, + occasion=occasion, + gender=gender, + ) + if not isinstance(parsed, dict) or not parsed: + raise NvidiaPayloadError("Kimi scraper planner returned empty or invalid JSON payload.") + except Exception as exc: + if strict_kimi: + raise NvidiaPayloadError(f"Kimi planner unavailable: {exc}") from exc + plan_source = "fallback" + plan_error = str(exc) + parsed = _fallback_scraper_plan( + planning_context=planning_context, + occasion=occasion, + gender=gender, + reason=( + "Live Kimi query planning was unavailable, so a deterministic fallback planner was used." 
+ ), + ) + + resolved_target = _normalize_target_category( + parsed.get("target_category") or planning_context.get("resolved_target_category") or requested_target + ) + if resolved_target == "both": + resolved_target = str(planning_context.get("resolved_target_category") or "topwear") + + allowed = _allowed_categories( + target_category=resolved_target, + occasion_bucket=str(planning_context.get("occasion_bucket") or _occasion_bucket(occasion)), + ) + occasion_bucket = str(planning_context.get("occasion_bucket") or _occasion_bucket(occasion)) + color_shortlist = [str(value) for value in planning_context.get("color_shortlist", []) if str(value).strip()] + color_resonance_scores = [ + entry + for entry in (planning_context.get("color_resonance_scores") or []) + if isinstance(entry, dict) + ] + + color = _normalize_planned_color(parsed.get("color"), color_shortlist) + if not color: + color = _resolve_color_fallback(color_shortlist, color_resonance_scores) + if color_shortlist and color not in color_shortlist: + color = _resolve_color_fallback(color_shortlist, color_resonance_scores) + explicit_category = _extract_explicit_category_from_prompt(user_prompt, allowed) + category = explicit_category or _normalize_planned_category(parsed.get("category"), allowed) + + requested_gender = _normalize_scraper_gender(gender) + inferred_or_planned_gender = _normalize_scraper_gender( + str(parsed.get("gender") or planning_context.get("gender_preference") or gender or "") + ) + plan_gender = requested_gender or inferred_or_planned_gender + + style_direction = str(parsed.get("style_direction") or planning_context.get("style_direction") or "occasion-aligned") + valid_reference_ids = [ + str(item.get("id") or "") + for item in (wardrobe_snapshot.get("items") or []) + if isinstance(item, dict) + ] + reference_item_ids = _normalize_reference_ids( + raw_ids=parsed.get("reference_item_ids"), + valid_ids=valid_reference_ids, + fallback_ids=[str(value) for value in 
planning_context.get("reference_item_ids", [])], + limit=4, + ) + + query = str(parsed.get("query") or "").strip() + if not query or explicit_category is not None or requested_gender is not None: + query = _build_planned_query(plan_gender, color, category, occasion, style_direction) + + resonance_lead = color_resonance_scores[0] if color_resonance_scores else {} + default_grounding = ( + f"Selected {color} from DB metadata resonance: " + f"reference_count={int(resonance_lead.get('reference_count') or 0)}, " + f"global_count={int(resonance_lead.get('global_count') or 0)}." + if resonance_lead + else "Selected color using wardrobe metadata shortlist and reference-slot compatibility." + ) + + wardrobe_grounding = str(parsed.get("wardrobe_grounding") or default_grounding) + reason = str(parsed.get("reason") or "Kimi generated a wardrobe-aware shopping query.") + + recommendation = ScraperRecommendation( + color=color, + category=category, + gender=plan_gender, + ) + store_key = _normalize_store_name(store or SCRAPER_DEFAULT_STORE or "nike") + + search_urls = _build_store_search_urls_from_query( + query, + store=store_key, + gender=plan_gender, + wardrobe_items=list(wardrobe_snapshot.get("items") or []), + requested_category=requested_target, + ) + if not search_urls: + search_urls = _build_store_search_urls_from_recommendation( + recommendation, + store=store_key, + occasion=occasion, + ) + intermediate_steps: list[dict[str, Any]] = [ + { + "step": "plan", + "store": store_key, + "query": query, + "target_category": resolved_target, + "color": color, + "category": category, + } + ] + + generated_urls = list(dict.fromkeys(search_urls)) + scrape_limit = max_products if isinstance(max_products, int) and max_products > 0 else 12 + scraped_products: list[dict[str, Any]] = [] + fallback_products: list[dict[str, Any]] = [] + seen_links: set[str] = set() + scrape_errors: list[str] = [] + + intermediate_steps.append( + { + "step": "url_generation", + "query": query, + 
"url_count": len(generated_urls), + "total_urls": len(generated_urls), + } + ) + + if not generated_urls: + intermediate_steps.append( + { + "step": "diagnostic", + "message": "Planner succeeded but no search URLs were generated.", + "attempted_url_count": 0, + } + ) + else: + for index, search_url in enumerate(generated_urls): + if len(scraped_products) >= scrape_limit: + break + + try: + extracted = _extract_store_product_summaries(search_url=search_url, store=store_key) + except requests.RequestException as exc: + error_message = f"url[{index + 1}] scrape failed: {exc}" + scrape_errors.append(error_message) + intermediate_steps.append( + { + "step": "scrape", + "query": query, + "url_count": len(generated_urls), + "new_products": 0, + "total_products": len(scraped_products), + "errors": [error_message], + } + ) + continue + + new_products = 0 + for product in extracted: + if not isinstance(product, dict): + continue + item_link = str(product.get("item_link") or "").strip() + if not item_link or item_link in seen_links: + continue + seen_links.add(item_link) + + if _is_relevant_scraped_product( + product=product, + target_slot=resolved_target, + planned_category=category, + occasion_bucket=occasion_bucket, + ): + scraped_products.append(product) + new_products += 1 + else: + fallback_products.append(product) + + if len(scraped_products) >= scrape_limit: + break + + intermediate_steps.append( + { + "step": "scrape", + "query": query, + "url_count": len(generated_urls), + "new_products": new_products, + "total_products": len(scraped_products), + } + ) + + if not scraped_products and fallback_products: + scraped_products = fallback_products[:scrape_limit] + intermediate_steps.append( + { + "step": "scrape_fallback", + "query": query, + "new_products": len(scraped_products), + "total_products": len(scraped_products), + "message": "Used non-filtered scrape fallback because strict relevance filtering returned no products.", + } + ) + + query_plan_payload = { + "color": 
def _build_shopping_suggestions_from_scraper(
    occasion: str,
    target_category: str,
    gender_preference: str,
    filters: dict[str, Any],
    max_results: int,
    store: str,
) -> dict[str, Any]:
    """Run the scraper planner and reshape its products into suggestion cards.

    The user's preference filters are flattened into one comma-separated
    string that is passed to ``_generate_scraper_plan_with_kimi`` as both the
    ``preferences`` and the ``user_prompt``. Up to ``max_results`` returned
    products that carry an ``item_link`` become suggestion entries.

    Args:
        occasion: Occasion text forwarded to the planner.
        target_category: ``"topwear"``, ``"bottomwear"`` or ``"both"``; for
            ``"both"`` the entries alternate topwear/bottomwear by position.
        gender_preference: Gender forwarded to the planner.
        filters: Preference filters; a ``None``/non-dict value or null filter
            entries are treated as "no preference".
        max_results: Maximum number of products to convert.
        store: Store key forwarded to the planner.

    Returns:
        A dict with the request echo, the ``suggestions`` list, an ``error``
        message when no suggestion could be built, and the planner's
        ``saved_json_path``, ``runtime_id`` and ``query_plan``.
    """
    # Mirror the planner's own guard so a missing or malformed filters payload
    # does not crash before the planner is even called.
    safe_filters = filters if isinstance(filters, dict) else {}

    preference_keys = (
        "preferred_colors",
        "preferred_patterns",
        "preferred_styles",
        "preferred_fabrics",
        "preferred_fits",
        "preferred_seasons",
        "include_keywords",
    )
    preference_terms: list[str] = []
    for key in preference_keys:
        # `or []` covers keys that are present but null; .get(key, []) does not.
        values = safe_filters.get(key) or []
        if isinstance(values, str):
            # A bare string is one preference, not a sequence of characters.
            values = [values]
        preference_terms.extend(str(value) for value in values if value)
    preferences = ", ".join(preference_terms)

    runtime_payload = _generate_scraper_plan_with_kimi(
        occasion=occasion,
        gender=gender_preference,
        preferences=preferences,
        user_prompt=preferences,
        target_category=target_category,
        filters=safe_filters,
        max_products=max_results,
        store=store,
    )

    products = list(runtime_payload.get("products") or [])
    query_plan = dict(runtime_payload.get("query_plan") or {})
    suggestion_items: list[dict[str, Any]] = []

    for index, product in enumerate(products[:max_results]):
        item_link = str(product.get("item_link") or "").strip()
        if not item_link:
            continue
        suggestion_items.append(
            {
                "target_category": target_category if target_category != "both" else ("topwear" if index % 2 == 0 else "bottomwear"),
                "title": str(product.get("name") or query_plan.get("query") or "Suggested Product"),
                "url": item_link,
                "image_url": str(product.get("image_url") or ""),
                "store": str(runtime_payload.get("store") or store or "nike").title(),
                # NOTE(review): this is a rank-derived display score; any
                # "match_score" already present on the product is not used
                # here — confirm that is intended.
                "match_score": max(65, 95 - index * 4),
                "reason": str(product.get("reason") or query_plan.get("reason") or "Kimi generated a wardrobe-aware shopping query."),
                "product_category": str(query_plan.get("category") or "shopping"),
                "color": str(query_plan.get("color") or "black"),
                "pattern": "solid",
                "search_query": str(query_plan.get("query") or occasion),
                "scrape_status": "live",
                "scrape_error": None,
                "product_gender": str(query_plan.get("gender") or gender_preference or "unknown") or "unknown",
                "matched_with_slot": str(product.get("matched_with_slot") or ("bottomwear" if target_category == "topwear" else "topwear")),
                "matched_garments": product.get("matched_garments") or [],
            }
        )

    return {
        "occasion": occasion,
        "target_category": target_category,
        "gender_preference": gender_preference,
        "search_filters": filters,
        "suggestions": suggestion_items,
        "error": None if suggestion_items else "No live scraper results were returned.",
        "saved_json_path": runtime_payload.get("saved_json_path", ""),
        "runtime_id": runtime_payload.get("runtime_id", ""),
        "query_plan": query_plan,
    }
# ---------------------------------------------------------------------------
# NVIDIA inference configuration (each value can be overridden via an
# environment variable of the same name; the literals below are the defaults).
# ---------------------------------------------------------------------------

# Chat-completions endpoint that run_nvidia_inference POSTs to.
NVIDIA_INVOKE_URL = os.getenv(
    "NVIDIA_INVOKE_URL",
    "https://integrate.api.nvidia.com/v1/chat/completions",
)
# Primary model; it is tried before any NVIDIA_FALLBACK_MODEL_IDS entry.
NVIDIA_MODEL_ID = os.getenv("NVIDIA_MODEL_ID", "qwen/qwen3.5-122b-a10b")
# Default max_tokens for run_nvidia_inference.
NVIDIA_MAX_TOKENS = int(os.getenv("NVIDIA_MAX_TOKENS", "16384"))
# Upper bound on the max_tokens value run_nvidia_inference will send.
NVIDIA_REASONING_MAX_TOKENS = int(os.getenv("NVIDIA_REASONING_MAX_TOKENS", "16384"))
# Sampling parameters sent with every request.
NVIDIA_TEMPERATURE = float(os.getenv("NVIDIA_TEMPERATURE", "0.60"))
NVIDIA_TOP_P = float(os.getenv("NVIDIA_TOP_P", "0.95"))
# Passed as the requests timeout for each POST.
NVIDIA_TIMEOUT_SECONDS = int(os.getenv("NVIDIA_TIMEOUT_SECONDS", "180"))
# Number of retries; a request is attempted NVIDIA_MAX_RETRIES + 1 times.
NVIDIA_MAX_RETRIES = int(os.getenv("NVIDIA_MAX_RETRIES", "3"))
# NOTE(review): presumably the delay between retry attempts — its use is not
# visible in this part of the file; confirm against the retry loop.
NVIDIA_RETRY_BACKOFF_SECONDS = float(os.getenv("NVIDIA_RETRY_BACKOFF_SECONDS", "0.8"))
# Only the literal "true" (case-insensitive, trimmed) enables thinking; the
# flag is forwarded as chat_template_kwargs.enable_thinking.
NVIDIA_ENABLE_THINKING = str(os.getenv("NVIDIA_ENABLE_THINKING", "false")).strip().lower() == "true"
# Longest image side before upload (see _image_to_data_url); a value <= 0
# disables downscaling.
NVIDIA_IMAGE_MAX_DIM = int(os.getenv("NVIDIA_IMAGE_MAX_DIM", "1400"))
# Comma-separated model ids tried after NVIDIA_MODEL_ID; blank entries are dropped.
NVIDIA_FALLBACK_MODEL_IDS = [
    model_id.strip()
    for model_id in os.getenv("NVIDIA_FALLBACK_MODEL_IDS", "moonshotai/kimi-k2.5").split(",")
    if model_id.strip()
]
# Error detail raised when no NVIDIA_API_KEY is configured.
NVIDIA_API_KEY_MISSING_DETAIL = "NVIDIA_API_KEY is not configured on this Space."
def _candidate_model_ids(primary_model_id: str) -> list[str]:
    """List the model ids to try, primary first, then the configured fallbacks.

    Ids are trimmed; blank ids and repeats are dropped while the first-seen
    order is kept.
    """
    trimmed = (
        model_id.strip()
        for model_id in (primary_model_id, *NVIDIA_FALLBACK_MODEL_IDS)
    )
    # dict.fromkeys de-duplicates while preserving insertion order.
    return list(dict.fromkeys(key for key in trimmed if key))
+OUTFIT_AI_SCORER_NAME = "ai-grid-v1" +OUTFIT_FALLBACK_SCORER_NAME = "fallback-current-v1" +OUTFIT_GRID_SCORING_PROMPT_TEMPLATE = """You are an expert multimodal outfit matching engine. + +Task: +Evaluate every valid outfit combination shown in the attached wardrobe grid image and rank the best outfits for the given context. + +Grid semantics: +- Row 1 contains topwear only. +- Row 2 contains bottomwear only. +- Row 3 contains optional "Others" items (footwear, accessories, outerwear, or uncategorized garments). If Row 3 is absent, ignore this slot. +- Each cell is labeled with a coordinate like 1:1, 1:2, 2:1, 2:2, 3:1. +- A valid outfit is exactly one Row 1 item plus one Row 2 item, and optionally one Row 3 item. + +User context: +- Occasion: {occasion} +- Region: {region} +- Weather JSON: {weather_json} +- User profile JSON: {user_profile_json} +- Anchor mode: {anchor_mode} +- Locked top index: {locked_top_index} +- Locked bottom index: {locked_bottom_index} +- Locked other index: {locked_other_index} +- Anchor item JSON: {anchor_item_json} + +Wardrobe metadata map: +{metadata_json} + +Scoring rubric: +- occasion relevance +- color harmony +- pattern compatibility +- fit alignment +- style coherence +- seasonal/contextual appropriateness + +Instructions: +1. Use both the composite image and the metadata map together. +2. Treat the locked item as the fixed styling anchor whenever Anchor mode is not "none". +3. Evaluate all {combination_count} possible valid combinations exactly once before ranking. +4. If a locked top, locked bottom, or locked other index is provided, only consider combinations containing that index. +5. Assign each retained combination a final score from 0 to 100 and this score breakdown: + occasion, color, pattern, fit, style, season +6. Return only valid JSON. No markdown and no prose outside JSON. +7. Return at most the top {top_k} outfits in descending score order. +8. 
When a Row 3 item is part of the outfit, include its cell in "other_index". If no Row 3 item is used, set "other_index" to null. + +Return this exact JSON shape: +{{ + "recommendations": [ + {{ + "top_index": "1:1", + "bottom_index": "2:1", + "other_index": "3:1", + "score": 92, + "breakdown": {{ + "occasion": 94, + "color": 91, + "pattern": 89, + "fit": 90, + "style": 93, + "season": 88 + }}, + "reason": "Short, user-facing explanation grounded in visual + metadata evidence.", + "tip": "One concise styling tip." + }} + ] +}}""" + + +# --------------------------------------------------------------------------- +# Model helpers +# --------------------------------------------------------------------------- + +def _image_to_data_url(image: Image.Image) -> str: + if NVIDIA_IMAGE_MAX_DIM > 0: + image = image.copy() + image.thumbnail((NVIDIA_IMAGE_MAX_DIM, NVIDIA_IMAGE_MAX_DIM), Image.Resampling.LANCZOS) + buffer = io.BytesIO() + image.save(buffer, format="PNG") + image_b64 = base64.b64encode(buffer.getvalue()).decode("utf-8") + return f"data:image/png;base64,{image_b64}" + + +def _extract_text_from_nvidia_content(content: Any) -> str: + if isinstance(content, str): + return content + if isinstance(content, list): + parts: list[str] = [] + for chunk in content: + if isinstance(chunk, str): + parts.append(chunk) + continue + if not isinstance(chunk, dict): + continue + for key in ("text", "content", "value"): + value = chunk.get(key) + if isinstance(value, str) and value: + parts.append(value) + break + return "".join(parts).strip() + if isinstance(content, dict): + for key in ("text", "content", "value"): + value = content.get(key) + if isinstance(value, str) and value: + return value + return "" + + +def _extract_nvidia_text(payload: dict[str, Any]) -> str: + try: + choice = payload["choices"][0] + message = choice["message"] + except (KeyError, IndexError, TypeError) as exc: + raise NvidiaPayloadError(f"Unexpected NVIDIA API response shape: {payload}") from exc + + 
def _extract_streamed_nvidia_text(response: requests.Response) -> str:
    """Assemble the assistant text from a streamed (SSE) chat-completion response.

    Each ``data: {...}`` event is parsed as JSON and the first choice's
    ``delta`` / ``message`` fragments are collected. Malformed or irrelevant
    lines are skipped.

    The result follows the same precedence as ``_extract_nvidia_text``:
    answer ``content`` first, then ``reasoning_content``, then ``reasoning``.
    Reasoning text is only a fallback and is never mixed into the answer, so
    downstream JSON parsing sees the answer alone.

    Raises:
        NvidiaTokenLimitError: the stream produced no text and reported
            ``finish_reason == "length"``; callers use this to retry with a
            larger ``max_tokens``.
        NvidiaPayloadError: the stream ended without any text for any other
            reason.
    """
    content_parts: list[str] = []
    reasoning_content_parts: list[str] = []
    reasoning_parts: list[str] = []
    finish_reason = None

    def collect(bucket: list[str], raw: Any) -> None:
        # Append the textual part of one fragment, ignoring empty fragments.
        text = _extract_text_from_nvidia_content(raw)
        if text:
            bucket.append(text)

    # Iterate raw bytes and decode as UTF-8 ourselves: with
    # decode_unicode=True requests decodes using response.encoding, which
    # defaults to ISO-8859-1 for text/* responses that declare no charset and
    # would garble non-ASCII model output.
    for raw_line in response.iter_lines():
        if not raw_line:
            continue
        if isinstance(raw_line, bytes):
            raw_line = raw_line.decode("utf-8", errors="replace")
        line = raw_line.strip()
        if not line.startswith("data:"):
            continue
        data = line[5:].strip()
        if not data or data == "[DONE]":
            continue
        try:
            payload = json.loads(data)
        except json.JSONDecodeError:
            continue

        try:
            choice = payload["choices"][0]
        except (KeyError, IndexError, TypeError):
            continue
        if not isinstance(choice, dict):
            continue

        # Remember the last reported finish reason so truncation can be
        # distinguished from an otherwise empty stream.
        finish_reason = choice.get("finish_reason") or finish_reason

        delta = choice.get("delta")
        if isinstance(delta, dict):
            collect(content_parts, delta.get("content"))
            collect(reasoning_content_parts, delta.get("reasoning_content"))
            collect(reasoning_parts, delta.get("reasoning"))

        message = choice.get("message")
        if isinstance(message, dict):
            collect(content_parts, message.get("content"))
            collect(reasoning_content_parts, message.get("reasoning_content"))

    for parts in (content_parts, reasoning_content_parts, reasoning_parts):
        text = "".join(parts).strip()
        if text:
            return text

    if finish_reason == "length":
        # NOTE(review): assumes NvidiaTokenLimitError is handled by callers the
        # same way as for the non-streamed extractor - confirm.
        raise NvidiaTokenLimitError(
            "NVIDIA response hit max_tokens before final content was produced."
        )
    raise NvidiaPayloadError("NVIDIA stream ended without returning any content.")
+ except (requests.RequestException, NvidiaGatewayError) as exc: + last_error = exc + if _is_degraded_function_error(exc): + break + if attempt >= NVIDIA_MAX_RETRIES: + break + time.sleep(NVIDIA_RETRY_BACKOFF_SECONDS * (attempt + 1)) + except NvidiaPayloadError: + raise + + if isinstance(last_error, NvidiaTokenLimitError) and current_max_tokens < NVIDIA_REASONING_MAX_TOKENS: + current_max_tokens = min(current_max_tokens * 2, NVIDIA_REASONING_MAX_TOKENS) + continue + if _is_degraded_function_error(last_error or Exception()): + print(f"[nvidia] model degraded, trying fallback model: {model_id}") + break + if isinstance(last_error, NvidiaGatewayError): + raise last_error + if isinstance(last_error, requests.RequestException): + raise NvidiaGatewayError(f"NVIDIA API request failed: {last_error}", status_code=502) from last_error + if last_error is not None: + raise last_error + break + + if isinstance(last_error, Exception): + raise NvidiaGatewayError( + f"NVIDIA API request failed on all configured models ({', '.join(_candidate_model_ids(NVIDIA_MODEL_ID))}): {last_error}", + status_code=502, + ) from last_error + raise NvidiaGatewayError( + "NVIDIA API request failed after exhausting reasoning token budget.", + status_code=502, + ) + + +def run_nvidia_text_inference(prompt: str, max_tokens: int = OUTFIT_TEXT_SELECTOR_MAX_TOKENS) -> str: + api_key = _nvidia_api_key() + if not api_key: + raise RuntimeError(NVIDIA_API_KEY_MISSING_DETAIL) + + last_error: Exception | None = None + for model_id in _candidate_model_ids(NVIDIA_MODEL_ID): + current_max_tokens = max_tokens + while current_max_tokens <= NVIDIA_REASONING_MAX_TOKENS: + payload = { + "model": model_id, + "messages": [ + { + "role": "user", + "content": prompt, + } + ], + "max_tokens": current_max_tokens, + "temperature": NVIDIA_TEMPERATURE, + "top_p": NVIDIA_TOP_P, + "stream": True, + "include_reasoning": False, + "chat_template_kwargs": {"enable_thinking": NVIDIA_ENABLE_THINKING}, + } + headers = { + 
"Authorization": f"Bearer {api_key}", + "Accept": "text/event-stream", + "Content-Type": "application/json", + } + + last_error = None + for attempt in range(NVIDIA_MAX_RETRIES + 1): + try: + response = requests.post( + NVIDIA_INVOKE_URL, + headers=headers, + json=payload, + timeout=NVIDIA_TIMEOUT_SECONDS, + ) + if response.status_code in {429, 500, 502, 503, 504}: + raise NvidiaGatewayError( + f"NVIDIA API transient failure {response.status_code}: {response.text[:500]}", + status_code=503 if response.status_code == 429 else 502, + ) + if response.status_code >= 400: + raise NvidiaGatewayError( + f"NVIDIA API request failed with {response.status_code}: {response.text[:500]}", + status_code=502, + ) + return _extract_streamed_nvidia_text(response) + except NvidiaTokenLimitError as exc: + last_error = exc + break + except (requests.RequestException, NvidiaGatewayError) as exc: + last_error = exc + if _is_degraded_function_error(exc): + break + if attempt >= NVIDIA_MAX_RETRIES: + break + time.sleep(NVIDIA_RETRY_BACKOFF_SECONDS * (attempt + 1)) + except NvidiaPayloadError: + raise + + if isinstance(last_error, NvidiaTokenLimitError) and current_max_tokens < NVIDIA_REASONING_MAX_TOKENS: + current_max_tokens = min(current_max_tokens * 2, NVIDIA_REASONING_MAX_TOKENS) + continue + if _is_degraded_function_error(last_error or Exception()): + print(f"[nvidia] model degraded, trying fallback model: {model_id}") + break + if isinstance(last_error, NvidiaGatewayError): + raise last_error + if isinstance(last_error, requests.RequestException): + raise NvidiaGatewayError(f"NVIDIA API request failed: {last_error}", status_code=502) from last_error + if last_error is not None: + raise last_error + break + + if isinstance(last_error, Exception): + raise NvidiaGatewayError( + f"NVIDIA API request failed on all configured models ({', '.join(_candidate_model_ids(NVIDIA_MODEL_ID))}): {last_error}", + status_code=502, + ) from last_error + raise NvidiaGatewayError( + "NVIDIA API request 
failed after exhausting reasoning token budget.", + status_code=502, + ) + + +def parse_json_from_text(text: str) -> dict[str, Any]: + if not text: + return {} + stripped = text.strip() + try: + return json.loads(stripped) + except json.JSONDecodeError: + s, e = stripped.find("{"), stripped.rfind("}") + if s != -1 and e != -1 and e > s: + try: + return json.loads(stripped[s:e + 1]) + except json.JSONDecodeError: + pass + return {} + + +def normalize_specs(specs: dict[str, Any]) -> dict[str, str]: + return { + "type": str(specs.get("type", "Unknown")), + "category": str(specs.get("category", "Unknown")), + "color": str(specs.get("color", "Unknown")), + "pattern": str(specs.get("pattern", "Unknown")), + "fabric": str(specs.get("fabric", "Unknown")), + "fit": str(specs.get("fit", "Unknown")), + "occasion": str(specs.get("occasion", "Unknown")), + "season": str(specs.get("season", "Unknown")), + } + + +def _clamp_score(value: Any, fallback: int = 0) -> int: + if isinstance(value, str): + match = re.search(r"-?\d+(?:\.\d+)?", value) + value = match.group(0) if match else value + try: + score = int(round(float(value))) + except (TypeError, ValueError): + score = fallback + return max(0, min(100, score)) + + +def _safe_metadata_item(item: dict[str, Any]) -> dict[str, Any]: + return { + "id": item.get("id"), + "slot": item.get("type"), + "type": item.get("type"), + "category": item.get("category"), + "color": item.get("color"), + "pattern": item.get("pattern"), + "fabric": item.get("fabric"), + "fit": item.get("fit"), + "occasion": item.get("style"), + "season": item.get("season"), + } + + +def _placeholder_grid_tile(index_label: str, item: dict[str, Any], tile_size: int) -> Image.Image: + tile = Image.new("RGB", (tile_size, tile_size), (245, 245, 245)) + draw = ImageDraw.Draw(tile) + font = ImageFont.load_default() + draw.rectangle((8, 8, tile_size - 8, tile_size - 8), outline=(180, 180, 180), width=2) + draw.text((14, 14), index_label, fill=(40, 40, 40), font=font) + 
def _build_outfit_grid_session(
    tops: list[dict[str, Any]],
    bottoms: list[dict[str, Any]],
    others: list[dict[str, Any]],
    occasion: str,
    user_profile: dict[str, Any] | None,
    weather: dict[str, Any] | None,
    region: str,
) -> dict[str, Any]:
    """Render the labelled wardrobe grid image and register it as a session.

    Row 1 holds *tops*, row 2 *bottoms*, and row 3 *others* (row 3 is only
    drawn when *others* is non-empty). Each cell is addressed as
    ``"<row>:<column>"`` with 1-based indices; the tile is pasted first and a
    dark caption strip with the cell index, colour and category is drawn
    beneath it. A row without items gets a grey "(no items)" note instead.

    Side effects: expired sessions are pruned first, the PNG is written to
    ``OUTFIT_GRID_SESSION_DIR/<session_id>.png`` and a record is stored in
    ``_OUTFIT_GRID_SESSIONS``.

    Returns:
        A dict with ``session_id``, the in-memory ``image``, ``metadata_map``
        (cell index -> sanitised metadata), ``item_lookup`` (cell index ->
        original item) and ``image_path``.
    """
    _prune_outfit_grid_sessions()
    session_id = str(uuid.uuid4())

    grid_rows = [(1, "Topwear", tops), (2, "Bottomwear", bottoms)]
    if others:
        grid_rows.append((3, "Others", others))

    tile_side = OUTFIT_GRID_CELL_SIZE
    cell_span = tile_side + 2 * OUTFIT_GRID_PADDING
    row_span = tile_side + OUTFIT_GRID_LABEL_HEIGHT + 2 * OUTFIT_GRID_PADDING
    column_count = max(len(tops), len(bottoms), len(others), 1)

    canvas = Image.new(
        "RGB",
        (column_count * cell_span, len(grid_rows) * row_span),
        (255, 255, 255),
    )
    painter = ImageDraw.Draw(canvas)
    label_font = ImageFont.load_default()

    metadata_map: dict[str, dict[str, Any]] = {}
    item_lookup: dict[str, dict[str, Any]] = {}

    for row_number, row_title, row_items in grid_rows:
        row_top = (row_number - 1) * row_span

        if not row_items:
            painter.text(
                (12, row_top + 10),
                f"Row {row_number}: {row_title} (no items)",
                fill=(120, 120, 120),
                font=label_font,
            )
            continue

        tile_y = row_top + OUTFIT_GRID_PADDING
        label_y = tile_y + tile_side
        for column_number, item in enumerate(row_items, start=1):
            cell_key = f"{row_number}:{column_number}"
            tile_x = (column_number - 1) * cell_span + OUTFIT_GRID_PADDING

            tile = _load_grid_tile(str(item.get("image_url") or ""), cell_key, item)
            canvas.paste(tile, (tile_x, tile_y))

            # Caption strip directly below the tile.
            painter.rectangle(
                (tile_x, label_y, tile_x + tile_side, label_y + OUTFIT_GRID_LABEL_HEIGHT),
                fill=(20, 20, 20),
            )
            color_text = str(item.get("color") or "Unknown")[:18]
            category_text = str(item.get("category") or "Unknown")[:18]
            caption = f"{cell_key} | {color_text} {category_text}"[:36]
            painter.text((tile_x + 6, label_y + 8), caption, fill=(255, 255, 255), font=label_font)

            metadata_map[cell_key] = _safe_metadata_item(item)
            item_lookup[cell_key] = item

    OUTFIT_GRID_SESSION_DIR.mkdir(parents=True, exist_ok=True)
    image_path = OUTFIT_GRID_SESSION_DIR / f"{session_id}.png"
    canvas.save(image_path, format="PNG")
    image_path_text = str(image_path)

    _OUTFIT_GRID_SESSIONS[session_id] = {
        "image_path": image_path_text,
        "metadata_map": metadata_map,
        "created_at": time.time(),
        "occasion": occasion,
        "user_profile": user_profile or {},
        "weather": weather or {},
        "region": region,
    }

    return {
        "session_id": session_id,
        "image": canvas,
        "metadata_map": metadata_map,
        "item_lookup": item_lookup,
        "image_path": image_path_text,
    }
+ scored = score_pair_full(other, other, occasion, other=None) + base_reason = str(scored.get("reason") or "") + scored["reason"] = f"{other.get('color', 'This')} {other.get('category', 'item')} works as a complete standalone look." + if base_reason: + scored["reason"] = f"{scored['reason']} {base_reason}" + scored["tip"] = "Use footwear and accessories only to complement this single-piece outfit." + outfits.append(_build_outfit_payload(scored, None, None, rank=0, other=other)) + + outfits.sort(key=lambda outfit: int(outfit.get("score") or 0), reverse=True) + for index, outfit in enumerate(outfits[:top_k], start=1): + outfit["rank"] = index + + if case_name == "D" and outfits: + return { + "occasion": occasion, + "case": case_name, + "selected_outfit_score": outfits[0], + "recommendations": [], + "improved_recommendations": outfits[1:top_k], + "total_combinations_checked": (len(tops) * len(bottoms) if include_pair_outfits else 0) + (len(others) if include_other_outfits else 0), + "notice": None, + "engine_version": "scoring-v2", + } + + return { + "occasion": occasion, + "case": case_name, + "selected_outfit_score": None, + "recommendations": outfits[:top_k], + "improved_recommendations": [], + "total_combinations_checked": (len(tops) * len(bottoms) if include_pair_outfits else 0) + (len(others) if include_other_outfits else 0), + "notice": None, + "engine_version": "scoring-v2", + } + + +def _occasion_prefers_standalone_others(occasion: str) -> bool: + occasion_n = _norm(occasion) + if not occasion_n: + return False + return any( + keyword in occasion_n + for keyword in [ + "wedding", + "festive", + "ethnic", + "ceremony", + "engagement", + "reception", + "sangeet", + "haldi", + "mehndi", + ] + ) + + +def _merge_standalone_others_for_priority_occasions( + result: dict[str, Any], + occasion: str, + others: list[dict[str, Any]], + top_k: int, +) -> dict[str, Any]: + if not others or not _occasion_prefers_standalone_others(occasion): + return result + if 
str(result.get("case") or "").upper() == "D": + return result + + standalone_payload = _fallback_rule_recommendations( + occasion=occasion, + case_name=str(result.get("case") or "A"), + tops=[], + bottoms=[], + others=others, + top_k=top_k, + include_pair_outfits=False, + include_other_outfits=True, + ) + standalone = [ + dict(outfit) + for outfit in (standalone_payload.get("recommendations") or []) + if isinstance(outfit, dict) + ] + if not standalone: + return result + + for outfit in standalone: + base_score = int(outfit.get("score") or 0) + boosted_score = min(100, max(base_score, 78) + 12) + outfit["score"] = boosted_score + + breakdown = outfit.get("breakdown") + if isinstance(breakdown, dict): + breakdown["occasion"] = min(100, max(int(breakdown.get("occasion") or 0), 92)) + + recommendations = [ + dict(outfit) + for outfit in (result.get("recommendations") or []) + if isinstance(outfit, dict) + ] + merged = [*recommendations, *standalone] + merged.sort(key=lambda outfit: int(outfit.get("score") or 0), reverse=True) + merged = merged[:top_k] + for index, outfit in enumerate(merged, start=1): + outfit["rank"] = index + + result["recommendations"] = merged + result["total_combinations_checked"] = int(result.get("total_combinations_checked") or 0) + len(others) + return result + + +def _current_fallback_recommendations( + wardrobe_items: list[dict[str, Any]], + occasion: str, + top_selected: dict[str, Any] | None, + bottom_selected: dict[str, Any] | None, + other_selected: dict[str, Any] | None, + weather: dict[str, Any] | None, + user_profile: dict[str, Any] | None, + region: str, + top_k: int, + candidate_pool: int, + diversity_lambda: float, + case_name: str, + tops: list[dict[str, Any]], + bottoms: list[dict[str, Any]], + others: list[dict[str, Any]], +) -> dict[str, Any]: + def _strip_optional_slots(outfit: dict[str, Any]) -> dict[str, Any]: + cleaned = dict(outfit) + cleaned.pop("shoes", None) + cleaned.pop("accessory", None) + if cleaned.get("top") and 
cleaned.get("bottom"): + cleaned.pop("other", None) + return cleaned + + try: + result = get_recommendation_service().recommend( + wardrobe_items=wardrobe_items, + occasion=occasion, + top_selected=top_selected, + bottom_selected=bottom_selected, + other_selected=other_selected, + weather=weather, + user_profile=user_profile, + region=region, + top_k=top_k, + candidate_pool=candidate_pool, + diversity_lambda=diversity_lambda, + ) + if isinstance(result.get("selected_outfit_score"), dict): + result["selected_outfit_score"] = _strip_optional_slots(result["selected_outfit_score"]) + + recommendations = [ + _strip_optional_slots(entry) + for entry in (result.get("recommendations") or []) + if isinstance(entry, dict) + ] + improved = [ + _strip_optional_slots(entry) + for entry in (result.get("improved_recommendations") or []) + if isinstance(entry, dict) + ] + + result["recommendations"] = recommendations[:top_k] + result["improved_recommendations"] = improved[:top_k] + result = _merge_standalone_others_for_priority_occasions( + result=result, + occasion=occasion, + others=others, + top_k=top_k, + ) + print(f"[outfit-scoring] algo={OUTFIT_FALLBACK_SCORER_NAME} source=fashion_ai") + return result + except Exception as exc: + print(f"[outfit-scoring] algo={OUTFIT_FALLBACK_SCORER_NAME} source=scoring-v2 reason={exc!r}") + return _fallback_rule_recommendations( + occasion, + case_name, + tops, + bottoms, + others, + top_k, + include_pair_outfits=other_selected is None, + include_other_outfits=bool(others), + ) + + +def _extract_grid_indices(outfit_raw: dict[str, Any]) -> tuple[str, str, str]: + top_index = str( + outfit_raw.get("top_index") + or outfit_raw.get("top") + or outfit_raw.get("top_cell") + or "" + ).strip() + bottom_index = str( + outfit_raw.get("bottom_index") + or outfit_raw.get("bottom") + or outfit_raw.get("bottom_cell") + or "" + ).strip() + other_index = str( + outfit_raw.get("other_index") + or outfit_raw.get("other") + or outfit_raw.get("other_cell") + 
or "" + ).strip() + + if top_index and bottom_index and other_index: + return top_index, bottom_index, other_index + + combo_text = str( + outfit_raw.get("combination") + or outfit_raw.get("combo") + or outfit_raw.get("pair") + or "" + ) + matches = re.findall(r"[123]:\d+", combo_text) + top_match = next((value for value in matches if value.startswith("1:")), "") + bottom_match = next((value for value in matches if value.startswith("2:")), "") + other_match = next((value for value in matches if value.startswith("3:")), "") + return top_index or top_match, bottom_index or bottom_match, other_index or other_match + + +def _rank_anchor_candidates( + anchor_item: dict[str, Any], + candidates: list[dict[str, Any]], + occasion: str, + anchor_is_top: bool, +) -> list[dict[str, Any]]: + ranked: list[tuple[int, dict[str, Any]]] = [] + for candidate in candidates: + scored = ( + score_pair_full(anchor_item, candidate, occasion) + if anchor_is_top + else score_pair_full(candidate, anchor_item, occasion) + ) + ranked.append((int(scored.get("score") or 0), candidate)) + + ranked.sort(key=lambda pair: pair[0], reverse=True) + compatible = [item for score, item in ranked if score >= OUTFIT_ANCHOR_MIN_SCORE] + if compatible: + return compatible + return [item for _, item in ranked] + + +def _text_selector_item_payload(item: dict[str, Any], index: int) -> dict[str, Any]: + return { + "index": index, + "id": str(item.get("id") or ""), + "type": str(item.get("type") or ""), + "category": str(item.get("category") or "Unknown"), + "color": str(item.get("color") or "Unknown"), + "pattern": str(item.get("pattern") or "Unknown"), + "fabric": str(item.get("fabric") or "Unknown"), + "fit": str(item.get("fit") or "Unknown"), + "season": str(item.get("season") or "Unknown"), + "style": str(item.get("style") or "Unknown"), + "occasion": str(item.get("occasion") or "Unknown"), + } + + +def _select_grid_candidates_with_text_ai( + candidates: list[dict[str, Any]], + slot_name: str, + occasion: 
str, + limit: int, + anchor_mode: str, + anchor_item: dict[str, Any] | None, +) -> list[dict[str, Any]]: + if len(candidates) <= limit: + return candidates + + if not OUTFIT_TEXT_PRESELECT_ENABLED: + return candidates[:limit] + + candidate_payload = [ + _text_selector_item_payload(item, idx + 1) + for idx, item in enumerate(candidates) + ] + anchor_payload = _text_selector_item_payload(anchor_item, 0) if anchor_item else None + + prompt = ( + "You are a fashion ranking assistant for candidate preselection.\n" + f"Occasion: {occasion}\n" + f"Slot to rank: {slot_name}\n" + f"Anchor mode: {anchor_mode}\n" + f"Keep exactly {limit} items if possible (or fewer when candidates are fewer).\n\n" + "Goal:\n" + "Select the strongest candidates for downstream outfit matching using only textual metadata.\n" + "Prioritize occasion relevance, compatibility with anchor context, and diversity in color/pattern/style.\n\n" + "Rules:\n" + "1. Choose only IDs from the provided candidates.\n" + "2. No duplicate IDs.\n" + "3. Return strictly valid JSON and nothing else.\n" + "4. 
Prefer candidates that maximize useful pairing variety, not near-duplicates.\n\n" + "Return EXACT shape:\n" + "{\"selected_ids\":[\"id1\",\"id2\"]}\n\n" + f"Anchor item JSON:\n{json.dumps(anchor_payload, ensure_ascii=True)}\n\n" + f"Candidate items JSON:\n{json.dumps(candidate_payload, ensure_ascii=True)}" + ) + + try: + response_text = run_nvidia_text_inference(prompt, max_tokens=OUTFIT_TEXT_SELECTOR_MAX_TOKENS) + parsed_payload = parse_json_from_text(response_text) + selected_ids_raw = parsed_payload.get("selected_ids") if isinstance(parsed_payload, dict) else None + selected_indices_raw = parsed_payload.get("selected_indices") if isinstance(parsed_payload, dict) else None + + selected_ids: list[str] = [] + if isinstance(selected_ids_raw, list): + selected_ids = [ + str(value).strip() + for value in selected_ids_raw + if str(value).strip() + ] + + if not selected_ids and isinstance(selected_indices_raw, list): + for raw_index in selected_indices_raw: + try: + index = int(raw_index) + except (TypeError, ValueError): + continue + if 1 <= index <= len(candidates): + selected_ids.append(str(candidates[index - 1].get("id") or "").strip()) + elif 0 <= index < len(candidates): + selected_ids.append(str(candidates[index].get("id") or "").strip()) + + id_to_item = { + str(item.get("id") or "").strip(): item + for item in candidates + } + + selected: list[dict[str, Any]] = [] + seen: set[str] = set() + for item_id in selected_ids: + if not item_id or item_id in seen: + continue + item = id_to_item.get(item_id) + if not item: + continue + selected.append(item) + seen.add(item_id) + if len(selected) >= limit: + break + + if len(selected) < limit: + for item in candidates: + item_id = str(item.get("id") or "").strip() + if item_id in seen: + continue + selected.append(item) + seen.add(item_id) + if len(selected) >= limit: + break + + selected = selected[:limit] + if len(selected) == limit: + print( + f"[outfit-preselect] slot={slot_name} mode={anchor_mode} " + 
f"in={len(candidates)} kept={len(selected)} strategy={OUTFIT_TEXT_SELECTOR_NAME}" + ) + return selected + + raise NvidiaPayloadError( + f"Text preselector returned insufficient candidates: requested={limit} got={len(selected)}" + ) + except Exception as exc: + print( + f"[outfit-preselect] slot={slot_name} mode={anchor_mode} " + f"in={len(candidates)} kept={limit} strategy=head reason={exc!r}" + ) + return candidates[:limit] + + +def _resolve_outfit_grid_sources( + wardrobe_items: list[dict[str, Any]], + occasion: str, + top_selected: dict[str, Any] | None, + bottom_selected: dict[str, Any] | None, + other_selected: dict[str, Any] | None, +) -> tuple[list[dict[str, Any]], list[dict[str, Any]], list[dict[str, Any]], str, dict[str, Any] | None]: + all_tops = [i for i in wardrobe_items if i.get("type") == "topwear"] + all_bottoms = [i for i in wardrobe_items if i.get("type") == "bottomwear"] + all_others = [ + i + for i in wardrobe_items + if i.get("type") == "others" or i.get("type") not in {"topwear", "bottomwear"} + ] + other_pool = [other_selected] if other_selected else [] + + if top_selected and bottom_selected: + anchor_mode = "locked-top+locked-bottom+locked-other" if other_selected else "locked-top+locked-bottom" + return [top_selected], [bottom_selected], other_pool, anchor_mode, other_selected or top_selected + + if top_selected: + ranked_bottoms = _rank_anchor_candidates( + anchor_item=top_selected, + candidates=all_bottoms, + occasion=occasion, + anchor_is_top=True, + ) + anchor_mode = "locked-top+locked-other" if other_selected else "locked-top" + return [top_selected], ranked_bottoms, other_pool, anchor_mode, other_selected or top_selected + + if bottom_selected: + ranked_tops = _rank_anchor_candidates( + anchor_item=bottom_selected, + candidates=all_tops, + occasion=occasion, + anchor_is_top=False, + ) + anchor_mode = "locked-bottom+locked-other" if other_selected else "locked-bottom" + return ranked_tops, [bottom_selected], other_pool, anchor_mode, 
def _normalize_ai_outfit_payload(
    parsed_payload: dict[str, Any],
    item_lookup: dict[str, dict[str, Any]],
    occasion: str,
    case_name: str,
    top_k: int,
    total_combinations: int,
    session_id: str,
) -> dict[str, Any]:
    """Map the AI scorer's parsed JSON onto the outfit-recommendation response.

    The recommendation list is taken from the first of the keys
    ``recommendations``, ``outfits`` or ``top_outfits`` whose value is a list.
    Entries that are not dicts, or whose top/bottom grid index cannot be found
    in ``item_lookup``, are dropped. The remaining entries are sorted by score
    (highest first), truncated to ``top_k`` and ranked starting at 1.

    Args:
        parsed_payload: JSON object decoded from the model response.
        item_lookup: Wardrobe items keyed by the grid index the model refers to.
        occasion: Occasion echoed back in the response.
        case_name: Request case; ``"D"`` places the results under
            ``improved_recommendations`` and the best one under
            ``selected_outfit_score``.
        top_k: Maximum number of recommendations to keep.
        total_combinations: Value reported as ``total_combinations_checked``.
        session_id: Grid session identifier echoed back in the response.

    Returns:
        The response dict for the outfit recommendation endpoints.

    Raises:
        NvidiaPayloadError: If no recommendation list is present, or no entry
            maps onto known top and bottom items.
    """
    raw_recommendations = None
    for list_key in ("recommendations", "outfits", "top_outfits"):
        candidate = parsed_payload.get(list_key)
        if isinstance(candidate, list):
            raw_recommendations = candidate
            break
    if raw_recommendations is None:
        raise NvidiaPayloadError(f"AI outfit scorer returned no recommendation list: {parsed_payload}")

    def _item_summary(item: dict[str, Any]) -> dict[str, Any]:
        # Only the fields the client needs to render an outfit card.
        return {
            "id": item.get("id"),
            "category": item.get("category"),
            "color": item.get("color"),
            "image_url": item.get("image_url", ""),
        }

    breakdown_axes = ("color", "style", "occasion", "fit", "pattern", "season")

    recommendations: list[dict[str, Any]] = []
    for raw_entry in raw_recommendations:
        if not isinstance(raw_entry, dict):
            continue
        top_index, bottom_index, other_index = _extract_grid_indices(raw_entry)
        top_item = item_lookup.get(top_index)
        bottom_item = item_lookup.get(bottom_index)
        other_item = item_lookup.get(other_index) if other_index else None
        if not top_item or not bottom_item:
            # The model referenced a grid cell that does not exist.
            continue

        base_breakdown = raw_entry.get("breakdown")
        if not isinstance(base_breakdown, dict):
            base_breakdown = {}

        recommendation = {
            "rank": 0,  # assigned after sorting
            "score": _clamp_score(raw_entry.get("score"), 0),
            "breakdown": {
                axis: _clamp_score(base_breakdown.get(axis), 70)
                for axis in breakdown_axes
            },
            "reason": str(raw_entry.get("reason") or "AI-generated outfit recommendation."),
            "tip": str(raw_entry.get("tip") or "Use matching accessories to complete this look."),
            "combination": f"{top_index} + {bottom_index}" + (f" + {other_index}" if other_item else ""),
            "grid_session_id": session_id,
            "top": _item_summary(top_item),
            "bottom": _item_summary(bottom_item),
        }
        if other_item:
            recommendation["other"] = _item_summary(other_item)
        recommendations.append(recommendation)

    if not recommendations:
        raise NvidiaPayloadError(f"AI outfit scorer returned no valid index-mapped recommendations: {parsed_payload}")

    recommendations.sort(key=lambda outfit: int(outfit.get("score") or 0), reverse=True)
    recommendations = recommendations[:top_k]
    for index, outfit in enumerate(recommendations, start=1):
        outfit["rank"] = index

    is_selected_outfit_case = case_name == "D"
    # The slice above can be empty (e.g. top_k == 0); avoid an IndexError then.
    best_outfit = recommendations[0] if recommendations else None

    return {
        "occasion": occasion,
        "case": case_name,
        "selected_outfit_score": best_outfit if is_selected_outfit_case else None,
        "recommendations": [] if is_selected_outfit_case else recommendations,
        "improved_recommendations": recommendations if is_selected_outfit_case else [],
        "total_combinations_checked": total_combinations,
        "notice": None,
        "grid_session_id": session_id,
        "engine_version": OUTFIT_AI_SCORER_NAME,
    }


def _recommend_outfits_with_ai_grid(
    wardrobe_items: list[dict[str, Any]],
    occasion: str,
    top_selected: dict[str, Any] | None,
    bottom_selected: dict[str, Any] | None,
    other_selected: dict[str, Any] | None,
    weather: dict[str, Any] | None,
    user_profile: dict[str, Any] | None,
    region: str,
    top_k: int,
    case_name: str,
) -> dict[str, Any]:
    """Score outfit combinations by sending an image grid to the AI scorer.

    Steps: resolve the candidate rows, shrink oversized unlocked rows through
    the text-AI pre-selection, build the grid session, prompt the model and
    normalize its JSON answer.

    Args:
        wardrobe_items: All wardrobe items available for matching.
        occasion: Occasion the outfit is meant for.
        top_selected: Locked topwear item, if any.
        bottom_selected: Locked bottomwear item, if any.
        other_selected: Locked "other" item, if any.
        weather: Optional weather context forwarded to the prompt.
        user_profile: Optional user profile forwarded to the prompt.
        region: Region forwarded to the prompt.
        top_k: Maximum number of recommendations to return.
        case_name: Request case forwarded to the normalizer.

    Returns:
        The normalized response from ``_normalize_ai_outfit_payload``.

    Raises:
        NvidiaPayloadError: If a topwear or bottomwear row is empty, or the
            model response cannot be parsed as JSON.
    """
    top_source, bottom_source, other_source, anchor_mode, anchor_item = _resolve_outfit_grid_sources(
        wardrobe_items=wardrobe_items,
        occasion=occasion,
        top_selected=top_selected,
        bottom_selected=bottom_selected,
        other_selected=other_selected,
    )

    # Pool sizes before pre-selection, kept for the log line below.
    top_pool_count = len(top_source)
    bottom_pool_count = len(bottom_source)

    if not top_selected and len(top_source) > OUTFIT_GRID_MAX_TOP_ITEMS:
        top_source = _select_grid_candidates_with_text_ai(
            candidates=top_source,
            slot_name="topwear",
            occasion=occasion,
            limit=OUTFIT_GRID_MAX_TOP_ITEMS,
            anchor_mode=anchor_mode,
            anchor_item=anchor_item,
        )
    if not bottom_selected and len(bottom_source) > OUTFIT_GRID_MAX_BOTTOM_ITEMS:
        bottom_source = _select_grid_candidates_with_text_ai(
            candidates=bottom_source,
            slot_name="bottomwear",
            occasion=occasion,
            limit=OUTFIT_GRID_MAX_BOTTOM_ITEMS,
            anchor_mode=anchor_mode,
            anchor_item=anchor_item,
        )

    if not top_source or not bottom_source:
        raise NvidiaPayloadError("AI outfit scorer requires at least one topwear and one bottomwear item.")

    grid_session = _build_outfit_grid_session(
        tops=top_source,
        bottoms=bottom_source,
        others=other_source,
        occasion=occasion,
        user_profile=user_profile,
        weather=weather,
        region=region,
    )

    # A locked "other" row contributes exactly its own items. An unlocked row
    # additionally allows "no extra item". When the other item is only an
    # anchor, the others row is empty, so the multiplier must stay at 1
    # instead of collapsing the whole count to zero.
    if other_selected and other_source:
        other_choices = len(other_source)
    else:
        other_choices = len(other_source) + 1
    combination_count = len(top_source) * len(bottom_source) * other_choices

    prompt = _grid_scoring_prompt(
        metadata_map=grid_session["metadata_map"],
        occasion=occasion,
        weather=weather,
        user_profile=user_profile,
        region=region,
        anchor_mode=anchor_mode,
        anchor_item=anchor_item,
        locked_top_index="1:1" if top_selected else None,
        locked_bottom_index="2:1" if bottom_selected else None,
        locked_other_index="3:1" if other_selected else None,
        combination_count=combination_count,
        top_k=top_k,
    )
    if not other_source:
        prompt = (
            f"{prompt}\n\n"
            "Important: This grid contains only Row 1 (Topwear) and Row 2 (Bottomwear). "
            "Always set other_index to null."
        )

    session_id = grid_session["session_id"]
    # NOTE(review): assumes the grid gains a third row when others are present
    # (implied by the two-row prompt note above) - confirm against the grid builder.
    print(
        f"[outfit-grid] mode={anchor_mode} session={session_id} "
        f"tops_in={top_pool_count} bottoms_in={bottom_pool_count} "
        f"rows={3 if other_source else 2} tops={len(top_source)} bottoms={len(bottom_source)} others={len(other_source)} "
        f"combinations={combination_count}"
    )
    model_text = run_nvidia_inference(grid_session["image"], prompt, max_tokens=OUTFIT_AI_MAX_TOKENS)
    parsed_payload = parse_json_from_text(model_text)
    if not parsed_payload:
        raise NvidiaPayloadError(f"AI outfit scorer returned unparsable JSON: {model_text[:500]}")

    result = _normalize_ai_outfit_payload(
        parsed_payload=parsed_payload,
        item_lookup=grid_session["item_lookup"],
        occasion=occasion,
        case_name=case_name,
        top_k=top_k,
        total_combinations=combination_count,
        session_id=session_id,
    )
    print(
        f"[outfit-scoring] algo={OUTFIT_AI_SCORER_NAME} "
        f"mode={anchor_mode} session={session_id} "
        f"tops={len(top_source)} bottoms={len(bottom_source)} others={len(other_source)} "
        f"combinations={combination_count}"
    )
    return result


def _raise_http_error(exc: Exception) -> NoReturn:
    """Log ``exc`` and re-raise it as the matching ``HTTPException``.

    Mapping:
        * ``RuntimeError`` carrying the missing-API-key message -> 503
        * ``NvidiaGatewayError`` -> the status code stored on the exception
        * ``NvidiaPayloadError`` -> 502
        * ``requests.RequestException`` -> 502
        * anything else -> 500

    Raises:
        HTTPException: Always; the original exception is chained as the cause.
    """
    print("Classification request failed:", repr(exc))
    traceback.print_exc()
    if isinstance(exc, RuntimeError) and str(exc) == NVIDIA_API_KEY_MISSING_DETAIL:
        raise HTTPException(status_code=503, detail=NVIDIA_API_KEY_MISSING_DETAIL) from exc
    if isinstance(exc, NvidiaGatewayError):
        raise HTTPException(status_code=exc.status_code, detail=str(exc)) from exc
    if isinstance(exc, NvidiaPayloadError):
        raise HTTPException(status_code=502, detail=str(exc)) from exc
    if isinstance(exc, requests.RequestException):
        raise HTTPException(status_code=502, detail=f"NVIDIA API request failed: {exc}") from exc
    raise HTTPException(status_code=500, detail=str(exc)) from exc
# ---------------------------------------------------------------------------
# Endpoints
# ---------------------------------------------------------------------------

def _payload_int_or_400(raw_value: Any, field_name: str) -> int:
    """Convert a request-payload value to ``int`` or reject the request.

    Args:
        raw_value: Value taken from the JSON payload.
        field_name: Payload field name used in the error detail.

    Returns:
        ``int(raw_value)``.

    Raises:
        HTTPException: 400 when the value cannot be converted, so malformed
            client input is not reported as an internal server error.
    """
    try:
        return int(raw_value)
    except (TypeError, ValueError, OverflowError) as exc:
        raise HTTPException(status_code=400, detail=f"{field_name} must be an integer") from exc


@app.get("/")
def root() -> dict[str, str]:
    """Return a static liveness message."""
    return {"status": "ok", "message": "Wardrobe Classifier API v2"}


@app.get("/health")
def health() -> dict[str, str]:
    """Report provider, model and engine configuration as strings."""
    return {
        "status": "ok",
        "classification_provider": "nvidia",
        "model": NVIDIA_MODEL_ID,
        # Only whether a key is configured is exposed, never the key itself.
        "nvidia_api_configured": str(bool(_nvidia_api_key())),
        "nvidia_invoke_url": NVIDIA_INVOKE_URL,
        "engine_version": "scoring-v2",
        "outfit_matching_provider": "kimi",
    }


@app.post("/product-urls")
def product_urls(payload: dict[str, Any] = Body(default_factory=dict)) -> dict[str, Any]:
    """Scrape store search pages for products matching a color and category.

    Payload fields: ``color`` and ``category`` (required), ``gender``,
    ``max_products`` (defaults to 30) and ``store``.

    Returns:
        The store, the search URLs used, the de-duplicated products (capped at
        ``max_products``), their links, the count and the path of the saved
        JSON snapshot.

    Raises:
        HTTPException: 400 for missing ``color``/``category`` or an invalid
            ``max_products``; 502 when fetching store pages fails.
    """
    color = str(payload.get("color") or "")
    category = str(payload.get("category") or "")
    gender = payload.get("gender")
    max_products = _payload_int_or_400(payload.get("max_products") or 30, "max_products")
    store = _normalize_store_name(str(payload.get("store") or SCRAPER_DEFAULT_STORE or "nike"))

    if not color or not category:
        raise HTTPException(status_code=400, detail="color and category are required")
    if max_products < 1:
        raise HTTPException(status_code=400, detail="max_products must be at least 1")

    recommendation = ScraperRecommendation(
        color=color,
        category=category,
        gender=str(gender) if gender else None,
    )

    try:
        search_urls = _build_store_search_urls_from_recommendation(
            recommendation,
            store=store,
            occasion="",
        )
        products: list[dict[str, str]] = []
        seen_links: set[str] = set()

        for search_url in search_urls:
            for product in _extract_store_product_summaries(search_url, store=store):
                item_link = str(product.get("item_link") or "").strip()
                if not item_link or item_link in seen_links:
                    continue
                seen_links.add(item_link)
                products.append(product)
                if len(products) >= max_products:
                    break
            # Stop requesting further search pages once the cap is reached.
            if len(products) >= max_products:
                break

        response_payload: dict[str, Any] = {
            "store": store,
            "search_urls": search_urls,
            "product_urls": [item["item_link"] for item in products],
            "products": products,
            "count": len(products),
        }
        response_payload["saved_json_path"] = _save_scraper_json_payload("product_urls", response_payload)
        return response_payload
    except requests.RequestException as exc:
        raise HTTPException(status_code=502, detail=f"Failed to fetch {store.title()} pages: {exc}") from exc


@app.post("/suggestions")
@app.post("/api/suggestions")
def suggestions(payload: dict[str, Any] = Body(default_factory=dict)) -> dict[str, Any]:
    """Build shopping suggestions from the store scraper.

    Accepts snake_case and camelCase payload keys (``target_category`` /
    ``targetCategory``, ``gender_preference`` / ``genderPreference``,
    ``max_results`` / ``maxResults``).

    Raises:
        HTTPException: 400 for an invalid ``max_results``; 502 when the AI
            gateway, its payload, or the store fetch fails.
    """
    occasion = str(payload.get("occasion") or "casual")
    target_category = str(payload.get("target_category") or payload.get("targetCategory") or "both")
    gender_preference = str(payload.get("gender_preference") or payload.get("genderPreference") or "any")
    filters = payload.get("filters") if isinstance(payload.get("filters"), dict) else {}
    max_results = _payload_int_or_400(
        payload.get("max_results") or payload.get("maxResults") or 8,
        "max_results",
    )
    store = _normalize_store_name(str(payload.get("store") or SCRAPER_DEFAULT_STORE or "nike"))

    if max_results < 1:
        raise HTTPException(status_code=400, detail="max_results must be at least 1")

    try:
        return _build_shopping_suggestions_from_scraper(
            occasion=occasion,
            target_category=target_category,
            gender_preference=gender_preference,
            filters=filters,
            max_results=max_results,
            store=store,
        )
    except NvidiaGatewayError as exc:
        raise HTTPException(status_code=502, detail=str(exc)) from exc
    except NvidiaPayloadError as exc:
        raise HTTPException(status_code=502, detail=str(exc)) from exc
    except requests.RequestException as exc:
        raise HTTPException(status_code=502, detail=f"Failed to fetch {store.title()} pages: {exc}") from exc


@app.post("/scraper/recommend")
def scraper_recommend(payload: dict[str, Any] = Body(default_factory=dict)) -> dict[str, Any]:
    """Generate a scraper plan from a free-text prompt plus structured fields.

    Values inferred from the prompt are combined with explicit payload values:
    an inferred ``topwear``/``bottomwear`` target takes precedence over the
    payload's, and inferred colors/keywords are appended to the payload
    filters.

    Raises:
        HTTPException: 400 for an invalid ``max_products``; 502 when the AI
            gateway, its payload, or the store fetch fails.
    """
    specific_categories = {"topwear", "bottomwear"}

    def _merged_terms(explicit: Any, inferred_terms: list[Any]) -> list[str]:
        # A bare string is one term, not a sequence of characters.
        if isinstance(explicit, str):
            explicit = [explicit]
        elif not isinstance(explicit, (list, tuple)):
            explicit = []
        return [str(term) for term in (*explicit, *inferred_terms) if str(term).strip()]

    user_prompt = str(payload.get("user_prompt") or payload.get("prompt") or "").strip()
    inferred = _infer_structured_request_from_prompt(user_prompt)
    inferred_target_category = _normalize_target_category(inferred.get("target_category"))

    occasion = str(payload.get("occasion") or inferred.get("occasion") or "casual")
    if _norm(occasion) in {"", "auto", "any"}:
        # Placeholder occasions defer to whatever the prompt implied.
        occasion = str(inferred.get("occasion") or "casual")

    gender = str(payload.get("gender") or inferred.get("gender") or "")
    payload_target_category = _normalize_target_category(
        payload.get("target_category") or payload.get("targetCategory") or "both"
    )
    if inferred_target_category in specific_categories:
        target_category = inferred_target_category
    elif payload_target_category in specific_categories:
        target_category = payload_target_category
    else:
        target_category = "both"

    filters = payload.get("filters") if isinstance(payload.get("filters"), dict) else {}
    inferred_colors = inferred.get("preferred_colors") if isinstance(inferred.get("preferred_colors"), list) else []
    inferred_include = inferred.get("include_keywords") if isinstance(inferred.get("include_keywords"), list) else []
    inferred_exclude = inferred.get("exclude_keywords") if isinstance(inferred.get("exclude_keywords"), list) else []
    filters = {
        **filters,
        "preferred_colors": _merged_terms(filters.get("preferred_colors"), inferred_colors),
        "include_keywords": _merged_terms(filters.get("include_keywords"), inferred_include),
        "exclude_keywords": _merged_terms(filters.get("exclude_keywords"), inferred_exclude),
    }

    preference_parts = [str(payload.get("preferences") or "").strip(), user_prompt]
    preferences = ", ".join(part for part in preference_parts if part)

    max_products_raw = payload.get("max_products")
    # Identity/equality checks instead of set membership: JSON lists and dicts
    # are unhashable and would raise TypeError in a set lookup.
    if max_products_raw is None or max_products_raw == "":
        max_products = None
    else:
        max_products = _payload_int_or_400(max_products_raw, "max_products")
    store = _normalize_store_name(str(payload.get("store") or SCRAPER_DEFAULT_STORE or "nike"))

    if max_products is not None and max_products < 1:
        raise HTTPException(status_code=400, detail="max_products must be at least 1")

    try:
        return _generate_scraper_plan_with_kimi(
            occasion=occasion,
            gender=gender,
            preferences=preferences,
            user_prompt=user_prompt,
            target_category=target_category,
            filters=filters,
            max_products=max_products,
            store=store,
            strict_kimi=True,
        )
    except NvidiaGatewayError as exc:
        raise HTTPException(status_code=502, detail=str(exc)) from exc
    except NvidiaPayloadError as exc:
        raise HTTPException(status_code=502, detail=str(exc)) from exc
    except requests.RequestException as exc:
        raise HTTPException(status_code=502, detail=f"Failed to fetch {store.title()} pages: {exc}") from exc
+ + +Kimi reads wardrobe metadata, builds a context-aware shopping query, and returns matching products with links, names, prices, and images.
+Current items loaded from the database are used by Kimi to shape the shopping query.
+{wardrobe_json}
+ Run the search to generate a wardrobe-aware query.+