"""Gradio app that probes the Printify catalog and creates test products.

"Run" lists shops, finds the first blueprint/provider pair that has variants
and shows a normalized JSON description of it.  "Phase B" additionally uploads
``grid.png`` from the working directory, creates one product (or several
chunked products) per print provider of that blueprint and reprices the
enabled variants from their reported cost.
"""

import base64
import json
import os
import re
import tempfile
import time
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from typing import Any, Callable, Dict, Generator, List, Optional, Tuple

import gradio as gr
import requests

PRINTIFY_BASE = "https://api.printify.com"
DEFAULT_BASE_PRICE = 0.001
GRID_FILENAME = "grid.png"

HARD_MAX_VARIANTS_PER_PRODUCT = 100
CHUNK_TARGET_VARIANTS = 85

# Upper bound on how many times a single request is retried after a
# 429 / 5xx response that carried a retry instruction.
MAX_RETRIES_PER_REQUEST = 10


def _now() -> str:
    """Return the current local time formatted as HH:MM:SS."""
    return time.strftime("%H:%M:%S")


def _sleep(ms: int):
    """Block for ``ms`` milliseconds."""
    time.sleep(ms / 1000)


def _log(logs: List[str], msg: str):
    """Append ``msg`` to ``logs`` prefixed with the current time."""
    logs.append(f"[{_now()}] {msg}")


def _auth_headers() -> Dict[str, str]:
    """Build the Authorization header from the PRINTIFY_API_TOKEN env var.

    Raises:
        RuntimeError: if the environment variable is missing or empty.
    """
    token = os.environ.get("PRINTIFY_API_TOKEN")
    if not token:
        raise RuntimeError("Missing PRINTIFY_API_TOKEN (HF Space Secret).")
    return {"Authorization": f"Bearer {token}"}


def _float_or_none(value: Any) -> Optional[float]:
    """Return ``float(value)``, or None when the value is not numeric."""
    try:
        return float(value)
    except (TypeError, ValueError):
        return None


def _int_or_none(value: Any) -> Optional[int]:
    """Return ``int(value)``, or None when the value cannot be converted."""
    try:
        return int(value)
    except (TypeError, ValueError, OverflowError):
        return None


def _http_date_delay_seconds(value: str) -> Optional[float]:
    """Return seconds from now until the HTTP-date ``value``, floored at 0.

    Returns None when ``value`` is not a parseable date.
    """
    try:
        when = parsedate_to_datetime(value)
    except (TypeError, ValueError, IndexError, OverflowError):
        return None
    if when is None:
        return None
    if when.tzinfo is None:
        # A date without a usable zone is interpreted as UTC.
        when = when.replace(tzinfo=timezone.utc)
    return max(0.0, (when - datetime.now(timezone.utc)).total_seconds())


def _parse_retry_after_seconds(r: requests.Response) -> Optional[float]:
    """Work out how long to wait before retrying the request behind ``r``.

    Sources are tried in order: the ``Retry-After`` header (a number of
    seconds, or an HTTP-date), the ``X-RateLimit-Reset`` /
    ``X-RateLimit-Reset-After`` headers, and finally a "<number> s/sec/seconds"
    phrase in the first 2000 characters of the body.

    Returns:
        The delay in seconds, or None when no hint could be found.
    """
    retry_after = r.headers.get("Retry-After")
    if retry_after:
        seconds = _float_or_none(retry_after)
        if seconds is None:
            seconds = _http_date_delay_seconds(retry_after)
        if seconds is not None:
            return seconds

    # NOTE(review): the reset header value is used as a delay in seconds; if
    # the API sends an epoch timestamp here this would over-wait - confirm.
    reset = r.headers.get("X-RateLimit-Reset") or r.headers.get("X-RateLimit-Reset-After")
    if reset:
        seconds = _float_or_none(reset)
        if seconds is not None:
            return seconds

    try:
        txt = (r.text or "")[:2000]
    except Exception:
        # Reading the body is best-effort; fall back to "no hint".
        txt = ""

    m = re.search(r"(\d+(\.\d+)?)\s*(s|sec|secs|second|seconds)\b", txt, re.I)
    if m:
        return _float_or_none(m.group(1))
    return None


