Spaces:
Runtime error
Runtime error
| # import whisper | |
| # model = whisper.load_model("medium") | |
| # options = whisper.DecodingOptions(language="spanish", fp16=False) | |
| # result = model.transcribe("audio2.mp3", decode_options=options) | |
| # print(result["text"]) | |
| import torch | |
| from transformers import pipeline | |
| from fastapi import BackgroundTasks, FastAPI | |
| from fastapi.responses import RedirectResponse | |
| from azure.storage.blob import BlobClient, ContentSettings | |
| STORAGEACCOUNTURL = "https://callreviewer.blob.core.windows.net" | |
| STORAGEACCOUNTKEY = "vXq0X89zOaQxQmv7UBGFqqa61V0FRE6Gx1TgJvbtxZn5zLJ1ETc9aGDbbotuSoQzf5ob9QTuXlof+AStdHXOpA==" | |
| CONTAINERNAME = "default" | |
| BLOBNAME = "audio.mp3" | |
| MODEL_NAME = "openai/whisper-large-v2" | |
| device = 0 if torch.cuda.is_available() else "cpu" | |
| pipe = pipeline( | |
| task="automatic-speech-recognition", | |
| model=MODEL_NAME, | |
| chunk_length_s=30, | |
| device=device, | |
| ) | |
| all_special_ids = pipe.tokenizer.all_special_ids | |
| transcribe_token_id = all_special_ids[-5] | |
| translate_token_id = all_special_ids[-6] | |
| app = FastAPI() | |
| def transcribe_task(): | |
| try: | |
| print("For processing...") | |
| blob = BlobClient(account_url=STORAGEACCOUNTURL,container_name=CONTAINERNAME, blob_name=BLOBNAME,credential=STORAGEACCOUNTKEY) | |
| blob.set_http_headers(ContentSettings(content_type='audio/mp3')) | |
| with open('audio22.mp3', "wb") as file: | |
| file.write(blob.download_blob().readall()) | |
| pipe.model.config.forced_decoder_ids = [[2, transcribe_token_id]] | |
| print("Call pipeline...") | |
| res = pipe('audio22.mp3', return_timestamps=True) | |
| print(res['text']) | |
| with open('new_file.txt', "w") as file: | |
| file.write(res['text']) | |
| except Exception as e: | |
| with open('new_file.txt', "w") as file: | |
| file.write(str(e)) | |
| async def transcribe(background_tasks: BackgroundTasks): | |
| background_tasks.add_task(transcribe_task) | |
| return {"text": "Processing file..."} | |
| def get_text(): | |
| file = open('new_file.txt', 'r') | |
| content = file.read() | |
| file.close() | |
| return {"text": content} | |
| async def redirect_to_docs(): | |
| return RedirectResponse(url="/docs") |