File size: 9,189 Bytes
042d8bf
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
84c65b6
 
 
042d8bf
84c65b6
042d8bf
 
 
 
 
84c65b6
 
042d8bf
 
 
84c65b6
 
 
042d8bf
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
84c65b6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
042d8bf
 
84c65b6
 
042d8bf
 
84c65b6
042d8bf
84c65b6
 
 
 
 
 
 
 
 
 
 
042d8bf
84c65b6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
042d8bf
 
84c65b6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
042d8bf
 
 
84c65b6
 
 
 
 
 
 
 
 
042d8bf
 
 
84c65b6
 
 
 
 
042d8bf
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
#!/usr/bin/env python3
import os
import socket
import threading
import time
import logging
import requests
from zeroconf import Zeroconf, ServiceInfo, ServiceBrowser

# إعداد السجلات
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# منفذ الخدمة (بدءاً من 1000 مع زيادة متسلسلة)
current_port = 1000

def get_sequential_port():
    global current_port
    port = current_port
    current_port += 1
    if current_port > 9999:
        current_port = 1000
    return port
PORT = current_port
with open("offload_port.txt", "w") as f:
    f.write(str(current_port))

PORT = int(os.getenv("CPU_PORT", get_sequential_port()))
SERVICE = "_tasknode._tcp.local."
PEERS = set()
PEERS_INFO = {}

CENTRAL_REGISTRY_SERVERS = [
    "http://cv5303201.regru.cloud",
    "https://amaloffload.onrender.com", 
    "https://osamabyc86-offload.hf.space",
    "https://osamabyc19866-omsd.hf.space",
    "https://52.13.128.108",
    "https://176.28.156.149", 
    "https://44.229.227.142",
    "https://osamabyc86-amalcoreflow.hf.space"
]

def get_local_ip():
    try:
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        s.connect(("8.8.8.8", 80))
        ip = s.getsockname()[0]
        s.close()
        return ip
    except Exception:
        return "127.0.0.1"

def register_peer(ip, port):
    peer_url = f"http://{ip}:{port}/run"
    if peer_url not in PEERS:
        PEERS.add(peer_url)
        logger.info(f"✅ تم تسجيل قرين جديد: {peer_url}")

def register_with_central_servers():
    """تسجيل هذه العقدة على الخوادم المركزية"""
    local_ip = get_local_ip()
    node_info = {
        "node_id": socket.gethostname(),
        "ip": local_ip,
        "port": PORT,
        "hostname": socket.gethostname(),
        "timestamp": time.time(),
        "capabilities": ["cpu", "gpu", "storage"]
    }
    
    successful_registrations = 0
    
    for server in CENTRAL_REGISTRY_SERVERS:
        for endpoint in ["/register", "/api/register", "/nodes/register", "/peer/register"]:
            try:
                url = f"{server.rstrip('/')}{endpoint}"
                logger.info(f"🔗 محاولة التسجيل في: {url}")
                
                response = requests.post(
                    url, 
                    json=node_info, 
                    timeout=10,
                    headers={"Content-Type": "application/json"}
                )
                
                if response.status_code == 200:
                    data = response.json()
                    successful_registrations += 1
                    logger.info(f"✅ تم التسجيل بنجاح في: {server}")
                    
                    # إضافة الأقران من الاستجابة
                    if isinstance(data, dict) and "peers" in data:
                        for peer in data["peers"]:
                            if isinstance(peer, str) and peer.startswith("http"):
                                PEERS.add(peer)
                                logger.info(f"👥 تمت إضافة قرين من السجل: {peer}")
                    
                    break  # توقف عن تجربة endpoints أخرى لهذا الخادم
                    
                elif response.status_code == 404:
                    continue  # جرب endpoint التالي
                else:
                    logger.warning(f"⚠️ استجابة غير متوقعة من {server}: {response.status_code}")
                    
            except requests.exceptions.Timeout:
                logger.warning(f"⏰ انتهت المهلة مع {server}")
                continue
            except requests.exceptions.ConnectionError:
                logger.warning(f"🔌 تعذر الاتصال بـ {server}")
                continue
            except Exception as e:
                logger.warning(f"❌ خطأ مع {server}: {str(e)}")
                continue
    
    logger.info(f"📊 إجمالي التسجيلات الناجحة: {successful_registrations}/{len(CENTRAL_REGISTRY_SERVERS)}")
    return successful_registrations > 0

