| import os |
| import json |
| import uuid |
| import base64 |
| import threading |
| import zipfile |
| import io |
| import time |
| from pathlib import Path |
| from datetime import datetime |
|
|
| import requests as http_requests |
| from flask import Flask, render_template, request, jsonify, send_from_directory, send_file |
| from PIL import Image |
| from io import BytesIO |
|
|
| app = Flask(__name__, static_folder="static", static_url_path="/static") |
| app.config["UPLOAD_FOLDER"] = Path("uploads") |
| app.config["GENERATED_FOLDER"] = Path("generated") |
| app.config["MAX_CONTENT_LENGTH"] = 32 * 1024 * 1024 |
|
|
| app.config["UPLOAD_FOLDER"].mkdir(exist_ok=True) |
| app.config["GENERATED_FOLDER"].mkdir(exist_ok=True) |
|
|
| tasks = {} |
|
|
| |
| _ref_url_cache = {} |
|
|
| GRSAI_API_KEY = os.environ.get("GRSAI_API_KEY", "") |
| GRSAI_HOST = os.environ.get("GRSAI_HOST", "https://grsai.dakka.com.cn") |
| GRSAI_MODEL = os.environ.get("GRSAI_MODEL", "nano-banana") |
|
|
| DRAW_PATH = "/v1/draw/nano-banana" |
| RESULT_PATH = "/v1/draw/result" |
|
|
| POLL_INTERVAL = 4 |
| POLL_TIMEOUT = 300 |
| MAX_UPLOAD_DIM = 2048 |
|
|
| |
| GRSAI_HTTP_TIMEOUT = (30, 180) |
| GRSAI_HTTP_RETRIES = 3 |
|
|
| |
|
|
| AMAZON_IMAGE_CONFIGS = [ |
| { |
| "id": "main", |
| "name": "Main Product Image", |
| "slot": "MAIN", |
| "desc": "纯白背景主图", |
| "prompt": ( |
| "Professional Amazon product listing main image. " |
| "Place {product_name} on a PURE WHITE background (RGB 255,255,255). " |
| "Product centered, filling ~85% of the frame. " |
| "Professional studio lighting, soft even shadows, ultra-sharp focus. " |
| "NO text, NO logos, NO watermarks, NO props. " |
| "Clean, high-end e-commerce product photography." |
| ), |
| }, |
| { |
| "id": "lifestyle", |
| "name": "Lifestyle Scene", |
| "slot": "PT01", |
| "desc": "场景使用图", |
| "prompt": ( |
| "Aspirational lifestyle photograph showing {product_name} being used " |
| "by {target_user} in {use_scene}. " |
| "The product is the clear hero of the image, in sharp focus. " |
| "Natural warm lighting, authentic premium feel. " |
| "Professional photography, shallow depth of field on background." |
| ), |
| }, |
| { |
| "id": "feature_1", |
| "name": "Key Feature #1", |
| "slot": "PT02", |
| "desc": "核心卖点 1", |
| "prompt": ( |
| "Clean infographic-style Amazon listing image for {product_name}. " |
| "Visually highlight the key feature: {feature_1}. " |
| "Show the product from the angle that best demonstrates this feature. " |
| "Light gradient background, modern minimal design. " |
| "Professional e-commerce photography." |
| ), |
| }, |
| { |
| "id": "feature_2", |
| "name": "Key Feature #2", |
| "slot": "PT03", |
| "desc": "核心卖点 2", |
| "prompt": ( |
| "Clean infographic-style Amazon listing image for {product_name}. " |
| "Visually highlight the feature: {feature_2}. " |
| "Show the product from a different angle that best demonstrates this feature. " |
| "Soft neutral background, modern layout. " |
| "Professional product photography." |
| ), |
| }, |
| { |
| "id": "detail", |
| "name": "Detail Close-up", |
| "slot": "PT04", |
| "desc": "细节微距图", |
| "prompt": ( |
| "Extreme close-up macro photograph of {product_name}. " |
| "Focus on material quality, texture, and fine craftsmanship. " |
| "Dramatic studio lighting emphasizing surface details and premium build quality. " |
| "Shallow depth of field, ultra-sharp on the detail area. " |
| "The close-up should convey exceptional quality." |
| ), |
| }, |
| { |
| "id": "scale", |
| "name": "Size & Scale", |
| "slot": "PT05", |
| "desc": "尺寸参考图", |
| "prompt": ( |
| "Product photograph of {product_name} shown with a human hand " |
| "or a common everyday object for clear size reference. " |
| "The viewer should immediately understand the real-world dimensions. " |
| "Clean background, professional lighting. " |
| "The product should look attractive while clearly communicating its size." |
| ), |
| }, |
| { |
| "id": "angle", |
| "name": "Alternative Angle", |
| "slot": "PT06", |
| "desc": "多角度展示", |
| "prompt": ( |
| "Professional product photograph showing {product_name} from " |
| "a 45-degree elevated angle, revealing the top and side simultaneously. " |
| "Clean white background, professional studio lighting. " |
| "This angle should reveal details not visible in a standard front view. " |
| "Sharp focus, high resolution e-commerce photography." |
| ), |
| }, |
| { |
| "id": "package", |
| "name": "What's Included", |
| "slot": "PT07", |
| "desc": "包装内容物", |
| "prompt": ( |
| "Professional flat-lay photograph showing {product_name} and ALL included " |
| "accessories neatly arranged on a clean white background. " |
| "Organized, symmetrical layout. Each item is clearly visible and identifiable. " |
| "Professional product photography, even lighting, no overlapping shadows. " |
| "This image shows everything the customer receives in the package." |
| ), |
| }, |
| ] |
|
|
| |
|
|
| REF_FOLDERS = { |
| "cover": "coverreference", |
| "feature": "features-references", |
| "howto": "howtoreferences", |
| } |
|
|
| STYLE_RULES = { |
| "cover": ( |
| "VISUAL STYLE (cover / hero banner — 16:9): Match the reference set’s premium " |
| "marketing-cover look: strong hero subject (product or lifestyle + product), " |
| "cinematic lighting with warm highlights and controlled contrast, generous " |
| "negative space reserved for headline or copy (top or side), layered depth " |
| "(foreground subject, softer background), cohesive color grading, high-end " |
| "e-commerce or tech-launch aesthetic. Composition should feel like a storefront " |
| "or campaign hero frame, not a plain screenshot. No illegible tiny text; avoid " |
| "clutter unless the user explicitly requests UI chrome." |
| ), |
| "feature": ( |
| "VISUAL STYLE (feature / benefit graphic): Match the reference set’s clean " |
| "SaaS-product marketing slides: clear single-message focus, soft gradients or " |
| "light neutral backgrounds, icon or abstract UI motifs, subtle cards and soft " |
| "shadows, modern minimal layout, trust and clarity. Emphasize one benefit or " |
| "capability; use metaphor, simple diagrams, or app-style panels as in the " |
| "references. Keep typography zones clean if text is added." |
| ), |
| "howto": ( |
| "VISUAL STYLE (how-to / step graphic): Match the reference set’s instructional " |
| "step cards: sequential, educational tone, one clear action per image, numbered " |
| "or step-flow implication, uncluttered composition, friendly UI or device-hand " |
| "cues, generous whitespace, aligned elements. The image should instantly read as " |
| "“Step N of a tutorial” for the user’s topic." |
| ), |
| } |
|
|
|
|
| def _references_base() -> Path: |
| return Path(__file__).resolve().parent / "static" / "references" |
|
|
|
|
| def list_default_reference_files(kind: str, limit: int = 4) -> list: |
| """Image paths from bundled reference folders (cover / feature / howto).""" |
| if kind not in REF_FOLDERS: |
| raise ValueError("invalid kind") |
| folder = _references_base() / REF_FOLDERS[kind] |
| if not folder.is_dir(): |
| return [] |
| exts = {".jpg", ".jpeg", ".png", ".webp", ".JPG", ".PNG", ".WEBP"} |
| files = sorted( |
| [p for p in folder.iterdir() if p.is_file() and p.suffix in exts], |
| key=lambda p: p.name.lower(), |
| ) |
| return files[:limit] |
|
|
|
|
| def public_urls_for_reference_paths(paths: list) -> list: |
| """Upload each file once; cache by resolved path string.""" |
| urls = [] |
| for p in paths: |
| key = str(Path(p).resolve()) |
| if key in _ref_url_cache: |
| urls.append(_ref_url_cache[key]) |
| continue |
| url = upload_image(key) |
| _ref_url_cache[key] = url |
| urls.append(url) |
| return urls |
|
|
|
|
| def build_custom_prompt(kind: str, user_prompt: str) -> str: |
| base = STYLE_RULES.get(kind, STYLE_RULES["cover"]) |
| up = (user_prompt or "").strip() or "Create a compelling image following the style rules." |
| return ( |
| f"{base}\n\n" |
| f"USER DIRECTION (follow closely; subject matter and specifics):\n{up}\n\n" |
| "Generate one new image that applies the reference visual language to the user’s " |
| "topic. Do not copy text or branding from references; invent appropriate content " |
| "for the user’s request." |
| ) |
|
|
|
|
| def run_custom_generation_task(task_id, kind, user_prompt, user_image_urls, api_key, host, model): |
| task = tasks[task_id] |
| try: |
| task["status"] = "generating" |
| task["message"] = "正在准备参考图与提示词…" |
| task["total"] = 1 |
| task["done"] = 0 |
| task["images"] = [] |
|
|
| default_paths = list_default_reference_files(kind, limit=4) |
| default_urls = public_urls_for_reference_paths(default_paths) if default_paths else [] |
|
|
| combined = (user_image_urls or []) + default_urls |
| combined = combined[:10] |
|
|
| aspect = "16:9" if kind == "cover" else "auto" |
| prompt = build_custom_prompt(kind, user_prompt) |
|
|
| task["message"] = "正在提交生成任务…" |
| draw_id = submit_draw(api_key, host, model, prompt, combined, aspect_ratio=aspect) |
|
|
| deadline = time.time() + POLL_TIMEOUT |
| while time.time() < deadline: |
| time.sleep(POLL_INTERVAL) |
| result = poll_draw_result(api_key, host, draw_id) |
| st = result.get("status", "") |
| if st == "succeeded": |
| img_url = result["results"][0]["url"] |
| fname = f"{task_id}_custom.png" |
| out_path = app.config["GENERATED_FOLDER"] / fname |
| download_image(img_url, str(out_path)) |
| task["done"] = 1 |
| task["images"] = [{ |
| "id": "custom", |
| "name": {"cover": "封面图", "feature": "功能图", "howto": "How-to 图"}.get(kind, "自定义"), |
| "slot": "CUSTOM", |
| "filename": fname, |
| "url": f"/api/images/{fname}", |
| "status": "ok", |
| }] |
| task["status"] = "completed" |
| task["message"] = "生成完成" |
| return |
| if st == "failed": |
| reason = result.get("failure_reason", "") or result.get("error", "unknown") |
| raise RuntimeError(reason) |
|
|
| raise TimeoutError("生成超时") |
|
|
| except Exception as e: |
| task["status"] = "error" |
| task["message"] = str(e) |
| task["images"] = [{ |
| "id": "custom", |
| "name": "自定义图", |
| "slot": "CUSTOM", |
| "status": "error", |
| "error": str(e), |
| }] |
|
|
|
|
| |
|
|
| def compress_for_upload(file_path, max_dim=1600, quality=85): |
| """Always convert to compressed JPEG for reliable hosting.""" |
| img = Image.open(file_path) |
| |
| if img.mode in ("RGBA", "LA", "P"): |
| if img.mode == "P": |
| img = img.convert("RGBA") |
| bg = Image.new("RGB", img.size, (255, 255, 255)) |
| bg.paste(img, mask=img.split()[-1] if img.mode == "RGBA" else None) |
| img = bg |
| elif img.mode != "RGB": |
| img = img.convert("RGB") |
| img.thumbnail((max_dim, max_dim), Image.LANCZOS) |
| buf = BytesIO() |
| img.save(buf, format="JPEG", quality=quality, optimize=True) |
| return buf.getvalue() |
|
|
|
|
| def _try_telegraph(jpeg_bytes): |
| resp = http_requests.post( |
| "https://telegra.ph/upload", |
| files={"file": ("image.jpg", jpeg_bytes, "image/jpeg")}, |
| timeout=(20, 90), |
| ) |
| resp.raise_for_status() |
| body = resp.json() |
| if isinstance(body, list) and body and "src" in body[0]: |
| return "https://telegra.ph" + body[0]["src"] |
| raise RuntimeError(f"Unexpected response: {body}") |
|
|
|
|
| def _try_catbox(jpeg_bytes): |
| resp = http_requests.post( |
| "https://catbox.moe/user/api.php", |
| data={"reqtype": "fileupload"}, |
| files={"fileToUpload": ("image.jpg", jpeg_bytes, "image/jpeg")}, |
| timeout=(20, 120), |
| ) |
| resp.raise_for_status() |
| url = resp.text.strip() |
| if url.startswith("http"): |
| return url |
| raise RuntimeError(f"Unexpected response: {url[:200]}") |
|
|
|
|
| def _try_0x0(jpeg_bytes): |
| resp = http_requests.post( |
| "https://0x0.st", |
| files={"file": ("image.jpg", jpeg_bytes, "image/jpeg")}, |
| timeout=(20, 90), |
| ) |
| resp.raise_for_status() |
| url = resp.text.strip() |
| if url.startswith("http"): |
| return url |
| raise RuntimeError(f"Unexpected response: {url[:200]}") |
|
|
|
|
| def _post_grsai_with_retry(url, headers, json_payload): |
| """POST to Grsai with long read timeout and retries on network errors.""" |
| last_err = None |
| for attempt in range(GRSAI_HTTP_RETRIES): |
| try: |
| resp = http_requests.post( |
| url, headers=headers, json=json_payload, timeout=GRSAI_HTTP_TIMEOUT, |
| ) |
| return resp |
| except ( |
| http_requests.exceptions.Timeout, |
| http_requests.exceptions.ConnectionError, |
| ) as e: |
| last_err = e |
| app.logger.warning(f"Grsai request attempt {attempt + 1}/{GRSAI_HTTP_RETRIES}: {e}") |
| if attempt < GRSAI_HTTP_RETRIES - 1: |
| time.sleep(2 + attempt * 3) |
| raise last_err |
|
|
|
|
| HOSTING_BACKENDS = [ |
| ("Telegraph", _try_telegraph), |
| ("Catbox", _try_catbox), |
| ("0x0.st", _try_0x0), |
| ] |
|
|
|
|
| def upload_image(file_path): |
| """Compress image and try multiple hosting services as fallbacks.""" |
| jpeg_bytes = compress_for_upload(file_path) |
| errors = [] |
| for name, fn in HOSTING_BACKENDS: |
| try: |
| url = fn(jpeg_bytes) |
| app.logger.info(f"Uploaded via {name}: {url}") |
| return url |
| except Exception as e: |
| errors.append(f"{name}: {e}") |
| app.logger.warning(f"{name} upload failed: {e}") |
| raise RuntimeError("所有图床均失败: " + "; ".join(errors)) |
|
|
|
|
| |
|
|
| def submit_draw(api_key, host, model, prompt, image_urls, aspect_ratio="1:1"): |
| """Submit a draw request and return the task id.""" |
| url = host + DRAW_PATH |
| headers = { |
| "Content-Type": "application/json", |
| "Authorization": f"Bearer {api_key}", |
| } |
| payload = { |
| "model": model, |
| "prompt": prompt, |
| "webHook": "-1", |
| "aspectRatio": aspect_ratio, |
| } |
| if image_urls: |
| payload["urls"] = image_urls |
|
|
| resp = _post_grsai_with_retry(url, headers, payload) |
| resp.raise_for_status() |
| body = resp.json() |
|
|
| |
| if "data" in body and "id" in body["data"]: |
| return body["data"]["id"] |
| if "id" in body: |
| return body["id"] |
| raise RuntimeError(f"Unexpected draw response: {json.dumps(body, ensure_ascii=False)}") |
|
|
|
|
| def poll_draw_result(api_key, host, task_id): |
| """Poll the result endpoint once and return the status dict.""" |
| url = host + RESULT_PATH |
| headers = { |
| "Content-Type": "application/json", |
| "Authorization": f"Bearer {api_key}", |
| } |
| resp = _post_grsai_with_retry(url, headers, {"id": task_id}) |
| resp.raise_for_status() |
| body = resp.json() |
| code = body.get("code") |
| if code == -22: |
| return {"status": "running"} |
| if code == 0 and body.get("data"): |
| return body["data"] |
| raise RuntimeError(body.get("msg", "Unknown API error")) |
|
|
|
|
| def wait_for_result(api_key, host, task_id): |
| """Poll until the draw task completes or times out.""" |
| deadline = time.time() + POLL_TIMEOUT |
| while time.time() < deadline: |
| result = poll_draw_result(api_key, host, task_id) |
| status = result.get("status", "") |
| if status == "succeeded": |
| return result |
| if status == "failed": |
| reason = result.get("failure_reason", "") or result.get("error", "unknown") |
| raise RuntimeError(f"Generation failed: {reason}") |
| time.sleep(POLL_INTERVAL) |
| raise TimeoutError("Image generation timed out") |
|
|
|
|
| def download_image(url, output_path): |
| """Download an image from URL and save locally.""" |
| resp = http_requests.get(url, timeout=(30, 120)) |
| resp.raise_for_status() |
| with open(output_path, "wb") as f: |
| f.write(resp.content) |
|
|
|
|
| |
|
|
| def build_prompt(config, product_info): |
| name = product_info.get("product_name") or "the product shown in the reference image" |
| return config["prompt"].format( |
| product_name=name, |
| feature_1=product_info.get("feature_1") or "its primary selling feature", |
| feature_2=product_info.get("feature_2") or "its secondary selling feature", |
| target_user=product_info.get("target_user") or "a satisfied customer", |
| use_scene=product_info.get("use_scene") or "an appropriate real-world setting", |
| ) |
|
|
|
|
| |
|
|
| def run_generation_task(task_id, image_urls, product_info, api_key, host, model, num_images): |
| task = tasks[task_id] |
| try: |
| configs = AMAZON_IMAGE_CONFIGS[:num_images] |
| task["total"] = len(configs) |
| task["done"] = 0 |
| task["images"] = [] |
|
|
| |
| task["status"] = "submitting" |
| task["message"] = "正在提交生成任务…" |
|
|
| draw_jobs = [] |
| for cfg in configs: |
| prompt = build_prompt(cfg, product_info) |
| try: |
| draw_id = submit_draw(api_key, host, model, prompt, image_urls) |
| draw_jobs.append({"config": cfg, "draw_id": draw_id, "state": "running"}) |
| except Exception as e: |
| draw_jobs.append({"config": cfg, "draw_id": None, "state": "failed", "error": str(e)}) |
| task["done"] += 1 |
| task["images"].append({ |
| "id": cfg["id"], "name": cfg["name"], "slot": cfg["slot"], |
| "status": "error", "error": str(e), |
| }) |
|
|
| |
| task["status"] = "generating" |
| task["message"] = "正在生成图片…" |
|
|
| while any(j["state"] == "running" for j in draw_jobs): |
| time.sleep(POLL_INTERVAL) |
| for job in draw_jobs: |
| if job["state"] != "running": |
| continue |
| cfg = job["config"] |
| try: |
| result = poll_draw_result(api_key, host, job["draw_id"]) |
| st = result.get("status", "") |
| if st == "succeeded": |
| img_url = result["results"][0]["url"] |
| fname = f"{task_id}_{cfg['id']}.png" |
| out_path = app.config["GENERATED_FOLDER"] / fname |
| download_image(img_url, str(out_path)) |
| job["state"] = "done" |
| task["done"] += 1 |
| task["images"].append({ |
| "id": cfg["id"], "name": cfg["name"], "slot": cfg["slot"], |
| "filename": fname, "url": f"/api/images/{fname}", "status": "ok", |
| }) |
| task["message"] = f"已完成 {task['done']}/{task['total']} 张" |
| elif st == "failed": |
| reason = result.get("failure_reason", "") or result.get("error", "unknown") |
| job["state"] = "failed" |
| task["done"] += 1 |
| task["images"].append({ |
| "id": cfg["id"], "name": cfg["name"], "slot": cfg["slot"], |
| "status": "error", "error": reason, |
| }) |
| task["message"] = f"已完成 {task['done']}/{task['total']} 张" |
| except Exception as e: |
| |
| pass |
|
|
| |
| elapsed = time.time() - task["_start"] |
| if elapsed > POLL_TIMEOUT * 1.5: |
| for job in draw_jobs: |
| if job["state"] == "running": |
| job["state"] = "timeout" |
| cfg = job["config"] |
| task["done"] += 1 |
| task["images"].append({ |
| "id": cfg["id"], "name": cfg["name"], "slot": cfg["slot"], |
| "status": "error", "error": "生成超时", |
| }) |
| break |
|
|
| task["status"] = "completed" |
| ok_count = sum(1 for img in task["images"] if img["status"] == "ok") |
| task["message"] = f"完成!成功 {ok_count}/{task['total']} 张" |
|
|
| except Exception as e: |
| task["status"] = "error" |
| task["message"] = f"出错了:{e}" |
|
|
|
|
| |
|
|
| @app.route("/") |
| def index(): |
| return render_template("index.html") |
|
|
|
|
| @app.route("/api/upload-images", methods=["POST"]) |
| def api_upload_images(): |
| """Upload product images and host them for public access. Returns public URLs.""" |
| files = request.files.getlist("images") |
| if not files: |
| return jsonify({"error": "请上传至少一张产品图片"}), 400 |
|
|
| local_paths = [] |
| public_urls = [] |
| errors = [] |
|
|
| for f in files: |
| fname = f"{uuid.uuid4().hex}{Path(f.filename).suffix.lower()}" |
| fp = app.config["UPLOAD_FOLDER"] / fname |
| f.save(fp) |
| local_paths.append(str(fp)) |
|
|
| try: |
| url = upload_image(str(fp)) |
| public_urls.append(url) |
| except Exception as e: |
| errors.append(f"{f.filename}: {e}") |
|
|
| if not public_urls and errors: |
| return jsonify({"error": "图片上传到图床失败:" + "; ".join(errors)}), 500 |
|
|
| return jsonify({ |
| "local_paths": local_paths, |
| "public_urls": public_urls, |
| "errors": errors, |
| }) |
|
|
|
|
| @app.route("/api/generate", methods=["POST"]) |
| def api_generate(): |
| data = request.json |
| api_key = GRSAI_API_KEY |
| host = GRSAI_HOST |
| model = GRSAI_MODEL |
| num_images = min(int(data.get("num_images", 8)), 8) |
| image_urls = data.get("image_urls", []) |
| product_info = data.get("product_info", {}) |
|
|
| tid = uuid.uuid4().hex[:8] |
| tasks[tid] = { |
| "id": tid, |
| "status": "queued", |
| "message": "排队中…", |
| "created": datetime.now().isoformat(), |
| "images": [], |
| "done": 0, |
| "total": 0, |
| "_start": time.time(), |
| } |
|
|
| t = threading.Thread( |
| target=run_generation_task, |
| args=(tid, image_urls, product_info, api_key, host, model, num_images), |
| daemon=True, |
| ) |
| t.start() |
| return jsonify({"task_id": tid}) |
|
|
|
|
| @app.route("/api/generate-custom", methods=["POST"]) |
| def api_generate_custom(): |
| data = request.json or {} |
| kind = (data.get("kind") or "").strip().lower() |
| if kind not in REF_FOLDERS: |
| return jsonify({"error": "kind 必须是 cover、feature 或 howto"}), 400 |
| user_prompt = (data.get("prompt") or "").strip() |
| if not user_prompt: |
| return jsonify({"error": "请填写生成要求(prompt)"}), 400 |
| image_urls = data.get("image_urls") or [] |
| if not isinstance(image_urls, list): |
| image_urls = [] |
|
|
| api_key = GRSAI_API_KEY |
| if not api_key: |
| return jsonify({"error": "服务端未配置 GRSAI_API_KEY"}), 500 |
| host = GRSAI_HOST |
| model = GRSAI_MODEL |
|
|
| tid = "c" + uuid.uuid4().hex[:7] |
| tasks[tid] = { |
| "id": tid, |
| "kind": "custom", |
| "custom_kind": kind, |
| "status": "queued", |
| "message": "排队中…", |
| "created": datetime.now().isoformat(), |
| "images": [], |
| "done": 0, |
| "total": 1, |
| "_start": time.time(), |
| } |
|
|
| t = threading.Thread( |
| target=run_custom_generation_task, |
| args=(tid, kind, user_prompt, image_urls, api_key, host, model), |
| daemon=True, |
| ) |
| t.start() |
| return jsonify({"task_id": tid}) |
|
|
|
|
| @app.route("/api/status/<tid>") |
| def api_status(tid): |
| task = tasks.get(tid) |
| if not task: |
| return jsonify({"error": "任务不存在"}), 404 |
| safe = {k: v for k, v in task.items() if not k.startswith("_")} |
| return jsonify(safe) |
|
|
|
|
| @app.route("/api/images/<filename>") |
| def serve_image(filename): |
| return send_from_directory(app.config["GENERATED_FOLDER"], filename) |
|
|
|
|
| @app.route("/api/download-all/<tid>") |
| def download_all(tid): |
| task = tasks.get(tid) |
| if not task: |
| return jsonify({"error": "任务不存在"}), 404 |
|
|
| buf = io.BytesIO() |
| with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as zf: |
| for img in task.get("images", []): |
| if img["status"] == "ok" and img.get("filename"): |
| fp = app.config["GENERATED_FOLDER"] / img["filename"] |
| if fp.exists(): |
| arc = f"{img['slot']}_{img['name'].replace(' ', '_')}.png" |
| zf.write(fp, arc) |
| buf.seek(0) |
| return send_file(buf, mimetype="application/zip", as_attachment=True, |
| download_name=f"amazon_images_{tid}.zip") |
|
|
|
|
| if __name__ == "__main__": |
| port = int(os.environ.get("PORT", 7860)) |
| app.run(debug=False, host="0.0.0.0", port=port) |
|
|