amalCoreFlow / node_client.py
osamabyc86's picture
Upload 69 files
84c65b6 verified
#!/usr/bin/env python3
# ================================================================
# node_client.py – عميل تسجيل العُقدة في نظام AmalOffload
# ---------------------------------------------------------------
# • يختار منفذًا (من ENV أو من مجموعة PORTS).
# • يجلب عنوان الـ IP المحلي.
# • يحاول التسجيل في خادم سجلٍّ مركزي واحد تِلو الآخر،
# وعلى كل المنافذ، حتى ينجح.
# • عند النجاح يُرجع قائمة الأقران (Peers) من الخادم.
# ================================================================
import os
import socket
import time
import logging
import random
import requests
from typing import Iterable, Tuple, List
# ⬇️ منافذ مقترحة؛ يمكنك التعديل أو توليدها ديناميكيًا
DEFAULT_PORTS = {
7520, 7384, 9021, 6998, 5810, 9274,
8645, 7329, 7734, 8456, 6173, 7860,
8080, 8000, 5000, 3000, 8888, 9999
}
# ⬇️ خوادم السجل الاحتياطية بالترتيب المفضَّل
DEFAULT_REGISTRY_SERVERS = [
"http://localhost:8888", # خادم محلي أولاً
"http://127.0.0.1:8888", # خادم محلي بديل
"https://cv4790811.regru.cloud",
"https://amaloffload.onrender.com",
"https://osamabyc86-offload.hf.space",
"http://10.229.36.125",
"http://10.229.228.178",
"http://192.168.1.1:8888", # راوتر محلي
"http://192.168.0.1:8888", # راوتر محلي بديل
]
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
datefmt="%H:%M:%S",
)
class NodeClient:
"""
عميل خفيف يعتني بالتسجيل المتكرِّر في خادم سجل مركزي.
يمكن استيراده في أي سكربت وتشغيله في خيط منفصل.
"""
def __init__(
self,
PORTs: Iterable[int] | None = None,
registry_servers: List[str] | None = None,
node_id: str | None = None,
):
self.PORTs = set(PORTs) if PORTs else DEFAULT_PORTS
self.registry_servers = list(registry_servers) if registry_servers else DEFAULT_REGISTRY_SERVERS
self.node_id = node_id or os.getenv("NODE_ID", socket.gethostname())
# مبدئيًّا اختَر منفذًا (أولوية للمتغيّر البيئي إن وُجد)
self.port: int = int(os.getenv("CPU_PORT", random.choice(list(self.PORTs)))) # 🔧 تصحيح: PORTs بدلاً من PORTS
self.current_server_index: int | None = None
self.session = requests.Session()
self.session.timeout = 10
# -------------------------------------------------------------------------
@staticmethod
def get_local_ip() -> str:
"""يحاول معرفة أفضل عنوان IP محلي لاستخدامه في الشبكة."""
try:
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
# لا يهم أن ينجح الاتصال الفعلي، الهدف كشف IP واجهة الخروج
s.connect(("8.8.8.8", 53))
return s.getsockname()[0]
except Exception:
try:
# محاولة بديلة
hostname = socket.gethostname()
return socket.gethostbyname(hostname)
except Exception:
return "127.0.0.1"
def _register_once(self, server: str, port: int) -> List[str]:
"""مُحاولة واحدة للتسجيل؛ تُعيد peers أو ترفع استثناءً."""
payload = {
"node_id": self.node_id,
"ip": self.get_local_ip(),
"port": port,
"hostname": socket.gethostname(),
"timestamp": time.time()
}
# جرب مسارات مختلفة للتسجيل
endpoints = ["/register", "/api/register", "/nodes/register", "/peer/register"]
for endpoint in endpoints:
try:
url = f"{server.rstrip('/')}{endpoint}"
logging.info(f"🔗 محاولة التسجيل في: {url}")
resp = self.session.post(url, json=payload, timeout=8)
if resp.status_code == 200:
data = resp.json()
logging.info(f"✅ تسجيل ناجح في {server}")
return data.get("peers", []) if isinstance(data, dict) else data
elif resp.status_code == 404:
continue # جرب endpoint التالي
else:
resp.raise_for_status()
except requests.exceptions.Timeout:
logging.warning(f"⏰ انتهت المهلة مع {server}{endpoint}")
continue
except requests.exceptions.ConnectionError:
logging.warning(f"🔌 تعذر الاتصال بـ {server}{endpoint}")
continue
except Exception as e:
logging.warning(f"❌ خطأ مع {server}{endpoint}: {e}")
continue
# إذا وصلنا هنا، فكل المحاولات فشلت
raise Exception(f"فشل التسجيل في {server} بعد تجربة جميع المسارات")
# -------------------------------------------------------------------------
def discover_local_servers(self) -> List[str]:
"""اكتشاف خوادم محلية على الشبكة."""
local_servers = []
base_ip = ".".join(self.get_local_ip().split(".")[:-1])
# فحص نطاق IPs المحلي
for i in range(1, 50): # فحص أول 50 عنوان فقط للسرعة
if i == int(self.get_local_ip().split(".")[-1]):
continue # تخطي الذات
ip = f"{base_ip}.{i}"
for port in [8888, 8000, 5000, 3000]:
server_url = f"http://{ip}:{port}"
if self.check_server_availability(server_url):
local_servers.append(server_url)
logging.info(f"🔍 تم اكتشاف خادم محلي: {server_url}")
return local_servers
def check_server_availability(self, server_url: str) -> bool:
"""فحص توفر الخادم."""
try:
resp = self.session.get(f"{server_url}/status", timeout=2)
return resp.status_code == 200
except:
return False
def connect_until_success(self, retry_delay: int = 10) -> Tuple[str, List[str]]:
"""
يدور على جميع المنافذ والخوادم حتى ينجح التسجيل.
• عند النجاح يُرجع: (عنوان الخادم، قائمة الأقران)
• لا يرفع استثناءات؛ إمّا ينجح أو يستمر في المحاولة إلى ما لا نهاية.
"""
logging.info("🔄 بدء محاولات التسجيل للعقدة '%s'...", self.node_id)
logging.info("🌐 عنوان IP المحلي: %s", self.get_local_ip())
logging.info("📋 عدد الخوادم المتاحة: %d", len(self.registry_servers))
attempt = 0
while True:
attempt += 1
logging.info("🔄 محاولة التسجيل رقم %d", attempt)
# اكتشاف خوادم محلية أولاً
if attempt % 3 == 1: # كل 3 محاولات، اكتشف خوادم محلية
local_servers = self.discover_local_servers()
all_servers = local_servers + self.registry_servers
else:
all_servers = self.registry_servers
for port in self.PORTs: # 🔧 تصحيح: PORTs بدلاً من PORTS
for idx, server in enumerate(all_servers):
try:
peers = self._register_once(server, port)
# سجّل النجاح واحفظ المعلومات
self.port = port
self.current_server_index = idx
logging.info("✅ تسجيل ناجح: %s على المنفذ %s", server, port)
logging.info("👥 عدد الأقران المكتشفين: %d", len(peers))
return server, peers
except Exception as e:
logging.debug("❌ %s:%s -> %s", server, port, e)
logging.warning("❌ فشلت جميع محاولات التسجيل، إعادة المحاولة بعد %d ثواني", retry_delay)
time.sleep(retry_delay)
# -------------------------------------------------------------------------
def run_background(self) -> None:
"""
إطلاق التسجيل في خيط منفصل؛ مفيد إذا كنت تريد
إبقاء Main Thread للمهام الأخرى.
"""
import threading
def background_connect():
try:
server, peers = self.connect_until_success()
logging.info("🎯 التسجيل الخلفي ناجح مع %s", server)
except Exception as e:
logging.error("💥 خطأ في التسجيل الخلفي: %s", e)
threading.Thread(target=background_connect, daemon=True).start()
def get_current_info(self) -> dict:
"""الحصول على معلومات العقدة الحالية."""
return {
"node_id": self.node_id,
"ip": self.get_local_ip(),
"port": self.port,
"hostname": socket.gethostname(),
"current_server": self.registry_servers[self.current_server_index] if self.current_server_index is not None else None
}
# -----------------------------------------------------------------------------
if __name__ == "__main__":
"""
للتجربة المباشرة:
$ python node_client.py
"""
try:
client = NodeClient()
print("🔍 جاري اكتشاف الخوادم المحلية...")
local_servers = client.discover_local_servers()
if local_servers:
print(f"✅ تم اكتشاف {len(local_servers)} خادم محلي")
print("🚀 بدء عملية التسجيل...")
server, peer_list = client.connect_until_success()
print(f"✅ تسجيل ناجح مع الخادم: {server}")
print(f"👥 عدد الأقران: {len(peer_list)}")
if peer_list:
print("📋 قائمة الأقران:")
for i, peer in enumerate(peer_list[:10]): # عرض أول 10 أقران فقط
print(f" {i+1}. {peer}")
if len(peer_list) > 10:
print(f" ... و{len(peer_list) - 10} آخرين")
except KeyboardInterrupt:
print("\n🛑 تم إيقاف العميل بواسطة المستخدم")
except Exception as e:
print(f"💥 خطأ غير متوقع: {e}")