GENIE / api_routes.py
Tom1986's picture
feat: Implement Genie TTS API with text-to-speech and voice cloning functionalities
9bc318e
"""
API路由定义模块
包含所有API端点的路由定义,保持代码简洁
"""
import os
import asyncio
import logging
from datetime import datetime
from typing import List
from fastapi import APIRouter, HTTPException, Query
from fastapi.responses import FileResponse
import tempfile
from .models import *
from .voice_cloning import VoiceCloner, AudioAnalyzer
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Create the routers: one APIRouter per feature area, each with its own URL prefix and tag.
basic_router = APIRouter(prefix="", tags=["基础功能"])
tts_router = APIRouter(prefix="/tts", tags=["文本转语音"])
voice_clone_router = APIRouter(prefix="/voice-clone", tags=["语音克隆"])
character_router = APIRouter(prefix="/characters", tags=["角色管理"])
# Global engine handles. They start as None and are assigned by set_engines(),
# which is meant to be called from the main application.
tts_engine = None
voice_cloner = None
def set_engines(tts_eng, voice_clone):
    """Register the engine instances used by the route handlers.

    Meant to be called by the main application. Both module-level globals
    are rebound on every call.

    Args:
        tts_eng: Object stored in the module-level ``tts_engine``.
        voice_clone: Object stored in the module-level ``voice_cloner``.
    """
    global tts_engine, voice_cloner
    tts_engine, voice_cloner = tts_eng, voice_clone
# ==================== 基础功能路由 ====================
@basic_router.get("/", response_model=dict)
async def root():
    """Root endpoint: return static metadata describing this API.

    Returns:
        dict: Service name, version, description, engine name, supported
        languages, feature list, and the paths of the docs and health
        endpoints.
    """
    api_info = dict(
        name="Genie TTS API",
        version="1.0.0",
        description="高质量日语文本转语音API服务",
        engine="GPT-SoVITS V2 (ONNX)",
        supported_languages=["ja"],
        features=["TTS合成", "语音克隆", "角色管理"],
        docs="/docs",
        health="/health",
    )
    return api_info
@basic_router.get("/health", response_model=HealthResponse)
async def health_check():
    """Health-check endpoint.

    Reports whether the TTS engine is usable, the predefined character
    names, and how many cloned voices are registered.

    Returns:
        HealthResponse: ``status`` is always "healthy"; ``engine_status`` is
        "ready" only when both ``tts_engine`` and ``tts_engine.genie`` are
        truthy, otherwise "unavailable".
    """
    if tts_engine and tts_engine.genie:
        engine_status = "ready"
    else:
        engine_status = "unavailable"

    # Character statistics; the config module is imported at call time.
    from config import AVAILABLE_CHARACTER_NAMES

    predefined_count = len(AVAILABLE_CHARACTER_NAMES)

    # Without a voice cloner there are no cloned voices to count.
    cloned_count = 0
    if voice_cloner:
        cloned_count = len(voice_cloner.cloned_voices)

    return HealthResponse(
        status="healthy",
        version="1.0.0",
        engine_status=engine_status,
        available_characters=AVAILABLE_CHARACTER_NAMES,
        predefined_characters=predefined_count,
        custom_characters=0,  # custom models are not supported yet
        cloned_voices=cloned_count,
        timestamp=datetime.now().isoformat(),
    )
# ==================== TTS路由 ====================
@tts_router.post("/synthesize", response_model=TTSResponse)
async def synthesize_text(request: TTSRequest):
    """Synthesize speech for ``request.text`` with ``request.character``.

    The blocking engine calls run in the event loop's default executor so
    the event loop stays responsive. Only ``text`` and ``character`` are
    forwarded to the engine here.

    Args:
        request: The TTS request body.

    Returns:
        TTSResponse: Success flag, message, ``/audio/<filename>`` URL,
        duration (0.0 when it cannot be determined), character and timestamp.

    Raises:
        HTTPException: 503 when the TTS engine is not available; 500 when
            the engine returns no audio path or raises.
    """
    if not tts_engine or not tts_engine.genie:
        raise HTTPException(status_code=503, detail="TTS引擎不可用")

    # Inside a coroutine the running loop is the one to use.
    loop = asyncio.get_running_loop()
    try:
        logger.info(f"TTS合成: '{request.text[:50]}...' 角色: {request.character}")
        audio_path = await loop.run_in_executor(
            None,
            tts_engine.synthesize_speech,
            request.text,
            request.character,
        )
        if not audio_path:
            raise HTTPException(status_code=500, detail="语音合成失败")

        # Duration is best-effort. Catch Exception, not everything, so task
        # cancellation (a BaseException) is not swallowed.
        try:
            duration = await loop.run_in_executor(
                None, tts_engine.get_audio_duration, audio_path
            )
        except Exception as e:
            logger.warning(f"获取音频时长失败: {e}")
            duration = 0.0

        # Build the URL served by the /audio/{filename} route.
        filename = os.path.basename(audio_path)
        audio_url = f"/audio/{filename}"
        return TTSResponse(
            success=True,
            message="语音合成成功",
            audio_url=audio_url,
            duration=duration,
            character=request.character,
            timestamp=datetime.now().isoformat(),
        )
    except HTTPException:
        # Deliberate HTTP errors must pass through with their own detail
        # instead of being re-wrapped by the generic handler below.
        raise
    except Exception as e:
        logger.error(f"TTS合成失败: {e}")
        raise HTTPException(status_code=500, detail=f"合成失败: {str(e)}") from e
