"""Flask service that hosts product images and generates listing images.

Uploaded images are compressed and pushed to a public image host, then sent
together with a text prompt to the Grsai draw API.  Generation runs in
background threads; clients poll ``/api/status/<tid>`` for progress.
"""
import os
import json
import uuid
import base64
import threading
import zipfile
import io
import time
from pathlib import Path
from datetime import datetime

import requests as http_requests
from flask import Flask, render_template, request, jsonify, send_from_directory, send_file
from PIL import Image
from io import BytesIO

app = Flask(__name__, static_folder="static", static_url_path="/static")
app.config["UPLOAD_FOLDER"] = Path("uploads")
app.config["GENERATED_FOLDER"] = Path("generated")
app.config["MAX_CONTENT_LENGTH"] = 32 * 1024 * 1024
app.config["UPLOAD_FOLDER"].mkdir(exist_ok=True)
app.config["GENERATED_FOLDER"].mkdir(exist_ok=True)

# In-memory task registry: task id -> mutable status dict shared with the
# background worker thread.  Keys starting with "_" are never sent to clients.
tasks = {}
# Cache: disk path -> public URL (avoids re-uploading default refs every request)
_ref_url_cache = {}

GRSAI_API_KEY = os.environ.get("GRSAI_API_KEY", "")
GRSAI_HOST = os.environ.get("GRSAI_HOST", "https://grsai.dakka.com.cn")
GRSAI_MODEL = os.environ.get("GRSAI_MODEL", "nano-banana")

DRAW_PATH = "/v1/draw/nano-banana"
RESULT_PATH = "/v1/draw/result"
POLL_INTERVAL = 4  # seconds between polls
POLL_TIMEOUT = 300  # max seconds to wait per image
MAX_UPLOAD_DIM = 2048  # resize images before hosting
# Grsai: cross-border / peak-hour responses are slow, so use a
# (connect, read) timeout pair; do not set the read timeout lower than this.
GRSAI_HTTP_TIMEOUT = (30, 180)
GRSAI_HTTP_RETRIES = 3

# ── Amazon image type definitions ───────────────────────────────────────────

AMAZON_IMAGE_CONFIGS = [
    {
        "id": "main",
        "name": "Main Product Image",
        "slot": "MAIN",
        "desc": "纯白背景主图",
        "prompt": (
            "Professional Amazon product listing main image. "
            "Place {product_name} on a PURE WHITE background (RGB 255,255,255). "
            "Product centered, filling ~85% of the frame. "
            "Professional studio lighting, soft even shadows, ultra-sharp focus. "
            "NO text, NO logos, NO watermarks, NO props. "
            "Clean, high-end e-commerce product photography."
        ),
    },
    {
        "id": "lifestyle",
        "name": "Lifestyle Scene",
        "slot": "PT01",
        "desc": "场景使用图",
        "prompt": (
            "Aspirational lifestyle photograph showing {product_name} being used "
            "by {target_user} in {use_scene}. "
            "The product is the clear hero of the image, in sharp focus. "
            "Natural warm lighting, authentic premium feel. "
            "Professional photography, shallow depth of field on background."
        ),
    },
    {
        "id": "feature_1",
        "name": "Key Feature #1",
        "slot": "PT02",
        "desc": "核心卖点 1",
        "prompt": (
            "Clean infographic-style Amazon listing image for {product_name}. "
            "Visually highlight the key feature: {feature_1}. "
            "Show the product from the angle that best demonstrates this feature. "
            "Light gradient background, modern minimal design. "
            "Professional e-commerce photography."
        ),
    },
    {
        "id": "feature_2",
        "name": "Key Feature #2",
        "slot": "PT03",
        "desc": "核心卖点 2",
        "prompt": (
            "Clean infographic-style Amazon listing image for {product_name}. "
            "Visually highlight the feature: {feature_2}. "
            "Show the product from a different angle that best demonstrates this feature. "
            "Soft neutral background, modern layout. "
            "Professional product photography."
        ),
    },
    {
        "id": "detail",
        "name": "Detail Close-up",
        "slot": "PT04",
        "desc": "细节微距图",
        "prompt": (
            "Extreme close-up macro photograph of {product_name}. "
            "Focus on material quality, texture, and fine craftsmanship. "
            "Dramatic studio lighting emphasizing surface details and premium build quality. "
            "Shallow depth of field, ultra-sharp on the detail area. "
            "The close-up should convey exceptional quality."
        ),
    },
    {
        "id": "scale",
        "name": "Size & Scale",
        "slot": "PT05",
        "desc": "尺寸参考图",
        "prompt": (
            "Product photograph of {product_name} shown with a human hand "
            "or a common everyday object for clear size reference. "
            "The viewer should immediately understand the real-world dimensions. "
            "Clean background, professional lighting. "
            "The product should look attractive while clearly communicating its size."
        ),
    },
    {
        "id": "angle",
        "name": "Alternative Angle",
        "slot": "PT06",
        "desc": "多角度展示",
        "prompt": (
            "Professional product photograph showing {product_name} from "
            "a 45-degree elevated angle, revealing the top and side simultaneously. "
            "Clean white background, professional studio lighting. "
            "This angle should reveal details not visible in a standard front view. "
            "Sharp focus, high resolution e-commerce photography."
        ),
    },
    {
        "id": "package",
        "name": "What's Included",
        "slot": "PT07",
        "desc": "包装内容物",
        "prompt": (
            "Professional flat-lay photograph showing {product_name} and ALL included "
            "accessories neatly arranged on a clean white background. "
            "Organized, symmetrical layout. Each item is clearly visible and identifiable. "
            "Professional product photography, even lighting, no overlapping shadows. "
            "This image shows everything the customer receives in the package."
        ),
    },
]

