File size: 3,708 Bytes
0ef482f
938f609
 
9d41b1d
bbcef43
9c6065d
48607b7
1f8048b
 
9c6065d
1f8048b
d722b23
9c6065d
 
 
 
d722b23
 
 
9c6065d
5410665
9c6065d
d722b23
1f8048b
406e27f
1f8048b
bbcef43
1f8048b
 
 
bbcef43
1f8048b
 
 
 
a10dd0b
1f8048b
 
 
 
 
9d41b1d
938f609
 
0ef482f
dac9550
 
 
 
48607b7
bbcef43
 
 
48607b7
9c6065d
406e27f
9c6065d
 
dac9550
 
9c6065d
 
 
 
 
 
 
 
 
 
 
a10dd0b
 
c5129eb
a10dd0b
bbcef43
 
 
 
c5129eb
9d41b1d
9c6065d
c5129eb
9d41b1d
c5129eb
5410665
48607b7
dac9550
d722b23
9c6065d
 
 
48607b7
dd3451f
53af268
 
 
938f609
 
48607b7
bbcef43
9c6065d
dd3451f
ec3d9e7
0ef482f
cd798bc
d081bf3
9a56bc2
 
 
9c6065d
9a56bc2
 
 
444e2a5
ef2db45
0ef482f
17438da
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
import os
import base64
import gradio as gr
from huggingface_hub import upload_file, InferenceClient
import json
from fastmcp import MCP, STIO

# --- Config ---
# Hugging Face dataset repo that receives the uploaded images
# (passed as repo_id with repo_type="dataset" in save_and_upload_image).
HF_DATASET_REPO = "OppaAI/Robot_MCP"
# Model id handed to the chat-completion call in process_and_describe.
HF_VLM_MODEL = "Qwen/Qwen2.5-VL-7B-Instruct"

# --- MCP server instance ---
# Tools are registered on this object via the @mcp.tools() decorator below.
# NOTE(review): confirm that the installed fastmcp release really exports
# `MCP` and `STIO`; this file depends on both names being importable.
mcp = MCP()

# --- STIO for the LLM ---
# Must be created after `mcp`; process_and_describe uses it both to list the
# tools in the system prompt (describe_tools) and to act on the model reply (run).
stio = STIO(mcp)  # Bind STIO to MCP tools

# --- MCP Tool ---
@mcp.tools()
def say_hi(greeting_text: str = "Hi there!"):
    """Return a greeting command in JSON."""
    # The docstring above is kept verbatim: tool frameworks commonly surface it
    # as the tool description, so it may be runtime text rather than a comment.
    # The result is a plain dict so it can be serialised as JSON by the caller.
    greeting_command = dict(command="say_hi", text=greeting_text)
    return greeting_command

# --- Helper Functions ---
def save_and_upload_image(image_b64, hf_token):
    """Decode a base64 image, keep a local copy, and upload it to the dataset repo.

    Args:
        image_b64: Base64-encoded image data.
        hf_token: Hugging Face token forwarded to ``upload_file``.

    Returns:
        A 4-tuple ``(local_tmp_path, hf_image_url, path_in_repo, size_bytes)``:
        the path of the local copy, the ``resolve/main`` URL of the uploaded
        file, its path inside the repo, and the decoded size in bytes.

    Raises:
        binascii.Error: If ``image_b64`` is not valid base64.
        Exception: Whatever ``upload_file`` or the local file write raises is
            propagated unchanged to the caller.
    """
    image_bytes = base64.b64decode(image_b64)

    # Local copy at a fixed location. Every call overwrites the same file, so
    # it only ever reflects the most recently written image.
    local_tmp_path = "/tmp/tmp.jpg"
    with open(local_tmp_path, "wb") as f:
        f.write(image_bytes)

    # NOTE(review): the repo path is also fixed, so each upload replaces the
    # previous image in the dataset - confirm that this is intended.
    path_in_repo = "images/tmp.jpg"

    # Upload the in-memory bytes rather than re-reading the shared temp file:
    # a concurrent request could overwrite /tmp/tmp.jpg between our write and
    # the upload, which would publish another request's image while we still
    # report this request's size.
    upload_file(
        path_or_fileobj=image_bytes,
        path_in_repo=path_in_repo,
        repo_id=HF_DATASET_REPO,
        token=hf_token,
        repo_type="dataset"
    )

    hf_image_url = f"https://huggingface.co/datasets/{HF_DATASET_REPO}/resolve/main/{path_in_repo}"
    return local_tmp_path, hf_image_url, path_in_repo, len(image_bytes)

