import ast
import asyncio
import base64
import io
import json
import warnings

import gradio as gr
from fastmcp import Client
from fastmcp.client import StreamableHttpTransport

# -------------------------------
# 0. CLEANUP: Ignore the spammy DeprecationWarnings
# -------------------------------
warnings.filterwarnings("ignore", category=DeprecationWarning)
warnings.filterwarnings("ignore", category=UserWarning)

# -------------------------------
# MCP server info
# -------------------------------
ROBOT_ID = "Robot_MCP_Client"
MCP_SERVER_URL = "https://oppaai-robot-mcp-server.hf.space/gradio_api/mcp/"
SERVER_NAME = "Robot_MCP_Server"
TOOL_NAME = "Robot_MCP_Server_robot_watch"

# -------------------------------
# Initialize MCP client globally
# -------------------------------
HTTP_TRANSPORT = StreamableHttpTransport(url=MCP_SERVER_URL)
MCP_CLIENT = Client(transport=HTTP_TRANSPORT, name=SERVER_NAME)

# The stream callback drives 8 textboxes; this is the "blank everything" tuple.
_NUM_OUTPUTS = 8
_EMPTY_OUTPUTS = ("",) * _NUM_OUTPUTS


def _encode_image_jpeg_b64(image) -> str:
    """Encode a PIL image as a base64 JPEG string.

    JPEG cannot store an alpha channel, and webcam frames delivered by the
    browser are frequently RGBA — convert to RGB first so ``save`` cannot
    raise ``OSError: cannot write mode RGBA as JPEG``.
    """
    if image.mode not in ("RGB", "L"):
        image = image.convert("RGB")
    buffered = io.BytesIO()
    image.save(buffered, format="JPEG")
    return base64.b64encode(buffered.getvalue()).decode("utf-8")


def _extract_response_text(response) -> str:
    """Concatenate the text parts of an MCP tool response into one string."""
    if hasattr(response, "content") and isinstance(response.content, list):
        return "".join(
            item.text for item in response.content if hasattr(item, "text")
        )
    return str(response)


def _parse_tool_output(raw_text: str) -> dict:
    """Parse the tool's textual payload.

    Tries strict JSON first; falls back to a Python literal (some servers
    emit repr-style dicts with single quotes).  Raises ``ValueError`` /
    ``SyntaxError`` if neither parse succeeds — the caller handles that.
    """
    try:
        return json.loads(raw_text)
    except json.JSONDecodeError:
        return ast.literal_eval(raw_text)


# -------------------------------
# Async function
# -------------------------------
async def process_webcam_stream_async(image, oauth_token: gr.OAuthToken | None = None):
    """Send one webcam frame to the robot-vision MCP tool and map its answer.

    Returns an 8-tuple of strings for the Gradio outputs:
    (description, environment, indoor/outdoor, lighting, humans, animals,
    objects, hazards).  On any failure the first element carries the error
    message and the rest are blank.
    """
    # 1. Login Check
    if oauth_token is None:
        return ("⚠️ Please log in via the button above to start.",) + _EMPTY_OUTPUTS[1:]

    # 2. Image Check — the stream fires with None frames before the camera is live.
    if image is None:
        return _EMPTY_OUTPUTS

    try:
        # 3. Process Image
        payload = {
            "hf_token_input": oauth_token.token,
            "robot_id_input": ROBOT_ID,
            "image_b64_input": _encode_image_jpeg_b64(image),
        }

        # 4. Call MCP Server (client context opens/closes the transport per call)
        async with MCP_CLIENT:
            response = await MCP_CLIENT.call_tool(TOOL_NAME, payload)

        # Handle MCP Errors reported in-band by the protocol.
        if response.is_error:
            error_msg = _extract_response_text(response) or "Unknown Error"
            raise Exception(f"MCP Tool Error: {error_msg}")

        # 5. Extract Text from Response List
        raw_text = _extract_response_text(response)

        # 6. Parse JSON/Dict
        try:
            response_dict = _parse_tool_output(raw_text)
        except (ValueError, SyntaxError):
            return (f"Parsing Error. Raw output: {raw_text}",) + _EMPTY_OUTPUTS[1:]

        vlm_result = response_dict.get("result", {})

        # 7. Map to Outputs
        objects_list = vlm_result.get("objects", [])
        objects_str = (
            ", ".join(objects_list)
            if isinstance(objects_list, list)
            else str(objects_list)
        )
        return (
            vlm_result.get("description", ""),
            vlm_result.get("environment", ""),
            vlm_result.get("indoor_or_outdoor", ""),
            vlm_result.get("lighting_condition", ""),
            vlm_result.get("human", ""),
            vlm_result.get("animals", ""),
            objects_str,
            vlm_result.get("hazards", ""),
        )

    except Exception as e:
        # Top-level boundary: surface the error in the UI instead of crashing
        # the stream loop.
        print(f"Error: {e}")
        return (f"Error: {e}",) + _EMPTY_OUTPUTS[1:]


# -------------------------------
# Gradio UI
# -------------------------------
with gr.Blocks(title="Robot Vision MCP") as demo:
    gr.Markdown("## 🎥 Robot Vision Webcam Stream (MCP Client)")

    # Login Button
    gr.LoginButton()

    with gr.Row():
        webcam_input = gr.Image(
            label="Webcam Input",
            sources=["webcam"],
            type="pil",
        )
        with gr.Column():
            description_out = gr.Textbox(label="Description", lines=4)
            with gr.Row():
                environment_out = gr.Textbox(label="Environment")
                indoor_outdoor_out = gr.Textbox(label="In/Out")
            with gr.Row():
                human_out = gr.Textbox(label="Humans")
                hazards_out = gr.Textbox(label="Hazards")

            # Hidden / Extra fields (optional, add back if needed)
            lighting_condition_out = gr.Textbox(visible=False)
            animals_out = gr.Textbox(visible=False)
            objects_out = gr.Textbox(visible=False)

    # -------------------------------
    # STREAM CONFIGURATION (The Important Fix)
    # -------------------------------
    webcam_input.stream(
        process_webcam_stream_async,
        inputs=[webcam_input],
        outputs=[
            description_out,
            environment_out,
            indoor_outdoor_out,
            lighting_condition_out,
            human_out,
            animals_out,
            objects_out,
            hazards_out,
        ],
        # Update every 3 seconds to give the AI time to think
        stream_every=3.0,
        # Wait for the previous request to finish before sending a new one
        concurrency_limit=1,
    )

if __name__ == "__main__":
    demo.launch(ssr_mode=False)