@tts_router.get("/synthesize", response_model=TTSResponse)
async def synthesize_text_get(
    text: str = Query(..., description="要转换的文本", min_length=1, max_length=1000),
    character: str = Query("misono_mika", description="角色模型名称"),
    speed: float = Query(1.0, description="语速倍率", ge=0.5, le=2.0),
    format: str = Query("wav", description="输出格式")
):
    """GET variant of the text-to-speech endpoint.

    Packs the query parameters into a ``TTSRequest`` and delegates to
    ``synthesize_text``, so both endpoints share one implementation.

    Returns:
        Whatever ``synthesize_text`` returns for the constructed request.
    """
    return await synthesize_text(
        TTSRequest(text=text, character=character, speed=speed, format=format)
    )
@tts_router.post("/batch-synthesize", response_model=BatchTTSResponse)
async def batch_synthesize(requests: List[TTSRequest]):
    """Synthesize several TTS requests one after another.

    Items are processed in order. A failing item is recorded in the results
    and does not abort the remaining items.

    Args:
        requests: At most 10 TTS requests.

    Returns:
        BatchTTSResponse: Per-item result dicts (``success``, ``index`` and
        either ``audio_url``/``duration``/``character`` or ``error``), the
        total number of requests and the number that succeeded.

    Raises:
        HTTPException: 503 when the TTS engine is not available; 400 when
            more than 10 requests are submitted.
    """
    if not tts_engine or not tts_engine.genie:
        raise HTTPException(status_code=503, detail="TTS引擎不可用")
    if len(requests) > 10:
        raise HTTPException(status_code=400, detail="批量请求数量不能超过10个")

    # The running loop does not change between items; look it up once.
    loop = asyncio.get_running_loop()
    results = []
    for i, request in enumerate(requests):
        try:
            logger.info(f"批量合成 {i+1}/{len(requests)}: '{request.text[:30]}...'")
            audio_path = await loop.run_in_executor(
                None, tts_engine.synthesize_speech, request.text, request.character
            )
            if not audio_path:
                results.append({"success": False, "index": i, "error": "合成失败"})
                continue

            filename = os.path.basename(audio_path)
            # Duration is best-effort. Catch Exception, not everything, so
            # task cancellation (a BaseException) is not swallowed.
            try:
                duration = await loop.run_in_executor(
                    None, tts_engine.get_audio_duration, audio_path
                )
            except Exception as e:
                logger.warning(f"获取音频时长失败: {e}")
                duration = 0.0
            results.append({
                "success": True,
                "index": i,
                "audio_url": f"/audio/{filename}",
                "duration": duration,
                "character": request.character,
            })
        except Exception as e:
            # Per-item failure: record it and keep going with the next item.
            logger.error(f"批量合成第{i+1}项失败: {e}")
            results.append({"success": False, "index": i, "error": str(e)})

    return BatchTTSResponse(
        success=True,
        results=results,
        total=len(requests),
        completed=sum(1 for r in results if r.get("success")),
        timestamp=datetime.now().isoformat(),
    )
