# Dvs/worker.py (commit 5e0f261 "Update worker.py" by Muttered3, verified; 3.06 kB)
import asyncio
import random
from state import state
import scraper
from logger import get_logger
# Module-level logger obtained from the project's logging helper.
log = get_logger()

# asyncio tasks created by manage_workers(); that function appends to,
# clears and rebinds this list (it declares it `global`).
workers_active = list()
_MAX_FAILURES = 3  # ERROR results tolerated per word before it is saved as "error"


def _fail_counts() -> dict:
    """Return ``state.fail_counts``, creating it if it is missing or ``None``."""
    counts = getattr(state, "fail_counts", None)
    if counts is None:
        counts = state.fail_counts = {}
    return counts


async def _process_word(word) -> None:
    """Check one dequeued word and save, retry or drop it based on the result.

    - ``"ERROR"``: count a strike. Below ``_MAX_FAILURES`` strikes the word is
      put back on the queue and the worker sleeps 1.5-3.5 s. At the limit it
      is saved with status ``"error"``.
    - any other truthy status: the word's strike count is cleared and the
      status is saved.
    - a falsy status: nothing is saved.
    """
    # Pick a proxy per request; None means a direct connection.
    proxy = random.choice(state.proxies) if state.proxies else None
    status = await scraper.check_fragment(word, proxy_url=proxy)

    if status == "ERROR":
        counts = _fail_counts()
        counts[word] = counts.get(word, 0) + 1
        if counts[word] >= _MAX_FAILURES:
            await state.save_result(word, "error")
            # Forget the strikes once the word is settled (as the success path
            # does). Otherwise the dict grows without bound, and a re-queued
            # word would be failed after a single new error.
            del counts[word]
        else:
            await state.queue.put(word)
            # Stagger retries so workers don't hammer the same word back to back.
            await asyncio.sleep(random.uniform(1.5, 3.5))
    elif status:
        counts = getattr(state, "fail_counts", None)
        if counts:
            counts.pop(word, None)
        await state.save_result(word, status)


async def worker_task(worker_id: int):
    """Consume words from ``state.queue`` until stopped or the queue is empty.

    The worker logs its start and awaits ``state.init_queue()``. It then loops:

    - it exits when ``state.status == "STOPPED"``;
    - it idles in 1-second steps while ``state.status == "PAUSED"``;
    - it exits when ``state.queue`` has nothing left to take;
    - otherwise it processes one word (see ``_process_word``).

    ``state.queue.task_done()`` is called exactly once for every word taken,
    even if processing raises, so ``Queue.join()`` can complete.

    Any other exception raised in an iteration is logged and followed by a
    2-second back-off. The word being processed at that moment is dropped.
    NOTE(review): consider re-queueing it instead.

    Args:
        worker_id: Identifier used only in log messages.
    """
    log.info(f"Worker {worker_id} started.")
    await state.init_queue()
    while True:
        try:
            if state.status == "STOPPED":
                break
            if state.status == "PAUSED":
                await asyncio.sleep(1)
                continue

            try:
                word = state.queue.get_nowait()
            except asyncio.QueueEmpty:
                log.info(f"Worker {worker_id} finished. Queue empty.")
                break

            try:
                await _process_word(word)
            finally:
                # Balance the get_nowait() above on every path. A re-queued
                # word was counted again by put(), so the books stay correct.
                state.queue.task_done()
        except asyncio.CancelledError:
            # Never swallow cancellation (CancelledError is an Exception
            # subclass before Python 3.8).
            raise
        except Exception as e:
            log.error(f"⚠️ Worker {worker_id} safely intercepted loop exception: {str(e)}")
            await asyncio.sleep(2)
async def manage_workers():
    """Supervise the worker pool forever, re-evaluating ``state`` once per second.

    - While ``state.status == "RUNNING"``, the pool is topped up to
      ``state.concurrency`` ``worker_task`` tasks.
    - When ``state.status == "STOPPED"``, every tracked task is cancelled and
      the pool is emptied.

    On every tick, finished tasks are pruned from ``workers_active``. Any
    exception a task ended with is logged rather than silently discarded.
    """
    global workers_active
    # Monotonic so a new worker never reuses the ID of one still running
    # (deriving it from len(workers_active) did, after pruning).
    next_worker_id = 0
    while True:
        if state.status == "RUNNING" and len(workers_active) < state.concurrency:
            for _ in range(state.concurrency - len(workers_active)):
                workers_active.append(asyncio.create_task(worker_task(next_worker_id)))
                next_worker_id += 1
        elif state.status == "STOPPED" and workers_active:
            for task in workers_active:
                task.cancel()
            workers_active.clear()
            log.info("All workers forcefully stopped.")

        still_running = []
        for task in workers_active:
            if not task.done():
                still_running.append(task)
            elif not task.cancelled():
                # Retrieve the outcome so a crashed worker is reported, not lost.
                exc = task.exception()
                if exc is not None:
                    log.error(f"Worker task crashed: {exc!r}")
        workers_active = still_running
        await asyncio.sleep(1)