# PrintifyALL / app.py
# Last commit by Theflame47: "Update app.py" (45fcf99, verified)
import base64
import json
import math
import os
import re
import tempfile
import time
from datetime import timezone
from email.utils import parsedate_to_datetime
from typing import Dict, Any, List, Generator, Tuple, Optional

import gradio as gr
import requests
# Root URL that _req prepends to every request path.
PRINTIFY_BASE = "https://api.printify.com"
# Fallback product-level "price" used by _build_product when no variant
# carries a usable integer price.
DEFAULT_BASE_PRICE = 0.001
# File name of the image that _upload_grid_from_file reads from the current
# working directory and uploads.
GRID_FILENAME = "grid.png"
# phase_b creates a single product when a provider has at most this many
# variants ...
HARD_MAX_VARIANTS_PER_PRODUCT = 100
# ... otherwise it creates one product per slice of this many variants.
CHUNK_TARGET_VARIANTS = 85
def _now() -> str:
    """Return the current local wall-clock time formatted as ``HH:MM:SS``."""
    clock_format = "%H:%M:%S"
    return time.strftime(clock_format, time.localtime())
def _sleep(ms: int) -> None:
    """Block the calling thread for ``ms`` milliseconds.

    Negative durations are treated as zero: ``time.sleep`` raises
    ``ValueError`` for negative values, and ``_req`` clamps its waits the
    same way.
    """
    time.sleep(max(0.0, ms / 1000))
def _auth_headers() -> Dict[str, str]:
    """Build the ``Authorization`` header from ``PRINTIFY_API_TOKEN``.

    Returns:
        A one-entry dict ``{"Authorization": "Bearer <token>"}``.

    Raises:
        RuntimeError: if the environment variable is unset, empty, or
            contains only whitespace.
    """
    # Secrets pasted into a settings UI often carry a trailing newline or
    # spaces; strip them so the header value stays well-formed.
    token = (os.environ.get("PRINTIFY_API_TOKEN") or "").strip()
    if not token:
        raise RuntimeError("Missing PRINTIFY_API_TOKEN (HF Space Secret).")
    return {"Authorization": f"Bearer {token}"}
def _parse_retry_after_seconds(r: "requests.Response") -> Optional[float]:
    """Work out how many seconds to wait before retrying a request.

    Sources are tried in this order; the first usable one wins:

    1. ``Retry-After`` as a number of seconds, or as an HTTP-date.
    2. ``X-RateLimit-Reset`` / ``X-RateLimit-Reset-After`` as a number.
    3. A "<number> s|sec|secs|second|seconds" phrase in the first 2000
       characters of the response body.

    Args:
        r: The response to inspect; only ``headers`` and ``text`` are read.

    Returns:
        A finite, non-negative number of seconds, or ``None`` when no hint
        could be extracted.
    """

    def _finite(raw: Any) -> Optional[float]:
        # "nan"/"inf" parse as floats but are useless as a delay: nan makes
        # the caller spin with zero waits, inf makes time.sleep raise.
        try:
            value = float(raw)
        except (TypeError, ValueError):
            return None
        return value if math.isfinite(value) else None

    now = time.time()

    retry_after = r.headers.get("Retry-After")
    if retry_after:
        secs = _finite(retry_after)
        if secs is not None:
            return max(0.0, secs)
        # Retry-After may also be an HTTP-date instead of delay-seconds.
        try:
            when = parsedate_to_datetime(retry_after)
        except (TypeError, ValueError, IndexError):
            when = None
        if when is not None:
            if when.tzinfo is None:
                # A naive result would otherwise be read as local time.
                when = when.replace(tzinfo=timezone.utc)
            return max(0.0, when.timestamp() - now)

    reset = r.headers.get("X-RateLimit-Reset") or r.headers.get("X-RateLimit-Reset-After")
    if reset:
        secs = _finite(reset)
        if secs is not None:
            # A value this large (>= ~31 years) cannot be a relative delay;
            # treat it as an absolute Unix timestamp instead.
            # NOTE(review): heuristic - confirm which form the API sends.
            if secs >= 1_000_000_000.0:
                secs -= now
            return max(0.0, secs)

    try:
        txt = (r.text or "")[:2000]
    except Exception:
        # Best effort: an undecodable body just means "no hint in the body".
        txt = ""
    m = re.search(r"(\d+(\.\d+)?)\s*(s|sec|secs|second|seconds)\b", txt, re.I)
    if m:
        return _finite(m.group(1))
    return None
