Asmita2682hshs committed on
Commit
e986bc1
·
verified ·
1 Parent(s): 2fa9362

Upload 4 files

Browse files
Files changed (3) hide show
  1. Dockerfile +1 -1
  2. hf_runner.py +4 -18
  3. worker.py +231 -128
Dockerfile CHANGED
@@ -1,4 +1,4 @@
1
- FROM mcr.microsoft.com/playwright/python:v1.58.0-jammy
2
 
3
  WORKDIR /app
4
 
 
1
+ FROM mcr.microsoft.com/playwright/python:v1.40.0-jammy
2
 
3
  WORKDIR /app
4
 
hf_runner.py CHANGED
@@ -1,6 +1,4 @@
1
  import os
2
- import sys
3
- import time
4
  import subprocess
5
  import threading
6
  from flask import Flask
@@ -12,23 +10,11 @@ def home():
12
  return "Worker is Running! 🚀"
13
 
14
  def run_worker():
15
- """
16
- Worker ko background mein run karta hai.
17
- FIX #1: Auto-restart loop — agar worker crash ho toh 5s baad restart hoga
18
- FIX #2: stdout/stderr properly piped taaki HF logs mein dikhe
19
- """
20
- while True:
21
- print("[hf_runner] Starting worker.py...", flush=True)
22
- result = subprocess.run(
23
- ["python", "-u", "worker.py"], # -u = unbuffered output
24
- stdout=sys.stdout,
25
- stderr=sys.stderr
26
- )
27
- print(f"[hf_runner] Worker exited with code {result.returncode}. Restarting in 5s...", flush=True)
28
- time.sleep(5)
29
 
30
  if __name__ == "__main__":
31
- # Start worker in background thread with auto-restart
32
  threading.Thread(target=run_worker, daemon=True).start()
33
- # Start dummy Flask server for Hugging Face health check (port 7860 required)
34
  app.run(host="0.0.0.0", port=7860)
 
1
  import os
 
 
2
  import subprocess
3
  import threading
4
  from flask import Flask
 
10
  return "Worker is Running! 🚀"
11
 
12
  def run_worker():
13
+ print("Starting Worker...")
14
+ subprocess.run(["python", "worker.py"])
 
 
 
 
 
 
 
 
 
 
 
 
15
 
16
  if __name__ == "__main__":
17
+ # Start worker in background
18
  threading.Thread(target=run_worker, daemon=True).start()
19
+ # Start dummy server for Hugging Face health check
20
  app.run(host="0.0.0.0", port=7860)
worker.py CHANGED
@@ -29,25 +29,33 @@ import redis.asyncio as redis
29
  # Playwright imports
30
  from playwright.async_api import async_playwright, Page, Browser, BrowserContext
31
 
 
 
32
  # -----------------------------------------------------------------------------
33
  REDIS_HOST = os.getenv("REDIS_HOST", "13.235.87.209")
34
  REDIS_PORT = int(os.getenv("REDIS_PORT", "6379"))
35
  REDIS_PASSWORD = os.getenv("REDIS_PASSWORD", None)
36
 
37
- LEAD_QUEUE = "lead_queue"
38
- PROCESSING_QUEUE = "processing_queue"
39
- RESULT_QUEUE = "result_queue"
40
- ACCOUNT_POOL_READY = "account_pool:ready"
41
  ACCOUNT_POOL_COOLDOWN = "account_pool:cooldown"
42
- ACCOUNT_POOL_DONE = "account_pool:done_for_today"
43
- RETRY_COUNT_KEY = "retry_count"
44
-
45
- WORKER_TASKS = int(os.getenv("WORKER_THREADS", "5"))
46
- BATCH_SIZE = 10
47
- MAX_DAILY_LEADS = 40
48
- COOLDOWN_SECONDS = 3600 # 1 hour
49
- MAX_RETRIES = 3
50
- POLL_TIMEOUT = 5
 
 
 
 
 
 
51
 
52
  logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
53
  logger = logging.getLogger("worker")
@@ -57,48 +65,36 @@ _shutdown_requested = False
57
  # Machine identity (1 machine = 1 worker in dashboard)
58
  MACHINE_ID = socket.gethostname()
59
 
60
- # Shared state across all tasks on this machine
61
  _machine_stats = {
62
  "total_done": 0,
63
  "total_failed": 0,
64
- "threads": {}, # task_id -> current_action
65
  }
66
-
67
- # FIX #7: asyncio.Lock() module level par nahi, lazy init bhi nahi — main() ke andar bano
68
- _stats_lock: asyncio.Lock = None # type: ignore
69
 
