File size: 14,074 Bytes
9bc318e |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 |
"""
API route definitions module.
Contains the route definitions for all API endpoints, keeping the code concise.
"""
import os
import asyncio
import logging
from datetime import datetime
from typing import List
from fastapi import APIRouter, HTTPException, Query
from fastapi.responses import FileResponse
import tempfile
from .models import *
from .voice_cloning import VoiceCloner, AudioAnalyzer
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Create the routers; each one groups related endpoints under its own URL
# prefix and tag.
basic_router = APIRouter(prefix="", tags=["基础功能"])
tts_router = APIRouter(prefix="/tts", tags=["文本转语音"])
voice_clone_router = APIRouter(prefix="/voice-clone", tags=["语音克隆"])
character_router = APIRouter(prefix="/characters", tags=["角色管理"])
# Global engine handles. They start as None and are rebound by set_engines(),
# which is meant to be called from the main application.
tts_engine = None
voice_cloner = None
def set_engines(tts_eng, voice_clone):
    """Install the engine instances used by the route handlers.

    Meant to be called from the main application. Rebinds the module-level
    ``tts_engine`` and ``voice_cloner`` globals to the given objects.
    """
    global tts_engine, voice_cloner
    tts_engine, voice_cloner = tts_eng, voice_clone
# ==================== Basic routes ====================
@basic_router.get("/", response_model=dict)
async def root():
    """Root path: return static metadata describing this API."""
    api_info = dict(
        name="Genie TTS API",
        version="1.0.0",
        description="高质量日语文本转语音API服务",
        engine="GPT-SoVITS V2 (ONNX)",
        supported_languages=["ja"],
        features=["TTS合成", "语音克隆", "角色管理"],
        docs="/docs",
        health="/health",
    )
    return api_info
@basic_router.get("/health", response_model=HealthResponse)
async def health_check():
    """Health-check endpoint.

    Reports whether the TTS engine is usable, the predefined character
    names and their count, the number of cloned voices, and a timestamp.
    """
    if tts_engine and tts_engine.genie:
        engine_status = "ready"
    else:
        engine_status = "unavailable"

    # Character statistics; the config module is imported at call time.
    from config import AVAILABLE_CHARACTER_NAMES

    predefined_count = len(AVAILABLE_CHARACTER_NAMES)
    if voice_cloner:
        cloned_count = len(voice_cloner.cloned_voices)
    else:
        cloned_count = 0

    return HealthResponse(
        status="healthy",
        version="1.0.0",
        engine_status=engine_status,
        available_characters=AVAILABLE_CHARACTER_NAMES,
        predefined_characters=predefined_count,
        custom_characters=0,  # custom models are not supported for now
        cloned_voices=cloned_count,
        timestamp=datetime.now().isoformat(),
    )
# ==================== TTS routes ====================
@tts_router.post("/synthesize", response_model=TTSResponse)
async def synthesize_text(request: TTSRequest):
    """Text-to-speech synthesis endpoint.

    Runs ``tts_engine.synthesize_speech(text, character)`` in the default
    executor so the event loop is not blocked, then looks up the duration of
    the produced file and returns a ``TTSResponse`` whose ``audio_url`` is
    ``/audio/<basename of the generated file>``.

    Args:
        request: Synthesis request; ``text`` and ``character`` are used here.

    Raises:
        HTTPException: 503 if the TTS engine is not available; 500 if the
            engine returns no audio path or raises.
    """
    if not tts_engine or not tts_engine.genie:
        raise HTTPException(status_code=503, detail="TTS引擎不可用")

    loop = asyncio.get_running_loop()
    try:
        logger.info(f"TTS合成: '{request.text[:50]}...' 角色: {request.character}")
        # NOTE(review): only text and character are forwarded to the engine;
        # confirm whether the request's speed/format are meant to be ignored.
        audio_path = await loop.run_in_executor(
            None,
            tts_engine.synthesize_speech,
            request.text,
            request.character,
        )
        if not audio_path:
            raise HTTPException(status_code=500, detail="语音合成失败")

        # Duration is best-effort: fall back to 0.0 on failure. Catch
        # Exception (not a bare except) so task cancellation still propagates.
        try:
            duration = await loop.run_in_executor(
                None, tts_engine.get_audio_duration, audio_path
            )
        except Exception as e:
            logger.warning(f"获取音频时长失败: {e}")
            duration = 0.0

        # Build the URL under which the generated file is served.
        filename = os.path.basename(audio_path)
        audio_url = f"/audio/{filename}"
        return TTSResponse(
            success=True,
            message="语音合成成功",
            audio_url=audio_url,
            duration=duration,
            character=request.character,
            timestamp=datetime.now().isoformat(),
        )
    except HTTPException:
        # Deliberate HTTP errors must reach the client unchanged instead of
        # being re-wrapped by the generic handler below.
        raise
    except Exception as e:
        logger.error(f"TTS合成失败: {e}")
        raise HTTPException(status_code=500, detail=f"合成失败: {str(e)}") from e
