Muttered3 commited on
Commit
6b268d0
·
verified ·
1 Parent(s): 4b0add0

Update worker.py

Browse files
Files changed (1) hide show
  1. worker.py +54 -31
worker.py CHANGED
@@ -1,50 +1,73 @@
1
  import asyncio
2
- import db
 
3
  import scraper
4
- import traceback
5
 
6
- async def start_worker(worker_id: int):
7
- print(f"⚙️ Worker {worker_id} initialized.")
8
- r = await db.get_redis()
 
 
 
9
 
10
  while True:
11
  try:
12
- state = await db.get_state()
13
- if state.get("running") != "1" or state.get("paused") == "1":
14
- await asyncio.sleep(2)
 
 
 
 
15
  continue
16
 
17
- word = await r.rpop("frag:queue")
18
- if not word:
19
- await asyncio.sleep(2)
20
- continue
 
 
21
 
22
- proxy = await db.get_random_proxy()
 
 
 
23
  status = await scraper.check_fragment(word, proxy_url=proxy)
24
 
25
  if status == "ERROR":
26
- await r.lpush("frag:queue", word)
 
27
  await asyncio.sleep(1.5)
28
- continue
29
-
30
- if status:
31
- await db.mark_done(word, status)
32
 
33
  except Exception as e:
34
- print(f"Worker {worker_id} Error: {str(e)}")
35
- traceback.print_exc()
36
  await asyncio.sleep(2)
37
 
38
- async def run_worker():
39
- print("🚀 Sniper Engine Starting...")
40
- concurrency = await db.get_concurrency()
41
 
42
- tasks = []
43
- for i in range(concurrency):
44
- task = asyncio.create_task(start_worker(i))
45
- tasks.append(task)
 
 
 
46
 
47
- await asyncio.gather(*tasks, return_exceptions=True)
48
-
49
- if __name__ == "__main__":
50
- asyncio.run(run_worker())
 
 
 
 
 
 
 
 
1
  import asyncio
2
+ import random
3
+ from state import state
4
  import scraper
5
+ from logger import get_logger
6
 
7
+ log = get_logger()
8
+ workers_active = []
9
+
10
+ async def worker_task(worker_id: int):
11
+ log.info(f"Worker {worker_id} started.")
12
+ await state.init_queue()
13
 
14
  while True:
15
  try:
16
+ # Handle STOP state
17
+ if state.status == "STOPPED":
18
+ break
19
+
20
+ # Handle PAUSE state
21
+ if state.status == "PAUSED":
22
+ await asyncio.sleep(1)
23
  continue
24
 
25
+ # Try to get a word from the in-memory queue
26
+ try:
27
+ word = state.queue.get_nowait()
28
+ except asyncio.QueueEmpty:
29
+ log.info(f"Worker {worker_id} finished. Queue empty.")
30
+ break
31
 
32
+ # Pick a proxy if available
33
+ proxy = random.choice(state.proxies) if state.proxies else None
34
+
35
+ # Execute scrape
36
  status = await scraper.check_fragment(word, proxy_url=proxy)
37
 
38
  if status == "ERROR":
39
+ # Put it back to try again later
40
+ await state.queue.put(word)
41
  await asyncio.sleep(1.5)
42
+ elif status:
43
+ await state.save_result(word, status)
44
+
45
+ state.queue.task_done()
46
 
47
  except Exception as e:
48
+ log.exception(f"Worker {worker_id} crashed:")
 
49
  await asyncio.sleep(2)
50
 
51
+ async def manage_workers():
52
+ """Monitors state and spins up workers as needed."""
53
+ global workers_active
54
 
55
+ while True:
56
+ if state.status == "RUNNING" and len(workers_active) < state.concurrency:
57
+ # Spin up missing workers
58
+ needed = state.concurrency - len(workers_active)
59
+ for i in range(needed):
60
+ t = asyncio.create_task(worker_task(len(workers_active) + i))
61
+ workers_active.append(t)
62
 
63
+ elif state.status == "STOPPED" and workers_active:
64
+ # Cancel all workers if stopped
65
+ for t in workers_active:
66
+ t.cancel()
67
+ workers_active.clear()
68
+ log.info("All workers forcefully stopped.")
69
+
70
+ # Clean up finished tasks from the active list
71
+ workers_active = [t for t in workers_active if not t.done()]
72
+
73
+ await asyncio.sleep(1)