import os
import sys
import subprocess

# Environment flag set immediately before the post-install restart. It lets the
# restarted process tell "first run, dependencies missing" apart from
# "already installed once and the imports STILL fail", so it can stop instead
# of reinstalling and re-exec'ing forever.
_DEPS_BOOTSTRAP_FLAG = "WORKER_DEPS_BOOTSTRAPPED"


def _ensure_dependencies():
    """Make sure `redis` and `playwright` are importable, installing them if not.

    On the first ImportError this pip-installs both packages, installs the
    Playwright Chromium browser, and replaces the current process with a fresh
    interpreter running the same command line.

    Raises:
        ImportError: if the packages are still not importable after one
            install-and-restart cycle (prevents an endless restart loop).
        subprocess.CalledProcessError: if pip or `playwright install` fails.
    """
    try:
        import redis  # noqa: F401  (import is only an availability probe)
        import playwright  # noqa: F401
    except ImportError:
        if os.environ.get(_DEPS_BOOTSTRAP_FLAG) == "1":
            # We already installed and restarted once; retrying cannot help.
            print("[-] Dependencies are still missing after auto-install. Aborting.")
            raise
        print("[-] Missing dependencies. Auto-installing 'redis' and 'playwright'...")
        subprocess.check_call([sys.executable, "-m", "pip", "install", "redis", "playwright"])
        print("[-] Installing Playwright chromium browser...")
        subprocess.check_call([sys.executable, "-m", "playwright", "install", "chromium"])
        print("[+] Installation complete. Restarting worker...")
        # The environment is inherited by the exec'd interpreter.
        os.environ[_DEPS_BOOTSTRAP_FLAG] = "1"
        os.execv(sys.executable, [sys.executable] + sys.argv)


# Must run before the third-party imports below.
_ensure_dependencies()

import json
import logging
import signal
import asyncio
import uuid
import time
import socket
from typing import Any, Dict, List, Optional

import redis.asyncio as redis

# Playwright imports
from playwright.async_api import async_playwright, Page, Browser, BrowserContext

# -----------------------------------------------------------------------------
# Configuration
# -----------------------------------------------------------------------------
# NOTE(review): the default Redis host is a hard-coded public IP address;
# consider requiring REDIS_HOST to be set explicitly in deployment.
REDIS_HOST = os.getenv("REDIS_HOST", "18.188.157.122")
REDIS_PORT = int(os.getenv("REDIS_PORT", "6379"))
REDIS_PASSWORD = os.getenv("REDIS_PASSWORD", None)

LEAD_QUEUE = "lead_queue"
PROCESSING_QUEUE = "processing_queue"
RESULT_QUEUE = "result_queue"
ACCOUNT_POOL_READY = "account_pool:ready"
ACCOUNT_POOL_ASSIGNED = "account_pool:assigned"
ACCOUNT_POOL_PROCESSING = "account_pool:processing"
ACCOUNT_POOL_COOLDOWN = "account_pool:cooldown"
RETRY_COUNT_KEY = "retry_count"

# Key that stores a SET of lead emails that are currently in-flight or already done.
# This is the deduplication guard: a lead email is added here before processing
# and removed only if it fails (so it can be retried). If it succeeds it stays
# forever (within the day) so it cannot be sent again.
SENT_LEADS_SET = "leads:sent_today"

WORKER_TASKS = int(os.getenv("WORKER_THREADS", "10"))
BATCH_SIZE = 10  # leads per account session
DEFAULT_COOLDOWN_SECONDS = int(os.getenv("DEFAULT_COOLDOWN_SECONDS", "600"))  # 10 minutes between batches
RESET_SENT_DEDUP_DAILY = os.getenv("RESET_SENT_DEDUP_DAILY", "0").lower() in {"1", "true", "yes", "on"}
ADMIN_SETTINGS_FILE = os.getenv(
    "ADMIN_SETTINGS_FILE",
    r"C:\Users\PRADIP MUDI\Downloads\Telegram Desktop\lead_processor\admin_state.json"
)
MAX_RETRIES = 3
POLL_TIMEOUT = 5  # seconds to block on BRPOPLPUSH

logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger("worker")

# Set by the signal handler; every long-running loop polls it to drain cleanly.
_shutdown_requested = False

# Machine identity (1 machine = 1 worker in dashboard)
MACHINE_ID = socket.gethostname()

# Shared stats across all coroutines on this machine
_machine_stats = {
    "total_done": 0,
    "total_failed": 0,
    "threads": {},  # task_id -> current_action
}
_stats_lock = asyncio.Lock()


def _signal_handler(signum, frame):
    """Request a graceful shutdown; loops exit after their current iteration."""
    global _shutdown_requested
    logger.info("Shutdown signal received. Draining worker pool...")
    _shutdown_requested = True


