# fbot_agent_sim/server/fbot_agent_sim_environment.py
# Uploaded with huggingface_hub by crislmfroes (revision 18ca40a, 18.1 kB).
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
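"""
Fbot Agent Sim Environment Implementation.

A lightweight simulated world for a service robot. The environment keeps a
robot, a few objects, people and named locations in memory and exposes
navigation, perception, manipulation and speech "tools" that an agent
invokes through ``step``.
"""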
import math
from typing import Tuple, Dict
from uuid import uuid4

from openenv.core.env_server.interfaces import Environment

# The environment needs every model type below (reset() builds the world from
# them), so both import paths must bring in the same set of names.
try:
    from ..models import (
        FbotAgentSimAction,
        FbotAgentSimObservation,
        WorldState,
        RobotState,
        Pose,
        Object,
        Person,
    )
except ImportError:
    # Fallback for when this module is loaded outside of its package
    # (the relative import above is then not resolvable).
    from models import (
        FbotAgentSimAction,
        FbotAgentSimObservation,
        WorldState,
        RobotState,
        Pose,
        Object,
        Person,
    )
class FbotAgentSimEnvironment(Environment):
    """
    Simulated service-robot world driven by named tool actions.

    ``reset`` builds a small world (one robot, three objects, two people and
    three named locations). ``step`` looks up the method ``_tool_<action.tool>``
    on this class, runs it, and converts its ``(message, success)`` result
    into an observation plus a scalar reward.

    Example:
        >>> env = FbotAgentSimEnvironment()
        >>> obs = env.reset()
        >>> print(obs.message)  # "Reset."
        >>>
        >>> obs, reward, done, info = env.step(
        ...     FbotAgentSimAction(tool="navigate_to_pose", location_name="kitchen")
        ... )
        >>> print(obs.message)  # "Navigated to kitchen."
        >>> print(info)  # {"action": "navigate_to_pose", "success": True}
    """

    # Enable concurrent WebSocket sessions.
    # Set to True if your environment isolates state between instances.
    # When True, multiple WebSocket clients can connect simultaneously, each
    # getting their own environment instance (when using factory mode in app.py).
    SUPPORTS_CONCURRENT_SESSIONS: bool = True

    # Distance thresholds (same units as Pose.distance_to) used by the tools
    # and by the observation builder.
    PICK_RANGE_M: float = 1.0  # an object farther than this cannot be picked
    FACE_DETECTION_RANGE_M: float = 3.0  # faces closer than this are detected
    NEAR_OBJECT_RANGE_M: float = 2.0  # objects closer than this are "near"
    NEAR_PERSON_RANGE_M: float = 5.0  # people closer than this are "near"

    # Emotions accepted by the ``set_emotion`` tool.
    ALLOWED_EMOTIONS: Tuple[str, ...] = ("neutral", "happy", "sad", "angry", "surprised")

    def __init__(self):
        """Create an environment with no world yet; ``reset`` builds it."""
        self._state = None
        self._last_observation = None
        # The Object removed from the world by the last successful pick, kept
        # so that placing it back does not lose its description.
        self._held_object_record = None

    @property
    def state(self):
        """
        Return the current world state, or ``None`` before the first reset.

        NOTE(review): the openenv ``Environment`` base class is expected to
        declare ``state`` as an abstract property; confirm against the
        installed openenv version.
        """
        return self._state

    def reset(self) -> FbotAgentSimObservation:
        """
        Reset the world to its default configuration.

        Returns:
            The observation of the freshly created world, with message
            ``"Reset."`` and ``success=True``.
        """
        # Robot starts in its default state.
        robot = RobotState()
        # Predefined objects, keyed by name.
        objects = {
            "ball": Object("ball", Pose(2.0, 1.0, 0), description="red ball"),
            "cup": Object("cup", Pose(3.0, 2.5, 0), description="blue ceramic cup"),
            "book": Object("book", Pose(1.0, 4.0, 0), description="thick science book"),
        }
        # Predefined people, keyed by name.
        people = {
            "Alice": Person("Alice", "woman with glasses", face_uuid="face_alice", pose=Pose(5.0, 0.0, 0)),
            "Bob": Person("Bob", "short man with beard", face_uuid="face_bob", pose=Pose(0.0, 5.0, 0)),
        }
        named_locations = {
            "kitchen": Pose(2.0, 2.0, 0),
            "living_room": Pose(4.0, 4.0, 0),
            "dining_table": Pose(3.0, 3.0, 0),
        }
        self._state = WorldState(
            robot=robot,
            objects=objects,
            people=people,
            named_locations=named_locations,
        )
        self._held_object_record = None
        self._last_observation = self._generate_observation("Reset.", success=True)
        return self._last_observation

    def step(self, action: FbotAgentSimAction) -> Tuple[FbotAgentSimObservation, float, bool, Dict]:
        """
        Execute one tool action and report the outcome.

        The tool is resolved by name: ``action.tool == "rotate"`` runs
        ``self._tool_rotate(action)``. If ``reset`` has not been called yet,
        the world is created first so that the call cannot fail on a missing
        state.

        Args:
            action: The action to run; ``action.tool`` selects the tool and
                the remaining fields are read by that tool.

        Returns:
            ``(observation, reward, done, info)``. ``done`` is always
            ``False``. On success ``info`` is
            ``{"action": <tool>, "success": <bool>}``. If the tool is unknown
            or raises, the observation carries ``"Error: <reason>"``, the
            reward is ``-1.0`` and ``info`` is ``{"error": <reason>}``.

        NOTE(review): this returns a 4-tuple rather than a single observation;
        confirm this matches what the serving layer expects.
        """
        if self._state is None:
            # Without a world, even the error path below would crash while
            # building the observation.
            self.reset()

        try:
            handler = getattr(self, f"_tool_{action.tool}", None)
            if handler is None:
                raise ValueError(f"Unknown tool: {action.tool}")
            result_msg, success = handler(action)
            obs = self._generate_observation(result_msg, success)
            reward = self._compute_reward(obs, action)
        except Exception as e:
            # Boundary handler: any tool failure becomes a failed observation
            # instead of propagating to the caller.
            obs = self._generate_observation(f"Error: {e}", success=False)
            return obs, -1.0, False, {"error": str(e)}

        self._last_observation = obs
        done = False  # episodes are continuous (never done)
        info = {"action": action.tool, "success": success}
        return obs, reward, done, info

    # -----------------------------------------------------------------
    # Tool implementations
    #
    # Every tool takes the action and returns ``(message, success)``.
    # -----------------------------------------------------------------
    def _tool_navigate_to_pose(self, action: FbotAgentSimAction) -> Tuple[str, bool]:
        """Move the robot to the named location ``action.location_name``."""
        if action.location_name not in self._state.named_locations:
            return f"Location '{action.location_name}' unknown.", False
        self._move_robot_to(self._state.named_locations[action.location_name])
        return f"Navigated to {action.location_name}.", True

    def _tool_move_forward(self, action: FbotAgentSimAction) -> Tuple[str, bool]:
        """Move ``action.distance`` along the robot's heading; negative values fail."""
        d = action.distance or 0.0
        if d < 0:
            return "Distance must be positive.", False
        pose = self._state.robot.pose
        heading = pose.theta
        pose.x += d * math.cos(heading)
        pose.y += d * math.sin(heading)
        return f"Moved forward {d:.2f}m.", True

    def _tool_rotate(self, action: FbotAgentSimAction) -> Tuple[str, bool]:
        """Rotate in place by ``action.angle`` radians (missing angle means 0)."""
        angle = action.angle or 0.0
        self._state.robot.pose.theta += angle
        return f"Rotated by {angle:.2f} rad.", True

    def _tool_query_pose_by_name(self, action: FbotAgentSimAction) -> Tuple[str, bool]:
        """Report the pose of the named location ``action.location_name``."""
        if action.location_name not in self._state.named_locations:
            return f"Location '{action.location_name}' unknown.", False
        pose = self._state.named_locations[action.location_name]
        return f"Pose of {action.location_name}: {pose}", True

    def _tool_detect_and_approach_person_by_description(self, action: FbotAgentSimAction) -> Tuple[str, bool]:
        """Move to the first person whose appearance contains ``action.person_description``."""
        desc = action.person_description
        if not desc:
            return "No description provided.", False
        person = self._find_person_by_description(desc)
        if person is None:
            return f"No person matching '{desc}' found.", False
        self._move_robot_to(person.pose)
        return f"Found and approached {person.name}.", True

    def _tool_detect_and_approach_object(self, action: FbotAgentSimAction) -> Tuple[str, bool]:
        """Move the robot to the object named ``action.object_name``."""
        name = action.object_name
        if not name or name not in self._state.objects:
            return f"Object '{name}' not found.", False
        self._move_robot_to(self._state.objects[name].pose)
        return f"Approached {name}.", True

    def _tool_detect_and_approach_unknown_person_by_name(self, action: FbotAgentSimAction) -> Tuple[str, bool]:
        """Move the robot to the person keyed by ``action.person_name``."""
        name = action.person_name
        if not name or name not in self._state.people:
            return f"Person named '{name}' not known.", False
        self._move_robot_to(self._state.people[name].pose)
        return f"Found and approached {name}.", True

    def _tool_follow_person(self, action: FbotAgentSimAction) -> Tuple[str, bool]:
        """Follow the first person matching ``action.person_description``."""
        desc = action.person_description
        if not desc:
            return "No description provided.", False
        person = self._find_person_by_description(desc)
        if person is None:
            return f"Cannot follow: person matching '{desc}' not found.", False
        # Following is simulated by jumping to the person's pose.
        self._move_robot_to(person.pose)
        return f"Following {person.name}.", True

    def _tool_detect_and_pick_object(self, action: FbotAgentSimAction) -> Tuple[str, bool]:
        """
        Pick up the object named ``action.object_name``.

        Fails if the object is unknown, farther than ``PICK_RANGE_M``, not
        pickable, or if the robot already holds something. On success the
        object is removed from the world and recorded as held.
        """
        name = action.object_name
        if not name or name not in self._state.objects:
            return f"Object '{name}' not found.", False
        obj = self._state.objects[name]
        dist = self._state.robot.pose.distance_to(obj.pose)
        if dist > self.PICK_RANGE_M:
            return f"{name} is {dist:.2f}m away – too far to pick.", False
        if not obj.is_pickable:
            return f"{name} cannot be picked up.", False
        if self._state.robot.held_object:
            return f"Already holding {self._state.robot.held_object}. Release it first.", False
        self._state.robot.held_object = name
        # Remove from the world (it is now held) but keep the record so a
        # later place can restore the object's description.
        self._held_object_record = self._state.objects.pop(name)
        return f"Picked up {name}.", True

    def _tool_give_object_to_user(self, action: FbotAgentSimAction) -> Tuple[str, bool]:
        """Hand the held object to the person keyed by ``action.target_user``."""
        user = action.target_user
        if not user or user not in self._state.people:
            return f"User '{user}' not found.", False
        if not self._state.robot.held_object:
            return "Nothing to give.", False
        obj_name = self._state.robot.held_object
        # The object is given away: it is not added back to the world.
        self._state.robot.held_object = None
        self._held_object_record = None
        return f"Gave {obj_name} to {user}.", True

    def _tool_place_object(self, action: FbotAgentSimAction) -> Tuple[str, bool]:
        """
        Place the held object at a named location.

        ``action.location_pose`` is used as the *name* of the location. The
        object reappears in the world at that location's pose, keeping the
        description it had when it was picked up.
        """
        loc = action.location_pose
        if not loc or loc not in self._state.named_locations:
            return f"Location '{loc}' unknown.", False
        if not self._state.robot.held_object:
            return "Nothing to place.", False
        obj_name = self._state.robot.held_object
        target = self._state.named_locations[loc]
        new_pose = Pose(target.x, target.y, target.theta)

        # Only reuse the stored record if it really is the held object.
        record = self._held_object_record
        description = None
        if record is not None and getattr(record, "name", None) == obj_name:
            description = getattr(record, "description", None)

        if description is None:
            placed = Object(obj_name, new_pose)
        else:
            placed = Object(obj_name, new_pose, description=description)

        self._state.objects[obj_name] = placed
        self._state.robot.held_object = None
        self._held_object_record = None
        return f"Placed {obj_name} at {loc}.", True

    def _tool_analyze_scene(self, action: FbotAgentSimAction) -> Tuple[str, bool]:
        """Answer the visual question ``action.question`` from the world state (simulated VLM)."""
        q = action.question
        if not q:
            return "No question provided.", False
        # Needles must be lower-case because they are matched against q.lower().
        q_lower = q.lower()
        if "how many people" in q_lower:
            count = len(self._state.people)
            return f"There are {count} people.", True
        if "what am i holding" in q_lower:
            held = self._state.robot.held_object or "nothing"
            return f"You are holding {held}.", True
        # Generic fallback for any other question.
        return f"Simulated VLM answer to: '{q}'.", True

    def _tool_count_objects(self, action: FbotAgentSimAction) -> Tuple[str, bool]:
        """Count world objects whose name equals ``action.count_object``."""
        name = action.count_object
        if not name:
            return "No object specified.", False
        count = sum(1 for obj in self._state.objects.values() if obj.name == name)
        return f"Found {count} {name}(s).", True

    def _tool_count_people(self, action: FbotAgentSimAction) -> Tuple[str, bool]:
        """Count people whose appearance contains ``action.count_person_desc``."""
        desc = action.count_person_desc
        if not desc:
            return "No description provided.", False
        needle = desc.lower()
        count = sum(1 for p in self._state.people.values() if needle in p.appearance.lower())
        return f"Found {count} people matching '{desc}'.", True

    def _tool_search_person(self, action: FbotAgentSimAction) -> Tuple[str, bool]:
        """Find the first person whose name or appearance contains ``action.search_name``."""
        name = action.search_name
        if not name:
            return "No search term provided.", False
        needle = name.lower()
        for p in self._state.people.values():
            if needle in p.name.lower() or needle in p.appearance.lower():
                return f"Found {p.name} at {p.pose}.", True
        return f"No person matching '{name}' found.", False

    def _tool_detect_faces(self, action: FbotAgentSimAction) -> Tuple[str, bool]:
        """List people closer than ``FACE_DETECTION_RANGE_M`` with their face UUIDs."""
        robot_pose = self._state.robot.pose
        nearby = [
            f"{p.name} (uuid: {p.face_uuid})"
            for p in self._state.people.values()
            if robot_pose.distance_to(p.pose) < self.FACE_DETECTION_RANGE_M
        ]
        if not nearby:
            # Seeing nobody is still a successful detection run.
            return "No faces detected.", True
        return f"Detected faces: {', '.join(nearby)}", True

    def _tool_find_person_saved_by_face(self, action: FbotAgentSimAction) -> Tuple[str, bool]:
        """Find the person whose ``face_uuid`` equals ``action.face_uuid``."""
        face_uuid = action.face_uuid
        if not face_uuid:
            return "No face UUID provided.", False
        for p in self._state.people.values():
            if p.face_uuid == face_uuid:
                return f"Found {p.name} at {p.pose}.", True
        return f"Person with face UUID '{face_uuid}' not found.", False

    def _tool_save_person_face(self, action: FbotAgentSimAction) -> Tuple[str, bool]:
        """Confirm a face registration; no person is actually added to the world."""
        return f"Face saved with UUID {action.face_uuid or 'new-uuid-1234'}.", True

    def _tool_detect_object(self, action: FbotAgentSimAction) -> Tuple[str, bool]:
        """Report the pose of the object named ``action.object_name``."""
        name = action.object_name
        if not name or name not in self._state.objects:
            return f"Object '{name}' not detected.", False
        obj = self._state.objects[name]
        return f"Detected {name} at {obj.pose}.", True

    def _tool_say_something(self, action: FbotAgentSimAction) -> Tuple[str, bool]:
        """Record ``action.text`` as the robot's last speech (simulated TTS)."""
        text = action.text
        if not text:
            return "Nothing to say.", False
        self._state.robot.last_speech = text
        return f"Said: '{text}'", True

    def _tool_listen_something(self, action: FbotAgentSimAction) -> Tuple[str, bool]:
        """Store and return a fixed canned utterance (simulated ASR)."""
        self._state.robot.heard_speech = "User said: 'Hello robot'"
        return f"Heard: {self._state.robot.heard_speech}", True

    def _tool_transform_pose(self, action: FbotAgentSimAction) -> Tuple[str, bool]:
        """Echo ``action.source_pose`` unchanged (identity transform)."""
        if not action.source_pose:
            return "No source pose provided.", False
        # A real system would transform between frames; here the pose is
        # returned as-is.
        return f"Transformed pose: {action.source_pose}", True

    def _tool_get_question_answer(self, action: FbotAgentSimAction) -> Tuple[str, bool]:
        """Answer the general-knowledge question in ``action.text`` with canned replies."""
        q = action.text
        if not q:
            return "No question provided.", False
        # Needles must be lower-case because they are matched against q.lower().
        q_lower = q.lower()
        if "capital of france" in q_lower:
            return "Paris", True
        if "meaning of life" in q_lower:
            return "42", True
        return f"I don't know the answer to '{q}'.", True

    def _tool_set_emotion(self, action: FbotAgentSimAction) -> Tuple[str, bool]:
        """Set the robot's emotion to ``action.emotion`` if it is in ``ALLOWED_EMOTIONS``."""
        emotion = action.emotion
        if emotion not in self.ALLOWED_EMOTIONS:
            # Rendered as a list so the message text stays the same as before.
            return f"Emotion must be one of {list(self.ALLOWED_EMOTIONS)}.", False
        self._state.robot.emotion = emotion
        return f"Emotion set to {emotion}.", True

    # -----------------------------------------------------------------
    # Helper methods
    # -----------------------------------------------------------------
    def _move_robot_to(self, pose) -> None:
        """Set the robot pose to a copy of ``pose`` so the source pose is never shared."""
        self._state.robot.pose = Pose(pose.x, pose.y, pose.theta)

    def _find_person_by_description(self, desc: str):
        """Return the first person whose appearance contains ``desc`` (case-insensitive), else ``None``."""
        needle = desc.lower()
        for person in self._state.people.values():
            if needle in person.appearance.lower():
                return person
        return None

    def _generate_observation(self, message: str, success: bool) -> FbotAgentSimObservation:
        """
        Build an observation from the current world state.

        Args:
            message: Human-readable result of the last action.
            success: Whether the last action succeeded.

        Returns:
            An observation holding the robot's pose, held object, speech and
            emotion fields, plus the names of objects closer than
            ``NEAR_OBJECT_RANGE_M`` and people closer than
            ``NEAR_PERSON_RANGE_M``.
        """
        robot = self._state.robot
        near_objects = [
            name
            for name, obj in self._state.objects.items()
            if robot.pose.distance_to(obj.pose) < self.NEAR_OBJECT_RANGE_M
        ]
        near_people = [
            name
            for name, person in self._state.people.items()
            if robot.pose.distance_to(person.pose) < self.NEAR_PERSON_RANGE_M
        ]
        # Snapshot the pose: move_forward/rotate mutate robot.pose in place,
        # which would otherwise change observations that were already returned.
        pose_snapshot = Pose(robot.pose.x, robot.pose.y, robot.pose.theta)
        return FbotAgentSimObservation(
            robot_pose=pose_snapshot,
            held_object=robot.held_object,
            near_objects=near_objects,
            near_people=near_people,
            last_speech=robot.last_speech,
            heard_speech=robot.heard_speech,
            emotion=robot.emotion,
            success=success,
            message=message,
        )

    def _compute_reward(self, obs: FbotAgentSimObservation, action: FbotAgentSimAction) -> float:
        """
        Score one step; customise this for your task.

        Base reward is ``+0.1`` for a successful action and ``-0.05`` for a
        failed one. A successful ``detect_and_pick_object`` adds ``0.5``, a
        successful ``give_object_to_user`` adds ``1.0``, and saying text that
        contains "stupid" subtracts ``0.2``.
        """
        reward = 0.1 if obs.success else -0.05
        # Bonus for picking up an object.
        if action.tool == "detect_and_pick_object" and obs.success:
            reward += 0.5
        # Bonus for giving an object.
        if action.tool == "give_object_to_user" and obs.success:
            reward += 1.0
        # Penalty for saying something rude (optional).
        if action.tool == "say_something" and action.text and "stupid" in action.text.lower():
            reward -= 0.2
        return reward