Spaces:
Paused
Paused
| # coding:utf-8 | |
| import hashlib | |
| import json | |
| import random | |
| import time | |
| import uuid | |
| import requests | |
| from PIL import Image | |
| from io import BytesIO | |
| import base64 | |
| import os | |
def ca_f_encrypt(frames, index, pid, use_proxy=False, proxies=None):
    """Request the slider-captcha ``f``/``ca`` values from the ``cafEncrypt`` endpoint.

    ``frames``, ``index`` and ``pid`` are posted as a JSON object and the
    decoded JSON reply is returned when it is an object that contains both
    the ``"f"`` and ``"ca"`` keys.

    Args:
        frames: Value sent under the ``"frames"`` key.
        index: Value sent under the ``"index"`` key.
        pid: Value sent under the ``"pid"`` key.
        use_proxy: Diagnostics only; when true, ``proxies`` is printed after
            a failed request.
        proxies: Proxy mapping printed for diagnostics.  It is not passed on
            to ``requests`` by this function.

    Returns:
        The decoded reply dict, or the placeholder
        ``{"f": "", "ca": ["", "", "", ""]}`` on any failure (HTTP error,
        timeout, empty body, invalid JSON, missing keys).
    """
    url = "https://kiteyuan.233285.xyz/cafEncrypt"
    fallback = {"f": "", "ca": ["", "", "", ""]}
    payload = json.dumps({
        "frames": frames,
        "index": index,
        "pid": pid,
    })
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
        "Accept-Language": "zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7",
        "Connection": "keep-alive",
        'Content-Type': 'application/json',
        "Upgrade-Insecure-Requests": "1",
        "Sec-Fetch-Dest": "document",
        "Sec-Fetch-Mode": "navigate",
        "Sec-Fetch-Site": "none",
        "Sec-Fetch-User": "?1",
    }
    try:
        # The proxy arguments are not forwarded here (the forwarding variant
        # of this call was disabled in the original code).  A timeout keeps a
        # stalled server from blocking the caller forever; a timeout error is
        # a RequestException and is handled below.
        response = requests.request("POST", url, headers=headers, data=payload, proxies=None, timeout=30)
        response.raise_for_status()
        if not response.text:
            print(f"API响应为空: {url}")
            return fallback
        result = json.loads(response.text)
        # Only a JSON object carrying both keys is usable by the caller.
        if not isinstance(result, dict) or "f" not in result or "ca" not in result:
            print(f"API响应缺少关键字段: {result}")
            return fallback
        return result
    except requests.exceptions.RequestException as e:
        print(f"API请求失败: {e}")
        if use_proxy:
            print(f"当前使用的代理: {proxies}")
        return fallback
    except json.JSONDecodeError as e:
        print(f"JSON解析错误: {e}, 响应内容: {response.text}")
        return fallback
def image_parse(image, frames, use_proxy=False, proxies=None):
    """Ask the ``imageParse`` endpoint which frame index best fits the captcha image.

    Args:
        image: Value sent under the ``"image"`` key (the caller in this file
            passes a Base64-encoded PNG string).
        frames: Value sent under the ``"frames"`` key.
        use_proxy: Diagnostics only; when true, ``proxies`` is printed after
            a failed request.
        proxies: Proxy mapping printed for diagnostics.  It is not passed on
            to ``requests`` by this function.

    Returns:
        The decoded reply dict (containing ``"best_index"``), or the default
        ``{"best_index": 0}`` on any failure.
    """
    url = "https://kiteyuan.233285.xyz/imageParse"
    fallback = {"best_index": 0}
    payload = json.dumps({
        "image": image,
        "frames": frames,
    })
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
        "Accept-Language": "zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7",
        "Connection": "keep-alive",
        'Content-Type': 'application/json',
        "Upgrade-Insecure-Requests": "1",
        "Sec-Fetch-Dest": "document",
        "Sec-Fetch-Mode": "navigate",
        "Sec-Fetch-Site": "none",
        "Sec-Fetch-User": "?1",
    }
    try:
        # The proxy arguments are not forwarded here (the forwarding variant
        # of this call was disabled in the original code).  The timeout stops
        # a stalled server from hanging the caller; timeout errors are
        # RequestExceptions and fall into the handler below.
        response = requests.request("POST", url, headers=headers, data=payload, proxies=None, timeout=30)
        response.raise_for_status()  # raise on 4xx/5xx status codes
        if not response.text:
            print(f"API响应为空: {url}")
            return fallback
        result = json.loads(response.text)
        # Reject replies that are not JSON objects or lack the index.
        if not isinstance(result, dict) or "best_index" not in result:
            print(f"API响应缺少best_index字段: {result}")
            return fallback
        return result
    except requests.exceptions.RequestException as e:
        print(f"API请求失败: {e}")
        if use_proxy:
            print(f"当前使用的代理: {proxies}")
        return fallback
    except json.JSONDecodeError as e:
        print(f"JSON解析错误: {e}, 响应内容: {response.text}")
        return fallback
