File size: 6,559 Bytes
47172b7
e741479
47172b7
 
 
5ab52dd
47172b7
 
5ab52dd
5ac7273
47172b7
 
5ac7273
47172b7
 
 
 
 
5ac7273
47172b7
e741479
 
 
 
5ac7273
 
47172b7
5ab52dd
47172b7
0f777a0
47172b7
 
0f777a0
47172b7
 
5ac7273
 
47172b7
e741479
5ac7273
 
47172b7
5ac7273
 
 
761481b
5ac7273
 
 
e741479
 
47172b7
5ac7273
47172b7
5ac7273
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e741479
5ac7273
 
 
47172b7
 
5ac7273
 
 
e741479
5ac7273
 
47172b7
 
5ac7273
92992a9
e741479
761481b
5ac7273
e741479
 
 
 
92992a9
e741479
5ac7273
47172b7
5ac7273
47172b7
5ac7273
 
e741479
 
 
 
 
 
 
47172b7
761481b
47172b7
e741479
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5ac7273
0f777a0
47172b7
761481b
e741479
761481b
 
 
e741479
761481b
 
e741479
5ac7273
 
 
e741479
 
 
 
 
 
 
 
5ac7273
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
761481b
 
5ac7273
e741479
761481b
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
"""
Hugging Face Spaces Cluster - Controller mit Task API
"""

import os
import time
import uuid
import threading
import json
import numpy as np
import gradio as gr

class Controller:
    """In-memory coordinator that tracks workers and hands tasks out to them.

    ``workers`` maps a worker id to ``{"status": "ready" | "busy", "tasks": n}``
    where ``n`` counts the results that worker has submitted. ``tasks`` maps a
    task id to a dict with the keys ``type``, ``data``, ``status``
    (``"pending"``, ``"assigned"`` or ``"completed"``), ``result``,
    ``worker_id`` and ``delivered``. Every access to both dicts goes through
    ``lock``.
    """

    def __init__(self):
        self.workers = {}
        self.tasks = {}
        self.lock = threading.Lock()

    def register_worker(self, worker_id):
        """Add ``worker_id`` (or reset a known worker to "ready") and acknowledge."""
        with self.lock:
            if worker_id not in self.workers:
                self.workers[worker_id] = {"status": "ready", "tasks": 0}
            else:
                self.workers[worker_id]["status"] = "ready"
        print(f"βœ… Worker: {worker_id}")
        return {"status": "ok"}

    def get_worker(self):
        """Claim the first "ready" worker by marking it "busy".

        Returns the worker id, or ``None`` when no worker is ready.
        """
        with self.lock:
            for wid, info in self.workers.items():
                if info["status"] == "ready":
                    info["status"] = "busy"
                    return wid
        return None

    def submit_task(self, typ, data):
        """Store a new task, try to reserve a worker for it and return its id."""
        tid = str(uuid.uuid4())
        with self.lock:
            self.tasks[tid] = {
                "type": typ,
                "data": data,
                "status": "pending",
                "result": None,
                "worker_id": None,
                # Set once the task payload has actually been handed to a worker.
                "delivered": False,
            }
        self._distribute(tid)
        return tid

    def _distribute(self, tid):
        """Reserve task ``tid`` for a ready worker, if there is one.

        The reservation only records the pairing; the worker receives the
        payload on its next ``get_task`` call. Returns ``True`` when a worker
        was reserved, ``False`` otherwise (the task then stays "pending").
        """
        wid = self.get_worker()
        if wid:
            with self.lock:
                self.tasks[tid]["worker_id"] = wid
                self.tasks[tid]["status"] = "assigned"
            print(f"πŸ“€ {tid[:8]} β†’ {wid}")
            return True
        return False

    def submit_result(self, wid, tid, result):
        """Record ``result`` for task ``tid`` and free worker ``wid`` again."""
        with self.lock:
            if tid in self.tasks:
                self.tasks[tid]["result"] = result
                self.tasks[tid]["status"] = "completed"
            if wid in self.workers:
                self.workers[wid]["status"] = "ready"
                self.workers[wid]["tasks"] += 1
        print(f"βœ… {tid[:8]} done")
        return {"status": "ok"}

    def get_task(self, wid):
        """Hand the next task for worker ``wid`` to the caller.

        A task that ``_distribute`` already reserved for ``wid`` is served
        first; without this the reserved task would never reach the worker,
        because it is no longer "pending". Otherwise the oldest pending task is
        claimed. Each task is delivered at most once.

        Returns ``{"id", "type", "data"}`` (NumPy arrays converted to lists),
        or ``{}`` when there is nothing to do.
        """
        with self.lock:
            chosen = None
            first_pending = None
            for tid, task in self.tasks.items():
                if (
                    task["status"] == "assigned"
                    and task["worker_id"] == wid
                    and not task.get("delivered", False)
                ):
                    chosen = tid
                    break
                if first_pending is None and task["status"] == "pending":
                    first_pending = tid
            if chosen is None:
                chosen = first_pending
            if chosen is None:
                return {}

            task = self.tasks[chosen]
            task["status"] = "assigned"
            task["worker_id"] = wid
            task["delivered"] = True
            # A worker that pulled work is busy until it submits the result,
            # so it must not be reserved for further tasks in the meantime.
            if wid in self.workers:
                self.workers[wid]["status"] = "busy"

            data = task["data"]
            if isinstance(data, np.ndarray):
                data = data.tolist()
            return {"id": chosen, "type": task["type"], "data": data}

    def status(self):
        """Return worker/task counters plus a per-worker summary list."""
        with self.lock:
            return {
                "workers": len(self.workers),
                "ready": sum(1 for w in self.workers.values() if w["status"] == "ready"),
                "tasks": len(self.tasks),
                "pending": sum(1 for t in self.tasks.values() if t["status"] == "pending"),
                "done": sum(1 for t in self.tasks.values() if t["status"] == "completed"),
                "worker_list": [
                    {"id": k, "status": v["status"], "tasks": v["tasks"]}
                    for k, v in self.workers.items()
                ],
            }

