Asmita2682hshs committed on
Commit
2fa9362
·
verified ·
1 Parent(s): 294e0d7

Upload 5 files

Browse files
Files changed (2) hide show
  1. hf_runner.py +18 -4
  2. worker.py +118 -96
hf_runner.py CHANGED
@@ -1,4 +1,6 @@
1
  import os
 
 
2
  import subprocess
3
  import threading
4
  from flask import Flask
@@ -10,11 +12,23 @@ def home():
10
  return "Worker is Running! 🚀"
11
 
12
  def run_worker():
13
- print("Starting Worker...")
14
- subprocess.run(["python", "worker.py"])
 
 
 
 
 
 
 
 
 
 
 
 
15
 
16
  if __name__ == "__main__":
17
- # Start worker in background
18
  threading.Thread(target=run_worker, daemon=True).start()
19
- # Start dummy server for Hugging Face health check
20
  app.run(host="0.0.0.0", port=7860)
 
1
  import os
2
+ import sys
3
+ import time
4
  import subprocess
5
  import threading
6
  from flask import Flask
 
12
  return "Worker is Running! 🚀"
13
 
14
  def run_worker():
15
+ """
16
+ Worker ko background mein run karta hai.
17
+ FIX #1: Auto-restart loop — agar worker crash ho toh 5s baad restart hoga
18
+ FIX #2: stdout/stderr properly piped taaki HF logs mein dikhe
19
+ """
20
+ while True:
21
+ print("[hf_runner] Starting worker.py...", flush=True)
22
+ result = subprocess.run(
23
+ ["python", "-u", "worker.py"], # -u = unbuffered output
24
+ stdout=sys.stdout,
25
+ stderr=sys.stderr
26
+ )
27
+ print(f"[hf_runner] Worker exited with code {result.returncode}. Restarting in 5s...", flush=True)
28
+ time.sleep(5)
29
 
30
  if __name__ == "__main__":
31
+ # Start worker in background thread with auto-restart
32
  threading.Thread(target=run_worker, daemon=True).start()
33
+ # Start dummy Flask server for Hugging Face health check (port 7860 required)
34
  app.run(host="0.0.0.0", port=7860)
worker.py CHANGED
@@ -29,8 +29,6 @@ import redis.asyncio as redis
29
  # Playwright imports
30
  from playwright.async_api import async_playwright, Page, Browser, BrowserContext
31
 
32
- # -----------------------------------------------------------------------------
33
- # Configuration
34
  # -----------------------------------------------------------------------------
35
  REDIS_HOST = os.getenv("REDIS_HOST", "13.235.87.209")
36
  REDIS_PORT = int(os.getenv("REDIS_PORT", "6379"))
@@ -47,7 +45,7 @@ RETRY_COUNT_KEY = "retry_count"
47
  WORKER_TASKS = int(os.getenv("WORKER_THREADS", "5"))
48
  BATCH_SIZE = 10
49
  MAX_DAILY_LEADS = 40
50
- COOLDOWN_SECONDS = 3600 # 1 hour
51
  MAX_RETRIES = 3
52
  POLL_TIMEOUT = 5
53
 
@@ -59,36 +57,48 @@ _shutdown_requested = False
59
  # Machine identity (1 machine = 1 worker in dashboard)
60
  MACHINE_ID = socket.gethostname()
61
 
62
- # Shared state across all threads on this machine
63
  _machine_stats = {
64
  "total_done": 0,
65
  "total_failed": 0,
66
  "threads": {}, # task_id -> current_action
67
  }
68
- _stats_lock = asyncio.Lock()
 
 
69
 
70
  def _signal_handler(signum, frame):
71
  global _shutdown_requested
72
- logger.info(f"Shutdown signal received. Draining worker pool...")
73
  _shutdown_requested = True
74
 
75
  signal.signal(signal.SIGINT, _signal_handler)
76
  signal.signal(signal.SIGTERM, _signal_handler)
77
 
78
- _redis_pool = None
 
 
 
 
79
 
80
  def get_redis_pool():
81
- global _redis_pool
82
- if _redis_pool is None:
83
- _redis_pool = redis.ConnectionPool(
84
- host=REDIS_HOST, port=REDIS_PORT, password=REDIS_PASSWORD,
85
- decode_responses=True, max_connections=50
86
- )
87
  return _redis_pool
88
 
