# app.py
#
# Robot MCP vision service: receives a base64 camera frame, uploads it to a
# Hugging Face dataset repo, asks a vision-language model (VLM) to pick one
# robot tool, then executes that tool and returns a JSON summary.
import os
import base64
import json
import gradio as gr
from huggingface_hub import upload_file, InferenceClient
from datetime import datetime
import traceback
import threading
from typing import Tuple, Optional, Dict, Any

# Dataset repo where incoming camera frames are archived.
HF_DATASET_REPO = "OppaAI/Robot_MCP"
# Vision-language model used to analyze each frame.
HF_VLM_MODEL = "Qwen/Qwen2.5-VL-7B-Instruct"

# Request bookkeeping (not referenced elsewhere in this file; presumably
# reserved for request de-duplication — TODO confirm before removing).
PROCESSED_REQUESTS: Dict[str, Dict[str, Any]] = {}
PROCESSED_LOCK = threading.Lock()


# --------------------
# Robot Tools
# --------------------
def tool_speak(text: str, emotion: str = "neutral") -> dict:
    """Simulate the robot speaking *text* with the given *emotion*."""
    return {
        "status": "success",
        "action_executed": "speak",
        "payload": {"text": text, "emotion": emotion},
    }


def tool_navigate(direction: str, distance_meters: float) -> dict:
    """Simulate navigation; distances above 5 m are rejected for safety.

    NOTE(review): negative distances are currently accepted — confirm
    whether that is intended before tightening.
    """
    if distance_meters > 5.0:
        return {"status": "error", "message": "Safety limit exceeded"}
    return {
        "status": "success",
        "action_executed": "navigate",
        "payload": {"direction": direction, "distance": distance_meters},
    }


def tool_scan_hazard(hazard_type: str, severity: str) -> dict:
    """Log a detected hazard with a local timestamp."""
    timestamp = datetime.now().isoformat()
    return {
        "status": "warning_logged",
        "log": f"[{timestamp}] HAZARD: {hazard_type} (Severity: {severity})",
    }


def tool_analyze_human(clothing_color: str, estimated_action: str) -> dict:
    """Report a tracked human's clothing color and estimated action."""
    return {
        "status": "human_tracked",
        "details": f"Human wearing {clothing_color} is {estimated_action}",
    }


# Dispatch table mapping the VLM's chosen tool name to its implementation.
TOOL_REGISTRY = {
    "speak": tool_speak,
    "navigate": tool_navigate,
    "scan_hazard": tool_scan_hazard,
    "analyze_human": tool_analyze_human,
}


# --------------------
# Save + Upload
# --------------------
def save_and_upload_image(image_b64: str, hf_token: str) -> Tuple[Optional[str], Optional[str], Optional[str], int]:
    """Decode a base64 image, write it to /tmp, and upload it to the dataset repo.

    Returns (local_path, public_url, filename_in_repo, size_bytes), or
    (None, None, None, 0) on any failure.

    NOTE(review): the /tmp file is intentionally left on disk because the
    local path is returned to the caller — confirm cleanup policy.
    """
    try:
        image_bytes = base64.b64decode(image_b64)
        size_bytes = len(image_bytes)
        print("[debug] decoded image bytes:", size_bytes)
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
        local_path = f"/tmp/robot_img_{timestamp}.jpg"
        with open(local_path, "wb") as f:
            f.write(image_bytes)
        print("[debug] wrote local tmp file:", local_path)
        filename = f"robot_{timestamp}.jpg"
        upload_file(
            path_or_fileobj=local_path,
            path_in_repo=filename,
            repo_id=HF_DATASET_REPO,
            token=hf_token,
            repo_type="dataset",
        )
        print("[debug] upload successful:", filename)
        # BUG FIX: the URL previously ended in a literal placeholder instead
        # of the uploaded filename, so the returned image_url never resolved.
        url = f"https://huggingface.co/datasets/{HF_DATASET_REPO}/resolve/main/{filename}"
        return local_path, url, filename, size_bytes
    except Exception:
        traceback.print_exc()
        return None, None, None, 0


# --------------------
# JSON Parse Helper
# --------------------
def safe_parse_json_from_text(text: str) -> Optional[dict]:
    """Best-effort extraction of a JSON object from raw model output.

    Tries a direct parse first; on failure strips markdown code fences and
    parses the outermost ``{...}`` span. Returns None when nothing parses.
    """
    if not text:
        return None
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        pass
    cleaned = text.strip()
    if cleaned.startswith("```"):
        # Drop markdown code-fence backticks around the payload.
        cleaned = cleaned.strip("`")
    start = cleaned.find("{")
    end = cleaned.rfind("}")
    if start >= 0 and end > start:
        try:
            return json.loads(cleaned[start:end + 1])
        except json.JSONDecodeError:
            return None
    return None


