| | import os |
| | import subprocess |
| | import sys |
| |
|
| | |
# Root of the persistent volume; every other path in this script lives under it.
WORKSPACE_DIR = "/workspace"
VENV_DIR = os.path.join(WORKSPACE_DIR, "venv")
APPS_DIR = os.path.join(WORKSPACE_DIR, "apps")
REPO_DIR = os.path.join(WORKSPACE_DIR, "Qwen-Image-Edit")
# Prefer a token already present in the environment so a real credential is
# never shadowed by the placeholder; the placeholder remains the fallback.
HF_TOKEN = os.environ.get("HF_TOKEN") or "YOUR_HF_TOKEN_HERE"

# Cache and scratch locations, kept on the persistent volume.
CACHE_BASE = os.path.join(WORKSPACE_DIR, "cache")
TMP_DIR = os.path.join(WORKSPACE_DIR, "tmp")
PIP_CACHE = os.path.join(CACHE_BASE, "pip")
HF_HOME = os.path.join(CACHE_BASE, "huggingface")
| |
|
def ensure_dirs():
    """Create every persistent directory the script relies on.

    Directories that already exist are left untouched. A message is printed
    for each directory that had to be created.

    Raises:
        FileExistsError: if one of the paths exists but is not a directory.
        OSError: if a directory cannot be created.
    """
    required = (APPS_DIR, REPO_DIR, CACHE_BASE, TMP_DIR, PIP_CACHE, HF_HOME)
    for directory in required:
        if os.path.isdir(directory):
            continue
        # exist_ok=True closes the window between the check above and the
        # creation, so a concurrent creator does not make this call fail.
        os.makedirs(directory, exist_ok=True)
        print(f"Created directory: {directory}")
| |
|
def run_command(command, cwd=None, env=None):
    """Run a shell command, echoing its combined output line by line.

    The child inherits the current environment with TMPDIR, PIP_CACHE_DIR and
    HF_HOME pointed at the persistent locations; entries in ``env`` are
    applied last and therefore win over both.

    Args:
        command: Command line, interpreted by the shell (pipes are allowed).
        cwd: Working directory for the child, or None to inherit.
        env: Optional mapping of extra environment variables.

    Returns:
        The child's exit status.
    """
    print(f"Running: {command}")

    child_env = os.environ.copy()
    child_env.update(
        {
            "TMPDIR": TMP_DIR,
            "PIP_CACHE_DIR": PIP_CACHE,
            "HF_HOME": HF_HOME,
        }
    )
    if env:
        child_env.update(env)

    # The context manager closes the stdout pipe and waits for the child even
    # if printing raises; errors="replace" keeps undecodable bytes in the
    # child's output from aborting the read loop.
    with subprocess.Popen(
        command,
        shell=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
        errors="replace",
        cwd=cwd,
        env=child_env,
    ) as process:
        for line in process.stdout:
            print(line, end="")

    if process.returncode != 0:
        print(f"Command failed with return code {process.returncode}")
    return process.returncode
| |
|
def setup_venv():
    """Create the persistent virtual environment if it is not usable yet.

    The environment counts as present only when its interpreter exists, so a
    directory left behind by an interrupted creation is rebuilt rather than
    reported as ready.
    """
    venv_python = os.path.join(VENV_DIR, "bin", "python")
    if os.path.exists(venv_python):
        print("Virtual environment already exists.")
        return

    print(f"Creating virtual environment in {VENV_DIR}...")
    status = run_command(f"python3 -m venv {VENV_DIR}")
    if status != 0:
        print(f"Failed to create virtual environment in {VENV_DIR}")
| |
|
def install_package(package_name):
    """Install a pip package into the persistent virtual environment.

    Args:
        package_name: Requirement text appended verbatim to ``pip install``.

    Returns:
        The exit status of the pip invocation, so callers can detect failure.
    """
    pip_path = os.path.join(VENV_DIR, "bin", "pip")
    return run_command(f"{pip_path} install {package_name}")
