Timo123432345443 committed on
Commit
c04d0d9
·
verified ·
1 Parent(s): c58e4a7

Upload controller.py with huggingface_hub

Browse files
Files changed (1) hide show
  1. controller.py +371 -0
controller.py ADDED
@@ -0,0 +1,371 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Hugging Face Spaces Cluster - Controller
3
+ =========================================
4
+ Koordiniert Worker Spaces und verteilt Tasks.
5
+
6
+ Deployment:
7
+ 1. Diese Datei auf Hugging Face Space hochladen
8
+ 2. requirements.txt hochladen
9
+ 3. Space startet automatisch
10
+ """
11
+
12
+ import os
13
+ import time
14
+ import json
15
+ import uuid
16
+ import threading
17
+ import numpy as np
18
+ from collections import defaultdict
19
+ from datetime import datetime
20
+
21
+ import gradio as gr
22
+
23
# Hugging Face configuration, read once from the environment at import time.
_env = os.environ
HF_TOKEN = _env.get("HF_TOKEN", "")
CONTROLLER_ID = _env.get("CONTROLLER_ID", "controller")
SPACE_NAME = _env.get("SPACE_NAME", "")
27
+
28
+ # ============================================
29
+ # Cluster Management
30
+ # ============================================
31
+
32
class ClusterController:
    """Keep track of registered workers and of the tasks handed to them.

    All state is held in memory. Every method that reads or writes the
    worker/task tables does so while holding ``self.lock``.
    """

    def __init__(self):
        # worker_id -> {"status", "last_seen", "tasks_completed"}
        self.workers = {}
        # task_id -> {"type", "data", "status", "created", "worker_id", "result"}
        self.tasks = {}
        # task_id -> result; not written by any method of this class.
        self.results = {}
        # Plain (non-reentrant) lock: helpers named *_locked expect the
        # caller to already hold it and must not acquire it again.
        self.lock = threading.Lock()

    def register_worker(self, worker_id):
        """Register (or re-register) a worker as ready.

        Returns:
            ``{"status": "ok"}``.
        """
        with self.lock:
            self.workers[worker_id] = {
                "status": "ready",
                "last_seen": datetime.now(),
                "tasks_completed": 0,
            }
        print(f"✅ Worker registriert: {worker_id}")
        return {"status": "ok"}

    def _reserve_worker_locked(self):
        """Mark the first ready worker as busy and return its id, else None.

        The caller must hold ``self.lock``.
        """
        for worker_id, info in self.workers.items():
            if info["status"] == "ready":
                info["status"] = "busy"
                return worker_id
        return None

    def get_available_worker(self):
        """Reserve a ready worker.

        The returned worker is flipped to ``"busy"`` as a side effect.

        Returns:
            The worker id, or ``None`` when no worker is ready.
        """
        with self.lock:
            return self._reserve_worker_locked()

    def submit_task(self, task_type, data):
        """Create a new pending task and try to assign it to a ready worker.

        Returns:
            The id (UUID4 string) of the new task.
        """
        task_id = str(uuid.uuid4())

        with self.lock:
            self.tasks[task_id] = {
                "type": task_type,
                "data": data,
                "status": "pending",
                "created": datetime.now(),
                "worker_id": None,
                "result": None,
            }

        # Try to hand the task to a worker right away; it stays pending
        # when none is ready.
        self._distribute_task(task_id)

        return task_id

    def _distribute_task(self, task_id):
        """Assign a pending task to a ready worker.

        This only records the pairing in the task table; nothing is sent to
        the worker from this method.

        The task check, the worker reservation and the task update happen in
        one critical section. Unknown ids and tasks that are no longer
        pending are ignored, so a worker is never left "busy" without a task
        and an already assigned task is never handed to a second worker.

        Returns:
            The id of the chosen worker, or ``None`` if nothing was assigned.
        """
        with self.lock:
            task = self.tasks.get(task_id)
            if task is None or task["status"] != "pending":
                return None

            worker_id = self._reserve_worker_locked()
            if worker_id is None:
                # No worker ready: the task stays pending.
                return None

            task["worker_id"] = worker_id
            task["status"] = "assigned"

        print(f"📤 Task {task_id[:8]} → Worker {worker_id}")
        return worker_id

    def submit_result(self, worker_id, task_id, result):
        """Store the result a worker reports for a task.

        A known worker is set back to ``"ready"`` in every case. Its
        completed-task counter is only incremented when the task exists.

        Returns:
            ``{"status": "ok"}`` when the task was found and completed,
            ``{"status": "not_found"}`` for an unknown ``task_id``.
        """
        with self.lock:
            worker = self.workers.get(worker_id)
            if worker is not None:
                worker["status"] = "ready"

            task = self.tasks.get(task_id)
            if task is None:
                return {"status": "not_found"}

            task["result"] = result
            task["status"] = "completed"
            if worker is not None:
                worker["tasks_completed"] += 1

        print(f"✅ Task {task_id[:8]} abgeschlossen von {worker_id}")
        return {"status": "ok"}

    def get_task_status(self, task_id):
        """Return status, result and worker of a task.

        Returns:
            A dict with ``task_id``, ``status``, ``result`` and ``worker_id``,
            or ``{"status": "not_found"}`` for an unknown id.
        """
        with self.lock:
            task = self.tasks.get(task_id)
            if task is None:
                return {"status": "not_found"}
            return {
                "task_id": task_id,
                "status": task["status"],
                "result": task["result"],
                "worker_id": task["worker_id"],
            }

    def get_cluster_status(self):
        """Return worker and task counters plus a per-worker summary list."""
        with self.lock:
            worker_states = [w["status"] for w in self.workers.values()]
            task_states = [t["status"] for t in self.tasks.values()]

            return {
                "workers": {
                    "total": len(worker_states),
                    "ready": worker_states.count("ready"),
                    "busy": worker_states.count("busy"),
                },
                "tasks": {
                    "total": len(task_states),
                    "pending": task_states.count("pending"),
                    "completed": task_states.count("completed"),
                },
                "worker_list": [
                    {"id": wid, "status": info["status"], "tasks": info["tasks_completed"]}
                    for wid, info in self.workers.items()
                ],
            }

    def process_batch(self, task_type, data_chunks, timeout=60, poll_interval=0.5):
        """Submit one task per chunk and wait for all results.

        Args:
            task_type: Task type stored on every created task.
            data_chunks: Iterable of payloads, one task is created per item.
            timeout: Seconds to wait for the whole batch (shared deadline).
            poll_interval: Seconds to sleep between status polls.

        Returns:
            A list with one entry per chunk, in submission order: the task's
            result, or ``{"error": "timeout"}`` if it was not completed
            before the deadline.
        """
        task_ids = [self.submit_task(task_type, chunk) for chunk in data_chunks]

        # Monotonic clock: the deadline is unaffected by wall-clock changes.
        deadline = time.monotonic() + timeout
        results = []

        for task_id in task_ids:
            while True:
                status = self.get_task_status(task_id)
                # Check for completion before the deadline so a finished
                # task is never reported as a timeout.
                if status["status"] == "completed":
                    results.append(status["result"])
                    break
                if status["status"] == "pending":
                    # Retry the assignment; a worker may have become ready.
                    self._distribute_task(task_id)

                if time.monotonic() >= deadline:
                    results.append({"error": "timeout"})
                    break

                time.sleep(poll_interval)

        return results


# Global controller instance shared by the UI callbacks and API handlers.
controller = ClusterController()
191
+
192
+ # ============================================
193
+ # Gradio Interface
194
+ # ============================================
195
+
196
def ui_submit_task(task_type, data_str):
    """UI callback: parse a JSON payload and submit it as a new task.

    Args:
        task_type: Task type selected in the UI.
        data_str: JSON text; a top-level JSON list is converted to a NumPy
            array before submission.

    Returns:
        A message containing the full task id, or an error message when
        parsing or submission fails.
    """
    import numpy as np

    try:
        data = json.loads(data_str)
        if isinstance(data, list):
            data = np.array(data)

        task_id = controller.submit_task(task_type, data)
        # Show the complete id: status lookups match the full task id, so a
        # truncated prefix could not be used to query the task afterwards.
        return f"✅ Task submitted: `{task_id}`"
    except Exception as e:
        # UI boundary: report any failure in the output field instead of
        # raising into Gradio.
        return f"❌ Error: {e}"
209
+
210
def ui_get_status():
    """UI callback: render the current cluster status as Markdown text."""
    snapshot = controller.get_cluster_status()
    worker_counts = snapshot["workers"]
    task_counts = snapshot["tasks"]

    # One line per worker: green marker when ready, red otherwise.
    worker_lines = []
    for entry in snapshot["worker_list"]:
        marker = "🟢" if entry["status"] == "ready" else "🔴"
        worker_lines.append(f" • {entry['id']}: {marker} ({entry['tasks']} Tasks)")
    workers_html = "<br>".join(worker_lines) or " Keine Worker registriert"

    return f"""
