| """ |
| IMAGE GENERATOR WORKER - HuggingFace Space |
| ========================================== |
| Background worker for processing image generation jobs |
| """ |
|
|
| import os |
| import gradio as gr |
| import threading |
| import time |
| import json |
| import traceback |
| from concurrent.futures import ThreadPoolExecutor |
|
|
# Startup banner, emitted once when the module is loaded.
print(">>> [INIT] Starting Image Generator Worker...")


# Shared status record: mutated by worker_loop() and rendered by get_status().
worker_status = dict(
    running=False,      # set to True once the worker has initialised
    active_jobs=0,      # number of jobs currently in flight
    jobs_done=0,        # count of jobs that finished successfully
    last_check=None,    # "HH:MM:SS" of the latest loop iteration, None before the first
    errors=[],          # recent error messages shown in the status panel
)
|
|
def load_worker_module():
    """Download the worker code and its config from a HF dataset repo and load both.

    The access token is read from the ``KUNCI_GUDANG`` environment variable and
    the dataset repo id from ``REPO_ID`` (default ``malikrf22/abcx``).
    ``image_core.py`` and ``datalogin.json`` are fetched with
    ``force_download=True``; the code file is copied to ``image_module.py`` in
    the current working directory and imported under the name ``image_module``.

    Returns:
        tuple: ``(module, config)`` - the imported module object and the
        parsed content of ``datalogin.json``.

    Raises:
        ValueError: if ``KUNCI_GUDANG`` is not set.
        ImportError: if no import spec/loader can be built for the copied file.
        Exception: download, module-execution and JSON errors propagate as raised.
    """
    # Imported lazily so the heavy/optional dependency is only needed by the worker.
    from huggingface_hub import hf_hub_download
    import shutil
    import importlib.util
    import sys

    token = os.environ.get("KUNCI_GUDANG")
    if not token:
        raise ValueError("KUNCI_GUDANG not set!")

    repo_id = os.environ.get("REPO_ID", "malikrf22/abcx")

    def fetch(filename):
        """Download one file from the dataset repo and return its local path."""
        return hf_hub_download(
            repo_id=repo_id,
            filename=filename,
            repo_type="dataset",
            token=token,
            force_download=True,
        )

    path_core = fetch("image_core.py")
    path_config = fetch("datalogin.json")

    # Work on a local copy with a stable name rather than the hub cache path.
    shutil.copy(path_core, "image_module.py")

    spec = importlib.util.spec_from_file_location("image_module", "image_module.py")
    if spec is None or spec.loader is None:
        raise ImportError("Cannot build import spec for image_module.py")

    module = importlib.util.module_from_spec(spec)
    # Register before executing so imports of "image_module" made while the
    # module body runs resolve to this object.
    sys.modules["image_module"] = module
    try:
        spec.loader.exec_module(module)
    except BaseException:
        # Do not leave a half-initialised module behind for later imports.
        sys.modules.pop("image_module", None)
        raise

    # JSON text is UTF-8; do not depend on the platform's default encoding.
    with open(path_config, 'r', encoding="utf-8") as f:
        config = json.load(f)

    return module, config
|
|
def process_single_job(worker, job):
    """Run one job through ``worker.process_job`` (intended for a pool thread).

    Args:
        worker: object exposing ``process_job(job)``.
        job: mapping describing the job; its ``'id'`` entry is used for the
            short id included in the result.

    Returns:
        tuple: ``(job_id, success)`` where ``success`` is whatever
        ``worker.process_job`` returned, or ``(job_id, False, message)`` when
        ``worker.process_job`` raised. ``job_id`` is at most 8 characters long.
    """
    # Ids may be non-string (e.g. an int) or None; slicing those directly
    # would raise before the job is even attempted.
    raw_id = job.get('id')
    job_id = 'unknown' if raw_id is None else str(raw_id)[:8]
    try:
        success = worker.process_job(job)
        return (job_id, success)
    except Exception as e:
        return (job_id, False, str(e))
|
|
def _short_job_id(job):
    """Return up to 8 leading characters of the job id for log lines."""
    raw_id = job.get('id')
    return 'unknown' if raw_id is None else str(raw_id)[:8]


def _resolve_max_workers(config, default=15):
    """Read ``max_concurrent_image_jobs`` from config, falling back to *default* if unusable."""
    raw = config.get("max_concurrent_image_jobs", default)
    try:
        value = int(raw)
    except (TypeError, ValueError):
        value = 0
    if value < 1:
        # ThreadPoolExecutor raises ValueError for max_workers <= 0, which
        # would kill the worker thread outside of any error handler.
        print(f">>> [WORKER] Invalid max_concurrent_image_jobs={raw!r}, using {default}")
        return default
    return value


def _reap_finished_jobs(futures):
    """Remove completed futures from *futures* and log/count their outcome."""
    # Snapshot first: the dict is mutated while the finished entries are handled.
    for future in [f for f in futures if f.done()]:
        job_id = futures.pop(future)
        try:
            result = future.result()
            if isinstance(result, tuple) and len(result) >= 2:
                if result[1]:
                    worker_status["jobs_done"] += 1
                    print(f">>> [WORKER] ✅ Done: {job_id}")
                else:
                    # A third element, when present, carries the error text.
                    detail = f" ({result[2]})" if len(result) > 2 else ""
                    print(f">>> [WORKER] ❌ Failed: {job_id}{detail}")
        except Exception as e:
            print(f">>> [WORKER] ❌ Error {job_id}: {e}")


