Spaces:
Paused
Paused
| import requests | |
| from bs4 import BeautifulSoup | |
| from requests.adapters import HTTPAdapter | |
| from requests.packages.urllib3.util.retry import Retry | |
| import uuid | |
| import json | |
| import time | |
| import re | |
| import hashlib | |
| import base64 | |
| import random | |
| import string | |
| import os | |
| import sys | |
| from urllib.parse import quote, urlparse, parse_qs | |
| import concurrent.futures | |
| import logging | |
# Configure logging (console output only).
logging.basicConfig(
    level=logging.DEBUG,  # DEBUG level for more detailed logs
    format='%(asctime)s - %(levelname)s - %(message)s',
    stream=sys.stdout,
)

# Paste-service settings, read from environment variables.
PASTE_API_URL = os.getenv('PASTE_API_URL')
PASTE_API_PASSWORD = os.getenv('PASTE_API_PASSWORD')

# Build the upload URL from the paste URL. When PASTE_API_URL is unset the
# value is None instead of raising AttributeError at import time, so the
# module can still be imported and the failure surfaces where the URL is used.
UPLOAD_URL = (
    PASTE_API_URL.replace('/api/paste/', '/api/admin/paste/') + '/content'
    if PASTE_API_URL
    else None
)

# Temporary-mailbox API.
TEMP_EMAIL_API = "https://www.1secmail.com/api/v1/?action=genRandomMailbox"

# Registration API endpoint.
REGISTER_API_URL = "https://spuckhogycrxcbomznwo.supabase.co/auth/v1/signup?redirect_to=https%3A%2F%2Fchat.notdiamond.ai"

# Supabase API key.
SUPABASE_API_KEY = os.getenv('SUPABASE_API_KEY')

# Rate-limit and retry settings.
MAX_RETRIES = 3
RETRY_DELAY = 10  # seconds
RATE_LIMIT_DELAY = 60  # seconds
def generate_strong_password():
    """Return the registration password configured in the environment.

    Despite the name, nothing is generated: the value of the
    ``REGISTER_PASSWORD`` environment variable is returned as-is.

    Returns:
        str: The non-empty value of ``REGISTER_PASSWORD``.

    Raises:
        ValueError: If ``REGISTER_PASSWORD`` is unset or empty.
    """
    configured = os.environ.get('REGISTER_PASSWORD')
    if configured:
        return configured

    logging.error("未设置 REGISTER_PASSWORD 环境变量")
    raise ValueError("必须通过 REGISTER_PASSWORD 环境变量指定密码")
def generate_code_verifier():
    """Return a fresh random PKCE code verifier.

    RFC 7636 section 4.1 requires a verifier of 43 to 128 characters.
    32 random bytes encode to exactly 43 URL-safe base64 characters once
    the padding is stripped; the 16 bytes of a UUID only gave 22.

    Returns:
        str: A 43-character URL-safe base64 string without ``=`` padding.
    """
    random_bytes = os.urandom(32)
    return base64.urlsafe_b64encode(random_bytes).decode('ascii').rstrip('=')
| def generate_code_challenge(code_verifier): | |
| code_challenge = hashlib.sha256(code_verifier.encode('utf-8')).digest() | |
| return base64.urlsafe_b64encode(code_challenge).decode('utf-8').rstrip('=') | |
def get_temp_email():
    """Request one random mailbox address from the temp-mail API.

    The request retries up to 5 times on 500/502/503/504 responses and
    gives up after a 30-second timeout. Every failure is logged and
    reported as ``None`` so the caller never sees an exception.

    Returns:
        str | None: The first address in the JSON response, or ``None``
        when the request fails, the status is not 200, or the response
        has no usable first element.
    """
    retries = Retry(total=5, backoff_factor=1, status_forcelist=[500, 502, 503, 504])
    with requests.Session() as session:
        session.mount('https://', HTTPAdapter(max_retries=retries))
        try:
            response = session.get(TEMP_EMAIL_API, timeout=30)
            if response.status_code != 200:
                logging.error(f"Temp email API returned status {response.status_code}")
                return None
            email = response.json()[0]
        except requests.exceptions.SSLError as e:
            logging.error(f"SSL Error getting temp email: {e}")
            return None
        except requests.exceptions.RequestException as e:
            # Connection errors, timeouts and exhausted retries land here.
            logging.error(f"Request error getting temp email: {e}")
            return None
        except (ValueError, LookupError, TypeError) as e:
            # Body was not JSON, or not a non-empty list.
            logging.error(f"Unexpected temp email response: {e}")
            return None

    logging.info(f"Generated temp email: {email}")
    return email
