Spaces:
Sleeping
Sleeping
File size: 3,921 Bytes
0ef482f 938f609 9d41b1d bbcef43 48607b7 1f8048b 9120a4e 1f8048b 406e27f 1f8048b bbcef43 1f8048b bbcef43 1f8048b a10dd0b 1f8048b 9d41b1d 406e27f 938f609 0ef482f dac9550 48607b7 bbcef43 48607b7 bbcef43 406e27f dac9550 bbcef43 dac9550 bbcef43 a10dd0b bbcef43 a10dd0b bbcef43 ac8a86a bbcef43 ac8a86a bbcef43 a10dd0b c5129eb a10dd0b bbcef43 c5129eb 9d41b1d bbcef43 c5129eb 9d41b1d c5129eb bbcef43 48607b7 dac9550 bbcef43 48607b7 dd3451f 53af268 938f609 48607b7 bbcef43 dd3451f ec3d9e7 0ef482f cd798bc d081bf3 a10dd0b 9a56bc2 444e2a5 0ef482f 17438da |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 |
import os
import base64
import gradio as gr
from huggingface_hub import upload_file, InferenceClient
import json
# --- Config ---
HF_DATASET_REPO = "OppaAI/Robot_MCP"
HF_VLM_MODEL = "Qwen/Qwen2.5-VL-72B-Instruct"
# --- Helper Functions ---
def save_and_upload_image(image_b64, hf_token):
"""Save image to /tmp and upload to HF dataset."""
image_bytes = base64.b64decode(image_b64)
local_tmp_path = "/tmp/tmp.jpg"
with open(local_tmp_path, "wb") as f:
f.write(image_bytes)
path_in_repo = "images/tmp.jpg"
upload_file(
path_or_fileobj=local_tmp_path,
path_in_repo=path_in_repo,
repo_id=HF_DATASET_REPO,
token=hf_token,
repo_type="dataset"
)
hf_image_url = f"https://huggingface.co/datasets/{HF_DATASET_REPO}/resolve/main/{path_in_repo}"
return local_tmp_path, hf_image_url, path_in_repo, len(image_bytes)
# --- Main MCP function ---
def process_and_describe(payload: dict):
try:
hf_token = payload.get("hf_token")
if not hf_token:
return {"error": "HF token not provided in payload."}
robot_id = payload.get("robot_id", "unknown")
image_b64 = payload.get("image_b64")
if not image_b64:
return {"error": "No image provided."}
# Save & upload
local_tmp_path, hf_url, path_in_repo, size_bytes = save_and_upload_image(image_b64, hf_token)
# Init HF client
hf_client = InferenceClient(token=hf_token)
# System prompt: describe + suggest action
system_prompt = """
You are a helpful robot assistant.
1. Describe the image in detail.
2. Suggest what the robot should do next based on what it sees:
- Human figure → describe the human and say 'Hi'.
- Ball → move towards it.
- Obstacles → stop or avoid.
- Animal → identify the animal and take photos
Always respond in JSON:
{"description": "...", "action": {"move": "...", "interact": "..."}}
"""
messages_payload = [
{"role": "system", "content": system_prompt},
{"role": "user", "content": [
{"type": "text", "text": "Here is an image."},
{"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{image_b64}"}}
]}
]
# Call VLM
chat_completion = hf_client.chat.completions.create(
model=HF_VLM_MODEL,
messages=messages_payload,
max_tokens=300
)
# Robustly extract text
try:
vlm_text = chat_completion.choices[0].message.content.strip()
except Exception:
# fallback if structure is different
vlm_text = str(chat_completion)
# Attempt to parse JSON from VLM
action_data = {}
try:
action_data = json.loads(vlm_text)
except Exception:
# If VLM didn't return valid JSON, wrap text as description
action_data = {"description": vlm_text, "action": {"move": "unknown", "interact": "unknown"}}
return {
"saved_to_hf_hub": True,
"repo_id": HF_DATASET_REPO,
"path_in_repo": path_in_repo,
"image_url": hf_url,
"file_size_bytes": size_bytes,
"robot_id": robot_id,
"vlm_response": vlm_text,
"vlm_action": action_data.get("action", {}),
"vlm_description": action_data.get("description", "")
}
except Exception as e:
return {"error": f"An API error occurred: {str(e)}"}
# --- Gradio MCP Interface ---
demo = gr.Interface(
fn=process_and_describe,
inputs=gr.JSON(label="Input Payload (Dict format with 'image_b64')"),
outputs=gr.JSON(label="Reply to Jetson"),
api_name="predict"
)
if __name__ == "__main__":
demo.launch(mcp_server=True)
|