@tts_router.get("/synthesize", response_model=TTSResponse)
async def synthesize_text_get(
    text: str = Query(..., description="要转换的文本", min_length=1, max_length=1000),
    character: str = Query("misono_mika", description="角色模型名称"),
    speed: float = Query(1.0, description="语速倍率", ge=0.5, le=2.0),
    format: str = Query("wav", description="输出格式")
):
    """GET variant of the text-to-speech endpoint.

    Packs the query parameters into a ``TTSRequest`` and delegates to
    ``synthesize_text``.
    """
    query_fields = {
        "text": text,
        "character": character,
        "speed": speed,
        "format": format,
    }
    tts_request = TTSRequest(**query_fields)
    response = await synthesize_text(tts_request)
    return response
@tts_router.post("/batch-synthesize", response_model=BatchTTSResponse)
async def batch_synthesize(requests: List[TTSRequest]):
    """Batch text-to-speech synthesis.

    Processes the requests one after another. A failure of one item is
    recorded in that item's result entry and does not abort the batch.

    Args:
        requests: Up to 10 synthesis requests.

    Returns:
        ``BatchTTSResponse`` with one result dict per request (in order),
        the total number of requests and the number that succeeded.

    Raises:
        HTTPException: 503 if the TTS engine is not available; 400 if more
            than 10 requests are submitted.
    """
    if not tts_engine or not tts_engine.genie:
        raise HTTPException(status_code=503, detail="TTS引擎不可用")
    if len(requests) > 10:
        raise HTTPException(status_code=400, detail="批量请求数量不能超过10个")

    loop = asyncio.get_running_loop()
    total = len(requests)
    results = []
    for i, item in enumerate(requests):
        try:
            logger.info(f"批量合成 {i+1}/{total}: '{item.text[:30]}...'")
            audio_path = await loop.run_in_executor(
                None, tts_engine.synthesize_speech, item.text, item.character
            )
            if not audio_path:
                results.append({"success": False, "index": i, "error": "合成失败"})
                continue

            filename = os.path.basename(audio_path)
            # Duration is best-effort. Catch Exception (not a bare except)
            # so that task cancellation is not swallowed here.
            try:
                duration = await loop.run_in_executor(
                    None, tts_engine.get_audio_duration, audio_path
                )
            except Exception as e:
                logger.warning(f"获取音频时长失败: {e}")
                duration = 0.0

            results.append({
                "success": True,
                "index": i,
                "audio_url": f"/audio/{filename}",
                "duration": duration,
                "character": item.character,
            })
        except Exception as e:
            logger.error(f"批量合成第{i+1}项失败: {e}")
            results.append({"success": False, "index": i, "error": str(e)})

    return BatchTTSResponse(
        success=True,
        results=results,
        total=total,
        completed=sum(1 for r in results if r.get("success")),
        timestamp=datetime.now().isoformat(),
    )
