sisense-datamodel / src /streamlit_app.py
vyankatesh2003's picture
perspective list
9ae03df verified
import streamlit as st
import json
from collections import defaultdict, deque
from pyvis.network import Network
import tempfile
import os
import colorsys
# page config
# The icon literal had been stored as mis-decoded UTF-8 bytes; it is restored
# to the link emoji it encodes.
st.set_page_config(
    page_title="Sisense Table Relationship Graph",
    page_icon="🔗",
    layout="wide",
    initial_sidebar_state="expanded",
)
@st.cache_data
def load_model(file_path=None, file_content=None):
    """Load a Sisense model and return the parsed JSON document.

    Sources are tried in this order:

    1. ``file_content`` - raw JSON text (e.g. an uploaded or pasted file);
    2. ``file_path`` - used only if the path exists on disk;
    3. ``assethub-v3.smodel`` in the current working directory.

    Raises:
        FileNotFoundError: none of the sources is available.
        json.JSONDecodeError: the chosen source is not valid JSON.
    """
    if file_content:
        # load from uploaded/pasted content
        return json.loads(file_content)

    # explicit path first, then the default filename
    for candidate in (file_path, 'assethub-v3.smodel'):
        if candidate and os.path.exists(candidate):
            # read as UTF-8 explicitly instead of relying on the platform's
            # default text encoding
            with open(candidate, 'r', encoding='utf-8') as f:
                return json.load(f)

    raise FileNotFoundError("Model file not found. Please upload the .smodel file.")
@st.cache_data
def extract_perspectives(model):
    """Map each perspective name to the set of table oids it contains.

    Reads ``model['dependencies']['perspectives']``. A perspective that ends
    up with no table oid is left out. A perspective without a ``name`` key is
    stored under 'Unnamed Perspective'. If two perspectives share a name, the
    later one replaces the earlier one.

    Keys that are absent *or* explicitly ``null`` in the JSON are treated as
    empty rather than raising.
    """
    dependencies = model.get('dependencies') or {}
    perspectives_dict = {}
    for perspective in dependencies.get('perspectives') or []:
        # entries without a usable oid are ignored
        table_oids = {
            table['oid']
            for table in perspective.get('tables') or []
            if table.get('oid')
        }
        if table_oids:
            name = perspective.get('name', 'Unnamed Perspective')
            perspectives_dict[name] = table_oids
    return perspectives_dict
@st.cache_data
def build_mappings(model):
    """Build oid -> name lookup tables from ``model['datasets']``.

    Returns:
        A ``(table_map, column_map, dataset_map)`` tuple where
        ``table_map`` maps table oid -> table name,
        ``column_map`` maps ``(dataset oid, table oid, column oid)`` -> column name,
        ``dataset_map`` maps dataset oid -> dataset name.

    Table and column names fall back to the element's ``id`` and then to
    'unknown'; dataset names fall back directly to 'unknown'. Containers that
    are absent or ``null`` in the JSON are treated as empty.
    """
    table_map = {}
    column_map = {}
    dataset_map = {}
    for dataset in model.get('datasets') or []:
        dataset_oid = dataset.get('oid')
        dataset_map[dataset_oid] = dataset.get('name', 'unknown')
        schema = dataset.get('schema') or {}
        for table in schema.get('tables') or []:
            table_oid = table.get('oid')
            table_map[table_oid] = table.get('name', table.get('id', 'unknown'))
            for column in table.get('columns') or []:
                key = (dataset_oid, table_oid, column.get('oid'))
                column_map[key] = column.get('name', column.get('id', 'unknown'))
    return table_map, column_map, dataset_map
@st.cache_data
def build_graph(model, table_map, column_map):
    """Turn ``model['relations']`` into an undirected table graph.

    Only relations with exactly two column endpoints are used. Each one adds
    a symmetric edge between the two table oids and one join record with the
    resolved table/column names ('unknown' when a name cannot be resolved).

    Returns:
        ``(graph, join_details)`` - a ``defaultdict(set)`` adjacency map
        keyed by table oid, and the list of join records.
    """
    adjacency = defaultdict(set)
    joins = []

    def column_name(endpoint):
        # columns are keyed by (dataset oid, table oid, column oid)
        key = (endpoint.get('dataset'), endpoint.get('table'), endpoint.get('column'))
        return column_map.get(key, 'unknown')

    for relation in model.get('relations', []):
        endpoints = relation.get('columns', [])
        if len(endpoints) != 2:
            continue
        left, right = endpoints
        left_table = left.get('table')
        right_table = right.get('table')
        left_column = column_name(left)
        right_column = column_name(right)

        adjacency[left_table].add(right_table)
        adjacency[right_table].add(left_table)
        joins.append({
            'table1': left_table,
            'table2': right_table,
            'table1_name': table_map.get(left_table, 'unknown'),
            'table2_name': table_map.get(right_table, 'unknown'),
            'column1': left_column,
            'column2': right_column
        })

    return adjacency, joins
@st.cache_data
def build_tree(root, graph, table_map):
    """Build the breadth-first spanning tree of ``graph`` starting at ``root``.

    Returns a dict keyed by table oid; each value holds the table ``name``
    ('unknown' if not in ``table_map``), its BFS ``level`` (root is 0) and
    the list of ``children`` first discovered from that node.
    """
    def new_entry(oid, depth):
        return {
            'name': table_map.get(oid, 'unknown'),
            'level': depth,
            'children': []
        }

    # a node is "visited" exactly when it already has an entry in the tree
    tree = {root: new_entry(root, 0)}
    pending = deque([root])
    while pending:
        parent = pending.popleft()
        child_depth = tree[parent]['level'] + 1
        for neighbor in graph.get(parent, []):
            if neighbor in tree:
                continue
            tree[neighbor] = new_entry(neighbor, child_depth)
            tree[parent]['children'].append(neighbor)
            pending.append(neighbor)
    return tree
