Spaces:
Running
Running
| #!/usr/bin/env python3 | |
| # central_manager.py | |
| import time | |
| import threading | |
| from typing import Dict, List | |
| import requests | |
| from fastapi import FastAPI, HTTPException | |
| from pydantic import BaseModel | |
| from peer_discovery import PORT | |
| # ---- إعداد FastAPI ---------------------------------------------------------- | |
| app = FastAPI(title="Central Task Manager") | |
| # ---- نماذج البيانات -------------------------------------------------------- | |
| class RegisterRequest(BaseModel): | |
| """تسجيل أو تجديد ظهور العقدة.""" | |
| url: str # مثلاً: "http://203.0.113.45:PORT/run" | |
| load: float = 0.0 # نسبة تحميل العقدة (0.0 - 1.0)، اختياري | |
| class TaskRequest(BaseModel): | |
| func: str | |
| args: List = [] | |
| kwargs: Dict = {} | |
| complexity: float = 0.0 | |
| # ---- سجلّ العقد ------------------------------------------------------------ | |
| # بنخزّن للعقدة: آخر timestamp و load | |
| peers: Dict[str, Dict] = {} | |
| HEARTBEAT_TTL = 60 # ثواني قبل اعتبار العقدة متوقفة | |
| HEALTH_CHECK_FREQ = 30 # ثواني بين فحوص الصحة الداخلية | |
| # ---- API للعقد لتسجيل نفسها ----------------------------------------------- | |
| async def register_peer(req: RegisterRequest): | |
| """العقدة تستدعي هذه النقطة كلما انطلقت أو دورياً لتجديد ظهورها.""" | |
| peers[req.url] = {"last_seen": time.time(), "load": req.load} | |
| return {"status": "ok", "peers_count": len(peers)} | |
| # ---- API للعمليات --------------------------------------------------------- | |
| async def list_peers(): | |
| """يعيد قائمة بالعقد الصالحة بعد تنقية المتوقفة.""" | |
| now = time.time() | |
| # حذف العقد المتوقفة | |
| for url, info in list(peers.items()): | |
| if now - info["last_seen"] > HEARTBEAT_TTL: | |
| peers.pop(url) | |
| return list(peers.keys()) | |
| async def dispatch_task(task: TaskRequest): | |
| """يتلقى مهمة ويعيد توجيهها لأفضل عقدة أو ينفذ محليّاً.""" | |
| available = await list_peers() | |
| if not available: | |
| raise HTTPException(503, "لا توجد عقد متاحة حاليّاً") | |
| # خوارزمية بسيطة: الاختيار بناءً على أقل تحميل معلن | |
| # أو تدوير دائري إذا لم يعلن أحد عن تحميله | |
| best = None | |
| best_load = 1.1 | |
| for url in available: | |
| load = peers[url].get("load", None) | |
| if load is None: | |
| best = url | |
| break | |
| if load < best_load: | |
| best, best_load = url, load | |
| if not best: | |
| best = available[0] | |
| # إعادة توجيه الطلب | |
| try: | |
| resp = requests.post(best, json=task.dict(), timeout=10) | |
| resp.raise_for_status() | |
| return resp.json() | |
| except Exception as e: | |
| raise HTTPException(502, f"فشل التوجيه إلى {best}: {e}") | |
| # ---- فحص دوري لصحة العقد --------------------------------------------------- | |
| def health_check_loop(): | |
| while True: | |
| now = time.time() | |
| for url in list(peers.keys()): | |
| health_url = url.replace("/run", "/health") | |
| try: | |
| r = requests.get(health_url, timeout=3) | |
| if r.status_code == 200: | |
| peers[url]["last_seen"] = now | |
| # يمكنك تحديث load من رد /health إذا وفّرته | |
| else: | |
| peers.pop(url) | |
| except: | |
| peers.pop(url) | |
| time.sleep(HEALTH_CHECK_FREQ) | |
| # ---- تشغيل الخلفيات وخادم FastAPI ------------------------------------------ | |
| if __name__ == "__main__": | |
| # شغل لوب الفحص الطبي في الخلفية | |
| threading.Thread(target=health_check_loop, daemon=True).start() | |
| import uvicorn | |
| uvicorn.run(app, host="0.0.0.0", port=1500) | |