import ast
from collections import Counter, defaultdict, deque

# Layout spacing (in pixels) between sibling nodes and between depth levels.
_X_SPACING = 300
_Y_SPACING = 150

# AST literal node type -> reported type label.
_CONTAINER_LABELS = ((ast.List, "list"), (ast.Dict, "dict"), (ast.Set, "set"))

# Constant value type -> reported type label. ``bool`` must be tested before
# ``int`` because ``bool`` is a subclass of ``int``.
_CONSTANT_LABELS = ((str, "str"), (bool, "bool"), (int, "int"), (float, "float"))


class CallCollector(ast.NodeVisitor):
    """Record calls to known functions found beneath a visited node.

    Only calls made through a bare name (``foo()``) whose name is in
    ``defined_funcs`` are recorded; attribute calls such as ``obj.foo()`` are
    ignored. Names are appended to ``self.calls`` in traversal order and
    repeated calls are kept.
    """

    def __init__(self, defined_funcs):
        self.calls = []
        self.defined_funcs = defined_funcs

    def visit_Call(self, node):
        if isinstance(node.func, ast.Name) and node.func.id in self.defined_funcs:
            self.calls.append(node.func.id)
        # Keep descending so calls nested in arguments are also collected.
        self.generic_visit(node)


def _infer_type_from_value(value_node):
    """Return a best-effort type label for an expression node, or ``"?"``."""
    if isinstance(value_node, ast.Call) and isinstance(value_node.func, ast.Attribute):
        attr = value_node.func.attr
        if attr in ("read_csv", "DataFrame"):
            return "pd.DataFrame"
        if attr == "array":
            return "np.ndarray"
        return "?"
    for node_type, label in _CONTAINER_LABELS:
        if isinstance(value_node, node_type):
            return label
    if isinstance(value_node, ast.Constant):
        for value_type, label in _CONSTANT_LABELS:
            if isinstance(value_node.value, value_type):
                return label
    return "?"


def _collect_returns(func_node):
    """Describe every returned expression of ``func_node`` as ``"expr: type"``.

    A returned name takes the type of the most recent assignment that
    precedes the ``return`` in source order. Resolving while iterating
    ``ast.walk`` directly would miss assignments nested in ``if``/``for``
    bodies, because the walk yields shallower statements first.

    Entries are emitted in ``ast.walk`` order so the shape of the result is
    unchanged for existing consumers. Bare ``return`` statements are skipped
    and tuple returns contribute one entry per element.
    """
    statements = [
        stmt
        for stmt in ast.walk(func_node)
        if isinstance(stmt, (ast.Assign, ast.Return))
    ]

    local_types = {}
    rendered = {}
    for stmt in sorted(statements, key=lambda s: (s.lineno, s.col_offset)):
        if isinstance(stmt, ast.Assign):
            inferred = _infer_type_from_value(stmt.value)
            for target in stmt.targets:
                if isinstance(target, ast.Name):
                    local_types[target.id] = inferred
        elif stmt.value is not None:
            if isinstance(stmt.value, ast.Tuple):
                values = stmt.value.elts
            else:
                values = [stmt.value]
            entries = []
            for value in values:
                label = ast.unparse(value)
                type_label = local_types.get(label, _infer_type_from_value(value))
                entries.append(f"{label}: {type_label}")
            rendered[id(stmt)] = entries

    returns = []
    for stmt in statements:
        returns.extend(rendered.get(id(stmt), ()))
    return returns


