#!/usr/bin/env python3
"""
Unified UGC Platform + Face Enhancer for Hugging Face Spaces
Combines all functionality into a single ComfyUI-powered space

The script starts a local ComfyUI server as a subprocess, talks to it over
its HTTP API (queue prompt / poll history / fetch images) and exposes image
generation and face enhancement through a Gradio interface.
"""

import atexit
import base64
import io
import json
import os
import select
import signal
import subprocess
import sys
import threading
import time
import traceback
import uuid
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Any

# Apply Gradio patch before importing gradio (if needed)
try:
    from gradio_patch import apply_gradio_patch
    apply_gradio_patch()
except Exception:
    pass  # Patch is optional, continue if it fails

import gradio as gr
import numpy as np
import requests
import torch
from PIL import Image

# Check if we're in Hugging Face Spaces
IS_SPACES = os.environ.get("SPACE_ID") is not None
REBUILD_MARKER = "2025-01-06-v3"  # Force rebuild with Gradio 4.37.2

# Setup working directory
if IS_SPACES:
    WORK_DIR = Path("/home/user/app")
else:
    WORK_DIR = Path.cwd()

# Initialize startup diagnostics
STARTUP_LOG: List[str] = []

# Every line printed by the ComfyUI subprocess is routed through
# log_startup(), so the in-memory log must be bounded for a long-running app.
MAX_STARTUP_LOG_LINES = 5000

# Timeout (seconds) for ordinary HTTP calls to the local ComfyUI server.
HTTP_TIMEOUT = 30


def log_startup(message: str, level: str = "INFO"):
    """Log startup messages with timestamp.

    The formatted entry is appended to ``STARTUP_LOG`` (shown in the
    "System Status" tab) and printed to stdout.

    Args:
        message: Text to record.
        level: Severity label embedded in the entry (e.g. "INFO", "ERROR").
    """
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    entry = f"[{timestamp}] [{level}] {message}"
    STARTUP_LOG.append(entry)
    # Trim in place so existing references to the list stay valid.
    if len(STARTUP_LOG) > MAX_STARTUP_LOG_LINES:
        del STARTUP_LOG[:-MAX_STARTUP_LOG_LINES]
    print(entry)


os.chdir(WORK_DIR)

# Early setup check for HF Spaces
if IS_SPACES:
    log_startup("🌐 Running in Hugging Face Spaces")
    log_startup(f"📁 Working directory: {WORK_DIR}")
    log_startup(f"📂 Files in directory: {list(WORK_DIR.iterdir())}")
    log_startup(f"🐍 Python version: {sys.version}")
    # The value logged is CUDA availability, so label it as such.
    log_startup(f"🔥 CUDA available: {torch.cuda.is_available()}")
    log_startup(f"🔄 Code version: {REBUILD_MARKER}")

    # Check if ComfyUI is already available (from Docker build)
    if Path("/app/comfyui/ComfyUI").exists():
        log_startup("✅ ComfyUI pre-installed in Docker")

        # Create symlink if not exists. exists() follows symlinks and is
        # False for a dangling link, so also test is_symlink() to avoid a
        # FileExistsError from os.symlink at import time.
        comfyui_link = WORK_DIR / "comfyui"
        if not comfyui_link.exists() and not comfyui_link.is_symlink():
            os.symlink("/app/comfyui", str(comfyui_link))
            log_startup("✅ Created ComfyUI symlink")
    elif not (WORK_DIR / "comfyui/ComfyUI").exists():
        log_startup("⚠️ ComfyUI not found, will set up if needed", "WARNING")


# Setup function
def setup_environment():
    """Run setup script to clone repositories.

    Executes ``setup.sh`` from the working directory with bash.

    Returns:
        True when the script exits with code 0; False when the script is
        missing, exits non-zero, or cannot be launched.
    """
    setup_script = WORK_DIR / "setup.sh"

    if not setup_script.exists():
        log_startup("❌ setup.sh not found!", "ERROR")
        return False

    try:
        log_startup(f"🔧 Running setup script: {setup_script}")

        # Make script executable
        os.chmod(str(setup_script), 0o755)

        # Run setup
        result = subprocess.run(
            ['bash', str(setup_script)],
            capture_output=True,
            text=True,
            cwd=str(WORK_DIR)
        )
    except Exception as e:
        log_startup(f"❌ Failed to run setup script: {e}", "ERROR")
        log_startup(f"Traceback: {traceback.format_exc()}", "ERROR")
        return False

    if result.returncode == 0:
        log_startup("✅ Setup completed successfully")
        return True

    log_startup(f"❌ Setup failed with return code {result.returncode}", "ERROR")
    log_startup(f"stdout: {result.stdout}", "ERROR")
    log_startup(f"stderr: {result.stderr}", "ERROR")
    return False


