Asmita2682hshs commited on
Commit
885c1e6
·
verified ·
1 Parent(s): 8f5a7d3

Upload worker.py

Browse files
Files changed (1) hide show
  1. worker.py +478 -0
worker.py ADDED
@@ -0,0 +1,478 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ import sys
3
+ import subprocess
4
+
5
+ def _ensure_dependencies():
6
+ try:
7
+ import redis
8
+ import playwright
9
+ except ImportError:
10
+ print("[-] Missing dependencies. Auto-installing 'redis' and 'playwright'...")
11
+ subprocess.check_call([sys.executable, "-m", "pip", "install", "redis", "playwright"])
12
+ print("[-] Installing Playwright chromium browser...")
13
+ subprocess.check_call([sys.executable, "-m", "playwright", "install", "chromium"])
14
+ print("[+] Installation complete. Restarting worker...")
15
+ os.execv(sys.executable, [sys.executable] + sys.argv)
16
+
17
+ _ensure_dependencies()
18
+
19
+ import json
20
+ import logging
21
+ import signal
22
+ import asyncio
23
+ import uuid
24
+ import time
25
+ import socket
26
+ from typing import Any, Dict, List, Optional
27
+ import redis.asyncio as redis
28
+
29
+ # Playwright imports
30
+ from playwright.async_api import async_playwright, Page, Browser, BrowserContext
31
+
32
+ # -----------------------------------------------------------------------------
33
+ # Configuration
34
+ # -----------------------------------------------------------------------------
35
+ REDIS_HOST = os.getenv("REDIS_HOST", "13.235.87.209")
36
+ REDIS_PORT = int(os.getenv("REDIS_PORT", "6379"))
37
+ REDIS_PASSWORD = os.getenv("REDIS_PASSWORD", None)
38
+
39
+ LEAD_QUEUE = "lead_queue"
40
+ PROCESSING_QUEUE = "processing_queue"
41
+ RESULT_QUEUE = "result_queue"
42
+ ACCOUNT_POOL_READY = "account_pool:ready"
43
+ ACCOUNT_POOL_COOLDOWN = "account_pool:cooldown"
44
+ ACCOUNT_POOL_DONE = "account_pool:done_for_today"
45
+ RETRY_COUNT_KEY = "retry_count"
46
+
47
+ WORKER_TASKS = int(os.getenv("WORKER_THREADS", "5"))
48
+ BATCH_SIZE = 10
49
+ MAX_DAILY_LEADS = 40
50
+ COOLDOWN_SECONDS = 3600 # 1 hour
51
+ MAX_RETRIES = 3
52
+ POLL_TIMEOUT = 5
53
+
54
+ logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
55
+ logger = logging.getLogger("worker")
56
+
57
+ _shutdown_requested = False
58
+
59
+ # Machine identity (1 machine = 1 worker in dashboard)
60
+ MACHINE_ID = socket.gethostname()
61
+
62
+ # Shared state across all threads on this machine
63
+ _machine_stats = {
64
+ "total_done": 0,
65
+ "total_failed": 0,
66
+ "threads": {}, # task_id -> current_action
67
+ }
68
+ _stats_lock = asyncio.Lock()
69
+
70
+ def _signal_handler(signum, frame):
71
+ global _shutdown_requested
72
+ logger.info(f"Shutdown signal received. Draining worker pool...")
73
+ _shutdown_requested = True
74
+
75
+ signal.signal(signal.SIGINT, _signal_handler)
76
+ signal.signal(signal.SIGTERM, _signal_handler)
77
+
78
+ _redis_pool = None
79
+
80
+ def get_redis_pool():
81
+ global _redis_pool
82
+ if _redis_pool is None:
83
+ _redis_pool = redis.ConnectionPool(
84
+ host=REDIS_HOST, port=REDIS_PORT, password=REDIS_PASSWORD,
85
+ decode_responses=True, max_connections=50
86
+ )
87
+ return _redis_pool
88
+
89
+ # -----------------------------------------------------------------------------
90
+ # Automation Engine
91
+ # -----------------------------------------------------------------------------
92
+ class AutomationEngine:
93
+ """Robust Playwright automation engine"""
94
+
95
+ def __init__(self):
96
+ self.playwright = None
97
+ self.browser: Optional[Browser] = None
98
+ self.context: Optional[BrowserContext] = None
99
+ self.page: Optional[Page] = None
100
+
101
+ async def start(self, headless: bool = True):
102
+ self.playwright = await async_playwright().start()
103
+ browser_options = {
104
+ "headless": headless,
105
+ "args": [
106
+ "--no-sandbox",
107
+ "--disable-setuid-sandbox",
108
+ "--disable-dev-shm-usage",
109
+ "--disable-accelerated-2d-canvas",
110
+ "--no-first-run",
111
+ "--no-zygote",
112
+ "--single-process",
113
+ "--disable-gpu"
114
+ ]
115
+ }
116
+ self.browser = await self.playwright.chromium.launch(**browser_options)
117
+ self.context = await self.browser.new_context()
118
+ self.page = await self.context.new_page()
119
+ # Abort resource-heavy requests
120
+ await self.page.route("**/*", lambda route: (
121
+ route.abort() if route.request.resource_type in ["image", "font", "media"]
122
+ else route.continue_()
123
+ ))
124
+
125
+ async def stop(self):
126
+ if self.context:
127
+ await self.context.close() # 🔥 important
128
+ if self.browser:
129
+ await self.browser.close()
130
+ if self.playwright:
131
+ await self.playwright.stop()
132
+
133
+ async def login_microsoft(self, email: str, temp_pass: str) -> bool:
134
+ """Login using TAP (Temporary Access Pass) flow"""
135
+ try:
136
+ logger.info(f"Logging in as {email} using TAP")
137
+ login_url = (
138
+ "https://login.microsoftonline.com/common/oauth2/v2.0/authorize?"
139
+ "client_id=4765445b-32c6-49b0-83e6-1d93765276ca&"
140
+ "redirect_uri=https%3A%2F%2Fwww.office.com%2Flandingv2&"
141
+ "response_type=code%20id_token&"
142
+ "scope=openid%20profile%20https%3A%2F%2Fwww.office.com%2Fv2%2FOfficeHome.All&"
143
+ "response_mode=form_post"
144
+ )
145
+ await self.page.goto(login_url, timeout=60000)
146
+
147
+ # Email step
148
+ await self.page.get_by_role("textbox", name="Enter your email, phone, or").fill(email)
149
+ await self.page.get_by_role("button", name="Next").click()
150
+ await asyncio.sleep(2)
151
+
152
+ # TAP step
153
+ tap_field = self.page.get_by_role("textbox", name="Temporary Access Pass")
154
+ await tap_field.fill(temp_pass)
155
+ await self.page.get_by_role("button", name="Sign in").click()
156
+
157
+ # Check for incorrect TAP immediately
158
+ if await self.page.locator('text="Your Temporary Access Pass is incorrect."').is_visible(timeout=5000):
159
+ logger.error(f"Incorrect Temporary Access Pass for {email}")
160
+ return False
161
+
162
+ # Wait a bit longer for the "Keep your account secure" or "Authenticator" page to load
163
+ await asyncio.sleep(4)
164
+
165
+ # Handling potential security prompts
166
+ if await self.page.get_by_role("heading", name="Let's keep your account secure").is_visible(timeout=5000):
167
+ await self.page.get_by_role("button", name="Next").click()
168
+ await asyncio.sleep(2)
169
+
170
+ # Choose different method -> Email (using the button from your HTML snippet)
171
+ try:
172
+ # Wait for the page to settle
173
+ await asyncio.sleep(2)
174
+ await self.page.wait_for_load_state("domcontentloaded", timeout=20000)
175
+
176
+ selector = 'button[data-testid="choose-different-method-link"]:has-text("Set up a different way to sign in")'
177
+ await self.page.wait_for_selector(selector, timeout=15000)
178
+ await self.page.locator(selector).click()
179
+
180
+ # Now click the Email option
181
+ await self.page.get_by_role("button", name="Email Receive a code to reset").click()
182
+ await asyncio.sleep(2)
183
+ except Exception as e:
184
+ logger.warning(f"Could not switch to Email method: {e}")
185
+ return False
186
+
187
+ return True
188
+ except Exception as e:
189
+ logger.error(f"Login failed for {email}: {e}")
190
+ return False
191
+
192
+ async def process_lead(self, email: str) -> tuple[bool, Optional[str]]:
193
+ """Process a single lead OTP flow"""
194
+ try:
195
+ # Click Email method if visible
196
+ email_btn = self.page.locator('button:has-text("Email")')
197
+ if await email_btn.is_visible(timeout=3000):
198
+ await email_btn.click()
199
+
200
+ # Fill lead email
201
+ email_input = self.page.locator('[data-testid="email-input"]')
202
+ await email_input.wait_for(timeout=10000)
203
+ await email_input.fill(email)
204
+ await self.page.locator('[data-testid="reskin-step-next-button"]').click()
205
+
206
+ # Wait for OTP or Error
207
+ await self.page.wait_for_selector(
208
+ '[data-testid="email-verify-challenge-otp-input"], [data-testid="message-bar-error"]',
209
+ timeout=15000
210
+ )
211
+
212
+ if await self.page.locator('[data-testid="message-bar-error"]').is_visible():
213
+ text = await self.page.locator('[data-testid="message-bar-error"]').inner_text()
214
+ if "too many" in text.lower(): return False, "rate_limit"
215
+ return False, text
216
+
217
+ if await self.page.locator('[data-testid="email-verify-challenge-otp-input"]').is_visible():
218
+ await self.page.locator('[data-testid="backButton"]').click()
219
+ logger.info(f"OTP Flow Success: {email}")
220
+ return True, None
221
+
222
+ return False, "otp_not_found"
223
+ except Exception as e:
224
+ return False, str(e)
225
+
226
+
227
+ # -----------------------------------------------------------------------------
228
+ # Worker Async Task Logic
229
+ # -----------------------------------------------------------------------------
230
+ class WorkerTask:
231
+ def __init__(self, task_id: str):
232
+ self.task_id = task_id
233
+ self.client = redis.Redis(connection_pool=get_redis_pool())
234
+ self.leads_done = 0
235
+ self.leads_failed = 0
236
+ self.current_action = "Starting"
237
+
238
+ async def _fetch_batch(self) -> List[Dict[str, Any]]:
239
+ """Fetch a batch of up to BATCH_SIZE using BRPOPLPUSH for the first, then RPOPLPUSH."""
240
+ leads = []
241
+ try:
242
+ # Block for the first item
243
+ raw_lead = await self.client.brpoplpush(LEAD_QUEUE, PROCESSING_QUEUE, timeout=POLL_TIMEOUT)
244
+ if not raw_lead:
245
+ return leads
246
+
247
+ leads.append({"_raw": raw_lead, "data": json.loads(raw_lead)})
248
+
249
+ # Non-blocking fetch for the rest of the batch
250
+ for _ in range(BATCH_SIZE - 1):
251
+ raw = await self.client.rpoplpush(LEAD_QUEUE, PROCESSING_QUEUE)
252
+ if not raw:
253
+ break
254
+ leads.append({"_raw": raw, "data": json.loads(raw)})
255
+
256
+ except Exception as e:
257
+ logger.error(f"Error fetching batch: {e}")
258
+
259
+ return leads
260
+
261
+ async def _cleanup_and_push_results(self, leads: List[Dict[str, Any]], results: List[Dict[str, Any]]):
262
+ if not leads: return
263
+ try:
264
+ pipe = self.client.pipeline()
265
+ for lead, result in zip(leads, results):
266
+ pipe.lpush(RESULT_QUEUE, json.dumps(result))
267
+ pipe.lrem(PROCESSING_QUEUE, -1, lead["_raw"])
268
+
269
+ lead_id = lead["data"].get("id", str(uuid.uuid4()))
270
+ if result["success"]:
271
+ pipe.hdel(RETRY_COUNT_KEY, lead_id)
272
+
273
+ await pipe.execute()
274
+ except Exception as e:
275
+ logger.error(f"Error in cleanup and push: {e}")
276
+
277
+ async def run(self):
278
+ logger.info(f"Task {self.task_id} started. Batch Size: {BATCH_SIZE}")
279
+
280
+ while not _shutdown_requested:
281
+ # Update shared machine stats (no per-thread Redis write)
282
+ async with _stats_lock:
283
+ _machine_stats["threads"][self.task_id] = self.current_action
284
+ _machine_stats["total_done"] = sum(
285
+ t.leads_done for t in _all_worker_tasks
286
+ )
287
+ _machine_stats["total_failed"] = sum(
288
+ t.leads_failed for t in _all_worker_tasks
289
+ )
290
+
291
+ # 1. Lease Account from Ready Pool
292
+ email = await self.client.spop(ACCOUNT_POOL_READY)
293
+ if not email:
294
+ self.current_action = "Idle"
295
+ logger.debug(f"Task {self.task_id} waiting for accounts in '{ACCOUNT_POOL_READY}'...")
296
+ await asyncio.sleep(5)
297
+ continue
298
+
299
+ acc_key = f"account:{email}"
300
+ acc_pass = await self.client.hget(acc_key, "temp_pass")
301
+
302
+ # Fetch Leads
303
+ self.current_action = "Fetching Leads"
304
+ batch = await self._fetch_batch()
305
+ if not batch:
306
+ # No leads to process, put account back in ready and wait
307
+ await self.client.sadd(ACCOUNT_POOL_READY, email)
308
+ self.current_action = "Idle"
309
+ await asyncio.sleep(1)
310
+ continue
311
+
312
+ results = []
313
+ engine = AutomationEngine()
314
+
315
+ try:
316
+ # Set headless=False so the user can visually debug the Microsoft login flow
317
+ await engine.start(headless=True)
318
+
319
+ # 3. Login
320
+ self.current_action = f"Logging in: {email}"
321
+ login_success = await engine.login_microsoft(email, acc_pass)
322
+
323
+ for item in batch:
324
+ lead_data = item["data"]
325
+ lead_id = lead_data.get("id", str(uuid.uuid4()))
326
+ raw_data = item["_raw"]
327
+ lead_email = lead_data.get("email")
328
+
329
+ self.current_action = f"Processing: {lead_email}"
330
+
331
+ if not login_success:
332
+ # Fail entire batch if login fails
333
+ results.append({
334
+ "lead_id": lead_id,
335
+ "success": False,
336
+ "error": "login_failed",
337
+ "worker_id": self.task_id,
338
+ "row_number": lead_data.get("row_number")
339
+ })
340
+
341
+ # Requeue the raw_data
342
+ pipe = self.client.pipeline()
343
+ pipe.rpush(LEAD_QUEUE, raw_data)
344
+ pipe.lrem(PROCESSING_QUEUE, -1, raw_data)
345
+ await pipe.execute()
346
+ item["_skip"] = True
347
+ continue
348
+
349
+ try:
350
+ retries = int(await self.client.hget(RETRY_COUNT_KEY, lead_id) or 0)
351
+ start_time = time.time()
352
+
353
+ # Run Automation
354
+ success, err = await engine.process_lead(lead_email)
355
+ elapsed = (time.time() - start_time) * 1000
356
+
357
+ results.append({
358
+ "lead_id": lead_id,
359
+ "success": success,
360
+ "output": {"error": err} if not success else {},
361
+ "error": err if not success else None,
362
+ "processing_time_ms": elapsed,
363
+ "worker_id": self.task_id,
364
+ "row_number": lead_data.get("row_number")
365
+ })
366
+
367
+ if success:
368
+ self.leads_done += 1
369
+ logger.info(f"[+] Successfully sent lead to: {lead_email}")
370
+ else:
371
+ self.leads_failed += 1
372
+ logger.warning(f"[-] Failed to send lead to: {lead_email}. Reason: {err}")
373
+
374
+ if err == "rate_limit":
375
+ logger.warning("Rate limit hit, stopping batch")
376
+ break
377
+
378
+ except Exception as e:
379
+ logger.error(f"Error processing lead {lead_id}: {e}")
380
+ retries += 1
381
+ await self.client.hset(RETRY_COUNT_KEY, lead_id, retries)
382
+
383
+ if retries >= MAX_RETRIES:
384
+ results.append({
385
+ "lead_id": lead_id,
386
+ "success": False,
387
+ "error": str(e),
388
+ "worker_id": self.task_id,
389
+ "row_number": lead_data.get("row_number")
390
+ })
391
+ else:
392
+ pipe = self.client.pipeline()
393
+ pipe.rpush(LEAD_QUEUE, raw_data)
394
+ pipe.lrem(PROCESSING_QUEUE, -1, raw_data)
395
+ await pipe.execute()
396
+ item["_skip"] = True
397
+
398
+ except Exception as e:
399
+ logger.error(f"Engine exception: {e}")
400
+ finally:
401
+ await engine.stop()
402
+
403
+ # 4. Handle Account Routing (Cooldown / Done)
404
+ if not login_success:
405
+ await self.client.sadd("account_pool:pending_refresh", email)
406
+ logger.info(f"Account {email} login failed. Moved to pending_refresh.")
407
+ else:
408
+ # Increment usage
409
+ leads_sent = await self.client.hincrby(acc_key, "leads_sent_today", len(batch))
410
+
411
+ if leads_sent >= MAX_DAILY_LEADS:
412
+ await self.client.sadd(ACCOUNT_POOL_DONE, email)
413
+ logger.info(f"Account {email} hit daily limit ({leads_sent}/{MAX_DAILY_LEADS}). Moved to Done.")
414
+ else:
415
+ now = time.time()
416
+ await self.client.hset(acc_key, "cooldown_until", now + COOLDOWN_SECONDS)
417
+ await self.client.sadd(ACCOUNT_POOL_COOLDOWN, email)
418
+ logger.info(f"Account {email} finished batch ({leads_sent}/{MAX_DAILY_LEADS}). Moved to Cooldown for 1 hour.")
419
+
420
+ final_batch = [b for b in batch if not b.get("_skip")]
421
+ # Filter results for leads actually in final_batch
422
+ valid_ids = {b["data"].get("id") for b in final_batch}
423
+ final_results = [r for r in results if r["lead_id"] in valid_ids]
424
+
425
+ await self._cleanup_and_push_results(final_batch, final_results)
426
+
427
+ # Global list of all WorkerTask instances on this machine
428
+ _all_worker_tasks: list["WorkerTask"] = []
429
+
430
+ async def machine_heartbeat_loop():
431
+ """Single coroutine that sends ONE heartbeat for the whole machine every 5s."""
432
+ client = redis.Redis(connection_pool=get_redis_pool())
433
+ key = f"worker:machine:{MACHINE_ID}"
434
+ while not _shutdown_requested:
435
+ async with _stats_lock:
436
+ done = _machine_stats["total_done"]
437
+ failed = _machine_stats["total_failed"]
438
+ threads = dict(_machine_stats["threads"])
439
+
440
+ total = done + failed
441
+ success_rate = round((done / total * 100), 1) if total > 0 else 0
442
+ busy_count = sum(1 for a in threads.values() if a not in ("Idle", "Starting"))
443
+
444
+ payload = {
445
+ "id": MACHINE_ID,
446
+ "status": "busy" if busy_count > 0 else "online",
447
+ "active_threads": busy_count,
448
+ "max_threads": WORKER_TASKS,
449
+ "leads_done": done,
450
+ "leads_failed": failed,
451
+ "success_rate": success_rate,
452
+ "current_action": next(
453
+ (a for a in threads.values() if a not in ("Idle", "Starting")),
454
+ "Idle"
455
+ ),
456
+ "version": "2.0.0",
457
+ "last_seen": time.time()
458
+ }
459
+ await client.setex(key, 30, json.dumps(payload))
460
+ await asyncio.sleep(5)
461
+
462
+ async def main():
463
+ logger.info("="*50)
464
+ logger.info(f"Starting Async Playwright Worker [{MACHINE_ID}]. Tasks: {WORKER_TASKS}")
465
+ logger.info("="*50)
466
+
467
+ for i in range(WORKER_TASKS):
468
+ t = WorkerTask(f"worker-{uuid.uuid4().hex[:6]}")
469
+ _all_worker_tasks.append(t)
470
+
471
+ task_coroutines = [t.run() for t in _all_worker_tasks]
472
+ await asyncio.gather(machine_heartbeat_loop(), *task_coroutines)
473
+
474
+ if __name__ == "__main__":
475
+ try:
476
+ asyncio.run(main())
477
+ except KeyboardInterrupt:
478
+ logger.info("Worker node shutting down.")