Spaces:
Sleeping
Sleeping
File size: 3,217 Bytes
165189d 627d59b 165189d 27c0f8e 165189d 27c0f8e 165189d 27c0f8e 165189d 27c0f8e 165189d 27c0f8e 165189d 27c0f8e 165189d 27c0f8e 165189d 27c0f8e 627d59b 165189d 27c0f8e 165189d 27c0f8e 165189d 27c0f8e 165189d 627d59b 27c0f8e 165189d 27c0f8e 627d59b 165189d 27c0f8e 627d59b 165189d 27c0f8e 165189d 27c0f8e cbcaac5 27c0f8e 165189d 27c0f8e |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 |
import os
import base64
import time
from PIL import Image
import io
import gradio as gr
from gradio_client import Client
from dotenv import load_dotenv
from rich.console import Console
from rich.table import Table
from rich import box
# Populate the process environment via python-dotenv before reading settings.
load_dotenv()

# Robot identity and access token; each is None when its variable is unset.
ROBOT_ID = os.getenv("ROBOT_ID")
HF_TOKEN = os.getenv("HF_CV_ROBOT_TOKEN")

# Hugging Face Space to call and the endpoint name used on it.
HF_SPACE = "OppaAI/Robot_MCP_Server"
API_NAME = "/predict"

# Single rich console shared by every function that prints to the terminal.
console = Console()
def _format_objects(objects) -> str:
    """Render the ``objects`` field of a response as display text.

    Lists/tuples are joined with ", " (each item stringified so that
    non-string entries do not break ``str.join``); ``None`` becomes an empty
    string; any other value is passed through ``str``.
    """
    if objects is None:
        return ""
    if isinstance(objects, (list, tuple)):
        return ", ".join(str(item) for item in objects)
    return str(objects)


def pretty_print_response(resp: dict) -> tuple:
    """Print a vision result as a rich table (row lines, no URL).

    Args:
        resp: Response mapping. The keys read are ``robot_id``,
            ``file_size_bytes``, ``description``, ``human``, ``objects`` and
            ``environment``; missing keys are shown as "N/A" (or as an empty
            string for ``objects``).

    Returns:
        A 4-tuple ``(description, human, objects_str, environment)`` where
        ``objects_str`` is the comma-separated objects text and the other
        entries are taken from ``resp`` unchanged (``""`` when missing).
    """
    objects_str = _format_objects(resp.get("objects", []))

    table = Table(
        title="π Robot Vision Result",
        title_style="bold cyan",
        title_justify="left",
        box=box.ROUNDED,
        show_lines=True,
        show_header=False,
        style="bold cyan",
    )
    table.add_column("Field", style="bold magenta")
    table.add_column("Value", style="white")

    # One (label, value) pair per table row, in display order.
    rows = (
        ("π€ Robot ID", str(resp.get("robot_id", "N/A"))),
        ("ποΈ Image Size", str(resp.get("file_size_bytes", "N/A"))),
        ("π Description", str(resp.get("description", "N/A"))),
        ("π₯ Human", str(resp.get("human", "N/A"))),
        ("π¦ Objects", objects_str),
        ("ποΈ Environment", str(resp.get("environment", "N/A"))),
    )
    for label, value in rows:
        table.add_row(label, value)

    console.print(table)
    return (
        resp.get("description", ""),
        resp.get("human", ""),
        objects_str,
        resp.get("environment", ""),
    )
# Lazily created gradio_client connection, reused across streamed frames.
_hf_client = None


def _get_hf_client():
    """Return the shared Space client, connecting on first use."""
    global _hf_client
    if _hf_client is None:
        _hf_client = Client(HF_SPACE)
    return _hf_client


def _drop_hf_client():
    """Forget the shared client so the next frame reconnects from scratch."""
    global _hf_client
    _hf_client = None


def _objects_to_text(objects) -> str:
    """Turn the response's ``objects`` value into comma-separated text."""
    if not objects:
        return ""
    if isinstance(objects, (list, tuple)):
        return ", ".join(str(item) for item in objects)
    # A bare string (or other scalar) is shown as-is rather than being
    # joined character by character.
    return str(objects)


def process_webcam_stream(image):
    """Send one webcam frame to the HF Space and return its analysis.

    Args:
        image: Frame passed to ``Image.fromarray`` (an array-like image), or
            ``None`` when no frame is available.

    Returns:
        A 4-tuple of values for the Description, Human, Objects and
        Environment outputs. All four are empty strings when ``image`` is
        ``None`` or when the request fails (the error is printed to the
        console instead of being raised).
    """
    if image is None:
        return "", "", "", ""

    # Encode the frame as a base64 JPEG. JPEG cannot store an alpha channel
    # or palette, so anything that is not already RGB is converted first.
    frame = Image.fromarray(image)
    if frame.mode != "RGB":
        frame = frame.convert("RGB")
    buffered = io.BytesIO()
    frame.save(buffered, format="JPEG")
    b64_img = base64.b64encode(buffered.getvalue()).decode("utf-8")

    payload = {
        "image_b64": b64_img,
        "robot_id": ROBOT_ID,
        "timestamp": time.time(),
        "hf_token": HF_TOKEN,
    }

    try:
        # Connecting happens inside the try so that an unreachable Space is
        # reported like any other request failure, and the connection is
        # reused rather than rebuilt for every frame.
        resp = _get_hf_client().predict(payload, api_name=API_NAME)
        pretty_print_response(resp)
        return (
            resp.get("description", ""),
            resp.get("human", ""),
            _objects_to_text(resp.get("objects")),
            resp.get("environment", ""),
        )
    except Exception as e:
        # Best-effort stream handler: log, reset the connection, keep going.
        _drop_hf_client()
        console.print(f"[bold red]Error sending to HF:[/bold red] {e}")
        return "", "", "", ""
# Build the Gradio UI: a streaming webcam input plus four text outputs that
# are refreshed with the values returned by process_webcam_stream.
with gr.Blocks() as demo:
    gr.Markdown("## π₯ Robot Vision Webcam Stream")
    # NOTE(review): the original indentation was lost; all five components
    # are assumed to sit inside this Row — confirm against the intended layout.
    with gr.Row():
        webcam_input = gr.Image(sources="webcam", streaming=True, label="Webcam Input")
        description_out = gr.Textbox(label="Description")
        human_out = gr.Textbox(label="Human")
        objects_out = gr.Textbox(label="Objects")
        environment_out = gr.Textbox(label="Environment")
    # Register process_webcam_stream as the stream handler; its four return
    # values map, in order, onto the four textboxes listed in `outputs`.
    webcam_input.stream(
        process_webcam_stream,
        inputs=[webcam_input],
        outputs=[description_out, human_out, objects_out, environment_out],
        stream_every=0.5
    )

# Start the app only when this file is executed as a script.
if __name__ == "__main__":
    demo.launch()
|