"""
IMAGE GENERATOR WORKER - HuggingFace Space
==========================================
Background worker for processing image generation jobs
"""

import os
import gradio as gr
import threading
import time
import json
import traceback
from concurrent.futures import ThreadPoolExecutor

print(">>> [INIT] Starting Image Generator Worker...")

# Worker status (written by the worker thread, read by the status page)
worker_status = {
    "running": False,
    "active_jobs": 0,
    "jobs_done": 0,
    "last_check": None,
    "errors": []
}


def load_worker_module():
    """Load image_core.py and the config file (datalogin.json) from an HF dataset repo"""
    from huggingface_hub import hf_hub_download
    import shutil
    import importlib.util
    import sys

    token = os.environ.get("KUNCI_GUDANG")
    if not token:
        raise ValueError("KUNCI_GUDANG not set!")

    repo_id = os.environ.get("REPO_ID", "malikrf22/abcx")

    # Download image_core.py
    path_core = hf_hub_download(
        repo_id=repo_id,
        filename="image_core.py",
        repo_type="dataset",
        token=token,
        force_download=True
    )

    # Download the config file (stored in the repo as datalogin.json)
    path_config = hf_hub_download(
        repo_id=repo_id,
        filename="datalogin.json",
        repo_type="dataset",
        token=token,
        force_download=True
    )

    # Copy the module next to this script
    shutil.copy(path_core, "image_module.py")

    # Import the copied file as a module
    spec = importlib.util.spec_from_file_location("image_module", "image_module.py")
    module = importlib.util.module_from_spec(spec)
    sys.modules["image_module"] = module
    spec.loader.exec_module(module)

    # Load config
    with open(path_config, 'r') as f:
        config = json.load(f)

    return module, config


def process_single_job(worker, job):
    """Process a single job in a separate thread.

    Returns (job_id, success), or (job_id, False, error_text) if the job raised.
    """
    job_id = job.get('id', 'unknown')[:8]
    try:
        success = worker.process_job(job)
        return (job_id, success)
    except Exception as e:
        return (job_id, False, str(e))


def worker_loop():
    """Background worker loop"""
    global worker_status

    try:
        module, config = load_worker_module()
        worker = module.ImageJobWorker(config)
        worker_status["running"] = True
        print(">>> [WORKER] Image Worker started successfully")
    except Exception as e:
        error_msg = str(e)[:200]
        worker_status["errors"].append(f"Init: {error_msg}")
        print(f">>> [WORKER] Init failed: {e}")
        traceback.print_exc()
        return

    # Concurrent processing (higher for images since they're faster)
    max_workers = config.get("max_concurrent_image_jobs", 15)
    print(f">>> [WORKER] Max concurrent: {max_workers}")

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {}  # maps each in-flight future to its short job id

        while True:
            try:
                worker_status["last_check"] = time.strftime("%H:%M:%S")

                # Check completed futures
                done_futures = [f for f in list(futures.keys()) if f.done()]
                for future in done_futures:
                    job_id = futures.pop(future)
                    try:
                        result = future.result()
                        if isinstance(result, tuple) and len(result) >= 2:
                            if result[1]:
                                worker_status["jobs_done"] += 1
                                print(f">>> [WORKER] ✅ Done: {job_id}")
                            else:
                                # A third element, when present, carries the exception text
                                reason = f" ({result[2]})" if len(result) > 2 else ""
                                print(f">>> [WORKER] ❌ Failed: {job_id}{reason}")
                    except Exception as e:
                        print(f">>> [WORKER] ❌ Error {job_id}: {e}")

                # Fill available slots
                slots_available = max_workers - len(futures)
                for _ in range(slots_available):
                    job = worker.claim_job()
                    if job:
                        job_id = job.get('id', 'unknown')[:8]
                        future = executor.submit(process_single_job, worker, job)
                        futures[future] = job_id
                        print(f">>> [WORKER] 🚀 Started: {job_id} (active: {len(futures)})")
                    else:
                        break

                worker_status["active_jobs"] = len(futures)

                # Poll slowly when idle, quickly while jobs are in flight
                if not futures:
                    time.sleep(2)
                else:
                    time.sleep(0.5)

            except Exception as e:
                error_msg = f"{time.strftime('%H:%M:%S')}: {str(e)[:100]}"
                # Keep only the most recent errors
                worker_status["errors"] = worker_status["errors"][-10:] + [error_msg]
                print(f">>> [WORKER] Error: {e}")
                time.sleep(10)


# Start worker in background
worker_thread = threading.Thread(target=worker_loop, daemon=True)
worker_thread.start()
time.sleep(3)


# ==============================================================================
# GRADIO STATUS PAGE
# ==============================================================================

def get_status():
    """Generate status markdown"""
    errors_text = "\n".join([f"- {e}" for e in worker_status["errors"][-5:]]) if worker_status["errors"] else "- None"

    return f"""
# 🖼️ Image Generator Worker

| Item | Status |
|------|--------|
| **Running** | {'✅ Yes' if worker_status['running'] else '❌ No'} |
| **Active Jobs** | 🔄 {worker_status['active_jobs']} |
| **Jobs Completed** | ✅ {worker_status['jobs_done']} |
| **Last Check** | {worker_status['last_check'] or 'Never'} |

### Engine:
- 🎨 **Gemini Opal** - AI Image Generation

### Recent Errors:
{errors_text}

---
*Click Refresh to update*
"""


# Gradio interface
with gr.Blocks(title="Image Generator Worker") as demo:
    gr.Markdown("# 🖼️ Image Generator Worker")
    gr.Markdown("Gemini Opal powered image generation")

    status_display = gr.Markdown(get_status())
    refresh_btn = gr.Button("🔄 Refresh Status", variant="primary")
    refresh_btn.click(fn=get_status, inputs=None, outputs=[status_display])

    gr.Markdown("""
---
### Features:
- ✅ Text-to-Image
- ✅ Image-to-Image (Edit)
- ✅ Multiple aspect ratios
- ✅ Auto-refund on failure
""")


if __name__ == "__main__":
    demo.queue().launch()