# app.py — Robot MCP webcam client (Gradio + fastmcp)
# Origin: OppaAI, commit 79f6e03 (verified), 5.8 kB
import base64
import io
import gradio as gr
from fastmcp import Client
from fastmcp.client import StreamableHttpTransport
import asyncio
import ast
import json
# -------------------------------
# MCP server info
# -------------------------------
# Identifier this client reports to the server's robot_watch tool.
ROBOT_ID = "Robot_MCP_Client"
# Streamable-HTTP MCP endpoint exposed by the Hugging Face Space.
MCP_SERVER_URL = "https://oppaai-robot-mcp-server.hf.space/gradio_api/mcp/"
SERVER_NAME = "Robot_MCP_Server"
# Fully-qualified tool name ("<server>_<tool>") invoked once per webcam frame.
TOOL_NAME = "Robot_MCP_Server_robot_watch"
# -------------------------------
# Initialize MCP client globally
# -------------------------------
# One shared client for the whole app; each request enters/exits the client
# context (`async with MCP_CLIENT`) so sessions are opened per call.
HTTP_TRANSPORT = StreamableHttpTransport(url=MCP_SERVER_URL)
MCP_CLIENT = Client(transport=HTTP_TRANSPORT, name=SERVER_NAME)
# -------------------------------
# Async function using user's HF token
# -------------------------------
async def process_webcam_stream_async(image, oauth_token: gr.OAuthToken | None = None):
    """Send one webcam frame to the MCP server and unpack the VLM analysis.

    Args:
        image: PIL image from the webcam stream, or None when no frame is
            available yet.
        oauth_token: Hugging Face OAuth token injected by ``gr.LoginButton``;
            None when the user is not logged in.

    Returns:
        An 8-tuple of strings for the output textboxes:
        (description, environment, indoor/outdoor, lighting, humans,
        animals, objects, hazards). On error the first element carries a
        message and the remaining seven are empty.
    """
    # All outputs are textboxes; pad every early return to 8 strings.
    blank = ("",) * 7

    # 1. CHECK LOGIN — the user's token is forwarded to the MCP server.
    if oauth_token is None:
        return ("Please log in using the button above.",) + blank
    # 2. CHECK IMAGE — no frame yet: keep every textbox empty.
    if image is None:
        return ("",) + blank

    try:
        # 3. PREPARE IMAGE — encode the frame as base64 JPEG for transport.
        buffered = io.BytesIO()
        image.save(buffered, format="JPEG")
        b64_img = base64.b64encode(buffered.getvalue()).decode("utf-8")

        # 4. PREPARE PAYLOAD — argument names expected by the robot_watch tool.
        payload = {
            "hf_token_input": oauth_token.token,
            "robot_id_input": ROBOT_ID,
            "image_b64_input": b64_img,
        }

        # 5. CALL MCP SERVER — the client context manages the HTTP session.
        async with MCP_CLIENT:
            response = await MCP_CLIENT.call_tool(TOOL_NAME, payload)

            if response.is_error:
                # Gather any textual error parts; content may be absent or odd.
                error_msg = "Unknown Error"
                if hasattr(response, 'content') and isinstance(response.content, list):
                    error_msg = " ".join(
                        getattr(item, 'text', '') for item in response.content
                    )
                # RuntimeError (not bare Exception) — handled by the outer
                # boundary handler below, so the stream loop never crashes.
                raise RuntimeError(f"MCP Tool Error: {error_msg}")

            # The tool result 'content' is a list of parts (e.g. TextContent);
            # join the text of each part into one string before parsing.
            if hasattr(response, 'content') and isinstance(response.content, list):
                raw_text = "".join(
                    item.text for item in response.content if hasattr(item, 'text')
                )
            else:
                # Fallback for unexpected structure.
                raw_text = str(response)

        # 6. PARSE RESPONSE — prefer strict JSON, then fall back to Python
        # literal syntax (some servers emit repr()-style single-quoted dicts).
        try:
            response_dict = json.loads(raw_text)
        except json.JSONDecodeError:
            try:
                response_dict = ast.literal_eval(raw_text)
            except Exception:
                # Unparseable: surface the raw payload for debugging.
                return (f"Parsing Error. Raw output: {raw_text}",) + blank

        vlm_result = response_dict.get("result", {})

        # 7. EXTRACT DATA — map VLM fields onto the 8 output textboxes.
        objects_list = vlm_result.get("objects", [])
        objects_str = (
            ", ".join(objects_list)
            if isinstance(objects_list, list)
            else str(objects_list)
        )
        return (
            vlm_result.get("description", ""),
            vlm_result.get("environment", ""),
            vlm_result.get("indoor_or_outdoor", ""),
            vlm_result.get("lighting_condition", ""),
            vlm_result.get("human", ""),
            vlm_result.get("animals", ""),
            objects_str,
            vlm_result.get("hazards", ""),
        )
    except Exception as e:
        # Boundary handler: log and degrade gracefully instead of crashing
        # the per-second streaming loop.
        print(f"Error calling MCP API: {e}")
        return (f"Error: {e}",) + blank
# -------------------------------
# Gradio UI
# -------------------------------
# -------------------------------
# Gradio UI
# -------------------------------
with gr.Blocks() as demo:
    gr.Markdown("## 🎥 Robot Vision Webcam Stream (MCP Client)")
    # Hugging Face OAuth login; Gradio injects the resulting token into the
    # handler's `oauth_token` parameter automatically.
    gr.LoginButton()
    with gr.Row():
        # Live webcam capture, delivered to the handler as a PIL image.
        webcam_input = gr.Image(
            label="Captured from Web-Cam",
            sources=["webcam"],
            type="pil"
        )
        with gr.Column():
            # One textbox per field of the VLM analysis result — order must
            # match the handler's 8-tuple return value.
            description_out = gr.Textbox(label="Description", lines=5)
            environment_out = gr.Textbox(label="Environment", lines=3)
            indoor_outdoor_out = gr.Textbox(label="Indoor/Outdoor", lines=1)
            lighting_condition_out = gr.Textbox(label="Lighting Condition", lines=1)
            human_out = gr.Textbox(label="Human Detected", lines=3)
            animals_out = gr.Textbox(label="Animals Detected", lines=2)
            objects_out = gr.Textbox(label="Objects Detected", lines=2)
            hazards_out = gr.Textbox(label="Hazards Identified", lines=2)
    # Stream a frame to the MCP handler once per second.
    webcam_input.stream(
        process_webcam_stream_async,
        inputs=[webcam_input],
        outputs=[
            description_out,
            environment_out,
            indoor_outdoor_out,
            lighting_condition_out,
            human_out,
            animals_out,
            objects_out,
            hazards_out
        ],
        stream_every=1.0
    )
if __name__ == "__main__":
    # NOTE(review): ssr_mode=False — presumably disables server-side rendering
    # to avoid issues with the streaming webcam component; confirm on Spaces.
    demo.launch(ssr_mode=False)