def _req(
    method: str,
    path: str,
    json_body: Optional[Dict[str, Any]] = None,
    logs: Optional[List[str]] = None,
    max_attempts: int = 10,
) -> Any:
    """Send one authenticated request to the Printify API, retrying on demand.

    HTTP 429 and 5xx responses are retried only when the response carries a
    wait hint that ``_parse_retry_after_seconds`` can read; the function
    sleeps for that long and tries again, up to ``max_attempts`` requests in
    total (the original loop was unbounded).

    Args:
        method: HTTP verb, e.g. ``"GET"``.
        path: Path (including any query string) appended to ``PRINTIFY_BASE``.
        json_body: Optional JSON payload.
        logs: Optional log buffer; retry events are appended when given.
        max_attempts: Upper bound on requests sent; values below 1 count as 1.

    Returns:
        The decoded JSON body, or ``{}`` when the body is empty.

    Raises:
        RuntimeError: on any 4xx response, on a 429/5xx response without a
            wait hint, or when the attempt budget is exhausted.
    """
    kwargs: Dict[str, Any] = {
        "headers": _auth_headers(),
        "timeout": 60,
    }
    if json_body is not None:
        kwargs["json"] = json_body

    url = f"{PRINTIFY_BASE}{path}"
    budget = max(1, int(max_attempts))
    attempt = 0

    while True:
        attempt += 1
        r = requests.request(method, url, **kwargs)

        if r.status_code == 429:
            wait_s = _parse_retry_after_seconds(r)
            if logs is not None:
                _log(
                    logs,
                    f"RATE_LIMIT 429 path={path} retry_after={wait_s} "
                    f"headers={dict(r.headers)} body={r.text[:400]}",
                )
            if wait_s is None:
                raise RuntimeError(f"HTTP 429 without retry instruction: {r.text[:2000]}")
            if attempt >= budget:
                raise RuntimeError(
                    f"HTTP 429 after {attempt} attempts: {r.text[:2000]}"
                )
            time.sleep(max(0.0, wait_s))
            continue

        if r.status_code >= 500:
            # NOTE(review): this also re-sends POST/PUT requests; confirm the
            # endpoints tolerate a repeated write after a server error.
            wait_s = _parse_retry_after_seconds(r)
            if logs is not None:
                _log(
                    logs,
                    f"SERVER_ERROR {r.status_code} path={path} retry_after={wait_s} "
                    f"body={r.text[:400]}",
                )
            if wait_s is None:
                raise RuntimeError(f"HTTP {r.status_code}: {r.text[:2000]}")
            if attempt >= budget:
                raise RuntimeError(
                    f"HTTP {r.status_code} after {attempt} attempts: {r.text[:2000]}"
                )
            time.sleep(max(0.0, wait_s))
            continue

        if r.status_code >= 400:
            raise RuntimeError(f"HTTP {r.status_code}: {r.text[:2000]}")

        if r.text:
            return r.json()
        return {}
def _log(logs: List[str], msg: str):
    """Append ``msg`` to ``logs``, prefixed with the ``[HH:MM:SS]`` timestamp from ``_now``."""
    stamped = "[{}] {}".format(_now(), msg)
    logs.append(stamped)
def _find_first_valid_pair(logs: List[str]) -> Dict[str, Any]:
    """Find the first blueprint/provider pair that has at least one variant.

    Blueprints are walked in listing order and, for each one, its print
    providers in listing order. For the first pair whose variants endpoint
    returns a non-empty list, the blueprint details, provider details and
    shipping info are fetched as well.

    Returns:
        A dict with keys ``blueprint``, ``provider``, ``variants``,
        ``blueprintDetails``, ``providerDetails`` and ``shippingInfo``.

    Raises:
        RuntimeError: if no pair with variants exists.
    """
    _log(logs, "Listing blueprints")
    for blueprint in _req("GET", "/v1/catalog/blueprints.json", logs=logs):
        bp_id = str(blueprint["id"])
        bp_root = f"/v1/catalog/blueprints/{bp_id}"

        _log(logs, f"Blueprint {bp_id}: fetching providers")
        provider_list = _req("GET", f"{bp_root}/print_providers.json", logs=logs)

        for provider in provider_list:
            p_id = str(provider["id"])
            pair_root = f"{bp_root}/print_providers/{p_id}"

            _log(logs, f"Blueprint {bp_id} / Provider {p_id}: fetching variants")
            listing = _req(
                "GET",
                f"{pair_root}/variants.json?show-out-of-stock=1",
                logs=logs,
            )
            found = listing.get("variants")
            if not (isinstance(found, list) and found):
                continue

            _log(logs, f"FOUND {len(found)} variants")
            # Same request order as before: blueprint, provider, shipping.
            blueprint_details = _req("GET", f"{bp_root}.json", logs=logs)
            provider_details = _req(
                "GET", f"/v1/catalog/print_providers/{p_id}.json", logs=logs
            )
            shipping_info = _req("GET", f"{pair_root}/shipping.json", logs=logs)
            return {
                "blueprint": blueprint,
                "provider": provider,
                "variants": found,
                "blueprintDetails": blueprint_details,
                "providerDetails": provider_details,
                "shippingInfo": shipping_info,
            }

    raise RuntimeError("No blueprint/provider pair with variants found.")
