File size: 2,725 Bytes
157a15c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
from fastapi import FastAPI, HTTPException
from fastapi.responses import Response
from pydantic import BaseModel
from kokoro import KPipeline

import soundfile as sf
import torch
import os
import uuid
import numpy as np
import io
from typing import Optional



# Shared Kokoro pipeline, constructed once at import time and reused by the /tts handler.
# NOTE(review): lang_code='a' presumably selects American English — confirm against Kokoro's docs.
pipeline = KPipeline(lang_code='a')
# Application object on which the route decorators below register their handlers.
app = FastAPI(title="Text to Speech API")

class TextToSpeechRequest(BaseModel):
    """JSON request body accepted by the POST /tts endpoint."""
    # Text to synthesize (required field).
    text: str
    # Language code; defaults to "en". NOTE(review): not read by the /tts handler in this file.
    language: Optional[str] = "en"
    # Slow-speech flag; defaults to False. NOTE(review): not read by the /tts handler in this file.
    slow: Optional[bool] = False



def tensor_to_audio_bytes(audio_tensor: torch.Tensor, sample_rate: int = 24000) -> bytes:
    """Encode an audio tensor as an in-memory WAV file.

    Args:
        audio_tensor: Audio samples shaped ``(samples,)`` for mono or
            ``(channels, samples)`` for multi-channel audio.
        sample_rate: Sample rate in Hz passed to the WAV writer.
            Defaults to 24000.

    Returns:
        The complete WAV file contents as bytes.

    Raises:
        ValueError: If ``audio_tensor`` is not 1-D or 2-D (this includes
            0-D scalar tensors).
    """
    # Detach from autograd and move to host memory before converting to numpy.
    audio_np = audio_tensor.detach().cpu().numpy()

    # Validate the rank up front so scalars fail with the same clear error as
    # >2-D input instead of an obscure failure inside the WAV writer.
    if audio_np.ndim not in (1, 2):
        raise ValueError(f"Expected 1D or 2D tensor, got shape {audio_np.shape}")

    # The writer is handed (samples, channels): mono becomes a single column,
    # channel-first input is transposed.
    if audio_np.ndim == 1:
        frames = audio_np.reshape(-1, 1)
    else:
        frames = audio_np.T

    # The context manager releases the buffer even if the write raises.
    with io.BytesIO() as buffer:
        sf.write(buffer, frames, sample_rate, format='WAV')
        return buffer.getvalue()

@app.post("/tts")
async def text_to_speech(request: TextToSpeechRequest):
    """Synthesize speech for the request text and return it as a WAV download.

    Every audio segment yielded by the pipeline is kept and joined in order,
    so input that produces several segments is returned in full rather than
    only its final segment.

    Note: ``request.language`` and ``request.slow`` are accepted in the body
    but are not used here; the voice is fixed to ``'af_heart'``.

    Args:
        request: Parsed JSON body carrying the text to synthesize.

    Returns:
        Response: ``audio/wav`` payload served as the attachment
        ``speech.wav``.

    Raises:
        HTTPException: 400 if the text is blank or produces no audio;
            500 if synthesis or encoding fails.
    """
    # Reject blank input early: it is a client error, not a server failure.
    if not request.text.strip():
        raise HTTPException(status_code=400, detail="Text must not be empty")

    # NOTE(review): synthesis runs inline in this coroutine; whether it can be
    # moved to a worker thread depends on the pipeline's thread-safety — confirm.
    try:
        # Each item yielded by the pipeline is a 3-tuple whose last element is
        # the audio for one segment; keep them all, in order.
        chunks = [
            audio
            for _, _, audio in pipeline(request.text, voice='af_heart')
            if audio is not None
        ]
        if not chunks:
            raise HTTPException(
                status_code=400,
                detail="No audio was generated for the given text",
            )

        # Join the segments along the sample axis into one continuous clip.
        audio_tensor = chunks[0] if len(chunks) == 1 else torch.cat(chunks, dim=-1)
        audio_bytes = tensor_to_audio_bytes(audio_tensor)
    except HTTPException:
        # Deliberate HTTP errors must not be rewrapped as 500s below.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e

    # Return audio bytes directly with appropriate headers
    return Response(
        content=audio_bytes,
        media_type="audio/wav",
        headers={
            "Content-Disposition": "attachment; filename=speech.wav"
        },
    )

@app.get("/")
async def root():
    """Return a short usage message describing the POST /tts endpoint."""
    # Adjacent literals are concatenated at compile time into one message.
    usage = (
        "Welcome to the Text to Speech API. "
        "Use POST /tts to convert text to speech. "
        "the body should be a json with the following fields: "
        "{'text': 'text to convert to speech', "
        "'language': 'language code (optional, default is en)', "
        "'slow': 'boolean (optional, default is False)'}"
    )
    return dict(message=usage)

# Script entry point: start a uvicorn server for `app` when this file is run directly.
if __name__ == "__main__":
    # Imported here so uvicorn is only needed when running as a script.
    import uvicorn
    # Binds to all interfaces (0.0.0.0) on port 8000.
    uvicorn.run(app, host="0.0.0.0", port=8000)