| import ast |
|
|
| from structlog import get_logger |
| import symtable |
| from typing import Union |
|
|
| from .anutils import ( |
| ExecuteInInnerScope, |
| Scope, |
| UnresolvedSuperCallError, |
| format_alias, |
| get_ast_node_name, |
| get_module_name, |
| resolve_method_resolution_order, |
| sanitize_exprs, |
| tail, |
| ) |
| from .node import Flavor, Node |
|
|
|
|
# Module-level structlog logger; CallGraphVisitor instances store it as self.logger.
logger = get_logger(__name__)
|
|
|
|
|
|
|
|
| class CallGraphVisitor(ast.NodeVisitor): |
| """A visitor that can be walked over a Python AST, and will derive |
| information about the objects in the AST and how they use each other. |
| |
| A single CallGraphVisitor object can be run over several ASTs (from a |
| set of source files). The resulting information is the aggregate from |
| all files. This way use information between objects in different files |
| can be gathered.""" |
|
|
def __init__(self, files:dict, root: str = None):
    """Initialize all analysis state, then run the complete analysis.

    files: dict keyed by filename; ``files[filename]`` is what
           process_one() later hands to ``symtable.symtable`` and
           ``ast.parse`` as the source text.
    root: forwarded to ``get_module_name`` as ``root`` when a file
          is processed.
    """
    self.logger = logger

    # Input bookkeeping. module_to_filename is the inverse lookup from a
    # module name to the filename that name was computed from.
    self.module_to_filename = {}
    for fname in files:
        self.module_to_filename[get_module_name(fname, files=files)] = fname
    self.filenames = files.keys()
    self.root = root
    self.files = files

    # Graph data, aggregated over all analyzed files.
    self.defines_edges = {}
    self.uses_edges = {}
    self.nodes = {}
    self.scopes = {}

    # Class inheritance data, filled in while visiting and between passes.
    self.class_base_ast_nodes = {}
    self.class_base_nodes = {}
    self.mro = {}

    # Transient state that is only meaningful while a file is being visited.
    self.module_name = None
    self.filename = None
    self.name_stack = []
    self.scope_stack = []
    self.class_stack = []
    self.context_stack = []
    self.last_value = None

    # Run the analysis right away.
    self.process()
|
|
def process(self):
    """Run both analysis passes over all files, then postprocess.

    Every file is processed twice so that forward references are picked
    up; base classes are resolved between the first and second pass.
    """

    def run_pass(pass_number):
        # Process every known file once, announcing each one in the log.
        for filename in self.filenames:
            self.logger.info("========== pass %d, file '%s' ==========" % (pass_number, filename))
            self.process_one(filename)

    run_pass(1)
    self.resolve_base_classes()
    run_pass(2)
    self.postprocess()
|
|
def process_one(self, filename):
    """Analyze a single source file that was registered at construction.

    Raises ValueError if ``filename`` is not one of ``self.filenames``.
    """
    known = self.filenames
    if filename not in known:
        message = "Filename '%s' has not been preprocessed (was not given to __init__, which got %s)" % (
            filename,
            known,
        )
        raise ValueError(message)

    # Mark which file/module is current for the duration of the visit.
    self.filename = filename
    self.module_name = get_module_name(filename, self.files, root=self.root)

    # Scope information must exist before the AST is walked.
    self.analyze_scopes(self.files[filename], filename)
    tree = ast.parse(self.files[filename], filename)
    self.visit(tree)

    # Clear the per-file markers again.
    self.module_name = None
    self.filename = None
|
|
def resolve_base_classes(self):
    """Turn the recorded base-class AST nodes into graph Nodes.

    Intended to run between pass 1 and pass 2 so that inherited methods
    can be picked up. Only bases written as ``ast.Name`` or
    ``ast.Attribute`` are resolved; anything else is ignored. Afterwards
    the method resolution order of every analyzed class is computed and
    stored in ``self.mro``.
    """
    self.logger.debug("Resolving base classes")
    assert len(self.scope_stack) == 0
    for class_node, base_ast_nodes in self.class_base_ast_nodes.items():
        resolved_bases = self.class_base_nodes[class_node] = []
        for base_ast in base_ast_nodes:
            # Look the base up in the scope stored under the class's own namespace.
            self.scope_stack.append(self.scopes[class_node.namespace])

            if isinstance(base_ast, ast.Name):
                candidate = self.get_value(base_ast.id)
            elif isinstance(base_ast, ast.Attribute):
                # Only the attribute half of the (obj, attr) result is of interest.
                _, candidate = self.get_attribute(base_ast)
            else:
                candidate = None

            self.scope_stack.pop()

            # Keep only bases that resolved to a Node with a known namespace.
            if isinstance(candidate, Node) and candidate.namespace is not None:
                resolved_bases.append(candidate)

    self.logger.debug("All base classes (non-recursive, local level only): %s" % self.class_base_nodes)

    self.logger.debug("Resolving method resolution order (MRO) for all analyzed classes")
    self.mro = resolve_method_resolution_order(self.class_base_nodes, self.logger)
    self.logger.debug("Method resolution order (MRO) for all analyzed classes: %s" % self.mro)
|
|
def postprocess(self):
    """Finalize the analysis by running the cleanup steps in fixed order."""
    finalization_steps = (
        self.expand_unknowns,
        self.resolve_imports,
        self.contract_nonexistents,
        self.cull_inherited,
        self.collapse_inner,
    )
    for step in finalization_steps:
        step()
|
|
| |
| |
|
|
| |
|
|
| |
| |
|
|
def resolve_imports(self):
    """
    resolve relative imports and remap nodes

    Builds a mapping from IMPORTEDITEM nodes (and from undefined ATTRIBUTE
    nodes whose namespace names such an imported item) to replacement
    nodes, then rebuilds ``self.nodes``, ``self.uses_edges`` and
    ``self.defines_edges`` with every mapped node substituted. Edge
    entries whose target set is empty are dropped during the rebuild.
    """
    # Work list: every node currently flagged as an imported item.
    imports_to_resolve = {n for items in self.nodes.values() for n in items if n.flavor == Flavor.IMPORTEDITEM}
    # import_mapping: node to be replaced -> node that replaces it.
    import_mapping = {}
    while len(imports_to_resolve) > 0:
        from_node = imports_to_resolve.pop()
        if from_node in import_mapping:
            # Already resolved on an earlier iteration.
            continue
        # With no recorded uses edge, the node is treated as pointing at itself.
        to_uses = self.uses_edges.get(from_node, set([from_node]))
        assert len(to_uses) == 1
        # NOTE(review): when from_node has an entry in self.uses_edges, to_uses is
        # that stored set, so pop() also removes the element from the stored edge
        # set (the now-empty entry is dropped by the rebuild at the end).
        to_node = to_uses.pop()
        # Find the module node: a node with an empty namespace is used as the
        # module itself; otherwise the module is looked up by the target's namespace.
        if to_node.namespace == "":
            module_node = to_node
        else:
            assert from_node.name == to_node.name
            module_node = self.get_node("", to_node.namespace)
        module_uses = self.uses_edges.get(module_node)
        if module_uses is not None:
            # Among the things the module uses, take the first whose name matches.
            for candidate_to_node in module_uses:
                if candidate_to_node.name == from_node.name:
                    to_node = candidate_to_node
                    import_mapping[from_node] = to_node
                    # The match may itself be an imported item: queue it for
                    # resolution too, unless it is from_node itself.
                    if to_node.flavor == Flavor.IMPORTEDITEM and from_node is not to_node:
                        imports_to_resolve.add(to_node)
                    break

    # Second stage: undefined attribute nodes whose namespace equals
    # "<namespace>.<name>" of a mapped imported item are mapped to the
    # same-named node defined by that item's replacement.
    attribute_import_mapping = {}
    for nodes in self.nodes.values():
        for node in nodes:
            if not node.defined and node.flavor == Flavor.ATTRIBUTE:
                for from_node, to_node in import_mapping.items():
                    if (
                        f"{from_node.namespace}.{from_node.name}" == node.namespace
                        and from_node.flavor == Flavor.IMPORTEDITEM
                    ):
                        # Replacements without defines edges offer no candidates.
                        if to_node not in self.defines_edges:
                            continue
                        for candidate_to_node in self.defines_edges[to_node]:
                            if candidate_to_node.name == node.name:
                                attribute_import_mapping[node] = candidate_to_node
                                break
    import_mapping.update(attribute_import_mapping)

    # Rebuild nodes and both edge maps with mapped nodes substituted;
    # unmapped nodes are kept as they are.
    self.nodes = {name: [import_mapping.get(n, n) for n in items] for name, items in self.nodes.items()}
    self.uses_edges = {
        import_mapping.get(from_node, from_node): {import_mapping.get(to_node, to_node) for to_node in to_nodes}
        for from_node, to_nodes in self.uses_edges.items()
        if len(to_nodes) > 0
    }
    self.defines_edges = {
        import_mapping.get(from_node, from_node): {import_mapping.get(to_node, to_node) for to_node in to_nodes}
        for from_node, to_nodes in self.defines_edges.items()
        if len(to_nodes) > 0
    }
