Spaces:
Runtime error
Runtime error
| import gradio as gr | |
| from huggingface_hub import InferenceClient | |
| import os | |
| import numpy as np | |
| from scipy.io.wavfile import write as write_wav | |
| from PIL import Image | |
| from tools import audio_to_str, image_to_str # 导入tools.py中的方法 | |
# Text-generation backend: Zephyr-7B served through the HF Inference API.
client = InferenceClient("HuggingFaceH4/zephyr-7b-beta")
# Relative directory where uploaded audio/images are written before ASR/OCR.
SAVE_DIR = 'download'  # relative path
os.makedirs(SAVE_DIR, exist_ok=True)  # make sure the directory exists
def get_client_ip(request: gr.Request, debug_mode=False):
    """Return the client's real IP address, honoring reverse proxies.

    Prefers the first entry of the ``x-forwarded-for`` header (the original
    client in a proxy chain) and falls back to the transport-level peer.
    Returns "unknown" when no request object is available.

    Bug fixes vs. the original:
    - ``x-forwarded-for`` entries are comma-AND-space separated; the first
      entry is now ``strip()``-ed so filenames built from it are clean.
    - ``request.client`` can be None for some transports; guarded.
    """
    if not request:
        return "unknown"
    x_forwarded_for = request.headers.get("x-forwarded-for", "")
    if x_forwarded_for:
        client_ip = x_forwarded_for.split(",")[0].strip()
    else:
        client_ip = request.client.host if request.client else "unknown"
    if debug_mode:
        print(f"Debug: Client IP detected as {client_ip}")
    return client_ip
def save_audio(audio, filename):
    """Write a Gradio audio tuple ``(sample_rate, samples)`` to *filename* as a .wav file."""
    rate, samples = audio
    write_wav(filename, rate, samples)
def save_image(image, filename):
    """Write a numpy image array to *filename* (format inferred from the extension)."""
    # Cast to uint8 first: PIL expects byte-valued pixels for standard modes.
    Image.fromarray(image.astype('uint8')).save(filename)
def process(audio, image, text, request: gr.Request):
    """Handle the audio/image/text inputs of the processing tab.

    Saves any uploaded audio (.wav) and image (.jpg) under SAVE_DIR, runs
    ASR/OCR on them via tools.audio_to_str / tools.image_to_str, and returns
    a 5-tuple of display strings:
    (audio_info, image_info, text_info, audio_text, image_text).

    Security fixes vs. the original:
    - the client IP comes from the untrusted ``x-forwarded-for`` header and
      was embedded raw in a file path; it is now sanitized first.
    - API credentials can be supplied via environment variables; the
      original hard-coded values remain only as fallbacks.
      NOTE(review): the hard-coded keys are exposed and should be rotated.
    """
    client_ip = get_client_ip(request, True)
    print(f"Processing request from IP: {client_ip}")
    # Keep only filesystem-safe characters so a forged header cannot inject
    # path separators (e.g. "../") into the save path.
    safe_ip = "".join(c if c.isalnum() or c in "._-" else "_" for c in client_ip)
    audio_info = "未收到音频"
    image_info = "未收到图片"
    text_info = "未收到文本"
    audio_text = ""
    image_text = ""
    if audio is not None:
        sample_rate, audio_data = audio
        audio_info = f"音频采样率: {sample_rate}Hz, 数据长度: {len(audio_data)}"
        # Save the audio as a .wav file.
        audio_filename = os.path.join(SAVE_DIR, f"audio_{safe_ip}.wav")
        save_audio(audio, audio_filename)
        print(f"Audio saved as {audio_filename}")
        # Transcribe via tools.audio_to_str (argument order per its signature:
        # app id, api key, api secret, file path — TODO confirm against tools.py).
        audio_text = audio_to_str(
            os.environ.get("ASR_APP_ID", "33c1b63d"),
            os.environ.get("ASR_API_KEY", "40bf7cd82e31ace30a9cfb76309a43a3"),
            os.environ.get("ASR_API_SECRET", "OTY1YzIyZWM3YTg0OWZiMGE2ZjA2ZmE4"),
            audio_filename,
        )
        if audio_text:
            print(f"Audio text: {audio_text}")
        else:
            print("Audio processing failed")
    if image is not None:
        image_info = f"图片尺寸: {image.shape}"
        # Save the image as a .jpg file.
        image_filename = os.path.join(SAVE_DIR, f"image_{safe_ip}.jpg")
        save_image(image, image_filename)
        print(f"Image saved as {image_filename}")
        # OCR via tools.image_to_str (Azure Cognitive Services endpoint/key).
        image_text = image_to_str(
            endpoint=os.environ.get("AZURE_VISION_ENDPOINT", "https://ai-siyuwang5414995ai361208251338.cognitiveservices.azure.com/"),
            key=os.environ.get("AZURE_VISION_KEY", "45PYY2Av9CdMCveAjVG43MGKrnHzSxdiFTK9mWBgrOsMAHavxKj0JQQJ99BDACHYHv6XJ3w3AAAAACOGeVpQ"),
            unused_param=None,
            file_path=image_filename,
        )
        if image_text:
            print(f"Image text: {image_text}")
        else:
            print("Image processing failed")
    if text:
        text_info = f"接收到文本: {text}"
    return audio_info, image_info, text_info, audio_text, image_text