def download_models():
    """Download the full 'Authenticity Stack' for UGC from Hugging Face Hub.

    Each entry is ``(repo_id, filename, dest_path)``. When ``dest_path`` has
    a file suffix it names the final file (possibly different from the
    repository file name); otherwise it names the directory and the
    repository file name is kept. Files that already exist at their final
    location are skipped. A failed download is logged as a warning and does
    not abort the remaining downloads.
    """
    from huggingface_hub import hf_hub_download

    log_startup("=== 📥 Starting Model Download Process (Authenticity Stack) ===")

    models_to_download = [
        # --- 1. Foundation Checkpoints ---
        # RealVisXL for UGC style
        ("SG161222/RealVisXL_V4.0", "RealVisXL_V4.0.safetensors",
         "comfyui/ComfyUI/models/checkpoints"),
        # SDXL base model for other workflows
        ("stabilityai/stable-diffusion-xl-base-1.0", "sd_xl_base_1.0.safetensors",
         "comfyui/ComfyUI/models/checkpoints"),

        # --- 2. VAE ---
        ("stabilityai/sdxl-vae", "sdxl_vae.safetensors",
         "comfyui/ComfyUI/models/vae"),

        # --- 3. Style LoRA (The "Camera") ---
        # Using philz1337x's epicrealism repository which has the exact file we need
        ("philz1337x/epicrealism", "epicrealism_naturalSinRC1VAE.safetensors",
         "comfyui/ComfyUI/models/loras/epiCRealism - Natural photographic.safetensors"),

        # --- 4. Detail LoRA (The "Texture") ---
        # Using add-detail-xl from PvDeep which is a popular detail tweaker for SDXL
        ("PvDeep/Add-Detail-XL", "add-detail-xl.safetensors",
         "comfyui/ComfyUI/models/loras/detail_tweaker_xl_v2.safetensors"),

        # --- 5. Finishing LoRA (The "Film") ---
        # Using the film grain LoRA from artificialguybr
        ("artificialguybr/filmgrain-redmond-filmgrain-lora-for-sdxl",
         "FilmGrainRedmond-FilmGrain-FilmGrainAF.safetensors",
         "comfyui/ComfyUI/models/loras/film_grain_helper_sdxl.safetensors")
    ]

    for repo_id, filename, dest_path in models_to_download:
        destination = WORK_DIR / dest_path
        # A suffix means dest_path is the final file; otherwise it is a directory.
        full_path = destination if destination.suffix else destination / filename
        local_dir = full_path.parent

        if full_path.exists():
            log_startup(f"✅ Found existing model: {full_path.name}")
            continue

        log_startup(f"Downloading {filename} from {repo_id}...")
        try:
            local_dir.mkdir(parents=True, exist_ok=True)
            downloaded = Path(hf_hub_download(
                repo_id=repo_id,
                filename=filename,
                local_dir=str(local_dir),
                local_dir_use_symlinks=False,
                resume_download=True
            ))
            # hf_hub_download stores the file under its repository name. For
            # entries with a custom destination name, rename it so that the
            # expected file exists and the existence check above succeeds on
            # the next start.
            if downloaded.resolve() != full_path.resolve():
                os.replace(downloaded, full_path)
            log_startup(f"✅ Downloaded {filename}")
        except Exception as e:
            log_startup(f"⚠️ Failed to download {filename}: {e}", "WARNING")

    log_startup("=== Model Download Process Complete ===")


# Configure ComfyUI paths
COMFYUI_PATH = WORK_DIR / "comfyui/ComfyUI"
MODEL_DIR = WORK_DIR / "models"
OUTPUT_DIR = WORK_DIR / "outputs"
TEMP_DIR = WORK_DIR / "temp"

