Spaces:
Sleeping
Sleeping
| """ | |
| API路由定义模块 | |
| 包含所有API端点的路由定义,保持代码简洁 | |
| """ | |
| import os | |
| import asyncio | |
| import logging | |
| from datetime import datetime | |
| from typing import List | |
| from fastapi import APIRouter, HTTPException, Query | |
| from fastapi.responses import FileResponse | |
| import tempfile | |
| from .models import * | |
| from .voice_cloning import VoiceCloner, AudioAnalyzer | |
logger = logging.getLogger(__name__)
# Create the routers, one per feature area.
basic_router = APIRouter(prefix="", tags=["基础功能"])
tts_router = APIRouter(prefix="/tts", tags=["文本转语音"])
voice_clone_router = APIRouter(prefix="/voice-clone", tags=["语音克隆"])
character_router = APIRouter(prefix="/characters", tags=["角色管理"])
# Module-level engine handles; they stay None until the main application
# initialises them.
tts_engine = None
voice_cloner = None
def set_engines(tts_eng, voice_clone):
    """Install the engine instances shared by the route handlers.

    Meant to be called from the main application.

    Args:
        tts_eng: Object stored in the module-level ``tts_engine``.
        voice_clone: Object stored in the module-level ``voice_cloner``.
    """
    global tts_engine, voice_cloner
    tts_engine, voice_cloner = tts_eng, voice_clone
| # ==================== 基础功能路由 ==================== | |
async def root():
    """Return static metadata describing this API (root path)."""
    api_info = dict(
        name="Genie TTS API",
        version="1.0.0",
        description="高质量日语文本转语音API服务",
        engine="GPT-SoVITS V2 (ONNX)",
        supported_languages=["ja"],
        features=["TTS合成", "语音克隆", "角色管理"],
        docs="/docs",
        health="/health",
    )
    return api_info
async def health_check():
    """Report service health: engine readiness and character/voice counts.

    Returns:
        HealthResponse whose ``engine_status`` is ``"ready"`` only when the
        TTS engine and its ``genie`` attribute are both truthy. ``status`` is
        always ``"healthy"`` and ``custom_characters`` is always 0 (custom
        models are not supported yet).
    """
    if tts_engine and tts_engine.genie:
        engine_status = "ready"
    else:
        engine_status = "unavailable"

    # Character statistics come from the project configuration.
    from config import AVAILABLE_CHARACTER_NAMES

    predefined_count = len(AVAILABLE_CHARACTER_NAMES)
    cloned_count = 0 if not voice_cloner else len(voice_cloner.cloned_voices)

    return HealthResponse(
        status="healthy",
        version="1.0.0",
        engine_status=engine_status,
        available_characters=AVAILABLE_CHARACTER_NAMES,
        predefined_characters=predefined_count,
        custom_characters=0,
        cloned_voices=cloned_count,
        timestamp=datetime.now().isoformat(),
    )
| # ==================== TTS路由 ==================== | |
async def synthesize_text(request: TTSRequest):
    """Synthesize speech for one request and return where to fetch the audio.

    The blocking engine calls are run in the event loop's default executor.
    Only ``request.text`` and ``request.character`` are forwarded to the
    engine.

    Args:
        request: TTS request; ``text`` and ``character`` are read here.

    Returns:
        TTSResponse whose ``audio_url`` is ``/audio/<basename of the generated
        file>`` and whose ``duration`` is ``0.0`` when the duration could not
        be determined.

    Raises:
        HTTPException: 503 when the engine is unavailable; 500 when the engine
            returns no audio path or an unexpected error occurs.
    """
    if not tts_engine or not tts_engine.genie:
        raise HTTPException(status_code=503, detail="TTS引擎不可用")

    loop = asyncio.get_running_loop()
    try:
        logger.info(f"TTS合成: '{request.text[:50]}...' 角色: {request.character}")

        # Run the synthesis in the thread pool.
        audio_path = await loop.run_in_executor(
            None,
            tts_engine.synthesize_speech,
            request.text,
            request.character,
        )
        if not audio_path:
            raise HTTPException(status_code=500, detail="语音合成失败")

        # Duration is best-effort. Catch Exception rather than using a bare
        # except so task cancellation (a BaseException) still propagates.
        try:
            duration = await loop.run_in_executor(
                None, tts_engine.get_audio_duration, audio_path
            )
        except Exception as exc:
            logger.warning(f"获取音频时长失败: {exc}")
            duration = 0.0

        # Build the access URL from the generated file's name.
        filename = os.path.basename(audio_path)
        return TTSResponse(
            success=True,
            message="语音合成成功",
            audio_url=f"/audio/{filename}",
            duration=duration,
            character=request.character,
            timestamp=datetime.now().isoformat(),
        )
    except HTTPException:
        # Deliberate HTTP errors must reach the client unchanged instead of
        # being re-wrapped by the generic handler below.
        raise
    except Exception as e:
        logger.error(f"TTS合成失败: {e}")
        raise HTTPException(status_code=500, detail=f"合成失败: {str(e)}")
