Spaces:
Runtime error
Runtime error
| # Copyright (c) Meta Platforms, Inc. and affiliates. | |
| # All rights reserved. | |
| # | |
| # This source code is licensed under the BSD-style license found in the | |
| # LICENSE file in the root directory of this source tree. | |
| """ | |
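| Fbot Agent Sim Environment Implementation. | |
| A simulated robot world in which an agent drives the robot through named tool actions (navigation, perception, manipulation and speech). | |
| Each step dispatches one tool call and returns an observation of the updated world together with a reward. | |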
| """ | |
import math
from typing import Dict, Optional, Tuple
from uuid import uuid4

from openenv.core.env_server.interfaces import Environment

try:
    from ..models import (
        FbotAgentSimAction,
        FbotAgentSimObservation,
        Object,
        Person,
        Pose,
        RobotState,
        WorldState,
    )
except ImportError:
    # Fallback for running outside the package (no parent package for the
    # relative import). It must expose the same names as the branch above,
    # otherwise reset() fails with NameError on WorldState / Pose / etc.
    from models import (
        FbotAgentSimAction,
        FbotAgentSimObservation,
        Object,
        Person,
        Pose,
        RobotState,
        WorldState,
    )
class FbotAgentSimEnvironment(Environment):
    """
    Simulated robot world that executes Fbot agent tool calls.

    ``reset`` builds a small fixed scene: a robot in its default state, three
    objects, two people and three named locations.  ``step`` dispatches
    ``action.tool`` to the matching ``_tool_<name>`` method; every tool
    returns ``(message, success)``, which is folded into an observation of
    the updated world plus a scalar reward.

    Example (assumes the action model accepts its fields as keyword
    arguments - confirm against ``models``):
        >>> env = FbotAgentSimEnvironment()
        >>> obs = env.reset()
        >>> print(obs.message)  # "Reset."
        >>>
        >>> obs, reward, done, info = env.step(
        ...     FbotAgentSimAction(tool="say_something", text="Hello")
        ... )
        >>> print(obs.message)  # "Said: 'Hello'"
    """

    # Enable concurrent WebSocket sessions.
    # Set to True if your environment isolates state between instances.
    # When True, multiple WebSocket clients can connect simultaneously, each
    # getting their own environment instance (when using factory mode in app.py).
    SUPPORTS_CONCURRENT_SESSIONS: bool = True

    # Distance thresholds (same unit as Pose.distance_to) used by the tools
    # and by observation generation.
    _PICK_RANGE = 1.0
    _FACE_RANGE = 3.0
    _NEAR_OBJECT_RANGE = 2.0
    _NEAR_PERSON_RANGE = 5.0

    # Emotions accepted by the ``set_emotion`` tool.
    _ALLOWED_EMOTIONS = ("neutral", "happy", "sad", "angry", "surprised")

    def __init__(self):
        super().__init__()
        # World state; stays None until reset() (or the first step()) runs.
        self._state: Optional[WorldState] = None
        self._last_observation: Optional[FbotAgentSimObservation] = None
        # The Object instance currently held by the robot, kept so that
        # placing it back restores the original object (description etc.).
        self._held_item: Optional[Object] = None

    @property
    def state(self):
        """
        Return the current world state (``None`` before the first reset).

        NOTE(review): openenv's ``Environment`` base class appears to declare
        ``state`` as an abstract property, in which case this class cannot be
        instantiated without it - confirm against the installed openenv
        version, and confirm that ``WorldState`` is an acceptable return type.
        """
        return self._state

    def reset(self) -> FbotAgentSimObservation:
        """Reset the world to its default configuration and return the first observation."""
        # Robot starts in its default state (nothing held).
        robot = RobotState()
        # Predefined objects
        objects = {
            "ball": Object("ball", Pose(2.0, 1.0, 0), description="red ball"),
            "cup": Object("cup", Pose(3.0, 2.5, 0), description="blue ceramic cup"),
            "book": Object("book", Pose(1.0, 4.0, 0), description="thick science book"),
        }
        # Predefined people
        people = {
            "Alice": Person("Alice", "woman with glasses", face_uuid="face_alice", pose=Pose(5.0, 0.0, 0)),
            "Bob": Person("Bob", "short man with beard", face_uuid="face_bob", pose=Pose(0.0, 5.0, 0)),
        }
        named_locations = {
            "kitchen": Pose(2.0, 2.0, 0),
            "living_room": Pose(4.0, 4.0, 0),
            "dining_table": Pose(3.0, 3.0, 0),
        }
        self._held_item = None
        self._state = WorldState(
            robot=robot,
            objects=objects,
            people=people,
            named_locations=named_locations,
        )
        self._last_observation = self._generate_observation("Reset.", success=True)
        return self._last_observation

    def step(self, action: FbotAgentSimAction) -> Tuple[FbotAgentSimObservation, float, bool, Dict]:
        """
        Execute the given tool action and update the world.

        Args:
            action: The tool call; ``action.tool`` selects the ``_tool_<name>``
                method and the remaining fields are that tool's arguments.

        Returns:
            ``(observation, reward, done, info)``.  ``done`` is always False
            (episodes are continuous).  If the tool is unknown or raises, the
            observation reports the error, the reward is ``-1.0`` and ``info``
            carries an ``"error"`` entry.

        NOTE(review): this returns a gym-style tuple; confirm that the openenv
        server in use accepts it rather than a single observation object.
        """
        # Calling step() before reset() used to crash inside the error handler
        # (no world to observe); start from the default world instead.
        if self._state is None:
            self.reset()

        try:
            # Dispatch based on tool name
            handler = getattr(self, f"_tool_{action.tool}", None)
            if not callable(handler):
                raise ValueError(f"Unknown tool: {action.tool}")
            result_msg, success = handler(action)
            obs = self._generate_observation(result_msg, success)
            reward = self._compute_reward(obs, action)
        except Exception as e:
            # Boundary handler: any tool failure becomes a failed observation
            # instead of propagating to the server.
            obs = self._generate_observation(f"Error: {str(e)}", success=False)
            self._last_observation = obs
            info = {
                "error": str(e),
                "action": getattr(action, "tool", None),
                "success": False,
            }
            return obs, -1.0, False, info

        self._last_observation = obs
        done = False  # episodes are continuous (never done)
        info = {"action": action.tool, "success": success}
        return obs, reward, done, info

    # -----------------------------------------------------------------
    # Tool implementations
    # -----------------------------------------------------------------
    def _tool_navigate_to_pose(self, action: FbotAgentSimAction) -> Tuple[str, bool]:
        """Move the robot to the named location ``action.location_name``."""
        locations = self._state.named_locations
        if action.location_name not in locations:
            return f"Location '{action.location_name}' unknown.", False
        self._move_robot_to(locations[action.location_name])
        return f"Navigated to {action.location_name}.", True

    def _tool_move_forward(self, action: FbotAgentSimAction) -> Tuple[str, bool]:
        """Advance ``action.distance`` along the robot's current heading."""
        d = action.distance or 0.0  # a missing distance is treated as 0
        if d < 0:
            return "Distance must be positive.", False
        pose = self._state.robot.pose
        pose.x += d * math.cos(pose.theta)
        pose.y += d * math.sin(pose.theta)
        return f"Moved forward {d:.2f}m.", True

    def _tool_rotate(self, action: FbotAgentSimAction) -> Tuple[str, bool]:
        """Rotate in place by ``action.angle`` (added to the pose's theta)."""
        angle = action.angle or 0.0  # a missing angle is treated as 0
        self._state.robot.pose.theta += angle
        return f"Rotated by {angle:.2f} rad.", True

    def _tool_query_pose_by_name(self, action: FbotAgentSimAction) -> Tuple[str, bool]:
        """Report the pose of the named location ``action.location_name``."""
        locations = self._state.named_locations
        if action.location_name not in locations:
            return f"Location '{action.location_name}' unknown.", False
        pose = locations[action.location_name]
        return f"Pose of {action.location_name}: {pose}", True

    def _tool_detect_and_approach_person_by_description(self, action: FbotAgentSimAction) -> Tuple[str, bool]:
        """Find a person whose appearance matches the description and move to them."""
        desc = action.person_description
        if not desc:
            return "No description provided.", False
        person = self._find_person_by_description(desc)
        if person is None:
            return f"No person matching '{desc}' found.", False
        self._move_robot_to(person.pose)
        return f"Found and approached {person.name}.", True

    def _tool_detect_and_approach_object(self, action: FbotAgentSimAction) -> Tuple[str, bool]:
        """Find an object by name and move the robot to its pose."""
        name = action.object_name
        if not name or name not in self._state.objects:
            return f"Object '{name}' not found.", False
        self._move_robot_to(self._state.objects[name].pose)
        return f"Approached {name}.", True

    def _tool_detect_and_approach_unknown_person_by_name(self, action: FbotAgentSimAction) -> Tuple[str, bool]:
        """Find a person by name (simulated: looked up in the people dict) and move to them."""
        name = action.person_name
        if not name or name not in self._state.people:
            return f"Person named '{name}' not known.", False
        self._move_robot_to(self._state.people[name].pose)
        return f"Found and approached {name}.", True

    def _tool_follow_person(self, action: FbotAgentSimAction) -> Tuple[str, bool]:
        """Follow a person described by appearance (simulated as moving to their pose)."""
        desc = action.person_description
        if not desc:
            return "No description provided.", False
        person = self._find_person_by_description(desc)
        if person is None:
            return f"Cannot follow: person matching '{desc}' not found.", False
        self._move_robot_to(person.pose)
        return f"Following {person.name}.", True

    def _tool_detect_and_pick_object(self, action: FbotAgentSimAction) -> Tuple[str, bool]:
        """Pick up an object by name; the robot must be within pick range and empty-handed."""
        name = action.object_name
        if not name or name not in self._state.objects:
            return f"Object '{name}' not found.", False
        obj = self._state.objects[name]
        dist = self._state.robot.pose.distance_to(obj.pose)
        if dist > self._PICK_RANGE:
            return f"{name} is {dist:.2f}m away – too far to pick.", False
        if not obj.is_pickable:
            return f"{name} cannot be picked up.", False
        if self._state.robot.held_object:
            return f"Already holding {self._state.robot.held_object}. Release it first.", False
        self._state.robot.held_object = name
        # Remove from the world (it is now held) but keep the instance so a
        # later place_object can restore it unchanged.
        self._held_item = self._state.objects.pop(name)
        return f"Picked up {name}.", True

    def _tool_give_object_to_user(self, action: FbotAgentSimAction) -> Tuple[str, bool]:
        """Give the held object to the person named ``action.target_user``."""
        user = action.target_user
        if not user or user not in self._state.people:
            return f"User '{user}' not found.", False
        if not self._state.robot.held_object:
            return "Nothing to give.", False
        obj_name = self._state.robot.held_object
        self._state.robot.held_object = None
        # Object is now considered given away (not added back to world)
        self._held_item = None
        return f"Gave {obj_name} to {user}.", True

    def _tool_place_object(self, action: FbotAgentSimAction) -> Tuple[str, bool]:
        """Place the held object at the named location ``action.location_pose``."""
        loc = action.location_pose
        if not loc or loc not in self._state.named_locations:
            return f"Location '{loc}' unknown.", False
        if not self._state.robot.held_object:
            return "Nothing to place.", False
        obj_name = self._state.robot.held_object
        target = self._state.named_locations[loc]
        new_pose = Pose(target.x, target.y, target.theta)
        held = self._held_item
        if held is not None and held.name == obj_name:
            # Re-use the picked instance so its description and pickability
            # survive the pick/place round trip.
            # Assumes Object allows attribute assignment, as RobotState and
            # Pose do elsewhere in this class - TODO confirm.
            held.pose = new_pose
            placed = held
        else:
            # held_object was set without going through the pick tool.
            placed = Object(obj_name, new_pose)
        self._state.objects[obj_name] = placed
        self._state.robot.held_object = None
        self._held_item = None
        return f"Placed {obj_name} at {loc}.", True

    def _tool_analyze_scene(self, action: FbotAgentSimAction) -> Tuple[str, bool]:
        """Answer a visual question about the scene (simulated VLM)."""
        q = action.question
        if not q:
            return "No question provided.", False
        # Needles must be lowercase because they are matched against q.lower().
        q_lower = q.lower()
        if "how many people" in q_lower:
            count = len(self._state.people)
            return f"There are {count} people.", True
        if "what am i holding" in q_lower:
            held = self._state.robot.held_object or "nothing"
            return f"You are holding {held}.", True
        # Generic
        return f"Simulated VLM answer to: '{q}'.", True

    def _tool_count_objects(self, action: FbotAgentSimAction) -> Tuple[str, bool]:
        """Count world objects whose name equals ``action.count_object``."""
        name = action.count_object
        if not name:
            return "No object specified.", False
        count = sum(1 for obj in self._state.objects.values() if obj.name == name)
        return f"Found {count} {name}(s).", True

    def _tool_count_people(self, action: FbotAgentSimAction) -> Tuple[str, bool]:
        """Count people whose appearance contains ``action.count_person_desc``."""
        desc = action.count_person_desc
        if not desc:
            return "No description provided.", False
        needle = desc.lower()
        count = sum(1 for p in self._state.people.values() if needle in p.appearance.lower())
        return f"Found {count} people matching '{desc}'.", True

    def _tool_search_person(self, action: FbotAgentSimAction) -> Tuple[str, bool]:
        """Search for a person whose name or appearance contains ``action.search_name``."""
        name = action.search_name
        if not name:
            return "No search term provided.", False
        needle = name.lower()
        for p in self._state.people.values():
            if needle in p.name.lower() or needle in p.appearance.lower():
                return f"Found {p.name} at {p.pose}.", True
        return f"No person matching '{name}' found.", False

    def _tool_detect_faces(self, action: FbotAgentSimAction) -> Tuple[str, bool]:
        """List the people within face-detection range of the robot (simulated)."""
        robot_pose = self._state.robot.pose
        nearby = [
            f"{p.name} (uuid: {p.face_uuid})"
            for p in self._state.people.values()
            if robot_pose.distance_to(p.pose) < self._FACE_RANGE
        ]
        if not nearby:
            # An empty result is still a successful detection pass.
            return "No faces detected.", True
        return f"Detected faces: {', '.join(nearby)}", True

    def _tool_find_person_saved_by_face(self, action: FbotAgentSimAction) -> Tuple[str, bool]:
        """Find the person whose saved face UUID equals ``action.face_uuid``."""
        uuid = action.face_uuid
        if not uuid:
            return "No face UUID provided.", False
        for p in self._state.people.values():
            if p.face_uuid == uuid:
                return f"Found {p.name} at {p.pose}.", True
        return f"Person with face UUID '{uuid}' not found.", False

    def _tool_save_person_face(self, action: FbotAgentSimAction) -> Tuple[str, bool]:
        """Register a new person's face (simulated: only confirms, no state change)."""
        # In a real system you'd add a new person. Here we just confirm.
        return f"Face saved with UUID {action.face_uuid or 'new-uuid-1234'}.", True

    def _tool_detect_object(self, action: FbotAgentSimAction) -> Tuple[str, bool]:
        """Detect an object by name and report its pose."""
        name = action.object_name
        if not name or name not in self._state.objects:
            return f"Object '{name}' not detected.", False
        obj = self._state.objects[name]
        return f"Detected {name} at {obj.pose}.", True

    def _tool_say_something(self, action: FbotAgentSimAction) -> Tuple[str, bool]:
        """Speak ``action.text`` (simulated TTS: recorded as the robot's last speech)."""
        text = action.text
        if not text:
            return "Nothing to say.", False
        self._state.robot.last_speech = text
        return f"Said: '{text}'", True

    def _tool_listen_something(self, action: FbotAgentSimAction) -> Tuple[str, bool]:
        """Listen for speech input (simulated ASR: always hears the same canned phrase)."""
        self._state.robot.heard_speech = "User said: 'Hello robot'"
        return f"Heard: {self._state.robot.heard_speech}", True

    def _tool_transform_pose(self, action: FbotAgentSimAction) -> Tuple[str, bool]:
        """Transform a pose between coordinate frames (simulated as the identity)."""
        if not action.source_pose:
            return "No source pose provided.", False
        # In a real system you'd transform using TF. Here we return the same pose.
        return f"Transformed pose: {action.source_pose}", True

    def _tool_get_question_answer(self, action: FbotAgentSimAction) -> Tuple[str, bool]:
        """Answer a general-knowledge question in ``action.text`` (simulated LLM)."""
        q = action.text
        if not q:
            return "No question provided.", False
        # Simple canned answers; needles must be lowercase because they are
        # matched against q.lower().
        q_lower = q.lower()
        if "capital of france" in q_lower:
            return "Paris", True
        if "meaning of life" in q_lower:
            return "42", True
        return f"I don't know the answer to '{q}'.", True

    def _tool_set_emotion(self, action: FbotAgentSimAction) -> Tuple[str, bool]:
        """Set the robot's displayed emotion to ``action.emotion`` if it is allowed."""
        emotion = action.emotion
        if emotion not in self._ALLOWED_EMOTIONS:
            # list(...) keeps the bracketed rendering in the message.
            return f"Emotion must be one of {list(self._ALLOWED_EMOTIONS)}.", False
        self._state.robot.emotion = emotion
        return f"Emotion set to {emotion}.", True

    # -----------------------------------------------------------------
    # Helper methods
    # -----------------------------------------------------------------
    def _move_robot_to(self, pose) -> None:
        """Set the robot's pose to a copy of ``pose`` so the two never alias."""
        self._state.robot.pose = Pose(pose.x, pose.y, pose.theta)

    def _find_person_by_description(self, desc: str) -> Optional[Person]:
        """Return the first person whose appearance contains ``desc`` (case-insensitive), else None."""
        needle = desc.lower()
        for person in self._state.people.values():
            if needle in person.appearance.lower():
                return person
        return None

    def _generate_observation(self, message: str, success: bool) -> FbotAgentSimObservation:
        """Build an observation of the current world state carrying ``message`` and ``success``."""
        robot = self._state.robot
        near_objects = [
            name
            for name, obj in self._state.objects.items()
            if robot.pose.distance_to(obj.pose) < self._NEAR_OBJECT_RANGE
        ]
        near_people = [
            name
            for name, person in self._state.people.items()
            if robot.pose.distance_to(person.pose) < self._NEAR_PERSON_RANGE
        ]
        return FbotAgentSimObservation(
            # Snapshot the pose: move_forward/rotate mutate robot.pose in
            # place, which would otherwise rewrite earlier observations.
            robot_pose=Pose(robot.pose.x, robot.pose.y, robot.pose.theta),
            held_object=robot.held_object,
            near_objects=near_objects,
            near_people=near_people,
            last_speech=robot.last_speech,
            heard_speech=robot.heard_speech,
            emotion=robot.emotion,
            success=success,
            message=message,
        )

    def _compute_reward(self, obs: FbotAgentSimObservation, action: FbotAgentSimAction) -> float:
        """
        Reward function - customise this for your task.

        Successful actions earn +0.1 and failures -0.05.  A successful pick
        adds +0.5 and a successful hand-over adds +1.0.  Saying text that
        contains "stupid" costs 0.2 regardless of success.
        """
        reward = 0.1 if obs.success else -0.05
        if obs.success:
            if action.tool == "detect_and_pick_object":
                reward += 0.5
            elif action.tool == "give_object_to_user":
                reward += 1.0
        # Penalty for saying something rude (optional)
        if action.tool == "say_something" and action.text and "stupid" in action.text.lower():
            reward -= 0.2
        return reward