| import os |
| import sys |
| import subprocess |
|
|
def _ensure_dependencies():
    """Make sure ``redis`` and ``playwright`` can be imported, installing them if not.

    When either import fails, both packages are installed with pip, the
    Playwright Chromium build is downloaded, and the current process is
    replaced via ``os.execv`` so the new interpreter can import them.

    An environment marker is set right before the restart. If the imports
    still fail in the restarted process, the function raises instead of
    installing and restarting again, which would otherwise loop forever.

    Raises:
        RuntimeError: the packages are still not importable after an
            automatic install.
        subprocess.CalledProcessError: one of the install commands failed.
    """
    # Local name so the block stays self-contained.
    bootstrap_flag = "WORKER_DEPS_BOOTSTRAPPED"

    try:
        import redis  # noqa: F401 - imported only to check availability
        import playwright  # noqa: F401 - imported only to check availability
    except ImportError as exc:
        if os.environ.get(bootstrap_flag) == "1":
            # We already installed and restarted once; do not loop.
            raise RuntimeError(
                "Dependencies 'redis' and 'playwright' are still not importable "
                "after automatic installation."
            ) from exc

        print("[-] Missing dependencies. Auto-installing 'redis' and 'playwright'...")
        subprocess.check_call([sys.executable, "-m", "pip", "install", "redis", "playwright"])
        print("[-] Installing Playwright chromium browser...")
        subprocess.check_call([sys.executable, "-m", "playwright", "install", "chromium"])
        print("[+] Installation complete. Restarting worker...")

        # os.environ changes are inherited by the process started by execv.
        os.environ[bootstrap_flag] = "1"
        os.execv(sys.executable, [sys.executable] + sys.argv)


_ensure_dependencies()
|
|
| import json |
| import logging |
| import signal |
| import asyncio |
| import uuid |
| import time |
| import socket |
| from typing import Any, Dict, List, Optional |
| import redis.asyncio as redis |
|
|
| |
| from playwright.async_api import async_playwright, Page, Browser, BrowserContext |
|
|
| |
| |
| |
# --- Redis connection (each value can be overridden through the environment) ---
REDIS_HOST = os.environ.get("REDIS_HOST", "54.244.160.21")
REDIS_PORT = int(os.environ.get("REDIS_PORT", "6379"))
REDIS_PASSWORD = os.environ.get("REDIS_PASSWORD")

# --- Redis key names used by this worker ---
LEAD_QUEUE = "lead_queue"
PROCESSING_QUEUE = "processing_queue"
RESULT_QUEUE = "result_queue"
ACCOUNT_POOL_READY = "account_pool:ready"
ACCOUNT_POOL_ASSIGNED = "account_pool:assigned"
ACCOUNT_POOL_PROCESSING = "account_pool:processing"
ACCOUNT_POOL_COOLDOWN = "account_pool:cooldown"
RETRY_COUNT_KEY = "retry_count"

# Key whose TTL main() may set when RESET_SENT_DEDUP_DAILY is enabled.
SENT_LEADS_SET = "leads:sent_today"

# --- Worker tuning ---
# Note: the number of worker tasks is read from the WORKER_THREADS variable.
WORKER_TASKS = int(os.environ.get("WORKER_THREADS", "10"))
BATCH_SIZE = 10
DEFAULT_COOLDOWN_SECONDS = int(os.environ.get("DEFAULT_COOLDOWN_SECONDS", "600"))
RESET_SENT_DEDUP_DAILY = os.environ.get("RESET_SENT_DEDUP_DAILY", "0").lower() in (
    "1",
    "true",
    "yes",
    "on",
)
# JSON file consulted by get_cooldown_seconds() when Redis holds no setting.
ADMIN_SETTINGS_FILE = os.environ.get(
    "ADMIN_SETTINGS_FILE",
    r"C:\Users\PRADIP MUDI\Downloads\Telegram Desktop\lead_processor\admin_state.json",
)
MAX_RETRIES = 3
POLL_TIMEOUT = 5

# --- Logging ---
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger("worker")

# Flipped to True by the signal handler; the polling loops check it.
_shutdown_requested = False

# Identifier of this machine, used in the heartbeat key and payload.
MACHINE_ID = socket.gethostname()

# Aggregated counters shared by all worker tasks on this machine.
_machine_stats = dict(
    total_done=0,
    total_failed=0,
    threads={},
)
# Created at import time, i.e. before asyncio.run() starts the event loop.
_stats_lock = asyncio.Lock()
|
|
def _signal_handler(signum, frame):
    """Request a graceful shutdown.

    Installed for SIGINT and SIGTERM. It only logs and sets the module-level
    ``_shutdown_requested`` flag; the polling loops stop once they see it.

    Args:
        signum: Signal number (unused).
        frame: Interrupted stack frame (unused).
    """
    global _shutdown_requested
    logger.info("Shutdown signal received. Draining worker pool...")
    _shutdown_requested = True


