# Source: Hugging Face Space "OppaAI" — app.py, commit d192cfe (verified), 7.27 kB
# (web-UI header lines such as "raw" / "history blame" removed: not Python code)
# app.py
import os
import base64
import json
import gradio as gr
from huggingface_hub import upload_file, InferenceClient
from datetime import datetime
import traceback
import threading
from typing import Tuple, Optional, Dict, Any
# Dataset repo that receives uploaded robot camera frames.
HF_DATASET_REPO = "OppaAI/Robot_MCP"
# Vision-language model asked to describe each frame and choose a tool.
HF_VLM_MODEL = "Qwen/Qwen2.5-VL-7B-Instruct"
# Request cache and its lock.
# NOTE(review): neither PROCESSED_REQUESTS nor PROCESSED_LOCK is referenced
# anywhere in this file — confirm whether dedup logic was removed or is planned.
PROCESSED_REQUESTS: Dict[str, Dict[str, Any]] = {}
PROCESSED_LOCK = threading.Lock()
# --------------------
# Robot Tools
# --------------------
def tool_speak(text: str, emotion: str = "neutral") -> dict:
    """Report a speech action for the robot to perform.

    Args:
        text: What the robot should say.
        emotion: Emotional tone tag (defaults to "neutral").

    Returns:
        A success dict carrying the speech payload.
    """
    payload = {"text": text, "emotion": emotion}
    return {"status": "success", "action_executed": "speak", "payload": payload}
def tool_navigate(direction: str, distance_meters: float, max_distance_meters: float = 5.0) -> dict:
    """Execute a bounded navigation command.

    Args:
        direction: Direction keyword (e.g. "forward").
        distance_meters: Distance to travel in meters; must be non-negative.
        max_distance_meters: Safety cap in meters. Defaults to 5.0 so existing
            callers keep the original behavior.

    Returns:
        A success dict with the navigation payload, or an error dict when the
        distance is negative or exceeds the safety cap.
    """
    # Reject physically meaningless input before the safety-limit check.
    if distance_meters < 0:
        return {"status": "error", "message": "Distance must be non-negative"}
    if distance_meters > max_distance_meters:
        return {"status": "error", "message": "Safety limit exceeded"}
    return {"status": "success", "action_executed": "navigate", "payload": {"direction": direction, "distance": distance_meters}}
def tool_scan_hazard(hazard_type: str, severity: str) -> dict:
    """Record a detected hazard with an ISO-format timestamp.

    Args:
        hazard_type: Kind of hazard observed.
        severity: Severity label for the hazard.

    Returns:
        A dict with status "warning_logged" and the formatted log line.
    """
    stamp = datetime.now().isoformat()
    entry = f"[{stamp}] HAZARD: {hazard_type} (Severity: {severity})"
    return {"status": "warning_logged", "log": entry}
def tool_analyze_human(clothing_color: str, estimated_action: str) -> dict:
    """Summarize a tracked human observation.

    Args:
        clothing_color: Color of the person's clothing.
        estimated_action: What the person appears to be doing.

    Returns:
        A dict with status "human_tracked" and a one-line summary.
    """
    summary = f"Human wearing {clothing_color} is {estimated_action}"
    return {"status": "human_tracked", "details": summary}
