Dvs / worker.py
UNUSUALxd's picture
Update worker.py
4686359 verified
raw
history blame
3.87 kB
import asyncio
import random
from state import state
import scraper
from logger import get_logger
# Module-level logger used by worker_task() and manage_workers().
log = get_logger()
# Tasks spawned by manage_workers(): it appends new worker tasks here,
# cancels and clears them when state.status is "STOPPED", and prunes
# finished tasks on every pass of its loop.
workers_active = []
async def worker_task(worker_id: int):
    """Process words from ``state.queue`` until it is empty or the run stops.

    On every pass the worker inspects ``state.status``: ``"STOPPED"`` ends
    the worker, ``"PAUSED"`` makes it sleep one second and check again.
    Otherwise one word is taken from the queue without waiting; an empty
    queue ends the worker.

    The word is handed to ``scraper.check_fragment`` (always with
    ``proxy_url=None``) and the returned status decides what happens next:

    * ``"ERROR"`` -- the word's failure counter in ``state.fail_counts`` is
      incremented. On the third failure the word is saved under ``"error"``
      and its counter is discarded; before that the worker sleeps a random
      3-6 seconds and puts the word back on the queue.
    * any other truthy status -- the word's failure counter is discarded
      and the word is saved under that status.
    * a falsy status -- the word is neither saved nor re-queued.

    Any ``Exception`` raised while handling a word is logged, the word is
    put back on the queue, and the worker sleeps four seconds.
    ``state.queue.task_done()`` is called once for every word fetched,
    whatever the outcome.

    Args:
        worker_id: Identifier used only in this worker's log messages.
    """
    log.info(f"Worker {worker_id} successfully mapped to runtime thread loop pool.")
    await state.init_queue()

    while True:
        if state.status == "STOPPED":
            break
        if state.status == "PAUSED":
            await asyncio.sleep(1)
            continue

        try:
            # Non-blocking fetch: an empty queue means this worker is done.
            word = state.queue.get_nowait()
        except asyncio.QueueEmpty:
            log.info(f"Worker {worker_id} finished execution. Memory loop registry empty.")
            break

        # No proxy is used; every request goes out directly.
        proxy = None

        try:
            status = await scraper.check_fragment(word, proxy_url=proxy)

            if status == "ERROR":
                # Create the failure registry lazily if state lacks one.
                if getattr(state, "fail_counts", None) is None:
                    state.fail_counts = {}
                strikes = state.fail_counts.get(word, 0) + 1
                state.fail_counts[word] = strikes

                if strikes >= 3:
                    log.warning(f"⚠️ 3-Strikes Exhausted for '{word}'. Dumping entry safely to results/error.txt")
                    await state.save_result(word, "error")
                    # The word is finished: drop its counter so the registry
                    # does not grow without bound and a later re-submission
                    # of the same word starts from zero strikes. Done after
                    # the save so a failed save keeps the count intact.
                    state.fail_counts.pop(word, None)
                else:
                    # Cool down BEFORE re-queueing so the same word is not
                    # retried in a tight loop over the same connection.
                    stagger_sleep = random.uniform(3.0, 6.0)
                    log.info(f"🔄 Intercepted execution error for '{word}'. Workers cooling loop path for {stagger_sleep:.2f}s...")
                    await asyncio.sleep(stagger_sleep)
                    await state.queue.put(word)
            elif status:
                # Classified successfully: forget any earlier failures.
                failures = getattr(state, "fail_counts", None)
                if failures:
                    failures.pop(word, None)
                await state.save_result(word, status)
        except Exception as e:
            log.error(f"🚨 Critical transaction fault inside Worker {worker_id} processing '{word}': {str(e)}")
            # Put the word back so it is retried rather than dropped.
            await state.queue.put(word)
            await asyncio.sleep(4)
        finally:
            # Exactly one task_done() per successful get_nowait().
            state.queue.task_done()
async def manage_workers():
    """Keep the worker pool in step with ``state``, checking once per second.

    Runs forever. On each pass:

    * if ``state.status`` is ``"RUNNING"`` and fewer than
      ``state.concurrency`` tasks are tracked, enough ``worker_task`` tasks
      are created to reach that number;
    * if ``state.status`` is ``"STOPPED"`` and tasks are tracked, every
      task is cancelled and the list is emptied;
    * finished tasks are then dropped from ``workers_active``.

    Each new worker receives the lowest id not held by a task that is still
    running, so two live workers never log under the same id.
    """
    global workers_active
    # Id held by each live task; used to avoid handing out a duplicate id
    # when only part of the pool has to be replaced.
    ids_by_task = {}

    while True:
        if state.status == "RUNNING" and len(workers_active) < state.concurrency:
            needed = state.concurrency - len(workers_active)
            ids_in_use = {
                ids_by_task[t] for t in workers_active if t in ids_by_task
            }
            next_id = 0
            for _ in range(needed):
                # Skip ids that belong to workers which are still running.
                while next_id in ids_in_use:
                    next_id += 1
                task = asyncio.create_task(worker_task(next_id))
                ids_by_task[task] = next_id
                workers_active.append(task)
                next_id += 1
        elif state.status == "STOPPED" and workers_active:
            for task in workers_active:
                task.cancel()
            workers_active.clear()
            log.info("All concurrent background workers forcefully torn down.")

        # Forget finished tasks so the pool size reflects live workers only.
        workers_active = [t for t in workers_active if not t.done()]
        ids_by_task = {
            t: ids_by_task[t] for t in workers_active if t in ids_by_task
        }
        await asyncio.sleep(1)