# ==================== Voice cloning routes ====================
@voice_clone_router.post("/analyze-audio")
async def analyze_reference_audio(audio_url: str = Query(..., description="参考音频文件路径")):
    """Analyze the quality and characteristics of a reference audio file.

    Args:
        audio_url: Filesystem path of the reference audio, as supplied by
            the client.

    Raises:
        HTTPException: 503 if the voice-cloning service is not available;
            400 if the path does not exist or the analyzer reports an error;
            500 on any unexpected failure.
    """
    if not voice_cloner:
        raise HTTPException(status_code=503, detail="语音克隆服务不可用")
    try:
        # NOTE(review): audio_url is a client-supplied filesystem path that
        # is checked and opened as-is; consider restricting it to a known
        # upload directory.
        if not os.path.exists(audio_url):
            raise HTTPException(status_code=400, detail="音频文件路径不存在")
        # NOTE(review): this call runs on the event loop thread, unlike the
        # synthesis handlers which use run_in_executor — confirm it is cheap.
        analysis = AudioAnalyzer.analyze_audio(audio_url)
        if "error" in analysis:
            raise HTTPException(status_code=400, detail=analysis["error"])
        return create_success_response("音频分析完成", {"analysis": analysis})
    except HTTPException:
        # Keep the intended 400 responses; without this they would be turned
        # into 500s by the generic handler below.
        raise
    except Exception as e:
        logger.error(f"音频分析失败: {e}")
        raise HTTPException(status_code=500, detail=f"分析失败: {str(e)}") from e
@voice_clone_router.post("/create")
async def create_cloned_voice(request: VoiceCloneRequest):
    """Create a cloned voice from a reference audio file.

    Responds with 503 when the cloning service is not set, 400 when the voice
    name already exists, the reference audio is missing, or the cloner
    reports failure, and 500 on unexpected errors.
    """
    if not voice_cloner:
        raise HTTPException(status_code=503, detail="语音克隆服务不可用")

    try:
        voice_name = request.voice_name
        # Refuse to overwrite an existing cloned voice.
        if voice_name in voice_cloner.cloned_voices:
            raise HTTPException(status_code=400, detail=f"声音名称 '{voice_name}' 已存在")
        if not os.path.exists(request.reference_audio_url):
            raise HTTPException(status_code=400, detail="参考音频文件不存在")

        # Run the cloning work in the default executor.
        outcome = await asyncio.get_event_loop().run_in_executor(
            None,
            voice_cloner.create_cloned_voice,
            voice_name,
            request.reference_audio_url,
            request.reference_text,
            request.description,
        )
        success, message = outcome
        if not success:
            raise HTTPException(status_code=400, detail=message)
        return create_success_response(message, {"voice_name": request.voice_name})
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"创建克隆声音失败: {e}")
        raise HTTPException(status_code=500, detail=f"创建失败: {str(e)}")
@voice_clone_router.get("/list")
async def list_cloned_voices():
    """Return information about all cloned voices together with their count."""
    if not voice_cloner:
        raise HTTPException(status_code=503, detail="语音克隆服务不可用")

    try:
        voices = voice_cloner.get_cloned_voices_info()
        count = len(voices)
        payload = {"cloned_voices": voices, "count": count}
        return create_success_response(f"获取到 {count} 个克隆声音", payload)
    except Exception as e:
        logger.error(f"获取克隆声音列表失败: {e}")
        raise HTTPException(status_code=500, detail=f"获取列表失败: {str(e)}")
@voice_clone_router.post("/synthesize")
async def synthesize_with_cloned_voice(
    voice_name: str = Query(..., description="克隆声音名称"),
    text: str = Query(..., description="要合成的文本", min_length=1, max_length=1000)
):
    """Synthesize speech using a previously cloned voice.

    Runs ``voice_cloner.synthesize_with_cloned_voice(voice_name, text)`` in
    the default executor and returns a success response carrying the audio
    URL (``/audio/<basename>``), its duration and the voice name.

    Raises:
        HTTPException: 503 if the voice-cloning service is not available;
            404 if ``voice_name`` is not a known cloned voice; 500 if
            synthesis returns no path or raises.
    """
    if not voice_cloner:
        raise HTTPException(status_code=503, detail="语音克隆服务不可用")
    if voice_name not in voice_cloner.cloned_voices:
        available_voices = list(voice_cloner.cloned_voices.keys())
        raise HTTPException(
            status_code=404,
            detail=f"克隆声音 '{voice_name}' 不存在。可用声音: {available_voices}"
        )

    loop = asyncio.get_running_loop()
    try:
        logger.info(f"克隆声音合成: '{text[:50]}...' 声音: {voice_name}")
        audio_path = await loop.run_in_executor(
            None, voice_cloner.synthesize_with_cloned_voice, voice_name, text
        )
        if not audio_path:
            raise HTTPException(status_code=500, detail="克隆声音合成失败")

        # Duration is best-effort: fall back to 0.0 on failure. Catch
        # Exception (not a bare except) so task cancellation still propagates.
        try:
            duration = await loop.run_in_executor(
                None, voice_cloner.tts_engine.get_audio_duration, audio_path
            )
        except Exception as e:
            logger.warning(f"获取音频时长失败: {e}")
            duration = 0.0

        # Build the URL under which the generated file is served.
        filename = os.path.basename(audio_path)
        audio_url = f"/audio/{filename}"
        return create_success_response(
            "克隆声音合成成功",
            {
                "audio_url": audio_url,
                "duration": duration,
                "voice_name": voice_name
            }
        )
    except HTTPException:
        # Deliberate HTTP errors must reach the client unchanged instead of
        # being re-wrapped by the generic handler below.
        raise
    except Exception as e:
        logger.error(f"克隆声音合成失败: {e}")
        raise HTTPException(status_code=500, detail=f"合成失败: {str(e)}") from e
