| | from selenium import webdriver |
| | from selenium.webdriver.chrome.service import Service |
| | from selenium.webdriver.chrome.options import Options |
| | from selenium.webdriver.common.by import By |
| | from selenium.webdriver.support.ui import WebDriverWait |
| | from selenium.webdriver.support import expected_conditions as EC |
| | from selenium.common.exceptions import TimeoutException |
| | from selenium.webdriver.common.keys import Keys |
| | from webdriver_manager.chrome import ChromeDriverManager |
| | from fastapi import FastAPI, BackgroundTasks |
| | import uvicorn |
| | import asyncio |
| | import time |
| | import random |
| | import string |
| | import logging |
| | import requests |
| | import json |
| | import re |
| | import os |
| |
|
| | |
# Configure logging once at import time: INFO level, timestamped records.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
# Module-level logger used by the functions and classes below.
logger = logging.getLogger(__name__)
| |
|
class MailGw:
    """Minimal client for the mail.gw temporary-mailbox HTTP API.

    The client registers a random address on one of the domains the service
    advertises, exchanges the credentials for a bearer token, and can then
    poll the inbox for a six-digit verification code.

    Args:
        request_timeout: Seconds passed as ``timeout`` to every HTTP request,
            so that a stalled connection cannot block the caller forever.
    """

    def __init__(self, request_timeout=10):
        self.base_url = "https://api.mail.gw"
        self.token = None
        self.email = None
        self.password = "Temp123!@#"
        self.request_timeout = request_timeout

    def create_email(self):
        """Create a mailbox and fetch its access token.

        Returns:
            The new email address, or ``None`` if any step failed (the
            failure is logged, never raised).
        """
        try:
            # Ask the service which domains can host a mailbox.
            domains_response = requests.get(
                f"{self.base_url}/domains",
                timeout=self.request_timeout,
            )
            domains = domains_response.json()['hydra:member']

            logger.info(f"可用域名列表: {[d['domain'] for d in domains]}")

            domain = random.choice(domains)['domain']
            logger.info(f"选择的域名: {domain}")

            # Random 10-character local part (lowercase letters and digits).
            random_name = ''.join(
                random.choices(string.ascii_lowercase + string.digits, k=10)
            )
            email = f"{random_name}@{domain}"

            credentials = {"address": email, "password": self.password}

            account_response = requests.post(
                f"{self.base_url}/accounts",
                json=credentials,
                timeout=self.request_timeout,
            )
            if account_response.status_code != 201:
                logger.error(f"创建邮箱失败: {account_response.text}")
                return None

            self.email = email

            token_response = requests.post(
                f"{self.base_url}/token",
                json=credentials,
                timeout=self.request_timeout,
            )
            if token_response.status_code != 200:
                logger.error(f"获取token失败: {token_response.text}")
                return None

            self.token = token_response.json()['token']
            logger.info(f"创建邮箱成功: {email}")
            return email

        except Exception as e:
            logger.error(f"创建邮箱时发生错误: {str(e)}")
            return None

    def get_verification_code(self, max_retries=10, delay=2):
        """Poll the inbox until a six-digit code shows up.

        Only the first message in the listing is inspected on each attempt.

        Args:
            max_retries: Number of polling attempts.
            delay: Seconds to sleep after each unsuccessful attempt.

        Returns:
            The six-digit code as a string, or ``None`` when no token is set
            or no code was found within ``max_retries`` attempts.
        """
        if not self.token:
            logger.error("Token不存在")
            return None

        headers = {"Authorization": f"Bearer {self.token}"}

        for attempt in range(1, max_retries + 1):
            try:
                messages_response = requests.get(
                    f"{self.base_url}/messages",
                    headers=headers,
                    timeout=self.request_timeout,
                )
                messages = messages_response.json()['hydra:member']

                if messages:
                    message_id = messages[0]['id']

                    message_response = requests.get(
                        f"{self.base_url}/messages/{message_id}",
                        headers=headers,
                        timeout=self.request_timeout,
                    )

                    # Treat a missing or null 'text' field as "no code yet"
                    # rather than as an error.
                    text = message_response.json().get('text') or ''

                    match = re.search(r'\b\d{6}\b', text)
                    if match:
                        verification_code = match.group(0)
                        logger.info(f"获取到验证码: {verification_code}")
                        return verification_code

            except Exception as e:
                # Best-effort polling: log and try again on the next round.
                logger.error(f"获取验证码时发生错误: {str(e)}")

            time.sleep(delay)
            logger.info(f"等待验证码,重试第 {attempt} 次")

        logger.error("未能获取到验证码")
        return None
