# app.py
import base64
import json
import traceback
from datetime import datetime

import gradio as gr
from fastmcp import FastMCP
from huggingface_hub import upload_file, InferenceClient

HF_DATASET_REPO = "OppaAI/Robot_MCP"
HF_VLM_MODEL = "Qwen/Qwen2.5-VL-7B-Instruct"

mcp = FastMCP("Robot_MCP")
# -----------------------------------------------------
# Register Robot Tools (MCP)
# -----------------------------------------------------
@mcp.tool()
def speak(text: str, emotion: str = "neutral"):
    """Robot speech output"""
    return {
        "status": "success",
        "action_executed": "speak",
        "payload": {"text": text, "emotion": emotion},
    }


@mcp.tool()
def navigate(direction: str, distance_meters: float):
    """Move robot safely"""
    if distance_meters > 5.0:
        return {"status": "error", "message": "Safety limit exceeded"}
    return {
        "status": "success",
        "action_executed": "navigate",
        "payload": {"direction": direction, "distance": distance_meters},
    }


@mcp.tool()
def scan_hazard(hazard_type: str, severity: str):
    """Hazard scan + log"""
    timestamp = datetime.now().isoformat()
    return {
        "status": "warning_logged",
        "log": f"[{timestamp}] HAZARD: {hazard_type} (Severity: {severity})",
    }


@mcp.tool()
def analyze_human(clothing_color: str, estimated_action: str):
    """Human detection description"""
    return {
        "status": "human_tracked",
        "details": f"Human wearing {clothing_color} is {estimated_action}",
    }
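
# Illustrative direct calls (an assumption: the @mcp.tool() decorator returns
# the plain function, so the decorated callables stay directly usable here):
#   navigate("forward", 2.0)  -> {"status": "success", ...}
#   navigate("forward", 9.0)  -> {"status": "error", "message": "Safety limit exceeded"}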
# -----------------------------------------------------
# Save and Upload Image
# -----------------------------------------------------
def save_and_upload_image(image_b64: str, hf_token: str):
    """Decode a base64 image, write it to /tmp, and upload it to the HF dataset repo.

    Returns (local_path, url, filename, size_bytes), or (None, None, None, 0) on failure.
    """
    try:
        image_bytes = base64.b64decode(image_b64)
        size_bytes = len(image_bytes)
        print("[debug] decoded image bytes:", size_bytes)

        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
        local_path = f"/tmp/robot_img_{timestamp}.jpg"
        with open(local_path, "wb") as f:
            f.write(image_bytes)
        print("[debug] wrote local tmp file:", local_path)

        filename = f"robot_{timestamp}.jpg"
        upload_file(
            path_or_fileobj=local_path,
            path_in_repo=filename,
            repo_id=HF_DATASET_REPO,
            token=hf_token,
            repo_type="dataset",
        )
        print("[debug] upload successful:", filename)

        url = f"https://huggingface.co/datasets/{HF_DATASET_REPO}/resolve/main/{filename}"
        return local_path, url, filename, size_bytes
    except Exception:
        traceback.print_exc()
        return None, None, None, 0
# -----------------------------------------------------
# JSON Parsing Helper
# -----------------------------------------------------
def safe_parse_json_from_text(text: str):
    """Best-effort extraction of a JSON object from raw model output."""
    if not text:
        return None
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        pass
    # Fall back to the outermost {...} span (handles ```json fenced replies).
    cleaned = text.strip().strip("`")
    start = cleaned.find("{")
    end = cleaned.rfind("}")
    if start >= 0 and end > start:
        try:
            return json.loads(cleaned[start : end + 1])
        except json.JSONDecodeError:
            pass
    return None
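
# Example (illustrative) of the fenced-output fallback:
#   safe_parse_json_from_text('```json\n{"tool_name": "speak"}\n```')
#   -> {"tool_name": "speak"}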
# -----------------------------------------------------
# Only allow tools from MCP registry
# -----------------------------------------------------
def validate_and_call_tool(tool_name: str, tool_args: dict):
    """Dispatch only tool names registered on the FastMCP instance."""
    if tool_name not in mcp.tools:
        return {"error": f"Unknown or unauthorized tool '{tool_name}'"}
    try:
        return mcp.tools[tool_name](**tool_args)
    except Exception as e:
        traceback.print_exc()
        return {"error": f"Tool error: {str(e)}"}
# -----------------------------------------------------
# Main Pipeline
# -----------------------------------------------------
def process_and_describe(payload):
    """Full request pipeline: upload the image, query the VLM, dispatch the chosen tool."""
    if isinstance(payload, str):
        try:
            payload = json.loads(payload)
        except json.JSONDecodeError:
            return {"error": "Invalid JSON payload"}

    print("\n========== NEW REQUEST ==========")
    print("[debug] Incoming payload:", payload)

    hf_token = payload.get("hf_token")
    if not hf_token:
        return {"error": "hf_token missing"}
    robot_id = payload.get("robot_id", "unknown")
    image_b64 = payload.get("image_b64")
    if not image_b64:
        return {"error": "image_b64 missing"}

    # Save + Upload
    local_tmp_path, hf_url, filename, size_bytes = save_and_upload_image(
        image_b64, hf_token
    )
    if not hf_url:
        return {"error": "Image upload failed"}
print("[debug] HF image URL:", hf_url)
# VLM SYSTEM PROMPT
system_prompt = """
Respond in STRICT JSON ONLY. Format:
{
"description": "short visual description",
"tool_name": "one of: speak, navigate, scan_hazard, analyze_human",
"arguments": { ... }
}
"""
messages = [
{"role": "system", "content": system_prompt},
{
"role": "user",
"content": [
{"type": "text", "text": "Analyze the image and choose ONE tool."},
{
"type": "image_url",
"image_url": {"url": f"data:image/jpeg;base64,{image_b64}"},
},
],
},
]
    # VLM CALL
    print("[debug] Calling VLM model...")
    client = InferenceClient(token=hf_token)
    try:
        response = client.chat.completions.create(
            model=HF_VLM_MODEL,
            messages=messages,
            max_tokens=300,
            temperature=0.1,
        )
    except Exception as e:
        traceback.print_exc()
        return {"error": f"VLM call failed: {e}"}
    vlm_output = response.choices[0].message.content.strip()

    print("\n------ VLM RAW OUTPUT ------")
    print(vlm_output)
    print("------ END VLM RAW ------\n")
    parsed = safe_parse_json_from_text(vlm_output)
    if parsed is None:
        return {
            "status": "model_no_json",
            "robot_id": robot_id,
            "image_url": hf_url,
            "vlm_raw": vlm_output,
            "message": "VLM returned invalid JSON",
        }

    tool_name = parsed.get("tool_name")
    tool_args = parsed.get("arguments") or {}
    tool_result = validate_and_call_tool(tool_name, tool_args)

    return {
        "status": "success",
        "robot_id": robot_id,
        "image_url": hf_url,
        "file_size_bytes": size_bytes,
        "vlm_description": parsed.get("description"),
        "chosen_tool": tool_name,
        "tool_arguments": tool_args,
        "tool_execution_result": tool_result,
        "vlm_raw": vlm_output,
    }
# ------------------------------
# Gradio Interface
# ------------------------------
iface = gr.Interface(
    fn=process_and_describe,
    inputs=gr.JSON(label="Input JSON"),
    outputs=gr.JSON(label="Output JSON"),
    api_name="predict",
    flagging_mode="never",
)
# ------------------------------
# Main Entry
# ------------------------------
if __name__ == "__main__":
    print("[Gradio] Launching interface...")
    iface.launch(server_name="0.0.0.0", server_port=7860)
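
# ------------------------------
# Example client call (illustrative sketch; "your-space-id" and the local
# image path are placeholders, not part of this app)
# ------------------------------
#   from gradio_client import Client
#   import base64
#
#   client = Client("your-space-id")
#   with open("frame.jpg", "rb") as f:
#       image_b64 = base64.b64encode(f.read()).decode()
#   result = client.predict(
#       {"hf_token": "hf_...", "robot_id": "robot_01", "image_b64": image_b64},
#       api_name="/predict",
#   )
#   print(result)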