"""Gradio front-end that streams webcam frames to a remote robot-vision MCP tool.

Each frame is JPEG-encoded, Base64-wrapped and sent to the
``Robot_MCP_Server_robot_watch`` tool; the structured answer is shown in
eight text boxes.
"""

import ast  # For safely evaluating Python literals returned from server
import asyncio
import base64
import io
import json
import os
import time
import traceback

import gradio as gr
from dotenv import load_dotenv
from fastmcp import Client
from fastmcp.client import StreamableHttpTransport

# -------------------------------
# Load environment variables
# -------------------------------
load_dotenv()

ROBOT_ID = "Robot_MCP_Client"  # Local client identifier

# Sentinel stored in HF_TOKEN when the real token is absent, so the module
# can still be imported and the UI can report the problem instead of crashing.
_MISSING_TOKEN = "missing_token_placeholder"

HF_TOKEN = os.environ.get("HF_TOKEN")
if not HF_TOKEN:
    print("Warning: HF_TOKEN not found. API calls may fail.")
    HF_TOKEN = _MISSING_TOKEN  # Placeholder to avoid crash

# MCP server info
MCP_SERVER_URL = "https://oppaai-robot-mcp-server.hf.space/gradio_api/mcp/"
SERVER_NAME = "Robot_MCP_Server"
TOOL_NAME = "Robot_MCP_Server_robot_watch"

# -------------------------------
# Initialize MCP client globally
# -------------------------------
HTTP_TRANSPORT = StreamableHttpTransport(url=MCP_SERVER_URL)
MCP_CLIENT = Client(transport=HTTP_TRANSPORT, name=SERVER_NAME)

# Number of values process_webcam_stream_async returns; must equal the number
# of components in the `outputs=[...]` list of the stream event below.
_OUTPUT_COUNT = 8

# JPEG cannot store alpha/palette modes; these are the modes saved unchanged.
_JPEG_SAFE_MODES = ("RGB", "L")


def _outputs(first=""):
    """Return an output tuple with `first` in slot 0 and empty strings elsewhere."""
    return (first,) + ("",) * (_OUTPUT_COUNT - 1)


def _to_display_text(value, empty=""):
    """Render a response field as text for a Textbox.

    Lists/tuples/sets become a comma-separated string (items are passed
    through ``str`` so non-string entries do not break the join). ``None``,
    empty strings and empty collections yield `empty`.
    """
    if value is None:
        return empty
    if isinstance(value, (list, tuple, set)):
        items = [str(item) for item in value if item is not None and item != ""]
        return ", ".join(items) if items else empty
    text = str(value)
    return text if text else empty


def _encode_image_b64(image):
    """Encode a PIL image as a Base64 JPEG string."""
    # Uploaded PNGs are often RGBA/P/LA, which the JPEG writer rejects.
    if image.mode not in _JPEG_SAFE_MODES:
        image = image.convert("RGB")
    buffered = io.BytesIO()
    image.save(buffered, format="JPEG")
    return base64.b64encode(buffered.getvalue()).decode("utf-8")


def _parse_tool_payload(raw_text):
    """Decode the tool's text payload into a dict.

    JSON is tried first; if that fails the text is parsed as a Python literal,
    because the server may return a Python-style string (single quotes).

    Raises:
        ValueError: if the decoded payload is not a dict (``literal_eval`` may
            also raise ``ValueError``/``SyntaxError`` on malformed text).
    """
    try:
        parsed = json.loads(raw_text)
    except (TypeError, ValueError):
        parsed = ast.literal_eval(raw_text)
    if not isinstance(parsed, dict):
        raise ValueError(
            f"Unexpected response payload type: {type(parsed).__name__}"
        )
    return parsed


