"""Cookie generator service.

Registers throw-away accounts on you.com using temporary mail.gw mailboxes,
drives the sign-in form through headless Chrome, appends the resulting
session cookies to ``cookies.txt`` and exposes start/stop/list controls over
a small FastAPI application.
"""
import asyncio
import json
import logging
import os
import random
import re
import string
import time

import requests
import uvicorn
from fastapi import FastAPI, BackgroundTasks
from selenium import webdriver
from selenium.common.exceptions import TimeoutException
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
from webdriver_manager.chrome import ChromeDriverManager

# Logging configuration
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Upper bound (seconds) for every HTTP call to the mail API so that a stalled
# connection cannot hang a registration run forever.
REQUEST_TIMEOUT = 15

# File that collected cookies are appended to and that /cookies reads back.
COOKIE_FILE = 'cookies.txt'

# Six-digit verification code, compiled once because it is used in a retry loop.
_CODE_PATTERN = re.compile(r'\b\d{6}\b')


class MailGw:
    """Minimal client for the mail.gw temporary mailbox HTTP API."""

    def __init__(self):
        self.base_url = "https://api.mail.gw"
        self.token = None      # bearer token, set by create_email()
        self.email = None      # address of the created mailbox
        self.password = "Temp123!@#"

    def create_email(self):
        """Create a random mailbox and authenticate against it.

        Returns:
            The new e-mail address, or ``None`` if any step failed (the
            failure is logged).
        """
        try:
            # Fetch the list of domains the service currently offers.
            domains_response = requests.get(
                f"{self.base_url}/domains", timeout=REQUEST_TIMEOUT
            )
            domains = domains_response.json()['hydra:member']
            logger.info(f"可用域名列表: {[d['domain'] for d in domains]}")

            # Pick one domain at random.
            domain = random.choice(domains)['domain']
            logger.info(f"选择的域名: {domain}")

            # Build a random local part for the address.
            random_name = ''.join(
                random.choices(string.ascii_lowercase + string.digits, k=10)
            )
            email = f"{random_name}@{domain}"
            credentials = {"address": email, "password": self.password}

            # Create the mailbox account.
            account_response = requests.post(
                f"{self.base_url}/accounts",
                json=credentials,
                timeout=REQUEST_TIMEOUT,
            )
            if account_response.status_code != 201:
                logger.error(f"创建邮箱失败: {account_response.text}")
                return None
            self.email = email

            # Exchange the credentials for a bearer token.
            token_response = requests.post(
                f"{self.base_url}/token",
                json=credentials,
                timeout=REQUEST_TIMEOUT,
            )
            if token_response.status_code != 200:
                logger.error(f"获取token失败: {token_response.text}")
                return None

            self.token = token_response.json()['token']
            logger.info(f"创建邮箱成功: {email}")
            return email
        except Exception as e:
            logger.error(f"创建邮箱时发生错误: {str(e)}")
            return None

    def get_verification_code(self, max_retries=10, delay=2):
        """Poll the mailbox until a six-digit code shows up.

        Args:
            max_retries: Number of polling attempts.
            delay: Seconds to sleep between attempts.

        Returns:
            The code as a string, or ``None`` if there is no token or no code
            was found within ``max_retries`` attempts.
        """
        if not self.token:
            logger.error("Token不存在")
            return None

        headers = {"Authorization": f"Bearer {self.token}"}
        for attempt in range(1, max_retries + 1):
            try:
                # List the messages in the mailbox.
                messages_response = requests.get(
                    f"{self.base_url}/messages",
                    headers=headers,
                    timeout=REQUEST_TIMEOUT,
                )
                messages = messages_response.json()['hydra:member']
                if messages:
                    # Only the first listed message is inspected.
                    message_id = messages[0]['id']
                    message_response = requests.get(
                        f"{self.base_url}/messages/{message_id}",
                        headers=headers,
                        timeout=REQUEST_TIMEOUT,
                    )
                    # A missing/None body is treated as "no code yet"
                    # instead of raising inside re.search.
                    text = message_response.json().get('text') or ''
                    match = _CODE_PATTERN.search(text)
                    if match:
                        verification_code = match.group(0)
                        logger.info(f"获取到验证码: {verification_code}")
                        return verification_code
            except Exception as e:
                logger.error(f"获取验证码时发生错误: {str(e)}")

            # No point in sleeping after the final attempt.
            if attempt < max_retries:
                time.sleep(delay)
                logger.info(f"等待验证码,重试第 {attempt} 次")

        logger.error("未能获取到验证码")
        return None


