Spaces:
Sleeping
Sleeping
| import os | |
| import base64 | |
| import gradio as gr | |
| from huggingface_hub import upload_file, InferenceClient | |
| import json | |
| # --- Config --- | |
# Hugging Face dataset repository that receives every uploaded robot image.
HF_DATASET_REPO = "OppaAI/Robot_MCP"

# Vision-language model queried through the HF Inference API.
HF_VLM_MODEL = "Qwen/Qwen2.5-VL-7B-Instruct"
| # --- Helper Functions --- | |
def save_and_upload_image(image_b64, hf_token):
    """Decode a base64 image, write it to /tmp, and upload it to the HF dataset.

    Both the local file name and the path inside the repository are fixed,
    so each call overwrites the file written by the previous call.

    Args:
        image_b64: Base64-encoded image data.
        hf_token: Hugging Face token passed to ``upload_file``.

    Returns:
        A 4-tuple ``(local_path, image_url, path_in_repo, size_bytes)`` where
        ``image_url`` is the ``resolve/main`` URL of the uploaded file and
        ``size_bytes`` is the length of the decoded image data.
    """
    tmp_file = "/tmp/tmp.jpg"
    repo_path = "images/tmp.jpg"

    raw = base64.b64decode(image_b64)
    with open(tmp_file, "wb") as handle:
        handle.write(raw)

    upload_file(
        path_or_fileobj=tmp_file,
        path_in_repo=repo_path,
        repo_id=HF_DATASET_REPO,
        token=hf_token,
        repo_type="dataset",
    )

    url = f"https://huggingface.co/datasets/{HF_DATASET_REPO}/resolve/main/{repo_path}"
    return tmp_file, url, repo_path, len(raw)
| # --- Main MCP function --- | |
# System prompt sent to the VLM: describe the scene, then choose a robot action.
_SYSTEM_PROMPT = """
You are a helpful robot assistant.
1. Describe the image in detail.
2. Suggest what the robot should do next based on what it sees:
- Human figure → say 'Hi'.
- Ball → move towards it.
- Obstacles → stop or avoid.
- Red button → press it.
Always respond in JSON:
{"description": "...", "action": {"move": "...", "interact": "..."}}
"""


def _strip_code_fence(text):
    """Return *text* without a surrounding Markdown ``` fence, if it has one."""
    stripped = text.strip()
    if not stripped.startswith("```"):
        return stripped
    # Drop the opening fence line, which may carry a language tag ("```json").
    _, _, rest = stripped.partition("\n")
    rest = rest.rstrip()
    if rest.endswith("```"):
        rest = rest[:-3]
    return rest.strip()


def _extract_reply_text(chat_completion):
    """Return the first choice's message text, or ``str(chat_completion)``."""
    try:
        return chat_completion.choices[0].message.content.strip()
    except (AttributeError, IndexError, KeyError, TypeError):
        # Unexpected response shape (or empty content): keep whatever came back.
        return str(chat_completion)


def _parse_vlm_reply(vlm_text):
    """Parse the VLM reply into a dict with ``description`` / ``action`` keys.

    Anything that is not a JSON object (invalid JSON, or a bare string, list
    or number) is wrapped as the description with an "unknown" action.
    """
    fallback = {
        "description": vlm_text,
        "action": {"move": "unknown", "interact": "unknown"},
    }
    try:
        parsed = json.loads(_strip_code_fence(vlm_text))
    except (TypeError, ValueError, RecursionError):
        return fallback
    # Valid JSON that is not an object has no .get(); treat it as plain text.
    return parsed if isinstance(parsed, dict) else fallback


def process_and_describe(payload: dict) -> dict:
    """Upload a robot camera image and ask the VLM what the robot should do.

    Args:
        payload: Request dict. Keys read: ``hf_token`` (required),
            ``image_b64`` (required, base64 image data) and ``robot_id``
            (optional, defaults to ``"unknown"``).

    Returns:
        On success, a dict with the upload details (``saved_to_hf_hub``,
        ``repo_id``, ``path_in_repo``, ``image_url``, ``file_size_bytes``),
        the ``robot_id``, the raw ``vlm_response`` text, and the parsed
        ``vlm_action`` / ``vlm_description``. On any failure, a dict with a
        single ``error`` key; this function does not raise.
    """
    if not isinstance(payload, dict):
        return {"error": "Payload must be a JSON object."}

    hf_token = payload.get("hf_token")
    if not hf_token:
        return {"error": "HF token not provided in payload."}

    robot_id = payload.get("robot_id", "unknown")
    image_b64 = payload.get("image_b64")
    if not image_b64:
        return {"error": "No image provided."}

    try:
        # Save & upload (the local temp path is not needed here).
        _, hf_url, path_in_repo, size_bytes = save_and_upload_image(image_b64, hf_token)

        hf_client = InferenceClient(token=hf_token)
        messages_payload = [
            {"role": "system", "content": _SYSTEM_PROMPT},
            {"role": "user", "content": [
                {"type": "text", "text": "Here is an image."},
                # The image is sent inline as a data URL, not via the uploaded copy.
                {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{image_b64}"}},
            ]},
        ]
        chat_completion = hf_client.chat.completions.create(
            model=HF_VLM_MODEL,
            messages=messages_payload,
            max_tokens=300,
        )

        vlm_text = _extract_reply_text(chat_completion)
        action_data = _parse_vlm_reply(vlm_text)

        return {
            "saved_to_hf_hub": True,
            "repo_id": HF_DATASET_REPO,
            "path_in_repo": path_in_repo,
            "image_url": hf_url,
            "file_size_bytes": size_bytes,
            "robot_id": robot_id,
            "vlm_response": vlm_text,
            "vlm_action": action_data.get("action", {}),
            "vlm_description": action_data.get("description", ""),
        }
    except Exception as e:
        # Top-level boundary: the caller always receives a JSON reply.
        return {"error": f"An API error occurred: {str(e)}"}
| # --- Gradio MCP Interface --- | |
# One JSON-in / JSON-out endpoint named "predict", backed by process_and_describe.
demo = gr.Interface(
    process_and_describe,
    gr.JSON(label="Input Payload (Dict format with 'image_b64')"),
    gr.JSON(label="Reply to Jetson"),
    api_name="predict",
)

if __name__ == "__main__":
    # mcp_server=True asks Gradio to also serve the app as an MCP server.
    demo.launch(mcp_server=True)