OppaAI commited on
Commit
165189d
Β·
verified Β·
1 Parent(s): 073c1ea

Create app.py

Browse files
Files changed (1) hide show
  1. app.py +123 -0
app.py ADDED
@@ -0,0 +1,123 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ import cv2
3
+ import base64
4
+ import time
5
+ import requests
6
+ from io import BytesIO
7
+ from typing import Dict, Any
8
+
9
+ import gradio as gr
10
+ from dotenv import load_dotenv
11
+ from rich.console import Console
12
+ from rich.table import Table
13
+ from rich import box
14
+
15
+ # ------------------------------
16
+ # Environment
17
+ # ------------------------------
18
+ load_dotenv()
19
+ ROBOT_ID = os.environ.get("ROBOT_ID", "robot_001")
20
+ HF_TOKEN = os.environ.get("HF_CV_ROBOT_TOKEN")
21
+ MCP_URL = os.environ.get("MCP_SERVER_URL", "http://localhost:7860/run_tool/robot_watch") # Replace with actual URL
22
+
23
+ console = Console()
24
+
25
+ # ------------------------------
26
+ # Rich table helper
27
+ # ------------------------------
28
+ def format_response(resp: Dict[str, Any]):
29
+ """Return a string for Gradio display with similar formatting to terminal rich table."""
30
+ objects_list = resp.get("objects", [])
31
+ objects_str = ", ".join(objects_list) if isinstance(objects_list, list) else str(objects_list)
32
+
33
+ table = Table(
34
+ title="😎 Robot Vision Result",
35
+ title_style="bold cyan",
36
+ title_justify="left",
37
+ box=box.ROUNDED,
38
+ show_lines=True,
39
+ show_header=False,
40
+ style="bold cyan"
41
+ )
42
+ table.add_column("Field", style="bold magenta")
43
+ table.add_column("Value", style="white")
44
+
45
+ table.add_row("πŸ€– Robot ID", str(resp.get("robot_id", "N/A")))
46
+ table.add_row("🏞️ Image Size", str(resp.get("file_size_bytes", "N/A")))
47
+ table.add_row("πŸ“ Description", str(resp.get("description", "N/A")))
48
+ table.add_row("πŸ‘₯ Human", str(resp.get("human", "N/A")))
49
+ table.add_row("πŸ“¦ Objects", objects_str)
50
+ table.add_row("πŸ›οΈ Environment", str(resp.get("environment", "N/A")))
51
+
52
+ # Render as string for Gradio display
53
+ from rich.console import Console
54
+ from io import StringIO
55
+
56
+ s = StringIO()
57
+ temp_console = Console(file=s, force_terminal=True, color_system="truecolor", width=120)
58
+ temp_console.print(table)
59
+ return s.getvalue()
60
+
61
+
62
+ # ------------------------------
63
+ # Capture & call MCP tool
64
+ # ------------------------------
65
+ def process_frame_stream() -> Dict[str, Any]:
66
+ """Capture frame, send to MCP server, and return dict for Gradio."""
67
+ cap = cv2.VideoCapture(0)
68
+ if not cap.isOpened():
69
+ return {"result": "Camera not opened", "image": None}
70
+
71
+ ret, frame = cap.read()
72
+ cap.release()
73
+
74
+ if not ret:
75
+ return {"result": "Failed to read frame", "image": None}
76
+
77
+ # Encode image as JPEG + base64
78
+ ok, jpeg = cv2.imencode(".jpg", frame)
79
+ if not ok:
80
+ return {"result": "Failed to encode frame", "image": None}
81
+
82
+ b64_img = base64.b64encode(jpeg.tobytes()).decode("utf-8")
83
+
84
+ # Payload for MCP server
85
+ payload = {
86
+ "image_b64": b64_img,
87
+ "robot_id": ROBOT_ID,
88
+ "hf_token": HF_TOKEN
89
+ }
90
+
91
+ try:
92
+ # Streamable POST request to MCP
93
+ response = requests.post(MCP_URL, json=payload, stream=True)
94
+ response.raise_for_status()
95
+
96
+ # MCP returns JSON
97
+ resp_json = response.json()
98
+
99
+ # Convert response into rich table string
100
+ table_str = format_response(resp_json)
101
+
102
+ # Decode frame for display in Gradio
103
+ img_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
104
+ return {"result": table_str, "image": img_rgb}
105
+
106
+ except Exception as e:
107
+ return {"result": f"Error calling MCP: {e}", "image": None}
108
+
109
+
110
+ # ------------------------------
111
+ # Gradio Interface
112
+ # ------------------------------
113
+ with gr.Blocks(title="Robot Vision Stream") as app:
114
+ with gr.Row():
115
+ output_text = gr.Textbox(label="Result", lines=20, interactive=False, placeholder="MCP results will appear here")
116
+ output_image = gr.Image(label="Camera Frame", type="numpy")
117
+
118
+ # Stream button triggers frame capture every 1 second
119
+ gr.Button("Capture & Analyze").click(fn=process_frame_stream, outputs=[output_text, output_image])
120
+
121
+
122
+ if __name__ == "__main__":
123
+ app.launch()