# Server-1 / app.py
# Bl4ckSpaces's picture
# Update app.py
# 7a5efa0 verified
import base64
import hashlib
import hmac
import ipaddress
import json
import os
import random
import re
import time
import uuid

import gradio as gr
import requests
from gradio_client import Client, handle_file
from PIL import Image
# --- 1. CONFIGURATION (THE MULTIVERSAL URLS) ---
# Engine label (as listed in the "Select Engine" dropdown) -> base URL of the
# upstream Space. The router picks its code path by substring of the label
# ("Z-Image", "LTX 2.3", "WAN"), so renaming a key can change the route taken.
MODELS_CONFIG = {
    "LTX-2 Turbo (AlexNasa)": "https://alexnasa-ltx-2-turbo.hf.space",
    "LTX 2.3 Turbo (ZeroCollabs)": "https://zerocollabs-ltx-2-3-turbo.hf.space",
    "WAN 2.2 FP8 (Preview)": "https://r3gm-wan2-2-fp8da-aoti-preview.hf.space",
    "Z-Image Turbo (Image Only)": "https://mrfakename-z-image-turbo.hf.space",
}
def parse_token_list(raw):
    """Split a comma- and/or whitespace-separated string into a list of tokens.

    Args:
        raw: The raw string (e.g. the value of an environment variable), or
            None / "" when nothing is configured.

    Returns:
        The non-empty entries in their original order; [] when `raw` is empty.
    """
    if not raw:
        return []
    return [piece for piece in re.split(r"[\s,]+", raw) if piece]


# SECURITY: access tokens are credentials and must not live in source control.
# They are read from the GENERATION_TOKENS environment variable (comma- or
# whitespace-separated). Any token that was previously hard-coded in this file
# has been exposed and should be revoked and replaced.
# The list is empty when the variable is unset, in which case every generation
# request fails without contacting an upstream Space.
GENERATION_TOKENS = parse_token_list(os.environ.get("GENERATION_TOKENS", ""))
# 🔥 BUGFIX: CLIENT CACHE (Connection Pool)
# One Client per (space URL, token) pair, stored under the key "<url>|<token>".
CLIENT_CACHE = {}


def get_cached_client(target_url, token):
    """Return the pooled Client for this URL/token pair, creating it on first use.

    Args:
        target_url: Base URL of the upstream Space.
        token: Bearer token sent in the Authorization header.

    Returns:
        The cached Client, or a newly constructed one that is stored in
        CLIENT_CACHE before being returned.
    """
    pool_key = f"{target_url}|{token}"
    if pool_key in CLIENT_CACHE:
        return CLIENT_CACHE[pool_key]

    auth_headers = {"Authorization": f"Bearer {token}"}
    CLIENT_CACHE[pool_key] = Client(target_url, headers=auth_headers)
    return CLIENT_CACHE[pool_key]
# --- 2. THE WATCHER & ALBERT FIREWALL ---
# Per-IP moderation record: {"strikes": <count>, "ban_until": <unix time>}.
# Held in process memory only, so it is empty again after a restart.
ALBERT_DB = {}
# Per-IP list of recent request timestamps; check_ddos() prunes each list to
# the last 60 seconds whenever that IP makes a request.
DDOS_TRACKER = {}
def get_real_ip(request: gr.Request):
    """Best-effort client IP for a request.

    Preference order: first entry of X-Forwarded-For, then X-Real-IP, then the
    socket peer address. Empty header values are skipped instead of being
    returned as an empty-string IP.

    NOTE(review): both headers are supplied by whoever sends the request unless
    a trusted proxy in front of the app overwrites them — confirm that is the
    case before relying on this value for bans.

    Args:
        request: The Gradio request, or None when no request is available.

    Returns:
        The IP as a string, or "Unknown_IP" when nothing usable is available.
    """
    if not request:
        return "Unknown_IP"

    headers = dict(request.headers)

    forwarded = (headers.get('x-forwarded-for') or '').split(',')[0].strip()
    if forwarded:
        return forwarded

    real_ip = (headers.get('x-real-ip') or '').strip()
    if real_ip:
        return real_ip

    # The peer address object can be missing; do not crash on it.
    peer = getattr(request, "client", None)
    host = getattr(peer, "host", None)
    return host or "Unknown_IP"
