""" VIDEO GENERATOR WORKER - HuggingFace Space ========================================== VEO 3.1 Only - Background worker """ import os import gradio as gr import threading import time import json import traceback from concurrent.futures import ThreadPoolExecutor print(">>> [INIT] Starting Video Generator Worker (VEO 3.1 Only)...") # Worker status worker_status = { "running": False, "active_jobs": 0, "jobs_done": 0, "last_check": None, "errors": [] } def load_worker_module(): """Load video_core.py""" from huggingface_hub import hf_hub_download import shutil import importlib.util import sys # ==================== TOKEN CONFIGURATION ==================== token = os.environ.get("HF_TOKEN") if not token: raise ValueError("HF_TOKEN not set!") # Dataset repository (semua file ada di sini) repo_id = os.environ.get("REPO_ID", "malikrf22/abcx") print(f">>> [INIT] Loading from dataset: {repo_id}") # ==================== DOWNLOAD VIDEO_CORE.PY ==================== print(f">>> [INIT] Downloading video_core.py...") path_core = hf_hub_download( repo_id=repo_id, filename="video_core.py", repo_type="dataset", token=token, force_download=True ) # ==================== DOWNLOAD DATALOGIN.JSON ==================== print(f">>> [INIT] Downloading datalogin.json...") path_login = hf_hub_download( repo_id=repo_id, filename="datalogin.json", repo_type="dataset", token=token, force_download=True ) # ==================== LOAD CREDENTIALS ==================== with open(path_login, 'r') as f: login_data = json.load(f) print(f">>> [INIT] Login data keys: {list(login_data.keys())}") # Ambil data dari key utama (mmm123 atau key lainnya) main_key = os.environ.get("LOGIN_KEY", "mmm123") if main_key in login_data: credentials = login_data[main_key] else: # Fallback: ambil key pertama yang berisi dict for key, value in login_data.items(): if isinstance(value, dict): credentials = value main_key = key break else: raise ValueError("No valid credentials found in datalogin.json") print(f">>> [INIT] Using credentials from key: {main_key}") # ==================== BUILD CONFIG ==================== config = { "shared_pools": { "dreamina": { "accounts": credentials.get("dreamina_accounts", []) } }, "max_concurrent_video_jobs": int(os.environ.get("MAX_CONCURRENT_JOBS", "5")), "gemini_refresh_cookie": credentials.get("gemini_refresh_cookie", []), "seedream_accounts": credentials.get("seedream_accounts", []) } dreamina_count = len(config["shared_pools"]["dreamina"]["accounts"]) print(f">>> [INIT] ✅ Loaded {dreamina_count} Dreamina accounts") # ==================== IMPORT MODULE ==================== shutil.copy(path_core, "video_module.py") spec = importlib.util.spec_from_file_location("video_module", "video_module.py") module = importlib.util.module_from_spec(spec) sys.modules["video_module"] = module spec.loader.exec_module(module) return module, config def process_single_job(worker, job): """Process single job in separate thread""" job_id = job.get('id', 'unknown')[:8] try: success = worker.process_job(job) return (job_id, success) except Exception as e: return (job_id, False, str(e)) def worker_loop(): """Background worker loop""" global worker_status try: module, config = load_worker_module() worker = module.VideoJobWorker(config) worker_status["running"] = True print(">>> [WORKER] VEO 3.1 Worker started successfully") except Exception as e: error_msg = str(e)[:200] worker_status["errors"].append(f"Init: {error_msg}") print(f">>> [WORKER] Init failed: {e}") traceback.print_exc() return max_workers = config.get("max_concurrent_video_jobs", 5) print(f">>> [WORKER] Max concurrent: {max_workers}") with ThreadPoolExecutor(max_workers=max_workers) as executor: futures = {} while True: try: worker_status["last_check"] = time.strftime("%H:%M:%S") done_futures = [f for f in list(futures.keys()) if f.done()] for future in done_futures: job_id = futures.pop(future) try: result = future.result() if isinstance(result, tuple) and len(result) >= 2: if result[1]: worker_status["jobs_done"] += 1 print(f">>> [WORKER] ✅ Done: {job_id}") else: print(f">>> [WORKER] ❌ Failed: {job_id}") except Exception as e: print(f">>> [WORKER] ❌ Error {job_id}: {e}") slots_available = max_workers - len(futures) for _ in range(slots_available): job = worker.claim_job() if job: job_id = job.get('id', 'unknown')[:8] future = executor.submit(process_single_job, worker, job) futures[future] = job_id print(f">>> [WORKER] 🚀 Started: {job_id} (active: {len(futures)})") else: break worker_status["active_jobs"] = len(futures) if not futures: time.sleep(3) else: time.sleep(1) except Exception as e: error_msg = f"{time.strftime('%H:%M:%S')}: {str(e)[:100]}" worker_status["errors"] = worker_status["errors"][-10:] + [error_msg] print(f">>> [WORKER] Error: {e}") time.sleep(10) # Start worker worker_thread = threading.Thread(target=worker_loop, daemon=True) worker_thread.start() time.sleep(3) # ============================================================================== # GRADIO STATUS PAGE # ============================================================================== def get_status(): errors_text = "\n".join([f"- {e}" for e in worker_status["errors"][-5:]]) if worker_status["errors"] else "- None" return f""" # 🎬 Video Generator Worker (VEO 3.1) | Item | Status | |------|--------| | **Running** | {'✅ Yes' if worker_status['running'] else '❌ No'} | | **Active Jobs** | 🔄 {worker_status['active_jobs']} | | **Jobs Completed** | ✅ {worker_status['jobs_done']} | | **Last Check** | {worker_status['last_check'] or 'Never'} | ### Engine: - 🎬 **VEO 3.1** (Dreamina) - High quality video generation ### Recent Errors: {errors_text} --- *Click Refresh to update* """ with gr.Blocks(title="Video Generator Worker") as demo: gr.Markdown("# 🎬 Video Generator Worker") gr.Markdown("VEO 3.1 powered video generation") status_display = gr.Markdown(get_status()) refresh_btn = gr.Button("🔄 Refresh Status", variant="primary") refresh_btn.click(fn=get_status, inputs=None, outputs=[status_display]) gr.Markdown(""" --- ### Features: - ✅ Text-to-Video - ✅ Image-to-Video - ✅ Auto-detect aspect ratio - ✅ Multiple aspect ratios (16:9, 9:16, 1:1) - ✅ Auto-refund on failure """) if __name__ == "__main__": demo.queue().launch()