def get_cookie(driver):
    """Read the relevant cookies from the page and append them to COOKIE_FILE.

    Only cookies whose names match the prefixes listed in the script are kept.

    Returns:
        The ``; ``-joined cookie string (possibly empty), or ``None`` on error.
    """
    try:
        cookie_script = """
            var cookieString = document.cookie;
            var essentialCookies = cookieString.split('; ').filter(function(cookie) {
                return cookie.startsWith('daily_query_') ||
                       cookie.startsWith('DSR=') ||
                       cookie.startsWith('DS=') ||
                       cookie.startsWith('uuid_guest=') ||
                       cookie.startsWith('ai_model=');
            }).join('; ');
            console.log('Essential Cookies:', essentialCookies);
            return essentialCookies;
        """
        cookie = driver.execute_script(cookie_script)
        logger.info(f"获取的 Cookie: {cookie}")

        # Do not pollute the file with empty entries; the caller already
        # treats a falsy return value as a failure.
        if cookie:
            with open(COOKIE_FILE, 'a', encoding='utf-8') as file:
                file.write(f" - '{cookie}'\n")
            logger.info("Cookie 已追加到 cookies.txt 文件")
        return cookie
    except Exception as e:
        logger.error(f"获取cookie错误: {str(e)}")
        return None


def wait_for_url(driver, url, timeout=10):
    """Poll until the browser's current URL equals ``url``.

    Trailing slashes are ignored on both sides.

    Returns:
        ``True`` if the URL matched within ``timeout`` seconds, else ``False``.
    """
    target_url = url.rstrip('/')
    try:
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            if driver.current_url.rstrip('/') == target_url:
                return True
            time.sleep(0.5)
        return False
    except TimeoutException:
        return False


