Spaces:
Running
Running
| import gradio as gr | |
| from gradio_client import Client | |
| import requests | |
| import json | |
| import random | |
| import time | |
| import threading | |
| import os | |
| import re | |
| import dns.resolver # Library DNS untuk cek domain asli | |
| from datetime import datetime, date | |
| from huggingface_hub import HfApi, hf_hub_download | |
| from PIL import Image | |
# --- 1. CONFIGURATION ---
# Hugging Face dataset repository and file that hold the user database.
DATASET_REPO_ID = "Bl4ckSpaces/soda-data"
DATABASE_FILENAME = "soda_users.json"

# Access tokens are secrets and must not live in source control. They are read
# from the process environment (for a Space: the "Secrets" settings):
#   SODA_STORAGE_TOKEN     - token used to download/upload the database file
#   SODA_GENERATION_TOKENS - comma-separated tokens used for GPU Space calls
# NOTE(review): any token that was previously committed in this file should be
# treated as leaked and revoked.
STORAGE_TOKEN = os.environ.get("SODA_STORAGE_TOKEN") or None

# Remote Space that performs the actual video generation.
GPU_SPACE_URL = "https://alexnasa-ltx-2-turbo.hf.space"

# Tokens tried (in random order) when calling the GPU Space; empty entries and
# surrounding whitespace are dropped.
GENERATION_TOKENS = [
    token.strip()
    for token in os.environ.get("SODA_GENERATION_TOKENS", "").split(",")
    if token.strip()
]

# In-memory copy of the user database, shaped {"users": {username: record}}.
CACHED_DB = {"users": {}}
| # --- 2. ADVANCED SECURITY (MULTI-LAYER EMAIL CHECK) --- | |
def check_dns_mx(domain):
    """Check whether ``domain`` publishes at least one usable MX record.

    Args:
        domain: Domain part of an e-mail address.

    Returns:
        bool: True if the MX lookup succeeds and at least one record points at
        a real exchange; False if the lookup fails for any reason.
    """
    try:
        answers = dns.resolver.resolve(domain, 'MX')
        # A "null MX" (exchange ".", RFC 7505) explicitly declares that the
        # domain accepts no mail, so it must not count as a mail server.
        return any(str(record.exchange) != "." for record in answers)
    except Exception:
        # Best-effort check: NXDOMAIN, no answer, timeouts and malformed names
        # all mean "cannot confirm a mail server".
        return False
def validate_email_multilayer(email):
    """Validate a sign-up e-mail address through several layers.

    Layers, in order:
      1. Format check plus a small built-in blocklist of disposable domains.
      2. Three third-party HTTP checks (MailCheck, Debounce, Eva). Each one is
         best-effort: on a timeout, error or unexpected payload a warning is
         printed and the next check runs.
      3. A local DNS MX lookup via ``check_dns_mx`` (always performed when the
         earlier layers did not reject the address).

    Args:
        email: Address supplied by the user.

    Returns:
        tuple[bool, str]: ``(True, "Valid")`` on success, otherwise
        ``(False, reason)``.
    """
    from urllib.parse import quote

    print(f"🛡️ Security Scan: Checking {email}...")

    # LAYER 1: basic format & common blocklist.
    # fullmatch (not match) so trailing junk such as a second "@..." cannot
    # slip through; whitespace is rejected as well.
    if not isinstance(email, str) or not re.fullmatch(r"[^@\s]+@[^@\s]+\.[^@\s]+", email):
        return False, "Invalid format."

    domain = email.rsplit('@', 1)[1].lower()
    blocked_domains = {"tempmail.com", "10minutemail.com", "yopmail.com", "throwawaymail.com"}
    if domain in blocked_domains:
        return False, "Disposable/Temp mail detected."

    # LAYER 2: external APIs, tried one after another. A failing or
    # rate-limited API never blocks the user; we simply move on.

    # API 1: MailCheck.ai (disposable-domain check).
    try:
        # The domain is user-controlled, so escape it before placing it in the path.
        r = requests.get(f"https://api.mailcheck.ai/domain/{quote(domain, safe='')}", timeout=3)
        if r.status_code == 200:
            data = r.json()
            if data.get('disposable'):
                return False, "Disposable domain detected (L1)."
    except Exception:
        print("⚠️ API 1 (MailCheck) busy/failed, switching to backup...")

    # API 2: Debounce.io (disposable-domain check).
    try:
        # ``params`` URL-encodes the address so characters like "&" or "#"
        # cannot alter the query string.
        r = requests.get("https://disposable.debounce.io/", params={"email": email}, timeout=3)
        if r.status_code == 200:
            data = r.json()
            # This API reports the flag as the string "true".
            if data.get('disposable') == "true":
                return False, "Disposable domain detected (L2)."
    except Exception:
        print("⚠️ API 2 (Debounce) busy/failed, switching to backup...")

    # API 3: Eva PingUtil (format & domain validation).
    try:
        r = requests.get("https://api.eva.pingutil.com/email", params={"email": email}, timeout=3)
        if r.status_code == 200:
            details = r.json().get('data') or {}
            if details.get('disposable'):
                return False, "Disposable domain detected (L3)."
            if not details.get('deliverable'):
                # "deliverable: false" can also mean a full inbox, but we stay
                # strict. Gmail/Yahoo sometimes yield false positives, so the
                # deliverability block is skipped for those domains.
                if "gmail" not in domain and "yahoo" not in domain:
                    return False, "Email address looks undeliverable."
    except Exception:
        print("⚠️ API 3 (Eva) busy/failed, switching to internal DNS...")

    # LAYER 3: our own DNS check, independent of any external API.
    if not check_dns_mx(domain):
        return False, "Domain does not have a valid mail server."

    print("✅ Security Scan Passed.")
    return True, "Valid"
