import asyncio
import random

from state import state
import scraper
from logger import get_logger

log = get_logger()

# Live worker tasks spawned by manage_workers(). Mutated in place (never
# rebound) so other modules holding a reference to this list see updates.
workers_active = []

# Number of "ERROR" results after which a word is saved as "error" instead
# of being put back on the queue.
_MAX_FAILURES = 3


def _fail_counts():
    """Return ``state.fail_counts``, creating an empty dict on first use."""
    counts = getattr(state, "fail_counts", None)
    if counts is None:
        counts = state.fail_counts = {}
    return counts


async def _process_word(word):
    """Check a single word and record or requeue it based on the result.

    A random proxy from ``state.proxies`` is used (``None`` if the list is
    empty). An ``"ERROR"`` status bumps the word's strike counter: below
    ``_MAX_FAILURES`` the word is put back on the queue followed by a random
    1.5-3.5 s pause; at ``_MAX_FAILURES`` it is saved with status ``"error"``.
    Any other truthy status is saved as-is; a falsy status is not saved.
    """
    proxy = random.choice(state.proxies) if state.proxies else None
    status = await scraper.check_fragment(word, proxy_url=proxy)

    if status == "ERROR":
        counts = _fail_counts()
        counts[word] = counts.get(word, 0) + 1
        if counts[word] >= _MAX_FAILURES:
            # Forget the counter once we give up, so the map does not grow
            # without bound and a re-enqueued word starts from zero strikes.
            del counts[word]
            await state.save_result(word, "error")
        else:
            await state.queue.put(word)
            # Staggered pause so the same worker does not immediately pick
            # the recycled word back up.
            await asyncio.sleep(random.uniform(1.5, 3.5))
    elif status:
        counts = getattr(state, "fail_counts", None)
        if counts:
            counts.pop(word, None)
        await state.save_result(word, status)


async def worker_task(worker_id: int):
    """Consume words from ``state.queue`` until it is empty or the run stops.

    ``state.status`` is polled on every iteration: ``"STOPPED"`` ends the
    worker, ``"PAUSED"`` sleeps one second without consuming work. Unexpected
    exceptions are logged and followed by a 2 s back-off; the worker keeps
    running. Every successful ``get_nowait()`` is matched by exactly one
    ``task_done()``, even when processing the word fails.

    Args:
        worker_id: Identifier used only in log messages.
    """
    log.info(f"Worker {worker_id} started.")
    await state.init_queue()

    while True:
        word = None
        try:
            if state.status == "STOPPED":
                break
            if state.status == "PAUSED":
                await asyncio.sleep(1)
                continue

            try:
                word = state.queue.get_nowait()
            except asyncio.QueueEmpty:
                log.info(f"Worker {worker_id} finished. Queue empty.")
                break

            try:
                await _process_word(word)
            finally:
                # Balance the get_nowait() above even if processing raised;
                # otherwise the queue's unfinished count leaks and any
                # queue.join() waits forever.
                state.queue.task_done()
        except asyncio.CancelledError:
            # On Python < 3.8 CancelledError subclasses Exception; re-raise so
            # cancellation from manage_workers() actually stops the task.
            raise
        except Exception as e:
            context = f" (word={word!r})" if word is not None else ""
            log.error(
                f"⚠️ Worker {worker_id} safely intercepted loop exception{context}: {e}"
            )
            await asyncio.sleep(2)


def _reap_finished_workers():
    """Drop finished tasks from ``workers_active``, logging any that crashed."""
    still_running = []
    for task in workers_active:
        if not task.done():
            still_running.append(task)
        elif not task.cancelled() and task.exception() is not None:
            # Reading the exception also prevents asyncio's
            # "Task exception was never retrieved" warning.
            log.error(f"⚠️ Worker task crashed: {task.exception()!r}")
    workers_active[:] = still_running


async def manage_workers():
    """Supervise the worker pool forever, re-evaluating once per second.

    Each tick, finished workers are pruned first so capacity reflects only
    live tasks. While ``state.status`` is ``"RUNNING"``, new ``worker_task``s
    are spawned until ``state.concurrency`` are alive. When the status is
    ``"STOPPED"``, every tracked worker is cancelled and the list cleared.
    Worker IDs increase monotonically so log lines never reuse an ID.
    """
    next_worker_id = 0
    while True:
        _reap_finished_workers()

        if state.status == "RUNNING" and len(workers_active) < state.concurrency:
            for _ in range(state.concurrency - len(workers_active)):
                workers_active.append(asyncio.create_task(worker_task(next_worker_id)))
                next_worker_id += 1
        elif state.status == "STOPPED" and workers_active:
            for task in workers_active:
                task.cancel()
            workers_active.clear()
            log.info("All workers forcefully stopped.")

        await asyncio.sleep(1)