File size: 1,664 Bytes
d081bf3
d82a7f0
 
6c10eb2
2ec7ad2
6c10eb2
4f14703
2ec7ad2
7e4a7ef
6c10eb2
f594945
4f14703
 
 
20f710b
4f14703
f594945
4f14703
f594945
 
 
 
 
 
 
4f14703
 
 
 
 
 
 
 
 
 
6c10eb2
 
4f14703
 
 
 
afac99d
f594945
eb6d527
 
6c10eb2
 
eb6d527
9fc69b3
afac99d
6c10eb2
d081bf3
4f14703
d081bf3
d82a7f0
4f14703
6c10eb2
afac99d
d081bf3
 
6c10eb2
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
import gradio as gr
import base64
import json
import requests
import os

# Base URL of the Hugging Face Router inference endpoint.
HF_ROUTER_API = "https://router.huggingface.co/hf-inference"
# API token read from the environment; will be None if unset — NOTE(review):
# confirm the deployment defines HF_CV_ROBOT_TOKEN, otherwise requests go out unauthorized.
HF_TOKEN = os.getenv("HF_CV_ROBOT_TOKEN")
# Vision-language model used to describe the incoming images.
MODEL_NAME = "Qwen/Qwen3-VL-32B-Instruct"

def call_vlm_api(payload: dict) -> str:
    """
    Call the Hugging Face Router Inference API with a Base64-encoded image.

    Args:
        payload: Dict expected to carry an "image_b64" key with the
            Base64-encoded image bytes (assumed sent by the Jetson client —
            TODO confirm against the sender's payload schema).

    Returns:
        The model's generated description on success, otherwise a
        human-readable error string. This function never raises for
        missing input, HTTP, network, or response-shape problems.
    """
    headers = {"Authorization": f"Bearer {HF_TOKEN}"}

    image_b64 = payload.get("image_b64")
    if not image_b64:
        # Fail fast with a clear message instead of raising KeyError or
        # sending a request that is guaranteed to be rejected.
        return "VLM API error: payload missing 'image_b64'"

    data = {
        "model": MODEL_NAME,
        "inputs": [
            {
                "image": {"b64": image_b64},
                "text": "Describe the image in detail."
            }
        ]
    }

    try:
        resp = requests.post(HF_ROUTER_API, headers=headers, json=data, timeout=60)
        if resp.status_code == 200:
            # Take the first generated_text. The body is expected to be a
            # non-empty list of dicts; guard against other shapes (or a
            # non-JSON body) so a malformed response yields an error string
            # rather than an uncaught TypeError/IndexError/JSONDecodeError.
            body = resp.json()
            if isinstance(body, list) and body and isinstance(body[0], dict):
                return body[0].get("generated_text", "")
            return f"VLM API error: unexpected response shape: {body!r}"
        else:
            return f"VLM API error: {resp.status_code}, {resp.text}"
    except Exception as e:
        # Broad catch is deliberate: this endpoint must always return a
        # string the caller can embed in its JSON reply.
        return f"Exception: {str(e)}"

def process(payload: dict):
    """
    Handle one JSON payload from a Jetson client.

    The payload carries a Base64 image plus a robot_id; the reply is a
    JSON-serializable dict with the VLM analysis, or an {"error": ...}
    dict if anything goes wrong.
    """
    try:
        # Run the VLM first, then assemble the reply around its output.
        analysis = call_vlm_api(payload)
        robot = payload.get("robot_id", "unknown")
        return {
            "received": True,
            "robot_id": robot,
            "vllm_analysis": analysis,
        }
    except Exception as err:
        return {"error": str(err)}

# Gradio MCP server
# Single JSON-in / JSON-out interface; `api_name="predict"` exposes it as the
# "predict" endpoint that remote (Jetson) clients call.
demo = gr.Interface(
    fn=process,
    inputs=gr.JSON(label="Input Payload from Jetson"),
    outputs=gr.JSON(label="Reply to Jetson"),
    api_name="predict"
)

if __name__ == "__main__":
    # mcp_server=True additionally exposes the interface as an MCP tool
    # alongside the normal Gradio web UI/API.
    demo.launch(mcp_server=True)