"""Gradio front end that streams webcam frames to a Hugging Face Space.

Each captured frame is JPEG-encoded, base64-wrapped and sent to the
``HF_SPACE`` endpoint named by ``API_NAME``; the textual results are shown
in four text boxes.
"""

import base64
import functools
import io
import os
import time

import gradio as gr
from gradio_client import Client

# Load environment variables
ROBOT_ID = os.environ.get("ROBOT_ID")
HF_TOKEN = os.environ.get("HF_CV_ROBOT_TOKEN")
HF_SPACE = "OppaAI/Robot_MCP_Server"
API_NAME = "/predict"

# Frames in these modes are encoded as-is; any other mode is converted to RGB
# before JPEG encoding, because Pillow refuses to write e.g. RGBA or palette
# images as JPEG.
_JPEG_SAFE_MODES = ("RGB", "L")


@functools.lru_cache(maxsize=1)
def _get_client():
    """Return the shared client for ``HF_SPACE``, creating it on first use.

    The stream callback fires every half second, so building a new client per
    frame would repeat the connection handshake each time. ``lru_cache`` does
    not cache exceptions, so a failed connection is retried on the next frame.
    """
    return Client(HF_SPACE)


def _encode_jpeg_b64(image):
    """Return *image* (a PIL image) as a base64-encoded JPEG string."""
    if image.mode not in _JPEG_SAFE_MODES:
        image = image.convert("RGB")
    buffered = io.BytesIO()
    image.save(buffered, format="JPEG")
    return base64.b64encode(buffered.getvalue()).decode("utf-8")


def process_webcam_stream(image):
    """Send one webcam frame to the HF Space and return its analysis.

    Args:
        image: PIL image delivered by the Gradio image component, or ``None``
            when no frame is available.

    Returns:
        A 4-tuple of strings ``(description, human, objects, environment)``.
        All four are empty when *image* is ``None``. If the remote call or
        the response handling fails, the first element carries an
        ``"Error: ..."`` message and the rest are empty.
    """
    if image is None:
        return "", "", "", ""

    # Prepare payload
    payload = {
        "image_b64": _encode_jpeg_b64(image),
        "robot_id": ROBOT_ID,
        "timestamp": time.time(),
        "hf_token": HF_TOKEN,
    }

    # Top-level UI boundary: any failure is reported in the first text box
    # instead of breaking the stream.
    try:
        resp = _get_client().predict(payload, api_name=API_NAME)
        if not isinstance(resp, dict):
            return f"Error: unexpected response type {type(resp).__name__}", "", "", ""

        objects_list = resp.get("objects", [])
        if isinstance(objects_list, list):
            # str() each element so non-string entries do not break the join.
            objects_str = ", ".join(str(item) for item in objects_list)
        else:
            objects_str = str(objects_list)

        return (
            resp.get("description", ""),
            resp.get("human", ""),
            objects_str,
            resp.get("environment", ""),
        )
    except Exception as e:
        return f"Error: {e}", "", "", ""


with gr.Blocks() as demo:
    gr.Markdown("## 🎥 Robot Vision Webcam Stream")

    # NOTE(review): the original nesting under gr.Row() could not be recovered
    # from the collapsed source; confirm which components belong in the row.
    with gr.Row():
        webcam_input = gr.Image(
            label="Captured from Web-Cam",
            sources=["upload", "webcam"],
            type="pil",
        )

    description_out = gr.Textbox(label="Description")
    human_out = gr.Textbox(label="Human")
    objects_out = gr.Textbox(label="Objects")
    environment_out = gr.Textbox(label="Environment")

    # Push a frame through the analysis callback every half second.
    webcam_input.stream(
        process_webcam_stream,
        inputs=[webcam_input],
        outputs=[description_out, human_out, objects_out, environment_out],
        stream_every=0.5,
    )


if __name__ == "__main__":
    demo.launch()