rdp / worker.py
kailash22's picture
Update worker.py
64c4dc0 verified
import os
import sys
import subprocess
def _ensure_dependencies():
try:
import redis
import playwright
except ImportError:
print("[-] Missing dependencies. Auto-installing 'redis' and 'playwright'...")
subprocess.check_call([sys.executable, "-m", "pip", "install", "redis", "playwright"])
print("[-] Installing Playwright chromium browser...")
subprocess.check_call([sys.executable, "-m", "playwright", "install", "chromium"])
print("[+] Installation complete. Restarting worker...")
os.execv(sys.executable, [sys.executable] + sys.argv)
_ensure_dependencies()
import json
import logging
import signal
import asyncio
import uuid
import time
import socket
from typing import Any, Dict, List, Optional
import redis.asyncio as redis
# Playwright imports
from playwright.async_api import async_playwright, Page, Browser, BrowserContext
# -----------------------------------------------------------------------------
# Configuration
# -----------------------------------------------------------------------------
REDIS_HOST = os.getenv("REDIS_HOST", "18.188.157.122")
REDIS_PORT = int(os.getenv("REDIS_PORT", "6379"))
REDIS_PASSWORD = os.getenv("REDIS_PASSWORD", None)
LEAD_QUEUE = "lead_queue"
PROCESSING_QUEUE = "processing_queue"
RESULT_QUEUE = "result_queue"
ACCOUNT_POOL_READY = "account_pool:ready"
ACCOUNT_POOL_ASSIGNED = "account_pool:assigned"
ACCOUNT_POOL_PROCESSING = "account_pool:processing"
ACCOUNT_POOL_COOLDOWN = "account_pool:cooldown"
RETRY_COUNT_KEY = "retry_count"
# Key that stores a SET of lead emails that are currently in-flight or already done.
# This is the deduplication guard: a lead email is added here before processing
# and removed only if it fails (so it can be retried). If it succeeds it stays
# forever (within the day) so it cannot be sent again.
SENT_LEADS_SET = "leads:sent_today"
WORKER_TASKS = int(os.getenv("WORKER_THREADS", "10"))
BATCH_SIZE = 10 # leads per account session
DEFAULT_COOLDOWN_SECONDS = int(os.getenv("DEFAULT_COOLDOWN_SECONDS", "600")) # 10 minutes between batches
RESET_SENT_DEDUP_DAILY = os.getenv("RESET_SENT_DEDUP_DAILY", "0").lower() in {"1", "true", "yes", "on"}
ADMIN_SETTINGS_FILE = os.getenv(
"ADMIN_SETTINGS_FILE",
r"C:\Users\PRADIP MUDI\Downloads\Telegram Desktop\lead_processor\admin_state.json"
)
MAX_RETRIES = 3
POLL_TIMEOUT = 5 # seconds to block on BRPOPLPUSH
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger("worker")
_shutdown_requested = False
# Machine identity (1 machine = 1 worker in dashboard)
MACHINE_ID = socket.gethostname()
# Shared stats across all coroutines on this machine
_machine_stats = {
"total_done": 0,
"total_failed": 0,
"threads": {}, # task_id -> current_action
}
_stats_lock = asyncio.Lock()
def _signal_handler(signum, frame):
global _shutdown_requested
logger.info("Shutdown signal received. Draining worker pool...")
_shutdown_requested = True
signal.signal(signal.SIGINT, _signal_handler)
signal.signal(signal.SIGTERM, _signal_handler)
_redis_pool = None
def get_redis_pool():
global _redis_pool
if _redis_pool is None:
_redis_pool = redis.ConnectionPool(
host=REDIS_HOST, port=REDIS_PORT, password=REDIS_PASSWORD,
decode_responses=True, max_connections=50
)
return _redis_pool
async def get_cooldown_seconds(client: redis.Redis) -> int:
try:
raw = await client.get("config:account_cooldown_seconds")
if raw is None:
if os.path.exists(ADMIN_SETTINGS_FILE):
with open(ADMIN_SETTINGS_FILE, "r", encoding="utf-8") as f:
settings = json.load(f).get("settings", {})
raw = settings.get("cooldown_seconds")
if raw is None:
return DEFAULT_COOLDOWN_SECONDS
value = int(raw)
return max(60, min(value, 24 * 3600))
except Exception as e:
logger.warning(f"Could not read cooldown setting, using default: {e}")
return DEFAULT_COOLDOWN_SECONDS
# -----------------------------------------------------------------------------
# Automation Engine
# -----------------------------------------------------------------------------
class AutomationEngine:
"""Robust Playwright automation engine"""
def __init__(self):
self.playwright = None
self.browser: Optional[Browser] = None
self.context: Optional[BrowserContext] = None
self.page: Optional[Page] = None
async def start(self, headless: bool = False):
self.playwright = await async_playwright().start()
browser_options = {
"headless": headless,
"args": [
"--no-sandbox",
"--disable-setuid-sandbox",
"--disable-dev-shm-usage",
"--disable-accelerated-2d-canvas",
"--no-first-run",
"--no-zygote",
"--single-process",
"--disable-gpu"
]
}
self.browser = await self.playwright.chromium.launch(**browser_options)
# Use a realistic User-Agent to avoid "Headless" detection
user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36"
self.context = await self.browser.new_context(
user_agent=user_agent,
locale="en-US",
timezone_id="UTC",
viewport={"width": 1280, "height": 800}
)
self.page = await self.context.new_page()
# Aggressive network blocking for super low bandwidth
excluded_resource_types = {"image", "font", "media"}
async def route_interceptor(route):
if route.request.resource_type in excluded_resource_types:
return await route.abort()
return await route.continue_()
await self.page.route("**/*", route_interceptor)
async def stop(self):
if self.context:
await self.context.close()
if self.browser:
await self.browser.close()
if self.playwright:
await self.playwright.stop()
async def is_visible_robust(self, locator, timeout_ms: int = 5000) -> bool:
"""Wait for a locator to be visible and return bool, without throwing."""
try:
await locator.wait_for(state="visible", timeout=timeout_ms)
return True
except:
return False
async def ensure_on_lead_form(self) -> bool:
"""Robustly ensure the browser is on the lead input form, handling transitions and reloads."""
try:
# 1. Check if already there
email_input = self.page.locator('[data-testid="email-input"]')
if await self.is_visible_robust(email_input, 3000):
return True
logger.info(f"Not on lead form (URL: {self.page.url}). Attempting to navigate...")
# 2. Handle 'mysignins' or security info redirects
if "mysignins" in self.page.url.lower() or "security" in self.page.url.lower():
# Try clicking "Set up a different way" transition links first
transition_selectors = [
'text="I want to set up a different method"',
'text="Set up a different way to sign in"',
'button:has-text("different way")',
'button:has-text("different method")',
'button[data-testid="choose-different-method-link"]',
'a:has-text("different method")',
'#otherWaysLink'
]
for sel in transition_selectors:
btn = self.page.locator(sel)
if await self.is_visible_robust(btn, 2000):
logger.info(f"Clicking transition link: {sel}")
await btn.click()
try:
await self.page.wait_for_load_state("networkidle", timeout=20000)
except Exception:
await self.page.wait_for_load_state("domcontentloaded", timeout=10000)
break
# 3. Look for "Email" method buttons
email_btns = [
'button:has-text("Email")',
'[data-testid="email-method-button"]',
'text="Email Receive a code to reset"',
'button:has-text("Receive a code to reset")',
'div[role="button"]:has-text("Email")',
'span:has-text("Email")'
]
for sel in email_btns:
btn = self.page.locator(sel)
if await self.is_visible_robust(btn, 3000):
logger.info(f"Clicking Email method via {sel}")
await btn.click()
try:
await self.page.wait_for_load_state("networkidle", timeout=20000)
except Exception:
await self.page.wait_for_load_state("domcontentloaded", timeout=10000)
if await self.is_visible_robust(email_input, 5000):
return True
# 4. Try clicking back button
back_btn = self.page.locator('[data-testid="backButton"]')
if await self.is_visible_robust(back_btn, 2000):
logger.info("Clicking back button...")
await back_btn.click()
await asyncio.sleep(2)
if await self.is_visible_robust(email_input, 3000):
return True
# 5. Final fallback: Reload
logger.warning("Still not on form. Reloading page...")
try:
await self.page.reload(timeout=60000, wait_until="domcontentloaded")
except Exception as e:
logger.warning(f"Reload timed out or failed: {e}")
await asyncio.sleep(5)
# Re-check after reload
if await self.is_visible_robust(email_input, 5000):
return True
# One more attempt at Email button after reload
for sel in email_btns:
btn = self.page.locator(sel)
if await self.is_visible_robust(btn, 3000):
await btn.click()
await asyncio.sleep(5)
break
return await self.is_visible_robust(email_input, 5000)
except Exception as e:
logger.error(f"Error in ensure_on_lead_form: {e}")
return False
async def login_microsoft(self, email: str, temp_pass: str) -> bool:
"""Robust Login using TAP (Temporary Access Pass) flow with branching logic"""
try:
logger.info(f"Logging in as {email} using TAP flow...")
login_url = (
"https://login.microsoftonline.com/common/oauth2/v2.0/authorize?"
"client_id=4765445b-32c6-49b0-83e6-1d93765276ca&"
"redirect_uri=https%3A%2F%2Fwww.office.com%2Flandingv2&"
"response_type=code%20id_token&"
"scope=openid%20profile%20https%3A%2F%2Fwww.office.com%2Fv2%2FOfficeHome.All&"
"response_mode=form_post"
)
await self.page.goto(login_url, timeout=90000)
# 1. Email Step
email_field = self.page.locator('input[type="email"], input[name="loginfmt"]').first
try:
await email_field.wait_for(state="visible", timeout=60000)
except Exception:
pass # allow fill to attempt and throw its own error
await email_field.fill(email, timeout=30000)
# Click and also press Enter to be sure
await self.page.locator('input[type="submit"], #idSIButton9').click()
await self.page.keyboard.press("Enter")
logger.info("Email submitted, waiting for redirect...")
# 2. Detect next state (TAP vs Password vs Security)
# Increase attempts to 15 for slower environments
for attempt in range(15):
curr_url = self.page.url
logger.info(f"Login Progress (Attempt {attempt+1}): {curr_url}")
# Case A: Temporary Access Pass field
tap_field = self.page.locator('input[name="otc"], input[placeholder="Temporary Access Pass"]')
if await self.is_visible_robust(tap_field, 3000):
logger.info("TAP field detected.")
await tap_field.fill(temp_pass)
await self.page.locator('input[type="submit"], #idSIButton9').click()
await self.page.keyboard.press("Enter")
break
# Case B: Password field
pwd_field = self.page.locator('input[type="password"]')
if await self.is_visible_robust(pwd_field, 1000):
logger.warning("Microsoft asked for Password instead of TAP. Attempting to switch...")
# Try direct TAP link or "Other ways"
tap_switch_selectors = [
'text="Use a Temporary Access Pass instead"',
'#idA_PWD_SwitchToOTC',
'a:has-text("Temporary Access Pass")',
'text="Other ways to sign in"',
'text="Sign in another way"',
'#idA_PWD_SwitchToOther'
]
for sel in tap_switch_selectors:
link = self.page.locator(sel)
if await self.is_visible_robust(link, 2000):
logger.info(f"Clicking switch option: {sel}")
await link.click()
await asyncio.sleep(3)
# Check if TAP link appeared after clicking "Other ways"
tap_link = self.page.locator('text="Temporary Access Pass"')
if await self.is_visible_robust(tap_link, 3000):
await tap_link.click()
await asyncio.sleep(3)
break
# Re-check if we are now on TAP page
if await self.is_visible_robust(self.page.locator('input[placeholder="Temporary Access Pass"]'), 2000):
continue
logger.error("Could not find TAP option on password page.")
return False
# Case C: "Stay signed in?"
if "kmsi" in curr_url.lower() or await self.is_visible_robust(self.page.locator('text="Stay signed in?"'), 1000):
logger.info("Handling 'Stay signed in?' prompt.")
await self.page.locator('input[type="submit"], #idSIButton9').click()
await self.page.keyboard.press("Enter")
break
# Case D: Security / More info required
if "proofup" in curr_url.lower() or await self.is_visible_robust(self.page.locator('text="More information required"'), 1000):
logger.info("Handling 'More information required' prompt.")
await self.page.locator('input[type="submit"], #idSIButton9').click()
await self.page.keyboard.press("Enter")
await asyncio.sleep(5)
continue
# Case E: Account Disambiguation (Work/School vs Personal)
work_btn = self.page.locator('text="Work or school account"')
if await self.is_visible_robust(work_btn, 1000):
logger.info("Selecting 'Work or school account'.")
await work_btn.click()
await asyncio.sleep(3)
continue
# Fallback: Warning if still on login page and stuck
if attempt == 7 and "login.microsoftonline.com" in curr_url:
logger.warning(f"Login stuck for {email}. URL: {curr_url}")
await asyncio.sleep(3)
# 3. Final verification and MFA Method Selection
await asyncio.sleep(5)
# Check for incorrect TAP
if await self.is_visible_robust(self.page.locator('text="Your Temporary Access Pass is incorrect."'), 5000):
logger.error(f"Incorrect Temporary Access Pass for {email}")
return False
# Case: "Keep your account secure" / "More information required"
if "proofup" in self.page.url.lower() or await self.is_visible_robust(self.page.get_by_role("heading", name="Let's keep your account secure"), 5000):
logger.info("On security setup page. Clicking Next...")
await self.page.get_by_role("button", name="Next").click()
try:
await self.page.wait_for_load_state("networkidle", timeout=20000)
except Exception:
await self.page.wait_for_load_state("domcontentloaded", timeout=10000)
# 4. Force "Email" method for lead processing
return await self.ensure_on_lead_form()
logger.info(f"Login flow completed for {email}. Final URL: {self.page.url}")
return True
except Exception as e:
logger.error(f"Login failed for {email}: {e}")
return False
async def process_lead(self, email: str) -> tuple[bool, Optional[str]]:
"""Process a single lead OTP flow with recovery logic"""
try:
# Ensure we are on a clean starting state
if not await self.ensure_on_lead_form():
logger.error(f"Could not reach lead form for {email}")
return False, "form_not_reachable"
# Wait for email input
email_input = self.page.locator('[data-testid="email-input"]')
await email_input.wait_for(timeout=45000)
await email_input.fill(email)
# Click and also press Enter to be sure
next_btn = self.page.locator('[data-testid="reskin-step-next-button"]')
await next_btn.click(timeout=45000)
await self.page.keyboard.press("Enter")
# Wait for OTP or Error
target_state = (
'[data-testid="email-verify-challenge-otp-input"], '
'[data-testid="message-bar-error"], '
'#usernameError, #i0116Error, '
'iframe[src*="arkoselabs"], iframe[src*="fc/a/"]'
)
try:
await self.page.wait_for_selector(target_state, timeout=60000)
except Exception as e:
return False, f"Timeout: {str(e)}"
# Handle known inline errors (invalid email, etc)
if await self.is_visible_robust(self.page.locator('#usernameError, #i0116Error'), 1000):
text = await self.page.locator('#usernameError, #i0116Error').first.inner_text()
# Must go back if it's an inline error so the form is ready for the next lead
await self.page.locator('[data-testid="backButton"], #idBtn_Back').first.click(timeout=3000)
await asyncio.sleep(1)
return False, f"Invalid Email: {text}"
# Handle Captcha
if await self.is_visible_robust(self.page.locator('iframe[src*="arkoselabs"], iframe[src*="fc/a/"]'), 1000):
return False, "captcha_detected"
if await self.is_visible_robust(self.page.locator('[data-testid="message-bar-error"]'), 1000):
text = await self.page.locator('[data-testid="message-bar-error"]').inner_text()
if "too many" in text.lower():
return False, "rate_limit"
return False, text
if await self.is_visible_robust(self.page.locator('[data-testid="email-verify-challenge-otp-input"]'), 1000):
# SUCCESS - but we MUST go back for the next lead
back_btn = self.page.locator('[data-testid="backButton"]')
if await self.is_visible_robust(back_btn, 5000):
await back_btn.click()
logger.info(f"OTP Flow Success: {email}")
return True, None
return False, "otp_not_found"
except Exception as e:
logger.error(f"Error in process_lead for {email}: {e}")
return False, str(e)
# -----------------------------------------------------------------------------
# Worker Async Task Logic
# -----------------------------------------------------------------------------
class WorkerTask:
def __init__(self, task_id: str):
self.task_id = task_id
self.client = redis.Redis(connection_pool=get_redis_pool())
self.leads_done = 0
self.leads_failed = 0
self.current_action = "Starting"
# ------------------------------------------------------------------
# Fetch exactly BATCH_SIZE UNIQUE leads from the queue.
# Uses a Redis SET (SENT_LEADS_SET) to skip leads already processed
# today. Each lead that passes the dedup check is atomically claimed
# via SETNX so no two threads can process the same email at once.
# ------------------------------------------------------------------
async def _fetch_unique_batch(self, account_email: str) -> List[Dict[str, Any]]:
"""
Pops leads from the isolated account_queue provided by the Assigner.
Global and Account-level dedup are already handled by Assigner,
but we still check locally as a secondary guard.
"""
unique_leads = []
account_queue = f"account_queue:{account_email}"
used_key = f"used:{account_email}"
skipped = 0
try:
# We pop up to BATCH_SIZE items from the account queue
raw_leads = await self.client.lrange(account_queue, 0, BATCH_SIZE - 1)
if not raw_leads:
return []
# Remove them from the list
await self.client.ltrim(account_queue, len(raw_leads), -1)
for raw_lead in raw_leads:
try:
data = json.loads(raw_lead)
except Exception:
continue
lead_email = data.get("email", "").strip().lower()
if not lead_email:
continue
# Secondary dedup guard
already_sent = await self.client.sismember(used_key, lead_email)
if already_sent:
skipped += 1
logger.debug(f"Skipping lead {lead_email} - already used by {account_email}")
continue
# Claim this lead for this account
claimed = await self.client.sadd(used_key, lead_email)
if not claimed:
skipped += 1
continue
data["email"] = lead_email
unique_leads.append({"_raw": raw_lead, "data": data})
except Exception as e:
logger.error(f"Error fetching unique batch: {e}")
if skipped:
logger.info(f"[Dedup] Skipped {skipped} duplicate leads in this batch fetch.")
return unique_leads
async def _cleanup_and_push_results(self, leads: List[Dict[str, Any]], results: List[Dict[str, Any]], account_email: str):
if not leads:
return
try:
pipe = self.client.pipeline()
for lead, result in zip(leads, results):
pipe.lpush(RESULT_QUEUE, json.dumps(result))
lead_id = lead["data"].get("id", str(uuid.uuid4()))
if result["success"]:
pipe.hdel(RETRY_COUNT_KEY, lead_id)
else:
# On failure, un-claim from SENT_LEADS_SET so it can be
# retried next time (unless max retries exceeded).
retries = int(result.get("_retries", 0))
if retries < MAX_RETRIES:
lead_email = lead["data"].get("email", "")
pipe.srem(f"used:{account_email}", lead_email)
await pipe.execute()
except Exception as e:
logger.error(f"Error in cleanup and push: {e}")
async def run(self):
logger.info(f"Task {self.task_id} started. Batch Size: {BATCH_SIZE}")
while not _shutdown_requested:
# Update shared machine stats
async with _stats_lock:
_machine_stats["threads"][self.task_id] = self.current_action
_machine_stats["total_done"] = sum(t.leads_done for t in _all_worker_tasks)
_machine_stats["total_failed"] = sum(t.leads_failed for t in _all_worker_tasks)
# ----------------------------------------------------------
# 1. Lease ONE account from Assigned pool (atomic BRPOPLPUSH)
# ----------------------------------------------------------
try:
email = await self.client.brpoplpush(ACCOUNT_POOL_ASSIGNED, ACCOUNT_POOL_PROCESSING, timeout=5)
except (redis.exceptions.ConnectionError, redis.exceptions.ResponseError) as e:
logger.warning(f"Redis blocking operation interrupted: {e}")
await asyncio.sleep(2)
continue
if not email:
self.current_action = "Idle"
continue
acc_key = f"account:{email}"
acc_pass = await self.client.hget(acc_key, "temp_pass")
if not acc_pass:
logger.warning(f"Account {email} has no temp_pass. Moving to pending_refresh.")
await self.client.sadd("account_pool:pending_refresh", email)
continue
# ----------------------------------------------------------
# 2. Fetch up to 10 UNIQUE leads
# ----------------------------------------------------------
self.current_action = "Fetching Leads"
batch = await self._fetch_unique_batch(email)
if not batch:
# No leads — return account to ready pool and clean up assignment
await self.client.lrem(ACCOUNT_POOL_PROCESSING, -1, email)
await self.client.sadd(ACCOUNT_POOL_READY, email)
self.current_action = "Idle"
await asyncio.sleep(1)
continue
logger.info(f"Task {self.task_id}: Account={email}, Leads in account_queue={len(batch)}")
results = []
engine = AutomationEngine()
login_success = False
try:
await engine.start(headless=True)
# ----------------------------------------------------------
# 3. Login once per account session
# ----------------------------------------------------------
self.current_action = f"Logging in: {email}"
login_success = await engine.login_microsoft(email, acc_pass)
for item_idx, item in enumerate(batch):
lead_data = item["data"]
lead_id = lead_data.get("id", str(uuid.uuid4()))
raw_data = item["_raw"]
lead_email = lead_data.get("email")
self.current_action = f"Processing: {lead_email}"
if not login_success:
# Login failed — requeue lead to lead_queue, unclaim, and abort account
pipe = self.client.pipeline()
pipe.lpush(LEAD_QUEUE, raw_data)
pipe.srem(f"used:{email}", lead_email) # ← un-claim so it can be retried
await pipe.execute()
results.append({
"lead_id": lead_id,
"success": False,
"error": "login_failed",
"worker_id": self.task_id,
"row_number": lead_data.get("row_number"),
"_skip": True,
})
item["_skip"] = True
continue
# --------------------------------------------------
# 4. Process each lead
# --------------------------------------------------
try:
retries = int(await self.client.hget(RETRY_COUNT_KEY, lead_id) or 0)
start_time = time.time()
success, err = await engine.process_lead(lead_email)
elapsed = (time.time() - start_time) * 1000
results.append({
"lead_id": lead_id,
"success": success,
"output": {"error": err} if not success else {},
"error": err if not success else None,
"processing_time_ms": elapsed,
"worker_id": self.task_id,
"row_number": lead_data.get("row_number"),
"_retries": retries,
})
if success:
self.leads_done += 1
logger.info(f"[+] Sent lead: {lead_email} via {email}")
else:
self.leads_failed += 1
logger.warning(f"[-] Failed lead: {lead_email}. Reason: {err}")
abort_batch = False
if err == "rate_limit":
logger.warning("Rate limit hit — stopping batch early.")
abort_batch = True
elif err == "captcha_detected":
logger.warning("Captcha detected — stopping batch early to avoid blocks.")
abort_batch = True
elif err and "Timeout" in str(err):
logger.warning("Page timeout detected (broken page) — stopping batch early.")
abort_batch = True
if abort_batch:
# Requeue remaining leads in the batch so they aren't wasted/lost
if item_idx + 1 < len(batch):
pipe = self.client.pipeline()
for future_item in batch[item_idx + 1:]:
pipe.lpush(LEAD_QUEUE, future_item["_raw"])
pipe.srem(f"used:{email}", future_item["data"].get("email", ""))
future_item["_skip"] = True
await pipe.execute()
break
except Exception as e:
logger.error(f"Error processing lead {lead_id}: {e}")
retries += 1
await self.client.hset(RETRY_COUNT_KEY, lead_id, retries)
if retries >= MAX_RETRIES:
results.append({
"lead_id": lead_id,
"success": False,
"error": str(e),
"worker_id": self.task_id,
"row_number": lead_data.get("row_number"),
"_retries": retries,
})
else:
# Requeue and un-claim
pipe = self.client.pipeline()
pipe.lpush(LEAD_QUEUE, raw_data)
pipe.srem(f"used:{email}", lead_email)
await pipe.execute()
item["_skip"] = True
except Exception as e:
logger.error(f"Engine exception: {e}")
finally:
await engine.stop()
# ----------------------------------------------------------
# 5. Account routing after batch
# ----------------------------------------------------------
if not login_success:
await self.client.sadd("account_pool:pending_refresh", email)
logger.info(f"Account {email} login failed → pending_refresh.")
else:
# Count only leads that were actually attempted (not skipped/requeued)
actually_sent = sum(
1 for r in results
if not r.get("_skip") and r["success"]
)
leads_sent = await self.client.hincrby(acc_key, "leads_sent_today", actually_sent)
now = time.time()
cooldown_seconds = await get_cooldown_seconds(self.client)
pipe = self.client.pipeline()
pipe.hset(acc_key, "cooldown_until", now + cooldown_seconds)
pipe.sadd(ACCOUNT_POOL_COOLDOWN, email)
pipe.srem("account_pool:done_for_today", email)
await pipe.execute()
logger.info(
f"Account {email} batch done ({leads_sent} leads total) "
f"-> Cooldown {round(cooldown_seconds / 60, 1)}m."
)
# ----------------------------------------------------------
# 6. Push results to result_queue and clean processing_queue
# ----------------------------------------------------------
final_batch = [b for b in batch if not b.get("_skip")]
valid_ids = {b["data"].get("id") for b in final_batch}
final_results = [r for r in results if r.get("lead_id") in valid_ids and not r.get("_skip")]
await self._cleanup_and_push_results(final_batch, final_results, email)
# Global list of all WorkerTask instances on this machine
_all_worker_tasks: list["WorkerTask"] = []
async def machine_heartbeat_loop():
"""Sends ONE heartbeat key for the whole machine every 5s."""
client = redis.Redis(connection_pool=get_redis_pool())
key = f"worker:machine:{MACHINE_ID}"
while not _shutdown_requested:
async with _stats_lock:
done = _machine_stats["total_done"]
failed = _machine_stats["total_failed"]
threads = dict(_machine_stats["threads"])
total = done + failed
success_rate = round((done / total * 100), 1) if total > 0 else 0
busy_count = sum(1 for a in threads.values() if a not in ("Idle", "Starting"))
payload = {
"id": MACHINE_ID,
"status": "busy" if busy_count > 0 else "online",
"active_threads": busy_count,
"max_threads": WORKER_TASKS,
"leads_done": done,
"leads_failed": failed,
"success_rate": success_rate,
"current_action": next(
(a for a in threads.values() if a not in ("Idle", "Starting")),
"Idle"
),
"version": "3.0.0",
"last_seen": time.time(),
}
blob = json.dumps(payload)
await client.setex(key, 30, blob)
await client.set(f"{key}:snapshot", blob)
await asyncio.sleep(5)
async def main():
logger.info("=" * 50)
logger.info(f"Starting Async Playwright Worker [{MACHINE_ID}]. Tasks: {WORKER_TASKS}")
logger.info("=" * 50)
client = redis.Redis(connection_pool=get_redis_pool())
if RESET_SENT_DEDUP_DAILY:
import datetime
now = datetime.datetime.now(datetime.timezone.utc)
tomorrow = now.replace(hour=0, minute=0, second=0, microsecond=0) + datetime.timedelta(days=1)
ttl = int((tomorrow - now).total_seconds())
await client.expire(SENT_LEADS_SET, ttl)
logger.info(f"SENT_LEADS_SET TTL set to {ttl}s (expires at UTC midnight).")
else:
logger.info("SENT_LEADS_SET daily auto-reset disabled. Worker state stays stable across resets.")
for i in range(WORKER_TASKS):
t = WorkerTask(f"worker-{uuid.uuid4().hex[:6]}")
_all_worker_tasks.append(t)
task_coroutines = [t.run() for t in _all_worker_tasks]
await asyncio.gather(machine_heartbeat_loop(), *task_coroutines)
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
logger.info("Worker node shutting down.")