# NOTE: removed stray "Spaces: Sleeping" page-scrape header — it was web-page
# residue, not part of the program, and is not valid Python.
| # app.py (MCP + HF Space unified) | |
| import os | |
| import base64 | |
| import json | |
| import gradio as gr | |
| from huggingface_hub import upload_file, InferenceClient | |
| from datetime import datetime | |
| import traceback | |
| from typing import Tuple, Optional, Dict, Any | |
| from fastmcp import FastMCP, Tool | |
# Dataset repo that receives the uploaded robot camera frames.
HF_DATASET_REPO = "OppaAI/Robot_MCP"
# Vision-language model queried to analyze each frame and pick a tool.
HF_VLM_MODEL = "Qwen/Qwen2.5-VL-7B-Instruct"
# ================================================================
# MCP SERVER + TOOLS (FASTMCP)
# ================================================================
# Named MCP server instance; the tool schemas are read from it below.
# NOTE(review): the tool functions defined below are never registered on
# this server (no @mcp.tool decorators) — confirm registration happens
# elsewhere, otherwise the server exposes no tools.
mcp = FastMCP("Robot_MCP_Server")
| # ------------------------- | |
| # MCP Tools | |
| # ------------------------- | |
def speak(text: str, emotion: str = "neutral") -> dict:
    """Produce a 'speak' action result carrying the text and emotion to voice.

    Args:
        text: What the robot should say.
        emotion: Emotional tone for delivery (defaults to "neutral").

    Returns:
        A status dict describing the executed action and its payload.
    """
    payload = {"text": text, "emotion": emotion}
    return {"status": "success", "action_executed": "speak", "payload": payload}
def navigate(direction: str, distance_meters: float) -> dict:
    """Navigate the robot safely. Max distance: 5m.

    Args:
        direction: Direction of travel (e.g. "forward", "left").
        distance_meters: Distance to travel in meters; must be in [0, 5].

    Returns:
        A success dict with the movement payload, or an error dict when
        the requested distance violates the safety limits.
    """
    # BUG FIX: the original check only capped the upper bound, so
    # nonsensical negative distances were silently accepted as "success".
    if distance_meters < 0:
        return {"status": "error", "message": "Distance cannot be negative"}
    if distance_meters > 5.0:
        return {"status": "error", "message": "Safety limit exceeded"}
    return {
        "status": "success",
        "action_executed": "navigate",
        "payload": {"direction": direction, "distance": distance_meters},
    }
def scan_hazard(hazard_type: str, severity: str) -> dict:
    """Record a hazard observation as a timestamped log entry.

    Args:
        hazard_type: Kind of hazard detected (e.g. "fire").
        severity: Severity label for the hazard.

    Returns:
        A dict with a "warning_logged" status and the formatted log line.
    """
    now = datetime.now().isoformat()
    entry = f"[{now}] HAZARD: {hazard_type} (Severity: {severity})"
    return {"status": "warning_logged", "log": entry}
def analyze_human(clothing_color: str, estimated_action: str) -> dict:
    """Summarize a detected human for downstream tracking.

    Args:
        clothing_color: Dominant clothing color of the person.
        estimated_action: What the person appears to be doing.

    Returns:
        A dict flagging the human as tracked, with a description string.
    """
    summary = f"Human wearing {clothing_color} is {estimated_action}"
    return {"status": "human_tracked", "details": summary}
