# app.py import os import base64 import json import gradio as gr from huggingface_hub import upload_file, InferenceClient from datetime import datetime import traceback import threading from typing import Optional, Dict, Any, Tuple from fastmcp import FastMCP HF_DATASET_REPO = "OppaAI/Robot_MCP" HF_VLM_MODEL = "Qwen/Qwen2.5-VL-7B-Instruct" mcp = FastMCP("Robot_MCP") # ----------------------------------------------------- # Register Robot Tools (MCP) # ----------------------------------------------------- @mcp.tool() def speak(text: str, emotion: str = "neutral"): """Robot speech output""" return { "status": "success", "action_executed": "speak", "payload": {"text": text, "emotion": emotion}, } @mcp.tool() def navigate(direction: str, distance_meters: float): """Move robot safely""" if distance_meters > 5.0: return {"status": "error", "message": "Safety limit exceeded"} return { "status": "success", "action_executed": "navigate", "payload": {"direction": direction, "distance": distance_meters}, } @mcp.tool() def scan_hazard(hazard_type: str, severity: str): """Hazard scan + log""" timestamp = datetime.now().isoformat() return { "status": "warning_logged", "log": f"[{timestamp}] HAZARD: {hazard_type} (Severity: {severity})", } @mcp.tool() def analyze_human(clothing_color: str, estimated_action: str): """Human detection description""" return { "status": "human_tracked", "details": f"Human wearing {clothing_color} is {estimated_action}", } # ----------------------------------------------------- # Save and Upload Image # ----------------------------------------------------- def save_and_upload_image(image_b64: str, hf_token: str): try: image_bytes = base64.b64decode(image_b64) size_bytes = len(image_bytes) print("[debug] decoded image bytes:", size_bytes) timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f") local_path = f"/tmp/robot_img_{timestamp}.jpg" with open(local_path, "wb") as f: f.write(image_bytes) print("[debug] wrote local tmp file:", local_path) filename = f"robot_{timestamp}.jpg" upload_file( path_or_fileobj=local_path, path_in_repo=filename, repo_id=HF_DATASET_REPO, token=hf_token, repo_type="dataset", ) print("[debug] upload successful:", filename) url = f"https://huggingface.co/datasets/{HF_DATASET_REPO}/resolve/main/{filename}" return local_path, url, filename, size_bytes except Exception: traceback.print_exc() return None, None, None, 0 # ----------------------------------------------------- # JSON Parsing Helper # ----------------------------------------------------- def safe_parse_json_from_text(text: str): if not text: return None try: return json.loads(text) except: pass cleaned = text.strip().strip("`") try: start = cleaned.find("{") end = cleaned.rfind("}") if start >= 0 and end > start: return json.loads(cleaned[start : end + 1]) except: pass return None # ----------------------------------------------------- # Only allow tools from MCP registry # ----------------------------------------------------- def validate_and_call_tool(tool_name: str, tool_args: dict): if tool_name not in mcp.tools: return {"error": f"Unknown or unauthorized tool '{tool_name}'"} try: return mcp.tools[tool_name](**tool_args) except Exception as e: traceback.print_exc() return {"error": f"Tool error: {str(e)}"} # ----------------------------------------------------- # Main Pipeline # ----------------------------------------------------- def process_and_describe(payload): if isinstance(payload, str): try: payload = json.loads(payload) except: return {"error": "Invalid JSON payload"} print("\n========== NEW REQUEST ==========") print("[debug] Incoming payload:", payload) hf_token = payload.get("hf_token") if not hf_token: return {"error": "hf_token missing"} robot_id = payload.get("robot_id", "unknown") image_b64 = payload.get("image_b64") if not image_b64: return {"error": "image_b64 missing"} # Save + Upload local_tmp_path, hf_url, filename, size_bytes = save_and_upload_image( image_b64, hf_token ) if not hf_url: return {"error": "Image upload failed"} print("[debug] HF image URL:", hf_url) # VLM SYSTEM PROMPT system_prompt = """ Respond in STRICT JSON ONLY. Format: { "description": "short visual description", "tool_name": "one of: speak, navigate, scan_hazard, analyze_human", "arguments": { ... } } """ messages = [ {"role": "system", "content": system_prompt}, { "role": "user", "content": [ {"type": "text", "text": "Analyze the image and choose ONE tool."}, { "type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{image_b64}"}, }, ], }, ] # VLM CALL print("[debug] Calling VLM model...") client = InferenceClient(token=hf_token) response = client.chat.completions.create( model=HF_VLM_MODEL, messages=messages, max_tokens=300, temperature=0.1, ) vlm_output = response.choices[0].message.content.strip() print("\n------ VLM RAW OUTPUT ------") print(vlm_output) print("------ END VLM RAW ------\n") parsed = safe_parse_json_from_text(vlm_output) if parsed is None: return { "status": "model_no_json", "robot_id": robot_id, "image_url": hf_url, "vlm_raw": vlm_output, "message": "VLM returned invalid JSON", } tool_name = parsed.get("tool_name") tool_args = parsed.get("arguments") or {} tool_result = validate_and_call_tool(tool_name, tool_args) return { "status": "success", "robot_id": robot_id, "image_url": hf_url, "file_size_bytes": size_bytes, "vlm_description": parsed.get("description"), "chosen_tool": tool_name, "tool_arguments": tool_args, "tool_execution_result": tool_result, "vlm_raw": vlm_output, } # ------------------------------ # Gradio Interface # ------------------------------ iface = gr.Interface( fn=process_and_describe, inputs=gr.JSON(label="Input JSON"), outputs=gr.JSON(label="Output JSON"), api_name="predict", flagging_mode="never" ) # ------------------------------ # Main Entry # ------------------------------ if __name__ == "__main__": print("[Gradio] Launching interface...") iface.launch(server_name="0.0.0.0", server_port=7860)