# Dispatch table mapping the tool names the VLM may emit to their
# implementations; validate_and_call_tool resolves names against this dict.
TOOL_REGISTRY = {
    "speak": tool_speak,
    "navigate": tool_navigate,
    "scan_hazard": tool_scan_hazard,
    "analyze_human": tool_analyze_human
}
# --------------------
# Save + Upload
# --------------------
def save_and_upload_image(image_b64: str, hf_token: str) -> Tuple[Optional[str], Optional[str], Optional[str], int]:
    """Decode a base64 image, write it to /tmp, and upload it to the HF dataset repo.

    Args:
        image_b64: Base64-encoded JPEG bytes from the robot client.
        hf_token: Hugging Face token used to authorize the dataset upload.

    Returns:
        (local_path, public_url, path_in_repo, size_bytes) on success;
        (None, None, None, 0) on any failure (traceback is printed).
    """
    try:
        image_bytes = base64.b64decode(image_b64)
        size_bytes = len(image_bytes)
        print("[debug] decoded image bytes:", size_bytes)
        # Microsecond-resolution timestamp keeps near-simultaneous uploads
        # from colliding on the same filename.
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
        local_path = f"/tmp/robot_img_{timestamp}.jpg"
        with open(local_path, "wb") as f:
            f.write(image_bytes)
        print("[debug] wrote local tmp file:", local_path)
        filename = f"robot_{timestamp}.jpg"
        upload_file(
            path_or_fileobj=local_path,
            path_in_repo=filename,
            repo_id=HF_DATASET_REPO,
            token=hf_token,
            repo_type="dataset"
        )
        print("[debug] upload successful:", filename)
        # BUGFIX: the URL previously ended in the literal text "(unknown)"
        # instead of the uploaded filename, so the returned link never resolved.
        url = f"https://huggingface.co/datasets/{HF_DATASET_REPO}/resolve/main/{filename}"
        return local_path, url, filename, size_bytes
    except Exception:
        traceback.print_exc()
        return None, None, None, 0
# --------------------
# JSON Parse Helper
# --------------------
def safe_parse_json_from_text(text: str) -> Optional[dict]:
    """Best-effort extraction of a JSON object from raw model output.

    Tries a direct parse first; on failure strips markdown code fences and
    parses the substring between the first '{' and the last '}'.

    Args:
        text: Raw text returned by the VLM (may be empty or fenced).

    Returns:
        The parsed dict, or None when no valid JSON object can be found.
    """
    if not text:
        return None
    try:
        return json.loads(text)
    except (ValueError, TypeError):
        # Not bare JSON — fall through to fence/substring extraction.
        # (Bare `except:` replaced with the specific parse errors.)
        pass
    cleaned = text.strip()
    if cleaned.startswith("```"):
        # Drop markdown fences; a leading "json" language tag is harmless
        # because we re-anchor on the first '{' below.
        cleaned = cleaned.strip("`")
    start = cleaned.find("{")
    end = cleaned.rfind("}")
    if start >= 0 and end > start:
        try:
            return json.loads(cleaned[start:end + 1])
        except ValueError:
            return None
    return None
# --------------------
# Tool validation + exec
# --------------------
def validate_and_call_tool(tool_name: str, tool_args: dict):
    """Dispatch a tool request to its registered handler.

    Args:
        tool_name: Key into TOOL_REGISTRY chosen by the VLM.
        tool_args: Keyword arguments forwarded to the handler.

    Returns:
        The handler's result dict, or an {"error": ...} dict when the name is
        missing/unknown or the handler raises.
    """
    if not tool_name:
        return {"error": "Missing tool_name"}
    handler = TOOL_REGISTRY.get(tool_name)
    if handler is None:
        return {"error": f"Unknown tool '{tool_name}'"}
    try:
        return handler(**tool_args)
    except Exception as exc:
        # Bad/missing arguments from the model land here; report, don't crash.
        traceback.print_exc()
        return {"error": f"Tool error: {str(exc)}"}