def get_geo_info(ip):
    """Return a short "City, Country" label for an IP address (best effort).

    The value is parsed with `ipaddress` before use, so a malformed string
    (the IP may come from a request header) is never interpolated into the
    lookup URL, and every private / loopback / link-local range is recognised
    as local rather than only the 10., 127. and 192.168. prefixes.

    Args:
        ip: IP address string, or the "Unknown_IP" sentinel.

    Returns:
        "Local/Proxy" for the sentinel and for non-public addresses,
        "<city>, <country>" when the lookup succeeds, otherwise
        "Unknown Location".
    """
    if ip == "Unknown_IP":
        return "Local/Proxy"

    try:
        addr = ipaddress.ip_address(ip)
    except ValueError:
        # Not an IP literal at all: nothing sensible to look up.
        return "Unknown Location"

    if addr.is_private or addr.is_loopback or addr.is_link_local:
        return "Local/Proxy"

    try:
        res = requests.get(f"http://ip-api.com/json/{addr}", timeout=2).json()
    except (requests.RequestException, ValueError) as exc:
        # Network failure or a non-JSON body: the lookup is optional, so only log it.
        print(f"⚠️ Geo lookup failed for {addr}: {exc}")
        return "Unknown Location"

    if isinstance(res, dict) and res.get("status") == "success":
        return f"{res.get('city')}, {res.get('country')}"
    return "Unknown Location"
def check_ddos(client_ip, current_time):
    """Record a request for `client_ip` and report whether it is within the rate limit.

    Timestamps of 60 seconds or older are dropped, the current request is
    recorded (even when it ends up being rejected), and the request is allowed
    while at most 8 timestamps remain in the window.

    Args:
        client_ip: Key into DDOS_TRACKER.
        current_time: Timestamp of this request, in seconds.

    Returns:
        True when the request is allowed, False when the caller should reject it.
    """
    recent_hits = [
        stamp
        for stamp in DDOS_TRACKER.get(client_ip, [])
        if current_time - stamp < 60
    ]
    recent_hits.append(current_time)
    DDOS_TRACKER[client_ip] = recent_hits
    return not len(recent_hits) > 8