|
|
def filter(self, node: Union[None, Node] = None, namespace: Union[str, None] = None, max_iter: int = 1000):
    """
    Restrict the call graph to the nodes related to `node` or found in `namespace`.

    Args:
        node: pyan node whose related nodes should be kept; if None, only
            the namespace is used for filtering
        namespace: namespace to search in (name of top level module),
            if None, it is determined from `node`
        max_iter: maximum number of iterations and nodes to iterate

    Returns:
        self
    """
    keep = self.get_related_nodes(node, namespace=namespace, max_iter=max_iter)

    def pruned(edges):
        # Keep only edges whose source and target both survive the filter.
        return {src: {dst for dst in targets if dst in keep} for src, targets in edges.items() if src in keep}

    self.nodes = {name: [member for member in members if member in keep] for name, members in self.nodes.items()}
    self.uses_edges = pruned(self.uses_edges)
    self.defines_edges = pruned(self.defines_edges)
    return self
|
|
def get_related_nodes(
    self, node: Union[None, Node] = None, namespace: Union[str, None] = None, max_iter: int = 1000
) -> set:
    """
    Get the nodes that are related to `node` or are in `namespace`.

    Args:
        node: pyan node for which related nodes should be found, if None, filter only for namespace
        namespace: namespace to search in (name of top level module),
            if None, determines namespace from `node`
        max_iter: maximum number of iterations and nodes to iterate

    Returns:
        set: set of nodes related to `node` including `node` itself
    """
    # No start node: pure namespace filter over all known nodes.
    if node is None:
        every_node = (n for items in self.nodes.values() for n in items)
        if namespace is None:
            return set(every_node)
        return {n for n in every_node if n.namespace is not None and namespace in n.namespace}

    # Default namespace: first dotted component of the start node's namespace.
    if namespace is None:
        namespace = node.namespace.strip(".").split(".", 1)[0]

    related = set()

    def should_follow(candidate, edges):
        # Follow only nodes that have outgoing edges of this kind, are not
        # yet collected and live in the requested namespace. Nodes whose
        # namespace is None cannot match any namespace; skipping them
        # avoids a TypeError from ``namespace in None``.
        return (
            candidate in edges
            and candidate not in related
            and candidate.namespace is not None
            and namespace in candidate.namespace
        )

    # Worklist search: collect a node, then queue its qualifying neighbours,
    # until the queue is empty or the max_iter budget is used up.
    queue = [node]
    budget = max_iter
    while queue:
        item = queue.pop()
        if item in related:
            continue
        related.add(item)
        budget -= 1
        if budget < 0:
            break
        for edges in (self.uses_edges, self.defines_edges):
            queue.extend([n for n in edges.get(item, []) if should_follow(n, edges)])

    return related
|
|
def visit_Module(self, node):
    """Visit a module: register its node, then walk its children in module scope."""
    self.logger.debug("Module %s, %s" % (self.module_name, self.filename))

    # Modules live in the top-level namespace "".
    module_node = self.get_node("", self.module_name, node, flavor=Flavor.MODULE)
    self.associate_node(module_node, node, filename=self.filename)

    # Enter the module's namespace, scope and context for the traversal.
    module_ns = self.module_name
    self.name_stack.append(module_ns)
    self.scope_stack.append(self.scopes[module_ns])
    self.context_stack.append("Module %s" % (module_ns))

    self.generic_visit(node)

    # Leave again, in reverse order of entry.
    for stack in (self.context_stack, self.scope_stack, self.name_stack):
        stack.pop()
    self.last_value = None

    # Record that the module node is defined (no specific target node).
    if self.add_defines_edge(module_node, None):
        self.logger.info("Def Module %s" % node)
|
|
def visit_ClassDef(self, node):
    """Visit a class definition.

    Adds a defines edge from the enclosing namespace to the class, binds
    the class name in the current scope, records the base-class AST nodes
    for later resolution, and visits bases and body inside the class scope.
    """
    self.logger.debug("ClassDef %s, %s:%s" % (node.name, self.filename, node.lineno))

    enclosing = self.get_node_of_current_namespace()
    class_node = self.get_node(enclosing.get_name(), node.name, node, flavor=Flavor.CLASS)
    if self.add_defines_edge(enclosing, class_node):
        self.logger.info("Def from %s to Class %s" % (enclosing, class_node))

    # Attach source information to the class node.
    self.associate_node(class_node, node, self.filename)

    # Bind the class name in the current (enclosing) scope.
    self.set_value(node.name, class_node)

    # Enter the class: class stack, namespace, scope and context.
    self.class_stack.append(class_node)
    self.name_stack.append(node.name)
    inner_ns = self.get_node_of_current_namespace().get_name()
    self.scope_stack.append(self.scopes[inner_ns])
    self.context_stack.append("ClassDef %s" % (node.name))

    # Remember the raw base expressions (resolved later by
    # resolve_base_classes) and visit each of them.
    recorded_bases = self.class_base_ast_nodes[class_node] = []
    for base in node.bases:
        recorded_bases.append(base)
        self.visit(base)

    for stmt in node.body:
        self.visit(stmt)

    # Leave the class again, in reverse order of entry.
    self.context_stack.pop()
    self.scope_stack.pop()
    self.name_stack.pop()
    self.class_stack.pop()
|
|
def visit_FunctionDef(self, node):
    """Visit a function or method definition.

    Adds a defines edge from the enclosing namespace to the function,
    binds the function name in the current scope, and then analyzes
    arguments and body inside the function's own scope. If no scope is
    known for the function's namespace, the inside of the function is
    skipped.
    """
    self.logger.debug("FunctionDef %s, %s:%s" % (node.name, self.filename, node.lineno))

    # Decorators are visited here; also yields the "self" name and the flavor.
    self_name, flavor = self.analyze_functiondef(node)

    enclosing = self.get_node_of_current_namespace()
    func_node = self.get_node(enclosing.get_name(), node.name, node, flavor=flavor)
    if self.add_defines_edge(enclosing, func_node):
        self.logger.info("Def from %s to Function %s" % (enclosing, func_node))

    # Attach source information and bind the name in the enclosing scope.
    self.associate_node(func_node, node, self.filename)
    self.set_value(node.name, func_node)

    # Enter the function's namespace; bail out if its scope is unknown.
    self.name_stack.append(node.name)
    inner_ns = self.get_node_of_current_namespace().get_name()
    if inner_ns not in self.scopes:
        self.name_stack.pop()
        return
    self.scope_stack.append(self.scopes[inner_ns])
    self.context_stack.append("FunctionDef %s" % (node.name))

    # Shadow the argument names inside the function scope.
    arguments = node.args
    self.generate_args_nodes(arguments, inner_ns)

    # Inside a method, the "self" name refers to the current class.
    if self_name is not None:
        class_node = self.get_current_class()
        self.scopes[inner_ns].defs[self_name] = class_node
        self.logger.info('Method def: setting self name "%s" to %s' % (self_name, class_node))

    # Bind arguments to their default values, then walk the body.
    self.analyze_arguments(arguments)
    for stmt in node.body:
        self.visit(stmt)

    # Leave the function again, in reverse order of entry.
    self.context_stack.pop()
    self.scope_stack.pop()
    self.name_stack.pop()
