Spaces:
Running
Running
| """ | |
| app.py — Wardrobe Assistant API (v2) | |
| Changes from v1: | |
| - _item_store (in-memory list) replaced by SQLite via db.py | |
| - All outfit scoring routed through scoring.py (strategic v2 model) | |
| - _gap_suggestions now analyses the actual wardrobe instead of hardcoding items | |
| - /feedback endpoint added for preference data collection | |
| - Score responses include human-readable reason + tip from scoring.py | |
| """ | |
| from __future__ import annotations | |
| import base64 | |
| import io | |
| import json | |
| import os | |
| import re | |
| import tempfile | |
| import threading | |
| import traceback | |
| import time | |
| import uuid | |
| import html as html_lib | |
| from dataclasses import replace | |
| from concurrent.futures import ThreadPoolExecutor, TimeoutError as FuturesTimeout | |
| from contextlib import asynccontextmanager | |
| from datetime import datetime, timezone | |
| from pathlib import Path | |
| from typing import Any, NoReturn | |
| from urllib.error import HTTPError | |
| from urllib.parse import parse_qs, quote_plus, unquote, urlencode, urljoin, urlparse | |
| from urllib.request import Request, urlopen | |
| import requests | |
| from fastapi import Body, FastAPI, File, HTTPException, Query, Response, UploadFile | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from PIL import Image, ImageDraw, ImageFont, ImageOps | |
| from db import ( | |
| init_db, | |
| item_insert, | |
| item_get_all, | |
| item_get, | |
| item_update, | |
| item_delete, | |
| feedback_record, | |
| cache_get, | |
| cache_set, | |
| cache_purge_expired, | |
| ) | |
| from scoring import ( | |
| score_pair_full, | |
| extract_base_color, | |
| extract_style, | |
| extract_fit, | |
| extract_pattern, | |
| extract_season, | |
| _NEUTRALS, | |
| ) | |
| from fashion_ai import get_recommendation_service | |
| from scraper import Recommendation as ScraperRecommendation | |
| from scraper import ( | |
| build_search_urls_from_query as build_nike_search_urls_from_query, | |
| build_search_urls_from_recommendation as build_nike_search_urls_from_recommendation, | |
| extract_product_summaries as extract_nike_product_summaries, | |
| ) | |
| from zalando_scraper import ( | |
| build_zalando_search_url, | |
| build_zalando_search_urls_from_request, | |
| extract_product_summaries as extract_zalando_product_summaries, | |
| ) | |
| def _now_iso() -> str: | |
| return datetime.now(timezone.utc).isoformat() | |
| def _norm(value: Any) -> str: | |
| return str(value or "").strip().lower() | |
| # In-process layer of the matching-result cache: cache-key string -> JSON-safe result dict. | |
| MATCHING_RESULT_CACHE: dict[str, dict[str, Any]] = {} | |
| # Held by _matching_cache_get / _matching_cache_set around every access to MATCHING_RESULT_CACHE. | |
| MATCHING_RESULT_CACHE_LOCK = threading.Lock() | |
| # Maximum number of in-memory entries; once exceeded, the oldest-inserted keys are dropped first. | |
| MATCHING_RESULT_CACHE_MAX = int(os.getenv("MATCHING_RESULT_CACHE_MAX", "500")) | |
| # ttl_seconds handed to cache_set for the persisted copy (default 86400 seconds). | |
| MATCHING_RESULT_CACHE_TTL_SECONDS = int(os.getenv("MATCHING_RESULT_CACHE_TTL_SECONDS", "86400")) | |
| def _matching_cache_storage_key(key: str) -> str: | |
| return f"matching:{key}" | |
def _normalize_cache_category(value: Any) -> str:
    """Return the normalized slot name when it is a known one, otherwise ""."""
    slot = _norm(value)
    return slot if slot in ("topwear", "bottomwear", "others") else ""
| def _extract_cache_user_id(payload: dict[str, Any], wardrobe_items: list[dict[str, Any]]) -> str: | |
| payload_user = str(payload.get("user_id") or "").strip() | |
| if payload_user: | |
| return payload_user | |
| for item in wardrobe_items: | |
| candidate = str(item.get("user_id") or "").strip() | |
| if candidate: | |
| return candidate | |
| return "anonymous" | |
def _build_matching_cache_key(
    user_id: str,
    category: str,
    occasion: str,
    wardrobe_hash: str,
    lock_signature: str = "",
) -> str:
    """Compose the pipe-delimited cache key for one matching request.

    Segment order: user id (``anonymous`` when blank), normalized slot,
    normalized occasion (``casual`` when blank), wardrobe hash, lock signature.
    """
    owner = user_id.strip() or "anonymous"
    slot = _normalize_cache_category(category)
    occasion_part = _norm(occasion) or "casual"
    segments = (owner, slot, occasion_part, wardrobe_hash.strip(), lock_signature.strip())
    return "|".join(segments)
def _matching_cache_get(key: str) -> dict[str, Any] | None:
    """Look up a cached matching result, memory first and the persisted cache second.

    Returns a deep copy (JSON round-trip) so callers cannot mutate the cached
    value, or ``None`` on a miss or when the persisted read fails. A persisted
    hit is copied into the in-memory layer, which is then trimmed to
    MATCHING_RESULT_CACHE_MAX entries, oldest-inserted first.
    """
    with MATCHING_RESULT_CACHE_LOCK:
        memory_hit = MATCHING_RESULT_CACHE.get(key)
        if isinstance(memory_hit, dict):
            return json.loads(json.dumps(memory_hit))

    try:
        stored = cache_get(_matching_cache_storage_key(key))
    except Exception as exc:
        # Best effort: a failing persisted read is treated as a miss.
        print(f"[matching-cache] db read failed key={key} reason={exc!r}")
        return None

    if isinstance(stored, dict):
        fresh_copy = json.loads(json.dumps(stored))
        with MATCHING_RESULT_CACHE_LOCK:
            MATCHING_RESULT_CACHE[key] = fresh_copy
            while len(MATCHING_RESULT_CACHE) > MATCHING_RESULT_CACHE_MAX:
                MATCHING_RESULT_CACHE.pop(next(iter(MATCHING_RESULT_CACHE)), None)
        return json.loads(json.dumps(fresh_copy))
    return None
def _matching_cache_set(key: str, payload: dict[str, Any]) -> None:
    """Store *payload* in the in-memory cache and, best effort, in the persisted cache.

    The value is deep-copied through a JSON round-trip before being stored.
    The in-memory layer is trimmed to MATCHING_RESULT_CACHE_MAX entries,
    oldest-inserted first. A failing persisted write is logged, not raised.
    """
    frozen = json.loads(json.dumps(payload))
    with MATCHING_RESULT_CACHE_LOCK:
        MATCHING_RESULT_CACHE[key] = frozen
        while len(MATCHING_RESULT_CACHE) > MATCHING_RESULT_CACHE_MAX:
            MATCHING_RESULT_CACHE.pop(next(iter(MATCHING_RESULT_CACHE)), None)
    try:
        cache_set(
            _matching_cache_storage_key(key),
            frozen,
            ttl_seconds=MATCHING_RESULT_CACHE_TTL_SECONDS,
        )
    except Exception as exc:
        print(f"[matching-cache] db write failed key={key} reason={exc!r}")
| def _build_lock_signature_from_payload(payload: dict[str, Any]) -> str: | |
| explicit = str(payload.get("lock_signature") or "").strip() | |
| if explicit: | |
| return explicit | |
| top_selected_raw = payload.get("top_selected") | |
| bottom_selected_raw = payload.get("bottom_selected") | |
| other_selected_raw = payload.get("other_selected") | |
| top_id = str(top_selected_raw.get("id") or "").strip() if isinstance(top_selected_raw, dict) else "" | |
| bottom_id = str(bottom_selected_raw.get("id") or "").strip() if isinstance(bottom_selected_raw, dict) else "" | |
| other_id = str(other_selected_raw.get("id") or "").strip() if isinstance(other_selected_raw, dict) else "" | |
| if not top_id and not bottom_id and not other_id: | |
| return "" | |
| return f"top:{top_id or '-'}|bottom:{bottom_id or '-'}|other:{other_id or '-'}" | |
def _infer_type(category: str) -> str:
    """Guess the wardrobe slot for a free-text category label.

    Topwear keywords are checked before bottomwear keywords; the match is a
    plain substring test on the normalized label. Falls back to ``"others"``.
    """
    label = _norm(category)
    slot_markers = (
        ("topwear", ("shirt", "tee", "top", "kurta", "blouse", "hoodie", "sweater", "blazer", "jacket", "polo")),
        ("bottomwear", ("jean", "pant", "trouser", "short", "skirt", "jogger", "palazzo", "chino")),
    )
    for slot, markers in slot_markers:
        for marker in markers:
            if marker in label:
                return slot
    return "others"
| def _normalize_wardrobe_item(item: dict[str, Any]) -> dict[str, Any]: | |
| description = item.get("description") if isinstance(item.get("description"), dict) else {} | |
| category = str(item.get("category") or description.get("category") or description.get("type") or "Unknown") | |
| item_type = str(item.get("type") or description.get("type") or _infer_type(category)) | |
| return { | |
| "id": item.get("id") or str(uuid.uuid4()), | |
| "image_url": item.get("image_url") or "", | |
| "type": item_type, | |
| "category": category, | |
| "color": str(item.get("color") or description.get("color") or "Unknown"), | |
| "pattern": str(item.get("pattern") or description.get("pattern") or "Solid"), | |
| "fabric": str(item.get("fabric") or description.get("fabric") or "Unknown"), | |
| "fit": str(item.get("fit") or description.get("fit") or "Unknown"), | |
| "season": str(item.get("season") or description.get("season") or "All-Season"), | |
| "style": str(item.get("style") or description.get("occasion") or description.get("style") or "casual"), | |
| "occasion": str(item.get("occasion") or description.get("occasion") or "casual"), | |
| "description": description, | |
| } | |
| def _build_outfit_payload(scored: dict[str, Any], top: dict[str, Any] | None, bottom: dict[str, Any] | None, rank: int, other: dict[str, Any] | None) -> dict[str, Any]: | |
| payload = { | |
| **scored, | |
| "rank": rank, | |
| "top": top, | |
| "bottom": bottom, | |
| } | |
| if other is not None: | |
| payload["other"] = other | |
| return payload | |
| def _gap_suggestions(wardrobe: list[dict[str, Any]], occasion: str) -> list[dict[str, Any]]: | |
| tops = sum(1 for item in wardrobe if _norm(item.get("type")) == "topwear") | |
| bottoms = sum(1 for item in wardrobe if _norm(item.get("type")) == "bottomwear") | |
| suggestions: list[dict[str, Any]] = [] | |
| if tops == 0: | |
| suggestions.append({"focus": "topwear", "suggestion": "Add a versatile topwear staple", "reason": "No topwear items found."}) | |
| if bottoms == 0: | |
| suggestions.append({"focus": "bottomwear", "suggestion": "Add a versatile bottomwear staple", "reason": "No bottomwear items found."}) | |
| if tops and bottoms and abs(tops - bottoms) > 2: | |
| suggestions.append({"focus": "balance", "suggestion": "Balance your wardrobe mix", "reason": "One category is much larger than the other."}) | |
| if not suggestions: | |
| suggestions.append({"focus": "versatility", "suggestion": f"Add one {occasion} piece that can mix with existing staples", "reason": "Wardrobe is already balanced."}) | |
| return suggestions[:4] | |
| # Directory next to this file where _save_scraper_json_payload writes its JSON dumps. | |
| SCRAPER_OUTPUT_DIR = Path(__file__).resolve().parent / "scraped_json" | |
| # In-memory store of scraper results keyed by runtime_id; _store_scraper_runtime_result keeps at most 25 entries. | |
| SCRAPER_RUNTIME_RESULTS: dict[str, dict[str, Any]] = {} | |
| # Held while SCRAPER_RUNTIME_RESULTS is modified. | |
| SCRAPER_RUNTIME_LOCK = threading.Lock() | |
| # Model id used by run_scraper_planner_text_inference; overridable through the environment. | |
| SCRAPER_PLANNER_MODEL_ID = os.getenv( | |
| "SCRAPER_PLANNER_MODEL_ID", | |
| "nvidia/nemotron-3-nano-omni-30b-a3b-reasoning", | |
| ) | |
| # Default max_tokens for run_scraper_planner_text_inference. | |
| SCRAPER_PLANNER_MAX_TOKENS = int(os.getenv("SCRAPER_PLANNER_MAX_TOKENS", "800")) | |
| # Lower-cased default store name from the environment. | |
| # NOTE(review): _normalize_store_name falls back to the literal "nike", not to this value - confirm where this is consumed. | |
| SCRAPER_DEFAULT_STORE = str(os.getenv("SCRAPER_DEFAULT_STORE", "nike")).strip().lower() | |
def _save_scraper_json_payload(prefix: str, payload: dict[str, Any]) -> str:
    """Write *payload* as indented JSON under SCRAPER_OUTPUT_DIR and return its path.

    The file is named ``<prefix>_<UTC timestamp>.json``. The timestamp only has
    one-second resolution, so a second save with the same prefix in the same
    second gets an extra random suffix instead of overwriting the first file.

    Args:
        prefix: Leading part of the file name.
        payload: JSON-serializable data to persist.

    Returns:
        The path of the written file, as a string.

    Raises:
        TypeError / ValueError: If *payload* cannot be serialized to JSON.
        OSError: If the directory or file cannot be created.
    """
    # Serialize before touching the filesystem so an unserializable payload
    # cannot leave a truncated file behind.
    serialized = json.dumps(payload, ensure_ascii=True, indent=2)
    SCRAPER_OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
    timestamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
    file_path = SCRAPER_OUTPUT_DIR / f"{prefix}_{timestamp}.json"
    try:
        # Mode "x" fails instead of truncating when the name is already taken.
        handle = file_path.open("x", encoding="utf-8")
    except FileExistsError:
        file_path = SCRAPER_OUTPUT_DIR / f"{prefix}_{timestamp}_{uuid.uuid4().hex[:8]}.json"
        handle = file_path.open("x", encoding="utf-8")
    with handle:
        handle.write(serialized)
    return str(file_path)
def _store_scraper_runtime_result(payload: dict[str, Any]) -> dict[str, Any]:
    """Register a scraper result in the in-memory runtime store and return the record.

    The record is *payload* plus a generated ``runtime_id`` and a ``created_at``
    timestamp (a ``created_at`` supplied in *payload* still takes precedence).
    Only the 25 most recently stored records are kept.

    Args:
        payload: Result fields to store.

    Returns:
        The stored record; its ``runtime_id`` is always the key it is stored under.
    """
    runtime_id = str(uuid.uuid4())
    record = {
        "runtime_id": runtime_id,
        "created_at": _now_iso(),
        **payload,
    }
    # payload is spread last, so a "runtime_id" inside it would replace the
    # generated one and the record would no longer match its storage key.
    record["runtime_id"] = runtime_id
    with SCRAPER_RUNTIME_LOCK:
        SCRAPER_RUNTIME_RESULTS[runtime_id] = record
        while len(SCRAPER_RUNTIME_RESULTS) > 25:
            oldest_key = next(iter(SCRAPER_RUNTIME_RESULTS))
            SCRAPER_RUNTIME_RESULTS.pop(oldest_key, None)
    return record
def _candidate_text_model_ids(primary_model_id: str) -> list[str]:
    """Return the model ids to try for text inference, as given by _candidate_model_ids."""
    model_ids = _candidate_model_ids(primary_model_id)
    return model_ids
| def _run_text_inference_with_model(primary_model_id: str, prompt: str, max_tokens: int) -> str: | |
| # Send `prompt` as a single user message to NVIDIA_INVOKE_URL and return the extracted response text. | |
| # Candidate models come from _candidate_text_model_ids(primary_model_id) and are tried in order. | |
| # Raises RuntimeError when no API key is configured, NvidiaGatewayError for HTTP/transport failures, | |
| # and lets NvidiaPayloadError propagate. | |
| # NOTE(review): indentation is lost in this copy of the file; the nesting described below is inferred - confirm against the real source. | |
| api_key = _nvidia_api_key() | |
| if not api_key: | |
| raise RuntimeError(NVIDIA_API_KEY_MISSING_DETAIL) | |
| # Most recent failure; used to build the final error once every candidate has been tried. | |
| last_error: Exception | None = None | |
| for model_id in _candidate_text_model_ids(primary_model_id): | |
| # Each model starts from the caller's budget; it is only raised after a token-limit error. | |
| current_max_tokens = max_tokens | |
| while current_max_tokens <= NVIDIA_REASONING_MAX_TOKENS: | |
| payload = { | |
| "model": model_id, | |
| "messages": [ | |
| { | |
| "role": "user", | |
| "content": prompt, | |
| } | |
| ], | |
| "max_tokens": current_max_tokens, | |
| "temperature": NVIDIA_TEMPERATURE, | |
| "top_p": NVIDIA_TOP_P, | |
| "stream": True, | |
| } | |
| if NVIDIA_ENABLE_THINKING: | |
| payload["chat_template_kwargs"] = {"enable_thinking": True} | |
| headers = { | |
| "Authorization": f"Bearer {api_key}", | |
| "Accept": "text/event-stream", | |
| "Content-Type": "application/json", | |
| } | |
| # Cleared for every model/budget pass so an earlier failure is not reported for this one. | |
| last_error = None | |
| # Up to NVIDIA_MAX_RETRIES + 1 HTTP attempts per pass. | |
| for attempt in range(NVIDIA_MAX_RETRIES + 1): | |
| try: | |
| # NOTE(review): the payload asks for "stream": True but requests.post is called without stream=True, | |
| # so requests reads the whole body before _extract_streamed_nvidia_text sees it - confirm this is intended. | |
| response = requests.post( | |
| NVIDIA_INVOKE_URL, | |
| headers=headers, | |
| json=payload, | |
| timeout=NVIDIA_TIMEOUT_SECONDS, | |
| ) | |
| # 429 and these 5xx codes are raised as NvidiaGatewayError (429 -> status_code 503, others -> 502); | |
| # the handler below retries them. | |
| if response.status_code in {429, 500, 502, 503, 504}: | |
| raise NvidiaGatewayError( | |
| f"NVIDIA API transient failure {response.status_code}: {response.text[:500]}", | |
| status_code=503 if response.status_code == 429 else 502, | |
| ) | |
| # Every other status >= 400 is also raised as NvidiaGatewayError with status_code 502. | |
| if response.status_code >= 400: | |
| raise NvidiaGatewayError( | |
| f"NVIDIA API request failed with {response.status_code}: {response.text[:500]}", | |
| status_code=502, | |
| ) | |
| try: | |
| return _extract_streamed_nvidia_text(response) | |
| except NvidiaPayloadError as stream_exc: | |
| # Some providers occasionally terminate SSE without final text; retry once using non-stream mode. | |
| if "stream ended without returning any content" not in str(stream_exc).lower(): | |
| raise | |
| # Same request with "stream": False and a JSON Accept header. | |
| non_stream_payload = {**payload, "stream": False} | |
| non_stream_headers = { | |
| "Authorization": f"Bearer {api_key}", | |
| "Accept": "application/json", | |
| "Content-Type": "application/json", | |
| } | |
| non_stream_response = requests.post( | |
| NVIDIA_INVOKE_URL, | |
| headers=non_stream_headers, | |
| json=non_stream_payload, | |
| timeout=NVIDIA_TIMEOUT_SECONDS, | |
| ) | |
| # Same status mapping as for the streaming request above. | |
| if non_stream_response.status_code in {429, 500, 502, 503, 504}: | |
| raise NvidiaGatewayError( | |
| f"NVIDIA API transient failure {non_stream_response.status_code}: {non_stream_response.text[:500]}", | |
| status_code=503 if non_stream_response.status_code == 429 else 502, | |
| ) | |
| if non_stream_response.status_code >= 400: | |
| raise NvidiaGatewayError( | |
| f"NVIDIA API request failed with {non_stream_response.status_code}: {non_stream_response.text[:500]}", | |
| status_code=502, | |
| ) | |
| try: | |
| parsed_payload = non_stream_response.json() | |
| except ValueError as exc: | |
| raise NvidiaPayloadError( | |
| "NVIDIA non-stream planner response was not valid JSON." | |
| ) from exc | |
| return _extract_nvidia_text(parsed_payload) | |
| # Token-limit error: stop retrying at this budget; the budget is raised after the retry loop. | |
| except NvidiaTokenLimitError as exc: | |
| last_error = exc | |
| break | |
| except (requests.RequestException, NvidiaGatewayError) as exc: | |
| last_error = exc | |
| # A degraded-function error is not retried on the same model. | |
| if _is_degraded_function_error(exc): | |
| break | |
| if attempt >= NVIDIA_MAX_RETRIES: | |
| break | |
| # Linear backoff: the delay grows with the attempt number. | |
| time.sleep(NVIDIA_RETRY_BACKOFF_SECONDS * (attempt + 1)) | |
| # Payload errors are never retried. | |
| except NvidiaPayloadError: | |
| raise | |
| # Double the token budget (capped at NVIDIA_REASONING_MAX_TOKENS) and run the same model again. | |
| if isinstance(last_error, NvidiaTokenLimitError) and current_max_tokens < NVIDIA_REASONING_MAX_TOKENS: | |
| current_max_tokens = min(current_max_tokens * 2, NVIDIA_REASONING_MAX_TOKENS) | |
| continue | |
| # Degraded model: presumably this break leaves the budget loop so the next candidate model is tried. | |
| # NOTE(review): the log prints the degraded model_id, not the fallback that will be tried next. | |
| if _is_degraded_function_error(last_error or Exception()): | |
| print(f"[nvidia] model degraded, trying fallback model: {model_id}") | |
| break | |
| if isinstance(last_error, NvidiaGatewayError): | |
| raise last_error | |
| # Transport-level failures are re-raised as NvidiaGatewayError with status_code 502. | |
| if isinstance(last_error, requests.RequestException): | |
| raise NvidiaGatewayError(f"NVIDIA API request failed: {last_error}", status_code=502) from last_error | |
| if last_error is not None: | |
| raise last_error | |
| break | |
| # Reached when no candidate returned text; report the last recorded failure if there is one. | |
| if isinstance(last_error, Exception): | |
| raise NvidiaGatewayError( | |
| f"NVIDIA API request failed on all configured models ({', '.join(_candidate_text_model_ids(primary_model_id))}): {last_error}", | |
| status_code=502, | |
| ) from last_error | |
| # No failure was recorded either. | |
| raise NvidiaGatewayError( | |
| "NVIDIA API request failed after exhausting reasoning token budget.", | |
| status_code=502, | |
| ) | |
def run_scraper_planner_text_inference(prompt: str, max_tokens: int = SCRAPER_PLANNER_MAX_TOKENS) -> str:
    """Run *prompt* through the scraper-planner model and return the model's text."""
    planner_text = _run_text_inference_with_model(SCRAPER_PLANNER_MODEL_ID, prompt, max_tokens)
    return planner_text
def _normalize_store_name(value: str | None) -> str:
    """Map *value* to a supported store key; anything unrecognized becomes ``"nike"``."""
    store_key = _norm(value)
    return store_key if store_key in ("zalando", "nike") else "nike"