@voice_clone_router.delete("/{voice_name}")
async def remove_cloned_voice(voice_name: str):
    """Remove a cloned voice by name; 404 if the cloner reports it missing."""
    if not voice_cloner:
        raise HTTPException(status_code=503, detail="语音克隆服务不可用")

    try:
        removed = voice_cloner.remove_cloned_voice(voice_name)
        if not removed:
            raise HTTPException(status_code=404, detail=f"克隆声音 '{voice_name}' 不存在")
        return create_success_response(f"成功移除克隆声音: {voice_name}")
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"移除克隆声音失败: {e}")
        raise HTTPException(status_code=500, detail=f"移除失败: {str(e)}")
# ==================== Character management routes ====================
@character_router.get("/")
async def get_characters():
    """Return the data of every predefined character plus summary counts."""
    try:
        from config import AVAILABLE_CHARACTERS

        characters_info = [
            char_data for _char_id, char_data in AVAILABLE_CHARACTERS.items()
        ]
        predefined_total = len(characters_info)
        summary = {
            "characters": characters_info,
            "predefined_count": predefined_total,
            "custom_count": 0,  # not supported for now
            "total_count": predefined_total,
        }
        return create_success_response(
            f"获取到 {predefined_total} 个预定义角色", summary
        )
    except Exception as e:
        logger.error(f"获取角色列表失败: {e}")
        raise HTTPException(status_code=500, detail="获取角色列表失败")
# ==================== Audio file routes ====================
@basic_router.get("/audio/{filename}")
async def get_audio_file(filename: str):
    """Serve a generated audio file from the system temporary directory.

    Only bare ``.wav`` / ``.mp3`` file names are accepted; anything that
    contains a path component is treated as not found.

    Args:
        filename: Name of the audio file inside ``tempfile.gettempdir()``.

    Raises:
        HTTPException: 404 if the name is not acceptable or the file does
            not exist; 500 on any unexpected failure.
    """
    try:
        # Reject names carrying directory components so the request cannot
        # escape the temp directory (backslash covers Windows separators).
        is_bare_name = filename == os.path.basename(filename) and "\\" not in filename
        if not is_bare_name or not filename.endswith(('.wav', '.mp3')):
            raise HTTPException(status_code=404, detail="音频文件不存在")

        file_path = os.path.join(tempfile.gettempdir(), filename)
        # isfile (rather than exists) also rejects directories.
        if not os.path.isfile(file_path):
            raise HTTPException(status_code=404, detail="音频文件不存在")

        media_type = "audio/wav" if filename.endswith('.wav') else "audio/mpeg"
        return FileResponse(file_path, media_type=media_type, filename=filename)
    except HTTPException:
        # Without this, the 404s above would be caught by the generic
        # handler below and reported to the client as 500.
        raise
    except FileNotFoundError:
        raise HTTPException(status_code=404, detail="音频文件不存在") from None
    except Exception as e:
        logger.error(f"获取音频文件失败: {e}")
        raise HTTPException(status_code=500, detail="获取音频文件失败") from e