File size: 2,003 Bytes
165189d
 
 
627d59b
165189d
27c0f8e
165189d
27c0f8e
 
165189d
27c0f8e
 
165189d
 
27c0f8e
 
 
73ea45e
165189d
ef0cc41
27c0f8e
73ea45e
27c0f8e
165189d
27c0f8e
165189d
 
 
27c0f8e
165189d
 
 
0cefe4b
27c0f8e
165189d
27c0f8e
73ea45e
 
0cefe4b
627d59b
 
 
73ea45e
627d59b
 
165189d
73ea45e
 
165189d
27c0f8e
 
165189d
27c0f8e
a3fed0c
 
73ea45e
a3fed0c
 
27c0f8e
 
 
 
 
 
 
 
 
 
 
165189d
 
27c0f8e
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
import os
import base64
import time
import io
import gradio as gr
from gradio_client import Client

# Deployment-specific values are read from the environment; either one is
# None when its variable is not set.
ROBOT_ID = os.getenv("ROBOT_ID")
HF_TOKEN = os.getenv("HF_CV_ROBOT_TOKEN")

# Fixed target: the Hugging Face Space to call and the endpoint name on it.
HF_SPACE = "OppaAI/Robot_MCP_Server"
API_NAME = "/predict"


# Connected clients keyed by Space id. The stream handler fires for every
# frame, so the connection is set up once and reused rather than rebuilt on
# each call.
_CLIENT_CACHE = {}


def _get_client():
    """Return the cached Client for HF_SPACE, connecting on first use."""
    client = _CLIENT_CACHE.get(HF_SPACE)
    if client is None:
        # A concurrent first call may build a second client; the later one
        # simply replaces the earlier entry.
        client = Client(HF_SPACE)
        _CLIENT_CACHE[HF_SPACE] = client
    return client


def _encode_jpeg_b64(image):
    """Return *image* encoded as JPEG and then base64, as a UTF-8 str."""
    # JPEG cannot store alpha/palette modes (e.g. RGBA, P); normalise those
    # to RGB so save() does not raise on such inputs.
    if image.mode not in ("RGB", "L"):
        image = image.convert("RGB")
    buffered = io.BytesIO()
    image.save(buffered, format="JPEG")
    return base64.b64encode(buffered.getvalue()).decode("utf-8")


def _format_objects(objects):
    """Render the "objects" field of a response as a comma-separated str."""
    if isinstance(objects, (list, tuple)):
        # Entries are stringified so non-str items do not break the join.
        return ", ".join(str(item) for item in objects)
    return str(objects)


def process_webcam_stream(image):
    """Send one webcam frame to the HF Space and return its analysis.

    Args:
        image: The frame to analyse (a PIL image, given how it is saved
            below), or None when no frame is available.

    Returns:
        A 4-tuple of strings ``(description, human, objects, environment)``.
        All four are empty when *image* is None. If the remote call fails or
        returns something other than a dict, the first element is an
        ``"Error: ..."`` message and the remaining three are empty.
    """
    if image is None:
        return "", "", "", ""

    # NOTE(review): the token travels inside the request payload; presumably
    # the server checks it there - confirm before changing this.
    payload = {
        "image_b64": _encode_jpeg_b64(image),
        "robot_id": ROBOT_ID,
        "timestamp": time.time(),
        "hf_token": HF_TOKEN,
    }

    try:
        # Connecting is inside the try so a failure to reach the Space is
        # reported the same way as a failed prediction.
        resp = _get_client().predict(payload, api_name=API_NAME)
    except Exception as e:
        # Drop the cached client so the next frame reconnects from scratch.
        _CLIENT_CACHE.pop(HF_SPACE, None)
        return f"Error: {e}", "", "", ""

    if not isinstance(resp, dict):
        return (
            f"Error: unexpected response type {type(resp).__name__}",
            "",
            "",
            "",
        )

    return (
        resp.get("description", ""),
        resp.get("human", ""),
        _format_objects(resp.get("objects", [])),
        resp.get("environment", ""),
    )


# Assemble the page: a heading, then one row holding the image input and the
# four text fields that display the handler's results.
with gr.Blocks() as demo:
    gr.Markdown("## 🎥 Robot Vision Webcam Stream")

    with gr.Row():
        # Frames arrive as PIL images, either uploaded or taken from the webcam.
        webcam_input = gr.Image(
            type="pil",
            sources=["upload", "webcam"],
            label="Captured from Web-Cam",
        )
        # One box per element of the tuple returned by process_webcam_stream,
        # declared in the same order as that tuple.
        description_out = gr.Textbox(label="Description")
        human_out = gr.Textbox(label="Human")
        objects_out = gr.Textbox(label="Objects")
        environment_out = gr.Textbox(label="Environment")

    # Wire streamed frames to the handler and fan its results out to the boxes.
    # NOTE(review): stream_every=0.5 is presumably the interval in seconds
    # between frames - confirm against the Gradio version in use.
    webcam_input.stream(
        fn=process_webcam_stream,
        inputs=[webcam_input],
        outputs=[description_out, human_out, objects_out, environment_out],
        stream_every=0.5,
    )

# Start the server only when this file is run as a script.
if __name__ == "__main__":
    demo.launch()