| def _build_store_query_from_recommendation(recommendation: ScraperRecommendation, occasion: str = "") -> str: | |
| parts = [ | |
| str(recommendation.gender or "").strip(), | |
| str(recommendation.color or "").strip(), | |
| str(recommendation.category or "").strip(), | |
| str(occasion or "").strip(), | |
| ] | |
| return " ".join(part for part in parts if part) | |
def _build_store_search_urls_from_recommendation(
    recommendation: ScraperRecommendation,
    store: str,
    occasion: str = "",
) -> list[str]:
    """Build the search URLs for *recommendation* on the requested store.

    Zalando gets a single URL built from a text query; every other store value
    is handled by the Nike URL builder.
    """
    store_key = _normalize_store_name(store)
    if store_key != "zalando":
        return build_nike_search_urls_from_recommendation(recommendation, store=store_key)
    zalando_query = _build_store_query_from_recommendation(recommendation, occasion=occasion)
    zalando_url = build_zalando_search_url(query=zalando_query, gender=recommendation.gender)
    return [zalando_url]
def _build_store_search_urls_from_query(
    query: str,
    store: str,
    gender: str | None = None,
    wardrobe_items: list[dict[str, Any]] | None = None,
    requested_category: str | None = None,
) -> list[str]:
    """Build the search URLs for a free-text *query* on the requested store.

    ``wardrobe_items`` and ``requested_category`` are only forwarded to the
    Zalando builder; the Nike builder receives the query, store and gender.
    """
    store_key = _normalize_store_name(store)
    if store_key != "zalando":
        return build_nike_search_urls_from_query(query=query, store=store_key, gender=gender)

    zalando_result = build_zalando_search_urls_from_request(
        query=query,
        gender=gender,
        wardrobe_items=wardrobe_items,
        requested_category=requested_category,
        # URL generation should follow planner output + deterministic rules.
        # GPT OSS remains reserved for post-scrape cleanup only.
        completion_fn=None,
    )
    # The builder returns a pair; only the URL list (first element) is used here.
    search_urls, _ = zalando_result
    return search_urls
def _extract_store_product_summaries(search_url: str, store: str) -> list[dict[str, str]]:
    """Scrape product summaries from *search_url* with the extractor for *store*."""
    store_key = _normalize_store_name(store)
    if store_key != "zalando":
        return extract_nike_product_summaries(search_url, store=store_key)
    zalando_options = {
        "search_url": search_url,
        "max_products": None,
        "use_apify": True,
        "postprocess": None,
    }
    return extract_zalando_product_summaries(**zalando_options)
def _wardrobe_metadata_snapshot(limit: int = 30) -> dict[str, Any]:
    """Summarize the stored wardrobe for use as planner context.

    Returns the total item count, the metadata of the first *limit* items and
    a ``"<type>|<occasion>"`` -> count histogram over all items.
    """
    wardrobe = [_normalize_wardrobe_item(raw) for raw in item_get_all()]

    counts: dict[str, int] = {}
    for entry in wardrobe:
        slot = _norm(entry.get("type") or "unknown")
        occasion = _norm(entry.get("occasion") or "unknown")
        bucket = f"{slot}|{occasion}"
        counts[bucket] = counts.get(bucket, 0) + 1

    exposed_fields = ("id", "type", "category", "color", "pattern", "fit", "season", "style", "occasion")
    items = [
        {field: entry.get(field) for field in exposed_fields}
        for entry in wardrobe[:limit]
    ]
    return {"total_items": len(wardrobe), "items": items, "counts": counts}
| def _build_scraper_plan_prompt( | |
| occasion: str, | |
| gender: str, | |
| preferences: str, | |
| user_prompt: str, | |
| target_category: str, | |
| filters: dict[str, Any], | |
| wardrobe_snapshot: dict[str, Any], | |
| planning_context: dict[str, Any], | |
| max_products: int | None, | |
| store: str, | |
| ) -> str: | |
| return ( | |
| "ROLE: Senior Fashion Merchandising Strategist & Query Planner for AI Wardrobe Assistant\n\n" | |
| f"OBJECTIVE: Generate exactly one high-precision {store.title()} shopping plan that is context-safe, occasion-safe, and wardrobe-grounded.\n\n" | |
| "---\n\n" | |
| "INPUTS:\n\n" | |
| f"user_request: \"{user_prompt}\"\n" | |
| f"occasion: \"{occasion}\"\n" | |
| f"target_category: \"{target_category}\" // strict slot constraint\n" | |
| f"gender: \"{gender}\"\n" | |
| f"preferences: \"{preferences}\"\n" | |
| f"filters: {json.dumps(filters, ensure_ascii=True)}\n" | |
| f"max_products: {max_products if isinstance(max_products, int) and max_products > 0 else 'uncapped'}\n" | |
| f"planning_context: {json.dumps(planning_context, ensure_ascii=True)}\n" | |
| f"wardrobe_snapshot: {json.dumps(wardrobe_snapshot, ensure_ascii=True)}\n\n" | |
| "---\n\n" | |
| "EXECUTION RULES (Hard Constraints):\n\n" | |
| "1. SLOT LOCK: target_category is immutable. If \"topwear\", NEVER output bottomwear categories (pants, shorts, joggers) and vice versa.\n\n" | |
| "2. OCCASION VETTING: For formal | interview | work | wedding | client_meeting:\n" | |
| " BLOCK: hoodie, sweatshirt, joggers, shorts, tank, tights, leggings, crop-top, sports-bra-as-outerwear\n" | |
| " ALLOW: shirt, polo, blazer_outerwear, sweater, sweatshirt_structured (only if minimal branding)\n\n" | |
| "3. PRIORITY HIERARCHY:\n" | |
| " First: allowed_categories from planning_context\n" | |
| " Second: color_shortlist from planning_context\n" | |
| " Third: reference_item_ids from planning_context (anchor reasoning here)\n\n" | |
| "4. COLOR LOGIC: Select ONE color from color_shortlist that:\n" | |
| " - Complements majority of wardrobe_snapshot bottoms\n" | |
| " - Suits occasion formality\n" | |
| " - Has highest non-conflict score with existing wardrobe\n\n" | |
| "5. If planning_context.color_resonance_scores exists, prefer the highest-scoring color unless user filters explicitly override it.\n\n" | |
| "---\n\n" | |
| "REASONING FRAMEWORK:\n\n" | |
| "Step 1 — Parse wardrobe_snapshot for color frequency, gap categories, and bottom-dominant hues\n" | |
| "Step 2 — Cross-reference planning_context.reference_items for silhouette compatibility\n" | |
| "Step 3 — Filter through occasion veto list\n" | |
| "Step 4 — Select optimal {category, color} pair with highest interoperability score\n" | |
| "Step 5 — Compose commerce-optimized query: gender + color + occasion + category\n\n" | |
| "---\n\n" | |
| "OUTPUT SCHEMA (strict JSON, no markdown, no extra keys):\n\n" | |
| "{\n" | |
| ' "target_category": "topwear|bottomwear",\n' | |
| ' "color": "string from color_shortlist",\n' | |
| ' "category": "string from allowed_categories post-vetting",\n' | |
| ' "gender": "men|women|unisex",\n' | |
| ' "style_direction": "formal-smart|business-casual|casual-polished|etc",\n' | |
| ' "reference_item_ids": ["array from planning_context"],\n' | |
| ' "query": "commerce-ready search string",\n' | |
| ' "wardrobe_grounding": "specific evidence from wardrobe_snapshot",\n' | |
| ' "reason": "concise strategic justification"\n' | |
| "}\n" | |
| ) | |
| def _recover_scraper_plan_from_text( | |
| model_text: str, | |
| planning_context: dict[str, Any], | |
| occasion: str, | |
| gender: str, | |
| ) -> dict[str, Any]: | |
| text = str(model_text or "").strip() | |
| if not text: | |
| return {} | |
| parsed = parse_json_from_text(text) | |
| if isinstance(parsed, dict) and parsed: | |
| return parsed | |
| allowed = [str(value) for value in planning_context.get("allowed_categories", []) if str(value).strip()] | |
| colors = [str(value) for value in planning_context.get("color_shortlist", []) if str(value).strip()] | |
| lowered = text.lower() | |
| category = "" | |
| for candidate in allowed: | |
| if candidate.lower() in lowered: | |
| category = candidate | |
| break | |
| if not category and allowed: | |
| category = allowed[0] | |
| color = "" | |
| for candidate in colors: | |
| if candidate.lower() in lowered: | |
| color = candidate | |
| break | |
| if not color and colors: | |
| color = colors[0] | |
| inferred_gender = _normalize_scraper_gender(gender) | |
| if "women" in lowered: | |
| inferred_gender = "women" | |
| elif "men" in lowered: | |
| inferred_gender = "men" | |
| elif "unisex" in lowered: | |
| inferred_gender = "unisex" | |
| query = "" | |
| query_match = re.search(r'"query"\s*:\s*"([^"]+)"', text, flags=re.IGNORECASE) | |
| if query_match: | |
| query = query_match.group(1).strip() | |
| if not query and category and color: | |
| query = _build_planned_query( | |
| inferred_gender, | |
| color, | |
| category, | |
| occasion, | |
| str(planning_context.get("style_direction") or "occasion-aligned"), | |
| ) | |
| if not category or not color or not query: | |
| return {} | |
| return { | |
| "target_category": planning_context.get("resolved_target_category", "topwear"), | |
| "color": color, | |
| "category": category, | |
| "gender": inferred_gender, | |
| "style_direction": planning_context.get("style_direction", "occasion-aligned"), | |
| "reference_item_ids": planning_context.get("reference_item_ids", []), | |
| "query": query, | |
| "reason": "Recovered Nemotron planner output from semi-structured response.", | |
| "source": "nemotron", | |
| } | |
| def _normalize_scraper_gender(value: str | None) -> str | None: | |
| normalized = str(value or "").strip().lower() | |
| if normalized in {"men", "male", "man", "mens"}: | |
| return "men" | |
| if normalized in {"women", "female", "woman", "womens"}: | |
| return "women" | |
| if normalized in {"unisex", "any", "all"}: | |
| return "unisex" | |
| return None | |
def _normalize_target_category(value: Any) -> str:
    """Map a slot label to ``topwear`` or ``bottomwear``; anything else means ``both``."""
    slot_aliases = {
        "topwear": ("topwear", "top", "upper", "tops"),
        "bottomwear": ("bottomwear", "bottom", "lower", "bottoms"),
    }
    label = _norm(value)
    for slot, aliases in slot_aliases.items():
        if label in aliases:
            return slot
    return "both"
| _PROMPT_TO_TARGET_HINTS = { | |
| "topwear": { | |
| "top", "topwear", "shirt", "blazer", "jacket", "polo", "tee", "t-shirt", "kurta", "upper", | |
| }, | |
| "bottomwear": { | |
| "bottom", "bottomwear", "trouser", "trousers", "pants", "jeans", "shorts", "joggers", "lower", | |
| }, | |
| } | |
| _PROMPT_TO_OCCASION_HINTS: dict[str, set[str]] = { | |
| "formal": {"formal", "interview", "office", "work", "business", "meeting", "wedding"}, | |
| "party": {"party", "festive", "diwali", "celebration", "date", "ethnic"}, | |
| "sports": {"sports", "gym", "workout", "training", "running", "run", "active"}, | |
| "casual": {"casual", "daily", "everyday", "weekend", "outing"}, | |
| } | |
| _PROMPT_COLOR_TERMS = [ | |
| "black", "white", "navy", "blue", "grey", "gray", "beige", "olive", "green", "brown", | |
| "khaki", "cream", "maroon", "charcoal", "tan", | |
| ] | |
| def _infer_structured_request_from_prompt(user_prompt: str) -> dict[str, Any]: | |
| normalized = _norm(user_prompt) | |
| if not normalized: | |
| return { | |
| "target_category": "both", | |
| "occasion": "", | |
| "gender": "", | |
| "preferred_colors": [], | |
| "include_keywords": [], | |
| "exclude_keywords": [], | |
| } | |
| target_category = "both" | |
| top_hits = sum(1 for token in _PROMPT_TO_TARGET_HINTS["topwear"] if token in normalized) | |
| bottom_hits = sum(1 for token in _PROMPT_TO_TARGET_HINTS["bottomwear"] if token in normalized) | |
| if top_hits > bottom_hits and top_hits > 0: | |
| target_category = "topwear" | |
| elif bottom_hits > top_hits and bottom_hits > 0: | |
| target_category = "bottomwear" | |
| occasion = "" | |
| for bucket, tokens in _PROMPT_TO_OCCASION_HINTS.items(): | |
| if any(token in normalized for token in tokens): | |
| occasion = bucket | |
| break | |
| gender = "" | |
| if any(token in normalized for token in {" men", "male", "man", " mens"}): | |
| gender = "men" | |
| elif any(token in normalized for token in {" women", "female", "woman", " womens"}): | |
| gender = "women" | |
| elif "unisex" in normalized: | |
| gender = "unisex" | |
| preferred_colors: list[str] = [] | |
| for color in _PROMPT_COLOR_TERMS: | |
| if color in normalized and color not in preferred_colors: | |
| preferred_colors.append(color) | |
| include_keywords: list[str] = [] | |
| for keyword in ["formal", "structured", "minimal", "smart", "elegant", "tailored"]: | |
| if keyword in normalized and keyword not in include_keywords: | |
| include_keywords.append(keyword) | |
| exclude_keywords: list[str] = [] | |
| for keyword in ["hoodie", "oversized", "ripped", "distressed", "athleisure"]: | |
| if f"avoid {keyword}" in normalized or f"no {keyword}" in normalized or f"without {keyword}" in normalized: | |
| exclude_keywords.append(keyword) | |
| return { | |
| "target_category": target_category, | |
| "occasion": occasion, | |
| "gender": gender, | |
| "preferred_colors": preferred_colors, | |
| "include_keywords": include_keywords, | |
| "exclude_keywords": exclude_keywords, | |
| } | |
| def _occasion_bucket(value: str) -> str: | |
| normalized = _norm(value) | |
| if any(token in normalized for token in {"formal", "interview", "office", "work", "business", "wedding", "meeting"}): | |
| return "formal" | |
| if any(token in normalized for token in {"party", "festive", "diwali", "celebration", "ethnic", "date"}): | |
| return "party" | |
| if any(token in normalized for token in {"sports", "gym", "active", "training", "run", "running"}): | |
| return "sports" | |
| return "casual" | |
def _top_terms(values: list[str], limit: int = 6) -> list[str]:
    """Return the most frequent normalized terms in *values*, most common first.

    Blank and ``unknown`` entries are ignored; terms with equal counts keep
    their first-seen order. At most *limit* terms are returned.
    """
    tally: dict[str, int] = {}
    for raw in values:
        term = _norm(raw)
        if term and term != "unknown":
            tally[term] = tally.get(term, 0) + 1
    by_frequency = sorted(tally, key=tally.get, reverse=True)
    return by_frequency[:limit]
def _rank_color_resonance(
    slot_colors: dict[str, list[str]],
    reference_slot: str,
    preferred_colors: list[str],
    occasion_bucket: str,
) -> list[dict[str, Any]]:
    """Rank candidate colors by how well they fit the existing wardrobe.

    Candidates are gathered, in order and de-duplicated, from the preferred
    colors, the colors of *reference_slot* (most frequent first), all
    topwear/bottomwear colors (most frequent first) and a fixed neutral list.
    Score = 3 x reference-slot count + overall count + 2 if preferred
    + 1 if the color is boosted for a formal or sports occasion.

    Returns:
        Dicts with ``color``, ``score``, ``reference_count``, ``global_count``
        and ``preferred``, sorted by (score, reference_count, global_count)
        descending.
    """

    def _tally(raw_colors: list[str]) -> dict[str, int]:
        tally: dict[str, int] = {}
        for raw in raw_colors:
            base = extract_base_color(raw or "")
            if base and base != "unknown":
                tally[base] = tally.get(base, 0) + 1
        return tally

    def _by_frequency(tally: dict[str, int]) -> list[str]:
        return sorted(tally, key=tally.get, reverse=True)

    reference_counts = _tally(slot_colors.get(reference_slot, []))
    global_counts = _tally(slot_colors.get("topwear", []) + slot_colors.get("bottomwear", []))

    preferred_set: set[str] = set()
    for value in preferred_colors:
        base = extract_base_color(value or "")
        if base:
            preferred_set.add(base)

    candidate_sources = (
        *preferred_colors,
        *_by_frequency(reference_counts),
        *_by_frequency(global_counts),
        "navy", "black", "white", "grey", "beige",
    )
    candidate_pool: list[str] = []
    for candidate in candidate_sources:
        base = extract_base_color(candidate or "")
        if base and base != "unknown" and base not in candidate_pool:
            candidate_pool.append(base)

    # Only formal and sports occasions carry a color boost.
    occasion_boosts = {
        "formal": {"navy", "black", "white", "grey", "charcoal", "beige"},
        "sports": {"black", "white", "grey", "navy", "blue", "red", "green"},
    }
    boosted_colors = occasion_boosts.get(occasion_bucket, set())

    ranked: list[dict[str, Any]] = []
    for color in candidate_pool:
        reference_count = reference_counts.get(color, 0)
        global_count = global_counts.get(color, 0)
        is_preferred = color in preferred_set
        score = (
            reference_count * 3
            + global_count
            + (2 if is_preferred else 0)
            + (1 if color in boosted_colors else 0)
        )
        ranked.append(
            {
                "color": color,
                "score": score,
                "reference_count": reference_count,
                "global_count": global_count,
                "preferred": is_preferred,
            }
        )

    ranked.sort(
        key=lambda entry: (entry["score"], entry["reference_count"], entry["global_count"]),
        reverse=True,
    )
    return ranked
| # Categories offered per target slot and occasion bucket; read by _allowed_categories, which falls back to the slot's "casual" list. | |
| SCRAPER_CATEGORY_POLICY: dict[str, dict[str, list[str]]] = { | |
| "topwear": { | |
| "formal": ["shirt", "polo", "jacket"], | |
| "party": ["shirt", "jacket", "polo"], | |
| "sports": ["jersey", "t-shirt", "hoodie"], | |
| "casual": ["shirt", "t-shirt", "polo", "jacket", "hoodie"], | |
| }, | |
| "bottomwear": { | |
| "formal": ["trousers", "pants"], | |
| "party": ["trousers", "pants", "jeans"], | |
| "sports": ["joggers", "shorts", "tights", "leggings"], | |
| "casual": ["jeans", "pants", "shorts", "joggers", "trousers"], | |
| }, | |
| } | |
| # Categories that _allowed_categories removes when the occasion bucket is "formal". | |
| SCRAPER_FORMAL_DISALLOWED = { | |
| "hoodie", "sweatshirt", "joggers", "shorts", "tank top", "tights", "leggings", | |
| } | |
| # Presumably tokens marking products outside topwear/bottomwear (accessories, footwear, underwear, swimwear) | |
| # to drop from scraped results - not used in this part of the file; verify against the caller. | |
| SCRAPER_RELEVANCE_EXCLUDE_TOKENS = { | |
| "sock", "socks", "trunk", "trunks", "boxer", "brief", "briefs", "underwear", | |
| "bra", "bralette", "panty", "panties", "bikini", "swim", "swimsuit", | |
| "belt", "cap", "hat", "beanie", "wallet", "bag", "backpack", "watch", | |
| "shoe", "sneaker", "boot", "sandals", "slippers", | |
| } | |
| # Keyword sets per category name. | |
| # NOTE(review): "jersey", "tights" and "leggings" appear in SCRAPER_CATEGORY_POLICY but have no entry here - confirm this is intended. | |
| SCRAPER_CATEGORY_KEYWORDS: dict[str, set[str]] = { | |
| "shirt": {"shirt", "formal shirt", "oxford", "button-down", "button up"}, | |
| "polo": {"polo"}, | |
| "jacket": {"jacket", "blazer", "suit jacket", "sport coat", "coat"}, | |
| "t-shirt": {"t-shirt", "tee", "crew neck"}, | |
| "hoodie": {"hoodie"}, | |
| "trousers": {"trouser", "trousers", "tailored"}, | |
| "pants": {"pant", "pants", "chino"}, | |
| "jeans": {"jeans", "denim"}, | |
| "shorts": {"shorts"}, | |
| "joggers": {"jogger", "joggers"}, | |
| } | |
def _allowed_categories(target_category: str, occasion_bucket: str) -> list[str]:
    """Return the garment categories that may be searched for a slot and occasion.

    Any target other than "topwear" or "bottomwear" is treated as "topwear".
    When the occasion bucket has no (or an empty) policy entry, the slot's
    "casual" list is used instead. For the "formal" bucket, every category in
    SCRAPER_FORMAL_DISALLOWED is dropped. The result is always a fresh list.
    """
    slot = "topwear" if target_category not in {"topwear", "bottomwear"} else target_category
    slot_policy = SCRAPER_CATEGORY_POLICY.get(slot, {})
    chosen = list(slot_policy.get(occasion_bucket, []))
    if not chosen:
        chosen = list(SCRAPER_CATEGORY_POLICY[slot]["casual"])
    if occasion_bucket != "formal":
        return chosen
    return [name for name in chosen if name not in SCRAPER_FORMAL_DISALLOWED]
| def _resolve_target_category(requested_target: str, wardrobe_snapshot: dict[str, Any]) -> str: | |
| if requested_target in {"topwear", "bottomwear"}: | |
| return requested_target | |
| counts = wardrobe_snapshot.get("counts") if isinstance(wardrobe_snapshot.get("counts"), dict) else {} | |
| top_count = sum(value for key, value in counts.items() if key.startswith("topwear|")) | |
| bottom_count = sum(value for key, value in counts.items() if key.startswith("bottomwear|")) | |
| if top_count <= bottom_count: | |
| return "topwear" | |
| return "bottomwear" | |
def _product_text_for_relevance(product: dict[str, Any]) -> str:
    """Build the searchable text used by the relevance filters for one product.

    The product's name, color, brand and item link (missing values become empty
    strings) are joined with single spaces and passed through `_norm`.
    """
    fields = [str(product.get(key) or "") for key in ("name", "color", "brand", "item_link")]
    return _norm(" ".join(fields))
# Words accepted as evidence that a scraped product has a given base colour.
# Keys are base colour names; several retailer-specific shade names are listed
# as synonyms, and "grey"/"gray" carry the same keywords.
SCRAPER_COLOR_KEYWORDS: dict[str, set[str]] = {
    "black": {"black", "jet black"},
    "white": {"bright white", "off white", "off-white", "white"},
    "navy": {
        "dark blue", "dk blue", "dress blues", "midnight blue", "moonlit ocean", "navy",
    },
    "blue": {
        "blue", "dark blue", "dk blue", "dress blues", "ice blue", "light blue",
        "moonlit ocean", "navy", "skyway",
    },
    "grey": {
        "charcoal", "dark gray", "dark grey", "gray", "grey", "steel gray", "steel grey",
    },
    "gray": {
        "charcoal", "dark gray", "dark grey", "gray", "grey", "steel gray", "steel grey",
    },
    "beige": {
        "beige", "camel", "cornstalk", "cream", "morel", "oatmeal", "sand", "stone", "tan",
    },
    "brown": {"brown", "morel", "tan"},
    "olive": {"khaki", "olive"},
    "green": {"green", "khaki", "olive"},
    "red": {"brick red", "red", "wine", "winetasting"},
    "maroon": {"burgundy", "maroon", "wine", "winetasting"},
}
def _color_keywords_for_relevance(color: str) -> set[str]:
    """Return the keyword set used to recognise `color` in product text.

    The colour is reduced with `extract_base_color`, falling back to `_norm`
    when that yields nothing. An empty or "unknown" result gives an empty set,
    which callers treat as "no colour constraint". Colours without an entry in
    SCRAPER_COLOR_KEYWORDS are matched by their own name only.
    """
    base = extract_base_color(color or "") or _norm(color)
    if base and base != "unknown":
        return SCRAPER_COLOR_KEYWORDS.get(base, {base})
    return set()
