Spaces:
Paused
Paused
| import asyncio | |
| import random | |
| from state import state | |
| import scraper | |
| from logger import get_logger | |
# Module-level logger used by worker_task() and manage_workers().
log = get_logger()
# Worker tasks currently tracked by manage_workers(): appended when a worker
# is spawned, emptied on STOPPED, and pruned of finished tasks each cycle.
workers_active = []
async def worker_task(worker_id: int):
    """Drain ``state.queue``, checking each word with ``scraper.check_fragment``.

    The loop exits when ``state.status`` is ``"STOPPED"`` or when the queue is
    empty, and idles in one-second steps while the status is ``"PAUSED"``.

    For every dequeued word:

    * a ``"ERROR"`` status counts one strike in ``state.fail_counts``; the
      word is re-queued (followed by a randomized 1.5-3.5 s back-off) until
      its third strike, at which point it is saved with result ``"error"``;
    * any other truthy status clears the word's strike count and is saved
      as the word's result;
    * a falsy status is ignored.

    Args:
        worker_id: Identifier used only in this worker's log messages.
    """
    log.info(f"Worker {worker_id} started.")
    await state.init_queue()

    while True:
        try:
            # Honour the global run state before touching the queue.
            if state.status == "STOPPED":
                break
            if state.status == "PAUSED":
                await asyncio.sleep(1)
                continue

            try:
                word = state.queue.get_nowait()
            except asyncio.QueueEmpty:
                log.info(f"Worker {worker_id} finished. Queue empty.")
                break

            try:
                # Pick a random proxy when any are configured.
                proxy = random.choice(state.proxies) if state.proxies else None
                status = await scraper.check_fragment(word, proxy_url=proxy)

                if status == "ERROR":
                    # Create the strike map lazily if state does not have one.
                    fail_counts = getattr(state, "fail_counts", None)
                    if fail_counts is None:
                        fail_counts = state.fail_counts = {}

                    strikes = fail_counts.get(word, 0) + 1
                    fail_counts[word] = strikes

                    if strikes >= 3:
                        # Third strike: give up and record the failure.
                        await state.save_result(word, "error")
                    else:
                        await state.queue.put(word)
                        # Staggered delay so the retry is not picked up at once.
                        await asyncio.sleep(random.uniform(1.5, 3.5))
                elif status:
                    # A successful check resets the word's strike count.
                    fail_counts = getattr(state, "fail_counts", None)
                    if fail_counts:
                        fail_counts.pop(word, None)
                    await state.save_result(word, status)
            finally:
                # Every successful get() must be matched by task_done(), even
                # when the scraper or save_result raises; otherwise the
                # queue's unfinished-task counter never returns to zero.
                # NOTE(review): on such an exception the word itself is not
                # re-queued - confirm whether it should count as a strike.
                state.queue.task_done()
        except Exception as e:
            log.error(f"⚠️ Worker {worker_id} safely intercepted loop exception: {str(e)}")
            await asyncio.sleep(2)
async def manage_workers():
    """Supervise the worker pool, re-evaluating it once per second, forever.

    Each cycle:

    * finished tasks are dropped from ``workers_active`` (in place, so other
      holders of the list keep seeing current contents), and any exception
      a finished task ended with is logged;
    * while ``state.status`` is ``"RUNNING"``, new ``worker_task`` tasks are
      spawned until ``state.concurrency`` of them are tracked;
    * when ``state.status`` is ``"STOPPED"``, every tracked task is cancelled
      and the list is emptied.

    NOTE(review): workers exit on their own when the queue is empty, so while
    the status stays ``"RUNNING"`` they are re-spawned every cycle - confirm
    that something else moves the status on once the work is finished.
    """
    # Monotonic id source so a replacement worker never reuses the id of a
    # worker that is still alive (ids only appear in log messages).
    next_worker_id = 0

    while True:
        # Prune before counting so finished workers are replaced immediately.
        still_running = []
        for task in workers_active:
            if not task.done():
                still_running.append(task)
                continue
            if task.cancelled():
                continue
            failure = task.exception()
            if failure is not None:
                # Retrieve the exception so the failure is visible in the log.
                log.error(f"Worker task ended with an unhandled exception: {failure!r}")
        workers_active[:] = still_running

        if state.status == "RUNNING" and len(workers_active) < state.concurrency:
            for _ in range(state.concurrency - len(workers_active)):
                workers_active.append(
                    asyncio.create_task(worker_task(next_worker_id))
                )
                next_worker_id += 1
        elif state.status == "STOPPED" and workers_active:
            for task in workers_active:
                task.cancel()
            workers_active.clear()
            log.info("All workers forcefully stopped.")

        await asyncio.sleep(1)