# Ensure directories exist
for dir_path in [MODEL_DIR, OUTPUT_DIR, TEMP_DIR]:
    dir_path.mkdir(parents=True, exist_ok=True)

# Add ComfyUI to Python path if it exists
if COMFYUI_PATH.exists():
    sys.path.insert(0, str(COMFYUI_PATH))
    log_startup(f"✅ Added ComfyUI to Python path: {COMFYUI_PATH}")

# Configuration
DEFAULT_WORKFLOW_CONFIGS = {
    "ugc_authentic": {
        "name": "Authentic UGC Style (Recommended)",
        "workflow_file": "organic_portrait_ugc.json"
    },
    "portrait": {
        "name": "High-Resolution Portrait",
        "workflow_file": "organic_portrait_workflow.json"
    },
    "full_body": {
        "name": "Full Body Portrait",
        "workflow_file": "full_body_workflow.json"
    },
    "street": {
        "name": "Street Photography Style",
        "workflow_file": "street_photo_workflow.json"
    },
    "flux": {
        "name": "FLUX Model",
        "workflow_file": "flux_workflow.json"
    }
}

FACE_ENHANCEMENT_WORKFLOW = "face_enhancement_workflow.json"


class ComfyUIManager:
    """Owns the ComfyUI subprocess and wraps the HTTP calls made to it."""

    def __init__(self):
        """Initialize ComfyUI Manager"""
        self.server_process = None
        self.server_url = "http://127.0.0.1:8188"
        self.ws_url = "ws://127.0.0.1:8188/ws"
        self.client_id = None
        self.setup_complete = False
        self.models_downloaded = False
        self.server_log = []
        self.max_log_lines = 500

        log_startup("ComfyUI Manager initialized")

    def ensure_setup(self):
        """Ensure ComfyUI is set up.

        Runs ``setup_environment()`` when the ComfyUI directory is missing.

        Returns:
            True when ComfyUI is present or setup succeeded, False otherwise.
        """
        if self.setup_complete:
            return True

        if not COMFYUI_PATH.exists():
            log_startup("🔧 ComfyUI not found, running setup...")
            if not setup_environment():
                return False

        self.setup_complete = True
        return True

    def _log_output(self, pipe, prefix):
        """Log output from a pipe.

        Reads ``pipe`` line by line until EOF, forwarding each non-empty
        line to ``log_startup`` and keeping the most recent
        ``max_log_lines`` entries in ``server_log``.
        """
        try:
            for raw_line in iter(pipe.readline, ''):
                line = raw_line.strip()
                if not line:
                    continue
                log_entry = f"{prefix}: {line}"
                log_startup(log_entry)
                self.server_log.append(log_entry)
                if len(self.server_log) > self.max_log_lines:
                    del self.server_log[:-self.max_log_lines]
        except Exception as e:
            log_startup(f"Error reading {prefix}: {e}", "ERROR")

    def _is_ready(self) -> bool:
        """Return True when the server answers ``/system_stats`` with 200."""
        try:
            response = requests.get(f"{self.server_url}/system_stats", timeout=2)
        except requests.exceptions.RequestException:
            return False
        return response.status_code == 200

    def _wait_until_ready(self, timeout: float = 120) -> bool:
        """Poll the server until it is ready, exits, or ``timeout`` elapses."""
        start_time = time.time()
        while time.time() - start_time < timeout:
            if self.server_process is None or self.server_process.poll() is not None:
                log_startup("❌ Server process terminated unexpectedly", "ERROR")
                return False
            if self._is_ready():
                log_startup("✅ ComfyUI server is ready!")
                return True
            time.sleep(2)

        log_startup("❌ Server startup timeout", "ERROR")
        return False

    def start_server(self):
        """Start ComfyUI server with simplified logic.

        If a server process is already alive, its readiness is verified
        rather than assumed: a process that is still starting (for example
        after an earlier startup timeout) is waited for instead of being
        reported as running.

        Returns:
            True when the server answers HTTP requests, False otherwise.
        """
        if self.server_process and self.server_process.poll() is None:
            if self._is_ready():
                log_startup("ComfyUI server already running")
                return True
            log_startup("⏳ Waiting for ComfyUI server to be ready...")
            return self._wait_until_ready()

        log_startup("🚀 Starting ComfyUI server...")

        try:
            # Build command without obsolete --use-legacy-frontend flag
            cmd = [
                sys.executable,
                "main.py",
                "--listen", "127.0.0.1",
                "--port", "8188",
                "--disable-auto-launch"
            ]

            # Log the exact command for debugging
            log_startup(f"Server command: {' '.join(cmd)}")
            log_startup("✅ Confirmed: NO --use-legacy-frontend flag in command")

            self.server_process = subprocess.Popen(
                cmd,
                cwd=str(COMFYUI_PATH),
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                bufsize=1,
                env={**os.environ, "PYTHONUNBUFFERED": "1"}
            )

            # Start logging threads; draining both pipes also keeps the
            # child from blocking on a full pipe buffer.
            for pipe, prefix in (
                (self.server_process.stdout, "STDOUT"),
                (self.server_process.stderr, "STDERR"),
            ):
                threading.Thread(
                    target=self._log_output,
                    args=(pipe, prefix),
                    daemon=True
                ).start()

            # Wait for server to start (2 minutes timeout)
            log_startup("⏳ Waiting for ComfyUI server to be ready...")
            return self._wait_until_ready(timeout=120)

        except Exception as e:
            log_startup(f"❌ Failed to start server: {e}", "ERROR")
            log_startup(f"Traceback: {traceback.format_exc()}", "ERROR")
            return False

    def stop_server(self):
        """Stop ComfyUI server.

        Sends terminate, waits up to 10 seconds, then kills the process if
        it is still alive. Safe to call when no server is running.
        """
        if not self.server_process:
            return

        log_startup("Stopping ComfyUI server...")
        self.server_process.terminate()
        try:
            self.server_process.wait(timeout=10)
        except subprocess.TimeoutExpired:
            log_startup("Server didn't terminate gracefully, forcing kill...")
            self.server_process.kill()
            self.server_process.wait()  # Wait for the kill to complete
        finally:
            self.server_process = None
            log_startup("ComfyUI server stopped")

    def check_models(self):
        """Check available models.

        Returns:
            The checkpoint names reported by ``/object_info`` for
            ``CheckpointLoaderSimple``, or an empty list on any failure.
        """
        try:
            response = requests.get(f"{self.server_url}/object_info", timeout=10)
            if response.status_code == 200:
                data = response.json()
                if "CheckpointLoaderSimple" in data:
                    models = data["CheckpointLoaderSimple"]["input"]["required"]["ckpt_name"][0]
                    log_startup(f"📦 Available models: {models}")
                    return models
        except Exception as e:
            log_startup(f"⚠️ Could not fetch models: {e}", "WARNING")
        return []

    def load_workflow(self, workflow_file: str) -> Optional[Dict]:
        """Load workflow from file.

        Args:
            workflow_file: File name inside ``WORK_DIR / "workflows"``.

        Returns:
            The parsed JSON, or None when the file is missing or unreadable.
        """
        workflow_path = WORK_DIR / "workflows" / workflow_file

        if not workflow_path.exists():
            log_startup(f"❌ Workflow not found: {workflow_path}", "ERROR")
            return None

        try:
            with open(workflow_path, 'r') as f:
                return json.load(f)
        except Exception as e:
            log_startup(f"❌ Failed to load workflow: {e}", "ERROR")
            return None

    def queue_prompt(self, workflow: Dict) -> Optional[str]:
        """Queue a prompt and return the prompt ID.

        Returns:
            The ``prompt_id`` from the server response, or None when the
            request fails or is rejected (the rejection body is logged).
        """
        try:
            p = {"prompt": workflow, "client_id": self.client_id}
            response = requests.post(
                f"{self.server_url}/prompt", json=p, timeout=HTTP_TIMEOUT
            )
            if response.status_code == 200:
                return response.json().get('prompt_id')
            # The response body carries the server's explanation of why the
            # prompt was refused; without logging it the failure is opaque.
            log_startup(
                f"❌ Failed to queue prompt: HTTP {response.status_code}: "
                f"{response.text[:500]}",
                "ERROR",
            )
        except Exception as e:
            log_startup(f"❌ Failed to queue prompt: {e}", "ERROR")
        return None

    def get_history(self, prompt_id: str) -> Optional[Dict]:
        """Get generation history.

        Returns:
            The parsed ``/history/<prompt_id>`` response, or None on a
            non-200 status or request failure.
        """
        try:
            response = requests.get(
                f"{self.server_url}/history/{prompt_id}", timeout=HTTP_TIMEOUT
            )
            if response.status_code == 200:
                return response.json()
        except Exception as e:
            log_startup(f"❌ Failed to get history: {e}", "ERROR")
        return None

    def wait_for_completion(self, prompt_id: str, timeout: int = 300) -> bool:
        """Wait for prompt completion.

        Polls the history once per second.

        Returns:
            True once the history entry has outputs; False when ``timeout``
            seconds elapse or the server records an execution error.
        """
        start_time = time.time()

        while time.time() - start_time < timeout:
            history = self.get_history(prompt_id)
            if history and prompt_id in history:
                entry = history[prompt_id]
                if entry.get('outputs'):
                    return True
                # A failed execution never produces outputs; stop polling
                # instead of waiting for the full timeout.
                status = entry.get('status') or {}
                if status.get('status_str') == 'error':
                    log_startup(
                        f"❌ Prompt {prompt_id} failed: {status.get('messages')}",
                        "ERROR",
                    )
                    return False
            time.sleep(1)

        return False

    def get_output_images(self, prompt_id: str) -> List[Image.Image]:
        """Get output images from completed prompt.

        Returns:
            One PIL image per image listed in the prompt's history outputs;
            images the server fails to serve are skipped with a warning.
        """
        images = []
        history = self.get_history(prompt_id)

        if not history or prompt_id not in history:
            return images

        outputs = history[prompt_id].get('outputs', {})
        for node_output in outputs.values():
            for image_info in node_output.get('images', []):
                params = {
                    'filename': image_info['filename'],
                    'subfolder': image_info.get('subfolder', ''),
                    # Forward the folder type so images stored outside the
                    # default output folder can be fetched as well.
                    'type': image_info.get('type', 'output'),
                }

                # Fetch image
                try:
                    response = requests.get(
                        f"{self.server_url}/view",
                        params=params,
                        timeout=HTTP_TIMEOUT,
                    )
                except requests.exceptions.RequestException as e:
                    log_startup(
                        f"⚠️ Failed to fetch image {params['filename']}: {e}",
                        "WARNING",
                    )
                    continue

                if response.status_code == 200:
                    images.append(Image.open(io.BytesIO(response.content)))
                else:
                    log_startup(
                        f"⚠️ Failed to fetch image {params['filename']}: "
                        f"HTTP {response.status_code}",
                        "WARNING",
                    )

        return images


