# NOTE: the lines "Spaces:" / "Runtime error" that appeared here were Hugging
# Face Spaces page-scrape residue, not valid Python; replaced with this comment
# so the file parses.
| # app.py | |
| import os | |
| import random | |
| import re | |
| import shutil | |
| import subprocess | |
| import time | |
| import traceback | |
| from typing import Optional, Tuple | |
| import gradio as gr | |
| import numpy as np | |
| import torch | |
| from PIL import Image | |
| from diffusers import QwenImageEditPlusPipeline, QwenImageTransformer2DModel | |
# -----------------------------------------------------------------------------
# Optional: Hugging Face Spaces helpers (only available on the Spaces runtime)
# -----------------------------------------------------------------------------
# Sentinel-first: default to None, then let a successful import rebind it.
# Feature checks below (`if spaces is not None`) degrade gracefully elsewhere.
spaces = None
try:
    import spaces  # type: ignore  # noqa: F811 — intentional rebind on success
except Exception:
    pass
# -----------------------------------------------------------------------------
# (OPTIONAL) Cache models to Google Drive when running on Colab
# -----------------------------------------------------------------------------
# os.environ["HF_HOME"] = "/content/drive/MyDrive/hf_cache"
# os.environ["TRANSFORMERS_CACHE"] = os.path.join(os.environ["HF_HOME"], "transformers")  # legacy (not needed)
# os.environ["HF_HUB_DISABLE_TELEMETRY"] = "1"
# -----------------------------------------------------------------------------
# Model loading (global singleton)
# -----------------------------------------------------------------------------
dtype = torch.bfloat16  # bf16 for the diffusion weights; halves memory vs fp32
device = "cuda" if torch.cuda.is_available() else "cpu"
MAX_SEED = np.iinfo(np.int32).max  # 2**31 - 1; upper bound for the seed slider
# Lazily-initialized pipeline singleton; populated by get_pipe() on first use.
_pipe: Optional[QwenImageEditPlusPipeline] = None
def get_pipe() -> QwenImageEditPlusPipeline:
    """Return the process-wide QwenImageEditPlusPipeline, building it on first call.

    The pipeline is constructed once and cached in the module-level ``_pipe``
    singleton; every later call returns the cached instance.
    """
    global _pipe
    if _pipe is not None:
        return _pipe
    # Load the base pipeline with the Rapid-AIO transformer swapped in
    # (loaded as QwenImageTransformer2DModel, the type diffusers expects).
    pipe = QwenImageEditPlusPipeline.from_pretrained(
        "Qwen/Qwen-Image-Edit-2509",
        transformer=QwenImageTransformer2DModel.from_pretrained(
            "linoyts/Qwen-Image-Edit-Rapid-AIO",
            subfolder="transformer",
            torch_dtype=dtype,
            # device_map places the transformer weights on GPU during loading.
            device_map="cuda" if device == "cuda" else None,
        ),
        torch_dtype=dtype,
    )
    if device == "cuda":
        pipe = pipe.to("cuda")
    # Load the camera-angle LoRA ("镜头转换" = "lens/shot conversion"), fuse it
    # into the base weights at scale 1.25, then drop the adapter bookkeeping:
    # fusing bakes the LoRA in so inference pays no per-step adapter overhead.
    pipe.load_lora_weights(
        "dx8152/Qwen-Edit-2509-Multiple-angles",
        weight_name="镜头转换.safetensors",
        adapter_name="angles",
    )
    pipe.set_adapters(["angles"], adapter_weights=[1.0])
    pipe.fuse_lora(adapter_names=["angles"], lora_scale=1.25)
    pipe.unload_lora_weights()
    # Optional ahead-of-time compiled transformer blocks (Spaces runtime only).
    if spaces is not None and hasattr(spaces, "aoti_blocks_load"):
        try:
            spaces.aoti_blocks_load(pipe.transformer, "zerogpu-aoti/Qwen-Image", variant="fa3")
            print("[INFO] spaces.aoti_blocks_load enabled")
        except Exception as e:
            # Best-effort: AOTI loading is an optimization, never fatal.
            print(f"[WARN] spaces.aoti_blocks_load failed: {e}")
    _pipe = pipe
    return _pipe
