Spaces:
Sleeping
Sleeping
File size: 2,416 Bytes
0ef482f ec3d9e7 c071669 6c10eb2 0ef482f 6c10eb2 c071669 23f2922 444e2a5 71865dd ec3d9e7 0ef482f c071669 71865dd 23f2922 0ef482f 71865dd c071669 71865dd ec3d9e7 0ef482f 71865dd 0ef482f 71865dd 0ef482f 71865dd eb6d527 ec3d9e7 0ef482f d081bf3 71865dd 0ef482f 71865dd 0ef482f 444e2a5 0ef482f | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 | import gradio as gr
import json
import base64
from io import BytesIO
import requests
import os
# Bearer token sent in the Authorization header of every outgoing request.
# Read from the HF_CV_ROBOT_TOKEN environment variable; None when it is unset.
HF_TOKEN = os.environ.get("HF_CV_ROBOT_TOKEN")
# Model identifier placed in the "model" field of the chat-completions request.
MODEL = "Qwen/Qwen2.5-VL-7B-Instruct"
# Endpoint that upload_to_hf() POSTs image bytes to.
# NOTE(review): confirm this endpoint exists and returns {"url": ...} as the code expects.
HF_UPLOAD_URL = "https://huggingface.co/api/uploads"
def upload_to_hf(bytes_data, timeout=60):
    """Upload image bytes to the HF upload endpoint and return the image URL.

    Args:
        bytes_data: Raw image bytes; sent as a multipart file named
            "temp.jpg" with content type "image/jpeg".
        timeout: Seconds to wait for the upload request before giving up.
            Defaults to 60, matching the timeout used for the VLM request.

    Returns:
        The value of the "url" field in the JSON response.

    Raises:
        RuntimeError: If the endpoint answers with a non-200 status, or the
            response body is not JSON containing a "url" field.
        requests.RequestException: On network failure or timeout.
    """
    resp = requests.post(
        HF_UPLOAD_URL,
        headers={"Authorization": f"Bearer {HF_TOKEN}"},
        files={"file": ("temp.jpg", bytes_data, "image/jpeg")},
        # Without a timeout, requests waits indefinitely on a stalled server.
        timeout=timeout,
    )
    if resp.status_code != 200:
        raise RuntimeError(f"HF upload failed: {resp.text}")
    try:
        return resp.json()["url"]
    except (ValueError, KeyError, TypeError) as err:
        # A 200 without a usable "url" is still a failed upload; report it
        # with the body so the caller sees what came back.
        raise RuntimeError(f"HF upload failed: {resp.text}") from err
def process(payload: dict):
    """Describe the image in *payload* with the VLM and build the reply dict.

    The image is base64-decoded, uploaded via ``upload_to_hf`` to obtain a
    URL, and that URL is sent to the chat-completions endpoint together with
    a fixed "describe this image" prompt.

    Args:
        payload: Dict with a required ``"image_b64"`` entry (base64-encoded
            image) and an optional ``"robot_id"`` (defaults to "unknown").

    Returns:
        On success: ``{"received": True, "robot_id": ..., "vllm_analysis": ...}``.
        On any failure: ``{"error": <message>}``. This function never raises;
        it is the handler boundary, so every exception is turned into an
        error reply.
    """
    try:
        if not HF_TOKEN:
            return {"error": "Missing HF token."}
        if not isinstance(payload, dict):
            return {"error": "Payload must be a JSON object."}

        robot_id = payload.get("robot_id", "unknown")

        # --- get image bytes
        image_b64 = payload.get("image_b64")
        if not image_b64:
            # Explicit message instead of the bare KeyError text "'image_b64'".
            return {"error": "Missing 'image_b64' in payload."}
        img_bytes = base64.b64decode(image_b64)

        # --- upload to HF (get public URL)
        image_url = upload_to_hf(img_bytes)

        # --- VLM request (image_url only)
        data = {
            "model": MODEL,
            "messages": [
                {
                    "role": "user",
                    "content": [
                        {"type": "text", "text": "Describe this image in detail."},
                        {"type": "image_url", "image_url": {"url": image_url}},
                    ],
                }
            ],
        }
        resp = requests.post(
            "https://router.huggingface.co/v1/chat/completions",
            headers={
                "Authorization": f"Bearer {HF_TOKEN}",
                "Content-Type": "application/json",
            },
            data=json.dumps(data),
            timeout=60,
        )
        if resp.status_code != 200:
            return {"error": f"VLM API error: {resp.status_code}, {resp.text}"}

        try:
            content = resp.json()["choices"][0]["message"]["content"]
            if isinstance(content, str):
                # Chat-completion responses normally carry the assistant
                # message as a plain string.
                vlm_text = content
            else:
                # Tolerate a list of content parts: join the text parts.
                vlm_text = "".join(part["text"] for part in content if "text" in part)
        except (ValueError, KeyError, IndexError, TypeError):
            return {"error": f"Bad VLM response: {resp.text}"}

        return {
            "received": True,
            "robot_id": robot_id,
            "vllm_analysis": vlm_text,
        }
    except Exception as e:
        # Top-level boundary: the caller expects a JSON reply, not a traceback.
        return {"error": str(e)}
# Gradio UI/API wiring: one JSON input, one JSON output, handled by process().
payload_input = gr.JSON(label="Input Payload (Dict format)")
reply_output = gr.JSON(label="Reply to Jetson")

demo = gr.Interface(
    fn=process,
    inputs=payload_input,
    outputs=reply_output,
    api_name="predict",
)

# Launch only when executed as a script, with the MCP server enabled.
if __name__ == "__main__":
    demo.launch(mcp_server=True)
|