# Single module-level controller instance used by the UI callbacks and the API routes below.
ctrl = Controller()

# UI Functions
def get_status():
    """Render the controller's current status as a Markdown string.

    Lists the worker and task counters followed by one line per worker
    (id, ready/busy marker, number of submitted results).
    """
    snapshot = ctrl.status()

    worker_lines = []
    for worker in snapshot["worker_list"]:
        marker = '🟒' if worker['status'] == 'ready' else 'πŸ”΄'
        worker_lines.append(f"β€’ {worker['id']}: {marker} ({worker['tasks']})")

    wlist = "<br>".join(worker_lines)
    if not wlist:
        # Placeholder shown when no worker has registered yet.
        wlist = "Keine Worker"

    report = [
        "## Cluster Status",
        f"**Worker:** {snapshot['workers']} ({snapshot['ready']} ready)",
        f"**Tasks:** {snapshot['tasks']} ({snapshot['pending']} pending, {snapshot['done']} done)",
        "",
        "**Worker Liste:**",
        wlist,
    ]
    return "\n".join(report)

def send_task(typ, data):
    """Submit one task from the UI and poll briefly for its result.

    ``data`` is parsed as JSON when it is a string and passed through
    otherwise. The task is polled up to 50 times, 0.1 s apart. Returns a
    message with the result, a "waiting" message if it has not completed by
    then, or an error message if anything raised.
    """
    try:
        if isinstance(data, str):
            payload = json.loads(data)
        else:
            payload = data
        tid = ctrl.submit_task(typ, payload)

        # Wait for the result.
        polls_left = 50
        while polls_left > 0:
            with ctrl.lock:
                entry = ctrl.tasks.get(tid)
                if entry is not None and entry["status"] == "completed":
                    return f"βœ… {tid[:8]}: {entry['result']}"
            time.sleep(0.1)
            polls_left -= 1
        return f"⏳ {tid[:8]}: waiting..."
    except Exception as e:
        return f"❌ {e}"

def run_benchmark(num_chunks):
    """Distribute the sum of the integers 0..999,999 across workers and report.

    The range is cut into ``num_chunks`` contiguous pieces that together cover
    every number exactly once (the first pieces are one element longer when
    the count does not divide evenly). Each piece is submitted as a "sum"
    task; completed results are then collected for at most 60 seconds.

    Args:
        num_chunks: Number of tasks to create. Coerced to ``int`` (the slider
            may deliver a float) and clamped to at least 1.

    Returns:
        A Markdown string with the chunk count, the number of results
        received, the sum of all numeric results, the waiting time and the
        number of tasks per second of waiting.
    """
    total_items = 1_000_000
    num_chunks = max(1, int(num_chunks))
    base, extra = divmod(total_items, num_chunks)

    task_ids = []
    lower = 0
    for i in range(num_chunks):
        # Spread the remainder over the first `extra` chunks so nothing is dropped.
        upper = lower + base + (1 if i < extra else 0)
        task_ids.append(ctrl.submit_task("sum", list(range(lower, upper))))
        lower = upper

    # Wait for all results.
    start = time.monotonic()
    deadline = start + 60
    results = {}
    while True:
        with ctrl.lock:
            for tid in task_ids:
                if tid in results:
                    continue
                task = ctrl.tasks.get(tid)
                if task is not None and task["status"] == "completed":
                    results[tid] = task["result"]
        # Check before sleeping so a finished run is not padded by one interval.
        if len(results) == len(task_ids) or time.monotonic() >= deadline:
            break
        time.sleep(0.2)

    total = sum(r for r in results.values() if isinstance(r, (int, float)))
    elapsed = time.monotonic() - start
    # Guard against a zero reading on coarse clocks.
    rate = len(task_ids) / elapsed if elapsed > 0 else 0.0

    return f"""## Benchmark Ergebnis
**Chunks:** {num_chunks}
**Ergebnisse:** {len(results)}/{len(task_ids)}
**Summe:** {total:,}
**Zeit:** {elapsed:.2f}s
**Speedup:** {rate:.1f}x parallel"""