def sign_encrypt(code, captcha_token, rtc_token, use_proxy=False, proxies=None):
    """Obtain ``request_id`` and ``sign`` from the ``signEncrypt`` endpoint.

    ``code`` is sent as-is when it is a string; any other truthy value is
    serialised with ``json.dumps`` first.  The proxy arguments are used for
    diagnostics only and are not passed on to ``requests``.

    Returns:
        The decoded reply (containing ``"request_id"`` and ``"sign"``), or
        ``{"request_id": "", "sign": ""}`` when ``code`` is empty, cannot be
        serialised, or the request/parse fails for any reason.
    """
    endpoint = "https://kiteyuan.233285.xyz/signEncrypt"

    def failure():
        # Fresh placeholder dict for every failure path.
        return {"request_id": "", "sign": ""}

    # Nothing to sign when code is empty or None.
    if not code:
        print("code 参数为空,无法进行加密")
        return failure()

    # Strings go through unchanged; everything else is JSON-encoded.
    code_text = code
    if not isinstance(code, str):
        try:
            code_text = json.dumps(code)
        except (TypeError, ValueError) as e:
            print(f"code 参数序列化失败: {e}")
            return failure()

    try:
        body = json.dumps({
            "code": code_text,
            "captcha_token": captcha_token,
            "rtc_token": rtc_token
        })
        request_headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36",
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
            "Accept-Language": "zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7",
            "Connection": "keep-alive",
            'Content-Type': 'application/json',
            "Upgrade-Insecure-Requests": "1",
            "Sec-Fetch-Dest": "document",
            "Sec-Fetch-Mode": "navigate",
            "Sec-Fetch-Site": "none",
            "Sec-Fetch-User": "?1"
        }
        response = requests.request("POST", endpoint, headers=request_headers, data=body, proxies=None, timeout=30)
        response.raise_for_status()
        if not response.text:
            print(f"API响应为空: {endpoint}")
            return failure()
        result = json.loads(response.text)
        has_required_keys = "request_id" in result and "sign" in result
        if not has_required_keys:
            print(f"API响应缺少关键字段: {result}")
            return failure()
        return result
    except requests.exceptions.RequestException as e:
        print(f"API请求失败: {e}")
        if use_proxy:
            print(f"当前使用的代理: {proxies}")
        return failure()
    except json.JSONDecodeError as e:
        print(f"JSON解析错误: {e}, 响应内容: {response.text}")
        return failure()
    except Exception as e:
        print(f"未知错误: {e}")
        return failure()
def d_encrypt(pid, device_id, f, use_proxy=False, proxies=None):
    """Fetch the ``d`` value for captcha verification from the ``dEncrypt`` endpoint.

    Args:
        pid: Value sent under the ``"pid"`` key.
        device_id: Value sent under the ``"device_id"`` key.
        f: Value sent under the ``"f"`` key.
        use_proxy: Diagnostics only; when true, ``proxies`` is printed after
            a failed request.
        proxies: Proxy mapping printed for diagnostics.  It is not passed on
            to ``requests`` by this function.

    Returns:
        The raw response text, or ``""`` when the body is empty or the
        request fails.
    """
    url = "https://kiteyuan.233285.xyz/dEncrypt"
    payload = json.dumps({
        "pid": pid,
        "device_id": device_id,
        "f": f,
    })
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
        "Accept-Language": "zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7",
        "Connection": "keep-alive",
        'Content-Type': 'application/json',
        "Upgrade-Insecure-Requests": "1",
        "Sec-Fetch-Dest": "document",
        "Sec-Fetch-Mode": "navigate",
        "Sec-Fetch-Site": "none",
        "Sec-Fetch-User": "?1",
    }
    try:
        # The proxy arguments are not forwarded here (the forwarding variant
        # of this call was disabled in the original code).  The timeout stops
        # a stalled server from hanging the caller; timeout errors are
        # RequestExceptions and fall into the handler below.
        response = requests.request("POST", url, headers=headers, data=payload, proxies=None, timeout=30)
        response.raise_for_status()
        if not response.text:
            print(f"API响应为空: {url}")
            return ""
        return response.text
    except requests.exceptions.RequestException as e:
        print(f"API请求失败: {e}")
        if use_proxy:
            print(f"当前使用的代理: {proxies}")
        return ""
# Chained, salted MD5 hashing
def captcha_sign_encrypt(encrypt_string, salts):
    """Fold ``encrypt_string`` through one MD5 round per entry in ``salts``.

    Each round appends the entry's ``"salt"`` value to the current string,
    hashes the UTF-8 bytes with MD5 and continues with the hex digest.
    With no salts the input is returned unchanged.
    """
    digest = encrypt_string
    for entry in salts:
        hasher = hashlib.md5()
        hasher.update((digest + entry["salt"]).encode("utf-8"))
        digest = hasher.hexdigest()
    return digest
