#!/usr/bin/env python3 # ================================================================ # node_client.py – عميل تسجيل العُقدة في نظام AmalOffload # --------------------------------------------------------------- # • يختار منفذًا (من ENV أو من مجموعة PORTS). # • يجلب عنوان الـ IP المحلي. # • يحاول التسجيل في خادم سجلٍّ مركزي واحد تِلو الآخر، # وعلى كل المنافذ، حتى ينجح. # • عند النجاح يُرجع قائمة الأقران (Peers) من الخادم. # ================================================================ import os import socket import time import logging import random import requests from typing import Iterable, Tuple, List # ⬇️ منافذ مقترحة؛ يمكنك التعديل أو توليدها ديناميكيًا DEFAULT_PORTS = { 7520, 7384, 9021, 6998, 5810, 9274, 8645, 7329, 7734, 8456, 6173, 7860, 8080, 8000, 5000, 3000, 8888, 9999 } # ⬇️ خوادم السجل الاحتياطية بالترتيب المفضَّل DEFAULT_REGISTRY_SERVERS = [ "http://localhost:8888", # خادم محلي أولاً "http://127.0.0.1:8888", # خادم محلي بديل "https://cv4790811.regru.cloud", "https://amaloffload.onrender.com", "https://osamabyc86-offload.hf.space", "http://10.229.36.125", "http://10.229.228.178", "http://192.168.1.1:8888", # راوتر محلي "http://192.168.0.1:8888", # راوتر محلي بديل ] logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s", datefmt="%H:%M:%S", ) class NodeClient: """ عميل خفيف يعتني بالتسجيل المتكرِّر في خادم سجل مركزي. يمكن استيراده في أي سكربت وتشغيله في خيط منفصل. """ def __init__( self, PORTs: Iterable[int] | None = None, registry_servers: List[str] | None = None, node_id: str | None = None, ): self.PORTs = set(PORTs) if PORTs else DEFAULT_PORTS self.registry_servers = list(registry_servers) if registry_servers else DEFAULT_REGISTRY_SERVERS self.node_id = node_id or os.getenv("NODE_ID", socket.gethostname()) # مبدئيًّا اختَر منفذًا (أولوية للمتغيّر البيئي إن وُجد) self.port: int = int(os.getenv("CPU_PORT", random.choice(list(self.PORTs)))) # 🔧 تصحيح: PORTs بدلاً من PORTS self.current_server_index: int | None = None self.session = requests.Session() self.session.timeout = 10 # ------------------------------------------------------------------------- @staticmethod def get_local_ip() -> str: """يحاول معرفة أفضل عنوان IP محلي لاستخدامه في الشبكة.""" try: with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s: # لا يهم أن ينجح الاتصال الفعلي، الهدف كشف IP واجهة الخروج s.connect(("8.8.8.8", 53)) return s.getsockname()[0] except Exception: try: # محاولة بديلة hostname = socket.gethostname() return socket.gethostbyname(hostname) except Exception: return "127.0.0.1" def _register_once(self, server: str, port: int) -> List[str]: """مُحاولة واحدة للتسجيل؛ تُعيد peers أو ترفع استثناءً.""" payload = { "node_id": self.node_id, "ip": self.get_local_ip(), "port": port, "hostname": socket.gethostname(), "timestamp": time.time() } # جرب مسارات مختلفة للتسجيل endpoints = ["/register", "/api/register", "/nodes/register", "/peer/register"] for endpoint in endpoints: try: url = f"{server.rstrip('/')}{endpoint}" logging.info(f"🔗 محاولة التسجيل في: {url}") resp = self.session.post(url, json=payload, timeout=8) if resp.status_code == 200: data = resp.json() logging.info(f"✅ تسجيل ناجح في {server}") return data.get("peers", []) if isinstance(data, dict) else data elif resp.status_code == 404: continue # جرب endpoint التالي else: resp.raise_for_status() except requests.exceptions.Timeout: logging.warning(f"⏰ انتهت المهلة مع {server}{endpoint}") continue except requests.exceptions.ConnectionError: logging.warning(f"🔌 تعذر الاتصال بـ {server}{endpoint}") continue except Exception as e: logging.warning(f"❌ خطأ مع {server}{endpoint}: {e}") continue # إذا وصلنا هنا، فكل المحاولات فشلت raise Exception(f"فشل التسجيل في {server} بعد تجربة جميع المسارات") # ------------------------------------------------------------------------- def discover_local_servers(self) -> List[str]: """اكتشاف خوادم محلية على الشبكة.""" local_servers = [] base_ip = ".".join(self.get_local_ip().split(".")[:-1]) # فحص نطاق IPs المحلي for i in range(1, 50): # فحص أول 50 عنوان فقط للسرعة if i == int(self.get_local_ip().split(".")[-1]): continue # تخطي الذات ip = f"{base_ip}.{i}" for port in [8888, 8000, 5000, 3000]: server_url = f"http://{ip}:{port}" if self.check_server_availability(server_url): local_servers.append(server_url) logging.info(f"🔍 تم اكتشاف خادم محلي: {server_url}") return local_servers def check_server_availability(self, server_url: str) -> bool: """فحص توفر الخادم.""" try: resp = self.session.get(f"{server_url}/status", timeout=2) return resp.status_code == 200 except: return False def connect_until_success(self, retry_delay: int = 10) -> Tuple[str, List[str]]: """ يدور على جميع المنافذ والخوادم حتى ينجح التسجيل. • عند النجاح يُرجع: (عنوان الخادم، قائمة الأقران) • لا يرفع استثناءات؛ إمّا ينجح أو يستمر في المحاولة إلى ما لا نهاية. """ logging.info("🔄 بدء محاولات التسجيل للعقدة '%s'...", self.node_id) logging.info("🌐 عنوان IP المحلي: %s", self.get_local_ip()) logging.info("📋 عدد الخوادم المتاحة: %d", len(self.registry_servers)) attempt = 0 while True: attempt += 1 logging.info("🔄 محاولة التسجيل رقم %d", attempt) # اكتشاف خوادم محلية أولاً if attempt % 3 == 1: # كل 3 محاولات، اكتشف خوادم محلية local_servers = self.discover_local_servers() all_servers = local_servers + self.registry_servers else: all_servers = self.registry_servers for port in self.PORTs: # 🔧 تصحيح: PORTs بدلاً من PORTS for idx, server in enumerate(all_servers): try: peers = self._register_once(server, port) # سجّل النجاح واحفظ المعلومات self.port = port self.current_server_index = idx logging.info("✅ تسجيل ناجح: %s على المنفذ %s", server, port) logging.info("👥 عدد الأقران المكتشفين: %d", len(peers)) return server, peers except Exception as e: logging.debug("❌ %s:%s -> %s", server, port, e) logging.warning("❌ فشلت جميع محاولات التسجيل، إعادة المحاولة بعد %d ثواني", retry_delay) time.sleep(retry_delay) # ------------------------------------------------------------------------- def run_background(self) -> None: """ إطلاق التسجيل في خيط منفصل؛ مفيد إذا كنت تريد إبقاء Main Thread للمهام الأخرى. """ import threading def background_connect(): try: server, peers = self.connect_until_success() logging.info("🎯 التسجيل الخلفي ناجح مع %s", server) except Exception as e: logging.error("💥 خطأ في التسجيل الخلفي: %s", e) threading.Thread(target=background_connect, daemon=True).start() def get_current_info(self) -> dict: """الحصول على معلومات العقدة الحالية.""" return { "node_id": self.node_id, "ip": self.get_local_ip(), "port": self.port, "hostname": socket.gethostname(), "current_server": self.registry_servers[self.current_server_index] if self.current_server_index is not None else None } # ----------------------------------------------------------------------------- if __name__ == "__main__": """ للتجربة المباشرة: $ python node_client.py """ try: client = NodeClient() print("🔍 جاري اكتشاف الخوادم المحلية...") local_servers = client.discover_local_servers() if local_servers: print(f"✅ تم اكتشاف {len(local_servers)} خادم محلي") print("🚀 بدء عملية التسجيل...") server, peer_list = client.connect_until_success() print(f"✅ تسجيل ناجح مع الخادم: {server}") print(f"👥 عدد الأقران: {len(peer_list)}") if peer_list: print("📋 قائمة الأقران:") for i, peer in enumerate(peer_list[:10]): # عرض أول 10 أقران فقط print(f" {i+1}. {peer}") if len(peer_list) > 10: print(f" ... و{len(peer_list) - 10} آخرين") except KeyboardInterrupt: print("\n🛑 تم إيقاف العميل بواسطة المستخدم") except Exception as e: print(f"💥 خطأ غير متوقع: {e}")