# Hugging Face Space (status when captured: Sleeping)
| import gradio as gr | |
| from huggingface_hub import InferenceClient | |
| client = InferenceClient("HuggingFaceH4/zephyr-7b-beta") | |
def get_client_ip(request: gr.Request, debug_mode=False):
    """Return the caller's IP address, or ``"unknown"`` if it cannot be determined.

    The first entry of the ``x-forwarded-for`` header is preferred (it covers
    the case where the app sits behind a proxy); when that header is missing
    or its first entry is blank, ``request.client.host`` is used instead.

    Args:
        request: The Gradio request for the current event; may be falsy when
            no request is available.
        debug_mode: When true, print the detected address to stdout.

    Returns:
        The detected IP address as a string, or ``"unknown"``.
    """
    if not request:
        return "unknown"

    # NOTE(review): x-forwarded-for is supplied by the client/proxy chain, so
    # treat the result as informational (logging) rather than as a trusted
    # identity.
    forwarded_for = request.headers.get("x-forwarded-for", "")
    # Entries are comma-separated and may carry surrounding whitespace.
    client_ip = forwarded_for.split(",")[0].strip()

    if not client_ip:
        # `request.client` may be absent, in which case there is no peer
        # address to report.
        peer = request.client
        client_ip = getattr(peer, "host", None) or "unknown"

    if debug_mode:
        print(f"Debug: Client IP detected as {client_ip}")
    return client_ip
def process(audio, image, request: gr.Request):
    """Describe an uploaded audio clip and image (example handler).

    Args:
        audio: ``None`` or a ``(sample_rate, data)`` pair.
        image: ``None`` or an object exposing ``.shape``.
        request: The Gradio request, used only to log the caller's IP.

    Returns:
        A ``(audio_info, image_info)`` pair of human-readable strings.
    """
    caller_ip = get_client_ip(request, True)
    print(f"Processing request from IP: {caller_ip}")

    if audio is None:
        audio_summary = "未收到音频"
    else:
        rate, samples = audio
        audio_summary = f"音频采样率: {rate}Hz, 数据长度: {len(samples)}"

    image_summary = "未收到图片" if image is None else f"图片尺寸: {image.shape}"

    return audio_summary, image_summary
def respond(
    message,
    history: list[tuple[str, str]],
    system_message,
    max_tokens,
    temperature,
    top_p,
    audio,
    image,
    request: gr.Request,  # request object, used to look up the caller's IP
):
    """Stream a chat completion for ``message``, yielding the reply so far.

    Args:
        message: The new user message.
        history: Earlier turns as ``(user, assistant)`` pairs; falsy halves
            of a pair are skipped.
        system_message: System prompt text.
        max_tokens: Forwarded to ``client.chat_completion``.
        temperature: Forwarded to ``client.chat_completion``.
        top_p: Forwarded to ``client.chat_completion``.
        audio: ``None`` or a ``(sample_rate, data)`` pair; a short description
            is appended to the user message.
        image: ``None`` or an object exposing ``.shape``; a short description
            is appended to the user message.
        request: The Gradio request for the current event.

    Yields:
        The accumulated response text after each streamed chunk.
    """
    # Look up the caller's IP for logging.
    client_ip = get_client_ip(request, True)
    print(f"Chat request from IP: {client_ip}")

    # Describe any uploaded audio/image inside the user message.
    if audio is not None:
        audio_sample_rate, audio_data = audio
        message += f"\n[附加音频信息: 采样率 {audio_sample_rate}Hz, 时长 {len(audio_data)/audio_sample_rate:.2f}秒]"
    if image is not None:
        message += f"\n[附加图片信息: 尺寸 {image.shape}]"

    # Optional: include the IP in the system message.
    # NOTE(review): the IP can originate from the client-supplied
    # x-forwarded-for header and is forwarded to the remote model here --
    # confirm this is intended from a privacy / prompt-injection standpoint.
    annotated_system_message = f"{system_message}\n[用户IP: {client_ip}]"

    messages = [{"role": "system", "content": annotated_system_message}]
    for turn in history:
        if turn[0]:
            messages.append({"role": "user", "content": turn[0]})
        if turn[1]:
            messages.append({"role": "assistant", "content": turn[1]})
    messages.append({"role": "user", "content": message})

    response = ""
    # A distinct loop name avoids shadowing the `message` parameter.
    for chunk in client.chat_completion(
        messages,
        max_tokens=max_tokens,
        stream=True,
        temperature=temperature,
        top_p=top_p,
    ):
        choices = chunk.choices
        # Streamed chunks may carry no choices or a `None` delta (e.g. role
        # or finish chunks); treat both as "no new text" instead of crashing.
        token = ""
        if choices and choices[0].delta.content:
            token = choices[0].delta.content
        response += token
        yield response
