NB2 / nb2.py
bobocup's picture
Update nb2.py
90019f7 verified
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException
from selenium.webdriver.common.keys import Keys
from webdriver_manager.chrome import ChromeDriverManager
from fastapi import FastAPI, BackgroundTasks
import uvicorn
import asyncio
import time
import random
import string
import logging
import requests
import json
import re
import os
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class MailGw:
def __init__(self):
self.base_url = "https://api.mail.gw"
self.token = None
self.email = None
self.password = "Temp123!@#"
def create_email(self):
try:
# 获取可用域名列表
domains_response = requests.get(f"{self.base_url}/domains")
domains = domains_response.json()['hydra:member']
# 打印所有可用域名
logger.info(f"可用域名列表: {[d['domain'] for d in domains]}")
# 随机选择一个域名
domain = random.choice(domains)['domain']
logger.info(f"选择的域名: {domain}")
# 生成随机邮箱地址
random_name = ''.join(random.choices(string.ascii_lowercase + string.digits, k=10))
email = f"{random_name}@{domain}"
# 创建邮箱账户
account_response = requests.post(
f"{self.base_url}/accounts",
json={
"address": email,
"password": self.password
}
)
if account_response.status_code != 201:
logger.error(f"创建邮箱失败: {account_response.text}")
return None
self.email = email
# 获取token
token_response = requests.post(
f"{self.base_url}/token",
json={
"address": email,
"password": self.password
}
)
if token_response.status_code != 200:
logger.error(f"获取token失败: {token_response.text}")
return None
self.token = token_response.json()['token']
logger.info(f"创建邮箱成功: {email}")
return email
except Exception as e:
logger.error(f"创建邮箱时发生错误: {str(e)}")
return None
def get_verification_code(self, max_retries=10, delay=2):
if not self.token:
logger.error("Token不存在")
return None
headers = {"Authorization": f"Bearer {self.token}"}
for _ in range(max_retries):
try:
# 获取邮件列表
messages_response = requests.get(
f"{self.base_url}/messages",
headers=headers
)
messages = messages_response.json()['hydra:member']
if messages:
# 获取最新邮件的ID
message_id = messages[0]['id']
# 获取邮件内容
message_response = requests.get(
f"{self.base_url}/messages/{message_id}",
headers=headers
)
# 从邮件内容中提取验证码
text = message_response.json()['text']
# 使用正则表达式匹配6位数字验证码
match = re.search(r'\b\d{6}\b', text)
if match:
verification_code = match.group(0)
logger.info(f"获取到验证码: {verification_code}")
return verification_code
except Exception as e:
logger.error(f"获取验证码时发生错误: {str(e)}")
time.sleep(delay)
logger.info(f"等待验证码,重试第 {_ + 1} 次")
logger.error("未能获取到验证码")
return None
def get_cookie(driver):
try:
cookie_script = """
var cookieString = document.cookie;
var essentialCookies = cookieString.split('; ').filter(function(cookie) {
return cookie.startsWith('daily_query_') ||
cookie.startsWith('DSR=') ||
cookie.startsWith('DS=') ||
cookie.startsWith('uuid_guest=') ||
cookie.startsWith('ai_model=');
}).join('; ');
console.log('Essential Cookies:', essentialCookies);
return essentialCookies;
"""
cookie = driver.execute_script(cookie_script)
logger.info(f"获取的 Cookie: {cookie}")
with open('cookies.txt', 'a') as file:
file.write(f" - '{cookie}'\n")
logger.info("Cookie 已追加到 cookies.txt 文件")
return cookie
except Exception as e:
logger.error(f"获取cookie错误: {str(e)}")
return None
def wait_for_url(driver, url, timeout=10):
try:
start_time = time.time()
while time.time() - start_time < timeout:
current_url = driver.current_url.rstrip('/')
target_url = url.rstrip('/')
if current_url == target_url:
return True
time.sleep(0.5)
return False
except TimeoutException:
return False
def wait_for_email_input(driver, timeout=10):
"""等待邮箱输入框出现并返回"""
start_time = time.time()
while time.time() - start_time < timeout:
try:
input_js = """
return document.querySelector("#AppProvider_Wrapper > form > descope-wc")
.shadowRoot.querySelector("#TExZpnv5m6")
.shadowRoot.querySelector("#input-vaadin-email-field-3")
"""
email_input = driver.execute_script(input_js)
if email_input:
logger.info("邮箱输入框已加载")
return True
except Exception:
pass
time.sleep(0.5)
logger.error("邮箱输入框未出现")
return False
def input_email(driver, email):
"""使用JavaScript输入邮箱"""
try:
input_js = f"""
var input = document.querySelector("#AppProvider_Wrapper > form > descope-wc")
.shadowRoot.querySelector("#TExZpnv5m6")
.shadowRoot.querySelector("#input-vaadin-email-field-3");
input.value = "{email}";
input.dispatchEvent(new Event('input', {{ bubbles: true }}));
input.dispatchEvent(new Event('change', {{ bubbles: true }}));
"""
driver.execute_script(input_js)
logger.info("输入邮箱成功")
return True
except Exception as e:
logger.error(f"输入邮箱失败: {str(e)}")
return False
def input_verification_code(driver, code):
"""使用JavaScript输入验证码"""
try:
input_js = f"""
var input = document.querySelector("#AppProvider_Wrapper > form > descope-wc")
.shadowRoot.querySelector("#oneTimeCodeId")
.shadowRoot.querySelector("#input-vaadin-text-field-7 > div.wrapper > descope-text-field:nth-child(1)")
.shadowRoot.querySelector("#input-vaadin-text-field-35");
input.value = "{code}";
input.dispatchEvent(new Event('input', {{ bubbles: true }}));
input.dispatchEvent(new Event('change', {{ bubbles: true }}));
"""
driver.execute_script(input_js)
logger.info("输入验证码成功")
return True
except Exception as e:
logger.error(f"输入验证码失败: {str(e)}")
return False
def registration_process(driver, wait):
try:
# 创建临时邮箱
mail_client = MailGw()
email = mail_client.create_email()
if not email:
logger.error("创建临时邮箱失败")
return False
# 第一步:打开认证页面
driver.get("https://you.com/authenticate")
logger.info("打开认证页面")
# 等待邮箱输入框出现
if not wait_for_email_input(driver):
logger.error("等待邮箱输入框超时")
return False
# 输入邮箱
if not input_email(driver, email):
logger.error("输入邮箱失败")
return False
time.sleep(0.5)
# 按回车继续
actions = webdriver.ActionChains(driver)
actions.send_keys(Keys.RETURN)
actions.perform()
logger.info("按下回车键")
time.sleep(2)
# 获取验证码
verification_code = mail_client.get_verification_code()
if not verification_code:
logger.error("获取验证码失败")
return False
logger.info(f"获取验证码: {verification_code}")
# 输入验证码
if not input_verification_code(driver, verification_code):
logger.error("输入验证码失败")
return False
time.sleep(2)
# 等待URL变化并获取cookie
if wait_for_url(driver, "https://you.com"):
logger.info("URL已变更为 https://you.com")
time.sleep(1)
cookie = get_cookie(driver)
if cookie:
logger.info("成功获取cookie")
return True
else:
logger.error("获取cookie失败")
return False
else:
logger.error("URL未变化为预期值")
return False
except Exception as e:
logger.error(f"注册过程错误: {str(e)}")
return False
app = FastAPI()
cookie_generator = None
class CookieGenerator:
def __init__(self):
self.driver = None
self.running = False
self.cookies = []
async def start_generation(self):
if self.running:
return
self.running = True
try:
chrome_options = Options()
chrome_options.add_argument('--headless=new')
chrome_options.add_argument('--no-sandbox')
chrome_options.add_argument('--disable-dev-shm-usage')
chrome_options.add_argument('--disable-gpu')
chrome_options.add_argument('--disable-software-rasterizer')
chrome_options.add_argument('--disable-extensions')
chrome_options.add_argument('--disable-setuid-sandbox')
chrome_options.add_argument('--disable-web-security')
chrome_options.add_argument('--no-first-run')
chrome_options.add_argument('--no-default-browser-check')
chrome_options.add_argument('--allow-running-insecure-content')
chrome_options.add_argument('--disable-blink-features=AutomationControlled')
chrome_options.add_argument('--window-size=1920,1080')
chrome_options.add_argument('--user-data-dir=/tmp/chrome-data')
chrome_options.add_argument('--disable-features=VizDisplayCompositor')
chrome_options.add_argument('--disable-dev-tools')
chrome_options.binary_location = os.getenv('CHROME_BIN', '/usr/bin/chromium')
service = Service(
os.getenv('CHROMEDRIVER_PATH', '/usr/bin/chromedriver')
)
self.driver = webdriver.Chrome(service=service, options=chrome_options)
wait = WebDriverWait(self.driver, 20)
while self.running:
success = registration_process(self.driver, wait)
if success:
logger.info("本次循环成功")
else:
logger.error("本次循环失败")
await asyncio.sleep(5)
except Exception as e:
logger.error(f"Cookie生成错误: {str(e)}")
if hasattr(e, 'msg'):
logger.error(f"错误消息: {e.msg}")
finally:
self.running = False
if self.driver:
self.driver.quit()
@app.on_event("startup")
async def startup_event():
global cookie_generator
cookie_generator = CookieGenerator()
@app.get("/")
async def read_root():
return {"status": "running"}
@app.get("/start")
async def start_generation(background_tasks: BackgroundTasks):
global cookie_generator
if not cookie_generator.running:
background_tasks.add_task(cookie_generator.start_generation)
return {"status": "started"}
return {"status": "already running"}
@app.get("/stop")
async def stop_generation():
global cookie_generator
if cookie_generator.running:
cookie_generator.running = False
return {"status": "stopping"}
return {"status": "not running"}
@app.get("/cookies")
async def get_cookies():
try:
if os.path.exists('cookies.txt'):
with open('cookies.txt', 'r') as file:
cookies = file.readlines()
return {"cookies": cookies}
return {"cookies": []}
except Exception as e:
return {"error": str(e)}
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=7860)