# ==================== 语音克隆路由 ====================
@voice_clone_router.post("/analyze-audio")
async def analyze_reference_audio(audio_url: str = Query(..., description="参考音频文件路径")):
    """Analyze the quality and characteristics of a reference audio file.

    Args:
        audio_url: Path of the reference audio file. It is checked with
            ``os.path.exists`` and passed to ``AudioAnalyzer.analyze_audio``.

    Returns:
        The success response wrapping ``{"analysis": <analysis result>}``.

    Raises:
        HTTPException: 503 when the voice cloner is not available; 400 when
            the path does not exist or the analysis result contains an
            "error" key; 500 on any unexpected failure.
    """
    if not voice_cloner:
        raise HTTPException(status_code=503, detail="语音克隆服务不可用")
    try:
        # NOTE(review): audio_url is a client-supplied filesystem path that is
        # opened on the server. Consider restricting it to an allowed directory.
        if not os.path.exists(audio_url):
            raise HTTPException(status_code=400, detail="音频文件路径不存在")
        analysis = AudioAnalyzer.analyze_audio(audio_url)
        if "error" in analysis:
            raise HTTPException(status_code=400, detail=analysis["error"])
        return create_success_response("音频分析完成", {"analysis": analysis})
    except HTTPException:
        # Keep the intended 400 responses; without this passthrough the
        # generic handler below turns them into 500 errors.
        raise
    except Exception as e:
        logger.error(f"音频分析失败: {e}")
        raise HTTPException(status_code=500, detail=f"分析失败: {str(e)}") from e
@voice_clone_router.post("/create")
async def create_cloned_voice(request: VoiceCloneRequest):
    """Create a cloned voice from a reference audio file.

    Args:
        request: Carries the voice name, reference audio path, reference
            text and description forwarded to the voice cloner.

    Returns:
        The success response with the cloner's message and the voice name.

    Raises:
        HTTPException: 503 when the voice cloner is not available; 400 when
            the name is already taken, the reference audio does not exist,
            or the cloner reports failure; 500 on any unexpected failure.
    """
    if not voice_cloner:
        raise HTTPException(status_code=503, detail="语音克隆服务不可用")

    try:
        name = request.voice_name
        # Refuse duplicate voice names.
        if name in voice_cloner.cloned_voices:
            raise HTTPException(status_code=400, detail=f"声音名称 '{name}' 已存在")

        reference_path = request.reference_audio_url
        if not os.path.exists(reference_path):
            raise HTTPException(status_code=400, detail="参考音频文件不存在")

        # Run the cloning call in the default executor.
        loop = asyncio.get_event_loop()
        ok, message = await loop.run_in_executor(
            None,
            voice_cloner.create_cloned_voice,
            name,
            reference_path,
            request.reference_text,
            request.description,
        )
        if not ok:
            raise HTTPException(status_code=400, detail=message)
        return create_success_response(message, {"voice_name": name})
    except HTTPException:
        raise
    except Exception as exc:
        logger.error(f"创建克隆声音失败: {exc}")
        raise HTTPException(status_code=500, detail=f"创建失败: {str(exc)}")
@voice_clone_router.get("/list")
async def list_cloned_voices():
    """List every cloned voice known to the voice cloner.

    Returns:
        The success response carrying the voice info collection under
        "cloned_voices" and its length under "count".

    Raises:
        HTTPException: 503 when the voice cloner is not available; 500 when
            fetching the list fails.
    """
    if not voice_cloner:
        raise HTTPException(status_code=503, detail="语音克隆服务不可用")

    try:
        voices = voice_cloner.get_cloned_voices_info()
        total = len(voices)
        payload = {"cloned_voices": voices, "count": total}
        return create_success_response(f"获取到 {total} 个克隆声音", payload)
    except Exception as exc:
        logger.error(f"获取克隆声音列表失败: {exc}")
        raise HTTPException(status_code=500, detail=f"获取列表失败: {str(exc)}")
@voice_clone_router.post("/synthesize")
async def synthesize_with_cloned_voice(
    voice_name: str = Query(..., description="克隆声音名称"),
    text: str = Query(..., description="要合成的文本", min_length=1, max_length=1000)
):
    """Synthesize speech with a previously created cloned voice.

    Args:
        voice_name: Name of the cloned voice; must be a key of
            ``voice_cloner.cloned_voices``.
        text: Text to synthesize (1 to 1000 characters).

    Returns:
        The success response with ``audio_url``, ``duration`` (0.0 when it
        cannot be determined) and ``voice_name``.

    Raises:
        HTTPException: 503 when the voice cloner is not available; 404 when
            the voice does not exist; 500 when synthesis returns no audio
            path or raises.
    """
    if not voice_cloner:
        raise HTTPException(status_code=503, detail="语音克隆服务不可用")
    if voice_name not in voice_cloner.cloned_voices:
        available_voices = list(voice_cloner.cloned_voices.keys())
        raise HTTPException(
            status_code=404,
            detail=f"克隆声音 '{voice_name}' 不存在。可用声音: {available_voices}"
        )

    # Inside a coroutine the running loop is the one to use.
    loop = asyncio.get_running_loop()
    try:
        logger.info(f"克隆声音合成: '{text[:50]}...' 声音: {voice_name}")
        # Run the blocking synthesis in the default executor.
        audio_path = await loop.run_in_executor(
            None, voice_cloner.synthesize_with_cloned_voice, voice_name, text
        )
        if not audio_path:
            raise HTTPException(status_code=500, detail="克隆声音合成失败")

        # Duration is best-effort. Catch Exception, not everything, so task
        # cancellation (a BaseException) is not swallowed.
        try:
            duration = await loop.run_in_executor(
                None, voice_cloner.tts_engine.get_audio_duration, audio_path
            )
        except Exception as e:
            logger.warning(f"获取音频时长失败: {e}")
            duration = 0.0

        # Build the URL served by the /audio/{filename} route.
        filename = os.path.basename(audio_path)
        audio_url = f"/audio/{filename}"
        return create_success_response(
            "克隆声音合成成功",
            {
                "audio_url": audio_url,
                "duration": duration,
                "voice_name": voice_name,
            },
        )
    except HTTPException:
        # Deliberate HTTP errors must pass through with their own detail
        # instead of being re-wrapped by the generic handler below.
        raise
    except Exception as e:
        logger.error(f"克隆声音合成失败: {e}")
        raise HTTPException(status_code=500, detail=f"合成失败: {str(e)}") from e