# Global manager instance
manager = ComfyUIManager()


def process_image(
    style: str,
    prompt: str,
    negative_prompt: str = "",
    seed: int = -1,
    steps: int = 25,
    cfg_scale: float = 7.0,
    width: int = 1024,
    height: int = 1024,
    progress=gr.Progress(track_tqdm=True)
) -> List[Image.Image]:
    """Process image generation request.

    Loads the workflow for ``style``, injects the prompt texts, sampler
    settings and latent size, queues it and returns the resulting images.

    Args:
        style: Key into ``DEFAULT_WORKFLOW_CONFIGS``.
        prompt: Text for CLIPTextEncode nodes whose title contains "positive".
        negative_prompt: Text for CLIPTextEncode nodes whose title contains
            "negative".
        seed: Sampler seed; -1 derives one from the current time.
        steps: Sampler step count.
        cfg_scale: Sampler CFG value.
        width: Latent image width.
        height: Latent image height.
        progress: Gradio progress tracker.

    Raises:
        gr.Error: On setup/server failure, unknown style, missing workflow,
            queueing failure, timeout, or when no images were produced.
    """
    progress(0, desc="Initializing...")

    # Ensure server is running
    if not manager.ensure_setup():
        raise gr.Error("Failed to set up ComfyUI")

    if not manager.start_server():
        raise gr.Error("Failed to start ComfyUI server")

    progress(0.2, desc="Loading workflow...")

    # Load workflow
    workflow_config = DEFAULT_WORKFLOW_CONFIGS.get(style)
    if not workflow_config:
        raise gr.Error(f"Unknown style: {style}")

    workflow = manager.load_workflow(workflow_config["workflow_file"])
    if not workflow:
        raise gr.Error("Failed to load workflow")

    # Update workflow parameters
    if seed == -1:
        seed = int(time.time() * 1000) % 1000000

    progress(0.3, desc="Preparing generation...")

    # Find and update nodes in workflow
    for node in workflow.values():
        if not isinstance(node, dict):
            continue
        class_type = node.get("class_type")
        if class_type == "CLIPTextEncode":
            title = str(node.get("_meta", {}).get("title", "")).lower()
            if "positive" in title:
                node["inputs"]["text"] = prompt
            elif "negative" in title:
                node["inputs"]["text"] = negative_prompt
        elif class_type == "KSampler":
            # UI number/slider components may deliver floats; these node
            # inputs are integers.
            node["inputs"]["seed"] = int(seed)
            node["inputs"]["steps"] = int(steps)
            node["inputs"]["cfg"] = cfg_scale
        elif class_type == "EmptyLatentImage":
            node["inputs"]["width"] = int(width)
            node["inputs"]["height"] = int(height)

    # Queue prompt
    prompt_id = manager.queue_prompt(workflow)
    if not prompt_id:
        raise gr.Error("Failed to queue generation")

    progress(0.4, desc="Generating image...")

    # Wait for completion
    if not manager.wait_for_completion(prompt_id, timeout=300):
        raise gr.Error("Generation timeout")

    progress(0.9, desc="Retrieving results...")

    # Get output images
    images = manager.get_output_images(prompt_id)
    if not images:
        raise gr.Error("No images generated")

    progress(1.0, desc="Complete!")
    return images


