zxlwq commited on
Commit
ea51004
·
verified ·
1 Parent(s): 30707cf

Upload 4 files

Browse files
pikpakapi/PikpakException.py ADDED
@@ -0,0 +1,7 @@
 
 
 
 
 
 
 
 
1
+ class PikpakException(Exception):
2
+ def __init__(self, message):
3
+ super().__init__(message)
4
+
5
+
6
+ class PikpakRetryException(PikpakException):
7
+ pass
pikpakapi/__init__.py ADDED
@@ -0,0 +1,1125 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import asyncio
2
+ import binascii
3
+ import inspect
4
+ import json
5
+ import logging
6
+ import re
7
+ from base64 import b64decode, b64encode
8
+ from hashlib import md5
9
+ from types import NoneType
10
+ from typing import Any, Dict, List, Optional, Callable, Coroutine
11
+ from urllib.parse import urljoin
12
+
13
+ import httpx
14
+
15
+ from .PikpakException import PikpakException, PikpakRetryException
16
+ from .enums import DownloadStatus
17
+ from .utils import (
18
+ CLIENT_ID,
19
+ CLIENT_SECRET,
20
+ CLIENT_VERSION,
21
+ PACKAG_ENAME,
22
+ build_custom_user_agent,
23
+ captcha_sign,
24
+ get_timestamp,
25
+ )
26
+
27
+
28
+ class PikPakApi:
29
+ """
30
+ PikPakApi class
31
+
32
+ Attributes:
33
+ PIKPAK_API_HOST: str - PikPak API host
34
+ PIKPAK_USER_HOST: str - PikPak user API host
35
+
36
+ username: str - username of the user
37
+ password: str - password of the user
38
+ encoded_token: str - encoded token of the user with access and refresh tokens
39
+ access_token: str - access token of the user , expire in 7200
40
+ refresh_token: str - refresh token of the user
41
+ user_id: str - user id of the user
42
+ token_refresh_callback: Callable[[PikPakApi, **Any], Coroutine[Any, Any, None]] - async callback function to be called after token refresh
43
+ token_refresh_callback_kwargs: Dict[str, Any] - custom arguments to be passed to the token refresh callback
44
+ """
45
+
46
+ PIKPAK_API_HOST = "api-pan.xunleix.com"
47
+ PIKPAK_USER_HOST = "xluser-ssl.xunleix.com"
48
+
49
+ def __init__(
50
+ self,
51
+ username: Optional[str] = None,
52
+ password: Optional[str] = None,
53
+ encoded_token: Optional[str] = None,
54
+ httpx_client_args: Optional[Dict[str, Any]] = None,
55
+ device_id: Optional[str] = None,
56
+ request_max_retries: int = 3,
57
+ request_initial_backoff: float = 3.0,
58
+ token_refresh_callback: Optional[Callable] = None,
59
+ token_refresh_callback_kwargs: Optional[Dict[str, Any]] = None,
60
+ ):
61
+ """
62
+ username: str - username of the user
63
+ password: str - password of the user
64
+ encoded_token: str - encoded token of the user with access and refresh token
65
+ httpx_client_args: dict - extra arguments for httpx.AsyncClient (https://www.python-httpx.org/api/#asyncclient)
66
+ device_id: str - device id to identify the device
67
+ request_max_retries: int - maximum number of retries for requests
68
+ request_initial_backoff: float - initial backoff time for retries
69
+ token_refresh_callback: Callable[[PikPakApi, **Any], Coroutine[Any, Any, None]] - async callback function to be called after token refresh
70
+ token_refresh_callback_kwargs: Dict[str, Any] - custom arguments to be passed to the token refresh callback
71
+ """
72
+
73
+ self.username = username
74
+ self.password = password
75
+ self.encoded_token = encoded_token
76
+ self.max_retries = request_max_retries
77
+ self.initial_backoff = request_initial_backoff
78
+ self.token_refresh_callback = token_refresh_callback
79
+ self.token_refresh_callback_kwargs = token_refresh_callback_kwargs or {}
80
+
81
+ self.access_token = None
82
+ self.refresh_token = None
83
+ self.user_id = None
84
+
85
+ self.data_response = None
86
+
87
+ # device_id is used to identify the device, if not provided, a random device_id will be generated, 32 characters
88
+ self.device_id = (
89
+ device_id
90
+ if device_id
91
+ else md5(f"{self.username}{self.password}".encode()).hexdigest()
92
+ )
93
+ self.captcha_token = None
94
+
95
+ httpx_client_args = httpx_client_args or {"timeout": 10}
96
+ self.httpx_client = httpx.AsyncClient(**httpx_client_args)
97
+
98
+ self._path_id_cache: Dict[str, Any] = {}
99
+
100
+ self.user_agent: Optional[str] = None
101
+
102
+ if self.encoded_token:
103
+ self.decode_token()
104
+ elif self.username and self.password:
105
+ pass
106
+ else:
107
+ raise PikpakException("username and password or encoded_token is required")
108
+
109
+ @classmethod
110
+ def from_dict(cls, data: Dict[str, Any]) -> "PikPakApi":
111
+ """
112
+ Create PikPakApi object from a dictionary
113
+ """
114
+ params = inspect.signature(cls).parameters
115
+ filtered_data = {key: data[key] for key in params if key in data}
116
+ client = cls(
117
+ **filtered_data,
118
+ )
119
+ client.__dict__.update(data)
120
+ return client
121
+
122
+ def to_dict(self) -> Dict[str, Any]:
123
+ """
124
+ Returns the PikPakApi object as a dictionary
125
+ """
126
+ data = self.__dict__.copy()
127
+ # remove can't be serialized attributes
128
+ keys_to_delete = [
129
+ k
130
+ for k, v in data.items()
131
+ if not type(v) in [str, int, float, bool, list, dict, NoneType]
132
+ ]
133
+ for k in keys_to_delete:
134
+ del data[k]
135
+ return data
136
+
137
+ def build_custom_user_agent(self) -> str:
138
+
139
+ self.user_agent = build_custom_user_agent(
140
+ device_id=self.device_id,
141
+ user_id=self.user_id if self.user_id else "",
142
+ )
143
+ return self.user_agent
144
+
145
+ def get_headers(self, access_token: Optional[str] = None) -> Dict[str, str]:
146
+ """
147
+ Returns the headers to use for the requests.
148
+ """
149
+ headers = {
150
+ "User-Agent": (
151
+ self.build_custom_user_agent()
152
+ if self.captcha_token
153
+ else "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36"
154
+ ),
155
+ "Content-Type": "application/json; charset=utf-8",
156
+ }
157
+
158
+ if self.access_token:
159
+ headers["Authorization"] = f"Bearer {self.access_token}"
160
+ if access_token:
161
+ headers["Authorization"] = f"Bearer {access_token}"
162
+ if self.captcha_token:
163
+ headers["X-Captcha-Token"] = self.captcha_token
164
+ if self.device_id:
165
+ headers["X-Device-Id"] = self.device_id
166
+ return headers
167
+
168
+ async def _make_request(
169
+ self,
170
+ method: str,
171
+ url: str,
172
+ data: Optional[Dict[str, Any]] = None,
173
+ params: Optional[Dict[str, Any]] = None,
174
+ headers: Optional[Dict[str, str]] = None,
175
+ ) -> Dict[str, Any]:
176
+ last_error = None
177
+
178
+ # url = "{proxy}{target}".format(proxy="https://pikpak.tjsky.top/", target=url)
179
+ # print(url)
180
+
181
+ for attempt in range(self.max_retries):
182
+ try:
183
+ response = await self._send_request(method, url, data, params, headers)
184
+ return await self._handle_response(response)
185
+ except PikpakRetryException as error:
186
+ logging.info(f"Retry attempt {attempt + 1}/{self.max_retries}")
187
+ last_error = error
188
+ except PikpakException:
189
+ raise
190
+ except httpx.HTTPError as error:
191
+ logging.error(
192
+ f"HTTP Error on attempt {attempt + 1}/{self.max_retries}: {str(error)}"
193
+ )
194
+ last_error = error
195
+ except Exception as error:
196
+ logging.error(
197
+ f"Unexpected error on attempt {attempt + 1}/{self.max_retries}: {str(error)}"
198
+ )
199
+ last_error = error
200
+
201
+ await asyncio.sleep(self.initial_backoff * (2**attempt))
202
+
203
+ # If we've exhausted all retries, raise an exception with the last error
204
+ raise PikpakException(f"Max retries reached. Last error: {str(last_error)}")
205
+
206
+ async def _send_request(self, method, url, data, params, headers):
207
+ req_headers = headers or self.get_headers()
208
+ return await self.httpx_client.request(
209
+ method,
210
+ url,
211
+ json=data,
212
+ params=params,
213
+ headers=req_headers,
214
+ )
215
+
216
+ async def _handle_response(self, response) -> Dict[str, Any]:
217
+ try:
218
+ json_data = response.json()
219
+ except ValueError:
220
+ if response.status_code == 200:
221
+ return {}
222
+ raise PikpakRetryException("Empty JSON data")
223
+
224
+ self.data_response = json_data
225
+
226
+ if not json_data:
227
+ if response.status_code == 200:
228
+ return {}
229
+ raise PikpakRetryException("Empty JSON data")
230
+
231
+ if "error" not in json_data:
232
+ return json_data
233
+
234
+ if "captcha_token" in json_data:
235
+ self.captcha_token = json_data["captcha_token"]
236
+
237
+ if json_data["error"] == "invalid_account_or_password":
238
+ raise PikpakException("Invalid username or password")
239
+
240
+ if json_data.get("error_code") == 16:
241
+ await self.refresh_access_token()
242
+ raise PikpakRetryException("Token refreshed, please retry")
243
+
244
+ if json_data.get("error_description") is not None:
245
+ return json_data.get("error_description")
246
+
247
+ raise PikpakException(json_data.get("error_description", "Unknown Error"))
248
+
249
+ async def _request_get(
250
+ self,
251
+ url: str,
252
+ params: dict = None,
253
+ ):
254
+ return await self._make_request("get", url, params=params)
255
+
256
+ async def _request_post(
257
+ self,
258
+ url: str,
259
+ data: dict = None,
260
+ headers: dict = None,
261
+ ):
262
+ return await self._make_request("post", url, data=data, headers=headers)
263
+
264
+ async def _request_patch(
265
+ self,
266
+ url: str,
267
+ data: dict = None,
268
+ ):
269
+ return await self._make_request("patch", url, data=data)
270
+
271
+ async def _request_delete(
272
+ self,
273
+ url: str,
274
+ params: dict = None,
275
+ data: dict = None,
276
+ ):
277
+ return await self._make_request("delete", url, params=params, data=data)
278
+
279
+ def decode_token(self):
280
+ """Decodes the encoded token to update access and refresh tokens."""
281
+ try:
282
+ decoded_data = json.loads(b64decode(self.encoded_token).decode())
283
+ except (binascii.Error, json.JSONDecodeError):
284
+ raise PikpakException("Invalid encoded token")
285
+ if not decoded_data.get("access_token") or not decoded_data.get(
286
+ "refresh_token"
287
+ ):
288
+ raise PikpakException("Invalid encoded token")
289
+ self.access_token = decoded_data.get("access_token")
290
+ self.refresh_token = decoded_data.get("refresh_token")
291
+
292
+ def encode_token(self):
293
+ """Encodes the access and refresh tokens into a single string."""
294
+ token_data = {
295
+ "access_token": self.access_token,
296
+ "refresh_token": self.refresh_token,
297
+ }
298
+ self.encoded_token = b64encode(json.dumps(token_data).encode()).decode()
299
+
300
+ async def captcha_init(self, action: str, meta: dict = None) -> Dict[str, Any]:
301
+ url = f"https://{PikPakApi.PIKPAK_USER_HOST}/v1/shield/captcha/init"
302
+ if not meta:
303
+ t = f"{get_timestamp()}"
304
+ meta = {
305
+ "captcha_sign": captcha_sign(self.device_id, t),
306
+ "client_version": CLIENT_VERSION,
307
+ "package_name": PACKAG_ENAME,
308
+ "user_id": self.user_id,
309
+ "timestamp": t,
310
+ }
311
+ params = {
312
+ "client_id": CLIENT_ID,
313
+ "action": action,
314
+ "device_id": self.device_id,
315
+ "meta": meta,
316
+ }
317
+ return await self._request_post(url, data=params)
318
+
319
+ async def login(self) -> None:
320
+ """
321
+ Login to PikPak
322
+ """
323
+ login_url = f"https://{PikPakApi.PIKPAK_USER_HOST}/v1/auth/signin"
324
+ metas = {}
325
+ if not self.username or not self.password:
326
+ raise PikpakException("username and password are required")
327
+ if re.match(r"\w+([-+.]\w+)*@\w+([-.]\w+)*\.\w+([-.]\w+)*", self.username):
328
+ metas["email"] = self.username
329
+ elif re.match(r"\d{11,18}", self.username):
330
+ metas["phone_number"] = self.username
331
+ else:
332
+ metas["username"] = self.username
333
+ result = await self.captcha_init(
334
+ action=f"POST:{login_url}",
335
+ meta=metas,
336
+ )
337
+ captcha_token = result.get("captcha_token", "")
338
+ if not captcha_token:
339
+ raise PikpakException("captcha_token get failed")
340
+
341
+ login_data = {
342
+ "client_id": CLIENT_ID,
343
+ "client_secret": CLIENT_SECRET,
344
+ "password": self.password,
345
+ "username": self.username,
346
+ "captcha_token": captcha_token,
347
+ }
348
+ user_info = await self._request_post(
349
+ login_url,
350
+ login_data,
351
+ {
352
+ "Content-Type": "application/x-www-form-urlencoded",
353
+ },
354
+ )
355
+ self.access_token = user_info["access_token"]
356
+ self.refresh_token = user_info["refresh_token"]
357
+ self.user_id = user_info["sub"]
358
+ self.encode_token()
359
+
360
+ async def refresh_access_token(self) -> None:
361
+ """
362
+ Refresh access token
363
+ """
364
+ refresh_url = f"https://{self.PIKPAK_USER_HOST}/v1/auth/token"
365
+ refresh_data = {
366
+ "client_id": CLIENT_ID,
367
+ "refresh_token": self.refresh_token,
368
+ "grant_type": "refresh_token",
369
+ }
370
+ user_info = await self._request_post(refresh_url, refresh_data)
371
+
372
+ self.access_token = user_info["access_token"]
373
+ self.refresh_token = user_info["refresh_token"]
374
+ self.user_id = user_info["sub"]
375
+ self.encode_token()
376
+ if self.token_refresh_callback:
377
+ await self.token_refresh_callback(
378
+ self, **self.token_refresh_callback_kwargs
379
+ )
380
+
381
+ def get_user_info(self) -> Dict[str, Optional[str]]:
382
+ """
383
+ Get user info
384
+ """
385
+ return {
386
+ "username": self.username,
387
+ "user_id": self.user_id,
388
+ "access_token": self.access_token,
389
+ "refresh_token": self.refresh_token,
390
+ "encoded_token": self.encoded_token,
391
+ }
392
+
393
+ async def create_folder(
394
+ self, name: str = "新建文件夹", parent_id: Optional[str] = None
395
+ ) -> Dict[str, Any]:
396
+ """
397
+ name: str - 文件夹名称
398
+ parent_id: str - 父文件夹id, 默认创建到根目录
399
+
400
+ 创建文件夹
401
+ """
402
+ url = f"https://{self.PIKPAK_API_HOST}/drive/v1/files"
403
+ data = {
404
+ "kind": "drive#folder",
405
+ "name": name,
406
+ "parent_id": parent_id,
407
+ }
408
+ captcha_result = await self.captcha_init(f"GET:/drive/v1/files")
409
+ self.captcha_token = captcha_result.get("captcha_token")
410
+ result = await self._request_post(url, data)
411
+ return result
412
+
413
+ async def delete_to_trash(self, ids: List[str]) -> Dict[str, Any]:
414
+ """
415
+ ids: List[str] - 文件夹、文件id列表
416
+
417
+ 将文件夹、文件移动到回收站
418
+ """
419
+ url = f"https://{self.PIKPAK_API_HOST}/drive/v1/files:batchTrash"
420
+ data = {
421
+ "ids": ids,
422
+ }
423
+ captcha_result = await self.captcha_init(f"GET:/drive/v1/files:batchTrash")
424
+ self.captcha_token = captcha_result.get("captcha_token")
425
+ result = await self._request_post(url, data)
426
+ return result
427
+
428
+ async def untrash(self, ids: List[str]) -> Dict[str, Any]:
429
+ """
430
+ ids: List[str] - 文件夹、文件id列表
431
+
432
+ 将文件夹、文件移出回收站
433
+ """
434
+ url = f"https://{self.PIKPAK_API_HOST}/drive/v1/files:batchUntrash"
435
+ data = {
436
+ "ids": ids,
437
+ }
438
+ captcha_result = await self.captcha_init(f"GET:/drive/v1/files:batchUntrash")
439
+ self.captcha_token = captcha_result.get("captcha_token")
440
+ result = await self._request_post(url, data)
441
+ return result
442
+
443
+ async def emptytrash(self) -> Dict[str, Any]:
444
+ """
445
+ 清空回收站
446
+ """
447
+ url = f"https://{self.PIKPAK_API_HOST}/drive/v1/files/trash:empty"
448
+ data = {}
449
+ captcha_result = await self.captcha_init(f"PATCH:/drive/v1/files/trash:empty")
450
+ self.captcha_token = captcha_result.get("captcha_token")
451
+ result = await self._request_patch(url, data)
452
+ return result
453
+
454
+ async def delete_forever(self, ids: List[str]) -> Dict[str, Any]:
455
+ """
456
+ ids: List[str] - 文件夹、文件id列表
457
+
458
+ 永远删除文件夹、文件, 慎用
459
+ """
460
+ url = f"https://{self.PIKPAK_API_HOST}/drive/v1/files:batchDelete"
461
+ data = {
462
+ "ids": ids,
463
+ }
464
+ captcha_result = await self.captcha_init(f"GET:/drive/v1/files:batchDelete")
465
+ self.captcha_token = captcha_result.get("captcha_token")
466
+ result = await self._request_post(url, data)
467
+ return result
468
+
469
+ async def offline_download(
470
+ self, file_url: str, parent_id: Optional[str] = None, name: Optional[str] = None
471
+ ) -> Dict[str, Any]:
472
+ """
473
+ file_url: str - 文件链接
474
+ parent_id: str - 父文件夹id, 不传默认存储到 My Pack
475
+ name: str - 文件名, 不传默认为文件链接的文件名
476
+
477
+ 离线下载磁力链
478
+ """
479
+ download_url = f"https://{self.PIKPAK_API_HOST}/drive/v1/files"
480
+ download_data = {
481
+ "kind": "drive#file",
482
+ "name": name,
483
+ "upload_type": "UPLOAD_TYPE_URL",
484
+ "url": {"url": file_url, "parent_id": parent_id},
485
+ "parent_id": parent_id,
486
+ }
487
+
488
+ captcha_result = await self.captcha_init(f"GET:/drive/v1/files")
489
+ self.captcha_token = captcha_result.get("captcha_token")
490
+ result = await self._request_post(download_url, download_data)
491
+ return result
492
+
493
+ async def offline_list(
494
+ self,
495
+ size: int = 10000,
496
+ next_page_token: Optional[str] = None,
497
+ phase: Optional[List[str]] = None,
498
+ ) -> Dict[str, Any]:
499
+ """
500
+ size: int - 每次请求的数量
501
+ next_page_token: str - 下一页的page token
502
+ phase: List[str] - Offline download task status, default is ["PHASE_TYPE_RUNNING", "PHASE_TYPE_ERROR"]
503
+ supported values: PHASE_TYPE_RUNNING, PHASE_TYPE_ERROR, PHASE_TYPE_COMPLETE, PHASE_TYPE_PENDING
504
+
505
+ 获取离线下载列表
506
+ """
507
+ if phase is None:
508
+ phase = ["PHASE_TYPE_RUNNING", "PHASE_TYPE_ERROR"]
509
+ list_url = f"https://{self.PIKPAK_API_HOST}/drive/v1/tasks"
510
+ list_data = {
511
+ "type": "offline",
512
+ "thumbnail_size": "SIZE_SMALL",
513
+ "limit": size,
514
+ "page_token": next_page_token,
515
+ "filters": json.dumps({"phase": {"in": ",".join(phase)}}),
516
+ "with": "reference_resource",
517
+ }
518
+ captcha_result = await self.captcha_init("GET:/drive/v1/tasks")
519
+ self.captcha_token = captcha_result.get("captcha_token")
520
+ result = await self._request_get(list_url, list_data)
521
+ return result
522
+
523
+ async def offline_file_info(self, file_id: str) -> Dict[str, Any]:
524
+ """
525
+ file_id: str - 离线下载文件id
526
+
527
+ 离线下载文件信息
528
+ """
529
+ captcha_result = await self.captcha_init(f"GET:/drive/v1/files/{file_id}")
530
+ self.captcha_token = captcha_result.get("captcha_token")
531
+ url = f"https://{self.PIKPAK_API_HOST}/drive/v1/files/{file_id}"
532
+ result = await self._request_get(url, {"thumbnail_size": "SIZE_LARGE"})
533
+ return result
534
+
535
+ async def file_list(
536
+ self,
537
+ size: int = 100,
538
+ parent_id: Optional[str] = None,
539
+ next_page_token: Optional[str] = None,
540
+ additional_filters: Optional[Dict[str, Any]] = None,
541
+ ) -> Dict[str, Any]:
542
+ """
543
+ size: int - 每次请求的数量
544
+ parent_id: str - 父文件夹id, 默认列出根目录
545
+ next_page_token: str - 下一页的page token
546
+ additional_filters: Dict[str, Any] - 额外的过滤条件
547
+
548
+ 获取文件列表,可以获得文件下载链接
549
+ """
550
+ default_filters = {
551
+ "trashed": {"eq": False},
552
+ "phase": {"eq": "PHASE_TYPE_COMPLETE"},
553
+ }
554
+ if additional_filters:
555
+ default_filters.update(additional_filters)
556
+ list_url = f"https://{self.PIKPAK_API_HOST}/drive/v1/files"
557
+ list_data = {
558
+ "parent_id": parent_id,
559
+ "thumbnail_size": "SIZE_MEDIUM",
560
+ "limit": size,
561
+ "with_audit": "true",
562
+ "page_token": next_page_token,
563
+ "filters": json.dumps(default_filters),
564
+ }
565
+
566
+ captcha_result = await self.captcha_init("GET:/drive/v1/files")
567
+ self.captcha_token = captcha_result.get("captcha_token")
568
+ result = await self._request_get(list_url, list_data)
569
+ return result
570
+
571
+ async def events(
572
+ self, size: int = 100, next_page_token: Optional[str] = None
573
+ ) -> Dict[str, Any]:
574
+ """
575
+ size: int - 每次请求的数量
576
+ next_page_token: str - 下一页的page token
577
+
578
+ 获取最近添加事件列表
579
+ """
580
+ list_url = f"https://{self.PIKPAK_API_HOST}/drive/v1/events"
581
+ list_data = {
582
+ "thumbnail_size": "SIZE_MEDIUM",
583
+ "limit": size,
584
+ "next_page_token": next_page_token,
585
+ }
586
+ captcha_result = await self.captcha_init("GET:/drive/v1/files")
587
+ self.captcha_token = captcha_result.get("captcha_token")
588
+ result = await self._request_get(list_url, list_data)
589
+ return result
590
+
591
+ async def offline_task_retry(self, task_id: str) -> Dict[str, Any]:
592
+ """
593
+ task_id: str - 离线下载任务id
594
+
595
+ 重试离线下载任务
596
+ """
597
+ list_url = f"https://{self.PIKPAK_API_HOST}/drive/v1/task"
598
+ list_data = {
599
+ "type": "offline",
600
+ "create_type": "RETRY",
601
+ "id": task_id,
602
+ }
603
+ try:
604
+ captcha_result = await self.captcha_init("GET:/drive/v1/task")
605
+ self.captcha_token = captcha_result.get("captcha_token")
606
+ result = await self._request_post(list_url, list_data)
607
+ return result
608
+ except Exception as e:
609
+ raise PikpakException(f"重试离线下载任务失败: {task_id}. {e}")
610
+
611
+ async def delete_tasks(
612
+ self, task_ids: List[str], delete_files: bool = False
613
+ ) -> None:
614
+ """
615
+ delete tasks by task ids
616
+ task_ids: List[str] - task ids to delete
617
+ """
618
+ delete_url = f"https://{self.PIKPAK_API_HOST}/drive/v1/tasks"
619
+ params = {
620
+ "task_ids": task_ids,
621
+ "delete_files": delete_files,
622
+ }
623
+ try:
624
+ captcha_result = await self.captcha_init("GET:/drive/v1/tasks")
625
+ self.captcha_token = captcha_result.get("captcha_token")
626
+ await self._request_delete(delete_url, params=params)
627
+ except Exception as e:
628
+ raise PikpakException(f"Failing to delete tasks: {task_ids}. {e}")
629
+
630
+ async def get_task_status(self, task_id: str, file_id: str) -> DownloadStatus:
631
+ """
632
+ task_id: str - 离线下载任务id
633
+ file_id: str - 离线下载文件id
634
+
635
+ 获取离线下载任务状态, 临时实现, 后期可能变更
636
+ """
637
+ try:
638
+ infos = await self.offline_list()
639
+ if infos and infos.get("tasks", []):
640
+ for task in infos.get("tasks", []):
641
+ if task_id == task.get("id"):
642
+ return DownloadStatus.downloading
643
+ file_info = await self.offline_file_info(file_id=file_id)
644
+ if file_info:
645
+ return DownloadStatus.done
646
+ else:
647
+ return DownloadStatus.not_found
648
+ except PikpakException:
649
+ return DownloadStatus.error
650
+
651
+ async def path_to_id(self, path: str, create: bool = False) -> List[Dict[str, str]]:
652
+ """
653
+ path: str - 路径
654
+ create: bool - 是否创建不存在的文件夹
655
+
656
+ 将形如 /path/a/b 的路径转换为 文件夹的id
657
+ """
658
+ if not path or len(path) <= 0:
659
+ return []
660
+ paths = path.split("/")
661
+ paths = [p.strip() for p in paths if len(p) > 0]
662
+ # 构造不同级别的path表达式,尝试找到距离目标最近的那一层
663
+ multi_level_paths = ["/" + "/".join(paths[: i + 1]) for i in range(len(paths))]
664
+ path_ids = [
665
+ self._path_id_cache[p]
666
+ for p in multi_level_paths
667
+ if p in self._path_id_cache
668
+ ]
669
+ # 判断缓存命中情况
670
+ hit_cnt = len(path_ids)
671
+ if hit_cnt == len(paths):
672
+ return path_ids
673
+ elif hit_cnt == 0:
674
+ count = 0
675
+ parent_id = None
676
+ else:
677
+ count = hit_cnt
678
+ parent_id = path_ids[-1]["id"]
679
+
680
+ next_page_token = None
681
+ while count < len(paths):
682
+ data = await self.file_list(
683
+ parent_id=parent_id, next_page_token=next_page_token
684
+ )
685
+ record_of_target_path = None
686
+ for f in data.get("files", []):
687
+ current_path = "/" + "/".join(paths[:count] + [f.get("name")])
688
+ file_type = (
689
+ "folder" if f.get("kind", "").find("folder") != -1 else "file"
690
+ )
691
+ record = {
692
+ "id": f.get("id"),
693
+ "name": f.get("name"),
694
+ "file_type": file_type,
695
+ }
696
+ self._path_id_cache[current_path] = record
697
+ if f.get("name") == paths[count]:
698
+ record_of_target_path = record
699
+ # 不break: 剩下的文件也同样缓存起来
700
+ if record_of_target_path is not None:
701
+ path_ids.append(record_of_target_path)
702
+ count += 1
703
+ parent_id = record_of_target_path["id"]
704
+ elif data.get("next_page_token") and (
705
+ not next_page_token or next_page_token != data.get("next_page_token")
706
+ ):
707
+ next_page_token = data.get("next_page_token")
708
+ elif create:
709
+ data = await self.create_folder(name=paths[count], parent_id=parent_id)
710
+ file_id = data.get("file").get("id")
711
+ record = {
712
+ "id": file_id,
713
+ "name": paths[count],
714
+ "file_type": "folder",
715
+ }
716
+ path_ids.append(record)
717
+ current_path = "/" + "/".join(paths[: count + 1])
718
+ self._path_id_cache[current_path] = record
719
+ count += 1
720
+ parent_id = file_id
721
+ else:
722
+ break
723
+ return path_ids
724
+
725
+ async def file_batch_move(
726
+ self,
727
+ ids: List[str],
728
+ to_parent_id: Optional[str] = None,
729
+ ) -> Dict[str, Any]:
730
+ """
731
+ ids: List[str] - 文件id列表
732
+ to_parent_id: str - 移动到的文件夹id, 默认为根目录
733
+
734
+ 批量移动文件
735
+ """
736
+ to = (
737
+ {
738
+ "parent_id": to_parent_id,
739
+ }
740
+ if to_parent_id
741
+ else {}
742
+ )
743
+ captcha_result = await self.captcha_init("GET:/drive/v1/files:batchMove")
744
+ self.captcha_token = captcha_result.get("captcha_token")
745
+ result = await self._request_post(
746
+ url=f"https://{self.PIKPAK_API_HOST}/drive/v1/files:batchMove",
747
+ data={
748
+ "ids": ids,
749
+ "to": to,
750
+ },
751
+ )
752
+ return result
753
+
754
+ async def file_batch_copy(
755
+ self,
756
+ ids: List[str],
757
+ to_parent_id: Optional[str] = None,
758
+ ) -> Dict[str, Any]:
759
+ """
760
+ ids: List[str] - 文件id列表
761
+ to_parent_id: str - 复制到的文件夹id, 默认为根目录
762
+
763
+ 批量复制文件
764
+ """
765
+ to = (
766
+ {
767
+ "parent_id": to_parent_id,
768
+ }
769
+ if to_parent_id
770
+ else {}
771
+ )
772
+ captcha_result = await self.captcha_init("GET:/drive/v1/files:batchCopy")
773
+ self.captcha_token = captcha_result.get("captcha_token")
774
+ result = await self._request_post(
775
+ url=f"https://{self.PIKPAK_API_HOST}/drive/v1/files:batchCopy",
776
+ data={
777
+ "ids": ids,
778
+ "to": to,
779
+ },
780
+ )
781
+ return result
782
+
783
+ async def file_move_or_copy_by_path(
784
+ self,
785
+ from_path: List[str],
786
+ to_path: str,
787
+ move: bool = False,
788
+ create: bool = False,
789
+ ) -> Dict[str, Any]:
790
+ """
791
+ from_path: List[str] - 要移动或复制的文件路径列表
792
+ to_path: str - 移动或复制到的路径
793
+ is_move: bool - 是否移动, 默认为复制
794
+ create: bool - 是否创建不存在的文件夹
795
+
796
+ 根据路径移动或复制文件
797
+ """
798
+ from_ids: List[str] = []
799
+ for path in from_path:
800
+ if path_ids := await self.path_to_id(path):
801
+ if file_id := path_ids[-1].get("id"):
802
+ from_ids.append(file_id)
803
+ if not from_ids:
804
+ raise PikpakException("要移动的文件不存在")
805
+ to_path_ids = await self.path_to_id(to_path, create=create)
806
+ if to_path_ids:
807
+ to_parent_id = to_path_ids[-1].get("id")
808
+ else:
809
+ to_parent_id = None
810
+ if move:
811
+ result = await self.file_batch_move(ids=from_ids, to_parent_id=to_parent_id)
812
+ else:
813
+ result = await self.file_batch_copy(ids=from_ids, to_parent_id=to_parent_id)
814
+ return result
815
+
816
+ async def get_download_url(self, file_id: str) -> Dict[str, Any]:
817
+ """
818
+ id: str - 文件id
819
+
820
+ Returns the file details data.
821
+ 1. Use `medias[0][link][url]` for streaming with high speed in streaming services or tools.
822
+ 2. Use `web_content_link` to download the file
823
+ """
824
+ result = await self.captcha_init(
825
+ action=f"GET:/drive/v1/files/{file_id}",
826
+ )
827
+ self.captcha_token = result.get("captcha_token")
828
+ result = await self._request_get(
829
+ url=f"https://{self.PIKPAK_API_HOST}/drive/v1/files/{file_id}?",
830
+ )
831
+ self.captcha_token = None
832
+ return result
833
+
834
+ async def file_rename(self, id: str, new_file_name: str) -> Dict[str, Any]:
835
+ """
836
+ id: str - 文件id
837
+ new_file_name: str - 新的文件名
838
+
839
+ 重命名文件
840
+ 返回文件的详细信息
841
+ """
842
+ data = {
843
+ "name": new_file_name,
844
+ }
845
+ captcha_result = await self.captcha_init(f"GET:/drive/v1/files/{id}")
846
+ self.captcha_token = captcha_result.get("captcha_token")
847
+ result = await self._request_patch(
848
+ url=f"https://{self.PIKPAK_API_HOST}/drive/v1/files/{id}",
849
+ data=data,
850
+ )
851
+ return result
852
+
853
+ async def file_batch_star(
854
+ self,
855
+ ids: List[str],
856
+ ) -> Dict[str, Any]:
857
+ """
858
+ ids: List[str] - 文件id列表
859
+
860
+ 批量给文件加星标
861
+ """
862
+ data = {
863
+ "ids": ids,
864
+ }
865
+ captcha_result = await self.captcha_init(f"GET:/drive/v1/files/star")
866
+ self.captcha_token = captcha_result.get("captcha_token")
867
+ result = await self._request_post(
868
+ url=f"https://{self.PIKPAK_API_HOST}/drive/v1/files:star",
869
+ data=data,
870
+ )
871
+ return result
872
+
873
+ async def file_batch_unstar(
874
+ self,
875
+ ids: List[str],
876
+ ) -> Dict[str, Any]:
877
+ """
878
+ ids: List[str] - 文件id列表
879
+
880
+ 批量给文件取消星标
881
+ """
882
+ data = {
883
+ "ids": ids,
884
+ }
885
+ captcha_result = await self.captcha_init(f"GET:/drive/v1/files/unstar")
886
+ self.captcha_token = captcha_result.get("captcha_token")
887
+ result = await self._request_post(
888
+ url=f"https://{self.PIKPAK_API_HOST}/drive/v1/files:unstar",
889
+ data=data,
890
+ )
891
+ return result
892
+
893
+ async def file_star_list(
894
+ self,
895
+ size: int = 100,
896
+ next_page_token: Optional[str] = None,
897
+ ) -> Dict[str, Any]:
898
+ """
899
+ size: int - 每次请求的数量
900
+ next_page_token: str - 下一页的page token
901
+
902
+ 获取加星标的文件列表,可以获得文件下载链接
903
+ parent_id只可取默认值*,子目录列表通过获取星标目录以后自行使用file_list方法获取
904
+ """
905
+ additional_filters = {"system_tag": {"in": "STAR"}}
906
+ result = await self.file_list(
907
+ size=size,
908
+ parent_id="*",
909
+ next_page_token=next_page_token,
910
+ additional_filters=additional_filters,
911
+ )
912
+ return result
913
+
914
+ async def get_quota_info(self) -> Dict[str, Any]:
915
+ """
916
+ 获取当前空间的quota信息
917
+ 返回数据结构如下:
918
+ {
919
+ "kind": "drive#about",
920
+ "quota": {
921
+ "kind": "drive#quota",
922
+ "limit": "10995116277760", //空间总大小, 单位Byte
923
+ "usage": "5113157556024", // 已用空间大小,单位Byte
924
+ "usage_in_trash": "1281564700871", // 回收站占用大小,单位Byte
925
+ "play_times_limit": "-1",
926
+ "play_times_usage": "0"
927
+ },
928
+ "expires_at": "",
929
+ "quotas": {}
930
+ }
931
+ """
932
+ # captcha_result = await self.captcha_init(f"GET:/drive/v1/about")
933
+ # self.captcha_token = captcha_result.get("captcha_token")
934
+ result = await self._request_get(
935
+ url=f"https://{self.PIKPAK_API_HOST}/drive/v1/about",
936
+ )
937
+ return result
938
+
939
+ async def get_invite_code(self):
940
+ captcha_result = await self.captcha_init(f"GET:/vip/v1/activity/inviteCode")
941
+ self.captcha_token = captcha_result.get("captcha_token")
942
+ result = await self._request_get(
943
+ url=f"https://{self.PIKPAK_API_HOST}/vip/v1/activity/inviteCode",
944
+ )
945
+ return result["code"]
946
+
947
+ async def vip_info(self):
948
+ captcha_result = await self.captcha_init(f"GET:/drive/v1/privilege/vip")
949
+ self.captcha_token = captcha_result.get("captcha_token")
950
+ result = await self._request_get(
951
+ url=f"https://{self.PIKPAK_API_HOST}/drive/v1/privilege/vip",
952
+ )
953
+ return result
954
+
955
+ async def get_transfer_quota(self) -> Dict[str, Any]:
956
+ """
957
+ Get transfer quota
958
+ """
959
+ url = f"https://{self.PIKPAK_API_HOST}/vip/v1/quantity/list?type=transfer"
960
+ captcha_result = await self.captcha_init(
961
+ f"GET:/vip/v1/quantity/list?type=transfer"
962
+ )
963
+ self.captcha_token = captcha_result.get("captcha_token")
964
+ result = await self._request_get(url)
965
+ return result
966
+
967
+ async def file_batch_share(
968
+ self,
969
+ ids: List[str],
970
+ need_password: Optional[bool] = False,
971
+ expiration_days: Optional[int] = -1,
972
+ ) -> Dict[str, Any]:
973
+ """
974
+ ids: List[str] - 文件id列表
975
+ need_password: Optional[bool] - 是否需要分享密码
976
+ expiration_days: Optional[int] - 分享天数
977
+
978
+ 批量分享文件,并生成分享链接
979
+ 返回数据结构:
980
+ {
981
+ "share_id": "xxx", //分享ID
982
+ "share_url": "https://mypikpak.com/s/xxx", // 分享链接
983
+ "pass_code": "53fe", // 分享密码
984
+ "share_text": "https://mypikpak.com/s/xxx",
985
+ "share_list": []
986
+ }
987
+ """
988
+ data = {
989
+ "file_ids": ids,
990
+ "share_to": "copy",
991
+ "restore_limit": "-1",
992
+ "expiration_days": "-1",
993
+ "expiration_days": expiration_days,
994
+ "pass_code_option": "REQUIRED" if need_password else "NOT_REQUIRED",
995
+ }
996
+ captcha_result = await self.captcha_init(f"POST:/drive/v1/share")
997
+ self.captcha_token = captcha_result.get("captcha_token")
998
+ result = await self._request_post(
999
+ url=f"https://{self.PIKPAK_API_HOST}/drive/v1/share",
1000
+ data=data,
1001
+ )
1002
+ return result
1003
+
1004
+ async def share_batch_delete(
1005
+ self,
1006
+ ids: List[str],
1007
+ ) -> Dict[str, Any]:
1008
+ """
1009
+ ids: List[str] - 文件id列表
1010
+
1011
+ 批量分享文件,并生成分享链接
1012
+ 返回数据结构:
1013
+ {
1014
+ "share_id": "xxx", //分享ID
1015
+ "share_url": "https://mypikpak.com/s/xxx", // 分享链接
1016
+ "pass_code": "53fe", // 分享密码
1017
+ "share_text": "https://mypikpak.com/s/xxx",
1018
+ "share_list": []
1019
+ }
1020
+ """
1021
+ data = {
1022
+ "ids": ids,
1023
+ }
1024
+ captcha_result = await self.captcha_init(f"POST:/drive/v1/share:batchDelete")
1025
+ self.captcha_token = captcha_result.get("captcha_token")
1026
+ result = await self._request_post(
1027
+ url=f"https://{self.PIKPAK_API_HOST}/drive/v1/share:batchDelete",
1028
+ data=data,
1029
+ )
1030
+ return result
1031
+
1032
+ async def get_share_list(
1033
+ self, next_page_token: Optional[str] = None
1034
+ ) -> Dict[str, Any]:
1035
+ """
1036
+ 获取账号下所有分享信息
1037
+ """
1038
+ data = {
1039
+ "limit": "100",
1040
+ "thumbnail_size": "SIZE_SMALL",
1041
+ "page_token": next_page_token,
1042
+ }
1043
+ url = f"https://{self.PIKPAK_API_HOST}/drive/v1/share/list"
1044
+ captcha_result = await self.captcha_init(f"GET:/drive/v1/share/list")
1045
+ self.captcha_token = captcha_result.get("captcha_token")
1046
+ return await self._request_get(url, params=data)
1047
+
1048
+ async def get_share_folder(
1049
+ self, share_id: str, pass_code_token: str, parent_id: str = None
1050
+ ) -> Dict[str, Any]:
1051
+ """
1052
+ 获取分享链接下文件夹内容
1053
+
1054
+ Args:
1055
+ share_id: str - 分享ID eg. /s/VO8BcRb-XXXXX 的 VO8BcRb-XXXXX
1056
+ pass_code_token: str - 通过 get_share_info 获取到的 pass_code_token
1057
+ parent_id: str - 父文件夹id, 默认列出根目录
1058
+ """
1059
+ data = {
1060
+ "limit": "100",
1061
+ "thumbnail_size": "SIZE_LARGE",
1062
+ "order": "6",
1063
+ "share_id": share_id,
1064
+ "parent_id": parent_id,
1065
+ "pass_code_token": pass_code_token,
1066
+ }
1067
+ url = f"https://{self.PIKPAK_API_HOST}/drive/v1/share/detail"
1068
+ captcha_result = await self.captcha_init(f"GET:/drive/v1/share/detail")
1069
+ self.captcha_token = captcha_result.get("captcha_token")
1070
+ return await self._request_get(url, params=data)
1071
+
1072
+ async def get_share_info(
1073
+ self, share_link: str, pass_code: str = None
1074
+ ) -> ValueError | Dict[str, Any] | List[Dict[str | Any, str | Any]]:
1075
+ """
1076
+ 获取分享链接下内容
1077
+
1078
+ Args:
1079
+ share_link: str - 分享链接
1080
+ pass_code: str - 分享密码, 无密码则留空
1081
+ """
1082
+ match = re.search(r"/s/([^/]+)(?:.*/([^/]+))?$", share_link)
1083
+ if match:
1084
+ share_id = match.group(1)
1085
+ parent_id = match.group(2) if match.group(2) else None
1086
+ else:
1087
+ return ValueError("Share Link Is Not Right")
1088
+
1089
+ data = {
1090
+ "limit": "100",
1091
+ "thumbnail_size": "SIZE_LARGE",
1092
+ "order": "3",
1093
+ "share_id": share_id,
1094
+ "parent_id": parent_id,
1095
+ "pass_code": pass_code,
1096
+ }
1097
+ url = f"https://{self.PIKPAK_API_HOST}/drive/v1/share"
1098
+ captcha_result = await self.captcha_init(f"GET:/drive/v1/share")
1099
+ self.captcha_token = captcha_result.get("captcha_token")
1100
+ return await self._request_get(url, params=data)
1101
+
1102
+ async def restore(
1103
+ self, share_id: str, pass_code_token: str, file_ids: List[str]
1104
+ ) -> Dict[str, Any]:
1105
+ """
1106
+
1107
+ Args:
1108
+ share_id: 分享链接eg. /s/VO8BcRb-XXXXX 的 VO8BcRb-XXXXX
1109
+ pass_code_token: get_share_info获取, 无密码则留空
1110
+ file_ids: 需要转存的文件/文件夹ID列表, get_share_info获取id值
1111
+ """
1112
+ data = {
1113
+ "share_id": share_id,
1114
+ "pass_code_token": pass_code_token,
1115
+ "file_ids": file_ids,
1116
+ "folder_type": "NORMAL",
1117
+ "specify_parent_id" : True,
1118
+ "parent_id" : "",
1119
+ }
1120
+ captcha_result = await self.captcha_init(f"GET:/drive/v1/share/restore")
1121
+ self.captcha_token = captcha_result.get("captcha_token")
1122
+ result = await self._request_post(
1123
+ url=f"https://{self.PIKPAK_API_HOST}/drive/v1/share/restore", data=data
1124
+ )
1125
+ return result
pikpakapi/enums.py ADDED
@@ -0,0 +1,9 @@
 
 
 
 
 
 
 
 
 
 
1
+ from enum import Enum
2
+
3
+
4
+ class DownloadStatus(Enum):
5
+ not_downloading = "not_downloading"
6
+ downloading = "downloading"
7
+ done = "done"
8
+ error = "error"
9
+ not_found = "not_found"
pikpakapi/utils.py ADDED
@@ -0,0 +1,108 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import hashlib
2
+ from uuid import uuid4
3
+ import time
4
+
5
+ CLIENT_ID = "ZQL_zwA4qhHcoe_2"
6
+ CLIENT_SECRET = "Og9Vr1L8Ee6bh0olFxFDRg"
7
+ CLIENT_VERSION = "1.06.0.2132"
8
+ PACKAG_ENAME = "com.thunder.downloader"
9
+ SDK_VERSION = "2.0.3.203100 "
10
+ APP_NAME = PACKAG_ENAME
11
+
12
+
13
+ def get_timestamp() -> int:
14
+ """
15
+ Get current timestamp.
16
+ """
17
+ return int(time.time() * 1000)
18
+
19
+
20
+ def device_id_generator() -> str:
21
+ """
22
+ Generate a random device id.
23
+ """
24
+ return str(uuid4()).replace("-", "")
25
+
26
+
27
+ SALTS = [
28
+ "kVy0WbPhiE4v6oxXZ88DvoA3Q",
29
+ "lON/AUoZKj8/nBtcE85mVbkOaVdVa",
30
+ "rLGffQrfBKH0BgwQ33yZofvO3Or",
31
+ "FO6HWqw",
32
+ "GbgvyA2",
33
+ "L1NU9QvIQIH7DTRt",
34
+ "y7llk4Y8WfYflt6",
35
+ "iuDp1WPbV3HRZudZtoXChxH4HNVBX5ZALe",
36
+ "8C28RTXmVcco0",
37
+ "X5Xh",
38
+ "7xe25YUgfGgD0xW3ezFS",
39
+ "",
40
+ "CKCR",
41
+ "8EmDjBo6h3eLaK7U6vU2Qys0NsMx",
42
+ "t2TeZBXKqbdP09Arh9C3",
43
+ ]
44
+
45
+
46
+ def captcha_sign(device_id: str, timestamp: str) -> str:
47
+ """
48
+ Generate a captcha sign.
49
+
50
+ 在网页端的js中, 搜索 captcha_sign, 可以找到对应的js代码
51
+
52
+ """
53
+ sign = CLIENT_ID + CLIENT_VERSION + PACKAG_ENAME + device_id + timestamp
54
+ for salt in SALTS:
55
+ sign = hashlib.md5((sign + salt).encode()).hexdigest()
56
+ return f"1.{sign}"
57
+
58
+
59
+ def generate_device_sign(device_id, package_name):
60
+ signature_base = f"{device_id}{package_name}1appkey"
61
+
62
+ # 计算 SHA-1 哈希
63
+ sha1_hash = hashlib.sha1()
64
+ sha1_hash.update(signature_base.encode("utf-8"))
65
+ sha1_result = sha1_hash.hexdigest()
66
+
67
+ # 计算 MD5 哈希
68
+ md5_hash = hashlib.md5()
69
+ md5_hash.update(sha1_result.encode("utf-8"))
70
+ md5_result = md5_hash.hexdigest()
71
+
72
+ device_sign = f"div101.{device_id}{md5_result}"
73
+
74
+ return device_sign
75
+
76
+
77
+ def build_custom_user_agent(device_id, user_id):
78
+ device_sign = generate_device_sign(device_id, PACKAG_ENAME)
79
+
80
+ user_agent_parts = [
81
+ f"ANDROID-{APP_NAME}/{CLIENT_VERSION}",
82
+ "protocolVersion/200",
83
+ "accesstype/",
84
+ f"clientid/{CLIENT_ID}",
85
+ f"clientversion/{CLIENT_VERSION}",
86
+ "action_type/",
87
+ "networktype/WIFI",
88
+ "sessionid/",
89
+ f"deviceid/{device_id}",
90
+ "providername/NONE",
91
+ f"devicesign/{device_sign}",
92
+ "refresh_token/",
93
+ f"sdkversion/{SDK_VERSION}",
94
+ f"datetime/{get_timestamp()}",
95
+ f"usrno/{user_id}",
96
+ f"appname/{APP_NAME}",
97
+ "session_origin/",
98
+ "grant_type/",
99
+ "appid/",
100
+ "clientip/",
101
+ "devicename/Xiaomi_M2004j7ac",
102
+ "osversion/13",
103
+ "platformversion/10",
104
+ "accessmode/",
105
+ "devicemodel/M2004J7AC",
106
+ ]
107
+
108
+ return " ".join(user_agent_parts)