| # --- 3. DATABASE SYSTEM --- | |
def load_db():
    """Download the user database from the dataset repo into ``CACHED_DB``.

    On any failure (download error, unreadable/invalid JSON, unexpected
    structure) the cache is reset to an empty database and the reason is
    printed instead of being silently discarded.
    """
    global CACHED_DB
    print("🔄 Downloading DB...")
    try:
        path = hf_hub_download(
            repo_id=DATASET_REPO_ID,
            filename=DATABASE_FILENAME,
            repo_type="dataset",
            token=STORAGE_TOKEN,
        )
        with open(path, 'r', encoding='utf-8') as f:
            loaded = json.load(f)
        # The rest of the module indexes db["users"] directly, so make sure
        # that mapping exists before accepting the file.
        if not isinstance(loaded, dict):
            raise ValueError("database root is not a JSON object")
        if not isinstance(loaded.setdefault("users", {}), dict):
            raise ValueError("'users' is not a JSON object")
        CACHED_DB = loaded
        print("✅ DB Loaded")
    except Exception as exc:
        # NOTE(review): after this fallback the next save uploads the empty
        # cache over the remote file — confirm that is acceptable.
        print(f"⚠️ DB load failed, starting with an empty database: {exc}")
        CACHED_DB = {"users": {}}
# Serialises background syncs so two threads never write/upload the same
# local file at once.
_DB_SYNC_LOCK = threading.Lock()


def sync_db_background():
    """Write ``CACHED_DB`` to disk and upload it to the dataset repo in a thread.

    Returns immediately; the work happens in a background thread. Failures are
    printed rather than silently ignored, and never propagate to the caller.
    """
    def task():
        try:
            with _DB_SYNC_LOCK:
                # Serialise to a string first so a failure while encoding
                # cannot leave a truncated file on disk.
                payload = json.dumps(CACHED_DB, indent=2)
                with open(DATABASE_FILENAME, 'w') as f:
                    f.write(payload)
                api = HfApi(token=STORAGE_TOKEN)
                api.upload_file(
                    path_or_fileobj=DATABASE_FILENAME,
                    path_in_repo=DATABASE_FILENAME,
                    repo_id=DATASET_REPO_ID,
                    repo_type="dataset",
                )
            print("💾 Cloud Sync OK")
        except Exception as exc:
            print(f"⚠️ Cloud Sync failed: {exc}")

    threading.Thread(target=task).start()
def save_db_ram(data):
    """Make ``data`` the live in-memory database and start a cloud sync.

    Args:
        data: The full database object to keep in ``CACHED_DB``.

    Returns:
        bool: Always True; the upload itself runs in the background.
    """
    global CACHED_DB

    # Swap the cache first so the background sync serialises the new state.
    CACHED_DB = data
    sync_db_background()

    return True
# Fill CACHED_DB once at import time, before the Gradio handlers are defined.
load_db()
| # --- 4. HELPER UPLOAD --- | |
def upload_to_catbox(file_path):
    """Upload a local file to catbox.moe and return the resulting URL.

    Args:
        file_path: Path of the file to upload.

    Returns:
        str | None: The URL text returned by the service, or None when the
        file cannot be read, the request fails, the status is not 200, or the
        response body is not a URL.
    """
    url = "https://catbox.moe/user/api.php"
    data = {"reqtype": "fileupload"}
    try:
        with open(file_path, "rb") as f:
            response = requests.post(url, data=data, files={"fileToUpload": f}, timeout=45)
    except Exception as exc:
        print(f"⚠️ Catbox upload failed: {exc}")
        return None

    if response.status_code != 200:
        return None

    # Only hand back something that actually looks like a link; a 200 body
    # containing an error message must not be stored as a gallery URL.
    link = response.text.strip()
    return link if link.startswith("http") else None
