# Simranjit's picture
# Update app.py
# 21ba187 verified
import requests
import os
from scipy.io.wavfile import write
import gradio as gr
import numpy as np
import uuid
#import boto3
import datetime
import time
# access_key = os.environ.get('access_key', None)
# secret_access_key = os.environ.get('secret_access_key', None)
# session = boto3.Session(
# aws_access_key_id=access_key,
# aws_secret_access_key=secret_access_key,
# )
# s3 = session.resource('s3')
# BUCKET = "audio-text-938"
# Access token for the inference endpoint, read from the ``token_hf``
# environment variable (None when the variable is not set).
token_hf = os.getenv('token_hf')

# URL that the recorded audio is POSTed to.
API_URL = "https://tfugbov5t776omzd.us-east-1.aws.endpoints.huggingface.cloud"

# Headers sent with every request: JSON is expected back, WAV bytes are sent.
headers = {
    "Accept": "application/json",
    "Authorization": f"Bearer {token_hf}",
    "Content-Type": "audio/wav",
}
def query(data, timeout=60):
    """Upload the recording stored in ``test.wav`` to the transcription endpoint.

    Args:
        data: Unused. Kept so existing callers keep working; the bytes that
            are actually uploaded are read from ``test.wav`` in the current
            working directory.
        timeout: Seconds to wait for the endpoint before giving up.

    Returns:
        The decoded JSON body of the endpoint's response.

    Raises:
        requests.HTTPError: If the endpoint answers with a 4xx/5xx status.
        requests.Timeout: If the endpoint does not answer within ``timeout``.
    """
    with open("test.wav", "rb") as wav_file:
        payload = wav_file.read()
    # requests waits indefinitely unless a timeout is given, which would
    # block the calling callback forever on a stalled endpoint.
    response = requests.post(
        API_URL, headers=headers, data=payload, timeout=timeout
    )
    # Fail here with the HTTP status instead of letting an error payload
    # reach callers that index the result.
    response.raise_for_status()
    return response.json()
def greet(audio):
    """Transcribe an audio clip through the remote endpoint.

    Args:
        audio: Indexable whose item 0 is the sample rate and item 1 the
            sample array, in the order ``scipy.io.wavfile.write`` expects.

    Returns:
        The ``"text"`` entry of the endpoint's JSON reply.
    """
    sample_rate = audio[0]
    samples = audio[1]
    # The clip is handed to ``query`` through this file on disk.
    write('test.wav', sample_rate, samples)
    reply = query(audio)
    return reply["text"]
# print("cur path", os.listdir(os.path.join("..", "..", "..")))
# if not os.path.isdir(os.path.join("..", "..", "..", "data", "hfcache")):
# os.mkdir(os.path.join("..", "..", "..", "data", "hfcache"))
# if not os.path.isdir(os.path.join("..", "..", "..", "data", "audio")):
# os.mkdir(os.path.join("..", "..", "..", "data", "audio"))
# if not os.path.isdir(os.path.join("..", "..", "..", "data", "audio_texts")):
# os.mkdir(os.path.join("..", "..", "..", "data", "audio_texts"))
# os.environ["HF_HOME"] = os.path.join("..", "..", "..", "data", "hfcache")
def post_process(text):
    """Turn dictated French punctuation commands into real punctuation.

    Spoken commands such as "virgule", "deux points", "point",
    "point virgule", "nouvelle ligne" or "paragraphe" are replaced by the
    character(s) they name, then every resulting line is stripped of
    leading and trailing whitespace.

    The punctuation commands are only recognised as whole words, so ordinary
    words that merely start with one ("pointe", "points", "virgules") are
    left untouched.

    Args:
        text: Raw transcription.

    Returns:
        The transcription with the commands substituted.
    """
    import re  # local import keeps this function self-contained

    # Line-break commands (plain substring replacement).
    for spoken in ("nouvelle ligne", "à la ligne"):
        text = text.replace(spoken, "\n")

    # Question mark, including the misspellings the transcriber produces.
    for spoken in (
        "point d'intérogation",
        "point d'intérrogation",
        "point d'interrogation",
        "point d'interogation",
    ):
        text = text.replace(spoken, "?")

    # Punctuation commands swallow the space before them. The negative
    # lookahead stops them matching inside longer or hyphenated words.
    # Order matters: the semicolon must be handled before its two halves,
    # and "deux points" before the bare "point".
    not_inside_word = r"(?![\w-])"
    for pattern, mark in (
        (r" point[- ]virgule", ";"),
        (r" virgule", ","),
        (r" deux points", ":"),
        (r" point", "."),
    ):
        text = re.sub(pattern + not_inside_word, mark, text)

    # Paragraph commands need a space on both sides, as before.
    text = text.replace(" nouveau paragraphe ", "\n\n")
    text = text.replace(" paragraphe ", "\n\n")

    return "\n".join(line.strip() for line in text.split("\n"))
