lindaitool's picture
Upload app.py with huggingface_hub
3fb2994 verified
import os
import json
import uuid
import base64
import threading
import zipfile
import io
import time
from pathlib import Path
from datetime import datetime
import requests as http_requests
from flask import Flask, render_template, request, jsonify, send_from_directory, send_file
from PIL import Image
from io import BytesIO
app = Flask(__name__, static_folder="static", static_url_path="/static")
app.config["UPLOAD_FOLDER"] = Path("uploads")
app.config["GENERATED_FOLDER"] = Path("generated")
app.config["MAX_CONTENT_LENGTH"] = 32 * 1024 * 1024
app.config["UPLOAD_FOLDER"].mkdir(exist_ok=True)
app.config["GENERATED_FOLDER"].mkdir(exist_ok=True)
tasks = {}
# Cache: disk path -> public URL (avoids re-uploading default refs every request)
_ref_url_cache = {}
GRSAI_API_KEY = os.environ.get("GRSAI_API_KEY", "")
GRSAI_HOST = os.environ.get("GRSAI_HOST", "https://grsai.dakka.com.cn")
GRSAI_MODEL = os.environ.get("GRSAI_MODEL", "nano-banana")
DRAW_PATH = "/v1/draw/nano-banana"
RESULT_PATH = "/v1/draw/result"
POLL_INTERVAL = 4 # seconds between polls
POLL_TIMEOUT = 300 # max seconds to wait per image
MAX_UPLOAD_DIM = 2048 # resize images before hosting
# Grsai:跨境 / 高峰时响应慢,使用 (连接, 读取) 双超时;单次读取勿低于此
GRSAI_HTTP_TIMEOUT = (30, 180)
GRSAI_HTTP_RETRIES = 3
# ── Amazon image type definitions ───────────────────────────────────────────
AMAZON_IMAGE_CONFIGS = [
{
"id": "main",
"name": "Main Product Image",
"slot": "MAIN",
"desc": "纯白背景主图",
"prompt": (
"Professional Amazon product listing main image. "
"Place {product_name} on a PURE WHITE background (RGB 255,255,255). "
"Product centered, filling ~85% of the frame. "
"Professional studio lighting, soft even shadows, ultra-sharp focus. "
"NO text, NO logos, NO watermarks, NO props. "
"Clean, high-end e-commerce product photography."
),
},
{
"id": "lifestyle",
"name": "Lifestyle Scene",
"slot": "PT01",
"desc": "场景使用图",
"prompt": (
"Aspirational lifestyle photograph showing {product_name} being used "
"by {target_user} in {use_scene}. "
"The product is the clear hero of the image, in sharp focus. "
"Natural warm lighting, authentic premium feel. "
"Professional photography, shallow depth of field on background."
),
},
{
"id": "feature_1",
"name": "Key Feature #1",
"slot": "PT02",
"desc": "核心卖点 1",
"prompt": (
"Clean infographic-style Amazon listing image for {product_name}. "
"Visually highlight the key feature: {feature_1}. "
"Show the product from the angle that best demonstrates this feature. "
"Light gradient background, modern minimal design. "
"Professional e-commerce photography."
),
},
{
"id": "feature_2",
"name": "Key Feature #2",
"slot": "PT03",
"desc": "核心卖点 2",
"prompt": (
"Clean infographic-style Amazon listing image for {product_name}. "
"Visually highlight the feature: {feature_2}. "
"Show the product from a different angle that best demonstrates this feature. "
"Soft neutral background, modern layout. "
"Professional product photography."
),
},
{
"id": "detail",
"name": "Detail Close-up",
"slot": "PT04",
"desc": "细节微距图",
"prompt": (
"Extreme close-up macro photograph of {product_name}. "
"Focus on material quality, texture, and fine craftsmanship. "
"Dramatic studio lighting emphasizing surface details and premium build quality. "
"Shallow depth of field, ultra-sharp on the detail area. "
"The close-up should convey exceptional quality."
),
},
{
"id": "scale",
"name": "Size & Scale",
"slot": "PT05",
"desc": "尺寸参考图",
"prompt": (
"Product photograph of {product_name} shown with a human hand "
"or a common everyday object for clear size reference. "
"The viewer should immediately understand the real-world dimensions. "
"Clean background, professional lighting. "
"The product should look attractive while clearly communicating its size."
),
},
{
"id": "angle",
"name": "Alternative Angle",
"slot": "PT06",
"desc": "多角度展示",
"prompt": (
"Professional product photograph showing {product_name} from "
"a 45-degree elevated angle, revealing the top and side simultaneously. "
"Clean white background, professional studio lighting. "
"This angle should reveal details not visible in a standard front view. "
"Sharp focus, high resolution e-commerce photography."
),
},
{
"id": "package",
"name": "What's Included",
"slot": "PT07",
"desc": "包装内容物",
"prompt": (
"Professional flat-lay photograph showing {product_name} and ALL included "
"accessories neatly arranged on a clean white background. "
"Organized, symmetrical layout. Each item is clearly visible and identifiable. "
"Professional product photography, even lighting, no overlapping shadows. "
"This image shows everything the customer receives in the package."
),
},
]
# ── Custom generator: default reference folders (from references.zip) ─────
REF_FOLDERS = {
"cover": "coverreference",
"feature": "features-references",
"howto": "howtoreferences",
}
STYLE_RULES = {
"cover": (
"VISUAL STYLE (cover / hero banner — 16:9): Match the reference set’s premium "
"marketing-cover look: strong hero subject (product or lifestyle + product), "
"cinematic lighting with warm highlights and controlled contrast, generous "
"negative space reserved for headline or copy (top or side), layered depth "
"(foreground subject, softer background), cohesive color grading, high-end "
"e-commerce or tech-launch aesthetic. Composition should feel like a storefront "
"or campaign hero frame, not a plain screenshot. No illegible tiny text; avoid "
"clutter unless the user explicitly requests UI chrome."
),
"feature": (
"VISUAL STYLE (feature / benefit graphic): Match the reference set’s clean "
"SaaS-product marketing slides: clear single-message focus, soft gradients or "
"light neutral backgrounds, icon or abstract UI motifs, subtle cards and soft "
"shadows, modern minimal layout, trust and clarity. Emphasize one benefit or "
"capability; use metaphor, simple diagrams, or app-style panels as in the "
"references. Keep typography zones clean if text is added."
),
"howto": (
"VISUAL STYLE (how-to / step graphic): Match the reference set’s instructional "
"step cards: sequential, educational tone, one clear action per image, numbered "
"or step-flow implication, uncluttered composition, friendly UI or device-hand "
"cues, generous whitespace, aligned elements. The image should instantly read as "
"“Step N of a tutorial” for the user’s topic."
),
}
def _references_base() -> Path:
return Path(__file__).resolve().parent / "static" / "references"
def list_default_reference_files(kind: str, limit: int = 4) -> list:
"""Image paths from bundled reference folders (cover / feature / howto)."""
if kind not in REF_FOLDERS:
raise ValueError("invalid kind")
folder = _references_base() / REF_FOLDERS[kind]
if not folder.is_dir():
return []
exts = {".jpg", ".jpeg", ".png", ".webp", ".JPG", ".PNG", ".WEBP"}
files = sorted(
[p for p in folder.iterdir() if p.is_file() and p.suffix in exts],
key=lambda p: p.name.lower(),
)
return files[:limit]
def public_urls_for_reference_paths(paths: list) -> list:
"""Upload each file once; cache by resolved path string."""
urls = []
for p in paths:
key = str(Path(p).resolve())
if key in _ref_url_cache:
urls.append(_ref_url_cache[key])
continue
url = upload_image(key)
_ref_url_cache[key] = url
urls.append(url)
return urls
def build_custom_prompt(kind: str, user_prompt: str) -> str:
base = STYLE_RULES.get(kind, STYLE_RULES["cover"])
up = (user_prompt or "").strip() or "Create a compelling image following the style rules."
return (
f"{base}\n\n"
f"USER DIRECTION (follow closely; subject matter and specifics):\n{up}\n\n"
"Generate one new image that applies the reference visual language to the user’s "
"topic. Do not copy text or branding from references; invent appropriate content "
"for the user’s request."
)
def run_custom_generation_task(task_id, kind, user_prompt, user_image_urls, api_key, host, model):
task = tasks[task_id]
try:
task["status"] = "generating"
task["message"] = "正在准备参考图与提示词…"
task["total"] = 1
task["done"] = 0
task["images"] = []
default_paths = list_default_reference_files(kind, limit=4)
default_urls = public_urls_for_reference_paths(default_paths) if default_paths else []
combined = (user_image_urls or []) + default_urls
combined = combined[:10]
aspect = "16:9" if kind == "cover" else "auto"
prompt = build_custom_prompt(kind, user_prompt)
task["message"] = "正在提交生成任务…"
draw_id = submit_draw(api_key, host, model, prompt, combined, aspect_ratio=aspect)
deadline = time.time() + POLL_TIMEOUT
while time.time() < deadline:
time.sleep(POLL_INTERVAL)
result = poll_draw_result(api_key, host, draw_id)
st = result.get("status", "")
if st == "succeeded":
img_url = result["results"][0]["url"]
fname = f"{task_id}_custom.png"
out_path = app.config["GENERATED_FOLDER"] / fname
download_image(img_url, str(out_path))
task["done"] = 1
task["images"] = [{
"id": "custom",
"name": {"cover": "封面图", "feature": "功能图", "howto": "How-to 图"}.get(kind, "自定义"),
"slot": "CUSTOM",
"filename": fname,
"url": f"/api/images/{fname}",
"status": "ok",
}]
task["status"] = "completed"
task["message"] = "生成完成"
return
if st == "failed":
reason = result.get("failure_reason", "") or result.get("error", "unknown")
raise RuntimeError(reason)
raise TimeoutError("生成超时")
except Exception as e:
task["status"] = "error"
task["message"] = str(e)
task["images"] = [{
"id": "custom",
"name": "自定义图",
"slot": "CUSTOM",
"status": "error",
"error": str(e),
}]
# ── Image processing & hosting ──────────────────────────────────────────────
def compress_for_upload(file_path, max_dim=1600, quality=85):
"""Always convert to compressed JPEG for reliable hosting."""
img = Image.open(file_path)
# Handle transparency / palette modes → RGB with white background
if img.mode in ("RGBA", "LA", "P"):
if img.mode == "P":
img = img.convert("RGBA")
bg = Image.new("RGB", img.size, (255, 255, 255))
bg.paste(img, mask=img.split()[-1] if img.mode == "RGBA" else None)
img = bg
elif img.mode != "RGB":
img = img.convert("RGB")
img.thumbnail((max_dim, max_dim), Image.LANCZOS)
buf = BytesIO()
img.save(buf, format="JPEG", quality=quality, optimize=True)
return buf.getvalue()
def _try_telegraph(jpeg_bytes):
resp = http_requests.post(
"https://telegra.ph/upload",
files={"file": ("image.jpg", jpeg_bytes, "image/jpeg")},
timeout=(20, 90),
)
resp.raise_for_status()
body = resp.json()
if isinstance(body, list) and body and "src" in body[0]:
return "https://telegra.ph" + body[0]["src"]
raise RuntimeError(f"Unexpected response: {body}")
def _try_catbox(jpeg_bytes):
resp = http_requests.post(
"https://catbox.moe/user/api.php",
data={"reqtype": "fileupload"},
files={"fileToUpload": ("image.jpg", jpeg_bytes, "image/jpeg")},
timeout=(20, 120),
)
resp.raise_for_status()
url = resp.text.strip()
if url.startswith("http"):
return url
raise RuntimeError(f"Unexpected response: {url[:200]}")
def _try_0x0(jpeg_bytes):
resp = http_requests.post(
"https://0x0.st",
files={"file": ("image.jpg", jpeg_bytes, "image/jpeg")},
timeout=(20, 90),
)
resp.raise_for_status()
url = resp.text.strip()
if url.startswith("http"):
return url
raise RuntimeError(f"Unexpected response: {url[:200]}")
def _post_grsai_with_retry(url, headers, json_payload):
"""POST to Grsai with long read timeout and retries on network errors."""
last_err = None
for attempt in range(GRSAI_HTTP_RETRIES):
try:
resp = http_requests.post(
url, headers=headers, json=json_payload, timeout=GRSAI_HTTP_TIMEOUT,
)
return resp
except (
http_requests.exceptions.Timeout,
http_requests.exceptions.ConnectionError,
) as e:
last_err = e
app.logger.warning(f"Grsai request attempt {attempt + 1}/{GRSAI_HTTP_RETRIES}: {e}")
if attempt < GRSAI_HTTP_RETRIES - 1:
time.sleep(2 + attempt * 3)
raise last_err
HOSTING_BACKENDS = [
("Telegraph", _try_telegraph),
("Catbox", _try_catbox),
("0x0.st", _try_0x0),
]
def upload_image(file_path):
"""Compress image and try multiple hosting services as fallbacks."""
jpeg_bytes = compress_for_upload(file_path)
errors = []
for name, fn in HOSTING_BACKENDS:
try:
url = fn(jpeg_bytes)
app.logger.info(f"Uploaded via {name}: {url}")
return url
except Exception as e:
errors.append(f"{name}: {e}")
app.logger.warning(f"{name} upload failed: {e}")
raise RuntimeError("所有图床均失败: " + "; ".join(errors))
# ── Nano Banana API helpers ─────────────────────────────────────────────────
def submit_draw(api_key, host, model, prompt, image_urls, aspect_ratio="1:1"):
"""Submit a draw request and return the task id."""
url = host + DRAW_PATH
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {api_key}",
}
payload = {
"model": model,
"prompt": prompt,
"webHook": "-1",
"aspectRatio": aspect_ratio,
}
if image_urls:
payload["urls"] = image_urls
resp = _post_grsai_with_retry(url, headers, payload)
resp.raise_for_status()
body = resp.json()
# The response structure when webHook="-1"
if "data" in body and "id" in body["data"]:
return body["data"]["id"]
if "id" in body:
return body["id"]
raise RuntimeError(f"Unexpected draw response: {json.dumps(body, ensure_ascii=False)}")
def poll_draw_result(api_key, host, task_id):
"""Poll the result endpoint once and return the status dict."""
url = host + RESULT_PATH
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {api_key}",
}
resp = _post_grsai_with_retry(url, headers, {"id": task_id})
resp.raise_for_status()
body = resp.json()
code = body.get("code")
if code == -22:
return {"status": "running"}
if code == 0 and body.get("data"):
return body["data"]
raise RuntimeError(body.get("msg", "Unknown API error"))
def wait_for_result(api_key, host, task_id):
"""Poll until the draw task completes or times out."""
deadline = time.time() + POLL_TIMEOUT
while time.time() < deadline:
result = poll_draw_result(api_key, host, task_id)
status = result.get("status", "")
if status == "succeeded":
return result
if status == "failed":
reason = result.get("failure_reason", "") or result.get("error", "unknown")
raise RuntimeError(f"Generation failed: {reason}")
time.sleep(POLL_INTERVAL)
raise TimeoutError("Image generation timed out")
def download_image(url, output_path):
"""Download an image from URL and save locally."""
resp = http_requests.get(url, timeout=(30, 120))
resp.raise_for_status()
with open(output_path, "wb") as f:
f.write(resp.content)
# ── Build prompts ──────────────────────────────────────────────────────────
def build_prompt(config, product_info):
name = product_info.get("product_name") or "the product shown in the reference image"
return config["prompt"].format(
product_name=name,
feature_1=product_info.get("feature_1") or "its primary selling feature",
feature_2=product_info.get("feature_2") or "its secondary selling feature",
target_user=product_info.get("target_user") or "a satisfied customer",
use_scene=product_info.get("use_scene") or "an appropriate real-world setting",
)
# ── Background generation task ─────────────────────────────────────────────
def run_generation_task(task_id, image_urls, product_info, api_key, host, model, num_images):
task = tasks[task_id]
try:
configs = AMAZON_IMAGE_CONFIGS[:num_images]
task["total"] = len(configs)
task["done"] = 0
task["images"] = []
# Phase 1: submit all draw requests
task["status"] = "submitting"
task["message"] = "正在提交生成任务…"
draw_jobs = []
for cfg in configs:
prompt = build_prompt(cfg, product_info)
try:
draw_id = submit_draw(api_key, host, model, prompt, image_urls)
draw_jobs.append({"config": cfg, "draw_id": draw_id, "state": "running"})
except Exception as e:
draw_jobs.append({"config": cfg, "draw_id": None, "state": "failed", "error": str(e)})
task["done"] += 1
task["images"].append({
"id": cfg["id"], "name": cfg["name"], "slot": cfg["slot"],
"status": "error", "error": str(e),
})
# Phase 2: poll all running jobs until done
task["status"] = "generating"
task["message"] = "正在生成图片…"
while any(j["state"] == "running" for j in draw_jobs):
time.sleep(POLL_INTERVAL)
for job in draw_jobs:
if job["state"] != "running":
continue
cfg = job["config"]
try:
result = poll_draw_result(api_key, host, job["draw_id"])
st = result.get("status", "")
if st == "succeeded":
img_url = result["results"][0]["url"]
fname = f"{task_id}_{cfg['id']}.png"
out_path = app.config["GENERATED_FOLDER"] / fname
download_image(img_url, str(out_path))
job["state"] = "done"
task["done"] += 1
task["images"].append({
"id": cfg["id"], "name": cfg["name"], "slot": cfg["slot"],
"filename": fname, "url": f"/api/images/{fname}", "status": "ok",
})
task["message"] = f"已完成 {task['done']}/{task['total']} 张"
elif st == "failed":
reason = result.get("failure_reason", "") or result.get("error", "unknown")
job["state"] = "failed"
task["done"] += 1
task["images"].append({
"id": cfg["id"], "name": cfg["name"], "slot": cfg["slot"],
"status": "error", "error": reason,
})
task["message"] = f"已完成 {task['done']}/{task['total']} 张"
except Exception as e:
# Transient error, keep polling
pass
# Timeout check
elapsed = time.time() - task["_start"]
if elapsed > POLL_TIMEOUT * 1.5:
for job in draw_jobs:
if job["state"] == "running":
job["state"] = "timeout"
cfg = job["config"]
task["done"] += 1
task["images"].append({
"id": cfg["id"], "name": cfg["name"], "slot": cfg["slot"],
"status": "error", "error": "生成超时",
})
break
task["status"] = "completed"
ok_count = sum(1 for img in task["images"] if img["status"] == "ok")
task["message"] = f"完成!成功 {ok_count}/{task['total']} 张"
except Exception as e:
task["status"] = "error"
task["message"] = f"出错了:{e}"
# ── Routes ──────────────────────────────────────────────────────────────────
@app.route("/")
def index():
return render_template("index.html")
@app.route("/api/upload-images", methods=["POST"])
def api_upload_images():
"""Upload product images and host them for public access. Returns public URLs."""
files = request.files.getlist("images")
if not files:
return jsonify({"error": "请上传至少一张产品图片"}), 400
local_paths = []
public_urls = []
errors = []
for f in files:
fname = f"{uuid.uuid4().hex}{Path(f.filename).suffix.lower()}"
fp = app.config["UPLOAD_FOLDER"] / fname
f.save(fp)
local_paths.append(str(fp))
try:
url = upload_image(str(fp))
public_urls.append(url)
except Exception as e:
errors.append(f"{f.filename}: {e}")
if not public_urls and errors:
return jsonify({"error": "图片上传到图床失败:" + "; ".join(errors)}), 500
return jsonify({
"local_paths": local_paths,
"public_urls": public_urls,
"errors": errors,
})
@app.route("/api/generate", methods=["POST"])
def api_generate():
data = request.json
api_key = GRSAI_API_KEY
host = GRSAI_HOST
model = GRSAI_MODEL
num_images = min(int(data.get("num_images", 8)), 8)
image_urls = data.get("image_urls", [])
product_info = data.get("product_info", {})
tid = uuid.uuid4().hex[:8]
tasks[tid] = {
"id": tid,
"status": "queued",
"message": "排队中…",
"created": datetime.now().isoformat(),
"images": [],
"done": 0,
"total": 0,
"_start": time.time(),
}
t = threading.Thread(
target=run_generation_task,
args=(tid, image_urls, product_info, api_key, host, model, num_images),
daemon=True,
)
t.start()
return jsonify({"task_id": tid})
@app.route("/api/generate-custom", methods=["POST"])
def api_generate_custom():
data = request.json or {}
kind = (data.get("kind") or "").strip().lower()
if kind not in REF_FOLDERS:
return jsonify({"error": "kind 必须是 cover、feature 或 howto"}), 400
user_prompt = (data.get("prompt") or "").strip()
if not user_prompt:
return jsonify({"error": "请填写生成要求(prompt)"}), 400
image_urls = data.get("image_urls") or []
if not isinstance(image_urls, list):
image_urls = []
api_key = GRSAI_API_KEY
if not api_key:
return jsonify({"error": "服务端未配置 GRSAI_API_KEY"}), 500
host = GRSAI_HOST
model = GRSAI_MODEL
tid = "c" + uuid.uuid4().hex[:7]
tasks[tid] = {
"id": tid,
"kind": "custom",
"custom_kind": kind,
"status": "queued",
"message": "排队中…",
"created": datetime.now().isoformat(),
"images": [],
"done": 0,
"total": 1,
"_start": time.time(),
}
t = threading.Thread(
target=run_custom_generation_task,
args=(tid, kind, user_prompt, image_urls, api_key, host, model),
daemon=True,
)
t.start()
return jsonify({"task_id": tid})
@app.route("/api/status/<tid>")
def api_status(tid):
task = tasks.get(tid)
if not task:
return jsonify({"error": "任务不存在"}), 404
safe = {k: v for k, v in task.items() if not k.startswith("_")}
return jsonify(safe)
@app.route("/api/images/<filename>")
def serve_image(filename):
return send_from_directory(app.config["GENERATED_FOLDER"], filename)
@app.route("/api/download-all/<tid>")
def download_all(tid):
task = tasks.get(tid)
if not task:
return jsonify({"error": "任务不存在"}), 404
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as zf:
for img in task.get("images", []):
if img["status"] == "ok" and img.get("filename"):
fp = app.config["GENERATED_FOLDER"] / img["filename"]
if fp.exists():
arc = f"{img['slot']}_{img['name'].replace(' ', '_')}.png"
zf.write(fp, arc)
buf.seek(0)
return send_file(buf, mimetype="application/zip", as_attachment=True,
download_name=f"amazon_images_{tid}.zip")
if __name__ == "__main__":
port = int(os.environ.get("PORT", 7860))
app.run(debug=False, host="0.0.0.0", port=port)