Spaces:
Sleeping
Sleeping
| """ | |
| Hugging Face Spaces Cluster - Worker | |
| ===================================== | |
| Läuft auf jedem Worker Space und führt Tasks aus. | |
| Deployment: | |
| 1. Diese Datei auf Hugging Face Space hochladen | |
| 2. requirements.txt hochladen | |
| 3. Space startet automatisch | |
| """ | |
| import os | |
| import time | |
| import requests | |
| import numpy as np | |
| from huggingface_hub import HfApi | |
| # Hugging Face Konfiguration | |
# Worker settings, read once from the environment when the module loads.
# Each value falls back to the default shown here when the variable is unset.
HF_TOKEN = os.environ.get("HF_TOKEN", "")  # not read anywhere else in the visible code
WORKER_ID = os.environ.get("WORKER_ID", "worker-1")  # identifier sent to the controller
CONTROLLER_URL = os.environ.get("CONTROLLER_URL", "")  # empty: controller calls are skipped
SPACE_NAME = os.environ.get("SPACE_NAME", "")  # shown in the status panel and startup log
| # API für Task-Empfang | |
| import gradio as gr | |
| # ============================================ | |
| # Task-Verarbeitung | |
| # ============================================ | |
def process_task(task_type, data):
    """Run a single task and return its result.

    Supported ``task_type`` values:

    * ``"sum"`` -- ``np.sum(data)``
    * ``"mean"`` -- ``np.mean(data)``
    * ``"matrix_multiply"`` -- ``data`` is unpacked into exactly two
      operands, which are passed to ``np.dot``
    * ``"inference"`` -- placeholder that currently also returns
      ``np.mean(data)``

    Any other ``task_type`` produces ``None``. The start and the outcome of
    the task are printed to stdout, prefixed with the worker id.
    """
    print(f"[{WORKER_ID}] Verarbeite Task: {task_type}")

    outcome = None  # remains None for an unrecognised task type
    if task_type == "sum":
        outcome = np.sum(data)
    elif task_type in ("mean", "inference"):
        # "inference" is only a stand-in for a real model call for now.
        outcome = np.mean(data)
    elif task_type == "matrix_multiply":
        left, right = data
        outcome = np.dot(left, right)

    print(f"[{WORKER_ID}] Task abgeschlossen: {outcome}")
    return outcome
def health_check():
    """Report this worker to the controller as ready.

    Posts ``{"worker_id": ..., "status": "ready"}`` as JSON to
    ``<CONTROLLER_URL>/api/register`` with a 5-second timeout. Nothing is
    sent when ``CONTROLLER_URL`` is empty. Any exception raised by the request
    is printed and swallowed, and the response itself is not inspected.
    """
    if CONTROLLER_URL:
        try:
            register_url = f"{CONTROLLER_URL}/api/register"
            payload = {"worker_id": WORKER_ID, "status": "ready"}
            requests.post(register_url, json=payload, timeout=5)
        except Exception as err:
            # Best effort: a failed registration is only reported, never raised.
            print(f"Health-Check fehlgeschlagen: {err}")
| # ============================================ | |
| # Gradio Interface (für Space) | |
| # ============================================ | |
def worker_interface(task_json):
    """Run a task described by a JSON string and return a JSON reply.

    Used as the click handler of the manual task form in the UI.

    Args:
        task_json: JSON object with an optional ``"type"`` (defaults to
            ``"sum"``) and optional ``"data"`` (defaults to ``[1, 2, 3]``).
            For ``"matrix_multiply"``, ``"data"`` must hold exactly two
            operands.

    Returns:
        A JSON string. On success it contains ``"status": "success"``, the
        worker id and the result (a number or nested list). On any failure,
        including an unsupported task type, it contains
        ``"status": "error"`` and a ``"message"``.
    """
    import json

    try:
        task = json.loads(task_json)
        task_type = task.get("type", "sum")
        raw_data = task.get("data", [1, 2, 3])

        if task_type == "matrix_multiply":
            # Hand the two operands over unconverted. Packing matrices of
            # different shapes (e.g. 2x3 and 3x2) into one array is rejected
            # by recent NumPy versions, although their product is valid.
            data = raw_data
        else:
            data = np.array(raw_data)

        result = process_task(task_type, data)

        if result is None:
            # process_task returns None only for a task type it does not
            # know; report that instead of failing on None.tolist().
            return json.dumps({
                "status": "error",
                "message": f"Unbekannter Task-Typ: {task_type}",
            })

        return json.dumps({
            "status": "success",
            "worker": WORKER_ID,
            # NumPy integers and arrays are not JSON serialisable as-is;
            # tolist() turns them into plain Python values.
            "result": float(result) if isinstance(result, (int, float)) else result.tolist()
        })
    except Exception as e:
        # UI boundary: every failure is shown to the user as an error reply.
        return json.dumps({"status": "error", "message": str(e)})
