# Listing header: file size 19,744 bytes; id 674481b (presumably a commit hash).
import os
import subprocess
import sys
# Configuration
# Every path below is derived from WORKSPACE_DIR, so the venv, the downloaded
# Space and all caches end up under the same root.
WORKSPACE_DIR = "/workspace"
VENV_DIR = os.path.join(WORKSPACE_DIR, "venv")
APPS_DIR = os.path.join(WORKSPACE_DIR, "apps")
REPO_DIR = os.path.join(WORKSPACE_DIR, "Qwen-Image-Edit")
# Hugging Face access token. A non-empty HF_TOKEN environment variable takes
# precedence so the secret does not have to be written into this file; the
# placeholder stays as the fallback so existing behaviour is unchanged when
# the variable is not set.
HF_TOKEN = os.environ.get("HF_TOKEN") or "YOUR_HF_TOKEN_HERE"
# Cache and Temp Directories (Strictly on persistent drive)
CACHE_BASE = os.path.join(WORKSPACE_DIR, "cache")
TMP_DIR = os.path.join(WORKSPACE_DIR, "tmp")
PIP_CACHE = os.path.join(CACHE_BASE, "pip")
HF_HOME = os.path.join(CACHE_BASE, "huggingface")
def ensure_dirs():
    """Ensures all necessary persistent directories exist.

    Creates APPS_DIR, REPO_DIR, CACHE_BASE, TMP_DIR, PIP_CACHE and HF_HOME
    (including missing parents) and prints one line for each directory that
    was missing. Paths that already exist are left untouched.
    """
    for d in (APPS_DIR, REPO_DIR, CACHE_BASE, TMP_DIR, PIP_CACHE, HF_HOME):
        if os.path.exists(d):
            continue
        # exist_ok=True closes the gap between the check above and the
        # creation: if something else creates the directory in between,
        # that is not an error.
        os.makedirs(d, exist_ok=True)
        print(f"Created directory: {d}")
def run_command(command, cwd=None, env=None):
    """Runs a shell command and prints output.

    The command is executed through the shell (callers in this file rely on
    pipes such as ``curl ... | bash``). stderr is merged into stdout and the
    combined output is echoed line by line as it arrives.

    Args:
        command: Shell command line to execute.
        cwd: Working directory for the child process, or None to inherit.
        env: Optional mapping of extra environment variables. These are
            applied last, so they override both the inherited environment
            and the forced TMPDIR / PIP_CACHE_DIR / HF_HOME values.

    Returns:
        The child's exit code. A non-zero code is reported on stdout but no
        exception is raised.
    """
    print(f"Running: {command}")
    current_env = os.environ.copy()
    # Force use of persistent directories
    current_env["TMPDIR"] = TMP_DIR
    current_env["PIP_CACHE_DIR"] = PIP_CACHE
    current_env["HF_HOME"] = HF_HOME
    if env:
        current_env.update(env)
    # The context manager closes the stdout pipe and waits for the child on
    # exit, including when printing is interrupted by an exception.
    with subprocess.Popen(
        command,
        shell=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
        cwd=cwd,
        env=current_env,
    ) as process:
        for line in process.stdout:
            print(line, end="")
    if process.returncode != 0:
        print(f"Command failed with return code {process.returncode}")
    return process.returncode
def setup_venv():
    """Create the virtual environment at VENV_DIR unless that path exists.

    Existence of the VENV_DIR path is the only check performed; when it is
    already there the function just reports that and does nothing else.
    """
    if os.path.exists(VENV_DIR):
        print("Virtual environment already exists.")
        return
    print(f"Creating virtual environment in {VENV_DIR}...")
    create_cmd = f"python3 -m venv {VENV_DIR}"
    run_command(create_cmd)
def install_package(package_name):
    """Run ``pip install <package_name>`` with the venv's own pip.

    Args:
        package_name: Text appended verbatim after ``pip install`` on the
            shell command line.
    """
    venv_pip = os.path.join(VENV_DIR, "bin", "pip")
    install_cmd = f"{venv_pip} install {package_name}"
    run_command(install_cmd)
def install_git_xet():
    """Fetch and run the git-xet install script, then run ``git xet install``.

    Both commands are always executed, in this order; the exit code of the
    first is not inspected before running the second.
    """
    print("Installing git-xet...")
    steps = (
        "curl -LsSf https://huggingface.co/install-git-xet.sh | bash",
        "git xet install",
    )
    for step in steps:
        run_command(step)
