Spaces:
Sleeping
Sleeping
Update app.py
Browse files
app.py
CHANGED
|
@@ -1,4 +1,3 @@
|
|
| 1 |
-
# app.py
|
| 2 |
import os
|
| 3 |
import base64
|
| 4 |
import json
|
|
@@ -7,51 +6,22 @@ from huggingface_hub import upload_file, InferenceClient
|
|
| 7 |
from datetime import datetime
|
| 8 |
import traceback
|
| 9 |
from typing import Optional, Dict, Any
|
| 10 |
-
|
| 11 |
-
from fastmcp import
|
| 12 |
|
| 13 |
# --- Configuration ---
|
| 14 |
HF_DATASET_REPO = os.environ.get("HF_DATASET_REPO", "OppaAI/Robot_MCP")
|
| 15 |
HF_VLM_MODEL = os.environ.get("HF_VLM_MODEL", "Qwen/Qwen2.5-VL-7B-Instruct")
|
| 16 |
-
|
| 17 |
-
mcp = FastMCP("Robot_MCP")
|
| 18 |
-
|
| 19 |
-
# -----------------------------------------------------
|
| 20 |
-
# Register Robot Tools (MCP)
|
| 21 |
-
# -----------------------------------------------------
|
| 22 |
-
@mcp.tool()
|
| 23 |
-
def speak(text: str, emotion: str = "neutral"):
|
| 24 |
-
"""Makes the robot speak a given text with an emotion."""
|
| 25 |
-
return {"status": "success", "action_executed": "speak", "payload": {"text": text, "emotion": emotion}}
|
| 26 |
-
|
| 27 |
-
@mcp.tool()
|
| 28 |
-
def navigate(direction: str, distance_meters: float):
|
| 29 |
-
"""Moves the robot a specified distance in a direction (max 5m)."""
|
| 30 |
-
if distance_meters > 5.0:
|
| 31 |
-
return {"status": "error", "message": "Safety limit exceeded"}
|
| 32 |
-
return {"status": "success", "action_executed": "navigate", "payload": {"direction": direction, "distance": distance_meters}}
|
| 33 |
-
|
| 34 |
-
@mcp.tool()
|
| 35 |
-
def scan_hazard(hazard_type: str, severity: str):
|
| 36 |
-
"""Logs a potential hazard detected by the robot."""
|
| 37 |
-
timestamp = datetime.now().isoformat()
|
| 38 |
-
return {"status": "warning_logged", "log": f"[{timestamp}] HAZARD: {hazard_type} (Severity: {severity})"}
|
| 39 |
-
|
| 40 |
-
@mcp.tool()
|
| 41 |
-
def analyze_human(clothing_color: str, estimated_action: str):
|
| 42 |
-
"""Tracks human activity based on visual input."""
|
| 43 |
-
return {"status": "human_tracked", "details": f"Human wearing {clothing_color} is {estimated_action}"}
|
| 44 |
|
| 45 |
# -----------------------------------------------------
|
| 46 |
# Save and upload image to HF
|
| 47 |
# -----------------------------------------------------
|
| 48 |
def save_and_upload_image(image_b64: str, hf_token: str):
|
| 49 |
-
"""Decodes a base64 image, saves it locally, and uploads to Hugging Face Hub."""
|
| 50 |
try:
|
| 51 |
image_bytes = base64.b64decode(image_b64)
|
| 52 |
size_bytes = len(image_bytes)
|
| 53 |
|
| 54 |
-
# Ensure the /tmp directory exists
|
| 55 |
os.makedirs("/tmp", exist_ok=True)
|
| 56 |
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
|
| 57 |
local_path = f"/tmp/robot_img_{timestamp}.jpg"
|
|
@@ -61,7 +31,6 @@ def save_and_upload_image(image_b64: str, hf_token: str):
|
|
| 61 |
|
| 62 |
filename = f"robot_{timestamp}.jpg"
|
| 63 |
|
| 64 |
-
# Corrected Hugging Face Hub upload
|
| 65 |
from huggingface_hub import HfApi
|
| 66 |
api = HfApi()
|
| 67 |
api.upload_file(
|
|
@@ -84,7 +53,6 @@ def save_and_upload_image(image_b64: str, hf_token: str):
|
|
| 84 |
# JSON parsing helper
|
| 85 |
# -----------------------------------------------------
|
| 86 |
def safe_parse_json_from_text(text: str) -> Optional[Dict[str, Any]]:
|
| 87 |
-
"""Safely extract JSON from messy VLM output"""
|
| 88 |
if not text:
|
| 89 |
return None
|
| 90 |
try:
|
|
@@ -104,25 +72,22 @@ def safe_parse_json_from_text(text: str) -> Optional[Dict[str, Any]]:
|
|
| 104 |
return None
|
| 105 |
|
| 106 |
# -----------------------------------------------------
|
| 107 |
-
# Call MCP tool
|
| 108 |
# -----------------------------------------------------
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 109 |
def validate_and_call_tool(tool_name: str, tool_args: dict) -> Dict[str, Any]:
|
| 110 |
-
"""Use public API instead of _tools"""
|
| 111 |
try:
|
| 112 |
-
|
| 113 |
-
if hasattr(mcp, "call_tool"):
|
| 114 |
-
return mcp.call_tool(tool_name, tool_args)
|
| 115 |
-
# fallback: call the registered function directly
|
| 116 |
-
if hasattr(mcp, tool_name):
|
| 117 |
-
tool_fn = getattr(mcp, tool_name)
|
| 118 |
-
return tool_fn(**tool_args)
|
| 119 |
-
return {"error": f"Unknown tool '{tool_name}'"}
|
| 120 |
except Exception as e:
|
| 121 |
traceback.print_exc()
|
| 122 |
-
return {"error": f"
|
| 123 |
|
| 124 |
# -----------------------------------------------------
|
| 125 |
-
# Main pipeline: image → VLM → tool
|
| 126 |
# -----------------------------------------------------
|
| 127 |
def process_and_describe(payload: Dict[str, Any]) -> Dict[str, Any]:
|
| 128 |
if isinstance(payload, str):
|
|
@@ -176,7 +141,7 @@ Respond in STRICT JSON ONLY:
|
|
| 176 |
vlm_output = response.choices[0].message.content.strip()
|
| 177 |
parsed = safe_parse_json_from_text(vlm_output)
|
| 178 |
if parsed is None:
|
| 179 |
-
return {"status": "model_no_json", "robot_id": robot_id, "
|
| 180 |
|
| 181 |
tool_name = parsed.get("tool_name")
|
| 182 |
tool_args = parsed.get("arguments") or {}
|
|
@@ -185,7 +150,6 @@ Respond in STRICT JSON ONLY:
|
|
| 185 |
return {
|
| 186 |
"status": "success",
|
| 187 |
"robot_id": robot_id,
|
| 188 |
-
#"image_url": hf_url,
|
| 189 |
"file_size_bytes": size_bytes,
|
| 190 |
"vlm_description": parsed.get("description"),
|
| 191 |
"chosen_tool": tool_name,
|
|
@@ -197,7 +161,7 @@ Respond in STRICT JSON ONLY:
|
|
| 197 |
# ------------------------------
|
| 198 |
# Gradio Interface
|
| 199 |
# ------------------------------
|
| 200 |
-
|
| 201 |
fn=process_and_describe,
|
| 202 |
inputs=gr.JSON(label="Input JSON Payload (must include hf_token & image_b64)"),
|
| 203 |
outputs=gr.JSON(label="Output JSON Result"),
|
|
@@ -211,5 +175,6 @@ iface = gr.Interface(
|
|
| 211 |
if __name__ == "__main__":
|
| 212 |
print(f"[Config] HF_DATASET_REPO: {HF_DATASET_REPO}")
|
| 213 |
print(f"[Config] HF_VLM_MODEL: {HF_VLM_MODEL}")
|
|
|
|
| 214 |
print("[Gradio] Launching interface...")
|
| 215 |
-
|
|
|
|
|
|
|
| 1 |
import os
|
| 2 |
import base64
|
| 3 |
import json
|
|
|
|
| 6 |
from datetime import datetime
|
| 7 |
import traceback
|
| 8 |
from typing import Optional, Dict, Any
|
| 9 |
+
import asyncio
|
| 10 |
+
from fastmcp import Client
|
| 11 |
|
| 12 |
# --- Configuration ---
|
| 13 |
HF_DATASET_REPO = os.environ.get("HF_DATASET_REPO", "OppaAI/Robot_MCP")
|
| 14 |
HF_VLM_MODEL = os.environ.get("HF_VLM_MODEL", "Qwen/Qwen2.5-VL-7B-Instruct")
|
| 15 |
+
REMOTE_MCP_URL = os.environ.get("REMOTE_MCP_URL", "https://abidlabs-mcp-tools.hf.space/")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 16 |
|
| 17 |
# -----------------------------------------------------
|
| 18 |
# Save and upload image to HF
|
| 19 |
# -----------------------------------------------------
|
| 20 |
def save_and_upload_image(image_b64: str, hf_token: str):
|
|
|
|
| 21 |
try:
|
| 22 |
image_bytes = base64.b64decode(image_b64)
|
| 23 |
size_bytes = len(image_bytes)
|
| 24 |
|
|
|
|
| 25 |
os.makedirs("/tmp", exist_ok=True)
|
| 26 |
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
|
| 27 |
local_path = f"/tmp/robot_img_{timestamp}.jpg"
|
|
|
|
| 31 |
|
| 32 |
filename = f"robot_{timestamp}.jpg"
|
| 33 |
|
|
|
|
| 34 |
from huggingface_hub import HfApi
|
| 35 |
api = HfApi()
|
| 36 |
api.upload_file(
|
|
|
|
| 53 |
# JSON parsing helper
|
| 54 |
# -----------------------------------------------------
|
| 55 |
def safe_parse_json_from_text(text: str) -> Optional[Dict[str, Any]]:
|
|
|
|
| 56 |
if not text:
|
| 57 |
return None
|
| 58 |
try:
|
|
|
|
| 72 |
return None
|
| 73 |
|
| 74 |
# -----------------------------------------------------
|
| 75 |
+
# Call remote MCP tool asynchronously
|
| 76 |
# -----------------------------------------------------
|
| 77 |
+
async def call_remote_tool(tool_name: str, **kwargs):
|
| 78 |
+
async with Client(REMOTE_MCP_URL) as client:
|
| 79 |
+
result = await client.call_tool(tool_name, **kwargs)
|
| 80 |
+
return result
|
| 81 |
+
|
| 82 |
def validate_and_call_tool(tool_name: str, tool_args: dict) -> Dict[str, Any]:
|
|
|
|
| 83 |
try:
|
| 84 |
+
return asyncio.run(call_remote_tool(tool_name, **tool_args))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 85 |
except Exception as e:
|
| 86 |
traceback.print_exc()
|
| 87 |
+
return {"error": f"Remote tool execution error: {str(e)}"}
|
| 88 |
|
| 89 |
# -----------------------------------------------------
|
| 90 |
+
# Main pipeline: image → VLM → remote tool
|
| 91 |
# -----------------------------------------------------
|
| 92 |
def process_and_describe(payload: Dict[str, Any]) -> Dict[str, Any]:
|
| 93 |
if isinstance(payload, str):
|
|
|
|
| 141 |
vlm_output = response.choices[0].message.content.strip()
|
| 142 |
parsed = safe_parse_json_from_text(vlm_output)
|
| 143 |
if parsed is None:
|
| 144 |
+
return {"status": "model_no_json", "robot_id": robot_id, "vlm_raw": vlm_output, "message": "VLM returned invalid JSON"}
|
| 145 |
|
| 146 |
tool_name = parsed.get("tool_name")
|
| 147 |
tool_args = parsed.get("arguments") or {}
|
|
|
|
| 150 |
return {
|
| 151 |
"status": "success",
|
| 152 |
"robot_id": robot_id,
|
|
|
|
| 153 |
"file_size_bytes": size_bytes,
|
| 154 |
"vlm_description": parsed.get("description"),
|
| 155 |
"chosen_tool": tool_name,
|
|
|
|
| 161 |
# ------------------------------
|
| 162 |
# Gradio Interface
|
| 163 |
# ------------------------------
|
| 164 |
+
app = gr.Interface(
|
| 165 |
fn=process_and_describe,
|
| 166 |
inputs=gr.JSON(label="Input JSON Payload (must include hf_token & image_b64)"),
|
| 167 |
outputs=gr.JSON(label="Output JSON Result"),
|
|
|
|
| 175 |
if __name__ == "__main__":
|
| 176 |
print(f"[Config] HF_DATASET_REPO: {HF_DATASET_REPO}")
|
| 177 |
print(f"[Config] HF_VLM_MODEL: {HF_VLM_MODEL}")
|
| 178 |
+
print(f"[Config] REMOTE_MCP_URL: {REMOTE_MCP_URL}")
|
| 179 |
print("[Gradio] Launching interface...")
|
| 180 |
+
app.launch(server_name="0.0.0.0", server_port=7860)
|