Spaces:
Sleeping
Sleeping
| import base64 | |
| import io | |
| import gradio as gr | |
| from fastmcp import Client | |
| from fastmcp.client import StreamableHttpTransport | |
| import asyncio | |
| import ast | |
| import json | |
| import os | |
# -------------------------------
# MCP server configuration
# -------------------------------
# Identifiers sent to / used to address the remote MCP tool.
ROBOT_ID = "CV_MCP_Client"
SERVER_NAME = "CV_MCP_Server"
TOOL_NAME = "CV_MCP_Server_robot_watch"
MCP_SERVER_URL = "https://mcp-1st-birthday-cv-mcp-server.hf.space/gradio_api/mcp/"

# Hugging Face token taken from the environment. When it is absent or empty a
# sentinel string is stored instead, so the frame handler can detect the
# missing token and report it rather than sending a bogus credential.
HF_TOKEN = os.environ.get("HF_TOKEN")
if HF_TOKEN is None or HF_TOKEN == "":
    print("Warning: HF_TOKEN not found. API calls may fail.")
    HF_TOKEN = "missing_token_placeholder"

# -------------------------------
# Module-wide MCP client (created once at import time)
# -------------------------------
HTTP_TRANSPORT = StreamableHttpTransport(url=MCP_SERVER_URL)
MCP_CLIENT = Client(transport=HTTP_TRANSPORT, name=SERVER_NAME)
# -------------------------------
# Async frame handler (authenticates with the HF_TOKEN environment value)
# -------------------------------
# Number of values the handler returns (one per output textbox in the UI).
_OUTPUT_FIELD_COUNT = 8

# Image modes Pillow can write as JPEG without conversion.
_JPEG_SAFE_MODES = ("L", "RGB", "CMYK")


def _message_row(message=""):
    """Return the handler's 8-tuple with ``message`` first and blanks after."""
    return (message,) + ("",) * (_OUTPUT_FIELD_COUNT - 1)


def _encode_jpeg_b64(image):
    """Serialize a PIL image to a Base64-encoded JPEG string."""
    # JPEG has no alpha/palette support; Pillow raises OSError when asked to
    # save such modes, so normalise them to RGB first.
    if image.mode not in _JPEG_SAFE_MODES:
        image = image.convert("RGB")
    buffered = io.BytesIO()
    image.save(buffered, format="JPEG")
    return base64.b64encode(buffered.getvalue()).decode("utf-8")


def _collect_text(response):
    """Concatenate the ``text`` of every item in ``response.content``.

    Falls back to ``str(response)`` when ``content`` is missing or not a list.
    """
    content = getattr(response, "content", None)
    if not isinstance(content, list):
        # Fallback for unexpected structure
        return str(response)
    return "".join(item.text for item in content if hasattr(item, "text"))


def _extract_result(raw_text):
    """Parse the tool output and return its ``"result"`` mapping.

    The text is tried as JSON first and as a Python literal second.
    Returns ``None`` when parsing fails or when either the parsed value or
    its ``"result"`` entry is not a dict; a missing ``"result"`` key yields
    an empty dict.
    """
    try:
        parsed = json.loads(raw_text)
    except json.JSONDecodeError:
        try:
            parsed = ast.literal_eval(raw_text)
        except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
            return None
    if not isinstance(parsed, dict):
        return None
    result = parsed.get("result", {})
    return result if isinstance(result, dict) else None


def _format_fields(vlm_result):
    """Map the result dict onto the 8 output values, in UI order."""
    objects = vlm_result.get("objects", [])
    if isinstance(objects, list):
        # Items are stringified individually so non-string entries
        # (numbers, dicts, ...) cannot break the join.
        objects_str = ", ".join(map(str, objects))
    else:
        objects_str = str(objects)
    return (
        vlm_result.get("description", ""),
        vlm_result.get("environment", ""),
        vlm_result.get("indoor_or_outdoor", ""),
        vlm_result.get("lighting_condition", ""),
        vlm_result.get("human", ""),
        vlm_result.get("animals", ""),
        objects_str,
        vlm_result.get("hazards", ""),
    )


