Spaces:
Sleeping
Sleeping
File size: 2,003 Bytes
165189d 627d59b 165189d 27c0f8e 165189d 27c0f8e 165189d 27c0f8e 165189d 27c0f8e 73ea45e 165189d ef0cc41 27c0f8e 73ea45e 27c0f8e 165189d 27c0f8e 165189d 27c0f8e 165189d 0cefe4b 27c0f8e 165189d 27c0f8e 73ea45e 0cefe4b 627d59b 73ea45e 627d59b 165189d 73ea45e 165189d 27c0f8e 165189d 27c0f8e a3fed0c 73ea45e a3fed0c 27c0f8e 165189d 27c0f8e |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 |
import os
import base64
import time
import io
import gradio as gr
from gradio_client import Client
# Runtime configuration.
# The robot identity and the access token are read from the process
# environment; each is None when its variable is not set.
ROBOT_ID = os.getenv("ROBOT_ID")
HF_TOKEN = os.getenv("HF_CV_ROBOT_TOKEN")

# Hugging Face Space to call, and the API endpoint name used on it.
HF_SPACE = "OppaAI/Robot_MCP_Server"
API_NAME = "/predict"
# Lazily created gradio_client connection, reused across frames so the stream
# handler does not build a new Client for every frame.
_client = None


def _get_client():
    """Return the shared Client for HF_SPACE, creating it on first use."""
    global _client
    if _client is None:
        # If construction raises, _client stays None and the next call retries.
        _client = Client(HF_SPACE)
    return _client


def _encode_jpeg_b64(image):
    """Return *image* saved as JPEG and encoded as a base64 UTF-8 string."""
    # JPEG cannot store alpha/palette modes; normalise anything that is not
    # already RGB or greyscale before saving.
    if image.mode not in ("RGB", "L"):
        image = image.convert("RGB")
    buffered = io.BytesIO()
    image.save(buffered, format="JPEG")
    return base64.b64encode(buffered.getvalue()).decode("utf-8")


def process_webcam_stream(image):
    """Send a webcam image to the HF Space and return its analysis.

    Args:
        image: Image object exposing ``mode``, ``convert`` and ``save``
            (the UI requests PIL images), or ``None`` when no frame is
            available.

    Returns:
        A 4-tuple of strings ``(description, human, objects, environment)``.
        All four are empty when ``image`` is ``None``. On any failure the
        first element is ``"Error: <message>"`` and the rest are empty.
    """
    if image is None:
        return "", "", "", ""

    # Everything that can fail (encoding, connecting, the remote call) is
    # reported through the first output instead of raising out of the
    # stream handler.
    try:
        payload = {
            "image_b64": _encode_jpeg_b64(image),
            "robot_id": ROBOT_ID,
            "timestamp": time.time(),
            "hf_token": HF_TOKEN,
        }
        resp = _get_client().predict(payload, api_name=API_NAME)
    except Exception as e:
        return f"Error: {e}", "", "", ""

    if not isinstance(resp, dict):
        # The fields below are read with .get(); anything else is unusable.
        return f"Error: unexpected response type {type(resp).__name__}", "", "", ""

    objects_list = resp.get("objects", [])
    if isinstance(objects_list, list):
        # str() each item so a non-string entry cannot make join() fail.
        objects_str = ", ".join(map(str, objects_list))
    else:
        objects_str = str(objects_list)

    return (
        resp.get("description", ""),
        resp.get("human", ""),
        objects_str,
        resp.get("environment", ""),
    )
# Build the Gradio UI: one image input (upload or webcam) plus four textboxes
# that show the fields returned by process_webcam_stream.
# NOTE(review): the original indentation was lost in this copy; nesting all
# five components inside the Row is an assumption — confirm intended layout.
with gr.Blocks() as demo:
    gr.Markdown("## 🎥 Robot Vision Webcam Stream")
    with gr.Row():
        # type="pil" asks Gradio to hand the callback a PIL image.
        webcam_input = gr.Image(
            label="Captured from Web-Cam",
            sources=["upload", "webcam"],
            type="pil"
        )
        description_out = gr.Textbox(label="Description")
        human_out = gr.Textbox(label="Human")
        objects_out = gr.Textbox(label="Objects")
        environment_out = gr.Textbox(label="Environment")
    # Wire the stream event: the callback's four return values map
    # positionally onto the four textboxes listed in `outputs`.
    # stream_every=0.5 sets the streaming interval (seconds, per Gradio docs).
    webcam_input.stream(
        process_webcam_stream,
        inputs=[webcam_input],
        outputs=[description_out, human_out, objects_out, environment_out],
        stream_every=0.5
    )

# Start the app only when this file is executed as a script.
if __name__ == "__main__":
    demo.launch()
|