def captcha_image_parse(pikpak, device_id):
    """Run one slider-captcha attempt and return its verification result.

    Steps: fetch the puzzle description via ``pikpak.gen()``, download the
    image, re-encode it as Base64 PNG, ask ``image_parse`` for the best frame
    index, obtain the ``f``/``ca`` values from ``ca_f_encrypt`` and the ``d``
    value from ``d_encrypt``, then submit everything through
    ``pikpak.image_verify``.

    Args:
        pikpak: Object providing ``gen()``, ``image_verify(...)`` and the
            ``use_proxy`` / ``proxies`` attributes.
        device_id: Device identifier used for the image download and ``d_encrypt``.

    Returns:
        ``{"response_data": ..., "pid": ..., "traceid": ...}``.  On any
        failure ``response_data`` is ``{"result": "reject"}``; ``pid`` and
        ``traceid`` are empty strings when they are not known (or when an
        unexpected exception occurred).
    """
    def reject(pid="", traceid=""):
        # Uniform failure result so callers only need to inspect "result".
        return {"response_data": {"result": "reject"}, "pid": pid, "traceid": traceid}

    try:
        # Fetch the puzzle description (pid + frames).
        frames_info = pikpak.gen()
        if not frames_info or not isinstance(frames_info, dict) or "pid" not in frames_info or "frames" not in frames_info:
            print("获取frames_info失败,返回内容:", frames_info)
            return reject()
        if "traceid" not in frames_info:
            frames_info["traceid"] = ""
        pid = frames_info["pid"]
        traceid = frames_info["traceid"]

        # Download the captcha image.
        captcha_image = image_download(device_id, pid, traceid, pikpak.use_proxy, pikpak.proxies)
        if not captcha_image:
            print("图片下载失败")
            return reject(pid, traceid)

        # Re-encode the image as PNG and Base64 it for the analysis endpoint.
        img = Image.open(BytesIO(captcha_image))
        buffered = BytesIO()
        img.save(buffered, format="PNG")
        base64_image = base64.b64encode(buffered.getvalue()).decode()

        # Ask for the best slider position.
        best_index = image_parse(base64_image, frames_info["frames"], pikpak.use_proxy, pikpak.proxies)
        if "best_index" not in best_index:
            print("图片分析失败, 返回内容:", best_index)
            return reject(pid, traceid)

        # Compute the slider payload.
        json_data = ca_f_encrypt(frames_info["frames"], best_index["best_index"], pid, pikpak.use_proxy, pikpak.proxies)
        if "f" not in json_data or "ca" not in json_data:
            print("加密计算失败, 返回内容:", json_data)
            return reject(pid, traceid)
        f = json_data['f']
        npac = json_data['ca']
        # ca_f_encrypt reports failure with an empty "f" placeholder that
        # still has both keys, and "ca" is indexed [0..3] below, so check the
        # values as well instead of submitting a request that cannot succeed.
        if not f or not isinstance(npac, (list, tuple)) or len(npac) < 4:
            print("加密计算失败, 返回内容:", json_data)
            return reject(pid, traceid)

        # Compute the d value.
        d = d_encrypt(pid, device_id, f, pikpak.use_proxy, pikpak.proxies)
        if not d:
            print("d_encrypt失败")
            return reject(pid, traceid)

        # Submit the solution for verification.
        verify2 = pikpak.image_verify(pid, traceid, f, npac[0], npac[1], npac[2], npac[3], d)
        return {
            "response_data": verify2,
            "pid": pid,
            "traceid": traceid,
        }
    except Exception as e:
        print(f"滑块验证过程中出错: {e}")
        import traceback
        traceback.print_exc()
        return reject()
def image_download(device_id, pid, traceid, use_proxy=False, proxies=None):
    """Download the captcha puzzle image and return its raw bytes.

    Args:
        device_id: Sent as the ``deviceid`` query parameter.
        pid: Sent as the ``pid`` query parameter.
        traceid: Sent as the ``traceid`` query parameter.
        use_proxy: When true the request is sent through ``proxies``.
        proxies: ``requests``-style proxy mapping.

    Returns:
        The response body as ``bytes`` for a 200 reply, otherwise ``None``
        (non-200 status, HTTP error, timeout or connection failure).
    """
    url = f"https://user.mypikpak.com/pzzl/image?deviceid={device_id}&pid={pid}&traceid={traceid}"
    headers = {
        'pragma': 'no-cache',
        'priority': 'u=1, i',
    }
    try:
        # A timeout keeps a stalled download from blocking the caller; the
        # resulting error is a RequestException and is handled below.
        response = requests.get(url, headers=headers, proxies=proxies if use_proxy else None, timeout=30)
        response.raise_for_status()
        # raise_for_status only rejects 4xx/5xx; other non-200 codes end up here.
        if response.status_code == 200:
            return response.content
        print(f"下载失败,状态码: {response.status_code}")
        return None
    except requests.exceptions.RequestException as e:
        print(f"图片下载失败: {e}")
        if use_proxy:
            print(f"当前使用的代理: {proxies}")
        return None