# MCP tool definitions to embed into VLM system prompt
# NOTE(review): `get_tool_schemas()` is not part of FastMCP's documented
# public API, and the tool functions above are never registered on `mcp`
# (no @mcp.tool decorators) — confirm this call returns the intended
# schema list rather than raising AttributeError or returning nothing.
TOOL_SPECS = mcp.get_tool_schemas()
# ================================================================
# HELPER: SAVE + UPLOAD IMAGE
# ================================================================
def save_and_upload_image(image_b64: str, hf_token: str) -> Tuple[Optional[str], Optional[str], Optional[str], int]:
    """Decode a base64 image, save it to /tmp, and upload it to the HF dataset repo.

    Args:
        image_b64: Base64-encoded JPEG bytes.
        hf_token: Hugging Face token with write access to HF_DATASET_REPO.

    Returns:
        (local_path, public_url, path_in_repo, size_bytes) on success;
        (None, None, None, 0) on any failure (traceback is printed).
    """
    try:
        image_bytes = base64.b64decode(image_b64)
        size_bytes = len(image_bytes)
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
        local_path = f"/tmp/robot_img_{timestamp}.jpg"
        with open(local_path, "wb") as f:
            f.write(image_bytes)
        filename = f"robot_{timestamp}.jpg"
        upload_file(
            path_or_fileobj=local_path,
            path_in_repo=filename,
            repo_id=HF_DATASET_REPO,
            token=hf_token,
            repo_type="dataset"
        )
        # BUG FIX: the URL previously ended in the literal text "(unknown)"
        # instead of the uploaded file's name, so it never resolved to the
        # file that was just uploaded.
        url = f"https://huggingface.co/datasets/{HF_DATASET_REPO}/resolve/main/{filename}"
        return local_path, url, filename, size_bytes
    except Exception:
        # Best-effort helper: callers treat (None, None, None, 0) as failure.
        traceback.print_exc()
        return None, None, None, 0
# ================================================================
# VLM JSON PARSER
# ================================================================
def safe_parse_json_from_text(text: str) -> Optional[dict]:
    """Best-effort extraction of a JSON object from raw model output.

    Tries a direct parse first, then strips surrounding backticks (markdown
    code fences) and parses the outermost {...} span.

    Args:
        text: Raw text returned by the VLM (may include markdown fences).

    Returns:
        The parsed object, or None when no valid JSON can be recovered.
    """
    if not text:
        return None
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        # BUG FIX: was a bare `except:` that swallowed everything,
        # including KeyboardInterrupt; only a parse failure should
        # trigger the fenced/embedded-JSON fallback below.
        pass
    cleaned = text.strip().strip("`")
    start = cleaned.find("{")
    end = cleaned.rfind("}")
    if start >= 0 and end > start:
        try:
            return json.loads(cleaned[start:end + 1])
        except json.JSONDecodeError:
            pass
    return None
# ================================================================
# EXECUTE TOOL VIA LOCAL DISPATCH
# ================================================================
def execute_tool(tool_name: str, tool_args: dict):
    """Run the named robot tool with keyword arguments chosen by the VLM.

    Args:
        tool_name: Name of one of the tool functions defined above.
        tool_args: Keyword arguments to pass to the tool.

    Returns:
        The tool's result dict, or an {"error": ...} dict when the tool is
        unknown or raises.
    """
    # BUG FIX: the previous code dispatched through `mcp.tools[tool_name]`,
    # but FastMCP exposes no public `tools` mapping, so every call raised
    # and surfaced as a "Tool execution error". Dispatching directly to the
    # local functions is unambiguous and independent of server internals.
    dispatch = {
        "speak": speak,
        "navigate": navigate,
        "scan_hazard": scan_hazard,
        "analyze_human": analyze_human,
    }
    fn = dispatch.get(tool_name)
    if fn is None:
        return {"error": f"Unknown tool '{tool_name}'"}
    try:
        return fn(**tool_args)
    except Exception as e:
        # Bad/missing arguments from the VLM land here (e.g. TypeError).
        traceback.print_exc()
        return {"error": f"Tool execution error: {str(e)}"}
