Spaces:
Paused
Paused
File size: 3,062 Bytes
65b968a 6b268d0 356e2ca 6b268d0 65b968a 6b268d0 356e2ca 9045e3f 5e0f261 6b268d0 9045e3f 65b968a 6b268d0 5e0f261 65b968a 5e0f261 6b268d0 5e0f261 356e2ca 9045e3f 356e2ca 5e0f261 cc64b3c 5e0f261 cc64b3c 5e0f261 6b268d0 5e0f261 6b268d0 9045e3f 5e0f261 356e2ca 9045e3f 6b268d0 356e2ca 6b268d0 5e0f261 6b268d0 5e0f261 6b268d0 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 | import asyncio
import random
from state import state
import scraper
from logger import get_logger
# Logger shared by every worker task and by the manager loop below.
log = get_logger()
# Task handles for the workers currently tracked by manage_workers().
workers_active = list()
# A word is recorded as "error" after this many failed attempts.
_MAX_FAIL_STRIKES = 3


async def _record_failure(word):
    """Count one failed attempt for ``word``; retry it or give up on it.

    Below ``_MAX_FAIL_STRIKES`` attempts the word is put back on
    ``state.queue`` and this worker waits 1.5-3.5 s before continuing; at the
    limit the word is saved with the result "error".
    """
    # state.fail_counts is created lazily; it may be missing or None here.
    if getattr(state, "fail_counts", None) is None:
        state.fail_counts = {}
    state.fail_counts[word] = state.fail_counts.get(word, 0) + 1
    if state.fail_counts[word] >= _MAX_FAIL_STRIKES:
        await state.save_result(word, "error")
    else:
        await state.queue.put(word)
        # Staggered pause after re-queueing so a failing word is not retried
        # back-to-back by the same worker.
        await asyncio.sleep(random.uniform(1.5, 3.5))


async def _process_word(word):
    """Check one word via ``scraper.check_fragment`` and record the outcome.

    A proxy is picked at random from ``state.proxies`` (none when the list is
    empty). "ERROR" counts as a failed attempt; any other truthy status is
    saved and clears the word's failure count.
    """
    proxy = random.choice(state.proxies) if state.proxies else None
    status = await scraper.check_fragment(word, proxy_url=proxy)
    if status == "ERROR":
        await _record_failure(word)
    elif status:
        fail_counts = getattr(state, "fail_counts", None)
        if fail_counts:
            fail_counts.pop(word, None)
        await state.save_result(word, status)
    # NOTE(review): a falsy status drops the word without saving or retrying
    # it — confirm that is what check_fragment's falsy return means.


async def worker_task(worker_id: int):
    """Consume words from ``state.queue`` until it is empty or the run stops.

    ``state.status == "STOPPED"`` ends the worker; ``"PAUSED"`` idles it in
    1-second steps. Every item taken with ``get_nowait()`` is matched by
    exactly one ``task_done()``, even when processing raises, so the queue's
    unfinished-task count stays accurate.

    Args:
        worker_id: Label used only in log messages.
    """
    log.info(f"Worker {worker_id} started.")
    # NOTE(review): every (re)spawned worker calls init_queue(); confirm it is
    # idempotent and does not reload words that were already processed.
    await state.init_queue()
    while True:
        try:
            if state.status == "STOPPED":
                break
            if state.status == "PAUSED":
                await asyncio.sleep(1)
                continue
            try:
                word = state.queue.get_nowait()
            except asyncio.QueueEmpty:
                log.info(f"Worker {worker_id} finished. Queue empty.")
                break
            try:
                await _process_word(word)
            except Exception:
                # Count the crash as a failed attempt so the word is retried
                # (or finally saved as "error") instead of being silently lost.
                await _record_failure(word)
                raise
            finally:
                # Runs after any re-queue above, so the unfinished-task count
                # cannot reach zero while the word is still pending.
                state.queue.task_done()
        except asyncio.CancelledError:
            # Never swallow cancellation (it subclasses Exception before 3.8).
            raise
        except Exception as e:
            log.error(f"⚠️ Worker {worker_id} safely intercepted loop exception: {str(e)}")
            await asyncio.sleep(2)
async def manage_workers():
    """Keep the worker pool in line with ``state.status`` and ``state.concurrency``.

    Polls once per second, forever. Finished tasks are dropped from
    ``workers_active`` each tick. While "RUNNING" the pool is topped up to
    ``state.concurrency`` tasks; on "STOPPED" every tracked task is cancelled.
    """
    # Monotonic ID source: IDs derived from len(workers_active) were reused
    # by new workers while the original holders were still running.
    next_worker_id = 0
    while True:
        # Prune first so finished tasks free their slot on this same tick.
        # Mutate in place so code holding a reference to the list sees updates.
        workers_active[:] = [t for t in workers_active if not t.done()]
        if state.status == "RUNNING" and len(workers_active) < state.concurrency:
            # NOTE(review): workers exit once the queue is empty, so while the
            # status stays "RUNNING" replacements are spawned every tick —
            # confirm something changes the status when the run completes.
            for _ in range(state.concurrency - len(workers_active)):
                workers_active.append(asyncio.create_task(worker_task(next_worker_id)))
                next_worker_id += 1
        elif state.status == "STOPPED" and workers_active:
            for task in workers_active:
                task.cancel()
            workers_active.clear()
            log.info("All workers forcefully stopped.")
        await asyncio.sleep(1)