async def synthesize_text_get(
    text: str = Query(..., description="要转换的文本", min_length=1, max_length=1000),
    character: str = Query("misono_mika", description="角色模型名称"),
    speed: float = Query(1.0, description="语速倍率", ge=0.5, le=2.0),
    format: str = Query("wav", description="输出格式")
):
    """Query-parameter variant of text-to-speech synthesis.

    Packs the query parameters into a ``TTSRequest`` and delegates to
    ``synthesize_text``, returning whatever that coroutine returns.
    """
    fields = {
        "text": text,
        "character": character,
        "speed": speed,
        "format": format,
    }
    tts_request = TTSRequest(**fields)
    return await synthesize_text(tts_request)
async def batch_synthesize(requests: List[TTSRequest]):
    """Synthesize several requests one after another, reporting each outcome.

    A failing item does not abort the batch; it is recorded with
    ``success=False`` and an ``error`` message.

    Args:
        requests: At most 10 TTS requests; ``text`` and ``character`` of each
            are forwarded to the engine.

    Returns:
        BatchTTSResponse with one result dict per request (in input order),
        the total number of requests, and the number that succeeded.

    Raises:
        HTTPException: 503 when the engine is unavailable; 400 when more than
            10 requests are submitted.
    """
    if not tts_engine or not tts_engine.genie:
        raise HTTPException(status_code=503, detail="TTS引擎不可用")
    if len(requests) > 10:
        raise HTTPException(status_code=400, detail="批量请求数量不能超过10个")

    loop = asyncio.get_running_loop()
    total = len(requests)
    results = []
    for i, request in enumerate(requests):
        try:
            logger.info(f"批量合成 {i+1}/{total}: '{request.text[:30]}...'")
            audio_path = await loop.run_in_executor(
                None, tts_engine.synthesize_speech, request.text, request.character
            )
            if not audio_path:
                results.append({"success": False, "index": i, "error": "合成失败"})
                continue

            # Duration is best-effort. Catch Exception rather than using a
            # bare except so task cancellation still propagates.
            try:
                duration = await loop.run_in_executor(
                    None, tts_engine.get_audio_duration, audio_path
                )
            except Exception as exc:
                logger.warning(f"获取音频时长失败: {exc}")
                duration = 0.0

            filename = os.path.basename(audio_path)
            results.append({
                "success": True,
                "index": i,
                "audio_url": f"/audio/{filename}",
                "duration": duration,
                "character": request.character,
            })
        except Exception as e:
            logger.error(f"批量合成第{i+1}项失败: {e}")
            results.append({"success": False, "index": i, "error": str(e)})

    return BatchTTSResponse(
        success=True,
        results=results,
        total=total,
        completed=sum(1 for r in results if r.get("success")),
        timestamp=datetime.now().isoformat(),
    )
| # ==================== 语音克隆路由 ==================== | |
async def analyze_reference_audio(audio_url: str = Query(..., description="参考音频文件路径")):
    """Analyze a reference audio file and return the analysis result.

    Args:
        audio_url: Path of the reference audio file; it is checked with
            ``os.path.exists`` and passed to ``AudioAnalyzer.analyze_audio``.

    Returns:
        The value of ``create_success_response`` carrying the analysis under
        the ``"analysis"`` key.

    Raises:
        HTTPException: 503 when the voice cloning service is unavailable; 400
            when the path does not exist or the analysis reports an
            ``"error"``; 500 on unexpected errors.
    """
    if not voice_cloner:
        raise HTTPException(status_code=503, detail="语音克隆服务不可用")

    try:
        # NOTE(review): audio_url is a client-supplied filesystem path that is
        # probed and read on the server; consider restricting it to an
        # allowed directory.
        if not os.path.exists(audio_url):
            raise HTTPException(status_code=400, detail="音频文件路径不存在")

        analysis = AudioAnalyzer.analyze_audio(audio_url)
        if "error" in analysis:
            raise HTTPException(status_code=400, detail=analysis["error"])

        return create_success_response("音频分析完成", {"analysis": analysis})
    except HTTPException:
        # Keep the 400 responses raised above; without this clause the
        # generic handler below would turn them into 500 errors.
        raise
    except Exception as e:
        logger.error(f"音频分析失败: {e}")
        raise HTTPException(status_code=500, detail=f"分析失败: {str(e)}")
