Spaces:
Sleeping
Sleeping
| import time | |
| from scipy.io.wavfile import write | |
| # from typing import Union | |
| # from pydantic import BaseModel | |
| from fastapi import FastAPI | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.responses import FileResponse | |
| # from fastapi.staticfiles import StaticFiles | |
| # from fastapi.responses import FileResponse | |
| import torch | |
| # from transformers import pipeline | |
| from transformers import SeamlessM4Tv2Model | |
| from transformers import AutoProcessor | |
# Checkpoint served by this app. An alternative that was previously tried is
# "facebook/hf-seamless-m4t-medium".
model_name = "facebook/seamless-m4t-v2-large"

# Use the first CUDA device when one is visible, otherwise fall back to CPU.
device = "cuda:0" if torch.cuda.is_available() else "cpu"

# Load the processor and the model once at import time; the model is moved to
# `device` right after loading.
processor = AutoProcessor.from_pretrained(model_name)
model = SeamlessM4Tv2Model.from_pretrained(model_name).to(device)

# Interactive API docs are served under /api/docs.
app = FastAPI(docs_url="/api/docs")

# NOTE(review): wildcard origins/methods/headers are combined with
# allow_credentials=True. The FastAPI CORS documentation says wildcards must
# not be used when credentials are allowed -- confirm this is intended.
app.add_middleware(
    CORSMiddleware,
    allow_credentials=True,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

# Not referenced in the visible part of this file.
BATCH_SIZE = 8
def getDevice():
    """Return the module-level ``device`` string.

    Also prints the elapsed wall-clock time measured inside the call, using
    the same log line as the other request handlers in this file.
    """
    began_at = time.time()
    elapsed = time.time() - began_at
    print("Time took to process the request and return response is {} sec".format(
        elapsed))
    return device
def transcribe(inputs, src_lang="eng", tgt_lang="por"):
    """Translate text from ``src_lang`` into ``tgt_lang`` and return the text.

    Args:
        inputs: Text to translate; forwarded to the processor as ``text=``.
        src_lang: Source language code passed to the processor.
        tgt_lang: Target language code passed to ``model.generate``.

    Returns:
        The decoded translation with special tokens stripped.

    Raises:
        ValueError: If ``inputs`` is ``None``.
    """
    start_time = time.time()
    if inputs is None:
        # Python 3 cannot raise a plain string (it fails with TypeError), so
        # raise a real exception. The message names text because this
        # function never receives an audio file.
        raise ValueError(
            "No text submitted! Please provide the text to translate "
            "before submitting your request.")

    text_inputs = processor(
        text=inputs, src_lang=src_lang, return_tensors="pt").to(device)

    # generate_speech=False asks the model for text tokens only.
    output_tokens = model.generate(
        **text_inputs, tgt_lang=tgt_lang, generate_speech=False)

    # First element of the output, then the first sequence in it.
    translated_text_from_text = processor.decode(
        output_tokens[0].tolist()[0], skip_special_tokens=True)

    print("Time took to process the request and return response is {} sec".format(
        time.time() - start_time))
    return translated_text_from_text
async def audio(inputs, src_lang="eng", tgt_lang="por", speaker_id=5):
    """Translate text into speech and return it as a WAV file response.

    Args:
        inputs: Text to translate; forwarded to the processor as ``text=``.
        src_lang: Source language code passed to the processor.
        tgt_lang: Target language code passed to ``model.generate``.
        speaker_id: Voice selector; coerced with ``int()`` before use.

    Returns:
        A ``FileResponse`` serving the generated ``.wav`` file.

    Raises:
        ValueError: If ``inputs`` is ``None``.
    """
    start_time = time.time()
    if inputs is None:
        # Python 3 cannot raise a plain string (it fails with TypeError), so
        # raise a real exception. The message names text because this
        # function never receives an audio file.
        raise ValueError(
            "No text submitted! Please provide the text to synthesize "
            "before submitting your request.")

    text_inputs = processor(
        text=inputs, src_lang=src_lang, return_tensors="pt").to(device)

    # Take the first element of the generate() output, move it to host
    # memory and drop singleton dimensions before writing it out.
    generated = model.generate(
        **text_inputs, tgt_lang=tgt_lang, speaker_id=int(speaker_id))
    audio_array_from_text = generated[0].cpu().numpy().squeeze()

    print("Time took to process the request and return response is {} sec".format(
        time.time() - start_time))
    print(f"sampling_rate {model.config.sampling_rate}")

    # Build the path once so the file written and the file served cannot
    # drift apart.
    # NOTE(review): nothing in this function deletes the file afterwards.
    output_path = f"/tmp/output{start_time}.wav"
    write(output_path, model.config.sampling_rate, audio_array_from_text)

    # The payload is a WAV file, so advertise it as WAV rather than MPEG.
    return FileResponse(output_path, media_type="audio/wav")