Spaces:
Sleeping
Sleeping
| ๏ปฟfrom __future__ import annotations | |
| import base64 | |
| import asyncio | |
| import json | |
| import os | |
| import re | |
| import threading | |
| import time | |
| import traceback | |
| import uuid | |
| from concurrent.futures import ThreadPoolExecutor | |
| from datetime import datetime, timezone | |
| from io import BytesIO | |
| from pathlib import Path | |
| from typing import Optional | |
| import uvicorn | |
| from fastapi import FastAPI, File, Form, UploadFile | |
| from fastapi.responses import FileResponse, JSONResponse, Response | |
| from fastapi.staticfiles import StaticFiles | |
| from google import genai | |
| from google.genai import types | |
| from huggingface_hub import HfApi | |
| from openai import OpenAI | |
| from PIL import Image, ImageChops, ImageDraw, ImageFilter, ImageOps | |
# ---- Application identity and on-disk locations ----
APP_TITLE = "AI ModelCut Studio"
BASE_DIR = Path(__file__).parent
ASSETS_DIR = BASE_DIR / "assets"
# Face preset lookup order: the assets folder first, then the app directory itself.
PRESET_FACE_CANDIDATES = [
    ASSETS_DIR / "model_face_preset.png",
    BASE_DIR / "model_face_preset.png",
]
# Default image-model names; each can be overridden through its environment variable.
OPENAI_DEFAULT_IMAGE_MODEL = os.environ.get("OPENAI_IMAGE_MODEL", "gpt-image-2")
GEMINI_DEFAULT_IMAGE_MODEL = os.environ.get("GEMINI_IMAGE_MODEL", "gemini-3.1-flash-image-preview")
# Final output canvas size (width, height) per resolution label; both are 2:3 portrait.
TARGET_SIZES = {
    "1K": (1024, 1536),
    "2K": (2048, 3072),
}
# Only the exact (case-insensitive) value "true" enables the demo fallback.
DEMO_FALLBACK = os.environ.get("DEMO_FALLBACK", "").lower() == "true"
# Longest side (pixels) that reference images are shrunk to before being sent to an API.
# NOTE: a non-numeric value raises ValueError at import time.
API_INPUT_MAX_SIDE = int(os.environ.get("API_INPUT_MAX_SIDE", "2048"))
# Max concurrent image-generation calls (batch shots / Gemini candidates run in parallel).
# Clamped to at least 1.
GEN_MAX_WORKERS = max(1, int(os.environ.get("GEN_MAX_WORKERS", "4")))
# Proportion policy. When a body reference exists, its proportions are always matched.
# With NO body reference: IDEALIZE_PROPORTIONS=true -> force 8.2-8.5 heads editorial look;
# otherwise leave proportions neutral (no forced head-shrink / leg elongation).
IDEALIZE_PROPORTIONS = os.environ.get("IDEALIZE_PROPORTIONS", "").lower() == "true"
# Post-process: re-crop full-body output so the subject occupies the same vertical band
# (head-top / feet-bottom margins) as the body reference image. Set to "false" to disable.
# Enabled by default: anything other than "false" (case-insensitive) keeps it on.
MATCH_REFERENCE_FRAMING = os.environ.get("MATCH_REFERENCE_FRAMING", "true").lower() != "false"
# Color distance (0-255) above which a pixel counts as subject vs background.
# Clamped to at least 1.
SUBJECT_BG_TOLERANCE = max(1, int(os.environ.get("SUBJECT_BG_TOLERANCE", "32")))
# Optional manual framing override (fractions of height, e.g. "0.10"). If BOTH are set they
# replace the reference-derived margins: head sits at TOP, feet at (1 - BOTTOM).
# Kept as raw strings here; they are parsed to float where the framing is applied.
FRAMING_TOP_MARGIN = os.environ.get("FRAMING_TOP_MARGIN", "").strip()
FRAMING_BOTTOM_MARGIN = os.environ.get("FRAMING_BOTTOM_MARGIN", "").strip()
# Dataset repository used for uploads when HF_DATASET_REPO is not set.
HF_DATASET_REPO_DEFAULT = "sunyoung00/ROEM_TEST"
# ---- Reusable prompt fragments -------------------------------------------------
# Each constant is a block of English instructions that gets concatenated into the
# prompt sent to the image model. Adjacent string literals are joined by Python, so
# the trailing space inside each piece is significant.

# Forces a plain light-gray studio backdrop and tells the model to ignore reference backgrounds.
STUDIO_BACKGROUND_PROMPT = (
    "Use a clean seamless studio background in solid warm light gray color #E8E7E2. "
    "Ignore the background from all reference images. "
    "Keep only a natural soft floor shadow. "
    "Do not add props, walls, patterns, gradients, or colored lighting."
)
# Idealized editorial proportions (about 8.2-8.5 heads tall).
FULL_BODY_PROPORTION_PROMPT = (
    "Use elegant fashion model proportions with a naturally smaller head-to-body ratio, "
    "approximately 8.2 to 8.5 heads tall. Keep the face identity exactly the same, but scale "
    "the head naturally smaller relative to the full body. Use long legs, balanced shoulders, "
    "and realistic runway/editorial model proportions. Do not distort the face, neck, hands, "
    "feet, or garment shape."
)
# Opposite policy: copy the proportions of the body-type reference, no idealization.
PROPORTION_MATCH_PROMPT = (
    "Match the model's head SIZE, face size, neck length, torso-to-leg ratio, and overall "
    "head-to-body proportions to the body-type reference image. Reproduce the natural proportions "
    "shown in that reference. Do NOT elongate the legs, do NOT shrink the head, and do NOT apply "
    "exaggerated runway/editorial proportions. Do not distort the face, neck, hands, feet, or garment shape."
)
# Guards against noisy / blotchy face rendering.
FACE_ARTIFACT_PREVENTION_PROMPT = (
    "Keep facial features clean, smooth, and natural. Do not over-sharpen the face, add skin "
    "texture noise, mottling, patchy artifacts, speckles, blotches, or uneven discoloration. "
    "Preserve clear eyes, nose, lips, brows, and natural skin tone without repainting the identity."
)
# Makes the selected base image (not the pose reference) decide scale, crop and camera distance.
FULL_BODY_FRAMING_LOCK_PROMPT = (
    "Preserve the subject scale, crop, and camera distance from the selected base image. "
    "The selected base image controls the final framing, not the pose reference image. "
    "Match the selected base image subject bounding box: keep the head top, shoe bottom, body center, "
    "and full-body height in nearly the same pixel positions. Do not zoom out, do not make the model "
    "smaller in the frame, and do not copy the margins from the pose reference image. If pose and "
    "framing conflict, prioritize the selected base image framing."
)
# Keeps skin tone and facial exposure from drifting toward a paler look.
SKIN_TONE_LOCK_PROMPT = (
    "Preserve the original skin tone and facial exposure from the selected base image. "
    "Do not whiten, pale, brighten, over-smooth, or overexpose the face."
)
# Garment-only macro shot: explicitly excludes any part of the model.
DETAIL_SHOT_PROMPT = (
    "Create an EXTREME close-up macro detail shot of the garment only. "
    "NO MODEL, NO FACE, NO BODY PARTS, NO HAIR, NO SKIN. "
    "Zoom in tightly to showcase the fabric texture, stitching, and construction quality of the target area. "
    "Keep the exact same garment color, material, and design as the source image."
)
# Full-body framing rules: subject height share, margins, and "nothing cropped".
FULL_BODY_FRAMING_BLOCK = (
    "FRAMING (FULL BODY, NON-NEGOTIABLE): wide full-length shot. "
    "The standing figure must occupy approximately 75-80% of the frame height, with clear empty space on all four sides. "
    "Leave at least 8% empty space above the top of the hair/head, at least 8% below the soles of the shoes, and "
    "about 5% on the left and right. Every body part must be visible: head, face, shoulders, torso, waist, knees, "
    "ankles, and feet with complete shoes. If ANY body part is cropped, the result is wrong. "
    "No bags, no phones, no extra accessories held in the hands."
)
| # Precise English transform instruction per Korean shot label. | |
| # Used to image-to-image transform the selected base cut while keeping identity/outfit locked. | |
| SHOT_TRANSFORM_INSTRUCTIONS = { | |
| "์ ์ (์ ๋ฉด)": "Front-facing FULL-BODY standing shot of the exact same model and outfit.", | |
| "์ ์ (์๋ฉด)": "Front-facing FULL-BODY standing shot of the exact same model and outfit.", | |
| "์ ์ (์์ ํฌ์ฆ)": ( | |
| "FULL-BODY shot of the exact same model and outfit in a natural, relaxed editorial pose. " | |
| "Keep both feet and the complete standing figure visible." | |
| ), | |
| "์ ์ (์ธก๋ฉด)": ( | |
| "Rotate the model to a SIDE PROFILE (about 90 degrees) to show the silhouette of the exact same outfit " | |
| "as a full-body shot." | |
| ), | |
| "์ ์ (ํ๋ฉด)": ( | |
| "Rotate the model 180 degrees to show the BACK of the exact same outfit as a full-body shot. " | |
| "Show the back construction details of the garment clearly." | |
| ), | |
| "์๋ฐ์ ": ( | |
| "MEDIUM CLOSE-UP UPPER-BODY portrait, framed from approximately the waist up to above the top of the head. " | |
| "The entire head including the complete crown of hair MUST be fully visible โ leave at least 8% empty space " | |
| "above the hair, never crop the top of the head. Sharp focus on the upper garment." | |
| ), | |
| "์๋ฐ์ (์๋ฉด)": ( | |
| "Front-facing MEDIUM CLOSE-UP UPPER-BODY portrait, framed from approximately the waist up to above the top " | |
| "of the head. The entire head and hair crown MUST be fully visible โ leave at least 8% empty space above " | |
| "the hair, never crop the top of the head. Sharp focus on the upper garment." | |
| ), | |
| "์๋ฐ์ (์ธก๋ฉด)": ( | |
| "SIDE-PROFILE (about 90 degrees) UPPER-BODY portrait, framed from approximately the waist up to above the " | |
| "top of the head. Keep the whole head and hair crown visible. Show the side silhouette of the upper garment." | |
| ), | |
| "์๋ฐ์ (ํ๋ฉด)": ( | |
| "Rotate the model 180 degrees and frame an UPPER-BODY BACK portrait from the waist up. " | |
| "Keep the whole head and hair crown visible. Show the back neckline and upper-back construction of the same garment." | |
| ), | |
| "์๋ฐ์ (ํด๋ก์ฆ์ )": ( | |
| "TIGHT CLOSE-UP of the upper chest, neckline, and collar/tie area of the same garment, including the lower " | |
| "face and shoulders. Show the fabric texture and neckline construction in sharp detail. Keep the same model identity." | |
| ), | |
| "ํ๋ฐ์ ": ( | |
| "LOWER-BODY shot framed from the waist down to the soles of the shoes. " | |
| "Keep both feet and the complete shoes fully visible. Sharp focus on the lower garment, hem, and shoes." | |
| ), | |
| "ํ๋ฐ์ (์์ ํฌ์ฆ)": ( | |
| "LOWER-BODY shot from the waist down in a natural, relaxed stance. " | |
| "Both feet and complete shoes must be fully visible. Sharp focus on the lower garment and footwear." | |
| ), | |
| "ํ๋ฐ์ (ํด๋ก์ฆ์ )": ( | |
| "EXTREME CLOSE-UP macro of the lower-garment detail (waistband, tie, hem, or fabric texture). " | |
| "Garment only โ no face. Show the construction and texture in sharp detail." | |
| ), | |
| "๋ํ ์ผ(์์)": "Focus the detail shot on the TOP garment area (collar, placket, sleeve, or main fabric texture).", | |
| "๋ํ ์ผ(ํฌ์ผ)": "Focus the detail shot on the POCKET area, showing stitching and construction.", | |
| "๋ํ ์ผ(์ ๋ฐ)": "Focus the detail shot on the SHOES / footwear.", | |
| "๋ํ ์ผ(ํ๋ฉด)": ( | |
| "Focus the detail shot on the BACK construction of the garment (back neckline, zipper, seams, or fabric " | |
| "texture from behind)." | |
| ), | |
| } | |
| # Garment-only macro shots (no model/face/skin). | |
| _DETAIL_SHOTS = {"๋ํ ์ผ(์์)", "๋ํ ์ผ(ํฌ์ผ)", "๋ํ ์ผ(์ ๋ฐ)", "๋ํ ์ผ(ํ๋ฉด)", "ํ๋ฐ์ (ํด๋ก์ฆ์ )"} | |
| # Shots where the deterministic crop-to-reference is skipped (extreme crops / no clear full subject). | |
| _NO_REFRAME_SHOTS = _DETAIL_SHOTS | {"์๋ฐ์ (ํด๋ก์ฆ์ )"} | |
# ---- Per-shot reference library + body-type reference --------------------------
# Each shot button maps 1:1 to a reference image in assets/poses/. The file stem is
# either the shot label as-is or the label with parentheses turned into underscores
# (both forms are tried by _shot_reference_stems), e.g.:
#   "<shot>(<variant>)"  ->  assets/poses/<shot>(<variant>).(png|jpg|jpeg|webp)
#                        or  assets/poses/<shot>_<variant>_.(png|jpg|jpeg|webp)
#   "<shot>"             ->  assets/poses/<shot>.(png|jpg|jpeg|webp)
# The reference defines pose, camera angle, and crop/framing for that shot.
# NOTE(review): the original Hangul examples in this comment were mis-encoded and have
# been replaced with placeholders.
POSES_DIR = ASSETS_DIR / "poses"
# Extensions tried, in this order, when resolving a pose reference file.
POSE_IMAGE_EXTENSIONS = (".png", ".jpg", ".jpeg", ".webp")
# Model body-type reference (physique only; face stays from the face preset).
# Lookup order: the assets folder first, then the app directory itself.
BODY_PRESET_CANDIDATES = [
    ASSETS_DIR / "model_body_preset.png",
    BASE_DIR / "model_body_preset.png",
]
| def _shot_reference_stems(shot_type: str) -> list[str]: | |
| """Candidate filename stems for a shot label, in priority order. | |
| Supports both naming styles so references resolve regardless of how they were saved: | |
| 1) label as-is, with parentheses kept -> "์ ์ (์๋ฉด)" -> ์ ์ (์๋ฉด).jpeg | |
| 2) parentheses replaced with underscores -> "์ ์ _์๋ฉด_" -> ์ ์ _์๋ฉด_.jpeg | |
| """ | |
| label = (shot_type or "").strip() | |
| if not label: | |
| return [] | |
| underscore = label.replace("(", "_").replace(")", "_") | |
| stems = [label] | |
| if underscore != label: | |
| stems.append(underscore) | |
| return stems | |
def _shot_reference_stem(shot_type: str) -> str:
    """Return the preferred (parentheses-kept) stem for a shot label, or "" if there is none."""
    candidates = _shot_reference_stems(shot_type)
    if not candidates:
        return ""
    return candidates[0]