def _req(
    method: str,
    path: str,
    json_body: Optional[Dict[str, Any]] = None,
    logs: Optional[List[str]] = None,
    max_retries: int = MAX_RETRIES_PER_REQUEST,
) -> Any:
    """Send an authenticated request to the Printify API and decode the reply.

    A 429 or 5xx response is retried after the delay reported by
    ``_parse_retry_after_seconds``; without such a hint the error is raised
    immediately.

    Args:
        method: HTTP method name.
        path: Path (including any query string) appended to PRINTIFY_BASE.
        json_body: Optional JSON payload.
        logs: Optional log list that receives rate-limit / server-error lines.
        max_retries: Maximum number of retries before giving up.

    Returns:
        The decoded JSON body, or ``{}`` when the body is empty.

    Raises:
        RuntimeError: on any 4xx/5xx response that is not retried, or when
            the retry budget is exhausted.
    """
    kwargs: Dict[str, Any] = {
        "headers": _auth_headers(),
        "timeout": 60,
    }
    if json_body is not None:
        kwargs["json"] = json_body

    url = f"{PRINTIFY_BASE}{path}"
    retries = 0

    while True:
        r = requests.request(method, url, **kwargs)
        status = r.status_code

        if status == 429 or status >= 500:
            wait_s = _parse_retry_after_seconds(r)
            if logs is not None:
                if status == 429:
                    _log(
                        logs,
                        f"RATE_LIMIT 429 path={path} retry_after={wait_s} "
                        f"headers={dict(r.headers)} body={r.text[:400]}",
                    )
                else:
                    _log(
                        logs,
                        f"SERVER_ERROR {r.status_code} path={path} retry_after={wait_s} "
                        f"body={r.text[:400]}",
                    )
            if wait_s is None:
                if status == 429:
                    raise RuntimeError(f"HTTP 429 without retry instruction: {r.text[:2000]}")
                raise RuntimeError(f"HTTP {r.status_code}: {r.text[:2000]}")
            if retries >= max_retries:
                # Without a cap a server that keeps answering 429/5xx with a
                # retry hint would keep this loop spinning forever.
                raise RuntimeError(
                    f"HTTP {status}: giving up after {retries} retries: {r.text[:2000]}"
                )
            retries += 1
            time.sleep(max(0.0, wait_s))
            continue

        if status >= 400:
            raise RuntimeError(f"HTTP {r.status_code}: {r.text[:2000]}")

        if r.text:
            return r.json()
        return {}


def _find_first_valid_pair(logs: List[str]) -> Dict[str, Any]:
    """Return data for the first blueprint/provider pair that has variants.

    Blueprints and their providers are walked in the order the API returns
    them.  For the first pair with a non-empty variant list the blueprint
    details, provider details and shipping info are fetched as well.

    Raises:
        RuntimeError: if no pair with variants exists.
    """
    _log(logs, "Listing blueprints")
    blueprints = _req("GET", "/v1/catalog/blueprints.json", logs=logs)

    for bp in blueprints:
        bp_id = str(bp["id"])
        _log(logs, f"Blueprint {bp_id}: fetching providers")
        providers = _req(
            "GET",
            f"/v1/catalog/blueprints/{bp_id}/print_providers.json",
            logs=logs,
        )

        for p in providers:
            p_id = str(p["id"])
            _log(logs, f"Blueprint {bp_id} / Provider {p_id}: fetching variants")
            vr = _req(
                "GET",
                f"/v1/catalog/blueprints/{bp_id}/print_providers/{p_id}/variants.json?show-out-of-stock=1",
                logs=logs,
            )
            # A non-dict reply simply counts as "no variants" for this pair.
            variants = vr.get("variants") if isinstance(vr, dict) else None
            if not isinstance(variants, list) or not variants:
                continue

            _log(logs, f"FOUND {len(variants)} variants")
            return {
                "blueprint": bp,
                "provider": p,
                "variants": variants,
                "blueprintDetails": _req(
                    "GET",
                    f"/v1/catalog/blueprints/{bp_id}.json",
                    logs=logs,
                ),
                "providerDetails": _req(
                    "GET",
                    f"/v1/catalog/print_providers/{p_id}.json",
                    logs=logs,
                ),
                "shippingInfo": _req(
                    "GET",
                    f"/v1/catalog/blueprints/{bp_id}/print_providers/{p_id}/shipping.json",
                    logs=logs,
                ),
            }

    raise RuntimeError("No blueprint/provider pair with variants found.")


