Spaces:
Paused
Paused
Update worker.py
#4
by UNUSUALxd - opened
worker.py
CHANGED
|
@@ -8,56 +8,67 @@ log = get_logger()
|
|
| 8 |
workers_active = []
|
| 9 |
|
| 10 |
async def worker_task(worker_id: int):
|
| 11 |
-
log.info(f"Worker {worker_id}
|
| 12 |
await state.init_queue()
|
| 13 |
|
| 14 |
while True:
|
| 15 |
-
|
| 16 |
-
|
| 17 |
-
|
| 18 |
-
|
| 19 |
-
|
| 20 |
-
|
| 21 |
-
await asyncio.sleep(1)
|
| 22 |
-
continue
|
| 23 |
|
| 24 |
-
|
| 25 |
-
|
| 26 |
-
|
| 27 |
-
|
| 28 |
-
|
|
|
|
| 29 |
|
| 30 |
-
|
| 31 |
-
|
| 32 |
-
|
| 33 |
-
|
|
|
|
| 34 |
status = await scraper.check_fragment(word, proxy_url=proxy)
|
| 35 |
|
| 36 |
if status == "ERROR":
|
| 37 |
-
# CRITICAL FIX: Ensure property array initialization inside the thread loop space
|
| 38 |
if not hasattr(state, 'fail_counts') or state.fail_counts is None:
|
| 39 |
state.fail_counts = {}
|
| 40 |
|
| 41 |
state.fail_counts[word] = state.fail_counts.get(word, 0) + 1
|
| 42 |
|
| 43 |
-
# Failsafe 3-Strikes
|
| 44 |
if state.fail_counts[word] >= 3:
|
|
|
|
| 45 |
await state.save_result(word, "error")
|
| 46 |
else:
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 47 |
await state.queue.put(word)
|
| 48 |
-
|
| 49 |
-
await asyncio.sleep(random.uniform(1.5, 3.5))
|
| 50 |
elif status:
|
| 51 |
-
# Remove
|
| 52 |
if hasattr(state, 'fail_counts') and state.fail_counts and word in state.fail_counts:
|
| 53 |
-
del state.fail_counts[word]
|
|
|
|
|
|
|
| 54 |
await state.save_result(word, status)
|
| 55 |
|
| 56 |
-
state.queue.task_done()
|
| 57 |
-
|
| 58 |
except Exception as e:
|
| 59 |
-
log.error(f"
|
| 60 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 61 |
|
| 62 |
async def manage_workers():
|
| 63 |
global workers_active
|
|
@@ -74,7 +85,8 @@ async def manage_workers():
|
|
| 74 |
for t in workers_active:
|
| 75 |
t.cancel()
|
| 76 |
workers_active.clear()
|
| 77 |
-
log.info("All workers forcefully
|
| 78 |
|
|
|
|
| 79 |
workers_active = [t for t in workers_active if not t.done()]
|
| 80 |
await asyncio.sleep(1)
|
|
|
|
| 8 |
workers_active = []
|
| 9 |
|
| 10 |
# Failures tolerated per word before it is written out as an "error" result.
_MAX_STRIKES = 3


def _record_strike(word) -> int:
    """Add one failure to *word*'s tally in ``state.fail_counts``.

    Creates ``state.fail_counts`` when the attribute is missing or ``None``.

    Returns:
        The word's failure count after the increment.
    """
    counts = getattr(state, "fail_counts", None)
    if counts is None:
        counts = state.fail_counts = {}
    counts[word] = counts.get(word, 0) + 1
    return counts[word]


async def worker_task(worker_id: int) -> None:
    """Drain ``state.queue``, checking each word with ``scraper.check_fragment``.

    The loop ends when ``state.status`` is ``"STOPPED"`` or the queue is
    empty, and idles in one-second steps while the status is ``"PAUSED"``.

    A word earns one strike each time the check returns ``"ERROR"`` or its
    processing raises. Below ``_MAX_STRIKES`` the word is put back on the
    queue; at the limit it is saved with the result ``"error"``. Any other
    truthy status is saved as the word's result and clears its strikes. A
    falsy status is dropped without being saved.

    Args:
        worker_id: Number used to identify this worker in log messages.
    """
    log.info(f"Worker {worker_id} successfully mapped to runtime thread loop pool.")
    await state.init_queue()

    while True:
        if state.status == "STOPPED":
            break

        if state.status == "PAUSED":
            await asyncio.sleep(1)
            continue

        try:
            word = state.queue.get_nowait()
        except asyncio.QueueEmpty:
            # NOTE(review): the queue also looks empty while another worker is
            # cooling down before re-queueing a word, so this worker may exit
            # even though a retry is still pending.
            log.info(f"Worker {worker_id} finished execution. Memory loop registry empty.")
            break

        # No proxy is used: every check goes out over the direct connection.
        proxy = None

        try:
            status = await scraper.check_fragment(word, proxy_url=proxy)

            if status == "ERROR":
                if _record_strike(word) >= _MAX_STRIKES:
                    log.warning(f"⚠️ 3-Strikes Exhausted for '{word}'. Dumping entry safely to results/error.txt")
                    await state.save_result(word, "error")
                else:
                    # Sleep BEFORE re-queueing so the word is not immediately
                    # retried over the same unproxied connection.
                    stagger_sleep = random.uniform(3.0, 6.0)
                    log.info(f"🔄 Intercepted execution error for '{word}'. Workers cooling loop path for {stagger_sleep:.2f}s...")
                    await asyncio.sleep(stagger_sleep)

                    await state.queue.put(word)

            elif status:
                # A definitive answer wipes the word's failure history.
                counts = getattr(state, "fail_counts", None)
                if counts:
                    counts.pop(word, None)

                await state.save_result(word, status)

        except Exception as e:
            log.error(f"🚨 Critical transaction fault inside Worker {worker_id} processing '{word}': {e}")

            # An exception counts as a strike too; otherwise a word whose
            # processing always raises would be re-queued forever.
            retired = False
            if _record_strike(word) >= _MAX_STRIKES:
                try:
                    log.warning(f"⚠️ 3-Strikes Exhausted for '{word}'. Dumping entry safely to results/error.txt")
                    await state.save_result(word, "error")
                    retired = True
                except Exception as save_err:
                    # Saving failed as well: fall through to the re-queue
                    # below so the word is not lost.
                    log.error(f"Could not record '{word}' as error: {save_err}")

            if not retired:
                await state.queue.put(word)
                await asyncio.sleep(4)

        finally:
            # Pair every successful get_nowait() with exactly one task_done(),
            # whether the word was saved, re-queued, or the task was cancelled.
            state.queue.task_done()
|
| 72 |
|
| 73 |
async def manage_workers():
|
| 74 |
global workers_active
|
|
|
|
| 85 |
for t in workers_active:
|
| 86 |
t.cancel()
|
| 87 |
workers_active.clear()
|
| 88 |
+
log.info("All concurrent background workers forcefully torn down.")
|
| 89 |
|
| 90 |
+
# Keep tracking configurations active by filtering completed execution futures
|
| 91 |
workers_active = [t for t in workers_active if not t.done()]
|
| 92 |
await asyncio.sleep(1)
|