def install_hf_cli():
    """Fetch the Hugging Face CLI install script and pipe it into bash."""
    installer_cmd = "curl -LsSf https://hf.co/cli/install.sh | bash"
    print("Installing Hugging Face CLI...")
    run_command(installer_cmd)
def download_space():
    """Downloads the Qwen Space using hf cli.

    Ensures REPO_DIR exists, then runs ``hf download`` for the Space
    ``Pr0f3ssi0n4ln00b/Qwen-Image-Edit-Rapid-AIO-Loras-Experimental`` with
    REPO_DIR as the local directory. The exit code of the download is not
    returned to the caller.
    """
    os.makedirs(REPO_DIR, exist_ok=True)
    print(f"Downloading Space to {REPO_DIR}...")
    # Using full path to hf if it's in ~/.local/bin
    hf_path = os.path.expanduser("~/.local/bin/hf")
    if not os.path.exists(hf_path):
        hf_path = "hf"  # fallback to PATH
    # Only forward a real token. Passing the unedited placeholder would
    # overwrite a valid HF_TOKEN that is already present in the inherited
    # environment, because run_command applies `env` last.
    env = None
    if HF_TOKEN and HF_TOKEN != "YOUR_HF_TOKEN_HERE":
        env = {"HF_TOKEN": HF_TOKEN}
    run_command(
        f"{hf_path} download Pr0f3ssi0n4ln00b/Qwen-Image-Edit-Rapid-AIO-Loras-Experimental --repo-type=space --local-dir {REPO_DIR}",
        env=env,
    )
def create_app_file(filename, content):
    """Creates/Updates a file in the apps directory.

    Args:
        filename: Name of the file, joined onto APPS_DIR.
        content: Text to write; any existing file is overwritten.
    """
    os.makedirs(APPS_DIR, exist_ok=True)
    filepath = os.path.join(APPS_DIR, filename)
    # Explicit UTF-8 so non-ASCII content is written the same way regardless
    # of the locale's default encoding.
    with open(filepath, "w", encoding="utf-8") as f:
        f.write(content)
    print(f"Created/Updated: {filepath}")
