Spaces:
Sleeping
Sleeping
| import ast | |
| import base64 | |
| import dash | |
| from dash import html, dcc, dash_table | |
| import dash_cytoscape as cyto | |
| from dash.dependencies import Input, Output, State | |
| import io | |
| import json | |
| import urllib.parse | |
| app = dash.Dash(__name__, suppress_callback_exceptions=True) | |
| server = app.server | |
| def parse_functions_from_files(file_dict): | |
| functions = {} | |
| defined_funcs = set() | |
| for fname, code in file_dict.items(): | |
| tree = ast.parse(code) | |
| for node in ast.walk(tree): | |
| if isinstance(node, ast.FunctionDef): | |
| defined_funcs.add(node.name) | |
| for fname, code in file_dict.items(): | |
| tree = ast.parse(code) | |
| for node in ast.walk(tree): | |
| if isinstance(node, ast.FunctionDef): | |
| func_name = node.name | |
| args = [arg.arg for arg in node.args.args] | |
| returns = [] | |
| calls = [] | |
| reads_state = set() | |
| writes_state = set() | |
| for sub in ast.walk(node): | |
| # Function calls | |
| if isinstance(sub, ast.Call) and isinstance(sub.func, ast.Name): | |
| if sub.func.id in defined_funcs: | |
| calls.append(sub.func.id) | |
| # Return values | |
| elif isinstance(sub, ast.Return): | |
| if sub.value is None: | |
| continue | |
| elif isinstance(sub.value, ast.Tuple): | |
| returns.extend([ast.unparse(elt) for elt in sub.value.elts]) | |
| else: | |
| returns.append(ast.unparse(sub.value)) | |
| # Streamlit session_state interactions | |
| elif isinstance(sub, ast.Subscript): | |
| if ( | |
| isinstance(sub.value, ast.Attribute) | |
| and isinstance(sub.value.value, ast.Name) | |
| and sub.value.value.id == "st" | |
| and sub.value.attr == "session_state" | |
| ): | |
| key = None | |
| try: | |
| if isinstance(sub.slice, ast.Constant): # Python 3.9+ | |
| key = sub.slice.value | |
| elif isinstance(sub.slice, ast.Index) and isinstance(sub.slice.value, ast.Constant): # <3.9 | |
| key = sub.slice.value.value | |
| except Exception: | |
| pass | |
| if key: | |
| if isinstance(sub.ctx, ast.Store): | |
| writes_state.add(str(key)) | |
| else: | |
| reads_state.add(str(key)) | |
| functions[func_name] = { | |
| "args": args, | |
| "returns": returns, | |
| "calls": calls, | |
| "filename": fname, | |
| "reads_state": sorted(reads_state), | |
| "writes_state": sorted(writes_state), | |
| } | |
| return functions | |
| def get_reachable_functions(start, graph): | |
| visited = set() | |
| stack = [start] | |
| while stack: | |
| node = stack.pop() | |
| if node not in visited: | |
| visited.add(node) | |
| stack.extend(graph.get(node, [])) | |
| return visited | |
| app.layout = html.Div([ | |
| html.H2("Multi-File Function Dependency Graph (AST-based)"), | |
| html.Div([ | |
| dcc.Upload(id="upload", children=html.Button("Upload Python Files"), multiple=True, style={"marginRight": "10px"}), | |
| html.Div(id="main-function-ui", style={"marginRight": "10px", "width": "300px"}), | |
| html.A("Download Graph JSON", id="download-link", download="graph.json", href="", target="_blank") | |
| ], style={"display": "flex", "flexDirection": "row", "alignItems": "center", "gap": "10px"}), | |
| html.Div(id="file-name", style={"marginTop": "10px"}), | |
| cyto.Cytoscape( | |
| id="cytoscape-graph", | |
| layout={"name": "breadthfirst", "directed": True, "padding": 30, "spacingFactor": 1.5}, | |
| style={"width": "100%", "height": "600px"}, | |
| elements=[], | |
| stylesheet=[ | |
| {"selector": "node", "style": { | |
| "label": "data(label)", | |
| "text-wrap": "wrap", | |
| "text-max-width": 120, | |
| "text-valign": "center", | |
| "background-color": "#aed6f1" | |
| }}, | |
| {"selector": "edge", "style": { | |
| "curve-style": "bezier", | |
| "target-arrow-shape": "triangle", | |
| "arrow-scale": 2, | |
| "line-color": "#888", | |
| "target-arrow-color": "#888" | |
| }} | |
| ], | |
| userZoomingEnabled=True, | |
| userPanningEnabled=True, | |
| minZoom=0.2, | |
| maxZoom=2, | |
| wheelSensitivity=0.1 | |
| ), | |
| html.Hr(), | |
| html.H4("Function Input/Output Table"), | |
| dash_table.DataTable( | |
| id="function-table", | |
| columns=[], # dynamically injected | |
| style_table={"overflowX": "auto"}, | |
| style_cell={ | |
| "textAlign": "left", | |
| "whiteSpace": "normal", | |
| "wordBreak": "break-word", | |
| "maxWidth": 300 | |
| } | |
| ) | |
| ]) | |
| uploaded_files = {} | |
| def store_multi_upload(list_of_contents, list_of_names): | |
| if not list_of_contents or not list_of_names: | |
| return "", "" | |
| uploaded_files.clear() | |
| for content, name in zip(list_of_contents, list_of_names): | |
| _, content_string = content.split(",") | |
| uploaded_files[name] = base64.b64decode(content_string).decode("utf-8") | |
| parsed = parse_functions_from_files(uploaded_files) | |
| main_candidates = [fn for fn, f in parsed.items() if len(f["calls"]) > 0] | |
| options = [{"label": fn, "value": fn} for fn in main_candidates] | |
| dropdown = dcc.Dropdown(id="main-function", options=options, value=options[0]["value"] if options else None) | |
| return dropdown, f"Uploaded files: {', '.join(list_of_names)}" | |
| def update_multi_graph(main_func): | |
| if not uploaded_files or not main_func: | |
| return [], [], [], "" | |
| parsed = parse_functions_from_files(uploaded_files) | |
| graph = {k: v["calls"] for k, v in parsed.items()} | |
| reachable = get_reachable_functions(main_func, graph) | |
| nodes = [{"data": {"id": name, "label": name}} for name in reachable] | |
| edges = [] | |
| for src in reachable: | |
| for tgt in parsed[src]["calls"]: | |
| if tgt in reachable: | |
| edge = { | |
| "data": { | |
| "source": src, | |
| "target": tgt | |
| } | |
| } | |
| if parsed[src]["filename"] != parsed[tgt]["filename"]: | |
| edge["classes"] = "crossfile" | |
| edges.append(edge) | |
| raw_table_data = [{ | |
| "Function": fn, | |
| "Arguments": ", ".join(parsed[fn]["args"]), | |
| "Returns": ", ".join(parsed[fn]["returns"]), | |
| "Reads State": ", ".join(parsed[fn]["reads_state"]), | |
| "Writes State": ", ".join(parsed[fn]["writes_state"]), | |
| "File": parsed[fn]["filename"] | |
| } for fn in reachable] | |
| show_reads = any(row["Reads State"] for row in raw_table_data) | |
| show_writes = any(row["Writes State"] for row in raw_table_data) | |
| columns = [ | |
| {"name": "Function", "id": "Function"}, | |
| {"name": "Arguments", "id": "Arguments"}, | |
| {"name": "Returns", "id": "Returns"}, | |
| ] | |
| if show_reads: | |
| columns.append({"name": "Reads State", "id": "Reads State"}) | |
| if show_writes: | |
| columns.append({"name": "Writes State", "id": "Writes State"}) | |
| columns.append({"name": "File", "id": "File"}) | |
| json_data = json.dumps({"nodes": nodes, "edges": edges}, indent=2) | |
| href_data = "data:application/json;charset=utf-8," + urllib.parse.quote(json_data) | |
| return nodes + edges, raw_table_data, columns, href_data | |
| if __name__ == "__main__": | |
| app.run(host="0.0.0.0", port=7860) | |