File size: 3,874 Bytes
65b968a
6b268d0
 
356e2ca
6b268d0
65b968a
6b268d0
 
 
 
235f767
6b268d0
356e2ca
9045e3f
235f767
 
 
 
 
 
65b968a
235f767
 
 
 
 
 
65b968a
235f767
 
 
 
 
356e2ca
9045e3f
356e2ca
5e0f261
 
 
cc64b3c
 
235f767
cc64b3c
235f767
cc64b3c
 
235f767
 
 
 
 
 
cc64b3c
235f767
6b268d0
235f767
5e0f261
235f767
 
 
6b268d0
 
9045e3f
235f767
 
 
 
 
 
 
 
9045e3f
6b268d0
 
356e2ca
6b268d0
5e0f261
 
 
 
 
 
 
 
 
 
 
235f767
6b268d0
235f767
5e0f261
6b268d0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
import asyncio
import random
from state import state
import scraper
from logger import get_logger

# Module-wide logger obtained from the project's logger helper.
log = get_logger()
# Tasks spawned by manage_workers() for worker_task(); finished tasks are
# filtered out on every manager pass.
workers_active = []

# Number of failed attempts (either an "ERROR" status or a raised exception)
# after which a word is written to the "error" results instead of retried.
_MAX_STRIKES = 3


def _bump_strikes(word) -> int:
    """Increment and return the failure count kept for *word* in ``state.fail_counts``."""
    counts = getattr(state, 'fail_counts', None)
    if counts is None:
        # The attribute may be missing or None; create it lazily.
        counts = state.fail_counts = {}
    counts[word] = counts.get(word, 0) + 1
    return counts[word]


def _clear_strikes(word) -> None:
    """Forget any failure count kept for *word* (no-op when none is recorded)."""
    counts = getattr(state, 'fail_counts', None)
    if counts:
        counts.pop(word, None)


async def worker_task(worker_id: int):
    """Pull words from ``state.queue`` and classify each one via the scraper.

    The loop ends when ``state.status`` is ``"STOPPED"`` or the queue is
    empty, and idles in one-second steps while the status is ``"PAUSED"``.
    Every word is passed to ``scraper.check_fragment`` with ``proxy_url=None``
    and then handled according to the returned status:

    * ``"ERROR"`` -- a strike is recorded. Below ``_MAX_STRIKES`` the worker
      sleeps 3-6 seconds and puts the word back on the queue; at the limit the
      word is saved under ``"error"``.
    * any other truthy value -- saved with ``state.save_result(word, status)``.
    * a falsy value -- nothing is saved; a warning is logged so the word does
      not vanish without a trace.

    An exception raised while processing also counts as a strike, so a word
    that fails every time is eventually written to the error results instead
    of being re-queued forever.

    Args:
        worker_id: Identifier used only in log messages.
    """
    log.info(f"Worker {worker_id} successfully mapped to runtime thread loop pool.")
    await state.init_queue()

    while True:
        if state.status == "STOPPED":
            break

        if state.status == "PAUSED":
            await asyncio.sleep(1)
            continue

        try:
            # Non-blocking fetch: an empty queue means this worker is done.
            word = state.queue.get_nowait()
        except asyncio.QueueEmpty:
            log.info(f"Worker {worker_id} finished execution. Memory loop registry empty.")
            break

        # Proxy routing is disabled: every request goes out without a proxy.
        proxy = None

        try:
            status = await scraper.check_fragment(word, proxy_url=proxy)

            if status == "ERROR":
                if _bump_strikes(word) >= _MAX_STRIKES:
                    log.warning(f"⚠️ 3-Strikes Exhausted for '{word}'. Dumping entry safely to results/error.txt")
                    await state.save_result(word, "error")
                    # The word is finished; drop its counter so the dict does not grow forever.
                    _clear_strikes(word)
                else:
                    # Sleep BEFORE re-queueing so the same word is not hammered
                    # again immediately over the same unproxied connection.
                    stagger_sleep = random.uniform(3.0, 6.0)
                    log.info(f"🔄 Intercepted execution error for '{word}'. Workers cooling loop path for {stagger_sleep:.2f}s...")
                    await asyncio.sleep(stagger_sleep)

                    await state.queue.put(word)

            elif status:
                # A successful classification resets the word's failure history.
                _clear_strikes(word)
                await state.save_result(word, status)

            else:
                log.warning(f"Worker {worker_id} received an empty status for '{word}'; no result was recorded.")

        except Exception as e:
            log.error(f"🚨 Critical transaction fault inside Worker {worker_id} processing '{word}': {str(e)}")
            if _bump_strikes(word) >= _MAX_STRIKES:
                # Persistent faults must not loop forever: give up on the word.
                log.warning(f"⚠️ 3-Strikes Exhausted for '{word}'. Dumping entry safely to results/error.txt")
                try:
                    await state.save_result(word, "error")
                except Exception as save_err:
                    # Saving itself is broken; log and move on rather than kill the worker.
                    log.error(f"Worker {worker_id} could not record error result for '{word}': {save_err}")
                _clear_strikes(word)
            else:
                # Re-queue first so the word is already back on the queue
                # while this worker cools down.
                await state.queue.put(word)
            await asyncio.sleep(4)

        finally:
            # Balance the get_nowait() above, including when the word was
            # re-queued (put() registers a fresh unfinished task).
            state.queue.task_done()

async def manage_workers():
    """Supervise the worker pool forever, reconciling it with ``state`` once per second.

    On every pass:

    * finished tasks are removed from ``workers_active``; a task that ended
      with an unhandled exception is logged instead of being dropped silently;
    * while ``state.status`` is ``"RUNNING"``, new ``worker_task`` tasks are
      created until ``state.concurrency`` of them are active;
    * while ``state.status`` is ``"STOPPED"``, every active task is cancelled
      and awaited before the teardown is logged.

    ``workers_active`` is always mutated in place, so other modules holding a
    reference to the list keep seeing the current pool.
    """
    # Running counter so a respawned worker never reuses the id of one that
    # is still alive; restarted from 0 after a full teardown.
    next_worker_id = 0

    while True:
        # Reap finished tasks first so the spawn decision sees the real pool size.
        for t in workers_active:
            if t.done() and not t.cancelled() and t.exception() is not None:
                log.error(f"Worker task exited with unhandled exception: {t.exception()!r}")
        workers_active[:] = [t for t in workers_active if not t.done()]

        if state.status == "RUNNING" and len(workers_active) < state.concurrency:
            for _ in range(state.concurrency - len(workers_active)):
                workers_active.append(asyncio.create_task(worker_task(next_worker_id)))
                next_worker_id += 1

        elif state.status == "STOPPED" and workers_active:
            doomed = list(workers_active)
            workers_active.clear()
            for t in doomed:
                t.cancel()
            # Wait for the cancellations to land; return_exceptions=True keeps
            # the CancelledError of each task from propagating into the manager.
            await asyncio.gather(*doomed, return_exceptions=True)
            next_worker_id = 0
            log.info("All concurrent background workers forcefully torn down.")

        await asyncio.sleep(1)