## Cluster Status

### Workers
- Gesamt: {worker_counts['total']}
- Bereit: {worker_counts['ready']} 🟢
- Beschäftigt: {worker_counts['busy']} 🔴

### Tasks
- Gesamt: {task_counts['total']}
- Pending: {task_counts['pending']}
- Abgeschlossen: {task_counts['completed']}

### Worker Liste
{workers_html}
"""
235
+
236
def ui_check_task(task_id):
    """UI callback: return a task's status as pretty-printed JSON text."""
    task_info = controller.get_task_status(task_id)
    # default=str stringifies any value json cannot encode natively.
    rendered = json.dumps(task_info, default=str, indent=2)
    return rendered
240
+
241
def ui_process_batch(num_chunks):
    """UI callback: batch demo that splits random data and sums the chunks.

    Generates 10000 random values, splits them into ``num_chunks`` chunks,
    runs them through the controller as "sum" tasks and reports the
    numeric results.
    """
    import numpy as np

    # Split the data into chunks.
    chunk_count = int(num_chunks)
    chunks = np.array_split(np.random.random(10000), chunk_count)

    # Process the batch.
    results = controller.process_batch("sum", chunks)

    # Aggregate: only plain numbers count as valid results.
    valid_results = []
    for item in results:
        if isinstance(item, (int, float)):
            valid_results.append(item)
    total = sum(valid_results)

    preview = [f'{r:.4f}' for r in valid_results[:5]]
    suffix = '...' if len(valid_results) > 5 else ''

    return f"""
### Batch-Ergebnis

- Chunks: {len(chunks)}
- Ergebnisse: {len(valid_results)}/{len(results)}
- Summe: {total:.4f}
- Durchschnitte: {preview}{suffix}
"""
264
+
265
# Gradio UI: one tab each for cluster status, single-task submission,
# the batch demo and a static description of the worker API.
with gr.Blocks(title="Cluster Controller") as demo:
    gr.Markdown("# 🤗 Hugging Face Spaces Cluster Controller")

    with gr.Tabs():
        with gr.Tab("Cluster Status"):
            status_btn = gr.Button("Status aktualisieren")
            # Rendered once at build time; refreshed by the button and the
            # periodic load event below.
            status_output = gr.Markdown(ui_get_status())
            status_btn.click(ui_get_status, outputs=status_output)

        with gr.Tab("Task Submit"):
            task_type = gr.Dropdown(
                choices=["sum", "mean", "matrix_multiply", "inference"],
                value="sum",
                label="Task Typ"
            )
            data_input = gr.Textbox(
                label="Daten (JSON)",
                placeholder="[1, 2, 3, 4, 5]",
                value="[1, 2, 3, 4, 5]"
            )
            submit_btn = gr.Button("Task absenden")
            task_result = gr.Textbox(label="Ergebnis")
            submit_btn.click(ui_submit_task, inputs=[task_type, data_input], outputs=task_result)

        with gr.Tab("Batch Processing"):
            num_chunks = gr.Slider(1, 10, value=3, step=1, label="Anzahl Chunks")
            batch_btn = gr.Button("Batch starten")
            batch_output = gr.Markdown()
            batch_btn.click(ui_process_batch, inputs=num_chunks, outputs=batch_output)

        with gr.Tab("API Info"):
            gr.Markdown("""
## API Endpoints

```
POST /api/register
{"worker_id": "worker-1"}

GET /api/get_task?worker_id=worker-1

POST /api/submit_result
{"worker_id": "worker-1", "task_id": "...", "result": 42}

GET /api/task_status?task_id=...

GET /api/cluster_status
```
""")

    # Auto-refresh every 5 seconds.
    # NOTE(review): kept inside the Blocks context, where event listeners
    # have to be registered — confirm against the original layout.
    demo.load(ui_get_status, outputs=status_output, every=5)
