""" app.py — Wardrobe Assistant API (v2) Changes from v1: - _item_store (in-memory list) replaced by SQLite via db.py - All outfit scoring routed through scoring.py (strategic v2 model) - _gap_suggestions now analyses the actual wardrobe instead of hardcoding items - /feedback endpoint added for preference data collection - Score responses include human-readable reason + tip from scoring.py """ from __future__ import annotations import base64 import io import json import os import re import tempfile import threading import traceback import time import uuid import html as html_lib from dataclasses import replace from concurrent.futures import ThreadPoolExecutor, TimeoutError as FuturesTimeout from contextlib import asynccontextmanager from datetime import datetime, timezone from pathlib import Path from typing import Any, NoReturn from urllib.error import HTTPError from urllib.parse import parse_qs, quote_plus, unquote, urlencode, urljoin, urlparse from urllib.request import Request, urlopen import requests from fastapi import Body, FastAPI, File, HTTPException, Query, Response, UploadFile from fastapi.middleware.cors import CORSMiddleware from PIL import Image, ImageDraw, ImageFont, ImageOps from db import ( init_db, item_insert, item_get_all, item_get, item_update, item_delete, feedback_record, cache_get, cache_set, cache_purge_expired, ) from scoring import ( score_pair_full, extract_base_color, extract_style, extract_fit, extract_pattern, extract_season, _NEUTRALS, ) from fashion_ai import get_recommendation_service from scraper import Recommendation as ScraperRecommendation from scraper import ( build_search_urls_from_query as build_nike_search_urls_from_query, build_search_urls_from_recommendation as build_nike_search_urls_from_recommendation, extract_product_summaries as extract_nike_product_summaries, ) from zalando_scraper import ( build_zalando_search_url, build_zalando_search_urls_from_request, extract_product_summaries as 
extract_zalando_product_summaries, ) def _now_iso() -> str: return datetime.now(timezone.utc).isoformat() def _norm(value: Any) -> str: return str(value or "").strip().lower() MATCHING_RESULT_CACHE: dict[str, dict[str, Any]] = {} MATCHING_RESULT_CACHE_LOCK = threading.Lock() MATCHING_RESULT_CACHE_MAX = int(os.getenv("MATCHING_RESULT_CACHE_MAX", "500")) MATCHING_RESULT_CACHE_TTL_SECONDS = int(os.getenv("MATCHING_RESULT_CACHE_TTL_SECONDS", "86400")) def _matching_cache_storage_key(key: str) -> str: return f"matching:{key}" def _normalize_cache_category(value: Any) -> str: category = _norm(value) if category in {"topwear", "bottomwear", "others"}: return category return "" def _extract_cache_user_id(payload: dict[str, Any], wardrobe_items: list[dict[str, Any]]) -> str: payload_user = str(payload.get("user_id") or "").strip() if payload_user: return payload_user for item in wardrobe_items: candidate = str(item.get("user_id") or "").strip() if candidate: return candidate return "anonymous" def _build_matching_cache_key( user_id: str, category: str, occasion: str, wardrobe_hash: str, lock_signature: str = "", ) -> str: return "|".join([ user_id.strip() or "anonymous", _normalize_cache_category(category), _norm(occasion) or "casual", wardrobe_hash.strip(), lock_signature.strip(), ]) def _matching_cache_get(key: str) -> dict[str, Any] | None: with MATCHING_RESULT_CACHE_LOCK: cached = MATCHING_RESULT_CACHE.get(key) if isinstance(cached, dict): return json.loads(json.dumps(cached)) try: persisted = cache_get(_matching_cache_storage_key(key)) except Exception as exc: print(f"[matching-cache] db read failed key={key} reason={exc!r}") return None if not isinstance(persisted, dict): return None snapshot = json.loads(json.dumps(persisted)) with MATCHING_RESULT_CACHE_LOCK: MATCHING_RESULT_CACHE[key] = snapshot while len(MATCHING_RESULT_CACHE) > MATCHING_RESULT_CACHE_MAX: oldest_key = next(iter(MATCHING_RESULT_CACHE)) MATCHING_RESULT_CACHE.pop(oldest_key, None) return 
json.loads(json.dumps(snapshot)) def _matching_cache_set(key: str, payload: dict[str, Any]) -> None: snapshot = json.loads(json.dumps(payload)) with MATCHING_RESULT_CACHE_LOCK: MATCHING_RESULT_CACHE[key] = snapshot while len(MATCHING_RESULT_CACHE) > MATCHING_RESULT_CACHE_MAX: oldest_key = next(iter(MATCHING_RESULT_CACHE)) MATCHING_RESULT_CACHE.pop(oldest_key, None) try: cache_set( _matching_cache_storage_key(key), snapshot, ttl_seconds=MATCHING_RESULT_CACHE_TTL_SECONDS, ) except Exception as exc: print(f"[matching-cache] db write failed key={key} reason={exc!r}") def _build_lock_signature_from_payload(payload: dict[str, Any]) -> str: explicit = str(payload.get("lock_signature") or "").strip() if explicit: return explicit top_selected_raw = payload.get("top_selected") bottom_selected_raw = payload.get("bottom_selected") other_selected_raw = payload.get("other_selected") top_id = str(top_selected_raw.get("id") or "").strip() if isinstance(top_selected_raw, dict) else "" bottom_id = str(bottom_selected_raw.get("id") or "").strip() if isinstance(bottom_selected_raw, dict) else "" other_id = str(other_selected_raw.get("id") or "").strip() if isinstance(other_selected_raw, dict) else "" if not top_id and not bottom_id and not other_id: return "" return f"top:{top_id or '-'}|bottom:{bottom_id or '-'}|other:{other_id or '-'}" def _infer_type(category: str) -> str: n = _norm(category) if any(keyword in n for keyword in ["shirt", "tee", "top", "kurta", "blouse", "hoodie", "sweater", "blazer", "jacket", "polo"]): return "topwear" if any(keyword in n for keyword in ["jean", "pant", "trouser", "short", "skirt", "jogger", "palazzo", "chino"]): return "bottomwear" return "others" def _normalize_wardrobe_item(item: dict[str, Any]) -> dict[str, Any]: description = item.get("description") if isinstance(item.get("description"), dict) else {} category = str(item.get("category") or description.get("category") or description.get("type") or "Unknown") item_type = str(item.get("type") or 
description.get("type") or _infer_type(category)) return { "id": item.get("id") or str(uuid.uuid4()), "image_url": item.get("image_url") or "", "type": item_type, "category": category, "color": str(item.get("color") or description.get("color") or "Unknown"), "pattern": str(item.get("pattern") or description.get("pattern") or "Solid"), "fabric": str(item.get("fabric") or description.get("fabric") or "Unknown"), "fit": str(item.get("fit") or description.get("fit") or "Unknown"), "season": str(item.get("season") or description.get("season") or "All-Season"), "style": str(item.get("style") or description.get("occasion") or description.get("style") or "casual"), "occasion": str(item.get("occasion") or description.get("occasion") or "casual"), "description": description, } def _build_outfit_payload(scored: dict[str, Any], top: dict[str, Any] | None, bottom: dict[str, Any] | None, rank: int, other: dict[str, Any] | None) -> dict[str, Any]: payload = { **scored, "rank": rank, "top": top, "bottom": bottom, } if other is not None: payload["other"] = other return payload def _gap_suggestions(wardrobe: list[dict[str, Any]], occasion: str) -> list[dict[str, Any]]: tops = sum(1 for item in wardrobe if _norm(item.get("type")) == "topwear") bottoms = sum(1 for item in wardrobe if _norm(item.get("type")) == "bottomwear") suggestions: list[dict[str, Any]] = [] if tops == 0: suggestions.append({"focus": "topwear", "suggestion": "Add a versatile topwear staple", "reason": "No topwear items found."}) if bottoms == 0: suggestions.append({"focus": "bottomwear", "suggestion": "Add a versatile bottomwear staple", "reason": "No bottomwear items found."}) if tops and bottoms and abs(tops - bottoms) > 2: suggestions.append({"focus": "balance", "suggestion": "Balance your wardrobe mix", "reason": "One category is much larger than the other."}) if not suggestions: suggestions.append({"focus": "versatility", "suggestion": f"Add one {occasion} piece that can mix with existing staples", "reason": 
"Wardrobe is already balanced."}) return suggestions[:4] SCRAPER_OUTPUT_DIR = Path(__file__).resolve().parent / "scraped_json" SCRAPER_RUNTIME_RESULTS: dict[str, dict[str, Any]] = {} SCRAPER_RUNTIME_LOCK = threading.Lock() SCRAPER_PLANNER_MODEL_ID = os.getenv( "SCRAPER_PLANNER_MODEL_ID", "nvidia/nemotron-3-nano-omni-30b-a3b-reasoning", ) SCRAPER_PLANNER_MAX_TOKENS = int(os.getenv("SCRAPER_PLANNER_MAX_TOKENS", "800")) SCRAPER_DEFAULT_STORE = str(os.getenv("SCRAPER_DEFAULT_STORE", "nike")).strip().lower() def _save_scraper_json_payload(prefix: str, payload: dict[str, Any]) -> str: SCRAPER_OUTPUT_DIR.mkdir(parents=True, exist_ok=True) timestamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ") file_path = SCRAPER_OUTPUT_DIR / f"{prefix}_{timestamp}.json" with file_path.open("w", encoding="utf-8") as handle: json.dump(payload, handle, ensure_ascii=True, indent=2) return str(file_path) def _store_scraper_runtime_result(payload: dict[str, Any]) -> dict[str, Any]: runtime_id = str(uuid.uuid4()) record = { "runtime_id": runtime_id, "created_at": _now_iso(), **payload, } with SCRAPER_RUNTIME_LOCK: SCRAPER_RUNTIME_RESULTS[runtime_id] = record while len(SCRAPER_RUNTIME_RESULTS) > 25: oldest_key = next(iter(SCRAPER_RUNTIME_RESULTS)) SCRAPER_RUNTIME_RESULTS.pop(oldest_key, None) return record def _candidate_text_model_ids(primary_model_id: str) -> list[str]: return _candidate_model_ids(primary_model_id) def _run_text_inference_with_model(primary_model_id: str, prompt: str, max_tokens: int) -> str: api_key = _nvidia_api_key() if not api_key: raise RuntimeError(NVIDIA_API_KEY_MISSING_DETAIL) last_error: Exception | None = None for model_id in _candidate_text_model_ids(primary_model_id): current_max_tokens = max_tokens while current_max_tokens <= NVIDIA_REASONING_MAX_TOKENS: payload = { "model": model_id, "messages": [ { "role": "user", "content": prompt, } ], "max_tokens": current_max_tokens, "temperature": NVIDIA_TEMPERATURE, "top_p": NVIDIA_TOP_P, "stream": True, } if 
NVIDIA_ENABLE_THINKING: payload["chat_template_kwargs"] = {"enable_thinking": True} headers = { "Authorization": f"Bearer {api_key}", "Accept": "text/event-stream", "Content-Type": "application/json", } last_error = None for attempt in range(NVIDIA_MAX_RETRIES + 1): try: response = requests.post( NVIDIA_INVOKE_URL, headers=headers, json=payload, timeout=NVIDIA_TIMEOUT_SECONDS, ) if response.status_code in {429, 500, 502, 503, 504}: raise NvidiaGatewayError( f"NVIDIA API transient failure {response.status_code}: {response.text[:500]}", status_code=503 if response.status_code == 429 else 502, ) if response.status_code >= 400: raise NvidiaGatewayError( f"NVIDIA API request failed with {response.status_code}: {response.text[:500]}", status_code=502, ) try: return _extract_streamed_nvidia_text(response) except NvidiaPayloadError as stream_exc: # Some providers occasionally terminate SSE without final text; retry once using non-stream mode. if "stream ended without returning any content" not in str(stream_exc).lower(): raise non_stream_payload = {**payload, "stream": False} non_stream_headers = { "Authorization": f"Bearer {api_key}", "Accept": "application/json", "Content-Type": "application/json", } non_stream_response = requests.post( NVIDIA_INVOKE_URL, headers=non_stream_headers, json=non_stream_payload, timeout=NVIDIA_TIMEOUT_SECONDS, ) if non_stream_response.status_code in {429, 500, 502, 503, 504}: raise NvidiaGatewayError( f"NVIDIA API transient failure {non_stream_response.status_code}: {non_stream_response.text[:500]}", status_code=503 if non_stream_response.status_code == 429 else 502, ) if non_stream_response.status_code >= 400: raise NvidiaGatewayError( f"NVIDIA API request failed with {non_stream_response.status_code}: {non_stream_response.text[:500]}", status_code=502, ) try: parsed_payload = non_stream_response.json() except ValueError as exc: raise NvidiaPayloadError( "NVIDIA non-stream planner response was not valid JSON." 
                            ) from exc
                        return _extract_nvidia_text(parsed_payload)
                except NvidiaTokenLimitError as exc:
                    # Token-limit errors are handled by the while loop below
                    # (double the budget), not by retrying here.
                    last_error = exc
                    break
                except (requests.RequestException, NvidiaGatewayError) as exc:
                    last_error = exc
                    if _is_degraded_function_error(exc):
                        break
                    if attempt >= NVIDIA_MAX_RETRIES:
                        break
                    # Linear backoff between transient-failure retries.
                    time.sleep(NVIDIA_RETRY_BACKOFF_SECONDS * (attempt + 1))
                except NvidiaPayloadError:
                    raise
            if isinstance(last_error, NvidiaTokenLimitError) and current_max_tokens < NVIDIA_REASONING_MAX_TOKENS:
                # Grow the token budget and retry the same model.
                current_max_tokens = min(current_max_tokens * 2, NVIDIA_REASONING_MAX_TOKENS)
                continue
            if _is_degraded_function_error(last_error or Exception()):
                print(f"[nvidia] model degraded, trying fallback model: {model_id}")
                break
            if isinstance(last_error, NvidiaGatewayError):
                raise last_error
            if isinstance(last_error, requests.RequestException):
                raise NvidiaGatewayError(f"NVIDIA API request failed: {last_error}", status_code=502) from last_error
            if last_error is not None:
                raise last_error
            break
    if isinstance(last_error, Exception):
        raise NvidiaGatewayError(
            f"NVIDIA API request failed on all configured models ({', '.join(_candidate_text_model_ids(primary_model_id))}): {last_error}",
            status_code=502,
        ) from last_error
    raise NvidiaGatewayError(
        "NVIDIA API request failed after exhausting reasoning token budget.",
        status_code=502,
    )


def run_scraper_planner_text_inference(prompt: str, max_tokens: int = SCRAPER_PLANNER_MAX_TOKENS) -> str:
    """Run the scraper planner prompt against the configured planner model."""
    return _run_text_inference_with_model(SCRAPER_PLANNER_MODEL_ID, prompt, max_tokens)


