# NOTE: page-capture residue from Hugging Face Spaces ("Spaces: Sleeping") — not part of the program.
import ast
import asyncio
import base64
import io
import json
import traceback

import gradio as gr
from fastmcp import Client
from fastmcp.client import StreamableHttpTransport
# --- MCP endpoint configuration -------------------------------------------
ROBOT_ID = "Robot_MCP_Client"  # sent with every request as "robot_id_input"
MCP_SERVER_URL = "https://oppaai-robot-mcp-server.hf.space/gradio_api/mcp/"
SERVER_NAME = "Robot_MCP_Server"  # handed to the client as its `name`
TOOL_NAME = "Robot_MCP_Server_robot_watch"  # tool invoked for each frame

# --- Shared MCP client, constructed once when the module is imported ------
HTTP_TRANSPORT = StreamableHttpTransport(
    url=MCP_SERVER_URL,
)
MCP_CLIENT = Client(
    transport=HTTP_TRANSPORT,
    name=SERVER_NAME,
)
# -------------------------------
# Async stream handler (uses the logged-in user's HF token)
# -------------------------------
def _status_only(message):
    """Return the 8-field output tuple with *message* first and the rest blank."""
    return (message,) + ("",) * 7


def _first_text(content):
    """Return the text of the first text-bearing content block, or None.

    Accepts either a sequence of content blocks or a single block, so the
    helper works whether the tool result exposes ``content`` as a list or
    as one object carrying ``.text``.
    """
    if content is None:
        return None
    blocks = content if isinstance(content, (list, tuple)) else [content]
    for block in blocks:
        text = getattr(block, "text", None)
        if text is not None:
            return text
    return None


def _parse_tool_payload(raw_text):
    """Parse the tool's text reply into a dict.

    JSON is tried first because a JSON reply may contain ``true``/``false``/
    ``null``, which ``ast.literal_eval`` rejects. A Python-literal dict
    (single-quoted repr) is still accepted as a fallback.

    Raises:
        ValueError: if the reply does not decode to a dict.
    """
    try:
        parsed = json.loads(raw_text)
    except json.JSONDecodeError:
        # literal_eval only evaluates literals, so it is safe on server text.
        parsed = ast.literal_eval(raw_text)
    if not isinstance(parsed, dict):
        raise ValueError(
            f"Unexpected MCP response type: {type(parsed).__name__}"
        )
    return parsed


def _join_items(value):
    """Render a list field as a comma-separated string; None becomes ""."""
    if value is None:
        return ""
    if isinstance(value, list):
        # Items may not be plain strings, so stringify each before joining.
        return ", ".join(str(item) for item in value)
    return str(value)


async def process_webcam_stream_async(image, oauth_token: gr.OAuthToken | None):
    """Send one frame to the MCP tool and return the fields for the UI.

    Args:
        image: Image object exposing ``save(fp, format=...)`` (the UI passes a
            PIL image), or None when no frame is available.
        oauth_token: The logged-in user's token, or None when not logged in.

    Returns:
        An 8-tuple of strings in the order: description, environment,
        indoor/outdoor, lighting condition, human, animals, objects, hazards.
        When the user is not logged in or the call fails, the first element
        carries the message and the remaining seven are empty strings.
    """
    if oauth_token is None:
        return _status_only("Please log in first.")
    if image is None:
        return _status_only("")

    # Encode the frame as a Base64 JPEG string for transport.
    buffered = io.BytesIO()
    image.save(buffered, format="JPEG")
    b64_img = base64.b64encode(buffered.getvalue()).decode("utf-8")

    # The request carries the user's own token alongside the frame.
    payload = {
        "hf_token_input": oauth_token.token,
        "robot_id_input": ROBOT_ID,
        "image_b64_input": b64_img,
    }

    try:
        async with MCP_CLIENT:
            response = await MCP_CLIENT.call_tool(TOOL_NAME, payload)

        if response.is_error:
            error_text = _first_text(response.content) or "Unknown error"
            raise RuntimeError(f"MCP Tool Error: {error_text}")

        raw_text = _first_text(response.content)
        if raw_text is None:
            raise ValueError("MCP tool returned no text content")

        response_dict = _parse_tool_payload(raw_text)
        vlm_result = response_dict.get("result") or {}

        return (
            vlm_result.get("description", ""),
            vlm_result.get("environment", ""),
            vlm_result.get("indoor_or_outdoor", ""),
            vlm_result.get("lighting_condition", ""),
            vlm_result.get("human", ""),
            _join_items(vlm_result.get("animals", [])),
            _join_items(vlm_result.get("objects", [])),
            _join_items(vlm_result.get("hazards", [])),
        )
    except Exception as e:
        # Top-level boundary for the stream handler: report the failure in
        # the first output field instead of letting the event crash.
        print(f"Error calling MCP API: {e}")
        traceback.print_exc()
        return _status_only(f"Error: {e}")
# -------------------------------
# Gradio UI
# -------------------------------
# Page layout: login button, title, then the image input next to the result fields.
# NOTE(review): the source lost its indentation; the Row/Column nesting below is
# reconstructed (image and result column side by side) — confirm against the original.
with gr.Blocks() as demo:
    gr.LoginButton()  # Hugging Face OAuth login button
    gr.Markdown("## 🎥 Robot Vision Webcam Stream (MCP Client)")

    with gr.Row():
        webcam_input = gr.Image(
            label="Captured from Web-Cam",
            sources=["upload", "webcam"],
            type="pil",
        )
        with gr.Column():
            description_out = gr.Textbox(label="Description", lines=5)
            environment_out = gr.Textbox(label="Environment", lines=3)
            indoor_outdoor_out = gr.Textbox(label="Indoor/Outdoor", lines=1)
            lighting_condition_out = gr.Textbox(label="Lighting Condition", lines=1)
            human_out = gr.Textbox(label="Human Detected", lines=3)
            animals_out = gr.Textbox(label="Animals Detected", lines=2)
            objects_out = gr.Textbox(label="Objects Detected", lines=2)
            hazards_out = gr.Textbox(label="Hazards Identified", lines=2)

    # Order must match the tuple returned by process_webcam_stream_async.
    _result_boxes = [
        description_out,
        environment_out,
        indoor_outdoor_out,
        lighting_condition_out,
        human_out,
        animals_out,
        objects_out,
        hazards_out,
    ]

    # Only the image is listed as an input; Gradio automatically injects oauth_token.
    webcam_input.stream(
        fn=process_webcam_stream_async,
        inputs=[webcam_input],
        outputs=_result_boxes,
        stream_every=1.0,
    )

if __name__ == "__main__":
    demo.launch()