# Gradio UI: three tabs (status view, single-task form, benchmark launcher).
with gr.Blocks(title="Controller") as demo:
    gr.Markdown("# πŸ€— Cluster Controller")
    with gr.Tabs():
        # Status tab: shows get_status() on page load and on every button click.
        with gr.Tab("Status"):
            btn = gr.Button("πŸ”„ Refresh")
            out = gr.Markdown()
            btn.click(fn=get_status, outputs=out)
            demo.load(fn=get_status, outputs=out)
        
        # Task tab: sends the chosen type and the JSON text to send_task().
        with gr.Tab("Task"):
            t = gr.Dropdown(choices=["sum", "mean"], value="sum", label="Typ")
            d = gr.Textbox(label="Daten (JSON)", value="[1,2,3,4,5]")
            b = gr.Button("Send")
            r = gr.Textbox(label="Result")
            b.click(fn=send_task, inputs=[t,d], outputs=r)
        
        # Benchmark tab: the slider value is passed to run_benchmark() as num_chunks.
        with gr.Tab("πŸš€ Benchmark"):
            chunks = gr.Slider(1, 10, value=10, step=1, label="Anzahl Chunks")
            bench_btn = gr.Button("1 Million Berechnungen starten")
            bench_out = gr.Markdown()
            bench_btn.click(fn=run_benchmark, inputs=chunks, outputs=bench_out)

# API Routes
from fastapi import Request

def setup_api():
    """Register the worker-facing HTTP routes on ``demo.app``.

    Routes:
        POST /api/register        body ``{"worker_id": ...}`` ("unknown" if absent)
        GET  /api/get_task        query parameter ``worker_id``
        POST /api/submit_result   body ``{"worker_id", "task_id", "result"}``
        GET  /api/cluster_status

    Request bodies that are not JSON objects, or that lack a required field,
    are answered with HTTP 400 instead of failing with an unhandled exception.

    NOTE(review): this requires ``demo.app`` to exist at call time. Confirm
    that the installed Gradio version exposes ``app`` before ``launch()``;
    otherwise the routes need to live on a FastAPI app that Gradio is mounted
    onto.
    """
    # Function-scope import: keeps this block self-contained; the file already
    # depends on fastapi for ``Request``.
    from fastapi import HTTPException

    def _require_fields(payload, *fields):
        """Raise HTTP 400 unless ``payload`` is a dict containing ``fields``."""
        if not isinstance(payload, dict):
            raise HTTPException(status_code=400, detail="JSON object expected")
        missing = [name for name in fields if name not in payload]
        if missing:
            raise HTTPException(
                status_code=400,
                detail=f"missing field(s): {', '.join(missing)}",
            )

    @demo.app.post("/api/register")
    async def register(req: Request):
        data = await req.json()
        _require_fields(data)
        return ctrl.register_worker(data.get("worker_id", "unknown"))

    @demo.app.get("/api/get_task")
    async def get_task(worker_id: str):
        return ctrl.get_task(worker_id)

    @demo.app.post("/api/submit_result")
    async def submit_result(req: Request):
        data = await req.json()
        _require_fields(data, "worker_id", "task_id", "result")
        return ctrl.submit_result(data["worker_id"], data["task_id"], data["result"])

    @demo.app.get("/api/cluster_status")
    async def cluster_status():
        return ctrl.status()

# Script entry point: register the API routes, then serve the UI on all interfaces, port 7860.
if __name__ == "__main__":
    # NOTE(review): setup_api() touches demo.app before launch() runs — confirm the app exists at this point.
    setup_api()
    print("πŸš€ Starte Controller...")
    demo.launch(server_name="0.0.0.0", server_port=7860)