70
  def _signal_handler(signum, frame):
71
  global _shutdown_requested
72
  logger.info("Shutdown signal received. Draining worker pool...")
73
  _shutdown_requested = True
74
 
75
- signal.signal(signal.SIGINT, _signal_handler)
76
  signal.signal(signal.SIGTERM, _signal_handler)
77
 
78
- # FIX #7: Redis pool eager init — module level par, race condition nahi hogi
79
- _redis_pool = redis.ConnectionPool(
80
- host=REDIS_HOST, port=REDIS_PORT, password=REDIS_PASSWORD,
81
- decode_responses=True, max_connections=50
82
- )
83
 
84
  def get_redis_pool():
 
 
 
 
 
 
85
  return _redis_pool
86
 
87
-
88
- # FIX #12: Redis startup connection check
89
- async def check_redis_connection():
90
- """Startup mein Redis reachability check karo"""
91
- client = redis.Redis(connection_pool=get_redis_pool())
92
- try:
93
- await client.ping()
94
- logger.info(f"✅ Redis connected: {REDIS_HOST}:{REDIS_PORT}")
95
- except Exception as e:
96
- logger.critical(f"❌ Cannot connect to Redis at {REDIS_HOST}:{REDIS_PORT} — {e}")
97
- sys.exit(1)
98
- finally:
99
- await client.aclose()
100
-
101
-
102
  class AutomationEngine:
103
  """Robust Playwright automation engine"""
104
 
@@ -126,7 +122,7 @@ class AutomationEngine:
126
  self.browser = await self.playwright.chromium.launch(**browser_options)
127
  self.context = await self.browser.new_context()
128
  self.page = await self.context.new_page()