# ── Custom generator: default reference folders (from references.zip) ─────

REF_FOLDERS = {
    "cover": "coverreference",
    "feature": "features-references",
    "howto": "howtoreferences",
}

STYLE_RULES = {
    "cover": (
        "VISUAL STYLE (cover / hero banner — 16:9): Match the reference set’s premium "
        "marketing-cover look: strong hero subject (product or lifestyle + product), "
        "cinematic lighting with warm highlights and controlled contrast, generous "
        "negative space reserved for headline or copy (top or side), layered depth "
        "(foreground subject, softer background), cohesive color grading, high-end "
        "e-commerce or tech-launch aesthetic. Composition should feel like a storefront "
        "or campaign hero frame, not a plain screenshot. No illegible tiny text; avoid "
        "clutter unless the user explicitly requests UI chrome."
    ),
    "feature": (
        "VISUAL STYLE (feature / benefit graphic): Match the reference set’s clean "
        "SaaS-product marketing slides: clear single-message focus, soft gradients or "
        "light neutral backgrounds, icon or abstract UI motifs, subtle cards and soft "
        "shadows, modern minimal layout, trust and clarity. Emphasize one benefit or "
        "capability; use metaphor, simple diagrams, or app-style panels as in the "
        "references. Keep typography zones clean if text is added."
    ),
    "howto": (
        "VISUAL STYLE (how-to / step graphic): Match the reference set’s instructional "
        "step cards: sequential, educational tone, one clear action per image, numbered "
        "or step-flow implication, uncluttered composition, friendly UI or device-hand "
        "cues, generous whitespace, aligned elements. The image should instantly read as "
        "“Step N of a tutorial” for the user’s topic."
    ),
}

# Extensions accepted as bundled reference images (compared case-insensitively).
_REFERENCE_EXTS = {".jpg", ".jpeg", ".png", ".webp"}


def _references_base() -> Path:
    """Return the ``static/references`` directory next to this file."""
    return Path(__file__).resolve().parent / "static" / "references"


def list_default_reference_files(kind: str, limit: int = 4) -> list:
    """Image paths from bundled reference folders (cover / feature / howto).

    Returns at most ``limit`` paths sorted by lower-cased file name, or an
    empty list when the folder does not exist.

    Raises:
        ValueError: if ``kind`` is not a key of ``REF_FOLDERS``.
    """
    if kind not in REF_FOLDERS:
        raise ValueError("invalid kind")
    folder = _references_base() / REF_FOLDERS[kind]
    if not folder.is_dir():
        return []
    # Compare on the lower-cased suffix so ".JPEG", ".Jpg", ... are accepted
    # too, instead of enumerating a few upper-case spellings by hand.
    files = sorted(
        (p for p in folder.iterdir() if p.is_file() and p.suffix.lower() in _REFERENCE_EXTS),
        key=lambda p: p.name.lower(),
    )
    return files[:limit]


