Spaces:
Running
Running
| """JWT管理模块 | |
| 负责JWT token的生成、刷新和管理 | |
| """ | |
| import asyncio | |
| import base64 | |
| import hashlib | |
| import hmac | |
| import json | |
| import logging | |
| import time | |
| from typing import TYPE_CHECKING | |
| import httpx | |
| from fastapi import HTTPException | |
| if TYPE_CHECKING: | |
| from main import AccountConfig | |
| logger = logging.getLogger(__name__) | |
def urlsafe_b64encode(data: bytes) -> str:
    """Return *data* as URL-safe base64 text with trailing ``=`` padding removed."""
    padded = base64.urlsafe_b64encode(data)
    # The base64 alphabet is pure ASCII, so stripping before or after decoding is equivalent.
    return padded.rstrip(b"=").decode()
def kq_encode(s: str) -> str:
    """Pack *s* into bytes per UTF-16 code unit and return unpadded URL-safe base64.

    A code unit of 255 or less contributes a single byte; a larger unit
    contributes two bytes, low byte first. Code points above U+FFFF are
    split into their surrogate pair and each half is packed the same way.
    (Previously such characters raised ``ValueError`` because the high part
    did not fit in a byte.) Output for every other input is unchanged.

    NOTE(review): the packing looks like a port of a JavaScript
    ``charCodeAt``-based encoder, which is why code units rather than code
    points are used for astral characters -- confirm against the upstream
    implementation.

    Args:
        s: Text to encode.

    Returns:
        URL-safe base64 of the packed bytes, without ``=`` padding.
    """
    # "surrogatepass" keeps lone surrogates encodable, matching the old
    # per-character behaviour for strings that already contain them.
    code_units = s.encode("utf-16-le", "surrogatepass")
    packed = bytearray()
    for low, high in zip(code_units[::2], code_units[1::2]):
        packed.append(low)
        if high:
            # Only units above 255 carry a second (high) byte.
            packed.append(high)
    return urlsafe_b64encode(bytes(packed))
def create_jwt(key_bytes: bytes, key_id: str, csesidx: str) -> str:
    """Build a compact HS256-signed JWT for the given session index.

    The token is issued at the current time, becomes valid immediately
    (``nbf`` equals ``iat``) and expires 300 seconds later.

    Args:
        key_bytes: HMAC-SHA256 signing key.
        key_id: Value placed in the header's ``kid`` field.
        csesidx: Session index; embedded in ``sub`` as ``csesidx/<value>``.

    Returns:
        ``<header>.<payload>.<signature>`` with each segment base64url-encoded
        without padding.
    """
    issued_at = int(time.time())
    jose_header = {"alg": "HS256", "typ": "JWT", "kid": key_id}
    claims = {
        "iss": "https://business.gemini.google",
        "aud": "https://biz-discoveryengine.googleapis.com",
        "sub": f"csesidx/{csesidx}",
        "iat": issued_at,
        "exp": issued_at + 300,
        "nbf": issued_at,
    }

    # Compact separators keep the serialized JSON free of whitespace.
    segments = [
        kq_encode(json.dumps(part, separators=(",", ":")))
        for part in (jose_header, claims)
    ]
    signing_input = ".".join(segments)

    mac = hmac.new(key_bytes, signing_input.encode(), hashlib.sha256)
    signature = urlsafe_b64encode(mac.digest())
    return ".".join((signing_input, signature))
class JWTManager:
    """JWT token manager.

    Fetches, refreshes and caches the JWT for one account configuration.
    """

    def __init__(self, config: "AccountConfig", http_client: httpx.AsyncClient, user_agent: str) -> None:
        """Store the collaborators and start with an empty, already-expired token."""
        self.config = config
        self.http_client = http_client
        self.user_agent = user_agent
        self.jwt: str = ""
        # An expiry of 0 lies in the past, so the first get() refreshes.
        self.expires: float = 0
        # Serializes get() callers so only one refresh runs at a time.
        self._lock = asyncio.Lock()

    async def get(self, request_id: str = "") -> str:
        """Return the cached JWT, refreshing it first when it has expired.

        Args:
            request_id: Optional id that is included in refresh log lines.
        """
        async with self._lock:
            still_valid = time.time() <= self.expires
            if not still_valid:
                await self._refresh(request_id)
            return self.jwt

    def _cookie_header(self) -> str:
        """Build the Cookie header value from the account's session cookies."""
        parts = [f"__Secure-C_SES={self.config.secure_c_ses}"]
        # The host-scoped cookie is only sent when it is configured.
        if self.config.host_c_oses:
            parts.append(f"__Host-C_OSES={self.config.host_c_oses}")
        return "; ".join(parts)

    async def _refresh(self, request_id: str = "") -> None:
        """Fetch a fresh signing key and mint a new JWT.

        Calls the ``getoxsrf`` endpoint, decodes the returned ``xsrfToken``
        into key bytes, signs a new token with ``create_jwt`` and caches it
        for 270 seconds.

        Args:
            request_id: Optional id that is included in log lines.

        Raises:
            HTTPException: With the upstream status code when the endpoint
                does not answer 200.
        """
        cookie_header = self._cookie_header()
        if request_id:
            req_tag = f"[req_{request_id}] "
        else:
            req_tag = ""

        request_headers = {
            "cookie": cookie_header,
            "user-agent": self.user_agent,
            "referer": "https://business.gemini.google/"
        }
        response = await self.http_client.get(
            "https://business.gemini.google/auth/getoxsrf",
            params={"csesidx": self.config.csesidx},
            headers=request_headers,
        )

        status = response.status_code
        if status != 200:
            logger.error(f"[AUTH] [{self.config.account_id}] {req_tag}JWT 刷新失败: {status}")
            raise HTTPException(status, "getoxsrf failed")

        # Drop the leading ")]}'" marker, when present, before parsing the JSON.
        guard = ")]}'"
        body = response.text
        if body.startswith(guard):
            body = body[len(guard):]
        parsed = json.loads(body)

        # Appending "==" lets the decoder accept a token that arrives without padding.
        signing_key = base64.urlsafe_b64decode(parsed["xsrfToken"] + "==")
        self.jwt = create_jwt(signing_key, parsed["keyId"], self.config.csesidx)
        # Cached for 270 s, i.e. shorter than the 300 s "exp" that create_jwt sets.
        self.expires = time.time() + 270
        logger.info(f"[AUTH] [{self.config.account_id}] {req_tag}JWT 刷新成功")