import json
import os
import random
import re
import threading
import time
from datetime import date, datetime

import dns.resolver  # DNS library used to check that a mail domain really exists
import gradio as gr
import requests
from gradio_client import Client
from huggingface_hub import HfApi, hf_hub_download
from PIL import Image

# --- 1. CONFIGURATION ---


def _tokens_from_env(var_name, fallback):
    """Return the comma-separated tokens in env var ``var_name``, or ``fallback`` if unset/empty."""
    raw = os.environ.get(var_name, "")
    parsed = [token.strip() for token in raw.split(",") if token.strip()]
    return parsed or list(fallback)


# Dataset repository and file that hold the user database.
DATASET_REPO_ID = "Bl4ckSpaces/soda-data"
DATABASE_FILENAME = "soda_users.json"

# NOTE(security): the credentials below are committed to source control and are
# split with "hf_" + ... only to slip past secret scanners. They should be
# rotated and removed. Until then, values supplied through the environment
# (e.g. Space secrets) take precedence and the embedded ones are only a
# backward-compatible fallback.
_EMBEDDED_STORAGE_TOKEN = "hf_" + "gcQpArWmJAlvIapWLNsJOWUvmbkkVCktgV"
_EMBEDDED_GENERATION_TOKENS = [
    "hf_" + "PiRCDDtPcPFMLWkTkVaZmzoleHOunXnLIA",
    "hf_" + "BHvZXGICstaktSwycmwNmzHGrTNmKxnlRZ",
    "hf_" + "ZdgawyTPzXIpwhnRYIteUKSMsWnEDtGKtM",
    "hf_" + "nMiFYAFsINxAJWPwiCQlaunmdgmrcxKoaT",
    "hf_" + "PccpUIbTckCiafwErDLkRlsvqhgtfZaBHL",
    "hf_" + "faGyXBPfBkaHXDMUSJtxEggonhhZbomFIz",
    "hf_" + "SndsPaRWsevDXCgZcSjTUlBYUJqOkSfFmn",
    "hf_" + "CqobFdUpeVCeuhUaiuXwvdczBUmoUHXRGa",
    "hf_" + "JKCQYUhhHPPkpucegqkNSyureLdXpmeXRF",
    "hf_" + "tBYfslUwHNiNMufzwAYIlrDVovEWmOQulC",
    "hf_" + "LKLdrdUxyUyKODSUthmqHXqDMfHrQueera",
    "hf_" + "ivSBboJYQVcifWkCNcOTOnxUQrZOtOglnU",
    "hf_" + "jiSbBMUmAniRpJOmVIlczuqpRjwSeuizLk",
    "hf_" + "VcXaKQLEawBWZbNrBOSLTjrVtTuSvobhLL",
    "hf_" + "ZrlTPvhDmYqZGGFuIqDDCrQRcWRhYcuyOI",
    "hf_" + "FCambosUqUQJrThbIveHglnvjoNpOGWBsW",
    "hf_" + "kUyoiWTbZlNfSrdTNaVINuwlNTQseFCfZB",
    "hf_" + "WGarKlgPBzpJeKxpqirFgnKKAtOFBFomSe",
    "hf_" + "IZwzmRBCALYfvYtmtvTWsIQYvHuRGUiGyr",
    "hf_" + "NtijfwwAPQRknELkhIWjMQQUUqzgwhIjeu",
    "hf_" + "obVKYRMqECBoLsBWOKyfVWtHlugAhhuaIH",
    "hf_" + "EsDAvVqRZCbigQrpDFNinlVeijagnAjETW",
    "hf_" + "yuMifxRJoXWKPRGgYFrXHXTGdoKBuCZCUU",
    "hf_" + "YthKrdEtrmyDbBteZcGzNeoDqGAxzeEinv",
    "hf_" + "JgNjfcunLsOBcZIaOYcFqgcZIZWjbnocJn",
    "hf_" + "cINBgwvihyKiTpxwDTXjnHTnlQivLCluGJ",
    "hf_" + "jnciPeeWUwQbHNITBRtOgPjnqWkgAqZDhq",
]

# Token used to download/upload the user database in DATASET_REPO_ID.
# Override with the SODA_STORAGE_TOKEN environment variable.
STORAGE_TOKEN = os.environ.get("SODA_STORAGE_TOKEN") or _EMBEDDED_STORAGE_TOKEN

