OppaAI committed on
Commit
27c0f8e
·
verified ·
1 Parent(s): 9b0c24e

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +54 -71
app.py CHANGED
@@ -1,35 +1,27 @@
1
  import os
2
- import cv2
3
  import base64
4
  import time
5
- import requests
6
- from io import BytesIO
7
- from typing import Dict, Any
8
-
9
  import gradio as gr
 
10
  from dotenv import load_dotenv
11
  from rich.console import Console
12
  from rich.table import Table
13
  from rich import box
14
 
15
- # ------------------------------
16
- # Environment
17
- # ------------------------------
18
  load_dotenv()
19
- ROBOT_ID = os.environ.get("ROBOT_ID", "robot_001")
 
20
  HF_TOKEN = os.environ.get("HF_CV_ROBOT_TOKEN")
21
- MCP_URL = os.environ.get("MCP_SERVER_URL", "http://localhost:7860/run_tool/robot_watch") # Replace with actual URL
 
22
 
23
  console = Console()
24
 
25
- # ------------------------------
26
- # Rich table helper
27
- # ------------------------------
28
- def format_response(resp: Dict[str, Any]):
29
- """Return a string for Gradio display with similar formatting to terminal rich table."""
30
- objects_list = resp.get("objects", [])
31
- objects_str = ", ".join(objects_list) if isinstance(objects_list, list) else str(objects_list)
32
 
 
 
33
  table = Table(
34
  title="😎 Robot Vision Result",
35
  title_style="bold cyan",
@@ -39,6 +31,10 @@ def format_response(resp: Dict[str, Any]):
39
  show_header=False,
40
  style="bold cyan"
41
  )
 
 
 
 
42
  table.add_column("Field", style="bold magenta")
43
  table.add_column("Value", style="white")
44
 
@@ -49,75 +45,62 @@ def format_response(resp: Dict[str, Any]):
49
  table.add_row("📦 Objects", objects_str)
50
  table.add_row("🏛️ Environment", str(resp.get("environment", "N/A")))
51
 
52
- # Render as string for Gradio display
53
- from rich.console import Console
54
- from io import StringIO
55
-
56
- s = StringIO()
57
- temp_console = Console(file=s, force_terminal=True, color_system="truecolor", width=120)
58
- temp_console.print(table)
59
- return s.getvalue()
60
-
61
 
62
- # ------------------------------
63
- # Capture & call MCP tool
64
- # ------------------------------
65
- def process_frame_stream() -> Dict[str, Any]:
66
- """Capture frame, send to MCP server, and return dict for Gradio."""
67
- cap = cv2.VideoCapture(0)
68
- if not cap.isOpened():
69
- return {"result": "Camera not opened", "image": None}
70
 
71
- ret, frame = cap.read()
72
- cap.release()
 
 
73
 
74
- if not ret:
75
- return {"result": "Failed to read frame", "image": None}
 
 
 
 
 
76
 
77
- # Encode image as JPEG + base64
78
- ok, jpeg = cv2.imencode(".jpg", frame)
79
- if not ok:
80
- return {"result": "Failed to encode frame", "image": None}
81
-
82
- b64_img = base64.b64encode(jpeg.tobytes()).decode("utf-8")
83
-
84
- # Payload for MCP server
85
  payload = {
86
  "image_b64": b64_img,
87
  "robot_id": ROBOT_ID,
 
88
  "hf_token": HF_TOKEN
89
  }
90
 
 
 
91
  try:
92
- # Streamable POST request to MCP
93
- response = requests.post(MCP_URL, json=payload, stream=True)
94
- response.raise_for_status()
95
-
96
- # MCP returns JSON
97
- resp_json = response.json()
98
-
99
- # Convert response into rich table string
100
- table_str = format_response(resp_json)
101
-
102
- # Decode frame for display in Gradio
103
- img_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
104
- return {"result": table_str, "image": img_rgb}
105
 
 
 
106
  except Exception as e:
107
- return {"result": f"Error calling MCP: {e}", "image": None}
 
108
 
109
 
110
- # ------------------------------
111
- # Gradio Interface
112
- # ------------------------------
113
- with gr.Blocks(title="Robot Vision Stream") as app:
114
- with gr.Row():
115
- output_text = gr.Textbox(label="Result", lines=20, interactive=False, placeholder="MCP results will appear here")
116
- output_image = gr.Image(label="Camera Frame", type="numpy")
117
-
118
- # Stream button triggers frame capture every 1 second
119
- gr.Button("Capture & Analyze").click(fn=process_frame_stream, outputs=[output_text, output_image])
120
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
121
 