# Same handler for Ctrl+C and for a termination request.
for _sig in (signal.SIGINT, signal.SIGTERM):
    signal.signal(_sig, _signal_handler)
|
|
# Lazily created connection pool shared by every Redis client in this module.
_redis_pool = None


def get_redis_pool():
    """Return the shared Redis connection pool, creating it on first use.

    The pool is built from REDIS_HOST / REDIS_PORT / REDIS_PASSWORD, decodes
    responses to ``str`` and allows at most 50 connections.
    """
    global _redis_pool
    if _redis_pool is not None:
        return _redis_pool

    pool_options = {
        "host": REDIS_HOST,
        "port": REDIS_PORT,
        "password": REDIS_PASSWORD,
        "decode_responses": True,
        "max_connections": 50,
    }
    _redis_pool = redis.ConnectionPool(**pool_options)
    return _redis_pool
|
|
async def get_cooldown_seconds(client: redis.Redis) -> int:
    """Return the account cooldown in seconds, clamped to [60, 86400].

    Lookup order:
      1. Redis key ``config:account_cooldown_seconds``.
      2. ``settings.cooldown_seconds`` in ADMIN_SETTINGS_FILE, if the file exists.
      3. DEFAULT_COOLDOWN_SECONDS.

    Any error while reading or converting the value is logged as a warning
    and DEFAULT_COOLDOWN_SECONDS is returned (unclamped).

    Args:
        client: Redis client used to read the configuration key.
    """
    try:
        configured = await client.get("config:account_cooldown_seconds")

        # Fall back to the admin settings file only when Redis has no value.
        if configured is None and os.path.exists(ADMIN_SETTINGS_FILE):
            with open(ADMIN_SETTINGS_FILE, "r", encoding="utf-8") as handle:
                admin_settings = json.load(handle).get("settings", {})
            configured = admin_settings.get("cooldown_seconds")

        if configured is None:
            return DEFAULT_COOLDOWN_SECONDS

        lower_bound = 60
        upper_bound = 24 * 3600
        seconds = int(configured)
        return max(lower_bound, min(seconds, upper_bound))
    except Exception as e:
        logger.warning(f"Could not read cooldown setting, using default: {e}")
        return DEFAULT_COOLDOWN_SECONDS