def find_root_table(graph, table_map):
    """Pick the table oid used as the root of the BFS tree.

    Returns the node of ``graph`` with the most neighbours; on a tie the node
    that comes first in the graph's iteration order wins. When the graph has
    nodes but none of them has a neighbour (e.g. every edge was removed), the
    first node is still returned rather than ``None``.

    If ``graph`` is empty, falls back to the first key of ``table_map``, or
    ``None`` when that is empty as well.
    """
    if not graph:
        return next(iter(table_map), None)
    # max() returns the first maximal element, matching "first one wins"
    return max(graph, key=lambda table_oid: len(graph[table_oid]))
def get_bfs_levels(root, graph):
    """Group the nodes reachable from ``root`` by breadth-first distance.

    Returns a dict mapping level number (root is level 0) to the list of
    nodes at that level, in the order they were dequeued.
    """
    depth_of = {root: 0}  # doubles as the visited set
    frontier = deque([root])
    levels = {}
    while frontier:
        node = frontier.popleft()
        depth = depth_of[node]
        levels.setdefault(depth, []).append(node)
        for neighbor in graph.get(node, []):
            if neighbor not in depth_of:
                depth_of[neighbor] = depth + 1
                frontier.append(neighbor)
    return levels
def prioritize_back_edges(back_edges_list, graph, top_n=None):
    """Order back edges from most to least important.

    An edge's score is the sum of the neighbour counts of its two endpoints
    in ``graph`` (an endpoint missing from ``graph`` counts as 0).

    Args:
        back_edges_list: iterable of ``(u, v)`` oid pairs.
        graph: adjacency mapping oid -> collection of neighbours.
        top_n: when truthy, only the ``top_n`` best edges are returned;
            ``None`` or ``0`` returns every edge.

    Returns:
        A new list of edges, highest score first. Edges with equal scores
        keep their relative order from ``back_edges_list``; the oids
        themselves are never compared, so oids of mixed or non-orderable
        types (e.g. ``None`` next to strings) are fine.
    """
    def degree_sum(edge):
        u, v = edge
        return len(graph.get(u, ())) + len(graph.get(v, ()))

    # sorted() is stable, also with reverse=True
    ranked = sorted(back_edges_list, key=degree_sum, reverse=True)
    return ranked[:top_n] if top_n else ranked
def find_all_simple_paths(graph, source, target, max_depth=10):
    """Find every simple path from ``source`` to ``target`` by depth-first search.

    A simple path never visits a node twice; that alone also guarantees no
    edge is used twice, so only the nodes on the current path are tracked.

    Args:
        graph: adjacency mapping node -> iterable of neighbours.
        source: start node.
        target: end node.
        max_depth: maximum number of hops (edges) a returned path may have.

    Returns:
        A list of paths, each a list of nodes starting with ``source`` and
        ending with ``target``, in DFS discovery order. ``[[source]]`` when
        source and target are the same node; ``[]`` when either node is not
        a key of ``graph`` or no path within ``max_depth`` hops exists.
    """
    if source == target:
        return [[source]]
    if source not in graph or target not in graph:
        return []

    all_paths = []
    path = [source]
    on_path = {source}  # nodes of the current path, for O(1) revisit checks

    def dfs(current):
        if current == target:
            all_paths.append(path[:])
            return
        # len(path) - 1 is the number of hops used so far; going deeper
        # could only produce paths longer than max_depth
        if len(path) - 1 >= max_depth:
            return
        for neighbor in graph.get(current, ()):
            if neighbor in on_path:
                continue
            on_path.add(neighbor)
            path.append(neighbor)
            dfs(neighbor)
            path.pop()
            on_path.remove(neighbor)

    dfs(source)
    return all_paths
# fill colour per BFS level; deeper levels fall back to grey in the caller
_LEVEL_COLORS = {
    0: '#3399ff',  # blue
    1: '#4dcc4d',  # green
    2: '#ff9933',  # orange
    3: '#cc66cc',  # pink
    4: '#9999ff',  # purple
}


def _physics_options_json(spring_length):
    """Serialise the network options; the two layouts differ only in spring length."""
    return json.dumps({
        "edges": {
            "smooth": {
                "enabled": False,
                "type": "straight"
            }
        },
        "physics": {
            "enabled": True,
            "forceAtlas2Based": {
                "gravitationalConstant": -120,
                "centralGravity": 0.01,
                "springLength": spring_length,
                "springConstant": 0.08,
                "damping": 0.7,
                "avoidOverlap": 1.2
            },
            "solver": "forceAtlas2Based",
            "stabilization": {
                "enabled": True,
                "iterations": 300,
                "fit": True,
                "updateInterval": 25
            },
            "adaptiveTimestep": True
        }
    })


def _tree_edge_pairs(root, tree):
    """Collect (parent, child) pairs of ``tree`` below ``root`` without recursion."""
    pairs = set()
    stack = [root]
    while stack:
        parent = stack.pop()
        for child in tree[parent]['children']:
            # edges hanging off a falsy parent oid (e.g. None) are not drawn
            if parent:
                pairs.add((parent, child))
            stack.append(child)
    return pairs


