# Page header captured with the file (not part of the program):
# OppaAI's picture | Update app.py | ef2db45 verified | raw | history blame | 3.71 kB
import os
import base64
import gradio as gr
from huggingface_hub import upload_file, InferenceClient
import json
from fastmcp import MCP, STIO
# --- Config ---
# Hub dataset repository that receives the uploaded images (repo_type="dataset").
HF_DATASET_REPO = "OppaAI/Robot_MCP"
# Model id passed to the chat-completion request for image description.
HF_VLM_MODEL = "Qwen/Qwen2.5-VL-7B-Instruct"
# --- MCP server instance ---
# Tools are registered on this object through the @mcp.tools() decorator.
# NOTE(review): confirm the installed fastmcp release really exports MCP and
# STIO under these names — TODO verify against the fastmcp documentation.
mcp = MCP()
# --- STIO for the LLM ---
# Used further down to list the tools inside the system prompt
# (describe_tools) and to process the model's reply (run).
stio = STIO(mcp) # Bind STIO to MCP tools
# --- MCP Tool ---
@mcp.tools()
def say_hi(greeting_text: str = "Hi there!"):
    """Return the greeting command as a JSON-serialisable dict.

    The result always carries the command name ``"say_hi"`` plus the
    greeting to use under the ``"text"`` key.
    """
    return dict(command="say_hi", text=greeting_text)
# --- Helper Functions ---
def save_and_upload_image(image_b64, hf_token):
    """Decode a base64 image, write it to disk and upload it to the dataset repo.

    Args:
        image_b64: Base64-encoded image data.
        hf_token: Hugging Face token handed to ``upload_file``.

    Returns:
        A 4-tuple ``(local_path, hub_url, path_in_repo, size_in_bytes)`` where
        ``size_in_bytes`` is the length of the decoded image.
    """
    # Both destinations use fixed names: the local file is rewritten on every
    # call and the same repo path is targeted each time.
    repo_path = "images/tmp.jpg"
    tmp_file = "/tmp/tmp.jpg"

    raw = base64.b64decode(image_b64)
    with open(tmp_file, "wb") as out:
        out.write(raw)

    upload_file(
        path_or_fileobj=tmp_file,
        path_in_repo=repo_path,
        repo_id=HF_DATASET_REPO,
        token=hf_token,
        repo_type="dataset",
    )

    url = "https://huggingface.co/datasets/" + HF_DATASET_REPO + "/resolve/main/" + repo_path
    return tmp_file, url, repo_path, len(raw)
# --- Main MCP function ---
def process_and_describe(payload: dict):
    """Upload a robot image, ask the VLM to describe it and run any tool calls.

    Args:
        payload: JSON object with the keys
            ``hf_token`` (required): Hugging Face token used for both the
            upload and the inference request;
            ``image_b64`` (required): base64-encoded image;
            ``robot_id`` (optional): echoed back in the reply, defaults to
            ``"unknown"``.

    Returns:
        On success, a dict with the upload details (``saved_to_hf_hub``,
        ``repo_id``, ``path_in_repo``, ``image_url``, ``file_size_bytes``),
        the ``robot_id``, the model text (``vlm_response``) and whatever
        ``stio.run`` returned (``tool_results``).
        On failure, ``{"error": <message>}``. Invalid payloads and exceptions
        raised during upload / inference are reported this way instead of
        being raised to the caller.
    """
    # The payload arrives from a free-form JSON input, so it may not be an
    # object at all (e.g. None or a list). Reject that explicitly instead of
    # letting ``.get`` fail and be reported as an "API error".
    if not isinstance(payload, dict):
        return {"error": "Payload must be a JSON object."}

    hf_token = payload.get("hf_token")
    if not hf_token:
        return {"error": "HF token not provided in payload."}

    robot_id = payload.get("robot_id", "unknown")

    image_b64 = payload.get("image_b64")
    if not image_b64:
        return {"error": "No image provided."}

    # Only the network / model / tool work sits under the broad handler; this
    # is the API boundary, so every failure is turned into an error reply.
    try:
        # Save image & upload (the local path is not needed here).
        _, hf_url, path_in_repo, size_bytes = save_and_upload_image(image_b64, hf_token)

        # Initialize HF client
        hf_client = InferenceClient(token=hf_token)

        # --- System prompt with STIO instructions ---
        system_prompt = f"""
You are a helpful robot assistant. You have access to MCP tools via STIO.
When you receive an image, you must:
1️⃣ Describe the image in detail.
2️⃣ Decide actions for the robot. Example:
- Human figure → call `say_hi` tool with a friendly greeting (vary every time)
3️⃣ Use STIO to call the tools. Always respond in JSON if calling tools.
Available tools:
{stio.describe_tools()}
"""
        # The image is sent inline as a data URL rather than via the Hub URL.
        messages_payload = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": [
                {"type": "text", "text": "Here is an image."},
                {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{image_b64}"}},
            ]},
        ]

        # --- Call VLM with STIO ---
        chat_completion = hf_client.chat.completions.create(
            model=HF_VLM_MODEL,
            messages=messages_payload,
            max_tokens=300,
        )
        vlm_text = chat_completion.choices[0].message.content.strip()

        # --- Use STIO to execute tool calls if present ---
        tool_results = stio.run(vlm_text)

        return {
            "saved_to_hf_hub": True,
            "repo_id": HF_DATASET_REPO,
            "path_in_repo": path_in_repo,
            "image_url": hf_url,
            "file_size_bytes": size_bytes,
            "robot_id": robot_id,
            "vlm_response": vlm_text,
            "tool_results": tool_results,
        }
    except Exception as e:
        return {"error": f"An API error occurred: {str(e)}"}
# --- Gradio MCP Interface ---
# JSON in, JSON out: the payload dict goes straight to process_and_describe
# and its reply dict is returned unchanged under the "predict" API name.
demo = gr.Interface(
    fn=process_and_describe,
    inputs=gr.JSON(label="Input Payload"),
    outputs=gr.JSON(label="Reply to Jetson"),
    api_name="predict",
)

# Start the app only when this file is executed as a script, not on import.
if __name__ == "__main__":
    demo.launch(mcp_server=True)