"""Gradio front-end: capture one webcam frame, send it to an MCP vision
tool, and display the structured response as a rich-rendered table."""

import base64
import os
from io import StringIO
from typing import Any, Dict, Optional, Tuple

import cv2
import gradio as gr
import requests
from dotenv import load_dotenv
from rich import box
from rich.console import Console
from rich.table import Table

# ------------------------------
# Environment
# ------------------------------
load_dotenv()
ROBOT_ID = os.environ.get("ROBOT_ID", "robot_001")
HF_TOKEN = os.environ.get("HF_CV_ROBOT_TOKEN")
MCP_URL = os.environ.get("MCP_SERVER_URL", "http://localhost:7860/run_tool/robot_watch")  # Replace with actual URL

# Seconds before a hung MCP server aborts the request instead of freezing the UI.
MCP_TIMEOUT_S = 30

console = Console()


# ------------------------------
# Rich table helper
# ------------------------------
def format_response(resp: Dict[str, Any]) -> str:
    """Render the MCP response as an ANSI-coloured table string for Gradio.

    Args:
        resp: JSON payload from the MCP server. Expected keys (all
            optional — missing ones render as "N/A"): robot_id,
            file_size_bytes, description, human, objects, environment.
            `objects` is presumably a list of labels — TODO confirm
            against the server schema; non-list values are stringified.

    Returns:
        The table rendered to a string (with ANSI escape codes, since
        `force_terminal=True`).
    """
    objects_list = resp.get("objects", [])
    objects_str = (
        ", ".join(objects_list) if isinstance(objects_list, list) else str(objects_list)
    )

    table = Table(
        title="😎 Robot Vision Result",
        title_style="bold cyan",
        title_justify="left",
        box=box.ROUNDED,
        show_lines=True,
        show_header=False,
        style="bold cyan",
    )
    table.add_column("Field", style="bold magenta")
    table.add_column("Value", style="white")
    table.add_row("🤖 Robot ID", str(resp.get("robot_id", "N/A")))
    table.add_row("🏞️ Image Size", str(resp.get("file_size_bytes", "N/A")))
    table.add_row("📝 Description", str(resp.get("description", "N/A")))
    table.add_row("👥 Human", str(resp.get("human", "N/A")))
    table.add_row("📦 Objects", objects_str)
    table.add_row("🏛️ Environment", str(resp.get("environment", "N/A")))

    # Print into an in-memory buffer so the table becomes a plain string
    # Gradio can display (instead of going to the real terminal).
    buffer = StringIO()
    temp_console = Console(
        file=buffer, force_terminal=True, color_system="truecolor", width=120
    )
    temp_console.print(table)
    return buffer.getvalue()


# ------------------------------
# Capture & call MCP tool
# ------------------------------
def process_frame_stream() -> Tuple[str, Optional[Any]]:
    """Capture a webcam frame, send it to the MCP server, return UI values.

    Returns:
        A 2-tuple matching the click handler's `outputs` list:
        (rendered table string — or an error message on failure,
         RGB numpy frame for display — or None on failure).

    Note: the original version returned a dict keyed "result"/"image",
    which Gradio cannot map onto two output components; a positional
    tuple is required.
    """
    cap = cv2.VideoCapture(0)
    if not cap.isOpened():
        return "Camera not opened", None
    try:
        ret, frame = cap.read()
    finally:
        # Release the device even if read() raises, or the camera stays locked.
        cap.release()
    if not ret:
        return "Failed to read frame", None

    # Encode image as JPEG + base64 for the JSON payload.
    ok, jpeg = cv2.imencode(".jpg", frame)
    if not ok:
        return "Failed to encode frame", None
    b64_img = base64.b64encode(jpeg.tobytes()).decode("utf-8")

    payload = {
        "image_b64": b64_img,
        "robot_id": ROBOT_ID,
        "hf_token": HF_TOKEN,  # may be None if HF_CV_ROBOT_TOKEN is unset
    }
    try:
        # Plain POST (stream=True was pointless: .json() buffers the whole
        # body anyway); timeout keeps the UI responsive if the server hangs.
        response = requests.post(MCP_URL, json=payload, timeout=MCP_TIMEOUT_S)
        response.raise_for_status()
        table_str = format_response(response.json())
        # OpenCV delivers BGR; Gradio's Image component expects RGB.
        img_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        return table_str, img_rgb
    except Exception as e:
        # Network/JSON boundary: surface the error in the UI rather than crash.
        return f"Error calling MCP: {e}", None


# ------------------------------
# Gradio Interface
# ------------------------------
with gr.Blocks(title="Robot Vision Stream") as app:
    with gr.Row():
        output_text = gr.Textbox(
            label="Result",
            lines=20,
            interactive=False,
            placeholder="MCP results will appear here",
        )
        output_image = gr.Image(label="Camera Frame", type="numpy")

    # Each click captures one frame and runs one analysis round-trip.
    gr.Button("Capture & Analyze").click(
        fn=process_frame_stream, outputs=[output_text, output_image]
    )

if __name__ == "__main__":
    app.launch()