import base64
import json

import gradio as gr
from huggingface_hub import InferenceClient, upload_file
# --- Config ---
HF_DATASET_REPO = "OppaAI/Robot_MCP"          # dataset repo that receives uploaded frames
HF_VLM_MODEL = "Qwen/Qwen2.5-VL-7B-Instruct"  # vision-language model queried via the Inference API
# --- Helper Functions ---
def save_and_upload_image(image_b64, hf_token):
    """Decode a base64 image, save it to /tmp, and upload it to the HF dataset."""
    image_bytes = base64.b64decode(image_b64)
    local_tmp_path = "/tmp/tmp.jpg"
    with open(local_tmp_path, "wb") as f:
        f.write(image_bytes)
    # Every call writes to the same path, so each upload overwrites the previous frame.
    path_in_repo = "images/tmp.jpg"
    upload_file(
        path_or_fileobj=local_tmp_path,
        path_in_repo=path_in_repo,
        repo_id=HF_DATASET_REPO,
        token=hf_token,
        repo_type="dataset",
    )
    hf_image_url = f"https://huggingface.co/datasets/{HF_DATASET_REPO}/resolve/main/{path_in_repo}"
    return local_tmp_path, hf_image_url, path_in_repo, len(image_bytes)
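
# A minimal usage sketch of the helper above, assuming a JPEG on disk and a
# placeholder token; "frame.jpg" and "hf_..." are illustrative only:
#
#   with open("frame.jpg", "rb") as f:
#       b64 = base64.b64encode(f.read()).decode("utf-8")
#   _, url, repo_path, size = save_and_upload_image(b64, "hf_...")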
# --- Main MCP function ---
def process_and_describe(payload: dict):
    try:
        hf_token = payload.get("hf_token")
        if not hf_token:
            return {"error": "HF token not provided in payload."}
        robot_id = payload.get("robot_id", "unknown")
        image_b64 = payload.get("image_b64")
        if not image_b64:
            return {"error": "No image provided."}
        # Save the frame locally and upload it to the dataset repo
        local_tmp_path, hf_url, path_in_repo, size_bytes = save_and_upload_image(image_b64, hf_token)
        # Init the HF Inference client with the caller's token
        hf_client = InferenceClient(token=hf_token)
        # System prompt: describe + suggest action
system_prompt = """
You are a helpful robot assistant.
1. Describe the image in detail.
2. Suggest what the robot should do next based on what it sees:
- Human figure β†’ say 'Hi'.
- Ball β†’ move towards it.
- Obstacles β†’ stop or avoid.
- Red button β†’ press it.
Always respond in JSON:
{"description": "...", "action": {"move": "...", "interact": "..."}}
"""
        messages_payload = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": [
                {"type": "text", "text": "Here is an image."},
                {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{image_b64}"}}
            ]}
        ]
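        # Note: the frame is passed inline as a base64 data URI, so the VLM
        # reads it directly rather than fetching the uploaded HF URL.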
        # Call the VLM
        chat_completion = hf_client.chat.completions.create(
            model=HF_VLM_MODEL,
            messages=messages_payload,
            max_tokens=300
        )
        # Robustly extract the reply text
        try:
            vlm_text = chat_completion.choices[0].message.content.strip()
        except Exception:
            # Fallback if the response structure differs
            vlm_text = str(chat_completion)
        # Attempt to parse JSON from the VLM reply
        action_data = {}
        try:
            action_data = json.loads(vlm_text)
        except Exception:
            # If the VLM didn't return valid JSON, wrap the raw text as the description
            action_data = {"description": vlm_text, "action": {"move": "unknown", "interact": "unknown"}}
        return {
            "saved_to_hf_hub": True,
            "repo_id": HF_DATASET_REPO,
            "path_in_repo": path_in_repo,
            "image_url": hf_url,
            "file_size_bytes": size_bytes,
            "robot_id": robot_id,
            "vlm_response": vlm_text,
            "vlm_action": action_data.get("action", {}),
            "vlm_description": action_data.get("description", "")
        }
    except Exception as e:
        return {"error": f"An API error occurred: {str(e)}"}
# --- Gradio MCP Interface ---
demo = gr.Interface(
    fn=process_and_describe,
    inputs=gr.JSON(label="Input Payload (Dict format with 'image_b64')"),
    outputs=gr.JSON(label="Reply to Jetson"),
    api_name="predict"
)

if __name__ == "__main__":
    demo.launch(mcp_server=True)
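
# Example client call (a sketch, assuming the Space is reachable under the same
# "OppaAI/Robot_MCP" name and a valid token; the robot id and file path are
# placeholders):
#
#   import base64
#   from gradio_client import Client
#
#   client = Client("OppaAI/Robot_MCP")
#   with open("frame.jpg", "rb") as f:
#       image_b64 = base64.b64encode(f.read()).decode("utf-8")
#   result = client.predict(
#       {"hf_token": "hf_...", "robot_id": "jetson-01", "image_b64": image_b64},
#       api_name="/predict",
#   )
#   print(result)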