317
+
318
# ============================================
# FastAPI backend (for worker communication)
# ============================================

from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse

# Application object the /api/* worker endpoints below are registered on.
app = FastAPI()
326
+
327
@app.post("/api/register")
async def register_worker(request: Request):
    """Register the worker named in the JSON body.

    Expects ``{"worker_id": "<id>"}``. Responds with HTTP 400 when the body
    is not valid JSON or has no usable ``worker_id``, instead of failing
    with an unhandled exception.
    """
    try:
        data = await request.json()
    except ValueError:
        # json.JSONDecodeError is a ValueError subclass.
        return JSONResponse(
            {"status": "error", "detail": "request body must be valid JSON"},
            status_code=400,
        )

    worker_id = data.get("worker_id") if isinstance(data, dict) else None
    if not worker_id:
        return JSONResponse(
            {"status": "error", "detail": "worker_id is required"},
            status_code=400,
        )

    return controller.register_worker(worker_id)
331
+
332
@app.get("/api/get_task")
async def get_task(worker_id: str):
    """Hand the next task to the polling worker.

    A task is delivered when it is still pending, or when it is already
    assigned to this worker but has not been delivered yet. Each task is
    delivered at most once; delivery is recorded under the task's
    ``"delivered"`` key.

    Returns:
        ``{"id", "type", "data"}`` for the delivered task, or ``{}`` when
        there is nothing for this worker.
    """
    # Simple linear scan - a queue would be better in production.
    with controller.lock:
        for task_id, task in controller.tasks.items():
            is_pending = task["status"] == "pending"
            # Tasks assigned to this worker ahead of time would otherwise
            # never reach it, because only pending tasks were returned.
            is_waiting_for_worker = (
                task["status"] == "assigned"
                and task["worker_id"] == worker_id
                and not task.get("delivered")
            )
            if not (is_pending or is_waiting_for_worker):
                continue

            task["status"] = "assigned"
            task["worker_id"] = worker_id
            task["delivered"] = True

            data = task["data"]
            # NumPy arrays/scalars are not JSON-serialisable; convert them
            # to plain Python values for the response.
            if hasattr(data, "tolist"):
                data = data.tolist()
            return {"id": task_id, "type": task["type"], "data": data}
    return {}
