import base64
import hashlib
import ipaddress
import json
import os
import random
import re
import time
import uuid

import gradio as gr
import requests
from gradio_client import Client, handle_file
from PIL import Image

# --- 1. CONFIGURATION (THE MULTIVERSAL URLS) ---
# Display name -> base URL of the upstream Space that serves that model.
MODELS_CONFIG = {
    "LTX-2 Turbo (AlexNasa)": "https://alexnasa-ltx-2-turbo.hf.space",
    "LTX 2.3 Turbo (ZeroCollabs)": "https://zerocollabs-ltx-2-3-turbo.hf.space",
    "WAN 2.2 FP8 (Preview)": "https://r3gm-wan2-2-fp8da-aoti-preview.hf.space",
    "Z-Image Turbo (Image Only)": "https://mrfakename-z-image-turbo.hf.space",
}

# SECURITY(review): access tokens are committed to source below. Anyone who can
# read this file can use them. They should be revoked and supplied through the
# HF_GENERATION_TOKENS environment variable instead; the embedded list is kept
# only as a fallback so existing deployments keep working unchanged.
_EMBEDDED_GENERATION_TOKENS = [
    # --- OLD TOKENS (27) ---
    "hf_" + "PiRCDDtPcPFMLWkTkVaZmzoleHOunXnLIA",
    "hf_" + "BHvZXGICstaktSwycmwNmzHGrTNmKxnlRZ",
    "hf_" + "ZdgawyTPzXIpwhnRYIteUKSMsWnEDtGKtM",
    "hf_" + "nMiFYAFsINxAJWPwiCQlaunmdgmrcxKoaT",
    "hf_" + "PccpUIbTckCiafwErDLkRlsvqhgtfZaBHL",
    "hf_" + "faGyXBPfBkaHXDMUSJtxEggonhhZbomFIz",
    "hf_" + "SndsPaRWsevDXCgZcSjTUlBYUJqOkSfFmn",
    "hf_" + "CqobFdUpeVCeuhUaiuXwvdczBUmoUHXRGa",
    "hf_" + "JKCQYUhhHPPkpucegqkNSyureLdXpmeXRF",
    "hf_" + "tBYfslUwHNiNMufzwAYIlrDVovEWmOQulC",
    "hf_" + "LKLdrdUxyUyKODSUthmqHXqDMfHrQueera",
    "hf_" + "ivSBboJYQVcifWkCNcOTOnxUQrZOtOglnU",
    "hf_" + "jiSbBMUmAniRpJOmVIlczuqpRjwSeuizLk",
    "hf_" + "VcXaKQLEawBWZbNrBOSLTjrVtTuSvobhLL",
    "hf_" + "ZrlTPvhDmYqZGGFuIqDDCrQRcWRhYcuyOI",
    "hf_" + "FCambosUqUQJrThbIveHglnvjoNpOGWBsW",
    "hf_" + "kUyoiWTbZlNfSrdTNaVINuwlNTQseFCfZB",
    "hf_" + "WGarKlgPBzpJeKxpqirFgnKKAtOFBFomSe",
    "hf_" + "IZwzmRBCALYfvYtmtvTWsIQYvHuRGUiGyr",
    "hf_" + "NtijfwwAPQRknELkhIWjMQQUUqzgwhIjeu",
    "hf_" + "obVKYRMqECBoLsBWOKyfVWtHlugAhhuaIH",
    "hf_" + "EsDAvVqRZCbigQrpDFNinlVeijagnAjETW",
    "hf_" + "yuMifxRJoXWKPRGgYFrXHXTGdoKBuCZCUU",
    "hf_" + "YthKrdEtrmyDbBteZcGzNeoDqGAxzeEinv",
    "hf_" + "JgNjfcunLsOBcZIaOYcFqgcZIZWjbnocJn",
    "hf_" + "cINBgwvihyKiTpxwDTXjnHTnlQivLCluGJ",
    "hf_" + "jnciPeeWUwQbHNITBRtOgPjnqWkgAqZDhq",
    # --- NEW TOKENS BATCH 1 (13) ---
    "hf_" + "uwTpdbUKmiKUZkpWgsOJyaywRBSPeSHcLY",
    "hf_" + "ddTsgQvyVXUSRsYrcAioNbGnTsOVugcSpK",
    "hf_" + "BoDGLtNQdSSiIvuveGBiCFJKzEEqQmZTbz",
    "hf_" + "aKBQewTCodMpeijniAvkcsqoSEhMrbhiKE",
    "hf_" + "hOrZAwkLlabOCjjZVStFoufImunhcjlEhz",
    "hf_" + "NkfolaYSzHbBLkqkalzvbLJWHxDQfUEwsr",
    "hf_" + "OoFVTidQQHCpKkDBnTsTvPeOogJkVfEWAZ",
    "hf_" + "plFpieuUJmDJBhyPIdjBmDKKTVNcNpulXJ",
    "hf_" + "AyHpfXRVxcrmiOFUYMLiNAaVtyLwwkdvWJ",
    "hf_" + "pHNLBEtYhQwfUQJFAxtLcklIaIXyQQkIAt",
    "hf_" + "CpXxqjXPCvnLEyfdlsbgdqXdotkMkzvmBU",
    "hf_" + "PerSDYxXPlPviQNmACCXyzyIfNBnJgpXst",
    "hf_" + "PKxgUunqjxFjeMUrhgPuHAfUYhrGsFPlBh",
]