def parse_functions_from_files(file_dict):
    """Extract per-function metadata from a mapping of filename -> source.

    Args:
        file_dict: Mapping of file name to Python source text.

    Returns:
        Dict keyed by function name. Each value holds ``args`` (``"name:
        annotation"`` strings, ``?`` when unannotated), ``returns`` (see
        ``_collect_returns``), ``calls`` (names of called functions that are
        defined in any of the files), ``filename``, and the always-empty
        lists ``reads_state`` / ``writes_state``. When the same function name
        occurs more than once, the last definition encountered wins.

    Raises:
        SyntaxError: If any source text cannot be parsed.
    """
    # Parse each file once; both the name scan and the detail pass reuse it.
    function_nodes = [
        (fname, node)
        for fname, code in file_dict.items()
        for node in ast.walk(ast.parse(code))
        if isinstance(node, ast.FunctionDef)
    ]
    defined_funcs = {node.name for _, node in function_nodes}

    functions = {}
    for fname, node in function_nodes:
        args = []
        for arg in node.args.args:
            arg_type = ast.unparse(arg.annotation) if arg.annotation else "?"
            args.append(f"{arg.arg}: {arg_type}")

        collector = CallCollector(defined_funcs)
        collector.visit(node)

        functions[node.name] = {
            "args": args,
            "returns": _collect_returns(node),
            "calls": collector.calls,
            "filename": fname,
            # State tracking is not implemented; the keys are kept so the
            # result shape stays stable for consumers.
            "reads_state": [],
            "writes_state": [],
        }
    return functions


def get_reachable_functions(start, graph):
    """Return the set of nodes reachable from ``start`` (inclusive).

    ``graph`` maps a node to an iterable of its successors; nodes missing
    from the mapping are treated as having no successors.
    """
    visited = set()
    stack = [start]
    while stack:
        node = stack.pop()
        if node in visited:
            continue
        visited.add(node)
        stack.extend(graph.get(node, []))
    return visited


def get_backtrace_functions(target, graph):
    """Return the set of nodes from which ``target`` is reachable (inclusive).

    ``graph`` maps a caller to an iterable of its callees.
    """
    reverse_graph = {}
    for caller, callees in graph.items():
        for callee in callees:
            reverse_graph.setdefault(callee, []).append(caller)
    return get_reachable_functions(target, reverse_graph)


def build_nodes_and_edges(parsed, root_func, reachable, reverse=False, max_depth=10):
    """Lay out a call graph around ``root_func`` as node and edge dicts.

    Functions are discovered breadth-first from ``root_func``, following
    callees (or callers when ``reverse`` is true), restricted to names in
    ``reachable`` and to at most ``max_depth`` levels. Each level is placed
    on its own row; reverse traversal uses negative ``y`` values.

    Args:
        parsed: Mapping of function name to metadata with a ``"calls"`` list,
            as produced by ``parse_functions_from_files``.
        root_func: Name of the function to start from.
        reachable: Container of names allowed in the graph.
        reverse: Walk callers instead of callees.
        max_depth: Maximum number of levels to traverse from the root.

    Returns:
        ``(nodes, edges)``. Nodes are listed in discovery order. Each call
        from a laid-out function to another laid-out function yields one
        edge, labelled with its 1-based position among that caller's drawn
        calls and drawn thicker when the caller calls the target more than
        once. Names missing from ``parsed`` are treated as making no calls.
    """

    def calls_of(name):
        entry = parsed.get(name)
        return entry["calls"] if entry is not None else []

    slots_used = defaultdict(int)
    # Insertion order doubles as discovery order, which keeps the output
    # deterministic from run to run.
    positions = {}
    queue = deque([(root_func, 0)])

    while queue:
        current, depth = queue.popleft()
        if current in positions or depth > max_depth:
            continue

        level = -depth if reverse else depth
        positions[current] = {
            "x": slots_used[level] * _X_SPACING,
            "y": level * _Y_SPACING,
        }
        slots_used[level] += 1

        if reverse:
            neighbours = [
                caller
                for caller, meta in parsed.items()
                if current in meta["calls"] and caller in reachable
            ]
        else:
            neighbours = [
                callee for callee in calls_of(current) if callee in reachable
            ]
        queue.extend((name, depth + 1) for name in neighbours)

    nodes = [
        {
            "data": {"id": name, "label": name},
            "position": position,
            "classes": "main" if name == root_func else "",
        }
        for name, position in positions.items()
    ]

    edges = []
    for src in positions:
        call_sequence = calls_of(src)
        multiplicity = Counter(call_sequence)
        drawn_targets = (tgt for tgt in call_sequence if tgt in positions)
        for call_index, tgt in enumerate(drawn_targets, start=1):
            edges.append({
                "data": {
                    "source": src,
                    "target": tgt,
                    "label": str(call_index),
                },
                "style": {
                    "line-width": 4 if multiplicity[tgt] > 1 else 2,
                },
            })

    return nodes, edges