Timo123432345443 commited on
Commit
5ac7273
Β·
verified Β·
1 Parent(s): 5ab52dd

Upload app.py with huggingface_hub

Browse files
Files changed (1) hide show
  1. app.py +85 -106
app.py CHANGED
@@ -1,6 +1,6 @@
1
  """
2
  Hugging Face Spaces Cluster - Controller
3
- Basierend auf simple_cluster.py
4
  """
5
 
6
  import os
@@ -8,22 +8,22 @@ import time
8
  import uuid
9
  import threading
10
  import json
11
- from http.server import HTTPServer, BaseHTTPRequestHandler
12
- from urllib.parse import urlparse, parse_qs
13
  import gradio as gr
14
 
15
  CONTROLLER_ID = os.getenv("CONTROLLER_ID", "controller")
16
 
17
- class SimpleController:
18
  def __init__(self):
19
  self.workers = {}
20
  self.tasks = {}
21
  self.lock = threading.Lock()
22
 
23
- def add_worker(self, worker_id, address=""):
24
  with self.lock:
25
- self.workers[worker_id] = {"address": address, "status": "ready", "tasks": 0}
26
- print(f"βœ… Worker registriert: {worker_id}")
 
27
 
28
  def get_worker(self):
29
  with self.lock:
@@ -33,130 +33,109 @@ class SimpleController:
33
  return wid
34
  return None
35
 
36
- def submit_task(self, task_type, data):
37
- task_id = str(uuid.uuid4())
38
  with self.lock:
39
- self.tasks[task_id] = {"type": task_type, "data": data, "status": "pending", "worker_id": None, "result": None}
40
- self._distribute(task_id)
41
- return task_id
42
 
43
- def _distribute(self, task_id):
44
- worker_id = self.get_worker()
45
- if worker_id:
46
  with self.lock:
47
- self.tasks[task_id]["worker_id"] = worker_id
48
- self.tasks[task_id]["status"] = "assigned"
49
- print(f"πŸ“€ Task {task_id[:8]} β†’ {worker_id}")
50
 
51
- def submit_result(self, worker_id, task_id, result):
52
  with self.lock:
53
- if task_id in self.tasks:
54
- self.tasks[task_id]["result"] = result
55
- self.tasks[task_id]["status"] = "completed"
56
- if worker_id in self.workers:
57
- self.workers[worker_id]["status"] = "ready"
58
- self.workers[worker_id]["tasks"] += 1
59
- print(f"βœ… Task {task_id[:8]} abgeschlossen")
60
-
61
- def get_status(self):
 
 
 
 
 
 
 
 
 
 
62
  with self.lock:
63
  return {
64
- "workers": {"total": len(self.workers), "ready": sum(1 for w in self.workers.values() if w["status"] == "ready")},
65
- "tasks": {"total": len(self.tasks), "completed": sum(1 for t in self.tasks.values() if t["status"] == "completed")},
66
- "worker_list": [{"id": wid, "status": info["status"], "tasks": info["tasks"]} for wid, info in self.workers.items()]
 
 
67
  }
68
 
69
- controller = SimpleController()
70
-
71
- # HTTP Server fΓΌr Worker API
72
- class ControllerHandler(BaseHTTPRequestHandler):
73
- def do_POST(self):
74
- length = int(self.headers.get('Content-Length', 0))
75
- body = json.loads(self.rfile.read(length).decode()) if length else {}
76
-
77
- if self.path == "/api/register":
78
- controller.add_worker(body.get("worker_id", "unknown"))
79
- self.send_json({"status": "ok"})
80
- elif self.path == "/api/submit_result":
81
- controller.submit_result(body["worker_id"], body["task_id"], body["result"])
82
- self.send_json({"status": "ok"})
83
- else:
84
- self.send_error(404)
85
-
86
- def do_GET(self):
87
- params = parse_qs(urlparse(self.path).query)
88
-
89
- if self.path.startswith("/api/get_task"):
90
- worker_id = params.get("worker_id", [""])[0]
91
- with controller.lock:
92
- for tid, t in controller.tasks.items():
93
- if t["status"] == "pending":
94
- t["status"] = "assigned"
95
- t["worker_id"] = worker_id
96
- self.send_json({"id": tid, "type": t["type"], "data": t["data"]})
97
- return
98
- self.send_json({})
99
- elif self.path == "/api/cluster_status":
100
- self.send_json(controller.get_status())
101
- else:
102
- self.send_response(200)
103
- self.send_header("Content-Type", "text/html")
104
- self.end_headers()
105
- self.wfile.write(b"<h1>Cluster Controller</h1>")
106
-
107
- def send_json(self, data):
108
- self.send_response(200)
109
- self.send_header("Content-Type", "application/json")
110
- self.end_headers()
111
- self.wfile.write(json.dumps(data).encode())
112
-
113
- def log_message(self, *args): pass
114
-
115
- def run_server(port=7861):
116
- server = HTTPServer(("0.0.0.0", port), ControllerHandler)
117
- print(f"🌐 API Server lÀuft auf Port {port}")
118
- server.serve_forever()
119
 
