"""Gradio webcam client that sends frames to a remote robot-vision MCP server.

Each streamed frame is JPEG-encoded, base64-wrapped and passed to the
``Robot_MCP_Server_robot_watch`` tool; the JSON reply is split into four
text boxes (description, human, objects, environment).
"""

import asyncio  # noqa: F401  (kept from the original file; not used directly here)
import base64
import io
import json
import os
import time  # noqa: F401  (kept from the original file; not used directly here)

import gradio as gr
from dotenv import load_dotenv
from fastmcp import Client
from fastmcp.client import StreamableHttpTransport

# Load environment variables (ensure .env is set up locally).
load_dotenv()

ROBOT_ID = os.environ.get("ROBOT_ID", "unknown")
HF_TOKEN = os.environ.get("HF_CV_ROBOT_TOKEN")

if not HF_TOKEN:
    # Name the variable that is actually read so the warning is actionable.
    print("Warning: HF_CV_ROBOT_TOKEN not found. API calls may fail.")

# The MCP URL of the remote server.
MCP_SERVER_URL = "https://oppaai-robot-mcp-server.hf.space/gradio_api/mcp/"
SERVER_NAME = "Robot_MCP_Server"
# The exact tool name that matches the server function.
TOOL_NAME = "Robot_MCP_Server_robot_watch"

# Initialize the MCP client once at import time; it is (re)connected per call
# through ``async with`` inside the stream handler.
HTTP_TRANSPORT = StreamableHttpTransport(url=MCP_SERVER_URL)
MCP_CLIENT = Client(transport=HTTP_TRANSPORT, name=SERVER_NAME)

# Image modes passed to the JPEG encoder unchanged; anything else (RGBA, P,
# LA, ...) is converted to RGB first because JPEG cannot store alpha/palette.
_JPEG_PASSTHROUGH_MODES = ("RGB", "L")


def _encode_image_b64(image):
    """Return *image* (a PIL image) as a base64-encoded JPEG string."""
    if image.mode not in _JPEG_PASSTHROUGH_MODES:
        # Uploaded PNGs are often RGBA; saving those as JPEG raises OSError.
        image = image.convert("RGB")
    buffered = io.BytesIO()
    image.save(buffered, format="JPEG")
    return base64.b64encode(buffered.getvalue()).decode("utf-8")


def _extract_text(content):
    """Return the first non-empty ``.text`` found in a tool result's content.

    FastMCP exposes tool content as a list of content blocks, so the text
    lives on an element rather than on the container. A single bare content
    object is accepted as well. Returns ``""`` when no text is present.
    """
    if not content:
        return ""
    if isinstance(content, (list, tuple)):
        for item in content:
            text = getattr(item, "text", None)
            if text:
                return text
        return ""
    return getattr(content, "text", None) or ""


def _format_vlm_result(response_dict):
    """Map the server's decoded JSON reply onto the four UI text fields.

    Returns a ``(description, human, objects, environment)`` tuple. Missing
    keys fall back to empty values; a list of objects is joined with
    ``", "`` and any other value is stringified.
    """
    # ``or {}`` also covers an explicit ``"result": null`` in the reply.
    vlm_result = response_dict.get("result") or {}
    objects_list = vlm_result.get("objects", [])
    if isinstance(objects_list, list):
        # Stringify items so non-string entries do not break the join.
        objects_str = ", ".join(map(str, objects_list))
    else:
        objects_str = str(objects_list)
    return (
        vlm_result.get("description", ""),
        vlm_result.get("human", ""),
        objects_str,
        vlm_result.get("environment", ""),
    )


# This must be a coroutine function because ``Client.call_tool`` is async.
async def process_webcam_stream_async(image):
    """Send a webcam frame to the MCP server and return its analysis.

    Args:
        image: PIL image delivered by the Gradio ``Image`` component, or
            ``None`` when no frame is available.

    Returns:
        A 4-tuple of strings ``(description, human, objects, environment)``.
        For ``None`` input all four are empty. If the remote call or the
        parsing of its reply fails, the first element carries
        ``"Error: ..."`` and the rest are empty; the exception is printed,
        not re-raised, so the stream keeps running.
    """
    if image is None:
        return "", "", "", ""

    b64_img = _encode_image_b64(image)

    # Payload keys are the argument names the server-side tool expects.
    payload = {
        "hf_token_input": HF_TOKEN,
        "robot_id_input": ROBOT_ID,
        "image_b64_input": b64_img,
    }

    try:
        # Use the global client instance to call the tool asynchronously.
        async with MCP_CLIENT:
            response = await MCP_CLIENT.call_tool(TOOL_NAME, payload)

        if response.is_error:
            error_text = _extract_text(response.content) or "Unknown error"
            raise RuntimeError(f"MCP Tool Error: {error_text}")

        # The server's output is a JSON document carried as text content.
        response_dict = json.loads(_extract_text(response.content))
        return _format_vlm_result(response_dict)

    except Exception as e:  # UI boundary: report the failure, keep streaming.
        print(f"Error calling remote MCP API: {e}")
        return f"Error: {e}", "", "", ""


# NOTE(review): the original layout was whitespace-mangled; the Column is
# assumed to sit inside the Row next to the webcam image — confirm.
with gr.Blocks() as demo:
    gr.Markdown("## 🎥 Robot Vision Webcam Stream (using MCP Client)")
    with gr.Row():
        webcam_input = gr.Image(
            label="Captured from Web-Cam",
            sources=["upload", "webcam"],
            type="pil",
        )
        with gr.Column():
            description_out = gr.Textbox(label="Description")
            human_out = gr.Textbox(label="Human")
            objects_out = gr.Textbox(label="Objects")
            environment_out = gr.Textbox(label="Environment")

    # The async handler is passed directly to the stream event, which fires
    # it every ``stream_every`` seconds.
    webcam_input.stream(
        process_webcam_stream_async,
        inputs=[webcam_input],
        outputs=[description_out, human_out, objects_out, environment_out],
        stream_every=0.5,
    )

if __name__ == "__main__":
    demo.launch()