signal.signal(signal.SIGINT, _signal_handler)
signal.signal(signal.SIGTERM, _signal_handler)

_redis_pool = None


def get_redis_pool():
    """Return the process-wide Redis connection pool, creating it on first use."""
    global _redis_pool
    if _redis_pool is None:
        _redis_pool = redis.ConnectionPool(
            host=REDIS_HOST,
            port=REDIS_PORT,
            password=REDIS_PASSWORD,
            decode_responses=True,
            max_connections=50
        )
    return _redis_pool


async def get_cooldown_seconds(client: redis.Redis) -> int:
    """Resolve the per-account cooldown, in seconds.

    Lookup order: Redis key ``config:account_cooldown_seconds``, then
    ``settings.cooldown_seconds`` in ADMIN_SETTINGS_FILE, then
    DEFAULT_COOLDOWN_SECONDS. A configured value is clamped to the range
    60 seconds .. 24 hours. Any error falls back to the default.
    """
    try:
        raw = await client.get("config:account_cooldown_seconds")
        if raw is None:
            if os.path.exists(ADMIN_SETTINGS_FILE):
                with open(ADMIN_SETTINGS_FILE, "r", encoding="utf-8") as f:
                    settings = json.load(f).get("settings", {})
                raw = settings.get("cooldown_seconds")
        if raw is None:
            return DEFAULT_COOLDOWN_SECONDS
        value = int(raw)
        return max(60, min(value, 24 * 3600))
    except Exception as e:
        logger.warning(f"Could not read cooldown setting, using default: {e}")
        return DEFAULT_COOLDOWN_SECONDS