def _get_all_provider_blobs_for_blueprint(
    logs: List[str],
    blueprint: Dict[str, Any],
) -> List[Dict[str, Any]]:
    """Collect one blob per provider of ``blueprint`` that offers variants.

    Each blob has the same keys as the dict returned by
    ``_find_first_valid_pair``.  Providers without variants are skipped.

    Raises:
        RuntimeError: if the blueprint has no id, no providers, or no
            provider with variants.
    """
    raw_bp_id = blueprint.get("id")
    # Check the raw value: str(None) is the truthy string "None", so testing
    # the stringified id would never detect a missing id.
    if raw_bp_id is None or str(raw_bp_id) == "":
        raise RuntimeError("Blueprint object missing id.")
    bp_id = str(raw_bp_id)

    _log(logs, f"BLUEPRINT_ALL_PROVIDERS blueprint_id={bp_id}: fetching providers")
    providers = _req(
        "GET",
        f"/v1/catalog/blueprints/{bp_id}/print_providers.json",
        logs=logs,
    )
    if not isinstance(providers, list) or not providers:
        raise RuntimeError(f"No providers found for blueprint {bp_id}.")

    bp_details = _req(
        "GET",
        f"/v1/catalog/blueprints/{bp_id}.json",
        logs=logs,
    )

    blobs: List[Dict[str, Any]] = []
    for p in providers:
        if not isinstance(p, dict):
            continue
        raw_p_id = p.get("id")
        if raw_p_id is None or str(raw_p_id) == "":
            continue
        p_id = str(raw_p_id)

        _log(logs, f"Blueprint {bp_id} / Provider {p_id}: fetching variants (ALL)")
        vr = _req(
            "GET",
            f"/v1/catalog/blueprints/{bp_id}/print_providers/{p_id}/variants.json?show-out-of-stock=1",
            logs=logs,
        )
        variants = vr.get("variants") if isinstance(vr, dict) else None
        count = len(variants) if isinstance(variants, list) else 0
        _log(
            logs,
            f"PROVIDER_VARIANTS blueprint={bp_id} provider={p_id} count={count}",
        )
        if not isinstance(variants, list) or not variants:
            continue

        provider_details = _req(
            "GET",
            f"/v1/catalog/print_providers/{p_id}.json",
            logs=logs,
        )
        shipping_info = _req(
            "GET",
            f"/v1/catalog/blueprints/{bp_id}/print_providers/{p_id}/shipping.json",
            logs=logs,
        )
        blobs.append(
            {
                "blueprint": blueprint,
                "provider": p,
                "variants": variants,
                "blueprintDetails": bp_details,
                "providerDetails": provider_details,
                "shippingInfo": shipping_info,
            }
        )

    if not blobs:
        raise RuntimeError(f"No providers with variants for blueprint {bp_id}.")
    return blobs


def _build_product(blob: Dict[str, Any], currency: str, logs: List[str]) -> Dict[str, Any]:
    """Normalize a blueprint/provider blob into a product description dict.

    The result carries name, currency, lowest price, tags, colour/size
    options, description, a per-variant snapshot, print-area dimensions per
    variant and the untouched ``blob`` under ``raw``.  A sample of the first
    20 variants is written to ``logs``.
    """
    snapshot = []
    for v in blob["variants"]:
        raw_price = v.get("price")
        # Only plain digit values are accepted as a price in cents.
        if isinstance(raw_price, (int, str)) and str(raw_price).isdigit():
            cents = int(raw_price)
        else:
            cents = None

        raw_placeholders = v.get("placeholders")
        if not isinstance(raw_placeholders, list):
            raw_placeholders = []
        placeholders = [
            {
                "position": x.get("position"),
                "width": x.get("width"),
                "height": x.get("height"),
            }
            for x in raw_placeholders
            if isinstance(x, dict)
        ]

        options = v.get("options") or {}
        snapshot.append({
            "id": v.get("id"),
            "sku": v.get("sku"),
            "size": options.get("size"),
            "color": options.get("color"),
            "priceCents": cents,
            "price": round(cents / 100, 2) if cents is not None else None,
            "placeholders": placeholders,
        })

    _log(logs, f"VARIANT_SNAPSHOT sample={json.dumps(snapshot[:20])}")

    print_area_sample = [
        {
            "id": s.get("id"),
            "size": s.get("size"),
            "color": s.get("color"),
            "placeholders": s.get("placeholders"),
        }
        for s in snapshot[:20]
    ]
    _log(logs, f"PRINT_AREA_SNAPSHOT sample={json.dumps(print_area_sample)}")

    # variant id -> position -> {"width", "height"}
    print_areas_by_variant: Dict[str, Any] = {}
    for s in snapshot:
        vid = s.get("id")
        if vid is None:
            continue
        per_pos: Dict[str, Any] = {}
        for ph in (s.get("placeholders") or []):
            pos = ph.get("position")
            if not pos:
                continue
            per_pos[str(pos)] = {"width": ph.get("width"), "height": ph.get("height")}
        print_areas_by_variant[str(vid)] = per_pos

    all_positions = sorted({
        ph.get("position")
        for s in snapshot
        for ph in (s.get("placeholders") or [])
        if ph.get("position")
    })

    all_cents = [v["priceCents"] for v in snapshot if v["priceCents"] is not None]
    min_price = round(min(all_cents) / 100, 2) if all_cents else DEFAULT_BASE_PRICE

    colors = sorted({v["color"] for v in snapshot if v["color"]})
    sizes = sorted({v["size"] for v in snapshot if v["size"]})

    provider = blob["providerDetails"]
    loc = provider.get("location") or {}
    loc_str = ", ".join(x for x in [loc.get("city"), loc.get("country")] if x) or "empty"

    description = (
        (blob["blueprintDetails"].get("description") or "empty")
        + f"\n\nFulfilled by {provider.get('title') or provider.get('name')} — {loc_str}"
    )

    return {
        "name": blob["blueprintDetails"].get("title"),
        "currency": currency,
        "price": min_price,
        "tags": ["Printify", f"Provider: {provider.get('title') or provider.get('name')}"],
        "options": {
            "Color": colors,
            "Size": sizes,
        },
        "description": description,
        "variants": snapshot,
        "printAreas": {
            "positions": all_positions,
            "byVariantId": print_areas_by_variant,
            "units": "px",
        },
        "raw": blob,
    }


