# OppaAI — app.py (commit cbcaac5, verified, 3.22 kB)
# Webcam client for the OppaAI/Robot_MCP_Server Hugging Face Space.
import os
import base64
import time
from PIL import Image
import io
import gradio as gr
from gradio_client import Client
from dotenv import load_dotenv
from rich.console import Console
from rich.table import Table
from rich import box
# Load environment variables from a local .env file (no-op if absent)
load_dotenv()
ROBOT_ID = os.environ.get("ROBOT_ID")  # identifier sent with every frame; None if unset
HF_TOKEN = os.environ.get("HF_CV_ROBOT_TOKEN")  # auth token forwarded inside the payload
HF_SPACE = "OppaAI/Robot_MCP_Server"  # target Hugging Face Space hosting the vision endpoint
API_NAME = "/predict"  # named API endpoint on that Space
console = Console()  # shared rich console used for result tables and error output
def pretty_print_response(resp: dict):
    """Render *resp* as a rich table on the console and return its key fields.

    Parameters:
        resp: response dict from the HF Space (keys: robot_id,
            file_size_bytes, description, human, objects, environment).

    Returns:
        Tuple of (description, human, objects_str, environment) strings.
    """
    # Flatten the objects list into a comma-separated string; fall back to
    # str() when the server sent something other than a list.
    objects_value = resp.get("objects", [])
    if isinstance(objects_value, list):
        objects_str = ", ".join(objects_value)
    else:
        objects_str = str(objects_value)

    table = Table(
        title="😎 Robot Vision Result",
        title_style="bold cyan",
        title_justify="left",
        box=box.ROUNDED,
        show_lines=True,
        show_header=False,
        style="bold cyan",
    )
    table.add_column("Field", style="bold magenta")
    table.add_column("Value", style="white")

    # One (label, value) pair per table row; missing keys render as "N/A".
    rows = (
        ("πŸ€– Robot ID", str(resp.get("robot_id", "N/A"))),
        ("🏞️ Image Size", str(resp.get("file_size_bytes", "N/A"))),
        ("πŸ“ Description", str(resp.get("description", "N/A"))),
        ("πŸ‘₯ Human", str(resp.get("human", "N/A"))),
        ("πŸ“¦ Objects", objects_str),
        ("πŸ›οΈ Environment", str(resp.get("environment", "N/A"))),
    )
    for label, value in rows:
        table.add_row(label, value)
    console.print(table)

    return (
        resp.get("description", ""),
        resp.get("human", ""),
        objects_str,
        resp.get("environment", ""),
    )
_hf_client = None  # lazily-created, cached gradio_client connection


def _get_hf_client():
    """Return the shared Client for HF_SPACE, creating it on first use."""
    global _hf_client
    if _hf_client is None:
        _hf_client = Client(HF_SPACE)
    return _hf_client


def process_webcam_stream(image):
    """Send one webcam frame to the HF MCP Server and return the parsed fields.

    Parameters:
        image: numpy array frame from the gradio webcam component, or None
            when no frame is available.

    Returns:
        Tuple of (description, human, objects_str, environment) strings;
        all empty when there is no frame or the remote call fails.
    """
    if image is None:
        return "", "", "", ""

    # Encode the frame as a base64 JPEG for the JSON payload.
    buffered = io.BytesIO()
    Image.fromarray(image).save(buffered, format="JPEG")
    b64_img = base64.b64encode(buffered.getvalue()).decode("utf-8")

    payload = {
        "image_b64": b64_img,
        "robot_id": ROBOT_ID,
        "timestamp": time.time(),
        "hf_token": HF_TOKEN,
    }

    try:
        # Reuse a single cached Client: the original built a new Client per
        # frame (every 0.5 s), re-negotiating the Space connection each time.
        # Acquiring it inside the try also keeps a connection failure from
        # crashing the stream callback.
        resp = _get_hf_client().predict(payload, api_name=API_NAME)
        pretty_print_response(resp)
        objects = resp.get("objects") or []
        objects_str = ", ".join(objects) if isinstance(objects, list) else str(objects)
        return (
            resp.get("description", ""),
            resp.get("human", ""),
            objects_str,
            resp.get("environment", ""),
        )
    except Exception as e:
        # Best-effort streaming: log the failure and keep the UI alive.
        console.print(f"[bold red]Error sending to HF:[/bold red] {e}")
        return "", "", "", ""
with gr.Blocks() as demo:
    gr.Markdown("## πŸŽ₯ Robot Vision Webcam Stream")
    # Webcam feed on the left, one text box per result field alongside it.
    with gr.Row():
        webcam = gr.Image(sources="webcam", streaming=True, label="Webcam Input")
        desc_box = gr.Textbox(label="Description")
        human_box = gr.Textbox(label="Human")
        objects_box = gr.Textbox(label="Objects")
        env_box = gr.Textbox(label="Environment")
    # Poll the webcam twice per second and fan the four returned strings
    # out to the matching text boxes.
    webcam.stream(
        process_webcam_stream,
        inputs=[webcam],
        outputs=[desc_box, human_box, objects_box, env_box],
        stream_every=0.5,
    )

if __name__ == "__main__":
    demo.launch()