def _node_style(base_color, node_path_colors, *, is_endpoint, is_selected,
                on_path, has_paths):
    """Return (fill colour, border colour, border width, opacity) for one node."""
    if is_endpoint:
        # path source/target: always solid black with a thick border
        return '#000000', '#000000', 6, 1.0

    if node_path_colors:
        # node on several paths: the first path's colour wins
        fill, opacity = node_path_colors[0], 1.0
    elif has_paths:
        # dim everything that is not on a highlighted path
        fill, opacity = '#e0e0e0', 0.2
    else:
        fill, opacity = base_color, 1.0

    if is_selected:
        border, width, opacity = '#ff0000', 5, 1.0
    elif on_path:
        border, width, opacity = '#0000ff', 4, 1.0
    else:
        border = '#cccccc' if has_paths else '#000000'
        width = 1 if has_paths else 2

    if opacity < 1.0:
        # express the dimming in the colour itself: '#rrggbb' -> 'rgba(r,g,b,a)'
        hex_color = fill.lstrip('#')
        r = int(hex_color[0:2], 16)
        g = int(hex_color[2:4], 16)
        b = int(hex_color[4:6], 16)
        fill = f"rgba({r},{g},{b},{opacity})"
    return fill, border, width, opacity


def _add_relationship_edge(net, u, v, path_edge_info, dim_regular):
    """Add edge u-v, coloured by its path if it lies on one, grey otherwise."""
    # path_edge_info holds both orientations of every path edge
    path_info = path_edge_info.get((u, v))
    if path_info is not None:
        path_idx, path_color = path_info
        net.add_edge(
            u, v,
            label=f"P{path_idx+1}",
            color=path_color,
            width=4,
            arrows='',
            title=f"Path {path_idx+1}",
            smooth=False,
            font={'face': 'Consolas', 'size': 12},
            opacity=1.0
        )
        return
    net.add_edge(
        u, v,
        label="",
        color='rgba(128, 128, 128, 0.2)' if dim_regular else '#808080',
        width=2,
        arrows='',
        title="",
        smooth=False
    )


def create_pyvis_network(graph_data, table_map, tree_structure, join_lookup,
                         show_tree_edges=True, show_back_edges=True,
                         back_edges_list=None, max_back_edges=None,
                         show_levels=None, selected_node=None, tree_mode=False,
                         paths=None, path_colors=None, path_source=None, path_target=None):
    """Create the interactive pyvis network for the table graph.

    Args:
        graph_data: adjacency mapping oid -> neighbours; used to locate the
            tree root and to rank back edges.
        table_map: table oid -> display name.
        tree_structure: BFS tree as produced by ``build_tree``.
        join_lookup: accepted for interface compatibility; not used here.
        show_tree_edges: draw the tree's nodes and parent/child edges.
        show_back_edges: draw the edges in ``back_edges_list``.
        back_edges_list: ``(u, v)`` pairs that are not tree edges.
        max_back_edges: when truthy, keep only that many highest-ranked back
            edges (their endpoints are still drawn as nodes).
        show_levels: when not ``None``, only nodes whose tree level is in this
            collection are drawn; nodes absent from the tree count as level 99.
        selected_node: oid to outline in red.
        tree_mode: together with ``show_tree_edges`` selects the hierarchical
            layout and the longer spring length.
        paths: list of node lists to highlight.
        path_colors: one colour per entry of ``paths``; highlighting/dimming is
            applied only when both ``paths`` and ``path_colors`` are non-empty.
        path_source, path_target: oids drawn as black endpoint nodes.

    Returns:
        The populated ``Network``.
    """
    hierarchical = bool(show_tree_edges or tree_mode)
    net = Network(
        height='800px',
        width='100%',
        bgcolor='#ffffff',
        font_color='#000000',
        directed=True,
        layout='hierarchical' if hierarchical else None
    )
    # straight edges + forceAtlas2Based physics; tree layouts get longer springs
    net.set_options(_physics_options_json(400 if hierarchical else 300))

    # collect all nodes that should be displayed
    all_nodes = set()
    if show_tree_edges:
        all_nodes.update(tree_structure)
    if show_back_edges and back_edges_list:
        for u, v in back_edges_list:
            all_nodes.add(u)
            all_nodes.add(v)
    path_nodes_set = set()
    if paths:
        for path in paths:
            path_nodes_set.update(path)
        all_nodes |= path_nodes_set

    # filter by levels if specified
    if show_levels is not None:
        all_nodes = {
            node for node in all_nodes
            if tree_structure.get(node, {}).get('level', 99) in show_levels
        }

    # per-node colours and per-edge (path index, colour); when paths share a
    # node the first path colours it, when they share an edge the last one does
    has_paths = bool(paths and path_colors)
    node_path_colors = {}
    path_edge_info = {}
    if has_paths:
        for i, path in enumerate(paths):
            path_color = path_colors[i]
            for node in path:
                node_path_colors.setdefault(node, []).append(path_color)
            for u, v in zip(path, path[1:]):
                path_edge_info[(u, v)] = (i, path_color)
                path_edge_info[(v, u)] = (i, path_color)

    for node_oid in all_nodes:
        level = tree_structure.get(node_oid, {}).get('level', 0)
        name = table_map.get(node_oid, 'unknown')
        is_endpoint = node_oid == path_source or node_oid == path_target
        is_selected = node_oid == selected_node
        on_path = node_oid in path_nodes_set

        node_color, border_color, border_width, opacity = _node_style(
            _LEVEL_COLORS.get(level, '#cccccc'),
            node_path_colors.get(node_oid),
            is_endpoint=is_endpoint,
            is_selected=is_selected,
            on_path=on_path,
            has_paths=has_paths,
        )

        if is_endpoint:
            font_size = 16
        elif on_path or is_selected:
            font_size = 14
        else:
            font_size = 10

        net.add_node(
            node_oid,
            label=name,
            color=node_color,
            level=level,
            title=f"{name}<br>Level: {level}<br>ID: {node_oid}",
            borderWidth=border_width,
            borderColor=border_color,
            font={
                'size': font_size,
                'face': 'Consolas',
                # white text on the black endpoint nodes
                'color': '#ffffff' if is_endpoint else '#000000',
            },
            opacity=opacity
        )

    # add tree edges
    if show_tree_edges:
        root = find_root_table(graph_data, table_map)
        tree_edges = _tree_edge_pairs(root, tree_structure) if root in tree_structure else set()
        for u, v in tree_edges:
            if u in all_nodes and v in all_nodes:
                _add_relationship_edge(net, u, v, path_edge_info, has_paths)

    # add back edges
    if show_back_edges and back_edges_list:
        filtered_back_edges = back_edges_list
        if max_back_edges:
            filtered_back_edges = prioritize_back_edges(back_edges_list, graph_data, top_n=max_back_edges)
        for u, v in filtered_back_edges:
            if u in all_nodes and v in all_nodes:
                _add_relationship_edge(net, u, v, path_edge_info, has_paths)

    return net
