#!/usr/bin/env python3
"""
Unified UGC Platform + Face Enhancer for Hugging Face Spaces
Combines all functionality into a single ComfyUI-powered space
"""
# Apply Gradio patch before importing gradio (if needed)
try:
from gradio_patch import apply_gradio_patch
apply_gradio_patch()
except Exception:
pass # Patch is optional, continue if it fails
import gradio as gr
import json
import numpy as np
from PIL import Image
import requests
import io
import base64
import time
import os
import sys
import subprocess
import threading
from typing import Dict, List, Optional, Tuple, Any
from pathlib import Path
import traceback
from datetime import datetime
import torch
import uuid
# Check if we're in Hugging Face Spaces
IS_SPACES = os.environ.get("SPACE_ID") is not None
REBUILD_MARKER = "2025-01-06-v3" # Force rebuild with Gradio 4.37.2
# Setup working directory
if IS_SPACES:
WORK_DIR = Path("/home/user/app")
else:
WORK_DIR = Path.cwd()
# Initialize startup diagnostics
STARTUP_LOG = []
def log_startup(message: str, level: str = "INFO"):
"""Log startup messages with timestamp"""
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
entry = f"[{timestamp}] [{level}] {message}"
STARTUP_LOG.append(entry)
print(entry)
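# Run the app from WORK_DIR so any relative paths resolve consistently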
os.chdir(WORK_DIR)
# Early setup check for HF Spaces
if IS_SPACES:
    log_startup("🌐 Running in Hugging Face Spaces")
    log_startup(f"📁 Working directory: {WORK_DIR}")
    log_startup(f"📂 Files in directory: {list(WORK_DIR.iterdir())}")
    log_startup(f"🐍 Python version: {sys.version}")
    log_startup(f"🔥 CUDA available: {torch.cuda.is_available()}")
    log_startup(f"🔄 Code version: {REBUILD_MARKER}")
# Check if ComfyUI is already available (from Docker build)
if Path("/app/comfyui/ComfyUI").exists():
log_startup("βœ… ComfyUI pre-installed in Docker")
# Create symlink if not exists
comfyui_link = WORK_DIR / "comfyui"
if not comfyui_link.exists():
os.symlink("/app/comfyui", str(comfyui_link))
log_startup("βœ… Created ComfyUI symlink")
elif not (WORK_DIR / "comfyui/ComfyUI").exists():
log_startup("⚠️ ComfyUI not found, will set up if needed", "WARNING")
# Setup function
def setup_environment():
"""Run setup script to clone repositories"""
setup_script = WORK_DIR / "setup.sh"
if not setup_script.exists():
log_startup("❌ setup.sh not found!", "ERROR")
return False
try:
        log_startup(f"🔧 Running setup script: {setup_script}")
# Make script executable
os.chmod(str(setup_script), 0o755)
# Run setup
result = subprocess.run(
['bash', str(setup_script)],
capture_output=True,
text=True,
cwd=str(WORK_DIR)
)
if result.returncode == 0:
            log_startup("✅ Setup completed successfully")
return True
else:
log_startup(f"❌ Setup failed with return code {result.returncode}", "ERROR")
log_startup(f"stdout: {result.stdout}", "ERROR")
log_startup(f"stderr: {result.stderr}", "ERROR")
return False
except Exception as e:
log_startup(f"❌ Failed to run setup script: {e}", "ERROR")
log_startup(f"Traceback: {traceback.format_exc()}", "ERROR")
return False
def download_models():
"""Download the full 'Authenticity Stack' for UGC from Hugging Face Hub."""
from huggingface_hub import hf_hub_download
log_startup("=== πŸ“₯ Starting Model Download Process (Authenticity Stack) ===")
models_to_download = [
# --- 1. Foundation Checkpoints ---
# RealVisXL for UGC style
("SG161222/RealVisXL_V4.0", "RealVisXL_V4.0.safetensors", "comfyui/ComfyUI/models/checkpoints"),
# SDXL base model for other workflows
("stabilityai/stable-diffusion-xl-base-1.0", "sd_xl_base_1.0.safetensors", "comfyui/ComfyUI/models/checkpoints"),
# --- 2. VAE ---
("stabilityai/sdxl-vae", "sdxl_vae.safetensors", "comfyui/ComfyUI/models/vae"),
# --- 3. Style LoRA (The "Camera") ---
# Using philz1337x's epicrealism repository which has the exact file we need
("philz1337x/epicrealism", "epicrealism_naturalSinRC1VAE.safetensors", "comfyui/ComfyUI/models/loras/epiCRealism - Natural photographic.safetensors"),
# --- 4. Detail LoRA (The "Texture") ---
# Using add-detail-xl from PvDeep which is a popular detail tweaker for SDXL
("PvDeep/Add-Detail-XL", "add-detail-xl.safetensors", "comfyui/ComfyUI/models/loras/detail_tweaker_xl_v2.safetensors"),
# --- 5. Finishing LoRA (The "Film") ---
# Using the film grain LoRA from artificialguybr
("artificialguybr/filmgrain-redmond-filmgrain-lora-for-sdxl", "FilmGrainRedmond-FilmGrain-FilmGrainAF.safetensors", "comfyui/ComfyUI/models/loras/film_grain_helper_sdxl.safetensors")
]
    for repo_id, filename, dest_path in models_to_download:
        # dest_path is either a target directory or a full path that renames the file
        if Path(dest_path).suffix:
            full_path = WORK_DIR / Path(dest_path)
            local_dir = str(full_path.parent)
        else:
            full_path = WORK_DIR / dest_path / filename
            local_dir = str(WORK_DIR / dest_path)
        if not full_path.exists():
            log_startup(f"Downloading {filename} from {repo_id}...")
            try:
                full_path.parent.mkdir(parents=True, exist_ok=True)
                downloaded_path = hf_hub_download(
                    repo_id=repo_id,
                    filename=filename,
                    local_dir=local_dir,
                    local_dir_use_symlinks=False,
                    resume_download=True
                )
                # hf_hub_download saves the file under its repository name;
                # rename it when a custom destination filename was requested
                if Path(downloaded_path) != full_path:
                    Path(downloaded_path).rename(full_path)
                log_startup(f"✅ Downloaded {filename}")
            except Exception as e:
                log_startup(f"⚠️ Failed to download {filename}: {e}", "WARNING")
        else:
            log_startup(f"✅ Found existing model: {full_path.name}")
log_startup("=== Model Download Process Complete ===")
# Configure ComfyUI paths
COMFYUI_PATH = WORK_DIR / "comfyui/ComfyUI"
MODEL_DIR = WORK_DIR / "models"
OUTPUT_DIR = WORK_DIR / "outputs"
TEMP_DIR = WORK_DIR / "temp"
# Ensure directories exist
for dir_path in [MODEL_DIR, OUTPUT_DIR, TEMP_DIR]:
dir_path.mkdir(parents=True, exist_ok=True)
# Add ComfyUI to Python path if it exists
if COMFYUI_PATH.exists():
sys.path.insert(0, str(COMFYUI_PATH))
    log_startup(f"✅ Added ComfyUI to Python path: {COMFYUI_PATH}")
# Configuration
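# Each style key maps to a ComfyUI workflow JSON expected under WORK_DIR / "workflows"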
DEFAULT_WORKFLOW_CONFIGS = {
"ugc_authentic": {
"name": "Authentic UGC Style (Recommended)",
"workflow_file": "organic_portrait_ugc.json"
},
"portrait": {
"name": "High-Resolution Portrait",
"workflow_file": "organic_portrait_workflow.json"
},
"full_body": {
"name": "Full Body Portrait",
"workflow_file": "full_body_workflow.json"
},
"street": {
"name": "Street Photography Style",
"workflow_file": "street_photo_workflow.json"
},
"flux": {
"name": "FLUX Model",
"workflow_file": "flux_workflow.json"
}
}
FACE_ENHANCEMENT_WORKFLOW = "face_enhancement_workflow.json"
class ComfyUIManager:
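    """Owns the ComfyUI subprocess: environment setup, server lifecycle,
    HTTP prompt queuing, and retrieval of generated images."""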
def __init__(self):
"""Initialize ComfyUI Manager"""
self.server_process = None
self.server_url = "http://127.0.0.1:8188"
self.ws_url = "ws://127.0.0.1:8188/ws"
        # A stable client id identifies this app instance to the ComfyUI API
        self.client_id = str(uuid.uuid4())
self.setup_complete = False
self.models_downloaded = False
self.server_log = []
self.max_log_lines = 500
log_startup("ComfyUI Manager initialized")
def ensure_setup(self):
"""Ensure ComfyUI is set up"""
if self.setup_complete:
return True
if not COMFYUI_PATH.exists():
            log_startup("🔧 ComfyUI not found, running setup...")
if not setup_environment():
return False
self.setup_complete = True
return True
def _log_output(self, pipe, prefix):
"""Log output from a pipe"""
try:
while True:
line = pipe.readline()
if not line:
break
line = line.strip()
if line:
log_entry = f"{prefix}: {line}"
log_startup(log_entry)
self.server_log.append(log_entry)
if len(self.server_log) > self.max_log_lines:
self.server_log.pop(0)
except Exception as e:
log_startup(f"Error reading {prefix}: {e}", "ERROR")
def start_server(self):
"""Start ComfyUI server with simplified logic"""
if self.server_process and self.server_process.poll() is None:
log_startup("ComfyUI server already running")
return True
        log_startup("🚀 Starting ComfyUI server...")
try:
# Build command without obsolete --use-legacy-frontend flag
cmd = [
sys.executable,
"main.py",
"--listen", "127.0.0.1",
"--port", "8188",
"--disable-auto-launch"
]
# Log the exact command for debugging
log_startup(f"Server command: {' '.join(cmd)}")
            log_startup("✅ Confirmed: NO --use-legacy-frontend flag in command")
self.server_process = subprocess.Popen(
cmd,
cwd=str(COMFYUI_PATH),
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
bufsize=1,
env={**os.environ, "PYTHONUNBUFFERED": "1"}
)
            # Drain stdout/stderr in background daemon threads so the pipes
            # never fill up and block the ComfyUI process
stdout_thread = threading.Thread(
target=self._log_output,
args=(self.server_process.stdout, "STDOUT"),
daemon=True
)
stderr_thread = threading.Thread(
target=self._log_output,
args=(self.server_process.stderr, "STDERR"),
daemon=True
)
stdout_thread.start()
stderr_thread.start()
# Wait for server to start
log_startup("⏳ Waiting for ComfyUI server to be ready...")
start_time = time.time()
timeout = 120 # 2 minutes timeout
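            # Poll /system_stats until the server answers; a dead child process means startup failed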
while time.time() - start_time < timeout:
if self.server_process.poll() is not None:
log_startup("❌ Server process terminated unexpectedly", "ERROR")
return False
try:
response = requests.get(f"{self.server_url}/system_stats", timeout=2)
if response.status_code == 200:
                        log_startup("✅ ComfyUI server is ready!")
return True
except requests.exceptions.RequestException:
pass
time.sleep(2)
log_startup("❌ Server startup timeout", "ERROR")
return False
except Exception as e:
log_startup(f"❌ Failed to start server: {e}", "ERROR")
log_startup(f"Traceback: {traceback.format_exc()}", "ERROR")
return False
def stop_server(self):
"""Stop ComfyUI server"""
if self.server_process:
log_startup("Stopping ComfyUI server...")
self.server_process.terminate()
try:
self.server_process.wait(timeout=10)
except subprocess.TimeoutExpired:
log_startup("Server didn't terminate gracefully, forcing kill...")
self.server_process.kill()
self.server_process.wait() # Wait for the kill to complete
finally:
self.server_process = None
log_startup("ComfyUI server stopped")
def check_models(self):
"""Check available models"""
try:
response = requests.get(f"{self.server_url}/object_info", timeout=10)
if response.status_code == 200:
data = response.json()
if "CheckpointLoaderSimple" in data:
models = data["CheckpointLoaderSimple"]["input"]["required"]["ckpt_name"][0]
                    log_startup(f"📦 Available models: {models}")
return models
except Exception as e:
            log_startup(f"⚠️ Could not fetch models: {e}", "WARNING")
return []
def load_workflow(self, workflow_file: str) -> Optional[Dict]:
"""Load workflow from file"""
workflow_path = WORK_DIR / "workflows" / workflow_file
if not workflow_path.exists():
log_startup(f"❌ Workflow not found: {workflow_path}", "ERROR")
return None
try:
with open(workflow_path, 'r') as f:
return json.load(f)
except Exception as e:
log_startup(f"❌ Failed to load workflow: {e}", "ERROR")
return None
def queue_prompt(self, workflow: Dict) -> Optional[str]:
"""Queue a prompt and return the prompt ID"""
try:
p = {"prompt": workflow, "client_id": self.client_id}
response = requests.post(f"{self.server_url}/prompt", json=p)
if response.status_code == 200:
return response.json().get('prompt_id')
except Exception as e:
log_startup(f"❌ Failed to queue prompt: {e}", "ERROR")
return None
def get_history(self, prompt_id: str) -> Optional[Dict]:
"""Get generation history"""
try:
response = requests.get(f"{self.server_url}/history/{prompt_id}")
if response.status_code == 200:
return response.json()
except Exception as e:
log_startup(f"❌ Failed to get history: {e}", "ERROR")
return None
def wait_for_completion(self, prompt_id: str, timeout: int = 300) -> bool:
"""Wait for prompt completion"""
start_time = time.time()
while time.time() - start_time < timeout:
history = self.get_history(prompt_id)
if history and prompt_id in history:
if history[prompt_id].get('outputs'):
return True
time.sleep(1)
return False
def get_output_images(self, prompt_id: str) -> List[Image.Image]:
"""Get output images from completed prompt"""
images = []
history = self.get_history(prompt_id)
if not history or prompt_id not in history:
return images
outputs = history[prompt_id].get('outputs', {})
for node_id, node_output in outputs.items():
if 'images' in node_output:
for image_info in node_output['images']:
filename = image_info['filename']
subfolder = image_info.get('subfolder', '')
                    # Fetch the rendered image from ComfyUI's /view endpoint
                    response = requests.get(
                        f"{self.server_url}/view",
                        params={
                            'filename': filename,
                            'subfolder': subfolder,
                            'type': image_info.get('type', 'output')
                        }
                    )
if response.status_code == 200:
image = Image.open(io.BytesIO(response.content))
images.append(image)
return images
# Global manager instance
manager = ComfyUIManager()
def process_image(
style: str,
prompt: str,
negative_prompt: str = "",
seed: int = -1,
steps: int = 25,
cfg_scale: float = 7.0,
width: int = 1024,
height: int = 1024,
progress=gr.Progress(track_tqdm=True)
) -> List[Image.Image]:
"""Process image generation request"""
progress(0, desc="Initializing...")
# Ensure server is running
if not manager.ensure_setup():
raise gr.Error("Failed to set up ComfyUI")
if not manager.start_server():
raise gr.Error("Failed to start ComfyUI server")
progress(0.2, desc="Loading workflow...")
# Load workflow
workflow_config = DEFAULT_WORKFLOW_CONFIGS.get(style)
if not workflow_config:
raise gr.Error(f"Unknown style: {style}")
workflow = manager.load_workflow(workflow_config["workflow_file"])
if not workflow:
raise gr.Error("Failed to load workflow")
# Update workflow parameters
if seed == -1:
seed = int(time.time() * 1000) % 1000000
progress(0.3, desc="Preparing generation...")
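    # The workflow is in ComfyUI's API (prompt) format: a dict keyed by node id,
    # where each node carries a "class_type" and an "inputs" mapping. Positive
    # and negative prompts are told apart by the node's _meta title.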
# Find and update nodes in workflow
for node_id, node in workflow.items():
if node.get("class_type") == "CLIPTextEncode":
if "positive" in str(node.get("_meta", {}).get("title", "")).lower():
node["inputs"]["text"] = prompt
elif "negative" in str(node.get("_meta", {}).get("title", "")).lower():
node["inputs"]["text"] = negative_prompt
elif node.get("class_type") == "KSampler":
node["inputs"]["seed"] = seed
node["inputs"]["steps"] = steps
node["inputs"]["cfg"] = cfg_scale
elif node.get("class_type") == "EmptyLatentImage":
node["inputs"]["width"] = width
node["inputs"]["height"] = height
# Queue prompt
prompt_id = manager.queue_prompt(workflow)
if not prompt_id:
raise gr.Error("Failed to queue generation")
progress(0.4, desc="Generating image...")
# Wait for completion
if not manager.wait_for_completion(prompt_id, timeout=300):
raise gr.Error("Generation timeout")
progress(0.9, desc="Retrieving results...")
# Get output images
images = manager.get_output_images(prompt_id)
if not images:
raise gr.Error("No images generated")
progress(1.0, desc="Complete!")
return images
def enhance_face(
image: Image.Image,
enhancement_level: float = 0.5,
progress=gr.Progress(track_tqdm=True)
) -> Optional[Image.Image]:
"""Enhance face in uploaded image"""
progress(0, desc="Initializing face enhancement...")
# Validate input image
if image is None:
raise gr.Error("No image provided")
# Check image size to prevent DoS
MAX_IMAGE_SIZE = 4096 # Maximum dimension
if image.width > MAX_IMAGE_SIZE or image.height > MAX_IMAGE_SIZE:
raise gr.Error(f"Image too large. Maximum dimension is {MAX_IMAGE_SIZE}px")
# Ensure server is running
if not manager.ensure_setup():
raise gr.Error("Failed to set up ComfyUI")
if not manager.start_server():
raise gr.Error("Failed to start ComfyUI server")
progress(0.2, desc="Uploading image...")
# Upload image to ComfyUI with unique filename
temp_filename = f"input_{uuid.uuid4().hex}_{int(time.time())}.png"
temp_path = TEMP_DIR / temp_filename
image.save(temp_path)
# Upload to ComfyUI
with open(temp_path, 'rb') as f:
files = {'image': ('image.png', f, 'image/png')}
response = requests.post(f"{manager.server_url}/upload/image", files=files)
if response.status_code != 200:
raise gr.Error("Failed to upload image")
upload_data = response.json()
uploaded_filename = upload_data['name']
progress(0.3, desc="Loading enhancement workflow...")
# Load face enhancement workflow
workflow = manager.load_workflow(FACE_ENHANCEMENT_WORKFLOW)
if not workflow:
raise gr.Error("Failed to load face enhancement workflow")
# Update workflow with uploaded image
for node_id, node in workflow.items():
if node.get("class_type") == "LoadImage":
node["inputs"]["image"] = uploaded_filename
progress(0.4, desc="Processing face enhancement...")
# Queue and process
prompt_id = manager.queue_prompt(workflow)
if not prompt_id:
raise gr.Error("Failed to queue enhancement")
if not manager.wait_for_completion(prompt_id, timeout=120):
raise gr.Error("Enhancement timeout")
progress(0.9, desc="Retrieving enhanced image...")
# Get result
images = manager.get_output_images(prompt_id)
if not images:
raise gr.Error("No enhanced image generated")
# Clean up temp file
try:
temp_path.unlink(missing_ok=True)
except Exception as e:
log_startup(f"Warning: Failed to clean up temp file: {e}", "WARNING")
progress(1.0, desc="Enhancement complete!")
return images[0]
def create_gradio_interface():
"""Create the Gradio interface"""
with gr.Blocks(title="Unified UGC Platform", theme=gr.themes.Soft()) as app:
gr.Markdown("""
# 🎨 Unified UGC Platform
Generate high-quality images and enhance faces using state-of-the-art AI models.
""")
with gr.Tabs():
# Image Generation Tab
            with gr.TabItem("🖼️ Generate Images"):
with gr.Row():
with gr.Column():
style = gr.Dropdown(
choices=[(v["name"], k) for k, v in DEFAULT_WORKFLOW_CONFIGS.items()],
value="ugc_authentic",
label="Style"
)
prompt = gr.Textbox(
label="Prompt",
placeholder="e.g., selfie of a woman in her bedroom, iPhone photo, natural light, casual outfit, authentic moment",
lines=3
)
negative_prompt = gr.Textbox(
label="Negative Prompt",
placeholder="e.g., perfect skin, studio lighting, professional photography, airbrushed, plastic skin",
lines=2
)
with gr.Row():
seed = gr.Number(label="Seed", value=-1, precision=0)
steps = gr.Slider(label="Steps", minimum=1, maximum=50, value=25, step=1)
with gr.Row():
cfg_scale = gr.Slider(label="CFG Scale", minimum=1, maximum=20, value=5, step=0.5)
with gr.Row():
width = gr.Slider(label="Width", minimum=256, maximum=2048, value=1024, step=64)
height = gr.Slider(label="Height", minimum=256, maximum=2048, value=1024, step=64)
                        generate_btn = gr.Button("🎨 Generate", variant="primary")
with gr.Column():
output_gallery = gr.Gallery(
label="Generated Images",
show_label=True,
elem_id="gallery",
columns=2,
rows=2,
height="600px"
)
# Wire up generation
generate_btn.click(
fn=process_image,
inputs=[style, prompt, negative_prompt, seed, steps, cfg_scale, width, height],
outputs=output_gallery
)
# Face Enhancement Tab
with gr.TabItem("✨ Enhance Faces"):
with gr.Row():
with gr.Column():
input_image = gr.Image(
label="Upload Image",
type="pil"
)
enhancement_level = gr.Slider(
label="Enhancement Level",
minimum=0,
maximum=1,
value=0.5,
step=0.1
)
enhance_btn = gr.Button("✨ Enhance Face", variant="primary")
with gr.Column():
output_image = gr.Image(
label="Enhanced Image",
type="pil"
)
# Wire up enhancement
enhance_btn.click(
fn=enhance_face,
inputs=[input_image, enhancement_level],
outputs=output_image
)
# System Status Tab
            with gr.TabItem("🔧 System Status"):
with gr.Column():
gr.Markdown("### Startup Log")
log_display = gr.Textbox(
value=lambda: "\n".join(STARTUP_LOG[-50:]),
label="Recent Logs",
lines=20,
max_lines=30,
interactive=False
)
gr.Markdown("### Server Log")
server_log_display = gr.Textbox(
value=lambda: "\n".join(manager.server_log[-50:]),
label="ComfyUI Server Logs",
lines=20,
max_lines=30,
interactive=False
)
                refresh_btn = gr.Button("🔄 Refresh Logs")
def refresh_logs():
return (
"\n".join(STARTUP_LOG[-50:]),
"\n".join(manager.server_log[-50:])
)
refresh_btn.click(
fn=refresh_logs,
outputs=[log_display, server_log_display]
)
gr.Markdown("""
---
Made with ❤️ using ComfyUI and Gradio
""")
return app
# Add cleanup handler
import atexit
import signal
def cleanup_on_exit():
"""Clean up resources on exit"""
log_startup("Cleaning up resources...")
manager.stop_server()
log_startup("Cleanup complete")
# Register cleanup handlers
atexit.register(cleanup_on_exit)
def _handle_sigterm(signum, frame):
    # Exit cleanly on SIGTERM (sent by Spaces on shutdown) so the atexit
    # handler gets a chance to stop the ComfyUI server
    sys.exit(0)
signal.signal(signal.SIGTERM, _handle_sigterm)
# Main execution
if __name__ == "__main__":
log_startup("=" * 50)
    log_startup("🚀 STARTING UNIFIED UGC PLATFORM")
log_startup("=" * 50)
try:
# Step 1: Ensure environment (cloning repos) is set up
if not manager.ensure_setup():
log_startup("❌ Initial setup failed", "ERROR")
raise RuntimeError("Setup failed")
# Step 2: Download all models after ComfyUI is set up
        log_startup("🎨 Downloading Authenticity Stack models...")
        download_models()
        manager.models_downloaded = True
        # Step 3: Start server
        log_startup("🔧 Starting ComfyUI server...")
if not manager.start_server():
log_startup("❌ Failed to start ComfyUI server", "ERROR")
raise RuntimeError("Server startup failed")
# Check models
models = manager.check_models()
if models:
            log_startup(f"✅ Found {len(models)} models")
        # Create and launch Gradio app
        log_startup("🎨 Creating Gradio interface...")
        app = create_gradio_interface()
        log_startup("🌟 Launching application...")
app.launch(
server_name="0.0.0.0" if IS_SPACES else "127.0.0.1",
server_port=7860,
            share=IS_SPACES
)
except Exception as e:
log_startup(f"❌ FATAL ERROR: {e}", "ERROR")
log_startup(f"Traceback: {traceback.format_exc()}", "ERROR")
# Create error display app
with gr.Blocks() as error_app:
gr.Markdown("# ❌ Application Failed to Start")
gr.Markdown(f"**Error:** {str(e)}")
gr.Textbox(
value="\n".join(STARTUP_LOG),
label="Startup Log",
lines=30,
interactive=False
)
error_app.launch(
server_name="0.0.0.0" if IS_SPACES else "127.0.0.1",
server_port=7860,
            share=IS_SPACES
)