# NOTE(review): removed stray repository-upload page artifact text
# ("HelloWorld0204's picture / Upload 22 files / 107e093 verified")
# that preceded the module docstring and was not valid Python.
"""
app.py — Wardrobe Assistant API (v2)
Changes from v1:
- _item_store (in-memory list) replaced by SQLite via db.py
- All outfit scoring routed through scoring.py (strategic v2 model)
- _gap_suggestions now analyses the actual wardrobe instead of hardcoding items
- /feedback endpoint added for preference data collection
- Score responses include human-readable reason + tip from scoring.py
"""
from __future__ import annotations
import base64
import io
import json
import os
import re
import tempfile
import threading
import traceback
import time
import uuid
import html as html_lib
from dataclasses import replace
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FuturesTimeout
from contextlib import asynccontextmanager
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, NoReturn
from urllib.error import HTTPError
from urllib.parse import parse_qs, quote_plus, unquote, urlencode, urljoin, urlparse
from urllib.request import Request, urlopen
import requests
from fastapi import Body, FastAPI, File, HTTPException, Query, Response, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from PIL import Image, ImageDraw, ImageFont, ImageOps
from db import (
init_db,
item_insert,
item_get_all,
item_get,
item_update,
item_delete,
feedback_record,
cache_get,
cache_set,
cache_purge_expired,
)
from scoring import (
score_pair_full,
extract_base_color,
extract_style,
extract_fit,
extract_pattern,
extract_season,
_NEUTRALS,
)
from fashion_ai import get_recommendation_service
from scraper import Recommendation as ScraperRecommendation
from scraper import (
build_search_urls_from_query as build_nike_search_urls_from_query,
build_search_urls_from_recommendation as build_nike_search_urls_from_recommendation,
extract_product_summaries as extract_nike_product_summaries,
)
from zalando_scraper import (
build_zalando_search_url,
build_zalando_search_urls_from_request,
extract_product_summaries as extract_zalando_product_summaries,
)
def _now_iso() -> str:
return datetime.now(timezone.utc).isoformat()
def _norm(value: Any) -> str:
return str(value or "").strip().lower()
# Process-local cache of matching results, mirrored to the DB cache table.
MATCHING_RESULT_CACHE: dict[str, dict[str, Any]] = {}
# Guards all reads/writes of MATCHING_RESULT_CACHE across request threads.
MATCHING_RESULT_CACHE_LOCK = threading.Lock()
# Max in-memory entries before insertion-order eviction (env-overridable).
MATCHING_RESULT_CACHE_MAX = int(os.getenv("MATCHING_RESULT_CACHE_MAX", "500"))
# TTL used when persisting entries to the DB cache (seconds; default 1 day).
MATCHING_RESULT_CACHE_TTL_SECONDS = int(os.getenv("MATCHING_RESULT_CACHE_TTL_SECONDS", "86400"))
def _matching_cache_storage_key(key: str) -> str:
return f"matching:{key}"
def _normalize_cache_category(value: Any) -> str:
    """Map *value* to one of the known wardrobe slots, or "" if unrecognized."""
    normalized = _norm(value)
    return normalized if normalized in ("topwear", "bottomwear", "others") else ""
def _extract_cache_user_id(payload: dict[str, Any], wardrobe_items: list[dict[str, Any]]) -> str:
payload_user = str(payload.get("user_id") or "").strip()
if payload_user:
return payload_user
for item in wardrobe_items:
candidate = str(item.get("user_id") or "").strip()
if candidate:
return candidate
return "anonymous"
def _build_matching_cache_key(
    user_id: str,
    category: str,
    occasion: str,
    wardrobe_hash: str,
    lock_signature: str = "",
) -> str:
    """Compose the pipe-delimited cache key for one matching request.

    Blank user ids fall back to "anonymous", blank occasions to "casual",
    and the category is normalized to a known slot name (or "").
    """
    user_part = user_id.strip() or "anonymous"
    occasion_part = _norm(occasion) or "casual"
    fields = (
        user_part,
        _normalize_cache_category(category),
        occasion_part,
        wardrobe_hash.strip(),
        lock_signature.strip(),
    )
    return "|".join(fields)
def _matching_cache_get(key: str) -> dict[str, Any] | None:
    """Fetch a cached matching payload: memory first, then the DB cache.

    Returns a deep copy (via a JSON round-trip) so callers may mutate the
    result freely. A DB hit is promoted into the in-memory cache, evicting
    oldest-inserted entries past MATCHING_RESULT_CACHE_MAX. DB errors are
    logged and treated as a miss.
    """
    with MATCHING_RESULT_CACHE_LOCK:
        in_memory = MATCHING_RESULT_CACHE.get(key)
        if isinstance(in_memory, dict):
            return json.loads(json.dumps(in_memory))
    try:
        from_db = cache_get(_matching_cache_storage_key(key))
    except Exception as exc:  # best-effort cache: never fail the request
        print(f"[matching-cache] db read failed key={key} reason={exc!r}")
        return None
    if not isinstance(from_db, dict):
        return None
    snapshot = json.loads(json.dumps(from_db))
    with MATCHING_RESULT_CACHE_LOCK:
        MATCHING_RESULT_CACHE[key] = snapshot
        while len(MATCHING_RESULT_CACHE) > MATCHING_RESULT_CACHE_MAX:
            MATCHING_RESULT_CACHE.pop(next(iter(MATCHING_RESULT_CACHE)), None)
    return json.loads(json.dumps(snapshot))
def _matching_cache_set(key: str, payload: dict[str, Any]) -> None:
    """Store a matching payload in memory and (best-effort) in the DB cache."""
    # JSON round-trip detaches the snapshot from the caller's object graph.
    snapshot = json.loads(json.dumps(payload))
    with MATCHING_RESULT_CACHE_LOCK:
        MATCHING_RESULT_CACHE[key] = snapshot
        while len(MATCHING_RESULT_CACHE) > MATCHING_RESULT_CACHE_MAX:
            MATCHING_RESULT_CACHE.pop(next(iter(MATCHING_RESULT_CACHE)), None)
    try:
        cache_set(
            _matching_cache_storage_key(key),
            snapshot,
            ttl_seconds=MATCHING_RESULT_CACHE_TTL_SECONDS,
        )
    except Exception as exc:  # persistence is advisory; log and move on
        print(f"[matching-cache] db write failed key={key} reason={exc!r}")
def _build_lock_signature_from_payload(payload: dict[str, Any]) -> str:
explicit = str(payload.get("lock_signature") or "").strip()
if explicit:
return explicit
top_selected_raw = payload.get("top_selected")
bottom_selected_raw = payload.get("bottom_selected")
other_selected_raw = payload.get("other_selected")
top_id = str(top_selected_raw.get("id") or "").strip() if isinstance(top_selected_raw, dict) else ""
bottom_id = str(bottom_selected_raw.get("id") or "").strip() if isinstance(bottom_selected_raw, dict) else ""
other_id = str(other_selected_raw.get("id") or "").strip() if isinstance(other_selected_raw, dict) else ""
if not top_id and not bottom_id and not other_id:
return ""
return f"top:{top_id or '-'}|bottom:{bottom_id or '-'}|other:{other_id or '-'}"
def _infer_type(category: str) -> str:
    """Guess the wardrobe slot ("topwear"/"bottomwear"/"others") from a category name."""
    text = _norm(category)
    top_tokens = ("shirt", "tee", "top", "kurta", "blouse", "hoodie", "sweater", "blazer", "jacket", "polo")
    bottom_tokens = ("jean", "pant", "trouser", "short", "skirt", "jogger", "palazzo", "chino")
    if any(token in text for token in top_tokens):
        return "topwear"
    if any(token in text for token in bottom_tokens):
        return "bottomwear"
    return "others"
def _normalize_wardrobe_item(item: dict[str, Any]) -> dict[str, Any]:
    """Coerce a raw stored item into the canonical wardrobe-item shape.

    Missing fields fall back to the nested ``description`` dict and then to
    neutral defaults; a fresh uuid is minted when the item has no id.
    """
    raw_description = item.get("description")
    description = raw_description if isinstance(raw_description, dict) else {}
    category = str(item.get("category") or description.get("category") or description.get("type") or "Unknown")
    slot = str(item.get("type") or description.get("type") or _infer_type(category))
    normalized: dict[str, Any] = {
        "id": item.get("id") or str(uuid.uuid4()),
        "image_url": item.get("image_url") or "",
        "type": slot,
        "category": category,
    }
    normalized["color"] = str(item.get("color") or description.get("color") or "Unknown")
    normalized["pattern"] = str(item.get("pattern") or description.get("pattern") or "Solid")
    normalized["fabric"] = str(item.get("fabric") or description.get("fabric") or "Unknown")
    normalized["fit"] = str(item.get("fit") or description.get("fit") or "Unknown")
    normalized["season"] = str(item.get("season") or description.get("season") or "All-Season")
    # NOTE(review): "style" deliberately consults description["occasion"]
    # before description["style"], mirroring the original fallback order.
    normalized["style"] = str(item.get("style") or description.get("occasion") or description.get("style") or "casual")
    normalized["occasion"] = str(item.get("occasion") or description.get("occasion") or "casual")
    normalized["description"] = description
    return normalized
def _build_outfit_payload(scored: dict[str, Any], top: dict[str, Any] | None, bottom: dict[str, Any] | None, rank: int, other: dict[str, Any] | None) -> dict[str, Any]:
payload = {
**scored,
"rank": rank,
"top": top,
"bottom": bottom,
}
if other is not None:
payload["other"] = other
return payload
def _gap_suggestions(wardrobe: list[dict[str, Any]], occasion: str) -> list[dict[str, Any]]:
    """Suggest wardrobe gaps to fill, based on top/bottom counts (max 4 entries)."""
    top_count = 0
    bottom_count = 0
    for entry in wardrobe:
        slot = _norm(entry.get("type"))
        if slot == "topwear":
            top_count += 1
        elif slot == "bottomwear":
            bottom_count += 1
    gaps: list[dict[str, Any]] = []
    if not top_count:
        gaps.append({"focus": "topwear", "suggestion": "Add a versatile topwear staple", "reason": "No topwear items found."})
    if not bottom_count:
        gaps.append({"focus": "bottomwear", "suggestion": "Add a versatile bottomwear staple", "reason": "No bottomwear items found."})
    if top_count and bottom_count and abs(top_count - bottom_count) > 2:
        gaps.append({"focus": "balance", "suggestion": "Balance your wardrobe mix", "reason": "One category is much larger than the other."})
    if not gaps:
        gaps.append({"focus": "versatility", "suggestion": f"Add one {occasion} piece that can mix with existing staples", "reason": "Wardrobe is already balanced."})
    return gaps[:4]
# Directory where raw scrape payloads are archived as JSON files.
SCRAPER_OUTPUT_DIR = Path(__file__).resolve().parent / "scraped_json"
# Recent scraper runs kept in memory, keyed by runtime id (bounded to 25
# entries in _store_scraper_runtime_result).
SCRAPER_RUNTIME_RESULTS: dict[str, dict[str, Any]] = {}
# Guards SCRAPER_RUNTIME_RESULTS across request threads.
SCRAPER_RUNTIME_LOCK = threading.Lock()
# Model used for shopping-plan generation (env-overridable).
SCRAPER_PLANNER_MODEL_ID = os.getenv(
    "SCRAPER_PLANNER_MODEL_ID",
    "nvidia/nemotron-3-nano-omni-30b-a3b-reasoning",
)
# Token budget for a single planner completion.
SCRAPER_PLANNER_MAX_TOKENS = int(os.getenv("SCRAPER_PLANNER_MAX_TOKENS", "800"))
# Default storefront when a request does not specify one.
SCRAPER_DEFAULT_STORE = str(os.getenv("SCRAPER_DEFAULT_STORE", "nike")).strip().lower()
def _save_scraper_json_payload(prefix: str, payload: dict[str, Any]) -> str:
    """Archive *payload* to a timestamped JSON file under SCRAPER_OUTPUT_DIR.

    Returns the path of the written file as a string.
    """
    SCRAPER_OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
    stamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
    target = SCRAPER_OUTPUT_DIR / f"{prefix}_{stamp}.json"
    target.write_text(json.dumps(payload, ensure_ascii=True, indent=2), encoding="utf-8")
    return str(target)
def _store_scraper_runtime_result(payload: dict[str, Any]) -> dict[str, Any]:
    """Register a scraper result under a fresh runtime id.

    Keeps only the 25 most recently inserted records; older entries are
    evicted in insertion order. Returns the stored record.
    """
    runtime_key = str(uuid.uuid4())
    record = {
        "runtime_id": runtime_key,
        "created_at": _now_iso(),
        **payload,
    }
    with SCRAPER_RUNTIME_LOCK:
        SCRAPER_RUNTIME_RESULTS[runtime_key] = record
        while len(SCRAPER_RUNTIME_RESULTS) > 25:
            SCRAPER_RUNTIME_RESULTS.pop(next(iter(SCRAPER_RUNTIME_RESULTS)), None)
    return record
def _candidate_text_model_ids(primary_model_id: str) -> list[str]:
    """Ordered model ids to try for text inference (primary first, then fallbacks).

    Delegates to ``_candidate_model_ids``, defined elsewhere in this file.
    """
    return _candidate_model_ids(primary_model_id)
def _run_text_inference_with_model(primary_model_id: str, prompt: str, max_tokens: int) -> str:
    """Run a chat completion against the NVIDIA API with layered fallbacks.

    Strategy, in order:
    - try each candidate model id (primary first, then fallbacks);
    - per model, retry transient failures (429/5xx, connection errors) up to
      NVIDIA_MAX_RETRIES with linear backoff;
    - request streaming first; if the SSE stream ends without content, retry
      the same request once in non-stream mode;
    - on a token-limit error, double max_tokens (capped at
      NVIDIA_REASONING_MAX_TOKENS) and retry the same model;
    - on a "degraded function" error, move on to the next model.

    Returns the model's text output. Raises NvidiaGatewayError (or the last
    underlying error) once all models and the token budget are exhausted.
    """
    api_key = _nvidia_api_key()
    if not api_key:
        raise RuntimeError(NVIDIA_API_KEY_MISSING_DETAIL)
    last_error: Exception | None = None
    for model_id in _candidate_text_model_ids(primary_model_id):
        current_max_tokens = max_tokens
        while current_max_tokens <= NVIDIA_REASONING_MAX_TOKENS:
            payload = {
                "model": model_id,
                "messages": [
                    {
                        "role": "user",
                        "content": prompt,
                    }
                ],
                "max_tokens": current_max_tokens,
                "temperature": NVIDIA_TEMPERATURE,
                "top_p": NVIDIA_TOP_P,
                "stream": True,
            }
            if NVIDIA_ENABLE_THINKING:
                payload["chat_template_kwargs"] = {"enable_thinking": True}
            headers = {
                "Authorization": f"Bearer {api_key}",
                "Accept": "text/event-stream",
                "Content-Type": "application/json",
            }
            last_error = None
            for attempt in range(NVIDIA_MAX_RETRIES + 1):
                try:
                    response = requests.post(
                        NVIDIA_INVOKE_URL,
                        headers=headers,
                        json=payload,
                        timeout=NVIDIA_TIMEOUT_SECONDS,
                    )
                    # 429/5xx are treated as transient and eligible for retry.
                    if response.status_code in {429, 500, 502, 503, 504}:
                        raise NvidiaGatewayError(
                            f"NVIDIA API transient failure {response.status_code}: {response.text[:500]}",
                            status_code=503 if response.status_code == 429 else 502,
                        )
                    if response.status_code >= 400:
                        raise NvidiaGatewayError(
                            f"NVIDIA API request failed with {response.status_code}: {response.text[:500]}",
                            status_code=502,
                        )
                    try:
                        return _extract_streamed_nvidia_text(response)
                    except NvidiaPayloadError as stream_exc:
                        # Some providers occasionally terminate SSE without final text; retry once using non-stream mode.
                        if "stream ended without returning any content" not in str(stream_exc).lower():
                            raise
                        non_stream_payload = {**payload, "stream": False}
                        non_stream_headers = {
                            "Authorization": f"Bearer {api_key}",
                            "Accept": "application/json",
                            "Content-Type": "application/json",
                        }
                        non_stream_response = requests.post(
                            NVIDIA_INVOKE_URL,
                            headers=non_stream_headers,
                            json=non_stream_payload,
                            timeout=NVIDIA_TIMEOUT_SECONDS,
                        )
                        if non_stream_response.status_code in {429, 500, 502, 503, 504}:
                            raise NvidiaGatewayError(
                                f"NVIDIA API transient failure {non_stream_response.status_code}: {non_stream_response.text[:500]}",
                                status_code=503 if non_stream_response.status_code == 429 else 502,
                            )
                        if non_stream_response.status_code >= 400:
                            raise NvidiaGatewayError(
                                f"NVIDIA API request failed with {non_stream_response.status_code}: {non_stream_response.text[:500]}",
                                status_code=502,
                            )
                        try:
                            parsed_payload = non_stream_response.json()
                        except ValueError as exc:
                            raise NvidiaPayloadError(
                                "NVIDIA non-stream planner response was not valid JSON."
                            ) from exc
                        return _extract_nvidia_text(parsed_payload)
                except NvidiaTokenLimitError as exc:
                    # Handled below by doubling the token budget.
                    last_error = exc
                    break
                except (requests.RequestException, NvidiaGatewayError) as exc:
                    last_error = exc
                    if _is_degraded_function_error(exc):
                        break
                    if attempt >= NVIDIA_MAX_RETRIES:
                        break
                    time.sleep(NVIDIA_RETRY_BACKOFF_SECONDS * (attempt + 1))
                except NvidiaPayloadError:
                    # Malformed payloads are not retryable; surface immediately.
                    raise
            if isinstance(last_error, NvidiaTokenLimitError) and current_max_tokens < NVIDIA_REASONING_MAX_TOKENS:
                current_max_tokens = min(current_max_tokens * 2, NVIDIA_REASONING_MAX_TOKENS)
                continue
            if _is_degraded_function_error(last_error or Exception()):
                print(f"[nvidia] model degraded, trying fallback model: {model_id}")
                break
            if isinstance(last_error, NvidiaGatewayError):
                raise last_error
            if isinstance(last_error, requests.RequestException):
                raise NvidiaGatewayError(f"NVIDIA API request failed: {last_error}", status_code=502) from last_error
            if last_error is not None:
                raise last_error
            break
    # All candidate models exhausted without returning text.
    if isinstance(last_error, Exception):
        raise NvidiaGatewayError(
            f"NVIDIA API request failed on all configured models ({', '.join(_candidate_text_model_ids(primary_model_id))}): {last_error}",
            status_code=502,
        ) from last_error
    raise NvidiaGatewayError(
        "NVIDIA API request failed after exhausting reasoning token budget.",
        status_code=502,
    )
def run_scraper_planner_text_inference(prompt: str, max_tokens: int = SCRAPER_PLANNER_MAX_TOKENS) -> str:
    """Run *prompt* through the configured scraper-planner model and return its text."""
    return _run_text_inference_with_model(SCRAPER_PLANNER_MODEL_ID, prompt, max_tokens)
def _normalize_store_name(value: str | None) -> str:
    """Return "zalando" or "nike"; any unrecognized store defaults to "nike"."""
    store = _norm(value)
    return store if store in ("zalando", "nike") else "nike"
def _build_store_query_from_recommendation(recommendation: ScraperRecommendation, occasion: str = "") -> str:
parts = [
str(recommendation.gender or "").strip(),
str(recommendation.color or "").strip(),
str(recommendation.category or "").strip(),
str(occasion or "").strip(),
]
return " ".join(part for part in parts if part)
def _build_store_search_urls_from_recommendation(
    recommendation: ScraperRecommendation,
    store: str,
    occasion: str = "",
) -> list[str]:
    """Build store-specific search URLs for a planner recommendation."""
    normalized_store = _normalize_store_name(store)
    if normalized_store != "zalando":
        return build_nike_search_urls_from_recommendation(recommendation, store=normalized_store)
    zalando_query = _build_store_query_from_recommendation(recommendation, occasion=occasion)
    return [build_zalando_search_url(query=zalando_query, gender=recommendation.gender)]
def _build_store_search_urls_from_query(
    query: str,
    store: str,
    gender: str | None = None,
    wardrobe_items: list[dict[str, Any]] | None = None,
    requested_category: str | None = None,
) -> list[str]:
    """Build search URLs for a free-text query against the chosen store."""
    normalized_store = _normalize_store_name(store)
    if normalized_store != "zalando":
        return build_nike_search_urls_from_query(query=query, store=normalized_store, gender=gender)
    # URL generation should follow planner output + deterministic rules.
    # GPT OSS remains reserved for post-scrape cleanup only, hence
    # completion_fn=None.
    urls, _ = build_zalando_search_urls_from_request(
        query=query,
        gender=gender,
        wardrobe_items=wardrobe_items,
        requested_category=requested_category,
        completion_fn=None,
    )
    return urls
def _extract_store_product_summaries(search_url: str, store: str) -> list[dict[str, str]]:
    """Scrape product summaries from one search URL using the store's scraper."""
    resolved = _normalize_store_name(store)
    if resolved == "zalando":
        return extract_zalando_product_summaries(
            search_url=search_url,
            max_products=None,
            use_apify=True,
            postprocess=None,
        )
    return extract_nike_product_summaries(search_url, store=resolved)
