OppaAI's picture
Update app.py
906625a verified
raw
history blame
5 kB
import os
import base64
import json
from datetime import datetime
import traceback
from typing import Dict, Any
import gradio as gr
from huggingface_hub import HfApi, InferenceClient
from fastmcp import FastMCP
from pydantic import BaseModel, Field # Import Pydantic BaseModel and Field
HF_DATASET_REPO = os.environ.get("HF_DATASET_REPO", "OppaAI/Robot_MCP")
HF_VLM_MODEL = os.environ.get("HF_VLM_MODEL", "Qwen/Qwen2.5-VL-7B-Instruct")
mcp = FastMCP("Robot_MCP_Server") # <-- Important
# ---------------------------------------------------
# Define Pydantic Schema for the input payload
# ---------------------------------------------------
# This defines the expected structure and automatically generates the valid JSON schema
class RobotWatchPayload(BaseModel):
hf_token: str = Field(description="Your Hugging Face API token.")
robot_id: str = Field(description="The unique identifier for the robot.", default="unknown")
image_b64: str = Field(description="Base64 encoded image data.")
def upload_image(image_b64: str, hf_token: str):
try:
image_bytes = base64.b64decode(image_b64)
os.makedirs("/tmp", exist_ok=True)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
local_path = f"/tmp/robot_img_{timestamp}.jpg"
with open(local_path, "wb") as f:
f.write(image_bytes)
filename = f"robot_{timestamp}.jpg"
api = HfApi()
api.upload_file(
path_or_fileobj=local_path,
path_in_repo=f"tmp/{filename}",
repo_id=HF_DATASET_REPO,
repo_type="dataset",
token=hf_token
)
url = f"https://huggingface.co/datasets/{HF_DATASET_REPO}/resolve/main/tmp/{filename}"
return local_path, url, filename, len(image_bytes)
except Exception:
traceback.print_exc()
return None, None, None, 0
def safe_parse_json_from_text(text: str):
if not text:
return None
try:
return json.loads(text)
except:
pass
cleaned = text.strip().strip("`").strip()
if cleaned.lower().startswith("json"):
cleaned = cleaned[4:].strip()
try:
start = cleaned.find("{")
end = cleaned.rfind("}")
return json.loads(cleaned[start:end + 1])
except:
return None
# ---------------------------------------------------
# TRUE MCP TOOL — THIS must be exposed to MCP client
# ---------------------------------------------------
@mcp.tool(
name="robot_watch",
description="Analyze a base64 image using Qwen VLM and return structured JSON.",
)
def robot_watch(payload: RobotWatchPayload): # <-- Type hint with Pydantic model
# The payload is already validated and typed correctly by fastmcp/pydantic
hf_token = payload.hf_token
image_b64 = payload.image_b64
robot_id = payload.robot_id
if not hf_token:
# This check is technically redundant if the schema demands it, but safe.
return {"error": "Missing hf_token"}
# image_b64 existence is also guaranteed by the schema
_, hf_url, _, size_bytes = upload_image(image_b64, hf_token)
if not hf_url:
return {"error": "Image upload failed"}
system_prompt = """
Respond in STRICT JSON ONLY:
{
"description": "...",
"human": "...",
"environment": "...",
"objects": []
}
"""
messages = [
{"role": "system", "content": system_prompt},
{"role": "user", "content": [
{"type": "text", "text": "Analyze the image."},
{"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{image_b64}"}}
]}
]
client = InferenceClient(token=hf_token)
try:
resp = client.chat.completions.create(
model=HF_VLM_MODEL,
messages=messages,
max_tokens=500,
temperature=0.1
)
except Exception as e:
return {"status": "error", "message": str(e)}
vlm_output = resp.choices[0].message.content.strip()
parsed = safe_parse_json_from_text(vlm_output) or {}
return {
"status": "success",
"robot_id": robot_id,
"file_size_bytes": size_bytes,
"image_url": hf_url,
"result": parsed,
"vlm_raw": vlm_output
}
# ---------------------------------------------------
# Gradio UI — separate from MCP tool layer
# ---------------------------------------------------
# The process_json function will still work with the Pydantic model input
def process_json(payload: Dict[str, Any]):
# When called via Gradio UI (not MCP), input will be Dict, so handle type conversion
pydantic_payload = RobotWatchPayload(**payload)
return robot_watch(pydantic_payload)
app = gr.Interface(
fn=process_json,
inputs=gr.JSON(),
outputs=gr.JSON(),
title="Robot MCP Server",
api_name="predict"
)
if __name__ == "__main__":
# Gradio will use the 'mcp' instance defined globally to host the MCP server endpoints
app.launch(mcp_server=True)