129
- # Abort resource-heavy requests (images/fonts/media) for speed
130
  await self.page.route("**/*", lambda route: (
131
  route.abort() if route.request.resource_type in ["image", "font", "media"]
132
  else route.continue_()
@@ -169,6 +165,7 @@ class AutomationEngine:
169
  logger.error(f"Incorrect Temporary Access Pass for {email}")
170
  return False
171
 
 
172
  await asyncio.sleep(4)
173
 
174
  # Handling potential security prompts
@@ -185,7 +182,7 @@ class AutomationEngine:
185
  await self.page.wait_for_selector(selector, timeout=15000)
186
  await self.page.locator(selector).click()
187
 
188
- # Click the Email option
189
  await self.page.get_by_role("button", name="Email Receive a code to reset").click()
190
  await asyncio.sleep(2)
191
  except Exception as e:
@@ -233,44 +230,111 @@ class AutomationEngine:
233
  return False, str(e)
234
 
235
 
 
 
 
236
  class WorkerTask:
237
  def __init__(self, task_id: str):
238
- self.task_id = task_id
239
- self.client = redis.Redis(connection_pool=get_redis_pool())
240
- self.leads_done = 0
241
  self.leads_failed = 0
242
  self.current_action = "Starting"
243
 
244
- async def _fetch_batch(self) -> List[Dict[str, Any]]:
245
- """Fetch a batch of up to BATCH_SIZE leads from Redis."""
246
- leads = []
247
- try:
248
- # FIX #8: brpoplpush deprecated use blmove instead (Redis 6.2+)
249
- raw_lead = await self.client.blmove(
250
- LEAD_QUEUE, PROCESSING_QUEUE, "RIGHT", "LEFT", timeout=POLL_TIMEOUT
251
- )
252
- if not raw_lead:
253
- return leads
254
-
255
- leads.append({"_raw": raw_lead, "data": json.loads(raw_lead)})
 
 
 
 
 
 
256
 
257
- # Non-blocking fetch for the rest of the batch
258
- for _ in range(BATCH_SIZE - 1):
259
- raw = await self.client.lmove(LEAD_QUEUE, PROCESSING_QUEUE, "RIGHT", "LEFT")
260
- if not raw:
 
 
 
 
 
 
 
 
 
261
  break
262
- leads.append({"_raw": raw, "data": json.loads(raw)})
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
263
 
264
  except Exception as e:
265
- logger.error(f"Error fetching batch: {e}")
 
 
 
266
 
267
- return leads
268
 
269
- async def _cleanup_and_push_results(self, leads: List[Dict[str, Any]], results: List[Dict[str, Any]]):
270
  if not leads:
271
  return
272
- if len(leads) != len(results):
273
- logger.error(f"Mismatch: {len(leads)} leads but {len(results)} results — some leads may be stuck!")
274
  try:
275
  pipe = self.client.pipeline()
276
  for lead, result in zip(leads, results):
@@ -280,6 +344,13 @@ class WorkerTask:
280
  lead_id = lead["data"].get("id", str(uuid.uuid4()))
281
  if result["success"]:
282
  pipe.hdel(RETRY_COUNT_KEY, lead_id)
 
 
 
 
 
 
 
283
 
284
  await pipe.execute()
285
  except Exception as e:
@@ -292,10 +363,12 @@ class WorkerTask:
292
  # Update shared machine stats
293
  async with _stats_lock:
294
  _machine_stats["threads"][self.task_id] = self.current_action
295
- _machine_stats["total_done"] = sum(t.leads_done for t in _all_worker_tasks)
296
  _machine_stats["total_failed"] = sum(t.leads_failed for t in _all_worker_tasks)
297
 
298
- # 1. Lease Account from Ready Pool
 
 
299
  email = await self.client.spop(ACCOUNT_POOL_READY)
300
  if not email:
301
  self.current_action = "Idle"
@@ -303,65 +376,75 @@ class WorkerTask:
303
  await asyncio.sleep(5)
304
  continue
305
 
306
- acc_key = f"account:{email}"
307
  acc_pass = await self.client.hget(acc_key, "temp_pass")
308
 
309
- # 2. Fetch Leads
 
 
 
 
 
 
 
310
  self.current_action = "Fetching Leads"
311
- batch = await self._fetch_batch()
 
312
  if not batch:
313
- # No leads, return account to ready pool
314
  await self.client.sadd(ACCOUNT_POOL_READY, email)
315
  self.current_action = "Idle"
316
  await asyncio.sleep(1)
317
  continue
318
 
319
- results = []
320
- engine = AutomationEngine()
321
 
322
- # FIX #2: login_success default False PEHLE try block se — UnboundLocalError prevent
323
- login_success = False
324
 
325
  try:
326
  await engine.start(headless=True)
327
 
328
- # 3. Login
 
 
329
  self.current_action = f"Logging in: {email}"
330
  login_success = await engine.login_microsoft(email, acc_pass)
331
 
332
  for item in batch:
333
- lead_data = item["data"]
334
- lead_id = lead_data.get("id", str(uuid.uuid4()))
335
- raw_data = item["_raw"]
336
  lead_email = lead_data.get("email")
337
 
338
  self.current_action = f"Processing: {lead_email}"
339
 
340
  if not login_success:
341
- # Fail entire batch if login fails
 
 
 
 
 
 
342
  results.append({
343
  "lead_id": lead_id,
344
  "success": False,
345
  "error": "login_failed",
346
  "worker_id": self.task_id,
347
- "row_number": lead_data.get("row_number")
 
348
  })
349
-
350
- # Requeue the raw_data
351
- pipe = self.client.pipeline()
352
- pipe.rpush(LEAD_QUEUE, raw_data)
353
- pipe.lrem(PROCESSING_QUEUE, -1, raw_data)
354
- await pipe.execute()
355
  item["_skip"] = True
356
  continue
357
 
358
- # FIX #3: retries default 0 PEHLE — UnboundLocalError prevent
359
- retries = 0
 
360
  try:
361
- retries = int(await self.client.hget(RETRY_COUNT_KEY, lead_id) or 0)
362
  start_time = time.time()
363
 
364
- # Run Automation
365
  success, err = await engine.process_lead(lead_email)
366
  elapsed = (time.time() - start_time) * 1000
367
 
@@ -372,18 +455,20 @@ class WorkerTask:
372
  "error": err if not success else None,
373
  "processing_time_ms": elapsed,
374
  "worker_id": self.task_id,
375
- "row_number": lead_data.get("row_number")
 
376
  })
377
 
378
  if success:
379
  self.leads_done += 1
380
- logger.info(f"[+] Successfully sent lead to: {lead_email}")
381
  else:
382
  self.leads_failed += 1
383
  logger.warning(f"[-] Failed lead: {lead_email}. Reason: {err}")
384
 
385
  if err == "rate_limit":
386
- logger.warning("Rate limit hit, stopping batch")
 
387
  break
388
 
389
  except Exception as e:
@@ -397,12 +482,15 @@ class WorkerTask:
397
  "success": False,
398
  "error": str(e),
399
  "worker_id": self.task_id,
400
- "row_number": lead_data.get("row_number")
 
401
  })
402
  else:
 
403
  pipe = self.client.pipeline()
404
  pipe.rpush(LEAD_QUEUE, raw_data)
405
  pipe.lrem(PROCESSING_QUEUE, -1, raw_data)
 
406
  await pipe.execute()
407
  item["_skip"] = True
408
 
@@ -411,79 +499,94 @@ class WorkerTask:
411
  finally:
