File size: 4,342 Bytes
0ef482f
938f609
 
9d41b1d
bbcef43
54fb5ed
48607b7
1f8048b
 
9c6065d
1f8048b
d722b23
e6b6ea7
d722b23
 
9230f22
e6b6ea7
5410665
9c6065d
d722b23
1f8048b
e6b6ea7
1f8048b
bbcef43
1f8048b
 
 
bbcef43
1f8048b
 
 
 
a10dd0b
1f8048b
 
 
 
 
9d41b1d
938f609
 
0ef482f
dac9550
 
 
 
48607b7
bbcef43
 
 
48607b7
9c6065d
406e27f
9c6065d
 
dac9550
 
e6b6ea7
 
 
9c6065d
e6b6ea7
 
 
 
 
 
 
 
 
a10dd0b
 
c5129eb
a10dd0b
bbcef43
 
 
 
c5129eb
9d41b1d
e6b6ea7
c5129eb
9d41b1d
c5129eb
5410665
48607b7
dac9550
d722b23
9c6065d
e6b6ea7
 
 
 
 
 
 
 
 
 
 
48607b7
dd3451f
53af268
 
 
938f609
 
48607b7
bbcef43
e6b6ea7
 
 
dd3451f
ec3d9e7
0ef482f
cd798bc
d081bf3
9a56bc2
 
 
9c6065d
9a56bc2
 
 
444e2a5
0ef482f
acccc23
 
 
 
 
 
 
 
e6b6ea7
17438da
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
import base64
import json
import os
import tempfile

import gradio as gr
from fastmcp import FastMCP
from huggingface_hub import upload_file, InferenceClient

# --- Config ---
# Hugging Face dataset repo that receives uploaded robot camera images.
HF_DATASET_REPO = "OppaAI/Robot_MCP"
# Vision-language model used to describe those images.
HF_VLM_MODEL = "Qwen/Qwen2.5-VL-7B-Instruct"

# --- MCP server instance ---
# Tools registered on this instance (see say_hi below) are served via mcp.run().
mcp = FastMCP(name="Robot MCP")

# --- MCP Tool ---
@mcp.tool()
def say_hi(greeting_text: str = "Hi there!") -> dict:
    """Build the JSON command that tells the robot to greet.

    Args:
        greeting_text: The greeting the robot should speak.

    Returns:
        A dict carrying the command name and the greeting text.
    """
    command = {"command": "say_hi"}
    command["text"] = greeting_text
    return command

# --- Helper Functions ---
def save_and_upload_image(image_b64: str, hf_token: str):
    """Decode a base64 image, persist it to a temp file, and upload it to the HF dataset repo.

    Args:
        image_b64: Base64-encoded JPEG image data.
        hf_token: Hugging Face token with write access to HF_DATASET_REPO.

    Returns:
        Tuple of (local temp file path, public HF resolve URL, path inside
        the repo, decoded image size in bytes).

    Raises:
        binascii.Error: If image_b64 is not valid base64.
    """
    image_bytes = base64.b64decode(image_b64)

    # Use a unique temp file instead of a fixed "/tmp/tmp.jpg": portable
    # across OSes and safe under concurrent requests (no shared path).
    with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as tmp:
        tmp.write(image_bytes)
        local_tmp_path = tmp.name

    # NOTE: the remote path is fixed, so each upload overwrites the previous
    # image in the dataset repo (pre-existing, caller-visible behavior).
    path_in_repo = "images/tmp.jpg"
    upload_file(
        path_or_fileobj=local_tmp_path,
        path_in_repo=path_in_repo,
        repo_id=HF_DATASET_REPO,
        token=hf_token,
        repo_type="dataset"
    )

    hf_image_url = f"https://huggingface.co/datasets/{HF_DATASET_REPO}/resolve/main/{path_in_repo}"
    return local_tmp_path, hf_image_url, path_in_repo, len(image_bytes)

