Spaces:
Paused
Paused
| import asyncio | |
| import binascii | |
| import inspect | |
| import json | |
| import logging | |
| import re | |
| from base64 import b64decode, b64encode | |
| from hashlib import md5 | |
| from types import NoneType | |
| from typing import Any, Dict, List, Optional, Callable, Coroutine | |
| from urllib.parse import urljoin | |
| import httpx | |
| from .PikpakException import PikpakException, PikpakRetryException | |
| from .enums import DownloadStatus | |
| from .utils import ( | |
| CLIENT_ID, | |
| CLIENT_SECRET, | |
| CLIENT_VERSION, | |
| PACKAG_ENAME, | |
| build_custom_user_agent, | |
| captcha_sign, | |
| get_timestamp, | |
| ) | |
| class PikPakApi: | |
| """ | |
| PikPakApi class | |
| Attributes: | |
| PIKPAK_API_HOST: str - PikPak API host | |
| PIKPAK_USER_HOST: str - PikPak user API host | |
| username: str - username of the user | |
| password: str - password of the user | |
| encoded_token: str - encoded token of the user with access and refresh tokens | |
| access_token: str - access token of the user , expire in 7200 | |
| refresh_token: str - refresh token of the user | |
| user_id: str - user id of the user | |
| token_refresh_callback: Callable[[PikPakApi, **Any], Coroutine[Any, Any, None]] - async callback function to be called after token refresh | |
| token_refresh_callback_kwargs: Dict[str, Any] - custom arguments to be passed to the token refresh callback | |
| """ | |
| PIKPAK_API_HOST = "api-pan.xunleix.com" | |
| PIKPAK_USER_HOST = "xluser-ssl.xunleix.com" | |
| def __init__( | |
| self, | |
| username: Optional[str] = None, | |
| password: Optional[str] = None, | |
| encoded_token: Optional[str] = None, | |
| httpx_client_args: Optional[Dict[str, Any]] = None, | |
| device_id: Optional[str] = None, | |
| request_max_retries: int = 3, | |
| request_initial_backoff: float = 3.0, | |
| token_refresh_callback: Optional[Callable] = None, | |
| token_refresh_callback_kwargs: Optional[Dict[str, Any]] = None, | |
| ): | |
| """ | |
| username: str - username of the user | |
| password: str - password of the user | |
| encoded_token: str - encoded token of the user with access and refresh token | |
| httpx_client_args: dict - extra arguments for httpx.AsyncClient (https://www.python-httpx.org/api/#asyncclient) | |
| device_id: str - device id to identify the device | |
| request_max_retries: int - maximum number of retries for requests | |
| request_initial_backoff: float - initial backoff time for retries | |
| token_refresh_callback: Callable[[PikPakApi, **Any], Coroutine[Any, Any, None]] - async callback function to be called after token refresh | |
| token_refresh_callback_kwargs: Dict[str, Any] - custom arguments to be passed to the token refresh callback | |
| """ | |
| self.username = username | |
| self.password = password | |
| self.encoded_token = encoded_token | |
| self.max_retries = request_max_retries | |
| self.initial_backoff = request_initial_backoff | |
| self.token_refresh_callback = token_refresh_callback | |
| self.token_refresh_callback_kwargs = token_refresh_callback_kwargs or {} | |
| self.access_token = None | |
| self.refresh_token = None | |
| self.user_id = None | |
| self.data_response = None | |
| # device_id is used to identify the device, if not provided, a random device_id will be generated, 32 characters | |
| self.device_id = ( | |
| device_id | |
| if device_id | |
| else md5(f"{self.username}{self.password}".encode()).hexdigest() | |
| ) | |
| self.captcha_token = None | |
| httpx_client_args = httpx_client_args or {"timeout": 10} | |
| self.httpx_client = httpx.AsyncClient(**httpx_client_args) | |
| self._path_id_cache: Dict[str, Any] = {} | |
| self.user_agent: Optional[str] = None | |
| if self.encoded_token: | |
| self.decode_token() | |
| elif self.username and self.password: | |
| pass | |
| else: | |
| raise PikpakException("username and password or encoded_token is required") | |
| def from_dict(cls, data: Dict[str, Any]) -> "PikPakApi": | |
| """ | |
| Create PikPakApi object from a dictionary | |
| """ | |
| params = inspect.signature(cls).parameters | |
| filtered_data = {key: data[key] for key in params if key in data} | |
| client = cls( | |
| **filtered_data, | |
| ) | |
| client.__dict__.update(data) | |
| return client | |
| def to_dict(self) -> Dict[str, Any]: | |
| """ | |
| Returns the PikPakApi object as a dictionary | |
| """ | |
| data = self.__dict__.copy() | |
| # remove can't be serialized attributes | |
| keys_to_delete = [ | |
| k | |
| for k, v in data.items() | |
| if not type(v) in [str, int, float, bool, list, dict, NoneType] | |
| ] | |
| for k in keys_to_delete: | |
| del data[k] | |
| return data | |
| def build_custom_user_agent(self) -> str: | |
| self.user_agent = build_custom_user_agent( | |
| device_id=self.device_id, | |
| user_id=self.user_id if self.user_id else "", | |
| ) | |
| return self.user_agent | |
| def get_headers(self, access_token: Optional[str] = None) -> Dict[str, str]: | |
| """ | |
| Returns the headers to use for the requests. | |
| """ | |
| headers = { | |
| "User-Agent": ( | |
| self.build_custom_user_agent() | |
| if self.captcha_token | |
| else "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36" | |
| ), | |
| "Content-Type": "application/json; charset=utf-8", | |
| } | |
| if self.access_token: | |
| headers["Authorization"] = f"Bearer {self.access_token}" | |
| if access_token: | |
| headers["Authorization"] = f"Bearer {access_token}" | |
| if self.captcha_token: | |
| headers["X-Captcha-Token"] = self.captcha_token | |
| if self.device_id: | |
| headers["X-Device-Id"] = self.device_id | |
| return headers | |
| async def _make_request( | |
| self, | |
| method: str, | |
| url: str, | |
| data: Optional[Dict[str, Any]] = None, | |
| params: Optional[Dict[str, Any]] = None, | |
| headers: Optional[Dict[str, str]] = None, | |
| ) -> Dict[str, Any]: | |
| last_error = None | |
| # url = "{proxy}{target}".format(proxy="https://pikpak.tjsky.top/", target=url) | |
| # print(url) | |
| for attempt in range(self.max_retries): | |
| try: | |
| response = await self._send_request(method, url, data, params, headers) | |
| return await self._handle_response(response) | |
| except PikpakRetryException as error: | |
| logging.info(f"Retry attempt {attempt + 1}/{self.max_retries}") | |
| last_error = error | |
| except PikpakException: | |
| raise | |
| except httpx.HTTPError as error: | |
| logging.error( | |
| f"HTTP Error on attempt {attempt + 1}/{self.max_retries}: {str(error)}" | |
| ) | |
| last_error = error | |
| except Exception as error: | |
| logging.error( | |
| f"Unexpected error on attempt {attempt + 1}/{self.max_retries}: {str(error)}" | |
| ) | |
| last_error = error | |
| await asyncio.sleep(self.initial_backoff * (2**attempt)) | |
| # If we've exhausted all retries, raise an exception with the last error | |
| raise PikpakException(f"Max retries reached. Last error: {str(last_error)}") | |
| async def _send_request(self, method, url, data, params, headers): | |
| req_headers = headers or self.get_headers() | |
| return await self.httpx_client.request( | |
| method, | |
| url, | |
| json=data, | |
| params=params, | |
| headers=req_headers, | |
| ) | |
| async def _handle_response(self, response) -> Dict[str, Any]: | |
| try: | |
| json_data = response.json() | |
| except ValueError: | |
| if response.status_code == 200: | |
| return {} | |
| raise PikpakRetryException("Empty JSON data") | |
| self.data_response = json_data | |
| if not json_data: | |
| if response.status_code == 200: | |
| return {} | |
| raise PikpakRetryException("Empty JSON data") | |
| if "error" not in json_data: | |
| return json_data | |
| if "captcha_token" in json_data: | |
| self.captcha_token = json_data["captcha_token"] | |
| if json_data["error"] == "invalid_account_or_password": | |
| raise PikpakException("Invalid username or password") | |
| if json_data.get("error_code") == 16: | |
| await self.refresh_access_token() | |
| raise PikpakRetryException("Token refreshed, please retry") | |
| raise PikpakException(json_data.get("error_description", "Unknown Error")) | |
| async def _request_get( | |
| self, | |
| url: str, | |
| params: dict = None, | |
| ): | |
| return await self._make_request("get", url, params=params) | |
| async def _request_post( | |
| self, | |
| url: str, | |
| data: dict = None, | |
| headers: dict = None, | |
| ): | |
| return await self._make_request("post", url, data=data, headers=headers) | |
| async def _request_patch( | |
| self, | |
| url: str, | |
| data: dict = None, | |
| ): | |
| return await self._make_request("patch", url, data=data) | |
| async def _request_delete( | |
| self, | |
| url: str, | |
| params: dict = None, | |
| data: dict = None, | |
| ): | |
| return await self._make_request("delete", url, params=params, data=data) | |
| def decode_token(self): | |
| """Decodes the encoded token to update access and refresh tokens.""" | |
| try: | |
| decoded_data = json.loads(b64decode(self.encoded_token).decode()) | |
| except (binascii.Error, json.JSONDecodeError): | |
| raise PikpakException("Invalid encoded token") | |
| if not decoded_data.get("access_token") or not decoded_data.get( | |
| "refresh_token" | |
| ): | |
| raise PikpakException("Invalid encoded token") | |
| self.access_token = decoded_data.get("access_token") | |
| self.refresh_token = decoded_data.get("refresh_token") | |
| def encode_token(self): | |
| """Encodes the access and refresh tokens into a single string.""" | |
| token_data = { | |
| "access_token": self.access_token, | |
| "refresh_token": self.refresh_token, | |
| } | |
| self.encoded_token = b64encode(json.dumps(token_data).encode()).decode() | |
| async def captcha_init(self, action: str, meta: dict = None) -> Dict[str, Any]: | |
| url = f"https://{PikPakApi.PIKPAK_USER_HOST}/v1/shield/captcha/init" | |
| if not meta: | |
| t = f"{get_timestamp()}" | |
| meta = { | |
| "captcha_sign": captcha_sign(self.device_id, t), | |
| "client_version": CLIENT_VERSION, | |
| "package_name": PACKAG_ENAME, | |
| "user_id": self.user_id, | |
| "timestamp": t, | |
| } | |
| params = { | |
| "client_id": CLIENT_ID, | |
| "action": action, | |
| "device_id": self.device_id, | |
| "meta": meta, | |
| } | |
| return await self._request_post(url, data=params) | |
| async def login(self) -> None: | |
| """ | |
| Login to PikPak | |
| """ | |
| login_url = f"https://{PikPakApi.PIKPAK_USER_HOST}/v1/auth/signin" | |
| metas = {} | |
| if not self.username or not self.password: | |
| raise PikpakException("username and password are required") | |
| if re.match(r"\w+([-+.]\w+)*@\w+([-.]\w+)*\.\w+([-.]\w+)*", self.username): | |
| metas["email"] = self.username | |
| elif re.match(r"\d{11,18}", self.username): | |
| metas["phone_number"] = self.username | |
| else: | |
| metas["username"] = self.username | |
| result = await self.captcha_init( | |
| action=f"POST:{login_url}", | |
| meta=metas, | |
| ) | |
| captcha_token = result.get("captcha_token", "") | |
| if not captcha_token: | |
| raise PikpakException("captcha_token get failed") | |
| login_data = { | |
| "client_id": CLIENT_ID, | |
| "client_secret": CLIENT_SECRET, | |
| "password": self.password, | |
| "username": self.username, | |
| "captcha_token": captcha_token, | |
| } | |
| user_info = await self._request_post( | |
| login_url, | |
| login_data, | |
| { | |
| "Content-Type": "application/x-www-form-urlencoded", | |
| }, | |
| ) | |
| self.access_token = user_info["access_token"] | |
| self.refresh_token = user_info["refresh_token"] | |
| self.user_id = user_info["sub"] | |
| self.encode_token() | |
| async def refresh_access_token(self) -> None: | |
| """ | |
| Refresh access token | |
| """ | |
| refresh_url = f"https://{self.PIKPAK_USER_HOST}/v1/auth/token" | |
| refresh_data = { | |
| "client_id": CLIENT_ID, | |
| "refresh_token": self.refresh_token, | |
| "grant_type": "refresh_token", | |
| } | |
| user_info = await self._request_post(refresh_url, refresh_data) | |
| self.access_token = user_info["access_token"] | |
| self.refresh_token = user_info["refresh_token"] | |
| self.user_id = user_info["sub"] | |
| self.encode_token() | |
| if self.token_refresh_callback: | |
| await self.token_refresh_callback( | |
| self, **self.token_refresh_callback_kwargs | |
| ) | |
| def get_user_info(self) -> Dict[str, Optional[str]]: | |
| """ | |
| Get user info | |
| """ | |
| return { | |
| "username": self.username, | |
| "user_id": self.user_id, | |
| "access_token": self.access_token, | |
| "refresh_token": self.refresh_token, | |
| "encoded_token": self.encoded_token, | |
| } | |
| async def create_folder( | |
| self, name: str = "新建文件夹", parent_id: Optional[str] = None | |
| ) -> Dict[str, Any]: | |
| """ | |
| name: str - 文件夹名称 | |
| parent_id: str - 父文件夹id, 默认创建到根目录 | |
| 创建文件夹 | |
| """ | |
| url = f"https://{self.PIKPAK_API_HOST}/drive/v1/files" | |
| data = { | |
| "kind": "drive#folder", | |
| "name": name, | |
| "parent_id": parent_id, | |
| } | |
| captcha_result = await self.captcha_init(f"GET:/drive/v1/files") | |
| self.captcha_token = captcha_result.get("captcha_token") | |
| result = await self._request_post(url, data) | |
| return result | |
| async def delete_to_trash(self, ids: List[str]) -> Dict[str, Any]: | |
| """ | |
| ids: List[str] - 文件夹、文件id列表 | |
| 将文件夹、文件移动到回收站 | |
| """ | |
| url = f"https://{self.PIKPAK_API_HOST}/drive/v1/files:batchTrash" | |
| data = { | |
| "ids": ids, | |
| } | |
| captcha_result = await self.captcha_init(f"GET:/drive/v1/files:batchTrash") | |
| self.captcha_token = captcha_result.get("captcha_token") | |
| result = await self._request_post(url, data) | |
| return result | |
| async def untrash(self, ids: List[str]) -> Dict[str, Any]: | |
| """ | |
| ids: List[str] - 文件夹、文件id列表 | |
| 将文件夹、文件移出回收站 | |
| """ | |
| url = f"https://{self.PIKPAK_API_HOST}/drive/v1/files:batchUntrash" | |
| data = { | |
| "ids": ids, | |
| } | |
| captcha_result = await self.captcha_init(f"GET:/drive/v1/files:batchUntrash") | |
| self.captcha_token = captcha_result.get("captcha_token") | |
| result = await self._request_post(url, data) | |
| return result | |
| async def emptytrash(self) -> Dict[str, Any]: | |
| """ | |
| 清空回收站 | |
| """ | |
| url = f"https://{self.PIKPAK_API_HOST}/drive/v1/files/trash:empty" | |
| data = {} | |
| captcha_result = await self.captcha_init(f"PATCH:/drive/v1/files/trash:empty") | |
| self.captcha_token = captcha_result.get("captcha_token") | |
| result = await self._request_patch(url, data) | |
| return result | |
| async def delete_forever(self, ids: List[str]) -> Dict[str, Any]: | |
| """ | |
| ids: List[str] - 文件夹、文件id列表 | |
| 永远删除文件夹、文件, 慎用 | |
| """ | |
| url = f"https://{self.PIKPAK_API_HOST}/drive/v1/files:batchDelete" | |
| data = { | |
| "ids": ids, | |
| } | |
| captcha_result = await self.captcha_init(f"GET:/drive/v1/files:batchDelete") | |
| self.captcha_token = captcha_result.get("captcha_token") | |
| result = await self._request_post(url, data) | |
| return result | |
| async def offline_download( | |
| self, file_url: str, parent_id: Optional[str] = None, name: Optional[str] = None | |
| ) -> Dict[str, Any]: | |
| """ | |
| file_url: str - 文件链接 | |
| parent_id: str - 父文件夹id, 不传默认存储到 My Pack | |
| name: str - 文件名, 不传默认为文件链接的文件名 | |
| 离线下载磁力链 | |
| """ | |
| download_url = f"https://{self.PIKPAK_API_HOST}/drive/v1/files" | |
| download_data = { | |
| "kind": "drive#file", | |
| "name": name, | |
| "upload_type": "UPLOAD_TYPE_URL", | |
| "url": {"url": file_url, "parent_id": parent_id}, | |
| "parent_id": parent_id, | |
| } | |
| captcha_result = await self.captcha_init(f"GET:/drive/v1/files") | |
| self.captcha_token = captcha_result.get("captcha_token") | |
| result = await self._request_post(download_url, download_data) | |
| return result | |
| async def offline_list( | |
| self, | |
| size: int = 10000, | |
| next_page_token: Optional[str] = None, | |
| phase: Optional[List[str]] = None, | |
| ) -> Dict[str, Any]: | |
| """ | |
| size: int - 每次请求的数量 | |
| next_page_token: str - 下一页的page token | |
| phase: List[str] - Offline download task status, default is ["PHASE_TYPE_RUNNING", "PHASE_TYPE_ERROR"] | |
| supported values: PHASE_TYPE_RUNNING, PHASE_TYPE_ERROR, PHASE_TYPE_COMPLETE, PHASE_TYPE_PENDING | |
| 获取离线下载列表 | |
| """ | |
| if phase is None: | |
| phase = ["PHASE_TYPE_RUNNING", "PHASE_TYPE_ERROR"] | |
| list_url = f"https://{self.PIKPAK_API_HOST}/drive/v1/tasks" | |
| list_data = { | |
| "type": "offline", | |
| "thumbnail_size": "SIZE_SMALL", | |
| "limit": size, | |
| "page_token": next_page_token, | |
| "filters": json.dumps({"phase": {"in": ",".join(phase)}}), | |
| "with": "reference_resource", | |
| } | |
| captcha_result = await self.captcha_init("GET:/drive/v1/tasks") | |
| self.captcha_token = captcha_result.get("captcha_token") | |
| result = await self._request_get(list_url, list_data) | |
| return result | |
| async def offline_file_info(self, file_id: str) -> Dict[str, Any]: | |
| """ | |
| file_id: str - 离线下载文件id | |
| 离线下载文件信息 | |
| """ | |
| captcha_result = await self.captcha_init(f"GET:/drive/v1/files/{file_id}") | |
| self.captcha_token = captcha_result.get("captcha_token") | |
| url = f"https://{self.PIKPAK_API_HOST}/drive/v1/files/{file_id}" | |
| result = await self._request_get(url, {"thumbnail_size": "SIZE_LARGE"}) | |
| return result | |
| async def file_list( | |
| self, | |
| size: int = 100, | |
| parent_id: Optional[str] = None, | |
| next_page_token: Optional[str] = None, | |
| additional_filters: Optional[Dict[str, Any]] = None, | |
| ) -> Dict[str, Any]: | |
| """ | |
| size: int - 每次请求的数量 | |
| parent_id: str - 父文件夹id, 默认列出根目录 | |
| next_page_token: str - 下一页的page token | |
| additional_filters: Dict[str, Any] - 额外的过滤条件 | |
| 获取文件列表,可以获得文件下载链接 | |
| """ | |
| default_filters = { | |
| "trashed": {"eq": False}, | |
| "phase": {"eq": "PHASE_TYPE_COMPLETE"}, | |
| } | |
| if additional_filters: | |
| default_filters.update(additional_filters) | |
| list_url = f"https://{self.PIKPAK_API_HOST}/drive/v1/files" | |
| list_data = { | |
| "parent_id": parent_id, | |
| "thumbnail_size": "SIZE_MEDIUM", | |
| "limit": size, | |
| "with_audit": "true", | |
| "page_token": next_page_token, | |
| "filters": json.dumps(default_filters), | |
| } | |
| captcha_result = await self.captcha_init("GET:/drive/v1/files") | |
| self.captcha_token = captcha_result.get("captcha_token") | |
| result = await self._request_get(list_url, list_data) | |
| return result | |
| async def events( | |
| self, size: int = 100, next_page_token: Optional[str] = None | |
| ) -> Dict[str, Any]: | |
| """ | |
| size: int - 每次请求的数量 | |
| next_page_token: str - 下一页的page token | |
| 获取最近添加事件列表 | |
| """ | |
| list_url = f"https://{self.PIKPAK_API_HOST}/drive/v1/events" | |
| list_data = { | |
| "thumbnail_size": "SIZE_MEDIUM", | |
| "limit": size, | |
| "next_page_token": next_page_token, | |
| } | |
| captcha_result = await self.captcha_init("GET:/drive/v1/files") | |
| self.captcha_token = captcha_result.get("captcha_token") | |
| result = await self._request_get(list_url, list_data) | |
| return result | |
| async def offline_task_retry(self, task_id: str) -> Dict[str, Any]: | |
| """ | |
| task_id: str - 离线下载任务id | |
| 重试离线下载任务 | |
| """ | |
| list_url = f"https://{self.PIKPAK_API_HOST}/drive/v1/task" | |
| list_data = { | |
| "type": "offline", | |
| "create_type": "RETRY", | |
| "id": task_id, | |
| } | |
| try: | |
| captcha_result = await self.captcha_init("GET:/drive/v1/task") | |
| self.captcha_token = captcha_result.get("captcha_token") | |
| result = await self._request_post(list_url, list_data) | |
| return result | |
| except Exception as e: | |
| raise PikpakException(f"重试离线下载任务失败: {task_id}. {e}") | |
| async def delete_tasks( | |
| self, task_ids: List[str], delete_files: bool = False | |
| ) -> None: | |
| """ | |
| delete tasks by task ids | |
| task_ids: List[str] - task ids to delete | |
| """ | |
| delete_url = f"https://{self.PIKPAK_API_HOST}/drive/v1/tasks" | |
| params = { | |
| "task_ids": task_ids, | |
| "delete_files": delete_files, | |
| } | |
| try: | |
| captcha_result = await self.captcha_init("GET:/drive/v1/tasks") | |
| self.captcha_token = captcha_result.get("captcha_token") | |
| await self._request_delete(delete_url, params=params) | |
| except Exception as e: | |
| raise PikpakException(f"Failing to delete tasks: {task_ids}. {e}") | |
| async def get_task_status(self, task_id: str, file_id: str) -> DownloadStatus: | |
| """ | |
| task_id: str - 离线下载任务id | |
| file_id: str - 离线下载文件id | |
| 获取离线下载任务状态, 临时实现, 后期可能变更 | |
| """ | |
| try: | |
| infos = await self.offline_list() | |
| if infos and infos.get("tasks", []): | |
| for task in infos.get("tasks", []): | |
| if task_id == task.get("id"): | |
| return DownloadStatus.downloading | |
| file_info = await self.offline_file_info(file_id=file_id) | |
| if file_info: | |
| return DownloadStatus.done | |
| else: | |
| return DownloadStatus.not_found | |
| except PikpakException: | |
| return DownloadStatus.error | |
| async def path_to_id(self, path: str, create: bool = False) -> List[Dict[str, str]]: | |
| """ | |
| path: str - 路径 | |
| create: bool - 是否创建不存在的文件夹 | |
| 将形如 /path/a/b 的路径转换为 文件夹的id | |
| """ | |
| if not path or len(path) <= 0: | |
| return [] | |
| paths = path.split("/") | |
| paths = [p.strip() for p in paths if len(p) > 0] | |
| # 构造不同级别的path表达式,尝试找到距离目标最近的那一层 | |
| multi_level_paths = ["/" + "/".join(paths[: i + 1]) for i in range(len(paths))] | |
| path_ids = [ | |
| self._path_id_cache[p] | |
| for p in multi_level_paths | |
| if p in self._path_id_cache | |
| ] | |
| # 判断缓存命中情况 | |
| hit_cnt = len(path_ids) | |
| if hit_cnt == len(paths): | |
| return path_ids | |
| elif hit_cnt == 0: | |
| count = 0 | |
| parent_id = None | |
| else: | |
| count = hit_cnt | |
| parent_id = path_ids[-1]["id"] | |
| next_page_token = None | |
| while count < len(paths): | |
| data = await self.file_list( | |
| parent_id=parent_id, next_page_token=next_page_token | |
| ) | |
| record_of_target_path = None | |
| for f in data.get("files", []): | |
| current_path = "/" + "/".join(paths[:count] + [f.get("name")]) | |
| file_type = ( | |
| "folder" if f.get("kind", "").find("folder") != -1 else "file" | |
| ) | |
| record = { | |
| "id": f.get("id"), | |
| "name": f.get("name"), | |
| "file_type": file_type, | |
| } | |
| self._path_id_cache[current_path] = record | |
| if f.get("name") == paths[count]: | |
| record_of_target_path = record | |
| # 不break: 剩下的文件也同样缓存起来 | |
| if record_of_target_path is not None: | |
| path_ids.append(record_of_target_path) | |
| count += 1 | |
| parent_id = record_of_target_path["id"] | |
| elif data.get("next_page_token") and ( | |
| not next_page_token or next_page_token != data.get("next_page_token") | |
| ): | |
| next_page_token = data.get("next_page_token") | |
| elif create: | |
| data = await self.create_folder(name=paths[count], parent_id=parent_id) | |
| file_id = data.get("file").get("id") | |
| record = { | |
| "id": file_id, | |
| "name": paths[count], | |
| "file_type": "folder", | |
| } | |
| path_ids.append(record) | |
| current_path = "/" + "/".join(paths[: count + 1]) | |
| self._path_id_cache[current_path] = record | |
| count += 1 | |
| parent_id = file_id | |
| else: | |
| break | |
| return path_ids | |
| async def file_batch_move( | |
| self, | |
| ids: List[str], | |
| to_parent_id: Optional[str] = None, | |
| ) -> Dict[str, Any]: | |
| """ | |
| ids: List[str] - 文件id列表 | |
| to_parent_id: str - 移动到的文件夹id, 默认为根目录 | |
| 批量移动文件 | |
| """ | |
| to = ( | |
| { | |
| "parent_id": to_parent_id, | |
| } | |
| if to_parent_id | |
| else {} | |
| ) | |
| captcha_result = await self.captcha_init("GET:/drive/v1/files:batchMove") | |
| self.captcha_token = captcha_result.get("captcha_token") | |
| result = await self._request_post( | |
| url=f"https://{self.PIKPAK_API_HOST}/drive/v1/files:batchMove", | |
| data={ | |
| "ids": ids, | |
| "to": to, | |
| }, | |
| ) | |
| return result | |
| async def file_batch_copy( | |
| self, | |
| ids: List[str], | |
| to_parent_id: Optional[str] = None, | |
| ) -> Dict[str, Any]: | |
| """ | |
| ids: List[str] - 文件id列表 | |
| to_parent_id: str - 复制到的文件夹id, 默认为根目录 | |
| 批量复制文件 | |
| """ | |
| to = ( | |
| { | |
| "parent_id": to_parent_id, | |
| } | |
| if to_parent_id | |
| else {} | |
| ) | |
| captcha_result = await self.captcha_init("GET:/drive/v1/files:batchCopy") | |
| self.captcha_token = captcha_result.get("captcha_token") | |
| result = await self._request_post( | |
| url=f"https://{self.PIKPAK_API_HOST}/drive/v1/files:batchCopy", | |
| data={ | |
| "ids": ids, | |
| "to": to, | |
| }, | |
| ) | |
| return result | |
| async def file_move_or_copy_by_path( | |
| self, | |
| from_path: List[str], | |
| to_path: str, | |
| move: bool = False, | |
| create: bool = False, | |
| ) -> Dict[str, Any]: | |
| """ | |
| from_path: List[str] - 要移动或复制的文件路径列表 | |
| to_path: str - 移动或复制到的路径 | |
| is_move: bool - 是否移动, 默认为复制 | |
| create: bool - 是否创建不存在的文件夹 | |
| 根据路径移动或复制文件 | |
| """ | |
| from_ids: List[str] = [] | |
| for path in from_path: | |
| if path_ids := await self.path_to_id(path): | |
| if file_id := path_ids[-1].get("id"): | |
| from_ids.append(file_id) | |
| if not from_ids: | |
| raise PikpakException("要移动的文件不存在") | |
| to_path_ids = await self.path_to_id(to_path, create=create) | |
| if to_path_ids: | |
| to_parent_id = to_path_ids[-1].get("id") | |
| else: | |
| to_parent_id = None | |
| if move: | |
| result = await self.file_batch_move(ids=from_ids, to_parent_id=to_parent_id) | |
| else: | |
| result = await self.file_batch_copy(ids=from_ids, to_parent_id=to_parent_id) | |
| return result | |
| async def get_download_url(self, file_id: str) -> Dict[str, Any]: | |
| """ | |
| id: str - 文件id | |
| Returns the file details data. | |
| 1. Use `medias[0][link][url]` for streaming with high speed in streaming services or tools. | |
| 2. Use `web_content_link` to download the file | |
| """ | |
| result = await self.captcha_init( | |
| action=f"GET:/drive/v1/files/{file_id}", | |
| ) | |
| self.captcha_token = result.get("captcha_token") | |
| result = await self._request_get( | |
| url=f"https://{self.PIKPAK_API_HOST}/drive/v1/files/{file_id}?", | |
| ) | |
| self.captcha_token = None | |
| return result | |
| async def file_rename(self, id: str, new_file_name: str) -> Dict[str, Any]: | |
| """ | |
| id: str - 文件id | |
| new_file_name: str - 新的文件名 | |
| 重命名文件 | |
| 返回文件的详细信息 | |
| """ | |
| data = { | |
| "name": new_file_name, | |
| } | |
| captcha_result = await self.captcha_init(f"GET:/drive/v1/files/{id}") | |
| self.captcha_token = captcha_result.get("captcha_token") | |
| result = await self._request_patch( | |
| url=f"https://{self.PIKPAK_API_HOST}/drive/v1/files/{id}", | |
| data=data, | |
| ) | |
| return result | |
| async def file_batch_star( | |
| self, | |
| ids: List[str], | |
| ) -> Dict[str, Any]: | |
| """ | |
| ids: List[str] - 文件id列表 | |
| 批量给文件加星标 | |
| """ | |
| data = { | |
| "ids": ids, | |
| } | |
| captcha_result = await self.captcha_init(f"GET:/drive/v1/files/star") | |
| self.captcha_token = captcha_result.get("captcha_token") | |
| result = await self._request_post( | |
| url=f"https://{self.PIKPAK_API_HOST}/drive/v1/files:star", | |
| data=data, | |
| ) | |
| return result | |
| async def file_batch_unstar( | |
| self, | |
| ids: List[str], | |
| ) -> Dict[str, Any]: | |
| """ | |
| ids: List[str] - 文件id列表 | |
| 批量给文件取消星标 | |
| """ | |
| data = { | |
| "ids": ids, | |
| } | |
| captcha_result = await self.captcha_init(f"GET:/drive/v1/files/unstar") | |
| self.captcha_token = captcha_result.get("captcha_token") | |
| result = await self._request_post( | |
| url=f"https://{self.PIKPAK_API_HOST}/drive/v1/files:unstar", | |
| data=data, | |
| ) | |
| return result | |
| async def file_star_list( | |
| self, | |
| size: int = 100, | |
| next_page_token: Optional[str] = None, | |
| ) -> Dict[str, Any]: | |
| """ | |
| size: int - 每次请求的数量 | |
| next_page_token: str - 下一页的page token | |
| 获取加星标的文件列表,可以获得文件下载链接 | |
| parent_id只可取默认值*,子目录列表通过获取星标目录以后自行使用file_list方法获取 | |
| """ | |
| additional_filters = {"system_tag": {"in": "STAR"}} | |
| result = await self.file_list( | |
| size=size, | |
| parent_id="*", | |
| next_page_token=next_page_token, | |
| additional_filters=additional_filters, | |
| ) | |
| return result | |
| async def file_batch_share( | |
| self, | |
| ids: List[str], | |
| need_password: Optional[bool] = False, | |
| expiration_days: Optional[int] = -1, | |
| ) -> Dict[str, Any]: | |
| """ | |
| ids: List[str] - 文件id列表 | |
| need_password: Optional[bool] - 是否需要分享密码 | |
| expiration_days: Optional[int] - 分享天数 | |
| 批量分享文件,并生成分享链接 | |
| 返回数据结构: | |
| { | |
| "share_id": "xxx", //分享ID | |
| "share_url": "https://mypikpak.com/s/xxx", // 分享链接 | |
| "pass_code": "53fe", // 分享密码 | |
| "share_text": "https://mypikpak.com/s/xxx", | |
| "share_list": [] | |
| } | |
| """ | |
| data = { | |
| "file_ids": ids, | |
| "share_to": "encryptedlink" if need_password else "publiclink", | |
| "expiration_days": expiration_days, | |
| "pass_code_option": "REQUIRED" if need_password else "NOT_REQUIRED", | |
| } | |
| captcha_result = await self.captcha_init(f"GET:/drive/v1/share") | |
| self.captcha_token = captcha_result.get("captcha_token") | |
| result = await self._request_post( | |
| url=f"https://{self.PIKPAK_API_HOST}/drive/v1/share", | |
| data=data, | |
| ) | |
| return result | |
| async def get_quota_info(self) -> Dict[str, Any]: | |
| """ | |
| 获取当前空间的quota信息 | |
| 返回数据结构如下: | |
| { | |
| "kind": "drive#about", | |
| "quota": { | |
| "kind": "drive#quota", | |
| "limit": "10995116277760", //空间总大小, 单位Byte | |
| "usage": "5113157556024", // 已用空间大小,单位Byte | |
| "usage_in_trash": "1281564700871", // 回收站占用大小,单位Byte | |
| "play_times_limit": "-1", | |
| "play_times_usage": "0" | |
| }, | |
| "expires_at": "", | |
| "quotas": {} | |
| } | |
| """ | |
| # captcha_result = await self.captcha_init(f"GET:/drive/v1/about") | |
| # self.captcha_token = captcha_result.get("captcha_token") | |
| result = await self._request_get( | |
| url=f"https://{self.PIKPAK_API_HOST}/drive/v1/about", | |
| ) | |
| return result | |
| async def get_invite_code(self): | |
| captcha_result = await self.captcha_init(f"GET:/vip/v1/activity/inviteCode") | |
| self.captcha_token = captcha_result.get("captcha_token") | |
| result = await self._request_get( | |
| url=f"https://{self.PIKPAK_API_HOST}/vip/v1/activity/inviteCode", | |
| ) | |
| return result["code"] | |
| async def vip_info(self): | |
| captcha_result = await self.captcha_init(f"GET:/drive/v1/privilege/vip") | |
| self.captcha_token = captcha_result.get("captcha_token") | |
| result = await self._request_get( | |
| url=f"https://{self.PIKPAK_API_HOST}/drive/v1/privilege/vip", | |
| ) | |
| return result | |
| async def get_transfer_quota(self) -> Dict[str, Any]: | |
| """ | |
| Get transfer quota | |
| """ | |
| url = f"https://{self.PIKPAK_API_HOST}/vip/v1/quantity/list?type=transfer" | |
| captcha_result = await self.captcha_init( | |
| f"GET:/vip/v1/quantity/list?type=transfer" | |
| ) | |
| self.captcha_token = captcha_result.get("captcha_token") | |
| result = await self._request_get(url) | |
| return result | |
| async def get_share_folder( | |
| self, share_id: str, pass_code_token: str, parent_id: str = None | |
| ) -> Dict[str, Any]: | |
| """ | |
| 获取分享链接下文件夹内容 | |
| Args: | |
| share_id: str - 分享ID eg. /s/VO8BcRb-XXXXX 的 VO8BcRb-XXXXX | |
| pass_code_token: str - 通过 get_share_info 获取到的 pass_code_token | |
| parent_id: str - 父文件夹id, 默认列出根目录 | |
| """ | |
| data = { | |
| "limit": "100", | |
| "thumbnail_size": "SIZE_LARGE", | |
| "order": "6", | |
| "share_id": share_id, | |
| "parent_id": parent_id, | |
| "pass_code_token": pass_code_token, | |
| } | |
| url = f"https://{self.PIKPAK_API_HOST}/drive/v1/share/detail" | |
| captcha_result = await self.captcha_init(f"GET:/drive/v1/share/detail") | |
| self.captcha_token = captcha_result.get("captcha_token") | |
| return await self._request_get(url, params=data) | |
| async def get_share_info( | |
| self, share_link: str, pass_code: str = None | |
| ) -> ValueError | Dict[str, Any] | List[Dict[str | Any, str | Any]]: | |
| """ | |
| 获取分享链接下内容 | |
| Args: | |
| share_link: str - 分享链接 | |
| pass_code: str - 分享密码, 无密码则留空 | |
| """ | |
| match = re.search(r"/s/([^/]+)(?:.*/([^/]+))?$", share_link) | |
| if match: | |
| share_id = match.group(1) | |
| parent_id = match.group(2) if match.group(2) else None | |
| else: | |
| return ValueError("Share Link Is Not Right") | |
| data = { | |
| "limit": "100", | |
| "thumbnail_size": "SIZE_LARGE", | |
| "order": "3", | |
| "share_id": share_id, | |
| "parent_id": parent_id, | |
| "pass_code": pass_code, | |
| } | |
| url = f"https://{self.PIKPAK_API_HOST}/drive/v1/share" | |
| captcha_result = await self.captcha_init(f"GET:/drive/v1/share") | |
| self.captcha_token = captcha_result.get("captcha_token") | |
| return await self._request_get(url, params=data) | |
| async def restore( | |
| self, share_id: str, pass_code_token: str, file_ids: List[str] | |
| ) -> Dict[str, Any]: | |
| """ | |
| Args: | |
| share_id: 分享链接eg. /s/VO8BcRb-XXXXX 的 VO8BcRb-XXXXX | |
| pass_code_token: get_share_info获取, 无密码则留空 | |
| file_ids: 需要转存的文件/文件夹ID列表, get_share_info获取id值 | |
| """ | |
| data = { | |
| "share_id": share_id, | |
| "pass_code_token": pass_code_token, | |
| "file_ids": file_ids, | |
| } | |
| captcha_result = await self.captcha_init(f"GET:/drive/v1/share/restore") | |
| self.captcha_token = captcha_result.get("captcha_token") | |
| result = await self._request_post( | |
| url=f"https://{self.PIKPAK_API_HOST}/drive/v1/share/restore", data=data | |
| ) | |
| return result | |