|
|
""" |
|
|
API路由定义模块 |
|
|
包含所有API端点的路由定义,保持代码简洁 |
|
|
""" |
|
|
|
|
|
import os |
|
|
import asyncio |
|
|
import logging |
|
|
from datetime import datetime |
|
|
from typing import List |
|
|
|
|
|
from fastapi import APIRouter, HTTPException, Query |
|
|
from fastapi.responses import FileResponse |
|
|
import tempfile |
|
|
|
|
|
from .models import * |
|
|
from .voice_cloning import VoiceCloner, AudioAnalyzer |
|
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# One router per feature area. The tag strings are the group labels attached
# to every endpoint registered on that router.
basic_router = APIRouter(
    prefix="",
    tags=["基础功能"],
)
tts_router = APIRouter(
    prefix="/tts",
    tags=["文本转语音"],
)
voice_clone_router = APIRouter(
    prefix="/voice-clone",
    tags=["语音克隆"],
)
character_router = APIRouter(
    prefix="/characters",
    tags=["角色管理"],
)

# Shared engine instances. Both start as None and are rebound by
# set_engines(); handlers treat a falsy value as "service unavailable".
tts_engine = None
voice_cloner = None
|
|
|
|
|
|
|
|
def set_engines(tts_eng, voice_clone):
    """Install the engine instances used by the route handlers.

    Rebinds the module-level ``tts_engine`` and ``voice_cloner`` globals.
    Intended to be called from the main application.

    Args:
        tts_eng: Object stored as the module-level ``tts_engine``.
        voice_clone: Object stored as the module-level ``voice_cloner``.
    """
    global tts_engine, voice_cloner
    tts_engine, voice_cloner = tts_eng, voice_clone
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@basic_router.get("/", response_model=dict)
async def root():
    """Root path: return static information about this API.

    Returns:
        A dict with the service name, version, description, engine label,
        supported languages, feature list and the docs/health paths.
    """
    supported_languages = ["ja"]
    features = ["TTS合成", "语音克隆", "角色管理"]

    api_info = {
        "name": "Genie TTS API",
        "version": "1.0.0",
        "description": "高质量日语文本转语音API服务",
        "engine": "GPT-SoVITS V2 (ONNX)",
        "supported_languages": supported_languages,
        "features": features,
        "docs": "/docs",
        "health": "/health",
    }
    return api_info