# --- Main MCP function ---
def process_and_describe(payload: dict):
    """Handle one robot request: store the image on the HF Hub, describe it
    with a VLM, and dispatch any tool action the model requests.

    Args:
        payload: Dict expected to contain "hf_token" and "image_b64"
            (base64-encoded JPEG), plus an optional "robot_id".

    Returns:
        On success, a dict with upload metadata, the raw VLM response, the
        parsed action/description, and the tool result (if any). On failure,
        a dict with a single "error" key — this function never raises.
    """
    try:
        hf_token = payload.get("hf_token")
        if not hf_token:
            return {"error": "HF token not provided in payload."}

        robot_id = payload.get("robot_id", "unknown")
        image_b64 = payload.get("image_b64")
        if not image_b64:
            return {"error": "No image provided."}

        # Persist the image locally and mirror it to the dataset repo.
        local_tmp_path, hf_url, path_in_repo, size_bytes = save_and_upload_image(image_b64, hf_token)

        # Initialize HF client
        hf_client = InferenceClient(token=hf_token)

        # System prompt (without stio.describe_tools because not using STIO here)
        system_prompt = """
        You are a helpful robot assistant.
        When you receive an image, you must:
        1. Describe the image in detail.
        2. Decide actions for the robot. Example:
           - Human figure → call the `say_hi` tool with a friendly greeting (vary every time)
        Always respond in JSON with:
        {
            "description": "...",
            "action": "say_hi",
            "greeting_text": "a friendly greeting"
        }
        """

        messages_payload = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": [
                {"type": "text", "text": "Here is an image."},
                {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{image_b64}"}}
            ]}
        ]

        # Call VLM
        chat_completion = hf_client.chat.completions.create(
            model=HF_VLM_MODEL,
            messages=messages_payload,
            max_tokens=300
        )

        # The API may return None content; normalize to "" before stripping
        # so we don't raise AttributeError on an empty completion.
        vlm_text = (chat_completion.choices[0].message.content or "").strip()

        # Models often wrap JSON replies in markdown code fences; strip them
        # so such replies still parse instead of falling back to plain text.
        candidate = vlm_text
        if candidate.startswith("```"):
            candidate = candidate.strip("`")
            if candidate.startswith("json"):
                candidate = candidate[4:]

        # Parse JSON from VLM; on failure, treat the whole reply as prose.
        try:
            action_data = json.loads(candidate)
        except json.JSONDecodeError:
            action_data = {"description": vlm_text, "action": None, "greeting_text": None}

        # Dispatch the requested tool, if any (only say_hi is supported).
        tool_result = None
        if action_data.get("action") == "say_hi":
            greeting = action_data.get("greeting_text") or "Hi!"
            tool_result = say_hi(greeting_text=greeting)

        return {
            "saved_to_hf_hub": True,
            "repo_id": HF_DATASET_REPO,
            "path_in_repo": path_in_repo,
            "image_url": hf_url,
            "file_size_bytes": size_bytes,
            "robot_id": robot_id,
            "vlm_response": vlm_text,
            "vlm_action": action_data.get("action"),
            "vlm_description": action_data.get("description"),
            "tool_result": tool_result
        }

    except Exception as e:
        # Top-level boundary: the Gradio endpoint must always return JSON,
        # so convert any unexpected failure into an error payload.
        return {"error": f"An API error occurred: {str(e)}"}

# --- Gradio MCP Interface ---
# JSON-in / JSON-out endpoint exposed under api_name "predict"; the output
# label suggests a Jetson device is the expected client — confirm with caller.
demo = gr.Interface(
    fn=process_and_describe,
    inputs=gr.JSON(label="Input Payload"),
    outputs=gr.JSON(label="Reply to Jetson"),
    api_name="predict"
)

if __name__ == "__main__":
    # Start the FastMCP stdio server in a background daemon thread so the
    # Gradio app can occupy the main thread. (The previous comment claimed
    # this ran "in the same process (blocking)" — it is non-blocking here.)
    import threading

    def run_mcp():
        # Serve the registered MCP tools over stdin/stdout.
        mcp.run(transport="stdio")

    # Daemon thread: terminates automatically when the main process exits.
    t = threading.Thread(target=run_mcp, daemon=True)
    t.start()

    # NOTE(review): launch(mcp_server=True) also exposes an MCP server via
    # Gradio, so the stdio transport above may be redundant — and anything
    # Gradio prints to stdout can interleave with the stdio MCP protocol.
    # Confirm this dual setup is intentional.
    demo.launch(mcp_server=True)