|
|
def visit_AsyncFunctionDef(self, node):
    """Handle ``async def`` exactly like a regular function definition."""
    self.visit_FunctionDef(node)
|
|
def visit_Lambda(self, node):
    """Visit a lambda: analyze its arguments and body inside a "lambda" inner scope."""
    self.logger.debug("Lambda, %s:%s" % (self.filename, node.lineno))
    with ExecuteInInnerScope(self, "lambda"):
        lambda_args = node.args
        # Inside the context manager the current namespace is the lambda's own.
        lambda_ns = self.get_node_of_current_namespace().get_name()
        self.generate_args_nodes(lambda_args, lambda_ns)
        self.analyze_arguments(lambda_args)
        self.visit(node.body)
|
|
def generate_args_nodes(self, ast_args, inner_ns):
    """Capture which names correspond to function args.

    In the function scope, set them to a nonsense Node,
    to prevent leakage of identifiers of matching name
    from the enclosing scope (due to the local value being None
    until we set it to this nonsense Node).

    All parameter kinds are covered: positional-only, regular positional,
    ``*args``, keyword-only and ``**kwargs``.

    ast_args: node.args from a FunctionDef or Lambda
    inner_ns: namespace of the function or lambda, for scope lookup
    """
    sc = self.scopes[inner_ns]

    # One shared placeholder node per function namespace.
    nonsense_node = self.get_node(inner_ns, "^^^argument^^^", None)

    # Every entry below is an ast.arg; the parameter name is its ``.arg``
    # attribute. This includes ``vararg`` and ``kwarg``, which are ast.arg
    # objects (or None), not plain strings.
    parameters = list(getattr(ast_args, "posonlyargs", []))  # absent before Python 3.8
    parameters.extend(ast_args.args)
    if ast_args.vararg is not None:
        parameters.append(ast_args.vararg)
    parameters.extend(ast_args.kwonlyargs)
    if ast_args.kwarg is not None:
        parameters.append(ast_args.kwarg)

    for parameter in parameters:
        sc.defs[parameter.arg] = nonsense_node
|
|
def analyze_arguments(self, ast_args):
    """Analyze an arguments node of the AST.

    Record bindings of args to the given default values, if present.

    Used for analyzing FunctionDefs and Lambdas.

    ast_args: node.args from a FunctionDef or Lambda
    """
    # ``defaults`` belongs to the *last* len(defaults) positional parameters,
    # counting positional-only parameters together with the regular ones.
    defaults = ast_args.defaults
    if defaults:
        positional = list(getattr(ast_args, "posonlyargs", [])) + list(ast_args.args)
        for tgt, val in zip(positional[-len(defaults):], defaults):
            self.analyze_binding(sanitize_exprs(tgt), sanitize_exprs(val))

    # ``kw_defaults`` is paired one-to-one with ``kwonlyargs``; a None entry
    # means that keyword-only parameter has no default value.
    for tgt, val in zip(ast_args.kwonlyargs, ast_args.kw_defaults):
        if val is not None:
            self.analyze_binding(sanitize_exprs(tgt), sanitize_exprs(val))
|
|
def visit_Import(self, node):
    """Visit ``import a, b as c``: analyze every imported module separately."""
    shown_names = [format_alias(x) for x in node.names]
    self.logger.debug("Import %s, %s:%s" % (shown_names, self.filename, node.lineno))

    for import_item in node.names:
        self.analyze_module_import(import_item, node)
|
|
def visit_ImportFrom(self, node):
    """Visit ``from X import a, b as c``.

    Relative imports are resolved against the current module name. Each
    imported name becomes either a MODULE node (when "<target>.<name>" is
    one of the analyzed modules) or an IMPORTEDITEM node, is bound under
    its alias (or own name) in the current scope, and receives a uses
    edge from the current namespace.
    """

    def shown_names():
        # Human-readable form of the imported names, for log messages.
        return [format_alias(x) for x in node.names]

    self.logger.debug(
        "ImportFrom: from %s import %s, %s:%s" % (node.module, shown_names(), self.filename, node.lineno)
    )

    from_node = self.get_node_of_current_namespace()

    if node.module is not None and node.level == 0:
        # Absolute import: the target is the named module itself.
        tgt_name = node.module
    else:
        # Relative import (or no module given): strip ``level`` trailing
        # components off the current module name to find the anchor.
        if node.module is None:
            self.logger.debug(
                "ImportFrom (original) from %s import %s, %s:%s"
                % ("." * node.level, shown_names(), self.filename, node.lineno)
            )
            suffix = ""
        else:
            self.logger.debug(
                "ImportFrom (original): from %s import %s, %s:%s"
                % (node.module, shown_names(), self.filename, node.lineno)
            )
            suffix = "." + node.module
        tgt_name = self.module_name.rsplit(".", node.level)[0] + suffix
        self.logger.debug(
            "ImportFrom (resolved): from %s import %s, %s:%s" % (tgt_name, shown_names(), self.filename, node.lineno)
        )

    # Link each imported name separately.
    for alias in node.names:
        qualified = tgt_name + "." + alias.name
        if qualified in self.module_to_filename:
            # The imported name is itself one of the analyzed modules.
            to_node = self.get_node("", qualified, node, flavor=Flavor.MODULE)
        else:
            to_node = self.get_node(tgt_name, alias.name, node, flavor=Flavor.IMPORTEDITEM)

        # Bind under the alias when one is given, else under the original name.
        alias_name = alias.name if alias.asname is None else alias.asname
        self.set_value(alias_name, to_node)
        self.logger.info("From setting name %s to %s" % (alias_name, to_node))

        self.logger.debug("Use from %s to ImportFrom %s" % (from_node, to_node))
        if self.add_uses_edge(from_node, to_node):
            self.logger.info("New edge added for Use from %s to ImportFrom %s" % (from_node, to_node))
|
|
def analyze_module_import(self, import_item, ast_node):
    """Analyze one imported-module item of an Import AST node.

    Adds a uses edge from the current namespace to the module's node and
    binds the module under its alias (or its own name) in the current scope.

    import_item: an item of ast_node.names
    ast_node: for recording source location information
    """
    from_node = self.get_node_of_current_namespace()

    # Modules live in the top-level namespace "".
    mod_node = self.get_node("", import_item.name, ast_node, flavor=Flavor.MODULE)

    # Prefer the "as" alias; otherwise use the module node's own name.
    alias_name = mod_node.name if import_item.asname is None else import_item.asname

    self.add_uses_edge(from_node, mod_node)
    self.logger.info("New edge added for Use import %s in %s" % (mod_node, from_node))
    self.set_value(alias_name, mod_node)
    self.logger.info("From setting name %s to %s" % (alias_name, mod_node))
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
def visit_Constant(self, node):
    """Visit a constant literal.

    Sets ``self.last_value`` to an ATTRIBUTE node in the current namespace
    that is named after the Python type of the literal's value.
    """
    self.logger.debug("Constant %s, %s:%s" % (node.value, self.filename, node.lineno))
    type_name = type(node.value).__name__
    current_ns = self.get_node_of_current_namespace().get_name()
    self.last_value = self.get_node(current_ns, type_name, node, flavor=Flavor.ATTRIBUTE)
