import ast
import base64
import io
import json
import traceback

import asyncio
import gradio as gr
from fastmcp import Client
from fastmcp.client import StreamableHttpTransport

# -------------------------------
# MCP server info
# -------------------------------
ROBOT_ID = "Robot_MCP_Client"  # Local client identifier
MCP_SERVER_URL = "https://oppaai-robot-mcp-server.hf.space/gradio_api/mcp/"
SERVER_NAME = "Robot_MCP_Server"
TOOL_NAME = "Robot_MCP_Server_robot_watch"

# -------------------------------
# Initialize MCP client globally
# -------------------------------
HTTP_TRANSPORT = StreamableHttpTransport(url=MCP_SERVER_URL)
MCP_CLIENT = Client(transport=HTTP_TRANSPORT, name=SERVER_NAME)

# Number of textboxes the stream callback must feed back to the UI.
_NUM_OUTPUTS = 8


def _content_text(content) -> str:
    """Best-effort extraction of the text payload from an MCP response.

    fastmcp returns ``CallToolResult.content`` as a *list* of content
    blocks (so ``response.content.text`` would raise AttributeError);
    some versions expose a single object with a ``.text`` attribute.
    Handle both shapes defensively.
    """
    if content is None:
        return ""
    if isinstance(content, (list, tuple)):
        return content[0].text if content else ""
    return getattr(content, "text", str(content))


def _parse_result(raw_text: str) -> dict:
    """Parse the tool's text payload into a dict.

    The server may emit JSON (with ``true``/``false``/``null``, which
    ``ast.literal_eval`` rejects) or a Python-literal repr; try JSON
    first and fall back to ``literal_eval``. Neither path ever ``eval``s
    the untrusted payload.
    """
    try:
        return json.loads(raw_text)
    except (json.JSONDecodeError, TypeError):
        return ast.literal_eval(raw_text)


def _as_csv(value) -> str:
    """Join a list into a comma-separated string; stringify anything else."""
    return ", ".join(value) if isinstance(value, list) else str(value)


# -------------------------------
# Async function using user's HF token
# -------------------------------
async def process_webcam_stream_async(image, oauth_token: gr.OAuthToken | None):
    """
    Send webcam image to MCP server using user's HF token and process the response.

    Args:
        image: PIL image captured from the webcam, or None while idle.
        oauth_token: Hugging Face OAuth token injected by Gradio's login
            flow; None when the user has not logged in.

    Returns:
        An 8-tuple of strings in UI order: (description, environment,
        indoor/outdoor, lighting condition, human, animals, objects,
        hazards). On error the message goes in the first slot.
    """
    if oauth_token is None:
        return ("Please log in first.",) + ("",) * (_NUM_OUTPUTS - 1)

    if image is None:
        return ("",) * _NUM_OUTPUTS

    # Encode the frame as a Base64 JPEG for transport.
    buffered = io.BytesIO()
    image.save(buffered, format="JPEG")
    b64_img = base64.b64encode(buffered.getvalue()).decode("utf-8")

    # Payload with user token
    payload = {
        "hf_token_input": oauth_token.token,
        "robot_id_input": ROBOT_ID,
        "image_b64_input": b64_img,
    }

    try:
        async with MCP_CLIENT:
            response = await MCP_CLIENT.call_tool(TOOL_NAME, payload)

        if response.is_error:
            error_text = _content_text(response.content) or "Unknown error"
            raise RuntimeError(f"MCP Tool Error: {error_text}")

        raw_text = _content_text(response.content)
        response_dict = _parse_result(raw_text)
        vlm_result = response_dict.get("result", {})

        # Extract all fields in the order the UI outputs expect.
        return (
            vlm_result.get("description", ""),
            vlm_result.get("environment", ""),
            vlm_result.get("indoor_or_outdoor", ""),
            vlm_result.get("lighting_condition", ""),
            vlm_result.get("human", ""),
            _as_csv(vlm_result.get("animals", [])),
            _as_csv(vlm_result.get("objects", [])),
            _as_csv(vlm_result.get("hazards", [])),
        )
    except Exception as e:
        # Top-level boundary for the stream callback: log the failure and
        # surface it in the first textbox instead of crashing the stream.
        print(f"Error calling MCP API: {e}")
        traceback.print_exc()
        return (f"Error: {e}",) + ("",) * (_NUM_OUTPUTS - 1)


# -------------------------------
# Gradio UI
# -------------------------------
with gr.Blocks() as demo:
    # Hugging Face OAuth login button; Gradio injects the user's token
    # into callbacks that declare a gr.OAuthToken parameter.
    gr.LoginButton()
    gr.Markdown("## 🎥 Robot Vision Webcam Stream (MCP Client)")

    with gr.Row():
        webcam_input = gr.Image(
            label="Captured from Web-Cam",
            sources=["upload", "webcam"],
            type="pil",
        )
        with gr.Column():
            description_out = gr.Textbox(label="Description", lines=5)
            environment_out = gr.Textbox(label="Environment", lines=3)
            indoor_outdoor_out = gr.Textbox(label="Indoor/Outdoor", lines=1)
            lighting_condition_out = gr.Textbox(label="Lighting Condition", lines=1)
            human_out = gr.Textbox(label="Human Detected", lines=3)
            animals_out = gr.Textbox(label="Animals Detected", lines=2)
            objects_out = gr.Textbox(label="Objects Detected", lines=2)
            hazards_out = gr.Textbox(label="Hazards Identified", lines=2)

    # Only webcam input in inputs; Gradio automatically injects oauth_token
    webcam_input.stream(
        process_webcam_stream_async,
        inputs=[webcam_input],
        outputs=[
            description_out,
            environment_out,
            indoor_outdoor_out,
            lighting_condition_out,
            human_out,
            animals_out,
            objects_out,
            hazards_out,
        ],
        stream_every=1.0,
    )

if __name__ == "__main__":
    demo.launch()