def _pick_shop_id(shops_resp: Any) -> str:
    """Select the shop to work in from the /v1/shops.json response.

    If PRINTIFY_SHOP_ID is set, only a shop with exactly that id is accepted.
    Otherwise the shop whose title/name equals PRINTIFY_SHOP_NAME
    (default "Atheria", compared case-insensitively) is chosen.

    Raises:
        RuntimeError: if there are no shops or the preferred shop is absent;
            there is deliberately no fallback to another shop.
    """
    if isinstance(shops_resp, list):
        items = shops_resp
    elif isinstance(shops_resp, dict):
        items = shops_resp.get("data") or shops_resp.get("shops") or shops_resp.get("items") or []
    else:
        items = []

    if not isinstance(items, list) or not items:
        raise RuntimeError("No shops found on /v1/shops.json for this token.")

    forced = (os.environ.get("PRINTIFY_SHOP_ID") or "").strip()
    if forced:
        for s in items:
            if isinstance(s, dict) and str(s.get("id")) == forced:
                return str(s["id"])
        raise RuntimeError(f"Preferred shop '{forced}' not found; refusing to fallback.")

    preferred_name = os.environ.get("PRINTIFY_SHOP_NAME", "Atheria").strip().lower()
    for s in items:
        if not isinstance(s, dict):
            continue
        title = (s.get("title") or s.get("name") or "").strip().lower()
        if title and title == preferred_name:
            return str(s["id"])

    raise RuntimeError(f"Preferred shop name '{os.environ.get('PRINTIFY_SHOP_NAME','Atheria')}' not found; refusing to fallback.")


def _upload_grid_from_file(logs: List[str]) -> Dict[str, Any]:
    """Upload GRID_FILENAME from the working directory as a base64 image.

    Returns:
        The upload response dict (must contain an ``id``).

    Raises:
        RuntimeError: if the file is missing or the response has no id.
    """
    path = os.path.join(os.getcwd(), GRID_FILENAME)
    if not os.path.exists(path):
        raise RuntimeError(f"Grid image not found at {path}")

    with open(path, "rb") as f:
        raw = f.read()

    b64 = base64.b64encode(raw).decode("ascii")
    payload = {
        "file_name": GRID_FILENAME,
        "contents": b64,
    }

    _log(logs, f"UPLOADING_GRID path={path} bytes={len(raw)} b64len={len(b64)}")
    resp = _req("POST", "/v1/uploads/images.json", json_body=payload, logs=logs)
    if not isinstance(resp, dict) or not resp.get("id"):
        raise RuntimeError(f"Unexpected upload response: {str(resp)[:500]}")

    _log(
        logs,
        f"GRID_UPLOAD id={resp.get('id')} file={resp.get('file_name')} "
        f"w={resp.get('width')} h={resp.get('height')}",
    )
    return resp


def _scale_fill(ph_w: float, ph_h: float, img_w: float, img_h: float) -> float:
    """Return the image scale for a placeholder, never below 1.0.

    The value is the ratio of the placeholder's aspect (height/width) to the
    image's aspect (height/width), clamped to a minimum of 1.0.  Any
    non-positive dimension yields 1.0.
    """
    if ph_w <= 0 or ph_h <= 0 or img_w <= 0 or img_h <= 0:
        return 1.0
    s = (ph_h * img_w) / (ph_w * img_h)
    if s < 1.0:
        s = 1.0
    return float(s)


