OppaAI's picture
Update app.py
8c3dcd1 verified
raw
history blame
3.8 kB
import os
import base64
import json
from datetime import datetime
import traceback
from typing import Optional, Dict, Any
import gradio as gr
from huggingface_hub import HfApi, InferenceClient
from fastmcp import FastMCP
HF_DATASET_REPO = os.environ.get("HF_DATASET_REPO", "OppaAI/Robot_MCP")
HF_VLM_MODEL = os.environ.get("HF_VLM_MODEL", "Qwen/Qwen2.5-VL-7B-Instruct")
mcp = FastMCP("Robot_MCP_Server")
# -------------------------------
# Upload helper
# -------------------------------
def upload_image(image_b64: str, hf_token: str):
    """Decode a base64 image, save it to /tmp, and upload it to the HF dataset repo.

    Parameters
    ----------
    image_b64 : str
        Base64-encoded JPEG image data.
    hf_token : str
        Hugging Face token with write access to ``HF_DATASET_REPO``.

    Returns
    -------
    tuple
        ``(local_path, url, filename, size_bytes)`` on success, or
        ``(None, None, None, 0)`` if decoding or uploading fails.
    """
    try:
        image_bytes = base64.b64decode(image_b64)
        size_bytes = len(image_bytes)
        os.makedirs("/tmp", exist_ok=True)
        # Microsecond-resolution timestamp keeps concurrent uploads from colliding.
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
        local_path = f"/tmp/robot_img_{timestamp}.jpg"
        with open(local_path, "wb") as f:
            f.write(image_bytes)
        filename = f"robot_{timestamp}.jpg"
        api = HfApi()
        # BUG FIX: the repo path and public URL previously contained the literal
        # placeholder "(unknown)" instead of the generated filename, so every
        # upload clobbered the same repo file and the returned URL was wrong.
        api.upload_file(
            path_or_fileobj=local_path,
            path_in_repo=f"tmp/{filename}",
            repo_id=HF_DATASET_REPO,
            repo_type="dataset",
            token=hf_token,
        )
        url = f"https://huggingface.co/datasets/{HF_DATASET_REPO}/resolve/main/tmp/{filename}"
        return local_path, url, filename, size_bytes
    except Exception:
        # Best-effort helper: log the failure and signal it via the sentinel tuple
        # so the caller (robot_watch) can report "Image upload failed".
        traceback.print_exc()
        return None, None, None, 0
# -------------------------------
# Safe JSON parse
# -------------------------------
def safe_parse_json_from_text(text: str):
    """Best-effort extraction of a JSON value from raw model output.

    Tries, in order: a direct parse, stripping markdown code fences
    (``` / ```json), and finally parsing the substring between the first
    ``{`` and the last ``}``.

    Parameters
    ----------
    text : str
        Raw model output that may contain a JSON object.

    Returns
    -------
    object | None
        The parsed JSON value, or ``None`` if no valid JSON could be found.
    """
    if not text:
        return None
    try:
        return json.loads(text)
    except (json.JSONDecodeError, TypeError):
        pass
    # Strip markdown code fences such as ```json ... ``` and a leading
    # "json" language tag left behind after removing the backticks.
    cleaned = text.strip().strip("`").strip()
    if cleaned.lower().startswith("json"):
        cleaned = cleaned[4:].strip()
    start = cleaned.find("{")
    end = cleaned.rfind("}")
    if start == -1 or end < start:
        # No brace-delimited object present; previously this produced a
        # garbage slice that only failed inside a bare except.
        return None
    try:
        return json.loads(cleaned[start:end + 1])
    except json.JSONDecodeError:
        return None
# -------------------------------
# MCP TOOL
# -------------------------------
@mcp.tool()
def robot_watch(payload: Dict[str, Any]):
if isinstance(payload, str):
try:
payload = json.loads(payload)
except:
return {"error": "Invalid JSON payload"}
hf_token = payload.get("hf_token")
if not hf_token:
return {"error": "hf_token missing"}
robot_id = payload.get("robot_id", "unknown")
image_b64 = payload.get("image_b64")
if not image_b64:
return {"error": "image_b64 missing"}
# Upload image
_, hf_url, _, size_bytes = upload_image(image_b64, hf_token)
if not hf_url:
return {"error": "Image upload failed"}
# VLM call
system_prompt = """
Respond in STRICT JSON ONLY.
{
"description": "...",
"human": "...",
"environment": "..."
}
"""
messages = [
{"role": "system", "content": system_prompt},
{"role": "user", "content": [
{"type": "text", "text": "Analyze the image."},
{"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{image_b64}"}}
]}
]
client = InferenceClient(token=hf_token)
try:
resp = client.chat.completions.create(
model=HF_VLM_MODEL,
messages=messages,
max_tokens=300,
temperature=0.1
)
except Exception as e:
return {"status": "error", "message": str(e)}
vlm_output = resp.choices[0].message.content.strip()
parsed = safe_parse_json_from_text(vlm_output)
if parsed is None:
return {
"status": "model_no_json",
"vlm_raw": vlm_output,
"message": "Invalid JSON returned"
}
return {
"status": "success",
"robot_id": robot_id,
"file_size_bytes": size_bytes,
"image_url": hf_url,
"de