def enhance_face(
    image: Image.Image,
    enhancement_level: float = 0.5,
    progress=gr.Progress(track_tqdm=True)
) -> Optional[Image.Image]:
    """Enhance face in uploaded image.

    Uploads the image to ComfyUI, points every ``LoadImage`` node of the
    face-enhancement workflow at it, runs the workflow and returns the
    first output image.

    Args:
        image: Input image; must not exceed 4096 px in either dimension.
        enhancement_level: Accepted from the UI but not applied to the
            workflow by this function.
        progress: Gradio progress tracker.

    Raises:
        gr.Error: On missing/oversized input, setup/server failure, upload
            failure, missing workflow, queueing failure, timeout, or when
            no image was produced.
    """
    progress(0, desc="Initializing face enhancement...")

    # Validate input image
    if image is None:
        raise gr.Error("No image provided")

    # Check image size to prevent DoS
    MAX_IMAGE_SIZE = 4096  # Maximum dimension
    if image.width > MAX_IMAGE_SIZE or image.height > MAX_IMAGE_SIZE:
        raise gr.Error(f"Image too large. Maximum dimension is {MAX_IMAGE_SIZE}px")

    # Ensure server is running
    if not manager.ensure_setup():
        raise gr.Error("Failed to set up ComfyUI")

    if not manager.start_server():
        raise gr.Error("Failed to start ComfyUI server")

    progress(0.2, desc="Uploading image...")

    # Upload image to ComfyUI with unique filename
    temp_filename = f"input_{uuid.uuid4().hex}_{int(time.time())}.png"
    temp_path = TEMP_DIR / temp_filename

    try:
        image.save(temp_path)

        # Upload to ComfyUI
        with open(temp_path, 'rb') as f:
            files = {'image': (temp_filename, f, 'image/png')}
            response = requests.post(
                f"{manager.server_url}/upload/image", files=files, timeout=60
            )
    except requests.exceptions.RequestException as e:
        raise gr.Error("Failed to upload image") from e
    finally:
        # The temp file is only needed for the upload; remove it here so it
        # is cleaned up on every failure path as well.
        try:
            temp_path.unlink(missing_ok=True)
        except Exception as e:
            log_startup(f"Warning: Failed to clean up temp file: {e}", "WARNING")

    if response.status_code != 200:
        raise gr.Error("Failed to upload image")

    uploaded_filename = response.json().get('name')
    if not uploaded_filename:
        raise gr.Error("Failed to upload image")

    progress(0.3, desc="Loading enhancement workflow...")

    # Load face enhancement workflow
    workflow = manager.load_workflow(FACE_ENHANCEMENT_WORKFLOW)
    if not workflow:
        raise gr.Error("Failed to load face enhancement workflow")

    # Update workflow with uploaded image
    for node in workflow.values():
        if isinstance(node, dict) and node.get("class_type") == "LoadImage":
            node["inputs"]["image"] = uploaded_filename

    progress(0.4, desc="Processing face enhancement...")

    # Queue and process
    prompt_id = manager.queue_prompt(workflow)
    if not prompt_id:
        raise gr.Error("Failed to queue enhancement")

    if not manager.wait_for_completion(prompt_id, timeout=120):
        raise gr.Error("Enhancement timeout")

    progress(0.9, desc="Retrieving enhanced image...")

    # Get result
    images = manager.get_output_images(prompt_id)
    if not images:
        raise gr.Error("No enhanced image generated")

    progress(1.0, desc="Enhancement complete!")
    return images[0]


