File size: 3,217 Bytes
165189d
 
 
627d59b
 
165189d
27c0f8e
165189d
 
 
 
 
27c0f8e
165189d
27c0f8e
 
165189d
27c0f8e
 
165189d
 
 
 
27c0f8e
 
165189d
 
 
 
 
 
 
 
 
27c0f8e
 
 
 
165189d
 
 
 
 
 
 
 
 
 
27c0f8e
 
165189d
 
27c0f8e
 
 
627d59b
165189d
27c0f8e
 
 
 
 
165189d
27c0f8e
165189d
 
 
27c0f8e
165189d
 
 
627d59b
27c0f8e
165189d
27c0f8e
 
627d59b
 
 
 
 
 
165189d
27c0f8e
627d59b
165189d
 
27c0f8e
 
165189d
27c0f8e
cbcaac5
27c0f8e
 
 
 
 
 
 
 
 
 
 
165189d
 
27c0f8e
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
import os
import base64
import time
from PIL import Image
import io
import gradio as gr
from gradio_client import Client
from dotenv import load_dotenv
from rich.console import Console
from rich.table import Table
from rich import box

# Read settings from a .env file into the environment before the lookups below.
load_dotenv()

# Hugging Face Space to call and the endpoint name used on it.
HF_SPACE = "OppaAI/Robot_MCP_Server"
API_NAME = "/predict"

# Both values are None when the corresponding variable is not set.
ROBOT_ID = os.getenv("ROBOT_ID")
HF_TOKEN = os.getenv("HF_CV_ROBOT_TOKEN")

# Shared rich console used for terminal output in this module.
console = Console()


def pretty_print_response(resp: dict):
    """Print a vision result as a rich table and return its text fields.

    Args:
        resp: Response mapping. The keys read are ``robot_id``,
            ``file_size_bytes``, ``description``, ``human``, ``objects`` and
            ``environment``; any of them may be missing.

    Returns:
        A 4-tuple ``(description, human, objects_str, environment)``.
        ``description``, ``human`` and ``environment`` are the raw values from
        ``resp`` (``""`` when missing); ``objects_str`` is the comma-joined
        text form of ``objects``.
    """
    table = Table(
        title="😎 Robot Vision Result",
        title_style="bold cyan",
        title_justify="left",
        box=box.ROUNDED,
        show_lines=True,
        show_header=False,
        style="bold cyan"
    )

    # Normalise "objects" to display text: a missing or None value becomes an
    # empty string, sequence items are stringified so non-string entries do
    # not break the join, and any other value is shown via str().
    objects = resp.get("objects")
    if objects is None:
        objects_str = ""
    elif isinstance(objects, (list, tuple)):
        objects_str = ", ".join(str(item) for item in objects)
    else:
        objects_str = str(objects)

    table.add_column("Field", style="bold magenta")
    table.add_column("Value", style="white")

    # Row labels use real emoji; the previous literals were mis-decoded bytes.
    table.add_row("🤖 Robot ID", str(resp.get("robot_id", "N/A")))
    table.add_row("🏞️  Image Size", str(resp.get("file_size_bytes", "N/A")))
    table.add_row("📝 Description", str(resp.get("description", "N/A")))
    table.add_row("👥 Human", str(resp.get("human", "N/A")))
    table.add_row("📦 Objects", objects_str)
    table.add_row("🏛️  Environment", str(resp.get("environment", "N/A")))

    console.print(table)
    return resp.get("description", ""), resp.get("human", ""), objects_str, resp.get("environment", "")


# Lazily created connection to HF_SPACE, reused across streamed frames so a
# new client is not constructed for every frame.
_hf_client = None


def _get_hf_client():
    """Return the shared Client for HF_SPACE, creating it on first use."""
    global _hf_client
    if _hf_client is None:
        _hf_client = Client(HF_SPACE)
    return _hf_client


def _objects_to_text(objects):
    """Format the response's "objects" value as comma-separated text."""
    if not objects:
        return ""
    if isinstance(objects, (list, tuple)):
        return ", ".join(str(item) for item in objects)
    return str(objects)


def process_webcam_stream(image):
    """Send one webcam frame to the HF Space and return its text fields.

    Args:
        image: Frame passed in by the Gradio stream event, or ``None`` when no
            frame is available. It is handed to ``Image.fromarray``.

    Returns:
        A 4-tuple ``(description, human, objects, environment)`` for the four
        output textboxes. All four are empty strings when ``image`` is
        ``None`` or when the remote call fails.
    """
    if image is None:
        return "", "", "", ""

    # Encode the frame as a base64 JPEG. JPEG cannot store other modes such as
    # RGBA, so convert to RGB first.
    frame = Image.fromarray(image)
    if frame.mode != "RGB":
        frame = frame.convert("RGB")
    buffered = io.BytesIO()
    frame.save(buffered, format="JPEG")
    b64_img = base64.b64encode(buffered.getvalue()).decode("utf-8")

    # NOTE(review): the token travels inside the request payload — confirm the
    # server actually requires it there.
    payload = {
        "image_b64": b64_img,
        "robot_id": ROBOT_ID,
        "timestamp": time.time(),
        "hf_token": HF_TOKEN
    }

    try:
        # Client creation is inside the try so a connection failure is
        # reported like any other send error instead of escaping the handler.
        resp = _get_hf_client().predict(payload, api_name=API_NAME)
        pretty_print_response(resp)
        return (
            resp.get("description", ""),
            resp.get("human", ""),
            _objects_to_text(resp.get("objects")),
            resp.get("environment", "")
        )
    except Exception as e:
        # Best-effort stream handler: report the failure and clear the outputs.
        # The message is escaped so bracketed text in it is not parsed as rich
        # markup, which could raise inside this handler.
        from rich.markup import escape

        console.print(f"[bold red]Error sending to HF:[/bold red] {escape(str(e))}")
        return "", "", "", ""


# Build the UI: a streaming webcam input beside four read-only result fields.
with gr.Blocks() as demo:
    # Heading uses a real emoji; the previous literal was mis-decoded bytes.
    gr.Markdown("## 🎥 Robot Vision Webcam Stream")

    with gr.Row():
        # `sources` is passed in its documented list form.
        webcam_input = gr.Image(sources=["webcam"], streaming=True, label="Webcam Input")
        description_out = gr.Textbox(label="Description")
        human_out = gr.Textbox(label="Human")
        objects_out = gr.Textbox(label="Objects")
        environment_out = gr.Textbox(label="Environment")

    # Each streamed frame goes to process_webcam_stream; its 4-tuple result
    # fills the four textboxes in the order listed in `outputs`.
    webcam_input.stream(
        process_webcam_stream,
        inputs=[webcam_input],
        outputs=[description_out, human_out, objects_out, environment_out],
        stream_every=0.5
    )

if __name__ == "__main__":
    demo.launch()