Spaces:
Sleeping
Sleeping
| import httpx | |
| from loguru import logger | |
| from core.conf import settings | |
| from typing import Optional, Dict, Any | |
| from core.token_manager import get_access_token | |
| API_BASE_URL = settings.API_BASE_URL | |
| # API_BASE_URL = "https://api.futabus.vn" | |
| class API: | |
| async def get( | |
| endpoint: str, | |
| params: Optional[Dict[str, Any]] = None, | |
| headers: Optional[Dict[str, str]] = None, | |
| api_base: str = None, | |
| ): | |
| if api_base: | |
| url = f"{api_base}{endpoint}" | |
| else: | |
| url = f"{API_BASE_URL}{endpoint}" | |
| access_token = await get_access_token() | |
| headers = headers or {} | |
| headers.setdefault("Content-Type", "application/json") | |
| headers["Authorization"] = f"Bearer {access_token}" | |
| async with httpx.AsyncClient() as client: | |
| try: | |
| response = await client.get(url, headers=headers, params=params) | |
| response.raise_for_status() | |
| return response.json() | |
| # except httpx.HTTPStatusError as http_err: | |
| # return {"error": f"HTTP {http_err.response.status_code}: {http_err.response.text}"} | |
| except httpx.HTTPStatusError as http_err: | |
| ## RETRY FOR 403 ERROR | |
| if ( | |
| http_err.response.status_code == 403 | |
| or http_err.response.status_code == 401 | |
| ): | |
| headers.pop("Authorization", None) | |
| if http_err.response.status_code == 403: | |
| headers["x-access-token"] = access_token | |
| headers["token_type"] = "anonymous" | |
| try: | |
| response = await client.get(url, headers=headers, params=params) | |
| response.raise_for_status() | |
| return response.json() | |
| except httpx.HTTPStatusError as second_err: | |
| return { | |
| "error": f"[403 RETRY] HTTP {second_err.response.status_code}: {second_err.response.text}" | |
| } | |
| except Exception as second_err: | |
| return { | |
| "error": f"[403 RETRY] Request failed: {str(second_err)}" | |
| } | |
| else: | |
| return { | |
| "error": f"HTTP {http_err.response.status_code}: {http_err.response.text}" | |
| } | |
| except Exception as err: | |
| return {"error": f"Request failed: {str(err)}"} | |
| async def post( | |
| endpoint: str, | |
| payload: Dict[str, Any] = None, | |
| headers: Optional[Dict[str, str]] = None, | |
| api_base: str = None, | |
| ): | |
| url = "" | |
| if api_base: | |
| url = f"{api_base}{endpoint}" | |
| else: | |
| url = f"{API_BASE_URL}{endpoint}" | |
| access_token = await get_access_token() | |
| headers = headers or {} | |
| headers.setdefault("Content-Type", "application/json") | |
| headers["Authorization"] = f"Bearer {access_token}" | |
| async with httpx.AsyncClient() as client: | |
| try: | |
| response = await client.post(url, json=payload, headers=headers) | |
| response.raise_for_status() | |
| return response.json() | |
| except httpx.HTTPStatusError as http_err: | |
| ## RETRY FOR 403 ERROR | |
| if ( | |
| http_err.response.status_code == 403 | |
| or http_err.response.status_code == 401 | |
| ): | |
| headers.pop("Authorization", None) | |
| headers["x-access-token"] = access_token | |
| headers["token_type"] = "anonymous" | |
| try: | |
| response = await client.post(url, json=payload, headers=headers) | |
| response.raise_for_status() | |
| return response.json() | |
| except httpx.HTTPStatusError as second_err: | |
| return { | |
| "error": f"[403 RETRY] HTTP {second_err.response.status_code}: {second_err.response.text}" | |
| } | |
| except Exception as second_err: | |
| return { | |
| "error": f"[403 RETRY] Request failed: {str(second_err)}" | |
| } | |
| else: | |
| return { | |
| "error": f"HTTP {http_err.response.status_code}: {http_err.response.text}" | |
| } | |
| except Exception as err: | |
| return {"error": f"Request failed: {str(err)}"} | |
| async def put_api( | |
| endpoint: str, payload: Dict[str, Any], headers: Optional[Dict[str, str]] = None | |
| ): | |
| url = f"{API_BASE_URL}{endpoint}" | |
| access_token = await get_access_token() | |
| headers = headers or {} | |
| headers.setdefault("Content-Type", "application/json") | |
| headers["Authorization"] = f"Bearer {access_token}" | |
| async with httpx.AsyncClient() as client: | |
| try: | |
| response = await client.put(url, json=payload, headers=headers) | |
| response.raise_for_status() | |
| return response.json() | |
| except httpx.HTTPStatusError as http_err: | |
| return { | |
| "error": f"HTTP {http_err.response.status_code}: {http_err.response.text}" | |
| } | |
| except Exception as err: | |
| return {"error": f"Request failed: {str(err)}"} | |
| async def delete_api( | |
| endpoint: str, | |
| params: Optional[Dict[str, Any]] = None, | |
| headers: Optional[Dict[str, str]] = None, | |
| ): | |
| url = f"{API_BASE_URL}{endpoint}" | |
| access_token = await get_access_token() | |
| headers = headers or {} | |
| headers.setdefault("Content-Type", "application/json") | |
| headers["Authorization"] = f"Bearer {access_token}" | |
| async with httpx.AsyncClient() as client: | |
| try: | |
| response = await client.delete(url, headers=headers, params=params) | |
| response.raise_for_status() | |
| return response.json() | |
| except httpx.HTTPStatusError as http_err: | |
| return { | |
| "error": f"HTTP {http_err.response.status_code}: {http_err.response.text}" | |
| } | |
| except Exception as err: | |
| return {"error": f"Request failed: {str(err)}"} | |
| api: API = API() | |