412
  await engine.stop()
413
 
414
- # 4. Handle Account Routing (Cooldown / Done / Pending Refresh)
 
 
415
  if not login_success:
416
  await self.client.sadd("account_pool:pending_refresh", email)
417
- logger.info(f"Account {email} login failed. Moved to pending_refresh.")
418
  else:
419
- # FIX #6: Count only actually processed leads (not skipped)
420
- actual_processed = len([b for b in batch if not b.get("_skip")])
421
- leads_sent = await self.client.hincrby(acc_key, "leads_sent_today", actual_processed)
 
 
 
422
 
423
  if leads_sent >= MAX_DAILY_LEADS:
424
  await self.client.sadd(ACCOUNT_POOL_DONE, email)
425
- logger.info(f"Account {email} hit daily limit ({leads_sent}/{MAX_DAILY_LEADS}). Moved to Done.")
426
  else:
427
  now = time.time()
428
  await self.client.hset(acc_key, "cooldown_until", now + COOLDOWN_SECONDS)
429
  await self.client.sadd(ACCOUNT_POOL_COOLDOWN, email)
430
- logger.info(f"Account {email} on Cooldown ({leads_sent}/{MAX_DAILY_LEADS}). 1 hour rest.")
 
 
 
431
 
432
- final_batch = [b for b in batch if not b.get("_skip")]
433
- valid_ids = {b["data"].get("id") for b in final_batch}
434
- final_results = [r for r in results if r["lead_id"] in valid_ids]
 
 
 
435
 
436
- await self._cleanup_and_push_results(final_batch, final_results)
437
 
438
 
 
439
  _all_worker_tasks: list["WorkerTask"] = []
440
 
441
 
442
  async def machine_heartbeat_loop():
443
- """Send ONE heartbeat per machine every 5s."""
444
  client = redis.Redis(connection_pool=get_redis_pool())
445
- key = f"worker:machine:{MACHINE_ID}"
 
446
  while not _shutdown_requested:
447
  async with _stats_lock:
448
- done = _machine_stats["total_done"]
449
- failed = _machine_stats["total_failed"]
450
  threads = dict(_machine_stats["threads"])
451
 
452
- total = done + failed
453
  success_rate = round((done / total * 100), 1) if total > 0 else 0
454
- busy_count = sum(1 for a in threads.values() if a not in ("Idle", "Starting"))
455
 
456
  payload = {
457
- "id": MACHINE_ID,
458
- "status": "busy" if busy_count > 0 else "online",
459
  "active_threads": busy_count,
460
- "max_threads": WORKER_TASKS,
461
- "leads_done": done,
462
- "leads_failed": failed,
463
- "success_rate": success_rate,
464
  "current_action": next(
465
  (a for a in threads.values() if a not in ("Idle", "Starting")),
466
  "Idle"
467
  ),
468
- "version": "2.1.0",
469
- "last_seen": time.time()
470
  }
471
  await client.setex(key, 30, json.dumps(payload))
472
  await asyncio.sleep(5)
473
 
474
 
475
  async def main():
476
- global _stats_lock
477
-
478
- # FIX #13: asyncio.Lock() event loop ke andar banao, module level par nahi
479
- _stats_lock = asyncio.Lock()
480
-
481
  logger.info("=" * 50)
482
  logger.info(f"Starting Async Playwright Worker [{MACHINE_ID}]. Tasks: {WORKER_TASKS}")
483
  logger.info("=" * 50)
484
 
485
- # FIX #12: Redis connection check at startup
486
- await check_redis_connection()
 
 
 
 
 
 
 
487
 
488
  for i in range(WORKER_TASKS):
489
  t = WorkerTask(f"worker-{uuid.uuid4().hex[:6]}")
 
29
  # Playwright imports
30
  from playwright.async_api import async_playwright, Page, Browser, BrowserContext
31
 
32
+ # -----------------------------------------------------------------------------
33
+ # Configuration
34
  # -----------------------------------------------------------------------------
35
  REDIS_HOST = os.getenv("REDIS_HOST", "13.235.87.209")
36
  REDIS_PORT = int(os.getenv("REDIS_PORT", "6379"))
37
  REDIS_PASSWORD = os.getenv("REDIS_PASSWORD", None)
38
 
39
+ LEAD_QUEUE = "lead_queue"
40
+ PROCESSING_QUEUE = "processing_queue"
41
+ RESULT_QUEUE = "result_queue"
42
+ ACCOUNT_POOL_READY = "account_pool:ready"
43
  ACCOUNT_POOL_COOLDOWN = "account_pool:cooldown"
