Timo123432345443 commited on
Commit
e741479
Β·
verified Β·
1 Parent(s): 5ac7273

Upload app.py with huggingface_hub

Browse files
Files changed (1) hide show
  1. app.py +66 -18
app.py CHANGED
@@ -1,6 +1,5 @@
1
  """
2
- Hugging Face Spaces Cluster - Controller
3
- Alles auf Port 7860 (Gradio + API)
4
  """
5
 
6
  import os
@@ -11,8 +10,6 @@ import json
11
  import numpy as np
12
  import gradio as gr
13
 
14
- CONTROLLER_ID = os.getenv("CONTROLLER_ID", "controller")
15
-
16
  class Controller:
17
  def __init__(self):
18
  self.workers = {}
@@ -21,7 +18,10 @@ class Controller:
21
 
22
  def register_worker(self, worker_id):
23
  with self.lock:
24
- self.workers[worker_id] = {"status": "ready", "tasks": 0}
 
 
 
25
  print(f"βœ… Worker: {worker_id}")
26
  return {"status": "ok"}
27
 
@@ -36,7 +36,7 @@ class Controller:
36
  def submit_task(self, typ, data):
37
  tid = str(uuid.uuid4())
38
  with self.lock:
39
- self.tasks[tid] = {"type": typ, "data": data, "status": "pending", "result": None}
40
  self._distribute(tid)
41
  return tid
42
 
@@ -47,6 +47,8 @@ class Controller:
47
  self.tasks[tid]["worker_id"] = wid
48
  self.tasks[tid]["status"] = "assigned"
49
  print(f"πŸ“€ {tid[:8]} β†’ {wid}")
 
 
50
 
51
  def submit_result(self, wid, tid, result):
52
  with self.lock:
@@ -65,7 +67,7 @@ class Controller:
65
  if t["status"] == "pending":
66
  t["status"] = "assigned"
67
  t["worker_id"] = wid
68
- return {"id": tid, "type": t["type"], "data": t["data"]}
69
  return {}
70
 
71
  def status(self):
@@ -74,46 +76,92 @@ class Controller:
74
  "workers": len(self.workers),
75
  "ready": sum(1 for w in self.workers.values() if w["status"] == "ready"),
76
  "tasks": len(self.tasks),
 
77
  "done": sum(1 for t in self.tasks.values() if t["status"] == "completed"),
78
  "worker_list": [{"id": k, "status": v["status"], "tasks": v["tasks"]} for k,v in self.workers.items()]
79
  }
80
 
81
  ctrl = Controller()
82
 
83
- # UI
84
  def get_status():
85
  s = ctrl.status()
86
- wlist = "<br>".join([f"β€’ {w['id']}: {'🟒' if w['status']=='ready' else 'πŸ”΄'} ({w['tasks']})" for w in s["worker_list"]]) or "Keine"
87
- return f"""## Status
88
- Worker: {s['workers']} ({s['ready']} ready)
89
- Tasks: {s['tasks']} ({s['done']} done)
90
 
 
91
  {wlist}"""
92
 
93
  def send_task(typ, data):
94
  try:
95
  d = json.loads(data) if isinstance(data, str) else data
96
  tid = ctrl.submit_task(typ, d)
97
- return f"βœ… {tid[:8]}"
 
 
 
 
 
 
98
  except Exception as e:
99
  return f"❌ {e}"
100
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
101
  with gr.Blocks(title="Controller") as demo:
102
  gr.Markdown("# πŸ€— Cluster Controller")
103
  with gr.Tabs():
104
  with gr.Tab("Status"):
105
- btn = gr.Button("πŸ”„")
106
  out = gr.Markdown()
107
  btn.click(fn=get_status, outputs=out)
108
  demo.load(fn=get_status, outputs=out)
 
109
  with gr.Tab("Task"):
110
  t = gr.Dropdown(choices=["sum", "mean"], value="sum", label="Typ")
111
- d = gr.Textbox(label="Daten", value="[1,2,3,4,5]")
112
  b = gr.Button("Send")
113
  r = gr.Textbox(label="Result")
114
  b.click(fn=send_task, inputs=[t,d], outputs=r)
115
-
116
- # API Routes - direkt an Gradio App hΓ€ngen
 
 
 
 
 
 
117
  from fastapi import Request
118
 
119
  def setup_api():
@@ -137,5 +185,5 @@ def setup_api():
137
 
138
  if __name__ == "__main__":
139
  setup_api()
140
- print("πŸš€ Start...")
141
  demo.launch(server_name="0.0.0.0", server_port=7860)
 
1
  """
2
+ Hugging Face Spaces Cluster - Controller mit Task API
 
3
  """
4
 
5
  import os
 
10
  import numpy as np
11
  import gradio as gr
12
 
 
 
13
  class Controller:
14
  def __init__(self):
15
  self.workers = {}
 
18
 
19
  def register_worker(self, worker_id):
20
  with self.lock:
21
+ if worker_id not in self.workers:
22
+ self.workers[worker_id] = {"status": "ready", "tasks": 0}
23
+ else:
24
+ self.workers[worker_id]["status"] = "ready"
25
  print(f"βœ… Worker: {worker_id}")
26
  return {"status": "ok"}
27
 
 
36
  def submit_task(self, typ, data):
37
  tid = str(uuid.uuid4())
38
  with self.lock:
39
+ self.tasks[tid] = {"type": typ, "data": data, "status": "pending", "result": None, "worker_id": None}
40
  self._distribute(tid)