|
|
| |
| |
| |
class AutomationEngine:
    """Playwright automation engine for the Microsoft sign-in and e-mail OTP flow.

    One engine owns one Chromium browser, one browser context and one page.
    Call :meth:`start` first, then :meth:`login_microsoft` and
    :meth:`process_lead`, and always finish with :meth:`stop`.
    """

    # Submit control clicked after the e-mail, TAP, "Stay signed in?" and
    # "More information required" steps.
    _SUBMIT_SELECTOR = 'input[type="submit"], #idSIButton9'
    # Input that receives the Temporary Access Pass.
    _TAP_FIELD_SELECTOR = 'input[name="otc"], input[placeholder="Temporary Access Pass"]'

    def __init__(self):
        # All handles stay None until start() succeeds in creating them.
        self.playwright = None
        self.browser: Optional[Browser] = None
        self.context: Optional[BrowserContext] = None
        self.page: Optional[Page] = None

    async def start(self, headless: bool = True):
        """Launch Chromium and open a page that skips image, font and media requests.

        Args:
            headless: Forwarded to ``chromium.launch``.
        """
        self.playwright = await async_playwright().start()
        self.browser = await self.playwright.chromium.launch(
            headless=headless,
            args=[
                "--no-sandbox",
                "--disable-setuid-sandbox",
                "--disable-dev-shm-usage",
                "--disable-accelerated-2d-canvas",
                "--no-first-run",
                "--no-zygote",
                "--single-process",
                "--disable-gpu",
            ],
        )

        user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36"

        self.context = await self.browser.new_context(
            user_agent=user_agent,
            locale="en-US",
            timezone_id="UTC",
            viewport={"width": 1280, "height": 800},
        )
        self.page = await self.context.new_page()

        excluded_resource_types = {"image", "font", "media"}

        async def route_interceptor(route):
            # Abort requests for the excluded resource types, pass the rest on.
            if route.request.resource_type in excluded_resource_types:
                return await route.abort()
            return await route.continue_()

        await self.page.route("**/*", route_interceptor)

    async def stop(self):
        """Close the context, the browser and the Playwright driver.

        Each resource is closed independently: a failure while closing one
        is logged and does not prevent the others from being released.
        All handles are reset to None afterwards.
        """
        for attr, closer in (("context", "close"), ("browser", "close"), ("playwright", "stop")):
            resource = getattr(self, attr)
            if not resource:
                continue
            try:
                await getattr(resource, closer)()
            except Exception as e:
                # Best-effort cleanup: keep releasing the remaining resources.
                logger.warning(f"Error while closing {attr}: {e}")
            finally:
                setattr(self, attr, None)
        self.page = None

    # ------------------------------------------------------------------
    # Small private helpers
    # ------------------------------------------------------------------
    async def _is_visible(self, locator, timeout_ms: int = 0) -> bool:
        """Return True if ``locator`` becomes visible within ``timeout_ms``.

        ``Locator.is_visible(timeout=...)`` ignores its timeout and answers
        immediately, so the waiting is done with ``Locator.wait_for``.
        With ``timeout_ms <= 0`` the check is instantaneous.
        """
        if timeout_ms <= 0:
            return await locator.is_visible()

        from playwright.async_api import TimeoutError as PlaywrightTimeoutError

        try:
            await locator.wait_for(state="visible", timeout=timeout_ms)
        except PlaywrightTimeoutError:
            return False
        return True

    async def _submit(self):
        """Click the submit control, then press Enter (as the flow always did)."""
        await self.page.locator(self._SUBMIT_SELECTOR).click()
        await self.page.keyboard.press("Enter")

    async def _save_screenshot(self, prefix: str, email: str) -> Optional[str]:
        """Best-effort diagnostic screenshot.

        Returns the saved path, or None when the screenshot could not be
        taken. Failures are only logged at debug level.
        """
        try:
            os.makedirs("screenshots", exist_ok=True)
            path = f"screenshots/{prefix}_{email.split('@')[0]}.png"
            await self.page.screenshot(path=path)
            return path
        except Exception as e:
            logger.debug(f"Could not save '{prefix}' screenshot for {email}: {e}")
            return None

    async def _click_first_visible(self, selectors, timeout_ms: int, pause: float, message: str) -> Optional[str]:
        """Click the first selector that becomes visible.

        Args:
            selectors: Candidate selectors, tried in order.
            timeout_ms: How long to wait for each candidate.
            pause: Seconds to sleep after the click.
            message: Log template; ``{sel}`` is replaced by the selector.

        Returns:
            The selector that was clicked, or None if none was visible.
        """
        for sel in selectors:
            target = self.page.locator(sel)
            if await self._is_visible(target, timeout_ms):
                logger.info(message.format(sel=sel))
                await target.click()
                await asyncio.sleep(pause)
                return sel
        return None

    async def _switch_password_page_to_tap(self) -> bool:
        """Try to get from the password prompt to the TAP prompt.

        Returns True when a direct "use TAP" link was clicked or when the
        TAP input is visible after going through the "other ways" picker.
        """
        # 1) Direct link to the TAP prompt.
        switched = await self._click_first_visible(
            [
                'text="Use a Temporary Access Pass instead"',
                '#idA_PWD_SwitchToOTC',
                'a:has-text("Temporary Access Pass")',
            ],
            2000,
            3,
            "Switching to TAP via {sel}",
        )
        if switched:
            return True

        # 2) Open the sign-in method picker and choose TAP there.
        other_ways_selectors = [
            'text="Other ways to sign in"',
            'text="Sign in another way"',
            '#idA_PWD_SwitchToOther',
        ]
        for sel in other_ways_selectors:
            link = self.page.locator(sel)
            if not await self._is_visible(link, 2000):
                continue
            logger.info(f"Opening '{sel}' to find TAP option.")
            await link.click()
            await asyncio.sleep(2)

            tap_link = self.page.locator('text="Temporary Access Pass"')
            if await self._is_visible(tap_link, 3000):
                await tap_link.click()
                await asyncio.sleep(3)
                break

        # 3) Accept whatever got us to the TAP input (same selector the
        #    main loop uses to detect it).
        return await self._is_visible(self.page.locator(self._TAP_FIELD_SELECTOR), 2000)

    async def _complete_credential_prompts(self, email: str, temp_pass: str) -> bool:
        """Walk through the prompts that follow the e-mail step.

        Up to 10 rounds are made. A round ends the walk successfully when
        the TAP was submitted or the "Stay signed in?" prompt was confirmed.

        Returns:
            True once one of those two terminal steps was performed; False
            when the password page offers no TAP option or when no terminal
            step was reached within the 10 rounds.
        """
        for attempt in range(10):
            curr_url = self.page.url
            logger.info(f"Login Progress (Attempt {attempt+1}): {curr_url}")

            # TAP prompt: fill and submit.
            tap_field = self.page.locator(self._TAP_FIELD_SELECTOR)
            if await self._is_visible(tap_field, 3000):
                logger.info("TAP field detected.")
                await tap_field.fill(temp_pass)
                await self._submit()
                return True

            # Password prompt: try to switch over to TAP, then re-check.
            pwd_field = self.page.locator('input[type="password"]')
            if await self._is_visible(pwd_field, 1000):
                logger.warning("Microsoft asked for Password instead of TAP. Attempting to switch...")
                if await self._switch_password_page_to_tap():
                    continue
                logger.error("Could not find TAP option on password page.")
                return False

            # "Stay signed in?" prompt.
            if "kmsi" in curr_url.lower() or await self._is_visible(
                self.page.locator('text="Stay signed in?"'), 1000
            ):
                logger.info("Handling 'Stay signed in?' prompt.")
                await self._submit()
                return True

            # "More information required" interstitial.
            if "proofup" in curr_url.lower() or await self._is_visible(
                self.page.locator('text="More information required"'), 1000
            ):
                logger.info("Handling 'More information required' prompt.")
                await self._submit()
                await asyncio.sleep(5)
                continue

            # Account type picker.
            work_btn = self.page.locator('text="Work or school account"')
            if await self._is_visible(work_btn, 1000):
                logger.info("Selecting 'Work or school account'.")
                await work_btn.click()
                await asyncio.sleep(3)
                continue

            # Half-way diagnostic when still on the login host.
            if attempt == 5 and "login.microsoftonline.com" in curr_url:
                ss_path = await self._save_screenshot("stuck_login", email)
                if ss_path:
                    logger.warning(f"Login stuck. Diagnostic screenshot saved to {ss_path}")

            await asyncio.sleep(3)

        # Nothing was ever submitted: do not report this as a successful login.
        logger.error(f"Login for {email} never reached the TAP or 'Stay signed in?' prompt.")
        return False

    async def _prepare_mysignins_page(self, email: str) -> None:
        """Wait for the MySignins page to render, then try its Email controls."""
        logger.info(f"Detected MySignins environment. URL: {self.page.url}")

        # Poll (up to 10 x 3 s) until more than one button/link is present.
        for _ in range(10):
            elements = await self.page.locator("button, a").all_inner_texts()
            if len(elements) > 1:
                logger.info(f"MySignins elements loaded: {elements}")
                break
            logger.info("Waiting for MySignins buttons to appear...")
            await asyncio.sleep(3)

        await self._save_screenshot("mysignins", email)

        await self._click_first_visible(
            [
                'button:has-text("Email")',
                '[data-testid="email-method-button"]',
                'text="Email Receive a code to reset"',
                'button:has-text("Receive a code to reset")',
                'div[role="button"]:has-text("Email")',
            ],
            2000,
            5,
            "Clicking Email method via {sel}",
        )
        await self._click_first_visible(
            [
                'text="I want to set up a different method"',
                'text="Set up a different way to sign in"',
                'button:has-text("different way")',
                'button:has-text("different method")',
            ],
            2000,
            3,
            "Clicking transition link: {sel}",
        )

    async def _select_email_mfa_method(self, email: str) -> None:
        """Best-effort attempt to pick "Email" as the verification method.

        Any error is logged as a warning and swallowed, so this step never
        turns a completed sign-in into a failure.
        """
        logger.info(f"Attempting to select 'Email' as MFA method... (URL: {self.page.url})")
        try:
            if "mysignins" in self.page.url.lower():
                await self._prepare_mysignins_page(email)

            # Generic path: open the method chooser, then pick Email.
            found_method_link = await self._click_first_visible(
                [
                    'button[data-testid="choose-different-method-link"]',
                    'text="I want to set up a different method"',
                    'text="Set up a different way to sign in"',
                    'a:has-text("different method")',
                    '#otherWaysLink',
                    'text="I want to use a different method"',
                ],
                3000,
                3,
                "Found MFA method link: {sel}",
            )

            if found_method_link:
                await self._click_first_visible(
                    [
                        'button:has-text("Email")',
                        '[data-testid="email-method-button"]',
                        'text="Email Receive a code to reset"',
                        'div[role="button"]:has-text("Email")',
                    ],
                    5000,
                    5,
                    "Selecting Email method from list via {sel}",
                )
            else:
                logger.warning("Could not find 'Choose different method' link.")
                email_btn = self.page.locator('button:has-text("Email"), [data-testid="email-method-button"]')
                if await self._is_visible(email_btn, 2000):
                    logger.info("Found Email button as fallback. Clicking.")
                    await email_btn.click()
                    await asyncio.sleep(5)
        except Exception as e:
            logger.warning(f"Error during MFA method selection: {e}")

    # ------------------------------------------------------------------
    # Public flow
    # ------------------------------------------------------------------
    async def login_microsoft(self, email: str, temp_pass: str) -> bool:
        """Sign in with a Temporary Access Pass (TAP).

        Args:
            email: Account to sign in with.
            temp_pass: Temporary Access Pass for that account.

        Returns:
            True when the flow ran to the end; False when the TAP was
            rejected, no TAP option could be reached, the prompt loop never
            submitted anything, or any exception occurred (a
            ``login_fail_*`` screenshot is attempted in that case).
        """
        try:
            logger.info(f"Logging in as {email} using TAP flow...")
            login_url = (
                "https://login.microsoftonline.com/common/oauth2/v2.0/authorize?"
                "client_id=4765445b-32c6-49b0-83e6-1d93765276ca&"
                "redirect_uri=https%3A%2F%2Fwww.office.com%2Flandingv2&"
                "response_type=code%20id_token&"
                "scope=openid%20profile%20https%3A%2F%2Fwww.office.com%2Fv2%2FOfficeHome.All&"
                "response_mode=form_post"
            )
            await self.page.goto(login_url, timeout=60000)

            # Step 1: e-mail.
            email_field = self.page.locator('input[type="email"], input[name="loginfmt"]')
            await email_field.wait_for(timeout=30000)
            await email_field.fill(email)
            await asyncio.sleep(1)
            await self._submit()

            logger.info("Email submitted, waiting for redirect...")
            await asyncio.sleep(5)

            # Step 2: TAP / password / KMSI / proof-up prompts.
            if not await self._complete_credential_prompts(email, temp_pass):
                return False

            await asyncio.sleep(5)

            # Step 3: rejected TAP?
            if await self._is_visible(
                self.page.locator('text="Your Temporary Access Pass is incorrect."'), 2000
            ):
                logger.error(f"Incorrect Temporary Access Pass for {email}")
                return False

            # Step 4: security setup interstitial.
            if "proofup" in self.page.url.lower() or await self._is_visible(
                self.page.get_by_role("heading", name="Let's keep your account secure"), 5000
            ):
                logger.info("On security setup page. Clicking Next...")
                await self.page.get_by_role("button", name="Next").click()
                await asyncio.sleep(5)

            # Step 5: choose the Email method (best effort).
            await self._select_email_mfa_method(email)

            logger.info(f"Login flow completed for {email}. Final URL: {self.page.url}")
            return True
        except Exception as e:
            logger.error(f"Login failed for {email}: {e}")
            await self._save_screenshot("login_fail", email)
            return False

    async def process_lead(self, email: str) -> tuple[bool, Optional[str]]:
        """Run the e-mail OTP request for one lead.

        Args:
            email: Lead address to enter in the e-mail step.

        Returns:
            ``(True, None)`` when the OTP input appeared; otherwise
            ``(False, reason)`` where reason is ``"rate_limit"``, the text
            of the error bar, ``"otp_not_found"`` or the exception message
            (a ``lead_fail_*`` screenshot is attempted on exceptions).
        """
        try:
            # Wait (max 10 s) until either the "Email" method button or the
            # e-mail input is on screen, so that a page already showing the
            # input does not cost the full timeout. The button is clicked
            # only if it is actually there.
            email_btn = self.page.locator('button:has-text("Email")')
            entry_point = self.page.locator(
                'button:has-text("Email"), [data-testid="email-input"]'
            ).first
            if await self._is_visible(entry_point, 10000) and await email_btn.is_visible():
                await email_btn.click()

            email_input = self.page.locator('[data-testid="email-input"]')
            await email_input.wait_for(timeout=60000)
            await email_input.fill(email)
            await self.page.locator('[data-testid="reskin-step-next-button"]').click(timeout=60000)

            # Either the OTP input or an error bar must show up.
            await self.page.wait_for_selector(
                '[data-testid="email-verify-challenge-otp-input"], [data-testid="message-bar-error"]',
                timeout=60000
            )

            error_bar = self.page.locator('[data-testid="message-bar-error"]')
            if await error_bar.is_visible():
                text = await error_bar.inner_text()
                if "too many" in text.lower():
                    return False, "rate_limit"
                return False, text

            if await self.page.locator('[data-testid="email-verify-challenge-otp-input"]').is_visible():
                # Go back so the next lead starts from the e-mail step.
                await self.page.locator('[data-testid="backButton"]').click()
                logger.info(f"OTP Flow Success: {email}")
                return True, None

            return False, "otp_not_found"
        except Exception as e:
            await self._save_screenshot("lead_fail", email)
            return False, str(e)