def public_urls_for_reference_paths(paths: list) -> list:
    """Upload each file once; cache by resolved path string."""
    urls = []
    for p in paths:
        key = str(Path(p).resolve())
        if key in _ref_url_cache:
            urls.append(_ref_url_cache[key])
            continue
        url = upload_image(key)
        _ref_url_cache[key] = url
        urls.append(url)
    return urls


def build_custom_prompt(kind: str, user_prompt: str) -> str:
    """Combine the style rules for ``kind`` with the user's direction.

    Unknown kinds fall back to the "cover" rules; an empty ``user_prompt``
    is replaced by a generic instruction.
    """
    base = STYLE_RULES.get(kind, STYLE_RULES["cover"])
    up = (user_prompt or "").strip() or "Create a compelling image following the style rules."
    return (
        f"{base}\n\n"
        f"USER DIRECTION (follow closely; subject matter and specifics):\n{up}\n\n"
        "Generate one new image that applies the reference visual language to the user’s "
        "topic. Do not copy text or branding from references; invent appropriate content "
        "for the user’s request."
    )


def run_custom_generation_task(task_id, kind, user_prompt, user_image_urls, api_key, host, model):
    """Background worker: generate one custom image and record it in ``tasks``.

    User-supplied reference URLs come first, followed by the bundled default
    references for ``kind``; the combined list is capped at 10 URLs.  Any
    exception marks the task as "error" with the exception text as message.
    """
    task = tasks[task_id]
    try:
        task["status"] = "generating"
        task["message"] = "正在准备参考图与提示词…"
        task["total"] = 1
        task["done"] = 0
        task["images"] = []

        default_paths = list_default_reference_files(kind, limit=4)
        default_urls = public_urls_for_reference_paths(default_paths) if default_paths else []
        combined = (user_image_urls or []) + default_urls
        combined = combined[:10]

        aspect = "16:9" if kind == "cover" else "auto"
        prompt = build_custom_prompt(kind, user_prompt)

        task["message"] = "正在提交生成任务…"
        draw_id = submit_draw(api_key, host, model, prompt, combined, aspect_ratio=aspect)

        deadline = time.time() + POLL_TIMEOUT
        while time.time() < deadline:
            time.sleep(POLL_INTERVAL)
            result = poll_draw_result(api_key, host, draw_id)
            st = result.get("status", "")
            if st == "succeeded":
                img_url = result["results"][0]["url"]
                fname = f"{task_id}_custom.png"
                out_path = app.config["GENERATED_FOLDER"] / fname
                download_image(img_url, str(out_path))
                task["done"] = 1
                task["images"] = [{
                    "id": "custom",
                    "name": {"cover": "封面图", "feature": "功能图", "howto": "How-to 图"}.get(kind, "自定义"),
                    "slot": "CUSTOM",
                    "filename": fname,
                    "url": f"/api/images/{fname}",
                    "status": "ok",
                }]
                task["status"] = "completed"
                task["message"] = "生成完成"
                return
            if st == "failed":
                reason = result.get("failure_reason", "") or result.get("error", "unknown")
                raise RuntimeError(reason)
        raise TimeoutError("生成超时")
    except Exception as e:
        # Top-level boundary of the worker thread: surface the error to the
        # polling client instead of letting the thread die silently.
        task["status"] = "error"
        task["message"] = str(e)
        task["images"] = [{
            "id": "custom",
            "name": "自定义图",
            "slot": "CUSTOM",
            "status": "error",
            "error": str(e),
        }]


# ── Image processing & hosting ──────────────────────────────────────────────

def compress_for_upload(file_path, max_dim=1600, quality=85):
    """Always convert to compressed JPEG for reliable hosting.

    Images with transparency or a palette ("RGBA", "LA", "P") are flattened
    onto a white background; other non-RGB modes are converted to RGB.  The
    result is shrunk to fit within ``max_dim`` x ``max_dim`` and returned as
    JPEG bytes.
    """
    # The context manager closes the underlying file handle, which the
    # previous implementation leaked.
    with Image.open(file_path) as img:
        if img.mode in ("RGBA", "LA", "P"):
            # Normalise to RGBA first so that "LA" and palette images also
            # expose an alpha band usable as the paste mask ("LA" used to be
            # pasted without any mask, ignoring its transparency).
            rgba = img.convert("RGBA")
            out = Image.new("RGB", rgba.size, (255, 255, 255))
            out.paste(rgba, mask=rgba.split()[-1])
        elif img.mode != "RGB":
            out = img.convert("RGB")
        else:
            out = img
        out.thumbnail((max_dim, max_dim), Image.LANCZOS)
        buf = BytesIO()
        out.save(buf, format="JPEG", quality=quality, optimize=True)
    return buf.getvalue()