| # ----------------------------------------------------------------------------- | |
| # Prompt builder | |
| # ----------------------------------------------------------------------------- | |
def build_camera_prompt(
    rotate_deg: float = 0.0,
    move_forward: float = 0.0,
    vertical_tilt: float = 0.0,
    wideangle: bool = False,
) -> str:
    """Translate camera-control slider values into a bilingual (zh/en) prompt.

    Positive ``rotate_deg`` rotates left, negative rotates right.  Returns the
    sentinel string "no camera movement" when every control is neutral.
    """
    fragments = []

    if rotate_deg != 0:
        magnitude = abs(rotate_deg)
        if rotate_deg > 0:
            fragments.append(
                f"将镜头向左旋转{magnitude}度 Rotate the camera {magnitude} degrees to the left."
            )
        else:
            fragments.append(
                f"将镜头向右旋转{magnitude}度 Rotate the camera {magnitude} degrees to the right."
            )

    # Forward motion is bucketed: a large push becomes a close-up.
    if move_forward > 5:
        fragments.append("将镜头转为特写镜头 Turn the camera to a close-up.")
    elif move_forward >= 1:
        fragments.append("将镜头向前移动 Move the camera forward.")

    # Negative tilt looks down (bird's-eye); positive looks up (worm's-eye).
    if vertical_tilt <= -1:
        fragments.append("将相机转向鸟瞰视角 Turn the camera to a bird's-eye view.")
    elif vertical_tilt >= 1:
        fragments.append("将相机切换到仰视视角 Turn the camera to a worm's-eye view.")

    if wideangle:
        fragments.append("将镜头转为广角镜头 Turn the camera to a wide-angle lens.")

    if not fragments:
        return "no camera movement"
    return " ".join(fragments)
| # ----------------------------------------------------------------------------- | |
| # Inference | |
| # ----------------------------------------------------------------------------- | |
def infer_camera_edit(
    image: Optional[Image.Image],
    rotate_deg: float,
    move_forward: float,
    vertical_tilt: float,
    wideangle: bool,
    seed: int,
    randomize_seed: bool,
    true_guidance_scale: float,
    num_inference_steps: int,
    height: int,
    width: int,
) -> Tuple[Image.Image, int, str]:
    """Run one camera-angle edit; return (output image, seed used, prompt used).

    Raises gr.Error when no input image was supplied.  When every camera
    control is at its neutral value, the input image is returned unchanged
    without invoking the diffusion pipeline.
    """
    if image is None:
        raise gr.Error("Please upload an image.")

    if randomize_seed:
        seed = random.randint(0, MAX_SEED)

    prompt = build_camera_prompt(rotate_deg, move_forward, vertical_tilt, wideangle)
    print("Prompt:", prompt)

    # Neutral controls: skip the (expensive) pipeline call entirely.
    if prompt == "no camera movement":
        return image, seed, prompt

    def _positive_or_none(value: int) -> Optional[int]:
        # Treat 0 / None / negative as "let the pipeline pick the size".
        return value if value and value > 0 else None

    pipe = get_pipe()
    rng = torch.Generator(device=device).manual_seed(int(seed))
    result = pipe(
        image=[image.convert("RGB")],
        prompt=prompt,
        height=_positive_or_none(height),
        width=_positive_or_none(width),
        num_inference_steps=int(num_inference_steps),
        true_cfg_scale=float(true_guidance_scale),
        generator=rng,
        num_images_per_prompt=1,
    )
    return result.images[0], seed, prompt
