# Source: Hugging Face Space upload by Timo123432345443
# Commit e741479 (verified): "Upload app.py with huggingface_hub"
"""
Hugging Face Spaces cluster controller with a task-distribution API for workers.
"""
import os
import time
import uuid
import threading
import json
import numpy as np
import gradio as gr
class Controller:
    """In-memory registry of workers and tasks for the cluster.

    Workers register via ``register_worker``, pull work with ``get_task`` and
    report back with ``submit_result``. Every read or write of ``workers`` /
    ``tasks`` happens while holding ``lock``.
    """

    def __init__(self):
        # worker_id -> {"status": "ready" | "busy", "tasks": completed-task count}
        self.workers = {}
        # task_id -> {"type", "data", "status", "result", "worker_id", "delivered"};
        # status moves pending -> assigned -> completed.
        # NOTE(review): finished tasks are never removed, so this dict (and the
        # payloads it holds) grows for the lifetime of the process.
        self.tasks = {}
        self.lock = threading.Lock()

    def register_worker(self, worker_id):
        """Add *worker_id* as a ready worker, or mark a known one ready again."""
        with self.lock:
            if worker_id not in self.workers:
                self.workers[worker_id] = {"status": "ready", "tasks": 0}
            else:
                self.workers[worker_id]["status"] = "ready"
        print(f"βœ… Worker: {worker_id}")
        return {"status": "ok"}

    def _claim_ready_worker(self):
        """Mark the first ready worker busy and return its id, or None.

        Caller must already hold ``self.lock``.
        """
        for wid, info in self.workers.items():
            if info["status"] == "ready":
                info["status"] = "busy"
                return wid
        return None

    def get_worker(self):
        """Reserve a ready worker (marking it busy); return its id or None."""
        with self.lock:
            return self._claim_ready_worker()

    def submit_task(self, typ, data):
        """Queue a task of kind *typ* with payload *data* and return its id.

        If a worker is ready the task is reserved for it right away; otherwise
        it stays pending until any worker polls ``get_task``.
        """
        tid = str(uuid.uuid4())
        with self.lock:
            self.tasks[tid] = {
                "type": typ,
                "data": data,
                "status": "pending",
                "result": None,
                "worker_id": None,
                # Set once the task has actually been handed to a worker.
                "delivered": False,
            }
        self._distribute(tid)
        return tid

    def _distribute(self, tid):
        """Reserve a ready worker for pending task *tid*; return True on success.

        The status check, worker claim and assignment happen under a single
        lock acquisition, so a task that a worker already pulled (or finished)
        in the meantime is never overwritten back to "assigned".
        """
        with self.lock:
            task = self.tasks.get(tid)
            if task is None or task["status"] != "pending":
                return False
            wid = self._claim_ready_worker()
            if wid is None:
                return False
            task["worker_id"] = wid
            task["status"] = "assigned"
        print(f"πŸ“€ {tid[:8]} β†’ {wid}")
        return True

    def submit_result(self, wid, tid, result):
        """Store *result* for task *tid* and mark worker *wid* ready again."""
        with self.lock:
            if tid in self.tasks:
                self.tasks[tid]["result"] = result
                self.tasks[tid]["status"] = "completed"
            if wid in self.workers:
                self.workers[wid]["status"] = "ready"
                self.workers[wid]["tasks"] += 1
        print(f"βœ… {tid[:8]} done")
        return {"status": "ok"}

    def _next_task_for(self, wid):
        """Return the id of the task *wid* should run next, or None.

        Tasks already reserved for *wid* by ``_distribute`` come first: they
        are no longer "pending", so a pending-only scan would never hand them
        out. Caller must already hold ``self.lock``.
        """
        for tid, t in self.tasks.items():
            if t["status"] == "assigned" and t["worker_id"] == wid and not t.get("delivered"):
                return tid
        for tid, t in self.tasks.items():
            if t["status"] == "pending":
                return tid
        return None

    def get_task(self, wid):
        """Hand the next task to worker *wid*; return ``{}`` if none is available.

        Returns ``{"id", "type", "data"}``; a NumPy ``data`` payload is
        converted with ``tolist()`` so the response is JSON-serializable.
        """
        with self.lock:
            tid = self._next_task_for(wid)
            if tid is None:
                return {}
            task = self.tasks[tid]
            task["status"] = "assigned"
            task["worker_id"] = wid
            task["delivered"] = True
            if wid in self.workers:
                # Busy until submit_result, so _distribute won't reserve it meanwhile.
                self.workers[wid]["status"] = "busy"
            data = task["data"]
            if isinstance(data, np.ndarray):
                data = data.tolist()
            return {"id": tid, "type": task["type"], "data": data}

    def status(self):
        """Return a JSON-serializable snapshot of worker and task counts."""
        with self.lock:
            return {
                "workers": len(self.workers),
                "ready": sum(1 for w in self.workers.values() if w["status"] == "ready"),
                "tasks": len(self.tasks),
                "pending": sum(1 for t in self.tasks.values() if t["status"] == "pending"),
                "done": sum(1 for t in self.tasks.values() if t["status"] == "completed"),
                "worker_list": [
                    {"id": k, "status": v["status"], "tasks": v["tasks"]}
                    for k, v in self.workers.items()
                ],
            }