122
  if __name__ == "__main__":
123
- app.launch()
 
1
  import os
 
2
  import base64
3
  import time
4
+ import json
 
 
 
5
  import gradio as gr
6
+ from gradio_client import Client
7
  from dotenv import load_dotenv
8
  from rich.console import Console
9
  from rich.table import Table
10
  from rich import box
11
 
12
+ # Load environment variables
 
 
13
  load_dotenv()
14
+
15
+ ROBOT_ID = os.environ.get("ROBOT_ID")
16
  HF_TOKEN = os.environ.get("HF_CV_ROBOT_TOKEN")
17
+ HF_SPACE = "OppaAI/Robot_MCP_Server"
18
+ API_NAME = "/predict"
19
 
20
  console = Console()
21
 
 
 
 
 
 
 
 
22
 
23
+ def pretty_print_response(resp: dict):
24
+ """Rich table output with row lines, no URL."""
25
  table = Table(
26
  title="😎 Robot Vision Result",
27
  title_style="bold cyan",
 
31
  show_header=False,
32
  style="bold cyan"
33
  )
34
+
35
+ objects_list = resp.get("objects", [])
36
+ objects_str = ", ".join(objects_list) if isinstance(objects_list, list) else str(objects_list)
37
+
38
  table.add_column("Field", style="bold magenta")
39
  table.add_column("Value", style="white")
40
 
 
45
  table.add_row("📦 Objects", objects_str)
46
  table.add_row("🏛️ Environment", str(resp.get("environment", "N/A")))
47
 
48
+ console.print(table)
49
+ return resp.get("description", ""), resp.get("human", ""), objects_str, resp.get("environment", "")
 
 
 
 
 
 
 
50
 
 
 
 
 
 
 
 
 
51
 
52
+ def process_webcam_stream(image):
53
+ """Send webcam image to HF MCP Server and get result"""
54
+ if image is None:
55
+ return None, None, None, None
56
 
57
+ # Convert to base64
58
+ import io
59
+ from PIL import Image
60
+ buffered = io.BytesIO()
61
+ img = Image.fromarray(image)
62
+ img.save(buffered, format="JPEG")
63
+ b64_img = base64.b64encode(buffered.getvalue()).decode("utf-8")
64
 
65
+ # Prepare payload
 
 
 
 
 
 
 
66
  payload = {
67
  "image_b64": b64_img,
68
  "robot_id": ROBOT_ID,
69
+ "timestamp": time.time(),
70
  "hf_token": HF_TOKEN
71
  }
72
 
73
+ # Send to HF Space using streaming-friendly predict
74
+ client = Client(HF_SPACE)
75
  try:
76
+ resp = client.predict(payload, api_name=API_NAME)
77
+ # Print table in console
78
+ pretty_print_response(resp)
 
 
 
 
 
 
 
 
 
 
79
 
80
+ # Return selected fields for Gradio display
81
+ return resp.get("description", ""), resp.get("human", ""), ", ".join(resp.get("objects", [])), resp.get("environment", "")
82
  except Exception as e:
83
+ console.print(f"[bold red]Error sending to HF:[/bold red] {e}")
84
+ return None, None, None, None
85
 
86
 
87
+ with gr.Blocks() as demo:
88
+ gr.Markdown("## 🎥 Robot Vision Webcam Stream")
 
 
 
 
 
 
 
 
89
 
90
+ with gr.Row():
91
+ webcam_input = gr.Image(source="webcam", streaming=True, label="Webcam Input")
92
+ description_out = gr.Textbox(label="Description")
93
+ human_out = gr.Textbox(label="Human")
94
+ objects_out = gr.Textbox(label="Objects")
95
+ environment_out = gr.Textbox(label="Environment")
96
+
97
+ # Connect streaming
98
+ webcam_input.stream(
99
+ process_webcam_stream,
100
+ inputs=[webcam_input],
101
+ outputs=[description_out, human_out, objects_out, environment_out],
102
+ stream_every=0.5
103
+ )
104
 
105
  if __name__ == "__main__":
106
+ demo.launch()