def _log_catalog_variants(logs: List[str], product_info: Dict[str, Any], limit: int = 50):
    """Log the catalog variant total and one line for up to ``limit`` variants."""
    variants = product_info.get("variants") or []
    total = len(variants) if isinstance(variants, list) else 0
    _log(logs, f"CATALOG_VARIANTS_TOTAL {total}")
    if not isinstance(variants, list):
        return

    n = 0
    for v in variants:
        if not isinstance(v, dict):
            continue
        vid = v.get("id")
        size = v.get("size")
        color = v.get("color")
        ph = v.get("placeholders") or []
        phn = len(ph) if isinstance(ph, list) else 0
        _log(logs, f"CATALOG_VARIANT id={vid} size={size} color={color} placeholders={phn}")
        n += 1
        if n >= limit:
            break


def _create_product_all_variants_with_grid(
    logs: List[str],
    shop_id: str,
    blob: Dict[str, Any],
    product_info: Dict[str, Any],
    upload: Dict[str, Any],
) -> Dict[str, Any]:
    """Create a shop product with the uploaded grid image on every placeholder.

    ``product_info["variants"]`` are the variants to enable.  The optional
    ``_allVariants`` key lists every variant sent in the payload (defaults to
    the enabled ones), ``_enabledVariantIds`` overrides which ids are switched
    on, and ``_dup`` set to True marks the title as a duplicate.

    Returns:
        The created product as returned by the API.

    Raises:
        RuntimeError: if ids, variants or placeholders are missing, or the
            API reply is not a dict.
    """
    bp_id = (blob.get("blueprint") or {}).get("id")
    provider_id = (blob.get("provider") or {}).get("id")
    if bp_id is None or provider_id is None:
        raise RuntimeError("Missing blueprint or provider id.")

    enabled_variants = product_info.get("variants") or []
    if not isinstance(enabled_variants, list) or not enabled_variants:
        raise RuntimeError("No variants to create product with.")

    all_variants = product_info.get("_allVariants") or enabled_variants
    if not isinstance(all_variants, list) or not all_variants:
        raise RuntimeError("No all-variants universe found for this provider.")

    enabled_ids = set()
    for x in (product_info.get("_enabledVariantIds") or []):
        as_int = _int_or_none(x)
        if as_int is not None:
            enabled_ids.add(as_int)
    if not enabled_ids:
        # No explicit id list: enable every variant that was passed in.
        for v in enabled_variants:
            if not isinstance(v, dict):
                continue
            as_int = _int_or_none(v.get("id"))
            if as_int is not None:
                enabled_ids.add(as_int)

    img_id = upload.get("id")
    img_w = float(upload.get("width") or 0)
    img_h = float(upload.get("height") or 0)

    variants_payload = []
    enabled_count = 0
    for v in all_variants:
        if not isinstance(v, dict):
            continue
        vid_i = _int_or_none(v.get("id"))
        if vid_i is None:
            continue
        is_on = vid_i in enabled_ids
        if is_on:
            enabled_count += 1
        variants_payload.append({
            "id": vid_i,
            # Placeholder price; real prices are set later from the cost.
            "price": 1,
            "is_enabled": bool(is_on),
        })

    print_areas_payload = []
    for v in enabled_variants:
        if not isinstance(v, dict):
            continue
        vid_i = _int_or_none(v.get("id"))
        if vid_i is None or vid_i not in enabled_ids:
            continue

        placeholders = v.get("placeholders") or []
        if not isinstance(placeholders, list) or not placeholders:
            continue

        ph_payload = []
        for ph in placeholders:
            if not isinstance(ph, dict):
                continue
            pos = ph.get("position")
            pw = ph.get("width")
            phh = ph.get("height")
            if not pos or pw is None or phh is None:
                continue
            pwf = _float_or_none(pw)
            phf = _float_or_none(phh)
            if pwf is None or phf is None:
                continue

            scale = _scale_fill(pwf, phf, img_w, img_h)
            ph_payload.append({
                "position": pos,
                "images": [{
                    "id": img_id,
                    "x": 0.5,
                    "y": 0.5,
                    "scale": scale,
                    "angle": 0,
                }],
            })

        if ph_payload:
            print_areas_payload.append({
                "variant_ids": [vid_i],
                "placeholders": ph_payload,
            })

    if not variants_payload:
        raise RuntimeError("No valid variant ids to create product with.")
    if not print_areas_payload:
        raise RuntimeError("No placeholders payload generated for any enabled variant.")

    provider_details = blob.get("providerDetails") or {}
    provider_name = provider_details.get("title") or provider_details.get("name") or str(provider_id)

    base_title = (product_info.get("name") or "Printify Grid Test") + f" — {provider_name}"
    if product_info.get("_dup") is True:
        title = base_title + " — Duplicate" + f" [bp={bp_id} pp={provider_id} dup]"
    else:
        title = base_title + f" [bp={bp_id} pp={provider_id}]"

    description = product_info.get("description") or ""

    payload = {
        "title": title,
        "description": description,
        "blueprint_id": int(bp_id),
        "print_provider_id": int(provider_id),
        "variants": variants_payload,
        "print_areas": print_areas_payload,
    }

    _log(
        logs,
        f"PHASE_B_CREATE shop_id={shop_id} blueprint_id={bp_id} provider_id={provider_id} "
        f"variants_payload_total={len(variants_payload)} enabled={enabled_count} print_areas_payload={len(print_areas_payload)}",
    )

    created = _req(
        "POST",
        f"/v1/shops/{shop_id}/products.json",
        json_body=payload,
        logs=logs,
    )
    if not isinstance(created, dict):
        raise RuntimeError(f"Unexpected create response: {str(created)[:500]}")

    _log(logs, f"PHASE_B_CREATED product_id={created.get('id')}")
    return created


