Spaces:
Sleeping
Sleeping
File size: 3,847 Bytes
0ef482f 938f609 9d41b1d bbcef43 d722b23 48607b7 1f8048b 5d97286 1f8048b d722b23 1f8048b 406e27f 1f8048b bbcef43 1f8048b bbcef43 1f8048b a10dd0b 1f8048b 9d41b1d 938f609 0ef482f dac9550 48607b7 bbcef43 48607b7 406e27f dac9550 a10dd0b bbcef43 d722b23 5d97286 bbcef43 d722b23 a10dd0b c5129eb a10dd0b bbcef43 c5129eb 9d41b1d c5129eb 9d41b1d c5129eb d722b23 48607b7 dac9550 d722b23 bbcef43 d722b23 48607b7 dd3451f 53af268 938f609 48607b7 bbcef43 d722b23 dd3451f ec3d9e7 0ef482f cd798bc d081bf3 9a56bc2 d722b23 9a56bc2 444e2a5 0ef482f 17438da |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 |
import os
import base64
import json

import gradio as gr
from huggingface_hub import upload_file, InferenceClient
from fastmcp import FastMCP
from gtts import gTTS
from playsound import playsound

# --- Config ---
HF_DATASET_REPO = "OppaAI/Robot_MCP"          # dataset repo that receives uploaded camera frames
HF_VLM_MODEL = "Qwen/Qwen2.5-VL-7B-Instruct"  # vision-language model used to describe frames

# --- MCP server instance (used to register tools) ---
# NOTE(review): fastmcp exports `FastMCP` (and `Client`); the original
# `from fastmcp import MCP, MCPClient` raises ImportError. `MCPClient`
# was never used, so only the server class is imported here.
mcp = FastMCP("Robot_MCP")
# --- MCP Tool ---
# NOTE(review): fastmcp's registration decorator is `tool`, not `tools`;
# the original `@mcp.tools()` raises AttributeError at import time.
@mcp.tool()
def say_hi(text="Hi!"):
    """Speak *text* aloud and return a confirmation string.

    Registered as an MCP tool so the VLM's 'say_hi' action can trigger it.

    Args:
        text: Phrase to synthesize and play (default "Hi!").

    Returns:
        A short "Played: ..." confirmation message.
    """
    # Synthesize the phrase to an mp3 with the English gTTS voice.
    tts = gTTS(text=text, lang="en")
    tmp_path = "/tmp/say_hi.mp3"
    tts.save(tmp_path)
    # Play the generated audio on the host speakers (blocking call).
    playsound(tmp_path)
    return f"Played: {text}"
# --- Helper Functions ---
def save_and_upload_image(image_b64, hf_token):
    """Decode a base64 JPEG, write it locally, and push it to the HF dataset repo.

    Args:
        image_b64: Base64-encoded JPEG bytes.
        hf_token: Hugging Face access token used for the upload.

    Returns:
        Tuple of (local_path, public_url, path_in_repo, size_in_bytes).
    """
    raw_bytes = base64.b64decode(image_b64)
    local_path = "/tmp/tmp.jpg"
    with open(local_path, "wb") as fh:
        fh.write(raw_bytes)
    repo_path = "images/tmp.jpg"
    # Push the frame into the dataset repository so it gets a stable public URL.
    upload_file(
        path_or_fileobj=local_path,
        path_in_repo=repo_path,
        repo_id=HF_DATASET_REPO,
        repo_type="dataset",
        token=hf_token,
    )
    public_url = f"https://huggingface.co/datasets/{HF_DATASET_REPO}/resolve/main/{repo_path}"
    return local_path, public_url, repo_path, len(raw_bytes)
# --- Main MCP function ---
def _parse_vlm_json(vlm_text: str) -> dict:
    """Parse the VLM's JSON reply, tolerating ```json code fences.

    Falls back to {"description": <raw text>, "action": "unknown"} when the
    reply is not valid JSON or is not a JSON object.
    """
    cleaned = vlm_text.strip()
    # Models often wrap JSON replies in markdown fences; strip them first.
    if cleaned.startswith("```"):
        cleaned = cleaned.strip("`").strip()
        if cleaned.lower().startswith("json"):
            cleaned = cleaned[4:]
    try:
        data = json.loads(cleaned)
        if isinstance(data, dict):
            return data
    except ValueError:
        pass
    return {"description": vlm_text, "action": "unknown"}


def process_and_describe(payload: dict):
    """Handle one robot request: upload the frame, query the VLM, maybe act.

    Expected payload keys:
        hf_token  -- Hugging Face token (required).
        robot_id  -- identifier of the sending robot (optional).
        image_b64 -- base64-encoded JPEG frame (required).

    Returns:
        A JSON-serializable dict with upload metadata and the VLM's answer,
        or {"error": ...} on any failure.
    """
    try:
        hf_token = payload.get("hf_token")
        if not hf_token:
            return {"error": "HF token not provided in payload."}
        robot_id = payload.get("robot_id", "unknown")
        image_b64 = payload.get("image_b64")
        if not image_b64:
            return {"error": "No image provided."}
        local_tmp_path, hf_url, path_in_repo, size_bytes = save_and_upload_image(image_b64, hf_token)
        hf_client = InferenceClient(token=hf_token)
        system_prompt = """
        You are a helpful robot assistant.
        1. Describe the image in detail.
        2. Suggest what the robot should do next.
        - Human figure → say 'Hi'.
        Always respond in JSON:
        {"description": "...", "action": "say_hi"}
        """
        messages_payload = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": [
                {"type": "text", "text": "Here is an image."},
                {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{image_b64}"}}
            ]}
        ]
        chat_completion = hf_client.chat.completions.create(
            model=HF_VLM_MODEL,
            messages=messages_payload,
            max_tokens=200
        )
        vlm_text = chat_completion.choices[0].message.content.strip()
        action_data = _parse_vlm_json(vlm_text)
        # --- Call MCP tool ---
        vlm_action = action_data.get("action")
        tool_result = None
        if vlm_action == "say_hi":
            # NOTE(review): say_hi is wrapped by the MCP tool decorator; some
            # fastmcp versions return a Tool object that is not directly
            # callable (the original function lives at `say_hi.fn`) — confirm.
            tool_result = say_hi(text="Hi!")  # also writes /tmp/say_hi.mp3
        return {
            "saved_to_hf_hub": True,
            "repo_id": HF_DATASET_REPO,
            "path_in_repo": path_in_repo,
            "image_url": hf_url,
            "file_size_bytes": size_bytes,
            "robot_id": robot_id,
            "vlm_response": vlm_text,
            "vlm_action": vlm_action,
            "vlm_description": action_data.get("description", ""),
            "tool_result": tool_result
        }
    except Exception as e:
        # Top-level boundary: report any failure back to the caller as JSON.
        return {"error": f"An API error occurred: {str(e)}"}
# --- Gradio MCP Interface ---
# Single JSON-in / JSON-out endpoint; with mcp_server=True Gradio also
# exposes this function as an MCP tool to connected clients.
demo = gr.Interface(
    fn=process_and_describe,
    inputs=gr.JSON(label="Input Payload"),
    outputs=gr.JSON(label="Reply to Jetson"),
    api_name="predict"  # stable endpoint name for REST/MCP callers
)
if __name__ == "__main__":
    # Launch the Gradio app with its built-in MCP server enabled.
    demo.launch(mcp_server=True)
|