# Remote Space that performs the actual video generation.
GPU_SPACE_URL = "https://alexnasa-ltx-2-turbo.hf.space"

# Pool of tokens tried in random order for each generation request.
# Override with SODA_GENERATION_TOKENS (comma-separated).
GENERATION_TOKENS = _tokens_from_env("SODA_GENERATION_TOKENS", _EMBEDDED_GENERATION_TOKENS)

# In-memory copy of the user database; every request handler reads and mutates it.
CACHED_DB = {"users": {}}
# --- 2. ADVANCED SECURITY (MULTI-LAYER EMAIL CHECK) ---

# Domains rejected outright, before any external service is consulted.
_BLOCKED_EMAIL_DOMAINS = frozenset({
    "tempmail.com",
    "10minutemail.com",
    "yopmail.com",
    "throwawaymail.com",
})

# Exactly one "@", no whitespace, and at least one dot in the domain part.
# Applied with fullmatch so trailing garbage (e.g. a second "@") is rejected.
_EMAIL_PATTERN = re.compile(r"[^@\s]+@[^@\s]+\.[^@\s]+")


def check_dns_mx(domain):
    """Return True when an MX lookup for ``domain`` succeeds, False otherwise.

    Every resolver failure (unknown domain, no answer, timeout, ...) is
    reported as False rather than raised.
    """
    try:
        dns.resolver.resolve(domain, 'MX')
        return True
    except Exception:
        return False


def _fetch_json(url, params=None):
    """GET ``url`` (3 s timeout) and return its JSON body, or None unless the status is 200."""
    response = requests.get(url, params=params, timeout=3)
    if response.status_code != 200:
        return None
    return response.json()


def validate_email_multilayer(email):
    """Run the layered e-mail checks and return ``(is_valid, message)``.

    Layers, in order:
      1. Format check and local blocklist of disposable domains.
      2. MailCheck.ai disposable-domain lookup.
      3. Debounce.io disposable lookup.
      4. Eva PingUtil disposable/deliverable lookup.
      5. Local DNS MX lookup for the domain.

    The three external APIs are best-effort: if one fails or answers with a
    non-200 status it is skipped. The MX lookup always runs last, so an
    address is accepted only when its domain resolves an MX query.

    Args:
        email: Address to validate. Surrounding whitespace is ignored.

    Returns:
        ``(True, "Valid")`` on success, otherwise ``(False, reason)``.
    """
    # Local import: keeps this block self-contained without touching file-level imports.
    from urllib.parse import quote

    print(f"🛡️ Security Scan: Checking {email}...")

    # LAYER 1: format + blocklist.
    # Strip first so "x@yopmail.com " cannot dodge the blocklist comparison.
    email = email.strip() if isinstance(email, str) else ""
    if not _EMAIL_PATTERN.fullmatch(email):
        return False, "Invalid format."

    domain = email.rsplit('@', 1)[1].lower()
    if domain in _BLOCKED_EMAIL_DOMAINS:
        return False, "Disposable/Temp mail detected."

    # LAYER 2: external APIs, tried one after another. A failure of one
    # service only means we move on to the next.

    # API 1: MailCheck.ai (disposable check). The domain is user input placed
    # in a URL path, so it is percent-encoded.
    try:
        data = _fetch_json(f"https://api.mailcheck.ai/domain/{quote(domain, safe='')}")
        if data and data.get('disposable'):
            return False, "Disposable domain detected (L1)."
    except Exception:
        print("⚠️ API 1 (MailCheck) busy/failed, switching to backup...")

    # API 2: Debounce.io (disposable check). The service reports the flag as
    # the string "true". ``params=`` URL-encodes the address so that "+" in
    # "user+tag@example.com" is not decoded as a space by the server.
    try:
        data = _fetch_json("https://disposable.debounce.io/", params={"email": email})
        if data and data.get('disposable') == "true":
            return False, "Disposable domain detected (L2)."
    except Exception:
        print("⚠️ API 2 (Debounce) busy/failed, switching to backup...")

    # API 3: Eva PingUtil (disposable + deliverability).
    try:
        data = _fetch_json("https://api.eva.pingutil.com/email", params={"email": email})
        if data:
            details = data.get('data', {})
            if details.get('disposable'):
                return False, "Disposable domain detected (L3)."
            if not details.get('deliverable'):
                # "deliverable: false" can also mean a full inbox, but we stay
                # strict - except for Gmail/Yahoo, which are known to produce
                # false positives here.
                if "gmail" not in domain and "yahoo" not in domain:
                    return False, "Email address looks undeliverable."
    except Exception:
        print("⚠️ API 3 (Eva) busy/failed, switching to internal DNS...")

    # LAYER 3: our own DNS check, independent of the third-party APIs above.
    if not check_dns_mx(domain):
        return False, "Domain does not have a valid mail server."

    print("✅ Security Scan Passed.")
    return True, "Valid"