# Prompt fragment used when a body-type reference image is supplied: take physique and
# proportions from it and nothing else.
# The two em dashes are written as \u2014 escapes: the literal characters had been
# mis-encoded into stray Thai letters, and an escape survives re-encoding of the file.
BODY_REFERENCE_PROMPT = (
    "A BODY-TYPE reference image is provided. Match the model's physique to it: overall height "
    "impression, body build, shoulder width, limb proportions, AND the head-to-body ratio \u2014 i.e. how "
    "large the head and face appear relative to the full body. Use ONLY the body type and proportions "
    "from that image. Do NOT copy its face, hairstyle, skin tone, clothing, pose, or background \u2014 those "
    "come from the other reference images."
)
| def _reference_legend(has_face: bool, has_body: bool, product_count: int, has_pose: bool) -> str: | |
| """Describe each reference image by its position so the model never confuses roles.""" | |
| roles: list[str] = [] | |
| if has_face: | |
| roles.append("FACE identity (copy this exact face, hairline, and features)") | |
| if has_body: | |
| roles.append("BODY-TYPE physique (match build/proportions only; ignore its face, hair, clothing, pose)") | |
| if has_pose: | |
| roles.append("POSE/FRAMING guide (follow its body pose, camera angle, viewing direction and crop only; ignore its face, clothing, body type, background)") | |
| for index in range(product_count): | |
| roles.append(f"PRODUCT garment {index + 1} (preserve its design, color, logo, and texture exactly)") | |
| if not roles: | |
| return "" | |
| legend = "; ".join(f"image {index + 1} = {role}" for index, role in enumerate(roles)) | |
| return "REFERENCE IMAGE ROLES (in this exact order): " + legend + "." | |
# FastAPI application object; APP_TITLE is passed as its title.
app = FastAPI(title=APP_TITLE)
# Create the assets directory before mounting it (no error if it already exists).
# NOTE(review): presumably needed because the static mount expects the directory to exist - confirm.
ASSETS_DIR.mkdir(exist_ok=True)
# Serve everything under ASSETS_DIR at the /assets URL prefix.
app.mount("/assets", StaticFiles(directory=ASSETS_DIR), name="assets")
# Lazily created API clients, shared process-wide. They start as None and are filled
# in by the _get_*_client helpers; _CLIENT_LOCK guards their creation.
_OPENAI_CLIENT: Optional[OpenAI] = None
_GEMINI_CLIENT: Optional[genai.Client] = None
_CLIENT_LOCK = threading.Lock()
| def _log(message: str, request_id: str = "-") -> None: | |
| print(f"[MODEL-CUT][{request_id}] {message}", flush=True) | |
def _create_fallback_face() -> Image.Image:
    """Draw a simple 768x1024 placeholder portrait from basic shapes.

    The shapes are painted in a fixed order (later shapes cover earlier ones), so the
    sequence below must not be reordered.
    """
    placeholder = Image.new("RGB", (768, 1024), (248, 248, 248))
    pen = ImageDraw.Draw(placeholder)

    skin_tone = (238, 211, 195)
    eye_tone = (74, 64, 58)
    brow_tone = (72, 52, 45)

    # Large dark ellipse first, then the skin-colored rounded face on top of it.
    pen.ellipse((210, 120, 558, 468), fill=(36, 28, 26))
    pen.rounded_rectangle((258, 210, 510, 560), radius=118, fill=skin_tone)
    # Left and right eye.
    for eye_box in ((276, 315, 332, 344), (436, 315, 492, 344)):
        pen.ellipse(eye_box, fill=eye_tone)
    # Mouth arc.
    pen.arc((340, 378, 428, 438), 15, 165, fill=(170, 116, 110), width=5)
    # Left and right brow strokes.
    for brow_stroke in ((300, 283, 348, 272), (420, 272, 468, 283)):
        pen.line(brow_stroke, fill=brow_tone, width=7)
    # Skin-colored torso block, then a white rectangle over its lower part.
    pen.rounded_rectangle((120, 558, 648, 980), radius=140, fill=skin_tone)
    pen.rectangle((188, 782, 580, 1024), fill=(255, 255, 255))
    # Thin vertical line down the centre of the top of the head.
    pen.line((384, 104, 384, 258), fill=(82, 70, 66), width=5)
    return placeholder
def load_preset_face() -> Image.Image:
    """Load the model face preset as an upright RGB image.

    Tries each path in PRESET_FACE_CANDIDATES in order and uses the first one that
    exists. If none exists, a drawn placeholder face is returned instead.

    Returns:
        An RGB image with any EXIF orientation already applied.
    """
    for preset_path in PRESET_FACE_CANDIDATES:
        if preset_path.exists():
            # Image.open keeps the file handle open until the image is closed; the
            # with-block releases it once the pixel data has been copied out.
            with Image.open(preset_path) as source:
                return ImageOps.exif_transpose(source).convert("RGB")
    return _create_fallback_face()
def load_body_reference() -> Optional[Image.Image]:
    """Load the optional model body-type reference as an upright RGB image.

    Tries each path in BODY_PRESET_CANDIDATES in order and uses the first one that exists.

    Returns:
        The RGB image with any EXIF orientation applied, or None if no preset file is present.
    """
    for preset_path in BODY_PRESET_CANDIDATES:
        if preset_path.exists():
            # Close the underlying file as soon as the pixel data has been copied out.
            with Image.open(preset_path) as source:
                return ImageOps.exif_transpose(source).convert("RGB")
    return None