41
  return tid
42
 
 
47
  self.tasks[tid]["worker_id"] = wid
48
  self.tasks[tid]["status"] = "assigned"
49
  print(f"πŸ“€ {tid[:8]} β†’ {wid}")
50
+ return True
51
+ return False
52
 
53
  def submit_result(self, wid, tid, result):
54
  with self.lock:
 
67
  if t["status"] == "pending":
68
  t["status"] = "assigned"
69
  t["worker_id"] = wid
70
+ return {"id": tid, "type": t["type"], "data": t["data"].tolist() if isinstance(t["data"], np.ndarray) else t["data"]}
71
  return {}
72
 
73
  def status(self):
 
76
  "workers": len(self.workers),
77
  "ready": sum(1 for w in self.workers.values() if w["status"] == "ready"),
78
  "tasks": len(self.tasks),
79
+ "pending": sum(1 for t in self.tasks.values() if t["status"] == "pending"),
80
  "done": sum(1 for t in self.tasks.values() if t["status"] == "completed"),
81
  "worker_list": [{"id": k, "status": v["status"], "tasks": v["tasks"]} for k,v in self.workers.items()]
82
  }
83
 
84
  ctrl = Controller()
85
 
86
+ # UI Functions
87
  def get_status():
88
  s = ctrl.status()
89
+ wlist = "<br>".join([f"β€’ {w['id']}: {'🟒' if w['status']=='ready' else 'πŸ”΄'} ({w['tasks']})" for w in s["worker_list"]]) or "Keine Worker"
90
+ return f"""## Cluster Status
91
+ **Worker:** {s['workers']} ({s['ready']} ready)
92
+ **Tasks:** {s['tasks']} ({s['pending']} pending, {s['done']} done)
93
 
94
+ **Worker Liste:**
95
  {wlist}"""
96
 
97
  def send_task(typ, data):
98
  try:
99
  d = json.loads(data) if isinstance(data, str) else data
100
  tid = ctrl.submit_task(typ, d)
101
+ # Warte auf Ergebnis
102
+ for _ in range(50):
103
+ with ctrl.lock:
104
+ if tid in ctrl.tasks and ctrl.tasks[tid]["status"] == "completed":
105
+ return f"βœ… {tid[:8]}: {ctrl.tasks[tid]['result']}"
106
+ time.sleep(0.1)
107
+ return f"⏳ {tid[:8]}: waiting..."
108
  except Exception as e:
109
  return f"❌ {e}"
110
 
111
+ def run_benchmark(num_chunks):
112
+ """1 Million Berechnungen auf Worker verteilen"""
113
+ chunk_size = 1000000 // num_chunks
114
+ task_ids = []
115
+
116
+ for i in range(num_chunks):
117
+ data = list(range(i * chunk_size, (i + 1) * chunk_size))
118
+ tid = ctrl.submit_task("sum", data)
119
+ task_ids.append(tid)
120
+
121
+ # Warte auf alle
122
+ start = time.time()
123
+ results = []
124
+ while len(results) < len(task_ids) and time.time() - start < 60:
125
+ for tid in task_ids:
126
+ with ctrl.lock:
127
+ if tid in ctrl.tasks and ctrl.tasks[tid]["status"] == "completed":
128
+ if tid not in [r[0] for r in results]:
129
+ results.append((tid, ctrl.tasks[tid]["result"]))
130
+ time.sleep(0.2)
131
+
132
+ total = sum(r[1] for r in results if isinstance(r[1], (int, float)))
133
+ elapsed = time.time() - start
134
+
135
+ return f"""## Benchmark Ergebnis
136
+ **Chunks:** {num_chunks}
137
+ **Ergebnisse:** {len(results)}/{len(task_ids)}
138
+ **Summe:** {total:,}
139
+ **Zeit:** {elapsed:.2f}s
140
+ **Speedup:** {len(task_ids)/elapsed:.1f}x parallel"""
141
+
142
  with gr.Blocks(title="Controller") as demo:
143
  gr.Markdown("# πŸ€— Cluster Controller")
144
  with gr.Tabs():
145
  with gr.Tab("Status"):
146
+ btn = gr.Button("πŸ”„ Refresh")
147
  out = gr.Markdown()
148
  btn.click(fn=get_status, outputs=out)
149
  demo.load(fn=get_status, outputs=out)
150
+
151
  with gr.Tab("Task"):
152
  t = gr.Dropdown(choices=["sum", "mean"], value="sum", label="Typ")
153
+ d = gr.Textbox(label="Daten (JSON)", value="[1,2,3,4,5]")
154
  b = gr.Button("Send")
155
  r = gr.Textbox(label="Result")
156
  b.click(fn=send_task, inputs=[t,d], outputs=r)
157
+
158
+ with gr.Tab("πŸš€ Benchmark"):
159
+ chunks = gr.Slider(1, 10, value=10, step=1, label="Anzahl Chunks")
160
+ bench_btn = gr.Button("1 Million Berechnungen starten")
161
+ bench_out = gr.Markdown()
162
+ bench_btn.click(fn=run_benchmark, inputs=chunks, outputs=bench_out)
163
+
164
+ # API Routes
165
  from fastapi import Request
166
 
167
  def setup_api():
 
185
 
186
  if __name__ == "__main__":
187
  setup_api()
188
+ print("πŸš€ Starte Controller...")
189
  demo.launch(server_name="0.0.0.0", server_port=7860)