|
|
|
|
| |
| |
| |
class WorkerTask:
    """One polling worker: claims an assigned account, signs in and sends its leads.

    Attributes:
        task_id: Identifier reported in results and in the heartbeat map.
        client: Redis client bound to the shared connection pool.
        leads_done / leads_failed: Per-task counters of processed leads.
        current_action: Human-readable description of what the task is doing.
    """

    def __init__(self, task_id: str):
        self.task_id = task_id
        self.client = redis.Redis(connection_pool=get_redis_pool())
        self.leads_done = 0
        self.leads_failed = 0
        self.current_action = "Starting"

    # ------------------------------------------------------------------
    # Status reporting
    # ------------------------------------------------------------------
    async def _set_action(self, action: str) -> None:
        """Record ``action`` locally and in the shared machine statistics.

        Publishing on every change (not only once per loop iteration) keeps
        the heartbeat's view in step with what the task is really doing.
        """
        self.current_action = action
        async with _stats_lock:
            _machine_stats["threads"][self.task_id] = action
            _machine_stats["total_done"] = sum(t.leads_done for t in _all_worker_tasks)
            _machine_stats["total_failed"] = sum(t.leads_failed for t in _all_worker_tasks)

    # ------------------------------------------------------------------
    # Lead intake
    # ------------------------------------------------------------------
    @staticmethod
    def _parse_lead(raw_lead: str) -> Optional[Dict[str, Any]]:
        """Decode one queued lead.

        Returns the decoded dict with ``email`` stripped and lower-cased, or
        None when the payload is not a JSON object or has no usable e-mail.
        """
        try:
            data = json.loads(raw_lead)
        except (TypeError, ValueError):
            return None
        if not isinstance(data, dict):
            return None
        lead_email = data.get("email")
        if not isinstance(lead_email, str):
            return None
        lead_email = lead_email.strip().lower()
        if not lead_email:
            return None
        data["email"] = lead_email
        return data

    async def _fetch_unique_batch(self, account_email: str) -> List[Dict[str, Any]]:
        """Take up to BATCH_SIZE leads from ``account_queue:<account>``.

        The read and the trim run in one MULTI/EXEC transaction, so a push
        happening in between cannot be trimmed away or read twice. Each lead
        is then claimed in the ``used:<account>`` set; leads that were
        already claimed are skipped. Malformed payloads are dropped.

        If claiming fails half-way, the leads not yet looked at are pushed
        back to the head of the account queue instead of being lost.

        Returns:
            A list of ``{"_raw": <original payload>, "data": <decoded dict>}``.
        """
        unique_leads: List[Dict[str, Any]] = []
        account_queue = f"account_queue:{account_email}"
        used_key = f"used:{account_email}"
        skipped = 0

        try:
            pipe = self.client.pipeline()  # transactional by default
            pipe.lrange(account_queue, 0, BATCH_SIZE - 1)
            pipe.ltrim(account_queue, BATCH_SIZE, -1)
            raw_leads, _ = await pipe.execute()
        except Exception as e:
            logger.error(f"Error fetching unique batch: {e}")
            return []

        for index, raw_lead in enumerate(raw_leads or []):
            data = self._parse_lead(raw_lead)
            if data is None:
                logger.warning(f"Dropping malformed lead payload from {account_queue}.")
                continue

            lead_email = data["email"]
            try:
                # SADD returns 0 when the member already existed, which
                # covers both "already sent" and "claimed concurrently".
                claimed = await self.client.sadd(used_key, lead_email)
            except Exception as e:
                logger.error(f"Error fetching unique batch: {e}")
                remaining = raw_leads[index:]
                try:
                    # LPUSH puts each value at the head, hence the reversal
                    # to restore the original order.
                    await self.client.lpush(account_queue, *reversed(remaining))
                except Exception as restore_error:
                    logger.error(
                        f"Could not return {len(remaining)} leads to {account_queue}: {restore_error}"
                    )
                break

            if not claimed:
                skipped += 1
                logger.debug(f"Skipping lead {lead_email} - already used by {account_email}")
                continue

            unique_leads.append({"_raw": raw_lead, "data": data})

        if skipped:
            logger.info(f"[Dedup] Skipped {skipped} duplicate leads in this batch fetch.")

        return unique_leads

    async def _requeue_leads(self, account_email: str, items: List[Dict[str, Any]]) -> None:
        """Give leads back to LEAD_QUEUE and release their ``used:`` claim.

        Items are flagged ``_skip`` / ``_handled`` only after Redis accepted
        the whole pipeline.
        """
        if not items:
            return
        pipe = self.client.pipeline()
        for item in items:
            pipe.lpush(LEAD_QUEUE, item["_raw"])
            pipe.srem(f"used:{account_email}", item["data"].get("email", ""))
        await pipe.execute()
        for item in items:
            item["_skip"] = True
            item["_handled"] = True

    # ------------------------------------------------------------------
    # Result reporting
    # ------------------------------------------------------------------
    async def _cleanup_and_push_results(self, leads: List[Dict[str, Any]], results: List[Dict[str, Any]], account_email: str):
        """Push results to RESULT_QUEUE and tidy per-lead bookkeeping.

        ``leads`` and ``results`` are paired by position. For a success the
        lead's retry counter is deleted. For a failure with fewer than
        MAX_RETRIES recorded retries, the lead's claim in
        ``used:<account>`` is released. Errors are logged, not raised.
        """
        if not leads:
            return
        try:
            pipe = self.client.pipeline()
            for lead, result in zip(leads, results):
                pipe.lpush(RESULT_QUEUE, json.dumps(result))

                lead_id = lead["data"].get("id", str(uuid.uuid4()))
                if result["success"]:
                    pipe.hdel(RETRY_COUNT_KEY, lead_id)
                    continue

                retries = int(result.get("_retries", 0))
                if retries < MAX_RETRIES:
                    lead_email = lead["data"].get("email", "")
                    pipe.srem(f"used:{account_email}", lead_email)

            await pipe.execute()
        except Exception as e:
            logger.error(f"Error in cleanup and push: {e}")

    # ------------------------------------------------------------------
    # Batch processing
    # ------------------------------------------------------------------
    async def _process_batch(self, engine, account_email: str, batch: List[Dict[str, Any]], completed: list) -> None:
        """Send every lead of ``batch`` through ``engine``.

        ``(item, result)`` pairs are appended to ``completed`` as they are
        produced, so the caller keeps them even if this method raises.
        A rate-limit or timeout error stops the batch and returns the
        remaining leads to LEAD_QUEUE.
        """
        for item_idx, item in enumerate(batch):
            lead_data = item["data"]
            lead_id = lead_data.get("id", str(uuid.uuid4()))
            lead_email = lead_data.get("email")

            await self._set_action(f"Processing: {lead_email}")

            # Bound before the try so the handler never sees a missing or
            # stale value from a previous lead.
            retries = 0
            try:
                retries = int(await self.client.hget(RETRY_COUNT_KEY, lead_id) or 0)
                start_time = time.time()
                success, err = await engine.process_lead(lead_email)
                elapsed = (time.time() - start_time) * 1000
            except Exception as e:
                logger.error(f"Error processing lead {lead_id}: {e}")
                retries += 1
                await self.client.hset(RETRY_COUNT_KEY, lead_id, retries)

                if retries >= MAX_RETRIES:
                    completed.append((item, {
                        "lead_id": lead_id,
                        "success": False,
                        "error": str(e),
                        "worker_id": self.task_id,
                        "row_number": lead_data.get("row_number"),
                        "_retries": retries,
                    }))
                    item["_handled"] = True
                else:
                    await self._requeue_leads(account_email, [item])
                continue

            completed.append((item, {
                "lead_id": lead_id,
                "success": success,
                "output": {"error": err} if not success else {},
                "error": err if not success else None,
                "processing_time_ms": elapsed,
                "worker_id": self.task_id,
                "row_number": lead_data.get("row_number"),
                "_retries": retries,
            }))
            item["_handled"] = True

            if success:
                self.leads_done += 1
                logger.info(f"[+] Sent lead: {lead_email} via {account_email}")
                continue

            self.leads_failed += 1
            logger.warning(f"[-] Failed lead: {lead_email}. Reason: {err}")

            abort_batch = False
            if err == "rate_limit":
                logger.warning("Rate limit hit — stopping batch early.")
                abort_batch = True
            elif err and "Timeout" in str(err):
                logger.warning("Page timeout detected (broken page) — stopping batch early.")
                abort_batch = True

            if abort_batch:
                await self._requeue_leads(account_email, batch[item_idx + 1:])
                break

    async def _run_once(self) -> None:
        """Claim one account (if any) and process one batch for it."""
        await self._set_action(self.current_action)

        # Blocks up to POLL_TIMEOUT seconds waiting for an assigned account.
        email = await self.client.brpoplpush(
            ACCOUNT_POOL_ASSIGNED, ACCOUNT_POOL_PROCESSING, timeout=POLL_TIMEOUT
        )
        if not email:
            await self._set_action("Idle")
            return

        acc_key = f"account:{email}"
        acc_pass = await self.client.hget(acc_key, "temp_pass")

        if not acc_pass:
            logger.warning(f"Account {email} has no temp_pass. Moving to pending_refresh.")
            await self.client.sadd("account_pool:pending_refresh", email)
            # Same bookkeeping as the empty-batch path below.
            await self.client.lrem(ACCOUNT_POOL_PROCESSING, -1, email)
            return

        await self._set_action("Fetching Leads")
        batch = await self._fetch_unique_batch(email)

        if not batch:
            # Nothing to do for this account: hand it back as ready.
            await self.client.lrem(ACCOUNT_POOL_PROCESSING, -1, email)
            await self.client.sadd(ACCOUNT_POOL_READY, email)
            await self._set_action("Idle")
            await asyncio.sleep(1)
            return

        logger.info(f"Task {self.task_id}: Account={email}, Leads in account_queue={len(batch)}")

        completed: list = []  # (item, result) pairs that will be reported
        login_success = False  # stays False if the engine never gets to log in
        engine = AutomationEngine()

        try:
            await engine.start(headless=True)

            await self._set_action(f"Logging in: {email}")
            login_success = await engine.login_microsoft(email, acc_pass)

            if login_success:
                await self._process_batch(engine, email, batch, completed)
        except Exception as e:
            logger.error(f"Engine exception: {e}")
        finally:
            try:
                await engine.stop()
            except Exception as e:
                logger.warning(f"Error while stopping engine: {e}")

        # Leads that were neither reported nor returned yet (login failure,
        # engine failure, or an error in the middle of the batch) go back
        # to the lead queue.
        leftover = [item for item in batch if not item.get("_handled")]
        if leftover:
            await self._requeue_leads(email, leftover)

        if not login_success:
            await self.client.sadd("account_pool:pending_refresh", email)
            logger.info(f"Account {email} login failed → pending_refresh.")
        else:
            actually_sent = sum(1 for _, result in completed if result["success"])
            leads_sent = await self.client.hincrby(acc_key, "leads_sent_today", actually_sent)

            now = time.time()
            cooldown_seconds = await get_cooldown_seconds(self.client)
            pipe = self.client.pipeline()
            pipe.hset(acc_key, "cooldown_until", now + cooldown_seconds)
            pipe.sadd(ACCOUNT_POOL_COOLDOWN, email)
            pipe.srem("account_pool:done_for_today", email)
            await pipe.execute()
            logger.info(
                f"Account {email} batch done ({leads_sent} leads total) "
                f"-> Cooldown {round(cooldown_seconds / 60, 1)}m."
            )

        # The account has left the processing stage in every case above;
        # mirrors what the empty-batch path does.
        await self.client.lrem(ACCOUNT_POOL_PROCESSING, -1, email)

        # Pairs are already aligned, so no id-based re-matching is needed.
        await self._cleanup_and_push_results(
            [item for item, _ in completed],
            [result for _, result in completed],
            email,
        )
        await self._set_action("Idle")

    async def run(self):
        """Poll for assigned accounts until shutdown is requested.

        Unexpected errors in one iteration (for example a Redis outage) are
        logged and followed by a short pause, so the task keeps running.
        """
        logger.info(f"Task {self.task_id} started. Batch Size: {BATCH_SIZE}")

        while not _shutdown_requested:
            try:
                await self._run_once()
            except Exception as e:
                logger.error(f"Task {self.task_id}: unexpected error in worker loop: {e}")
                await asyncio.sleep(POLL_TIMEOUT)
