OppaAI's picture
Update app.py
acccc23 verified
raw
history blame
4.34 kB
import os
import base64
import gradio as gr
from huggingface_hub import upload_file, InferenceClient
import json
from fastmcp import FastMCP
# --- Config ---
HF_DATASET_REPO = "OppaAI/Robot_MCP"
HF_VLM_MODEL = "Qwen/Qwen2.5-VL-7B-Instruct"
# --- MCP server instance ---
mcp = FastMCP(name="Robot MCP")
# --- MCP Tool ---
@mcp.tool()
def say_hi(greeting_text: str = "Hi there!") -> dict:
"""Return a greeting command in JSON."""
return {"command": "say_hi", "text": greeting_text}
# --- Helper Functions ---
def save_and_upload_image(image_b64: str, hf_token: str):
image_bytes = base64.b64decode(image_b64)
local_tmp_path = "/tmp/tmp.jpg"
with open(local_tmp_path, "wb") as f:
f.write(image_bytes)
path_in_repo = "images/tmp.jpg"
upload_file(
path_or_fileobj=local_tmp_path,
path_in_repo=path_in_repo,
repo_id=HF_DATASET_REPO,
token=hf_token,
repo_type="dataset"
)
hf_image_url = f"https://huggingface.co/datasets/{HF_DATASET_REPO}/resolve/main/{path_in_repo}"
return local_tmp_path, hf_image_url, path_in_repo, len(image_bytes)
# --- Main MCP function ---
def process_and_describe(payload: dict):
try:
hf_token = payload.get("hf_token")
if not hf_token:
return {"error": "HF token not provided in payload."}
robot_id = payload.get("robot_id", "unknown")
image_b64 = payload.get("image_b64")
if not image_b64:
return {"error": "No image provided."}
# Save image & upload
local_tmp_path, hf_url, path_in_repo, size_bytes = save_and_upload_image(image_b64, hf_token)
# Initialize HF client
hf_client = InferenceClient(token=hf_token)
# System prompt (without stio.describe_tools because not using STIO here)
system_prompt = """
You are a helpful robot assistant.
When you receive an image, you must:
1. Describe the image in detail.
2. Decide actions for the robot. Example:
- Human figure → call the `say_hi` tool with a friendly greeting (vary every time)
Always respond in JSON with:
{
"description": "...",
"action": "say_hi",
"greeting_text": "a friendly greeting"
}
"""
messages_payload = [
{"role": "system", "content": system_prompt},
{"role": "user", "content": [
{"type": "text", "text": "Here is an image."},
{"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{image_b64}"}}
]}
]
# Call VLM
chat_completion = hf_client.chat.completions.create(
model=HF_VLM_MODEL,
messages=messages_payload,
max_tokens=300
)
vlm_text = chat_completion.choices[0].message.content.strip()
# Parse JSON from VLM
try:
action_data = json.loads(vlm_text)
except json.JSONDecodeError:
action_data = {"description": vlm_text, "action": None, "greeting_text": None}
# Call the tool if action == say_hi
tool_result = None
if action_data.get("action") == "say_hi":
greeting = action_data.get("greeting_text") or "Hi!"
tool_result = say_hi(greeting_text=greeting)
return {
"saved_to_hf_hub": True,
"repo_id": HF_DATASET_REPO,
"path_in_repo": path_in_repo,
"image_url": hf_url,
"file_size_bytes": size_bytes,
"robot_id": robot_id,
"vlm_response": vlm_text,
"vlm_action": action_data.get("action"),
"vlm_description": action_data.get("description"),
"tool_result": tool_result
}
except Exception as e:
return {"error": f"An API error occurred: {str(e)}"}
# --- Gradio MCP Interface ---
demo = gr.Interface(
fn=process_and_describe,
inputs=gr.JSON(label="Input Payload"),
outputs=gr.JSON(label="Reply to Jetson"),
api_name="predict"
)
if __name__ == "__main__":
# Run FastMCP server *in the same process* (blocking)
import threading
def run_mcp():
mcp.run(transport="stdio")
t = threading.Thread(target=run_mcp, daemon=True)
t.start()
demo.launch(mcp_server=True)