Spaces:
Running
Running
File size: 9,189 Bytes
042d8bf 84c65b6 042d8bf 84c65b6 042d8bf 84c65b6 042d8bf 84c65b6 042d8bf 84c65b6 042d8bf 84c65b6 042d8bf 84c65b6 042d8bf 84c65b6 042d8bf 84c65b6 042d8bf 84c65b6 042d8bf 84c65b6 042d8bf 84c65b6 042d8bf | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 | #!/usr/bin/env python3
import os
import socket
import threading
import time
import logging
import requests
from zeroconf import Zeroconf, ServiceInfo, ServiceBrowser
# إعداد السجلات
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# منفذ الخدمة (بدءاً من 1000 مع زيادة متسلسلة)
current_port = 1000
def get_sequential_port():
global current_port
port = current_port
current_port += 1
if current_port > 9999:
current_port = 1000
return port
PORT = current_port
with open("offload_port.txt", "w") as f:
f.write(str(current_port))
PORT = int(os.getenv("CPU_PORT", get_sequential_port()))
SERVICE = "_tasknode._tcp.local."
PEERS = set()
PEERS_INFO = {}
CENTRAL_REGISTRY_SERVERS = [
"http://cv5303201.regru.cloud",
"https://amaloffload.onrender.com",
"https://osamabyc86-offload.hf.space",
"https://osamabyc19866-omsd.hf.space",
"https://52.13.128.108",
"https://176.28.156.149",
"https://44.229.227.142",
"https://osamabyc86-amalcoreflow.hf.space"
]
def get_local_ip():
try:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(("8.8.8.8", 80))
ip = s.getsockname()[0]
s.close()
return ip
except Exception:
return "127.0.0.1"
def register_peer(ip, port):
peer_url = f"http://{ip}:{port}/run"
if peer_url not in PEERS:
PEERS.add(peer_url)
logger.info(f"✅ تم تسجيل قرين جديد: {peer_url}")
def register_with_central_servers():
"""تسجيل هذه العقدة على الخوادم المركزية"""
local_ip = get_local_ip()
node_info = {
"node_id": socket.gethostname(),
"ip": local_ip,
"port": PORT,
"hostname": socket.gethostname(),
"timestamp": time.time(),
"capabilities": ["cpu", "gpu", "storage"]
}
successful_registrations = 0
for server in CENTRAL_REGISTRY_SERVERS:
for endpoint in ["/register", "/api/register", "/nodes/register", "/peer/register"]:
try:
url = f"{server.rstrip('/')}{endpoint}"
logger.info(f"🔗 محاولة التسجيل في: {url}")
response = requests.post(
url,
json=node_info,
timeout=10,
headers={"Content-Type": "application/json"}
)
if response.status_code == 200:
data = response.json()
successful_registrations += 1
logger.info(f"✅ تم التسجيل بنجاح في: {server}")
# إضافة الأقران من الاستجابة
if isinstance(data, dict) and "peers" in data:
for peer in data["peers"]:
if isinstance(peer, str) and peer.startswith("http"):
PEERS.add(peer)
logger.info(f"👥 تمت إضافة قرين من السجل: {peer}")
break # توقف عن تجربة endpoints أخرى لهذا الخادم
elif response.status_code == 404:
continue # جرب endpoint التالي
else:
logger.warning(f"⚠️ استجابة غير متوقعة من {server}: {response.status_code}")
except requests.exceptions.Timeout:
logger.warning(f"⏰ انتهت المهلة مع {server}")
continue
except requests.exceptions.ConnectionError:
logger.warning(f"🔌 تعذر الاتصال بـ {server}")
continue
except Exception as e:
logger.warning(f"❌ خطأ مع {server}: {str(e)}")
continue
logger.info(f"📊 إجمالي التسجيلات الناجحة: {successful_registrations}/{len(CENTRAL_REGISTRY_SERVERS)}")
return successful_registrations > 0
def discover_lan_peers():
"""اكتشاف الأقران على الشبكة المحلية باستخدام Zeroconf"""
class PeerListener:
def add_service(self, zc, type_, name):
info = zc.get_service_info(type_, name)
if info and info.addresses:
ip = socket.inet_ntoa(info.addresses[0])
port = info.port
register_peer(ip, port)
logger.info(f"🔍 تم اكتشاف قرين محلي: {ip}:{port}")
try:
zeroconf = Zeroconf()
ServiceBrowser(zeroconf, SERVICE, PeerListener())
return zeroconf
except Exception as e:
logger.error(f"❌ فشل في بدء اكتشاف LAN: {e}")
return None
def start_periodic_registration(interval=60):
"""بدء التسجيل الدوري مع الخوادم المركزية"""
def registration_loop():
while True:
try:
logger.info("🔄 بدء التسجيل الدوري مع الخوادم المركزية...")
register_with_central_servers()
logger.info(f"📈 عدد الأقران الحالي: {len(PEERS)}")
time.sleep(interval)
except Exception as e:
logger.error(f"💥 خطأ في التسجيل الدوري: {e}")
time.sleep(interval)
thread = threading.Thread(target=registration_loop, daemon=True)
thread.start()
return thread
def start_periodic_discovery(interval=30):
"""اكتشاف دوري للأقران المحليين"""
def discovery_loop():
while True:
try:
# فحص نطاق IPs المحلي لاكتشاف الأقران
local_ip = get_local_ip()
base_ip = ".".join(local_ip.split(".")[:-1])
for i in range(1, 50):
if i == int(local_ip.split(".")[-1]):
continue # تخطي الذات
ip = f"{base_ip}.{i}"
for port in [PORT, 8888, 8000, 5000, 3000]:
peer_url = f"http://{ip}:{port}"
if check_peer_availability(peer_url):
register_peer(ip, port)
time.sleep(interval)
except Exception as e:
logger.error(f"💥 خطأ في الاكتشاف الدوري: {e}")
time.sleep(interval)
thread = threading.Thread(target=discovery_loop, daemon=True)
thread.start()
return thread
def check_peer_availability(peer_url):
"""فحص توفر القرين"""
try:
response = requests.get(f"{peer_url}/status", timeout=3)
return response.status_code == 200
except:
return False
def get_peers():
"""الحصول على قائمة الأقران المتاحة"""
return list(PEERS)
def main():
logger.info("🚀 بدء نظام اكتشاف الأقران المتقدم...")
logger.info(f"🌐 العقدة: {socket.gethostname()} - {get_local_ip()}:{PORT}")
# تسجيل الخدمة المحلية باستخدام Zeroconf
try:
zeroconf = Zeroconf()
info = ServiceInfo(
type_=SERVICE,
name=f"{socket.gethostname()}.{SERVICE}",
addresses=[socket.inet_aton(get_local_ip())],
port=PORT,
properties={b'version': b'1.0', b'hostname': socket.gethostname().encode()},
server=f"{socket.gethostname()}.local."
)
zeroconf.register_service(info)
logger.info("✅ تم تسجيل الخدمة المحلية باستخدام Zeroconf")
except Exception as e:
logger.error(f"❌ فشل في تسجيل Zeroconf: {e}")
zeroconf = None
# بدء الاكتشاف المحلي
lan_zeroconf = discover_lan_peers()
# بدء التسجيل الدوري مع الخوادم المركزية
registration_thread = start_periodic_registration(interval=60)
# بدء الاكتشاف الدوري
discovery_thread = start_periodic_discovery(interval=45)
try:
while True:
logger.info(f"📊 إحصائيات - الأقران: {len(PEERS)}")
if PEERS:
logger.info("📋 قائمة الأقران المتاحة:")
for i, peer in enumerate(list(PEERS)[:5]): # عرض أول 5 فقط
logger.info(f" {i+1}. {peer}")
if len(PEERS) > 5:
logger.info(f" ... و{len(PEERS) - 5} آخرين")
time.sleep(30)
except KeyboardInterrupt:
logger.info("🛑 إيقاف النظام...")
finally:
if zeroconf:
zeroconf.unregister_service(info)
zeroconf.close()
if lan_zeroconf:
lan_zeroconf.close()
if __name__ == "__main__":
main()
|