async def create_cloned_voice(request: VoiceCloneRequest):
    """Create a cloned voice from a reference audio file.

    Args:
        request: Carries ``voice_name``, ``reference_audio_url``,
            ``reference_text`` and ``description``, which are all passed on
            to ``voice_cloner.create_cloned_voice``.

    Returns:
        The value of ``create_success_response`` with the cloner's message
        and the new voice name.

    Raises:
        HTTPException: 503 when the voice cloning service is unavailable; 400
            when the name already exists, the reference file is missing, or
            the cloner reports failure; 500 on unexpected errors.
    """
    if not voice_cloner:
        raise HTTPException(status_code=503, detail="语音克隆服务不可用")

    try:
        name = request.voice_name
        # Voice names must be unique.
        if name in voice_cloner.cloned_voices:
            raise HTTPException(status_code=400, detail=f"声音名称 '{name}' 已存在")

        reference_path = request.reference_audio_url
        if not os.path.exists(reference_path):
            raise HTTPException(status_code=400, detail="参考音频文件不存在")

        # Run the cloning operation in the thread pool.
        clone_args = (
            name,
            reference_path,
            request.reference_text,
            request.description,
        )
        loop = asyncio.get_event_loop()
        success, message = await loop.run_in_executor(
            None, voice_cloner.create_cloned_voice, *clone_args
        )

        if not success:
            raise HTTPException(status_code=400, detail=message)
        return create_success_response(message, {"voice_name": name})
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"创建克隆声音失败: {e}")
        raise HTTPException(status_code=500, detail=f"创建失败: {str(e)}")
async def list_cloned_voices():
    """List all cloned voices known to the voice cloner.

    Returns:
        The value of ``create_success_response`` with the voice info under
        ``"cloned_voices"`` and their number under ``"count"``.

    Raises:
        HTTPException: 503 when the voice cloning service is unavailable;
            500 when retrieving the list fails.
    """
    if not voice_cloner:
        raise HTTPException(status_code=503, detail="语音克隆服务不可用")

    try:
        voices = voice_cloner.get_cloned_voices_info()
        count = len(voices)
        payload = {"cloned_voices": voices, "count": count}
        return create_success_response(f"获取到 {count} 个克隆声音", payload)
    except Exception as e:
        logger.error(f"获取克隆声音列表失败: {e}")
        raise HTTPException(status_code=500, detail=f"获取列表失败: {str(e)}")
async def synthesize_with_cloned_voice(
    voice_name: str = Query(..., description="克隆声音名称"),
    text: str = Query(..., description="要合成的文本", min_length=1, max_length=1000)
):
    """Synthesize speech for ``text`` using a previously cloned voice.

    The blocking cloner calls are run in the event loop's default executor.

    Args:
        voice_name: Name of the cloned voice; must be a key of
            ``voice_cloner.cloned_voices``.
        text: Text to synthesize.

    Returns:
        The value of ``create_success_response`` with ``audio_url``
        (``/audio/<basename of the generated file>``), ``duration`` (``0.0``
        when it could not be determined) and ``voice_name``.

    Raises:
        HTTPException: 503 when the voice cloning service is unavailable; 404
            when the voice does not exist; 500 when the cloner returns no
            audio path or an unexpected error occurs.
    """
    if not voice_cloner:
        raise HTTPException(status_code=503, detail="语音克隆服务不可用")

    if voice_name not in voice_cloner.cloned_voices:
        available_voices = list(voice_cloner.cloned_voices.keys())
        raise HTTPException(
            status_code=404,
            detail=f"克隆声音 '{voice_name}' 不存在。可用声音: {available_voices}"
        )

    loop = asyncio.get_running_loop()
    try:
        logger.info(f"克隆声音合成: '{text[:50]}...' 声音: {voice_name}")

        # Run the synthesis in the thread pool.
        audio_path = await loop.run_in_executor(
            None, voice_cloner.synthesize_with_cloned_voice, voice_name, text
        )
        if not audio_path:
            raise HTTPException(status_code=500, detail="克隆声音合成失败")

        # Duration is best-effort. Catch Exception rather than using a bare
        # except so task cancellation (a BaseException) still propagates.
        try:
            duration = await loop.run_in_executor(
                None, voice_cloner.tts_engine.get_audio_duration, audio_path
            )
        except Exception as exc:
            logger.warning(f"获取音频时长失败: {exc}")
            duration = 0.0

        # Build the access URL from the generated file's name.
        filename = os.path.basename(audio_path)
        return create_success_response(
            "克隆声音合成成功",
            {
                "audio_url": f"/audio/{filename}",
                "duration": duration,
                "voice_name": voice_name,
            },
        )
    except HTTPException:
        # Deliberate HTTP errors must reach the client unchanged instead of
        # being re-wrapped by the generic handler below.
        raise
    except Exception as e:
        logger.error(f"克隆声音合成失败: {e}")
        raise HTTPException(status_code=500, detail=f"合成失败: {str(e)}")
