File size: 3,927 Bytes
165189d
 
 
627d59b
165189d
1fb1e3b
 
 
b18ef1e
165189d
b18ef1e
 
 
c65f577
1cb393e
b18ef1e
 
17f5b16
 
 
b18ef1e
1fb1e3b
 
 
a1a55a9
165189d
 
1fb1e3b
 
 
 
 
 
 
27c0f8e
73ea45e
165189d
17f5b16
 
 
 
ef0cc41
27c0f8e
73ea45e
27c0f8e
165189d
17f5b16
165189d
b18ef1e
 
 
165189d
 
 
1fb1e3b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
165189d
1fb1e3b
73ea45e
 
165189d
27c0f8e
1fb1e3b
b458243
 
 
 
 
 
27c0f8e
a3fed0c
 
73ea45e
a3fed0c
 
b18ef1e
 
 
 
 
27c0f8e
 
17f5b16
27c0f8e
 
 
 
165189d
 
27c0f8e
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
import os
import base64
import time
import io
import gradio as gr
from fastmcp import Client
from fastmcp.client import StreamableHttpTransport 
import asyncio
from dotenv import load_dotenv

# Load environment variables (ensure .env is set up locally)
load_dotenv()

ROBOT_ID = "Robot_MCP_Client"
HF_TOKEN = os.environ.get("HF_TOKEN")
if not HF_TOKEN:
    print("Warning: HF_TOKEN not found. API calls may fail.")
    # Set a placeholder string to avoid the 'None is not of type string' error
    # The API call will fail later due to auth, but validation will pass.
    HF_TOKEN = "missing_token_placeholder" 

# The MCP URL of your remote server
MCP_SERVER_URL = "https://oppaai-robot-mcp-server.hf.space/gradio_api/mcp/"
SERVER_NAME = "Robot_MCP_Server"
TOOL_NAME = "Robot_MCP_Server_robot_watch" 


# Initialize the MCP client globally
HTTP_TRANSPORT = StreamableHttpTransport(url=MCP_SERVER_URL)
MCP_CLIENT = Client(transport=HTTP_TRANSPORT, name=SERVER_NAME)


async def process_webcam_stream_async(image):
    """Send webcam image to HF MCP Server using MCP protocol and get result"""
    if image is None:
        return "", "", "", ""

    # Check if a valid token is available before proceeding
    if HF_TOKEN == "missing_token_placeholder":
        return "Error: HF_TOKEN not set locally.", "", "", ""

    # Convert Image to base64
    buffered = io.BytesIO()
    image.save(buffered, format="JPEG")
    b64_img = base64.b64encode(buffered.getvalue()).decode("utf-8")

    # Prepare payload
    payload = {
        "hf_token_input": HF_TOKEN,
        "robot_id_input": ROBOT_ID,
        "image_b64_input": b64_img
    }

    try:
        async with MCP_CLIENT:
            response = await MCP_CLIENT.call_tool(TOOL_NAME, payload)
            
            if response.is_error:
                error_text = response.content.text if response.content else "Unknown error"
                raise Exception(f"MCP Tool Error: {error_text}")

            import json
            response_dict = json.loads(response.content.text)
            
            vlm_result = response_dict.get("result", {})
            
            description_out = vlm_result.get("description", "")
            human_out = vlm_result.get("human", "")
            objects_list = vlm_result.get("objects", [])
            environment_out = vlm_result.get("environment", "")

            objects_str = ", ".join(objects_list) if isinstance(objects_list, list) else str(objects_list)

            return (
                description_out,
                human_out,
                objects_str,
                environment_out
            )
            
    except Exception as e:
        print(f"Error calling remote MCP API: {e}") 
        return f"Error: {e}", "", "", ""


with gr.Blocks() as demo:
    gr.Markdown("## 🎥 Robot Vision Webcam Stream (using MCP Client)")
    gr.Markdown("""
    ### 🔑 Hugging Face Token Required
    To use this application, you must set a valid **Hugging Face API Token** in your local environment variables (`HF_TOKEN` or `HF_CV_ROBOT_TOKEN`).

    **A write token is required** to upload images to the public dataset associated with this space. The resource usage for VLM inference will be tracked against *your* account.
    """)
    with gr.Row():
        webcam_input = gr.Image(
            label="Captured from Web-Cam",
            sources=["upload", "webcam"],
            type="pil"
        )
        with gr.Column():
            description_out = gr.Textbox(label="Description")
            human_out = gr.Textbox(label="Human")
            objects_out = gr.Textbox(label="Objects")
            environment_out = gr.Textbox(label="Environment")

    webcam_input.stream(
        process_webcam_stream_async,
        inputs=[webcam_input],
        outputs=[description_out, human_out, objects_out, environment_out],
        stream_every=0.5
    )

if __name__ == "__main__":
    demo.launch()