File size: 3,847 Bytes
0ef482f
938f609
 
9d41b1d
bbcef43
d722b23
 
 
48607b7
1f8048b
 
5d97286
1f8048b
d722b23
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1f8048b
406e27f
1f8048b
bbcef43
1f8048b
 
 
bbcef43
1f8048b
 
 
 
a10dd0b
1f8048b
 
 
 
 
9d41b1d
938f609
 
0ef482f
dac9550
 
 
 
48607b7
bbcef43
 
 
48607b7
406e27f
dac9550
 
a10dd0b
bbcef43
d722b23
 
5d97286
bbcef43
d722b23
a10dd0b
 
c5129eb
a10dd0b
bbcef43
 
 
 
c5129eb
9d41b1d
c5129eb
9d41b1d
c5129eb
d722b23
48607b7
dac9550
d722b23
bbcef43
 
 
 
d722b23
 
 
 
 
 
 
48607b7
dd3451f
53af268
 
 
938f609
 
48607b7
bbcef43
d722b23
 
 
dd3451f
ec3d9e7
0ef482f
cd798bc
d081bf3
9a56bc2
 
 
d722b23
9a56bc2
 
 
444e2a5
0ef482f
17438da
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
import os
import base64
import gradio as gr
from huggingface_hub import upload_file, InferenceClient
import json
from fastmcp import MCP, MCPClient
from playsound import playsound
from gtts import gTTS

# --- Config ---
# Hugging Face dataset repository that receives the uploaded images
# (passed as repo_id with repo_type="dataset" in save_and_upload_image).
HF_DATASET_REPO = "OppaAI/Robot_MCP"
# Model name passed to the chat-completion request in process_and_describe.
HF_VLM_MODEL = "Qwen/Qwen2.5-VL-7B-Instruct"

# --- MCP server instance ---
# NOTE(review): confirm that the installed fastmcp package really exports
# `MCP` and offers a `tools()` decorator; its public documentation describes
# `FastMCP` and `@mcp.tool` instead.
mcp = MCP()  # used to define tools

# --- MCP Tool ---
@mcp.tools()
def say_hi(text="Hi!"):
    """Speak *text* aloud and report what was played.

    The text is synthesized to English speech with gTTS, saved to the fixed
    path ``/tmp/say_hi.mp3``, and then handed to ``playsound``.

    Args:
        text: Phrase to speak. Defaults to ``"Hi!"``.

    Returns:
        The string ``"Played: <text>"``.
    """
    audio_path = "/tmp/say_hi.mp3"

    # Step 1: synthesize the speech and write it out as an mp3 file.
    gTTS(text=text, lang="en").save(audio_path)

    # Step 2: play the audio file that was just written.
    playsound(audio_path)

    return "Played: {}".format(text)

# --- Helper Functions ---
def save_and_upload_image(image_b64, hf_token):
    """Decode a base64 image, write it to disk, and upload it to the dataset repo.

    The decoded bytes are written to the fixed local path ``/tmp/tmp.jpg`` and
    uploaded to ``images/tmp.jpg`` in ``HF_DATASET_REPO``; both locations are
    the same on every call.

    Args:
        image_b64: Base64-encoded image data.
        hf_token: Hugging Face token forwarded to ``upload_file``.

    Returns:
        A 4-tuple ``(local_path, image_url, path_in_repo, size_in_bytes)``
        where ``image_url`` is the ``resolve/main`` URL of the uploaded file
        and ``size_in_bytes`` is the length of the decoded image.
    """
    path_in_repo = "images/tmp.jpg"
    local_tmp_path = "/tmp/tmp.jpg"

    decoded = base64.b64decode(image_b64)
    with open(local_tmp_path, "wb") as out_file:
        out_file.write(decoded)

    # Push the freshly written file into the dataset repository.
    upload_file(
        repo_id=HF_DATASET_REPO,
        repo_type="dataset",
        path_in_repo=path_in_repo,
        path_or_fileobj=local_tmp_path,
        token=hf_token,
    )

    hf_image_url = f"https://huggingface.co/datasets/{HF_DATASET_REPO}/resolve/main/{path_in_repo}"
    size_bytes = len(decoded)
    return local_tmp_path, hf_image_url, path_in_repo, size_bytes