def load_shot_reference(shot_type: str) -> Optional[Image.Image]:
    """Load the reference image that defines pose/angle/crop for the given shot label.

    Looks in assets/poses/ (then assets/) for a file whose stem matches the shot label,
    accepting both the parentheses-kept and the underscore naming produced by
    _shot_reference_stems, with any extension in POSE_IMAGE_EXTENSIONS.

    Args:
        shot_type: Shot label. Labels containing a path separator or a NUL character
            are ignored so a label can never address a file outside the two
            reference directories.

    Returns:
        The first matching image as upright RGB, or None if nothing matches.
    """
    # The label may originate from a request, so only plain file names are accepted.
    safe_stems = [
        stem
        for stem in _shot_reference_stems(shot_type)
        if "/" not in stem and "\\" not in stem and "\x00" not in stem
    ]
    if not safe_stems:
        return None
    for directory in (POSES_DIR, ASSETS_DIR):
        if not directory.exists():
            continue
        for stem in safe_stems:
            for ext in POSE_IMAGE_EXTENSIONS:
                candidate = directory / f"{stem}{ext}"
                # is_file() also skips a directory that happens to carry an image-like name.
                if candidate.is_file():
                    # Release the file handle once the pixel data has been copied out.
                    with Image.open(candidate) as source:
                        return ImageOps.exif_transpose(source).convert("RGB")
    return None
async def _read_upload(upload: Optional[UploadFile]) -> Optional[Image.Image]:
    """Read an uploaded file into an upright RGB image.

    Returns None when there is no upload, the upload has no filename, or its body is empty.
    """
    if upload is None or not upload.filename:
        return None
    payload = await upload.read()
    if payload:
        decoded = Image.open(BytesIO(payload))
        return ImageOps.exif_transpose(decoded).convert("RGB")
    return None
| def _read_data_url_image(data_url: str) -> Optional[Image.Image]: | |
| if not data_url or not data_url.startswith("data:image/") or ";base64," not in data_url: | |
| return None | |
| encoded = data_url.split(";base64,", 1)[1] | |
| raw = base64.b64decode(encoded) | |
| return ImageOps.exif_transpose(Image.open(BytesIO(raw))).convert("RGB") | |
def _get_openai_client() -> OpenAI:
    """Return the shared OpenAI client, creating it on first use.

    The client is created while holding _CLIENT_LOCK, and the None check is repeated
    inside the lock so concurrent first calls still create it only once.
    """
    global _OPENAI_CLIENT
    if _OPENAI_CLIENT is not None:
        return _OPENAI_CLIENT
    with _CLIENT_LOCK:
        # Another thread may have created the client while this one waited for the lock.
        if _OPENAI_CLIENT is None:
            _OPENAI_CLIENT = OpenAI()
        return _OPENAI_CLIENT
# API key the cached Gemini client was created with (None until the first client exists).
_GEMINI_CLIENT_KEY: Optional[str] = None


def _get_gemini_client(api_key: str) -> genai.Client:
    """Return a shared Gemini client for ``api_key``, creating it when needed.

    The cached client is reused only while the key stays the same. A call with a
    different key builds a new client and caches that one instead; previously the
    first key's client was returned for every later key.

    Args:
        api_key: Key the returned client must be configured with.

    Returns:
        A ``genai.Client`` created with ``api_key``.
    """
    global _GEMINI_CLIENT, _GEMINI_CLIENT_KEY
    # Always take the lock: the check now involves two globals that must change together.
    with _CLIENT_LOCK:
        if _GEMINI_CLIENT is None or _GEMINI_CLIENT_KEY != api_key:
            _GEMINI_CLIENT = genai.Client(api_key=api_key)
            _GEMINI_CLIENT_KEY = api_key
        return _GEMINI_CLIENT
def _prepare_api_reference(image: Image.Image) -> Image.Image:
    """Return an upright RGB copy of ``image`` that fits within API_INPUT_MAX_SIDE on both sides.

    The copy is only ever shrunk (aspect ratio kept); the input image is not modified.
    """
    bounding_box = (API_INPUT_MAX_SIDE, API_INPUT_MAX_SIDE)
    upright = ImageOps.exif_transpose(image)
    reduced = upright.convert("RGB")
    # thumbnail() resizes in place, so it operates on the converted copy.
    reduced.thumbnail(bounding_box, Image.Resampling.LANCZOS)
    return reduced