# -----------------------------------------------------------------------------
# Automation Engine
# -----------------------------------------------------------------------------
class AutomationEngine:
    """Robust Playwright automation engine"""

    # Submit button used on every Microsoft login step.
    _SUBMIT_SELECTOR = 'input[type="submit"], #idSIButton9'
    # Input that accepts the Temporary Access Pass.
    _TAP_INPUT_SELECTOR = 'input[name="otc"], input[placeholder="Temporary Access Pass"]'

    def __init__(self):
        self.playwright = None
        self.browser: Optional[Browser] = None
        self.context: Optional[BrowserContext] = None
        self.page: Optional[Page] = None

    async def start(self, headless: bool = False):
        """Launch Chromium, open one context/page and install request blocking."""
        self.playwright = await async_playwright().start()
        browser_options = {
            "headless": headless,
            "args": [
                "--no-sandbox",
                "--disable-setuid-sandbox",
                "--disable-dev-shm-usage",
                "--disable-accelerated-2d-canvas",
                "--no-first-run",
                "--no-zygote",
                "--single-process",
                "--disable-gpu"
            ]
        }
        self.browser = await self.playwright.chromium.launch(**browser_options)

        # Use a realistic User-Agent to avoid "Headless" detection
        user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36"
        self.context = await self.browser.new_context(
            user_agent=user_agent,
            locale="en-US",
            timezone_id="UTC",
            viewport={"width": 1280, "height": 800}
        )
        self.page = await self.context.new_page()

        # Aggressive network blocking for super low bandwidth
        excluded_resource_types = {"image", "font", "media"}

        async def route_interceptor(route):
            if route.request.resource_type in excluded_resource_types:
                return await route.abort()
            return await route.continue_()

        await self.page.route("**/*", route_interceptor)

    async def stop(self):
        """Close context, browser and Playwright.

        Each step is attempted independently so that a failure closing the
        context cannot leave the browser process or the Playwright driver alive.
        """
        if self.context:
            try:
                await self.context.close()
            except Exception as e:
                logger.warning(f"Error closing browser context: {e}")
        if self.browser:
            try:
                await self.browser.close()
            except Exception as e:
                logger.warning(f"Error closing browser: {e}")
        if self.playwright:
            try:
                await self.playwright.stop()
            except Exception as e:
                logger.warning(f"Error stopping playwright: {e}")
        self.page = None
        self.context = None
        self.browser = None
        self.playwright = None

    async def is_visible_robust(self, locator, timeout_ms: int = 5000) -> bool:
        """Wait for a locator to be visible and return bool, without throwing."""
        try:
            await locator.wait_for(state="visible", timeout=timeout_ms)
            return True
        except Exception:
            # `Exception` (not a bare except) so task cancellation still propagates.
            return False

    async def _wait_for_page_settle(self) -> None:
        """Wait for network idle; fall back to DOMContentLoaded if that times out."""
        try:
            await self.page.wait_for_load_state("networkidle", timeout=20000)
        except Exception:
            await self.page.wait_for_load_state("domcontentloaded", timeout=10000)

    async def _submit_login_form(self) -> None:
        """Click the login submit button and also press Enter to be sure."""
        await self.page.locator(self._SUBMIT_SELECTOR).click()
        await self.page.keyboard.press("Enter")

    async def ensure_on_lead_form(self) -> bool:
        """Robustly ensure the browser is on the lead input form, handling transitions and reloads.

        Returns True when the lead email input is visible, False otherwise
        (including when any step raises).
        """
        try:
            # 1. Check if already there
            email_input = self.page.locator('[data-testid="email-input"]')
            if await self.is_visible_robust(email_input, 3000):
                return True

            logger.info(f"Not on lead form (URL: {self.page.url}). Attempting to navigate...")

            # 2. Handle 'mysignins' or security info redirects
            if "mysignins" in self.page.url.lower() or "security" in self.page.url.lower():
                # Try clicking "Set up a different way" transition links first
                transition_selectors = [
                    'text="I want to set up a different method"',
                    'text="Set up a different way to sign in"',
                    'button:has-text("different way")',
                    'button:has-text("different method")',
                    'button[data-testid="choose-different-method-link"]',
                    'a:has-text("different method")',
                    '#otherWaysLink'
                ]
                for sel in transition_selectors:
                    btn = self.page.locator(sel)
                    if await self.is_visible_robust(btn, 2000):
                        logger.info(f"Clicking transition link: {sel}")
                        await btn.click()
                        await self._wait_for_page_settle()
                        break

            # 3. Look for "Email" method buttons
            email_btns = [
                'button:has-text("Email")',
                '[data-testid="email-method-button"]',
                'text="Email Receive a code to reset"',
                'button:has-text("Receive a code to reset")',
                'div[role="button"]:has-text("Email")',
                'span:has-text("Email")'
            ]
            for sel in email_btns:
                btn = self.page.locator(sel)
                if await self.is_visible_robust(btn, 3000):
                    logger.info(f"Clicking Email method via {sel}")
                    await btn.click()
                    await self._wait_for_page_settle()
                    if await self.is_visible_robust(email_input, 5000):
                        return True

            # 4. Try clicking back button
            back_btn = self.page.locator('[data-testid="backButton"]')
            if await self.is_visible_robust(back_btn, 2000):
                logger.info("Clicking back button...")
                await back_btn.click()
                await asyncio.sleep(2)
                if await self.is_visible_robust(email_input, 3000):
                    return True

            # 5. Final fallback: Reload
            logger.warning("Still not on form. Reloading page...")
            try:
                await self.page.reload(timeout=60000, wait_until="domcontentloaded")
            except Exception as e:
                logger.warning(f"Reload timed out or failed: {e}")
            await asyncio.sleep(5)

            # Re-check after reload
            if await self.is_visible_robust(email_input, 5000):
                return True

            # One more attempt at Email button after reload
            for sel in email_btns:
                btn = self.page.locator(sel)
                if await self.is_visible_robust(btn, 3000):
                    await btn.click()
                    await asyncio.sleep(5)
                    break

            return await self.is_visible_robust(email_input, 5000)
        except Exception as e:
            logger.error(f"Error in ensure_on_lead_form: {e}")
            return False

    async def login_microsoft(self, email: str, temp_pass: str) -> bool:
        """Robust Login using TAP (Temporary Access Pass) flow with branching logic

        Returns True only when, after signing in, the lead form could be
        reached; False on an incorrect TAP, a missing TAP option, or any error.
        Never raises.
        """
        try:
            logger.info(f"Logging in as {email} using TAP flow...")
            login_url = (
                "https://login.microsoftonline.com/common/oauth2/v2.0/authorize?"
                "client_id=4765445b-32c6-49b0-83e6-1d93765276ca&"
                "redirect_uri=https%3A%2F%2Fwww.office.com%2Flandingv2&"
                "response_type=code%20id_token&"
                "scope=openid%20profile%20https%3A%2F%2Fwww.office.com%2Fv2%2FOfficeHome.All&"
                "response_mode=form_post"
            )
            await self.page.goto(login_url, timeout=90000)

            # 1. Email Step
            email_field = self.page.locator('input[type="email"], input[name="loginfmt"]').first
            try:
                await email_field.wait_for(state="visible", timeout=60000)
            except Exception:
                pass  # allow fill to attempt and throw its own error
            await email_field.fill(email, timeout=30000)

            # Click and also press Enter to be sure
            await self._submit_login_form()
            logger.info("Email submitted, waiting for redirect...")

            # 2. Detect next state (TAP vs Password vs Security)
            # Increase attempts to 15 for slower environments
            for attempt in range(15):
                curr_url = self.page.url
                logger.info(f"Login Progress (Attempt {attempt+1}): {curr_url}")

                # Case A: Temporary Access Pass field
                tap_field = self.page.locator(self._TAP_INPUT_SELECTOR)
                if await self.is_visible_robust(tap_field, 3000):
                    logger.info("TAP field detected.")
                    await tap_field.fill(temp_pass)
                    await self._submit_login_form()
                    break

                # Case B: Password field
                pwd_field = self.page.locator('input[type="password"]')
                if await self.is_visible_robust(pwd_field, 1000):
                    logger.warning("Microsoft asked for Password instead of TAP. Attempting to switch...")
                    # Try direct TAP link or "Other ways"
                    tap_switch_selectors = [
                        'text="Use a Temporary Access Pass instead"',
                        '#idA_PWD_SwitchToOTC',
                        'a:has-text("Temporary Access Pass")',
                        'text="Other ways to sign in"',
                        'text="Sign in another way"',
                        '#idA_PWD_SwitchToOther'
                    ]
                    for sel in tap_switch_selectors:
                        link = self.page.locator(sel)
                        if await self.is_visible_robust(link, 2000):
                            logger.info(f"Clicking switch option: {sel}")
                            await link.click()
                            await asyncio.sleep(3)
                            # Check if TAP link appeared after clicking "Other ways"
                            tap_link = self.page.locator('text="Temporary Access Pass"')
                            if await self.is_visible_robust(tap_link, 3000):
                                await tap_link.click()
                                await asyncio.sleep(3)
                            break

                    # Re-check if we are now on TAP page. Uses the same selector
                    # as Case A so both ways of rendering the field are accepted;
                    # the next loop iteration fills it in.
                    if await self.is_visible_robust(self.page.locator(self._TAP_INPUT_SELECTOR), 2000):
                        continue

                    logger.error("Could not find TAP option on password page.")
                    return False

                # Case C: "Stay signed in?"
                if "kmsi" in curr_url.lower() or await self.is_visible_robust(self.page.locator('text="Stay signed in?"'), 1000):
                    logger.info("Handling 'Stay signed in?' prompt.")
                    await self._submit_login_form()
                    break

                # Case D: Security / More info required
                if "proofup" in curr_url.lower() or await self.is_visible_robust(self.page.locator('text="More information required"'), 1000):
                    logger.info("Handling 'More information required' prompt.")
                    await self._submit_login_form()
                    await asyncio.sleep(5)
                    continue

                # Case E: Account Disambiguation (Work/School vs Personal)
                work_btn = self.page.locator('text="Work or school account"')
                if await self.is_visible_robust(work_btn, 1000):
                    logger.info("Selecting 'Work or school account'.")
                    await work_btn.click()
                    await asyncio.sleep(3)
                    continue

                # Fallback: Warning if still on login page and stuck
                if attempt == 7 and "login.microsoftonline.com" in curr_url:
                    logger.warning(f"Login stuck for {email}. URL: {curr_url}")

                await asyncio.sleep(3)

            # 3. Final verification and MFA Method Selection
            await asyncio.sleep(5)

            # Check for incorrect TAP
            if await self.is_visible_robust(self.page.locator('text="Your Temporary Access Pass is incorrect."'), 5000):
                logger.error(f"Incorrect Temporary Access Pass for {email}")
                return False

            # Case: "Keep your account secure" / "More information required"
            if "proofup" in self.page.url.lower() or await self.is_visible_robust(self.page.get_by_role("heading", name="Let's keep your account secure"), 5000):
                logger.info("On security setup page. Clicking Next...")
                await self.page.get_by_role("button", name="Next").click()
                await self._wait_for_page_settle()

            # 4. Force "Email" method for lead processing
            on_form = await self.ensure_on_lead_form()
            # Logged BEFORE returning; previously this line sat after the
            # return statement and could never execute.
            logger.info(
                f"Login flow completed for {email} (lead form reachable: {on_form}). "
                f"Final URL: {self.page.url}"
            )
            return on_form
        except Exception as e:
            logger.error(f"Login failed for {email}: {e}")
            return False

    async def process_lead(self, email: str) -> tuple[bool, Optional[str]]:
        """Process a single lead OTP flow with recovery logic

        Returns ``(True, None)`` when the OTP input appeared, otherwise
        ``(False, reason)``. Reasons produced here include
        ``"form_not_reachable"``, ``"Timeout: ..."``, ``"Invalid Email: ..."``,
        ``"captcha_detected"``, ``"rate_limit"``, the error-bar text,
        ``"otp_not_found"``, or the text of an unexpected exception.
        """
        try:
            # Ensure we are on a clean starting state
            if not await self.ensure_on_lead_form():
                logger.error(f"Could not reach lead form for {email}")
                return False, "form_not_reachable"

            # Wait for email input
            email_input = self.page.locator('[data-testid="email-input"]')
            await email_input.wait_for(timeout=45000)
            await email_input.fill(email)

            # Click and also press Enter to be sure
            next_btn = self.page.locator('[data-testid="reskin-step-next-button"]')
            await next_btn.click(timeout=45000)
            await self.page.keyboard.press("Enter")

            # Wait for OTP or Error
            target_state = (
                '[data-testid="email-verify-challenge-otp-input"], '
                '[data-testid="message-bar-error"], '
                '#usernameError, #i0116Error, '
                'iframe[src*="arkoselabs"], iframe[src*="fc/a/"]'
            )
            try:
                await self.page.wait_for_selector(target_state, timeout=60000)
            except Exception as e:
                return False, f"Timeout: {str(e)}"

            # Handle known inline errors (invalid email, etc)
            inline_error = self.page.locator('#usernameError, #i0116Error')
            if await self.is_visible_robust(inline_error, 1000):
                text = await inline_error.first.inner_text()
                # Must go back if it's an inline error so the form is ready for
                # the next lead. Best-effort: a failed click must not replace
                # the "Invalid Email" verdict with a generic exception string.
                try:
                    await self.page.locator('[data-testid="backButton"], #idBtn_Back').first.click(timeout=3000)
                    await asyncio.sleep(1)
                except Exception as e:
                    logger.warning(f"Could not navigate back after inline error for {email}: {e}")
                return False, f"Invalid Email: {text}"

            # Handle Captcha
            if await self.is_visible_robust(self.page.locator('iframe[src*="arkoselabs"], iframe[src*="fc/a/"]'), 1000):
                return False, "captcha_detected"

            error_bar = self.page.locator('[data-testid="message-bar-error"]')
            if await self.is_visible_robust(error_bar, 1000):
                text = await error_bar.inner_text()
                if "too many" in text.lower():
                    return False, "rate_limit"
                return False, text

            if await self.is_visible_robust(self.page.locator('[data-testid="email-verify-challenge-otp-input"]'), 1000):
                # SUCCESS - but we MUST go back for the next lead
                back_btn = self.page.locator('[data-testid="backButton"]')
                if await self.is_visible_robust(back_btn, 5000):
                    await back_btn.click()
                logger.info(f"OTP Flow Success: {email}")
                return True, None

            return False, "otp_not_found"
        except Exception as e:
            logger.error(f"Error in process_lead for {email}: {e}")
            return False, str(e)


