Spaces:
Sleeping
Sleeping
| import os | |
| import base64 | |
| import time | |
| from PIL import Image | |
| import io | |
| import gradio as gr | |
| from gradio_client import Client | |
| from dotenv import load_dotenv | |
| from rich.console import Console | |
| from rich.table import Table | |
| from rich import box | |
| # Load environment variables | |
| load_dotenv() | |
| ROBOT_ID = os.environ.get("ROBOT_ID") | |
| HF_TOKEN = os.environ.get("HF_CV_ROBOT_TOKEN") | |
| HF_SPACE = "OppaAI/Robot_MCP_Server" | |
| API_NAME = "/predict" | |
| console = Console() | |
| def pretty_print_response(resp: dict): | |
| """Rich table output with row lines, no URL.""" | |
| table = Table( | |
| title="π Robot Vision Result", | |
| title_style="bold cyan", | |
| title_justify="left", | |
| box=box.ROUNDED, | |
| show_lines=True, | |
| show_header=False, | |
| style="bold cyan" | |
| ) | |
| objects_list = resp.get("objects", []) | |
| objects_str = ", ".join(objects_list) if isinstance(objects_list, list) else str(objects_list) | |
| table.add_column("Field", style="bold magenta") | |
| table.add_column("Value", style="white") | |
| table.add_row("π€ Robot ID", str(resp.get("robot_id", "N/A"))) | |
| table.add_row("ποΈ Image Size", str(resp.get("file_size_bytes", "N/A"))) | |
| table.add_row("π Description", str(resp.get("description", "N/A"))) | |
| table.add_row("π₯ Human", str(resp.get("human", "N/A"))) | |
| table.add_row("π¦ Objects", objects_str) | |
| table.add_row("ποΈ Environment", str(resp.get("environment", "N/A"))) | |
| console.print(table) | |
| return resp.get("description", ""), resp.get("human", ""), objects_str, resp.get("environment", "") | |
| def process_webcam_stream(image): | |
| """Send webcam image to HF MCP Server and get result""" | |
| if image is None: | |
| return "", "", "", "" | |
| # Convert to base64 | |
| buffered = io.BytesIO() | |
| img = Image.fromarray(image) | |
| img.save(buffered, format="JPEG") | |
| b64_img = base64.b64encode(buffered.getvalue()).decode("utf-8") | |
| # Prepare payload | |
| payload = { | |
| "image_b64": b64_img, | |
| "robot_id": ROBOT_ID, | |
| "timestamp": time.time(), | |
| "hf_token": HF_TOKEN | |
| } | |
| # Send to HF Space | |
| client = Client(HF_SPACE) | |
| try: | |
| resp = client.predict(payload, api_name=API_NAME) | |
| pretty_print_response(resp) | |
| return ( | |
| resp.get("description", ""), | |
| resp.get("human", ""), | |
| ", ".join(resp.get("objects", [])) if resp.get("objects") else "", | |
| resp.get("environment", "") | |
| ) | |
| except Exception as e: | |
| console.print(f"[bold red]Error sending to HF:[/bold red] {e}") | |
| return "", "", "", "" | |
| with gr.Blocks() as demo: | |
| gr.Markdown("## π₯ Robot Vision Webcam Stream") | |
| with gr.Row(): | |
| webcam_input = gr.Image(sources="webcam", streaming=True, label="Webcam Input") | |
| description_out = gr.Textbox(label="Description") | |
| human_out = gr.Textbox(label="Human") | |
| objects_out = gr.Textbox(label="Objects") | |
| environment_out = gr.Textbox(label="Environment") | |
| webcam_input.stream( | |
| process_webcam_stream, | |
| inputs=[webcam_input], | |
| outputs=[description_out, human_out, objects_out, environment_out], | |
| stream_every=0.5 | |
| ) | |
| if __name__ == "__main__": | |
| demo.launch() | |