Spaces:
Sleeping
Sleeping
File size: 6,559 Bytes
47172b7 e741479 47172b7 5ab52dd 47172b7 5ab52dd 5ac7273 47172b7 5ac7273 47172b7 5ac7273 47172b7 e741479 5ac7273 47172b7 5ab52dd 47172b7 0f777a0 47172b7 0f777a0 47172b7 5ac7273 47172b7 e741479 5ac7273 47172b7 5ac7273 761481b 5ac7273 e741479 47172b7 5ac7273 47172b7 5ac7273 e741479 5ac7273 47172b7 5ac7273 e741479 5ac7273 47172b7 5ac7273 92992a9 e741479 761481b 5ac7273 e741479 92992a9 e741479 5ac7273 47172b7 5ac7273 47172b7 5ac7273 e741479 47172b7 761481b 47172b7 e741479 5ac7273 0f777a0 47172b7 761481b e741479 761481b e741479 761481b e741479 5ac7273 e741479 5ac7273 761481b 5ac7273 e741479 761481b | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 | """
Hugging Face Spaces Cluster - Controller mit Task API
"""
import os
import time
import uuid
import threading
import json
import numpy as np
import gradio as gr
class Controller:
    """In-memory broker between task submitters and polling workers.

    State is kept in two dicts guarded by one (non-reentrant) lock:

    * ``workers``: worker_id -> {"status": "ready" | "busy", "tasks": int},
      where ``tasks`` counts the results that worker has reported.
    * ``tasks``: task_id -> {"type", "data", "status", "result", "worker_id"}

    Task status lifecycle:

    * ``pending``   - created, not yet claimed by any worker
    * ``assigned``  - reserved for a ready worker by ``_distribute``; only that
      worker's next ``get_task`` call will receive it
    * ``running``   - handed to a worker by ``get_task``
    * ``completed`` - result stored by ``submit_result``
    """

    def __init__(self):
        self.workers = {}
        self.tasks = {}
        # threading.Lock is not reentrant: helpers suffixed ``_locked`` expect
        # the caller to already hold it and must not try to acquire it again.
        self.lock = threading.Lock()

    def register_worker(self, worker_id):
        """Register a new worker, or mark an already known one ready again.

        The completed-task counter of a known worker is preserved.
        Returns ``{"status": "ok"}``.
        """
        with self.lock:
            worker = self.workers.setdefault(worker_id, {"status": "ready", "tasks": 0})
            worker["status"] = "ready"
        print(f"✅ Worker: {worker_id}")
        return {"status": "ok"}

    def _claim_ready_worker_locked(self):
        """Mark the first ready worker busy and return its id, else None (lock held)."""
        for wid, info in self.workers.items():
            if info["status"] == "ready":
                info["status"] = "busy"
                return wid
        return None

    def get_worker(self):
        """Claim the first ready worker (marking it busy); return its id or None."""
        with self.lock:
            return self._claim_ready_worker_locked()

    def submit_task(self, typ, data):
        """Create a new pending task, try to reserve a ready worker, return its id."""
        tid = str(uuid.uuid4())
        with self.lock:
            self.tasks[tid] = {
                "type": typ,
                "data": data,
                "status": "pending",
                "result": None,
                "worker_id": None,
            }
        self._distribute(tid)
        return tid

    def _distribute(self, tid):
        """Reserve a pending task for a ready worker.

        The reserved worker receives the task on its next ``get_task`` poll.
        Worker claim and task update happen under a single lock acquisition,
        so a worker can never be marked busy without a task being reserved
        for it. Returns True if a reservation was made.
        """
        with self.lock:
            task = self.tasks.get(tid)
            if task is None or task["status"] != "pending":
                return False
            wid = self._claim_ready_worker_locked()
            # ``is None`` rather than truthiness: "" is a valid worker id key.
            if wid is None:
                return False
            task["worker_id"] = wid
            task["status"] = "assigned"
        print(f"📤 {tid[:8]} → {wid}")
        return True

    def submit_result(self, wid, tid, result):
        """Store a worker's result for ``tid`` and mark the worker ready again.

        Unknown task or worker ids are ignored for the respective update.
        Returns ``{"status": "ok"}``.
        """
        with self.lock:
            task = self.tasks.get(tid)
            if task is not None:
                task["result"] = result
                task["status"] = "completed"
            worker = self.workers.get(wid)
            if worker is not None:
                worker["status"] = "ready"
                worker["tasks"] += 1
        print(f"✅ {tid[:8]} done")
        return {"status": "ok"}

    def _next_task_for_locked(self, wid):
        """Return the id of the task ``wid`` should run next, or None (lock held).

        A task already reserved for ``wid`` by ``_distribute`` takes priority
        over the first unclaimed pending task.
        """
        fallback = None
        for tid, task in self.tasks.items():
            if task["status"] == "assigned" and task["worker_id"] == wid:
                return tid
            if fallback is None and task["status"] == "pending":
                fallback = tid
        return fallback

    def get_task(self, wid):
        """Hand the next task to polling worker ``wid``.

        Returns ``{"id", "type", "data"}`` (NumPy arrays are converted to lists
        so the payload is JSON-serialisable), or ``{}`` when there is no work.
        The task becomes "running" and the worker, if registered, is marked busy.
        """
        with self.lock:
            tid = self._next_task_for_locked(wid)
            if tid is None:
                return {}
            task = self.tasks[tid]
            task["status"] = "running"
            task["worker_id"] = wid
            worker = self.workers.get(wid)
            if worker is not None:
                worker["status"] = "busy"
            data = task["data"]
            if isinstance(data, np.ndarray):
                data = data.tolist()
            return {"id": tid, "type": task["type"], "data": data}

    def status(self):
        """Return a snapshot of worker and task counts plus a per-worker list.

        ``pending`` counts only unclaimed tasks; reserved and running tasks
        are included in ``tasks`` but in neither ``pending`` nor ``done``.
        """
        with self.lock:
            return {
                "workers": len(self.workers),
                "ready": sum(1 for w in self.workers.values() if w["status"] == "ready"),
                "tasks": len(self.tasks),
                "pending": sum(1 for t in self.tasks.values() if t["status"] == "pending"),
                "done": sum(1 for t in self.tasks.values() if t["status"] == "completed"),
                "worker_list": [
                    {"id": k, "status": v["status"], "tasks": v["tasks"]}
                    for k, v in self.workers.items()
                ],
            }