|
|
| |
def visit_Attribute(self, node):
    """Visit an attribute access ``obj.attr``.

    In a store context, the attribute is set to ``self.last_value``.
    In a load context, a uses edge is added from the current namespace to
    the attribute when it (or at least its object) can be resolved;
    otherwise the object expression is visited instead. Attribute
    accesses on an unresolved ``super()`` are silently ignored.
    """
    objname = get_ast_node_name(node.value)
    self.logger.debug(
        "Attribute %s of %s in context %s, %s:%s" % (node.attr, objname, type(node.ctx), self.filename, node.lineno)
    )

    if isinstance(node.ctx, ast.Store):
        # self.last_value holds the value being assigned at this point.
        new_value = self.last_value
        try:
            if self.set_attribute(node, new_value):
                self.logger.info("setattr %s on %s to %s" % (node.attr, objname, new_value))
        except UnresolvedSuperCallError:
            # The target belongs to an unresolved super(); ignore this setattr.
            return

    elif isinstance(node.ctx, ast.Load):
        try:
            obj_node, attr_node = self.get_attribute(node)
        except UnresolvedSuperCallError:
            # The source belongs to an unresolved super(); ignore this getattr.
            return

        # Both the object and the attribute are known.
        if isinstance(attr_node, Node):
            self.logger.info("getattr %s on %s returns %s" % (node.attr, objname, attr_node))

            from_node = self.get_node_of_current_namespace()
            self.logger.debug("Use from %s to %s" % (from_node, attr_node))
            if self.add_uses_edge(from_node, attr_node):
                self.logger.info("New edge added for Use from %s to %s" % (from_node, attr_node))

            # The attribute is resolved now, so drop the matching wildcard use.
            if attr_node.namespace is not None:
                self.remove_wild(from_node, attr_node, node.attr)

            self.last_value = attr_node

        # The object is known but the attribute is not: create an ATTRIBUTE
        # node in the object's namespace and add a uses edge to it.
        elif isinstance(obj_node, Node) and obj_node.namespace is not None:
            tgt_name = node.attr
            from_node = self.get_node_of_current_namespace()
            ns = obj_node.get_name()
            to_node = self.get_node(ns, tgt_name, node, flavor=Flavor.ATTRIBUTE)
            self.logger.debug(
                f"Use from {from_node} to {to_node} (target obj {obj_node} known but target attr "
                f"{node.attr} not resolved; maybe fwd ref or unanalyzed import)"
            )
            if self.add_uses_edge(from_node, to_node):
                # Both fragments must be f-strings, otherwise the node
                # placeholders in the first fragment are logged literally.
                self.logger.info(
                    f"New edge added for Use from {from_node} to {to_node} (target obj {obj_node} known but "
                    f"target attr {node.attr} not resolved; maybe fwd ref or unanalyzed import)"
                )

            self.remove_wild(from_node, obj_node, node.attr)

            self.last_value = to_node

        # Nothing could be resolved: just visit the object expression.
        else:
            self.visit(node.value)
|
|
| |
def visit_Name(self, node):
    """Visit a bare name.

    In a store context, the name is bound to ``self.last_value``.
    In a load context, a uses edge is added from the current namespace to
    whatever the name points to (unless that is the current class itself),
    and ``self.last_value`` is set to that target.
    """
    self.logger.debug("Name %s in context %s, %s:%s" % (node.id, type(node.ctx), self.filename, node.lineno))

    if isinstance(node.ctx, ast.Store):
        # self.last_value holds the value being assigned at this point.
        self.set_value(node.id, self.last_value)
        return

    if not isinstance(node.ctx, ast.Load):
        return

    tgt_name = node.id
    to_node = self.get_value(tgt_name)
    current_class = self.get_current_class()
    refers_to_current_class = current_class is not None and to_node is current_class
    if not refers_to_current_class:
        if not isinstance(to_node, Node):
            # Unknown name: create a placeholder with an unknown (None) namespace.
            to_node = self.get_node(None, tgt_name, node, flavor=Flavor.UNKNOWN)

        from_node = self.get_node_of_current_namespace()
        self.logger.debug("Use from %s to Name %s" % (from_node, to_node))
        if self.add_uses_edge(from_node, to_node):
            self.logger.info("New edge added for Use from %s to Name %s" % (from_node, to_node))

    self.last_value = to_node
|
|
def visit_Assign(self, node):
    """Visit an assignment; chained forms ``a = b = value`` bind each target group in turn."""
    target_groups = node.targets
    if len(target_groups) > 1:
        self.logger.debug("Assign (chained with %d outputs)" % (len(target_groups)))

    # The right-hand side is the same for every group of targets.
    values = sanitize_exprs(node.value)
    for group in target_groups:
        targets = sanitize_exprs(group)
        target_names = [get_ast_node_name(x) for x in targets]
        value_names = [get_ast_node_name(x) for x in values]
        self.logger.debug("Assign %s %s, %s:%s" % (target_names, value_names, self.filename, node.lineno))
        self.analyze_binding(targets, values)
|
|
def visit_AnnAssign(self, node):
    """Visit an annotated assignment ``x: T = value`` or bare declaration ``x: T``.

    With a value, it is handled as a normal binding; without one, the
    target is merely visited with ``self.last_value`` set to None.
    """
    target = sanitize_exprs(node.target)
    self.last_value = None

    if node.value is None:
        # Just a declaration, nothing is bound.
        self.logger.debug(
            "AnnAssign %s <no value>, %s:%s" % (get_ast_node_name(target[0]), self.filename, node.lineno)
        )
        self.last_value = None
        self.visit(target[0])
        return

    value = sanitize_exprs(node.value)
    # The whole sanitized value is passed to get_ast_node_name (not value[0]).
    self.logger.debug(
        "AnnAssign %s %s, %s:%s"
        % (get_ast_node_name(target[0]), get_ast_node_name(value), self.filename, node.lineno)
    )
    self.analyze_binding(target, value)
| |
| |
|
|
def visit_AugAssign(self, node):
    """Visit an augmented assignment (``x += value`` and the like) as a plain binding."""
    targets = sanitize_exprs(node.target)
    values = sanitize_exprs(node.value)

    target_names = [get_ast_node_name(x) for x in targets]
    value_names = [get_ast_node_name(x) for x in values]
    self.logger.debug(
        "AugAssign %s %s %s, %s:%s" % (target_names, type(node.op), value_names, self.filename, node.lineno)
    )

    # The operator is only logged; the binding itself is analyzed as usual.
    self.analyze_binding(targets, values)
|
|
| |
| |
| |
| |
| |
| |
| |
def visit_For(self, node):
    """Visit a for-loop: bind the loop target to the iterable, then walk body and else-branch."""
    self.logger.debug("For-loop, %s:%s" % (self.filename, node.lineno))

    # The loop target is bound from the iterated expression.
    self.analyze_binding(sanitize_exprs(node.target), sanitize_exprs(node.iter))

    for statements in (node.body, node.orelse):
        for stmt in statements:
            self.visit(stmt)
|
|
def visit_AsyncFor(self, node):
    """Handle ``async for`` exactly like a regular for-loop."""
    self.visit_For(node)
|
|
def visit_ListComp(self, node):
    """Visit a list comprehension via analyze_comprehension, labelled "listcomp"."""
    location = (self.filename, node.lineno)
    self.logger.debug("ListComp, %s:%s" % location)
    self.analyze_comprehension(node, "listcomp")
