| from core import networkdata as nd | |
| from core.parsing.jsonparser import JSONParser | |
| from core.parsing.jsonencoder import JSONEncoder | |
| from cytoapp.cytoscapeapp import CytoscapeApp | |
| from userapps.rovercase.roverdatahandler import RoverDataHandler | |
| import itertools | |
| import copy | |
| import webbrowser | |
| from core.visualization.tikzstandard import StandardTikzVisualizer as s | |
| from core.visualization.tikzlayer import LayeredTikzVisualizer as l | |
| ### HRT Metrics Script | |
| ''' The folder containing the JSON data. | |
| ''' | |
| directory: str = "data/RevisedWMC/" | |
| ''' The JSON files in the given folder | |
| ''' | |
| data_sets: list[str] = [ | |
| "initial_model" | |
| ] | |
| # Define a dictionary with edges as keys and lists of QOS values as values | |
| # edge_qos_values = { | |
| # # ("PP", "NWS"): [30, 31, 42, 43], | |
| # ("PP", "NWS"): [10, 15, 20, 25, 30, 31, 42, 43], | |
| # # ("PP", "NWS"): [0, 10, 15, 20], | |
| # ("Confirmation-BLM", "BLM"): [0, 20, 30, 40] | |
| # } | |
| edge_qos_values = { | |
| # "PP_NWS": [30, 31, 42, 43], | |
| # ("PP", "NWS"): [10, 15, 20, 25, 30, 35, 40, 45, 50, 55, 60, 65, 70, 75, 80, 85, 90, 95, 100, 110, 130, 150, 180, 210, 240, 270], | |
| ("PP", "NWS"): [10, 15, 20, 25, 30, 35, 40, 45, 50, 55, 60, 65, 70], | |
| # ("PP", "NWS"): [1, 2, 3, 5, 10, 15, 20, 25, 30, 40, 50, 60, 70, 80, 90, 100, 120, 140, 160, 200, 240, 320], | |
| # ("Confirmation-BLM", "BLM"): [0, 20, 30, 40, 50, 60, 80] | |
| # ("confirmation", ""): [0] | |
| ("confirmation", ""): [0,10,20,30,40,50,60,70,80,90,100] | |
| } | |
| # Class for user data | |
| class UserData: | |
| def __init__(self, QOS=1000000): # Default value set to 1000000 | |
| self.QOS = QOS | |
| # Custom edge_user_data processor | |
| def custom_user_parse(user_data): | |
| ''' The default behavior for parsing user data. These types of | |
| functions recieve the data tied to the "UserData" entries in | |
| JSON files and return what user_data within the network model | |
| elements should be set to. | |
| ''' | |
| QOS = user_data["QOS"] | |
| # user_data = lambda: None | |
| if QOS != "": | |
| user_data = UserData(float(QOS)) | |
| else: | |
| user_data = UserData(1000000) # Assumption for missing data | |
| return user_data | |
| ''' Store the parsed network models in a dictionary. | |
| ''' | |
| data_dict: dict[str, nd.NetworkModel] = {} | |
| for name in data_sets: | |
| data_dict[name] = JSONParser.parse(directory + name + ".json", e_user_data_func = custom_user_parse) | |
| # Main network | |
| main_net = data_dict["initial_model"] | |
| # Define roles for both human and operator (is there a way to do this through the JSON?) | |
| operator: nd.Agent = nd.Agent("Operator") | |
| rover: nd.Agent = nd.Agent("Robot") | |
| operator.add_action(main_net.get_node("LAA"), operator.allocation_types.Authority) | |
| operator.add_action(main_net.get_node("OLL"), operator.allocation_types.Authority) | |
| operator.add_action(main_net.get_node("CAM"), operator.allocation_types.Authority) | |
| # Obstacle size estimation? | |
| rover.add_action(main_net.get_node("BLM"), rover.allocation_types.Authority) | |
| rover.add_action(main_net.get_node("TMP"), rover.allocation_types.Authority) | |
| rover.add_action(main_net.get_node("RPP"), rover.allocation_types.Authority) | |
| rover.add_action(main_net.get_node("NWS"), rover.allocation_types.Authority) | |
| rover.add_action(main_net.get_node("IC"), rover.allocation_types.Authority) | |
| rover.add_action(main_net.get_node("RM"), rover.allocation_types.Authority) # Remove this function? | |
| # Responsibility | |
| operator.add_action(main_net.get_node("BLM"), operator.allocation_types.Responsibility) | |
| operator.add_action(main_net.get_node("TMP"), operator.allocation_types.Responsibility) | |
| operator.add_action(main_net.get_node("RPP"), operator.allocation_types.Responsibility) | |
| operator.add_action(main_net.get_node("NWS"), operator.allocation_types.Responsibility) | |
| operator.add_action(main_net.get_node("IC"), operator.allocation_types.Responsibility) | |
| operator.add_action(main_net.get_node("RM"), operator.allocation_types.Responsibility) # Remove this function? | |
| # # Identify the shared resources | |
| # shared_resources = [] | |
| # # Loop over all nodes in the graph | |
| # for node_id in main_net.get_graph().nodes(): | |
| # node = main_net.get_node(node_id) | |
| # # Check if WorkDomainResource | |
| # if issubclass (node.__class__, nd.WorkDomainResource): | |
| # # Get all neighboring nodes (actionNodes) | |
| # functions_that_set = main_net.get_graph().predecessors(node_id) | |
| # functions_that_get = main_net.get_graph().successors(node_id) | |
| # # A list of all function pairs that are connected through this resources | |
| # interdependent_functions = list(itertools.product(functions_that_get,functions_that_set)) | |
| # # For each pair, check whether roles is the same | |
| # for pair in interdependent_functions: | |
| # agent_setting = main_net.get_node(pair[0]).get_authorized_agent() | |
| # agent_getting = main_net.get_node(pair[1]).get_authorized_agent() | |
| # if agent_setting.id != agent_getting.id: | |
| # shared_resources.append((node_id,pair)) | |
| # # For each of the shared resources, do something! | |
| # # Current rule: When an information resource is shared between agents at the taskwork level, | |
| # # then a function must be created at the teamwork level to show the relaying of information, | |
| # # such that a shared awareness is formed at the social organizational level. | |
| # for element in shared_resources: | |
| # # print("There is a shared resource",element[0],"between ", element[1][0], "and", element[1][1]) | |
| # # Create a shared resources node and a teamwork function node | |
| # soc_org_node = main_net.add_node(nd.Node("shared_"+element[0])) # TODO: Specify the node type/class | |
| # teamwork_node = main_net.add_node(nd.ActionNode("sharing_"+element[0])) | |
| # # Add edge going from teamwork node to shared_resource node | |
| # main_net.add_edge("sharing_"+element[0],"shared_"+element[0]) | |
| # # Add edge going from original resource to the teamwork node | |
| # main_net.add_edge(element[0],"sharing_"+element[0]) | |
| # # Add QOS to each edge (making an assumption that this needs to updated always but can change!) | |
| # user_data = lambda: None | |
| # user_data.QOS = 0 | |
| # # main_net.get_edge("sharing_"+element[0],"shared_"+element[0]).user_data = user_data # No QOS needed--because is set relationship | |
| # main_net.get_edge(element[0],"sharing_"+element[0]).user_data = user_data | |
| # Identify the shared resources | |
| shared_actions = [] | |
| # Loop over all nodes in the graph | |
| for node_id in main_net.get_graph().nodes(): | |
| node = main_net.get_node(node_id) | |
| # Check if TaskworkAction | |
| if issubclass (node.__class__, nd.TaskworkAction): | |
| # Get all neighboring nodes (actionNodes) | |
| authorized_agent = node.get_authorized_agent() | |
| # responsible_agent = node.get_responsible_agent() | |
| # For each pair, check whether roles is the same | |
| if authorized_agent != operator: | |
| shared_actions.append(node_id) | |
| # For each of the shared action, do something! | |
| # Current rule: When authority-responsibility mismatch, create a confirmation resources and an a confirmation | |
| # action that is allocated to the operator | |
| for node_id in shared_actions: | |
| if node_id == "RM": | |
| continue | |
| # Create a coordination resource and a teamwork function node | |
| soc_org_node = main_net.add_node(nd.CoordinationResource("Confirmation-"+node_id)) # TODO: Specify the node type/class | |
| main_net.get_node("Confirmation-"+node_id).user_data = "Confirmation-"+node_id # Need to fill these for WMC parsing | |
| teamwork_node = main_net.add_node(nd.TeamworkAction("Confirming-"+node_id)) | |
| main_net.get_node("Confirming-"+node_id).user_data = "Confirming-"+node_id # Need to fill these for WMC parsing | |
| # Add edge going from teamwork node to coordination resource | |
| main_net.add_edge("Confirming-"+node_id,"Confirmation-"+node_id) | |
| main_net.get_edge("Confirming-"+node_id,"Confirmation-"+node_id).user_data = UserData(100000) | |
| # Add edges going from coordination resource to original node | |
| main_net.add_edge("Confirmation-"+node_id,node_id) | |
| # Add edge going from work domain resources set by shared action to teamwork node | |
| for wd_resource in main_net.get_graph().successors(node_id): | |
| main_net.add_edge(wd_resource,"Confirming-"+node_id) | |
| # Add QOS to each edge (making an assumption that this needs to updated always but can change!) | |
| main_net.get_edge(wd_resource,"Confirming-"+node_id).user_data = UserData(1000000) | |
| # Allocate authority and responsibility to human | |
| operator.add_action(main_net.get_node("Confirming-"+node_id), operator.allocation_types.Authority) | |
| operator.add_action(main_net.get_node("Confirming-"+node_id), operator.allocation_types.Responsibility) | |
| # Add QOS to each edge (making an assumption that this needs to updated always but can change!) | |
| main_net.get_edge("Confirmation-"+node_id,node_id).user_data = UserData(0) | |
| # We want to write this new graph to an output JSON file. | |
| # Create a custom encoder for the user data that should fill the QOS | |
| def custom_user_encode(user_data): | |
| user_data = { | |
| "QOS": user_data.QOS, | |
| } | |
| return user_data | |
| # Create all combinations of QOS values for the edges | |
| qos_combinations = list(itertools.product(*edge_qos_values.values())) | |
| # Loop over each combination to create copies of the main_net with these QOS values | |
| for qos_combo in qos_combinations: | |
| # Create a deep copy of the main_net to modify | |
| temp_net = copy.deepcopy(main_net) | |
| # Set the QOS values for the edges in the temp_net | |
| for edge, qos_value in zip(edge_qos_values.keys(), qos_combo): | |
| if edge[0] == "confirmation": | |
| confirmation_nodes = [node for node in temp_net.get_node_ids() if node.startswith("Confirmation-")] | |
| for full_node_id in confirmation_nodes: | |
| node_id_after_dash = full_node_id.split("-", 1)[1] | |
| print(full_node_id,node_id_after_dash) | |
| temp_net.get_edge(full_node_id, node_id_after_dash).user_data = UserData(qos_value) | |
| else: | |
| print(edge[0]) | |
| temp_net.get_edge(*edge).user_data = UserData(qos_value) | |
| # Write the data to a JSON file using the JSONEncoder and custom_user_encode | |
| # The file name includes the QOS values for identification | |
| file_name = "data/ForWMCreading/PPtoNWS_{}_conf_{}.json".format(*qos_combo) | |
| JSONEncoder.encode(file_name, temp_net, e_user_data_func=custom_user_encode) | |
| # Manually set the QOS for NWP to RM to 0. This is necessary because WMC schedules NWS but for | |
| # the network analysis, this edge must be triggered | |
| main_net.get_edge("NWP","RM").user_data = UserData(0) | |
| # Now add attributes that represent when each WorkDomainResource and OrgResource was last updated | |
| for node_id in main_net.get_node_ids(): | |
| node = main_net.get_node(node_id) | |
| # Check if WorkDomainResource | |
| if issubclass (node.__class__, nd.WorkDomainResource) or issubclass (node.__class__, nd.CoordinationResource): | |
| user_data = lambda: None # Lambda function that we can add attributes to, could use to set last_update_time to zero. | |
| user_data.last_update_time = 0#-1000000 # Something really small so that it updates the first time we run it. | |
| node.user_data = user_data | |
| # Check that previous is actually working | |
| # for node_id in main_net.get_node_ids(): | |
| # node = main_net.get_node(node_id) | |
| # # Check if WorkDomainResource | |
| # if issubclass (node.__class__, nd.WorkDomainResource) or issubclass (node.__class__, nd.CoordinationResource): | |
| # print(node.user_data.last_update_time) | |
| # Check that QOS is loaded properly | |
| # for edge_id in main_net.get_edge_ids(): | |
| # edge = main_net.get_edge(edge_id[0],edge_id[1]) | |
| # print(edge.user_data.QOS) | |
| # Now we can do some funky things with the QOS requirements... | |
| # First want to find edges that need updating based on when resources were last updated. | |
| current_time = 32 #Needs something different here | |
| # Manually change when resources were last updated | |
| # Strategy for NWS without updating PP | |
| # main_net.get_node("PP").user_data.last_update_time = current_time | |
| # Strategy without cross-checking | |
| # main_net.get_node("Confirmation-NWS").user_data.last_update_time = current_time | |
| # main_net.get_node("Confirmation-BLM").user_data.last_update_time = current_time | |
| # main_net.get_node("Confirmation-TMP").user_data.last_update_time = current_time | |
| # main_net.get_node("Confirmation-RPP").user_data.last_update_time = current_time | |
| # Strategy without obstacle location | |
| # main_net.get_node("OL").user_data.last_update_time = current_time | |
| # Strategy without battery level and temperature measurement | |
| # main_net.get_node("OST").user_data.last_update_time = current_time | |
| # main_net.get_node("OBL").user_data.last_update_time = current_time | |
| out_of_bound_edges = [] | |
| # Identify which edges are out of QOS bound | |
| for edge_id in main_net.get_edge_ids(): | |
| edge = main_net.get_edge(edge_id[0],edge_id[1]) | |
| # Check if the target is a function node | |
| target_node = main_net.get_node(edge_id[1]) | |
| if issubclass (target_node.__class__, nd.ActionNode): | |
| # Check when resource was last updated | |
| source_node = main_net.get_node(edge_id[0]) | |
| last_update_time = source_node.user_data.last_update_time | |
| QOS = edge.user_data.QOS | |
| if float(current_time) - float(last_update_time) > float(QOS): | |
| print("QOS for",source_node.id,"to",target_node.id,"is out of bounds with QOS",float(QOS),"and last update time",last_update_time) | |
| out_of_bound_edges.append(edge_id) | |
| # This function is used to identify functional nodes are predependencies for a target function | |
| # based on QOS bounds and when resources were last updated. | |
| # This function takes in a node id of a FUNCTION node that is the target | |
| # It returns a list of function and resource nodes that need to be updated/executed to abide by QOS requirements | |
| def add_predecessors_to_strategy(node_id, out_of_bound_edges, strategy_list, main_net): | |
| # Get the node handle | |
| node = main_net.get_node(node_id) | |
| # If this node is not yet in the strategy, add it! | |
| if node not in strategy_list: | |
| strategy_list.append(node) | |
| # Now loop over all incoming edges | |
| for pred_node_id in main_net.get_graph().predecessors(node_id): | |
| # And check whether it is within QOS bound | |
| if (pred_node_id, node_id) in out_of_bound_edges: | |
| # If out of QOS bound, need to add to our strategy (in need for an update) | |
| strategy_list.append(main_net.get_node(pred_node_id)) | |
| # We need to add each of the functions that is linked to set this predecessor node | |
| # and perform the same check to see if we need to add their predecessors | |
| for pred_pred_node_id in main_net.get_graph().predecessors(pred_node_id): | |
| if main_net.get_node(pred_pred_node_id) not in strategy_list: | |
| add_predecessors_to_strategy(pred_pred_node_id, out_of_bound_edges, strategy_list, main_net) | |
| return strategy_list | |
| # Identify the strategy | |
| strategy_list = add_predecessors_to_strategy('RM', out_of_bound_edges, [], main_net) | |
| # # Identify edges to highlight (that are connecting the nodes in the strategy) | |
| # connecting_edges = [] | |
| # for node in strategy_list: | |
| # for neighbor in main_net.get_graph().neighbors(node.id): | |
| # if main_net.get_node(neighbor) in strategy_list: | |
| # edge = (node.id, neighbor) if (node.id, neighbor) in main_net.get_edge_ids() else (neighbor, node.id) | |
| # if edge not in connecting_edges: | |
| # connecting_edges.append(edge) | |
| # Identify all incoming edges for every node in the strategy_list | |
| connecting_edges = [] | |
| for node in strategy_list: | |
| print(node.id) | |
| for incoming_edge in main_net.get_graph().in_edges(node.id): | |
| if incoming_edge not in connecting_edges: | |
| connecting_edges.append(incoming_edge) | |
| # Let's output as a TikZ graph | |
| l.visualize(main_net, "tikzout3.tex") | |
| ''' Alter networks after they have been read | |
| from JSON. | |
| ''' | |
| # app = CytoscapeApp(data_dict, RoverDataHandler, connecting_edges) | |
| app = CytoscapeApp(data_dict, RoverDataHandler) | |
| webbrowser.open_new("http://127.0.0.1:8050") | |
| app.run() |