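# Header text apparently carried over from a web file view; kept as comments
# so that the module parses as Python.
# CV_MCP_Client / app.py
# OppaAI's picture
# Update app.py
# 5895c26 verified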
import base64
import io
import gradio as gr
from fastmcp import Client
from fastmcp.client import StreamableHttpTransport
import asyncio
import ast
import json
import os
# -------------------------------
# MCP server info
# -------------------------------
# Identifier sent with every tool call as "robot_id_input".
ROBOT_ID = "CV_MCP_Client"

# Token read from the environment; an unset or empty variable is replaced by a
# sentinel string that the request handler checks before calling the server.
HF_TOKEN = os.environ.get("HF_TOKEN", "")
if HF_TOKEN == "":
    print("Warning: HF_TOKEN not found. API calls may fail.")
    HF_TOKEN = "missing_token_placeholder"

# Endpoint, client name and tool name used for the MCP calls.
MCP_SERVER_URL = (
    "https://mcp-1st-birthday-cv-mcp-server.hf.space/gradio_api/mcp/"
)
SERVER_NAME = "CV_MCP_Server"
TOOL_NAME = "CV_MCP_Server_robot_watch"
# -------------------------------
# Module-level MCP client
# -------------------------------
# Both objects are built once at import time; the request handler below
# enters MCP_CLIENT as an async context manager for each call.
HTTP_TRANSPORT = StreamableHttpTransport(
    url=MCP_SERVER_URL,
)
MCP_CLIENT = Client(
    transport=HTTP_TRANSPORT,
    name=SERVER_NAME,
)
# -------------------------------
# Async function using user's HF token
# -------------------------------
async def process_webcam_stream_async(image):
    """Analyse one webcam frame through the MCP tool and return the UI fields.

    The frame is JPEG-encoded, Base64-wrapped and sent to ``TOOL_NAME`` via
    ``MCP_CLIENT`` together with ``HF_TOKEN`` and ``ROBOT_ID``.  The text
    returned by the tool is parsed as JSON (falling back to a Python literal)
    and the mapping under its ``"result"`` key is flattened into eight strings.

    Args:
        image: The frame to analyse (the Gradio input is declared with
            ``type="pil"``), or ``None`` when no frame is available.

    Returns:
        An 8-tuple of strings ``(description, environment, indoor_or_outdoor,
        lighting_condition, human, animals, objects, hazards)``.  When
        something goes wrong the first element carries the error message and
        the remaining seven are empty; ordinary exceptions are reported this
        way instead of being raised.
    """
    if image is None:
        return _blank_outputs()
    if HF_TOKEN == "missing_token_placeholder":
        return _blank_outputs("Error: HF_TOKEN not set locally.")

    try:
        # Encoding happens inside the try so that an unencodable frame is
        # reported in the UI like any other failure.
        payload = {
            "hf_token_input": HF_TOKEN,
            "robot_id_input": ROBOT_ID,
            "image_b64_input": _encode_jpeg_b64(image),
        }

        async with MCP_CLIENT:
            response = await MCP_CLIENT.call_tool(TOOL_NAME, payload)

        if response.is_error:
            error_msg = "Unknown Error"
            content = getattr(response, "content", None)
            if isinstance(content, list):
                error_msg = " ".join(
                    getattr(item, "text", "") for item in content
                )
            raise RuntimeError(f"MCP Tool Error: {error_msg}")

        raw_text = _collect_text(response)
        parsed = _parse_tool_output(raw_text)
        vlm_result = parsed.get("result", {}) if isinstance(parsed, dict) else None
        if not isinstance(vlm_result, dict):
            # Covers unparsable text as well as output that parses to
            # something other than the expected {"result": {...}} mapping.
            return _blank_outputs(f"Parsing Error. Raw output: {raw_text}")

        objects = vlm_result.get("objects", [])
        if isinstance(objects, (list, tuple)):
            # str() each entry so non-string items cannot break the join.
            objects_str = ", ".join(str(entry) for entry in objects)
        else:
            objects_str = str(objects)

        return (
            _as_text(vlm_result.get("description", "")),
            _as_text(vlm_result.get("environment", "")),
            _as_text(vlm_result.get("indoor_or_outdoor", "")),
            _as_text(vlm_result.get("lighting_condition", "")),
            _as_text(vlm_result.get("human", "")),
            _as_text(vlm_result.get("animals", "")),
            objects_str,
            _as_text(vlm_result.get("hazards", "")),
        )
    except Exception as e:
        # Deliberately broad: this is the boundary between the remote call
        # and the UI, so every failure becomes a visible message.
        print(f"Error calling MCP API: {e}")
        return _blank_outputs(f"Error: {e}")


def _blank_outputs(first=""):
    """Return the 8-tuple for the UI with *first* in the description slot."""
    return (first,) + ("",) * 7


def _as_text(value):
    """Return *value* as a string, mapping ``None`` to the empty string."""
    if value is None:
        return ""
    return value if isinstance(value, str) else str(value)


def _encode_jpeg_b64(image):
    """Return *image* JPEG-encoded and Base64-wrapped as a ``str``."""
    # Frames in modes such as RGBA or P cannot be written as JPEG by PIL,
    # so anything outside the plainly supported modes is converted first.
    if image.mode not in ("L", "RGB", "CMYK"):
        image = image.convert("RGB")
    buffered = io.BytesIO()
    image.save(buffered, format="JPEG")
    return base64.b64encode(buffered.getvalue()).decode("utf-8")


def _collect_text(response):
    """Concatenate the ``text`` of every content item of *response*."""
    content = getattr(response, "content", None)
    if isinstance(content, list):
        return "".join(item.text for item in content if hasattr(item, "text"))
    # Fallback for a response without a content list.
    return str(response)


def _parse_tool_output(raw_text):
    """Parse *raw_text* as JSON, then as a Python literal; ``None`` on failure."""
    try:
        return json.loads(raw_text)
    except json.JSONDecodeError:
        pass
    try:
        # literal_eval only evaluates literals, never arbitrary expressions.
        return ast.literal_eval(raw_text)
    except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
        return None
# -------------------------------
# Gradio UI
# -------------------------------
with gr.Blocks() as demo:
    # Page title followed by the introductory paragraph.
    gr.Markdown("## 🎥 Robot Vision Webcam Stream (MCP Client)")
    gr.Markdown(
        """
This interface captures a live webcam feed and sends each frame to the MCP Client for analysis.
The system extracts detailed information from the scene — including descriptions, detected objects,
humans, animals, environmental context, lighting conditions, and potential hazards.
Use this dashboard to observe how the robot interprets the world in real time.
"""
    )

    # NOTE(review): indentation was lost in this copy of the file; the result
    # column is assumed to sit inside the row, next to the webcam image.
    with gr.Row():
        webcam_input = gr.Image(
            type="pil",
            sources=["webcam"],
            label="Captured from Web-Cam",
        )
        with gr.Column():
            description_out = gr.Textbox(lines=5, label="Description")
            environment_out = gr.Textbox(lines=3, label="Environment")
            indoor_outdoor_out = gr.Textbox(lines=1, label="Indoor/Outdoor")
            lighting_condition_out = gr.Textbox(
                lines=1, label="Lighting Condition"
            )
            human_out = gr.Textbox(lines=3, label="Human Detected")
            animals_out = gr.Textbox(lines=2, label="Animals Detected")
            objects_out = gr.Textbox(lines=2, label="Objects Detected")
            hazards_out = gr.Textbox(lines=2, label="Hazards Identified")

    # Output components in the same order as the tuple returned by
    # process_webcam_stream_async.
    analysis_outputs = [
        description_out,
        environment_out,
        indoor_outdoor_out,
        lighting_condition_out,
        human_out,
        animals_out,
        objects_out,
        hazards_out,
    ]

    # Stream frames from the webcam to the handler with stream_every=1.0.
    webcam_input.stream(
        fn=process_webcam_stream_async,
        inputs=[webcam_input],
        outputs=analysis_outputs,
        stream_every=1.0,
    )

# Launch the app only when this file is run as a script.
if __name__ == "__main__":
    demo.launch(ssr_mode=False)