| |
|
def install_git_xet():
    """Install git-xet via the Hugging Face script, then register it with git.

    Both steps always run (registration is still attempted when the download
    step reports an error, as before).

    Returns:
        0 when both steps succeed, otherwise the first non-zero exit status.
    """
    print("Installing git-xet...")
    installer_status = run_command(
        "curl -LsSf https://huggingface.co/install-git-xet.sh | bash"
    )
    register_status = run_command("git xet install")
    # `or` yields the first non-zero status, or 0 when both succeeded.
    return installer_status or register_status
| |
|
def install_hf_cli():
    """Fetch and execute the Hugging Face CLI installer script."""
    installer_pipeline = "curl -LsSf https://hf.co/cli/install.sh | bash"
    print("Installing Hugging Face CLI...")
    run_command(installer_pipeline)
| |
|
def download_space():
    """Download the Space repository into REPO_DIR with the ``hf`` CLI.

    The CLI is taken from ``~/.local/bin/hf`` when that file exists and from
    PATH otherwise. HF_TOKEN is exported to the child only when it holds a
    real value; the unconfigured placeholder is not exported, so a token
    already present in the inherited environment is not overridden by it.

    Returns:
        The exit status of the download command.
    """
    os.makedirs(REPO_DIR, exist_ok=True)

    print(f"Downloading Space to {REPO_DIR}...")

    hf_path = os.path.expanduser("~/.local/bin/hf")
    if not os.path.exists(hf_path):
        hf_path = "hf"

    env = {}
    if HF_TOKEN and HF_TOKEN != "YOUR_HF_TOKEN_HERE":
        env["HF_TOKEN"] = HF_TOKEN

    return run_command(
        f"{hf_path} download Pr0f3ssi0n4ln00b/Qwen-Image-Edit-Rapid-AIO-Loras-Experimental --repo-type=space --local-dir {REPO_DIR}",
        env=env,
    )
| |
|
def create_app_file(filename, content):
    """Create or overwrite a text file inside the apps directory.

    Args:
        filename: Path relative to APPS_DIR; it may include sub-directories,
            which are created as needed.
        content: Text to write.

    Raises:
        OSError: if the directory or the file cannot be written.
    """
    filepath = os.path.join(APPS_DIR, filename)
    os.makedirs(os.path.dirname(filepath), exist_ok=True)

    # Explicit UTF-8 so non-ASCII content does not depend on the locale's
    # default encoding.
    with open(filepath, "w", encoding="utf-8") as f:
        f.write(content)
    print(f"Created/Updated: {filepath}")
