| import webbrowser |
| import dash as dcc |
| import dash as html |
| import numpy as np |
| import plotly.graph_objects as go |
| import plotly.express as px |
| import networkx as nx |
| import datetime |
|
|
class Visualizer:
    """Namespace of Plotly figure builders.

    None of the builders use instance state; they are static methods and can
    be called either as ``Visualizer.<name>(...)`` or on an instance.
    """

    # Palettes are cycled when there are more series than colours.
    _MENTAL_MODEL_COLORS = ("blue", "green", "red", "orange", "purple", "yellow")
    _TRAJECTORY_COLORS = (
        "blue", "red", "green", "purple", "orange",
        "yellow", "pink", "cyan", "magenta", "brown",
    )

    # Axis tick positions, in seconds relative to the current time.
    _TICK_OFFSETS_S = (-180, -120, -60, 0, 60, 120, 180)

    # Map view used by plot_trajectory.
    _MAP_CENTER = {"lon": -96.765, "lat": 32.95}
    _MAP_ZOOM = 9

    # Named waypoints as (longitude, latitude) and the corridors joining them.
    _WAYPOINTS = {
        "NTI": (-96.7535, 32.928), "NTHW": (-96.749, 32.8573), "RUBL": (-96.7588, 32.78),
        "HW342": (-96.8003, 32.7508), "TLWY": (-96.807, 32.769), "4DT": (-96.8508, 32.846),
        "T57": (-96.686, 32.888), "FSC": (-96.8213, 33.1413), "PLN": (-96.7275, 33.0297),
    }
    _CORRIDORS = (
        ("NTI", "NTHW"), ("NTHW", "RUBL"), ("RUBL", "HW342"), ("HW342", "TLWY"),
        ("TLWY", "4DT"), ("T57", "NTI"), ("FSC", "PLN"), ("PLN", "NTI"),
    )

    @staticmethod
    def _is_placeholder(value):
        """Return True when *value* is the ``0``/``None`` "not supplied" default."""
        if value is None:
            return True
        return isinstance(value, (int, float)) and value == 0

    @staticmethod
    def _parse_attributes(array_data):
        """Parse one ``attributes`` cell into ``{timestamp: (label, percent)}``.

        The cell is expected to look like ``"<ts>|<id>;<ts>|<id>;..."``.
        Non-string cells yield an empty dict. Empty segments (for example the
        one produced by a trailing ``;``) are skipped.
        """
        if not isinstance(array_data, str):
            return {}
        pairs = [
            segment.split('|')
            for segment in array_data.split(';')
            if segment.strip()
        ]
        pairs.sort(key=lambda pair: float(pair[0]))
        # NOTE(review): the slices assume each id ends in a fixed-width suffix:
        # everything before the last 14 characters is the label, and
        # characters [-12:-9] hold the percentage. Confirm against the code
        # that produces this column.
        return {
            float(pair[0]): (str(pair[1][:-14]), str(pair[1][-12:-9]))
            for pair in pairs
        }

    @staticmethod
    def _collect_mental_model(dataframe, current_time):
        """Group parsed attribute entries by label for rows up to *current_time*.

        Returns ``{label: [(x_time, timestamp, percent), ...]}``. For every row
        whose ``time`` is <= *current_time*, each parsed entry contributes one
        point at that row's time. The entries of the most recent such row are
        then repeated at *current_time* so each series extends to "now".
        """
        snapshots = {}
        for _, row in dataframe.iterrows():
            snapshots[row['time']] = Visualizer._parse_attributes(row['attributes'])

        past = {
            row_time: snapshot
            for row_time, snapshot in snapshots.items()
            if row_time <= current_time
        }
        history = {}
        if not past:
            return history

        for row_time, snapshot in past.items():
            for timestamp, (label, percent) in snapshot.items():
                history.setdefault(label, []).append((row_time, timestamp, percent))

        # Carry the latest snapshot forward to the current time.
        latest_snapshot = past[max(past)]
        for timestamp, (label, percent) in latest_snapshot.items():
            history.setdefault(label, []).append((current_time, timestamp, percent))
        return history

    @staticmethod
    def wnd_visualization(figure_handle, dataframe, current_time, start_time=0, end_time=10 ** 6):
        """Draw the time-vs-time action plot onto *figure_handle*.

        Both axes use the same time scale, so the dashed diagonal is the line
        y = x. Actions are drawn as markers on that diagonal, and each label
        parsed from the ``attributes`` column is drawn as a series of segments
        whose width is half the parsed percentage.

        Args:
            figure_handle: Figure-like object providing ``add_trace``,
                ``add_annotation`` and ``update_layout``.
            dataframe: Table with ``time``, ``actionName`` and ``attributes``
                columns; it is indexed by column name and iterated with
                ``iterrows()``.
            current_time: Time shown as "Now"; rows later than this are left
                out of the per-label series.
            start_time: Lower bound of both axes.
            end_time: Upper bound of both axes. The default was previously
                written ``10^6``, which is bitwise XOR in Python and evaluates
                to 12; it is now one million.

        Returns:
            The same *figure_handle*, after modification.
        """
        # Dashed diagonal y = x across the whole window.
        figure_handle.add_trace(go.Scatter(
            x=[start_time, end_time],
            y=[start_time, end_time],
            mode='lines',
            line=dict(color='black', dash='dash'),
            showlegend=False,
        ))

        # Dotted guide lines from both axes to the point (now, now).
        figure_handle.add_trace(go.Scatter(
            x=[current_time, current_time, start_time],
            y=[start_time, current_time, current_time],
            mode='lines',
            line=dict(color='black', dash='dot'),
            showlegend=False,
        ))

        # One marker per action, placed on the diagonal at its own time.
        hover_text = [
            f"Time: {time}<br>Action: {name}"
            for time, name in dataframe[['time', 'actionName']].values
        ]
        figure_handle.add_trace(go.Scatter(
            x=dataframe['time'],
            y=dataframe['time'],
            mode='markers',
            marker=dict(color='black', size=15),
            text=hover_text,
            hoverinfo='text',
            name='Action',
        ))

        history = Visualizer._collect_mental_model(dataframe, current_time)
        palette = Visualizer._MENTAL_MODEL_COLORS

        for series_index, (label, points) in enumerate(history.items()):
            series_color = palette[series_index % len(palette)]
            xs = [point[0] for point in points]
            ys = [point[1] for point in points]
            # Line width is half the percentage value.
            widths = [float(point[2].strip('%')) / 2 for point in points]

            for i in range(len(xs) - 1):
                # When the y value changes (or at the first point), connect the
                # diagonal point (x, x) to (x, y) with a grey line and arrow.
                if i == 0 or ys[i] != ys[i - 1]:
                    figure_handle.add_trace(go.Scatter(
                        x=[xs[i], xs[i]],
                        y=[xs[i], ys[i]],
                        mode='lines',
                        line=dict(color='gray'),
                        showlegend=False,
                        legendgroup=label,
                    ))
                    figure_handle.add_annotation(
                        x=xs[i], y=ys[i], ax=xs[i], ay=xs[i],
                        xref="x", yref="y", axref="x", ayref="y",
                        showarrow=True, arrowhead=2, arrowsize=1.5,
                        arrowwidth=2, arrowcolor="gray",
                    )

                # Each segment is its own trace so its width can differ; only
                # the first one gets a legend entry for the label.
                figure_handle.add_trace(go.Scatter(
                    x=[xs[i], xs[i + 1]],
                    y=[ys[i], ys[i + 1]],
                    mode='lines',
                    line=dict(color=series_color, width=widths[i]),
                    opacity=0.4,
                    name=label,
                    legendgroup=label,
                    showlegend=(i == 0),
                ))

        # Ticks are labelled relative to the current time: "-60(s)", "Now", ...
        tick_vals = [current_time + offset for offset in Visualizer._TICK_OFFSETS_S]
        tick_texts = [
            "Now" if offset == 0 else "{:+}(s)".format(offset)
            for offset in Visualizer._TICK_OFFSETS_S
        ]

        axis_style = dict(
            showgrid=False,
            showline=True,
            linecolor='black',
            range=[start_time, end_time],
            tickvals=tick_vals,
            ticktext=tick_texts,
            tickfont=dict(size=12),
        )
        figure_handle.update_layout(
            xaxis_title='Real Time',
            yaxis_title='Comprehension of Timeline of Events',
            xaxis=dict(axis_style),
            # Lock the aspect ratio so the diagonal is drawn at 45 degrees.
            yaxis=dict(axis_style, scaleanchor="x", scaleratio=1),
            plot_bgcolor='white',
            paper_bgcolor='white',
        )

        return figure_handle

    @staticmethod
    def plot_trajectory(figure_handle, parsed_data, uptime=float('inf'), background_image=None):
        """Draw corridors, waypoints and per-aircraft tracks onto a map figure.

        Args:
            figure_handle: Figure-like object providing ``add_trace``,
                ``add_layout_image`` and ``update_layout``.
            parsed_data: Mapping of aircraft name to a table with ``time``,
                ``longitude_deg`` and ``latitude_deg`` columns. The columns
                must support boolean-mask indexing and ``.iloc``.
            uptime: Only samples with ``0 <= time <= uptime`` are drawn.
            background_image: Optional image source drawn beneath the traces.
                When omitted, the ``open-street-map`` tile style is used.

        Returns:
            The same *figure_handle*, after modification.
        """
        fig = figure_handle

        if background_image:
            fig.add_layout_image(
                source=background_image,
                xref="x",
                yref="y",
                x=-96.88,
                y=33.16,
                sizex=0.23,
                sizey=0.44,
                sizing="fill",
                opacity=0.5,
                layer="below",
            )
        else:
            # Centre and zoom are set once at the end of this method; setting
            # them here as well would always be overwritten.
            fig.update_layout(mapbox_style="open-street-map")

        waypoints = Visualizer._WAYPOINTS
        corridor_graph = nx.Graph()
        corridor_graph.add_edges_from(Visualizer._CORRIDORS)

        # One grey line per corridor.
        for start_name, end_name in corridor_graph.edges():
            start_lon, start_lat = waypoints[start_name]
            end_lon, end_lat = waypoints[end_name]
            fig.add_trace(go.Scattermapbox(
                lon=[start_lon, end_lon],
                lat=[start_lat, end_lat],
                mode='lines',
                line=dict(color='gray'),
                showlegend=False,
            ))

        # One grey marker per waypoint.
        for name in corridor_graph.nodes():
            waypoint_lon, waypoint_lat = waypoints[name]
            fig.add_trace(go.Scattermapbox(
                lon=[waypoint_lon],
                lat=[waypoint_lat],
                mode='markers',
                marker=dict(size=5, color='gray'),
                showlegend=False,
            ))

        palette = Visualizer._TRAJECTORY_COLORS
        for series_index, (ac_name, data) in enumerate(parsed_data.items()):
            track_color = palette[series_index % len(palette)]
            in_window = (data['time'] >= 0) & (data['time'] <= uptime)
            track_lon = data['longitude_deg'][in_window]
            track_lat = data['latitude_deg'][in_window]

            fig.add_trace(go.Scattermapbox(
                mode="lines",
                lon=track_lon,
                lat=track_lat,
                line=dict(width=2, color=track_color),
                name=ac_name + " Flight Path",
            ))

            # Mark the last position inside the window, if there is one.
            if len(track_lon) > 0 and len(track_lat) > 0:
                fig.add_trace(go.Scattermapbox(
                    mode="markers",
                    lon=[track_lon.iloc[-1]],
                    lat=[track_lat.iloc[-1]],
                    marker=dict(size=10, color=track_color),
                    name=ac_name + " Current Location",
                ))

        fig.update_layout(
            mapbox=dict(
                center=dict(Visualizer._MAP_CENTER),
                zoom=Visualizer._MAP_ZOOM,
            ),
            margin=dict(l=20, r=20, t=20, b=20),
        )

        return fig

    @staticmethod
    def timeline(figure_handle, events, current_time=0, start_date=0, end_date=0):
        """Build a timeline figure of *events*, one row per agent.

        Args:
            figure_handle: Accepted for signature compatibility only. It is
                not used; a new figure is created with ``px.timeline``.
            events: Iterable of mappings with ``agent``, ``start``, ``end``,
                ``name``, ``attributes`` and ``duration`` keys.
            current_time: Seconds after *start_date* at which the dashed
                "current time" line is drawn.
            start_date: Start of the x-axis range and origin for
                *current_time*. When left at the ``0`` default, the current
                time line is not drawn (the original raised ``TypeError``).
            end_date: End of the x-axis range. The range is only set when both
                *start_date* and *end_date* are supplied.

        Returns:
            The newly created timeline figure.
        """
        category_order = sorted({event['agent'] for event in events})

        figure_handle = px.timeline(
            events, x_start="start", x_end="end", y="agent",
            hover_data=['attributes', 'duration'],
            color='name',
            category_orders={'agent': category_order},
            opacity=0.7,
        )

        has_start = not Visualizer._is_placeholder(start_date)
        has_end = not Visualizer._is_placeholder(end_date)

        if has_start:
            # float() lets NumPy integer scalars through; timedelta rejects
            # them otherwise.
            current_datetime = start_date + datetime.timedelta(seconds=float(current_time))
            # Vertical line over the full plot height (y is in paper units).
            figure_handle.add_shape(
                type="line",
                x0=current_datetime, y0=0,
                x1=current_datetime, y1=1,
                yref="paper",
                line=dict(
                    color="Black",
                    width=2,
                    dash="dash",
                ),
            )

        if has_start and has_end:
            figure_handle.update_layout(xaxis_range=[start_date, end_date])

        return figure_handle