import streamlit as st import json from collections import defaultdict, deque from pyvis.network import Network import tempfile import os import colorsys # page config st.set_page_config( page_title="Sisense Table Relationship Graph", page_icon="🔗", layout="wide", initial_sidebar_state="expanded" ) @st.cache_data def load_model(file_path=None, file_content=None): """load sisense model file""" if file_content: # load from uploaded file content return json.loads(file_content) elif file_path and os.path.exists(file_path): # load from file path with open(file_path, 'r') as f: return json.load(f) else: # try default filename default_path = 'assethub-v3.smodel' if os.path.exists(default_path): with open(default_path, 'r') as f: return json.load(f) raise FileNotFoundError("Model file not found. Please upload the .smodel file.") @st.cache_data def extract_perspectives(model): """extract all perspectives with their names and table oids""" perspectives_dict = {} dependencies = model.get('dependencies', {}) perspectives = dependencies.get('perspectives', []) for perspective in perspectives: name = perspective.get('name', 'Unnamed Perspective') tables = perspective.get('tables', []) # only include perspectives that have tables if tables: table_oids = set() for table in tables: table_oid = table.get('oid') if table_oid: table_oids.add(table_oid) if table_oids: perspectives_dict[name] = table_oids return perspectives_dict @st.cache_data def build_mappings(model): """build table and column mappings""" table_map = {} column_map = {} dataset_map = {} for dataset in model.get('datasets', []): dataset_oid = dataset.get('oid') dataset_name = dataset.get('name', 'unknown') dataset_map[dataset_oid] = dataset_name schema = dataset.get('schema', {}) tables = schema.get('tables', []) for table in tables: table_oid = table.get('oid') table_name = table.get('name', table.get('id', 'unknown')) table_map[table_oid] = table_name for column in table.get('columns', []): column_oid = column.get('oid') column_name = column.get('name', column.get('id', 'unknown')) column_map[(dataset_oid, table_oid, column_oid)] = column_name return table_map, column_map, dataset_map @st.cache_data def build_graph(model, table_map, column_map): """build graph from relations""" graph = defaultdict(set) join_details = [] for relation in model.get('relations', []): columns = relation.get('columns', []) if len(columns) != 2: continue col1 = columns[0] col2 = columns[1] table1_oid = col1.get('table') table2_oid = col2.get('table') dataset1_oid = col1.get('dataset') dataset2_oid = col2.get('dataset') column1_oid = col1.get('column') column2_oid = col2.get('column') col1_name = column_map.get((dataset1_oid, table1_oid, column1_oid), 'unknown') col2_name = column_map.get((dataset2_oid, table2_oid, column2_oid), 'unknown') graph[table1_oid].add(table2_oid) graph[table2_oid].add(table1_oid) join_details.append({ 'table1': table1_oid, 'table2': table2_oid, 'table1_name': table_map.get(table1_oid, 'unknown'), 'table2_name': table_map.get(table2_oid, 'unknown'), 'column1': col1_name, 'column2': col2_name }) return graph, join_details @st.cache_data def build_tree(root, graph, table_map): """build bfs tree structure""" tree = {} visited = set() queue = deque([(root, 0)]) visited.add(root) while queue: current, level = queue.popleft() if current not in tree: tree[current] = { 'name': table_map.get(current, 'unknown'), 'level': level, 'children': [] } for neighbor in graph.get(current, []): if neighbor not in visited: visited.add(neighbor) queue.append((neighbor, level + 1)) tree[current]['children'].append(neighbor) if neighbor not in tree: tree[neighbor] = { 'name': table_map.get(neighbor, 'unknown'), 'level': level + 1, 'children': [] } return tree def find_root_table(graph, table_map): """find most connected table as root""" if not graph: return list(table_map.keys())[0] if table_map else None max_connections = 0 root_table = None for table_oid, connections in graph.items(): if len(connections) > max_connections: max_connections = len(connections) root_table = table_oid return root_table def get_bfs_levels(root, graph): """get nodes grouped by bfs level""" levels = {} visited = set() queue = deque([(root, 0)]) visited.add(root) while queue: node, level = queue.popleft() if level not in levels: levels[level] = [] levels[level].append(node) for neighbor in graph.get(node, []): if neighbor not in visited: visited.add(neighbor) queue.append((neighbor, level + 1)) return levels def prioritize_back_edges(back_edges_list, graph, top_n=None): """prioritize back edges by importance""" edge_scores = [] for u, v in back_edges_list: connections_u = len(graph.get(u, set())) connections_v = len(graph.get(v, set())) score = connections_u + connections_v edge_scores.append((score, (u, v))) edge_scores.sort(reverse=True) if top_n: return [edge for _, edge in edge_scores[:top_n]] return [edge for _, edge in edge_scores] def find_all_simple_paths(graph, source, target, max_depth=10): """ find all simple paths between source and target using dfs. ensures no node or edge is traversed twice in any path. """ if source == target: return [[source]] if source not in graph or target not in graph: return [] all_paths = [] def dfs(current, target, path, visited_nodes, visited_edges, depth): """ dfs to find all simple paths. visited_nodes: set of nodes already in current path (prevents node revisits) visited_edges: set of edges already traversed in current path (prevents edge revisits) """ if depth > max_depth: return if current == target: all_paths.append(path[:]) return for neighbor in graph.get(current, []): # create edge tuple (normalized for undirected graph) edge = tuple(sorted([current, neighbor])) # ensure: node not visited AND edge not traversed in current path if neighbor not in visited_nodes and edge not in visited_edges: visited_nodes.add(neighbor) visited_edges.add(edge) path.append(neighbor) dfs(neighbor, target, path, visited_nodes, visited_edges, depth + 1) path.pop() visited_edges.remove(edge) visited_nodes.remove(neighbor) visited_nodes = {source} # track nodes in current path visited_edges = set() # track edges in current path dfs(source, target, [source], visited_nodes, visited_edges, 0) return all_paths def create_pyvis_network(graph_data, table_map, tree_structure, join_lookup, show_tree_edges=True, show_back_edges=True, back_edges_list=None, max_back_edges=None, show_levels=None, selected_node=None, tree_mode=False, paths=None, path_colors=None, path_source=None, path_target=None): """create interactive pyvis network""" net = Network( height='800px', width='100%', bgcolor='#ffffff', font_color='#000000', directed=True, layout='hierarchical' if (show_tree_edges or tree_mode) else None ) # set physics options with edge repulsion and straight edges # forceAtlas2Based provides natural edge repulsion and node spacing # keep physics enabled with reduced intensity after stabilization for interactivity if tree_mode or show_tree_edges: # tree mode with edge repulsion options_str = json.dumps({ "edges": { "smooth": { "enabled": False, "type": "straight" } }, "physics": { "enabled": True, "forceAtlas2Based": { "gravitationalConstant": -120, "centralGravity": 0.01, "springLength": 400, "springConstant": 0.08, "damping": 0.7, "avoidOverlap": 1.2 }, "solver": "forceAtlas2Based", "stabilization": { "enabled": True, "iterations": 300, "fit": True, "updateInterval": 25 }, "adaptiveTimestep": True } }) net.set_options(options_str) else: # force-directed with edge repulsion options_str = json.dumps({ "edges": { "smooth": { "enabled": False, "type": "straight" } }, "physics": { "enabled": True, "forceAtlas2Based": { "gravitationalConstant": -120, "centralGravity": 0.01, "springLength": 300, "springConstant": 0.08, "damping": 0.7, "avoidOverlap": 1.2 }, "solver": "forceAtlas2Based", "stabilization": { "enabled": True, "iterations": 300, "fit": True, "updateInterval": 25 }, "adaptiveTimestep": True } }) net.set_options(options_str) # note: physics intensity will be reduced via javascript after stabilization for better interactivity # level colors level_colors = { 0: '#3399ff', # blue 1: '#4dcc4d', # green 2: '#ff9933', # orange 3: '#cc66cc', # pink 4: '#9999ff', # purple } # collect all nodes that should be displayed all_nodes = set() if show_tree_edges: for node in tree_structure.keys(): all_nodes.add(node) if show_back_edges and back_edges_list: for u, v in back_edges_list: all_nodes.add(u) all_nodes.add(v) # add path nodes if paths are provided path_nodes_set = set() if paths: for path in paths: for node in path: path_nodes_set.add(node) all_nodes.add(node) # filter by levels if specified if show_levels is not None: filtered_nodes = set() for node in all_nodes: level = tree_structure.get(node, {}).get('level', 99) if level in show_levels: filtered_nodes.add(node) all_nodes = filtered_nodes # determine node colors and borders based on paths node_path_colors = {} has_paths = paths and path_colors if has_paths: for i, path in enumerate(paths): path_color = path_colors[i] for node in path: if node not in node_path_colors: node_path_colors[node] = [] node_path_colors[node].append(path_color) for node_oid in all_nodes: level = tree_structure.get(node_oid, {}).get('level', 0) base_color = level_colors.get(level, '#cccccc') name = table_map.get(node_oid, 'unknown') # highlight source and target nodes first (highest priority) - always black if node_oid == path_source or node_oid == path_target: # source and target nodes: black color with thick border node_color = '#000000' # black border_color = '#000000' border_width = 6 opacity = 1.0 else: # determine node color - if in paths, use path color(s), otherwise dim if paths exist if node_oid in node_path_colors: # if node is in multiple paths, use first path color or blend node_color = node_path_colors[node_oid][0] opacity = 1.0 else: if has_paths: # dim non-path nodes when paths are shown node_color = '#e0e0e0' # light gray opacity = 0.2 else: node_color = base_color opacity = 1.0 # set border for non-source/target nodes if node_oid == selected_node: border_color = '#ff0000' border_width = 5 opacity = 1.0 # always fully visible when selected elif node_oid in path_nodes_set: border_color = '#0000ff' # blue border for path nodes border_width = 4 opacity = 1.0 # always fully visible for path nodes else: border_color = '#cccccc' if has_paths else '#000000' border_width = 1 if has_paths else 2 # convert color to rgba for opacity support if opacity < 1.0: # convert hex to rgb and add opacity hex_color = node_color.lstrip('#') r = int(hex_color[0:2], 16) g = int(hex_color[2:4], 16) b = int(hex_color[4:6], 16) node_color = f"rgba({r},{g},{b},{opacity})" net.add_node( node_oid, label=name, color=node_color, level=level, title=f"{name}
Level: {level}
ID: {node_oid}", borderWidth=border_width, borderColor=border_color, font={'size': 16 if (node_oid == path_source or node_oid == path_target) else (14 if node_oid in path_nodes_set or node_oid == selected_node else 10), 'face': 'Consolas', 'color': '#ffffff' if (node_oid == path_source or node_oid == path_target) else '#000000'}, opacity=opacity ) # collect path edges to identify which edges are part of paths path_edges_set = set() path_edge_to_path_index = {} # map edge to path index and color if paths and path_colors: for i, path in enumerate(paths): path_color = path_colors[i] for j in range(len(path) - 1): u, v = path[j], path[j + 1] edge_key = (u, v) reverse_key = (v, u) path_edges_set.add(edge_key) path_edges_set.add(reverse_key) path_edge_to_path_index[edge_key] = (i, path_color) path_edge_to_path_index[reverse_key] = (i, path_color) # add tree edges if show_tree_edges: tree_edges = set() def collect_tree_edges(node_oid, tree, parent_oid=None): if parent_oid: tree_edges.add((parent_oid, node_oid)) for child_oid in tree[node_oid]['children']: collect_tree_edges(child_oid, tree, node_oid) root = find_root_table(graph_data, table_map) if root in tree_structure: collect_tree_edges(root, tree_structure) for u, v in tree_edges: if u in all_nodes and v in all_nodes: edge_key = (u, v) reverse_key = (v, u) # check if this edge is part of a path is_path_edge = edge_key in path_edges_set or reverse_key in path_edges_set if is_path_edge: # use path color path_info = path_edge_to_path_index.get(edge_key) or path_edge_to_path_index.get(reverse_key) if path_info: path_idx, path_color = path_info net.add_edge( u, v, label=f"P{path_idx+1}", color=path_color, width=4, arrows='', title=f"Path {path_idx+1}", smooth=False, font={'face': 'Consolas', 'size': 12}, opacity=1.0 ) else: # regular edge - uniform gray, dim if paths are shown if has_paths: # use rgba for opacity edge_color = 'rgba(128, 128, 128, 0.2)' else: edge_color = '#808080' net.add_edge( u, v, label="", color=edge_color, width=2, arrows='', title="", smooth=False ) # add back edges if show_back_edges and back_edges_list: filtered_back_edges = back_edges_list if max_back_edges: filtered_back_edges = prioritize_back_edges(back_edges_list, graph_data, top_n=max_back_edges) for u, v in filtered_back_edges: if u in all_nodes and v in all_nodes: edge_key = (u, v) reverse_key = (v, u) # check if this edge is part of a path is_path_edge = edge_key in path_edges_set or reverse_key in path_edges_set if is_path_edge: # use path color path_info = path_edge_to_path_index.get(edge_key) or path_edge_to_path_index.get(reverse_key) if path_info: path_idx, path_color = path_info net.add_edge( u, v, label=f"P{path_idx+1}", color=path_color, width=4, arrows='', title=f"Path {path_idx+1}", smooth=False, font={'face': 'Consolas', 'size': 12}, opacity=1.0 ) else: # regular edge - uniform gray, dim if paths are shown if has_paths: # use rgba for opacity edge_color = 'rgba(128, 128, 128, 0.2)' else: edge_color = '#808080' net.add_edge( u, v, label="", color=edge_color, width=2, arrows='', title="", smooth=False ) return net def apply_edge_modifications(graph, join_details, removed_edges, added_edges): """apply edge modifications to graph and join_details""" modified_graph = defaultdict(set) modified_join_details = [] # copy original graph for node, neighbors in graph.items(): modified_graph[node] = neighbors.copy() # remove edges for u, v in removed_edges: if u in modified_graph: modified_graph[u].discard(v) if v in modified_graph: modified_graph[v].discard(u) # add edges for edge_info in added_edges: u = edge_info['table1'] v = edge_info['table2'] modified_graph[u].add(v) modified_graph[v].add(u) modified_join_details.append(edge_info) # add original joins that weren't removed for join in join_details: u, v = join['table1'], join['table2'] if (u, v) not in removed_edges and (v, u) not in removed_edges: modified_join_details.append(join) return modified_graph, modified_join_details def filter_by_perspective_tables(graph, join_details, table_map, perspective_tables): """filter graph and join_details to only include tables in perspectives""" if not perspective_tables: return graph, join_details filtered_graph = defaultdict(set) filtered_join_details = [] # filter graph - only keep edges where both nodes are in perspectives for node, neighbors in graph.items(): if node in perspective_tables: filtered_neighbors = {n for n in neighbors if n in perspective_tables} if filtered_neighbors: filtered_graph[node] = filtered_neighbors # filter join_details - only keep joins where both tables are in perspectives for join in join_details: u, v = join['table1'], join['table2'] if u in perspective_tables and v in perspective_tables: filtered_join_details.append(join) return filtered_graph, filtered_join_details # main app st.title("🔗 Sisense Table Relationship Graph") st.markdown("Interactive visualization of table relationships with dynamic edge management") # initialize session state for edge management if 'removed_edges' not in st.session_state: st.session_state.removed_edges = set() if 'added_edges' not in st.session_state: st.session_state.added_edges = [] if 'path_source' not in st.session_state: st.session_state.path_source = None if 'path_target' not in st.session_state: st.session_state.path_target = None if 'uploaded_model' not in st.session_state: st.session_state.uploaded_model = None if 'selected_path_number' not in st.session_state: st.session_state.selected_path_number = None if 'selected_perspective' not in st.session_state: st.session_state.selected_perspective = None # file upload section st.sidebar.header("📁 Model File") # try file upload first uploaded_file = None try: uploaded_file = st.sidebar.file_uploader( "Upload Sisense Model File", type=['smodel', 'json'], help="Upload your assethub-v3.smodel file or any .smodel/.json file. If you get a 403 error, use the text input below instead.", key="model_uploader" ) except Exception as e: st.sidebar.warning(f"âš ī¸ File uploader error: {str(e)}") st.sidebar.info("💡 Use the text input below to paste your JSON content instead.") # alternative: paste JSON content directly st.sidebar.subheader("Or Paste JSON Content") pasted_content = st.sidebar.text_area( "Paste your .smodel file content here (if file upload doesn't work)", height=100, help="If file upload gives a 403 error, copy and paste the entire JSON content from your .smodel file here", key="pasted_model" ) # handle file upload or pasted content model_file_content = None if uploaded_file is not None: try: # read uploaded file content model_file_content = uploaded_file.read().decode('utf-8') st.session_state.uploaded_model = model_file_content st.sidebar.success(f"✅ Loaded: {uploaded_file.name}") except Exception as e: st.sidebar.error(f"❌ Error reading file: {str(e)}") st.sidebar.info("💡 Try using the text input above to paste the content instead.") # fall back to pasted content or session state if pasted_content: model_file_content = pasted_content elif st.session_state.uploaded_model: model_file_content = st.session_state.uploaded_model elif pasted_content and pasted_content.strip(): # use pasted content model_file_content = pasted_content.strip() st.session_state.uploaded_model = model_file_content st.sidebar.success("✅ Loaded from pasted content") elif st.session_state.uploaded_model: # use previously uploaded file model_file_content = st.session_state.uploaded_model # load data try: model = load_model(file_content=model_file_content) table_map, column_map, dataset_map = build_mappings(model) original_graph, original_join_details = build_graph(model, table_map, column_map) # extract all perspectives perspectives_dict = extract_perspectives(model) # get selected perspective tables (if any perspective is selected) selected_perspective_tables = None if st.session_state.selected_perspective and st.session_state.selected_perspective in perspectives_dict: selected_perspective_tables = perspectives_dict[st.session_state.selected_perspective] # apply modifications graph, join_details = apply_edge_modifications( original_graph, original_join_details, st.session_state.removed_edges, st.session_state.added_edges ) # filter by perspective tables if a perspective is selected if selected_perspective_tables: graph, join_details = filter_by_perspective_tables( graph, join_details, table_map, selected_perspective_tables ) # build tree root_table = find_root_table(graph, table_map) tree_structure = build_tree(root_table, graph, table_map) bfs_levels = get_bfs_levels(root_table, graph) # build join lookup join_lookup = {} for join in join_details: key1 = (join['table1'], join['table2']) key2 = (join['table2'], join['table1']) join_lookup[key1] = join join_lookup[key2] = join # identify back edges tree_edges_set = set() def collect_tree_edges_set(node_oid, tree, parent_oid=None): if parent_oid: tree_edges_set.add((parent_oid, node_oid)) tree_edges_set.add((node_oid, parent_oid)) for child_oid in tree[node_oid]['children']: collect_tree_edges_set(child_oid, tree, node_oid) collect_tree_edges_set(root_table, tree_structure) back_edges = [] for join in join_details: u, v = join['table1'], join['table2'] if (u, v) not in tree_edges_set and (v, u) not in tree_edges_set: back_edges.append((u, v)) # sidebar controls st.sidebar.header("âš™ī¸ Controls") # perspective selection if perspectives_dict: st.sidebar.subheader("🔍 Perspective Filter") # build options list with "All Tables" as first option perspective_options = ["All Tables"] + sorted(perspectives_dict.keys()) # determine current index current_index = 0 if st.session_state.selected_perspective in perspectives_dict: current_index = perspective_options.index(st.session_state.selected_perspective) selected_perspective = st.sidebar.radio( "Select Perspective", perspective_options, index=current_index, format_func=lambda x: f"{x} ({len(perspectives_dict.get(x, []))} tables)" if x != "All Tables" else f"All Tables ({len(table_map)} tables)", key="perspective_radio" ) # update session state if selection changed new_perspective = None if selected_perspective == "All Tables" else selected_perspective if new_perspective != st.session_state.selected_perspective: st.session_state.selected_perspective = new_perspective st.rerun() if st.session_state.selected_perspective: st.sidebar.info(f"📊 Showing {len(selected_perspective_tables)} tables from '{st.session_state.selected_perspective}'") # always show all edges show_tree_edges = True show_back_edges = True tree_mode = False max_back_edges = None # level filtering st.sidebar.subheader("Level Filtering") all_levels = sorted(bfs_levels.keys()) selected_levels = st.sidebar.multiselect( "Show Levels", all_levels, default=all_levels, format_func=lambda x: f"Level {x} ({len(bfs_levels[x])} tables)" ) # edge management st.sidebar.subheader("🔧 Edge Management") # remove edge section with st.sidebar.expander("Remove Edges", expanded=False): all_edges = [] for join in join_details: u, v = join['table1'], join['table2'] u_name = table_map.get(u, 'unknown') v_name = table_map.get(v, 'unknown') edge_key = (u, v) if edge_key not in st.session_state.removed_edges and (v, u) not in st.session_state.removed_edges: all_edges.append({ 'key': edge_key, 'label': f"{u_name} ↔ {v_name}", 'join': join }) if all_edges: edge_options = [edge['label'] for edge in all_edges] selected_edge_idx = st.selectbox("Select edge to remove", range(len(edge_options)), format_func=lambda x: edge_options[x]) if st.button("Remove Edge", key="remove_edge"): selected_edge = all_edges[selected_edge_idx] edge_key = selected_edge['key'] st.session_state.removed_edges.add(edge_key) # remove from added_edges if it was manually added st.session_state.added_edges = [ e for e in st.session_state.added_edges if not ((e['table1'] == edge_key[0] and e['table2'] == edge_key[1]) or (e['table1'] == edge_key[1] and e['table2'] == edge_key[0])) ] st.rerun() else: st.info("No edges available to remove") # add edge section with st.sidebar.expander("Add Edge", expanded=False): # filter table names by perspective if a perspective is selected if selected_perspective_tables: all_table_names = sorted([table_map.get(oid, 'unknown') for oid in selected_perspective_tables if oid in table_map]) else: all_table_names = sorted([table_map.get(oid, 'unknown') for oid in table_map.keys()]) source_table = st.selectbox("Source Table", all_table_names, key="add_source") target_table = st.selectbox("Target Table", all_table_names, key="add_target") # find oids source_oid = None target_oid = None for oid, name in table_map.items(): if name == source_table: source_oid = oid if name == target_table: target_oid = oid if source_oid and target_oid and source_oid != target_oid: # check if edge already exists in current graph (after modifications) edge_exists_in_graph = target_oid in graph.get(source_oid, set()) # check if already in added_edges already_added = any( (e['table1'] == source_oid and e['table2'] == target_oid) or (e['table1'] == target_oid and e['table2'] == source_oid) for e in st.session_state.added_edges ) if edge_exists_in_graph and not already_added: st.warning("Edge already exists in the graph") elif already_added: st.info("Edge is already queued to be added") else: if st.button("Add Edge", key="add_edge"): new_edge = { 'table1': source_oid, 'table2': target_oid, 'table1_name': source_table, 'table2_name': target_table, 'column1': 'unknown', 'column2': 'unknown' } st.session_state.added_edges.append(new_edge) st.rerun() elif source_oid == target_oid: st.warning("Source and target cannot be the same") # show modified edges summary if st.session_state.removed_edges or st.session_state.added_edges: with st.sidebar.expander("Modified Edges Summary", expanded=False): if st.session_state.removed_edges: st.write("**Removed:**") for u, v in list(st.session_state.removed_edges)[:10]: # show first 10 u_name = table_map.get(u, 'unknown') v_name = table_map.get(v, 'unknown') st.write(f"- {u_name} ↔ {v_name}") if len(st.session_state.removed_edges) > 10: st.write(f"... and {len(st.session_state.removed_edges) - 10} more") if st.session_state.added_edges: st.write("**Added:**") for edge in st.session_state.added_edges[:10]: # show first 10 st.write(f"- {edge['table1_name']} ↔ {edge['table2_name']}") if len(st.session_state.added_edges) > 10: st.write(f"... and {len(st.session_state.added_edges) - 10} more") # reset edges if st.sidebar.button("Reset All Edge Changes", type="secondary"): st.session_state.removed_edges = set() st.session_state.added_edges = [] st.rerun() # path finding section st.sidebar.subheader("🔍 Path Finding") # filter table names by perspective if a perspective is selected if selected_perspective_tables: all_table_names = sorted([table_map.get(oid, 'unknown') for oid in selected_perspective_tables if oid in table_map]) else: all_table_names = sorted([table_map.get(oid, 'unknown') for oid in table_map.keys()]) # source table selection source_table_name = st.sidebar.selectbox( "Source Table", [None] + all_table_names, index=0 if st.session_state.path_source is None else ([None] + all_table_names).index( next((name for oid, name in table_map.items() if oid == st.session_state.path_source), None) or None ) if st.session_state.path_source else 0, key="path_source_select" ) # target table selection target_table_name = st.sidebar.selectbox( "Target Table", [None] + all_table_names, index=0 if st.session_state.path_target is None else ([None] + all_table_names).index( next((name for oid, name in table_map.items() if oid == st.session_state.path_target), None) or None ) if st.session_state.path_target else 0, key="path_target_select" ) # find oids for selected tables path_source_oid = None path_target_oid = None if source_table_name: for oid, name in table_map.items(): if name == source_table_name: path_source_oid = oid # reset selected path if source changed if st.session_state.path_source != oid: st.session_state.selected_path_number = None st.session_state.path_source = oid break if target_table_name: for oid, name in table_map.items(): if name == target_table_name: path_target_oid = oid # reset selected path if target changed if st.session_state.path_target != oid: st.session_state.selected_path_number = None st.session_state.path_target = oid break # toggle for showing all paths vs shortest paths only show_all_paths = st.sidebar.checkbox( "Show All Paths (not just shortest)", value=False, help="When unchecked, shows only shortest paths. When checked, shows all simple paths up to 10 hops.", key="show_all_paths" ) # find paths if both source and target are selected all_paths = [] path_colors = [] filtered_paths = [] filtered_path_colors = [] if path_source_oid and path_target_oid: all_paths = find_all_simple_paths(graph, path_source_oid, path_target_oid, max_depth=10) # sort paths by length (shortest first) if all_paths: all_paths.sort(key=len) # filter to only shortest paths if checkbox is unchecked if all_paths and not show_all_paths: min_length = len(all_paths[0]) # already sorted, so first is shortest all_paths = [path for path in all_paths if len(path) == min_length] # generate distinct colors for each path num_paths = len(all_paths) if num_paths > 0: for i in range(num_paths): hue = i / max(num_paths, 1) rgb = colorsys.hsv_to_rgb(hue, 0.8, 0.9) color = f"#{int(rgb[0]*255):02x}{int(rgb[1]*255):02x}{int(rgb[2]*255):02x}" path_colors.append(color) # path number selector to show only specific path path_options = ["All Paths"] + [f"Path {i+1}" for i in range(num_paths)] # determine default index default_index = 0 if st.session_state.selected_path_number is not None: # check if the stored path number is still valid if 1 <= st.session_state.selected_path_number <= num_paths: default_index = st.session_state.selected_path_number else: st.session_state.selected_path_number = None selected_path_option = st.sidebar.selectbox( "Select Path to Display", path_options, index=default_index, key="path_selector", help="Select a specific path number to display only that path, or 'All Paths' to show all" ) # parse selected path number if selected_path_option == "All Paths": st.session_state.selected_path_number = None filtered_paths = all_paths filtered_path_colors = path_colors else: # extract path number (e.g., "Path 3" -> 2 (0-indexed)) path_num = int(selected_path_option.split()[1]) - 1 st.session_state.selected_path_number = path_num + 1 # store 1-indexed if 0 <= path_num < num_paths: filtered_paths = [all_paths[path_num]] filtered_path_colors = [path_colors[path_num]] else: filtered_paths = all_paths filtered_path_colors = path_colors else: filtered_paths = all_paths filtered_path_colors = path_colors # display path information if all_paths: if show_all_paths: # show all paths with varying lengths path_lengths = [len(path) - 1 for path in all_paths] min_hops = min(path_lengths) max_hops = max(path_lengths) if min_hops == max_hops: st.sidebar.success(f"Found {len(all_paths)} path(s) ({min_hops} hops)") else: st.sidebar.success(f"Found {len(all_paths)} path(s) ({min_hops}-{max_hops} hops)") else: # all paths have same length (shortest) path_length = len(all_paths[0]) - 1 st.sidebar.success(f"Found {len(all_paths)} shortest path(s) ({path_length} hops)") with st.sidebar.expander("📋 Path Details", expanded=True): for i, path in enumerate(all_paths): path_names = [table_map.get(node, 'unknown') for node in path] path_str = " → ".join(path_names) path_hops = len(path) - 1 st.markdown(f"**Path {i+1}** ({path_hops} hops):") st.markdown(f"{path_str}", unsafe_allow_html=True) st.markdown("---") else: st.sidebar.warning("No paths found between selected tables") elif path_source_oid or path_target_oid: st.sidebar.info("Select both source and target tables to find paths") # node search st.sidebar.subheader("Node Search") search_term = st.sidebar.text_input("Search table", "") selected_node = None if search_term: matching_tables = [name for name in all_table_names if search_term.lower() in name.lower()] if matching_tables: selected_table = st.sidebar.selectbox("Select table", matching_tables) # find oid for selected table for oid, name in table_map.items(): if name == selected_table: selected_node = oid break # stats st.sidebar.subheader("📊 Statistics") st.sidebar.metric("Total Tables", len(table_map)) if perspectives_dict: st.sidebar.metric("Available Perspectives", len(perspectives_dict)) if selected_perspective_tables: st.sidebar.metric("Filtered Tables", len(selected_perspective_tables)) st.sidebar.metric("Tree Edges", len(tree_edges_set) // 2) st.sidebar.metric("Back Edges", len(back_edges)) st.sidebar.metric("BFS Levels", len(bfs_levels)) st.sidebar.metric("Removed Edges", len(st.session_state.removed_edges)) st.sidebar.metric("Added Edges", len(st.session_state.added_edges)) # create network net = create_pyvis_network( graph, table_map, tree_structure, join_lookup, show_tree_edges=show_tree_edges, show_back_edges=show_back_edges, back_edges_list=back_edges, max_back_edges=max_back_edges, show_levels=selected_levels if selected_levels else None, selected_node=selected_node, tree_mode=tree_mode, paths=filtered_paths if path_source_oid and path_target_oid else None, path_colors=filtered_path_colors if path_source_oid and path_target_oid else None, path_source=path_source_oid if path_source_oid and path_target_oid else None, path_target=path_target_oid if path_source_oid and path_target_oid else None ) # save and display with tempfile.NamedTemporaryFile(delete=False, suffix='.html', mode='w') as tmp_file: net.save_graph(tmp_file.name) html_file = tmp_file.name with open(html_file, 'r', encoding='utf-8') as f: html_content = f.read() # modify physics settings directly in the HTML options # change physics.enabled from true to false after a delay import re # find and modify physics enabled in options def modify_physics_options(match): options_str = match.group(0) # replace "enabled": true with "enabled": false in physics section options_str = re.sub( r'("physics"\s*:\s*\{[^}]*)"enabled"\s*:\s*true', r'\1"enabled": false', options_str, flags=re.DOTALL ) return options_str # try to modify the options directly html_content = re.sub( r'"physics"\s*:\s*\{[^}]*"enabled"\s*:\s*true[^}]*\}', lambda m: m.group(0).replace('"enabled": true', '"enabled": false').replace('"enabled":True', '"enabled":false'), html_content ) # inject css to set Consolas font globally consolas_font_style = """ """ # inject javascript to reduce physics intensity after stabilization # keeps physics enabled for interactivity but reduces movement reduce_physics_script = """ """ # insert style in head and script before closing body tag if '' in html_content: html_content = html_content.replace('', '' + consolas_font_style) elif '' in html_content: html_content = html_content.replace('', consolas_font_style + '') else: html_content = consolas_font_style + html_content if '' in html_content: html_content = html_content.replace('', reduce_physics_script + '') else: html_content += reduce_physics_script st.components.v1.html(html_content, height=850, scrolling=True) # cleanup os.unlink(html_file) # info section with st.expander("â„šī¸ About this visualization"): st.markdown(""" **Graph Structure:** - All edges are displayed uniformly (same color and style) - All edges (tree edges and back edges) are always shown - **Tree Edges**: Parent-child relationships from BFS tree - **Back Edges**: Additional edges that create cycles **Edge Management:** - **Remove Edges**: Select and remove any edge from the graph - **Add Edges**: Manually add new edges between tables - Changes persist during the session - Use "Reset All Edge Changes" to restore original graph **Controls:** - Filter by BFS level to focus on specific hierarchy levels - Search and highlight specific tables **Path Finding:** - Select source and target tables to find all simple paths between them - Toggle "Show All Paths" to see all paths or just shortest paths - All paths are simple (no repeated nodes or edges) - Path list shows all found paths with hop counts - Paths automatically update when edges are removed or added **Interactivity:** - Click and drag nodes to rearrange - Hover over nodes/edges for details - Zoom with mouse wheel - Pan by dragging background """) except FileNotFoundError: st.error("❌ Model file not found. Please upload your .smodel file using the file uploader in the sidebar.") st.info("💡 **Tip**: In Hugging Face Spaces, you need to upload the file each time. Use the sidebar file uploader above.") except json.JSONDecodeError as e: st.error(f"❌ Error parsing JSON: {str(e)}") st.info("Please ensure you're uploading a valid .smodel or .json file.") except Exception as e: st.error(f"❌ Error loading data: {str(e)}") st.exception(e)