from __future__ import annotations import logging import json import os import re from typing import Any, Callable, Optional from urllib.parse import parse_qs, urlencode, urlparse import requests from bs4 import BeautifulSoup def _env_int(name: str, default: int) -> int: raw = os.getenv(name) if raw is None or str(raw).strip() == "": return default try: return int(str(raw).strip()) except (TypeError, ValueError): return default ZALANDO_BASE_URL = "https://www.zalando.co.uk" APIFY_ACTOR_ENDPOINT = os.getenv( "APIFY_ACTOR_ENDPOINT", "https://api.apify.com/v2/acts/vistics~zalando-scraper/run-sync-get-dataset-items", ) APIFY_TOKEN = os.getenv("APIFY_API_TOKEN", "").strip() APIFY_MAX_RESULTS = 20 APIFY_MIN_TIMEOUT_SECONDS = max(60, _env_int("APIFY_MIN_TIMEOUT_SECONDS", 180)) APIFY_WAIT_FOR_FINISH_SECONDS = max(60, _env_int("APIFY_WAIT_FOR_FINISH_SECONDS", 300)) HTML_FALLBACK_TIMEOUT_SECONDS = max(20, _env_int("ZALANDO_HTML_TIMEOUT_SECONDS", 45)) REQUEST_HEADERS = { "User-Agent": ( "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " "AppleWebKit/537.36 (KHTML, like Gecko) " "Chrome/124.0.0.0 Safari/537.36" ) } if not logging.getLogger().handlers: logging.basicConfig( level=os.getenv("LOG_LEVEL", "INFO").upper(), format="%(asctime)s %(levelname)s %(name)s: %(message)s", ) logger = logging.getLogger(__name__) logger.setLevel(getattr(logging, os.getenv("LOG_LEVEL", "INFO").upper(), logging.INFO)) CATEGORY_PATH_MAP = { "topwear": {"women": "womens-clothing", "men": "mens-clothing", "unisex": "clothing"}, "bottomwear": {"women": "womens-clothing", "men": "mens-clothing", "unisex": "clothing"}, "layers": {"women": "womens-clothing", "men": "mens-clothing", "unisex": "clothing"}, "dress": {"women": "womens-clothing-dresses", "men": "mens-clothing", "unisex": "clothing"}, "dresses": {"women": "womens-clothing-dresses", "men": "mens-clothing", "unisex": "clothing"}, "shoes": {"women": "womens-shoes", "men": "mens-shoes", "unisex": "shoes"}, "footwear": {"women": "womens-shoes", "men": "mens-shoes", "unisex": "shoes"}, "sportswear": {"women": "womens-sports", "men": "mens-sports", "unisex": "sports"}, } _COLOR_TERMS = [ "black", "white", "navy", "blue", "grey", "gray", "beige", "olive", "green", "brown", "khaki", "cream", "maroon", "charcoal", "tan", "red", "pink", "purple", "yellow", "orange", ] _COLOR_QUERY_KEYWORDS: dict[str, set[str]] = { "black": {"black"}, "white": {"white", "bright white", "off white", "off-white"}, "navy": {"navy", "dark blue", "dk blue", "dress blues", "moonlit ocean", "midnight blue"}, "blue": {"blue", "navy", "dark blue", "dk blue", "dress blues", "ice blue", "light blue", "skyway", "moonlit ocean"}, "grey": {"grey", "gray", "dark grey", "dark gray", "steel grey", "steel gray", "charcoal"}, "gray": {"grey", "gray", "dark grey", "dark gray", "steel grey", "steel gray", "charcoal"}, "beige": {"beige", "sand", "tan", "stone", "morel"}, "brown": {"brown", "tan", "morel"}, "olive": {"olive", "khaki"}, "green": {"green", "olive", "khaki"}, "red": {"red", "brick red", "winetasting", "wine"}, "maroon": {"maroon", "burgundy", "wine", "winetasting"}, } _CATEGORY_QUERY_KEYWORDS: dict[str, set[str]] = { "shirt": {"shirt", "formal shirt"}, "polo": {"polo"}, "jacket": {"jacket", "blazer", "coat"}, "trousers": {"trousers", "pants", "chinos"}, "pants": {"pants", "trousers", "chinos"}, "shorts": {"shorts"}, "jeans": {"jeans"}, } ScrapePostprocessFn = Callable[[list[dict[str, str]]], list[dict[str, str]]] WardrobeSummary = dict[str, Any] TextCompletionFn = Callable[[str, int], str] def _norm(value: Any) -> str: return str(value or "").strip().lower() def _query_from_search_url(search_url: str) -> str: parsed = urlparse(str(search_url or "")) values = parse_qs(parsed.query).get("q") or [] return str(values[0] if values else "").strip() def _query_color_keywords(query: str) -> set[str]: normalized = _norm(query) for color in _COLOR_TERMS: if color in normalized: return _COLOR_QUERY_KEYWORDS.get(color, {color}) return set() def _query_category_keywords(query: str) -> set[str]: normalized = _norm(query) for category, keywords in _CATEGORY_QUERY_KEYWORDS.items(): if category in normalized: return keywords return set() def _product_match_text(product: dict[str, str]) -> str: return _norm( " ".join( [ str(product.get("name") or ""), str(product.get("color") or ""), str(product.get("brand") or ""), str(product.get("item_link") or ""), ] ) ) def _filter_products_for_search_query(products: list[dict[str, str]], search_url: str) -> list[dict[str, str]]: query = _query_from_search_url(search_url) color_keywords = _query_color_keywords(query) category_keywords = _query_category_keywords(query) if not color_keywords and not category_keywords: return products filtered: list[dict[str, str]] = [] for product in products: text = _product_match_text(product) if color_keywords and not any(keyword in text for keyword in color_keywords): continue if category_keywords and not any(keyword in text for keyword in category_keywords): continue filtered.append(product) return filtered def _normalize_target_category(value: Any) -> str: normalized = _norm(value) if normalized in {"topwear", "top", "upper", "tops"}: return "topwear" if normalized in {"bottomwear", "bottom", "lower", "bottoms"}: return "bottomwear" return "both" def _extract_price_text(value: Any) -> str: text = str(value or "").strip() if not text: return "N/A" match = re.search(r"([\u00a3$€]\s?\d+[\d,]*(?:\.\d{2})?)", text) if match: return match.group(1).replace(" ", "") return text def _extract_src_from_srcset(srcset: str) -> str: if not srcset: return "" first = srcset.split(",")[0].strip() return first.split(" ")[0].strip() def _ensure_zalando_url(value: str) -> str: href = str(value or "").strip() if not href: return "" if href.startswith("//"): return f"https:{href}" if href.startswith("/"): return f"{ZALANDO_BASE_URL}{href}" return href def _format_apify_money(raw_value: Any, currency_symbol: str) -> str: text = str(raw_value or "").strip() if not text: return "" normalized = text.replace(",", "") # Apify commonly returns minor units like 5999 => 59.99 if re.fullmatch(r"\d+", normalized): major = int(normalized) // 100 minor = int(normalized) % 100 return f"{currency_symbol}{major}.{minor:02d}" if currency_symbol else f"{major}.{minor:02d}" match = re.search(r"\d+(?:\.\d{1,2})?", normalized) if not match: return "" return f"{currency_symbol}{match.group(0)}" if currency_symbol else match.group(0) def summarize_wardrobe_metadata(wardrobe_items: list[dict[str, Any]]) -> WardrobeSummary: items = [item for item in wardrobe_items if isinstance(item, dict)] colors: dict[str, int] = {} types: dict[str, int] = {} categories: dict[str, int] = {} fabrics: dict[str, int] = {} fits: dict[str, int] = {} occasions: dict[str, int] = {} for item in items: description = item.get("description") if isinstance(item.get("description"), dict) else {} color = str(item.get("color") or description.get("color") or "").strip().lower() garment_type = str(item.get("type") or description.get("type") or "").strip().lower() category = str(item.get("category") or description.get("category") or "").strip().lower() fabric = str(item.get("fabric") or description.get("fabric") or "").strip().lower() fit = str(item.get("fit") or description.get("fit") or "").strip().lower() occasion = str(item.get("occasion") or description.get("occasion") or description.get("style") or "").strip().lower() if color: colors[color] = colors.get(color, 0) + 1 if garment_type: types[garment_type] = types.get(garment_type, 0) + 1 if category: categories[category] = categories.get(category, 0) + 1 if fabric: fabrics[fabric] = fabrics.get(fabric, 0) + 1 if fit: fits[fit] = fits.get(fit, 0) + 1 if occasion: occasions[occasion] = occasions.get(occasion, 0) + 1 def top_values(counter: dict[str, int], limit: int = 8) -> list[dict[str, Any]]: return [ {"value": key, "count": count} for key, count in sorted(counter.items(), key=lambda pair: pair[1], reverse=True)[:limit] ] return { "total_items": len(items), "colors": top_values(colors), "types": top_values(types), "categories": top_values(categories), "fabrics": top_values(fabrics), "fits": top_values(fits), "occasions": top_values(occasions), } def _count_query_signals(query: str, requested_category: str | None = None) -> dict[str, bool]: normalized = _norm(query) has_color = any(color in normalized for color in _COLOR_TERMS) requested = _norm(requested_category) has_type = bool(requested and requested not in {"both", "all"}) or any( token in normalized for token in [ "trouser", "trousers", "pants", "jeans", "shorts", "joggers", "skirt", "dress", "topwear", "bottomwear", "shirt", "tee", "blouse", "polo", "hoodie", "jacket", "sweater", "blazer", "t-shirt", "tank", "leggings", ] ) has_style = any(token in normalized for token in [ "slim", "regular", "relaxed", "oversized", "tailored", "smart", "casual", "formal", "party", "work", "interview", "weekend", "minimal", "structured", "clean", ]) has_fit = any(token in normalized for token in ["slim-fit", "slim fit", "regular-fit", "regular fit", "relaxed-fit", "relaxed fit"]) return { "has_color": has_color, "has_type": has_type, "has_style": has_style or has_fit, } def is_underspecified_query(query: str, requested_category: str | None = None) -> bool: signals = _count_query_signals(query, requested_category=requested_category) explicit_signal_count = sum(1 for value in signals.values() if value) vague_tokens = { "some", "something", "stuff", "nice", "good", "recommend", "suggest", "maybe", "outfit", "look", } normalized = _norm(query) has_vague_language = any(token in normalized for token in vague_tokens) return explicit_signal_count < 3 or has_vague_language def _build_enrichment_prompt( query: str, wardrobe_summary: WardrobeSummary, requested_category: str | None, gender: str | None, ) -> str: return ( "You are helping enrich an underspecified Zalando shopping request. " "Return ONLY valid JSON and no prose.\n\n" "Output schema:\n" '{"suggested_types":[],"suggested_colours":[],"occasion":"","style_notes":""}\n\n' f"User query: {query}\n" f"Requested category: {requested_category or ''}\n" f"Gender: {gender or ''}\n" f"Wardrobe metadata summary: {json.dumps(wardrobe_summary, ensure_ascii=True)}\n\n" "Rules:\n" "- Keep suggested_types to product/search terms that fit the requested category.\n" "- Keep suggested_colours complementary to the wardrobe summary.\n" "- Occasion must be a single short lowercase label when possible.\n" "- style_notes must be concise and search-friendly.\n" ) def _parse_json_object(text: str) -> dict[str, Any]: raw = str(text or "").strip() if not raw: return {} try: parsed = json.loads(raw) return parsed if isinstance(parsed, dict) else {} except json.JSONDecodeError: start = raw.find("{") end = raw.rfind("}") if start == -1 or end == -1 or end <= start: return {} try: parsed = json.loads(raw[start : end + 1]) return parsed if isinstance(parsed, dict) else {} except json.JSONDecodeError: return {} def _normalize_enrichment_payload(payload: dict[str, Any], requested_category: str | None) -> dict[str, Any]: def to_list(value: Any) -> list[str]: if not isinstance(value, list): return [] cleaned: list[str] = [] for entry in value: text = str(entry or "").strip() if text and text not in cleaned: cleaned.append(text) return cleaned suggested_types = to_list(payload.get("suggested_types")) suggested_colours = to_list(payload.get("suggested_colours") or payload.get("suggested_colors")) occasion = str(payload.get("occasion") or "").strip().lower() style_notes = str(payload.get("style_notes") or "").strip() requested = _norm(requested_category) if requested and requested not in {"both", "all"} and requested not in {"topwear", "bottomwear"}: requested = "bottomwear" if any(token in requested for token in ["bottom", "trouser", "pant", "jean", "skirt", "short"]) else "topwear" if requested in {"topwear", "bottomwear"} and not suggested_types: suggested_types = [requested] if not suggested_colours: suggested_colours = ["black"] return { "suggested_types": suggested_types, "suggested_colours": suggested_colours, "occasion": occasion, "style_notes": style_notes, } def enrich_underspecified_query( query: str, wardrobe_items: list[dict[str, Any]] | None = None, requested_category: str | None = None, gender: str | None = None, completion_fn: TextCompletionFn | None = None, max_tokens: int = 500, ) -> dict[str, Any]: wardrobe_summary = summarize_wardrobe_metadata(wardrobe_items or []) if not is_underspecified_query(query, requested_category=requested_category): return { "used": False, "query": str(query or "").strip(), "wardrobe_summary": wardrobe_summary, "enrichment": { "suggested_types": [], "suggested_colours": [], "occasion": "", "style_notes": "", }, } if not completion_fn: return { "used": True, "query": str(query or "").strip(), "wardrobe_summary": wardrobe_summary, "enrichment": { "suggested_types": [], "suggested_colours": [], "occasion": "", "style_notes": "", }, } prompt = _build_enrichment_prompt(query, wardrobe_summary, requested_category, gender) model_text = completion_fn(prompt, max_tokens) parsed = _parse_json_object(model_text) enrichment = _normalize_enrichment_payload(parsed, requested_category=requested_category) return { "used": True, "query": str(query or "").strip(), "wardrobe_summary": wardrobe_summary, "enrichment": enrichment, } def compose_search_query_from_enrichment( query: str, enrichment: dict[str, Any] | None, gender: str | None = None, requested_category: str | None = None, ) -> str: base_query = str(query or "").strip() enrichment = enrichment or {} target_category = _normalize_target_category(requested_category) suggested_types = [str(value).strip() for value in (enrichment.get("suggested_types") or []) if str(value).strip()] suggested_colours = [str(value).strip() for value in (enrichment.get("suggested_colours") or []) if str(value).strip()] style_notes = str(enrichment.get("style_notes") or "").strip() occasion = str(enrichment.get("occasion") or "").strip() tokens: list[str] = [] if base_query: tokens.extend([piece for piece in re.split(r"\s+", base_query) if piece]) elif gender: tokens.append(_normalize_gender(gender, base_query)) def append_unique(token: str) -> None: cleaned = str(token or "").strip() if cleaned and cleaned not in tokens: tokens.append(cleaned) if gender: append_unique(_normalize_gender(gender, base_query)) if suggested_colours: append_unique(suggested_colours[0]) if suggested_types: append_unique(suggested_types[0]) elif requested_category: requested = _norm(requested_category) if requested in {"topwear", "bottomwear"}: append_unique(requested) elif any(token in requested for token in ["bottom", "trouser", "pant", "jean", "skirt", "short"]): append_unique("bottomwear") elif any(token in requested for token in ["top", "shirt", "tee", "blouse", "polo", "jacket"]): append_unique("topwear") if occasion: append_unique(occasion) if style_notes: style_tokens = [piece for piece in re.split(r"[^a-zA-Z0-9-]+", style_notes.lower()) if piece] for token in style_tokens[:3]: append_unique(token) if not tokens: tokens = [base_query or _normalize_gender(gender, base_query)] topwear_terms = {"shirt", "shirts", "tee", "t-shirt", "tshirt", "topwear", "blazer", "jacket", "polo", "hoodie", "kurta"} bottomwear_terms = {"trouser", "trousers", "pants", "jeans", "shorts", "joggers", "bottomwear"} normalized_tokens = [str(token).strip().lower() for token in tokens] has_topwear_term = any(token in topwear_terms for token in normalized_tokens) has_bottomwear_term = any(token in bottomwear_terms for token in normalized_tokens) if target_category == "bottomwear" and has_topwear_term and not has_bottomwear_term: replacement = "trousers" for index, token in enumerate(normalized_tokens): if token in topwear_terms: tokens[index] = replacement normalized_tokens[index] = replacement break else: append_unique(replacement) elif target_category == "topwear" and has_bottomwear_term and not has_topwear_term: replacement = "shirt" for index, token in enumerate(normalized_tokens): if token in bottomwear_terms: tokens[index] = replacement normalized_tokens[index] = replacement break else: append_unique(replacement) return " ".join(part for part in tokens if part).strip() def _normalize_gender(gender: str | None, query: str) -> str: g = _norm(gender) if g in {"men", "male", "man", "mens"}: return "men" if g in {"women", "female", "woman", "womens"}: return "women" if g == "unisex": return "unisex" query_hint = _norm(query) if any(token in query_hint for token in [" men ", "male", "man", "mens"]): return "men" if any(token in query_hint for token in [" women ", "female", "woman", "womens"]): return "women" return "unisex" def _pick_category_path(query: str, audience: str) -> str: haystack = _norm(query) selected = "" for token, path_map in CATEGORY_PATH_MAP.items(): if token in haystack: selected = path_map.get(audience) or path_map.get("unisex") or "" break if not selected: if audience == "men": selected = "mens-clothing" elif audience == "women": selected = "womens-clothing" else: selected = "clothing" if audience == "men" and selected.startswith("womens-"): selected = selected.replace("womens-", "mens-", 1) if audience == "women" and selected.startswith("mens-"): selected = selected.replace("mens-", "womens-", 1) if audience == "unisex" and selected.startswith(("mens-", "womens-")): selected = selected.split("-", 1)[1] return selected or "clothing" def build_zalando_search_url(query: str, gender: str | None = None) -> str: normalized_query = str(query or "").strip() if not normalized_query: raise ValueError("query is required") audience = _normalize_gender(gender, normalized_query) path = _pick_category_path(normalized_query, audience) params = urlencode({"q": normalized_query}) return f"{ZALANDO_BASE_URL}/{path}?{params}" def build_zalando_search_urls_from_query(query: str, gender: str | None = None) -> list[str]: normalized_query = str(query or "").strip() if not normalized_query: return [] if gender: return [build_zalando_search_url(normalized_query, gender=gender)] urls: list[str] = [] for audience in ["women", "men", "unisex"]: url = build_zalando_search_url(normalized_query, gender=audience) if url not in urls: urls.append(url) return urls def build_zalando_search_urls_from_request( query: str, gender: str | None = None, wardrobe_items: list[dict[str, Any]] | None = None, requested_category: str | None = None, completion_fn: TextCompletionFn | None = None, max_tokens: int = 500, ) -> tuple[list[str], dict[str, Any]]: enrichment_result = enrich_underspecified_query( query=query, wardrobe_items=wardrobe_items, requested_category=requested_category, gender=gender, completion_fn=completion_fn, max_tokens=max_tokens, ) final_query = compose_search_query_from_enrichment( query=enrichment_result.get("query") or query, enrichment=enrichment_result.get("enrichment") if isinstance(enrichment_result.get("enrichment"), dict) else None, gender=gender, requested_category=requested_category, ) search_urls = build_zalando_search_urls_from_query(final_query, gender=gender) return search_urls, {**enrichment_result, "final_query": final_query} def _apify_request_url() -> str: if APIFY_TOKEN: return f"{APIFY_ACTOR_ENDPOINT}?token={APIFY_TOKEN}" return APIFY_ACTOR_ENDPOINT def _apify_actor_id_from_endpoint(endpoint: str) -> str: parsed = urlparse(str(endpoint or "").strip()) segments = [segment for segment in parsed.path.split("/") if segment] if "acts" in segments: index = segments.index("acts") if index + 1 < len(segments): return segments[index + 1] return "vistics~zalando-scraper" def _build_apify_payload(search_url: str, max_results: int) -> dict[str, Any]: return { "startUrls": [str(search_url or "").strip()], "maxResults": int(max_results), } def _http_error_detail(exc: requests.RequestException, limit: int = 800) -> str: response = getattr(exc, "response", None) if response is None: return "" status = getattr(response, "status_code", None) body = "" try: body = str(response.text or "").strip().replace("\n", " ") except Exception: body = "" if body: body = body[:limit] if status is None and not body: return "" return f"status={status} body={body}".strip() def _extract_apify_items(raw_payload: Any) -> list[dict[str, Any]]: if isinstance(raw_payload, list): return [item for item in raw_payload if isinstance(item, dict)] if isinstance(raw_payload, dict): for key in ("items", "data"): value = raw_payload.get(key) if isinstance(value, list): return [item for item in value if isinstance(item, dict)] return [] def _normalize_apify_items(raw_items: list[dict[str, Any]], effective_limit: int) -> list[dict[str, str]]: items: list[dict[str, str]] = [] seen: set[str] = set() for raw in raw_items: normalized = _normalize_product(raw) if not normalized["item_link"] or normalized["item_link"] in seen: continue seen.add(normalized["item_link"]) items.append(normalized) if len(items) >= effective_limit: break return items def _scrape_with_apify_run_dataset_fallback( search_url: str, effective_limit: int, timeout_seconds: int, ) -> list[dict[str, str]]: actor_id = _apify_actor_id_from_endpoint(APIFY_ACTOR_ENDPOINT) run_url = f"https://api.apify.com/v2/acts/{actor_id}/runs" wait_for_finish = min(max(60, APIFY_WAIT_FOR_FINISH_SECONDS), 300) variant_errors: list[str] = [] logger.info( "zalando crawl retry source=apify-run search_url=%s actor_id=%s wait_for_finish=%s", search_url, actor_id, wait_for_finish, ) variants = ["string"] for variant_name in variants: run_payload = _build_apify_payload(search_url, effective_limit) run_id = "" run_status = "" dataset_id = "" try: run_response = requests.post( run_url, params={"token": APIFY_TOKEN, "waitForFinish": wait_for_finish}, json=run_payload, timeout=timeout_seconds, ) run_response.raise_for_status() run_json = run_response.json() run_data = run_json.get("data") if isinstance(run_json, dict) else None if not isinstance(run_data, dict): variant_errors.append(f"{variant_name}: invalid run payload") continue run_id = str(run_data.get("id") or "").strip() run_status = str(run_data.get("status") or "").strip() dataset_id = str(run_data.get("defaultDatasetId") or "").strip() logger.info( "zalando crawl retry source=apify-run completed variant=%s run_id=%s status=%s dataset_id=%s", variant_name, run_id, run_status, dataset_id, ) except requests.RequestException as exc: detail = _http_error_detail(exc) variant_errors.append(f"{variant_name}: {exc} {detail}".strip()) logger.warning( "zalando crawl failed source=apify-run variant=%s search_url=%s error=%s detail=%s", variant_name, search_url, exc, detail, ) continue if not dataset_id: variant_errors.append(f"{variant_name}: missing defaultDatasetId") continue try: dataset_response = requests.get( f"https://api.apify.com/v2/datasets/{dataset_id}/items", params={ "token": APIFY_TOKEN, "clean": "true", "format": "json", "limit": effective_limit, }, timeout=timeout_seconds, ) dataset_response.raise_for_status() dataset_items = _extract_apify_items(dataset_response.json()) items = _normalize_apify_items(dataset_items, effective_limit) logger.info( "zalando crawl retry source=apify-dataset variant=%s run_id=%s dataset_id=%s raw_items=%s items=%s", variant_name, run_id, dataset_id, len(dataset_items), len(items), ) if items: return items variant_errors.append(f"{variant_name}: empty dataset") except requests.RequestException as exc: detail = _http_error_detail(exc) variant_errors.append(f"{variant_name}: {exc} {detail}".strip()) logger.warning( "zalando crawl failed source=apify-dataset variant=%s run_id=%s dataset_id=%s error=%s detail=%s", variant_name, run_id, dataset_id, exc, detail, ) if variant_errors: logger.warning( "zalando crawl retry source=apify-run exhausted search_url=%s errors=%s", search_url, "; ".join(variant_errors), ) return [] def _normalize_product(item: dict[str, Any]) -> dict[str, str]: name = str( item.get("name") or item.get("title") or item.get("productName") or item.get("product_name") or "N/A" ).strip() fallback_price = _extract_price_text( item.get("price") or item.get("currentPrice") or item.get("displayPrice") or item.get("priceLabel") or "N/A" ) currency_symbol = str(item.get("currencySymbol") or "").strip() promotional_price = _format_apify_money(item.get("promotionalPrice"), currency_symbol) original_price = _format_apify_money(item.get("originalPrice"), currency_symbol) discount_percent = str(item.get("discountPercent") or "").strip() brand = str(item.get("brand") or item.get("brandName") or "").strip() if promotional_price: price = promotional_price if not discount_percent else f"{promotional_price} ({discount_percent})" elif original_price: price = original_price else: price = fallback_price image_url = _ensure_zalando_url( str( item.get("image") or item.get("imageUrl") or item.get("image_url") or item.get("thumbnail") or "" ) ) url_value = _ensure_zalando_url( str( item.get("url") or item.get("productUrl") or item.get("item_link") or item.get("link") or "" ) ) color = str(item.get("color") or item.get("colorName") or item.get("colour") or "").strip() if not color and " - " in name: color = name.rsplit(" - ", 1)[-1].strip() return { "name": name or "N/A", "price": price or "N/A", "brand": brand, "color": color, "currency_symbol": currency_symbol, "promotional_price": promotional_price, "original_price": original_price, "discount_percent": discount_percent, "image_url": image_url, "item_link": url_value, } def _scrape_with_apify(search_url: str, max_products: int | None, timeout_seconds: int) -> list[dict[str, str]]: requested_limit = int(max_products) if isinstance(max_products, int) and max_products > 0 else APIFY_MAX_RESULTS effective_limit = min(requested_limit, APIFY_MAX_RESULTS) apify_timeout = max(int(timeout_seconds), APIFY_MIN_TIMEOUT_SECONDS) actor_id = _apify_actor_id_from_endpoint(APIFY_ACTOR_ENDPOINT) logger.info( "zalando crawl start source=apify search_url=%s requested_max=%s effective_max=%s timeout=%s actor_id=%s", search_url, max_products, effective_limit, apify_timeout, actor_id, ) variants = ["string"] variant_errors: list[str] = [] for variant_name in variants: try: payload = _build_apify_payload(search_url, effective_limit) response = requests.post(_apify_request_url(), json=payload, timeout=apify_timeout) response.raise_for_status() raw_items = _extract_apify_items(response.json()) items = _normalize_apify_items(raw_items, effective_limit) logger.info( "zalando crawl end source=apify variant=%s search_url=%s crawled=%s raw_items=%s items=%s", variant_name, search_url, bool(items), len(raw_items), len(items), ) if items: return items variant_errors.append(f"{variant_name}: empty result") except requests.RequestException as exc: detail = _http_error_detail(exc) variant_errors.append(f"{variant_name}: {exc} {detail}".strip()) logger.warning( "zalando crawl failed source=apify variant=%s search_url=%s error=%s detail=%s", variant_name, search_url, exc, detail, ) continue try: fallback_items = _scrape_with_apify_run_dataset_fallback( search_url=search_url, effective_limit=effective_limit, timeout_seconds=apify_timeout, ) logger.info( "zalando crawl end source=apify-run search_url=%s crawled=%s items=%s", search_url, bool(fallback_items), len(fallback_items), ) if fallback_items: return fallback_items except requests.RequestException as exc: detail = _http_error_detail(exc) variant_errors.append(f"run_dataset: {exc} {detail}".strip()) logger.warning("zalando crawl failed source=apify-run search_url=%s error=%s detail=%s", search_url, exc, detail) if variant_errors: logger.warning( "zalando crawl source=apify exhausted search_url=%s errors=%s", search_url, "; ".join(variant_errors), ) logger.warning( "zalando crawl end source=apify search_url=%s crawled=False items=0 reason=no_items_from_sync_or_run_dataset", search_url, ) return [] def _scrape_with_html(search_url: str, max_products: int | None, timeout_seconds: int) -> list[dict[str, str]]: html_timeout = max(int(timeout_seconds), HTML_FALLBACK_TIMEOUT_SECONDS) logger.info("zalando crawl start source=html search_url=%s max_products=%s timeout=%s", search_url, max_products, html_timeout) response = requests.get(search_url, headers=REQUEST_HEADERS, timeout=html_timeout) response.raise_for_status() soup = BeautifulSoup(response.content, "lxml") items: list[dict[str, str]] = [] seen: set[str] = set() cards = soup.select('article, div[data-testid*="product"], li[data-testid*="product"]') for card in cards: link_tag = card.select_one('a[href*="/p/"]') or card.find("a", href=True) if not link_tag: continue item_link = _ensure_zalando_url(str(link_tag.get("href") or "")) if not item_link or item_link in seen or "zalando" not in item_link: continue name_tag = ( card.select_one('[data-testid*="product-name"]') or card.select_one('[data-testid*="name"]') or card.find("h3") or card.find("h2") or link_tag ) name = str(name_tag.get_text(" ", strip=True) if name_tag else "N/A").strip() or "N/A" price_tag = ( card.select_one('[data-testid*="price"]') or card.find(attrs={"class": re.compile(r"price|money|amount", re.I)}) ) price_text = str(price_tag.get_text(" ", strip=True) if price_tag else "") price = _extract_price_text(price_text) img_tag = card.find("img") image_url = "" if img_tag: image_url = _ensure_zalando_url( str( img_tag.get("src") or img_tag.get("data-src") or _extract_src_from_srcset(str(img_tag.get("srcset") or "")) ) ) seen.add(item_link) items.append( { "name": name, "price": price, "image_url": image_url, "item_link": item_link, } ) if isinstance(max_products, int) and max_products > 0 and len(items) >= max_products: break logger.info("zalando crawl end source=html search_url=%s crawled=%s items=%s", search_url, bool(items), len(items)) return items def _requires_postprocess(items: list[dict[str, str]]) -> bool: if not items: return False missing = 0 for item in items: if item.get("name") in {"", "N/A"} or item.get("price") in {"", "N/A"}: missing += 1 return missing > 0 def extract_product_summaries( search_url: str, max_products: int | None = None, request_timeout_seconds: int = 35, use_apify: bool = True, postprocess: Optional[ScrapePostprocessFn] = None, ) -> list[dict[str, str]]: if not str(search_url or "").strip(): raise ValueError("search_url is required") max_count = int(max_products) if isinstance(max_products, int) and max_products > 0 else None logger.info( "zalando crawl requested search_url=%s max_products=%s capped_to=%s use_apify=%s actor_id=%s", search_url, max_products, max_count, bool(use_apify and APIFY_TOKEN), _apify_actor_id_from_endpoint(APIFY_ACTOR_ENDPOINT), ) products: list[dict[str, str]] = [] errors: list[str] = [] if use_apify and APIFY_TOKEN: try: products = _scrape_with_apify(search_url, max_count, request_timeout_seconds) if not products: errors.append("apify: empty result set") logger.warning("zalando crawl source=apify returned zero items search_url=%s", search_url) except requests.RequestException as exc: errors.append(f"apify: {exc}") logger.warning("zalando crawl failed source=apify search_url=%s error=%s", search_url, exc) if not products: try: if use_apify and APIFY_TOKEN: logger.info("zalando crawl fallback source=html search_url=%s", search_url) products = _scrape_with_html(search_url, max_count, request_timeout_seconds) except requests.RequestException as exc: errors.append(f"html: {exc}") logger.warning("zalando crawl failed source=html search_url=%s error=%s", search_url, exc) if postprocess and _requires_postprocess(products): try: products = postprocess(products) except Exception: # Never fail scraping because post-processing failed. pass products = _filter_products_for_search_query(products, search_url) if not products and errors: logger.warning("zalando crawl completed with no results search_url=%s errors=%s", search_url, "; ".join(errors)) raise requests.RequestException("; ".join(errors)) logger.info("zalando crawl completed search_url=%s crawled=%s items=%s", search_url, bool(products), len(products)) if isinstance(max_count, int) and max_count > 0: return products[:max_count] return products def search_products( query: str, gender: str | None = None, max_products: int | None = None, use_apify: bool = True, request_timeout_seconds: int = 35, postprocess: Optional[ScrapePostprocessFn] = None, wardrobe_items: list[dict[str, Any]] | None = None, requested_category: str | None = None, completion_fn: TextCompletionFn | None = None, enrichment_max_tokens: int = 500, ) -> dict[str, Any]: max_count = int(max_products) if isinstance(max_products, int) and max_products > 0 else None search_urls, enrichment_result = build_zalando_search_urls_from_request( query=query, gender=gender, wardrobe_items=wardrobe_items, requested_category=requested_category, completion_fn=completion_fn, max_tokens=enrichment_max_tokens, ) if not search_urls: raise ValueError("query is required") logger.info( "zalando search plan query=%s search_urls=%s max_products=%s", query, len(search_urls), max_count, ) products: list[dict[str, str]] = [] seen: set[str] = set() for search_url in search_urls: summaries = extract_product_summaries( search_url=search_url, max_products=max_count, request_timeout_seconds=request_timeout_seconds, use_apify=use_apify, postprocess=postprocess, ) for item in summaries: item_link = str(item.get("item_link") or "").strip() if not item_link or item_link in seen: continue seen.add(item_link) products.append(item) if isinstance(max_count, int) and max_count > 0 and len(products) >= max_count: break if isinstance(max_count, int) and max_count > 0 and len(products) >= max_count: break logger.info( "zalando search completed query=%s crawled=%s items=%s search_urls=%s", query, bool(products), len(products), len(search_urls), ) return { "search_urls": search_urls, "products": products, "count": len(products), "enrichment": enrichment_result, }