# NOTE(review): removed Hugging Face Spaces file-viewer residue (status lines,
# commit hashes, and the line-number gutter) that was scraped into this file
# and is not valid Python.
import os
import base64
import time
import io
import gradio as gr
from fastmcp import Client
from fastmcp.client import StreamableHttpTransport
import asyncio
from dotenv import load_dotenv
import ast # For safely evaluating Python literals returned from server
# -------------------------------
# Load environment variables
# -------------------------------
load_dotenv()

ROBOT_ID = "Robot_MCP_Client"  # Local client identifier sent with each request
HF_TOKEN = os.environ.get("HF_TOKEN")  # Hugging Face API token (server uploads need write access)
if not HF_TOKEN:
    # Keep running with a sentinel so the UI can surface a readable error
    # instead of the process crashing at import time.
    print("Warning: HF_TOKEN not found. API calls may fail.")
    HF_TOKEN = "missing_token_placeholder"  # Placeholder to avoid crash

# MCP server info
MCP_SERVER_URL = "https://oppaai-robot-mcp-server.hf.space/gradio_api/mcp/"
SERVER_NAME = "Robot_MCP_Server"
TOOL_NAME = "Robot_MCP_Server_robot_watch"  # Tool name exposed by the remote Gradio MCP server

# -------------------------------
# Initialize MCP client globally
# -------------------------------
# One shared client instance; the handler opens/closes the connection per
# call via `async with MCP_CLIENT`.
HTTP_TRANSPORT = StreamableHttpTransport(url=MCP_SERVER_URL)
MCP_CLIENT = Client(transport=HTTP_TRANSPORT, name=SERVER_NAME)
async def process_webcam_stream_async(image):
"""
Send webcam image to MCP server and process the response.
Args:
image (PIL.Image or None): Image captured from webcam or uploaded.
Returns:
tuple: (description, environment, indoor_or_outdoor, lighting_condition, human, animals_str, objects_str, hazards_str)
description (str): General description of the scene.
environment (str): Description of the surrounding environment.
indoor_or_outdoor (str): Whether the scene appears to be indoors or outdoors.
lighting_condition (str): Lighting condition (e.g., bright, dim, natural, artificial).
human (str): Information about any humans detected.
animals_str (str): Information about any animals detected, or "none".
objects_str (str): Comma-separated list of detected objects.
hazards_str (str): Comma-separated list of hazards, or "none".
"""
if image is None:
return "", "", "", ""
if HF_TOKEN == "missing_token_placeholder":
return "Error: HF_TOKEN not set locally.", "", "", ""
# Convert image to Base64 string
buffered = io.BytesIO()
image.save(buffered, format="JPEG")
b64_img = base64.b64encode(buffered.getvalue()).decode("utf-8")
# Prepare payload according to server's expected fields
payload = {
"hf_token_input": HF_TOKEN,
"robot_id_input": ROBOT_ID,
"image_b64_input": b64_img
}
try:
# Use async context to call MCP server tool
async with MCP_CLIENT:
response = await MCP_CLIENT.call_tool(TOOL_NAME, payload)
if response.is_error:
# Extract error message using the correct attribute access
error_text = response.content.text if response.content else "Unknown error"
raise Exception(f"MCP Tool Error: {error_text}")
# Server may return Python-style string (single quotes)
# Corrected: Access the combined text content directly
raw_text = response.content.text
response_dict = ast.literal_eval(raw_text)
# -------------------------------
# Extract fields from response
# -------------------------------
vlm_result = response_dict.get("result", {})
description_out = vlm_result.get("description", "")
human_out = vlm_result.get("human", "")
environment_out = vlm_result.get("environment", "")
# New fields (assuming your server update added these)
indoor_outdoor_out = vlm_result.get("indoor_or_outdoor", "")
lighting_condition_out = vlm_result.get("lighting_condition", "")
animals_list = vlm_result.get("animals", []) # Assuming animals are in a list
hazards_list = vlm_result.get("hazards", []) # Assuming hazards are in a list
objects_list = vlm_result.get("objects", [])
# Convert lists to a comma-separated string for display
objects_str = ", ".join(objects_list) if isinstance(objects_list, list) else str(objects_list)
animals_str = ", ".join(animals_list) if isinstance(animals_list, list) else str(animals_list)
hazards_str = ", ".join(hazards_list) if isinstance(hazards_list, list) else str(hazards_list)
# Return all 8 fields in the correct order
return (
description_out,
environment_out,
indoor_outdoor_out,
lighting_condition_out,
human_out,
animals_str,
objects_str,
hazards_str
)
except Exception as e:
print(f"Error calling remote MCP API: {e}")
import traceback
traceback.print_exc()
# Ensure error returns 8 values as well to maintain consistency
return f"Error: {e}", "", "", "", "", "", "", ""
# -------------------------------
# Gradio UI
# -------------------------------
with gr.Blocks() as demo:
    gr.Markdown("## 🎥 Robot Vision Webcam Stream (using MCP Client)")
    gr.Markdown("""
    ### 🔑 Hugging Face Token Required
    To use this application, you must set a valid **Hugging Face API Token** in your local environment variables: `HF_TOKEN`.
    **A write token is required** to upload images to the public dataset associated with this space.
    Resource usage for VLM inference will be tracked against your account.
    """)

    with gr.Row():
        # Webcam / upload image input
        webcam_input = gr.Image(
            label="Captured from Web-Cam",
            sources=["upload", "webcam"],
            type="pil"
        )
        with gr.Column():
            # Output fields for the MCP response, one textbox per tuple slot.
            description_out = gr.Textbox(label="Description", lines=5)
            environment_out = gr.Textbox(label="Environment", lines=3)
            indoor_outdoor_out = gr.Textbox(label="Indoor/Outdoor", lines=1)
            lighting_condition_out = gr.Textbox(label="Lighting Condition", lines=1)
            human_out = gr.Textbox(label="Human Detected", lines=3)
            animals_out = gr.Textbox(label="Animals Detected", lines=2)
            objects_out = gr.Textbox(label="Objects Detected", lines=2)
            hazards_out = gr.Textbox(label="Hazards Identified", lines=2)

    # Stream webcam frames to the server once per second (stream_every=1.0).
    webcam_input.stream(
        process_webcam_stream_async,
        inputs=[webcam_input],
        # BUGFIX: this list must match the order of the 8-tuple returned by
        # process_webcam_stream_async — the original placed objects_out second,
        # which routed six of the eight fields to the wrong textboxes.
        outputs=[
            description_out,
            environment_out,
            indoor_outdoor_out,
            lighting_condition_out,
            human_out,
            animals_out,
            objects_out,
            hazards_out
        ],
        stream_every=1.0
    )

if __name__ == "__main__":
    demo.launch()