|
|
def visit_SetComp(self, node):
    """Visit a set comprehension via analyze_comprehension, labelled "setcomp"."""
    location = (self.filename, node.lineno)
    self.logger.debug("SetComp, %s:%s" % location)
    self.analyze_comprehension(node, "setcomp")
|
|
def visit_DictComp(self, node):
    """Visit a dict comprehension; its result expressions are ``key`` and ``value``."""
    location = (self.filename, node.lineno)
    self.logger.debug("DictComp, %s:%s" % location)
    self.analyze_comprehension(node, "dictcomp", field1="key", field2="value")
|
|
def visit_GeneratorExp(self, node):
    """Visit a generator expression via analyze_comprehension, labelled "genexpr"."""
    location = (self.filename, node.lineno)
    self.logger.debug("GeneratorExp, %s:%s" % location)
    self.analyze_comprehension(node, "genexpr")
|
|
def analyze_comprehension(self, node, label, field1="elt", field2=None):
    """Analyze a comprehension or generator expression.

    node: the comprehension AST node; its ``generators`` list is read.
    label: scope name passed to ExecuteInInnerScope.
    field1: name of the attribute of ``node`` holding the result expression.
    field2: optional name of a second result attribute (used by
            visit_DictComp, which passes "key" and "value").
    """
    gens = node.generators
    outermost = gens[0]
    moregens = gens[1:] if len(gens) > 1 else []

    outermost_iters = sanitize_exprs(outermost.iter)
    outermost_targets = sanitize_exprs(outermost.target)
    # The iterable of the first generator is visited before any inner scope is entered.
    for expr in outermost_iters:
        self.visit(expr)
    # NOTE(review): the visit_ListComp / visit_SetComp / visit_DictComp /
    # visit_GeneratorExp callers all pass a label that is in this set, so the
    # guarded block does not run for them. The nesting of the statements
    # below was reconstructed from a copy with stripped indentation — confirm
    # how much of the remaining code is meant to sit under this guard.
    if label not in {"lambda", "listcomp", "setcomp", "dictcomp", "genexpr"}:
        with ExecuteInInnerScope(self, label):
            # Targets of the first generator are visited while self.last_value
            # still holds whatever visiting the iterable left behind.
            for expr in outermost_targets:
                self.visit(expr)
            self.last_value = None
            for expr in outermost.ifs:
                self.visit(expr)

            # Remaining generators are handled as ordinary bindings.
            for gen in moregens:
                targets = sanitize_exprs(gen.target)
                values = sanitize_exprs(gen.iter)
                self.analyze_binding(targets, values)
                for expr in gen.ifs:
                    self.visit(expr)

            # Finally visit the result expression(s), e.g. node.elt.
            self.visit(getattr(node, field1))
            if field2:
                self.visit(getattr(node, field2))
|
|
def visit_Call(self, node):
    """Visit a call expression.

    Arguments are visited first. If the call is a built-in that
    resolve_builtins() can resolve, a uses edge to the result is added;
    otherwise the callee expression is visited, and if it turns out to be
    a known class, a uses edge to that class's ``__init__`` is added.
    """
    self.logger.debug("Call %s, %s:%s" % (get_ast_node_name(node.func), self.filename, node.lineno))

    # Positional arguments first, then the values of keyword arguments.
    for positional in node.args:
        self.visit(positional)
    for keyword in node.keywords:
        self.visit(keyword.value)

    # An unresolved super() counts as "not resolved".
    try:
        result_node = self.resolve_builtins(node)
    except UnresolvedSuperCallError:
        result_node = None

    if isinstance(result_node, Node):
        self.last_value = result_node

        from_node = self.get_node_of_current_namespace()
        self.logger.debug("Use from %s to %s (via resolved call to built-ins)" % (from_node, result_node))
        if self.add_uses_edge(from_node, result_node):
            self.logger.info(
                "New edge added for Use from %s to %s (via resolved call to built-ins)" % (from_node, result_node)
            )
        return

    # Generic call: visiting the callee last leaves it in self.last_value.
    self.visit(node.func)

    # Calling a known class creates an instance, i.e. uses its __init__.
    if self.last_value in self.class_base_ast_nodes:
        from_node = self.get_node_of_current_namespace()
        class_node = self.last_value
        init_node = self.get_node(class_node.get_name(), "__init__", None, flavor=Flavor.METHOD)
        self.logger.debug("Use from %s to %s (call creates an instance)" % (from_node, init_node))
        if self.add_uses_edge(from_node, init_node):
            self.logger.info(
                "New edge added for Use from %s to %s (call creates an instance)" % (from_node, init_node)
            )
|
|
def visit_With(self, node):
    """Visit a ``with`` statement.

    For every context manager expression that resolves to a Node, uses
    edges to its ``__enter__`` and ``__exit__`` methods are added. An
    ``as`` target that is a plain name is bound to the context
    expression; any other target is merely visited. Finally the body is
    walked.
    """
    self.logger.debug("With (context manager), %s:%s" % (self.filename, node.lineno))

    def record_enter_exit_uses(graph_node):
        # Add uses edges to __enter__/__exit__ of graph_node, if it is a Node.
        if not isinstance(graph_node, Node):
            return
        from_node = self.get_node_of_current_namespace()
        self.logger.debug("Use from %s to With %s" % (from_node, graph_node))
        for methodname in ("__enter__", "__exit__"):
            to_node = self.get_node(graph_node.get_name(), methodname, None, flavor=Flavor.METHOD)
            if self.add_uses_edge(from_node, to_node):
                self.logger.info("New edge added for Use from %s to %s" % (from_node, to_node))

    for withitem in node.items:
        context_expr = withitem.context_expr
        bound_target = withitem.optional_vars

        # Visit the context expression on its own to find out what it is;
        # analyze_binding() below receives the same expression again.
        self.last_value = None
        self.visit(context_expr)
        record_enter_exit_uses(self.last_value)
        self.last_value = None

        if bound_target is None:
            continue
        if isinstance(bound_target, ast.Name):
            self.analyze_binding(sanitize_exprs(bound_target), sanitize_exprs(context_expr))
        else:
            self.visit(bound_target)

    for stmt in node.body:
        self.visit(stmt)
|
|
| |
| |
|
|
def analyze_functiondef(self, ast_node):
    """Analyze a function definition.

    Visit decorators, and if this is a method definition, capture the name
    of the first positional argument to denote "self", like Python does.
    Positional-only parameters count: in ``def m(self, /, x)`` the name
    is ``self``.

    Return (self_name, flavor), where self_name the name representing self,
    or None if not applicable; and flavor is a Flavor, specifically one of
    FUNCTION, METHOD, STATICMETHOD or CLASSMETHOD.

    Raises TypeError if ast_node is not a (possibly async) function definition.
    """
    if not isinstance(ast_node, (ast.AsyncFunctionDef, ast.FunctionDef)):
        raise TypeError("Expected ast.FunctionDef; got %s" % (type(ast_node)))

    # Visit the decorators, collecting the names of those that resolve to a
    # Node. Visiting communicates its result through self.last_value.
    self.last_value = None
    deco_names = []
    for deco in ast_node.decorator_list:
        self.visit(deco)
        deco_node = self.last_value
        if isinstance(deco_node, Node):
            deco_names.append(deco_node.name)
        self.last_value = None

    # Only functions defined directly in a class body are methods; the
    # innermost context string tells what encloses this definition.
    if not self.context_stack[-1].startswith("ClassDef"):
        flavor = Flavor.FUNCTION
    elif "staticmethod" in deco_names:
        flavor = Flavor.STATICMETHOD
    elif "classmethod" in deco_names:
        flavor = Flavor.CLASSMETHOD
    else:
        flavor = Flavor.METHOD

    # Instance and class methods receive the object/class as their first
    # positional parameter. Positional-only parameters come before the
    # regular ones, so they must be considered first.
    if flavor in (Flavor.METHOD, Flavor.CLASSMETHOD):
        all_args = ast_node.args
        positional = list(getattr(all_args, "posonlyargs", [])) + list(all_args.args)
        if positional:
            return positional[0].arg, flavor

    return None, flavor