120
- # UI Functions
121
  def get_status():
122
- s = controller.get_status()
123
- workers = "<br>".join([f"β€’ {w['id']}: {'🟒' if w['status']=='ready' else 'πŸ”΄'} ({w['tasks']} Tasks)" for w in s["worker_list"]]) or "Keine Worker"
124
- return f"""## Cluster Status
125
- **Worker:** {s['workers']['total']} ({s['workers']['ready']} ready)
126
- **Tasks:** {s['tasks']['total']} ({s['tasks']['completed']} done)
127
 
128
- **Worker:**
129
- {workers}"""
130
 
131
- def submit_task(typ, data):
132
  try:
133
- d = json.loads(data)
134
- tid = controller.submit_task(typ, d)
135
- return f"βœ… Task: `{tid[:8]}`"
136
  except Exception as e:
137
  return f"❌ {e}"
138
 
139
- # Gradio UI
140
- with gr.Blocks(title="Cluster Controller") as demo:
141
  gr.Markdown("# πŸ€— Cluster Controller")
142
  with gr.Tabs():
143
  with gr.Tab("Status"):
144
- btn = gr.Button("πŸ”„ Refresh")
145
  out = gr.Markdown()
146
  btn.click(fn=get_status, outputs=out)
147
  demo.load(fn=get_status, outputs=out)
148
  with gr.Tab("Task"):
149
  t = gr.Dropdown(choices=["sum", "mean"], value="sum", label="Typ")
150
- d = gr.Textbox(label="Daten (JSON)", value="[1,2,3,4,5]")
151
- b = gr.Button("Senden")
152
- r = gr.Textbox(label="Ergebnis")
153
- b.click(fn=submit_task, inputs=[t,d], outputs=r)
154
- with gr.Tab("Info"):
155
- gr.Markdown(f"**API:** http://0.0.0.0:7861\n- POST /api/register\n- GET /api/get_task?worker_id=x\n- POST /api/submit_result")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
156
 
157
  if __name__ == "__main__":
158
- # Start HTTP Server im Hintergrund
159
- threading.Thread(target=run_server, daemon=True).start()
160
- time.sleep(1)
161
- print("πŸš€ Starte Controller UI...")
162
  demo.launch(server_name="0.0.0.0", server_port=7860)
 
1
  """
2
  Hugging Face Spaces Cluster - Controller
3
+ Alles auf Port 7860 (Gradio + API)
4
  """
5
 
6
  import os
 
8
  import uuid
9
  import threading
10
  import json
11
+ import numpy as np
 
12
  import gradio as gr
13
 
14
  CONTROLLER_ID = os.getenv("CONTROLLER_ID", "controller")
15
 
16
+ class Controller:
17
  def __init__(self):
18
  self.workers = {}
19
  self.tasks = {}
20
  self.lock = threading.Lock()
21
 
22
+ def register_worker(self, worker_id):
23
  with self.lock:
24
+ self.workers[worker_id] = {"status": "ready", "tasks": 0}
25
+ print(f"βœ… Worker: {worker_id}")
26
+ return {"status": "ok"}
27
 
28
  def get_worker(self):
29
  with self.lock:
 
33
  return wid
34
  return None
35
 
36
+ def submit_task(self, typ, data):
37
+ tid = str(uuid.uuid4())
38
  with self.lock:
39
+ self.tasks[tid] = {"type": typ, "data": data, "status": "pending", "result": None}
40
+ self._distribute(tid)
41
+ return tid
42
 