def _get_product(logs: List[str], shop_id: str, product_id: str) -> Dict[str, Any]:
    """Fetch a shop product and log id, enabled flag, cost and price per variant.

    Raises:
        RuntimeError: if the API reply is not a dict.
    """
    prod = _req("GET", f"/v1/shops/{shop_id}/products/{product_id}.json", logs=logs)
    if not isinstance(prod, dict):
        raise RuntimeError(f"Unexpected product response: {str(prod)[:500]}")

    variants = prod.get("variants") or []
    _log(logs, f"SHOP_VARIANTS_TOTAL {len(variants) if isinstance(variants, list) else 0}")
    if isinstance(variants, list):
        for v in variants:
            if not isinstance(v, dict):
                continue
            _log(
                logs,
                f"SHOP_VARIANT id={v.get('id')} enabled={v.get('is_enabled')} "
                f"cost={v.get('cost')} price={v.get('price')}",
            )
    return prod


def _margin_pct() -> float:
    """Read PRINTIFY_MARGIN_PCT as a float; 0.0 when unset or unparsable."""
    raw = (os.environ.get("PRINTIFY_MARGIN_PCT") or "").strip()
    if not raw:
        return 0.0
    try:
        return float(raw)
    except ValueError:
        return 0.0


def _update_prices_to_cost_plus_margin(
    logs: List[str],
    shop_id: str,
    product_id: str,
    product: Dict[str, Any],
) -> Dict[str, Any]:
    """Set each enabled variant's price to its cost scaled by the margin.

    Variants that are disabled, or lack an id or a usable cost, are left out
    of the update.  Prices are floored at 1.

    Returns:
        The API response to the product update.

    Raises:
        RuntimeError: if the product has no variants, no price could be
            computed, or the API reply is not a dict.
    """
    variants = product.get("variants") or []
    if not isinstance(variants, list) or not variants:
        raise RuntimeError("No variants found on created product for price update.")

    # NOTE(review): the value is applied as a fraction (price = cost * (1 + m)),
    # i.e. it is not divided by 100 despite the "_PCT" name - confirm intent.
    m = _margin_pct()

    payload_variants = []
    for v in variants:
        if not isinstance(v, dict):
            continue
        if v.get("is_enabled") is not True:
            continue
        vid = v.get("id")
        cost = v.get("cost")
        if vid is None or cost is None:
            continue
        cost_cents = _int_or_none(cost)
        if cost_cents is None:
            continue

        price_cents = max(1, int(round(cost_cents * (1.0 + m))))

        payload_variants.append({
            "id": int(vid),
            "price": price_cents,
            "is_enabled": True,
        })
        _log(logs, f"PRICE_SET id={vid} cost={cost_cents} price={price_cents} margin_pct={m}")

    if not payload_variants:
        raise RuntimeError("No variant prices could be computed from costs.")

    upd = _req(
        "PUT",
        f"/v1/shops/{shop_id}/products/{product_id}.json",
        json_body={"variants": payload_variants},
        logs=logs,
    )
    if not isinstance(upd, dict):
        raise RuntimeError(f"Unexpected update response: {str(upd)[:500]}")

    _log(logs, f"PRICE_UPDATE_DONE variants={len(payload_variants)}")
    return upd


def _rebuild_options_for_variants(variants: List[Dict[str, Any]]) -> Dict[str, List[str]]:
    """Return the sorted distinct colours and sizes present in ``variants``."""
    colors = sorted({v.get("color") for v in variants if v.get("color")})
    sizes = sorted({v.get("size") for v in variants if v.get("size")})
    return {
        "Color": colors,
        "Size": sizes,
    }


