# (Hugging Face Spaces page header — "Spaces: Paused" — left over from the web scrape; not part of the program.)
import threading
import time

import gradio as gr
import numpy as np
from scipy.io import wavfile
from scipy.signal import resample

from ali import *
from Zhipu import *
def to16000(sr, data, fname):
    """Resample audio to 16 kHz and write it to a WAV file.

    Args:
        sr: original sample rate in Hz.
        data: numpy array of samples (first axis is time).
        fname: output WAV file path.
    """
    if sr == 16000:
        # Already at the target rate -- skip the FFT-based resample and
        # write the samples unchanged.
        wavfile.write(fname, 16000, data)
        return
    # Number of samples after resampling to 16 kHz.
    num_samples = round(len(data) * 16000 / sr)
    resampled_data = resample(data, num_samples)
    # resample() returns float64; cast back to the input dtype so the WAV
    # file keeps the original sample format (e.g. int16).
    wavfile.write(fname, 16000, resampled_data.astype(data.dtype))
def process_and_save_audio(sr, data, filename="recording.wav"):
    """Persist an audio buffer to *filename* as a 16 kHz WAV file.

    Thin wrapper kept for the callback pipeline; the resample-and-write
    work is delegated to ``to16000``.
    """
    to16000(sr, data, filename)
def audio2text(fname):
    """Transcribe a WAV file via Aliyun file transcription and append each
    recognized sentence to the global ``texts`` list as "speaker_N:text".

    Runs on a background daemon thread; failures are printed, not raised.
    """
    # SECURITY: cloud credentials are hard-coded and committed with the
    # source. Move them to environment variables (the intended os.getenv
    # calls are shown alongside) and rotate these keys.
    accessKeyId = 'LTAI5tF5nA43VsaQApK6evWh'  # os.getenv('ALIYUN_AK_ID')
    accessKeySecret = 'ywESssmELzIorrIWGPvuL2pD9dbbgs'  # os.getenv('ALIYUN_AK_SECRET')
    appKey = 'lgo44Om6bL3j81AW'  # os.getenv('NLS_APP_KEY')
    # Upload to temporary storage, then run the file-transcription job.
    url = upload2costemp(fname)
    resp = fileTrans(accessKeyId, accessKeySecret, appKey, url)
    if resp is None:
        return
    try:
        # Iterate the recognized sentences directly instead of indexing by
        # range(len(...)); each entry carries a diarization speaker id.
        for s in resp['Result']['Sentences']:
            speakerid = s['SpeakerId']
            text = s['Text']
            texts.append(f'speaker_{speakerid}:{text}')
    except Exception as e:
        # Best-effort: a malformed response should not kill the worker thread.
        print(f'处理声音错误,{e}')
# --- Shared mutable state (touched by Gradio callbacks and worker threads) ---
# Timestamp of the last WAV flush; used to batch audio into ~30 s files.
last_save_time = time.time()
# Raw audio samples accumulated since the last flush (None when empty).
wavData = None
# Transcript lines ("speaker_N:text") plus any appended summaries.
texts = []
def processWavData(sr, wavData):
    """Write the buffered samples to a timestamped WAV file and start
    transcribing it on a background daemon thread."""
    fname = f"audio_{int(time.time())}.wav"
    process_and_save_audio(sr, wavData, fname)
    worker = threading.Thread(target=audio2text, args=(fname,), daemon=True)
    worker.start()
def transcribe(new_chunk):
    """Gradio streaming callback: accumulate audio chunks and flush a WAV
    file for transcription roughly every 30 seconds.

    Args:
        new_chunk: ``(sample_rate, samples)`` tuple from ``gr.Audio``
            streaming mode.

    Returns:
        The current ``texts`` list, displayed in the output textbox.
    """
    global last_save_time, wavData
    sr, y = new_chunk
    # Append the new chunk to the running buffer.
    if wavData is not None:
        wavData = np.concatenate((wavData, y))
    else:
        wavData = y
    # BUG FIX: the original reset last_save_time on *every* chunk right
    # before this check, so the 30-second flush below could never trigger.
    # The timer is now reset only in startrecoding() and after a flush.
    if time.time() - last_save_time >= 30:
        processWavData(sr, wavData)
        # Reset the buffer and the flush timer.
        wavData = None
        last_save_time = time.time()
    # BUG FIX: return the transcript list (like stoprecoding does), not the
    # gr.Textbox component object.
    return texts
def get_summary():
    """Ask the GLM model for meeting minutes over the accumulated
    transcript, append the result to ``texts``, and return the list."""
    prompt = systemp.replace('{Data}', f'{",".join(texts)}')
    print(prompt)
    messages = [
        {"role": "system", "content": prompt},
        {"role": "user", "content": '请总结给出纪要'},
    ]
    response = client.chat.completions.create(
        model="glm-4-long",
        messages=messages,
        tool_choice="auto",
    )
    reply = response.choices[0].message
    print(reply)
    texts.append(f'总结:{reply.content}')
    return texts
def startrecoding(new_chunk):
    """Reset the 30-second flush timer when a recording session begins.

    ``new_chunk`` is supplied by the Gradio event but is not used.
    """
    global last_save_time
    last_save_time = time.time()
def stoprecoding(new_chunk):
    """Flush any buffered audio when recording stops and return the
    transcript list for display in the output textbox."""
    global wavData
    sr, y = new_chunk
    # ROBUSTNESS FIX: the buffer may already be empty (flushed by the
    # 30-second timer, or stop pressed before any chunk arrived); calling
    # the resample pipeline with None would raise. Skip the write instead.
    if wavData is not None:
        processWavData(sr, wavData)
        # Reset the buffer for the next session.
        wavData = None
    return texts
# --- Gradio UI wiring ---
with gr.Blocks() as demo:
    # Page title (Chinese: "Demo whiteboard, smart meeting-minutes generator").
    gr.Label('Demo电子白板, 智能会议纪要生成工具')
    # Streaming microphone input; waveform preview pinned to 16 kHz.
    audio_input = gr.Audio(label="开始录音",
                           interactive=True, streaming=True,
                           waveform_options=gr.WaveformOptions(
                               sample_rate=16000,
                           ))
    # Read-only transcript view, seeded with the shared ``texts`` list.
    text_output = gr.Textbox(label="转换后的文本", lines=10, interactive=False, value=texts)
    summary_button = gr.Button("获取总结")
    # Each streamed audio chunk is routed through transcribe().
    audio_input.change(transcribe, inputs=audio_input, outputs=text_output)
    # Recording start resets the 30-second flush timer.
    audio_input.start_recording(startrecoding, inputs=audio_input)
    # Recording stop flushes the remaining buffer and refreshes the transcript.
    audio_input.stop_recording(stoprecoding, inputs=audio_input, outputs=text_output)
    # Button click generates meeting minutes into the textbox.
    summary_button.click(get_summary, outputs=text_output)
demo.launch(server_name="0.0.0.0", ssl_verify=False, server_port=8030)