"""Gradio MCP endpoint: store a robot camera frame on the HF Hub and ask a VLM what to do."""

import base64
import binascii
import json
import os

import gradio as gr
from huggingface_hub import InferenceClient, upload_file

# --- Config ---
HF_DATASET_REPO = "OppaAI/Robot_MCP"
HF_VLM_MODEL = "Qwen/Qwen2.5-VL-7B-Instruct"

# System prompt: describe + suggest action.
# The text is kept verbatim (including its whitespace) so the request sent to
# the model is unchanged; it is only split across literals for readability.
_SYSTEM_PROMPT = (
    " You are a helpful robot assistant."
    " 1. Describe the image in detail."
    " 2. Suggest what the robot should do next based on what it sees:"
    " - Human figure → say 'Hi'."
    " - Ball → move towards it."
    " - Obstacles → stop or avoid."
    " - Red button → press it. \n"
    'Always respond in JSON: {"description": "...", '
    '"action": {"move": "...", "interact": "..."}} '
)


# --- Helper Functions ---
def save_and_upload_image(image_b64, hf_token):
    """Save image to /tmp and upload to HF dataset.

    Args:
        image_b64: Base64-encoded image data.
        hf_token: Hugging Face token passed to ``upload_file``.

    Returns:
        A tuple ``(local_tmp_path, hf_image_url, path_in_repo, size_bytes)``
        where ``size_bytes`` is the length of the decoded image.

    Raises:
        binascii.Error: If ``image_b64`` is not valid base64.
    """
    image_bytes = base64.b64decode(image_b64)

    local_tmp_path = "/tmp/tmp.jpg"
    with open(local_tmp_path, "wb") as f:
        f.write(image_bytes)

    # NOTE: both the local path and the repo path are fixed, so every call
    # overwrites the previously stored image.
    path_in_repo = "images/tmp.jpg"
    upload_file(
        path_or_fileobj=local_tmp_path,
        path_in_repo=path_in_repo,
        repo_id=HF_DATASET_REPO,
        token=hf_token,
        repo_type="dataset",
    )

    hf_image_url = (
        f"https://huggingface.co/datasets/{HF_DATASET_REPO}/resolve/main/{path_in_repo}"
    )
    return local_tmp_path, hf_image_url, path_in_repo, len(image_bytes)


def _parse_vlm_json(vlm_text):
    """Return the JSON object contained in ``vlm_text``, or a fallback dict.

    The raw text is tried first; if that is not a JSON object, the span from
    the first ``{`` to the last ``}`` is tried, which covers replies wrapped
    in Markdown code fences or surrounded by prose. Anything that does not
    yield a dict is wrapped as a plain description with "unknown" actions.
    """
    candidates = [vlm_text]
    start, end = vlm_text.find("{"), vlm_text.rfind("}")
    if 0 <= start < end:
        candidates.append(vlm_text[start:end + 1])

    for candidate in candidates:
        try:
            parsed = json.loads(candidate)
        except ValueError:  # json.JSONDecodeError is a ValueError subclass
            continue
        # Valid JSON that is not an object (string, list, number) has no
        # .get(), so it must not be returned to the caller.
        if isinstance(parsed, dict):
            return parsed

    return {
        "description": vlm_text,
        "action": {"move": "unknown", "interact": "unknown"},
    }


# --- Main MCP function ---
def process_and_describe(payload: dict) -> dict:
    """Upload the image in ``payload`` and return the VLM's description and action.

    Args:
        payload: Dict with ``hf_token`` and ``image_b64`` (both required) and
            an optional ``robot_id`` (defaults to ``"unknown"``).

    Returns:
        On success, a dict with the upload details (``saved_to_hf_hub``,
        ``repo_id``, ``path_in_repo``, ``image_url``, ``file_size_bytes``),
        the ``robot_id``, the raw ``vlm_response`` text, and the parsed
        ``vlm_action`` / ``vlm_description``. On any failure, a dict with a
        single ``error`` key; this function does not raise.
    """
    if not isinstance(payload, dict):
        return {"error": "Payload must be a JSON object (dict)."}

    try:
        hf_token = payload.get("hf_token")
        if not hf_token:
            return {"error": "HF token not provided in payload."}

        robot_id = payload.get("robot_id", "unknown")
        image_b64 = payload.get("image_b64")
        if not image_b64:
            return {"error": "No image provided."}

        # Save & upload
        try:
            _, hf_url, path_in_repo, size_bytes = save_and_upload_image(
                image_b64, hf_token
            )
        except binascii.Error as e:
            # Malformed input is the caller's fault, not an API failure.
            return {"error": f"Invalid base64 image data: {e}"}

        # Init HF client
        hf_client = InferenceClient(token=hf_token)

        messages_payload = [
            {"role": "system", "content": _SYSTEM_PROMPT},
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": "Here is an image."},
                    {
                        "type": "image_url",
                        "image_url": {"url": f"data:image/jpeg;base64,{image_b64}"},
                    },
                ],
            },
        ]

        # Call VLM
        chat_completion = hf_client.chat.completions.create(
            model=HF_VLM_MODEL,
            messages=messages_payload,
            max_tokens=300,
        )

        # Robustly extract text
        try:
            vlm_text = chat_completion.choices[0].message.content.strip()
        except (AttributeError, IndexError, KeyError, TypeError):
            # Fallback if the structure is different or content is None.
            vlm_text = str(chat_completion)

        # Attempt to parse JSON from VLM; always yields a dict.
        action_data = _parse_vlm_json(vlm_text)

        return {
            "saved_to_hf_hub": True,
            "repo_id": HF_DATASET_REPO,
            "path_in_repo": path_in_repo,
            "image_url": hf_url,
            "file_size_bytes": size_bytes,
            "robot_id": robot_id,
            "vlm_response": vlm_text,
            "vlm_action": action_data.get("action", {}),
            "vlm_description": action_data.get("description", ""),
        }

    except Exception as e:
        # API boundary: the caller expects an error dict, never an exception.
        return {"error": f"An API error occurred: {str(e)}"}


# --- Gradio MCP Interface ---
demo = gr.Interface(
    fn=process_and_describe,
    inputs=gr.JSON(label="Input Payload (Dict format with 'image_b64')"),
    outputs=gr.JSON(label="Reply to Jetson"),
    api_name="predict",
)

if __name__ == "__main__":
    demo.launch(mcp_server=True)