|
|
|
|
|
|
|
|
@basic_router.get("/health", response_model=HealthResponse)
async def health_check():
    """Health-check endpoint.

    Returns:
        A ``HealthResponse`` whose ``status`` is always ``"healthy"``.
        ``engine_status`` is ``"ready"`` only when both the TTS engine and
        its ``genie`` attribute are truthy, otherwise ``"unavailable"``.
        ``custom_characters`` is hard-coded to 0.
    """
    if tts_engine and tts_engine.genie:
        engine_status = "ready"
    else:
        engine_status = "unavailable"

    # Imported inside the handler, so config is loaded on first request
    # rather than at module import time.
    from config import AVAILABLE_CHARACTER_NAMES

    predefined_count = len(AVAILABLE_CHARACTER_NAMES)

    # No voice cloner installed means there are no cloned voices to count.
    if voice_cloner:
        cloned_count = len(voice_cloner.cloned_voices)
    else:
        cloned_count = 0

    return HealthResponse(
        status="healthy",
        version="1.0.0",
        engine_status=engine_status,
        available_characters=AVAILABLE_CHARACTER_NAMES,
        predefined_characters=predefined_count,
        custom_characters=0,
        cloned_voices=cloned_count,
        timestamp=datetime.now().isoformat(),
    )
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@tts_router.post("/synthesize", response_model=TTSResponse)
async def synthesize_text(request: TTSRequest):
    """Text-to-speech synthesis endpoint.

    The blocking engine calls are run in the event loop's default executor.
    Only ``request.text`` and ``request.character`` are passed to the engine;
    no other request field is read by this handler.

    Args:
        request: The synthesis request body.

    Returns:
        A ``TTSResponse`` whose ``audio_url`` is ``/audio/<basename of the
        generated file>`` and whose ``duration`` is ``0.0`` when the duration
        lookup fails.

    Raises:
        HTTPException: 503 when the TTS engine is not available; 500 when the
            engine returns no audio path or any other error occurs.
    """
    if not tts_engine or not tts_engine.genie:
        raise HTTPException(status_code=503, detail="TTS引擎不可用")

    try:
        logger.info(f"TTS合成: '{request.text[:50]}...' 角色: {request.character}")

        loop = asyncio.get_event_loop()

        # Synthesis is blocking, so it is pushed off the event loop.
        audio_path = await loop.run_in_executor(
            None,
            tts_engine.synthesize_speech,
            request.text,
            request.character,
        )

        if not audio_path:
            raise HTTPException(status_code=500, detail="语音合成失败")

        # Duration is best-effort. Catch Exception rather than using a bare
        # except so that task cancellation is not silently swallowed.
        try:
            duration = await loop.run_in_executor(
                None, tts_engine.get_audio_duration, audio_path
            )
        except Exception as exc:
            logger.warning(f"获取音频时长失败: {exc}")
            duration = 0.0

        filename = os.path.basename(audio_path)
        audio_url = f"/audio/{filename}"

        return TTSResponse(
            success=True,
            message="语音合成成功",
            audio_url=audio_url,
            duration=duration,
            character=request.character,
            timestamp=datetime.now().isoformat(),
        )

    except HTTPException:
        # Deliberate HTTP errors raised above must reach the client as-is
        # instead of being rewrapped by the generic handler below.
        raise
    except Exception as e:
        logger.error(f"TTS合成失败: {e}")
        raise HTTPException(status_code=500, detail=f"合成失败: {str(e)}")
|
|
|
|
|
|
|
|
@tts_router.get("/synthesize", response_model=TTSResponse)
async def synthesize_text_get(
    text: str = Query(..., description="要转换的文本", min_length=1, max_length=1000),
    character: str = Query("misono_mika", description="角色模型名称"),
    speed: float = Query(1.0, description="语速倍率", ge=0.5, le=2.0),
    format: str = Query("wav", description="输出格式"),
):
    """GET variant of the text-to-speech synthesis endpoint.

    Packs the query parameters into a ``TTSRequest`` and delegates to
    ``synthesize_text``, returning whatever that handler returns.
    """
    fields = {
        "text": text,
        "character": character,
        "speed": speed,
        "format": format,
    }
    tts_request = TTSRequest(**fields)
    response = await synthesize_text(tts_request)
    return response