# Build the custom chat interface.
with gr.Blocks() as app:
    gr.Markdown("# ToDoAgent Multi-Modal Interface")
    # Two tabs: a streaming chat tab and a raw audio/image processing tab.
    with gr.Tab("Chat"):
        # type="messages" avoids the deprecated tuple-format Chatbot warning.
        chatbot = gr.Chatbot(height=500, type="messages")
        msg = gr.Textbox(label="输入消息", placeholder="输入您的问题...")
        # Upload area: optional voice and image inputs alongside the text box.
        with gr.Row():
            audio_input = gr.Audio(label="上传语音", type="numpy", sources=["upload", "microphone"])
            image_input = gr.Image(label="上传图片", type="numpy")
        # Advanced generation settings, collapsed by default.
        with gr.Accordion("高级设置", open=False):
            system_msg = gr.Textbox(value="You are a friendly Chatbot.", label="系统提示")
            max_tokens = gr.Slider(minimum=1, maximum=2048, value=512, step=1, label="最大生成长度")
            temperature = gr.Slider(minimum=0.1, maximum=4.0, value=0.7, step=0.1, label="温度")
            top_p = gr.Slider(minimum=0.1, maximum=1.0, value=0.95, step=0.05, label="Top-p")
        # Send button.
        submit_btn = gr.Button("发送", variant="primary")
        # Clear-chat button.
        clear = gr.Button("清除聊天")
def user(user_message, chat_history):
    """Clear the textbox and append the user's message to the chat history."""
    updated_history = list(chat_history)
    updated_history.append({"role": "user", "content": user_message})
    return "", updated_history
# Multimodal handling, addition 1
def respond(message, chat_history, system_message, max_tokens, temperature, top_p, audio=None, image=None, text=None, request=None):
    """Stream an assistant reply for *message* from the Zephyr chat model.

    Optional audio/image inputs are transcribed (tools.audio_to_str) or
    OCR'd (tools.image_to_str) and appended to the user message before the
    API call. Yields progressively longer partial responses as tokens arrive.

    Security fix vs. the original: API credentials can be supplied via
    environment variables; the original hard-coded values remain only as
    fallbacks. NOTE(review): the exposed hard-coded keys should be rotated.
    """
    # Collect text extracted from the optional multimodal inputs.
    multimodal_content = ""
    if audio is not None:
        try:
            audio_filename = os.path.join(SAVE_DIR, "temp_audio.wav")
            save_audio(audio, audio_filename)
            # Argument order per tools.audio_to_str: app id, api key,
            # api secret, file path — TODO confirm against tools.py.
            audio_text = audio_to_str(
                os.environ.get("ASR_APP_ID", "33c1b63d"),
                os.environ.get("ASR_API_KEY", "40bf7cd82e31ace30a9cfb76309a43a3"),
                os.environ.get("ASR_API_SECRET", "OTY1YzIyZWM3YTg0OWZiMGE2ZjA2ZmE4"),
                audio_filename,
            )
            if audio_text:
                multimodal_content += f"音频内容: {audio_text}\n"
        except Exception as e:
            print(f"Audio processing error: {e}")
    if image is not None:
        try:
            image_filename = os.path.join(SAVE_DIR, "temp_image.jpg")
            save_image(image, image_filename)
            image_text = image_to_str(
                endpoint=os.environ.get("AZURE_VISION_ENDPOINT", "https://ai-siyuwang5414995ai361208251338.cognitiveservices.azure.com/"),
                key=os.environ.get("AZURE_VISION_KEY", "45PYY2Av9CdMCveAjVG43MGKrnHzSxdiFTK9mWBgrOsMAHavxKj0JQQJ99BDACHYHv6XJ3w3AAAAACOGeVpQ"),
                unused_param=None,
                file_path=image_filename,
            )
            if image_text:
                multimodal_content += f"图片内容: {image_text}\n"
        except Exception as e:
            print(f"Image processing error: {e}")
    # Fold extracted multimodal text into the final user message.
    final_message = message
    if multimodal_content:
        final_message = f"{message}\n\n{multimodal_content}"
    # Rebuild the OpenAI-style message list: system prompt, prior turns, new turn.
    messages = [{"role": "system", "content": system_message}]
    for chat in chat_history:
        # Defensively skip malformed history entries.
        if isinstance(chat, dict) and "role" in chat and "content" in chat:
            messages.append(chat)
    messages.append({"role": "user", "content": final_message})
    try:
        response = client.chat_completion(
            messages,
            max_tokens=max_tokens,
            stream=True,
            temperature=temperature,
            top_p=top_p,
        )
        partial_message = ""
        for token in response:
            delta = token.choices[0].delta.content
            if delta is not None:
                partial_message += delta
                # Yield only when the text actually grew, instead of
                # re-emitting the same string on empty delta events.
                yield partial_message
    except Exception as e:
        yield f"抱歉,生成响应时出现错误: {str(e)}"
