File size: 6,086 Bytes
7c6b50b
 
 
 
 
 
 
 
 
 
 
 
 
5895c26
7c6b50b
 
 
 
 
352e9a7
aa5a1be
 
7c6b50b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
197ea96
 
 
 
 
 
 
 
7c6b50b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
import base64
import io
import gradio as gr
from fastmcp import Client
from fastmcp.client import StreamableHttpTransport
import asyncio
import ast
import json
import os

# -------------------------------
# MCP server info
# -------------------------------
# Endpoint of the remote MCP server and the tool invoked on it.
MCP_SERVER_URL = "https://mcp-1st-birthday-cv-mcp-server.hf.space/gradio_api/mcp/"
SERVER_NAME = "CV_MCP_Server"
TOOL_NAME = "CV_MCP_Server_robot_watch"

# Identifier sent with every tool call as "robot_id_input".
ROBOT_ID = "CV_MCP_Client"

# Hugging Face token taken from the environment. When it is unset or empty,
# warn once at import time and fall back to a sentinel string that the
# request handler checks before contacting the server.
HF_TOKEN = os.getenv("HF_TOKEN")
if HF_TOKEN is None or HF_TOKEN == "":
    print("Warning: HF_TOKEN not found. API calls may fail.")
    HF_TOKEN = "missing_token_placeholder"

# -------------------------------
# Initialize MCP client globally
# -------------------------------
# One transport/client pair is created at import time and shared by every
# streamed frame; the handler opens it with ``async with`` per request.
HTTP_TRANSPORT = StreamableHttpTransport(
    url=MCP_SERVER_URL,
)
MCP_CLIENT = Client(
    name=SERVER_NAME,
    transport=HTTP_TRANSPORT,
)

# -------------------------------
# Async function using user's HF token
# -------------------------------
def _as_outputs(message=""):
    """Return the 8-field UI tuple with *message* in the description slot."""
    return (message, "", "", "", "", "", "", "")


def _encode_image_b64(image):
    """Encode a PIL image as a Base64 JPEG string."""
    # JPEG cannot store alpha or palette modes (e.g. RGBA, P), so normalise
    # to RGB first instead of letting ``save`` raise.
    if getattr(image, "mode", "RGB") != "RGB":
        image = image.convert("RGB")
    buffered = io.BytesIO()
    image.save(buffered, format="JPEG")
    return base64.b64encode(buffered.getvalue()).decode("utf-8")


def _content_text(response, separator=""):
    """Join the ``text`` of each item in ``response.content``.

    Returns None when the response has no list-valued ``content``.
    """
    content = getattr(response, "content", None)
    if not isinstance(content, list):
        return None
    return separator.join(getattr(item, "text", "") or "" for item in content)


def _parse_tool_output(raw_text):
    """Parse tool output as JSON, then as a Python literal; None unless a dict."""
    try:
        parsed = json.loads(raw_text)
    except json.JSONDecodeError:
        try:
            parsed = ast.literal_eval(raw_text)
        except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
            return None
    return parsed if isinstance(parsed, dict) else None