# ================================================================
# MAIN API HANDLER (used by Gradio)
# ================================================================
def process_and_describe(payload):
    """Handle one robot request: upload the image, query the VLM, run a tool.

    Args:
        payload: Dict (or JSON string) with keys "hf_token", "image_b64",
            and optionally "robot_id".

    Returns:
        A result dict. On failure an {"error": ...}, "vlm_error" or
        "model_no_json" dict is returned instead of raising, so the Gradio
        endpoint always produces JSON.
    """
    if isinstance(payload, str):
        try:
            payload = json.loads(payload)
        except json.JSONDecodeError:
            # BUG FIX: was a bare `except:` (swallowed even interrupts).
            return {"error": "Invalid JSON string"}

    hf_token = payload.get("hf_token")
    if not hf_token:
        return {"error": "hf_token missing"}

    robot_id = payload.get("robot_id", "unknown")
    image_b64 = payload.get("image_b64")
    if not image_b64:
        return {"error": "image_b64 missing"}

    # ---- save & upload ----
    _local_path, hf_url, _repo_path, size_bytes = save_and_upload_image(image_b64, hf_token)
    if not hf_url:
        return {"error": "Image upload failed"}

    # ---- Build VLM prompt: embed the tool schemas so the model can pick one ----
    tool_list_json = json.dumps(TOOL_SPECS, indent=2)
    system_prompt = f"""
You are an AI that MUST respond in valid JSON only.
You have the following robot tools available:
{tool_list_json}
Return ONLY this format:
{{
"description": "short visual description",
"tool_name": "<one of the tool names>",
"arguments": {{ ... }}
}}
"""
    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": [
            {"type": "text", "text": "Analyze the image and pick EXACTLY ONE tool."},
            {"type": "image_url",
             "image_url": {"url": f"data:image/jpeg;base64,{image_b64}"}}
        ]}
    ]

    # ---- Query the VLM. ROBUSTNESS FIX: network/model errors previously
    # propagated out of the handler; now they become an error dict. ----
    try:
        client = InferenceClient(token=hf_token)
        response = client.chat.completions.create(
            model=HF_VLM_MODEL,
            messages=messages,
            temperature=0.1,
            max_tokens=300
        )
        vlm_raw = response.choices[0].message.content.strip()
    except Exception as e:
        traceback.print_exc()
        return {
            "status": "vlm_error",
            "robot_id": robot_id,
            "image_url": hf_url,
            "error": f"VLM request failed: {e}"
        }

    parsed = safe_parse_json_from_text(vlm_raw)
    if not parsed:
        return {
            "status": "model_no_json",
            "robot_id": robot_id,
            "image_url": hf_url,
            "vlm_raw": vlm_raw,
            "error": "VLM did not provide valid JSON"
        }

    tool_name = parsed.get("tool_name")
    tool_args = parsed.get("arguments") or {}
    tool_exec = execute_tool(tool_name, tool_args)

    return {
        "status": "success",
        "robot_id": robot_id,
        "image_url": hf_url,
        "image_bytes": size_bytes,
        "analysis": parsed.get("description"),
        "chosen_tool": tool_name,
        "tool_arguments": tool_args,
        "tool_execution_result": tool_exec,
        "vlm_raw": vlm_raw
    }
# ================================================================
# GRADIO API (for your client script)
# ================================================================
# Single JSON-in / JSON-out endpoint wrapping process_and_describe.
# NOTE(review): `allow_flagging` is deprecated in recent Gradio releases
# (renamed to `flagging_mode` in Gradio 5) — confirm the pinned Gradio
# version still accepts this keyword.
iface = gr.Interface(
    fn=process_and_describe,
    inputs=gr.JSON(label="Input JSON"),
    outputs=gr.JSON(label="Output JSON"),
    api_name="predict",
    allow_flagging="never"
)
if __name__ == "__main__":
    # Start MCP server (background)
    # NOTE(review): FastMCP's documented entry points are `mcp.run()` /
    # `mcp.run_async()`; `run_in_thread()` is not in the public API —
    # confirm this attribute exists in the pinned fastmcp version, or run
    # `mcp.run` in a daemon thread instead so `iface.launch()` is reached.
    mcp.run_in_thread()
    iface.launch()