Spaces:
Sleeping
Sleeping
| # app.py | |
| import os | |
| import base64 | |
| import json | |
| import gradio as gr | |
| from huggingface_hub import upload_file, InferenceClient | |
| from datetime import datetime | |
| import traceback | |
| import threading | |
| from typing import Tuple, Optional, Dict, Any | |
# --- Config ---
# HF dataset repo that receives each uploaded camera frame.
HF_DATASET_REPO = "OppaAI/Robot_MCP"
# Vision-language model asked to pick one robot action per frame.
HF_VLM_MODEL = "Qwen/Qwen2.5-VL-7B-Instruct"
# In-memory processed requests cache to prevent duplicate execution for identical request_id
# NOTE(review): grows without bound and is per-process only (lost on restart,
# not shared across workers) — confirm this is acceptable for the deployment.
PROCESSED_REQUESTS: Dict[str, Dict[str, Any]] = {}
# Guards PROCESSED_REQUESTS against concurrent access from server worker threads.
PROCESSED_LOCK = threading.Lock()
| # ========================================== | |
| # Robot Tools (unchanged semantics) | |
| # ========================================== | |
def tool_speak(text: str, emotion: str = "neutral") -> dict:
    """Build a 'speak' command record for the robot to voice *text*."""
    spoken = {"text": text, "emotion": emotion}
    return {"status": "success", "action_executed": "speak", "payload": spoken}
def tool_navigate(direction: str, distance_meters: float) -> dict:
    """Build a 'navigate' command after validating the requested distance.

    Args:
        direction: Direction token forwarded verbatim to the robot.
        distance_meters: Distance to travel; coerced to float so that
            model-emitted strings like "2.5" are accepted.

    Returns:
        A success record with the command payload, or an error record when
        the distance is non-numeric, non-positive, or above the safety cap.
    """
    # The VLM fills arguments from free-form JSON, so the value may arrive
    # as a string or as garbage — coerce before comparing.
    try:
        distance = float(distance_meters)
    except (TypeError, ValueError):
        return {"status": "error", "message": "Invalid distance: must be a number."}
    # Reject zero/negative requests: they are meaningless motion commands.
    if distance <= 0:
        return {"status": "error", "message": "Invalid distance: must be positive."}
    if distance > 5.0:
        return {"status": "error", "message": "Safety limit: Cannot move more than 5m at once."}
    return {
        "status": "success",
        "action_executed": "navigate",
        "payload": {"direction": direction, "distance": distance}
    }
def tool_scan_hazard(hazard_type: str, severity: str) -> dict:
    """Record a detected hazard and return the generated log line."""
    now = datetime.now().isoformat()
    entry = f"[{now}] WARNING: {hazard_type} detected (Severity: {severity})"
    # (in real system: write to file/logging infra)
    return {"status": "warning_logged", "log": entry}
def tool_analyze_human(clothing_color: str, estimated_action: str) -> dict:
    """Summarize an observed human (clothing colour + likely action)."""
    summary = f"Human wearing {clothing_color} is likely {estimated_action}."
    return {"status": "human_tracked", "details": summary}
# Maps the tool name chosen by the VLM to the local callable that executes it.
# Keys must stay in sync with the schema advertised in process_and_describe.
TOOL_REGISTRY = {
    "speak": tool_speak,
    "navigate": tool_navigate,
    "scan_hazard": tool_scan_hazard,
    "analyze_human": tool_analyze_human
}
| # ========================================== | |
| # Helper: Save & Upload (robust) | |
| # ========================================== | |
def save_and_upload_image(image_b64: str, hf_token: str) -> Tuple[Optional[str], Optional[str], Optional[str], int]:
    """
    Save a base64 image to a uniquely named /tmp file and upload it to the
    HF dataset repo.

    Args:
        image_b64: Base64-encoded image bytes (expected JPEG).
        hf_token: HF token with datasets write permission.

    Returns:
        (local_tmp_path, hf_url, path_in_repo, size_bytes) on success;
        (None, None, None, 0) on any failure — callers test hf_url for None.
    """
    try:
        # decode (raises binascii.Error on malformed input)
        image_bytes = base64.b64decode(image_b64)
        size_bytes = len(image_bytes)
        print("[debug] decoded image bytes:", size_bytes)
        # Guard against empty/garbage payloads before paying for an upload.
        if size_bytes < 10:
            raise ValueError("Decoded image is too small or invalid base64")
        # A microsecond timestamp alone can still collide when two workers
        # fire in the same tick; appending the PID makes the name unique
        # per process as well.
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
        unique_id = f"{timestamp}_{os.getpid()}"
        local_tmp_path = f"/tmp/robot_img_{unique_id}.jpg"
        with open(local_tmp_path, "wb") as f:
            f.write(image_bytes)
        print(f"[debug] wrote local tmp file: {local_tmp_path}")
        # Put the file at the repo root to avoid folder permission issues.
        path_in_repo = f"robot_{unique_id}.jpg"
        # upload_file might raise; the except below logs the traceback.
        upload_file(
            path_or_fileobj=local_tmp_path,
            path_in_repo=path_in_repo,
            repo_id=HF_DATASET_REPO,
            token=hf_token,
            repo_type="dataset"
        )
        hf_image_url = f"https://huggingface.co/datasets/{HF_DATASET_REPO}/resolve/main/{path_in_repo}"
        print("[debug] upload successful:", hf_image_url)
        return local_tmp_path, hf_image_url, path_in_repo, size_bytes
    except Exception as e:
        # Best-effort API by design: signal failure via the None tuple
        # instead of raising, so the request handler can return debug info.
        print("[error] save_and_upload_image failed:", e)
        traceback.print_exc()
        return None, None, None, 0
