Spaces:
Paused
Paused
| # coding:utf-8 | |
| """ | |
| PikPak登录接口自动过滑块验证码-AAA | |
| """ | |
| from quart import Quart, request, jsonify | |
| from quart_cors import route_cors | |
| import asyncio | |
| from hypercorn.config import Config | |
| from hypercorn.asyncio import serve | |
| from multiprocessing import Pool | |
| from functools import partial | |
| from tqdm import trange | |
| import numpy as np | |
| import requests | |
| import hashlib | |
| import random | |
| import json | |
| import uuid | |
| import time | |
| import cv2 | |
| from hashlib import md5 | |
| # 滑块数据加密 | |
| def r(e, t): | |
| n = t - 1 | |
| if n < 0: | |
| n = 0 | |
| r = e[n] | |
| u = r["row"] // 2 + 1 | |
| c = r["column"] // 2 + 1 | |
| f = r["matrix"][u][c] | |
| l = t + 1 | |
| if l >= len(e): | |
| l = t | |
| d = e[l] | |
| p = l % d["row"] | |
| h = l % d["column"] | |
| g = d["matrix"][p][h] | |
| y = e[t] | |
| m = 3 % y["row"] | |
| v = 7 % y["column"] | |
| w = y["matrix"][m][v] | |
| b = i(f) + o(w) | |
| x = i(w) - o(f) | |
| return [s(a(i(f), o(f))), s(a(i(g), o(g))), s(a(i(w), o(w))), s(a(b, x))] | |
| def i(e): | |
| return int(e.split(",")[0]) | |
| def o(e): | |
| return int(e.split(",")[1]) | |
| def a(e, t): | |
| return str(e) + "^^" + str(t) | |
| def s(e): | |
| t = 0 | |
| n = len(e) | |
| for r in range(n): | |
| t = u(31 * t + ord(e[r])) | |
| return t | |
| def u(e): | |
| t = -2147483648 | |
| n = 2147483647 | |
| if e > n: | |
| return t + (e - n) % (n - t + 1) - 1 | |
| if e < t: | |
| return n - (t - e) % (n - t + 1) + 1 | |
| return e | |
| def c(e, t): | |
| return s(e + "" + str(t)) | |
| def img_jj(e, t, n): | |
| return {"ca": r(e, t), "f": c(n, t)} | |
| # md5加密字符串 | |
| def get_hash(str): | |
| return hashlib.md5(str.encode()).hexdigest() | |
| Name = ["VIVO-", 'OPPO-', 'HUAWEI-', 'Honor-', 'XIAOMI', 'HONGMI-', 'APPLE-'] | |
| Type = ["S ", " pro ", "A "] | |
| def B(): | |
| timestamp = str(int(time.time()) * 1000) | |
| device_id = str(uuid.uuid4()).replace("-", "") | |
| phoneModel = "" | |
| for i in range(3): | |
| r = chr(random.randint(65, 90)) | |
| phoneModel += r | |
| phoneModel = phoneModel + "-" | |
| for i in range(5): | |
| if random.randint(0, 4) != 2: | |
| r = chr(random.randint(65, 90)) | |
| else: | |
| r = chr(random.randint(48, 57)) | |
| phoneModel += r | |
| phoneBuilder = Name[random.randint(0, 6)] + str(random.randint(3, 9)) + Type[random.randint(0, 2)] + str( | |
| random.randint(5, 6)) + "G" | |
| return timestamp, device_id, phoneModel, phoneBuilder | |
| async def get_change_ip(): | |
| m = random.randint(0, 255) | |
| n = random.randint(0, 255) | |
| x = random.randint(0, 255) | |
| y = random.randint(0, 255) | |
| randomIP = str(m) + '.' + str(n) + '.' + str(x) + '.' + str(y) | |
| return randomIP | |
| def get_UA(id, ua_key, timestamp, phoneModel, phoneBuilder, version): | |
| UA = "ANDROID-com.pikcloud.pikpak/" + version + " accessmode/ devicename/" + phoneBuilder + " appname/ android-com.pikcloud.pikpak appid/ action_type/ clientid/ YNxT9w7GMdWvEOKa deviceid/" + id + "refresh_token/ grant_type/ devicemodel/ " + phoneModel + " networktype/WIFI accesstype/ sessionid/ osversion/" + "13." + str( | |
| random.randint(0, 9)) + "." + str(random.randint(0, | |
| 9)) + " datetime/" + timestamp + " protocolversion/200 sdkversion/2.0.1.200200 clientversion/" + version + " providername/NONE clientip/ session_origin/ devicesign/div101." + ua_key + "platformversion/10 usrno/" | |
| return UA | |
| def get_ua_key(id): | |
| k = "com.pikcloud.pikpak1appkey" | |
| rank_1 = hashlib.sha1((id + k).encode("utf-8")).hexdigest() | |
| rank_2 = get_hash(rank_1) | |
| return id + rank_2 | |
| # 设置请求头基本信息 | |
| basicRequestHeaders_1 = { | |
| "Accept-Language": "zh", | |
| "Content-Type": "application/json; charset=utf-8", | |
| "Host": "user.mypikpak.com", | |
| "Connection": "Keep-Alive", | |
| "Accept-Encoding": "gzip", | |
| "content-type": "application/json" | |
| } | |
| async def get_image_d(pid, device_id, f, change_ip): | |
| url = "https://pik-d.bilivo.top/" | |
| payload = { | |
| "pid": pid, | |
| "device_id": device_id, | |
| "f": f | |
| } | |
| headers = { | |
| 'content-type': 'application/json', | |
| 'X-Forwarded-For': str(change_ip) | |
| } | |
| response = requests.request("POST", url, json=payload, headers=headers) | |
| response_data = response.json() | |
| return response_data['d'] | |
| # 安全验证-登录 | |
| async def init(client_id, device_id, User_Agent, mail, change_ip): | |
| url = f"https://user.mypikpak.com/v1/shield/captcha/init?client_id={client_id}" | |
| payload = { | |
| "action": "POST:/v1/auth/signin", | |
| "captcha_token": '', | |
| "client_id": client_id, | |
| "device_id": device_id, | |
| "meta": { | |
| "email": mail | |
| }, | |
| } | |
| headers = { | |
| "X-Device-Id": device_id, | |
| "User-Agent": User_Agent, | |
| 'X-Forwarded-For': str(change_ip) | |
| } | |
| headers.update(basicRequestHeaders_1) | |
| response = requests.request("POST", url, json=payload, headers=headers) | |
| if response.status_code != 200: | |
| error_response = response.json() | |
| message = None | |
| if "details" in error_response and len(error_response["details"]) > 0: | |
| last_detail = error_response["details"][-1] | |
| message = last_detail.get("message", "操作频繁...") | |
| raise Exception(f"{message}") | |
| return json.loads(response.text) | |
| async def get_image(xid, change_ip): | |
| url = "http://user.mypikpak.com/pzzl/gen" | |
| header = {'X-Forwarded-For': str(change_ip)} | |
| params = { | |
| "deviceid": xid, | |
| "traceid": "" | |
| } | |
| response = requests.get(url, params=params) | |
| imgs_json = response.json() | |
| frames = imgs_json["frames"] | |
| pid = imgs_json['pid'] | |
| traceid = imgs_json['traceid'] | |
| result = {"frames": frames} | |
| SELECT_ID = pass_verify(xid, pid, traceid, result,) | |
| print('滑块ID:' + str(SELECT_ID)) | |
| json_data = img_jj(frames, int(SELECT_ID), pid) | |
| f = json_data['f'] | |
| npac = json_data['ca'] | |
| d = await get_image_d(pid, xid, f, change_ip) | |
| params = { | |
| 'pid': pid, | |
| 'deviceid': xid, | |
| 'traceid': traceid, | |
| 'f': f, | |
| 'n': npac[0], | |
| 'p': npac[1], | |
| 'a': npac[2], | |
| 'd': d, | |
| 'c': npac[3] | |
| } | |
| response1 = requests.get(f"http://user.mypikpak.com/pzzl/verify", params=params, headers=header) | |
| response_data = response1.json() | |
| result = {'pid': pid, 'traceid': traceid, 'response_data': response_data} | |
| return result | |
| async def sign_in(username, password, client_id, captcha_token, change_ip): | |
| url = "https://user.pikpak.me/v1/auth/signin" | |
| headers = { | |
| "X-Captcha-Token": captcha_token, | |
| 'X-Forwarded-For': str(change_ip) | |
| } | |
| payload = { | |
| "username": username, | |
| "password": password, | |
| "client_id": client_id | |
| } | |
| response = requests.post(url, headers=headers, json=payload) | |
| if response.status_code == 200: | |
| return response.json() | |
| else: | |
| return None | |
| # ---------------滑块验证--------------- | |
| # 定义一个函数,用于图像拼接 | |
| def image_assemble(sum_rows, sum_cols, channels, part_num): | |
| # 初始化一个全零的3维矩阵,用于存储拼接后的图像 | |
| final_matrix = np.zeros((sum_rows, sum_cols, channels), np.uint8) | |
| # 第一排 | |
| final_matrix[0:75, 0:150] = part_num[0] | |
| final_matrix[0:75, 150:300] = part_num[1] | |
| final_matrix[0:75, 300:450] = part_num[2] | |
| final_matrix[0:75, 450:600] = part_num[3] | |
| # 第二排 | |
| final_matrix[75:150, 0:150] = part_num[4] | |
| final_matrix[75:150, 150:300] = part_num[5] | |
| final_matrix[75:150, 300:450] = part_num[6] | |
| final_matrix[75:150, 450:600] = part_num[7] | |
| # 第三排 | |
| final_matrix[150:225, 0:150] = part_num[8] | |
| final_matrix[150:225, 150:300] = part_num[9] | |
| final_matrix[150:225, 300:450] = part_num[10] | |
| final_matrix[150:225, 450:600] = part_num[11] | |
| # 第四排 | |
| final_matrix[225:300, 0:150] = part_num[12] | |
| final_matrix[225:300, 150:300] = part_num[13] | |
| final_matrix[225:300, 300:450] = part_num[14] | |
| final_matrix[225:300, 450:600] = part_num[15] | |
| # 返回拼接后的图像 | |
| return final_matrix | |
| def get_img(deviceid, pid, traceid): | |
| url = 'http://user.mypikpak.com/pzzl/image?deviceid=' + deviceid + '&pid=' + pid + '&traceid=' + traceid | |
| response = requests.get(url) | |
| return response.content | |
| def getSize(p): | |
| sum_rows = p.shape[0] | |
| sum_cols = p.shape[1] | |
| channels = p.shape[2] | |
| return sum_rows, sum_cols, channels | |
| def pass_verify(deviceid, pid, traceid, result): | |
| pool = Pool(10) | |
| try: | |
| # 获取验证码图片 | |
| img = get_img(deviceid, pid, traceid) | |
| # 获取图片大小 | |
| start_time = time.time() | |
| img_re = cv2.imdecode(np.frombuffer(img, np.uint8), cv2.IMREAD_COLOR) | |
| finded = False | |
| sum_rows, sum_cols, channels = getSize(img_re) | |
| except Exception as e: | |
| print(e) | |
| else: | |
| # 分配进程 | |
| data_list = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11] | |
| start_pass_verify_re = partial(start_pass_verify, sum_rows) | |
| start_pass_verify_re = partial(start_pass_verify_re, sum_cols) | |
| start_pass_verify_re = partial(start_pass_verify_re, channels) | |
| start_pass_verify_re = partial(start_pass_verify_re, img_re) | |
| start_pass_verify_re = partial(start_pass_verify_re, result) | |
| len_num = [] | |
| data_i = [] | |
| results = [] | |
| total = trange(len(data_list)) | |
| for data in data_list: | |
| # 等待任务执行完成 | |
| result = pool.apply_async(start_pass_verify_re, args=(data,), callback=lambda _: total.update(1),) | |
| data_i.append(data) | |
| results.append(result) | |
| pool.close() | |
| pool.join() | |
| for data in data_i: | |
| len_num.append(results[data].get()) | |
| print(len_num) | |
| for data, line in enumerate(len_num): | |
| if line is None: | |
| num = data | |
| else: | |
| if data == 0: | |
| num = 0 | |
| lines = line | |
| if lines <= line: | |
| finded = True | |
| elif lines > line: | |
| lines = line | |
| num = data | |
| data += 1 | |
| print(f'滑块识别时间{time.time() - start_time}') | |
| print('测试次数', data, '最终状态', finded) | |
| return num | |
| def start_pass_verify(sum_rows, sum_cols, channels, img, result, i): | |
| # 获取结果中的frames部分 | |
| part_1 = result["frames"][i]['matrix'][0] | |
| part_2 = result["frames"][i]['matrix'][1] | |
| part_3 = result["frames"][i]['matrix'][2] | |
| part_4 = result["frames"][i]['matrix'][3] | |
| # 将四个部分合并 | |
| part = [] | |
| part.extend(part_1) | |
| part.extend(part_2) | |
| part.extend(part_3) | |
| part.extend(part_4) | |
| # 将合并后的部分重新组装 | |
| start_time = time.time() | |
| part_num = re_image_assemble(part, img) | |
| # 获取重新组装后的图片 | |
| part_num = get_reimage(part_num) | |
| # 将重新组装后的图片按照sum_rows和sum_cols进行组装 | |
| f = image_assemble(sum_rows, sum_cols, channels, part_num) | |
| # 将组装后的图片转换为灰度图 | |
| gray = cv2.cvtColor(f, cv2.COLOR_BGR2GRAY) | |
| # 对灰度图进行边缘检测 | |
| edges = cv2.Canny(gray, 100, 200, apertureSize=3) | |
| # 对边缘检测后的图片进行霍夫变换 | |
| # lines = cv2.HoughLinesP(edges, 0.001, np.pi / 180, 1, minLineLength=0, maxLineGap=0) | |
| lsd = cv2.createLineSegmentDetector(0, scale=3.0, sigma_scale=0.6, quant=2.0, ang_th=90,) | |
| # 执行检测结果 | |
| lines = lsd.detect(edges)[0] | |
| # 返回线段数量 | |
| return len(lines) | |
| def re_image_assemble(part, img): | |
| # 获取图片的每一部分 | |
| part1 = img[0:75, 0:150] | |
| part2 = img[0:75, 150:300] | |
| part3 = img[0:75, 300:450] | |
| part4 = img[0:75, 450:600] | |
| # 第二排 | |
| part5 = img[75:150, 0:150] | |
| part6 = img[75:150, 150:300] | |
| part7 = img[75:150, 300:450] | |
| part8 = img[75:150, 450:600] | |
| # 第三排 | |
| part9 = img[150:225, 0:150] | |
| part10 = img[150:225, 150:300] | |
| part11 = img[150:225, 300:450] | |
| part12 = img[150:225, 450:600] | |
| # 第四排 | |
| part13 = img[225:300, 0:150] | |
| part14 = img[225:300, 150:300] | |
| part15 = img[225:300, 300:450] | |
| part16 = img[225:300, 450:600] | |
| part_nu = [] | |
| for i, j in enumerate(part): | |
| if j == '0,0': | |
| part_nu.append(part1) | |
| if j == '0,1': | |
| part_nu.append(part5) | |
| if j == '0,2': | |
| part_nu.append(part9) | |
| if j == '0,3': | |
| part_nu.append(part13) | |
| if j == '1,0': | |
| part_nu.append(part2) | |
| if j == '1,1': | |
| part_nu.append(part6) | |
| if j == '1,2': | |
| part_nu.append(part10) | |
| if j == '1,3': | |
| part_nu.append(part14) | |
| if j == '2,0': | |
| part_nu.append(part3) | |
| if j == '2,1': | |
| part_nu.append(part7) | |
| if j == '2,2': | |
| part_nu.append(part11) | |
| if j == '2,3': | |
| part_nu.append(part15) | |
| if j == '3,0': | |
| part_nu.append(part4) | |
| if j == '3,1': | |
| part_nu.append(part8) | |
| if j == '3,2': | |
| part_nu.append(part12) | |
| if j == '3,3': | |
| part_nu.append(part16) | |
| return part_nu | |
| # 定义一个函数,用于获取处理图像 | |
| def get_reimage(images): | |
| # 定义一个空字典,用于存储裁剪后的图像 | |
| cropped_img = {} | |
| # 遍历images中的每一张图像 | |
| for i in range(16): | |
| # 对图像进行裁剪 | |
| re_num = corp_image(images[i]) | |
| # 将裁剪后的图像resize为150x75的大小 | |
| cropped_img[i] = cv2.resize(re_num, (150, 75)) | |
| # 返回裁剪后的图像字典 | |
| return cropped_img | |
| # 定义一个函数,用于处理图像 | |
| def corp_image(img): | |
| # 将图像转换为灰度图像 | |
| img2 = img.sum(axis=2) | |
| # 获取图像的行数和列数 | |
| (row, col) = img2.shape | |
| # 初始化行和列的上下界 | |
| row_top = 0 | |
| raw_down = 0 | |
| col_top = 0 | |
| col_down = 0 | |
| # 遍历行,找到行上下界 | |
| for r in range(0, row): | |
| if img2.sum(axis=1)[r] < 740 * col: | |
| row_top = r | |
| break | |
| for r in range(row - 1, 0, -1): | |
| if img2.sum(axis=1)[r] < 740 * col: | |
| raw_down = r | |
| break | |
| # 遍历列,找到列上下界 | |
| for c in range(0, col): | |
| if img2.sum(axis=0)[c] < 740 * row: | |
| col_top = c | |
| break | |
| for c in range(col - 1, 0, -1): | |
| if img2.sum(axis=0)[c] < 740 * row: | |
| col_down = c | |
| break | |
| # 裁剪图像,并返回新的图像 | |
| new_img = img[row_top:raw_down + 1, col_top:col_down + 1, 0:3] | |
| return new_img | |
| async def get_new_token(result, xid, captcha, change_ip): | |
| traceid = result['traceid'] | |
| pid = result['pid'] | |
| header = {'X-Forwarded-For': str(change_ip)} | |
| response2 = requests.get( | |
| f"http://user.mypikpak.com/credit/v1/report?deviceid={xid}&captcha_token={captcha}&type" | |
| f"=pzzlSlider&result=0&data={pid}&traceid={traceid}", headers=header) | |
| response_data = response2.json() | |
| print('获取验证TOKEN中......') | |
| return response_data | |
| # ---------------滑块验证--------------- | |
| app = Quart(__name__) | |
| async def home(): | |
| data = json.dumps({"msg": "用于PikPak登录获取 AccessToken接口-作者:AAA", | |
| "data": "https://github.com/LiJunYi2"}, ensure_ascii=False) | |
| return jsonify(data), 200 | |
| async def login(): | |
| data = await request.get_json() | |
| if not data: | |
| return jsonify({"error": "没有指定参数"}), 400 | |
| email = data.get('userName') | |
| password = data.get('passWord') | |
| timestamp = B()[0] | |
| phoneModel = B()[2] | |
| phoneBuilder = B()[3] | |
| try: | |
| change_ip = await get_change_ip() | |
| client_id1 = "YUMx5nI8ZU8Ap8pm" #web | |
| #client_id1 = "YNxT9w7GMdWvEOKa" #android | |
| #device_id = str(uuid.uuid4()).replace("-", "") | |
| device_id = md5(f"{email}{password}".encode()).hexdigest() | |
| ua_key = get_ua_key(device_id) | |
| user_agent = get_UA(device_id, ua_key, timestamp, phoneModel, phoneBuilder, "1.47.1") | |
| action_init = await init(client_id1, device_id, user_agent, email, change_ip) | |
| captcha_token = action_init["captcha_token"] | |
| if 'url' in action_init.keys(): | |
| while True: | |
| img_info = await get_image(device_id, change_ip) | |
| if img_info['response_data']['result'] == 'accept': | |
| print('验证通过!!!') | |
| break | |
| token_data = await get_new_token(img_info, device_id, captcha_token, change_ip) | |
| captcha_token = token_data['captcha_token'] | |
| login_data = await sign_in(email, password, client_id1, captcha_token, change_ip) | |
| if login_data is None: | |
| return jsonify({"code": 500, "data": '', "msg": 'Login Failure'}), 200 | |
| else: | |
| return jsonify({"code": 200, "data": login_data, "device_id": device_id, "msg": 'success'}), 200 | |
| except Exception as e: | |
| print(e) | |
| return jsonify({"code": 500, "data": {}, "msg": str(e)}), 200 | |
| config = Config() | |
| config.bind = ["0.0.0.0:8090"] | |
| if __name__ == '__main__': | |
| import asyncio | |
| asyncio.run(serve(app, config)) | |