| def _image_summary(image: Optional[Image.Image]) -> str: | |
| if image is None: | |
| return "none" | |
| return f"{image.width}x{image.height}" | |
def _fit_image(image: Image.Image, size: tuple[int, int]) -> Image.Image:
    """Shrink ``image`` to fit inside ``size`` and center it on an opaque RGBA canvas of that size.

    The image is never enlarged. Uncovered canvas area keeps the fill color (246, 243, 239).
    """
    box_w, box_h = size
    artwork = ImageOps.exif_transpose(image).convert("RGBA")
    artwork.thumbnail(size, Image.Resampling.LANCZOS)
    backdrop = Image.new("RGBA", size, (246, 243, 239, 255))
    offset = ((box_w - artwork.width) // 2, (box_h - artwork.height) // 2)
    backdrop.alpha_composite(artwork, offset)
    return backdrop
def _draw_model_cut(
    product_image: Optional[Image.Image],
    model_face: Image.Image,
    label: str,
    resolution: str,
    pose_shift: int,
    shot_type: str = "?๊พฉ๋(?๋บฃใ)",
) -> Image.Image:
    """Draw a schematic mock-up of a model cut using Pillow primitives only.

    No image model is called: a grid background, a head with the face pasted in, a
    trapezoid body, the product image (if any), optional legs, an "AI" badge and the
    label text are painted onto one canvas.

    Args:
        product_image: Product picture to paste over the body; skipped when falsy.
        model_face: Face image, cropped to a circle for the head.
        label: Text drawn near the bottom-left corner.
        resolution: "1K" gives a 1024x1280 canvas; any other value gives 1536x1920.
        pose_shift: Horizontal pixel offset applied to the figure's center line.
        shot_type: Shot label; substring checks on it pick the upper / lower / detail /
            back layout variants.

    Returns:
        The composed RGB canvas.

    NOTE(review): the source had lost its indentation, so the nesting below is
    reconstructed - in particular that both assignments under ``if is_upper`` /
    ``if is_lower`` belong to the branch, and that the legs and badge are drawn
    regardless of ``product_image``. Please confirm.
    NOTE(review): the non-ASCII literals (default ``shot_type`` and the substrings
    tested below) look mis-encoded; they are reproduced as found.
    """
    size = (1024, 1280) if resolution == "1K" else (1536, 1920)
    canvas = Image.new("RGB", size, (246, 243, 239))
    draw = ImageDraw.Draw(canvas)
    # Background grid: about 24 columns across, with a minimum cell size of 36 px.
    grid = max(size[0] // 24, 36)
    for x in range(0, size[0], grid):
        draw.line((x, 0, x, size[1]), fill=(235, 232, 226), width=1)
    for y in range(0, size[1], grid):
        draw.line((0, y, size[0], y), fill=(235, 232, 226), width=1)
    # Horizontal center of the figure.
    cx = size[0] // 2 + pose_shift
    head_r = size[0] // 15
    # Layout flags derived from substrings of the shot label.
    is_upper = "์๋ฐ์ " in shot_type or "?๊ณท์ปฒ" in shot_type
    is_lower = "ํ๋ฐ์ " in shot_type or "?์์ปฒ" in shot_type
    is_detail = "๋ํ ์ผ" in shot_type or "?๋ท๋" in shot_type
    is_back = "ํ๋ฉด" in shot_type or "?๊พจใ" in shot_type
    # Bigger head for upper-body shots, smaller for detail shots (detail wins if both match).
    if is_upper:
        head_r = size[0] // 11
    if is_detail:
        head_r = size[0] // 18
    # Head disc, then a dark arc over its top.
    draw.ellipse((cx - head_r, size[1] // 8, cx + head_r, size[1] // 8 + head_r * 2), fill=(232, 204, 184))
    draw.arc(
        (cx - head_r - 8, size[1] // 8 - 6, cx + head_r + 8, size[1] // 8 + head_r * 2),
        190,
        350,
        fill=(24, 24, 26),
        width=max(8, size[0] // 70),
    )
    # Crop the face to the head square and paste it through a slightly blurred circular mask.
    face = ImageOps.fit(model_face, (head_r * 2, head_r * 2), method=Image.Resampling.LANCZOS, centering=(0.5, 0.34))
    face_mask = Image.new("L", face.size, 0)
    mask_draw = ImageDraw.Draw(face_mask)
    mask_draw.ellipse((0, 0, face.width, face.height), fill=230)
    canvas.paste(face, (cx - head_r, size[1] // 8), face_mask.filter(ImageFilter.GaussianBlur(0.6)))
    # Vertical extent of the body trapezoid; the lower-body values override the upper-body ones.
    shoulder_y = size[1] // 4
    hem_y = int(size[1] * 0.72)
    if is_upper:
        shoulder_y = size[1] // 3
        hem_y = int(size[1] * 0.92)
    if is_lower:
        shoulder_y = size[1] // 7
        hem_y = int(size[1] * 0.82)
    # Back view: cover the head square (and the pasted face) with a dark rectangle.
    if is_back:
        draw.rectangle((cx - head_r, size[1] // 8, cx + head_r, size[1] // 8 + head_r * 2), fill=(31, 28, 27))
    body = [
        (cx - size[0] // 6, shoulder_y),
        (cx + size[0] // 6, shoulder_y),
        (cx + size[0] // 8, hem_y),
        (cx - size[0] // 8, hem_y),
    ]
    draw.polygon(body, fill=(29, 32, 36))
    if product_image:
        # Product box size per layout; later checks override earlier ones.
        product_box = (size[0] // 3, int(size[1] * 0.44))
        if is_upper:
            product_box = (size[0] // 2, int(size[1] * 0.5))
        if is_lower:
            product_box = (size[0] // 2, int(size[1] * 0.58))
        if is_detail:
            product_box = (int(size[0] * 0.72), int(size[1] * 0.55))
        product = _fit_image(product_image, product_box)
        # Rounded, slightly translucent (210/255) paste mask with a soft edge.
        product_mask = Image.new("L", product.size, 0)
        product_mask_draw = ImageDraw.Draw(product_mask)
        product_mask_draw.rounded_rectangle((0, 0, product.width, product.height), radius=18, fill=210)
        px = cx - product.width // 2
        py = shoulder_y + size[1] // 18
        if is_lower:
            py = int(size[1] * 0.36)
        if is_detail:
            py = int(size[1] * 0.26)
        canvas.paste(product.convert("RGB"), (px, py), product_mask.filter(ImageFilter.GaussianBlur(1.2)))
    # Legs start at the hem; omitted for upper-body and detail layouts.
    leg_y = hem_y
    if not is_upper and not is_detail:
        draw.line((cx - size[0] // 14, leg_y, cx - size[0] // 9, int(size[1] * 0.9)), fill=(24, 26, 29), width=size[0] // 34)
        draw.line((cx + size[0] // 14, leg_y, cx + size[0] // 9, int(size[1] * 0.9)), fill=(24, 26, 29), width=size[0] // 34)
    # "AI" badge in the top-left corner and the label near the bottom-left.
    draw.ellipse((24, 24, 82, 82), fill=(20, 22, 24))
    draw.text((41, 42), "AI", fill=(255, 255, 255))
    draw.text((24, size[1] - 64), label, fill=(30, 34, 38))
    return canvas
def _image_to_data_url(image: Image.Image, resolution: str = "1K") -> str:
    """Encode ``image`` as a base64 data URL.

    "2K" output is encoded as a progressive JPEG (quality 92, no chroma subsampling)
    after conversion to RGB; every other resolution is encoded as an optimized PNG.
    """
    buffer = BytesIO()
    if resolution != "2K":
        image.save(buffer, format="PNG", optimize=True)
        mime_type = "image/png"
    else:
        image.convert("RGB").save(
            buffer, format="JPEG", quality=92, optimize=True, progressive=True, subsampling=0
        )
        mime_type = "image/jpeg"
    payload = base64.b64encode(buffer.getvalue()).decode("ascii")
    return f"data:{mime_type};base64,{payload}"
def _image_to_png_bytes(image: Image.Image) -> bytes:
    """Serialize ``image`` to PNG and return the encoded bytes."""
    with BytesIO() as buffer:
        image.save(buffer, format="PNG")
        # getvalue() returns the whole buffer regardless of the current position.
        return buffer.getvalue()
def _image_to_jpeg_bytes(image: Image.Image) -> bytes:
    """Serialize ``image`` to JPEG (RGB, quality 95, no chroma subsampling) and return the bytes."""
    rgb_image = image.convert("RGB")
    with BytesIO() as buffer:
        rgb_image.save(buffer, format="JPEG", quality=95, optimize=True, subsampling=0)
        # getvalue() returns the whole buffer regardless of the current position.
        return buffer.getvalue()
| def _safe_dataset_name(value: str) -> str: | |
| cleaned = re.sub(r"[^0-9A-Za-z๊ฐ-ํฃ_.()-]+", "_", value.strip()) | |
| return cleaned.strip("_")[:80] or "modelcut" | |
def _upload_generation_to_dataset(
    images: list[Image.Image],
    labels: list[str],
    metadata: dict,
    request_id: str,
) -> None:
    """Best-effort upload of generated images plus a metadata.json to a Hugging Face dataset.

    The repository comes from HF_DATASET_REPO (default HF_DATASET_REPO_DEFAULT) and the
    token from HF_TOKEN or HUGGINGFACE_HUB_TOKEN. If either is missing the upload is
    skipped with a log line. Any exception during the upload is logged and swallowed,
    so this function never raises for upload failures.

    Args:
        images: Images to upload, each stored as a PNG.
        labels: Labels used in the file names; images without a label get "image-<n>".
        metadata: Extra fields merged into metadata.json.
        request_id: Identifier used in the folder name, commit messages and log lines.
    """
    repo_id = os.environ.get("HF_DATASET_REPO", HF_DATASET_REPO_DEFAULT).strip()
    hf_token = os.environ.get("HF_TOKEN") or os.environ.get("HUGGINGFACE_HUB_TOKEN")
    if not repo_id:
        _log("dataset upload skipped: HF_DATASET_REPO is empty", request_id)
        return
    if not hf_token:
        _log("dataset upload skipped: HF_TOKEN is not set", request_id)
        return

    try:
        client = HfApi(token=hf_token)
        client.create_repo(repo_id=repo_id, repo_type="dataset", exist_ok=True)
        stamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
        target_dir = f"generated/{stamp}_{request_id}"

        stored_paths = []
        for position, picture in enumerate(images):
            ordinal = position + 1
            caption = labels[position] if position < len(labels) else f"image-{ordinal}"
            repo_path = f"{target_dir}/{ordinal:02d}_{_safe_dataset_name(caption)}.png"
            # Each image is uploaded as its own commit.
            client.upload_file(
                path_or_fileobj=_image_to_png_bytes(picture),
                path_in_repo=repo_path,
                repo_id=repo_id,
                repo_type="dataset",
                commit_message=f"Add generated model cut {request_id}",
            )
            stored_paths.append(repo_path)

        # Caller metadata first; the three bookkeeping fields override same-named keys.
        manifest = dict(metadata)
        manifest.update(request_id=request_id, created_at=stamp, files=stored_paths)
        manifest_bytes = json.dumps(manifest, ensure_ascii=False, indent=2).encode("utf-8")
        client.upload_file(
            path_or_fileobj=manifest_bytes,
            path_in_repo=f"{target_dir}/metadata.json",
            repo_id=repo_id,
            repo_type="dataset",
            commit_message=f"Add model cut metadata {request_id}",
        )
        _log(f"dataset upload done repo={repo_id} files={len(stored_paths)} folder={target_dir}", request_id)
    except Exception as error:
        # Deliberately best-effort: a failed upload must not fail the request.
        _log(f"dataset upload failed repo={repo_id} error={error}", request_id)
def _normalize_output_size(image: Image.Image, resolution: str) -> Image.Image:
    """Return ``image`` as upright RGB at exactly the target size for ``resolution``.

    Unknown resolutions use the "1K" size. An image already at the target size is
    returned as converted; otherwise it is scaled to fit inside the target (aspect
    ratio kept) and centered on a (246, 243, 239) canvas.
    """
    goal = TARGET_SIZES.get(resolution, TARGET_SIZES["1K"])
    upright = ImageOps.exif_transpose(image).convert("RGB")
    if upright.size != goal:
        fitted = ImageOps.contain(upright, goal, method=Image.Resampling.LANCZOS)
        backdrop = Image.new("RGB", goal, (246, 243, 239))
        offset = ((goal[0] - fitted.width) // 2, (goal[1] - fitted.height) // 2)
        backdrop.paste(fitted, offset)
        return backdrop
    return upright
def _estimate_bg_color(image: Image.Image) -> tuple[int, int, int]:
    """Estimate the (solid studio) background color from the four image corners.

    A square patch is taken at each corner and reduced to one pixel. The four colors
    are ordered by R+G+B and the entry at index len // 2 is returned, i.e. the
    second-brightest of the four.
    """
    rgb = image.convert("RGB")
    w, h = rgb.size
    # Patch side: 1/50 of the shorter image side, but at least 4 px.
    patch = max(4, min(w, h) // 50)
    far_x, far_y = w - patch, h - patch

    def _patch_color(origin: tuple[int, int]) -> tuple[int, int, int]:
        left, top = origin
        region = rgb.crop((left, top, left + patch, top + patch))
        pixel = region.resize((1, 1), Image.Resampling.LANCZOS).getpixel((0, 0))
        return tuple(int(channel) for channel in pixel)

    corner_origins = ((0, 0), (far_x, 0), (0, far_y), (far_x, far_y))
    by_brightness = sorted((_patch_color(origin) for origin in corner_origins), key=sum)
    return by_brightness[len(by_brightness) // 2]
def _subject_bbox(image: Image.Image, tolerance: int) -> Optional[tuple[int, int, int, int]]:
    """Return the (left, top, right, bottom) box of the subject against a near-solid background.

    A pixel counts as subject when its largest per-channel difference from the
    estimated background color is at least ``tolerance``. Returns None when no pixel
    qualifies.
    """
    rgb = image.convert("RGB")
    backdrop = Image.new("RGB", rgb.size, _estimate_bg_color(rgb))
    channels = ImageChops.difference(rgb, backdrop).split()
    # Fold the channels into one band holding the strongest difference per pixel.
    strongest = channels[0]
    for channel in channels[1:]:
        strongest = ImageChops.lighter(strongest, channel)

    def _binarize(value: int) -> int:
        return 255 if value >= tolerance else 0

    return strongest.point(_binarize).getbbox()
def _reframe_to_reference(
    image: Image.Image,
    reference: Optional[Image.Image],
    target_size: tuple[int, int],
) -> Image.Image:
    """Re-crop/scale a full-body output so the subject fills a target vertical band.

    The band (head-top and feet-bottom positions, as fractions of the final height)
    comes from FRAMING_TOP_MARGIN / FRAMING_BOTTOM_MARGIN when both are set and parse
    as floats, otherwise from the subject detected in ``reference``.

    Args:
        image: Generated image to reframe.
        reference: Image whose subject band is copied; may be None.
        target_size: (width, height) of the returned canvas.

    Returns:
        A canvas of ``target_size`` with the subject scaled and placed into the band,
        horizontally centered. When the band or either subject detection looks
        unreliable, the image is returned merely normalized to the output size.
    """
    # A detected subject shorter than this share of the generated frame is treated as
    # a detection failure. Without this guard a few stray pixels would produce a huge
    # scale factor and an enormous intermediate resize. The value is a conservative
    # choice; the reference band itself already has to exceed 0.2.
    min_generated_subject_frac = 0.1

    width, height = target_size
    base = image.convert("RGB")

    def _unchanged() -> Image.Image:
        # Fallback: keep the framing and only normalize to the output size.
        resolution = "1K" if target_size == TARGET_SIZES["1K"] else "2K"
        return _normalize_output_size(base, resolution)

    # 1) Determine the target vertical band (fractions of final height).
    top_frac: Optional[float] = None
    bottom_frac: Optional[float] = None
    if FRAMING_TOP_MARGIN and FRAMING_BOTTOM_MARGIN:
        try:
            top_frac = float(FRAMING_TOP_MARGIN)
            bottom_frac = 1.0 - float(FRAMING_BOTTOM_MARGIN)
        except ValueError:
            # Unparseable override: fall through to the reference-derived band.
            top_frac = bottom_frac = None
    if top_frac is None and reference is not None:
        ref_box = _subject_bbox(reference, SUBJECT_BG_TOLERANCE)
        if ref_box:
            top_frac = ref_box[1] / reference.height
            bottom_frac = ref_box[3] / reference.height
    if top_frac is None or bottom_frac is None:
        return _unchanged()
    subject_frac = bottom_frac - top_frac
    if not (0.2 < subject_frac < 0.98):  # sanity: reference detection failed
        return _unchanged()

    # 2) Find the subject in the generated image.
    gen_box = _subject_bbox(base, SUBJECT_BG_TOLERANCE)
    if not gen_box:
        return _unchanged()
    gen_subject_h = gen_box[3] - gen_box[1]
    if gen_subject_h <= 0 or gen_subject_h < min_generated_subject_frac * base.height:
        return _unchanged()

    # 3) Scale so the subject height matches the target band, then place it.
    scale = (subject_frac * height) / gen_subject_h
    new_w = max(1, round(base.width * scale))
    new_h = max(1, round(base.height * scale))
    scaled = base.resize((new_w, new_h), Image.Resampling.LANCZOS)
    subject_cx = ((gen_box[0] + gen_box[2]) / 2) * scale
    subject_top = gen_box[1] * scale
    paste_x = round(width / 2 - subject_cx)
    paste_y = round(top_frac * height - subject_top)
    # Any area the scaled image does not cover keeps the estimated background color.
    canvas = Image.new("RGB", (width, height), _estimate_bg_color(base))
    canvas.paste(scaled, (paste_x, paste_y))
    return canvas
| def _openai_size_for_model(model: str, resolution: str) -> str: | |
| if model == "gpt-image-2": | |
| return "2048x3072" if resolution == "2K" else "1024x1536" | |
| return "1024x1536" | |
def _gemini_image_config(model: str, resolution: str) -> types.ImageConfig:
    """Build the Gemini image config: always 2:3, plus ``image_size`` for the two listed models."""
    size_aware_models = frozenset({"gemini-3.1-flash-image-preview", "gemini-3-pro-image-preview"})
    if model in size_aware_models:
        return types.ImageConfig(aspect_ratio="2:3", image_size=resolution)
    return types.ImageConfig(aspect_ratio="2:3")
def _compose_generation_prompt(
    category: str,
    fit: str,
    length: str,
    style: str,
    prompt: str,
    pose: str,
    total_length_cm: str,
    generation_mode: str,
    shot_type: str,
    selected_base_index: int,
    has_body_reference: bool = False,
    has_pose_reference: bool = False,
    product_count: int = 0,
) -> str:
    """Assemble the text prompt for generating a model photo.

    The prompt is a newline-joined list of fixed instruction lines, the reference
    legend, optional body/proportion/framing blocks, the garment attributes and the
    caller's free-text prompt.

    Args:
        category, fit, length, style, pose: Garment and styling attributes inserted verbatim.
        prompt: Free-text instruction appended (stripped) as the last line.
        total_length_cm: If non-empty, appended to the length text.
        generation_mode: "shot_variant" switches to the "transform the selected base
            image" instruction; any other value is treated as full-body generation.
        shot_type: Shot label; in shot-variant mode it decides whether the shot counts
            as full body.
        selected_base_index: Zero-based index of the selected base cut (shown 1-based).
        has_body_reference: Whether a body-type reference image is attached.
        has_pose_reference: Whether a pose/framing reference image is attached.
        product_count: Number of product images attached.

    Returns:
        The prompt text with surrounding whitespace stripped. Disabled optional blocks
        contribute empty strings, so blank lines may remain inside the text.

    NOTE(review): the non-ASCII string literals below look mis-encoded (garbled Korean).
    They are reproduced exactly as found because they are sent to the model at runtime;
    the intended text should be restored from a correctly encoded copy of this file.
    """
    # Default shot instruction, used when not in shot-variant mode.
    shot_instruction = "?๊พฉ๋(?๋บฃใ) ่ใ ปใง??๏งโค๋ฝ่??๊พจ๋ซ็??์น๊ฝฆ?์๊ฝญ??"
    # Full body unless this is a shot variant whose label lacks either full-body marker.
    is_full_body = generation_mode != "shot_variant" or "์ ์ " in shot_type or "?๊พฉ๋" in shot_type
    if generation_mode == "shot_variant":
        shot_instruction = (
            f"?์ข๊นฎ??ๆนฒ๊ณ? ่?{selected_base_index + 1}??๏งโค๋ฝ ?์จ๋ฌ, ?ใ ผ๋ผฑ, ๏งฃ๋์, ?์๊ธฝ, ?๋ฑ๊ธฝ, ?๋ฏ์ฑ, ๆฟก์ํฌ, "
            f"?ใ ป๏ผ?๏ฝ? ?์ข??์ํฌ ??ๆดั๋ฃ๏ง?'{shot_type}'ๆฟก?่นยๅฏ์๋ธฏ?๋ช์. "
            "Use the selected base image reference as the source photo to transform; do not create a new unrelated model."
        )
    # Append the total length in cm only when one was provided.
    length_text = f"{length}, ?๋๊ธฝ ็ฅ์น์ฃ {total_length_cm}cm" if total_length_cm else length
    # A face reference is always assumed to be present (has_face=True).
    legend = _reference_legend(
        has_face=True,
        has_body=has_body_reference,
        product_count=product_count,
        has_pose=has_pose_reference,
    )
    # Proportion policy: a body reference always wins (match it). Without one, only
    # apply the idealized 8.2-8.5 head look when explicitly enabled.
    if not is_full_body:
        proportion_prompt = ""
    elif has_body_reference:
        proportion_prompt = PROPORTION_MATCH_PROMPT
    elif IDEALIZE_PROPORTIONS:
        proportion_prompt = FULL_BODY_PROPORTION_PROMPT
    else:
        proportion_prompt = ""
    return "\n".join(
        [
            "Create a high-resolution fashion ecommerce AI model photo.",
            legend,
            "CRITICAL IDENTITY LOCK: Use the face reference (image 1) as the exact persona model.",
            "All generated candidates must show the same person, not a similar-looking new model.",
            "Preserve the same face shape, jawline, eye shape, eye spacing, nose, lips, eyebrows, skin tone, and hairline from the face reference.",
            "Do not beautify, age-shift, ethnicity-shift, change makeup style, or invent a different face.",
            "If generating multiple candidates, keep the face identity identical across every candidate.",
            BODY_REFERENCE_PROMPT if has_body_reference else "",
            proportion_prompt,
            FACE_ARTIFACT_PREVENTION_PROMPT if is_full_body else "",
            FULL_BODY_FRAMING_BLOCK if is_full_body else "",
            "Preserve the original skin tone and facial exposure from the face reference. Do not whiten, pale, brighten, over-smooth, or overexpose the face.",
            shot_instruction,
            f"Garment category: {category}. Fit: {fit}. Length: {length_text}.",
            f"Style: {style}. Pose reference: {pose}.",
            STUDIO_BACKGROUND_PROMPT,
            "Use sharp fabric texture and accurate garment edges.",
            "Preserve the uploaded product image details as faithfully as possible.",
            "Do not alter logos, buttons, patterns, colors, or silhouette.",
            "Output should be suitable for a shopping mall product detail page.",
            prompt.strip(),
        ]
    ).strip()
# Build the text prompt used to transform an already-generated base image into `shot_type`.
#
# Parameters:
#   shot_type           - shot name; looked up in SHOT_TRANSFORM_INSTRUCTIONS and tested
#                         against _DETAIL_SHOTS.
#   prompt              - free-form user text; appended as "Additional instruction: ..." when
#                         it is non-blank after stripping.
#   total_length_cm     - accepted but not read anywhere in this body.
#   selected_base_index - accepted but not read anywhere in this body.
#   has_pose_reference  - when true, a paragraph telling the model to copy only pose, angle
#                         and framing from the extra reference image is included.
# Returns: the non-empty fragments joined with newlines, stripped.
#
# NOTE(review): indentation was lost in this copy of the file, so whether the
# "Do not repaint the face..." line belongs only to the non-detail branch or to every
# shot cannot be determined from here - confirm against the real source.
| def _compose_transform_prompt( | |
| shot_type: str, | |
| prompt: str, | |
| total_length_cm: str, | |
| selected_base_index: int, | |
| has_pose_reference: bool = False, | |
| ) -> str: | |
# Detail shots are identified by membership in _DETAIL_SHOTS; full-body shots by a
# substring test on the shot name.
| is_detail = shot_type in _DETAIL_SHOTS | |
| is_full_body = "์ ์ " in shot_type | |
# Shots without a dedicated instruction fall back to a generic one-line request.
| shot_instruction = SHOT_TRANSFORM_INSTRUCTIONS.get( | |
| shot_type, f"Create this shot composition: {shot_type}." | |
| ) | |
| extra = f"Additional instruction: {prompt.strip()}" if prompt.strip() else "" | |
| pose_reference = ( | |
| "A POSE/FRAMING reference image is also provided. Match its body pose, camera angle, viewing " | |
| "direction (front / side / back), and crop/framing as closely as possible. Take ONLY pose, angle " | |
| "and framing from it โ identity, face, outfit, garment color and texture must come from the source " | |
| "(first) image, never from the pose reference." | |
| if has_pose_reference | |
| else "" | |
| ) | |
# Fragments are collected in order; empty strings are filtered out at the final join.
| lines = [ | |
| "Edit the FIRST image. Use it as the source photo to transform; do NOT create a new, unrelated model.", | |
| ] | |
| if is_detail: | |
| lines.append( | |
| "Keep the exact same garment color, fabric texture, material, silhouette, logos, and design as the first image." | |
| ) | |
| else: | |
| lines.append( | |
| "Keep the exact same person, face, skin tone, hair style, outfit, garment color, fabric texture, " | |
| "silhouette, shoes, and background from the first image." | |
| ) | |
| lines.append("Do not repaint the face, do not beautify, and do not change the clothing design.") | |
| lines.append(f"TARGET SHOT: {shot_type}.") | |
| lines.append(shot_instruction) | |
| if is_detail: | |
| lines.append(DETAIL_SHOT_PROMPT) | |
| else: | |
| # Person is in frame โ preserve skin tone; lock scale/crop only for full-body shots. | |
| lines.append(SKIN_TONE_LOCK_PROMPT) | |
| if is_full_body: | |
| lines.append(FULL_BODY_FRAMING_LOCK_PROMPT) | |
| lines.append(FULL_BODY_FRAMING_BLOCK) | |
| lines.append(pose_reference) | |
| lines.append("Keep the edit natural and close to the source image.") | |
| lines.append(extra) | |
# Drop the empty optional fragments (pose paragraph, extra instruction) before joining.
| return "\n".join(line for line in lines if line).strip() | |
def _split_provider_model(image_model: str) -> tuple[str, str]:
    """Split a ``"provider:model"`` selector into ``(provider, model)``.

    Only the first ``:`` separates the two parts, so model names may contain
    colons themselves. A value without any ``:`` is treated as an OpenAI model
    name. The provider is trimmed and lower-cased so that ``"OpenAI:..."`` or
    ``" gemini:..."`` select the same backend as the canonical spelling; an
    empty provider (``":model"``) defaults to ``"openai"``.

    Args:
        image_model: Selector string such as ``"openai:gpt-image-2"``.

    Returns:
        A ``(provider, model)`` tuple with surrounding whitespace removed.
    """
    provider, separator, model = (image_model or "").partition(":")
    if not separator:
        # No provider prefix: the whole value is the model name.
        return "openai", provider.strip()
    return provider.strip().lower() or "openai", model.strip()
def _resolve_model(provider: str, model: str) -> str:
    """Return the model name to send to ``provider``.

    For ``"openai"`` and ``"gemini"`` the matching environment variable
    (``OPENAI_IMAGE_MODEL`` / ``GEMINI_IMAGE_MODEL``) takes precedence, then the
    requested ``model``, then the module default. A variable that is set but
    blank is ignored, so it cannot replace the requested model with an empty
    string. Any other provider gets ``model`` back unchanged.

    Args:
        provider: Provider key as produced by ``_split_provider_model``.
        model: Model name requested by the caller; may be empty.

    Returns:
        The resolved model name.
    """
    if provider == "openai":
        override = (os.environ.get("OPENAI_IMAGE_MODEL") or "").strip()
        return override or model or OPENAI_DEFAULT_IMAGE_MODEL
    if provider == "gemini":
        override = (os.environ.get("GEMINI_IMAGE_MODEL") or "").strip()
        return override or model or GEMINI_DEFAULT_IMAGE_MODEL
    return model
def _generate_with_openai(
    references: list[Optional[Image.Image]],
    model: str,
    prompt: str,
    resolution: str,
    count: int,
    request_id: str = "-",
) -> list[Image.Image]:
    """Request ``count`` images from the OpenAI Images API.

    ``None`` entries in ``references`` are dropped and the rest are passed
    through ``_prepare_api_reference``. When at least one reference remains the
    ``images.edit`` endpoint is called with the references as JPEG uploads;
    otherwise ``images.generate`` is called with the prompt alone.

    Returns:
        The decoded images, each passed through ``_normalize_output_size``.

    Raises:
        RuntimeError: if ``OPENAI_API_KEY`` is unset, if the API answers with a
            URL instead of inline data, or if no image data came back.
    """
    if not os.environ.get("OPENAI_API_KEY"):
        raise RuntimeError("OPENAI_API_KEY is not set.")
    client = _get_openai_client()
    prepared = [_prepare_api_reference(ref) for ref in references if ref is not None]
    size = _openai_size_for_model(model, resolution)
    uploads: list[BytesIO] = []
    try:
        started = time.perf_counter()
        ref_sizes = [f"{ref.width}x{ref.height}" for ref in prepared]
        _log(
            f"openai start model={model} size={size} count={count} refs={len(prepared)} "
            f"ref_sizes={ref_sizes} prompt_chars={len(prompt)}",
            request_id,
        )
        # Each reference becomes a named in-memory JPEG; buffers are appended one by
        # one so the ``finally`` clause can close whatever was created before a failure.
        for position, ref in enumerate(prepared):
            buffer = BytesIO(_image_to_jpeg_bytes(ref))
            buffer.name = f"reference_{position}.jpg"
            uploads.append(buffer)

        shared_args = {
            "model": model,
            "prompt": prompt,
            "size": size,
            "quality": "high",
            "n": count,
        }
        if uploads:
            response = client.images.edit(image=uploads, **shared_args)
        else:
            response = client.images.generate(**shared_args)

        decoded = []
        for item in response.data:
            encoded = getattr(item, "b64_json", None)
            if encoded:
                payload = BytesIO(base64.b64decode(encoded))
                decoded.append(_normalize_output_size(Image.open(payload), resolution))
                continue
            if getattr(item, "url", None):
                raise RuntimeError("OpenAI returned an image URL, but URL fetching is disabled in this container.")
        if not decoded:
            raise RuntimeError("OpenAI did not return image data.")
        _log(f"openai done images={len(decoded)} elapsed={time.perf_counter() - started:.1f}s", request_id)
        return decoded
    finally:
        for buffer in uploads:
            buffer.close()
def _generate_with_gemini(
    references: list[Optional[Image.Image]],
    model: str,
    prompt: str,
    resolution: str,
    count: int,
    request_id: str = "-",
) -> list[Image.Image]:
    """Request ``count`` images from Gemini, one ``generate_content`` call per image.

    ``None`` entries in ``references`` are dropped and the rest are passed
    through ``_prepare_api_reference``; the prepared images followed by the
    prompt form the request contents. With ``count > 1`` the calls run in a
    thread pool capped at ``GEN_MAX_WORKERS``.

    Returns:
        The images that were actually returned, in call order. Calls that
        produced no inline image are skipped, so the list may be shorter than
        ``count``.

    Raises:
        RuntimeError: if neither ``GEMINI_API_KEY`` nor ``GOOGLE_API_KEY`` is
            set, or if no call returned image data.
    """
    api_key = os.environ.get("GEMINI_API_KEY") or os.environ.get("GOOGLE_API_KEY")
    if not api_key:
        raise RuntimeError("GEMINI_API_KEY or GOOGLE_API_KEY is not set.")
    client = _get_gemini_client(api_key)
    references = [_prepare_api_reference(image) for image in references if image is not None]
    contents = [*references, prompt]
    started = time.perf_counter()
    _log(
        f"gemini start model={model} count={count} refs={len(references)} "
        f"ref_sizes={[f'{image.width}x{image.height}' for image in references]} prompt_chars={len(prompt)}",
        request_id,
    )

    def _one_candidate(_index: int) -> Optional[Image.Image]:
        """Run one generation call and return its first inline image, if any."""
        response = client.models.generate_content(
            model=model,
            contents=contents,
            config=types.GenerateContentConfig(
                response_modalities=["TEXT", "IMAGE"],
                image_config=_gemini_image_config(model, resolution),
            ),
        )
        parts = getattr(response, "parts", None)
        if parts is None:
            candidates = getattr(response, "candidates", None) or []
            # A candidate may carry no content at all (e.g. a blocked response).
            # Treat that as "no image" so the caller raises its own clear error
            # instead of an AttributeError on None.
            content = getattr(candidates[0], "content", None) if candidates else None
            parts = getattr(content, "parts", None)
        for part in parts or []:
            inline_data = getattr(part, "inline_data", None)
            if inline_data and inline_data.data:
                raw = inline_data.data
                if isinstance(raw, str):
                    raw = base64.b64decode(raw)
                return _normalize_output_size(Image.open(BytesIO(raw)), resolution)
        return None

    if count <= 1:
        images = [image for image in [_one_candidate(0)] if image is not None]
    else:
        # Fan out the candidate calls; executor.map preserves input order.
        with ThreadPoolExecutor(max_workers=min(count, GEN_MAX_WORKERS)) as executor:
            images = [image for image in executor.map(_one_candidate, range(count)) if image is not None]
    if not images:
        raise RuntimeError("Gemini did not return image data.")
    _log(f"gemini done images={len(images)} elapsed={time.perf_counter() - started:.1f}s", request_id)
    return images
def generate_model_cuts(
    product_images: list[Optional[Image.Image]],
    model_face: Image.Image,
    selected_reference_image: Optional[Image.Image],
    pose_reference_image: Optional[Image.Image],
    image_model: str,
    selected_product: str,
    category: str,
    fit: str,
    length: str,
    style: str,
    prompt: str,
    pose: str,
    resolution: str,
    total_length_cm: str,
    generation_mode: str,
    shot_type: str,
    shot_types: list[str],
    selected_base_index: int,
    only_selected_cut: bool,
    model_body: Optional[Image.Image] = None,
    request_id: str = "-",
) -> tuple[list[Image.Image], list[str]]:
    """Produce model-cut images and their labels for one request.

    Modes:
        * ``shot_variant`` / ``shot_batch`` - transform the selected base image
          into one shot (``shot_type``) or several (``shot_types``), one
          provider call per shot.
        * ``front_candidates`` / ``front_candidate`` - generate three (or one)
          front candidates from the face, optional body and product references.

    When the provider call fails and ``DEMO_FALLBACK`` is enabled, or when the
    provider or mode is not one handled above, the images are drawn locally with
    ``_draw_model_cut`` instead.

    Args:
        product_images: Up to eight slots, laid out as front/back pairs per product.
        selected_product: Text whose first number picks the product pair (1-based).
        only_selected_cut: Accepted for interface compatibility; not read here.
        model_body: Explicit body reference; ``load_body_reference()`` is used
            when it is missing.
        request_id: Correlation id prefixed to log lines.

    Returns:
        ``(images, labels)`` with one label per returned image.

    Raises:
        Exception: whatever the provider call raised, when ``DEMO_FALLBACK`` is off.
    """
    product_match = re.search(r"\d+", selected_product or "")
    product_index = max(0, min(3, int(product_match.group(0)) - 1 if product_match else 0))
    selected_pair = product_images[product_index * 2 : product_index * 2 + 2]
    primary_product = next((image for image in selected_pair + product_images if image is not None), None)
    length_label = f"{length} / {total_length_cm}cm" if total_length_cm else length
    provider, requested_model = _split_provider_model(image_model)
    model = _resolve_model(provider, requested_model)
    # Body-type reference: explicit upload wins, otherwise fall back to assets preset (may be None).
    body_reference = model_body or load_body_reference()
    front_products = [image for image in product_images if image is not None]
    _log(
        f"compose mode={generation_mode} provider={provider} model={model} resolution={resolution} "
        f"selected_product={selected_product} selected_pair={[_image_summary(image) for image in selected_pair]} "
        f"selected_reference={_image_summary(selected_reference_image)} pose_reference={_image_summary(pose_reference_image)} "
        f"face={_image_summary(model_face)} body_reference={_image_summary(body_reference)} "
        f"shot_type={shot_type or '-'} shot_types={shot_types or []}",
        request_id,
    )
    composed_prompt = _compose_generation_prompt(
        category=category,
        fit=fit,
        length=length,
        style=style,
        prompt=prompt,
        pose=pose,
        total_length_cm=total_length_cm,
        generation_mode=generation_mode,
        shot_type=shot_type,
        selected_base_index=selected_base_index,
        has_body_reference=body_reference is not None,
        has_pose_reference=False,
        product_count=len(front_products),
    )

    is_shot_mode = generation_mode in {"shot_variant", "shot_batch"}
    selected_shots: list[str] = []
    if is_shot_mode:
        # Resolved once and shared by the provider path and the demo renderer below.
        selected_shots = shot_types if generation_mode == "shot_batch" and shot_types else [shot_type or "?๊พฉ๋(?๋จฏ์?ั์ซฐ)"]
        # When a base image was selected it already carries the identity, so the
        # separate face reference is left out.
        reference_face = None if selected_reference_image is not None else model_face

        def _render_shot(selected_shot: str) -> list[Image.Image]:
            """Generate the image(s) for a single shot via the chosen provider."""
            # User-uploaded pose wins; otherwise load the named reference for this exact shot.
            shot_pose = pose_reference_image or load_shot_reference(selected_shot)
            # Order matters: base image first, then the pose/framing reference.
            references = [
                image
                for image in [reference_face, selected_reference_image, shot_pose]
                if image is not None
            ]
            _log(
                f"transform shot={selected_shot} refs={len(references)} "
                f"ref_sizes={[_image_summary(image) for image in references]} "
                f"pose={'upload' if pose_reference_image is not None else ('named' if shot_pose is not None else 'none')}",
                request_id,
            )
            shot_prompt = _compose_transform_prompt(
                shot_type=selected_shot,
                prompt=prompt,
                total_length_cm=total_length_cm,
                selected_base_index=selected_base_index,
                has_pose_reference=shot_pose is not None,
            )
            if provider == "openai":
                shot_images = _generate_with_openai(references, model, shot_prompt, resolution, 1, request_id)
            else:
                shot_images = _generate_with_gemini(references, model, shot_prompt, resolution, 1, request_id)
            # Crop/scale the output to match the reference's framing, except for extreme
            # garment crops (detail / close-up) where subject detection is unreliable.
            if MATCH_REFERENCE_FRAMING and shot_pose is not None and selected_shot not in _NO_REFRAME_SHOTS:
                target_size = TARGET_SIZES.get(resolution, TARGET_SIZES["1K"])
                shot_images = [_reframe_to_reference(image, shot_pose, target_size) for image in shot_images]
            return shot_images

        try:
            if provider in {"openai", "gemini"}:
                if len(selected_shots) <= 1:
                    results = [_render_shot(selected_shots[0])]
                else:
                    # Shots are independent, so fan out. executor.map keeps the input order,
                    # so images stay aligned with their labels.
                    with ThreadPoolExecutor(max_workers=min(len(selected_shots), GEN_MAX_WORKERS)) as executor:
                        results = list(executor.map(_render_shot, selected_shots))
                images = [image for shot_images in results for image in shot_images]
                labels = list(selected_shots)
                return images, labels
        except Exception as error:
            if not DEMO_FALLBACK:
                raise
            _log(f"Real image generation failed, using demo renderer: {error}", request_id)
    elif generation_mode in {"front_candidates", "front_candidate"}:
        front_count = 1 if generation_mode == "front_candidate" else 3
        # Reference order: face (identity) -> body-type (physique) -> product garments.
        front_references = [model_face]
        if body_reference is not None:
            front_references.append(body_reference)
        front_references.extend(front_products)
        try:
            if provider == "openai":
                images = _generate_with_openai(front_references, model, composed_prompt, resolution, front_count, request_id)
            elif provider == "gemini":
                images = _generate_with_gemini(front_references, model, composed_prompt, resolution, front_count, request_id)
            else:
                images = None
            if images is not None:
                # Re-crop so the subject sits in the same vertical band as the framing reference.
                # Prefer the dedicated front full-body shot reference, else fall back to the body reference.
                framing_ref = load_shot_reference("์ ์ (์๋ฉด)") or body_reference
                if MATCH_REFERENCE_FRAMING and (framing_ref is not None or (FRAMING_TOP_MARGIN and FRAMING_BOTTOM_MARGIN)):
                    target_size = TARGET_SIZES.get(resolution, TARGET_SIZES["1K"])
                    reframed = [_reframe_to_reference(image, framing_ref, target_size) for image in images]
                    _log(f"reframe applied to {len(reframed)} front candidate(s) target={target_size}", request_id)
                    images = reframed
                # A provider may return fewer images than requested (Gemini skips calls that
                # yield no image), so label what actually came back rather than front_count.
                return images, [f"์ ์ (์ ๋ฉด) ํ๋ณด {index + 1}" for index in range(len(images))]
        except Exception as error:
            if not DEMO_FALLBACK:
                raise
            _log(f"Real image generation failed, using demo renderer: {error}", request_id)

    # Demo renderer: reached after a tolerated provider failure, or for a provider or
    # mode that the branches above do not handle.
    if is_shot_mode:
        images = []
        labels = []
        base_label = f"?์ข๊นฎ ่?{selected_base_index + 1}"
        shift_map = {
            "์ ์ (์์ ํฌ์ฆ)": -36,
            "์ ์ (์ธก๋ฉด)": 42,
            "์ ์ (ํ๋ฉด)": 0,
            "์๋ฐ์ ": 0,
            "์๋ฐ์ (ํ๋ฉด)": 18,
            "ํ๋ฐ์ ": -18,
            "ํ๋ฐ์ (์์ ํฌ์ฆ)": 34,
            "๋ํ ์ผ(์์)": 0,
            "๋ํ ์ผ(ํฌ์ผ)": -22,
            "๋ํ ์ผ(์ ๋ฐ)": 22,
        }
        for shot_label in selected_shots:
            label = f"{shot_label} / {base_label}"
            image = _draw_model_cut(primary_product, model_face, label, resolution, shift_map.get(shot_label, 0), shot_label)
            images.append(image)
            labels.append(shot_label)
        return images, labels

    fallback_count = 1 if generation_mode == "front_candidate" else 3
    labels = [
        f"์ ์ (์ ๋ฉด) ํ๋ณด 1 / {category} / {fit} / {length_label}",
        f"์ ์ (์ ๋ฉด) ํ๋ณด 2 / {style}",
        f"์ ์ (์ ๋ฉด) ํ๋ณด 3 / {pose}",
    ][:fallback_count]
    shifts = [0, -18, 18][:fallback_count]
    images = [
        _draw_model_cut(primary_product, model_face, label, resolution, shift, "์ ์ (์ ๋ฉด)")
        for label, shift in zip(labels, shifts)
    ]
    return images, [f"์ ์ (์ ๋ฉด) ํ๋ณด {index + 1}" for index in range(fallback_count)]
def index() -> FileResponse:
    """Return ``index.html`` from ``BASE_DIR`` as a file response."""
    page_path = BASE_DIR / "index.html"
    return FileResponse(page_path)
def styles() -> FileResponse:
    """Return ``styles.css`` from ``BASE_DIR`` as a file response."""
    stylesheet_path = BASE_DIR / "styles.css"
    return FileResponse(stylesheet_path)
def script() -> FileResponse:
    """Return ``script.js`` from ``BASE_DIR`` as a file response."""
    script_path = BASE_DIR / "script.js"
    return FileResponse(script_path)
def model_face_preset() -> Response:
    """Return the preset face image.

    The first path in ``PRESET_FACE_CANDIDATES`` that exists on disk is served
    as a file; when none exists, a generated fallback face is returned as PNG
    bytes instead.
    """
    existing_preset = next((path for path in PRESET_FACE_CANDIDATES if path.exists()), None)
    if existing_preset is not None:
        return FileResponse(existing_preset)
    fallback_png = _image_to_png_bytes(_create_fallback_face())
    return Response(content=fallback_png, media_type="image/png")
def health() -> dict[str, str]:
    """Return a constant payload reporting that the service is up."""
    return dict(status="ok")
async def generate(
    product_1_front: Optional[UploadFile] = File(None),
    product_1_back: Optional[UploadFile] = File(None),
    product_2_front: Optional[UploadFile] = File(None),
    product_2_back: Optional[UploadFile] = File(None),
    product_3_front: Optional[UploadFile] = File(None),
    product_3_back: Optional[UploadFile] = File(None),
    product_4_front: Optional[UploadFile] = File(None),
    product_4_back: Optional[UploadFile] = File(None),
    model_face: Optional[UploadFile] = File(None),
    model_body: Optional[UploadFile] = File(None),
    face_source: str = Form("์ฒจ๋ถ ์ผ๊ตด ํ๋ฆฌ์ "),
    image_model: str = Form("openai:gpt-image-2"),
    selected_product: str = Form("์ ํ 1"),
    category: str = Form("์์ฐํฐ"),
    fit: str = Form("ํ์ค"),
    length: str = Form("๋ฌด๋ฆ"),
    style: str = Form("์ปค๋จธ์ค ๋ฃฉ๋ถ"),
    prompt: str = Form(""),
    pose: str = Form("์ ๋ฉด"),
    resolution: str = Form("1K"),
    total_length_cm: str = Form(""),
    generation_mode: str = Form("front_candidates"),
    shot_type: str = Form(""),
    shot_types: str = Form(""),
    selected_base_index: int = Form(0),
    selected_reference_image: Optional[UploadFile] = File(None),
    pose_reference_image: Optional[UploadFile] = File(None),
    only_selected_cut: bool = Form(False),
) -> JSONResponse:
    """Handle a generation request and answer with data-URL images plus labels.

    The uploads are decoded, a face is chosen (the uploaded one when
    ``face_source`` asks for it and a face was sent, otherwise the preset),
    and ``generate_model_cuts`` runs in a worker thread. On success the result
    is also handed to ``_upload_generation_to_dataset`` in the background.

    ``shot_types`` is a ``|``-separated list; empty items are ignored.

    Returns:
        200 with ``{"images": [...], "labels": [...]}``; 400 when no face is
        available; 500 with the error text, provider, model, mode and
        resolution when generation fails.
    """
    request_id = uuid.uuid4().hex[:8]
    request_started = time.perf_counter()
    _log(
        f"request start mode={generation_mode} shot_type={shot_type or '-'} shot_types={shot_types or '-'} "
        f"model={image_model} resolution={resolution} selected_product={selected_product}",
        request_id,
    )
    uploads = [
        product_1_front,
        product_1_back,
        product_2_front,
        product_2_back,
        product_3_front,
        product_3_back,
        product_4_front,
        product_4_back,
    ]
    product_images = [await _read_upload(upload) for upload in uploads]
    selected_reference = await _read_upload(selected_reference_image)
    pose_reference = await _read_upload(pose_reference_image)
    uploaded_face = await _read_upload(model_face)
    uploaded_body = await _read_upload(model_body)
    _log(
        f"uploads products={sum(image is not None for image in product_images)}/8 "
        f"product_sizes={[_image_summary(image) for image in product_images if image is not None]} "
        f"selected_reference={_image_summary(selected_reference)} pose_reference={_image_summary(pose_reference)} "
        f"uploaded_face={_image_summary(uploaded_face)}",
        request_id,
    )
    if face_source == "?๋ ์ค???์จ๋ฌ" and uploaded_face is not None:
        selected_face = uploaded_face
    elif any(preset_path.exists() for preset_path in PRESET_FACE_CANDIDATES) or DEMO_FALLBACK:
        # Both a preset on disk and demo mode resolve the face through load_preset_face().
        selected_face = load_preset_face()
    else:
        return JSONResponse(
            {
                "error": "?์โ ค?๋ฎ๊ตน ?์จ๋ฌ ?๊พจโ?๋ญ์ ?๋๋ฟ?๋๋. assets/model_face_preset.png ?๋จฎ๋ ็ทโฆ๋ model_face_preset.png็??ัโๅซ๊ณ๊ตน ?๋ถพใ?๋จฏ๊ฝ ๏งโค๋ฝ ?์จ๋ฌ???๋ ์ค?์๋ธฏ?๋ช์.",
                "provider": _split_provider_model(image_model)[0],
                "model": _resolve_model(*_split_provider_model(image_model)),
                "generation_mode": generation_mode,
                "resolution": resolution,
            },
            status_code=400,
        )
    # Parsed once; used for the generation call and for the upload metadata.
    requested_shots = [item for item in shot_types.split("|") if item]
    try:
        images, labels = await asyncio.to_thread(
            generate_model_cuts,
            product_images=product_images,
            model_face=selected_face,
            selected_reference_image=selected_reference,
            pose_reference_image=pose_reference,
            image_model=image_model,
            selected_product=selected_product,
            category=category,
            fit=fit,
            length=length,
            style=style,
            prompt=prompt,
            pose=pose,
            resolution=resolution,
            total_length_cm=total_length_cm,
            generation_mode=generation_mode,
            shot_type=shot_type,
            shot_types=list(requested_shots),
            selected_base_index=selected_base_index,
            only_selected_cut=only_selected_cut,
            model_body=uploaded_body,
            request_id=request_id,
        )
        _log(f"request done images={len(images)} labels={labels} elapsed={time.perf_counter() - request_started:.1f}s", request_id)
        upload_task = asyncio.create_task(
            asyncio.to_thread(
                _upload_generation_to_dataset,
                images,
                labels,
                {
                    "kind": "generate",
                    "image_model": image_model,
                    "selected_product": selected_product,
                    "category": category,
                    "fit": fit,
                    "length": length,
                    "style": style,
                    "pose": pose,
                    "resolution": resolution,
                    "total_length_cm": total_length_cm,
                    "generation_mode": generation_mode,
                    "shot_type": shot_type,
                    "shot_types": list(requested_shots),
                    "selected_base_index": selected_base_index,
                    "labels": labels,
                },
                request_id,
            )
        )

        def _finish_upload(task: asyncio.Task) -> None:
            """Release the task reference and report a failed background upload."""
            _GENERATE_UPLOAD_TASKS.discard(task)
            if not task.cancelled() and task.exception() is not None:
                _log(f"dataset upload failed error={task.exception()}", request_id)

        # The event loop keeps only weak references to tasks, so hold a strong one
        # until the upload finishes.
        _GENERATE_UPLOAD_TASKS.add(upload_task)
        upload_task.add_done_callback(_finish_upload)
    except Exception as error:
        provider, requested_model = _split_provider_model(image_model)
        resolved_model = _resolve_model(provider, requested_model)
        traceback.print_exc()
        _log(f"request failed error={error} elapsed={time.perf_counter() - request_started:.1f}s", request_id)
        return JSONResponse(
            {
                "error": str(error),
                "provider": provider,
                "model": resolved_model,
                "generation_mode": generation_mode,
                "resolution": resolution,
            },
            status_code=500,
        )
    return JSONResponse({"images": [_image_to_data_url(image, resolution) for image in images], "labels": labels})


# Strong references to in-flight dataset uploads started by generate(). Declared
# below the handler; the name is only looked up when a request runs.
_GENERATE_UPLOAD_TASKS: set = set()
async def edit_image(
    base_image: UploadFile = File(...),
    reference_images: Optional[list[UploadFile]] = File(None),
    image_model: str = Form("openai:gpt-image-2"),
    instruction: str = Form(""),
    background: str = Form(""),
    resolution: str = Form("1K"),
) -> JSONResponse:
    """Apply an edit instruction to an uploaded image and return the result.

    The base image comes first in the reference list, followed by any readable
    extra references. The provider call runs in a worker thread so the event
    loop stays free while the image is generated, and the result is handed to
    ``_upload_generation_to_dataset`` in the background.

    Returns:
        200 with ``{"images": [...], "labels": [...]}``; 400 when the base
        image cannot be read or the provider is unsupported; 500 with the error
        text, provider, model and resolution on any other failure.
    """
    try:
        base = await _read_upload(base_image)
        if base is None:
            return JSONResponse({"error": "?์์ ??ๆนฒ๊ณ? ?๋?๏งยๅชย ?๋๋ฟ?๋๋."}, status_code=400)
        refs = []
        for upload in reference_images or []:
            image = await _read_upload(upload)
            if image is not None:
                refs.append(image)
        provider, requested_model = _split_provider_model(image_model)
        model = _resolve_model(provider, requested_model)
        edit_prompt = "\n".join(
            [
                "Edit this fashion model image while preserving the same model identity, outfit, garment color, fabric texture, silhouette, and product details.",
                "Only apply the requested changes. Do not change the face or clothing unless explicitly requested.",
                f"Background preset: {background or 'keep current background'}",
                f"User edit instruction: {instruction or 'Regenerate naturally with the same settings.'}",
            ]
        )
        if provider == "openai":
            generator = _generate_with_openai
        elif provider == "gemini":
            generator = _generate_with_gemini
        else:
            return JSONResponse({"error": f"๏งย?๋จฐ๋ธฏ๏งย ?๋ ๋ provider?๋ ๋ฒ?? {provider}"}, status_code=400)
        # Created before the provider call so its log lines carry the same id as the upload.
        edit_request_id = uuid.uuid4().hex[:8]
        # The provider call blocks for the whole generation; run it off the event loop.
        images = await asyncio.to_thread(
            generator, [base, *refs], model, edit_prompt, resolution, 1, edit_request_id
        )
        # One label for both the stored record and the response, so the client
        # receives the same readable text that is written to the dataset.
        labels = ["์์ ์ด๋ฏธ์ง"]
        upload_task = asyncio.create_task(
            asyncio.to_thread(
                _upload_generation_to_dataset,
                images,
                list(labels),
                {
                    "kind": "edit",
                    "image_model": image_model,
                    "resolution": resolution,
                    "background": background,
                    "instruction": instruction,
                    "labels": list(labels),
                },
                edit_request_id,
            )
        )

        def _finish_upload(task: asyncio.Task) -> None:
            """Release the task reference and report a failed background upload."""
            _EDIT_UPLOAD_TASKS.discard(task)
            if not task.cancelled() and task.exception() is not None:
                _log(f"dataset upload failed error={task.exception()}", edit_request_id)

        # The event loop keeps only weak references to tasks, so hold a strong one
        # until the upload finishes.
        _EDIT_UPLOAD_TASKS.add(upload_task)
        upload_task.add_done_callback(_finish_upload)
        return JSONResponse({"images": [_image_to_data_url(image, resolution) for image in images], "labels": labels})
    except Exception as error:
        provider, requested_model = _split_provider_model(image_model)
        traceback.print_exc()
        return JSONResponse(
            {
                "error": str(error),
                "provider": provider,
                "model": _resolve_model(provider, requested_model),
                "resolution": resolution,
            },
            status_code=500,
        )


# Strong references to in-flight dataset uploads started by edit_image(). Declared
# below the handler; the name is only looked up when a request runs.
_EDIT_UPLOAD_TASKS: set = set()
if __name__ == "__main__":
    # Listen on $PORT when it is set, otherwise on 7860, on all interfaces.
    uvicorn.run("app:app", host="0.0.0.0", port=int(os.environ.get("PORT", "7860")))