Spaces:
Sleeping
Sleeping
| # Periodic task that keeps the idToken refreshed | |
| import asyncio | |
| import os | |
| import time | |
| from typing import Optional | |
| from datetime import datetime | |
| from core.utils import sign_in_with_idp, handle_firebase_response, refresh_token_via_rest | |
class TokenManager:
    """Cache an idToken and refresh it on demand or on a fixed schedule.

    A new token is obtained either from ``refresh_token_via_rest`` (when the
    ``REFRESH_TOKEN`` environment variable is set) or from
    ``sign_in_with_idp`` followed by ``handle_firebase_response``.  Every
    successfully obtained token is also published through the ``TOKEN``
    environment variable.
    """

    def __init__(self):
        # Most recently obtained idToken; None until the first success.
        self.id_token: Optional[str] = None
        # time.time() timestamp of the last successful refresh.
        self.last_refresh_time: Optional[float] = None
        # Seconds a token is considered fresh (30 minutes).
        self.refresh_interval = 30 * 60
        # Seconds the background loop waits before retrying after a failure.
        self.retry_interval = 60
        # True while the start_auto_refresh() loop is active.
        self.is_running = False

    async def get_token(self) -> Optional[str]:
        """Return the cached idToken, refreshing it first if missing or stale.

        Returns:
            The current token.  This is ``None`` when no token has ever been
            obtained and the refresh attempt failed; after a failed refresh
            of a stale token the previous token is returned.
        """
        if not self.id_token or self._should_refresh():
            await self.refresh_token()
        return self.id_token

    def _should_refresh(self) -> bool:
        """Return True when no refresh has succeeded yet or the token is stale."""
        if self.last_refresh_time is None:
            return True
        return time.time() - self.last_refresh_time >= self.refresh_interval

    async def refresh_token(self) -> bool:
        """Obtain a new idToken and store it on success.

        On success the token is written to ``self.id_token`` and to the
        ``TOKEN`` environment variable, and ``last_refresh_time`` is updated.
        Failures are reported with ``print`` and never raised, so callers can
        keep using the previously cached token.

        Returns:
            True if a new token was stored, False otherwise.
        """
        try:
            refresh_token = os.getenv("REFRESH_TOKEN", "")
            # "None" is treated like an unset variable (it is what a
            # stringified None would look like in the environment).
            if refresh_token in ("", "None"):
                response = await sign_in_with_idp()
                # Original author's note: the idToken is the bearer token.
                result = await handle_firebase_response(response)
            else:
                result = await refresh_token_via_rest(refresh_token)
        except Exception as e:
            print(f"Error refreshing token: {str(e)}")
            return False

        # os.environ only accepts str values, so validate before touching any
        # state; this also covers the helpers returning None on failure.
        if not isinstance(result, str):
            print(f"Failed to refresh token: unexpected result {result!r}")
            return False

        self.id_token = result
        # Publish the token for code that reads it from the environment.
        os.environ["TOKEN"] = result
        self.last_refresh_time = time.time()
        print(f"Token refreshed at {datetime.now()}")
        return True

    async def start_auto_refresh(self):
        """Run the periodic refresh loop until stop_auto_refresh() is called.

        Does nothing if the loop is already running.  After a successful
        refresh the loop sleeps ``refresh_interval`` seconds; after a failed
        one it retries after ``retry_interval`` seconds.
        """
        if self.is_running:
            return
        self.is_running = True
        try:
            while self.is_running:
                try:
                    refreshed = await self.refresh_token()
                except Exception as e:
                    # refresh_token() reports its own failures; this guards
                    # against anything unexpected so the loop keeps running.
                    print(f"Auto refresh error: {str(e)}")
                    refreshed = False
                # Wait until the next scheduled refresh, or retry soon after
                # a failure.
                if refreshed:
                    await asyncio.sleep(self.refresh_interval)
                else:
                    await asyncio.sleep(self.retry_interval)
        finally:
            # Also reached when the task is cancelled, so the loop can be
            # started again later.
            self.is_running = False

    async def stop_auto_refresh(self):
        """Ask the refresh loop to stop.

        Only clears the running flag; the loop notices it the next time it
        wakes from its sleep.
        """
        self.is_running = False
| # Usage example | |
| # async def main(): | |
| # # Create a TokenManager instance | |
| # token_manager = TokenManager() | |
| # | |
| # try: | |
| # # Start the auto-refresh task | |
| # refresh_task = asyncio.create_task(token_manager.start_auto_refresh()) | |
| # | |
| # # Simulate the application running | |
| # while True: | |
| # # Get the current token | |
| # token = await token_manager.get_token() | |
| # print(f"Current token: {token[:20]}...") | |
| # | |
| # # Wait a while before fetching it again | |
| # await asyncio.sleep(300) # print the current token every 5 minutes | |
| # | |
| # except KeyboardInterrupt: | |
| # # Handle Ctrl+C | |
| # await token_manager.stop_auto_refresh() | |
| # await refresh_task | |
| # | |
| # # Run the example | |
| # if __name__ == "__main__": | |
| # asyncio.run(main()) | |