"""Gradio webcam client that streams frames to a Hugging Face Space.

Each webcam frame is JPEG-encoded, base64-wrapped and sent to the Space named
by ``HF_SPACE``; the fields of the response are printed to the console as a
Rich table and shown in four Gradio textboxes.
"""

import base64
import functools
import io
import os
import time

import gradio as gr
from dotenv import load_dotenv
from gradio_client import Client
from PIL import Image
from rich import box
from rich.console import Console
from rich.markup import escape
from rich.table import Table

# Load environment variables
load_dotenv()

ROBOT_ID = os.environ.get("ROBOT_ID")
HF_TOKEN = os.environ.get("HF_CV_ROBOT_TOKEN")
HF_SPACE = "OppaAI/Robot_MCP_Server"
API_NAME = "/predict"

console = Console()

# Returned to the UI whenever there is no frame or the request fails.
_EMPTY_RESULT = ("", "", "", "")


def _format_objects(objects) -> str:
    """Return the ``objects`` field of a response as one display string.

    Lists/tuples are joined with ", " (items are stringified first so that
    non-string entries cannot raise), missing or empty values become "", and
    any other value is passed through ``str``.
    """
    if not objects:
        return ""
    if isinstance(objects, (list, tuple)):
        return ", ".join(str(item) for item in objects)
    return str(objects)


@functools.lru_cache(maxsize=1)
def _get_client() -> Client:
    """Return the shared client for ``HF_SPACE``, creating it on first use.

    The client is cached so that it is not rebuilt for every streamed frame.
    A failed construction is not cached, so the next frame retries.
    """
    return Client(HF_SPACE)


def _encode_frame_b64(image) -> str:
    """Encode a frame as a base64 string of its JPEG bytes.

    ``image`` is whatever ``PIL.Image.fromarray`` accepts (presumably the
    numpy array Gradio hands to the stream callback -- confirm against the
    ``gr.Image`` configuration).
    """
    img = Image.fromarray(image)
    # JPEG cannot store an alpha channel; RGB and L frames are left untouched.
    if img.mode not in ("RGB", "L"):
        img = img.convert("RGB")
    buffered = io.BytesIO()
    img.save(buffered, format="JPEG")
    return base64.b64encode(buffered.getvalue()).decode("utf-8")


def pretty_print_response(resp: dict) -> tuple:
    """Print a response as a Rich table and return its display fields.

    Args:
        resp: Response mapping; the keys read are ``robot_id``,
            ``file_size_bytes``, ``description``, ``human``, ``objects`` and
            ``environment``. Missing keys are shown as "N/A" in the table.

    Returns:
        ``(description, human, objects_str, environment)`` where missing
        fields default to "" and ``objects_str`` is the comma-joined objects.
    """
    objects_str = _format_objects(resp.get("objects", []))

    table = Table(
        title="😎 Robot Vision Result",
        title_style="bold cyan",
        title_justify="left",
        box=box.ROUNDED,
        show_lines=True,
        show_header=False,
        style="bold cyan",
    )
    table.add_column("Field", style="bold magenta")
    table.add_column("Value", style="white")

    table.add_row("🤖 Robot ID", str(resp.get("robot_id", "N/A")))
    table.add_row("🏞️ Image Size", str(resp.get("file_size_bytes", "N/A")))
    table.add_row("📝 Description", str(resp.get("description", "N/A")))
    table.add_row("👥 Human", str(resp.get("human", "N/A")))
    table.add_row("📦 Objects", objects_str)
    table.add_row("🏛️ Environment", str(resp.get("environment", "N/A")))

    console.print(table)

    return (
        resp.get("description", ""),
        resp.get("human", ""),
        objects_str,
        resp.get("environment", ""),
    )


def process_webcam_stream(image):
    """Send one webcam frame to the HF Space and return the parsed result.

    Args:
        image: The current frame, or ``None`` when no frame is available.

    Returns:
        ``(description, human, objects, environment)`` for the four output
        textboxes. All four are "" when there is no frame or when connecting
        to, calling, or reading the response of the Space fails; the failure
        is printed to the console instead of being raised.
    """
    if image is None:
        return _EMPTY_RESULT

    # Prepare payload
    payload = {
        "image_b64": _encode_frame_b64(image),
        "robot_id": ROBOT_ID,
        "timestamp": time.time(),
        "hf_token": HF_TOKEN,
    }

    # Send to HF Space. Obtaining the client is inside the try so that a
    # connection failure is reported the same way as a failed request.
    try:
        resp = _get_client().predict(payload, api_name=API_NAME)
        return pretty_print_response(resp)
    except Exception as e:
        # Escape the message: brackets in it would otherwise be parsed as
        # Rich markup and could be dropped or raise while reporting the error.
        console.print(
            f"[bold red]Error sending to HF:[/bold red] {escape(str(e))}"
        )
        return _EMPTY_RESULT


with gr.Blocks() as demo:
    gr.Markdown("## 🎥 Robot Vision Webcam Stream")

    with gr.Row():
        webcam_input = gr.Image(sources="webcam", streaming=True, label="Webcam Input")
        description_out = gr.Textbox(label="Description")
        human_out = gr.Textbox(label="Human")
        objects_out = gr.Textbox(label="Objects")
        environment_out = gr.Textbox(label="Environment")

    # Run the callback on the live stream, at most once every 0.5 seconds.
    webcam_input.stream(
        process_webcam_stream,
        inputs=[webcam_input],
        outputs=[description_out, human_out, objects_out, environment_out],
        stream_every=0.5,
    )


if __name__ == "__main__":
    demo.launch()