| # ========================================== | |
| # Main logic | |
| # ========================================== | |
def safe_parse_json_from_text(text: str) -> Optional[dict]:
    """
    Extract a JSON object from raw model output.

    Accepts plain JSON, a fenced ```json block, or JSON embedded in
    surrounding prose. Returns the parsed object, or None when nothing
    in the text parses.
    """
    if not text:
        return None
    cleaned = text.strip()
    # Peel markdown fences only when both an opening and a closing fence exist.
    if cleaned.startswith("```") and "```" in cleaned[3:]:
        cleaned = cleaned.strip("`")
    # Prefer the outermost {...} span; fall back to the whole string, which
    # also rescues JSON whose last '}' sits inside a string literal.
    first = cleaned.find("{")
    last = cleaned.rfind("}")
    candidates = []
    if first >= 0 and last > first:
        candidates.append(cleaned[first:last + 1])
    candidates.append(cleaned)
    for candidate in candidates:
        try:
            return json.loads(candidate)
        except Exception:
            continue
    return None
def validate_and_call_tool(tool_name: str, tool_args: dict):
    """Dispatch *tool_name* through TOOL_REGISTRY with **tool_args.

    Returns the tool's own result dict, or an {"error": ...} dict when the
    name is missing/unknown, the arguments don't match the tool's
    signature, or the tool raises.
    """
    if not tool_name:
        return {"error": "No tool_name provided by VLM."}
    tool_fn = TOOL_REGISTRY.get(tool_name)
    if tool_fn is None:
        return {"error": f"Tool '{tool_name}' not found in registry."}
    try:
        # Python's own **-unpacking enforces the tool's signature for us.
        return tool_fn(**tool_args)
    except TypeError as e:
        return {"error": f"Tool call argument mismatch: {str(e)}"}
    except Exception as e:
        traceback.print_exc()
        return {"error": f"Tool execution failed: {str(e)}"}
