Voxxium commited on
Commit
8bc36d8
·
verified ·
1 Parent(s): a59cacd

Update orchestrator.py

Browse files
Files changed (1) hide show
  1. orchestrator.py +163 -83
orchestrator.py CHANGED
@@ -1,5 +1,19 @@
1
  """
2
- Orchestrator — Télécharge les listes, envoie au worker Render
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3
  """
4
 
5
  import time
@@ -12,8 +26,7 @@ from enum import Enum
12
  logger = logging.getLogger("orchestrator")
13
 
14
  HEADERS = {"User-Agent": "Mozilla/5.0"}
15
- BATCH_SIZE = 20
16
- PAUSE_BETWEEN_BATCHES = 5
17
 
18
  SOURCES = [
19
  (
@@ -28,13 +41,13 @@ SOURCES = [
28
  ),
29
  ]
30
 
31
- MAX_PER_PROTO = 200
32
-
33
 
34
  class Phase(Enum):
35
  IDLE = "idle"
36
  DOWNLOADING = "downloading"
37
- TESTING = "testing"
 
 
38
  DONE = "done"
39
 
40
 
@@ -47,12 +60,13 @@ class Orchestrator:
47
  self._phase = Phase.IDLE
48
  self._lock = threading.Lock()
49
 
50
- self._queue = []
51
- self._results = []
 
 
52
  self._cycle_start = 0.0
53
- self._batches_sent = 0
54
- self._total_tested = 0
55
 
 
56
  self._runs = 0
57
  self._last_duration = 0.0
58
  self._last_found = 0
@@ -70,8 +84,7 @@ class Orchestrator:
70
  "worker_url": self.worker_url,
71
  "worker_ok": self._worker_ok,
72
  "real_ip": self.real_ip,
73
- "queue_remaining": len(self._queue),
74
- "results_collected": len(self._results),
75
  }
76
 
77
  def _detect_ip(self):
@@ -84,20 +97,18 @@ class Orchestrator:
84
  )
85
  self.real_ip = r.json().get("ip", "")
86
  logger.info(f"Real IP: {self.real_ip}")
87
- except Exception as e:
88
- logger.warning(f"IP detect failed: {e}")
89
 
90
  def _check_worker(self):
91
  try:
92
  r = requests.get(
93
  f"{self.worker_url}/ping",
94
- timeout=10, headers=HEADERS,
95
  )
96
  self._worker_ok = r.status_code == 200
97
  except Exception:
98
  self._worker_ok = False
99
- if not self._worker_ok:
100
- logger.error("Worker unreachable!")
101
  return self._worker_ok
102
 
103
  def _download(self, url):
@@ -130,7 +141,8 @@ class Orchestrator:
130
  logger.error(f"Download failed: {e}")
131
  return []
132
 
133
- def _send_batch(self, proxies, protocol):
 
134
  try:
135
  r = requests.post(
136
  f"{self.worker_url}/test",
@@ -139,20 +151,45 @@ class Orchestrator:
139
  "protocol": protocol,
140
  "real_ip": self.real_ip,
141
  },
142
- timeout=60,
143
  headers=HEADERS,
144
  )
145
  r.raise_for_status()
 
146
  self._worker_ok = True
147
- return r.json().get("working", [])
148
  except Exception as e:
149
- logger.error(f"Batch failed: {e}")
150
  self._worker_ok = False
151
- return []
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
152
 
153
  def tick(self):
154
  self._ticks += 1
155
-
156
  with self._lock:
157
  phase = self._phase
158
 
@@ -160,8 +197,12 @@ class Orchestrator:
160
  self._do_idle()
161
  elif phase == Phase.DOWNLOADING:
162
  self._do_download()
163
- elif phase == Phase.TESTING:
164
- self._do_test()
 
 
 
 
165
  elif phase == Phase.DONE:
166
  self._do_done()
167
 
@@ -172,24 +213,21 @@ class Orchestrator:
172
  self._detect_ip()
173
 
174
  if not self._check_worker():
175
- logger.error("Skipping worker down")
176
  return
177
 
178
  self._cycle_start = time.time()
179
- self._queue = []
180
- self._results = []
181
- self._batches_sent = 0
182
- self._total_tested = 0
183
 
184
  with self._lock:
185
  self._phase = Phase.DOWNLOADING
186
 
187
  def _do_download(self):
188
- logger.info("📥 Downloading…")
189
 
190
  seen = set()
191
- queue = []
192
-
193
  for protocol, url in SOURCES:
194
  proxies = self._download(url)
195
  proxies = [p for p in proxies if p not in seen]
@@ -199,84 +237,127 @@ class Orchestrator:
199
  random.shuffle(proxies)
200
  proxies = proxies[:MAX_PER_PROTO]
201
 
202
- for p in proxies:
203
- queue.append((p, protocol))
204
-
205
  logger.info(f" {protocol.upper()}: {len(proxies)}")
206
  time.sleep(1)
207
 
208
- random.shuffle(queue)
209
- self._queue = queue
210
- logger.info(f" Queue: {len(queue)}")
211
 
212
  with self._lock:
213
- self._phase = Phase.TESTING
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
214
 
215
- def _do_test(self):
216
- if not self._queue:
 
 
217
  with self._lock:
218
- self._phase = Phase.DONE
219
  return
220
 
221
- batch_items = self._queue[:BATCH_SIZE]
222
- self._queue = self._queue[BATCH_SIZE:]
 
 
 
 
223
 
224
- by_proto = {}
225
- for proxy, proto in batch_items:
226
- by_proto.setdefault(proto, []).append(proxy)
 
227
 
228
- for proto, proxies in by_proto.items():
229
- working = self._send_batch(proxies, proto)
 
230
 
231
- for w in working:
232
- self._results.append({
233
- "proxy": w["proxy"],
234
- "protocol": proto,
235
- "proxy_url": f"{proto}://{w['proxy']}",
236
- "latency": w["latency"],
237
- "proxy_ip": w["proxy_ip"],
238
- "verified": w["verified"],
239
- })
240
 
241
- self._total_tested += len(proxies)
 
242
 
243
- self._batches_sent += 1
244
- remaining = len(self._queue)
 
245
 
246
- logger.info(
247
- f" Batch #{self._batches_sent}: "
248
- f" {len(self._results)} | "
249
- f"⏳ {remaining} left"
250
- )
251
 
252
- if remaining > 0:
253
- time.sleep(PAUSE_BETWEEN_BATCHES)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
254
 
255
  def _do_done(self):
256
  duration = time.time() - self._cycle_start
257
 
258
- if self._results:
259
- self.pool.refresh(self._results)
260
 
261
  self._runs += 1
262
  self._last_duration = round(duration, 1)
263
- self._last_found = len(self._results)
264
 
265
  from collections import Counter
266
- c = Counter(r["protocol"] for r in self._results)
267
 
268
  logger.info("=" * 40)
269
  logger.info(f"✅ CYCLE #{self._runs}")
270
- logger.info(f" Found: {len(self._results)}")
271
- logger.info(f" Tested: {self._total_tested}")
272
- logger.info(f" Batches: {self._batches_sent}")
273
- logger.info(f" Time: {self._last_duration}s")
274
- logger.info(f" SOCKS5: {c.get('socks5', 0)}")
275
- logger.info(f" HTTP: {c.get('http', 0)}")
276
  logger.info("=" * 40)
277
 
278
- self._queue = []
279
- self._results = []
 
280
 
281
  with self._lock:
282
  self._phase = Phase.IDLE
@@ -284,8 +365,7 @@ class Orchestrator:
284
  def ping_worker(self):
285
  try:
286
  r = requests.get(
287
- f"{self.worker_url}/ping",
288
- timeout=10,
289
  )
290
  self._worker_ok = r.status_code == 200
291
  except Exception:
 
1
  """
2
+ Orchestrator v2 Envoie tout d'un coup, poll le résultat
3
+ ──────────────────────────────────────────────────────────
4
+
5
+ Flow:
6
+ 1. IDLE → télécharge les listes GitHub
7
+ 2. SENDING → envoie chaque protocole au worker (1 POST chacun)
8
+ 3. POLLING → poll /job/{id} toutes les 30s
9
+ 4. COLLECTING → quand jobs finis, récupère résultats
10
+ 5. DONE → refresh pool → IDLE
11
+
12
+ Requêtes HF sortantes:
13
+ - 2× GitHub (download listes)
14
+ - 2× POST worker (envoi proxies)
15
+ - N× GET worker (poll status)
16
+ = que des requêtes vers GitHub et Render = SAFE
17
  """
18
 
19
  import time
 
26
  logger = logging.getLogger("orchestrator")
27
 
28
  HEADERS = {"User-Agent": "Mozilla/5.0"}
29
+ MAX_PER_PROTO = 500 # on peut envoyer plus maintenant
 
30
 
