File size: 3,474 Bytes
feb939c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
0a3d38f
feb939c
0a3d38f
feb939c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# 用于刷新idToken的定时任务
import asyncio
import os
import time
from typing import Optional
from datetime import datetime
from core.utils import sign_in_with_idp, handle_firebase_response, refresh_token_via_rest


class TokenManager:
    """Cache an idToken and refresh it on demand or on a fixed schedule.

    The token is obtained either through ``sign_in_with_idp`` +
    ``handle_firebase_response`` or, when the ``REFRESH_TOKEN`` environment
    variable is set, through ``refresh_token_via_rest``. Every successful
    refresh is also published to the ``TOKEN`` environment variable.
    """

    def __init__(self):
        self.id_token: Optional[str] = None
        self.last_refresh_time: Optional[float] = None
        self.refresh_interval = 30 * 60  # 30 minutes, in seconds
        # Delay before retrying after a failed refresh, in seconds.
        self.retry_interval = 60
        self.is_running = False

    async def get_token(self) -> Optional[str]:
        """Return the current idToken, refreshing it first if missing or stale.

        Returns:
            The cached token. If the refresh attempt fails this is the
            previous (possibly stale) token, or ``None`` when no token has
            ever been obtained.
        """
        if not self.id_token or self._should_refresh():
            await self.refresh_token()
        return self.id_token

    def _should_refresh(self) -> bool:
        """Return True when no refresh has succeeded yet or the interval elapsed."""
        if self.last_refresh_time is None:
            return True
        return time.time() - self.last_refresh_time >= self.refresh_interval

    async def refresh_token(self) -> bool:
        """Fetch a new idToken and store it on the instance and in ``TOKEN``.

        Never raises for ordinary errors: failures are printed and reported
        through the return value so the periodic loop can keep running.

        Returns:
            True if a new token was stored, False otherwise.
        """
        try:
            # Read the variable once so the branch taken and the value used agree.
            stored_refresh_token = os.getenv("REFRESH_TOKEN", "")
            if stored_refresh_token in ("", "None"):
                response = await sign_in_with_idp()
                # The idToken is effectively the bearer token.
                result = await handle_firebase_response(response)
            else:
                result = await refresh_token_via_rest(stored_refresh_token)

            # os.environ only accepts str values, so anything else (including
            # None) cannot be a usable token. Check before touching any state.
            if not isinstance(result, str) or not result:
                print("Failed to refresh token: no usable token returned")
                return False

            # Publish to the environment first; if that raises, the cached
            # token and timestamp are left untouched.
            os.environ["TOKEN"] = result
            self.id_token = result
            self.last_refresh_time = time.time()
            print(f"Token refreshed at {datetime.now()}")
            return True
        except Exception as e:
            print(f"Error refreshing token: {str(e)}")
            return False

    async def start_auto_refresh(self):
        """Run the periodic refresh loop until ``stop_auto_refresh`` is called.

        A successful refresh is followed by a ``refresh_interval`` sleep; a
        failed one is retried after the shorter ``retry_interval``. Calling
        this while a loop is already running returns immediately.
        """
        if self.is_running:
            return

        self.is_running = True
        try:
            while self.is_running:
                try:
                    refreshed = await self.refresh_token()
                except Exception as e:
                    print(f"Auto refresh error: {str(e)}")
                    refreshed = False
                # Only an explicit False counts as failure, so an override of
                # refresh_token that returns None keeps the normal interval.
                if refreshed is False:
                    # Wait briefly before retrying after a failure.
                    await asyncio.sleep(self.retry_interval)
                else:
                    # Wait until the next scheduled refresh.
                    await asyncio.sleep(self.refresh_interval)
        finally:
            # Reset the flag even when the task is cancelled mid-sleep, so the
            # loop can be started again later.
            self.is_running = False

    async def stop_auto_refresh(self):
        """Ask the refresh loop to stop.

        The loop checks the flag only between iterations, so it exits once
        its current sleep finishes.
        """
        self.is_running = False

# 使用示例
# async def main():
#     # 创建 TokenManager 实例
#     token_manager = TokenManager()
#
#     try:
#         # 启动自动刷新任务
#         refresh_task = asyncio.create_task(token_manager.start_auto_refresh())
#
#         # 模拟应用运行
#         while True:
#             # 获取当前 token
#             token = await token_manager.get_token()
#             print(f"Current token: {token[:20]}...")
#
#             # 等待一段时间再次获取
#             await asyncio.sleep(300)  # 每5分钟打印一次当前token
#
#     except KeyboardInterrupt:
#         # 处理 Ctrl+C
#         await token_manager.stop_auto_refresh()
#         await refresh_task
#
# # 运行示例
# if __name__ == "__main__":
#     asyncio.run(main())