def smart_text_filter(prompt):
    """Classify a prompt against the moderation word lists.

    Rules are evaluated in this order and the first hit wins:
      1. any term from the always-banned list            -> STRIKE
      2. a minor-related term AND an NSFW-context term   -> STRIKE
      3. any hardcore explicit term                      -> REJECT
      4. any nudity term                                 -> REJECT

    Matching is done with regular expressions on the lower-cased prompt.

    Args:
        prompt: The user prompt (a string).

    Returns:
        An (action, reason) tuple; action is "STRIKE", "REJECT" or "ALLOW".
    """
    lowered = prompt.lower()

    def matches_any(patterns):
        return any(re.search(pattern, lowered) for pattern in patterns)

    always_banned = (
        r'\bloli\b', r'\bshota\b', r'\bjailbait\b', r'\bchild porn\b', r'\bunderage\b',
        r'\bkiddie\b', r'\bpedo\b', r'\bpedophile\b', r'\bcp\b', r'\bprepubescent\b',
    )
    if matches_any(always_banned):
        return "STRIKE", "MINORS / CSAM"

    minor_terms = (
        r'\bchild\b', r'\bkid\b', r'\bteen\w*', r'\btoddler\b', r'\bbaby\b', r'\binfant\b',
        r'\bschoolgirl\b', r'\bschoolboy\b', r'\bniece\b', r'\bnephew\b', r'\bdaughter\b', r'\bson\b',
    )
    nsfw_terms = (
        r'\bnude\b', r'\bnaked\b', r'\bsex\w*', r'\bporn\b', r'\bfuck', r'\bpanties\b',
        r'\bbra\b', r'\bstrip', r'\bmasturbat', r'\brape\b', r'\bmoan',
    )
    if matches_any(minor_terms) and matches_any(nsfw_terms):
        return "STRIKE", "MINORS + NSFW CONTEXT"

    hardcore_terms = (
        r'\bsex\b', r'\bfuck', r'\bporn\b', r'\bpenetrat', r'\bcum\b', r'\bsperm\b',
        r'\bmasturbat', r'\bblowjob\b', r'\bhandjob\b', r'\btitfuck\b', r'\bcreampie\b',
        r'\bpussy\b', r'\bcock\b', r'\bdick\b', r'\bvagina\b', r'\bpenis\b', r'\banal\b',
        r'\borgasm\b', r'\bdildo\b', r'\bvibrator\b', r'\bhentai\b', r'\brule34\b', r'\bxxx\b',
        r'\bejaculat', r'\bgangbang\b', r'\bbukkake\b', r'\bgloryhole\b',
    )
    if matches_any(hardcore_terms):
        return "REJECT", "HARDCORE EXPLICIT CONTENT"

    nudity_terms = (
        r'\bnaked\b', r'\bnude\b', r'\buncensored\b', r'\btopless\b', r'\bbottomless\b',
        r'\bbare breasts\b', r'\bexposed breasts\b', r'\bgenitalia\b', r'\bareola\b',
    )
    if matches_any(nudity_terms):
        return "REJECT", "NUDITY / UNCENSORED"

    return "ALLOW", "SAFE"
def apply_strike(ip, current_time, record, reason):
    """Add one strike to `record`, set the matching ban and store it in ALBERT_DB.

    Ban length by strike count after the increment: 1 -> 5 minutes,
    2 -> 1 hour, anything else -> ten years.

    Args:
        ip: Key under which the record is stored.
        current_time: Timestamp (seconds) the ban is measured from.
        record: Dict with "strikes" and "ban_until"; mutated in place.
        reason: Text included in the first-strike message and in the log line.

    Returns:
        The message to show to the user.
    """
    record["strikes"] += 1
    count = record["strikes"]

    if count == 1:
        ban_seconds = 5 * 60
        notice = f"👮‍♂️ ALBERT [STRIKE 1]: {reason} is PROHIBITED. Banned 5 mins."
    elif count == 2:
        ban_seconds = 60 * 60
        notice = f"👮‍♂️ ALBERT [STRIKE 2]: Second warning! Banned 1 hour."
    else:
        ban_seconds = 86400 * 365 * 10
        notice = "👮‍♂️ ALBERT [STRIKE 3]: Permanent IP Blacklist activated."

    record["ban_until"] = current_time + ban_seconds
    ALBERT_DB[ip] = record
    print(f"🚨 IP {ip} STRIKE {count} - Reason: {reason}")
    return notice
# --- 3. ENIGMA MACHINE ---
# Signing secret for passkeys. It can be overridden with the ENIGMA_SECRET
# environment variable; the in-source default is kept only so an existing
# deployment keeps working and should be replaced, since it is readable by
# anyone who can see this file.
ENIGMA_SECRET = os.environ.get("ENIGMA_SECRET", "S0D4G3N_ULTR4_S3CR3T_K3Y_2026")
# Fixed first field of every passkey payload.
ENIGMA_PASSKEY = "YQWQPROCCESAGREEQWQT"