def process_and_describe(payload: dict):
    """
    Handle one robot camera frame end-to-end: dedupe, upload the image,
    ask the VLM for an action, and execute the chosen tool.

    payload expects keys:
      - hf_token (string): token with datasets write permission (for upload)
      - image_b64 (base64 str): the camera frame
      - robot_id (optional): echoed back in the response
      - request_id (optional): recommended, dedupes client retries

    Returns a JSON-serializable dict — either {"error": ...} or a success
    record with the uploaded image URL, the VLM analysis and the tool result.
    """
    # Initialized up-front so the except handler can always echo vlm_text.
    vlm_text = ""
    tool_result = None
    action_data = {}
    try:
        # --- basic checks ---
        hf_token = payload.get("hf_token")
        if not hf_token:
            return {"error": "HF token not provided in payload. Token must have datasets write permission if uploading."}
        # request_id falls back to robot_id so retries from the same robot
        # are deduplicated even without an explicit request_id.
        request_id = payload.get("request_id") or payload.get("robot_id") or None
        if request_id:
            with PROCESSED_LOCK:
                if request_id in PROCESSED_REQUESTS:
                    print("[info] duplicate request_id detected; returning cached result")
                    return PROCESSED_REQUESTS[request_id]
        # NOTE(review): the dedupe above is check-then-act — two identical
        # requests arriving concurrently can both pass it; confirm acceptable.
        robot_id = payload.get("robot_id", "unknown")
        image_b64 = payload.get("image_b64")
        if not image_b64:
            return {"error": "No image provided in payload."}
        # Save & upload (only once per invocation)
        local_tmp_path, hf_url, path_in_repo, size_bytes = save_and_upload_image(image_b64, hf_token)
        if not hf_url:
            # Upload failed: return error with helpful debug info
            return {
                "error": "Image upload failed on server.",
                "debug": {
                    "local_tmp_path": local_tmp_path,
                    "path_in_repo": path_in_repo,
                    "size_bytes": size_bytes
                }
            }
        # Build system prompt (kept compact).
        # This schema shown to the model must stay in sync with TOOL_REGISTRY.
        tools_desc = json.dumps({
            "speak": {"text": "string", "emotion": "string"},
            "navigate": {"direction": "forward/left/right", "distance_meters": "float"},
            "scan_hazard": {"hazard_type": "string", "severity": "low/medium/high"},
            "analyze_human": {"clothing_color": "string", "estimated_action": "string"}
        }, indent=2)
        system_prompt = f"""
You are a Robot Control AI. Analyze the image and choose ONE tool to execute.
AVAILABLE TOOLS (JSON Schema):
{tools_desc}
INSTRUCTIONS:
1. Describe what you see briefly.
2. Select the single most appropriate tool and provide arguments matching the schema.
RESPONSE FORMAT (Strict JSON):
{{
"description": "Brief visual description",
"tool_name": "name_of_tool",
"arguments": {{ ...args matching schema... }}
}}
"""
        # Build messages payload for VLM. The frame is sent inline as a
        # base64 data URL; the uploaded HF URL is only part of the response
        # record, not of the model request.
        messages_payload = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": [
                {"type": "text", "text": "Analyze this camera feed and decide on an action."},
                {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{image_b64}"}}
            ]}
        ]
        # Instantiate HF Inference client and call chat completion
        hf_client = InferenceClient(token=hf_token)
        # NOTE: huggingface InferenceClient usage may vary by version. We use the chat completions create call.
        chat_completion = hf_client.chat.completions.create(
            model=HF_VLM_MODEL,
            messages=messages_payload,
            max_tokens=300,
            temperature=0.1  # near-deterministic: we want schema-shaped JSON
        )
        vlm_text = chat_completion.choices[0].message.content.strip()
        print("[debug] VLM raw output:", vlm_text[:1000])
        # attempt to parse JSON from the model output
        parsed = safe_parse_json_from_text(vlm_text)
        if parsed is None:
            # Model didn't return JSON: return a descriptive fallback but do
            # NOT execute any tool.
            result = {
                "status": "model_no_json",
                "robot_id": robot_id,
                "image_url": hf_url,
                "vlm_raw": vlm_text,
                "message": "VLM did not return valid JSON following the required schema."
            }
            # Cache even this outcome so retries with the same id are no-ops.
            if request_id:
                with PROCESSED_LOCK:
                    PROCESSED_REQUESTS[request_id] = result
            return result
        action_data = parsed
        tool_name = action_data.get("tool_name")
        tool_args = action_data.get("arguments", {}) or {}
        # Arguments must be a mapping to be splatted into the tool call.
        if not isinstance(tool_args, dict):
            tool_args = {}
        # Execute the tool once and capture result
        print(f"[info] Executing tool: {tool_name} with args {tool_args}")
        tool_result = validate_and_call_tool(tool_name, tool_args)
        result = {
            "status": "success",
            "robot_id": robot_id,
            "image_url": hf_url,
            "image_bytes": size_bytes,
            "analysis": action_data.get("description"),
            "chosen_tool": tool_name,
            "tool_arguments": tool_args,
            "tool_execution_result": tool_result,
            "vlm_raw": vlm_text
        }
        if request_id:
            with PROCESSED_LOCK:
                PROCESSED_REQUESTS[request_id] = result
        return result
    except Exception as e:
        # Top-level boundary: log the traceback server-side and return a
        # JSON error (with whatever raw VLM text we had) to the client.
        traceback.print_exc()
        return {"error": f"Server error: {str(e)}", "vlm_raw": vlm_text}
# --- Gradio Interface ---
# Single JSON-in / JSON-out endpoint exposed at api_name="predict" so the
# robot client can POST its payload programmatically.
iface = gr.Interface(
    fn=process_and_describe,
    inputs=gr.JSON(label="Input (JSON with 'image_b64', 'hf_token', optional 'request_id')"),
    outputs=gr.JSON(label="Robot Command Output"),
    api_name="predict",
    # NOTE(review): allow_flagging is deprecated in Gradio 4.x in favor of
    # flagging_mode — confirm against the pinned gradio version.
    allow_flagging="never",
    live=False
)
if __name__ == "__main__":
    # When deploying to HF Space: set server_name and server_port via env if you need
    iface.launch()