Spaces:
Sleeping
Sleeping
File size: 4,342 Bytes
0ef482f 938f609 9d41b1d bbcef43 54fb5ed 48607b7 1f8048b 9c6065d 1f8048b d722b23 e6b6ea7 d722b23 9230f22 e6b6ea7 5410665 9c6065d d722b23 1f8048b e6b6ea7 1f8048b bbcef43 1f8048b bbcef43 1f8048b a10dd0b 1f8048b 9d41b1d 938f609 0ef482f dac9550 48607b7 bbcef43 48607b7 9c6065d 406e27f 9c6065d dac9550 e6b6ea7 9c6065d e6b6ea7 a10dd0b c5129eb a10dd0b bbcef43 c5129eb 9d41b1d e6b6ea7 c5129eb 9d41b1d c5129eb 5410665 48607b7 dac9550 d722b23 9c6065d e6b6ea7 48607b7 dd3451f 53af268 938f609 48607b7 bbcef43 e6b6ea7 dd3451f ec3d9e7 0ef482f cd798bc d081bf3 9a56bc2 9c6065d 9a56bc2 444e2a5 0ef482f acccc23 e6b6ea7 17438da |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 |
import base64
import json
import os
import tempfile

import gradio as gr
from fastmcp import FastMCP
from huggingface_hub import upload_file, InferenceClient
# --- Config ---
HF_DATASET_REPO = "OppaAI/Robot_MCP"  # HF dataset repo that receives uploaded camera frames
HF_VLM_MODEL = "Qwen/Qwen2.5-VL-7B-Instruct"  # vision-language model queried for image description/actions
# --- MCP server instance ---
mcp = FastMCP(name="Robot MCP")
# --- MCP Tool ---
@mcp.tool()
def say_hi(greeting_text: str = "Hi there!") -> dict:
    """Build the JSON command payload instructing the robot to greet.

    Args:
        greeting_text: The greeting the robot should speak.

    Returns:
        A dict with the fixed command name and the greeting text.
    """
    command = {"command": "say_hi"}
    command["text"] = greeting_text
    return command
# --- Helper Functions ---
def save_and_upload_image(image_b64: str, hf_token: str):
    """Decode a base64 image, stage it locally, and upload it to the HF dataset repo.

    Args:
        image_b64: Base64-encoded image bytes (raw base64, no data-URL prefix).
        hf_token: Hugging Face token with write access to ``HF_DATASET_REPO``.

    Returns:
        Tuple of ``(local_tmp_path, hf_image_url, path_in_repo, size_in_bytes)``.

    Raises:
        binascii.Error: If ``image_b64`` is not valid base64.
        Exception: Errors from ``upload_file`` propagate to the caller.
    """
    image_bytes = base64.b64decode(image_b64)
    # Use the platform temp dir instead of hard-coding /tmp so the code also
    # works on Windows and sandboxed hosts.
    # NOTE(review): the fixed file name means concurrent requests overwrite
    # each other's staging file and repo image — acceptable for a single
    # robot, not for multi-client use.
    local_tmp_path = os.path.join(tempfile.gettempdir(), "tmp.jpg")
    with open(local_tmp_path, "wb") as f:
        f.write(image_bytes)
    path_in_repo = "images/tmp.jpg"
    upload_file(
        path_or_fileobj=local_tmp_path,
        path_in_repo=path_in_repo,
        repo_id=HF_DATASET_REPO,
        token=hf_token,
        repo_type="dataset",
    )
    hf_image_url = f"https://huggingface.co/datasets/{HF_DATASET_REPO}/resolve/main/{path_in_repo}"
    return local_tmp_path, hf_image_url, path_in_repo, len(image_bytes)
# --- Main MCP function ---
def process_and_describe(payload: dict):
try:
hf_token = payload.get("hf_token")
if not hf_token:
return {"error": "HF token not provided in payload."}
robot_id = payload.get("robot_id", "unknown")
image_b64 = payload.get("image_b64")
if not image_b64:
return {"error": "No image provided."}
# Save image & upload
local_tmp_path, hf_url, path_in_repo, size_bytes = save_and_upload_image(image_b64, hf_token)
# Initialize HF client
hf_client = InferenceClient(token=hf_token)
# System prompt (without stio.describe_tools because not using STIO here)
system_prompt = """
You are a helpful robot assistant.
When you receive an image, you must:
1. Describe the image in detail.
2. Decide actions for the robot. Example:
- Human figure → call the `say_hi` tool with a friendly greeting (vary every time)
Always respond in JSON with:
{
"description": "...",
"action": "say_hi",
"greeting_text": "a friendly greeting"
}
"""
messages_payload = [
{"role": "system", "content": system_prompt},
{"role": "user", "content": [
{"type": "text", "text": "Here is an image."},
{"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{image_b64}"}}
]}
]
# Call VLM
chat_completion = hf_client.chat.completions.create(
model=HF_VLM_MODEL,
messages=messages_payload,
max_tokens=300
)
vlm_text = chat_completion.choices[0].message.content.strip()
# Parse JSON from VLM
try:
action_data = json.loads(vlm_text)
except json.JSONDecodeError:
action_data = {"description": vlm_text, "action": None, "greeting_text": None}
# Call the tool if action == say_hi
tool_result = None
if action_data.get("action") == "say_hi":
greeting = action_data.get("greeting_text") or "Hi!"
tool_result = say_hi(greeting_text=greeting)
return {
"saved_to_hf_hub": True,
"repo_id": HF_DATASET_REPO,
"path_in_repo": path_in_repo,
"image_url": hf_url,
"file_size_bytes": size_bytes,
"robot_id": robot_id,
"vlm_response": vlm_text,
"vlm_action": action_data.get("action"),
"vlm_description": action_data.get("description"),
"tool_result": tool_result
}
except Exception as e:
return {"error": f"An API error occurred: {str(e)}"}
# --- Gradio MCP Interface ---
# --- Gradio MCP Interface ---
# Single JSON-in / JSON-out endpoint; the Jetson client posts the payload
# (hf_token, robot_id, image_b64) and receives the reply dict produced by
# process_and_describe.
demo = gr.Interface(
    fn=process_and_describe,
    inputs=gr.JSON(label="Input Payload"),
    outputs=gr.JSON(label="Reply to Jetson"),
    api_name="predict"
)
if __name__ == "__main__":
    # Run FastMCP server *in the same process* (blocking)
    # NOTE(review): running the stdio transport in a background thread is
    # unusual — stdio MCP expects to own the process's stdin/stdout, and some
    # event-loop/signal setup may only work in the main thread; confirm this
    # actually serves requests in this deployment.
    # NOTE(review): demo.launch(mcp_server=True) already exposes a Gradio MCP
    # server over HTTP, so the stdio server below may be redundant — verify
    # which one the Jetson client connects to.
    import threading
    def run_mcp():
        mcp.run(transport="stdio")
    t = threading.Thread(target=run_mcp, daemon=True)
    t.start()
    demo.launch(mcp_server=True)
|