"""Small FastAPI service that converts text to WAV speech with Kokoro."""

import io
import os
import uuid
from typing import Optional

import numpy as np
import soundfile as sf
import torch
from fastapi import FastAPI, HTTPException
from fastapi.responses import Response
from kokoro import KPipeline
from pydantic import BaseModel

# Built once at import time so every request reuses the same pipeline.
# NOTE(review): 'a' is presumably Kokoro's American-English code -- confirm.
pipeline = KPipeline(lang_code='a')

app = FastAPI(title="Text to Speech API")


class TextToSpeechRequest(BaseModel):
    """JSON body accepted by ``POST /tts``."""

    # Text to synthesize.
    text: str
    # NOTE(review): `language` and `slow` are accepted and advertised by the
    # root endpoint, but the /tts handler does not read them; the pipeline
    # language and voice are fixed in this module.
    language: Optional[str] = "en"
    slow: Optional[bool] = False


def tensor_to_audio_bytes(audio_tensor: torch.Tensor, sample_rate: int = 24000) -> bytes:
    """Encode a float audio tensor as an in-memory WAV file.

    Args:
        audio_tensor: Audio samples shaped ``(samples,)`` or
            ``(channels, samples)``.
        sample_rate: Sample rate in Hz passed to the WAV writer.

    Returns:
        The complete WAV file contents.

    Raises:
        ValueError: If the tensor is not 1-D or 2-D.
    """
    # Detach from autograd and move to host memory before handing to numpy.
    samples = audio_tensor.detach().cpu().numpy()

    if samples.ndim == 1:
        # Promote mono (samples,) to (1, samples) so both layouts share a path.
        samples = samples.reshape(1, -1)
    elif samples.ndim != 2:
        raise ValueError(f"Expected 1D or 2D tensor, got shape {samples.shape}")

    # The context manager releases the buffer even if the writer raises.
    with io.BytesIO() as buffer:
        # soundfile expects (frames, channels), hence the transpose.
        sf.write(buffer, samples.T, sample_rate, format='WAV')
        return buffer.getvalue()


@app.post("/tts")
async def text_to_speech(request: TextToSpeechRequest):
    """Synthesize ``request.text`` and return it as a WAV attachment.

    The pipeline may yield several audio chunks for one input; all of them
    are concatenated in order so the response contains the whole utterance
    rather than only the final chunk.

    Raises:
        HTTPException: 400 if the text is blank; 500 if synthesis fails or
            produces no audio.
    """
    if not request.text.strip():
        raise HTTPException(status_code=400, detail="text must not be empty")

    try:
        # Each item is (graphemes, phonemes, audio); only the audio is needed.
        # as_tensor lets the concatenation work for tensors and arrays alike.
        chunks = [
            torch.as_tensor(audio)
            for _, _, audio in pipeline(request.text, voice='af_heart')
            if audio is not None
        ]
        if not chunks:
            raise HTTPException(
                status_code=500,
                detail="No audio was generated for the given text",
            )

        # Join along the sample axis (the last one for both 1-D and 2-D input).
        audio_tensor = chunks[0] if len(chunks) == 1 else torch.cat(chunks, dim=-1)
        audio_bytes = tensor_to_audio_bytes(audio_tensor)
    except HTTPException:
        # Deliberate HTTP errors must not be rewrapped by the handler below.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e

    # Return audio bytes directly with appropriate headers.
    return Response(
        content=audio_bytes,
        media_type="audio/wav",
        headers={
            "Content-Disposition": "attachment; filename=speech.wav"
        },
    )


@app.get("/")
async def root():
    """Return a short usage message describing the ``/tts`` endpoint."""
    return {
        "message": (
            "Welcome to the Text to Speech API. "
            "Use POST /tts to convert text to speech. \n"
            "the body should be a json with the following fields: "
            "{'text': 'text to convert to speech', "
            "'language': 'language code (optional, default is en)', "
            "'slow': 'boolean (optional, default is False)'}"
        )
    }


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)