44
+ ACCOUNT_POOL_DONE = "account_pool:done_for_today"
45
+ RETRY_COUNT_KEY = "retry_count"
46
+
47
+ # Key for a SET of lead emails handled today. NOTE: in the code below, the
48
+ # deduplication guard is actually the per-account set "used:<account_email>":
49
+ # a lead email is added there before processing and removed again if it fails
50
+ # with retries remaining or is re-queued. SENT_LEADS_SET only gets a TTL in main().
51
+ SENT_LEADS_SET = "leads:sent_today"
52
+
53
+ WORKER_TASKS = int(os.getenv("WORKER_THREADS", "5"))
54
+ BATCH_SIZE = 10 # leads per account session
55
+ MAX_DAILY_LEADS = 40 # max leads per account per day (4 batches × 10)
56
+ COOLDOWN_SECONDS = 3600 # 1 hour between batches
57
+ MAX_RETRIES = 3
58
+ POLL_TIMEOUT = 5 # seconds to block on BRPOPLPUSH
59
 
60
  logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
61
  logger = logging.getLogger("worker")
 
65
  # Machine identity (1 machine = 1 worker in dashboard)
66
  MACHINE_ID = socket.gethostname()
67
 
68
+ # Shared stats across all coroutines on this machine
69
  _machine_stats = {
70
  "total_done": 0,
71
  "total_failed": 0,
72
+ "threads": {}, # task_id -> current_action
73
  }
74
+ _stats_lock = asyncio.Lock()
 
 
75
 
76
  def _signal_handler(signum, frame):
77
  global _shutdown_requested
78
  logger.info("Shutdown signal received. Draining worker pool...")
79
  _shutdown_requested = True
80
 
81
+ signal.signal(signal.SIGINT, _signal_handler)
82
  signal.signal(signal.SIGTERM, _signal_handler)
83
 
84
+ _redis_pool = None
 
 
 
 
85
 
86
  def get_redis_pool():
87
+ global _redis_pool
88
+ if _redis_pool is None:
89
+ _redis_pool = redis.ConnectionPool(
90
+ host=REDIS_HOST, port=REDIS_PORT, password=REDIS_PASSWORD,
91
+ decode_responses=True, max_connections=50
92
+ )
93
  return _redis_pool
94
 
95
+ # -----------------------------------------------------------------------------
96
+ # Automation Engine
97
+ # -----------------------------------------------------------------------------
 
 
 
 
 
 
 
 
 
 
 
 
98
  class AutomationEngine:
99
  """Robust Playwright automation engine"""
100
 
 
122
  self.browser = await self.playwright.chromium.launch(**browser_options)
123
  self.context = await self.browser.new_context()
124
  self.page = await self.context.new_page()