89
- # -----------------------------------------------------------------------------
90
- # Automation Engine
91
- # -----------------------------------------------------------------------------
 
 
 
 
 
 
 
 
 
 
 
 
92
  class AutomationEngine:
93
  """Robust Playwright automation engine"""
94
 
@@ -116,7 +126,7 @@ class AutomationEngine:
116
  self.browser = await self.playwright.chromium.launch(**browser_options)
117
  self.context = await self.browser.new_context()
118
  self.page = await self.context.new_page()
119
- # Abort resource-heavy requests
120
  await self.page.route("**/*", lambda route: (
121
  route.abort() if route.request.resource_type in ["image", "font", "media"]
122
  else route.continue_()
@@ -124,7 +134,7 @@ class AutomationEngine:
124
 
125
  async def stop(self):
126
  if self.context:
127
- await self.context.close() # 🔥 important
128
  if self.browser:
129
  await self.browser.close()
130
  if self.playwright:
@@ -143,47 +153,45 @@ class AutomationEngine:
143
  "response_mode=form_post"
144
  )
145
  await self.page.goto(login_url, timeout=60000)
146
-
147
  # Email step
148
  await self.page.get_by_role("textbox", name="Enter your email, phone, or").fill(email)
149
  await self.page.get_by_role("button", name="Next").click()
150
  await asyncio.sleep(2)
151
-
152
  # TAP step
153
  tap_field = self.page.get_by_role("textbox", name="Temporary Access Pass")
154
  await tap_field.fill(temp_pass)
155
  await self.page.get_by_role("button", name="Sign in").click()
156
-
157
  # Check for incorrect TAP immediately
158
  if await self.page.locator('text="Your Temporary Access Pass is incorrect."').is_visible(timeout=5000):
159
  logger.error(f"Incorrect Temporary Access Pass for {email}")
160
  return False
161
-
162
- # Wait a bit longer for the "Keep your account secure" or "Authenticator" page to load
163
  await asyncio.sleep(4)
164
-
165
  # Handling potential security prompts
166
  if await self.page.get_by_role("heading", name="Let's keep your account secure").is_visible(timeout=5000):
167
  await self.page.get_by_role("button", name="Next").click()
168
  await asyncio.sleep(2)
169
-
170
- # Choose different method -> Email (using the button from your HTML snippet)
171
  try:
172
- # Wait for the page to settle
173
  await asyncio.sleep(2)
174
  await self.page.wait_for_load_state("domcontentloaded", timeout=20000)
175
-
176
  selector = 'button[data-testid="choose-different-method-link"]:has-text("Set up a different way to sign in")'
177
  await self.page.wait_for_selector(selector, timeout=15000)
178
  await self.page.locator(selector).click()
179
-
180
- # Now click the Email option
181
  await self.page.get_by_role("button", name="Email Receive a code to reset").click()
182
  await asyncio.sleep(2)
183
  except Exception as e:
184
  logger.warning(f"Could not switch to Email method: {e}")
185
  return False
186
-
187
  return True
188
  except Exception as e:
189
  logger.error(f"Login failed for {email}: {e}")
@@ -211,22 +219,20 @@ class AutomationEngine:
211
 
212
  if await self.page.locator('[data-testid="message-bar-error"]').is_visible():
213
  text = await self.page.locator('[data-testid="message-bar-error"]').inner_text()
214
- if "too many" in text.lower(): return False, "rate_limit"
 
215
  return False, text
216
 
217
  if await self.page.locator('[data-testid="email-verify-challenge-otp-input"]').is_visible():
218
  await self.page.locator('[data-testid="backButton"]').click()
219
  logger.info(f"OTP Flow Success: {email}")
220
  return True, None
221
-
222
  return False, "otp_not_found"
223
  except Exception as e:
224
  return False, str(e)
225
 
226
 
227
- # -----------------------------------------------------------------------------
228
- # Worker Async Task Logic
229
- # -----------------------------------------------------------------------------
230
  class WorkerTask:
231
  def __init__(self, task_id: str):
232
  self.task_id = task_id
@@ -234,60 +240,61 @@ class WorkerTask:
234
  self.leads_done = 0
235
  self.leads_failed = 0
236
  self.current_action = "Starting"
237
-
238
  async def _fetch_batch(self) -> List[Dict[str, Any]]:
239
- """Fetch a batch of up to BATCH_SIZE using BRPOPLPUSH for the first, then RPOPLPUSH."""
240
  leads = []
241
  try:
242
- # Block for the first item
243
- raw_lead = await self.client.brpoplpush(LEAD_QUEUE, PROCESSING_QUEUE, timeout=POLL_TIMEOUT)
 
 
244
  if not raw_lead:
245
  return leads
246
-
247
  leads.append({"_raw": raw_lead, "data": json.loads(raw_lead)})
248
-
249
  # Non-blocking fetch for the rest of the batch
250
  for _ in range(BATCH_SIZE - 1):
251
- raw = await self.client.rpoplpush(LEAD_QUEUE, PROCESSING_QUEUE)
252
  if not raw:
253
  break
254
  leads.append({"_raw": raw, "data": json.loads(raw)})
255
-
256
  except Exception as e:
257
  logger.error(f"Error fetching batch: {e}")
258
-
259
  return leads
260
 
261
  async def _cleanup_and_push_results(self, leads: List[Dict[str, Any]], results: List[Dict[str, Any]]):
262
- if not leads: return
 
 
 
263
  try:
264
  pipe = self.client.pipeline()
265
  for lead, result in zip(leads, results):
266
  pipe.lpush(RESULT_QUEUE, json.dumps(result))
267
  pipe.lrem(PROCESSING_QUEUE, -1, lead["_raw"])
268
-
269
  lead_id = lead["data"].get("id", str(uuid.uuid4()))
270
  if result["success"]:
271
  pipe.hdel(RETRY_COUNT_KEY, lead_id)
272
-
273
  await pipe.execute()
274
  except Exception as e:
275
  logger.error(f"Error in cleanup and push: {e}")
276
 
277
  async def run(self):
278
  logger.info(f"Task {self.task_id} started. Batch Size: {BATCH_SIZE}")
279
-
280
  while not _shutdown_requested:
281
- # Update shared machine stats (no per-thread Redis write)
282
  async with _stats_lock:
283
  _machine_stats["threads"][self.task_id] = self.current_action
284
- _machine_stats["total_done"] = sum(
285
- t.leads_done for t in _all_worker_tasks
286
- )
287
- _machine_stats["total_failed"] = sum(
288
- t.leads_failed for t in _all_worker_tasks
289
- )
290
-
291
  # 1. Lease Account from Ready Pool
292
  email = await self.client.spop(ACCOUNT_POOL_READY)
293
  if not email:
@@ -295,39 +302,41 @@ class WorkerTask:
295
  logger.debug(f"Task {self.task_id} waiting for accounts in '{ACCOUNT_POOL_READY}'...")
296
  await asyncio.sleep(5)
297
  continue
298
-
299
  acc_key = f"account:{email}"
300
  acc_pass = await self.client.hget(acc_key, "temp_pass")
301
-
302
- # Fetch Leads
303
  self.current_action = "Fetching Leads"
304
  batch = await self._fetch_batch()
305
  if not batch:
306
- # No leads to process, put account back in ready and wait
307
  await self.client.sadd(ACCOUNT_POOL_READY, email)
308
  self.current_action = "Idle"
309
  await asyncio.sleep(1)
310
  continue
311
-
312
  results = []
313
  engine = AutomationEngine()
314
-
 
 
 
315
  try:
316
- # Set headless=False so the user can visually debug the Microsoft login flow
317
  await engine.start(headless=True)
318
-
319
  # 3. Login
320
  self.current_action = f"Logging in: {email}"
321
  login_success = await engine.login_microsoft(email, acc_pass)
322
-
323
  for item in batch:
324
  lead_data = item["data"]
325
  lead_id = lead_data.get("id", str(uuid.uuid4()))
326
  raw_data = item["_raw"]
327
  lead_email = lead_data.get("email")
328
-
329
  self.current_action = f"Processing: {lead_email}"
330
-
331
  if not login_success:
332
  # Fail entire batch if login fails
333
  results.append({
@@ -337,7 +346,7 @@ class WorkerTask:
337
  "worker_id": self.task_id,
338
  "row_number": lead_data.get("row_number")
339
  })
340
-
341
  # Requeue the raw_data
342
  pipe = self.client.pipeline()
343
  pipe.rpush(LEAD_QUEUE, raw_data)
@@ -345,15 +354,17 @@ class WorkerTask:
345
  await pipe.execute()
346
  item["_skip"] = True
347
  continue
348
-
 
 
349
  try:
350
  retries = int(await self.client.hget(RETRY_COUNT_KEY, lead_id) or 0)
351
  start_time = time.time()
352
-
353
  # Run Automation
354
  success, err = await engine.process_lead(lead_email)
355
  elapsed = (time.time() - start_time) * 1000
356
-
357
  results.append({
358
  "lead_id": lead_id,
359
  "success": success,
@@ -363,23 +374,23 @@ class WorkerTask:
363
  "worker_id": self.task_id,
364
  "row_number": lead_data.get("row_number")
365
  })
366
-
367
  if success:
368
  self.leads_done += 1
369
  logger.info(f"[+] Successfully sent lead to: {lead_email}")
370
  else:
371
  self.leads_failed += 1
372
- logger.warning(f"[-] Failed to send lead to: {lead_email}. Reason: {err}")
373
-
374
  if err == "rate_limit":
375
  logger.warning("Rate limit hit, stopping batch")
376
  break
377
-
378
  except Exception as e:
379
  logger.error(f"Error processing lead {lead_id}: {e}")
380
  retries += 1
381
  await self.client.hset(RETRY_COUNT_KEY, lead_id, retries)
382
-
383
  if retries >= MAX_RETRIES:
384
  results.append({
385
  "lead_id": lead_id,
@@ -394,20 +405,21 @@ class WorkerTask:
394
  pipe.lrem(PROCESSING_QUEUE, -1, raw_data)
395
  await pipe.execute()
396
  item["_skip"] = True
397
-
398
  except Exception as e:
399
  logger.error(f"Engine exception: {e}")
400
  finally:
401
  await engine.stop()
402
-
403
- # 4. Handle Account Routing (Cooldown / Done)
404
  if not login_success:
405
  await self.client.sadd("account_pool:pending_refresh", email)
406
  logger.info(f"Account {email} login failed. Moved to pending_refresh.")
407
  else:
408
- # Increment usage
409
- leads_sent = await self.client.hincrby(acc_key, "leads_sent_today", len(batch))
410
-
 
411
  if leads_sent >= MAX_DAILY_LEADS:
412
  await self.client.sadd(ACCOUNT_POOL_DONE, email)
413
  logger.info(f"Account {email} hit daily limit ({leads_sent}/{MAX_DAILY_LEADS}). Moved to Done.")
@@ -415,20 +427,20 @@ class WorkerTask:
415
  now = time.time()
416
  await self.client.hset(acc_key, "cooldown_until", now + COOLDOWN_SECONDS)
417
  await self.client.sadd(ACCOUNT_POOL_COOLDOWN, email)
418
- logger.info(f"Account {email} finished batch ({leads_sent}/{MAX_DAILY_LEADS}). Moved to Cooldown for 1 hour.")
419
-
420
  final_batch = [b for b in batch if not b.get("_skip")]
421
- # Filter results for leads actually in final_batch
422
  valid_ids = {b["data"].get("id") for b in final_batch}
423
  final_results = [r for r in results if r["lead_id"] in valid_ids]
424
-
425
  await self._cleanup_and_push_results(final_batch, final_results)
426
 
427
- # Global list of all WorkerTask instances on this machine
428
  _all_worker_tasks: list["WorkerTask"] = []
429
 
 
430
  async def machine_heartbeat_loop():
431
- """Single coroutine that sends ONE heartbeat for the whole machine every 5s."""
432
  client = redis.Redis(connection_pool=get_redis_pool())
433
  key = f"worker:machine:{MACHINE_ID}"
434
  while not _shutdown_requested:
@@ -436,11 +448,11 @@ async def machine_heartbeat_loop():
436
  done = _machine_stats["total_done"]
437
  failed = _machine_stats["total_failed"]
438
  threads = dict(_machine_stats["threads"])
439
-
440
  total = done + failed
441
  success_rate = round((done / total * 100), 1) if total > 0 else 0
442
  busy_count = sum(1 for a in threads.values() if a not in ("Idle", "Starting"))
443
-
444
  payload = {
445
  "id": MACHINE_ID,
446
  "status": "busy" if busy_count > 0 else "online",
@@ -453,24 +465,34 @@ async def machine_heartbeat_loop():
453
  (a for a in threads.values() if a not in ("Idle", "Starting")),
454
  "Idle"
455
  ),
456
- "version": "2.0.0",
457
  "last_seen": time.time()
458
  }
459
  await client.setex(key, 30, json.dumps(payload))
460
  await asyncio.sleep(5)
461
 
 
462
  async def main():
463
- logger.info("="*50)
 
 
 
 
 
464
  logger.info(f"Starting Async Playwright Worker [{MACHINE_ID}]. Tasks: {WORKER_TASKS}")
465
- logger.info("="*50)
466
-
 
 
 
467
  for i in range(WORKER_TASKS):
468
  t = WorkerTask(f"worker-{uuid.uuid4().hex[:6]}")
469
  _all_worker_tasks.append(t)
470
-
471
  task_coroutines = [t.run() for t in _all_worker_tasks]
472
  await asyncio.gather(machine_heartbeat_loop(), *task_coroutines)
473
 
 
474
  if __name__ == "__main__":
475
  try:
476
  asyncio.run(main())
 
29
  # Playwright imports
30
  from playwright.async_api import async_playwright, Page, Browser, BrowserContext
31
 
 
 
32
  # -----------------------------------------------------------------------------
33
  REDIS_HOST = os.getenv("REDIS_HOST", "13.235.87.209")
34
  REDIS_PORT = int(os.getenv("REDIS_PORT", "6379"))
 
45
  WORKER_TASKS = int(os.getenv("WORKER_THREADS", "5"))
46
  BATCH_SIZE = 10
47
  MAX_DAILY_LEADS = 40
48
+ COOLDOWN_SECONDS = 3600 # 1 hour
49
  MAX_RETRIES = 3
50
  POLL_TIMEOUT = 5
51
 
 
57
  # Machine identity (1 machine = 1 worker in dashboard)
58
  MACHINE_ID = socket.gethostname()
59
 
60
+ # Shared state across all tasks on this machine
61
  _machine_stats = {
62
  "total_done": 0,
63
  "total_failed": 0,
64
  "threads": {}, # task_id -> current_action
65
  }
66
+
67
+ # FIX #7: Do not create asyncio.Lock() at module level or lazily; it is created inside main()
68
+ _stats_lock: asyncio.Lock = None # type: ignore
69
 
70
  def _signal_handler(signum, frame):
71
  global _shutdown_requested
72
+ logger.info("Shutdown signal received. Draining worker pool...")
73
  _shutdown_requested = True
74
 
75
  signal.signal(signal.SIGINT, _signal_handler)
76
  signal.signal(signal.SIGTERM, _signal_handler)
77
 
78
+ # FIX #7: Initialize the Redis pool eagerly at module level so lazy creation cannot race
79
+ _redis_pool = redis.ConnectionPool(
80
+ host=REDIS_HOST, port=REDIS_PORT, password=REDIS_PASSWORD,
81
+ decode_responses=True, max_connections=50
82
+ )
83
 
84
  def get_redis_pool():
 
 
 
 
 
 
85
  return _redis_pool
86
 
87
+
88
+ # FIX #12: Redis startup connection check
89
async def check_redis_connection():
    """Check at startup that Redis answers a PING; exit the process if not.

    A client is created on the shared connection pool, pinged once, and
    closed again whether or not the ping succeeded.

    Raises:
        SystemExit: with status 1 when the ping fails for any reason.
    """
    client = redis.Redis(connection_pool=get_redis_pool())
    try:
        await client.ping()
        logger.info(f"✅ Redis connected: {REDIS_HOST}:{REDIS_PORT}")
    except Exception as e:
        logger.critical(f"❌ Cannot connect to Redis at {REDIS_HOST}:{REDIS_PORT} — {e}")
        # Equivalent to sys.exit(1), but does not depend on `sys` being
        # imported in this module (its import block is not visible here).
        raise SystemExit(1) from e
    finally:
        # Newer redis-py asyncio clients expose aclose(); older ones only
        # have close(). Use whichever exists so cleanup itself cannot fail
        # with AttributeError and mask the real outcome.
        close = getattr(client, "aclose", None) or client.close
        await close()
100
+
101
+
102
  class AutomationEngine:
103
  """Robust Playwright automation engine"""
104
 
 
126
  self.browser = await self.playwright.chromium.launch(**browser_options)
127
  self.context = await self.browser.new_context()
128
  self.page = await self.context.new_page()
129
+ # Abort resource-heavy requests (images/fonts/media) for speed
130
  await self.page.route("**/*", lambda route: (
131
  route.abort() if route.request.resource_type in ["image", "font", "media"]
132
  else route.continue_()
 
134
 
135
  async def stop(self):
136
  if self.context:
137
+ await self.context.close()
138
  if self.browser:
139
  await self.browser.close()
140
  if self.playwright:
 
153
  "response_mode=form_post"
154
  )
155
  await self.page.goto(login_url, timeout=60000)
156
+
157
  # Email step
158
  await self.page.get_by_role("textbox", name="Enter your email, phone, or").fill(email)
159
  await self.page.get_by_role("button", name="Next").click()
160
  await asyncio.sleep(2)
161
+
162
  # TAP step
163
  tap_field = self.page.get_by_role("textbox", name="Temporary Access Pass")
164
  await tap_field.fill(temp_pass)
165
  await self.page.get_by_role("button", name="Sign in").click()
166
+
167
  # Check for incorrect TAP immediately
168
  if await self.page.locator('text="Your Temporary Access Pass is incorrect."').is_visible(timeout=5000):
169
  logger.error(f"Incorrect Temporary Access Pass for {email}")
170
  return False
171
+
 
172
  await asyncio.sleep(4)
173
+
174
  # Handling potential security prompts
175
  if await self.page.get_by_role("heading", name="Let's keep your account secure").is_visible(timeout=5000):
176
  await self.page.get_by_role("button", name="Next").click()
177
  await asyncio.sleep(2)
178
+
179
+ # Choose different method -> Email
180
  try:
 
181
  await asyncio.sleep(2)
182
  await self.page.wait_for_load_state("domcontentloaded", timeout=20000)
183
+
184
  selector = 'button[data-testid="choose-different-method-link"]:has-text("Set up a different way to sign in")'
185
  await self.page.wait_for_selector(selector, timeout=15000)
186
  await self.page.locator(selector).click()
187
+
188
+ # Click the Email option
189
  await self.page.get_by_role("button", name="Email Receive a code to reset").click()
190
  await asyncio.sleep(2)
191
  except Exception as e:
192
  logger.warning(f"Could not switch to Email method: {e}")
193
  return False
194
+
195
  return True
196
  except Exception as e:
197
  logger.error(f"Login failed for {email}: {e}")
 
219
 
220
  if await self.page.locator('[data-testid="message-bar-error"]').is_visible():
221
  text = await self.page.locator('[data-testid="message-bar-error"]').inner_text()
222
+ if "too many" in text.lower():
223
+ return False, "rate_limit"
224
  return False, text
225
 
226
  if await self.page.locator('[data-testid="email-verify-challenge-otp-input"]').is_visible():
227
  await self.page.locator('[data-testid="backButton"]').click()
228
  logger.info(f"OTP Flow Success: {email}")
229
  return True, None
230
+
231
  return False, "otp_not_found"
232
  except Exception as e:
233
  return False, str(e)
234
 
235
 
 
 
 
236
  class WorkerTask:
237
  def __init__(self, task_id: str):
238
  self.task_id = task_id
 
240
  self.leads_done = 0
241
  self.leads_failed = 0
242
  self.current_action = "Starting"
243
+
244
  async def _fetch_batch(self) -> List[Dict[str, Any]]:
245
+ """Fetch a batch of up to BATCH_SIZE leads from Redis."""
246
  leads = []
247
  try:
248
+ # FIX #8: brpoplpush deprecated — use blmove instead (Redis 6.2+)
249
+ raw_lead = await self.client.blmove(
250
+ LEAD_QUEUE, PROCESSING_QUEUE, "RIGHT", "LEFT", timeout=POLL_TIMEOUT
251
+ )
252
  if not raw_lead:
253
  return leads
254
+
255
  leads.append({"_raw": raw_lead, "data": json.loads(raw_lead)})
256
+
257
  # Non-blocking fetch for the rest of the batch
258
  for _ in range(BATCH_SIZE - 1):
259
+ raw = await self.client.lmove(LEAD_QUEUE, PROCESSING_QUEUE, "RIGHT", "LEFT")
260
  if not raw:
261
  break
262
  leads.append({"_raw": raw, "data": json.loads(raw)})
263
+
264
  except Exception as e:
265
  logger.error(f"Error fetching batch: {e}")
266
+
267
  return leads
268
 
269
  async def _cleanup_and_push_results(self, leads: List[Dict[str, Any]], results: List[Dict[str, Any]]):
270
+ if not leads:
271
+ return
272
+ if len(leads) != len(results):
273
+ logger.error(f"Mismatch: {len(leads)} leads but {len(results)} results — some leads may be stuck!")
274
  try:
275
  pipe = self.client.pipeline()
276
  for lead, result in zip(leads, results):
277
  pipe.lpush(RESULT_QUEUE, json.dumps(result))
278
  pipe.lrem(PROCESSING_QUEUE, -1, lead["_raw"])
279
+
280
  lead_id = lead["data"].get("id", str(uuid.uuid4()))
281
  if result["success"]:
282
  pipe.hdel(RETRY_COUNT_KEY, lead_id)
283
+
284
  await pipe.execute()
285
  except Exception as e:
286
  logger.error(f"Error in cleanup and push: {e}")
287
 
288
  async def run(self):
289
  logger.info(f"Task {self.task_id} started. Batch Size: {BATCH_SIZE}")
290
+
291
  while not _shutdown_requested:
292
+ # Update shared machine stats
293
  async with _stats_lock:
294
  _machine_stats["threads"][self.task_id] = self.current_action
295
+ _machine_stats["total_done"] = sum(t.leads_done for t in _all_worker_tasks)
296
+ _machine_stats["total_failed"] = sum(t.leads_failed for t in _all_worker_tasks)
297
+
 
 
 
 
298
  # 1. Lease Account from Ready Pool
299
  email = await self.client.spop(ACCOUNT_POOL_READY)
300
  if not email:
 
302
  logger.debug(f"Task {self.task_id} waiting for accounts in '{ACCOUNT_POOL_READY}'...")
303
  await asyncio.sleep(5)
304
  continue
305
+
306
  acc_key = f"account:{email}"
307
  acc_pass = await self.client.hget(acc_key, "temp_pass")
308
+
309
+ # 2. Fetch Leads
310
  self.current_action = "Fetching Leads"
311
  batch = await self._fetch_batch()
312
  if not batch:
313
+ # No leads, return account to ready pool
314
  await self.client.sadd(ACCOUNT_POOL_READY, email)
315
  self.current_action = "Idle"
316
  await asyncio.sleep(1)
317
  continue
318
+
319
  results = []
320
  engine = AutomationEngine()
321
+
322
+ # FIX #2: login_success default False PEHLE try block se — UnboundLocalError prevent
323
+ login_success = False
324
+
325
  try:
 
326
  await engine.start(headless=True)
327
+
328
  # 3. Login
329
  self.current_action = f"Logging in: {email}"
330
  login_success = await engine.login_microsoft(email, acc_pass)
331
+
332
  for item in batch:
333
  lead_data = item["data"]
334
  lead_id = lead_data.get("id", str(uuid.uuid4()))
335
  raw_data = item["_raw"]
336
  lead_email = lead_data.get("email")
337
+
338
  self.current_action = f"Processing: {lead_email}"
339
+
340
  if not login_success:
341
  # Fail entire batch if login fails
342
  results.append({
 
346
  "worker_id": self.task_id,
347
  "row_number": lead_data.get("row_number")
348
  })
349
+
350
  # Requeue the raw_data
351
  pipe = self.client.pipeline()
352
  pipe.rpush(LEAD_QUEUE, raw_data)
 
354
  await pipe.execute()
355
  item["_skip"] = True
356
  continue
357
+
358
+ # FIX #3: retries default 0 PEHLE — UnboundLocalError prevent
359
+ retries = 0
360
  try:
361
  retries = int(await self.client.hget(RETRY_COUNT_KEY, lead_id) or 0)
362
  start_time = time.time()
363
+
364
  # Run Automation
365
  success, err = await engine.process_lead(lead_email)
366
  elapsed = (time.time() - start_time) * 1000
367
+
368
  results.append({
369
  "lead_id": lead_id,
370
  "success": success,
 
374
  "worker_id": self.task_id,
375
  "row_number": lead_data.get("row_number")
376
  })
377
+
378
  if success:
379
  self.leads_done += 1
380
  logger.info(f"[+] Successfully sent lead to: {lead_email}")
381
  else:
382
  self.leads_failed += 1
383
+ logger.warning(f"[-] Failed lead: {lead_email}. Reason: {err}")
384
+
385
  if err == "rate_limit":
386
  logger.warning("Rate limit hit, stopping batch")
387
  break
388
+
389
  except Exception as e:
390
  logger.error(f"Error processing lead {lead_id}: {e}")
391
  retries += 1
392
  await self.client.hset(RETRY_COUNT_KEY, lead_id, retries)
393
+
394
  if retries >= MAX_RETRIES:
395
  results.append({
396
  "lead_id": lead_id,
 
405
  pipe.lrem(PROCESSING_QUEUE, -1, raw_data)
406
  await pipe.execute()
407
  item["_skip"] = True
408
+
409
  except Exception as e:
410
  logger.error(f"Engine exception: {e}")
411
  finally:
412
  await engine.stop()
413
+
414
+ # 4. Handle Account Routing (Cooldown / Done / Pending Refresh)
415
  if not login_success:
416
  await self.client.sadd("account_pool:pending_refresh", email)
417
  logger.info(f"Account {email} login failed. Moved to pending_refresh.")
418
  else:
419
+ # FIX #6: Count only actually processed leads (not skipped)
420
+ actual_processed = len([b for b in batch if not b.get("_skip")])
421
+ leads_sent = await self.client.hincrby(acc_key, "leads_sent_today", actual_processed)
422
+
423
  if leads_sent >= MAX_DAILY_LEADS:
424
  await self.client.sadd(ACCOUNT_POOL_DONE, email)
425
  logger.info(f"Account {email} hit daily limit ({leads_sent}/{MAX_DAILY_LEADS}). Moved to Done.")
 
427
  now = time.time()
428
  await self.client.hset(acc_key, "cooldown_until", now + COOLDOWN_SECONDS)
429
  await self.client.sadd(ACCOUNT_POOL_COOLDOWN, email)
430
+ logger.info(f"Account {email} on Cooldown ({leads_sent}/{MAX_DAILY_LEADS}). 1 hour rest.")
431
+
432
  final_batch = [b for b in batch if not b.get("_skip")]
 
433
  valid_ids = {b["data"].get("id") for b in final_batch}
434
  final_results = [r for r in results if r["lead_id"] in valid_ids]
435
+
436
  await self._cleanup_and_push_results(final_batch, final_results)
437
 
438
+
439
  _all_worker_tasks: list["WorkerTask"] = []
440
 
441
+
442
  async def machine_heartbeat_loop():
443
+ """Send ONE heartbeat per machine every 5s."""
444
  client = redis.Redis(connection_pool=get_redis_pool())
445
  key = f"worker:machine:{MACHINE_ID}"
446
  while not _shutdown_requested:
 
448
  done = _machine_stats["total_done"]
449
  failed = _machine_stats["total_failed"]
450
  threads = dict(_machine_stats["threads"])
451
+
452
  total = done + failed
453
  success_rate = round((done / total * 100), 1) if total > 0 else 0
454
  busy_count = sum(1 for a in threads.values() if a not in ("Idle", "Starting"))
455
+
456
  payload = {
457
  "id": MACHINE_ID,
458
  "status": "busy" if busy_count > 0 else "online",
 
465
  (a for a in threads.values() if a not in ("Idle", "Starting")),
466
  "Idle"
467
  ),
468
+ "version": "2.1.0",
469
  "last_seen": time.time()
470
  }
471
  await client.setex(key, 30, json.dumps(payload))
472
  await asyncio.sleep(5)
473
 
474
+
475
  async def main():
476
+ global _stats_lock
477
+
478
+ # FIX #13: asyncio.Lock() event loop ke andar banao, module level par nahi
479
+ _stats_lock = asyncio.Lock()
480
+
481
+ logger.info("=" * 50)
482
  logger.info(f"Starting Async Playwright Worker [{MACHINE_ID}]. Tasks: {WORKER_TASKS}")
483
+ logger.info("=" * 50)
484
+
485
+ # FIX #12: Redis connection check at startup
486
+ await check_redis_connection()
487
+
488
  for i in range(WORKER_TASKS):
489
  t = WorkerTask(f"worker-{uuid.uuid4().hex[:6]}")
490
  _all_worker_tasks.append(t)
491
+
492
  task_coroutines = [t.run() for t in _all_worker_tasks]
493
  await asyncio.gather(machine_heartbeat_loop(), *task_coroutines)
494
 
495
+
496
  if __name__ == "__main__":
497
  try:
498
  asyncio.run(main())