# --------------------
# Main Function
# --------------------
def process_and_describe(payload) -> Dict[str, Any]:
    """Handle one robot request end-to-end.

    Uploads the incoming image to the dataset repo, asks the VLM to describe
    it and pick exactly one tool, executes that tool, and returns a
    structured result dict. Never raises: all failures are returned as dicts.

    Args:
        payload: A dict (or JSON string) expected to contain "hf_token" and
            "image_b64", and optionally "robot_id".

    Returns:
        A result dict; on failure a dict with an "error" key, or with
        "status": "model_no_json" when the VLM output could not be parsed.
    """
    # Clients may send the payload as a raw JSON string → parse it first.
    if isinstance(payload, str):
        try:
            payload = json.loads(payload)
        except Exception as e:
            print("[error] invalid JSON from client:", payload)
            return {"error": f"Invalid JSON string: {str(e)}"}
    print("\n================ NEW REQUEST ================")
    print("[debug] Incoming payload:", payload)
    try:
        hf_token = payload.get("hf_token")
        if not hf_token:
            return {"error": "hf_token missing"}
        robot_id = payload.get("robot_id", "unknown")
        image_b64 = payload.get("image_b64")
        if not image_b64:
            return {"error": "image_b64 missing"}
        # Save locally & upload to the dataset repo (also yields a public URL).
        # NOTE(review): local_tmp_path and path_in_repo are not used below —
        # confirm whether they are needed or can be dropped from the tuple.
        local_tmp_path, hf_url, path_in_repo, size_bytes = save_and_upload_image(image_b64, hf_token)
        if not hf_url:
            print("[error] Image upload failed.")
            return {"error": "Image upload failed"}
        print("[debug] HF image URL:", hf_url)
        # Build the chat prompt: strict-JSON instruction plus the image
        # embedded as a base64 data URL.
        system_prompt = """
Respond in STRICT JSON:
{
"description":"short visual description",
"tool_name":"name",
"arguments": { ... }
}
"""
        messages = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": [
                {"type": "text", "text": "Analyze image and select one tool"},
                {"type": "image_url",
                 "image_url": {"url": f"data:image/jpeg;base64,{image_b64}"}}
            ]}
        ]
        print("[debug] Calling VLM model...")
        client = InferenceClient(token=hf_token)
        # Low temperature to keep the model close to the strict-JSON format.
        response = client.chat.completions.create(
            model=HF_VLM_MODEL,
            messages=messages,
            max_tokens=300,
            temperature=0.1
        )
        vlm_output = response.choices[0].message.content.strip()
        # Print the raw VLM output for debugging (explicitly requested).
        print("\n------ VLM RAW OUTPUT ------")
        print(vlm_output)
        print("------ END VLM RAW ------\n")
        parsed = safe_parse_json_from_text(vlm_output)
        if parsed is None:
            print("[error] VLM did NOT return valid JSON")
            # Surface the raw model text so the client can debug prompt issues.
            return {
                "status": "model_no_json",
                "robot_id": robot_id,
                "image_url": hf_url,
                "vlm_raw": vlm_output,
                "message": "VLM did not output valid JSON"
            }
        tool_name = parsed.get("tool_name")
        tool_args = parsed.get("arguments") or {}
        print("[debug] Parsed JSON:", parsed)
        tool_result = validate_and_call_tool(tool_name, tool_args)
        result = {
            "status": "success",
            "robot_id": robot_id,
            "image_url": hf_url,
            "image_bytes": size_bytes,
            "analysis": parsed.get("description"),
            "chosen_tool": tool_name,
            "tool_arguments": tool_args,
            "tool_execution_result": tool_result,
            "vlm_raw": vlm_output
        }
        print("[debug] Final result:", result)
        print("============================================\n")
        return result
    except Exception as e:
        # Top-level boundary: log the traceback, return a JSON-safe error.
        traceback.print_exc()
        return {"error": f"Server exception: {str(e)}"}
# --------------------
# Gradio
# --------------------
# Expose the handler as a JSON-in / JSON-out Gradio API endpoint ("predict").
# NOTE(review): allow_flagging is deprecated in Gradio 4.x in favor of
# flagging_mode — confirm against the pinned gradio version.
iface = gr.Interface(
    fn=process_and_describe,
    inputs=gr.JSON(label="Input JSON"),
    outputs=gr.JSON(label="Output JSON"),
    api_name="predict",
    allow_flagging="never"
)
if __name__ == "__main__":
    iface.launch()