def _matches_planned_color(product: dict[str, Any], planned_color: str) -> bool:
    """Tell whether a product's text mentions the planned colour.

    Returns True when there are no keywords for the planned colour (nothing to
    enforce) or when at least one keyword occurs as a substring of the
    product's relevance text.
    """
    keywords = _color_keywords_for_relevance(planned_color)
    if not keywords:
        return True
    haystack = _product_text_for_relevance(product)
    for keyword in keywords:
        if keyword in haystack:
            return True
    return False
def _relevance_word_hit(text: str, terms: set[str], *, whole_word: bool) -> bool:
    """Return True when any of `terms` begins at a word boundary inside `text`.

    With ``whole_word=True`` the term must also end at a word boundary, with an
    optional plural "s"/"es" suffix ("boot" matches "boots" but not "bootcut").
    With ``whole_word=False`` the term only has to start a word ("tee" matches
    "tees" but not "steel").
    """
    tail = r"(?:s|es)?(?![A-Za-z0-9])" if whole_word else ""
    for term in terms:
        if term and re.search(rf"(?<![A-Za-z0-9]){re.escape(term)}{tail}", text):
            return True
    return False


def _is_relevant_scraped_product(
    product: dict[str, Any],
    target_slot: str,
    planned_category: str,
    planned_color: str,
    occasion_bucket: str,
) -> bool:
    """Decide whether a scraped product fits the planned slot, category and colour.

    A product is rejected when its relevance text is empty, when it contains an
    excluded token (accessories, footwear, underwear, ...), when it lacks a
    keyword for the planned category, when it does not mention the planned
    colour, when it lacks any generic term for the target slot, or, for formal
    occasions, when it contains a casual/athletic term.

    Excluded tokens are matched as whole words (optionally plural) and the
    formal block list is matched at word starts. Plain substring tests rejected
    valid garments: "bag" hit "baggy jeans", "boot" hit "bootcut", "bra" hit
    "zebra", and "tee" hit "steel grey". One consequence is that "swim" no
    longer matches compounds such as "swimwear".

    NOTE(review): assumes `_norm` yields lower-cased text, since every token
    compared here is lower case; confirm against `_norm`.
    """
    text = _product_text_for_relevance(product)
    if not text:
        return False
    if _relevance_word_hit(text, SCRAPER_RELEVANCE_EXCLUDE_TOKENS, whole_word=True):
        return False

    planned = _norm(planned_category)
    planned_keywords = SCRAPER_CATEGORY_KEYWORDS.get(planned, {planned} if planned else set())
    # Inclusion checks stay substring-based on purpose: they should accept
    # plurals and compounds such as "overshirt".
    if planned_keywords and not any(keyword in text for keyword in planned_keywords):
        return False
    if not _matches_planned_color(product, planned_color):
        return False

    if target_slot == "topwear":
        topwear_terms = {"shirt", "polo", "blazer", "jacket", "coat", "t-shirt", "tee", "hoodie"}
        if not any(term in text for term in topwear_terms):
            return False
    if target_slot == "bottomwear":
        bottom_terms = {"trousers", "pants", "jeans", "joggers", "shorts"}
        if not any(term in text for term in bottom_terms):
            return False

    if occasion_bucket == "formal":
        formal_blocked = {"t-shirt", "tee", "hoodie", "jogger", "shorts", "sport"}
        # Word-start matching keeps "tees"/"joggers"/"sportswear" blocked while
        # no longer tripping on colour names such as "steel grey".
        if _relevance_word_hit(text, formal_blocked, whole_word=False):
            return False
    return True
| def _complementary_slot(slot: str) -> str: | |
| return "bottomwear" if slot == "topwear" else "topwear" | |
| def _format_matched_label(item: dict[str, Any]) -> str: | |
| color = str(item.get("color") or "").strip().lower() | |
| category = str(item.get("category") or item.get("type") or "item").strip().lower() | |
| if color and category: | |
| return f"{color} {category}" | |
| return color or category or "item" | |
def _build_product_match_context(
    product: dict[str, Any],
    query_plan: dict[str, Any],
    wardrobe_snapshot: dict[str, Any],
    target_category: str,
    occasion: str,
) -> dict[str, Any]:
    """Score a scraped product against the wardrobe items of the opposite slot.

    The product is represented by a stub garment built from the query plan
    (category, colour, style direction) rather than from scraped attributes.
    Each wardrobe item in the complementary slot is scored with
    `score_pair_full`, always passing the top garment first.

    Returns a dict with:
      - "reason": human-readable sentence naming up to three best matches,
      - "match_score": integer score of the best match (0 when there is none),
      - "matched_with_slot": the complementary slot that was compared against,
      - "matched_garments": up to three match summaries, best first.
    """
    product_slot = target_category if target_category in {"topwear", "bottomwear"} else "topwear"
    matching_slot = _complementary_slot(product_slot)

    # Only wardrobe items from the opposite slot are candidates for pairing.
    candidates: list[dict[str, Any]] = []
    for raw_item in wardrobe_snapshot.get("items") or []:
        if not isinstance(raw_item, dict):
            continue
        normalized_item = _normalize_wardrobe_item(raw_item)
        if _norm(normalized_item.get("type")) == matching_slot:
            candidates.append(normalized_item)

    product_name = str(product.get("name") or "Suggested product").strip()
    product_category = str(query_plan.get("category") or product_slot).strip() or product_slot
    product_color = str(query_plan.get("color") or "unknown").strip() or "unknown"
    product_style = (
        str(
            query_plan.get("style_direction")
            or query_plan.get("occasion_bucket")
            or occasion
            or "casual"
        ).strip()
        or "casual"
    )
    product_occasion = (
        str(query_plan.get("occasion_bucket") or occasion or "casual").strip() or "casual"
    )

    product_stub = {
        "id": str(product.get("item_link") or product_name or "product"),
        "type": product_slot,
        "category": product_category,
        "color": product_color,
        "pattern": "solid",
        "fabric": "unknown",
        "fit": product_style,
        "style": product_style,
        "occasion": product_occasion,
        "season": "all-season",
    }

    scored_matches: list[dict[str, Any]] = []
    for candidate in candidates:
        if product_slot == "topwear":
            top_piece, bottom_piece = product_stub, candidate
        else:
            top_piece, bottom_piece = candidate, product_stub
        verdict = score_pair_full(top_piece, bottom_piece, occasion)
        scored_matches.append(
            {
                "id": candidate.get("id"),
                "type": candidate.get("type"),
                "category": candidate.get("category"),
                "color": candidate.get("color"),
                "score": verdict.get("score", 0),
                "reason": verdict.get("reason", ""),
            }
        )

    def _rank(entry: dict[str, Any]) -> tuple[int, str]:
        return int(entry.get("score") or 0), str(entry.get("color") or "")

    matched_garments = sorted(scored_matches, key=_rank, reverse=True)[:3]

    if not matched_garments:
        reason = (
            f"{product_name} fits as a {product_slot} recommendation, but there were no {matching_slot} wardrobe items available to compare against."
        )
        best_score = 0
    else:
        labels = [_format_matched_label(garment) for garment in matched_garments]
        if len(labels) <= 2:
            matched_text = " and ".join(labels)
        else:
            matched_text = f"{', '.join(labels[:-1])}, and {labels[-1]}"
        reason = (
            f"{product_name} is a strong {product_slot} choice because it pairs cleanly with your {matching_slot} pieces like {matched_text}. "
            f"The match is only evaluated against {matching_slot} garments, so no top-top or bottom-bottom pairing is used."
        )
        best_score = int(matched_garments[0]["score"])

    return {
        "reason": reason,
        "match_score": best_score,
        "matched_with_slot": matching_slot,
        "matched_garments": matched_garments,
    }
def _enrich_scraper_products_with_matches(
    products: list[dict[str, Any]],
    query_plan: dict[str, Any],
    wardrobe_snapshot: dict[str, Any],
    target_category: str,
    occasion: str,
) -> list[dict[str, Any]]:
    """Attach wardrobe match details to every scraped product.

    Non-dict entries are skipped. Each remaining product is copied and merged
    with the result of `_build_product_match_context`; on key clashes the match
    context wins. The input products are not modified.
    """

    def _with_context(product: dict[str, Any]) -> dict[str, Any]:
        merged = dict(product)
        merged.update(
            _build_product_match_context(
                product=product,
                query_plan=query_plan,
                wardrobe_snapshot=wardrobe_snapshot,
                target_category=target_category,
                occasion=occasion,
            )
        )
        return merged

    return [_with_context(product) for product in products if isinstance(product, dict)]
def _build_scraper_planning_context(
    wardrobe_snapshot: dict[str, Any],
    requested_target_category: str,
    occasion: str,
    gender: str,
    filters: dict[str, Any],
) -> dict[str, Any]:
    """Summarise the wardrobe into the facts the query planner works from.

    The result records the resolved target slot, the occasion bucket, the
    allowed categories for that slot/occasion, a ranked colour shortlist (at
    most six colours, with up to eight scored entries), a style hint, the
    wardrobe items most useful as references (ids capped at four, summaries at
    six) and the dominant categories/colours per slot.
    """
    snapshot_items = wardrobe_snapshot.get("items")
    items = snapshot_items if isinstance(snapshot_items, list) else []
    dict_items = [entry for entry in items if isinstance(entry, dict)]

    occasion_bucket = _occasion_bucket(occasion)
    resolved_target = _resolve_target_category(requested_target_category, wardrobe_snapshot)
    # References come from the slot opposite to the one being shopped for.
    reference_slot = "topwear" if resolved_target != "topwear" else "bottomwear"

    slot_names = ("topwear", "bottomwear", "others")
    slot_colors: dict[str, list[str]] = {name: [] for name in slot_names}
    slot_categories: dict[str, list[str]] = {name: [] for name in slot_names}
    for entry in dict_items:
        slot = _norm(entry.get("type"))
        bucket = slot if slot in slot_colors else "others"
        slot_colors[bucket].append(str(entry.get("color") or ""))
        slot_categories[bucket].append(str(entry.get("category") or ""))

    preferred_colors = [
        str(value)
        for value in (filters.get("preferred_colors") or [])
        if str(value).strip()
    ]
    color_resonance_scores = _rank_color_resonance(
        slot_colors=slot_colors,
        reference_slot=reference_slot,
        preferred_colors=preferred_colors,
        occasion_bucket=occasion_bucket,
    )
    shortlist_candidates = (
        str(entry.get("color") or "").strip() for entry in color_resonance_scores
    )
    color_shortlist = [name for name in shortlist_candidates if name][:6]

    allowed = _allowed_categories(resolved_target, occasion_bucket)
    style_hint = {
        "formal": "formal-smart",
        "party": "elevated-party",
        "sports": "performance-athletic",
    }.get(occasion_bucket, "occasion-aligned")

    # Bonus points and the style tokens that earn them, per occasion bucket.
    style_bonus: dict[str, tuple[int, tuple[str, ...]]] = {
        "formal": (3, ("formal", "work", "office", "business")),
        "party": (3, ("party", "festive", "ethnic")),
        "casual": (2, ("casual",)),
    }

    def reference_score(item: dict[str, Any]) -> int:
        """Rank an item by slot match, known category/colour and occasion fit."""
        points = 0
        if _norm(item.get("type")) == reference_slot:
            points += 4
        category = _norm(item.get("category"))
        if category and category != "unknown":
            points += 2
        base_color = extract_base_color(item.get("color") or "")
        if base_color and base_color != "unknown":
            points += 2
        item_style = _norm(item.get("style") or item.get("occasion") or "")
        bonus = style_bonus.get(occasion_bucket)
        if bonus is not None and any(token in item_style for token in bonus[1]):
            points += bonus[0]
        return points

    ranked_reference_items = sorted(dict_items, key=reference_score, reverse=True)
    reference_item_ids = [
        str(item.get("id") or "")
        for item in ranked_reference_items
        if str(item.get("id") or "").strip()
    ][:4]

    summary_fields = ("id", "type", "category", "color", "style", "occasion")
    return {
        "requested_target_category": requested_target_category,
        "resolved_target_category": resolved_target,
        "occasion_bucket": occasion_bucket,
        "gender_preference": _normalize_scraper_gender(gender),
        "allowed_categories": allowed,
        "color_shortlist": color_shortlist,
        "color_resonance_scores": color_resonance_scores[:8],
        "style_direction": style_hint,
        "reference_slot": reference_slot,
        "reference_item_ids": reference_item_ids,
        "reference_items": [
            {field: item.get(field) for field in summary_fields}
            for item in ranked_reference_items[:6]
        ],
        "slot_dominant_categories": {
            "topwear": _top_terms(slot_categories["topwear"], limit=4),
            "bottomwear": _top_terms(slot_categories["bottomwear"], limit=4),
        },
        "slot_dominant_colors": {
            "topwear": _top_terms(slot_colors["topwear"], limit=4),
            "bottomwear": _top_terms(slot_colors["bottomwear"], limit=4),
        },
    }
def _normalize_planned_category(raw_value: Any, allowed: list[str]) -> str:
    """Map a planner-supplied category onto one of the allowed categories.

    Matching is tried from most to least specific:
      1. exact match with an allowed category;
      2. the longest allowed category contained in the value (so "t-shirts"
         resolves to "t-shirt" rather than "shirt");
      3. the first allowed category that contains the value (e.g. "pant").
    When nothing matches, the first allowed category is returned, or "shirt"
    if `allowed` is empty.

    The previous first-substring-wins loop returned "shirt" for "t-shirt"
    whenever "shirt" was listed earlier in `allowed`.
    """
    normalized = _norm(raw_value)
    if normalized:
        for category in allowed:
            if normalized == category:
                return category

        contained = [category for category in allowed if category and category in normalized]
        if contained:
            # max() keeps the earliest entry among equally long candidates,
            # so policy order still breaks ties.
            return max(contained, key=len)

        for category in allowed:
            if normalized in category:
                return category
    return allowed[0] if allowed else "shirt"
def _extract_explicit_category_from_prompt(user_prompt: str, allowed: list[str]) -> str | None:
    """Return the allowed category the user explicitly asked for, if any.

    Direct mentions of an allowed category take priority; among several direct
    mentions the one listed first in `allowed` wins. Longer category names are
    matched and masked first, so "t-shirt" in the prompt is not mistaken for a
    mention of "shirt". If there is no direct mention, common synonyms
    ("blazer", "tee", ...) are mapped onto allowed categories; synonyms must
    appear as whole words (optionally plural), so "tee" is not found inside
    "steel". Returns None when the prompt names no allowed category.
    """
    normalized_prompt = _norm(user_prompt)
    if not normalized_prompt:
        return None

    # Map common synonyms to policy categories.
    synonym_map: dict[str, str] = {
        "blazer": "jacket",
        "sport coat": "jacket",
        "suit jacket": "jacket",
        "tee": "t-shirt",
        "tshirt": "t-shirt",
        "trouser": "trousers",
        "pant": "pants",
    }

    # Normalized name -> original allowed entry (first occurrence wins).
    by_name: dict[str, str] = {}
    for category in allowed:
        name = str(category).strip().lower()
        if name:
            by_name.setdefault(name, category)

    # Direct allowed-category mention takes highest priority. Longest names
    # are consumed first so a short name only counts where it stands alone.
    remaining = normalized_prompt
    mentioned: set[str] = set()
    for name in sorted(by_name, key=len, reverse=True):
        if name in remaining:
            mentioned.add(name)
            remaining = remaining.replace(name, " ")
    for name, category in by_name.items():
        if name in mentioned:
            return category

    # Then check synonym map against allowed categories.
    for phrase, mapped_category in synonym_map.items():
        if mapped_category not in by_name:
            continue
        pattern = rf"(?<![A-Za-z0-9]){re.escape(phrase)}(?:s|es)?(?![A-Za-z0-9])"
        if re.search(pattern, normalized_prompt):
            return by_name[mapped_category]
    return None
def _normalize_planned_color(raw_value: Any, color_shortlist: list[str]) -> str:
    """Map a planner-supplied colour onto the shortlist, or return "".

    The value is reduced with `extract_base_color`. An exact shortlist member
    is returned as is; otherwise the first shortlist colour that contains, or
    is contained in, the base colour is used. An empty string means the colour
    could not be tied to the shortlist.
    """
    base = extract_base_color(raw_value or "")
    if not base:
        return ""
    if base in color_shortlist:
        return base
    return next(
        (candidate for candidate in color_shortlist if candidate in base or base in candidate),
        "",
    )
| def _resolve_color_fallback(color_shortlist: list[str], color_resonance_scores: list[dict[str, Any]]) -> str: | |
| for entry in color_resonance_scores: | |
| color = str(entry.get("color") or "").strip() | |
| if color: | |
| return color | |
| if color_shortlist: | |
| return color_shortlist[0] | |
| return "black" | |
| def _normalize_reference_ids( | |
| raw_ids: Any, | |
| valid_ids: list[str], | |
| fallback_ids: list[str], | |
| limit: int = 4, | |
| ) -> list[str]: | |
| valid_set = {value for value in valid_ids if value} | |
| normalized: list[str] = [] | |
| if isinstance(raw_ids, list): | |
| for value in raw_ids: | |
| item_id = str(value or "").strip() | |
| if not item_id or item_id in normalized or item_id not in valid_set: | |
| continue | |
| normalized.append(item_id) | |
| if len(normalized) >= limit: | |
| return normalized | |
| for item_id in fallback_ids: | |
| if item_id and item_id not in normalized: | |
| normalized.append(item_id) | |
| if len(normalized) >= limit: | |
| break | |
| return normalized | |
| def _build_planned_query( | |
| gender: str | None, | |
| color: str, | |
| category: str, | |
| occasion: str, | |
| style_direction: str, | |
| ) -> str: | |
| parts = [ | |
| str(gender or "").strip(), | |
| color, | |
| style_direction, | |
| category, | |
| f"for {occasion.strip()}" if occasion.strip() else "", | |
| ] | |
| return " ".join(part for part in parts if part).strip() | |
def _fallback_scraper_plan(
    planning_context: dict[str, Any],
    occasion: str,
    gender: str,
    reason: str,
) -> dict[str, Any]:
    """Build a deterministic query plan from the planning context alone.

    The first allowed category (default "shirt") and the first shortlisted
    colour (default "black") are combined with the context's style direction
    into a search query. The returned plan is tagged with source "fallback"
    and carries the caller-supplied `reason`.
    """

    def _non_blank(key: str) -> list[str]:
        return [str(value) for value in planning_context.get(key, []) if str(value).strip()]

    allowed = _non_blank("allowed_categories")
    color_shortlist = _non_blank("color_shortlist")
    style_direction = str(planning_context.get("style_direction") or "occasion-aligned")
    resolved_target = str(planning_context.get("resolved_target_category") or "topwear")
    plan_gender = _normalize_scraper_gender(gender)

    category = allowed[0] if allowed else "shirt"
    color = color_shortlist[0] if color_shortlist else "black"

    plan: dict[str, Any] = {
        "target_category": resolved_target,
        "color": color,
        "category": category,
        "gender": plan_gender,
        "style_direction": style_direction,
        "reference_item_ids": planning_context.get("reference_item_ids", []),
    }
    plan["query"] = _build_planned_query(plan_gender, color, category, occasion, style_direction)
    plan["reason"] = reason
    plan["source"] = "fallback"
    return plan
def _collect_relevant_scraped_products(
    search_urls: list[str],
    *,
    store_key: str,
    query: str,
    target_slot: str,
    planned_category: str,
    planned_color: str,
    occasion_bucket: str,
    scrape_limit: int,
) -> tuple[list[dict[str, Any]], list[dict[str, Any]], list[str], list[dict[str, Any]]]:
    """Scrape the search URLs and split results into relevant and rejected products.

    URLs are visited in order until `scrape_limit` relevant products have been
    collected. Products are de-duplicated by their "item_link". A
    `requests.RequestException` on one URL is recorded and the next URL is
    tried; other exceptions propagate.

    Returns ``(relevant, rejected, errors, steps)`` where `steps` holds one
    "scrape" diagnostic entry per visited URL.
    """
    relevant: list[dict[str, Any]] = []
    rejected: list[dict[str, Any]] = []
    errors: list[str] = []
    steps: list[dict[str, Any]] = []
    seen_links: set[str] = set()
    url_count = len(search_urls)

    for index, search_url in enumerate(search_urls):
        if len(relevant) >= scrape_limit:
            break
        try:
            extracted = _extract_store_product_summaries(search_url=search_url, store=store_key)
        except requests.RequestException as exc:
            error_message = f"url[{index + 1}] scrape failed: {exc}"
            errors.append(error_message)
            steps.append(
                {
                    "step": "scrape",
                    "query": query,
                    "url_count": url_count,
                    "new_products": 0,
                    "total_products": len(relevant),
                    "errors": [error_message],
                }
            )
            continue

        new_products = 0
        for product in extracted:
            if not isinstance(product, dict):
                continue
            item_link = str(product.get("item_link") or "").strip()
            if not item_link or item_link in seen_links:
                continue
            seen_links.add(item_link)
            if _is_relevant_scraped_product(
                product=product,
                target_slot=target_slot,
                planned_category=planned_category,
                planned_color=planned_color,
                occasion_bucket=occasion_bucket,
            ):
                relevant.append(product)
                new_products += 1
            else:
                rejected.append(product)
            if len(relevant) >= scrape_limit:
                break
        steps.append(
            {
                "step": "scrape",
                "query": query,
                "url_count": url_count,
                "new_products": new_products,
                "total_products": len(relevant),
            }
        )
    return relevant, rejected, errors, steps