|
|
|
|
|
|
|
|
@tts_router.post("/batch-synthesize", response_model=BatchTTSResponse)
async def batch_synthesize(requests: List[TTSRequest]):
    """Batch text-to-speech synthesis.

    Items are synthesized one after another. A failing item is recorded in
    the result list and does not abort the remaining items.

    Args:
        requests: At most 10 synthesis requests.

    Returns:
        A ``BatchTTSResponse`` with one result dict per request (``success``,
        ``index`` and either ``audio_url``/``duration``/``character`` or
        ``error``), the total number of requests and the number that
        succeeded. The top-level ``success`` is always ``True``.

    Raises:
        HTTPException: 503 when the TTS engine is not available; 400 when
            more than 10 requests are submitted.
    """
    if not tts_engine or not tts_engine.genie:
        raise HTTPException(status_code=503, detail="TTS引擎不可用")

    if len(requests) > 10:
        raise HTTPException(status_code=400, detail="批量请求数量不能超过10个")

    # The loop does not change between items, so look it up once.
    loop = asyncio.get_event_loop()

    results = []
    for i, request in enumerate(requests):
        try:
            logger.info(f"批量合成 {i+1}/{len(requests)}: '{request.text[:30]}...'")

            audio_path = await loop.run_in_executor(
                None, tts_engine.synthesize_speech, request.text, request.character
            )

            if not audio_path:
                results.append({"success": False, "index": i, "error": "合成失败"})
                continue

            filename = os.path.basename(audio_path)

            # Duration is best-effort. Catch Exception rather than using a
            # bare except so that task cancellation is not swallowed.
            try:
                duration = await loop.run_in_executor(
                    None, tts_engine.get_audio_duration, audio_path
                )
            except Exception as exc:
                logger.warning(f"获取音频时长失败: {exc}")
                duration = 0.0

            results.append({
                "success": True,
                "index": i,
                "audio_url": f"/audio/{filename}",
                "duration": duration,
                "character": request.character,
            })

        except Exception as e:
            logger.error(f"批量合成第{i+1}项失败: {e}")
            results.append({"success": False, "index": i, "error": str(e)})

    return BatchTTSResponse(
        success=True,
        results=results,
        total=len(requests),
        completed=sum(1 for r in results if r.get("success")),
        timestamp=datetime.now().isoformat(),
    )
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@voice_clone_router.post("/analyze-audio")
async def analyze_reference_audio(audio_url: str = Query(..., description="参考音频文件路径")):
    """Analyze the quality and characteristics of a reference audio file.

    Args:
        audio_url: Despite its name, this is used as a local filesystem path
            (it is checked with ``os.path.exists``).

    Returns:
        The result of ``create_success_response`` carrying the analysis
        under the ``"analysis"`` key.

    Raises:
        HTTPException: 503 when the voice cloner is not available; 400 when
            the path does not exist or the analysis result contains an
            ``"error"`` key; 500 on any other failure.
    """
    if not voice_cloner:
        raise HTTPException(status_code=503, detail="语音克隆服务不可用")

    try:
        # NOTE(review): the path comes straight from the caller, so this
        # endpoint can be used to probe arbitrary server paths. Consider
        # restricting it to an upload directory.
        if not os.path.exists(audio_url):
            raise HTTPException(status_code=400, detail="音频文件路径不存在")

        analysis = AudioAnalyzer.analyze_audio(audio_url)

        if "error" in analysis:
            raise HTTPException(status_code=400, detail=analysis["error"])

        return create_success_response("音频分析完成", {"analysis": analysis})

    except HTTPException:
        # Keep the 400 responses raised above; without this clause the
        # generic handler below would turn them into 500s.
        raise
    except Exception as e:
        logger.error(f"音频分析失败: {e}")
        raise HTTPException(status_code=500, detail=f"分析失败: {str(e)}")
|
|
|
|
|
|
|
|
@voice_clone_router.post("/create")
async def create_cloned_voice(request: VoiceCloneRequest):
    """Create a cloned voice from a reference audio file.

    Args:
        request: Carries ``voice_name``, ``reference_audio_url`` (checked as
            a local path), ``reference_text`` and ``description``.

    Returns:
        The result of ``create_success_response`` with the cloner's message
        and the new ``voice_name``.

    Raises:
        HTTPException: 503 when the voice cloner is not available; 400 when
            the name is already taken, the reference file does not exist or
            the cloner reports failure; 500 on any other error.
    """
    if not voice_cloner:
        raise HTTPException(status_code=503, detail="语音克隆服务不可用")

    try:
        name_taken = request.voice_name in voice_cloner.cloned_voices
        if name_taken:
            raise HTTPException(status_code=400, detail=f"声音名称 '{request.voice_name}' 已存在")

        if not os.path.exists(request.reference_audio_url):
            raise HTTPException(status_code=400, detail="参考音频文件不存在")

        # Cloning is blocking, so it runs in the default executor.
        loop = asyncio.get_event_loop()
        success, message = await loop.run_in_executor(
            None,
            voice_cloner.create_cloned_voice,
            request.voice_name,
            request.reference_audio_url,
            request.reference_text,
            request.description,
        )

        if not success:
            raise HTTPException(status_code=400, detail=message)

        return create_success_response(message, {"voice_name": request.voice_name})

    except HTTPException:
        # Pass deliberate HTTP errors through unchanged.
        raise
    except Exception as e:
        logger.error(f"创建克隆声音失败: {e}")
        raise HTTPException(status_code=500, detail=f"创建失败: {str(e)}")
