# app.py
"""Robot MCP server.

Receives a base64-encoded camera frame via a Gradio JSON endpoint, uploads
the frame to a Hugging Face dataset repo, asks a vision-language model to
pick exactly one robot tool, then executes that tool and returns the result.
"""
import os
import base64
import json
import threading
import traceback
import uuid
from datetime import datetime
from typing import Tuple, Optional, Dict, Any

import gradio as gr
from huggingface_hub import upload_file, InferenceClient

# --- Config ---
HF_DATASET_REPO = "OppaAI/Robot_MCP"
HF_VLM_MODEL = "Qwen/Qwen2.5-VL-7B-Instruct"

# In-memory processed-requests cache to prevent duplicate execution for an
# identical request_id (clients may retry over flaky robot networks).
PROCESSED_REQUESTS: Dict[str, Dict[str, Any]] = {}
PROCESSED_LOCK = threading.Lock()
# FIX: bound the cache — previously it grew without limit for the life of the
# process, a slow memory leak on a long-running server.
MAX_PROCESSED_CACHE = 1024


def _get_cached_result(request_id: Optional[str]) -> Optional[Dict[str, Any]]:
    """Return the cached result for *request_id*, or None if absent/unset."""
    if not request_id:
        return None
    with PROCESSED_LOCK:
        return PROCESSED_REQUESTS.get(request_id)


def _cache_result(request_id: Optional[str], result: Dict[str, Any]) -> None:
    """Cache *result* under *request_id*, evicting the oldest entry when full.

    No-op when *request_id* is falsy. Dicts preserve insertion order, so the
    first key is always the oldest entry.
    """
    if not request_id:
        return
    with PROCESSED_LOCK:
        if len(PROCESSED_REQUESTS) >= MAX_PROCESSED_CACHE:
            PROCESSED_REQUESTS.pop(next(iter(PROCESSED_REQUESTS)))
        PROCESSED_REQUESTS[request_id] = result


# ==========================================
# Robot Tools (unchanged semantics)
# ==========================================
def tool_speak(text: str, emotion: str = "neutral") -> dict:
    """Emit a speech command with the given text and emotion."""
    return {
        "status": "success",
        "action_executed": "speak",
        "payload": {"text": text, "emotion": emotion}
    }


def tool_navigate(direction: str, distance_meters: float) -> dict:
    """Move the robot, enforcing a 0–5 m safety envelope per command."""
    # FIX: a negative distance previously bypassed the 5 m safety limit
    # entirely (e.g. -100 would be "success"). Reject it explicitly.
    if distance_meters < 0:
        return {"status": "error", "message": "Safety limit: distance must be non-negative."}
    if distance_meters > 5.0:
        return {"status": "error", "message": "Safety limit: Cannot move more than 5m at once."}
    return {
        "status": "success",
        "action_executed": "navigate",
        "payload": {"direction": direction, "distance": distance_meters}
    }


def tool_scan_hazard(hazard_type: str, severity: str) -> dict:
    """Record a hazard observation with a timestamped log line."""
    timestamp = datetime.now().isoformat()
    log_entry = f"[{timestamp}] WARNING: {hazard_type} detected (Severity: {severity})"
    # (in real system: write to file/logging infra)
    return {
        "status": "warning_logged",
        "log": log_entry
    }


def tool_analyze_human(clothing_color: str, estimated_action: str) -> dict:
    """Summarize an observed human's clothing color and likely action."""
    return {
        "status": "human_tracked",
        "details": f"Human wearing {clothing_color} is likely {estimated_action}."
    }


# Dispatch table mapping VLM-chosen tool names to their implementations.
TOOL_REGISTRY = {
    "speak": tool_speak,
    "navigate": tool_navigate,
    "scan_hazard": tool_scan_hazard,
    "analyze_human": tool_analyze_human
}


