File size: 3,794 Bytes
165189d
 
 
627d59b
165189d
1fb1e3b
 
 
b18ef1e
165189d
b18ef1e
 
 
c65f577
1cb393e
b18ef1e
 
17f5b16
b18ef1e
1fb1e3b
 
 
a1a55a9
165189d
 
1fb1e3b
 
 
 
 
 
 
27c0f8e
73ea45e
165189d
17f5b16
 
 
27c0f8e
73ea45e
27c0f8e
165189d
 
b18ef1e
 
 
165189d
 
 
1fb1e3b
 
 
 
b98a5e1
 
1fb1e3b
 
 
b98a5e1
 
 
1fb1e3b
 
 
dfddc79
 
 
 
1fb1e3b
dfddc79
1fb1e3b
 
 
 
 
 
 
 
165189d
1fb1e3b
73ea45e
 
165189d
27c0f8e
1fb1e3b
70b499c
b458243
70b499c
b458243
 
27c0f8e
a3fed0c
 
73ea45e
a3fed0c
 
b18ef1e
 
 
 
 
27c0f8e
 
17f5b16
27c0f8e
 
 
 
165189d
 
27c0f8e
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
import os
import base64
import time
import io
import gradio as gr
from fastmcp import Client
from fastmcp.client import StreamableHttpTransport 
import asyncio
from dotenv import load_dotenv

# Pull settings from a local .env file (if present) into the environment.
load_dotenv()

# Identifier sent along with every frame submitted to the remote tool.
ROBOT_ID = "Robot_MCP_Client"

# Hugging Face token forwarded to the remote tool. An unset or empty value is
# swapped for a sentinel string so the request handler can detect it later.
HF_TOKEN = os.getenv("HF_TOKEN") or None
if HF_TOKEN is None:
    print("Warning: HF_TOKEN not found. API calls may fail.")
    HF_TOKEN = "missing_token_placeholder"

# Remote MCP endpoint, the client name, and the tool invoked on the server.
SERVER_NAME = "Robot_MCP_Server"
TOOL_NAME = "Robot_MCP_Server_robot_watch"
MCP_SERVER_URL = "https://oppaai-robot-mcp-server.hf.space/gradio_api/mcp/"


# A single module-wide client over streamable HTTP, built at import time.
HTTP_TRANSPORT = StreamableHttpTransport(url=MCP_SERVER_URL)
MCP_CLIENT = Client(name=SERVER_NAME, transport=HTTP_TRANSPORT)


async def process_webcam_stream_async(image):
    """Submit one frame to the remote MCP tool and relay what it returns.

    The frame is saved as JPEG, base64-encoded, and passed to ``TOOL_NAME``
    together with ``HF_TOKEN`` and ``ROBOT_ID``. The text of the first content
    item in the reply is parsed as JSON, and its ``"result"`` entry (``{}``
    when absent) is placed, unchanged, in every one of the four output slots.

    Args:
        image: Object exposing ``save(fp, format=...)`` (the UI supplies a
            PIL image), or ``None`` when there is no frame.

    Returns:
        A 4-tuple ``(description, human, objects, environment)``. With no
        frame all four entries are empty strings. When the token is the
        placeholder, or the call or parsing fails, the first entry carries
        an ``"Error: ..."`` message and the other three are empty strings.
    """
    import json

    # Nothing captured yet: clear every output field.
    if image is None:
        return "", "", "", ""

    # Refuse to call the server without a real token.
    if HF_TOKEN == "missing_token_placeholder":
        return "Error: HF_TOKEN not set locally.", "", "", ""

    # JPEG-encode the frame in memory and wrap it as base64 text.
    with io.BytesIO() as jpeg_buffer:
        image.save(jpeg_buffer, format="JPEG")
        jpeg_bytes = jpeg_buffer.getvalue()
    encoded_frame = base64.b64encode(jpeg_bytes).decode("utf-8")

    tool_args = {
        "hf_token_input": HF_TOKEN,
        "robot_id_input": ROBOT_ID,
        "image_b64_input": encoded_frame,
    }

    try:
        async with MCP_CLIENT:
            reply = await MCP_CLIENT.call_tool(TOOL_NAME, tool_args)

            if reply.is_error:
                # The error text, when available, is on the first content item.
                if reply.content and isinstance(reply.content, list):
                    error_text = reply.content[0].text
                else:
                    error_text = "Unknown error"
                raise Exception(f"MCP Tool Error: {error_text}")

            # The first content item carries the raw JSON string.
            parsed = json.loads(reply.content[0].text)
            vlm_result = parsed.get("result", {})

            # The same result object feeds all four output fields.
            return (vlm_result,) * 4

    except Exception as e:
        # Any failure (transport, tool error, bad JSON) is reported in the
        # first output field instead of propagating to the caller.
        print(f"Error calling remote MCP API: {e}")
        return f"Error: {e}", "", "", ""


# UI layout: an image source on the left, four read-only result fields on the
# right, with every streamed frame routed through the MCP request handler.
with gr.Blocks() as demo:
    gr.Markdown("## 🎥 Robot Vision Webcam Stream (using MCP Client)")
    gr.Markdown("""   
    ### 🔑 Hugging Face Token Required
    To use this application, you must set a valid **Hugging Face API Token** in your local environment variables: `HF_TOKEN`.
    **A write token is required** to upload images to the public dataset associated with this space. The resource usage for VLM inference will be tracked against *your* account.
    """)

    with gr.Row():
        # Frames arrive as PIL images from either an upload or the webcam.
        webcam_input = gr.Image(
            type="pil",
            sources=["upload", "webcam"],
            label="Captured from Web-Cam",
        )
        with gr.Column():
            # One textbox per output slot, created in display order.
            description_out, human_out, objects_out, environment_out = [
                gr.Textbox(label=caption)
                for caption in ("Description", "Human", "Objects", "Environment")
            ]

    # Invoke the handler on the image stream, at the 0.5 interval set below.
    webcam_input.stream(
        fn=process_webcam_stream_async,
        inputs=[webcam_input],
        outputs=[description_out, human_out, objects_out, environment_out],
        stream_every=0.5,
    )

# Start the app only when this file is run as a script.
if __name__ == "__main__":
    demo.launch()