|
|
def analyze_binding(self, targets, values):
    """Generic handler for binding forms. Inputs must be sanitize_exprs()d.

    Visiting a value leaves its result in ``self.last_value``; visiting a
    target consumes ``self.last_value``. When there are as many targets as
    values, they are paired up one-to-one. Otherwise all values are visited
    first and every target then sees whatever the last value left behind.
    ``self.last_value`` is None afterwards in both cases.
    """
    self.last_value = None

    if len(targets) != len(values):
        # Mismatched counts: no pairing is attempted.
        for value in values:
            self.visit(value)
        for tgt in targets:
            self.visit(tgt)
        self.last_value = None
        return

    # Evaluate every right-hand side first, capturing each result ...
    captured = []
    for value in values:
        self.visit(value)
        captured.append(self.last_value)
        self.last_value = None

    # ... then hand each captured result to its matching target.
    for tgt, result in zip(targets, captured):
        self.last_value = result
        self.visit(tgt)
    self.last_value = None
|
|
def resolve_builtins(self, ast_node):
    """Resolve calls to those built-in functions whose result is easy to determine.

    Currently supported:

      - str(obj), repr(obj) --> obj.__str__, obj.__repr__

      - super() (any arguments ignored), which works only in pass 2,
        because the MRO is determined between passes.

    Raises UnresolvedSuperCallError if the call is to super() but the
    result cannot (currently) be determined, and TypeError if ast_node is
    not an ast.Call.

    Returns the Node the call resolves to, or None if not determined.
    """
    if not isinstance(ast_node, ast.Call):
        raise TypeError("Expected ast.Call; got %s" % (type(ast_node)))

    callee = ast_node.func
    if not isinstance(callee, ast.Name):
        # Only calls through a bare name can be built-ins handled here.
        return None
    funcname = callee.id

    if funcname == "super":
        class_node = self.get_current_class()
        self.logger.debug("Resolving super() of %s" % (class_node))
        if class_node not in self.mro:
            msg = "super called for %s, but MRO not determined for it (maybe still in pass 1?)" % (class_node)
            self.logger.info(msg)
            raise UnresolvedSuperCallError(msg)

        linearization = self.mro[class_node]
        if len(linearization) <= 1:
            msg = "super called for %s, but no known bases" % (class_node)
            self.logger.info(msg)
            raise UnresolvedSuperCallError(msg)

        # The entry right after the first one in the MRO is taken as super().
        result = linearization[1]
        self.logger.debug("super of %s is %s" % (class_node, result))
        return result

    if funcname not in ("str", "repr") or len(ast_node.args) != 1:
        return None

    obj_astnode = ast_node.args[0]
    if not isinstance(obj_astnode, (ast.Name, ast.Attribute)):
        return None

    self.logger.debug("Resolving %s() of %s" % (funcname, get_ast_node_name(obj_astnode)))
    attrname = "__%s__" % (funcname)
    # Build a temporary obj.__str__ / obj.__repr__ attribute node and resolve that.
    tmp_astnode = ast.Attribute(value=obj_astnode, attr=attrname, ctx=obj_astnode.ctx)
    obj_node, attr_node = self.get_attribute(tmp_astnode)
    self.logger.debug(
        "Resolve %s() of %s: returning attr node %s" % (funcname, get_ast_node_name(obj_astnode), attr_node)
    )
    return attr_node
|
|
| |
|
|
def resolve_attribute(self, ast_node):
    """Resolve an ast.Attribute.

    Nested attributes (a.b.c) are automatically handled by recursion.

    Return (obj,attrname), where obj is a Node (or None on lookup failure),
    and attrname is the attribute name.

    May pass through UnresolvedSuperCallError, if the attribute resolution
    failed specifically due to an unresolved super() call.

    Raises TypeError if ast_node is not an ast.Attribute.
    """
    if not isinstance(ast_node, ast.Attribute):
        raise TypeError("Expected ast.Attribute; got %s" % (type(ast_node)))

    self.logger.debug(
        "Resolve %s.%s in context %s" % (get_ast_node_name(ast_node.value), ast_node.attr, type(ast_node.ctx))
    )

    value = ast_node.value

    # Nested attribute: resolve the inner part (a.b of a.b.c) first.
    if isinstance(value, ast.Attribute):
        obj_node, attr_name = self.resolve_attribute(value)

        if isinstance(obj_node, Node) and obj_node.namespace is not None:
            ns = obj_node.get_name()
            # Only namespaces with known scope information can be looked up.
            if ns in self.scopes:
                sc = self.scopes[ns]
                if attr_name in sc.defs:
                    self.logger.debug("Resolved to attr %s of %s" % (ast_node.attr, sc.defs[attr_name]))
                    return sc.defs[attr_name], ast_node.attr

        self.logger.debug("Unresolved, returning attr %s of unknown" % (ast_node.attr))
        return None, ast_node.attr

    # Attribute of a numeric or string literal, e.g. "".join. Literals are
    # parsed as ast.Constant; the deprecated ast.Num / ast.Str aliases are
    # avoided because they are scheduled for removal from the ast module.
    # bool is excluded explicitly: it is an int subclass, but ast.Num never
    # matched it.
    if (
        isinstance(value, ast.Constant)
        and isinstance(value.value, (int, float, complex, str))
        and not isinstance(value.value, bool)
    ):
        # The node is named after the AST node class of the literal.
        tn = type(value).__name__
        obj_node = self.get_node("", tn, None, flavor=Flavor.CLASS)

    # Attribute of a call result, e.g. super().method.
    elif isinstance(value, ast.Call):
        # An unresolved super() is signalled by an exception from
        # resolve_builtins(), which is deliberately passed through.
        obj_node = self.resolve_builtins(value)

        # The result of a general function call cannot be resolved.
        if not isinstance(obj_node, Node):
            self.logger.debug("Unresolved function call as obj, returning attr %s of unknown" % (ast_node.attr))
            return None, ast_node.attr

    # Anything else: look the object up by name in the current scope.
    else:
        obj_node = self.get_value(get_ast_node_name(value))

    self.logger.debug("Resolved to attr %s of %s" % (ast_node.attr, obj_node))
    return obj_node, ast_node.attr
|
|
| |
| |
|
|
def analyze_scopes(self, code, filename):
    """Gather lexical scope information.

    Build the symbol table of ``code`` and wrap every table in it (the
    top-level table and all of its descendants) in a Scope, keyed by a
    dotted namespace that starts at ``self.module_name``. A scope whose
    name is empty is keyed by its parent's namespace.

    The result is merged into ``self.scopes``: namespaces not seen before
    are added as-is; for a namespace that already exists, only the names
    missing from the existing scope's ``defs`` are copied, so existing
    entries are never overwritten.
    """

    found = {}

    # Explicit-stack traversal: parent before children, children in the
    # order reported by the symbol table.
    pending = [(self.module_name, symtable.symtable(code, filename, compile_type="exec"))]
    while pending:
        parent_ns, table = pending.pop()
        scope = Scope(table)
        if len(scope.name):
            ns = "%s.%s" % (parent_ns, scope.name)
        else:
            ns = parent_ns
        found[ns] = scope
        # Reverse so that the first child is the next one popped.
        pending.extend((ns, child) for child in reversed(table.get_children()))

    for ns, scope in found.items():
        if ns in self.scopes:
            known_defs = self.scopes[ns].defs
            for name in scope.defs:
                if name not in known_defs:
                    known_defs[name] = scope.defs[name]
        else:
            self.scopes[ns] = scope

    self.logger.debug("Scopes now: %s" % (self.scopes))
