import gradio as gr import json import time import threading import os import uuid from datetime import datetime, date from huggingface_hub import HfApi # --- KONFIGURASI --- # Token Write milik Space INI (Database) HF_TOKEN = "hf_" + "gcQpArWmJAlvIapWLNsJOWUvmbkkVCktgV" # ID Space ini (otomatis didapat) REPO_ID = os.getenv("HF_REPO_ID") DB_FILE = "soda_db.json" LOG_FILE = "soda_logs.json" # --- RAM STORAGE --- RAM_DB = {"users": {}} RAM_LOGS = [] # --- DATABASE ENGINE --- def load_from_storage(): global RAM_DB, RAM_LOGS print("🔄 Booting Database Server...") if os.path.exists(DB_FILE): try: with open(DB_FILE, 'r') as f: RAM_DB = json.load(f) print("✅ Users Loaded") except: RAM_DB = {"users": {}} else: RAM_DB = {"users": {}} if os.path.exists(LOG_FILE): try: with open(LOG_FILE, 'r') as f: RAM_LOGS = json.load(f) print("✅ Logs Loaded") except: RAM_LOGS = [] else: RAM_LOGS = [] def save_to_storage_task(): while True: time.sleep(3600) # Save tiap 1 jam print("💾 Saving Data...") try: with open(DB_FILE, 'w') as f: json.dump(RAM_DB, f, indent=2) with open(LOG_FILE, 'w') as f: json.dump(RAM_LOGS, f, indent=2) if REPO_ID: api = HfApi(token=HF_TOKEN) api.upload_file(path_or_fileobj=DB_FILE, path_in_repo=DB_FILE, repo_id=REPO_ID, repo_type="space") api.upload_file(path_or_fileobj=LOG_FILE, path_in_repo=LOG_FILE, repo_id=REPO_ID, repo_type="space") print("✅ Cloud Sync OK") except Exception as e: print(f"❌ Save Failed: {e}") load_from_storage() threading.Thread(target=save_to_storage_task, daemon=True).start() # --- API ENDPOINTS --- def api_auth(action, username, password, email): try: today_str = str(date.today()) is_admin = (username == "C0LA21") if action == "signup": if username in RAM_DB["users"]: return {"status": "error", "msg": "Username Taken"} for u, d in RAM_DB["users"].items(): if d.get("email") == email: return {"status": "error", "msg": "Email Used"} start_credits = 999999 if is_admin else 5 RAM_DB["users"][username] = { "password": password, "email": email, "credits": start_credits, "last_restock": today_str, "gallery": [] } RAM_LOGS.insert(0, {"time": str(datetime.now()), "type": "SIGNUP", "user": username}) return {"status": "success", "msg": "Created", "credits": start_credits} elif action == "login": if username not in RAM_DB["users"]: return {"status": "error", "msg": "User not found"} user_data = RAM_DB["users"][username] if str(user_data.get("password")) != str(password): return {"status": "error", "msg": "Wrong pass"} if user_data.get("last_restock") != today_str: user_data["credits"] = 999999 if is_admin else 5 user_data["last_restock"] = today_str if is_admin: user_data["credits"] = 999999 # Return gallery untuk frontend user_gallery = user_data.get("gallery", []) return {"status": "success", "msg": "OK", "credits": user_data["credits"], "gallery": user_gallery} return {"status": "error", "msg": "Invalid Action"} except Exception as e: return {"status": "error", "msg": str(e)} def api_deduct(username, password): if username not in RAM_DB["users"]: return {"allow": False, "msg": "Relogin"} user_data = RAM_DB["users"][username] if str(user_data.get("password")) != str(password): return {"allow": False, "msg": "Auth Fail"} if username == "C0LA21": return {"allow": True, "credits": 999999} if user_data["credits"] > 0: user_data["credits"] -= 1 return {"allow": True, "credits": user_data["credits"]} else: return {"allow": False, "msg": "No Credits"} def api_save_video(video_path, username, prompt): """ ENDPOINT VITAL: Menerima file dari Server Utama, Upload ke Repo DB, Simpan ke History. """ try: if not REPO_ID: return {"status": "error", "msg": "DB Repo ID Missing"} # 1. Upload Video ke Repo Space INI (Database Space) api = HfApi(token=HF_TOKEN) filename = f"{datetime.now().strftime('%Y%m%d')}_{str(uuid.uuid4())[:8]}.mp4" path_in_repo = f"videos/{username}/{filename}" print(f"☁️ Storing Video from Main Server: {path_in_repo}") api.upload_file( path_or_fileobj=video_path, path_in_repo=path_in_repo, repo_id=REPO_ID, repo_type="space" ) # 2. Buat Public URL public_url = f"https://huggingface.co/spaces/{REPO_ID}/resolve/main/{path_in_repo}" # 3. Simpan ke Gallery User di RAM if username in RAM_DB["users"]: gallery_item = { "url": public_url, "prompt": prompt[:50] + "...", "date": str(date.today()), "type": "video" } if "gallery" not in RAM_DB["users"][username]: RAM_DB["users"][username]["gallery"] = [] RAM_DB["users"][username]["gallery"].insert(0, gallery_item) RAM_DB["users"][username]["gallery"] = RAM_DB["users"][username]["gallery"][:15] # 4. Catat Log entry = { "time": str(datetime.now()), "type": "GENERATE", "user": username, "details": {"url": public_url} } RAM_LOGS.insert(0, entry) if len(RAM_LOGS) > 200: RAM_LOGS.pop() return {"status": "success", "url": public_url} except Exception as e: print(f"❌ Save Video Error: {e}") return {"status": "error", "msg": str(e)} def api_get_logs(): return RAM_LOGS[:100] # --- UI (API ONLY) --- with gr.Blocks() as app: gr.Markdown("### SODA DATABASE & STORAGE SERVER") # Inputs action = gr.Textbox(label="Action") user = gr.Textbox(label="User") pw = gr.Textbox(label="Pass") email = gr.Textbox(label="Email") # Inputs for Video Save (Terima File) vid_file = gr.Video(label="Video File") prompt_txt = gr.Textbox(label="Prompt") out_json = gr.JSON() # API Routes btn_auth = gr.Button("Auth") btn_auth.click(api_auth, [action, user, pw, email], out_json, api_name="auth") btn_deduct = gr.Button("Deduct") btn_deduct.click(api_deduct, [user, pw], out_json, api_name="deduct") # ENDPOINT KHUSUS: Main Server mengirim file ke sini btn_save = gr.Button("Save Video") btn_save.click(api_save_video, [vid_file, user, prompt_txt], out_json, api_name="save_video") btn_get_logs = gr.Button("Get Logs") btn_get_logs.click(api_get_logs, None, out_json, api_name="get_logs") app.queue().launch(server_name="0.0.0.0")