125
+ # Abort resource-heavy requests
126
  await self.page.route("**/*", lambda route: (
127
  route.abort() if route.request.resource_type in ["image", "font", "media"]
128
  else route.continue_()
 
165
  logger.error(f"Incorrect Temporary Access Pass for {email}")
166
  return False
167
 
168
+ # Wait for "Keep your account secure" or "Authenticator" page to load
169
  await asyncio.sleep(4)
170
 
171
  # Handling potential security prompts
 
182
  await self.page.wait_for_selector(selector, timeout=15000)
183
  await self.page.locator(selector).click()
184
 
185
+ # Now click the Email option
186
  await self.page.get_by_role("button", name="Email Receive a code to reset").click()
187
  await asyncio.sleep(2)
188
  except Exception as e:
 
230
  return False, str(e)
231
 
232
 
233
+ # -----------------------------------------------------------------------------
234
+ # Worker Async Task Logic
235
+ # -----------------------------------------------------------------------------
236
  class WorkerTask:
237
  def __init__(self, task_id: str):
238
+ self.task_id = task_id
239
+ self.client = redis.Redis(connection_pool=get_redis_pool())
240
+ self.leads_done = 0
241
  self.leads_failed = 0
242
  self.current_action = "Starting"
243
 
244
+ # ------------------------------------------------------------------
245
+ # Fetch up to BATCH_SIZE UNIQUE leads from the queue.
246
+ # Uses a per-account Redis SET ("used:<account_email>") to skip leads this
247
+ # account has already claimed or sent. Each lead that passes the dedup check
248
+ # is claimed via SADD, whose return value shows whether the claim was new.
249
+ # ------------------------------------------------------------------
250
+ async def _fetch_unique_batch(self, account_email: str) -> List[Dict[str, Any]]:
251
+ """
252
+ Pop up to BATCH_SIZE × 3 items from the queue, de-duplicate against the
253
+ per-account set "used:<account_email>" and claim them. Returns at most BATCH_SIZE leads.
254
+
255
+ Claimed leads are added to that set immediately so the same account
256
+ cannot pick them up again. Skipped duplicates are discarded (not
257
+ re-queued — they were already processed or in-flight).
258
+ """
259
+ unique_leads = []
260
+ seen_emails = set() # within this fetch call
261
+ skipped = 0
262
 
263
+ try:
264
+ # Block for the FIRST item
265
+ raw = await self.client.brpoplpush(LEAD_QUEUE, PROCESSING_QUEUE, timeout=POLL_TIMEOUT)
266
+ if not raw:
267
+ return []
268
+
269
+ candidates = [raw]
270
+
271
+ # Non-blocking fetch of extra candidates so we can fill a full batch
272
+ # even after dedup removes some items.
273
+ for _ in range(BATCH_SIZE * 3 - 1):
274
+ extra = await self.client.rpoplpush(LEAD_QUEUE, PROCESSING_QUEUE)
275
+ if not extra:
276
  break
277
+ candidates.append(extra)
278
+
279
+ for raw_lead in candidates:
280
+ if len(unique_leads) >= BATCH_SIZE:
281
+ # Put unused extras back to the front of lead_queue
282
+ await self.client.rpush(LEAD_QUEUE, raw_lead)
283
+ await self.client.lrem(PROCESSING_QUEUE, -1, raw_lead)
284
+ continue
285
+
286
+ try:
287
+ data = json.loads(raw_lead)
288
+ except Exception:
289
+ # Malformed JSON — discard
290
+ await self.client.lrem(PROCESSING_QUEUE, -1, raw_lead)
291
+ continue
292
+
293
+ lead_email = data.get("email", "").strip().lower()
294
+ if not lead_email:
295
+ await self.client.lrem(PROCESSING_QUEUE, -1, raw_lead)
296
+ continue
297
+
298
+ # --- Deduplication ---
299
+ # 1. Already processed in this fetch call?
300
+ if lead_email in seen_emails:
301
+ skipped += 1
302
+ await self.client.lrem(PROCESSING_QUEUE, -1, raw_lead)
303
+ continue
304
+
305
+ used_key = f"used:{account_email}"
306
+
307
+ # 2. Already sent by this account?
308
+ already_sent = await self.client.sismember(used_key, lead_email)
309
+ if already_sent:
310
+ skipped += 1
311
+ logger.debug(f"Skipping lead {lead_email} - already used by {account_email}")
312
+ await self.client.lrem(PROCESSING_QUEUE, -1, raw_lead)
313
+ continue
314
+
315
+ # Claim this lead for this account
316
+ claimed = await self.client.sadd(used_key, lead_email)
317
+ if not claimed:
318
+ skipped += 1
319
+ await self.client.lrem(PROCESSING_QUEUE, -1, raw_lead)
320
+ continue
321
+
322
+ # Normalize email stored in data for consistency
323
+ data["email"] = lead_email
324
+ seen_emails.add(lead_email)
325
+ unique_leads.append({"_raw": raw_lead, "data": data})
326
 
327
  except Exception as e:
328
+ logger.error(f"Error fetching unique batch: {e}")
329
+
330
+ if skipped:
331
+ logger.info(f"[Dedup] Skipped {skipped} duplicate leads in this batch fetch.")
332
 
333
+ return unique_leads
334
 
335
+ async def _cleanup_and_push_results(self, leads: List[Dict[str, Any]], results: List[Dict[str, Any]], account_email: str):
336
  if not leads:
337
  return
 
 
338
  try:
339
  pipe = self.client.pipeline()
340
  for lead, result in zip(leads, results):
 
344
  lead_id = lead["data"].get("id", str(uuid.uuid4()))
345
  if result["success"]:
346
  pipe.hdel(RETRY_COUNT_KEY, lead_id)
347
+ else:
348
+ # On failure, un-claim from the per-account "used:<account_email>" set so it can be
349
+ # retried next time (unless max retries exceeded).
350
+ retries = int(result.get("_retries", 0))
351
+ if retries < MAX_RETRIES:
352
+ lead_email = lead["data"].get("email", "")
353
+ pipe.srem(f"used:{account_email}", lead_email)
354
 
355
  await pipe.execute()
356
  except Exception as e:
 
363
  # Update shared machine stats
364
  async with _stats_lock:
365
  _machine_stats["threads"][self.task_id] = self.current_action
366
+ _machine_stats["total_done"] = sum(t.leads_done for t in _all_worker_tasks)
367
  _machine_stats["total_failed"] = sum(t.leads_failed for t in _all_worker_tasks)
368
 
369
+ # ----------------------------------------------------------
370
+ # 1. Lease ONE account from the Ready pool (atomic SPOP)
371
+ # ----------------------------------------------------------
372
  email = await self.client.spop(ACCOUNT_POOL_READY)
373
  if not email:
374
  self.current_action = "Idle"
 
376
  await asyncio.sleep(5)
377
  continue
378
 
379
+ acc_key = f"account:{email}"
380
  acc_pass = await self.client.hget(acc_key, "temp_pass")
381
 
382
+ if not acc_pass:
383
+ logger.warning(f"Account {email} has no temp_pass. Moving to pending_refresh.")
384
+ await self.client.sadd("account_pool:pending_refresh", email)
385
+ continue
386
+
387
+ # ----------------------------------------------------------
388
+ # 2. Fetch up to 10 UNIQUE leads
389
+ # ----------------------------------------------------------
390
  self.current_action = "Fetching Leads"
391
+ batch = await self._fetch_unique_batch(email)
392
+
393
  if not batch:
394
+ # No leads: return account to ready pool and wait
395
  await self.client.sadd(ACCOUNT_POOL_READY, email)
396
  self.current_action = "Idle"
397
  await asyncio.sleep(1)
398
  continue
399
 
400
+ logger.info(f"Task {self.task_id}: Account={email}, Leads in batch={len(batch)}")
 
401
 
402
+ results = []
403
+ engine = AutomationEngine()
404
 
405
  try:
406
  await engine.start(headless=True)
407
 
408
+ # ----------------------------------------------------------
409
+ # 3. Login once per account session
410
+ # ----------------------------------------------------------
411
  self.current_action = f"Logging in: {email}"
412
  login_success = await engine.login_microsoft(email, acc_pass)
413
 
414
  for item in batch:
415
+ lead_data = item["data"]
416
+ lead_id = lead_data.get("id", str(uuid.uuid4()))
417
+ raw_data = item["_raw"]
418
  lead_email = lead_data.get("email")
419
 
420
  self.current_action = f"Processing: {lead_email}"
421
 
422
  if not login_success:
423
+ # Login failed: requeue the lead and un-claim it from the account's used set
424
+ pipe = self.client.pipeline()
425
+ pipe.rpush(LEAD_QUEUE, raw_data)
426
+ pipe.lrem(PROCESSING_QUEUE, -1, raw_data)
427
+ pipe.srem(f"used:{email}", lead_email) # ← un-claim so it can be retried
428
+ await pipe.execute()
429
+
430
  results.append({
431
  "lead_id": lead_id,
432
  "success": False,
433
  "error": "login_failed",
434
  "worker_id": self.task_id,
435
+ "row_number": lead_data.get("row_number"),
436
+ "_skip": True,
437
  })
 
 
 
 
 
 
438
  item["_skip"] = True
439
  continue
440
 
441
+ # --------------------------------------------------
442
+ # 4. Process each lead
443
+ # --------------------------------------------------
444
  try:
445
+ retries = int(await self.client.hget(RETRY_COUNT_KEY, lead_id) or 0)
446
  start_time = time.time()
447
 
 
448
  success, err = await engine.process_lead(lead_email)
449
  elapsed = (time.time() - start_time) * 1000
450
 
 
455
  "error": err if not success else None,
456
  "processing_time_ms": elapsed,
457
  "worker_id": self.task_id,
458
+ "row_number": lead_data.get("row_number"),
459
+ "_retries": retries,
460
  })