def _get_all_provider_blobs_for_blueprint(
    logs: List[str],
    blueprint: Dict[str, Any],
) -> List[Dict[str, Any]]:
    """Collect catalog data for every provider of one blueprint that has variants.

    Args:
        logs: Log buffer appended to via ``_log``.
        blueprint: Blueprint object; only its ``id`` is read, and the object
            itself is stored in each returned blob.

    Returns:
        One dict per provider with a non-empty variant list, with keys
        ``blueprint``, ``provider``, ``variants``, ``blueprintDetails``,
        ``providerDetails`` and ``shippingInfo``.

    Raises:
        RuntimeError: if the blueprint has no id, has no providers, or none
            of its providers has variants.
    """
    # Check for a missing id BEFORE str(): str(None) is the truthy string
    # "None", which made the original guard unreachable.
    raw_bp_id = blueprint.get("id")
    if raw_bp_id is None or str(raw_bp_id) == "":
        raise RuntimeError("Blueprint object missing id.")
    bp_id = str(raw_bp_id)

    _log(logs, f"BLUEPRINT_ALL_PROVIDERS blueprint_id={bp_id}: fetching providers")
    providers = _req(
        "GET",
        f"/v1/catalog/blueprints/{bp_id}/print_providers.json",
        logs=logs,
    )
    if not isinstance(providers, list) or not providers:
        raise RuntimeError(f"No providers found for blueprint {bp_id}.")

    # Blueprint details are the same for every provider; fetch them once.
    bp_details = _req(
        "GET",
        f"/v1/catalog/blueprints/{bp_id}.json",
        logs=logs,
    )

    blobs: List[Dict[str, Any]] = []
    for p in providers:
        if not isinstance(p, dict):
            continue
        raw_p_id = p.get("id")
        if raw_p_id is None or str(raw_p_id) == "":
            continue
        p_id = str(raw_p_id)

        _log(logs, f"Blueprint {bp_id} / Provider {p_id}: fetching variants (ALL)")
        vr = _req(
            "GET",
            f"/v1/catalog/blueprints/{bp_id}/print_providers/{p_id}/variants.json?show-out-of-stock=1",
            logs=logs,
        )
        variants = vr.get("variants") if isinstance(vr, dict) else None
        count = len(variants) if isinstance(variants, list) else 0
        _log(
            logs,
            f"PROVIDER_VARIANTS blueprint={bp_id} provider={p_id} count={count}",
        )
        if not isinstance(variants, list) or not variants:
            continue

        provider_details = _req(
            "GET",
            f"/v1/catalog/print_providers/{p_id}.json",
            logs=logs,
        )
        shipping_info = _req(
            "GET",
            f"/v1/catalog/blueprints/{bp_id}/print_providers/{p_id}/shipping.json",
            logs=logs,
        )
        blobs.append(
            {
                "blueprint": blueprint,
                "provider": p,
                "variants": variants,
                "blueprintDetails": bp_details,
                "providerDetails": provider_details,
                "shippingInfo": shipping_info,
            }
        )

    if not blobs:
        raise RuntimeError(f"No providers with variants for blueprint {bp_id}.")
    return blobs
def _build_product(blob: Dict[str, Any], currency: str, logs: List[str]) -> Dict[str, Any]:
    """Normalise one blueprint/provider blob into a product description dict.

    Args:
        blob: Dict with at least ``variants``, ``providerDetails`` and
            ``blueprintDetails`` (the shape built by
            ``_find_first_valid_pair``).
        currency: Currency code copied verbatim into the result.
        logs: Log buffer; samples of the first 20 variants are logged.

    Returns:
        A dict with ``name``, ``currency``, ``price`` (lowest variant price,
        or ``DEFAULT_BASE_PRICE`` when none is usable), ``tags``,
        ``options``, ``description``, ``variants`` (normalised snapshot),
        ``printAreas`` and ``raw`` (the input blob).
    """
    snapshot: List[Dict[str, Any]] = []
    for v in blob["variants"]:
        if not isinstance(v, dict):
            # Skip malformed entries, as the other variant loops in this file do.
            continue

        raw_price = v.get("price")
        # isdecimal() rather than isdigit(): the latter also accepts
        # characters such as "²" that int() cannot parse.
        if isinstance(raw_price, (int, str)) and str(raw_price).isdecimal():
            cents: Optional[int] = int(raw_price)
        else:
            cents = None

        raw_placeholders = v.get("placeholders")
        if not isinstance(raw_placeholders, list):
            raw_placeholders = []
        placeholders = [
            {
                "position": x.get("position"),
                "width": x.get("width"),
                "height": x.get("height"),
            }
            for x in raw_placeholders
            if isinstance(x, dict)
        ]

        options = v.get("options") or {}
        snapshot.append({
            "id": v.get("id"),
            "sku": v.get("sku"),
            "size": options.get("size"),
            "color": options.get("color"),
            "priceCents": cents,
            "price": round(cents / 100, 2) if cents is not None else None,
            "placeholders": placeholders,
        })

    _log(logs, f"VARIANT_SNAPSHOT sample={json.dumps(snapshot[:20])}")

    print_area_sample = [
        {
            "id": s.get("id"),
            "size": s.get("size"),
            "color": s.get("color"),
            "placeholders": s.get("placeholders"),
        }
        for s in snapshot[:20]
    ]
    _log(logs, f"PRINT_AREA_SNAPSHOT sample={json.dumps(print_area_sample)}")

    # variant id -> {position -> {"width", "height"}}
    print_areas_by_variant: Dict[str, Any] = {}
    for s in snapshot:
        vid = s.get("id")
        if vid is None:
            continue
        per_pos: Dict[str, Any] = {}
        for ph in (s.get("placeholders") or []):
            pos = ph.get("position")
            if not pos:
                continue
            per_pos[str(pos)] = {"width": ph.get("width"), "height": ph.get("height")}
        print_areas_by_variant[str(vid)] = per_pos

    all_positions = sorted({
        ph.get("position")
        for s in snapshot
        for ph in (s.get("placeholders") or [])
        if ph.get("position")
    })

    all_cents = [s["priceCents"] for s in snapshot if s["priceCents"] is not None]
    min_price = round(min(all_cents) / 100, 2) if all_cents else DEFAULT_BASE_PRICE

    colors = sorted({s["color"] for s in snapshot if s["color"]})
    sizes = sorted({s["size"] for s in snapshot if s["size"]})

    provider = blob["providerDetails"]
    provider_label = provider.get("title") or provider.get("name")
    loc = provider.get("location") or {}
    loc_str = ", ".join(x for x in [loc.get("city"), loc.get("country")] if x) or "empty"

    # The original concatenated the provider name and the location with no
    # separator ("Fulfilled by AcmeRiga, LV"); put a dash between them.
    description = (
        (blob["blueprintDetails"].get("description") or "empty")
        + f"\n\nFulfilled by {provider_label} — {loc_str}"
    )

    return {
        "name": blob["blueprintDetails"].get("title"),
        "currency": currency,
        "price": min_price,
        "tags": ["Printify", f"Provider: {provider_label}"],
        "options": {
            "Color": colors,
            "Size": sizes,
        },
        "description": description,
        "variants": snapshot,
        "printAreas": {
            "positions": all_positions,
            "byVariantId": print_areas_by_variant,
            "units": "px",
        },
        "raw": blob,
    }
