hrt-demo / script.py
mijtsma3's picture
Upload script.py
0441cef verified
from core import networkdata as nd
from core.parsing.jsonparser import JSONParser
from core.parsing.jsonencoder import JSONEncoder
from cytoapp.cytoscapeapp import CytoscapeApp
from userapps.rovercase.roverdatahandler import RoverDataHandler
import itertools
import copy
import webbrowser
from core.visualization.tikzstandard import StandardTikzVisualizer as s
from core.visualization.tikzlayer import LayeredTikzVisualizer as l
### HRT Metrics Script
''' The folder containing the JSON data.
'''
directory: str = "data/RevisedWMC/"
''' The JSON files in the given folder
'''
data_sets: list[str] = [
"initial_model"
]
# Define a dictionary with edges as keys and lists of QOS values as values
# edge_qos_values = {
# # ("PP", "NWS"): [30, 31, 42, 43],
# ("PP", "NWS"): [10, 15, 20, 25, 30, 31, 42, 43],
# # ("PP", "NWS"): [0, 10, 15, 20],
# ("Confirmation-BLM", "BLM"): [0, 20, 30, 40]
# }
edge_qos_values = {
# "PP_NWS": [30, 31, 42, 43],
# ("PP", "NWS"): [10, 15, 20, 25, 30, 35, 40, 45, 50, 55, 60, 65, 70, 75, 80, 85, 90, 95, 100, 110, 130, 150, 180, 210, 240, 270],
("PP", "NWS"): [10, 15, 20, 25, 30, 35, 40, 45, 50, 55, 60, 65, 70],
# ("PP", "NWS"): [1, 2, 3, 5, 10, 15, 20, 25, 30, 40, 50, 60, 70, 80, 90, 100, 120, 140, 160, 200, 240, 320],
# ("Confirmation-BLM", "BLM"): [0, 20, 30, 40, 50, 60, 80]
# ("confirmation", ""): [0]
("confirmation", ""): [0,10,20,30,40,50,60,70,80,90,100]
}
# Class for user data
class UserData:
def __init__(self, QOS=1000000): # Default value set to 1000000
self.QOS = QOS
# Custom edge_user_data processor
def custom_user_parse(user_data):
''' The default behavior for parsing user data. These types of
functions recieve the data tied to the "UserData" entries in
JSON files and return what user_data within the network model
elements should be set to.
'''
QOS = user_data["QOS"]
# user_data = lambda: None
if QOS != "":
user_data = UserData(float(QOS))
else:
user_data = UserData(1000000) # Assumption for missing data
return user_data
''' Store the parsed network models in a dictionary.
'''
data_dict: dict[str, nd.NetworkModel] = {}
for name in data_sets:
data_dict[name] = JSONParser.parse(directory + name + ".json", e_user_data_func = custom_user_parse)
# Main network
main_net = data_dict["initial_model"]
# Define roles for both human and operator (is there a way to do this through the JSON?)
operator: nd.Agent = nd.Agent("Operator")
rover: nd.Agent = nd.Agent("Robot")
operator.add_action(main_net.get_node("LAA"), operator.allocation_types.Authority)
operator.add_action(main_net.get_node("OLL"), operator.allocation_types.Authority)
operator.add_action(main_net.get_node("CAM"), operator.allocation_types.Authority)
# Obstacle size estimation?
rover.add_action(main_net.get_node("BLM"), rover.allocation_types.Authority)
rover.add_action(main_net.get_node("TMP"), rover.allocation_types.Authority)
rover.add_action(main_net.get_node("RPP"), rover.allocation_types.Authority)
rover.add_action(main_net.get_node("NWS"), rover.allocation_types.Authority)
rover.add_action(main_net.get_node("IC"), rover.allocation_types.Authority)
rover.add_action(main_net.get_node("RM"), rover.allocation_types.Authority) # Remove this function?
# Responsibility
operator.add_action(main_net.get_node("BLM"), operator.allocation_types.Responsibility)
operator.add_action(main_net.get_node("TMP"), operator.allocation_types.Responsibility)
operator.add_action(main_net.get_node("RPP"), operator.allocation_types.Responsibility)
operator.add_action(main_net.get_node("NWS"), operator.allocation_types.Responsibility)
operator.add_action(main_net.get_node("IC"), operator.allocation_types.Responsibility)
operator.add_action(main_net.get_node("RM"), operator.allocation_types.Responsibility) # Remove this function?
# # Identify the shared resources
# shared_resources = []
# # Loop over all nodes in the graph
# for node_id in main_net.get_graph().nodes():
# node = main_net.get_node(node_id)
# # Check if WorkDomainResource
# if issubclass (node.__class__, nd.WorkDomainResource):
# # Get all neighboring nodes (actionNodes)
# functions_that_set = main_net.get_graph().predecessors(node_id)
# functions_that_get = main_net.get_graph().successors(node_id)
# # A list of all function pairs that are connected through this resources
# interdependent_functions = list(itertools.product(functions_that_get,functions_that_set))
# # For each pair, check whether roles is the same
# for pair in interdependent_functions:
# agent_setting = main_net.get_node(pair[0]).get_authorized_agent()
# agent_getting = main_net.get_node(pair[1]).get_authorized_agent()
# if agent_setting.id != agent_getting.id:
# shared_resources.append((node_id,pair))
# # For each of the shared resources, do something!
# # Current rule: When an information resource is shared between agents at the taskwork level,
# # then a function must be created at the teamwork level to show the relaying of information,
# # such that a shared awareness is formed at the social organizational level.
# for element in shared_resources:
# # print("There is a shared resource",element[0],"between ", element[1][0], "and", element[1][1])
# # Create a shared resources node and a teamwork function node
# soc_org_node = main_net.add_node(nd.Node("shared_"+element[0])) # TODO: Specify the node type/class
# teamwork_node = main_net.add_node(nd.ActionNode("sharing_"+element[0]))
# # Add edge going from teamwork node to shared_resource node
# main_net.add_edge("sharing_"+element[0],"shared_"+element[0])
# # Add edge going from original resource to the teamwork node
# main_net.add_edge(element[0],"sharing_"+element[0])
# # Add QOS to each edge (making an assumption that this needs to updated always but can change!)
# user_data = lambda: None
# user_data.QOS = 0
# # main_net.get_edge("sharing_"+element[0],"shared_"+element[0]).user_data = user_data # No QOS needed--because is set relationship
# main_net.get_edge(element[0],"sharing_"+element[0]).user_data = user_data
# Identify the shared resources
shared_actions = []
# Loop over all nodes in the graph
for node_id in main_net.get_graph().nodes():
node = main_net.get_node(node_id)
# Check if TaskworkAction
if issubclass (node.__class__, nd.TaskworkAction):
# Get all neighboring nodes (actionNodes)
authorized_agent = node.get_authorized_agent()
# responsible_agent = node.get_responsible_agent()
# For each pair, check whether roles is the same
if authorized_agent != operator:
shared_actions.append(node_id)
# For each of the shared action, do something!
# Current rule: When authority-responsibility mismatch, create a confirmation resources and an a confirmation
# action that is allocated to the operator
for node_id in shared_actions:
if node_id == "RM":
continue
# Create a coordination resource and a teamwork function node
soc_org_node = main_net.add_node(nd.CoordinationResource("Confirmation-"+node_id)) # TODO: Specify the node type/class
main_net.get_node("Confirmation-"+node_id).user_data = "Confirmation-"+node_id # Need to fill these for WMC parsing
teamwork_node = main_net.add_node(nd.TeamworkAction("Confirming-"+node_id))
main_net.get_node("Confirming-"+node_id).user_data = "Confirming-"+node_id # Need to fill these for WMC parsing
# Add edge going from teamwork node to coordination resource
main_net.add_edge("Confirming-"+node_id,"Confirmation-"+node_id)
main_net.get_edge("Confirming-"+node_id,"Confirmation-"+node_id).user_data = UserData(100000)
# Add edges going from coordination resource to original node
main_net.add_edge("Confirmation-"+node_id,node_id)
# Add edge going from work domain resources set by shared action to teamwork node
for wd_resource in main_net.get_graph().successors(node_id):
main_net.add_edge(wd_resource,"Confirming-"+node_id)
# Add QOS to each edge (making an assumption that this needs to updated always but can change!)
main_net.get_edge(wd_resource,"Confirming-"+node_id).user_data = UserData(1000000)
# Allocate authority and responsibility to human
operator.add_action(main_net.get_node("Confirming-"+node_id), operator.allocation_types.Authority)
operator.add_action(main_net.get_node("Confirming-"+node_id), operator.allocation_types.Responsibility)
# Add QOS to each edge (making an assumption that this needs to updated always but can change!)
main_net.get_edge("Confirmation-"+node_id,node_id).user_data = UserData(0)
# We want to write this new graph to an output JSON file.
# Create a custom encoder for the user data that should fill the QOS
def custom_user_encode(user_data):
user_data = {
"QOS": user_data.QOS,
}
return user_data
# Create all combinations of QOS values for the edges
qos_combinations = list(itertools.product(*edge_qos_values.values()))
# Loop over each combination to create copies of the main_net with these QOS values
for qos_combo in qos_combinations:
# Create a deep copy of the main_net to modify
temp_net = copy.deepcopy(main_net)
# Set the QOS values for the edges in the temp_net
for edge, qos_value in zip(edge_qos_values.keys(), qos_combo):
if edge[0] == "confirmation":
confirmation_nodes = [node for node in temp_net.get_node_ids() if node.startswith("Confirmation-")]
for full_node_id in confirmation_nodes:
node_id_after_dash = full_node_id.split("-", 1)[1]
print(full_node_id,node_id_after_dash)
temp_net.get_edge(full_node_id, node_id_after_dash).user_data = UserData(qos_value)
else:
print(edge[0])
temp_net.get_edge(*edge).user_data = UserData(qos_value)
# Write the data to a JSON file using the JSONEncoder and custom_user_encode
# The file name includes the QOS values for identification
file_name = "data/ForWMCreading/PPtoNWS_{}_conf_{}.json".format(*qos_combo)
JSONEncoder.encode(file_name, temp_net, e_user_data_func=custom_user_encode)
# Manually set the QOS for NWP to RM to 0. This is necessary because WMC schedules NWS but for
# the network analysis, this edge must be triggered
main_net.get_edge("NWP","RM").user_data = UserData(0)
# Now add attributes that represent when each WorkDomainResource and OrgResource was last updated
for node_id in main_net.get_node_ids():
node = main_net.get_node(node_id)
# Check if WorkDomainResource
if issubclass (node.__class__, nd.WorkDomainResource) or issubclass (node.__class__, nd.CoordinationResource):
user_data = lambda: None # Lambda function that we can add attributes to, could use to set last_update_time to zero.
user_data.last_update_time = 0#-1000000 # Something really small so that it updates the first time we run it.
node.user_data = user_data
# Check that previous is actually working
# for node_id in main_net.get_node_ids():
# node = main_net.get_node(node_id)
# # Check if WorkDomainResource
# if issubclass (node.__class__, nd.WorkDomainResource) or issubclass (node.__class__, nd.CoordinationResource):
# print(node.user_data.last_update_time)
# Check that QOS is loaded properly
# for edge_id in main_net.get_edge_ids():
# edge = main_net.get_edge(edge_id[0],edge_id[1])
# print(edge.user_data.QOS)
# Now we can do some funky things with the QOS requirements...
# First want to find edges that need updating based on when resources were last updated.
current_time = 32 #Needs something different here
# Manually change when resources were last updated
# Strategy for NWS without updating PP
# main_net.get_node("PP").user_data.last_update_time = current_time
# Strategy without cross-checking
# main_net.get_node("Confirmation-NWS").user_data.last_update_time = current_time
# main_net.get_node("Confirmation-BLM").user_data.last_update_time = current_time
# main_net.get_node("Confirmation-TMP").user_data.last_update_time = current_time
# main_net.get_node("Confirmation-RPP").user_data.last_update_time = current_time
# Strategy without obstacle location
# main_net.get_node("OL").user_data.last_update_time = current_time
# Strategy without battery level and temperature measurement
# main_net.get_node("OST").user_data.last_update_time = current_time
# main_net.get_node("OBL").user_data.last_update_time = current_time
out_of_bound_edges = []
# Identify which edges are out of QOS bound
for edge_id in main_net.get_edge_ids():
edge = main_net.get_edge(edge_id[0],edge_id[1])
# Check if the target is a function node
target_node = main_net.get_node(edge_id[1])
if issubclass (target_node.__class__, nd.ActionNode):
# Check when resource was last updated
source_node = main_net.get_node(edge_id[0])
last_update_time = source_node.user_data.last_update_time
QOS = edge.user_data.QOS
if float(current_time) - float(last_update_time) > float(QOS):
print("QOS for",source_node.id,"to",target_node.id,"is out of bounds with QOS",float(QOS),"and last update time",last_update_time)
out_of_bound_edges.append(edge_id)
# This function is used to identify functional nodes are predependencies for a target function
# based on QOS bounds and when resources were last updated.
# This function takes in a node id of a FUNCTION node that is the target
# It returns a list of function and resource nodes that need to be updated/executed to abide by QOS requirements
def add_predecessors_to_strategy(node_id, out_of_bound_edges, strategy_list, main_net):
# Get the node handle
node = main_net.get_node(node_id)
# If this node is not yet in the strategy, add it!
if node not in strategy_list:
strategy_list.append(node)
# Now loop over all incoming edges
for pred_node_id in main_net.get_graph().predecessors(node_id):
# And check whether it is within QOS bound
if (pred_node_id, node_id) in out_of_bound_edges:
# If out of QOS bound, need to add to our strategy (in need for an update)
strategy_list.append(main_net.get_node(pred_node_id))
# We need to add each of the functions that is linked to set this predecessor node
# and perform the same check to see if we need to add their predecessors
for pred_pred_node_id in main_net.get_graph().predecessors(pred_node_id):
if main_net.get_node(pred_pred_node_id) not in strategy_list:
add_predecessors_to_strategy(pred_pred_node_id, out_of_bound_edges, strategy_list, main_net)
return strategy_list
# Identify the strategy
strategy_list = add_predecessors_to_strategy('RM', out_of_bound_edges, [], main_net)
# # Identify edges to highlight (that are connecting the nodes in the strategy)
# connecting_edges = []
# for node in strategy_list:
# for neighbor in main_net.get_graph().neighbors(node.id):
# if main_net.get_node(neighbor) in strategy_list:
# edge = (node.id, neighbor) if (node.id, neighbor) in main_net.get_edge_ids() else (neighbor, node.id)
# if edge not in connecting_edges:
# connecting_edges.append(edge)
# Identify all incoming edges for every node in the strategy_list
connecting_edges = []
for node in strategy_list:
print(node.id)
for incoming_edge in main_net.get_graph().in_edges(node.id):
if incoming_edge not in connecting_edges:
connecting_edges.append(incoming_edge)
# Let's output as a TikZ graph
l.visualize(main_net, "tikzout3.tex")
''' Alter networks after they have been read
from JSON.
'''
# app = CytoscapeApp(data_dict, RoverDataHandler, connecting_edges)
app = CytoscapeApp(data_dict, RoverDataHandler)
webbrowser.open_new("http://127.0.0.1:8050")
app.run()