File size: 2,416 Bytes
0ef482f
 
ec3d9e7
c071669
6c10eb2
0ef482f
6c10eb2
c071669
23f2922
444e2a5
71865dd
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ec3d9e7
0ef482f
 
c071669
71865dd
23f2922
0ef482f
 
71865dd
 
c071669
71865dd
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ec3d9e7
0ef482f
 
71865dd
 
0ef482f
 
 
 
 
 
71865dd
 
 
 
0ef482f
 
 
 
71865dd
eb6d527
ec3d9e7
0ef482f
 
d081bf3
71865dd
0ef482f
 
71865dd
 
0ef482f
 
444e2a5
0ef482f
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
import gradio as gr
import json
import base64
from io import BytesIO
import requests
import os

# Hugging Face API token, injected via environment variable so it never
# lands in source control. Falls back to None when unset (checked in process()).
HF_TOKEN = os.environ.get("HF_CV_ROBOT_TOKEN")
# Vision-language model requested through the HF inference router.
MODEL = "Qwen/Qwen2.5-VL-7B-Instruct"

# Endpoint used to upload image bytes and obtain a URL for the VLM request.
# NOTE(review): presumably returns JSON with a "url" field (see upload_to_hf);
# confirm this is a stable/public HF API path.
HF_UPLOAD_URL = "https://huggingface.co/api/uploads"

def upload_to_hf(bytes_data, timeout=30):
    """Upload raw image bytes to the HF upload endpoint and return a URL.

    Args:
        bytes_data: Raw image bytes; sent with an ``image/jpeg`` MIME type
            (assumes the caller provides JPEG data — TODO confirm).
        timeout: Seconds to wait for the HTTP request. Keyword with a
            default, so existing single-argument callers are unaffected.

    Returns:
        The URL string from the upload response's ``"url"`` field.

    Raises:
        RuntimeError: If the upload returns a non-200 status, or the 200
            response body is not JSON / lacks a ``"url"`` field.
        requests.Timeout: If the request exceeds ``timeout`` seconds.
    """
    resp = requests.post(
        HF_UPLOAD_URL,
        headers={"Authorization": f"Bearer {HF_TOKEN}"},
        files={"file": ("temp.jpg", bytes_data, "image/jpeg")},
        # Was missing: without a timeout a stalled upload blocks the
        # Gradio worker indefinitely.
        timeout=timeout,
    )

    if resp.status_code != 200:
        raise RuntimeError(f"HF upload failed: {resp.text}")

    try:
        return resp.json()["url"]
    except (ValueError, KeyError) as e:
        # Non-JSON body or missing "url" key on a 200 response — surface a
        # clear error instead of an opaque traceback.
        raise RuntimeError(f"Unexpected HF upload response: {resp.text}") from e


def process(payload: dict):
    """Handle one robot request: decode the image, get a VLM description.

    Expected payload keys:
        image_b64 (required): base64-encoded image bytes.
        robot_id (optional): identifier echoed back; defaults to "unknown".

    Returns:
        On success: ``{"received": True, "robot_id": ..., "vllm_analysis": ...}``.
        On any failure: ``{"error": "<message>"}`` — this function never
        raises, so the Gradio endpoint always replies with JSON.
    """
    try:
        if not HF_TOKEN:
            return {"error": "Missing HF token."}

        robot_id = payload.get("robot_id", "unknown")

        # --- get image bytes (explicit check: a raw KeyError string from
        # the catch-all below would be a confusing reply to the client)
        image_b64 = payload.get("image_b64")
        if not image_b64:
            return {"error": "Missing 'image_b64' in payload."}
        img_bytes = base64.b64decode(image_b64)

        # --- upload to HF (get public URL for the VLM to fetch)
        image_url = upload_to_hf(img_bytes)

        # --- VLM request (image_url only)
        data = {
            "model": MODEL,
            "messages": [
                {
                    "role": "user",
                    "content": [
                        {"type": "text", "text": "Describe this image in detail."},
                        {"type": "image_url", "image_url": {"url": image_url}}
                    ]
                }
            ]
        }

        resp = requests.post(
            "https://router.huggingface.co/v1/chat/completions",
            headers={"Authorization": f"Bearer {HF_TOKEN}"},
            # json= serializes and sets Content-Type: application/json,
            # replacing the manual json.dumps + header.
            json=data,
            timeout=60
        )

        if resp.status_code != 200:
            return {"error": f"VLM API error: {resp.status_code}, {resp.text}"}

        # Was: bare `except:` around content[0]["text"], which assumed a
        # list-of-parts message. OpenAI-compatible routers normally return
        # `content` as a plain string, so the old code mis-reported valid
        # responses as "Bad VLM response".
        try:
            content = resp.json()["choices"][0]["message"]["content"]
        except (ValueError, KeyError, IndexError, TypeError):
            return {"error": f"Bad VLM response: {resp.text}"}

        if isinstance(content, str):
            vlm_text = content
        elif isinstance(content, list):
            # Some providers return a list of {"type": "text", "text": ...}
            # parts; join all text fragments.
            vlm_text = "".join(
                part.get("text", "")
                for part in content
                if isinstance(part, dict)
            )
        else:
            return {"error": f"Bad VLM response: {resp.text}"}

        return {
            "received": True,
            "robot_id": robot_id,
            "vllm_analysis": vlm_text
        }

    except Exception as e:
        # Top-level boundary: convert any unexpected failure into a JSON
        # error reply rather than a 500 from the server.
        return {"error": str(e)}


# Gradio interface: accepts one JSON payload dict and replies with a JSON
# dict, exposed to API clients under the "predict" endpoint name.
demo = gr.Interface(
    fn=process,
    inputs=gr.JSON(label="Input Payload (Dict format)"),
    outputs=gr.JSON(label="Reply to Jetson"),
    api_name="predict"
)

if __name__ == "__main__":
    # mcp_server=True additionally exposes the app as an MCP server.
    demo.launch(mcp_server=True)