import base64
import json
import os
import traceback
from datetime import datetime
from typing import Any, Dict, Optional, Tuple

import gradio as gr
from fastmcp import FastMCP
from huggingface_hub import HfApi, InferenceClient

# --- Configuration ---
# Both values can be overridden through environment variables of the same name.
HF_DATASET_REPO = os.environ.get("HF_DATASET_REPO", "OppaAI/Robot_MCP")
HF_VLM_MODEL = os.environ.get("HF_VLM_MODEL", "Qwen/Qwen2.5-VL-7B-Instruct")

# Create MCP server
mcp = FastMCP("Robot_MCP_Server")


# -----------------------------------------------------
# Save and upload image to HF
# -----------------------------------------------------
def upload_image(
    image_b64: str, hf_token: str
) -> Tuple[Optional[str], Optional[str], Optional[str], int]:
    """Decode a base64 image, save it under /tmp and upload it to the dataset repo.

    Args:
        image_b64: Base64-encoded image data.
        hf_token: Hugging Face token passed to the upload call.

    Returns:
        ``(local_path, url, filename, size_bytes)`` on success. On any failure
        the error is printed with its traceback and ``(None, None, None, 0)``
        is returned; the function never raises.
    """
    local_path = None
    try:
        image_bytes = base64.b64decode(image_b64)
        if not image_bytes:
            # Input that decodes to nothing would otherwise be uploaded as an
            # empty file and reported as a success.
            raise ValueError("decoded image is empty")
        size_bytes = len(image_bytes)

        os.makedirs("/tmp", exist_ok=True)
        # Microsecond resolution keeps names distinct for back-to-back uploads.
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
        local_path = f"/tmp/robot_img_{timestamp}.jpg"
        with open(local_path, "wb") as f:
            f.write(image_bytes)

        filename = f"robot_{timestamp}.jpg"
        api = HfApi()
        api.upload_file(
            path_or_fileobj=local_path,
            path_in_repo=f"tmp/{filename}",
            repo_id=HF_DATASET_REPO,
            repo_type="dataset",
            token=hf_token,
        )

        # URL of the uploaded file on the repo's main branch.
        url = (
            f"https://huggingface.co/datasets/{HF_DATASET_REPO}"
            f"/resolve/main/tmp/{filename}"
        )
        return local_path, url, filename, size_bytes
    except Exception as e:
        print(f"[Error] during image upload: {e}")
        traceback.print_exc()
        # The caller gets no path back on failure, so a partially written
        # temp file has to be removed here or it is leaked.
        if local_path is not None:
            try:
                os.remove(local_path)
            except OSError:
                pass  # nothing was written, or it is already gone
        return None, None, None, 0


# -----------------------------------------------------
# JSON parsing helper
# -----------------------------------------------------
def safe_parse_json_from_text(text: str) -> Optional[Dict[str, Any]]:
    """Best-effort extraction of a JSON object from model output.

    The text is first parsed as-is. If that does not yield a JSON object,
    surrounding backticks and a leading ``json`` tag are stripped and the span
    from the first ``{`` to the last ``}`` is parsed instead.

    Args:
        text: Raw text that is expected to contain a JSON object.

    Returns:
        The parsed object as a dict, or ``None`` when no JSON object could be
        recovered (including empty input).
    """
    if not text:
        return None

    try:
        parsed = json.loads(text)
    except (ValueError, TypeError):
        parsed = None
    # Valid JSON that is not an object (list, number, string, ...) does not
    # satisfy the declared return type and would break callers using .get().
    if isinstance(parsed, dict):
        return parsed

    cleaned = text.strip().strip("`").strip()
    if cleaned.lower().startswith("json"):
        cleaned = cleaned[4:].strip()

    start = cleaned.find("{")
    end = cleaned.rfind("}")
    if start < 0 or end <= start:
        return None
    try:
        # The slice starts with "{" so a successful parse is always a dict.
        return json.loads(cleaned[start:end + 1])
    except ValueError:
        return None