342
+
343
@app.post("/api/submit_result")
async def submit_result(request: Request):
    """Store the result a worker reports for a task.

    Expects ``{"worker_id": ..., "task_id": ..., "result": ...}``. Responds
    with HTTP 400 when the body is not valid JSON or a field is missing,
    instead of failing with an unhandled exception.
    """
    try:
        data = await request.json()
    except ValueError:
        # json.JSONDecodeError is a ValueError subclass.
        return JSONResponse(
            {"status": "error", "detail": "request body must be valid JSON"},
            status_code=400,
        )

    required = ("worker_id", "task_id", "result")
    if isinstance(data, dict):
        missing = [key for key in required if key not in data]
    else:
        missing = list(required)
    if missing:
        return JSONResponse(
            {"status": "error", "detail": f"missing field(s): {', '.join(missing)}"},
            status_code=400,
        )

    return controller.submit_result(
        data["worker_id"],
        data["task_id"],
        data["result"],
    )
351
+
352
@app.get("/api/task_status")
async def task_status(task_id: str):
    """Return the controller's status record for the given task id."""
    status_record = controller.get_task_status(task_id)
    return status_record
355
+
356
@app.get("/api/cluster_status")
async def cluster_status():
    """Return the controller's worker/task overview."""
    overview = controller.get_cluster_status()
    return overview
359
+
360
+ # ============================================
361
+ # Main
362
+ # ============================================
363
+
364
if __name__ == "__main__":
    import uvicorn

    print(f"🚀 Starte Cluster Controller: {CONTROLLER_ID}")
    print(f" Space: {SPACE_NAME}")

    # Start Gradio + FastAPI together. demo.launch() alone would serve only
    # the Gradio UI, leaving the /api/* routes registered on `app`
    # unreachable. Mounting the UI into the FastAPI app and running that
    # app serves both on the same port.
    app = gr.mount_gradio_app(app, demo, path="/")
    uvicorn.run(app, host="0.0.0.0", port=7860)