# treel/backend/core/trae_bot.py
# Author: StarrySkyWorld
# Last commit: "Fix pricing goto abort handling" (f561a9d)
from playwright.sync_api import Page
from camoufox.sync_api import Camoufox
import time
from ext import mail_service
import random
import httpx
import qrcode
import io
import logging
import threading
import json
import os
import base64
import traceback
# Module-level logger used by TraeBot.log() for every session.
logger = logging.getLogger("TraeBot")


class LoginRequiredError(Exception):
    """Signal that a navigation ended on a login page instead of the target."""
class TraeBot:
    """Automate a Trae.ai sign-up in a Camoufox browser and capture the user token.

    One instance represents one registration session.  Progress is exposed
    through plain attributes (``status``, ``current_stage``, ``logs``,
    ``qr_code_url``, ``token_data``) and, optionally, pushed to ``callback``
    as ``callback(event_name, payload)`` with the event names ``"log"``,
    ``"token"`` and ``"qr_code"``.

    When a failure is captured, the ``last_*`` attributes hold a snapshot of
    the page (URL, title, truncated HTML, base64 PNG screenshot) together
    with the error text and traceback.
    """

    def __init__(self, session_id: str, callback=None):
        """Create an idle bot.

        Args:
            session_id: Identifier prefixed to every logger line.
            callback: Optional callable invoked as ``callback(event, payload)``.
        """
        self.session_id = session_id
        self.callback = callback
        self.mail = mail_service.MailService()
        # Set by the response listener in run() once a "GetUserToken"
        # response body has been read.
        self.ifGotToken = False
        self.token_data = None
        self.qr_code_url = None
        self.logs = []
        self.status = "idle"  # idle, running, waiting_for_scan, success, error
        self.current_stage = "Initializing"
        # Debug snapshot, filled by _safe_goto() / _capture_debug().
        self.last_error = None
        self.last_traceback = None
        self.last_url = None
        self.last_seen_url = None
        self.last_title = None
        self.last_html = None
        self.last_screenshot_png_b64 = None

    def log(self, message: str, level="info"):
        """Record a log entry, forward it to the callback and to ``logger``.

        Args:
            message: Text to record.
            level: ``"error"``, ``"warning"`` or ``"debug"``; any other value
                is emitted at INFO level.
        """
        entry = {"timestamp": time.time(), "message": message, "level": level}
        self.logs.append(entry)
        if self.callback:
            self.callback("log", entry)
        if level == "error":
            logger.error("[%s] %s", self.session_id, message)
        elif level == "warning":
            # The click/fill helpers in run() log at "warning"; without this
            # branch those entries were silently downgraded to INFO.
            logger.warning("[%s] %s", self.session_id, message)
        elif level == "debug":
            logger.debug("[%s] %s", self.session_id, message)
        else:
            logger.info("[%s] %s", self.session_id, message)

    def update_stage(self, stage: str):
        """Set the human-readable name of the current step."""
        self.current_stage = stage

    def _try_get_location_href(self, page):
        """Return ``location.href`` evaluated in the page, or None on any error."""
        try:
            return page.evaluate("() => location.href")
        except Exception:
            return None

    def _try_get_title(self, page):
        """Return the page title, or None on any error."""
        try:
            return page.title()
        except Exception:
            return None

    def _safe_visible(self, page, selector: str) -> bool:
        """Return whether the first match of ``selector`` is visible.

        Any exception raised while querying is treated as "not visible".
        """
        try:
            return page.locator(selector).first.is_visible(timeout=500)
        except Exception:
            return False

    def _is_login_page(self, page) -> bool:
        """Heuristically decide whether ``page`` currently shows a login form.

        Checks, in order: a login-like path fragment in the URL, a login-like
        title combined with a visible password field or login text, and
        finally a visible email + password input pair.
        """
        try:
            url = page.url
        except Exception:
            url = None
        if url and any(part in url for part in ("/login", "/sign-in", "/signin")):
            return True

        title = self._try_get_title(page)
        if title and any(x in title.lower() for x in ("log in", "login", "sign in")):
            if (
                self._safe_visible(page, "input[type='password']")
                or self._safe_visible(page, "text=Log in")
                or self._safe_visible(page, "text=Sign in")
            ):
                return True

        if self._safe_visible(page, "input[placeholder='Email']") and self._safe_visible(
            page, "input[type='password']"
        ):
            return True
        return False

    def _safe_goto(self, page, url: str, timeout: int = 40000):
        """Navigate to ``url``, retrying when the navigation is aborted.

        The navigation is attempted with ``wait_until`` set to ``"load"``,
        then ``"domcontentloaded"``, then ``"networkidle"``.  Only errors
        whose text contains ``NS_BINDING_ABORTED`` trigger the next attempt;
        everything else propagates immediately.  ``last_seen_url`` and
        ``last_title`` are refreshed before, after and on failure of each
        attempt.

        Args:
            page: Page object to navigate.
            url: Destination URL.
            timeout: Value passed as ``timeout`` to each ``page.goto`` call.

        Raises:
            LoginRequiredError: An aborted navigation left the browser on a
                page that looks like a login form.
            Exception: The non-abort error, or the last abort error once all
                strategies are exhausted.
        """
        self.last_seen_url = self._try_get_location_href(page) or getattr(page, "url", None)
        self.last_title = self._try_get_title(page)
        wait_untils = ("load", "domcontentloaded", "networkidle")
        last_exc = None
        for wait_until in wait_untils:
            try:
                self.log(f"Goto {url} (waitUntil={wait_until})", "debug")
                page.goto(url, timeout=timeout, wait_until=wait_until)
                self.last_seen_url = self._try_get_location_href(page) or getattr(page, "url", None)
                self.last_title = self._try_get_title(page)
                return
            except Exception as e:
                last_exc = e
                self.last_seen_url = self._try_get_location_href(page) or getattr(page, "url", None)
                self.last_title = self._try_get_title(page)
                if "NS_BINDING_ABORTED" not in str(e):
                    raise
                if self._is_login_page(page):
                    raise LoginRequiredError("Login required") from e
                # Short pause before retrying with the next wait strategy.
                time.sleep(0.5)
        if last_exc:
            raise last_exc

    def _capture_debug(self, page, exc: Exception):
        """Snapshot the page and the error into the ``last_*`` attributes.

        Intended to be called from inside an ``except`` block, because the
        traceback is taken from ``traceback.format_exc()``.  Each page query
        is best-effort: a failing query stores None instead of raising.

        Args:
            page: Page to inspect.
            exc: The exception being handled.
        """
        self.last_error = f"{type(exc).__name__}: {exc}"
        self.last_traceback = traceback.format_exc()
        try:
            self.last_url = getattr(page, "url", None)
        except Exception:
            self.last_url = None
        self.last_seen_url = self._try_get_location_href(page) or self.last_url
        self.last_title = self._try_get_title(page)
        try:
            html = page.content()
            # Cap the stored HTML at 100000 characters.
            if isinstance(html, str) and len(html) > 100000:
                html = html[:100000]
            self.last_html = html
        except Exception:
            self.last_html = None
        try:
            png = page.screenshot(full_page=True, type="png")
            if isinstance(png, (bytes, bytearray)):
                self.last_screenshot_png_b64 = base64.b64encode(png).decode("ascii")
        except Exception:
            self.last_screenshot_png_b64 = None

    def generate_password(self, length: int = 12) -> str:
        """Return a random password of ``length`` characters.

        Characters are drawn from ASCII letters, digits and ``!@#$%^&*()``
        using the operating system's CSPRNG (``random.SystemRandom``), since
        the result protects a real account.
        """
        characters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789!@#$%^&*()"
        rng = random.SystemRandom()
        return "".join(rng.choice(characters) for _ in range(length))

    def run(self):
        """Execute the whole registration flow.

        Steps: open the sign-up page, create a temporary mailbox, request and
        enter the verification code, set a generated password, submit, open
        the pricing page, pick the Alipay option, publish the QR-code URL and
        finally wait until a ``GetUserToken`` response has been captured.

        Exceptions are not propagated to the caller: on failure ``status``
        becomes ``"error"``, ``current_stage`` describes the failure and the
        ``last_*`` attributes hold a debug snapshot.  On success ``status``
        becomes ``"success"`` and ``token_data`` holds the response body.
        """
        self.status = "running"
        self.update_stage("Starting Browser")
        start_time = time.time()
        page = None
        try:
            self.log("Starting Browser and navigate to Trae.ai")
            with Camoufox(
                headless=os.environ.get("HEADLESS", "1") != "0",
                os="windows",
                geoip=True,
                window=(1280, 720),
                humanize=False,
            ) as browser:
                page = browser.new_page()

                def check_timeout():
                    """Abort after 60 s overall, except while waiting for the QR scan."""
                    if self.status != "waiting_for_scan" and time.time() - start_time > 60:
                        raise TimeoutError("Registration timed out (exceeded 60 seconds)")

                def click_btn(selectors: list):
                    """Click the first selector that works; failures are only logged."""
                    check_timeout()
                    for selector in selectors:
                        try:
                            page.click(selector)
                            self.log(f"Try to click {selector}", "debug")
                            break
                        except Exception as e:
                            self.log(f"Failed to click btn with selector {selector}: {e}", "warning")
                            continue

                def fill_input(selectors: list, value: str):
                    """Fill the first selector that works; failures are only logged."""
                    check_timeout()
                    for selector in selectors:
                        try:
                            page.fill(selector, value)
                            self.log(f"Try to fill {selector}", "debug")
                            break
                        except Exception as e:
                            self.log(f"Failed to fill {selector}: {e}", "warning")
                            continue

                def axis_input(asix: list, value: str):
                    """Click at viewport coordinates ``[x, y]`` and type ``value``."""
                    check_timeout()
                    x, y = asix
                    try:
                        page.mouse.move(x, y)
                        page.mouse.click(x, y)
                        page.keyboard.type(value)
                        self.log(f"Try to input {value} to {asix}", "debug")
                    except Exception as e:
                        self.log(f"Failed to input {value} to {asix}: {e}", "error")

                def get_element_src(selectors: list) -> str:
                    """Return the ``src`` of the first visible match, or ``""``."""
                    check_timeout()
                    for selector in selectors:
                        try:
                            if page.is_visible(selector):
                                src = page.get_attribute(selector, "src")
                                if src:
                                    self.log(f"Got src from {selector}: {src}", "debug")
                                    return src
                        except Exception:
                            continue
                    self.log("Failed to get src from any selectors", "error")
                    return ""

                def on_response(response):
                    """Capture the body of any response whose URL contains GetUserToken."""
                    self.log(f"Received response: {response.url}", "debug")
                    if "GetUserToken" in response.url:
                        try:
                            text = response.text()
                            self.log("Response body captured (token found)")
                            self.token_data = text
                            self.ifGotToken = True
                            if self.callback:
                                self.callback("token", text)
                        except Exception as e:
                            self.log(f"Failed to get response text: {e}", "error")

                check_timeout()
                self._safe_goto(page, "https://trae.ai/sign-up", timeout=40000)
                page.on("response", on_response)

                self.update_stage("Creating Email")
                self.log("Create temp email")
                email = self.mail.create_temp_email()
                if not email:
                    raise Exception("Failed to create email")
                self.log(f"Created temp email: {email}")

                check_timeout()
                self.log("Queue input email")
                page.wait_for_selector("div.sc-eqUAAy:nth-child(1) > div:nth-child(1)")
                MAIL_INPUT = [
                    "div.sc-eqUAAy:nth-child(1) > div:nth-child(1) > input:nth-child(1)",
                    "html.cc--elegant-black body div#root div.sc-gsFSXq.etLQat div.sc-kAyceB.iShCeB div.sc-dhKdcB.ddLexq div.sc-eqUAAy.cliMhU.email div.input-con.undefined input",
                    "xpath=/html/body/div/div[1]/div[2]/div[4]/div[1]/div[1]/input",
                ]
                fill_input(MAIL_INPUT, email)

                self.update_stage("Requesting Code")
                self.log("Try to click Send mail btn")
                SENDMAIL_BTN = [
                    ".send-code",
                    "html.cc--elegant-black body div#root div.sc-gsFSXq.etLQat div.sc-kAyceB.iShCeB div.sc-dhKdcB.ddLexq div.sc-eqUAAy.cliMhU.verification-code div.input-con.undefined div.right-part.send-code",
                    "xpath=/html/body/div/div[1]/div[2]/div[4]/div[2]/div[1]/div[2]",
                ]
                click_btn(SENDMAIL_BTN)

                self.update_stage("Waiting for Code")
                # Poll the mailbox in 5 s slices for up to 30 s so the global
                # timeout is still checked between polls.
                code_start_time = time.time()
                trae_code = None
                while time.time() - code_start_time < 30:
                    check_timeout()
                    trae_code = self.mail.wait_for_trae_code(email, timeout=5)
                    if trae_code:
                        break
                if trae_code:
                    self.log(f"Received Trae code: {trae_code}")
                else:
                    self.log("Failed to receive Trae code", "error")
                    raise Exception("Failed to receive Trae code")

                self.update_stage("Verifying Code")
                PASSWORD_INPUT = [
                    "div.sc-eqUAAy:nth-child(3) > div:nth-child(1) > input:nth-child(1)"
                ]
                password_element = page.query_selector(PASSWORD_INPUT[0])
                if password_element is None:
                    # Previously surfaced as an opaque AttributeError on None.
                    raise Exception("Password input not found; cannot locate the verification code field")
                password_input_asix = password_element.bounding_box()
                if password_input_asix:
                    self.log(f"Password input asix: {password_input_asix}")
                    # The code is typed at a point offset from the password
                    # input (10 px right, 54 px up) — presumably where the
                    # verification-code field sits; TODO confirm against the page.
                    axis_input(
                        [password_input_asix["x"] + 10, password_input_asix["y"] - 54],
                        trae_code,
                    )
                else:
                    self.log("Password input has no bounding box; verification code was not typed", "warning")

                self.update_stage("Setting Password")
                password = self.generate_password()
                self.log(f"Generated password: {password}")
                fill_input(PASSWORD_INPUT, password)

                # Raw XPath fallbacks below carry an explicit "xpath=" prefix:
                # Playwright only auto-detects XPath for selectors starting
                # with "//" or "..", so a bare "/html/..." is parsed as CSS.
                SIGNUP_BTN = [
                    "div.sc-gEvEer:nth-child(5)",
                    "html.cc--elegant-black body div#root div.sc-gsFSXq.etLQat div.sc-kAyceB.iShCeB div.sc-gEvEer.fQLTLP.mb-8.btn-submit.btn-large.trae__btn",
                    "xpath=/html/body/div/div[1]/div[2]/div[5]",
                ]
                click_btn(SIGNUP_BTN)

                try:
                    check_timeout()
                    self.update_stage("Finalizing Registration")
                    # NOTE(review): this waits for ".../account-setting" while
                    # the final navigation below targets ".../account-settings";
                    # confirm which spelling the site actually uses.
                    page.wait_for_url("https://www.trae.ai/account-setting")
                    self.log("Signup success")

                    self.update_stage("Claiming Gift")
                    self.log("Try to claim gift")
                    self._safe_goto(page, "https://www.trae.ai/pricing")
                    CLAIM_GIFT_BTN = [
                        ".button_brand_l-UJja6g",
                        "xpath=/html/body/div/div[1]/div[2]/div/div/div[1]/div[2]/div[2]/div[2]/div[4]/button",
                    ]
                    click_btn(CLAIM_GIFT_BTN)

                    ALIPAY_BTN = [
                        "div.pipo-card:nth-child(2) > div:nth-child(1) > div:nth-child(1) > div:nth-child(1) > div:nth-child(2) > div:nth-child(1) > div:nth-child(2) > label:nth-child(1) > div:nth-child(2)",
                        "xpath=/html/body/div[1]/div/div[2]/div/div[1]/div[2]/div[2]/div/div/div/div[2]/div[2]/div/div/div/div[2]/div/div[2]/label/div",
                    ]
                    page.wait_for_selector(ALIPAY_BTN[0])
                    click_btn(ALIPAY_BTN)

                    # Discard any token captured so far.
                    self.ifGotToken = False
                    self.token_data = None

                    ALIPAY_CONFIRM_BTN = [
                        ".pipo-button-preIcon",
                        "xpath=/html/body/div[1]/div/div[2]/div/div[1]/div[2]/div[2]/div/div/div/div[2]/div[2]/div/div[2]/div[2]/button",
                    ]
                    click_btn(ALIPAY_CONFIRM_BTN)

                    self.update_stage("Generating QR Code")
                    SUB_ELEMENT = [
                        ".sign-qr-code-image > img:nth-child(1)",
                        "xpath=/html/body/div[1]/div/div[2]/div[2]/div/img",
                    ]
                    page.wait_for_selector(SUB_ELEMENT[0])
                    qr_code_src = get_element_src(SUB_ELEMENT)
                    self.log(f"scan url to get free pro: {qr_code_src}")

                    if qr_code_src and qr_code_src.startswith("http"):
                        self.log("Resolving QR code redirect...")
                        with httpx.Client() as client:
                            response = client.get(qr_code_src, follow_redirects=True, timeout=15)
                            final_url = str(response.url)
                        self.log(f"QR Code Final URL: {final_url}")

                        self.qr_code_url = final_url
                        # From here on check_timeout() no longer enforces the
                        # 60 s limit.
                        self.status = "waiting_for_scan"
                        self.update_stage("Waiting for Scan")
                        if self.callback:
                            self.callback("qr_code", final_url)

                    self.log("Waiting for token capture...")
                    page.wait_for_selector(".pipo-button", timeout=1000000)

                    # Reset again so only a token fetched by the next
                    # navigation counts.
                    self.ifGotToken = False
                    self.token_data = None
                    self._safe_goto(page, "https://www.trae.ai/account-settings")
                except Exception as e:
                    if page:
                        self._capture_debug(page, e)
                    self.log(f"Process failed inside browser: {e}", "error")
                    self.status = "error"
                    raise

                # Keep the browser open until the response listener has seen
                # the token.  page.wait_for_timeout() is used instead of
                # time.sleep() so Playwright keeps dispatching events (and
                # therefore on_response) while we wait.
                while not self.ifGotToken:
                    page.wait_for_timeout(4000)
                    self.log("Waiting for token...")

                self.status = "success"
                self.update_stage("Success")
                self.log("Registration process completed successfully")
        except LoginRequiredError as e:
            if page and not self.last_error:
                self._capture_debug(page, e)
            self.log(f"Process failed: {e}", "error")
            self.status = "error"
            self.update_stage("Login required")
        except Exception as e:
            if page and not self.last_error:
                self._capture_debug(page, e)
            self.log(f"Process failed: {e}", "error")
            self.status = "error"
            self.update_stage(f"Failed: {str(e)}")