async def process_webcam_stream_async(image):
    """Send one webcam frame to the MCP vision tool and unpack its answer.

    Args:
        image: The captured frame as a PIL image, or None when no frame is
            available.

    Returns:
        A tuple of eight values, in UI order: description, environment,
        indoor/outdoor, lighting condition, human, animals, objects (as a
        comma-separated string) and hazards. On any failure the first
        element carries an error message and the rest are empty strings;
        the function does not raise.
    """
    if image is None:
        return _as_outputs()

    if HF_TOKEN == "missing_token_placeholder":
        return _as_outputs("Error: HF_TOKEN not set locally.")

    try:
        # Encoding happens inside the guarded region so that a bad frame is
        # reported in the UI like any other failure.
        payload = {
            "hf_token_input": HF_TOKEN,
            "robot_id_input": ROBOT_ID,
            "image_b64_input": _encode_image_b64(image),
        }

        async with MCP_CLIENT:
            response = await MCP_CLIENT.call_tool(TOOL_NAME, payload)

        if response.is_error:
            error_msg = _content_text(response, " ") or "Unknown Error"
            raise RuntimeError(f"MCP Tool Error: {error_msg}")

        # ``content`` is a list of content objects; concatenate their text
        # parts, or fall back to the response's string form.
        raw_text = _content_text(response)
        if raw_text is None:
            raw_text = str(response)

        response_dict = _parse_tool_output(raw_text)
        if response_dict is None:
            return _as_outputs(f"Parsing Error. Raw output: {raw_text}")

        vlm_result = response_dict.get("result", {})
        if not isinstance(vlm_result, dict):
            return _as_outputs(f"Parsing Error. Raw output: {raw_text}")

        objects = vlm_result.get("objects", [])
        if isinstance(objects, (list, tuple)):
            # Items may be non-strings (numbers, dicts); stringify each one.
            objects_str = ", ".join(str(item) for item in objects)
        else:
            objects_str = str(objects)

        return (
            vlm_result.get("description", ""),
            vlm_result.get("environment", ""),
            vlm_result.get("indoor_or_outdoor", ""),
            vlm_result.get("lighting_condition", ""),
            vlm_result.get("human", ""),
            vlm_result.get("animals", ""),
            objects_str,
            vlm_result.get("hazards", ""),
        )

    except Exception as e:
        # Top-level boundary for the stream callback: log the failure and
        # show it in the description box instead of raising into Gradio.
        print(f"Error calling MCP API: {e}")
        return _as_outputs(f"Error: {e}")


# -------------------------------
# Gradio UI
# -------------------------------
# Page layout: a heading and intro text, then a row holding the webcam
# widget on one side and a column of result text boxes on the other.
with gr.Blocks() as demo:
    gr.Markdown("## 🎥 Robot Vision Webcam Stream (MCP Client)")
    gr.Markdown(
        """
This interface captures a live webcam feed and sends each frame to the MCP Client for analysis.
The system extracts detailed information from the scene — including descriptions, detected objects,
humans, animals, environmental context, lighting conditions, and potential hazards.
Use this dashboard to observe how the robot interprets the world in real time.
        """
    )

    with gr.Row():
        # Webcam-only image input; frames reach the handler as PIL images.
        webcam_input = gr.Image(
            type="pil",
            sources=["webcam"],
            label="Captured from Web-Cam",
        )
        # One text box per field returned by the handler, in the same order.
        with gr.Column():
            description_out = gr.Textbox(lines=5, label="Description")
            environment_out = gr.Textbox(lines=3, label="Environment")
            indoor_outdoor_out = gr.Textbox(lines=1, label="Indoor/Outdoor")
            lighting_condition_out = gr.Textbox(lines=1, label="Lighting Condition")
            human_out = gr.Textbox(lines=3, label="Human Detected")
            animals_out = gr.Textbox(lines=2, label="Animals Detected")
            objects_out = gr.Textbox(lines=2, label="Objects Detected")
            hazards_out = gr.Textbox(lines=2, label="Hazards Identified")

    # Wire the stream event: the handler's 8-tuple maps onto the 8 text
    # boxes listed in ``outputs``, position by position.
    webcam_input.stream(
        fn=process_webcam_stream_async,
        inputs=[webcam_input],
        outputs=[
            description_out,
            environment_out,
            indoor_outdoor_out,
            lighting_condition_out,
            human_out,
            animals_out,
            objects_out,
            hazards_out,
        ],
        stream_every=1.0,
    )

# Start the Gradio app only when this file is executed directly, not when
# it is imported as a module.
if __name__ == "__main__":
    # NOTE(review): ssr_mode=False is forwarded to Gradio's launch();
    # presumably it turns off server-side rendering — confirm against the
    # installed Gradio version.
    demo.launch(ssr_mode=False)