Spaces:
Sleeping
Sleeping
| """ | |
| Hugging Face Spaces Cluster - Controller mit Task API | |
| """ | |
| import os | |
| import time | |
| import uuid | |
| import threading | |
| import json | |
| import numpy as np | |
| import gradio as gr | |
class Controller:
    """In-memory task broker between the Gradio UI and remote workers.

    Workers register, poll for work with ``get_task`` and report back with
    ``submit_result``. A task's status moves through
    ``pending`` -> ``assigned`` (reserved for a worker, not yet fetched) ->
    ``running`` (handed to the worker) -> ``completed``.
    All shared state is guarded by ``self.lock``.
    """

    def __init__(self):
        # worker_id -> {"status": "ready" | "busy", "tasks": number of results submitted}
        self.workers = {}
        # task_id -> {"type", "data", "status", "result", "worker_id"}
        self.tasks = {}
        self.lock = threading.Lock()

    def register_worker(self, worker_id):
        """Add *worker_id* as ready, or mark an already-known worker ready again."""
        with self.lock:
            if worker_id not in self.workers:
                self.workers[worker_id] = {"status": "ready", "tasks": 0}
            else:
                self.workers[worker_id]["status"] = "ready"
        print(f"β Worker: {worker_id}")
        return {"status": "ok"}

    def _reserve_worker_locked(self):
        """Mark the first ready worker busy and return its id, or None.

        The caller must already hold ``self.lock``.
        """
        for wid, info in self.workers.items():
            if info["status"] == "ready":
                info["status"] = "busy"
                return wid
        return None

    def get_worker(self):
        """Reserve and return the first ready worker id, or None if none is ready."""
        with self.lock:
            return self._reserve_worker_locked()

    def submit_task(self, typ, data):
        """Create a pending task, try to reserve a worker for it, return its id."""
        tid = str(uuid.uuid4())
        with self.lock:
            self.tasks[tid] = {"type": typ, "data": data, "status": "pending", "result": None, "worker_id": None}
        self._distribute(tid)
        return tid

    def _distribute(self, tid):
        """Reserve a ready worker for pending task *tid*; return True on success.

        Reservation and assignment happen under a single lock acquisition so a
        worker polling ``get_task`` cannot claim the same task in between.
        The reserved worker receives the task on its next ``get_task`` call.
        """
        with self.lock:
            task = self.tasks.get(tid)
            if task is None or task["status"] != "pending":
                return False
            wid = self._reserve_worker_locked()
            if wid is None:
                return False
            task["worker_id"] = wid
            task["status"] = "assigned"
        print(f"π€ {tid[:8]} β {wid}")
        return True

    def submit_result(self, wid, tid, result):
        """Store *result* for *tid*, mark it completed and free worker *wid*."""
        with self.lock:
            if tid in self.tasks:
                self.tasks[tid]["result"] = result
                self.tasks[tid]["status"] = "completed"
            if wid in self.workers:
                self.workers[wid]["status"] = "ready"
                self.workers[wid]["tasks"] += 1
        print(f"β {tid[:8]} done")
        return {"status": "ok"}

    def get_task(self, wid):
        """Hand the next task to worker *wid*.

        Tasks already reserved for *wid* by ``_distribute`` are delivered first
        (previously they were never returned, so they were lost); otherwise the
        first pending task is claimed. The delivered task is marked ``running``
        so it is not handed out twice, and a registered worker is marked busy.

        Returns ``{"id", "type", "data"}`` (numpy arrays converted to lists so
        the payload is JSON-serialisable), or ``{}`` when there is nothing to do.
        """
        with self.lock:
            tid = next(
                (k for k, t in self.tasks.items()
                 if t["status"] == "assigned" and t["worker_id"] == wid),
                None,
            )
            if tid is None:
                tid = next((k for k, t in self.tasks.items() if t["status"] == "pending"), None)
            if tid is None:
                return {}
            task = self.tasks[tid]
            task["status"] = "running"
            task["worker_id"] = wid
            if wid in self.workers:
                self.workers[wid]["status"] = "busy"
            data = task["data"]
            if isinstance(data, np.ndarray):
                data = data.tolist()
            return {"id": tid, "type": task["type"], "data": data}

    def status(self):
        """Return a snapshot of worker and task counts plus a per-worker list."""
        with self.lock:
            return {
                "workers": len(self.workers),
                "ready": sum(1 for w in self.workers.values() if w["status"] == "ready"),
                "tasks": len(self.tasks),
                "pending": sum(1 for t in self.tasks.values() if t["status"] == "pending"),
                "done": sum(1 for t in self.tasks.values() if t["status"] == "completed"),
                "worker_list": [{"id": k, "status": v["status"], "tasks": v["tasks"]} for k, v in self.workers.items()],
            }