461
 
462
  if success:
463
  self.leads_done += 1
464
+ logger.info(f"[+] Sent lead: {lead_email} via {email}")
465
  else:
466
  self.leads_failed += 1
467
  logger.warning(f"[-] Failed lead: {lead_email}. Reason: {err}")
468
 
469
  if err == "rate_limit":
470
+ logger.warning("Rate limit hit stopping batch early.")
471
+ # NOTE: remaining batch items that weren't processed are not un-claimed here; the loop just stops.
472
  break
473
 
474
  except Exception as e:
 
482
  "success": False,
483
  "error": str(e),
484
  "worker_id": self.task_id,
485
+ "row_number": lead_data.get("row_number"),
486
+ "_retries": retries,
487
  })
488
  else:
489
+ # Requeue and un-claim
490
  pipe = self.client.pipeline()
491
  pipe.rpush(LEAD_QUEUE, raw_data)
492
  pipe.lrem(PROCESSING_QUEUE, -1, raw_data)
493
+ pipe.srem(f"used:{email}", lead_email)
494
  await pipe.execute()
495
  item["_skip"] = True
496
 
 
499
  finally:
500
  await engine.stop()
501
 
502
+ # ----------------------------------------------------------
503
+ # 5. Account routing after batch
504
+ # ----------------------------------------------------------
505
  if not login_success:
506
  await self.client.sadd("account_pool:pending_refresh", email)
507
+ logger.info(f"Account {email} login failed pending_refresh.")
508
  else:
509
+ # Count only leads that were sent successfully (skipped/requeued and failed leads are excluded)
510
+ actually_sent = sum(
511
+ 1 for r in results
512
+ if not r.get("_skip") and r["success"]
513
+ )
514
+ leads_sent = await self.client.hincrby(acc_key, "leads_sent_today", actually_sent)
515
 
516
  if leads_sent >= MAX_DAILY_LEADS:
517
  await self.client.sadd(ACCOUNT_POOL_DONE, email)
518
+ logger.info(f"Account {email} hit daily limit ({leads_sent}/{MAX_DAILY_LEADS}) Done.")
519
  else:
520
  now = time.time()
521
  await self.client.hset(acc_key, "cooldown_until", now + COOLDOWN_SECONDS)
522
  await self.client.sadd(ACCOUNT_POOL_COOLDOWN, email)
523
+ logger.info(
524
+ f"Account {email} batch done ({leads_sent}/{MAX_DAILY_LEADS}) "
525
+ f"→ Cooldown 1hr. Next batch in ~1 hour."
526
+ )
527
 
528
+ # ----------------------------------------------------------
529
+ # 6. Push results to result_queue and clean processing_queue
530
+ # ----------------------------------------------------------
531
+ final_batch = [b for b in batch if not b.get("_skip")]
532
+ valid_ids = {b["data"].get("id") for b in final_batch}
533
+ final_results = [r for r in results if r.get("lead_id") in valid_ids and not r.get("_skip")]
534
 
535
+ await self._cleanup_and_push_results(final_batch, final_results, email)
536
 
537
 
538
+ # Global list of all WorkerTask instances on this machine
539
  _all_worker_tasks: list["WorkerTask"] = []
540
 
541
 
542
  async def machine_heartbeat_loop():
543
+ """Sends ONE heartbeat key for the whole machine every 5s."""
544
  client = redis.Redis(connection_pool=get_redis_pool())
545
+ key = f"worker:machine:{MACHINE_ID}"
546
+
547
  while not _shutdown_requested:
548
  async with _stats_lock:
549
+ done = _machine_stats["total_done"]
550
+ failed = _machine_stats["total_failed"]
551
  threads = dict(_machine_stats["threads"])
552
 
553
+ total = done + failed
554
  success_rate = round((done / total * 100), 1) if total > 0 else 0
555
+ busy_count = sum(1 for a in threads.values() if a not in ("Idle", "Starting"))
556
 
557
  payload = {
558
+ "id": MACHINE_ID,
559
+ "status": "busy" if busy_count > 0 else "online",
560
  "active_threads": busy_count,
561
+ "max_threads": WORKER_TASKS,
562
+ "leads_done": done,
563
+ "leads_failed": failed,
564
+ "success_rate": success_rate,
565
  "current_action": next(
566
  (a for a in threads.values() if a not in ("Idle", "Starting")),
567
  "Idle"
568
  ),
569
+ "version": "3.0.0",
570
+ "last_seen": time.time(),
571
  }
572
  await client.setex(key, 30, json.dumps(payload))
573
  await asyncio.sleep(5)
574
 
575
 
576
  async def main():
 
 
 
 
 
577
  logger.info("=" * 50)
578
  logger.info(f"Starting Async Playwright Worker [{MACHINE_ID}]. Tasks: {WORKER_TASKS}")
579
  logger.info("=" * 50)
580
 
581
+ # Set SENT_LEADS_SET to expire at end of day so it auto-resets
582
+ # (The account_manager midnight reset also handles this via leads_sent_today)
583
+ client = redis.Redis(connection_pool=get_redis_pool())
584
+ import datetime
585
+ now = datetime.datetime.now(datetime.timezone.utc)
586
+ tomorrow = now.replace(hour=0, minute=0, second=0, microsecond=0) + datetime.timedelta(days=1)
587
+ ttl = int((tomorrow - now).total_seconds())
588
+ await client.expire(SENT_LEADS_SET, ttl)
589
+ logger.info(f"SENT_LEADS_SET TTL set to {ttl}s (expires at UTC midnight).")
590
 
591
  for i in range(WORKER_TASKS):
592
  t = WorkerTask(f"worker-{uuid.uuid4().hex[:6]}")