# --------------------
# Tool validation + exec
# --------------------
def validate_and_call_tool(tool_name: str, tool_args: dict):
    """Look up *tool_name* in TOOL_REGISTRY and invoke it with *tool_args*.

    Returns the tool's result dict, or an {"error": ...} dict when the name
    is missing/unknown or the call raises.
    """
    if not tool_name:
        return {"error": "Missing tool_name"}
    if tool_name not in TOOL_REGISTRY:
        return {"error": f"Unknown tool '{tool_name}'"}
    try:
        return TOOL_REGISTRY[tool_name](**tool_args)
    except Exception as e:
        traceback.print_exc()
        return {"error": f"Tool error: {str(e)}"}


# --------------------
# Main Function
# --------------------
def process_and_describe(payload):
    """End-to-end request handler.

    Expects a dict (or JSON string) with keys ``hf_token``, ``image_b64``,
    and optionally ``robot_id``. Uploads the image, queries the VLM for a
    tool choice, executes the tool, and returns a result dict. All failures
    are reported as {"error": ...} / status dicts rather than raised.
    """
    # If string → parse JSON
    if isinstance(payload, str):
        try:
            payload = json.loads(payload)
        except Exception as e:
            print("[error] invalid JSON from client:", payload)
            return {"error": f"Invalid JSON string: {str(e)}"}

    print("\n================ NEW REQUEST ================")
    print("[debug] Incoming payload:", payload)
    try:
        hf_token = payload.get("hf_token")
        if not hf_token:
            return {"error": "hf_token missing"}
        robot_id = payload.get("robot_id", "unknown")
        image_b64 = payload.get("image_b64")
        if not image_b64:
            return {"error": "image_b64 missing"}

        # Save & Upload
        local_tmp_path, hf_url, path_in_repo, size_bytes = save_and_upload_image(image_b64, hf_token)
        if not hf_url:
            print("[error] Image upload failed.")
            return {"error": "Image upload failed"}
        print("[debug] HF image URL:", hf_url)

        # Build prompt
        system_prompt = """
Respond in STRICT JSON:
{
 "description":"short visual description",
 "tool_name":"name",
 "arguments": { ... }
}
"""
        messages = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": [
                {"type": "text", "text": "Analyze image and select one tool"},
                {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{image_b64}"}}
            ]}
        ]

        print("[debug] Calling VLM model...")
        client = InferenceClient(token=hf_token)
        response = client.chat.completions.create(
            model=HF_VLM_MODEL,
            messages=messages,
            max_tokens=300,
            temperature=0.1,
        )
        vlm_output = response.choices[0].message.content.strip()

        # Print the raw VLM output for debugging (as requested).
        print("\n------ VLM RAW OUTPUT ------")
        print(vlm_output)
        print("------ END VLM RAW ------\n")

        parsed = safe_parse_json_from_text(vlm_output)
        if parsed is None:
            print("[error] VLM did NOT return valid JSON")
            return {
                "status": "model_no_json",
                "robot_id": robot_id,
                "image_url": hf_url,
                "vlm_raw": vlm_output,
                "message": "VLM did not output valid JSON",
            }

        tool_name = parsed.get("tool_name")
        tool_args = parsed.get("arguments") or {}
        print("[debug] Parsed JSON:", parsed)

        tool_result = validate_and_call_tool(tool_name, tool_args)
        result = {
            "status": "success",
            "robot_id": robot_id,
            "image_url": hf_url,
            "image_bytes": size_bytes,
            "analysis": parsed.get("description"),
            "chosen_tool": tool_name,
            "tool_arguments": tool_args,
            "tool_execution_result": tool_result,
            "vlm_raw": vlm_output,
        }
        print("[debug] Final result:", result)
        print("============================================\n")
        return result
    except Exception as e:
        traceback.print_exc()
        return {"error": f"Server exception: {str(e)}"}


# --------------------
# Gradio
# --------------------
iface = gr.Interface(
    fn=process_and_describe,
    inputs=gr.JSON(label="Input JSON"),
    outputs=gr.JSON(label="Output JSON"),
    api_name="predict",
    allow_flagging="never",
)

if __name__ == "__main__":
    iface.launch()