def _try_telegraph(jpeg_bytes):
    """Upload JPEG bytes to telegra.ph and return the public URL."""
    resp = http_requests.post(
        "https://telegra.ph/upload",
        files={"file": ("image.jpg", jpeg_bytes, "image/jpeg")},
        timeout=(20, 90),
    )
    resp.raise_for_status()
    body = resp.json()
    if isinstance(body, list) and body and "src" in body[0]:
        return "https://telegra.ph" + body[0]["src"]
    raise RuntimeError(f"Unexpected response: {body}")


def _try_catbox(jpeg_bytes):
    """Upload JPEG bytes to catbox.moe and return the public URL."""
    resp = http_requests.post(
        "https://catbox.moe/user/api.php",
        data={"reqtype": "fileupload"},
        files={"fileToUpload": ("image.jpg", jpeg_bytes, "image/jpeg")},
        timeout=(20, 120),
    )
    resp.raise_for_status()
    url = resp.text.strip()
    if url.startswith("http"):
        return url
    raise RuntimeError(f"Unexpected response: {url[:200]}")


def _try_0x0(jpeg_bytes):
    """Upload JPEG bytes to 0x0.st and return the public URL."""
    resp = http_requests.post(
        "https://0x0.st",
        files={"file": ("image.jpg", jpeg_bytes, "image/jpeg")},
        timeout=(20, 90),
    )
    resp.raise_for_status()
    url = resp.text.strip()
    if url.startswith("http"):
        return url
    raise RuntimeError(f"Unexpected response: {url[:200]}")


def _post_grsai_with_retry(url, headers, json_payload):
    """POST to Grsai with long read timeout and retries on network errors.

    Only timeouts and connection errors are retried (with a growing sleep
    between attempts); the last such error is re-raised when all
    ``GRSAI_HTTP_RETRIES`` attempts fail.  HTTP error statuses are returned
    to the caller unchanged.
    """
    last_err = None
    for attempt in range(GRSAI_HTTP_RETRIES):
        try:
            resp = http_requests.post(
                url,
                headers=headers,
                json=json_payload,
                timeout=GRSAI_HTTP_TIMEOUT,
            )
            return resp
        except (
            http_requests.exceptions.Timeout,
            http_requests.exceptions.ConnectionError,
        ) as e:
            last_err = e
            app.logger.warning(f"Grsai request attempt {attempt + 1}/{GRSAI_HTTP_RETRIES}: {e}")
            if attempt < GRSAI_HTTP_RETRIES - 1:
                time.sleep(2 + attempt * 3)
    raise last_err


# Tried in order by upload_image(); the first backend that succeeds wins.
HOSTING_BACKENDS = [
    ("Telegraph", _try_telegraph),
    ("Catbox", _try_catbox),
    ("0x0.st", _try_0x0),
]


def upload_image(file_path):
    """Compress image and try multiple hosting services as fallbacks.

    Raises:
        RuntimeError: when every backend in ``HOSTING_BACKENDS`` fails; the
            message lists each backend's error.
    """
    jpeg_bytes = compress_for_upload(file_path)
    errors = []
    for name, fn in HOSTING_BACKENDS:
        try:
            url = fn(jpeg_bytes)
            app.logger.info(f"Uploaded via {name}: {url}")
            return url
        except Exception as e:
            errors.append(f"{name}: {e}")
            app.logger.warning(f"{name} upload failed: {e}")
    raise RuntimeError("所有图床均失败: " + "; ".join(errors))


# ── Nano Banana API helpers ─────────────────────────────────────────────────