def register(email, password):
    """Sign up ``email`` / ``password`` at the registration endpoint.

    Makes up to ``MAX_RETRIES`` attempts. A 429 response waits
    ``RATE_LIMIT_DELAY`` seconds and a transport error waits
    ``RETRY_DELAY`` seconds before the next attempt; there is no wait
    after the final attempt. Any other non-200 status fails immediately.

    Args:
        email: Address to register.
        password: Password for the new account.

    Returns:
        bool: True on a 200 response, False otherwise.
    """
    headers = {
        'apikey': SUPABASE_API_KEY,
        'authorization': f'Bearer {SUPABASE_API_KEY}',
        'content-type': 'application/json',
    }

    for attempt in range(MAX_RETRIES):
        # A fresh PKCE challenge is sent with every attempt.
        code_verifier = generate_code_verifier()
        payload = {
            'email': email,
            'password': password,
            'code_challenge': generate_code_challenge(code_verifier),
            'code_challenge_method': 's256',
            'data': {},
            'gotrue_meta_security': {},
        }

        try:
            # The context manager closes the session's connections per attempt;
            # the timeout keeps a stalled server from blocking the worker forever.
            with requests.Session() as session:
                response = session.post(
                    REGISTER_API_URL, headers=headers, json=payload, timeout=30
                )
        except requests.exceptions.RequestException as e:
            logging.error(f"Request error during registration: {e}")
            delay = RETRY_DELAY
        else:
            logging.info(f"Registration response status: {response.status_code}")
            logging.info(f"Registration response content: {response.text}")
            if response.status_code == 200:
                logging.info(f"Registered successfully with email: {email}")
                return True
            if response.status_code != 429:
                logging.error(f"Failed to register with email: {email}. Response: {response.text}")
                return False
            logging.warning("Rate limit reached.")
            delay = RATE_LIMIT_DELAY

        if attempt < MAX_RETRIES - 1:
            logging.info(f"Retrying registration in {delay} seconds...")
            time.sleep(delay)

    logging.error("Max registration retries reached.")
    return False
import threading

# Serialises the read-modify-write cycle in save_account(). The function is
# called from thread-pool workers, and two interleaved GET/PUT pairs would
# otherwise overwrite each other and drop one of the appended accounts.
_SAVE_ACCOUNT_LOCK = threading.Lock()


def save_account(email, password):
    """Append ``email|password`` to the remote paste and upload the result.

    Fetches the current paste content, appends the new entry separated by
    ``;``, and PUTs the combined content to ``UPLOAD_URL``. The fetch and
    upload run under a module lock so concurrent callers in this process
    cannot lose each other's entries.

    Paste content is never logged, because it holds every stored
    credential in plain text.

    Args:
        email: Account address to store.
        password: Account password to store.

    Returns:
        bool: True when the upload answers 200; False on any other status
        or on any exception (which is logged).
    """
    auth_headers = {
        'accept': '*/*',
        'accept-language': 'zh-CN,zh;q=0.9',
        'user-agent': 'Mozilla/5.0',
        'x-password': PASTE_API_PASSWORD,
    }
    upload_headers = {
        # NOTE(review): hard-coded Basic credential committed to source;
        # it should be rotated and supplied through configuration instead.
        'Authorization': 'Basic emhvdWRhbjp6aG91ZGFu',
        'Content-Type': 'application/json',
    }
    entry = f"{email}|{password}"

    try:
        with _SAVE_ACCOUNT_LOCK, requests.Session() as session:
            # Fetch the existing content (this request also authenticates).
            logging.info(f"Authenticating with URL: {PASTE_API_URL}")
            response = session.get(PASTE_API_URL, headers=auth_headers, timeout=30)
            logging.info(f"Authentication response status: {response.status_code}")
            if response.status_code != 200:
                logging.error(f"Authentication failed, status code: {response.status_code}")
                return False

            existing_content = response.json().get('content', '')
            new_content = f"{existing_content};{entry}" if existing_content else entry

            # Upload the combined content.
            logging.info(f"Uploading to URL: {UPLOAD_URL}")
            upload_response = session.put(
                UPLOAD_URL,
                headers=upload_headers,
                json={'content': new_content},
                timeout=30,
            )
            logging.info(f"Upload response status: {upload_response.status_code}")
    except Exception as e:
        # Best-effort boundary: any failure is reported to the caller as False.
        logging.error(f"Comprehensive error uploading account: {e}")
        return False

    if upload_response.status_code == 200:
        logging.info(f"Successfully uploaded account {email}")
        return True
    logging.error(f"Failed to upload account {email}. Status code: {upload_response.status_code}")
    return False
def _extract_token_hash(body):
    """Return the ``token_hash`` value of the confirmation link in ``body``, or None."""
    # Stop at quotes and angle brackets as well as whitespace, so a link
    # embedded in markup does not drag the surrounding characters along.
    match = re.search(
        r'https?://chat\.notdiamond\.ai/auth/confirm\?[^\s"\'<>]+', body or ''
    )
    if match is None:
        return None
    # Markup escapes '&' as '&amp;'; undo that before parsing the query.
    link = match.group(0).replace('&amp;', '&')
    values = parse_qs(urlparse(link).query).get('token_hash')
    return values[0] if values else None