def _generate_scraper_plan_with_nemotron(
    occasion: str,
    gender: str,
    preferences: str,
    user_prompt: str,
    target_category: str,
    filters: dict[str, Any],
    max_products: int | None,
    store: str,
    strict_nemotron: bool = False,
) -> dict[str, Any]:
    """Plan a wardrobe-aware shopping query, scrape the store and return the results.

    Steps:
      1. Snapshot the wardrobe and derive the planning context.
      2. Ask the planner model for a plan. On any failure the deterministic
         fallback plan is used, unless `strict_nemotron` is set, in which case
         `NvidiaPayloadError` is raised.
      3. Validate the plan's target, colour, category, gender and reference
         ids against the planning context. A category explicitly named in
         `user_prompt` overrides the planned one.
      4. Build search URLs, scrape them and keep only relevant products (up to
         `max_products`, default 12). Rejected products are used only when no
         colour constraint could be derived.
      5. Enrich the products with wardrobe matches and assemble the payload.

    The payload is handed to `_save_scraper_json_payload` and
    `_store_scraper_runtime_result`; the latter's return value is returned.
    NOTE(review): presumably these persist the payload and add a runtime id;
    confirm against their definitions.

    Raises:
        NvidiaPayloadError: when `strict_nemotron` is True and planning fails.
    """
    wardrobe_snapshot = _wardrobe_metadata_snapshot()
    requested_target = _normalize_target_category(target_category)
    safe_filters = filters if isinstance(filters, dict) else {}
    planning_context = _build_scraper_planning_context(
        wardrobe_snapshot=wardrobe_snapshot,
        requested_target_category=requested_target,
        occasion=occasion,
        gender=gender,
        filters=safe_filters,
    )
    prompt = _build_scraper_plan_prompt(
        occasion=occasion,
        gender=gender,
        preferences=preferences,
        user_prompt=user_prompt,
        target_category=requested_target,
        filters=safe_filters,
        wardrobe_snapshot=wardrobe_snapshot,
        planning_context=planning_context,
        max_products=max_products,
        store=store,
    )

    # --- 1. Obtain a plan (model first, deterministic fallback second). -----
    plan_source = "nemotron"
    plan_error: str | None = None
    try:
        model_text = run_scraper_planner_text_inference(prompt, max_tokens=SCRAPER_PLANNER_MAX_TOKENS)
        parsed = _recover_scraper_plan_from_text(
            model_text=model_text,
            planning_context=planning_context,
            occasion=occasion,
            gender=gender,
        )
        if not isinstance(parsed, dict) or not parsed:
            raise NvidiaPayloadError("Nemotron scraper planner returned empty or invalid JSON payload.")
    except Exception as exc:
        # Best-effort boundary: any planner failure degrades to the fallback
        # plan unless the caller demanded the live model.
        if strict_nemotron:
            raise NvidiaPayloadError(f"Nemotron planner unavailable: {exc}") from exc
        plan_source = "fallback"
        plan_error = str(exc)
        parsed = _fallback_scraper_plan(
            planning_context=planning_context,
            occasion=occasion,
            gender=gender,
            reason=(
                "Live Nemotron query planning was unavailable, so a deterministic fallback planner was used."
            ),
        )

    # --- 2. Validate the plan against the planning context. -----------------
    resolved_target = _normalize_target_category(
        parsed.get("target_category") or planning_context.get("resolved_target_category") or requested_target
    )
    if resolved_target == "both":
        resolved_target = str(planning_context.get("resolved_target_category") or "topwear")

    occasion_bucket = str(planning_context.get("occasion_bucket") or _occasion_bucket(occasion))
    allowed = _allowed_categories(
        target_category=resolved_target,
        occasion_bucket=occasion_bucket,
    )
    color_shortlist = [str(value) for value in planning_context.get("color_shortlist", []) if str(value).strip()]
    color_resonance_scores = [
        entry
        for entry in (planning_context.get("color_resonance_scores") or [])
        if isinstance(entry, dict)
    ]

    # _normalize_planned_color returns either a shortlist colour or "", so a
    # single fallback covers every "not usable" case.
    color = _normalize_planned_color(parsed.get("color"), color_shortlist)
    if not color:
        color = _resolve_color_fallback(color_shortlist, color_resonance_scores)

    explicit_category = _extract_explicit_category_from_prompt(user_prompt, allowed)
    category = explicit_category or _normalize_planned_category(parsed.get("category"), allowed)

    requested_gender = _normalize_scraper_gender(gender)
    inferred_or_planned_gender = _normalize_scraper_gender(
        str(parsed.get("gender") or planning_context.get("gender_preference") or gender or "")
    )
    plan_gender = requested_gender or inferred_or_planned_gender
    style_direction = str(parsed.get("style_direction") or planning_context.get("style_direction") or "occasion-aligned")

    valid_reference_ids = [
        str(item.get("id") or "")
        for item in (wardrobe_snapshot.get("items") or [])
        if isinstance(item, dict)
    ]
    reference_item_ids = _normalize_reference_ids(
        raw_ids=parsed.get("reference_item_ids"),
        valid_ids=valid_reference_ids,
        fallback_ids=[str(value) for value in planning_context.get("reference_item_ids", [])],
        limit=4,
    )

    # The planner's own query is kept only when neither the user's explicit
    # category nor an explicit gender has to be enforced.
    query = str(parsed.get("query") or "").strip()
    if not query or explicit_category is not None or requested_gender is not None:
        query = _build_planned_query(plan_gender, color, category, occasion, style_direction)

    # Report the counts of the colour that was actually selected; using the
    # top-ranked entry here misreported them whenever another shortlist
    # colour was chosen.
    selected_resonance = next(
        (
            entry
            for entry in color_resonance_scores
            if str(entry.get("color") or "").strip() == color
        ),
        None,
    )
    if selected_resonance is not None:
        default_grounding = (
            f"Selected {color} from DB metadata resonance: "
            f"reference_count={int(selected_resonance.get('reference_count') or 0)}, "
            f"global_count={int(selected_resonance.get('global_count') or 0)}."
        )
    else:
        default_grounding = "Selected color using wardrobe metadata shortlist and reference-slot compatibility."
    wardrobe_grounding = str(parsed.get("wardrobe_grounding") or default_grounding)
    reason = str(parsed.get("reason") or "Nemotron generated a wardrobe-aware shopping query.")

    # --- 3. Build search URLs. ----------------------------------------------
    recommendation = ScraperRecommendation(
        color=color,
        category=category,
        gender=plan_gender,
    )
    store_key = _normalize_store_name(store or SCRAPER_DEFAULT_STORE or "nike")
    search_urls = _build_store_search_urls_from_query(
        query,
        store=store_key,
        gender=plan_gender,
        wardrobe_items=list(wardrobe_snapshot.get("items") or []),
        requested_category=requested_target,
    )
    if not search_urls:
        search_urls = _build_store_search_urls_from_recommendation(
            recommendation,
            store=store_key,
            occasion=occasion,
        )

    intermediate_steps: list[dict[str, Any]] = [
        {
            "step": "plan",
            "store": store_key,
            "query": query,
            "target_category": resolved_target,
            "color": color,
            "category": category,
        }
    ]
    generated_urls = list(dict.fromkeys(search_urls))  # de-duplicate, keep order
    scrape_limit = max_products if isinstance(max_products, int) and max_products > 0 else 12
    intermediate_steps.append(
        {
            "step": "url_generation",
            "query": query,
            "url_count": len(generated_urls),
            "total_urls": len(generated_urls),
        }
    )

    # --- 4. Scrape and filter. ----------------------------------------------
    scraped_products: list[dict[str, Any]] = []
    fallback_products: list[dict[str, Any]] = []
    scrape_errors: list[str] = []
    if not generated_urls:
        intermediate_steps.append(
            {
                "step": "diagnostic",
                "message": "Planner succeeded but no search URLs were generated.",
                "attempted_url_count": 0,
            }
        )
    else:
        scraped_products, fallback_products, scrape_errors, scrape_steps = _collect_relevant_scraped_products(
            generated_urls,
            store_key=store_key,
            query=query,
            target_slot=resolved_target,
            planned_category=category,
            planned_color=color,
            occasion_bucket=occasion_bucket,
            scrape_limit=scrape_limit,
        )
        intermediate_steps.extend(scrape_steps)

    if not scraped_products and fallback_products:
        if not _color_keywords_for_relevance(color):
            # No colour constraint could be derived, so unfiltered results
            # are better than none.
            scraped_products = fallback_products[:scrape_limit]
            intermediate_steps.append(
                {
                    "step": "scrape_fallback",
                    "query": query,
                    "new_products": len(scraped_products),
                    "total_products": len(scraped_products),
                    "message": "Used non-filtered scrape fallback because strict relevance filtering returned no products.",
                }
            )
        else:
            intermediate_steps.append(
                {
                    "step": "scrape_filter",
                    "query": query,
                    "rejected_products": len(fallback_products),
                    "total_products": 0,
                    "message": "Rejected scraped products because none matched the planned color and category.",
                }
            )

    # --- 5. Enrich and assemble the response. -------------------------------
    query_plan_payload = {
        "color": color,
        "category": category,
        "gender": plan_gender,
        "query": query,
        "final_query": query,
        "reason": reason,
        "wardrobe_grounding": wardrobe_grounding,
        "source": plan_source,
        "target_category": resolved_target,
        "style_direction": style_direction,
        "occasion_bucket": planning_context.get("occasion_bucket"),
        "reference_item_ids": reference_item_ids,
        "color_resonance_scores": color_resonance_scores[:4],
    }
    enriched_products = _enrich_scraper_products_with_matches(
        products=scraped_products,
        query_plan=query_plan_payload,
        wardrobe_snapshot=wardrobe_snapshot,
        target_category=resolved_target,
        occasion=occasion,
    )
    product_urls = [
        str(product.get("item_link") or "").strip()
        for product in enriched_products
        if str(product.get("item_link") or "").strip()
    ]
    response_payload: dict[str, Any] = {
        "occasion": occasion,
        "gender": gender,
        "preferences": preferences,
        "wardrobe_snapshot": wardrobe_snapshot,
        "query_plan": query_plan_payload,
        "store": store_key,
        "search_urls": generated_urls,
        "product_urls": product_urls,
        "products": enriched_products,
        "count": len(enriched_products),
        "intermediate_steps": intermediate_steps,
        "final_query": query,
        "plan_source": plan_source,
        "plan_error": plan_error,
        "scrape_error": "; ".join(scrape_errors) if scrape_errors else None,
        "target_category": resolved_target,
    }
    response_payload["saved_json_path"] = _save_scraper_json_payload("product_urls", response_payload)
    return _store_scraper_runtime_result(response_payload)
def _build_shopping_suggestions_from_scraper(
    occasion: str,
    target_category: str,
    gender_preference: str,
    filters: dict[str, Any],
    max_results: int,
    store: str,
) -> dict[str, Any]:
    """Run the scraper planner and reshape its products into shopping suggestions.

    The user's preferred colours, patterns, styles, fabrics, fits, seasons and
    include-keywords from `filters` are joined into one comma-separated string
    that serves as both the preferences and the user prompt for the planner.
    A filter whose value is missing or None is treated as empty.

    Each suggestion's "target_category" is the slot the plan actually resolved
    to (taken from the runtime payload). One plan targets exactly one slot, so
    labelling products alternately topwear/bottomwear for a "both" request did
    not reflect what was scraped. If the resolved slot is unavailable, the
    earlier labelling is used.

    Returns a dict with the request echo, the "suggestions" list, an "error"
    message when no suggestion was produced, and the planner's query plan,
    saved JSON path and runtime id.
    """
    preference_keys = (
        "preferred_colors",
        "preferred_patterns",
        "preferred_styles",
        "preferred_fabrics",
        "preferred_fits",
        "preferred_seasons",
        "include_keywords",
    )
    preferences = ", ".join(
        str(value)
        for key in preference_keys
        for value in (filters.get(key) or [])
        if value
    )

    runtime_payload = _generate_scraper_plan_with_nemotron(
        occasion=occasion,
        gender=gender_preference,
        preferences=preferences,
        user_prompt=preferences,
        target_category=target_category,
        filters=filters,
        max_products=max_results,
        store=store,
    )
    products = list(runtime_payload.get("products") or [])
    query_plan = dict(runtime_payload.get("query_plan") or {})

    resolved_slot = str(
        runtime_payload.get("target_category") or query_plan.get("target_category") or ""
    ).strip().lower()
    if resolved_slot not in {"topwear", "bottomwear"}:
        resolved_slot = ""

    store_label = str(runtime_payload.get("store") or store or "nike").title()
    default_reason = str(query_plan.get("reason") or "Nemotron generated a wardrobe-aware shopping query.")

    suggestion_items: list[dict[str, Any]] = []
    for index, product in enumerate(products[:max_results]):
        if not isinstance(product, dict):
            continue
        item_link = str(product.get("item_link") or "").strip()
        if not item_link:
            continue

        if resolved_slot:
            item_slot = resolved_slot
        elif target_category != "both":
            item_slot = target_category
        else:
            item_slot = "topwear" if index % 2 == 0 else "bottomwear"

        suggestion_items.append(
            {
                "target_category": item_slot,
                "title": str(product.get("name") or query_plan.get("query") or "Suggested Product"),
                "url": item_link,
                "image_url": str(product.get("image_url") or ""),
                "store": store_label,
                # Rank-based display score: 95 for the first result, four
                # points less per position, never below 65.
                "match_score": max(65, 95 - index * 4),
                "reason": str(product.get("reason") or default_reason),
                "product_category": str(query_plan.get("category") or "shopping"),
                "color": str(query_plan.get("color") or "black"),
                "pattern": "solid",
                "search_query": str(query_plan.get("query") or occasion),
                "scrape_status": "live",
                "scrape_error": None,
                "product_gender": str(query_plan.get("gender") or gender_preference or "unknown") or "unknown",
                "matched_with_slot": str(
                    product.get("matched_with_slot")
                    or ("bottomwear" if item_slot == "topwear" else "topwear")
                ),
                "matched_garments": product.get("matched_garments") or [],
            }
        )

    return {
        "occasion": occasion,
        "target_category": target_category,
        "gender_preference": gender_preference,
        "search_filters": filters,
        "suggestions": suggestion_items,
        "error": None if suggestion_items else "No live scraper results were returned.",
        "saved_json_path": runtime_payload.get("saved_json_path", ""),
        "runtime_id": runtime_payload.get("runtime_id", ""),
        "query_plan": query_plan,
    }
