Spaces:
Sleeping
Sleeping
| import os | |
| import base64 | |
| import json | |
| import gradio as gr | |
| from huggingface_hub import upload_file, InferenceClient | |
| from datetime import datetime | |
# --- Config ---
# Dataset repository on the Hugging Face Hub that receives the uploaded images.
HF_DATASET_REPO = "OppaAI/Robot_MCP"
# Vision-language model requested for the chat-completion call.
HF_VLM_MODEL = "Qwen/Qwen2.5-VL-7B-Instruct"
| # ========================================== | |
| # 1. DEFINE ROBOT TOOLS | |
| # ========================================== | |
def tool_speak(text: str, emotion: str = "neutral") -> dict:
    """
    Build the result record for a text-to-speech command.

    Nothing is sent to any hardware here: the function only echoes the
    requested text and emotion back inside a success record.
    """
    # A real deployment would forward this request to the speaker driver.
    speech_request = {"text": text, "emotion": emotion}
    return dict(
        status="success",
        action_executed="speak",
        payload=speech_request,
    )
def tool_navigate(direction: str, distance_meters: float) -> dict:
    """
    Validate a movement request and build the navigation result record.

    Args:
        direction: One of 'forward', 'backward', 'left', 'right'. Case and
            surrounding whitespace are ignored.
        distance_meters: Distance to travel; must be a real number between
            0 and 5 (inclusive).

    Returns:
        On success, a dict with ``status == "success"`` and a payload holding
        the normalized direction and the distance. Otherwise a dict with
        ``status == "error"`` and a human-readable ``message``. Invalid input
        never raises.
    """
    allowed_directions = ("forward", "backward", "left", "right")

    normalized = direction.strip().lower() if isinstance(direction, str) else None
    if normalized not in allowed_directions:
        return {
            "status": "error",
            "message": (
                f"Invalid direction {direction!r}. "
                f"Expected one of: {', '.join(allowed_directions)}."
            ),
        }

    # bool is a subclass of int, so it has to be excluded explicitly.
    if isinstance(distance_meters, bool) or not isinstance(distance_meters, (int, float)):
        return {"status": "error", "message": "distance_meters must be a number."}

    if distance_meters > 5.0:
        return {"status": "error", "message": "Safety limit: Cannot move more than 5m at once."}

    # Written as "not >= 0" so that NaN (for which every comparison is False)
    # is rejected together with negative distances.
    if not distance_meters >= 0:
        return {
            "status": "error",
            "message": "Safety limit: distance must be a non-negative number.",
        }

    return {
        "status": "success",
        "action_executed": "navigate",
        "payload": {"direction": normalized, "distance": distance_meters},
    }
def tool_scan_hazard(hazard_type: str, severity: str) -> dict:
    """
    Produce a timestamped warning entry for a hazard seen in the image
    (e.g., 'fire', 'water', 'obstacle').

    The entry is only returned to the caller; nothing is written to disk
    and no alarm is raised by this function.
    """
    detected_at = datetime.now().isoformat()
    warning_line = "[{}] WARNING: {} detected (Severity: {})".format(
        detected_at, hazard_type, severity
    )
    # Persisting the entry or triggering an alarm would be hooked in here.
    return {"status": "warning_logged", "log": warning_line}
def tool_analyze_human(clothing_color: str, estimated_action: str) -> dict:
    """
    Summarize a detected human as a one-sentence tracking record.
    """
    summary = "Human wearing {} is likely {}.".format(clothing_color, estimated_action)
    return dict(status="human_tracked", details=summary)