def _pick_shop_id(shops_resp: Any) -> str:
    """Select the shop to operate on from a shop-list response.

    Selection rules, in order:

    1. If ``PRINTIFY_SHOP_ID`` is set (non-blank), the shop with exactly that
       id is required.
    2. Otherwise the shop whose title/name equals ``PRINTIFY_SHOP_NAME``
       (case-insensitive, default ``"Atheria"``) is required.

    There is deliberately no fallback to "the first shop".

    Args:
        shops_resp: Either a list of shop dicts, or a dict wrapping that list
            under ``data``, ``shops`` or ``items``.

    Returns:
        The selected shop id as a string.

    Raises:
        RuntimeError: if the response contains no shops or the preferred
            shop is not among them.
    """
    if isinstance(shops_resp, list):
        items = shops_resp
    elif isinstance(shops_resp, dict):
        items = shops_resp.get("data") or shops_resp.get("shops") or shops_resp.get("items") or []
    else:
        items = []

    if not isinstance(items, list) or not items:
        raise RuntimeError("No shops found on /v1/shops.json for this token.")

    # Only entries that actually carry an id can be selected; this avoids a
    # KeyError when an id-less entry happens to match by name.
    candidates = [s for s in items if isinstance(s, dict) and s.get("id") is not None]

    forced = (os.environ.get("PRINTIFY_SHOP_ID") or "").strip()
    if forced:
        for s in candidates:
            if str(s["id"]) == forced:
                return str(s["id"])
        raise RuntimeError(f"Preferred shop '{forced}' not found; refusing to fallback.")

    # A blank PRINTIFY_SHOP_NAME is treated like an unset one (same rule as
    # PRINTIFY_SHOP_ID above); an empty name could never match any shop.
    wanted = (os.environ.get("PRINTIFY_SHOP_NAME") or "").strip() or "Atheria"
    wanted_key = wanted.lower()
    for s in candidates:
        title = (s.get("title") or s.get("name") or "").strip().lower()
        if title and title == wanted_key:
            return str(s["id"])

    raise RuntimeError(f"Preferred shop name '{wanted}' not found; refusing to fallback.")
def _upload_grid_from_file(logs: List[str]) -> Dict[str, Any]:
    """Upload ``GRID_FILENAME`` from the current working directory as base64.

    Returns:
        The upload response dict (it must contain an ``id``).

    Raises:
        RuntimeError: if the file does not exist or the response is not a
            dict with an ``id``.
    """
    grid_path = os.path.join(os.getcwd(), GRID_FILENAME)
    if not os.path.exists(grid_path):
        raise RuntimeError(f"Grid image not found at {grid_path}")

    with open(grid_path, "rb") as handle:
        image_bytes = handle.read()
    encoded = base64.b64encode(image_bytes).decode("ascii")

    _log(logs, f"UPLOADING_GRID path={grid_path} bytes={len(image_bytes)} b64len={len(encoded)}")
    response = _req(
        "POST",
        "/v1/uploads/images.json",
        json_body={"file_name": GRID_FILENAME, "contents": encoded},
        logs=logs,
    )

    looks_valid = isinstance(response, dict) and bool(response.get("id"))
    if not looks_valid:
        raise RuntimeError(f"Unexpected upload response: {str(response)[:500]}")

    _log(
        logs,
        f"GRID_UPLOAD id={response.get('id')} file={response.get('file_name')} "
        f"w={response.get('width')} h={response.get('height')}",
    )
    return response