def wait_for_email_input(driver, timeout=10):
    """Wait for the e-mail input inside the nested shadow roots to appear.

    Returns:
        ``True`` once the element can be located, ``False`` after ``timeout``
        seconds without success.
    """
    input_js = """
        return document.querySelector("#AppProvider_Wrapper > form > descope-wc")
            .shadowRoot.querySelector("#TExZpnv5m6")
            .shadowRoot.querySelector("#input-vaadin-email-field-3")
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            if driver.execute_script(input_js):
                logger.info("邮箱输入框已加载")
                return True
        except Exception as e:
            # Expected while the form is still rendering (a missing ancestor
            # makes the script throw); keep polling.
            logger.debug("email input not ready yet: %s", e)
        time.sleep(0.5)

    logger.error("邮箱输入框未出现")
    return False


def input_email(driver, email):
    """Fill the e-mail field through JavaScript and fire input/change events.

    The address is handed to the script as ``arguments[0]`` rather than being
    interpolated into the source, so unusual characters cannot break or
    alter the script.

    Returns:
        ``True`` on success, ``False`` if the script raised.
    """
    try:
        input_js = """
            var input = document.querySelector("#AppProvider_Wrapper > form > descope-wc")
                .shadowRoot.querySelector("#TExZpnv5m6")
                .shadowRoot.querySelector("#input-vaadin-email-field-3");
            input.value = arguments[0];
            input.dispatchEvent(new Event('input', { bubbles: true }));
            input.dispatchEvent(new Event('change', { bubbles: true }));
        """
        driver.execute_script(input_js, email)
        logger.info("输入邮箱成功")
        return True
    except Exception as e:
        logger.error(f"输入邮箱失败: {str(e)}")
        return False


def input_verification_code(driver, code):
    """Fill the one-time-code field through JavaScript and fire input/change events.

    The code is passed as ``arguments[0]`` instead of being interpolated into
    the script source.

    Returns:
        ``True`` on success, ``False`` if the script raised.
    """
    try:
        input_js = """
            var input = document.querySelector("#AppProvider_Wrapper > form > descope-wc")
                .shadowRoot.querySelector("#oneTimeCodeId")
                .shadowRoot.querySelector("#input-vaadin-text-field-7 > div.wrapper > descope-text-field:nth-child(1)")
                .shadowRoot.querySelector("#input-vaadin-text-field-35");
            input.value = arguments[0];
            input.dispatchEvent(new Event('input', { bubbles: true }));
            input.dispatchEvent(new Event('change', { bubbles: true }));
        """
        driver.execute_script(input_js, code)
        logger.info("输入验证码成功")
        return True
    except Exception as e:
        logger.error(f"输入验证码失败: {str(e)}")
        return False


def registration_process(driver, wait):
    """Run one full registration: mailbox -> sign-in form -> code -> cookie.

    This function blocks (network calls, sleeps, browser automation) and must
    not be called directly from an event loop.

    Args:
        driver: Selenium WebDriver to drive.
        wait: WebDriverWait for ``driver``; accepted for interface
            compatibility, currently unused.

    Returns:
        ``True`` if a cookie was captured, ``False`` otherwise.
    """
    try:
        # Create the temporary mailbox.
        mail_client = MailGw()
        email = mail_client.create_email()
        if not email:
            logger.error("创建临时邮箱失败")
            return False

        # Step one: open the authentication page.
        driver.get("https://you.com/authenticate")
        logger.info("打开认证页面")

        # Wait for the e-mail input to show up.
        if not wait_for_email_input(driver):
            logger.error("等待邮箱输入框超时")
            return False

        # Type the address.
        if not input_email(driver, email):
            logger.error("输入邮箱失败")
            return False
        time.sleep(0.5)

        # Press Enter to submit the form.
        actions = webdriver.ActionChains(driver)
        actions.send_keys(Keys.RETURN)
        actions.perform()
        logger.info("按下回车键")
        time.sleep(2)

        # Fetch the verification code from the mailbox.
        verification_code = mail_client.get_verification_code()
        if not verification_code:
            logger.error("获取验证码失败")
            return False
        logger.info(f"获取验证码: {verification_code}")

        # Type the verification code.
        if not input_verification_code(driver, verification_code):
            logger.error("输入验证码失败")
            return False
        time.sleep(2)

        # Wait for the redirect, then collect the cookie.
        if not wait_for_url(driver, "https://you.com"):
            logger.error("URL未变化为预期值")
            return False

        logger.info("URL已变更为 https://you.com")
        time.sleep(1)
        if get_cookie(driver):
            logger.info("成功获取cookie")
            return True

        logger.error("获取cookie失败")
        return False
    except Exception as e:
        logger.error(f"注册过程错误: {str(e)}")
        return False


app = FastAPI()
cookie_generator = None


class CookieGenerator:
    """Owns the Chrome driver and the background registration loop."""

    # Command-line switches passed to Chrome.
    _CHROME_ARGUMENTS = (
        '--headless=new',
        '--no-sandbox',
        '--disable-dev-shm-usage',
        '--disable-gpu',
        '--disable-software-rasterizer',
        '--disable-extensions',
        '--disable-setuid-sandbox',
        '--disable-web-security',
        '--no-first-run',
        '--no-default-browser-check',
        '--allow-running-insecure-content',
        '--disable-blink-features=AutomationControlled',
        '--window-size=1920,1080',
        '--user-data-dir=/tmp/chrome-data',
        '--disable-features=VizDisplayCompositor',
        '--disable-dev-tools',
    )

    def __init__(self):
        self.driver = None
        self.running = False   # cooperative stop flag, cleared by /stop
        self.cookies = []
        # True for the whole lifetime of a generation loop, including the
        # time it needs to notice that ``running`` was cleared.
        self._active = False

    def _create_driver(self):
        """Build the headless Chrome driver (blocking)."""
        chrome_options = Options()
        for argument in self._CHROME_ARGUMENTS:
            chrome_options.add_argument(argument)
        chrome_options.binary_location = os.getenv('CHROME_BIN', '/usr/bin/chromium')

        service = Service(
            os.getenv('CHROMEDRIVER_PATH', '/usr/bin/chromedriver')
        )
        return webdriver.Chrome(service=service, options=chrome_options)

    async def start_generation(self):
        """Run registrations in a loop until ``running`` is cleared.

        All blocking browser/network work is pushed to the default executor
        so the event loop stays free to serve requests such as ``/stop``.
        """
        if self._active:
            # A loop already exists (possibly still finishing its current
            # iteration after a stop request). Re-arm it instead of starting
            # a second loop that would share the same driver.
            self.running = True
            return

        self._active = True
        self.running = True
        loop = asyncio.get_running_loop()
        try:
            self.driver = await loop.run_in_executor(None, self._create_driver)
            wait = WebDriverWait(self.driver, 20)

            while self.running:
                success = await loop.run_in_executor(
                    None, registration_process, self.driver, wait
                )
                if success:
                    logger.info("本次循环成功")
                else:
                    logger.error("本次循环失败")
                await asyncio.sleep(5)
        except Exception as e:
            logger.error(f"Cookie生成错误: {str(e)}")
            if hasattr(e, 'msg'):
                logger.error(f"错误消息: {e.msg}")
        finally:
            # No await below: the flags and the driver are reset in one step
            # from the event loop's point of view.
            self.running = False
            try:
                if self.driver:
                    self.driver.quit()
            except Exception as e:
                logger.error(f"Cookie生成错误: {str(e)}")
            finally:
                self.driver = None
                self._active = False


@app.on_event("startup")
async def startup_event():
    """Create the shared CookieGenerator when the application starts."""
    global cookie_generator
    cookie_generator = CookieGenerator()


@app.get("/")
async def read_root():
    """Liveness endpoint."""
    return {"status": "running"}


@app.get("/start")
async def start_generation(background_tasks: BackgroundTasks):
    """Schedule the generation loop unless it is already running."""
    if not cookie_generator.running:
        background_tasks.add_task(cookie_generator.start_generation)
        return {"status": "started"}
    return {"status": "already running"}


@app.get("/stop")
async def stop_generation():
    """Ask the generation loop to stop after its current iteration."""
    if cookie_generator.running:
        cookie_generator.running = False
        return {"status": "stopping"}
    return {"status": "not running"}


@app.get("/cookies")
async def get_cookies():
    """Return the raw lines of COOKIE_FILE, or an empty list if it is absent."""
    try:
        if os.path.exists(COOKIE_FILE):
            with open(COOKIE_FILE, 'r', encoding='utf-8') as file:
                cookies = file.readlines()
            return {"cookies": cookies}
        return {"cookies": []}
    except Exception as e:
        return {"error": str(e)}


if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=7860)