# -----------------------------------------------------
# MCP Tool: image → VLM → structured JSON
# -----------------------------------------------------
def robot_watch(payload: Dict[str, Any]) -> Dict[str, Any]:
    """Upload a robot camera image and describe it with the vision-language model.

    Args:
        payload: A dict (or a JSON string encoding one) with the keys
            ``hf_token`` (required), ``image_b64`` (required, base64 image)
            and ``robot_id`` (optional, defaults to ``"unknown"``).

    Returns:
        A dict describing the outcome:

        * ``{"error": ...}`` when the payload is invalid or the upload failed;
        * ``{"status": "error", "message": ...}`` when the inference call raised;
        * ``{"status": "model_no_json", ...}`` when the model reply contained
          no parsable JSON object;
        * ``{"status": "success", ...}`` with ``description``, ``human`` and
          ``environment`` taken from the model's JSON reply.
    """
    if isinstance(payload, str):
        try:
            payload = json.loads(payload)
        except ValueError:
            return {"error": "Invalid JSON payload"}
    if not isinstance(payload, dict):
        # Covers None as well as JSON text that decodes to a list/number/string.
        return {"error": "Payload must be a JSON object"}

    hf_token = payload.get("hf_token")
    if not hf_token:
        return {"error": "hf_token missing"}

    robot_id = payload.get("robot_id", "unknown")
    image_b64 = payload.get("image_b64")
    if not image_b64:
        return {"error": "image_b64 missing"}

    # 1. Save + Upload
    local_path, hf_url, _, size_bytes = upload_image(image_b64, hf_token)
    if local_path:
        # The model receives the image inline as a data URL below, so the
        # temp copy is no longer needed; removing it keeps /tmp from growing
        # by one file per call.
        try:
            os.remove(local_path)
        except OSError:
            pass  # best effort: a leftover temp file must not fail the request
    if not hf_url:
        return {"error": "Image upload failed"}

    # 2. VLM prompt
    system_prompt = """
Respond in STRICT JSON ONLY.
Output format:
{
  "description": "...",
  "human": "...",
  "environment": "..."
}
"""

    messages = [
        {"role": "system", "content": system_prompt},
        {
            "role": "user",
            "content": [
                {
                    "type": "text",
                    "text": "Analyze the image and provide the description.",
                },
                {
                    "type": "image_url",
                    "image_url": {"url": f"data:image/jpeg;base64,{image_b64}"},
                },
            ],
        },
    ]

    client = InferenceClient(token=hf_token)
    try:
        response = client.chat.completions.create(
            model=HF_VLM_MODEL,
            messages=messages,
            max_tokens=300,
            temperature=0.1,
        )
    except Exception as e:
        return {"status": "error", "message": f"Inference API call failed: {e}"}

    # A reply with no choices or with null content is treated as empty text so
    # it is reported as "model_no_json" instead of raising.
    try:
        content = response.choices[0].message.content
    except (AttributeError, IndexError, TypeError):
        content = None
    vlm_output = (content or "").strip()

    parsed = safe_parse_json_from_text(vlm_output)
    if parsed is None:
        return {
            "status": "model_no_json",
            "robot_id": robot_id,
            "vlm_raw": vlm_output,
            "message": "VLM returned invalid JSON",
        }

    return {
        "status": "success",
        "robot_id": robot_id,
        "file_size_bytes": size_bytes,
        "image_url": hf_url,
        "description": parsed.get("description"),
        "human": parsed.get("human"),
        "environment": parsed.get("environment"),
        "vlm_raw": vlm_output,
    }


# Register the tool without rebinding the name: in some fastmcp releases the
# decorator returns a tool object rather than the function, which would make
# the direct call in process_and_describe() fail.
# NOTE(review): confirm against the installed fastmcp version.
mcp.tool()(robot_watch)


# -----------------------------------------------------
# Gradio Interface wrapper
# -----------------------------------------------------
def process_and_describe(payload):
    """Gradio entry point: forward the JSON payload to ``robot_watch``."""
    return robot_watch(payload)


app = gr.Interface(
    fn=process_and_describe,
    inputs=gr.JSON(label="Input JSON Payload (must include hf_token & image_b64)"),
    outputs=gr.JSON(label="Output JSON Result"),
    api_name="predict",
    flagging_mode="never",
)

# -----------------------------------------------------
# Entry
# -----------------------------------------------------
if __name__ == "__main__":
    import threading

    print("[MCP] Robot MCP Server starting...")
    # FastMCP.run() blocks, and `background` does not appear to be one of its
    # arguments, so the server is started on a daemon thread to let Gradio
    # launch afterwards.
    # NOTE(review): confirm the intended transport.
    threading.Thread(target=mcp.run, name="robot-mcp", daemon=True).start()
    print("[Gradio] Launching interface...")
    app.launch()