def discover_lan_peers():
    """اكتشاف الأقران على الشبكة المحلية باستخدام Zeroconf"""
    class PeerListener:
        def add_service(self, zc, type_, name):
            info = zc.get_service_info(type_, name)
            if info and info.addresses:
                ip = socket.inet_ntoa(info.addresses[0])
                port = info.port
                register_peer(ip, port)
                logger.info(f"🔍 تم اكتشاف قرين محلي: {ip}:{port}")

    try:
        zeroconf = Zeroconf()
        ServiceBrowser(zeroconf, SERVICE, PeerListener())
        return zeroconf
    except Exception as e:
        logger.error(f"❌ فشل في بدء اكتشاف LAN: {e}")
        return None

def start_periodic_registration(interval=60):
    """بدء التسجيل الدوري مع الخوادم المركزية"""
    def registration_loop():
        while True:
            try:
                logger.info("🔄 بدء التسجيل الدوري مع الخوادم المركزية...")
                register_with_central_servers()
                logger.info(f"📈 عدد الأقران الحالي: {len(PEERS)}")
                time.sleep(interval)
            except Exception as e:
                logger.error(f"💥 خطأ في التسجيل الدوري: {e}")
                time.sleep(interval)
    
    thread = threading.Thread(target=registration_loop, daemon=True)
    thread.start()
    return thread

def start_periodic_discovery(interval=30):
    """اكتشاف دوري للأقران المحليين"""
    def discovery_loop():
        while True:
            try:
                # فحص نطاق IPs المحلي لاكتشاف الأقران
                local_ip = get_local_ip()
                base_ip = ".".join(local_ip.split(".")[:-1])
                
                for i in range(1, 50):
                    if i == int(local_ip.split(".")[-1]):
                        continue  # تخطي الذات
                    
                    ip = f"{base_ip}.{i}"
                    for port in [PORT, 8888, 8000, 5000, 3000]:
                        peer_url = f"http://{ip}:{port}"
                        if check_peer_availability(peer_url):
                            register_peer(ip, port)
                
                time.sleep(interval)
            except Exception as e:
                logger.error(f"💥 خطأ في الاكتشاف الدوري: {e}")
                time.sleep(interval)
    
    thread = threading.Thread(target=discovery_loop, daemon=True)
    thread.start()
    return thread

def check_peer_availability(peer_url):
    """فحص توفر القرين"""
    try:
        response = requests.get(f"{peer_url}/status", timeout=3)
        return response.status_code == 200
    except:
        return False

def get_peers():
    """الحصول على قائمة الأقران المتاحة"""
    return list(PEERS)

def main():
    logger.info("🚀 بدء نظام اكتشاف الأقران المتقدم...")
    logger.info(f"🌐 العقدة: {socket.gethostname()} - {get_local_ip()}:{PORT}")

    # تسجيل الخدمة المحلية باستخدام Zeroconf
    try:
        zeroconf = Zeroconf()
        info = ServiceInfo(
            type_=SERVICE,
            name=f"{socket.gethostname()}.{SERVICE}",
            addresses=[socket.inet_aton(get_local_ip())],
            port=PORT,
            properties={b'version': b'1.0', b'hostname': socket.gethostname().encode()},
            server=f"{socket.gethostname()}.local."
        )
        zeroconf.register_service(info)
        logger.info("✅ تم تسجيل الخدمة المحلية باستخدام Zeroconf")
    except Exception as e:
        logger.error(f"❌ فشل في تسجيل Zeroconf: {e}")
        zeroconf = None

    # بدء الاكتشاف المحلي
    lan_zeroconf = discover_lan_peers()

    # بدء التسجيل الدوري مع الخوادم المركزية
    registration_thread = start_periodic_registration(interval=60)
    
    # بدء الاكتشاف الدوري
    discovery_thread = start_periodic_discovery(interval=45)

    try:
        while True:
            logger.info(f"📊 إحصائيات - الأقران: {len(PEERS)}")
            if PEERS:
                logger.info("📋 قائمة الأقران المتاحة:")
                for i, peer in enumerate(list(PEERS)[:5]):  # عرض أول 5 فقط
                    logger.info(f"  {i+1}. {peer}")
                if len(PEERS) > 5:
                    logger.info(f"  ... و{len(PEERS) - 5} آخرين")
            
            time.sleep(30)
    except KeyboardInterrupt:
        logger.info("🛑 إيقاف النظام...")
    finally:
        if zeroconf:
            zeroconf.unregister_service(info)
            zeroconf.close()
        if lan_zeroconf:
            lan_zeroconf.close()

if __name__ == "__main__":
    main()