Spaces:
Sleeping
Sleeping
| import os | |
| import time | |
| import json | |
| import tempfile | |
| from typing import Dict, Any, List, Generator, Tuple, Optional | |
| import requests | |
| import gradio as gr | |
| import base64 | |
| import re | |
| PRINTIFY_BASE = "https://api.printify.com" | |
| DEFAULT_BASE_PRICE = 0.001 | |
| GRID_FILENAME = "grid.png" | |
| HARD_MAX_VARIANTS_PER_PRODUCT = 100 | |
| CHUNK_TARGET_VARIANTS = 85 | |
| def _now() -> str: | |
| return time.strftime("%H:%M:%S") | |
| def _sleep(ms: int): | |
| time.sleep(ms / 1000) | |
| def _auth_headers() -> Dict[str, str]: | |
| token = os.environ.get("PRINTIFY_API_TOKEN") | |
| if not token: | |
| raise RuntimeError("Missing PRINTIFY_API_TOKEN (HF Space Secret).") | |
| return {"Authorization": f"Bearer {token}"} | |
| def _parse_retry_after_seconds(r: requests.Response) -> Optional[float]: | |
| ra = r.headers.get("Retry-After") | |
| if ra: | |
| try: | |
| return float(ra) | |
| except Exception: | |
| pass | |
| reset = r.headers.get("X-RateLimit-Reset") or r.headers.get("X-RateLimit-Reset-After") | |
| if reset: | |
| try: | |
| return float(reset) | |
| except Exception: | |
| pass | |
| try: | |
| txt = (r.text or "")[:2000] | |
| except Exception: | |
| txt = "" | |
| m = re.search(r"(\d+(\.\d+)?)\s*(s|sec|secs|second|seconds)\b", txt, re.I) | |
| if m: | |
| try: | |
| return float(m.group(1)) | |
| except Exception: | |
| return None | |
| return None | |
| def _req( | |
| method: str, | |
| path: str, | |
| json_body: Optional[Dict[str, Any]] = None, | |
| logs: Optional[List[str]] = None, | |
| ) -> Any: | |
| kwargs: Dict[str, Any] = { | |
| "headers": _auth_headers(), | |
| "timeout": 60, | |
| } | |
| if json_body is not None: | |
| kwargs["json"] = json_body | |
| while True: | |
| r = requests.request( | |
| method, | |
| f"{PRINTIFY_BASE}{path}", | |
| **kwargs, | |
| ) | |
| if r.status_code == 429: | |
| wait_s = _parse_retry_after_seconds(r) | |
| if logs is not None: | |
| _log( | |
| logs, | |
| f"RATE_LIMIT 429 path={path} retry_after={wait_s} " | |
| f"headers={dict(r.headers)} body={r.text[:400]}", | |
| ) | |
| if wait_s is None: | |
| raise RuntimeError(f"HTTP 429 without retry instruction: {r.text[:2000]}") | |
| time.sleep(max(0.0, wait_s)) | |
| continue | |
| if r.status_code >= 500: | |
| wait_s = _parse_retry_after_seconds(r) | |
| if logs is not None: | |
| _log( | |
| logs, | |
| f"SERVER_ERROR {r.status_code} path={path} retry_after={wait_s} " | |
| f"body={r.text[:400]}", | |
| ) | |
| if wait_s is None: | |
| raise RuntimeError(f"HTTP {r.status_code}: {r.text[:2000]}") | |
| time.sleep(max(0.0, wait_s)) | |
| continue | |
| if r.status_code >= 400: | |
| raise RuntimeError(f"HTTP {r.status_code}: {r.text[:2000]}") | |
| if r.text: | |
| return r.json() | |
| return {} | |
| def _log(logs: List[str], msg: str): | |
| logs.append(f"[{_now()}] {msg}") | |
| def _find_first_valid_pair(logs: List[str]) -> Dict[str, Any]: | |
| _log(logs, "Listing blueprints") | |
| blueprints = _req("GET", "/v1/catalog/blueprints.json", logs=logs) | |
| for bp in blueprints: | |
| bp_id = str(bp["id"]) | |
| _log(logs, f"Blueprint {bp_id}: fetching providers") | |
| providers = _req( | |
| "GET", | |
| f"/v1/catalog/blueprints/{bp_id}/print_providers.json", | |
| logs=logs, | |
| ) | |
| for p in providers: | |
| p_id = str(p["id"]) | |
| _log(logs, f"Blueprint {bp_id} / Provider {p_id}: fetching variants") | |
| vr = _req( | |
| "GET", | |
| f"/v1/catalog/blueprints/{bp_id}/print_providers/{p_id}/variants.json?show-out-of-stock=1", | |
| logs=logs, | |
| ) | |
| variants = vr.get("variants") | |
| if isinstance(variants, list) and variants: | |
| _log(logs, f"FOUND {len(variants)} variants") | |
| return { | |
| "blueprint": bp, | |
| "provider": p, | |
| "variants": variants, | |
| "blueprintDetails": _req( | |
| "GET", | |
| f"/v1/catalog/blueprints/{bp_id}.json", | |
| logs=logs, | |
| ), | |
| "providerDetails": _req( | |
| "GET", | |
| f"/v1/catalog/print_providers/{p_id}.json", | |
| logs=logs, | |
| ), | |
| "shippingInfo": _req( | |
| "GET", | |
| f"/v1/catalog/blueprints/{bp_id}/print_providers/{p_id}/shipping.json", | |
| logs=logs, | |
| ), | |
| } | |
| raise RuntimeError("No blueprint/provider pair with variants found.") | |
| def _get_all_provider_blobs_for_blueprint( | |
| logs: List[str], | |
| blueprint: Dict[str, Any], | |
| ) -> List[Dict[str, Any]]: | |
| bp_id = str(blueprint.get("id")) | |
| if not bp_id: | |
| raise RuntimeError("Blueprint object missing id.") | |
| _log(logs, f"BLUEPRINT_ALL_PROVIDERS blueprint_id={bp_id}: fetching providers") | |
| providers = _req( | |
| "GET", | |
| f"/v1/catalog/blueprints/{bp_id}/print_providers.json", | |
| logs=logs, | |
| ) | |
| if not isinstance(providers, list) or not providers: | |
| raise RuntimeError(f"No providers found for blueprint {bp_id}.") | |
| bp_details = _req( | |
| "GET", | |
| f"/v1/catalog/blueprints/{bp_id}.json", | |
| logs=logs, | |
| ) | |
| blobs: List[Dict[str, Any]] = [] | |
| for p in providers: | |
| if not isinstance(p, dict): | |
| continue | |
| p_id = str(p.get("id")) | |
| if not p_id: | |
| continue | |
| _log(logs, f"Blueprint {bp_id} / Provider {p_id}: fetching variants (ALL)") | |
| vr = _req( | |
| "GET", | |
| f"/v1/catalog/blueprints/{bp_id}/print_providers/{p_id}/variants.json?show-out-of-stock=1", | |
| logs=logs, | |
| ) | |
| variants = vr.get("variants") | |
| count = len(variants) if isinstance(variants, list) else 0 | |
| _log( | |
| logs, | |
| f"PROVIDER_VARIANTS blueprint={bp_id} provider={p_id} count={count}", | |
| ) | |
| if not isinstance(variants, list) or not variants: | |
| continue | |
| provider_details = _req( | |
| "GET", | |
| f"/v1/catalog/print_providers/{p_id}.json", | |
| logs=logs, | |
| ) | |
| shipping_info = _req( | |
| "GET", | |
| f"/v1/catalog/blueprints/{bp_id}/print_providers/{p_id}/shipping.json", | |
| logs=logs, | |
| ) | |
| blobs.append( | |
| { | |
| "blueprint": blueprint, | |
| "provider": p, | |
| "variants": variants, | |
| "blueprintDetails": bp_details, | |
| "providerDetails": provider_details, | |
| "shippingInfo": shipping_info, | |
| } | |
| ) | |
| if not blobs: | |
| raise RuntimeError(f"No providers with variants for blueprint {bp_id}.") | |
| return blobs | |
| def _build_product(blob: Dict[str, Any], currency: str, logs: List[str]) -> Dict[str, Any]: | |
| variants = blob["variants"] | |
| snapshot = [] | |
| for v in variants: | |
| cents = v.get("price") | |
| cents = int(cents) if isinstance(cents, (int, str)) and str(cents).isdigit() else None | |
| ph = v.get("placeholders") | |
| if not isinstance(ph, list): | |
| ph = [] | |
| placeholders = [] | |
| for x in ph: | |
| if not isinstance(x, dict): | |
| continue | |
| placeholders.append({ | |
| "position": x.get("position"), | |
| "width": x.get("width"), | |
| "height": x.get("height"), | |
| }) | |
| snapshot.append({ | |
| "id": v.get("id"), | |
| "sku": v.get("sku"), | |
| "size": (v.get("options") or {}).get("size"), | |
| "color": (v.get("options") or {}).get("color"), | |
| "priceCents": cents, | |
| "price": round(cents / 100, 2) if cents is not None else None, | |
| "placeholders": placeholders, | |
| }) | |
| _log(logs, f"VARIANT_SNAPSHOT sample={json.dumps(snapshot[:20])}") | |
| print_area_sample = [] | |
| for s in snapshot[:20]: | |
| print_area_sample.append({ | |
| "id": s.get("id"), | |
| "size": s.get("size"), | |
| "color": s.get("color"), | |
| "placeholders": s.get("placeholders"), | |
| }) | |
| _log(logs, f"PRINT_AREA_SNAPSHOT sample={json.dumps(print_area_sample)}") | |
| print_areas_by_variant: Dict[str, Any] = {} | |
| for s in snapshot: | |
| vid = s.get("id") | |
| if vid is None: | |
| continue | |
| per_pos: Dict[str, Any] = {} | |
| for ph in (s.get("placeholders") or []): | |
| pos = ph.get("position") | |
| if not pos: | |
| continue | |
| per_pos[str(pos)] = {"width": ph.get("width"), "height": ph.get("height")} | |
| print_areas_by_variant[str(vid)] = per_pos | |
| all_positions = sorted({ | |
| ph.get("position") | |
| for s in snapshot | |
| for ph in (s.get("placeholders") or []) | |
| if ph.get("position") | |
| }) | |
| all_cents = [v["priceCents"] for v in snapshot if v["priceCents"] is not None] | |
| min_price = round(min(all_cents) / 100, 2) if all_cents else DEFAULT_BASE_PRICE | |
| colors = sorted({v["color"] for v in snapshot if v["color"]}) | |
| sizes = sorted({v["size"] for v in snapshot if v["size"]}) | |
| provider = blob["providerDetails"] | |
| loc = provider.get("location") or {} | |
| loc_str = ", ".join(x for x in [loc.get("city"), loc.get("country")] if x) or "empty" | |
| description = ( | |
| (blob["blueprintDetails"].get("description") or "empty") | |
| + f"\n\nFulfilled by {provider.get('title') or provider.get('name')} — {loc_str}" | |
| ) | |
| return { | |
| "name": blob["blueprintDetails"].get("title"), | |
| "currency": currency, | |
| "price": min_price, | |
| "tags": ["Printify", f"Provider: {provider.get('title') or provider.get('name')}"], | |
| "options": { | |
| "Color": colors, | |
| "Size": sizes, | |
| }, | |
| "description": description, | |
| "variants": snapshot, | |
| "printAreas": { | |
| "positions": all_positions, | |
| "byVariantId": print_areas_by_variant, | |
| "units": "px", | |
| }, | |
| "raw": blob, | |
| } | |
| def _pick_shop_id(shops_resp: Any) -> str: | |
| if isinstance(shops_resp, list): | |
| items = shops_resp | |
| elif isinstance(shops_resp, dict): | |
| items = shops_resp.get("data") or shops_resp.get("shops") or shops_resp.get("items") or [] | |
| else: | |
| items = [] | |
| if not isinstance(items, list) or not items: | |
| raise RuntimeError("No shops found on /v1/shops.json for this token.") | |
| forced = (os.environ.get("PRINTIFY_SHOP_ID") or "").strip() | |
| if forced: | |
| for s in items: | |
| if isinstance(s, dict) and str(s.get("id")) == forced: | |
| return str(s["id"]) | |
| raise RuntimeError(f"Preferred shop '{forced}' not found; refusing to fallback.") | |
| preferred_name = os.environ.get("PRINTIFY_SHOP_NAME", "Atheria").strip().lower() | |
| for s in items: | |
| if not isinstance(s, dict): | |
| continue | |
| title = (s.get("title") or s.get("name") or "").strip().lower() | |
| if title and title == preferred_name: | |
| return str(s["id"]) | |
| raise RuntimeError(f"Preferred shop name '{os.environ.get('PRINTIFY_SHOP_NAME','Atheria')}' not found; refusing to fallback.") | |
| def _upload_grid_from_file(logs: List[str]) -> Dict[str, Any]: | |
| path = os.path.join(os.getcwd(), GRID_FILENAME) | |
| if not os.path.exists(path): | |
| raise RuntimeError(f"Grid image not found at {path}") | |
| with open(path, "rb") as f: | |
| raw = f.read() | |
| b64 = base64.b64encode(raw).decode("ascii") | |
| payload = { | |
| "file_name": GRID_FILENAME, | |
| "contents": b64, | |
| } | |
| _log(logs, f"UPLOADING_GRID path={path} bytes={len(raw)} b64len={len(b64)}") | |
| resp = _req("POST", "/v1/uploads/images.json", json_body=payload, logs=logs) | |
| if not isinstance(resp, dict) or not resp.get("id"): | |
| raise RuntimeError(f"Unexpected upload response: {str(resp)[:500]}") | |
| _log( | |
| logs, | |
| f"GRID_UPLOAD id={resp.get('id')} file={resp.get('file_name')} " | |
| f"w={resp.get('width')} h={resp.get('height')}", | |
| ) | |
| return resp | |
| def _scale_fill(ph_w: float, ph_h: float, img_w: float, img_h: float) -> float: | |
| if ph_w <= 0 or ph_h <= 0 or img_w <= 0 or img_h <= 0: | |
| return 1.0 | |
| s = (ph_h * img_w) / (ph_w * img_h) | |
| if s < 1.0: | |
| s = 1.0 | |
| return float(s) | |
| def _log_catalog_variants(logs: List[str], product_info: Dict[str, Any], limit: int = 50): | |
| variants = product_info.get("variants") or [] | |
| total = len(variants) if isinstance(variants, list) else 0 | |
| _log(logs, f"CATALOG_VARIANTS_TOTAL {total}") | |
| if not isinstance(variants, list): | |
| return | |
| n = 0 | |
| for v in variants: | |
| if not isinstance(v, dict): | |
| continue | |
| vid = v.get("id") | |
| size = v.get("size") | |
| color = v.get("color") | |
| ph = v.get("placeholders") or [] | |
| phn = len(ph) if isinstance(ph, list) else 0 | |
| _log(logs, f"CATALOG_VARIANT id={vid} size={size} color={color} placeholders={phn}") | |
| n += 1 | |
| if n >= limit: | |
| break | |
| def _create_product_all_variants_with_grid( | |
| logs: List[str], | |
| shop_id: str, | |
| blob: Dict[str, Any], | |
| product_info: Dict[str, Any], | |
| upload: Dict[str, Any], | |
| ) -> Dict[str, Any]: | |
| bp_id = (blob.get("blueprint") or {}).get("id") | |
| provider_id = (blob.get("provider") or {}).get("id") | |
| if bp_id is None or provider_id is None: | |
| raise RuntimeError("Missing blueprint or provider id.") | |
| enabled_variants = product_info.get("variants") or [] | |
| if not isinstance(enabled_variants, list) or not enabled_variants: | |
| raise RuntimeError("No variants to create product with.") | |
| all_variants = product_info.get("_allVariants") or enabled_variants | |
| if not isinstance(all_variants, list) or not all_variants: | |
| raise RuntimeError("No all-variants universe found for this provider.") | |
| enabled_ids_raw = product_info.get("_enabledVariantIds") or [] | |
| enabled_ids = set() | |
| for x in enabled_ids_raw: | |
| try: | |
| enabled_ids.add(int(x)) | |
| except Exception: | |
| pass | |
| if not enabled_ids: | |
| for v in enabled_variants: | |
| vid = v.get("id") | |
| if vid is None: | |
| continue | |
| try: | |
| enabled_ids.add(int(vid)) | |
| except Exception: | |
| continue | |
| img_id = upload.get("id") | |
| img_w = float(upload.get("width") or 0) | |
| img_h = float(upload.get("height") or 0) | |
| variants_payload = [] | |
| print_areas_payload = [] | |
| enabled_count = 0 | |
| for v in all_variants: | |
| if not isinstance(v, dict): | |
| continue | |
| vid = v.get("id") | |
| if vid is None: | |
| continue | |
| try: | |
| vid_i = int(vid) | |
| except Exception: | |
| continue | |
| is_on = vid_i in enabled_ids | |
| if is_on: | |
| enabled_count += 1 | |
| variants_payload.append({ | |
| "id": vid_i, | |
| "price": 1, | |
| "is_enabled": bool(is_on), | |
| }) | |
| for v in enabled_variants: | |
| if not isinstance(v, dict): | |
| continue | |
| vid = v.get("id") | |
| if vid is None: | |
| continue | |
| try: | |
| vid_i = int(vid) | |
| except Exception: | |
| continue | |
| if vid_i not in enabled_ids: | |
| continue | |
| placeholders = v.get("placeholders") or [] | |
| if not isinstance(placeholders, list) or not placeholders: | |
| continue | |
| ph_payload = [] | |
| for ph in placeholders: | |
| if not isinstance(ph, dict): | |
| continue | |
| pos = ph.get("position") | |
| pw = ph.get("width") | |
| phh = ph.get("height") | |
| if not pos or pw is None or phh is None: | |
| continue | |
| try: | |
| pwf = float(pw) | |
| phf = float(phh) | |
| except Exception: | |
| continue | |
| scale = _scale_fill(pwf, phf, img_w, img_h) | |
| ph_payload.append({ | |
| "position": pos, | |
| "images": [{ | |
| "id": img_id, | |
| "x": 0.5, | |
| "y": 0.5, | |
| "scale": scale, | |
| "angle": 0, | |
| }], | |
| }) | |
| if ph_payload: | |
| print_areas_payload.append({ | |
| "variant_ids": [vid_i], | |
| "placeholders": ph_payload, | |
| }) | |
| if not variants_payload: | |
| raise RuntimeError("No valid variant ids to create product with.") | |
| if not print_areas_payload: | |
| raise RuntimeError("No placeholders payload generated for any enabled variant.") | |
| provider_details = blob.get("providerDetails") or {} | |
| provider_name = provider_details.get("title") or provider_details.get("name") or str(provider_id) | |
| base_title = (product_info.get("name") or "Printify Grid Test") + f" — {provider_name}" | |
| suffix = f" [bp={bp_id} pp={provider_id}]" | |
| if product_info.get("_dup") is True: | |
| title = base_title + " — Duplicate" + f" [bp={bp_id} pp={provider_id} dup]" | |
| else: | |
| title = base_title + suffix | |
| description = product_info.get("description") or "" | |
| payload = { | |
| "title": title, | |
| "description": description, | |
| "blueprint_id": int(bp_id), | |
| "print_provider_id": int(provider_id), | |
| "variants": variants_payload, | |
| "print_areas": print_areas_payload, | |
| } | |
| _log( | |
| logs, | |
| f"PHASE_B_CREATE shop_id={shop_id} blueprint_id={bp_id} provider_id={provider_id} " | |
| f"variants_payload_total={len(variants_payload)} enabled={enabled_count} print_areas_payload={len(print_areas_payload)}", | |
| ) | |
| created = _req( | |
| "POST", | |
| f"/v1/shops/{shop_id}/products.json", | |
| json_body=payload, | |
| logs=logs, | |
| ) | |
| if not isinstance(created, Dict): | |
| raise RuntimeError(f"Unexpected create response: {str(created)[:500]}") | |
| _log(logs, f"PHASE_B_CREATED product_id={created.get('id')}") | |
| return created | |
| def _get_product(logs: List[str], shop_id: str, product_id: str) -> Dict[str, Any]: | |
| prod = _req("GET", f"/v1/shops/{shop_id}/products/{product_id}.json", logs=logs) | |
| if not isinstance(prod, dict): | |
| raise RuntimeError(f"Unexpected product response: {str(prod)[:500]}") | |
| variants = prod.get("variants") or [] | |
| _log(logs, f"SHOP_VARIANTS_TOTAL {len(variants) if isinstance(variants, list) else 0}") | |
| if isinstance(variants, list): | |
| for v in variants: | |
| if not isinstance(v, dict): | |
| continue | |
| _log( | |
| logs, | |
| f"SHOP_VARIANT id={v.get('id')} enabled={v.get('is_enabled')} " | |
| f"cost={v.get('cost')} price={v.get('price')}", | |
| ) | |
| return prod | |
| def _margin_pct() -> float: | |
| raw = (os.environ.get("PRINTIFY_MARGIN_PCT") or "").strip() | |
| if not raw: | |
| return 0.0 | |
| try: | |
| return float(raw) | |
| except Exception: | |
| return 0.0 | |
| def _update_prices_to_cost_plus_margin( | |
| logs: List[str], | |
| shop_id: str, | |
| product_id: str, | |
| product: Dict[str, Any], | |
| ) -> Dict[str, Any]: | |
| variants = product.get("variants") or [] | |
| if not isinstance(variants, list) or not variants: | |
| raise RuntimeError("No variants found on created product for price update.") | |
| m = _margin_pct() | |
| payload_variants = [] | |
| for v in variants: | |
| if not isinstance(v, dict): | |
| continue | |
| if v.get("is_enabled") is not True: | |
| continue | |
| vid = v.get("id") | |
| cost = v.get("cost") | |
| if vid is None or cost is None: | |
| continue | |
| try: | |
| cost_cents = int(cost) | |
| except Exception: | |
| continue | |
| price_cents = int(round(cost_cents * (1.0 + m))) | |
| if price_cents < 1: | |
| price_cents = 1 | |
| payload_variants.append({ | |
| "id": int(vid), | |
| "price": price_cents, | |
| "is_enabled": True, | |
| }) | |
| _log(logs, f"PRICE_SET id={vid} cost={cost_cents} price={price_cents} margin_pct={m}") | |
| if not payload_variants: | |
| raise RuntimeError("No variant prices could be computed from costs.") | |
| upd = _req( | |
| "PUT", | |
| f"/v1/shops/{shop_id}/products/{product_id}.json", | |
| json_body={"variants": payload_variants}, | |
| logs=logs, | |
| ) | |
| if not isinstance(upd, dict): | |
| raise RuntimeError(f"Unexpected update response: {str(upd)[:500]}") | |
| _log(logs, f"PRICE_UPDATE_DONE variants={len(payload_variants)}") | |
| return upd | |
| def _rebuild_options_for_variants(variants: List[Dict[str, Any]]) -> Dict[str, List[str]]: | |
| colors = sorted({v.get("color") for v in variants if v.get("color")}) | |
| sizes = sorted({v.get("size") for v in variants if v.get("size")}) | |
| return { | |
| "Color": colors, | |
| "Size": sizes, | |
| } | |
| def run(currency: str) -> Generator[Tuple[str, str], None, None]: | |
| logs: List[str] = [] | |
| result: Dict[str, Any] = {} | |
| def flush(): | |
| return "\n".join(logs), json.dumps(result, indent=2) | |
| try: | |
| _log(logs, "START") | |
| yield flush() | |
| shops = _req("GET", "/v1/shops.json", logs=logs) | |
| _log(logs, f"SHOP_LIST {json.dumps(shops)}") | |
| yield flush() | |
| fd, path = tempfile.mkstemp(prefix="shops_", suffix=".json") | |
| os.close(fd) | |
| with open(path, "w", encoding="utf-8") as f: | |
| json.dump(shops, f, indent=2) | |
| _log(logs, f"SHOPS_JSON_WRITTEN {path}") | |
| yield flush() | |
| blob = _find_first_valid_pair(logs) | |
| result = _build_product(blob, currency or "USD", logs) | |
| _log(logs, "DONE") | |
| yield flush() | |
| except Exception as e: | |
| _log(logs, f"ERROR: {e}") | |
| result = {"error": str(e)} | |
| yield flush() | |
| def phase_b(currency: str) -> Generator[Tuple[str, str], None, None]: | |
| logs: List[str] = [] | |
| result: Dict[str, Any] = {} | |
| def flush(): | |
| return "\n".join(logs), json.dumps(result, indent=2) | |
| try: | |
| _log(logs, "PHASE_B_START") | |
| yield flush() | |
| shops = _req("GET", "/v1/shops.json", logs=logs) | |
| _log(logs, f"SHOP_LIST {json.dumps(shops)}") | |
| yield flush() | |
| shop_id = _pick_shop_id(shops) | |
| _log(logs, f"SHOP_ID {shop_id}") | |
| yield flush() | |
| first_blob = _find_first_valid_pair(logs) | |
| bp_obj = first_blob.get("blueprint") or {} | |
| bp_id = bp_obj.get("id") | |
| if bp_id is None: | |
| raise RuntimeError("First valid blob missing blueprint id.") | |
| _log(logs, f"PHASE_B_BLUEPRINT {bp_id}") | |
| result["phaseBBlueprintId"] = bp_id | |
| yield flush() | |
| provider_blobs = _get_all_provider_blobs_for_blueprint(logs, bp_obj) | |
| result["providerCount"] = len(provider_blobs) | |
| _log(logs, f"PHASE_B_PROVIDER_COUNT {len(provider_blobs)}") | |
| yield flush() | |
| upload = _upload_grid_from_file(logs) | |
| result["gridUpload"] = upload | |
| yield flush() | |
| provider_runs: List[Dict[str, Any]] = [] | |
| currency_code = currency or "USD" | |
| for idx, blob in enumerate(provider_blobs): | |
| provider_details = blob.get("providerDetails") or {} | |
| provider_meta = blob.get("provider") or {} | |
| provider_id = provider_meta.get("id") | |
| provider_name = ( | |
| provider_details.get("title") | |
| or provider_details.get("name") | |
| or str(provider_id) | |
| ) | |
| _log( | |
| logs, | |
| f"PROVIDER_RUN index={idx} provider_id={provider_id} " | |
| f"name={provider_name}", | |
| ) | |
| yield flush() | |
| product_info_full = _build_product(blob, currency_code, logs) | |
| _log_catalog_variants(logs, product_info_full, limit=75) | |
| yield flush() | |
| variants_all = product_info_full.get("variants") or [] | |
| if not isinstance(variants_all, list) or not variants_all: | |
| _log(logs, f"PROVIDER_SKIP provider_id={provider_id} reason=no_variants_after_build") | |
| yield flush() | |
| continue | |
| total_variants = len(variants_all) | |
| if total_variants <= HARD_MAX_VARIANTS_PER_PRODUCT: | |
| chunk_index = 0 | |
| total_chunks = 1 | |
| p_info_chunk = dict(product_info_full) | |
| p_info_chunk["variants"] = variants_all | |
| p_info_chunk["options"] = _rebuild_options_for_variants(variants_all) | |
| p_info_chunk["_allVariants"] = variants_all | |
| p_info_chunk["_enabledVariantIds"] = [int(v.get("id")) for v in variants_all if v.get("id") is not None] | |
| _log( | |
| logs, | |
| f"PROVIDER_SINGLE product provider_id={provider_id} name={provider_name} " | |
| f"variants={total_variants}", | |
| ) | |
| yield flush() | |
| created = _create_product_all_variants_with_grid( | |
| logs, | |
| shop_id=shop_id, | |
| blob=blob, | |
| product_info=p_info_chunk, | |
| upload=upload, | |
| ) | |
| created_id = str(created.get("id") or "") | |
| if not created_id: | |
| raise RuntimeError("Created product response missing id.") | |
| yield flush() | |
| prod1 = _get_product(logs, shop_id, created_id) | |
| yield flush() | |
| _log( | |
| logs, | |
| f"COMPARE_COUNTS provider={provider_id} chunk_index={chunk_index} " | |
| f"catalog_chunk={len(variants_all)} " | |
| f"shop={len(prod1.get('variants') or [])}", | |
| ) | |
| yield flush() | |
| upd = _update_prices_to_cost_plus_margin(logs, shop_id, created_id, prod1) | |
| yield flush() | |
| prod2 = _get_product(logs, shop_id, created_id) | |
| yield flush() | |
| provider_runs.append( | |
| { | |
| "providerId": provider_id, | |
| "providerName": provider_name, | |
| "chunkIndex": chunk_index, | |
| "totalChunks": total_chunks, | |
| "catalogVariantCountTotal": total_variants, | |
| "catalogVariantCountChunk": len(variants_all), | |
| "shopVariantCountAfterCreate": len(prod1.get("variants") or []), | |
| "shopVariantCountFinal": len(prod2.get("variants") or []), | |
| "productId": created_id, | |
| "priceUpdateResponse": upd, | |
| } | |
| ) | |
| else: | |
| total_chunks = (total_variants + CHUNK_TARGET_VARIANTS - 1) // CHUNK_TARGET_VARIANTS | |
| chunk_index = 0 | |
| offset = 0 | |
| while offset < total_variants: | |
| chunk = variants_all[offset: offset + CHUNK_TARGET_VARIANTS] | |
| p_info_chunk = dict(product_info_full) | |
| p_info_chunk["variants"] = chunk | |
| p_info_chunk["options"] = _rebuild_options_for_variants(chunk) | |
| p_info_chunk["_allVariants"] = variants_all | |
| p_info_chunk["_enabledVariantIds"] = [int(v.get("id")) for v in chunk if v.get("id") is not None] | |
| p_info_chunk["_dup"] = bool(chunk_index > 0) | |
| _log( | |
| logs, | |
| f"PROVIDER_CHUNK provider_id={provider_id} name={provider_name} " | |
| f"chunk_index={chunk_index} total_chunks={total_chunks} " | |
| f"offset={offset} size={len(chunk)} total_variants={total_variants}", | |
| ) | |
| yield flush() | |
| created = _create_product_all_variants_with_grid( | |
| logs, | |
| shop_id=shop_id, | |
| blob=blob, | |
| product_info=p_info_chunk, | |
| upload=upload, | |
| ) | |
| created_id = str(created.get("id") or "") | |
| if not created_id: | |
| raise RuntimeError("Created product response missing id.") | |
| yield flush() | |
| prod1 = _get_product(logs, shop_id, created_id) | |
| yield flush() | |
| _log( | |
| logs, | |
| f"COMPARE_COUNTS provider={provider_id} chunk_index={chunk_index} " | |
| f"catalog_chunk={len(chunk)} " | |
| f"shop={len(prod1.get('variants') or [])}", | |
| ) | |
| yield flush() | |
| upd = _update_prices_to_cost_plus_margin(logs, shop_id, created_id, prod1) | |
| yield flush() | |
| prod2 = _get_product(logs, shop_id, created_id) | |
| yield flush() | |
| provider_runs.append( | |
| { | |
| "providerId": provider_id, | |
| "providerName": provider_name, | |
| "chunkIndex": chunk_index, | |
| "totalChunks": total_chunks, | |
| "catalogVariantCountTotal": total_variants, | |
| "catalogVariantCountChunk": len(chunk), | |
| "shopVariantCountAfterCreate": len(prod1.get("variants") or []), | |
| "shopVariantCountFinal": len(prod2.get("variants") or []), | |
| "productId": created_id, | |
| "priceUpdateResponse": upd, | |
| } | |
| ) | |
| offset += CHUNK_TARGET_VARIANTS | |
| chunk_index += 1 | |
| result["providerRuns"] = provider_runs | |
| _log(logs, "PHASE_B_DONE") | |
| yield flush() | |
| except Exception as e: | |
| _log(logs, f"ERROR: {e}") | |
| result = {"error": str(e)} | |
| yield flush() | |
| with gr.Blocks(title="Printify Catalog Probe") as demo: | |
| gr.Markdown("Extract and normalize ONE Printify blueprint into provider-specific JSON objects and products.") | |
| currency = gr.Textbox(label="Currency", value="USD") | |
| btn = gr.Button("Run") | |
| btn_b = gr.Button("Phase B (Test)") | |
| logs = gr.Textbox(label="Logs", lines=18) | |
| out = gr.Textbox(label="Output JSON", lines=18) | |
| btn.click(run, inputs=[currency], outputs=[logs, out]) | |
| btn_b.click(phase_b, inputs=[currency], outputs=[logs, out]) | |
| demo.queue().launch() |