diff --git "a/__lib__/app.py" "b/__lib__/app.py" deleted file mode 100644--- "a/__lib__/app.py" +++ /dev/null @@ -1,2055 +0,0 @@ -import gradio as gr -import threading -import os -import shutil -import tempfile -import time -import json -import util -from util import (process_image_edit, process_text_to_image, process_image_upscale, - process_face_swap, process_multi_image_edit, process_watermark_removal, - download_and_check_result_nsfw, GoodWebsiteUrl, RateLimitConfig, create_mask_from_layers, - TASK_LOCK_MINUTES, PRINT_STATS_INTERVAL, NSFW_BLUR_SECONDS) -from nfsw import NSFWDetector - - -# 创建全局配置实例 -rate_limit_config = RateLimitConfig() - - -# ============================================================================= -# IP归属地判断 - 保留用于限速和统计 -# ============================================================================= - - -def has_active_task(client_ip): - """ - 检查IP是否有正在进行的任务(3分钟内) - - Returns: - tuple: (has_active, remaining_seconds) - """ - if client_ip not in util.IP_Active_Tasks: - return False, 0 - - start_time = util.IP_Active_Tasks[client_ip] - elapsed = time.time() - start_time - lock_seconds = TASK_LOCK_MINUTES * 60 - - if elapsed < lock_seconds: - remaining = lock_seconds - elapsed - return True, remaining - else: - # 任务锁定已过期,清除 - del util.IP_Active_Tasks[client_ip] - return False, 0 - - -def set_active_task(client_ip, active=True): - """ - 设置或清除IP的活动任务状态 - """ - if active: - util.IP_Active_Tasks[client_ip] = time.time() - # print(f"🔒 Task locked for IP: {client_ip}") - else: - if client_ip in util.IP_Active_Tasks: - del util.IP_Active_Tasks[client_ip] - # 记录任务完成时间 - util.IP_Last_Task_Time[client_ip] = time.time() - # print(f"🔓 Task unlocked for IP: {client_ip}") - - - -country_dict = { - "zh": ["中国"], - "hi": ["印度"], - "fi": ["芬兰"], - "en": ["美国", "澳大利亚", "英国", "加拿大", "新西兰", "爱尔兰"], - "es": ["西班牙", "墨西哥", "阿根廷", "哥伦比亚", "智利", "秘鲁"], - "pt": ["葡萄牙", "巴西"], - "fr": ["法国", "摩纳哥"], - "de": ["德国", "奥地利"], - "it": ["意大利", "圣马力诺", "梵蒂冈"], - "ja": ["日本"], - "ru": ["俄罗斯"], - "uk": ["乌克兰"], - "ar": ["沙特阿拉伯", "埃及", "阿拉伯联合酋长国", "摩洛哥"], - "nl": ["荷兰"], - "no": ["挪威"], - "sv": ["瑞典"], - "id": ["印度尼西亚"], - "vi": ["越南"], - "he": ["以色列"], - "tr": ["土耳其"], - "da": ["丹麦"], -} - - -def query_ip_country(client_ip): - """查询IP地址地理信息""" - if client_ip in util.IP_Country_Cache: - return util.IP_Country_Cache[client_ip] - - if not client_ip or client_ip in ["127.0.0.1", "localhost", "::1"]: - default_geo = {"country": "Unknown", "region": "Unknown", "city": "Unknown"} - util.IP_Country_Cache[client_ip] = default_geo - return default_geo - - try: - import requests - from requests.exceptions import Timeout, ConnectionError, RequestException - - api_url = f"https://api.vore.top/api/IPdata?ip={client_ip}" - response = requests.get(api_url, timeout=3) - - if response.status_code == 200: - data = response.json() - if data.get("code") == 200 and "ipdata" in data: - ipdata = data["ipdata"] - geo_info = { - "country": ipdata.get("info1", "Unknown"), - "region": ipdata.get("info2", "Unknown"), - "city": ipdata.get("info3", "Unknown") - } - util.IP_Country_Cache[client_ip] = geo_info - return geo_info - except Exception as e: - print(f"Error querying IP {client_ip}: {e}") - - default_geo = {"country": "Unknown", "region": "Unknown", "city": "Unknown"} - util.IP_Country_Cache[client_ip] = default_geo - return default_geo - - -def get_lang_from_country(country): - """根据国家获取语言代码""" - if not country or country == "Unknown": - return "en" - - for lang, countries in country_dict.items(): - if country in countries: - return lang - return "en" - - -def get_lang_from_ip(client_ip): - """根据IP获取语言""" - geo_info = query_ip_country(client_ip) - return get_lang_from_country(geo_info.get("country", "Unknown")) - - -def get_ip_generation_count(client_ip): - """获取IP生成次数""" - return util.IP_Generation_Count.get(client_ip, 0) - - -def increment_ip_generation_count(client_ip): - """增加IP生成次数""" - if client_ip not in util.IP_Generation_Count: - util.IP_Generation_Count[client_ip] = 0 - util.IP_Generation_Count[client_ip] += 1 - return util.IP_Generation_Count[client_ip] - - -def get_ip_phase(client_ip): - """获取IP当前阶段""" - count = get_ip_generation_count(client_ip) - geo_info = util.IP_Country_Cache.get(client_ip, {"country": "Unknown"}) - country = geo_info.get("country", "Unknown") - - config = rate_limit_config.get_country_config(country) - max_limit = config.get("max_limit", rate_limit_config.BLOCKED_LIMIT) - - if config.get("is_restricted"): - if count >= max_limit: - return 'blocked' - elif count >= max(0, max_limit - 2): - return 'rate_limit_3' - elif count >= max(0, max_limit - 3): - return 'rate_limit_2' - elif count >= max(0, max_limit - 4): - return 'rate_limit_1' - else: - return 'free' - - free_limit = config.get("free_phase_limit", rate_limit_config.FREE_PHASE_LIMIT) - - if count < free_limit: - return 'free' - elif count < rate_limit_config.SLOW_PHASE_1_LIMIT: - return 'rate_limit_1' - elif count < rate_limit_config.SLOW_PHASE_2_LIMIT: - return 'rate_limit_2' - elif count < max_limit: - return 'rate_limit_3' - else: - return 'blocked' - - -def check_rate_limit_for_phase(client_ip, phase): - """检查阶段的速率限制""" - if phase not in ['rate_limit_1', 'rate_limit_2', 'rate_limit_3']: - return False, 0, 0 - - if phase == 'rate_limit_1': - window_minutes = rate_limit_config.PHASE_1_WINDOW_MINUTES - elif phase == 'rate_limit_2': - window_minutes = rate_limit_config.PHASE_2_WINDOW_MINUTES - else: - window_minutes = rate_limit_config.PHASE_3_WINDOW_MINUTES - - current_time = time.time() - window_key = f"{client_ip}_{phase}" - - if window_key in util.IP_Rate_Limit_Track: - track_data = util.IP_Rate_Limit_Track[window_key] - if current_time - track_data['start_time'] > window_minutes * 60: - util.IP_Rate_Limit_Track[window_key] = { - 'count': 0, - 'start_time': current_time, - 'last_generation': current_time - } - else: - util.IP_Rate_Limit_Track[window_key] = { - 'count': 0, - 'start_time': current_time, - 'last_generation': current_time - } - - track_data = util.IP_Rate_Limit_Track[window_key] - - if track_data['count'] >= rate_limit_config.MAX_IMAGES_PER_WINDOW: - elapsed = current_time - track_data['start_time'] - wait_time = (window_minutes * 60) - elapsed - wait_minutes = max(0, wait_time / 60) - return True, wait_minutes, track_data['count'] - - return False, 0, track_data['count'] - - -def update_country_stats(client_ip): - """更新国家使用统计""" - geo_info = util.IP_Country_Cache.get(client_ip, {"country": "Unknown"}) - country = geo_info["country"] - - if country not in util.Country_Usage_Stats: - util.Country_Usage_Stats[country] = 0 - util.Country_Usage_Stats[country] += 1 - - util.Total_Request_Count += 1 - - if util.Total_Request_Count % PRINT_STATS_INTERVAL == 0: - print("\n" + "="*60) - print(f"📊 国家使用统计 (总请求数: {util.Total_Request_Count})") - print("="*60) - sorted_stats = sorted(util.Country_Usage_Stats.items(), key=lambda x: x[1], reverse=True) - for country_name, count in sorted_stats: - percentage = (count / util.Total_Request_Count) * 100 - print(f" {country_name}: {count} 次 ({percentage:.1f}%)") - print("="*60 + "\n") - - -def record_generation_attempt(client_ip, phase): - """记录生成尝试""" - increment_ip_generation_count(client_ip) - update_country_stats(client_ip) - - if phase in ['rate_limit_1', 'rate_limit_2', 'rate_limit_3']: - window_key = f"{client_ip}_{phase}" - current_time = time.time() - - if window_key in util.IP_Rate_Limit_Track: - util.IP_Rate_Limit_Track[window_key]['count'] += 1 - util.IP_Rate_Limit_Track[window_key]['last_generation'] = current_time - else: - util.IP_Rate_Limit_Track[window_key] = { - 'count': 1, - 'start_time': current_time, - 'last_generation': current_time - } - - -def apply_gaussian_blur_to_image_url(image_url, blur_strength=50): - """对图片URL应用高斯模糊""" - try: - import requests - from PIL import Image, ImageFilter - import io - - response = requests.get(image_url, timeout=30) - if response.status_code != 200: - return None - - image_data = io.BytesIO(response.content) - image = Image.open(image_data) - blurred_image = image.filter(ImageFilter.GaussianBlur(radius=blur_strength)) - - return blurred_image - except Exception as e: - print(f"⚠️ Failed to apply Gaussian blur: {e}") - return None - - -# Initialize NSFW detector -try: - nsfw_detector = NSFWDetector() - print("✅ NSFW detector initialized successfully") -except Exception as e: - print(f"❌ NSFW detector initialization failed: {e}") - nsfw_detector = None - - -# ============================================================================= -# 通用NSFW检测函数 -# ============================================================================= -def check_nsfw_for_input(input_image, country, current_count, client_ip): - """ - 检测输入图片是否为NSFW内容 - 返回: (is_nsfw, should_check) - 是否为NSFW,是否需要检测 - """ - if input_image is None: - return False, False - - if not rate_limit_config.should_enable_nsfw(country, current_count): - return False, False - - if nsfw_detector is None: - return False, False - - try: - nsfw_result = nsfw_detector.predict_pil_label_only(input_image) - if nsfw_result.lower() == "nsfw": - print(f"🔍 Input NSFW detected: ❌❌❌ - IP: {client_ip}") - return True, True - return False, True - except Exception as e: - print(f"⚠️ NSFW detection failed: {e}") - return False, True - - -def check_nsfw_for_result(result_url, country, current_count): - """ - 检测结果图片是否为NSFW内容 - 返回: is_nsfw - """ - if nsfw_detector is None: - return False - - if not rate_limit_config.should_enable_nsfw(country, current_count): - return False - - try: - is_nsfw, _ = download_and_check_result_nsfw(result_url, nsfw_detector) - return is_nsfw - except Exception: - return False - - -def create_nsfw_blurred_response(result_url, redirect_url): - """ - 创建NSFW模糊处理后的响应HTML - 使用CSS filter动画实现延迟模糊效果(时间由NSFW_BLUR_SECONDS配置) - 返回: (result_html, action_html) 或 None 如果模糊失败 - """ - # 创建唯一ID - unique_id = f"nsfw_{int(time.time() * 1000)}" - - # 计算动画时间点(在总时长中的百分比) - # 前N秒保持清晰,然后在1秒内过渡到模糊 - total_duration = NSFW_BLUR_SECONDS + 1 - clear_percentage = (NSFW_BLUR_SECONDS / total_duration) * 100 - - # 使用CSS filter实现模糊效果,从清晰到模糊的动画 - result_html = f""" - -
- Result Image -
- """ - - nsfw_button_html = f""" -
- 🔒 Unlock Full Image - Visit Website -
- """ - return result_html, nsfw_button_html - - -# ============================================================================= -# 通用处理函数 -# ============================================================================= -def create_blocked_button_html(url): - """创建封锁状态的按钮HTML""" - return f""" -
- 🚀 Unlimited Generation -
- """ - - -def create_rate_limit_button_html(url): - """创建限速状态的按钮HTML""" - return f""" -
- ⏰ Skip Wait - Unlimited Generation -
- """ - - -def create_task_locked_button_html(url, remaining_seconds): - """创建任务锁定状态的按钮HTML""" - remaining_minutes = int(remaining_seconds / 60) + 1 - return f""" -
- ⏰ Process Multiple Tasks -
- """ - - -def create_like_tip_html(): - """创建点击红心提示HTML""" - return """ -
-
👉 Click the ❤️ Like button to unlock more free trial attempts!
-
- """ - - -def create_result_image_html(image_url, max_height=500): - """创建结果图片HTML(直接从源站加载,不经过HF Space服务器)""" - if not image_url: - return "" - return f""" -
- Result Image -
- """ - - -def pil_image_to_base64_html(pil_image, max_height=500): - """将PIL Image转为base64嵌入HTML(不经过服务器中转)""" - import io - import base64 - - try: - if pil_image is None: - return "" - - # 转为JPEG格式 - img_buffer = io.BytesIO() - if pil_image.mode != 'RGB': - pil_image = pil_image.convert('RGB') - pil_image.save(img_buffer, format='JPEG', quality=85) - img_data = img_buffer.getvalue() - - # 编码为base64 - img_base64 = base64.b64encode(img_data).decode('utf-8') - - return f""" -
- Blurred Result -
- """ - except Exception as e: - print(f"⚠️ Failed to convert PIL to base64 HTML: {e}") - return "" - - -def create_action_buttons_html(task_uuid, input_image_url, result_url, prompt, is_restricted, show_like_tip): - """创建操作按钮HTML""" - action_buttons_html = "" - - if task_uuid and not is_restricted: - task_detail_url = f"https://omnicreator.net/my-creations/task/{task_uuid}" - from urllib.parse import quote - encoded_prompt = quote(prompt.strip()) if prompt else "" - encoded_result_url = quote(result_url) if result_url else "" - i2v_url = f"https://omnicreator.net/image-to-video?input_image={input_image_url}&end_image={result_url}&prompt={encoded_prompt}" - face_swap_url = f"https://omnicreator.net/face-swap?user_image={encoded_result_url}" - - action_buttons_html = f""" -
- 🎥 Convert to Video - 🧍 Swap Face - 💾 Download HD Image -
- """ - - if show_like_tip or is_restricted: - action_buttons_html += create_like_tip_html() - - return action_buttons_html - - -# ============================================================================= -# 单图编辑接口 -# ============================================================================= -def edit_image_interface(input_image, prompt, request: gr.Request, progress=gr.Progress()): - """单图编辑接口""" - try: - client_ip = request.client.host - x_forwarded_for = dict(request.headers).get('x-forwarded-for') - if x_forwarded_for: - # 只取第一个有效IP - client_ip = x_forwarded_for.split(",")[0].strip() - if client_ip not in util.IP_Dict: - util.IP_Dict[client_ip] = 0 - util.IP_Dict[client_ip] += 1 - - # 立即查询并缓存IP归属地,避免后续一直为Unknown - query_ip_country(client_ip) - - if input_image is None: - return "", None, "❌ Please upload an image first", gr.update(visible=False) - - if not prompt or prompt.strip() == "": - return "", None, "❌ Please enter editing instructions", gr.update(visible=False) - - if len(prompt.strip()) <= 3: - return "", None, "❌ Prompt is too short. Please provide detailed editing instructions.", gr.update(visible=False) - except Exception as e: - return "", None, "❌ Failed to process request", gr.update(visible=False) - - geo_info = util.IP_Country_Cache.get(client_ip, {"country": "Unknown"}) - country = geo_info.get("country", "Unknown") - current_phase = get_ip_phase(client_ip) - current_count = get_ip_generation_count(client_ip) - is_restricted = rate_limit_config.is_restricted_country(country) - - show_like_tip = rate_limit_config.should_show_like_tip(country, current_count) - redirect_url = rate_limit_config.get_redirect_url(country, "en") - - # 检查是否有活动任务(3分钟内) - is_active, remaining_seconds = has_active_task(client_ip) - if is_active: - return "", None, f"⏰ You have a task in progress. Please wait until it is finished before submitting a new task.", gr.update(value=create_task_locked_button_html(redirect_url, remaining_seconds), visible=True) - - # 检查是否被封锁 - if current_phase == 'blocked': - return "", None, "🚫 You have reached the free generation limit.", gr.update(value=create_blocked_button_html(redirect_url), visible=True) - - # 检查速率限制 - if current_phase in ['rate_limit_1', 'rate_limit_2', 'rate_limit_3']: - is_limited, wait_minutes, window_count = check_rate_limit_for_phase(client_ip, current_phase) - if is_limited: - wait_minutes_int = int(wait_minutes) + 1 - return "", None, f"⏰ Rate limit reached. Please wait {wait_minutes_int} minutes before trying again.", gr.update(value=create_rate_limit_button_html(redirect_url), visible=True) - - # NSFW检测 - is_nsfw_task, _ = check_nsfw_for_input(input_image, country, current_count, client_ip) - - def progress_callback(message): - try: - if progress is not None: - if "Queue:" in message or "tasks ahead" in message: - progress(0.1, desc=message) - elif "Processing" in message or "AI is processing" in message: - progress(0.7, desc=message) - elif "Generating" in message or "Almost done" in message: - progress(0.9, desc=message) - else: - progress(0.5, desc=message) - except Exception: - pass - - try: - # 设置任务锁定 - set_active_task(client_ip, True) - - # 判断是否使用高速高清模式 - use_high_speed_mode = current_count < rate_limit_config.HIGH_SPEED_HD_COUNT - task_priority = 1 if use_high_speed_mode else 0 - record_generation_attempt(client_ip, current_phase) - updated_count = get_ip_generation_count(client_ip) - - mode_info = "High-Speed HD" if use_high_speed_mode else "Standard" - print(f"✅ [Single Edit] Processing started - IP: {client_ip}, Country: {country}, Phase: {current_phase}, Count: {updated_count}, Mode: {mode_info}, Prompt: {prompt.strip()[:50]}...", flush=True) - - input_image_url, result_url, message, task_uuid = process_image_edit(input_image, prompt.strip(), None, progress_callback, priority=task_priority, client_ip=client_ip) - - if message and message.startswith("HF_LIMIT_EXCEEDED:"): - return "", None, "🚫 You have reached the free generation limit.", gr.update(value=create_blocked_button_html(redirect_url), visible=True) - - if result_url: - # 检查结果图片NSFW - if check_nsfw_for_result(result_url, country, current_count): - is_nsfw_task = True - - # 应用模糊处理 - if is_nsfw_task: - nsfw_response = create_nsfw_blurred_response(result_url, redirect_url) - if nsfw_response: - blurred_html, nsfw_button_html = nsfw_response - set_active_task(client_ip, False) # 解锁任务 - # 返回None作为URL,防止用户通过"Use as Input"绕过NSFW检测 - return blurred_html, None, "⚠️ Content detected by filter. Visit website to unlock.", gr.update(value=nsfw_button_html, visible=True) - - # 正常情况:返回HTML显示图片 - result_html = create_result_image_html(result_url) - action_html = create_action_buttons_html(task_uuid, input_image_url, result_url, prompt, is_restricted, show_like_tip) - set_active_task(client_ip, False) # 解锁任务 - print(f"✅ [Single Edit] Completed - IP: {client_ip}, TaskUUID: {task_uuid}, Result: {result_url}...", flush=True) - - # 添加模式提示 - if use_high_speed_mode: - status_msg = f"✅ Completed with High-Speed HD Mode: {message}" - else: - status_msg = f"✅ Completed with Standard Mode: {message}\n💡 Visit https://omnicreator.net for High-Speed HD generation" - - return result_html, result_url, status_msg, gr.update(value=action_html, visible=True) - else: - set_active_task(client_ip, False) # 解锁任务 - print(f"❌ [Single Edit] Failed - IP: {client_ip}, Error: {message}", flush=True) - return "", None, f"❌ Processing failed: {message}", gr.update(visible=False) - - except Exception as e: - set_active_task(client_ip, False) # 解锁任务 - print(f"❌ [Single Edit] Exception - IP: {client_ip}, Error: {str(e)}", flush=True) - return "", None, f"❌ Error: {str(e)}", gr.update(visible=False) - - -# ============================================================================= -# 多图编辑接口 -# ============================================================================= -def multi_image_edit_interface(input_image1, input_image2, input_image3, prompt, aspect_ratio, request: gr.Request, progress=gr.Progress()): - """多图编辑接口""" - try: - client_ip = request.client.host - x_forwarded_for = dict(request.headers).get('x-forwarded-for') - if x_forwarded_for: - # 只取第一个有效IP - client_ip = x_forwarded_for.split(",")[0].strip() - # 立即查询并缓存IP归属地,避免后续一直为Unknown - query_ip_country(client_ip) - - images = [img for img in [input_image1, input_image2, input_image3] if img is not None] - - if len(images) < 2: - return "", "❌ Please upload at least 2 images", gr.update(visible=False) - - if not prompt or prompt.strip() == "": - return "", "❌ Please enter editing instructions", gr.update(visible=False) - - if len(prompt.strip()) <= 3: - return "", "❌ Prompt is too short. Please provide detailed editing instructions.", gr.update(visible=False) - except Exception as e: - return "", "❌ Failed to process request", gr.update(visible=False) - - geo_info = util.IP_Country_Cache.get(client_ip, {"country": "Unknown"}) - country = geo_info.get("country", "Unknown") - country_config = rate_limit_config.get_country_config(country) - current_phase = get_ip_phase(client_ip) - current_count = get_ip_generation_count(client_ip) - is_restricted = rate_limit_config.is_restricted_country(country) - - show_like_tip = rate_limit_config.should_show_like_tip(country, current_count) - redirect_url = rate_limit_config.get_redirect_url(country, "en") - - # 检查是否有活动任务(3分钟内) - is_active, remaining_seconds = has_active_task(client_ip) - if is_active: - return "", f"⏰ You have a task in progress. Please wait until it is finished before submitting a new task.", gr.update(value=create_task_locked_button_html(redirect_url, remaining_seconds), visible=True) - - # 根据用户次数和国家配置决定是否使用高速高清模式 - high_speed_hd_count = country_config.get("high_speed_hd_count", rate_limit_config.HIGH_SPEED_HD_COUNT) - use_high_speed_mode = current_count < high_speed_hd_count - - # 检查是否被封锁 - if current_phase == 'blocked': - return "", "🚫 You have reached the free generation limit.", gr.update(value=create_blocked_button_html(redirect_url), visible=True) - - if current_phase in ['rate_limit_1', 'rate_limit_2', 'rate_limit_3']: - is_limited, wait_minutes, _ = check_rate_limit_for_phase(client_ip, current_phase) - if is_limited: - wait_minutes_int = int(wait_minutes) + 1 - return "", f"⏰ Rate limit reached. Please wait {wait_minutes_int} minutes before trying again.", gr.update(value=create_rate_limit_button_html(redirect_url), visible=True) - - # NSFW检测 - 检测所有输入图片 - is_nsfw_task = False - for img in images: - if img is not None: - nsfw_detected, _ = check_nsfw_for_input(img, country, current_count, client_ip) - if nsfw_detected: - is_nsfw_task = True - break - - # 决定任务参数(在progress_callback之前定义) - if use_high_speed_mode: - task_priority = 1 - is_sr = 1 # 启用超分 - mode_info = "🚀 High-Speed + HD Mode" - else: - task_priority = 0 - is_sr = 0 # 关闭超分 - mode_info = "⏱️ Low Resolution Mode | Visit https://omnicreator.net for High-Speed HD generation" - - def progress_callback(message): - try: - if progress is not None: - progress(0.5, desc=f"{mode_info} | {message}") - except Exception: - pass - - try: - # 设置任务锁定 - set_active_task(client_ip, True) - - record_generation_attempt(client_ip, current_phase) - updated_count = get_ip_generation_count(client_ip) - - print(f"✅ [Multi Edit] Processing started - IP: {client_ip}, Country: {country}, Phase: {current_phase}, Count: {updated_count}, Priority: {task_priority}, Images: {len(images)}, Prompt: {prompt.strip()[:50]}...", flush=True) - - input_url, result_url, message, task_uuid = process_multi_image_edit(images, prompt.strip(), progress_callback, priority=task_priority, client_ip=client_ip, is_sr=is_sr) - - if message and message.startswith("HF_LIMIT_EXCEEDED:"): - set_active_task(client_ip, False) # 解锁任务 - return "", "🚫 You have reached the free generation limit.", gr.update(value=create_blocked_button_html(redirect_url), visible=True) - - if result_url: - # 检查结果图片NSFW - if check_nsfw_for_result(result_url, country, current_count): - is_nsfw_task = True - - # 应用模糊处理 - if is_nsfw_task: - nsfw_response = create_nsfw_blurred_response(result_url, redirect_url) - if nsfw_response: - blurred_html, nsfw_button_html = nsfw_response - set_active_task(client_ip, False) - return blurred_html, "⚠️ Content detected by filter. Visit website to unlock.", gr.update(value=nsfw_button_html, visible=True) - - result_html = create_result_image_html(result_url) - action_html = create_action_buttons_html(task_uuid, input_url, result_url, prompt, is_restricted, show_like_tip) - - # 添加模式提示信息 - if use_high_speed_mode: - mode_tip = "🚀 Generated with High-Speed + HD Mode" - else: - mode_tip = "⏱️ Generated with Low Resolution Mode | Visit https://omnicreator.net for High-Speed HD generation" - status_message = f"{mode_tip}\n✅ Completed: {message}" - - set_active_task(client_ip, False) # 解锁任务 - print(f"✅ [Multi Edit] Completed - IP: {client_ip}, TaskUUID: {task_uuid}, Result: {result_url}...", flush=True) - return result_html, status_message, gr.update(value=action_html, visible=True) - else: - set_active_task(client_ip, False) # 解锁任务 - print(f"❌ [Multi Edit] Failed - IP: {client_ip}, Error: {message}", flush=True) - return "", f"❌ Processing failed: {message}", gr.update(visible=False) - - except Exception as e: - set_active_task(client_ip, False) # 解锁任务 - print(f"❌ [Multi Edit] Exception - IP: {client_ip}, Error: {str(e)}", flush=True) - return "", f"❌ Error: {str(e)}", gr.update(visible=False) - - -# ============================================================================= -# 文生图接口 -# ============================================================================= -def text_to_image_interface(prompt, aspect_ratio, request: gr.Request, progress=gr.Progress()): - """文生图接口""" - try: - client_ip = request.client.host - x_forwarded_for = dict(request.headers).get('x-forwarded-for') - if x_forwarded_for: - # 只取第一个有效IP - client_ip = x_forwarded_for.split(",")[0].strip() - # 立即查询并缓存IP归属地,避免后续一直为Unknown - query_ip_country(client_ip) - - if not prompt or prompt.strip() == "": - return "", "❌ Please enter a prompt", gr.update(visible=False) - - if len(prompt.strip()) <= 3: - return "", "❌ Prompt is too short. Please provide detailed description.", gr.update(visible=False) - except Exception as e: - return "", "❌ Failed to process request", gr.update(visible=False) - - geo_info = util.IP_Country_Cache.get(client_ip, {"country": "Unknown"}) - country = geo_info.get("country", "Unknown") - country_config = rate_limit_config.get_country_config(country) - current_phase = get_ip_phase(client_ip) - current_count = get_ip_generation_count(client_ip) - is_restricted = rate_limit_config.is_restricted_country(country) - - show_like_tip = rate_limit_config.should_show_like_tip(country, current_count) - redirect_url = rate_limit_config.get_redirect_url(country, "en") - - # 检查是否有活动任务(3分钟内) - is_active, remaining_seconds = has_active_task(client_ip) - if is_active: - return "", f"⏰ You have a task in progress. Please wait until it is finished before submitting a new task.", gr.update(value=create_task_locked_button_html(redirect_url, remaining_seconds), visible=True) - - # 根据用户次数和国家配置决定是否使用高速高清模式 - high_speed_hd_count = country_config.get("high_speed_hd_count", rate_limit_config.HIGH_SPEED_HD_COUNT) - use_high_speed_mode = current_count < high_speed_hd_count - - # 检查是否被封锁 - if current_phase == 'blocked': - return "", "🚫 You have reached the free generation limit.", gr.update(value=create_blocked_button_html(redirect_url), visible=True) - - if current_phase in ['rate_limit_1', 'rate_limit_2', 'rate_limit_3']: - is_limited, wait_minutes, _ = check_rate_limit_for_phase(client_ip, current_phase) - if is_limited: - wait_minutes_int = int(wait_minutes) + 1 - return "", f"⏰ Rate limit reached. Please wait {wait_minutes_int} minutes before trying again.", gr.update(value=create_rate_limit_button_html(redirect_url), visible=True) - - # 决定任务参数(在progress_callback之前定义,以便在回调中使用) - if use_high_speed_mode: - task_priority = 1 - is_sr = 1 # 启用超分 - target_area = 800 * 800 # 高清分辨率 - mode_info = "🚀 High-Speed + HD Mode (800×800)" - else: - task_priority = 0 - is_sr = 0 # 关闭超分 - target_area = 576 * 576 # 标准分辨率 - mode_info = "⏱️ Low Resolution Mode (576×576) | Visit https://omnicreator.net for High-Speed HD generation" - - def progress_callback(message): - try: - if progress is not None: - # 添加模式提示 - progress(0.5, desc=f"{mode_info} | {message}") - except Exception: - pass - - try: - # 设置任务锁定 - set_active_task(client_ip, True) - - record_generation_attempt(client_ip, current_phase) - updated_count = get_ip_generation_count(client_ip) - - print(f"✅ [Text2Image] Processing started - IP: {client_ip}, Country: {country}, Phase: {current_phase}, Count: {updated_count}, Priority: {task_priority}, Ratio: {aspect_ratio}, Prompt: {prompt.strip()[:50]}...", flush=True) - - result_url, message, task_uuid = process_text_to_image(prompt.strip(), aspect_ratio, progress_callback, priority=task_priority, client_ip=client_ip, is_sr=is_sr, target_area=target_area) - - if message and message.startswith("HF_LIMIT_EXCEEDED:"): - set_active_task(client_ip, False) - return "", "🚫 You have reached the free generation limit.", gr.update(value=create_blocked_button_html(redirect_url), visible=True) - - if result_url: - # 检查结果图片NSFW - is_nsfw_task = check_nsfw_for_result(result_url, country, current_count) - - # 应用模糊处理 - if is_nsfw_task: - nsfw_response = create_nsfw_blurred_response(result_url, redirect_url) - if nsfw_response: - blurred_html, nsfw_button_html = nsfw_response - set_active_task(client_ip, False) - return blurred_html, "⚠️ Content detected by filter. Visit website to unlock.", gr.update(value=nsfw_button_html, visible=True) - - result_html = create_result_image_html(result_url) - - # 为T2I生成3个操作按钮 - action_html = "" - if task_uuid and not is_restricted: - task_detail_url = f"https://omnicreator.net/my-creations/task/{task_uuid}" - from urllib.parse import quote - encoded_prompt = quote(prompt.strip()) if prompt else "" - encoded_result_url = quote(result_url) if result_url else "" - i2v_url = f"https://omnicreator.net/image-to-video?input_image={encoded_result_url}&prompt={encoded_prompt}" - face_swap_url = f"https://omnicreator.net/face-swap?user_image={encoded_result_url}" - - action_html = f""" -
- 🎥 Convert to Video - 🧍 Swap Face - 💾 Download HD Image -
- """ - - if show_like_tip or is_restricted: - action_html += create_like_tip_html() - - # 添加模式提示信息 - if use_high_speed_mode: - mode_tip = "🚀 Generated with High-Speed + HD Mode (800×800)" - else: - mode_tip = "⏱️ Generated with Low Resolution Mode (576×576) | Visit https://omnicreator.net for High-Speed HD generation" - status_message = f"{mode_tip}\n✅ Completed: {message}" - - set_active_task(client_ip, False) # 解锁任务 - print(f"✅ [Text2Image] Completed - IP: {client_ip}, TaskUUID: {task_uuid}, Result: {result_url}...", flush=True) - return result_html, status_message, gr.update(value=action_html, visible=True) - else: - set_active_task(client_ip, False) # 解锁任务 - print(f"❌ [Text2Image] Failed - IP: {client_ip}, Error: {message}", flush=True) - return "", f"❌ Processing failed: {message}", gr.update(visible=False) - - except Exception as e: - set_active_task(client_ip, False) # 解锁任务 - print(f"❌ [Text2Image] Exception - IP: {client_ip}, Error: {str(e)}", flush=True) - return "", f"❌ Error: {str(e)}", gr.update(visible=False) - - -# ============================================================================= -# 图片放大接口 -# ============================================================================= -def image_upscale_interface(input_image, request: gr.Request, progress=gr.Progress()): - """图片放大接口""" - try: - client_ip = request.client.host - x_forwarded_for = dict(request.headers).get('x-forwarded-for') - if x_forwarded_for: - # 只取第一个有效IP - client_ip = x_forwarded_for.split(",")[0].strip() - # 立即查询并缓存IP归属地,避免后续一直为Unknown - query_ip_country(client_ip) - - if input_image is None: - return "", "❌ Please upload an image first", gr.update(visible=False) - except Exception as e: - return "", "❌ Failed to process request", gr.update(visible=False) - - geo_info = util.IP_Country_Cache.get(client_ip, {"country": "Unknown"}) - country = geo_info.get("country", "Unknown") - current_phase = get_ip_phase(client_ip) - current_count = get_ip_generation_count(client_ip) - is_restricted = rate_limit_config.is_restricted_country(country) - - show_like_tip = rate_limit_config.should_show_like_tip(country, current_count) - redirect_url = rate_limit_config.get_redirect_url(country, "en") - - # 检查是否有活动任务(3分钟内) - is_active, remaining_seconds = has_active_task(client_ip) - if is_active: - return "", f"⏰ You have a task in progress. Please wait until it is finished before submitting a new task.", gr.update(value=create_task_locked_button_html(redirect_url, remaining_seconds), visible=True) - - # 检查是否被封锁 - if current_phase == 'blocked': - return "", "🚫 You have reached the free generation limit.", gr.update(value=create_blocked_button_html(redirect_url), visible=True) - - if current_phase in ['rate_limit_1', 'rate_limit_2', 'rate_limit_3']: - is_limited, wait_minutes, _ = check_rate_limit_for_phase(client_ip, current_phase) - if is_limited: - wait_minutes_int = int(wait_minutes) + 1 - return "", f"⏰ Rate limit reached. Please wait {wait_minutes_int} minutes before trying again.", gr.update(value=create_rate_limit_button_html(redirect_url), visible=True) - - # NSFW检测 - is_nsfw_task, _ = check_nsfw_for_input(input_image, country, current_count, client_ip) - - def progress_callback(message): - try: - if progress is not None: - progress(0.5, desc=message) - except Exception: - pass - - try: - # 设置任务锁定 - set_active_task(client_ip, True) - - # 判断是否使用高速高清模式 - use_high_speed_mode = current_count < rate_limit_config.HIGH_SPEED_HD_COUNT - task_priority = 1 if use_high_speed_mode else 0 - record_generation_attempt(client_ip, current_phase) - updated_count = get_ip_generation_count(client_ip) - - mode_info = "High-Speed HD" if use_high_speed_mode else "Standard" - print(f"✅ [Upscale] Processing started - IP: {client_ip}, Country: {country}, Phase: {current_phase}, Count: {updated_count}, Mode: {mode_info}", flush=True) - - input_url, result_url, message, task_uuid = process_image_upscale(input_image, progress_callback, priority=task_priority, client_ip=client_ip) - - if message and message.startswith("HF_LIMIT_EXCEEDED:"): - set_active_task(client_ip, False) - return "", "🚫 You have reached the free generation limit.", gr.update(value=create_blocked_button_html(redirect_url), visible=True) - - if result_url: - # 检查结果图片NSFW - if check_nsfw_for_result(result_url, country, current_count): - is_nsfw_task = True - - # 应用模糊处理 - if is_nsfw_task: - nsfw_response = create_nsfw_blurred_response(result_url, redirect_url) - if nsfw_response: - blurred_html, nsfw_button_html = nsfw_response - set_active_task(client_ip, False) - return blurred_html, "⚠️ Content detected by filter. Visit website to unlock.", gr.update(value=nsfw_button_html, visible=True) - - result_html = create_result_image_html(result_url) - action_html = "" - if task_uuid and not is_restricted: - task_detail_url = f"https://omnicreator.net/my-creations/task/{task_uuid}" - action_html = f""" -
- 💾 Download HD Image -
- """ - if show_like_tip or is_restricted: - action_html += create_like_tip_html() - set_active_task(client_ip, False) # 解锁任务 - print(f"✅ [Upscale] Completed - IP: {client_ip}, TaskUUID: {task_uuid}, Result: {result_url}...", flush=True) - - # 添加模式提示 - if use_high_speed_mode: - status_msg = f"✅ Completed with High-Speed HD Mode: {message}" - else: - status_msg = f"✅ Completed with Standard Mode: {message}\n💡 Visit https://omnicreator.net for High-Speed HD generation" - - return result_html, status_msg, gr.update(value=action_html, visible=bool(action_html)) - else: - set_active_task(client_ip, False) # 解锁任务 - print(f"❌ [Upscale] Failed - IP: {client_ip}, Error: {message}", flush=True) - return "", f"❌ Processing failed: {message}", gr.update(visible=False) - - except Exception as e: - set_active_task(client_ip, False) # 解锁任务 - print(f"❌ [Upscale] Exception - IP: {client_ip}, Error: {str(e)}", flush=True) - return "", f"❌ Error: {str(e)}", gr.update(visible=False) - - -# ============================================================================= -# 换脸接口 -# ============================================================================= -def face_swap_interface(target_editor, source_face_image, request: gr.Request, progress=gr.Progress()): - """换脸接口 - 支持从ImageEditor提取mask""" - try: - client_ip = request.client.host - x_forwarded_for = dict(request.headers).get('x-forwarded-for') - if x_forwarded_for: - # 只取第一个有效IP - client_ip = x_forwarded_for.split(",")[0].strip() - # 立即查询并缓存IP归属地,避免后续一直为Unknown - query_ip_country(client_ip) - - # 从ImageEditor提取图片和mask - target_image = None - mask_image = None - - if target_editor is None: - return "", "❌ Please upload a target image", gr.update(visible=False) - - # ImageEditor返回字典格式: {"background": PIL, "layers": [PIL], "composite": PIL} - if isinstance(target_editor, dict): - target_image = target_editor.get("background") - layers = target_editor.get("layers", []) - - # 从layers提取mask - if layers and len(layers) > 0: - from util import create_mask_from_layers - mask_image = create_mask_from_layers(target_image, layers) - else: - # 兼容直接传入PIL Image的情况 - target_image = target_editor - - if target_image is None: - return "", "❌ Please upload a target image", gr.update(visible=False) - - if source_face_image is None: - return "", "❌ Please upload a source face image", gr.update(visible=False) - except Exception as e: - return "", "❌ Failed to process request", gr.update(visible=False) - - geo_info = util.IP_Country_Cache.get(client_ip, {"country": "Unknown"}) - country = geo_info.get("country", "Unknown") - current_phase = get_ip_phase(client_ip) - current_count = get_ip_generation_count(client_ip) - is_restricted = rate_limit_config.is_restricted_country(country) - - show_like_tip = rate_limit_config.should_show_like_tip(country, current_count) - redirect_url = rate_limit_config.get_redirect_url(country, "en") - - # 检查活动任务 - is_active, remaining_seconds = has_active_task(client_ip) - if is_active: - return "", f"⏰ Task in progress. Wait {int(remaining_seconds/60)+1} min.", gr.update(value=create_task_locked_button_html(redirect_url, remaining_seconds), visible=True) - - # 检查是否被封锁 - if current_phase == 'blocked': - return "", "🚫 You have reached the free generation limit.", gr.update(value=create_blocked_button_html(redirect_url), visible=True) - - if current_phase in ['rate_limit_1', 'rate_limit_2', 'rate_limit_3']: - is_limited, wait_minutes, _ = check_rate_limit_for_phase(client_ip, current_phase) - if is_limited: - return "", f"⏰ Rate limit reached. Please wait {int(wait_minutes)+1} minutes before trying again.", gr.update(value=create_rate_limit_button_html(redirect_url), visible=True) - - # NSFW检测 - 检测target和source图片 - is_nsfw_task, _ = check_nsfw_for_input(target_image, country, current_count, client_ip) - if not is_nsfw_task: - is_nsfw_task, _ = check_nsfw_for_input(source_face_image, country, current_count, client_ip) - - def progress_callback(message): - try: - if progress is not None: - progress(0.5, desc=message) - except Exception: - pass - - try: - set_active_task(client_ip, True) - - # 判断是否使用高速高清模式 - use_high_speed_mode = current_count < rate_limit_config.HIGH_SPEED_HD_COUNT - task_priority = 1 if use_high_speed_mode else 0 - record_generation_attempt(client_ip, current_phase) - updated_count = get_ip_generation_count(client_ip) - - has_mask = "Yes" if mask_image is not None else "No" - mode_info = "High-Speed HD" if use_high_speed_mode else "Standard" - print(f"✅ [FaceSwap] Processing started - IP: {client_ip}, Country: {country}, Phase: {current_phase}, Count: {updated_count}, Mode: {mode_info}, HasMask: {has_mask}", flush=True) - - input_url, result_url, message, task_uuid = process_face_swap(target_image, source_face_image, mask_image, progress_callback, priority=task_priority, client_ip=client_ip) - - if message and message.startswith("HF_LIMIT_EXCEEDED:"): - set_active_task(client_ip, False) - return "", "🚫 You have reached the free generation limit.", gr.update(value=create_blocked_button_html(redirect_url), visible=True) - - if result_url: - # 检查结果图片NSFW - if check_nsfw_for_result(result_url, country, current_count): - is_nsfw_task = True - - # 应用模糊处理 - if is_nsfw_task: - nsfw_response = create_nsfw_blurred_response(result_url, redirect_url) - if nsfw_response: - blurred_html, nsfw_button_html = nsfw_response - set_active_task(client_ip, False) - return blurred_html, "⚠️ Content detected by filter. Visit website to unlock.", gr.update(value=nsfw_button_html, visible=True) - - result_html = create_result_image_html(result_url) - action_html = "" - if task_uuid and not is_restricted: - task_detail_url = f"https://omnicreator.net/my-creations/task/{task_uuid}" - action_html = f""" -
- 💾 Download HD Image -
- """ - if show_like_tip or is_restricted: - action_html += create_like_tip_html() - set_active_task(client_ip, False) - print(f"✅ [FaceSwap] Completed - IP: {client_ip}, TaskUUID: {task_uuid}, Result: {result_url}...", flush=True) - - # 添加模式提示 - if use_high_speed_mode: - status_msg = f"✅ Completed with High-Speed HD Mode: {message}" - else: - status_msg = f"✅ Completed with Standard Mode: {message}\n💡 Visit https://omnicreator.net for High-Speed HD generation" - - return result_html, status_msg, gr.update(value=action_html, visible=bool(action_html)) - else: - set_active_task(client_ip, False) - print(f"❌ [FaceSwap] Failed - IP: {client_ip}, Error: {message}", flush=True) - return "", f"❌ Processing failed: {message}", gr.update(visible=False) - - except Exception as e: - set_active_task(client_ip, False) - print(f"❌ [FaceSwap] Exception - IP: {client_ip}, Error: {str(e)}", flush=True) - return "", f"❌ Error: {str(e)}", gr.update(visible=False) - - -# ============================================================================= -# 去水印接口 -# ============================================================================= -def watermark_removal_interface(input_image, force_removal, request: gr.Request, progress=gr.Progress()): - """去水印接口 - 支持强力模式""" - try: - client_ip = request.client.host - x_forwarded_for = dict(request.headers).get('x-forwarded-for') - if x_forwarded_for: - # 只取第一个有效IP - client_ip = x_forwarded_for.split(",")[0].strip() - # 立即查询并缓存IP归属地,避免后续一直为Unknown - query_ip_country(client_ip) - - if input_image is None: - return "", "❌ Please upload an image", gr.update(visible=False) - except Exception as e: - return "", "❌ Failed to process request", gr.update(visible=False) - - geo_info = util.IP_Country_Cache.get(client_ip, {"country": "Unknown"}) - country = geo_info.get("country", "Unknown") - current_phase = get_ip_phase(client_ip) - current_count = get_ip_generation_count(client_ip) - is_restricted = rate_limit_config.is_restricted_country(country) - - show_like_tip = rate_limit_config.should_show_like_tip(country, current_count) - redirect_url = rate_limit_config.get_redirect_url(country, "en") - - # 检查活动任务 - is_active, remaining_seconds = has_active_task(client_ip) - if is_active: - return "", f"⏰ Task in progress. Please wait until it is finished before submitting a new task.", gr.update(value=create_task_locked_button_html(redirect_url, remaining_seconds), visible=True) - - # 检查是否被封锁 - if current_phase == 'blocked': - return "", "🚫 You have reached the free generation limit.", gr.update(value=create_blocked_button_html(redirect_url), visible=True) - - if current_phase in ['rate_limit_1', 'rate_limit_2', 'rate_limit_3']: - is_limited, wait_minutes, _ = check_rate_limit_for_phase(client_ip, current_phase) - if is_limited: - return "", f"⏰ Rate limit reached. Please wait {int(wait_minutes)+1} minutes before trying again.", gr.update(value=create_rate_limit_button_html(redirect_url), visible=True) - - # NSFW检测 - is_nsfw_task, _ = check_nsfw_for_input(input_image, country, current_count, client_ip) - - def progress_callback(message): - try: - if progress is not None: - mode_text = "💪 AI Force Mode | " if force_removal else "" - progress(0.5, desc=f"{mode_text}🧽 Removing watermark... | {message}") - except Exception: - pass - - try: - set_active_task(client_ip, True) - - # 判断是否使用高速高清模式 - use_high_speed_mode = current_count < rate_limit_config.HIGH_SPEED_HD_COUNT - task_priority = 1 if use_high_speed_mode else 0 - record_generation_attempt(client_ip, current_phase) - updated_count = get_ip_generation_count(client_ip) - - force_mode = "Yes" if force_removal else "No" - mode_info = "High-Speed HD" if use_high_speed_mode else "Standard" - print(f"✅ [Watermark] Processing started - IP: {client_ip}, Country: {country}, Phase: {current_phase}, Count: {updated_count}, Mode: {mode_info}, ForceMode: {force_mode}", flush=True) - - input_url, result_url, message, task_uuid = process_watermark_removal(input_image, progress_callback, priority=task_priority, client_ip=client_ip, force_removal=force_removal) - - if message and message.startswith("HF_LIMIT_EXCEEDED:"): - set_active_task(client_ip, False) - return "", "🚫 You have reached the free generation limit.", gr.update(value=create_blocked_button_html(redirect_url), visible=True) - - if result_url: - # 检查结果图片NSFW - if check_nsfw_for_result(result_url, country, current_count): - is_nsfw_task = True - - # 应用模糊处理 - if is_nsfw_task: - nsfw_response = create_nsfw_blurred_response(result_url, redirect_url) - if nsfw_response: - blurred_html, nsfw_button_html = nsfw_response - set_active_task(client_ip, False) - return blurred_html, "⚠️ Content detected by filter. Visit website to unlock.", gr.update(value=nsfw_button_html, visible=True) - - result_html = create_result_image_html(result_url) - action_html = "" - if task_uuid and not is_restricted: - task_detail_url = f"https://omnicreator.net/my-creations/task/{task_uuid}" - action_html = f""" -
- 💾 Download HD Image - 🚀 Unlimited Removal -
- """ - if show_like_tip or is_restricted: - action_html += create_like_tip_html() - set_active_task(client_ip, False) - print(f"✅ [Watermark] Completed - IP: {client_ip}, TaskUUID: {task_uuid}, Result: {result_url}...", flush=True) - - # 添加模式提示 - if use_high_speed_mode: - status_msg = f"✅ Completed with High-Speed HD Mode: {message}" - else: - status_msg = f"✅ Completed with Standard Mode: {message}\n💡 Visit https://omnicreator.net for High-Speed HD generation" - - return result_html, status_msg, gr.update(value=action_html, visible=bool(action_html)) - else: - set_active_task(client_ip, False) - print(f"❌ [Watermark] Failed - IP: {client_ip}, Error: {message}", flush=True) - return "", f"❌ Processing failed: {message}", gr.update(visible=False) - - except Exception as e: - set_active_task(client_ip, False) - print(f"❌ [Watermark] Exception - IP: {client_ip}, Error: {str(e)}", flush=True) - return "", f"❌ Error: {str(e)}", gr.update(visible=False) - - -# ============================================================================= -# 创建Gradio应用 -# ============================================================================= -def create_app(): - with gr.Blocks( - title="AI Image Editor", - theme=gr.themes.Soft(), - css=""" - .main-container { max-width: 1200px; margin: 0 auto; } - .news-banner-row { margin: 10px auto 15px auto; padding: 0 10px; max-width: 1200px; width: 100% !important; } - .banner-lang-selector { margin-left: auto !important; display: flex !important; justify-content: flex-end !important; } - .upload-area { border: 2px dashed #ccc; border-radius: 10px; padding: 20px; text-align: center; } - .result-area { margin-top: 20px; padding: 20px; border-radius: 10px; background-color: #f8f9fa; } - """ - ) as app: - - # 主标题 - gr.HTML(""" -
-

- 🎨 Omni Editor 2.0 -

-
- """) - - # Banner - gr.HTML(""" - -
- - Please give us a ❤️ if you find it helpful. Free trials refreshed every few days. - -
- """, visible=True) - - # ============================================================================= - # 多Tab界面 - # ============================================================================= - with gr.Tabs() as tabs: - - # ============================================================================= - # Tab 1: 单图编辑 - # ============================================================================= - with gr.Tab("🖼️ Single Image Edit") as single_edit_tab: - with gr.Row(): - with gr.Column(scale=1): - gr.Markdown("### 📤 Upload Image") - single_input_image = gr.Image(label="Select image to edit", type="pil", height=512, elem_classes=["upload-area"]) - gr.Markdown("### ✏️ Editing Instructions") - single_prompt = gr.Textbox(label="Prompt", placeholder="Describe what you want to change in the image...", lines=3) - single_edit_btn = gr.Button("🎨 Start Editing", variant="primary", size="lg") - - with gr.Column(scale=1): - gr.Markdown("### 🎯 Editing Result") - single_output_image = gr.HTML(label="Output Image", elem_classes=["result-area"]) - single_output_url = gr.State() # 隐藏state保存URL用于Use as Input - single_use_as_input_btn = gr.Button("🔄 Use as Input", variant="secondary", size="sm") - single_status = gr.Textbox(label="Processing Status", lines=2, interactive=False) - single_action_buttons = gr.HTML(visible=False) - - gr.Markdown("### 💡 Prompt Examples") - with gr.Row(): - for prompt in ["Change background to beach sunset", "Add rainbow in the sky", "Make it look like oil painting", "Remove the background", "Change outfit to red dress"]: - gr.Button(prompt, size="sm").click(lambda p=prompt: p, outputs=single_prompt) - - single_edit_btn.click( - fn=edit_image_interface, - inputs=[single_input_image, single_prompt], - outputs=[single_output_image, single_output_url, single_status, single_action_buttons], - show_progress=True, - concurrency_limit=20 - ) - - def use_result_as_input(url): - if url: - try: - import requests - from PIL import Image - import io - response = requests.get(url, timeout=10) - if response.status_code == 200: - return Image.open(io.BytesIO(response.content)) - except: - pass - return None - - single_use_as_input_btn.click( - fn=use_result_as_input, - inputs=[single_output_url], - outputs=[single_input_image] - ) - - # ============================================================================= - # Tab 2: 多图编辑 - # ============================================================================= - with gr.Tab("🖼️🖼️ Multi-Image Edit") as multi_edit_tab: - with gr.Row(): - with gr.Column(scale=1): - gr.Markdown("### 📤 Upload Images") - # 3个图片组件放在同一排 - with gr.Row(): - multi_input_image1 = gr.Image(label="Image 1", type="pil", height=200, elem_classes=["upload-area"]) - multi_input_image2 = gr.Image(label="Image 2", type="pil", height=200, elem_classes=["upload-area"]) - multi_input_image3 = gr.Image(label="Image 3 (Optional)", type="pil", height=200, elem_classes=["upload-area"]) - - gr.Markdown("📐 Output Size") - multi_aspect_ratio = gr.Radio( - choices=["Auto", "16:9", "4:3", "1:1", "3:4", "9:16"], - value="Auto", - label="Select aspect ratio", - info="Choose output dimensions or Auto for original sizing" - ) - - gr.Markdown("### ✏️ Editing Instructions") - multi_prompt = gr.Textbox(label="Prompt", placeholder="Describe how to combine or edit these images...", lines=3) - multi_edit_btn = gr.Button("🎨 Start Multi-Image Editing", variant="primary", size="lg") - - with gr.Column(scale=1): - gr.Markdown("### 🎯 Editing Result") - multi_output_image = gr.HTML(label="Output Image", elem_classes=["result-area"]) - multi_status = gr.Textbox(label="Processing Status", lines=2, interactive=False) - multi_action_buttons = gr.HTML(visible=False) - - gr.Markdown("### 💡 Prompt Examples") - with gr.Row(): - for prompt in ["Merge these two animals into one image", "Combine the elements from both images", "Create a scene with objects from all images"]: - gr.Button(prompt, size="sm").click(lambda p=prompt: p, outputs=multi_prompt) - - multi_edit_btn.click( - fn=multi_image_edit_interface, - inputs=[multi_input_image1, multi_input_image2, multi_input_image3, multi_prompt, multi_aspect_ratio], - outputs=[multi_output_image, multi_status, multi_action_buttons], - show_progress=True, - concurrency_limit=20 - ) - - # ============================================================================= - # Tab 3: 文生图 - # ============================================================================= - with gr.Tab("✨ Text to Image") as t2i_tab: - with gr.Row(): - with gr.Column(scale=1): - gr.Markdown("### ✏️ Describe Your Image") - t2i_prompt = gr.Textbox(label="Prompt", placeholder="Describe the image you want to generate...", lines=5) - - gr.Markdown("📐 Output Size") - t2i_aspect_ratio = gr.Radio( - choices=["16:9", "4:3", "1:1", "3:4", "9:16"], - value="1:1", - label="Select aspect ratio", - info="Choose output dimensions for generated image" - ) - - t2i_btn = gr.Button("🎨 Generate Image", variant="primary", size="lg") - - with gr.Column(scale=1): - gr.Markdown("### 🎯 Generated Image") - t2i_output_image = gr.HTML(label="Output Image", elem_classes=["result-area"]) - t2i_output_url = gr.State() # 保存URL用于生成按钮链接 - t2i_status = gr.Textbox(label="Processing Status", lines=2, interactive=False) - t2i_action_buttons = gr.HTML(visible=False) - - gr.Markdown("### 💡 Prompt Examples") - with gr.Row(): - for prompt in ["A beautiful young woman standing by the seaside at sunset", "A majestic dragon flying over a medieval castle", "A cute cat wearing a wizard hat, digital art", "Futuristic city skyline at night with neon lights"]: - gr.Button(prompt, size="sm").click(lambda p=prompt: p, outputs=t2i_prompt) - - t2i_btn.click( - fn=text_to_image_interface, - inputs=[t2i_prompt, t2i_aspect_ratio], - outputs=[t2i_output_image, t2i_status, t2i_action_buttons], - show_progress=True, - concurrency_limit=20 - ) - - # ============================================================================= - # Tab 4: 图片放大 - # ============================================================================= - with gr.Tab("🔍 Image Upscale") as upscale_tab: - with gr.Row(): - with gr.Column(scale=1): - gr.Markdown("### 📤 Upload Image") - upscale_input_image = gr.Image(label="Select image to upscale", type="pil", height=400, elem_classes=["upload-area"]) - upscale_btn = gr.Button("🔍 Upscale Image", variant="primary", size="lg") - - with gr.Column(scale=1): - gr.Markdown("### 🎯 Upscaled Result") - upscale_output_image = gr.HTML(label="Output Image", elem_classes=["result-area"]) - upscale_status = gr.Textbox(label="Processing Status", lines=2, interactive=False) - upscale_action_buttons = gr.HTML(visible=False) - - upscale_btn.click( - fn=image_upscale_interface, - inputs=[upscale_input_image], - outputs=[upscale_output_image, upscale_status, upscale_action_buttons], - show_progress=True, - concurrency_limit=20 - ) - - # ============================================================================= - # Tab 5: 换脸 - # ============================================================================= - with gr.Tab("👤 Face Swap") as faceswap_tab: - with gr.Row(): - with gr.Column(scale=1): - gr.Markdown("### 📤 Upload Target Image") - gr.Markdown("*If multiple faces exist, draw on the face you want to replace*", elem_classes=["hint-text"]) - faceswap_target_editor = gr.ImageEditor( - label="Target image (draw on face to replace if multiple faces)", - type="pil", - height=350, - brush=gr.Brush(colors=["#FFFFFF"], default_size=80, color_mode="fixed"), - eraser=gr.Eraser(default_size=40), - layers=False, - sources=["upload", "clipboard"] - ) - gr.Markdown("### 📤 Upload Source Face") - faceswap_source = gr.Image(label="Source face image", type="pil", height=256) - faceswap_btn = gr.Button("👤 Swap Face", variant="primary", size="lg") - - with gr.Column(scale=1): - gr.Markdown("### 🎯 Face Swap Result") - faceswap_output = gr.HTML(label="Output Image", elem_classes=["result-area"]) - faceswap_status = gr.Textbox(label="Processing Status", lines=2, interactive=False) - faceswap_action_buttons = gr.HTML(visible=False) - - faceswap_btn.click( - fn=face_swap_interface, - inputs=[faceswap_target_editor, faceswap_source], - outputs=[faceswap_output, faceswap_status, faceswap_action_buttons], - show_progress=True, - concurrency_limit=20 - ) - - # ============================================================================= - # Tab 6: 去水印 - # ============================================================================= - with gr.Tab("🧽 Remove Watermark") as watermark_tab: - with gr.Row(): - with gr.Column(scale=1): - gr.Markdown("### 🧽 Upload Image") - watermark_input = gr.Image(label="Select image to remove watermark", type="pil", height=400, elem_classes=["upload-area"]) - watermark_force_removal = gr.Checkbox( - label="💪 Force Removal", - info="Ultimate AI-powered watermark removal. AI analyzes the watermark for better results.", - value=False - ) - watermark_btn = gr.Button("🧽 Remove Watermark", variant="primary", size="lg") - - with gr.Column(scale=1): - gr.Markdown("🎯 Result") - watermark_output = gr.HTML(label="Image without watermark", elem_classes=["result-area"]) - watermark_use_as_input_btn = gr.Button("🔄 Use as Input (Try Again)", variant="secondary", size="sm") - watermark_status = gr.Textbox(label="Processing status", lines=2, interactive=False) - watermark_action_buttons = gr.HTML(visible=False) - - watermark_btn.click( - fn=watermark_removal_interface, - inputs=[watermark_input, watermark_force_removal], - outputs=[watermark_output, watermark_status, watermark_action_buttons], - show_progress=True, - concurrency_limit=20 - ) - - # 定义一个函数来实现"使用结果作为输入"功能 - def watermark_use_result(output_html): - # 从HTML中提取图片URL并下载 - if output_html and 'src=' in str(output_html): - import re - match = re.search(r"src='([^']+)'", str(output_html)) - if match: - url = match.group(1) - try: - import requests - from PIL import Image - import io - response = requests.get(url, timeout=10) - if response.status_code == 200: - return Image.open(io.BytesIO(response.content)) - except: - pass - return None - - watermark_use_as_input_btn.click( - fn=watermark_use_result, - inputs=[watermark_output], - outputs=[watermark_input] - ) - - # ============================================================================= - # SEO内容区域 - # ============================================================================= - gr.HTML(""" -
- - -
-

🎨 Unlimited AI Image Generation & Editing

-

Access unlimited image generation, video creation, and advanced editing features. No time limitation, no ads, no watermark.

-
- 🚀 Get Unlimited Access -
-
- - -
-

- 🎨 What Omni Model Can Do -

-

- This Space demonstrates just a small fraction of what the Omni model can do. Beyond what you see here, it can also perform: -

-
- 👗 Virtual Try-On - 🌅 Background Replacement - 💇 Hairstyle Changer - 📄 Poster Editing - 🎨 Style Transfer - + dozens more... -
-

- No fine-tuning required - just modify the prompt and input parameters! -

-
- - -
-

- 🤖 Omni Creator 2.0: 8B Unified Multi-Modal Diffusion Transformer -

-

- 8B-parameter native MM-DiT unifying T2I, pixel-level editing, and I2V generation. - Uses CLIP/T5-style text encoders and visual conditioners to deliver high-fidelity multi-modal results with shared transformer backbone. -

- - -
-
- RoPE + AdaLN-Zero DiT Blocks -

Multi-head attention with timestep-conditioned modulation

-
-
- Adaptive Multi-Modal Gating -

Learned fusion of text, 3× images, and temporal context

-
-
- HPC-Ready Optimization -

FP8 + RoPE + RMSNorm for production-scale inference

-
-
- - -
-
📊 AdaLN-Zero + AMG (Full Stack)
-
-# 1) Modal fusion (text / up to 3 imgs / temporal)
-α = Softmax(MLP([ctxt; cimg; ctmp]));  C = αtxt·ctxt + Σ αi·cimg,i + αtmp·ctmp
-# 2) Time-conditioned modulation
-h = TEmbed(t) ⊕ C;  (γ, β, λ) = MLP(h)
-# 3) RoPE self-attn + gated residual
-Q = RoPE(Wq·LN(x)); K = RoPE(Wk·LN(x)); V = Wv·LN(x)
-A = Softmax((QKT/√d) + Brel) · V
-x ← x + λ ⊙ A · (1+γ) + β + CrossAttn(x, C)
-# 4) AdaLN-zero modulated SwiGLU FFN
-u = SwiGLU(W1·LN(x));   x ← x + λ ⊙ (W2·u) · (1+γ) + β -
-
-
- - -
-

- ⚡ OmniScheduler: Unified Hybrid Flow-Diffusion Sampler -

-

- Sampling framework that couples Flow Matching, Karras sigmas, and Heun/RK4 ODE solvers for fast 4–8 step generation. -

- - -
-
-

🎯 Few-Step Flow Matching

-

Supports velocity/epsilon/sample prediction with flow-to-velocity conversion, enabling 4–8 step inference.

-
-
-

🔄 Multi-Stage Sampling

-

Coarse-to-fine pipeline: ~70% steps for coarse draft, optional refine passes for details.

-
-
-

📈 RK4 Hybrid ODE

-

Runge-Kutta 4th order solver with flow matching conditioning for optimal trajectory evolution.

-
-
- - -
-
🔢 Flow Matching Formulation
-
-

Given data distribution p₁(x) and noise distribution p₀(z) = N(0, I), the Rectified Flow defines:

-
- x_t = (1 - t) · z + t · x    where t ∈ [0, 1] -
-

The velocity field v_θ(x_t, t) is trained to match the conditional velocity:

-
- L_FM = E_{t,x,z}[ ||v_θ(x_t, t) - (x - z)||² ] -
-

At inference, we solve the ODE: dx/dt = v_θ(x_t, t) from t=0 to t=1

-
-
- - -
-
π π-Flow Policy Network (Coarse Trajectory)
-
-
Instead of evaluating the model dozens of times, a lightweight policy network can predict a multi-step velocity trajectory in one forward pass.
-
-
# One-shot trajectory prediction (coarse stage)
-
v_{0:S-1} = π_φ(z₀, c, t_grid) - x_{k+1} = x_k + v_k · Δt (k = 0..S-1)
-
-
How it integrates with OmniScheduler:
-
-
Stage-1 (coarse): apply the predicted velocities directly (policy rollout) to rapidly move along the flow-matching path.
-
Stage-2 (refine): optionally switch to Heun/RK4 higher-order updates for detail recovery and stability.
-
Multi-modal conditioning: the policy is conditioned on aggregated text/visual context + time embedding, outputting velocity fields matching latent shape.
-
-
-
- - -
-
🎲 Multi-Stage Sampling Pipeline
-
-
-
📷
-
Input
-
Text + Images
-
-
-
-
-
Stage 1
-
Coarse (≈70%)
-
Euler/Heun, few steps
-
-
-
-
💎
-
Stage 2
-
Refine (≈30%)
-
RK4 optional
-
-
-
-
🎨
-
Output
-
HD Image/Video
-
-
-
-
- - -
-

- 📈 State-of-the-Art Model Comparison (2025) -

-
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
ModelParamsArchitectureTrainingInferenceNFEAcceleration
FLUX.2-Dev32BDiT + MMFlow MatchingEuler/DPM50FP8 + FlashAttn
Qwen-Image20BDiT + MLLMRectified FlowFlowMatch Euler30-50Lightning (4-8步)
Qwen-Image-Edit20BDiT + Dual-BranchFlow MatchingEuler28-50Prompt Rewrite
HunyuanVideo13B+AsymmDiTDiffusionMulti-step50+FP8 + Multi-frame
Wan2.25B/14BDiT + MoEDiffusionMulti-step30-50MoE Routing + FP8
Z-Image-Turbo6BDistilled DiTProgressive DistillFew-step4-8Teacher-Student
Mochi10BVideo DiTDiffusionMulti-step50+ComfyUI Parallel
⭐ Omni Creator 2.08BMM-DiT + AMGπ-Flow + FMRK4 Hybrid4-8Policy Distill + Multi-Stage
-
- - -
- Abbreviations: - DiT = Diffusion Transformer | MM = Multi-Modal | MLLM = Multimodal LLM | MoE = Mixture of Experts | - FM = Flow Matching | NFE = Number of Function Evaluations | AMG = Adaptive Multi-Modal Gating -
-
-
- """) - - return app - - -if __name__ == "__main__": - app = create_app() - app.queue( - default_concurrency_limit=20, - max_size=50, - api_open=False - ) - app.launch( - server_name="0.0.0.0", - show_error=True, - quiet=False, - max_threads=40, - height=800, - favicon_path=None - )