async def process_webcam_stream_async(image):
    """Send one webcam frame to the MCP tool and return its analysis.

    Args:
        image: A PIL image for the current frame, or ``None`` when no frame
            is available.

    Returns:
        A tuple of 8 values: description, environment, indoor/outdoor,
        lighting condition, human, animals, objects (comma-separated string)
        and hazards. When there is no frame all values are empty strings.
        On any failure the first value carries an ``"Error: ..."`` or
        ``"Parsing Error. ..."`` message and the remaining values are empty.

    Exceptions raised while encoding the frame, calling the tool or
    post-processing its output are caught, printed, and reported through the
    first output value instead of propagating.
    """
    if image is None:
        return _message_row()
    if HF_TOKEN == "missing_token_placeholder":
        return _message_row("Error: HF_TOKEN not set locally.")

    try:
        payload = {
            "hf_token_input": HF_TOKEN,
            "robot_id_input": ROBOT_ID,
            "image_b64_input": _encode_jpeg_b64(image),
        }

        # Only the remote call needs the open client session.
        async with MCP_CLIENT:
            response = await MCP_CLIENT.call_tool(TOOL_NAME, payload)

        if response.is_error:
            # Handle error content safely
            error_msg = "Unknown Error"
            content = getattr(response, "content", None)
            if isinstance(content, list):
                error_msg = " ".join(getattr(item, "text", "") for item in content)
            raise RuntimeError(f"MCP Tool Error: {error_msg}")

        raw_text = _collect_text(response)
        vlm_result = _extract_result(raw_text)
        if vlm_result is None:
            # If parsing fails completely, return the raw text in description
            return _message_row(f"Parsing Error. Raw output: {raw_text}")
        return _format_fields(vlm_result)
    except Exception as e:  # UI boundary: surface every failure in the textbox
        print(f"Error calling MCP API: {e}")
        return _message_row(f"Error: {e}")
# -------------------------------
# Gradio UI
# -------------------------------
with gr.Blocks() as demo:
    gr.Markdown(value="## 🎥 Robot Vision Webcam Stream (MCP Client)")
    gr.Markdown(
        value="""
This interface captures a live webcam feed and sends each frame to the MCP Client for analysis.
The system extracts detailed information from the scene — including descriptions, detected objects,
humans, animals, environmental context, lighting conditions, and potential hazards.
Use this dashboard to observe how the robot interprets the world in real time.
"""
    )

    with gr.Row():
        # Webcam-only image source; frames reach the handler as PIL images.
        webcam_input = gr.Image(
            type="pil",
            sources=["webcam"],
            label="Captured from Web-Cam",
        )

        with gr.Column():
            # One textbox per value returned by the frame handler, created in
            # the same order the handler returns them: (label, visible lines).
            analysis_outputs = [
                gr.Textbox(label=caption, lines=height)
                for caption, height in (
                    ("Description", 5),
                    ("Environment", 3),
                    ("Indoor/Outdoor", 1),
                    ("Lighting Condition", 1),
                    ("Human Detected", 3),
                    ("Animals Detected", 2),
                    ("Objects Detected", 2),
                    ("Hazards Identified", 2),
                )
            ]
            (
                description_out,
                environment_out,
                indoor_outdoor_out,
                lighting_condition_out,
                human_out,
                animals_out,
                objects_out,
                hazards_out,
            ) = analysis_outputs

    # Stream webcam frames to the handler with a 1.0 s interval setting.
    webcam_input.stream(
        fn=process_webcam_stream_async,
        inputs=[webcam_input],
        outputs=analysis_outputs,
        stream_every=1.0,
    )

if __name__ == "__main__":
    demo.launch(ssr_mode=False)