cluster-worker-2 / worker.py
Timo123432345443's picture
Upload worker.py with huggingface_hub
a13c313 verified
"""
Hugging Face Spaces Cluster - Worker
=====================================
Läuft auf jedem Worker Space und führt Tasks aus.
Deployment:
1. Diese Datei auf Hugging Face Space hochladen
2. requirements.txt hochladen
3. Space startet automatisch
"""
import os
import time
import requests
import numpy as np
from huggingface_hub import HfApi
# Hugging Face configuration, taken from the Space's environment variables.
# Every setting falls back to a default when the variable is unset.
HF_TOKEN = os.environ.get("HF_TOKEN", "")
# Name reported to the controller and shown in the UI.
WORKER_ID = os.environ.get("WORKER_ID", "worker-1")
# Empty string => all controller communication is skipped.
CONTROLLER_URL = os.environ.get("CONTROLLER_URL", "")
# Only displayed (status panel and startup banner).
SPACE_NAME = os.environ.get("SPACE_NAME", "")
# Gradio builds the Space's web UI (manual task execution); controller tasks arrive via the polling code further below.
import gradio as gr
# ============================================
# Task-Verarbeitung
# ============================================
def process_task(task_type, data):
    """Run one task received from the controller or from the manual UI.

    Args:
        task_type: Task name: "sum", "mean", "matrix_multiply" or "inference".
        data: Task payload. "matrix_multiply" unpacks it into exactly two
            operands ``(a, b)``; every other type hands it to NumPy as-is.

    Returns:
        The NumPy result (scalar or array), or ``None`` for an unrecognised
        ``task_type``.
    """
    print(f"[{WORKER_ID}] Verarbeite Task: {task_type}")
    outcome = _compute_task(task_type, data)
    print(f"[{WORKER_ID}] Task abgeschlossen: {outcome}")
    return outcome


def _compute_task(task_type, data):
    """Dispatch *task_type* to its NumPy implementation; ``None`` if unknown."""
    if task_type == "sum":
        return np.sum(data)
    if task_type == "mean":
        return np.mean(data)
    if task_type == "matrix_multiply":
        left, right = data
        return np.dot(left, right)
    if task_type == "inference":
        # Placeholder for real ML inference: currently just the mean.
        return np.mean(data)
    return None
def health_check():
    """Register this worker with the controller as ``ready``.

    Best-effort: does nothing when CONTROLLER_URL is not configured. Any
    failure (network error, timeout, or a non-2xx HTTP reply) is printed
    instead of raised, so startup continues.
    """
    if not CONTROLLER_URL:
        return
    # Tolerate a trailing slash in the configured URL (avoids "//api/...").
    url = f"{CONTROLLER_URL.rstrip('/')}/api/register"
    try:
        response = requests.post(
            url,
            json={"worker_id": WORKER_ID, "status": "ready"},
            timeout=5,
        )
        # Without this check a 4xx/5xx reply would silently count as success.
        response.raise_for_status()
    except Exception as e:
        print(f"Health-Check fehlgeschlagen: {e}")
# ============================================
# Gradio Interface (für Space)
# ============================================
def worker_interface(task_json):
    """Run a task entered manually in the Gradio UI.

    Args:
        task_json: JSON object text such as
            ``{"type": "sum", "data": [1, 2, 3]}``. ``type`` defaults to
            "sum" and ``data`` to ``[1, 2, 3]`` when omitted.

    Returns:
        A JSON string: ``{"status": "success", "worker": ..., "result": ...}``
        on success, otherwise ``{"status": "error", "message": ...}`` (invalid
        JSON, non-object JSON, unknown task type, unusable payload, ...).
    """
    import json

    try:
        task = json.loads(task_json)
        if not isinstance(task, dict):
            raise ValueError("Task muss ein JSON-Objekt sein")
        task_type = task.get("type", "sum")
        raw_data = task.get("data", [1, 2, 3])
        # matrix_multiply takes two operands that may differ in shape
        # (e.g. 1x3 and 3x1). NumPy cannot build one regular array from such a
        # ragged pair, so they are passed unconverted; process_task unpacks
        # them itself (the polling path also passes raw JSON data).
        if task_type == "matrix_multiply":
            data = raw_data
        else:
            data = np.array(raw_data)
        result = process_task(task_type, data)
        if result is None:
            # process_task signals an unrecognised task type with None.
            raise ValueError(f"Unbekannter Task-Typ: {task_type!r}")
        # NumPy scalars/arrays -> native Python numbers/lists for json.dumps.
        payload = result.tolist() if hasattr(result, "tolist") else float(result)
        return json.dumps({
            "status": "success",
            "worker": WORKER_ID,
            "result": payload,
        })
    except Exception as e:
        return json.dumps({"status": "error", "message": str(e)})