def generate_enigma_passkey():
    """Create a passkey that verify_enigma_passkey() accepts for 300 seconds.

    Format: base64("<ENIGMA_PASSKEY>|<unix ts>|<sha256 hex of 'passkey|ts|secret'>").

    Returns:
        The base64-encoded passkey as a str.
    """
    ts = str(int(time.time()))
    payload = f"{ENIGMA_PASSKEY}|{ts}"
    sig = hashlib.sha256(f"{payload}|{ENIGMA_SECRET}".encode()).hexdigest()
    return base64.b64encode(f"{payload}|{sig}".encode()).decode()


def verify_enigma_passkey(encoded_token):
    """Check a passkey produced by generate_enigma_passkey().

    A passkey is valid when it decodes to exactly three '|'-separated fields,
    the first field equals ENIGMA_PASSKEY, the timestamp is no more than 300
    seconds old and the signature matches.

    Args:
        encoded_token: The base64 passkey received from the client (any value;
            malformed input is rejected rather than raising).

    Returns:
        True when the passkey is valid, False otherwise.
    """
    try:
        parts = base64.b64decode(encoded_token).decode().split('|')
        if len(parts) != 3 or parts[0] != ENIGMA_PASSKEY:
            return False
        age = int(time.time()) - int(parts[1])
    except (ValueError, TypeError):
        # Bad base64, non-UTF-8 bytes, non-numeric timestamp or a non-string input.
        return False

    if age > 300:
        return False

    expected = hashlib.sha256(f"{parts[0]}|{parts[1]}|{ENIGMA_SECRET}".encode()).hexdigest()
    # Compare as bytes in constant time; bytes also avoids the TypeError that
    # compare_digest raises for non-ASCII str input.
    return hmac.compare_digest(parts[2].encode(), expected.encode())
# --- 4. ALEXA'S POST OFFICE (MESSAGE BROKER) ---
# Queue of chat tasks waiting for a worker node, oldest first. Each item is a
# dict with the keys "id", "ip", "prompt" and "history".
PENDING_CHATS = []
# Task id -> reply text. Filled by node_submit_reply() and emptied entry by
# entry when frontend_check_reply() hands a reply back.
COMPLETED_CHATS = {}
def frontend_submit_chat(prompt, history_str, request: gr.Request):
    """Queue a chat message for a worker node.

    Args:
        prompt: The user's chat message.
        history_str: JSON-encoded list of earlier messages; empty, malformed
            or non-list values are treated as "no history".
        request: The Gradio request, used to record the sender's IP.

    Returns:
        (task_id, "WAITING") when queued, or ("", "ALEXA_BUSY_OR_OFFLINE")
        when more than 30 tasks are already pending.
    """
    client_ip = get_real_ip(request)
    if len(PENDING_CHATS) > 30:
        return "", "ALEXA_BUSY_OR_OFFLINE"

    task_id = str(uuid.uuid4())
    try:
        history = json.loads(history_str) if history_str else []
    except (TypeError, ValueError, RecursionError):
        history = []
    # Valid JSON is not necessarily a list (e.g. "5" or "{}"); len() below and
    # the consumer of the task both expect a list of messages.
    if not isinstance(history, list):
        history = []

    PENDING_CHATS.append({"id": task_id, "ip": client_ip, "prompt": prompt, "history": history})
    print(f"📫 [Post Office] Chat from {client_ip}. ID: {task_id[:6]} | Mem: {len(history)} msgs")
    return task_id, "WAITING"
def frontend_check_reply(task_id):
    """Poll for the reply to a queued chat task.

    A finished reply is removed from COMPLETED_CHATS when it is returned, so
    it can be fetched only once.

    Args:
        task_id: Id returned by frontend_submit_chat().

    Returns:
        ("DONE", reply_text) when a reply is stored, otherwise ("WAITING", "").
    """
    if task_id not in COMPLETED_CHATS:
        return "WAITING", ""
    reply = COMPLETED_CHATS.pop(task_id)
    return "DONE", reply
