# HF Spaces page header (scrape artifact): Space status 'Sleeping'; file size 3,708 bytes.
import os
import base64
import gradio as gr
from huggingface_hub import upload_file, InferenceClient
import json
from fastmcp import MCP, STIO
# --- Config ---
HF_DATASET_REPO = "OppaAI/Robot_MCP"  # dataset repo that receives uploaded camera frames
HF_VLM_MODEL = "Qwen/Qwen2.5-VL-7B-Instruct"  # vision-language model used to describe frames
# --- MCP server instance ---
# NOTE(review): the published `fastmcp` package exports `FastMCP`, not `MCP`,
# and defines no `STIO` class — `from fastmcp import MCP, STIO` will raise
# ImportError. Stdio transport in fastmcp is selected via
# `mcp.run(transport="stdio")`, not a separate binding object. Confirm which
# library/version this code was written against.
mcp = MCP()
# --- STIO for the LLM ---
stio = STIO(mcp) # Bind STIO to MCP tools — TODO confirm this API exists
# --- MCP Tool ---
@mcp.tools()
def say_hi(greeting_text: str = "Hi there!"):
    """Build the JSON greeting command for the robot.

    NOTE(review): fastmcp's registration decorator is `@mcp.tool` (singular);
    confirm `mcp.tools()` exists on the server class actually in use.
    """
    return dict(command="say_hi", text=greeting_text)
# --- Helper Functions ---
def save_and_upload_image(image_b64, hf_token,
                          local_tmp_path="/tmp/tmp.jpg",
                          path_in_repo="images/tmp.jpg"):
    """Decode a base64 image, stage it on disk, and upload it to the HF dataset repo.

    Args:
        image_b64: Base64-encoded image bytes (presumably JPEG — the staged
            filename and the public URL both assume .jpg; TODO confirm callers
            only ever send JPEG).
        hf_token: Hugging Face token with write access to HF_DATASET_REPO.
        local_tmp_path: Local staging path for the decoded bytes. Parameterized
            (was hard-coded) so concurrent callers can avoid clobbering each
            other's temp file; default preserves the original behavior.
        path_in_repo: Destination path inside the dataset repository; default
            preserves the original behavior (each upload overwrites the last).

    Returns:
        Tuple of (local_tmp_path, public resolve URL, path_in_repo, byte count).

    Raises:
        binascii.Error: if image_b64 is not decodable base64.
        Exceptions from huggingface_hub.upload_file on auth/network failure.
    """
    image_bytes = base64.b64decode(image_b64)
    with open(local_tmp_path, "wb") as f:
        f.write(image_bytes)
    upload_file(
        path_or_fileobj=local_tmp_path,
        path_in_repo=path_in_repo,
        repo_id=HF_DATASET_REPO,
        token=hf_token,
        repo_type="dataset",
    )
    # Public, unauthenticated download URL for the file just pushed.
    hf_image_url = f"https://huggingface.co/datasets/{HF_DATASET_REPO}/resolve/main/{path_in_repo}"
    return local_tmp_path, hf_image_url, path_in_repo, len(image_bytes)
# --- Main MCP function ---
def process_and_describe(payload: dict) -> dict:
    """Handle one robot request: upload its camera frame, describe it with a VLM,
    and execute any tool calls the model's reply contains.

    Expected payload keys: "hf_token" (required), "image_b64" (required,
    base64-encoded image), "robot_id" (optional, defaults to "unknown").

    Returns a dict of upload metadata + the VLM text + tool results, or
    {"error": ...} on any failure — errors are returned rather than raised so
    the Gradio JSON endpoint always produces a well-formed reply.
    """
    try:
        hf_token = payload.get("hf_token")
        if not hf_token:
            return {"error": "HF token not provided in payload."}
        robot_id = payload.get("robot_id", "unknown")
        image_b64 = payload.get("image_b64")
        if not image_b64:
            return {"error": "No image provided."}
        # Save image & upload — also yields the public URL echoed back to the caller.
        local_tmp_path, hf_url, path_in_repo, size_bytes = save_and_upload_image(image_b64, hf_token)
        # Initialize HF client (per-request, using the caller-supplied token).
        hf_client = InferenceClient(token=hf_token)
        # --- System prompt with STIO instructions ---
        # NOTE(review): `stio.describe_tools()` and `stio.run()` below are not
        # part of the published fastmcp API — confirm the STIO helper exists;
        # otherwise this raises AttributeError at request time, which the
        # except clause converts into an {"error": ...} reply.
        system_prompt = f"""
You are a helpful robot assistant. You have access to MCP tools via STIO.
When you receive an image, you must:
1️⃣ Describe the image in detail.
2️⃣ Decide actions for the robot. Example:
- Human figure → call `say_hi` tool with a friendly greeting (vary every time)
3️⃣ Use STIO to call the tools. Always respond in JSON if calling tools.
Available tools:
{stio.describe_tools()}
"""
        # The image is sent inline as a data URL, not via the HF hub URL above.
        messages_payload = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": [
                {"type": "text", "text": "Here is an image."},
                {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{image_b64}"}}
            ]}
        ]
        # --- Call VLM with STIO ---
        chat_completion = hf_client.chat.completions.create(
            model=HF_VLM_MODEL,
            messages=messages_payload,
            max_tokens=300
        )
        vlm_text = chat_completion.choices[0].message.content.strip()
        # --- Use STIO to execute tool calls if present ---
        tool_results = stio.run(vlm_text)
        return {
            "saved_to_hf_hub": True,
            "repo_id": HF_DATASET_REPO,
            "path_in_repo": path_in_repo,
            "image_url": hf_url,
            "file_size_bytes": size_bytes,
            "robot_id": robot_id,
            "vlm_response": vlm_text,
            "tool_results": tool_results
        }
    except Exception as e:
        # Broad catch is deliberate: the endpoint must always answer with JSON.
        return {"error": f"An API error occurred: {str(e)}"}
# --- Gradio MCP Interface ---
# Single JSON-in / JSON-out endpoint: the Jetson client posts the payload dict
# and receives whatever dict process_and_describe returns.
demo = gr.Interface(
    fn=process_and_describe,
    inputs=gr.JSON(label="Input Payload"),
    outputs=gr.JSON(label="Reply to Jetson"),
    api_name="predict"
)
# Entry point: serve the Gradio app. mcp_server=True additionally exposes the
# endpoint over MCP — requires a Gradio version with MCP support (`gradio[mcp]`);
# TODO confirm the deployed Gradio version accepts this launch flag.
if __name__ == "__main__":
    demo.launch(mcp_server=True)