# --- 3. DATABASE SYSTEM ---

# Serialises background syncs: each one rewrites the same local file before
# uploading it, so two overlapping syncs could upload a half-written file.
_SYNC_LOCK = threading.Lock()


def load_db():
    """Download the user database into the global ``CACHED_DB``.

    If the download or parsing fails, or the file does not contain a usable
    ``"users"`` mapping, the cache is reset to an empty database and the
    reason is printed instead of being swallowed silently.
    """
    global CACHED_DB
    print("🔄 Downloading DB...")
    try:
        path = hf_hub_download(
            repo_id=DATASET_REPO_ID,
            filename=DATABASE_FILENAME,
            repo_type="dataset",
            token=STORAGE_TOKEN,
        )
        with open(path, 'r', encoding='utf-8') as f:
            loaded = json.load(f)
        if not isinstance(loaded, dict):
            raise ValueError("database root is not a JSON object")
        # Every handler indexes db["users"]; make sure it exists and is a mapping.
        if not isinstance(loaded.setdefault("users", {}), dict):
            raise ValueError("'users' is not a JSON object")
        CACHED_DB = loaded
        print("✅ DB Loaded")
    except Exception as exc:
        # NOTE(review): starting empty after a transient download error means
        # the next sync overwrites the remote file - consider retrying instead.
        print(f"⚠️ DB load failed ({exc}); starting with an empty database.")
        CACHED_DB = {"users": {}}


def sync_db_background():
    """Write ``CACHED_DB`` to disk and upload it to the dataset repo in a background thread.

    Failures are printed, not raised, so a failed sync never breaks the
    request that triggered it.
    """
    def task():
        try:
            with _SYNC_LOCK:
                with open(DATABASE_FILENAME, 'w', encoding='utf-8') as f:
                    json.dump(CACHED_DB, f, indent=2)
                api = HfApi(token=STORAGE_TOKEN)
                api.upload_file(
                    path_or_fileobj=DATABASE_FILENAME,
                    path_in_repo=DATABASE_FILENAME,
                    repo_id=DATASET_REPO_ID,
                    repo_type="dataset",
                )
            print("💾 Cloud Sync OK")
        except Exception as exc:
            print(f"⚠️ Cloud Sync failed: {exc}")

    threading.Thread(target=task).start()


def save_db_ram(data):
    """Replace the in-memory database with ``data``, start a background sync, and return True."""
    global CACHED_DB
    CACHED_DB = data
    sync_db_background()
    return True


load_db()
# --- 4. HELPER UPLOAD ---

def upload_to_catbox(file_path):
    """Upload ``file_path`` to catbox.moe and return the resulting URL, or None on failure.

    Failure covers an unreadable file, a network error, a non-200 response,
    and a 200 response whose body is not a link.
    """
    try:
        url = "https://catbox.moe/user/api.php"
        data = {"reqtype": "fileupload"}
        with open(file_path, "rb") as f:
            response = requests.post(url, data=data, files={"fileToUpload": f}, timeout=45)
    except Exception as exc:
        print(f"⚠️ Catbox upload failed: {exc}")
        return None

    if response.status_code != 200:
        return None
    # NOTE(review): assumes a successful upload answers with the bare file URL
    # as plain text - confirm against the catbox API. Anything else is treated
    # as a failed upload rather than stored in the gallery as a "URL".
    link = response.text.strip()
    return link if link.startswith("http") else None


def resize_image_for_video(image_path, width, height):
    """Resize the image at ``image_path`` in place to ``width`` x ``height``.

    Always returns ``image_path``; if anything goes wrong the file is left
    as it was and the error is printed.
    """
    try:
        print(f"🖼️ Resizing image to {width}x{height}...")
        # The context manager closes the source file handle before the same
        # path is overwritten below.
        with Image.open(image_path) as img:
            resized = img.resize((int(width), int(height)), Image.LANCZOS)
        resized.save(image_path)
    except Exception as exc:
        print(f"⚠️ Resize failed, using original image: {exc}")
    return image_path