# --- Tool Dispatcher ---
# Lookup table from the tool name the model is asked to emit to the Python
# callable that implements that tool.
TOOL_REGISTRY = dict(
    speak=tool_speak,
    navigate=tool_navigate,
    scan_hazard=tool_scan_hazard,
    analyze_human=tool_analyze_human,
)
| # ========================================== | |
| # 2. HELPER FUNCTIONS | |
| # ========================================== | |
def save_and_upload_image(image_b64: str, hf_token: str):
    """
    Decode a base64 image, keep a local copy and upload it to the dataset repo.

    Args:
        image_b64: Base64-encoded image data.
        hf_token: Hugging Face token passed to ``upload_file``.

    Returns:
        A 4-tuple ``(local_path, image_url, path_in_repo, size_in_bytes)``.
        On any failure (bad base64, empty image, I/O or upload error) the
        error is printed and ``(None, None, None, 0)`` is returned, so this
        function never raises.
    """
    # Imported here because the module-level import block does not provide it.
    import uuid

    try:
        image_bytes = base64.b64decode(image_b64)
        if not image_bytes:
            raise ValueError("decoded image is empty")

        # Local copy, kept at the original fixed location for callers that
        # use the returned path. It is overwritten by every call.
        local_tmp_path = "/tmp/tmp.jpg"
        with open(local_tmp_path, "wb") as f:
            f.write(image_bytes)

        # Second-resolution timestamps collide when two images arrive within
        # the same second; microseconds plus a random suffix keep the repo
        # path unique so an earlier upload is never overwritten.
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
        path_in_repo = f"images/robot_{timestamp}_{uuid.uuid4().hex[:8]}.jpg"

        # Upload the in-memory bytes rather than re-reading the shared temp
        # file: a concurrent call may already have overwritten that file.
        upload_file(
            path_or_fileobj=image_bytes,
            path_in_repo=path_in_repo,
            repo_id=HF_DATASET_REPO,
            token=hf_token,
            repo_type="dataset",
        )

        hf_image_url = (
            f"https://huggingface.co/datasets/{HF_DATASET_REPO}/resolve/main/{path_in_repo}"
        )
        return local_tmp_path, hf_image_url, path_in_repo, len(image_bytes)
    except Exception as e:
        # Deliberately best-effort: the caller continues without an image URL.
        print(f"Upload failed: {e}")
        return None, None, None, 0
| # ========================================== | |
| # 3. MAIN LOGIC | |
| # ========================================== | |
def _strip_code_fence(text: str) -> str:
    """Remove a surrounding Markdown code fence (with optional 'json' tag)."""
    cleaned = text.strip()
    if not cleaned.startswith("```"):
        return cleaned
    cleaned = cleaned.strip("`").strip()
    # Drop only a *leading* language tag; any "json" inside the payload
    # itself must survive untouched.
    if cleaned[:4].lower() == "json":
        cleaned = cleaned[4:]
    return cleaned.strip()


def _parse_action(vlm_text: str):
    """Decode the model reply into a dict, or return None when impossible."""
    candidates = [vlm_text]
    # Fallback for replies that wrap the JSON object in extra prose.
    first_brace, last_brace = vlm_text.find("{"), vlm_text.rfind("}")
    if 0 <= first_brace < last_brace:
        candidates.append(vlm_text[first_brace:last_brace + 1])
    for candidate in candidates:
        try:
            parsed = json.loads(candidate)
        except json.JSONDecodeError:
            continue
        if isinstance(parsed, dict):
            return parsed
    return None


def _execute_tool(tool_name, tool_args) -> dict:
    """Run the registered tool and report bad names/arguments as error dicts."""
    # The isinstance check also guards against unhashable values (lists,
    # dicts), which would make the registry lookup itself raise.
    if not isinstance(tool_name, str) or tool_name not in TOOL_REGISTRY:
        return {"error": f"Tool '{tool_name}' not found in registry."}
    if not isinstance(tool_args, dict):
        return {"error": f"Arguments for tool '{tool_name}' must be a JSON object."}

    print(f"Executing tool: {tool_name} with args {tool_args}")
    try:
        return TOOL_REGISTRY[tool_name](**tool_args)
    except TypeError as exc:
        # Missing or unexpected keyword arguments chosen by the model.
        return {"error": f"Invalid arguments for tool '{tool_name}': {exc}"}


