# Source: Hugging Face Space by OppaAI — app.py (commit 3dffc39, verified)
import os
import base64
import json
from datetime import datetime
import traceback
import gradio as gr
from huggingface_hub import HfApi, InferenceClient
from pydantic import BaseModel, Field
# -------------------------------
# Environment variables / Constants
# -------------------------------
HF_DATASET_REPO = os.environ.get("HF_DATASET_REPO", "OppaAI/Robot_MCP")
HF_VLM_MODEL = os.environ.get("HF_VLM_MODEL", "Qwen/Qwen2.5-VL-7B-Instruct")
# -------------------------------
# Pydantic schema for the tool payload
# -------------------------------
class RobotWatchPayload(BaseModel):
    """
    Input schema for the robot VLM analysis tool.

    Attributes:
        hf_token: Hugging Face API token used for upload and inference.
        robot_id: Identifier of the reporting robot; defaults to "unknown".
        image_b64: Base64-encoded image to analyze.
    """
    hf_token: str = Field(description="Your Hugging Face API token.")
    robot_id: str = Field(default="unknown", description="Robot identifier.")
    image_b64: str = Field(description="Base64 encoded image data.")
# -------------------------------
# Helper function: Upload image to Hugging Face dataset
# -------------------------------
def upload_image(image_b64: str, hf_token: str):
    """
    Decode a base64 image, save it under /tmp, and upload it to the HF dataset repo.

    Args:
        image_b64 (str): Base64 encoded image data.
        hf_token (str): Hugging Face API token with write access to HF_DATASET_REPO.

    Returns:
        tuple: (local_path, hf_url, filename, size_bytes) on success, or
        (None, None, None, 0) on any failure (decode, disk, or upload).
    """
    try:
        image_bytes = base64.b64decode(image_b64)
        os.makedirs("/tmp", exist_ok=True)
        # Microsecond-resolution timestamp keeps filenames unique across rapid calls.
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
        local_path = f"/tmp/robot_img_{timestamp}.jpg"
        # Save locally before uploading.
        with open(local_path, "wb") as f:
            f.write(image_bytes)
        filename = f"robot_{timestamp}.jpg"
        # BUG FIX: the repo path and URL previously used a literal "(unknown)"
        # placeholder instead of the generated filename, so every upload
        # overwrote the same repo file and the returned URL never pointed at
        # the uploaded object. Use the unique filename in both places.
        api = HfApi()
        api.upload_file(
            path_or_fileobj=local_path,
            path_in_repo=f"tmp/{filename}",
            repo_id=HF_DATASET_REPO,
            repo_type="dataset",
            token=hf_token
        )
        hf_url = f"https://huggingface.co/datasets/{HF_DATASET_REPO}/resolve/main/tmp/{filename}"
        return local_path, hf_url, filename, len(image_bytes)
    except Exception:
        # Best-effort: log the traceback and report failure via the sentinel tuple.
        traceback.print_exc()
        return None, None, None, 0
# -------------------------------
# Helper function: Parse JSON safely
# -------------------------------
def safe_parse_json_from_text(text: str):
    """
    Best-effort JSON extraction from raw VLM output.

    Handles clean JSON, markdown code fences (```json ... ```), and JSON
    embedded in surrounding prose by slicing from the first "{" to the
    last "}".

    Args:
        text (str): Raw text output from the model.

    Returns:
        dict or None: Parsed JSON, or None if nothing parseable is found.
    """
    if not text:
        return None
    # Fast path: the model returned clean JSON.
    try:
        return json.loads(text)
    # Narrowed from a bare `except:` — json.JSONDecodeError is a ValueError.
    except ValueError:
        pass
    # Strip markdown code fences and an optional leading "json" language tag.
    cleaned = text.strip().strip("`").strip()
    if cleaned.lower().startswith("json"):
        cleaned = cleaned[4:].strip()
    # Fall back to the outermost {...} span; bail out early if no braces exist
    # (previously this produced a nonsense slice that only failed inside loads).
    start = cleaned.find("{")
    end = cleaned.rfind("}")
    if start == -1 or end <= start:
        return None
    try:
        return json.loads(cleaned[start:end + 1])
    except ValueError:
        return None