def _confirm_signup(session, email, token_hash):
    """POST ``token_hash`` to the verify endpoint; return True on a 200 response."""
    verify_url = "https://spuckhogycrxcbomznwo.supabase.co/auth/v1/verify"
    headers = {
        'apikey': SUPABASE_API_KEY,
        'authorization': f'Bearer {SUPABASE_API_KEY}',
        'content-type': 'application/json',
    }
    payload = {'token_hash': token_hash, 'type': 'signup'}  # type is always 'signup'
    verify_response = session.post(verify_url, headers=headers, json=payload, timeout=30)
    logging.info(f"Verification response status: {verify_response.status_code}")
    if verify_response.status_code == 200:
        logging.info(f"Email verified for {email}")
        return True
    logging.error(f"Failed to verify email for {email}: {verify_response.text}")
    return False


def check_email(email):
    """Poll the temp mailbox for the signup confirmation and verify the account.

    Waits 10 seconds, then polls the mailbox up to 20 times with 5 seconds
    between polls. The first message whose subject contains
    "Confirm Your Signup" and can be read decides the outcome: its
    confirmation link's ``token_hash`` is posted to the verify endpoint.
    Errors during a poll are logged and the next poll is tried.

    Args:
        email: Mailbox address in ``login@domain`` form.

    Returns:
        bool: True when the verify endpoint answers 200. False when the
        address is malformed, no usable link or token is found in the
        confirmation message, verification is rejected, or no
        confirmation message arrives within the polling window.
    """
    login, _, domain = email.partition('@')
    if not login or not domain:
        logging.error(f"Invalid email address: {email}")
        return False

    mail_api = "https://www.1secmail.com/api/v1/"
    mailbox = {'login': login, 'domain': domain}
    retries = Retry(total=5, backoff_factor=1, status_forcelist=[500, 502, 503, 504])

    logging.info(f"Checking email for {email}")
    time.sleep(10)

    with requests.Session() as session:
        session.mount('https://', HTTPAdapter(max_retries=retries))
        for _ in range(20):
            try:
                # params= lets requests URL-encode the login and domain.
                response = session.get(
                    mail_api, params={'action': 'getMessages', **mailbox}, timeout=30
                )
                logging.info(f"Email check response status: {response.status_code}")
                messages = response.json() if response.status_code == 200 else []

                for message in messages or []:
                    if "Confirm Your Signup" not in message.get('subject', ''):
                        continue
                    # Fetch the full message to get at its body.
                    message_response = session.get(
                        mail_api,
                        params={'action': 'readMessage', **mailbox, 'id': message['id']},
                        timeout=30,
                    )
                    if message_response.status_code != 200:
                        continue
                    token_hash = _extract_token_hash(message_response.json().get('body', ''))
                    if token_hash is None:
                        logging.error(f"Failed to extract verification link from email for {email}")
                        return False
                    return _confirm_signup(session, email, token_hash)
            except (
                requests.exceptions.RequestException,
                ValueError,
                LookupError,
                TypeError,
                AttributeError,
            ) as e:
                logging.error(f"Error during email verification: {e}")
            # Sleep after failures too, so errors do not cause a tight retry loop.
            time.sleep(5)

    logging.warning(f"No verification email found for {email}")
    return False
def register_and_verify(num_accounts=1):
    """Register up to ``num_accounts`` accounts on a two-worker thread pool.

    For each requested account a temp mailbox is obtained; when one is
    returned, ``process_account`` is submitted for it and the loop pauses
    ``RETRY_DELAY`` seconds before the next submission.

    Args:
        num_accounts: Number of registrations to attempt.

    Returns:
        list: The truthy results of ``process_account``, in completion order.
    """
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as pool:
        pending = []
        for _ in range(num_accounts):
            address = get_temp_email()
            if not address:
                continue
            pending.append(pool.submit(process_account, address))
            time.sleep(RETRY_DELAY)

        outcomes = [done.result() for done in concurrent.futures.as_completed(pending)]

    return [account for account in outcomes if account]
def process_account(email):
    """Run register, save, then verify for a single mailbox.

    Args:
        email: Temp mailbox address to register.

    Returns:
        dict | None: ``{'email': ..., 'password': ...}`` when registration,
        saving and email verification all succeed; ``None`` when any step
        fails or raises (exceptions are logged, not propagated).
    """
    try:
        # The password comes from the environment.
        password = generate_strong_password()

        if not register(email, password):
            return None

        if not save_account(email, password):
            logging.error(f"Failed to save account {email}")
            return None

        if not check_email(email):
            return None

        logging.info(f"Account fully processed: {email}")
        return {'email': email, 'password': password}
    except Exception as e:
        logging.error(f"Error processing account {email}: {e}")
    return None
def main():
    """Register a fixed batch of accounts and print the results to the console."""
    # Change this value to register a different number of accounts.
    accounts = register_and_verify(5)

    # Console output only.
    print(f"Successfully registered {len(accounts)} accounts")
    for entry in accounts:
        print(f"Email: {entry['email']}, Password: {entry['password']}")


if __name__ == "__main__":
    main()