# -----------------------------------------------------------------------------
# Worker Async Task Logic
# -----------------------------------------------------------------------------
class WorkerTask:
    """One worker coroutine: leases an account, processes a batch of leads with it."""

    def __init__(self, task_id: str):
        self.task_id = task_id
        self.client = redis.Redis(connection_pool=get_redis_pool())
        self.leads_done = 0
        self.leads_failed = 0
        self.current_action = "Starting"

    async def _publish_stats(self) -> None:
        """Copy this task's action and the machine-wide totals into _machine_stats."""
        async with _stats_lock:
            _machine_stats["threads"][self.task_id] = self.current_action
            _machine_stats["total_done"] = sum(t.leads_done for t in _all_worker_tasks)
            _machine_stats["total_failed"] = sum(t.leads_failed for t in _all_worker_tasks)

    async def _set_action(self, action: str) -> None:
        """Update current_action and publish it immediately for the heartbeat."""
        self.current_action = action
        await self._publish_stats()

    async def _release_lease(self, email: str) -> None:
        """Remove the account from the processing list it was leased into."""
        await self.client.lrem(ACCOUNT_POOL_PROCESSING, -1, email)

    async def _requeue_items(self, items: List[Dict[str, Any]], account_email: str) -> None:
        """Push leads back to LEAD_QUEUE and un-claim them for this account.

        Items are flagged ``_skip`` only after the pipeline succeeded, so a
        failed requeue can be retried by the caller.
        """
        if not items:
            return
        used_key = f"used:{account_email}"
        pipe = self.client.pipeline()
        for item in items:
            pipe.lpush(LEAD_QUEUE, item["_raw"])
            pipe.srem(used_key, item["data"].get("email", ""))
        await pipe.execute()
        for item in items:
            item["_skip"] = True

    # ------------------------------------------------------------------
    # Fetch up to BATCH_SIZE UNIQUE leads from the account's own queue.
    # A per-account Redis SET (used:<account>) records which lead emails
    # this account has already handled; SADD both checks and claims a
    # lead in a single atomic step.
    # ------------------------------------------------------------------
    async def _fetch_unique_batch(self, account_email: str) -> List[Dict[str, Any]]:
        """
        Pops leads from the isolated account_queue provided by the Assigner.
        Global and Account-level dedup are already handled by Assigner,
        but we still check locally as a secondary guard.

        Returns a list of ``{"_raw": <original JSON string>, "data": <dict>}``
        with ``data["email"]`` normalised to stripped lower case. Malformed
        entries and entries without an email are dropped.
        """
        unique_leads = []
        account_queue = f"account_queue:{account_email}"
        used_key = f"used:{account_email}"
        skipped = 0
        try:
            # Read and remove the head of the queue in ONE transaction so a
            # concurrent push cannot shift the list between the two commands.
            pipe = self.client.pipeline(transaction=True)
            pipe.lrange(account_queue, 0, BATCH_SIZE - 1)
            pipe.ltrim(account_queue, BATCH_SIZE, -1)
            raw_leads, _ = await pipe.execute()
            if not raw_leads:
                return []

            for raw_lead in raw_leads:
                try:
                    data = json.loads(raw_lead)
                except (TypeError, ValueError):
                    logger.warning(f"Dropping malformed lead payload for {account_email}")
                    continue
                if not isinstance(data, dict):
                    logger.warning(f"Dropping non-object lead payload for {account_email}")
                    continue

                # Tolerate a null / non-string email instead of aborting the batch.
                lead_email = str(data.get("email") or "").strip().lower()
                if not lead_email:
                    continue

                # Secondary dedup guard + claim: SADD returns 0 when the email
                # was already in the set.
                claimed = await self.client.sadd(used_key, lead_email)
                if not claimed:
                    skipped += 1
                    logger.debug(f"Skipping lead {lead_email} - already used by {account_email}")
                    continue

                data["email"] = lead_email
                unique_leads.append({"_raw": raw_lead, "data": data})
        except Exception as e:
            logger.error(f"Error fetching unique batch: {e}")

        if skipped:
            logger.info(f"[Dedup] Skipped {skipped} duplicate leads in this batch fetch.")
        return unique_leads

    async def _cleanup_and_push_results(self, leads: List[Dict[str, Any]], results: List[Dict[str, Any]], account_email: str):
        """Push results to RESULT_QUEUE and tidy per-lead bookkeeping.

        ``leads`` and ``results`` are paired by position. On success the lead's
        retry counter is deleted; on failure with retries left the lead email
        is removed from ``used:<account_email>`` so it can be retried.
        Errors are logged, not raised.
        """
        if not leads:
            return
        if len(leads) != len(results):
            logger.warning(
                f"Lead/result count mismatch ({len(leads)} vs {len(results)}); "
                "only paired entries are pushed."
            )
        try:
            pipe = self.client.pipeline()
            for lead, result in zip(leads, results):
                pipe.lpush(RESULT_QUEUE, json.dumps(result))
                if result["success"]:
                    # Use the id the lead was processed under; a freshly
                    # generated UUID would never match a stored counter.
                    lead_id = result.get("lead_id")
                    if lead_id is None:
                        lead_id = lead["data"].get("id")
                    if lead_id is not None:
                        pipe.hdel(RETRY_COUNT_KEY, lead_id)
                else:
                    # On failure, un-claim from the account's used-set so it
                    # can be retried next time (unless max retries exceeded).
                    retries = int(result.get("_retries", 0))
                    if retries < MAX_RETRIES:
                        lead_email = lead["data"].get("email", "")
                        pipe.srem(f"used:{account_email}", lead_email)
            await pipe.execute()
        except Exception as e:
            logger.error(f"Error in cleanup and push: {e}")

    async def _process_leads(self, engine: AutomationEngine, email: str, batch: List[Dict[str, Any]], completed: list) -> None:
        """Run every lead of the batch through the engine.

        Appends ``(item, result)`` pairs to ``completed`` and flags such items
        ``_done``. Stops early on rate limit, captcha or page timeout and
        requeues the leads that were not attempted.
        """
        for item_idx, item in enumerate(batch):
            lead_data = item["data"]
            lead_id = lead_data.get("id")
            if lead_id is None:
                lead_id = str(uuid.uuid4())
            lead_email = lead_data.get("email")
            await self._set_action(f"Processing: {lead_email}")

            # Bound before the try so the except branch never sees a stale
            # value from the previous lead (or no value at all).
            retries = 0
            try:
                retries = int(await self.client.hget(RETRY_COUNT_KEY, lead_id) or 0)
                start_time = time.time()
                success, err = await engine.process_lead(lead_email)
                elapsed = (time.time() - start_time) * 1000
            except Exception as e:
                logger.error(f"Error processing lead {lead_id}: {e}")
                retries += 1
                await self.client.hset(RETRY_COUNT_KEY, lead_id, retries)
                if retries >= MAX_RETRIES:
                    completed.append((item, {
                        "lead_id": lead_id,
                        "success": False,
                        "error": str(e),
                        "worker_id": self.task_id,
                        "row_number": lead_data.get("row_number"),
                        "_retries": retries,
                    }))
                    item["_done"] = True
                else:
                    # Requeue and un-claim
                    await self._requeue_items([item], email)
                continue

            completed.append((item, {
                "lead_id": lead_id,
                "success": success,
                "output": {"error": err} if not success else {},
                "error": err if not success else None,
                "processing_time_ms": elapsed,
                "worker_id": self.task_id,
                "row_number": lead_data.get("row_number"),
                "_retries": retries,
            }))
            item["_done"] = True

            if success:
                self.leads_done += 1
                logger.info(f"[+] Sent lead: {lead_email} via {email}")
                continue

            self.leads_failed += 1
            logger.warning(f"[-] Failed lead: {lead_email}. Reason: {err}")

            abort_batch = False
            if err == "rate_limit":
                logger.warning("Rate limit hit — stopping batch early.")
                abort_batch = True
            elif err == "captcha_detected":
                logger.warning("Captcha detected — stopping batch early to avoid blocks.")
                abort_batch = True
            elif err and "Timeout" in str(err):
                logger.warning("Page timeout detected (broken page) — stopping batch early.")
                abort_batch = True

            if abort_batch:
                # Requeue remaining leads in the batch so they aren't wasted/lost
                await self._requeue_items(batch[item_idx + 1:], email)
                break

    async def _run_account_session(self, email: str, acc_pass: str, batch: List[Dict[str, Any]]) -> tuple[bool, list]:
        """Start a browser, log in once and process the batch.

        Returns ``(login_success, completed)`` where ``completed`` is a list of
        ``(item, result)`` pairs. Every lead that ends up neither completed nor
        already requeued (login failure, engine crash) is requeued here.
        """
        completed: list = []
        engine = AutomationEngine()
        login_success = False
        try:
            await engine.start(headless=True)
            # Login once per account session
            await self._set_action(f"Logging in: {email}")
            login_success = await engine.login_microsoft(email, acc_pass)
            if login_success:
                await self._process_leads(engine, email, batch, completed)
        except Exception as e:
            logger.error(f"Engine exception: {e}")
        finally:
            await engine.stop()

        leftovers = [
            item for item in batch
            if not item.get("_skip") and not item.get("_done")
        ]
        if leftovers:
            try:
                await self._requeue_items(leftovers, email)
            except Exception as e:
                logger.error(f"Could not requeue {len(leftovers)} unprocessed leads: {e}")
        return login_success, completed

    async def _route_account(self, email: str, acc_key: str, login_success: bool, completed: list) -> None:
        """Send the account to pending_refresh (login failed) or to cooldown."""
        if not login_success:
            # NOTE(review): this branch is also taken when the browser itself
            # failed to start, i.e. before the account was actually tried.
            await self.client.sadd("account_pool:pending_refresh", email)
            logger.info(f"Account {email} login failed → pending_refresh.")
            return

        # Count only leads that were actually sent.
        actually_sent = sum(1 for _, result in completed if result["success"])
        leads_sent = await self.client.hincrby(acc_key, "leads_sent_today", actually_sent)

        now = time.time()
        cooldown_seconds = await get_cooldown_seconds(self.client)
        pipe = self.client.pipeline()
        pipe.hset(acc_key, "cooldown_until", now + cooldown_seconds)
        pipe.sadd(ACCOUNT_POOL_COOLDOWN, email)
        pipe.srem("account_pool:done_for_today", email)
        await pipe.execute()
        logger.info(
            f"Account {email} batch done ({leads_sent} leads total) "
            f"-> Cooldown {round(cooldown_seconds / 60, 1)}m."
        )

    async def run(self):
        """Main loop: lease an account, process one batch, route the account.

        Runs until a shutdown signal has been received.
        """
        logger.info(f"Task {self.task_id} started. Batch Size: {BATCH_SIZE}")
        while not _shutdown_requested:
            # Update shared machine stats
            await self._publish_stats()

            # ----------------------------------------------------------
            # 1. Lease ONE account from Assigned pool (atomic BRPOPLPUSH)
            # ----------------------------------------------------------
            try:
                email = await self.client.brpoplpush(
                    ACCOUNT_POOL_ASSIGNED, ACCOUNT_POOL_PROCESSING, timeout=POLL_TIMEOUT
                )
            # `redis` is redis.asyncio here, which exports the exception
            # classes directly (there is no `redis.asyncio.exceptions`).
            except (redis.ConnectionError, redis.TimeoutError, redis.ResponseError) as e:
                logger.warning(f"Redis blocking operation interrupted: {e}")
                await asyncio.sleep(2)
                continue

            if not email:
                self.current_action = "Idle"
                continue

            acc_key = f"account:{email}"
            acc_pass = await self.client.hget(acc_key, "temp_pass")
            if not acc_pass:
                logger.warning(f"Account {email} has no temp_pass. Moving to pending_refresh.")
                await self.client.sadd("account_pool:pending_refresh", email)
                await self._release_lease(email)
                continue

            # ----------------------------------------------------------
            # 2. Fetch up to BATCH_SIZE UNIQUE leads
            # ----------------------------------------------------------
            await self._set_action("Fetching Leads")
            batch = await self._fetch_unique_batch(email)
            if not batch:
                # No leads — return account to ready pool and clean up assignment
                await self._release_lease(email)
                await self.client.sadd(ACCOUNT_POOL_READY, email)
                await self._set_action("Idle")
                await asyncio.sleep(1)
                continue

            logger.info(f"Task {self.task_id}: Account={email}, Leads in account_queue={len(batch)}")

            # ----------------------------------------------------------
            # 3./4. Login once, then process each lead
            # ----------------------------------------------------------
            login_success, completed = await self._run_account_session(email, acc_pass, batch)

            # ----------------------------------------------------------
            # 5. Account routing after batch
            # ----------------------------------------------------------
            await self._route_account(email, acc_key, login_success, completed)
            # The batch is finished either way, so the lease taken by
            # BRPOPLPUSH is released (same as on the "no leads" path).
            await self._release_lease(email)

            # ----------------------------------------------------------
            # 6. Push results to result_queue
            # ----------------------------------------------------------
            await self._cleanup_and_push_results(
                [item for item, _ in completed],
                [result for _, result in completed],
                email,
            )


