# StyleWellBackend: zalando_scraper.py
from __future__ import annotations
import logging
import json
import os
import re
from typing import Any, Callable, Optional
from urllib.parse import parse_qs, urlencode, urlparse
import requests
from bs4 import BeautifulSoup
def _env_int(name: str, default: int) -> int:
    """Read an integer environment variable, falling back to `default` when unset or invalid."""
    raw = os.getenv(name)
    if raw is None or str(raw).strip() == "":
        return default
    try:
        return int(str(raw).strip())
    except (TypeError, ValueError):
        return default
ZALANDO_BASE_URL = "https://www.zalando.co.uk"
APIFY_ACTOR_ENDPOINT = os.getenv(
    "APIFY_ACTOR_ENDPOINT",
    "https://api.apify.com/v2/acts/vistics~zalando-scraper/run-sync-get-dataset-items",
)
APIFY_TOKEN = os.getenv("APIFY_API_TOKEN", "").strip()
APIFY_MAX_RESULTS = 20
APIFY_MIN_TIMEOUT_SECONDS = max(60, _env_int("APIFY_MIN_TIMEOUT_SECONDS", 180))
APIFY_WAIT_FOR_FINISH_SECONDS = max(60, _env_int("APIFY_WAIT_FOR_FINISH_SECONDS", 300))
HTML_FALLBACK_TIMEOUT_SECONDS = max(20, _env_int("ZALANDO_HTML_TIMEOUT_SECONDS", 45))
REQUEST_HEADERS = {
    "User-Agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/124.0.0.0 Safari/537.36"
    )
}
if not logging.getLogger().handlers:
    logging.basicConfig(
        level=os.getenv("LOG_LEVEL", "INFO").upper(),
        format="%(asctime)s %(levelname)s %(name)s: %(message)s",
    )
logger = logging.getLogger(__name__)
logger.setLevel(getattr(logging, os.getenv("LOG_LEVEL", "INFO").upper(), logging.INFO))
CATEGORY_PATH_MAP = {
    "topwear": {"women": "womens-clothing", "men": "mens-clothing", "unisex": "clothing"},
    "bottomwear": {"women": "womens-clothing", "men": "mens-clothing", "unisex": "clothing"},
    "layers": {"women": "womens-clothing", "men": "mens-clothing", "unisex": "clothing"},
    "dress": {"women": "womens-clothing-dresses", "men": "mens-clothing", "unisex": "clothing"},
    "dresses": {"women": "womens-clothing-dresses", "men": "mens-clothing", "unisex": "clothing"},
    "shoes": {"women": "womens-shoes", "men": "mens-shoes", "unisex": "shoes"},
    "footwear": {"women": "womens-shoes", "men": "mens-shoes", "unisex": "shoes"},
    "sportswear": {"women": "womens-sports", "men": "mens-sports", "unisex": "sports"},
}
_COLOR_TERMS = [
    "black", "white", "navy", "blue", "grey", "gray", "beige", "olive", "green", "brown",
    "khaki", "cream", "maroon", "charcoal", "tan", "red", "pink", "purple", "yellow", "orange",
]
_COLOR_QUERY_KEYWORDS: dict[str, set[str]] = {
    "black": {"black"},
    "white": {"white", "bright white", "off white", "off-white"},
    "navy": {"navy", "dark blue", "dk blue", "dress blues", "moonlit ocean", "midnight blue"},
    "blue": {"blue", "navy", "dark blue", "dk blue", "dress blues", "ice blue", "light blue", "skyway", "moonlit ocean"},
    "grey": {"grey", "gray", "dark grey", "dark gray", "steel grey", "steel gray", "charcoal"},
    "gray": {"grey", "gray", "dark grey", "dark gray", "steel grey", "steel gray", "charcoal"},
    "beige": {"beige", "sand", "tan", "stone", "morel"},
    "brown": {"brown", "tan", "morel"},
    "olive": {"olive", "khaki"},
    "green": {"green", "olive", "khaki"},
    "red": {"red", "brick red", "winetasting", "wine"},
    "maroon": {"maroon", "burgundy", "wine", "winetasting"},
}
_CATEGORY_QUERY_KEYWORDS: dict[str, set[str]] = {
    "shirt": {"shirt", "formal shirt"},
    "polo": {"polo"},
    "jacket": {"jacket", "blazer", "coat"},
    "trousers": {"trousers", "pants", "chinos"},
    "pants": {"pants", "trousers", "chinos"},
    "shorts": {"shorts"},
    "jeans": {"jeans"},
}
ScrapePostprocessFn = Callable[[list[dict[str, str]]], list[dict[str, str]]]
WardrobeSummary = dict[str, Any]
TextCompletionFn = Callable[[str, int], str]
def _norm(value: Any) -> str:
    return str(value or "").strip().lower()
def _query_from_search_url(search_url: str) -> str:
    parsed = urlparse(str(search_url or ""))
    values = parse_qs(parsed.query).get("q") or []
    return str(values[0] if values else "").strip()
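# Illustrative round-trip (values here are examples, not fixtures):
#   _query_from_search_url("https://www.zalando.co.uk/mens-clothing?q=black+jeans")
#   -> "black jeans"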
def _query_color_keywords(query: str) -> set[str]:
    normalized = _norm(query)
    for color in _COLOR_TERMS:
        if color in normalized:
            return _COLOR_QUERY_KEYWORDS.get(color, {color})
    return set()
def _query_category_keywords(query: str) -> set[str]:
    normalized = _norm(query)
    for category, keywords in _CATEGORY_QUERY_KEYWORDS.items():
        if category in normalized:
            return keywords
    return set()
def _product_match_text(product: dict[str, str]) -> str:
    return _norm(
        " ".join(
            [
                str(product.get("name") or ""),
                str(product.get("color") or ""),
                str(product.get("brand") or ""),
                str(product.get("item_link") or ""),
            ]
        )
    )
def _filter_products_for_search_query(products: list[dict[str, str]], search_url: str) -> list[dict[str, str]]:
    query = _query_from_search_url(search_url)
    color_keywords = _query_color_keywords(query)
    category_keywords = _query_category_keywords(query)
    if not color_keywords and not category_keywords:
        return products
    filtered: list[dict[str, str]] = []
    for product in products:
        text = _product_match_text(product)
        if color_keywords and not any(keyword in text for keyword in color_keywords):
            continue
        if category_keywords and not any(keyword in text for keyword in category_keywords):
            continue
        filtered.append(product)
    return filtered
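# Sketch of the keyword filter, using a hypothetical product payload:
#   products = [
#       {"name": "Slim Fit Shirt - navy", "color": "", "brand": "", "item_link": "https://www.zalando.co.uk/p/1"},
#       {"name": "Chino Trousers - beige", "color": "", "brand": "", "item_link": "https://www.zalando.co.uk/p/2"},
#   ]
#   url = "https://www.zalando.co.uk/mens-clothing?q=navy+shirt"
#   _filter_products_for_search_query(products, url)  # keeps only the navy shirt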
def _normalize_target_category(value: Any) -> str:
    normalized = _norm(value)
    if normalized in {"topwear", "top", "upper", "tops"}:
        return "topwear"
    if normalized in {"bottomwear", "bottom", "lower", "bottoms"}:
        return "bottomwear"
    return "both"
def _extract_price_text(value: Any) -> str:
    text = str(value or "").strip()
    if not text:
        return "N/A"
    match = re.search(r"([\u00a3$€]\s?\d+[\d,]*(?:\.\d{2})?)", text)
    if match:
        return match.group(1).replace(" ", "")
    return text
def _extract_src_from_srcset(srcset: str) -> str:
    if not srcset:
        return ""
    first = srcset.split(",")[0].strip()
    return first.split(" ")[0].strip()
def _ensure_zalando_url(value: str) -> str:
    href = str(value or "").strip()
    if not href:
        return ""
    if href.startswith("//"):
        return f"https:{href}"
    if href.startswith("/"):
        return f"{ZALANDO_BASE_URL}{href}"
    return href
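# Example normalizations (illustrative inputs):
#   _ensure_zalando_url("//img.example.net/a.jpg") -> "https://img.example.net/a.jpg"
#   _ensure_zalando_url("/p/slim-fit-shirt")       -> "https://www.zalando.co.uk/p/slim-fit-shirt"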
def _format_apify_money(raw_value: Any, currency_symbol: str) -> str:
    text = str(raw_value or "").strip()
    if not text:
        return ""
    normalized = text.replace(",", "")
    # Apify commonly returns minor units like 5999 => 59.99
    if re.fullmatch(r"\d+", normalized):
        major = int(normalized) // 100
        minor = int(normalized) % 100
        return f"{currency_symbol}{major}.{minor:02d}" if currency_symbol else f"{major}.{minor:02d}"
    match = re.search(r"\d+(?:\.\d{1,2})?", normalized)
    if not match:
        return ""
    return f"{currency_symbol}{match.group(0)}" if currency_symbol else match.group(0)
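# The minor-unit heuristic in action (illustrative values):
#   _format_apify_money("5999", "£")   -> "£59.99"
#   _format_apify_money("59.99", "£")  -> "£59.99"
#   _format_apify_money("", "£")       -> ""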
def summarize_wardrobe_metadata(wardrobe_items: list[dict[str, Any]]) -> WardrobeSummary:
    items = [item for item in wardrobe_items if isinstance(item, dict)]
    colors: dict[str, int] = {}
    types: dict[str, int] = {}
    categories: dict[str, int] = {}
    fabrics: dict[str, int] = {}
    fits: dict[str, int] = {}
    occasions: dict[str, int] = {}
    for item in items:
        description = item.get("description") if isinstance(item.get("description"), dict) else {}
        color = str(item.get("color") or description.get("color") or "").strip().lower()
        garment_type = str(item.get("type") or description.get("type") or "").strip().lower()
        category = str(item.get("category") or description.get("category") or "").strip().lower()
        fabric = str(item.get("fabric") or description.get("fabric") or "").strip().lower()
        fit = str(item.get("fit") or description.get("fit") or "").strip().lower()
        occasion = str(item.get("occasion") or description.get("occasion") or description.get("style") or "").strip().lower()
        if color:
            colors[color] = colors.get(color, 0) + 1
        if garment_type:
            types[garment_type] = types.get(garment_type, 0) + 1
        if category:
            categories[category] = categories.get(category, 0) + 1
        if fabric:
            fabrics[fabric] = fabrics.get(fabric, 0) + 1
        if fit:
            fits[fit] = fits.get(fit, 0) + 1
        if occasion:
            occasions[occasion] = occasions.get(occasion, 0) + 1
    def top_values(counter: dict[str, int], limit: int = 8) -> list[dict[str, Any]]:
        return [
            {"value": key, "count": count}
            for key, count in sorted(counter.items(), key=lambda pair: pair[1], reverse=True)[:limit]
        ]
    return {
        "total_items": len(items),
        "colors": top_values(colors),
        "types": top_values(types),
        "categories": top_values(categories),
        "fabrics": top_values(fabrics),
        "fits": top_values(fits),
        "occasions": top_values(occasions),
    }
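# Shape of the summary for a tiny wardrobe (an illustrative input, not a fixture):
#   summarize_wardrobe_metadata([{"color": "Navy", "type": "Shirt"}, {"color": "navy"}])
#   -> {"total_items": 2,
#       "colors": [{"value": "navy", "count": 2}],
#       "types": [{"value": "shirt", "count": 1}],
#       "categories": [], "fabrics": [], "fits": [], "occasions": []}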
def _count_query_signals(query: str, requested_category: str | None = None) -> dict[str, bool]:
    normalized = _norm(query)
    has_color = any(color in normalized for color in _COLOR_TERMS)
    requested = _norm(requested_category)
    has_type = bool(requested and requested not in {"both", "all"}) or any(
        token in normalized for token in [
            "trouser", "trousers", "pants", "jeans", "shorts", "joggers", "skirt", "dress",
            "topwear", "bottomwear", "shirt", "tee", "blouse", "polo", "hoodie", "jacket",
            "sweater", "blazer", "t-shirt", "tank", "leggings",
        ]
    )
    has_style = any(token in normalized for token in [
        "slim", "regular", "relaxed", "oversized", "tailored", "smart", "casual", "formal",
        "party", "work", "interview", "weekend", "minimal", "structured", "clean",
    ])
    has_fit = any(token in normalized for token in ["slim-fit", "slim fit", "regular-fit", "regular fit", "relaxed-fit", "relaxed fit"])
    return {
        "has_color": has_color,
        "has_type": has_type,
        "has_style": has_style or has_fit,
    }
def is_underspecified_query(query: str, requested_category: str | None = None) -> bool:
    signals = _count_query_signals(query, requested_category=requested_category)
    explicit_signal_count = sum(1 for value in signals.values() if value)
    vague_tokens = {
        "some", "something", "stuff", "nice", "good",
        "recommend", "suggest", "maybe", "outfit", "look",
    }
    normalized = _norm(query)
    has_vague_language = any(token in normalized for token in vague_tokens)
    # A query counts as underspecified unless all three signals (colour, type,
    # style/fit) are present and no vague wording appears.
    return explicit_signal_count < 3 or has_vague_language
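# Examples of the three-signal rule (illustrative queries):
#   is_underspecified_query("something nice")       -> True  (vague wording, no signals)
#   is_underspecified_query("black slim fit jeans") -> False (colour + type + style)
#   is_underspecified_query("black jeans")          -> True  (style signal missing)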
def _build_enrichment_prompt(
    query: str,
    wardrobe_summary: WardrobeSummary,
    requested_category: str | None,
    gender: str | None,
) -> str:
    return (
        "You are helping enrich an underspecified Zalando shopping request. "
        "Return ONLY valid JSON and no prose.\n\n"
        "Output schema:\n"
        '{"suggested_types":[],"suggested_colours":[],"occasion":"","style_notes":""}\n\n'
        f"User query: {query}\n"
        f"Requested category: {requested_category or ''}\n"
        f"Gender: {gender or ''}\n"
        f"Wardrobe metadata summary: {json.dumps(wardrobe_summary, ensure_ascii=True)}\n\n"
        "Rules:\n"
        "- Keep suggested_types to product/search terms that fit the requested category.\n"
        "- Keep suggested_colours complementary to the wardrobe summary.\n"
        "- Occasion must be a single short lowercase label when possible.\n"
        "- style_notes must be concise and search-friendly.\n"
    )
def _parse_json_object(text: str) -> dict[str, Any]:
    raw = str(text or "").strip()
    if not raw:
        return {}
    try:
        parsed = json.loads(raw)
        return parsed if isinstance(parsed, dict) else {}
    except json.JSONDecodeError:
        start = raw.find("{")
        end = raw.rfind("}")
        if start == -1 or end == -1 or end <= start:
            return {}
        try:
            parsed = json.loads(raw[start : end + 1])
            return parsed if isinstance(parsed, dict) else {}
        except json.JSONDecodeError:
            return {}
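# The salvage path tolerates prose around the JSON object (illustrative model output):
#   _parse_json_object('Sure! {"occasion": "work", "style_notes": "smart"} Hope that helps.')
#   -> {"occasion": "work", "style_notes": "smart"}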
def _normalize_enrichment_payload(payload: dict[str, Any], requested_category: str | None) -> dict[str, Any]:
    def to_list(value: Any) -> list[str]:
        if not isinstance(value, list):
            return []
        cleaned: list[str] = []
        for entry in value:
            text = str(entry or "").strip()
            if text and text not in cleaned:
                cleaned.append(text)
        return cleaned
    suggested_types = to_list(payload.get("suggested_types"))
    suggested_colours = to_list(payload.get("suggested_colours") or payload.get("suggested_colors"))
    occasion = str(payload.get("occasion") or "").strip().lower()
    style_notes = str(payload.get("style_notes") or "").strip()
    requested = _norm(requested_category)
    # Coerce free-form category labels onto the topwear/bottomwear axis.
    if requested and requested not in {"both", "all"} and requested not in {"topwear", "bottomwear"}:
        requested = "bottomwear" if any(token in requested for token in ["bottom", "trouser", "pant", "jean", "skirt", "short"]) else "topwear"
    if requested in {"topwear", "bottomwear"} and not suggested_types:
        suggested_types = [requested]
    if not suggested_colours:
        suggested_colours = ["black"]
    return {
        "suggested_types": suggested_types,
        "suggested_colours": suggested_colours,
        "occasion": occasion,
        "style_notes": style_notes,
    }
def enrich_underspecified_query(
    query: str,
    wardrobe_items: list[dict[str, Any]] | None = None,
    requested_category: str | None = None,
    gender: str | None = None,
    completion_fn: TextCompletionFn | None = None,
    max_tokens: int = 500,
) -> dict[str, Any]:
    wardrobe_summary = summarize_wardrobe_metadata(wardrobe_items or [])
    if not is_underspecified_query(query, requested_category=requested_category):
        return {
            "used": False,
            "query": str(query or "").strip(),
            "wardrobe_summary": wardrobe_summary,
            "enrichment": {
                "suggested_types": [],
                "suggested_colours": [],
                "occasion": "",
                "style_notes": "",
            },
        }
    if not completion_fn:
        return {
            "used": True,
            "query": str(query or "").strip(),
            "wardrobe_summary": wardrobe_summary,
            "enrichment": {
                "suggested_types": [],
                "suggested_colours": [],
                "occasion": "",
                "style_notes": "",
            },
        }
    prompt = _build_enrichment_prompt(query, wardrobe_summary, requested_category, gender)
    model_text = completion_fn(prompt, max_tokens)
    parsed = _parse_json_object(model_text)
    enrichment = _normalize_enrichment_payload(parsed, requested_category=requested_category)
    return {
        "used": True,
        "query": str(query or "").strip(),
        "wardrobe_summary": wardrobe_summary,
        "enrichment": enrichment,
    }
def compose_search_query_from_enrichment(
    query: str,
    enrichment: dict[str, Any] | None,
    gender: str | None = None,
    requested_category: str | None = None,
) -> str:
    base_query = str(query or "").strip()
    enrichment = enrichment or {}
    target_category = _normalize_target_category(requested_category)
    suggested_types = [str(value).strip() for value in (enrichment.get("suggested_types") or []) if str(value).strip()]
    suggested_colours = [str(value).strip() for value in (enrichment.get("suggested_colours") or []) if str(value).strip()]
    style_notes = str(enrichment.get("style_notes") or "").strip()
    occasion = str(enrichment.get("occasion") or "").strip()
    tokens: list[str] = []
    if base_query:
        tokens.extend([piece for piece in re.split(r"\s+", base_query) if piece])
    elif gender:
        tokens.append(_normalize_gender(gender, base_query))
    def append_unique(token: str) -> None:
        cleaned = str(token or "").strip()
        if cleaned and cleaned not in tokens:
            tokens.append(cleaned)
    if gender:
        append_unique(_normalize_gender(gender, base_query))
    if suggested_colours:
        append_unique(suggested_colours[0])
    if suggested_types:
        append_unique(suggested_types[0])
    elif requested_category:
        requested = _norm(requested_category)
        if requested in {"topwear", "bottomwear"}:
            append_unique(requested)
        elif any(token in requested for token in ["bottom", "trouser", "pant", "jean", "skirt", "short"]):
            append_unique("bottomwear")
        elif any(token in requested for token in ["top", "shirt", "tee", "blouse", "polo", "jacket"]):
            append_unique("topwear")
    if occasion:
        append_unique(occasion)
    if style_notes:
        style_tokens = [piece for piece in re.split(r"[^a-zA-Z0-9-]+", style_notes.lower()) if piece]
        for token in style_tokens[:3]:
            append_unique(token)
    if not tokens:
        tokens = [base_query or _normalize_gender(gender, base_query)]
    # If the caller asked for one category but the tokens lean the other way,
    # swap the first conflicting garment term; the for/else appends a
    # replacement when nothing was swapped.
    topwear_terms = {"shirt", "shirts", "tee", "t-shirt", "tshirt", "topwear", "blazer", "jacket", "polo", "hoodie", "kurta"}
    bottomwear_terms = {"trouser", "trousers", "pants", "jeans", "shorts", "joggers", "bottomwear"}
    normalized_tokens = [str(token).strip().lower() for token in tokens]
    has_topwear_term = any(token in topwear_terms for token in normalized_tokens)
    has_bottomwear_term = any(token in bottomwear_terms for token in normalized_tokens)
    if target_category == "bottomwear" and has_topwear_term and not has_bottomwear_term:
        replacement = "trousers"
        for index, token in enumerate(normalized_tokens):
            if token in topwear_terms:
                tokens[index] = replacement
                normalized_tokens[index] = replacement
                break
        else:
            append_unique(replacement)
    elif target_category == "topwear" and has_bottomwear_term and not has_topwear_term:
        replacement = "shirt"
        for index, token in enumerate(normalized_tokens):
            if token in bottomwear_terms:
                tokens[index] = replacement
                normalized_tokens[index] = replacement
                break
        else:
            append_unique(replacement)
    return " ".join(part for part in tokens if part).strip()
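# Illustrative composition for an empty base query (all inputs hypothetical):
#   compose_search_query_from_enrichment(
#       "",
#       {"suggested_types": ["trousers"], "suggested_colours": ["navy"],
#        "occasion": "work", "style_notes": "smart tailored"},
#       gender="men",
#       requested_category="bottomwear",
#   )
#   -> "men navy trousers work smart tailored"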
def _normalize_gender(gender: str | None, query: str) -> str:
    g = _norm(gender)
    if g in {"men", "male", "man", "mens"}:
        return "men"
    if g in {"women", "female", "woman", "womens"}:
        return "women"
    if g == "unisex":
        return "unisex"
    # Fall back to the query text. Use word boundaries so overlapping tokens
    # ("male" inside "female", "men" inside "women") cannot misfire.
    query_hint = _norm(query)
    if re.search(r"\b(women|womens|woman|female)\b", query_hint):
        return "women"
    if re.search(r"\b(men|mens|man|male)\b", query_hint):
        return "men"
    return "unisex"
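# Word-boundary matching keeps overlapping tokens apart (illustrative calls):
#   _normalize_gender("Male", "")             -> "men"
#   _normalize_gender(None, "tops for women") -> "women"
#   _normalize_gender(None, "plain tee")      -> "unisex"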
def _pick_category_path(query: str, audience: str) -> str:
    haystack = _norm(query)
    selected = ""
    for token, path_map in CATEGORY_PATH_MAP.items():
        if token in haystack:
            selected = path_map.get(audience) or path_map.get("unisex") or ""
            break
    if not selected:
        if audience == "men":
            selected = "mens-clothing"
        elif audience == "women":
            selected = "womens-clothing"
        else:
            selected = "clothing"
    if audience == "men" and selected.startswith("womens-"):
        selected = selected.replace("womens-", "mens-", 1)
    if audience == "women" and selected.startswith("mens-"):
        selected = selected.replace("mens-", "womens-", 1)
    if audience == "unisex" and selected.startswith(("mens-", "womens-")):
        selected = selected.split("-", 1)[1]
    return selected or "clothing"
def build_zalando_search_url(query: str, gender: str | None = None) -> str:
    normalized_query = str(query or "").strip()
    if not normalized_query:
        raise ValueError("query is required")
    audience = _normalize_gender(gender, normalized_query)
    path = _pick_category_path(normalized_query, audience)
    params = urlencode({"q": normalized_query})
    return f"{ZALANDO_BASE_URL}/{path}?{params}"
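# Illustrative output (the path depends on CATEGORY_PATH_MAP and the audience):
#   build_zalando_search_url("black jeans", gender="men")
#   -> "https://www.zalando.co.uk/mens-clothing?q=black+jeans"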
def build_zalando_search_urls_from_query(query: str, gender: str | None = None) -> list[str]:
    normalized_query = str(query or "").strip()
    if not normalized_query:
        return []
    if gender:
        return [build_zalando_search_url(normalized_query, gender=gender)]
    urls: list[str] = []
    for audience in ["women", "men", "unisex"]:
        url = build_zalando_search_url(normalized_query, gender=audience)
        if url not in urls:
            urls.append(url)
    return urls
def build_zalando_search_urls_from_request(
    query: str,
    gender: str | None = None,
    wardrobe_items: list[dict[str, Any]] | None = None,
    requested_category: str | None = None,
    completion_fn: TextCompletionFn | None = None,
    max_tokens: int = 500,
) -> tuple[list[str], dict[str, Any]]:
    enrichment_result = enrich_underspecified_query(
        query=query,
        wardrobe_items=wardrobe_items,
        requested_category=requested_category,
        gender=gender,
        completion_fn=completion_fn,
        max_tokens=max_tokens,
    )
    final_query = compose_search_query_from_enrichment(
        query=enrichment_result.get("query") or query,
        enrichment=enrichment_result.get("enrichment") if isinstance(enrichment_result.get("enrichment"), dict) else None,
        gender=gender,
        requested_category=requested_category,
    )
    search_urls = build_zalando_search_urls_from_query(final_query, gender=gender)
    return search_urls, {**enrichment_result, "final_query": final_query}
def _apify_request_url() -> str:
    if APIFY_TOKEN:
        return f"{APIFY_ACTOR_ENDPOINT}?token={APIFY_TOKEN}"
    return APIFY_ACTOR_ENDPOINT
def _apify_actor_id_from_endpoint(endpoint: str) -> str:
    parsed = urlparse(str(endpoint or "").strip())
    segments = [segment for segment in parsed.path.split("/") if segment]
    if "acts" in segments:
        index = segments.index("acts")
        if index + 1 < len(segments):
            return segments[index + 1]
    return "vistics~zalando-scraper"
def _build_apify_payload(search_url: str, max_results: int) -> dict[str, Any]:
    return {
        "startUrls": [str(search_url or "").strip()],
        "maxResults": int(max_results),
    }
def _http_error_detail(exc: requests.RequestException, limit: int = 800) -> str:
    response = getattr(exc, "response", None)
    if response is None:
        return ""
    status = getattr(response, "status_code", None)
    body = ""
    try:
        body = str(response.text or "").strip().replace("\n", " ")
    except Exception:
        body = ""
    if body:
        body = body[:limit]
    if status is None and not body:
        return ""
    return f"status={status} body={body}".strip()
def _extract_apify_items(raw_payload: Any) -> list[dict[str, Any]]:
    if isinstance(raw_payload, list):
        return [item for item in raw_payload if isinstance(item, dict)]
    if isinstance(raw_payload, dict):
        for key in ("items", "data"):
            value = raw_payload.get(key)
            if isinstance(value, list):
                return [item for item in value if isinstance(item, dict)]
    return []
def _normalize_apify_items(raw_items: list[dict[str, Any]], effective_limit: int) -> list[dict[str, str]]:
    items: list[dict[str, str]] = []
    seen: set[str] = set()
    for raw in raw_items:
        normalized = _normalize_product(raw)
        if not normalized["item_link"] or normalized["item_link"] in seen:
            continue
        seen.add(normalized["item_link"])
        items.append(normalized)
        if len(items) >= effective_limit:
            break
    return items
def _scrape_with_apify_run_dataset_fallback(
    search_url: str,
    effective_limit: int,
    timeout_seconds: int,
) -> list[dict[str, str]]:
    actor_id = _apify_actor_id_from_endpoint(APIFY_ACTOR_ENDPOINT)
    run_url = f"https://api.apify.com/v2/acts/{actor_id}/runs"
    wait_for_finish = min(max(60, APIFY_WAIT_FOR_FINISH_SECONDS), 300)
    variant_errors: list[str] = []
    logger.info(
        "zalando crawl retry source=apify-run search_url=%s actor_id=%s wait_for_finish=%s",
        search_url,
        actor_id,
        wait_for_finish,
    )
    # Only one payload variant is exercised today; the loop is kept so extra
    # payload shapes can be retried without restructuring the error handling.
    variants = ["string"]
    for variant_name in variants:
        run_payload = _build_apify_payload(search_url, effective_limit)
        run_id = ""
        run_status = ""
        dataset_id = ""
        try:
            run_response = requests.post(
                run_url,
                params={"token": APIFY_TOKEN, "waitForFinish": wait_for_finish},
                json=run_payload,
                timeout=timeout_seconds,
            )
            run_response.raise_for_status()
            run_json = run_response.json()
            run_data = run_json.get("data") if isinstance(run_json, dict) else None
            if not isinstance(run_data, dict):
                variant_errors.append(f"{variant_name}: invalid run payload")
                continue
            run_id = str(run_data.get("id") or "").strip()
            run_status = str(run_data.get("status") or "").strip()
            dataset_id = str(run_data.get("defaultDatasetId") or "").strip()
            logger.info(
                "zalando crawl retry source=apify-run completed variant=%s run_id=%s status=%s dataset_id=%s",
                variant_name,
                run_id,
                run_status,
                dataset_id,
            )
        except requests.RequestException as exc:
            detail = _http_error_detail(exc)
            variant_errors.append(f"{variant_name}: {exc} {detail}".strip())
            logger.warning(
                "zalando crawl failed source=apify-run variant=%s search_url=%s error=%s detail=%s",
                variant_name,
                search_url,
                exc,
                detail,
            )
            continue
        if not dataset_id:
            variant_errors.append(f"{variant_name}: missing defaultDatasetId")
            continue
        try:
            dataset_response = requests.get(
                f"https://api.apify.com/v2/datasets/{dataset_id}/items",
                params={
                    "token": APIFY_TOKEN,
                    "clean": "true",
                    "format": "json",
                    "limit": effective_limit,
                },
                timeout=timeout_seconds,
            )
            dataset_response.raise_for_status()
            dataset_items = _extract_apify_items(dataset_response.json())
            items = _normalize_apify_items(dataset_items, effective_limit)
            logger.info(
                "zalando crawl retry source=apify-dataset variant=%s run_id=%s dataset_id=%s raw_items=%s items=%s",
                variant_name,
                run_id,
                dataset_id,
                len(dataset_items),
                len(items),
            )
            if items:
                return items
            variant_errors.append(f"{variant_name}: empty dataset")
        except requests.RequestException as exc:
            detail = _http_error_detail(exc)
            variant_errors.append(f"{variant_name}: {exc} {detail}".strip())
            logger.warning(
                "zalando crawl failed source=apify-dataset variant=%s run_id=%s dataset_id=%s error=%s detail=%s",
                variant_name,
                run_id,
                dataset_id,
                exc,
                detail,
            )
    if variant_errors:
        logger.warning(
            "zalando crawl retry source=apify-run exhausted search_url=%s errors=%s",
            search_url,
            "; ".join(variant_errors),
        )
    return []
def _normalize_product(item: dict[str, Any]) -> dict[str, str]:
    name = str(
        item.get("name")
        or item.get("title")
        or item.get("productName")
        or item.get("product_name")
        or "N/A"
    ).strip()
    fallback_price = _extract_price_text(
        item.get("price")
        or item.get("currentPrice")
        or item.get("displayPrice")
        or item.get("priceLabel")
        or "N/A"
    )
    currency_symbol = str(item.get("currencySymbol") or "").strip()
    promotional_price = _format_apify_money(item.get("promotionalPrice"), currency_symbol)
    original_price = _format_apify_money(item.get("originalPrice"), currency_symbol)
    discount_percent = str(item.get("discountPercent") or "").strip()
    brand = str(item.get("brand") or item.get("brandName") or "").strip()
    if promotional_price:
        price = promotional_price if not discount_percent else f"{promotional_price} ({discount_percent})"
    elif original_price:
        price = original_price
    else:
        price = fallback_price
    image_url = _ensure_zalando_url(
        str(
            item.get("image")
            or item.get("imageUrl")
            or item.get("image_url")
            or item.get("thumbnail")
            or ""
        )
    )
    url_value = _ensure_zalando_url(
        str(
            item.get("url")
            or item.get("productUrl")
            or item.get("item_link")
            or item.get("link")
            or ""
        )
    )
    color = str(item.get("color") or item.get("colorName") or item.get("colour") or "").strip()
    if not color and " - " in name:
        # Zalando product names often end in " - <colour>".
        color = name.rsplit(" - ", 1)[-1].strip()
    return {
        "name": name or "N/A",
        "price": price or "N/A",
        "brand": brand,
        "color": color,
        "currency_symbol": currency_symbol,
        "promotional_price": promotional_price,
        "original_price": original_price,
        "discount_percent": discount_percent,
        "image_url": image_url,
        "item_link": url_value,
    }
def _scrape_with_apify(search_url: str, max_products: int | None, timeout_seconds: int) -> list[dict[str, str]]:
    requested_limit = int(max_products) if isinstance(max_products, int) and max_products > 0 else APIFY_MAX_RESULTS
    effective_limit = min(requested_limit, APIFY_MAX_RESULTS)
    apify_timeout = max(int(timeout_seconds), APIFY_MIN_TIMEOUT_SECONDS)
    actor_id = _apify_actor_id_from_endpoint(APIFY_ACTOR_ENDPOINT)
    logger.info(
        "zalando crawl start source=apify search_url=%s requested_max=%s effective_max=%s timeout=%s actor_id=%s",
        search_url,
        max_products,
        effective_limit,
        apify_timeout,
        actor_id,
    )
    variants = ["string"]
    variant_errors: list[str] = []
    for variant_name in variants:
        try:
            payload = _build_apify_payload(search_url, effective_limit)
            response = requests.post(_apify_request_url(), json=payload, timeout=apify_timeout)
            response.raise_for_status()
            raw_items = _extract_apify_items(response.json())
            items = _normalize_apify_items(raw_items, effective_limit)
            logger.info(
                "zalando crawl end source=apify variant=%s search_url=%s crawled=%s raw_items=%s items=%s",
                variant_name,
                search_url,
                bool(items),
                len(raw_items),
                len(items),
            )
            if items:
                return items
            variant_errors.append(f"{variant_name}: empty result")
        except requests.RequestException as exc:
            detail = _http_error_detail(exc)
            variant_errors.append(f"{variant_name}: {exc} {detail}".strip())
            logger.warning(
                "zalando crawl failed source=apify variant=%s search_url=%s error=%s detail=%s",
                variant_name,
                search_url,
                exc,
                detail,
            )
            continue
    try:
        fallback_items = _scrape_with_apify_run_dataset_fallback(
            search_url=search_url,
            effective_limit=effective_limit,
            timeout_seconds=apify_timeout,
        )
        logger.info(
            "zalando crawl end source=apify-run search_url=%s crawled=%s items=%s",
            search_url,
            bool(fallback_items),
            len(fallback_items),
        )
        if fallback_items:
            return fallback_items
    except requests.RequestException as exc:
        detail = _http_error_detail(exc)
        variant_errors.append(f"run_dataset: {exc} {detail}".strip())
        logger.warning("zalando crawl failed source=apify-run search_url=%s error=%s detail=%s", search_url, exc, detail)
    if variant_errors:
        logger.warning(
            "zalando crawl source=apify exhausted search_url=%s errors=%s",
            search_url,
            "; ".join(variant_errors),
        )
    logger.warning(
        "zalando crawl end source=apify search_url=%s crawled=False items=0 reason=no_items_from_sync_or_run_dataset",
        search_url,
    )
    return []
def _scrape_with_html(search_url: str, max_products: int | None, timeout_seconds: int) -> list[dict[str, str]]:
    html_timeout = max(int(timeout_seconds), HTML_FALLBACK_TIMEOUT_SECONDS)
    logger.info("zalando crawl start source=html search_url=%s max_products=%s timeout=%s", search_url, max_products, html_timeout)
    response = requests.get(search_url, headers=REQUEST_HEADERS, timeout=html_timeout)
    response.raise_for_status()
    soup = BeautifulSoup(response.content, "lxml")
    items: list[dict[str, str]] = []
    seen: set[str] = set()
    cards = soup.select('article, div[data-testid*="product"], li[data-testid*="product"]')
    for card in cards:
        link_tag = card.select_one('a[href*="/p/"]') or card.find("a", href=True)
        if not link_tag:
            continue
        item_link = _ensure_zalando_url(str(link_tag.get("href") or ""))
        if not item_link or item_link in seen or "zalando" not in item_link:
            continue
        name_tag = (
            card.select_one('[data-testid*="product-name"]')
            or card.select_one('[data-testid*="name"]')
            or card.find("h3")
            or card.find("h2")
            or link_tag
        )
        name = str(name_tag.get_text(" ", strip=True) if name_tag else "N/A").strip() or "N/A"
        price_tag = (
            card.select_one('[data-testid*="price"]')
            or card.find(attrs={"class": re.compile(r"price|money|amount", re.I)})
        )
        price_text = str(price_tag.get_text(" ", strip=True) if price_tag else "")
        price = _extract_price_text(price_text)
        img_tag = card.find("img")
        image_url = ""
        if img_tag:
            image_url = _ensure_zalando_url(
                str(
                    img_tag.get("src")
                    or img_tag.get("data-src")
                    or _extract_src_from_srcset(str(img_tag.get("srcset") or ""))
                )
            )
        seen.add(item_link)
        items.append(
            {
                "name": name,
                "price": price,
                "image_url": image_url,
                "item_link": item_link,
            }
        )
        if isinstance(max_products, int) and max_products > 0 and len(items) >= max_products:
            break
    logger.info("zalando crawl end source=html search_url=%s crawled=%s items=%s", search_url, bool(items), len(items))
    return items
def _requires_postprocess(items: list[dict[str, str]]) -> bool:
    if not items:
        return False
    missing = 0
    for item in items:
        if item.get("name") in {"", "N/A"} or item.get("price") in {"", "N/A"}:
            missing += 1
    return missing > 0
def extract_product_summaries(
    search_url: str,
    max_products: int | None = None,
    request_timeout_seconds: int = 35,
    use_apify: bool = True,
    postprocess: Optional[ScrapePostprocessFn] = None,
) -> list[dict[str, str]]:
    if not str(search_url or "").strip():
        raise ValueError("search_url is required")
    max_count = int(max_products) if isinstance(max_products, int) and max_products > 0 else None
    logger.info(
        "zalando crawl requested search_url=%s max_products=%s capped_to=%s use_apify=%s actor_id=%s",
        search_url,
        max_products,
        max_count,
        bool(use_apify and APIFY_TOKEN),
        _apify_actor_id_from_endpoint(APIFY_ACTOR_ENDPOINT),
    )
    products: list[dict[str, str]] = []
    errors: list[str] = []
    if use_apify and APIFY_TOKEN:
        try:
            products = _scrape_with_apify(search_url, max_count, request_timeout_seconds)
            if not products:
                errors.append("apify: empty result set")
                logger.warning("zalando crawl source=apify returned zero items search_url=%s", search_url)
        except requests.RequestException as exc:
            errors.append(f"apify: {exc}")
            logger.warning("zalando crawl failed source=apify search_url=%s error=%s", search_url, exc)
    if not products:
        try:
            if use_apify and APIFY_TOKEN:
                logger.info("zalando crawl fallback source=html search_url=%s", search_url)
            products = _scrape_with_html(search_url, max_count, request_timeout_seconds)
        except requests.RequestException as exc:
            errors.append(f"html: {exc}")
            logger.warning("zalando crawl failed source=html search_url=%s error=%s", search_url, exc)
    if postprocess and _requires_postprocess(products):
        try:
            products = postprocess(products)
        except Exception:
            # Never fail scraping because post-processing failed.
            pass
    products = _filter_products_for_search_query(products, search_url)
    if not products and errors:
        logger.warning("zalando crawl completed with no results search_url=%s errors=%s", search_url, "; ".join(errors))
        raise requests.RequestException("; ".join(errors))
    logger.info("zalando crawl completed search_url=%s crawled=%s items=%s", search_url, bool(products), len(products))
    if isinstance(max_count, int) and max_count > 0:
        return products[:max_count]
    return products
def search_products(
    query: str,
    gender: str | None = None,
    max_products: int | None = None,
    use_apify: bool = True,
    request_timeout_seconds: int = 35,
    postprocess: Optional[ScrapePostprocessFn] = None,
    wardrobe_items: list[dict[str, Any]] | None = None,
    requested_category: str | None = None,
    completion_fn: TextCompletionFn | None = None,
    enrichment_max_tokens: int = 500,
) -> dict[str, Any]:
    max_count = int(max_products) if isinstance(max_products, int) and max_products > 0 else None
    search_urls, enrichment_result = build_zalando_search_urls_from_request(
        query=query,
        gender=gender,
        wardrobe_items=wardrobe_items,
        requested_category=requested_category,
        completion_fn=completion_fn,
        max_tokens=enrichment_max_tokens,
    )
    if not search_urls:
        raise ValueError("query is required")
    logger.info(
        "zalando search plan query=%s search_urls=%s max_products=%s",
        query,
        len(search_urls),
        max_count,
    )
    products: list[dict[str, str]] = []
    seen: set[str] = set()
    for search_url in search_urls:
        summaries = extract_product_summaries(
            search_url=search_url,
            max_products=max_count,
            request_timeout_seconds=request_timeout_seconds,
            use_apify=use_apify,
            postprocess=postprocess,
        )
        for item in summaries:
            item_link = str(item.get("item_link") or "").strip()
            if not item_link or item_link in seen:
                continue
            seen.add(item_link)
            products.append(item)
            if isinstance(max_count, int) and max_count > 0 and len(products) >= max_count:
                break
        if isinstance(max_count, int) and max_count > 0 and len(products) >= max_count:
            break
    logger.info(
        "zalando search completed query=%s crawled=%s items=%s search_urls=%s",
        query,
        bool(products),
        len(products),
        len(search_urls),
    )
    return {
        "search_urls": search_urls,
        "products": products,
        "count": len(products),
        "enrichment": enrichment_result,
    }
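# Minimal smoke test for local runs (a sketch, not part of the public API).
# It assumes network access and, optionally, an exported APIFY_API_TOKEN;
# without the token the HTML fallback path is exercised instead.
if __name__ == "__main__":
    result = search_products("black slim fit jeans", gender="men", max_products=5)
    print(json.dumps({"search_urls": result["search_urls"], "count": result["count"]}, indent=2))
    for product in result["products"]:
        print(f"- {product.get('name')} | {product.get('price')} | {product.get('item_link')}")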