"""Robot MCP server.

Exposes a ``robot_watch`` MCP tool that uploads a base64-encoded image to a
Hugging Face dataset repository, asks a vision-language model to describe it,
and returns the model's answer as structured JSON. A minimal Gradio interface
is launched alongside it.
"""

import base64
import json
import os
import traceback
from datetime import datetime
from typing import Any, Dict, Optional, Tuple

import gradio as gr
from fastmcp import FastMCP
from huggingface_hub import HfApi, InferenceClient
from pydantic import BaseModel, Field  # Import Pydantic BaseModel and Field

# Both values can be overridden through environment variables.
HF_DATASET_REPO = os.environ.get("HF_DATASET_REPO", "OppaAI/Robot_MCP")
HF_VLM_MODEL = os.environ.get("HF_VLM_MODEL", "Qwen/Qwen2.5-VL-7B-Instruct")

mcp = FastMCP("Robot_MCP_Server")  # <-- Important


# ---------------------------------------------------
# Define Pydantic Schema for the input payload
# ---------------------------------------------------
# This defines the expected structure of the tool input; the field
# descriptions below are carried into the generated JSON schema.
class RobotWatchPayload(BaseModel):
    hf_token: str = Field(description="Your Hugging Face API token.")
    robot_id: str = Field(
        description="The unique identifier for the robot.",
        default="unknown",
    )
    image_b64: str = Field(description="Base64 encoded image data.")


def _remove_quietly(path: Optional[str]) -> None:
    """Delete *path* if it is set, ignoring filesystem errors."""
    if not path:
        return
    try:
        os.remove(path)
    except OSError:
        # Best-effort cleanup: a missing or locked file must not fail the call.
        pass


def _error_response(message: str) -> Dict[str, Any]:
    """Build the error payload returned by ``robot_watch``.

    The payload carries ``status``/``message`` as well as the legacy ``error``
    key so that clients checking either form keep working.
    """
    return {"status": "error", "message": message, "error": message}


def upload_image(
    image_b64: str, hf_token: str
) -> Tuple[Optional[str], Optional[str], Optional[str], int]:
    """Decode a base64 image, store it under ``/tmp`` and upload it.

    The file is uploaded to ``tmp/<filename>`` in the ``HF_DATASET_REPO``
    dataset repository.

    Args:
        image_b64: Base64-encoded image data.
        hf_token: Hugging Face token used for the upload.

    Returns:
        ``(local_path, url, filename, size_in_bytes)`` on success. On any
        failure the traceback is printed and ``(None, None, None, 0)`` is
        returned; nothing is raised.
    """
    local_path: Optional[str] = None
    try:
        image_bytes = base64.b64decode(image_b64)
        os.makedirs("/tmp", exist_ok=True)

        # Microsecond resolution keeps names distinct across rapid calls.
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
        local_path = f"/tmp/robot_img_{timestamp}.jpg"
        with open(local_path, "wb") as f:
            f.write(image_bytes)

        filename = f"robot_{timestamp}.jpg"
        api = HfApi()
        api.upload_file(
            path_or_fileobj=local_path,
            path_in_repo=f"tmp/{filename}",
            repo_id=HF_DATASET_REPO,
            repo_type="dataset",
            token=hf_token,
        )
        url = (
            f"https://huggingface.co/datasets/{HF_DATASET_REPO}"
            f"/resolve/main/tmp/{filename}"
        )
        return local_path, url, filename, len(image_bytes)
    except Exception:
        # Deliberate best-effort boundary: report the failure and signal it
        # through the return value instead of raising.
        traceback.print_exc()
        # No path is handed back on failure, so the partially written file
        # would otherwise be orphaned in /tmp.
        _remove_quietly(local_path)
        return None, None, None, 0


