T2v-backend / app.py
Bl4ckSpaces's picture
Update app.py
66f9d4b verified
import gradio as gr
from gradio_client import Client
import requests
import json
import random
import time
import threading
import os
import re
import dns.resolver # Library DNS untuk cek domain asli
from datetime import datetime, date
from huggingface_hub import HfApi, hf_hub_download
from PIL import Image
# --- 1. CONFIGURATION ---
# DATASET DATABASE
DATASET_REPO_ID = "Bl4ckSpaces/soda-data"
DATABASE_FILENAME = "soda_users.json"
# TOKEN
STORAGE_TOKEN = "hf_" + "gcQpArWmJAlvIapWLNsJOWUvmbkkVCktgV"
GPU_SPACE_URL = "https://alexnasa-ltx-2-turbo.hf.space"
# GENERATION TOKENS (27 TOKENS)
GENERATION_TOKENS = [
"hf_" + "PiRCDDtPcPFMLWkTkVaZmzoleHOunXnLIA", "hf_" + "BHvZXGICstaktSwycmwNmzHGrTNmKxnlRZ",
"hf_" + "ZdgawyTPzXIpwhnRYIteUKSMsWnEDtGKtM", "hf_" + "nMiFYAFsINxAJWPwiCQlaunmdgmrcxKoaT",
"hf_" + "PccpUIbTckCiafwErDLkRlsvqhgtfZaBHL", "hf_" + "faGyXBPfBkaHXDMUSJtxEggonhhZbomFIz",
"hf_" + "SndsPaRWsevDXCgZcSjTUlBYUJqOkSfFmn", "hf_" + "CqobFdUpeVCeuhUaiuXwvdczBUmoUHXRGa",
"hf_" + "JKCQYUhhHPPkpucegqkNSyureLdXpmeXRF", "hf_" + "tBYfslUwHNiNMufzwAYIlrDVovEWmOQulC",
"hf_" + "LKLdrdUxyUyKODSUthmqHXqDMfHrQueera", "hf_" + "ivSBboJYQVcifWkCNcOTOnxUQrZOtOglnU",
"hf_" + "jiSbBMUmAniRpJOmVIlczuqpRjwSeuizLk", "hf_" + "VcXaKQLEawBWZbNrBOSLTjrVtTuSvobhLL",
"hf_" + "ZrlTPvhDmYqZGGFuIqDDCrQRcWRhYcuyOI", "hf_" + "FCambosUqUQJrThbIveHglnvjoNpOGWBsW",
"hf_" + "kUyoiWTbZlNfSrdTNaVINuwlNTQseFCfZB", "hf_" + "WGarKlgPBzpJeKxpqirFgnKKAtOFBFomSe",
"hf_" + "IZwzmRBCALYfvYtmtvTWsIQYvHuRGUiGyr",
"hf_" + "NtijfwwAPQRknELkhIWjMQQUUqzgwhIjeu", "hf_" + "obVKYRMqECBoLsBWOKyfVWtHlugAhhuaIH",
"hf_" + "EsDAvVqRZCbigQrpDFNinlVeijagnAjETW", "hf_" + "yuMifxRJoXWKPRGgYFrXHXTGdoKBuCZCUU",
"hf_" + "YthKrdEtrmyDbBteZcGzNeoDqGAxzeEinv", "hf_" + "JgNjfcunLsOBcZIaOYcFqgcZIZWjbnocJn",
"hf_" + "cINBgwvihyKiTpxwDTXjnHTnlQivLCluGJ", "hf_" + "jnciPeeWUwQbHNITBRtOgPjnqWkgAqZDhq"
]
CACHED_DB = {"users": {}}
# --- 2. ADVANCED SECURITY (MULTI-LAYER EMAIL CHECK) ---
def check_dns_mx(domain):
"""Cek apakah domain punya server email beneran (MX Record)"""
try:
dns.resolver.resolve(domain, 'MX')
return True
except:
return False
def validate_email_multilayer(email):
"""
Sistem Validasi Berlapis:
1. Cek Format & Blacklist Domain
2. Cek API 1 (MailCheck)
3. Cek API 2 (Debounce)
4. Cek API 3 (Eva)
5. Fallback: Cek DNS MX Record Manual
"""
print(f"🛡️ Security Scan: Checking {email}...")
# LAYER 1: Basic Regex & Common Blocklist
if not re.match(r"[^@]+@[^@]+\.[^@]+", email):
return False, "Invalid format."
domain = email.split('@')[1].lower()
blocked_domains = ["tempmail.com", "10minutemail.com", "yopmail.com", "throwawaymail.com"]
if domain in blocked_domains:
return False, "Disposable/Temp mail detected."
# LAYER 2: Multi-API Check (Round Robin)
# Kita coba satu per satu. Jika satu gagal/limit, lanjut ke berikutnya.
# API 1: MailCheck.ai (Cek Disposable)
try:
r = requests.get(f"https://api.mailcheck.ai/domain/{domain}", timeout=3)
if r.status_code == 200:
data = r.json()
if data.get('disposable'): return False, "Disposable domain detected (L1)."
except:
print("⚠️ API 1 (MailCheck) busy/failed, switching to backup...")
# API 2: Debounce.io (Cek Disposable)
try:
r = requests.get(f"https://disposable.debounce.io/?email={email}", timeout=3)
if r.status_code == 200:
data = r.json()
if data.get('disposable') == "true": return False, "Disposable domain detected (L2)."
except:
print("⚠️ API 2 (Debounce) busy/failed, switching to backup...")
# API 3: Eva PingUtil (Validasi Format & Domain)
try:
r = requests.get(f"https://api.eva.pingutil.com/email?email={email}", timeout=3)
if r.status_code == 200:
data = r.json()
if data.get('data', {}).get('disposable'): return False, "Disposable domain detected (L3)."
if not data.get('data', {}).get('deliverable'):
# Note: Deliverable false bisa berarti inbox penuh, tapi kita strict aja
# Kecuali Gmail/Yahoo kadang false positive, jadi kita skip block deliverable utk domain besar
if "gmail" not in domain and "yahoo" not in domain:
return False, "Email address looks undeliverable."
except:
print("⚠️ API 3 (Eva) busy/failed, switching to internal DNS...")
# LAYER 3: THE ULTIMATE FALLBACK (Internal DNS Check)
# Jika semua API di atas mati/limit, server kita cek sendiri ke internet.
if not check_dns_mx(domain):
return False, "Domain does not have a valid mail server."
print("✅ Security Scan Passed.")
return True, "Valid"
# --- 3. DATABASE SYSTEM ---
def load_db():
global CACHED_DB
print("🔄 Downloading DB...")
try:
path = hf_hub_download(repo_id=DATASET_REPO_ID, filename=DATABASE_FILENAME, repo_type="dataset", token=STORAGE_TOKEN)
with open(path, 'r') as f: CACHED_DB = json.load(f)
print("✅ DB Loaded")
except: CACHED_DB = {"users": {}}
def sync_db_background():
def task():
try:
with open(DATABASE_FILENAME, 'w') as f: json.dump(CACHED_DB, f, indent=2)
api = HfApi(token=STORAGE_TOKEN)
api.upload_file(path_or_fileobj=DATABASE_FILENAME, path_in_repo=DATABASE_FILENAME, repo_id=DATASET_REPO_ID, repo_type="dataset")
print("💾 Cloud Sync OK")
except: pass
threading.Thread(target=task).start()
def save_db_ram(data):
global CACHED_DB
CACHED_DB = data
sync_db_background()
return True
load_db()
# --- 4. HELPER UPLOAD ---
def upload_to_catbox(file_path):
try:
url = "https://catbox.moe/user/api.php"
data = {"reqtype": "fileupload"}
with open(file_path, "rb") as f:
files = {"fileToUpload": f}
response = requests.post(url, data=data, files=files, timeout=45)
if response.status_code == 200: return response.text
return None
except: return None
def resize_image_for_video(image_path, width, height):
try:
print(f"🖼️ Resizing image to {width}x{height}...")
img = Image.open(image_path)
img = img.resize((int(width), int(height)), Image.LANCZOS)
img.save(image_path)
return image_path
except: return image_path
# --- 5. SERVER LOGIC ---
def auth_user(action, username, password, email):
try:
db = CACHED_DB
today_str = str(date.today())
if not username or not password: return {"status": "error", "msg": "Input required."}
is_admin = (username == "C0LA21")
if action == "signup":
if not email: return {"status": "error", "msg": "Email required."}
# --- SECURITY CHECK ---
is_valid, msg = validate_email_multilayer(email)
if not is_valid:
return {"status": "error", "msg": f"Security Block: {msg}"}
# ----------------------
if username in db["users"]: return {"status": "error", "msg": "Username taken."}
for user, data in db["users"].items():
if data.get("email") == email: return {"status": "error", "msg": "Email registered."}
start_credits = 999999 if is_admin else 5
db["users"][username] = {
"password": password, "email": email,
"credits": start_credits, "last_restock": today_str, "gallery": []
}
save_db_ram(db)
return {"status": "success", "msg": "Account Created!", "credits": start_credits}
elif action == "login":
if username not in db["users"]: return {"status": "error", "msg": "User not found."}
user_data = db["users"][username]
if str(user_data.get("password")) != str(password): return {"status": "error", "msg": "Wrong password."}
if user_data.get("last_restock") != today_str:
user_data["credits"] = 999999 if is_admin else 5
user_data["last_restock"] = today_str
save_db_ram(db)
if is_admin: user_data["credits"] = 999999
return {"status": "success", "msg": "Login OK.", "credits": user_data["credits"], "gallery": user_data.get("gallery", [])}
return {"status": "error", "msg": "Error."}
except: return {"status": "error", "msg": "Server Error."}
def process_generation(username, password, prompt, neg_prompt, input_image, width, height, guidance, steps):
print(f"🎬 Processing: {username}")
try:
db = CACHED_DB
if username not in db["users"]: return None, "Relogin required."
if str(db["users"][username].get("password")) != str(password): return None, "Auth failed."
is_admin = (username == "C0LA21")
if not is_admin and db["users"][username]["credits"] <= 0: return None, "No credits left."
if input_image:
input_image = resize_image_for_video(input_image, width, height)
video_path = None
last_error = ""
session_tokens = GENERATION_TOKENS.copy()
random.shuffle(session_tokens)
for i, current_token in enumerate(session_tokens):
try:
print(f"🚀 Attempt {i+1}...")
client_gpu = Client(GPU_SPACE_URL, headers={"Authorization": f"Bearer {current_token}"})
result = client_gpu.predict(
input_image=input_image, prompt=prompt, duration=4.0, enhance_prompt=True,
seed=random.randint(0, 999999), randomize_seed=True,
height=int(height), width=int(width), camera_lora="No LoRA",
api_name="/generate_video"
)
if isinstance(result, (list, tuple)): video_path = result[0]
else: video_path = result
if video_path: break
except Exception as e:
last_error = str(e)
continue
if not video_path:
return None, f"Servers Busy (Quota Limit). Try later. Err: {last_error[:20]}"
print("☁️ Uploading...")
video_url = upload_to_catbox(video_path)
if video_url:
gallery_item = {"url": video_url, "prompt": prompt[:60], "date": str(date.today()), "type": "video"}
if "gallery" not in db["users"][username]: db["users"][username]["gallery"] = []
db["users"][username]["gallery"].insert(0, gallery_item)
db["users"][username]["gallery"] = db["users"][username]["gallery"][:20]
if not is_admin: db["users"][username]["credits"] -= 1
save_db_ram(db)
return video_path, f"Success! Credits: {db['users'][username]['credits']}"
else:
return video_path, "Generated but Upload Failed."
except Exception as e:
print(f"❌ Critical: {e}")
return None, f"Failed: {e}"
def ping_server(username=None):
return {"status": "alive", "msg": "Soda is fizzy!"}
with gr.Blocks() as app:
action = gr.Textbox(); user = gr.Textbox(); pw = gr.Textbox(); email = gr.Textbox()
p = gr.Textbox(); np = gr.Textbox(); img_in = gr.Image(type="filepath")
w = gr.Number(); h = gr.Number(); g = gr.Number(); s = gr.Number()
out_json = gr.JSON()
out_vid = gr.Video()
out_txt = gr.Textbox()
btn_auth = gr.Button(visible=False)
btn_auth.click(auth_user, [action, user, pw, email], out_json, api_name="auth")
btn_gen = gr.Button(visible=False)
btn_gen.click(process_generation, [user, pw, p, np, img_in, w, h, g, s], [out_vid, out_txt], api_name="generate")
btn_ping = gr.Button(visible=False)
btn_ping.click(ping_server, [user], out_json, api_name="ping")
app.queue(max_size=20).launch(share=False, server_name="0.0.0.0")