""" Hugging Face Spaces Cluster - Worker ===================================== Läuft auf jedem Worker Space und führt Tasks aus. Deployment: 1. Diese Datei auf Hugging Face Space hochladen 2. requirements.txt hochladen 3. Space startet automatisch """ import os import time import requests import numpy as np from huggingface_hub import HfApi # Hugging Face Konfiguration HF_TOKEN = os.getenv("HF_TOKEN", "") WORKER_ID = os.getenv("WORKER_ID", "worker-1") CONTROLLER_URL = os.getenv("CONTROLLER_URL", "") SPACE_NAME = os.getenv("SPACE_NAME", "") # API für Task-Empfang import gradio as gr # ============================================ # Task-Verarbeitung # ============================================ def process_task(task_type, data): """Verarbeitet einen Task vom Controller""" print(f"[{WORKER_ID}] Verarbeite Task: {task_type}") if task_type == "sum": result = np.sum(data) elif task_type == "mean": result = np.mean(data) elif task_type == "matrix_multiply": a, b = data result = np.dot(a, b) elif task_type == "inference": # Beispiel: ML-Inferenz result = np.mean(data) # Platzhalter else: result = None print(f"[{WORKER_ID}] Task abgeschlossen: {result}") return result def health_check(): """Sendet Health-Check an Controller""" if not CONTROLLER_URL: return try: requests.post( f"{CONTROLLER_URL}/api/register", json={"worker_id": WORKER_ID, "status": "ready"}, timeout=5 ) except Exception as e: print(f"Health-Check fehlgeschlagen: {e}") # ============================================ # Gradio Interface (für Space) # ============================================ def worker_interface(task_json): """Gradio Interface für manuelle Tasks""" import json try: task = json.loads(task_json) task_type = task.get("type", "sum") data = np.array(task.get("data", [1, 2, 3])) result = process_task(task_type, data) return json.dumps({ "status": "success", "worker": WORKER_ID, "result": float(result) if isinstance(result, (int, float)) else result.tolist() }) except Exception as e: return json.dumps({"status": "error", "message": str(e)}) def get_worker_status(): """Zeigt Worker-Status""" return f""" ## Worker Status **Worker ID:** {WORKER_ID} **Space:** {SPACE_NAME} **Controller:** {CONTROLLER_URL or 'Nicht konfiguriert'} **Status:** 🟢 Ready ### Verfügbare Tasks: - sum: Array-Summe - mean: Array-Durchschnitt - matrix_multiply: Matrix-Multiplikation - inference: ML-Inferenz """ # Gradio UI with gr.Blocks(title=f"Cluster Worker - {WORKER_ID}") as demo: gr.Markdown(f"# 🤗 Cluster Worker: {WORKER_ID}") with gr.Row(): with gr.Column(): status_md = gr.Markdown(get_worker_status()) with gr.Column(): gr.Markdown("### Task manuell ausführen") task_input = gr.Textbox( label="Task JSON", placeholder='{"type": "sum", "data": [1, 2, 3, 4, 5]}', value='{"type": "sum", "data": [1, 2, 3, 4, 5]}' ) run_btn = gr.Button("Task ausführen") result_output = gr.Textbox(label="Ergebnis") run_btn.click( fn=worker_interface, inputs=task_input, outputs=result_output ) # ============================================ # Background Polling (für Controller-Tasks) # ============================================ def poll_controller(): """Pollt Controller auf neue Tasks""" if not CONTROLLER_URL: return try: response = requests.get( f"{CONTROLLER_URL}/api/get_task", params={"worker_id": WORKER_ID}, timeout=5 ) if response.status_code == 200: task = response.json() if task.get("type"): result = process_task(task["type"], task["data"]) # Ergebnis zurück senden requests.post( f"{CONTROLLER_URL}/api/submit_result", json={ "worker_id": WORKER_ID, "task_id": task.get("id"), "result": result }, timeout=5 ) except Exception as e: print(f"Polling fehlgeschlagen: {e}") # Background-Task starten import threading def start_polling(): while True: poll_controller() time.sleep(2) # Alle 2 Sekunden pollern threading.Thread(target=start_polling, daemon=True).start() # ============================================ # Main # ============================================ if __name__ == "__main__": print(f"🚀 Starte Worker: {WORKER_ID}") print(f" Space: {SPACE_NAME}") print(f" Controller: {CONTROLLER_URL}") # Health-Check senden health_check() # Gradio starten demo.launch(server_name="0.0.0.0", server_port=7860)