# Global list of all WorkerTask instances on this machine
_all_worker_tasks: list["WorkerTask"] = []


async def machine_heartbeat_loop():
    """Sends ONE heartbeat key for the whole machine every 5s."""
    client = redis.Redis(connection_pool=get_redis_pool())
    key = f"worker:machine:{MACHINE_ID}"
    while not _shutdown_requested:
        async with _stats_lock:
            done = _machine_stats["total_done"]
            failed = _machine_stats["total_failed"]
            threads = dict(_machine_stats["threads"])

        total = done + failed
        success_rate = round((done / total * 100), 1) if total > 0 else 0
        busy_actions = [a for a in threads.values() if a not in ("Idle", "Starting")]
        busy_count = len(busy_actions)

        payload = {
            "id": MACHINE_ID,
            "status": "busy" if busy_count > 0 else "online",
            "active_threads": busy_count,
            "max_threads": WORKER_TASKS,
            "leads_done": done,
            "leads_failed": failed,
            "success_rate": success_rate,
            "current_action": busy_actions[0] if busy_actions else "Idle",
            "version": "3.0.0",
            "last_seen": time.time(),
        }
        blob = json.dumps(payload)
        try:
            # The live key expires after 30s; the snapshot key is kept.
            await client.setex(key, 30, blob)
            await client.set(f"{key}:snapshot", blob)
        except (redis.ConnectionError, redis.TimeoutError) as e:
            # A missed heartbeat must not take the whole worker down.
            logger.warning(f"Heartbeat write failed: {e}")
        await asyncio.sleep(5)