# Single shared Controller instance; the Gradio callbacks below and the HTTP
# handlers in setup_api() all read and mutate it.
ctrl = Controller()
# UI Functions
def get_status():
    """Render the current controller snapshot as Markdown for the Status tab.

    Each worker becomes one "<br>"-separated line with its id, a ready/busy
    indicator and its submitted-result count; "Keine Worker" is shown when no
    worker is registered.
    """
    snapshot = ctrl.status()
    entries = []
    for worker in snapshot["worker_list"]:
        light = 'π’' if worker['status'] == 'ready' else 'π΄'
        entries.append(f"β’ {worker['id']}: {light} ({worker['tasks']})")
    worker_lines = "<br>".join(entries) or "Keine Worker"
    return (
        "## Cluster Status\n"
        f"**Worker:** {snapshot['workers']} ({snapshot['ready']} ready)\n"
        f"**Tasks:** {snapshot['tasks']} ({snapshot['pending']} pending, {snapshot['done']} done)\n"
        "**Worker Liste:**\n"
        f"{worker_lines}"
    )
def send_task(typ, data):
    """Submit one task from the Task tab and wait briefly for its result.

    *data* is decoded with json.loads when it is a string, otherwise used as-is.
    The task is polled up to 50 times at 0.1 s intervals (about 5 s). Returns a
    one-line status string; any exception is caught and returned as text so
    the UI shows a message instead of a traceback.
    """
    try:
        payload = json.loads(data) if isinstance(data, str) else data
        task_id = ctrl.submit_task(typ, payload)
        # Wait for the result.
        attempts_left = 50
        while attempts_left:
            attempts_left -= 1
            with ctrl.lock:
                entry = ctrl.tasks.get(task_id)
                if entry is not None and entry["status"] == "completed":
                    return f"β {task_id[:8]}: {entry['result']}"
            time.sleep(0.1)
        return f"β³ {task_id[:8]}: waiting..."
    except Exception as err:
        return f"β {err}"