def ramdom_version():
    """Return the client version record used for signing.

    A record is ``{"v": <version>, "algorithms": [{"alg": "md5", "salt": ...}, ...]}``.
    Several versions are listed, but the first one is always returned;
    random selection among them is currently disabled.
    """
    # (version, ordered salts) pairs; every salt is applied with MD5.
    salts_by_version = (
        ("1.42.6", (
            "frupTFdxwcJ5mcL3R8",
            "jB496fSFfbWLhWyqV",
            "xYLtzn8LT5h3KbAalCjc/Wf",
            "PSHSbm1SlxbvkwNk4mZrJhBZ1vsHCtEdm3tsRiy1IPUnqi1FNB5a2F",
            "SX/WvPCRzgkLIp99gDnLaCs0jGn2+urx7vz/",
            "OGdm+dgLk5EpK4O1nDB+Z4l",
            "nwtOQpz2xFLIE3EmrDwMKe/Vlw2ubhRcnS2R23bwx9wMh+C3Sg",
            "FI/9X9jbnTLa61RHprndT0GkVs18Chd",
        )),
        ("1.47.1", (
            "Gez0T9ijiI9WCeTsKSg3SMlx",
            "zQdbalsolyb1R/",
            "ftOjr52zt51JD68C3s",
            "yeOBMH0JkbQdEFNNwQ0RI9T3wU/v",
            "BRJrQZiTQ65WtMvwO",
            "je8fqxKPdQVJiy1DM6Bc9Nb1",
            "niV",
            "9hFCW2R1",
            "sHKHpe2i96",
            "p7c5E6AcXQ/IJUuAEC9W6",
            "",
            "aRv9hjc9P+Pbn+u3krN6",
            "BzStcgE8qVdqjEH16l4",
            "SqgeZvL5j9zoHP95xWHt",
            "zVof5yaJkPe3VFpadPof",
        )),
        ("1.48.3", (
            "aDhgaSE3MsjROCmpmsWqP1sJdFJ",
            "+oaVkqdd8MJuKT+uMr2AYKcd9tdWge3XPEPR2hcePUknd",
            "u/sd2GgT2fTytRcKzGicHodhvIltMntA3xKw2SRv7S48OdnaQIS5mn",
            "2WZiae2QuqTOxBKaaqCNHCW3olu2UImelkDzBn",
            "/vJ3upic39lgmrkX855Qx",
            "yNc9ruCVMV7pGV7XvFeuLMOcy1",
            "4FPq8mT3JQ1jzcVxMVfwFftLQm33M7i",
            "xozoy5e3Ea",
        )),
        ("1.49.3", (
            "7xOq4Z8s",
            "QE9/9+IQco",
            "WdX5J9CPLZp",
            "NmQ5qFAXqH3w984cYhMeC5TJR8j",
            "cc44M+l7GDhav",
            "KxGjo/wHB+Yx8Lf7kMP+/m9I+",
            "wla81BUVSmDkctHDpUT",
            "c6wMr1sm1WxiR3i8LDAm3W",
            "hRLrEQCFNYi0PFPV",
            "o1J41zIraDtJPNuhBu7Ifb/q3",
            "U",
            "RrbZvV0CTu3gaZJ56PVKki4IeP",
            "NNuRbLckJqUp1Do0YlrKCUP",
            "UUwnBbipMTvInA0U0E9",
            "VzGc",
        )),
        ("1.51.2", (
            "vPjelkvqcWoCsQO1CnkVod8j2GbcE0yEHEwJ3PKSKW",
            "Rw5aO9MHuhY",
            "Gk111qdZkPw/xgj",
            "/aaQ4/f8HNpyzPOtIF3rG/UEENiRRvpIXku3WDWZHuaIq+0EOF",
            "6p1gxZhV0CNuKV2QO5vpibkR8IJeFURvqNIKXWOIyv1A",
            "gWR",
            "iPD",
            "ASEm+P75YfKzQRW6eRDNNTd",
            "2fauuwVCxLCpL/FQ/iJ5NpOPb7gRZs0EWJwe/2YNPQr3ore+ZiIri6s/tYayG",
        )),
    )
    catalogue = [
        {"v": number, "algorithms": [{"alg": "md5", "salt": salt} for salt in salts]}
        for number, salts in salts_by_version
    ]
    # The first entry is pinned; picking randomly from the catalogue is disabled.
    return catalogue[0]
def random_rtc_token():
    """Return a random token shaped like a full IPv6 address.

    The token is eight colon-separated groups, each a random 16-bit value
    rendered as four lowercase hexadecimal digits.
    """
    groups = []
    for _ in range(8):
        value = random.randint(0, 0xFFFF)
        groups.append(format(value, "04x"))
    return ":".join(groups)