| |
|
def get_cookie(driver):
    """Extract the essential cookies from the current page and persist them.

    A JavaScript snippet filters ``document.cookie`` down to the entries whose
    names start with ``daily_query_``, ``DSR=``, ``DS=``, ``uuid_guest=`` or
    ``ai_model=``. A non-empty result is appended to ``cookies.txt`` as a
    `` - '<cookie>'`` line.

    Args:
        driver: Object exposing ``execute_script`` (a Selenium WebDriver).

    Returns:
        The filtered cookie string (possibly empty), or ``None`` if an
        exception occurred.
    """
    try:
        cookie_script = """
        var cookieString = document.cookie;
        var essentialCookies = cookieString.split('; ').filter(function(cookie) {
            return cookie.startsWith('daily_query_') ||
                cookie.startsWith('DSR=') ||
                cookie.startsWith('DS=') ||
                cookie.startsWith('uuid_guest=') ||
                cookie.startsWith('ai_model=');
        }).join('; ');
        console.log('Essential Cookies:', essentialCookies);
        return essentialCookies;
        """
        cookie = driver.execute_script(cookie_script)

        logger.info(f"获取的 Cookie: {cookie}")

        if not cookie:
            # Nothing matched: return the falsy value unchanged, but do not
            # append an empty entry to the cookie file.
            return cookie

        with open('cookies.txt', 'a') as file:
            file.write(f" - '{cookie}'\n")

        logger.info("Cookie 已追加到 cookies.txt 文件")
        return cookie
    except Exception as e:
        logger.error(f"获取cookie错误: {str(e)}")
        return None
| |
|
def wait_for_url(driver, url, timeout=10):
    """Poll until the browser's current URL equals ``url``.

    Trailing slashes are ignored on both sides of the comparison. The URL is
    checked every 0.5 seconds.

    Args:
        driver: Object exposing a ``current_url`` attribute.
        url: The URL to wait for.
        timeout: Maximum number of seconds to keep polling.

    Returns:
        ``True`` as soon as the URLs match; ``False`` if the timeout elapses
        or a ``TimeoutException`` is raised while polling.
    """
    target_url = url.rstrip('/')
    # A monotonic clock keeps the deadline immune to wall-clock adjustments.
    deadline = time.monotonic() + timeout
    try:
        while time.monotonic() < deadline:
            if driver.current_url.rstrip('/') == target_url:
                return True
            time.sleep(0.5)
        return False
    except TimeoutException:
        return False
| |
|
def wait_for_email_input(driver, timeout=10):
    """Wait for the email input inside the nested shadow DOM to appear.

    The lookup script is executed every 0.5 seconds. Any exception it raises
    is treated as "not there yet".

    Args:
        driver: Object exposing ``execute_script``.
        timeout: Maximum number of seconds to keep polling.

    Returns:
        ``True`` once the script returns a truthy element, ``False`` if the
        timeout elapses first.
    """
    locate_field_js = """
    return document.querySelector("#AppProvider_Wrapper > form > descope-wc")
    .shadowRoot.querySelector("#TExZpnv5m6")
    .shadowRoot.querySelector("#input-vaadin-email-field-3")
    """
    began = time.time()
    while time.time() - began < timeout:
        try:
            field = driver.execute_script(locate_field_js)
        except Exception:
            # The shadow roots may not exist yet; keep polling.
            field = None
        if field:
            logger.info("邮箱输入框已加载")
            return True
        time.sleep(0.5)
    logger.error("邮箱输入框未出现")
    return False