def _wardrobe_metadata_snapshot(limit: int = 30) -> dict[str, Any]:
    """Summarize the stored wardrobe for planner prompts.

    Returns the total item count, a trimmed per-item attribute list (first
    *limit* items), and counts keyed by "type|occasion" buckets.
    """
    normalized_items = [_normalize_wardrobe_item(raw) for raw in item_get_all()]
    bucket_counts: dict[str, int] = {}
    for entry in normalized_items:
        bucket = f"{_norm(entry.get('type') or 'unknown')}|{_norm(entry.get('occasion') or 'unknown')}"
        bucket_counts[bucket] = bucket_counts.get(bucket, 0) + 1
    trimmed: list[dict[str, Any]] = []
    for entry in normalized_items[:limit]:
        trimmed.append(
            {
                "id": entry.get("id"),
                "type": entry.get("type"),
                "category": entry.get("category"),
                "color": entry.get("color"),
                "pattern": entry.get("pattern"),
                "fit": entry.get("fit"),
                "season": entry.get("season"),
                "style": entry.get("style"),
                "occasion": entry.get("occasion"),
            }
        )
    return {
        "total_items": len(normalized_items),
        "items": trimmed,
        "counts": bucket_counts,
    }
def _build_scraper_plan_prompt(
    occasion: str,
    gender: str,
    preferences: str,
    user_prompt: str,
    target_category: str,
    filters: dict[str, Any],
    wardrobe_snapshot: dict[str, Any],
    planning_context: dict[str, Any],
    max_products: int | None,
    store: str,
) -> str:
    """Render the LLM planner prompt for one store shopping plan.

    Embeds the user request, filters, wardrobe snapshot, and planning context
    as JSON and instructs the model to emit a single strict-JSON plan. Pure
    string formatting — no I/O.

    NOTE: the wording below is part of the planner contract; keep the output
    schema keys in sync with _recover_scraper_plan_from_text.
    """
    return (
        "ROLE: Senior Fashion Merchandising Strategist & Query Planner for AI Wardrobe Assistant\n\n"
        f"OBJECTIVE: Generate exactly one high-precision {store.title()} shopping plan that is context-safe, occasion-safe, and wardrobe-grounded.\n\n"
        "---\n\n"
        "INPUTS:\n\n"
        f"user_request: \"{user_prompt}\"\n"
        f"occasion: \"{occasion}\"\n"
        f"target_category: \"{target_category}\" // strict slot constraint\n"
        f"gender: \"{gender}\"\n"
        f"preferences: \"{preferences}\"\n"
        f"filters: {json.dumps(filters, ensure_ascii=True)}\n"
        f"max_products: {max_products if isinstance(max_products, int) and max_products > 0 else 'uncapped'}\n"
        f"planning_context: {json.dumps(planning_context, ensure_ascii=True)}\n"
        f"wardrobe_snapshot: {json.dumps(wardrobe_snapshot, ensure_ascii=True)}\n\n"
        "---\n\n"
        "EXECUTION RULES (Hard Constraints):\n\n"
        "1. SLOT LOCK: target_category is immutable. If \"topwear\", NEVER output bottomwear categories (pants, shorts, joggers) and vice versa.\n\n"
        "2. OCCASION VETTING: For formal | interview | work | wedding | client_meeting:\n"
        " BLOCK: hoodie, sweatshirt, joggers, shorts, tank, tights, leggings, crop-top, sports-bra-as-outerwear\n"
        " ALLOW: shirt, polo, blazer_outerwear, sweater, sweatshirt_structured (only if minimal branding)\n\n"
        "3. PRIORITY HIERARCHY:\n"
        " First: allowed_categories from planning_context\n"
        " Second: color_shortlist from planning_context\n"
        " Third: reference_item_ids from planning_context (anchor reasoning here)\n\n"
        "4. COLOR LOGIC: Select ONE color from color_shortlist that:\n"
        " - Complements majority of wardrobe_snapshot bottoms\n"
        " - Suits occasion formality\n"
        " - Has highest non-conflict score with existing wardrobe\n\n"
        "5. If planning_context.color_resonance_scores exists, prefer the highest-scoring color unless user filters explicitly override it.\n\n"
        "---\n\n"
        "REASONING FRAMEWORK:\n\n"
        "Step 1 — Parse wardrobe_snapshot for color frequency, gap categories, and bottom-dominant hues\n"
        "Step 2 — Cross-reference planning_context.reference_items for silhouette compatibility\n"
        "Step 3 — Filter through occasion veto list\n"
        "Step 4 — Select optimal {category, color} pair with highest interoperability score\n"
        "Step 5 — Compose commerce-optimized query: gender + color + occasion + category\n\n"
        "---\n\n"
        "OUTPUT SCHEMA (strict JSON, no markdown, no extra keys):\n\n"
        "{\n"
        ' "target_category": "topwear|bottomwear",\n'
        ' "color": "string from color_shortlist",\n'
        ' "category": "string from allowed_categories post-vetting",\n'
        ' "gender": "men|women|unisex",\n'
        ' "style_direction": "formal-smart|business-casual|casual-polished|etc",\n'
        ' "reference_item_ids": ["array from planning_context"],\n'
        ' "query": "commerce-ready search string",\n'
        ' "wardrobe_grounding": "specific evidence from wardrobe_snapshot",\n'
        ' "reason": "concise strategic justification"\n'
        "}\n"
    )
def _recover_scraper_plan_from_text(
    model_text: str,
    planning_context: dict[str, Any],
    occasion: str,
    gender: str,
) -> dict[str, Any]:
    """Salvage a shopping plan from a non-JSON (or partial) planner response.

    First tries strict JSON extraction via ``parse_json_from_text``; failing
    that, scans the raw text for known categories/colors/gender mentions and
    the ``"query": "..."`` fragment, filling gaps from *planning_context*.
    Returns {} when no usable plan can be recovered.
    """
    text = str(model_text or "").strip()
    if not text:
        return {}
    parsed = parse_json_from_text(text)
    if isinstance(parsed, dict) and parsed:
        return parsed
    # Candidate vocab from the planning context (blank entries dropped).
    allowed = [str(value) for value in planning_context.get("allowed_categories", []) if str(value).strip()]
    colors = [str(value) for value in planning_context.get("color_shortlist", []) if str(value).strip()]
    lowered = text.lower()
    # First allowed category mentioned in the text, else the top-priority one.
    category = ""
    for candidate in allowed:
        if candidate.lower() in lowered:
            category = candidate
            break
    if not category and allowed:
        category = allowed[0]
    color = ""
    for candidate in colors:
        if candidate.lower() in lowered:
            color = candidate
            break
    if not color and colors:
        color = colors[0]
    # Text mentions override the request gender; "women" is checked before
    # "men" since "women" contains "men" as a substring.
    inferred_gender = _normalize_scraper_gender(gender)
    if "women" in lowered:
        inferred_gender = "women"
    elif "men" in lowered:
        inferred_gender = "men"
    elif "unisex" in lowered:
        inferred_gender = "unisex"
    query = ""
    query_match = re.search(r'"query"\s*:\s*"([^"]+)"', text, flags=re.IGNORECASE)
    if query_match:
        query = query_match.group(1).strip()
    if not query and category and color:
        query = _build_planned_query(
            inferred_gender,
            color,
            category,
            occasion,
            str(planning_context.get("style_direction") or "occasion-aligned"),
        )
    # A plan is only usable with all three of category, color, and query.
    if not category or not color or not query:
        return {}
    return {
        "target_category": planning_context.get("resolved_target_category", "topwear"),
        "color": color,
        "category": category,
        "gender": inferred_gender,
        "style_direction": planning_context.get("style_direction", "occasion-aligned"),
        "reference_item_ids": planning_context.get("reference_item_ids", []),
        "query": query,
        "reason": "Recovered Nemotron planner output from semi-structured response.",
        "source": "nemotron",
    }
def _normalize_scraper_gender(value: str | None) -> str | None:
normalized = str(value or "").strip().lower()
if normalized in {"men", "male", "man", "mens"}:
return "men"
if normalized in {"women", "female", "woman", "womens"}:
return "women"
if normalized in {"unisex", "any", "all"}:
return "unisex"
return None
def _normalize_target_category(value: Any) -> str:
    """Resolve a slot hint to "topwear"/"bottomwear", defaulting to "both"."""
    token = _norm(value)
    if token in ("topwear", "top", "upper", "tops"):
        return "topwear"
    if token in ("bottomwear", "bottom", "lower", "bottoms"):
        return "bottomwear"
    return "both"
# Keyword hints used to infer the requested wardrobe slot from a free-text prompt.
_PROMPT_TO_TARGET_HINTS = {
    "topwear": {
        "top", "topwear", "shirt", "blazer", "jacket", "polo", "tee", "t-shirt", "kurta", "upper",
    },
    "bottomwear": {
        "bottom", "bottomwear", "trouser", "trousers", "pants", "jeans", "shorts", "joggers", "lower",
    },
}
# Keyword hints used to infer the occasion bucket from a free-text prompt
# (first matching bucket wins, in this insertion order).
_PROMPT_TO_OCCASION_HINTS: dict[str, set[str]] = {
    "formal": {"formal", "interview", "office", "work", "business", "meeting", "wedding"},
    "party": {"party", "festive", "diwali", "celebration", "date", "ethnic"},
    "sports": {"sports", "gym", "workout", "training", "running", "run", "active"},
    "casual": {"casual", "daily", "everyday", "weekend", "outing"},
}
# Color words recognized in free-text prompts (checked in this order).
_PROMPT_COLOR_TERMS = [
    "black", "white", "navy", "blue", "grey", "gray", "beige", "olive", "green", "brown",
    "khaki", "cream", "maroon", "charcoal", "tan",
]
def _infer_structured_request_from_prompt(user_prompt: str) -> dict[str, Any]:
    """Heuristically parse a free-text shopping prompt into a structured request.

    Returns a dict with ``target_category`` ("topwear"/"bottomwear"/"both"),
    ``occasion`` bucket, ``gender`` ("men"/"women"/"unisex"/""), plus
    preferred-color and include/exclude keyword lists. All matching is
    substring-based on the normalized prompt, except gender, which matches
    whole words only.
    """
    normalized = _norm(user_prompt)
    if not normalized:
        return {
            "target_category": "both",
            "occasion": "",
            "gender": "",
            "preferred_colors": [],
            "include_keywords": [],
            "exclude_keywords": [],
        }
    # Slot: majority vote between topwear and bottomwear hint tokens.
    target_category = "both"
    top_hits = sum(1 for token in _PROMPT_TO_TARGET_HINTS["topwear"] if token in normalized)
    bottom_hits = sum(1 for token in _PROMPT_TO_TARGET_HINTS["bottomwear"] if token in normalized)
    if top_hits > bottom_hits and top_hits > 0:
        target_category = "topwear"
    elif bottom_hits > top_hits and bottom_hits > 0:
        target_category = "bottomwear"
    # First occasion bucket whose hint tokens appear wins.
    occasion = ""
    for bucket, tokens in _PROMPT_TO_OCCASION_HINTS.items():
        if any(token in normalized for token in tokens):
            occasion = bucket
            break
    # Gender: whole-word matching only. The previous substring checks tagged
    # "woman"/"female" prompts as men ("man"/"male" are substrings of them)
    # and missed prompts that *start* with "men" (the " men" token required a
    # leading space). Women is tested first so "womens" never falls through
    # to the men branch.
    gender = ""
    if re.search(r"\b(?:women|woman|female|womens)\b", normalized):
        gender = "women"
    elif re.search(r"\b(?:men|man|male|mens)\b", normalized):
        gender = "men"
    elif "unisex" in normalized:
        gender = "unisex"
    preferred_colors: list[str] = []
    for color in _PROMPT_COLOR_TERMS:
        if color in normalized and color not in preferred_colors:
            preferred_colors.append(color)
    include_keywords: list[str] = []
    for keyword in ["formal", "structured", "minimal", "smart", "elegant", "tailored"]:
        if keyword in normalized and keyword not in include_keywords:
            include_keywords.append(keyword)
    # Exclusions only count when explicitly negated ("avoid/no/without X").
    exclude_keywords: list[str] = []
    for keyword in ["hoodie", "oversized", "ripped", "distressed", "athleisure"]:
        if f"avoid {keyword}" in normalized or f"no {keyword}" in normalized or f"without {keyword}" in normalized:
            exclude_keywords.append(keyword)
    return {
        "target_category": target_category,
        "occasion": occasion,
        "gender": gender,
        "preferred_colors": preferred_colors,
        "include_keywords": include_keywords,
        "exclude_keywords": exclude_keywords,
    }
def _occasion_bucket(value: str) -> str:
    """Collapse a free-form occasion string into formal/party/sports/casual."""
    text = _norm(value)
    buckets = (
        ("formal", ("formal", "interview", "office", "work", "business", "wedding", "meeting")),
        ("party", ("party", "festive", "diwali", "celebration", "ethnic", "date")),
        ("sports", ("sports", "gym", "active", "training", "run", "running")),
    )
    for bucket, tokens in buckets:
        if any(token in text for token in tokens):
            return bucket
    return "casual"
def _top_terms(values: list[str], limit: int = 6) -> list[str]:
    """Most frequent normalized terms, ignoring blanks and "unknown" (max *limit*)."""
    frequency: dict[str, int] = {}
    for raw in values:
        term = _norm(raw)
        if term and term != "unknown":
            frequency[term] = frequency.get(term, 0) + 1
    ordered = sorted(frequency.items(), key=lambda pair: pair[1], reverse=True)
    return [term for term, _count in ordered[:limit]]
def _rank_color_resonance(
    slot_colors: dict[str, list[str]],
    reference_slot: str,
    preferred_colors: list[str],
    occasion_bucket: str,
) -> list[dict[str, Any]]:
    """Rank candidate base colors by how well they "resonate" with the wardrobe.

    Score = 3x frequency in *reference_slot* + 1x frequency across both
    slots + 2 if user-preferred + 1 if it suits the occasion bucket.
    Returns one dict per candidate (color, score, counts, preferred flag),
    sorted best-first.
    """
    reference_counts: dict[str, int] = {}
    global_counts: dict[str, int] = {}
    # Frequency of each base color in the slot the new item must pair with.
    for raw_color in slot_colors.get(reference_slot, []):
        normalized = extract_base_color(raw_color or "")
        if not normalized or normalized == "unknown":
            continue
        reference_counts[normalized] = reference_counts.get(normalized, 0) + 1
    # Frequency across the whole wardrobe (tops + bottoms).
    for raw_color in slot_colors.get("topwear", []) + slot_colors.get("bottomwear", []):
        normalized = extract_base_color(raw_color or "")
        if not normalized or normalized == "unknown":
            continue
        global_counts[normalized] = global_counts.get(normalized, 0) + 1
    preferred_set = {
        extract_base_color(value or "")
        for value in preferred_colors
        if extract_base_color(value or "")
    }
    neutral_colors = ["navy", "black", "white", "grey", "beige"]
    # Build a de-duplicated candidate pool: preferences first, then wardrobe
    # colors by frequency, then safe neutrals as a floor.
    candidate_pool: list[str] = []
    for candidate in [
        *preferred_colors,
        *[key for key, _ in sorted(reference_counts.items(), key=lambda pair: pair[1], reverse=True)],
        *[key for key, _ in sorted(global_counts.items(), key=lambda pair: pair[1], reverse=True)],
        *neutral_colors,
    ]:
        normalized = extract_base_color(candidate or "")
        if not normalized or normalized == "unknown" or normalized in candidate_pool:
            continue
        candidate_pool.append(normalized)
    formal_boost_colors = {"navy", "black", "white", "grey", "charcoal", "beige"}
    sports_boost_colors = {"black", "white", "grey", "navy", "blue", "red", "green"}
    ranked: list[dict[str, Any]] = []
    for color in candidate_pool:
        reference_count = reference_counts.get(color, 0)
        global_count = global_counts.get(color, 0)
        preferred_bonus = 2 if color in preferred_set else 0
        occasion_bonus = 0
        if occasion_bucket == "formal" and color in formal_boost_colors:
            occasion_bonus = 1
        elif occasion_bucket == "sports" and color in sports_boost_colors:
            occasion_bonus = 1
        # Reference-slot matches dominate the score (3x weight).
        score = (reference_count * 3) + global_count + preferred_bonus + occasion_bonus
        ranked.append(
            {
                "color": color,
                "score": score,
                "reference_count": reference_count,
                "global_count": global_count,
                "preferred": color in preferred_set,
            }
        )
    # Ties broken by reference-slot count, then global count (stable sort
    # keeps candidate-pool priority for full ties).
    ranked.sort(
        key=lambda item: (
            int(item.get("score") or 0),
            int(item.get("reference_count") or 0),
            int(item.get("global_count") or 0),
        ),
        reverse=True,
    )
    return ranked
# Allowed shopping categories per wardrobe slot and occasion bucket.
SCRAPER_CATEGORY_POLICY: dict[str, dict[str, list[str]]] = {
    "topwear": {
        "formal": ["shirt", "polo", "jacket"],
        "party": ["shirt", "jacket", "polo"],
        "sports": ["jersey", "t-shirt", "hoodie"],
        "casual": ["shirt", "t-shirt", "polo", "jacket", "hoodie"],
    },
    "bottomwear": {
        "formal": ["trousers", "pants"],
        "party": ["trousers", "pants", "jeans"],
        "sports": ["joggers", "shorts", "tights", "leggings"],
        "casual": ["jeans", "pants", "shorts", "joggers", "trousers"],
    },
}
# Categories vetoed outright for formal occasions.
SCRAPER_FORMAL_DISALLOWED = {
    "hoodie", "sweatshirt", "joggers", "shorts", "tank top", "tights", "leggings",
}
# Tokens marking a scraped product as out of scope (underwear, accessories,
# swimwear, footwear).
SCRAPER_RELEVANCE_EXCLUDE_TOKENS = {
    "sock", "socks", "trunk", "trunks", "boxer", "brief", "briefs", "underwear",
    "bra", "bralette", "panty", "panties", "bikini", "swim", "swimsuit",
    "belt", "cap", "hat", "beanie", "wallet", "bag", "backpack", "watch",
    "shoe", "sneaker", "boot", "sandals", "slippers",
}
# Text tokens accepted as evidence that a product belongs to a planned category.
SCRAPER_CATEGORY_KEYWORDS: dict[str, set[str]] = {
    "shirt": {"shirt", "formal shirt", "oxford", "button-down", "button up"},
    "polo": {"polo"},
    "jacket": {"jacket", "blazer", "suit jacket", "sport coat", "coat"},
    "t-shirt": {"t-shirt", "tee", "crew neck"},
    "hoodie": {"hoodie"},
    "trousers": {"trouser", "trousers", "tailored"},
    "pants": {"pant", "pants", "chino"},
    "jeans": {"jeans", "denim"},
    "shorts": {"shorts"},
    "joggers": {"jogger", "joggers"},
}
def _allowed_categories(target_category: str, occasion_bucket: str) -> list[str]:
    """Category shortlist for a slot + occasion, with a formal-wear veto pass.

    Unknown slots default to topwear; unknown occasions fall back to the
    casual policy.
    """
    slot = target_category if target_category in {"topwear", "bottomwear"} else "topwear"
    shortlist = list(SCRAPER_CATEGORY_POLICY.get(slot, {}).get(occasion_bucket, []))
    if not shortlist:
        shortlist = list(SCRAPER_CATEGORY_POLICY[slot]["casual"])
    if occasion_bucket == "formal":
        shortlist = [entry for entry in shortlist if entry not in SCRAPER_FORMAL_DISALLOWED]
    return shortlist
def _resolve_target_category(requested_target: str, wardrobe_snapshot: dict[str, Any]) -> str:
if requested_target in {"topwear", "bottomwear"}:
return requested_target
counts = wardrobe_snapshot.get("counts") if isinstance(wardrobe_snapshot.get("counts"), dict) else {}
top_count = sum(value for key, value in counts.items() if key.startswith("topwear|"))
bottom_count = sum(value for key, value in counts.items() if key.startswith("bottomwear|"))
if top_count <= bottom_count:
return "topwear"
return "bottomwear"
def _product_text_for_relevance(product: dict[str, Any]) -> str:
    """Lowercased blob of name/color/brand/link used for keyword matching."""
    name = str(product.get("name") or "")
    color = str(product.get("color") or "")
    brand = str(product.get("brand") or "")
    link = str(product.get("item_link") or "")
    return _norm(" ".join((name, color, brand, link)))
# Marketing/colorway names that count as a textual match for each base color.
SCRAPER_COLOR_KEYWORDS: dict[str, set[str]] = {
    "black": {"black", "jet black"},
    "white": {"white", "bright white", "off white", "off-white"},
    "navy": {"navy", "dark blue", "dk blue", "dress blues", "moonlit ocean", "midnight blue"},
    "blue": {"blue", "navy", "dark blue", "dk blue", "dress blues", "ice blue", "light blue", "skyway", "moonlit ocean"},
    "grey": {"grey", "gray", "dark grey", "dark gray", "steel grey", "steel gray", "charcoal"},
    "gray": {"grey", "gray", "dark grey", "dark gray", "steel grey", "steel gray", "charcoal"},
    "beige": {"beige", "sand", "tan", "stone", "morel", "oatmeal", "cornstalk", "cream", "camel"},
    "brown": {"brown", "tan", "morel"},
    "olive": {"olive", "khaki"},
    "green": {"green", "olive", "khaki"},
    "red": {"red", "brick red", "winetasting", "wine"},
    "maroon": {"maroon", "burgundy", "wine", "winetasting"},
}
def _color_keywords_for_relevance(color: str) -> set[str]:
    """Expand a planned color into the set of text tokens that count as a match."""
    base = extract_base_color(color or "") or _norm(color)
    if base and base != "unknown":
        return SCRAPER_COLOR_KEYWORDS.get(base, {base})
    return set()