31
  SOURCES = [
32
  (
 
41
  ),
42
  ]
43
 
 
 
44
 
45
  class Phase(Enum):
46
  IDLE = "idle"
47
  DOWNLOADING = "downloading"
48
+ SENDING = "sending"
49
+ POLLING = "polling"
50
+ COLLECTING = "collecting"
51
  DONE = "done"
52
 
53
 
 
60
  self._phase = Phase.IDLE
61
  self._lock = threading.Lock()
62
 
63
+ # données du cycle
64
+ self._proxy_lists = {} # {protocol: [proxies]}
65
+ self._job_ids = {} # {protocol: job_id}
66
+ self._all_results = []
67
  self._cycle_start = 0.0
 
 
68
 
69
+ # stats
70
  self._runs = 0
71
  self._last_duration = 0.0
72
  self._last_found = 0
 
84
  "worker_url": self.worker_url,
85
  "worker_ok": self._worker_ok,
86
  "real_ip": self.real_ip,
87
+ "active_jobs": dict(self._job_ids),
 
88
  }
89
 
90
  def _detect_ip(self):
 
97
  )
98
  self.real_ip = r.json().get("ip", "")
99
  logger.info(f"Real IP: {self.real_ip}")
100
+ except Exception:
101
+ logger.warning("IP detect failed")
102
 
103
  def _check_worker(self):
104
  try:
105
  r = requests.get(
106
  f"{self.worker_url}/ping",
107
+ timeout=10,
108
  )
109
  self._worker_ok = r.status_code == 200
110
  except Exception:
111
  self._worker_ok = False
 
 
112
  return self._worker_ok
113
 
114
  def _download(self, url):
 
141
  logger.error(f"Download failed: {e}")
142
  return []
143
 
144
+ def _send_to_worker(self, proxies, protocol):
145
+ """Envoie un gros batch au worker, retourne job_id."""
146
  try:
147
  r = requests.post(
148
  f"{self.worker_url}/test",
 
151
  "protocol": protocol,
152
  "real_ip": self.real_ip,
153
  },
154
+ timeout=30,
155
  headers=HEADERS,
156
  )
157
  r.raise_for_status()
158
+ data = r.json()
159
  self._worker_ok = True
160
+ return data.get("job_id")
161
  except Exception as e:
162
+ logger.error(f"Send failed: {e}")
163
  self._worker_ok = False
164
+ return None
165
+
166
+ def _poll_job(self, job_id):
167
+ """Poll le statut d'un job. Retourne le dict."""
168
+ try:
169
+ r = requests.get(
170
+ f"{self.worker_url}/job/{job_id}",
171
+ timeout=15, headers=HEADERS,
172
+ )
173
+ r.raise_for_status()
174
+ self._worker_ok = True
175
+ return r.json()
176
+ except Exception as e:
177
+ logger.error(f"Poll failed: {e}")
178
+ self._worker_ok = False
179
+ return None
180
+
181
+ def _cleanup_job(self, job_id):
182
+ """Supprime un job terminé sur le worker."""
183
+ try:
184
+ requests.delete(
185
+ f"{self.worker_url}/job/{job_id}",
186
+ timeout=10,
187
+ )
188
+ except Exception:
189
+ pass
190
 
191
  def tick(self):
192
  self._ticks += 1
 
193
  with self._lock:
194
  phase = self._phase
195
 
 
197
  self._do_idle()
198
  elif phase == Phase.DOWNLOADING:
199
  self._do_download()
200
+ elif phase == Phase.SENDING:
201
+ self._do_send()
202
+ elif phase == Phase.POLLING:
203
+ self._do_poll()
204
+ elif phase == Phase.COLLECTING:
205
+ self._do_collect()
206
  elif phase == Phase.DONE:
207
  self._do_done()
208
 
 
213
  self._detect_ip()
214
 
215
  if not self._check_worker():
216
+ logger.error("Worker down, skip")
217
  return
218
 
219
  self._cycle_start = time.time()
220
+ self._proxy_lists = {}
221
+ self._job_ids = {}
222
+ self._all_results = []
 
223
 
224
  with self._lock:
225
  self._phase = Phase.DOWNLOADING
226
 
227
  def _do_download(self):
228
+ logger.info("📥 Downloading lists…")
229
 
230
  seen = set()
 
 
231
  for protocol, url in SOURCES:
232
  proxies = self._download(url)
233
  proxies = [p for p in proxies if p not in seen]
 
237
  random.shuffle(proxies)
238
  proxies = proxies[:MAX_PER_PROTO]
