# Spaces: Paused
# File size: 3,874 Bytes
import asyncio
import random
from state import state
import scraper
from logger import get_logger
# Module-wide logger, obtained from the project's `logger.get_logger` helper.
log = get_logger()
# Worker tasks currently tracked by manage_workers(); it appends newly spawned
# tasks here and drops the ones that have finished or been cancelled.
workers_active = []
async def worker_task(worker_id: int):
    """Pull words from ``state.queue`` and classify each one with the scraper.

    The worker first awaits ``state.init_queue()`` and then loops until either
    ``state.status`` is ``"STOPPED"`` or the queue is empty.  While the status
    is ``"PAUSED"`` it polls once per second without touching the queue.

    For every word taken from the queue, ``scraper.check_fragment`` is awaited
    and its return value decides what happens next:

    * ``"ERROR"`` -- the word's counter in ``state.fail_counts`` is bumped.  On
      the third strike the word is saved with the result ``"error"`` and its
      counter is discarded; otherwise the worker sleeps for a random 3-6 s and
      puts the word back on the queue.
    * any other truthy value -- the word's failure counter (if any) is
      discarded and the word is saved under that status.
    * a falsy value -- nothing is saved and nothing is re-queued.
      NOTE(review): confirm that dropping the word here is intended.

    If anything inside that handling raises ``Exception``, the fault is
    logged, the word is put back on the queue and the worker sleeps 4 s.  This
    path has no retry limit.

    Args:
        worker_id: Identifier used only to label this worker's log messages.
    """
    log.info(f"Worker {worker_id} successfully mapped to runtime thread loop pool.")
    await state.init_queue()

    while True:
        if state.status == "STOPPED":
            break
        if state.status == "PAUSED":
            await asyncio.sleep(1)
            continue

        try:
            # Non-blocking fetch: an empty queue ends this worker instead of
            # parking it on the queue.
            word = state.queue.get_nowait()
        except asyncio.QueueEmpty:
            log.info(f"Worker {worker_id} finished execution. Memory loop registry empty.")
            break

        try:
            # No proxy is used: proxy_url is always passed explicitly as None.
            status = await scraper.check_fragment(word, proxy_url=None)

            if status == "ERROR":
                # Create the strike table lazily; it may be missing or None.
                if getattr(state, "fail_counts", None) is None:
                    state.fail_counts = {}
                strikes = state.fail_counts.get(word, 0) + 1
                state.fail_counts[word] = strikes

                if strikes >= 3:
                    log.warning(f"⚠️ 3-Strikes Exhausted for '{word}'. Dumping entry safely to results/error.txt")
                    await state.save_result(word, "error")
                    # The word has reached a final result, so forget its
                    # strikes.  Otherwise the table grows without bound and a
                    # later re-queue of this word would strike out on its very
                    # first error.  Done after the save so that a failed save
                    # keeps the count.
                    state.fail_counts.pop(word, None)
                else:
                    # Sleep BEFORE re-queueing so the word is not picked up
                    # again immediately over the same unproxied connection.
                    stagger_sleep = random.uniform(3.0, 6.0)
                    log.info(f"🔄 Intercepted execution error for '{word}'. Workers cooling loop path for {stagger_sleep:.2f}s...")
                    await asyncio.sleep(stagger_sleep)
                    await state.queue.put(word)
            elif status:
                # Classified successfully: drop any strikes recorded earlier.
                strike_table = getattr(state, "fail_counts", None)
                if strike_table:
                    strike_table.pop(word, None)
                await state.save_result(word, status)
        except Exception as e:
            log.error(f"🚨 Critical transaction fault inside Worker {worker_id} processing '{word}': {e}")
            # Re-queue first so the word is back in circulation before this
            # worker starts its cool-down.
            await state.queue.put(word)
            await asyncio.sleep(4)
        finally:
            # Exactly one task_done() per successful get_nowait(); a re-queued
            # word is counted again by its own put().
            state.queue.task_done()
async def manage_workers():
    """Supervise the worker pool, reconciling it with ``state`` once per second.

    On every pass:

    * if ``state.status`` is ``"RUNNING"`` and fewer than ``state.concurrency``
      tasks are tracked, enough ``worker_task`` tasks are spawned to fill the
      gap;
    * if ``state.status`` is ``"STOPPED"`` and tasks are tracked, all of them
      are cancelled and awaited before the teardown is logged;
    * finished tasks are pruned from ``workers_active``, and any exception a
      finished task raised is logged.

    ``workers_active`` is always mutated in place, so other references to the
    same list object keep seeing the current pool.  The loop has no exit
    condition of its own.
    """
    # Monotonic id source: deriving ids from the current list length could
    # hand a new worker the same id as one that is still running.
    next_worker_id = 0

    while True:
        if state.status == "RUNNING" and len(workers_active) < state.concurrency:
            missing = state.concurrency - len(workers_active)
            for _ in range(missing):
                workers_active.append(asyncio.create_task(worker_task(next_worker_id)))
                next_worker_id += 1
        elif state.status == "STOPPED" and workers_active:
            doomed = list(workers_active)
            workers_active.clear()
            for task in doomed:
                task.cancel()
            # cancel() only requests cancellation; wait until every task has
            # actually finished so the log line below is true.
            await asyncio.gather(*doomed, return_exceptions=True)
            log.info("All concurrent background workers forcefully torn down.")

        # Keep only the tasks that are still running, and surface crashes from
        # the ones that are not (this also marks their exception as retrieved).
        still_running = []
        for task in workers_active:
            if not task.done():
                still_running.append(task)
            elif not task.cancelled() and task.exception() is not None:
                log.error(f"Worker task exited with an unhandled exception: {task.exception()!r}")
        workers_active[:] = still_running

        await asyncio.sleep(1)