def _fill_free_slots(worker, executor, futures, max_workers):
    """Claim jobs and submit them until the pool is full or no job is available."""
    for _ in range(max_workers - len(futures)):
        job = worker.claim_job()
        if not job:
            break
        # Must not raise here: the job is already claimed at this point.
        job_id = _short_job_id(job)
        future = executor.submit(process_single_job, worker, job)
        futures[future] = job_id
        print(f">>> [WORKER] 🚀 Started: {job_id} (active: {len(futures)})")


def worker_loop():
    """Background worker loop.

    Loads the worker module and config, builds ``ImageJobWorker(config)`` and
    then polls forever: finished futures are collected, free pool slots are
    filled with newly claimed jobs, and ``worker_status`` is updated for the
    status panel. The loop sleeps 2s when idle, 0.5s while jobs are in flight
    and 10s after an unexpected error in an iteration.

    Returns without starting the loop when initialisation fails; the failure
    is recorded in ``worker_status["errors"]``.
    """
    try:
        module, config = load_worker_module()
        worker = module.ImageJobWorker(config)
        worker_status["running"] = True
        print(">>> [WORKER] Image Worker started successfully")

    except Exception as e:
        error_msg = str(e)[:200]
        worker_status["errors"].append(f"Init: {error_msg}")
        print(f">>> [WORKER] Init failed: {e}")
        traceback.print_exc()
        return

    max_workers = _resolve_max_workers(config)
    print(f">>> [WORKER] Max concurrent: {max_workers}")

    try:
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            # Maps each in-flight future to the short id of its job.
            futures = {}

            while True:
                try:
                    worker_status["last_check"] = time.strftime("%H:%M:%S")

                    _reap_finished_jobs(futures)
                    _fill_free_slots(worker, executor, futures, max_workers)

                    worker_status["active_jobs"] = len(futures)

                    # Poll faster while jobs are running so slots free up promptly.
                    time.sleep(0.5 if futures else 2)

                except Exception as e:
                    error_msg = f"{time.strftime('%H:%M:%S')}: {str(e)[:100]}"
                    worker_status["errors"] = worker_status["errors"][-10:] + [error_msg]
                    print(f">>> [WORKER] Error: {e}")
                    time.sleep(10)
    finally:
        # Keep the status panel truthful if the loop ever terminates.
        worker_status["running"] = False
|
|
| |
# Run the job loop on a daemon thread so it does not keep the process alive
# once the main thread exits.
worker_thread = threading.Thread(daemon=True, target=worker_loop)
worker_thread.start()

# Pause before the rest of the module (including the first get_status() call
# in the UI definition) runs.
time.sleep(3)
|
|
| |
| |
| |
|
|
def get_status():
    """Render the current ``worker_status`` as a markdown status panel.

    Returns:
        str: markdown with a status table (running flag, active jobs,
        completed jobs, last check time) followed by up to five of the most
        recent error messages, or ``- None`` when no errors are recorded.
    """
    if worker_status["errors"]:
        recent = worker_status["errors"][-5:]
        errors_text = "\n".join(f"- {entry}" for entry in recent)
    else:
        errors_text = "- None"

    running_label = '✅ Yes' if worker_status['running'] else '❌ No'
    active_jobs = worker_status['active_jobs']
    jobs_done = worker_status['jobs_done']
    last_check = worker_status['last_check'] or 'Never'

    return f"""
# 🖼️ Image Generator Worker

| Item | Status |
|------|--------|
| **Running** | {running_label} |
| **Active Jobs** | 🔄 {active_jobs} |
| **Jobs Completed** | ✅ {jobs_done} |
| **Last Check** | {last_check} |

### Engine:
- 🎨 **Gemini Opal** - AI Image Generation

### Recent Errors:
{errors_text}

---
*Click Refresh to update*
"""
|
|
| |
# Status page: two headings, the status panel, a refresh button and a static
# feature list, created in that order.
with gr.Blocks(title="Image Generator Worker") as demo:
    gr.Markdown("# 🖼️ Image Generator Worker")
    gr.Markdown("Gemini Opal powered image generation")

    # Initial content is the status at the time the UI is built.
    status_display = gr.Markdown(get_status())

    refresh_btn = gr.Button("🔄 Refresh Status", variant="primary")
    # Clicking re-renders the status panel from the current worker_status.
    refresh_btn.click(get_status, inputs=None, outputs=[status_display])

    gr.Markdown("""
    ---
    ### Features:
    - ✅ Text-to-Image
    - ✅ Image-to-Image (Edit)
    - ✅ Multiple aspect ratios
    - ✅ Auto-refund on failure
    """)


# Only launch the app when the file is executed as a script.
if __name__ == "__main__":
    demo.queue().launch()