"""Demo e-whiteboard: streaming audio capture, transcription and meeting
minute generation built on Gradio.

Audio chunks streamed from the browser are accumulated in memory, flushed to
a 16 kHz WAV file every 30 seconds (or when recording stops), transcribed in
a background thread via the Aliyun file-transcription service, and finally
summarised with the Zhipu GLM model on demand.
"""

import os
import threading
import time

import gradio as gr
import numpy as np
from scipy.io import wavfile
from scipy.signal import resample

from ali import *
from Zhipu import *


def to16000(sr, data, fname):
    """Resample ``data`` (sampled at ``sr`` Hz) to 16 kHz and write ``fname``.

    The output is cast back to the input dtype so integer PCM stays integer
    PCM in the written file.
    """
    # Number of samples after resampling to the 16 kHz target rate.
    num_samples = round(len(data) * 16000 / sr)
    resampled_data = resample(data, num_samples)
    wavfile.write(fname, 16000, resampled_data.astype(data.dtype))


def process_and_save_audio(sr, data, filename="recording.wav"):
    """Persist an audio buffer as a 16 kHz WAV file."""
    to16000(sr, data, filename)


def audio2text(fname):
    """Transcribe the WAV file ``fname`` and append speaker-tagged lines to ``texts``.

    Intended to run in a daemon thread; failures are printed, never raised.
    """
    # SECURITY: credentials must come from the environment, never live in
    # source control.  The literal fallbacks below are kept only for
    # backward compatibility -- rotate these keys and delete the literals.
    accessKeyId = os.getenv('ALIYUN_AK_ID', 'LTAI5tF5nA43VsaQApK6evWh')
    accessKeySecret = os.getenv('ALIYUN_AK_SECRET', 'ywESssmELzIorrIWGPvuL2pD9dbbgs')
    appKey = os.getenv('NLS_APP_KEY', 'lgo44Om6bL3j81AW')

    # Upload to temporary object storage, then run file transcription on it.
    url = upload2costemp(fname)
    resp = fileTrans(accessKeyId, accessKeySecret, appKey, url)
    if resp is None:
        return
    try:
        for s in resp['Result']['Sentences']:
            texts.append(f"speaker_{s['SpeakerId']}:{s['Text']}")
    except Exception as e:
        print(f'处理声音错误,{e}')


# --- Module-level streaming state -----------------------------------------
last_save_time = time.time()  # wall-clock time of the last segment flush
wavData = None                # raw samples accumulated for the current segment
texts = []                    # transcription lines shown in the UI


def processWavData(sr, wavData):
    """Write the accumulated samples to a timestamped WAV file and kick off
    asynchronous transcription of it."""
    fname = f"audio_{int(time.time())}.wav"
    process_and_save_audio(sr, wavData, fname)
    threading.Thread(target=audio2text, args=(fname,), daemon=True).start()


def transcribe(new_chunk):
    """Accumulate one streamed audio chunk; flush a segment every 30 seconds.

    ``new_chunk`` is the Gradio streaming-audio payload ``(sample_rate,
    samples)``.  Returns the transcription lines collected so far so the
    textbox refreshes on every chunk.
    """
    global last_save_time, wavData
    sr, y = new_chunk
    if wavData is None:
        wavData = y
    else:
        wavData = np.concatenate((wavData, y))
    # BUG FIX: the original reset ``last_save_time`` unconditionally on every
    # incoming chunk, so this 30-second condition could never become true.
    # The timer is now reset only when a segment is actually flushed.
    if time.time() - last_save_time >= 30:
        processWavData(sr, wavData)
        wavData = None          # start a fresh segment
        last_save_time = time.time()
    return texts


def get_summary():
    """Ask the GLM model to summarise everything transcribed so far.

    Appends the summary to ``texts`` and returns the full list for display.
    """
    _systemp = systemp.replace('{Data}', f'{",".join(texts)}')
    messages = [{"role": "system", "content": _systemp}]
    print(_systemp)
    messages.append({"role": "user", "content": '请总结给出纪要'})
    response = client.chat.completions.create(
        model="glm-4-long",  # model name to invoke
        messages=messages,
        # tools=tools, tool_choice="auto",
    )
    print(response.choices[0].message)
    content = response.choices[0].message.content
    texts.append(f'总结:{content}')
    return texts


def startrecoding(new_chunk):
    """Reset the flush timer when the user starts recording."""
    global last_save_time
    last_save_time = time.time()


def stoprecoding(new_chunk):
    """Flush whatever audio remains when recording stops; return the texts."""
    global wavData
    sr, y = new_chunk
    # Guard: nothing may have been accumulated (e.g. stop pressed at once,
    # or the 30-second flush just fired) -- the original crashed here.
    if wavData is not None:
        processWavData(sr, wavData)
        wavData = None  # reset the stream
    return texts


with gr.Blocks() as demo:
    gr.Label('Demo电子白板, 智能会议纪要生成工具')
    audio_input = gr.Audio(
        label="开始录音",
        interactive=True,
        streaming=True,
        waveform_options=gr.WaveformOptions(sample_rate=16000),
    )
    text_output = gr.Textbox(label="转换后的文本", lines=10, interactive=False, value=texts)
    summary_button = gr.Button("获取总结")

    # Stream handler: every chunk goes through ``transcribe``.
    audio_input.change(transcribe, inputs=audio_input, outputs=text_output)
    audio_input.start_recording(startrecoding, inputs=audio_input)
    audio_input.stop_recording(stoprecoding, inputs=audio_input, outputs=text_output)
    # On click, generate the summary and refresh the textbox.
    summary_button.click(get_summary, outputs=text_output)

demo.launch(server_name="0.0.0.0", ssl_verify=False, server_port=8030)