"""Robot MCP endpoint: upload a camera frame, describe it with a VLM, pick an action.

The module exposes a Gradio JSON interface (also served as an MCP server) that
receives a payload containing a base64 image and a Hugging Face token, stores
the image in a dataset repo, asks a vision-language model to describe it, and
optionally produces a ``say_hi`` command for the robot.
"""

import base64
import json
import os
import threading

import gradio as gr
from fastmcp import FastMCP
from huggingface_hub import InferenceClient, upload_file

# --- Config ---
HF_DATASET_REPO = "OppaAI/Robot_MCP"
HF_VLM_MODEL = "Qwen/Qwen2.5-VL-7B-Instruct"

# System prompt (without stio.describe_tools because not using STIO here)
SYSTEM_PROMPT = """
You are a helpful robot assistant. When you receive an image, you must:
1. Describe the image in detail.
2. Decide actions for the robot.
Example:
- Human figure → call the `say_hi` tool with a friendly greeting (vary every time)
Always respond in JSON with:
{
  "description": "...",
  "action": "say_hi",
  "greeting_text": "a friendly greeting"
}
"""

# --- MCP server instance ---
mcp = FastMCP(name="Robot MCP")


# --- MCP Tool ---
def say_hi(greeting_text: str = "Hi there!") -> dict:
    """Return a greeting command in JSON."""
    return {"command": "say_hi", "text": greeting_text}


# Register without rebinding the name: some FastMCP releases make the decorator
# return a tool object instead of the function, which would break the direct
# say_hi(...) call in process_and_describe.
mcp.tool()(say_hi)


# --- Helper Functions ---
def save_and_upload_image(image_b64: str, hf_token: str) -> tuple[str, str, str, int]:
    """Decode a base64 image, keep a local copy and upload it to the dataset repo.

    Args:
        image_b64: Base64-encoded image data.
        hf_token: Hugging Face token used to authenticate the upload.

    Returns:
        A tuple ``(local_tmp_path, hf_image_url, path_in_repo, size_in_bytes)``.

    Raises:
        binascii.Error: If ``image_b64`` is not decodable base64.
        Exception: Whatever ``upload_file`` raises when the upload fails.
    """
    image_bytes = base64.b64decode(image_b64)

    local_tmp_path = "/tmp/tmp.jpg"
    with open(local_tmp_path, "wb") as f:
        f.write(image_bytes)

    path_in_repo = "images/tmp.jpg"
    upload_file(
        # Upload the in-memory bytes rather than re-reading the shared local
        # file: the fixed path can be overwritten by a concurrent request
        # between the write above and the upload.
        path_or_fileobj=image_bytes,
        path_in_repo=path_in_repo,
        repo_id=HF_DATASET_REPO,
        token=hf_token,
        repo_type="dataset",
    )

    hf_image_url = (
        f"https://huggingface.co/datasets/{HF_DATASET_REPO}/resolve/main/{path_in_repo}"
    )
    return local_tmp_path, hf_image_url, path_in_repo, len(image_bytes)


def _parse_vlm_json(vlm_text: str) -> dict:
    """Extract the JSON object from a VLM reply, or fall back to plain text.

    The reply is parsed as-is first; if that does not yield a JSON object, the
    span from the first ``{`` to the last ``}`` is tried, which covers replies
    wrapped in Markdown code fences or surrounded by prose. When no JSON
    object can be recovered the whole reply becomes the description and no
    action is selected.
    """
    candidates = [vlm_text]
    start = vlm_text.find("{")
    end = vlm_text.rfind("}")
    if 0 <= start < end:
        candidates.append(vlm_text[start:end + 1])

    for candidate in candidates:
        try:
            parsed = json.loads(candidate)
        except json.JSONDecodeError:
            continue
        # Valid JSON that is not an object (list, string, number) has no
        # "action"/"description" keys to read, so it is treated as unparsed.
        if isinstance(parsed, dict):
            return parsed

    return {"description": vlm_text, "action": None, "greeting_text": None}


# --- Main MCP function ---
def process_and_describe(payload: dict):
    """Upload the robot's image, describe it with the VLM and choose an action.

    Args:
        payload: JSON object with ``hf_token`` (required), ``image_b64``
            (required, base64-encoded image) and ``robot_id`` (optional,
            defaults to ``"unknown"``).

    Returns:
        On success, a dict with the upload details, the raw VLM response, the
        parsed action/description and the ``say_hi`` tool result (``None``
        when no greeting was requested). On failure, a dict with a single
        ``"error"`` key; this function does not raise.
    """
    if not isinstance(payload, dict):
        return {"error": "Payload must be a JSON object."}

    hf_token = payload.get("hf_token")
    if not hf_token:
        return {"error": "HF token not provided in payload."}

    robot_id = payload.get("robot_id", "unknown")
    image_b64 = payload.get("image_b64")
    if not image_b64:
        return {"error": "No image provided."}

    try:
        # Save image & upload
        _local_path, hf_url, path_in_repo, size_bytes = save_and_upload_image(
            image_b64, hf_token
        )

        # Initialize HF client
        hf_client = InferenceClient(token=hf_token)

        messages_payload = [
            {"role": "system", "content": SYSTEM_PROMPT},
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": "Here is an image."},
                    {
                        "type": "image_url",
                        "image_url": {"url": f"data:image/jpeg;base64,{image_b64}"},
                    },
                ],
            },
        ]

        # Call VLM
        chat_completion = hf_client.chat.completions.create(
            model=HF_VLM_MODEL,
            messages=messages_payload,
            max_tokens=300,
        )
        # The message content can be None (empty completion); normalise to "".
        vlm_text = (chat_completion.choices[0].message.content or "").strip()

        # Parse JSON from VLM
        action_data = _parse_vlm_json(vlm_text)

        # Call the tool if action == say_hi
        tool_result = None
        if action_data.get("action") == "say_hi":
            greeting = action_data.get("greeting_text") or "Hi!"
            tool_result = say_hi(greeting_text=greeting)

        return {
            "saved_to_hf_hub": True,
            "repo_id": HF_DATASET_REPO,
            "path_in_repo": path_in_repo,
            "image_url": hf_url,
            "file_size_bytes": size_bytes,
            "robot_id": robot_id,
            "vlm_response": vlm_text,
            "vlm_action": action_data.get("action"),
            "vlm_description": action_data.get("description"),
            "tool_result": tool_result,
        }
    except Exception as e:
        # Boundary handler: the caller always receives a JSON reply, never a
        # traceback.
        return {"error": f"An API error occurred: {str(e)}"}


# --- Gradio MCP Interface ---
demo = gr.Interface(
    fn=process_and_describe,
    inputs=gr.JSON(label="Input Payload"),
    outputs=gr.JSON(label="Reply to Jetson"),
    api_name="predict",
)


if __name__ == "__main__":
    # Run the FastMCP stdio server in the same process on a background daemon
    # thread, so it does not block the Gradio launch below and ends when the
    # main thread exits.
    def run_mcp():
        mcp.run(transport="stdio")

    t = threading.Thread(target=run_mcp, daemon=True)
    t.start()

    demo.launch(mcp_server=True)