def submit_draw(api_key, host, model, prompt, image_urls, aspect_ratio="1:1"):
    """Submit a draw request and return the task id.

    Raises:
        requests.HTTPError: on a non-2xx response.
        RuntimeError: when the response carries no task id.
    """
    # Tolerate a trailing slash in the configured host.
    url = host.rstrip("/") + DRAW_PATH
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {api_key}",
    }
    payload = {
        "model": model,
        "prompt": prompt,
        "webHook": "-1",
        "aspectRatio": aspect_ratio,
    }
    if image_urls:
        payload["urls"] = image_urls

    resp = _post_grsai_with_retry(url, headers, payload)
    resp.raise_for_status()
    body = resp.json()

    # The response structure when webHook="-1".  "data" may be null on API
    # errors, so check its type before looking inside it.
    if isinstance(body, dict):
        data = body.get("data")
        if isinstance(data, dict) and "id" in data:
            return data["id"]
        if "id" in body:
            return body["id"]
    raise RuntimeError(f"Unexpected draw response: {json.dumps(body, ensure_ascii=False)}")


def poll_draw_result(api_key, host, task_id):
    """Poll the result endpoint once and return the status dict.

    Response code -22 is reported as ``{"status": "running"}``; code 0 with
    a non-empty "data" returns that data; anything else raises RuntimeError
    with the API's "msg".
    """
    url = host.rstrip("/") + RESULT_PATH
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {api_key}",
    }
    resp = _post_grsai_with_retry(url, headers, {"id": task_id})
    resp.raise_for_status()
    body = resp.json()

    code = body.get("code")
    if code == -22:
        return {"status": "running"}
    if code == 0 and body.get("data"):
        return body["data"]
    raise RuntimeError(body.get("msg", "Unknown API error"))


def wait_for_result(api_key, host, task_id):
    """Poll until the draw task completes or times out.

    Raises:
        RuntimeError: when the API reports the task as failed.
        TimeoutError: when ``POLL_TIMEOUT`` seconds elapse first.
    """
    deadline = time.time() + POLL_TIMEOUT
    while time.time() < deadline:
        result = poll_draw_result(api_key, host, task_id)
        status = result.get("status", "")
        if status == "succeeded":
            return result
        if status == "failed":
            reason = result.get("failure_reason", "") or result.get("error", "unknown")
            raise RuntimeError(f"Generation failed: {reason}")
        time.sleep(POLL_INTERVAL)
    raise TimeoutError("Image generation timed out")


def download_image(url, output_path):
    """Download an image from URL and save locally."""
    resp = http_requests.get(url, timeout=(30, 120))
    resp.raise_for_status()
    with open(output_path, "wb") as f:
        f.write(resp.content)


# ── Build prompts ──────────────────────────────────────────────────────────

def build_prompt(config, product_info):
    """Fill a config's prompt template from ``product_info``.

    Missing or empty fields are replaced by generic English placeholders.
    """
    name = product_info.get("product_name") or "the product shown in the reference image"
    return config["prompt"].format(
        product_name=name,
        feature_1=product_info.get("feature_1") or "its primary selling feature",
        feature_2=product_info.get("feature_2") or "its secondary selling feature",
        target_user=product_info.get("target_user") or "a satisfied customer",
        use_scene=product_info.get("use_scene") or "an appropriate real-world setting",
    )


# ── Background generation task ─────────────────────────────────────────────

