"""Robot MCP server: upload a robot camera frame and describe it with a VLM."""

import base64
import json
import os
import traceback
from datetime import datetime

import gradio as gr
from huggingface_hub import HfApi, InferenceClient
from pydantic import BaseModel, Field

# Gradio sets up the MCP server itself when `app.launch(mcp_server=True)` is
# called, so FastMCP is neither imported nor instantiated manually here.

# Dataset repository that receives uploaded frames, and the vision-language
# model used for analysis. Both can be overridden through the environment.
HF_DATASET_REPO = os.environ.get("HF_DATASET_REPO", "OppaAI/Robot_MCP")
HF_VLM_MODEL = os.environ.get("HF_VLM_MODEL", "Qwen/Qwen2.5-VL-7B-Instruct")


# ---------------------------------------------------
# Payload Schema
# ---------------------------------------------------
class RobotWatchPayload(BaseModel):
    """Input accepted by the robot-watch tool."""

    # Token used both for the dataset upload and for the inference call.
    hf_token: str = Field(description="Your Hugging Face API token.")
    # Optional label echoed back in the result; defaults to "unknown".
    robot_id: str = Field(default="unknown", description="Robot identifier.")
    # Raw base64 text of the image.
    image_b64: str = Field(description="Base64 encoded image data.")


# ---------------------------------------------------
# Upload Helper
# ---------------------------------------------------
def upload_image(image_b64: str, hf_token: str):
    """Decode a base64 image, save it under /tmp and upload it to the dataset.

    Args:
        image_b64: Base64 text of the image.
        hf_token: Hugging Face token passed to ``HfApi.upload_file``.

    Returns:
        A tuple ``(local_path, hf_url, filename, size_bytes)`` on success, or
        ``(None, None, None, 0)`` when the input is empty or any step fails
        (the traceback is printed in that case). The local copy under
        ``/tmp`` is not removed by this function.
    """
    failure = (None, None, None, 0)

    # Nothing to decode: do not create and upload a zero-byte file.
    if not image_b64 or not isinstance(image_b64, str):
        return failure

    try:
        image_bytes = base64.b64decode(image_b64)
        if not image_bytes:
            return failure

        os.makedirs("/tmp", exist_ok=True)

        # Microsecond resolution keeps names distinct for rapid uploads.
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
        local_path = f"/tmp/robot_img_{timestamp}.jpg"
        with open(local_path, "wb") as f:
            f.write(image_bytes)

        filename = f"robot_{timestamp}.jpg"
        api = HfApi()
        api.upload_file(
            path_or_fileobj=local_path,
            path_in_repo=f"tmp/{filename}",
            repo_id=HF_DATASET_REPO,
            repo_type="dataset",
            token=hf_token,
        )

        hf_url = (
            f"https://huggingface.co/datasets/{HF_DATASET_REPO}"
            f"/resolve/main/tmp/{filename}"
        )
        return local_path, hf_url, filename, len(image_bytes)
    except Exception:
        # Best-effort helper: report the problem and signal failure to the
        # caller through the sentinel tuple instead of raising.
        traceback.print_exc()
        return failure


# ---------------------------------------------------
# JSON Cleaning Helper
# ---------------------------------------------------
def safe_parse_json_from_text(text: str):
    """Parse JSON out of model output that may be wrapped in extra text.

    The text is first parsed as-is. If that fails, surrounding backticks and
    a leading ``json`` tag are stripped and the span from the first ``{`` to
    the last ``}`` is parsed instead.

    Args:
        text: Raw text returned by the model.

    Returns:
        The decoded JSON value, or ``None`` when ``text`` is empty or no
        parseable JSON is found.
    """
    if not text:
        return None

    try:
        return json.loads(text)
    except (TypeError, ValueError):
        # json.JSONDecodeError is a ValueError; fall through to the cleanup.
        pass

    cleaned = text.strip().strip("`").strip()
    if cleaned.lower().startswith("json"):
        cleaned = cleaned[4:].strip()

    start = cleaned.find("{")
    end = cleaned.rfind("}")
    if start == -1 or end < start:
        return None

    try:
        return json.loads(cleaned[start:end + 1])
    except ValueError:
        return None