def bot(chat_history, system_message, max_tokens, temperature, top_p, audio, image, text):
    """Generate the assistant's reply for the last user turn and stream it.

    Yields updated copies of *chat_history* with the (growing) assistant
    message appended, so the Chatbot component renders tokens live.

    Bug fix vs. the original: the old loop drained ``respond`` completely
    and yielded only once at the end, defeating the streamed completion.
    The final yielded history is identical to before.
    """
    # Nothing to answer if the history is empty.
    if not chat_history:
        return
    # The last entry must be a well-formed message dict with the user's text.
    last_message = chat_history[-1]
    if not isinstance(last_message, dict) or "content" not in last_message:
        return
    user_message = last_message["content"]
    streamed = False
    for partial in respond(
        user_message,
        chat_history[:-1],
        system_message,
        max_tokens,
        temperature,
        top_p,
        audio,
        image,
        text,
    ):
        streamed = True
        yield chat_history + [{"role": "assistant", "content": partial}]
    if not streamed:
        # Preserve the original behavior of always appending an assistant turn.
        yield chat_history + [{"role": "assistant", "content": ""}]
# Chat-tab event wiring: Enter in the textbox or the send button first runs
# `user` (appends the message and clears the textbox), then `bot` generates
# the assistant reply.
# NOTE(review): `msg` is passed to `bot` as its `text` input AFTER `user`
# has already cleared the textbox, so that value is always "" by the time
# `bot` reads it — confirm before relying on it downstream.
msg.submit(user, [msg, chatbot], [msg, chatbot], queue=False).then(
    bot, [chatbot, system_msg, max_tokens, temperature, top_p, audio_input, image_input, msg], chatbot
)
submit_btn.click(user, [msg, chatbot], [msg, chatbot], queue=False).then(
    bot, [chatbot, system_msg, max_tokens, temperature, top_p, audio_input, image_input, msg], chatbot
)
# Reset the Chatbot component.
clear.click(lambda: None, None, chatbot, queue=False)
with gr.Tab("Audio/Image Processing"):
    gr.Markdown("## 处理音频和图片")
    audio_processor = gr.Audio(label="上传音频", type="numpy")
    image_processor = gr.Image(label="上传图片", type="numpy")
    text_input = gr.Textbox(label="输入文本")
    process_btn = gr.Button("处理", variant="primary")
    audio_output = gr.Textbox(label="音频信息")
    image_output = gr.Textbox(label="图片信息")
    text_output = gr.Textbox(label="文本信息")
    audio_text_output = gr.Textbox(label="音频转文字结果")
    image_text_output = gr.Textbox(label="图片转文字结果")
    # `process` also takes a gr.Request parameter, which Gradio injects
    # automatically from the type annotation, so only three inputs appear here.
    process_btn.click(
        process,
        inputs=[audio_processor, image_processor, text_input],
        outputs=[audio_output, image_output, text_output, audio_text_output, image_text_output]
    )
| if __name__ == "__main__": | |
| app.launch() |