def _normalize_store_name(value: str | None) -> str:
    """Normalise a store name to one of the supported stores; defaults to "nike"."""
    normalized = _norm(value)
    if normalized in {"zalando", "nike"}:
        return normalized
    return "nike"


def _build_store_query_from_recommendation(recommendation: ScraperRecommendation, occasion: str = "") -> str:
    """Compose a "gender color category occasion" search query, skipping empty parts."""
    parts = [
        str(recommendation.gender or "").strip(),
        str(recommendation.color or "").strip(),
        str(recommendation.category or "").strip(),
        str(occasion or "").strip(),
    ]
    return " ".join(part for part in parts if part)


def _build_store_search_urls_from_recommendation(
    recommendation: ScraperRecommendation,
    store: str,
    occasion: str = "",
) -> list[str]:
    """Build store-specific search URLs for a structured recommendation."""
    store_key = _normalize_store_name(store)
    if store_key == "zalando":
        query = _build_store_query_from_recommendation(recommendation, occasion=occasion)
        return [build_zalando_search_url(query=query, gender=recommendation.gender)]
    return build_nike_search_urls_from_recommendation(recommendation, store=store_key)


def _build_store_search_urls_from_query(
    query: str,
    store: str,
    gender: str | None = None,
    wardrobe_items: list[dict[str, Any]] | None = None,
    requested_category: str | None = None,
) -> list[str]:
    """Build store-specific search URLs for a free-text query."""
    store_key = _normalize_store_name(store)
    if store_key == "zalando":
        search_urls, _ = build_zalando_search_urls_from_request(
            query=query,
            gender=gender,
            wardrobe_items=wardrobe_items,
            requested_category=requested_category,
            # URL generation should follow planner output + deterministic rules.
            # GPT OSS remains reserved for post-scrape cleanup only.
            completion_fn=None,
        )
        return search_urls
    return build_nike_search_urls_from_query(query=query, store=store_key, gender=gender)


def _extract_store_product_summaries(search_url: str, store: str) -> list[dict[str, str]]:
    """Scrape product summaries from a store search URL (Zalando via Apify, else Nike)."""
    store_key = _normalize_store_name(store)
    if store_key == "zalando":
        return extract_zalando_product_summaries(
            search_url=search_url,
            max_products=None,
            use_apify=True,
            postprocess=None,
        )
    return extract_nike_product_summaries(search_url, store=store_key)


