import asyncio
import itertools
import random

from state import state
import scraper
from logger import get_logger

log = get_logger()

# Live worker tasks owned by manage_workers(); finished tasks are pruned every tick.
workers_active = []

# Attempts per word ("ERROR" results or unexpected exceptions) before the word is
# saved with the "error" status instead of being put back on the queue.
MAX_STRIKES = 3

# Monotonic id source: ids derived from len(workers_active) could repeat the id of a
# still-running worker once finished tasks have been pruned from the list.
_worker_ids = itertools.count()


def _register_strike(word):
    """Increment and return the failure count stored for *word* in ``state.fail_counts``."""
    if getattr(state, "fail_counts", None) is None:
        state.fail_counts = {}
    state.fail_counts[word] = state.fail_counts.get(word, 0) + 1
    return state.fail_counts[word]


def _clear_strikes(word):
    """Forget any failure count stored for *word*; a no-op when none exists."""
    fail_counts = getattr(state, "fail_counts", None)
    if fail_counts:
        fail_counts.pop(word, None)


async def _persist_exhausted(word):
    """Save *word* with the "error" status after it has used up its strikes."""
    log.warning(f"⚠️ 3-Strikes Exhausted for '{word}'. Dumping entry safely to results/error.txt")
    await state.save_result(word, "error")


async def worker_task(worker_id: int):
    """Drain words from ``state.queue`` and classify each with ``scraper.check_fragment``.

    The loop checks ``state.status`` before every dequeue. It returns on "STOPPED",
    polls once per second while "PAUSED", and returns when the queue is empty.

    Per-word outcome:
      * status "ERROR": a strike is recorded. At MAX_STRIKES the word is saved as
        "error"; otherwise it is re-queued after a random 3-6 s back-off.
      * any other truthy status: strikes are cleared and the status is saved.
      * falsy status: the word is not saved (a warning is logged).
      * unexpected exception: a strike is recorded. At MAX_STRIKES the word is
        saved as "error"; otherwise it is re-queued and the worker pauses 4 s.

    ``state.queue.task_done()`` is called exactly once for every dequeued word.

    Args:
        worker_id: Identifier used only in log messages.
    """
    log.info(f"Worker {worker_id} successfully mapped to runtime thread loop pool.")
    await state.init_queue()

    while True:
        if state.status == "STOPPED":
            break
        if state.status == "PAUSED":
            await asyncio.sleep(1)
            continue

        try:
            # get_nowait() never suspends, so no other task can run between the
            # emptiness check and the dequeue.
            word = state.queue.get_nowait()
        except asyncio.QueueEmpty:
            log.info(f"Worker {worker_id} finished execution. Memory loop registry empty.")
            break

        # Explicitly pass None for the proxy so requests go out without one.
        proxy = None
        # NOTE(review): on Python 3.8+ asyncio.CancelledError is a BaseException, so
        # a word in flight when manage_workers() cancels this task is neither saved
        # nor re-queued. Confirm that losing it on STOP is acceptable.
        try:
            status = await scraper.check_fragment(word, proxy_url=proxy)
            if status == "ERROR":
                if _register_strike(word) >= MAX_STRIKES:
                    await _persist_exhausted(word)
                else:
                    # Back off BEFORE re-queueing so the same word is not retried
                    # in a tight loop over the same unproxied connection.
                    stagger_sleep = random.uniform(3.0, 6.0)
                    log.info(f"🔄 Intercepted execution error for '{word}'. Workers cooling loop path for {stagger_sleep:.2f}s...")
                    await asyncio.sleep(stagger_sleep)
                    await state.queue.put(word)
            elif status:
                _clear_strikes(word)
                await state.save_result(word, status)
            else:
                log.warning(f"Worker {worker_id} received empty status {status!r} for '{word}'; entry not saved.")
        except Exception as e:
            log.error(f"🚨 Critical transaction fault inside Worker {worker_id} processing '{word}': {str(e)}")
            # Exceptions count toward the same strike budget as "ERROR" results;
            # otherwise a word that always raises would be re-queued forever.
            if _register_strike(word) >= MAX_STRIKES:
                try:
                    await _persist_exhausted(word)
                except Exception as save_err:
                    log.error(f"🚨 Worker {worker_id} could not persist exhausted entry '{word}': {save_err!r}")
            else:
                await state.queue.put(word)
                await asyncio.sleep(4)
        finally:
            # Pair every successful get_nowait() with exactly one task_done().
            # Re-queued words get their own put(), so the counter stays balanced.
            state.queue.task_done()


async def manage_workers():
    """Supervise the worker pool, polling ``state`` once per second, forever.

    While ``state.status`` is "RUNNING", new ``worker_task`` tasks are spawned until
    ``state.concurrency`` are alive. When it is "STOPPED", every tracked task is
    cancelled and the list is cleared. On every tick, finished tasks are pruned
    from ``workers_active`` and any exception they ended with is logged.
    """
    while True:
        if state.status == "RUNNING" and len(workers_active) < state.concurrency:
            # NOTE(review): workers return as soon as the queue is empty, so while the
            # status stays "RUNNING" this respawns (and they re-run init_queue) every
            # second. That also lets words re-queued after the last worker exited be
            # picked up again. Confirm this churn is intended.
            needed = state.concurrency - len(workers_active)
            for _ in range(needed):
                workers_active.append(asyncio.create_task(worker_task(next(_worker_ids))))
        elif state.status == "STOPPED" and workers_active:
            for t in workers_active:
                t.cancel()
            workers_active.clear()
            log.info("All concurrent background workers forcefully torn down.")

        survivors = []
        for task in workers_active:
            if not task.done():
                survivors.append(task)
            elif not task.cancelled() and task.exception() is not None:
                # Retrieving the exception both surfaces the crash and avoids
                # asyncio's "Task exception was never retrieved" warning.
                log.error(f"🚨 Worker task exited with an unhandled exception: {task.exception()!r}")
        # Update in place so any other holder of this list sees the pruned view.
        workers_active[:] = survivors

        await asyncio.sleep(1)