def create_gradio_interface():
    """Create the Gradio interface.

    Returns:
        A ``gr.Blocks`` app with three tabs: image generation, face
        enhancement and a system status view of the in-memory logs.
    """
    with gr.Blocks(title="Unified UGC Platform", theme=gr.themes.Soft()) as app:
        gr.Markdown("""
        # 🎨 Unified UGC Platform

        Generate high-quality images and enhance faces using state-of-the-art AI models.
        """)

        with gr.Tabs():
            # Image Generation Tab
            with gr.TabItem("🖼️ Generate Images"):
                with gr.Row():
                    with gr.Column():
                        style = gr.Dropdown(
                            choices=[(v["name"], k) for k, v in DEFAULT_WORKFLOW_CONFIGS.items()],
                            value="ugc_authentic",
                            label="Style"
                        )
                        prompt = gr.Textbox(
                            label="Prompt",
                            placeholder="e.g., selfie of a woman in her bedroom, iPhone photo, natural light, casual outfit, authentic moment",
                            lines=3
                        )
                        negative_prompt = gr.Textbox(
                            label="Negative Prompt",
                            placeholder="e.g., perfect skin, studio lighting, professional photography, airbrushed, plastic skin",
                            lines=2
                        )

                        with gr.Row():
                            seed = gr.Number(label="Seed", value=-1, precision=0)
                            steps = gr.Slider(label="Steps", minimum=1, maximum=50, value=25, step=1)

                        with gr.Row():
                            cfg_scale = gr.Slider(label="CFG Scale", minimum=1, maximum=20, value=5, step=0.5)

                        with gr.Row():
                            width = gr.Slider(label="Width", minimum=256, maximum=2048, value=1024, step=64)
                            height = gr.Slider(label="Height", minimum=256, maximum=2048, value=1024, step=64)

                        generate_btn = gr.Button("🎨 Generate", variant="primary")

                    with gr.Column():
                        output_gallery = gr.Gallery(
                            label="Generated Images",
                            show_label=True,
                            elem_id="gallery",
                            columns=2,
                            rows=2,
                            height="600px"
                        )

                # Wire up generation
                generate_btn.click(
                    fn=process_image,
                    inputs=[style, prompt, negative_prompt, seed, steps, cfg_scale, width, height],
                    outputs=output_gallery
                )

            # Face Enhancement Tab
            with gr.TabItem("✨ Enhance Faces"):
                with gr.Row():
                    with gr.Column():
                        input_image = gr.Image(
                            label="Upload Image",
                            type="pil"
                        )
                        enhancement_level = gr.Slider(
                            label="Enhancement Level",
                            minimum=0,
                            maximum=1,
                            value=0.5,
                            step=0.1
                        )
                        enhance_btn = gr.Button("✨ Enhance Face", variant="primary")

                    with gr.Column():
                        output_image = gr.Image(
                            label="Enhanced Image",
                            type="pil"
                        )

                # Wire up enhancement
                enhance_btn.click(
                    fn=enhance_face,
                    inputs=[input_image, enhancement_level],
                    outputs=output_image
                )

            # System Status Tab
            with gr.TabItem("🔧 System Status"):
                with gr.Column():
                    gr.Markdown("### Startup Log")
                    log_display = gr.Textbox(
                        value=lambda: "\n".join(STARTUP_LOG[-50:]),
                        label="Recent Logs",
                        lines=20,
                        max_lines=30,
                        interactive=False
                    )

                    gr.Markdown("### Server Log")
                    server_log_display = gr.Textbox(
                        value=lambda: "\n".join(manager.server_log[-50:]),
                        label="ComfyUI Server Logs",
                        lines=20,
                        max_lines=30,
                        interactive=False
                    )

                    refresh_btn = gr.Button("🔄 Refresh Logs")

                    def refresh_logs():
                        """Return the latest 50 lines of both logs."""
                        return (
                            "\n".join(STARTUP_LOG[-50:]),
                            "\n".join(manager.server_log[-50:])
                        )

                    refresh_btn.click(
                        fn=refresh_logs,
                        outputs=[log_display, server_log_display]
                    )

        gr.Markdown("""
        ---
        Made with ❤️ using ComfyUI and Gradio
        """)

    return app


# Add cleanup handler
def cleanup_on_exit():
    """Clean up resources on exit"""
    log_startup("Cleaning up resources...")
    manager.stop_server()
    log_startup("Cleanup complete")


def _handle_sigterm(signum, frame):
    """Exit on SIGTERM so the atexit cleanup runs and the process ends.

    A handler that only runs the cleanup would leave the process alive
    after SIGTERM with its ComfyUI backend already stopped.
    """
    sys.exit(0)


# Register cleanup handlers
atexit.register(cleanup_on_exit)
# signal.signal() raises ValueError outside the main thread, e.g. when this
# module is imported from a worker thread.
if threading.current_thread() is threading.main_thread():
    signal.signal(signal.SIGTERM, _handle_sigterm)


def main():
    """Set up ComfyUI, download models, start the server and launch the UI.

    Any failure is logged and a minimal error page showing the startup log
    is served instead of the application.
    """
    log_startup("=" * 50)
    log_startup("🚀 STARTING UNIFIED UGC PLATFORM")
    log_startup("=" * 50)

    server_name = "0.0.0.0" if IS_SPACES else "127.0.0.1"

    try:
        # Step 1: Ensure environment (cloning repos) is set up
        if not manager.ensure_setup():
            log_startup("❌ Initial setup failed", "ERROR")
            raise RuntimeError("Setup failed")

        # Step 2: Download all models after ComfyUI is set up
        log_startup("🎨 Downloading Authenticity Stack models...")
        download_models()
        manager.models_downloaded = True

        # Step 3: Start server
        log_startup("🔧 Starting ComfyUI server...")
        if not manager.start_server():
            log_startup("❌ Failed to start ComfyUI server", "ERROR")
            raise RuntimeError("Server startup failed")

        # Check models
        models = manager.check_models()
        if models:
            log_startup(f"✅ Found {len(models)} models")

        # Create and launch Gradio app
        log_startup("🎨 Creating Gradio interface...")
        app = create_gradio_interface()

        log_startup("🌟 Launching application...")
        # Share links are not supported on Spaces (the Space URL is already
        # public) and were never requested for local runs.
        app.launch(
            server_name=server_name,
            server_port=7860,
            share=False
        )

    except Exception as e:
        log_startup(f"❌ FATAL ERROR: {e}", "ERROR")
        log_startup(f"Traceback: {traceback.format_exc()}", "ERROR")

        # Create error display app
        with gr.Blocks() as error_app:
            gr.Markdown("# ❌ Application Failed to Start")
            gr.Markdown(f"**Error:** {str(e)}")
            gr.Textbox(
                value="\n".join(STARTUP_LOG),
                label="Startup Log",
                lines=30,
                interactive=False
            )

        error_app.launch(
            server_name=server_name,
            server_port=7860,
            share=False
        )


# Main execution
if __name__ == "__main__":
    main()