File size: 9,732 Bytes
c835a57
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
530d354
c835a57
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
530d354
c835a57
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
from core import networkdata as nd
from core.parsing.jsonparser import JSONParser
from core.parsing.jsonencoder import JSONEncoder
from cytoapp.cytoscapeapp import CytoscapeApp
from roverdatahandler import RoverDataHandler
import itertools
import copy
import webbrowser
from core.visualization.tikzstandard import StandardTikzVisualizer as s
from core.visualization.tikzlayer import LayeredTikzVisualizer as l
### HRT Metrics Script

# Folder that contains the JSON data. The value ends with a slash because the
# file path is later built by plain string concatenation.
directory: str = "data/"

# Names of the JSON files in that folder, without the ".json" extension.
data_sets: list[str] = ["robot_example"]

# Class for user data
class UserData:
    """Holder for the user data attached to a network element.

    Attributes:
        QOS: The value passed to the constructor, stored unchanged.
    """

    def __init__(self, QOS=1_000_000):
        # 1000000 is the fallback used whenever no explicit QOS is supplied.
        self.QOS = QOS

# Custom edge_user_data processor
def custom_user_parse(user_data):
    """Convert a raw "UserData" JSON entry into a UserData object.

    Functions of this kind receive the data tied to the "UserData" entries
    in the JSON files and return what ``user_data`` on the corresponding
    network model element should be set to. This one is handed to
    ``JSONParser.parse`` as ``e_user_data_func``.

    Args:
        user_data: The raw "UserData" entry; expected to be a mapping that
            may contain a "QOS" key.

    Returns:
        A UserData whose QOS is the "QOS" entry converted to ``float``, or
        the default of 1000000 when the entry is missing, ``None`` or an
        empty string.

    Raises:
        ValueError: If "QOS" is a non-empty string that is not a number.
    """
    default_qos = 1000000  # Assumption for missing data

    # A missing key is treated exactly like an empty value instead of
    # aborting the whole parse with a KeyError.
    try:
        raw_qos = user_data["QOS"]
    except KeyError:
        raw_qos = None

    if raw_qos is None or raw_qos == "":
        return UserData(default_qos)

    return UserData(float(raw_qos))

# Parse every listed data set and keep the resulting network models in a
# dictionary keyed by data-set name.
data_dict: dict[str, nd.NetworkModel] = {}
for name in data_sets:
    json_path = f"{directory}{name}.json"
    data_dict[name] = JSONParser.parse(json_path, e_user_data_func=custom_user_parse)

# Main network: the model that the rest of this script inspects and extends.
main_net = data_dict["robot_example"]

# Define roles for both human and operator (is there a way to do this through the JSON?)
operator: nd.Agent = nd.Agent("Operator")
rover: nd.Agent = nd.Agent("Robot")

# Authority: functions allocated to the operator.
# Obstacle size estimation?
for function_id in ("LAA", "OLL", "CAM", "REV"):
    operator.add_action(main_net.get_node(function_id), operator.allocation_types.Authority)

# Authority: functions allocated to the rover.
rover_function_ids = ("BLM", "TMP", "RPP", "NWS", "IC", "RM")  # Remove the "RM" function?
for function_id in rover_function_ids:
    rover.add_action(main_net.get_node(function_id), rover.allocation_types.Authority)

# Responsibility: the operator is given responsibility for every function
# that the rover has authority over.
for function_id in rover_function_ids:
    operator.add_action(main_net.get_node(function_id), operator.allocation_types.Responsibility)


# You can identify what resources are shared between agents as a way to identify interdependencies between agents that
# require some form of information exchange and/or coordination.

# Identify the shared resources.
# Each entry is (resource_id, (getting_function_id, setting_function_id)) for a
# resource whose getting and setting functions are authorized to different agents.
# NOTE(review): this list is not read again further down in this script.
shared_resources = []

# The graph is only read in this section, so it is fetched once.
graph = main_net.get_graph()

# Loop over all nodes in the graph
for node_id in graph.nodes():

    node = main_net.get_node(node_id)

    # Only environment resources are of interest here
    if not isinstance(node, nd.BaseEnvironmentResource):
        continue

    # Neighbouring nodes: predecessors feed into the resource (set it),
    # successors are fed by it (get it).
    functions_that_set = graph.predecessors(node_id)
    functions_that_get = graph.successors(node_id)

    # Every (getter, setter) pair of functions connected through this resource
    for getter_id, setter_id in itertools.product(functions_that_get, functions_that_set):
        agent_getting = main_net.get_node(getter_id).get_authorized_agent()
        agent_setting = main_net.get_node(setter_id).get_authorized_agent()

        # NOTE(review): assumes every function has an authorized agent;
        # confirm what get_authorized_agent() returns when none is allocated.
        if agent_setting.id != agent_getting.id:
            shared_resources.append((node_id, (getter_id, setter_id)))

# One can also identify where there are mismatches in which agent is authorized to perform a function
# (i.e., is executing the work) and the agent who is responsible (i.e., accountable for the outcome, in a legal or
# organizational sense). Mismatches have implications for coordination overhead, as responsible agents need to be able
# to supervise and manage authorized agents.
# Identify the functions with authority-responsibility mismatches
functions_w_auth_resp_mismatch = []

