# Source: Hugging Face Space by OppaAI — "Create app.py" (commit 165189d, 3.92 kB).
# NOTE: the original page chrome ("raw / history / blame") was extraction residue
# and has been converted into this comment so the file parses as Python.
# --- standard library ---
import base64
import os
import time
from io import BytesIO, StringIO
from typing import Any, Dict

# --- third-party ---
import cv2
import gradio as gr
import requests
from dotenv import load_dotenv
from rich import box
from rich.console import Console
from rich.table import Table
# ------------------------------
# Environment
# ------------------------------
# Load variables from a local .env file (if present) before reading them.
load_dotenv()
# Identifier sent along with every captured frame.
ROBOT_ID = os.environ.get("ROBOT_ID", "robot_001")
# Hugging Face token forwarded to the MCP tool; may be None if unset.
HF_TOKEN = os.environ.get("HF_CV_ROBOT_TOKEN")
# MCP tool endpoint; the localhost default is a placeholder — replace with the actual URL.
MCP_URL = os.environ.get("MCP_SERVER_URL", "http://localhost:7860/run_tool/robot_watch") # Replace with actual URL
# Terminal console — not referenced elsewhere in this view; presumably kept for ad-hoc logging.
console = Console()
# ------------------------------
# Rich table helper
# ------------------------------
def format_response(resp: Dict[str, Any]) -> str:
    """Render an MCP response dict as an ANSI-colored rich table string.

    The string (with terminal escape codes) is returned for display in a
    Gradio textbox, mirroring the look of a terminal rich table.

    Args:
        resp: JSON response from the MCP server. Recognized keys:
            ``robot_id``, ``file_size_bytes``, ``description``, ``human``,
            ``objects`` (list or scalar), ``environment``. Missing keys
            render as ``"N/A"``.

    Returns:
        The table rendered to a string with truecolor ANSI sequences.
    """
    objects_list = resp.get("objects", [])
    # "objects" is usually a list; fall back to str() for any other shape.
    objects_str = ", ".join(objects_list) if isinstance(objects_list, list) else str(objects_list)
    table = Table(
        title="😎 Robot Vision Result",
        title_style="bold cyan",
        title_justify="left",
        box=box.ROUNDED,
        show_lines=True,
        show_header=False,
        style="bold cyan",
    )
    table.add_column("Field", style="bold magenta")
    table.add_column("Value", style="white")
    table.add_row("🤖 Robot ID", str(resp.get("robot_id", "N/A")))
    table.add_row("🏞️ Image Size", str(resp.get("file_size_bytes", "N/A")))
    table.add_row("📝 Description", str(resp.get("description", "N/A")))
    table.add_row("👥 Human", str(resp.get("human", "N/A")))
    table.add_row("📦 Objects", objects_str)
    table.add_row("🏛️ Environment", str(resp.get("environment", "N/A")))
    # Render into an in-memory console so the ANSI output can be captured as
    # a string (force_terminal keeps colors even though the file is not a TTY).
    buffer = StringIO()
    temp_console = Console(file=buffer, force_terminal=True, color_system="truecolor", width=120)
    temp_console.print(table)
    return buffer.getvalue()
# ------------------------------
# Capture & call MCP tool
# ------------------------------
def process_frame_stream():
    """Capture one webcam frame, send it to the MCP server, and return results.

    Returns:
        tuple: ``(result_text, image)`` matching the Gradio
        ``outputs=[output_text, output_image]`` wiring — a rendered rich-table
        string (or an error message) and the captured frame as an RGB numpy
        array (``None`` on failure). The original returned a dict
        ``{"result": ..., "image": ...}``, which Gradio interprets as a
        component→value mapping and rejects for string keys; a tuple is the
        correct shape for a list of output components.
    """
    cap = cv2.VideoCapture(0)
    if not cap.isOpened():
        return "Camera not opened", None
    ret, frame = cap.read()
    cap.release()
    if not ret:
        return "Failed to read frame", None
    # Encode the frame as JPEG, then base64 so it can travel in a JSON payload.
    ok, jpeg = cv2.imencode(".jpg", frame)
    if not ok:
        return "Failed to encode frame", None
    b64_img = base64.b64encode(jpeg.tobytes()).decode("utf-8")
    payload = {
        "image_b64": b64_img,
        "robot_id": ROBOT_ID,
        "hf_token": HF_TOKEN,
    }
    try:
        # Plain JSON POST; a timeout keeps the UI from hanging forever when
        # the MCP server is unreachable. (stream=True was pointless here
        # since .json() consumes the whole body anyway.)
        response = requests.post(MCP_URL, json=payload, timeout=60)
        response.raise_for_status()
        resp_json = response.json()
        table_str = format_response(resp_json)
        # OpenCV captures BGR; Gradio's Image component expects RGB.
        img_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        return table_str, img_rgb
    except Exception as e:
        # Surface any network/HTTP/JSON error in the UI instead of crashing.
        return f"Error calling MCP: {e}", None
# ------------------------------
# Gradio Interface
# ------------------------------
with gr.Blocks(title="Robot Vision Stream") as app:
    with gr.Row():
        # Left: rendered MCP result table; right: the captured camera frame.
        output_text = gr.Textbox(label="Result", lines=20, interactive=False, placeholder="MCP results will appear here")
        output_image = gr.Image(label="Camera Frame", type="numpy")
    # Single-shot: each click captures one frame and runs one analysis
    # (this is a plain click handler — there is no periodic/streaming timer).
    gr.Button("Capture & Analyze").click(fn=process_frame_stream, outputs=[output_text, output_image])
if __name__ == "__main__":
    app.launch()