Spaces:
Sleeping
Sleeping
File size: 5,477 Bytes
da3fe02 5f9bad9 da3fe02 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 |
from __future__ import annotations
import asyncio
import json
import random
from dataclasses import dataclass
from typing import Any, Optional
import httpx
from .utils import ApiMetrics, RateLimiter
BASE_URL = "https://open.api.nexon.com"
class ApiError(RuntimeError):
def __init__(self, status_code: int, message: str, payload: Any = None) -> None:
super().__init__(message)
self.status_code = status_code
self.payload = payload
class RateLimitError(ApiError):
def __init__(self, status_code: int, message: str, payload: Any = None, retry_after: Optional[float] = None) -> None:
super().__init__(status_code, message, payload)
self.retry_after = retry_after
class ServerError(ApiError):
pass
class DataPreparingError(ApiError):
pass
class TransportError(RuntimeError):
pass
@dataclass
class ApiClient:
api_key: str
concurrency: int = 8
rps: float = 500.0
timeout_seconds: float = 30.0
max_attempts: int = 5
def __post_init__(self) -> None:
self._client: Optional[httpx.AsyncClient] = None
self._semaphore = asyncio.Semaphore(self.concurrency)
self._rate_limiter = RateLimiter(self.rps)
self.metrics = ApiMetrics()
async def __aenter__(self) -> "ApiClient":
headers = {"x-nxopen-api-key": self.api_key}
self._client = httpx.AsyncClient(base_url=BASE_URL, headers=headers)
return self
async def __aexit__(self, exc_type, exc, tb) -> None:
if self._client:
await self._client.aclose()
async def get(self, path: str, params: dict[str, Any]) -> dict[str, Any]:
return await self._request_json("GET", path, params=params)
async def _request_json(self, method: str, path: str, params: dict[str, Any]) -> dict[str, Any]:
attempt = 0
while True:
attempt += 1
try:
await self._rate_limiter.acquire()
async with self._semaphore:
assert self._client is not None
response = await self._client.request(
method,
path,
params=params,
timeout=self.timeout_seconds,
)
self.metrics.total_requests += 1
if 200 <= response.status_code < 300:
return response.json()
payload = _safe_json(response)
message = _extract_message(payload)
if response.status_code == 400 and _is_data_preparing(payload):
self.metrics.data_preparing_hits += 1
raise DataPreparingError(response.status_code, message, payload)
if response.status_code == 429:
self.metrics.rate_limit_hits += 1
retry_after = _retry_after_seconds(response)
raise RateLimitError(response.status_code, message, payload, retry_after=retry_after)
if response.status_code >= 500:
self.metrics.server_errors += 1
raise ServerError(response.status_code, message, payload)
self.metrics.other_errors += 1
raise ApiError(response.status_code, message, payload)
except (httpx.TimeoutException, httpx.TransportError) as exc:
self.metrics.other_errors += 1
error = TransportError(str(exc))
except (RateLimitError, ServerError, DataPreparingError) as exc:
error = exc
except ApiError:
raise
if attempt >= self.max_attempts:
raise error
await asyncio.sleep(_compute_wait_seconds(error, attempt))
def _safe_json(response: httpx.Response) -> Any:
try:
return response.json()
except json.JSONDecodeError:
return {"message": response.text}
def _extract_message(payload: Any) -> str:
if isinstance(payload, dict):
if isinstance(payload.get("error"), dict):
return payload["error"].get("message") or "API error"
return payload.get("message") or "API error"
return "API error"
def _extract_code(payload: Any) -> Optional[str]:
if isinstance(payload, dict):
if isinstance(payload.get("error"), dict):
return payload["error"].get("code") or payload["error"].get("error_code")
return payload.get("code") or payload.get("error_code")
return None
def _is_data_preparing(payload: Any) -> bool:
code = _extract_code(payload)
message = _extract_message(payload)
if code and code.upper() == "OPENAPI00009":
return True
if message and "Data being prepared" in message:
return True
return False
def _retry_after_seconds(response: httpx.Response) -> Optional[float]:
value = response.headers.get("Retry-After")
if not value:
return None
try:
return float(value)
except ValueError:
return None
def _compute_wait_seconds(error: Exception, attempt: int) -> float:
if isinstance(error, DataPreparingError):
return random.uniform(30, 120)
base = 1.0
max_wait = 30.0
wait = min(max_wait, base * (2 ** (attempt - 1)))
jitter = random.uniform(0, 0.5)
if isinstance(error, RateLimitError) and error.retry_after:
return max(wait + jitter, error.retry_after)
return wait + jitter
|