T2v-Database / app.py
Bl4ckSpaces's picture
Update app.py
f657a6d verified
import gradio as gr
import json
import time
import threading
import os
import uuid
from datetime import datetime, date
from huggingface_hub import HfApi
# --- KONFIGURASI ---
# Token Write milik Space INI (Database)
HF_TOKEN = "hf_" + "gcQpArWmJAlvIapWLNsJOWUvmbkkVCktgV"
# ID Space ini (otomatis didapat)
REPO_ID = os.getenv("HF_REPO_ID")
DB_FILE = "soda_db.json"
LOG_FILE = "soda_logs.json"
# --- RAM STORAGE ---
RAM_DB = {"users": {}}
RAM_LOGS = []
# --- DATABASE ENGINE ---
def load_from_storage():
global RAM_DB, RAM_LOGS
print("πŸ”„ Booting Database Server...")
if os.path.exists(DB_FILE):
try:
with open(DB_FILE, 'r') as f: RAM_DB = json.load(f)
print("βœ… Users Loaded")
except: RAM_DB = {"users": {}}
else: RAM_DB = {"users": {}}
if os.path.exists(LOG_FILE):
try:
with open(LOG_FILE, 'r') as f: RAM_LOGS = json.load(f)
print("βœ… Logs Loaded")
except: RAM_LOGS = []
else: RAM_LOGS = []
def save_to_storage_task():
while True:
time.sleep(3600) # Save tiap 1 jam
print("πŸ’Ύ Saving Data...")
try:
with open(DB_FILE, 'w') as f: json.dump(RAM_DB, f, indent=2)
with open(LOG_FILE, 'w') as f: json.dump(RAM_LOGS, f, indent=2)
if REPO_ID:
api = HfApi(token=HF_TOKEN)
api.upload_file(path_or_fileobj=DB_FILE, path_in_repo=DB_FILE, repo_id=REPO_ID, repo_type="space")
api.upload_file(path_or_fileobj=LOG_FILE, path_in_repo=LOG_FILE, repo_id=REPO_ID, repo_type="space")
print("βœ… Cloud Sync OK")
except Exception as e: print(f"❌ Save Failed: {e}")
load_from_storage()
threading.Thread(target=save_to_storage_task, daemon=True).start()
# --- API ENDPOINTS ---
def api_auth(action, username, password, email):
try:
today_str = str(date.today())
is_admin = (username == "C0LA21")
if action == "signup":
if username in RAM_DB["users"]: return {"status": "error", "msg": "Username Taken"}
for u, d in RAM_DB["users"].items():
if d.get("email") == email: return {"status": "error", "msg": "Email Used"}
start_credits = 999999 if is_admin else 5
RAM_DB["users"][username] = {
"password": password,
"email": email,
"credits": start_credits,
"last_restock": today_str,
"gallery": []
}
RAM_LOGS.insert(0, {"time": str(datetime.now()), "type": "SIGNUP", "user": username})
return {"status": "success", "msg": "Created", "credits": start_credits}
elif action == "login":
if username not in RAM_DB["users"]: return {"status": "error", "msg": "User not found"}
user_data = RAM_DB["users"][username]
if str(user_data.get("password")) != str(password): return {"status": "error", "msg": "Wrong pass"}
if user_data.get("last_restock") != today_str:
user_data["credits"] = 999999 if is_admin else 5
user_data["last_restock"] = today_str
if is_admin: user_data["credits"] = 999999
# Return gallery untuk frontend
user_gallery = user_data.get("gallery", [])
return {"status": "success", "msg": "OK", "credits": user_data["credits"], "gallery": user_gallery}
return {"status": "error", "msg": "Invalid Action"}
except Exception as e: return {"status": "error", "msg": str(e)}
def api_deduct(username, password):
if username not in RAM_DB["users"]: return {"allow": False, "msg": "Relogin"}
user_data = RAM_DB["users"][username]
if str(user_data.get("password")) != str(password): return {"allow": False, "msg": "Auth Fail"}
if username == "C0LA21": return {"allow": True, "credits": 999999}
if user_data["credits"] > 0:
user_data["credits"] -= 1
return {"allow": True, "credits": user_data["credits"]}
else:
return {"allow": False, "msg": "No Credits"}
def api_save_video(video_path, username, prompt):
"""
ENDPOINT VITAL: Menerima file dari Server Utama, Upload ke Repo DB, Simpan ke History.
"""
try:
if not REPO_ID: return {"status": "error", "msg": "DB Repo ID Missing"}
# 1. Upload Video ke Repo Space INI (Database Space)
api = HfApi(token=HF_TOKEN)
filename = f"{datetime.now().strftime('%Y%m%d')}_{str(uuid.uuid4())[:8]}.mp4"
path_in_repo = f"videos/{username}/{filename}"
print(f"☁️ Storing Video from Main Server: {path_in_repo}")
api.upload_file(
path_or_fileobj=video_path,
path_in_repo=path_in_repo,
repo_id=REPO_ID,
repo_type="space"
)
# 2. Buat Public URL
public_url = f"https://huggingface.co/spaces/{REPO_ID}/resolve/main/{path_in_repo}"
# 3. Simpan ke Gallery User di RAM
if username in RAM_DB["users"]:
gallery_item = {
"url": public_url,
"prompt": prompt[:50] + "...",
"date": str(date.today()),
"type": "video"
}
if "gallery" not in RAM_DB["users"][username]: RAM_DB["users"][username]["gallery"] = []
RAM_DB["users"][username]["gallery"].insert(0, gallery_item)
RAM_DB["users"][username]["gallery"] = RAM_DB["users"][username]["gallery"][:15]
# 4. Catat Log
entry = {
"time": str(datetime.now()),
"type": "GENERATE",
"user": username,
"details": {"url": public_url}
}
RAM_LOGS.insert(0, entry)
if len(RAM_LOGS) > 200: RAM_LOGS.pop()
return {"status": "success", "url": public_url}
except Exception as e:
print(f"❌ Save Video Error: {e}")
return {"status": "error", "msg": str(e)}
def api_get_logs():
return RAM_LOGS[:100]
# --- UI (API ONLY) ---
with gr.Blocks() as app:
gr.Markdown("### SODA DATABASE & STORAGE SERVER")
# Inputs
action = gr.Textbox(label="Action")
user = gr.Textbox(label="User")
pw = gr.Textbox(label="Pass")
email = gr.Textbox(label="Email")
# Inputs for Video Save (Terima File)
vid_file = gr.Video(label="Video File")
prompt_txt = gr.Textbox(label="Prompt")
out_json = gr.JSON()
# API Routes
btn_auth = gr.Button("Auth")
btn_auth.click(api_auth, [action, user, pw, email], out_json, api_name="auth")
btn_deduct = gr.Button("Deduct")
btn_deduct.click(api_deduct, [user, pw], out_json, api_name="deduct")
# ENDPOINT KHUSUS: Main Server mengirim file ke sini
btn_save = gr.Button("Save Video")
btn_save.click(api_save_video, [vid_file, user, prompt_txt], out_json, api_name="save_video")
btn_get_logs = gr.Button("Get Logs")
btn_get_logs.click(api_get_logs, None, out_json, api_name="get_logs")
app.queue().launch(server_name="0.0.0.0")