OppaAI's picture
Update app.py
5df7db5 verified
raw
history blame
7.28 kB
# app.py (MCP + HF Space unified)
import os
import base64
import json
import gradio as gr
from huggingface_hub import upload_file, InferenceClient
from datetime import datetime
import traceback
from typing import Tuple, Optional, Dict, Any
from fastmcp import FastMCP, Tool
HF_DATASET_REPO = "OppaAI/Robot_MCP"
HF_VLM_MODEL = "Qwen/Qwen2.5-VL-7B-Instruct"
# ================================================================
# MCP SERVER + TOOLS (FASTMCP)
# ================================================================
mcp = FastMCP("Robot_MCP_Server")
# -------------------------
# MCP Tools
# -------------------------
@mcp.tool()
def speak(text: str, emotion: str = "neutral") -> dict:
"""
Speak something with a given emotion.
"""
return {
"status": "success",
"action_executed": "speak",
"payload": {"text": text, "emotion": emotion}
}
@mcp.tool()
def navigate(direction: str, distance_meters: float) -> dict:
"""
Navigate the robot safely. Max distance: 5m.
"""
if distance_meters > 5.0:
return {"status": "error", "message": "Safety limit exceeded"}
return {
"status": "success",
"action_executed": "navigate",
"payload": {"direction": direction, "distance": distance_meters}
}
@mcp.tool()
def scan_hazard(hazard_type: str, severity: str) -> dict:
"""
Log a hazard event.
"""
timestamp = datetime.now().isoformat()
return {
"status": "warning_logged",
"log": f"[{timestamp}] HAZARD: {hazard_type} (Severity: {severity})"
}
@mcp.tool()
def analyze_human(clothing_color: str, estimated_action: str) -> dict:
"""
Describe a detected human.
"""
return {
"status": "human_tracked",
"details": f"Human wearing {clothing_color} is {estimated_action}"
}
# MCP tool definitions to embed into VLM system prompt
TOOL_SPECS = mcp.get_tool_schemas()
# ================================================================
# HELPER: SAVE + UPLOAD IMAGE
# ================================================================
def save_and_upload_image(image_b64: str, hf_token: str) -> Tuple[Optional[str], Optional[str], Optional[str], int]:
try:
image_bytes = base64.b64decode(image_b64)
size_bytes = len(image_bytes)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
local_path = f"/tmp/robot_img_{timestamp}.jpg"
with open(local_path, "wb") as f:
f.write(image_bytes)
filename = f"robot_{timestamp}.jpg"
upload_file(
path_or_fileobj=local_path,
path_in_repo=filename,
repo_id=HF_DATASET_REPO,
token=hf_token,
repo_type="dataset"
)
url = f"https://huggingface.co/datasets/{HF_DATASET_REPO}/resolve/main/{filename}"
return local_path, url, filename, size_bytes
except Exception as e:
traceback.print_exc()
return None, None, None, 0
# ================================================================
# VLM JSON PARSER
# ================================================================
def safe_parse_json_from_text(text: str) -> Optional[dict]:
if not text:
return None
try:
return json.loads(text)
except:
pass
cleaned = text.strip().strip("`")
try:
start = cleaned.find("{")
end = cleaned.rfind("}")
if start >= 0 and end > start:
return json.loads(cleaned[start:end+1])
except:
pass
return None
# ================================================================
# EXECUTE TOOL USING MCP INTERNAL DISPATCH
# ================================================================
def execute_tool(tool_name: str, tool_args: dict):
tools = {t["name"]: t for t in TOOL_SPECS}
if tool_name not in tools:
return {"error": f"Unknown tool '{tool_name}'"}
try:
# Run actual MCP tool function
fn = mcp.tools[tool_name]
return fn(**tool_args)
except Exception as e:
traceback.print_exc()
return {"error": f"Tool execution error: {str(e)}"}
# ================================================================
# MAIN API HANDLER (used by Gradio)
# ================================================================
def process_and_describe(payload):
if isinstance(payload, str):
try:
payload = json.loads(payload)
except:
return {"error": "Invalid JSON string"}
hf_token = payload.get("hf_token")
if not hf_token:
return {"error": "hf_token missing"}
robot_id = payload.get("robot_id", "unknown")
image_b64 = payload.get("image_b64")
if not image_b64:
return {"error": "image_b64 missing"}
# ---- save & upload ----
local_tmp_path, hf_url, path_in_repo, size_bytes = save_and_upload_image(image_b64, hf_token)
if not hf_url:
return {"error": "Image upload failed"}
# ---- Build VLM prompt ----
tool_list_json = json.dumps(TOOL_SPECS, indent=2)
system_prompt = f"""
You are an AI that MUST respond in valid JSON only.
You have the following robot tools available:
{tool_list_json}
Return ONLY this format:
{{
"description": "short visual description",
"tool_name": "<one of the tool names>",
"arguments": {{ ... }}
}}
"""
messages = [
{"role": "system", "content": system_prompt},
{"role": "user", "content": [
{"type": "text", "text": "Analyze the image and pick EXACTLY ONE tool."},
{"type": "image_url",
"image_url": {"url": f"data:image/jpeg;base64,{image_b64}"}}
]}
]
client = InferenceClient(token=hf_token)
response = client.chat.completions.create(
model=HF_VLM_MODEL,
messages=messages,
temperature=0.1,
max_tokens=300
)
vlm_raw = response.choices[0].message.content.strip()
parsed = safe_parse_json_from_text(vlm_raw)
if not parsed:
return {
"status": "model_no_json",
"robot_id": robot_id,
"image_url": hf_url,
"vlm_raw": vlm_raw,
"error": "VLM did not provide valid JSON"
}
tool_name = parsed.get("tool_name")
tool_args = parsed.get("arguments") or {}
tool_exec = execute_tool(tool_name, tool_args)
result = {
"status": "success",
"robot_id": robot_id,
"image_url": hf_url,
"image_bytes": size_bytes,
"analysis": parsed.get("description"),
"chosen_tool": tool_name,
"tool_arguments": tool_args,
"tool_execution_result": tool_exec,
"vlm_raw": vlm_raw
}
return result
# ================================================================
# GRADIO API (for your client script)
# ================================================================
iface = gr.Interface(
fn=process_and_describe,
inputs=gr.JSON(label="Input JSON"),
outputs=gr.JSON(label="Output JSON"),
api_name="predict",
allow_flagging="never"
)
if __name__ == "__main__":
# Start MCP server (background)
mcp.run_in_thread()
iface.launch()