Spaces:
Sleeping
Sleeping
| import os | |
| import time | |
| import httpx | |
| from loguru import logger | |
| from core.conf import settings | |
| _token_cache = { | |
| "access_token": None, | |
| "expires_at": 0, | |
| } | |
| API_BASE_URL_ACCESS_TOKEN = settings.API_BASE_URL_ACCESS_TOKEN | |
| # API_BASE_URL_ACCESS_TOKEN = "https://api.futabus.vn/identity/api/token/anonymous-token" | |
| async def get_access_token() -> str: | |
| current_time = int(time.time()) | |
| if ( | |
| _token_cache["access_token"] is None | |
| or current_time >= _token_cache["expires_at"] | |
| ): | |
| await refresh_access_token() | |
| return _token_cache["access_token"] | |
| async def refresh_access_token(): | |
| async with httpx.AsyncClient() as client: | |
| response = await client.get(url=API_BASE_URL_ACCESS_TOKEN) | |
| response.raise_for_status() | |
| data = response.json() | |
| access_token = data.get("data", "") | |
| if not access_token: | |
| logger.error("Access token is empty or not found in the response") | |
| raise ValueError("Access token is empty or not found in the response") | |
| expires_in = 3600 | |
| _token_cache["access_token"] = access_token | |
| _token_cache["expires_at"] = int(time.time()) + expires_in - 60 | |