# Loop over all nodes in the graph
for node_id in main_net.get_graph().nodes():

    node = main_net.get_node(node_id)

    # Only distributed work functions can have such a mismatch
    if not isinstance(node, nd.DistributedWorkFunction):
        continue

    # The responsible agent is not queried here: a function counts as
    # mismatched whenever its authorized agent is not the operator.
    # NOTE(review): this relies on the operator being the responsible agent;
    # compare against node.get_responsible_agent() if that stops being true.
    if node.get_authorized_agent() != operator:
        functions_w_auth_resp_mismatch.append(node_id)

# For each function with an authority-responsibility mismatch, do something!
# Current rule: When authority-responsibility mismatch, create a confirmation resources and a confirmation
# function that is allocated to the operator
for node_id in functions_w_auth_resp_mismatch:

    # "RM" is skipped and gets no confirmation step.
    # NOTE(review): the reason for this exception is not documented here.
    if node_id == "RM":
        continue

    confirmation_id = "Confirmation-" + node_id  # coordination resource
    confirming_id = "Confirming-" + node_id  # teamwork function

    # Create a coordination resource and a teamwork function node
    main_net.add_node(nd.CoordinationGroundingResource(confirmation_id))  # TODO: Specify the node type/class
    main_net.get_node(confirmation_id).user_data = confirmation_id  # Need to fill these for WMC parsing
    main_net.add_node(nd.SynchronyFunction(confirming_id))
    main_net.get_node(confirming_id).user_data = confirming_id  # Need to fill these for WMC parsing

    # Add edge going from teamwork node to coordination resource
    main_net.add_edge(confirming_id, confirmation_id)
    # NOTE(review): 100000 here vs. 1000000 on the other edges - confirm this is intended.
    main_net.get_edge(confirming_id, confirmation_id).user_data = UserData(100000)

    # Add edge going from coordination resource to original node
    main_net.add_edge(confirmation_id, node_id)

    # Add edge going from work domain resources set by shared action to teamwork node.
    # The successors are copied into a list first because edges are added
    # to the network while looping over them.
    for wd_resource in list(main_net.get_graph().successors(node_id)):
        main_net.add_edge(wd_resource, confirming_id)

        # Add QOS to each edge (making an assumption that this needs to updated always but can change!)
        main_net.get_edge(wd_resource, confirming_id).user_data = UserData(1000000)

    # Allocate authority and responsibility to human
    confirming_node = main_net.get_node(confirming_id)
    operator.add_action(confirming_node, operator.allocation_types.Authority)
    operator.add_action(confirming_node, operator.allocation_types.Responsibility)

    # QOS of the edge from the coordination resource to the original function
    main_net.get_edge(confirmation_id, node_id).user_data = UserData(0)


# We want to write this new graph to an output JSON file.
# Create a custom encoder for the user data that should fill the QOS
def custom_user_encode(user_data):
    """Build the dictionary that represents ``user_data`` in the output JSON.

    Args:
        user_data: An object exposing a ``QOS`` attribute.

    Returns:
        A dict with the single key "QOS" mapped to ``user_data.QOS``.
    """
    return {"QOS": user_data.QOS}


# Optional: output the network as a TikZ graph instead.
# l.visualize(main_net, "tikzout3.tex")

# At this point the networks have been altered after being read from JSON.
# Build the Cytoscape app from them. The call below generates the graph
# without a highlighted strategy; comment it out if you use one of the
# highlighted strategies further down.
app = CytoscapeApp(data_dict, RoverDataHandler)
# To highlight a strategy, pass the edges to highlight as a list of
# ("node1", "node2") tuples, as in the two examples below. Uncomment the one
# you wish to visualize.
# Strategy without updating PP for NWS
# app = CytoscapeApp(data_dict, RoverDataHandler, [("GL", "RM"),("GL", "NWS"),("TM", "NWS"), ("Confirmation-NWS", "NWS"), ("NWS", "NWP"), ("NWP", "Confirming-NWS"), ("Confirming-NWS", "Confirmation-NWS"), ("NWP", "RM"), ("PP", "NWS"), ("PP", "RM")])
# Fully highlighted
# app = CytoscapeApp(data_dict, RoverDataHandler, [("GL", "RM"),("GL", "NWS"),("TM", "NWS"), ("NWS", "NWP"), ("NWP", "RM"), ("PP", "NWS"), ("PP", "RM"),("TM", "LAA"), ("CIMG", "LAA"), ("LAA", "RS"), ("RS", "CAM"), ("OS", "CAM"), ("CAM","CA"),("CA","IC"),("CA","OLL"),
#                                                    ("CIMG","OLL"), ("OLL", "OL"), ("OL", "RPP"),("RS","RPP"),("RS","IC"),("OS","RPP"),("TM","RPP"),("GL","RPP"),("RPP","PP"),("IC","CIMG"),("OST","RPP"),("OBL","RPP"),("TMP","OST"),("BLM","OBL"),
#                                                    ("Confirmation-NWS", "NWS"),("NWP", "Confirming-NWS"),("Confirming-NWS", "Confirmation-NWS"),("CIMG", "Confirming-IC"),("Confirming-IC", "IC"),("Confirming-IC","Confirmation-IC"),("Confirmation-IC","IC"),("PP","Confirming-RPP"),("Confirming-RPP","Confirmation-RPP"),("Confirmation-RPP","RPP"),
#                                                    ("OBL","Confirming-BLM"),("Confirming-BLM","Confirmation-BLM"),("Confirmation-BLM","BLM"),("OST","Confirming-TMP"),("Confirming-TMP","Confirmation-TMP"),
#                                                    ("Confirmation-TMP","TMP")])
# Optionally open the app in a browser tab:
# webbrowser.open_new("http://127.0.0.1:8050")
app.run()