File size: 3,062 Bytes
65b968a
6b268d0
 
356e2ca
6b268d0
65b968a
6b268d0
 
 
 
 
 
356e2ca
9045e3f
 
5e0f261
6b268d0
 
 
 
 
9045e3f
65b968a
6b268d0
5e0f261
 
 
 
65b968a
5e0f261
6b268d0
 
5e0f261
356e2ca
9045e3f
356e2ca
5e0f261
 
 
 
cc64b3c
 
5e0f261
cc64b3c
 
 
 
5e0f261
 
6b268d0
5e0f261
 
 
6b268d0
 
 
9045e3f
 
5e0f261
356e2ca
9045e3f
6b268d0
 
356e2ca
6b268d0
5e0f261
 
 
 
 
 
 
 
 
 
 
 
6b268d0
5e0f261
6b268d0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
import asyncio
import random
from state import state
import scraper
from logger import get_logger

# Module logger obtained from the project's logger helper.
log = get_logger()
# asyncio.Task handles for the worker_task coroutines spawned by manage_workers();
# manage_workers() appends new tasks, drops finished ones each tick, and empties
# the list when the run is stopped.
workers_active = []

async def worker_task(worker_id: int):
    """Consume words from ``state.queue`` and record a result for each one.

    Runs until ``state.status`` becomes "STOPPED" or the queue is found empty.
    While the status is "PAUSED" the worker polls once per second without
    taking any work.

    Each word is checked with ``scraper.check_fragment`` (through a randomly
    chosen proxy when ``state.proxies`` is non-empty). A truthy status other
    than "ERROR" is saved with ``state.save_result``. An "ERROR" counts as a
    strike: the word is re-queued until its third strike, at which point it is
    saved with status "error".

    Every successful ``get_nowait()`` is paired with exactly one
    ``task_done()``, even when processing raises, so ``queue.join()`` cannot
    hang on a word that blew up mid-check.

    Args:
        worker_id: Identifier used only in log messages.
    """
    log.info(f"Worker {worker_id} started.")
    await state.init_queue()

    while True:
        try:
            # Re-read the shared run state on every iteration.
            if state.status == "STOPPED":
                break

            if state.status == "PAUSED":
                await asyncio.sleep(1)
                continue

            try:
                word = state.queue.get_nowait()
            except asyncio.QueueEmpty:
                log.info(f"Worker {worker_id} finished. Queue empty.")
                break

            try:
                # Random proxy per request, or a direct connection if none are configured.
                proxy = random.choice(state.proxies) if state.proxies else None

                status = await scraper.check_fragment(word, proxy_url=proxy)

                if status == "ERROR":
                    # Lazily create the per-word strike counter if state doesn't provide one.
                    if getattr(state, "fail_counts", None) is None:
                        state.fail_counts = {}
                    strikes = state.fail_counts.get(word, 0) + 1
                    state.fail_counts[word] = strikes

                    if strikes >= 3:
                        # Third strike: give up on this word and record it as an error.
                        await state.save_result(word, "error")
                    else:
                        # Put the word back right away so other workers never see a
                        # spuriously empty queue (which makes them exit), then back
                        # off this worker before it takes more work.
                        await state.queue.put(word)
                        await asyncio.sleep(random.uniform(1.5, 3.5))
                elif status:
                    # Success: forget any earlier strikes for this word.
                    fail_counts = getattr(state, "fail_counts", None)
                    if fail_counts:
                        fail_counts.pop(word, None)
                    await state.save_result(word, status)
                # NOTE(review): a falsy status (e.g. None) is neither saved nor
                # retried, so the word is dropped -- confirm this is intended.
            finally:
                # Balance the get_nowait() above regardless of how processing ended.
                state.queue.task_done()

        except asyncio.CancelledError:
            # Never treat cancellation as a recoverable loop error (it is an
            # Exception subclass on Python < 3.8).
            raise
        except Exception as e:
            log.error(f"⚠️ Worker {worker_id} safely intercepted loop exception: {str(e)}")
            await asyncio.sleep(2)

async def manage_workers():
    """Keep the worker pool in line with ``state``; loops forever, once per second.

    - "RUNNING": spawn ``worker_task`` coroutines until ``state.concurrency``
      workers are alive.
    - "STOPPED": cancel every live worker, wait for the cancellations to
      finish, and empty the pool.

    Finished tasks are dropped from ``workers_active`` at the start of each tick.
    """
    # Worker IDs only label log lines. A running counter keeps them unique,
    # whereas len(workers_active) + i reuses the ID of a still-alive worker
    # once a lower-numbered worker has finished and been pruned.
    next_worker_id = 0

    while True:
        # Prune before scaling so a finished worker's slot is refilled this tick.
        # Slice-assign rather than rebind so the module-level list object stays
        # the same for any code holding a reference to it.
        workers_active[:] = [t for t in workers_active if not t.done()]

        if state.status == "RUNNING" and len(workers_active) < state.concurrency:
            for _ in range(state.concurrency - len(workers_active)):
                workers_active.append(asyncio.create_task(worker_task(next_worker_id)))
                next_worker_id += 1

        elif state.status == "STOPPED" and workers_active:
            for t in workers_active:
                t.cancel()
            # Wait until the cancellations have actually taken effect so the log
            # line below is accurate; return_exceptions=True keeps the workers'
            # CancelledErrors from propagating into this supervisor.
            await asyncio.gather(*workers_active, return_exceptions=True)
            workers_active.clear()
            log.info("All workers forcefully stopped.")

        await asyncio.sleep(1)