def run_benchmark(num_chunks, timeout=60.0):
    """Distribute summing the integers 0..999_999 across worker "sum" tasks.

    Args:
        num_chunks: Number of tasks to split the range into. Coerced to int
            (a UI slider may deliver a float, which range() rejects) and
            clamped to at least 1 to avoid a division by zero.
        timeout: Seconds to wait for results before reporting what arrived.

    Returns:
        Markdown summary (German UI labels) of the collected results.
    """
    total_numbers = 1_000_000
    num_chunks = max(1, int(num_chunks))
    chunk_size = total_numbers // num_chunks
    task_ids = []
    for i in range(num_chunks):
        lo = i * chunk_size
        # The last chunk absorbs the remainder so no numbers are dropped when
        # num_chunks does not divide 1,000,000 evenly.
        hi = total_numbers if i == num_chunks - 1 else lo + chunk_size
        task_ids.append(ctrl.submit_task("sum", list(range(lo, hi))))

    # Wait for all results, or until the timeout expires.
    start = time.time()
    results = {}  # task id -> result; dict membership replaces an O(n^2) list scan
    while len(results) < len(task_ids) and time.time() - start < timeout:
        with ctrl.lock:
            for tid in task_ids:
                task = ctrl.tasks.get(tid)
                if tid not in results and task is not None and task["status"] == "completed":
                    results[tid] = task["result"]
        time.sleep(0.2)

    # Non-numeric results are skipped rather than summed.
    total = sum(r for r in results.values() if isinstance(r, (int, float)))
    elapsed = time.time() - start
    # Completed tasks per second; this is throughput, not a parallel speedup.
    throughput = len(results) / elapsed if elapsed > 0 else 0.0
    return f"""## Benchmark Ergebnis
**Chunks:** {num_chunks}
**Ergebnisse:** {len(results)}/{len(task_ids)}
**Summe:** {total:,}
**Zeit:** {elapsed:.2f}s
**Durchsatz:** {throughput:.1f} Tasks/s"""
# Gradio UI with three tabs: cluster status, single-task submission, benchmark.
with gr.Blocks(title="Controller") as demo:
    gr.Markdown("# π€ Cluster Controller")
    with gr.Tabs():
        with gr.Tab("Status"):
            btn = gr.Button("π Refresh")
            out = gr.Markdown()
            btn.click(fn=get_status, outputs=out)
            # Also render the status once when the page loads.
            demo.load(fn=get_status, outputs=out)
        with gr.Tab("Task"):
            # Task type and JSON payload are passed to send_task(typ, data).
            t = gr.Dropdown(choices=["sum", "mean"], value="sum", label="Typ")
            d = gr.Textbox(label="Daten (JSON)", value="[1,2,3,4,5]")
            b = gr.Button("Send")
            r = gr.Textbox(label="Result")
            b.click(fn=send_task, inputs=[t,d], outputs=r)
        with gr.Tab("π Benchmark"):
            # Slider value (1..10) becomes run_benchmark's num_chunks.
            # NOTE(review): Gradio may deliver this value as a float; confirm
            # run_benchmark coerces it before using it with range().
            chunks = gr.Slider(1, 10, value=10, step=1, label="Anzahl Chunks")
            bench_btn = gr.Button("1 Million Berechnungen starten")
            bench_out = gr.Markdown()
            bench_btn.click(fn=run_benchmark, inputs=chunks, outputs=bench_out)
# API Routes
from fastapi import Request


def setup_api(app=None):
    """Register the worker-facing HTTP API on *app* and return it.

    Previously the endpoint coroutines were defined but never attached to any
    application, so workers had no way to register or fetch tasks.

    Args:
        app: FastAPI application to attach the routes to. When None, a new
            ``FastAPI()`` instance is created.

    Returns:
        The FastAPI application carrying the routes.
    """
    if app is None:
        from fastapi import FastAPI
        app = FastAPI()

    async def register(req: Request):
        data = await req.json()
        return ctrl.register_worker(data.get("worker_id", "unknown"))

    async def get_task(worker_id: str):
        return ctrl.get_task(worker_id)

    async def submit_result(req: Request):
        data = await req.json()
        return ctrl.submit_result(data["worker_id"], data["task_id"], data["result"])

    async def cluster_status():
        return ctrl.status()

    # NOTE(review): route paths are derived from the handler names; confirm
    # they match what the worker client calls.
    app.add_api_route("/register", register, methods=["POST"])
    app.add_api_route("/get_task", get_task, methods=["GET"])  # ?worker_id=...
    app.add_api_route("/submit_result", submit_result, methods=["POST"])
    app.add_api_route("/cluster_status", cluster_status, methods=["GET"])
    return app


if __name__ == "__main__":
    print("π Starte Controller...")
    # launch() returns (fastapi_app, local_url, share_url). With
    # prevent_thread_lock it does not block, so the worker API can be attached
    # to the running app before we block the main thread ourselves.
    app, _, _ = demo.launch(server_name="0.0.0.0", server_port=7860, prevent_thread_lock=True)
    setup_api(app)
    demo.block_thread()