class PikPak:
    """State holder and HTTP client for the PikPak registration flow.

    An instance keeps the identifiers passed to the constructor together
    with the tokens returned by the individual API calls (captcha token,
    verification id/token, access/refresh token, user id).  Request methods
    return the decoded response and, where noted, store values from it on
    the instance.
    """

    # Proxy URL used when proxying is enabled but no address was supplied.
    DEFAULT_PROXY = "http://127.0.0.1:7890"

    def __init__(self, invite_code, client_id, device_id, version, algorithms, email, rtc_token,
                 client_secret, package_name, use_proxy=False, proxy_http=None, proxy_https=None):
        """Store the client configuration and reset all session tokens.

        When ``use_proxy`` is true, ``proxy_http`` / ``proxy_https`` are used
        for the respective schemes, each falling back to ``DEFAULT_PROXY``.
        """
        self.invite_code = invite_code  # invite code redeemed by activation_code()
        self.client_id = client_id  # client id
        self.device_id = device_id  # device id
        self.timestamp = 0  # set by refresh_captcha_sign()
        self.algorithms = algorithms  # salt list for the selected version
        self.version = version  # client version
        self.email = email  # e-mail address being registered
        self.rtc_token = rtc_token  # RTC token
        self.captcha_token = ""  # captcha token, updated by init()/report()
        self.client_secret = client_secret  # client secret
        self.user_id = ""  # user id, set by signup()
        self.access_token = ""  # access token, set by signup()
        self.refresh_token = ""  # refresh token, set by signup()
        self.verification_token = ""  # set by verify_post()
        self.captcha_sign = ""  # set by refresh_captcha_sign()
        self.verification_id = ""  # set by verification()
        self.package_name = package_name  # client package name
        self.use_proxy = use_proxy  # whether requests go through a proxy
        if use_proxy:
            self.proxies = {
                "http": proxy_http or self.DEFAULT_PROXY,
                "https": proxy_https or self.DEFAULT_PROXY,
            }
        else:
            self.proxies = None

    def send_request(self, method, url, headers=None, params=None, json_data=None, data=None, use_proxy=None):
        """Send an HTTP request and return the decoded response.

        Args:
            method: HTTP method name.
            url: Target URL.
            headers: Optional header mapping.
            params: Optional query parameters.
            json_data: Optional JSON body.
            data: Optional form/raw body.
            use_proxy: Per-call override; ``None`` means use ``self.use_proxy``.

        Returns:
            The parsed JSON body; the raw text when the body is not valid
            JSON; ``{}`` when the request fails (including HTTP error
            statuses and timeouts).
        """
        headers = headers or {}
        use_proxy = self.use_proxy if use_proxy is None else use_proxy
        if use_proxy and not self.proxies:
            # Proxying requested for this call but none configured: use the default.
            proxies = {"http": self.DEFAULT_PROXY, "https": self.DEFAULT_PROXY}
        else:
            proxies = self.proxies if use_proxy else None
        try:
            response = requests.request(
                method=method,
                url=url,
                headers=headers,
                params=params,
                json=json_data,
                data=data,
                proxies=proxies,
                timeout=30,
            )
            response.raise_for_status()
            print(response.text)
            try:
                return response.json()
            except ValueError:
                # Every JSON decode error that response.json() can raise
                # (stdlib, simplejson-backed, or requests' own) derives from
                # ValueError; catching it here keeps the "return raw text"
                # fallback working with all of them.
                return response.text
        except requests.exceptions.RequestException as e:
            print(f"请求失败: {url}, 错误: {e}")
            if use_proxy:
                print(f"当前使用的代理: {proxies}")
            return {}

    def gen(self):
        """Request a new slider puzzle and return the response unchanged.

        A response without ``pid``/``frames`` is logged but still returned.
        """
        url = "https://user.mypikpak.com/pzzl/gen"
        params = {"deviceid": self.device_id, "traceid": ""}
        headers = {"Host": "user.mypikpak.com", "accept": "application/json, text/plain, */*"}
        response = self.send_request("GET", url, headers=headers, params=params)
        if not response or not isinstance(response, dict) or "pid" not in response or "frames" not in response:
            print(f"gen请求返回无效响应: {response}")
        return response

    def image_verify(self, pid, trace_id, f, n, p, a, c, d):
        """Submit a slider solution; return the response or ``{"result": "reject"}``."""
        url = "https://user.mypikpak.com/pzzl/verify"
        params = {"pid": pid, "deviceid": self.device_id, "traceid": trace_id, "f": f, "n": n, "p": p, "a": a,
                  "c": c, "d": d}
        headers = {"Host": "user.mypikpak.com", "accept": "application/json, text/plain, */*"}
        response = self.send_request("GET", url, headers=headers, params=params)
        if not response or not isinstance(response, dict) or "result" not in response:
            print(f"image_verify请求返回无效响应: {response}")
            return {"result": "reject"}
        return response

    def executor(self):
        """Fetch the captcha executor script and return its payload as text.

        A JSONP reply has its ``callback(...)`` wrapper (and surrounding
        backticks, if present) removed; a reply that starts with ``{`` or
        ``[`` is returned as-is.  Returns ``""`` on any failure or for an
        unrecognised format.
        """
        url = "https://api-drive.mypikpak.com/captcha-jsonp/v2/executor?callback=handleJsonpResult_" + str(int(time.time() * 1000))
        headers = {'pragma': 'no-cache', 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36'}
        try:
            # Plain requests.get instead of send_request: the raw text is needed.
            response = requests.get(url, headers=headers, proxies=self.proxies if self.use_proxy else None, timeout=30)
            response.raise_for_status()
            content = response.text
            print(f"executor 原始响应: {content}")
            if not content:
                print("executor 响应内容为空")
                return ""
            if "handleJsonpResult" in content:
                # JSONP: take everything between the first "(" and the last ")".
                start_index = content.find('(')
                end_index = content.rfind(')')
                if start_index != -1 and end_index != -1:
                    json_str = content[start_index + 1:end_index]
                    # The payload is sometimes wrapped in backticks.
                    if json_str.startswith('`') and json_str.endswith('`'):
                        json_str = json_str[1:-1]
                    return json_str
                print(f"无法从JSONP响应中提取有效内容: {content}")
                return ""
            if content.startswith(('{', '[')):
                # Looks like plain JSON text.
                return content
            print(f"未知的响应格式: {content}")
            return ""
        except requests.exceptions.RequestException as e:
            print(f"执行 executor 请求失败: {e}")
            return ""
        except Exception as e:
            print(f"解析 executor 响应失败: {e}")
            return ""

    def report(self, request_id, sign, pid, trace_id):
        """Report the solved captcha; store the returned ``captcha_token`` if present."""
        url = "https://user.mypikpak.com/credit/v1/report"
        params = {
            "deviceid": self.device_id,
            "captcha_token": self.captcha_token,
            "request_id": request_id,
            "sign": sign,
            "type": "pzzlSlider",
            "result": 0,
            "data": pid,
            "traceid": trace_id,
            "rtc_token": self.rtc_token,
        }
        headers = {'pragma': 'no-cache', 'priority': 'u=1, i'}
        response = self.send_request("GET", url, params=params, headers=headers)
        if not response or not isinstance(response, dict) or "captcha_token" not in response:
            print(f"report请求返回无效响应: {response}")
        else:
            self.captcha_token = response.get('captcha_token')
        return response

    def verification(self):
        """Request an e-mail verification code; store ``verification_id`` if present."""
        url = 'https://user.mypikpak.com/v1/auth/verification'
        params = {"email": self.email, "target": "ANY", "usage": "REGISTER", "locale": "zh-CN",
                  "client_id": self.client_id}
        headers = {'host': 'user.mypikpak.com', 'x-captcha-token': self.captcha_token,
                   'x-device-id': self.device_id, "x-client-id": self.client_id}
        # The fields are sent as the request body (data=), not as a query string.
        response = self.send_request("POST", url, headers=headers, data=params)
        if not response or not isinstance(response, dict) or "verification_id" not in response:
            print(f"verification请求返回无效响应: {response}")
        else:
            self.verification_id = response.get('verification_id')
        return response

    def verify_post(self, verification_code):
        """Submit the e-mail code; store ``verification_token`` if present."""
        url = "https://user.mypikpak.com/v1/auth/verification/verify"
        params = {"client_id": self.client_id}
        payload = {"client_id": self.client_id, "verification_id": self.verification_id,
                   "verification_code": verification_code}
        headers = {"X-Device-Id": self.device_id}
        response = self.send_request("POST", url, headers=headers, json_data=payload, params=params)
        if not response or not isinstance(response, dict) or "verification_token" not in response:
            print(f"verify_post请求返回无效响应: {response}")
        else:
            self.verification_token = response.get('verification_token')
        return response

    def init(self, action):
        """Initialise the captcha for ``action``; store ``captcha_token`` if present.

        The timestamp and captcha sign are refreshed before the request.
        """
        self.refresh_captcha_sign()
        url = "https://user.mypikpak.com/v1/shield/captcha/init"
        params = {"client_id": self.client_id}
        payload = {
            "action": action,
            "captcha_token": self.captcha_token,
            "client_id": self.client_id,
            "device_id": self.device_id,
            "meta": {
                "captcha_sign": "1." + self.captcha_sign,
                "user_id": self.user_id,
                "package_name": self.package_name,
                "client_version": self.version,
                "email": self.email,
                "timestamp": self.timestamp,
            },
        }
        headers = {"x-device-id": self.device_id}
        response = self.send_request("POST", url, headers=headers, json_data=payload, params=params)
        if not response or not isinstance(response, dict) or "captcha_token" not in response:
            print(f"init请求返回无效响应: {response}")
        else:
            self.captcha_token = response.get('captcha_token')
        return response

    def signup(self, name, password, verification_code):
        """Register the account; store access/refresh token and user id from the reply.

        Missing fields in a dict reply are stored as empty strings.
        """
        url = "https://user.mypikpak.com/v1/auth/signup"
        params = {"client_id": self.client_id}
        payload = {
            "captcha_token": self.captcha_token,
            "client_id": self.client_id,
            "client_secret": self.client_secret,
            "email": self.email,
            "name": name,
            "password": password,
            "verification_code": verification_code,
            "verification_token": self.verification_token,
        }
        headers = {"X-Device-Id": self.device_id}
        response = self.send_request("POST", url, headers=headers, json_data=payload, params=params)
        if not response or not isinstance(response, dict):
            print(f"signup请求返回无效响应: {response}")
        else:
            self.access_token = response.get('access_token', '')
            self.refresh_token = response.get('refresh_token', '')
            self.user_id = response.get('sub', '')
        return response

    def activation_code(self):
        """Redeem ``self.invite_code`` for the signed-in account and return the response."""
        url = "https://api-drive.mypikpak.com/vip/v1/order/activation-code"
        payload = {"activation_code": self.invite_code, "data": {}}
        headers = {
            "Host": "api-drive.mypikpak.com",
            "authorization": "Bearer " + self.access_token,
            "x-captcha-token": self.captcha_token,
            "x-device-id": self.device_id,
            'x-system-language': "ko",
            'content-type': 'application/json',
        }
        response = self.send_request("POST", url, headers=headers, json_data=payload)
        if not response or not isinstance(response, dict):
            print(f"activation_code请求返回无效响应: {response}")
        return response

    def files_task(self, task_link):
        """Create a URL download task for ``task_link`` and return the response."""
        url = "https://api-drive.mypikpak.com/drive/v1/files"
        payload = {
            "kind": "drive#file",
            "folder_type": "DOWNLOAD",
            "upload_type": "UPLOAD_TYPE_URL",
            "url": {"url": task_link},
            "params": {"with_thumbnail": "true", "from": "manual"},
        }
        headers = {
            "Authorization": "Bearer " + self.access_token,
            "x-device-id": self.device_id,
            "x-captcha-token": self.captcha_token,
            "Content-Type": "application/json",
        }
        response = self.send_request("POST", url, headers=headers, json_data=payload)
        if not response or not isinstance(response, dict):
            print(f"files_task请求返回无效响应: {response}")
        return response

    def refresh_captcha_sign(self):
        """Refresh ``self.timestamp`` and recompute ``self.captcha_sign``.

        The timestamp is the current time in whole seconds multiplied by
        1000, stored as a string.  The sign is the salted hash of
        client id + version + package name + device id + timestamp.
        """
        self.timestamp = str(int(time.time()) * 1000)
        encrypt_string = self.client_id + self.version + self.package_name + self.device_id + self.timestamp
        self.captcha_sign = captcha_sign_encrypt(encrypt_string, self.algorithms)
def save_account_info(name, account_info):
    """Write ``account_info`` as pretty-printed JSON to ``./account/<name>.json``.

    The ``./account`` directory is created when missing.  The file is
    written as UTF-8 with non-ASCII characters kept as-is.
    """
    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs("./account", exist_ok=True)
    with open("./account/" + name + ".json", "w", encoding="utf-8") as f:
        json.dump(account_info, f, ensure_ascii=False, indent=4)
def test_proxy(proxy_url):
    """Check whether ``proxy_url`` can be used to reach the PikPak website.

    Returns ``True`` when a GET through the proxy succeeds with a
    non-error status within 10 seconds, ``False`` on any exception.
    """
    target = "https://mypikpak.com"
    # Same proxy for both schemes.
    proxy_map = dict.fromkeys(("http", "https"), proxy_url)
    try:
        reply = requests.get(target, proxies=proxy_map, timeout=10)
        reply.raise_for_status()
        print(f"代理连接测试成功: {proxy_url}")
        reachable = True
    except Exception as e:
        print(f"代理连接测试失败: {proxy_url}, 错误: {e}")
        reachable = False
    return reachable
def _is_yes(answer):
    """Return True when a prompt answer is ``y`` or ``yes`` (case-insensitive)."""
    return answer.strip().lower() in ("y", "yes")


def _confirm_proxy(proxy_url, failure_prompt):
    """Test ``proxy_url``; if the test fails ask whether to keep it anyway.

    Returns the proxy URL to use, or ``None`` when the user gives it up.
    """
    if test_proxy(proxy_url) or _is_yes(input(failure_prompt)):
        return proxy_url
    print("已取消代理设置,将直接连接")
    return None


def _prompt_proxy_settings():
    """Interactively collect the proxy configuration.

    Returns a ``(use_proxy, proxy_url)`` tuple; ``proxy_url`` is ``None``
    when no proxy is used.
    """
    if not _is_yes(input('是否启用代理(y/n):')):
        print("未启用代理,直接连接")
        return False, None
    if _is_yes(input('是否使用默认代理地址 http://127.0.0.1:7890 (y/n):')):
        proxy_url = "http://127.0.0.1:7890"
        print("已启用代理,使用默认地址:", proxy_url)
        proxy_url = _confirm_proxy(proxy_url, "默认代理连接测试失败,是否继续使用(y/n):")
    else:
        # Custom host / port / protocol, each with a default.
        proxy_host = input('请输入代理主机地址 (默认127.0.0.1): ').strip() or '127.0.0.1'
        proxy_port = input('请输入代理端口 (默认7890): ').strip() or '7890'
        proxy_protocol = input('请输入代理协议 (http/https/socks5,默认http): ').strip().lower()
        if proxy_protocol not in ('http', 'https', 'socks5'):
            proxy_protocol = 'http'
        proxy_url = f"{proxy_protocol}://{proxy_host}:{proxy_port}"
        print(f"已设置代理地址: {proxy_url}")
        proxy_url = _confirm_proxy(proxy_url, "自定义代理连接测试失败,是否继续使用(y/n):")
    return proxy_url is not None, proxy_url


def _captcha_accepted(captcha_result):
    """Return True when a captcha attempt result reports ``accept``."""
    return bool(
        captcha_result
        and "response_data" in captcha_result
        and captcha_result['response_data'].get('result') == 'accept'
    )


# Program entry point
def main():
    """Run the interactive registration flow from the console.

    Steps: choose version/device parameters, ask for proxy settings, invite
    code and e-mail, initialise the captcha, solve the slider captcha (up to
    five attempts), sign and report the result, request and submit the
    e-mail verification code, sign up, redeem the invite code, and
    optionally save the account details to ``./account/<name>.json``.
    Every failure is reported on the console; the function returns ``None``.
    """
    import traceback

    try:
        # 1. Initial parameters
        current_version = ramdom_version()
        version = current_version['v']
        algorithms = current_version['algorithms']
        client_id = "YNxT9w7GMdWvEOKa"
        client_secret = "dbw2OtmVEeuUvIptb1Coyg"
        package_name = "com.pikcloud.pikpak"
        device_id = str(uuid.uuid4()).replace("-", "")
        rtc_token = random_rtc_token()
        print(f"当前版本:{version} 设备号:{device_id} 令牌:{rtc_token}")

        use_proxy, proxy_url = _prompt_proxy_settings()
        invite_code = input('请输入你的邀请码:')
        email = input("请输入注册用的邮箱:")

        # 2. Create the client with the chosen proxy settings
        pikpak = PikPak(invite_code, client_id, device_id, version, algorithms, email, rtc_token, client_secret,
                        package_name, use_proxy=use_proxy, proxy_http=proxy_url, proxy_https=proxy_url)

        # 3. Refresh the timestamp/sign and initialise the captcha
        init_result = pikpak.init("POST:/v1/auth/verification")
        if not init_result or not isinstance(init_result, dict) or "captcha_token" not in init_result:
            print("初始化失败,请检查网络连接或代理设置")
            input("按任意键退出程序")
            return

        # 4. Solve the slider captcha, retrying a limited number of times
        max_attempts = 5
        captcha_result = None
        for attempt in range(max_attempts):
            print(f"尝试滑块验证 ({attempt + 1}/{max_attempts})...")
            try:
                captcha_result = captcha_image_parse(pikpak, device_id)
                print(captcha_result)
                if _captcha_accepted(captcha_result):
                    print("滑块验证成功!")
                    break
                print('滑块验证失败, 正在重新尝试...')
                time.sleep(2)  # wait before the next attempt
            except Exception as e:
                print(f"滑块验证过程出错: {e}")
                traceback.print_exc()
                time.sleep(2)  # wait before the next attempt
        if not _captcha_accepted(captcha_result):
            print("滑块验证失败,达到最大尝试次数")
            input("按任意键退出程序")
            return

        # 5. Sign the captcha result, report it and request the e-mail code
        try:
            executor_info = pikpak.executor()
            if not executor_info:
                print("获取executor信息失败")
                input("按任意键退出程序")
                return
            sign_encrypt_info = sign_encrypt(executor_info, pikpak.captcha_token, rtc_token, pikpak.use_proxy,
                                             pikpak.proxies)
            # sign_encrypt reports failure with both keys present but empty,
            # so the values (not just the keys) have to be checked.
            if (not sign_encrypt_info or not sign_encrypt_info.get("request_id")
                    or not sign_encrypt_info.get("sign")):
                print("签名加密失败")
                print(f"executor_info: {executor_info}")
                print(f"captcha_token: {pikpak.captcha_token}")
                print(f"rtc_token: {rtc_token}")
                input("按任意键退出程序")
                return
            # Updates pikpak.captcha_token
            pikpak.report(sign_encrypt_info['request_id'], sign_encrypt_info['sign'], captcha_result['pid'],
                          captcha_result['traceid'])
            # Ask the server to send the e-mail verification code
            verification_result = pikpak.verification()
            if (not verification_result or not isinstance(verification_result, dict)
                    or "verification_id" not in verification_result):
                print("请求验证码失败")
                input("按任意键退出程序")
                return
        except Exception as e:
            print(f"验证过程出错: {e}")
            traceback.print_exc()
            input("按任意键退出程序")
            return

        # 6. Submit the verification code
        verification_code = input("请输入接收到的验证码:")
        pikpak.verify_post(verification_code)

        # 7. Refresh the timestamp/sign for the signup action
        pikpak.init("POST:/v1/auth/signup")

        # 8. Sign up
        name = email.split("@")[0]
        # NOTE(review): every account gets this same hard-coded password.
        password = "zhiyuan233"
        pikpak.signup(name, password, verification_code)

        # 9. Redeem the invite code
        pikpak.activation_code()

        account_info = {
            "version": pikpak.version,
            "device_id": pikpak.device_id,
            "email": pikpak.email,
            "captcha_token": pikpak.captcha_token,
            "access_token": pikpak.access_token,
            "refresh_token": pikpak.refresh_token,
            "user_id": pikpak.user_id,
            "timestamp": pikpak.timestamp,
            "password": password,
            "name": name,
        }
        print("请保存好账号信息备用:", json.dumps(account_info, indent=4, ensure_ascii=False))

        if _is_yes(input("是否保存账号信息到文件(y/n):")):
            try:
                # save_account_info creates ./account itself when needed.
                save_account_info(name, account_info)
                print(f"账号信息已保存到 ./account/{name}.json")
            except Exception as e:
                print(f"保存账号信息失败: {e}")
        input("运行完成,回车结束程序:")
    except KeyboardInterrupt:
        print("\n程序被用户中断")
    except Exception as e:
        print(f"程序运行出错: {e}")
        traceback.print_exc()
        input("按任意键退出程序")
if __name__ == "__main__":
    # Show the author's usage notice, then start the interactive flow.
    notice = "开发者声明:免费转载需标注出处:B站-纸鸢花的花语,此工具仅供交流学习和技术分析,严禁用于任何商业牟利行为。(包括但不限于倒卖、二改倒卖、引流、冒充作者、广告植入...)"
    print(notice)
    main()