OppaAI's picture
Update app.py
9cd6aba verified
raw
history blame
4.77 kB
import ast
import asyncio
import base64
import io
import json
import traceback

import gradio as gr
from fastmcp import Client
from fastmcp.client import StreamableHttpTransport
# -------------------------------
# MCP server info
# -------------------------------
# ROBOT_ID is sent as the "robot_id_input" field of every tool call payload.
ROBOT_ID = "Robot_MCP_Client" # Local client identifier
# Endpoint handed to the streamable-HTTP transport below.
MCP_SERVER_URL = "https://oppaai-robot-mcp-server.hf.space/gradio_api/mcp/"
# Passed as the `name=` argument of the Client below.
# NOTE(review): despite the "SERVER" in its name this labels the client object — confirm intent.
SERVER_NAME = "Robot_MCP_Server"
# Name of the remote tool invoked by process_webcam_stream_async via call_tool().
TOOL_NAME = "Robot_MCP_Server_robot_watch"
# -------------------------------
# Initialize MCP client globally
# -------------------------------
# Built once at import time; the same client object is entered with
# `async with` on every call of the stream handler.
HTTP_TRANSPORT = StreamableHttpTransport(url=MCP_SERVER_URL)
MCP_CLIENT = Client(transport=HTTP_TRANSPORT, name=SERVER_NAME)
# -------------------------------
# Async function using user's HF token
# -------------------------------
def _status_only(message: str) -> tuple[str, ...]:
    """Return the 8-field output tuple with *message* first and the rest blank."""
    return (message,) + ("",) * 7


def _encode_jpeg_b64(image) -> str:
    """Return the PIL *image* encoded as a Base64 JPEG string."""
    buffered = io.BytesIO()
    try:
        image.save(buffered, format="JPEG")
    except OSError:
        # Pillow refuses to write modes JPEG cannot store (e.g. RGBA or P from
        # an uploaded PNG); retry on an RGB copy using a fresh buffer.
        buffered = io.BytesIO()
        image.convert("RGB").save(buffered, format="JPEG")
    return base64.b64encode(buffered.getvalue()).decode("utf-8")


def _first_text(response) -> str | None:
    """Return the text of the first content block of *response*, or None."""
    content = response.content
    # Tool results normally carry a list of content blocks; a single block
    # object is tolerated as well so either shape works.
    if isinstance(content, (list, tuple)):
        content = content[0] if content else None
    return getattr(content, "text", None)


def _parse_tool_text(raw_text: str) -> dict:
    """Parse the tool's text reply into a dict.

    JSON is tried first; a Python-literal dict (what ``ast.literal_eval``
    accepts) is the fallback.

    Raises:
        ValueError: if the text parses to something other than a dict, or
            cannot be parsed as a Python literal.
        SyntaxError: if the text is neither JSON nor a Python literal.
    """
    try:
        parsed = json.loads(raw_text)
    except ValueError:
        parsed = ast.literal_eval(raw_text)
    if not isinstance(parsed, dict):
        raise ValueError(
            f"Unexpected MCP response type: {type(parsed).__name__}"
        )
    return parsed


def _as_display_text(value) -> str:
    """Render a list-like field as a comma-separated string for a Textbox."""
    if value is None:
        return ""
    if isinstance(value, (list, tuple)):
        # Items are stringified so non-string entries cannot break the join.
        return ", ".join(str(item) for item in value)
    return str(value)


async def process_webcam_stream_async(image, oauth_token: gr.OAuthToken | None):
    """Send a webcam frame to the MCP server and unpack the reply.

    Args:
        image: PIL image of the current frame, or None when no frame exists.
        oauth_token: OAuth token of the logged-in user, or None when the user
            has not logged in. Its ``token`` is forwarded as "hf_token_input".

    Returns:
        An 8-tuple in the order (description, environment, indoor/outdoor,
        lighting condition, human, animals, objects, hazards). When the user
        is not logged in or the call fails, the first field carries the
        message and the remaining seven are empty strings. When *image* is
        None all eight fields are empty strings.
    """
    if oauth_token is None:
        return _status_only("Please log in first.")
    if image is None:
        return _status_only("")

    # Payload with user token
    payload = {
        "hf_token_input": oauth_token.token,
        "robot_id_input": ROBOT_ID,
        "image_b64_input": _encode_jpeg_b64(image),
    }

    try:
        async with MCP_CLIENT:
            response = await MCP_CLIENT.call_tool(TOOL_NAME, payload)

        raw_text = _first_text(response)
        if response.is_error:
            raise RuntimeError(f"MCP Tool Error: {raw_text or 'Unknown error'}")
        if raw_text is None:
            raise RuntimeError("MCP tool returned no text content")

        # A missing or null "result" is treated as an empty result.
        vlm_result = _parse_tool_text(raw_text).get("result") or {}
        if not isinstance(vlm_result, dict):
            raise ValueError(
                f"Unexpected MCP result type: {type(vlm_result).__name__}"
            )

        return (
            vlm_result.get("description", ""),
            vlm_result.get("environment", ""),
            vlm_result.get("indoor_or_outdoor", ""),
            vlm_result.get("lighting_condition", ""),
            vlm_result.get("human", ""),
            _as_display_text(vlm_result.get("animals", [])),
            _as_display_text(vlm_result.get("objects", [])),
            _as_display_text(vlm_result.get("hazards", [])),
        )
    except Exception as e:
        # UI boundary: any failure is logged and reported in the first output
        # field rather than propagated to the caller.
        print(f"Error calling MCP API: {e}")
        traceback.print_exc()
        return _status_only(f"Error: {e}")
# -------------------------------
# Gradio UI
# -------------------------------
with gr.Blocks() as demo:
    # Login button for Hugging Face OAuth
    gr.LoginButton()
    gr.Markdown("## 🎥 Robot Vision Webcam Stream (MCP Client)")

    with gr.Row():
        # Frame source: either an uploaded picture or the live webcam,
        # delivered to the handler as a PIL image.
        webcam_input = gr.Image(
            label="Captured from Web-Cam",
            sources=["upload", "webcam"],
            type="pil",
        )
        with gr.Column():
            description_out = gr.Textbox(label="Description", lines=5)
            environment_out = gr.Textbox(label="Environment", lines=3)
            indoor_outdoor_out = gr.Textbox(label="Indoor/Outdoor", lines=1)
            lighting_condition_out = gr.Textbox(label="Lighting Condition", lines=1)
            human_out = gr.Textbox(label="Human Detected", lines=3)
            animals_out = gr.Textbox(label="Animals Detected", lines=2)
            objects_out = gr.Textbox(label="Objects Detected", lines=2)
            hazards_out = gr.Textbox(label="Hazards Identified", lines=2)

    # Order must match the tuple returned by process_webcam_stream_async.
    result_boxes = [
        description_out,
        environment_out,
        indoor_outdoor_out,
        lighting_condition_out,
        human_out,
        animals_out,
        objects_out,
        hazards_out,
    ]

    # The image is the only declared input; the handler's oauth_token
    # parameter is filled in by Gradio, not listed here.
    webcam_input.stream(
        fn=process_webcam_stream_async,
        inputs=[webcam_input],
        outputs=result_boxes,
        stream_every=1.0,
    )

if __name__ == "__main__":
    demo.launch()