Spaces:
Sleeping
Sleeping
| import os | |
| import base64 | |
| import time | |
| import io | |
| import gradio as gr | |
| from fastmcp import Client | |
| from fastmcp.client import StreamableHttpTransport | |
| import asyncio | |
| from dotenv import load_dotenv | |
| import ast # For safely evaluating Python literals returned from server | |
# -------------------------------
# Environment and configuration
# -------------------------------
# Load environment variables before any of them are read below.
load_dotenv()

# Identifier this local client sends along with every request.
ROBOT_ID = "Robot_MCP_Client"

# Hugging Face token. When it is absent or empty, warn once and substitute a
# sentinel value so later code can detect the missing token instead of crashing.
HF_TOKEN = os.getenv("HF_TOKEN")
if HF_TOKEN is None or HF_TOKEN == "":
    print("Warning: HF_TOKEN not found. API calls may fail.")
    HF_TOKEN = "missing_token_placeholder"

# MCP server endpoint, server name and the tool invoked on it.
MCP_SERVER_URL = "https://oppaai-robot-mcp-server.hf.space/gradio_api/mcp/"
SERVER_NAME = "Robot_MCP_Server"
TOOL_NAME = "Robot_MCP_Server_robot_watch"

# -------------------------------
# Module-wide MCP client
# -------------------------------
# Built once at import time and shared by every request handler.
HTTP_TRANSPORT = StreamableHttpTransport(url=MCP_SERVER_URL)
MCP_CLIENT = Client(transport=HTTP_TRANSPORT, name=SERVER_NAME)
def _blank_outputs(message=""):
    """Return the 8-field output tuple: *message* first, the other seven empty."""
    return (message,) + ("",) * 7


def _image_to_b64_jpeg(image):
    """Encode a PIL image as a Base64 string of its JPEG bytes."""
    # JPEG cannot store alpha or palette data, so images in such modes
    # (for example RGBA PNG uploads) are converted to RGB before saving.
    if image.mode not in ("RGB", "L"):
        image = image.convert("RGB")
    buffered = io.BytesIO()
    image.save(buffered, format="JPEG")
    return base64.b64encode(buffered.getvalue()).decode("utf-8")


def _content_text(content):
    """Collect the text carried by an MCP tool response's ``content``.

    Accepts a plain string, a single object exposing ``.text``, or an
    iterable of content blocks (blocks without text are skipped).
    """
    if content is None:
        return ""
    if isinstance(content, str):
        return content
    single_text = getattr(content, "text", None)
    if single_text is not None:
        return single_text
    texts = (getattr(block, "text", None) for block in content)
    return "\n".join(text for text in texts if text)


def _parse_vlm_result(raw_text):
    """Parse the server's text payload and return its ``"result"`` mapping."""
    import json

    try:
        # The server may answer with a Python-style literal (single quotes);
        # literal_eval evaluates literals only, never arbitrary code.
        parsed = ast.literal_eval(raw_text)
    except (ValueError, SyntaxError):
        # Fall back to JSON, which covers true/false/null.
        parsed = json.loads(raw_text)
    if not isinstance(parsed, dict):
        raise ValueError(
            f"Unexpected response type from server: {type(parsed).__name__}"
        )
    result = parsed.get("result", {})
    if not isinstance(result, dict):
        raise ValueError(
            f"Unexpected 'result' type from server: {type(result).__name__}"
        )
    return result


def _join_items(value):
    """Render a list-like value as a comma-separated string for display."""
    if value is None:
        return ""
    if isinstance(value, (list, tuple)):
        # Items are stringified so non-string entries do not break the join.
        return ", ".join(str(item) for item in value)
    return str(value)