async def process_webcam_stream_async(image):
    """
    Send webcam image to MCP server and process the response.

    Args:
        image (PIL.Image or None): Image captured from webcam or uploaded.

    Returns:
        tuple: (description, environment, indoor_or_outdoor, lighting_condition,
                human, animals_str, objects_str, hazards_str)
            description (str): General description of the scene.
            environment (str): Description of the surrounding environment.
            indoor_or_outdoor (str): Whether the scene appears to be indoors or outdoors.
            lighting_condition (str): Lighting condition (e.g., bright, dim, natural, artificial).
            human (str): Information about any humans detected.
            animals_str (str): Information about any animals detected, or "none".
            objects_str (str): Comma-separated list of detected objects.
            hazards_str (str): Comma-separated list of hazards, or "none".

        When `image` is None all eight values are empty strings. On any
        failure the first value is an ``"Error: ..."`` message and the other
        seven are empty strings; exceptions are logged, not propagated.
    """
    if image is None:
        return _outputs()

    if HF_TOKEN == _MISSING_TOKEN:
        return _outputs("Error: HF_TOKEN not set locally.")

    try:
        # Prepare payload according to server's expected fields
        payload = {
            "hf_token_input": HF_TOKEN,
            "robot_id_input": ROBOT_ID,
            "image_b64_input": _encode_image_b64(image),
        }

        # Use async context to call MCP server tool
        async with MCP_CLIENT:
            response = await MCP_CLIENT.call_tool(TOOL_NAME, payload)

        content = response.content
        has_content = isinstance(content, list) and bool(content)

        if response.is_error:
            # Extract error message
            error_text = content[0].text if has_content else "Unknown error"
            raise RuntimeError(f"MCP Tool Error: {error_text}")

        if not has_content:
            raise RuntimeError("MCP tool returned no content")

        response_dict = _parse_tool_payload(content[0].text)

        # -------------------------------
        # Extract fields from response
        # -------------------------------
        vlm_result = response_dict.get("result", {})
        if not isinstance(vlm_result, dict):
            raise ValueError(
                f"Unexpected 'result' type: {type(vlm_result).__name__}"
            )

        # NOTE(review): the keys "indoor_or_outdoor", "lighting_condition",
        # "animals" and "hazards" are assumed from the documented return
        # names - TODO confirm against the server's response schema. Missing
        # keys fall back to the defaults below.
        return (
            _to_display_text(vlm_result.get("description", "")),
            _to_display_text(vlm_result.get("environment", "")),
            _to_display_text(vlm_result.get("indoor_or_outdoor", "")),
            _to_display_text(vlm_result.get("lighting_condition", "")),
            _to_display_text(vlm_result.get("human", "")),
            _to_display_text(vlm_result.get("animals", []), empty="none"),
            _to_display_text(vlm_result.get("objects", [])),
            _to_display_text(vlm_result.get("hazards", []), empty="none"),
        )

    except Exception as e:
        # Top-level boundary for the UI callback: log the failure and show it
        # in the first text box rather than breaking the stream.
        print(f"Error calling remote MCP API: {e}")
        traceback.print_exc()
        return _outputs(f"Error: {e}")


# -------------------------------
# Gradio UI
# -------------------------------
_TOKEN_NOTICE_MD = (
    "### 🔑 Hugging Face Token Required\n"
    "To use this application, you must set a valid **Hugging Face API Token** "
    "in your local environment variables: `HF_TOKEN`.\n"
    "**A write token is required** to upload images to the public dataset "
    "associated with this space. Resource usage for VLM inference will be "
    "tracked against your account.\n"
)

with gr.Blocks() as demo:
    gr.Markdown("## 🎥 Robot Vision Webcam Stream (using MCP Client)")
    gr.Markdown(_TOKEN_NOTICE_MD)

    with gr.Row():
        # Webcam / upload image input
        webcam_input = gr.Image(
            label="Captured from Web-Cam",
            sources=["upload", "webcam"],
            type="pil",
        )

        with gr.Column():
            # Output fields for MCP response
            description_out = gr.Textbox(label="Description", lines=5)
            environment_out = gr.Textbox(label="Environment", lines=3)
            indoor_outdoor_out = gr.Textbox(label="Indoor/Outdoor", lines=1)
            lighting_out = gr.Textbox(label="Lighting Condition", lines=1)
            human_out = gr.Textbox(label="Human Detected", lines=3)
            animals_out = gr.Textbox(label="Animals Detected", lines=2)
            objects_out = gr.Textbox(label="Objects Detected", lines=2)
            hazards_out = gr.Textbox(label="Hazards Identified", lines=2)

    # Stream webcam input to server every 1.0 seconds.
    # The outputs list must follow the exact order of the tuple returned by
    # process_webcam_stream_async.
    webcam_input.stream(
        process_webcam_stream_async,
        inputs=[webcam_input],
        outputs=[
            description_out,
            environment_out,
            indoor_outdoor_out,
            lighting_out,
            human_out,
            animals_out,
            objects_out,
            hazards_out,
        ],
        stream_every=1.0,
    )

if __name__ == "__main__":
    demo.launch()