# Source: Hugging Face Space file "app.py" by OppaAI -- commit dfddc79 ("Update app.py", verified), 3.79 kB.
import os
import base64
import time
import io
import gradio as gr
from fastmcp import Client
from fastmcp.client import StreamableHttpTransport
import asyncio
from dotenv import load_dotenv
# --- Static identifiers -----------------------------------------------------
# Name this client reports for itself; sent as "robot_id_input" on each call.
ROBOT_ID = "Robot_MCP_Client"

# Endpoint of the remote MCP server and the tool exposed there.
MCP_SERVER_URL = "https://oppaai-robot-mcp-server.hf.space/gradio_api/mcp/"
SERVER_NAME = "Robot_MCP_Server"
TOOL_NAME = "Robot_MCP_Server_robot_watch"

# --- Credentials ------------------------------------------------------------
# Read a local .env file (if any) first so HF_TOKEN can be picked up from it.
load_dotenv()

_env_token = os.getenv("HF_TOKEN")
if _env_token:
    HF_TOKEN = _env_token
else:
    # Keep the module importable without a token; the request handler checks
    # for this placeholder and reports the problem instead of calling out.
    print("Warning: HF_TOKEN not found. API calls may fail.")
    HF_TOKEN = "missing_token_placeholder"

# --- Shared MCP client ------------------------------------------------------
# One transport/client pair is created at import time and reused per request.
HTTP_TRANSPORT = StreamableHttpTransport(url=MCP_SERVER_URL)
MCP_CLIENT = Client(transport=HTTP_TRANSPORT, name=SERVER_NAME)
async def process_webcam_stream_async(image):
    """Send one frame to the remote MCP tool and return its result.

    The frame is JPEG-encoded, base64-wrapped and passed to ``TOOL_NAME``
    together with ``HF_TOKEN`` and ``ROBOT_ID``. The first content item of
    the tool response is parsed as JSON and its ``"result"`` entry is used.

    Args:
        image: PIL image to analyse, or ``None`` when no frame is available.

    Returns:
        A 4-tuple, one value per output text box (description, human,
        objects, environment). On success every slot holds the same
        ``"result"`` value; when there is no frame all slots are empty
        strings; on failure the first slot holds an ``"Error: ..."``
        message and the rest are empty strings.
    """
    # Function-scope import: ``json`` is not among the file-level imports.
    import json

    if image is None:
        return "", "", "", ""
    if HF_TOKEN == "missing_token_placeholder":
        return "Error: HF_TOKEN not set locally.", "", "", ""

    # JPEG cannot store alpha or palette data, and Pillow raises OSError when
    # asked to write such modes (e.g. RGBA, P), so normalise those to RGB.
    if image.mode not in ("L", "RGB", "CMYK"):
        image = image.convert("RGB")

    buffered = io.BytesIO()
    image.save(buffered, format="JPEG")
    b64_img = base64.b64encode(buffered.getvalue()).decode("utf-8")

    payload = {
        "hf_token_input": HF_TOKEN,
        "robot_id_input": ROBOT_ID,
        "image_b64_input": b64_img,
    }

    try:
        async with MCP_CLIENT:
            response = await MCP_CLIENT.call_tool(TOOL_NAME, payload)

        content = response.content
        has_items = bool(content) and isinstance(content, list)

        if response.is_error:
            # The first content item carries the error text, when present.
            error_text = content[0].text if has_items else "Unknown error"
            raise RuntimeError(f"MCP Tool Error: {error_text}")

        # The first content item is expected to carry the raw JSON string.
        raw_text = getattr(content[0], "text", None) if has_items else None
        if raw_text is None:
            raise RuntimeError("MCP tool returned no text content")

        response_dict = json.loads(raw_text)
        if not isinstance(response_dict, dict):
            raise RuntimeError("MCP tool response is not a JSON object")

        vlm_result = response_dict.get("result", {})
    except Exception as e:
        # UI boundary: report any failure in the first text box rather than
        # letting the exception escape from the streaming callback.
        print(f"Error calling remote MCP API: {e}")
        return f"Error: {e}", "", "", ""

    # NOTE(review): all four outputs receive the same "result" value; splitting
    # it into per-field strings is not implemented -- confirm the server schema
    # before doing so.
    return vlm_result, vlm_result, vlm_result, vlm_result
# Markdown shown below the page title, explaining the token requirement.
_TOKEN_NOTICE_MD = """
### 🔑 Hugging Face Token Required
To use this application, you must set a valid **Hugging Face API Token** in your local environment variables: `HF_TOKEN`.
**A write token is required** to upload images to the public dataset associated with this space. The resource usage for VLM inference will be tracked against *your* account.
"""

# NOTE(review): the nesting below was reconstructed from syntax because the
# source had lost its indentation -- confirm the Column belongs inside the Row.
with gr.Blocks() as demo:
    gr.Markdown("## 🎥 Robot Vision Webcam Stream (using MCP Client)")
    gr.Markdown(_TOKEN_NOTICE_MD)

    with gr.Row():
        # Input side: accepts an uploaded picture or a webcam capture and
        # hands it to the callback as a PIL image.
        webcam_input = gr.Image(
            type="pil",
            sources=["upload", "webcam"],
            label="Captured from Web-Cam",
        )

        # Output side: one text box per value returned by the callback.
        with gr.Column():
            description_out = gr.Textbox(label="Description")
            human_out = gr.Textbox(label="Human")
            objects_out = gr.Textbox(label="Objects")
            environment_out = gr.Textbox(label="Environment")

    # Order must match the 4-tuple returned by process_webcam_stream_async.
    _result_boxes = [description_out, human_out, objects_out, environment_out]

    # Invoke the callback on streamed frames, at most once every 0.5 s.
    webcam_input.stream(
        process_webcam_stream_async,
        inputs=[webcam_input],
        outputs=_result_boxes,
        stream_every=0.5,
    )

# Start the app only when this file is executed directly.
if __name__ == "__main__":
    demo.launch()