File size: 1,971 Bytes
0ef482f
 
ec3d9e7
6c10eb2
0ef482f
6c10eb2
24f75ec
 
444e2a5
0ef482f
24f75ec
ec3d9e7
0ef482f
 
 
 
 
 
24f75ec
 
0ef482f
 
ec3d9e7
 
 
 
0ef482f
24f75ec
ec3d9e7
 
 
 
 
0ef482f
 
 
 
 
 
 
 
 
 
24f75ec
0ef482f
 
 
 
 
 
 
 
 
eb6d527
ec3d9e7
0ef482f
 
d081bf3
24f75ec
0ef482f
 
 
 
 
 
444e2a5
0ef482f
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import gradio as gr
import json
import base64
import requests
import os

# Hugging Face access token, read from the environment (set it in the Space Secrets).
HF_TOKEN = os.getenv("HF_CV_ROBOT_TOKEN")
# Identifier of the vision-language model requested from the API.
MODEL = "Qwen/Qwen2-VL-7B-Instruct"

# Report a missing or empty token at import time; execution continues regardless.
if HF_TOKEN is None or HF_TOKEN == "":
    print("ERROR: HF_CV_ROBOT_TOKEN is not set!")

def _extract_message_text(body):
    """Return the assistant text from a chat-completions response body.

    OpenAI-compatible responses carry ``choices[0].message.content`` as a
    plain string. A list of typed parts (``{"type": "text", "text": ...}``)
    is tolerated as well; its text parts are concatenated in order.

    Raises:
        KeyError, IndexError, TypeError: if the body does not have the
            expected ``choices[0].message.content`` shape.
    """
    content = body["choices"][0]["message"]["content"]
    if isinstance(content, str):
        return content
    if isinstance(content, list):
        return "".join(
            part["text"]
            for part in content
            if isinstance(part, dict) and isinstance(part.get("text"), str)
        )
    raise TypeError(f"unexpected message content type: {type(content).__name__}")


def process(payload: dict):
    """Ask the vision-language model to describe the image in *payload*.

    Args:
        payload: JSON object with a required ``image_b64`` string (the
            base64-encoded image, either raw or already a ``data:`` URL) and
            an optional ``robot_id`` (defaults to ``"unknown"``).

    Returns:
        On success ``{"received": True, "robot_id": ..., "vllm_analysis": <text>}``.
        On any failure ``{"error": <message>}``. This function never raises,
        so the caller always gets a JSON reply.
    """
    # Validate the input first so callers get an actionable message instead
    # of the repr of an AttributeError/KeyError.
    if not isinstance(payload, dict):
        return {"error": "Payload must be a JSON object"}

    image_b64 = payload.get("image_b64")
    if not isinstance(image_b64, str) or not image_b64.strip():
        return {"error": "Missing or empty 'image_b64' in payload"}
    image_b64 = image_b64.strip()
    robot_id = payload.get("robot_id", "unknown")

    try:
        # Fail fast rather than sending "Bearer None" to the API.
        if not HF_TOKEN:
            return {"error": "HF_CV_ROBOT_TOKEN is not set"}

        headers = {"Authorization": f"Bearer {HF_TOKEN}"}

        # The OpenAI-compatible chat API takes inline images as an
        # `image_url` part holding a data URL.
        # NOTE(review): raw base64 is assumed to be JPEG (the original
        # request named the attachment "image.jpg") - confirm with the sender.
        if image_b64.startswith("data:"):
            image_url = image_b64
        else:
            image_url = f"data:image/jpeg;base64,{image_b64}"

        data = {
            "model": MODEL,
            "messages": [
                {
                    "role": "user",
                    "content": [
                        {"type": "text", "text": "Describe this image in detail."},
                        {"type": "image_url", "image_url": {"url": image_url}},
                    ],
                }
            ],
        }

        resp = requests.post(
            "https://router.huggingface.co/v1/chat/completions",
            headers=headers,
            json=data,
            timeout=60,
        )

        if resp.status_code != 200:
            return {"error": f"VLM API error: {resp.status_code}, {resp.text}"}

        # Extract the text description from the reply. ValueError covers
        # every JSON decode error `resp.json()` can raise; TypeError covers
        # an unexpected content shape.
        try:
            vlm_text = _extract_message_text(resp.json())
        except (KeyError, IndexError, TypeError, ValueError) as e:
            return {"error": f"Failed to parse VLM response: {e}, Response text: {resp.text}"}

        return {
            "received": True,
            "robot_id": robot_id,
            "vllm_analysis": vlm_text,
        }

    except Exception as e:
        # Endpoint boundary: network failures, timeouts and anything else
        # unexpected are reported as an error reply instead of propagating.
        return {"error": str(e)}

# Gradio interface around `process`: one JSON payload in, one JSON reply out,
# exposed under the "predict" API name.
demo = gr.Interface(
    process,
    gr.JSON(label="Input Payload (Dict format)"),
    gr.JSON(label="Reply to Jetson"),
    api_name="predict",
)

# Launch only when run as a script; the MCP server is enabled alongside the app.
if __name__ == "__main__":
    demo.launch(mcp_server=True)