# FastAPI application object for the wardrobe service.
app = FastAPI(title="Wardrobe Classifier API", version="2.0.0")
# NOTE(review): wildcard origins together with allow_credentials=True is very
# permissive; confirm this is intended for the deployed environment.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# In-memory registry of rendered outfit-grid sessions, keyed by session id.
_OUTFIT_GRID_SESSIONS: dict[str, dict[str, Any]] = {}


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Initialise the database and purge expired cache rows at startup.

    The function is assigned directly to ``app.router.lifespan_context``,
    which is entered with ``async with``.  It therefore has to be a real
    async context manager; a bare async generator function does not
    implement that protocol, hence the ``@asynccontextmanager`` decorator.
    """
    init_db()
    cache_purge_expired()
    yield


app.router.lifespan_context = lifespan
| # Prompt text asking the model to describe the dominant garment in an image as a single JSON object with the eight keys listed at the end of the prompt (type, category, color, pattern, fabric, fit, occasion, season). | |
| CLASSIFICATION_PROMPT = """You are a fashion expert analyzing a garment image for a wardrobe assistant app. | |
| Carefully examine the primary clothing item in the image and return only a valid JSON object. | |
| Rules: | |
| - Focus only on the dominant foreground garment. Ignore the person's face/body, background, hangers, mannequins, and room objects. | |
| - If multiple garments are visible, classify the single most prominent garment only. | |
| - Be specific with colors (for example, "Navy Blue" instead of "Blue", "Olive Green" instead of "Green"). | |
| - If any attribute is not clearly visible, use "Unknown". | |
| - For "color", include all visible colors in one string (for example, "White with Black stripes"). | |
| - Do not include any explanation, markdown, or text outside the JSON object. | |
| Return this exact JSON structure: | |
| { | |
| "type": "e.g. T-Shirt / Jeans / Dress / Jacket / Hoodie / Shorts / Saree / Kurta", | |
| "category": "Topwear / Bottomwear / Footwear / Outerwear / Ethnic / Accessories", | |
| "color": "exact specific color name not basic colours", | |
| "pattern": "Solid / Striped / Checkered / Floral / Printed / Graphic / Embroidered / Tie-Dye", | |
| "fabric": "Cotton / Denim / Wool / Polyester / Silk / Linen / Leather / Unknown", | |
| "fit": "Slim / Regular / Oversized / Fitted / Relaxed / Unknown", | |
| "occasion": "Casual / Formal / Sports / Party / Work / Ethnic", | |
| "season": "Summer / Winter / Monsoon / All-Season" | |
| }""" | |
# --- NVIDIA endpoint settings; every value can be overridden through the
# --- environment variable of the same name.
NVIDIA_INVOKE_URL = os.environ.get(
    "NVIDIA_INVOKE_URL", "https://integrate.api.nvidia.com/v1/chat/completions"
)
NVIDIA_MODEL_ID = os.environ.get("NVIDIA_MODEL_ID", "qwen/qwen3.5-122b-a10b")

# Token budgets: requests start at the caller's max_tokens and may be doubled
# up to NVIDIA_REASONING_MAX_TOKENS by the inference helpers.
NVIDIA_MAX_TOKENS = int(os.environ.get("NVIDIA_MAX_TOKENS", "16384"))
NVIDIA_REASONING_MAX_TOKENS = int(os.environ.get("NVIDIA_REASONING_MAX_TOKENS", "16384"))

# Sampling parameters sent with every request.
NVIDIA_TEMPERATURE = float(os.environ.get("NVIDIA_TEMPERATURE", "0.60"))
NVIDIA_TOP_P = float(os.environ.get("NVIDIA_TOP_P", "0.95"))

# Transport behaviour: request timeout, retry count and linear backoff step.
NVIDIA_TIMEOUT_SECONDS = int(os.environ.get("NVIDIA_TIMEOUT_SECONDS", "180"))
NVIDIA_MAX_RETRIES = int(os.environ.get("NVIDIA_MAX_RETRIES", "3"))
NVIDIA_RETRY_BACKOFF_SECONDS = float(os.environ.get("NVIDIA_RETRY_BACKOFF_SECONDS", "0.8"))

# Only the exact (case-insensitive) value "true" switches thinking mode on.
NVIDIA_ENABLE_THINKING = (
    os.environ.get("NVIDIA_ENABLE_THINKING", "false").strip().lower() == "true"
)

# Images are downscaled to fit this bounding box before upload (<= 0 disables).
NVIDIA_IMAGE_MAX_DIM = int(os.environ.get("NVIDIA_IMAGE_MAX_DIM", "1400"))

# Comma-separated list of fallback model ids; blank entries are discarded.
NVIDIA_FALLBACK_MODEL_IDS = [
    candidate
    for candidate in map(
        str.strip,
        os.environ.get(
            "NVIDIA_FALLBACK_MODEL_IDS",
            "nvidia/nemotron-3-nano-omni-30b-a3b-reasoning",
        ).split(","),
    )
    if candidate
]

# Error detail raised when no API key is configured.
NVIDIA_API_KEY_MISSING_DETAIL = "NVIDIA_API_KEY is not configured on this Space."
class NvidiaGatewayError(RuntimeError):
    """Failure while calling the NVIDIA API, tagged with an HTTP status code.

    Attributes:
        status_code: Status code stored on the instance (defaults to 502).
    """

    def __init__(self, message: str, status_code: int = 502) -> None:
        RuntimeError.__init__(self, message)
        self.status_code = status_code
class NvidiaPayloadError(RuntimeError):
    """Raised when an NVIDIA response has an unexpected shape or carries no content."""
class NvidiaTokenLimitError(NvidiaPayloadError):
    """Raised when a response hit ``max_tokens`` before final content was produced."""
| def _nvidia_api_key() -> str: | |
| return os.getenv("NVIDIA_API_KEY", "").strip() | |
def _candidate_model_ids(primary_model_id: str) -> list[str]:
    """Return the primary model id followed by the configured fallbacks.

    Each id is whitespace-trimmed; blank ids and repeats are dropped while the
    first-seen order is preserved.
    """
    trimmed_ids = (
        candidate.strip()
        for candidate in (primary_model_id, *NVIDIA_FALLBACK_MODEL_IDS)
    )
    # dict.fromkeys de-duplicates while keeping insertion order.
    return list(dict.fromkeys(name for name in trimmed_ids if name))
def _is_degraded_function_error(exc: Exception) -> bool:
    """Tell whether ``exc`` is a gateway error reporting a DEGRADED function.

    Only ``NvidiaGatewayError`` instances whose message contains the marker
    text qualify; every other exception yields ``False``.
    """
    marker = "DEGRADED function cannot be invoked"
    return isinstance(exc, NvidiaGatewayError) and marker in str(exc)
# --- Outfit grid rendering geometry (pixels), overridable via environment.
OUTFIT_GRID_CELL_SIZE = int(os.environ.get("OUTFIT_GRID_CELL_SIZE", "224"))
OUTFIT_GRID_LABEL_HEIGHT = int(os.environ.get("OUTFIT_GRID_LABEL_HEIGHT", "28"))
OUTFIT_GRID_PADDING = int(os.environ.get("OUTFIT_GRID_PADDING", "12"))

# Timeout for downloading a single garment image into a grid cell.
OUTFIT_GRID_FETCH_TIMEOUT_SECONDS = int(
    os.environ.get("OUTFIT_GRID_FETCH_TIMEOUT_SECONDS", "12")
)

# Grid sessions older than this are pruned; their PNGs live in the session dir.
OUTFIT_GRID_SESSION_TTL_SECONDS = int(
    os.environ.get("OUTFIT_GRID_SESSION_TTL_SECONDS", "3600")
)
OUTFIT_GRID_SESSION_DIR = Path(
    os.environ.get(
        "OUTFIT_GRID_SESSION_DIR",
        str(Path(tempfile.gettempdir()) / "wardrobe-grid-sessions"),
    )
)

# Item-count limits and scoring thresholds.
OUTFIT_GRID_MAX_TOP_ITEMS = int(os.environ.get("OUTFIT_GRID_MAX_TOP_ITEMS", "4"))
OUTFIT_GRID_MAX_BOTTOM_ITEMS = int(os.environ.get("OUTFIT_GRID_MAX_BOTTOM_ITEMS", "4"))
OUTFIT_ANCHOR_MIN_SCORE = int(os.environ.get("OUTFIT_ANCHOR_MIN_SCORE", "45"))

# Only the exact (case-insensitive) value "true" enables text preselection.
OUTFIT_TEXT_PRESELECT_ENABLED = (
    os.environ.get("OUTFIT_TEXT_PRESELECT_ENABLED", "false").strip().lower() == "true"
)

# Token budgets for the text selector and the AI grid scorer.
OUTFIT_TEXT_SELECTOR_MAX_TOKENS = int(
    os.environ.get("OUTFIT_TEXT_SELECTOR_MAX_TOKENS", "400")
)
OUTFIT_AI_MAX_TOKENS = int(os.environ.get("OUTFIT_AI_MAX_TOKENS", "4096"))

# Identifiers used to label which selector/scorer produced a result.
OUTFIT_TEXT_SELECTOR_NAME = "nemotron-text-preselect-v1"
OUTFIT_AI_SCORER_NAME = "ai-grid-v1"
OUTFIT_FALLBACK_SCORER_NAME = "fallback-current-v1"
| # Prompt template for scoring outfit combinations from the composite wardrobe grid image. | |
| # Single-brace placeholders ({occasion}, {metadata_json}, {top_k}, ...) are filled with str.format; doubled braces are literal JSON braces. | |
| OUTFIT_GRID_SCORING_PROMPT_TEMPLATE = """You are an expert multimodal outfit matching engine. | |
| Task: | |
| Evaluate every valid outfit combination shown in the attached wardrobe grid image and rank the best outfits for the given context. | |
| Grid semantics: | |
| - Row 1 contains topwear only. | |
| - Row 2 contains bottomwear only. | |
| - Row 3 contains optional "Others" items (footwear, accessories, outerwear, or uncategorized garments). If Row 3 is absent, ignore this slot. | |
| - Each cell is labeled with a coordinate like 1:1, 1:2, 2:1, 2:2, 3:1. | |
| - A valid outfit is exactly one Row 1 item plus one Row 2 item, and optionally one Row 3 item. | |
| User context: | |
| - Occasion: {occasion} | |
| - Region: {region} | |
| - Weather JSON: {weather_json} | |
| - User profile JSON: {user_profile_json} | |
| - Anchor mode: {anchor_mode} | |
| - Locked top index: {locked_top_index} | |
| - Locked bottom index: {locked_bottom_index} | |
| - Locked other index: {locked_other_index} | |
| - Anchor item JSON: {anchor_item_json} | |
| Wardrobe metadata map: | |
| {metadata_json} | |
| Scoring rubric: | |
| - occasion relevance | |
| - color harmony | |
| - pattern compatibility | |
| - fit alignment | |
| - style coherence | |
| - seasonal/contextual appropriateness | |
| Instructions: | |
| 1. Use both the composite image and the metadata map together. | |
| 2. Treat the locked item as the fixed styling anchor whenever Anchor mode is not "none". | |
| 3. Evaluate all {combination_count} possible valid combinations exactly once before ranking. | |
| 4. If a locked top, locked bottom, or locked other index is provided, only consider combinations containing that index. | |
| 5. Assign each retained combination a final score from 0 to 100 and this score breakdown: | |
| occasion, color, pattern, fit, style, season | |
| 6. Return only valid JSON. No markdown and no prose outside JSON. | |
| 7. Return at most the top {top_k} outfits in descending score order. | |
| 8. When a Row 3 item is part of the outfit, include its cell in "other_index". If no Row 3 item is used, set "other_index" to null. | |
| Return this exact JSON shape: | |
| {{ | |
| "recommendations": [ | |
| {{ | |
| "top_index": "1:1", | |
| "bottom_index": "2:1", | |
| "other_index": "3:1", | |
| "score": 92, | |
| "breakdown": {{ | |
| "occasion": 94, | |
| "color": 91, | |
| "pattern": 89, | |
| "fit": 90, | |
| "style": 93, | |
| "season": 88 | |
| }}, | |
| "reason": "Max 15 words. Short user-facing explanation grounded in visual + metadata evidence.", | |
| "tip": "Max 10 words. One concise styling tip." | |
| }} | |
| ] | |
| }}""" | |
| # --------------------------------------------------------------------------- | |
| # Model helpers | |
| # --------------------------------------------------------------------------- | |
def _image_to_data_url(image: Image.Image) -> str:
    """Encode ``image`` as a base64 PNG ``data:`` URL.

    When ``NVIDIA_IMAGE_MAX_DIM`` is positive, a copy of the image is shrunk to
    fit that bounding box first, so the caller's image is never modified.
    """
    to_encode = image
    if NVIDIA_IMAGE_MAX_DIM > 0:
        to_encode = image.copy()
        bounding_box = (NVIDIA_IMAGE_MAX_DIM, NVIDIA_IMAGE_MAX_DIM)
        to_encode.thumbnail(bounding_box, Image.Resampling.LANCZOS)

    png_stream = io.BytesIO()
    to_encode.save(png_stream, format="PNG")
    encoded_png = base64.b64encode(png_stream.getvalue()).decode("utf-8")
    return "data:image/png;base64," + encoded_png
| def _extract_text_from_nvidia_content(content: Any) -> str: | |
| if isinstance(content, str): | |
| return content | |
| if isinstance(content, list): | |
| parts: list[str] = [] | |
| for chunk in content: | |
| if isinstance(chunk, str): | |
| parts.append(chunk) | |
| continue | |
| if not isinstance(chunk, dict): | |
| continue | |
| for key in ("text", "content", "value"): | |
| value = chunk.get(key) | |
| if isinstance(value, str) and value: | |
| parts.append(value) | |
| break | |
| return "".join(parts).strip() | |
| if isinstance(content, dict): | |
| for key in ("text", "content", "value"): | |
| value = content.get(key) | |
| if isinstance(value, str) and value: | |
| return value | |
| return "" | |
def _extract_nvidia_text(payload: dict[str, Any]) -> str:
    """Return the text of the first choice in a non-streamed completion payload.

    Sources are tried in order: ``content``, ``reasoning_content``, then a
    plain-string ``reasoning`` field.

    Raises:
        NvidiaTokenLimitError: no text was found and ``finish_reason`` is
            ``"length"``.
        NvidiaPayloadError: the payload has no ``choices[0].message`` or the
            message carries no usable text.
    """
    try:
        choice = payload["choices"][0]
        message = choice["message"]
    except (KeyError, IndexError, TypeError) as exc:
        raise NvidiaPayloadError(f"Unexpected NVIDIA API response shape: {payload}") from exc

    content = message.get("content")
    reasoning_content = message.get("reasoning_content")
    for candidate in (content, reasoning_content):
        extracted = _extract_text_from_nvidia_content(candidate)
        if extracted:
            return extracted

    plain_reasoning = message.get("reasoning")
    if isinstance(plain_reasoning, str):
        trimmed_reasoning = plain_reasoning.strip()
        if trimmed_reasoning:
            return trimmed_reasoning

    finish_reason = choice.get("finish_reason")
    if finish_reason == "length":
        raise NvidiaTokenLimitError(
            "NVIDIA response hit max_tokens before final content was produced."
        )
    raise NvidiaPayloadError(
        "Unexpected NVIDIA message payload: "
        f"finish_reason={finish_reason}, "
        f"message_keys={list(message.keys())}, "
        f"content={content!r}, "
        f"reasoning_content={reasoning_content!r}"
    )
def _extract_streamed_nvidia_text(response: requests.Response) -> str:
    """Assemble the model's text from a server-sent-events completion stream.

    Each ``data:`` line is parsed as JSON and the first choice's ``delta`` and
    ``message`` fields are read.  Answer text (``content``) and reasoning text
    (``reasoning_content`` / ``reasoning``) are collected separately so that
    reasoning tokens never get mixed into the final answer; reasoning is only
    returned when no answer text arrived at all, matching the non-streamed
    extraction.

    Lines are decoded explicitly as UTF-8 rather than with the response's
    guessed encoding, which would otherwise garble non-ASCII output or yield
    bytes.

    Args:
        response: The HTTP response whose body is an event stream.

    Returns:
        The stripped answer text, or the stripped reasoning text as fallback.

    Raises:
        NvidiaTokenLimitError: nothing was produced and the stream reported
            ``finish_reason == "length"``.
        NvidiaPayloadError: the stream ended without any usable text.
    """
    answer_parts: list[str] = []
    reasoning_parts: list[str] = []
    finish_reason: Any = None

    def collect(source: dict[str, Any], key: str, sink: list[str]) -> None:
        text = _extract_text_from_nvidia_content(source.get(key))
        if text:
            sink.append(text)

    for raw_line in response.iter_lines():
        if not raw_line:
            continue
        if isinstance(raw_line, bytes):
            raw_line = raw_line.decode("utf-8", errors="replace")
        line = raw_line.strip()
        if not line.startswith("data:"):
            continue
        data = line[5:].strip()
        if not data or data == "[DONE]":
            continue
        try:
            choice = json.loads(data)["choices"][0]
        except (json.JSONDecodeError, KeyError, IndexError, TypeError):
            # Keep-alive or malformed events are skipped, not fatal.
            continue
        if not isinstance(choice, dict):
            continue

        # Remember the last reported finish reason so truncation can be detected.
        finish_reason = choice.get("finish_reason") or finish_reason

        delta = choice.get("delta")
        if isinstance(delta, dict):
            collect(delta, "content", answer_parts)
            collect(delta, "reasoning_content", reasoning_parts)
            collect(delta, "reasoning", reasoning_parts)

        message = choice.get("message")
        if isinstance(message, dict):
            collect(message, "content", answer_parts)
            collect(message, "reasoning_content", reasoning_parts)

    answer = "".join(answer_parts).strip()
    if answer:
        return answer
    reasoning = "".join(reasoning_parts).strip()
    if reasoning:
        return reasoning
    if finish_reason == "length":
        raise NvidiaTokenLimitError(
            "NVIDIA response hit max_tokens before final content was produced."
        )
    raise NvidiaPayloadError("NVIDIA stream ended without returning any content.")
def _run_nvidia_chat_completion(message_content: Any, max_tokens: int) -> str:
    """Send one user message to the NVIDIA endpoint and return the model text.

    Shared implementation behind ``run_nvidia_inference`` and
    ``run_nvidia_text_inference``.  For each candidate model it:

    * retries gateway/transport failures up to ``NVIDIA_MAX_RETRIES`` times
      with a linearly growing sleep,
    * doubles the token budget (capped at ``NVIDIA_REASONING_MAX_TOKENS``)
      after a token-limit error,
    * moves on to the next model when the current one reports a DEGRADED
      function.

    Args:
        message_content: Value for the user message's ``content`` field
            (a plain string or a list of content parts).
        max_tokens: Initial token budget.  Values above
            ``NVIDIA_REASONING_MAX_TOKENS`` are clamped to that ceiling so
            the request is still attempted.

    Raises:
        RuntimeError: no API key is configured.
        NvidiaGatewayError: the request failed on the current model or on
            every configured model.
        NvidiaPayloadError: the response carried no usable content.
    """
    api_key = _nvidia_api_key()
    if not api_key:
        raise RuntimeError(NVIDIA_API_KEY_MISSING_DETAIL)

    headers = {
        "Authorization": f"Bearer {api_key}",
        "Accept": "text/event-stream",
        "Content-Type": "application/json",
    }
    model_ids = _candidate_model_ids(NVIDIA_MODEL_ID)
    last_error: Exception | None = None

    for model_id in model_ids:
        current_max_tokens = min(max_tokens, NVIDIA_REASONING_MAX_TOKENS)
        while True:
            payload: dict[str, Any] = {
                "model": model_id,
                "messages": [{"role": "user", "content": message_content}],
                "max_tokens": current_max_tokens,
                "temperature": NVIDIA_TEMPERATURE,
                "top_p": NVIDIA_TOP_P,
                "stream": True,
            }
            if NVIDIA_ENABLE_THINKING:
                payload["chat_template_kwargs"] = {"enable_thinking": True}

            last_error = None
            for attempt in range(NVIDIA_MAX_RETRIES + 1):
                try:
                    response = requests.post(
                        NVIDIA_INVOKE_URL,
                        headers=headers,
                        json=payload,
                        timeout=NVIDIA_TIMEOUT_SECONDS,
                    )
                    if response.status_code in {429, 500, 502, 503, 504}:
                        raise NvidiaGatewayError(
                            f"NVIDIA API transient failure {response.status_code}: {response.text[:500]}",
                            status_code=503 if response.status_code == 429 else 502,
                        )
                    if response.status_code >= 400:
                        raise NvidiaGatewayError(
                            f"NVIDIA API request failed with {response.status_code}: {response.text[:500]}",
                            status_code=502,
                        )
                    return _extract_streamed_nvidia_text(response)
                except NvidiaTokenLimitError as exc:
                    # Retrying with the same budget cannot help; grow it below.
                    last_error = exc
                    break
                except (requests.RequestException, NvidiaGatewayError) as exc:
                    last_error = exc
                    if _is_degraded_function_error(exc) or attempt >= NVIDIA_MAX_RETRIES:
                        break
                    time.sleep(NVIDIA_RETRY_BACKOFF_SECONDS * (attempt + 1))
                # Any other NvidiaPayloadError propagates to the caller unchanged.

            if (
                isinstance(last_error, NvidiaTokenLimitError)
                and current_max_tokens < NVIDIA_REASONING_MAX_TOKENS
            ):
                current_max_tokens = min(current_max_tokens * 2, NVIDIA_REASONING_MAX_TOKENS)
                continue
            if last_error is not None and _is_degraded_function_error(last_error):
                print(f"[nvidia] model degraded, trying fallback model: {model_id}")
                break
            if isinstance(last_error, NvidiaGatewayError):
                raise last_error
            if isinstance(last_error, requests.RequestException):
                raise NvidiaGatewayError(
                    f"NVIDIA API request failed: {last_error}", status_code=502
                ) from last_error
            if last_error is not None:
                raise last_error
            break

    if last_error is not None:
        raise NvidiaGatewayError(
            f"NVIDIA API request failed on all configured models ({', '.join(model_ids)}): {last_error}",
            status_code=502,
        ) from last_error
    raise NvidiaGatewayError(
        "NVIDIA API request failed after exhausting reasoning token budget.",
        status_code=502,
    )


def run_nvidia_inference(image: Image.Image, prompt: str, max_tokens: int = NVIDIA_MAX_TOKENS) -> str:
    """Run a multimodal (text + image) completion and return the model text.

    The image is encoded to a data URL once and reused across every retry,
    token-budget increase and fallback model.

    Raises:
        RuntimeError: no API key is configured.
        NvidiaGatewayError / NvidiaPayloadError: see
            ``_run_nvidia_chat_completion``.
    """
    # Fail on a missing key before spending time encoding the image.
    if not _nvidia_api_key():
        raise RuntimeError(NVIDIA_API_KEY_MISSING_DETAIL)
    message_content = [
        {"type": "text", "text": prompt},
        {"type": "image_url", "image_url": {"url": _image_to_data_url(image)}},
    ]
    return _run_nvidia_chat_completion(message_content, max_tokens)


def run_nvidia_text_inference(prompt: str, max_tokens: int = OUTFIT_TEXT_SELECTOR_MAX_TOKENS) -> str:
    """Run a text-only completion and return the model text.

    Raises:
        RuntimeError: no API key is configured.
        NvidiaGatewayError / NvidiaPayloadError: see
            ``_run_nvidia_chat_completion``.
    """
    return _run_nvidia_chat_completion(prompt, max_tokens)
def parse_json_from_text(text: str) -> dict[str, Any]:
    """Parse a JSON object out of model output, tolerating surrounding prose.

    The whole stripped text is tried first; if that does not yield an object,
    the span from the first ``{`` to the last ``}`` is tried.  Only ``dict``
    results are accepted, so a top-level array, number or string never leaks
    out to callers that expect a mapping.

    Returns:
        The parsed object, or ``{}`` when no JSON object can be recovered.
    """
    if not text:
        return {}
    stripped = text.strip()

    candidates = [stripped]
    start, end = stripped.find("{"), stripped.rfind("}")
    if start != -1 and end > start:
        candidates.append(stripped[start:end + 1])

    for candidate in candidates:
        try:
            parsed = json.loads(candidate)
        except json.JSONDecodeError:
            continue
        if isinstance(parsed, dict):
            return parsed
    return {}
def normalize_specs(specs: dict[str, Any]) -> dict[str, str]:
    """Project classifier output onto the eight garment attributes as strings.

    Missing keys and explicit ``None`` values (JSON ``null``) both become
    ``"Unknown"``; every other value is converted with ``str``.  A non-dict
    argument is treated as an empty mapping.

    Returns:
        A dict with exactly the keys ``type``, ``category``, ``color``,
        ``pattern``, ``fabric``, ``fit``, ``occasion`` and ``season``.
    """
    if not isinstance(specs, dict):
        specs = {}
    field_names = (
        "type",
        "category",
        "color",
        "pattern",
        "fabric",
        "fit",
        "occasion",
        "season",
    )
    normalized: dict[str, str] = {}
    for field_name in field_names:
        raw_value = specs.get(field_name)
        # str(None) would surface as the literal text "None" in the wardrobe.
        normalized[field_name] = "Unknown" if raw_value is None else str(raw_value)
    return normalized
| def _clamp_score(value: Any, fallback: int = 0) -> int: | |
| if isinstance(value, str): | |
| match = re.search(r"-?\d+(?:\.\d+)?", value) | |
| value = match.group(0) if match else value | |
| try: | |
| score = int(round(float(value))) | |
| except (TypeError, ValueError): | |
| score = fallback | |
| return max(0, min(100, score)) | |
| def _safe_metadata_item(item: dict[str, Any]) -> dict[str, Any]: | |
| return { | |
| "id": item.get("id"), | |
| "slot": item.get("type"), | |
| "type": item.get("type"), | |
| "category": item.get("category"), | |
| "color": item.get("color"), | |
| "pattern": item.get("pattern"), | |
| "fabric": item.get("fabric"), | |
| "fit": item.get("fit"), | |
| "occasion": item.get("style"), | |
| "season": item.get("season"), | |
| } | |
def _placeholder_grid_tile(index_label: str, item: dict[str, Any], tile_size: int) -> Image.Image:
    """Draw a light-grey stand-in tile for a garment without a usable image.

    The tile shows an outlined frame, the cell label, and the item's category
    and color (each truncated to 24 characters, ``"Unknown"`` when missing).
    """
    placeholder = Image.new("RGB", (tile_size, tile_size), (245, 245, 245))
    pen = ImageDraw.Draw(placeholder)
    default_font = ImageFont.load_default()

    far_edge = tile_size - 8
    pen.rectangle((8, 8, far_edge, far_edge), outline=(180, 180, 180), width=2)
    pen.text((14, 14), index_label, fill=(40, 40, 40), font=default_font)

    # Category then color, stacked below the cell label.
    for y_position, field_name in ((36, "category"), (54, "color")):
        caption = str(item.get(field_name) or "Unknown")[:24]
        pen.text((14, y_position), caption, fill=(80, 80, 80), font=default_font)
    return placeholder
def _load_grid_tile(image_url: str, index_label: str, item: dict[str, Any]) -> Image.Image:
    """Fetch a garment image and fit it to one square grid cell.

    A placeholder tile is returned instead when the URL is empty, uses the
    ``memory://`` prefix, is not http(s), or when downloading/decoding fails
    for any reason (best-effort: a broken image must not break the grid).
    """

    def placeholder() -> Image.Image:
        return _placeholder_grid_tile(index_label, item, OUTFIT_GRID_CELL_SIZE)

    if not image_url or image_url.startswith("memory://"):
        return placeholder()
    if urlparse(image_url).scheme not in {"http", "https"}:
        return placeholder()

    cell_dimensions = (OUTFIT_GRID_CELL_SIZE, OUTFIT_GRID_CELL_SIZE)
    try:
        request_headers = {
            "User-Agent": "Mozilla/5.0",
            "Accept": "image/*,*/*;q=0.8",
            "Referer": image_url,
        }
        image_request = Request(image_url, headers=request_headers)
        with urlopen(image_request, timeout=OUTFIT_GRID_FETCH_TIMEOUT_SECONDS) as resp:
            downloaded = Image.open(io.BytesIO(resp.read())).convert("RGB")
            return ImageOps.fit(
                downloaded,
                cell_dimensions,
                method=Image.Resampling.LANCZOS,
                centering=(0.5, 0.5),
            )
    except Exception:
        return placeholder()
def _prune_outfit_grid_sessions() -> None:
    """Drop grid sessions older than the TTL and delete their image files.

    A session whose ``created_at`` is missing counts as created at time 0.
    File deletion is best-effort: failures are ignored and the registry entry
    is removed regardless.
    """
    reference_time = time.time()
    # Iterate over a snapshot because entries are removed during the loop.
    for session_id, record in tuple(_OUTFIT_GRID_SESSIONS.items()):
        age_seconds = reference_time - float(record.get("created_at") or 0)
        if age_seconds > OUTFIT_GRID_SESSION_TTL_SECONDS:
            stored_path = record.get("image_path")
            if isinstance(stored_path, str):
                try:
                    Path(stored_path).unlink(missing_ok=True)
                except Exception:
                    pass
            _OUTFIT_GRID_SESSIONS.pop(session_id, None)
def _build_outfit_grid_session(
    tops: list[dict[str, Any]],
    bottoms: list[dict[str, Any]],
    others: list[dict[str, Any]],
    occasion: str,
    user_profile: dict[str, Any] | None,
    weather: dict[str, Any] | None,
    region: str,
) -> dict[str, Any]:
    """Render the wardrobe grid image and register it as a new session.

    Row 1 holds tops, row 2 bottoms and, only when ``others`` is non-empty,
    row 3 the other items.  Every cell is labelled ``"<row>:<column>"``
    (1-based); the same label keys both ``metadata_map`` and ``item_lookup``.
    Expired sessions are pruned first.  The PNG is written to
    ``OUTFIT_GRID_SESSION_DIR`` and the session is stored in
    ``_OUTFIT_GRID_SESSIONS``.

    Returns:
        A dict with ``session_id``, the in-memory ``image``, ``metadata_map``,
        ``item_lookup`` and ``image_path``.
    """
    _prune_outfit_grid_sessions()
    session_id = str(uuid.uuid4())

    grid_rows = [(1, "Topwear", tops), (2, "Bottomwear", bottoms)]
    if others:
        grid_rows.append((3, "Others", others))

    column_count = max(len(tops), len(bottoms), len(others), 1)
    cell_width = OUTFIT_GRID_CELL_SIZE + 2 * OUTFIT_GRID_PADDING
    row_height = OUTFIT_GRID_CELL_SIZE + OUTFIT_GRID_LABEL_HEIGHT + 2 * OUTFIT_GRID_PADDING
    canvas = Image.new(
        "RGB",
        (column_count * cell_width, len(grid_rows) * row_height),
        (255, 255, 255),
    )
    painter = ImageDraw.Draw(canvas)
    label_font = ImageFont.load_default()
    metadata_map: dict[str, dict[str, Any]] = {}
    item_lookup: dict[str, dict[str, Any]] = {}

    for row_number, row_title, row_items in grid_rows:
        row_origin_y = (row_number - 1) * row_height
        if not row_items:
            # An empty row only gets a grey note instead of tiles.
            painter.text(
                (12, row_origin_y + 10),
                f"Row {row_number}: {row_title} (no items)",
                fill=(120, 120, 120),
                font=label_font,
            )
            continue

        for column_number, item in enumerate(row_items, start=1):
            cell_key = f"{row_number}:{column_number}"
            left = (column_number - 1) * cell_width + OUTFIT_GRID_PADDING
            upper = row_origin_y + OUTFIT_GRID_PADDING

            tile = _load_grid_tile(str(item.get("image_url") or ""), cell_key, item)
            canvas.paste(tile, (left, upper))

            # Dark caption strip directly underneath the tile.
            strip_top = upper + OUTFIT_GRID_CELL_SIZE
            painter.rectangle(
                (
                    left,
                    strip_top,
                    left + OUTFIT_GRID_CELL_SIZE,
                    strip_top + OUTFIT_GRID_LABEL_HEIGHT,
                ),
                fill=(20, 20, 20),
            )
            color_text = str(item.get("color") or "Unknown")[:18]
            category_text = str(item.get("category") or "Unknown")[:18]
            caption = f"{cell_key} | {color_text} {category_text}"
            painter.text(
                (left + 6, strip_top + 8),
                caption[:36],
                fill=(255, 255, 255),
                font=label_font,
            )

            metadata_map[cell_key] = _safe_metadata_item(item)
            item_lookup[cell_key] = item

    OUTFIT_GRID_SESSION_DIR.mkdir(parents=True, exist_ok=True)
    image_path = OUTFIT_GRID_SESSION_DIR / f"{session_id}.png"
    canvas.save(image_path, format="PNG")
    image_path_text = str(image_path)

    _OUTFIT_GRID_SESSIONS[session_id] = {
        "image_path": image_path_text,
        "metadata_map": metadata_map,
        "created_at": time.time(),
        "occasion": occasion,
        "user_profile": user_profile or {},
        "weather": weather or {},
        "region": region,
    }
    return {
        "session_id": session_id,
        "image": canvas,
        "metadata_map": metadata_map,
        "item_lookup": item_lookup,
        "image_path": image_path_text,
    }
def _grid_scoring_prompt(
    metadata_map: dict[str, dict[str, Any]],
    occasion: str,
    weather: dict[str, Any] | None,
    user_profile: dict[str, Any] | None,
    region: str,
    anchor_mode: str,
    anchor_item: dict[str, Any] | None,
    locked_top_index: str | None,
    locked_bottom_index: str | None,
    locked_other_index: str | None,
    combination_count: int,
    top_k: int,
) -> str:
    """Fill ``OUTFIT_GRID_SCORING_PROMPT_TEMPLATE`` for one scoring request.

    Structured values are embedded as compact ASCII-only JSON.  Defaults are
    substituted for empty inputs: occasion ``"casual"``, region ``"global"``,
    empty objects for weather/profile/anchor item, and the text ``"None"``
    for any locked index that is not set.
    """

    def compact(value: Any) -> str:
        return json.dumps(value, ensure_ascii=True, separators=(",", ":"))

    template_values = {
        "occasion": occasion or "casual",
        "region": region or "global",
        "weather_json": compact(weather or {}),
        "user_profile_json": compact(user_profile or {}),
        "anchor_mode": anchor_mode,
        "locked_top_index": locked_top_index or "None",
        "locked_bottom_index": locked_bottom_index or "None",
        "locked_other_index": locked_other_index or "None",
        "anchor_item_json": compact(_safe_metadata_item(anchor_item or {})),
        "metadata_json": compact(metadata_map),
        "combination_count": combination_count,
        "top_k": top_k,
    }
    return OUTFIT_GRID_SCORING_PROMPT_TEMPLATE.format(**template_values)
def _fallback_rule_recommendations(
    occasion: str,
    case_name: str,
    tops: list[dict[str, Any]],
    bottoms: list[dict[str, Any]],
    others: list[dict[str, Any]],
    top_k: int,
    include_pair_outfits: bool = True,
    include_other_outfits: bool = True,
) -> dict[str, Any]:
    """Score outfits with ``score_pair_full`` and return a ranked response.

    Every top/bottom pair is scored when ``include_pair_outfits`` is set, and
    every "other" item is scored as a standalone outfit when
    ``include_other_outfits`` is set.  Outfits are sorted by descending score
    and the first ``top_k`` receive ranks starting at 1.

    For case ``"D"`` with at least one outfit, the best outfit is returned as
    ``selected_outfit_score`` and the following ones as
    ``improved_recommendations``; otherwise the top ``top_k`` are returned as
    ``recommendations``.
    """
    ranked: list[dict[str, Any]] = []

    if include_pair_outfits:
        ranked.extend(
            _build_outfit_payload(
                score_pair_full(top, bottom, occasion, other=None),
                top,
                bottom,
                rank=0,
                other=None,
            )
            for top in tops
            for bottom in bottoms
        )

    if include_other_outfits:
        for other in others:
            # Score standalone Others as complete outfits rather than as add-ons.
            scored = score_pair_full(other, other, occasion, other=None)
            base_reason = str(scored.get("reason") or "")
            headline = f"{other.get('color', 'This')} {other.get('category', 'item')} works as a complete standalone look."
            scored["reason"] = f"{headline} {base_reason}" if base_reason else headline
            scored["tip"] = "Use footwear and accessories only to complement this single-piece outfit."
            ranked.append(_build_outfit_payload(scored, None, None, rank=0, other=other))

    ranked = sorted(ranked, key=lambda outfit: int(outfit.get("score") or 0), reverse=True)
    for position, outfit in enumerate(ranked[:top_k], start=1):
        outfit["rank"] = position

    pair_total = len(tops) * len(bottoms) if include_pair_outfits else 0
    other_total = len(others) if include_other_outfits else 0

    selected_mode = case_name == "D" and bool(ranked)
    return {
        "occasion": occasion,
        "case": case_name,
        "selected_outfit_score": ranked[0] if selected_mode else None,
        "recommendations": [] if selected_mode else ranked[:top_k],
        "improved_recommendations": ranked[1:top_k] if selected_mode else [],
        "total_combinations_checked": pair_total + other_total,
        "notice": None,
        "engine_version": "scoring-v2",
    }
def _occasion_prefers_standalone_others(occasion: str) -> bool:
    """Tell whether the normalised occasion mentions a ceremonial/ethnic keyword.

    Matching is by substring against the output of ``_norm``; an empty
    normalised occasion never matches.
    """
    normalized_occasion = _norm(occasion)
    if not normalized_occasion:
        return False
    priority_keywords = (
        "wedding",
        "festive",
        "ethnic",
        "ceremony",
        "engagement",
        "reception",
        "sangeet",
        "haldi",
        "mehndi",
    )
    for keyword in priority_keywords:
        if keyword in normalized_occasion:
            return True
    return False
def _merge_standalone_others_for_priority_occasions(
    result: dict[str, Any],
    occasion: str,
    others: list[dict[str, Any]],
    top_k: int,
) -> dict[str, Any]:
    """Blend boosted standalone "other" outfits into ``result`` for priority occasions.

    Nothing changes when there are no ``others``, the occasion is not a
    priority one, the result's case is ``"D"``, or no standalone outfit could
    be produced.  Otherwise each standalone outfit gets its score raised to at
    least 78 and then increased by 12 (capped at 100), its ``occasion``
    breakdown raised to at least 92, and the merged list is re-sorted, cut to
    ``top_k`` and re-ranked.  ``result`` is updated in place and returned.
    """
    if not others or not _occasion_prefers_standalone_others(occasion):
        return result

    case_label = str(result.get("case") or "")
    if case_label.upper() == "D":
        return result

    standalone_payload = _fallback_rule_recommendations(
        occasion=occasion,
        case_name=case_label or "A",
        tops=[],
        bottoms=[],
        others=others,
        top_k=top_k,
        include_pair_outfits=False,
        include_other_outfits=True,
    )
    standalone_outfits = [
        dict(entry)
        for entry in (standalone_payload.get("recommendations") or [])
        if isinstance(entry, dict)
    ]
    if not standalone_outfits:
        return result

    for entry in standalone_outfits:
        floor_score = max(int(entry.get("score") or 0), 78)
        entry["score"] = min(100, floor_score + 12)
        breakdown = entry.get("breakdown")
        if isinstance(breakdown, dict):
            occasion_score = max(int(breakdown.get("occasion") or 0), 92)
            breakdown["occasion"] = min(100, occasion_score)

    existing_outfits = [
        dict(entry)
        for entry in (result.get("recommendations") or [])
        if isinstance(entry, dict)
    ]
    combined = sorted(
        existing_outfits + standalone_outfits,
        key=lambda entry: int(entry.get("score") or 0),
        reverse=True,
    )[:top_k]
    for position, entry in enumerate(combined, start=1):
        entry["rank"] = position

    result["recommendations"] = combined
    result["total_combinations_checked"] = int(result.get("total_combinations_checked") or 0) + len(others)
    return result
def _current_fallback_recommendations(
    wardrobe_items: list[dict[str, Any]],
    occasion: str,
    top_selected: dict[str, Any] | None,
    bottom_selected: dict[str, Any] | None,
    other_selected: dict[str, Any] | None,
    weather: dict[str, Any] | None,
    user_profile: dict[str, Any] | None,
    region: str,
    top_k: int,
    candidate_pool: int,
    diversity_lambda: float,
    case_name: str,
    tops: list[dict[str, Any]],
    bottoms: list[dict[str, Any]],
    others: list[dict[str, Any]],
) -> dict[str, Any]:
    """Produce recommendations without the AI grid scorer.

    The recommendation service is tried first.  Its outfits have the
    ``shoes``/``accessory`` slots removed (and ``other`` too when both a top
    and a bottom are present), lists are cut to ``top_k``, and standalone
    "other" outfits are merged in for priority occasions.  If anything in
    that path raises, the rule-based ``_fallback_rule_recommendations`` is
    used instead.
    """

    def without_optional_slots(outfit: dict[str, Any]) -> dict[str, Any]:
        trimmed = {
            key: value
            for key, value in outfit.items()
            if key not in ("shoes", "accessory")
        }
        if trimmed.get("top") and trimmed.get("bottom"):
            trimmed.pop("other", None)
        return trimmed

    def cleaned_entries(entries: Any) -> list[dict[str, Any]]:
        return [
            without_optional_slots(entry)
            for entry in (entries or [])
            if isinstance(entry, dict)
        ]

    try:
        result = get_recommendation_service().recommend(
            wardrobe_items=wardrobe_items,
            occasion=occasion,
            top_selected=top_selected,
            bottom_selected=bottom_selected,
            other_selected=other_selected,
            weather=weather,
            user_profile=user_profile,
            region=region,
            top_k=top_k,
            candidate_pool=candidate_pool,
            diversity_lambda=diversity_lambda,
        )
        if isinstance(result.get("selected_outfit_score"), dict):
            result["selected_outfit_score"] = without_optional_slots(result["selected_outfit_score"])

        primary = cleaned_entries(result.get("recommendations"))
        improved = cleaned_entries(result.get("improved_recommendations"))
        result["recommendations"] = primary[:top_k]
        result["improved_recommendations"] = improved[:top_k]

        result = _merge_standalone_others_for_priority_occasions(
            result=result,
            occasion=occasion,
            others=others,
            top_k=top_k,
        )
        print(f"[outfit-scoring] algo={OUTFIT_FALLBACK_SCORER_NAME} source=fashion_ai")
        return result
    except Exception as exc:
        print(f"[outfit-scoring] algo={OUTFIT_FALLBACK_SCORER_NAME} source=scoring-v2 reason={exc!r}")
        # Pairs are only scored when no "other" item was explicitly selected.
        return _fallback_rule_recommendations(
            occasion,
            case_name,
            tops,
            bottoms,
            others,
            top_k,
            include_pair_outfits=other_selected is None,
            include_other_outfits=bool(others),
        )
| def _extract_grid_indices(outfit_raw: dict[str, Any]) -> tuple[str, str, str]: | |
| top_index = str( | |
| outfit_raw.get("top_index") | |
| or outfit_raw.get("top") | |
| or outfit_raw.get("top_cell") | |
| or "" | |
| ).strip() | |
| bottom_index = str( | |
| outfit_raw.get("bottom_index") | |
| or outfit_raw.get("bottom") | |
| or outfit_raw.get("bottom_cell") | |
| or "" | |
| ).strip() | |
| other_index = str( | |
| outfit_raw.get("other_index") | |
| or outfit_raw.get("other") | |
| or outfit_raw.get("other_cell") | |
| or "" | |
| ).strip() | |
| if top_index and bottom_index and other_index: | |
| return top_index, bottom_index, other_index | |
| combo_text = str( | |
| outfit_raw.get("combination") | |
| or outfit_raw.get("combo") | |
| or outfit_raw.get("pair") | |
| or "" | |
| ) | |
| matches = re.findall(r"[123]:\d+", combo_text) | |
| top_match = next((value for value in matches if value.startswith("1:")), "") | |
| bottom_match = next((value for value in matches if value.startswith("2:")), "") | |
| other_match = next((value for value in matches if value.startswith("3:")), "") | |
| return top_index or top_match, bottom_index or bottom_match, other_index or other_match | |
| def _rank_anchor_candidates( | |
| anchor_item: dict[str, Any], | |
| candidates: list[dict[str, Any]], | |
| occasion: str, | |
| anchor_is_top: bool, | |
| ) -> list[dict[str, Any]]: | |
| ranked: list[tuple[int, dict[str, Any]]] = [] | |
| for candidate in candidates: | |
| scored = ( | |
| score_pair_full(anchor_item, candidate, occasion) | |
| if anchor_is_top | |
| else score_pair_full(candidate, anchor_item, occasion) | |
| ) | |
| ranked.append((int(scored.get("score") or 0), candidate)) | |
| ranked.sort(key=lambda pair: pair[0], reverse=True) | |
| compatible = [item for score, item in ranked if score >= OUTFIT_ANCHOR_MIN_SCORE] | |
| if compatible: | |
| return compatible | |
| return [item for _, item in ranked] | |
| def _text_selector_item_payload(item: dict[str, Any], index: int) -> dict[str, Any]: | |
| return { | |
| "index": index, | |
| "id": str(item.get("id") or ""), | |
| "type": str(item.get("type") or ""), | |
| "category": str(item.get("category") or "Unknown"), | |
| "color": str(item.get("color") or "Unknown"), | |
| "pattern": str(item.get("pattern") or "Unknown"), | |
| "fabric": str(item.get("fabric") or "Unknown"), | |
| "fit": str(item.get("fit") or "Unknown"), | |
| "season": str(item.get("season") or "Unknown"), | |
| "style": str(item.get("style") or "Unknown"), | |
| "occasion": str(item.get("occasion") or "Unknown"), | |
| } | |
| def _select_grid_candidates_with_text_ai( | |
| candidates: list[dict[str, Any]], | |
| slot_name: str, | |
| occasion: str, | |
| limit: int, | |
| anchor_mode: str, | |
| anchor_item: dict[str, Any] | None, | |
| ) -> list[dict[str, Any]]: | |
| if len(candidates) <= limit: | |
| return candidates | |
| if not OUTFIT_TEXT_PRESELECT_ENABLED: | |
| return candidates[:limit] | |
| candidate_payload = [ | |
| _text_selector_item_payload(item, idx + 1) | |
| for idx, item in enumerate(candidates) | |
| ] | |
| anchor_payload = _text_selector_item_payload(anchor_item, 0) if anchor_item else None | |
| prompt = ( | |
| "You are a fashion ranking assistant for candidate preselection.\n" | |
| f"Occasion: {occasion}\n" | |
| f"Slot to rank: {slot_name}\n" | |
| f"Anchor mode: {anchor_mode}\n" | |
| f"Keep exactly {limit} items if possible (or fewer when candidates are fewer).\n\n" | |
| "Goal:\n" | |
| "Select the strongest candidates for downstream outfit matching using only textual metadata.\n" | |
| "Prioritize occasion relevance, compatibility with anchor context, and diversity in color/pattern/style.\n\n" | |
| "Rules:\n" | |
| "1. Choose only IDs from the provided candidates.\n" | |
| "2. No duplicate IDs.\n" | |
| "3. Return strictly valid JSON and nothing else.\n" | |
| "4. Prefer candidates that maximize useful pairing variety, not near-duplicates.\n\n" | |
| "Return EXACT shape:\n" | |
| "{\"selected_ids\":[\"id1\",\"id2\"]}\n\n" | |
| f"Anchor item JSON:\n{json.dumps(anchor_payload, ensure_ascii=True)}\n\n" | |
| f"Candidate items JSON:\n{json.dumps(candidate_payload, ensure_ascii=True)}" | |
| ) | |
| try: | |
| response_text = run_nvidia_text_inference(prompt, max_tokens=OUTFIT_TEXT_SELECTOR_MAX_TOKENS) | |
| parsed_payload = parse_json_from_text(response_text) | |
| selected_ids_raw = parsed_payload.get("selected_ids") if isinstance(parsed_payload, dict) else None | |
| selected_indices_raw = parsed_payload.get("selected_indices") if isinstance(parsed_payload, dict) else None | |
| selected_ids: list[str] = [] | |
| if isinstance(selected_ids_raw, list): | |
| selected_ids = [ | |
| str(value).strip() | |
| for value in selected_ids_raw | |
| if str(value).strip() | |
| ] | |
| if not selected_ids and isinstance(selected_indices_raw, list): | |
| for raw_index in selected_indices_raw: | |
| try: | |
| index = int(raw_index) | |
| except (TypeError, ValueError): | |
| continue | |
| if 1 <= index <= len(candidates): | |
| selected_ids.append(str(candidates[index - 1].get("id") or "").strip()) | |
| elif 0 <= index < len(candidates): | |
| selected_ids.append(str(candidates[index].get("id") or "").strip()) | |
| id_to_item = { | |
| str(item.get("id") or "").strip(): item | |
| for item in candidates | |
| } | |
| selected: list[dict[str, Any]] = [] | |
| seen: set[str] = set() | |
| for item_id in selected_ids: | |
| if not item_id or item_id in seen: | |
| continue | |
| item = id_to_item.get(item_id) | |
| if not item: | |
| continue | |
| selected.append(item) | |
| seen.add(item_id) | |
| if len(selected) >= limit: | |
| break | |
| if len(selected) < limit: | |
| for item in candidates: | |
| item_id = str(item.get("id") or "").strip() | |
| if item_id in seen: | |
| continue | |
| selected.append(item) | |
| seen.add(item_id) | |
| if len(selected) >= limit: | |
| break | |
| selected = selected[:limit] | |
| if len(selected) == limit: | |
| print( | |
| f"[outfit-preselect] slot={slot_name} mode={anchor_mode} " | |
| f"in={len(candidates)} kept={len(selected)} strategy={OUTFIT_TEXT_SELECTOR_NAME}" | |
| ) | |
| return selected | |
| raise NvidiaPayloadError( | |
| f"Text preselector returned insufficient candidates: requested={limit} got={len(selected)}" | |
| ) | |
| except Exception as exc: | |
| print( | |
| f"[outfit-preselect] slot={slot_name} mode={anchor_mode} " | |
| f"in={len(candidates)} kept={limit} strategy=head reason={exc!r}" | |
| ) | |
| return candidates[:limit] | |
| _WEDDING_ETHNIC_TOPWEAR_CATEGORIES = { | |
| "kurta", "sherwani", "nehru jacket", "bandhgala", "achkan", | |
| "saree blouse", "lehenga choli", "anarkali", | |
| } | |
| def _filter_garments_for_wedding(items: list[dict[str, Any]]) -> list[dict[str, Any]]: | |
| """Use a single LLM call to select only wedding-appropriate garments | |
| and reassign their slot type for outfit scoring. | |
| Ethnic/formal pieces currently typed as 'others' (sherwanis, kurtas, | |
| blazers, etc.) are promoted to 'topwear' so they enter the scoring grid | |
| as primary tops instead of only appearing in the "lock other" section.""" | |
| if not items: | |
| return items | |
| garment_summaries = [] | |
| for item in items: | |
| garment_summaries.append({ | |
| "id": str(item.get("id") or ""), | |
| "type": str(item.get("type") or ""), | |
| "category": str(item.get("category") or "Unknown"), | |
| "color": str(item.get("color") or "Unknown"), | |
| "pattern": str(item.get("pattern") or "Unknown"), | |
| "fabric": str(item.get("fabric") or "Unknown"), | |
| "fit": str(item.get("fit") or "Unknown"), | |
| "style": str(item.get("style") or "Unknown"), | |
| "occasion": str(item.get("occasion") or "Unknown"), | |
| }) | |
| prompt = ( | |
| "You are a fashion expert selecting garments suitable for a WEDDING occasion.\n\n" | |
| "Wedding-appropriate garments include:\n" | |
| "- Ethnic wear: kurtas, sherwanis, nehru jackets, bandhgalas, achkans, sarees, lehengas, churidars, dhotis, salwar kameez\n" | |
| "- Formal/semi-formal: blazers, suit jackets, dress shirts, formal trousers, dress pants, waistcoats\n" | |
| "- Elegant pieces: silk fabrics, embroidered items, brocade, velvet\n" | |
| "- Accessories that work for weddings: formal shoes, stoles, dupattas\n\n" | |
| "Garments to EXCLUDE:\n" | |
| "- Casual everyday items: plain t-shirts, basic tees, hoodies, sweatshirts, joggers, denim jeans, gym shorts\n" | |
| "- Sportswear, athleisure, distressed or ripped clothing\n" | |
| "- Very casual items like graphic tees, cargo shorts, flip-flops\n" | |
| "- Basic casual shirts UNLESS they are clearly formal/dress shirts\n\n" | |
| "If a garment is borderline (e.g. a smart chino, a dark polo), include it only if it could realistically " | |
| "be part of a wedding guest outfit.\n\n" | |
| "For each selected garment, also decide its ROLE in a wedding outfit:\n" | |
| "- \"topwear\": items worn on the upper body as the primary piece (kurtas, sherwanis, blazers, dress shirts, suit jackets, nehru jackets, waistcoats)\n" | |
| "- \"bottomwear\": items worn on the lower body (formal trousers, churidars, dress pants, lehengas)\n" | |
| "- \"others\": layering pieces, accessories, footwear, dupattas, stoles\n\n" | |
| "Return strictly valid JSON and nothing else.\n\n" | |
| "Return EXACT shape:\n" | |
| "{\"selected\":[{\"id\":\"...\",\"role\":\"topwear|bottomwear|others\"},...]}\n\n" | |
| f"Garments JSON:\n{json.dumps(garment_summaries, ensure_ascii=True)}" | |
| ) | |
| try: | |
| response_text = run_nvidia_text_inference(prompt, max_tokens=OUTFIT_TEXT_SELECTOR_MAX_TOKENS) | |
| parsed = parse_json_from_text(response_text) | |
| selected_raw = parsed.get("selected") if isinstance(parsed, dict) else None | |
| if not isinstance(selected_raw, list): | |
| print("[wedding-filter] LLM returned no selected list, using all items") | |
| return items | |
| role_map: dict[str, str] = {} | |
| for entry in selected_raw: | |
| if not isinstance(entry, dict): | |
| continue | |
| item_id = str(entry.get("id") or "").strip() | |
| role = str(entry.get("role") or "").strip().lower() | |
| if item_id and role in {"topwear", "bottomwear", "others"}: | |
| role_map[item_id] = role | |
| if not role_map: | |
| print("[wedding-filter] LLM returned empty selections, using all items") | |
| return items | |
| filtered: list[dict[str, Any]] = [] | |
| for item in items: | |
| item_id = str(item.get("id") or "").strip() | |
| if item_id not in role_map: | |
| continue | |
| promoted = {**item, "type": role_map[item_id]} | |
| filtered.append(promoted) | |
| print( | |
| f"[wedding-filter] in={len(items)} kept={len(filtered)} " | |
| f"tops={sum(1 for i in filtered if i.get('type') == 'topwear')} " | |
| f"bottoms={sum(1 for i in filtered if i.get('type') == 'bottomwear')} " | |
| f"others={sum(1 for i in filtered if i.get('type') == 'others')}" | |
| ) | |
| return filtered if filtered else items | |
| except Exception as exc: | |
| print(f"[wedding-filter] LLM call failed reason={exc!r}, using all items") | |
| return items | |
| def _resolve_outfit_grid_sources( | |
| wardrobe_items: list[dict[str, Any]], | |
| occasion: str, | |
| top_selected: dict[str, Any] | None, | |
| bottom_selected: dict[str, Any] | None, | |
| other_selected: dict[str, Any] | None, | |
| ) -> tuple[list[dict[str, Any]], list[dict[str, Any]], list[dict[str, Any]], str, dict[str, Any] | None]: | |
| all_tops = [i for i in wardrobe_items if i.get("type") == "topwear"] | |
| all_bottoms = [i for i in wardrobe_items if i.get("type") == "bottomwear"] | |
| all_others = [ | |
| i | |
| for i in wardrobe_items | |
| if i.get("type") == "others" or i.get("type") not in {"topwear", "bottomwear"} | |
| ] | |
| other_pool = [other_selected] if other_selected else [] | |
| if top_selected and bottom_selected: | |
| anchor_mode = "locked-top+locked-bottom+locked-other" if other_selected else "locked-top+locked-bottom" | |
| return [top_selected], [bottom_selected], other_pool, anchor_mode, other_selected or top_selected | |
| if top_selected: | |
| ranked_bottoms = _rank_anchor_candidates( | |
| anchor_item=top_selected, | |
| candidates=all_bottoms, | |
| occasion=occasion, | |
| anchor_is_top=True, | |
| ) | |
| anchor_mode = "locked-top+locked-other" if other_selected else "locked-top" | |
| return [top_selected], ranked_bottoms, other_pool, anchor_mode, other_selected or top_selected | |
| if bottom_selected: | |
| ranked_tops = _rank_anchor_candidates( | |
| anchor_item=bottom_selected, | |
| candidates=all_tops, | |
| occasion=occasion, | |
| anchor_is_top=False, | |
| ) | |
| anchor_mode = "locked-bottom+locked-other" if other_selected else "locked-bottom" | |
| return ranked_tops, [bottom_selected], other_pool, anchor_mode, other_selected or bottom_selected | |
| if other_selected: | |
| return all_tops, all_bottoms, [], "locked-other", other_selected | |
| return all_tops, all_bottoms, [], "none", None | |
def _normalize_ai_outfit_payload(
    parsed_payload: dict[str, Any],
    item_lookup: dict[str, dict[str, Any]],
    occasion: str,
    case_name: str,
    top_k: int,
    total_combinations: int,
    session_id: str,
) -> dict[str, Any]:
    """Turn the AI scorer's parsed JSON into the API's recommendation response.

    The outfit list is read from "recommendations", "outfits" or
    "top_outfits" (first key holding a list). Each entry's grid indices are
    resolved through ``item_lookup``; entries without both a known top and a
    known bottom are skipped. Survivors are sorted by score (descending),
    cut to ``top_k`` and ranked from 1.

    For ``case_name == "D"`` the best outfit is reported as
    "selected_outfit_score" and the ranked list goes under
    "improved_recommendations"; otherwise the list goes under
    "recommendations".

    Raises:
        NvidiaPayloadError: if no outfit list is present, or no entry maps to
            known grid items.
    """
    for list_key in ("recommendations", "outfits", "top_outfits"):
        raw_recommendations = parsed_payload.get(list_key)
        if isinstance(raw_recommendations, list):
            break
    else:
        raise NvidiaPayloadError(f"AI outfit scorer returned no recommendation list: {parsed_payload}")

    def _item_summary(item: dict[str, Any]) -> dict[str, Any]:
        return {
            "id": item.get("id"),
            "category": item.get("category"),
            "color": item.get("color"),
            "image_url": item.get("image_url", ""),
        }

    breakdown_axes = ("color", "style", "occasion", "fit", "pattern", "season")
    collected: list[dict[str, Any]] = []
    for raw_entry in raw_recommendations:
        if not isinstance(raw_entry, dict):
            continue
        top_index, bottom_index, other_index = _extract_grid_indices(raw_entry)
        top_item = item_lookup.get(top_index)
        bottom_item = item_lookup.get(bottom_index)
        other_item = item_lookup.get(other_index) if other_index else None
        if not (top_item and bottom_item):
            continue

        raw_breakdown = raw_entry.get("breakdown")
        if not isinstance(raw_breakdown, dict):
            raw_breakdown = {}

        combination = f"{top_index} + {bottom_index}"
        if other_item:
            combination += f" + {other_index}"

        outfit: dict[str, Any] = {
            "rank": 0,  # assigned after sorting
            "score": _clamp_score(raw_entry.get("score"), 0),
            "breakdown": {axis: _clamp_score(raw_breakdown.get(axis), 70) for axis in breakdown_axes},
            "reason": str(raw_entry.get("reason") or "AI-generated outfit recommendation."),
            "tip": str(raw_entry.get("tip") or "Use matching accessories to complete this look."),
            "combination": combination,
            "grid_session_id": session_id,
            "top": _item_summary(top_item),
            "bottom": _item_summary(bottom_item),
        }
        if other_item:
            outfit["other"] = _item_summary(other_item)
        collected.append(outfit)

    if not collected:
        raise NvidiaPayloadError(f"AI outfit scorer returned no valid index-mapped recommendations: {parsed_payload}")

    ranked = sorted(
        collected,
        key=lambda outfit: int(outfit.get("score") or 0),
        reverse=True,
    )[:top_k]
    for position, outfit in enumerate(ranked, start=1):
        outfit["rank"] = position

    is_case_d = case_name == "D"
    return {
        "occasion": occasion,
        "case": case_name,
        "selected_outfit_score": ranked[0] if is_case_d else None,
        "recommendations": [] if is_case_d else ranked,
        "improved_recommendations": ranked if is_case_d else [],
        "total_combinations_checked": total_combinations,
        "notice": None,
        "grid_session_id": session_id,
        "engine_version": OUTFIT_AI_SCORER_NAME,
    }
| def _recommend_outfits_with_ai_grid( | |
| wardrobe_items: list[dict[str, Any]], | |
| occasion: str, | |
| top_selected: dict[str, Any] | None, | |
| bottom_selected: dict[str, Any] | None, | |
| other_selected: dict[str, Any] | None, | |
| weather: dict[str, Any] | None, | |
| user_profile: dict[str, Any] | None, | |
| region: str, | |
| top_k: int, | |
| case_name: str, | |
| ) -> dict[str, Any]: | |
| top_source, bottom_source, other_source, anchor_mode, anchor_item = _resolve_outfit_grid_sources( | |
| wardrobe_items=wardrobe_items, | |
| occasion=occasion, | |
| top_selected=top_selected, | |
| bottom_selected=bottom_selected, | |
| other_selected=other_selected, | |
| ) | |
| top_pool_count = len(top_source) | |
| bottom_pool_count = len(bottom_source) | |
| if not top_selected and len(top_source) > OUTFIT_GRID_MAX_TOP_ITEMS: | |
| top_source = _select_grid_candidates_with_text_ai( | |
| candidates=top_source, | |
| slot_name="topwear", | |
| occasion=occasion, | |
| limit=OUTFIT_GRID_MAX_TOP_ITEMS, | |
| anchor_mode=anchor_mode, | |
| anchor_item=anchor_item, | |
| ) | |
| if not bottom_selected and len(bottom_source) > OUTFIT_GRID_MAX_BOTTOM_ITEMS: | |
| bottom_source = _select_grid_candidates_with_text_ai( | |
| candidates=bottom_source, | |
| slot_name="bottomwear", | |
| occasion=occasion, | |
| limit=OUTFIT_GRID_MAX_BOTTOM_ITEMS, | |
| anchor_mode=anchor_mode, | |
| anchor_item=anchor_item, | |
| ) | |
| if not top_source or not bottom_source: | |
| raise NvidiaPayloadError("AI outfit scorer requires at least one topwear and one bottomwear item.") | |
| grid_session = _build_outfit_grid_session( | |
| tops=top_source, | |
| bottoms=bottom_source, | |
| others=other_source, | |
| occasion=occasion, | |
| user_profile=user_profile, | |
| weather=weather, | |
| region=region, | |
| ) | |
| combination_count = len(top_source) * len(bottom_source) * ( | |
| len(other_source) if other_selected else (len(other_source) + 1 if other_source else 1) | |
| ) | |
| prompt = _grid_scoring_prompt( | |
| metadata_map=grid_session["metadata_map"], | |
| occasion=occasion, | |
| weather=weather, | |
| user_profile=user_profile, | |
| region=region, | |
| anchor_mode=anchor_mode, | |
| anchor_item=anchor_item, | |
| locked_top_index="1:1" if top_selected else None, | |
| locked_bottom_index="2:1" if bottom_selected else None, | |
| locked_other_index="3:1" if other_selected else None, | |
| combination_count=combination_count, | |
| top_k=top_k, | |
| ) | |
| if not other_source: | |
| prompt = ( | |
| f"{prompt}\n\n" | |
| "Important: This grid contains only Row 1 (Topwear) and Row 2 (Bottomwear). " | |
| "Always set other_index to null." | |
| ) | |
| print( | |
| f"[outfit-grid] mode={anchor_mode} session={grid_session['session_id']} " | |
| f"tops_in={top_pool_count} bottoms_in={bottom_pool_count} " | |
| f"rows=2 tops={len(top_source)} bottoms={len(bottom_source)} others={len(other_source)} " | |
| f"combinations={combination_count}" | |
| ) | |
| model_text = run_nvidia_inference(grid_session["image"], prompt, max_tokens=OUTFIT_AI_MAX_TOKENS) | |
| parsed_payload = parse_json_from_text(model_text) | |
| if not parsed_payload: | |
| raise NvidiaPayloadError(f"AI outfit scorer returned unparsable JSON: {model_text[:500]}") | |
| result = _normalize_ai_outfit_payload( | |
| parsed_payload=parsed_payload, | |
| item_lookup=grid_session["item_lookup"], | |
| occasion=occasion, | |
| case_name=case_name, | |
| top_k=top_k, | |
| total_combinations=combination_count, | |
| session_id=grid_session["session_id"], | |
| ) | |
| print( | |
| f"[outfit-scoring] algo={OUTFIT_AI_SCORER_NAME} " | |
| f"mode={anchor_mode} session={grid_session['session_id']} " | |
| f"tops={len(top_source)} bottoms={len(bottom_source)} others={len(other_source)} " | |
| f"combinations={combination_count}" | |
| ) | |
| return result | |
def _raise_http_error(exc: Exception) -> NoReturn:
    """Log ``exc`` and re-raise it as an ``HTTPException``; never returns.

    Status mapping, checked in this order:
      * ``RuntimeError`` whose message equals ``NVIDIA_API_KEY_MISSING_DETAIL`` -> 503
      * ``NvidiaGatewayError`` -> the error's own ``status_code``
      * ``NvidiaPayloadError`` -> 502
      * ``requests.RequestException`` -> 502
      * anything else -> 500
    """
    print("Classification request failed:", repr(exc))
    traceback.print_exc()

    if isinstance(exc, RuntimeError) and str(exc) == NVIDIA_API_KEY_MISSING_DETAIL:
        status, detail = 503, NVIDIA_API_KEY_MISSING_DETAIL
    elif isinstance(exc, NvidiaGatewayError):
        status, detail = exc.status_code, str(exc)
    elif isinstance(exc, NvidiaPayloadError):
        status, detail = 502, str(exc)
    elif isinstance(exc, requests.RequestException):
        status, detail = 502, f"NVIDIA API request failed: {exc}"
    else:
        status, detail = 500, str(exc)

    raise HTTPException(status_code=status, detail=detail) from exc
| # --------------------------------------------------------------------------- | |
| # Endpoints | |
| # --------------------------------------------------------------------------- | |
def root() -> dict[str, str]:
    """Return the static status payload served at the API root."""
    return dict(status="ok", message="Wardrobe Classifier API v2")
def health() -> dict[str, str]:
    """Report service configuration as a flat string-to-string mapping.

    "nvidia_api_configured" is the string "True" or "False" depending on
    whether ``_nvidia_api_key()`` returns a truthy value; the key itself is
    never included.
    """
    api_key_present = bool(_nvidia_api_key())
    report: dict[str, str] = {
        "status": "ok",
        "classification_provider": "nvidia",
        "model": NVIDIA_MODEL_ID,
        "nvidia_api_configured": str(api_key_present),
        "nvidia_invoke_url": NVIDIA_INVOKE_URL,
        "engine_version": "scoring-v2",
        "outfit_matching_provider": "nemotron",
    }
    return report
def product_urls(payload: dict[str, Any] = Body(default_factory=dict)) -> dict[str, Any]:
    """Search a store for products matching a colour/category and list them.

    Payload keys read: "color" and "category" (both required), "gender",
    "max_products" (default 30) and "store" (defaults to
    ``SCRAPER_DEFAULT_STORE`` or "nike").

    Returns:
        A dict with the normalised store, the search URLs used, the
        de-duplicated products (by ``item_link``) capped at ``max_products``,
        their links, a count, and the path returned by
        ``_save_scraper_json_payload``.

    Raises:
        HTTPException: 400 when "color"/"category" is missing or
            "max_products" is not an integer >= 1; 502 when fetching store
            pages fails with a ``requests.RequestException``.
    """
    color = str(payload.get("color") or "")
    category = str(payload.get("category") or "")
    gender = payload.get("gender")

    # Reject bad limits up front with a 400 instead of letting int() surface
    # as a 500 or a negative limit silently return a single product.
    try:
        max_products = int(payload.get("max_products") or 30)
    except (TypeError, ValueError) as exc:
        raise HTTPException(status_code=400, detail="max_products must be an integer") from exc
    if max_products < 1:
        raise HTTPException(status_code=400, detail="max_products must be at least 1")

    store = _normalize_store_name(str(payload.get("store") or SCRAPER_DEFAULT_STORE or "nike"))
    if not color or not category:
        raise HTTPException(status_code=400, detail="color and category are required")

    recommendation = ScraperRecommendation(
        color=color,
        category=category,
        gender=str(gender) if gender else None,
    )
    try:
        search_urls = _build_store_search_urls_from_recommendation(
            recommendation,
            store=store,
            occasion="",
        )
        products: list[dict[str, str]] = []
        seen_links: set[str] = set()
        for search_url in search_urls:
            if len(products) >= max_products:
                # Limit reached: do not fetch any further search pages.
                break
            for product in _extract_store_product_summaries(search_url, store=store):
                item_link = str(product.get("item_link") or "").strip()
                if not item_link or item_link in seen_links:
                    continue
                seen_links.add(item_link)
                products.append(product)
                if len(products) >= max_products:
                    break

        response_payload: dict[str, Any] = {
            "store": store,
            "search_urls": search_urls,
            "product_urls": [item["item_link"] for item in products],
            "products": products,
            "count": len(products),
        }
        response_payload["saved_json_path"] = _save_scraper_json_payload("product_urls", response_payload)
        return response_payload
    except requests.RequestException as exc:
        raise HTTPException(status_code=502, detail=f"Failed to fetch {store.title()} pages: {exc}") from exc
def suggestions(payload: dict[str, Any] = Body(default_factory=dict)) -> dict[str, Any]:
    """Build shopping suggestions for an occasion via the scraper pipeline.

    Accepts both snake_case and camelCase spellings for "target_category",
    "gender_preference" and "max_results". Defaults: occasion "casual",
    target category "both", gender preference "any", 8 results.

    Raises:
        HTTPException: 400 when ``max_results`` is below 1; 502 when the
            NVIDIA gateway/payload handling or a store request fails.
    """

    def _either(snake_key: str, camel_key: str, default: Any) -> Any:
        return payload.get(snake_key) or payload.get(camel_key) or default

    occasion = str(payload.get("occasion") or "casual")
    target_category = str(_either("target_category", "targetCategory", "both"))
    gender_preference = str(_either("gender_preference", "genderPreference", "any"))

    raw_filters = payload.get("filters")
    filters = raw_filters if isinstance(raw_filters, dict) else {}

    max_results = int(_either("max_results", "maxResults", 8))
    store = _normalize_store_name(str(payload.get("store") or SCRAPER_DEFAULT_STORE or "nike"))
    if max_results < 1:
        raise HTTPException(status_code=400, detail="max_results must be at least 1")

    try:
        return _build_shopping_suggestions_from_scraper(
            occasion=occasion,
            target_category=target_category,
            gender_preference=gender_preference,
            filters=filters,
            max_results=max_results,
            store=store,
        )
    except (NvidiaGatewayError, NvidiaPayloadError) as exc:
        # Both upstream error kinds are reported to the client as 502.
        raise HTTPException(status_code=502, detail=str(exc)) from exc
    except requests.RequestException as exc:
        raise HTTPException(status_code=502, detail=f"Failed to fetch {store.title()} pages: {exc}") from exc
def scraper_recommend(payload: dict[str, Any] = Body(default_factory=dict)) -> dict[str, Any]:
    """Generate a scraper shopping plan from a free-text prompt plus structured hints.

    The prompt ("user_prompt" or "prompt") is run through
    ``_infer_structured_request_from_prompt``. Explicit payload values win for
    occasion and gender; for the target category an inferred
    "topwear"/"bottomwear" wins over the payload, otherwise "both" is used.
    Colour/include/exclude keyword filters from the payload and from the
    inference are concatenated (payload values first).

    Raises:
        HTTPException: 400 when "max_products" is present but is not an
            integer >= 1; 502 when the NVIDIA gateway/payload handling or a
            store request fails.
    """
    user_prompt = str(payload.get("user_prompt") or payload.get("prompt") or "").strip()
    inferred = _infer_structured_request_from_prompt(user_prompt)
    inferred_target_category = _normalize_target_category(inferred.get("target_category"))

    occasion = str(payload.get("occasion") or inferred.get("occasion") or "casual")
    if _norm(occasion) in {"", "auto", "any"}:
        # Placeholder occasions defer to whatever the prompt implied.
        occasion = str(inferred.get("occasion") or "casual")
    gender = str(payload.get("gender") or inferred.get("gender") or "")

    payload_target_category = _normalize_target_category(
        payload.get("target_category") or payload.get("targetCategory") or "both"
    )
    single_slot_targets = {"topwear", "bottomwear"}
    if inferred_target_category in single_slot_targets:
        target_category = inferred_target_category
    elif payload_target_category in single_slot_targets:
        target_category = payload_target_category
    else:
        target_category = "both"

    raw_filters = payload.get("filters")
    base_filters: dict[str, Any] = raw_filters if isinstance(raw_filters, dict) else {}

    def _merged_terms(key: str) -> list[str]:
        # Payload-provided terms first, then inferred ones; blanks dropped.
        inferred_values = inferred.get(key)
        if not isinstance(inferred_values, list):
            inferred_values = []
        combined = [*(base_filters.get(key) or []), *inferred_values]
        return [str(value) for value in combined if str(value).strip()]

    filters = {
        **base_filters,
        "preferred_colors": _merged_terms("preferred_colors"),
        "include_keywords": _merged_terms("include_keywords"),
        "exclude_keywords": _merged_terms("exclude_keywords"),
    }

    preference_parts = [str(payload.get("preferences") or "").strip(), user_prompt]
    preferences = ", ".join(part for part in preference_parts if part)

    # The previous `not in {None, ""}` set test raised TypeError for
    # unhashable JSON values (lists/objects), and int() raised ValueError for
    # non-numeric strings; both surfaced as 500s. Report them as 400 instead.
    max_products_raw = payload.get("max_products")
    max_products: int | None
    if max_products_raw is None or max_products_raw == "":
        max_products = None
    else:
        try:
            max_products = int(max_products_raw)
        except (TypeError, ValueError) as exc:
            raise HTTPException(status_code=400, detail="max_products must be an integer") from exc

    store = _normalize_store_name(str(payload.get("store") or SCRAPER_DEFAULT_STORE or "nike"))
    if max_products is not None and max_products < 1:
        raise HTTPException(status_code=400, detail="max_products must be at least 1")

    try:
        return _generate_scraper_plan_with_nemotron(
            occasion=occasion,
            gender=gender,
            preferences=preferences,
            user_prompt=user_prompt,
            target_category=target_category,
            filters=filters,
            max_products=max_products,
            store=store,
            strict_nemotron=True,
        )
    except (NvidiaGatewayError, NvidiaPayloadError) as exc:
        raise HTTPException(status_code=502, detail=str(exc)) from exc
    except requests.RequestException as exc:
        raise HTTPException(status_code=502, detail=f"Failed to fetch {store.title()} pages: {exc}") from exc
def scraper_page() -> Response:
    """Serve the self-contained HTML page for the wardrobe-aware product scraper.

    The page embeds a JSON snapshot of the wardrobe (obtained from
    ``_wardrobe_metadata_snapshot(limit=12)``) and a small script that POSTs the
    form values to ``/scraper/recommend`` and renders the returned query plan
    and product cards.

    Returns:
        A ``text/html`` response containing the whole page.
    """
    wardrobe_snapshot = _wardrobe_metadata_snapshot(limit=12)
    # The JSON is interpolated into a <pre> element, so it must be HTML-escaped.
    wardrobe_json = html_lib.escape(json.dumps(wardrobe_snapshot, ensure_ascii=True, indent=2))
    # NOTE: this is an f-string, so every literal CSS/JS brace is doubled; the
    # only real interpolation is {wardrobe_json}.
    html_content = f"""