def _matches_planned_color(product: dict[str, Any], planned_color: str) -> bool:
    """True when the product text mentions the planned color (or no color was planned)."""
    tokens = _color_keywords_for_relevance(planned_color)
    if not tokens:
        return True
    blob = _product_text_for_relevance(product)
    return any(token in blob for token in tokens)
def _is_relevant_scraped_product(
    product: dict[str, Any],
    target_slot: str,
    planned_category: str,
    planned_color: str,
    occasion_bucket: str,
) -> bool:
    """Filter one scraped product against slot, category, color, and occasion rules."""
    blob = _product_text_for_relevance(product)
    if not blob:
        return False
    # Accessories, underwear, footwear etc. are never relevant.
    if any(token in blob for token in SCRAPER_RELEVANCE_EXCLUDE_TOKENS):
        return False
    # Product text must show evidence of the planned category.
    category = _norm(planned_category)
    category_tokens = SCRAPER_CATEGORY_KEYWORDS.get(category, {category} if category else set())
    if category_tokens and not any(token in blob for token in category_tokens):
        return False
    if not _matches_planned_color(product, planned_color):
        return False
    # Slot sanity check on the product text itself.
    if target_slot == "topwear":
        top_terms = ("shirt", "polo", "blazer", "jacket", "coat", "t-shirt", "tee", "hoodie")
        if not any(term in blob for term in top_terms):
            return False
    if target_slot == "bottomwear":
        lower_terms = ("trousers", "pants", "jeans", "joggers", "shorts")
        if not any(term in blob for term in lower_terms):
            return False
    # Formal occasions veto athleisure-flavored products.
    if occasion_bucket == "formal":
        vetoed = ("t-shirt", "tee", "hoodie", "jogger", "shorts", "sport")
        if any(token in blob for token in vetoed):
            return False
    return True
def _complementary_slot(slot: str) -> str:
return "bottomwear" if slot == "topwear" else "topwear"
def _format_matched_label(item: dict[str, Any]) -> str:
color = str(item.get("color") or "").strip().lower()
category = str(item.get("category") or item.get("type") or "item").strip().lower()
if color and category:
return f"{color} {category}"
return color or category or "item"
def _build_product_match_context(
    product: dict[str, Any],
    query_plan: dict[str, Any],
    wardrobe_snapshot: dict[str, Any],
    target_category: str,
    occasion: str,
) -> dict[str, Any]:
    """Explain how a scraped product pairs with the user's existing wardrobe.

    Builds a metadata stub for the product, scores it via score_pair_full
    against every wardrobe item in the complementary slot only, and returns a
    user-facing reason, the best match score, and the top matched garments.
    """
    # A product can only fill a top/bottom slot; anything else is treated as topwear.
    product_slot = target_category if target_category in {"topwear", "bottomwear"} else "topwear"
    matching_slot = _complementary_slot(product_slot)
    wardrobe_items = [
        _normalize_wardrobe_item(item)
        for item in (wardrobe_snapshot.get("items") or [])
        if isinstance(item, dict)
    ]
    # Only garments from the opposite slot are candidates (no top-top / bottom-bottom).
    matching_items = [item for item in wardrobe_items if _norm(item.get("type")) == matching_slot]
    product_name = str(product.get("name") or "Suggested product").strip()
    product_category = str(query_plan.get("category") or product_slot).strip() or product_slot
    product_color = str(query_plan.get("color") or "unknown").strip() or "unknown"
    product_style = str(query_plan.get("style_direction") or query_plan.get("occasion_bucket") or occasion or "casual").strip() or "casual"
    # Minimal item-shaped dict so the product can be fed through the pair scorer.
    product_stub = {
        "id": str(product.get("item_link") or product_name or "product"),
        "type": product_slot,
        "category": product_category,
        "color": product_color,
        "pattern": "solid",
        "fabric": "unknown",
        "fit": product_style,
        "style": product_style,
        "occasion": str(query_plan.get("occasion_bucket") or occasion or "casual").strip() or "casual",
        "season": "all-season",
    }
    scored_matches: list[dict[str, Any]] = []
    for candidate in matching_items:
        # score_pair_full expects (top, bottom) argument order.
        if product_slot == "topwear":
            scored = score_pair_full(product_stub, candidate, occasion)
        else:
            scored = score_pair_full(candidate, product_stub, occasion)
        scored_matches.append(
            {
                "id": candidate.get("id"),
                "type": candidate.get("type"),
                "category": candidate.get("category"),
                "color": candidate.get("color"),
                "score": scored.get("score", 0),
                "reason": scored.get("reason", ""),
            }
        )
    # Highest score first; the color string acts as a deterministic tie-breaker.
    scored_matches.sort(key=lambda entry: (int(entry.get("score") or 0), str(entry.get("color") or "")), reverse=True)
    matched_garments = scored_matches[:3]
    if matched_garments:
        matched_labels = [_format_matched_label(item) for item in matched_garments]
        # Join labels into natural language ("a", "a and b", "a, b, and c").
        if len(matched_labels) == 1:
            matched_text = matched_labels[0]
        elif len(matched_labels) == 2:
            matched_text = f"{matched_labels[0]} and {matched_labels[1]}"
        else:
            matched_text = f"{', '.join(matched_labels[:-1])}, and {matched_labels[-1]}"
        reason = (
            f"{product_name} is a strong {product_slot} choice because it pairs cleanly with your {matching_slot} pieces like {matched_text}. "
            f"The match is only evaluated against {matching_slot} garments, so no top-top or bottom-bottom pairing is used."
        )
    else:
        reason = (
            f"{product_name} fits as a {product_slot} recommendation, but there were no {matching_slot} wardrobe items available to compare against."
        )
    return {
        "reason": reason,
        # Best available pair score, or 0 when the wardrobe had no candidates.
        "match_score": int(matched_garments[0]["score"]) if matched_garments else 0,
        "matched_with_slot": matching_slot,
        "matched_garments": matched_garments,
    }
def _enrich_scraper_products_with_matches(
    products: list[dict[str, Any]],
    query_plan: dict[str, Any],
    wardrobe_snapshot: dict[str, Any],
    target_category: str,
    occasion: str,
) -> list[dict[str, Any]]:
    """Attach wardrobe-match context (reason, score, matched garments) to each product."""
    enriched: list[dict[str, Any]] = []
    for product in products:
        # Skip malformed entries that are not dicts.
        if not isinstance(product, dict):
            continue
        match_context = _build_product_match_context(
            product=product,
            query_plan=query_plan,
            wardrobe_snapshot=wardrobe_snapshot,
            target_category=target_category,
            occasion=occasion,
        )
        # Match context keys overwrite any identically-named product keys.
        enriched.append({**product, **match_context})
    return enriched
def _build_scraper_planning_context(
    wardrobe_snapshot: dict[str, Any],
    requested_target_category: str,
    occasion: str,
    gender: str,
    filters: dict[str, Any],
) -> dict[str, Any]:
    """Summarize the wardrobe into the planning context fed to the scraper planner.

    Aggregates per-slot colors/categories, ranks candidate colors by resonance
    with the complementary slot, derives a style hint from the occasion bucket,
    and ranks wardrobe items most useful as styling references.
    """
    items = wardrobe_snapshot.get("items") if isinstance(wardrobe_snapshot.get("items"), list) else []
    occasion_bucket = _occasion_bucket(occasion)
    resolved_target = _resolve_target_category(requested_target_category, wardrobe_snapshot)
    # The slot opposite the shopping target drives color/reference compatibility.
    reference_slot = "bottomwear" if resolved_target == "topwear" else "topwear"
    slot_colors: dict[str, list[str]] = {"topwear": [], "bottomwear": [], "others": []}
    slot_categories: dict[str, list[str]] = {"topwear": [], "bottomwear": [], "others": []}
    for raw_item in items:
        if not isinstance(raw_item, dict):
            continue
        slot = _norm(raw_item.get("type"))
        # Unknown slots are bucketed under "others".
        slot_key = slot if slot in slot_colors else "others"
        slot_colors[slot_key].append(str(raw_item.get("color") or ""))
        slot_categories[slot_key].append(str(raw_item.get("category") or ""))
    preferred_colors = [str(value) for value in (filters.get("preferred_colors") or []) if str(value).strip()]
    color_resonance_scores = _rank_color_resonance(
        slot_colors=slot_colors,
        reference_slot=reference_slot,
        preferred_colors=preferred_colors,
        occasion_bucket=occasion_bucket,
    )
    # Up to six candidate colors for the planner to choose from.
    color_shortlist = [
        str(entry.get("color") or "").strip()
        for entry in color_resonance_scores
        if str(entry.get("color") or "").strip()
    ][:6]
    allowed = _allowed_categories(resolved_target, occasion_bucket)
    # Later checks deliberately override the default/formal hint.
    style_hint = "formal-smart" if occasion_bucket == "formal" else "occasion-aligned"
    if occasion_bucket == "party":
        style_hint = "elevated-party"
    if occasion_bucket == "sports":
        style_hint = "performance-athletic"
    def reference_score(item: dict[str, Any]) -> int:
        # Heuristic usefulness of an item as a styling reference: right slot,
        # known category/color, and style words matching the occasion bucket.
        score = 0
        slot = _norm(item.get("type"))
        if slot == reference_slot:
            score += 4
        category = _norm(item.get("category"))
        if category and category != "unknown":
            score += 2
        color = extract_base_color(item.get("color") or "")
        if color and color != "unknown":
            score += 2
        item_style = _norm(item.get("style") or item.get("occasion") or "")
        if occasion_bucket == "formal" and any(token in item_style for token in {"formal", "work", "office", "business"}):
            score += 3
        if occasion_bucket == "party" and any(token in item_style for token in {"party", "festive", "ethnic"}):
            score += 3
        if occasion_bucket == "casual" and "casual" in item_style:
            score += 2
        return score
    ranked_reference_items = [item for item in items if isinstance(item, dict)]
    ranked_reference_items.sort(key=reference_score, reverse=True)
    reference_item_ids = [str(item.get("id") or "") for item in ranked_reference_items if str(item.get("id") or "").strip()][:4]
    return {
        "requested_target_category": requested_target_category,
        "resolved_target_category": resolved_target,
        "occasion_bucket": occasion_bucket,
        "gender_preference": _normalize_scraper_gender(gender),
        "allowed_categories": allowed,
        "color_shortlist": color_shortlist[:6],
        "color_resonance_scores": color_resonance_scores[:8],
        "style_direction": style_hint,
        "reference_slot": reference_slot,
        "reference_item_ids": reference_item_ids,
        # Compact metadata for the top reference items the planner may cite.
        "reference_items": [
            {
                "id": item.get("id"),
                "type": item.get("type"),
                "category": item.get("category"),
                "color": item.get("color"),
                "style": item.get("style"),
                "occasion": item.get("occasion"),
            }
            for item in ranked_reference_items[:6]
        ],
        "slot_dominant_categories": {
            "topwear": _top_terms(slot_categories.get("topwear", []), limit=4),
            "bottomwear": _top_terms(slot_categories.get("bottomwear", []), limit=4),
        },
        "slot_dominant_colors": {
            "topwear": _top_terms(slot_colors.get("topwear", []), limit=4),
            "bottomwear": _top_terms(slot_colors.get("bottomwear", []), limit=4),
        },
    }
def _normalize_planned_category(raw_value: Any, allowed: list[str]) -> str:
    """Snap a model-proposed category onto the allowed list via substring match.

    Falls back to the first allowed category, or "shirt" when none exist.
    """
    proposed = _norm(raw_value)
    if proposed:
        for allowed_category in allowed:
            # Accept exact or either-direction substring matches.
            if proposed == allowed_category or proposed in allowed_category or allowed_category in proposed:
                return allowed_category
    return allowed[0] if allowed else "shirt"
def _extract_explicit_category_from_prompt(user_prompt: str, allowed: list[str]) -> str | None:
    """Return an allowed category the user explicitly asked for, or None.

    A direct mention of an allowed category wins; otherwise common synonyms
    (e.g. "blazer" -> "jacket") are resolved against the allowed list.
    """
    prompt_text = _norm(user_prompt)
    if not prompt_text:
        return None
    # Direct allowed-category mention takes highest priority.
    for category in allowed:
        category_text = str(category).strip().lower()
        if category_text and category_text in prompt_text:
            return category
    # Map common synonyms to policy categories.
    synonym_map: dict[str, str] = {
        "blazer": "jacket",
        "sport coat": "jacket",
        "suit jacket": "jacket",
        "tee": "t-shirt",
        "tshirt": "t-shirt",
        "trouser": "trousers",
        "pant": "pants",
    }
    allowed_lookup = {str(value).strip().lower() for value in allowed if str(value).strip()}
    for phrase, mapped_category in synonym_map.items():
        if phrase in prompt_text and mapped_category in allowed_lookup:
            # Return the original-cased entry from the allowed list.
            for category in allowed:
                if str(category).strip().lower() == mapped_category:
                    return category
    return None
def _normalize_planned_color(raw_value: Any, color_shortlist: list[str]) -> str:
    """Snap a proposed color onto the wardrobe shortlist; "" when nothing matches."""
    base_color = extract_base_color(raw_value or "")
    if not base_color:
        return ""
    if base_color in color_shortlist:
        return base_color
    # Accept either-direction substring matches against shortlist entries.
    for shortlisted in color_shortlist:
        if shortlisted in base_color or base_color in shortlisted:
            return shortlisted
    return ""
def _resolve_color_fallback(color_shortlist: list[str], color_resonance_scores: list[dict[str, Any]]) -> str:
for entry in color_resonance_scores:
color = str(entry.get("color") or "").strip()
if color:
return color
if color_shortlist:
return color_shortlist[0]
return "black"
def _normalize_reference_ids(
raw_ids: Any,
valid_ids: list[str],
fallback_ids: list[str],
limit: int = 4,
) -> list[str]:
valid_set = {value for value in valid_ids if value}
normalized: list[str] = []
if isinstance(raw_ids, list):
for value in raw_ids:
item_id = str(value or "").strip()
if not item_id or item_id in normalized or item_id not in valid_set:
continue
normalized.append(item_id)
if len(normalized) >= limit:
return normalized
for item_id in fallback_ids:
if item_id and item_id not in normalized:
normalized.append(item_id)
if len(normalized) >= limit:
break
return normalized
def _build_planned_query(
gender: str | None,
color: str,
category: str,
occasion: str,
style_direction: str,
) -> str:
parts = [
str(gender or "").strip(),
color,
style_direction,
category,
f"for {occasion.strip()}" if occasion.strip() else "",
]
return " ".join(part for part in parts if part).strip()
def _fallback_scraper_plan(
    planning_context: dict[str, Any],
    occasion: str,
    gender: str,
    reason: str,
) -> dict[str, Any]:
    """Build a deterministic shopping plan from wardrobe context when the LLM planner fails."""
    allowed_categories = [str(value) for value in planning_context.get("allowed_categories", []) if str(value).strip()]
    shortlist = [str(value) for value in planning_context.get("color_shortlist", []) if str(value).strip()]
    style_direction = str(planning_context.get("style_direction") or "occasion-aligned")
    resolved_target = str(planning_context.get("resolved_target_category") or "topwear")
    plan_gender = _normalize_scraper_gender(gender)
    # Take the top-ranked category/color, with safe defaults for empty wardrobes.
    chosen_category = allowed_categories[0] if allowed_categories else "shirt"
    chosen_color = shortlist[0] if shortlist else "black"
    return {
        "target_category": resolved_target,
        "color": chosen_color,
        "category": chosen_category,
        "gender": plan_gender,
        "style_direction": style_direction,
        "reference_item_ids": planning_context.get("reference_item_ids", []),
        "query": _build_planned_query(plan_gender, chosen_color, chosen_category, occasion, style_direction),
        "reason": reason,
        "source": "fallback",
    }
def _generate_scraper_plan_with_nemotron(
    occasion: str,
    gender: str,
    preferences: str,
    user_prompt: str,
    target_category: str,
    filters: dict[str, Any],
    max_products: int | None,
    store: str,
    strict_nemotron: bool = False,
) -> dict[str, Any]:
    """Plan a shopping query (Nemotron or deterministic fallback), scrape, and enrich.

    Pipeline: wardrobe snapshot -> planning context -> LLM plan (with fallback)
    -> normalization of color/category/gender/query -> store search URLs ->
    scrape + relevance filter -> wardrobe match enrichment -> runtime payload.

    Raises NvidiaPayloadError only when *strict_nemotron* is True and the
    planner fails; otherwise the fallback planner is used silently.
    """
    wardrobe_snapshot = _wardrobe_metadata_snapshot()
    requested_target = _normalize_target_category(target_category)
    safe_filters = filters if isinstance(filters, dict) else {}
    planning_context = _build_scraper_planning_context(
        wardrobe_snapshot=wardrobe_snapshot,
        requested_target_category=requested_target,
        occasion=occasion,
        gender=gender,
        filters=safe_filters,
    )
    prompt = _build_scraper_plan_prompt(
        occasion=occasion,
        gender=gender,
        preferences=preferences,
        user_prompt=user_prompt,
        target_category=requested_target,
        filters=safe_filters,
        wardrobe_snapshot=wardrobe_snapshot,
        planning_context=planning_context,
        max_products=max_products,
        store=store,
    )
    # --- Plan generation: try Nemotron first, fall back deterministically. ---
    plan_source = "nemotron"
    plan_error: str | None = None
    try:
        model_text = run_scraper_planner_text_inference(prompt, max_tokens=SCRAPER_PLANNER_MAX_TOKENS)
        parsed = _recover_scraper_plan_from_text(
            model_text=model_text,
            planning_context=planning_context,
            occasion=occasion,
            gender=gender,
        )
        if not isinstance(parsed, dict) or not parsed:
            raise NvidiaPayloadError("Nemotron scraper planner returned empty or invalid JSON payload.")
    except Exception as exc:
        if strict_nemotron:
            raise NvidiaPayloadError(f"Nemotron planner unavailable: {exc}") from exc
        plan_source = "fallback"
        plan_error = str(exc)
        parsed = _fallback_scraper_plan(
            planning_context=planning_context,
            occasion=occasion,
            gender=gender,
            reason=(
                "Live Nemotron query planning was unavailable, so a deterministic fallback planner was used."
            ),
        )
    # --- Normalize the planned target/category/color against wardrobe context. ---
    resolved_target = _normalize_target_category(
        parsed.get("target_category") or planning_context.get("resolved_target_category") or requested_target
    )
    if resolved_target == "both":
        # "both" cannot drive a single scrape; collapse to the context's resolution.
        resolved_target = str(planning_context.get("resolved_target_category") or "topwear")
    allowed = _allowed_categories(
        target_category=resolved_target,
        occasion_bucket=str(planning_context.get("occasion_bucket") or _occasion_bucket(occasion)),
    )
    occasion_bucket = str(planning_context.get("occasion_bucket") or _occasion_bucket(occasion))
    color_shortlist = [str(value) for value in planning_context.get("color_shortlist", []) if str(value).strip()]
    color_resonance_scores = [
        entry
        for entry in (planning_context.get("color_resonance_scores") or [])
        if isinstance(entry, dict)
    ]
    color = _normalize_planned_color(parsed.get("color"), color_shortlist)
    if not color:
        color = _resolve_color_fallback(color_shortlist, color_resonance_scores)
    if color_shortlist and color not in color_shortlist:
        # The planner may not pick colors outside the wardrobe shortlist.
        color = _resolve_color_fallback(color_shortlist, color_resonance_scores)
    # An explicit category in the user's prompt overrides the planner's choice.
    explicit_category = _extract_explicit_category_from_prompt(user_prompt, allowed)
    category = explicit_category or _normalize_planned_category(parsed.get("category"), allowed)
    requested_gender = _normalize_scraper_gender(gender)
    inferred_or_planned_gender = _normalize_scraper_gender(
        str(parsed.get("gender") or planning_context.get("gender_preference") or gender or "")
    )
    plan_gender = requested_gender or inferred_or_planned_gender
    style_direction = str(parsed.get("style_direction") or planning_context.get("style_direction") or "occasion-aligned")
    valid_reference_ids = [
        str(item.get("id") or "")
        for item in (wardrobe_snapshot.get("items") or [])
        if isinstance(item, dict)
    ]
    reference_item_ids = _normalize_reference_ids(
        raw_ids=parsed.get("reference_item_ids"),
        valid_ids=valid_reference_ids,
        fallback_ids=[str(value) for value in planning_context.get("reference_item_ids", [])],
        limit=4,
    )
    query = str(parsed.get("query") or "").strip()
    # Rebuild the query when missing or when user/request constraints override the plan.
    if not query or explicit_category is not None or requested_gender is not None:
        query = _build_planned_query(plan_gender, color, category, occasion, style_direction)
    resonance_lead = color_resonance_scores[0] if color_resonance_scores else {}
    default_grounding = (
        f"Selected {color} from DB metadata resonance: "
        f"reference_count={int(resonance_lead.get('reference_count') or 0)}, "
        f"global_count={int(resonance_lead.get('global_count') or 0)}."
        if resonance_lead
        else "Selected color using wardrobe metadata shortlist and reference-slot compatibility."
    )
    wardrobe_grounding = str(parsed.get("wardrobe_grounding") or default_grounding)
    reason = str(parsed.get("reason") or "Nemotron generated a wardrobe-aware shopping query.")
    recommendation = ScraperRecommendation(
        color=color,
        category=category,
        gender=plan_gender,
    )
    # --- Build store search URLs, preferring query-based URL construction. ---
    store_key = _normalize_store_name(store or SCRAPER_DEFAULT_STORE or "nike")
    search_urls = _build_store_search_urls_from_query(
        query,
        store=store_key,
        gender=plan_gender,
        wardrobe_items=list(wardrobe_snapshot.get("items") or []),
        requested_category=requested_target,
    )
    if not search_urls:
        search_urls = _build_store_search_urls_from_recommendation(
            recommendation,
            store=store_key,
            occasion=occasion,
        )
    intermediate_steps: list[dict[str, Any]] = [
        {
            "step": "plan",
            "store": store_key,
            "query": query,
            "target_category": resolved_target,
            "color": color,
            "category": category,
        }
    ]
    # dict.fromkeys preserves order while removing duplicate URLs.
    generated_urls = list(dict.fromkeys(search_urls))
    scrape_limit = max_products if isinstance(max_products, int) and max_products > 0 else 12
    # --- Scrape each URL until the product limit is hit, filtering for relevance. ---
    scraped_products: list[dict[str, Any]] = []
    fallback_products: list[dict[str, Any]] = []
    seen_links: set[str] = set()
    scrape_errors: list[str] = []
    intermediate_steps.append(
        {
            "step": "url_generation",
            "query": query,
            "url_count": len(generated_urls),
            "total_urls": len(generated_urls),
        }
    )
    if not generated_urls:
        intermediate_steps.append(
            {
                "step": "diagnostic",
                "message": "Planner succeeded but no search URLs were generated.",
                "attempted_url_count": 0,
            }
        )
    else:
        for index, search_url in enumerate(generated_urls):
            if len(scraped_products) >= scrape_limit:
                break
            try:
                extracted = _extract_store_product_summaries(search_url=search_url, store=store_key)
            except requests.RequestException as exc:
                # Record the failure and keep trying remaining URLs.
                error_message = f"url[{index + 1}] scrape failed: {exc}"
                scrape_errors.append(error_message)
                intermediate_steps.append(
                    {
                        "step": "scrape",
                        "query": query,
                        "url_count": len(generated_urls),
                        "new_products": 0,
                        "total_products": len(scraped_products),
                        "errors": [error_message],
                    }
                )
                continue
            new_products = 0
            for product in extracted:
                if not isinstance(product, dict):
                    continue
                item_link = str(product.get("item_link") or "").strip()
                # De-duplicate products across URLs by their item link.
                if not item_link or item_link in seen_links:
                    continue
                seen_links.add(item_link)
                if _is_relevant_scraped_product(
                    product=product,
                    target_slot=resolved_target,
                    planned_category=category,
                    planned_color=color,
                    occasion_bucket=occasion_bucket,
                ):
                    scraped_products.append(product)
                    new_products += 1
                else:
                    # Keep rejects; they may be reused if strict filtering yields nothing.
                    fallback_products.append(product)
                if len(scraped_products) >= scrape_limit:
                    break
            intermediate_steps.append(
                {
                    "step": "scrape",
                    "query": query,
                    "url_count": len(generated_urls),
                    "new_products": new_products,
                    "total_products": len(scraped_products),
                }
            )
    # Only relax filtering when the planned color had no usable keywords at all.
    if not scraped_products and fallback_products and not _color_keywords_for_relevance(color):
        scraped_products = fallback_products[:scrape_limit]
        intermediate_steps.append(
            {
                "step": "scrape_fallback",
                "query": query,
                "new_products": len(scraped_products),
                "total_products": len(scraped_products),
                "message": "Used non-filtered scrape fallback because strict relevance filtering returned no products.",
            }
        )
    elif not scraped_products and fallback_products:
        intermediate_steps.append(
            {
                "step": "scrape_filter",
                "query": query,
                "rejected_products": len(fallback_products),
                "total_products": 0,
                "message": "Rejected scraped products because none matched the planned color and category.",
            }
        )
    query_plan_payload = {
        "color": color,
        "category": category,
        "gender": plan_gender,
        "query": query,
        "final_query": query,
        "reason": reason,
        "wardrobe_grounding": wardrobe_grounding,
        "source": plan_source,
        "target_category": resolved_target,
        "style_direction": style_direction,
        "occasion_bucket": planning_context.get("occasion_bucket"),
        "reference_item_ids": reference_item_ids,
        "color_resonance_scores": color_resonance_scores[:4],
    }
    # --- Enrich products with wardrobe pairing context and assemble the payload. ---
    enriched_products = _enrich_scraper_products_with_matches(
        products=scraped_products,
        query_plan=query_plan_payload,
        wardrobe_snapshot=wardrobe_snapshot,
        target_category=resolved_target,
        occasion=occasion,
    )
    product_urls = [
        str(product.get("item_link") or "").strip()
        for product in enriched_products
        if str(product.get("item_link") or "").strip()
    ]
    response_payload: dict[str, Any] = {
        "occasion": occasion,
        "gender": gender,
        "preferences": preferences,
        "wardrobe_snapshot": wardrobe_snapshot,
        "query_plan": query_plan_payload,
        "store": store_key,
        "search_urls": generated_urls,
        "product_urls": product_urls,
        "products": enriched_products,
        "count": len(enriched_products),
        "intermediate_steps": intermediate_steps,
        "final_query": query,
        "plan_source": plan_source,
        "plan_error": plan_error,
        "scrape_error": "; ".join(scrape_errors) if scrape_errors else None,
        "target_category": resolved_target,
    }
    response_payload["saved_json_path"] = _save_scraper_json_payload("product_urls", response_payload)
    return _store_scraper_runtime_result(response_payload)
