# app.py (MCP + HF Space unified)
#
# A single process that exposes:
#   1. A FastMCP server with four robot "tools" (speak / navigate /
#      scan_hazard / analyze_human).
#   2. A Gradio JSON API (`process_and_describe`) that uploads a robot camera
#      frame to an HF dataset repo, asks a vision-language model to pick
#      exactly one tool, and then dispatches that tool via MCP.
import os
import base64
import json
from datetime import datetime
import traceback
from typing import Tuple, Optional, Dict, Any

import gradio as gr
from huggingface_hub import upload_file, InferenceClient
from fastmcp import FastMCP, Tool

# Dataset repo receiving uploaded camera frames, and the VLM used for analysis.
HF_DATASET_REPO = "OppaAI/Robot_MCP"
HF_VLM_MODEL = "Qwen/Qwen2.5-VL-7B-Instruct"

# ================================================================
# MCP SERVER + TOOLS (FASTMCP)
# ================================================================
mcp = FastMCP("Robot_MCP_Server")

# -------------------------
# MCP Tools
# -------------------------
@mcp.tool()
def speak(text: str, emotion: str = "neutral") -> dict:
    """
    Speak something with a given emotion.
    """
    return {
        "status": "success",
        "action_executed": "speak",
        "payload": {"text": text, "emotion": emotion},
    }


@mcp.tool()
def navigate(direction: str, distance_meters: float) -> dict:
    """
    Navigate the robot safely. Max distance: 5m.
    """
    # Hard safety cap: refuse any move longer than 5 metres.
    if distance_meters > 5.0:
        return {"status": "error", "message": "Safety limit exceeded"}
    return {
        "status": "success",
        "action_executed": "navigate",
        "payload": {"direction": direction, "distance": distance_meters},
    }


@mcp.tool()
def scan_hazard(hazard_type: str, severity: str) -> dict:
    """
    Log a hazard event.
    """
    timestamp = datetime.now().isoformat()
    return {
        "status": "warning_logged",
        "log": f"[{timestamp}] HAZARD: {hazard_type} (Severity: {severity})",
    }


@mcp.tool()
def analyze_human(clothing_color: str, estimated_action: str) -> dict:
    """
    Describe a detected human.
    """
    return {
        "status": "human_tracked",
        "details": f"Human wearing {clothing_color} is {estimated_action}",
    }


# MCP tool definitions to embed into VLM system prompt.
# NOTE(review): `get_tool_schemas()` is not part of the published fastmcp
# API — confirm this works against the pinned fastmcp version.
TOOL_SPECS = mcp.get_tool_schemas()


# ================================================================
# HELPER: SAVE + UPLOAD IMAGE
# ================================================================
def save_and_upload_image(
    image_b64: str, hf_token: str
) -> Tuple[Optional[str], Optional[str], Optional[str], int]:
    """
    Decode a base64 JPEG, save it to /tmp, and upload it to the HF dataset.

    Returns (local_path, public_url, filename_in_repo, size_bytes), or
    (None, None, None, 0) on any failure (best-effort: the error is printed,
    not raised, so the caller can report "upload failed" to the client).
    """
    try:
        image_bytes = base64.b64decode(image_b64)
        size_bytes = len(image_bytes)
        # Microsecond suffix keeps concurrent uploads from colliding.
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
        local_path = f"/tmp/robot_img_{timestamp}.jpg"
        with open(local_path, "wb") as f:
            f.write(image_bytes)
        filename = f"robot_{timestamp}.jpg"
        upload_file(
            path_or_fileobj=local_path,
            path_in_repo=filename,
            repo_id=HF_DATASET_REPO,
            token=hf_token,
            repo_type="dataset",
        )
        # BUG FIX: the URL previously ended in a literal "(unknown)" instead
        # of the uploaded filename, so it never resolved to the image.
        url = f"https://huggingface.co/datasets/{HF_DATASET_REPO}/resolve/main/{filename}"
        return local_path, url, filename, size_bytes
    except Exception:
        traceback.print_exc()
        return None, None, None, 0


# ================================================================
# VLM JSON PARSER
# ================================================================
def safe_parse_json_from_text(text: str) -> Optional[dict]:
    """
    Best-effort extraction of a JSON object from raw model output.

    Tries a direct parse first; if that fails, strips markdown code fences
    and parses the outermost {...} span. Returns None when no valid JSON
    can be recovered.
    """
    if not text:
        return None
    try:
        return json.loads(text)
    except (ValueError, TypeError):
        # Not plain JSON — fall through to the fenced/embedded-object path.
        pass
    cleaned = text.strip().strip("`")
    try:
        start = cleaned.find("{")
        end = cleaned.rfind("}")
        if start >= 0 and end > start:
            return json.loads(cleaned[start:end + 1])
    except ValueError:
        pass
    return None