def _wardrobe_metadata_snapshot(limit: int = 30) -> dict[str, Any]:
    """Summarise the wardrobe for planner prompts: totals, per-slot|occasion
    counts, and a capped list of item metadata (no image data)."""
    wardrobe = [_normalize_wardrobe_item(item) for item in item_get_all()]
    counts: dict[str, int] = {}
    for item in wardrobe:
        # Count items per "type|occasion" pair, e.g. "topwear|casual".
        key = f"{_norm(item.get('type') or 'unknown')}|{_norm(item.get('occasion') or 'unknown')}"
        counts[key] = counts.get(key, 0) + 1
    return {
        "total_items": len(wardrobe),
        "items": [
            {
                "id": item.get("id"),
                "type": item.get("type"),
                "category": item.get("category"),
                "color": item.get("color"),
                "pattern": item.get("pattern"),
                "fit": item.get("fit"),
"season": item.get("season"), "style": item.get("style"), "occasion": item.get("occasion"), } for item in wardrobe[:limit] ], "counts": counts, } def _build_scraper_plan_prompt( occasion: str, gender: str, preferences: str, user_prompt: str, target_category: str, filters: dict[str, Any], wardrobe_snapshot: dict[str, Any], planning_context: dict[str, Any], max_products: int | None, store: str, ) -> str: return ( "ROLE: Senior Fashion Merchandising Strategist & Query Planner for AI Wardrobe Assistant\n\n" f"OBJECTIVE: Generate exactly one high-precision {store.title()} shopping plan that is context-safe, occasion-safe, and wardrobe-grounded.\n\n" "---\n\n" "INPUTS:\n\n" f"user_request: \"{user_prompt}\"\n" f"occasion: \"{occasion}\"\n" f"target_category: \"{target_category}\" // strict slot constraint\n" f"gender: \"{gender}\"\n" f"preferences: \"{preferences}\"\n" f"filters: {json.dumps(filters, ensure_ascii=True)}\n" f"max_products: {max_products if isinstance(max_products, int) and max_products > 0 else 'uncapped'}\n" f"planning_context: {json.dumps(planning_context, ensure_ascii=True)}\n" f"wardrobe_snapshot: {json.dumps(wardrobe_snapshot, ensure_ascii=True)}\n\n" "---\n\n" "EXECUTION RULES (Hard Constraints):\n\n" "1. SLOT LOCK: target_category is immutable. If \"topwear\", NEVER output bottomwear categories (pants, shorts, joggers) and vice versa.\n\n" "2. OCCASION VETTING: For formal | interview | work | wedding | client_meeting:\n" " BLOCK: hoodie, sweatshirt, joggers, shorts, tank, tights, leggings, crop-top, sports-bra-as-outerwear\n" " ALLOW: shirt, polo, blazer_outerwear, sweater, sweatshirt_structured (only if minimal branding)\n\n" "3. PRIORITY HIERARCHY:\n" " First: allowed_categories from planning_context\n" " Second: color_shortlist from planning_context\n" " Third: reference_item_ids from planning_context (anchor reasoning here)\n\n" "4. 
COLOR LOGIC: Select ONE color from color_shortlist that:\n" " - Complements majority of wardrobe_snapshot bottoms\n" " - Suits occasion formality\n" " - Has highest non-conflict score with existing wardrobe\n\n" "5. If planning_context.color_resonance_scores exists, prefer the highest-scoring color unless user filters explicitly override it.\n\n" "---\n\n" "REASONING FRAMEWORK:\n\n" "Step 1 — Parse wardrobe_snapshot for color frequency, gap categories, and bottom-dominant hues\n" "Step 2 — Cross-reference planning_context.reference_items for silhouette compatibility\n" "Step 3 — Filter through occasion veto list\n" "Step 4 — Select optimal {category, color} pair with highest interoperability score\n" "Step 5 — Compose commerce-optimized query: gender + color + occasion + category\n\n" "---\n\n" "OUTPUT SCHEMA (strict JSON, no markdown, no extra keys):\n\n" "{\n" ' "target_category": "topwear|bottomwear",\n' ' "color": "string from color_shortlist",\n' ' "category": "string from allowed_categories post-vetting",\n' ' "gender": "men|women|unisex",\n' ' "style_direction": "formal-smart|business-casual|casual-polished|etc",\n' ' "reference_item_ids": ["array from planning_context"],\n' ' "query": "commerce-ready search string",\n' ' "wardrobe_grounding": "specific evidence from wardrobe_snapshot",\n' ' "reason": "concise strategic justification"\n' "}\n" ) def _recover_scraper_plan_from_text( model_text: str, planning_context: dict[str, Any], occasion: str, gender: str, ) -> dict[str, Any]: text = str(model_text or "").strip() if not text: return {} parsed = parse_json_from_text(text) if isinstance(parsed, dict) and parsed: return parsed allowed = [str(value) for value in planning_context.get("allowed_categories", []) if str(value).strip()] colors = [str(value) for value in planning_context.get("color_shortlist", []) if str(value).strip()] lowered = text.lower() category = "" for candidate in allowed: if candidate.lower() in lowered: category = candidate break if not 
category and allowed: category = allowed[0] color = "" for candidate in colors: if candidate.lower() in lowered: color = candidate break if not color and colors: color = colors[0] inferred_gender = _normalize_scraper_gender(gender) if "women" in lowered: inferred_gender = "women" elif "men" in lowered: inferred_gender = "men" elif "unisex" in lowered: inferred_gender = "unisex" query = "" query_match = re.search(r'"query"\s*:\s*"([^"]+)"', text, flags=re.IGNORECASE) if query_match: query = query_match.group(1).strip() if not query and category and color: query = _build_planned_query( inferred_gender, color, category, occasion, str(planning_context.get("style_direction") or "occasion-aligned"), ) if not category or not color or not query: return {} return { "target_category": planning_context.get("resolved_target_category", "topwear"), "color": color, "category": category, "gender": inferred_gender, "style_direction": planning_context.get("style_direction", "occasion-aligned"), "reference_item_ids": planning_context.get("reference_item_ids", []), "query": query, "reason": "Recovered Nemotron planner output from semi-structured response.", "source": "nemotron", } def _normalize_scraper_gender(value: str | None) -> str | None: normalized = str(value or "").strip().lower() if normalized in {"men", "male", "man", "mens"}: return "men" if normalized in {"women", "female", "woman", "womens"}: return "women" if normalized in {"unisex", "any", "all"}: return "unisex" return None def _normalize_target_category(value: Any) -> str: normalized = _norm(value) if normalized in {"topwear", "top", "upper", "tops"}: return "topwear" if normalized in {"bottomwear", "bottom", "lower", "bottoms"}: return "bottomwear" return "both" _PROMPT_TO_TARGET_HINTS = { "topwear": { "top", "topwear", "shirt", "blazer", "jacket", "polo", "tee", "t-shirt", "kurta", "upper", }, "bottomwear": { "bottom", "bottomwear", "trouser", "trousers", "pants", "jeans", "shorts", "joggers", "lower", }, } 
_PROMPT_TO_OCCASION_HINTS: dict[str, set[str]] = { "formal": {"formal", "interview", "office", "work", "business", "meeting", "wedding"}, "party": {"party", "festive", "diwali", "celebration", "date", "ethnic"}, "sports": {"sports", "gym", "workout", "training", "running", "run", "active"}, "casual": {"casual", "daily", "everyday", "weekend", "outing"}, } _PROMPT_COLOR_TERMS = [ "black", "white", "navy", "blue", "grey", "gray", "beige", "olive", "green", "brown", "khaki", "cream", "maroon", "charcoal", "tan", ] def _infer_structured_request_from_prompt(user_prompt: str) -> dict[str, Any]: normalized = _norm(user_prompt) if not normalized: return { "target_category": "both", "occasion": "", "gender": "", "preferred_colors": [], "include_keywords": [], "exclude_keywords": [], } target_category = "both" top_hits = sum(1 for token in _PROMPT_TO_TARGET_HINTS["topwear"] if token in normalized) bottom_hits = sum(1 for token in _PROMPT_TO_TARGET_HINTS["bottomwear"] if token in normalized) if top_hits > bottom_hits and top_hits > 0: target_category = "topwear" elif bottom_hits > top_hits and bottom_hits > 0: target_category = "bottomwear" occasion = "" for bucket, tokens in _PROMPT_TO_OCCASION_HINTS.items(): if any(token in normalized for token in tokens): occasion = bucket break gender = "" if any(token in normalized for token in {" men", "male", "man", " mens"}): gender = "men" elif any(token in normalized for token in {" women", "female", "woman", " womens"}): gender = "women" elif "unisex" in normalized: gender = "unisex" preferred_colors: list[str] = [] for color in _PROMPT_COLOR_TERMS: if color in normalized and color not in preferred_colors: preferred_colors.append(color) include_keywords: list[str] = [] for keyword in ["formal", "structured", "minimal", "smart", "elegant", "tailored"]: if keyword in normalized and keyword not in include_keywords: include_keywords.append(keyword) exclude_keywords: list[str] = [] for keyword in ["hoodie", "oversized", "ripped", 
"distressed", "athleisure"]: if f"avoid {keyword}" in normalized or f"no {keyword}" in normalized or f"without {keyword}" in normalized: exclude_keywords.append(keyword) return { "target_category": target_category, "occasion": occasion, "gender": gender, "preferred_colors": preferred_colors, "include_keywords": include_keywords, "exclude_keywords": exclude_keywords, } def _occasion_bucket(value: str) -> str: normalized = _norm(value) if any(token in normalized for token in {"formal", "interview", "office", "work", "business", "wedding", "meeting"}): return "formal" if any(token in normalized for token in {"party", "festive", "diwali", "celebration", "ethnic", "date"}): return "party" if any(token in normalized for token in {"sports", "gym", "active", "training", "run", "running"}): return "sports" return "casual" def _top_terms(values: list[str], limit: int = 6) -> list[str]: counts: dict[str, int] = {} for value in values: key = _norm(value) if not key or key == "unknown": continue counts[key] = counts.get(key, 0) + 1 return [key for key, _ in sorted(counts.items(), key=lambda pair: pair[1], reverse=True)[:limit]] def _rank_color_resonance( slot_colors: dict[str, list[str]], reference_slot: str, preferred_colors: list[str], occasion_bucket: str, ) -> list[dict[str, Any]]: reference_counts: dict[str, int] = {} global_counts: dict[str, int] = {} for raw_color in slot_colors.get(reference_slot, []): normalized = extract_base_color(raw_color or "") if not normalized or normalized == "unknown": continue reference_counts[normalized] = reference_counts.get(normalized, 0) + 1 for raw_color in slot_colors.get("topwear", []) + slot_colors.get("bottomwear", []): normalized = extract_base_color(raw_color or "") if not normalized or normalized == "unknown": continue global_counts[normalized] = global_counts.get(normalized, 0) + 1 preferred_set = { extract_base_color(value or "") for value in preferred_colors if extract_base_color(value or "") } neutral_colors = ["navy", 
"black", "white", "grey", "beige"] candidate_pool: list[str] = [] for candidate in [ *preferred_colors, *[key for key, _ in sorted(reference_counts.items(), key=lambda pair: pair[1], reverse=True)], *[key for key, _ in sorted(global_counts.items(), key=lambda pair: pair[1], reverse=True)], *neutral_colors, ]: normalized = extract_base_color(candidate or "") if not normalized or normalized == "unknown" or normalized in candidate_pool: continue candidate_pool.append(normalized) formal_boost_colors = {"navy", "black", "white", "grey", "charcoal", "beige"} sports_boost_colors = {"black", "white", "grey", "navy", "blue", "red", "green"} ranked: list[dict[str, Any]] = [] for color in candidate_pool: reference_count = reference_counts.get(color, 0) global_count = global_counts.get(color, 0) preferred_bonus = 2 if color in preferred_set else 0 occasion_bonus = 0 if occasion_bucket == "formal" and color in formal_boost_colors: occasion_bonus = 1 elif occasion_bucket == "sports" and color in sports_boost_colors: occasion_bonus = 1 score = (reference_count * 3) + global_count + preferred_bonus + occasion_bonus ranked.append( { "color": color, "score": score, "reference_count": reference_count, "global_count": global_count, "preferred": color in preferred_set, } ) ranked.sort( key=lambda item: ( int(item.get("score") or 0), int(item.get("reference_count") or 0), int(item.get("global_count") or 0), ), reverse=True, ) return ranked SCRAPER_CATEGORY_POLICY: dict[str, dict[str, list[str]]] = { "topwear": { "formal": ["shirt", "polo", "jacket"], "party": ["shirt", "jacket", "polo"], "sports": ["jersey", "t-shirt", "hoodie"], "casual": ["shirt", "t-shirt", "polo", "jacket", "hoodie"], }, "bottomwear": { "formal": ["trousers", "pants"], "party": ["trousers", "pants", "jeans"], "sports": ["joggers", "shorts", "tights", "leggings"], "casual": ["jeans", "pants", "shorts", "joggers", "trousers"], }, } SCRAPER_FORMAL_DISALLOWED = { "hoodie", "sweatshirt", "joggers", "shorts", "tank top", 
"tights", "leggings", } SCRAPER_RELEVANCE_EXCLUDE_TOKENS = { "sock", "socks", "trunk", "trunks", "boxer", "brief", "briefs", "underwear", "bra", "bralette", "panty", "panties", "bikini", "swim", "swimsuit", "belt", "cap", "hat", "beanie", "wallet", "bag", "backpack", "watch", "shoe", "sneaker", "boot", "sandals", "slippers", } SCRAPER_CATEGORY_KEYWORDS: dict[str, set[str]] = { "shirt": {"shirt", "formal shirt", "oxford", "button-down", "button up"}, "polo": {"polo"}, "jacket": {"jacket", "blazer", "suit jacket", "sport coat", "coat"}, "t-shirt": {"t-shirt", "tee", "crew neck"}, "hoodie": {"hoodie"}, "trousers": {"trouser", "trousers", "tailored"}, "pants": {"pant", "pants", "chino"}, "jeans": {"jeans", "denim"}, "shorts": {"shorts"}, "joggers": {"jogger", "joggers"}, } def _allowed_categories(target_category: str, occasion_bucket: str) -> list[str]: target = target_category if target_category in {"topwear", "bottomwear"} else "topwear" categories = list(SCRAPER_CATEGORY_POLICY.get(target, {}).get(occasion_bucket, [])) if not categories: categories = list(SCRAPER_CATEGORY_POLICY[target]["casual"]) if occasion_bucket == "formal": categories = [category for category in categories if category not in SCRAPER_FORMAL_DISALLOWED] return categories def _resolve_target_category(requested_target: str, wardrobe_snapshot: dict[str, Any]) -> str: if requested_target in {"topwear", "bottomwear"}: return requested_target counts = wardrobe_snapshot.get("counts") if isinstance(wardrobe_snapshot.get("counts"), dict) else {} top_count = sum(value for key, value in counts.items() if key.startswith("topwear|")) bottom_count = sum(value for key, value in counts.items() if key.startswith("bottomwear|")) if top_count <= bottom_count: return "topwear" return "bottomwear" def _product_text_for_relevance(product: dict[str, Any]) -> str: name = str(product.get("name") or "") url = str(product.get("item_link") or "") color = str(product.get("color") or "") brand = str(product.get("brand") or 
"") return _norm(f"{name} {color} {brand} {url}") SCRAPER_COLOR_KEYWORDS: dict[str, set[str]] = { "black": {"black", "jet black"}, "white": {"white", "bright white", "off white", "off-white"}, "navy": {"navy", "dark blue", "dk blue", "dress blues", "moonlit ocean", "midnight blue"}, "blue": {"blue", "navy", "dark blue", "dk blue", "dress blues", "ice blue", "light blue", "skyway", "moonlit ocean"}, "grey": {"grey", "gray", "dark grey", "dark gray", "steel grey", "steel gray", "charcoal"}, "gray": {"grey", "gray", "dark grey", "dark gray", "steel grey", "steel gray", "charcoal"}, "beige": {"beige", "sand", "tan", "stone", "morel", "oatmeal", "cornstalk", "cream", "camel"}, "brown": {"brown", "tan", "morel"}, "olive": {"olive", "khaki"}, "green": {"green", "olive", "khaki"}, "red": {"red", "brick red", "winetasting", "wine"}, "maroon": {"maroon", "burgundy", "wine", "winetasting"}, } def _color_keywords_for_relevance(color: str) -> set[str]: normalized = extract_base_color(color or "") or _norm(color) if not normalized or normalized == "unknown": return set() return SCRAPER_COLOR_KEYWORDS.get(normalized, {normalized}) def _matches_planned_color(product: dict[str, Any], planned_color: str) -> bool: keywords = _color_keywords_for_relevance(planned_color) if not keywords: return True text = _product_text_for_relevance(product) return any(keyword in text for keyword in keywords) def _is_relevant_scraped_product( product: dict[str, Any], target_slot: str, planned_category: str, planned_color: str, occasion_bucket: str, ) -> bool: text = _product_text_for_relevance(product) if not text: return False if any(token in text for token in SCRAPER_RELEVANCE_EXCLUDE_TOKENS): return False planned = _norm(planned_category) planned_keywords = SCRAPER_CATEGORY_KEYWORDS.get(planned, {planned} if planned else set()) if planned_keywords and not any(keyword in text for keyword in planned_keywords): return False if not _matches_planned_color(product, planned_color): return False if 
target_slot == "topwear": topwear_terms = {"shirt", "polo", "blazer", "jacket", "coat", "t-shirt", "tee", "hoodie"} if not any(term in text for term in topwear_terms): return False if target_slot == "bottomwear": bottom_terms = {"trousers", "pants", "jeans", "joggers", "shorts"} if not any(term in text for term in bottom_terms): return False if occasion_bucket == "formal": formal_blocked = {"t-shirt", "tee", "hoodie", "jogger", "shorts", "sport"} if any(token in text for token in formal_blocked): return False return True def _complementary_slot(slot: str) -> str: return "bottomwear" if slot == "topwear" else "topwear" def _format_matched_label(item: dict[str, Any]) -> str: color = str(item.get("color") or "").strip().lower() category = str(item.get("category") or item.get("type") or "item").strip().lower() if color and category: return f"{color} {category}" return color or category or "item" def _build_product_match_context( product: dict[str, Any], query_plan: dict[str, Any], wardrobe_snapshot: dict[str, Any], target_category: str, occasion: str, ) -> dict[str, Any]: product_slot = target_category if target_category in {"topwear", "bottomwear"} else "topwear" matching_slot = _complementary_slot(product_slot) wardrobe_items = [ _normalize_wardrobe_item(item) for item in (wardrobe_snapshot.get("items") or []) if isinstance(item, dict) ] matching_items = [item for item in wardrobe_items if _norm(item.get("type")) == matching_slot] product_name = str(product.get("name") or "Suggested product").strip() product_category = str(query_plan.get("category") or product_slot).strip() or product_slot product_color = str(query_plan.get("color") or "unknown").strip() or "unknown" product_style = str(query_plan.get("style_direction") or query_plan.get("occasion_bucket") or occasion or "casual").strip() or "casual" product_stub = { "id": str(product.get("item_link") or product_name or "product"), "type": product_slot, "category": product_category, "color": product_color, "pattern": 
"solid", "fabric": "unknown", "fit": product_style, "style": product_style, "occasion": str(query_plan.get("occasion_bucket") or occasion or "casual").strip() or "casual", "season": "all-season", } scored_matches: list[dict[str, Any]] = [] for candidate in matching_items: if product_slot == "topwear": scored = score_pair_full(product_stub, candidate, occasion) else: scored = score_pair_full(candidate, product_stub, occasion) scored_matches.append( { "id": candidate.get("id"), "type": candidate.get("type"), "category": candidate.get("category"), "color": candidate.get("color"), "score": scored.get("score", 0), "reason": scored.get("reason", ""), } ) scored_matches.sort(key=lambda entry: (int(entry.get("score") or 0), str(entry.get("color") or "")), reverse=True) matched_garments = scored_matches[:3] if matched_garments: matched_labels = [_format_matched_label(item) for item in matched_garments] if len(matched_labels) == 1: matched_text = matched_labels[0] elif len(matched_labels) == 2: matched_text = f"{matched_labels[0]} and {matched_labels[1]}" else: matched_text = f"{', '.join(matched_labels[:-1])}, and {matched_labels[-1]}" reason = ( f"{product_name} is a strong {product_slot} choice because it pairs cleanly with your {matching_slot} pieces like {matched_text}. " f"The match is only evaluated against {matching_slot} garments, so no top-top or bottom-bottom pairing is used." ) else: reason = ( f"{product_name} fits as a {product_slot} recommendation, but there were no {matching_slot} wardrobe items available to compare against." 
) return { "reason": reason, "match_score": int(matched_garments[0]["score"]) if matched_garments else 0, "matched_with_slot": matching_slot, "matched_garments": matched_garments, } def _enrich_scraper_products_with_matches( products: list[dict[str, Any]], query_plan: dict[str, Any], wardrobe_snapshot: dict[str, Any], target_category: str, occasion: str, ) -> list[dict[str, Any]]: enriched_products: list[dict[str, Any]] = [] for product in products: if not isinstance(product, dict): continue enriched_products.append( { **product, **_build_product_match_context( product=product, query_plan=query_plan, wardrobe_snapshot=wardrobe_snapshot, target_category=target_category, occasion=occasion, ), } ) return enriched_products def _build_scraper_planning_context( wardrobe_snapshot: dict[str, Any], requested_target_category: str, occasion: str, gender: str, filters: dict[str, Any], ) -> dict[str, Any]: items = wardrobe_snapshot.get("items") if isinstance(wardrobe_snapshot.get("items"), list) else [] occasion_bucket = _occasion_bucket(occasion) resolved_target = _resolve_target_category(requested_target_category, wardrobe_snapshot) reference_slot = "bottomwear" if resolved_target == "topwear" else "topwear" slot_colors: dict[str, list[str]] = {"topwear": [], "bottomwear": [], "others": []} slot_categories: dict[str, list[str]] = {"topwear": [], "bottomwear": [], "others": []} for raw_item in items: if not isinstance(raw_item, dict): continue slot = _norm(raw_item.get("type")) slot_key = slot if slot in slot_colors else "others" slot_colors[slot_key].append(str(raw_item.get("color") or "")) slot_categories[slot_key].append(str(raw_item.get("category") or "")) preferred_colors = [str(value) for value in (filters.get("preferred_colors") or []) if str(value).strip()] color_resonance_scores = _rank_color_resonance( slot_colors=slot_colors, reference_slot=reference_slot, preferred_colors=preferred_colors, occasion_bucket=occasion_bucket, ) color_shortlist = [ 
str(entry.get("color") or "").strip() for entry in color_resonance_scores if str(entry.get("color") or "").strip() ][:6] allowed = _allowed_categories(resolved_target, occasion_bucket) style_hint = "formal-smart" if occasion_bucket == "formal" else "occasion-aligned" if occasion_bucket == "party": style_hint = "elevated-party" if occasion_bucket == "sports": style_hint = "performance-athletic" def reference_score(item: dict[str, Any]) -> int: score = 0 slot = _norm(item.get("type")) if slot == reference_slot: score += 4 category = _norm(item.get("category")) if category and category != "unknown": score += 2 color = extract_base_color(item.get("color") or "") if color and color != "unknown": score += 2 item_style = _norm(item.get("style") or item.get("occasion") or "") if occasion_bucket == "formal" and any(token in item_style for token in {"formal", "work", "office", "business"}): score += 3 if occasion_bucket == "party" and any(token in item_style for token in {"party", "festive", "ethnic"}): score += 3 if occasion_bucket == "casual" and "casual" in item_style: score += 2 return score ranked_reference_items = [item for item in items if isinstance(item, dict)] ranked_reference_items.sort(key=reference_score, reverse=True) reference_item_ids = [str(item.get("id") or "") for item in ranked_reference_items if str(item.get("id") or "").strip()][:4] return { "requested_target_category": requested_target_category, "resolved_target_category": resolved_target, "occasion_bucket": occasion_bucket, "gender_preference": _normalize_scraper_gender(gender), "allowed_categories": allowed, "color_shortlist": color_shortlist[:6], "color_resonance_scores": color_resonance_scores[:8], "style_direction": style_hint, "reference_slot": reference_slot, "reference_item_ids": reference_item_ids, "reference_items": [ { "id": item.get("id"), "type": item.get("type"), "category": item.get("category"), "color": item.get("color"), "style": item.get("style"), "occasion": item.get("occasion"), } 
for item in ranked_reference_items[:6] ], "slot_dominant_categories": { "topwear": _top_terms(slot_categories.get("topwear", []), limit=4), "bottomwear": _top_terms(slot_categories.get("bottomwear", []), limit=4), }, "slot_dominant_colors": { "topwear": _top_terms(slot_colors.get("topwear", []), limit=4), "bottomwear": _top_terms(slot_colors.get("bottomwear", []), limit=4), }, } def _normalize_planned_category(raw_value: Any, allowed: list[str]) -> str: normalized = _norm(raw_value) if normalized: for category in allowed: if normalized == category or normalized in category or category in normalized: return category return allowed[0] if allowed else "shirt" def _extract_explicit_category_from_prompt(user_prompt: str, allowed: list[str]) -> str | None: normalized_prompt = _norm(user_prompt) if not normalized_prompt: return None # Map common synonyms to policy categories. synonym_map: dict[str, str] = { "blazer": "jacket", "sport coat": "jacket", "suit jacket": "jacket", "tee": "t-shirt", "tshirt": "t-shirt", "trouser": "trousers", "pant": "pants", } allowed_set = {str(value).strip().lower() for value in allowed if str(value).strip()} # Direct allowed-category mention takes highest priority. for category in allowed: normalized_category = str(category).strip().lower() if normalized_category and normalized_category in normalized_prompt: return category # Then check synonym map against allowed categories. 
for phrase, mapped_category in synonym_map.items(): if phrase in normalized_prompt and mapped_category in allowed_set: for category in allowed: if str(category).strip().lower() == mapped_category: return category return None def _normalize_planned_color(raw_value: Any, color_shortlist: list[str]) -> str: normalized = extract_base_color(raw_value or "") if normalized and normalized in color_shortlist: return normalized if normalized: for candidate in color_shortlist: if candidate in normalized or normalized in candidate: return candidate return "" def _resolve_color_fallback(color_shortlist: list[str], color_resonance_scores: list[dict[str, Any]]) -> str: for entry in color_resonance_scores: color = str(entry.get("color") or "").strip() if color: return color if color_shortlist: return color_shortlist[0] return "black" def _normalize_reference_ids( raw_ids: Any, valid_ids: list[str], fallback_ids: list[str], limit: int = 4, ) -> list[str]: valid_set = {value for value in valid_ids if value} normalized: list[str] = [] if isinstance(raw_ids, list): for value in raw_ids: item_id = str(value or "").strip() if not item_id or item_id in normalized or item_id not in valid_set: continue normalized.append(item_id) if len(normalized) >= limit: return normalized for item_id in fallback_ids: if item_id and item_id not in normalized: normalized.append(item_id) if len(normalized) >= limit: break return normalized def _build_planned_query( gender: str | None, color: str, category: str, occasion: str, style_direction: str, ) -> str: parts = [ str(gender or "").strip(), color, style_direction, category, f"for {occasion.strip()}" if occasion.strip() else "", ] return " ".join(part for part in parts if part).strip() def _fallback_scraper_plan( planning_context: dict[str, Any], occasion: str, gender: str, reason: str, ) -> dict[str, Any]: allowed = [str(value) for value in planning_context.get("allowed_categories", []) if str(value).strip()] color_shortlist = [str(value) for value in 
planning_context.get("color_shortlist", []) if str(value).strip()] style_direction = str(planning_context.get("style_direction") or "occasion-aligned") resolved_target = str(planning_context.get("resolved_target_category") or "topwear") plan_gender = _normalize_scraper_gender(gender) category = allowed[0] if allowed else "shirt" color = color_shortlist[0] if color_shortlist else "black" query = _build_planned_query(plan_gender, color, category, occasion, style_direction) return { "target_category": resolved_target, "color": color, "category": category, "gender": plan_gender, "style_direction": style_direction, "reference_item_ids": planning_context.get("reference_item_ids", []), "query": query, "reason": reason, "source": "fallback", } def _generate_scraper_plan_with_nemotron( occasion: str, gender: str, preferences: str, user_prompt: str, target_category: str, filters: dict[str, Any], max_products: int | None, store: str, strict_nemotron: bool = False, ) -> dict[str, Any]: wardrobe_snapshot = _wardrobe_metadata_snapshot() requested_target = _normalize_target_category(target_category) safe_filters = filters if isinstance(filters, dict) else {} planning_context = _build_scraper_planning_context( wardrobe_snapshot=wardrobe_snapshot, requested_target_category=requested_target, occasion=occasion, gender=gender, filters=safe_filters, ) prompt = _build_scraper_plan_prompt( occasion=occasion, gender=gender, preferences=preferences, user_prompt=user_prompt, target_category=requested_target, filters=safe_filters, wardrobe_snapshot=wardrobe_snapshot, planning_context=planning_context, max_products=max_products, store=store, ) plan_source = "nemotron" plan_error: str | None = None try: model_text = run_scraper_planner_text_inference(prompt, max_tokens=SCRAPER_PLANNER_MAX_TOKENS) parsed = _recover_scraper_plan_from_text( model_text=model_text, planning_context=planning_context, occasion=occasion, gender=gender, ) if not isinstance(parsed, dict) or not parsed: raise 
NvidiaPayloadError("Nemotron scraper planner returned empty or invalid JSON payload.") except Exception as exc: if strict_nemotron: raise NvidiaPayloadError(f"Nemotron planner unavailable: {exc}") from exc plan_source = "fallback" plan_error = str(exc) parsed = _fallback_scraper_plan( planning_context=planning_context, occasion=occasion, gender=gender, reason=( "Live Nemotron query planning was unavailable, so a deterministic fallback planner was used." ), ) resolved_target = _normalize_target_category( parsed.get("target_category") or planning_context.get("resolved_target_category") or requested_target ) if resolved_target == "both": resolved_target = str(planning_context.get("resolved_target_category") or "topwear") allowed = _allowed_categories( target_category=resolved_target, occasion_bucket=str(planning_context.get("occasion_bucket") or _occasion_bucket(occasion)), ) occasion_bucket = str(planning_context.get("occasion_bucket") or _occasion_bucket(occasion)) color_shortlist = [str(value) for value in planning_context.get("color_shortlist", []) if str(value).strip()] color_resonance_scores = [ entry for entry in (planning_context.get("color_resonance_scores") or []) if isinstance(entry, dict) ] color = _normalize_planned_color(parsed.get("color"), color_shortlist) if not color: color = _resolve_color_fallback(color_shortlist, color_resonance_scores) if color_shortlist and color not in color_shortlist: color = _resolve_color_fallback(color_shortlist, color_resonance_scores) explicit_category = _extract_explicit_category_from_prompt(user_prompt, allowed) category = explicit_category or _normalize_planned_category(parsed.get("category"), allowed) requested_gender = _normalize_scraper_gender(gender) inferred_or_planned_gender = _normalize_scraper_gender( str(parsed.get("gender") or planning_context.get("gender_preference") or gender or "") ) plan_gender = requested_gender or inferred_or_planned_gender style_direction = str(parsed.get("style_direction") or 
planning_context.get("style_direction") or "occasion-aligned") valid_reference_ids = [ str(item.get("id") or "") for item in (wardrobe_snapshot.get("items") or []) if isinstance(item, dict) ] reference_item_ids = _normalize_reference_ids( raw_ids=parsed.get("reference_item_ids"), valid_ids=valid_reference_ids, fallback_ids=[str(value) for value in planning_context.get("reference_item_ids", [])], limit=4, ) query = str(parsed.get("query") or "").strip() if not query or explicit_category is not None or requested_gender is not None: query = _build_planned_query(plan_gender, color, category, occasion, style_direction) resonance_lead = color_resonance_scores[0] if color_resonance_scores else {} default_grounding = ( f"Selected {color} from DB metadata resonance: " f"reference_count={int(resonance_lead.get('reference_count') or 0)}, " f"global_count={int(resonance_lead.get('global_count') or 0)}." if resonance_lead else "Selected color using wardrobe metadata shortlist and reference-slot compatibility." 
) wardrobe_grounding = str(parsed.get("wardrobe_grounding") or default_grounding) reason = str(parsed.get("reason") or "Nemotron generated a wardrobe-aware shopping query.") recommendation = ScraperRecommendation( color=color, category=category, gender=plan_gender, ) store_key = _normalize_store_name(store or SCRAPER_DEFAULT_STORE or "nike") search_urls = _build_store_search_urls_from_query( query, store=store_key, gender=plan_gender, wardrobe_items=list(wardrobe_snapshot.get("items") or []), requested_category=requested_target, ) if not search_urls: search_urls = _build_store_search_urls_from_recommendation( recommendation, store=store_key, occasion=occasion, ) intermediate_steps: list[dict[str, Any]] = [ { "step": "plan", "store": store_key, "query": query, "target_category": resolved_target, "color": color, "category": category, } ] generated_urls = list(dict.fromkeys(search_urls)) scrape_limit = max_products if isinstance(max_products, int) and max_products > 0 else 12 scraped_products: list[dict[str, Any]] = [] fallback_products: list[dict[str, Any]] = [] seen_links: set[str] = set() scrape_errors: list[str] = [] intermediate_steps.append( { "step": "url_generation", "query": query, "url_count": len(generated_urls), "total_urls": len(generated_urls), } ) if not generated_urls: intermediate_steps.append( { "step": "diagnostic", "message": "Planner succeeded but no search URLs were generated.", "attempted_url_count": 0, } ) else: for index, search_url in enumerate(generated_urls): if len(scraped_products) >= scrape_limit: break try: extracted = _extract_store_product_summaries(search_url=search_url, store=store_key) except requests.RequestException as exc: error_message = f"url[{index + 1}] scrape failed: {exc}" scrape_errors.append(error_message) intermediate_steps.append( { "step": "scrape", "query": query, "url_count": len(generated_urls), "new_products": 0, "total_products": len(scraped_products), "errors": [error_message], } ) continue new_products = 0 for 
product in extracted: if not isinstance(product, dict): continue item_link = str(product.get("item_link") or "").strip() if not item_link or item_link in seen_links: continue seen_links.add(item_link) if _is_relevant_scraped_product( product=product, target_slot=resolved_target, planned_category=category, planned_color=color, occasion_bucket=occasion_bucket, ): scraped_products.append(product) new_products += 1 else: fallback_products.append(product) if len(scraped_products) >= scrape_limit: break intermediate_steps.append( { "step": "scrape", "query": query, "url_count": len(generated_urls), "new_products": new_products, "total_products": len(scraped_products), } ) if not scraped_products and fallback_products and not _color_keywords_for_relevance(color): scraped_products = fallback_products[:scrape_limit] intermediate_steps.append( { "step": "scrape_fallback", "query": query, "new_products": len(scraped_products), "total_products": len(scraped_products), "message": "Used non-filtered scrape fallback because strict relevance filtering returned no products.", } ) elif not scraped_products and fallback_products: intermediate_steps.append( { "step": "scrape_filter", "query": query, "rejected_products": len(fallback_products), "total_products": 0, "message": "Rejected scraped products because none matched the planned color and category.", } ) query_plan_payload = { "color": color, "category": category, "gender": plan_gender, "query": query, "final_query": query, "reason": reason, "wardrobe_grounding": wardrobe_grounding, "source": plan_source, "target_category": resolved_target, "style_direction": style_direction, "occasion_bucket": planning_context.get("occasion_bucket"), "reference_item_ids": reference_item_ids, "color_resonance_scores": color_resonance_scores[:4], } enriched_products = _enrich_scraper_products_with_matches( products=scraped_products, query_plan=query_plan_payload, wardrobe_snapshot=wardrobe_snapshot, target_category=resolved_target, 
occasion=occasion, ) product_urls = [ str(product.get("item_link") or "").strip() for product in enriched_products if str(product.get("item_link") or "").strip() ] response_payload: dict[str, Any] = { "occasion": occasion, "gender": gender, "preferences": preferences, "wardrobe_snapshot": wardrobe_snapshot, "query_plan": query_plan_payload, "store": store_key, "search_urls": generated_urls, "product_urls": product_urls, "products": enriched_products, "count": len(enriched_products), "intermediate_steps": intermediate_steps, "final_query": query, "plan_source": plan_source, "plan_error": plan_error, "scrape_error": "; ".join(scrape_errors) if scrape_errors else None, "target_category": resolved_target, } response_payload["saved_json_path"] = _save_scraper_json_payload("product_urls", response_payload) return _store_scraper_runtime_result(response_payload) def _build_shopping_suggestions_from_scraper( occasion: str, target_category: str, gender_preference: str, filters: dict[str, Any], max_results: int, store: str, ) -> dict[str, Any]: preferences = ", ".join( str(value) for value in [ *([item for item in filters.get("preferred_colors", []) if item]), *([item for item in filters.get("preferred_patterns", []) if item]), *([item for item in filters.get("preferred_styles", []) if item]), *([item for item in filters.get("preferred_fabrics", []) if item]), *([item for item in filters.get("preferred_fits", []) if item]), *([item for item in filters.get("preferred_seasons", []) if item]), *([item for item in filters.get("include_keywords", []) if item]), ] ) runtime_payload = _generate_scraper_plan_with_nemotron( occasion=occasion, gender=gender_preference, preferences=preferences, user_prompt=preferences, target_category=target_category, filters=filters, max_products=max_results, store=store, ) products = list(runtime_payload.get("products") or []) query_plan = dict(runtime_payload.get("query_plan") or {}) suggestion_items: list[dict[str, Any]] = [] for index, product in 
enumerate(products[:max_results]): item_link = str(product.get("item_link") or "").strip() if not item_link: continue suggestion_items.append( { "target_category": target_category if target_category != "both" else ("topwear" if index % 2 == 0 else "bottomwear"), "title": str(product.get("name") or query_plan.get("query") or "Suggested Product"), "url": item_link, "image_url": str(product.get("image_url") or ""), "store": str(runtime_payload.get("store") or store or "nike").title(), "match_score": max(65, 95 - index * 4), "reason": str(product.get("reason") or query_plan.get("reason") or "Nemotron generated a wardrobe-aware shopping query."), "product_category": str(query_plan.get("category") or "shopping"), "color": str(query_plan.get("color") or "black"), "pattern": "solid", "search_query": str(query_plan.get("query") or occasion), "scrape_status": "live", "scrape_error": None, "product_gender": str(query_plan.get("gender") or gender_preference or "unknown") or "unknown", "matched_with_slot": str(product.get("matched_with_slot") or ("bottomwear" if target_category == "topwear" else "topwear")), "matched_garments": product.get("matched_garments") or [], } ) return { "occasion": occasion, "target_category": target_category, "gender_preference": gender_preference, "search_filters": filters, "suggestions": suggestion_items, "error": None if suggestion_items else "No live scraper results were returned.", "saved_json_path": runtime_payload.get("saved_json_path", ""), "runtime_id": runtime_payload.get("runtime_id", ""), "query_plan": query_plan, } app = FastAPI(title="Wardrobe Classifier API", version="2.0.0") app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) _OUTFIT_GRID_SESSIONS: dict[str, dict[str, Any]] = {} @asynccontextmanager async def lifespan(app: FastAPI): init_db() cache_purge_expired() yield app.router.lifespan_context = lifespan CLASSIFICATION_PROMPT = """You are a fashion expert 
analyzing a garment image for a wardrobe assistant app. Carefully examine the primary clothing item in the image and return only a valid JSON object. Rules: - Focus only on the dominant foreground garment. Ignore the person's face/body, background, hangers, mannequins, and room objects. - If multiple garments are visible, classify the single most prominent garment only. - Be specific with colors (for example, "Navy Blue" instead of "Blue", "Olive Green" instead of "Green"). - If any attribute is not clearly visible, use "Unknown". - For "color", include all visible colors in one string (for example, "White with Black stripes"). - Do not include any explanation, markdown, or text outside the JSON object. Return this exact JSON structure: { "type": "e.g. T-Shirt / Jeans / Dress / Jacket / Hoodie / Shorts / Saree / Kurta", "category": "Topwear / Bottomwear / Footwear / Outerwear / Ethnic / Accessories", "color": "exact specific color name not basic colours", "pattern": "Solid / Striped / Checkered / Floral / Printed / Graphic / Embroidered / Tie-Dye", "fabric": "Cotton / Denim / Wool / Polyester / Silk / Linen / Leather / Unknown", "fit": "Slim / Regular / Oversized / Fitted / Relaxed / Unknown", "occasion": "Casual / Formal / Sports / Party / Work / Ethnic", "season": "Summer / Winter / Monsoon / All-Season" }""" NVIDIA_INVOKE_URL = os.getenv( "NVIDIA_INVOKE_URL", "https://integrate.api.nvidia.com/v1/chat/completions", ) NVIDIA_MODEL_ID = os.getenv("NVIDIA_MODEL_ID", "qwen/qwen3.5-122b-a10b") NVIDIA_MAX_TOKENS = int(os.getenv("NVIDIA_MAX_TOKENS", "16384")) NVIDIA_REASONING_MAX_TOKENS = int(os.getenv("NVIDIA_REASONING_MAX_TOKENS", "16384")) NVIDIA_TEMPERATURE = float(os.getenv("NVIDIA_TEMPERATURE", "0.60")) NVIDIA_TOP_P = float(os.getenv("NVIDIA_TOP_P", "0.95")) NVIDIA_TIMEOUT_SECONDS = int(os.getenv("NVIDIA_TIMEOUT_SECONDS", "180")) NVIDIA_MAX_RETRIES = int(os.getenv("NVIDIA_MAX_RETRIES", "3")) NVIDIA_RETRY_BACKOFF_SECONDS = 
float(os.getenv("NVIDIA_RETRY_BACKOFF_SECONDS", "0.8")) NVIDIA_ENABLE_THINKING = str(os.getenv("NVIDIA_ENABLE_THINKING", "false")).strip().lower() == "true" NVIDIA_IMAGE_MAX_DIM = int(os.getenv("NVIDIA_IMAGE_MAX_DIM", "1400")) NVIDIA_FALLBACK_MODEL_IDS = [ model_id.strip() for model_id in os.getenv("NVIDIA_FALLBACK_MODEL_IDS", "nvidia/nemotron-3-nano-omni-30b-a3b-reasoning").split(",") if model_id.strip() ] NVIDIA_API_KEY_MISSING_DETAIL = "NVIDIA_API_KEY is not configured on this Space." class NvidiaGatewayError(RuntimeError): def __init__(self, message: str, status_code: int = 502) -> None: super().__init__(message) self.status_code = status_code class NvidiaPayloadError(RuntimeError): pass class NvidiaTokenLimitError(NvidiaPayloadError): pass def _nvidia_api_key() -> str: return os.getenv("NVIDIA_API_KEY", "").strip() def _candidate_model_ids(primary_model_id: str) -> list[str]: model_ids = [primary_model_id, *NVIDIA_FALLBACK_MODEL_IDS] deduped: list[str] = [] seen: set[str] = set() for model_id in model_ids: key = model_id.strip() if not key or key in seen: continue deduped.append(key) seen.add(key) return deduped def _is_degraded_function_error(exc: Exception) -> bool: if not isinstance(exc, NvidiaGatewayError): return False return "DEGRADED function cannot be invoked" in str(exc) OUTFIT_GRID_CELL_SIZE = int(os.getenv("OUTFIT_GRID_CELL_SIZE", "224")) OUTFIT_GRID_LABEL_HEIGHT = int(os.getenv("OUTFIT_GRID_LABEL_HEIGHT", "28")) OUTFIT_GRID_PADDING = int(os.getenv("OUTFIT_GRID_PADDING", "12")) OUTFIT_GRID_FETCH_TIMEOUT_SECONDS = int(os.getenv("OUTFIT_GRID_FETCH_TIMEOUT_SECONDS", "12")) OUTFIT_GRID_SESSION_TTL_SECONDS = int(os.getenv("OUTFIT_GRID_SESSION_TTL_SECONDS", "3600")) OUTFIT_GRID_SESSION_DIR = Path( os.getenv("OUTFIT_GRID_SESSION_DIR", str(Path(tempfile.gettempdir()) / "wardrobe-grid-sessions")) ) OUTFIT_GRID_MAX_TOP_ITEMS = int(os.getenv("OUTFIT_GRID_MAX_TOP_ITEMS", "4")) OUTFIT_GRID_MAX_BOTTOM_ITEMS = int(os.getenv("OUTFIT_GRID_MAX_BOTTOM_ITEMS", "4")) 
OUTFIT_ANCHOR_MIN_SCORE = int(os.getenv("OUTFIT_ANCHOR_MIN_SCORE", "45")) OUTFIT_TEXT_PRESELECT_ENABLED = str(os.getenv("OUTFIT_TEXT_PRESELECT_ENABLED", "false")).strip().lower() == "true" OUTFIT_TEXT_SELECTOR_MAX_TOKENS = int(os.getenv("OUTFIT_TEXT_SELECTOR_MAX_TOKENS", "400")) OUTFIT_AI_MAX_TOKENS = int(os.getenv("OUTFIT_AI_MAX_TOKENS", "4096")) OUTFIT_TEXT_SELECTOR_NAME = "nemotron-text-preselect-v1" OUTFIT_AI_SCORER_NAME = "ai-grid-v1" OUTFIT_FALLBACK_SCORER_NAME = "fallback-current-v1" OUTFIT_GRID_SCORING_PROMPT_TEMPLATE = """You are an expert multimodal outfit matching engine. Task: Evaluate every valid outfit combination shown in the attached wardrobe grid image and rank the best outfits for the given context. Grid semantics: - Row 1 contains topwear only. - Row 2 contains bottomwear only. - Row 3 contains optional "Others" items (footwear, accessories, outerwear, or uncategorized garments). If Row 3 is absent, ignore this slot. - Each cell is labeled with a coordinate like 1:1, 1:2, 2:1, 2:2, 3:1. - A valid outfit is exactly one Row 1 item plus one Row 2 item, and optionally one Row 3 item. User context: - Occasion: {occasion} - Region: {region} - Weather JSON: {weather_json} - User profile JSON: {user_profile_json} - Anchor mode: {anchor_mode} - Locked top index: {locked_top_index} - Locked bottom index: {locked_bottom_index} - Locked other index: {locked_other_index} - Anchor item JSON: {anchor_item_json} Wardrobe metadata map: {metadata_json} Scoring rubric: - occasion relevance - color harmony - pattern compatibility - fit alignment - style coherence - seasonal/contextual appropriateness Instructions: 1. Use both the composite image and the metadata map together. 2. Treat the locked item as the fixed styling anchor whenever Anchor mode is not "none". 3. Evaluate all {combination_count} possible valid combinations exactly once before ranking. 4. 
If a locked top, locked bottom, or locked other index is provided, only consider combinations containing that index. 5. Assign each retained combination a final score from 0 to 100 and this score breakdown: occasion, color, pattern, fit, style, season 6. Return only valid JSON. No markdown and no prose outside JSON. 7. Return at most the top {top_k} outfits in descending score order. 8. When a Row 3 item is part of the outfit, include its cell in "other_index". If no Row 3 item is used, set "other_index" to null. Return this exact JSON shape: {{ "recommendations": [ {{ "top_index": "1:1", "bottom_index": "2:1", "other_index": "3:1", "score": 92, "breakdown": {{ "occasion": 94, "color": 91, "pattern": 89, "fit": 90, "style": 93, "season": 88 }}, "reason": "Max 15 words. Short user-facing explanation grounded in visual + metadata evidence.", "tip": "Max 10 words. One concise styling tip." }} ] }}""" # --------------------------------------------------------------------------- # Model helpers # --------------------------------------------------------------------------- def _image_to_data_url(image: Image.Image) -> str: if NVIDIA_IMAGE_MAX_DIM > 0: image = image.copy() image.thumbnail((NVIDIA_IMAGE_MAX_DIM, NVIDIA_IMAGE_MAX_DIM), Image.Resampling.LANCZOS) buffer = io.BytesIO() image.save(buffer, format="PNG") image_b64 = base64.b64encode(buffer.getvalue()).decode("utf-8") return f"data:image/png;base64,{image_b64}" def _extract_text_from_nvidia_content(content: Any) -> str: if isinstance(content, str): return content if isinstance(content, list): parts: list[str] = [] for chunk in content: if isinstance(chunk, str): parts.append(chunk) continue if not isinstance(chunk, dict): continue for key in ("text", "content", "value"): value = chunk.get(key) if isinstance(value, str) and value: parts.append(value) break return "".join(parts).strip() if isinstance(content, dict): for key in ("text", "content", "value"): value = content.get(key) if isinstance(value, str) and value: 
return value return "" def _extract_nvidia_text(payload: dict[str, Any]) -> str: try: choice = payload["choices"][0] message = choice["message"] except (KeyError, IndexError, TypeError) as exc: raise NvidiaPayloadError(f"Unexpected NVIDIA API response shape: {payload}") from exc content = message.get("content") extracted_content = _extract_text_from_nvidia_content(content) if extracted_content: return extracted_content reasoning_content = message.get("reasoning_content") extracted_reasoning_content = _extract_text_from_nvidia_content(reasoning_content) if extracted_reasoning_content: return extracted_reasoning_content reasoning = message.get("reasoning") if isinstance(reasoning, str) and reasoning.strip(): return reasoning.strip() if choice.get("finish_reason") == "length": raise NvidiaTokenLimitError( "NVIDIA response hit max_tokens before final content was produced." ) raise NvidiaPayloadError( "Unexpected NVIDIA message payload: " f"finish_reason={choice.get('finish_reason')}, " f"message_keys={list(message.keys())}, " f"content={content!r}, " f"reasoning_content={reasoning_content!r}" ) def _extract_streamed_nvidia_text(response: requests.Response) -> str: chunks: list[str] = [] for raw_line in response.iter_lines(decode_unicode=True): if not raw_line: continue line = raw_line.strip() if not line or not line.startswith("data:"): continue data = line[5:].strip() if not data or data == "[DONE]": continue try: payload = json.loads(data) except json.JSONDecodeError: continue try: choice = payload["choices"][0] except (KeyError, IndexError, TypeError): continue delta = choice.get("delta") or {} if isinstance(delta, dict): content = _extract_text_from_nvidia_content(delta.get("content")) if content: chunks.append(content) reasoning = _extract_text_from_nvidia_content(delta.get("reasoning_content")) if reasoning: chunks.append(reasoning) alt_reasoning = _extract_text_from_nvidia_content(delta.get("reasoning")) if alt_reasoning: chunks.append(alt_reasoning) message = 
choice.get("message") or {} if isinstance(message, dict): final_content = _extract_text_from_nvidia_content(message.get("content")) if final_content: chunks.append(final_content) final_reasoning = _extract_text_from_nvidia_content(message.get("reasoning_content")) if final_reasoning: chunks.append(final_reasoning) text = "".join(chunks).strip() if text: return text raise NvidiaPayloadError("NVIDIA stream ended without returning any content.") def run_nvidia_inference(image: Image.Image, prompt: str, max_tokens: int = NVIDIA_MAX_TOKENS) -> str: api_key = _nvidia_api_key() if not api_key: raise RuntimeError(NVIDIA_API_KEY_MISSING_DETAIL) last_error: Exception | None = None for model_id in _candidate_model_ids(NVIDIA_MODEL_ID): current_max_tokens = max_tokens while current_max_tokens <= NVIDIA_REASONING_MAX_TOKENS: payload = { "model": model_id, "messages": [ { "role": "user", "content": [ {"type": "text", "text": prompt}, { "type": "image_url", "image_url": { "url": _image_to_data_url(image), }, }, ], } ], "max_tokens": current_max_tokens, "temperature": NVIDIA_TEMPERATURE, "top_p": NVIDIA_TOP_P, "stream": True, } if NVIDIA_ENABLE_THINKING: payload["chat_template_kwargs"] = {"enable_thinking": True} headers = { "Authorization": f"Bearer {api_key}", "Accept": "text/event-stream", "Content-Type": "application/json", } last_error = None for attempt in range(NVIDIA_MAX_RETRIES + 1): try: response = requests.post( NVIDIA_INVOKE_URL, headers=headers, json=payload, timeout=NVIDIA_TIMEOUT_SECONDS, ) if response.status_code in {429, 500, 502, 503, 504}: raise NvidiaGatewayError( f"NVIDIA API transient failure {response.status_code}: {response.text[:500]}", status_code=503 if response.status_code == 429 else 502, ) if response.status_code >= 400: raise NvidiaGatewayError( f"NVIDIA API request failed with {response.status_code}: {response.text[:500]}", status_code=502, ) return _extract_streamed_nvidia_text(response) except NvidiaTokenLimitError as exc: last_error = exc break 
except (requests.RequestException, NvidiaGatewayError) as exc: last_error = exc if _is_degraded_function_error(exc): break if attempt >= NVIDIA_MAX_RETRIES: break time.sleep(NVIDIA_RETRY_BACKOFF_SECONDS * (attempt + 1)) except NvidiaPayloadError: raise if isinstance(last_error, NvidiaTokenLimitError) and current_max_tokens < NVIDIA_REASONING_MAX_TOKENS: current_max_tokens = min(current_max_tokens * 2, NVIDIA_REASONING_MAX_TOKENS) continue if _is_degraded_function_error(last_error or Exception()): print(f"[nvidia] model degraded, trying fallback model: {model_id}") break if isinstance(last_error, NvidiaGatewayError): raise last_error if isinstance(last_error, requests.RequestException): raise NvidiaGatewayError(f"NVIDIA API request failed: {last_error}", status_code=502) from last_error if last_error is not None: raise last_error break if isinstance(last_error, Exception): raise NvidiaGatewayError( f"NVIDIA API request failed on all configured models ({', '.join(_candidate_model_ids(NVIDIA_MODEL_ID))}): {last_error}", status_code=502, ) from last_error raise NvidiaGatewayError( "NVIDIA API request failed after exhausting reasoning token budget.", status_code=502, ) def run_nvidia_text_inference(prompt: str, max_tokens: int = OUTFIT_TEXT_SELECTOR_MAX_TOKENS) -> str: api_key = _nvidia_api_key() if not api_key: raise RuntimeError(NVIDIA_API_KEY_MISSING_DETAIL) last_error: Exception | None = None for model_id in _candidate_model_ids(NVIDIA_MODEL_ID): current_max_tokens = max_tokens while current_max_tokens <= NVIDIA_REASONING_MAX_TOKENS: payload = { "model": model_id, "messages": [ { "role": "user", "content": prompt, } ], "max_tokens": current_max_tokens, "temperature": NVIDIA_TEMPERATURE, "top_p": NVIDIA_TOP_P, "stream": True, } if NVIDIA_ENABLE_THINKING: payload["chat_template_kwargs"] = {"enable_thinking": True} headers = { "Authorization": f"Bearer {api_key}", "Accept": "text/event-stream", "Content-Type": "application/json", } last_error = None for attempt in 
range(NVIDIA_MAX_RETRIES + 1): try: response = requests.post( NVIDIA_INVOKE_URL, headers=headers, json=payload, timeout=NVIDIA_TIMEOUT_SECONDS, ) if response.status_code in {429, 500, 502, 503, 504}: raise NvidiaGatewayError( f"NVIDIA API transient failure {response.status_code}: {response.text[:500]}", status_code=503 if response.status_code == 429 else 502, ) if response.status_code >= 400: raise NvidiaGatewayError( f"NVIDIA API request failed with {response.status_code}: {response.text[:500]}", status_code=502, ) return _extract_streamed_nvidia_text(response) except NvidiaTokenLimitError as exc: last_error = exc break except (requests.RequestException, NvidiaGatewayError) as exc: last_error = exc if _is_degraded_function_error(exc): break if attempt >= NVIDIA_MAX_RETRIES: break time.sleep(NVIDIA_RETRY_BACKOFF_SECONDS * (attempt + 1)) except NvidiaPayloadError: raise if isinstance(last_error, NvidiaTokenLimitError) and current_max_tokens < NVIDIA_REASONING_MAX_TOKENS: current_max_tokens = min(current_max_tokens * 2, NVIDIA_REASONING_MAX_TOKENS) continue if _is_degraded_function_error(last_error or Exception()): print(f"[nvidia] model degraded, trying fallback model: {model_id}") break if isinstance(last_error, NvidiaGatewayError): raise last_error if isinstance(last_error, requests.RequestException): raise NvidiaGatewayError(f"NVIDIA API request failed: {last_error}", status_code=502) from last_error if last_error is not None: raise last_error break if isinstance(last_error, Exception): raise NvidiaGatewayError( f"NVIDIA API request failed on all configured models ({', '.join(_candidate_model_ids(NVIDIA_MODEL_ID))}): {last_error}", status_code=502, ) from last_error raise NvidiaGatewayError( "NVIDIA API request failed after exhausting reasoning token budget.", status_code=502, ) def parse_json_from_text(text: str) -> dict[str, Any]: if not text: return {} stripped = text.strip() try: return json.loads(stripped) except json.JSONDecodeError: s, e = 
stripped.find("{"), stripped.rfind("}") if s != -1 and e != -1 and e > s: try: return json.loads(stripped[s:e + 1]) except json.JSONDecodeError: pass return {}