# --- 5. SERVER LOGIC ---

# The admin account always has (effectively) unlimited credits.
_ADMIN_USERNAME = "C0LA21"
_ADMIN_CREDITS = 999999
# Credits granted at signup and restored on the first login of each day.
_DAILY_CREDITS = 5
# Maximum number of gallery entries kept per user (newest first).
_GALLERY_LIMIT = 20


def _password_matches(user_record, password):
    """Return True when ``password`` equals the password stored in ``user_record``.

    A record without a stored password never matches (previously
    ``str(None)`` made the literal password "None" valid for such records).
    """
    # NOTE(security): passwords are stored and compared in plaintext. Moving to
    # a salted hash needs a data migration and is out of scope here.
    import hmac  # local import: constant-time comparison without touching file-level imports

    stored = user_record.get("password")
    if stored is None:
        return False
    return hmac.compare_digest(str(stored).encode("utf-8"), str(password).encode("utf-8"))


def _extract_video_path(result):
    """Pull the video file path out of the value returned by the remote ``predict`` call."""
    if isinstance(result, (list, tuple)):
        result = result[0] if result else None
    # NOTE(review): some gradio_client versions return video outputs as a dict
    # with a "video" key - confirm against the target Space.
    if isinstance(result, dict):
        result = result.get("video")
    return result


def auth_user(action, username, password, email):
    """Handle the ``signup`` and ``login`` actions.

    Args:
        action: ``"signup"`` or ``"login"``; anything else yields an error.
        username: Account name.
        password: Account password.
        email: E-mail address; only used for signup.

    Returns:
        A dict with ``"status"`` (``"success"``/``"error"``) and ``"msg"``.
        On success it also carries ``"credits"`` and, for login, ``"gallery"``.
        Unexpected exceptions are reported as ``"Server Error."``.
    """
    try:
        db = CACHED_DB
        today_str = str(date.today())

        if not username or not password:
            return {"status": "error", "msg": "Input required."}

        is_admin = (username == _ADMIN_USERNAME)
        users = db["users"]

        if action == "signup":
            email = (email or "").strip()
            if not email:
                return {"status": "error", "msg": "Email required."}

            # --- SECURITY CHECK ---
            is_valid, msg = validate_email_multilayer(email)
            if not is_valid:
                return {"status": "error", "msg": f"Security Block: {msg}"}

            # Uniqueness is checked after the (slow) validation so the window
            # between check and insert stays as small as possible.
            if username in users:
                return {"status": "error", "msg": "Username taken."}

            # Compare case-insensitively so "User@x.com" and "user@x.com"
            # cannot be used to open two accounts.
            wanted = email.casefold()
            if any(str(record.get("email", "")).strip().casefold() == wanted
                   for record in users.values()):
                return {"status": "error", "msg": "Email registered."}

            start_credits = _ADMIN_CREDITS if is_admin else _DAILY_CREDITS
            users[username] = {
                "password": password,
                "email": email,
                "credits": start_credits,
                "last_restock": today_str,
                "gallery": [],
            }
            save_db_ram(db)
            return {"status": "success", "msg": "Account Created!", "credits": start_credits}

        if action == "login":
            if username not in users:
                return {"status": "error", "msg": "User not found."}

            user_data = users[username]
            if not _password_matches(user_data, password):
                return {"status": "error", "msg": "Wrong password."}

            # Daily restock: the first login on a new day resets the credits.
            if user_data.get("last_restock") != today_str:
                user_data["credits"] = _ADMIN_CREDITS if is_admin else _DAILY_CREDITS
                user_data["last_restock"] = today_str
                save_db_ram(db)

            if is_admin:
                user_data["credits"] = _ADMIN_CREDITS

            return {
                "status": "success",
                "msg": "Login OK.",
                "credits": user_data.get("credits", 0),
                "gallery": user_data.get("gallery", []),
            }

        return {"status": "error", "msg": "Error."}
    except Exception as exc:
        print(f"❌ auth_user failed: {exc}")
        return {"status": "error", "msg": "Server Error."}