def _scale_fill(ph_w: float, ph_h: float, img_w: float, img_h: float) -> float:
    """Return ``(ph_h * img_w) / (ph_w * img_h)``, but never less than 1.0.

    Any non-positive dimension short-circuits to 1.0.
    """
    dimensions = (ph_w, ph_h, img_w, img_h)
    if any(d <= 0 for d in dimensions):
        return 1.0
    ratio = (ph_h * img_w) / (ph_w * img_h)
    return float(1.0 if ratio < 1.0 else ratio)
def _log_catalog_variants(logs: List[str], product_info: Dict[str, Any], limit: int = 50):
    """Log the catalog variant count, then one line per variant dict.

    Logging stops once ``limit`` variant lines have been written; the check
    happens after each line, so at least one dict entry is always logged
    when one exists.
    """
    catalog = product_info.get("variants") or []
    is_list = isinstance(catalog, list)
    total = len(catalog) if is_list else 0
    _log(logs, f"CATALOG_VARIANTS_TOTAL {total}")
    if not is_list:
        return

    dict_entries = (entry for entry in catalog if isinstance(entry, dict))
    for written, entry in enumerate(dict_entries, start=1):
        vid = entry.get("id")
        size = entry.get("size")
        color = entry.get("color")
        slots = entry.get("placeholders") or []
        phn = len(slots) if isinstance(slots, list) else 0
        _log(logs, f"CATALOG_VARIANT id={vid} size={size} color={color} placeholders={phn}")
        if written >= limit:
            break
def _create_product_all_variants_with_grid(
    logs: List[str],
    shop_id: str,
    blob: Dict[str, Any],
    product_info: Dict[str, Any],
    upload: Dict[str, Any],
) -> Dict[str, Any]:
    """Create one shop product with the uploaded image on every print area.

    Args:
        logs: Log buffer.
        shop_id: Target shop id.
        blob: Catalog blob; ``blueprint.id``, ``provider.id`` and
            ``providerDetails`` are read.
        product_info: Output of ``_build_product``, optionally extended with
            ``_allVariants`` (every variant sent in the payload),
            ``_enabledVariantIds`` (ids sent with ``is_enabled`` true) and
            ``_dup`` (marks the title as a duplicate).
        upload: Upload response; ``id``, ``width`` and ``height`` are read.

    Returns:
        The create-product response dict.

    Raises:
        RuntimeError: if ids, variants or placeholders are missing, or the
            response is not a dict.
    """

    def _as_int(value: Any) -> Optional[int]:
        # One shared coercion instead of four copies of the same try/except.
        try:
            return int(value)
        except (TypeError, ValueError, OverflowError):
            return None

    bp_id = (blob.get("blueprint") or {}).get("id")
    provider_id = (blob.get("provider") or {}).get("id")
    if bp_id is None or provider_id is None:
        raise RuntimeError("Missing blueprint or provider id.")

    enabled_variants = product_info.get("variants") or []
    if not isinstance(enabled_variants, list) or not enabled_variants:
        raise RuntimeError("No variants to create product with.")

    all_variants = product_info.get("_allVariants") or enabled_variants
    if not isinstance(all_variants, list) or not all_variants:
        raise RuntimeError("No all-variants universe found for this provider.")

    enabled_ids = {
        parsed
        for parsed in map(_as_int, product_info.get("_enabledVariantIds") or [])
        if parsed is not None
    }
    if not enabled_ids:
        # No explicit id list: enable every variant in "variants".
        enabled_ids = {
            parsed
            for parsed in (
                _as_int(v.get("id")) for v in enabled_variants if isinstance(v, dict)
            )
            if parsed is not None
        }

    img_id = upload.get("id")
    img_w = float(upload.get("width") or 0)
    img_h = float(upload.get("height") or 0)

    variants_payload = []
    enabled_count = 0
    for v in all_variants:
        if not isinstance(v, dict):
            continue
        vid_i = _as_int(v.get("id"))
        if vid_i is None:
            continue
        is_on = vid_i in enabled_ids
        if is_on:
            enabled_count += 1
        variants_payload.append({
            "id": vid_i,
            # Placeholder price; phase_b reprices the product afterwards via
            # _update_prices_to_cost_plus_margin.
            "price": 1,
            "is_enabled": is_on,
        })

    print_areas_payload = []
    for v in enabled_variants:
        if not isinstance(v, dict):
            continue
        vid_i = _as_int(v.get("id"))
        if vid_i is None or vid_i not in enabled_ids:
            continue
        placeholders = v.get("placeholders") or []
        if not isinstance(placeholders, list) or not placeholders:
            continue

        ph_payload = []
        for ph in placeholders:
            if not isinstance(ph, dict):
                continue
            pos = ph.get("position")
            pw = ph.get("width")
            phh = ph.get("height")
            if not pos or pw is None or phh is None:
                continue
            try:
                pwf = float(pw)
                phf = float(phh)
            except (TypeError, ValueError):
                continue
            ph_payload.append({
                "position": pos,
                "images": [{
                    "id": img_id,
                    "x": 0.5,
                    "y": 0.5,
                    "scale": _scale_fill(pwf, phf, img_w, img_h),
                    "angle": 0,
                }],
            })

        if ph_payload:
            print_areas_payload.append({
                "variant_ids": [vid_i],
                "placeholders": ph_payload,
            })

    if not variants_payload:
        raise RuntimeError("No valid variant ids to create product with.")
    if not print_areas_payload:
        raise RuntimeError("No placeholders payload generated for any enabled variant.")

    provider_details = blob.get("providerDetails") or {}
    provider_name = provider_details.get("title") or provider_details.get("name") or str(provider_id)
    base_title = (product_info.get("name") or "Printify Grid Test") + f" — {provider_name}"
    if product_info.get("_dup") is True:
        title = base_title + " — Duplicate" + f" [bp={bp_id} pp={provider_id} dup]"
    else:
        title = base_title + f" [bp={bp_id} pp={provider_id}]"

    payload = {
        "title": title,
        "description": product_info.get("description") or "",
        "blueprint_id": int(bp_id),
        "print_provider_id": int(provider_id),
        "variants": variants_payload,
        "print_areas": print_areas_payload,
    }

    _log(
        logs,
        f"PHASE_B_CREATE shop_id={shop_id} blueprint_id={bp_id} provider_id={provider_id} "
        f"variants_payload_total={len(variants_payload)} enabled={enabled_count} print_areas_payload={len(print_areas_payload)}",
    )
    created = _req(
        "POST",
        f"/v1/shops/{shop_id}/products.json",
        json_body=payload,
        logs=logs,
    )
    # Plain ``dict`` (not typing.Dict) for the runtime check, matching every
    # other isinstance test in this file.
    if not isinstance(created, dict):
        raise RuntimeError(f"Unexpected create response: {str(created)[:500]}")

    _log(logs, f"PHASE_B_CREATED product_id={created.get('id')}")
    return created
