# Hugging Face Spaces page header (status: Sleeping; file size: 5,795 bytes)
import base64
import io
import gradio as gr
from fastmcp import Client
from fastmcp.client import StreamableHttpTransport
import asyncio
import ast
import json
# -------------------------------
# MCP server info
# -------------------------------
# Sent with every tool call as the "robot_id_input" payload field.
ROBOT_ID = "Robot_MCP_Client"
# Endpoint handed to the streamable-HTTP transport created below.
MCP_SERVER_URL = "https://oppaai-robot-mcp-server.hf.space/gradio_api/mcp/"
# Passed to Client(...) as its `name` argument.
SERVER_NAME = "Robot_MCP_Server"
# Name of the tool invoked through MCP_CLIENT.call_tool() for each webcam frame.
TOOL_NAME = "Robot_MCP_Server_robot_watch"
# -------------------------------
# Initialize MCP client globally
# -------------------------------
# A single transport/client pair is built at import time and reused by every
# call; the stream handler opens it per request with `async with MCP_CLIENT`.
HTTP_TRANSPORT = StreamableHttpTransport(url=MCP_SERVER_URL)
MCP_CLIENT = Client(transport=HTTP_TRANSPORT, name=SERVER_NAME)
# -------------------------------
# Async function using user's HF token
# -------------------------------
async def process_webcam_stream_async(image, oauth_token: gr.OAuthToken | None = None):
    """
    Send one webcam frame to the MCP server and map the reply onto the UI fields.

    Args:
        image: Frame to analyse. It must provide ``save(fp, format=...)``; the
            frame is serialised as JPEG and base64-encoded before sending.
            ``None`` means there is no frame to process.
        oauth_token: Login token of the current user; its ``token`` attribute
            is forwarded to the tool as ``hf_token_input``. It is not listed in
            the stream event's inputs and is expected to be supplied by
            Gradio's OAuth integration from this parameter's type annotation,
            so the annotation must be kept as is. ``None`` means the user is
            not logged in.

    Returns:
        An 8-tuple in this order: description, environment, indoor/outdoor,
        lighting condition, human, animals, objects (comma-separated string),
        hazards. When something goes wrong the first element carries the
        message and the other seven are empty strings. Exceptions raised
        while encoding, calling the tool, or unpacking the reply are printed
        and reported that way instead of propagating.
    """
    # 1. CHECK LOGIN
    if oauth_token is None:
        return _status_only("Please log in using the button above.")
    # 2. CHECK IMAGE
    if image is None:
        return _status_only("")
    try:
        # 3. PREPARE IMAGE
        buffered = io.BytesIO()
        image.save(buffered, format="JPEG")
        b64_img = base64.b64encode(buffered.getvalue()).decode("utf-8")
        # 4. PREPARE PAYLOAD
        payload = {
            "hf_token_input": oauth_token.token,
            "robot_id_input": ROBOT_ID,
            "image_b64_input": b64_img,
        }
        # 5. CALL MCP SERVER
        async with MCP_CLIENT:
            response = await MCP_CLIENT.call_tool(TOOL_NAME, payload)
        if response.is_error:
            # Keep the generic label when the error content carries no text,
            # rather than reporting an empty message.
            error_msg = (_content_text(response, " ") or "").strip() or "Unknown Error"
            raise RuntimeError(f"MCP Tool Error: {error_msg}")
        # The reply content is a list of parts (e.g. text parts); their text
        # is concatenated. Any other shape falls back to the object's string
        # form so the user at least sees what came back.
        raw_text = _content_text(response, "")
        if raw_text is None:
            raw_text = str(response)
        # 6. PARSE RESPONSE
        vlm_result = _parse_vlm_result(raw_text)
        if vlm_result is None:
            # Unparseable or unexpectedly shaped reply: show the raw text.
            return _status_only(f"Parsing Error. Raw output: {raw_text}")
        # 7. EXTRACT DATA
        objects_list = vlm_result.get("objects", [])
        if isinstance(objects_list, list):
            # Items are stringified individually so a non-string entry cannot
            # make the join fail and discard the rest of the result.
            objects_str = ", ".join(str(entry) for entry in objects_list)
        else:
            objects_str = str(objects_list)
        return (
            vlm_result.get("description", ""),
            vlm_result.get("environment", ""),
            vlm_result.get("indoor_or_outdoor", ""),
            vlm_result.get("lighting_condition", ""),
            vlm_result.get("human", ""),
            vlm_result.get("animals", ""),
            objects_str,
            vlm_result.get("hazards", ""),
        )
    except Exception as e:
        # Top-level boundary of the stream handler: report instead of raising.
        print(f"Error calling MCP API: {e}")
        return _status_only(f"Error: {e}")


def _status_only(message):
    """Return the 8-field output tuple with ``message`` first and the rest empty."""
    return (message,) + ("",) * 7


def _content_text(response, separator):
    """Join the string ``text`` of each part in ``response.content``; ``None`` if content is not a list."""
    parts = getattr(response, "content", None)
    if not isinstance(parts, list):
        return None
    texts = (getattr(part, "text", None) for part in parts)
    return separator.join(text for text in texts if isinstance(text, str))


def _parse_vlm_result(raw_text):
    """Return the ``result`` dict from the tool's text reply, or ``None`` if it cannot be used."""
    try:
        parsed = json.loads(raw_text)
    except json.JSONDecodeError:
        # Fall back to a Python-literal parse for replies that are not strict
        # JSON (e.g. single-quoted dict reprs). literal_eval only evaluates
        # literals, never arbitrary expressions.
        try:
            parsed = ast.literal_eval(raw_text)
        except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError):
            return None
    if not isinstance(parsed, dict):
        return None
    result = parsed.get("result", {})
    return result if isinstance(result, dict) else None
# -------------------------------
# Gradio UI
# -------------------------------
with gr.Blocks() as demo:
    # Page title and the Hugging Face login control.
    gr.Markdown("## 🎥 Robot Vision Webcam Stream (MCP Client)")
    gr.LoginButton()

    # NOTE(review): the Column is assumed to sit inside the Row (webcam next to
    # the result fields) — confirm against the intended layout.
    with gr.Row():
        # Webcam is the only source; frames reach the handler as PIL images.
        webcam_input = gr.Image(
            label="Captured from Web-Cam",
            sources=["webcam"],
            type="pil",
        )
        # One textbox per field returned by the stream handler.
        with gr.Column():
            description_out = gr.Textbox(label="Description", lines=5)
            environment_out = gr.Textbox(label="Environment", lines=3)
            indoor_outdoor_out = gr.Textbox(label="Indoor/Outdoor", lines=1)
            lighting_condition_out = gr.Textbox(label="Lighting Condition", lines=1)
            human_out = gr.Textbox(label="Human Detected", lines=3)
            animals_out = gr.Textbox(label="Animals Detected", lines=2)
            objects_out = gr.Textbox(label="Objects Detected", lines=2)
            hazards_out = gr.Textbox(label="Hazards Identified", lines=2)

    # Wire the webcam stream to the handler. The outputs are listed in the same
    # order as the handler's 8-tuple; stream_every is given in seconds.
    webcam_input.stream(
        fn=process_webcam_stream_async,
        inputs=[webcam_input],
        outputs=[
            description_out,
            environment_out,
            indoor_outdoor_out,
            lighting_condition_out,
            human_out,
            animals_out,
            objects_out,
            hazards_out,
        ],
        stream_every=1.0,
    )
if __name__ == "__main__":
    # Launch the app only when executed as a script (not on import).
    demo.launch(ssr_mode=False)