Muttered3 committed on
Commit
cc64b3c
·
verified ·
1 Parent(s): 7a8a39d

Update worker.py

Browse files
Files changed (1) hide show
  1. worker.py +44 -30
worker.py CHANGED
@@ -9,65 +9,79 @@ workers_active = []
9
 
10
  async def worker_task(worker_id: int):
11
  log.info(f"Worker {worker_id} started.")
 
 
12
  await state.init_queue()
13
 
14
  while True:
15
  try:
16
- # Handle STOP state
17
  if state.status == "STOPPED":
 
18
  break
19
 
20
- # Handle PAUSE state
21
  if state.status == "PAUSED":
22
  await asyncio.sleep(1)
23
  continue
24
 
25
- # Try to get a word from the in-memory queue
26
  try:
27
- word = state.queue.get_nowait()
28
- except asyncio.QueueEmpty:
29
- log.info(f"Worker {worker_id} finished. Queue empty.")
30
- break
 
 
 
31
 
32
- # Pick a proxy if available
33
  proxy = random.choice(state.proxies) if state.proxies else None
34
 
35
- # Execute scrape
36
  status = await scraper.check_fragment(word, proxy_url=proxy)
37
 
38
  if status == "ERROR":
39
- # Put it back to try again later
40
- await state.queue.put(word)
41
- await asyncio.sleep(1.5)
 
 
 
 
 
 
42
  elif status:
43
  await state.save_result(word, status)
44
 
45
  state.queue.task_done()
46
 
47
  except Exception as e:
48
- log.exception(f"Worker {worker_id} crashed:")
49
  await asyncio.sleep(2)
50
 
51
  async def manage_workers():
52
- """Monitors state and spins up workers as needed."""
53
  global workers_active
54
 
55
  while True:
56
- if state.status == "RUNNING" and len(workers_active) < state.concurrency:
57
- # Spin up missing workers
58
- needed = state.concurrency - len(workers_active)
59
- for i in range(needed):
60
- t = asyncio.create_task(worker_task(len(workers_active) + i))
61
- workers_active.append(t)
62
-
63
- elif state.status == "STOPPED" and workers_active:
64
- # Cancel all workers if stopped
65
- for t in workers_active:
66
- t.cancel()
67
- workers_active.clear()
68
- log.info("All workers forcefully stopped.")
 
 
 
 
 
 
 
 
69
 
70
- # Clean up finished tasks from the active list
71
- workers_active = [t for t in workers_active if not t.done()]
72
-
73
  await asyncio.sleep(1)
 
9
 
10
  async def worker_task(worker_id: int):
11
  log.info(f"Worker {worker_id} started.")
12
+
13
+ # Ensure queue state is completely bound to the active loop thread
14
  await state.init_queue()
15
 
16
  while True:
17
  try:
 
18
  if state.status == "STOPPED":
19
+ log.info(f"Worker {worker_id} stopped via command.")
20
  break
21
 
 
22
  if state.status == "PAUSED":
23
  await asyncio.sleep(1)
24
  continue
25
 
26
+ # Bulletproof non-blocking queue fetch with polling failsafe
27
  try:
28
+ word = await asyncio.wait_for(state.queue.get(), timeout=2.0)
29
+ except asyncio.TimeoutError:
30
+ # Failsafe check: if scanner is still running but queue is genuinely depleted, exit gracefully
31
+ if state.queue.empty():
32
+ log.info(f"Worker {worker_id} exiting. Queue is empty.")
33
+ break
34
+ continue
35
 
36
+ # Thread-safe proxy slicing
37
  proxy = random.choice(state.proxies) if state.proxies else None
38
 
39
+ # Execute HTTP scrape request via native aiohttp engine
40
  status = await scraper.check_fragment(word, proxy_url=proxy)
41
 
42
  if status == "ERROR":
43
+ state.fail_counts[word] = state.fail_counts.get(word, 0) + 1
44
+
45
+ # 3-Strike proxy drops rule
46
+ if state.fail_counts[word] >= 3:
47
+ log.error(f"❌ Word '{word}' failed 3 distinct proxy attempts. Logging to error bin.")
48
+ await state.save_result(word, "error")
49
+ else:
50
+ await state.queue.put(word)
51
+ await asyncio.sleep(1.0)
52
  elif status:
53
  await state.save_result(word, status)
54
 
55
  state.queue.task_done()
56
 
57
  except Exception as e:
58
+ log.exception(f"⚠️ Worker {worker_id} unhandled crash loop event:")
59
  await asyncio.sleep(2)
60
 
61
  async def manage_workers():
 
62
  global workers_active
63
 
64
  while True:
65
+ try:
66
+ if state.status == "RUNNING":
67
+ # Dynamically fill out the active worker pool target
68
+ if len(workers_active) < state.concurrency:
69
+ needed = state.concurrency - len(workers_active)
70
+ current_count = len(workers_active)
71
+ for i in range(needed):
72
+ t = asyncio.create_task(worker_task(current_count + i))
73
+ workers_active.append(t)
74
+
75
+ elif state.status == "STOPPED" and workers_active:
76
+ for t in workers_active:
77
+ t.cancel()
78
+ workers_active.clear()
79
+ log.info("Manager: All active worker pools cleared and forcefully reset.")
80
+
81
+ # Filter and drop dead/finished thread futures cleanly
82
+ workers_active = [t for t in workers_active if not t.done()]
83
+
84
+ except Exception as e:
85
+ log.error(f"Manager Engine Fault: {str(e)}")
86
 
 
 
 
87
  await asyncio.sleep(1)