|
|
def get_current_class(self):
    """Return the innermost entry of ``self.class_stack`` (the current class),
    or None when the stack is empty, i.e. not inside a class definition."""
    if not len(self.class_stack):
        return None
    return self.class_stack[-1]
|
|
def get_node_of_current_namespace(self):
    """Return the unique node representing the current namespace,
    based on self.name_stack.

    The node is obtained from get_node() with:
      - namespace = all but the last entry of self.name_stack, joined
                    with dots (empty string if at top level)
      - name      = the last entry of self.name_stack
      - no associated AST node, and flavor Flavor.NAMESPACE.

    self.name_stack must not be empty when this is called.
    """
    assert len(self.name_stack)

    enclosing = self.name_stack[:-1]
    innermost = self.name_stack[-1]
    return self.get_node(".".join(enclosing), innermost, None, flavor=Flavor.NAMESPACE)
|
|
| |
| |
|
|
def get_value(self, name):
    """Get the value of name in the current scope.

    Scopes on self.scope_stack are searched from innermost to outermost;
    the first one in which name is defined *and* bound to something other
    than None wins.

    Return the Node, or None if name has no Node value (name unknown,
    unset, or bound to something that is not a Node).
    """

    holder = next(
        (sc for sc in reversed(self.scope_stack) if name in sc.defs and sc.defs[name] is not None),
        None,
    )

    if holder is None:
        self.logger.debug("Get %s in %s: no Node value (or name not in scope)" % (name, self.scope_stack[-1]))
        return None

    value = holder.defs[name]
    if not isinstance(value, Node):
        # Unexpected content in the scope; report it and treat as "no value".
        self.logger.debug(
            "Get %s in %s, found in %s: value %s is not a Node" % (name, self.scope_stack[-1], holder, value)
        )
        return None

    self.logger.info("Get %s in %s, found in %s, value %s" % (name, self.scope_stack[-1], holder, value))
    return value
|
|
def set_value(self, name, value):
    """Set the value of name in the current scope. Value must be a Node.

    The innermost scope on self.scope_stack that defines name (whether or
    not it currently has a value) receives the assignment. If no scope
    defines name, or value is not a Node, nothing is stored and the
    situation is only logged.
    """

    target = next((sc for sc in reversed(self.scope_stack) if name in sc.defs), None)

    if target is None:
        self.logger.debug("Set: name %s not in scope" % (name))
        return

    if not isinstance(value, Node):
        # Only Nodes are stored; anything else is reported and dropped.
        self.logger.debug("Set %s in %s: value %s is not a Node" % (name, target, value))
        return

    target.defs[name] = value
    self.logger.info("Set %s in %s to %s" % (name, target, value))
|
|
| |
| |
|
|
def get_attribute(self, ast_node):
    """Get value of an ast.Attribute.

    Supports inherited attributes. If the obj's own namespace has no match
    for attr, the ancestors of obj are also tried, following the MRO stored
    in self.mro, until one of them matches or until all ancestors are
    exhausted.

    Return pair of Node objects (obj,attr), where each item can be None
    on lookup failure. (Object not known, or no Node value assigned
    to its attr.) When the attribute is found in an ancestor, that
    ancestor is returned as obj.

    Raise TypeError if ast_node is not an ast.Attribute, and ValueError
    if it is not in a load context.

    May pass through UnresolvedSuperCallError.
    """

    if not isinstance(ast_node, ast.Attribute):
        raise TypeError("Expected ast.Attribute; got %s" % (type(ast_node)))
    if not isinstance(ast_node.ctx, ast.Load):
        raise ValueError("Expected a load context, got %s" % (type(ast_node.ctx)))

    obj_node, attr_name = self.resolve_attribute(ast_node)

    # Unknown object, or an object without a known namespace: nothing to look up.
    if not (isinstance(obj_node, Node) and obj_node.namespace is not None):
        return obj_node, None

    ns = obj_node.get_name()

    # For these two special namespaces any attribute is accepted and an
    # attribute Node is created on demand.
    if ns in ("Num", "Str"):
        return obj_node, self.get_node(ns, attr_name, None, flavor=Flavor.ATTRIBUTE)

    def defined_in(namespace):
        """Return the value bound to attr_name in namespace, or None."""
        scope = self.scopes.get(namespace)
        if scope is not None and attr_name in scope.defs:
            return scope.defs[attr_name]
        return None

    # First the object's own namespace ...
    own_value = defined_in(ns)
    if own_value is not None:
        return obj_node, own_value

    # ... then each ancestor, if an MRO is known for the object.
    if obj_node not in self.mro:
        return obj_node, None

    for base_node in tail(self.mro[obj_node]):
        inherited_value = defined_in(base_node.get_name())
        if inherited_value is not None:
            return base_node, inherited_value
    return None, None
|
|
def set_attribute(self, ast_node, new_value):
    """Assign the Node provided as new_value into the attribute described
    by the AST node ast_node.

    Return True if assignment was done, False otherwise (new_value is not
    a Node, the object could not be resolved to a Node with a namespace,
    or the object's namespace has no entry in self.scopes).

    Raise TypeError if ast_node is not an ast.Attribute, and ValueError
    if it is not in a store context.

    May pass through UnresolvedSuperCallError.
    """

    if not isinstance(ast_node, ast.Attribute):
        raise TypeError("Expected ast.Attribute; got %s" % (type(ast_node)))
    if not isinstance(ast_node.ctx, ast.Store):
        raise ValueError("Expected a store context, got %s" % (type(ast_node.ctx)))

    if not isinstance(new_value, Node):
        return False

    obj_node, attr_name = self.resolve_attribute(ast_node)

    if not isinstance(obj_node, Node) or obj_node.namespace is None:
        return False

    ns = obj_node.get_name()
    if ns not in self.scopes:
        return False

    self.scopes[ns].defs[attr_name] = new_value
    return True
|
|
| |
| |
|
|
def get_node(self, namespace, name, ast_node=None, flavor=Flavor.UNSPECIFIED):
    """Return the unique node matching the namespace and name.
    Create a new node if one doesn't already exist.

    To associate the node with a syntax object in the analyzed source code,
    an AST node can be passed in. This only takes effect if a new Node
    is created.

    To associate an AST node to an existing graph node,
    see associate_node().

    Flavor describes the kind of object the node represents.
    See the node.Flavor enum for currently supported values.

    For existing nodes, flavor overwrites, if the given flavor is
    (strictly) more specific than the node's existing one.
    See node.Flavor.specificity().

    A new node is assigned the file registered for ``namespace`` in
    self.module_to_filename if there is one, and self.filename otherwise.

    !!!
    In CallGraphVisitor, always use get_node() to create nodes, because it
    also sets some important auxiliary information. Do not call the Node
    constructor directly.
    !!!
    """

    # Reuse an existing node with the same short name and namespace.
    for existing in self.nodes.get(name, ()):
        if existing.namespace != namespace:
            continue
        if Flavor.specificity(flavor) > Flavor.specificity(existing.flavor):
            existing.flavor = flavor
        return existing

    filename = self.module_to_filename.get(namespace, self.filename)
    created = Node(namespace, name, ast_node, filename, flavor)

    # self.nodes maps each short name to the list of nodes carrying it.
    self.nodes.setdefault(name, []).append(created)

    return created
|
|
def get_parent_node(self, graph_node):
    """Get the parent node of the given Node. (Used in postprocessing.)

    The parent is looked up (or created) through get_node() by splitting
    graph_node.namespace at its last dot; a namespace without dots is
    treated as a name in the top-level ("") namespace.
    """
    qualified = graph_node.namespace
    parent_ns, parent_name = qualified.rsplit(".", 1) if "." in qualified else ("", qualified)
    return self.get_node(parent_ns, parent_name, None)
