Spaces:
Paused
Paused
| import asyncio | |
| import random | |
| from state import state | |
| import scraper | |
| from logger import get_logger | |
# Logger shared by every coroutine defined in this module.
log = get_logger()

# asyncio tasks spawned by manage_workers(), which also prunes and cancels them.
workers_active: list = []
_MAX_STRIKES = 3  # failed attempts per word before it is written to the "error" bucket


def _record_failure(word) -> int:
    """Increment and return the failure count stored for *word* in ``state.fail_counts``."""
    # ``fail_counts`` is created lazily because ``state`` may not define it up front.
    if getattr(state, "fail_counts", None) is None:
        state.fail_counts = {}
    state.fail_counts[word] = state.fail_counts.get(word, 0) + 1
    return state.fail_counts[word]


def _clear_failures(word) -> None:
    """Forget any failures recorded for *word*; a no-op when none exist."""
    fail_counts = getattr(state, "fail_counts", None)
    if fail_counts:
        fail_counts.pop(word, None)


async def _give_up_on(word) -> None:
    """Save *word* with the ``"error"`` status and reset its strike count.

    Exceptions from ``state.save_result`` propagate to the caller.
    """
    log.warning(f"⚠️ {_MAX_STRIKES}-Strikes Exhausted for '{word}'. Dumping entry safely to results/error.txt")
    await state.save_result(word, "error")
    _clear_failures(word)


async def worker_task(worker_id: int):
    """Consume words from ``state.queue`` until it is empty or the run is stopped.

    Each word is checked with ``scraper.check_fragment`` (no proxy):

    * a truthy status other than ``"ERROR"`` is saved via ``state.save_result``;
    * ``"ERROR"`` is retried after a random 3-6 s pause, and an unexpected
      exception is re-enqueued followed by a 4 s pause. Both count toward the
      same per-word limit of ``_MAX_STRIKES``; once reached, the word is saved
      with the ``"error"`` status instead of being retried;
    * a falsy status is logged and the word is not saved.

    While ``state.status == "PAUSED"`` the worker idles in 1 s steps; on
    ``"STOPPED"`` it returns before taking another word.

    Args:
        worker_id: Identifier used only in log messages.
    """
    log.info(f"Worker {worker_id} successfully mapped to runtime thread loop pool.")
    # NOTE(review): every (re)spawned worker calls init_queue(); presumably it is
    # idempotent and does not replace a queue other workers are using — confirm.
    await state.init_queue()
    while True:
        if state.status == "STOPPED":
            break
        if state.status == "PAUSED":
            await asyncio.sleep(1)
            continue
        try:
            word = state.queue.get_nowait()
        except asyncio.QueueEmpty:
            log.info(f"Worker {worker_id} finished execution. Memory loop registry empty.")
            break

        # Deliberately no proxy: requests go over the direct connection.
        proxy = None
        try:
            status = await scraper.check_fragment(word, proxy_url=proxy)
            if status == "ERROR":
                if _record_failure(word) >= _MAX_STRIKES:
                    await _give_up_on(word)
                else:
                    # Sleep BEFORE re-queueing so an idle worker cannot immediately
                    # retry the same word over the same unproxied connection.
                    stagger_sleep = random.uniform(3.0, 6.0)
                    log.info(f"🔄 Intercepted execution error for '{word}'. Workers cooling loop path for {stagger_sleep:.2f}s...")
                    await asyncio.sleep(stagger_sleep)
                    await state.queue.put(word)
            elif status:
                await state.save_result(word, status)
                # Only forget past strikes once the result is safely stored.
                _clear_failures(word)
            else:
                # Falsy results used to vanish silently; make the drop visible.
                log.warning(f"⚠️ Worker {worker_id} received no status for '{word}'; entry was not saved.")
        except Exception as e:
            log.error(f"🚨 Critical transaction fault inside Worker {worker_id} processing '{word}': {str(e)}")
            # Exceptions share the strike limit with "ERROR" results so a word that
            # always raises cannot be retried forever.
            if _record_failure(word) >= _MAX_STRIKES:
                try:
                    await _give_up_on(word)
                except Exception as save_err:
                    log.error(f"🚨 Worker {worker_id} could not record '{word}' as an error result: {str(save_err)}")
                else:
                    continue  # the finally clause below still marks the item done
            # Re-enqueue so the word is not lost mid-flight, then back off.
            await state.queue.put(word)
            await asyncio.sleep(4)
        finally:
            # Pair every successful get_nowait() with exactly one task_done();
            # re-enqueued words are counted again by their own put().
            state.queue.task_done()
_worker_serial = 0  # next id handed to worker_task(); only increases, so log labels never collide


def _reap_finished_workers() -> None:
    """Drop finished tasks from ``workers_active`` in place, logging any that crashed."""
    still_running = []
    for task in workers_active:
        if not task.done():
            still_running.append(task)
        elif not task.cancelled() and task.exception() is not None:
            # Retrieving the exception also stops asyncio from later warning
            # "Task exception was never retrieved" when the task is collected.
            log.error(f"🚨 Worker task terminated with an unhandled exception: {task.exception()!r}")
    workers_active[:] = still_running


async def manage_workers():
    """Reconcile the worker pool with ``state`` once per second, forever.

    Each tick:

    * finished worker tasks are pruned, and any that died with an exception
      are logged;
    * while ``state.status == "RUNNING"``, new ``worker_task`` tasks are
      started until ``state.concurrency`` are tracked;
    * on ``"STOPPED"``, every tracked task is cancelled and the pool emptied.

    Lowering ``state.concurrency`` does not cancel surplus workers.
    """
    global _worker_serial
    while True:
        # Prune first so workers that finished since the last tick are replaced now.
        _reap_finished_workers()
        if state.status == "RUNNING" and len(workers_active) < state.concurrency:
            # NOTE(review): workers exit when the queue is empty, so while status
            # stays RUNNING this respawns them every tick (each calls init_queue()
            # and exits again). Confirm that churn is acceptable.
            for _ in range(state.concurrency - len(workers_active)):
                workers_active.append(asyncio.create_task(worker_task(_worker_serial)))
                _worker_serial += 1
        elif state.status == "STOPPED" and workers_active:
            for task in workers_active:
                task.cancel()
            workers_active.clear()
            log.info("All concurrent background workers forcefully torn down.")
        await asyncio.sleep(1)