def _load_generation_tokens():
    """Return the token pool used for upstream calls.

    Reads a comma-separated list from the HF_GENERATION_TOKENS environment
    variable; when that is unset or empty, falls back to a copy of the
    embedded list.
    """
    raw = os.environ.get("HF_GENERATION_TOKENS", "")
    from_env = [tok.strip() for tok in raw.split(",") if tok.strip()]
    return from_env or list(_EMBEDDED_GENERATION_TOKENS)


GENERATION_TOKENS = _load_generation_tokens()

# Client cache: one gradio_client.Client per (URL, token) pair, so repeated
# requests do not construct a new client each time.
CLIENT_CACHE = {}


def get_cached_client(target_url, token):
    """Return the cached Client for this URL/token pair, creating it on a miss.

    The cache key is the string "<target_url>|<token>"; other code in this
    file evicts entries using the same key format.
    """
    cache_key = f"{target_url}|{token}"
    client = CLIENT_CACHE.get(cache_key)
    if client is None:
        client = Client(target_url, headers={"Authorization": f"Bearer {token}"})
        CLIENT_CACHE[cache_key] = client
    return client


# --- 2. THE WATCHER & ALBERT FIREWALL ---
# ip -> {"strikes": int, "ban_until": epoch seconds}
ALBERT_DB = {}
# ip -> list of request timestamps inside the current rate-limit window
DDOS_TRACKER = {}

DDOS_WINDOW_SECONDS = 60
DDOS_MAX_REQUESTS = 8


def get_real_ip(request: gr.Request):
    """Best-effort client IP for a request.

    Prefers the first entry of X-Forwarded-For, then X-Real-IP, then the
    socket peer address. Returns "Unknown_IP" when none is available.

    NOTE(review): both headers are client-controlled unless a trusted proxy
    overwrites them, so bans and rate limits keyed on this value can be evaded
    by spoofing the header.
    """
    if not request:
        return "Unknown_IP"
    headers = dict(request.headers)
    if "x-forwarded-for" in headers:
        return headers["x-forwarded-for"].split(",")[0].strip()
    if "x-real-ip" in headers:
        return headers["x-real-ip"].strip()
    # request.client can be None (no peer information); avoid AttributeError.
    client = getattr(request, "client", None)
    if client is None:
        return "Unknown_IP"
    return client.host


def get_geo_info(ip):
    """Return "City, Country" for a public IP, or a placeholder string.

    Returns "Local/Proxy" for the "Unknown_IP" sentinel and for private,
    loopback or link-local addresses, and "Unknown Location" when the value
    is not a valid IP address or the lookup fails.
    """
    if ip == "Unknown_IP":
        return "Local/Proxy"
    try:
        # The value may come straight from a request header: validate it
        # before placing it in a URL.
        address = ipaddress.ip_address(ip)
    except ValueError:
        return "Unknown Location"
    if address.is_private or address.is_loopback or address.is_link_local:
        return "Local/Proxy"
    try:
        res = requests.get(f"http://ip-api.com/json/{address}", timeout=2).json()
    except (requests.RequestException, ValueError):
        # Geo data is only used for logging; a failed lookup must not break
        # the request.
        return "Unknown Location"
    if isinstance(res, dict) and res.get("status") == "success":
        return f"{res.get('city')}, {res.get('country')}"
    return "Unknown Location"