def safe_parse_json_from_text(text: str) -> Optional[Any]:
    """Parse JSON out of model output, tolerating code fences and chatter.

    The text is first parsed as-is. If that fails, surrounding backticks and a
    leading ``json`` tag are stripped and the span from the first ``{`` to the
    last ``}`` is parsed instead.

    Returns:
        The decoded JSON value, or ``None`` when *text* is empty or nothing
        parseable is found.
    """
    if not text:
        return None

    try:
        return json.loads(text)
    except (ValueError, TypeError):
        # json.JSONDecodeError is a ValueError; fall through to the lenient path.
        pass

    cleaned = text.strip().strip("`").strip()
    if cleaned.lower().startswith("json"):
        cleaned = cleaned[4:].strip()

    start = cleaned.find("{")
    end = cleaned.rfind("}")
    if start == -1 or end < start:
        # No brace-delimited object to extract.
        return None

    try:
        return json.loads(cleaned[start:end + 1])
    except (ValueError, TypeError):
        return None


# ---------------------------------------------------
# TRUE MCP TOOL — THIS must be exposed to MCP client
# ---------------------------------------------------
@mcp.tool(
    "robot_watch",
    description="Analyze a base64 image using Qwen VLM and return structured JSON.",
)
def robot_watch(payload: RobotWatchPayload):  # <-- Type hint with Pydantic model
    """Upload the image, query the VLM and return its structured description.

    Args:
        payload: Validated tool input (token, robot id, base64 image).

    Returns:
        On success, a dict with ``status`` set to ``"success"`` plus
        ``robot_id``, ``file_size_bytes``, ``image_url``, ``result`` (the
        parsed model JSON, or ``{}`` when parsing fails) and ``vlm_raw``.
        On failure, a dict with ``status`` set to ``"error"`` and the reason
        in both ``message`` and ``error``.
    """
    hf_token = payload.hf_token
    image_b64 = payload.image_b64
    robot_id = payload.robot_id

    # The schema requires the field but still accepts an empty string.
    if not hf_token:
        return _error_response("Missing hf_token")

    local_path, hf_url, _, size_bytes = upload_image(image_b64, hf_token)
    # The local copy is only needed for the upload itself; remove it so that
    # repeated calls do not accumulate files in /tmp.
    _remove_quietly(local_path)
    if not hf_url:
        return _error_response("Image upload failed")

    system_prompt = """
Respond in STRICT JSON ONLY:
{
  "description": "...",
  "human": "...",
  "environment": "...",
  "objects": []
}
"""

    messages = [
        {"role": "system", "content": system_prompt},
        {
            "role": "user",
            "content": [
                {"type": "text", "text": "Analyze the image."},
                {
                    "type": "image_url",
                    # The image is sent inline as a data URL, not via hf_url.
                    "image_url": {"url": f"data:image/jpeg;base64,{image_b64}"},
                },
            ],
        },
    ]

    client = InferenceClient(token=hf_token)
    try:
        resp = client.chat.completions.create(
            model=HF_VLM_MODEL,
            messages=messages,
            max_tokens=500,
            temperature=0.1,
        )
    except Exception as e:
        return _error_response(str(e))

    try:
        # The content may be None and choices may be empty; treat a missing
        # answer as empty text instead of crashing the tool call.
        vlm_output = (resp.choices[0].message.content or "").strip()
    except (AttributeError, IndexError, TypeError) as e:
        return _error_response(f"Unexpected VLM response: {e}")

    parsed = safe_parse_json_from_text(vlm_output) or {}

    return {
        "status": "success",
        "robot_id": robot_id,
        "file_size_bytes": size_bytes,
        "image_url": hf_url,
        "result": parsed,
        "vlm_raw": vlm_output,
    }


# ---------------------------------------------------
# Gradio UI — Use a simple placeholder function for the UI
# ---------------------------------------------------
def gradio_placeholder(payload):
    """Return a static hint; the interactive UI performs no analysis itself."""
    # This is just for the interactive UI, the real API call goes to the
    # robot_watch tool.
    return {"message": "Use the MCP Client to call the robot_watch tool."}


app = gr.Interface(
    fn=gradio_placeholder,
    inputs=gr.JSON(),
    outputs=gr.JSON(),
    title="Robot MCP Server",
    description="A MCP Server to describe image obtained from the CV of a Robot/Webcam",
)

if __name__ == "__main__":
    # Launch Gradio with its MCP server enabled.
    # NOTE(review): the original comment says this hooks up the tools
    # registered on the `mcp` instance above — confirm against the Gradio MCP
    # documentation that tools on a separate FastMCP object are exposed.
    app.launch(mcp_server=True)