# douyin2api / crawlers / base_crawler.py
# ==============================================================================
# Copyright (C) 2021 Evil0ctal
#
# This file is part of the Douyin_TikTok_Download_API project.
#
# This project is licensed under the Apache License 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
#         __
#        />  フ
#       |  _  _ l
#       /` ミ_xノ
#      /      | Feed me Stars ⭐ ️
#     /  ヽ   ノ
#     │  | | |
#  / ̄|   | | |
#  | ( ̄ヽ__ヽ_)__)
#  \二つ
# ==============================================================================
#
# Contributor Link:
# - https://github.com/Evil0ctal
# - https://github.com/Johnserf-Seed
#
# ==============================================================================
import httpx
import json
import asyncio
import re
from httpx import Response
from crawlers.utils.logger import logger
from crawlers.utils.api_exceptions import (
APIError,
APIConnectionError,
APIResponseError,
APITimeoutError,
APIUnavailableError,
APIUnauthorizedError,
APINotFoundError,
APIRateLimitError,
APIRetryExhaustedError,
)
class BaseCrawler:
    """
    Base asynchronous crawler client (基础爬虫客户端).

    Wraps a shared ``httpx.AsyncClient`` and provides GET/POST/HEAD helpers
    with bounded concurrency, connection limits, business-level retries on
    empty responses, and uniform mapping of HTTP status codes onto the
    project's ``APIError`` exception hierarchy.
    """

    def __init__(
        self,
        proxies: dict = None,
        max_retries: int = 3,
        max_connections: int = 50,
        timeout: int = 10,
        max_tasks: int = 50,
        crawler_headers: dict = None,
    ):
        """
        Args:
            proxies: httpx proxy mapping (scheme -> proxy URL) or None.
            max_retries: business-logic retry count; also reused as the
                transport-level connection retry count.
            timeout: per-request timeout in seconds; also reused as the
                sleep interval between empty-response retries.
            max_connections: maximum simultaneous connections for the client.
            max_tasks: maximum number of concurrent asynchronous tasks
                (enforced via the semaphore exposed as ``self.semaphore``).
            crawler_headers: default request headers. NOTE: the default used
                to be a mutable ``{}``; it is now ``None`` to avoid the
                shared-mutable-default pitfall (behavior is unchanged).
        """
        # Only accept a dict proxy mapping; anything else means "no proxy".
        self.proxies = proxies if isinstance(proxies, dict) else None
        # Crawler request headers
        self.crawler_headers = crawler_headers or {}
        # Number of concurrent asynchronous tasks
        self._max_tasks = max_tasks
        self.semaphore = asyncio.Semaphore(max_tasks)
        # Limit the maximum number of connections
        self._max_connections = max_connections
        self.limits = httpx.Limits(max_connections=max_connections)
        # Business-logic retry count
        self._max_retries = max_retries
        # Underlying transport-level connection retry count
        self.atransport = httpx.AsyncHTTPTransport(retries=max_retries)
        # Timeout configuration
        self._timeout = timeout
        self.timeout = httpx.Timeout(timeout)
        # Shared asynchronous client
        self.aclient = httpx.AsyncClient(
            headers=self.crawler_headers,
            proxies=self.proxies,
            timeout=self.timeout,
            limits=self.limits,
            transport=self.atransport,
        )

    async def fetch_response(self, endpoint: str) -> Response:
        """Fetch an endpoint and return the raw response object.

        Args:
            endpoint: endpoint URL.

        Returns:
            Response: raw httpx response object.
        """
        return await self.get_fetch_data(endpoint)

    async def fetch_get_json(self, endpoint: str) -> dict:
        """GET an endpoint and return its parsed JSON payload.

        Args:
            endpoint: endpoint URL.

        Returns:
            dict: parsed JSON data.
        """
        response = await self.get_fetch_data(endpoint)
        return self.parse_json(response)

    async def fetch_post_json(self, endpoint: str, params: dict = None, data=None) -> dict:
        """POST to an endpoint and return its parsed JSON payload.

        Args:
            endpoint: endpoint URL.
            params: JSON body parameters (sent as the request's JSON body).
            data: form/body payload, passed through to httpx.

        Returns:
            dict: parsed JSON data.
        """
        response = await self.post_fetch_data(endpoint, params, data)
        return self.parse_json(response)

    def parse_json(self, response: Response) -> dict:
        """Parse a JSON response object.

        Args:
            response: raw httpx response object.

        Returns:
            dict: parsed JSON data.

        Raises:
            APIResponseError: if the response is missing, has a non-200
                status code, or its body cannot be parsed as JSON.
        """
        if (
            response is not None
            and isinstance(response, Response)
            and response.status_code == 200
        ):
            try:
                return response.json()
            except json.JSONDecodeError:
                # Fall back to extracting an embedded JSON object from the
                # response text (some endpoints wrap JSON in extra content).
                match = re.search(r"\{.*\}", response.text)
                if match is None:
                    # BUGFIX: previously ``match.group()`` raised an
                    # AttributeError here instead of the intended
                    # APIResponseError when no JSON object was found.
                    logger.error("解析 {0} 接口 JSON 失败: 未找到JSON数据".format(response.url))
                    raise APIResponseError("解析JSON数据失败")
                try:
                    return json.loads(match.group())
                except json.JSONDecodeError as e:
                    logger.error("解析 {0} 接口 JSON 失败: {1}".format(response.url, e))
                    raise APIResponseError("解析JSON数据失败")
        else:
            if isinstance(response, Response):
                logger.error(
                    "获取数据失败。状态码: {0}".format(response.status_code)
                )
            else:
                logger.error("无效响应类型。响应类型: {0}".format(type(response)))
            raise APIResponseError("获取数据失败")

    async def get_fetch_data(self, url: str):
        """Fetch a GET endpoint, retrying on empty responses.

        Args:
            url: endpoint URL.

        Returns:
            response: the successful response object.

        Raises:
            APIConnectionError: if the connection to the endpoint fails.
            APIRetryExhaustedError: if every attempt returned an empty body.
            APIError subclasses: mapped from HTTP status codes by
                ``handle_http_status_error``.
        """
        for attempt in range(self._max_retries):
            try:
                response = await self.aclient.get(url, follow_redirects=True)
                if not response.text.strip() or not response.content:
                    error_message = "第 {0} 次响应内容为空, 状态码: {1}, URL:{2}".format(attempt + 1,
                                                                                         response.status_code,
                                                                                         response.url)
                    logger.warning(error_message)
                    if attempt == self._max_retries - 1:
                        raise APIRetryExhaustedError(
                            "获取端点数据失败, 次数达到上限"
                        )
                    # Back off before retrying an empty response.
                    await asyncio.sleep(self._timeout)
                    continue
                response.raise_for_status()
                return response
            except httpx.RequestError:
                raise APIConnectionError("连接端点失败,检查网络环境或代理:{0} 代理:{1} 类名:{2}"
                                         .format(url, self.proxies, self.__class__.__name__)
                                         )
            except httpx.HTTPStatusError as http_error:
                self.handle_http_status_error(http_error, url, attempt + 1)
            except APIError as e:
                # BUGFIX: previously the deliberately raised
                # APIRetryExhaustedError was displayed and then silently
                # swallowed, making this coroutine return None; re-raise so
                # callers actually see the failure.
                e.display_error()
                raise

    async def post_fetch_data(self, url: str, params: dict = None, data=None):
        """Fetch a POST endpoint, retrying on empty responses.

        Args:
            url: endpoint URL.
            params: POST request parameters (sent as the JSON body).
            data: form/body payload, passed through to httpx.

        Returns:
            response: the successful response object.

        Raises:
            APIConnectionError: if the connection to the endpoint fails.
            APIRetryExhaustedError: if every attempt returned an empty body.
            APIError subclasses: mapped from HTTP status codes by
                ``handle_http_status_error``.
        """
        for attempt in range(self._max_retries):
            try:
                response = await self.aclient.post(
                    url,
                    json=None if not params else dict(params),
                    data=None if not data else data,
                    follow_redirects=True
                )
                if not response.text.strip() or not response.content:
                    error_message = "第 {0} 次响应内容为空, 状态码: {1}, URL:{2}".format(attempt + 1,
                                                                                         response.status_code,
                                                                                         response.url)
                    logger.warning(error_message)
                    if attempt == self._max_retries - 1:
                        raise APIRetryExhaustedError(
                            "获取端点数据失败, 次数达到上限"
                        )
                    # Back off before retrying an empty response.
                    await asyncio.sleep(self._timeout)
                    continue
                response.raise_for_status()
                return response
            except httpx.RequestError:
                raise APIConnectionError(
                    "连接端点失败,检查网络环境或代理:{0} 代理:{1} 类名:{2}".format(url, self.proxies,
                                                                                  self.__class__.__name__)
                )
            except httpx.HTTPStatusError as http_error:
                self.handle_http_status_error(http_error, url, attempt + 1)
            except APIError as e:
                # BUGFIX: re-raise instead of silently swallowing the
                # deliberately raised APIRetryExhaustedError (see
                # get_fetch_data).
                e.display_error()
                raise

    async def head_fetch_data(self, url: str):
        """Fetch a HEAD endpoint (single attempt, no retries).

        Args:
            url: endpoint URL.

        Returns:
            response: the successful response object.

        Raises:
            APIConnectionError: if the connection to the endpoint fails.
            APIError subclasses: mapped from HTTP status codes by
                ``handle_http_status_error``.
        """
        try:
            response = await self.aclient.head(url)
            response.raise_for_status()
            return response
        except httpx.RequestError:
            raise APIConnectionError("连接端点失败,检查网络环境或代理:{0} 代理:{1} 类名:{2}".format(
                url, self.proxies, self.__class__.__name__
            )
            )
        except httpx.HTTPStatusError as http_error:
            self.handle_http_status_error(http_error, url, 1)
        except APIError as e:
            # Consistent with get/post: surface the error instead of
            # swallowing it.
            e.display_error()
            raise

    def handle_http_status_error(self, http_error, url: str, attempt):
        """Map an HTTP status error onto the project's APIError hierarchy.

        A 302 redirect is deliberately ignored (the caller's retry loop
        simply continues); every other status raises a specific APIError
        subclass.

        Args:
            http_error: the httpx.HTTPStatusError that was caught.
            url: endpoint URL.
            attempt: 1-based attempt number (used for logging only).

        Raises:
            APIResponseError: for unexpected errors or unmapped status codes.
            APIUnavailableError: service unavailable (503).
            APINotFoundError: endpoint does not exist (404).
            APITimeoutError: connection timeout (408).
            APIUnauthorizedError: unauthorized (401).
            APIRateLimitError: request frequency too high (429).
        """
        response = getattr(http_error, "response", None)
        status_code = getattr(response, "status_code", None)
        if response is None or status_code is None:
            logger.error("HTTP状态错误: {0}, URL: {1}, 尝试次数: {2}".format(
                http_error, url, attempt
            )
            )
            raise APIResponseError(f"处理HTTP错误时遇到意外情况: {http_error}")
        if status_code == 302:
            # Redirects are tolerated; the retry loop in the caller continues.
            pass
        elif status_code == 404:
            raise APINotFoundError(f"HTTP Status Code {status_code}")
        elif status_code == 503:
            raise APIUnavailableError(f"HTTP Status Code {status_code}")
        elif status_code == 408:
            raise APITimeoutError(f"HTTP Status Code {status_code}")
        elif status_code == 401:
            raise APIUnauthorizedError(f"HTTP Status Code {status_code}")
        elif status_code == 429:
            raise APIRateLimitError(f"HTTP Status Code {status_code}")
        else:
            logger.error("HTTP状态错误: {0}, URL: {1}, 尝试次数: {2}".format(
                status_code, url, attempt
            )
            )
            raise APIResponseError(f"HTTP状态错误: {status_code}")

    async def close(self):
        """Close the underlying asynchronous client."""
        await self.aclient.aclose()

    async def __aenter__(self):
        """Enter the async context manager; returns self."""
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Exit the async context manager, closing the client."""
        await self.aclient.aclose()