# Single shared Controller instance, used by the Gradio callbacks below and
# by the HTTP routes registered in setup_api().
ctrl = Controller()
# UI Functions
def get_status():
    """Render the controller's status snapshot as Markdown for the Status tab.

    Each worker is listed as id, ready/busy icon and completed-task count,
    with entries joined by ``<br>``; "Keine Worker" is shown when none exist.
    """
    snap = ctrl.status()
    rows = []
    for worker in snap["worker_list"]:
        icon = '🟒' if worker['status'] == 'ready' else 'πŸ”΄'
        rows.append(f"β€’ {worker['id']}: {icon} ({worker['tasks']})")
    wlist = "<br>".join(rows) or "Keine Worker"
    return (
        "## Cluster Status\n"
        f"**Worker:** {snap['workers']} ({snap['ready']} ready)\n"
        f"**Tasks:** {snap['tasks']} ({snap['pending']} pending, {snap['done']} done)\n"
        "**Worker Liste:**\n"
        f"{wlist}"
    )
def send_task(typ, data):
    """Submit one task from the UI and briefly wait for its result.

    *data* may be JSON text or an already-decoded value. The controller is
    polled up to 50 times, 0.1 s apart. The outcome is always returned as a
    status string; any exception is reported as text instead of raised.
    """
    try:
        payload = data if not isinstance(data, str) else json.loads(data)
        task_id = ctrl.submit_task(typ, payload)
        short_id = task_id[:8]
        remaining = 50
        while remaining:
            with ctrl.lock:
                task = ctrl.tasks.get(task_id)
                if task is not None and task["status"] == "completed":
                    return f"βœ… {short_id}: {task['result']}"
            time.sleep(0.1)
            remaining -= 1
        return f"⏳ {short_id}: waiting..."
    except Exception as e:
        return f"❌ {e}"
