File size: 1,973 Bytes
0ef482f
 
ec3d9e7
6c10eb2
0ef482f
6c10eb2
24f75ec
0413f6a
444e2a5
0ef482f
24f75ec
ec3d9e7
0ef482f
 
 
 
 
 
24f75ec
 
0ef482f
 
ec3d9e7
 
 
 
0ef482f
24f75ec
ec3d9e7
 
 
 
 
0ef482f
 
 
 
 
 
 
 
 
 
24f75ec
0ef482f
 
 
 
 
 
 
 
 
eb6d527
ec3d9e7
0ef482f
 
d081bf3
24f75ec
0ef482f
 
 
 
 
 
444e2a5
0ef482f
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import gradio as gr
import json
import base64
import requests
import os

HF_TOKEN = os.environ.get("HF_CV_ROBOT_TOKEN")  # 在 Space Secrets 裡設定
MODEL = "Qwen/Qwen2.5-VL-7B-Instruct"  # 你要用的 VLM model

if not HF_TOKEN:
    print("ERROR: HF_CV_ROBOT_TOKEN is not set!")

def process(payload: dict):
    try:
        robot_id = payload.get("robot_id", "unknown")
        image_b64 = payload["image_b64"]

        headers = {"Authorization": f"Bearer {HF_TOKEN}"}

        # 這裡用 type='file' + Base64,Router API 支援直接解析
        data = {
            "model": MODEL,
            "messages": [
                {
                    "role": "user",
                    "content": [
                        {"type": "text", "text": "Describe this image in detail."},
                        {"type": "file", "file": {"b64": image_b64, "name": "image.jpg"}}
                    ]
                }
            ]
        }

        resp = requests.post(
            "https://router.huggingface.co/v1/chat/completions",
            headers=headers,
            json=data,
            timeout=60
        )

        if resp.status_code != 200:
            return {"error": f"VLM API error: {resp.status_code}, {resp.text}"}

        # 解析回傳文字描述
        try:
            vlm_text = resp.json()["choices"][0]["message"]["content"][0]["text"]
        except (KeyError, IndexError, json.JSONDecodeError) as e:
            return {"error": f"Failed to parse VLM response: {e}, Response text: {resp.text}"}

        return {
            "received": True,
            "robot_id": robot_id,
            "vllm_analysis": vlm_text
        }

    except Exception as e:
        return {"error": str(e)}

# Gradio MCP Server
demo = gr.Interface(
    fn=process,
    inputs=gr.JSON(label="Input Payload (Dict format)"),
    outputs=gr.JSON(label="Reply to Jetson"),
    api_name="predict"
)

if __name__ == "__main__":
    demo.launch(mcp_server=True)