<!DOCTYPE html>
<html lang="en">
<head>
  <meta charset="utf-8" />
  <meta name="viewport" content="width=device-width, initial-scale=1" />
  <title>Wardrobe Assistant Child</title>
  <style>
    :root {{
      --bg: #0f172a;
      --panel: #111827;
      --panel-2: #1f2937;
      --text: #e5e7eb;
      --muted: #9ca3af;
      --accent: #22c55e;
      --accent-2: #38bdf8;
      --border: rgba(255,255,255,0.08);
    }}
    * {{ box-sizing: border-box; }}
    body {{ margin: 0; font-family: Arial, sans-serif; background: radial-gradient(circle at top, #1e293b, #020617 70%); color: var(--text); }}
    .wrap {{ max-width: 1200px; margin: 0 auto; padding: 24px; }}
    .hero {{ display: grid; gap: 16px; grid-template-columns: 1.4fr 1fr; align-items: start; }}
    .card {{ background: rgba(17,24,39,0.88); border: 1px solid var(--border); border-radius: 20px; padding: 20px; backdrop-filter: blur(10px); box-shadow: 0 20px 60px rgba(0,0,0,0.28); }}
    h1 {{ margin: 0 0 8px; font-size: clamp(28px, 4vw, 46px); }}
    p {{ line-height: 1.6; color: var(--muted); }}
    label {{ display: block; margin: 14px 0 6px; font-size: 13px; color: #cbd5e1; }}
    input, select, textarea, button {{ width: 100%; border-radius: 14px; border: 1px solid var(--border); background: rgba(15,23,42,0.9); color: var(--text); padding: 12px 14px; font-size: 14px; }}
    textarea {{ min-height: 110px; resize: vertical; }}
    button {{ background: linear-gradient(135deg, var(--accent), var(--accent-2)); color: #fff; border: 0; font-weight: 700; cursor: pointer; margin-top: 16px; }}
    .grid {{ display: grid; gap: 16px; grid-template-columns: repeat(auto-fit, minmax(260px, 1fr)); margin-top: 18px; }}
    .muted {{ color: var(--muted); font-size: 13px; }}
    .products {{ display: grid; gap: 14px; grid-template-columns: repeat(auto-fit, minmax(220px, 1fr)); margin-top: 18px; }}
    .product {{ background: rgba(15,23,42,0.8); border: 1px solid var(--border); border-radius: 18px; overflow: hidden; }}
    .product img {{ width: 100%; aspect-ratio: 1 / 1; object-fit: cover; display: block; background: #0b1220; }}
    .product .body {{ padding: 12px; }}
    .pill {{ display: inline-block; padding: 4px 10px; border-radius: 999px; background: rgba(56,189,248,0.12); color: #7dd3fc; font-size: 12px; margin-right: 8px; margin-bottom: 8px; }}
    pre {{ white-space: pre-wrap; word-break: break-word; background: rgba(2,6,23,0.9); padding: 14px; border-radius: 16px; border: 1px solid var(--border); overflow: auto; }}
    a {{ color: #93c5fd; }}
    .status {{ margin-top: 12px; color: #d1fae5; }}
  </style>
</head>
<body>
  <div class="wrap">
    <div class="hero">
      <div class="card">
        <h1>Wardrobe Assistant Child</h1>
        <p>Nemotron reads wardrobe metadata, builds a context-aware shopping query, and returns matching products with links, names, prices, and images.</p>
        <div class="grid">
          <div>
            <label for="occasion">Occasion</label>
            <select id="occasion">
              <option>casual</option>
              <option>formal</option>
              <option>work</option>
              <option>party</option>
              <option>sports</option>
              <option>ethnic</option>
            </select>
          </div>
          <div>
            <label for="gender">Gender</label>
            <select id="gender">
              <option value="">auto</option>
              <option>men</option>
              <option>women</option>
              <option>unisex</option>
            </select>
          </div>
          <input type="hidden" id="store" value="nike" />
          <div>
            <label for="max_products">Max Products</label>
            <input id="max_products" type="number" min="1" max="30" value="12" />
          </div>
        </div>
        <label for="preferences">Other Preferences</label>
        <textarea id="preferences" placeholder="Example: formal office look, breathable fabric, neutral tones, regular fit, avoid oversized silhouettes"></textarea>
        <button id="runBtn">Generate Nemotron Query and Scrape</button>
        <div id="status" class="status"></div>
      </div>
      <div class="card">
        <h2 style="margin-top:0;">Wardrobe Metadata Snapshot</h2>
        <p class="muted">Current items loaded from the database are used by Nemotron to shape the shopping query.</p>
        <pre id="wardrobeSnapshot">{wardrobe_json}</pre>
      </div>
    </div>
    <div class="card" style="margin-top:16px;">
      <h2 style="margin-top:0;">Nemotron Query Plan</h2>
      <pre id="queryPlan">Run the search to generate a wardrobe-aware query.</pre>
    </div>
    <div class="card" style="margin-top:16px;">
      <h2 style="margin-top:0;">Results</h2>
      <div id="products" class="products"></div>
    </div>
  </div>
  <script>
    const runBtn = document.getElementById('runBtn');
    const statusEl = document.getElementById('status');
    const planEl = document.getElementById('queryPlan');
    const productsEl = document.getElementById('products');
    // Product fields come from scraped pages and are injected via innerHTML,
    // so every value must be entity-escaped. '&' is replaced first so the
    // entities produced by the later replacements are not escaped twice.
    function escapeHtml(value) {{
      return String(value || '')
        .replaceAll('&', '&amp;')
        .replaceAll('<', '&lt;')
        .replaceAll('>', '&gt;')
        .replaceAll('"', '&quot;')
        .replaceAll("'", '&#39;');
    }}
    function renderProducts(products) {{
      if (!products || !products.length) {{
        productsEl.innerHTML = '<p class="muted">No products found.</p>';
        return;
      }}
      productsEl.innerHTML = products.map((product) => `
        <div class="product">
          <img src="${{escapeHtml(product.image_url || '')}}" alt="${{escapeHtml(product.name || 'Product')}}" />
          <div class="body">
            <div class="pill">${{escapeHtml(product.price || 'N/A')}}</div>
            <h3 style="margin:8px 0 6px; font-size:16px;">${{escapeHtml(product.name || 'Unnamed product')}}</h3>
            <div class="muted" style="margin-bottom:10px;">${{escapeHtml(product.item_link || '')}}</div>
            <a href="${{escapeHtml(product.item_link || '#')}}" target="_blank" rel="noreferrer">Open product</a>
          </div>
        </div>
      `).join('');
    }}
    runBtn.addEventListener('click', async () => {{
      statusEl.textContent = 'Generating query with Nemotron and scraping products...';
      productsEl.innerHTML = '';
      try {{
        const payload = {{
          occasion: document.getElementById('occasion').value,
          gender: document.getElementById('gender').value,
          store: 'nike',
          preferences: document.getElementById('preferences').value,
          max_products: Number(document.getElementById('max_products').value || 12),
        }};
        const response = await fetch('/scraper/recommend', {{
          method: 'POST',
          headers: {{ 'Content-Type': 'application/json' }},
          body: JSON.stringify(payload),
        }});
        const data = await response.json();
        if (!response.ok) {{
          throw new Error(data.detail || 'Request failed');
        }}
        planEl.textContent = JSON.stringify({{
          query_plan: data.query_plan || {{}},
          intermediate_steps: data.intermediate_steps || [],
        }}, null, 2);
        renderProducts(data.products || []);
        statusEl.textContent = `Loaded ${{data.count || 0}} products. Saved JSON: ${{data.saved_json_path || 'not saved'}}`;
      }} catch (error) {{
        statusEl.textContent = `Error: ${{error.message}}`;
        planEl.textContent = 'Query planning failed.';
      }}
    }});
  </script>
</body>
</html>
"""
    return Response(content=html_content, media_type="text/html")
async def classify(image: UploadFile = File(...)) -> dict[str, str]:
    """Run the classification prompt on an uploaded image and return its specs.

    The upload is decoded to RGB, shrunk to fit within 512x512, sent to
    ``run_nvidia_inference`` with ``CLASSIFICATION_PROMPT``, and the model's
    text answer is parsed and normalized. Nothing is stored.

    Raises:
        HTTPException: 400 when the upload carries no filename. Any other
            failure is delegated to ``_raise_http_error``.
    """
    if not image.filename:
        raise HTTPException(status_code=400, detail="No image selected")

    try:
        upload_bytes = await image.read()
        picture = Image.open(io.BytesIO(upload_bytes)).convert("RGB")
        # thumbnail() resizes in place; the model only sees the reduced copy.
        picture.thumbnail((512, 512))
        # NOTE(review): called without await inside an async handler — if this
        # helper blocks on network I/O it will hold the event loop; confirm.
        answer_text = run_nvidia_inference(picture, CLASSIFICATION_PROMPT)
        answer_json = parse_json_from_text(answer_text)
        return normalize_specs(answer_json)
    except HTTPException:
        # Already a well-formed API error; let it through untouched.
        raise
    except Exception as e:
        _raise_http_error(e)
async def upload(image: UploadFile = File(...)) -> dict[str, Any]:
    """Classify an uploaded image and persist it as a new wardrobe item.

    Runs the same decode / resize / inference / normalize steps as the
    classification endpoint, then wraps the specs in a record with a fresh
    UUID and a ``memory://<id>`` image URL and stores it via ``item_insert``.

    Returns:
        Whatever ``item_insert`` returns for the normalized record.

    Raises:
        HTTPException: 400 when the upload carries no filename. Any other
            failure is delegated to ``_raise_http_error``.
    """
    if not image.filename:
        raise HTTPException(status_code=400, detail="No image selected")

    try:
        upload_bytes = await image.read()
        picture = Image.open(io.BytesIO(upload_bytes)).convert("RGB")
        picture.thumbnail((512, 512))

        answer_text = run_nvidia_inference(picture, CLASSIFICATION_PROMPT)
        specs = normalize_specs(parse_json_from_text(answer_text))

        new_id = str(uuid.uuid4())
        record = _normalize_wardrobe_item(
            {
                "id": new_id,
                # The image bytes are not stored here; the URL is a placeholder
                # keyed by the item id.
                "image_url": f"memory://{new_id}",
                "description": specs,
                "created_at": _now_iso(),
            }
        )
        return item_insert(record)
    except HTTPException:
        raise
    except Exception as e:
        _raise_http_error(e)
def get_items() -> dict[str, list[dict[str, Any]]]:
    """Return every stored wardrobe item, wrapped under the ``items`` key."""
    stored_items = item_get_all()
    return {"items": stored_items}
def delete_item_endpoint(item_id: str) -> dict[str, Any]:
    """Delete the wardrobe item with the given id.

    Raises:
        HTTPException: 404 when ``item_delete`` reports nothing was removed.
    """
    was_removed = item_delete(item_id)
    if was_removed:
        return {"success": True, "id": item_id}
    raise HTTPException(status_code=404, detail="Item not found")
def update_item_endpoint(
    item_id: str,
    payload: dict[str, Any] = Body(default_factory=dict),
) -> dict[str, Any]:
    """Apply ``payload`` to the wardrobe item ``item_id`` and return the result.

    Raises:
        HTTPException: 404 when ``item_update`` returns ``None``.
    """
    result = item_update(item_id, payload)
    if result is not None:
        return result
    raise HTTPException(status_code=404, detail="Item not found")
def post_feedback(payload: dict[str, Any] = Body(default_factory=dict)) -> dict[str, Any]:
    """Record a wear / skip / save reaction to a top + bottom pairing.

    Expected payload keys: ``top_id``, ``bottom_id``, ``action`` and optionally
    ``occasion`` (defaults to ``"casual"``) and ``score`` (passed through as-is).

    Raises:
        HTTPException: 400 for missing ids or an unknown action; 404 when
            either referenced item does not exist.
    """

    def _text(key: str, fallback: str = "") -> str:
        # Missing, null and empty values all collapse to the fallback.
        return str(payload.get(key) or fallback)

    top_id = _text("top_id")
    bottom_id = _text("bottom_id")
    action = _text("action")
    occasion = _text("occasion", "casual")
    score = payload.get("score")

    if not (top_id and bottom_id):
        raise HTTPException(status_code=400, detail="top_id and bottom_id required")
    if action not in {"wear", "skip", "save"}:
        raise HTTPException(status_code=400, detail="action must be 'wear', 'skip', or 'save'")

    # Both garments must exist before the feedback row is written.
    top_item = item_get(top_id)
    if top_item is None:
        raise HTTPException(status_code=404, detail=f"top item {top_id} not found")
    bottom_item = item_get(bottom_id)
    if bottom_item is None:
        raise HTTPException(status_code=404, detail=f"bottom item {bottom_id} not found")

    return feedback_record(top_id, bottom_id, action, occasion, score)
def ai_score_outfit(payload: dict[str, Any] = Body(default_factory=dict)) -> dict[str, Any]:
    """Score one explicit top + bottom (+ optional other) outfit.

    Scorers are tried in order, each one only if the previous raised:
      1. ``_recommend_outfits_with_ai_grid`` (case "D", ``top_k=1``),
      2. ``get_recommendation_service().score_outfit``,
      3. ``score_pair_full`` from the scoring module.

    Returns:
        A flat dict with the overall score, the per-dimension breakdown,
        ``reason``, ``tip`` and the ``engine_version`` that produced it.

    Raises:
        HTTPException: 400 when ``top`` or ``bottom`` is not an object.
    """
    raw_top = payload.get("top")
    raw_bottom = payload.get("bottom")
    raw_other = payload.get("other")
    occasion = str(payload.get("occasion") or "casual")

    # Optional context is only honoured when it is a JSON object.
    weather = payload.get("weather")
    if not isinstance(weather, dict):
        weather = None
    user_profile = payload.get("user_profile")
    if not isinstance(user_profile, dict):
        user_profile = None
    region = str(payload.get("region") or "global")

    if not (isinstance(raw_top, dict) and isinstance(raw_bottom, dict)):
        raise HTTPException(status_code=400, detail="Both 'top' and 'bottom' are required")

    top_item = _normalize_wardrobe_item(raw_top)
    bottom_item = _normalize_wardrobe_item(raw_bottom)
    other_item = _normalize_wardrobe_item(raw_other) if isinstance(raw_other, dict) else None

    engine_version = OUTFIT_FALLBACK_SCORER_NAME
    try:
        grid_payload = _recommend_outfits_with_ai_grid(
            wardrobe_items=[g for g in (top_item, bottom_item, other_item) if g is not None],
            occasion=occasion,
            top_selected=top_item,
            bottom_selected=bottom_item,
            other_selected=other_item,
            weather=weather,
            user_profile=user_profile,
            region=region,
            top_k=1,
            case_name="D",
        )
        res = grid_payload.get("selected_outfit_score")
        if not isinstance(res, dict):
            # Treated like any other AI failure so the fallbacks below run.
            raise NvidiaPayloadError(f"AI scorer returned no selected outfit score: {grid_payload}")
        engine_version = str(grid_payload.get("engine_version") or OUTFIT_AI_SCORER_NAME)
    except Exception as exc:
        print(f"[outfit-scoring] algo={OUTFIT_AI_SCORER_NAME} single-pair failed reason={exc!r}")
        try:
            res = get_recommendation_service().score_outfit(
                top=top_item,
                bottom=bottom_item,
                other=other_item,
                occasion=occasion,
                weather=weather,
                user_profile=user_profile,
                region=region,
            )
            print(f"[outfit-scoring] algo={OUTFIT_FALLBACK_SCORER_NAME} source=fashion_ai single-pair")
            engine_version = str(res.get("engine_version") or OUTFIT_FALLBACK_SCORER_NAME)
        except Exception as fallback_exc:
            print(
                f"[outfit-scoring] algo={OUTFIT_FALLBACK_SCORER_NAME} "
                f"source=scoring-v2 single-pair reason={fallback_exc!r}"
            )
            res = score_pair_full(top_item, bottom_item, occasion, other=other_item)
            engine_version = "scoring-v2"

    overall = res["score"]
    breakdown = res["breakdown"]
    return {
        "score": overall,
        "color_score": breakdown["color"],
        "style_score": breakdown["style"],
        "occasion_score": breakdown["occasion"],
        "fit_score": breakdown["fit"],
        "pattern_score": breakdown["pattern"],
        # Season is the only breakdown entry allowed to be absent.
        "season_score": breakdown.get("season", 0),
        "reason": res["reason"],
        "tip": res["tip"],
        "engine_version": engine_version,
    }
def ai_gap_analysis(payload: dict[str, Any] = Body(default_factory=dict)) -> dict[str, Any]:
    """Suggest items missing from the supplied wardrobe for an occasion.

    Payload keys: ``wardrobe`` (list of item objects; a missing or ``null``
    value is treated as an empty wardrobe) and ``occasion`` (defaults to
    ``"casual"``). Non-object list entries are normalized as empty items.

    Returns:
        ``{"suggestions": ...}`` with the output of ``_gap_suggestions``.

    Raises:
        HTTPException: 400 when ``wardrobe`` is present but not a list.
    """
    wardrobe_raw = payload.get("wardrobe")
    occasion = str(payload.get("occasion") or "casual")

    # An explicit JSON null means "no items", matching how the outfit
    # recommendation endpoint treats a null ``wardrobe_items``.
    if wardrobe_raw is None:
        wardrobe_raw = []
    if not isinstance(wardrobe_raw, list):
        raise HTTPException(status_code=400, detail="'wardrobe' must be a list")

    wardrobe = [
        _normalize_wardrobe_item(entry if isinstance(entry, dict) else {})
        for entry in wardrobe_raw
    ]
    return {"suggestions": _gap_suggestions(wardrobe, occasion)}
def ai_recommend_outfits(payload: dict[str, Any] = Body(default_factory=dict)) -> dict[str, Any]:
    """Recommend outfits from a wardrobe, optionally locked to selected items.

    Payload keys read here: ``occasion``, ``wardrobe_items`` (falls back to the
    stored items when empty), ``top_selected`` / ``bottom_selected`` /
    ``other_selected``, ``weather``, ``user_profile``, ``region``, ``top_k``,
    ``candidate_pool``, ``diversity_lambda``, ``cache_category`` and
    ``wardrobe_hash``.

    The ``case`` passed downstream encodes the selection:
      A - nothing locked, B - top locked, C - bottom locked, D - both locked,
      E - standalone ("others") items only.

    For the ``wedding`` occasion only standalone items are considered and any
    top/bottom selection is ignored. Otherwise the AI grid scorer is tried
    first and ``_current_fallback_recommendations`` is used if it raises.
    Results are stored in the matching cache when both ``cache_category`` and
    ``wardrobe_hash`` are supplied.

    Raises:
        HTTPException: 400 when ``wardrobe_items`` is not a list or when
            ``top_k`` / ``candidate_pool`` / ``diversity_lambda`` is not numeric.
    """
    pair_types = {"topwear", "bottomwear"}

    def _numeric(key: str, default: float, cast: Any) -> Any:
        """Coerce ``payload[key]`` (or its default) with ``cast``; bad input is a 400."""
        try:
            return cast(payload.get(key) or default)
        except (TypeError, ValueError, OverflowError) as exc:
            raise HTTPException(status_code=400, detail=f"'{key}' must be a number") from exc

    def _is_standalone(item: dict[str, Any]) -> bool:
        """True for anything that is not part of a top/bottom pair."""
        return item.get("type") not in pair_types

    def _empty_result(notice: str) -> dict[str, Any]:
        """Build the 'no recommendations' response carrying a user-facing notice."""
        return {
            "occasion": occasion, "case": "A",
            "selected_outfit_score": None, "recommendations": [],
            "total_combinations_checked": 0,
            "notice": notice,
            "engine_version": "scoring-v2",
        }

    def _standalone_recommendations(candidates: list[dict[str, Any]]) -> dict[str, Any]:
        """Rule-based case-E recommendations over standalone items only."""
        return _fallback_rule_recommendations(
            occasion=occasion,
            case_name="E",
            tops=[],
            bottoms=[],
            others=candidates,
            top_k=top_k,
            include_pair_outfits=False,
            include_other_outfits=True,
        )

    occasion = str(payload.get("occasion") or "casual")
    wardrobe_items_raw = payload.get("wardrobe_items", [])
    top_selected_raw = payload.get("top_selected")
    bottom_selected_raw = payload.get("bottom_selected")
    other_selected_raw = payload.get("other_selected")
    weather = payload.get("weather") if isinstance(payload.get("weather"), dict) else None
    user_profile = payload.get("user_profile") if isinstance(payload.get("user_profile"), dict) else None
    region = str(payload.get("region") or "global")

    # Tuning knobs are clamped once, up front, so every branch sees the same values.
    top_k = max(1, min(_numeric("top_k", 5, int), 20))
    candidate_pool = max(4, min(_numeric("candidate_pool", 24, int), 64))
    diversity_lambda = max(0.0, min(_numeric("diversity_lambda", 0.28, float), 0.95))

    cache_category = _normalize_cache_category(payload.get("cache_category"))
    wardrobe_hash = str(payload.get("wardrobe_hash") or "").strip()
    lock_signature = _build_lock_signature_from_payload(payload)

    if wardrobe_items_raw is None:
        wardrobe_items_raw = []
    if not isinstance(wardrobe_items_raw, list):
        raise HTTPException(status_code=400, detail="'wardrobe_items' must be a list")

    wardrobe_items = [
        _normalize_wardrobe_item(entry if isinstance(entry, dict) else {})
        for entry in wardrobe_items_raw
    ]
    if not wardrobe_items:
        # Nothing supplied by the client: fall back to what is stored.
        wardrobe_items = [_normalize_wardrobe_item(entry) for entry in item_get_all()]
    if not wardrobe_items:
        # Deliberately not cached: this happens before any cache key exists.
        return _empty_result("Your wardrobe is empty. Add garments to get outfit recommendations.")

    is_wedding = str(occasion or "").strip().lower() == "wedding"
    if is_wedding:
        # Weddings only use standalone pieces; pair selections are dropped.
        standalone_only = [item for item in wardrobe_items if _is_standalone(item)]
        if standalone_only:
            wedding_filtered = _filter_garments_for_wedding(standalone_only)
            for item in wedding_filtered:
                item["type"] = "others"
            wardrobe_items = wedding_filtered
        else:
            wardrobe_items = []
        top_selected_raw = None
        bottom_selected_raw = None

    top_selected = _normalize_wardrobe_item(top_selected_raw) if isinstance(top_selected_raw, dict) else None
    bottom_selected = _normalize_wardrobe_item(bottom_selected_raw) if isinstance(bottom_selected_raw, dict) else None
    other_selected = _normalize_wardrobe_item(other_selected_raw) if isinstance(other_selected_raw, dict) else None

    cache_key: str | None = None
    if cache_category and wardrobe_hash:
        cache_user_id = _extract_cache_user_id(payload, wardrobe_items)
        # NOTE(review): top_k, weather, user_profile and region are not passed
        # to the key builder — confirm cached results may be shared across them.
        cache_key = _build_matching_cache_key(
            cache_user_id,
            cache_category,
            occasion,
            wardrobe_hash,
            lock_signature,
        )
        cached_payload = _matching_cache_get(cache_key)
        if cached_payload is not None:
            print(f"[matching-cache] hit key={cache_key}")
            return cached_payload
        print(f"[matching-cache] miss key={cache_key}")

    def _maybe_cache(result: dict[str, Any]) -> dict[str, Any]:
        """Store ``result`` under the cache key (when caching is active) and return it."""
        if cache_key:
            _matching_cache_set(cache_key, result)
            print(f"[matching-cache] store key={cache_key}")
        return result

    all_others = [item for item in wardrobe_items if _is_standalone(item)]

    # A locked standalone item short-circuits everything else, wedding or not.
    if other_selected and _is_standalone(other_selected):
        return _maybe_cache(_standalone_recommendations([other_selected]))

    if is_wedding:
        if all_others:
            return _maybe_cache(_standalone_recommendations(all_others))
        return _maybe_cache(
            _empty_result("Add at least one item in Others to get wedding recommendations.")
        )

    tops, bottoms, others, _, _ = _resolve_outfit_grid_sources(
        wardrobe_items=wardrobe_items,
        occasion=occasion,
        top_selected=top_selected,
        bottom_selected=bottom_selected,
        other_selected=other_selected,
    )
    if not tops or not bottoms:
        # No pair can be formed; standalone items are the only thing left to offer.
        if all_others:
            return _maybe_cache(_standalone_recommendations(all_others))
        return _maybe_cache(
            _empty_result(
                "You need at least one compatible topwear and one compatible bottomwear item to generate outfits."
            )
        )

    case_name = "A"
    if top_selected and not bottom_selected:
        case_name = "B"
    elif bottom_selected and not top_selected:
        case_name = "C"
    elif top_selected and bottom_selected:
        case_name = "D"

    # With nothing locked, every standalone item is a candidate; otherwise only
    # the ones the grid resolver kept.
    nothing_locked = not top_selected and not bottom_selected and not other_selected
    priority_other_candidates = all_others if nothing_locked else others

    try:
        ai_result = _recommend_outfits_with_ai_grid(
            wardrobe_items=wardrobe_items,
            occasion=occasion,
            top_selected=top_selected,
            bottom_selected=bottom_selected,
            other_selected=other_selected,
            weather=weather,
            user_profile=user_profile,
            region=region,
            top_k=top_k,
            case_name=case_name,
        )
        return _maybe_cache(_merge_standalone_others_for_priority_occasions(
            result=ai_result,
            occasion=occasion,
            others=priority_other_candidates,
            top_k=top_k,
        ))
    except Exception as exc:
        # Best-effort: any AI-path failure degrades to the non-AI recommender.
        print(f"[outfit-scoring] algo={OUTFIT_AI_SCORER_NAME} failed reason={exc!r}")
        return _maybe_cache(_current_fallback_recommendations(
            wardrobe_items=wardrobe_items,
            occasion=occasion,
            top_selected=top_selected,
            bottom_selected=bottom_selected,
            other_selected=other_selected,
            weather=weather,
            user_profile=user_profile,
            region=region,
            top_k=top_k,
            candidate_pool=candidate_pool,
            diversity_lambda=diversity_lambda,
            case_name=case_name,
            tops=tops,
            bottoms=bottoms,
            others=priority_other_candidates,
        ))
def image_proxy(url: str = Query(..., description="Remote image URL")) -> Response:
    """Fetch a remote image server-side and relay it to the client.

    Args:
        url: Absolute ``http``/``https`` URL of the image to fetch.

    Returns:
        The fetched bytes with a one-hour public cache header. The upstream
        ``Content-Type`` is relayed only when it is an ``image/*`` type;
        anything else is served as ``application/octet-stream``.

    Raises:
        HTTPException: 400 for a malformed URL, a non-http(s) scheme or a
            missing host; 502 when the fetch fails or the body exceeds the
            size cap.
    """
    max_bytes = 10 * 1024 * 1024  # upper bound on what is buffered in memory

    try:
        parsed = urlparse(url)
        host = parsed.hostname
    except ValueError as exc:
        # urlparse rejects e.g. malformed bracketed IPv6 hosts.
        raise HTTPException(status_code=400, detail="Invalid image url") from exc
    if parsed.scheme not in {"http", "https"} or not host:
        raise HTTPException(status_code=400, detail="Invalid image url")

    # NOTE(review): the target host is caller-controlled and is not checked
    # against private/loopback ranges, so this endpoint can be used for SSRF.
    # Consider an allow-list of image hosts if it is reachable by untrusted users.
    try:
        req = Request(url, headers={"User-Agent": "Mozilla/5.0", "Accept": "image/*,*/*;q=0.8", "Referer": url})
        with urlopen(req, timeout=12) as resp:
            # Read one byte past the cap so an oversized body is detectable
            # without buffering all of it.
            content = resp.read(max_bytes + 1)
            content_type = resp.headers.get("Content-Type", "image/jpeg")
    except Exception as e:
        raise HTTPException(status_code=502, detail=f"Failed to fetch image: {e}") from e

    if len(content) > max_bytes:
        raise HTTPException(status_code=502, detail="Failed to fetch image: remote file is too large")
    if not content_type.strip().lower().startswith("image/"):
        # Never relay HTML/JS from a third party under this origin.
        content_type = "application/octet-stream"

    return Response(content=content, media_type=content_type, headers={"Cache-Control": "public, max-age=3600"})
if __name__ == "__main__":
    # Script entry point: serve the app on all interfaces, port 7860.
    # uvicorn is imported here so importing this module does not require it.
    from uvicorn import run as serve

    serve(app, host="0.0.0.0", port=7860)