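# app.py -- text captured from the Hugging Face file viewer (page links: raw, history, blame).
# Latest commit: "Update app.py" (5253b0d, verified); avatar shown: OppaAI; file size: 4.94 kB.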
import os
import base64
import time
import io
import gradio as gr
from fastmcp import Client
from fastmcp.client import StreamableHttpTransport
import asyncio
from dotenv import load_dotenv
import ast # For safely evaluating Python literals returned from server
# -------------------------------
# Environment and client identity
# -------------------------------
# Pull variables from a local .env file (if one exists) into the process
# environment before anything below reads them.
load_dotenv()

# Identifier this client sends along with each request.
ROBOT_ID = "Robot_MCP_Client"

# Hugging Face token forwarded to the server. When it is absent we keep a
# sentinel string instead of None so the module still imports; the request
# handler checks for this sentinel and reports the problem to the user.
HF_TOKEN = os.getenv("HF_TOKEN")
if not HF_TOKEN:
    print("Warning: HF_TOKEN not found. API calls may fail.")
    HF_TOKEN = "missing_token_placeholder"

# -------------------------------
# Remote MCP server coordinates
# -------------------------------
MCP_SERVER_URL = "https://oppaai-robot-mcp-server.hf.space/gradio_api/mcp/"
SERVER_NAME = "Robot_MCP_Server"
TOOL_NAME = "Robot_MCP_Server_robot_watch"

# -------------------------------
# Shared MCP client (module-wide)
# -------------------------------
# Built once at import time and reused by every request handler call.
HTTP_TRANSPORT = StreamableHttpTransport(url=MCP_SERVER_URL)
MCP_CLIENT = Client(transport=HTTP_TRANSPORT, name=SERVER_NAME)
def _encode_image_as_jpeg_b64(image):
    """Serialize a PIL image to JPEG and return it as a Base64 string.

    Args:
        image (PIL.Image): Image to encode.

    Returns:
        str: Base64 text (UTF-8 decoded) of the JPEG bytes.

    Raises:
        OSError: If the image cannot be written as JPEG even after being
            converted to RGB.
    """
    buffered = io.BytesIO()
    try:
        image.save(buffered, format="JPEG")
    except OSError:
        # JPEG has no alpha/palette support, so Pillow refuses modes such as
        # RGBA, LA or P (typical for uploaded PNG/GIF files). Retry once on
        # an RGB copy, using a fresh buffer in case anything was written.
        buffered = io.BytesIO()
        image.convert("RGB").save(buffered, format="JPEG")
    return base64.b64encode(buffered.getvalue()).decode("utf-8")


def _parse_tool_text(raw_text):
    """Decode the text returned by the MCP tool into a Python object.

    The server may answer with a Python-literal string (single quotes,
    ``True``/``None``) or with JSON (``true``/``null``). The Python-literal
    form is tried first, then JSON.

    Args:
        raw_text (str): Text content of the tool response.

    Returns:
        object: The decoded value (expected by the caller to be a dict).

    Raises:
        ValueError: If the text is neither a Python literal nor valid JSON
            (``json.JSONDecodeError`` is a ``ValueError`` subclass).
    """
    # Local import: the file's top-level import block does not provide json.
    import json

    try:
        return ast.literal_eval(raw_text)
    except (ValueError, SyntaxError):
        return json.loads(raw_text)


async def process_webcam_stream_async(image):
    """
    Send webcam image to MCP server and process the response.

    Every failure after the two initial guard checks (image encoding, the
    remote call, decoding the reply) is logged to stdout with a traceback and
    reported through the first element of the returned tuple, so the UI
    always receives four strings.

    Args:
        image (PIL.Image or None): Image captured from webcam or uploaded.

    Returns:
        tuple: (description, human, objects_str, environment)
            description (str): Description of scene, or "Error: ..." on failure.
            human (str): Human-related information.
            objects_str (str): Comma-separated list of objects.
            environment (str): Environment description.
    """
    if image is None:
        return "", "", "", ""
    if HF_TOKEN == "missing_token_placeholder":
        return "Error: HF_TOKEN not set locally.", "", "", ""

    try:
        # Prepare payload according to server's expected fields.
        # Encoding happens inside the try so a bad image yields an error
        # tuple instead of an exception escaping to the UI framework.
        payload = {
            "hf_token_input": HF_TOKEN,
            "robot_id_input": ROBOT_ID,
            "image_b64_input": _encode_image_as_jpeg_b64(image),
        }

        # NOTE(review): MCP_CLIENT is shared module-wide; confirm that
        # overlapping stream ticks may safely re-enter this context.
        async with MCP_CLIENT:
            response = await MCP_CLIENT.call_tool(TOOL_NAME, payload)

        content = response.content
        has_content = bool(content) and isinstance(content, list)

        if response.is_error:
            error_text = content[0].text if has_content else "Unknown error"
            raise Exception(f"MCP Tool Error: {error_text}")
        if not has_content:
            raise ValueError("MCP tool returned no content")

        response_dict = _parse_tool_text(content[0].text)
        if not isinstance(response_dict, dict):
            raise ValueError(
                f"Unexpected response type: {type(response_dict).__name__}"
            )

        # -------------------------------
        # Extract fields from response
        # -------------------------------
        vlm_result = response_dict.get("result", {})
        if not isinstance(vlm_result, dict):
            raise ValueError(
                f"Unexpected 'result' type: {type(vlm_result).__name__}"
            )

        description_out = vlm_result.get("description", "")
        human_out = vlm_result.get("human", "")
        objects_list = vlm_result.get("objects", [])
        environment_out = vlm_result.get("environment", "")

        # Convert objects list to a comma-separated string for display;
        # str() each item so non-string entries cannot break the join.
        if isinstance(objects_list, list):
            objects_str = ", ".join(map(str, objects_list))
        else:
            objects_str = str(objects_list)

        return description_out, human_out, objects_str, environment_out
    except Exception as e:
        # Top-level boundary for the UI callback: log, then report in-band.
        print(f"Error calling remote MCP API: {e}")
        import traceback
        traceback.print_exc()
        return f"Error: {e}", "", "", ""
# -------------------------------
# Gradio UI
# -------------------------------
# Layout: a title and a token notice on top, then one row holding the image
# input on the left and a column of four read-only result boxes on the right.
with gr.Blocks() as demo:
    # The emoji below were mis-encoded (UTF-8 bytes shown as "πŸŽ₯" / "πŸ”‘");
    # they are restored to the intended characters.
    gr.Markdown("## 🎥 Robot Vision Webcam Stream (using MCP Client)")
    # Markdown body is kept flush-left: leading spaces inside the string
    # would be part of the rendered text.
    gr.Markdown("""
### 🔑 Hugging Face Token Required
To use this application, you must set a valid **Hugging Face API Token** in your local environment variables: `HF_TOKEN`.
**A write token is required** to upload images to the public dataset associated with this space.
Resource usage for VLM inference will be tracked against your account.
""")
    with gr.Row():
        # Webcam / upload image input, delivered to the handler as a PIL image
        webcam_input = gr.Image(
            label="Captured from Web-Cam",
            sources=["upload", "webcam"],
            type="pil"
        )
        with gr.Column():
            # Output fields for MCP response
            description_out = gr.Textbox(label="Description", lines=5)
            human_out = gr.Textbox(label="Human", lines=3)
            objects_out = gr.Textbox(label="Objects", lines=2)
            environment_out = gr.Textbox(label="Environment", lines=3)
    # Stream webcam input to server every 0.5 seconds; the handler's four
    # return values map onto the four textboxes in order.
    webcam_input.stream(
        process_webcam_stream_async,
        inputs=[webcam_input],
        outputs=[description_out, human_out, objects_out, environment_out],
        stream_every=0.5
    )

# Start the local Gradio server only when run as a script.
if __name__ == "__main__":
    demo.launch()