"""Gradio front end that streams webcam frames to a remote MCP vision tool.

Each captured frame is JPEG-encoded, base64-wrapped and sent to the
``Robot_MCP_Server_robot_watch`` tool on the configured MCP server. The JSON
reply is unpacked into four text boxes: description, human, objects and
environment.
"""

import os
import base64
import time
import io
import json
import gradio as gr
from fastmcp import Client
from fastmcp.client import StreamableHttpTransport
import asyncio
from dotenv import load_dotenv

# Load environment variables (ensure .env is set up locally)
load_dotenv()

ROBOT_ID = "Robot_MCP_Client"
HF_TOKEN = os.environ.get("HF_CV_ROBOT_TOKEN")

if not HF_TOKEN:
    print("Warning: HF_TOKEN not found. API calls may fail.")
    # Use a placeholder string so payload validation does not fail with
    # "None is not of type string". The handler below refuses to call the
    # server while this placeholder is in effect.
    HF_TOKEN = "missing_token_placeholder"

# The MCP URL of your remote server
MCP_SERVER_URL = "https://oppaai-robot-mcp-server.hf.space/gradio_api/mcp/"
SERVER_NAME = "Robot_MCP_Server"
TOOL_NAME = "Robot_MCP_Server_robot_watch"

# Initialize the MCP client globally
HTTP_TRANSPORT = StreamableHttpTransport(url=MCP_SERVER_URL)
MCP_CLIENT = Client(transport=HTTP_TRANSPORT, name=SERVER_NAME)


def _content_text(content) -> str:
    """Return the text of the first text-bearing content block, or ""."""
    if content is None:
        return ""
    # A tool result carries a list of content blocks; also tolerate a single
    # block object so either shape is handled.
    blocks = content if isinstance(content, (list, tuple)) else [content]
    for block in blocks:
        text = getattr(block, "text", None)
        if text:
            return text
    return ""


def _encode_image_b64(image) -> str:
    """JPEG-encode a PIL image and return it as a base64 string."""
    # JPEG cannot store alpha or palette modes; convert defensively.
    if image.mode not in ("RGB", "L"):
        image = image.convert("RGB")
    buffered = io.BytesIO()
    image.save(buffered, format="JPEG")
    return base64.b64encode(buffered.getvalue()).decode("utf-8")


async def process_webcam_stream_async(image) -> tuple[str, str, str, str]:
    """Send a webcam image to the MCP server and return the analysis.

    Args:
        image: PIL image delivered by the Gradio image component, or None
            when no frame is available.

    Returns:
        A 4-tuple of strings ``(description, human, objects, environment)``.
        On failure the first element holds an ``"Error: ..."`` message and
        the remaining three are empty. When ``image`` is None all four are
        empty.
    """
    if image is None:
        return "", "", "", ""

    # Check if a valid token is available before proceeding
    if HF_TOKEN == "missing_token_placeholder":
        return "Error: HF_TOKEN not set locally.", "", "", ""

    try:
        # Encoding happens inside the try so a bad frame is reported in the
        # UI instead of raising out of the stream handler.
        payload = {
            "hf_token_input": HF_TOKEN,
            "robot_id_input": ROBOT_ID,
            "image_b64_input": _encode_image_b64(image),
        }

        async with MCP_CLIENT:
            response = await MCP_CLIENT.call_tool(TOOL_NAME, payload)

        response_text = _content_text(response.content)

        if response.is_error:
            error_text = response_text or "Unknown error"
            raise Exception(f"MCP Tool Error: {error_text}")

        if not response_text:
            raise ValueError("MCP tool returned no text content")

        response_dict = json.loads(response_text)
        vlm_result = response_dict.get("result", {})

        description_out = vlm_result.get("description", "")
        human_out = vlm_result.get("human", "")
        objects_list = vlm_result.get("objects", [])
        environment_out = vlm_result.get("environment", "")

        # Coerce each item to str so non-string entries cannot break join().
        if isinstance(objects_list, list):
            objects_str = ", ".join(str(item) for item in objects_list)
        else:
            objects_str = str(objects_list)

        return (
            description_out,
            human_out,
            objects_str,
            environment_out,
        )

    except Exception as e:
        # Top-level boundary for the UI callback: report the failure in the
        # first output box rather than letting the stream handler crash.
        print(f"Error calling remote MCP API: {e}")
        return f"Error: {e}", "", "", ""


with gr.Blocks() as demo:
    gr.Markdown("## 🎥 Robot Vision Webcam Stream (using MCP Client)")

    # NOTE(review): the column is assumed to sit beside the image inside the
    # same row — confirm against the intended layout.
    with gr.Row():
        webcam_input = gr.Image(
            label="Captured from Web-Cam",
            sources=["upload", "webcam"],
            type="pil"
        )
        with gr.Column():
            description_out = gr.Textbox(label="Description")
            human_out = gr.Textbox(label="Human")
            objects_out = gr.Textbox(label="Objects")
            environment_out = gr.Textbox(label="Environment")

    # Run the analysis on the streamed frames at the configured interval.
    webcam_input.stream(
        process_webcam_stream_async,
        inputs=[webcam_input],
        outputs=[description_out, human_out, objects_out, environment_out],
        stream_every=0.5
    )

if __name__ == "__main__":
    demo.launch()