|
|
def associate_node(self, graph_node, ast_node, filename=None):
    """Change the AST node (and optionally filename) mapping of a graph node.

    This is useful for generating annotated output with source filename
    and line number information.

    A graph Node sometimes has to be created before the definition it
    stands for has been analyzed (e.g. a function first seen in a
    FromImport), so that it can already serve as a correctly namespaced
    "uses" target. The line number lives in the AST node of the actual
    definition, which is only available later; this method lets the
    caller attach it once the definition is encountered.

    graph_node.ast_node is always replaced; graph_node.filename is
    replaced only when filename is not None.
    """
    graph_node.ast_node = ast_node
    if filename is None:
        return
    graph_node.filename = filename
|
|
def add_defines_edge(self, from_node, to_node):
    """Add a defines edge in the graph between two nodes.

    from_node is always marked as defined and gets an entry in
    self.defines_edges; to_node is marked as defined when the edge to it
    is added. to_node may be None, in which case only from_node is
    registered.

    Return True if from_node was newly registered or a new edge was
    added, False if nothing changed.
    """
    already_registered = from_node in self.defines_edges
    targets = self.defines_edges.setdefault(from_node, set())
    from_node.defined = True

    if to_node is None or to_node in targets:
        # No new edge; the only possible change was registering from_node.
        return not already_registered

    targets.add(to_node)
    to_node.defined = True
    return True
|
|
def add_uses_edge(self, from_node, to_node):
    """Add a uses edge in the graph between two nodes.

    Return False if the edge already existed, True if it was added.

    When the new target has a known namespace (to_node.namespace is not
    None), remove_wild() is invoked so that a matching wildcard edge
    from the same source can be dropped.
    """

    targets = self.uses_edges.setdefault(from_node, set())
    if to_node in targets:
        return False
    targets.add(to_node)

    if to_node.namespace is not None:
        self.remove_wild(from_node, to_node, to_node.name)

    return True
|
|
def remove_uses_edge(self, from_node, to_node):
    """Remove a uses edge from the graph. (Used in postprocessing.)

    Does nothing if from_node has no uses edges or the edge is absent.
    The entry for from_node is kept even if its edge set becomes empty.
    """

    targets = self.uses_edges.get(from_node)
    if targets is not None:
        targets.discard(to_node)
|
|
def remove_wild(self, from_node, to_node, name):
    """Remove uses edge from from_node to wildcard *.name.

    This needs both to_node and name because in case of a bound name
    (e.g. attribute lookup) the name field of the *target value* does not
    necessarily match the formal name in the wildcard.

    Used for cleaning up forward-references once resolved.
    This prevents spurious edges due to expand_unknowns().

    Nothing is removed when name is None, when from_node has no uses
    edges, when the full name of to_node contains the "^^^argument^^^"
    marker, or when to_node is from_node itself.
    """

    if name is None or from_node not in self.uses_edges:
        return

    # A target carrying the argument marker does not resolve the wildcard.
    if "^^^argument^^^" in to_node.get_name():
        return

    # A node referring to itself does not resolve the wildcard either.
    if to_node == from_node:
        return

    # Wildcards are the targets without a namespace.
    candidates = [
        target for target in self.uses_edges[from_node] if target.namespace is None and target.name == name
    ]
    assert len(candidates) < 2
    if not candidates:
        return

    wild_node = candidates[0]
    self.logger.info("Use from %s to %s resolves %s; removing wildcard" % (from_node, to_node, wild_node))
    self.remove_uses_edge(from_node, wild_node)
|
|
| |
| |
|
|
def contract_nonexistents(self):
    """For all use edges to non-existent (i.e. not defined nodes) X.name, replace with edge to *.name.

    The wildcard node *.name is obtained through get_node() with a None
    namespace and is marked as not defined. All replacement edges are
    added first; the original edges are removed afterwards.
    """

    # (source, undefined target, wildcard replacement)
    contractions = []
    for source, targets in self.uses_edges.items():
        for target in targets:
            if target.namespace is None or target.defined:
                continue
            wildcard = self.get_node(None, target.name, target.ast_node)
            wildcard.defined = False
            contractions.append((source, target, wildcard))
            self.logger.info("Contracting non-existent from %s to %s as %s" % (source, target, wildcard))

    for source, _, wildcard in contractions:
        self.add_uses_edge(source, wildcard)

    for source, target, _ in contractions:
        self.remove_uses_edge(source, target)
|
|
def expand_unknowns(self):
    """For each edge to an unknown node *.name, add edges to X.name for all possible Xs.

    This is done for defines edges first, then for uses edges. "All
    possible Xs" are the nodes in self.nodes[name] that have a namespace.
    The edges to the wildcard itself are not removed here.

    Also mark all unknown nodes as not defined (so that they won't be visualized).
    """

    def expansions(edges):
        """List (source, concrete node) pairs for every wildcard target in edges."""
        return [
            (source, candidate)
            for source, targets in edges.items()
            for target in targets
            if target.namespace is None
            for candidate in self.nodes[target.name]
            if candidate.namespace is not None
        ]

    # The pairs are collected up front, so the edge maps are not modified
    # while they are being iterated.
    for from_node, to_node in expansions(self.defines_edges):
        self.add_defines_edge(from_node, to_node)
        self.logger.info("Expanding unknowns: new defines edge from %s to %s" % (from_node, to_node))

    for from_node, to_node in expansions(self.uses_edges):
        self.add_uses_edge(from_node, to_node)
        self.logger.info("Expanding unknowns: new uses edge from %s to %s" % (from_node, to_node))

    for same_name_nodes in self.nodes.values():
        for node in same_name_nodes:
            if node.namespace is None:
                node.defined = False
|
|
def cull_inherited(self):
    """
    For each use edge from W to X.name, if W also has an edge to Y.name where
    Y is used by X, then remove the first edge.

    Here X and Y are the parent nodes (see get_parent_node()) of two
    same-named targets of W that live in different, known namespaces.
    Removals are collected first and applied after the scan.
    """

    to_remove = []
    for user, targets in self.uses_edges.items():
        for candidate in targets:
            inherited = False
            for other in targets:
                same_name_elsewhere = (
                    other.name == candidate.name
                    and candidate.namespace is not None
                    and other.namespace is not None
                    and other.namespace != candidate.namespace
                )
                if not same_name_elsewhere:
                    continue
                # Parents are looked up for every matching pair, without
                # stopping at the first hit.
                candidate_parent = self.get_parent_node(candidate)
                other_parent = self.get_parent_node(other)
                if other_parent in self.uses_edges.get(candidate_parent, ()):
                    inherited = True

            if inherited:
                to_remove.append((user, candidate))
                self.logger.info("Removing inherited edge from %s to %s" % (user, candidate))

    for from_node, to_node in to_remove:
        self.remove_uses_edge(from_node, to_node)
|
|
def collapse_inner(self):
    """Combine lambda and comprehension Nodes with their parent Nodes to reduce visual noise.
    Also mark those original nodes as undefined, so that they won't be visualized.

    Every outgoing uses edge of such an inner node is re-added with the
    node's parent (see get_parent_node()) as the source.
    """

    inner_names = ("lambda", "listcomp", "setcomp", "dictcomp", "genexpr")

    # Take a snapshot of the matching keys first: self.nodes may grow
    # while parents are being looked up.
    matching = [name for name in self.nodes if name in inner_names]

    for name in matching:
        for inner in self.nodes[name]:
            parent = self.get_parent_node(inner)
            if inner in self.uses_edges:
                for used in self.uses_edges[inner]:
                    self.logger.info("Collapsing inner from %s to %s, uses %s" % (inner, parent, used))
                    self.add_uses_edge(parent, used)
            inner.defined = False