def normalize_specs(specs: dict[str, Any]) -> dict[str, str]:
    """Coerce a parsed garment-spec dict into the fixed eight-key string schema.

    Every value is stringified; any key missing from ``specs`` defaults to the
    literal ``"Unknown"`` so downstream consumers never see ``None``.
    """
    return { "type": str(specs.get("type", "Unknown")), "category": str(specs.get("category", "Unknown")), "color": str(specs.get("color", "Unknown")), "pattern": str(specs.get("pattern", "Unknown")), "fabric": str(specs.get("fabric", "Unknown")), "fit": str(specs.get("fit", "Unknown")), "occasion": str(specs.get("occasion", "Unknown")), "season": str(specs.get("season", "Unknown")), }


def _clamp_score(value: Any, fallback: int = 0) -> int:
    """Coerce ``value`` to an int score clamped to [0, 100].

    Strings have their first signed numeric run extracted via regex before
    conversion; anything unconvertible falls back to ``fallback`` (then is
    still clamped).
    """
    if isinstance(value, str):
        # Pull the first number out of free-form model output like "score: 87/100".
        match = re.search(r"-?\d+(?:\.\d+)?", value)
        value = match.group(0) if match else value
    try:
        score = int(round(float(value)))
    except (TypeError, ValueError):
        score = fallback
    return max(0, min(100, score))