|
|
|
|
|
|
|
|
@voice_clone_router.get("/list")
async def list_cloned_voices():
    """Return the list of all cloned voices.

    Returns:
        The result of ``create_success_response`` carrying the cloner's
        voice info under ``"cloned_voices"`` and its length under ``"count"``.

    Raises:
        HTTPException: 503 when the voice cloner is not available; 500 when
            fetching the list fails.
    """
    if not voice_cloner:
        raise HTTPException(status_code=503, detail="语音克隆服务不可用")

    try:
        voices = voice_cloner.get_cloned_voices_info()
        voice_count = len(voices)
        summary = f"获取到 {voice_count} 个克隆声音"
        payload = {"cloned_voices": voices, "count": voice_count}
        return create_success_response(summary, payload)
    except Exception as e:
        logger.error(f"获取克隆声音列表失败: {e}")
        raise HTTPException(status_code=500, detail=f"获取列表失败: {str(e)}")
|
|
|
|
|
|
|
|
@voice_clone_router.post("/synthesize")
async def synthesize_with_cloned_voice(
    voice_name: str = Query(..., description="克隆声音名称"),
    text: str = Query(..., description="要合成的文本", min_length=1, max_length=1000)
):
    """Synthesize speech using a previously cloned voice.

    Args:
        voice_name: Name of a voice present in ``voice_cloner.cloned_voices``.
        text: Text to synthesize (1 to 1000 characters).

    Returns:
        The result of ``create_success_response`` carrying ``audio_url``
        (``/audio/<basename of the generated file>``), ``duration`` (``0.0``
        when the duration lookup fails) and ``voice_name``.

    Raises:
        HTTPException: 503 when the voice cloner is not available; 404 when
            the voice name is unknown; 500 when the cloner returns no audio
            path or any other error occurs.
    """
    if not voice_cloner:
        raise HTTPException(status_code=503, detail="语音克隆服务不可用")

    if voice_name not in voice_cloner.cloned_voices:
        available_voices = list(voice_cloner.cloned_voices.keys())
        raise HTTPException(
            status_code=404,
            detail=f"克隆声音 '{voice_name}' 不存在。可用声音: {available_voices}"
        )

    try:
        logger.info(f"克隆声音合成: '{text[:50]}...' 声音: {voice_name}")

        loop = asyncio.get_event_loop()

        # Synthesis is blocking, so it is pushed off the event loop.
        audio_path = await loop.run_in_executor(
            None, voice_cloner.synthesize_with_cloned_voice, voice_name, text
        )

        if not audio_path:
            raise HTTPException(status_code=500, detail="克隆声音合成失败")

        # Duration is best-effort. Catch Exception rather than using a bare
        # except so that task cancellation is not silently swallowed.
        try:
            duration = await loop.run_in_executor(
                None, voice_cloner.tts_engine.get_audio_duration, audio_path
            )
        except Exception as exc:
            logger.warning(f"获取音频时长失败: {exc}")
            duration = 0.0

        filename = os.path.basename(audio_path)
        audio_url = f"/audio/{filename}"

        return create_success_response(
            "克隆声音合成成功",
            {
                "audio_url": audio_url,
                "duration": duration,
                "voice_name": voice_name,
            },
        )

    except HTTPException:
        # Deliberate HTTP errors raised above must reach the client as-is
        # instead of being rewrapped by the generic handler below.
        raise
    except Exception as e:
        logger.error(f"克隆声音合成失败: {e}")
        raise HTTPException(status_code=500, detail=f"合成失败: {str(e)}")
