osamabyc86 commited on
Commit
02c9bc3
·
verified ·
1 Parent(s): ed0cfc7

Update peer_discovery.py

Browse files
Files changed (1) hide show
  1. peer_discovery.py +64 -81
peer_discovery.py CHANGED
@@ -8,18 +8,16 @@ import requests
8
  from zeroconf import Zeroconf, ServiceInfo, ServiceBrowser
9
  import random
10
 
11
- # منفذ عشوائي مكوَّن من 4 أرقام (مثلاً 0007، 8321…)
12
- PORT = int(os.getenv("CPU_PORT", random.randint(1, 9999)))
 
 
 
13
 
14
- # إذا كان باقي الكود يحتاج متغيّر rport iterable
15
- # أنشئ مجموعة تحتوي هذا المنفذ فقط (اختياري):
16
- rport = {f"{PORT:04}"}
17
- # 👇 إعداد الـ peer discovery عبر LAN وInternet
18
  SERVICE = "_tasknode._tcp.local."
19
- PORT = int(os.getenv("CPU_PORT", random.choice(list(rport))))
20
- PEERS = set() # مجموعة URLs للأقران (/run)
21
 
22
- # 🌐 قائمة السيرفرات (Failover List)
23
  BASES = [
24
  "https://cv4790811.regru.cloud",
25
  "https://amaloffload.onrender.com",
@@ -28,8 +26,8 @@ BASES = [
28
  "http://10.229.228.178",
29
  ]
30
 
31
- CENTRAL_REGISTRY_SERVERS = [f"{base}:{port}" for base in BASES for port in rport]
32
-
33
  current_server_index = 0
34
 
35
 
@@ -50,6 +48,7 @@ def get_local_ip():
50
  try: s.close()
51
  except: pass
52
 
 
53
  # 🔄 اختيار السيرفر المتاح
54
  def get_active_central_server():
55
  global current_server_index
@@ -64,6 +63,7 @@ def get_active_central_server():
64
  continue
65
  return None
66
 
 
67
  # ❶ تسجيل الخدمة في شبكة LAN
68
  def register_service_lan():
69
  zc = Zeroconf()
@@ -81,6 +81,7 @@ def register_service_lan():
81
  except Exception as e:
82
  print(f"❌ LAN registration failed: {e}")
83
 
 
84
  # ❷ مستمع اكتشاف LAN
85
  class Listener:
86
  def add_service(self, zc, t, name):
@@ -103,37 +104,53 @@ def discover_lan_loop():
103
  while True:
104
  time.sleep(5)
105
 
106
- # ❸ التسجيل في الخادم المركزي (مع Failover)
107
- def register_with_central():
108
- node_id = os.getenv("NODE_ID", socket.gethostname())
109
- info = {"node_id": node_id, "ip": get_local_ip(), "port": PORT}
110
- server = get_active_central_server()
111
- if not server:
112
- print("❌ لا يوجد أي سيرفر مركزي متاح حالياً")
113
- return
114
- try:
115
- resp = requests.post(f"{server}/register", json=info, timeout=5)
116
- resp.raise_for_status()
117
- peers_list = resp.json()
118
- for p in peers_list:
119
- peer_url = f"http://{p['ip']}:{p['port']}/run"
120
- if peer_url not in PEERS:
121
- PEERS.add(peer_url)
122
- print(f"🌐 Registered and discovered central peer: {peer_url}")
123
- except Exception as e:
124
- print(f"❌ Central registration failed on {server}: {e}")
125
 
126
- # مزامنة الأقران مع السيرفر المركزي (مع Failover)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
127
  def fetch_central_loop():
128
  print("🔄 Central registry sync loop started")
129
  while True:
130
- server = get_active_central_server()
131
- if not server:
132
- print("⚠️ لا يوجد سيرفر مركزي متاح للمزامنة")
133
- time.sleep(30)
134
  continue
135
  try:
136
- resp = requests.get(f"{server}/peers", timeout=5)
137
  resp.raise_for_status()
138
  peers_list = resp.json()
139
  for p in peers_list:
@@ -142,51 +159,17 @@ def fetch_central_loop():
142
  PEERS.add(peer_url)
143
  print(f"🌐 Central peer discovered: {peer_url}")
144
  except Exception as e:
145
- print(f"⚠️ Fetch central peers failed on {server}: {e}")
146
  time.sleep(300)
147
- def connect_until_success():
148
- """
149
- يحاول الاتصال بكل سيرفر في CENTRAL_REGISTRY_SERVERS
150
- على كل منفذ في RPORTS بالتتابع (مع تأخير بسيط).
151
- لا يخرج إلا بعد نجاح التسجيل، ويُعيد السيرفر المختار وقائمة الأقران.
152
- """
153
- global PORT, current_server_index
154
- while True:
155
- for port in RPORTS: # جرّب كل المنافذ
156
- for idx, server in enumerate(CENTRAL_REGISTRY_SERVERS):
157
- info = {
158
- "node_id": os.getenv("NODE_ID", socket.gethostname()),
159
- "ip": get_local_ip(),
160
- "port": port
161
- }
162
- try:
163
- resp = requests.post(f"{server}/register",
164
- json=info, timeout=5)
165
- resp.raise_for_status() # نجاح
166
- PORT = port # ثبّت المنفذ الناجح
167
- current_server_index = idx # حدّث المؤشّر
168
- print(f"✅ Connected: {server} on port {PORT}")
169
- return server, resp.json() # peers_list
170
- except Exception as e:
171
- logging.info("❌ %s:%s -> %s", server, port, e)
172
- time.sleep(5) # انتظر قليلاً ثم أَعِد الكرّة
173
-
174
- # 🚀 Main
175
- def main():
176
- logging.basicConfig(level=logging.INFO)
177
- print("🚀 Peer Discovery System starting...")
178
-
179
- threading.Thread(target=register_service_lan, daemon=True).start()
180
- threading.Thread(target=discover_lan_loop, daemon=True).start()
181
-
182
- register_with_central()
183
- threading.Thread(target=fetch_central_loop, daemon=True).start()
184
 
185
- try:
186
- while True:
187
- time.sleep(60)
188
- except KeyboardInterrupt:
189
- print("🛑 Exiting...")
190
 
191
- if __name__ == "__main__":
192
- main()
 
 
 
 
 
 
 
 
 
8
  from zeroconf import Zeroconf, ServiceInfo, ServiceBrowser
9
  import random
10
 
11
+ # ---------- المتغيرات الأساسية ----------
12
+ PORT = None
13
+ CURRENT_SERVER = None
14
+ CONNECTED = threading.Event()
15
+ PEERS = set()
16
 
 
 
 
 
17
  SERVICE = "_tasknode._tcp.local."
18
+ LAN_PORT = 7520 # منفذ LAN الداخلي
 
19
 
20
+ # ---------- قائمة السيرفرات ----------
21
  BASES = [
22
  "https://cv4790811.regru.cloud",
23
  "https://amaloffload.onrender.com",
 
26
  "http://10.229.228.178",
27
  ]
28
 
29
+ RPORTS = [f"{i:04}" for i in range(1, 10000)]
30
+ CENTRAL_REGISTRY_SERVERS = [f"{base}:{port}" for base in BASES for port in RPORTS]
31
  current_server_index = 0
32
 
33
 
 
48
  try: s.close()
49
  except: pass
50
 
51
+
52
  # 🔄 اختيار السيرفر المتاح
53
  def get_active_central_server():
54
  global current_server_index
 
63
  continue
64
  return None
65
 
66
+
67
  # ❶ تسجيل الخدمة في شبكة LAN
68
  def register_service_lan():
69
  zc = Zeroconf()
 
81
  except Exception as e:
82
  print(f"❌ LAN registration failed: {e}")
83
 
84
+
85
  # ❷ مستمع اكتشاف LAN
86
  class Listener:
87
  def add_service(self, zc, t, name):
 
104
  while True:
105
  time.sleep(5)
106
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
107
 
108
+ # الاتصال بسيرفر مركزي (مع محاولات حتى النجاح)
109
+ def connect_until_success():
110
+ """
111
+ يحاول الاتصال بكل سيرفر وبورت بالتسلسل، ويتوقف عند أول نجاح
112
+ ويضبط PORT وCONNECTED وCENTRAL_REGISTRY
113
+ """
114
+ global PORT, current_server_index, CURRENT_SERVER
115
+
116
+ for port_int in range(1, 10000):
117
+ port = f"{port_int:04}"
118
+ for idx, server_base in enumerate(BASES):
119
+ server = f"{server_base}:{port}"
120
+ info = {
121
+ "node_id": os.getenv("NODE_ID", socket.gethostname()),
122
+ "ip": get_local_ip(),
123
+ "port": int(port)
124
+ }
125
+ try:
126
+ resp = requests.post(f"{server}/register", json=info, timeout=5)
127
+ resp.raise_for_status()
128
+ PORT = int(port)
129
+ CURRENT_SERVER = server
130
+ current_server_index = idx
131
+ CONNECTED.set()
132
+ print(f"✅ Connected: {server} on port {PORT}")
133
+ peers = resp.json()
134
+ for p in peers:
135
+ peer_url = f"http://{p['ip']}:{p['port']}/run"
136
+ if peer_url not in PEERS:
137
+ PEERS.add(peer_url)
138
+ print(f"🌐 Central peer: {peer_url}")
139
+ return
140
+ except Exception as e:
141
+ logging.info("❌ %s:%s -> %s", server_base, port, e)
142
+ time.sleep(0.1)
143
+
144
+
145
+ # ❹ مزامنة مستمرة مع السيرفر المركزي
146
  def fetch_central_loop():
147
  print("🔄 Central registry sync loop started")
148
  while True:
149
+ if not CONNECTED.is_set():
150
+ time.sleep(5)
 
 
151
  continue
152
  try:
153
+ resp = requests.get(f"{CURRENT_SERVER}/peers", timeout=5)
154
  resp.raise_for_status()
155
  peers_list = resp.json()
156
  for p in peers_list:
 
159
  PEERS.add(peer_url)
160
  print(f"🌐 Central peer discovered: {peer_url}")
161
  except Exception as e:
162
+ print(f"⚠️ Fetch central peers failed: {e}")
163
  time.sleep(300)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
164
 
 
 
 
 
 
165
 
166
+ # 🚀 تشغيل تلقائي عند الاستيراد
167
+ def _start():
168
+ if not CONNECTED.is_set():
169
+ print("🚀 Starting peer discovery...")
170
+ connect_until_success()
171
+ threading.Thread(target=register_service_lan, daemon=True).start()
172
+ threading.Thread(target=discover_lan_loop, daemon=True).start()
173
+ threading.Thread(target=fetch_central_loop, daemon=True).start()
174
+
175
+ _start() # ← تشغيل تلقائي