| from fastapi import FastAPI, Query |
| from fastapi.staticfiles import StaticFiles |
| from kokoro import KPipeline |
| import soundfile as sf |
| from pydub import AudioSegment |
| import os |
| from fastapi.responses import JSONResponse |
| from pydantic import BaseModel |
| import uuid |
| import time |
# Generated audio is written to ./static and served back under /static.
# Create the directory up front so it already exists when it is mounted.
os.makedirs("static", exist_ok=True)

# ASGI application object; uvicorn serves this when the module is run directly.
app = FastAPI()
app.mount(
    "/static",
    StaticFiles(directory="static"),
    name="static",
)
|
|
class AudioResponse(BaseModel):
    """Response body returned by the ``/generate_audio/`` endpoint."""

    # URL path (under the /static mount) of the generated WAV file.
    audio_url: str
    # Elapsed time, in seconds, spent handling the request.
    time: float
| |
# Build the TTS pipeline once at import time and share it across requests.
# NOTE(review): lang_code 'z' is presumably Kokoro's Mandarin Chinese code
# (the default voice below is "zf_xiaoxiao") -- confirm against Kokoro's docs.
pipeline = KPipeline(lang_code="z")
# Sample rate (Hz) used when writing each synthesized segment to WAV.
_SAMPLE_RATE = 24000


@app.get("/generate_audio/", response_model=AudioResponse)
async def generate_audio(
    text: str = Query(..., description="输入要转换为语音的文本"),
    voice: str = Query("zf_xiaoxiao", description="选择语音类型"),
):
    """Synthesize ``text`` to speech and return the URL of the combined WAV.

    The pipeline is called with ``split_pattern=r'\\n+'`` and yields one audio
    segment per chunk. Each segment is written to a scratch WAV file, loaded
    with pydub, and appended to a single track, which is then exported to the
    ``static`` directory under a random, collision-free file name.

    Args:
        text: Text to convert to speech (required query parameter).
        voice: Voice identifier forwarded to the pipeline.

    Returns:
        A dict with ``audio_url`` (path of the generated file under
        ``/static``) and ``time`` (elapsed seconds for the whole request).
    """
    # Function-scope import so this block does not depend on edits to the
    # file's top-level import list.
    import tempfile

    # NOTE(review): this handler is ``async`` but contains no awaits, so the
    # synthesis and file I/O below occupy the event loop for the whole request.
    # Switching to a plain ``def`` (threadpool) needs the pipeline to be
    # thread-safe -- confirm before changing.

    # perf_counter is monotonic, so the reported duration cannot be skewed
    # (or go negative) if the system clock is adjusted mid-request.
    started = time.perf_counter()

    combined_audio = AudioSegment.empty()

    # Intermediate segment files live in a private per-request directory:
    # - names can no longer collide between workers sharing ./static,
    # - partial files are never exposed through the public /static mount,
    # - the directory is removed even if synthesis raises part-way through.
    with tempfile.TemporaryDirectory() as scratch_dir:
        segments = pipeline(text, voice=voice, speed=1, split_pattern=r'\n+')
        for index, (_, _, audio) in enumerate(segments):
            segment_path = os.path.join(scratch_dir, f"segment_{index}.wav")
            sf.write(segment_path, audio, _SAMPLE_RATE)
            combined_audio += AudioSegment.from_wav(segment_path)

    unique_filename = f"audio_{uuid.uuid4().hex}.wav"
    combined_filepath = os.path.join("static", unique_filename)

    # export() hands back the file object it wrote to; close it explicitly
    # rather than relying on garbage collection to release the handle.
    combined_audio.export(combined_filepath, format="wav").close()

    elapsed = time.perf_counter() - started
    return {"audio_url": f"/static/{unique_filename}", "time": elapsed}
|
|
if __name__ == "__main__":
    # Imported here so uvicorn is only needed when this file is run as a
    # script, not when the module is imported by another ASGI server.
    from uvicorn import run as serve

    # Listen on every network interface, port 8000.
    serve(app, host="0.0.0.0", port=8000)
|
|