# -----------------------------------------------------------------------------
# UI (Gradio 6.3.0 compatible)
# -----------------------------------------------------------------------------
# Static placeholder panel shown in the "3D Camera Preview" tab.  The original
# JS <-> Python sync was removed for Gradio 6; this HTML is purely decorative.
CAMERA_3D_HTML_TEMPLATE = """
<div style="width:100%;height:400px;background:#1a1a1a;border-radius:12px;
display:flex;align-items:center;justify-content:center;color:#aaa;">
<div style="text-align:center;">
<div style="font-size:18px;color:#00ff88;font-weight:700;">3D Camera Preview</div>
<div style="margin-top:8px;">(Gradio 6) JS ↔ Python sync removed</div>
<div style="margin-top:6px;">Use sliders to control camera</div>
</div>
</div>
"""
# Reserve vertical space for the preview panel so the tab doesn't collapse.
css = """
#camera-3d-control { min-height: 400px; }
"""
# Build the Gradio UI.  NOTE(review): the original file's indentation was lost
# in extraction; the widget nesting below is a reconstruction — confirm the
# Accordion placement against the original layout.
with gr.Blocks(css=css) as demo:
    gr.Markdown("## 🎬 Qwen Image Edit — Camera Angle Control (Gradio 6.3.0)")
    with gr.Row():
        with gr.Column(scale=1):
            image = gr.Image(label="Input Image", type="pil", height=280)
            with gr.Tab("🎮 3D Camera Preview"):
                # Decorative placeholder only; no interactivity.
                _ = gr.HTML(value=CAMERA_3D_HTML_TEMPLATE, elem_id="camera-3d-control")
            with gr.Tab("🎚️ Controls"):
                # Slider steps mirror the discrete buckets in build_camera_prompt.
                rotate_deg = gr.Slider(-90, 90, step=45, label="Rotate (°)", value=0)
                move_forward = gr.Slider(0, 10, step=5, label="Move Forward", value=0)
                vertical_tilt = gr.Slider(-1, 1, step=1, label="Vertical Tilt", value=0)
                wideangle = gr.Checkbox(label="Wide-angle", value=False)
            run_btn = gr.Button("🚀 Generate", variant="primary")
        with gr.Column(scale=1):
            result = gr.Image(label="Output Image", height=350)
            prompt_preview = gr.Textbox(label="Generated Prompt", interactive=False)
            with gr.Accordion("⚙️ Advanced", open=False):
                seed = gr.Slider(0, MAX_SEED, step=1, label="Seed", value=0)
                randomize_seed = gr.Checkbox(label="Randomize Seed", value=True)
                true_guidance_scale = gr.Slider(1.0, 10.0, step=0.1, label="CFG Scale", value=1.0)
                num_inference_steps = gr.Slider(1, 40, step=1, label="Steps", value=4)
                height = gr.Slider(256, 2048, step=8, value=1024, label="Height")
                width = gr.Slider(256, 2048, step=8, value=1024, label="Width")
    # API endpoint exposed under api_name="edit_camera_angles".
    # `seed` is both an input and an output so the UI shows the seed actually used.
    run_btn.click(
        fn=infer_camera_edit,
        inputs=[
            image,
            rotate_deg,
            move_forward,
            vertical_tilt,
            wideangle,
            seed,
            randomize_seed,
            true_guidance_scale,
            num_inference_steps,
            height,
            width,
        ],
        outputs=[result, seed, prompt_preview],
        api_name="edit_camera_angles",
    )
if __name__ == "__main__":
    # Queue requests so concurrent users don't contend for the single pipeline.
    demo.queue(max_size=16)
    demo.launch(
        server_name="127.0.0.1",
        server_port=7861,
        share=False,  # important: local-only, no public share link
        debug=True,
        show_error=True,
        prevent_thread_lock=True,  # launch() returns immediately; see keep-alive below
    )
    # With prevent_thread_lock=True, launch() does not block, so keep the
    # process alive when run as `python app.py`.  `time` is already imported
    # at the top of the file (the original redundantly re-imported it here).
    try:
        while True:
            time.sleep(3600)
    except KeyboardInterrupt:
        # Ctrl-C: exit cleanly instead of dumping a traceback.
        pass