# Single process-wide Controller instance; the UI callbacks and the HTTP route
# handlers defined below all read and mutate this shared object.
ctrl = Controller()
# UI Functions
def get_status():
    """Render the shared controller's current state as Markdown for the Status tab.

    Worker ids arrive through the HTTP API, so they are HTML-escaped before
    being embedded in the ``<br>``-joined worker list.
    """
    import html

    s = ctrl.status()
    worker_lines = [
        f"• {html.escape(str(w['id']))}: {'🟢' if w['status'] == 'ready' else '🔴'} ({w['tasks']})"
        for w in s["worker_list"]
    ]
    wlist = "<br>".join(worker_lines) or "Keine Worker"
    # Blank lines separate the entries into Markdown paragraphs; single
    # newlines would be merged into one run-on line.
    return f"""## Cluster Status

**Worker:** {s['workers']} ({s['ready']} ready)

**Tasks:** {s['tasks']} ({s['pending']} pending, {s['done']} done)

**Worker Liste:**

{wlist}"""
def send_task(typ, data):
    """Submit one task from the UI and wait briefly for its result.

    Args:
        typ: task type selected in the dropdown (e.g. "sum" or "mean").
        data: JSON text from the textbox; non-string values are passed through.

    Returns:
        A one-line status string: the result if the task completed within about
        5 seconds, a "waiting" note if it is still outstanding, or the error.
    """
    try:
        payload = json.loads(data) if isinstance(data, str) else data
        tid = ctrl.submit_task(typ, payload)
    except Exception as e:  # UI boundary: show the error instead of raising
        return f"❌ {e}"
    # Poll for up to ~5 s (50 x 0.1 s) for a worker to report the result.
    for _ in range(50):
        with ctrl.lock:
            task = ctrl.tasks.get(tid)
            if task is not None and task["status"] == "completed":
                return f"✅ {tid[:8]}: {task['result']}"
        time.sleep(0.1)
    return f"⏳ {tid[:8]}: waiting..."