# ==========================================
# Helper: Save & Upload (robust)
# ==========================================
def save_and_upload_image(image_b64: str, hf_token: str) -> Tuple[Optional[str], Optional[str], Optional[str], int]:
    """Save a base64 image to a uniquely named /tmp file and upload it to the
    HF dataset repo.

    Returns:
        (local_tmp_path, hf_url, path_in_repo, size_bytes); all None / 0 on
        any failure (the error is printed with a traceback, never raised).
    """
    try:
        image_bytes = base64.b64decode(image_b64)
        size_bytes = len(image_bytes)
        print("[debug] decoded image bytes:", size_bytes)
        if size_bytes < 10:
            raise ValueError("Decoded image is too small or invalid base64")

        # FIX: a microsecond timestamp alone can still collide across workers
        # launched in the same tick — append a uuid fragment for uniqueness.
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
        unique_id = f"{timestamp}_{uuid.uuid4().hex[:8]}"
        local_tmp_path = f"/tmp/robot_img_{unique_id}.jpg"
        with open(local_tmp_path, "wb") as f:
            f.write(image_bytes)
        print(f"[debug] wrote local tmp file: {local_tmp_path}")

        # Put the file at the repo root to avoid folder permission issues.
        path_in_repo = f"robot_{unique_id}.jpg"

        # upload_file may raise; the outer handler captures the traceback.
        upload_file(
            path_or_fileobj=local_tmp_path,
            path_in_repo=path_in_repo,
            repo_id=HF_DATASET_REPO,
            token=hf_token,
            repo_type="dataset"
        )
        hf_image_url = f"https://huggingface.co/datasets/{HF_DATASET_REPO}/resolve/main/{path_in_repo}"
        print("[debug] upload successful:", hf_image_url)

        # FIX: previously temp files accumulated in /tmp forever. Remove the
        # local copy once uploaded; the path string is still returned for logs.
        try:
            os.remove(local_tmp_path)
        except OSError:
            pass  # best effort — a leftover temp file is not fatal

        return local_tmp_path, hf_image_url, path_in_repo, size_bytes
    except Exception as e:
        print("[error] save_and_upload_image failed:", e)
        traceback.print_exc()
        return None, None, None, 0


# ==========================================
# Main logic
# ==========================================
def safe_parse_json_from_text(text: str) -> Optional[dict]:
    """Try to extract a JSON object from model output.

    Accepts raw JSON, a ```json fenced block, or free text containing a JSON
    substring (first '{' to last '}'). Returns a dict or None.
    """
    if not text:
        return None
    t = text.strip()
    # Remove markdown fences; the brace search below also skips any language
    # tag (e.g. "json") left over after stripping the backticks.
    if t.startswith("```") and "```" in t[3:]:
        t = t.strip("`")
    start = t.find("{")
    end = t.rfind("}")
    if start >= 0 and end > start:
        candidate = t[start:end + 1]
        try:
            return json.loads(candidate)
        except Exception:
            # Fallback: try the whole (de-fenced) text.
            try:
                return json.loads(t)
            except Exception:
                return None
    else:
        try:
            return json.loads(t)
        except Exception:
            return None


def validate_and_call_tool(tool_name: str, tool_args: dict):
    """Look up *tool_name* in the registry and invoke it with *tool_args*.

    Returns the tool's result dict, or an {"error": ...} dict when the tool is
    unknown, the arguments don't match the signature, or execution fails.
    """
    if not tool_name:
        return {"error": "No tool_name provided by VLM."}
    if tool_name not in TOOL_REGISTRY:
        return {"error": f"Tool '{tool_name}' not found in registry."}
    try:
        # TypeError from **tool_args means the VLM supplied wrong/missing keys.
        return TOOL_REGISTRY[tool_name](**tool_args)
    except TypeError as e:
        return {"error": f"Tool call argument mismatch: {str(e)}"}
    except Exception as e:
        traceback.print_exc()
        return {"error": f"Tool execution failed: {str(e)}"}