def patch_app():
"""Patches app.py to optimize for VRAM and fix OOM issues.

Reads REPO_DIR/app.py, applies a series of text substitutions to it and
writes the result back. The file is written and re-read three times: once
for the memory / LoRA / prompt patches, once for the UI patches and once
for the 3D camera patches. Prints a warning and returns early when app.py
does not exist. Most steps first look for a marker string in the file so
that running the function again does not insert the same text twice.

NOTE(review): the patch templates below are matched and inserted as exact
text, so their whitespace must agree with the target app.py - verify
against the real file.
"""
app_path = os.path.join(REPO_DIR, "app.py")
if not os.path.exists(app_path):
print(f"Warning: {app_path} not found, cannot patch.")
return
print("Patching app.py for memory optimization...")
with open(app_path, "r") as f:
content = f.read()
# 1. Update transformer loading to use device_map="auto" and low_cpu_mem_usage
content = content.replace(
'device_map="cuda",',
'device_map="auto",\n low_cpu_mem_usage=True,'
)
# 2. Remove redundant .to(device) which causes OOM
# NOTE(review): str.replace rewrites every ').to(device)' in the file, not
# only the one on the pipeline load - confirm nothing else relies on it.
content = content.replace(').to(device)', ')')
# 3. Enable model CPU offload to save VRAM
# NOTE(review): 'return p' is matched as a substring, so this also hits any
# line that merely starts that way (for example
# 'return prompt_update, ...', which end_marker below expects to exist).
if "p.enable_model_cpu_offload()" not in content:
content = content.replace(
'return p',
'p.enable_model_cpu_offload()\n return p'
)
# 4. Disable FA3 Processor (to avoid hangs/compilation issues)
content = content.replace(
'pipe.transformer.set_attn_processor(QwenDoubleStreamAttnProcessorFA3())',
'print("Skipping FA3 optimization for stability.")'
)
# 5. Fix launch parameters for visibility and accessibility
content = content.replace(
'demo.queue(max_size=30).launch(',
'demo.queue(max_size=30).launch(server_name="0.0.0.0", share=True, '
)
# 6. Ensure spaces.GPU is handled (if it blocks)
# Usually it's fine, but let's be safe and mock it if env isn't right
# The shim is prepended to the file: it tries the real import and, on
# ImportError, registers a stand-in class whose GPU() returns its argument.
if 'import spaces' in content and 'class spaces:' not in content:
content = 'import sys\ntry:\n import spaces\nexcept ImportError:\n class spaces:\n @staticmethod\n def GPU(f): return f\nsys.modules["spaces"] = sys.modules.get("spaces", spaces)\n' + content
# 7. Add missing LORA_PRESET_PROMPTS (Robust append)
# Maps LoRA name -> preset prompt; written into app.py further down.
additional_prompts_map = {
"Consistance": "improve consistency and quality of the generated image",
"F2P": "transform the image into a high-quality photo with realistic details",
"Multiple-Angles": "change the camera angle of the image",
"Light-Restoration": "Remove shadows and relight the image using soft lighting",
"Relight": "Relight the image with cinematic lighting",
"Multi-Angle-Lighting": "Change the lighting direction and intensity",
"Edit-Skin": "Enhance skin textures and natural details",
"Next-Scene": "Generate the next scene based on the current image",
"Flat-Log": "Desaturate and lower contrast for a flat log look",
"Upscale-Image": "Enhance and sharpen the image details",
"BFS-Best-FaceSwap": "head_swap : start with Picture 1 as the base image, keeping its lighting, environment, and background. remove the head from Picture 1 completely and replace it with the head from Picture 2, strictly preserving the hair, eye color, and nose structure, mouth, lips and front head of Picture 2. copy the eye direction, head rotation, and micro-expressions from Picture 1. high quality, sharp details, 4k",
"BFS-Best-FaceSwap-merge": "head_swap : start with Picture 1 as the base image, keeping its lighting, environment, and background. remove the head from Picture 1 completely and replace it with the head from Picture 2, strictly preserving the hair, eye color, and nose structure, mouth, lips and front head of Picture 2. copy the eye direction, head rotation, and micro-expressions from Picture 1. high quality, sharp details, 4k",
"Qwen-lora-nsfw": "Convert this picture to artistic style.", # Default prompt
}
# 9. Add new LoRA to ADAPTER_SPECS
# (Numbered 9 but applied before step 8 below.)
new_lora_config = """
"Qwen-lora-nsfw": {
"type": "single",
"repo": "wiikoo/Qwen-lora-nsfw",
"weights": "loras/qwen_image_edit_remove-clothing_v1.0.safetensors",
"adapter_name": "qwen-lora-nsfw",
"strength": 1.0,
},
"""
# Insert the entry directly after the opening brace of ADAPTER_SPECS.
if '"Qwen-lora-nsfw":' not in content:
content = content.replace(
'ADAPTER_SPECS = {',
'ADAPTER_SPECS = {' + new_lora_config
)
# Step 7 (continued): append a LORA_PRESET_PROMPTS.update({...}) statement
# built from additional_prompts_map, wrapped in try/except NameError.
# NOTE(review): keys and values are placed between double quotes without
# escaping, so a value containing '"' or a backslash would produce invalid
# code in app.py.
if "Manual Patch for missing prompts" not in content:
content += "\n\n# Manual Patch for missing prompts\ntry:\n LORA_PRESET_PROMPTS.update({\n"
for key, val in additional_prompts_map.items():
content += f' "{key}": "{val}",\n'
content += " })\nexcept NameError:\n pass\n"
# 8. Modify on_lora_change_ui to ALWAYS update the prompt if a style is picked
# (or at least be more aggressive)
new_ui_logic = """
def on_lora_change_ui(selected_lora, current_prompt, extras_condition_only):
# Always provide the preset if selected
prompt_val = current_prompt
if selected_lora != NONE_LORA:
preset = LORA_PRESET_PROMPTS.get(selected_lora, "")
if preset:
prompt_val = preset
prompt_update = gr.update(value=prompt_val)
"""
# Find the old function and replace it
start_marker = "def on_lora_change_ui"
end_marker = "return prompt_update, img2_update, extras_update"
if start_marker in content and end_marker in content:
import re
# Non-greedy match with DOTALL: spans from the def line to the first
# following occurrence of the three-value return statement.
content = re.sub(
r"def on_lora_change_ui\(.*?\):.*?return prompt_update, img2_update, extras_update",
new_ui_logic + "\n # Image2 visibility/label\n if lora_requires_two_images(selected_lora):\n img2_update = gr.update(visible=True, label=image2_label_for_lora(selected_lora))\n else:\n img2_update = gr.update(visible=False, value=None, label='Upload Reference (Image 2)')\n\n # Extra references routing default\n if selected_lora in ('BFS-Best-FaceSwap', 'BFS-Best-FaceSwap-merge', 'AnyPose'):\n extras_update = gr.update(value=True)\n else:\n extras_update = gr.update(value=extras_condition_only)\n\n return prompt_update, img2_update, extras_update",
content,
flags=re.DOTALL
)
# End of the first pass: persist the result before the UI patches re-read it.
with open(app_path, "w") as f:
f.write(content)
# --- NEW UI PATCHES ---
with open(app_path, "r") as f:
content = f.read()
# 10. Implement missing _append_to_gallery function
append_fn = """
def _append_to_gallery(existing_gallery, new_image):
if existing_gallery is None:
return [new_image]
if not isinstance(existing_gallery, list):
existing_gallery = [existing_gallery]
existing_gallery.append(new_image)
return existing_gallery
"""
# The helper is inserted right after the "UI helpers" marker comment; if
# that marker is absent from app.py nothing is inserted.
if "def _append_to_gallery" not in content:
content = content.replace(
'# UI helpers: output routing + derived conditioning',
'# UI helpers: output routing + derived conditioning\n' + append_fn
)
# 11. Remove height constraints from main image components
# Only matches where height=... is immediately followed by ')'.
content = content.replace('height=290)', ')')
content = content.replace('height=350)', ')')
# 12. Strip out gr.Examples block to declutter UI
# We find the start of gr.Examples and the end of its call
# NOTE(review): the pattern ends at the first ')' after label="Examples",
# which is not necessarily the parenthesis closing the gr.Examples(...) call.
if "gr.Examples(" in content:
import re
content = re.sub(
r"gr\.Examples\([\s\S]*?label=\"Examples\"[\s\S]*?\)",
"# Examples removed automatically by setup_manager",
content
)
with open(app_path, "w") as f:
f.write(content)
# --- END NEW UI PATCHES ---
# --- 3D CAMERA AND PROMPT CLEARING PATCHES ---
with open(app_path, "r") as f:
content = f.read()
# Import the custom 3D Camera control safely at the top
# NOTE(review): every occurrence of the text "import os" is rewritten, not
# only the first one - confirm app.py contains it exactly once.
if "update_prompt_with_camera" not in content:
content = content.replace("import os", "import os\nfrom camera_control_ui import CameraControl3D, build_camera_prompt, update_prompt_with_camera")
# Add the 3D Camera LoRA to ADAPTER_SPECS
camera_lora_config = """
"3D-Camera": {
"type": "single",
"repo": "fal/Qwen-Image-Edit-2511-Multiple-Angles-LoRA",
"weights": "qwen-image-edit-2511-multiple-angles-lora.safetensors",
"adapter_name": "angles",
"strength": 1.0,
},
"""
if '"3D-Camera":' not in content:
content = content.replace(
'ADAPTER_SPECS = {',
'ADAPTER_SPECS = {' + camera_lora_config
)
# Patch on_lora_change_ui to clear prompt if no preset exists and toggle 3D camera visibility
# prompt_clear_logic is the replacement (returns four updates, the fourth
# being camera_update); old_on_lora is the text it is swapped in for.
prompt_clear_logic = """
def on_lora_change_ui(selected_lora, current_prompt, extras_condition_only):
prompt_val = current_prompt
if selected_lora != NONE_LORA:
preset = LORA_PRESET_PROMPTS.get(selected_lora, "")
if preset:
prompt_val = preset
else:
prompt_val = "" # CLEAR THE PROMPT IF ACTIVE BUT NO PRESET
prompt_update = gr.update(value=prompt_val)
camera_update = gr.update(visible=(selected_lora == "3D-Camera"))
# Image2 visibility/label
if lora_requires_two_images(selected_lora):
img2_update = gr.update(visible=True, label=image2_label_for_lora(selected_lora))
else:
img2_update = gr.update(visible=False, value=None, label='Upload Reference (Image 2)')
# Extra references routing default
if selected_lora in ('BFS-Best-FaceSwap', 'BFS-Best-FaceSwap-merge', 'AnyPose'):
extras_update = gr.update(value=True)
else:
extras_update = gr.update(value=extras_condition_only)
return prompt_update, img2_update, extras_update, camera_update
"""
old_on_lora = """
def on_lora_change_ui(selected_lora, current_prompt, extras_condition_only):
# Always provide the preset if selected
prompt_val = current_prompt
if selected_lora != NONE_LORA:
preset = LORA_PRESET_PROMPTS.get(selected_lora, "")
if preset:
prompt_val = preset
prompt_update = gr.update(value=prompt_val)
# Image2 visibility/label
if lora_requires_two_images(selected_lora):
img2_update = gr.update(visible=True, label=image2_label_for_lora(selected_lora))
else:
img2_update = gr.update(visible=False, value=None, label='Upload Reference (Image 2)')
# Extra references routing default
if selected_lora in ('BFS-Best-FaceSwap', 'BFS-Best-FaceSwap-merge', 'AnyPose'):
extras_update = gr.update(value=True)
else:
extras_update = gr.update(value=extras_condition_only)
return prompt_update, img2_update, extras_update
"""
# NOTE(review): this is an exact-text replacement; it only takes effect if
# old_on_lora (stripped) appears character for character in app.py. The
# outputs list of the caller is extended regardless of whether it did.
if "camera_update = gr.update(visible" not in content:
content = content.replace(old_on_lora.strip(), prompt_clear_logic.strip())
# We also need to update the caller
content = content.replace(
"outputs=[prompt, input_image_2, extras_condition_only],",
"outputs=[prompt, input_image_2, extras_condition_only, camera_container],"
)
# Inject the 3D Camera UI Block right below input_image_2 definition
camera_ui_block = """
input_image_2 = gr.Image(label="Upload Reference (Image 2)", type="pil", height=290, visible=False)
with gr.Column(visible=False) as camera_container:
gr.Markdown("### 🎮 3D Camera Control\\n*Drag handles: 🟢 Azimuth, 🩷 Elevation, 🟠 Distance*")
camera_3d = CameraControl3D(value={"azimuth": 0, "elevation": 0, "distance": 1.0}, elem_id="camera-3d-control")
gr.Markdown("### 🎚️ Slider Controls")
azimuth_slider = gr.Slider(label="Azimuth", minimum=0, maximum=315, step=45, value=0, info="0°=front, 90°=right, 180°=back, 270°=left")
elevation_slider = gr.Slider(label="Elevation", minimum=-30, maximum=60, step=30, value=0, info="-30°=low angle, 0°=eye, 60°=high angle")
distance_slider = gr.Slider(label="Distance", minimum=0.6, maximum=1.4, step=0.4, value=1.0, info="0.6=close, 1.0=medium, 1.4=wide")
"""
# NOTE(review): "\\n" is the two characters backslash and 'n', so strip()
# here removes those characters from the ends, not newlines - the leading
# and trailing line breaks of camera_ui_block are kept. Confirm intent.
if "camera_container:" not in content:
content = content.replace(
' input_image_2 = gr.Image(label="Upload Reference (Image 2)", type="pil", height=290, visible=False)',
camera_ui_block.strip("\\n")
)
# Inject the Events. We place them right before "run_button.click("
# camera_events ends with its own "run_button.click(" line, so the call
# opener consumed by the replacement below is emitted again after the
# injected handlers.
camera_events = """
# --- 3D Camera Events ---
def update_prompt_from_sliders(az, el, dist, curr_prompt):
return update_prompt_with_camera(az, el, dist, curr_prompt)
def sync_3d_to_sliders(cv, curr_prompt):
if cv and isinstance(cv, dict):
az = cv.get('azimuth', 0)
el = cv.get('elevation', 0)
dist = cv.get('distance', 1.0)
return az, el, dist, update_prompt_with_camera(az, el, dist, curr_prompt)
return gr.update(), gr.update(), gr.update(), gr.update()
def sync_sliders_to_3d(az, el, dist):
return {"azimuth": az, "elevation": el, "distance": dist}
def update_3d_image(img):
if img is None: return gr.update(imageUrl=None)
import base64
from io import BytesIO
buf = BytesIO()
img.save(buf, format="PNG")
durl = f"data:image/png;base64,{base64.b64encode(buf.getvalue()).decode()}"
return gr.update(imageUrl=durl)
for slider in [azimuth_slider, elevation_slider, distance_slider]:
slider.change(fn=update_prompt_from_sliders, inputs=[azimuth_slider, elevation_slider, distance_slider, prompt], outputs=[prompt])
slider.release(fn=sync_sliders_to_3d, inputs=[azimuth_slider, elevation_slider, distance_slider], outputs=[camera_3d])
camera_3d.change(fn=sync_3d_to_sliders, inputs=[camera_3d, prompt], outputs=[azimuth_slider, elevation_slider, distance_slider, prompt])
input_image_1.upload(fn=update_3d_image, inputs=[input_image_1], outputs=[camera_3d])
input_image_1.clear(fn=lambda: gr.update(imageUrl=None), outputs=[camera_3d])
run_button.click(
"""
if "def sync_3d_to_sliders" not in content:
content = content.replace(" run_button.click(\n", camera_events)
# Clear any bad \\n literals if they exist
content = content.replace("\\n demo.queue", "\n demo.queue")
# Define a `head` string holding the three.js <script> tag and pass it to
# launch(); skipped when the text "head=" already occurs anywhere in the file.
if "head=" not in content:
content = content.replace(
"demo.queue(max_size=30).launch(",
"""head = '<script src="https://cdnjs.cloudflare.com/ajax/libs/three.js/r128/three.min.js"></script>'
demo.queue(max_size=30).launch(head=head, """
)
with open(app_path, "w") as f:
f.write(content)
# --- END 3D CAMERA PATCHES ---
print("Successfully patched app.py.")
def install_dependencies():
    """Install REPO_DIR/requirements.txt using the venv's pip.

    When the requirements file is missing, a message is printed and nothing
    is installed.
    """
    requirements_path = os.path.join(REPO_DIR, "requirements.txt")
    if not os.path.exists(requirements_path):
        print(f"No requirements.txt found in {REPO_DIR}")
        return

    venv_pip = os.path.join(VENV_DIR, "bin", "pip")
    print("Installing dependencies from requirements.txt...")
    # Open question kept from the original author: torch 2.9.1 might not be
    # on PyPI and may need --extra-index-url; for L40S the latest stable
    # torch with CUDA 12.x is typically wanted. No extra index is passed.
    install_cmd = f"{venv_pip} install -r {requirements_path}"
    run_command(install_cmd)