def resize_image_for_video(image_path, width, height):
    """Resize the image at ``image_path`` in place to ``width`` x ``height``.

    Args:
        image_path: Path of the image file; it is overwritten on success.
        width: Target width (converted with ``int``).
        height: Target height (converted with ``int``).

    Returns:
        The same ``image_path`` that was passed in, whether or not the resize
        succeeded (failures are printed and otherwise ignored).
    """
    try:
        print(f"🖼️ Resizing image to {width}x{height}...")
        # The context manager closes the source file handle once the pixels
        # have been read by resize().
        with Image.open(image_path) as img:
            resized = img.resize((int(width), int(height)), Image.LANCZOS)
        resized.save(image_path)
    except Exception as exc:
        print(f"⚠️ Resize skipped: {exc}")
    return image_path
| # --- 5. SERVER LOGIC --- | |
def auth_user(action, username, password, email):
    """Handle the ``signup`` and ``login`` API actions.

    Args:
        action: ``"signup"`` or ``"login"``; anything else yields an error.
        username: Account name.
        password: Account password.
        email: E-mail address; required for ``signup`` only.

    Returns:
        dict: Always contains ``status`` (``"success"``/``"error"``) and
        ``msg``. Successful responses also carry ``credits``; a successful
        login additionally carries ``gallery``.
    """
    # Cheap input validation first, before touching shared state.
    if not username or not password:
        return {"status": "error", "msg": "Input required."}

    try:
        db = CACHED_DB
        today_str = str(date.today())
        is_admin = (username == "C0LA21")
        daily_credits = 999999 if is_admin else 5

        if action == "signup":
            if not email:
                return {"status": "error", "msg": "Email required."}

            # --- SECURITY CHECK ---
            is_valid, msg = validate_email_multilayer(email)
            if not is_valid:
                return {"status": "error", "msg": f"Security Block: {msg}"}

            if username in db["users"]:
                return {"status": "error", "msg": "Username taken."}

            # Compare case-insensitively so "A@x.com" and "a@x.com" cannot
            # both be registered.
            wanted = email.strip().lower()
            if any(
                str(record.get("email") or "").strip().lower() == wanted
                for record in db["users"].values()
            ):
                return {"status": "error", "msg": "Email registered."}

            # NOTE(review): the password is stored as given (plain text);
            # hashing it would require migrating existing records.
            db["users"][username] = {
                "password": password, "email": email,
                "credits": daily_credits, "last_restock": today_str, "gallery": [],
            }
            save_db_ram(db)
            return {"status": "success", "msg": "Account Created!", "credits": daily_credits}

        if action == "login":
            user_data = db["users"].get(username)
            if user_data is None:
                return {"status": "error", "msg": "User not found."}
            if str(user_data.get("password")) != str(password):
                return {"status": "error", "msg": "Wrong password."}

            # Credits are refilled the first time a user logs in on a new day.
            if user_data.get("last_restock") != today_str:
                user_data["credits"] = daily_credits
                user_data["last_restock"] = today_str
                save_db_ram(db)
            if is_admin:
                user_data["credits"] = 999999
            return {
                "status": "success", "msg": "Login OK.",
                "credits": user_data["credits"],
                "gallery": user_data.get("gallery", []),
            }

        return {"status": "error", "msg": "Error."}
    except Exception as exc:
        # Keep the client-facing message generic, but log the cause.
        print(f"❌ auth_user failed: {exc}")
        return {"status": "error", "msg": "Server Error."}
def _extract_video_path(result):
    """Pull the video file path out of a ``predict`` result, or return None."""
    if isinstance(result, (list, tuple)):
        result = result[0] if result else None
    if isinstance(result, dict):
        # NOTE(review): some gradio_client versions appear to return video
        # outputs as a dict such as {"video": path, ...} — confirm the shape.
        result = result.get("video") or result.get("path")
    return result