def _get_product(logs: List[str], shop_id: str, product_id: str) -> Dict[str, Any]:
    """Fetch one shop product and log its variant count plus one line per variant.

    Raises:
        RuntimeError: if the response is not a dict.
    """
    product = _req("GET", f"/v1/shops/{shop_id}/products/{product_id}.json", logs=logs)
    if not isinstance(product, dict):
        raise RuntimeError(f"Unexpected product response: {str(product)[:500]}")

    shop_variants = product.get("variants") or []
    usable = isinstance(shop_variants, list)
    _log(logs, f"SHOP_VARIANTS_TOTAL {len(shop_variants) if usable else 0}")

    for entry in (shop_variants if usable else ()):
        if isinstance(entry, dict):
            _log(
                logs,
                f"SHOP_VARIANT id={entry.get('id')} enabled={entry.get('is_enabled')} "
                f"cost={entry.get('cost')} price={entry.get('price')}",
            )
    return product
def _margin_pct() -> float:
    """Read the price margin from ``PRINTIFY_MARGIN_PCT``.

    Returns:
        The parsed value, or ``0.0`` when the variable is unset, blank, not a
        number, or not finite (``nan``/``inf`` would make the price
        computation in ``_update_prices_to_cost_plus_margin`` raise).

    NOTE(review): the only caller multiplies cost by ``(1.0 + value)``, so
    the value acts as a fraction (0.3 means +30%) despite the "PCT" name -
    confirm which was intended.
    """
    raw = (os.environ.get("PRINTIFY_MARGIN_PCT") or "").strip()
    if not raw:
        return 0.0
    try:
        value = float(raw)
    except ValueError:
        return 0.0
    return value if math.isfinite(value) else 0.0
def _update_prices_to_cost_plus_margin(
    logs: List[str],
    shop_id: str,
    product_id: str,
    product: Dict[str, Any],
) -> Dict[str, Any]:
    """Set each enabled variant's price to ``cost * (1 + margin)``.

    Only variants whose ``is_enabled`` is exactly ``True`` and that carry an
    ``id`` and a ``cost`` are repriced. Prices are rounded to an integer and
    floored at 1.

    Args:
        logs: Log buffer; one ``PRICE_SET`` line is written per variant.
        shop_id: Shop that owns the product.
        product_id: Product to update.
        product: Product dict as returned by ``_get_product``.

    Returns:
        The update response dict.

    Raises:
        RuntimeError: if the product has no variants, no price could be
            computed, or the response is not a dict.
    """
    variants = product.get("variants") or []
    if not isinstance(variants, list) or not variants:
        raise RuntimeError("No variants found on created product for price update.")

    m = _margin_pct()
    payload_variants = []
    for v in variants:
        if not isinstance(v, dict) or v.get("is_enabled") is not True:
            continue
        vid = v.get("id")
        cost = v.get("cost")
        if vid is None or cost is None:
            continue
        try:
            # int(vid) and the price arithmetic sit inside the try so that a
            # malformed id or a non-finite margin skips the variant instead
            # of aborting the whole update.
            vid_i = int(vid)
            cost_cents = int(cost)
            price_cents = int(round(cost_cents * (1.0 + m)))
        except (TypeError, ValueError, OverflowError):
            continue
        price_cents = max(1, price_cents)
        payload_variants.append({
            "id": vid_i,
            "price": price_cents,
            "is_enabled": True,
        })
        _log(logs, f"PRICE_SET id={vid} cost={cost_cents} price={price_cents} margin_pct={m}")

    if not payload_variants:
        raise RuntimeError("No variant prices could be computed from costs.")

    upd = _req(
        "PUT",
        f"/v1/shops/{shop_id}/products/{product_id}.json",
        json_body={"variants": payload_variants},
        logs=logs,
    )
    if not isinstance(upd, dict):
        raise RuntimeError(f"Unexpected update response: {str(upd)[:500]}")

    _log(logs, f"PRICE_UPDATE_DONE variants={len(payload_variants)}")
    return upd