43
+ def _distribute(self, tid):
44
+ wid = self.get_worker()
45
+ if wid:
46
  with self.lock:
47
+ self.tasks[tid]["worker_id"] = wid
48
+ self.tasks[tid]["status"] = "assigned"
49
+ print(f"πŸ“€ {tid[:8]} β†’ {wid}")
50
 
51
+ def submit_result(self, wid, tid, result):
52
  with self.lock:
53
+ if tid in self.tasks:
54
+ self.tasks[tid]["result"] = result
55
+ self.tasks[tid]["status"] = "completed"
56
+ if wid in self.workers:
57
+ self.workers[wid]["status"] = "ready"
58
+ self.workers[wid]["tasks"] += 1
59
+ print(f"βœ… {tid[:8]} done")
60
+ return {"status": "ok"}
61
+
62
+ def get_task(self, wid):
63
+ with self.lock:
64
+ for tid, t in self.tasks.items():
65
+ if t["status"] == "pending":
66
+ t["status"] = "assigned"
67
+ t["worker_id"] = wid
68
+ return {"id": tid, "type": t["type"], "data": t["data"]}
69
+ return {}
70
+
71
+ def status(self):
72
  with self.lock:
73
  return {
74
+ "workers": len(self.workers),
75
+ "ready": sum(1 for w in self.workers.values() if w["status"] == "ready"),
76
+ "tasks": len(self.tasks),
77
+ "done": sum(1 for t in self.tasks.values() if t["status"] == "completed"),
78
+ "worker_list": [{"id": k, "status": v["status"], "tasks": v["tasks"]} for k,v in self.workers.items()]
79
  }
80
 
81
+ ctrl = Controller()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
82
 
83
+ # UI
84
  def get_status():
85
+ s = ctrl.status()
86
+ wlist = "<br>".join([f"β€’ {w['id']}: {'🟒' if w['status']=='ready' else 'πŸ”΄'} ({w['tasks']})" for w in s["worker_list"]]) or "Keine"
87
+ return f"""## Status
88
+ Worker: {s['workers']} ({s['ready']} ready)
89
+ Tasks: {s['tasks']} ({s['done']} done)
90
 
91
+ {wlist}"""
 
92
 
93
+ def send_task(typ, data):
94
  try:
95
+ d = json.loads(data) if isinstance(data, str) else data
96
+ tid = ctrl.submit_task(typ, d)
97
+ return f"βœ… {tid[:8]}"
98
  except Exception as e:
99
  return f"❌ {e}"
100
 
101
+ with gr.Blocks(title="Controller") as demo:
 
102
  gr.Markdown("# πŸ€— Cluster Controller")
103
  with gr.Tabs():
104
  with gr.Tab("Status"):
105
+ btn = gr.Button("πŸ”„")
106
  out = gr.Markdown()
107
  btn.click(fn=get_status, outputs=out)
108
  demo.load(fn=get_status, outputs=out)
109
  with gr.Tab("Task"):
110
  t = gr.Dropdown(choices=["sum", "mean"], value="sum", label="Typ")
111
+ d = gr.Textbox(label="Daten", value="[1,2,3,4,5]")
112
+ b = gr.Button("Send")
113
+ r = gr.Textbox(label="Result")
114
+ b.click(fn=send_task, inputs=[t,d], outputs=r)
115
+
116
+ # API Routes - direkt an Gradio App hΓ€ngen
117
+ from fastapi import Request
118
+
119
+ def setup_api():
120
+ @demo.app.post("/api/register")
121
+ async def register(req: Request):
122
+ data = await req.json()
123
+ return ctrl.register_worker(data.get("worker_id", "unknown"))
124
+
125
+ @demo.app.get("/api/get_task")
126
+ async def get_task(worker_id: str):
127
+ return ctrl.get_task(worker_id)
128
+
129
+ @demo.app.post("/api/submit_result")
130
+ async def submit_result(req: Request):
131
+ data = await req.json()
132
+ return ctrl.submit_result(data["worker_id"], data["task_id"], data["result"])
133
+
134
+ @demo.app.get("/api/cluster_status")
135
+ async def cluster_status():
136
+ return ctrl.status()
137
 
138
  if __name__ == "__main__":
139
+ setup_api()
140
+ print("πŸš€ Start...")
 
 
141
  demo.launch(server_name="0.0.0.0", server_port=7860)