def _build_shopping_suggestions_from_scraper(
    occasion: str,
    target_category: str,
    gender_preference: str,
    filters: dict[str, Any],
    max_results: int,
    store: str,
) -> dict[str, Any]:
    """Run the scraper planning pipeline and shape the results as shopping suggestions.

    Flattens the user's filter preferences into a free-text preference string,
    delegates to the Nemotron planner/scraper pipeline, and converts each live
    product into a suggestion-card payload.

    Returns a dict containing the suggestion list plus plan/runtime metadata;
    the "error" key is set when no live products came back.
    """
    # Collapse all filter lists into one comma-separated preference string.
    preference_values = [
        *filters.get("preferred_colors", []),
        *filters.get("preferred_patterns", []),
        *filters.get("preferred_styles", []),
        *filters.get("preferred_fabrics", []),
        *filters.get("preferred_fits", []),
        *filters.get("preferred_seasons", []),
        *filters.get("include_keywords", []),
    ]
    preferences = ", ".join(str(value) for value in preference_values if value)
    runtime_payload = _generate_scraper_plan_with_nemotron(
        occasion=occasion,
        gender=gender_preference,
        preferences=preferences,
        user_prompt=preferences,
        target_category=target_category,
        filters=filters,
        max_products=max_results,
        store=store,
    )
    products = list(runtime_payload.get("products") or [])
    query_plan = dict(runtime_payload.get("query_plan") or {})
    suggestion_items: list[dict[str, Any]] = []
    for index, product in enumerate(products[:max_results]):
        item_link = str(product.get("item_link") or "").strip()
        if not item_link:
            continue
        # Fix: prefer the real wardrobe-compatibility score computed during
        # product enrichment (score_pair_full) over the previous rank-based
        # estimate; fall back to the estimate when the score is absent/zero.
        raw_match_score = product.get("match_score")
        if isinstance(raw_match_score, (int, float)) and raw_match_score > 0:
            match_score = int(raw_match_score)
        else:
            match_score = max(65, 95 - index * 4)
        suggestion_items.append(
            {
                # "both" alternates slots so the list mixes tops and bottoms.
                "target_category": target_category if target_category != "both" else ("topwear" if index % 2 == 0 else "bottomwear"),
                "title": str(product.get("name") or query_plan.get("query") or "Suggested Product"),
                "url": item_link,
                "image_url": str(product.get("image_url") or ""),
                "store": str(runtime_payload.get("store") or store or "nike").title(),
                "match_score": match_score,
                "reason": str(product.get("reason") or query_plan.get("reason") or "Nemotron generated a wardrobe-aware shopping query."),
                "product_category": str(query_plan.get("category") or "shopping"),
                "color": str(query_plan.get("color") or "black"),
                "pattern": "solid",
                "search_query": str(query_plan.get("query") or occasion),
                "scrape_status": "live",
                "scrape_error": None,
                "product_gender": str(query_plan.get("gender") or gender_preference or "unknown") or "unknown",
                "matched_with_slot": str(product.get("matched_with_slot") or ("bottomwear" if target_category == "topwear" else "topwear")),
                "matched_garments": product.get("matched_garments") or [],
            }
        )
    return {
        "occasion": occasion,
        "target_category": target_category,
        "gender_preference": gender_preference,
        "search_filters": filters,
        "suggestions": suggestion_items,
        "error": None if suggestion_items else "No live scraper results were returned.",
        "saved_json_path": runtime_payload.get("saved_json_path", ""),
        "runtime_id": runtime_payload.get("runtime_id", ""),
        "query_plan": query_plan,
    }
