Timo123432345443 commited on
Commit
62c45f9
·
verified ·
1 Parent(s): 57a1238

Upload app.py with huggingface_hub

Browse files
Files changed (1) hide show
  1. app.py +139 -99
app.py CHANGED
@@ -2,18 +2,16 @@
2
  Hugging Face Spaces Cluster - Worker App
3
  =========================================
4
  Läuft auf jedem Worker Space und führt Tasks aus.
5
-
6
- Deployment:
7
- 1. Diese Datei als app.py auf Hugging Face Space hochladen
8
- 2. requirements.txt hochladen
9
- 3. Space startet automatisch
10
  """
11
 
12
  import os
13
  import time
14
  import requests
15
  import numpy as np
16
- from huggingface_hub import HfApi
 
 
 
17
 
18
  # Hugging Face Konfiguration
19
  HF_TOKEN = os.getenv("HF_TOKEN", "")
@@ -21,8 +19,13 @@ WORKER_ID = os.getenv("WORKER_ID", "worker-1")
21
  CONTROLLER_URL = os.getenv("CONTROLLER_URL", "")
22
  SPACE_NAME = os.getenv("SPACE_NAME", "")
23
 
24
- # API für Task-Empfang
25
- import gradio as gr
 
 
 
 
 
26
 
27
  # ============================================
28
  # Task-Verarbeitung
@@ -32,44 +35,135 @@ def process_task(task_type, data):
32
  """Verarbeitet einen Task vom Controller"""
33
  print(f"[{WORKER_ID}] Verarbeite Task: {task_type}")
34
 
 
 
 
35
  if task_type == "sum":
36
- result = np.sum(data)
37
  elif task_type == "mean":
38
- result = np.mean(data)
39
  elif task_type == "matrix_multiply":
40
  a, b = data
41
- result = np.dot(a, b)
42
  elif task_type == "inference":
43
- # Beispiel: ML-Inferenz
44
- result = np.mean(data) # Platzhalter
45
  else:
46
  result = None
47
 
48
- print(f"[{WORKER_ID}] Task abgeschlossen: {result}")
49
  return result
50
 
51
- def health_check():
52
- """Sendet Health-Check an Controller"""
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
53
  if not CONTROLLER_URL:
54
  return
55
 
56
  try:
57
- requests.post(
58
- f"{CONTROLLER_URL}/api/register",
59
- json={"worker_id": WORKER_ID, "status": "ready"},
60
  timeout=5
61
  )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
62
  except Exception as e:
63
- print(f"Health-Check fehlgeschlagen: {e}")
 
 
 
 
 
 
 
 
 
 
 
 
 
64
 
65
  # ============================================
66
- # Gradio Interface (für Space)
67
  # ============================================
68
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
69
  def worker_interface(task_json):
70
  """Gradio Interface für manuelle Tasks"""
71
- import json
72
-
73
  try:
74
  task = json.loads(task_json)
75
  task_type = task.get("type", "sum")
@@ -85,32 +179,36 @@ def worker_interface(task_json):
85
  except Exception as e:
86
  return json.dumps({"status": "error", "message": str(e)})
87
 
88
- def get_worker_status():
89
- """Zeigt Worker-Status"""
90
- return f"""
91
- ## Worker Status
92
-
93
- **Worker ID:** {WORKER_ID}
94
- **Space:** {SPACE_NAME}
95
- **Controller:** {CONTROLLER_URL or 'Nicht konfiguriert'}
96
- **Status:** 🟢 Ready
97
 