def node_get_task():
    """Hand the oldest pending chat task to a worker node.

    Returns:
        The task serialised as JSON (and removed from PENDING_CHATS), or the
        JSON object {"id": "NONE"} when the queue is empty.
    """
    if not PENDING_CHATS:
        return json.dumps({"id": "NONE"})
    oldest = PENDING_CHATS.pop(0)
    return json.dumps(oldest)
def node_submit_reply(task_id, reply_text):
    """Store a worker node's reply so the frontend can collect it.

    NOTE(review): the task id is not checked against tasks that were actually
    handed out — confirm whether callers of this endpoint are trusted.

    Args:
        task_id: Id of the task being answered.
        reply_text: The reply to deliver.

    Returns:
        The string "OK".
    """
    COMPLETED_CHATS[task_id] = reply_text
    short_id = task_id[:6]
    print(f"📬 [Post Office] Alexa replied to task {short_id}")
    return "OK"
def node_report_violation(client_ip, prompt):
    """Re-run the text filter on a reported prompt and penalise the IP if it confirms.

    NOTE(review): `client_ip` is taken from the caller as-is — confirm that
    only trusted nodes can reach this endpoint, since a STRIKE bans that IP.

    Args:
        client_ip: IP the report is about.
        prompt: The reported chat text.

    Returns:
        A "<STATUS>|<message>" string, where STATUS is TARGET_BANNED,
        TARGET_WARNED or TARGET_SAFE.
    """
    print(f"🚨 [Post Office] ALEXA PANIC BUTTON PRESSED FOR IP: {client_ip}")
    action, reason = smart_text_filter(prompt)

    if action == "REJECT":
        return f"TARGET_WARNED|🛑 ALBERT: Your chat has been flagged for {reason}."
    if action != "STRIKE":
        return "TARGET_SAFE|✅ ALBERT: False alarm."

    record = ALBERT_DB.get(client_ip, {"strikes": 0, "ban_until": 0})
    msg = apply_strike(client_ip, time.time(), record, f"ALEXA REPORT - {reason}")
    return f"TARGET_BANNED|{msg}"
# --- 5. IMAGE PRE-PROCESSOR ---
def resize_image_for_video(image_path, width, height):
    """Resize the image at `image_path` in place to `width` x `height`.

    The source file is opened in a `with` block so its handle is closed before
    the resized copy is written back over the same path.

    Args:
        image_path: Path of the image file; it is overwritten on success.
        width: Target width (converted with int()).
        height: Target height (converted with int()).

    Returns:
        None when `image_path` is empty or does not exist; otherwise
        `image_path`, including when resizing failed (the failure is only
        logged and the file is left as it was, as far as the error allows).
    """
    try:
        if not image_path or not os.path.exists(image_path):
            return None
        print(f"🖼️ Resizing image to {width}x{height}...")
        with Image.open(image_path) as source:
            resized = source.resize((int(width), int(height)), Image.LANCZOS)
        resized.save(image_path)
        return image_path
    except Exception as e:
        # Deliberate best effort: hand back the original path on any failure.
        print(f"⚠️ Resize Warning: {e}")
        return image_path