# ================================================================
# EXECUTE TOOL USING MCP INTERNAL DISPATCH
# ================================================================
def execute_tool(tool_name: str, tool_args: dict):
    """
    Dispatch a VLM-chosen tool by name through the MCP registry.

    Returns the tool's own result dict, or {"error": ...} when the name is
    unknown or the call raises.
    """
    tools = {t["name"]: t for t in TOOL_SPECS}
    if tool_name not in tools:
        return {"error": f"Unknown tool '{tool_name}'"}
    try:
        # Run actual MCP tool function.
        # NOTE(review): `mcp.tools[...]` assumes fastmcp exposes a plain
        # name->callable mapping — verify against the pinned version.
        fn = mcp.tools[tool_name]
        return fn(**tool_args)
    except Exception as e:
        traceback.print_exc()
        return {"error": f"Tool execution error: {str(e)}"}


# ================================================================
# MAIN API HANDLER (used by Gradio)
# ================================================================
def process_and_describe(payload):
    """
    End-to-end handler: validate payload, upload the frame, query the VLM
    for a one-tool decision, execute that tool, and return a result dict.

    Expected payload keys: hf_token (required), image_b64 (required),
    robot_id (optional). Accepts either a dict or a JSON string.
    """
    if isinstance(payload, str):
        try:
            payload = json.loads(payload)
        except json.JSONDecodeError:
            return {"error": "Invalid JSON string"}

    hf_token = payload.get("hf_token")
    if not hf_token:
        return {"error": "hf_token missing"}

    robot_id = payload.get("robot_id", "unknown")
    image_b64 = payload.get("image_b64")
    if not image_b64:
        return {"error": "image_b64 missing"}

    # ---- save & upload ----
    local_tmp_path, hf_url, path_in_repo, size_bytes = save_and_upload_image(
        image_b64, hf_token
    )
    if not hf_url:
        return {"error": "Image upload failed"}

    # ---- Build VLM prompt ----
    tool_list_json = json.dumps(TOOL_SPECS, indent=2)
    system_prompt = f"""
You are an AI that MUST respond in valid JSON only.

You have the following robot tools available:
{tool_list_json}

Return ONLY this format:

{{
  "description": "short visual description",
  "tool_name": "",
  "arguments": {{ ... }}
}}
"""

    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": [
            {"type": "text", "text": "Analyze the image and pick EXACTLY ONE tool."},
            {"type": "image_url",
             "image_url": {"url": f"data:image/jpeg;base64,{image_b64}"}}
        ]}
    ]

    client = InferenceClient(token=hf_token)
    # Low temperature: we want deterministic, schema-shaped output.
    response = client.chat.completions.create(
        model=HF_VLM_MODEL,
        messages=messages,
        temperature=0.1,
        max_tokens=300,
    )

    vlm_raw = response.choices[0].message.content.strip()
    parsed = safe_parse_json_from_text(vlm_raw)
    if not parsed:
        # Surface the raw model text so the client can debug prompt issues.
        return {
            "status": "model_no_json",
            "robot_id": robot_id,
            "image_url": hf_url,
            "vlm_raw": vlm_raw,
            "error": "VLM did not provide valid JSON",
        }

    tool_name = parsed.get("tool_name")
    tool_args = parsed.get("arguments") or {}
    tool_exec = execute_tool(tool_name, tool_args)

    result = {
        "status": "success",
        "robot_id": robot_id,
        "image_url": hf_url,
        "image_bytes": size_bytes,
        "analysis": parsed.get("description"),
        "chosen_tool": tool_name,
        "tool_arguments": tool_args,
        "tool_execution_result": tool_exec,
        "vlm_raw": vlm_raw,
    }
    return result


# ================================================================
# GRADIO API (for your client script)
# ================================================================
iface = gr.Interface(
    fn=process_and_describe,
    inputs=gr.JSON(label="Input JSON"),
    outputs=gr.JSON(label="Output JSON"),
    api_name="predict",
    allow_flagging="never",
)

if __name__ == "__main__":
    # Start MCP server (background).
    # NOTE(review): `run_in_thread()` is not a documented FastMCP method —
    # confirm it exists in the pinned fastmcp version.
    mcp.run_in_thread()
    iface.launch()