98
- ### Verfügbare Tasks:
99
- - sum: Array-Summe
100
- - mean: Array-Durchschnitt
101
- - matrix_multiply: Matrix-Multiplikation
102
- - inference: ML-Inferenz
103
- """
 
 
104
 
105
  # Gradio UI
106
  with gr.Blocks(title=f"Cluster Worker - {WORKER_ID}") as demo:
107
- gr.Markdown(f"# 🤗 Cluster Worker: {WORKER_ID}")
108
 
109
  with gr.Row():
110
  with gr.Column():
111
  status_md = gr.Markdown(get_worker_status())
 
 
112
 
113
  with gr.Column():
 
 
 
 
 
114
  gr.Markdown("### Task manuell ausführen")
115
  task_input = gr.Textbox(
116
  label="Task JSON",
@@ -120,70 +218,12 @@ with gr.Blocks(title=f"Cluster Worker - {WORKER_ID}") as demo:
120
  run_btn = gr.Button("Task ausführen")
121
  result_output = gr.Textbox(label="Ergebnis")
122
 
123
- run_btn.click(
124
- fn=worker_interface,
125
- inputs=task_input,
126
- outputs=result_output
127
- )
128
-
129
- # ============================================
130
- # Background Polling (für Controller-Tasks)
131
- # ============================================
132
-
133
- def poll_controller():
134
- """Pollt Controller auf neue Tasks"""
135
- if not CONTROLLER_URL:
136
- return
137
-
138
- try:
139
- response = requests.get(
140
- f"{CONTROLLER_URL}/api/get_task",
141
- params={"worker_id": WORKER_ID},
142
- timeout=5
143
- )
144
-
145
- if response.status_code == 200:
146
- task = response.json()
147
- if task.get("type"):
148
- result = process_task(task["type"], task["data"])
149
-
150
- # Ergebnis zurück senden
151
- requests.post(
152
- f"{CONTROLLER_URL}/api/submit_result",
153
- json={
154
- "worker_id": WORKER_ID,
155
- "task_id": task.get("id"),
156
- "result": result
157
- },
158
- timeout=5
159
- )
160
- except Exception as e:
161
- print(f"Polling fehlgeschlagen: {e}")
162
-
163
- # Background-Task starten
164
- import threading
165
-
166
- def start_polling():
167
- while True:
168
- poll_controller()
169
- time.sleep(2) # Alle 2 Sekunden pollern
170
-
171
- threading.Thread(target=start_polling, daemon=True).start()
172
 
173
  # ============================================
174
  # Main
175
  # ============================================
176
 
177
  if __name__ == "__main__":
178
- print(f"🚀 Starte Worker: {WORKER_ID}")
179
- print(f" Space: {SPACE_NAME}")
180
- print(f" Controller: {CONTROLLER_URL}")
181
-
182
- # Health-Check senden
183
- health_check()
184
-
185
- # Gradio starten
186
  demo.launch(server_name="0.0.0.0", server_port=7860)
187
-
188
- # Für Hugging Face Spaces - die App muss 'app' heißen
189
- app = demo
 
2
  Hugging Face Spaces Cluster - Worker App
3
  =========================================
4
  Läuft auf jedem Worker Space und führt Tasks aus.
 
 
 
 
 
5
  """
6
 
7
  import os
8
  import time
9
  import requests
10
  import numpy as np
11
+ import threading
12
+ import json
13
+
14
+ import gradio as gr
15
 
16
  # Hugging Face Konfiguration
17
  HF_TOKEN = os.getenv("HF_TOKEN", "")
 
19
  CONTROLLER_URL = os.getenv("CONTROLLER_URL", "")
20
  SPACE_NAME = os.getenv("SPACE_NAME", "")
21
 
22
+ print(f"\n{'='*60}")
23
+ print(f"🔧 Worker startet...")
24
+ print(f" WORKER_ID: {WORKER_ID}")
25
+ print(f" CONTROLLER_URL: {CONTROLLER_URL}")
26
+ print(f" SPACE_NAME: {SPACE_NAME}")
27
+ print(f" HF_TOKEN: {HF_TOKEN[:10]}...")
28
+ print(f"{'='*60}\n")
29
 
30
  # ============================================
31
  # Task-Verarbeitung
 
35
  """Verarbeitet einen Task vom Controller"""
36
  print(f"[{WORKER_ID}] Verarbeite Task: {task_type}")
37
 
38
+ if isinstance(data, list):
39
+ data = np.array(data)
40
+
41
  if task_type == "sum":
42
+ result = float(np.sum(data))
43
  elif task_type == "mean":
44
+ result = float(np.mean(data))
45
  elif task_type == "matrix_multiply":
46
  a, b = data
47
+ result = np.dot(np.array(a), np.array(b)).tolist()
48
  elif task_type == "inference":
49
+ result = float(np.mean(data))
 
50
  else:
51
  result = None
52
 
53
+ print(f"[{WORKER_ID}] Ergebnis: {result}")
54
  return result
55
 
56
+ # ============================================
57
+ # Beim Controller registrieren
58
+ # ============================================
59
+
60
+ def register_with_controller():
61
+ """Registriert sich beim Controller"""
62
+ if not CONTROLLER_URL:
63
+ print("❌ CONTROLLER_URL nicht gesetzt!")
64
+ return False
65
+
66
+ try:
67
+ # Versuche HTTP und HTTPS
68
+ urls_to_try = [
69
+ CONTROLLER_URL,
70
+ CONTROLLER_URL.replace("https://", "http://"),
71
+ ]
72
+
73
+ for url in urls_to_try:
74
+ try:
75
+ response = requests.post(
76
+ f"{url}/api/register",
77
+ json={"worker_id": WORKER_ID, "status": "ready"},
78
+ timeout=10
79
+ )
80
+ if response.status_code == 200:
81
+ print(f"✅ Beim Controller registriert: {url}")
82
+ return True
83
+ except Exception as e:
84
+ continue
85
+
86
+ print(f"❌ Registrierung fehlgeschlagen: {CONTROLLER_URL}")
87
+ return False
88
+
89
+ except Exception as e:
90
+ print(f"❌ Fehler bei Registrierung: {e}")
91
+ return False
92
+
93
+ def poll_controller():
94
+ """Pollt Controller auf neue Tasks"""
95
  if not CONTROLLER_URL:
96
  return
97
 
98
  try:
99
+ response = requests.get(
100
+ f"{CONTROLLER_URL}/api/get_task",
101
+ params={"worker_id": WORKER_ID},
102
  timeout=5
103
  )
104
+
105
+ if response.status_code == 200 and response.text.strip():
106
+ try:
107
+ task = response.json()
108
+ if task and task.get("type"):
109
+ print(f"📥 Task erhalten: {task.get('id', 'unknown')[:8]} ({task['type']})")
110
+ result = process_task(task["type"], task["data"])
111
+
112
+ # Ergebnis zurück senden
113
+ requests.post(
114
+ f"{CONTROLLER_URL}/api/submit_result",
115
+ json={
116
+ "worker_id": WORKER_ID,
117
+ "task_id": task.get("id"),
118
+ "result": result
119
+ },
120
+ timeout=5
121
+ )
122
+ print(f"✅ Ergebnis gesendet")
123
+ except json.JSONDecodeError:
124
+ pass # Leere Antwort, kein Task verfügbar
125
+ else:
126
+ pass # Kein Task verfügbar
127
+
128
  except Exception as e:
129
+ pass # Still schweigen beim Polling
130
+
131
+ def start_polling():
132
+ """Startet Background-Polling"""
133
+ # Erst registrieren
134
+ register_with_controller()
135
+
136
+ # Dann pollen
137
+ while True:
138
+ poll_controller()
139
+ time.sleep(3)
140
+
141
+ # Background-Thread starten
142
+ threading.Thread(target=start_polling, daemon=True).start()
143
 
144
  # ============================================
145
+ # Gradio Interface
146
  # ============================================
147
 
148
+ def get_worker_status():
149
+ """Zeigt Worker-Status"""
150
+ registered = "🟢" if CONTROLLER_URL else "🔴"
151
+ return f"""
152
+ ## Worker Status
153
+
154
+ **Worker ID:** {WORKER_ID}
155
+ **Space:** {SPACE_NAME}
156
+ **Controller:** {CONTROLLER_URL or 'Nicht konfiguriert'}
157
+ **Status:** {registered} {'Verbunden' if CONTROLLER_URL else 'Keine URL'}
158
+
159
+ ### Verfügbare Tasks:
160
+ - sum: Array-Summe
161
+ - mean: Array-Durchschnitt
162
+ - matrix_multiply: Matrix-Multiplikation
163
+ """
164
+
165
  def worker_interface(task_json):
166
  """Gradio Interface für manuelle Tasks"""
 
 
167
  try:
168
  task = json.loads(task_json)
169
  task_type = task.get("type", "sum")
 
179
  except Exception as e:
180
  return json.dumps({"status": "error", "message": str(e)})
181
 
182
+ def test_connection():
183
+ """Testet Verbindung zum Controller"""
184
+ if not CONTROLLER_URL:
185
+ return "❌ CONTROLLER_URL nicht gesetzt!"
 
 
 
 
 
186
 
187
+ try:
188
+ response = requests.get(f"{CONTROLLER_URL}/api/cluster_status", timeout=5)
189
+ if response.status_code == 200:
190
+ return f"✅ Controller erreichbar: {CONTROLLER_URL}"
191
+ else:
192
+ return f"❌ Controller antwortet nicht: {response.status_code}"
193
+ except Exception as e:
194
+ return f"❌ Verbindung fehlgeschlagen: {e}"
195
 
196
  # Gradio UI
197
  with gr.Blocks(title=f"Cluster Worker - {WORKER_ID}") as demo:
198
+ gr.Markdown(f"# 🔧 Cluster Worker: {WORKER_ID}")
199
 
200
  with gr.Row():
201
  with gr.Column():
202
  status_md = gr.Markdown(get_worker_status())
203
+ refresh_btn = gr.Button("🔄 Status aktualisieren")
204
+ refresh_btn.click(fn=get_worker_status, outputs=status_md)
205
 
206
  with gr.Column():
207
+ gr.Markdown("### Verbindung testen")
208
+ test_btn = gr.Button("🔗 Teste Controller")
209
+ test_output = gr.Textbox(label="Ergebnis")
210
+ test_btn.click(fn=test_connection, outputs=test_output)
211
+
212
  gr.Markdown("### Task manuell ausführen")
213
  task_input = gr.Textbox(
214
  label="Task JSON",
 
218
  run_btn = gr.Button("Task ausführen")
219
  result_output = gr.Textbox(label="Ergebnis")
220
 
221
+ run_btn.click(fn=worker_interface, inputs=task_input, outputs=result_output)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
222
 
223
  # ============================================
224
  # Main
225
  # ============================================
226
 
227
  if __name__ == "__main__":
228
+ print(f"🚀 Starte Gradio UI...")
 
 
 
 
 
 
 
229
  demo.launch(server_name="0.0.0.0", server_port=7860)