File size: 1,928 Bytes
d081bf3
d82a7f0
ec3d9e7
 
 
6c10eb2
2ec7ad2
6c10eb2
86e4fb5
ec3d9e7
6c10eb2
ec3d9e7
 
 
6c10eb2
afac99d
ec3d9e7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
eb6d527
ec3d9e7
6c10eb2
eb6d527
ec3d9e7
afac99d
6c10eb2
d081bf3
ec3d9e7
 
 
d081bf3
d82a7f0
ec3d9e7
6c10eb2
afac99d
d081bf3
 
6c10eb2
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
import gradio as gr
import json
import base64
from PIL import Image
import io
import requests
import os

HF_TOKEN = os.environ.get("HF_CV_ROBOT_TOKEN")
MODEL = "Qwen/Qwen3-VL-32B-Instruct"

# -------------------------------
# Main processing function
# -------------------------------
def process(payload: dict) -> dict:
    """Decode a base64 image from *payload*, ask the HF router VLM to
    describe it, and return the analysis.

    Expected payload keys:
        robot_id (str, optional): identifier echoed back to the caller
            (defaults to "unknown").
        image_b64 (str, required): base64-encoded image bytes.

    Returns:
        dict: ``{"received": True, "robot_id": ..., "vllm_analysis": ...}``
        on success, or ``{"error": <message>}`` on any failure — this is a
        service boundary, so exceptions are never raised to the caller.
    """
    try:
        robot_id = payload.get("robot_id", "unknown")
        image_b64 = payload["image_b64"]  # KeyError -> error dict below

        # Decode with PIL to validate that the payload really is an image
        # and to discover its format for the data-URI MIME type.
        img_bytes = base64.b64decode(image_b64)
        img = Image.open(io.BytesIO(img_bytes))
        img.load()  # PIL is lazy: force a full decode so corrupt data fails here
        mime = Image.MIME.get(img.format or "", "image/jpeg")

        headers = {"Authorization": f"Bearer {HF_TOKEN}"}
        data = {
            "model": MODEL,
            "messages": [
                {
                    "role": "user",
                    "content": [
                        {"type": "text", "text": "Describe this image in detail."},
                        # OpenAI-compatible image part: a base64 data URI under
                        # "image_url". (The previous "image_data" type is not
                        # accepted by the router's chat-completions endpoint.)
                        {
                            "type": "image_url",
                            "image_url": {"url": f"data:{mime};base64,{image_b64}"},
                        },
                    ],
                }
            ],
        }

        resp = requests.post(
            "https://router.huggingface.co/v1/chat/completions",
            headers=headers,
            json=data,
            timeout=60,
        )

        if resp.status_code != 200:
            return {"error": f"VLM API error: {resp.status_code}, {resp.text}"}

        # OpenAI-style responses return message.content as a plain string;
        # some backends instead return a list of {"type": "text", ...} parts.
        # Handle both (the old code indexed [0]["text"] and broke on strings).
        content = resp.json()["choices"][0]["message"]["content"]
        if isinstance(content, str):
            vlm_text = content
        else:
            vlm_text = "".join(
                part.get("text", "") for part in content if isinstance(part, dict)
            )

        return {
            "received": True,
            "robot_id": robot_id,
            "vllm_analysis": vlm_text,
        }

    except Exception as e:
        # Boundary handler: the robot client always gets a JSON-serializable
        # reply rather than a raised exception.
        return {"error": str(e)}

# -------------------------------
# Gradio MCP Server
# -------------------------------
# Single-endpoint Gradio app: accepts a JSON payload dict and returns the
# reply dict produced by process(). api_name="predict" fixes the name under
# which the endpoint is exposed to API/MCP clients.
demo = gr.Interface(
    fn=process,
    inputs=gr.JSON(label="Input Payload (Dict format)"),
    outputs=gr.JSON(label="Reply to Jetson"),
    api_name="predict"
)

if __name__ == "__main__":
    # mcp_server=True additionally serves the endpoint over the Model
    # Context Protocol alongside the regular Gradio web UI/API.
    demo.launch(mcp_server=True)