# --- Main MCP function ---
def process_and_describe(payload: dict) -> dict:
    """Upload a robot camera image, ask the VLM about it, and run the suggested action.

    Args:
        payload: JSON object with the keys

            * ``hf_token`` (required): Hugging Face token used for both the
              dataset upload and the inference request.
            * ``image_b64`` (required): base64-encoded image; it is sent to
              the model as a JPEG data URL.
            * ``robot_id`` (optional): echoed back in the reply, defaults to
              ``"unknown"``.

    Returns:
        On success, a dict with the upload details (``saved_to_hf_hub``,
        ``repo_id``, ``path_in_repo``, ``image_url``, ``file_size_bytes``),
        the ``robot_id``, the raw model reply (``vlm_response``), the parsed
        ``vlm_action`` and ``vlm_description``, and ``tool_result`` (the
        return value of ``say_hi`` when the action is ``"say_hi"``, otherwise
        ``None``).

        On failure, a dict with a single ``"error"`` key. Exceptions derived
        from ``Exception`` are caught and reported this way instead of being
        raised.
    """
    # An empty or malformed JSON input can arrive as None or a non-object;
    # report that clearly instead of as a generic API error.
    if not isinstance(payload, dict):
        return {"error": "Payload must be a JSON object."}

    try:
        hf_token = payload.get("hf_token")
        if not hf_token:
            return {"error": "HF token not provided in payload."}

        robot_id = payload.get("robot_id", "unknown")
        image_b64 = payload.get("image_b64")
        if not image_b64:
            return {"error": "No image provided."}

        _, hf_url, path_in_repo, size_bytes = save_and_upload_image(image_b64, hf_token)
        hf_client = InferenceClient(token=hf_token)

        system_prompt = """
        You are a helpful robot assistant.
        1. Describe the image in detail.
        2. Suggest what the robot should do next.
           - Human figure → say 'Hi'.
        Always respond in JSON:
        {"description": "...", "action": "say_hi"}
        """

        messages_payload = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": [
                {"type": "text", "text": "Here is an image."},
                {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{image_b64}"}}
            ]}
        ]

        chat_completion = hf_client.chat.completions.create(
            model=HF_VLM_MODEL,
            messages=messages_payload,
            max_tokens=200
        )

        # Guard against a reply without text content so .strip() cannot fail.
        vlm_text = (chat_completion.choices[0].message.content or "").strip()
        action_data = _parse_vlm_reply(vlm_text)

        # --- Call MCP tool ---
        vlm_action = action_data.get("action")
        tool_result = None
        if vlm_action == "say_hi":
            tool_result = say_hi(text="Hi!")  # writes /tmp/say_hi.mp3 and plays it

        return {
            "saved_to_hf_hub": True,
            "repo_id": HF_DATASET_REPO,
            "path_in_repo": path_in_repo,
            "image_url": hf_url,
            "file_size_bytes": size_bytes,
            "robot_id": robot_id,
            "vlm_response": vlm_text,
            "vlm_action": vlm_action,
            "vlm_description": action_data.get("description", ""),
            "tool_result": tool_result
        }

    except Exception as e:
        # Top-level boundary: the caller always receives a JSON reply.
        return {"error": f"An API error occurred: {str(e)}"}


def _parse_vlm_reply(vlm_text):
    """Return the model reply as a dict, falling back to an "unknown" action.

    Accepts a bare JSON object or one wrapped in a Markdown code fence
    (```` ``` ```` or ```` ```json ````). Anything that does not decode to a
    JSON object yields ``{"description": vlm_text, "action": "unknown"}``.
    """
    candidate = vlm_text.strip()
    if candidate.startswith("```"):
        # Drop the opening fence line and everything from the closing fence on.
        candidate = candidate.partition("\n")[2]
        candidate = candidate.rsplit("```", 1)[0].strip()

    try:
        parsed = json.loads(candidate)
    except ValueError:  # json.JSONDecodeError is a ValueError subclass
        parsed = None

    # Valid JSON that is not an object (list, string, number) has no .get().
    if isinstance(parsed, dict):
        return parsed
    return {"description": vlm_text, "action": "unknown"}

# --- Gradio MCP Interface ---
# Single-function app: the JSON payload entered in the input component is
# passed to process_and_describe, and the dict it returns is shown as JSON.
demo = gr.Interface(
    fn=process_and_describe,
    inputs=gr.JSON(label="Input Payload"),
    outputs=gr.JSON(label="Reply to Jetson"),
    api_name="predict"
)

if __name__ == "__main__":
    # mcp_server=True asks Gradio to also serve the app over MCP.
    # NOTE(review): this presumably needs Gradio installed with its MCP
    # extra — confirm against the deployment requirements.
    demo.launch(mcp_server=True)