# -------------------------------
# Core VLM analysis function
# -------------------------------
def run_vlm_analysis(payload: RobotWatchPayload):
    """
    Analyze a robot camera image with the configured Hugging Face VLM.

    Uploads the frame to the dataset repo first (so the result always
    references a persisted copy), then asks the VLM for a structured
    scene description in JSON.

    Args:
        payload (RobotWatchPayload): Validated payload with token, robot_id, and image.

    Returns:
        dict: On success: status, robot_id, file size, image URL, parsed
        result, and raw VLM text. On failure: an error dict.
    """
    hf_token = payload.hf_token
    image_b64 = payload.image_b64
    robot_id = payload.robot_id
    # Upload the image to the Hugging Face dataset.
    _, hf_url, _, size_bytes = upload_image(image_b64, hf_token)
    if not hf_url:
        return {"error": "Image upload failed"}
    # System prompt instructs the VLM to return strict JSON.
    # BUG FIX: the example template was itself invalid JSON (missing comma
    # after "lighting_condition", trailing comma after "hazards"), which
    # contradicted the "STRICT JSON ONLY" instruction and invited malformed
    # replies. The template below is valid JSON.
    system_prompt = """
Respond in STRICT JSON ONLY. Put more details in Description. Ensure all the fields are never empty; list general items if specific ones are not clear.
{
  "description": "...",
  "environment": "...",
  "indoor_or_outdoor": "...",
  "lighting_condition": "...",
  "human": "...",
  "animals": "...",
  "objects": [],
  "hazards": "..."
}
"""
    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": [
            {"type": "text", "text": "Analyze the image."},
            # The image is sent inline as a data URL, not via the uploaded hf_url.
            {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{image_b64}"}}
        ]}
    ]
    client = InferenceClient(token=hf_token)
    try:
        resp = client.chat.completions.create(
            model=HF_VLM_MODEL,
            messages=messages,
            max_tokens=500,
            temperature=0.1  # low temperature for stable, parseable output
        )
    except Exception as e:
        # Surface inference failures to the caller instead of crashing the UI.
        return {"status": "error", "message": str(e)}
    vlm_output = resp.choices[0].message.content.strip()
    # Tolerate imperfect model output; fall back to an empty dict.
    parsed = safe_parse_json_from_text(vlm_output) or {}
    return {
        "status": "success",
        "robot_id": robot_id,
        "file_size_bytes": size_bytes,
        "image_url": hf_url,
        "result": parsed,
        "vlm_raw": vlm_output
    }
# -------------------------------
# Gradio interface function
# -------------------------------
def robot_watch(
    hf_token_input: str,
    robot_id_input: str,
    image_b64_input: str
):
    """
    Gradio-facing wrapper around run_vlm_analysis.

    Packs the three UI text fields into a RobotWatchPayload and delegates
    to the core analysis routine.

    Args:
        hf_token_input (str): Hugging Face API token from the UI.
        robot_id_input (str): Robot ID from the UI.
        image_b64_input (str): Base64 image string from the UI.

    Returns:
        dict: Result from run_vlm_analysis, or an error dict for empty input.
    """
    # Guard clause: nothing to analyze without image data.
    if not image_b64_input:
        return {"error": "Base64 image string is empty."}
    # Build the validated payload inline and hand it straight to the core logic.
    return run_vlm_analysis(
        RobotWatchPayload(
            hf_token=hf_token_input,
            robot_id=robot_id_input,
            image_b64=image_b64_input
        )
    )
# -------------------------------
# Gradio App
# -------------------------------
# Gradio UI: three text inputs in, one JSON blob out. With mcp_server=True at
# launch (below), this same interface is also exposed as an MCP tool endpoint.
app = gr.Interface(
    fn=robot_watch,
    inputs=[
        gr.Textbox(label="Hugging Face Token", lines=1),
        # Default mirrors RobotWatchPayload.robot_id's default of "unknown".
        gr.Textbox(label="Robot ID", lines=1, value="unknown"),
        gr.Textbox(label="Image Base64 String", lines=5)
    ],
    outputs=gr.Json(label="Tool Output"),
    title="Robot CV MCP Server",
    description="Interface for robot VLM analysis using individual fields, including base64 image string.",
    api_name="predict"
)
if __name__ == "__main__":
    # Launch Gradio app with MCP server enabled
    app.launch(mcp_server=True)