def _safe_metadata_item(item: dict[str, Any]) -> dict[str, Any]:
    """Project a wardrobe item to the compact metadata dict fed to the AI grid scorer.

    ``slot`` duplicates ``type``.
    NOTE(review): ``occasion`` is populated from ``item["style"]`` rather than
    ``item["occasion"]`` — appears deliberate, but confirm against the prompt template.
    """
    return { "id": item.get("id"), "slot": item.get("type"), "type": item.get("type"), "category": item.get("category"), "color": item.get("color"), "pattern": item.get("pattern"), "fabric": item.get("fabric"), "fit": item.get("fit"), "occasion": item.get("style"), "season": item.get("season"), }


def _placeholder_grid_tile(index_label: str, item: dict[str, Any], tile_size: int) -> Image.Image:
    """Render a light-grey placeholder tile carrying the grid index label plus the
    item's category and color (each truncated to 24 chars), used when no usable
    image is available."""
    tile = Image.new("RGB", (tile_size, tile_size), (245, 245, 245))
    draw = ImageDraw.Draw(tile)
    font = ImageFont.load_default()
    draw.rectangle((8, 8, tile_size - 8, tile_size - 8), outline=(180, 180, 180), width=2)
    draw.text((14, 14), index_label, fill=(40, 40, 40), font=font)
    draw.text((14, 36), str(item.get("category") or "Unknown")[:24], fill=(80, 80, 80), font=font)
    draw.text((14, 54), str(item.get("color") or "Unknown")[:24], fill=(80, 80, 80), font=font)
    return tile


