File size: 3,785 Bytes
165189d
 
 
627d59b
165189d
1fb1e3b
 
 
 
 
b18ef1e
165189d
b18ef1e
 
 
1fb1e3b
165189d
b18ef1e
 
 
1fb1e3b
 
 
 
 
165189d
 
1fb1e3b
 
 
 
 
 
 
 
27c0f8e
73ea45e
165189d
ef0cc41
27c0f8e
73ea45e
27c0f8e
165189d
1fb1e3b
165189d
b18ef1e
 
 
165189d
 
 
1fb1e3b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
165189d
1fb1e3b
73ea45e
 
165189d
27c0f8e
1fb1e3b
165189d
27c0f8e
a3fed0c
 
73ea45e
a3fed0c
 
b18ef1e
 
 
 
 
27c0f8e
1fb1e3b
27c0f8e
1fb1e3b
27c0f8e
 
 
 
165189d
 
27c0f8e
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
import os
import base64
import time
import io
import gradio as gr
# Replace gradio_client with fastmcp Client and transport
from fastmcp import Client
from fastmcp.client import StreamableHttpTransport 
# Import asyncio to manage async calls within the stream function
import asyncio
from dotenv import load_dotenv

# Load environment variables from a local .env file, if one is present.
load_dotenv()

# Identifier sent with every request; falls back to "unknown" when unset.
ROBOT_ID = os.environ.get("ROBOT_ID", "unknown")
# Token forwarded to the remote tool; read from HF_CV_ROBOT_TOKEN.
HF_TOKEN = os.environ.get("HF_CV_ROBOT_TOKEN")
if not HF_TOKEN:
    # Name the variable that is actually read so the warning is actionable.
    print("Warning: HF_CV_ROBOT_TOKEN not found. API calls may fail.")

# The MCP URL of your remote server
MCP_SERVER_URL = "https://oppaai-robot-mcp-server.hf.space/gradio_api/mcp/"
SERVER_NAME = "Robot_MCP_Server"
# The exact tool name that matches the server function:
TOOL_NAME = "Robot_MCP_Server_gradio_ui_with_base64_fields"


# Initialize the MCP client globally so every request reuses the same objects.
HTTP_TRANSPORT = StreamableHttpTransport(url=MCP_SERVER_URL)
MCP_CLIENT = Client(transport=HTTP_TRANSPORT, name=SERVER_NAME)


# This function needs to be an async function because client.call_tool is async
def _first_text(content):
    """Return the text of the first text-bearing content block, or ""."""
    if content is None:
        return ""
    # Tool results carry a list of content blocks; tolerate a single block too.
    blocks = content if isinstance(content, (list, tuple)) else [content]
    for block in blocks:
        text = getattr(block, "text", None)
        if text:
            return text
    return ""


def _format_objects(objects):
    """Render the "objects" field as a comma-separated string."""
    if isinstance(objects, (list, tuple)):
        # Items may not be strings (e.g. numbers or dicts), so stringify first.
        return ", ".join(map(str, objects))
    return str(objects)


async def process_webcam_stream_async(image):
    """Send one image to the remote MCP tool and return its analysis fields.

    Args:
        image: A PIL image (the Gradio component is created with
            ``type="pil"``), or ``None`` when no frame is available.

    Returns:
        A 4-tuple ``(description, human, objects, environment)``. All four
        are empty strings when ``image`` is ``None``. When the remote call or
        response parsing fails, the first element is ``"Error: <reason>"`` and
        the other three are empty strings.
    """
    import json  # function-scope import, as in the original block

    if image is None:
        return "", "", "", ""

    # JPEG cannot store alpha or palette data; normalise such images
    # (e.g. uploaded PNGs) so that save() does not raise.
    if image.mode not in ("RGB", "L"):
        image = image.convert("RGB")

    # Convert Image to base64
    buffered = io.BytesIO()
    image.save(buffered, format="JPEG")
    b64_img = base64.b64encode(buffered.getvalue()).decode("utf-8")

    # Prepare payload using the keys the server expects (from the working client)
    payload = {
        "hf_token_input": HF_TOKEN,
        "robot_id_input": ROBOT_ID,
        "image_b64_input": b64_img,
    }

    try:
        # Enter the global client's async context for the duration of the call.
        async with MCP_CLIENT:
            response = await MCP_CLIENT.call_tool(TOOL_NAME, payload)

        # response.content is a list of content blocks, not a single object.
        response_text = _first_text(response.content)

        if response.is_error:
            raise RuntimeError(f"MCP Tool Error: {response_text or 'Unknown error'}")
        if not response_text:
            raise ValueError("MCP tool returned no text content")

        # Parse the JSON string response from the server's output
        response_dict = json.loads(response_text)

        # An explicit null "result" is treated the same as a missing one.
        vlm_result = response_dict.get("result") or {}

        return (
            vlm_result.get("description", ""),
            vlm_result.get("human", ""),
            _format_objects(vlm_result.get("objects", [])),
            vlm_result.get("environment", ""),
        )

    except Exception as e:
        # UI boundary: report the failure in the first output box instead of
        # letting the handler raise.
        print(f"Error calling remote MCP API: {e}")
        return f"Error: {e}", "", "", ""


# UI layout: an image source on the left, four result text boxes on the right.
with gr.Blocks() as demo:
    gr.Markdown("## 🎥 Robot Vision Webcam Stream (using MCP Client)")

    with gr.Row():
        webcam_input = gr.Image(
            type="pil",
            sources=["upload", "webcam"],
            label="Captured from Web-Cam",
        )
        with gr.Column():
            description_out = gr.Textbox(label="Description")
            human_out = gr.Textbox(label="Human")
            objects_out = gr.Textbox(label="Objects")
            environment_out = gr.Textbox(label="Environment")

    # Wire the stream event to the async handler; its 4-tuple return value
    # maps positionally onto the four text boxes listed in `outputs`.
    webcam_input.stream(
        fn=process_webcam_stream_async,
        inputs=[webcam_input],
        outputs=[description_out, human_out, objects_out, environment_out],
        stream_every=0.5,
    )

# Start the Gradio server only when this file is run as a script.
if __name__ == "__main__":
    demo.launch()