# --- 6. CORE MULTI-MODEL ROUTER ---
def process_generation_free(model_choice, prompt, input_image, width, height, duration, human_passkey, request: gr.Request):
    """Validate a generation request and route it to the selected upstream Space.

    Steps: log the request, verify the passkey, enforce bans and the rate
    limit, validate the numeric payload, run the text filter, then try the
    configured tokens in random order until one upstream call succeeds.

    Args:
        model_choice: A key of MODELS_CONFIG.
        prompt: Text prompt (required).
        input_image: Optional path of an input image for the video models.
        width: Requested width, 1..1024.
        height: Requested height, 1..1024.
        duration: Video length in seconds, 3..6 (not checked for the image-only model).
        human_passkey: Passkey produced by generate_enigma_passkey().
        request: The Gradio request, used to identify the client.

    Returns:
        (video, image, status_message). On failure both media slots are None
        and the message explains why.
    """
    client_ip = get_real_ip(request)
    # NOTE(review): this lookup can block for up to 2 s and runs before the
    # ban and rate-limit checks; consider moving it after them.
    location = get_geo_info(client_ip)
    current_time = time.time()
    print(f"\n==============================================")
    print(f"🎬 NEW REQUEST DETECTED | MODEL: {model_choice}")
    print(f"🌍 USER : {client_ip} ({location})")
    print(f"📝 PROMPT: {prompt}")
    print(f"==============================================")

    # SECURITY CHECKS
    if not verify_enigma_passkey(human_passkey):
        return None, None, "🤖 ALBERT: ACCESS DENIED! Invalid Human Verification."
    if not prompt:
        return None, None, "Prompt is required."
    user_record = ALBERT_DB.get(client_ip, {"strikes": 0, "ban_until": 0})
    if current_time < user_record["ban_until"]:
        return None, None, f"👮‍♂️ ALBERT: You are banned. Try again in {int(user_record['ban_until'] - current_time)//60} mins."
    if not check_ddos(client_ip, current_time):
        return None, None, "🚨 ALBERT DDOS PROTECTION: Slow down!"

    # PAYLOAD MANIPULATION CHECKS
    # An unknown model has no URL; reject it here instead of failing once per token.
    target_url = MODELS_CONFIG.get(model_choice)
    if target_url is None:
        return None, None, "🚨 ALBERT: Invalid payload format."
    try:
        dur_val = float(duration)
        w_val = int(width)
        h_val = int(height)
    except (TypeError, ValueError, OverflowError):
        return None, None, "🚨 ALBERT: Invalid payload format."

    # Max video duration is 6 seconds. The range is written as a positive
    # test so that NaN (which fails every comparison) is rejected too.
    if "Z-Image" not in model_choice:
        if not (3.0 <= dur_val <= 6.0):
            return None, None, "🚨 ALBERT: FRONTEND MANIPULATION DETECTED! Max video duration is now 6s."
    if w_val < 1 or h_val < 1:
        return None, None, "🚨 ALBERT: Invalid payload format."
    if w_val > 1024 or h_val > 1024:
        return None, None, "🚨 ALBERT: FRONTEND MANIPULATION DETECTED! Max resolution is 1024."

    # The text filter applies to every request. A magic phrase inside the
    # prompt used to skip it; that was removed because anyone who can read
    # this file could use it. Admin testing needs a server-side mechanism.
    action, reason = smart_text_filter(prompt)
    if action == "STRIKE":
        return None, None, apply_strike(client_ip, current_time, user_record, reason)
    if action == "REJECT":
        return None, None, f"🛑 ALBERT: Request rejected. Reason: {reason}."

    session_tokens = GENERATION_TOKENS.copy()
    random.shuffle(session_tokens)

    final_video, final_image, last_error = None, None, ""
    gen_mode = "Image-to-Video" if input_image else "Text-to-Video"
    safe_img = None
    if input_image:
        resized_path = resize_image_for_video(input_image, w_val, h_val)
        if not resized_path:
            # The uploaded file is gone; handle_file(None) would raise.
            return None, None, "🚨 ALBERT: Invalid payload format."
        safe_img = handle_file(resized_path)

    # ROUTER LOOP: any exception moves on to the next token.
    for attempt, token in enumerate(session_tokens, start=1):
        try:
            print(f"🚀 Attempt {attempt}/{len(session_tokens)} connecting to {model_choice}...")

            # Fetch the connection from the cache.
            client = get_cached_client(target_url, token)

            # ROUTE A: Z-IMAGE TURBO (T2I)
            if "Z-Image" in model_choice:
                result = client.predict(
                    prompt=prompt, height=float(h_val), width=float(w_val),
                    num_inference_steps=9.0, seed=random.randint(0, 999999), randomize_seed=True, api_name="/generate_image"
                )
                raw_img = result[0] if isinstance(result, (list, tuple)) else result
                final_image = raw_img.get("path") if isinstance(raw_img, dict) else raw_img
                break

            # ROUTE B: LTX 2.3 TURBO (ZeroCollabs)
            elif "LTX 2.3" in model_choice:
                result = client.predict(
                    first_frame=safe_img, end_frame=None, prompt=prompt, duration=dur_val,
                    generation_mode=gen_mode, enhance_prompt=True, seed=float(random.randint(0, 999999)),
                    randomize_seed=True, height=h_val, width=w_val, audio_path=None, api_name="/generate_video"
                )
                final_video = result[0] if isinstance(result, (list, tuple)) else result
                break

            # ROUTE C: WAN 2.2 FP8 (Preview)
            elif "WAN" in model_choice:
                wan_neg_prompt = "色调艳丽, 过曝, 静态, 细节模糊不清, 字幕, 风格, 作品, 画作, 画面, 静止, 整体发灰, 最差质量, 低质量, JPEG压缩残留, 丑陋的, 残缺的, 多余的手指, 画得不好的手部, 画得不好的脸部, 畸形的, 毁容的, 形态畸形的肢体, 手指融合, 静止不动的画面, 杂乱的背景, 三条腿, 背景人很多, 倒着走"
                result = client.predict(
                    input_image=safe_img, last_image=None, prompt=prompt, steps=6.0,
                    negative_prompt=wan_neg_prompt, duration_seconds=dur_val,
                    guidance_scale=1.0, guidance_scale_2=1.0, seed=float(random.randint(0, 999999)),
                    randomize_seed=True, quality=6.0, scheduler="UniPCMultistep", flow_shift=3.0,
                    frame_multiplier=16, video_component=True, api_name="/generate_video"
                )
                raw_vid = result[0] if isinstance(result, (list, tuple)) else result
                final_video = raw_vid.get("video") if isinstance(raw_vid, dict) else raw_vid
                break

            # ROUTE D: LTX-2 TURBO OLD (AlexNasa)
            else:
                result = client.predict(
                    first_frame=safe_img, end_frame=None, prompt=prompt, duration=dur_val,
                    input_video=None, generation_mode=gen_mode, enhance_prompt=True,
                    seed=random.randint(0, 999999), randomize_seed=True, height=h_val, width=w_val,
                    camera_lora="No LoRA", audio_path=None, api_name="/generate_video"
                )
                final_video = result[0] if isinstance(result, (list, tuple)) else result
                break

        except Exception as e:
            last_error = str(e)
            print(f"⚠️ Error with token {token[:8]}... : {last_error[:50]}")
            # Drop the failed connection from the cache so it is rebuilt next time.
            CLIENT_CACHE.pop(f"{target_url}|{token}", None)
            continue

    if final_video or final_image:
        return final_video, final_image, "✅ Success! File dispensed."

    err_msg = str(last_error).lower()
    if "quota" in err_msg or "rate limit" in err_msg or "429" in err_msg:
        return None, None, "🛑 SYSTEM: All tokens exhausted for today. Come back tomorrow!"
    return None, None, f"❌ ALL TOKENS FAILED. Last error: {last_error[:50]}"