239
 
240
+ self._proxy_lists[protocol] = proxies
 
 
241
  logger.info(f" {protocol.upper()}: {len(proxies)}")
242
  time.sleep(1)
243
 
244
+ total = sum(len(v) for v in self._proxy_lists.values())
245
+ logger.info(f" Total: {total}")
 
246
 
247
  with self._lock:
248
+ self._phase = Phase.SENDING
249
+
250
+ def _do_send(self):
251
+ """Envoie chaque protocole au worker."""
252
+ logger.info("📤 Sending to worker…")
253
+
254
+ for protocol, proxies in self._proxy_lists.items():
255
+ if not proxies:
256
+ continue
257
+
258
+ job_id = self._send_to_worker(proxies, protocol)
259
+ if job_id:
260
+ self._job_ids[protocol] = job_id
261
+ logger.info(
262
+ f" {protocol.upper()}: "
263
+ f"job {job_id} ({len(proxies)} proxies)"
264
+ )
265
+ else:
266
+ logger.error(
267
+ f" {protocol.upper()}: send failed"
268
+ )
269
 
270
+ time.sleep(1)
271
+
272
+ if not self._job_ids:
273
+ logger.error("No jobs created, aborting")
274
  with self._lock:
275
+ self._phase = Phase.IDLE
276
  return
277
 
278
+ with self._lock:
279
+ self._phase = Phase.POLLING
280
+
281
+ def _do_poll(self):
282
+ """Poll les jobs en cours."""
283
+ all_done = True
284
 
285
+ for protocol, job_id in list(self._job_ids.items()):
286
+ data = self._poll_job(job_id)
287
+ if not data:
288
+ continue
289
 
290
+ status = data.get("status", "unknown")
291
+ progress = data.get("progress", 0)
292
+ working = data.get("working", 0)
293
 
294
+ logger.info(
295
+ f" {protocol.upper()} [{job_id}]: "
296
+ f"{status} {progress}% "
297
+ f"✅ {working}"
298
+ )
 
 
 
 
299
 
300
+ if status != "done":
301
+ all_done = False
302
 
303
+ if all_done:
304
+ with self._lock:
305
+ self._phase = Phase.COLLECTING
306
 
307
+ def _do_collect(self):
308
+ """Récupère les résultats de tous les jobs."""
309
+ logger.info("📦 Collecting results…")
 
 
310
 
311
+ for protocol, job_id in self._job_ids.items():
312
+ data = self._poll_job(job_id)
313
+ if not data:
314
+ continue
315
+
316
+ results = data.get("results", [])
317
+ for r in results:
318
+ self._all_results.append({
319
+ "proxy": r["proxy"],
320
+ "protocol": protocol,
321
+ "proxy_url": f"{protocol}://{r['proxy']}",
322
+ "latency": r["latency"],
323
+ "proxy_ip": r["proxy_ip"],
324
+ "verified": r["verified"],
325
+ })
326
+
327
+ logger.info(
328
+ f" {protocol.upper()}: {len(results)} working"
329
+ )
330
+
331
+ # cleanup
332
+ self._cleanup_job(job_id)
333
+
334
+ with self._lock:
335
+ self._phase = Phase.DONE
336
 
337
  def _do_done(self):
338
  duration = time.time() - self._cycle_start
339
 
340
+ if self._all_results:
341
+ self.pool.refresh(self._all_results)
342
 
343
  self._runs += 1
344
  self._last_duration = round(duration, 1)
345
+ self._last_found = len(self._all_results)
346
 
347
  from collections import Counter
348
+ c = Counter(r["protocol"] for r in self._all_results)
349
 
350
  logger.info("=" * 40)
351
  logger.info(f"✅ CYCLE #{self._runs}")
352
+ logger.info(f" Found: {len(self._all_results)}")
353
+ logger.info(f" Time: {self._last_duration}s")
354
+ logger.info(f" SOCKS5: {c.get('socks5', 0)}")
355
+ logger.info(f" HTTP: {c.get('http', 0)}")
 
 
356
  logger.info("=" * 40)
357
 
358
+ self._proxy_lists = {}
359
+ self._job_ids = {}
360
+ self._all_results = []
361
 
362
  with self._lock:
363
  self._phase = Phase.IDLE
 
365
  def ping_worker(self):
366
  try:
367
  r = requests.get(
368
+ f"{self.worker_url}/ping", timeout=10,
 
369
  )
370
  self._worker_ok = r.status_code == 200
371
  except Exception: