Spaces:
Paused
Paused
| from playwright.sync_api import Page | |
| from camoufox.sync_api import Camoufox | |
| import time | |
| from ext import mail_service | |
| import random | |
| import httpx | |
| import qrcode | |
| import io | |
| import logging | |
| import threading | |
| import json | |
| import os | |
| import base64 | |
| import traceback | |
| logger = logging.getLogger("TraeBot") | |
| class LoginRequiredError(Exception): | |
| pass | |
| class TraeBot: | |
| def __init__(self, session_id: str, callback=None): | |
| self.session_id = session_id | |
| self.callback = callback | |
| self.mail = mail_service.MailService() | |
| self.ifGotToken = False | |
| self.token_data = None | |
| self.qr_code_url = None | |
| self.logs = [] | |
| self.status = "idle" # idle, running, waiting_for_scan, success, error | |
| self.current_stage = "Initializing" | |
| self.last_error = None | |
| self.last_traceback = None | |
| self.last_url = None | |
| self.last_seen_url = None | |
| self.last_title = None | |
| self.last_html = None | |
| self.last_screenshot_png_b64 = None | |
| def log(self, message: str, level="info"): | |
| entry = {"timestamp": time.time(), "message": message, "level": level} | |
| self.logs.append(entry) | |
| if self.callback: | |
| self.callback("log", entry) | |
| if level == "error": | |
| logger.error(f"[{self.session_id}] {message}") | |
| elif level == "debug": | |
| logger.debug(f"[{self.session_id}] {message}") | |
| else: | |
| logger.info(f"[{self.session_id}] {message}") | |
| def update_stage(self, stage: str): | |
| self.current_stage = stage | |
| def _try_get_location_href(self, page): | |
| try: | |
| return page.evaluate("() => location.href") | |
| except Exception: | |
| return None | |
| def _try_get_title(self, page): | |
| try: | |
| return page.title() | |
| except Exception: | |
| return None | |
| def _safe_visible(self, page, selector: str) -> bool: | |
| try: | |
| return page.locator(selector).first.is_visible(timeout=500) | |
| except Exception: | |
| return False | |
| def _is_login_page(self, page) -> bool: | |
| url = None | |
| try: | |
| url = page.url | |
| except Exception: | |
| url = None | |
| if url and any(part in url for part in ("/login", "/sign-in", "/signin")): | |
| return True | |
| title = self._try_get_title(page) | |
| if title and any(x in title.lower() for x in ("log in", "login", "sign in")): | |
| if self._safe_visible(page, "input[type='password']") or self._safe_visible(page, "text=Log in") or self._safe_visible(page, "text=Sign in"): | |
| return True | |
| if self._safe_visible(page, "input[placeholder='Email']") and self._safe_visible(page, "input[type='password']"): | |
| return True | |
| return False | |
| def _safe_goto(self, page, url: str, timeout: int = 40000): | |
| self.last_seen_url = self._try_get_location_href(page) or getattr(page, "url", None) | |
| self.last_title = self._try_get_title(page) | |
| wait_untils = ("load", "domcontentloaded", "networkidle") | |
| last_exc = None | |
| for wait_until in wait_untils: | |
| try: | |
| self.log(f"Goto {url} (waitUntil={wait_until})", "debug") | |
| page.goto(url, timeout=timeout, wait_until=wait_until) | |
| self.last_seen_url = self._try_get_location_href(page) or getattr(page, "url", None) | |
| self.last_title = self._try_get_title(page) | |
| return | |
| except Exception as e: | |
| last_exc = e | |
| self.last_seen_url = self._try_get_location_href(page) or getattr(page, "url", None) | |
| self.last_title = self._try_get_title(page) | |
| if "NS_BINDING_ABORTED" not in str(e): | |
| raise | |
| if self._is_login_page(page): | |
| raise LoginRequiredError("Login required") from e | |
| time.sleep(0.5) | |
| if last_exc: | |
| raise last_exc | |
| def _capture_debug(self, page, exc: Exception): | |
| self.last_error = f"{type(exc).__name__}: {exc}" | |
| self.last_traceback = traceback.format_exc() | |
| try: | |
| self.last_url = getattr(page, "url", None) | |
| except Exception: | |
| self.last_url = None | |
| self.last_seen_url = self._try_get_location_href(page) or self.last_url | |
| self.last_title = self._try_get_title(page) | |
| try: | |
| html = page.content() | |
| if isinstance(html, str) and len(html) > 100000: | |
| html = html[:100000] | |
| self.last_html = html | |
| except Exception: | |
| self.last_html = None | |
| try: | |
| png = page.screenshot(full_page=True, type="png") | |
| if isinstance(png, (bytes, bytearray)): | |
| self.last_screenshot_png_b64 = base64.b64encode(png).decode("ascii") | |
| except Exception: | |
| self.last_screenshot_png_b64 = None | |
| def generate_password(self, length: int = 12) -> str: | |
| characters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789!@#$%^&*()" | |
| return "".join(random.choice(characters) for _ in range(length)) | |
| def run(self): | |
| self.status = "running" | |
| self.update_stage("Starting Browser") | |
| start_time = time.time() | |
| page = None | |
| try: | |
| self.log("Starting Browser and navigate to Trae.ai") | |
| with Camoufox( | |
| headless=os.environ.get("HEADLESS", "1") != "0", | |
| os="windows", | |
| geoip=True, | |
| window=(1280, 720), | |
| humanize=False | |
| ) as browser: | |
| page = browser.new_page() | |
| def check_timeout(): | |
| if self.status != "waiting_for_scan" and time.time() - start_time > 60: | |
| raise TimeoutError("Registration timed out (exceeded 60 seconds)") | |
| def click_btn(selectors: list): | |
| check_timeout() | |
| for selector in selectors: | |
| try: | |
| page.click(selector) | |
| self.log(f"Try to click {selector}", "debug") | |
| break | |
| except Exception as e: | |
| self.log(f"Failed to click btn with selector {selector}: {e}", "warning") | |
| continue | |
| def fill_input(selectors: list, value: str): | |
| check_timeout() | |
| for selector in selectors: | |
| try: | |
| page.fill(selector, value) | |
| self.log(f"Try to fill {selector}", "debug") | |
| break | |
| except Exception as e: | |
| self.log(f"Failed to fill {selector}: {e}", "warning") | |
| continue | |
| def axis_input(asix: list, value: str): | |
| check_timeout() | |
| x, y = asix | |
| try: | |
| page.mouse.move(x, y) | |
| page.mouse.click(x, y) | |
| page.keyboard.type(value) | |
| self.log(f"Try to input {value} to {asix}", "debug") | |
| except Exception as e: | |
| self.log(f"Failed to input {value} to {asix}: {e}", "error") | |
| def get_element_src(selectors: list) -> str: | |
| check_timeout() | |
| for selector in selectors: | |
| try: | |
| if page.is_visible(selector): | |
| src = page.get_attribute(selector, "src") | |
| if src: | |
| self.log(f"Got src from {selector}: {src}", "debug") | |
| return src | |
| except Exception: | |
| continue | |
| self.log(f"Failed to get src from any selectors", "error") | |
| return "" | |
| def on_response(response): | |
| self.log(f"Received response: {response.url}", "debug") | |
| if "GetUserToken" in response.url: | |
| try: | |
| text = response.text() | |
| self.log(f"Response body captured (token found)") | |
| self.token_data = text | |
| self.ifGotToken = True | |
| if self.callback: | |
| self.callback("token", text) | |
| except Exception as e: | |
| self.log(f"Failed to get response text: {e}", "error") | |
| check_timeout() | |
| self._safe_goto(page, "https://trae.ai/sign-up", timeout=40000) | |
| page.on("response", on_response) | |
| self.update_stage("Creating Email") | |
| self.log("Create temp email") | |
| email = self.mail.create_temp_email() | |
| if not email: | |
| raise Exception("Failed to create email") | |
| self.log(f"Created temp email: {email}") | |
| check_timeout() | |
| self.log("Queue input email") | |
| page.wait_for_selector("div.sc-eqUAAy:nth-child(1) > div:nth-child(1)") | |
| MAIL_INPUT = [ | |
| "div.sc-eqUAAy:nth-child(1) > div:nth-child(1) > input:nth-child(1)", | |
| "html.cc--elegant-black body div#root div.sc-gsFSXq.etLQat div.sc-kAyceB.iShCeB div.sc-dhKdcB.ddLexq div.sc-eqUAAy.cliMhU.email div.input-con.undefined input", | |
| "xpath=/html/body/div/div[1]/div[2]/div[4]/div[1]/div[1]/input" | |
| ] | |
| fill_input(MAIL_INPUT, email) | |
| self.update_stage("Requesting Code") | |
| self.log("Try to click Send mail btn") | |
| SENDMAIL_BTN = [ | |
| ".send-code", | |
| "html.cc--elegant-black body div#root div.sc-gsFSXq.etLQat div.sc-kAyceB.iShCeB div.sc-dhKdcB.ddLexq div.sc-eqUAAy.cliMhU.verification-code div.input-con.undefined div.right-part.send-code", | |
| "xpath=/html/body/div/div[1]/div[2]/div[4]/div[2]/div[1]/div[2]" | |
| ] | |
| click_btn(SENDMAIL_BTN) | |
| self.update_stage("Waiting for Code") | |
| # Wait for code loop with timeout check | |
| code_start_time = time.time() | |
| trae_code = None | |
| while time.time() - code_start_time < 30: # Inner timeout for code | |
| check_timeout() # Check global timeout | |
| trae_code = self.mail.wait_for_trae_code(email, timeout=5) | |
| if trae_code: | |
| break | |
| if trae_code: | |
| self.log(f"Received Trae code: {trae_code}") | |
| else: | |
| self.log("Failed to receive Trae code", "error") | |
| raise Exception("Failed to receive Trae code") | |
| self.update_stage("Verifying Code") | |
| PASSWORD_INPUT = [ | |
| "div.sc-eqUAAy:nth-child(3) > div:nth-child(1) > input:nth-child(1)" | |
| ] | |
| password_input_asix = page.query_selector(PASSWORD_INPUT[0]).bounding_box() | |
| if password_input_asix: | |
| self.log(f"Password input asix: {password_input_asix}") | |
| axis_input([password_input_asix["x"]+10, password_input_asix["y"]-54], trae_code) | |
| self.update_stage("Setting Password") | |
| password = self.generate_password() | |
| self.log(f"Generated password: {password}") | |
| fill_input(PASSWORD_INPUT, password) | |
| SIGNUP_BTN = [ | |
| "div.sc-gEvEer:nth-child(5)", | |
| "html.cc--elegant-black body div#root div.sc-gsFSXq.etLQat div.sc-kAyceB.iShCeB div.sc-gEvEer.fQLTLP.mb-8.btn-submit.btn-large.trae__btn", | |
| "/html/body/div/div[1]/div[2]/div[5]" | |
| ] | |
| click_btn(SIGNUP_BTN) | |
| try: | |
| check_timeout() | |
| self.update_stage("Finalizing Registration") | |
| page.wait_for_url("https://www.trae.ai/account-setting") | |
| self.log("Signup success") | |
| self.update_stage("Claiming Gift") | |
| self.log("Try to claim gift") | |
| self._safe_goto(page, "https://www.trae.ai/pricing") | |
| CLAIM_GIFT_BTN = [ | |
| ".button_brand_l-UJja6g", | |
| "/html/body/div/div[1]/div[2]/div/div/div[1]/div[2]/div[2]/div[2]/div[4]/button" | |
| ] | |
| click_btn(CLAIM_GIFT_BTN) | |
| ALIPAY_BTN = [ | |
| "div.pipo-card:nth-child(2) > div:nth-child(1) > div:nth-child(1) > div:nth-child(1) > div:nth-child(2) > div:nth-child(1) > div:nth-child(2) > label:nth-child(1) > div:nth-child(2)", | |
| "/html/body/div[1]/div/div[2]/div/div[1]/div[2]/div[2]/div/div/div/div[2]/div[2]/div/div/div/div[2]/div/div[2]/label/div" | |
| ] | |
| page.wait_for_selector(ALIPAY_BTN[0]) | |
| click_btn(ALIPAY_BTN) | |
| self.ifGotToken = False | |
| self.token_data = None | |
| ALIPAY_CONFIRM_BTN = [ | |
| ".pipo-button-preIcon", | |
| "/html/body/div[1]/div/div[2]/div/div[1]/div[2]/div[2]/div/div/div/div[2]/div[2]/div/div[2]/div[2]/button" | |
| ] | |
| click_btn(ALIPAY_CONFIRM_BTN) | |
| self.update_stage("Generating QR Code") | |
| SUB_ELEMENT = [ | |
| ".sign-qr-code-image > img:nth-child(1)", | |
| "/html/body/div[1]/div/div[2]/div[2]/div/img" | |
| ] | |
| page.wait_for_selector(SUB_ELEMENT[0]) | |
| qr_code_src = get_element_src(SUB_ELEMENT) | |
| self.log(f"scan url to get free pro: {qr_code_src}") | |
| if qr_code_src and qr_code_src.startswith("http"): | |
| self.log("Resolving QR code redirect...") | |
| with httpx.Client() as client: | |
| response = client.get(qr_code_src, follow_redirects=True, timeout=15) | |
| final_url = str(response.url) | |
| self.log(f"QR Code Final URL: {final_url}") | |
| self.qr_code_url = final_url | |
| self.status = "waiting_for_scan" | |
| self.update_stage("Waiting for Scan") | |
| if self.callback: | |
| self.callback("qr_code", final_url) | |
| self.log("Waiting for token capture...") | |
| page.wait_for_selector(".pipo-button", timeout=1000000) | |
| self.ifGotToken = False | |
| self.token_data = None | |
| self._safe_goto(page, "https://www.trae.ai/account-settings") | |
| except Exception as e: | |
| if page: | |
| self._capture_debug(page, e) | |
| self.log(f"Process failed inside browser: {e}", "error") | |
| self.status = "error" | |
| raise e | |
| # Keep browser open if we are waiting for token | |
| while not self.ifGotToken: | |
| time.sleep(4) | |
| self.log("Waiting for token...") | |
| self.status = "success" | |
| self.update_stage("Success") | |
| self.log("Registration process completed successfully") | |
| except LoginRequiredError as e: | |
| if page and not self.last_error: | |
| self._capture_debug(page, e) | |
| self.log(f"Process failed: {e}", "error") | |
| self.status = "error" | |
| self.update_stage("Login required") | |
| except Exception as e: | |
| if page and not self.last_error: | |
| self._capture_debug(page, e) | |
| self.log(f"Process failed: {e}", "error") | |
| self.status = "error" | |
| self.update_stage(f"Failed: {str(e)}") | |