def transcribe(state, audio):
    """Streaming callback: append the newest chunk and re-transcribe everything.

    Args:
        state: Samples accumulated from earlier chunks, or ``None`` when this
            is the first chunk.
        audio: ``(sample_rate, samples)`` pair for the newest chunk.

    Returns:
        A ``(state, text)`` tuple: the accumulated samples including this
        chunk, and the post-processed transcription of the whole recording.
    """
    sr, y = audio
    y = y.astype(np.float32)

    # Peak-normalise the chunk. A silent chunk has a peak of 0 and dividing
    # by it would fill the chunk with NaN; an empty chunk has no peak at all
    # (np.max raises on it). Both are left as they are.
    peak = np.max(np.abs(y)) if y.size else 0.0
    if peak > 0:
        y /= peak

    if state is not None:
        state = np.concatenate([state, y])
    else:
        state = y

    # The whole recording so far is sent each time, not just the new chunk.
    text = greet([sr, state])
    text = post_process(text)
    return state, text
def save_fn(audio, text):
    """Handle a click on the save button by clearing the form.

    Nothing is persisted at the moment: the upload code below is disabled.

    Args:
        audio: Current value of the audio component (unused).
        text: Current transcription (unused).

    Returns:
        ``[None, None, ""]``, one value per output wired to the button.
    """
    # Disabled upload of the transcription and the recording:
    # sr, y = audio
    # y = y.astype(np.float32)
    # y /= np.max(np.abs(y))
    # uid = str(uuid.uuid4())
    # with open(f"{uid}.txt", "w", encoding="utf-8") as f:
    #     f.write(text)
    # s3.Bucket(BUCKET).upload_file(f"{uid}.txt", f"texts/{uid}.txt") #local path, bucket path
    # write(f"{uid}.wav", sr, y)
    # s3.Bucket(BUCKET).upload_file(f"{uid}.wav", f"audios/{uid}.wav") #local path, bucket path
    cleared_state, cleared_audio, cleared_text = None, None, ""
    return [cleared_state, cleared_audio, cleared_text]
# User interface. NOTE(review): the nesting below is reconstructed (the
# original indentation was lost); confirm that `demo.launch()` belongs
# outside the `with` block.
with gr.Blocks() as demo:
    # Stored values. Only `state` is connected to an event handler here.
    state = gr.State(None)
    current_speaches = gr.State(1)
    old_text = gr.State("")
    last_text = gr.State("")

    # Streaming audio input and the text area that shows the transcription.
    audio = gr.Audio(streaming=True)
    text = gr.TextArea(show_copy_button=True)
    audio.stream(
        fn=transcribe,
        inputs=[state, audio],
        outputs=[state, text],
    )

    # The save button passes the audio and text to `save_fn` and writes its
    # three return values back to the state, the audio and the text area.
    save = gr.Button("save")
    save.click(
        fn=save_fn,
        inputs=[audio, text],
        outputs=[state, audio, text],
    )

demo.launch()