from baml_client.types import ( AnswerQuestion, GetVisualInfo, GoTo, PickObject, PlaceObject, SayWithContext, GiveObject, FollowPersonUntil, GuidePersonTo, Count, GetPersonInfo, FindPerson, FindPersonByName ) from baml_client.sync_client import b from command_interpreter.status import Status from command_interpreter.embeddings.categorization import Embeddings class Tasks: def __init__(self): self.tasks = { "object_detection": "Detect objects in an image.", "image_classification": "Classify the content of an image.", "face_recognition": "Recognize faces in an image.", "image_segmentation": "Segment different parts of an image.", "optical_character_recognition": "Extract text from images." } self.embeddings = Embeddings() def go_to(self, command: GoTo, grounding: bool = True) -> tuple[Status, str]: if grounding: query_result = self.embeddings.query_location(command.location_to_go) area = self.embeddings.get_area(query_result) subarea = self.embeddings.get_subarea(query_result) else: area = command.location_to_go subarea = None return (Status.EXECUTION_SUCCESS, "arrived to: " + (area + (" -> " + subarea if subarea else ""))) def pick_object(self, command: PickObject, grounding: bool = True) -> tuple[Status, str]: if grounding: query_result = self.embeddings.query_item(command.object_to_pick) name = self.embeddings.get_name(query_result) else: name = command.object_to_pick return (Status.EXECUTION_SUCCESS, "picked up: " + name) def place_object(self, command: PlaceObject, grounding: bool = True): return Status.EXECUTION_SUCCESS, "placed object" def say_with_context(self, command: SayWithContext, grounding: bool = True): if grounding: query_command_history = self.embeddings.query_command_history(command.previous_command_info + " " + command.user_instruction) query_tec_knowledge = self.embeddings.query_tec_knowledge(command.previous_command_info + " " + command.user_instruction) query_frida_knowledge = self.embeddings.query_frida_knowledge(command.previous_command_info + " " + command.user_instruction) query_roborregos_knowledge = self.embeddings.query_roborregos_knowledge(command.previous_command_info + " " + command.user_instruction) query_result = ("command history: " + str(query_command_history) + "\n" + "tec knowledge: " + str(query_tec_knowledge) + "\n" + "frida knowledge: " + str(query_frida_knowledge) + "\n" + "roborregos knowledge: " + str(query_roborregos_knowledge)) else: query_result = command.previous_command_info response = b.AugmentedResponse(query_result, command.user_instruction) return Status.EXECUTION_SUCCESS, response def answer_question(self, command: AnswerQuestion, grounding: bool = True): # It is assumed it always answers the question return Status.EXECUTION_SUCCESS, "answered user's question" def get_visual_info(self, command: GetVisualInfo, grounding: bool = True): # It is assumed it always finds a box as the desired object return Status.EXECUTION_SUCCESS, "found: box as " + command.measure + " " + command.object_category def give_object(self, command: GiveObject, grounding: bool = True): return Status.EXECUTION_SUCCESS, "object given" def follow_person_until(self, command: FollowPersonUntil, grounding: bool = True): if command.destination == "canceled" or command.destination == "cancelled": return Status.EXECUTION_SUCCESS, "followed user until canceled" if grounding: query_result = self.embeddings.query_location(command.destination) area = self.embeddings.get_area(query_result) subarea = self.embeddings.get_subarea(query_result) else: area = command.destination subarea = None return Status.EXECUTION_SUCCESS, "arrived to: " + (area + (" -> " + subarea if subarea else "")) def guide_person_to(self, command: GuidePersonTo, grounding: bool = True): if grounding: query_result = self.embeddings.query_location(command.destination_room) area = self.embeddings.get_area(query_result) subarea = self.embeddings.get_subarea(query_result) else: area = command.destination_room subarea = None return Status.EXECUTION_SUCCESS, "arrived to: " + (area + (" -> " + subarea if subarea else "")) def get_person_info(self, command: GetPersonInfo, grounding: bool = True): if command.info_type == "gesture": return Status.EXECUTION_SUCCESS, "person gesture is pointing to the right" elif command.info_type == "pose": return Status.EXECUTION_SUCCESS, "person pose is standing" elif command.info_type == "name": return Status.EXECUTION_SUCCESS, "person name is John" return Status.EXECUTION_SUCCESS, "person " + command.info_type + " was found" def count(self, command: Count, grounding: bool = True): # Always returns 4 return Status.EXECUTION_SUCCESS, "found: 4 " + command.target_to_count def find_person(self, command: FindPerson, grounding: bool = True): # Is assumed it always finds the person if command.attribute_value == "": return Status.EXECUTION_SUCCESS, "found person" else: return Status.EXECUTION_SUCCESS, "found person with attribute: " + command.attribute_value def find_person_by_name(self, command: FindPersonByName, grounding: bool = True): # Is assumed it always finds the person return Status.EXECUTION_SUCCESS, f"found {command.name}" def add_command_history(self, command, res, status): self.embeddings.add_command_history( command, res, status, ) def clear_command_history(self): """Clears the command history before execution""" self.embeddings.delete_collection("command_history") self.embeddings.build_embeddings()