| |
|
| | def patch_app(): |
| | """Patches app.py to optimize for VRAM and fix OOM issues.""" |
| | app_path = os.path.join(REPO_DIR, "app.py") |
| | if not os.path.exists(app_path): |
| | print(f"Warning: {app_path} not found, cannot patch.") |
| | return |
| |
|
| | print("Patching app.py for memory optimization...") |
| | with open(app_path, "r") as f: |
| | content = f.read() |
| |
|
| | |
| | content = content.replace( |
| | 'device_map="cuda",', |
| | 'device_map="auto",\n low_cpu_mem_usage=True,' |
| | ) |
| |
|
| | |
| | content = content.replace(').to(device)', ')') |
| |
|
| | |
| | if "p.enable_model_cpu_offload()" not in content: |
| | content = content.replace( |
| | 'return p', |
| | 'p.enable_model_cpu_offload()\n return p' |
| | ) |
| |
|
| | |
| | content = content.replace( |
| | 'pipe.transformer.set_attn_processor(QwenDoubleStreamAttnProcessorFA3())', |
| | 'print("Skipping FA3 optimization for stability.")' |
| | ) |
| |
|
| | |
| | content = content.replace( |
| | 'demo.queue(max_size=30).launch(', |
| | 'demo.queue(max_size=30).launch(server_name="0.0.0.0", share=True, ' |
| | ) |
| | |
| | |
| | |
| | if 'import spaces' in content and 'class spaces:' not in content: |
| | content = 'import sys\ntry:\n import spaces\nexcept ImportError:\n class spaces:\n @staticmethod\n def GPU(f): return f\nsys.modules["spaces"] = sys.modules.get("spaces", spaces)\n' + content |
| |
|
| | |
| | additional_prompts_map = { |
| | "Consistance": "improve consistency and quality of the generated image", |
| | "F2P": "transform the image into a high-quality photo with realistic details", |
| | "Multiple-Angles": "change the camera angle of the image", |
| | "Light-Restoration": "Remove shadows and relight the image using soft lighting", |
| | "Relight": "Relight the image with cinematic lighting", |
| | "Multi-Angle-Lighting": "Change the lighting direction and intensity", |
| | "Edit-Skin": "Enhance skin textures and natural details", |
| | "Next-Scene": "Generate the next scene based on the current image", |
| | "Flat-Log": "Desaturate and lower contrast for a flat log look", |
| | "Upscale-Image": "Enhance and sharpen the image details", |
| | "BFS-Best-FaceSwap": "head_swap : start with Picture 1 as the base image, keeping its lighting, environment, and background. remove the head from Picture 1 completely and replace it with the head from Picture 2, strictly preserving the hair, eye color, and nose structure, mouth, lips and front head of Picture 2. copy the eye direction, head rotation, and micro-expressions from Picture 1. high quality, sharp details, 4k", |
| | "BFS-Best-FaceSwap-merge": "head_swap : start with Picture 1 as the base image, keeping its lighting, environment, and background. remove the head from Picture 1 completely and replace it with the head from Picture 2, strictly preserving the hair, eye color, and nose structure, mouth, lips and front head of Picture 2. copy the eye direction, head rotation, and micro-expressions from Picture 1. high quality, sharp details, 4k", |
| | "Qwen-lora-nsfw": "Convert this picture to artistic style.", |
| | } |
| | |
| | |
| | new_lora_config = """ |
| | "Qwen-lora-nsfw": { |
| | "type": "single", |
| | "repo": "wiikoo/Qwen-lora-nsfw", |
| | "weights": "loras/qwen_image_edit_remove-clothing_v1.0.safetensors", |
| | "adapter_name": "qwen-lora-nsfw", |
| | "strength": 1.0, |
| | }, |
| | """ |
| | if '"Qwen-lora-nsfw":' not in content: |
| | content = content.replace( |
| | 'ADAPTER_SPECS = {', |
| | 'ADAPTER_SPECS = {' + new_lora_config |
| | ) |
| |
|
| | if "Manual Patch for missing prompts" not in content: |
| | content += "\n\n# Manual Patch for missing prompts\ntry:\n LORA_PRESET_PROMPTS.update({\n" |
| | for key, val in additional_prompts_map.items(): |
| | content += f' "{key}": "{val}",\n' |
| | content += " })\nexcept NameError:\n pass\n" |
| |
|
| | |
| | |
| | new_ui_logic = """ |
| | def on_lora_change_ui(selected_lora, current_prompt, extras_condition_only): |
| | # Always provide the preset if selected |
| | prompt_val = current_prompt |
| | if selected_lora != NONE_LORA: |
| | preset = LORA_PRESET_PROMPTS.get(selected_lora, "") |
| | if preset: |
| | prompt_val = preset |
| | |
| | prompt_update = gr.update(value=prompt_val) |
| | """ |
| | |
| | start_marker = "def on_lora_change_ui" |
| | end_marker = "return prompt_update, img2_update, extras_update" |
| | |
| | if start_marker in content and end_marker in content: |
| | import re |
| | content = re.sub( |
| | r"def on_lora_change_ui\(.*?\):.*?return prompt_update, img2_update, extras_update", |
| | new_ui_logic + "\n # Image2 visibility/label\n if lora_requires_two_images(selected_lora):\n img2_update = gr.update(visible=True, label=image2_label_for_lora(selected_lora))\n else:\n img2_update = gr.update(visible=False, value=None, label='Upload Reference (Image 2)')\n\n # Extra references routing default\n if selected_lora in ('BFS-Best-FaceSwap', 'BFS-Best-FaceSwap-merge', 'AnyPose'):\n extras_update = gr.update(value=True)\n else:\n extras_update = gr.update(value=extras_condition_only)\n\n return prompt_update, img2_update, extras_update", |
| | content, |
| | flags=re.DOTALL |
| | ) |
| |
|
| | with open(app_path, "w") as f: |
| | f.write(content) |
| | |
| | |
| | with open(app_path, "r") as f: |
| | content = f.read() |
| |
|
| | |
| | append_fn = """ |
| | def _append_to_gallery(existing_gallery, new_image): |
| | if existing_gallery is None: |
| | return [new_image] |
| | if not isinstance(existing_gallery, list): |
| | existing_gallery = [existing_gallery] |
| | existing_gallery.append(new_image) |
| | return existing_gallery |
| | """ |
| | if "def _append_to_gallery" not in content: |
| | content = content.replace( |
| | '# UI helpers: output routing + derived conditioning', |
| | '# UI helpers: output routing + derived conditioning\n' + append_fn |
| | ) |
| |
|
| | |
| | content = content.replace('height=290)', ')') |
| | content = content.replace('height=350)', ')') |
| |
|
| | |
| | |
| | if "gr.Examples(" in content: |
| | import re |
| | content = re.sub( |
| | r"gr\.Examples\([\s\S]*?label=\"Examples\"[\s\S]*?\)", |
| | "# Examples removed automatically by setup_manager", |
| | content |
| | ) |
| |
|
| | with open(app_path, "w") as f: |
| | f.write(content) |
| | |
| |
|
| | |
| | with open(app_path, "r") as f: |
| | content = f.read() |
| |
|
| | |
| | if "update_prompt_with_camera" not in content: |
| | content = content.replace("import os", "import os\nfrom camera_control_ui import CameraControl3D, build_camera_prompt, update_prompt_with_camera") |
| | |
| | |
| | camera_lora_config = """ |
| | "3D-Camera": { |
| | "type": "single", |
| | "repo": "fal/Qwen-Image-Edit-2511-Multiple-Angles-LoRA", |
| | "weights": "qwen-image-edit-2511-multiple-angles-lora.safetensors", |
| | "adapter_name": "angles", |
| | "strength": 1.0, |
| | }, |
| | """ |
| | if '"3D-Camera":' not in content: |
| | content = content.replace( |
| | 'ADAPTER_SPECS = {', |
| | 'ADAPTER_SPECS = {' + camera_lora_config |
| | ) |
| |
|
| | |
| | prompt_clear_logic = """ |
| | def on_lora_change_ui(selected_lora, current_prompt, extras_condition_only): |
| | prompt_val = current_prompt |
| | if selected_lora != NONE_LORA: |
| | preset = LORA_PRESET_PROMPTS.get(selected_lora, "") |
| | if preset: |
| | prompt_val = preset |
| | else: |
| | prompt_val = "" # CLEAR THE PROMPT IF ACTIVE BUT NO PRESET |
| | |
| | prompt_update = gr.update(value=prompt_val) |
| | camera_update = gr.update(visible=(selected_lora == "3D-Camera")) |
| | |
| | # Image2 visibility/label |
| | if lora_requires_two_images(selected_lora): |
| | img2_update = gr.update(visible=True, label=image2_label_for_lora(selected_lora)) |
| | else: |
| | img2_update = gr.update(visible=False, value=None, label='Upload Reference (Image 2)') |
| | |
| | # Extra references routing default |
| | if selected_lora in ('BFS-Best-FaceSwap', 'BFS-Best-FaceSwap-merge', 'AnyPose'): |
| | extras_update = gr.update(value=True) |
| | else: |
| | extras_update = gr.update(value=extras_condition_only) |
| | |
| | return prompt_update, img2_update, extras_update, camera_update |
| | """ |
| | old_on_lora = """ |
| | def on_lora_change_ui(selected_lora, current_prompt, extras_condition_only): |
| | # Always provide the preset if selected |
| | prompt_val = current_prompt |
| | if selected_lora != NONE_LORA: |
| | preset = LORA_PRESET_PROMPTS.get(selected_lora, "") |
| | if preset: |
| | prompt_val = preset |
| | |
| | prompt_update = gr.update(value=prompt_val) |
| | |
| | # Image2 visibility/label |
| | if lora_requires_two_images(selected_lora): |
| | img2_update = gr.update(visible=True, label=image2_label_for_lora(selected_lora)) |
| | else: |
| | img2_update = gr.update(visible=False, value=None, label='Upload Reference (Image 2)') |
| | |
| | # Extra references routing default |
| | if selected_lora in ('BFS-Best-FaceSwap', 'BFS-Best-FaceSwap-merge', 'AnyPose'): |
| | extras_update = gr.update(value=True) |
| | else: |
| | extras_update = gr.update(value=extras_condition_only) |
| | |
| | return prompt_update, img2_update, extras_update |
| | """ |
| | if "camera_update = gr.update(visible" not in content: |
| | content = content.replace(old_on_lora.strip(), prompt_clear_logic.strip()) |
| | |
| | |
| | content = content.replace( |
| | "outputs=[prompt, input_image_2, extras_condition_only],", |
| | "outputs=[prompt, input_image_2, extras_condition_only, camera_container]," |
| | ) |
| |
|
| | |
| | camera_ui_block = """ |
| | input_image_2 = gr.Image(label="Upload Reference (Image 2)", type="pil", height=290, visible=False) |
| | |
| | with gr.Column(visible=False) as camera_container: |
| | gr.Markdown("### 🎮 3D Camera Control\\n*Drag handles: 🟢 Azimuth, 🩷 Elevation, 🟠 Distance*") |
| | camera_3d = CameraControl3D(value={"azimuth": 0, "elevation": 0, "distance": 1.0}, elem_id="camera-3d-control") |
| | gr.Markdown("### 🎚️ Slider Controls") |
| | azimuth_slider = gr.Slider(label="Azimuth", minimum=0, maximum=315, step=45, value=0, info="0°=front, 90°=right, 180°=back, 270°=left") |
| | elevation_slider = gr.Slider(label="Elevation", minimum=-30, maximum=60, step=30, value=0, info="-30°=low angle, 0°=eye, 60°=high angle") |
| | distance_slider = gr.Slider(label="Distance", minimum=0.6, maximum=1.4, step=0.4, value=1.0, info="0.6=close, 1.0=medium, 1.4=wide") |
| | """ |
| | if "camera_container:" not in content: |
| | content = content.replace( |
| | ' input_image_2 = gr.Image(label="Upload Reference (Image 2)", type="pil", height=290, visible=False)', |
| | camera_ui_block.strip("\\n") |
| | ) |
| |
|
| | |
| | camera_events = """ |
| | # --- 3D Camera Events --- |
| | def update_prompt_from_sliders(az, el, dist, curr_prompt): |
| | return update_prompt_with_camera(az, el, dist, curr_prompt) |
| | |
| | def sync_3d_to_sliders(cv, curr_prompt): |
| | if cv and isinstance(cv, dict): |
| | az = cv.get('azimuth', 0) |
| | el = cv.get('elevation', 0) |
| | dist = cv.get('distance', 1.0) |
| | return az, el, dist, update_prompt_with_camera(az, el, dist, curr_prompt) |
| | return gr.update(), gr.update(), gr.update(), gr.update() |
| | |
| | def sync_sliders_to_3d(az, el, dist): |
| | return {"azimuth": az, "elevation": el, "distance": dist} |
| | |
| | |
| | def update_3d_image(img): |
| | if img is None: return gr.update(imageUrl=None) |
| | import base64 |
| | from io import BytesIO |
| | buf = BytesIO() |
| | img.save(buf, format="PNG") |
| | durl = f"data:image/png;base64,{base64.b64encode(buf.getvalue()).decode()}" |
| | return gr.update(imageUrl=durl) |
| | |
| | for slider in [azimuth_slider, elevation_slider, distance_slider]: |
| | slider.change(fn=update_prompt_from_sliders, inputs=[azimuth_slider, elevation_slider, distance_slider, prompt], outputs=[prompt]) |
| | slider.release(fn=sync_sliders_to_3d, inputs=[azimuth_slider, elevation_slider, distance_slider], outputs=[camera_3d]) |
| | |
| | camera_3d.change(fn=sync_3d_to_sliders, inputs=[camera_3d, prompt], outputs=[azimuth_slider, elevation_slider, distance_slider, prompt]) |
| | |
| | input_image_1.upload(fn=update_3d_image, inputs=[input_image_1], outputs=[camera_3d]) |
| | input_image_1.clear(fn=lambda: gr.update(imageUrl=None), outputs=[camera_3d]) |
| | |
| | run_button.click( |
| | """ |
| | if "def sync_3d_to_sliders" not in content: |
| | content = content.replace(" run_button.click(\n", camera_events) |
| |
|
| | |
| | content = content.replace("\\n demo.queue", "\n demo.queue") |
| |
|
| | if "head=" not in content: |
| | content = content.replace( |
| | "demo.queue(max_size=30).launch(", |
| | """head = '<script src="https://cdnjs.cloudflare.com/ajax/libs/three.js/r128/three.min.js"></script>' |
| | demo.queue(max_size=30).launch(head=head, """ |
| | ) |
| |
|
| | with open(app_path, "w") as f: |
| | f.write(content) |
| | |
| |
|
| | print("Successfully patched app.py.") |
| |
|
def install_dependencies():
    """Install the repository's requirements.txt with the venv's pip.

    Prints a notice and does nothing else when the repository has no
    requirements.txt.
    """
    requirements_path = os.path.join(REPO_DIR, "requirements.txt")
    if not os.path.exists(requirements_path):
        print(f"No requirements.txt found in {REPO_DIR}")
        return

    print("Installing dependencies from requirements.txt...")
    pip_executable = os.path.join(VENV_DIR, "bin", "pip")
    run_command(f"{pip_executable} install -r {requirements_path}")
