#!/usr/bin/env python3 """ external_server.py — سيرفر مركزي لتوزيع المهام + Dashboard تفاعلي """ import logging import requests import socket from flask import Flask, request, jsonify, render_template from flask_cors import CORS from flask_socketio import SocketIO, emit from peer_discovery import PEERS from peer_discovery import PORT logging.basicConfig(level=logging.INFO) app = Flask(__name__) CORS(app, resources={r"/*": {"origins": "*"}}) socketio = SocketIO(app, cors_allowed_origins="*") connected_peers = {} # {node_id: {"cpu":%, "ram":%, "gpu":%}} # ─────────────── التحقق من توفر المنفذ ─────────────── def is_port_available(port): try: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.bind(('0.0.0.0', port)) return True except OSError: return False # ─────────────── اختيار أفضل Peer ─────────────── def select_best_peer(): if not PEERS: logging.warning("⚠️ لا توجد أجهزة مسجلة حالياً.") return None try: peer_loads = [] for peer_url in PEERS: try: # بناء URL لفحص الحالة status_url = peer_url.replace('/run_task', '/health') if '/run_task' not in peer_url: status_url = f"{peer_url}/health" resp = requests.get(status_url, timeout=2) if resp.ok: data = resp.json() # افتراض أن الخادم يعيد cpu_load أو استخدام قيمة افتراضية cpu_load = data.get("cpu_load", 50) peer_loads.append((peer_url, cpu_load)) logging.info(f"✅ {peer_url} - الحمل: {cpu_load}%") except Exception as e: logging.warning(f"❌ لا يمكن الوصول إلى {peer_url}: {e}") continue if not peer_loads: return None # اختيار الأقل حملًا best_peer = min(peer_loads, key=lambda x: x[1]) logging.info(f"🎯 أفضل جهاز: {best_peer[0]} مع حمل {best_peer[1]}%") return best_peer[0] except Exception as e: logging.error(f"❌ خطأ في اختيار الـ Peer: {e}") return None # ─────────────── API توزيع المهام ─────────────── @app.route("/submit_task", methods=["POST"]) def submit_task(): data = request.get_json() if not data or "func" not in data: return jsonify({"error": "يجب تحديد اسم الدالة (func)"}), 400 peer = select_best_peer() if not peer: return jsonify({"error": "لا توجد أجهزة متاحة حالياً"}), 503 try: # تأكد من أن عنوان الـ Peer صحيح if not peer.startswith('http'): peer = f"http://{peer}" logging.info(f"📤 إرسال المهمة إلى: {peer}") resp = requests.post(peer, json=data, timeout=30) if resp.ok: result = resp.json() logging.info(f"✅ تم تنفيذ المهمة بنجاح على {peer}") return jsonify({"status": "success", "result": result, "executed_on": peer}) else: logging.error(f"❌ فشل تنفيذ المهمة على {peer}: {resp.status_code}") return jsonify({"error": f"فشل إرسال المهمة: {resp.text}"}), 500 except requests.exceptions.Timeout: logging.error(f"⏰ انتهت المهلة أثناء الاتصال بـ {peer}") return jsonify({"error": "انتهت مهلة الاتصال"}), 408 except Exception as e: logging.error(f"❌ خطأ في إرسال المهمة إلى {peer}: {e}") return jsonify({"error": str(e)}), 500 # ─────────────── API تحديث حالة الأجهزة ─────────────── @app.route("/update_status", methods=["POST"]) def update_status(): data = request.json node_id = data.get("node_id") if not node_id: return jsonify({"error": "node_id مطلوب"}), 400 connected_peers[node_id] = { "cpu": data.get("cpu", 0), "ram": data.get("ram", 0), "gpu": data.get("gpu", 0), "last_update": "now" # يمكن إضافة timestamp } socketio.emit("update_peers", connected_peers, broadcast=True) logging.info(f"📊 تم تحديث حالة {node_id}") return jsonify({"status": "ok"}) # ─────────────── صفحة الحالة ─────────────── @app.route("/status") def status(): return jsonify({ "connected_peers": connected_peers, "available_peers": list(PEERS), "total_peers": len(PEERS) }) # ─────────────── صفحة Dashboard ─────────────── @app.route("/") def index(): return render_template("dashboard.html", peers=connected_peers) # ─────────────── استقبال تحديثات الحالة عبر WebSocket ─────────────── @socketio.on("status_update") def handle_status_update(data): connected_peers.update(data) emit("update_peers", connected_peers, broadcast=True) # ─────────────── دردشة ─────────────── @socketio.on("send_message") def handle_message(data): emit("receive_message", data, broadcast=True) # ─────────────── تشغيل السيرفر ─────────────── if __name__ == "__main__": # محاولة منافذ مختلفة ports_to_try = [5005, 5006, 5007, 5008, 5009] selected_port = None for port in ports_to_try: if is_port_available(port): selected_port = port break if selected_port is None: logging.error("❌ لا توجد منافذ متاحة. حاول إغلاق التطبيقات الأخرى.") exit(1) logging.info(f"🚀 بدء السيرفر المركزي مع Dashboard على المنفذ {selected_port}") try: socketio.run(app, host="0.0.0.0", port=selected_port, debug=False) except OSError as e: logging.error(f"❌ فشل تشغيل السيرفر على المنفذ {selected_port}: {e}") except KeyboardInterrupt: logging.info("⏹️ إيقاف السيرفر...") except Exception as e: logging.error(f"❌ خطأ غير متوقع: {e}")