def apply_edge_modifications(graph, join_details, removed_edges, added_edges):
    """Return copies of the graph and join list with the user's edits applied.

    Removals are applied first (in both directions), then additions, so an
    edge present in both collections ends up in the graph. The returned join
    list holds the added edge records first, followed by every original join
    whose table pair is not in ``removed_edges`` in either orientation.
    The input ``graph`` is not modified.
    """
    adjacency = defaultdict(set)
    for node, neighbors in graph.items():
        adjacency[node] = neighbors.copy()

    # drop removed edges; nodes stay in the graph even if left without neighbours
    for first, second in removed_edges:
        for start, end in ((first, second), (second, first)):
            if start in adjacency:
                adjacency[start].discard(end)

    joins = []
    for extra in added_edges:
        first, second = extra['table1'], extra['table2']
        adjacency[first].add(second)
        adjacency[second].add(first)
        joins.append(extra)

    def was_removed(join):
        pair = (join['table1'], join['table2'])
        return pair in removed_edges or pair[::-1] in removed_edges

    joins.extend(join for join in join_details if not was_removed(join))
    return adjacency, joins
def filter_by_perspective_tables(graph, join_details, table_map, perspective_tables):
    """Restrict the graph and join list to tables of the selected perspective.

    With an empty/None ``perspective_tables`` the inputs are returned as-is.
    Otherwise an edge or join survives only when both of its tables belong to
    ``perspective_tables``; nodes left without neighbours are dropped from
    the returned graph. ``table_map`` is accepted but not consulted.
    """
    if not perspective_tables:
        return graph, join_details

    kept_graph = defaultdict(set)
    for node in graph:
        if node not in perspective_tables:
            continue
        inside = {other for other in graph[node] if other in perspective_tables}
        if inside:
            kept_graph[node] = inside

    kept_joins = [
        join for join in join_details
        if join['table1'] in perspective_tables and join['table2'] in perspective_tables
    ]
    return kept_graph, kept_joins
# main app
# (title emoji restored from its mis-decoded UTF-8 byte sequence)
st.title("🔗 Sisense Table Relationship Graph")
st.markdown("Interactive visualization of table relationships with dynamic edge management")
# initialize session state for edge management
# Each key is only seeded when it is not already present in the session
# state. The dict literal is rebuilt on every script run, so the set/list
# defaults are fresh objects each time.
for _state_key, _state_default in {
    'removed_edges': set(),
    'added_edges': [],
    'path_source': None,
    'path_target': None,
    'uploaded_model': None,
    'selected_path_number': None,
    'selected_perspective': None,
}.items():
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _state_default
# file upload section
# (emoji in the labels restored from their mis-decoded UTF-8 byte sequences;
# the folder icon is a best guess because one byte of it was lost)
st.sidebar.header("📁 Model File")

# try file upload first
uploaded_file = None
try:
    uploaded_file = st.sidebar.file_uploader(
        "Upload Sisense Model File",
        type=['smodel', 'json'],
        help="Upload your assethub-v3.smodel file or any .smodel/.json file. If you get a 403 error, use the text input below instead.",
        key="model_uploader"
    )
except Exception as e:
    # best effort: the uploader widget failing must not take the app down,
    # the paste box below remains available
    st.sidebar.warning(f"⚠️ File uploader error: {str(e)}")
    st.sidebar.info("💡 Use the text input below to paste your JSON content instead.")

# alternative: paste JSON content directly
st.sidebar.subheader("Or Paste JSON Content")
pasted_content = st.sidebar.text_area(
    "Paste your .smodel file content here (if file upload doesn't work)",
    height=100,
    help="If file upload gives a 403 error, copy and paste the entire JSON content from your .smodel file here",
    key="pasted_model"
)
# handle file upload or pasted content
# Priority: uploaded file > pasted JSON > content remembered in the session.
model_file_content = None
# whitespace-only input counts as "nothing pasted" on every code path
pasted_text = pasted_content.strip() if pasted_content else ""

