Files changed (1) hide show
  1. worker.py +41 -29
worker.py CHANGED
@@ -8,56 +8,67 @@ log = get_logger()
8
  workers_active = []
9
 
10
  async def worker_task(worker_id: int):
11
- log.info(f"Worker {worker_id} started.")
12
  await state.init_queue()
13
 
14
  while True:
15
- try:
16
- # Safe boundary checks for structural state execution
17
- if state.status == "STOPPED":
18
- break
19
-
20
- if state.status == "PAUSED":
21
- await asyncio.sleep(1)
22
- continue
23
 
24
- try:
25
- word = state.queue.get_nowait()
26
- except asyncio.QueueEmpty:
27
- log.info(f"Worker {worker_id} finished. Queue empty.")
28
- break
 
29
 
30
- # Handle rotating proxy allocation safely
31
- proxy = random.choice(state.proxies) if state.proxies else None
32
-
33
- # Execute AJAX search extraction
 
34
  status = await scraper.check_fragment(word, proxy_url=proxy)
35
 
36
  if status == "ERROR":
37
- # CRITICAL FIX: Ensure property array initialization inside the thread loop space
38
  if not hasattr(state, 'fail_counts') or state.fail_counts is None:
39
  state.fail_counts = {}
40
 
41
  state.fail_counts[word] = state.fail_counts.get(word, 0) + 1
42
 
43
- # Failsafe 3-Strikes Adaptive Moving Sequence
44
  if state.fail_counts[word] >= 3:
 
45
  await state.save_result(word, "error")
46
  else:
 
 
 
 
 
 
47
  await state.queue.put(word)
48
- # Dynamic staggered delay before recycling word to prevent loop collisions
49
- await asyncio.sleep(random.uniform(1.5, 3.5))
50
  elif status:
51
- # Remove word from failure map tracking on successful evaluation
52
  if hasattr(state, 'fail_counts') and state.fail_counts and word in state.fail_counts:
53
- del state.fail_counts[word]
 
 
54
  await state.save_result(word, status)
55
 
56
- state.queue.task_done()
57
-
58
  except Exception as e:
59
- log.error(f"⚠️ Worker {worker_id} safely intercepted loop exception: {str(e)}")
60
- await asyncio.sleep(2)
 
 
 
 
 
 
61
 
62
  async def manage_workers():
63
  global workers_active
@@ -74,7 +85,8 @@ async def manage_workers():
74
  for t in workers_active:
75
  t.cancel()
76
  workers_active.clear()
77
- log.info("All workers forcefully stopped.")
78
 
 
79
  workers_active = [t for t in workers_active if not t.done()]
80
  await asyncio.sleep(1)
 
8
  workers_active = []
9
 
10
  async def worker_task(worker_id: int):
11
+ log.info(f"Worker {worker_id} successfully mapped to runtime thread loop pool.")
12
  await state.init_queue()
13
 
14
  while True:
15
+ if state.status == "STOPPED":
16
+ break
17
+
18
+ if state.status == "PAUSED":
19
+ await asyncio.sleep(1)
20
+ continue
 
 
21
 
22
+ try:
23
+ # Atomic fetch from asynchronous local memory queues
24
+ word = state.queue.get_nowait()
25
+ except asyncio.QueueEmpty:
26
+ log.info(f"Worker {worker_id} finished execution. Memory loop registry empty.")
27
+ break
28
 
29
+ # Explicitly pass None for proxy allocations to isolate proxy-free routing channels
30
+ proxy = None
31
+
32
+ try:
33
+ # Connect directly to the direct DOM scanning sequence
34
  status = await scraper.check_fragment(word, proxy_url=proxy)
35
 
36
  if status == "ERROR":
 
37
  if not hasattr(state, 'fail_counts') or state.fail_counts is None:
38
  state.fail_counts = {}
39
 
40
  state.fail_counts[word] = state.fail_counts.get(word, 0) + 1
41
 
42
+ # Failsafe 3-Strikes Matrix Boundary
43
  if state.fail_counts[word] >= 3:
44
+ log.warning(f"⚠️ 3-Strikes Exhausted for '{word}'. Dumping entry safely to results/error.txt")
45
  await state.save_result(word, "error")
46
  else:
47
+ # Defensive Pacing: Sleep the worker task BEFORE putting the item back into circulation
48
+ # This prevents rapid looping over the same unproxied connection
49
+ stagger_sleep = random.uniform(3.0, 6.0)
50
+ log.info(f"🔄 Intercepted execution error for '{word}'. Workers cooling loop path for {stagger_sleep:.2f}s...")
51
+ await asyncio.sleep(stagger_sleep)
52
+
53
  await state.queue.put(word)
54
+
 
55
  elif status:
56
+ # Remove from failure cache logs upon successful classification mapping
57
  if hasattr(state, 'fail_counts') and state.fail_counts and word in state.fail_counts:
58
+ try: del state.fail_counts[word]
59
+ except KeyError: pass
60
+
61
  await state.save_result(word, status)
62
 
 
 
63
  except Exception as e:
64
+ log.error(f"🚨 Critical transaction fault inside Worker {worker_id} processing '{word}': {str(e)}")
65
+ # Failsafe re-enqueue option to ensure data is never lost mid-flight
66
+ await state.queue.put(word)
67
+ await asyncio.sleep(4)
68
+
69
+ finally:
70
+ # Bulletproof Execution: Always register task completion to keep the event loop balanced
71
+ state.queue.task_done()
72
 
73
  async def manage_workers():
74
  global workers_active
 
85
  for t in workers_active:
86
  t.cancel()
87
  workers_active.clear()
88
+ log.info("All concurrent background workers forcefully torn down.")
89
 
90
+ # Keep tracking configurations active by filtering completed execution futures
91
  workers_active = [t for t in workers_active if not t.done()]
92
  await asyncio.sleep(1)