"""
Unified UGC Platform + Face Enhancer for Hugging Face Spaces

Combines all functionality into a single ComfyUI-powered space.
"""

# Apply the optional local Gradio patch before Gradio is imported below
# (a no-op if gradio_patch is absent or the patch fails).
try:
    from gradio_patch import apply_gradio_patch
    apply_gradio_patch()
except Exception:
    pass

import gradio as gr
import json
from PIL import Image
import requests
import io
import time
import os
import sys
import subprocess
import threading
from typing import Dict, List, Optional
from pathlib import Path
import traceback
from datetime import datetime
import torch
import uuid

IS_SPACES = os.environ.get("SPACE_ID") is not None
REBUILD_MARKER = "2025-01-06-v3"

if IS_SPACES:
    WORK_DIR = Path("/home/user/app")
else:
    WORK_DIR = Path.cwd()

STARTUP_LOG = []


def log_startup(message: str, level: str = "INFO"):
    """Log a startup message with a timestamp."""
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    entry = f"[{timestamp}] [{level}] {message}"
    STARTUP_LOG.append(entry)
    print(entry)


os.chdir(WORK_DIR)

if IS_SPACES:
    log_startup("🚀 Running in Hugging Face Spaces")
    log_startup(f"📁 Working directory: {WORK_DIR}")
    log_startup(f"📄 Files in directory: {list(WORK_DIR.iterdir())}")
    log_startup(f"🐍 Python version: {sys.version}")
    log_startup(f"🔥 CUDA available: {torch.cuda.is_available()}")
    log_startup(f"📝 Code version: {REBUILD_MARKER}")

# Use the ComfyUI installation baked into the Docker image when available.
if Path("/app/comfyui/ComfyUI").exists():
    log_startup("✅ ComfyUI pre-installed in Docker")
    comfyui_link = WORK_DIR / "comfyui"
    if not comfyui_link.exists():
        os.symlink("/app/comfyui", str(comfyui_link))
        log_startup("✅ Created ComfyUI symlink")
elif not (WORK_DIR / "comfyui/ComfyUI").exists():
    log_startup("⚠️ ComfyUI not found, will set up if needed", "WARNING")


def setup_environment():
    """Run the setup script that clones the required repositories."""
    setup_script = WORK_DIR / "setup.sh"

    if not setup_script.exists():
        log_startup("❌ setup.sh not found!", "ERROR")
        return False

    try:
        log_startup(f"🔧 Running setup script: {setup_script}")
        os.chmod(str(setup_script), 0o755)

        result = subprocess.run(
            ['bash', str(setup_script)],
            capture_output=True,
            text=True,
            cwd=str(WORK_DIR)
        )

        if result.returncode == 0:
            log_startup("✅ Setup completed successfully")
            return True
        else:
            log_startup(f"❌ Setup failed with return code {result.returncode}", "ERROR")
            log_startup(f"stdout: {result.stdout}", "ERROR")
            log_startup(f"stderr: {result.stderr}", "ERROR")
            return False

    except Exception as e:
        log_startup(f"❌ Failed to run setup script: {e}", "ERROR")
        log_startup(f"Traceback: {traceback.format_exc()}", "ERROR")
        return False
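
# Contract assumed by ensure_setup(): a successful setup.sh run must leave a
# ComfyUI checkout at WORK_DIR/comfyui/ComfyUI. A sketch of the assumed script
# (illustrative only; the real setup.sh ships with the Space):
#
#   git clone https://github.com/comfyanonymous/ComfyUI comfyui/ComfyUI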


def download_models():
    """Download the full 'Authenticity Stack' for UGC from the Hugging Face Hub."""
    from huggingface_hub import hf_hub_download

    log_startup("=== 📥 Starting Model Download Process (Authenticity Stack) ===")

    # Each entry is (repo_id, filename_in_repo, destination). A destination with
    # a file suffix is a full target path (the workflows reference these exact
    # names); a bare directory keeps the repository filename.
    models_to_download = [
        # Checkpoints
        ("SG161222/RealVisXL_V4.0", "RealVisXL_V4.0.safetensors", "comfyui/ComfyUI/models/checkpoints"),
        ("stabilityai/stable-diffusion-xl-base-1.0", "sd_xl_base_1.0.safetensors", "comfyui/ComfyUI/models/checkpoints"),
        # VAE
        ("stabilityai/sdxl-vae", "sdxl_vae.safetensors", "comfyui/ComfyUI/models/vae"),
        # LoRAs, renamed to the filenames the workflows expect
        ("philz1337x/epicrealism", "epicrealism_naturalSinRC1VAE.safetensors", "comfyui/ComfyUI/models/loras/epiCRealism - Natural photographic.safetensors"),
        ("PvDeep/Add-Detail-XL", "add-detail-xl.safetensors", "comfyui/ComfyUI/models/loras/detail_tweaker_xl_v2.safetensors"),
        ("artificialguybr/filmgrain-redmond-filmgrain-lora-for-sdxl", "FilmGrainRedmond-FilmGrain-FilmGrainAF.safetensors", "comfyui/ComfyUI/models/loras/film_grain_helper_sdxl.safetensors"),
    ]

    for repo_id, filename, dest_path in models_to_download:
        if Path(dest_path).suffix:
            # Full target path: download into its parent, then rename into place.
            full_path = WORK_DIR / Path(dest_path)
            local_dir = str(full_path.parent)
        else:
            # Plain directory: keep the repository filename.
            full_path = WORK_DIR / dest_path / filename
            local_dir = str(WORK_DIR / dest_path)

        if not full_path.exists():
            log_startup(f"Downloading {filename} from {repo_id}...")
            try:
                full_path.parent.mkdir(parents=True, exist_ok=True)
                downloaded = hf_hub_download(
                    repo_id=repo_id,
                    filename=filename,
                    local_dir=local_dir,
                )
                # hf_hub_download keeps the repository filename, so move the file
                # when the destination asks for a different name; otherwise the
                # existence check above would never match and every startup
                # would re-download the model.
                if Path(downloaded).resolve() != full_path.resolve():
                    Path(downloaded).replace(full_path)
                log_startup(f"✅ Downloaded {full_path.name}")
            except Exception as e:
                log_startup(f"⚠️ Failed to download {filename}: {e}", "WARNING")
        else:
            log_startup(f"✅ Found existing model: {full_path.name}")

    log_startup("=== Model Download Process Complete ===")
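
# Sketch of the layout the workflows can reference after a successful run:
#   comfyui/ComfyUI/models/checkpoints/RealVisXL_V4.0.safetensors
#   comfyui/ComfyUI/models/checkpoints/sd_xl_base_1.0.safetensors
#   comfyui/ComfyUI/models/vae/sdxl_vae.safetensors
#   comfyui/ComfyUI/models/loras/detail_tweaker_xl_v2.safetensors
#   ...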


COMFYUI_PATH = WORK_DIR / "comfyui/ComfyUI"
MODEL_DIR = WORK_DIR / "models"
OUTPUT_DIR = WORK_DIR / "outputs"
TEMP_DIR = WORK_DIR / "temp"

for dir_path in [MODEL_DIR, OUTPUT_DIR, TEMP_DIR]:
    dir_path.mkdir(parents=True, exist_ok=True)

if COMFYUI_PATH.exists():
    sys.path.insert(0, str(COMFYUI_PATH))
    log_startup(f"✅ Added ComfyUI to Python path: {COMFYUI_PATH}")


DEFAULT_WORKFLOW_CONFIGS = {
    "ugc_authentic": {
        "name": "Authentic UGC Style (Recommended)",
        "workflow_file": "organic_portrait_ugc.json",
    },
    "portrait": {
        "name": "High-Resolution Portrait",
        "workflow_file": "organic_portrait_workflow.json",
    },
    "full_body": {
        "name": "Full Body Portrait",
        "workflow_file": "full_body_workflow.json",
    },
    "street": {
        "name": "Street Photography Style",
        "workflow_file": "street_photo_workflow.json",
    },
    "flux": {
        "name": "FLUX Model",
        "workflow_file": "flux_workflow.json",
    },
}

FACE_ENHANCEMENT_WORKFLOW = "face_enhancement_workflow.json"


class ComfyUIManager:
    def __init__(self):
        """Initialize the ComfyUI manager."""
        self.server_process = None
        self.server_url = "http://127.0.0.1:8188"
        self.ws_url = "ws://127.0.0.1:8188/ws"
        # A stable client id lets the server associate queued prompts with this session.
        self.client_id = str(uuid.uuid4())
        self.setup_complete = False
        self.models_downloaded = False
        self.server_log = []
        self.max_log_lines = 500
        log_startup("ComfyUI Manager initialized")

    def ensure_setup(self):
        """Ensure ComfyUI is set up, running the setup script if needed."""
        if self.setup_complete:
            return True

        if not COMFYUI_PATH.exists():
            log_startup("🔧 ComfyUI not found, running setup...")
            if not setup_environment():
                return False

        self.setup_complete = True
        return True

    def _log_output(self, pipe, prefix):
        """Stream a subprocess pipe into the rolling server log."""
        try:
            while True:
                line = pipe.readline()
                if not line:
                    break
                line = line.strip()
                if line:
                    log_entry = f"{prefix}: {line}"
                    log_startup(log_entry)
                    self.server_log.append(log_entry)
                    if len(self.server_log) > self.max_log_lines:
                        self.server_log.pop(0)
        except Exception as e:
            log_startup(f"Error reading {prefix}: {e}", "ERROR")

    def start_server(self):
        """Start the ComfyUI server and wait until it responds."""
        if self.server_process and self.server_process.poll() is None:
            log_startup("ComfyUI server already running")
            return True

        log_startup("🚀 Starting ComfyUI server...")

        try:
            cmd = [
                sys.executable,
                "main.py",
                "--listen", "127.0.0.1",
                "--port", "8188",
                "--disable-auto-launch"
            ]

            log_startup(f"Server command: {' '.join(cmd)}")
            log_startup("✅ Confirmed: NO --use-legacy-frontend flag in command")

            self.server_process = subprocess.Popen(
                cmd,
                cwd=str(COMFYUI_PATH),
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                bufsize=1,
                env={**os.environ, "PYTHONUNBUFFERED": "1"}
            )

            # Stream server output on background threads so the pipes never fill up.
            stdout_thread = threading.Thread(
                target=self._log_output,
                args=(self.server_process.stdout, "STDOUT"),
                daemon=True
            )
            stderr_thread = threading.Thread(
                target=self._log_output,
                args=(self.server_process.stderr, "STDERR"),
                daemon=True
            )

            stdout_thread.start()
            stderr_thread.start()

            # Poll the REST endpoint until the server answers or the timeout expires.
            log_startup("⏳ Waiting for ComfyUI server to be ready...")
            start_time = time.time()
            timeout = 120

            while time.time() - start_time < timeout:
                if self.server_process.poll() is not None:
                    log_startup("❌ Server process terminated unexpectedly", "ERROR")
                    return False

                try:
                    response = requests.get(f"{self.server_url}/system_stats", timeout=2)
                    if response.status_code == 200:
                        log_startup("✅ ComfyUI server is ready!")
                        return True
                except requests.exceptions.RequestException:
                    pass

                time.sleep(2)

            log_startup("❌ Server startup timeout", "ERROR")
            return False

        except Exception as e:
            log_startup(f"❌ Failed to start server: {e}", "ERROR")
            log_startup(f"Traceback: {traceback.format_exc()}", "ERROR")
            return False

    def stop_server(self):
        """Stop the ComfyUI server."""
        if self.server_process:
            log_startup("Stopping ComfyUI server...")
            self.server_process.terminate()
            try:
                self.server_process.wait(timeout=10)
            except subprocess.TimeoutExpired:
                log_startup("Server didn't terminate gracefully, forcing kill...")
                self.server_process.kill()
                self.server_process.wait()
            finally:
                self.server_process = None
                log_startup("ComfyUI server stopped")

    def check_models(self):
        """Return the checkpoint names the running server can see."""
        try:
            response = requests.get(f"{self.server_url}/object_info", timeout=10)
            if response.status_code == 200:
                data = response.json()
                if "CheckpointLoaderSimple" in data:
                    models = data["CheckpointLoaderSimple"]["input"]["required"]["ckpt_name"][0]
                    log_startup(f"📦 Available models: {models}")
                    return models
        except Exception as e:
            log_startup(f"⚠️ Could not fetch models: {e}", "WARNING")
        return []

    def load_workflow(self, workflow_file: str) -> Optional[Dict]:
        """Load a workflow JSON file from WORK_DIR/workflows."""
        workflow_path = WORK_DIR / "workflows" / workflow_file
        if not workflow_path.exists():
            log_startup(f"❌ Workflow not found: {workflow_path}", "ERROR")
            return None

        try:
            with open(workflow_path, 'r') as f:
                return json.load(f)
        except Exception as e:
            log_startup(f"❌ Failed to load workflow: {e}", "ERROR")
            return None
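
    # Note: queue_prompt() below expects the loaded JSON to be in ComfyUI's API
    # ("prompt") format: a flat dict keyed by node id, for example (sketch):
    #
    #   {"3": {"class_type": "KSampler",
    #          "inputs": {"seed": 0, "steps": 25, "cfg": 7.0},
    #          "_meta": {"title": "KSampler"}}}
    #
    # A graph exported from the ComfyUI canvas ("nodes"/"links" lists) must be
    # re-exported with the API-format save option before it can be queued.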

    def queue_prompt(self, workflow: Dict) -> Optional[str]:
        """Queue a prompt and return its prompt ID."""
        try:
            payload = {"prompt": workflow, "client_id": self.client_id}
            response = requests.post(f"{self.server_url}/prompt", json=payload, timeout=30)
            if response.status_code == 200:
                return response.json().get('prompt_id')
            log_startup(f"❌ Prompt rejected ({response.status_code}): {response.text[:200]}", "ERROR")
        except Exception as e:
            log_startup(f"❌ Failed to queue prompt: {e}", "ERROR")
        return None
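
    # Request/response sketch for POST /prompt, matching how queue_prompt() and
    # get_history() use the HTTP API:
    #   request:  {"prompt": {<node_id>: {...}}, "client_id": "<uuid4>"}
    #   response: {"prompt_id": "...", "number": ..., "node_errors": {...}}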

    def get_history(self, prompt_id: str) -> Optional[Dict]:
        """Fetch the generation history for a prompt ID."""
        try:
            response = requests.get(f"{self.server_url}/history/{prompt_id}", timeout=10)
            if response.status_code == 200:
                return response.json()
        except Exception as e:
            log_startup(f"❌ Failed to get history: {e}", "ERROR")
        return None

    def wait_for_completion(self, prompt_id: str, timeout: int = 300) -> bool:
        """Poll the history endpoint until the prompt has outputs or the timeout expires."""
        start_time = time.time()
        while time.time() - start_time < timeout:
            history = self.get_history(prompt_id)
            if history and prompt_id in history:
                if history[prompt_id].get('outputs'):
                    return True
            time.sleep(1)
        return False
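
    # History response sketch that get_output_images() below relies on:
    #   {"<prompt_id>": {"outputs": {"<node_id>": {"images": [
    #       {"filename": "ComfyUI_00001_.png", "subfolder": "", "type": "output"}
    #   ]}}}}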

    def get_output_images(self, prompt_id: str) -> List[Image.Image]:
        """Fetch the output images of a completed prompt via the /view endpoint."""
        images = []
        history = self.get_history(prompt_id)

        if not history or prompt_id not in history:
            return images

        outputs = history[prompt_id].get('outputs', {})
        for node_id, node_output in outputs.items():
            if 'images' in node_output:
                for image_info in node_output['images']:
                    filename = image_info['filename']
                    subfolder = image_info.get('subfolder', '')

                    response = requests.get(
                        f"{self.server_url}/view",
                        params={
                            'filename': filename,
                            'subfolder': subfolder,
                            'type': image_info.get('type', 'output'),
                        },
                        timeout=30,
                    )
                    if response.status_code == 200:
                        image = Image.open(io.BytesIO(response.content))
                        images.append(image)

        return images


manager = ComfyUIManager()
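
# Minimal usage sketch of the manager API (illustrative; assumes the workflow
# files named in DEFAULT_WORKFLOW_CONFIGS exist under WORK_DIR/workflows):
#
#   manager.ensure_setup()
#   manager.start_server()
#   wf = manager.load_workflow("organic_portrait_ugc.json")
#   pid = manager.queue_prompt(wf)
#   if pid and manager.wait_for_completion(pid):
#       images = manager.get_output_images(pid)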


def process_image(
    style: str,
    prompt: str,
    negative_prompt: str = "",
    seed: int = -1,
    steps: int = 25,
    cfg_scale: float = 7.0,
    width: int = 1024,
    height: int = 1024,
    progress=gr.Progress(track_tqdm=True)
) -> List[Image.Image]:
    """Handle an image-generation request from the UI."""

    progress(0, desc="Initializing...")

    if not manager.ensure_setup():
        raise gr.Error("Failed to set up ComfyUI")

    if not manager.start_server():
        raise gr.Error("Failed to start ComfyUI server")

    progress(0.2, desc="Loading workflow...")

    workflow_config = DEFAULT_WORKFLOW_CONFIGS.get(style)
    if not workflow_config:
        raise gr.Error(f"Unknown style: {style}")

    workflow = manager.load_workflow(workflow_config["workflow_file"])
    if not workflow:
        raise gr.Error("Failed to load workflow")

    # -1 means "random": derive a seed from the clock.
    if seed == -1:
        seed = int(time.time() * 1000) % 1000000

    progress(0.3, desc="Preparing generation...")

    # Patch the user's settings into the matching workflow nodes. Prompt text
    # boxes are identified by "positive"/"negative" in their node titles.
    for node_id, node in workflow.items():
        if node.get("class_type") == "CLIPTextEncode":
            title = str(node.get("_meta", {}).get("title", "")).lower()
            if "positive" in title:
                node["inputs"]["text"] = prompt
            elif "negative" in title:
                node["inputs"]["text"] = negative_prompt

        elif node.get("class_type") == "KSampler":
            node["inputs"]["seed"] = seed
            node["inputs"]["steps"] = steps
            node["inputs"]["cfg"] = cfg_scale

        elif node.get("class_type") == "EmptyLatentImage":
            node["inputs"]["width"] = width
            node["inputs"]["height"] = height

    prompt_id = manager.queue_prompt(workflow)
    if not prompt_id:
        raise gr.Error("Failed to queue generation")

    progress(0.4, desc="Generating image...")

    if not manager.wait_for_completion(prompt_id, timeout=300):
        raise gr.Error("Generation timeout")

    progress(0.9, desc="Retrieving results...")

    images = manager.get_output_images(prompt_id)
    if not images:
        raise gr.Error("No images generated")

    progress(1.0, desc="Complete!")
    return images


def enhance_face(
    image: Image.Image,
    enhancement_level: float = 0.5,
    progress=gr.Progress(track_tqdm=True)
) -> Optional[Image.Image]:
    """Enhance the face in an uploaded image."""

    progress(0, desc="Initializing face enhancement...")

    if image is None:
        raise gr.Error("No image provided")

    MAX_IMAGE_SIZE = 4096
    if image.width > MAX_IMAGE_SIZE or image.height > MAX_IMAGE_SIZE:
        raise gr.Error(f"Image too large. Maximum dimension is {MAX_IMAGE_SIZE}px")

    if not manager.ensure_setup():
        raise gr.Error("Failed to set up ComfyUI")

    if not manager.start_server():
        raise gr.Error("Failed to start ComfyUI server")

    progress(0.2, desc="Uploading image...")

    # Save to a uniquely named temp file, then hand it to ComfyUI's upload endpoint.
    temp_filename = f"input_{uuid.uuid4().hex}_{int(time.time())}.png"
    temp_path = TEMP_DIR / temp_filename
    image.save(temp_path)

    with open(temp_path, 'rb') as f:
        files = {'image': ('image.png', f, 'image/png')}
        response = requests.post(f"{manager.server_url}/upload/image", files=files, timeout=60)

    if response.status_code != 200:
        raise gr.Error("Failed to upload image")

    # The upload response echoes back the server-side filename to use in LoadImage.
    upload_data = response.json()
    uploaded_filename = upload_data['name']

    progress(0.3, desc="Loading enhancement workflow...")

    workflow = manager.load_workflow(FACE_ENHANCEMENT_WORKFLOW)
    if not workflow:
        raise gr.Error("Failed to load face enhancement workflow")

    for node_id, node in workflow.items():
        if node.get("class_type") == "LoadImage":
            node["inputs"]["image"] = uploaded_filename
        elif node.get("class_type") == "KSampler" and "denoise" in node.get("inputs", {}):
            # Map the UI's enhancement level onto the sampler's denoise strength.
            # NOTE: this assumes the enhancement workflow exposes a KSampler with
            # a "denoise" input; if it doesn't, the slider has no effect.
            node["inputs"]["denoise"] = enhancement_level

    progress(0.4, desc="Processing face enhancement...")

    prompt_id = manager.queue_prompt(workflow)
    if not prompt_id:
        raise gr.Error("Failed to queue enhancement")

    if not manager.wait_for_completion(prompt_id, timeout=120):
        raise gr.Error("Enhancement timeout")

    progress(0.9, desc="Retrieving enhanced image...")

    images = manager.get_output_images(prompt_id)
    if not images:
        raise gr.Error("No enhanced image generated")

    try:
        temp_path.unlink(missing_ok=True)
    except Exception as e:
        log_startup(f"Warning: Failed to clean up temp file: {e}", "WARNING")

    progress(1.0, desc="Enhancement complete!")
    return images[0]


def create_gradio_interface():
    """Create the Gradio interface."""

    with gr.Blocks(title="Unified UGC Platform", theme=gr.themes.Soft()) as app:
        gr.Markdown("""
        # 🎨 Unified UGC Platform

        Generate high-quality images and enhance faces using state-of-the-art AI models.
        """)

        with gr.Tabs():
            with gr.TabItem("🖼️ Generate Images"):
                with gr.Row():
                    with gr.Column():
                        style = gr.Dropdown(
                            choices=[(v["name"], k) for k, v in DEFAULT_WORKFLOW_CONFIGS.items()],
                            value="ugc_authentic",
                            label="Style"
                        )
                        prompt = gr.Textbox(
                            label="Prompt",
                            placeholder="e.g., selfie of a woman in her bedroom, iPhone photo, natural light, casual outfit, authentic moment",
                            lines=3
                        )
                        negative_prompt = gr.Textbox(
                            label="Negative Prompt",
                            placeholder="e.g., perfect skin, studio lighting, professional photography, airbrushed, plastic skin",
                            lines=2
                        )

                        with gr.Row():
                            seed = gr.Number(label="Seed", value=-1, precision=0)
                            steps = gr.Slider(label="Steps", minimum=1, maximum=50, value=25, step=1)

                        with gr.Row():
                            cfg_scale = gr.Slider(label="CFG Scale", minimum=1, maximum=20, value=5, step=0.5)

                        with gr.Row():
                            width = gr.Slider(label="Width", minimum=256, maximum=2048, value=1024, step=64)
                            height = gr.Slider(label="Height", minimum=256, maximum=2048, value=1024, step=64)

                        generate_btn = gr.Button("🎨 Generate", variant="primary")

                    with gr.Column():
                        output_gallery = gr.Gallery(
                            label="Generated Images",
                            show_label=True,
                            elem_id="gallery",
                            columns=2,
                            rows=2,
                            height="600px"
                        )

                generate_btn.click(
                    fn=process_image,
                    inputs=[style, prompt, negative_prompt, seed, steps, cfg_scale, width, height],
                    outputs=output_gallery
                )

            with gr.TabItem("✨ Enhance Faces"):
                with gr.Row():
                    with gr.Column():
                        input_image = gr.Image(
                            label="Upload Image",
                            type="pil"
                        )
                        enhancement_level = gr.Slider(
                            label="Enhancement Level",
                            minimum=0,
                            maximum=1,
                            value=0.5,
                            step=0.1
                        )
                        enhance_btn = gr.Button("✨ Enhance Face", variant="primary")

                    with gr.Column():
                        output_image = gr.Image(
                            label="Enhanced Image",
                            type="pil"
                        )

                enhance_btn.click(
                    fn=enhance_face,
                    inputs=[input_image, enhancement_level],
                    outputs=output_image
                )

            with gr.TabItem("🔧 System Status"):
                with gr.Column():
                    gr.Markdown("### Startup Log")
                    log_display = gr.Textbox(
                        value=lambda: "\n".join(STARTUP_LOG[-50:]),
                        label="Recent Logs",
                        lines=20,
                        max_lines=30,
                        interactive=False
                    )

                    gr.Markdown("### Server Log")
                    server_log_display = gr.Textbox(
                        value=lambda: "\n".join(manager.server_log[-50:]),
                        label="ComfyUI Server Logs",
                        lines=20,
                        max_lines=30,
                        interactive=False
                    )

                    refresh_btn = gr.Button("🔄 Refresh Logs")

                    def refresh_logs():
                        return (
                            "\n".join(STARTUP_LOG[-50:]),
                            "\n".join(manager.server_log[-50:])
                        )

                    refresh_btn.click(
                        fn=refresh_logs,
                        outputs=[log_display, server_log_display]
                    )

        gr.Markdown("""
        ---
        Made with ❤️ using ComfyUI and Gradio
        """)

    return app


import atexit
import signal


def cleanup_on_exit():
    """Clean up resources on exit."""
    log_startup("Cleaning up resources...")
    manager.stop_server()
    log_startup("Cleanup complete")


atexit.register(cleanup_on_exit)
signal.signal(signal.SIGTERM, lambda s, f: cleanup_on_exit())


if __name__ == "__main__":
    log_startup("=" * 50)
    log_startup("🚀 STARTING UNIFIED UGC PLATFORM")
    log_startup("=" * 50)

    try:
        if not manager.ensure_setup():
            log_startup("❌ Initial setup failed", "ERROR")
            raise RuntimeError("Setup failed")

        log_startup("🎨 Downloading Authenticity Stack models...")
        download_models()
        manager.models_downloaded = True

        log_startup("🔧 Starting ComfyUI server...")
        if not manager.start_server():
            log_startup("❌ Failed to start ComfyUI server", "ERROR")
            raise RuntimeError("Server startup failed")

        models = manager.check_models()
        if models:
            log_startup(f"✅ Found {len(models)} models")

        log_startup("🎨 Creating Gradio interface...")
        app = create_gradio_interface()

        log_startup("🚀 Launching application...")
        app.launch(
            server_name="0.0.0.0" if IS_SPACES else "127.0.0.1",
            server_port=7860,
            share=IS_SPACES
        )

    except Exception as e:
        log_startup(f"❌ FATAL ERROR: {e}", "ERROR")
        log_startup(f"Traceback: {traceback.format_exc()}", "ERROR")

        # Fall back to a minimal error page that surfaces the startup log.
        with gr.Blocks() as error_app:
            gr.Markdown("# ❌ Application Failed to Start")
            gr.Markdown(f"**Error:** {str(e)}")
            gr.Textbox(
                value="\n".join(STARTUP_LOG),
                label="Startup Log",
                lines=30,
                interactive=False
            )

        error_app.launch(
            server_name="0.0.0.0" if IS_SPACES else "127.0.0.1",
            server_port=7860,
            share=IS_SPACES
        )