{ "cells": [ { "cell_type": "code", "execution_count": 18, "metadata": {}, "outputs": [], "source": [ "from core import networkdata as nd\n", "from core.parsing.jsonparser import JSONParser\n", "from core.parsing.jsonencoder import JSONEncoder\n", "from cytoapp.cytoscapeapp import CytoscapeApp\n", "from userapps.rovercase.roverdatahandler import RoverDataHandler\n", "import itertools\n", "import copy\n", "import webbrowser\n", "from core.visualization.tikzstandard import StandardTikzVisualizer as s\n", "from core.visualization.tikzlayer import LayeredTikzVisualizer as l" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Cell 2: Constants and Initial Data" ] }, { "cell_type": "code", "execution_count": 19, "metadata": {}, "outputs": [], "source": [ "directory: str = \"data/RevisedWMC/\"\n", "data_sets: list[str] = [\"initial_model\"]\n", "edge_qos_values = {\n", " (\"PP\", \"NWS\"): [10, 15, 20, 25, 30, 35, 40, 45, 50, 55, 60, 65, 70],\n", " (\"confirmation\", \"\"): [0,10,20,30,40,50,60,70,80,90,100]\n", "}" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Cell 3: Class Definitions\n" ] }, { "cell_type": "code", "execution_count": 20, "metadata": {}, "outputs": [], "source": [ "class UserData:\n", " def __init__(self, QOS=1000000): # Default value set to 1000000\n", " self.QOS = QOS" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Cell 4: Function Definitions\n" ] }, { "cell_type": "code", "execution_count": 21, "metadata": {}, "outputs": [], "source": [ "def custom_user_parse(user_data):\n", " QOS = user_data[\"QOS\"]\n", " if QOS != \"\":\n", " user_data = UserData(float(QOS))\n", " else:\n", " user_data = UserData(1000000) # Assumption for missing data\n", " return user_data" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Cell 5: Parse Network Models\n" ] }, { "cell_type": "code", "execution_count": 22, "metadata": {}, "outputs": [], "source": [ "data_dict: dict[str, nd.NetworkModel] = {}\n", "for name in data_sets:\n", " data_dict[name] = JSONParser.parse(directory + name + \".json\", e_user_data_func = custom_user_parse)\n", "main_net = data_dict[\"initial_model\"]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Cell 6: Define Roles\n" ] }, { "cell_type": "code", "execution_count": 23, "metadata": {}, "outputs": [], "source": [ "operator: nd.Agent = nd.Agent(\"Operator\")\n", "rover: nd.Agent = nd.Agent(\"Robot\")\n", "\n", "operator.add_action(main_net.get_node(\"LAA\"), operator.allocation_types.Authority)\n", "operator.add_action(main_net.get_node(\"OLL\"), operator.allocation_types.Authority)\n", "operator.add_action(main_net.get_node(\"CAM\"), operator.allocation_types.Authority)\n", "\n", "rover.add_action(main_net.get_node(\"BLM\"), rover.allocation_types.Authority)\n", "rover.add_action(main_net.get_node(\"TMP\"), rover.allocation_types.Authority)\n", "rover.add_action(main_net.get_node(\"RPP\"), rover.allocation_types.Authority)\n", "rover.add_action(main_net.get_node(\"NWS\"), rover.allocation_types.Authority)\n", "rover.add_action(main_net.get_node(\"IC\"), rover.allocation_types.Authority)\n", "rover.add_action(main_net.get_node(\"RM\"), rover.allocation_types.Authority)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Cell 7: Assign Responsibilities\n" ] }, { "cell_type": "code", "execution_count": 24, "metadata": {}, "outputs": [], "source": [ "shared_actions = []\n", "for node_id in main_net.get_graph().nodes():\n", " node = main_net.get_node(node_id)\n", " if issubclass(node.__class__, nd.TaskworkAction):\n", " authorized_agent = node.get_authorized_agent()\n", " if authorized_agent != operator:\n", " shared_actions.append(node_id)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Cell 9: Handle Shared Actions\n" ] }, { "cell_type": "code", "execution_count": 25, "metadata": {}, "outputs": [], "source": [ "for node_id in shared_actions:\n", " if node_id == \"RM\":\n", " continue\n", " soc_org_node = main_net.add_node(nd.CoordinationResource(\"Confirmation-\"+node_id))\n", " main_net.get_node(\"Confirmation-\"+node_id).user_data = \"Confirmation-\"+node_id\n", " teamwork_node = main_net.add_node(nd.TeamworkAction(\"Confirming-\"+node_id))\n", " main_net.get_node(\"Confirming-\"+node_id).user_data = \"Confirming-\"+node_id\n", " main_net.add_edge(\"Confirming-\"+node_id,\"Confirmation-\"+node_id)\n", " main_net.get_edge(\"Confirming-\"+node_id,\"Confirmation-\"+node_id).user_data = UserData(100000)\n", " main_net.add_edge(\"Confirmation-\"+node_id,node_id)\n", " for wd_resource in main_net.get_graph().successors(node_id):\n", " main_net.add_edge(wd_resource,\"Confirming-\"+node_id)\n", " main_net.get_edge(wd_resource,\"Confirming-\"+node_id).user_data = UserData(1000000)\n", " operator.add_action(main_net.get_node(\"Confirming-\"+node_id), operator.allocation_types.Authority)\n", " operator.add_action(main_net.get_node(\"Confirming-\"+node_id), operator.allocation_types.Responsibility)\n", " main_net.get_edge(\"Confirmation-\"+node_id,node_id).user_data = UserData(0)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Cell 10: Custom Encoder\n" ] }, { "cell_type": "code", "execution_count": 26, "metadata": {}, "outputs": [], "source": [ "def custom_user_encode(user_data):\n", " user_data = {\"QOS\": user_data.QOS}\n", " return user_data" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Cell 11: Create QOS Combinations\n" ] }, { "cell_type": "code", "execution_count": 27, "metadata": {}, "outputs": [], "source": [ "qos_combinations = list(itertools.product(*edge_qos_values.values()))\n", "for qos_combo in qos_combinations:\n", " temp_net = copy.deepcopy(main_net)\n", " for edge, qos_value in zip(edge_qos_values.keys(), qos_combo):\n", " if edge[0] == \"confirmation\":\n", " confirmation_nodes = [node for node in temp_net.get_node_ids() if node.startswith(\"Confirmation-\")]\n", " for full_node_id in confirmation_nodes:\n", " node_id_after_dash = full_node_id.split(\"-\", 1)[1]\n", " temp_net.get_edge(full_node_id, node_id_after_dash).user_data = UserData(qos_value)\n", " else:\n", " temp_net.get_edge(*edge).user_data = UserData(qos_value)\n", " file_name = \"data/ForWMCreading/PPtoNWS_{}_conf_{}.json\".format(*qos_combo)\n", " JSONEncoder.encode(file_name, temp_net, e_user_data_func=custom_user_encode)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Cell 12: Manually Set QOS\n" ] }, { "cell_type": "code", "execution_count": 28, "metadata": {}, "outputs": [], "source": [ "main_net.get_edge(\"NWP\",\"RM\").user_data = UserData(0)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Cell 13: Add Last Update Time\n" ] }, { "cell_type": "code", "execution_count": 29, "metadata": {}, "outputs": [], "source": [ "for node_id in main_net.get_node_ids():\n", " node = main_net.get_node(node_id)\n", " if issubclass(node.__class__, nd.WorkDomainResource) or issubclass(node.__class__, nd.CoordinationResource):\n", " user_data = lambda: None\n", " user_data.last_update_time = 0\n", " node.user_data = user_data" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Cell 14: Identify Out of Bound Edges\n" ] }, { "cell_type": "code", "execution_count": 30, "metadata": {}, "outputs": [], "source": [ "current_time = 32\n", "out_of_bound_edges = []\n", "for edge_id in main_net.get_edge_ids():\n", " edge = main_net.get_edge(edge_id[0],edge_id[1])\n", " target_node = main_net.get_node(edge_id[1])\n", " if issubclass(target_node.__class__, nd.ActionNode):\n", " source_node = main_net.get_node(edge_id[0])\n", " last_update_time = source_node.user_data.last_update_time\n", " QOS = edge.user_data.QOS\n", " if float(current_time) - float(last_update_time) > float(QOS):\n", " out_of_bound_edges.append(edge_id)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Cell 15: Define Predecessor Function\n" ] }, { "cell_type": "code", "execution_count": 31, "metadata": {}, "outputs": [], "source": [ "def add_predecessors_to_strategy(node_id, out_of_bound_edges, strategy_list, main_net):\n", " node = main_net.get_node(node_id)\n", " if node not in strategy_list:\n", " strategy_list.append(node)\n", " for pred_node_id in main_net.get_graph().predecessors(node_id):\n", " if (pred_node_id, node_id) in out_of_bound_edges:\n", " strategy_list.append(main_net.get_node(pred_node_id))\n", " for pred_pred_node_id in main_net.get_graph().predecessors(pred_node_id):\n", " if main_net.get_node(pred_pred_node_id) not in strategy_list:\n", " add_predecessors_to_strategy(pred_pred_node_id, out_of_bound_edges, strategy_list, main_net)\n", " return strategy_list" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Cell 16: Identify Strategy\n" ] }, { "cell_type": "code", "execution_count": 32, "metadata": {}, "outputs": [], "source": [ "strategy_list = add_predecessors_to_strategy('RM', out_of_bound_edges, [], main_net)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Cell 17: Identify Connecting Edges\n" ] }, { "cell_type": "code", "execution_count": 33, "metadata": {}, "outputs": [], "source": [ "connecting_edges = []\n", "for node in strategy_list:\n", " for incoming_edge in main_net.get_graph().in_edges(node.id):\n", " if incoming_edge not in connecting_edges:\n", " connecting_edges.append(incoming_edge)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Cell 18: Output TikZ Graph (Optional)\n" ] }, { "cell_type": "code", "execution_count": 34, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "\n", "Generating node positions...\n", "\n", "Sim complete!\n", "\n" ] } ], "source": [ "l.visualize(main_net, \"tikzout_test.tex\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Cell 19: Run the App" ] }, { "cell_type": "code", "execution_count": 37, "metadata": {}, "outputs": [ { "data": { "text/html": [ "\n", " \n", " " ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "''' Alter networks after they have been read \n", " from JSON.\n", "'''\n", "# app = CytoscapeApp(data_dict, RoverDataHandler, connecting_edges)\n", "app = CytoscapeApp(data_dict, RoverDataHandler)\n", "# webbrowser.open_new(\"http://127.0.0.1:8050\")\n", "app.run()" ] } ], "metadata": { "kernelspec": { "display_name": "env-3.10.13", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.10.13" } }, "nbformat": 4, "nbformat_minor": 2 }