async def remove_cloned_voice(voice_name: str):
    """Remove a cloned voice by name.

    Args:
        voice_name: Name passed to ``voice_cloner.remove_cloned_voice``.

    Returns:
        The value of ``create_success_response`` confirming the removal.

    Raises:
        HTTPException: 503 when the voice cloning service is unavailable; 404
            when the cloner reports that nothing was removed; 500 on
            unexpected errors.
    """
    if not voice_cloner:
        raise HTTPException(status_code=503, detail="语音克隆服务不可用")

    try:
        removed = voice_cloner.remove_cloned_voice(voice_name)
        if not removed:
            raise HTTPException(status_code=404, detail=f"克隆声音 '{voice_name}' 不存在")
        return create_success_response(f"成功移除克隆声音: {voice_name}")
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"移除克隆声音失败: {e}")
        raise HTTPException(status_code=500, detail=f"移除失败: {str(e)}")
| # ==================== 角色管理路由 ==================== | |
async def get_characters():
    """Return the data of every predefined character from the configuration.

    Returns:
        The value of ``create_success_response`` with the character data list
        and its counts; ``custom_count`` is always 0 (not supported yet).

    Raises:
        HTTPException: 500 when the character list cannot be built.
    """
    try:
        from config import AVAILABLE_CHARACTERS

        characters_info = [
            char_data for _char_id, char_data in AVAILABLE_CHARACTERS.items()
        ]
        count = len(characters_info)
        payload = {
            "characters": characters_info,
            "predefined_count": count,
            "custom_count": 0,
            "total_count": count,
        }
        return create_success_response(f"获取到 {count} 个预定义角色", payload)
    except Exception as e:
        logger.error(f"获取角色列表失败: {e}")
        raise HTTPException(status_code=500, detail="获取角色列表失败")
| # ==================== 音频文件路由 ==================== | |
async def get_audio_file(filename: str):
    """Serve a generated audio file from the system temporary directory.

    Args:
        filename: Bare file name (no directory components) ending in ``.wav``
            or ``.mp3``.

    Returns:
        FileResponse for the file, typed ``audio/wav`` for ``.wav`` names and
        ``audio/mpeg`` otherwise.

    Raises:
        HTTPException: 404 when the name is not a plain ``.wav``/``.mp3`` file
            name or no such regular file exists; 500 on unexpected errors.
    """
    try:
        # Accept bare file names only: os.path.join would otherwise let an
        # absolute path or a ".." segment escape the audio directory.
        is_bare_name = filename == os.path.basename(filename)
        if not is_bare_name or not filename.endswith(('.wav', '.mp3')):
            raise HTTPException(status_code=404, detail="音频文件不存在")

        audio_dir = tempfile.gettempdir()
        file_path = os.path.join(audio_dir, filename)
        # isfile (not exists) so a directory with an audio-like name is
        # rejected here rather than failing while the response is sent.
        if not os.path.isfile(file_path):
            raise HTTPException(status_code=404, detail="音频文件不存在")

        media_type = "audio/wav" if filename.endswith('.wav') else "audio/mpeg"
        return FileResponse(file_path, media_type=media_type, filename=filename)
    except HTTPException:
        # Keep the 404 raised above; without this clause the generic handler
        # below would report a missing file as a 500 error.
        raise
    except FileNotFoundError:
        raise HTTPException(status_code=404, detail="音频文件不存在")
    except Exception as e:
        logger.error(f"获取音频文件失败: {e}")
        raise HTTPException(status_code=500, detail="获取音频文件失败")