# Source provenance (HF Space file-view residue, kept as a comment so the
# file stays valid Python): uploaded by OppaAI, commit 024277f (verified),
# message "Update app.py", file size ~11 kB (raw / history / blame view).
# app.py
import base64
import json
import os
import tempfile
import threading
import traceback
from datetime import datetime
from typing import Any, Dict, Optional, Tuple

import gradio as gr
from huggingface_hub import InferenceClient, upload_file
# --- Config ---
HF_DATASET_REPO = "OppaAI/Robot_MCP"
HF_VLM_MODEL = "Qwen/Qwen2.5-VL-7B-Instruct"
# In-memory processed requests cache to prevent duplicate execution for identical request_id
PROCESSED_REQUESTS: Dict[str, Dict[str, Any]] = {}
PROCESSED_LOCK = threading.Lock()
# ==========================================
# Robot Tools (unchanged semantics)
# ==========================================
def tool_speak(text: str, emotion: str = "neutral") -> dict:
return {
"status": "success",
"action_executed": "speak",
"payload": {"text": text, "emotion": emotion}
}
def tool_navigate(direction: str, distance_meters: float) -> dict:
if distance_meters > 5.0:
return {"status": "error", "message": "Safety limit: Cannot move more than 5m at once."}
return {
"status": "success",
"action_executed": "navigate",
"payload": {"direction": direction, "distance": distance_meters}
}
def tool_scan_hazard(hazard_type: str, severity: str) -> dict:
timestamp = datetime.now().isoformat()
log_entry = f"[{timestamp}] WARNING: {hazard_type} detected (Severity: {severity})"
# (in real system: write to file/logging infra)
return {
"status": "warning_logged",
"log": log_entry
}
def tool_analyze_human(clothing_color: str, estimated_action: str) -> dict:
return {
"status": "human_tracked",
"details": f"Human wearing {clothing_color} is likely {estimated_action}."
}
TOOL_REGISTRY = {
"speak": tool_speak,
"navigate": tool_navigate,
"scan_hazard": tool_scan_hazard,
"analyze_human": tool_analyze_human
}
# ==========================================
# Helper: Save & Upload (robust)
# ==========================================
def save_and_upload_image(image_b64: str, hf_token: str) -> Tuple[Optional[str], Optional[str], Optional[str], int]:
"""
Save a base64 image to a uniquely named /tmp file and upload to HF dataset repo.
Returns: local_tmp_path, hf_url, path_in_repo, size_bytes
"""
try:
# decode
image_bytes = base64.b64decode(image_b64)
size_bytes = len(image_bytes)
print("[debug] decoded image bytes:", size_bytes)
if size_bytes < 10:
raise ValueError("Decoded image is too small or invalid base64")
# unique tmp filename (avoid collision across workers)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
local_tmp_path = f"/tmp/robot_img_{timestamp}.jpg"
with open(local_tmp_path, "wb") as f:
f.write(image_bytes)
print(f"[debug] wrote local tmp file: {local_tmp_path}")
# Prepare filename in repo (put at repo root to avoid folder permission issues)
filename = f"robot_{timestamp}.jpg"
path_in_repo = filename
# upload_file might raise. capture exception and show traceback
upload_file(
path_or_fileobj=local_tmp_path,
path_in_repo=path_in_repo,
repo_id=HF_DATASET_REPO,
token=hf_token,
repo_type="dataset"
)
hf_image_url = f"https://huggingface.co/datasets/{HF_DATASET_REPO}/resolve/main/{path_in_repo}"
print("[debug] upload successful:", hf_image_url)
return local_tmp_path, hf_image_url, path_in_repo, size_bytes
except Exception as e:
print("[error] save_and_upload_image failed:", e)
traceback.print_exc()
return None, None, None, 0
# ==========================================
# Main logic
# ==========================================
def safe_parse_json_from_text(text: str) -> Optional[dict]:
"""
Try to extract JSON object from model output.
Accepts raw JSON, or a ```json\n{...}``` block, or text with JSON substring.
Returns dict or None.
"""
if not text:
return None
# remove markdown fences
t = text.strip()
if t.startswith("```") and "```" in t[3:]:
# remove outer fences
t = t.strip("`")
# find first '{' and last '}' to try to extract JSON substring
start = t.find("{")
end = t.rfind("}")
if start >= 0 and end > start:
candidate = t[start:end+1]
try:
return json.loads(candidate)
except Exception:
# fallback: try the whole text
try:
return json.loads(t)
except Exception:
return None
else:
try:
return json.loads(t)
except Exception:
return None
def validate_and_call_tool(tool_name: str, tool_args: dict):
if not tool_name:
return {"error": "No tool_name provided by VLM."}
if tool_name not in TOOL_REGISTRY:
return {"error": f"Tool '{tool_name}' not found in registry."}
# safe-call: ensure dict args only contain acceptable keys for that tool
try:
result = TOOL_REGISTRY[tool_name](**tool_args)
return result
except TypeError as e:
return {"error": f"Tool call argument mismatch: {str(e)}"}
except Exception as e:
traceback.print_exc()
return {"error": f"Tool execution failed: {str(e)}"}
def process_and_describe(payload: dict):
"""
payload expects keys:
- hf_token (string)
- image_b64 (base64 str)
- robot_id (optional)
- request_id (optional) # recommended to dedupe retries
"""
vlm_text = ""
tool_result = None
action_data = {}
try:
# basic checks
hf_token = payload.get("hf_token")
if not hf_token:
return {"error": "HF token not provided in payload. Token must have datasets write permission if uploading."}
request_id = payload.get("request_id") or payload.get("robot_id") or None
if request_id:
with PROCESSED_LOCK:
if request_id in PROCESSED_REQUESTS:
print("[info] duplicate request_id detected; returning cached result")
return PROCESSED_REQUESTS[request_id]
robot_id = payload.get("robot_id", "unknown")
image_b64 = payload.get("image_b64")
if not image_b64:
return {"error": "No image provided in payload."}
# Save & upload (only once per invocation)
local_tmp_path, hf_url, path_in_repo, size_bytes = save_and_upload_image(image_b64, hf_token)
if not hf_url:
# Upload failed: return error with helpful debug info
return {
"error": "Image upload failed on server.",
"debug": {
"local_tmp_path": local_tmp_path,
"path_in_repo": path_in_repo,
"size_bytes": size_bytes
}
}
# Build system prompt (kept compact)
tools_desc = json.dumps({
"speak": {"text": "string", "emotion": "string"},
"navigate": {"direction": "forward/left/right", "distance_meters": "float"},
"scan_hazard": {"hazard_type": "string", "severity": "low/medium/high"},
"analyze_human": {"clothing_color": "string", "estimated_action": "string"}
}, indent=2)
system_prompt = f"""
You are a Robot Control AI. Analyze the image and choose ONE tool to execute.
AVAILABLE TOOLS (JSON Schema):
{tools_desc}
INSTRUCTIONS:
1. Describe what you see briefly.
2. Select the single most appropriate tool and provide arguments matching the schema.
RESPONSE FORMAT (Strict JSON):
{{
"description": "Brief visual description",
"tool_name": "name_of_tool",
"arguments": {{ ...args matching schema... }}
}}
"""
# Build messages payload for VLM - include the uploaded HF URL (some VLMs can fetch it)
messages_payload = [
{"role": "system", "content": system_prompt},
{"role": "user", "content": [
{"type": "text", "text": "Analyze this camera feed and decide on an action."},
{"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{image_b64}"}}
]}
]
# Instantiate HF Inference client and call chat completion
hf_client = InferenceClient(token=hf_token)
# NOTE: huggingface InferenceClient usage may vary by version. We use the chat completions create call.
chat_completion = hf_client.chat.completions.create(
model=HF_VLM_MODEL,
messages=messages_payload,
max_tokens=300,
temperature=0.1
)
vlm_text = chat_completion.choices[0].message.content.strip()
print("[debug] VLM raw output:", vlm_text[:1000])
# attempt to parse JSON
parsed = safe_parse_json_from_text(vlm_text)
if parsed is None:
# If the model didn't return JSON, return descriptive fallback but do not execute tools
result = {
"status": "model_no_json",
"robot_id": robot_id,
"image_url": hf_url,
"vlm_raw": vlm_text,
"message": "VLM did not return valid JSON following the required schema."
}
if request_id:
with PROCESSED_LOCK:
PROCESSED_REQUESTS[request_id] = result
return result
action_data = parsed
tool_name = action_data.get("tool_name")
tool_args = action_data.get("arguments", {}) or {}
# Validate that arguments is a dict
if not isinstance(tool_args, dict):
tool_args = {}
# Execute the tool once and capture result
print(f"[info] Executing tool: {tool_name} with args {tool_args}")
tool_result = validate_and_call_tool(tool_name, tool_args)
result = {
"status": "success",
"robot_id": robot_id,
"image_url": hf_url,
"image_bytes": size_bytes,
"analysis": action_data.get("description"),
"chosen_tool": tool_name,
"tool_arguments": tool_args,
"tool_execution_result": tool_result,
"vlm_raw": vlm_text
}
if request_id:
with PROCESSED_LOCK:
PROCESSED_REQUESTS[request_id] = result
return result
except Exception as e:
traceback.print_exc()
return {"error": f"Server error: {str(e)}", "vlm_raw": vlm_text}
# --- Gradio Interface ---
iface = gr.Interface(
fn=process_and_describe,
inputs=gr.JSON(label="Input (JSON with 'image_b64', 'hf_token', optional 'request_id')"),
outputs=gr.JSON(label="Robot Command Output"),
api_name="predict",
allow_flagging="never",
live=False
)
if __name__ == "__main__":
# When deploying to HF Space: set server_name and server_port via env if you need
iface.launch()