def input_email(driver, email):
    """Fill the email field via JavaScript and fire input/change events.

    The address is handed to the script as ``arguments[0]`` instead of being
    interpolated into the JavaScript source, so quotes or backslashes in the
    value can neither break the script nor inject code.

    Args:
        driver: Object exposing ``execute_script``.
        email: The address to type into the field.

    Returns:
        ``True`` on success, ``False`` if the script raised.
    """
    try:
        input_js = """
        var input = document.querySelector("#AppProvider_Wrapper > form > descope-wc")
            .shadowRoot.querySelector("#TExZpnv5m6")
            .shadowRoot.querySelector("#input-vaadin-email-field-3");
        input.value = arguments[0];
        input.dispatchEvent(new Event('input', { bubbles: true }));
        input.dispatchEvent(new Event('change', { bubbles: true }));
        """
        driver.execute_script(input_js, email)
        logger.info("输入邮箱成功")
        return True
    except Exception as e:
        logger.error(f"输入邮箱失败: {str(e)}")
        return False
| |
|
def input_verification_code(driver, code):
    """Fill the one-time-code field via JavaScript and fire input/change events.

    The code is handed to the script as ``arguments[0]`` instead of being
    interpolated into the JavaScript source, so the value can neither break
    the script nor inject code.

    Args:
        driver: Object exposing ``execute_script``.
        code: The verification code to enter.

    Returns:
        ``True`` on success, ``False`` if the script raised.
    """
    try:
        input_js = """
        var input = document.querySelector("#AppProvider_Wrapper > form > descope-wc")
            .shadowRoot.querySelector("#oneTimeCodeId")
            .shadowRoot.querySelector("#input-vaadin-text-field-7 > div.wrapper > descope-text-field:nth-child(1)")
            .shadowRoot.querySelector("#input-vaadin-text-field-35");
        input.value = arguments[0];
        input.dispatchEvent(new Event('input', { bubbles: true }));
        input.dispatchEvent(new Event('change', { bubbles: true }));
        """
        driver.execute_script(input_js, code)
        logger.info("输入验证码成功")
        return True
    except Exception as e:
        logger.error(f"输入验证码失败: {str(e)}")
        return False
| |
|
def registration_process(driver, wait):
    """Run one full sign-up cycle and store the resulting cookie.

    Steps: create a temporary mailbox, open the authentication page, submit
    the address, read the verification code from the mailbox, enter it, wait
    for the redirect to ``https://you.com`` and finally collect the cookie.

    Args:
        driver: Selenium WebDriver used for the browser interaction.
        wait: Accepted for interface compatibility; not used by this function.

    Returns:
        ``True`` if a cookie was obtained, ``False`` on any failure (failures
        are logged, never raised).
    """
    try:
        mailbox = MailGw()
        address = mailbox.create_email()
        if not address:
            logger.error("创建临时邮箱失败")
            return False

        driver.get("https://you.com/authenticate")
        logger.info("打开认证页面")

        if not wait_for_email_input(driver):
            logger.error("等待邮箱输入框超时")
            return False

        if not input_email(driver, address):
            logger.error("输入邮箱失败")
            return False

        time.sleep(0.5)

        # Submit the form by sending RETURN to the focused element.
        webdriver.ActionChains(driver).send_keys(Keys.RETURN).perform()
        logger.info("按下回车键")
        time.sleep(2)

        code = mailbox.get_verification_code()
        if not code:
            logger.error("获取验证码失败")
            return False

        logger.info(f"获取验证码: {code}")

        if not input_verification_code(driver, code):
            logger.error("输入验证码失败")
            return False

        time.sleep(2)

        if not wait_for_url(driver, "https://you.com"):
            logger.error("URL未变化为预期值")
            return False

        logger.info("URL已变更为 https://you.com")
        time.sleep(1)

        if not get_cookie(driver):
            logger.error("获取cookie失败")
            return False

        logger.info("成功获取cookie")
        return True

    except Exception as e:
        logger.error(f"注册过程错误: {str(e)}")
        return False