def process_and_describe(payload: dict):
    """Handle one robot request end-to-end.

    payload keys:
        - hf_token (str, required)
        - image_b64 (base64 str, required)
        - robot_id (optional)
        - request_id (optional) — recommended, dedupes client retries

    Returns a result dict; on failure returns {"error": ...} with debug info.
    """
    vlm_text = ""
    try:
        hf_token = payload.get("hf_token")
        if not hf_token:
            return {"error": "HF token not provided in payload. Token must have datasets write permission if uploading."}

        request_id = payload.get("request_id") or payload.get("robot_id") or None
        cached = _get_cached_result(request_id)
        if cached is not None:
            print("[info] duplicate request_id detected; returning cached result")
            return cached

        robot_id = payload.get("robot_id", "unknown")
        image_b64 = payload.get("image_b64")
        if not image_b64:
            return {"error": "No image provided in payload."}

        # Save & upload (only once per invocation).
        local_tmp_path, hf_url, path_in_repo, size_bytes = save_and_upload_image(image_b64, hf_token)
        if not hf_url:
            # Upload failed: return error with helpful debug info.
            return {
                "error": "Image upload failed on server.",
                "debug": {
                    "local_tmp_path": local_tmp_path,
                    "path_in_repo": path_in_repo,
                    "size_bytes": size_bytes
                }
            }

        # Build system prompt (kept compact).
        tools_desc = json.dumps({
            "speak": {"text": "string", "emotion": "string"},
            "navigate": {"direction": "forward/left/right", "distance_meters": "float"},
            "scan_hazard": {"hazard_type": "string", "severity": "low/medium/high"},
            "analyze_human": {"clothing_color": "string", "estimated_action": "string"}
        }, indent=2)

        system_prompt = f"""
You are a Robot Control AI. Analyze the image and choose ONE tool to execute.

AVAILABLE TOOLS (JSON Schema):
{tools_desc}

INSTRUCTIONS:
1. Describe what you see briefly.
2. Select the single most appropriate tool and provide arguments matching the schema.

RESPONSE FORMAT (Strict JSON):
{{
  "description": "Brief visual description",
  "tool_name": "name_of_tool",
  "arguments": {{ ...args matching schema... }}
}}
"""

        # Messages payload for the VLM — the image is sent inline as a data
        # URL; the uploaded HF URL is kept for the response/audit trail.
        messages_payload = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": [
                {"type": "text", "text": "Analyze this camera feed and decide on an action."},
                {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{image_b64}"}}
            ]}
        ]

        # NOTE: huggingface InferenceClient usage may vary by version. We use
        # the OpenAI-compatible chat completions create call.
        hf_client = InferenceClient(token=hf_token)
        chat_completion = hf_client.chat.completions.create(
            model=HF_VLM_MODEL,
            messages=messages_payload,
            max_tokens=300,
            temperature=0.1
        )
        vlm_text = chat_completion.choices[0].message.content.strip()
        print("[debug] VLM raw output:", vlm_text[:1000])

        parsed = safe_parse_json_from_text(vlm_text)
        if parsed is None:
            # Model didn't return JSON: descriptive fallback, no tool executed.
            result = {
                "status": "model_no_json",
                "robot_id": robot_id,
                "image_url": hf_url,
                "vlm_raw": vlm_text,
                "message": "VLM did not return valid JSON following the required schema."
            }
            _cache_result(request_id, result)
            return result

        action_data = parsed
        tool_name = action_data.get("tool_name")
        tool_args = action_data.get("arguments", {}) or {}
        # Guard against the model returning a non-dict "arguments" value.
        if not isinstance(tool_args, dict):
            tool_args = {}

        # Execute the tool once and capture the result.
        print(f"[info] Executing tool: {tool_name} with args {tool_args}")
        tool_result = validate_and_call_tool(tool_name, tool_args)

        result = {
            "status": "success",
            "robot_id": robot_id,
            "image_url": hf_url,
            "image_bytes": size_bytes,
            "analysis": action_data.get("description"),
            "chosen_tool": tool_name,
            "tool_arguments": tool_args,
            "tool_execution_result": tool_result,
            "vlm_raw": vlm_text
        }
        _cache_result(request_id, result)
        return result

    except Exception as e:
        traceback.print_exc()
        return {"error": f"Server error: {str(e)}", "vlm_raw": vlm_text}


# --- Gradio Interface ---
iface = gr.Interface(
    fn=process_and_describe,
    inputs=gr.JSON(label="Input (JSON with 'image_b64', 'hf_token', optional 'request_id')"),
    outputs=gr.JSON(label="Robot Command Output"),
    api_name="predict",
    allow_flagging="never",
    live=False
)

if __name__ == "__main__":
    # When deploying to HF Space: set server_name and server_port via env if you need
    iface.launch()