def run_benchmark(num_chunks):
    """Distribute the numbers 0..999_999 as "sum" tasks and report the outcome.

    Args:
        num_chunks: number of tasks to create. The Gradio slider may deliver
            it as a float, so it is coerced to int (minimum 1).

    Returns:
        A Markdown summary: results received, their numeric sum, wall-clock
        time, and throughput in completed tasks per second. Waiting stops
        after 60 s even if some chunks are still outstanding.
    """
    total_numbers = 1_000_000
    timeout_s = 60
    num_chunks = max(1, int(num_chunks))
    chunk_size = total_numbers // num_chunks

    # Start timing before submission: workers may already be processing the
    # first chunks while later ones are still being built.
    start = time.time()
    task_ids = []
    for i in range(num_chunks):
        lo = i * chunk_size
        # The last chunk absorbs the remainder so no numbers are dropped when
        # total_numbers is not divisible by num_chunks.
        hi = total_numbers if i == num_chunks - 1 else lo + chunk_size
        task_ids.append(ctrl.submit_task("sum", list(range(lo, hi))))

    # Wait for all results (or the timeout); dict keeps lookups O(1).
    results = {}
    while len(results) < len(task_ids) and time.time() - start < timeout_s:
        with ctrl.lock:
            for tid in task_ids:
                if tid in results:
                    continue
                task = ctrl.tasks.get(tid)
                if task is not None and task["status"] == "completed":
                    results[tid] = task["result"]
        if len(results) < len(task_ids):
            time.sleep(0.2)

    total = sum(r for r in results.values() if isinstance(r, (int, float)))
    elapsed = time.time() - start
    throughput = len(results) / elapsed if elapsed > 0 else 0.0
    return f"""## Benchmark Ergebnis

**Chunks:** {num_chunks}

**Ergebnisse:** {len(results)}/{len(task_ids)}

**Summe:** {total:,}

**Zeit:** {elapsed:.2f}s

**Durchsatz:** {throughput:.1f} Tasks/s"""
# Gradio UI: a Status tab (refreshed on page load and on demand), a Task tab
# for submitting a single JSON task, and a Benchmark tab that splits a
# 1,000,000-number sum into slider-selected chunks.
with gr.Blocks(title="Controller") as demo:
    gr.Markdown("# 🤖 Cluster Controller")
    with gr.Tabs():
        with gr.Tab("Status"):
            btn = gr.Button("🔄 Refresh")
            out = gr.Markdown()
            btn.click(fn=get_status, outputs=out)
            # Populate the status panel as soon as the page loads.
            demo.load(fn=get_status, outputs=out)
        with gr.Tab("Task"):
            t = gr.Dropdown(choices=["sum", "mean"], value="sum", label="Typ")
            d = gr.Textbox(label="Daten (JSON)", value="[1,2,3,4,5]")
            b = gr.Button("Send")
            r = gr.Textbox(label="Result")
            b.click(fn=send_task, inputs=[t, d], outputs=r)
        with gr.Tab("📊 Benchmark"):
            chunks = gr.Slider(1, 10, value=10, step=1, label="Anzahl Chunks")
            bench_btn = gr.Button("1 Million Berechnungen starten")
            bench_out = gr.Markdown()
            bench_btn.click(fn=run_benchmark, inputs=chunks, outputs=bench_out)
# API Routes
from fastapi import Request
def setup_api():
    """Register the worker-facing HTTP endpoints on the Gradio app's FastAPI router.

    Endpoints:
        POST /api/register        body {"worker_id": ...}
        GET  /api/get_task        ?worker_id=...  -> task dict or {}
        POST /api/submit_result   body {"worker_id", "task_id", "result"}
        GET  /api/cluster_status  -> Controller.status() snapshot

    Malformed bodies (invalid JSON, non-object JSON, missing fields, or a
    non-string task_id) are answered with HTTP 400 instead of escaping as an
    unhandled exception (HTTP 500).
    """
    # fastapi is already a dependency of this module (it imports fastapi.Request).
    from fastapi import HTTPException

    # NOTE(review): this relies on ``demo.app`` existing when setup_api() runs;
    # some Gradio versions only create it inside launch() — confirm for the
    # pinned version.

    async def _json_object(req):
        """Parse the request body and require it to be a JSON object."""
        try:
            data = await req.json()
        except ValueError:
            raise HTTPException(status_code=400, detail="body must be valid JSON") from None
        if not isinstance(data, dict):
            raise HTTPException(status_code=400, detail="body must be a JSON object")
        return data

    @demo.app.post("/api/register")
    async def register(req: Request):
        data = await _json_object(req)
        return ctrl.register_worker(data.get("worker_id", "unknown"))

    @demo.app.get("/api/get_task")
    async def get_task(worker_id: str):
        return ctrl.get_task(worker_id)

    @demo.app.post("/api/submit_result")
    async def submit_result(req: Request):
        data = await _json_object(req)
        missing = [k for k in ("worker_id", "task_id", "result") if k not in data]
        if missing:
            raise HTTPException(status_code=400, detail=f"missing fields: {', '.join(missing)}")
        # Task ids are the uuid strings issued by the controller.
        if not isinstance(data["task_id"], str):
            raise HTTPException(status_code=400, detail="task_id must be a string")
        return ctrl.submit_result(data["worker_id"], data["task_id"], data["result"])

    @demo.app.get("/api/cluster_status")
    async def cluster_status():
        return ctrl.status()
if __name__ == "__main__":
    # Attach the worker API routes, then start serving the UI and API.
    setup_api()
    print("🚀 Starte Controller...")
    # Listen on all interfaces; 7860 is presumably the port the hosting Space routes to.
    demo.launch(server_name="0.0.0.0", server_port=7860)
|