import os
import base64
import json

import gradio as gr
from huggingface_hub import upload_file, InferenceClient
# NOTE(review): recent fastmcp releases export `FastMCP` (not `MCP`) and the
# decorator is `@mcp.tool()` (not `@mcp.tools()`) — confirm against the
# installed fastmcp version before deploying.
from fastmcp import MCP, MCPClient
from playsound import playsound
from gtts import gTTS

# --- Config ---
HF_DATASET_REPO = "OppaAI/Robot_MCP"
HF_VLM_MODEL = "Qwen/Qwen2.5-VL-7B-Instruct"

# --- MCP server instance ---
mcp = MCP()  # used to define tools


# --- MCP Tool ---
@mcp.tools()
def say_hi(text="Hi!"):
    """Synthesize `text` to speech with gTTS, play it, and report what was said.

    Args:
        text: The phrase to speak aloud (English).

    Returns:
        A confirmation string of the form "Played: <text>".
    """
    # 1. Generate the mp3 file.
    tts = gTTS(text=text, lang="en")
    tmp_path = "/tmp/say_hi.mp3"
    tts.save(tmp_path)
    # 2. Play the audio file (blocks until playback finishes).
    playsound(tmp_path)
    return f"Played: {text}"


# --- Helper Functions ---
def save_and_upload_image(image_b64, hf_token):
    """Decode a base64 image, save it locally, and upload it to the HF dataset repo.

    Args:
        image_b64: Base64-encoded JPEG bytes.
        hf_token: Hugging Face token with write access to HF_DATASET_REPO.

    Returns:
        Tuple of (local_tmp_path, public hub URL, path_in_repo, size in bytes).
    """
    image_bytes = base64.b64decode(image_b64)
    local_tmp_path = "/tmp/tmp.jpg"
    with open(local_tmp_path, "wb") as f:
        f.write(image_bytes)

    # Fixed path in the repo: each upload overwrites the previous image.
    path_in_repo = "images/tmp.jpg"
    upload_file(
        path_or_fileobj=local_tmp_path,
        path_in_repo=path_in_repo,
        repo_id=HF_DATASET_REPO,
        token=hf_token,
        repo_type="dataset",
    )
    hf_image_url = f"https://huggingface.co/datasets/{HF_DATASET_REPO}/resolve/main/{path_in_repo}"
    return local_tmp_path, hf_image_url, path_in_repo, len(image_bytes)


# --- Main MCP function ---
def process_and_describe(payload: dict):
    """Upload a robot camera frame, describe it with a VLM, and trigger an MCP tool.

    Expected payload keys: "hf_token" (required), "image_b64" (required),
    "robot_id" (optional, defaults to "unknown").

    Returns:
        On success, a dict with upload metadata, the raw VLM reply, the parsed
        action/description, and the tool result (if an action was triggered).
        On failure, {"error": "..."}.
    """
    try:
        hf_token = payload.get("hf_token")
        if not hf_token:
            return {"error": "HF token not provided in payload."}

        robot_id = payload.get("robot_id", "unknown")
        image_b64 = payload.get("image_b64")
        if not image_b64:
            return {"error": "No image provided."}

        local_tmp_path, hf_url, path_in_repo, size_bytes = save_and_upload_image(image_b64, hf_token)

        hf_client = InferenceClient(token=hf_token)
        system_prompt = """
You are a helpful robot assistant.
1. Describe the image in detail.
2. Suggest what the robot should do next.
   - Human figure → say 'Hi'.
Always respond in JSON: {"description": "...", "action": "say_hi"}
"""
        messages_payload = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": [
                {"type": "text", "text": "Here is an image."},
                {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{image_b64}"}}
            ]}
        ]

        chat_completion = hf_client.chat.completions.create(
            model=HF_VLM_MODEL,
            messages=messages_payload,
            max_tokens=200
        )
        vlm_text = chat_completion.choices[0].message.content.strip()

        # VLMs frequently wrap their JSON reply in markdown code fences
        # (```json ... ```); strip them before parsing so valid replies
        # don't fall through to the "unknown" action.
        candidate = vlm_text
        if candidate.startswith("```"):
            candidate = candidate.strip("`")
            if candidate.startswith("json"):
                candidate = candidate[len("json"):]
            candidate = candidate.strip()

        try:
            action_data = json.loads(candidate)
        except Exception:
            # Fall back to treating the whole reply as a free-form description.
            action_data = {"description": vlm_text, "action": "unknown"}

        # --- Call MCP tool ---
        vlm_action = action_data.get("action")
        tool_result = None
        if vlm_action == "say_hi":
            tool_result = say_hi(text="Hi!")  # generates /tmp/say_hi.mp3

        return {
            "saved_to_hf_hub": True,
            "repo_id": HF_DATASET_REPO,
            "path_in_repo": path_in_repo,
            "image_url": hf_url,
            "file_size_bytes": size_bytes,
            "robot_id": robot_id,
            "vlm_response": vlm_text,
            "vlm_action": vlm_action,
            "vlm_description": action_data.get("description", ""),
            "tool_result": tool_result
        }

    except Exception as e:
        # Top-level boundary: report any failure back to the caller as JSON.
        return {"error": f"An API error occurred: {str(e)}"}


# --- Gradio MCP Interface ---
demo = gr.Interface(
    fn=process_and_describe,
    inputs=gr.JSON(label="Input Payload"),
    outputs=gr.JSON(label="Reply to Jetson"),
    api_name="predict"
)

if __name__ == "__main__":
    demo.launch(mcp_server=True)