# --- Main MCP function ---
def process_and_describe(payload: dict):
    """Upload an image, have the VLM describe it, and run any tool calls it emits.

    Args:
        payload: JSON object with the keys
            ``hf_token`` (required) - token used for the upload and for the
            inference client;
            ``image_b64`` (required) - base64-encoded image, sent to the model
            as a ``data:image/jpeg;base64,...`` URL;
            ``robot_id`` (optional) - echoed back, defaults to ``"unknown"``.

    Returns:
        On success, a dict with the upload metadata (``saved_to_hf_hub``,
        ``repo_id``, ``path_in_repo``, ``image_url``, ``file_size_bytes``),
        the ``robot_id``, the raw model text (``vlm_response``) and whatever
        ``stio.run`` returned for it (``tool_results``).
        On failure, ``{"error": <message>}``. Exceptions raised while
        uploading, calling the model or running tools are caught and reported
        this way instead of propagating.
    """
    # The JSON input is not guaranteed to be an object (it can be null, a
    # list, a string, ...). Reject those up front with a clear message instead
    # of letting `.get` fail and be reported as an "API error".
    if not isinstance(payload, dict):
        return {"error": "Payload must be a JSON object."}

    try:
        hf_token = payload.get("hf_token")
        if not hf_token:
            return {"error": "HF token not provided in payload."}

        robot_id = payload.get("robot_id", "unknown")
        image_b64 = payload.get("image_b64")
        if not image_b64:
            return {"error": "No image provided."}

        # Save image & upload (the local path is returned but not needed here)
        _local_path, hf_url, path_in_repo, size_bytes = save_and_upload_image(image_b64, hf_token)

        # Initialize HF client with the caller-supplied token
        hf_client = InferenceClient(token=hf_token)

        # --- System prompt with STIO instructions ---
        # The tool listing is rendered on every call so it reflects the tools
        # currently registered.
        system_prompt = f"""
        You are a helpful robot assistant. You have access to MCP tools via STIO.
        When you receive an image, you must:
        1️⃣ Describe the image in detail.
        2️⃣ Decide actions for the robot. Example:
           - Human figure → call `say_hi` tool with a friendly greeting (vary every time)
        3️⃣ Use STIO to call the tools. Always respond in JSON if calling tools.

        Available tools:
        {stio.describe_tools()}
        """

        # The image is sent inline as a data URL, not via the uploaded hub URL.
        messages_payload = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": [
                {"type": "text", "text": "Here is an image."},
                {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{image_b64}"}}
            ]}
        ]

        # --- Call VLM with STIO ---
        chat_completion = hf_client.chat.completions.create(
            model=HF_VLM_MODEL,
            messages=messages_payload,
            max_tokens=300
        )

        vlm_text = chat_completion.choices[0].message.content.strip()

        # --- Use STIO to execute tool calls if present ---
        tool_results = stio.run(vlm_text)

        return {
            "saved_to_hf_hub": True,
            "repo_id": HF_DATASET_REPO,
            "path_in_repo": path_in_repo,
            "image_url": hf_url,
            "file_size_bytes": size_bytes,
            "robot_id": robot_id,
            "vlm_response": vlm_text,
            "tool_results": tool_results
        }

    except Exception as e:
        # Boundary handler: the caller always gets a JSON reply, never a traceback.
        return {"error": f"An API error occurred: {str(e)}"}

# --- Gradio MCP Interface ---
# Single JSON-in / JSON-out endpoint: the submitted payload is passed to
# process_and_describe and the dict it returns is shown as the reply.
demo = gr.Interface(
    fn=process_and_describe,
    inputs=gr.JSON(label="Input Payload"),
    outputs=gr.JSON(label="Reply to Jetson"),
    api_name="predict"
)

# --- Entry point ---
# Launch only when executed as a script; mcp_server=True asks Gradio to start
# its MCP server alongside the web app (see the Gradio MCP documentation).
if __name__ == "__main__":
    demo.launch(mcp_server=True)