def _rebuild_options_for_variants(variants: List[Dict[str, Any]]) -> Dict[str, List[str]]:
    """Return the sorted distinct truthy ``color`` and ``size`` values of ``variants``."""

    def _distinct(field: str) -> List[str]:
        seen = set()
        for variant in variants:
            value = variant.get(field)
            if value:
                seen.add(value)
        return sorted(seen)

    options: Dict[str, List[str]] = {}
    options["Color"] = _distinct("color")
    options["Size"] = _distinct("size")
    return options
def run(currency: str) -> Generator[Tuple[str, str], None, None]:
    """Probe the catalog and yield ``(log text, result JSON)`` after each step.

    Steps: list shops, write the shop list to a temporary ``shops_*.json``
    file, find the first blueprint/provider pair with variants, and build
    its product description. Any exception is logged and reported as
    ``{"error": ...}`` in the final yield.
    """
    logs: List[str] = []
    result: Dict[str, Any] = {}

    def snapshot() -> Tuple[str, str]:
        log_text = "\n".join(logs)
        return log_text, json.dumps(result, indent=2)

    try:
        _log(logs, "START")
        yield snapshot()

        shops = _req("GET", "/v1/shops.json", logs=logs)
        _log(logs, f"SHOP_LIST {json.dumps(shops)}")
        yield snapshot()

        # Write through the descriptor mkstemp already opened instead of
        # closing it and reopening the path.
        handle, path = tempfile.mkstemp(prefix="shops_", suffix=".json")
        with os.fdopen(handle, "w", encoding="utf-8") as sink:
            json.dump(shops, sink, indent=2)
        _log(logs, f"SHOPS_JSON_WRITTEN {path}")
        yield snapshot()

        pair = _find_first_valid_pair(logs)
        result = _build_product(pair, currency or "USD", logs)
        _log(logs, "DONE")
        yield snapshot()
    except Exception as e:
        _log(logs, f"ERROR: {e}")
        result = {"error": str(e)}
        yield snapshot()
def _phase_b_create_priced_product(
    logs: List[str],
    flush: Any,
    shop_id: str,
    blob: Dict[str, Any],
    product_info: Dict[str, Any],
    upload: Dict[str, Any],
    run_record: Dict[str, Any],
) -> Generator[Tuple[str, str], None, None]:
    """Create one product, verify it, reprice it, and complete ``run_record``.

    ``run_record`` must already hold ``providerId``, ``chunkIndex`` and
    ``catalogVariantCountChunk``; the shop-side counts, product id and price
    update response are added to it. ``flush()`` is yielded after every step.
    """
    created = _create_product_all_variants_with_grid(
        logs,
        shop_id=shop_id,
        blob=blob,
        product_info=product_info,
        upload=upload,
    )
    created_id = str(created.get("id") or "")
    if not created_id:
        raise RuntimeError("Created product response missing id.")
    yield flush()

    prod1 = _get_product(logs, shop_id, created_id)
    yield flush()

    _log(
        logs,
        f"COMPARE_COUNTS provider={run_record['providerId']} chunk_index={run_record['chunkIndex']} "
        f"catalog_chunk={run_record['catalogVariantCountChunk']} "
        f"shop={len(prod1.get('variants') or [])}",
    )
    yield flush()

    upd = _update_prices_to_cost_plus_margin(logs, shop_id, created_id, prod1)
    yield flush()

    prod2 = _get_product(logs, shop_id, created_id)
    yield flush()

    run_record["shopVariantCountAfterCreate"] = len(prod1.get("variants") or [])
    run_record["shopVariantCountFinal"] = len(prod2.get("variants") or [])
    run_record["productId"] = created_id
    run_record["priceUpdateResponse"] = upd


