Spaces:
Runtime error
Runtime error
File size: 2,082 Bytes
6bff6a1 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 | from fastapi import APIRouter, Depends
from pydantic import BaseModel
from app.core.auth import verify_function_key
from app.core.exceptions import AppException
from app.services.grok.services.voice import VoiceService
from app.services.token.manager import get_token_manager
router = APIRouter()
class VoiceTokenResponse(BaseModel):
token: str
url: str
participant_name: str = ""
room_name: str = ""
@router.get(
"/voice/token",
dependencies=[Depends(verify_function_key)],
response_model=VoiceTokenResponse,
)
async def function_voice_token(
voice: str = "ara",
personality: str = "assistant",
speed: float = 1.0,
):
"""获取 Grok Voice Mode (LiveKit) Token"""
token_mgr = await get_token_manager()
sso_token = None
for pool_name in ("ssoBasic", "ssoSuper"):
sso_token = token_mgr.get_token(pool_name)
if sso_token:
break
if not sso_token:
raise AppException(
"No available tokens for voice mode",
code="no_token",
status_code=503,
)
service = VoiceService()
try:
data = await service.get_token(
token=sso_token,
voice=voice,
personality=personality,
speed=speed,
)
token = data.get("token")
if not token:
raise AppException(
"Upstream returned no voice token",
code="upstream_error",
status_code=502,
)
return VoiceTokenResponse(
token=token,
url="wss://livekit.grok.com",
participant_name="",
room_name="",
)
except Exception as e:
if isinstance(e, AppException):
raise
raise AppException(
f"Voice token error: {str(e)}",
code="voice_error",
status_code=500,
)
@router.get("/verify", dependencies=[Depends(verify_function_key)])
async def function_verify_api():
"""验证 Function Key"""
return {"status": "success"}
|