def process_generation(username, password, prompt, neg_prompt, input_image, width, height, guidance, steps):
    """Generate a video on the remote GPU Space for an authenticated user.

    Args:
        username: Account name.
        password: Account password, re-checked on every call.
        prompt: Text prompt forwarded to the generator.
        neg_prompt: Accepted for API compatibility; not forwarded.
        input_image: Optional path of a conditioning image; resized in place.
        width: Output width in pixels.
        height: Output height in pixels.
        guidance: Accepted for API compatibility; not forwarded.
        steps: Accepted for API compatibility; not forwarded.

    Returns:
        tuple: ``(video_path, message)``. ``video_path`` is None when nothing
        was generated. One credit is deducted (non-admin users) only after the
        video has also been uploaded successfully.
    """
    print(f"🎬 Processing: {username}")
    try:
        # Validate the dimensions once, up front. Previously a bad value was
        # re-converted inside the retry loop and exhausted every token on a
        # purely local error.
        try:
            width_px, height_px = int(width), int(height)
        except (TypeError, ValueError, OverflowError):
            return None, "Invalid width/height."
        if width_px <= 0 or height_px <= 0:
            return None, "Invalid width/height."

        db = CACHED_DB
        if username not in db["users"]:
            return None, "Relogin required."
        account = db["users"][username]
        if str(account.get("password")) != str(password):
            return None, "Auth failed."

        is_admin = (username == "C0LA21")
        if not is_admin and account.get("credits", 0) <= 0:
            return None, "No credits left."

        if input_image:
            input_image = resize_image_for_video(input_image, width_px, height_px)

        video_path = None
        last_error = ""
        # Try the tokens in random order until one of them yields a video.
        session_tokens = GENERATION_TOKENS.copy()
        random.shuffle(session_tokens)
        for attempt, current_token in enumerate(session_tokens, start=1):
            try:
                print(f"🚀 Attempt {attempt}...")
                client_gpu = Client(GPU_SPACE_URL, headers={"Authorization": f"Bearer {current_token}"})
                result = client_gpu.predict(
                    input_image=input_image, prompt=prompt, duration=4.0, enhance_prompt=True,
                    seed=random.randint(0, 999999), randomize_seed=True,
                    height=height_px, width=width_px, camera_lora="No LoRA",
                    api_name="/generate_video"
                )
                video_path = _extract_video_path(result)
                if video_path:
                    break
            except Exception as e:
                # Remember the most recent failure for the user-facing message.
                last_error = str(e)

        if not video_path:
            return None, f"Servers Busy (Quota Limit). Try later. Err: {last_error[:20]}"

        print("☁️ Uploading...")
        video_url = upload_to_catbox(video_path)
        if not video_url:
            return video_path, "Generated but Upload Failed."

        gallery_item = {
            "url": video_url,
            "prompt": (prompt or "")[:60],  # prompt may be None
            "date": str(date.today()),
            "type": "video",
        }
        # Newest first, capped at 20 entries.
        gallery = account.setdefault("gallery", [])
        gallery.insert(0, gallery_item)
        account["gallery"] = gallery[:20]
        if not is_admin:
            account["credits"] -= 1
        save_db_ram(db)
        return video_path, f"Success! Credits: {account['credits']}"
    except Exception as e:
        print(f"❌ Critical: {e}")
        return None, f"Failed: {e}"
def ping_server(username=None):
    """Liveness probe: return a fixed "alive" payload.

    Args:
        username: Accepted so the endpoint can be called with a user name;
            the value is not used.

    Returns:
        dict: ``{"status": "alive", "msg": "Soda is fizzy!"}``.
    """
    heartbeat = dict(status="alive", msg="Soda is fizzy!")
    return heartbeat
# Gradio wiring: the components exist to define the three named API endpoints
# ("auth", "generate", "ping"); the trigger buttons themselves are hidden.
with gr.Blocks() as app:
    # Inputs for the auth endpoint (user/pw are shared with generate).
    action = gr.Textbox()
    user = gr.Textbox()
    pw = gr.Textbox()
    email = gr.Textbox()

    # Inputs for the generate endpoint.
    p = gr.Textbox()
    np = gr.Textbox()
    img_in = gr.Image(type="filepath")
    w = gr.Number()
    h = gr.Number()
    g = gr.Number()
    s = gr.Number()

    # Output components.
    out_json = gr.JSON()
    out_vid = gr.Video()
    out_txt = gr.Textbox()

    btn_auth = gr.Button(visible=False)
    btn_auth.click(
        fn=auth_user,
        inputs=[action, user, pw, email],
        outputs=out_json,
        api_name="auth",
    )

    btn_gen = gr.Button(visible=False)
    btn_gen.click(
        fn=process_generation,
        inputs=[user, pw, p, np, img_in, w, h, g, s],
        outputs=[out_vid, out_txt],
        api_name="generate",
    )

    btn_ping = gr.Button(visible=False)
    btn_ping.click(
        fn=ping_server,
        inputs=[user],
        outputs=out_json,
        api_name="ping",
    )

# Queue holds at most 20 pending requests; listen on all interfaces.
app.queue(max_size=20).launch(share=False, server_name="0.0.0.0")