def ping_server():
    """Return a small liveness payload for the frontend.

    The "online" figure is a random integer between 5 and 25, not a
    measurement; "models" lists the keys of MODELS_CONFIG.
    """
    online_now = random.randint(5, 25)
    return {"status": "alive", "online": online_now, "models": [*MODELS_CONFIG]}
# --- 7. GRADIO INTERFACE ---
# Builds the UI and registers every handler under a named API endpoint.
# NOTE(review): the nesting of the layout containers below is inferred from
# the order of the components — confirm against the intended layout.
with gr.Blocks(theme=gr.themes.Base()) as app:
    gr.Markdown("## SodaGen Free Engine (Multiversal Router)")
    with gr.Row():
        # Inputs for a generation request.
        with gr.Column():
            model_sel = gr.Dropdown(choices=list(MODELS_CONFIG.keys()), value="LTX-2 Turbo (AlexNasa)", label="Select Engine")
            p_in = gr.Textbox(label="Prompt", lines=3)
            img_in = gr.Image(type="filepath", label="Input Image (Optional for Video Models)")
            with gr.Row():
                w_in = gr.Number(value=768, label="Width")
                h_in = gr.Number(value=512, label="Height")
                d_in = gr.Number(value=5.0, label="Duration (Video)")
            passkey_in = gr.Textbox(label="Enigma Passkey (Hidden)", placeholder="Frontend will inject this...")
            btn_gen = gr.Button("Generate", variant="primary")
        # Outputs, plus the passkey and ping helpers.
        with gr.Column():
            vid_out = gr.Video(label="Video Output")
            img_out = gr.Image(type="filepath", label="Image Output")
            txt_out = gr.Textbox(label="Status Log")
            gr.Markdown("---")
            btn_get_passkey = gr.Button("Request Passkey (For Frontend API)")
            out_passkey = gr.Textbox(label="Encrypted Passkey Generated")
            out_json = gr.JSON()
            btn_ping = gr.Button("Ping Server")
    # Main endpoint: the three outputs are (video, image, status message).
    btn_gen.click(
        process_generation_free,
        [model_sel, p_in, img_in, w_in, h_in, d_in, passkey_in],
        [vid_out, img_out, txt_out],
        api_name="generate_free"
    )
    btn_ping.click(ping_server, None, out_json, api_name="ping")
    # This endpoint returns a valid passkey to whoever calls it.
    btn_get_passkey.click(generate_enigma_passkey, None, out_passkey, api_name="get_passkey")
    # Components in this row are created with visible=False; they exist to
    # expose the chat broker functions as named API endpoints.
    with gr.Row(visible=False):
        # Frontend -> broker: queue a chat message.
        in_chat_prompt = gr.Textbox()
        in_chat_history = gr.Textbox()
        out_task_id = gr.Textbox()
        out_chat_status = gr.Textbox()
        btn_submit_chat = gr.Button()
        btn_submit_chat.click(frontend_submit_chat, [in_chat_prompt, in_chat_history], [out_task_id, out_chat_status], api_name="submit_chat")
        # Frontend -> broker: poll for the reply to a queued task.
        in_task_id = gr.Textbox()
        out_reply_status = gr.Textbox()
        out_reply_text = gr.Textbox()
        btn_check_reply = gr.Button()
        btn_check_reply.click(frontend_check_reply, [in_task_id], [out_reply_status, out_reply_text], api_name="check_chat")
        # Worker node -> broker: fetch the next pending task.
        in_dummy = gr.Textbox()
        out_node_task = gr.Textbox()
        btn_get_node_task = gr.Button()
        def node_get_task_fixed(dummy_input):
            # Accepts one input that is ignored and forwards to node_get_task().
            return node_get_task()
        btn_get_node_task.click(node_get_task_fixed, [in_dummy], out_node_task, api_name="get_chat_task")
        # Worker node -> broker: deliver the reply for a task.
        in_node_task_id = gr.Textbox()
        in_node_reply = gr.Textbox()
        out_node_status = gr.Textbox()
        btn_submit_node_reply = gr.Button()
        btn_submit_node_reply.click(node_submit_reply, [in_node_task_id, in_node_reply], out_node_status, api_name="submit_chat_reply")
        # Worker node -> broker: report a chat prompt for moderation.
        in_report_ip = gr.Textbox()
        in_report_prompt = gr.Textbox()
        out_report_status = gr.Textbox()
        btn_report = gr.Button()
        btn_report.click(node_report_violation, [in_report_ip, in_report_prompt], out_report_status, api_name="alexa_report")
# Enable the request queue (at most 30 waiting requests) and listen on all interfaces.
app.queue(max_size=30).launch(share=False, server_name="0.0.0.0")