File size: 3,105 Bytes
d081bf3
d82a7f0
ec3d9e7
 
 
6c10eb2
2ec7ad2
6c10eb2
444e2a5
86e4fb5
e37ca9c
6c10eb2
444e2a5
 
 
 
 
 
ec3d9e7
444e2a5
ec3d9e7
6c10eb2
afac99d
444e2a5
 
 
ec3d9e7
 
 
444e2a5
ec3d9e7
444e2a5
 
 
ec3d9e7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
444e2a5
 
ec3d9e7
 
444e2a5
 
 
 
 
 
ec3d9e7
 
eb6d527
ec3d9e7
6c10eb2
eb6d527
ec3d9e7
afac99d
444e2a5
 
6c10eb2
d081bf3
ec3d9e7
 
 
d081bf3
d82a7f0
ec3d9e7
6c10eb2
afac99d
d081bf3
 
6c10eb2
 
444e2a5
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
import gradio as gr
import json
import base64
from PIL import Image
import io
import requests
import os

# Vision-language model requested from the router.
MODEL = "Qwen/Qwen2-VL-7B-Instruct"
# Access token, read from the environment.
HF_TOKEN = os.getenv("HF_CV_ROBOT_TOKEN")

# Warn at import time when the token is absent or empty. Startup is not
# aborted here; only the message below is printed.
if HF_TOKEN is None or HF_TOKEN == "":
    print("ERROR: HF_CV_ROBOT_TOKEN environment variable not set.")

# -------------------------------
# 主處理函數 (Main Processing Function)
# -------------------------------
def process(payload: dict) -> dict:
    """Describe a base64-encoded image with the hosted vision-language model.

    The image is sent to the Hugging Face router's chat-completions endpoint
    together with a fixed "describe this image" prompt, and the model's reply
    is returned to the caller.

    Args:
        payload: Request dict. ``image_b64`` (required) holds the
            base64-encoded image; ``robot_id`` (optional) is echoed back and
            defaults to ``"unknown"``.

    Returns:
        On success, ``{"received": True, "robot_id": ..., "vllm_analysis": ...}``.
        On any failure, ``{"error": "<message>"}``; errors are reported in the
        returned dict instead of being raised.
    """
    try:
        if not HF_TOKEN:
            return {"error": "Hugging Face token is missing. Please check Space secrets."}

        # An empty JSON input can arrive as None, so check the type up front
        # instead of failing later with an opaque AttributeError.
        if not isinstance(payload, dict):
            return {"error": "Payload must be a JSON object containing 'image_b64'."}

        robot_id = payload.get("robot_id", "unknown")
        image_b64 = payload.get("image_b64")
        if not isinstance(image_b64, str) or not image_b64.strip():
            return {"error": "Payload is missing required field 'image_b64'."}

        # Decode once: this validates the input, lets us detect the image
        # format, and gives canonical base64 (no stray whitespace) for the URL.
        try:
            img_bytes = base64.b64decode(image_b64)
        except ValueError as e:  # binascii.Error is a ValueError subclass
            return {"error": f"Invalid base64 in 'image_b64': {e}"}
        if not img_bytes:
            return {"error": "Invalid base64 in 'image_b64': decoded image is empty."}

        mime_type = _sniff_image_mime(img_bytes)
        clean_b64 = base64.b64encode(img_bytes).decode("ascii")
        data_url = f"data:{mime_type};base64,{clean_b64}"

        # Router API payload. Images are passed as an "image_url" content part
        # carrying a data URL, per the chat-completions schema.
        headers = {"Authorization": f"Bearer {HF_TOKEN}"}
        data = {
            "model": MODEL,
            "messages": [
                {
                    "role": "user",
                    "content": [
                        {"type": "text", "text": "Describe this image in detail."},
                        {"type": "image_url", "image_url": {"url": data_url}},
                    ],
                }
            ],
        }

        resp = requests.post(
            "https://router.huggingface.co/v1/chat/completions",
            headers=headers,
            json=data,
            timeout=60,
        )

        if resp.status_code != 200:
            print(f"VLM API error: {resp.status_code}, {resp.text}")
            return {"error": f"VLM API error: {resp.status_code}, {resp.text}"}

        # TypeError covers an unexpected response shape; ValueError covers a
        # body that is not valid JSON (JSONDecodeError is a ValueError).
        try:
            vlm_text = _extract_message_text(resp.json())
        except (KeyError, IndexError, TypeError, ValueError) as e:
            return {"error": f"Failed to parse VLM response: {e}, Response text: {resp.text}"}

        return {
            "received": True,
            "robot_id": robot_id,
            "vllm_analysis": vlm_text,
        }

    except Exception as e:
        # Boundary handler: network failures, timeouts and anything else
        # unexpected are logged and returned as an error dict.
        print(f"An unexpected error occurred: {e}")
        return {"error": str(e)}


def _sniff_image_mime(img_bytes: bytes) -> str:
    """Return the MIME type implied by the image's magic bytes (JPEG if unknown)."""
    if img_bytes.startswith(b"\x89PNG\r\n\x1a\n"):
        return "image/png"
    if img_bytes.startswith((b"GIF87a", b"GIF89a")):
        return "image/gif"
    if img_bytes[:4] == b"RIFF" and img_bytes[8:12] == b"WEBP":
        return "image/webp"
    if img_bytes.startswith(b"BM"):
        return "image/bmp"
    return "image/jpeg"


def _extract_message_text(body) -> str:
    """Return the assistant text from a parsed chat-completions response body."""
    content = body["choices"][0]["message"]["content"]
    # The usual shape: content is a plain string.
    if isinstance(content, str):
        return content
    # Also accept a list of content parts and concatenate their text fields.
    if isinstance(content, list):
        texts = [
            part["text"]
            for part in content
            if isinstance(part, dict) and "text" in part
        ]
        if not texts:
            raise KeyError("text")
        return "".join(texts)
    raise TypeError(f"unexpected message content type: {type(content).__name__}")

# -------------------------------
# Gradio MCP Server
# -------------------------------
# Wire process() into a JSON-in / JSON-out interface exposed under the
# API name "predict".
_payload_input = gr.JSON(label="Input Payload (Dict format)")
_reply_output = gr.JSON(label="Reply to Jetson")

demo = gr.Interface(
    api_name="predict",
    fn=process,
    inputs=_payload_input,
    outputs=_reply_output,
)

# Launch only when run as a script, with the MCP server flag enabled.
if __name__ == "__main__":
    demo.launch(mcp_server=True)