async def process_webcam_stream_async(image):
    """
    Send a webcam image to the MCP server and unpack the response.

    Every code path returns exactly 8 values so the result always matches
    the 8 output components bound in the UI.

    Args:
        image (PIL.Image or None): Image captured from webcam or uploaded.

    Returns:
        tuple: (description, environment, indoor_or_outdoor,
        lighting_condition, human, animals_str, objects_str, hazards_str)
            description: General description of the scene, or an
                "Error: ..." message when the call fails.
            environment: Description of the surrounding environment.
            indoor_or_outdoor: Whether the scene is indoors or outdoors.
            lighting_condition: Lighting condition reported by the server.
            human: Information about any humans detected.
            animals_str: Comma-separated animals reported by the server.
            objects_str: Comma-separated list of detected objects.
            hazards_str: Comma-separated hazards reported by the server.
        All eight fields are empty strings when ``image`` is None.
    """
    if image is None:
        return _blank_outputs()
    if HF_TOKEN == "missing_token_placeholder":
        return _blank_outputs("Error: HF_TOKEN not set locally.")

    try:
        # Prepare payload according to the server's expected fields.
        payload = {
            "hf_token_input": HF_TOKEN,
            "robot_id_input": ROBOT_ID,
            "image_b64_input": _image_to_b64_jpeg(image),
        }

        # Use the async context to call the MCP server tool.
        async with MCP_CLIENT:
            response = await MCP_CLIENT.call_tool(TOOL_NAME, payload)

        if response.is_error:
            error_text = _content_text(response.content) or "Unknown error"
            raise RuntimeError(f"MCP Tool Error: {error_text}")

        vlm_result = _parse_vlm_result(_content_text(response.content))

        # Return all 8 fields in the documented order.
        return (
            vlm_result.get("description", ""),
            vlm_result.get("environment", ""),
            vlm_result.get("indoor_or_outdoor", ""),
            vlm_result.get("lighting_condition", ""),
            vlm_result.get("human", ""),
            _join_items(vlm_result.get("animals", [])),
            _join_items(vlm_result.get("objects", [])),
            _join_items(vlm_result.get("hazards", [])),
        )
    except Exception as e:
        # Top-level boundary for the UI callback: log the failure and show
        # it in the first output field instead of raising into Gradio.
        print(f"Error calling remote MCP API: {e}")
        import traceback
        traceback.print_exc()
        return _blank_outputs(f"Error: {e}")
# -------------------------------
# Gradio UI
# -------------------------------
# NOTE(review): the non-ASCII characters at the start of the two headings
# below look like mis-decoded emoji; confirm them against the original file.
# NOTE(review): the Row/Column nesting is reconstructed (source indentation
# was lost); confirm the intended layout.
with gr.Blocks() as demo:
    gr.Markdown("## π₯ Robot Vision Webcam Stream (using MCP Client)")
    gr.Markdown("""
### π Hugging Face Token Required
To use this application, you must set a valid **Hugging Face API Token** in your local environment variables: `HF_TOKEN`.
**A write token is required** to upload images to the public dataset associated with this space.
Resource usage for VLM inference will be tracked against your account.
""")
    with gr.Row():
        # Webcam / upload image input
        webcam_input = gr.Image(
            label="Captured from Web-Cam",
            sources=["upload", "webcam"],
            type="pil"
        )
        with gr.Column():
            # Output fields for the MCP response
            description_out = gr.Textbox(label="Description", lines=5)
            environment_out = gr.Textbox(label="Environment", lines=3)
            indoor_outdoor_out = gr.Textbox(label="Indoor/Outdoor", lines=1)
            lighting_condition_out = gr.Textbox(label="Lighting Condition", lines=1)
            human_out = gr.Textbox(label="Human Detected", lines=3)
            animals_out = gr.Textbox(label="Animals Detected", lines=2)
            objects_out = gr.Textbox(label="Objects Detected", lines=2)
            hazards_out = gr.Textbox(label="Hazards Identified", lines=2)

    # Stream webcam input to the server every 1.0 seconds.
    # The outputs are listed in the same order as the tuple returned by
    # process_webcam_stream_async: description, environment,
    # indoor/outdoor, lighting, human, animals, objects, hazards.
    webcam_input.stream(
        process_webcam_stream_async,
        inputs=[webcam_input],
        outputs=[
            description_out,
            environment_out,
            indoor_outdoor_out,
            lighting_condition_out,
            human_out,
            animals_out,
            objects_out,
            hazards_out
        ],
        stream_every=1.0
    )

if __name__ == "__main__":
    demo.launch()