import gradio as gr
from huggingface_hub import InferenceClient
import os
import re
import numpy as np
from scipy.io.wavfile import write as write_wav
from PIL import Image
from tools import audio_to_str, image_to_str  # helpers from tools.py

client = InferenceClient("HuggingFaceH4/zephyr-7b-beta")

# Relative directory where uploaded media is persisted.
SAVE_DIR = 'download'
os.makedirs(SAVE_DIR, exist_ok=True)  # make sure the directory exists

# SECURITY NOTE(review): these credentials were previously hard-coded (twice)
# in the source. They are now read from the environment first, falling back to
# the original literal values so existing deployments keep working — rotate
# these keys and set the environment variables instead of shipping secrets.
XF_APP_ID = os.environ.get("XF_APP_ID", "33c1b63d")
XF_API_KEY = os.environ.get("XF_API_KEY", "40bf7cd82e31ace30a9cfb76309a43a3")
XF_API_SECRET = os.environ.get("XF_API_SECRET", "OTY1YzIyZWM3YTg0OWZiMGE2ZjA2ZmE4")
AZURE_ENDPOINT = os.environ.get(
    "AZURE_VISION_ENDPOINT",
    "https://ai-siyuwang5414995ai361208251338.cognitiveservices.azure.com/",
)
AZURE_KEY = os.environ.get(
    "AZURE_VISION_KEY",
    "45PYY2Av9CdMCveAjVG43MGKrnHzSxdiFTK9mWBgrOsMAHavxKj0JQQJ99BDACHYHv6XJ3w3AAAAACOGeVpQ",
)


def _sanitize_for_filename(value: str) -> str:
    """Reduce *value* to filesystem-safe characters.

    The client IP is derived from the X-Forwarded-For header, which is fully
    attacker-controlled; interpolating it verbatim into a path allowed path
    traversal (e.g. "../../x"). Keep only [A-Za-z0-9._-]; replace the rest.
    IPv4 addresses pass through unchanged (dots are preserved).
    """
    return re.sub(r"[^A-Za-z0-9._-]", "_", value) or "unknown"


def get_client_ip(request: gr.Request, debug_mode=False):
    """Return the client's real IP address.

    Honors the X-Forwarded-For header (proxy setups), falling back to the
    socket peer address. Returns "unknown" when no request is available.
    """
    if request:
        x_forwarded_for = request.headers.get("x-forwarded-for", "")
        if x_forwarded_for:
            # First entry is the originating client when behind proxies.
            # .strip() added: the header conventionally separates with ", ".
            client_ip = x_forwarded_for.split(",")[0].strip()
        else:
            client_ip = request.client.host
        if debug_mode:
            print(f"Debug: Client IP detected as {client_ip}")
        return client_ip
    return "unknown"


def save_audio(audio, filename):
    """Persist a (sample_rate, ndarray) audio tuple as a .wav file."""
    sample_rate, audio_data = audio
    write_wav(filename, sample_rate, audio_data)


def save_image(image, filename):
    """Persist an HxWxC numpy image as a .jpg file."""
    img = Image.fromarray(image.astype('uint8'))
    img.save(filename)


def process(audio, image, text, request: gr.Request):
    """Handle speech, image and text inputs for the processing tab.

    Returns a 5-tuple of display strings:
    (audio_info, image_info, text_info, audio_text, image_text).
    """
    client_ip = get_client_ip(request, True)
    print(f"Processing request from IP: {client_ip}")
    # Never use the raw (header-derived) IP in a filename — see
    # _sanitize_for_filename for the path-traversal rationale.
    safe_ip = _sanitize_for_filename(client_ip)

    audio_info = "未收到音频"
    image_info = "未收到图片"
    text_info = "未收到文本"
    audio_text = ""
    image_text = ""

    if audio is not None:
        sample_rate, audio_data = audio
        audio_info = f"音频采样率: {sample_rate}Hz, 数据长度: {len(audio_data)}"
        audio_filename = os.path.join(SAVE_DIR, f"audio_{safe_ip}.wav")
        save_audio(audio, audio_filename)
        print(f"Audio saved as {audio_filename}")
        # Speech-to-text via tools.audio_to_str (returns falsy on failure).
        audio_text = audio_to_str(XF_APP_ID, XF_API_KEY, XF_API_SECRET,
                                  audio_filename)
        if audio_text:
            print(f"Audio text: {audio_text}")
        else:
            print("Audio processing failed")

    if image is not None:
        image_info = f"图片尺寸: {image.shape}"
        image_filename = os.path.join(SAVE_DIR, f"image_{safe_ip}.jpg")
        save_image(image, image_filename)
        print(f"Image saved as {image_filename}")
        # OCR via tools.image_to_str (returns falsy on failure).
        image_text = image_to_str(endpoint=AZURE_ENDPOINT, key=AZURE_KEY,
                                  unused_param=None, file_path=image_filename)
        if image_text:
            print(f"Image text: {image_text}")
        else:
            print("Image processing failed")

    if text:
        text_info = f"接收到文本: {text}"

    return audio_info, image_info, text_info, audio_text, image_text