async def main():
    """Create WORKER_TASKS worker coroutines plus the heartbeat and run them."""
    logger.info("=" * 50)
    logger.info(f"Starting Async Playwright Worker [{MACHINE_ID}]. Tasks: {WORKER_TASKS}")
    logger.info("=" * 50)

    client = redis.Redis(connection_pool=get_redis_pool())
    if RESET_SENT_DEDUP_DAILY:
        import datetime
        now = datetime.datetime.now(datetime.timezone.utc)
        tomorrow = now.replace(hour=0, minute=0, second=0, microsecond=0) + datetime.timedelta(days=1)
        ttl = int((tomorrow - now).total_seconds())
        await client.expire(SENT_LEADS_SET, ttl)
        logger.info(f"SENT_LEADS_SET TTL set to {ttl}s (expires at UTC midnight).")
    else:
        logger.info("SENT_LEADS_SET daily auto-reset disabled. Worker state stays stable across resets.")

    for _ in range(WORKER_TASKS):
        _all_worker_tasks.append(WorkerTask(f"worker-{uuid.uuid4().hex[:6]}"))

    task_coroutines = [t.run() for t in _all_worker_tasks]
    try:
        await asyncio.gather(machine_heartbeat_loop(), *task_coroutines)
    finally:
        # Close pooled connections so the process exits without dangling sockets.
        try:
            await get_redis_pool().disconnect()
        except Exception as e:
            logger.warning(f"Error closing Redis pool: {e}")


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logger.info("Worker node shutting down.")