# Fetches and square-crops an item image for the outfit grid; any failure or a
# non-http(s)/"memory://" URL falls back to the placeholder tile (continues below).
def _load_grid_tile(image_url: str, index_label: str, item: dict[str, Any]) -> Image.Image: if not image_url or image_url.startswith("memory://"): return 
_placeholder_grid_tile(index_label, item, OUTFIT_GRID_CELL_SIZE) parsed = urlparse(image_url) if parsed.scheme not in {"http", "https"}: return _placeholder_grid_tile(index_label, item, OUTFIT_GRID_CELL_SIZE) try: req = Request( image_url, headers={"User-Agent": "Mozilla/5.0", "Accept": "image/*,*/*;q=0.8", "Referer": image_url}, ) with urlopen(req, timeout=OUTFIT_GRID_FETCH_TIMEOUT_SECONDS) as resp: tile = Image.open(io.BytesIO(resp.read())).convert("RGB") return ImageOps.fit( tile, (OUTFIT_GRID_CELL_SIZE, OUTFIT_GRID_CELL_SIZE), method=Image.Resampling.LANCZOS, centering=(0.5, 0.5), ) except Exception: return _placeholder_grid_tile(index_label, item, OUTFIT_GRID_CELL_SIZE) def _prune_outfit_grid_sessions() -> None: now = time.time() for session_id, record in list(_OUTFIT_GRID_SESSIONS.items()): if now - float(record.get("created_at") or 0) <= OUTFIT_GRID_SESSION_TTL_SECONDS: continue path = record.get("image_path") if isinstance(path, str): try: Path(path).unlink(missing_ok=True) except Exception: pass _OUTFIT_GRID_SESSIONS.pop(session_id, None) def _build_outfit_grid_session( tops: list[dict[str, Any]], bottoms: list[dict[str, Any]], others: list[dict[str, Any]], occasion: str, user_profile: dict[str, Any] | None, weather: dict[str, Any] | None, region: str, ) -> dict[str, Any]: _prune_outfit_grid_sessions() session_id = str(uuid.uuid4()) rows = [ (1, "Topwear", tops), (2, "Bottomwear", bottoms), ] if others: rows.append((3, "Others", others)) columns = max(len(tops), len(bottoms), len(others), 1) cell_span = OUTFIT_GRID_CELL_SIZE + 2 * OUTFIT_GRID_PADDING row_span = OUTFIT_GRID_CELL_SIZE + OUTFIT_GRID_LABEL_HEIGHT + 2 * OUTFIT_GRID_PADDING canvas = Image.new("RGB", (columns * cell_span, len(rows) * row_span), (255, 255, 255)) draw = ImageDraw.Draw(canvas) font = ImageFont.load_default() metadata_map: dict[str, dict[str, Any]] = {} item_lookup: dict[str, dict[str, Any]] = {} def draw_row(items: list[dict[str, Any]], row_index: int, row_name: str) -> None: if not 
items: y = (row_index - 1) * row_span + 10 draw.text((12, y), f"Row {row_index}: {row_name} (no items)", fill=(120, 120, 120), font=font) return for col_index, item in enumerate(items, start=1): index_label = f"{row_index}:{col_index}" x0 = (col_index - 1) * cell_span + OUTFIT_GRID_PADDING y0 = (row_index - 1) * row_span + OUTFIT_GRID_PADDING tile = _load_grid_tile(str(item.get("image_url") or ""), index_label, item) canvas.paste(tile, (x0, y0)) label_box = ( x0, y0 + OUTFIT_GRID_CELL_SIZE, x0 + OUTFIT_GRID_CELL_SIZE, y0 + OUTFIT_GRID_CELL_SIZE + OUTFIT_GRID_LABEL_HEIGHT, ) draw.rectangle(label_box, fill=(20, 20, 20)) label_text = f"{index_label} | {str(item.get('color') or 'Unknown')[:18]} {str(item.get('category') or 'Unknown')[:18]}" draw.text((label_box[0] + 6, label_box[1] + 8), label_text[:36], fill=(255, 255, 255), font=font) metadata_map[index_label] = _safe_metadata_item(item) item_lookup[index_label] = item for row_index, row_name, row_items in rows: draw_row(row_items, row_index, row_name) OUTFIT_GRID_SESSION_DIR.mkdir(parents=True, exist_ok=True) image_path = OUTFIT_GRID_SESSION_DIR / f"{session_id}.png" canvas.save(image_path, format="PNG") _OUTFIT_GRID_SESSIONS[session_id] = { "image_path": str(image_path), "metadata_map": metadata_map, "created_at": time.time(), "occasion": occasion, "user_profile": user_profile or {}, "weather": weather or {}, "region": region, } return { "session_id": session_id, "image": canvas, "metadata_map": metadata_map, "item_lookup": item_lookup, "image_path": str(image_path), } def _grid_scoring_prompt( metadata_map: dict[str, dict[str, Any]], occasion: str, weather: dict[str, Any] | None, user_profile: dict[str, Any] | None, region: str, anchor_mode: str, anchor_item: dict[str, Any] | None, locked_top_index: str | None, locked_bottom_index: str | None, locked_other_index: str | None, combination_count: int, top_k: int, ) -> str: compact_json = lambda value: json.dumps(value, ensure_ascii=True, separators=(",", ":")) return 
OUTFIT_GRID_SCORING_PROMPT_TEMPLATE.format( occasion=occasion or "casual", region=region or "global", weather_json=compact_json(weather or {}), user_profile_json=compact_json(user_profile or {}), anchor_mode=anchor_mode, locked_top_index=locked_top_index or "None", locked_bottom_index=locked_bottom_index or "None", locked_other_index=locked_other_index or "None", anchor_item_json=compact_json(_safe_metadata_item(anchor_item or {})), metadata_json=compact_json(metadata_map), combination_count=combination_count, top_k=top_k, ) def _fallback_rule_recommendations( occasion: str, case_name: str, tops: list[dict[str, Any]], bottoms: list[dict[str, Any]], others: list[dict[str, Any]], top_k: int, include_pair_outfits: bool = True, include_other_outfits: bool = True, ) -> dict[str, Any]: outfits: list[dict[str, Any]] = [] if include_pair_outfits: for top in tops: for bottom in bottoms: scored = score_pair_full(top, bottom, occasion, other=None) outfits.append(_build_outfit_payload(scored, top, bottom, rank=0, other=None)) if include_other_outfits: for other in others: # Score standalone Others as complete outfits rather than as add-ons. scored = score_pair_full(other, other, occasion, other=None) base_reason = str(scored.get("reason") or "") scored["reason"] = f"{other.get('color', 'This')} {other.get('category', 'item')} works as a complete standalone look." if base_reason: scored["reason"] = f"{scored['reason']} {base_reason}" scored["tip"] = "Use footwear and accessories only to complement this single-piece outfit." 
outfits.append(_build_outfit_payload(scored, None, None, rank=0, other=other))
    # Highest score first; only the returned slice receives 1-based ranks.
    outfits.sort(key=lambda outfit: int(outfit.get("score") or 0), reverse=True)
    for index, outfit in enumerate(outfits[:top_k], start=1):
        outfit["rank"] = index
    if case_name == "D" and outfits:
        # Case "D": the best outfit is reported as the scored selection and the
        # remainder become "improved" alternatives instead of plain recommendations.
        return { "occasion": occasion, "case": case_name, "selected_outfit_score": outfits[0], "recommendations": [], "improved_recommendations": outfits[1:top_k], "total_combinations_checked": (len(tops) * len(bottoms) if include_pair_outfits else 0) + (len(others) if include_other_outfits else 0), "notice": None, "engine_version": "scoring-v2", }
    return { "occasion": occasion, "case": case_name, "selected_outfit_score": None, "recommendations": outfits[:top_k], "improved_recommendations": [], "total_combinations_checked": (len(tops) * len(bottoms) if include_pair_outfits else 0) + (len(others) if include_other_outfits else 0), "notice": None, "engine_version": "scoring-v2", }


def _occasion_prefers_standalone_others(occasion: str) -> bool:
    """Return True when the normalised occasion text contains any festive /
    ethnic-ceremony keyword (wedding, sangeet, haldi, …); empty text is False."""
    occasion_n = _norm(occasion)
    if not occasion_n:
        return False
    return any( keyword in occasion_n for keyword in [ "wedding", "festive", "ethnic", "ceremony", "engagement", "reception", "sangeet", "haldi", "mehndi", ] )