|
|
|
|
|
|
|
|
@voice_clone_router.delete("/{voice_name}")
async def remove_cloned_voice(voice_name: str):
    """Remove a cloned voice by name.

    Args:
        voice_name: Name of the cloned voice to remove.

    Returns:
        The result of ``create_success_response`` with a confirmation message.

    Raises:
        HTTPException: 503 when the voice cloner is not available; 404 when
            the cloner reports that nothing was removed; 500 on any other
            error.
    """
    if not voice_cloner:
        raise HTTPException(status_code=503, detail="语音克隆服务不可用")

    try:
        removed = voice_cloner.remove_cloned_voice(voice_name)
        if not removed:
            raise HTTPException(status_code=404, detail=f"克隆声音 '{voice_name}' 不存在")
        return create_success_response(f"成功移除克隆声音: {voice_name}")
    except HTTPException:
        # Pass the 404 through unchanged.
        raise
    except Exception as e:
        logger.error(f"移除克隆声音失败: {e}")
        raise HTTPException(status_code=500, detail=f"移除失败: {str(e)}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@character_router.get("/")
async def get_characters():
    """Return information about all available predefined characters.

    Returns:
        The result of ``create_success_response`` carrying the character
        entries and their counts; ``custom_count`` is hard-coded to 0.

    Raises:
        HTTPException: 500 when the character list cannot be built.
    """
    try:
        from config import AVAILABLE_CHARACTERS

        # Only the per-character data is returned; the ids are dropped.
        characters_info = [
            char_data for _, char_data in AVAILABLE_CHARACTERS.items()
        ]
        predefined_total = len(characters_info)

        summary = f"获取到 {predefined_total} 个预定义角色"
        payload = {
            "characters": characters_info,
            "predefined_count": predefined_total,
            "custom_count": 0,
            "total_count": predefined_total,
        }
        return create_success_response(summary, payload)
    except Exception as e:
        logger.error(f"获取角色列表失败: {e}")
        raise HTTPException(status_code=500, detail="获取角色列表失败")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@basic_router.get("/audio/{filename}")
async def get_audio_file(filename: str):
    """Serve a generated audio file from the system temporary directory.

    Args:
        filename: Bare file name ending in ``.wav`` or ``.mp3``.

    Returns:
        A ``FileResponse`` with media type ``audio/wav`` for ``.wav`` files
        and ``audio/mpeg`` otherwise.

    Raises:
        HTTPException: 404 when the name is not a bare ``.wav``/``.mp3`` file
            name or no such file exists; 500 on any other error.
    """
    try:
        # Only a bare file name is accepted, so a value containing path
        # separators cannot escape the audio directory.
        is_bare_name = os.path.basename(filename) == filename
        if not is_bare_name or not filename.endswith(('.wav', '.mp3')):
            raise HTTPException(status_code=404, detail="音频文件不存在")

        # NOTE(review): this serves any matching file in the shared temp
        # directory, not only files produced by this service.
        audio_dir = tempfile.gettempdir()
        file_path = os.path.join(audio_dir, filename)

        # isfile (not exists) so a directory with an audio-like name is
        # also reported as missing.
        if not os.path.isfile(file_path):
            raise HTTPException(status_code=404, detail="音频文件不存在")

        return FileResponse(
            file_path,
            media_type="audio/wav" if filename.endswith('.wav') else "audio/mpeg",
            filename=filename,
        )

    except HTTPException:
        # Keep the 404s raised above; without this clause the generic
        # handler below would turn them into 500s.
        raise
    except FileNotFoundError:
        raise HTTPException(status_code=404, detail="音频文件不存在")
    except Exception as e:
        logger.error(f"获取音频文件失败: {e}")
        raise HTTPException(status_code=500, detail="获取音频文件失败")