Spaces:
Sleeping
Sleeping
| import os | |
| import time | |
| import httpx | |
| from core.conf import settings | |
# In-process cache for the most recently fetched access token.
#   "access_token": the cached token value; None until a token has been stored.
#   "expires_at":   Unix timestamp in seconds; the cached token is treated as
#                   stale once the current time reaches this value.
_token_cache = {"access_token": None, "expires_at": 0}

# URL that is queried for a fresh access token, taken from the project settings.
API_BASE_URL_ACCESS_TOKEN = settings.API_BASE_URL_ACCESS_TOKEN
async def get_access_token() -> str:
    """Return the cached access token, refreshing it first when necessary.

    The cached value is reused only while it is set and the current Unix time
    is still below the stored ``expires_at`` timestamp. Otherwise
    ``refresh_access_token`` is awaited and whatever it stored in the cache
    is returned.
    """
    cached_token = _token_cache["access_token"]
    # Fast path: a token is present and has not reached its expiry timestamp.
    if cached_token is not None and int(time.time()) < _token_cache["expires_at"]:
        return cached_token

    await refresh_access_token()
    # Re-read the cache: the refresh call replaces the stored token.
    return _token_cache["access_token"]
async def refresh_access_token():
    """Fetch a new access token and store it in the module-level cache.

    Sends a GET request to ``API_BASE_URL_ACCESS_TOKEN``, reads the token from
    the ``"data"`` field of the JSON response body, and writes the token and
    its expiry timestamp into ``_token_cache``.

    Raises:
        httpx.HTTPStatusError: If ``response.raise_for_status()`` rejects the
            response status.
        ValueError: If the JSON body is not an object or its ``"data"`` field
            is missing or empty. The cache is left unchanged in that case, so
            an unusable token is never stored or handed to callers.
    """
    # The lifetime is fixed here; it is not read from the response.
    # NOTE(review): assumes issued tokens stay valid for one hour - confirm
    # against the token endpoint.
    expires_in = 3600
    # The cache entry is expired this many seconds before the assumed
    # lifetime ends, presumably so a nearly-expired token is not returned.
    safety_margin = 60

    async with httpx.AsyncClient() as client:
        response = await client.get(url=API_BASE_URL_ACCESS_TOKEN)
        response.raise_for_status()
        data = response.json()

    access_token = data.get("data") if isinstance(data, dict) else None
    if not access_token:
        # Fail loudly instead of caching None with a fresh expiry time.
        raise ValueError(
            "Access token response did not contain a usable 'data' field"
        )

    _token_cache["access_token"] = access_token
    _token_cache["expires_at"] = int(time.time()) + expires_in - safety_margin