Spaces:
Paused
Paused
| import json | |
| import os | |
| import time | |
| import uuid | |
| import argparse # 导入参数解析库 | |
| import random | |
| import string | |
| import logging | |
| import requests | |
| from flask import Flask, render_template, request, jsonify, send_from_directory | |
| from flask_cors import CORS | |
| # 导入 pikpak.py 中的函数 | |
| from utils.pk_email import connect_imap | |
| from utils.pikpak import ( | |
| sign_encrypt, | |
| captcha_image_parse, | |
| ramdom_version, | |
| random_rtc_token, | |
| PikPak, | |
| save_account_info, | |
| test_proxy, | |
| ) | |
| # 导入 email_client | |
| from utils.email_client import EmailClient | |
| # 重试参数 | |
| max_retries = 3 | |
| retry_delay = 1.0 | |
| # 设置日志 | |
| logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') | |
| logger = logging.getLogger(__name__) | |
| # 定义一个retry函数,用于重试指定的函数 | |
| def retry_function(func, *args, max_retries=3, delay=1, **kwargs): | |
| """ | |
| 对指定函数进行重试 | |
| Args: | |
| func: 要重试的函数 | |
| *args: 传递给函数的位置参数 | |
| max_retries: 最大重试次数,默认为3 | |
| delay: 每次重试之间的延迟(秒),默认为1 | |
| **kwargs: 传递给函数的关键字参数 | |
| Returns: | |
| 函数的返回值,如果所有重试都失败则返回None | |
| """ | |
| retries = 0 | |
| result = None | |
| while retries < max_retries: | |
| if retries > 0: | |
| logger.info(f"第 {retries} 次重试函数 {func.__name__}...") | |
| result = func(*args, **kwargs) | |
| # 如果函数返回非None结果,视为成功 | |
| if result is not None: | |
| if retries > 0: | |
| logger.info(f"在第 {retries} 次重试后成功") | |
| return result | |
| # 如果达到最大重试次数,返回最后一次结果 | |
| if retries >= max_retries - 1: | |
| logger.warning(f"函数 {func.__name__} 在 {max_retries} 次尝试后失败") | |
| break | |
| # 等待指定的延迟时间 | |
| time.sleep(delay) | |
| retries += 1 | |
| return result | |
| # 解析命令行参数 | |
| parser = argparse.ArgumentParser(description="PikPak 自动邀请注册系统") | |
| args = parser.parse_args() | |
| app = Flask(__name__, static_url_path='/assets') | |
| # cors | |
| CORS(app, resources={r"/*": {"origins": "*"}}) | |
| app.secret_key = os.urandom(24) | |
| # 全局字典用于存储用户处理过程中的数据,以 email 为键 | |
| user_process_data = {} | |
| def health_check(): | |
| in_huggingface = ( | |
| os.environ.get("SPACE_ID") is not None or os.environ.get("SYSTEM") == "spaces" | |
| ) | |
| return jsonify( | |
| { | |
| "status": "OK", | |
| } | |
| ) | |
| def initialize(): | |
| # 获取用户表单输入 | |
| use_proxy = request.form.get("use_proxy") == "true" | |
| proxy_url = request.form.get("proxy_url", "") | |
| invite_code = request.form.get("invite_code", "") | |
| email = request.form.get("email", "") | |
| # 初始化参数 | |
| current_version = ramdom_version() | |
| version = current_version["v"] | |
| algorithms = current_version["algorithms"] | |
| client_id = "YNxT9w7GMdWvEOKa" | |
| client_secret = "dbw2OtmVEeuUvIptb1Coyg" | |
| package_name = "com.pikcloud.pikpak" | |
| device_id = str(uuid.uuid4()).replace("-", "") | |
| rtc_token = random_rtc_token() | |
| # 将这些参数存储到会话中以便后续使用 | |
| # session["use_proxy"] = use_proxy | |
| # session["proxy_url"] = proxy_url | |
| # session["invite_code"] = invite_code | |
| # session["email"] = email | |
| # session["version"] = version | |
| # session["algorithms"] = algorithms | |
| # session["client_id"] = client_id | |
| # session["client_secret"] = client_secret | |
| # session["package_name"] = package_name | |
| # session["device_id"] = device_id | |
| # session["rtc_token"] = rtc_token | |
| # 如果启用代理,则测试代理 | |
| proxy_status = None | |
| if use_proxy: | |
| proxy_status = test_proxy(proxy_url) | |
| # 创建PikPak实例 | |
| pikpak = PikPak( | |
| invite_code, | |
| client_id, | |
| device_id, | |
| version, | |
| algorithms, | |
| email, | |
| rtc_token, | |
| client_secret, | |
| package_name, | |
| use_proxy=use_proxy, | |
| proxy_http=proxy_url, | |
| proxy_https=proxy_url, | |
| ) | |
| # 初始化验证码 | |
| init_result = pikpak.init("POST:/v1/auth/verification") | |
| if ( | |
| not init_result | |
| or not isinstance(init_result, dict) | |
| or "captcha_token" not in init_result | |
| ): | |
| return jsonify( | |
| {"status": "error", "message": "初始化失败,请检查网络连接或代理设置"} | |
| ) | |
| # 将用户数据存储在全局字典中 | |
| user_data = { | |
| "use_proxy": use_proxy, | |
| "proxy_url": proxy_url, | |
| "invite_code": invite_code, | |
| "email": email, | |
| "version": version, | |
| "algorithms": algorithms, | |
| "client_id": client_id, | |
| "client_secret": client_secret, | |
| "package_name": package_name, | |
| "device_id": device_id, | |
| "rtc_token": rtc_token, | |
| "captcha_token": pikpak.captcha_token, # Store captcha_token here | |
| } | |
| user_process_data[email] = user_data | |
| # 将验证码令牌保存到会话中 - REMOVED | |
| # session["captcha_token"] = pikpak.captcha_token | |
| return jsonify( | |
| { | |
| "status": "success", | |
| "message": "初始化成功,请进行滑块验证", | |
| "email": email, # Return email to client | |
| "proxy_status": proxy_status, | |
| "version": version, | |
| "device_id": device_id, | |
| "rtc_token": rtc_token, | |
| } | |
| ) | |
| def verify_captcha(): | |
| # 尝试从表单或JSON获取email | |
| email = request.form.get('email') | |
| if not email and request.is_json: | |
| data = request.get_json() | |
| email = data.get('email') | |
| if not email: | |
| return jsonify({"status": "error", "message": "请求中未提供Email"}) | |
| # 从全局字典获取用户数据 | |
| user_data = user_process_data.get(email) | |
| if not user_data: | |
| return jsonify({"status": "error", "message": "会话数据不存在或已过期,请重新初始化"}) | |
| # 从 user_data 中获取存储的数据 | |
| device_id = user_data.get("device_id") | |
| # email = user_data.get("email") # Email is now the key, already have it | |
| invite_code = user_data.get("invite_code") | |
| client_id = user_data.get("client_id") | |
| version = user_data.get("version") | |
| algorithms = user_data.get("algorithms") | |
| rtc_token = user_data.get("rtc_token") | |
| client_secret = user_data.get("client_secret") | |
| package_name = user_data.get("package_name") | |
| use_proxy = user_data.get("use_proxy") | |
| proxy_url = user_data.get("proxy_url") | |
| captcha_token = user_data.get("captcha_token", "") # Use get with default | |
| # Check if essential data is present (device_id is checked as example) | |
| if not device_id: | |
| return jsonify({"status": "error", "message": "必要的会话数据丢失,请重新初始化"}) | |
| # 创建PikPak实例 (使用从 user_data 获取的数据) | |
| pikpak = PikPak( | |
| invite_code, | |
| client_id, | |
| device_id, | |
| version, | |
| algorithms, | |
| email, | |
| rtc_token, | |
| client_secret, | |
| package_name, | |
| use_proxy=use_proxy, | |
| proxy_http=proxy_url, | |
| proxy_https=proxy_url, | |
| ) | |
| # 从 user_data 设置验证码令牌 | |
| pikpak.captcha_token = captcha_token | |
| # 尝试验证码验证 | |
| max_attempts = 5 | |
| captcha_result = None | |
| for attempt in range(max_attempts): | |
| try: | |
| captcha_result = captcha_image_parse(pikpak, device_id) | |
| if ( | |
| captcha_result | |
| and "response_data" in captcha_result | |
| and captcha_result["response_data"].get("result") == "accept" | |
| ): | |
| break | |
| time.sleep(2) | |
| except Exception as e: | |
| time.sleep(2) | |
| if ( | |
| not captcha_result | |
| or "response_data" not in captcha_result | |
| or captcha_result["response_data"].get("result") != "accept" | |
| ): | |
| return jsonify({"status": "error", "message": "滑块验证失败,请重试"}) | |
| # 滑块验证加密 | |
| try: | |
| executor_info = pikpak.executor() | |
| if not executor_info: | |
| return jsonify({"status": "error", "message": "获取executor信息失败"}) | |
| sign_encrypt_info = sign_encrypt( | |
| executor_info, | |
| pikpak.captcha_token, | |
| rtc_token, | |
| pikpak.use_proxy, | |
| pikpak.proxies, | |
| ) | |
| if ( | |
| not sign_encrypt_info | |
| or "request_id" not in sign_encrypt_info | |
| or "sign" not in sign_encrypt_info | |
| ): | |
| return jsonify({"status": "error", "message": "签名加密失败"}) | |
| # 更新验证码令牌 | |
| report_result = pikpak.report( | |
| sign_encrypt_info["request_id"], | |
| sign_encrypt_info["sign"], | |
| captcha_result["pid"], | |
| captcha_result["traceid"], | |
| ) | |
| # 请求邮箱验证码 | |
| verification_result = pikpak.verification() | |
| if ( | |
| not verification_result | |
| or not isinstance(verification_result, dict) | |
| or "verification_id" not in verification_result | |
| ): | |
| return jsonify({"status": "error", "message": "请求验证码失败"}) | |
| # 将更新的数据保存到 user_data 中 | |
| user_data["captcha_token"] = pikpak.captcha_token | |
| user_data["verification_id"] = pikpak.verification_id | |
| return jsonify({"status": "success", "message": "验证码已发送到邮箱,请查收"}) | |
| except Exception as e: | |
| import traceback | |
| error_trace = traceback.format_exc() | |
| return jsonify( | |
| { | |
| "status": "error", | |
| "message": f"验证过程出错: {str(e)}", | |
| "trace": error_trace, | |
| } | |
| ) | |
| def gen_password(): | |
| # 生成12位密码 | |
| return "".join(random.choices(string.ascii_letters + string.digits, k=12)) | |
| def register(): | |
| # 从表单获取验证码和email | |
| verification_code = request.form.get("verification_code") | |
| email = request.form.get('email') # Get email from form | |
| if not email: | |
| return jsonify({"status": "error", "message": "请求中未提供Email"}) | |
| if not verification_code: | |
| return jsonify({"status": "error", "message": "验证码不能为空"}) | |
| # 从全局字典获取用户数据 | |
| user_data = user_process_data.get(email) | |
| if not user_data: | |
| return jsonify({"status": "error", "message": "会话数据不存在或已过期,请重新初始化"}) | |
| # 从 user_data 中获取存储的数据 | |
| device_id = user_data.get("device_id") | |
| # email = user_data.get("email") # Already have email | |
| invite_code = user_data.get("invite_code") | |
| client_id = user_data.get("client_id") | |
| version = user_data.get("version") | |
| algorithms = user_data.get("algorithms") | |
| rtc_token = user_data.get("rtc_token") | |
| client_secret = user_data.get("client_secret") | |
| package_name = user_data.get("package_name") | |
| use_proxy = user_data.get("use_proxy") | |
| proxy_url = user_data.get("proxy_url") | |
| verification_id = user_data.get("verification_id") | |
| captcha_token = user_data.get("captcha_token", "") | |
| # Check if essential data is present | |
| if not device_id or not verification_id: | |
| return jsonify({"status": "error", "message": "必要的会话数据丢失,请重新初始化"}) | |
| # 创建PikPak实例 | |
| pikpak = PikPak( | |
| invite_code, | |
| client_id, | |
| device_id, | |
| version, | |
| algorithms, | |
| email, | |
| rtc_token, | |
| client_secret, | |
| package_name, | |
| use_proxy=use_proxy, | |
| proxy_http=proxy_url, | |
| proxy_https=proxy_url, | |
| ) | |
| # 从 user_data 中设置验证码令牌和验证ID | |
| pikpak.captcha_token = captcha_token | |
| pikpak.verification_id = verification_id | |
| # 验证验证码 | |
| pikpak.verify_post(verification_code) | |
| # 刷新时间戳并加密签名值 | |
| pikpak.init("POST:/v1/auth/signup") | |
| # 注册并登录 | |
| name = email.split("@")[0] | |
| password = gen_password() # 默认密码 | |
| signup_result = pikpak.signup(name, password, verification_code) | |
| # 填写邀请码 | |
| pikpak.activation_code() | |
| if ( | |
| not signup_result | |
| or not isinstance(signup_result, dict) | |
| or "access_token" not in signup_result | |
| ): | |
| return jsonify({"status": "error", "message": "注册失败,请检查验证码或重试"}) | |
| # 保存账号信息到JSON文件 | |
| account_info = { | |
| "captcha_token": pikpak.captcha_token, | |
| "timestamp": pikpak.timestamp, | |
| "name": name, | |
| "email": email, | |
| "password": password, | |
| "device_id": device_id, | |
| "version": version, | |
| "user_id": signup_result.get("sub", ""), | |
| "access_token": signup_result.get("access_token", ""), | |
| "refresh_token": signup_result.get("refresh_token", ""), | |
| "invite_code": invite_code, | |
| } | |
| # 保存账号信息 | |
| account_file = save_account_info(name, account_info) | |
| return jsonify( | |
| { | |
| "status": "success", | |
| "message": "注册成功!账号已保存。", | |
| "account_info": account_info, | |
| } | |
| ) | |
| def test_proxy_route(): | |
| proxy_url = request.form.get("proxy_url", "http://127.0.0.1:7890") | |
| result = test_proxy(proxy_url) | |
| return jsonify( | |
| { | |
| "status": "success" if result else "error", | |
| "message": "代理连接测试成功" if result else "代理连接测试失败", | |
| } | |
| ) | |
| def get_verification(): | |
| """ | |
| 处理获取验证码的请求 | |
| """ | |
| email_user = request.form["email"] | |
| email_password = request.form["password"] | |
| # 先尝试从收件箱获取验证码 | |
| result = connect_imap(email_user, email_password, "INBOX") | |
| # 如果收件箱没有找到验证码,则尝试从垃圾邮件中查找 | |
| if result["code"] == 0: | |
| result = connect_imap(email_user, email_password, "Junk") | |
| return jsonify(result) | |
| def fetch_accounts(): | |
| # 获取account文件夹中的所有JSON文件 | |
| account_files = [] | |
| for file in os.listdir("account"): | |
| if file.endswith(".json"): | |
| file_path = os.path.join("account", file) | |
| try: | |
| with open(file_path, "r", encoding="utf-8") as f: | |
| account_data = json.load(f) | |
| if isinstance(account_data, dict): | |
| # 添加文件名属性,用于后续操作 | |
| account_data["filename"] = file | |
| account_files.append(account_data) | |
| except Exception as e: | |
| logger.error(f"Error reading {file}: {str(e)}") | |
| if not account_files: | |
| return jsonify( | |
| {"status": "info", "message": "没有找到保存的账号", "accounts": []} | |
| ) | |
| account_files.sort(key=lambda x: x.get("timestamp", ""), reverse=True) | |
| return jsonify( | |
| { | |
| "status": "success", | |
| "message": f"找到 {len(account_files)} 个账号", | |
| "accounts": account_files, | |
| } | |
| ) | |
| def update_account(): | |
| data = request.json | |
| if not data or "filename" not in data or "account_data" not in data: | |
| return jsonify({"status": "error", "message": "请求数据不完整"}) | |
| filename = data.get("filename") | |
| account_data = data.get("account_data") | |
| # 安全检查文件名 | |
| if not filename or ".." in filename or not filename.endswith(".json"): | |
| return jsonify({"status": "error", "message": "无效的文件名"}) | |
| file_path = os.path.join("account", filename) | |
| try: | |
| with open(file_path, "w", encoding="utf-8") as f: | |
| json.dump(account_data, f, indent=4, ensure_ascii=False) | |
| return jsonify({"status": "success", "message": "账号已成功更新"}) | |
| except Exception as e: | |
| return jsonify({"status": "error", "message": f"更新账号时出错: {str(e)}"}) | |
| def delete_account(): | |
| # 检查是否是单个文件名还是文件名列表 | |
| if 'filenames' in request.form: | |
| # 批量删除模式 | |
| filenames = request.form.getlist('filenames') | |
| if not filenames: | |
| return jsonify({"status": "error", "message": "未提供文件名"}) | |
| results = { | |
| "success": [], | |
| "failed": [] | |
| } | |
| for filename in filenames: | |
| # 安全检查文件名 | |
| if ".." in filename or not filename.endswith(".json"): | |
| results["failed"].append({"filename": filename, "reason": "无效的文件名"}) | |
| continue | |
| file_path = os.path.join("account", filename) | |
| try: | |
| # 检查文件是否存在 | |
| if not os.path.exists(file_path): | |
| results["failed"].append({"filename": filename, "reason": "账号文件不存在"}) | |
| continue | |
| # 删除文件 | |
| os.remove(file_path) | |
| results["success"].append(filename) | |
| except Exception as e: | |
| results["failed"].append({"filename": filename, "reason": str(e)}) | |
| # 返回批量删除结果 | |
| if len(results["success"]) > 0: | |
| if len(results["failed"]) > 0: | |
| message = f"成功删除 {len(results['success'])} 个账号,{len(results['failed'])} 个账号删除失败" | |
| status = "partial" | |
| else: | |
| message = f"成功删除 {len(results['success'])} 个账号" | |
| status = "success" | |
| else: | |
| message = "所有账号删除失败" | |
| status = "error" | |
| return jsonify({ | |
| "status": status, | |
| "message": message, | |
| "results": results | |
| }) | |
| else: | |
| # 保持向后兼容 - 单个文件删除模式 | |
| filename = request.form.get("filename") | |
| if not filename: | |
| return jsonify({"status": "error", "message": "未提供文件名"}) | |
| # 安全检查文件名 | |
| if ".." in filename or not filename.endswith(".json"): | |
| return jsonify({"status": "error", "message": "无效的文件名"}) | |
| file_path = os.path.join("account", filename) | |
| try: | |
| # 检查文件是否存在 | |
| if not os.path.exists(file_path): | |
| return jsonify({"status": "error", "message": "账号文件不存在"}) | |
| # 删除文件 | |
| os.remove(file_path) | |
| return jsonify({"status": "success", "message": "账号已成功删除"}) | |
| except Exception as e: | |
| return jsonify({"status": "error", "message": f"删除账号时出错: {str(e)}"}) | |
| def activate_account_with_names(): | |
| try: | |
| data = request.json | |
| key = data.get("key") | |
| names = data.get("names", []) # 获取指定的账户名称列表 | |
| all_accounts = data.get("all", False) # 获取是否处理所有账户的标志 | |
| if not key: | |
| return jsonify({"status": "error", "message": "密钥不能为空"}) | |
| if not all_accounts and (not names or not isinstance(names, list)): | |
| return jsonify({"status": "error", "message": "请提供有效的账户名称列表或设置 all=true"}) | |
| # 存储账号数据及其文件路径 | |
| accounts_with_paths = [] | |
| for file in os.listdir("account"): | |
| if file.endswith(".json"): | |
| # 如果 all=true 或者文件名在指定的names列表中,则处理该文件 | |
| file_name_without_ext = os.path.splitext(file)[0] | |
| if all_accounts or file_name_without_ext in names or file in names: | |
| file_path = os.path.join("account", file) | |
| with open(file_path, "r", encoding="utf-8") as f: | |
| account_data = json.load(f) | |
| # 保存文件路径以便后续更新 | |
| accounts_with_paths.append( | |
| {"path": file_path, "data": account_data} | |
| ) | |
| if not accounts_with_paths: | |
| return jsonify({"status": "error", "message": "未找到指定的账号数据"}) | |
| # 使用多线程处理每个账号 | |
| import threading | |
| import queue | |
| # 创建结果队列 | |
| result_queue = queue.Queue() | |
| # 定义线程处理函数 | |
| def process_account(account_with_path, account_key, result_q): | |
| try: | |
| file_path = account_with_path["path"] | |
| single_account = account_with_path["data"] | |
| response = requests.post( | |
| headers={ | |
| "Content-Type": "application/json", | |
| "referer": "https://inject.kiteyuan.info/", | |
| "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36 Edg/134.0.0.0", | |
| }, | |
| url="https://inject.kiteyuan.info/infoInject", | |
| json={"info": single_account, "key": account_key}, | |
| timeout=30, | |
| ) | |
| # 将结果放入队列 | |
| if response.status_code == 200: | |
| api_result = response.json() | |
| # 检查API是否返回了正确的数据格式 | |
| if isinstance(api_result, dict) and api_result.get("code") == 200 and "data" in api_result: | |
| # 获取返回的数据对象 | |
| account_data = api_result.get("data", {}) | |
| if account_data and isinstance(account_data, dict): | |
| # 更新账号信息 | |
| updated_account = single_account.copy() | |
| # 更新令牌信息 (从data子对象中提取) | |
| for key in ["access_token", "refresh_token", "captcha_token", "timestamp", "device_id", "user_id"]: | |
| if key in account_data: | |
| updated_account[key] = account_data[key] | |
| # 保存更新后的账号数据 | |
| with open(file_path, "w", encoding="utf-8") as f: | |
| json.dump(updated_account, f, indent=4, ensure_ascii=False) | |
| # 将更新后的数据放入结果队列 | |
| result_q.put( | |
| { | |
| "status": "success", | |
| "account": single_account.get("email", "未知邮箱"), | |
| "result": account_data, | |
| "updated": True, | |
| } | |
| ) | |
| else: | |
| # 返回的data不是字典类型 | |
| result_q.put( | |
| { | |
| "status": "error", | |
| "account": single_account.get("email", "未知邮箱"), | |
| "message": "返回的数据格式不符合预期", | |
| "result": api_result, | |
| } | |
| ) | |
| else: | |
| # API返回错误码或格式不符合预期 | |
| error_msg = api_result.get("msg", "未知错误") | |
| result_q.put( | |
| { | |
| "status": "error", | |
| "account": single_account.get("email", "未知邮箱"), | |
| "message": f"激活失败: {error_msg}", | |
| "result": api_result, | |
| } | |
| ) | |
| else: | |
| result_q.put( | |
| { | |
| "status": "error", | |
| "account": single_account.get("email", "未知邮箱"), | |
| "message": f"激活失败: HTTP {response.status_code}-{response.json().get('detail', '未知错误')}", | |
| "result": response.text, | |
| } | |
| ) | |
| except Exception as e: | |
| result_q.put( | |
| { | |
| "status": "error", | |
| "account": single_account.get("email", "未知邮箱"), | |
| "message": f"处理失败: {str(e)}", | |
| } | |
| ) | |
| # 创建并启动线程 | |
| threads = [] | |
| for account_with_path in accounts_with_paths: | |
| thread = threading.Thread( | |
| target=process_account, args=(account_with_path, key, result_queue) | |
| ) | |
| threads.append(thread) | |
| thread.start() | |
| # 等待所有线程完成 | |
| for thread in threads: | |
| thread.join() | |
| # 收集所有结果 | |
| results = [] | |
| while not result_queue.empty(): | |
| results.append(result_queue.get()) | |
| # 统计成功和失败的数量 | |
| success_count = sum(1 for r in results if r["status"] == "success") | |
| updated_count = sum( | |
| 1 | |
| for r in results | |
| if r.get("status") == "success" and r.get("updated", False) | |
| ) | |
| return jsonify( | |
| { | |
| "status": "success", | |
| "message": f"账号激活完成: {success_count}/{len(accounts_with_paths)}个成功, {updated_count}个已更新数据", | |
| "results": results, | |
| } | |
| ) | |
| except Exception as e: | |
| return jsonify({"status": "error", "message": f"操作失败: {str(e)}"}) | |
| def check_email_inventory(): | |
| try: | |
| # 发送请求到库存API | |
| response = requests.get( | |
| url="https://zizhu.shanyouxiang.com/kucun", | |
| headers={ | |
| "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36" | |
| }, | |
| timeout=10, | |
| ) | |
| if response.status_code == 200: | |
| return jsonify({"status": "success", "inventory": response.json()}) | |
| else: | |
| return jsonify( | |
| { | |
| "status": "error", | |
| "message": f"获取库存失败: HTTP {response.status_code}", | |
| } | |
| ) | |
| except Exception as e: | |
| return jsonify({"status": "error", "message": f"获取库存时出错: {str(e)}"}) | |
| def check_balance(): | |
| try: | |
| # 从请求参数中获取卡号 | |
| card = request.args.get("card") | |
| if not card: | |
| return jsonify({"status": "error", "message": "未提供卡号参数"}) | |
| # 发送请求到余额查询API | |
| response = requests.get( | |
| url="https://zizhu.shanyouxiang.com/yue", | |
| params={"card": card}, | |
| headers={ | |
| "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36" | |
| }, | |
| timeout=10, | |
| ) | |
| if response.status_code == 200: | |
| return jsonify({"status": "success", "balance": response.json()}) | |
| else: | |
| return jsonify( | |
| { | |
| "status": "error", | |
| "message": f"查询余额失败: HTTP {response.status_code}", | |
| } | |
| ) | |
| except Exception as e: | |
| return jsonify({"status": "error", "message": f"查询余额时出错: {str(e)}"}) | |
| def extract_emails(): | |
| try: | |
| # 从请求参数中获取必需的参数 | |
| card = request.args.get("card") | |
| shuliang = request.args.get("shuliang") | |
| leixing = request.args.get("leixing") | |
| # 获取前端传递的重试次数计数器,如果没有则初始化为0 | |
| frontend_retry_count = int(request.args.get("retry_count", "0")) | |
| # 验证必需的参数 | |
| if not card: | |
| return jsonify({"status": "error", "message": "未提供卡号参数"}) | |
| if not shuliang: | |
| return jsonify({"status": "error", "message": "未提供提取数量参数"}) | |
| if not leixing or leixing not in ["outlook", "hotmail"]: | |
| return jsonify( | |
| { | |
| "status": "error", | |
| "message": "提取类型参数无效,必须为 outlook 或 hotmail", | |
| } | |
| ) | |
| # 尝试将数量转换为整数 | |
| try: | |
| shuliang_int = int(shuliang) | |
| if shuliang_int < 1 or shuliang_int > 2000: | |
| return jsonify( | |
| {"status": "error", "message": "提取数量必须在1到2000之间"} | |
| ) | |
| except ValueError: | |
| return jsonify({"status": "error", "message": "提取数量必须为整数"}) | |
| # 后端重试计数器 | |
| retry_count = 0 | |
| max_retries = 20 # 单次后端请求的最大重试次数 | |
| retry_delay = 0 # 每次重试间隔秒数 | |
| # 记录总的前端+后端重试次数,用于展示给用户 | |
| total_retry_count = frontend_retry_count | |
| while retry_count < max_retries: | |
| # 发送请求到邮箱提取API | |
| response = requests.get( | |
| url="https://zizhu.shanyouxiang.com/huoqu", | |
| params={"card": card, "shuliang": shuliang, "leixing": leixing}, | |
| headers={ | |
| "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36" | |
| }, | |
| timeout=10, # 降低单次请求的超时时间,以便更快地进行重试 | |
| ) | |
| if response.status_code == 200: | |
| # 检查响应是否为JSON格式,如果是,通常表示没有库存 | |
| try: | |
| json_response = response.json() | |
| if isinstance(json_response, dict) and "msg" in json_response: | |
| # 没有库存,需要重试 | |
| retry_count += 1 | |
| total_retry_count += 1 | |
| # 如果达到后端最大重试次数,返回特殊状态让前端继续重试 | |
| if retry_count >= max_retries: | |
| return jsonify( | |
| { | |
| "status": "retry", | |
| "message": f"暂无库存: {json_response['msg']},已重试{total_retry_count}次,继续尝试中...", | |
| "retry_count": total_retry_count, | |
| } | |
| ) | |
| # 等待一段时间后重试 | |
| time.sleep(retry_delay) | |
| continue | |
| except ValueError: | |
| # 不是JSON格式,可能是成功的文本列表响应 | |
| pass | |
| # 处理文本响应 | |
| response_text = response.text.strip() | |
| # 解析响应文本为邮箱列表 | |
| emails = [] | |
| if response_text: | |
| for line in response_text.split("\n"): | |
| if line.strip(): | |
| emails.append(line.strip()) | |
| # 如果没有实际提取到邮箱(可能是空文本响应),继续重试 | |
| if not emails: | |
| retry_count += 1 | |
| total_retry_count += 1 | |
| if retry_count >= max_retries: | |
| return jsonify( | |
| { | |
| "status": "retry", | |
| "message": f"未能获取到邮箱,已重试{total_retry_count}次,继续尝试中...", | |
| "retry_count": total_retry_count, | |
| } | |
| ) | |
| time.sleep(retry_delay) | |
| continue | |
| # 成功获取到邮箱,返回结果 | |
| return jsonify( | |
| { | |
| "status": "success", | |
| "emails": emails, | |
| "count": len(emails), | |
| "retries": total_retry_count, | |
| "message": f"成功获取{len(emails)}个邮箱,总共重试{total_retry_count}次", | |
| } | |
| ) | |
| else: | |
| # 请求失败,返回错误 | |
| return jsonify( | |
| { | |
| "status": "error", | |
| "message": f"提取邮箱失败: HTTP {response.status_code}", | |
| "response": response.text, | |
| } | |
| ) | |
| # 如果执行到这里,说明超过了最大重试次数 | |
| return jsonify( | |
| { | |
| "status": "retry", | |
| "message": f"暂无邮箱库存,已重试{total_retry_count}次,继续尝试中...", | |
| "retry_count": total_retry_count, | |
| } | |
| ) | |
| except Exception as e: | |
| return jsonify({"status": "error", "message": f"提取邮箱时出错: {str(e)}"}) | |
| # --- 新增API:通过EmailClient获取验证码 --- | |
| def get_email_verification_code_api(): | |
| """ | |
| 通过 EmailClient (通常是基于HTTP API的邮件服务) 获取验证码。 | |
| 接收 JSON 或 Form data。 | |
| 必需参数: email, token, client_id | |
| 可选参数: password, api_base_url, mailbox, code_regex, max_retries, retry_delay | |
| 如果EmailClient方法失败,将尝试使用connect_imap作为备用方法。 | |
| 如果用户之前已配置代理,也会使用相同的代理设置。 | |
| """ | |
| global max_retries, retry_delay | |
| if request.is_json: | |
| data = request.get_json() | |
| else: | |
| data = request.form | |
| email = data.get('email') | |
| token = data.get('token') # 对应 EmailClient 的 refresh_token | |
| client_id = data.get('client_id') | |
| if not all([email, token, client_id]): | |
| return jsonify({"status": "error", "message": "缺少必需参数: email, token, client_id"}), 400 | |
| # 获取可选参数 | |
| password = data.get('password') | |
| api_base_url = data.get('api_base_url') # 如果提供,将覆盖 EmailClient 的默认设置 | |
| mailbox = data.get('mailbox', "INBOX") | |
| code_regex = data.get('code_regex', r'\b\d{6}\b') # 默认匹配6位数字 | |
| # 检查是否在用户处理数据中有该邮箱,并提取代理设置 | |
| use_proxy = False | |
| proxy_url = None | |
| if email in user_process_data: | |
| user_data = user_process_data.get(email, {}) | |
| use_proxy = user_data.get("use_proxy", False) | |
| proxy_url = user_data.get("proxy_url", "") if use_proxy else None | |
| logger.info(f"为邮箱 {email} 使用代理设置: {use_proxy}, {proxy_url}") | |
| try: | |
| # 实例化 EmailClient,传入代理设置 | |
| email_client = EmailClient(api_base_url=api_base_url) | |
| # 设置代理(如果 EmailClient 类支持代理配置) | |
| if use_proxy and proxy_url and hasattr(email_client, 'set_proxy'): | |
| email_client.set_proxy(proxy_url) | |
| elif use_proxy and proxy_url: | |
| logger.warning("EmailClient 类不支持设置代理") | |
| # 使用重试机制调用获取验证码的方法 | |
| verification_code = retry_function( | |
| email_client.get_verification_code, | |
| token=token, | |
| client_id=client_id, | |
| email=email, | |
| password=password, | |
| mailbox=mailbox, | |
| code_regex=code_regex, | |
| max_retries=max_retries, | |
| delay=retry_delay | |
| ) | |
| if verification_code: | |
| return jsonify({"status": "success", "verification_code": verification_code}) | |
| else: | |
| # EmailClient 失败,尝试使用connect_imap作为备用方法 | |
| logger.info(f"EmailClient在{max_retries}次尝试后未能找到验证码,尝试使用connect_imap作为备用方法") | |
| # 检查是否有password参数 | |
| if not password: | |
| return jsonify({"status": "error", "msg": "EmailClient失败,且未提供password参数,无法使用备用方法"}), 200 | |
| # 先尝试从收件箱获取验证码,传入代理设置 | |
| result = connect_imap(email, password, "INBOX", use_proxy=use_proxy, proxy_url=proxy_url) | |
| # 如果收件箱没有找到验证码,则尝试从垃圾邮件中查找 | |
| if result["code"] == 0: | |
| result = connect_imap(email, password, "Junk", use_proxy=use_proxy, proxy_url=proxy_url) | |
| logger.info(f"catch 当前Oauth登录失败,IMAP结果如下:{result['msg']}") | |
| result["msg"] = f"当前Oauth登录失败,IMAP结果如下:{result['msg']}" | |
| if result["code"] == 0: | |
| return jsonify({"status": "error", "msg": "收件箱和垃圾邮件中均未找到验证码"}), 200 | |
| elif result["code"] == 200: | |
| return jsonify({"status": "success", "verification_code": result["verification_code"], "msg": result["msg"]}) | |
| else: | |
| return jsonify({"status": "error", "msg": result["msg"]}), 200 | |
| except Exception as e: | |
| # 捕获实例化或调用过程中的其他潜在错误 | |
| logger.error(f"处理 /api/get_email_verification_code 时出错: {str(e)}") | |
| import traceback | |
| logger.error(traceback.format_exc()) | |
| # 如果有password参数,尝试使用connect_imap作为备用方法 | |
| if password: | |
| logger.info(f"EmailClient出现异常,尝试使用connect_imap作为备用方法") | |
| try: | |
| # 先尝试从收件箱获取验证码,传入代理设置 | |
| result = connect_imap(email, password, "INBOX", use_proxy=use_proxy, proxy_url=proxy_url) | |
| # 如果收件箱没有找到验证码,则尝试从垃圾邮件中查找 | |
| if result["code"] == 0: | |
| result = connect_imap(email, password, "Junk", use_proxy=use_proxy, proxy_url=proxy_url) | |
| logger.info(f"catch 当前Oauth登录失败,IMAP结果如下:{result['msg']}") | |
| result["msg"] = f"当前Oauth登录失败,IMAP结果如下:{result['msg']}" | |
| if result["code"] == 0: | |
| return jsonify({"status": "error", "msg": "收件箱和垃圾邮件中均未找到验证码"}), 200 | |
| elif result["code"] == 200: | |
| return jsonify({"status": "success", "verification_code": result["verification_code"], "msg": result["msg"]}) | |
| else: | |
| return jsonify({"status": "error", "msg": result["msg"]}), 200 | |
| except Exception as backup_error: | |
| logger.error(f"备用方法connect_imap也失败: {str(backup_error)}") | |
| return jsonify({"status": "error", "message": f"主要和备用验证码获取方法均出现错误"}), 500 | |
| return jsonify({"status": "error", "message": f"处理请求时发生内部错误"}), 500 | |
| # 处理所有前端路由 | |
| def serve(path): | |
| #favicon vite.svg | |
| if path == 'favicon.ico' or path == 'vite.svg': | |
| return send_from_directory("static", path) | |
| # 对于所有其他请求 - 返回index.html (SPA入口点) | |
| return render_template('index.html') | |
| if __name__ == "__main__": | |
| app.run(debug=False, host="0.0.0.0", port=5000) | |