grok2api-hf / app /api /v1 /function /voice.py
Codex
Add root Dockerfile for HF Space build
6bff6a1
from fastapi import APIRouter, Depends
from pydantic import BaseModel
from app.core.auth import verify_function_key
from app.core.exceptions import AppException
from app.services.grok.services.voice import VoiceService
from app.services.token.manager import get_token_manager
router = APIRouter()
class VoiceTokenResponse(BaseModel):
token: str
url: str
participant_name: str = ""
room_name: str = ""
@router.get(
"/voice/token",
dependencies=[Depends(verify_function_key)],
response_model=VoiceTokenResponse,
)
async def function_voice_token(
voice: str = "ara",
personality: str = "assistant",
speed: float = 1.0,
):
"""获取 Grok Voice Mode (LiveKit) Token"""
token_mgr = await get_token_manager()
sso_token = None
for pool_name in ("ssoBasic", "ssoSuper"):
sso_token = token_mgr.get_token(pool_name)
if sso_token:
break
if not sso_token:
raise AppException(
"No available tokens for voice mode",
code="no_token",
status_code=503,
)
service = VoiceService()
try:
data = await service.get_token(
token=sso_token,
voice=voice,
personality=personality,
speed=speed,
)
token = data.get("token")
if not token:
raise AppException(
"Upstream returned no voice token",
code="upstream_error",
status_code=502,
)
return VoiceTokenResponse(
token=token,
url="wss://livekit.grok.com",
participant_name="",
room_name="",
)
except Exception as e:
if isinstance(e, AppException):
raise
raise AppException(
f"Voice token error: {str(e)}",
code="voice_error",
status_code=500,
)
@router.get("/verify", dependencies=[Depends(verify_function_key)])
async def function_verify_api():
"""验证 Function Key"""
return {"status": "success"}