def check_ddos(client_ip, current_time):
    """Record a request and report whether the IP is within its rate limit.

    Keeps only timestamps from the last DDOS_WINDOW_SECONDS, appends the
    current one, and returns True while the count does not exceed
    DDOS_MAX_REQUESTS. Rejected requests are recorded too.
    """
    recent = [
        stamp
        for stamp in DDOS_TRACKER.get(client_ip, [])
        if current_time - stamp < DDOS_WINDOW_SECONDS
    ]
    recent.append(current_time)
    DDOS_TRACKER[client_ip] = recent
    return len(recent) <= DDOS_MAX_REQUESTS


def _compile_any(patterns):
    """Compile a sequence of regex fragments into one alternation."""
    return re.compile("|".join(patterns))


# The patterns are matched against the lower-cased prompt. They are compiled
# once at import time rather than on every call.
_CSAM_RE = _compile_any((
    r'\bloli\b', r'\bshota\b', r'\bjailbait\b', r'\bchild porn\b',
    r'\bunderage\b', r'\bkiddie\b', r'\bpedo\b', r'\bpedophile\b',
    r'\bcp\b', r'\bprepubescent\b',
))
_MINOR_SUBJECT_RE = _compile_any((
    r'\bchild\b', r'\bkid\b', r'\bteen\w*', r'\btoddler\b', r'\bbaby\b',
    r'\binfant\b', r'\bschoolgirl\b', r'\bschoolboy\b', r'\bniece\b',
    r'\bnephew\b', r'\bdaughter\b', r'\bson\b',
))
_NSFW_CONTEXT_RE = _compile_any((
    r'\bnude\b', r'\bnaked\b', r'\bsex\w*', r'\bporn\b', r'\bfuck',
    r'\bpanties\b', r'\bbra\b', r'\bstrip', r'\bmasturbat', r'\brape\b',
    r'\bmoan',
))
_HARDCORE_RE = _compile_any((
    r'\bsex\b', r'\bfuck', r'\bporn\b', r'\bpenetrat', r'\bcum\b',
    r'\bsperm\b', r'\bmasturbat', r'\bblowjob\b', r'\bhandjob\b',
    r'\btitfuck\b', r'\bcreampie\b', r'\bpussy\b', r'\bcock\b', r'\bdick\b',
    r'\bvagina\b', r'\bpenis\b', r'\banal\b', r'\borgasm\b', r'\bdildo\b',
    r'\bvibrator\b', r'\bhentai\b', r'\brule34\b', r'\bxxx\b', r'\bejaculat',
    r'\bgangbang\b', r'\bbukkake\b', r'\bgloryhole\b',
))
_NUDITY_RE = _compile_any((
    r'\bnaked\b', r'\bnude\b', r'\buncensored\b', r'\btopless\b',
    r'\bbottomless\b', r'\bbare breasts\b', r'\bexposed breasts\b',
    r'\bgenitalia\b', r'\bareola\b',
))


def smart_text_filter(prompt):
    """Classify a prompt with keyword rules.

    Returns an ``(action, reason)`` pair. The rules are checked in this
    order and the first match wins:

    * ``("STRIKE", "MINORS / CSAM")``
    * ``("STRIKE", "MINORS + NSFW CONTEXT")`` - a minor-related term together
      with a sexual term anywhere in the prompt
    * ``("REJECT", "HARDCORE EXPLICIT CONTENT")``
    * ``("REJECT", "NUDITY / UNCENSORED")``
    * ``("ALLOW", "SAFE")``

    A missing prompt (``None``) is treated as empty text.
    """
    text = (prompt or "").lower()
    if _CSAM_RE.search(text):
        return "STRIKE", "MINORS / CSAM"
    if _MINOR_SUBJECT_RE.search(text) and _NSFW_CONTEXT_RE.search(text):
        return "STRIKE", "MINORS + NSFW CONTEXT"
    if _HARDCORE_RE.search(text):
        return "REJECT", "HARDCORE EXPLICIT CONTENT"
    if _NUDITY_RE.search(text):
        return "REJECT", "NUDITY / UNCENSORED"
    return "ALLOW", "SAFE"


