#!/usr/bin/env python3 import os import socket import threading import time import logging import requests from zeroconf import Zeroconf, ServiceInfo, ServiceBrowser # إعداد السجلات logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # منفذ الخدمة (بدءاً من 1000 مع زيادة متسلسلة) current_port = 1000 def get_sequential_port(): global current_port port = current_port current_port += 1 if current_port > 9999: current_port = 1000 return port PORT = current_port with open("offload_port.txt", "w") as f: f.write(str(current_port)) PORT = int(os.getenv("CPU_PORT", get_sequential_port())) SERVICE = "_tasknode._tcp.local." PEERS = set() PEERS_INFO = {} CENTRAL_REGISTRY_SERVERS = [ "http://cv5303201.regru.cloud", "https://amaloffload.onrender.com", "https://osamabyc86-offload.hf.space", "https://osamabyc19866-omsd.hf.space", "https://52.13.128.108", "https://176.28.156.149", "https://44.229.227.142", "https://osamabyc86-amalcoreflow.hf.space" ] def get_local_ip(): try: s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.connect(("8.8.8.8", 80)) ip = s.getsockname()[0] s.close() return ip except Exception: return "127.0.0.1" def register_peer(ip, port): peer_url = f"http://{ip}:{port}/run" if peer_url not in PEERS: PEERS.add(peer_url) logger.info(f"✅ تم تسجيل قرين جديد: {peer_url}") def register_with_central_servers(): """تسجيل هذه العقدة على الخوادم المركزية""" local_ip = get_local_ip() node_info = { "node_id": socket.gethostname(), "ip": local_ip, "port": PORT, "hostname": socket.gethostname(), "timestamp": time.time(), "capabilities": ["cpu", "gpu", "storage"] } successful_registrations = 0 for server in CENTRAL_REGISTRY_SERVERS: for endpoint in ["/register", "/api/register", "/nodes/register", "/peer/register"]: try: url = f"{server.rstrip('/')}{endpoint}" logger.info(f"🔗 محاولة التسجيل في: {url}") response = requests.post( url, json=node_info, timeout=10, headers={"Content-Type": "application/json"} ) if response.status_code == 200: data = response.json() successful_registrations += 1 logger.info(f"✅ تم التسجيل بنجاح في: {server}") # إضافة الأقران من الاستجابة if isinstance(data, dict) and "peers" in data: for peer in data["peers"]: if isinstance(peer, str) and peer.startswith("http"): PEERS.add(peer) logger.info(f"👥 تمت إضافة قرين من السجل: {peer}") break # توقف عن تجربة endpoints أخرى لهذا الخادم elif response.status_code == 404: continue # جرب endpoint التالي else: logger.warning(f"⚠️ استجابة غير متوقعة من {server}: {response.status_code}") except requests.exceptions.Timeout: logger.warning(f"⏰ انتهت المهلة مع {server}") continue except requests.exceptions.ConnectionError: logger.warning(f"🔌 تعذر الاتصال بـ {server}") continue except Exception as e: logger.warning(f"❌ خطأ مع {server}: {str(e)}") continue logger.info(f"📊 إجمالي التسجيلات الناجحة: {successful_registrations}/{len(CENTRAL_REGISTRY_SERVERS)}") return successful_registrations > 0 def discover_lan_peers(): """اكتشاف الأقران على الشبكة المحلية باستخدام Zeroconf""" class PeerListener: def add_service(self, zc, type_, name): info = zc.get_service_info(type_, name) if info and info.addresses: ip = socket.inet_ntoa(info.addresses[0]) port = info.port register_peer(ip, port) logger.info(f"🔍 تم اكتشاف قرين محلي: {ip}:{port}") try: zeroconf = Zeroconf() ServiceBrowser(zeroconf, SERVICE, PeerListener()) return zeroconf except Exception as e: logger.error(f"❌ فشل في بدء اكتشاف LAN: {e}") return None def start_periodic_registration(interval=60): """بدء التسجيل الدوري مع الخوادم المركزية""" def registration_loop(): while True: try: logger.info("🔄 بدء التسجيل الدوري مع الخوادم المركزية...") register_with_central_servers() logger.info(f"📈 عدد الأقران الحالي: {len(PEERS)}") time.sleep(interval) except Exception as e: logger.error(f"💥 خطأ في التسجيل الدوري: {e}") time.sleep(interval) thread = threading.Thread(target=registration_loop, daemon=True) thread.start() return thread def start_periodic_discovery(interval=30): """اكتشاف دوري للأقران المحليين""" def discovery_loop(): while True: try: # فحص نطاق IPs المحلي لاكتشاف الأقران local_ip = get_local_ip() base_ip = ".".join(local_ip.split(".")[:-1]) for i in range(1, 50): if i == int(local_ip.split(".")[-1]): continue # تخطي الذات ip = f"{base_ip}.{i}" for port in [PORT, 8888, 8000, 5000, 3000]: peer_url = f"http://{ip}:{port}" if check_peer_availability(peer_url): register_peer(ip, port) time.sleep(interval) except Exception as e: logger.error(f"💥 خطأ في الاكتشاف الدوري: {e}") time.sleep(interval) thread = threading.Thread(target=discovery_loop, daemon=True) thread.start() return thread def check_peer_availability(peer_url): """فحص توفر القرين""" try: response = requests.get(f"{peer_url}/status", timeout=3) return response.status_code == 200 except: return False def get_peers(): """الحصول على قائمة الأقران المتاحة""" return list(PEERS) def main(): logger.info("🚀 بدء نظام اكتشاف الأقران المتقدم...") logger.info(f"🌐 العقدة: {socket.gethostname()} - {get_local_ip()}:{PORT}") # تسجيل الخدمة المحلية باستخدام Zeroconf try: zeroconf = Zeroconf() info = ServiceInfo( type_=SERVICE, name=f"{socket.gethostname()}.{SERVICE}", addresses=[socket.inet_aton(get_local_ip())], port=PORT, properties={b'version': b'1.0', b'hostname': socket.gethostname().encode()}, server=f"{socket.gethostname()}.local." ) zeroconf.register_service(info) logger.info("✅ تم تسجيل الخدمة المحلية باستخدام Zeroconf") except Exception as e: logger.error(f"❌ فشل في تسجيل Zeroconf: {e}") zeroconf = None # بدء الاكتشاف المحلي lan_zeroconf = discover_lan_peers() # بدء التسجيل الدوري مع الخوادم المركزية registration_thread = start_periodic_registration(interval=60) # بدء الاكتشاف الدوري discovery_thread = start_periodic_discovery(interval=45) try: while True: logger.info(f"📊 إحصائيات - الأقران: {len(PEERS)}") if PEERS: logger.info("📋 قائمة الأقران المتاحة:") for i, peer in enumerate(list(PEERS)[:5]): # عرض أول 5 فقط logger.info(f" {i+1}. {peer}") if len(PEERS) > 5: logger.info(f" ... و{len(PEERS) - 5} آخرين") time.sleep(30) except KeyboardInterrupt: logger.info("🛑 إيقاف النظام...") finally: if zeroconf: zeroconf.unregister_service(info) zeroconf.close() if lan_zeroconf: lan_zeroconf.close() if __name__ == "__main__": main()