def _merge_standalone_others_for_priority_occasions( result: dict[str, Any], occasion: str, others: list[dict[str, Any]], top_k: int, ) -> dict[str, Any]:
    """For festive/ethnic occasions, score 'others' items as standalone outfits,
    boost them, and merge them into ``result["recommendations"]``.

    No-op when there are no others, the occasion is not a priority one, or the
    result is case "D" (a locked full selection).
    """
    if not others or not _occasion_prefers_standalone_others(occasion):
        return result
    if str(result.get("case") or "").upper() == "D":
        return result
    standalone_payload = _fallback_rule_recommendations( occasion=occasion, case_name=str(result.get("case") or "A"), tops=[], bottoms=[], others=others, top_k=top_k, include_pair_outfits=False, include_other_outfits=True, )
    standalone = [ dict(outfit) for outfit in (standalone_payload.get("recommendations") or []) if isinstance(outfit, dict) ]
    if not standalone:
        return result
    for outfit in standalone:
        # Boost: at least 78 base, +12, capped at 100 (continues on next span).
        base_score = int(outfit.get("score") or 0)
        boosted_score = min(100, 
max(base_score, 78) + 12) outfit["score"] = boosted_score breakdown = outfit.get("breakdown") if isinstance(breakdown, dict): breakdown["occasion"] = min(100, max(int(breakdown.get("occasion") or 0), 92)) recommendations = [ dict(outfit) for outfit in (result.get("recommendations") or []) if isinstance(outfit, dict) ] merged = [*recommendations, *standalone] merged.sort(key=lambda outfit: int(outfit.get("score") or 0), reverse=True) merged = merged[:top_k] for index, outfit in enumerate(merged, start=1): outfit["rank"] = index result["recommendations"] = merged result["total_combinations_checked"] = int(result.get("total_combinations_checked") or 0) + len(others) return result def _current_fallback_recommendations( wardrobe_items: list[dict[str, Any]], occasion: str, top_selected: dict[str, Any] | None, bottom_selected: dict[str, Any] | None, other_selected: dict[str, Any] | None, weather: dict[str, Any] | None, user_profile: dict[str, Any] | None, region: str, top_k: int, candidate_pool: int, diversity_lambda: float, case_name: str, tops: list[dict[str, Any]], bottoms: list[dict[str, Any]], others: list[dict[str, Any]], ) -> dict[str, Any]: def _strip_optional_slots(outfit: dict[str, Any]) -> dict[str, Any]: cleaned = dict(outfit) cleaned.pop("shoes", None) cleaned.pop("accessory", None) if cleaned.get("top") and cleaned.get("bottom"): cleaned.pop("other", None) return cleaned try: result = get_recommendation_service().recommend( wardrobe_items=wardrobe_items, occasion=occasion, top_selected=top_selected, bottom_selected=bottom_selected, other_selected=other_selected, weather=weather, user_profile=user_profile, region=region, top_k=top_k, candidate_pool=candidate_pool, diversity_lambda=diversity_lambda, ) if isinstance(result.get("selected_outfit_score"), dict): result["selected_outfit_score"] = _strip_optional_slots(result["selected_outfit_score"]) recommendations = [ _strip_optional_slots(entry) for entry in (result.get("recommendations") or []) if 
isinstance(entry, dict) ]
        improved = [ _strip_optional_slots(entry) for entry in (result.get("improved_recommendations") or []) if isinstance(entry, dict) ]
        result["recommendations"] = recommendations[:top_k]
        result["improved_recommendations"] = improved[:top_k]
        result = _merge_standalone_others_for_priority_occasions( result=result, occasion=occasion, others=others, top_k=top_k, )
        print(f"[outfit-scoring] algo={OUTFIT_FALLBACK_SCORER_NAME} source=fashion_ai")
        return result
    except Exception as exc:
        # Deliberate best-effort: any failure in the fashion_ai service degrades
        # to the deterministic rule-based scorer instead of surfacing an error.
        print(f"[outfit-scoring] algo={OUTFIT_FALLBACK_SCORER_NAME} source=scoring-v2 reason={exc!r}")
        return _fallback_rule_recommendations( occasion, case_name, tops, bottoms, others, top_k, include_pair_outfits=other_selected is None, include_other_outfits=bool(others), )


def _extract_grid_indices(outfit_raw: dict[str, Any]) -> tuple[str, str, str]:
    """Extract (top, bottom, other) grid cell indices from one raw AI outfit entry.

    Tries the explicit ``*_index`` / slot-name / ``*_cell`` keys first; if any of
    the three is missing, falls back to regex-scanning a combination string
    (``combination``/``combo``/``pair``) for ``row:col`` tokens, where row 1 is
    topwear, 2 bottomwear and 3 others. Missing slots come back as "".
    """
    top_index = str( outfit_raw.get("top_index") or outfit_raw.get("top") or outfit_raw.get("top_cell") or "" ).strip()
    bottom_index = str( outfit_raw.get("bottom_index") or outfit_raw.get("bottom") or outfit_raw.get("bottom_cell") or "" ).strip()
    other_index = str( outfit_raw.get("other_index") or outfit_raw.get("other") or outfit_raw.get("other_cell") or "" ).strip()
    if top_index and bottom_index and other_index:
        return top_index, bottom_index, other_index
    combo_text = str( outfit_raw.get("combination") or outfit_raw.get("combo") or outfit_raw.get("pair") or "" )
    matches = re.findall(r"[123]:\d+", combo_text)
    top_match = next((value for value in matches if value.startswith("1:")), "")
    bottom_match = next((value for value in matches if value.startswith("2:")), "")
    other_match = next((value for value in matches if value.startswith("3:")), "")
    # Explicit keys win over combination-string matches on a per-slot basis.
    return top_index or top_match, bottom_index or bottom_match, other_index or other_match


# Ranks candidate garments against a locked anchor item using score_pair_full;
# the anchor's slot decides argument order (continues on next span).
def _rank_anchor_candidates( anchor_item: dict[str, Any], candidates: list[dict[str, Any]], occasion: str, anchor_is_top: bool, ) -> list[dict[str, Any]]:
    ranked: list[tuple[int, dict[str, Any]]] = []
    for 
