Muttered3 commited on
Commit
65b968a
·
verified ·
1 Parent(s): 2418b45

Create worker.py

Browse files
Files changed (1) hide show
  1. worker.py +86 -0
worker.py ADDED
@@ -0,0 +1,86 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import asyncio
2
+ import time
3
+ import os
4
+ from aiohttp import ClientSession, TCPConnector
5
+ import db
6
+ from scraper import fetch_word
7
+
8
+ async def run_worker(client):
9
+ connector = TCPConnector(limit=100)
10
+ async with ClientSession(connector=connector) as session:
11
+ while True:
12
+ try:
13
+ state = await db.get_state()
14
+ if state.get("paused") == "1":
15
+ await asyncio.sleep(2)
16
+ continue
17
+ if state.get("running") != "1":
18
+ await asyncio.sleep(2)
19
+ continue
20
+
21
+ concurrency = await db.get_concurrency()
22
+ batch_size = min(concurrency, 50)
23
+
24
+ words = []
25
+ for _ in range(batch_size):
26
+ w = await db.pop_word()
27
+ if w: words.append(w)
28
+
29
+ if not words:
30
+ if state.get("running") == "1":
31
+ await db.set_state(running="0")
32
+ admins = [int(x) for x in os.environ.get("ADMIN_IDS", "").split(",") if x]
33
+ for admin in admins:
34
+ try:
35
+ await client.send_message(admin, "✅ Scan complete")
36
+ except Exception:
37
+ pass
38
+ await asyncio.sleep(5)
39
+ continue
40
+
41
+ async def process_one(word):
42
+ status = await fetch_word(session, word)
43
+ await db.mark_done(word, status)
44
+
45
+ await asyncio.gather(*[process_one(w) for w in words])
46
+
47
+ state = await db.get_state()
48
+ processed = int(state.get("processed", 0))
49
+
50
+ if processed > 0 and (processed % 500) < batch_size:
51
+ await send_progress(client)
52
+
53
+ except Exception as e:
54
+ print(f"[WORKER ERROR] {e}")
55
+ await asyncio.sleep(5)
56
+
57
+ async def send_progress(client):
58
+ try:
59
+ state = await db.get_state()
60
+ counts = await db.get_counts()
61
+ processed = int(state.get("processed", 0))
62
+ total = int(state.get("total", 0))
63
+ start_time = float(state.get("start_time", time.time()))
64
+
65
+ pct = (processed / total * 100) if total > 0 else 0
66
+ elapsed = time.time() - start_time
67
+ wps = processed / elapsed if elapsed > 0 else 0
68
+ rem_sec = (total - processed) / wps if wps > 0 else 0
69
+
70
+ h = int(rem_sec // 3600)
71
+ m = int((rem_sec % 3600) // 60)
72
+
73
+ msg = (
74
+ f"⚡ Progress: {processed:,}/{total:,} ({pct:.2f}%)\n"
75
+ f"🔴 Taken: {counts['taken']:,} | 🚫 Unavail: {counts['unavailable']:,}\n"
76
+ f"⏱ Est. remaining: ~{h}h {m}m"
77
+ )
78
+
79
+ admins = [int(x) for x in os.environ.get("ADMIN_IDS", "").split(",") if x]
80
+ for admin in admins:
81
+ try:
82
+ await client.send_message(admin, msg)
83
+ except Exception:
84
+ pass
85
+ except Exception as e:
86
+ print(f"[PROGRESS ERROR] {e}")