def run_generation_task(task_id, image_urls, product_info, api_key, host, model, num_images):
    """Background worker: generate the first ``num_images`` listing images.

    Phase 1 submits one draw request per config; phase 2 polls all running
    jobs until each has succeeded, failed or the overall time budget
    (``POLL_TIMEOUT * 1.5`` seconds since the task's ``_start``) is spent.
    Progress is written into ``tasks[task_id]``.
    """
    task = tasks[task_id]
    try:
        configs = AMAZON_IMAGE_CONFIGS[:num_images]
        task["total"] = len(configs)
        task["done"] = 0
        task["images"] = []

        # Phase 1: submit all draw requests
        task["status"] = "submitting"
        task["message"] = "正在提交生成任务…"
        draw_jobs = []
        for cfg in configs:
            prompt = build_prompt(cfg, product_info)
            try:
                draw_id = submit_draw(api_key, host, model, prompt, image_urls)
                draw_jobs.append({"config": cfg, "draw_id": draw_id, "state": "running"})
            except Exception as e:
                draw_jobs.append({"config": cfg, "draw_id": None, "state": "failed", "error": str(e)})
                task["done"] += 1
                task["images"].append({
                    "id": cfg["id"],
                    "name": cfg["name"],
                    "slot": cfg["slot"],
                    "status": "error",
                    "error": str(e),
                })

        # Phase 2: poll all running jobs until done
        task["status"] = "generating"
        task["message"] = "正在生成图片…"

        while any(j["state"] == "running" for j in draw_jobs):
            time.sleep(POLL_INTERVAL)
            for job in draw_jobs:
                if job["state"] != "running":
                    continue
                cfg = job["config"]
                try:
                    result = poll_draw_result(api_key, host, job["draw_id"])
                    st = result.get("status", "")

                    if st == "succeeded":
                        img_url = result["results"][0]["url"]
                        fname = f"{task_id}_{cfg['id']}.png"
                        out_path = app.config["GENERATED_FOLDER"] / fname
                        download_image(img_url, str(out_path))
                        job["state"] = "done"
                        task["done"] += 1
                        task["images"].append({
                            "id": cfg["id"],
                            "name": cfg["name"],
                            "slot": cfg["slot"],
                            "filename": fname,
                            "url": f"/api/images/{fname}",
                            "status": "ok",
                        })
                        task["message"] = f"已完成 {task['done']}/{task['total']} 张"

                    elif st == "failed":
                        reason = result.get("failure_reason", "") or result.get("error", "unknown")
                        job["state"] = "failed"
                        task["done"] += 1
                        task["images"].append({
                            "id": cfg["id"],
                            "name": cfg["name"],
                            "slot": cfg["slot"],
                            "status": "error",
                            "error": reason,
                        })
                        task["message"] = f"已完成 {task['done']}/{task['total']} 张"

                except Exception as e:
                    # Treated as transient: the job stays "running" and is
                    # polled again on the next round (bounded by the timeout
                    # below).  Log it so repeated failures are diagnosable
                    # instead of being swallowed silently.
                    app.logger.warning(
                        "Polling draw %s (%s) failed, will retry: %s",
                        job["draw_id"], cfg["id"], e,
                    )

            # Timeout check
            elapsed = time.time() - task["_start"]
            if elapsed > POLL_TIMEOUT * 1.5:
                for job in draw_jobs:
                    if job["state"] == "running":
                        job["state"] = "timeout"
                        cfg = job["config"]
                        task["done"] += 1
                        task["images"].append({
                            "id": cfg["id"],
                            "name": cfg["name"],
                            "slot": cfg["slot"],
                            "status": "error",
                            "error": "生成超时",
                        })
                break

        task["status"] = "completed"
        ok_count = sum(1 for img in task["images"] if img["status"] == "ok")
        task["message"] = f"完成!成功 {ok_count}/{task['total']} 张"

    except Exception as e:
        task["status"] = "error"
        task["message"] = f"出错了:{e}"


# ── Routes ──────────────────────────────────────────────────────────────────

def _json_body() -> dict:
    """Return the request's JSON body as a dict, or ``{}`` if absent/invalid."""
    data = request.get_json(silent=True)
    return data if isinstance(data, dict) else {}


@app.route("/")
def index():
    """Serve the single-page UI."""
    return render_template("index.html")


@app.route("/api/upload-images", methods=["POST"])
def api_upload_images():
    """Upload product images and host them for public access. Returns public URLs."""
    # Browsers submit an empty, nameless part when no file was chosen; skip
    # those instead of saving a zero-byte file that Pillow cannot open.
    files = [f for f in request.files.getlist("images") if f and f.filename]
    if not files:
        return jsonify({"error": "请上传至少一张产品图片"}), 400

    local_paths = []
    public_urls = []
    errors = []

    for f in files:
        fname = f"{uuid.uuid4().hex}{Path(f.filename).suffix.lower()}"
        fp = app.config["UPLOAD_FOLDER"] / fname
        f.save(fp)
        local_paths.append(str(fp))
        try:
            url = upload_image(str(fp))
            public_urls.append(url)
        except Exception as e:
            errors.append(f"{f.filename}: {e}")

    if not public_urls and errors:
        return jsonify({"error": "图片上传到图床失败:" + "; ".join(errors)}), 500

    return jsonify({
        "local_paths": local_paths,
        "public_urls": public_urls,
        "errors": errors,
    })


