neuapi / api /src /routers /openai_compatible.py
grimshaw's picture
Upload folder using huggingface_hub
35bb6f4 verified
Raw
History Blame Contribute Delete
4.69 kB
from __future__ import annotations
import time
from fastapi import APIRouter, HTTPException
from fastapi.responses import Response, StreamingResponse
from loguru import logger
from api.src.core.model_config import get_backbone_info
from api.src.inference.model_manager import ModelManager
from api.src.inference.voice_manager import VoiceManager
from api.src.services.streaming_audio_writer import get_content_type
from api.src.services.tts_service import TTSService
from api.src.structures.schemas import (
ModelDetailResponse,
ModelInfo,
ModelListResponse,
OpenAISpeechRequest,
VoiceInfo,
VoiceListResponse,
make_error,
)
router = APIRouter(prefix="/v1", tags=["OpenAI Compatible"])
@router.post("/audio/speech")
async def create_speech(request: OpenAISpeechRequest) -> Response:
"""OpenAI-compatible TTS endpoint."""
model_manager = ModelManager.get_instance()
voice_manager = VoiceManager.get_instance()
tts_service = TTSService.get_instance()
# Validate model
if not model_manager.is_loaded(request.model):
raise HTTPException(status_code=400, detail=make_error(
f"Model '{request.model}' is not loaded. "
f"Available: {list(model_manager.loaded_models.keys())}"
))
# Validate voice
if not voice_manager.voice_exists(request.voice):
raise HTTPException(status_code=400, detail=make_error(
f"Voice '{request.voice}' not found. "
f"Available: {list(voice_manager.voices.keys())}"
))
content_type = get_content_type(request.response_format)
try:
if request.stream:
return StreamingResponse(
tts_service.stream_speech(request),
media_type=content_type,
headers={
"Content-Type": content_type,
"Transfer-Encoding": "chunked",
},
)
else:
start = time.perf_counter()
audio_data = await tts_service.generate_speech(request)
elapsed = time.perf_counter() - start
logger.info(
f"TTS: model={request.model} voice={request.voice} "
f"format={request.response_format} chars={len(request.input)} "
f"time={elapsed:.2f}s size={len(audio_data)} bytes"
)
return Response(
content=audio_data,
media_type=content_type,
headers={"Content-Type": content_type},
)
except Exception as e:
logger.error(f"TTS generation failed: {e}")
raise HTTPException(status_code=500, detail=make_error(str(e), "server_error", 500))
@router.get("/audio/voices", response_model=VoiceListResponse)
async def list_voices() -> VoiceListResponse:
"""List available voices."""
voice_manager = VoiceManager.get_instance()
voices = [
VoiceInfo(
voice_id=name,
name=name,
language=info["language"],
gender=info["gender"],
custom=info.get("custom", False),
available=info.get("available", True),
)
for name, info in voice_manager.voices.items()
]
return VoiceListResponse(voices=voices)
@router.get("/models", response_model=ModelListResponse)
async def list_models() -> ModelListResponse:
"""List loaded models (OpenAI-compatible)."""
model_manager = ModelManager.get_instance()
models = []
for model_id, loaded in model_manager.loaded_models.items():
info = get_backbone_info(model_id)
models.append(ModelInfo(
id=model_id,
language=info.language if info else None,
backend=info.backend.value if info else None,
supports_streaming=info.supports_streaming if info else False,
backbone_device=loaded.backbone_device,
codec_device=loaded.codec_device,
))
return ModelListResponse(data=models)
@router.get("/models/{model_id}", response_model=ModelDetailResponse)
async def get_model(model_id: str) -> ModelDetailResponse:
"""Get details about a specific model."""
model_manager = ModelManager.get_instance()
info = get_backbone_info(model_id)
if info is None:
raise HTTPException(status_code=404, detail=make_error(f"Model '{model_id}' not found"))
loaded = model_manager.loaded_models.get(model_id)
return ModelDetailResponse(
id=model_id,
language=info.language,
backend=info.backend.value,
supports_streaming=info.supports_streaming,
loaded=loaded is not None,
codec=loaded.codec_id if loaded else None,
)