def run(currency: str) -> Generator[Tuple[str, str], None, None]:
    """Gradio handler: list shops and describe the first usable catalog pair.

    Yields ``(log text, result JSON)`` after every step.  The shop list is
    also written to a temporary JSON file whose path is logged.  Any error is
    logged and reported as ``{"error": ...}``.
    """
    logs: List[str] = []
    result: Dict[str, Any] = {}

    def flush():
        return "\n".join(logs), json.dumps(result, indent=2)

    try:
        _log(logs, "START")
        yield flush()

        shops = _req("GET", "/v1/shops.json", logs=logs)
        _log(logs, f"SHOP_LIST {json.dumps(shops)}")
        yield flush()

        fd, path = tempfile.mkstemp(prefix="shops_", suffix=".json")
        os.close(fd)
        with open(path, "w", encoding="utf-8") as f:
            json.dump(shops, f, indent=2)
        _log(logs, f"SHOPS_JSON_WRITTEN {path}")
        yield flush()

        blob = _find_first_valid_pair(logs)
        result = _build_product(blob, currency or "USD", logs)

        _log(logs, "DONE")
        yield flush()

    except Exception as e:
        # Top-level boundary of the UI handler: surface the error in the UI.
        _log(logs, f"ERROR: {e}")
        result = {"error": str(e)}
        yield flush()


def _run_provider_chunk(
    logs: List[str],
    flush: Callable[[], Tuple[str, str]],
    shop_id: str,
    upload: Dict[str, Any],
    blob: Dict[str, Any],
    product_info_full: Dict[str, Any],
    variants_all: List[Dict[str, Any]],
    chunk: List[Dict[str, Any]],
    chunk_index: int,
    total_chunks: int,
    dup: Optional[bool],
    announce: str,
    provider_id: Any,
    provider_name: str,
) -> Generator[Tuple[str, str], None, Dict[str, Any]]:
    """Create, verify and reprice one product for ``chunk``.

    Yields ``flush()`` after every step so the UI can refresh, and returns
    the summary record for this chunk.  ``dup`` is stored as ``_dup`` in the
    product info only when it is not None; ``announce`` is the log line
    written before the product is created.
    """
    p_info_chunk = dict(product_info_full)
    p_info_chunk["variants"] = chunk
    p_info_chunk["options"] = _rebuild_options_for_variants(chunk)
    p_info_chunk["_allVariants"] = variants_all
    p_info_chunk["_enabledVariantIds"] = [int(v.get("id")) for v in chunk if v.get("id") is not None]
    if dup is not None:
        p_info_chunk["_dup"] = dup

    _log(logs, announce)
    yield flush()

    created = _create_product_all_variants_with_grid(
        logs,
        shop_id=shop_id,
        blob=blob,
        product_info=p_info_chunk,
        upload=upload,
    )
    created_id = str(created.get("id") or "")
    if not created_id:
        raise RuntimeError("Created product response missing id.")
    yield flush()

    prod1 = _get_product(logs, shop_id, created_id)
    yield flush()

    _log(
        logs,
        f"COMPARE_COUNTS provider={provider_id} chunk_index={chunk_index} "
        f"catalog_chunk={len(chunk)} "
        f"shop={len(prod1.get('variants') or [])}",
    )
    yield flush()

    upd = _update_prices_to_cost_plus_margin(logs, shop_id, created_id, prod1)
    yield flush()

    prod2 = _get_product(logs, shop_id, created_id)
    yield flush()

    return {
        "providerId": provider_id,
        "providerName": provider_name,
        "chunkIndex": chunk_index,
        "totalChunks": total_chunks,
        "catalogVariantCountTotal": len(variants_all),
        "catalogVariantCountChunk": len(chunk),
        "shopVariantCountAfterCreate": len(prod1.get("variants") or []),
        "shopVariantCountFinal": len(prod2.get("variants") or []),
        "productId": created_id,
        "priceUpdateResponse": upd,
    }