| |
|
def run_app():
    """Start app.py from the repository using the venv interpreter.

    REPO_DIR is placed first on PYTHONPATH; any PYTHONPATH inherited from the
    current environment is kept after it instead of being discarded.

    Returns:
        The app's exit status, or None when app.py does not exist.
    """
    python_path = os.path.join(VENV_DIR, "bin", "python")
    app_path = os.path.join(REPO_DIR, "app.py")

    if not os.path.exists(app_path):
        print(f"App file not found: {app_path}")
        return None

    print(f"Starting app: {app_path}")

    search_path = [REPO_DIR]
    inherited = os.environ.get("PYTHONPATH")
    if inherited:
        search_path.append(inherited)
    env = {"PYTHONPATH": os.pathsep.join(search_path)}

    return run_command(f"{python_path} {app_path}", cwd=REPO_DIR, env=env)
| |
|
def main():
    """Run the setup pipeline on the persistent workspace.

    Does nothing but print an error when WORKSPACE_DIR is missing. Otherwise
    the steps run in order; a step that returns a non-zero status stops the
    pipeline, so the completion message is printed only when no step reported
    a failure. Steps that return None are treated as successful.
    """
    if not os.path.exists(WORKSPACE_DIR):
        print(f"Error: {WORKSPACE_DIR} not found. Ensure this is a RunPod with persistent storage.")
        return

    steps = (
        ensure_dirs,
        setup_venv,
        install_git_xet,
        install_hf_cli,
        download_space,
        patch_app,
        install_dependencies,
    )
    for step in steps:
        status = step()
        if status not in (None, 0):
            print(f"Setup aborted: {step.__name__} failed with status {status}")
            return

    print("Setup tasks completed. Run with 'run' argument to start the app.")
| |
|
if __name__ == "__main__":
    # A first argument of exactly "run" launches the app; anything else
    # (including no argument) performs the setup.
    wants_run = sys.argv[1:2] == ["run"]
    if wants_run:
        run_app()
    else:
        main()
| |
|