# Build the custom chat interface.
with gr.Blocks() as app:
    gr.Markdown("# ToDoAgent Multi-Modal Interface")

    # Two tabs: chat, and standalone audio/image processing.
    with gr.Tab("Chat"):
        # type="messages" means the chatbot value is a flat list of
        # {"role": ..., "content": ...} dicts.
        chatbot = gr.Chatbot(height=500, type="messages")
        msg = gr.Textbox(label="输入消息", placeholder="输入您的问题...")

        # Upload area.
        with gr.Row():
            audio_input = gr.Audio(label="上传语音", type="numpy", sources=["upload", "microphone"])
            image_input = gr.Image(label="上传图片", type="numpy")

        # Generation settings.
        with gr.Accordion("高级设置", open=False):
            system_msg = gr.Textbox(value="You are a friendly Chatbot.", label="系统提示")
            max_tokens = gr.Slider(minimum=1, maximum=2048, value=512, step=1, label="最大生成长度")
            temperature = gr.Slider(minimum=0.1, maximum=4.0, value=0.7, step=0.1, label="温度")
            top_p = gr.Slider(minimum=0.1, maximum=1.0, value=0.95, step=0.05, label="Top-p")

        submit_btn = gr.Button("发送", variant="primary")
        clear = gr.Button("清除聊天")

        def user(user_message, chat_history):
            """Append the user's message to the history and clear the textbox."""
            chat_history = chat_history or []
            return "", chat_history + [{"role": "user", "content": user_message}]

        def bot(
            chat_history,
            system_message,
            max_tokens,
            temperature,
            top_p,
            audio,
            image,
            request: gr.Request,
        ):
            """Stream the assistant reply for the last user message.

            ``request`` is not listed in the event inputs: Gradio passes the
            live request to handler parameters annotated with ``gr.Request``.
            Constructing ``gr.Request()`` by hand would yield an empty object
            with no headers or client information.
            """
            # The last entry is the user message that `user` just appended.
            user_message = chat_history[-1]["content"]

            # `respond` expects earlier turns as (user, assistant) pairs and
            # skips falsy halves, so each message becomes a one-sided pair,
            # which preserves the original ordering.
            prior_turns = [
                (entry["content"], None)
                if entry["role"] == "user"
                else (None, entry["content"])
                for entry in chat_history[:-1]
                if entry["role"] in ("user", "assistant")
            ]

            # Placeholder assistant message that is filled in while streaming.
            chat_history = chat_history + [{"role": "assistant", "content": ""}]
            for partial_response in respond(
                user_message,
                prior_turns,
                system_message,
                max_tokens,
                temperature,
                top_p,
                audio,
                image,
                request,
            ):
                chat_history[-1]["content"] = partial_response
                yield chat_history

        # Wire events: first record the user message, then stream the reply.
        bot_inputs = [chatbot, system_msg, max_tokens, temperature, top_p, audio_input, image_input]
        msg.submit(user, [msg, chatbot], [msg, chatbot], queue=False).then(
            bot, bot_inputs, chatbot
        )
        submit_btn.click(user, [msg, chatbot], [msg, chatbot], queue=False).then(
            bot, bot_inputs, chatbot
        )
        clear.click(lambda: None, None, chatbot, queue=False)

    with gr.Tab("Audio/Image Processing"):
        gr.Markdown("## 处理音频和图片")
        audio_processor = gr.Audio(label="上传音频", type="numpy")
        image_processor = gr.Image(label="上传图片", type="numpy")
        process_btn = gr.Button("处理", variant="primary")
        audio_output = gr.Textbox(label="音频信息")
        image_output = gr.Textbox(label="图片信息")

        # `process` also takes a `gr.Request` parameter, which Gradio fills in;
        # only the two components are listed as inputs.
        process_btn.click(
            process,
            inputs=[audio_processor, image_processor],
            outputs=[audio_output, image_output],
        )

if __name__ == "__main__":
    app.launch()