# ---------------------------------------------------
# Core VLM Analysis Logic
# ---------------------------------------------------
def run_vlm_analysis(payload: RobotWatchPayload):
    """Analyze a base64-encoded image with a Hugging Face vision-language model.

    The image is uploaded to the dataset repository, then sent inline as a
    data URL to the chat-completion endpoint of ``HF_VLM_MODEL``.

    Args:
        payload: Validated request carrying ``hf_token``, ``robot_id`` and
            ``image_b64``.

    Returns:
        On success, a dict with ``status`` ``"success"``, ``robot_id``,
        ``file_size_bytes``, ``image_url``, ``result`` (parsed JSON, or ``{}``
        when the model output could not be parsed) and ``vlm_raw``.
        On failure, a dict with ``status`` ``"error"`` and ``message``; the
        upload failure additionally keeps the legacy ``error`` key.
    """
    hf_token = payload.hf_token
    image_b64 = payload.image_b64
    robot_id = payload.robot_id

    _, hf_url, _, size_bytes = upload_image(image_b64, hf_token)
    if not hf_url:
        message = "Image upload failed"
        # "error" is retained for existing callers; "status"/"message" match
        # the shape of every other error returned by this function.
        return {"status": "error", "message": message, "error": message}

    system_prompt = """
Respond in STRICT JSON ONLY:
{
  "description": "...",
  "human": "...",
  "environment": "...",
  "objects": []
}
"""

    messages = [
        {"role": "system", "content": system_prompt},
        {
            "role": "user",
            "content": [
                {"type": "text", "text": "Analyze the image."},
                {
                    "type": "image_url",
                    "image_url": {"url": f"data:image/jpeg;base64,{image_b64}"},
                },
            ],
        },
    ]

    client = InferenceClient(token=hf_token)

    try:
        resp = client.chat.completions.create(
            model=HF_VLM_MODEL,
            messages=messages,
            max_tokens=500,
            temperature=0.1,
        )
        # The content may be None and the choices list may be empty; both
        # are handled here rather than escaping as an unhandled exception.
        vlm_output = (resp.choices[0].message.content or "").strip()
    except Exception as e:
        return {"status": "error", "message": str(e)}

    parsed = safe_parse_json_from_text(vlm_output) or {}

    return {
        "status": "success",
        "robot_id": robot_id,
        "file_size_bytes": size_bytes,
        "image_url": hf_url,
        "result": parsed,
        "vlm_raw": vlm_output,
    }


# ---------------------------------------------------
# Gradio UI Function
# ---------------------------------------------------
def gradio_interface_fn(payload: RobotWatchPayload):
    """Describe an image captured by a robot or webcam.

    Entry point wired into the Gradio interface. The JSON input component
    hands over the decoded JSON value (normally a plain dict) rather than a
    model instance, so the value is validated into ``RobotWatchPayload``
    here before analysis.

    Args:
        payload: A ``RobotWatchPayload`` or a JSON object with the string
            fields ``hf_token``, ``image_b64`` and optionally ``robot_id``.

    Returns:
        The dict produced by ``run_vlm_analysis``, or a dict with ``status``
        ``"error"`` and ``message`` when the payload is invalid.
    """
    if not isinstance(payload, RobotWatchPayload):
        try:
            payload = RobotWatchPayload(**payload)
        except (TypeError, ValueError):
            # Pydantic validation errors are ValueErrors; a non-mapping input
            # raises TypeError. The exception text is deliberately not echoed
            # because it can contain the submitted values, including the token.
            return {
                "status": "error",
                "message": (
                    "Invalid payload: expected a JSON object with string "
                    "fields 'hf_token' and 'image_b64' and an optional "
                    "'robot_id'."
                ),
            }

    return run_vlm_analysis(payload)


app = gr.Interface(
    fn=gradio_interface_fn,
    # gr.JSON is the documented name of Gradio's JSON component.
    inputs=gr.JSON(label="Input Payload (Pydantic Schema Applied)"),
    outputs=gr.JSON(label="Tool Output"),
    title="Robot MCP Server",
    description="A MCP Server to describe image obtained from the CV of a robot/webcam.",
    api_name="predict",
)

# ---------------------------------------------------
# MCP exposure
# ---------------------------------------------------
# NOTE: gr.Interface has no `app.api.post(...)` registration hook, so no route
# is registered by hand here. Launching with `mcp_server=True` below is what
# exposes the interface function as an MCP tool.

if __name__ == "__main__":
    app.launch(mcp_server=True)