wnd-demo / src /viz.py
mijtsma3's picture
Unit included in WND offset labels
748df7b
import webbrowser
import dash as dcc
import dash as html
import numpy as np
import plotly.graph_objects as go
import plotly.express as px
import networkx as nx
import datetime
class Visualizer:
    """Collection of Plotly figure builders.

    None of the methods use instance state, so they are declared as static
    methods: they can be called on the class (``Visualizer.timeline(...)``)
    or on an instance with the same arguments.
    """

    # Tick positions on the WND axes, expressed as offsets from the current time.
    _TICK_OFFSETS = (-180, -120, -60, 0, 60, 120, 180)

    @staticmethod
    def _parse_attributes(raw):
        """Parse one ``attributes`` cell into ``{timestamp: (label, weight_text)}``.

        ``raw`` is expected to be a ``;``-separated list of ``timestamp|id``
        pairs. From each id, everything except the last 14 characters is used
        as the label and characters ``[-12:-9]`` are kept as the weight text.
        Non-string cells (e.g. missing values) yield an empty dict, and blank
        segments such as the one produced by a trailing ``;`` are skipped.
        The returned dict is ordered by ascending timestamp.
        """
        if not isinstance(raw, str):
            return {}
        pairs = [segment.split('|') for segment in raw.split(';') if segment.strip()]
        pairs.sort(key=lambda pair: float(pair[0]))
        return {
            float(pair[0]): (str(pair[1][:-14]), str(pair[1][-12:-9]))
            for pair in pairs
        }

    @staticmethod
    def _tick_labels(offsets):
        """Return axis labels for ``offsets``: ``"Now"`` for 0, else signed ``"+60(s)"``."""
        return ["Now" if offset == 0 else "{:+}(s)".format(offset) for offset in offsets]

    @staticmethod
    def wnd_visualization(figure_handle, dataframe, current_time, start_time=0, end_time=10**6):
        """Draw the WND plot onto ``figure_handle`` and return it.

        Args:
            figure_handle: Plotly figure that receives the traces, annotations
                and layout.
            dataframe: Table with ``time``, ``actionName`` and ``attributes``
                columns; ``attributes`` is parsed by ``_parse_attributes``.
            current_time: Current time; only rows with ``time <= current_time``
                contribute to the drawn history, and axis ticks are centred
                on this value.
            start_time: Lower bound of both axes.
            end_time: Upper bound of both axes. The default is one million
                (it was previously written ``10^6``, which is bitwise XOR and
                evaluates to 12).

        Returns:
            The same ``figure_handle``, updated in place.
        """
        # Diagonal line y = x.
        figure_handle.add_trace(go.Scatter(
            x=[start_time, end_time],
            y=[start_time, end_time],
            mode='lines',
            line=dict(color='black', dash='dash'),
            showlegend=False,
        ))
        # Dotted lines from the (current_time, current_time) point to both axes.
        figure_handle.add_trace(go.Scatter(
            x=[current_time, current_time, start_time],
            y=[start_time, current_time, current_time],
            mode='lines',
            line=dict(color='black', dash='dot'),
            showlegend=False,
        ))
        # One marker per row, placed on the diagonal, with hover text.
        hover_text = [
            f"Time: {time}<br>Action: {name}"
            for time, name in dataframe[['time', 'actionName']].values
        ]
        figure_handle.add_trace(go.Scatter(
            x=dataframe['time'],
            y=dataframe['time'],
            mode='markers',
            marker=dict(color='black', size=15),
            text=hover_text,
            hoverinfo='text',
            name='Action',
        ))

        # Snapshot per row time: {row time: {timestamp: (label, weight_text)}}.
        snapshots = {}
        for _, row in dataframe.iterrows():
            snapshots[row['time']] = Visualizer._parse_attributes(row['attributes'])
        past_snapshots = {
            snapshot_time: snapshot
            for snapshot_time, snapshot in snapshots.items()
            if snapshot_time <= current_time
        }

        # label -> list of (x, y, weight_text) points, in snapshot order.
        mental_model_over_time = {}
        if past_snapshots:
            for snapshot_time, snapshot in past_snapshots.items():
                for timestamp, (label, weight_text) in snapshot.items():
                    mental_model_over_time.setdefault(label, []).append(
                        (snapshot_time, timestamp, weight_text))
            # Assume nothing changed since the most recent snapshot and extend
            # it to current_time so every line reaches "now".
            latest_snapshot = past_snapshots[max(past_snapshots)]
            for timestamp, (label, weight_text) in latest_snapshot.items():
                mental_model_over_time.setdefault(label, []).append(
                    (current_time, timestamp, weight_text))

        colors = ["blue", "green", "red", "orange", "purple", "yellow"]
        for label_index, (label, coordinates) in enumerate(mental_model_over_time.items()):
            color = colors[label_index % len(colors)]  # one colour per label, cycling
            x = [coord[0] for coord in coordinates]
            y = [coord[1] for coord in coordinates]
            # Line width is half of the numeric weight (a trailing/leading '%' is ignored).
            widths = [float(coord[2].strip('%')) / 2 for coord in coordinates]
            for i in range(len(x) - 1):
                if i == 0 or y[i] != y[i - 1]:
                    # The value changed at this snapshot: connect the diagonal
                    # point (x, x) to (x, y) with a gray line and an arrow.
                    figure_handle.add_trace(go.Scatter(
                        x=[x[i], x[i]],
                        y=[x[i], y[i]],
                        mode='lines',
                        line=dict(color='gray'),
                        showlegend=False,
                        legendgroup=label,
                    ))
                    figure_handle.add_annotation(
                        x=x[i], y=y[i], ax=x[i], ay=x[i],
                        xref="x", yref="y", axref="x", ayref="y",
                        showarrow=True, arrowhead=2, arrowsize=1.5,
                        arrowwidth=2, arrowcolor="gray",
                    )
                figure_handle.add_trace(go.Scatter(
                    x=[x[i], x[i + 1]],
                    y=[y[i], y[i + 1]],
                    mode='lines',
                    line=dict(color=color, width=widths[i]),
                    opacity=0.4,
                    name=label,
                    legendgroup=label,
                    showlegend=(i == 0),  # one legend entry per label
                ))

        # Manual ticks relative to current_time, with "Now" at offset 0.
        tick_vals = [current_time + offset for offset in Visualizer._TICK_OFFSETS]
        tick_texts = Visualizer._tick_labels(Visualizer._TICK_OFFSETS)

        figure_handle.update_layout(
            xaxis_title='Real Time',
            yaxis_title='Comprehension of Timeline of Events',
            xaxis=dict(
                showgrid=False,
                showline=True,
                linecolor='black',
                range=[start_time, end_time],
                tickvals=tick_vals,
                ticktext=tick_texts,
                tickfont=dict(size=12),
            ),
            yaxis=dict(
                showgrid=False,
                showline=True,
                linecolor='black',
                range=[start_time, end_time],
                scaleanchor="x",  # keep a 1:1 aspect ratio with the x axis
                scaleratio=1,
                tickvals=tick_vals,
                ticktext=tick_texts,
                tickfont=dict(size=12),
            ),
            plot_bgcolor='white',
            paper_bgcolor='white',
        )
        return figure_handle

    @staticmethod
    def plot_trajectory(figure_handle, parsed_data, uptime=float('inf'), background_image=None):
        """Draw the waypoint network and per-aircraft tracks on a mapbox figure.

        Args:
            figure_handle: Plotly figure that receives the traces and layout.
            parsed_data: Mapping of aircraft name to a table with ``time``,
                ``longitude_deg`` and ``latitude_deg`` columns (presumably
                pandas DataFrames; ``.iloc`` is used on the filtered columns).
            uptime: Only samples with ``0 <= time <= uptime`` are drawn.
            background_image: Optional image added as a layout image; when
                falsy, the open-street-map mapbox style is used instead.

        Returns:
            The same ``figure_handle``, updated in place.
        """
        fig = figure_handle
        if background_image:
            fig.add_layout_image(
                source=background_image,
                xref="x",
                yref="y",
                x=-96.88,
                y=33.16,
                sizex=0.23,
                sizey=0.44,
                sizing="fill",
                opacity=0.5,
                layer="below",
            )
        else:
            fig.update_layout(
                mapbox_style="open-street-map",
                mapbox_center_lon=-96.765,
                mapbox_center_lat=32.95,
                mapbox_zoom=10,
            )

        # Waypoints as (longitude, latitude) and the corridors connecting them.
        waypoints = {
            "NTI": (-96.7535, 32.928), "NTHW": (-96.749, 32.8573), "RUBL": (-96.7588, 32.78),
            "HW342": (-96.8003, 32.7508), "TLWY": (-96.807, 32.769), "4DT": (-96.8508, 32.846),
            "T57": (-96.686, 32.888), "FSC": (-96.8213, 33.1413), "PLN": (-96.7275, 33.0297),
        }
        edges = [("NTI", "NTHW"), ("NTHW", "RUBL"), ("RUBL", "HW342"), ("HW342", "TLWY"),
                 ("TLWY", "4DT"), ("T57", "NTI"), ("FSC", "PLN"), ("PLN", "NTI")]
        G = nx.Graph()
        G.add_edges_from(edges)

        # Corridors as gray lines, waypoints as small gray markers.
        for first, second in G.edges():
            fig.add_trace(go.Scattermapbox(
                lon=[waypoints[first][0], waypoints[second][0]],
                lat=[waypoints[first][1], waypoints[second][1]],
                mode='lines',
                line=dict(color='gray'),
                showlegend=False,
            ))
        for node in G.nodes():
            node_lon, node_lat = waypoints[node]
            fig.add_trace(go.Scattermapbox(
                lon=[node_lon],
                lat=[node_lat],
                mode='markers',
                marker=dict(size=5, color='gray'),
                showlegend=False,
            ))

        colors = ['blue', 'red', 'green', 'purple', 'orange', 'yellow', 'pink', 'cyan', 'magenta', 'brown']
        for ac_index, (ac_name, data) in enumerate(parsed_data.items()):
            color = colors[ac_index % len(colors)]  # one colour per aircraft, cycling
            mask = (data['time'] >= 0) & (data['time'] <= uptime)
            filtered_xdata = data['longitude_deg'][mask]
            filtered_ydata = data['latitude_deg'][mask]
            fig.add_trace(go.Scattermapbox(
                mode="lines",
                lon=filtered_xdata,
                lat=filtered_ydata,
                line=dict(width=2, color=color),
                name=ac_name + " Flight Path",
            ))
            if len(filtered_xdata) > 0 and len(filtered_ydata) > 0:
                # Mark the most recent position within the time window.
                fig.add_trace(go.Scattermapbox(
                    mode="markers",
                    lon=[filtered_xdata.iloc[-1]],
                    lat=[filtered_ydata.iloc[-1]],
                    marker=dict(size=10, color=color),
                    name=ac_name + " Current Location",
                ))

        # Applied in both branches above; this overrides the earlier zoom of 10.
        fig.update_layout(
            mapbox=dict(
                center=dict(lon=-96.765, lat=32.95),
                zoom=9,
            ),
            margin=dict(l=20, r=20, t=20, b=20),
        )
        return fig

    @staticmethod
    def timeline(figure_handle, events, current_time=0, start_date=0, end_date=0):
        """Build a timeline figure of ``events`` with a dashed "current time" line.

        Args:
            figure_handle: Ignored; a new figure is created with
                ``px.timeline`` and returned instead.
            events: Iterable of records with ``agent``, ``start``, ``end``,
                ``name``, ``attributes`` and ``duration`` fields.
            current_time: Seconds after ``start_date`` at which the vertical
                line is drawn.
            start_date: Start of the x-axis range. It is added to a
                ``datetime.timedelta``, so it must be a datetime-like value;
                NOTE(review): the default ``0`` does not support that addition.
            end_date: End of the x-axis range.

        Returns:
            The newly created timeline figure.
        """
        category_order = sorted({e['agent'] for e in events})
        figure_handle = px.timeline(
            events, x_start="start", x_end="end", y="agent",
            hover_data=['attributes', 'duration'],
            color='name',
            category_orders={'agent': category_order},
            opacity=0.7,
        )
        # Vertical dashed line spanning the full plot height at the current time.
        current_datetime = start_date + datetime.timedelta(seconds=current_time)
        figure_handle.add_shape(
            type="line",
            x0=current_datetime, y0=0,
            x1=current_datetime, y1=1,
            yref="paper",
            line=dict(
                color="Black",
                width=2,
                dash="dash",
            ),
        )
        figure_handle.update_layout(
            xaxis_range=[start_date, end_date]
        )
        return figure_handle