|
|
|
|
| |
# Every WorkerTask created by main(); read when aggregating counters.
_all_worker_tasks: list["WorkerTask"] = []


async def machine_heartbeat_loop():
    """Publish one heartbeat for the whole machine every 5 seconds.

    Two keys are written on each tick: ``worker:machine:<MACHINE_ID>`` with
    a 30-second expiry, and ``worker:machine:<MACHINE_ID>:snapshot`` without
    expiry. The loop ends once shutdown has been requested.

    A failed write is logged and retried on the next tick, so a transient
    Redis error does not end the heartbeat.
    """
    client = redis.Redis(connection_pool=get_redis_pool())
    key = f"worker:machine:{MACHINE_ID}"
    idle_states = ("Idle", "Starting")

    while not _shutdown_requested:
        # Copy the shared numbers under the lock, compute outside of it.
        async with _stats_lock:
            done = _machine_stats["total_done"]
            failed = _machine_stats["total_failed"]
            threads = dict(_machine_stats["threads"])

        total = done + failed
        success_rate = round((done / total * 100), 1) if total > 0 else 0
        busy_actions = [a for a in threads.values() if a not in idle_states]
        busy_count = len(busy_actions)

        payload = {
            "id": MACHINE_ID,
            "status": "busy" if busy_count > 0 else "online",
            "active_threads": busy_count,
            "max_threads": WORKER_TASKS,
            "leads_done": done,
            "leads_failed": failed,
            "success_rate": success_rate,
            # First non-idle action, or "Idle" when every task is idle.
            "current_action": busy_actions[0] if busy_actions else "Idle",
            "version": "3.0.0",
            "last_seen": time.time(),
        }
        blob = json.dumps(payload)

        try:
            await client.setex(key, 30, blob)
            await client.set(f"{key}:snapshot", blob)
        except Exception as e:
            logger.warning(f"Heartbeat publish failed: {e}")

        await asyncio.sleep(5)
