Spaces:
Paused
Paused
import os
import threading
import time
import wave

import gradio as gr
import numpy as np
import requests
# import soundfile as sf
from scipy.io import wavfile
from scipy.io.wavfile import write
from scipy.signal import resample

from ali import *
def to16000(sr, data, fname):
    """Resample ``data`` from ``sr`` Hz to 16000 Hz and write it as a WAV file.

    Args:
        sr: Sample rate of ``data`` in Hz.
        data: NumPy array of audio samples; resampling runs along axis 0.
        fname: Destination path passed to ``scipy.io.wavfile.write``.

    The written samples keep the dtype of ``data``.
    """
    target_sr = 16000
    data = np.asarray(data)

    # Nothing to resample: empty input, or input already at the target rate.
    # Skipping the FFT round trip also avoids needless float truncation error.
    if len(data) == 0 or sr == target_sr:
        wavfile.write(fname, target_sr, data)
        return

    # Number of samples after resampling.
    num_samples = round(len(data) * target_sr / sr)
    resampled = resample(data, num_samples)

    if np.issubdtype(data.dtype, np.integer):
        # FFT resampling returns floats that can ring past the integer range;
        # a bare astype() would wrap those values around (loud clicks), so
        # round to the nearest integer and saturate instead.
        limits = np.iinfo(data.dtype)
        resampled = np.clip(np.rint(resampled), limits.min, limits.max)

    # Write the new 16000 Hz WAV file.
    wavfile.write(fname, target_sr, resampled.astype(data.dtype))
def process_and_save_audio(sr, data, filename="recording.wav"):
    """Save the audio samples ``data`` (recorded at ``sr`` Hz) to ``filename``.

    The actual work is delegated to ``to16000``, which resamples to 16000 Hz
    before writing the WAV file.
    """
    to16000(sr=sr, data=data, fname=filename)
def audio2text(fname):
    """Transcribe the audio file ``fname`` and append the result to ``texts``.

    The file is uploaded with ``upload2costemp`` and the resulting URL is
    handed to ``fileTrans``. Each sentence of the response is appended to the
    module-level ``texts`` list as ``speaker_<SpeakerId>:<Text>``.

    Credentials are read from the environment variables ``ALIYUN_AK_ID``,
    ``ALIYUN_AK_SECRET`` and ``NLS_APP_KEY``. If any of them is missing the
    function prints an error and returns without doing anything.

    Args:
        fname: Path of the WAV file to transcribe.
    """
    # SECURITY: secrets must never be committed to source control. Any keys
    # that were previously hard-coded in this file should be rotated.
    access_key_id = os.getenv('ALIYUN_AK_ID')
    access_key_secret = os.getenv('ALIYUN_AK_SECRET')
    app_key = os.getenv('NLS_APP_KEY')
    if not (access_key_id and access_key_secret and app_key):
        print('处理声音错误,missing ALIYUN_AK_ID / ALIYUN_AK_SECRET / NLS_APP_KEY')
        return

    # Run the recorded-file recognition.
    url = upload2costemp(fname)
    resp = fileTrans(access_key_id, access_key_secret, app_key, url)
    try:
        for sentence in resp['Result']['Sentences']:
            speakerid = sentence['SpeakerId']
            text = sentence['Text']
            texts.append(f'speaker_{speakerid}:{text}')
    except Exception as e:
        # Best effort: this runs in a background thread, so a malformed or
        # failed response is reported instead of raised.
        print(f'处理声音错误,{e}')
# Module-level state shared by the streaming callback and the transcription
# worker.
texts = []  # transcribed sentences, in the order they were appended
wavData = None  # audio samples buffered since the last save; None when empty
last_save_time = time.time()  # timestamp of the last time a file was saved
def transcribe(new_chunk):
    """Buffer a streamed audio chunk and return the current transcript box.

    Chunks are accumulated in the module-level ``wavData`` buffer. Once 30
    seconds have passed since the buffer was started, the buffer is saved to
    a WAV file, a daemon thread running ``audio2text`` is started for that
    file, and the buffer is reset.

    Args:
        new_chunk: ``(sample_rate, samples)`` tuple from the streaming audio
            input, or ``None`` when there is no audio to process.

    Returns:
        A ``gr.Textbox`` whose value is the transcript collected so far in
        ``texts``, one sentence per line.
    """
    global last_save_time, wavData

    # Guard against an empty event so unpacking below cannot raise.
    if new_chunk is not None:
        sr, y = new_chunk
        if wavData is None:
            # First chunk of a new buffer: start the 30-second window here.
            wavData = y
            last_save_time = time.time()
        else:
            wavData = np.concatenate((wavData, y))

        # Check whether 30 seconds have elapsed.
        if time.time() - last_save_time >= 30:
            # Save the buffered stream to a WAV file.
            fname = f"audio_{int(time.time())}.wav"
            process_and_save_audio(sr, wavData, fname)
            # Transcribe in the background so the stream is not blocked.
            threading.Thread(target=audio2text, args=(fname,), daemon=True).start()
            # Reset the buffer and the save timestamp.
            wavData = None
            last_save_time = time.time()

    # Join the sentences: passing the list itself would be rendered as its
    # Python repr (e.g. "['speaker_1:...']") in the text box.
    return gr.Textbox(label="转换后的文本", lines=10, interactive=False,
                      value="\n".join(texts))
# Streaming microphone input.
audio = gr.Audio(label="开始录音", sources=["microphone"], streaming=True)
# Read-only transcript output. The value must be a string: passing the
# `texts` list directly would show its repr ("[]") in the box.
text = gr.Textbox(label="转换后的文本", lines=10, interactive=False,
                  value="\n".join(texts))
def startrecoding() -> None:
    """Print a marker line; used as the audio ``start_recording`` callback."""
    print("startrecoding")
def stoprecoding() -> None:
    """Print a marker line; used as the audio ``stop_recording`` callback."""
    print("stoprecoding")
# Live interface: each streamed microphone chunk is passed to `transcribe`,
# and its return value updates the text box.
demo = gr.Interface(
    transcribe,
    audio,
    text,
    live=True,
)

# Gradio only allows event listeners to be attached inside a Blocks context,
# so re-enter the interface (an Interface is a Blocks) to register them.
# The callbacks take no arguments and return nothing, hence no inputs/outputs;
# wiring `text` as an output would overwrite the transcript with None.
with demo:
    audio.start_recording(startrecoding, None, None)
    audio.stop_recording(stoprecoding, None, None)

# Start the Gradio app.
demo.launch(server_name="0.0.0.0", ssl_verify=False, server_port=8030, share=True)