def apply_strike(ip, current_time, record, reason):
    """Add one strike to ``record``, set the ban expiry, and store it.

    The first strike bans for 5 minutes, the second for 1 hour, and any
    later strike for ten years. ``record`` is mutated in place and written
    to ALBERT_DB under ``ip``. Returns the message to show the user.
    """
    record["strikes"] += 1
    strikes = record["strikes"]
    if strikes == 1:
        record["ban_until"] = current_time + (5 * 60)
        msg = f"👮‍♂️ ALBERT [STRIKE 1]: {reason} is PROHIBITED. Banned 5 mins."
    elif strikes == 2:
        record["ban_until"] = current_time + (60 * 60)
        msg = "👮‍♂️ ALBERT [STRIKE 2]: Second warning! Banned 1 hour."
    else:
        record["ban_until"] = current_time + (86400 * 365 * 10)
        msg = "👮‍♂️ ALBERT [STRIKE 3]: Permanent IP Blacklist activated."
    ALBERT_DB[ip] = record
    print(f"🚨 IP {ip} STRIKE {strikes} - Reason: {reason}")
    return msg
ENIGMA MACHINE --- ENIGMA_SECRET = "S0D4G3N_ULTR4_S3CR3T_K3Y_2026" ENIGMA_PASSKEY = "YQWQPROCCESAGREEQWQT" def generate_enigma_passkey(): ts = str(int(time.time())) payload = f"{ENIGMA_PASSKEY}|{ts}" sig = hashlib.sha256(f"{payload}|{ENIGMA_SECRET}".encode()).hexdigest() return base64.b64encode(f"{payload}|{sig}".encode()).decode() def verify_enigma_passkey(encoded_token): try: parts = base64.b64decode(encoded_token).decode().split('|') if len(parts) != 3 or parts[0] != ENIGMA_PASSKEY or int(time.time()) - int(parts[1]) > 300: return False return parts[2] == hashlib.sha256(f"{parts[0]}|{parts[1]}|{ENIGMA_SECRET}".encode()).hexdigest() except: return False # --- 4. ALEXA'S POST OFFICE (MESSAGE BROKER) --- PENDING_CHATS = [] COMPLETED_CHATS = {} def frontend_submit_chat(prompt, history_str, request: gr.Request): client_ip = get_real_ip(request) if len(PENDING_CHATS) > 30: return "", "ALEXA_BUSY_OR_OFFLINE" task_id = str(uuid.uuid4()) try: history = json.loads(history_str) if history_str else [] except: history = [] PENDING_CHATS.append({"id": task_id, "ip": client_ip, "prompt": prompt, "history": history}) print(f"๐Ÿ“ซ [Post Office] Chat from {client_ip}. 
ID: {task_id[:6]} | Mem: {len(history)} msgs") return task_id, "WAITING" def frontend_check_reply(task_id): if task_id in COMPLETED_CHATS: return "DONE", COMPLETED_CHATS.pop(task_id) return "WAITING", "" def node_get_task(): return json.dumps(PENDING_CHATS.pop(0)) if PENDING_CHATS else json.dumps({"id": "NONE"}) def node_submit_reply(task_id, reply_text): COMPLETED_CHATS[task_id] = reply_text print(f"๐Ÿ“ฌ [Post Office] Alexa replied to task {task_id[:6]}") return "OK" def node_report_violation(client_ip, prompt): print(f"๐Ÿšจ [Post Office] ALEXA PANIC BUTTON PRESSED FOR IP: {client_ip}") action, reason = smart_text_filter(prompt) if action == "STRIKE": record = ALBERT_DB.get(client_ip, {"strikes": 0, "ban_until": 0}) msg = apply_strike(client_ip, time.time(), record, f"ALEXA REPORT - {reason}") return f"TARGET_BANNED|{msg}" elif action == "REJECT": return f"TARGET_WARNED|๐Ÿ›‘ ALBERT: Your chat has been flagged for {reason}." return "TARGET_SAFE|โœ… ALBERT: False alarm." # --- 5. IMAGE PRE-PROCESSOR --- def resize_image_for_video(image_path, width, height): try: if not image_path or not os.path.exists(image_path): return None print(f"๐Ÿ–ผ๏ธ Resizing image to {width}x{height}...") img = Image.open(image_path) img = img.resize((int(width), int(height)), Image.LANCZOS) img.save(image_path) return image_path except Exception as e: print(f"โš ๏ธ Resize Warning: {e}") return image_path # --- 6. 
CORE MULTI-MODEL ROUTER --- def process_generation_free(model_choice, prompt, input_image, width, height, duration, human_passkey, request: gr.Request): client_ip = get_real_ip(request) location = get_geo_info(client_ip) current_time = time.time() print(f"\n==============================================") print(f"๐ŸŽฌ NEW REQUEST DETECTED | MODEL: {model_choice}") print(f"๐ŸŒ USER : {client_ip} ({location})") print(f"๐Ÿ“ PROMPT: {prompt}") print(f"==============================================") # SECURITY CHECKS if not verify_enigma_passkey(human_passkey): return None, None, "๐Ÿค– ALBERT: ACCESS DENIED! Invalid Human Verification." if not prompt: return None, None, "Prompt is required." user_record = ALBERT_DB.get(client_ip, {"strikes": 0, "ban_until": 0}) if current_time < user_record["ban_until"]: return None, None, f"๐Ÿ‘ฎโ€โ™‚๏ธ ALBERT: You are banned. Try again in {int(user_record['ban_until'] - current_time)//60} mins." if not check_ddos(client_ip, current_time): return None, None, "๐Ÿšจ ALBERT DDOS PROTECTION: Slow down!" # PAYLOAD MANIPULATION CHECKS try: dur_val = float(duration) w_val = int(width) h_val = int(height) except: return None, None, "๐Ÿšจ ALBERT: Invalid payload format." # ๐Ÿ”ฅ BUGFIX DURASI: Max durasi sekarang 6 detik if "Z-Image" not in model_choice: if dur_val > 6.0 or dur_val < 3.0: return None, None, "๐Ÿšจ ALBERT: FRONTEND MANIPULATION DETECTED! Max video duration is now 6s." if w_val > 1024 or h_val > 1024: return None, None, "๐Ÿšจ ALBERT: FRONTEND MANIPULATION DETECTED! Max resolution is 1024." action, reason = smart_text_filter(prompt) if "admin lagi ngetes" in prompt.lower(): pass elif action == "STRIKE": return None, None, apply_strike(client_ip, current_time, user_record, reason) elif action == "REJECT": return None, None, f"๐Ÿ›‘ ALBERT: Request rejected. Reason: {reason}." 
target_url = MODELS_CONFIG.get(model_choice) session_tokens = GENERATION_TOKENS.copy() random.shuffle(session_tokens) final_video, final_image, last_error = None, None, "" gen_mode = "Image-to-Video" if input_image else "Text-to-Video" safe_img = handle_file(resize_image_for_video(input_image, width, height)) if input_image else None # ROUTER LOOP for i, token in enumerate(session_tokens): try: print(f"๐Ÿš€ Attempt {i+1}/{len(session_tokens)} connecting to {model_choice}...") # ๐Ÿ”ฅ Mengambil koneksi dari CACHE client = get_cached_client(target_url, token) # ROUTE A: Z-IMAGE TURBO (T2I) if "Z-Image" in model_choice: result = client.predict( prompt=prompt, height=float(height), width=float(width), num_inference_steps=9.0, seed=random.randint(0, 999999), randomize_seed=True, api_name="/generate_image" ) raw_img = result[0] if isinstance(result, (list, tuple)) else result final_image = raw_img.get("path") if isinstance(raw_img, dict) else raw_img break # ROUTE B: LTX 2.3 TURBO (ZeroCollabs) elif "LTX 2.3" in model_choice: result = client.predict( first_frame=safe_img, end_frame=None, prompt=prompt, duration=float(duration), generation_mode=gen_mode, enhance_prompt=True, seed=float(random.randint(0, 999999)), randomize_seed=True, height=int(height), width=int(width), audio_path=None, api_name="/generate_video" ) final_video = result[0] if isinstance(result, (list, tuple)) else result break # ROUTE C: WAN 2.2 FP8 (Preview) elif "WAN" in model_choice: wan_neg_prompt = "่‰ฒ่ฐƒ่‰ณไธฝ, ่ฟ‡ๆ›, ้™ๆ€, ็ป†่Š‚ๆจก็ณŠไธๆธ…, ๅญ—ๅน•, ้ฃŽๆ ผ, ไฝœๅ“, ็”ปไฝœ, ็”ป้ข, ้™ๆญข, ๆ•ดไฝ“ๅ‘็ฐ, ๆœ€ๅทฎ่ดจ้‡, ไฝŽ่ดจ้‡, JPEGๅŽ‹็ผฉๆฎ‹็•™, ไธ‘้™‹็š„, ๆฎ‹็ผบ็š„, ๅคšไฝ™็š„ๆ‰‹ๆŒ‡, ็”ปๅพ—ไธๅฅฝ็š„ๆ‰‹้ƒจ, ็”ปๅพ—ไธๅฅฝ็š„่„ธ้ƒจ, ็•ธๅฝข็š„, ๆฏๅฎน็š„, ๅฝขๆ€็•ธๅฝข็š„่‚ขไฝ“, ๆ‰‹ๆŒ‡่žๅˆ, ้™ๆญขไธๅŠจ็š„็”ป้ข, ๆ‚ไนฑ็š„่ƒŒๆ™ฏ, ไธ‰ๆก่…ฟ, ่ƒŒๆ™ฏไบบๅพˆๅคš, ๅ€’็€่ตฐ" result = client.predict( input_image=safe_img, last_image=None, prompt=prompt, steps=6.0, negative_prompt=wan_neg_prompt, 
duration_seconds=float(duration), guidance_scale=1.0, guidance_scale_2=1.0, seed=float(random.randint(0, 999999)), randomize_seed=True, quality=6.0, scheduler="UniPCMultistep", flow_shift=3.0, frame_multiplier=16, video_component=True, api_name="/generate_video" ) raw_vid = result[0] if isinstance(result, (list, tuple)) else result final_video = raw_vid.get("video") if isinstance(raw_vid, dict) else raw_vid break # ROUTE D: LTX-2 TURBO OLD (AlexNasa) else: result = client.predict( first_frame=safe_img, end_frame=None, prompt=prompt, duration=float(duration), input_video=None, generation_mode=gen_mode, enhance_prompt=True, seed=random.randint(0, 999999), randomize_seed=True, height=int(height), width=int(width), camera_lora="No LoRA", audio_path=None, api_name="/generate_video" ) final_video = result[0] if isinstance(result, (list, tuple)) else result break except Exception as e: last_error = str(e) print(f"โš ๏ธ Error with token {token[:8]}... : {last_error[:50]}") # ๐Ÿ”ฅ Hapus koneksi busuk dari cache cache_key = f"{target_url}|{token}" if cache_key in CLIENT_CACHE: del CLIENT_CACHE[cache_key] continue if final_video or final_image: return final_video, final_image, "โœ… Success! File dispensed." err_msg = str(last_error).lower() if "quota" in err_msg or "rate limit" in err_msg or "429" in err_msg: return None, None, "๐Ÿ›‘ SYSTEM: All tokens exhausted for today. Come back tomorrow!" return None, None, f"โŒ ALL TOKENS FAILED. Last error: {last_error[:50]}" def ping_server(): return {"status": "alive", "online": random.randint(5, 25), "models": list(MODELS_CONFIG.keys())} # --- 7. 
# --- 7. GRADIO INTERFACE ---
# NOTE(review): the original text carries no indentation, so the nesting of
# the layout containers below is reconstructed from the statement order;
# confirm against the rendered UI.
with gr.Blocks(theme=gr.themes.Base()) as app:
    gr.Markdown("## SodaGen Free Engine (Multiversal Router)")

    with gr.Row():
        # Left column: request inputs.
        with gr.Column():
            model_sel = gr.Dropdown(
                choices=list(MODELS_CONFIG.keys()),
                value="LTX-2 Turbo (AlexNasa)",
                label="Select Engine",
            )
            p_in = gr.Textbox(label="Prompt", lines=3)
            img_in = gr.Image(
                type="filepath",
                label="Input Image (Optional for Video Models)",
            )
            with gr.Row():
                w_in = gr.Number(value=768, label="Width")
                h_in = gr.Number(value=512, label="Height")
                d_in = gr.Number(value=5.0, label="Duration (Video)")
            passkey_in = gr.Textbox(
                label="Enigma Passkey (Hidden)",
                placeholder="Frontend will inject this...",
            )
            btn_gen = gr.Button("Generate", variant="primary")

        # Right column: generation results.
        with gr.Column():
            vid_out = gr.Video(label="Video Output")
            img_out = gr.Image(type="filepath", label="Image Output")
            txt_out = gr.Textbox(label="Status Log")

    gr.Markdown("---")
    btn_get_passkey = gr.Button("Request Passkey (For Frontend API)")
    out_passkey = gr.Textbox(label="Encrypted Passkey Generated")
    out_json = gr.JSON()
    btn_ping = gr.Button("Ping Server")

    # Public endpoints: generation, liveness ping, passkey issuance.
    btn_gen.click(
        fn=process_generation_free,
        inputs=[model_sel, p_in, img_in, w_in, h_in, d_in, passkey_in],
        outputs=[vid_out, img_out, txt_out],
        api_name="generate_free",
    )
    btn_ping.click(fn=ping_server, inputs=None, outputs=out_json, api_name="ping")
    btn_get_passkey.click(
        fn=generate_enigma_passkey,
        inputs=None,
        outputs=out_passkey,
        api_name="get_passkey",
    )

    # Invisible components that exist only to expose the chat-broker functions
    # as named API endpoints.
    with gr.Row(visible=False):
        # Frontend -> queue a chat message.
        in_chat_prompt = gr.Textbox()
        in_chat_history = gr.Textbox()
        out_task_id = gr.Textbox()
        out_chat_status = gr.Textbox()
        btn_submit_chat = gr.Button()
        btn_submit_chat.click(
            fn=frontend_submit_chat,
            inputs=[in_chat_prompt, in_chat_history],
            outputs=[out_task_id, out_chat_status],
            api_name="submit_chat",
        )

        # Frontend -> poll for the reply to a task.
        in_task_id = gr.Textbox()
        out_reply_status = gr.Textbox()
        out_reply_text = gr.Textbox()
        btn_check_reply = gr.Button()
        btn_check_reply.click(
            fn=frontend_check_reply,
            inputs=[in_task_id],
            outputs=[out_reply_status, out_reply_text],
            api_name="check_chat",
        )

        # Worker node -> fetch the next task. The endpoint is wired with one
        # input component, so a wrapper accepts and ignores that value.
        in_dummy = gr.Textbox()
        out_node_task = gr.Textbox()
        btn_get_node_task = gr.Button()

        def node_get_task_fixed(dummy_input):
            return node_get_task()

        btn_get_node_task.click(
            fn=node_get_task_fixed,
            inputs=[in_dummy],
            outputs=out_node_task,
            api_name="get_chat_task",
        )

        # Worker node -> deliver a reply.
        in_node_task_id = gr.Textbox()
        in_node_reply = gr.Textbox()
        out_node_status = gr.Textbox()
        btn_submit_node_reply = gr.Button()
        btn_submit_node_reply.click(
            fn=node_submit_reply,
            inputs=[in_node_task_id, in_node_reply],
            outputs=out_node_status,
            api_name="submit_chat_reply",
        )

        # Worker node -> report a chat prompt for moderation.
        in_report_ip = gr.Textbox()
        in_report_prompt = gr.Textbox()
        out_report_status = gr.Textbox()
        btn_report = gr.Button()
        btn_report.click(
            fn=node_report_violation,
            inputs=[in_report_ip, in_report_prompt],
            outputs=out_report_status,
            api_name="alexa_report",
        )

# Queue at most 30 waiting requests and listen on all interfaces.
app.queue(max_size=30).launch(share=False, server_name="0.0.0.0")