def process_generation(username, password, prompt, neg_prompt, input_image, width, height, guidance, steps):
    """Generate a video for an authenticated user and record it in their gallery.

    The generation tokens are tried in random order until one attempt yields
    a video. The video is then uploaded to catbox; only when that upload
    succeeds is the gallery updated and (for non-admins) one credit charged.

    Args:
        username, password: Credentials, re-checked on every call.
        prompt: Text prompt sent to the generator.
        neg_prompt, guidance, steps: Accepted for interface compatibility;
            not forwarded to the remote Space.
        input_image: Optional path to a source image; resized in place to
            ``width`` x ``height`` before use.
        width, height: Output size in pixels.

    Returns:
        ``(video_path_or_None, status_message)``.
    """
    print(f"🎬 Processing: {username}")
    try:
        db = CACHED_DB
        if username not in db["users"]:
            return None, "Relogin required."

        account = db["users"][username]
        if not _password_matches(account, password):
            return None, "Auth failed."

        is_admin = (username == _ADMIN_USERNAME)
        if not is_admin and account.get("credits", 0) <= 0:
            return None, "No credits left."

        # Validate the size once, up front. Previously a bad value raised
        # inside the retry loop and burned through every token before failing.
        try:
            width_px, height_px = int(width), int(height)
        except (TypeError, ValueError):
            return None, "Invalid width/height."
        if width_px <= 0 or height_px <= 0:
            return None, "Invalid width/height."

        prompt = prompt or ""

        if input_image:
            input_image = resize_image_for_video(input_image, width_px, height_px)

        video_path = None
        last_error = ""

        session_tokens = GENERATION_TOKENS.copy()
        random.shuffle(session_tokens)

        for attempt, current_token in enumerate(session_tokens, start=1):
            try:
                print(f"🚀 Attempt {attempt}...")
                client_gpu = Client(GPU_SPACE_URL, headers={"Authorization": f"Bearer {current_token}"})
                result = client_gpu.predict(
                    input_image=input_image,
                    prompt=prompt,
                    duration=4.0,
                    enhance_prompt=True,
                    seed=random.randint(0, 999999),
                    randomize_seed=True,
                    height=height_px,
                    width=width_px,
                    camera_lora="No LoRA",
                    api_name="/generate_video",
                )
                video_path = _extract_video_path(result)
                if video_path:
                    break
            except Exception as e:
                # Most likely a quota or availability problem for this token;
                # remember the reason and try the next one.
                last_error = str(e)

        if not video_path:
            return None, f"Servers Busy (Quota Limit). Try later. Err: {last_error[:20]}"

        print("☁️ Uploading...")
        video_url = upload_to_catbox(video_path)
        if not video_url:
            return video_path, "Generated but Upload Failed."

        gallery_item = {
            "url": video_url,
            "prompt": prompt[:60],
            "date": str(date.today()),
            "type": "video",
        }
        gallery = account.setdefault("gallery", [])
        gallery.insert(0, gallery_item)
        account["gallery"] = gallery[:_GALLERY_LIMIT]

        if not is_admin:
            account["credits"] = account.get("credits", 0) - 1

        save_db_ram(db)
        return video_path, f"Success! Credits: {account.get('credits', 0)}"
    except Exception as e:
        print(f"❌ Critical: {e}")
        return None, f"Failed: {e}"


def ping_server(username=None):
    """Health-check endpoint; ``username`` is accepted but ignored."""
    return {"status": "alive", "msg": "Soda is fizzy!"}


# Headless Gradio app: the components and hidden buttons exist only so that
# the three handlers are exposed as the API endpoints "auth", "generate" and "ping".
with gr.Blocks() as app:
    action = gr.Textbox()
    user = gr.Textbox()
    pw = gr.Textbox()
    email = gr.Textbox()
    p = gr.Textbox()
    neg_p = gr.Textbox()
    img_in = gr.Image(type="filepath")
    w = gr.Number()
    h = gr.Number()
    g = gr.Number()
    s = gr.Number()
    out_json = gr.JSON()
    out_vid = gr.Video()
    out_txt = gr.Textbox()

    btn_auth = gr.Button(visible=False)
    btn_auth.click(auth_user, [action, user, pw, email], out_json, api_name="auth")

    btn_gen = gr.Button(visible=False)
    btn_gen.click(
        process_generation,
        [user, pw, p, neg_p, img_in, w, h, g, s],
        [out_vid, out_txt],
        api_name="generate",
    )

    btn_ping = gr.Button(visible=False)
    btn_ping.click(ping_server, [user], out_json, api_name="ping")

app.queue(max_size=20).launch(share=False, server_name="0.0.0.0")