Spaces:
Sleeping
Sleeping
| import torch | |
| import torchaudio | |
| from transformers import AutoModelForSpeechSeq2Seq, AutoProcessor, pipeline | |
| from datasets import load_dataset | |
| from googletrans import Translator | |
| from fastapi import FastAPI, File, UploadFile, HTTPException | |
| from fastapi.responses import JSONResponse | |
| from pathlib import Path | |
| import numpy as np | |
# --- Application and model setup (executed once, at import time) ---
app = FastAPI()

# Use the first CUDA device with half precision when a GPU is available;
# otherwise run on the CPU with full precision.
if torch.cuda.is_available():
    device, torch_dtype = "cuda:0", torch.float16
else:
    device, torch_dtype = "cpu", torch.float32

# Load the Whisper checkpoint and move it to the selected device.
model_id = "openai/whisper-large-v3"
model = AutoModelForSpeechSeq2Seq.from_pretrained(
    model_id,
    torch_dtype=torch_dtype,
    low_cpu_mem_usage=True,
    use_safetensors=True,
)
model.to(device)

# The processor supplies both the tokenizer and the feature extractor
# that the pipeline below is built from.
processor = AutoProcessor.from_pretrained(model_id)

# Shared speech-recognition pipeline used by the request handler.
pipe = pipeline(
    "automatic-speech-recognition",
    model=model,
    tokenizer=processor.tokenizer,
    feature_extractor=processor.feature_extractor,
    max_new_tokens=256,
    chunk_length_s=30,
    batch_size=16,
    return_timestamps=True,
    torch_dtype=torch_dtype,
    device=device,
)

# NOTE(review): `dataset` is not referenced by any code visible in this
# file - confirm whether this load is still needed at startup.
dataset = load_dataset(
    "distil-whisper/librispeech_long",
    "clean",
    split="validation",
)
async def process_audio(file: UploadFile = File(...)):
    """Transcribe an uploaded audio file and translate the transcript.

    The upload is saved under ``/home/user``, decoded with torchaudio and run
    through the module-level Whisper ``pipe`` twice: once with default
    settings to obtain the transcript, and once with ``task="translate"`` to
    obtain Whisper's translated text. That translated text is then passed
    through ``detect_google`` / ``translate_google`` (target ``"ID"``) and
    finally ``modify_text``.

    Args:
        file: The uploaded audio file.

    Returns:
        A ``JSONResponse`` with status 200 and the body
        ``{"response": {"jp_text": ..., "en_text": ..., "id_text": ...}}``.
        ``id_text`` is ``None`` when the Google translation step fails.

    Raises:
        HTTPException: 400 when the upload has no usable file name; 500
            (with the underlying error message) when any processing step
            fails.
    """
    # The file name is client-supplied: keep only its final path component so
    # that values such as "../../etc/passwd" cannot escape the save directory.
    safe_name = Path(file.filename or "").name
    if safe_name in ("", ".", ".."):
        raise HTTPException(status_code=400, detail="Error: invalid file name")

    try:
        # Persist the upload so torchaudio can decode it from disk.
        save_directory = Path("/home/user")
        save_directory.mkdir(parents=True, exist_ok=True)
        file_location = save_directory / safe_name
        # Awaiting UploadFile.read() avoids a blocking read in the handler.
        file_location.write_bytes(await file.read())

        # Decode the audio; only the first channel is used.
        waveform, sample_rate = torchaudio.load(file_location, normalize=True)
        samples = waveform[0]

        # A raw array handed to the pipeline is taken to already be at the
        # feature extractor's sampling rate, so resample when the file differs.
        target_rate = pipe.feature_extractor.sampling_rate
        if sample_rate != target_rate:
            samples = torchaudio.functional.resample(
                samples, sample_rate, target_rate
            )
        audio_array = samples.numpy()

        # Transcript from the default pipeline call (returned as "jp_text").
        original_version = pipe(audio_array)["text"]

        # Whisper's own translation of the audio (returned as "en_text").
        hasil = pipe(audio_array, generate_kwargs={"task": "translate"})["text"]

        # Google translation to "ID". When language detection fails, let
        # googletrans auto-detect instead of sending the literal "None".
        detect = detect_google(hasil)
        id_ver = translate_google(hasil, detect or "auto", "ID")

        # translate_google returns None on failure; only post-process real
        # text so a translation outage does not discard the other results.
        if id_ver is not None:
            id_ver = modify_text(id_ver)

        return JSONResponse(
            content={
                "response": {
                    "jp_text": original_version,
                    "en_text": hasil,
                    "id_text": id_ver,
                }
            },
            status_code=200,
        )
    except Exception as e:
        # HTTPException must be raised, not returned, for FastAPI to turn it
        # into an error response.
        raise HTTPException(status_code=500, detail=f"Error: {e}") from e
def detect_google(text):
    """Detect the language of ``text`` with a googletrans ``Translator``.

    Args:
        text: The text whose language should be detected.

    Returns:
        The detected language code in upper case, or ``None`` if any step
        of the detection raises (the error is printed to stdout).
    """
    try:
        # A fresh Translator is created for every call.
        return Translator().detect(text).lang.upper()
    except Exception as err:
        # Best-effort helper: report the problem and signal failure with None.
        print(f"Error detect: {err}")
        return None
def translate_google(text, source, target):
    """Translate ``text`` with a googletrans ``Translator``.

    Args:
        text: The text to translate.
        source: Source language, forwarded as ``src``.
        target: Destination language, forwarded as ``dest``.

    Returns:
        The translated text, or ``None`` if any step of the translation
        raises (the error is printed to stdout).
    """
    try:
        # A fresh Translator is created for every call.
        outcome = Translator().translate(text, src=source, dest=target)
        return outcome.text
    except Exception as err:
        # Best-effort helper: report the problem and signal failure with None.
        print(f"Error translate: {err}")
        return None
def modify_text(text):
    """Apply fixed, case-sensitive wording substitutions to ``text``.

    Every occurrence of each key in the replacement table is replaced,
    including occurrences inside longer words (plain substring replacement).

    Args:
        text: The text to post-process, or ``None``.

    Returns:
        The text with all substitutions applied, or ``None`` when ``text``
        is ``None``.
    """
    # translate_google signals failure with None; pass that through instead
    # of raising AttributeError on None.replace(...).
    if text is None:
        return None

    # Additional modifications, case-sensitive
    replacements = {
        "Tuan": "Master",
        "tuan": "Master",
        "Guru": "Master",
        "guru": "Master",
        "Monica": "Monika",
        "monica": "Monika",
    }
    for original, replacement in replacements.items():
        text = text.replace(original, replacement)
    return text