def get_worker_status():
    """Build the Markdown status panel shown in the Gradio UI.

    The status line is a fixed "🟢 Ready"; it is not computed from any state.
    """
    controller = CONTROLLER_URL or 'Nicht konfiguriert'
    lines = [
        "",
        "## Worker Status",
        f"**Worker ID:** {WORKER_ID}",
        f"**Space:** {SPACE_NAME}",
        f"**Controller:** {controller}",
        "**Status:** 🟢 Ready",
        "### Verfügbare Tasks:",
        "- sum: Array-Summe",
        "- mean: Array-Durchschnitt",
        "- matrix_multiply: Matrix-Multiplikation",
        "- inference: ML-Inferenz",
        "",
    ]
    # The empty first/last entries reproduce the leading and trailing newline.
    return "\n".join(lines)
# Gradio UI
# Example task, used both as the textbox placeholder and as its initial value.
_EXAMPLE_TASK_JSON = '{"type": "sum", "data": [1, 2, 3, 4, 5]}'

with gr.Blocks(title=f"Cluster Worker - {WORKER_ID}") as demo:
    gr.Markdown(f"# 🤗 Cluster Worker: {WORKER_ID}")
    with gr.Row():
        # First column: static status panel.
        with gr.Column():
            status_md = gr.Markdown(get_worker_status())
        # Second column: run an arbitrary task by hand.
        with gr.Column():
            gr.Markdown("### Task manuell ausführen")
            task_input = gr.Textbox(
                label="Task JSON",
                placeholder=_EXAMPLE_TASK_JSON,
                value=_EXAMPLE_TASK_JSON,
            )
            run_btn = gr.Button("Task ausführen")
            result_output = gr.Textbox(label="Ergebnis")
            # Clicking sends the textbox's JSON through worker_interface.
            run_btn.click(fn=worker_interface, inputs=task_input, outputs=result_output)
# ============================================
# Background Polling (für Controller-Tasks)
# ============================================
def poll_controller():
    """Fetch one task from the controller, run it, and submit the result.

    Best-effort and meant to be called repeatedly: does nothing when
    CONTROLLER_URL is not configured. Every failure is printed rather than
    raised, so a polling loop keeps running.

    A 200 reply whose JSON object has a truthy ``type`` is treated as a task.
    Its ``data`` and ``id`` fields are read from the same object.
    NOTE(review): protocol inferred from this code only; confirm against the
    controller.
    """
    if not CONTROLLER_URL:
        return
    # Tolerate a trailing slash in the configured URL (avoids "//api/...").
    base_url = CONTROLLER_URL.rstrip("/")
    try:
        response = requests.get(
            f"{base_url}/api/get_task",
            params={"worker_id": WORKER_ID},
            timeout=5,
        )
        if response.status_code != 200:
            return
        task = response.json()
        if not task.get("type"):
            return  # no task available
        result = process_task(task["type"], task["data"])
        # process_task returns NumPy scalars/arrays. np.int64 and ndarray are
        # not JSON-serializable, so without this conversion the POST below
        # raises and the result never reaches the controller.
        if hasattr(result, "tolist"):
            result = result.tolist()
        # Send the result back to the controller.
        submit = requests.post(
            f"{base_url}/api/submit_result",
            json={
                "worker_id": WORKER_ID,
                "task_id": task.get("id"),
                "result": result,
            },
            timeout=5,
        )
        # Surface a rejected submission instead of treating it as delivered.
        submit.raise_for_status()
    except Exception as e:
        print(f"Polling fehlgeschlagen: {e}")
# Background-Task starten
import threading
def start_polling(interval=2.0):
    """Poll the controller forever, sleeping ``interval`` seconds between polls.

    Args:
        interval: Pause between two polls in seconds (default 2.0, as before).

    Returns immediately when no CONTROLLER_URL is configured, because every
    poll_controller() call would then be a no-op.
    """
    if not CONTROLLER_URL:
        return
    while True:
        poll_controller()
        time.sleep(interval)


# Daemon thread, so it does not keep the process alive on shutdown.
threading.Thread(target=start_polling, name="controller-poller", daemon=True).start()
# ============================================
# Main
# ============================================
if __name__ == "__main__":
    # Startup banner.
    banner = (
        f"🚀 Starte Worker: {WORKER_ID}",
        f" Space: {SPACE_NAME}",
        f" Controller: {CONTROLLER_URL}",
    )
    for banner_line in banner:
        print(banner_line)
    # Register with the controller once (best-effort), then serve the
    # Gradio UI on all interfaces, port 7860.
    health_check()
    demo.launch(server_name="0.0.0.0", server_port=7860)