import os
import base64
import json
import threading
import traceback
from datetime import datetime
from typing import Optional, Dict, Any

import gradio as gr
from huggingface_hub import HfApi, InferenceClient
from fastmcp import FastMCP
# --- Configuration ---
HF_DATASET_REPO = os.environ.get("HF_DATASET_REPO", "OppaAI/Robot_MCP")
HF_VLM_MODEL = os.environ.get("HF_VLM_MODEL", "Qwen/Qwen2.5-VL-7B-Instruct")
# Create MCP server
mcp = FastMCP("Robot_MCP_Server")
# -----------------------------------------------------
# Save and upload image to HF
# -----------------------------------------------------
def upload_image(image_b64: str, hf_token: str):
    """Decode a base64 image, save it locally, and upload it to the HF dataset repo."""
    try:
        image_bytes = base64.b64decode(image_b64)
        size_bytes = len(image_bytes)
        os.makedirs("/tmp", exist_ok=True)
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
        local_path = f"/tmp/robot_img_{timestamp}.jpg"
        with open(local_path, "wb") as f:
            f.write(image_bytes)
        filename = f"robot_{timestamp}.jpg"
        api = HfApi()
        api.upload_file(
            path_or_fileobj=local_path,
            path_in_repo=f"tmp/{filename}",
            repo_id=HF_DATASET_REPO,
            repo_type="dataset",
            token=hf_token,
        )
        # Public "resolve" URL for the uploaded file
        url = f"https://huggingface.co/datasets/{HF_DATASET_REPO}/resolve/main/tmp/{filename}"
        return local_path, url, filename, size_bytes
    except Exception as e:
        print(f"[Error] during image upload: {e}")
        traceback.print_exc()
        return None, None, None, 0
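
# Illustrative call (values are hypothetical; requires an HF token with write
# access to the dataset repo). The tuple order mirrors the return statement:
#   local_path, url, filename, size = upload_image(b64_jpeg, "hf_xxx")
#   # -> ("/tmp/robot_img_20240101_120000_000000.jpg",
#   #     "https://huggingface.co/datasets/OppaAI/Robot_MCP/resolve/main/tmp/robot_20240101_120000_000000.jpg",
#   #     "robot_20240101_120000_000000.jpg",
#   #     12345)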
# -----------------------------------------------------
# JSON parsing helper
# -----------------------------------------------------
def safe_parse_json_from_text(text: str) -> Optional[Dict[str, Any]]:
    """Best-effort extraction of a JSON object from raw model output."""
    if not text:
        return None
    # Fast path: the text is already valid JSON
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        pass
    # Strip markdown code fences and an optional leading "json" language tag
    cleaned = text.strip().strip("`").strip()
    if cleaned.lower().startswith("json"):
        cleaned = cleaned[4:].strip()
    # Fallback: parse the outermost {...} span
    try:
        start = cleaned.find("{")
        end = cleaned.rfind("}")
        if start >= 0 and end > start:
            return json.loads(cleaned[start:end + 1])
    except json.JSONDecodeError:
        return None
    return None
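
# Example of the fallback path: a model reply wrapped in a ```json fence is
# stripped of backticks and its leading "json" tag, then the outermost {...}
# span is parsed:
#   safe_parse_json_from_text('```json\n{"description": "a desk"}\n```')
#   # -> {"description": "a desk"}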
# -----------------------------------------------------
# MCP Tool: image → VLM → structured JSON
# -----------------------------------------------------
@mcp.tool()
def robot_watch(payload: Dict[str, Any]) -> Dict[str, Any]:
    """Upload a robot camera frame to HF, describe it with a VLM, and return structured JSON."""
    # Some MCP clients send the payload as a JSON string; normalize to a dict
    if isinstance(payload, str):
        try:
            payload = json.loads(payload)
        except json.JSONDecodeError:
            return {"error": "Invalid JSON payload"}
    hf_token = payload.get("hf_token")
    if not hf_token:
        return {"error": "hf_token missing"}
    robot_id = payload.get("robot_id", "unknown")
    image_b64 = payload.get("image_b64")
    if not image_b64:
        return {"error": "image_b64 missing"}

    # 1. Save locally and upload to the HF dataset repo
    _, hf_url, _, size_bytes = upload_image(image_b64, hf_token)
    if not hf_url:
        return {"error": "Image upload failed"}

    # 2. Build the VLM prompt
    system_prompt = """
Respond in STRICT JSON ONLY.
Output format:
{
  "description": "...",
  "human": "...",
  "environment": "..."
}
"""
    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": [
            {"type": "text", "text": "Analyze the image and provide the description."},
            {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{image_b64}"}}
        ]}
    ]

    # 3. Call the VLM through the HF Inference API
    client = InferenceClient(token=hf_token)
    try:
        response = client.chat.completions.create(
            model=HF_VLM_MODEL,
            messages=messages,
            max_tokens=300,
            temperature=0.1,
        )
    except Exception as e:
        return {"status": "error", "message": f"Inference API call failed: {e}"}

    # 4. Parse the model output into structured fields
    vlm_output = response.choices[0].message.content.strip()
    parsed = safe_parse_json_from_text(vlm_output)
    if parsed is None:
        return {
            "status": "model_no_json",
            "robot_id": robot_id,
            "vlm_raw": vlm_output,
            "message": "VLM returned invalid JSON"
        }
    return {
        "status": "success",
        "robot_id": robot_id,
        "file_size_bytes": size_bytes,
        "image_url": hf_url,
        "description": parsed.get("description"),
        "human": parsed.get("human"),
        "environment": parsed.get("environment"),
        "vlm_raw": vlm_output
    }
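
# Expected payload shape (illustrative values):
#   {
#     "hf_token": "hf_xxx",           # required: HF token with dataset write access
#     "robot_id": "robot_01",         # optional, defaults to "unknown"
#     "image_b64": "<base64 JPEG>"    # required: the camera frame to analyze
#   }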
# -----------------------------------------------------
# Gradio Interface wrapper
# -----------------------------------------------------
def process_and_describe(payload):
    """Thin Gradio wrapper so the same tool logic backs the HTTP API endpoint."""
    return robot_watch(payload)

app = gr.Interface(
    fn=process_and_describe,
    inputs=gr.JSON(label="Input JSON Payload (must include hf_token & image_b64)"),
    outputs=gr.JSON(label="Output JSON Result"),
    api_name="predict",
    flagging_mode="never",
)
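
# Minimal client-side sketch using the official gradio_client library. The
# Space id below is an assumption taken from the HF_DATASET_REPO default;
# substitute the id the app is actually deployed under:
#   from gradio_client import Client
#   client = Client("OppaAI/Robot_MCP")
#   result = client.predict(
#       {"hf_token": "hf_xxx", "robot_id": "robot_01", "image_b64": "<base64 JPEG>"},
#       api_name="/predict",
#   )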
# -----------------------------------------------------
# Entry
# -----------------------------------------------------
if __name__ == "__main__":
    # FastMCP's run() blocks the calling thread, so start the MCP server in a
    # daemon thread and keep Gradio in the foreground. (Assumption: the default
    # transport is appropriate here; pass transport kwargs to mcp.run if not.)
    print("[MCP] Robot MCP Server starting...")
    threading.Thread(target=mcp.run, daemon=True).start()
    print("[Gradio] Launching interface...")
    app.launch()