| |
|
# FastAPI application object; the route decorators below register on it.
app = FastAPI()
# Module-wide CookieGenerator instance; stays None until the startup hook runs.
cookie_generator = None
| |
|
class CookieGenerator:
    """Drive repeated registration cycles in a headless Chrome session.

    Attributes are plain flags and handles:
    ``driver`` is the active WebDriver (``None`` when idle), ``running`` is
    the loop flag that callers clear to request a stop, and ``cookies`` is an
    initially empty list.
    """

    def __init__(self):
        self.driver = None
        self.running = False
        self.cookies = []

    def _create_driver(self):
        """Build the headless Chrome WebDriver (blocking)."""
        chrome_options = Options()
        for flag in (
            '--headless=new',
            '--no-sandbox',
            '--disable-dev-shm-usage',
            '--disable-gpu',
            '--disable-software-rasterizer',
            '--disable-extensions',
            '--disable-setuid-sandbox',
            '--disable-web-security',
            '--no-first-run',
            '--no-default-browser-check',
            '--allow-running-insecure-content',
            '--disable-blink-features=AutomationControlled',
            '--window-size=1920,1080',
            '--user-data-dir=/tmp/chrome-data',
            '--disable-features=VizDisplayCompositor',
            '--disable-dev-tools',
        ):
            chrome_options.add_argument(flag)
        chrome_options.binary_location = os.getenv('CHROME_BIN', '/usr/bin/chromium')

        service = Service(
            os.getenv('CHROMEDRIVER_PATH', '/usr/bin/chromedriver')
        )
        return webdriver.Chrome(service=service, options=chrome_options)

    async def start_generation(self):
        """Run registration cycles until ``running`` is cleared.

        Returns immediately if a run is already in progress. Browser start-up
        and every registration cycle are blocking (Selenium, HTTP requests,
        ``time.sleep``), so they are pushed to the default executor; otherwise
        the event loop could not serve other requests while a cycle is in
        flight. Errors are logged, and the driver is always shut down on exit.
        """
        if self.running:
            return

        self.running = True
        loop = asyncio.get_running_loop()
        try:
            self.driver = await loop.run_in_executor(None, self._create_driver)
            wait = WebDriverWait(self.driver, 20)

            while self.running:
                success = await loop.run_in_executor(
                    None, registration_process, self.driver, wait
                )
                if success:
                    logger.info("本次循环成功")
                else:
                    logger.error("本次循环失败")
                await asyncio.sleep(5)

        except Exception as e:
            logger.error(f"Cookie生成错误: {str(e)}")
            if hasattr(e, 'msg'):
                logger.error(f"错误消息: {e.msg}")
        finally:
            self.running = False
            if self.driver:
                try:
                    self.driver.quit()
                finally:
                    # Drop the stale handle so a later run starts clean.
                    self.driver = None
| |
|
@app.on_event("startup")
async def startup_event():
    """Create the module-wide CookieGenerator when the application starts."""
    global cookie_generator
    cookie_generator = CookieGenerator()
| | |
@app.get("/")
async def read_root():
    """Return a fixed status payload for the root path."""
    return {"status": "running"}
| |
|
@app.get("/start")
async def start_generation(background_tasks: BackgroundTasks):
    """Schedule cookie generation as a background task unless already running.

    Returns:
        ``{"status": "started"}`` when a run was scheduled, otherwise
        ``{"status": "already running"}``.
    """
    if cookie_generator.running:
        return {"status": "already running"}
    background_tasks.add_task(cookie_generator.start_generation)
    return {"status": "started"}
| |
|
@app.get("/stop")
async def stop_generation():
    """Ask the running generation loop to stop by clearing its flag.

    Returns:
        ``{"status": "stopping"}`` when a run was active, otherwise
        ``{"status": "not running"}``.
    """
    if not cookie_generator.running:
        return {"status": "not running"}
    cookie_generator.running = False
    return {"status": "stopping"}
| |
|
@app.get("/cookies")
async def get_cookies():
    """Return the raw lines stored in ``cookies.txt``.

    Returns:
        ``{"cookies": [...]}`` with the file's lines (an empty list when the
        file does not exist), or ``{"error": "<message>"}`` if reading failed.
    """
    try:
        if not os.path.exists('cookies.txt'):
            return {"cookies": []}
        with open('cookies.txt', 'r') as file:
            return {"cookies": file.readlines()}
    except Exception as e:
        return {"error": str(e)}
| |
|
# Script entry point: serve the app on all interfaces, port 7860.
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=7860)