candidate in candidates: scored = ( score_pair_full(anchor_item, candidate, occasion) if anchor_is_top else score_pair_full(candidate, anchor_item, occasion) ) ranked.append((int(scored.get("score") or 0), candidate)) ranked.sort(key=lambda pair: pair[0], reverse=True) compatible = [item for score, item in ranked if score >= OUTFIT_ANCHOR_MIN_SCORE] if compatible: return compatible return [item for _, item in ranked] def _text_selector_item_payload(item: dict[str, Any], index: int) -> dict[str, Any]: return { "index": index, "id": str(item.get("id") or ""), "type": str(item.get("type") or ""), "category": str(item.get("category") or "Unknown"), "color": str(item.get("color") or "Unknown"), "pattern": str(item.get("pattern") or "Unknown"), "fabric": str(item.get("fabric") or "Unknown"), "fit": str(item.get("fit") or "Unknown"), "season": str(item.get("season") or "Unknown"), "style": str(item.get("style") or "Unknown"), "occasion": str(item.get("occasion") or "Unknown"), } def _select_grid_candidates_with_text_ai( candidates: list[dict[str, Any]], slot_name: str, occasion: str, limit: int, anchor_mode: str, anchor_item: dict[str, Any] | None, ) -> list[dict[str, Any]]: if len(candidates) <= limit: return candidates if not OUTFIT_TEXT_PRESELECT_ENABLED: return candidates[:limit] candidate_payload = [ _text_selector_item_payload(item, idx + 1) for idx, item in enumerate(candidates) ] anchor_payload = _text_selector_item_payload(anchor_item, 0) if anchor_item else None prompt = ( "You are a fashion ranking assistant for candidate preselection.\n" f"Occasion: {occasion}\n" f"Slot to rank: {slot_name}\n" f"Anchor mode: {anchor_mode}\n" f"Keep exactly {limit} items if possible (or fewer when candidates are fewer).\n\n" "Goal:\n" "Select the strongest candidates for downstream outfit matching using only textual metadata.\n" "Prioritize occasion relevance, compatibility with anchor context, and diversity in color/pattern/style.\n\n" "Rules:\n" "1. 
Choose only IDs from the provided candidates.\n" "2. No duplicate IDs.\n" "3. Return strictly valid JSON and nothing else.\n" "4. Prefer candidates that maximize useful pairing variety, not near-duplicates.\n\n" "Return EXACT shape:\n" "{\"selected_ids\":[\"id1\",\"id2\"]}\n\n" f"Anchor item JSON:\n{json.dumps(anchor_payload, ensure_ascii=True)}\n\n" f"Candidate items JSON:\n{json.dumps(candidate_payload, ensure_ascii=True)}" ) try: response_text = run_nvidia_text_inference(prompt, max_tokens=OUTFIT_TEXT_SELECTOR_MAX_TOKENS) parsed_payload = parse_json_from_text(response_text) selected_ids_raw = parsed_payload.get("selected_ids") if isinstance(parsed_payload, dict) else None selected_indices_raw = parsed_payload.get("selected_indices") if isinstance(parsed_payload, dict) else None selected_ids: list[str] = [] if isinstance(selected_ids_raw, list): selected_ids = [ str(value).strip() for value in selected_ids_raw if str(value).strip() ] if not selected_ids and isinstance(selected_indices_raw, list): for raw_index in selected_indices_raw: try: index = int(raw_index) except (TypeError, ValueError): continue if 1 <= index <= len(candidates): selected_ids.append(str(candidates[index - 1].get("id") or "").strip()) elif 0 <= index < len(candidates): selected_ids.append(str(candidates[index].get("id") or "").strip()) id_to_item = { str(item.get("id") or "").strip(): item for item in candidates } selected: list[dict[str, Any]] = [] seen: set[str] = set() for item_id in selected_ids: if not item_id or item_id in seen: continue item = id_to_item.get(item_id) if not item: continue selected.append(item) seen.add(item_id) if len(selected) >= limit: break if len(selected) < limit: for item in candidates: item_id = str(item.get("id") or "").strip() if item_id in seen: continue selected.append(item) seen.add(item_id) if len(selected) >= limit: break selected = selected[:limit] if len(selected) == limit: print( f"[outfit-preselect] slot={slot_name} mode={anchor_mode} " 
f"in={len(candidates)} kept={len(selected)} strategy={OUTFIT_TEXT_SELECTOR_NAME}" ) return selected raise NvidiaPayloadError( f"Text preselector returned insufficient candidates: requested={limit} got={len(selected)}" ) except Exception as exc: print( f"[outfit-preselect] slot={slot_name} mode={anchor_mode} " f"in={len(candidates)} kept={limit} strategy=head reason={exc!r}" ) return candidates[:limit] _WEDDING_ETHNIC_TOPWEAR_CATEGORIES = { "kurta", "sherwani", "nehru jacket", "bandhgala", "achkan", "saree blouse", "lehenga choli", "anarkali", } def _filter_garments_for_wedding(items: list[dict[str, Any]]) -> list[dict[str, Any]]: """Use a single LLM call to select only wedding-appropriate garments and reassign their slot type for outfit scoring. Ethnic/formal pieces currently typed as 'others' (sherwanis, kurtas, blazers, etc.) are promoted to 'topwear' so they enter the scoring grid as primary tops instead of only appearing in the "lock other" section.""" if not items: return items garment_summaries = [] for item in items: garment_summaries.append({ "id": str(item.get("id") or ""), "type": str(item.get("type") or ""), "category": str(item.get("category") or "Unknown"), "color": str(item.get("color") or "Unknown"), "pattern": str(item.get("pattern") or "Unknown"), "fabric": str(item.get("fabric") or "Unknown"), "fit": str(item.get("fit") or "Unknown"), "style": str(item.get("style") or "Unknown"), "occasion": str(item.get("occasion") or "Unknown"), }) prompt = ( "You are a fashion expert selecting garments suitable for a WEDDING occasion.\n\n" "Wedding-appropriate garments include:\n" "- Ethnic wear: kurtas, sherwanis, nehru jackets, bandhgalas, achkans, sarees, lehengas, churidars, dhotis, salwar kameez\n" "- Formal/semi-formal: blazers, suit jackets, dress shirts, formal trousers, dress pants, waistcoats\n" "- Elegant pieces: silk fabrics, embroidered items, brocade, velvet\n" "- Accessories that work for weddings: formal shoes, stoles, dupattas\n\n" "Garments to 
EXCLUDE:\n" "- Casual everyday items: plain t-shirts, basic tees, hoodies, sweatshirts, joggers, denim jeans, gym shorts\n" "- Sportswear, athleisure, distressed or ripped clothing\n" "- Very casual items like graphic tees, cargo shorts, flip-flops\n" "- Basic casual shirts UNLESS they are clearly formal/dress shirts\n\n" "If a garment is borderline (e.g. a smart chino, a dark polo), include it only if it could realistically " "be part of a wedding guest outfit.\n\n" "For each selected garment, also decide its ROLE in a wedding outfit:\n" "- \"topwear\": items worn on the upper body as the primary piece (kurtas, sherwanis, blazers, dress shirts, suit jackets, nehru jackets, waistcoats)\n" "- \"bottomwear\": items worn on the lower body (formal trousers, churidars, dress pants, lehengas)\n" "- \"others\": layering pieces, accessories, footwear, dupattas, stoles\n\n" "Return strictly valid JSON and nothing else.\n\n" "Return EXACT shape:\n" "{\"selected\":[{\"id\":\"...\",\"role\":\"topwear|bottomwear|others\"},...]}\n\n" f"Garments JSON:\n{json.dumps(garment_summaries, ensure_ascii=True)}" ) try: response_text = run_nvidia_text_inference(prompt, max_tokens=OUTFIT_TEXT_SELECTOR_MAX_TOKENS) parsed = parse_json_from_text(response_text) selected_raw = parsed.get("selected") if isinstance(parsed, dict) else None if not isinstance(selected_raw, list): print("[wedding-filter] LLM returned no selected list, using all items") return items role_map: dict[str, str] = {} for entry in selected_raw: if not isinstance(entry, dict): continue item_id = str(entry.get("id") or "").strip() role = str(entry.get("role") or "").strip().lower() if item_id and role in {"topwear", "bottomwear", "others"}: role_map[item_id] = role if not role_map: print("[wedding-filter] LLM returned empty selections, using all items") return items filtered: list[dict[str, Any]] = [] for item in items: item_id = str(item.get("id") or "").strip() if item_id not in role_map: continue promoted = {**item, "type": 
role_map[item_id]} filtered.append(promoted) print( f"[wedding-filter] in={len(items)} kept={len(filtered)} " f"tops={sum(1 for i in filtered if i.get('type') == 'topwear')} " f"bottoms={sum(1 for i in filtered if i.get('type') == 'bottomwear')} " f"others={sum(1 for i in filtered if i.get('type') == 'others')}" ) return filtered if filtered else items except Exception as exc: print(f"[wedding-filter] LLM call failed reason={exc!r}, using all items") return items def _resolve_outfit_grid_sources( wardrobe_items: list[dict[str, Any]], occasion: str, top_selected: dict[str, Any] | None, bottom_selected: dict[str, Any] | None, other_selected: dict[str, Any] | None, ) -> tuple[list[dict[str, Any]], list[dict[str, Any]], list[dict[str, Any]], str, dict[str, Any] | None]: all_tops = [i for i in wardrobe_items if i.get("type") == "topwear"] all_bottoms = [i for i in wardrobe_items if i.get("type") == "bottomwear"] all_others = [ i for i in wardrobe_items if i.get("type") == "others" or i.get("type") not in {"topwear", "bottomwear"} ] other_pool = [other_selected] if other_selected else [] if top_selected and bottom_selected: anchor_mode = "locked-top+locked-bottom+locked-other" if other_selected else "locked-top+locked-bottom" return [top_selected], [bottom_selected], other_pool, anchor_mode, other_selected or top_selected if top_selected: ranked_bottoms = _rank_anchor_candidates( anchor_item=top_selected, candidates=all_bottoms, occasion=occasion, anchor_is_top=True, ) anchor_mode = "locked-top+locked-other" if other_selected else "locked-top" return [top_selected], ranked_bottoms, other_pool, anchor_mode, other_selected or top_selected if bottom_selected: ranked_tops = _rank_anchor_candidates( anchor_item=bottom_selected, candidates=all_tops, occasion=occasion, anchor_is_top=False, ) anchor_mode = "locked-bottom+locked-other" if other_selected else "locked-bottom" return ranked_tops, [bottom_selected], other_pool, anchor_mode, other_selected or bottom_selected if 
other_selected: return all_tops, all_bottoms, [], "locked-other", other_selected return all_tops, all_bottoms, [], "none", None def _normalize_ai_outfit_payload( parsed_payload: dict[str, Any], item_lookup: dict[str, dict[str, Any]], occasion: str, case_name: str, top_k: int, total_combinations: int, session_id: str, ) -> dict[str, Any]: raw_recommendations = parsed_payload.get("recommendations") if not isinstance(raw_recommendations, list): raw_recommendations = parsed_payload.get("outfits") if not isinstance(raw_recommendations, list): raw_recommendations = parsed_payload.get("top_outfits") if not isinstance(raw_recommendations, list): raise NvidiaPayloadError(f"AI outfit scorer returned no recommendation list: {parsed_payload}") recommendations: list[dict[str, Any]] = [] for raw_entry in raw_recommendations: if not isinstance(raw_entry, dict): continue top_index, bottom_index, other_index = _extract_grid_indices(raw_entry) top_item = item_lookup.get(top_index) bottom_item = item_lookup.get(bottom_index) other_item = item_lookup.get(other_index) if other_index else None if not top_item or not bottom_item: continue base_breakdown = raw_entry.get("breakdown") if isinstance(raw_entry.get("breakdown"), dict) else {} breakdown = { "color": _clamp_score(base_breakdown.get("color"), 70), "style": _clamp_score(base_breakdown.get("style"), 70), "occasion": _clamp_score(base_breakdown.get("occasion"), 70), "fit": _clamp_score(base_breakdown.get("fit"), 70), "pattern": _clamp_score(base_breakdown.get("pattern"), 70), "season": _clamp_score(base_breakdown.get("season"), 70), } recommendation = { "rank": 0, "score": _clamp_score(raw_entry.get("score"), 0), "breakdown": breakdown, "reason": str(raw_entry.get("reason") or "AI-generated outfit recommendation."), "tip": str(raw_entry.get("tip") or "Use matching accessories to complete this look."), "combination": f"{top_index} + {bottom_index}" + (f" + {other_index}" if other_item else ""), "grid_session_id": session_id, "top": { 
"id": top_item.get("id"), "category": top_item.get("category"), "color": top_item.get("color"), "image_url": top_item.get("image_url", ""), }, "bottom": { "id": bottom_item.get("id"), "category": bottom_item.get("category"), "color": bottom_item.get("color"), "image_url": bottom_item.get("image_url", ""), }, } if other_item: recommendation["other"] = { "id": other_item.get("id"), "category": other_item.get("category"), "color": other_item.get("color"), "image_url": other_item.get("image_url", ""), } recommendations.append(recommendation) if not recommendations: raise NvidiaPayloadError(f"AI outfit scorer returned no valid index-mapped recommendations: {parsed_payload}") recommendations.sort(key=lambda outfit: int(outfit.get("score") or 0), reverse=True) recommendations = recommendations[:top_k] for index, outfit in enumerate(recommendations, start=1): outfit["rank"] = index if case_name == "D": return { "occasion": occasion, "case": case_name, "selected_outfit_score": recommendations[0], "recommendations": [], "improved_recommendations": recommendations, "total_combinations_checked": total_combinations, "notice": None, "grid_session_id": session_id, "engine_version": OUTFIT_AI_SCORER_NAME, } return { "occasion": occasion, "case": case_name, "selected_outfit_score": None, "recommendations": recommendations, "improved_recommendations": [], "total_combinations_checked": total_combinations, "notice": None, "grid_session_id": session_id, "engine_version": OUTFIT_AI_SCORER_NAME, } def _recommend_outfits_with_ai_grid( wardrobe_items: list[dict[str, Any]], occasion: str, top_selected: dict[str, Any] | None, bottom_selected: dict[str, Any] | None, other_selected: dict[str, Any] | None, weather: dict[str, Any] | None, user_profile: dict[str, Any] | None, region: str, top_k: int, case_name: str, ) -> dict[str, Any]: top_source, bottom_source, other_source, anchor_mode, anchor_item = _resolve_outfit_grid_sources( wardrobe_items=wardrobe_items, occasion=occasion, 
top_selected=top_selected, bottom_selected=bottom_selected, other_selected=other_selected, ) top_pool_count = len(top_source) bottom_pool_count = len(bottom_source) if not top_selected and len(top_source) > OUTFIT_GRID_MAX_TOP_ITEMS: top_source = _select_grid_candidates_with_text_ai( candidates=top_source, slot_name="topwear", occasion=occasion, limit=OUTFIT_GRID_MAX_TOP_ITEMS, anchor_mode=anchor_mode, anchor_item=anchor_item, ) if not bottom_selected and len(bottom_source) > OUTFIT_GRID_MAX_BOTTOM_ITEMS: bottom_source = _select_grid_candidates_with_text_ai( candidates=bottom_source, slot_name="bottomwear", occasion=occasion, limit=OUTFIT_GRID_MAX_BOTTOM_ITEMS, anchor_mode=anchor_mode, anchor_item=anchor_item, ) if not top_source or not bottom_source: raise NvidiaPayloadError("AI outfit scorer requires at least one topwear and one bottomwear item.") grid_session = _build_outfit_grid_session( tops=top_source, bottoms=bottom_source, others=other_source, occasion=occasion, user_profile=user_profile, weather=weather, region=region, ) combination_count = len(top_source) * len(bottom_source) * ( len(other_source) if other_selected else (len(other_source) + 1 if other_source else 1) ) prompt = _grid_scoring_prompt( metadata_map=grid_session["metadata_map"], occasion=occasion, weather=weather, user_profile=user_profile, region=region, anchor_mode=anchor_mode, anchor_item=anchor_item, locked_top_index="1:1" if top_selected else None, locked_bottom_index="2:1" if bottom_selected else None, locked_other_index="3:1" if other_selected else None, combination_count=combination_count, top_k=top_k, ) if not other_source: prompt = ( f"{prompt}\n\n" "Important: This grid contains only Row 1 (Topwear) and Row 2 (Bottomwear). " "Always set other_index to null." 
) print( f"[outfit-grid] mode={anchor_mode} session={grid_session['session_id']} " f"tops_in={top_pool_count} bottoms_in={bottom_pool_count} " f"rows=2 tops={len(top_source)} bottoms={len(bottom_source)} others={len(other_source)} " f"combinations={combination_count}" ) model_text = run_nvidia_inference(grid_session["image"], prompt, max_tokens=OUTFIT_AI_MAX_TOKENS) parsed_payload = parse_json_from_text(model_text) if not parsed_payload: raise NvidiaPayloadError(f"AI outfit scorer returned unparsable JSON: {model_text[:500]}") result = _normalize_ai_outfit_payload( parsed_payload=parsed_payload, item_lookup=grid_session["item_lookup"], occasion=occasion, case_name=case_name, top_k=top_k, total_combinations=combination_count, session_id=grid_session["session_id"], ) print( f"[outfit-scoring] algo={OUTFIT_AI_SCORER_NAME} " f"mode={anchor_mode} session={grid_session['session_id']} " f"tops={len(top_source)} bottoms={len(bottom_source)} others={len(other_source)} " f"combinations={combination_count}" ) return result def _raise_http_error(exc: Exception) -> NoReturn: print("Classification request failed:", repr(exc)) traceback.print_exc() if isinstance(exc, RuntimeError) and str(exc) == NVIDIA_API_KEY_MISSING_DETAIL: raise HTTPException(status_code=503, detail=NVIDIA_API_KEY_MISSING_DETAIL) from exc if isinstance(exc, NvidiaGatewayError): raise HTTPException(status_code=exc.status_code, detail=str(exc)) from exc if isinstance(exc, NvidiaPayloadError): raise HTTPException(status_code=502, detail=str(exc)) from exc if isinstance(exc, requests.RequestException): raise HTTPException(status_code=502, detail=f"NVIDIA API request failed: {exc}") from exc raise HTTPException(status_code=500, detail=str(exc)) from exc # --------------------------------------------------------------------------- # Endpoints # --------------------------------------------------------------------------- @app.get("/") def root() -> dict[str, str]: return {"status": "ok", "message": "Wardrobe 
Classifier API v2"} @app.get("/health") def health() -> dict[str, str]: return { "status": "ok", "classification_provider": "nvidia", "model": NVIDIA_MODEL_ID, "nvidia_api_configured": str(bool(_nvidia_api_key())), "nvidia_invoke_url": NVIDIA_INVOKE_URL, "engine_version": "scoring-v2", "outfit_matching_provider": "nemotron", } @app.post("/product-urls") def product_urls(payload: dict[str, Any] = Body(default_factory=dict)) -> dict[str, Any]: color = str(payload.get("color") or "") category = str(payload.get("category") or "") gender = payload.get("gender") max_products = int(payload.get("max_products") or 30) store = _normalize_store_name(str(payload.get("store") or SCRAPER_DEFAULT_STORE or "nike")) if not color or not category: raise HTTPException(status_code=400, detail="color and category are required") recommendation = ScraperRecommendation( color=color, category=category, gender=str(gender) if gender else None, ) try: search_urls = _build_store_search_urls_from_recommendation( recommendation, store=store, occasion="", ) products: list[dict[str, str]] = [] seen_links: set[str] = set() for search_url in search_urls: for product in _extract_store_product_summaries(search_url, store=store): item_link = str(product.get("item_link") or "").strip() if not item_link or item_link in seen_links: continue seen_links.add(item_link) products.append(product) if len(products) >= max_products: break if len(products) >= max_products: break response_payload: dict[str, Any] = { "store": store, "search_urls": search_urls, "product_urls": [item["item_link"] for item in products], "products": products, "count": len(products), } response_payload["saved_json_path"] = _save_scraper_json_payload("product_urls", response_payload) return response_payload except requests.RequestException as exc: raise HTTPException(status_code=502, detail=f"Failed to fetch {store.title()} pages: {exc}") from exc @app.post("/suggestions") @app.post("/api/suggestions") def suggestions(payload: dict[str, 
Any] = Body(default_factory=dict)) -> dict[str, Any]: occasion = str(payload.get("occasion") or "casual") target_category = str(payload.get("target_category") or payload.get("targetCategory") or "both") gender_preference = str(payload.get("gender_preference") or payload.get("genderPreference") or "any") filters = payload.get("filters") if isinstance(payload.get("filters"), dict) else {} max_results = int(payload.get("max_results") or payload.get("maxResults") or 8) store = _normalize_store_name(str(payload.get("store") or SCRAPER_DEFAULT_STORE or "nike")) if max_results < 1: raise HTTPException(status_code=400, detail="max_results must be at least 1") try: return _build_shopping_suggestions_from_scraper( occasion=occasion, target_category=target_category, gender_preference=gender_preference, filters=filters, max_results=max_results, store=store, ) except NvidiaGatewayError as exc: raise HTTPException(status_code=502, detail=str(exc)) from exc except NvidiaPayloadError as exc: raise HTTPException(status_code=502, detail=str(exc)) from exc except requests.RequestException as exc: raise HTTPException(status_code=502, detail=f"Failed to fetch {store.title()} pages: {exc}") from exc @app.post("/scraper/recommend") def scraper_recommend(payload: dict[str, Any] = Body(default_factory=dict)) -> dict[str, Any]: user_prompt = str(payload.get("user_prompt") or payload.get("prompt") or "").strip() inferred = _infer_structured_request_from_prompt(user_prompt) inferred_target_category = _normalize_target_category(inferred.get("target_category")) occasion = str(payload.get("occasion") or inferred.get("occasion") or "casual") if _norm(occasion) in {"", "auto", "any"}: occasion = str(inferred.get("occasion") or "casual") gender = str(payload.get("gender") or inferred.get("gender") or "") payload_target_category = _normalize_target_category( payload.get("target_category") or payload.get("targetCategory") or "both" ) target_category = ( inferred_target_category if 
inferred_target_category in {"topwear", "bottomwear"} else payload_target_category if payload_target_category in {"topwear", "bottomwear"} else "both" ) filters = payload.get("filters") if isinstance(payload.get("filters"), dict) else {} inferred_colors = inferred.get("preferred_colors") if isinstance(inferred.get("preferred_colors"), list) else [] inferred_include = inferred.get("include_keywords") if isinstance(inferred.get("include_keywords"), list) else [] inferred_exclude = inferred.get("exclude_keywords") if isinstance(inferred.get("exclude_keywords"), list) else [] filters = { **filters, "preferred_colors": [ *([str(value) for value in (filters.get("preferred_colors") or []) if str(value).strip()]), *([str(value) for value in inferred_colors if str(value).strip()]), ], "include_keywords": [ *([str(value) for value in (filters.get("include_keywords") or []) if str(value).strip()]), *([str(value) for value in inferred_include if str(value).strip()]), ], "exclude_keywords": [ *([str(value) for value in (filters.get("exclude_keywords") or []) if str(value).strip()]), *([str(value) for value in inferred_exclude if str(value).strip()]), ], } preference_parts = [str(payload.get("preferences") or "").strip(), user_prompt] preferences = ", ".join(part for part in preference_parts if part) max_products_raw = payload.get("max_products") max_products = int(max_products_raw) if max_products_raw not in {None, ""} else None store = _normalize_store_name(str(payload.get("store") or SCRAPER_DEFAULT_STORE or "nike")) if isinstance(max_products, int) and max_products < 1: raise HTTPException(status_code=400, detail="max_products must be at least 1") try: return _generate_scraper_plan_with_nemotron( occasion=occasion, gender=gender, preferences=preferences, user_prompt=user_prompt, target_category=target_category, filters=filters, max_products=max_products, store=store, strict_nemotron=True, ) except NvidiaGatewayError as exc: raise HTTPException(status_code=502, 
detail=str(exc)) from exc except NvidiaPayloadError as exc: raise HTTPException(status_code=502, detail=str(exc)) from exc except requests.RequestException as exc: raise HTTPException(status_code=502, detail=f"Failed to fetch {store.title()} pages: {exc}") from exc @app.get("/scraper") def scraper_page() -> Response: wardrobe_snapshot = _wardrobe_metadata_snapshot(limit=12) wardrobe_json = html_lib.escape(json.dumps(wardrobe_snapshot, ensure_ascii=True, indent=2)) html_content = f"""
Nemotron reads wardrobe metadata, builds a context-aware shopping query, and returns matching products with links, names, prices, and images.
Current items loaded from the database are used by Nemotron to shape the shopping query.
{wardrobe_json}
Run the search to generate a wardrobe-aware query.