if uploaded_file is not None:
    try:
        # getvalue() returns the complete buffer independent of the stream's
        # current read position, unlike read()
        model_file_content = uploaded_file.getvalue().decode('utf-8')
    except Exception as e:
        st.sidebar.error(f"❌ Error reading file: {str(e)}")
        st.sidebar.info("💡 Try using the text input above to paste the content instead.")
        # fall back to pasted content or session state
        if pasted_text:
            model_file_content = pasted_text
        elif st.session_state.uploaded_model:
            model_file_content = st.session_state.uploaded_model
    else:
        # remember the upload so later reruns can reuse it
        st.session_state.uploaded_model = model_file_content
        st.sidebar.success(f"✅ Loaded: {uploaded_file.name}")
elif pasted_text:
    # use pasted content
    model_file_content = pasted_text
    st.session_state.uploaded_model = model_file_content
    st.sidebar.success("✅ Loaded from pasted content")
elif st.session_state.uploaded_model:
    # use previously uploaded file
    model_file_content = st.session_state.uploaded_model
# load data
try:
model = load_model(file_content=model_file_content)
table_map, column_map, dataset_map = build_mappings(model)
original_graph, original_join_details = build_graph(model, table_map, column_map)
# extract all perspectives
perspectives_dict = extract_perspectives(model)
# get selected perspective tables (if any perspective is selected)
selected_perspective_tables = None
if st.session_state.selected_perspective and st.session_state.selected_perspective in perspectives_dict:
selected_perspective_tables = perspectives_dict[st.session_state.selected_perspective]
# apply modifications
graph, join_details = apply_edge_modifications(
original_graph,
original_join_details,
st.session_state.removed_edges,
st.session_state.added_edges
)
# filter by perspective tables if a perspective is selected
if selected_perspective_tables:
graph, join_details = filter_by_perspective_tables(
graph,
join_details,
table_map,
selected_perspective_tables
)
# build tree
root_table = find_root_table(graph, table_map)
tree_structure = build_tree(root_table, graph, table_map)
bfs_levels = get_bfs_levels(root_table, graph)
# build join lookup
join_lookup = {}
for join in join_details:
key1 = (join['table1'], join['table2'])
key2 = (join['table2'], join['table1'])
join_lookup[key1] = join
join_lookup[key2] = join
# identify back edges
tree_edges_set = set()
def collect_tree_edges_set(node_oid, tree, parent_oid=None):
    """Record every parent/child link below ``node_oid`` in ``tree_edges_set``.

    Each link is stored in both orientations. Walks the subtree with an
    explicit stack; a link whose parent oid is falsy is not recorded (this is
    what skips the initial call's ``parent_oid=None``).
    """
    pending = [(node_oid, parent_oid)]
    while pending:
        current, parent = pending.pop()
        if parent:
            tree_edges_set.add((parent, current))
            tree_edges_set.add((current, parent))
        pending.extend((child, current) for child in tree[current]['children'])