# ---------------------------------------------------------------------------
# UI
# ---------------------------------------------------------------------------
with gr.Blocks() as app:
    gr.Markdown("# ToDoAgent Multi-Modal Interface")

    with gr.Tab("Chat"):
        # type="messages" avoids the deprecated tuple-based Chatbot format.
        chatbot = gr.Chatbot(height=500, type="messages")
        msg = gr.Textbox(label="输入消息", placeholder="输入您的问题...")

        # Upload area.
        with gr.Row():
            audio_input = gr.Audio(label="上传语音", type="numpy",
                                   sources=["upload", "microphone"])
            image_input = gr.Image(label="上传图片", type="numpy")

        # Generation settings.
        with gr.Accordion("高级设置", open=False):
            system_msg = gr.Textbox(value="You are a friendly Chatbot.",
                                    label="系统提示")
            max_tokens = gr.Slider(minimum=1, maximum=2048, value=512,
                                   step=1, label="最大生成长度")
            temperature = gr.Slider(minimum=0.1, maximum=4.0, value=0.7,
                                    step=0.1, label="温度")
            top_p = gr.Slider(minimum=0.1, maximum=1.0, value=0.95,
                              step=0.05, label="Top-p")

        submit_btn = gr.Button("发送", variant="primary")
        clear = gr.Button("清除聊天")

        def user(user_message, chat_history):
            """Append the user's message to the history and clear the textbox."""
            return "", chat_history + [{"role": "user",
                                        "content": user_message}]

        def respond(message, chat_history, system_message, max_tokens,
                    temperature, top_p, audio=None, image=None, text=None,
                    request=None):
            """Stream a model reply, prefixing transcribed audio/image content.

            Yields the growing partial response; on API failure yields a
            single error message instead.
            """
            multimodal_content = ""

            if audio is not None:
                try:
                    audio_filename = os.path.join(SAVE_DIR, "temp_audio.wav")
                    save_audio(audio, audio_filename)
                    audio_text = audio_to_str(XF_APP_ID, XF_API_KEY,
                                              XF_API_SECRET, audio_filename)
                    if audio_text:
                        multimodal_content += f"音频内容: {audio_text}\n"
                except Exception as e:
                    # Best-effort: a failed transcription must not kill the chat.
                    print(f"Audio processing error: {e}")

            if image is not None:
                try:
                    image_filename = os.path.join(SAVE_DIR, "temp_image.jpg")
                    save_image(image, image_filename)
                    image_text = image_to_str(endpoint=AZURE_ENDPOINT,
                                              key=AZURE_KEY,
                                              unused_param=None,
                                              file_path=image_filename)
                    if image_text:
                        multimodal_content += f"图片内容: {image_text}\n"
                except Exception as e:
                    print(f"Image processing error: {e}")

            # Combine the typed message with any transcribed media content.
            final_message = message
            if multimodal_content:
                final_message = f"{message}\n\n{multimodal_content}"

            # Build the API message list; only forward well-formed dicts.
            messages = [{"role": "system", "content": system_message}]
            for chat in chat_history:
                if isinstance(chat, dict) and "role" in chat and "content" in chat:
                    messages.append(chat)
            messages.append({"role": "user", "content": final_message})

            try:
                response = client.chat_completion(
                    messages,
                    max_tokens=max_tokens,
                    stream=True,
                    temperature=temperature,
                    top_p=top_p,
                )
                partial_message = ""
                for token in response:
                    if token.choices[0].delta.content is not None:
                        partial_message += token.choices[0].delta.content
                        yield partial_message
            except Exception as e:
                yield f"抱歉,生成响应时出现错误: {str(e)}"

        def bot(chat_history, system_message, max_tokens, temperature, top_p,
                audio, image, text):
            """Drive respond() and stream the growing reply into the chatbot.

            BUG FIX: the original exhausted the generator and yielded only the
            final text, defeating stream=True; each partial chunk is now
            surfaced immediately.
            """
            if not chat_history:
                return
            last_message = chat_history[-1]
            if not last_message or not isinstance(last_message, dict) \
                    or "content" not in last_message:
                return
            user_message = last_message["content"]

            bot_response = ""
            for bot_response in respond(
                user_message, chat_history[:-1], system_message,
                max_tokens, temperature, top_p, audio, image, text,
            ):
                # Yield on every chunk so the UI updates incrementally.
                yield chat_history + [{"role": "assistant",
                                       "content": bot_response}]
            if not bot_response:
                # Generator produced nothing — still record an (empty) reply,
                # matching the original's single final yield.
                yield chat_history + [{"role": "assistant", "content": ""}]

        msg.submit(user, [msg, chatbot], [msg, chatbot], queue=False).then(
            bot,
            [chatbot, system_msg, max_tokens, temperature, top_p,
             audio_input, image_input, msg],
            chatbot,
        )
        submit_btn.click(user, [msg, chatbot], [msg, chatbot],
                         queue=False).then(
            bot,
            [chatbot, system_msg, max_tokens, temperature, top_p,
             audio_input, image_input, msg],
            chatbot,
        )
        clear.click(lambda: None, None, chatbot, queue=False)

    with gr.Tab("Audio/Image Processing"):
        gr.Markdown("## 处理音频和图片")
        audio_processor = gr.Audio(label="上传音频", type="numpy")
        image_processor = gr.Image(label="上传图片", type="numpy")
        text_input = gr.Textbox(label="输入文本")
        process_btn = gr.Button("处理", variant="primary")
        audio_output = gr.Textbox(label="音频信息")
        image_output = gr.Textbox(label="图片信息")
        text_output = gr.Textbox(label="文本信息")
        audio_text_output = gr.Textbox(label="音频转文字结果")
        image_text_output = gr.Textbox(label="图片转文字结果")

        # gr.Request is injected automatically into process() by Gradio.
        process_btn.click(
            process,
            inputs=[audio_processor, image_processor, text_input],
            outputs=[audio_output, image_output, text_output,
                     audio_text_output, image_text_output],
        )

if __name__ == "__main__":
    app.launch()