import os
import base64
import json
import tempfile
import gradio as gr
from huggingface_hub import upload_file, InferenceClient
from datetime import datetime
# --- Config ---
HF_DATASET_REPO = "OppaAI/Robot_MCP"
HF_VLM_MODEL = "Qwen/Qwen2.5-VL-7B-Instruct"
# ==========================================
# 1. DEFINE ROBOT TOOLS
# ==========================================
def tool_speak(text: str, emotion: str = "neutral") -> dict:
"""
Command the robot to speak text via TTS.
"""
# In a real scenario, this would send a signal to the robot's speaker driver
return {
"status": "success",
"action_executed": "speak",
"payload": {"text": text, "emotion": emotion}
}
def tool_navigate(direction: str, distance_meters: float) -> dict:
"""
Move the robot. Direction options: 'forward', 'backward', 'left', 'right'.
"""
if distance_meters > 5.0:
return {"status": "error", "message": "Safety limit: Cannot move more than 5m at once."}
return {
"status": "success",
"action_executed": "navigate",
"payload": {"direction": direction, "distance": distance_meters}
}
def tool_scan_hazard(hazard_type: str, severity: str) -> dict:
"""
    Log a safety hazard observed in the image (e.g., 'fire', 'water', 'obstacle').
"""
timestamp = datetime.now().isoformat()
log_entry = f"[{timestamp}] WARNING: {hazard_type} detected (Severity: {severity})"
# Here you would write to a log file or trigger an alarm
return {
"status": "warning_logged",
"log": log_entry
}
def tool_analyze_human(clothing_color: str, estimated_action: str) -> dict:
"""
Specialized analysis when a human is detected.
"""
return {
"status": "human_tracked",
"details": f"Human wearing {clothing_color} is likely {estimated_action}."
}
# --- Tool Dispatcher ---
# This maps string names to the actual Python functions
TOOL_REGISTRY = {
"speak": tool_speak,
"navigate": tool_navigate,
"scan_hazard": tool_scan_hazard,
"analyze_human": tool_analyze_human
}
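# Example dispatch (illustrative only; this mirrors the execution block in
# process_and_describe below):
#   TOOL_REGISTRY["navigate"](direction="forward", distance_meters=1.5)
#   -> {"status": "success", "action_executed": "navigate",
#       "payload": {"direction": "forward", "distance": 1.5}}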
# ==========================================
# 2. HELPER FUNCTIONS
# ==========================================
def save_and_upload_image(image_b64: str, hf_token: str):
    """
    Decode a base64 image, upload it to the HF dataset repo, and return
    (local_path, public_url, path_in_repo, size_bytes); (None, None, None, 0) on failure.
    """
try:
        image_bytes = base64.b64decode(image_b64)
        # Write to a unique temp file so concurrent requests don't clobber each other
        with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as tmp:
            tmp.write(image_bytes)
            local_tmp_path = tmp.name
# Create unique filename to avoid overwriting
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
path_in_repo = f"images/robot_{timestamp}.jpg"
upload_file(
path_or_fileobj=local_tmp_path,
path_in_repo=path_in_repo,
repo_id=HF_DATASET_REPO,
token=hf_token,
repo_type="dataset"
)
hf_image_url = f"https://huggingface.co/datasets/{HF_DATASET_REPO}/resolve/main/{path_in_repo}"
return local_tmp_path, hf_image_url, path_in_repo, len(image_bytes)
except Exception as e:
print(f"Upload failed: {e}")
return None, None, None, 0
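# Sketch of how a robot client could build the image payload this Space expects
# (the capture path and robot_id are hypothetical):
#   with open("frame.jpg", "rb") as f:
#       image_b64 = base64.b64encode(f.read()).decode("utf-8")
#   payload = {"robot_id": "rbt-01", "image_b64": image_b64, "hf_token": "hf_..."}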
# ==========================================
# 3. MAIN LOGIC
# ==========================================
def process_and_describe(payload: dict):
    """
    End-to-end pipeline: upload the camera frame, ask the VLM to choose a tool,
    execute that tool, and return a structured result for the robot.
    """
tool_result = None
vlm_text = ""
action_data = {}
try:
hf_token = payload.get("hf_token")
if not hf_token:
return {"error": "HF token not provided in payload."}
robot_id = payload.get("robot_id", "unknown")
image_b64 = payload.get("image_b64")
if not image_b64:
return {"error": "No image provided."}
# Upload Image
local_tmp_path, hf_url, path_in_repo, size_bytes = save_and_upload_image(image_b64, hf_token)
# Initialize HF Client
hf_client = InferenceClient(token=hf_token)
# --- Dynamic System Prompt Construction ---
tools_desc = json.dumps({
"speak": {"text": "string", "emotion": "string"},
"navigate": {"direction": "forward/left/right", "distance_meters": "float"},
"scan_hazard": {"hazard_type": "string", "severity": "low/medium/high"},
"analyze_human": {"clothing_color": "string", "estimated_action": "string"}
}, indent=2)
system_prompt = f"""
You are a Robot Control AI. Analyze the image and choose ONE tool to execute.
AVAILABLE TOOLS (JSON Schema):
{tools_desc}
INSTRUCTIONS:
1. Describe what you see briefly.
2. Select the most appropriate tool based on the visual context.
   - If you see a person -> use 'analyze_human' OR 'speak'.
   - If you see a clear path -> use 'navigate'.
   - If you see fire, water, or an obstacle -> use 'scan_hazard'.
RESPONSE FORMAT (Strict JSON):
{{
"description": "Brief visual description",
"tool_name": "name_of_tool",
"arguments": {{ ...args matching schema... }}
}}
"""
messages_payload = [
{"role": "system", "content": system_prompt},
{"role": "user", "content": [
{"type": "text", "text": "Analyze this camera feed and decide on an action."},
{"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{image_b64}"}}
]}
]
# Call VLM
chat_completion = hf_client.chat.completions.create(
model=HF_VLM_MODEL,
messages=messages_payload,
max_tokens=300,
temperature=0.1 # Low temp for reliable JSON
)
vlm_text = chat_completion.choices[0].message.content.strip()
        # Strip markdown code fences if the model adds them (```json ... ```)
        if vlm_text.startswith("```"):
            vlm_text = vlm_text.strip("`").strip()
            # Remove only a leading language tag; a blanket replace("json", "")
            # would corrupt any payload text that happens to contain "json"
            if vlm_text.lower().startswith("json"):
                vlm_text = vlm_text[len("json"):].strip()
# Parse JSON
try:
action_data = json.loads(vlm_text)
# --- TOOL EXECUTION BLOCK ---
tool_name = action_data.get("tool_name")
tool_args = action_data.get("arguments", {})
            if tool_name in TOOL_REGISTRY:
                # Execute the mapped Python function dynamically
                print(f"Executing tool: {tool_name} with args {tool_args}")
                try:
                    tool_result = TOOL_REGISTRY[tool_name](**tool_args)
                except TypeError as e:
                    # The model supplied arguments that don't match the tool's signature
                    tool_result = {"error": f"Invalid arguments for '{tool_name}': {e}"}
            else:
                tool_result = {"error": f"Tool '{tool_name}' not found in registry."}
except json.JSONDecodeError:
action_data = {"description": vlm_text, "tool_name": None}
tool_result = {"error": "Model did not return valid JSON."}
return {
"status": "success",
"robot_id": robot_id,
"image_url": hf_url,
"analysis": action_data.get("description"),
"chosen_tool": action_data.get("tool_name"),
"tool_arguments": action_data.get("arguments"),
"tool_execution_result": tool_result
}
except Exception as e:
return {"error": f"Server error: {str(e)}", "raw_response": vlm_text}
# --- Gradio Interface ---
demo = gr.Interface(
fn=process_and_describe,
inputs=gr.JSON(label="Input (JSON with 'image_b64' and 'hf_token')"),
outputs=gr.JSON(label="Robot Command Output"),
api_name="predict"
)
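# Calling the endpoint from a client (a sketch; assumes the gradio_client package
# and that the Space is deployed under an ID like "OppaAI/Robot_MCP" — hypothetical):
#   from gradio_client import Client
#   client = Client("OppaAI/Robot_MCP")
#   result = client.predict(payload, api_name="/predict")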
if __name__ == "__main__":
demo.launch()