collect_tree_edges_set(root_table, tree_structure)
back_edges = []
for join in join_details:
u, v = join['table1'], join['table2']
if (u, v) not in tree_edges_set and (v, u) not in tree_edges_set:
back_edges.append((u, v))
# sidebar controls
st.sidebar.header("โš™๏ธ Controls")
# perspective selection
if perspectives_dict:
st.sidebar.subheader("๐Ÿ” Perspective Filter")
# build options list with "All Tables" as first option
perspective_options = ["All Tables"] + sorted(perspectives_dict.keys())
# determine current index
current_index = 0
if st.session_state.selected_perspective in perspectives_dict:
current_index = perspective_options.index(st.session_state.selected_perspective)
selected_perspective = st.sidebar.radio(
"Select Perspective",
perspective_options,
index=current_index,
format_func=lambda x: f"{x} ({len(perspectives_dict.get(x, []))} tables)" if x != "All Tables" else f"All Tables ({len(table_map)} tables)",
key="perspective_radio"
)
# update session state if selection changed
new_perspective = None if selected_perspective == "All Tables" else selected_perspective
if new_perspective != st.session_state.selected_perspective:
st.session_state.selected_perspective = new_perspective
st.rerun()
if st.session_state.selected_perspective:
st.sidebar.info(f"๐Ÿ“Š Showing {len(selected_perspective_tables)} tables from '{st.session_state.selected_perspective}'")
# always show all edges
show_tree_edges = True
show_back_edges = True
tree_mode = False
max_back_edges = None
# level filtering
st.sidebar.subheader("Level Filtering")
all_levels = sorted(bfs_levels.keys())
selected_levels = st.sidebar.multiselect(
"Show Levels",
all_levels,
default=all_levels,
format_func=lambda x: f"Level {x} ({len(bfs_levels[x])} tables)"
)
# edge management
st.sidebar.subheader("๐Ÿ”ง Edge Management")
# remove edge section
with st.sidebar.expander("Remove Edges", expanded=False):
all_edges = []
for join in join_details:
u, v = join['table1'], join['table2']
u_name = table_map.get(u, 'unknown')
v_name = table_map.get(v, 'unknown')
edge_key = (u, v)
if edge_key not in st.session_state.removed_edges and (v, u) not in st.session_state.removed_edges:
all_edges.append({
'key': edge_key,
'label': f"{u_name} โ†” {v_name}",
'join': join
})
if all_edges:
edge_options = [edge['label'] for edge in all_edges]
selected_edge_idx = st.selectbox("Select edge to remove", range(len(edge_options)),
format_func=lambda x: edge_options[x])
if st.button("Remove Edge", key="remove_edge"):
selected_edge = all_edges[selected_edge_idx]
edge_key = selected_edge['key']
st.session_state.removed_edges.add(edge_key)
# remove from added_edges if it was manually added
st.session_state.added_edges = [
e for e in st.session_state.added_edges
if not ((e['table1'] == edge_key[0] and e['table2'] == edge_key[1]) or
(e['table1'] == edge_key[1] and e['table2'] == edge_key[0]))
]
st.rerun()
else:
st.info("No edges available to remove")
# add edge section
with st.sidebar.expander("Add Edge", expanded=False):
# filter table names by perspective if a perspective is selected
if selected_perspective_tables:
all_table_names = sorted([table_map.get(oid, 'unknown') for oid in selected_perspective_tables if oid in table_map])
else:
all_table_names = sorted([table_map.get(oid, 'unknown') for oid in table_map.keys()])
source_table = st.selectbox("Source Table", all_table_names, key="add_source")
target_table = st.selectbox("Target Table", all_table_names, key="add_target")
# find oids
source_oid = None
target_oid = None
for oid, name in table_map.items():
if name == source_table:
source_oid = oid
if name == target_table:
target_oid = oid
if source_oid and target_oid and source_oid != target_oid:
# check if edge already exists in current graph (after modifications)
edge_exists_in_graph = target_oid in graph.get(source_oid, set())
# check if already in added_edges
already_added = any(
(e['table1'] == source_oid and e['table2'] == target_oid) or
(e['table1'] == target_oid and e['table2'] == source_oid)
for e in st.session_state.added_edges
)
if edge_exists_in_graph and not already_added:
st.warning("Edge already exists in the graph")
elif already_added:
st.info("Edge is already queued to be added")
else:
if st.button("Add Edge", key="add_edge"):
new_edge = {
'table1': source_oid,
'table2': target_oid,
'table1_name': source_table,
'table2_name': target_table,
'column1': 'unknown',
'column2': 'unknown'
}
st.session_state.added_edges.append(new_edge)
st.rerun()
elif source_oid == target_oid:
st.warning("Source and target cannot be the same")
# show modified edges summary
if st.session_state.removed_edges or st.session_state.added_edges:
with st.sidebar.expander("Modified Edges Summary", expanded=False):
if st.session_state.removed_edges:
st.write("**Removed:**")
for u, v in list(st.session_state.removed_edges)[:10]: # show first 10
u_name = table_map.get(u, 'unknown')
v_name = table_map.get(v, 'unknown')
st.write(f"- {u_name} โ†” {v_name}")
if len(st.session_state.removed_edges) > 10:
st.write(f"... and {len(st.session_state.removed_edges) - 10} more")
if st.session_state.added_edges:
st.write("**Added:**")
for edge in st.session_state.added_edges[:10]: # show first 10
st.write(f"- {edge['table1_name']} โ†” {edge['table2_name']}")
if len(st.session_state.added_edges) > 10:
st.write(f"... and {len(st.session_state.added_edges) - 10} more")
# reset edges
if st.sidebar.button("Reset All Edge Changes", type="secondary"):
st.session_state.removed_edges = set()
st.session_state.added_edges = []
st.rerun()
# path finding section
st.sidebar.subheader("๐Ÿ” Path Finding")
# filter table names by perspective if a perspective is selected
if selected_perspective_tables:
all_table_names = sorted([table_map.get(oid, 'unknown') for oid in selected_perspective_tables if oid in table_map])
else:
all_table_names = sorted([table_map.get(oid, 'unknown') for oid in table_map.keys()])
# source table selection
source_table_name = st.sidebar.selectbox(
"Source Table",
[None] + all_table_names,
index=0 if st.session_state.path_source is None else
([None] + all_table_names).index(
next((name for oid, name in table_map.items() if oid == st.session_state.path_source), None) or None
) if st.session_state.path_source else 0,
key="path_source_select"
)
# target table selection
target_table_name = st.sidebar.selectbox(
"Target Table",
[None] + all_table_names,
index=0 if st.session_state.path_target is None else
([None] + all_table_names).index(
next((name for oid, name in table_map.items() if oid == st.session_state.path_target), None) or None
) if st.session_state.path_target else 0,
key="path_target_select"
)
# find oids for selected tables
path_source_oid = None
path_target_oid = None
if source_table_name:
for oid, name in table_map.items():
if name == source_table_name:
path_source_oid = oid
# reset selected path if source changed
if st.session_state.path_source != oid:
st.session_state.selected_path_number = None
st.session_state.path_source = oid
break
if target_table_name:
for oid, name in table_map.items():
if name == target_table_name:
path_target_oid = oid
# reset selected path if target changed
if st.session_state.path_target != oid:
st.session_state.selected_path_number = None
st.session_state.path_target = oid
break
# toggle for showing all paths vs shortest paths only
show_all_paths = st.sidebar.checkbox(
"Show All Paths (not just shortest)",
value=False,
help="When unchecked, shows only shortest paths. When checked, shows all simple paths up to 10 hops.",
key="show_all_paths"
)
# find paths if both source and target are selected
all_paths = []
path_colors = []
filtered_paths = []
filtered_path_colors = []
if path_source_oid and path_target_oid:
all_paths = find_all_simple_paths(graph, path_source_oid, path_target_oid, max_depth=10)
# sort paths by length (shortest first)
if all_paths:
all_paths.sort(key=len)
# filter to only shortest paths if checkbox is unchecked
if all_paths and not show_all_paths:
min_length = len(all_paths[0]) # already sorted, so first is shortest
all_paths = [path for path in all_paths if len(path) == min_length]
# generate distinct colors for each path
num_paths = len(all_paths)
if num_paths > 0:
for i in range(num_paths):
hue = i / max(num_paths, 1)
rgb = colorsys.hsv_to_rgb(hue, 0.8, 0.9)
color = f"#{int(rgb[0]*255):02x}{int(rgb[1]*255):02x}{int(rgb[2]*255):02x}"
path_colors.append(color)
# path number selector to show only specific path
path_options = ["All Paths"] + [f"Path {i+1}" for i in range(num_paths)]
# determine default index
default_index = 0
if st.session_state.selected_path_number is not None:
# check if the stored path number is still valid
if 1 <= st.session_state.selected_path_number <= num_paths:
default_index = st.session_state.selected_path_number
else:
st.session_state.selected_path_number = None
selected_path_option = st.sidebar.selectbox(
"Select Path to Display",
path_options,
index=default_index,
key="path_selector",
help="Select a specific path number to display only that path, or 'All Paths' to show all"
)
# parse selected path number
if selected_path_option == "All Paths":
st.session_state.selected_path_number = None
filtered_paths = all_paths
filtered_path_colors = path_colors
else:
# extract path number (e.g., "Path 3" -> 2 (0-indexed))
path_num = int(selected_path_option.split()[1]) - 1
st.session_state.selected_path_number = path_num + 1 # store 1-indexed
if 0 <= path_num < num_paths:
filtered_paths = [all_paths[path_num]]
filtered_path_colors = [path_colors[path_num]]
else:
filtered_paths = all_paths
filtered_path_colors = path_colors
else:
filtered_paths = all_paths
filtered_path_colors = path_colors
# display path information
if all_paths:
if show_all_paths:
# show all paths with varying lengths
path_lengths = [len(path) - 1 for path in all_paths]
min_hops = min(path_lengths)
max_hops = max(path_lengths)
if min_hops == max_hops:
st.sidebar.success(f"Found {len(all_paths)} path(s) ({min_hops} hops)")
else:
st.sidebar.success(f"Found {len(all_paths)} path(s) ({min_hops}-{max_hops} hops)")
else:
# all paths have same length (shortest)
path_length = len(all_paths[0]) - 1
st.sidebar.success(f"Found {len(all_paths)} shortest path(s) ({path_length} hops)")
with st.sidebar.expander("๐Ÿ“‹ Path Details", expanded=True):
for i, path in enumerate(all_paths):
path_names = [table_map.get(node, 'unknown') for node in path]
path_str = " โ†’ ".join(path_names)
path_hops = len(path) - 1
st.markdown(f"**Path {i+1}** ({path_hops} hops):")
st.markdown(f"<span style='color: {path_colors[i]}; font-weight: bold;'>{path_str}</span>",
unsafe_allow_html=True)
st.markdown("---")
else:
st.sidebar.warning("No paths found between selected tables")
elif path_source_oid or path_target_oid:
st.sidebar.info("Select both source and target tables to find paths")
# node search
st.sidebar.subheader("Node Search")
search_term = st.sidebar.text_input("Search table", "")
selected_node = None
if search_term:
matching_tables = [name for name in all_table_names if search_term.lower() in name.lower()]
if matching_tables:
selected_table = st.sidebar.selectbox("Select table", matching_tables)
# find oid for selected table
for oid, name in table_map.items():
if name == selected_table:
selected_node = oid
break
# stats
st.sidebar.subheader("๐Ÿ“Š Statistics")
st.sidebar.metric("Total Tables", len(table_map))
if perspectives_dict:
st.sidebar.metric("Available Perspectives", len(perspectives_dict))
if selected_perspective_tables:
st.sidebar.metric("Filtered Tables", len(selected_perspective_tables))
st.sidebar.metric("Tree Edges", len(tree_edges_set) // 2)
st.sidebar.metric("Back Edges", len(back_edges))
st.sidebar.metric("BFS Levels", len(bfs_levels))
st.sidebar.metric("Removed Edges", len(st.session_state.removed_edges))
st.sidebar.metric("Added Edges", len(st.session_state.added_edges))
# create network
net = create_pyvis_network(
graph,
table_map,
tree_structure,
join_lookup,
show_tree_edges=show_tree_edges,
show_back_edges=show_back_edges,
back_edges_list=back_edges,
max_back_edges=max_back_edges,
show_levels=selected_levels if selected_levels else None,
selected_node=selected_node,
tree_mode=tree_mode,
paths=filtered_paths if path_source_oid and path_target_oid else None,
path_colors=filtered_path_colors if path_source_oid and path_target_oid else None,
path_source=path_source_oid if path_source_oid and path_target_oid else None,
path_target=path_target_oid if path_source_oid and path_target_oid else None
)
# save and display
with tempfile.NamedTemporaryFile(delete=False, suffix='.html', mode='w') as tmp_file:
net.save_graph(tmp_file.name)
html_file = tmp_file.name
with open(html_file, 'r', encoding='utf-8') as f:
html_content = f.read()
# modify physics settings directly in the HTML options
# change physics.enabled from true to false after a delay
import re
# find and modify physics enabled in options
def modify_physics_options(match):
    """Return the matched text with the top-level physics flag switched off.

    Written in the shape of an ``re.sub`` callback: it receives a match
    object, takes the full matched text (``match.group(0)``) and rewrites
    ``"enabled": true`` to ``"enabled": false`` for the ``"physics"``
    object found in that text.

    Only the flag that sits directly inside ``"physics": { ... }`` is
    changed. Nested objects (for example a ``"stabilization"`` or solver
    block) are skipped as opaque units, so their own ``"enabled"`` keys are
    left alone regardless of whether they appear before or after the
    top-level flag.

    Args:
        match: a regex match object whose group 0 contains the options text.

    Returns:
        The options text with the top-level physics ``enabled`` flag set to
        ``false``; the text is returned unchanged when no such flag is
        present or when it is already ``false``.

    NOTE(review): the nearby substitution on ``html_content`` uses an inline
    lambda rather than this helper -- confirm whether it should be wired in.
    """
    options_str = match.group(0)
    # The prefix group walks the physics object lazily and treats every
    # one-level nested ``{...}`` as a single unit. A greedy ``[^}]*`` prefix
    # would instead run up to the first ``}`` and backtrack onto the *last*
    # ``"enabled": true`` before it, which is typically a nested flag such as
    # stabilization.enabled rather than physics.enabled.
    # Limitation: objects nested more than one level deep inside "physics"
    # are not traversed; in that case the text is returned unchanged.
    top_level_enabled = (
        r'("physics"\s*:\s*\{(?:[^{}]|\{[^{}]*\})*?)'
        r'"enabled"\s*:\s*true'
    )
    return re.sub(top_level_enabled, r'\1"enabled": false', options_str)
# try to modify the options directly
html_content = re.sub(
r'"physics"\s*:\s*\{[^}]*"enabled"\s*:\s*true[^}]*\}',
lambda m: m.group(0).replace('"enabled": true', '"enabled": false').replace('"enabled":True', '"enabled":false'),
html_content
)
# inject css to set Consolas font globally
consolas_font_style = """
<style>
body, .vis-network, .vis-network canvas {
font-family: 'Consolas', 'Courier New', monospace !important;
}
.vis-network .vis-label {
font-family: 'Consolas', 'Courier New', monospace !important;
}
</style>
"""
# inject javascript to reduce physics intensity after stabilization
# keeps physics enabled for interactivity but reduces movement
reduce_physics_script = """
<script>
(function() {
function reducePhysicsIntensity() {
try {
// find network via vis-network class
var containers = document.querySelectorAll('.vis-network');
for (var i = 0; i < containers.length; i++) {
var container = containers[i];
if (container && container.network) {
// reduce physics intensity instead of disabling
// this keeps nodes interactive but prevents excessive movement
container.network.setOptions({
physics: {
enabled: true,
forceAtlas2Based: {
gravitationalConstant: -50,
centralGravity: 0.005,
springLength: 250,
springConstant: 0.04,
damping: 0.9,
avoidOverlap: 1.2
},
stabilization: {
enabled: false
}
}
});
}
}
// also try to find via window or global scope
if (window.network) {
window.network.setOptions({
physics: {
enabled: true,
forceAtlas2Based: {
gravitationalConstant: -50,
centralGravity: 0.005,
springLength: 250,
springConstant: 0.04,
damping: 0.9,
avoidOverlap: 1.2
},
stabilization: {
enabled: false
}
}
});
}
} catch(e) {
// silently fail
}
}
// wait for stabilization to complete, then reduce intensity
setTimeout(reducePhysicsIntensity, 3000);
setTimeout(reducePhysicsIntensity, 5000);
// listen for stabilization complete event
var checkCount = 0;
var maxChecks = 20;
var checkInterval = setInterval(function() {
checkCount++;
reducePhysicsIntensity();
if (checkCount >= maxChecks) {
clearInterval(checkInterval);
}
}, 500);
// also on window load
if (document.readyState === 'complete') {
setTimeout(reducePhysicsIntensity, 4000);
} else {
window.addEventListener('load', function() {
setTimeout(reducePhysicsIntensity, 4000);
});
}
})();
</script>
"""
# insert style in head and script before closing body tag
if '<head>' in html_content:
html_content = html_content.replace('<head>', '<head>' + consolas_font_style)
elif '</head>' in html_content:
html_content = html_content.replace('</head>', consolas_font_style + '</head>')
else:
html_content = consolas_font_style + html_content
if '</body>' in html_content:
html_content = html_content.replace('</body>', reduce_physics_script + '</body>')
else:
html_content += reduce_physics_script
st.components.v1.html(html_content, height=850, scrolling=True)
# cleanup
os.unlink(html_file)
# info section
with st.expander("โ„น๏ธ About this visualization"):
st.markdown("""
**Graph Structure:**
- All edges are displayed uniformly (same color and style)
- All edges (tree edges and back edges) are always shown
- **Tree Edges**: Parent-child relationships from BFS tree
- **Back Edges**: Additional edges that create cycles
**Edge Management:**
- **Remove Edges**: Select and remove any edge from the graph
- **Add Edges**: Manually add new edges between tables
- Changes persist during the session
- Use "Reset All Edge Changes" to restore original graph
**Controls:**
- Filter by BFS level to focus on specific hierarchy levels
- Search and highlight specific tables
**Path Finding:**
- Select source and target tables to find all simple paths between them
- Toggle "Show All Paths" to see all paths or just shortest paths
- All paths are simple (no repeated nodes or edges)
- Path list shows all found paths with hop counts
- Paths automatically update when edges are removed or added
**Interactivity:**
- Click and drag nodes to rearrange
- Hover over nodes/edges for details
- Zoom with mouse wheel
- Pan by dragging background
""")
except FileNotFoundError:
st.error("โŒ Model file not found. Please upload your .smodel file using the file uploader in the sidebar.")
st.info("๐Ÿ’ก **Tip**: In Hugging Face Spaces, you need to upload the file each time. Use the sidebar file uploader above.")
except json.JSONDecodeError as e:
st.error(f"โŒ Error parsing JSON: {str(e)}")
st.info("Please ensure you're uploading a valid .smodel or .json file.")
except Exception as e:
st.error(f"โŒ Error loading data: {str(e)}")
st.exception(e)