File size: 4,937 Bytes
165189d
 
 
627d59b
165189d
1fb1e3b
5253b0d
1fb1e3b
b18ef1e
5253b0d
165189d
5253b0d
 
 
4decfa0
73ea45e
5253b0d
4decfa0
 
 
5253b0d
4decfa0
5253b0d
4decfa0
 
5253b0d
4decfa0
5253b0d
 
 
4decfa0
 
 
 
 
5253b0d
 
 
 
 
 
 
 
 
 
 
 
 
4decfa0
 
 
 
 
 
5253b0d
4decfa0
 
 
 
5253b0d
4decfa0
 
 
 
 
 
 
5253b0d
4decfa0
 
5253b0d
4decfa0
5253b0d
4decfa0
 
 
5253b0d
4decfa0
 
5253b0d
 
 
 
4decfa0
5253b0d
4decfa0
 
 
 
 
5253b0d
4decfa0
 
5253b0d
 
4decfa0
5253b0d
4decfa0
5253b0d
4decfa0
165189d
5253b0d
 
 
 
27c0f8e
1fb1e3b
70b499c
b458243
70b499c
5253b0d
 
b458243
27c0f8e
5253b0d
a3fed0c
 
73ea45e
a3fed0c
 
b18ef1e
5253b0d
 
 
 
 
27c0f8e
5253b0d
27c0f8e
17f5b16
27c0f8e
 
 
 
165189d
 
27c0f8e
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
import os
import base64
import time
import io
import gradio as gr
from fastmcp import Client
from fastmcp.client import StreamableHttpTransport
import asyncio
from dotenv import load_dotenv
import ast  # For safely evaluating Python literals returned from server

# -------------------------------
# Load environment variables
# -------------------------------
# Reads a local .env file (if present) into os.environ before any lookups.
load_dotenv()

ROBOT_ID = "Robot_MCP_Client"  # Local client identifier
# The HF token authorizes the remote VLM call; usage is billed to this account.
HF_TOKEN = os.environ.get("HF_TOKEN")
if not HF_TOKEN:
    # Keep the app importable/launchable without a token; the handler checks
    # for this placeholder and returns a readable error instead of crashing.
    print("Warning: HF_TOKEN not found. API calls may fail.")
    HF_TOKEN = "missing_token_placeholder"  # Placeholder to avoid crash

# MCP server info
MCP_SERVER_URL = "https://oppaai-robot-mcp-server.hf.space/gradio_api/mcp/"
SERVER_NAME = "Robot_MCP_Server"
# Fully-qualified tool name exposed by the server's Gradio MCP endpoint.
TOOL_NAME = "Robot_MCP_Server_robot_watch"

# -------------------------------
# Initialize MCP client globally
# -------------------------------
# One shared client; the per-call `async with` in the handler opens/closes
# the underlying HTTP session as needed.
HTTP_TRANSPORT = StreamableHttpTransport(url=MCP_SERVER_URL)
MCP_CLIENT = Client(transport=HTTP_TRANSPORT, name=SERVER_NAME)


async def process_webcam_stream_async(image):
    """
    Send a webcam frame to the MCP server and parse the vision result.

    Args:
        image (PIL.Image.Image or None): Frame captured from the webcam or
            an uploaded image. None means no frame is available yet.

    Returns:
        tuple: (description, human, objects_str, environment), all str.
            On any failure the first element carries an error message and
            the remaining elements are empty strings.
    """
    # No frame yet (stream not started) -> blank outputs, no server call.
    if image is None:
        return "", "", "", ""

    # Fail fast with a readable message instead of a rejected API call.
    if HF_TOKEN == "missing_token_placeholder":
        return "Error: HF_TOKEN not set locally.", "", "", ""

    # Encode the frame as Base64 JPEG, the format the server tool expects.
    buffered = io.BytesIO()
    image.save(buffered, format="JPEG")
    b64_img = base64.b64encode(buffered.getvalue()).decode("utf-8")

    # Field names must match the server tool's input schema exactly.
    payload = {
        "hf_token_input": HF_TOKEN,
        "robot_id_input": ROBOT_ID,
        "image_b64_input": b64_img
    }

    try:
        # The async context manages the client's underlying HTTP session.
        async with MCP_CLIENT:
            response = await MCP_CLIENT.call_tool(TOOL_NAME, payload)

            if response.is_error:
                # Pull the first text part when content is a non-empty list;
                # anything else falls back to a generic message. (The old
                # guard `content and isinstance(content, list)` masked truthy
                # non-list content the same way, just less clearly.)
                error_text = (
                    response.content[0].text
                    if isinstance(response.content, list) and response.content
                    else "Unknown error"
                )
                raise RuntimeError(f"MCP Tool Error: {error_text}")

            # The server returns a Python-literal string (single quotes), so
            # json.loads would fail; ast.literal_eval is safe for literals
            # (it never executes code, unlike eval).
            raw_text = response.content[0].text
            response_dict = ast.literal_eval(raw_text)

            # -------------------------------
            # Extract fields from response
            # -------------------------------
            vlm_result = response_dict.get("result", {})

            description_out = vlm_result.get("description", "")
            human_out = vlm_result.get("human", "")
            objects_list = vlm_result.get("objects", [])
            environment_out = vlm_result.get("environment", "")

            # Join the list into a display string; str() on each item guards
            # against non-string entries, which previously raised TypeError.
            if isinstance(objects_list, list):
                objects_str = ", ".join(str(obj) for obj in objects_list)
            else:
                objects_str = str(objects_list)

            return description_out, human_out, objects_str, environment_out

    except Exception as e:
        # Top-level boundary: surface the error in the UI and keep the
        # Gradio stream loop alive instead of crashing it.
        print(f"Error calling remote MCP API: {e}")
        import traceback
        traceback.print_exc()
        return f"Error: {e}", "", "", ""


# -------------------------------
# Gradio UI
# -------------------------------
# Layout: image input on the left, four read-only result fields on the right.
with gr.Blocks() as demo:
    gr.Markdown("## 🎥 Robot Vision Webcam Stream (using MCP Client)")
    gr.Markdown("""   
    ### 🔑 Hugging Face Token Required
    To use this application, you must set a valid **Hugging Face API Token** in your local environment variables: `HF_TOKEN`.
    **A write token is required** to upload images to the public dataset associated with this space.
    Resource usage for VLM inference will be tracked against your account.
    """)
    with gr.Row():
        # Webcam / upload image input
        webcam_input = gr.Image(
            label="Captured from Web-Cam",
            sources=["upload", "webcam"],
            type="pil"  # Handler expects a PIL image for JPEG re-encoding.
        )
        with gr.Column():
            # Output fields for MCP response
            description_out = gr.Textbox(label="Description", lines=5)
            human_out = gr.Textbox(label="Human", lines=3)
            objects_out = gr.Textbox(label="Objects", lines=2)
            environment_out = gr.Textbox(label="Environment", lines=3)

    # Stream webcam input to server every 0.5 seconds.
    # NOTE(review): each frame triggers a remote VLM call; 0.5 s is
    # aggressive for a paid inference endpoint — confirm rate is intended.
    webcam_input.stream(
        process_webcam_stream_async,
        inputs=[webcam_input],
        outputs=[description_out, human_out, objects_out, environment_out],
        stream_every=0.5
    )

# Launch the UI only when executed directly (not when imported).
if __name__ == "__main__":
    demo.launch()