def process_and_describe(payload: dict):
    """
    Upload the camera image, ask the VLM for an action and execute that tool.

    Args:
        payload: Dict with 'hf_token' and 'image_b64' (both required) and an
            optional 'robot_id' (defaults to "unknown").

    Returns:
        On success a dict with the keys 'status', 'robot_id', 'image_url',
        'analysis', 'chosen_tool', 'tool_arguments' and
        'tool_execution_result'. 'status' is "success" whenever the model was
        reached, even if the selected tool reported an error. On invalid
        input or an unexpected failure, a dict with an 'error' key is
        returned instead (plus 'raw_response' for unexpected failures).
    """
    vlm_text = ""
    try:
        if not isinstance(payload, dict):
            return {"error": "Payload must be a JSON object."}

        hf_token = payload.get("hf_token")
        if not hf_token:
            return {"error": "HF token not provided in payload."}
        robot_id = payload.get("robot_id", "unknown")
        image_b64 = payload.get("image_b64")
        if not image_b64:
            return {"error": "No image provided."}

        # Upload Image. A failed upload yields hf_url=None and is not fatal.
        _, hf_url, _, _ = save_and_upload_image(image_b64, hf_token)

        # Initialize HF Client
        hf_client = InferenceClient(token=hf_token)

        # --- Dynamic System Prompt Construction ---
        tools_desc = json.dumps({
            "speak": {"text": "string", "emotion": "string"},
            "navigate": {"direction": "forward/left/right", "distance_meters": "float"},
            "scan_hazard": {"hazard_type": "string", "severity": "low/medium/high"},
            "analyze_human": {"clothing_color": "string", "estimated_action": "string"}
        }, indent=2)
        system_prompt = f"""
You are a Robot Control AI. Analyze the image and choose ONE tool to execute.
AVAILABLE TOOLS (JSON Schema):
{tools_desc}
INSTRUCTIONS:
1. Describe what you see briefly.
2. Select the most appropriate tool based on the visual context.
- If you see a person -> use 'analyze_human' OR 'speak'.
- If you see a clear path -> use 'navigate'.
- If you see fire/mess -> use 'scan_hazard'.
RESPONSE FORMAT (Strict JSON):
{{
"description": "Brief visual description",
"tool_name": "name_of_tool",
"arguments": {{ ...args matching schema... }}
}}
"""
        messages_payload = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": [
                {"type": "text", "text": "Analyze this camera feed and decide on an action."},
                {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{image_b64}"}}
            ]}
        ]

        # Call VLM
        chat_completion = hf_client.chat.completions.create(
            model=HF_VLM_MODEL,
            messages=messages_payload,
            max_tokens=300,
            temperature=0.1  # Low temp for reliable JSON
        )
        # The content may be None; treat that like an empty reply.
        raw_content = chat_completion.choices[0].message.content
        vlm_text = _strip_code_fence(raw_content or "")

        action_data = _parse_action(vlm_text)
        if action_data is None:
            action_data = {"description": vlm_text, "tool_name": None}
            tool_result = {"error": "Model did not return valid JSON."}
        else:
            # --- TOOL EXECUTION BLOCK ---
            tool_args = action_data.get("arguments")
            if tool_args is None:
                tool_args = {}
            tool_result = _execute_tool(action_data.get("tool_name"), tool_args)

        return {
            "status": "success",
            "robot_id": robot_id,
            "image_url": hf_url,
            "analysis": action_data.get("description"),
            "chosen_tool": action_data.get("tool_name"),
            "tool_arguments": action_data.get("arguments"),
            "tool_execution_result": tool_result
        }
    except Exception as e:
        # Top-level boundary of the API endpoint: report instead of crashing.
        return {"error": f"Server error: {str(e)}", "raw_response": vlm_text}
# --- Gradio Interface ---
# One JSON-in / JSON-out endpoint backed by process_and_describe and exposed
# under the API name "predict".
demo = gr.Interface(
    process_and_describe,
    gr.JSON(label="Input (JSON with 'image_b64' and 'hf_token')"),
    gr.JSON(label="Robot Command Output"),
    api_name="predict",
)

# Start the app only when the file is run as a script.
if __name__ == "__main__":
    demo.launch()