|
|
|
|
async def main():
    """Start the heartbeat and WORKER_TASKS worker tasks, then wait for them.

    When RESET_SENT_DEDUP_DAILY is enabled, an expiry ending at the next
    UTC midnight is set on SENT_LEADS_SET before the workers start.
    """
    banner = "=" * 50
    logger.info(banner)
    logger.info(f"Starting Async Playwright Worker [{MACHINE_ID}]. Tasks: {WORKER_TASKS}")
    logger.info(banner)

    client = redis.Redis(connection_pool=get_redis_pool())
    if not RESET_SENT_DEDUP_DAILY:
        logger.info("SENT_LEADS_SET daily auto-reset disabled. Worker state stays stable across resets.")
    else:
        import datetime

        now = datetime.datetime.now(datetime.timezone.utc)
        midnight_today = now.replace(hour=0, minute=0, second=0, microsecond=0)
        tomorrow = midnight_today + datetime.timedelta(days=1)
        ttl = int((tomorrow - now).total_seconds())
        await client.expire(SENT_LEADS_SET, ttl)
        logger.info(f"SENT_LEADS_SET TTL set to {ttl}s (expires at UTC midnight).")

    # Register the tasks globally so the statistics can be aggregated.
    _all_worker_tasks.extend(
        WorkerTask(f"worker-{uuid.uuid4().hex[:6]}") for _ in range(WORKER_TASKS)
    )

    await asyncio.gather(
        machine_heartbeat_loop(),
        *(worker.run() for worker in _all_worker_tasks),
    )


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logger.info("Worker node shutting down.")
|
|