amalCoreFlow / peer_discovery.py
osamabyc86's picture
Upload 69 files
84c65b6 verified
#!/usr/bin/env python3
import os
import socket
import threading
import time
import logging
import requests
from zeroconf import Zeroconf, ServiceInfo, ServiceBrowser
# إعداد السجلات
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# منفذ الخدمة (بدءاً من 1000 مع زيادة متسلسلة)
current_port = 1000
def get_sequential_port():
global current_port
port = current_port
current_port += 1
if current_port > 9999:
current_port = 1000
return port
PORT = current_port
with open("offload_port.txt", "w") as f:
f.write(str(current_port))
PORT = int(os.getenv("CPU_PORT", get_sequential_port()))
SERVICE = "_tasknode._tcp.local."
PEERS = set()
PEERS_INFO = {}
CENTRAL_REGISTRY_SERVERS = [
"http://cv5303201.regru.cloud",
"https://amaloffload.onrender.com",
"https://osamabyc86-offload.hf.space",
"https://osamabyc19866-omsd.hf.space",
"https://52.13.128.108",
"https://176.28.156.149",
"https://44.229.227.142",
"https://osamabyc86-amalcoreflow.hf.space"
]
def get_local_ip():
try:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(("8.8.8.8", 80))
ip = s.getsockname()[0]
s.close()
return ip
except Exception:
return "127.0.0.1"
def register_peer(ip, port):
peer_url = f"http://{ip}:{port}/run"
if peer_url not in PEERS:
PEERS.add(peer_url)
logger.info(f"✅ تم تسجيل قرين جديد: {peer_url}")
def register_with_central_servers():
"""تسجيل هذه العقدة على الخوادم المركزية"""
local_ip = get_local_ip()
node_info = {
"node_id": socket.gethostname(),
"ip": local_ip,
"port": PORT,
"hostname": socket.gethostname(),
"timestamp": time.time(),
"capabilities": ["cpu", "gpu", "storage"]
}
successful_registrations = 0
for server in CENTRAL_REGISTRY_SERVERS:
for endpoint in ["/register", "/api/register", "/nodes/register", "/peer/register"]:
try:
url = f"{server.rstrip('/')}{endpoint}"
logger.info(f"🔗 محاولة التسجيل في: {url}")
response = requests.post(
url,
json=node_info,
timeout=10,
headers={"Content-Type": "application/json"}
)
if response.status_code == 200:
data = response.json()
successful_registrations += 1
logger.info(f"✅ تم التسجيل بنجاح في: {server}")
# إضافة الأقران من الاستجابة
if isinstance(data, dict) and "peers" in data:
for peer in data["peers"]:
if isinstance(peer, str) and peer.startswith("http"):
PEERS.add(peer)
logger.info(f"👥 تمت إضافة قرين من السجل: {peer}")
break # توقف عن تجربة endpoints أخرى لهذا الخادم
elif response.status_code == 404:
continue # جرب endpoint التالي
else:
logger.warning(f"⚠️ استجابة غير متوقعة من {server}: {response.status_code}")
except requests.exceptions.Timeout:
logger.warning(f"⏰ انتهت المهلة مع {server}")
continue
except requests.exceptions.ConnectionError:
logger.warning(f"🔌 تعذر الاتصال بـ {server}")
continue
except Exception as e:
logger.warning(f"❌ خطأ مع {server}: {str(e)}")
continue
logger.info(f"📊 إجمالي التسجيلات الناجحة: {successful_registrations}/{len(CENTRAL_REGISTRY_SERVERS)}")
return successful_registrations > 0
def discover_lan_peers():
"""اكتشاف الأقران على الشبكة المحلية باستخدام Zeroconf"""
class PeerListener:
def add_service(self, zc, type_, name):
info = zc.get_service_info(type_, name)
if info and info.addresses:
ip = socket.inet_ntoa(info.addresses[0])
port = info.port
register_peer(ip, port)
logger.info(f"🔍 تم اكتشاف قرين محلي: {ip}:{port}")
try:
zeroconf = Zeroconf()
ServiceBrowser(zeroconf, SERVICE, PeerListener())
return zeroconf
except Exception as e:
logger.error(f"❌ فشل في بدء اكتشاف LAN: {e}")
return None
def start_periodic_registration(interval=60):
"""بدء التسجيل الدوري مع الخوادم المركزية"""
def registration_loop():
while True:
try:
logger.info("🔄 بدء التسجيل الدوري مع الخوادم المركزية...")
register_with_central_servers()
logger.info(f"📈 عدد الأقران الحالي: {len(PEERS)}")
time.sleep(interval)
except Exception as e:
logger.error(f"💥 خطأ في التسجيل الدوري: {e}")
time.sleep(interval)
thread = threading.Thread(target=registration_loop, daemon=True)
thread.start()
return thread
def start_periodic_discovery(interval=30):
"""اكتشاف دوري للأقران المحليين"""
def discovery_loop():
while True:
try:
# فحص نطاق IPs المحلي لاكتشاف الأقران
local_ip = get_local_ip()
base_ip = ".".join(local_ip.split(".")[:-1])
for i in range(1, 50):
if i == int(local_ip.split(".")[-1]):
continue # تخطي الذات
ip = f"{base_ip}.{i}"
for port in [PORT, 8888, 8000, 5000, 3000]:
peer_url = f"http://{ip}:{port}"
if check_peer_availability(peer_url):
register_peer(ip, port)
time.sleep(interval)
except Exception as e:
logger.error(f"💥 خطأ في الاكتشاف الدوري: {e}")
time.sleep(interval)
thread = threading.Thread(target=discovery_loop, daemon=True)
thread.start()
return thread
def check_peer_availability(peer_url):
"""فحص توفر القرين"""
try:
response = requests.get(f"{peer_url}/status", timeout=3)
return response.status_code == 200
except:
return False
def get_peers():
"""الحصول على قائمة الأقران المتاحة"""
return list(PEERS)
def main():
logger.info("🚀 بدء نظام اكتشاف الأقران المتقدم...")
logger.info(f"🌐 العقدة: {socket.gethostname()} - {get_local_ip()}:{PORT}")
# تسجيل الخدمة المحلية باستخدام Zeroconf
try:
zeroconf = Zeroconf()
info = ServiceInfo(
type_=SERVICE,
name=f"{socket.gethostname()}.{SERVICE}",
addresses=[socket.inet_aton(get_local_ip())],
port=PORT,
properties={b'version': b'1.0', b'hostname': socket.gethostname().encode()},
server=f"{socket.gethostname()}.local."
)
zeroconf.register_service(info)
logger.info("✅ تم تسجيل الخدمة المحلية باستخدام Zeroconf")
except Exception as e:
logger.error(f"❌ فشل في تسجيل Zeroconf: {e}")
zeroconf = None
# بدء الاكتشاف المحلي
lan_zeroconf = discover_lan_peers()
# بدء التسجيل الدوري مع الخوادم المركزية
registration_thread = start_periodic_registration(interval=60)
# بدء الاكتشاف الدوري
discovery_thread = start_periodic_discovery(interval=45)
try:
while True:
logger.info(f"📊 إحصائيات - الأقران: {len(PEERS)}")
if PEERS:
logger.info("📋 قائمة الأقران المتاحة:")
for i, peer in enumerate(list(PEERS)[:5]): # عرض أول 5 فقط
logger.info(f" {i+1}. {peer}")
if len(PEERS) > 5:
logger.info(f" ... و{len(PEERS) - 5} آخرين")
time.sleep(30)
except KeyboardInterrupt:
logger.info("🛑 إيقاف النظام...")
finally:
if zeroconf:
zeroconf.unregister_service(info)
zeroconf.close()
if lan_zeroconf:
lan_zeroconf.close()
if __name__ == "__main__":
main()