def get_worker_status():
    """Return the worker's status panel as a Markdown string.

    The panel lists the worker id, the Space name, the controller URL (or
    'Nicht konfiguriert' when none is set), a fixed "Ready" status and the
    supported task types. The text starts and ends with a newline.
    """
    controller_label = CONTROLLER_URL or 'Nicht konfiguriert'
    panel_lines = [
        "",  # leading newline
        "## Worker Status",
        f"**Worker ID:** {WORKER_ID}",
        f"**Space:** {SPACE_NAME}",
        f"**Controller:** {controller_label}",
        "**Status:** 🟢 Ready",
        "### Verfügbare Tasks:",
        "- sum: Array-Summe",
        "- mean: Array-Durchschnitt",
        "- matrix_multiply: Matrix-Multiplikation",
        "- inference: ML-Inferenz",
        "",  # trailing newline
    ]
    return "\n".join(panel_lines)
| # Gradio UI | |
# Example request used both as the placeholder and as the pre-filled value.
_example_task = '{"type": "sum", "data": [1, 2, 3, 4, 5]}'

with gr.Blocks(title=f"Cluster Worker - {WORKER_ID}") as demo:
    gr.Markdown(f"# 🤗 Cluster Worker: {WORKER_ID}")

    with gr.Row():
        # First column: status panel, rendered once while the UI is built.
        with gr.Column():
            status_md = gr.Markdown(get_worker_status())

        # Second column: form for running a task by hand.
        with gr.Column():
            gr.Markdown("### Task manuell ausführen")
            task_input = gr.Textbox(
                label="Task JSON",
                placeholder=_example_task,
                value=_example_task,
            )
            run_btn = gr.Button("Task ausführen")
            result_output = gr.Textbox(label="Ergebnis")

    # Clicking the button sends the JSON text to worker_interface and shows
    # the JSON reply in the result box.
    run_btn.click(
        fn=worker_interface,
        inputs=task_input,
        outputs=result_output,
    )
| # ============================================ | |
| # Background Polling (für Controller-Tasks) | |
| # ============================================ | |
def poll_controller():
    """Fetch at most one task from the controller, run it and submit the result.

    Does nothing when ``CONTROLLER_URL`` is empty. Otherwise it requests
    ``<CONTROLLER_URL>/api/get_task`` with this worker's id. A response that
    is not HTTP 200, is empty, or has no truthy ``"type"`` counts as "no
    task". A task is run through ``process_task`` with its ``"type"`` and
    ``"data"``, and the result is posted to
    ``<CONTROLLER_URL>/api/submit_result`` together with the worker id and
    the task's ``"id"``. Both requests use a 5-second timeout.

    Any exception is printed and swallowed so that the polling loop calling
    this function keeps running.
    """
    if not CONTROLLER_URL:
        return

    try:
        response = requests.get(
            f"{CONTROLLER_URL}/api/get_task",
            params={"worker_id": WORKER_ID},
            timeout=5
        )
        if response.status_code != 200:
            return

        task = response.json()
        # A null or empty payload means there is nothing to do right now.
        if not task or not task.get("type"):
            return

        result = process_task(task["type"], task["data"])

        # process_task returns NumPy scalars or arrays. NumPy integers and
        # arrays cannot be JSON-encoded, so without this conversion the
        # submission below fails and the result is never delivered.
        if isinstance(result, (np.generic, np.ndarray)):
            result = result.tolist()

        # Send the result back to the controller.
        requests.post(
            f"{CONTROLLER_URL}/api/submit_result",
            json={
                "worker_id": WORKER_ID,
                "task_id": task.get("id"),
                "result": result
            },
            timeout=5
        )
    except Exception as e:
        # Best effort: report the failure and let the next poll try again.
        print(f"Polling fehlgeschlagen: {e}")
| # Background-Task starten | |
| import threading | |
def start_polling():
    """Poll the controller in an endless loop, pausing 2 seconds after each poll."""
    pause_seconds = 2
    while True:
        poll_controller()
        time.sleep(pause_seconds)


# Polling starts as soon as this module is loaded. The thread is a daemon,
# so it does not keep the process alive once the main thread exits.
threading.Thread(
    target=start_polling,
    daemon=True,
).start()
| # ============================================ | |
| # Main | |
| # ============================================ | |
if __name__ == "__main__":
    # Startup banner: one line each for worker id, Space name and controller.
    print(
        f"🚀 Starte Worker: {WORKER_ID}",
        f" Space: {SPACE_NAME}",
        f" Controller: {CONTROLLER_URL}",
        sep="\n",
    )

    # Register with the controller before serving the UI.
    health_check()

    # Serve the Gradio app on all interfaces, port 7860.
    demo.launch(server_name="0.0.0.0", server_port=7860)