def phase_b(currency: str) -> Generator[Tuple[str, str], None, None]:
    """Gradio handler: create grid test products for every provider.

    Picks the shop, takes the blueprint of the first valid catalog pair,
    uploads the grid image and, per provider with variants, creates one
    product when the variant count is at most HARD_MAX_VARIANTS_PER_PRODUCT,
    otherwise one product per slice of CHUNK_TARGET_VARIANTS variants.
    Yields ``(log text, result JSON)`` after every step; any error is logged
    and reported as ``{"error": ...}``.
    """
    logs: List[str] = []
    result: Dict[str, Any] = {}

    def flush():
        return "\n".join(logs), json.dumps(result, indent=2)

    try:
        _log(logs, "PHASE_B_START")
        yield flush()

        shops = _req("GET", "/v1/shops.json", logs=logs)
        _log(logs, f"SHOP_LIST {json.dumps(shops)}")
        yield flush()

        shop_id = _pick_shop_id(shops)
        _log(logs, f"SHOP_ID {shop_id}")
        yield flush()

        first_blob = _find_first_valid_pair(logs)
        bp_obj = first_blob.get("blueprint") or {}
        bp_id = bp_obj.get("id")
        if bp_id is None:
            raise RuntimeError("First valid blob missing blueprint id.")
        _log(logs, f"PHASE_B_BLUEPRINT {bp_id}")
        result["phaseBBlueprintId"] = bp_id
        yield flush()

        provider_blobs = _get_all_provider_blobs_for_blueprint(logs, bp_obj)
        result["providerCount"] = len(provider_blobs)
        _log(logs, f"PHASE_B_PROVIDER_COUNT {len(provider_blobs)}")
        yield flush()

        upload = _upload_grid_from_file(logs)
        result["gridUpload"] = upload
        yield flush()

        provider_runs: List[Dict[str, Any]] = []
        currency_code = currency or "USD"

        for idx, blob in enumerate(provider_blobs):
            provider_details = blob.get("providerDetails") or {}
            provider_meta = blob.get("provider") or {}
            provider_id = provider_meta.get("id")
            provider_name = (
                provider_details.get("title")
                or provider_details.get("name")
                or str(provider_id)
            )

            _log(
                logs,
                f"PROVIDER_RUN index={idx} provider_id={provider_id} "
                f"name={provider_name}",
            )
            yield flush()

            product_info_full = _build_product(blob, currency_code, logs)
            _log_catalog_variants(logs, product_info_full, limit=75)
            yield flush()

            variants_all = product_info_full.get("variants") or []
            if not isinstance(variants_all, list) or not variants_all:
                _log(logs, f"PROVIDER_SKIP provider_id={provider_id} reason=no_variants_after_build")
                yield flush()
                continue

            total_variants = len(variants_all)

            if total_variants <= HARD_MAX_VARIANTS_PER_PRODUCT:
                # Everything fits into a single product.
                record = yield from _run_provider_chunk(
                    logs,
                    flush,
                    shop_id,
                    upload,
                    blob,
                    product_info_full,
                    variants_all,
                    variants_all,
                    0,
                    1,
                    None,
                    f"PROVIDER_SINGLE product provider_id={provider_id} name={provider_name} "
                    f"variants={total_variants}",
                    provider_id,
                    provider_name,
                )
                provider_runs.append(record)
                continue

            # Too many variants for one product: one product per slice, each
            # after the first flagged as a duplicate.
            total_chunks = (total_variants + CHUNK_TARGET_VARIANTS - 1) // CHUNK_TARGET_VARIANTS
            offsets = range(0, total_variants, CHUNK_TARGET_VARIANTS)
            for chunk_index, offset in enumerate(offsets):
                chunk = variants_all[offset: offset + CHUNK_TARGET_VARIANTS]
                record = yield from _run_provider_chunk(
                    logs,
                    flush,
                    shop_id,
                    upload,
                    blob,
                    product_info_full,
                    variants_all,
                    chunk,
                    chunk_index,
                    total_chunks,
                    bool(chunk_index > 0),
                    f"PROVIDER_CHUNK provider_id={provider_id} name={provider_name} "
                    f"chunk_index={chunk_index} total_chunks={total_chunks} "
                    f"offset={offset} size={len(chunk)} total_variants={total_variants}",
                    provider_id,
                    provider_name,
                )
                provider_runs.append(record)

        result["providerRuns"] = provider_runs
        _log(logs, "PHASE_B_DONE")
        yield flush()

    except Exception as e:
        # Top-level boundary of the UI handler: surface the error in the UI.
        _log(logs, f"ERROR: {e}")
        result = {"error": str(e)}
        yield flush()


with gr.Blocks(title="Printify Catalog Probe") as demo:
    gr.Markdown("Extract and normalize ONE Printify blueprint into provider-specific JSON objects and products.")
    currency = gr.Textbox(label="Currency", value="USD")
    btn = gr.Button("Run")
    btn_b = gr.Button("Phase B (Test)")
    logs = gr.Textbox(label="Logs", lines=18)
    out = gr.Textbox(label="Output JSON", lines=18)
    btn.click(run, inputs=[currency], outputs=[logs, out])
    btn_b.click(phase_b, inputs=[currency], outputs=[logs, out])

demo.queue().launch()