File size: 3,875 Bytes
0ef482f
938f609
 
9d41b1d
bbcef43
48607b7
1f8048b
 
c5129eb
1f8048b
 
406e27f
1f8048b
 
bbcef43
1f8048b
 
 
bbcef43
1f8048b
 
 
 
a10dd0b
1f8048b
 
 
 
 
9d41b1d
406e27f
938f609
 
0ef482f
dac9550
 
 
 
48607b7
bbcef43
 
 
48607b7
bbcef43
406e27f
dac9550
bbcef43
dac9550
 
bbcef43
a10dd0b
bbcef43
a10dd0b
bbcef43
 
 
 
 
 
 
a10dd0b
 
c5129eb
a10dd0b
bbcef43
 
 
 
c5129eb
9d41b1d
bbcef43
c5129eb
9d41b1d
c5129eb
bbcef43
48607b7
dac9550
bbcef43
 
 
 
 
 
 
 
 
 
 
 
 
 
48607b7
dd3451f
53af268
 
 
938f609
 
48607b7
bbcef43
 
 
dd3451f
ec3d9e7
0ef482f
cd798bc
d081bf3
a10dd0b
9a56bc2
 
 
 
 
 
 
444e2a5
0ef482f
17438da
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
import os
import base64
import gradio as gr
from huggingface_hub import upload_file, InferenceClient
import json

# --- Config ---
HF_DATASET_REPO = "OppaAI/Robot_MCP"
HF_VLM_MODEL = "Qwen/Qwen2.5-VL-7B-Instruct" 

# --- Helper Functions ---
def save_and_upload_image(image_b64, hf_token):
    """Save image to /tmp and upload to HF dataset."""
    image_bytes = base64.b64decode(image_b64)
    local_tmp_path = "/tmp/tmp.jpg"
    with open(local_tmp_path, "wb") as f:
        f.write(image_bytes)

    path_in_repo = "images/tmp.jpg"
    upload_file(
        path_or_fileobj=local_tmp_path,
        path_in_repo=path_in_repo,
        repo_id=HF_DATASET_REPO,
        token=hf_token,
        repo_type="dataset"
    )

    hf_image_url = f"https://huggingface.co/datasets/{HF_DATASET_REPO}/resolve/main/{path_in_repo}"
    return local_tmp_path, hf_image_url, path_in_repo, len(image_bytes)


# --- Main MCP function ---
def process_and_describe(payload: dict):
    try:
        hf_token = payload.get("hf_token")
        if not hf_token:
            return {"error": "HF token not provided in payload."}

        robot_id = payload.get("robot_id", "unknown")
        image_b64 = payload.get("image_b64")
        if not image_b64:
            return {"error": "No image provided."}

        # Save & upload
        local_tmp_path, hf_url, path_in_repo, size_bytes = save_and_upload_image(image_b64, hf_token)

        # Init HF client
        hf_client = InferenceClient(token=hf_token)

        # System prompt: describe + suggest action
        system_prompt = """
        You are a helpful robot assistant.
        1. Describe the image in detail.
        2. Suggest what the robot should do next based on what it sees:
           - Human figure → say 'Hi'.
           - Ball → move towards it.
           - Obstacles → stop or avoid.
           - Red button → press it.
        Always respond in JSON:
        {"description": "...", "action": {"move": "...", "interact": "..."}}
        """

        messages_payload = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": [
                {"type": "text", "text": "Here is an image."},
                {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{image_b64}"}}
            ]}
        ]

        # Call VLM
        chat_completion = hf_client.chat.completions.create(
            model=HF_VLM_MODEL,
            messages=messages_payload,
            max_tokens=300
        )

        # Robustly extract text
        try:
            vlm_text = chat_completion.choices[0].message.content.strip()
        except Exception:
            # fallback if structure is different
            vlm_text = str(chat_completion)

        # Attempt to parse JSON from VLM
        action_data = {}
        try:
            action_data = json.loads(vlm_text)
        except Exception:
            # If VLM didn't return valid JSON, wrap text as description
            action_data = {"description": vlm_text, "action": {"move": "unknown", "interact": "unknown"}}

        return {
            "saved_to_hf_hub": True,
            "repo_id": HF_DATASET_REPO,
            "path_in_repo": path_in_repo,
            "image_url": hf_url,
            "file_size_bytes": size_bytes,
            "robot_id": robot_id,
            "vlm_response": vlm_text,
            "vlm_action": action_data.get("action", {}),
            "vlm_description": action_data.get("description", "")
        }

    except Exception as e:
        return {"error": f"An API error occurred: {str(e)}"}


# --- Gradio MCP Interface ---
demo = gr.Interface(
    fn=process_and_describe,
    inputs=gr.JSON(label="Input Payload (Dict format with 'image_b64')"),
    outputs=gr.JSON(label="Reply to Jetson"),
    api_name="predict"
)

if __name__ == "__main__":
    demo.launch(mcp_server=True)