@voice_clone_router.delete("/{voice_name}")
async def remove_cloned_voice(voice_name: str):
    """Remove a cloned voice by name.

    Args:
        voice_name: Name of the cloned voice, taken from the URL path.

    Returns:
        The success response confirming the removal.

    Raises:
        HTTPException: 503 when the voice cloner is not available; 404 when
            the cloner reports the voice was not removed; 500 on any
            unexpected failure.
    """
    if not voice_cloner:
        raise HTTPException(status_code=503, detail="语音克隆服务不可用")

    try:
        removed = voice_cloner.remove_cloned_voice(voice_name)
        if not removed:
            raise HTTPException(status_code=404, detail=f"克隆声音 '{voice_name}' 不存在")
        return create_success_response(f"成功移除克隆声音: {voice_name}")
    except HTTPException:
        raise
    except Exception as exc:
        logger.error(f"移除克隆声音失败: {exc}")
        raise HTTPException(status_code=500, detail=f"移除失败: {str(exc)}")
# ==================== 角色管理路由 ====================
@character_router.get("/")
async def get_characters():
    """Return the data of every predefined character.

    Returns:
        The success response with the character entries plus predefined,
        custom (always 0) and total counts.

    Raises:
        HTTPException: 500 when the character configuration cannot be
            loaded or the response cannot be built.
    """
    try:
        from config import AVAILABLE_CHARACTERS

        # Only the values are needed; the character ids are not part of the
        # response.
        characters_info = list(AVAILABLE_CHARACTERS.values())
        count = len(characters_info)
        summary = {
            "characters": characters_info,
            "predefined_count": count,
            "custom_count": 0,  # not supported yet
            "total_count": count,
        }
        return create_success_response(f"获取到 {count} 个预定义角色", summary)
    except Exception as exc:
        logger.error(f"获取角色列表失败: {exc}")
        raise HTTPException(status_code=500, detail="获取角色列表失败")
# ==================== 音频文件路由 ====================
@basic_router.get("/audio/{filename}")
async def get_audio_file(filename: str):
    """Serve a generated audio file from the system temporary directory.

    Args:
        filename: Bare file name (no directory components) ending in
            ``.wav`` or ``.mp3``.

    Returns:
        FileResponse: The file, served as ``audio/wav`` for ``.wav`` and
        ``audio/mpeg`` otherwise.

    Raises:
        HTTPException: 404 when the name is not a bare ``.wav``/``.mp3``
            file name or the file does not exist; 500 on any unexpected
            failure.
    """
    try:
        # Reject anything with a directory component so the joined path
        # cannot leave the audio directory.
        if os.path.basename(filename) != filename or not filename.endswith(('.wav', '.mp3')):
            raise HTTPException(status_code=404, detail="音频文件不存在")

        audio_dir = tempfile.gettempdir()
        file_path = os.path.join(audio_dir, filename)
        # isfile() also rejects directories that happen to match the name.
        if not os.path.isfile(file_path):
            raise HTTPException(status_code=404, detail="音频文件不存在")

        return FileResponse(
            file_path,
            media_type="audio/wav" if filename.endswith('.wav') else "audio/mpeg",
            filename=filename,
        )
    except HTTPException:
        # Keep the intended 404; without this passthrough the generic
        # handler below turns "not found" into a 500 error.
        raise
    except FileNotFoundError:
        raise HTTPException(status_code=404, detail="音频文件不存在")
    except Exception as e:
        logger.error(f"获取音频文件失败: {e}")
        raise HTTPException(status_code=500, detail="获取音频文件失败") from e