# FastAPI application instance with fully permissive CORS so any frontend
# origin can call the API.
app = FastAPI(title="Wardrobe Classifier API", version="2.0.0")
app.add_middleware(
    CORSMiddleware,
    # NOTE(review): wildcard origins combined with allow_credentials=True is
    # disallowed by the CORS spec for credentialed browser requests — confirm
    # whether credentials are actually needed here.
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Process-local cache of outfit-grid sessions keyed by session id
# (not shared across workers; entries expire via OUTFIT_GRID_SESSION_TTL_SECONDS).
_OUTFIT_GRID_SESSIONS: dict[str, dict[str, Any]] = {}
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan: create the SQLite schema and purge expired cache rows on startup."""
    init_db()
    cache_purge_expired()
    yield
# The app instance is created above, so the lifespan is attached to the router
# after the fact instead of being passed to FastAPI(lifespan=...).
app.router.lifespan_context = lifespan
# Prompt sent with each garment image to the vision model; it must return only
# the JSON object described below (parsed downstream as item metadata).
CLASSIFICATION_PROMPT = """You are a fashion expert analyzing a garment image for a wardrobe assistant app.
Carefully examine the primary clothing item in the image and return only a valid JSON object.
Rules:
- Focus only on the dominant foreground garment. Ignore the person's face/body, background, hangers, mannequins, and room objects.
- If multiple garments are visible, classify the single most prominent garment only.
- Be specific with colors (for example, "Navy Blue" instead of "Blue", "Olive Green" instead of "Green").
- If any attribute is not clearly visible, use "Unknown".
- For "color", include all visible colors in one string (for example, "White with Black stripes").
- Do not include any explanation, markdown, or text outside the JSON object.
Return this exact JSON structure:
{
"type": "e.g. T-Shirt / Jeans / Dress / Jacket / Hoodie / Shorts / Saree / Kurta",
"category": "Topwear / Bottomwear / Footwear / Outerwear / Ethnic / Accessories",
"color": "exact specific color name not basic colours",
"pattern": "Solid / Striped / Checkered / Floral / Printed / Graphic / Embroidered / Tie-Dye",
"fabric": "Cotton / Denim / Wool / Polyester / Silk / Linen / Leather / Unknown",
"fit": "Slim / Regular / Oversized / Fitted / Relaxed / Unknown",
"occasion": "Casual / Formal / Sports / Party / Work / Ethnic",
"season": "Summer / Winter / Monsoon / All-Season"
}"""
# ---------------------------------------------------------------------------
# NVIDIA gateway configuration (all values overridable via environment).
# ---------------------------------------------------------------------------
# OpenAI-compatible chat-completions endpoint used for all model calls.
NVIDIA_INVOKE_URL = os.getenv(
    "NVIDIA_INVOKE_URL",
    "https://integrate.api.nvidia.com/v1/chat/completions",
)
NVIDIA_MODEL_ID = os.getenv("NVIDIA_MODEL_ID", "qwen/qwen3.5-122b-a10b")
# Token budgets: normal requests vs. reasoning-heavy retries.
NVIDIA_MAX_TOKENS = int(os.getenv("NVIDIA_MAX_TOKENS", "16384"))
NVIDIA_REASONING_MAX_TOKENS = int(os.getenv("NVIDIA_REASONING_MAX_TOKENS", "16384"))
# Sampling parameters.
NVIDIA_TEMPERATURE = float(os.getenv("NVIDIA_TEMPERATURE", "0.60"))
NVIDIA_TOP_P = float(os.getenv("NVIDIA_TOP_P", "0.95"))
# Request timeout and retry policy.
NVIDIA_TIMEOUT_SECONDS = int(os.getenv("NVIDIA_TIMEOUT_SECONDS", "180"))
NVIDIA_MAX_RETRIES = int(os.getenv("NVIDIA_MAX_RETRIES", "3"))
NVIDIA_RETRY_BACKOFF_SECONDS = float(os.getenv("NVIDIA_RETRY_BACKOFF_SECONDS", "0.8"))
NVIDIA_ENABLE_THINKING = str(os.getenv("NVIDIA_ENABLE_THINKING", "false")).strip().lower() == "true"
# Uploaded images are downscaled so the longest side fits this many pixels.
NVIDIA_IMAGE_MAX_DIM = int(os.getenv("NVIDIA_IMAGE_MAX_DIM", "1400"))
# Comma-separated fallback model ids tried when the primary model fails.
NVIDIA_FALLBACK_MODEL_IDS = [
    model_id.strip()
    for model_id in os.getenv("NVIDIA_FALLBACK_MODEL_IDS", "nvidia/nemotron-3-nano-omni-30b-a3b-reasoning").split(",")
    if model_id.strip()
]
NVIDIA_API_KEY_MISSING_DETAIL = "NVIDIA_API_KEY is not configured on this Space."
class NvidiaGatewayError(RuntimeError):
    """Raised when the NVIDIA gateway rejects or fails a request.

    Carries the HTTP status code (default 502) so API handlers can map
    gateway failures onto HTTP responses.
    """

    def __init__(self, message: str, status_code: int = 502) -> None:
        super().__init__(message)
        self.status_code = status_code
class NvidiaPayloadError(RuntimeError):
    """Raised when an NVIDIA response body cannot be parsed into usable text."""
class NvidiaTokenLimitError(NvidiaPayloadError):
    """Raised when a response was truncated by max_tokens before final content appeared."""
def _nvidia_api_key() -> str:
return os.getenv("NVIDIA_API_KEY", "").strip()
def _candidate_model_ids(primary_model_id: str) -> list[str]:
    """Return the primary model followed by configured fallbacks, stripped and de-duplicated."""
    ordered: list[str] = []
    seen_ids: set[str] = set()
    for raw_id in [primary_model_id, *NVIDIA_FALLBACK_MODEL_IDS]:
        model_id = raw_id.strip()
        # Skip blanks and anything already queued, preserving first-seen order.
        if model_id and model_id not in seen_ids:
            seen_ids.add(model_id)
            ordered.append(model_id)
    return ordered
def _is_degraded_function_error(exc: Exception) -> bool:
    """Detect NVIDIA's transient "DEGRADED function" gateway error, which warrants a model fallback."""
    return isinstance(exc, NvidiaGatewayError) and "DEGRADED function cannot be invoked" in str(exc)
# ---------------------------------------------------------------------------
# Outfit-grid configuration (composite-image layout, sessions, AI scoring).
# ---------------------------------------------------------------------------
# Composite grid geometry (pixels).
OUTFIT_GRID_CELL_SIZE = int(os.getenv("OUTFIT_GRID_CELL_SIZE", "224"))
OUTFIT_GRID_LABEL_HEIGHT = int(os.getenv("OUTFIT_GRID_LABEL_HEIGHT", "28"))
OUTFIT_GRID_PADDING = int(os.getenv("OUTFIT_GRID_PADDING", "12"))
# Remote image fetch timeout and session persistence settings.
OUTFIT_GRID_FETCH_TIMEOUT_SECONDS = int(os.getenv("OUTFIT_GRID_FETCH_TIMEOUT_SECONDS", "12"))
OUTFIT_GRID_SESSION_TTL_SECONDS = int(os.getenv("OUTFIT_GRID_SESSION_TTL_SECONDS", "3600"))
OUTFIT_GRID_SESSION_DIR = Path(
    os.getenv("OUTFIT_GRID_SESSION_DIR", str(Path(tempfile.gettempdir()) / "wardrobe-grid-sessions"))
)
# Caps on how many items go into each grid row.
OUTFIT_GRID_MAX_TOP_ITEMS = int(os.getenv("OUTFIT_GRID_MAX_TOP_ITEMS", "4"))
OUTFIT_GRID_MAX_BOTTOM_ITEMS = int(os.getenv("OUTFIT_GRID_MAX_BOTTOM_ITEMS", "4"))
# Minimum score for an anchored outfit to be kept.
OUTFIT_ANCHOR_MIN_SCORE = int(os.getenv("OUTFIT_ANCHOR_MIN_SCORE", "45"))
# Optional text-only candidate pre-selection pass before image scoring.
OUTFIT_TEXT_PRESELECT_ENABLED = str(os.getenv("OUTFIT_TEXT_PRESELECT_ENABLED", "false")).strip().lower() == "true"
OUTFIT_TEXT_SELECTOR_MAX_TOKENS = int(os.getenv("OUTFIT_TEXT_SELECTOR_MAX_TOKENS", "400"))
OUTFIT_AI_MAX_TOKENS = int(os.getenv("OUTFIT_AI_MAX_TOKENS", "4096"))
# Scorer identifiers recorded in responses for traceability.
OUTFIT_TEXT_SELECTOR_NAME = "nemotron-text-preselect-v1"
OUTFIT_AI_SCORER_NAME = "ai-grid-v1"
OUTFIT_FALLBACK_SCORER_NAME = "fallback-current-v1"
# str.format template for the multimodal outfit-scoring prompt. Placeholders
# ({occasion}, {metadata_json}, ...) are filled per request; literal JSON
# braces in the response schema are escaped as {{ }}.
OUTFIT_GRID_SCORING_PROMPT_TEMPLATE = """You are an expert multimodal outfit matching engine.
Task:
Evaluate every valid outfit combination shown in the attached wardrobe grid image and rank the best outfits for the given context.
Grid semantics:
- Row 1 contains topwear only.
- Row 2 contains bottomwear only.
- Row 3 contains optional "Others" items (footwear, accessories, outerwear, or uncategorized garments). If Row 3 is absent, ignore this slot.
- Each cell is labeled with a coordinate like 1:1, 1:2, 2:1, 2:2, 3:1.
- A valid outfit is exactly one Row 1 item plus one Row 2 item, and optionally one Row 3 item.
User context:
- Occasion: {occasion}
- Region: {region}
- Weather JSON: {weather_json}
- User profile JSON: {user_profile_json}
- Anchor mode: {anchor_mode}
- Locked top index: {locked_top_index}
- Locked bottom index: {locked_bottom_index}
- Locked other index: {locked_other_index}
- Anchor item JSON: {anchor_item_json}
Wardrobe metadata map:
{metadata_json}
Scoring rubric:
- occasion relevance
- color harmony
- pattern compatibility
- fit alignment
- style coherence
- seasonal/contextual appropriateness
Instructions:
1. Use both the composite image and the metadata map together.
2. Treat the locked item as the fixed styling anchor whenever Anchor mode is not "none".
3. Evaluate all {combination_count} possible valid combinations exactly once before ranking.
4. If a locked top, locked bottom, or locked other index is provided, only consider combinations containing that index.
5. Assign each retained combination a final score from 0 to 100 and this score breakdown:
occasion, color, pattern, fit, style, season
6. Return only valid JSON. No markdown and no prose outside JSON.
7. Return at most the top {top_k} outfits in descending score order.
8. When a Row 3 item is part of the outfit, include its cell in "other_index". If no Row 3 item is used, set "other_index" to null.
Return this exact JSON shape:
{{
"recommendations": [
{{
"top_index": "1:1",
"bottom_index": "2:1",
"other_index": "3:1",
"score": 92,
"breakdown": {{
"occasion": 94,
"color": 91,
"pattern": 89,
"fit": 90,
"style": 93,
"season": 88
}},
"reason": "Max 15 words. Short user-facing explanation grounded in visual + metadata evidence.",
"tip": "Max 10 words. One concise styling tip."
}}
]
}}"""
# ---------------------------------------------------------------------------
# Model helpers
# ---------------------------------------------------------------------------
def _image_to_data_url(image: Image.Image) -> str:
    """Encode a PIL image as a base64 PNG data URL, downscaling to NVIDIA_IMAGE_MAX_DIM first."""
    if NVIDIA_IMAGE_MAX_DIM > 0:
        # Work on a copy so the caller's image is never mutated by thumbnail().
        image = image.copy()
        image.thumbnail((NVIDIA_IMAGE_MAX_DIM, NVIDIA_IMAGE_MAX_DIM), Image.Resampling.LANCZOS)
    png_buffer = io.BytesIO()
    image.save(png_buffer, format="PNG")
    encoded = base64.b64encode(png_buffer.getvalue()).decode("utf-8")
    return f"data:image/png;base64,{encoded}"
def _extract_text_from_nvidia_content(content: Any) -> str:
if isinstance(content, str):
return content
if isinstance(content, list):
parts: list[str] = []
for chunk in content:
if isinstance(chunk, str):
parts.append(chunk)
continue
if not isinstance(chunk, dict):
continue
for key in ("text", "content", "value"):
value = chunk.get(key)
if isinstance(value, str) and value:
parts.append(value)
break
return "".join(parts).strip()
if isinstance(content, dict):
for key in ("text", "content", "value"):
value = content.get(key)
if isinstance(value, str) and value:
return value
return ""
def _extract_nvidia_text(payload: dict[str, Any]) -> str:
    """Pull the assistant text out of a non-streaming NVIDIA chat response.

    Prefers "content", then "reasoning_content", then a raw "reasoning"
    string. Raises NvidiaTokenLimitError when the response was truncated by
    max_tokens, and NvidiaPayloadError for any other unusable shape.
    """
    try:
        choice = payload["choices"][0]
        message = choice["message"]
    except (KeyError, IndexError, TypeError) as exc:
        raise NvidiaPayloadError(f"Unexpected NVIDIA API response shape: {payload}") from exc
    content = message.get("content")
    reasoning_content = message.get("reasoning_content")
    # Try the structured fields in preference order.
    for candidate in (content, reasoning_content):
        extracted = _extract_text_from_nvidia_content(candidate)
        if extracted:
            return extracted
    reasoning = message.get("reasoning")
    if isinstance(reasoning, str) and reasoning.strip():
        return reasoning.strip()
    if choice.get("finish_reason") == "length":
        raise NvidiaTokenLimitError(
            "NVIDIA response hit max_tokens before final content was produced."
        )
    raise NvidiaPayloadError(
        "Unexpected NVIDIA message payload: "
        f"finish_reason={choice.get('finish_reason')}, "
        f"message_keys={list(message.keys())}, "
        f"content={content!r}, "
        f"reasoning_content={reasoning_content!r}"
    )
def _extract_streamed_nvidia_text(response: requests.Response) -> str:
    """Accumulate model text from an SSE (``data:`` line) streaming response.

    For each JSON event, collects in stream order: delta content,
    delta reasoning_content, delta reasoning, then any final message
    content and message reasoning_content. Malformed lines are skipped.
    Raises NvidiaPayloadError when the stream ends with no text at all.
    """
    collected: list[str] = []

    def _append(value: Any) -> None:
        text = _extract_text_from_nvidia_content(value)
        if text:
            collected.append(text)

    for raw_line in response.iter_lines(decode_unicode=True):
        if not raw_line:
            continue
        line = raw_line.strip()
        if not line.startswith("data:"):
            continue
        body = line[5:].strip()
        if not body or body == "[DONE]":
            continue
        try:
            event = json.loads(body)
        except json.JSONDecodeError:
            continue
        try:
            choice = event["choices"][0]
        except (KeyError, IndexError, TypeError):
            continue
        delta = choice.get("delta") or {}
        if isinstance(delta, dict):
            for field in ("content", "reasoning_content", "reasoning"):
                _append(delta.get(field))
        message = choice.get("message") or {}
        if isinstance(message, dict):
            for field in ("content", "reasoning_content"):
                _append(message.get(field))
    result = "".join(collected).strip()
    if result:
        return result
    raise NvidiaPayloadError("NVIDIA stream ended without returning any content.")
def run_nvidia_inference(image: Image.Image, prompt: str, max_tokens: int = NVIDIA_MAX_TOKENS) -> str:
    """Run a vision+text chat completion against the NVIDIA API.

    Sends *prompt* plus *image* (as a base64 PNG data URL) to each candidate
    model in turn, retrying transient HTTP failures with linear backoff and
    doubling the token budget (up to NVIDIA_REASONING_MAX_TOKENS) when the
    response is truncated by max_tokens. Returns the streamed text of the
    first successful call.

    Raises:
        RuntimeError: when no NVIDIA API key is configured.
        NvidiaGatewayError: on unrecoverable transport/HTTP failures.
        NvidiaPayloadError: when a response body cannot be parsed.
    """
    api_key = _nvidia_api_key()
    if not api_key:
        raise RuntimeError(NVIDIA_API_KEY_MISSING_DETAIL)
    # PNG-encode the image ONCE: it is invariant across models, retries and
    # token-budget escalations; previously it was re-encoded per iteration.
    image_data_url = _image_to_data_url(image)
    last_error: Exception | None = None
    for model_id in _candidate_model_ids(NVIDIA_MODEL_ID):
        current_max_tokens = max_tokens
        # Escalate the token budget on truncation until the reasoning cap.
        while current_max_tokens <= NVIDIA_REASONING_MAX_TOKENS:
            payload = {
                "model": model_id,
                "messages": [
                    {
                        "role": "user",
                        "content": [
                            {"type": "text", "text": prompt},
                            {
                                "type": "image_url",
                                "image_url": {
                                    "url": image_data_url,
                                },
                            },
                        ],
                    }
                ],
                "max_tokens": current_max_tokens,
                "temperature": NVIDIA_TEMPERATURE,
                "top_p": NVIDIA_TOP_P,
                "stream": True,
            }
            if NVIDIA_ENABLE_THINKING:
                payload["chat_template_kwargs"] = {"enable_thinking": True}
            headers = {
                "Authorization": f"Bearer {api_key}",
                "Accept": "text/event-stream",
                "Content-Type": "application/json",
            }
            last_error = None
            for attempt in range(NVIDIA_MAX_RETRIES + 1):
                try:
                    response = requests.post(
                        NVIDIA_INVOKE_URL,
                        headers=headers,
                        json=payload,
                        timeout=NVIDIA_TIMEOUT_SECONDS,
                    )
                    # 429/5xx are transient; 429 is surfaced as 503 so clients back off.
                    if response.status_code in {429, 500, 502, 503, 504}:
                        raise NvidiaGatewayError(
                            f"NVIDIA API transient failure {response.status_code}: {response.text[:500]}",
                            status_code=503 if response.status_code == 429 else 502,
                        )
                    if response.status_code >= 400:
                        raise NvidiaGatewayError(
                            f"NVIDIA API request failed with {response.status_code}: {response.text[:500]}",
                            status_code=502,
                        )
                    return _extract_streamed_nvidia_text(response)
                except NvidiaTokenLimitError as exc:
                    # Truncated output: stop retrying, grow the budget below.
                    last_error = exc
                    break
                except (requests.RequestException, NvidiaGatewayError) as exc:
                    last_error = exc
                    if _is_degraded_function_error(exc):
                        # The model endpoint itself is degraded; try the next model.
                        break
                    if attempt >= NVIDIA_MAX_RETRIES:
                        break
                    time.sleep(NVIDIA_RETRY_BACKOFF_SECONDS * (attempt + 1))
                except NvidiaPayloadError:
                    raise
            if isinstance(last_error, NvidiaTokenLimitError) and current_max_tokens < NVIDIA_REASONING_MAX_TOKENS:
                current_max_tokens = min(current_max_tokens * 2, NVIDIA_REASONING_MAX_TOKENS)
                continue
            if _is_degraded_function_error(last_error or Exception()):
                print(f"[nvidia] model degraded, trying fallback model: {model_id}")
                break
            if isinstance(last_error, NvidiaGatewayError):
                raise last_error
            if isinstance(last_error, requests.RequestException):
                raise NvidiaGatewayError(f"NVIDIA API request failed: {last_error}", status_code=502) from last_error
            if last_error is not None:
                raise last_error
            break
    if isinstance(last_error, Exception):
        raise NvidiaGatewayError(
            f"NVIDIA API request failed on all configured models ({', '.join(_candidate_model_ids(NVIDIA_MODEL_ID))}): {last_error}",
            status_code=502,
        ) from last_error
    raise NvidiaGatewayError(
        "NVIDIA API request failed after exhausting reasoning token budget.",
        status_code=502,
    )
def run_nvidia_text_inference(prompt: str, max_tokens: int = OUTFIT_TEXT_SELECTOR_MAX_TOKENS) -> str:
    """Run a text-only chat completion against the NVIDIA API.

    Mirrors run_nvidia_inference but without an image part: each candidate
    model is tried in turn, transient HTTP failures are retried with linear
    backoff, and the max_tokens budget is doubled (up to
    NVIDIA_REASONING_MAX_TOKENS) whenever the answer is truncated.

    Raises:
        RuntimeError: when no NVIDIA API key is configured.
        NvidiaGatewayError: on unrecoverable transport/HTTP failures.
        NvidiaPayloadError: when a response body cannot be parsed.
    """
    api_key = _nvidia_api_key()
    if not api_key:
        raise RuntimeError(NVIDIA_API_KEY_MISSING_DETAIL)
    last_error: Exception | None = None
    for model_id in _candidate_model_ids(NVIDIA_MODEL_ID):
        current_max_tokens = max_tokens
        # Escalate the token budget on truncation until the reasoning cap.
        while current_max_tokens <= NVIDIA_REASONING_MAX_TOKENS:
            payload = {
                "model": model_id,
                "messages": [
                    {
                        "role": "user",
                        "content": prompt,
                    }
                ],
                "max_tokens": current_max_tokens,
                "temperature": NVIDIA_TEMPERATURE,
                "top_p": NVIDIA_TOP_P,
                "stream": True,
            }
            if NVIDIA_ENABLE_THINKING:
                payload["chat_template_kwargs"] = {"enable_thinking": True}
            headers = {
                "Authorization": f"Bearer {api_key}",
                "Accept": "text/event-stream",
                "Content-Type": "application/json",
            }
            last_error = None
            for attempt in range(NVIDIA_MAX_RETRIES + 1):
                try:
                    response = requests.post(
                        NVIDIA_INVOKE_URL,
                        headers=headers,
                        json=payload,
                        timeout=NVIDIA_TIMEOUT_SECONDS,
                    )
                    # 429/5xx are transient; 429 is surfaced as 503 so callers back off.
                    if response.status_code in {429, 500, 502, 503, 504}:
                        raise NvidiaGatewayError(
                            f"NVIDIA API transient failure {response.status_code}: {response.text[:500]}",
                            status_code=503 if response.status_code == 429 else 502,
                        )
                    if response.status_code >= 400:
                        raise NvidiaGatewayError(
                            f"NVIDIA API request failed with {response.status_code}: {response.text[:500]}",
                            status_code=502,
                        )
                    return _extract_streamed_nvidia_text(response)
                except NvidiaTokenLimitError as exc:
                    # Truncated output: stop retrying and grow the budget below.
                    last_error = exc
                    break
                except (requests.RequestException, NvidiaGatewayError) as exc:
                    last_error = exc
                    if _is_degraded_function_error(exc):
                        # The model endpoint itself is degraded; try the next model.
                        break
                    if attempt >= NVIDIA_MAX_RETRIES:
                        break
                    time.sleep(NVIDIA_RETRY_BACKOFF_SECONDS * (attempt + 1))
                except NvidiaPayloadError:
                    raise
            if isinstance(last_error, NvidiaTokenLimitError) and current_max_tokens < NVIDIA_REASONING_MAX_TOKENS:
                current_max_tokens = min(current_max_tokens * 2, NVIDIA_REASONING_MAX_TOKENS)
                continue
            if _is_degraded_function_error(last_error or Exception()):
                print(f"[nvidia] model degraded, trying fallback model: {model_id}")
                break
            if isinstance(last_error, NvidiaGatewayError):
                raise last_error
            if isinstance(last_error, requests.RequestException):
                raise NvidiaGatewayError(f"NVIDIA API request failed: {last_error}", status_code=502) from last_error
            if last_error is not None:
                raise last_error
            break
    if isinstance(last_error, Exception):
        raise NvidiaGatewayError(
            f"NVIDIA API request failed on all configured models ({', '.join(_candidate_model_ids(NVIDIA_MODEL_ID))}): {last_error}",
            status_code=502,
        ) from last_error
    raise NvidiaGatewayError(
        "NVIDIA API request failed after exhausting reasoning token budget.",
        status_code=502,
    )
def parse_json_from_text(text: str) -> dict[str, Any]:
    """Best-effort extraction of a JSON object from LLM output.

    Parses the whole (stripped) text first; failing that, the widest
    ``{...}`` substring. Returns ``{}`` when nothing parses.
    """
    if not text:
        return {}
    candidate = text.strip()
    try:
        return json.loads(candidate)
    except json.JSONDecodeError:
        pass
    # Fall back to the outermost brace pair (handles prose/fences around JSON).
    start = candidate.find("{")
    end = candidate.rfind("}")
    if start != -1 and end > start:
        try:
            return json.loads(candidate[start:end + 1])
        except json.JSONDecodeError:
            pass
    return {}
def normalize_specs(specs: dict[str, Any]) -> dict[str, str]:
    """Coerce a raw garment-spec dict onto a fixed string schema,
    defaulting every missing field to "Unknown"."""
    fields = ("type", "category", "color", "pattern", "fabric", "fit", "occasion", "season")
    return {field: str(specs.get(field, "Unknown")) for field in fields}
def _clamp_score(value: Any, fallback: int = 0) -> int:
if isinstance(value, str):
match = re.search(r"-?\d+(?:\.\d+)?", value)
value = match.group(0) if match else value
try:
score = int(round(float(value)))
except (TypeError, ValueError):
score = fallback
return max(0, min(100, score))
def _safe_metadata_item(item: dict[str, Any]) -> dict[str, Any]:
return {
"id": item.get("id"),
"slot": item.get("type"),
"type": item.get("type"),
"category": item.get("category"),
"color": item.get("color"),
"pattern": item.get("pattern"),
"fabric": item.get("fabric"),
"fit": item.get("fit"),
"occasion": item.get("style"),
"season": item.get("season"),
}
def _placeholder_grid_tile(index_label: str, item: dict[str, Any], tile_size: int) -> Image.Image:
    """Render a light-grey fallback tile showing the grid index plus the
    item's category and color, for when the real image is unavailable."""
    placeholder = Image.new("RGB", (tile_size, tile_size), (245, 245, 245))
    pen = ImageDraw.Draw(placeholder)
    label_font = ImageFont.load_default()
    category_text = str(item.get("category") or "Unknown")[:24]
    color_text = str(item.get("color") or "Unknown")[:24]
    # Inset border, then three lines of caption text.
    pen.rectangle((8, 8, tile_size - 8, tile_size - 8), outline=(180, 180, 180), width=2)
    pen.text((14, 14), index_label, fill=(40, 40, 40), font=label_font)
    pen.text((14, 36), category_text, fill=(80, 80, 80), font=label_font)
    pen.text((14, 54), color_text, fill=(80, 80, 80), font=label_font)
    return placeholder
def _load_grid_tile(image_url: str, index_label: str, item: dict[str, Any]) -> Image.Image:
    """Fetch and square-crop one wardrobe image for a grid cell.

    Returns a labelled placeholder tile when the URL is missing, uses the
    in-memory ``memory://`` scheme, is not http(s), or when any fetch or
    decode error occurs (best-effort by design).
    """
    if not image_url or image_url.startswith("memory://"):
        return _placeholder_grid_tile(index_label, item, OUTFIT_GRID_CELL_SIZE)
    parsed = urlparse(image_url)
    if parsed.scheme not in {"http", "https"}:
        return _placeholder_grid_tile(index_label, item, OUTFIT_GRID_CELL_SIZE)
    try:
        # Browser-like headers; Referer points at the image itself,
        # presumably to satisfy simple hotlink checks — confirm if a CDN
        # still rejects these requests.
        req = Request(
            image_url,
            headers={"User-Agent": "Mozilla/5.0", "Accept": "image/*,*/*;q=0.8", "Referer": image_url},
        )
        with urlopen(req, timeout=OUTFIT_GRID_FETCH_TIMEOUT_SECONDS) as resp:
            tile = Image.open(io.BytesIO(resp.read())).convert("RGB")
            # Center-crop/scale to the exact cell size.
            return ImageOps.fit(
                tile,
                (OUTFIT_GRID_CELL_SIZE, OUTFIT_GRID_CELL_SIZE),
                method=Image.Resampling.LANCZOS,
                centering=(0.5, 0.5),
            )
    except Exception:
        # Any network/decode failure degrades to the placeholder tile.
        return _placeholder_grid_tile(index_label, item, OUTFIT_GRID_CELL_SIZE)
def _prune_outfit_grid_sessions() -> None:
    """Drop grid sessions older than OUTFIT_GRID_SESSION_TTL_SECONDS and
    best-effort delete their cached grid images from disk."""
    cutoff = time.time() - OUTFIT_GRID_SESSION_TTL_SECONDS
    # Snapshot items() so entries can be popped while iterating.
    for session_id, record in list(_OUTFIT_GRID_SESSIONS.items()):
        created_at = float(record.get("created_at") or 0)
        if created_at >= cutoff:
            continue
        image_path = record.get("image_path")
        if isinstance(image_path, str):
            try:
                Path(image_path).unlink(missing_ok=True)
            except Exception:
                # Leftover files are harmless; never fail pruning on disk errors.
                pass
        _OUTFIT_GRID_SESSIONS.pop(session_id, None)
def _build_outfit_grid_session(
    tops: list[dict[str, Any]],
    bottoms: list[dict[str, Any]],
    others: list[dict[str, Any]],
    occasion: str,
    user_profile: dict[str, Any] | None,
    weather: dict[str, Any] | None,
    region: str,
) -> dict[str, Any]:
    """Compose the labelled grid image that the vision model scores outfits from.

    Tops go on row 1, bottoms on row 2 and (when present) "others" on row 3;
    each cell is captioned with its "row:column" index plus color and
    category. The rendered PNG is written to OUTFIT_GRID_SESSION_DIR and the
    session (image path + per-cell metadata) is registered in
    _OUTFIT_GRID_SESSIONS under a fresh UUID.

    Returns a dict with session_id, the PIL image, metadata_map (cell index
    -> AI-safe metadata), item_lookup (cell index -> full item) and
    image_path.
    """
    # Evict expired sessions before adding a new one.
    _prune_outfit_grid_sessions()
    session_id = str(uuid.uuid4())
    rows = [
        (1, "Topwear", tops),
        (2, "Bottomwear", bottoms),
    ]
    if others:
        rows.append((3, "Others", others))
    # Canvas width covers the widest row; at least one column even if empty.
    columns = max(len(tops), len(bottoms), len(others), 1)
    cell_span = OUTFIT_GRID_CELL_SIZE + 2 * OUTFIT_GRID_PADDING
    row_span = OUTFIT_GRID_CELL_SIZE + OUTFIT_GRID_LABEL_HEIGHT + 2 * OUTFIT_GRID_PADDING
    canvas = Image.new("RGB", (columns * cell_span, len(rows) * row_span), (255, 255, 255))
    draw = ImageDraw.Draw(canvas)
    font = ImageFont.load_default()
    metadata_map: dict[str, dict[str, Any]] = {}
    item_lookup: dict[str, dict[str, Any]] = {}

    def draw_row(items: list[dict[str, Any]], row_index: int, row_name: str) -> None:
        # Render one wardrobe row; cells are labelled "row:col" (1-based).
        if not items:
            y = (row_index - 1) * row_span + 10
            draw.text((12, y), f"Row {row_index}: {row_name} (no items)", fill=(120, 120, 120), font=font)
            return
        for col_index, item in enumerate(items, start=1):
            index_label = f"{row_index}:{col_index}"
            x0 = (col_index - 1) * cell_span + OUTFIT_GRID_PADDING
            y0 = (row_index - 1) * row_span + OUTFIT_GRID_PADDING
            tile = _load_grid_tile(str(item.get("image_url") or ""), index_label, item)
            canvas.paste(tile, (x0, y0))
            # Dark caption strip directly below the tile.
            label_box = (
                x0,
                y0 + OUTFIT_GRID_CELL_SIZE,
                x0 + OUTFIT_GRID_CELL_SIZE,
                y0 + OUTFIT_GRID_CELL_SIZE + OUTFIT_GRID_LABEL_HEIGHT,
            )
            draw.rectangle(label_box, fill=(20, 20, 20))
            label_text = f"{index_label} | {str(item.get('color') or 'Unknown')[:18]} {str(item.get('category') or 'Unknown')[:18]}"
            draw.text((label_box[0] + 6, label_box[1] + 8), label_text[:36], fill=(255, 255, 255), font=font)
            metadata_map[index_label] = _safe_metadata_item(item)
            item_lookup[index_label] = item

    for row_index, row_name, row_items in rows:
        draw_row(row_items, row_index, row_name)
    OUTFIT_GRID_SESSION_DIR.mkdir(parents=True, exist_ok=True)
    image_path = OUTFIT_GRID_SESSION_DIR / f"{session_id}.png"
    canvas.save(image_path, format="PNG")
    _OUTFIT_GRID_SESSIONS[session_id] = {
        "image_path": str(image_path),
        "metadata_map": metadata_map,
        "created_at": time.time(),
        "occasion": occasion,
        "user_profile": user_profile or {},
        "weather": weather or {},
        "region": region,
    }
    return {
        "session_id": session_id,
        "image": canvas,
        "metadata_map": metadata_map,
        "item_lookup": item_lookup,
        "image_path": str(image_path),
    }
def _grid_scoring_prompt(
    metadata_map: dict[str, dict[str, Any]],
    occasion: str,
    weather: dict[str, Any] | None,
    user_profile: dict[str, Any] | None,
    region: str,
    anchor_mode: str,
    anchor_item: dict[str, Any] | None,
    locked_top_index: str | None,
    locked_bottom_index: str | None,
    locked_other_index: str | None,
    combination_count: int,
    top_k: int,
) -> str:
    """Fill OUTFIT_GRID_SCORING_PROMPT_TEMPLATE with compact-JSON context
    for the grid-image outfit scorer."""

    def _compact(value: Any) -> str:
        # Minified, ASCII-safe JSON keeps the prompt short.
        return json.dumps(value, ensure_ascii=True, separators=(",", ":"))

    return OUTFIT_GRID_SCORING_PROMPT_TEMPLATE.format(
        occasion=occasion or "casual",
        region=region or "global",
        weather_json=_compact(weather or {}),
        user_profile_json=_compact(user_profile or {}),
        anchor_mode=anchor_mode,
        locked_top_index=locked_top_index or "None",
        locked_bottom_index=locked_bottom_index or "None",
        locked_other_index=locked_other_index or "None",
        anchor_item_json=_compact(_safe_metadata_item(anchor_item or {})),
        metadata_json=_compact(metadata_map),
        combination_count=combination_count,
        top_k=top_k,
    )
def _fallback_rule_recommendations(
    occasion: str,
    case_name: str,
    tops: list[dict[str, Any]],
    bottoms: list[dict[str, Any]],
    others: list[dict[str, Any]],
    top_k: int,
    include_pair_outfits: bool = True,
    include_other_outfits: bool = True,
) -> dict[str, Any]:
    """Deterministic scoring-v2 fallback used when the AI recommender fails.

    Scores every top x bottom pair (and, optionally, each "others" item as a
    standalone one-piece look) with score_pair_full, sorts by score, and
    shapes the response like the AI path. For case "D" the best outfit is
    reported as selected_outfit_score and the remainder as improvements.
    """
    outfits: list[dict[str, Any]] = []
    if include_pair_outfits:
        # Exhaustive cross product of tops and bottoms.
        for top in tops:
            for bottom in bottoms:
                scored = score_pair_full(top, bottom, occasion, other=None)
                outfits.append(_build_outfit_payload(scored, top, bottom, rank=0, other=None))
    if include_other_outfits:
        for other in others:
            # Score standalone Others as complete outfits rather than as add-ons.
            # The item is paired with itself to reuse the pair scorer.
            scored = score_pair_full(other, other, occasion, other=None)
            base_reason = str(scored.get("reason") or "")
            scored["reason"] = f"{other.get('color', 'This')} {other.get('category', 'item')} works as a complete standalone look."
            if base_reason:
                scored["reason"] = f"{scored['reason']} {base_reason}"
            scored["tip"] = "Use footwear and accessories only to complement this single-piece outfit."
            outfits.append(_build_outfit_payload(scored, None, None, rank=0, other=other))
    outfits.sort(key=lambda outfit: int(outfit.get("score") or 0), reverse=True)
    # Re-rank only the slice that will actually be returned.
    for index, outfit in enumerate(outfits[:top_k], start=1):
        outfit["rank"] = index
    if case_name == "D" and outfits:
        return {
            "occasion": occasion,
            "case": case_name,
            "selected_outfit_score": outfits[0],
            "recommendations": [],
            "improved_recommendations": outfits[1:top_k],
            "total_combinations_checked": (len(tops) * len(bottoms) if include_pair_outfits else 0) + (len(others) if include_other_outfits else 0),
            "notice": None,
            "engine_version": "scoring-v2",
        }
    return {
        "occasion": occasion,
        "case": case_name,
        "selected_outfit_score": None,
        "recommendations": outfits[:top_k],
        "improved_recommendations": [],
        "total_combinations_checked": (len(tops) * len(bottoms) if include_pair_outfits else 0) + (len(others) if include_other_outfits else 0),
        "notice": None,
        "engine_version": "scoring-v2",
    }
def _occasion_prefers_standalone_others(occasion: str) -> bool:
    """True when *occasion* mentions an ethnic/ceremonial keyword, meaning
    single-piece "others" garments should compete as complete outfits."""
    keywords = (
        "wedding",
        "festive",
        "ethnic",
        "ceremony",
        "engagement",
        "reception",
        "sangeet",
        "haldi",
        "mehndi",
    )
    normalized = _norm(occasion)
    return bool(normalized) and any(keyword in normalized for keyword in keywords)
def _merge_standalone_others_for_priority_occasions(
    result: dict[str, Any],
    occasion: str,
    others: list[dict[str, Any]],
    top_k: int,
) -> dict[str, Any]:
    """Blend boosted standalone "others" outfits into *result* for
    ethnic/ceremonial occasions.

    No-op when there are no others, the occasion is not a priority one, or
    the result is case "D". Otherwise standalone looks are scored via the
    rule fallback, score-boosted, merged with the existing recommendations,
    re-sorted and re-ranked, keeping at most *top_k* entries.
    """
    if not others or not _occasion_prefers_standalone_others(occasion):
        return result
    if str(result.get("case") or "").upper() == "D":
        return result
    standalone_payload = _fallback_rule_recommendations(
        occasion=occasion,
        case_name=str(result.get("case") or "A"),
        tops=[],
        bottoms=[],
        others=others,
        top_k=top_k,
        include_pair_outfits=False,
        include_other_outfits=True,
    )
    standalone = [
        dict(outfit)
        for outfit in (standalone_payload.get("recommendations") or [])
        if isinstance(outfit, dict)
    ]
    if not standalone:
        return result
    for outfit in standalone:
        # Boost: floor the base score at 78, add 12, cap at 100 — so ethnic
        # single pieces compete with top/bottom pairs on these occasions.
        base_score = int(outfit.get("score") or 0)
        boosted_score = min(100, max(base_score, 78) + 12)
        outfit["score"] = boosted_score
        breakdown = outfit.get("breakdown")
        if isinstance(breakdown, dict):
            # Occasion sub-score floored at 92 to reflect the strong match.
            breakdown["occasion"] = min(100, max(int(breakdown.get("occasion") or 0), 92))
    recommendations = [
        dict(outfit)
        for outfit in (result.get("recommendations") or [])
        if isinstance(outfit, dict)
    ]
    merged = [*recommendations, *standalone]
    merged.sort(key=lambda outfit: int(outfit.get("score") or 0), reverse=True)
    merged = merged[:top_k]
    for index, outfit in enumerate(merged, start=1):
        outfit["rank"] = index
    result["recommendations"] = merged
    result["total_combinations_checked"] = int(result.get("total_combinations_checked") or 0) + len(others)
    return result
def _current_fallback_recommendations(
    wardrobe_items: list[dict[str, Any]],
    occasion: str,
    top_selected: dict[str, Any] | None,
    bottom_selected: dict[str, Any] | None,
    other_selected: dict[str, Any] | None,
    weather: dict[str, Any] | None,
    user_profile: dict[str, Any] | None,
    region: str,
    top_k: int,
    candidate_pool: int,
    diversity_lambda: float,
    case_name: str,
    tops: list[dict[str, Any]],
    bottoms: list[dict[str, Any]],
    others: list[dict[str, Any]],
) -> dict[str, Any]:
    """Produce recommendations via the fashion_ai service, degrading to the
    pure rule scorer on any failure.

    The service result is post-processed: shoes/accessory slots are removed
    from every outfit (and "other" when a full top+bottom pair is present),
    lists are truncated to *top_k*, and boosted standalone "others" looks
    are merged in for priority (ethnic/ceremonial) occasions.
    """
    def _strip_optional_slots(outfit: dict[str, Any]) -> dict[str, Any]:
        # Work on a shallow copy so the service's own objects stay intact.
        cleaned = dict(outfit)
        cleaned.pop("shoes", None)
        cleaned.pop("accessory", None)
        if cleaned.get("top") and cleaned.get("bottom"):
            cleaned.pop("other", None)
        return cleaned
    try:
        result = get_recommendation_service().recommend(
            wardrobe_items=wardrobe_items,
            occasion=occasion,
            top_selected=top_selected,
            bottom_selected=bottom_selected,
            other_selected=other_selected,
            weather=weather,
            user_profile=user_profile,
            region=region,
            top_k=top_k,
            candidate_pool=candidate_pool,
            diversity_lambda=diversity_lambda,
        )
        if isinstance(result.get("selected_outfit_score"), dict):
            result["selected_outfit_score"] = _strip_optional_slots(result["selected_outfit_score"])
        recommendations = [
            _strip_optional_slots(entry)
            for entry in (result.get("recommendations") or [])
            if isinstance(entry, dict)
        ]
        improved = [
            _strip_optional_slots(entry)
            for entry in (result.get("improved_recommendations") or [])
            if isinstance(entry, dict)
        ]
        result["recommendations"] = recommendations[:top_k]
        result["improved_recommendations"] = improved[:top_k]
        result = _merge_standalone_others_for_priority_occasions(
            result=result,
            occasion=occasion,
            others=others,
            top_k=top_k,
        )
        print(f"[outfit-scoring] algo={OUTFIT_FALLBACK_SCORER_NAME} source=fashion_ai")
        return result
    except Exception as exc:
        # Deliberately broad: any service failure downgrades to the rule
        # scorer rather than failing the request.
        print(f"[outfit-scoring] algo={OUTFIT_FALLBACK_SCORER_NAME} source=scoring-v2 reason={exc!r}")
        return _fallback_rule_recommendations(
            occasion,
            case_name,
            tops,
            bottoms,
            others,
            top_k,
            include_pair_outfits=other_selected is None,
            include_other_outfits=bool(others),
        )
def _extract_grid_indices(outfit_raw: dict[str, Any]) -> tuple[str, str, str]:
top_index = str(
outfit_raw.get("top_index")
or outfit_raw.get("top")
or outfit_raw.get("top_cell")
or ""
).strip()
bottom_index = str(
outfit_raw.get("bottom_index")
or outfit_raw.get("bottom")
or outfit_raw.get("bottom_cell")
or ""
).strip()
other_index = str(
outfit_raw.get("other_index")
or outfit_raw.get("other")
or outfit_raw.get("other_cell")
or ""
).strip()
if top_index and bottom_index and other_index:
return top_index, bottom_index, other_index
combo_text = str(
outfit_raw.get("combination")
or outfit_raw.get("combo")
or outfit_raw.get("pair")
or ""
)
matches = re.findall(r"[123]:\d+", combo_text)
top_match = next((value for value in matches if value.startswith("1:")), "")
bottom_match = next((value for value in matches if value.startswith("2:")), "")
other_match = next((value for value in matches if value.startswith("3:")), "")
return top_index or top_match, bottom_index or bottom_match, other_index or other_match
def _rank_anchor_candidates(
    anchor_item: dict[str, Any],
    candidates: list[dict[str, Any]],
    occasion: str,
    anchor_is_top: bool,
) -> list[dict[str, Any]]:
    """Order *candidates* by their pairing score against the locked anchor.

    When any candidate reaches OUTFIT_ANCHOR_MIN_SCORE, only those compatible
    candidates are returned; otherwise the full ranked list is returned so
    the caller still has something to work with.
    """
    scored_candidates: list[tuple[int, dict[str, Any]]] = []
    for candidate in candidates:
        # The anchor fills whichever slot (top/bottom) it was locked into.
        top, bottom = (anchor_item, candidate) if anchor_is_top else (candidate, anchor_item)
        result = score_pair_full(top, bottom, occasion)
        scored_candidates.append((int(result.get("score") or 0), candidate))
    scored_candidates.sort(key=lambda entry: entry[0], reverse=True)
    strong = [candidate for score, candidate in scored_candidates if score >= OUTFIT_ANCHOR_MIN_SCORE]
    return strong or [candidate for _, candidate in scored_candidates]
def _text_selector_item_payload(item: dict[str, Any], index: int) -> dict[str, Any]:
return {
"index": index,
"id": str(item.get("id") or ""),
"type": str(item.get("type") or ""),
"category": str(item.get("category") or "Unknown"),
"color": str(item.get("color") or "Unknown"),
"pattern": str(item.get("pattern") or "Unknown"),
"fabric": str(item.get("fabric") or "Unknown"),
"fit": str(item.get("fit") or "Unknown"),
"season": str(item.get("season") or "Unknown"),
"style": str(item.get("style") or "Unknown"),
"occasion": str(item.get("occasion") or "Unknown"),
}
def _select_grid_candidates_with_text_ai(
    candidates: list[dict[str, Any]],
    slot_name: str,
    occasion: str,
    limit: int,
    anchor_mode: str,
    anchor_item: dict[str, Any] | None,
) -> list[dict[str, Any]]:
    """Trim *candidates* to at most *limit* using a text-only LLM ranking.

    Returns the list unchanged when it already fits; head-truncates when
    preselection is disabled or when the LLM call/parse fails. When the LLM
    returns fewer valid picks than *limit*, the shortfall is topped up from
    the original candidate order.
    """
    if len(candidates) <= limit:
        return candidates
    if not OUTFIT_TEXT_PRESELECT_ENABLED:
        return candidates[:limit]
    candidate_payload = [
        _text_selector_item_payload(item, idx + 1)
        for idx, item in enumerate(candidates)
    ]
    # Anchor (if any) is sent with index 0 so it cannot collide with candidates.
    anchor_payload = _text_selector_item_payload(anchor_item, 0) if anchor_item else None
    prompt = (
        "You are a fashion ranking assistant for candidate preselection.\n"
        f"Occasion: {occasion}\n"
        f"Slot to rank: {slot_name}\n"
        f"Anchor mode: {anchor_mode}\n"
        f"Keep exactly {limit} items if possible (or fewer when candidates are fewer).\n\n"
        "Goal:\n"
        "Select the strongest candidates for downstream outfit matching using only textual metadata.\n"
        "Prioritize occasion relevance, compatibility with anchor context, and diversity in color/pattern/style.\n\n"
        "Rules:\n"
        "1. Choose only IDs from the provided candidates.\n"
        "2. No duplicate IDs.\n"
        "3. Return strictly valid JSON and nothing else.\n"
        "4. Prefer candidates that maximize useful pairing variety, not near-duplicates.\n\n"
        "Return EXACT shape:\n"
        "{\"selected_ids\":[\"id1\",\"id2\"]}\n\n"
        f"Anchor item JSON:\n{json.dumps(anchor_payload, ensure_ascii=True)}\n\n"
        f"Candidate items JSON:\n{json.dumps(candidate_payload, ensure_ascii=True)}"
    )
    try:
        response_text = run_nvidia_text_inference(prompt, max_tokens=OUTFIT_TEXT_SELECTOR_MAX_TOKENS)
        parsed_payload = parse_json_from_text(response_text)
        # Primary shape is selected_ids; selected_indices is accepted as a
        # lenient fallback for models that answer with positions instead.
        selected_ids_raw = parsed_payload.get("selected_ids") if isinstance(parsed_payload, dict) else None
        selected_indices_raw = parsed_payload.get("selected_indices") if isinstance(parsed_payload, dict) else None
        selected_ids: list[str] = []
        if isinstance(selected_ids_raw, list):
            selected_ids = [
                str(value).strip()
                for value in selected_ids_raw
                if str(value).strip()
            ]
        if not selected_ids and isinstance(selected_indices_raw, list):
            for raw_index in selected_indices_raw:
                try:
                    index = int(raw_index)
                except (TypeError, ValueError):
                    continue
                # Accept both 1-based (prompt convention) and 0-based indices.
                if 1 <= index <= len(candidates):
                    selected_ids.append(str(candidates[index - 1].get("id") or "").strip())
                elif 0 <= index < len(candidates):
                    selected_ids.append(str(candidates[index].get("id") or "").strip())
        id_to_item = {
            str(item.get("id") or "").strip(): item
            for item in candidates
        }
        selected: list[dict[str, Any]] = []
        seen: set[str] = set()
        # Deduplicate picks while preserving the LLM's ranking order.
        for item_id in selected_ids:
            if not item_id or item_id in seen:
                continue
            item = id_to_item.get(item_id)
            if not item:
                continue
            selected.append(item)
            seen.add(item_id)
            if len(selected) >= limit:
                break
        if len(selected) < limit:
            # Top up from the original ordering to reach the requested size.
            for item in candidates:
                item_id = str(item.get("id") or "").strip()
                if item_id in seen:
                    continue
                selected.append(item)
                seen.add(item_id)
                if len(selected) >= limit:
                    break
        selected = selected[:limit]
        if len(selected) == limit:
            print(
                f"[outfit-preselect] slot={slot_name} mode={anchor_mode} "
                f"in={len(candidates)} kept={len(selected)} strategy={OUTFIT_TEXT_SELECTOR_NAME}"
            )
            return selected
        raise NvidiaPayloadError(
            f"Text preselector returned insufficient candidates: requested={limit} got={len(selected)}"
        )
    except Exception as exc:
        # Broad on purpose: any failure degrades to a simple head slice.
        print(
            f"[outfit-preselect] slot={slot_name} mode={anchor_mode} "
            f"in={len(candidates)} kept={limit} strategy=head reason={exc!r}"
        )
        return candidates[:limit]
# Ethnic/formal topwear categories relevant to wedding outfits.
# NOTE(review): not referenced anywhere in this section of the file —
# presumably consumed by wedding handling elsewhere; confirm before removing.
_WEDDING_ETHNIC_TOPWEAR_CATEGORIES = {
    "kurta", "sherwani", "nehru jacket", "bandhgala", "achkan",
    "saree blouse", "lehenga choli", "anarkali",
}
def _filter_garments_for_wedding(items: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Use a single LLM call to select only wedding-appropriate garments
    and reassign their slot type for outfit scoring.

    Ethnic/formal pieces currently typed as 'others' (sherwanis, kurtas,
    blazers, etc.) are promoted to 'topwear' so they enter the scoring grid
    as primary tops instead of only appearing in the "lock other" section.

    Best-effort: when the LLM call fails, returns an empty/invalid selection,
    or filters everything out, the original *items* list is returned
    unchanged.
    """
    if not items:
        return items
    # Text-only summaries of each garment for the prompt (no images).
    garment_summaries = []
    for item in items:
        garment_summaries.append({
            "id": str(item.get("id") or ""),
            "type": str(item.get("type") or ""),
            "category": str(item.get("category") or "Unknown"),
            "color": str(item.get("color") or "Unknown"),
            "pattern": str(item.get("pattern") or "Unknown"),
            "fabric": str(item.get("fabric") or "Unknown"),
            "fit": str(item.get("fit") or "Unknown"),
            "style": str(item.get("style") or "Unknown"),
            "occasion": str(item.get("occasion") or "Unknown"),
        })
    prompt = (
        "You are a fashion expert selecting garments suitable for a WEDDING occasion.\n\n"
        "Wedding-appropriate garments include:\n"
        "- Ethnic wear: kurtas, sherwanis, nehru jackets, bandhgalas, achkans, sarees, lehengas, churidars, dhotis, salwar kameez\n"
        "- Formal/semi-formal: blazers, suit jackets, dress shirts, formal trousers, dress pants, waistcoats\n"
        "- Elegant pieces: silk fabrics, embroidered items, brocade, velvet\n"
        "- Accessories that work for weddings: formal shoes, stoles, dupattas\n\n"
        "Garments to EXCLUDE:\n"
        "- Casual everyday items: plain t-shirts, basic tees, hoodies, sweatshirts, joggers, denim jeans, gym shorts\n"
        "- Sportswear, athleisure, distressed or ripped clothing\n"
        "- Very casual items like graphic tees, cargo shorts, flip-flops\n"
        "- Basic casual shirts UNLESS they are clearly formal/dress shirts\n\n"
        "If a garment is borderline (e.g. a smart chino, a dark polo), include it only if it could realistically "
        "be part of a wedding guest outfit.\n\n"
        "For each selected garment, also decide its ROLE in a wedding outfit:\n"
        "- \"topwear\": items worn on the upper body as the primary piece (kurtas, sherwanis, blazers, dress shirts, suit jackets, nehru jackets, waistcoats)\n"
        "- \"bottomwear\": items worn on the lower body (formal trousers, churidars, dress pants, lehengas)\n"
        "- \"others\": layering pieces, accessories, footwear, dupattas, stoles\n\n"
        "Return strictly valid JSON and nothing else.\n\n"
        "Return EXACT shape:\n"
        "{\"selected\":[{\"id\":\"...\",\"role\":\"topwear|bottomwear|others\"},...]}\n\n"
        f"Garments JSON:\n{json.dumps(garment_summaries, ensure_ascii=True)}"
    )
    try:
        response_text = run_nvidia_text_inference(prompt, max_tokens=OUTFIT_TEXT_SELECTOR_MAX_TOKENS)
        parsed = parse_json_from_text(response_text)
        selected_raw = parsed.get("selected") if isinstance(parsed, dict) else None
        if not isinstance(selected_raw, list):
            print("[wedding-filter] LLM returned no selected list, using all items")
            return items
        # Map garment id -> validated role; entries with unknown roles are dropped.
        role_map: dict[str, str] = {}
        for entry in selected_raw:
            if not isinstance(entry, dict):
                continue
            item_id = str(entry.get("id") or "").strip()
            role = str(entry.get("role") or "").strip().lower()
            if item_id and role in {"topwear", "bottomwear", "others"}:
                role_map[item_id] = role
        if not role_map:
            print("[wedding-filter] LLM returned empty selections, using all items")
            return items
        filtered: list[dict[str, Any]] = []
        for item in items:
            item_id = str(item.get("id") or "").strip()
            if item_id not in role_map:
                continue
            # Copy the item with its LLM-assigned slot type (may promote
            # an 'others' piece to 'topwear'); the original item is untouched.
            promoted = {**item, "type": role_map[item_id]}
            filtered.append(promoted)
        print(
            f"[wedding-filter] in={len(items)} kept={len(filtered)} "
            f"tops={sum(1 for i in filtered if i.get('type') == 'topwear')} "
            f"bottoms={sum(1 for i in filtered if i.get('type') == 'bottomwear')} "
            f"others={sum(1 for i in filtered if i.get('type') == 'others')}"
        )
        return filtered if filtered else items
    except Exception as exc:
        # Broad on purpose: the wedding filter is an enhancement, never a gate.
        print(f"[wedding-filter] LLM call failed reason={exc!r}, using all items")
        return items
def _resolve_outfit_grid_sources(
wardrobe_items: list[dict[str, Any]],
occasion: str,
top_selected: dict[str, Any] | None,
bottom_selected: dict[str, Any] | None,
other_selected: dict[str, Any] | None,
) -> tuple[list[dict[str, Any]], list[dict[str, Any]], list[dict[str, Any]], str, dict[str, Any] | None]:
all_tops = [i for i in wardrobe_items if i.get("type") == "topwear"]
all_bottoms = [i for i in wardrobe_items if i.get("type") == "bottomwear"]
all_others = [
i
for i in wardrobe_items
if i.get("type") == "others" or i.get("type") not in {"topwear", "bottomwear"}
]
other_pool = [other_selected] if other_selected else []
if top_selected and bottom_selected:
anchor_mode = "locked-top+locked-bottom+locked-other" if other_selected else "locked-top+locked-bottom"
return [top_selected], [bottom_selected], other_pool, anchor_mode, other_selected or top_selected
if top_selected:
ranked_bottoms = _rank_anchor_candidates(
anchor_item=top_selected,
candidates=all_bottoms,
occasion=occasion,
anchor_is_top=True,
)
anchor_mode = "locked-top+locked-other" if other_selected else "locked-top"
return [top_selected], ranked_bottoms, other_pool, anchor_mode, other_selected or top_selected
if bottom_selected:
ranked_tops = _rank_anchor_candidates(
anchor_item=bottom_selected,
candidates=all_tops,
occasion=occasion,
anchor_is_top=False,
)
anchor_mode = "locked-bottom+locked-other" if other_selected else "locked-bottom"
return ranked_tops, [bottom_selected], other_pool, anchor_mode, other_selected or bottom_selected
if other_selected:
return all_tops, all_bottoms, [], "locked-other", other_selected
return all_tops, all_bottoms, [], "none", None
def _normalize_ai_outfit_payload(
    parsed_payload: dict[str, Any],
    item_lookup: dict[str, dict[str, Any]],
    occasion: str,
    case_name: str,
    top_k: int,
    total_combinations: int,
    session_id: str,
) -> dict[str, Any]:
    """Convert the raw AI grid-scorer JSON into the API recommendation shape.

    Accepts the model's outfit list under ``recommendations``, ``outfits`` or
    ``top_outfits`` (first list found wins), maps grid indices back to wardrobe
    items, clamps scores, ranks the survivors, and wraps them in the response
    envelope. Case "D" (both slots locked) reports the single scored pair as
    ``selected_outfit_score`` instead of a recommendation list.

    Raises:
        NvidiaPayloadError: when no list is present or no entry maps to items.
    """
    raw_list: list[Any] | None = None
    for key in ("recommendations", "outfits", "top_outfits"):
        candidate = parsed_payload.get(key)
        if isinstance(candidate, list):
            raw_list = candidate
            break
    if raw_list is None:
        raise NvidiaPayloadError(f"AI outfit scorer returned no recommendation list: {parsed_payload}")

    normalized: list[dict[str, Any]] = []
    for entry in raw_list:
        if not isinstance(entry, dict):
            continue
        top_idx, bottom_idx, other_idx = _extract_grid_indices(entry)
        top_item = item_lookup.get(top_idx)
        bottom_item = item_lookup.get(bottom_idx)
        other_item = item_lookup.get(other_idx) if other_idx else None
        # Top and bottom are mandatory; drop entries the grid cannot resolve.
        if not (top_item and bottom_item):
            continue
        raw_breakdown = entry.get("breakdown")
        if not isinstance(raw_breakdown, dict):
            raw_breakdown = {}
        outfit: dict[str, Any] = {
            "rank": 0,  # assigned after sorting
            "score": _clamp_score(entry.get("score"), 0),
            "breakdown": {
                axis: _clamp_score(raw_breakdown.get(axis), 70)
                for axis in ("color", "style", "occasion", "fit", "pattern", "season")
            },
            "reason": str(entry.get("reason") or "AI-generated outfit recommendation."),
            "tip": str(entry.get("tip") or "Use matching accessories to complete this look."),
            "combination": f"{top_idx} + {bottom_idx}" + (f" + {other_idx}" if other_item else ""),
            "grid_session_id": session_id,
            "top": {
                "id": top_item.get("id"),
                "category": top_item.get("category"),
                "color": top_item.get("color"),
                "image_url": top_item.get("image_url", ""),
            },
            "bottom": {
                "id": bottom_item.get("id"),
                "category": bottom_item.get("category"),
                "color": bottom_item.get("color"),
                "image_url": bottom_item.get("image_url", ""),
            },
        }
        if other_item:
            outfit["other"] = {
                "id": other_item.get("id"),
                "category": other_item.get("category"),
                "color": other_item.get("color"),
                "image_url": other_item.get("image_url", ""),
            }
        normalized.append(outfit)

    if not normalized:
        raise NvidiaPayloadError(f"AI outfit scorer returned no valid index-mapped recommendations: {parsed_payload}")

    # Highest score first (stable sort keeps model order for ties), keep top_k.
    normalized.sort(key=lambda outfit: int(outfit.get("score") or 0), reverse=True)
    normalized = normalized[:top_k]
    for position, outfit in enumerate(normalized, start=1):
        outfit["rank"] = position

    locked_pair = case_name == "D"
    return {
        "occasion": occasion,
        "case": case_name,
        "selected_outfit_score": normalized[0] if locked_pair else None,
        "recommendations": [] if locked_pair else normalized,
        "improved_recommendations": normalized if locked_pair else [],
        "total_combinations_checked": total_combinations,
        "notice": None,
        "grid_session_id": session_id,
        "engine_version": OUTFIT_AI_SCORER_NAME,
    }
def _recommend_outfits_with_ai_grid(
    wardrobe_items: list[dict[str, Any]],
    occasion: str,
    top_selected: dict[str, Any] | None,
    bottom_selected: dict[str, Any] | None,
    other_selected: dict[str, Any] | None,
    weather: dict[str, Any] | None,
    user_profile: dict[str, Any] | None,
    region: str,
    top_k: int,
    case_name: str,
) -> dict[str, Any]:
    """Score outfit combinations with the NVIDIA vision model via an image grid.

    Pipeline: resolve candidate pools (honoring locked selections), trim
    oversized unlocked pools with the text-AI pre-selector, compose a grid
    image session, prompt the model, and normalize its JSON reply into the
    standard recommendation payload via ``_normalize_ai_outfit_payload``.

    Raises:
        NvidiaPayloadError: when top/bottom candidates are missing or the
            model reply cannot be parsed as JSON.
    """
    top_source, bottom_source, other_source, anchor_mode, anchor_item = _resolve_outfit_grid_sources(
        wardrobe_items=wardrobe_items,
        occasion=occasion,
        top_selected=top_selected,
        bottom_selected=bottom_selected,
        other_selected=other_selected,
    )
    # Remember pre-trim pool sizes for the log line below.
    top_pool_count = len(top_source)
    bottom_pool_count = len(bottom_source)
    # Only unlocked pools are trimmed; locked slots keep their single item.
    if not top_selected and len(top_source) > OUTFIT_GRID_MAX_TOP_ITEMS:
        top_source = _select_grid_candidates_with_text_ai(
            candidates=top_source,
            slot_name="topwear",
            occasion=occasion,
            limit=OUTFIT_GRID_MAX_TOP_ITEMS,
            anchor_mode=anchor_mode,
            anchor_item=anchor_item,
        )
    if not bottom_selected and len(bottom_source) > OUTFIT_GRID_MAX_BOTTOM_ITEMS:
        bottom_source = _select_grid_candidates_with_text_ai(
            candidates=bottom_source,
            slot_name="bottomwear",
            occasion=occasion,
            limit=OUTFIT_GRID_MAX_BOTTOM_ITEMS,
            anchor_mode=anchor_mode,
            anchor_item=anchor_item,
        )
    if not top_source or not bottom_source:
        raise NvidiaPayloadError("AI outfit scorer requires at least one topwear and one bottomwear item.")
    grid_session = _build_outfit_grid_session(
        tops=top_source,
        bottoms=bottom_source,
        others=other_source,
        occasion=occasion,
        user_profile=user_profile,
        weather=weather,
        region=region,
    )
    # When "other" is optional (not locked) each top/bottom pair may also
    # appear without an accessory, hence the +1 branch.
    combination_count = len(top_source) * len(bottom_source) * (
        len(other_source) if other_selected else (len(other_source) + 1 if other_source else 1)
    )
    prompt = _grid_scoring_prompt(
        metadata_map=grid_session["metadata_map"],
        occasion=occasion,
        weather=weather,
        user_profile=user_profile,
        region=region,
        anchor_mode=anchor_mode,
        anchor_item=anchor_item,
        # A locked slot is always rendered at column 1 of its grid row.
        locked_top_index="1:1" if top_selected else None,
        locked_bottom_index="2:1" if bottom_selected else None,
        locked_other_index="3:1" if other_selected else None,
        combination_count=combination_count,
        top_k=top_k,
    )
    if not other_source:
        # Without a third row, steer the model away from inventing accessory indices.
        prompt = (
            f"{prompt}\n\n"
            "Important: This grid contains only Row 1 (Topwear) and Row 2 (Bottomwear). "
            "Always set other_index to null."
        )
    print(
        f"[outfit-grid] mode={anchor_mode} session={grid_session['session_id']} "
        f"tops_in={top_pool_count} bottoms_in={bottom_pool_count} "
        f"rows=2 tops={len(top_source)} bottoms={len(bottom_source)} others={len(other_source)} "
        f"combinations={combination_count}"
    )
    model_text = run_nvidia_inference(grid_session["image"], prompt, max_tokens=OUTFIT_AI_MAX_TOKENS)
    parsed_payload = parse_json_from_text(model_text)
    if not parsed_payload:
        raise NvidiaPayloadError(f"AI outfit scorer returned unparsable JSON: {model_text[:500]}")
    result = _normalize_ai_outfit_payload(
        parsed_payload=parsed_payload,
        item_lookup=grid_session["item_lookup"],
        occasion=occasion,
        case_name=case_name,
        top_k=top_k,
        total_combinations=combination_count,
        session_id=grid_session["session_id"],
    )
    print(
        f"[outfit-scoring] algo={OUTFIT_AI_SCORER_NAME} "
        f"mode={anchor_mode} session={grid_session['session_id']} "
        f"tops={len(top_source)} bottoms={len(bottom_source)} others={len(other_source)} "
        f"combinations={combination_count}"
    )
    return result
def _raise_http_error(exc: Exception) -> NoReturn:
    """Log a classification failure and re-raise it as an HTTPException.

    Maps known error families to HTTP statuses: missing NVIDIA key -> 503,
    gateway errors -> their own status, payload/requests errors -> 502,
    anything else -> 500.
    """
    print("Classification request failed:", repr(exc))
    traceback.print_exc()
    if isinstance(exc, RuntimeError) and str(exc) == NVIDIA_API_KEY_MISSING_DETAIL:
        status, detail = 503, NVIDIA_API_KEY_MISSING_DETAIL
    elif isinstance(exc, NvidiaGatewayError):
        status, detail = exc.status_code, str(exc)
    elif isinstance(exc, NvidiaPayloadError):
        status, detail = 502, str(exc)
    elif isinstance(exc, requests.RequestException):
        status, detail = 502, f"NVIDIA API request failed: {exc}"
    else:
        status, detail = 500, str(exc)
    raise HTTPException(status_code=status, detail=detail) from exc
# ---------------------------------------------------------------------------
# Endpoints
# ---------------------------------------------------------------------------
@app.get("/")
def root() -> dict[str, str]:
    """Liveness endpoint: confirm the API is reachable."""
    return dict(status="ok", message="Wardrobe Classifier API v2")
@app.get("/health")
def health() -> dict[str, str]:
    """Report provider/model configuration for monitoring dashboards."""
    api_key_configured = bool(_nvidia_api_key())
    return {
        "status": "ok",
        "classification_provider": "nvidia",
        "model": NVIDIA_MODEL_ID,
        "nvidia_api_configured": str(api_key_configured),
        "nvidia_invoke_url": NVIDIA_INVOKE_URL,
        "engine_version": "scoring-v2",
        "outfit_matching_provider": "nemotron",
    }
@app.post("/product-urls")
def product_urls(payload: dict[str, Any] = Body(default_factory=dict)) -> dict[str, Any]:
    """Scrape store search pages for a color+category and return product links.

    Deduplicates by product link and stops once ``max_products`` entries are
    collected. The assembled payload is also persisted as JSON on disk.
    """
    color = str(payload.get("color") or "")
    category = str(payload.get("category") or "")
    gender = payload.get("gender")
    max_products = int(payload.get("max_products") or 30)
    store = _normalize_store_name(str(payload.get("store") or SCRAPER_DEFAULT_STORE or "nike"))
    if not color or not category:
        raise HTTPException(status_code=400, detail="color and category are required")
    recommendation = ScraperRecommendation(
        color=color,
        category=category,
        gender=str(gender) if gender else None,
    )
    try:
        search_urls = _build_store_search_urls_from_recommendation(
            recommendation,
            store=store,
            occasion="",
        )
        collected: list[dict[str, str]] = []
        seen: set[str] = set()
        for search_url in search_urls:
            for summary in _extract_store_product_summaries(search_url, store=store):
                link = str(summary.get("item_link") or "").strip()
                if not link or link in seen:
                    continue
                seen.add(link)
                collected.append(summary)
                if len(collected) >= max_products:
                    break
            if len(collected) >= max_products:
                break
        result: dict[str, Any] = {
            "store": store,
            "search_urls": search_urls,
            "product_urls": [entry["item_link"] for entry in collected],
            "products": collected,
            "count": len(collected),
        }
        result["saved_json_path"] = _save_scraper_json_payload("product_urls", result)
        return result
    except requests.RequestException as exc:
        raise HTTPException(status_code=502, detail=f"Failed to fetch {store.title()} pages: {exc}") from exc
@app.post("/suggestions")
@app.post("/api/suggestions")
def suggestions(payload: dict[str, Any] = Body(default_factory=dict)) -> dict[str, Any]:
    """Build shopping suggestions from the scraper for the requested occasion.

    Accepts both snake_case and camelCase field names; upstream AI/scraper
    failures surface as HTTP 502.
    """
    occasion = str(payload.get("occasion") or "casual")
    target_category = str(payload.get("target_category") or payload.get("targetCategory") or "both")
    gender_preference = str(payload.get("gender_preference") or payload.get("genderPreference") or "any")
    raw_filters = payload.get("filters")
    filters = raw_filters if isinstance(raw_filters, dict) else {}
    max_results = int(payload.get("max_results") or payload.get("maxResults") or 8)
    store = _normalize_store_name(str(payload.get("store") or SCRAPER_DEFAULT_STORE or "nike"))
    if max_results < 1:
        raise HTTPException(status_code=400, detail="max_results must be at least 1")
    try:
        return _build_shopping_suggestions_from_scraper(
            occasion=occasion,
            target_category=target_category,
            gender_preference=gender_preference,
            filters=filters,
            max_results=max_results,
            store=store,
        )
    except (NvidiaGatewayError, NvidiaPayloadError) as exc:
        raise HTTPException(status_code=502, detail=str(exc)) from exc
    except requests.RequestException as exc:
        raise HTTPException(status_code=502, detail=f"Failed to fetch {store.title()} pages: {exc}") from exc
@app.post("/scraper/recommend")
def scraper_recommend(payload: dict[str, Any] = Body(default_factory=dict)) -> dict[str, Any]:
    """Turn a free-form shopping prompt into a Nemotron query plan plus products.

    Structured fields inferred from the prompt merge with explicit payload
    fields: explicit values win, except target_category where a confidently
    inferred topwear/bottomwear takes precedence.
    """
    user_prompt = str(payload.get("user_prompt") or payload.get("prompt") or "").strip()
    inferred = _infer_structured_request_from_prompt(user_prompt)
    inferred_target_category = _normalize_target_category(inferred.get("target_category"))

    occasion = str(payload.get("occasion") or inferred.get("occasion") or "casual")
    if _norm(occasion) in {"", "auto", "any"}:
        # "auto"/"any" placeholders defer to the inferred occasion.
        occasion = str(inferred.get("occasion") or "casual")
    gender = str(payload.get("gender") or inferred.get("gender") or "")

    payload_target_category = _normalize_target_category(
        payload.get("target_category") or payload.get("targetCategory") or "both"
    )
    if inferred_target_category in {"topwear", "bottomwear"}:
        target_category = inferred_target_category
    elif payload_target_category in {"topwear", "bottomwear"}:
        target_category = payload_target_category
    else:
        target_category = "both"

    filters = payload.get("filters") if isinstance(payload.get("filters"), dict) else {}

    def _clean_strings(values: Any) -> list[str]:
        # Coerce each entry to str and drop blank ones.
        return [str(value) for value in (values or []) if str(value).strip()]

    inferred_colors = inferred.get("preferred_colors") if isinstance(inferred.get("preferred_colors"), list) else []
    inferred_include = inferred.get("include_keywords") if isinstance(inferred.get("include_keywords"), list) else []
    inferred_exclude = inferred.get("exclude_keywords") if isinstance(inferred.get("exclude_keywords"), list) else []
    # Explicit filter lists come first, inferred additions are appended.
    filters = {
        **filters,
        "preferred_colors": _clean_strings(filters.get("preferred_colors")) + _clean_strings(inferred_colors),
        "include_keywords": _clean_strings(filters.get("include_keywords")) + _clean_strings(inferred_include),
        "exclude_keywords": _clean_strings(filters.get("exclude_keywords")) + _clean_strings(inferred_exclude),
    }

    preferences = ", ".join(
        part for part in (str(payload.get("preferences") or "").strip(), user_prompt) if part
    )

    max_products_raw = payload.get("max_products")
    max_products = int(max_products_raw) if max_products_raw not in {None, ""} else None
    store = _normalize_store_name(str(payload.get("store") or SCRAPER_DEFAULT_STORE or "nike"))
    if isinstance(max_products, int) and max_products < 1:
        raise HTTPException(status_code=400, detail="max_products must be at least 1")
    try:
        return _generate_scraper_plan_with_nemotron(
            occasion=occasion,
            gender=gender,
            preferences=preferences,
            user_prompt=user_prompt,
            target_category=target_category,
            filters=filters,
            max_products=max_products,
            store=store,
            strict_nemotron=True,
        )
    except (NvidiaGatewayError, NvidiaPayloadError) as exc:
        raise HTTPException(status_code=502, detail=str(exc)) from exc
    except requests.RequestException as exc:
        raise HTTPException(status_code=502, detail=f"Failed to fetch {store.title()} pages: {exc}") from exc
@app.get("/scraper")
def scraper_page() -> Response:
    """Serve the self-contained HTML demo page for the scraper workflow.

    The page shows a snapshot of current wardrobe metadata (HTML-escaped
    JSON) and posts to /scraper/recommend from inline JavaScript. Note the
    template is an f-string, so literal CSS/JS braces are doubled (``{{``).
    """
    wardrobe_snapshot = _wardrobe_metadata_snapshot(limit=12)
    # Escape so the JSON can be embedded inside a <pre> element safely.
    wardrobe_json = html_lib.escape(json.dumps(wardrobe_snapshot, ensure_ascii=True, indent=2))
    html_content = f"""
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8" />
<meta name="viewport" content="width=device-width, initial-scale=1" />
<title>Wardrobe Assistant Child</title>
<style>
:root {{
--bg: #0f172a;
--panel: #111827;
--panel-2: #1f2937;
--text: #e5e7eb;
--muted: #9ca3af;
--accent: #22c55e;
--accent-2: #38bdf8;
--border: rgba(255,255,255,0.08);
}}
* {{ box-sizing: border-box; }}
body {{ margin: 0; font-family: Arial, sans-serif; background: radial-gradient(circle at top, #1e293b, #020617 70%); color: var(--text); }}
.wrap {{ max-width: 1200px; margin: 0 auto; padding: 24px; }}
.hero {{ display: grid; gap: 16px; grid-template-columns: 1.4fr 1fr; align-items: start; }}
.card {{ background: rgba(17,24,39,0.88); border: 1px solid var(--border); border-radius: 20px; padding: 20px; backdrop-filter: blur(10px); box-shadow: 0 20px 60px rgba(0,0,0,0.28); }}
h1 {{ margin: 0 0 8px; font-size: clamp(28px, 4vw, 46px); }}
p {{ line-height: 1.6; color: var(--muted); }}
label {{ display: block; margin: 14px 0 6px; font-size: 13px; color: #cbd5e1; }}
input, select, textarea, button {{ width: 100%; border-radius: 14px; border: 1px solid var(--border); background: rgba(15,23,42,0.9); color: var(--text); padding: 12px 14px; font-size: 14px; }}
textarea {{ min-height: 110px; resize: vertical; }}
button {{ background: linear-gradient(135deg, var(--accent), var(--accent-2)); color: #fff; border: 0; font-weight: 700; cursor: pointer; margin-top: 16px; }}
.grid {{ display: grid; gap: 16px; grid-template-columns: repeat(auto-fit, minmax(260px, 1fr)); margin-top: 18px; }}
.muted {{ color: var(--muted); font-size: 13px; }}
.products {{ display: grid; gap: 14px; grid-template-columns: repeat(auto-fit, minmax(220px, 1fr)); margin-top: 18px; }}
.product {{ background: rgba(15,23,42,0.8); border: 1px solid var(--border); border-radius: 18px; overflow: hidden; }}
.product img {{ width: 100%; aspect-ratio: 1 / 1; object-fit: cover; display: block; background: #0b1220; }}
.product .body {{ padding: 12px; }}
.pill {{ display: inline-block; padding: 4px 10px; border-radius: 999px; background: rgba(56,189,248,0.12); color: #7dd3fc; font-size: 12px; margin-right: 8px; margin-bottom: 8px; }}
pre {{ white-space: pre-wrap; word-break: break-word; background: rgba(2,6,23,0.9); padding: 14px; border-radius: 16px; border: 1px solid var(--border); overflow: auto; }}
a {{ color: #93c5fd; }}
.status {{ margin-top: 12px; color: #d1fae5; }}
</style>
</head>
<body>
<div class="wrap">
<div class="hero">
<div class="card">
<h1>Wardrobe Assistant Child</h1>
<p>Nemotron reads wardrobe metadata, builds a context-aware shopping query, and returns matching products with links, names, prices, and images.</p>
<div class="grid">
<div>
<label for="occasion">Occasion</label>
<select id="occasion">
<option>casual</option>
<option>formal</option>
<option>work</option>
<option>party</option>
<option>sports</option>
<option>ethnic</option>
</select>
</div>
<div>
<label for="gender">Gender</label>
<select id="gender">
<option value="">auto</option>
<option>men</option>
<option>women</option>
<option>unisex</option>
</select>
</div>
<input type="hidden" id="store" value="nike" />
<div>
<label for="max_products">Max Products</label>
<input id="max_products" type="number" min="1" max="30" value="12" />
</div>
</div>
<label for="preferences">Other Preferences</label>
<textarea id="preferences" placeholder="Example: formal office look, breathable fabric, neutral tones, regular fit, avoid oversized silhouettes"></textarea>
<button id="runBtn">Generate Nemotron Query and Scrape</button>
<div id="status" class="status"></div>
</div>
<div class="card">
<h2 style="margin-top:0;">Wardrobe Metadata Snapshot</h2>
<p class="muted">Current items loaded from the database are used by Nemotron to shape the shopping query.</p>
<pre id="wardrobeSnapshot">{wardrobe_json}</pre>
</div>
</div>
<div class="card" style="margin-top:16px;">
<h2 style="margin-top:0;">Nemotron Query Plan</h2>
<pre id="queryPlan">Run the search to generate a wardrobe-aware query.</pre>
</div>
<div class="card" style="margin-top:16px;">
<h2 style="margin-top:0;">Results</h2>
<div id="products" class="products"></div>
</div>
</div>
<script>
const runBtn = document.getElementById('runBtn');
const statusEl = document.getElementById('status');
const planEl = document.getElementById('queryPlan');
const productsEl = document.getElementById('products');
function escapeHtml(value) {{
return String(value || '')
.replaceAll('&', '&amp;')
.replaceAll('<', '&lt;')
.replaceAll('>', '&gt;')
.replaceAll('"', '&quot;')
.replaceAll("'", '&#39;');
}}
function renderProducts(products) {{
if (!products || !products.length) {{
productsEl.innerHTML = '<p class="muted">No products found.</p>';
return;
}}
productsEl.innerHTML = products.map((product) => `
<div class="product">
<img src="${{escapeHtml(product.image_url || '')}}" alt="${{escapeHtml(product.name || 'Product')}}" />
<div class="body">
<div class="pill">${{escapeHtml(product.price || 'N/A')}}</div>
<h3 style="margin:8px 0 6px; font-size:16px;">${{escapeHtml(product.name || 'Unnamed product')}}</h3>
<div class="muted" style="margin-bottom:10px;">${{escapeHtml(product.item_link || '')}}</div>
<a href="${{escapeHtml(product.item_link || '#')}}" target="_blank" rel="noreferrer">Open product</a>
</div>
</div>
`).join('');
}}
runBtn.addEventListener('click', async () => {{
statusEl.textContent = 'Generating query with Nemotron and scraping products...';
productsEl.innerHTML = '';
try {{
const payload = {{
occasion: document.getElementById('occasion').value,
gender: document.getElementById('gender').value,
store: 'nike',
preferences: document.getElementById('preferences').value,
max_products: Number(document.getElementById('max_products').value || 12),
}};
const response = await fetch('/scraper/recommend', {{
method: 'POST',
headers: {{ 'Content-Type': 'application/json' }},
body: JSON.stringify(payload),
}});
const data = await response.json();
if (!response.ok) {{
throw new Error(data.detail || 'Request failed');
}}
planEl.textContent = JSON.stringify({{
query_plan: data.query_plan || {{}},
intermediate_steps: data.intermediate_steps || [],
}}, null, 2);
renderProducts(data.products || []);
statusEl.textContent = `Loaded ${{data.count || 0}} products. Saved JSON: ${{data.saved_json_path || 'not saved'}}`;
}} catch (error) {{
statusEl.textContent = `Error: ${{error.message}}`;
planEl.textContent = 'Query planning failed.';
}}
}});
</script>
</body>
</html>
"""
    return Response(content=html_content, media_type="text/html")
@app.post("/classify")
async def classify(image: UploadFile = File(...)) -> dict[str, str]:
    """Classify one uploaded garment image into normalized spec fields."""
    if not image.filename:
        raise HTTPException(status_code=400, detail="No image selected")
    try:
        raw_bytes = await image.read()
        picture = Image.open(io.BytesIO(raw_bytes)).convert("RGB")
        # Shrink before sending to the model to keep payloads small.
        picture.thumbnail((512, 512))
        model_reply = run_nvidia_inference(picture, CLASSIFICATION_PROMPT)
        return normalize_specs(parse_json_from_text(model_reply))
    except HTTPException:
        raise
    except Exception as exc:
        _raise_http_error(exc)
@app.post("/upload")
async def upload(image: UploadFile = File(...)) -> dict[str, Any]:
    """Classify an uploaded garment image and persist it as a wardrobe item."""
    if not image.filename:
        raise HTTPException(status_code=400, detail="No image selected")
    try:
        raw_bytes = await image.read()
        picture = Image.open(io.BytesIO(raw_bytes)).convert("RGB")
        # Shrink before sending to the model to keep payloads small.
        picture.thumbnail((512, 512))
        model_reply = run_nvidia_inference(picture, CLASSIFICATION_PROMPT)
        specs = normalize_specs(parse_json_from_text(model_reply))
        new_id = str(uuid.uuid4())
        normalized = _normalize_wardrobe_item({
            "id": new_id,
            "image_url": f"memory://{new_id}",
            "description": specs,
            "created_at": _now_iso(),
        })
        return item_insert(normalized)
    except HTTPException:
        raise
    except Exception as exc:
        _raise_http_error(exc)
@app.get("/items")
def get_items() -> dict[str, list[dict[str, Any]]]:
    """Return every stored wardrobe item."""
    return {"items": item_get_all()}
@app.delete("/items/{item_id}")
def delete_item_endpoint(item_id: str) -> dict[str, Any]:
    """Remove one wardrobe item; 404 when the id is unknown."""
    deleted = item_delete(item_id)
    if not deleted:
        raise HTTPException(status_code=404, detail="Item not found")
    return {"success": True, "id": item_id}
@app.put("/items/{item_id}")
def update_item_endpoint(
    item_id: str,
    payload: dict[str, Any] = Body(default_factory=dict),
) -> dict[str, Any]:
    """Apply a partial update to a stored wardrobe item; 404 when unknown."""
    result = item_update(item_id, payload)
    if result is None:
        raise HTTPException(status_code=404, detail="Item not found")
    return result
@app.post("/feedback")
def post_feedback(payload: dict[str, Any] = Body(default_factory=dict)) -> dict[str, Any]:
    """Record a wear/skip/save preference signal for a top+bottom pairing."""
    top_id = str(payload.get("top_id") or "")
    bottom_id = str(payload.get("bottom_id") or "")
    action = str(payload.get("action") or "")
    occasion = str(payload.get("occasion") or "casual")
    score = payload.get("score")
    if not (top_id and bottom_id):
        raise HTTPException(status_code=400, detail="top_id and bottom_id required")
    if action not in {"wear", "skip", "save"}:
        raise HTTPException(status_code=400, detail="action must be 'wear', 'skip', or 'save'")
    # Both referenced items must exist before the pairing is stored.
    for slot, item_id in (("top", top_id), ("bottom", bottom_id)):
        if item_get(item_id) is None:
            raise HTTPException(status_code=404, detail=f"{slot} item {item_id} not found")
    return feedback_record(top_id, bottom_id, action, occasion, score)
@app.post("/ai/score-outfit")
def ai_score_outfit(payload: dict[str, Any] = Body(default_factory=dict)) -> dict[str, Any]:
    """Score a single top+bottom(+other) outfit for an occasion.

    Engines are tried in order: the AI grid scorer (case "D": both slots
    locked), then the fashion_ai recommendation service, then the local
    scoring-v2 rules. The response flattens the breakdown and reports which
    engine produced the score via ``engine_version``.
    """
    top = payload.get("top")
    bottom = payload.get("bottom")
    other = payload.get("other")
    occasion = str(payload.get("occasion") or "casual")
    weather = payload.get("weather") if isinstance(payload.get("weather"), dict) else None
    user_profile = payload.get("user_profile") if isinstance(payload.get("user_profile"), dict) else None
    region = str(payload.get("region") or "global")
    if not isinstance(top, dict) or not isinstance(bottom, dict):
        raise HTTPException(status_code=400, detail="Both 'top' and 'bottom' are required")
    nt = _normalize_wardrobe_item(top)
    nb = _normalize_wardrobe_item(bottom)
    no = _normalize_wardrobe_item(other) if isinstance(other, dict) else None
    engine_version = OUTFIT_FALLBACK_SCORER_NAME
    try:
        # Locking both slots (case "D") makes the grid scorer evaluate exactly
        # this pair and surface it as selected_outfit_score.
        ai_payload = _recommend_outfits_with_ai_grid(
            wardrobe_items=[item for item in [nt, nb, no] if item is not None],
            occasion=occasion,
            top_selected=nt,
            bottom_selected=nb,
            other_selected=no,
            weather=weather,
            user_profile=user_profile,
            region=region,
            top_k=1,
            case_name="D",
        )
        res = ai_payload.get("selected_outfit_score")
        if not isinstance(res, dict):
            raise NvidiaPayloadError(f"AI scorer returned no selected outfit score: {ai_payload}")
        engine_version = str(ai_payload.get("engine_version") or OUTFIT_AI_SCORER_NAME)
    except Exception as exc:
        print(f"[outfit-scoring] algo={OUTFIT_AI_SCORER_NAME} single-pair failed reason={exc!r}")
        try:
            # First fallback: the fashion_ai service scorer.
            res = get_recommendation_service().score_outfit(
                top=nt,
                bottom=nb,
                other=no,
                occasion=occasion,
                weather=weather,
                user_profile=user_profile,
                region=region,
            )
            print(f"[outfit-scoring] algo={OUTFIT_FALLBACK_SCORER_NAME} source=fashion_ai single-pair")
            engine_version = str(res.get("engine_version") or OUTFIT_FALLBACK_SCORER_NAME)
        except Exception as fallback_exc:
            print(
                f"[outfit-scoring] algo={OUTFIT_FALLBACK_SCORER_NAME} "
                f"source=scoring-v2 single-pair reason={fallback_exc!r}"
            )
            # Last resort: local rule-based scoring from scoring.py.
            res = score_pair_full(nt, nb, occasion, other=no)
            engine_version = "scoring-v2"
    # Flatten the breakdown into the legacy per-axis response fields.
    return {
        "score": res["score"],
        "color_score": res["breakdown"]["color"],
        "style_score": res["breakdown"]["style"],
        "occasion_score": res["breakdown"]["occasion"],
        "fit_score": res["breakdown"]["fit"],
        "pattern_score": res["breakdown"]["pattern"],
        "season_score": res["breakdown"].get("season", 0),
        "reason": res["reason"],
        "tip": res["tip"],
        "engine_version": engine_version,
    }
@app.post("/ai/gap-analysis")
def ai_gap_analysis(payload: dict[str, Any] = Body(default_factory=dict)) -> dict[str, Any]:
    """Analyze wardrobe coverage and return purchase suggestions for the occasion."""
    raw_wardrobe = payload.get("wardrobe", [])
    occasion = str(payload.get("occasion") or "casual")
    if not isinstance(raw_wardrobe, list):
        raise HTTPException(status_code=400, detail="'wardrobe' must be a list")
    normalized = [
        _normalize_wardrobe_item(entry if isinstance(entry, dict) else {})
        for entry in raw_wardrobe
    ]
    return {"suggestions": _gap_suggestions(normalized, occasion)}
@app.post("/ai/recommend-outfits")
def ai_recommend_outfits(payload: dict[str, Any] = Body(default_factory=dict)) -> dict[str, Any]:
    """Generate ranked outfit recommendations from the wardrobe.

    Cases: A = nothing locked, B = top locked, C = bottom locked, D = both
    locked, E = others-only recommendations. The wedding occasion routes
    exclusively through "others" items. Results come from the AI grid scorer
    with a rule-based fallback, and may be cached when the client supplies
    ``cache_category`` and ``wardrobe_hash``.
    """
    # ---- parse request -------------------------------------------------
    occasion = str(payload.get("occasion") or "casual")
    wardrobe_items_raw = payload.get("wardrobe_items", [])
    top_selected_raw = payload.get("top_selected")
    bottom_selected_raw = payload.get("bottom_selected")
    other_selected_raw = payload.get("other_selected")
    weather = payload.get("weather") if isinstance(payload.get("weather"), dict) else None
    user_profile = payload.get("user_profile") if isinstance(payload.get("user_profile"), dict) else None
    region = str(payload.get("region") or "global")
    top_k = int(payload.get("top_k") or 5)
    candidate_pool = int(payload.get("candidate_pool") or 24)
    diversity_lambda = float(payload.get("diversity_lambda") or 0.28)
    cache_category = _normalize_cache_category(payload.get("cache_category"))
    wardrobe_hash = str(payload.get("wardrobe_hash") or "").strip()
    lock_signature = _build_lock_signature_from_payload(payload)
    if wardrobe_items_raw is None:
        wardrobe_items_raw = []
    if not isinstance(wardrobe_items_raw, list):
        raise HTTPException(status_code=400, detail="'wardrobe_items' must be a list")
    wardrobe_items = [_normalize_wardrobe_item(i if isinstance(i, dict) else {}) for i in wardrobe_items_raw]
    # Fall back to the persisted wardrobe when the client sends no items.
    if not wardrobe_items:
        wardrobe_items = [_normalize_wardrobe_item(i) for i in item_get_all()]
    if not wardrobe_items:
        return {
            "occasion": occasion, "case": "A",
            "selected_outfit_score": None, "recommendations": [],
            "total_combinations_checked": 0,
            "notice": "Your wardrobe is empty. Add garments to get outfit recommendations.",
            "engine_version": "scoring-v2",
        }
    # ---- wedding: restrict the pool to "others" items only -------------
    is_wedding = str(occasion or "").strip().lower() == "wedding"
    if is_wedding:
        others_only = [
            i for i in wardrobe_items
            if i.get("type") == "others" or i.get("type") not in {"topwear", "bottomwear"}
        ]
        if others_only:
            wedding_filtered = _filter_garments_for_wedding(others_only)
            for item in wedding_filtered:
                item["type"] = "others"
            wardrobe_items = wedding_filtered
        else:
            wardrobe_items = []
        # Top/bottom locks are meaningless for the others-only wedding flow.
        top_selected_raw = None
        bottom_selected_raw = None
    top_selected = _normalize_wardrobe_item(top_selected_raw) if isinstance(top_selected_raw, dict) else None
    bottom_selected = _normalize_wardrobe_item(bottom_selected_raw) if isinstance(bottom_selected_raw, dict) else None
    other_selected = _normalize_wardrobe_item(other_selected_raw) if isinstance(other_selected_raw, dict) else None
    # ---- matching cache lookup (only when both cache keys are provided) -
    cache_key: str | None = None
    if cache_category and wardrobe_hash:
        cache_user_id = _extract_cache_user_id(payload, wardrobe_items)
        cache_key = _build_matching_cache_key(
            cache_user_id,
            cache_category,
            occasion,
            wardrobe_hash,
            lock_signature,
        )
        cached_payload = _matching_cache_get(cache_key)
        if cached_payload is not None:
            print(f"[matching-cache] hit key={cache_key}")
            return cached_payload
        print(f"[matching-cache] miss key={cache_key}")
    def _maybe_cache(result: dict[str, Any]) -> dict[str, Any]:
        # Store the outgoing payload under the computed key, when caching is on.
        if cache_key:
            _matching_cache_set(cache_key, result)
            print(f"[matching-cache] store key={cache_key}")
        return result
    all_others = [
        i
        for i in wardrobe_items
        if i.get("type") == "others" or i.get("type") not in {"topwear", "bottomwear"}
    ]
    # ---- wedding routing: others-only rule recommendations (case E) -----
    if is_wedding:
        if other_selected and (
            other_selected.get("type") == "others"
            or other_selected.get("type") not in {"topwear", "bottomwear"}
        ):
            return _maybe_cache(_fallback_rule_recommendations(
                occasion=occasion,
                case_name="E",
                tops=[],
                bottoms=[],
                others=[other_selected],
                top_k=max(1, min(top_k, 20)),
                include_pair_outfits=False,
                include_other_outfits=True,
            ))
        if all_others:
            return _maybe_cache(_fallback_rule_recommendations(
                occasion=occasion,
                case_name="E",
                tops=[],
                bottoms=[],
                others=all_others,
                top_k=max(1, min(top_k, 20)),
                include_pair_outfits=False,
                include_other_outfits=True,
            ))
        return _maybe_cache({
            "occasion": occasion, "case": "A",
            "selected_outfit_score": None, "recommendations": [],
            "total_combinations_checked": 0,
            "notice": "Add at least one item in Others to get wedding recommendations.",
            "engine_version": "scoring-v2",
        })
    # ---- non-wedding: a locked "others" item short-circuits to case E ----
    if other_selected and (other_selected.get("type") == "others" or other_selected.get("type") not in {"topwear", "bottomwear"}):
        return _maybe_cache(_fallback_rule_recommendations(
            occasion=occasion,
            case_name="E",
            tops=[],
            bottoms=[],
            others=[other_selected],
            top_k=max(1, min(top_k, 20)),
            include_pair_outfits=False,
            include_other_outfits=True,
        ))
    tops, bottoms, others, _, _ = _resolve_outfit_grid_sources(
        wardrobe_items=wardrobe_items,
        occasion=occasion,
        top_selected=top_selected,
        bottom_selected=bottom_selected,
        other_selected=other_selected,
    )
    # No viable top/bottom pool: try others-only, else report the gap.
    if not tops or not bottoms:
        if all_others:
            return _maybe_cache(_fallback_rule_recommendations(
                occasion=occasion,
                case_name="E",
                tops=[],
                bottoms=[],
                others=all_others,
                top_k=max(1, min(top_k, 20)),
                include_pair_outfits=False,
                include_other_outfits=True,
            ))
        return _maybe_cache({
            "occasion": occasion, "case": "A",
            "selected_outfit_score": None, "recommendations": [],
            "total_combinations_checked": 0,
            "notice": "You need at least one compatible topwear and one compatible bottomwear item to generate outfits.",
            "engine_version": "scoring-v2",
        })
    # ---- case routing from which slots are locked ------------------------
    case_name = "A"
    if top_selected and not bottom_selected:
        case_name = "B"
    elif bottom_selected and not top_selected:
        case_name = "C"
    elif top_selected and bottom_selected:
        case_name = "D"
    # Clamp tunables into safe operating ranges.
    top_k = max(1, min(top_k, 20))
    candidate_pool = max(4, min(candidate_pool, 64))
    diversity_lambda = max(0.0, min(diversity_lambda, 0.95))
    # With no locks at all, standalone "others" outfits draw from the full pool.
    priority_other_candidates = (
        all_others
        if not top_selected and not bottom_selected and not other_selected
        else others
    )
    try:
        ai_result = _recommend_outfits_with_ai_grid(
            wardrobe_items=wardrobe_items,
            occasion=occasion,
            top_selected=top_selected,
            bottom_selected=bottom_selected,
            other_selected=other_selected,
            weather=weather,
            user_profile=user_profile,
            region=region,
            top_k=top_k,
            case_name=case_name,
        )
        return _maybe_cache(_merge_standalone_others_for_priority_occasions(
            result=ai_result,
            occasion=occasion,
            others=priority_other_candidates,
            top_k=top_k,
        ))
    except Exception as exc:
        # Any AI-grid failure degrades to the rule-based recommendation path.
        print(f"[outfit-scoring] algo={OUTFIT_AI_SCORER_NAME} failed reason={exc!r}")
        return _maybe_cache(_current_fallback_recommendations(
            wardrobe_items=wardrobe_items,
            occasion=occasion,
            top_selected=top_selected,
            bottom_selected=bottom_selected,
            other_selected=other_selected,
            weather=weather,
            user_profile=user_profile,
            region=region,
            top_k=top_k,
            candidate_pool=candidate_pool,
            diversity_lambda=diversity_lambda,
            case_name=case_name,
            tops=tops,
            bottoms=bottoms,
            others=priority_other_candidates,
        ))
@app.get("/image-proxy")
def image_proxy(url: str = Query(..., description="Remote image URL")) -> Response:
    """Fetch a remote image server-side and relay it with a cache header.

    NOTE(review): this is effectively an open proxy — any http(s) URL is
    fetched, including internal/loopback hosts, so the endpoint is exposed
    to SSRF abuse; consider an allow-list of trusted image hosts.
    """
    parsed = urlparse(url)
    if parsed.scheme not in {"http", "https"}:
        raise HTTPException(status_code=400, detail="Invalid image url")
    try:
        # Browser-like UA/Referer headers: some CDNs reject bare requests.
        req = Request(url, headers={"User-Agent": "Mozilla/5.0", "Accept": "image/*,*/*;q=0.8", "Referer": url})
        with urlopen(req, timeout=12) as resp:
            content = resp.read()
            content_type = resp.headers.get("Content-Type", "image/jpeg")
        return Response(content=content, media_type=content_type, headers={"Cache-Control": "public, max-age=3600"})
    except Exception as e:
        raise HTTPException(status_code=502, detail=f"Failed to fetch image: {e}") from e
# Allow running the API directly (deployments may instead use `uvicorn app:app`).
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=7860)