Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import json | |
| import time | |
| import threading | |
| import os | |
| import uuid | |
| from datetime import datetime, date | |
| from huggingface_hub import HfApi | |
| # --- KONFIGURASI --- | |
| # Token Write milik Space INI (Database) | |
| HF_TOKEN = "hf_" + "gcQpArWmJAlvIapWLNsJOWUvmbkkVCktgV" | |
| # ID Space ini (otomatis didapat) | |
| REPO_ID = os.getenv("HF_REPO_ID") | |
| DB_FILE = "soda_db.json" | |
| LOG_FILE = "soda_logs.json" | |
| # --- RAM STORAGE --- | |
| RAM_DB = {"users": {}} | |
| RAM_LOGS = [] | |
| # --- DATABASE ENGINE --- | |
| def load_from_storage(): | |
| global RAM_DB, RAM_LOGS | |
| print("π Booting Database Server...") | |
| if os.path.exists(DB_FILE): | |
| try: | |
| with open(DB_FILE, 'r') as f: RAM_DB = json.load(f) | |
| print("β Users Loaded") | |
| except: RAM_DB = {"users": {}} | |
| else: RAM_DB = {"users": {}} | |
| if os.path.exists(LOG_FILE): | |
| try: | |
| with open(LOG_FILE, 'r') as f: RAM_LOGS = json.load(f) | |
| print("β Logs Loaded") | |
| except: RAM_LOGS = [] | |
| else: RAM_LOGS = [] | |
| def save_to_storage_task(): | |
| while True: | |
| time.sleep(3600) # Save tiap 1 jam | |
| print("πΎ Saving Data...") | |
| try: | |
| with open(DB_FILE, 'w') as f: json.dump(RAM_DB, f, indent=2) | |
| with open(LOG_FILE, 'w') as f: json.dump(RAM_LOGS, f, indent=2) | |
| if REPO_ID: | |
| api = HfApi(token=HF_TOKEN) | |
| api.upload_file(path_or_fileobj=DB_FILE, path_in_repo=DB_FILE, repo_id=REPO_ID, repo_type="space") | |
| api.upload_file(path_or_fileobj=LOG_FILE, path_in_repo=LOG_FILE, repo_id=REPO_ID, repo_type="space") | |
| print("β Cloud Sync OK") | |
| except Exception as e: print(f"β Save Failed: {e}") | |
| load_from_storage() | |
| threading.Thread(target=save_to_storage_task, daemon=True).start() | |
| # --- API ENDPOINTS --- | |
| def api_auth(action, username, password, email): | |
| try: | |
| today_str = str(date.today()) | |
| is_admin = (username == "C0LA21") | |
| if action == "signup": | |
| if username in RAM_DB["users"]: return {"status": "error", "msg": "Username Taken"} | |
| for u, d in RAM_DB["users"].items(): | |
| if d.get("email") == email: return {"status": "error", "msg": "Email Used"} | |
| start_credits = 999999 if is_admin else 5 | |
| RAM_DB["users"][username] = { | |
| "password": password, | |
| "email": email, | |
| "credits": start_credits, | |
| "last_restock": today_str, | |
| "gallery": [] | |
| } | |
| RAM_LOGS.insert(0, {"time": str(datetime.now()), "type": "SIGNUP", "user": username}) | |
| return {"status": "success", "msg": "Created", "credits": start_credits} | |
| elif action == "login": | |
| if username not in RAM_DB["users"]: return {"status": "error", "msg": "User not found"} | |
| user_data = RAM_DB["users"][username] | |
| if str(user_data.get("password")) != str(password): return {"status": "error", "msg": "Wrong pass"} | |
| if user_data.get("last_restock") != today_str: | |
| user_data["credits"] = 999999 if is_admin else 5 | |
| user_data["last_restock"] = today_str | |
| if is_admin: user_data["credits"] = 999999 | |
| # Return gallery untuk frontend | |
| user_gallery = user_data.get("gallery", []) | |
| return {"status": "success", "msg": "OK", "credits": user_data["credits"], "gallery": user_gallery} | |
| return {"status": "error", "msg": "Invalid Action"} | |
| except Exception as e: return {"status": "error", "msg": str(e)} | |
| def api_deduct(username, password): | |
| if username not in RAM_DB["users"]: return {"allow": False, "msg": "Relogin"} | |
| user_data = RAM_DB["users"][username] | |
| if str(user_data.get("password")) != str(password): return {"allow": False, "msg": "Auth Fail"} | |
| if username == "C0LA21": return {"allow": True, "credits": 999999} | |
| if user_data["credits"] > 0: | |
| user_data["credits"] -= 1 | |
| return {"allow": True, "credits": user_data["credits"]} | |
| else: | |
| return {"allow": False, "msg": "No Credits"} | |
| def api_save_video(video_path, username, prompt): | |
| """ | |
| ENDPOINT VITAL: Menerima file dari Server Utama, Upload ke Repo DB, Simpan ke History. | |
| """ | |
| try: | |
| if not REPO_ID: return {"status": "error", "msg": "DB Repo ID Missing"} | |
| # 1. Upload Video ke Repo Space INI (Database Space) | |
| api = HfApi(token=HF_TOKEN) | |
| filename = f"{datetime.now().strftime('%Y%m%d')}_{str(uuid.uuid4())[:8]}.mp4" | |
| path_in_repo = f"videos/{username}/{filename}" | |
| print(f"βοΈ Storing Video from Main Server: {path_in_repo}") | |
| api.upload_file( | |
| path_or_fileobj=video_path, | |
| path_in_repo=path_in_repo, | |
| repo_id=REPO_ID, | |
| repo_type="space" | |
| ) | |
| # 2. Buat Public URL | |
| public_url = f"https://huggingface.co/spaces/{REPO_ID}/resolve/main/{path_in_repo}" | |
| # 3. Simpan ke Gallery User di RAM | |
| if username in RAM_DB["users"]: | |
| gallery_item = { | |
| "url": public_url, | |
| "prompt": prompt[:50] + "...", | |
| "date": str(date.today()), | |
| "type": "video" | |
| } | |
| if "gallery" not in RAM_DB["users"][username]: RAM_DB["users"][username]["gallery"] = [] | |
| RAM_DB["users"][username]["gallery"].insert(0, gallery_item) | |
| RAM_DB["users"][username]["gallery"] = RAM_DB["users"][username]["gallery"][:15] | |
| # 4. Catat Log | |
| entry = { | |
| "time": str(datetime.now()), | |
| "type": "GENERATE", | |
| "user": username, | |
| "details": {"url": public_url} | |
| } | |
| RAM_LOGS.insert(0, entry) | |
| if len(RAM_LOGS) > 200: RAM_LOGS.pop() | |
| return {"status": "success", "url": public_url} | |
| except Exception as e: | |
| print(f"β Save Video Error: {e}") | |
| return {"status": "error", "msg": str(e)} | |
| def api_get_logs(): | |
| return RAM_LOGS[:100] | |
| # --- UI (API ONLY) --- | |
| with gr.Blocks() as app: | |
| gr.Markdown("### SODA DATABASE & STORAGE SERVER") | |
| # Inputs | |
| action = gr.Textbox(label="Action") | |
| user = gr.Textbox(label="User") | |
| pw = gr.Textbox(label="Pass") | |
| email = gr.Textbox(label="Email") | |
| # Inputs for Video Save (Terima File) | |
| vid_file = gr.Video(label="Video File") | |
| prompt_txt = gr.Textbox(label="Prompt") | |
| out_json = gr.JSON() | |
| # API Routes | |
| btn_auth = gr.Button("Auth") | |
| btn_auth.click(api_auth, [action, user, pw, email], out_json, api_name="auth") | |
| btn_deduct = gr.Button("Deduct") | |
| btn_deduct.click(api_deduct, [user, pw], out_json, api_name="deduct") | |
| # ENDPOINT KHUSUS: Main Server mengirim file ke sini | |
| btn_save = gr.Button("Save Video") | |
| btn_save.click(api_save_video, [vid_file, user, prompt_txt], out_json, api_name="save_video") | |
| btn_get_logs = gr.Button("Get Logs") | |
| btn_get_logs.click(api_get_logs, None, out_json, api_name="get_logs") | |
| app.queue().launch(server_name="0.0.0.0") |