@app.route("/api/generate", methods=["POST"])
def api_generate():
    """Start a background listing-image generation task; returns its id."""
    data = _json_body()

    api_key = GRSAI_API_KEY
    if not api_key:
        # Same fast failure as /api/generate-custom, rather than letting
        # every draw request fail inside the worker.
        return jsonify({"error": "服务端未配置 GRSAI_API_KEY"}), 500
    host = GRSAI_HOST
    model = GRSAI_MODEL

    try:
        num_images = int(data.get("num_images", 8))
    except (TypeError, ValueError):
        return jsonify({"error": "num_images 必须是整数"}), 400
    # Clamp to 1..8: a negative value would otherwise slice the config list
    # from the end.
    num_images = max(1, min(num_images, 8))

    image_urls = data.get("image_urls") or []
    if not isinstance(image_urls, list):
        image_urls = []
    product_info = data.get("product_info") or {}
    if not isinstance(product_info, dict):
        product_info = {}

    tid = uuid.uuid4().hex[:8]
    tasks[tid] = {
        "id": tid,
        "status": "queued",
        "message": "排队中…",
        "created": datetime.now().isoformat(),
        "images": [],
        "done": 0,
        "total": 0,
        "_start": time.time(),
    }

    t = threading.Thread(
        target=run_generation_task,
        args=(tid, image_urls, product_info, api_key, host, model, num_images),
        daemon=True,
    )
    t.start()

    return jsonify({"task_id": tid})


@app.route("/api/generate-custom", methods=["POST"])
def api_generate_custom():
    """Start a background custom (cover / feature / howto) generation task."""
    data = _json_body()
    kind = (data.get("kind") or "").strip().lower()
    if kind not in REF_FOLDERS:
        return jsonify({"error": "kind 必须是 cover、feature 或 howto"}), 400

    user_prompt = (data.get("prompt") or "").strip()
    if not user_prompt:
        return jsonify({"error": "请填写生成要求(prompt)"}), 400

    image_urls = data.get("image_urls") or []
    if not isinstance(image_urls, list):
        image_urls = []

    api_key = GRSAI_API_KEY
    if not api_key:
        return jsonify({"error": "服务端未配置 GRSAI_API_KEY"}), 500
    host = GRSAI_HOST
    model = GRSAI_MODEL

    tid = "c" + uuid.uuid4().hex[:7]
    tasks[tid] = {
        "id": tid,
        "kind": "custom",
        "custom_kind": kind,
        "status": "queued",
        "message": "排队中…",
        "created": datetime.now().isoformat(),
        "images": [],
        "done": 0,
        "total": 1,
        "_start": time.time(),
    }

    t = threading.Thread(
        target=run_custom_generation_task,
        args=(tid, kind, user_prompt, image_urls, api_key, host, model),
        daemon=True,
    )
    t.start()

    return jsonify({"task_id": tid})


# The three routes below need a URL variable: the view functions take a
# positional argument, which Flask only supplies from a <converter:name>
# segment in the rule.
@app.route("/api/status/<tid>")
def api_status(tid):
    """Return the public fields of a task (keys starting with "_" are hidden)."""
    task = tasks.get(tid)
    if not task:
        return jsonify({"error": "任务不存在"}), 404
    safe = {k: v for k, v in task.items() if not k.startswith("_")}
    return jsonify(safe)


@app.route("/api/images/<path:filename>")
def serve_image(filename):
    """Serve a generated image from the GENERATED_FOLDER directory."""
    return send_from_directory(app.config["GENERATED_FOLDER"], filename)


@app.route("/api/download-all/<tid>")
def download_all(tid):
    """Return a zip of all successfully generated images of a task."""
    task = tasks.get(tid)
    if not task:
        return jsonify({"error": "任务不存在"}), 404

    buf = io.BytesIO()
    with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as zf:
        for img in task.get("images", []):
            if img["status"] == "ok" and img.get("filename"):
                fp = app.config["GENERATED_FOLDER"] / img["filename"]
                if fp.exists():
                    arc = f"{img['slot']}_{img['name'].replace(' ', '_')}.png"
                    zf.write(fp, arc)
    buf.seek(0)
    return send_file(buf, mimetype="application/zip", as_attachment=True,
                     download_name=f"amazon_images_{tid}.zip")


if __name__ == "__main__":
    port = int(os.environ.get("PORT", 7860))
    app.run(debug=False, host="0.0.0.0", port=port)