"""Zalando product search helpers: query enrichment, search-URL building, and
scraping via the Apify actor API with a direct-HTML fallback."""

from __future__ import annotations

import json
import logging
import os
import re
from typing import Any, Callable, Optional
from urllib.parse import parse_qs, urlencode, urlparse

import requests
from bs4 import BeautifulSoup


def _env_int(name: str, default: int) -> int:
    raw = os.getenv(name)
    if raw is None or str(raw).strip() == "":
        return default
    try:
        return int(str(raw).strip())
    except (TypeError, ValueError):
        return default


ZALANDO_BASE_URL = "https://www.zalando.co.uk"
APIFY_ACTOR_ENDPOINT = os.getenv(
    "APIFY_ACTOR_ENDPOINT",
    "https://api.apify.com/v2/acts/vistics~zalando-scraper/run-sync-get-dataset-items",
)
APIFY_TOKEN = os.getenv("APIFY_API_TOKEN", "").strip()
APIFY_MAX_RESULTS = 20
APIFY_MIN_TIMEOUT_SECONDS = max(60, _env_int("APIFY_MIN_TIMEOUT_SECONDS", 180))
APIFY_WAIT_FOR_FINISH_SECONDS = max(60, _env_int("APIFY_WAIT_FOR_FINISH_SECONDS", 300))
HTML_FALLBACK_TIMEOUT_SECONDS = max(20, _env_int("ZALANDO_HTML_TIMEOUT_SECONDS", 45))

REQUEST_HEADERS = {
    "User-Agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/124.0.0.0 Safari/537.36"
    )
}

if not logging.getLogger().handlers:
    logging.basicConfig(
        level=os.getenv("LOG_LEVEL", "INFO").upper(),
        format="%(asctime)s %(levelname)s %(name)s: %(message)s",
    )
logger = logging.getLogger(__name__)
logger.setLevel(getattr(logging, os.getenv("LOG_LEVEL", "INFO").upper(), logging.INFO))

CATEGORY_PATH_MAP = {
    "topwear": {"women": "womens-clothing", "men": "mens-clothing", "unisex": "clothing"},
    "bottomwear": {"women": "womens-clothing", "men": "mens-clothing", "unisex": "clothing"},
    "layers": {"women": "womens-clothing", "men": "mens-clothing", "unisex": "clothing"},
    "dress": {"women": "womens-clothing-dresses", "men": "mens-clothing", "unisex": "clothing"},
    "dresses": {"women": "womens-clothing-dresses", "men": "mens-clothing", "unisex": "clothing"},
    "shoes": {"women": "womens-shoes", "men": "mens-shoes", "unisex": "shoes"},
    "footwear": {"women": "womens-shoes", "men": "mens-shoes", "unisex": "shoes"},
    "sportswear": {"women": "womens-sports", "men": "mens-sports", "unisex": "sports"},
}

_COLOR_TERMS = [
    "black", "white", "navy", "blue", "grey", "gray", "beige", "olive", "green",
    "brown", "khaki", "cream", "maroon", "charcoal", "tan", "red", "pink",
    "purple", "yellow", "orange",
]

_COLOR_QUERY_KEYWORDS: dict[str, set[str]] = {
    "black": {"black"},
    "white": {"white", "bright white", "off white", "off-white"},
    "navy": {"navy", "dark blue", "dk blue", "dress blues", "moonlit ocean", "midnight blue"},
    "blue": {"blue", "navy", "dark blue", "dk blue", "dress blues", "ice blue", "light blue", "skyway", "moonlit ocean"},
    "grey": {"grey", "gray", "dark grey", "dark gray", "steel grey", "steel gray", "charcoal"},
    "gray": {"grey", "gray", "dark grey", "dark gray", "steel grey", "steel gray", "charcoal"},
    "beige": {"beige", "sand", "tan", "stone", "morel"},
    "brown": {"brown", "tan", "morel"},
    "olive": {"olive", "khaki"},
    "green": {"green", "olive", "khaki"},
    "red": {"red", "brick red", "winetasting", "wine"},
    "maroon": {"maroon", "burgundy", "wine", "winetasting"},
}

_CATEGORY_QUERY_KEYWORDS: dict[str, set[str]] = {
    "shirt": {"shirt", "formal shirt"},
    "polo": {"polo"},
    "jacket": {"jacket", "blazer", "coat"},
    "trousers": {"trousers", "pants", "chinos"},
    "pants": {"pants", "trousers", "chinos"},
    "shorts": {"shorts"},
    "jeans": {"jeans"},
}

ScrapePostprocessFn = Callable[[list[dict[str, str]]], list[dict[str, str]]]
WardrobeSummary = dict[str, Any]
TextCompletionFn = Callable[[str, int], str]


def _norm(value: Any) -> str:
    return str(value or "").strip().lower()


def _query_from_search_url(search_url: str) -> str:
    parsed = urlparse(str(search_url or ""))
    values = parse_qs(parsed.query).get("q") or []
    return str(values[0] if values else "").strip()


def _query_color_keywords(query: str) -> set[str]:
    normalized = _norm(query)
    for color in _COLOR_TERMS:
        # Match on word boundaries so e.g. "tank" is not misread as "tan".
        if re.search(rf"\b{color}\b", normalized):
            return _COLOR_QUERY_KEYWORDS.get(color, {color})
    return set()


def _query_category_keywords(query: str) -> set[str]:
    normalized = _norm(query)
    for category, keywords in _CATEGORY_QUERY_KEYWORDS.items():
        if category in normalized:
            return keywords
    return set()


def _product_match_text(product: dict[str, str]) -> str:
    return _norm(
        " ".join(
            [
                str(product.get("name") or ""),
                str(product.get("color") or ""),
                str(product.get("brand") or ""),
                str(product.get("item_link") or ""),
            ]
        )
    )


def _filter_products_for_search_query(products: list[dict[str, str]], search_url: str) -> list[dict[str, str]]:
    query = _query_from_search_url(search_url)
    color_keywords = _query_color_keywords(query)
    category_keywords = _query_category_keywords(query)
    if not color_keywords and not category_keywords:
        return products
    filtered: list[dict[str, str]] = []
    for product in products:
        text = _product_match_text(product)
        if color_keywords and not any(keyword in text for keyword in color_keywords):
            continue
        if category_keywords and not any(keyword in text for keyword in category_keywords):
            continue
        filtered.append(product)
    return filtered
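
# Illustrative filtering (hypothetical products, assumed URLs):
#   _filter_products_for_search_query(
#       [{"name": "Navy Chinos", "item_link": "https://www.zalando.co.uk/p/1"},
#        {"name": "Red Polo", "item_link": "https://www.zalando.co.uk/p/2"}],
#       "https://www.zalando.co.uk/mens-clothing?q=navy+trousers",
#   )
#   -> keeps only "Navy Chinos" (matches both the colour "navy" and the
#      trousers/pants/chinos category keywords derived from the query)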


def _normalize_target_category(value: Any) -> str:
    normalized = _norm(value)
    if normalized in {"topwear", "top", "upper", "tops"}:
        return "topwear"
    if normalized in {"bottomwear", "bottom", "lower", "bottoms"}:
        return "bottomwear"
    return "both"


def _extract_price_text(value: Any) -> str:
    text = str(value or "").strip()
    if not text:
        return "N/A"
    match = re.search(r"([\u00a3$€]\s?\d+[\d,]*(?:\.\d{2})?)", text)
    if match:
        return match.group(1).replace(" ", "")
    return text


def _extract_src_from_srcset(srcset: str) -> str:
    if not srcset:
        return ""
    first = srcset.split(",")[0].strip()
    return first.split(" ")[0].strip()


def _ensure_zalando_url(value: str) -> str:
    href = str(value or "").strip()
    if not href:
        return ""
    if href.startswith("//"):
        return f"https:{href}"
    if href.startswith("/"):
        return f"{ZALANDO_BASE_URL}{href}"
    return href


def _format_apify_money(raw_value: Any, currency_symbol: str) -> str:
    text = str(raw_value or "").strip()
    if not text:
        return ""
    normalized = text.replace(",", "")
    # Apify commonly returns minor units like 5999 => 59.99
    if re.fullmatch(r"\d+", normalized):
        major = int(normalized) // 100
        minor = int(normalized) % 100
        return f"{currency_symbol}{major}.{minor:02d}" if currency_symbol else f"{major}.{minor:02d}"
    match = re.search(r"\d+(?:\.\d{1,2})?", normalized)
    if not match:
        return ""
    return f"{currency_symbol}{match.group(0)}" if currency_symbol else match.group(0)


def summarize_wardrobe_metadata(wardrobe_items: list[dict[str, Any]]) -> WardrobeSummary:
    """Aggregate colour/type/category/fabric/fit/occasion counts across wardrobe items."""
    items = [item for item in wardrobe_items if isinstance(item, dict)]
    colors: dict[str, int] = {}
    types: dict[str, int] = {}
    categories: dict[str, int] = {}
    fabrics: dict[str, int] = {}
    fits: dict[str, int] = {}
    occasions: dict[str, int] = {}
    for item in items:
        description = item.get("description") if isinstance(item.get("description"), dict) else {}
        color = str(item.get("color") or description.get("color") or "").strip().lower()
        garment_type = str(item.get("type") or description.get("type") or "").strip().lower()
        category = str(item.get("category") or description.get("category") or "").strip().lower()
        fabric = str(item.get("fabric") or description.get("fabric") or "").strip().lower()
        fit = str(item.get("fit") or description.get("fit") or "").strip().lower()
        occasion = str(item.get("occasion") or description.get("occasion") or description.get("style") or "").strip().lower()
        if color:
            colors[color] = colors.get(color, 0) + 1
        if garment_type:
            types[garment_type] = types.get(garment_type, 0) + 1
        if category:
            categories[category] = categories.get(category, 0) + 1
        if fabric:
            fabrics[fabric] = fabrics.get(fabric, 0) + 1
        if fit:
            fits[fit] = fits.get(fit, 0) + 1
        if occasion:
            occasions[occasion] = occasions.get(occasion, 0) + 1

    def top_values(counter: dict[str, int], limit: int = 8) -> list[dict[str, Any]]:
        return [
            {"value": key, "count": count}
            for key, count in sorted(counter.items(), key=lambda pair: pair[1], reverse=True)[:limit]
        ]

    return {
        "total_items": len(items),
        "colors": top_values(colors),
        "types": top_values(types),
        "categories": top_values(categories),
        "fabrics": top_values(fabrics),
        "fits": top_values(fits),
        "occasions": top_values(occasions),
    }
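
# Illustrative usage (hypothetical wardrobe entries):
#   summarize_wardrobe_metadata([{"color": "Navy", "type": "Shirt"},
#                                {"color": "navy", "type": "jeans"}])
#   -> {"total_items": 2,
#       "colors": [{"value": "navy", "count": 2}],
#       "types": [{"value": "shirt", "count": 1}, {"value": "jeans", "count": 1}],
#       ...}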


def _count_query_signals(query: str, requested_category: str | None = None) -> dict[str, bool]:
    normalized = _norm(query)
    # Word-boundary match so e.g. "tank" is not counted as the colour "tan".
    has_color = any(re.search(rf"\b{color}\b", normalized) for color in _COLOR_TERMS)
    requested = _norm(requested_category)
    has_type = bool(requested and requested not in {"both", "all"}) or any(
        token in normalized
        for token in [
            "trouser", "trousers", "pants", "jeans", "shorts", "joggers", "skirt", "dress",
            "topwear", "bottomwear", "shirt", "tee", "blouse", "polo", "hoodie", "jacket",
            "sweater", "blazer", "t-shirt", "tank", "leggings",
        ]
    )
    has_style = any(
        token in normalized
        for token in [
            "slim", "regular", "relaxed", "oversized", "tailored", "smart", "casual", "formal",
            "party", "work", "interview", "weekend", "minimal", "structured", "clean",
        ]
    )
    has_fit = any(
        token in normalized
        for token in ["slim-fit", "slim fit", "regular-fit", "regular fit", "relaxed-fit", "relaxed fit"]
    )
    return {
        "has_color": has_color,
        "has_type": has_type,
        "has_style": has_style or has_fit,
    }


def is_underspecified_query(query: str, requested_category: str | None = None) -> bool:
    """A query is under-specified unless it carries all three signals
    (colour, type, style) and uses no vague language."""
    signals = _count_query_signals(query, requested_category=requested_category)
    explicit_signal_count = sum(1 for value in signals.values() if value)
    vague_tokens = {
        "some", "something", "stuff", "nice", "good",
        "recommend", "suggest", "maybe", "outfit", "look",
    }
    normalized = _norm(query)
    has_vague_language = any(token in normalized for token in vague_tokens)
    return explicit_signal_count < 3 or has_vague_language
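
# Illustrative behaviour (assumed queries):
#   is_underspecified_query("black slim-fit shirt")    -> False  (colour + type + style)
#   is_underspecified_query("something nice for work") -> True   (vague language)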


def _build_enrichment_prompt(
    query: str,
    wardrobe_summary: WardrobeSummary,
    requested_category: str | None,
    gender: str | None,
) -> str:
    return (
        "You are helping enrich an underspecified Zalando shopping request. "
        "Return ONLY valid JSON and no prose.\n\n"
        "Output schema:\n"
        '{"suggested_types":[],"suggested_colours":[],"occasion":"","style_notes":""}\n\n'
        f"User query: {query}\n"
        f"Requested category: {requested_category or ''}\n"
        f"Gender: {gender or ''}\n"
        f"Wardrobe metadata summary: {json.dumps(wardrobe_summary, ensure_ascii=True)}\n\n"
        "Rules:\n"
        "- Keep suggested_types to product/search terms that fit the requested category.\n"
        "- Keep suggested_colours complementary to the wardrobe summary.\n"
        "- Occasion must be a single short lowercase label when possible.\n"
        "- style_notes must be concise and search-friendly.\n"
    )


def _parse_json_object(text: str) -> dict[str, Any]:
    raw = str(text or "").strip()
    if not raw:
        return {}
    try:
        parsed = json.loads(raw)
        return parsed if isinstance(parsed, dict) else {}
    except json.JSONDecodeError:
        start = raw.find("{")
        end = raw.rfind("}")
        if start == -1 or end == -1 or end <= start:
            return {}
        try:
            parsed = json.loads(raw[start : end + 1])
            return parsed if isinstance(parsed, dict) else {}
        except json.JSONDecodeError:
            return {}
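
# Illustrative behaviour (assumed model output):
#   _parse_json_object('Sure! {"occasion": "work"}') -> {"occasion": "work"}
#   _parse_json_object("no json here")               -> {}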


def _normalize_enrichment_payload(payload: dict[str, Any], requested_category: str | None) -> dict[str, Any]:
    def to_list(value: Any) -> list[str]:
        if not isinstance(value, list):
            return []
        cleaned: list[str] = []
        for entry in value:
            text = str(entry or "").strip()
            if text and text not in cleaned:
                cleaned.append(text)
        return cleaned

    suggested_types = to_list(payload.get("suggested_types"))
    suggested_colours = to_list(payload.get("suggested_colours") or payload.get("suggested_colors"))
    occasion = str(payload.get("occasion") or "").strip().lower()
    style_notes = str(payload.get("style_notes") or "").strip()
    requested = _norm(requested_category)
    if requested and requested not in {"both", "all"} and requested not in {"topwear", "bottomwear"}:
        requested = "bottomwear" if any(token in requested for token in ["bottom", "trouser", "pant", "jean", "skirt", "short"]) else "topwear"
    if requested in {"topwear", "bottomwear"} and not suggested_types:
        suggested_types = [requested]
    if not suggested_colours:
        suggested_colours = ["black"]
    return {
        "suggested_types": suggested_types,
        "suggested_colours": suggested_colours,
        "occasion": occasion,
        "style_notes": style_notes,
    }


def enrich_underspecified_query(
    query: str,
    wardrobe_items: list[dict[str, Any]] | None = None,
    requested_category: str | None = None,
    gender: str | None = None,
    completion_fn: TextCompletionFn | None = None,
    max_tokens: int = 500,
) -> dict[str, Any]:
    wardrobe_summary = summarize_wardrobe_metadata(wardrobe_items or [])
    empty_enrichment = {
        "suggested_types": [],
        "suggested_colours": [],
        "occasion": "",
        "style_notes": "",
    }
    if not is_underspecified_query(query, requested_category=requested_category):
        return {
            "used": False,
            "query": str(query or "").strip(),
            "wardrobe_summary": wardrobe_summary,
            "enrichment": empty_enrichment,
        }
    if not completion_fn:
        return {
            "used": True,
            "query": str(query or "").strip(),
            "wardrobe_summary": wardrobe_summary,
            "enrichment": empty_enrichment,
        }
    prompt = _build_enrichment_prompt(query, wardrobe_summary, requested_category, gender)
    model_text = completion_fn(prompt, max_tokens)
    parsed = _parse_json_object(model_text)
    enrichment = _normalize_enrichment_payload(parsed, requested_category=requested_category)
    return {
        "used": True,
        "query": str(query or "").strip(),
        "wardrobe_summary": wardrobe_summary,
        "enrichment": enrichment,
    }


def compose_search_query_from_enrichment(
    query: str,
    enrichment: dict[str, Any] | None,
    gender: str | None = None,
    requested_category: str | None = None,
) -> str:
    base_query = str(query or "").strip()
    enrichment = enrichment or {}
    target_category = _normalize_target_category(requested_category)
    suggested_types = [str(value).strip() for value in (enrichment.get("suggested_types") or []) if str(value).strip()]
    suggested_colours = [str(value).strip() for value in (enrichment.get("suggested_colours") or []) if str(value).strip()]
    style_notes = str(enrichment.get("style_notes") or "").strip()
    occasion = str(enrichment.get("occasion") or "").strip()
    tokens: list[str] = []
    if base_query:
        tokens.extend([piece for piece in re.split(r"\s+", base_query) if piece])
    elif gender:
        tokens.append(_normalize_gender(gender, base_query))

    def append_unique(token: str) -> None:
        cleaned = str(token or "").strip()
        if cleaned and cleaned not in tokens:
            tokens.append(cleaned)

    if gender:
        append_unique(_normalize_gender(gender, base_query))
    if suggested_colours:
        append_unique(suggested_colours[0])
    if suggested_types:
        append_unique(suggested_types[0])
    elif requested_category:
        requested = _norm(requested_category)
        if requested in {"topwear", "bottomwear"}:
            append_unique(requested)
        elif any(token in requested for token in ["bottom", "trouser", "pant", "jean", "skirt", "short"]):
            append_unique("bottomwear")
        elif any(token in requested for token in ["top", "shirt", "tee", "blouse", "polo", "jacket"]):
            append_unique("topwear")
    if occasion:
        append_unique(occasion)
    if style_notes:
        style_tokens = [piece for piece in re.split(r"[^a-zA-Z0-9-]+", style_notes.lower()) if piece]
        for token in style_tokens[:3]:
            append_unique(token)
    if not tokens:
        tokens = [base_query or _normalize_gender(gender, base_query)]
    topwear_terms = {"shirt", "shirts", "tee", "t-shirt", "tshirt", "topwear", "blazer", "jacket", "polo", "hoodie", "kurta"}
    bottomwear_terms = {"trouser", "trousers", "pants", "jeans", "shorts", "joggers", "bottomwear"}
    normalized_tokens = [str(token).strip().lower() for token in tokens]
    has_topwear_term = any(token in topwear_terms for token in normalized_tokens)
    has_bottomwear_term = any(token in bottomwear_terms for token in normalized_tokens)
    # If the requested category conflicts with the tokens, swap in a matching garment term.
    if target_category == "bottomwear" and has_topwear_term and not has_bottomwear_term:
        replacement = "trousers"
        for index, token in enumerate(normalized_tokens):
            if token in topwear_terms:
                tokens[index] = replacement
                normalized_tokens[index] = replacement
                break
        else:
            append_unique(replacement)
    elif target_category == "topwear" and has_bottomwear_term and not has_topwear_term:
        replacement = "shirt"
        for index, token in enumerate(normalized_tokens):
            if token in bottomwear_terms:
                tokens[index] = replacement
                normalized_tokens[index] = replacement
                break
        else:
            append_unique(replacement)
    return " ".join(part for part in tokens if part).strip()


def _normalize_gender(gender: str | None, query: str) -> str:
    g = _norm(gender)
    if g in {"men", "male", "man", "mens"}:
        return "men"
    if g in {"women", "female", "woman", "womens"}:
        return "women"
    if g == "unisex":
        return "unisex"
    # Fall back to hints in the query; match whole words (and check the women
    # variants first) so "woman"/"female" are not misread as "man"/"male".
    query_hint = _norm(query)
    if re.search(r"\b(women|womens|woman|female)\b", query_hint):
        return "women"
    if re.search(r"\b(men|mens|man|male)\b", query_hint):
        return "men"
    return "unisex"


def _pick_category_path(query: str, audience: str) -> str:
    haystack = _norm(query)
    selected = ""
    for token, path_map in CATEGORY_PATH_MAP.items():
        if token in haystack:
            selected = path_map.get(audience) or path_map.get("unisex") or ""
            break
    if not selected:
        if audience == "men":
            selected = "mens-clothing"
        elif audience == "women":
            selected = "womens-clothing"
        else:
            selected = "clothing"
    if audience == "men" and selected.startswith("womens-"):
        selected = selected.replace("womens-", "mens-", 1)
    if audience == "women" and selected.startswith("mens-"):
        selected = selected.replace("mens-", "womens-", 1)
    if audience == "unisex" and selected.startswith(("mens-", "womens-")):
        selected = selected.split("-", 1)[1]
    return selected or "clothing"


def build_zalando_search_url(query: str, gender: str | None = None) -> str:
    normalized_query = str(query or "").strip()
    if not normalized_query:
        raise ValueError("query is required")
    audience = _normalize_gender(gender, normalized_query)
    path = _pick_category_path(normalized_query, audience)
    params = urlencode({"q": normalized_query})
    return f"{ZALANDO_BASE_URL}/{path}?{params}"


def build_zalando_search_urls_from_query(query: str, gender: str | None = None) -> list[str]:
    normalized_query = str(query or "").strip()
    if not normalized_query:
        return []
    if gender:
        return [build_zalando_search_url(normalized_query, gender=gender)]
    urls: list[str] = []
    for audience in ["women", "men", "unisex"]:
        url = build_zalando_search_url(normalized_query, gender=audience)
        if url not in urls:
            urls.append(url)
    return urls


def build_zalando_search_urls_from_request(
    query: str,
    gender: str | None = None,
    wardrobe_items: list[dict[str, Any]] | None = None,
    requested_category: str | None = None,
    completion_fn: TextCompletionFn | None = None,
    max_tokens: int = 500,
) -> tuple[list[str], dict[str, Any]]:
    enrichment_result = enrich_underspecified_query(
        query=query,
        wardrobe_items=wardrobe_items,
        requested_category=requested_category,
        gender=gender,
        completion_fn=completion_fn,
        max_tokens=max_tokens,
    )
    final_query = compose_search_query_from_enrichment(
        query=enrichment_result.get("query") or query,
        enrichment=enrichment_result.get("enrichment") if isinstance(enrichment_result.get("enrichment"), dict) else None,
        gender=gender,
        requested_category=requested_category,
    )
    search_urls = build_zalando_search_urls_from_query(final_query, gender=gender)
    return search_urls, {**enrichment_result, "final_query": final_query}


def _apify_request_url() -> str:
    if APIFY_TOKEN:
        return f"{APIFY_ACTOR_ENDPOINT}?token={APIFY_TOKEN}"
    return APIFY_ACTOR_ENDPOINT


def _apify_actor_id_from_endpoint(endpoint: str) -> str:
    parsed = urlparse(str(endpoint or "").strip())
    segments = [segment for segment in parsed.path.split("/") if segment]
    if "acts" in segments:
        index = segments.index("acts")
        if index + 1 < len(segments):
            return segments[index + 1]
    return "vistics~zalando-scraper"


def _build_apify_payload(search_url: str, max_results: int) -> dict[str, Any]:
    return {
        "startUrls": [str(search_url or "").strip()],
        "maxResults": int(max_results),
    }


def _http_error_detail(exc: requests.RequestException, limit: int = 800) -> str:
    response = getattr(exc, "response", None)
    if response is None:
        return ""
    status = getattr(response, "status_code", None)
    body = ""
    try:
        body = str(response.text or "").strip().replace("\n", " ")
    except Exception:
        body = ""
    if body:
        body = body[:limit]
    if status is None and not body:
        return ""
    return f"status={status} body={body}".strip()


def _extract_apify_items(raw_payload: Any) -> list[dict[str, Any]]:
    if isinstance(raw_payload, list):
        return [item for item in raw_payload if isinstance(item, dict)]
    if isinstance(raw_payload, dict):
        for key in ("items", "data"):
            value = raw_payload.get(key)
            if isinstance(value, list):
                return [item for item in value if isinstance(item, dict)]
    return []


def _normalize_apify_items(raw_items: list[dict[str, Any]], effective_limit: int) -> list[dict[str, str]]:
    items: list[dict[str, str]] = []
    seen: set[str] = set()
    for raw in raw_items:
        normalized = _normalize_product(raw)
        if not normalized["item_link"] or normalized["item_link"] in seen:
            continue
        seen.add(normalized["item_link"])
        items.append(normalized)
        if len(items) >= effective_limit:
            break
    return items


def _scrape_with_apify_run_dataset_fallback(
    search_url: str,
    effective_limit: int,
    timeout_seconds: int,
) -> list[dict[str, str]]:
    actor_id = _apify_actor_id_from_endpoint(APIFY_ACTOR_ENDPOINT)
    run_url = f"https://api.apify.com/v2/acts/{actor_id}/runs"
    wait_for_finish = min(max(60, APIFY_WAIT_FOR_FINISH_SECONDS), 300)
    variant_errors: list[str] = []
    logger.info(
        "zalando crawl retry source=apify-run search_url=%s actor_id=%s wait_for_finish=%s",
        search_url,
        actor_id,
        wait_for_finish,
    )
    variants = ["string"]
    for variant_name in variants:
        run_payload = _build_apify_payload(search_url, effective_limit)
        run_id = ""
        run_status = ""
        dataset_id = ""
        try:
            run_response = requests.post(
                run_url,
                params={"token": APIFY_TOKEN, "waitForFinish": wait_for_finish},
                json=run_payload,
                timeout=timeout_seconds,
            )
            run_response.raise_for_status()
            run_json = run_response.json()
            run_data = run_json.get("data") if isinstance(run_json, dict) else None
            if not isinstance(run_data, dict):
                variant_errors.append(f"{variant_name}: invalid run payload")
                continue
            run_id = str(run_data.get("id") or "").strip()
            run_status = str(run_data.get("status") or "").strip()
            dataset_id = str(run_data.get("defaultDatasetId") or "").strip()
            logger.info(
                "zalando crawl retry source=apify-run completed variant=%s run_id=%s status=%s dataset_id=%s",
                variant_name,
                run_id,
                run_status,
                dataset_id,
            )
        except requests.RequestException as exc:
            detail = _http_error_detail(exc)
            variant_errors.append(f"{variant_name}: {exc} {detail}".strip())
            logger.warning(
                "zalando crawl failed source=apify-run variant=%s search_url=%s error=%s detail=%s",
                variant_name,
                search_url,
                exc,
                detail,
            )
            continue
        if not dataset_id:
            variant_errors.append(f"{variant_name}: missing defaultDatasetId")
            continue
        try:
            dataset_response = requests.get(
                f"https://api.apify.com/v2/datasets/{dataset_id}/items",
                params={
                    "token": APIFY_TOKEN,
                    "clean": "true",
                    "format": "json",
                    "limit": effective_limit,
                },
                timeout=timeout_seconds,
            )
            dataset_response.raise_for_status()
            dataset_items = _extract_apify_items(dataset_response.json())
            items = _normalize_apify_items(dataset_items, effective_limit)
            logger.info(
                "zalando crawl retry source=apify-dataset variant=%s run_id=%s dataset_id=%s raw_items=%s items=%s",
                variant_name,
                run_id,
                dataset_id,
                len(dataset_items),
                len(items),
            )
            if items:
                return items
            variant_errors.append(f"{variant_name}: empty dataset")
        except requests.RequestException as exc:
            detail = _http_error_detail(exc)
            variant_errors.append(f"{variant_name}: {exc} {detail}".strip())
            logger.warning(
                "zalando crawl failed source=apify-dataset variant=%s run_id=%s dataset_id=%s error=%s detail=%s",
                variant_name,
                run_id,
                dataset_id,
                exc,
                detail,
            )
    if variant_errors:
        logger.warning(
            "zalando crawl retry source=apify-run exhausted search_url=%s errors=%s",
            search_url,
            "; ".join(variant_errors),
        )
    return []


def _normalize_product(item: dict[str, Any]) -> dict[str, str]:
    name = str(
        item.get("name")
        or item.get("title")
        or item.get("productName")
        or item.get("product_name")
        or "N/A"
    ).strip()
    fallback_price = _extract_price_text(
        item.get("price")
        or item.get("currentPrice")
        or item.get("displayPrice")
        or item.get("priceLabel")
        or "N/A"
    )
    currency_symbol = str(item.get("currencySymbol") or "").strip()
    promotional_price = _format_apify_money(item.get("promotionalPrice"), currency_symbol)
    original_price = _format_apify_money(item.get("originalPrice"), currency_symbol)
    discount_percent = str(item.get("discountPercent") or "").strip()
    brand = str(item.get("brand") or item.get("brandName") or "").strip()
    if promotional_price:
        price = promotional_price if not discount_percent else f"{promotional_price} ({discount_percent})"
    elif original_price:
        price = original_price
    else:
        price = fallback_price
    image_url = _ensure_zalando_url(
        str(
            item.get("image")
            or item.get("imageUrl")
            or item.get("image_url")
            or item.get("thumbnail")
            or ""
        )
    )
    url_value = _ensure_zalando_url(
        str(
            item.get("url")
            or item.get("productUrl")
            or item.get("item_link")
            or item.get("link")
            or ""
        )
    )
    color = str(item.get("color") or item.get("colorName") or item.get("colour") or "").strip()
    if not color and " - " in name:
        color = name.rsplit(" - ", 1)[-1].strip()
    return {
        "name": name or "N/A",
        "price": price or "N/A",
        "brand": brand,
        "color": color,
        "currency_symbol": currency_symbol,
        "promotional_price": promotional_price,
        "original_price": original_price,
        "discount_percent": discount_percent,
        "image_url": image_url,
        "item_link": url_value,
    }
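
# Illustrative normalisation (hypothetical raw Apify item, assumed field values):
#   _normalize_product({"name": "Slim Shirt - Navy", "promotionalPrice": "2999",
#                       "currencySymbol": "£", "url": "/p/slim-shirt"})
#   -> {"name": "Slim Shirt - Navy", "price": "£29.99", "color": "Navy",
#       "item_link": "https://www.zalando.co.uk/p/slim-shirt", ...}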


def _scrape_with_apify(search_url: str, max_products: int | None, timeout_seconds: int) -> list[dict[str, str]]:
    requested_limit = int(max_products) if isinstance(max_products, int) and max_products > 0 else APIFY_MAX_RESULTS
    effective_limit = min(requested_limit, APIFY_MAX_RESULTS)
    apify_timeout = max(int(timeout_seconds), APIFY_MIN_TIMEOUT_SECONDS)
    actor_id = _apify_actor_id_from_endpoint(APIFY_ACTOR_ENDPOINT)
    logger.info(
        "zalando crawl start source=apify search_url=%s requested_max=%s effective_max=%s timeout=%s actor_id=%s",
        search_url,
        max_products,
        effective_limit,
        apify_timeout,
        actor_id,
    )
    variants = ["string"]
    variant_errors: list[str] = []
    for variant_name in variants:
        try:
            payload = _build_apify_payload(search_url, effective_limit)
            response = requests.post(_apify_request_url(), json=payload, timeout=apify_timeout)
            response.raise_for_status()
            raw_items = _extract_apify_items(response.json())
            items = _normalize_apify_items(raw_items, effective_limit)
            logger.info(
                "zalando crawl end source=apify variant=%s search_url=%s crawled=%s raw_items=%s items=%s",
                variant_name,
                search_url,
                bool(items),
                len(raw_items),
                len(items),
            )
            if items:
                return items
            variant_errors.append(f"{variant_name}: empty result")
        except requests.RequestException as exc:
            detail = _http_error_detail(exc)
            variant_errors.append(f"{variant_name}: {exc} {detail}".strip())
            logger.warning(
                "zalando crawl failed source=apify variant=%s search_url=%s error=%s detail=%s",
                variant_name,
                search_url,
                exc,
                detail,
            )
            continue
    try:
        fallback_items = _scrape_with_apify_run_dataset_fallback(
            search_url=search_url,
            effective_limit=effective_limit,
            timeout_seconds=apify_timeout,
        )
        logger.info(
            "zalando crawl end source=apify-run search_url=%s crawled=%s items=%s",
            search_url,
            bool(fallback_items),
            len(fallback_items),
        )
        if fallback_items:
            return fallback_items
    except requests.RequestException as exc:
        detail = _http_error_detail(exc)
        variant_errors.append(f"run_dataset: {exc} {detail}".strip())
        logger.warning("zalando crawl failed source=apify-run search_url=%s error=%s detail=%s", search_url, exc, detail)
    if variant_errors:
        logger.warning(
            "zalando crawl source=apify exhausted search_url=%s errors=%s",
            search_url,
            "; ".join(variant_errors),
        )
    logger.warning(
        "zalando crawl end source=apify search_url=%s crawled=False items=0 reason=no_items_from_sync_or_run_dataset",
        search_url,
    )
    return []


def _scrape_with_html(search_url: str, max_products: int | None, timeout_seconds: int) -> list[dict[str, str]]:
    html_timeout = max(int(timeout_seconds), HTML_FALLBACK_TIMEOUT_SECONDS)
    logger.info("zalando crawl start source=html search_url=%s max_products=%s timeout=%s", search_url, max_products, html_timeout)
    response = requests.get(search_url, headers=REQUEST_HEADERS, timeout=html_timeout)
    response.raise_for_status()
    soup = BeautifulSoup(response.content, "lxml")
    items: list[dict[str, str]] = []
    seen: set[str] = set()
    cards = soup.select('article, div[data-testid*="product"], li[data-testid*="product"]')
    for card in cards:
        link_tag = card.select_one('a[href*="/p/"]') or card.find("a", href=True)
        if not link_tag:
            continue
        item_link = _ensure_zalando_url(str(link_tag.get("href") or ""))
        if not item_link or item_link in seen or "zalando" not in item_link:
            continue
        name_tag = (
            card.select_one('[data-testid*="product-name"]')
            or card.select_one('[data-testid*="name"]')
            or card.find("h3")
            or card.find("h2")
            or link_tag
        )
        name = str(name_tag.get_text(" ", strip=True) if name_tag else "N/A").strip() or "N/A"
        price_tag = (
            card.select_one('[data-testid*="price"]')
            or card.find(attrs={"class": re.compile(r"price|money|amount", re.I)})
        )
        price_text = str(price_tag.get_text(" ", strip=True) if price_tag else "")
        price = _extract_price_text(price_text)
        img_tag = card.find("img")
        image_url = ""
        if img_tag:
            image_url = _ensure_zalando_url(
                str(
                    img_tag.get("src")
                    or img_tag.get("data-src")
                    or _extract_src_from_srcset(str(img_tag.get("srcset") or ""))
                )
            )
        seen.add(item_link)
        items.append(
            {
                "name": name,
                "price": price,
                "image_url": image_url,
                "item_link": item_link,
            }
        )
        if isinstance(max_products, int) and max_products > 0 and len(items) >= max_products:
            break
    logger.info("zalando crawl end source=html search_url=%s crawled=%s items=%s", search_url, bool(items), len(items))
    return items


def _requires_postprocess(items: list[dict[str, str]]) -> bool:
    return any(
        item.get("name") in {"", "N/A"} or item.get("price") in {"", "N/A"}
        for item in items
    )


def extract_product_summaries(
    search_url: str,
    max_products: int | None = None,
    request_timeout_seconds: int = 35,
    use_apify: bool = True,
    postprocess: Optional[ScrapePostprocessFn] = None,
) -> list[dict[str, str]]:
    """Scrape product summaries for a Zalando search URL, preferring the Apify
    actor and falling back to direct HTML scraping."""
    if not str(search_url or "").strip():
        raise ValueError("search_url is required")
    max_count = int(max_products) if isinstance(max_products, int) and max_products > 0 else None
    logger.info(
        "zalando crawl requested search_url=%s max_products=%s capped_to=%s use_apify=%s actor_id=%s",
        search_url,
        max_products,
        max_count,
        bool(use_apify and APIFY_TOKEN),
        _apify_actor_id_from_endpoint(APIFY_ACTOR_ENDPOINT),
    )
    products: list[dict[str, str]] = []
    errors: list[str] = []
    if use_apify and APIFY_TOKEN:
        try:
            products = _scrape_with_apify(search_url, max_count, request_timeout_seconds)
            if not products:
                errors.append("apify: empty result set")
                logger.warning("zalando crawl source=apify returned zero items search_url=%s", search_url)
        except requests.RequestException as exc:
            errors.append(f"apify: {exc}")
            logger.warning("zalando crawl failed source=apify search_url=%s error=%s", search_url, exc)
    if not products:
        try:
            if use_apify and APIFY_TOKEN:
                logger.info("zalando crawl fallback source=html search_url=%s", search_url)
            products = _scrape_with_html(search_url, max_count, request_timeout_seconds)
        except requests.RequestException as exc:
            errors.append(f"html: {exc}")
            logger.warning("zalando crawl failed source=html search_url=%s error=%s", search_url, exc)
    if postprocess and _requires_postprocess(products):
        try:
            products = postprocess(products)
        except Exception:
            # Never fail scraping because post-processing failed.
            pass
    products = _filter_products_for_search_query(products, search_url)
    if not products and errors:
        logger.warning("zalando crawl completed with no results search_url=%s errors=%s", search_url, "; ".join(errors))
        raise requests.RequestException("; ".join(errors))
    logger.info("zalando crawl completed search_url=%s crawled=%s items=%s", search_url, bool(products), len(products))
    if isinstance(max_count, int) and max_count > 0:
        return products[:max_count]
    return products


def search_products(
    query: str,
    gender: str | None = None,
    max_products: int | None = None,
    use_apify: bool = True,
    request_timeout_seconds: int = 35,
    postprocess: Optional[ScrapePostprocessFn] = None,
    wardrobe_items: list[dict[str, Any]] | None = None,
    requested_category: str | None = None,
    completion_fn: TextCompletionFn | None = None,
    enrichment_max_tokens: int = 500,
) -> dict[str, Any]:
    """End-to-end search: enrich the query if needed, build search URLs, scrape
    each one, and return de-duplicated products plus enrichment metadata."""
    max_count = int(max_products) if isinstance(max_products, int) and max_products > 0 else None
    search_urls, enrichment_result = build_zalando_search_urls_from_request(
        query=query,
        gender=gender,
        wardrobe_items=wardrobe_items,
        requested_category=requested_category,
        completion_fn=completion_fn,
        max_tokens=enrichment_max_tokens,
    )
    if not search_urls:
        raise ValueError("query is required")
    logger.info(
        "zalando search plan query=%s search_urls=%s max_products=%s",
        query,
        len(search_urls),
        max_count,
    )
    products: list[dict[str, str]] = []
    seen: set[str] = set()
    for search_url in search_urls:
        summaries = extract_product_summaries(
            search_url=search_url,
            max_products=max_count,
            request_timeout_seconds=request_timeout_seconds,
            use_apify=use_apify,
            postprocess=postprocess,
        )
        for item in summaries:
            item_link = str(item.get("item_link") or "").strip()
            if not item_link or item_link in seen:
                continue
            seen.add(item_link)
            products.append(item)
            if isinstance(max_count, int) and max_count > 0 and len(products) >= max_count:
                break
        if isinstance(max_count, int) and max_count > 0 and len(products) >= max_count:
            break
    logger.info(
        "zalando search completed query=%s crawled=%s items=%s search_urls=%s",
        query,
        bool(products),
        len(products),
        len(search_urls),
    )
    return {
        "search_urls": search_urls,
        "products": products,
        "count": len(products),
        "enrichment": enrichment_result,
    }
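

if __name__ == "__main__":
    # Minimal offline smoke test: exercises only the query-enrichment and
    # URL-building path (no completion_fn, so no model call; no scraping, so no
    # network). The sample query and wardrobe entry are assumed values for
    # illustration, not part of the module's API.
    urls, meta = build_zalando_search_urls_from_request(
        query="something nice",
        gender="men",
        wardrobe_items=[{"color": "navy", "type": "shirt"}],
        requested_category="bottomwear",
    )
    print("final_query:", meta["final_query"])
    print("search_urls:", urls)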