Spaces:
Sleeping
Sleeping
| import os | |
| import cv2 | |
| import base64 | |
| import time | |
| import requests | |
| from io import BytesIO | |
| from typing import Dict, Any | |
| import gradio as gr | |
| from dotenv import load_dotenv | |
| from rich.console import Console | |
| from rich.table import Table | |
| from rich import box | |
| # ------------------------------ | |
| # Environment | |
| # ------------------------------ | |
| load_dotenv() | |
| ROBOT_ID = os.environ.get("ROBOT_ID", "robot_001") | |
| HF_TOKEN = os.environ.get("HF_CV_ROBOT_TOKEN") | |
| MCP_URL = os.environ.get("MCP_SERVER_URL", "http://localhost:7860/run_tool/robot_watch") # Replace with actual URL | |
| console = Console() | |
| # ------------------------------ | |
| # Rich table helper | |
| # ------------------------------ | |
| def format_response(resp: Dict[str, Any]): | |
| """Return a string for Gradio display with similar formatting to terminal rich table.""" | |
| objects_list = resp.get("objects", []) | |
| objects_str = ", ".join(objects_list) if isinstance(objects_list, list) else str(objects_list) | |
| table = Table( | |
| title="π Robot Vision Result", | |
| title_style="bold cyan", | |
| title_justify="left", | |
| box=box.ROUNDED, | |
| show_lines=True, | |
| show_header=False, | |
| style="bold cyan" | |
| ) | |
| table.add_column("Field", style="bold magenta") | |
| table.add_column("Value", style="white") | |
| table.add_row("π€ Robot ID", str(resp.get("robot_id", "N/A"))) | |
| table.add_row("ποΈ Image Size", str(resp.get("file_size_bytes", "N/A"))) | |
| table.add_row("π Description", str(resp.get("description", "N/A"))) | |
| table.add_row("π₯ Human", str(resp.get("human", "N/A"))) | |
| table.add_row("π¦ Objects", objects_str) | |
| table.add_row("ποΈ Environment", str(resp.get("environment", "N/A"))) | |
| # Render as string for Gradio display | |
| from rich.console import Console | |
| from io import StringIO | |
| s = StringIO() | |
| temp_console = Console(file=s, force_terminal=True, color_system="truecolor", width=120) | |
| temp_console.print(table) | |
| return s.getvalue() | |
| # ------------------------------ | |
| # Capture & call MCP tool | |
| # ------------------------------ | |
| def process_frame_stream() -> Dict[str, Any]: | |
| """Capture frame, send to MCP server, and return dict for Gradio.""" | |
| cap = cv2.VideoCapture(0) | |
| if not cap.isOpened(): | |
| return {"result": "Camera not opened", "image": None} | |
| ret, frame = cap.read() | |
| cap.release() | |
| if not ret: | |
| return {"result": "Failed to read frame", "image": None} | |
| # Encode image as JPEG + base64 | |
| ok, jpeg = cv2.imencode(".jpg", frame) | |
| if not ok: | |
| return {"result": "Failed to encode frame", "image": None} | |
| b64_img = base64.b64encode(jpeg.tobytes()).decode("utf-8") | |
| # Payload for MCP server | |
| payload = { | |
| "image_b64": b64_img, | |
| "robot_id": ROBOT_ID, | |
| "hf_token": HF_TOKEN | |
| } | |
| try: | |
| # Streamable POST request to MCP | |
| response = requests.post(MCP_URL, json=payload, stream=True) | |
| response.raise_for_status() | |
| # MCP returns JSON | |
| resp_json = response.json() | |
| # Convert response into rich table string | |
| table_str = format_response(resp_json) | |
| # Decode frame for display in Gradio | |
| img_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) | |
| return {"result": table_str, "image": img_rgb} | |
| except Exception as e: | |
| return {"result": f"Error calling MCP: {e}", "image": None} | |
| # ------------------------------ | |
| # Gradio Interface | |
| # ------------------------------ | |
| with gr.Blocks(title="Robot Vision Stream") as app: | |
| with gr.Row(): | |
| output_text = gr.Textbox(label="Result", lines=20, interactive=False, placeholder="MCP results will appear here") | |
| output_image = gr.Image(label="Camera Frame", type="numpy") | |
| # Stream button triggers frame capture every 1 second | |
| gr.Button("Capture & Analyze").click(fn=process_frame_stream, outputs=[output_text, output_image]) | |
| if __name__ == "__main__": | |
| app.launch() | |