import httpx from loguru import logger from core.conf import settings from typing import Optional, Dict, Any from core.token_manager import get_access_token API_BASE_URL = settings.API_BASE_URL # API_BASE_URL = "https://api.futabus.vn" class API: @staticmethod async def get( endpoint: str, params: Optional[Dict[str, Any]] = None, headers: Optional[Dict[str, str]] = None, api_base: str = None, ): if api_base: url = f"{api_base}{endpoint}" else: url = f"{API_BASE_URL}{endpoint}" access_token = await get_access_token() headers = headers or {} headers.setdefault("Content-Type", "application/json") headers["Authorization"] = f"Bearer {access_token}" async with httpx.AsyncClient() as client: try: response = await client.get(url, headers=headers, params=params) response.raise_for_status() return response.json() # except httpx.HTTPStatusError as http_err: # return {"error": f"HTTP {http_err.response.status_code}: {http_err.response.text}"} except httpx.HTTPStatusError as http_err: ## RETRY FOR 403 ERROR if ( http_err.response.status_code == 403 or http_err.response.status_code == 401 ): headers.pop("Authorization", None) if http_err.response.status_code == 403: headers["x-access-token"] = access_token headers["token_type"] = "anonymous" try: response = await client.get(url, headers=headers, params=params) response.raise_for_status() return response.json() except httpx.HTTPStatusError as second_err: return { "error": f"[403 RETRY] HTTP {second_err.response.status_code}: {second_err.response.text}" } except Exception as second_err: return { "error": f"[403 RETRY] Request failed: {str(second_err)}" } else: return { "error": f"HTTP {http_err.response.status_code}: {http_err.response.text}" } except Exception as err: return {"error": f"Request failed: {str(err)}"} @staticmethod async def post( endpoint: str, payload: Dict[str, Any] = None, headers: Optional[Dict[str, str]] = None, api_base: str = None, ): url = "" if api_base: url = f"{api_base}{endpoint}" else: url = f"{API_BASE_URL}{endpoint}" access_token = await get_access_token() headers = headers or {} headers.setdefault("Content-Type", "application/json") headers["Authorization"] = f"Bearer {access_token}" async with httpx.AsyncClient() as client: try: response = await client.post(url, json=payload, headers=headers) response.raise_for_status() return response.json() except httpx.HTTPStatusError as http_err: ## RETRY FOR 403 ERROR if ( http_err.response.status_code == 403 or http_err.response.status_code == 401 ): headers.pop("Authorization", None) headers["x-access-token"] = access_token headers["token_type"] = "anonymous" try: response = await client.post(url, json=payload, headers=headers) response.raise_for_status() return response.json() except httpx.HTTPStatusError as second_err: return { "error": f"[403 RETRY] HTTP {second_err.response.status_code}: {second_err.response.text}" } except Exception as second_err: return { "error": f"[403 RETRY] Request failed: {str(second_err)}" } else: return { "error": f"HTTP {http_err.response.status_code}: {http_err.response.text}" } except Exception as err: return {"error": f"Request failed: {str(err)}"} @staticmethod async def put_api( endpoint: str, payload: Dict[str, Any], headers: Optional[Dict[str, str]] = None ): url = f"{API_BASE_URL}{endpoint}" access_token = await get_access_token() headers = headers or {} headers.setdefault("Content-Type", "application/json") headers["Authorization"] = f"Bearer {access_token}" async with httpx.AsyncClient() as client: try: response = await client.put(url, json=payload, headers=headers) response.raise_for_status() return response.json() except httpx.HTTPStatusError as http_err: return { "error": f"HTTP {http_err.response.status_code}: {http_err.response.text}" } except Exception as err: return {"error": f"Request failed: {str(err)}"} @staticmethod async def delete_api( endpoint: str, params: Optional[Dict[str, Any]] = None, headers: Optional[Dict[str, str]] = None, ): url = f"{API_BASE_URL}{endpoint}" access_token = await get_access_token() headers = headers or {} headers.setdefault("Content-Type", "application/json") headers["Authorization"] = f"Bearer {access_token}" async with httpx.AsyncClient() as client: try: response = await client.delete(url, headers=headers, params=params) response.raise_for_status() return response.json() except httpx.HTTPStatusError as http_err: return { "error": f"HTTP {http_err.response.status_code}: {http_err.response.text}" } except Exception as err: return {"error": f"Request failed: {str(err)}"} api: API = API()