def run_app():
    """Launch REPO_DIR/app.py with the venv's Python interpreter.

    The app runs with REPO_DIR as both the working directory and PYTHONPATH.
    If app.py is missing, a message is printed and nothing is started.
    """
    app_path = os.path.join(REPO_DIR, "app.py")
    if not os.path.exists(app_path):
        print(f"App file not found: {app_path}")
        return

    venv_python = os.path.join(VENV_DIR, "bin", "python")
    print(f"Starting app: {app_path}")
    # No bind address is passed from here; only PYTHONPATH is added to the
    # child's environment.
    launch_cmd = f"{venv_python} {app_path}"
    run_command(launch_cmd, cwd=REPO_DIR, env={"PYTHONPATH": REPO_DIR})
def main():
    """Run every setup step in order, without starting the app.

    Aborts with an error message when WORKSPACE_DIR does not exist.
    """
    if not os.path.exists(WORKSPACE_DIR):
        print(f"Error: {WORKSPACE_DIR} not found. Ensure this is a RunPod with persistent storage.")
        return

    setup_steps = (
        ensure_dirs,
        setup_venv,
        install_git_xet,
        install_hf_cli,
        download_space,
        patch_app,
        install_dependencies,
    )
    for step in setup_steps:
        step()

    # run_app is deliberately left out so the script can be updated and
    # re-run without launching the app each time.
    print("Setup tasks completed. Run with 'run' argument to start the app.")
if __name__ == "__main__":
    # A first argument of exactly "run" starts the app; anything else, or no
    # argument at all, performs the setup.
    wants_run = sys.argv[1:2] == ["run"]
    entry_point = run_app if wants_run else main
    entry_point()
|