def run_benchmark(num_chunks, *, total=1_000_000, timeout=60.0):
    """Distribute the sum of ``range(total)`` across workers and report timing.

    The range is split into ``num_chunks`` contiguous "sum" tasks. The
    controller is then polled every 0.2 s until every chunk has a result or
    ``timeout`` seconds have passed, and a Markdown report is returned.

    Args:
        num_chunks: Number of tasks. Coerced to ``int`` (at least 1) because
            UI sliders may deliver numbers as floats, which ``range`` rejects.
        total: Size of the range being summed (default: 1 million numbers).
        timeout: Maximum number of seconds to wait for results.

    Returns:
        Markdown summary: chunk count, completed results, the sum of all
        numeric results, elapsed seconds and completed chunks per second.
    """
    num_chunks = max(1, int(num_chunks))
    task_ids = []
    for i in range(num_chunks):
        # Boundaries are i*total//n, so consecutive chunks share edges and the
        # last one ends exactly at `total`. The old fixed chunk size dropped
        # the remainder whenever total % num_chunks != 0 (e.g. 3, 6, 7, 9).
        lo = i * total // num_chunks
        hi = (i + 1) * total // num_chunks
        task_ids.append(ctrl.submit_task("sum", list(range(lo, hi))))

    start = time.monotonic()  # monotonic clock: immune to wall-clock jumps
    results = {}  # task id -> result; O(1) "already collected" check
    while True:
        with ctrl.lock:
            for tid in task_ids:
                if tid in results:
                    continue
                task = ctrl.tasks.get(tid)
                if task is not None and task["status"] == "completed":
                    results[tid] = task["result"]
        # Check before sleeping so a finished run isn't padded by 0.2 s.
        if len(results) == len(task_ids) or time.monotonic() - start >= timeout:
            break
        time.sleep(0.2)
    elapsed = time.monotonic() - start

    grand_total = sum(r for r in results.values() if isinstance(r, (int, float)))
    # Completed chunks per second: this is throughput, not a speedup factor.
    throughput = len(results) / elapsed if elapsed > 0 else float("inf")
    return f"""## Benchmark Ergebnis
**Chunks:** {num_chunks}
**Ergebnisse:** {len(results)}/{len(task_ids)}
**Summe:** {grand_total:,}
**Zeit:** {elapsed:.2f}s
**Durchsatz:** {throughput:.1f} Chunks/s"""
# Gradio UI with three tabs: cluster status, single-task submission, benchmark.
# Component creation order inside each `with` context defines the layout.
with gr.Blocks(title="Controller") as demo:
    gr.Markdown("# πŸ€— Cluster Controller")
    with gr.Tabs():
        # Status tab: refreshed by the button and once when the page loads.
        with gr.Tab("Status"):
            btn = gr.Button("πŸ”„ Refresh")
            out = gr.Markdown()
            btn.click(fn=get_status, outputs=out)
            demo.load(fn=get_status, outputs=out)
        # Task tab: send one task (type + JSON payload) and show the reply text.
        with gr.Tab("Task"):
            t = gr.Dropdown(choices=["sum", "mean"], value="sum", label="Typ")
            d = gr.Textbox(label="Daten (JSON)", value="[1,2,3,4,5]")
            b = gr.Button("Send")
            r = gr.Textbox(label="Result")
            b.click(fn=send_task, inputs=[t,d], outputs=r)
        # Benchmark tab: run_benchmark with a chunk count chosen from 1 to 10.
        with gr.Tab("πŸš€ Benchmark"):
            chunks = gr.Slider(1, 10, value=10, step=1, label="Anzahl Chunks")
            bench_btn = gr.Button("1 Million Berechnungen starten")
            bench_out = gr.Markdown()
            bench_btn.click(fn=run_benchmark, inputs=chunks, outputs=bench_out)
# API Routes
from fastapi import Request
def setup_api(app=None):
    """Register the worker-facing JSON endpoints on a FastAPI app.

    Routes: POST /api/register, GET /api/get_task?worker_id=...,
    POST /api/submit_result, GET /api/cluster_status.

    Args:
        app: FastAPI application to attach to. Defaults to ``demo.app``,
            which Gradio builds during ``demo.launch()``; when using the
            default, call this after launching.
    """
    if app is None:
        app = demo.app
    first_new = len(app.router.routes)

    @app.post("/api/register")
    async def register(req: Request):
        data = await req.json()
        return ctrl.register_worker(data.get("worker_id", "unknown"))

    @app.get("/api/get_task")
    async def get_task(worker_id: str):
        return ctrl.get_task(worker_id)

    @app.post("/api/submit_result")
    async def submit_result(req: Request):
        data = await req.json()
        return ctrl.submit_result(data["worker_id"], data["task_id"], data["result"])

    @app.get("/api/cluster_status")
    async def cluster_status():
        return ctrl.status()

    # Starlette matches routes in list order, and the decorators above append.
    # Move our routes to the front so a generic pattern registered earlier
    # (NOTE(review): some Gradio versions define POST /api/{api_name}) cannot
    # shadow them.
    new_routes = app.router.routes[first_new:]
    del app.router.routes[first_new:]
    app.router.routes[0:0] = new_routes
if __name__ == "__main__":
    print("πŸš€ Starte Controller...")
    # Gradio builds demo.app inside launch(), so routes attached beforehand
    # are missing or replaced. Start the server without blocking, attach the
    # worker API to the live app, then block the main thread as launch()
    # normally would.
    demo.launch(server_name="0.0.0.0", server_port=7860, prevent_thread_lock=True)
    setup_api()
    demo.block_thread()