def phase_b(currency: str) -> Generator[Tuple[str, str], None, None]:
    """Create priced test products for every provider of the first usable blueprint.

    Flow: pick the shop, find the first blueprint that has a provider with
    variants, collect all of that blueprint's providers, upload the grid
    image, then per provider create either one product (at most
    ``HARD_MAX_VARIANTS_PER_PRODUCT`` variants) or one product per slice of
    ``CHUNK_TARGET_VARIANTS`` variants. Each product is re-read, repriced and
    re-read again.

    Yields:
        ``(log text, result JSON)`` after every step.

    On any exception the error is logged and the final result is
    ``{"error": ...}``, plus ``providerRuns`` for the products that were
    fully processed before the failure, so they are not lost from the report.
    """
    logs: List[str] = []
    result: Dict[str, Any] = {}
    provider_runs: List[Dict[str, Any]] = []

    def flush():
        return "\n".join(logs), json.dumps(result, indent=2)

    try:
        _log(logs, "PHASE_B_START")
        yield flush()

        shops = _req("GET", "/v1/shops.json", logs=logs)
        _log(logs, f"SHOP_LIST {json.dumps(shops)}")
        yield flush()

        shop_id = _pick_shop_id(shops)
        _log(logs, f"SHOP_ID {shop_id}")
        yield flush()

        first_blob = _find_first_valid_pair(logs)
        bp_obj = first_blob.get("blueprint") or {}
        bp_id = bp_obj.get("id")
        if bp_id is None:
            raise RuntimeError("First valid blob missing blueprint id.")
        _log(logs, f"PHASE_B_BLUEPRINT {bp_id}")
        result["phaseBBlueprintId"] = bp_id
        yield flush()

        provider_blobs = _get_all_provider_blobs_for_blueprint(logs, bp_obj)
        result["providerCount"] = len(provider_blobs)
        _log(logs, f"PHASE_B_PROVIDER_COUNT {len(provider_blobs)}")
        yield flush()

        upload = _upload_grid_from_file(logs)
        result["gridUpload"] = upload
        yield flush()

        currency_code = currency or "USD"

        for idx, blob in enumerate(provider_blobs):
            provider_details = blob.get("providerDetails") or {}
            provider_meta = blob.get("provider") or {}
            provider_id = provider_meta.get("id")
            provider_name = (
                provider_details.get("title")
                or provider_details.get("name")
                or str(provider_id)
            )
            _log(
                logs,
                f"PROVIDER_RUN index={idx} provider_id={provider_id} "
                f"name={provider_name}",
            )
            yield flush()

            product_info_full = _build_product(blob, currency_code, logs)
            _log_catalog_variants(logs, product_info_full, limit=75)
            yield flush()

            variants_all = product_info_full.get("variants") or []
            if not isinstance(variants_all, list) or not variants_all:
                _log(logs, f"PROVIDER_SKIP provider_id={provider_id} reason=no_variants_after_build")
                yield flush()
                continue

            total_variants = len(variants_all)
            chunked = total_variants > HARD_MAX_VARIANTS_PER_PRODUCT
            # A provider within the limit is handled as one chunk that spans
            # every variant; both cases then share the same code path.
            step = CHUNK_TARGET_VARIANTS if chunked else total_variants
            total_chunks = (total_variants + step - 1) // step

            for chunk_index, offset in enumerate(range(0, total_variants, step)):
                chunk = variants_all[offset: offset + step] if chunked else variants_all

                p_info_chunk = dict(product_info_full)
                p_info_chunk["variants"] = chunk
                p_info_chunk["options"] = _rebuild_options_for_variants(chunk)
                p_info_chunk["_allVariants"] = variants_all
                p_info_chunk["_enabledVariantIds"] = [
                    int(v.get("id")) for v in chunk if v.get("id") is not None
                ]

                if chunked:
                    # Every chunk after the first is titled as a duplicate.
                    p_info_chunk["_dup"] = bool(chunk_index > 0)
                    _log(
                        logs,
                        f"PROVIDER_CHUNK provider_id={provider_id} name={provider_name} "
                        f"chunk_index={chunk_index} total_chunks={total_chunks} "
                        f"offset={offset} size={len(chunk)} total_variants={total_variants}",
                    )
                else:
                    _log(
                        logs,
                        f"PROVIDER_SINGLE product provider_id={provider_id} name={provider_name} "
                        f"variants={total_variants}",
                    )
                yield flush()

                run_record: Dict[str, Any] = {
                    "providerId": provider_id,
                    "providerName": provider_name,
                    "chunkIndex": chunk_index,
                    "totalChunks": total_chunks,
                    "catalogVariantCountTotal": total_variants,
                    "catalogVariantCountChunk": len(chunk),
                }
                yield from _phase_b_create_priced_product(
                    logs,
                    flush,
                    shop_id,
                    blob,
                    p_info_chunk,
                    upload,
                    run_record,
                )
                provider_runs.append(run_record)

        result["providerRuns"] = provider_runs
        _log(logs, "PHASE_B_DONE")
        yield flush()
    except Exception as e:
        _log(logs, f"ERROR: {e}")
        result = {"error": str(e)}
        if provider_runs:
            # Products already created in the shop stay visible in the report.
            result["providerRuns"] = provider_runs
        yield flush()
# Gradio UI: one currency input, two buttons, and two output text boxes.
# Both handlers are generators that yield (log text, result JSON) pairs.
with gr.Blocks(title="Printify Catalog Probe") as demo:
    gr.Markdown("Extract and normalize ONE Printify blueprint into provider-specific JSON objects and products.")
    currency = gr.Textbox(label="Currency", value="USD")
    # "Run" only reads the catalog (see run); "Phase B (Test)" also creates
    # and reprices products in the selected shop (see phase_b).
    btn = gr.Button("Run")
    btn_b = gr.Button("Phase B (Test)")
    logs = gr.Textbox(label="Logs", lines=18)
    out = gr.Textbox(label="Output JSON", lines=18)
    btn.click(run, inputs=[currency], outputs=[logs, out])
    btn_b.click(phase_b, inputs=[currency], outputs=[logs, out])
# Runs at import time: enable the queue and start the app.
demo.queue().launch()