File size: 6,497 Bytes
a3643ce
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
from baml_client.types import (
    AnswerQuestion,
    GetVisualInfo,
    GoTo,
    PickObject,
    PlaceObject,
    SayWithContext,
    GiveObject,
    FollowPersonUntil,
    GuidePersonTo,
    Count,
    GetPersonInfo,
    FindPerson,
    FindPersonByName
)
from baml_client.sync_client import b

from command_interpreter.status import Status
from command_interpreter.embeddings.categorization import Embeddings

class Tasks:
    def __init__(self):
        self.tasks = {
            "object_detection": "Detect objects in an image.",
            "image_classification": "Classify the content of an image.",
            "face_recognition": "Recognize faces in an image.",
            "image_segmentation": "Segment different parts of an image.",
            "optical_character_recognition": "Extract text from images."
        }
        self.embeddings = Embeddings()

    
    def go_to(self, command: GoTo, grounding: bool = True) -> tuple[Status, str]:
        if grounding:
            query_result = self.embeddings.query_location(command.location_to_go)
            area = self.embeddings.get_area(query_result)
            subarea = self.embeddings.get_subarea(query_result)
        else:
            area = command.location_to_go
            subarea = None
        return (Status.EXECUTION_SUCCESS, "arrived to: " +
                (area + (" -> " + subarea if subarea else "")))
    
    def pick_object(self, command: PickObject, grounding: bool = True) -> tuple[Status, str]:
        if grounding:
            query_result = self.embeddings.query_item(command.object_to_pick)
            name = self.embeddings.get_name(query_result)
        else:
            name = command.object_to_pick
        return (Status.EXECUTION_SUCCESS, "picked up: " + name)
    
    def place_object(self, command: PlaceObject, grounding: bool = True):
        return Status.EXECUTION_SUCCESS, "placed object"
    
    def say_with_context(self, command: SayWithContext, grounding: bool = True):
        if grounding:
            query_command_history = self.embeddings.query_command_history(command.previous_command_info + " " + command.user_instruction)
            query_tec_knowledge = self.embeddings.query_tec_knowledge(command.previous_command_info + " " + command.user_instruction)
            query_frida_knowledge = self.embeddings.query_frida_knowledge(command.previous_command_info + " " + command.user_instruction)
            query_roborregos_knowledge = self.embeddings.query_roborregos_knowledge(command.previous_command_info + " " + command.user_instruction)
            query_result = ("command history: " + str(query_command_history) + "\n"
                            + "tec knowledge: " + str(query_tec_knowledge) + "\n"
                            + "frida knowledge: " + str(query_frida_knowledge) + "\n"
                            + "roborregos knowledge: " + str(query_roborregos_knowledge))
        else:
            query_result = command.previous_command_info
        response = b.AugmentedResponse(query_result, command.user_instruction)
        return Status.EXECUTION_SUCCESS, response
    
    def answer_question(self, command: AnswerQuestion, grounding: bool = True):
        # It is assumed it always answers the question
        return Status.EXECUTION_SUCCESS, "answered user's question"
    
    def get_visual_info(self, command: GetVisualInfo, grounding: bool = True):
        # It is assumed it always finds a box as the desired object
        return Status.EXECUTION_SUCCESS, "found: box as " + command.measure + " " + command.object_category
    
    def give_object(self, command: GiveObject, grounding: bool = True):
        return Status.EXECUTION_SUCCESS, "object given"
    
    def follow_person_until(self, command: FollowPersonUntil, grounding: bool = True):
        if command.destination == "canceled" or command.destination == "cancelled":
            return Status.EXECUTION_SUCCESS, "followed user until canceled"
        if grounding:
            query_result = self.embeddings.query_location(command.destination)
            area = self.embeddings.get_area(query_result)
            subarea = self.embeddings.get_subarea(query_result)
        else:
            area = command.destination
            subarea = None
        return Status.EXECUTION_SUCCESS, "arrived to: " + (area + (" -> " + subarea if subarea else ""))
    
    def guide_person_to(self, command: GuidePersonTo, grounding: bool = True):
        if grounding:
            query_result = self.embeddings.query_location(command.destination_room)
            area = self.embeddings.get_area(query_result)
            subarea = self.embeddings.get_subarea(query_result)
        else:
            area = command.destination_room
            subarea = None
        return Status.EXECUTION_SUCCESS, "arrived to: " + (area + (" -> " + subarea if subarea else ""))

    def get_person_info(self, command: GetPersonInfo, grounding: bool = True):
        if command.info_type == "gesture":
            return Status.EXECUTION_SUCCESS, "person gesture is pointing to the right"
        elif command.info_type == "pose":
            return Status.EXECUTION_SUCCESS, "person pose is standing"
        elif command.info_type == "name":
            return Status.EXECUTION_SUCCESS, "person name is John"
        
        return Status.EXECUTION_SUCCESS, "person " + command.info_type + " was found"
    
    def count(self, command: Count, grounding: bool = True):
        # Always returns 4
        return Status.EXECUTION_SUCCESS, "found: 4 " + command.target_to_count
    
    def find_person(self, command: FindPerson, grounding: bool = True):
        # Is assumed it always finds the person
        if command.attribute_value == "":
            return Status.EXECUTION_SUCCESS, "found person"
        else:
            return Status.EXECUTION_SUCCESS, "found person with attribute: " + command.attribute_value
    
    def find_person_by_name(self, command: FindPersonByName, grounding: bool = True):
        # Is assumed it always finds the person
        return Status.EXECUTION_SUCCESS, f"found {command.name}"
    
    def add_command_history(self, command, res, status):
        self.embeddings.add_command_history(
            command,
            res,
            status,
        )

    def clear_command_history(self):
        """Clears the command history before execution"""
        self.embeddings.delete_collection("command_history")
        self.embeddings.build_embeddings()