File size: 3,923 Bytes
165189d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
import os
import cv2
import base64
import time
import requests
from io import BytesIO
from typing import Dict, Any

import gradio as gr
from dotenv import load_dotenv
from rich.console import Console
from rich.table import Table
from rich import box

# ------------------------------
# Environment
# ------------------------------
# Load variables from a .env file before the settings below are read.
load_dotenv()

# Identifier attached to every request; defaults to "robot_001" when unset.
ROBOT_ID = os.getenv("ROBOT_ID", "robot_001")
# Token included in the MCP payload; None when HF_CV_ROBOT_TOKEN is unset.
HF_TOKEN = os.getenv("HF_CV_ROBOT_TOKEN")
# Endpoint that receives captured frames. Replace with actual URL.
MCP_URL = os.getenv(
    "MCP_SERVER_URL",
    "http://localhost:7860/run_tool/robot_watch",
)

console = Console()

# ------------------------------
# Rich table helper
# ------------------------------
def format_response(resp: Dict[str, Any]) -> str:
    """Render an MCP vision response as a plain-text table for Gradio.

    Args:
        resp: Response mapping. The keys read are ``robot_id``,
            ``file_size_bytes``, ``description``, ``human``, ``objects`` and
            ``environment``; any missing key is shown as ``"N/A"`` (an absent
            ``objects`` key yields an empty cell).

    Returns:
        The table rendered by Rich at a width of 120 columns, as plain text
        without ANSI escape sequences.
    """
    # Local imports keep this helper self-contained.
    from io import StringIO
    from rich.console import Console

    objects_list = resp.get("objects", [])
    if isinstance(objects_list, list):
        # Items are stringified first so non-string entries (numbers, dicts)
        # do not make str.join raise TypeError.
        objects_str = ", ".join(str(item) for item in objects_list)
    else:
        objects_str = str(objects_list)

    table = Table(
        title="😎 Robot Vision Result",
        title_style="bold cyan",
        title_justify="left",
        box=box.ROUNDED,
        show_lines=True,
        show_header=False,
        style="bold cyan",
    )
    table.add_column("Field", style="bold magenta")
    table.add_column("Value", style="white")

    table.add_row("🤖 Robot ID", str(resp.get("robot_id", "N/A")))
    table.add_row("🏞️  Image Size", str(resp.get("file_size_bytes", "N/A")))
    table.add_row("📝 Description", str(resp.get("description", "N/A")))
    table.add_row("👥 Human", str(resp.get("human", "N/A")))
    table.add_row("📦 Objects", objects_str)
    table.add_row("🏛️  Environment", str(resp.get("environment", "N/A")))

    # Render into an in-memory buffer. Colour output is disabled because the
    # result is shown in a Gradio Textbox, which would display ANSI escape
    # sequences as literal characters instead of interpreting them.
    buffer = StringIO()
    plain_console = Console(
        file=buffer,
        force_terminal=False,
        color_system=None,
        width=120,
    )
    plain_console.print(table)
    return buffer.getvalue()


# ------------------------------
# Capture & call MCP tool
# ------------------------------
# (connect, read) timeouts in seconds for the MCP request. Without a timeout
# requests waits indefinitely, which would hang the UI callback if the server
# never answers.
_MCP_TIMEOUT = (5, 120)


def process_frame_stream() -> Dict[str, Any]:
    """Capture one camera frame, send it to the MCP server, and report the result.

    Returns:
        A dict with two keys:
        ``"result"`` - the formatted response table on success, otherwise a
        short error message;
        ``"image"`` - the captured frame converted to RGB on success,
        otherwise ``None``.
    """
    cap = cv2.VideoCapture(0)
    try:
        if not cap.isOpened():
            return {"result": "Camera not opened", "image": None}
        ret, frame = cap.read()
    finally:
        # Always hand the device back, even if the read raises.
        cap.release()

    if not ret:
        return {"result": "Failed to read frame", "image": None}

    # Encode image as JPEG + base64
    ok, jpeg = cv2.imencode(".jpg", frame)
    if not ok:
        return {"result": "Failed to encode frame", "image": None}

    b64_img = base64.b64encode(jpeg.tobytes()).decode("utf-8")

    # Payload for MCP server
    payload = {
        "image_b64": b64_img,
        "robot_id": ROBOT_ID,
        "hf_token": HF_TOKEN,
    }

    try:
        # The body is read in full by .json() below, so a streamed response
        # brings no benefit; a plain request with a timeout is used instead.
        response = requests.post(MCP_URL, json=payload, timeout=_MCP_TIMEOUT)
        response.raise_for_status()

        # MCP returns JSON
        resp_json = response.json()

        # Convert response into a table string
        table_str = format_response(resp_json)
    except Exception as e:
        # UI boundary: any failure (network, HTTP status, bad JSON, unexpected
        # response shape) is reported as text rather than raised.
        return {"result": f"Error calling MCP: {e}", "image": None}

    # OpenCV frames are BGR; convert to RGB for display in Gradio.
    img_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    return {"result": table_str, "image": img_rgb}


# ------------------------------
# Gradio Interface
# ------------------------------
def _capture_for_ui():
    """Adapt process_frame_stream() to the (text, image) pair the UI needs.

    process_frame_stream() returns a dict keyed by the strings "result" and
    "image". A Gradio event handler with two output components must return
    one value per component, in order, so the dict is unpacked here.
    """
    outcome = process_frame_stream()
    return outcome.get("result"), outcome.get("image")


with gr.Blocks(title="Robot Vision Stream") as app:
    with gr.Row():
        output_text = gr.Textbox(
            label="Result",
            lines=20,
            interactive=False,
            placeholder="MCP results will appear here",
        )
        output_image = gr.Image(label="Camera Frame", type="numpy")

    # Each click captures and analyzes a single frame.
    capture_button = gr.Button("Capture & Analyze")
    capture_button.click(fn=_capture_for_ui, outputs=[output_text, output_image])


# Launch the Gradio app only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    app.launch()