|
|
import ast |
|
|
|
|
|
from structlog import get_logger |
|
|
import symtable |
|
|
from typing import Union |
|
|
|
|
|
from .anutils import ( |
|
|
ExecuteInInnerScope, |
|
|
Scope, |
|
|
UnresolvedSuperCallError, |
|
|
format_alias, |
|
|
get_ast_node_name, |
|
|
get_module_name, |
|
|
resolve_method_resolution_order, |
|
|
sanitize_exprs, |
|
|
tail, |
|
|
) |
|
|
from .node import Flavor, Node |
|
|
|
|
|
|
|
|
# Module-level structlog logger; CallGraphVisitor.__init__ stores it as self.logger.
logger = get_logger(__name__)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class CallGraphVisitor(ast.NodeVisitor): |
|
|
"""A visitor that can be walked over a Python AST, and will derive |
|
|
information about the objects in the AST and how they use each other. |
|
|
|
|
|
A single CallGraphVisitor object can be run over several ASTs (from a |
|
|
set of source files). The resulting information is the aggregate from |
|
|
all files. This way use information between objects in different files |
|
|
can be gathered.""" |
|
|
|
|
|
def __init__(self, files:dict, root: str = None):
    """Set up the analysis state and immediately analyze all given files.

    files: mapping whose keys are filenames; the value for a filename is
        what process_one() later hands to analyze_scopes() and ast.parse().
    root: forwarded to get_module_name() when a file is processed.
    """
    self.logger = logger

    # Module name (as computed by get_module_name) -> filename.
    module_index = {}
    for path in files:
        module_index[get_module_name(path, files=files)] = path
    self.module_to_filename = module_index

    self.filenames = files.keys()
    self.root = root
    self.files = files

    # Results gathered by the analysis.
    self.defines_edges = {}
    self.uses_edges = {}
    self.nodes = {}  # name -> list of Node objects
    self.scopes = {}  # namespace name -> scope object

    # Class hierarchy bookkeeping.
    self.class_base_ast_nodes = {}  # class Node -> list of base-class AST nodes
    self.class_base_nodes = {}  # class Node -> list of resolved base Nodes
    self.mro = {}  # filled in by resolve_base_classes()

    # Where the visitor currently is while walking a file.
    self.module_name = None
    self.filename = None
    self.name_stack = []
    self.scope_stack = []
    self.class_stack = []
    self.context_stack = []
    self.last_value = None

    # Run the whole analysis right away.
    self.process()
|
|
|
|
|
def process(self):
    """Run two analysis passes over every file, then post-process.

    Base classes are resolved once, right after the first pass has seen all
    files; the second pass picks up forward references.
    """
    for pass_number in (1, 2):
        for filename in self.filenames:
            self.logger.info("========== pass %d, file '%s' ==========" % (pass_number, filename))
            self.process_one(filename)
        if pass_number == 1:
            self.resolve_base_classes()
    self.postprocess()
|
|
|
|
|
def process_one(self, filename):
    """Analyze one of the files that were handed to __init__.

    Raises ValueError if `filename` is not among self.filenames.
    """
    known = self.filenames
    if not (filename in known):
        message = "Filename '%s' has not been preprocessed (was not given to __init__, which got %s)" % (
            filename,
            known,
        )
        raise ValueError(message)

    # Establish the per-file context for the duration of the walk.
    self.filename = filename
    self.module_name = get_module_name(filename, self.files, root=self.root)

    source = self.files[filename]
    self.analyze_scopes(source, filename)
    tree = ast.parse(source, filename)
    self.visit(tree)

    # Clear the per-file context again.
    self.module_name = None
    self.filename = None
|
|
|
|
|
def resolve_base_classes(self):
    """Turn the recorded base-class AST nodes into Nodes and compute the MRO.

    Meant to run between pass 1 and pass 2 (the scope stack must be empty).
    Only ast.Name and ast.Attribute bases are looked up; any other kind of
    base expression is ignored.
    """
    self.logger.debug("Resolving base classes")
    assert len(self.scope_stack) == 0
    for class_node, base_exprs in self.class_base_ast_nodes.items():
        resolved = self.class_base_nodes[class_node] = []
        for base_expr in base_exprs:
            # Look the base up from the scope registered under the class
            # node's namespace.
            self.scope_stack.append(self.scopes[class_node.namespace])

            if isinstance(base_expr, ast.Name):
                candidate = self.get_value(base_expr.id)
            elif isinstance(base_expr, ast.Attribute):
                _obj, candidate = self.get_attribute(base_expr)
            else:
                candidate = None

            self.scope_stack.pop()

            if not isinstance(candidate, Node):
                continue
            if candidate.namespace is None:
                continue
            resolved.append(candidate)

    self.logger.debug("All base classes (non-recursive, local level only): %s" % self.class_base_nodes)

    self.logger.debug("Resolving method resolution order (MRO) for all analyzed classes")
    self.mro = resolve_method_resolution_order(self.class_base_nodes, self.logger)
    self.logger.debug("Method resolution order (MRO) for all analyzed classes: %s" % self.mro)
|
|
|
|
|
def postprocess(self):
    """Run the finalization steps, in a fixed order, after both passes."""
    finalizers = (
        self.expand_unknowns,
        self.resolve_imports,
        self.contract_nonexistents,
        self.cull_inherited,
        self.collapse_inner,
    )
    for finalize in finalizers:
        finalize()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def resolve_imports(self):
    """
    resolve relative imports and remap nodes

    Builds a mapping from imported-item nodes (and from undefined attribute
    nodes hanging off them) to the nodes they should be replaced with, then
    rewrites self.nodes, self.uses_edges and self.defines_edges through that
    mapping. Edge entries whose target set is empty are dropped in the rewrite.
    """
    # Worklist: every node currently flagged as an imported item.
    imports_to_resolve = {n for items in self.nodes.values() for n in items if n.flavor == Flavor.IMPORTEDITEM}

    import_mapping = {}
    while len(imports_to_resolve) > 0:
        from_node = imports_to_resolve.pop()
        if from_node in import_mapping:
            continue
        # An imported item is expected to have exactly one outgoing uses edge;
        # if it has none recorded, it stands in for itself.
        to_uses = self.uses_edges.get(from_node, set([from_node]))
        assert len(to_uses) == 1
        # NOTE: when to_uses came from self.uses_edges, this pop() empties the
        # stored set in place; empty sets are filtered out when uses_edges is
        # rebuilt at the end of this method.
        to_node = to_uses.pop()

        # Find the node that represents the module the target lives in.
        if to_node.namespace == "":
            module_node = to_node
        else:
            assert from_node.name == to_node.name
            module_node = self.get_node("", to_node.namespace)
        module_uses = self.uses_edges.get(module_node)
        if module_uses is not None:
            # If the module uses something with the same name as the imported
            # item, map the import onto that node.
            for candidate_to_node in module_uses:
                if candidate_to_node.name == from_node.name:
                    to_node = candidate_to_node
                    import_mapping[from_node] = to_node
                    # The match may itself be an imported item: queue it,
                    # unless it is the very node being resolved.
                    if to_node.flavor == Flavor.IMPORTEDITEM and from_node is not to_node:
                        imports_to_resolve.add(to_node)
                    break

    # Second step: undefined attribute nodes whose namespace is
    # "<namespace>.<name>" of a mapped imported item.
    attribute_import_mapping = {}
    for nodes in self.nodes.values():
        for node in nodes:
            if not node.defined and node.flavor == Flavor.ATTRIBUTE:
                for from_node, to_node in import_mapping.items():
                    if (
                        f"{from_node.namespace}.{from_node.name}" == node.namespace
                        and from_node.flavor == Flavor.IMPORTEDITEM
                    ):
                        # Candidates are whatever the mapped target defines;
                        # skip targets with no defines edges at all.
                        if to_node not in self.defines_edges:
                            continue
                        for candidate_to_node in self.defines_edges[to_node]:
                            if candidate_to_node.name == node.name:
                                attribute_import_mapping[node] = candidate_to_node
                                break
    import_mapping.update(attribute_import_mapping)

    # Rewrite the node table and both edge tables through the mapping.
    self.nodes = {name: [import_mapping.get(n, n) for n in items] for name, items in self.nodes.items()}
    self.uses_edges = {
        import_mapping.get(from_node, from_node): {import_mapping.get(to_node, to_node) for to_node in to_nodes}
        for from_node, to_nodes in self.uses_edges.items()
        if len(to_nodes) > 0
    }
    self.defines_edges = {
        import_mapping.get(from_node, from_node): {import_mapping.get(to_node, to_node) for to_node in to_nodes}
        for from_node, to_nodes in self.defines_edges.items()
        if len(to_nodes) > 0
    }
|
|
|
|
|
def filter(self, node: Union[None, Node] = None, namespace: Union[str, None] = None, max_iter: int = 1000):
    """
    Restrict the call graph, in place, to nodes related to `node` or in `namespace`.

    Args:
        node: pyan node whose related nodes are kept; if None, only the namespace is used
        namespace: namespace to search in (name of top level module);
            if None, it is derived from `node`
        max_iter: search budget, passed through to get_related_nodes()

    Returns:
        self
    """
    keep = self.get_related_nodes(node, namespace=namespace, max_iter=max_iter)

    def _restrict(edges):
        # Keep only edges whose source and target both survive the filter.
        return {
            source: {target for target in targets if target in keep}
            for source, targets in edges.items()
            if source in keep
        }

    self.nodes = {name: [member for member in members if member in keep] for name, members in self.nodes.items()}
    self.uses_edges = _restrict(self.uses_edges)
    self.defines_edges = _restrict(self.defines_edges)
    return self
|
|
|
|
|
def get_related_nodes(
    self, node: Union[None, Node] = None, namespace: Union[str, None] = None, max_iter: int = 1000
) -> set:
    """
    get nodes that are related to `node` or are in `namespace`

    Args:
        node: pyan node for which related nodes should be found, if None, filter only for namespace
        namespace: namespace to search in (name of top level module),
            if None, determines namespace from `node`
        max_iter: budget for the graph search. The budget is checked after a
            node has been added, so up to ``max_iter + 1`` nodes can be collected.

    Returns:
        set: set of nodes related to `node` including `node` itself
    """
    if node is None:
        # No start node: select purely by namespace, no graph traversal.
        queue = []
        if namespace is None:
            new_nodes = {n for items in self.nodes.values() for n in items}
        else:
            new_nodes = {
                n
                for items in self.nodes.values()
                for n in items
                if n.namespace is not None and namespace in n.namespace
            }
    else:
        new_nodes = set()
        if namespace is None:
            # Top-level package of the start node.
            namespace = node.namespace.strip(".").split(".", 1)[0]
        queue = [node]

    def _in_namespace(candidate):
        # Nodes without a namespace (namespace is None) can never match; the
        # explicit check avoids "TypeError: argument of type 'NoneType' is not
        # iterable" and mirrors the guard used in the no-start-node branch.
        return candidate.namespace is not None and namespace in candidate.namespace

    # Follow uses and defines edges from the start node, collecting every node
    # reached, until the queue runs dry or the budget is spent.
    i = max_iter
    while queue:
        item = queue.pop()
        if item in new_nodes:
            continue
        new_nodes.add(item)
        i -= 1
        if i < 0:
            break
        for edges in (self.uses_edges, self.defines_edges):
            # Only neighbours that themselves have an entry in the same edge
            # table are followed.
            queue.extend(
                n for n in edges.get(item, []) if n in edges and n not in new_nodes and _in_namespace(n)
            )

    return new_nodes
|
|
|
|
|
def visit_Module(self, node):
    """Create the module Node and walk the module's children inside its scope."""
    module_name = self.module_name
    self.logger.debug("Module %s, %s" % (module_name, self.filename))

    module_node = self.get_node("", module_name, node, flavor=Flavor.MODULE)
    self.associate_node(module_node, node, filename=self.filename)

    # Enter the module: name, scope and context are pushed together ...
    self.name_stack.append(module_name)
    self.scope_stack.append(self.scopes[module_name])
    self.context_stack.append("Module %s" % (module_name))

    self.generic_visit(node)

    # ... and popped in reverse order once the children are done.
    self.context_stack.pop()
    self.scope_stack.pop()
    self.name_stack.pop()
    self.last_value = None

    newly_added = self.add_defines_edge(module_node, None)
    if newly_added:
        self.logger.info("Def Module %s" % node)
|
|
|
|
|
def visit_ClassDef(self, node):
    """Record a class definition, remember its base expressions, and walk its body.

    The base-class AST nodes are stored in self.class_base_ast_nodes so that
    resolve_base_classes() can resolve them later.
    """
    self.logger.debug("ClassDef %s, %s:%s" % (node.name, self.filename, node.lineno))

    enclosing = self.get_node_of_current_namespace()
    class_node = self.get_node(enclosing.get_name(), node.name, node, flavor=Flavor.CLASS)
    if self.add_defines_edge(enclosing, class_node):
        self.logger.info("Def from %s to Class %s" % (enclosing, class_node))

    self.associate_node(class_node, node, self.filename)

    # Bind the class name in the current scope.
    self.set_value(node.name, class_node)

    # Enter the class.
    self.class_stack.append(class_node)
    self.name_stack.append(node.name)
    body_ns = self.get_node_of_current_namespace().get_name()
    self.scope_stack.append(self.scopes[body_ns])
    self.context_stack.append("ClassDef %s" % (node.name))

    # Record each base expression, then visit it so that uses are detected.
    recorded_bases = self.class_base_ast_nodes[class_node] = []
    for base_expr in node.bases:
        recorded_bases.append(base_expr)
        self.visit(base_expr)

    for statement in node.body:
        self.visit(statement)

    # Leave the class.
    self.context_stack.pop()
    self.scope_stack.pop()
    self.name_stack.pop()
    self.class_stack.pop()
|
|
|
|
|
def visit_FunctionDef(self, node):
    """Record a function/method definition and walk its body in its own scope.

    If no scope is registered for the function's namespace, the definition is
    still recorded but the arguments and body are not analyzed.
    """
    self.logger.debug("FunctionDef %s, %s:%s" % (node.name, self.filename, node.lineno))

    # Decorators are visited here; this also tells us the flavor and, for
    # methods, the name used for "self".
    self_name, flavor = self.analyze_functiondef(node)

    definer = self.get_node_of_current_namespace()
    function_node = self.get_node(definer.get_name(), node.name, node, flavor=flavor)
    if self.add_defines_edge(definer, function_node):
        self.logger.info("Def from %s to Function %s" % (definer, function_node))

    self.associate_node(function_node, node, self.filename)
    self.set_value(node.name, function_node)

    # Enter the function.
    self.name_stack.append(node.name)
    inner_ns = self.get_node_of_current_namespace().get_name()
    if inner_ns not in self.scopes:
        # No scope known for this function: undo the push and stop here.
        self.name_stack.pop()
        return
    inner_scope = self.scopes[inner_ns]
    self.scope_stack.append(inner_scope)
    self.context_stack.append("FunctionDef %s" % (node.name))

    # Shadow the argument names in the function scope.
    self.generate_args_nodes(node.args, inner_ns)

    # Inside a method, the "self" name refers to the current class.
    if self_name is not None:
        owner = self.get_current_class()
        inner_scope.defs[self_name] = owner
        self.logger.info('Method def: setting self name "%s" to %s' % (self_name, owner))

    self.analyze_arguments(node.args)

    for statement in node.body:
        self.visit(statement)

    # Leave the function.
    self.context_stack.pop()
    self.scope_stack.pop()
    self.name_stack.pop()
|
|
|
|
|
def visit_AsyncFunctionDef(self, node):
    """Handle an ``async def`` exactly like a regular function definition."""
    # Plain delegation: nothing async-specific is analyzed here.
    self.visit_FunctionDef(node)
|
|
|
|
|
def visit_Lambda(self, node):
    """Analyze a lambda: its arguments, their defaults, and its body expression.

    All of this happens inside the inner scope set up by ExecuteInInnerScope
    under the label "lambda".
    """
    self.logger.debug("Lambda, %s:%s" % (self.filename, node.lineno))
    lambda_args = node.args
    with ExecuteInInnerScope(self, "lambda"):
        namespace_node = self.get_node_of_current_namespace()
        self.generate_args_nodes(lambda_args, namespace_node.get_name())
        self.analyze_arguments(lambda_args)
        # A lambda body is a single expression.
        self.visit(node.body)
|
|
|
|
|
def generate_args_nodes(self, ast_args, inner_ns):
    """Capture which names correspond to function args.

    In the function scope, set them to a nonsense Node,
    to prevent leakage of identifiers of matching name
    from the enclosing scope (due to the local value being None
    until we set it to this nonsense Node).

    Every kind of parameter is covered: positional-only, regular positional,
    ``*args``, keyword-only and ``**kwargs``.

    ast_args: node.args from a FunctionDef or Lambda
    inner_ns: namespace of the function or lambda, for scope lookup
    """
    sc = self.scopes[inner_ns]
    nonsense_node = self.get_node(inner_ns, "^^^argument^^^", None)

    # posonlyargs only exists on Python 3.8+, hence the getattr.
    params = list(getattr(ast_args, "posonlyargs", []))
    params.extend(ast_args.args)
    if ast_args.vararg is not None:
        params.append(ast_args.vararg)
    params.extend(ast_args.kwonlyargs)
    if ast_args.kwarg is not None:
        params.append(ast_args.kwarg)

    for param in params:
        # Parameters are ast.arg nodes whose identifier lives in `.arg`; the
        # scope must be keyed by that string, not by the AST node itself.
        # A plain string (very old AST layout for vararg/kwarg) is used as is.
        sc.defs[getattr(param, "arg", param)] = nonsense_node
|
|
|
|
|
def analyze_arguments(self, ast_args):
    """Analyze an arguments node of the AST.

    Record bindings of args to the given default values, if present.

    Used for analyzing FunctionDefs and Lambdas.

    ast_args: node.args from a FunctionDef or Lambda
    """
    if ast_args.defaults:
        # `defaults` belong to the LAST n parameters that can be passed
        # positionally, i.e. the tail of posonlyargs + args. Slicing `args`
        # alone would mis-pair defaults when positional-only parameters have
        # defaults too. (posonlyargs only exists on Python 3.8+.)
        positional = list(getattr(ast_args, "posonlyargs", [])) + list(ast_args.args)
        n = len(ast_args.defaults)
        for tgt, val in zip(positional[-n:], ast_args.defaults):
            targets = sanitize_exprs(tgt)
            values = sanitize_exprs(val)
            self.analyze_binding(targets, values)
    if ast_args.kw_defaults:
        # kw_defaults is parallel to kwonlyargs; None marks "no default".
        for tgt, val in zip(ast_args.kwonlyargs, ast_args.kw_defaults):
            if val is not None:
                targets = sanitize_exprs(tgt)
                values = sanitize_exprs(val)
                self.analyze_binding(targets, values)
|
|
|
|
|
def visit_Import(self, node):
    """Handle ``import a, b as c``: analyze every imported module separately."""
    imported = node.names
    shown = [format_alias(item) for item in imported]
    self.logger.debug("Import %s, %s:%s" % (shown, self.filename, node.lineno))

    for item in imported:
        self.analyze_module_import(item, node)
|
|
|
|
|
def visit_ImportFrom(self, node):
    """Handle ``from X import a, b as c``, including relative forms.

    The source module name is resolved first (relative imports are resolved
    against self.module_name). Each imported name then gets a Node, is bound
    in the current scope under its alias, and receives a uses edge from the
    current namespace.
    """
    shown = [format_alias(item) for item in node.names]
    where = (self.filename, node.lineno)
    self.logger.debug("ImportFrom: from %s import %s, %s:%s" % ((node.module, shown) + where))

    importer = self.get_node_of_current_namespace()

    bare_relative = node.module is None  # e.g. "from . import foo"
    if bare_relative or node.level != 0:
        if bare_relative:
            self.logger.debug("ImportFrom (original) from %s import %s, %s:%s" % (("." * node.level, shown) + where))
        else:
            self.logger.debug("ImportFrom (original): from %s import %s, %s:%s" % ((node.module, shown) + where))
        # Strip `level` trailing components off the current module name.
        package = self.module_name.rsplit(".", node.level)[0]
        tgt_name = package if bare_relative else package + "." + node.module
        self.logger.debug("ImportFrom (resolved): from %s import %s, %s:%s" % ((tgt_name, shown) + where))
    else:
        # Absolute import: the module name is used as written.
        tgt_name = node.module

    for alias in node.names:
        # The imported name is a module if "<source>.<name>" is one of the
        # analyzed modules; otherwise it is an item imported from the source.
        qualified = tgt_name + "." + alias.name
        if qualified in self.module_to_filename:
            to_node = self.get_node("", qualified, node, flavor=Flavor.MODULE)
        else:
            to_node = self.get_node(tgt_name, alias.name, node, flavor=Flavor.IMPORTEDITEM)

        local_name = alias.name if alias.asname is None else alias.asname
        self.set_value(local_name, to_node)
        self.logger.info("From setting name %s to %s" % (local_name, to_node))

        self.logger.debug("Use from %s to ImportFrom %s" % (importer, to_node))
        if self.add_uses_edge(importer, to_node):
            self.logger.info("New edge added for Use from %s to ImportFrom %s" % (importer, to_node))
|
|
|
|
|
def analyze_module_import(self, import_item, ast_node):
    """Analyze a names AST node inside an Import or ImportFrom AST node.

    This handles the case where the objects being imported are modules.

    A uses edge from the current namespace to the module Node is recorded,
    and the module Node is bound in the current scope under the ``as`` name
    if one was given, otherwise under the module Node's own name.

    import_item: an item of ast_node.names
    ast_node: for recording source location information
    """
    # The importing site, i.e. the user of the module.
    from_node = self.get_node_of_current_namespace()

    # The module being imported.
    mod_node = self.get_node("", import_item.name, ast_node, flavor=Flavor.MODULE)

    if import_item.asname is not None:
        alias_name = import_item.asname
    else:
        alias_name = mod_node.name

    # Only report a new edge when one was actually added, like every other
    # add_uses_edge() call site in this class does.
    if self.add_uses_edge(from_node, mod_node):
        self.logger.info("New edge added for Use import %s in %s" % (mod_node, from_node))

    self.set_value(alias_name, mod_node)
    self.logger.info("From setting name %s to %s" % (alias_name, mod_node))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def visit_Constant(self, node):
    """Represent a literal by a Node named after the literal's Python type.

    The Node is created in the current namespace and left in self.last_value.
    """
    self.logger.debug("Constant %s, %s:%s" % (node.value, self.filename, node.lineno))
    type_name = type(node.value).__name__
    current_ns = self.get_node_of_current_namespace().get_name()
    self.last_value = self.get_node(current_ns, type_name, node, flavor=Flavor.ATTRIBUTE)
|
|
|
|
|
|
|
|
def visit_Attribute(self, node):
    """Handle an attribute access ``obj.attr`` in store or load context.

    Store context: bind the attribute to self.last_value via set_attribute().

    Load context: resolve the access via get_attribute(), then
      - if the attribute resolved to a Node, add a uses edge to it;
      - else if only the object resolved (to a Node with a namespace), create
        an ATTRIBUTE Node under the object's name and add a uses edge to that;
      - otherwise just visit the object expression.
    In the first two cases the resulting Node is left in self.last_value.

    If the access goes through a super() call that cannot be resolved
    (UnresolvedSuperCallError), the access is silently ignored.
    """
    objname = get_ast_node_name(node.value)
    self.logger.debug(
        "Attribute %s of %s in context %s, %s:%s" % (node.attr, objname, type(node.ctx), self.filename, node.lineno)
    )

    if isinstance(node.ctx, ast.Store):
        new_value = self.last_value
        try:
            if self.set_attribute(node, new_value):
                self.logger.info("setattr %s on %s to %s" % (node.attr, objname, new_value))
        except UnresolvedSuperCallError:
            # Target belongs to an unresolved super(); ignore the assignment.
            return

    elif isinstance(node.ctx, ast.Load):
        try:
            obj_node, attr_node = self.get_attribute(node)
        except UnresolvedSuperCallError:
            # Lookup goes through an unresolved super(); nothing to record.
            return

        # Both object and attribute are known.
        if isinstance(attr_node, Node):
            self.logger.info("getattr %s on %s returns %s" % (node.attr, objname, attr_node))

            from_node = self.get_node_of_current_namespace()
            self.logger.debug("Use from %s to %s" % (from_node, attr_node))
            if self.add_uses_edge(from_node, attr_node):
                self.logger.info("New edge added for Use from %s to %s" % (from_node, attr_node))

            # Only done when the attribute's namespace is known.
            if attr_node.namespace is not None:
                self.remove_wild(from_node, attr_node, node.attr)

            self.last_value = attr_node

        # Object known, attribute unknown: create a Node for the attribute.
        elif isinstance(obj_node, Node) and obj_node.namespace is not None:
            tgt_name = node.attr
            from_node = self.get_node_of_current_namespace()
            ns = obj_node.get_name()
            to_node = self.get_node(ns, tgt_name, node, flavor=Flavor.ATTRIBUTE)
            self.logger.debug(
                f"Use from {from_node} to {to_node} (target obj {obj_node} known but target attr "
                f"{node.attr} not resolved; maybe fwd ref or unanalyzed import)"
            )
            if self.add_uses_edge(from_node, to_node):
                # Both halves of the message must be f-strings; otherwise the
                # placeholders in the first half are logged literally.
                self.logger.info(
                    f"New edge added for Use from {from_node} to {to_node} (target obj {obj_node} known but "
                    f"target attr {node.attr} not resolved; maybe fwd ref or unanalyzed import)"
                )

            self.remove_wild(from_node, obj_node, node.attr)

            self.last_value = to_node

        # Nothing resolved: pass on to the object expression.
        else:
            self.visit(node.value)
|
|
|
|
|
|
|
|
def visit_Name(self, node):
    """Handle a bare name in store or load context.

    Store: bind the name to self.last_value in the current scope.
    Load: look the name up, add a uses edge from the current namespace to
    it (unless it resolves to the class currently being analyzed), and leave
    the result in self.last_value. Names that do not resolve to a Node get an
    UNKNOWN-flavored Node with namespace None.
    """
    self.logger.debug("Name %s in context %s, %s:%s" % (node.id, type(node.ctx), self.filename, node.lineno))

    if isinstance(node.ctx, ast.Store):
        self.set_value(node.id, self.last_value)
        return

    if not isinstance(node.ctx, ast.Load):
        # Other contexts (e.g. Del) are not analyzed.
        return

    name = node.id
    target = self.get_value(name)
    enclosing_class = self.get_current_class()
    refers_to_own_class = enclosing_class is not None and target is enclosing_class
    if not refers_to_own_class:
        if not isinstance(target, Node):
            target = self.get_node(None, name, node, flavor=Flavor.UNKNOWN)

        user = self.get_node_of_current_namespace()
        self.logger.debug("Use from %s to Name %s" % (user, target))
        if self.add_uses_edge(user, target):
            self.logger.info("New edge added for Use from %s to Name %s" % (user, target))

    self.last_value = target
|
|
|
|
|
def visit_Assign(self, node):
    """Handle an assignment statement, including chained ``a = b = value``.

    The right-hand side is sanitized once; every target of the chain is then
    bound against it through analyze_binding().
    """
    chain_length = len(node.targets)
    if chain_length > 1:
        self.logger.debug("Assign (chained with %d outputs)" % (chain_length))

    values = sanitize_exprs(node.value)
    for raw_target in node.targets:
        targets = sanitize_exprs(raw_target)
        target_names = [get_ast_node_name(expr) for expr in targets]
        value_names = [get_ast_node_name(expr) for expr in values]
        self.logger.debug(
            "Assign %s %s, %s:%s" % (target_names, value_names, self.filename, node.lineno)
        )
        self.analyze_binding(targets, values)
|
|
|
|
|
def visit_AnnAssign(self, node):
    """Handle an annotated assignment ``target: annotation [= value]``.

    With a value, this behaves like a regular binding. Without one, the
    target is visited with self.last_value cleared. The annotation itself is
    not analyzed.
    """
    target = sanitize_exprs(node.target)
    self.last_value = None

    if node.value is None:
        # Bare declaration, nothing is assigned.
        self.logger.debug(
            "AnnAssign %s <no value>, %s:%s" % (get_ast_node_name(target[0]), self.filename, node.lineno)
        )
        self.last_value = None
        self.visit(target[0])
        return

    value = sanitize_exprs(node.value)
    self.logger.debug(
        "AnnAssign %s %s, %s:%s"
        % (get_ast_node_name(target[0]), get_ast_node_name(value), self.filename, node.lineno)
    )
    self.analyze_binding(target, value)
|
|
|
|
|
|
|
|
|
|
|
def visit_AugAssign(self, node):
    """Handle an augmented assignment (``x += y`` and friends) as a binding of y to x."""
    targets = sanitize_exprs(node.target)
    values = sanitize_exprs(node.value)

    target_names = [get_ast_node_name(expr) for expr in targets]
    operator_type = type(node.op)
    value_names = [get_ast_node_name(expr) for expr in values]
    self.logger.debug(
        "AugAssign %s %s %s, %s:%s" % (target_names, operator_type, value_names, self.filename, node.lineno)
    )

    self.analyze_binding(targets, values)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def visit_For(self, node):
    """Handle a for-loop: bind the loop target to the iterable, then walk body and else."""
    self.logger.debug("For-loop, %s:%s" % (self.filename, node.lineno))

    loop_targets = sanitize_exprs(node.target)
    iterables = sanitize_exprs(node.iter)
    self.analyze_binding(loop_targets, iterables)

    # Loop body first, then the optional else-clause.
    for statement in [*node.body, *node.orelse]:
        self.visit(statement)
|
|
|
|
|
def visit_AsyncFor(self, node):
    """Handle an ``async for`` loop exactly like a regular for-loop."""
    # Plain delegation: nothing async-specific is analyzed here.
    self.visit_For(node)
|
|
|
|
|
def visit_ListComp(self, node):
    """Analyze a list comprehension via the shared comprehension handler."""
    location = (self.filename, node.lineno)
    self.logger.debug("ListComp, %s:%s" % location)
    self.analyze_comprehension(node, "listcomp")
|
|
|
|
|
def visit_SetComp(self, node):
    """Analyze a set comprehension via the shared comprehension handler."""
    location = (self.filename, node.lineno)
    self.logger.debug("SetComp, %s:%s" % location)
    self.analyze_comprehension(node, "setcomp")
|
|
|
|
|
def visit_DictComp(self, node):
    """Analyze a dict comprehension; its result expressions are ``key`` and ``value``."""
    location = (self.filename, node.lineno)
    self.logger.debug("DictComp, %s:%s" % location)
    self.analyze_comprehension(node, "dictcomp", field1="key", field2="value")
|
|
|
|
|
def visit_GeneratorExp(self, node):
    """Analyze a generator expression via the shared comprehension handler."""
    location = (self.filename, node.lineno)
    self.logger.debug("GeneratorExp, %s:%s" % location)
    self.analyze_comprehension(node, "genexpr")
|
|
|
|
|
def analyze_comprehension(self, node, label, field1="elt", field2=None):
    """Analyze a comprehension or generator expression.

    node: AST node with a ``generators`` list; its first entry is treated as
        the outermost generator.
    label: name of the inner scope, passed to ExecuteInInnerScope.
    field1: name of the node attribute holding the result expression
        (e.g. ``elt``; ``key`` for dict comprehensions).
    field2: optional name of a second result attribute (``value`` for dict
        comprehensions).
    """
    # NOTE(review): the block nesting below was reconstructed from a copy of
    # this method whose indentation had been lost; confirm how much of the
    # code really sits under the `if` / `with` against the original file.
    gens = node.generators
    outermost = gens[0]
    moregens = gens[1:] if len(gens) > 1 else []

    outermost_iters = sanitize_exprs(outermost.iter)
    outermost_targets = sanitize_exprs(outermost.target)
    # The iterable of the outermost generator is visited before any inner
    # scope is entered, i.e. in the current scope.
    for expr in outermost_iters:
        self.visit(expr)
    # NOTE(review): every caller visible in this file passes a label that IS
    # in the set below ("listcomp", "setcomp", "dictcomp", "genexpr"), so for
    # those callers the guarded block is skipped — confirm this is intended.
    if label not in {"lambda", "listcomp", "setcomp", "dictcomp", "genexpr"}:
        with ExecuteInInnerScope(self, label):
            # Targets of the outermost generator pick up self.last_value as
            # left behind by visiting its iterable above.
            for expr in outermost_targets:
                self.visit(expr)
            self.last_value = None
            for expr in outermost.ifs:
                self.visit(expr)

            # The remaining generators are handled as ordinary bindings.
            for gen in moregens:
                targets = sanitize_exprs(gen.target)
                values = sanitize_exprs(gen.iter)
                self.analyze_binding(targets, values)
                for expr in gen.ifs:
                    self.visit(expr)

            # Finally the result expression(s).
            self.visit(getattr(node, field1))
            if field2:
                self.visit(getattr(node, field2))
|
|
|
|
|
def visit_Call(self, node):
    """Handle a call expression.

    Arguments are visited first. If resolve_builtins() can tell what the call
    resolves to, a uses edge to that Node is added and it becomes
    self.last_value. Otherwise the callee expression is visited; if that
    leaves a known class in self.last_value, a uses edge to the class's
    ``__init__`` is added.
    """
    self.logger.debug("Call %s, %s:%s" % (get_ast_node_name(node.func), self.filename, node.lineno))

    # Positional arguments first, then keyword argument values.
    for operand in [*node.args, *(keyword.value for keyword in node.keywords)]:
        self.visit(operand)

    try:
        resolved = self.resolve_builtins(node)
    except UnresolvedSuperCallError:
        resolved = None

    if isinstance(resolved, Node):
        self.last_value = resolved

        caller = self.get_node_of_current_namespace()
        self.logger.debug("Use from %s to %s (via resolved call to built-ins)" % (caller, resolved))
        if self.add_uses_edge(caller, resolved):
            self.logger.info(
                "New edge added for Use from %s to %s (via resolved call to built-ins)" % (caller, resolved)
            )
        return

    # Generic call: the callee is visited last, so it is what remains in
    # self.last_value afterwards.
    self.visit(node.func)

    if self.last_value not in self.class_base_ast_nodes:
        return

    # The callee is a known class, so the call creates an instance.
    caller = self.get_node_of_current_namespace()
    class_node = self.last_value
    init_node = self.get_node(class_node.get_name(), "__init__", None, flavor=Flavor.METHOD)
    self.logger.debug("Use from %s to %s (call creates an instance)" % (caller, init_node))
    if self.add_uses_edge(caller, init_node):
        self.logger.info(
            "New edge added for Use from %s to %s (call creates an instance)" % (caller, init_node)
        )
|
|
|
|
|
def visit_With(self, node):
    """Handle a ``with`` statement.

    For every context-manager expression that resolves to a Node, uses edges
    to its ``__enter__`` and ``__exit__`` are added. An ``as`` target that is
    a plain name is bound to the expression; any other target is just
    visited. Finally the body is walked.
    """
    self.logger.debug("With (context manager), %s:%s" % (self.filename, node.lineno))

    for item in node.items:
        context_expr = item.context_expr
        bound_target = item.optional_vars

        # Visit the expression on its own first to find out what it resolves to.
        self.last_value = None
        self.visit(context_expr)
        manager = self.last_value
        if isinstance(manager, Node):
            user = self.get_node_of_current_namespace()
            self.logger.debug("Use from %s to With %s" % (user, manager))
            for dunder in ("__enter__", "__exit__"):
                method_node = self.get_node(manager.get_name(), dunder, None, flavor=Flavor.METHOD)
                if self.add_uses_edge(user, method_node):
                    self.logger.info("New edge added for Use from %s to %s" % (user, method_node))
        self.last_value = None

        if bound_target is None:
            continue
        if isinstance(bound_target, ast.Name):
            # Note: analyze_binding() visits the expression a second time.
            self.analyze_binding(sanitize_exprs(bound_target), sanitize_exprs(context_expr))
        else:
            self.visit(bound_target)

    for statement in node.body:
        self.visit(statement)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def analyze_functiondef(self, ast_node):
    """Analyze a function definition.

    Visit decorators, and if this is a method definition, capture the name
    of the first positional argument to denote "self", like Python does.
    Positional-only parameters count: in ``def m(self, /, x)`` the name
    captured is ``self``.

    Return (self_name, flavor), where self_name the name representing self,
    or None if not applicable; and flavor is a Flavor, specifically one of
    FUNCTION, METHOD, STATICMETHOD or CLASSMETHOD.

    Raises TypeError if ast_node is not a (possibly async) function definition.
    """
    if not isinstance(ast_node, (ast.AsyncFunctionDef, ast.FunctionDef)):
        raise TypeError("Expected ast.FunctionDef; got %s" % (type(ast_node)))

    # Visit the decorators, collecting the names of those that resolve to a
    # Node (each visit reports its result through self.last_value).
    self.last_value = None
    deco_names = []
    for deco in ast_node.decorator_list:
        self.visit(deco)
        deco_node = self.last_value
        if isinstance(deco_node, Node):
            deco_names.append(deco_node.name)
        self.last_value = None

    # A definition directly inside a class body is some kind of method.
    in_class_ns = self.context_stack[-1].startswith("ClassDef")
    if not in_class_ns:
        flavor = Flavor.FUNCTION
    elif "staticmethod" in deco_names:
        flavor = Flavor.STATICMETHOD
    elif "classmethod" in deco_names:
        flavor = Flavor.CLASSMETHOD
    else:
        flavor = Flavor.METHOD

    # Instance methods and class methods both receive their object as the
    # first positional parameter.
    if flavor in (Flavor.METHOD, Flavor.CLASSMETHOD):
        all_args = ast_node.args
        # Positional-only parameters precede the regular ones; posonlyargs
        # only exists on Python 3.8+, hence the getattr.
        posargs = list(getattr(all_args, "posonlyargs", [])) + list(all_args.args)
        if posargs:
            return posargs[0].arg, flavor

    return None, flavor
|
|
|
|
|
def analyze_binding(self, targets, values):
    """Bind `values` to `targets`; both must already be lists from sanitize_exprs().

    When there are as many targets as values, they are paired one to one:
    each value is visited, the resulting self.last_value is captured, and the
    matching target is then visited with that value in self.last_value.

    Otherwise all values are visited, then all targets, so each target sees
    whatever the last visit left in self.last_value.

    self.last_value is None on return.
    """
    self.last_value = None

    if len(targets) != len(values):
        # Counts differ: no pairing is attempted.
        for rhs in values:
            self.visit(rhs)
        for lhs in targets:
            self.visit(lhs)
        self.last_value = None
        return

    # Evaluate every right-hand side first ...
    captured = []
    for rhs in values:
        self.visit(rhs)
        captured.append(self.last_value)
        self.last_value = None

    # ... then hand each captured value to its target.
    for lhs, bound in zip(targets, captured):
        self.last_value = bound
        self.visit(lhs)
    self.last_value = None
|
|
|
|
|
def resolve_builtins(self, ast_node):
    """Resolve calls to the few built-ins whose result is easy to determine.

    Supported calls (the callee must be a plain name):

      - super(), with any arguments ignored: resolves to the second entry
        of the MRO stored in self.mro for the current class.
      - str(obj) and repr(obj), with exactly one argument that is a plain
        name or an attribute: resolves to obj.__str__ / obj.__repr__,
        looked up through get_attribute().

    Returns the Node the call resolves to, or None if it could not be
    determined.

    Raises TypeError if ast_node is not an ast.Call. Raises
    UnresolvedSuperCallError for a super() call when the current class has
    no entry in self.mro, or when its MRO has no entry after the first.
    """
    if not isinstance(ast_node, ast.Call):
        raise TypeError("Expected ast.Call; got %s" % (type(ast_node)))

    callee = ast_node.func
    if not isinstance(callee, ast.Name):
        return None
    funcname = callee.id

    if funcname == "super":
        class_node = self.get_current_class()
        self.logger.debug("Resolving super() of %s" % (class_node))

        if class_node not in self.mro:
            msg = "super called for %s, but MRO not determined for it (maybe still in pass 1?)" % (class_node)
            self.logger.info(msg)
            raise UnresolvedSuperCallError(msg)

        if len(self.mro[class_node]) <= 1:
            msg = "super called for %s, but no known bases" % (class_node)
            self.logger.info(msg)
            raise UnresolvedSuperCallError(msg)

        # The entry right after the first one in the MRO is taken as the
        # result of super().
        result = self.mro[class_node][1]
        self.logger.debug("super of %s is %s" % (class_node, result))
        return result

    if funcname not in ("str", "repr"):
        return None
    if len(ast_node.args) != 1:
        return None

    obj_astnode = ast_node.args[0]
    if not isinstance(obj_astnode, (ast.Name, ast.Attribute)):
        return None

    self.logger.debug("Resolving %s() of %s" % (funcname, get_ast_node_name(obj_astnode)))
    attrname = "__%s__" % (funcname)

    # Synthesize the AST for "obj.__str__" / "obj.__repr__" so that the
    # regular attribute lookup can be reused.
    tmp_astnode = ast.Attribute(value=obj_astnode, attr=attrname, ctx=obj_astnode.ctx)
    obj_node, attr_node = self.get_attribute(tmp_astnode)
    self.logger.debug(
        "Resolve %s() of %s: returning attr node %s"
        % (funcname, get_ast_node_name(obj_astnode), attr_node)
    )
    return attr_node
|
|
|
|
|
|
|
|
|
|
|
def resolve_attribute(self, ast_node):
    """Resolve the object part of an ast.Attribute.

    Nested attributes (a.b.c) are handled by recursion.

    Returns (obj, attrname), where obj is the Node that the expression to
    the left of the final dot resolves to (or None on lookup failure), and
    attrname is ast_node.attr.

    Attributes of string and numeric literals (such as "".join) resolve to
    a Flavor.CLASS node named "Str" or "Num" in the "" namespace. Literals
    are recognized both as ast.Constant nodes and as the older dedicated
    Num / Str node classes, so the node name does not depend on which
    Python version parsed the source.

    Raises TypeError if ast_node is not an ast.Attribute. May pass through
    UnresolvedSuperCallError raised by resolve_builtins() when the object
    is an unresolved super() call.
    """
    if not isinstance(ast_node, ast.Attribute):
        raise TypeError("Expected ast.Attribute; got %s" % (type(ast_node)))

    self.logger.debug(
        "Resolve %s.%s in context %s" % (get_ast_node_name(ast_node.value), ast_node.attr, type(ast_node.ctx))
    )

    value_astnode = ast_node.value

    if isinstance(value_astnode, ast.Attribute):
        # Nested attribute: first resolve the inner "a.b", then look "b"
        # up in the scope that belongs to "a".
        obj_node, attr_name = self.resolve_attribute(value_astnode)

        if isinstance(obj_node, Node) and obj_node.namespace is not None:
            ns = obj_node.get_name()
            if ns in self.scopes:
                sc = self.scopes[ns]
                if attr_name in sc.defs:
                    self.logger.debug("Resolved to attr %s of %s" % (ast_node.attr, sc.defs[attr_name]))
                    return sc.defs[attr_name], ast_node.attr

        self.logger.debug("Unresolved, returning attr %s of unknown" % (ast_node.attr))
        return None, ast_node.attr

    # Classify literal objects. Since Python 3.8 the parser produces
    # ast.Constant for every literal, and the ast.Num / ast.Str aliases are
    # deprecated (and absent from the newest Python versions), so they are
    # never referenced through the ast module here. Older parsers are still
    # recognized through the class name of the node.
    literal_kind = None
    node_class_name = type(value_astnode).__name__
    if node_class_name in ("Num", "Str"):
        literal_kind = node_class_name
    elif isinstance(value_astnode, ast.Constant):
        constant = value_astnode.value
        if isinstance(constant, str):
            literal_kind = "Str"
        elif isinstance(constant, (int, float, complex)) and not isinstance(constant, bool):
            # bool is a subclass of int, but True/False were never "Num".
            literal_kind = "Num"

    if literal_kind is not None:
        # Namespace-like node for the literal's type, with no AST node.
        obj_node = self.get_node("", literal_kind, None, flavor=Flavor.CLASS)

    elif isinstance(value_astnode, ast.Call):
        # Attribute of a call result, e.g. super().method. An unresolved
        # super() surfaces as an exception from resolve_builtins() and is
        # deliberately not caught here.
        obj_node = self.resolve_builtins(value_astnode)

        if not isinstance(obj_node, Node):
            self.logger.debug("Unresolved function call as obj, returning attr %s of unknown" % (ast_node.attr))
            return None, ast_node.attr

    else:
        # Anything else: look the object's name up in the current scopes.
        obj_node = self.get_value(get_ast_node_name(value_astnode))

    self.logger.debug("Resolved to attr %s of %s" % (ast_node.attr, obj_node))
    return obj_node, ast_node.attr
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def analyze_scopes(self, code, filename):
    """Collect lexical scope information for one source file.

    ``code`` is turned into a symbol table with symtable.symtable() in
    "exec" mode. Every table of the resulting tree is wrapped in a Scope
    and stored under its dotted namespace, which starts from
    self.module_name; a table whose Scope has an empty name shares the
    namespace of its parent.

    The collected scopes are then merged into self.scopes. Namespaces not
    seen before are added as they are. For namespaces already present,
    only names missing from the existing scope's defs are copied over, so
    existing entries are never overwritten.
    """
    collected = {}

    def walk(parent_ns, table):
        scope = Scope(table)
        if len(scope.name):
            ns = "%s.%s" % (parent_ns, scope.name)
        else:
            ns = parent_ns
        collected[ns] = scope
        for child in table.get_children():
            walk(ns, child)

    walk(self.module_name, symtable.symtable(code, filename, compile_type="exec"))

    for ns, fresh in collected.items():
        if ns not in self.scopes:
            self.scopes[ns] = fresh
            continue

        # Known namespace: fill in missing names only.
        known_defs = self.scopes[ns].defs
        for name in fresh.defs:
            if name not in known_defs:
                known_defs[name] = fresh.defs[name]

    self.logger.debug("Scopes now: %s" % (self.scopes))
|
|
|
|
|
def get_current_class(self):
    """Return the innermost entry of self.class_stack, or None if the stack is empty."""
    if not len(self.class_stack):
        return None
    return self.class_stack[-1]
|
|
|
|
|
def get_node_of_current_namespace(self):
    """Return the unique node that represents the current namespace.

    The namespace is taken from self.name_stack, which must not be empty.
    The returned node is obtained from get_node() with:

      - namespace: the dot-joined names of all but the last stack entry
        (the empty string when there is only one entry),
      - name: the last stack entry,
      - no associated AST node,
      - flavor Flavor.NAMESPACE.
    """
    assert len(self.name_stack)

    *enclosing, name = self.name_stack
    return self.get_node(".".join(enclosing), name, None, flavor=Flavor.NAMESPACE)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_value(self, name):
    """Look up the value bound to ``name`` in the active scopes.

    self.scope_stack is searched from the innermost scope outwards for the
    first scope whose defs contain ``name`` with a value other than None.

    Returns that value if it is a Node. Returns None if no such scope
    exists, or if the value found is not a Node.
    """
    found = None
    for candidate in reversed(self.scope_stack):
        if name in candidate.defs and candidate.defs[name] is not None:
            found = candidate
            break

    if found is None:
        self.logger.debug("Get %s in %s: no Node value (or name not in scope)" % (name, self.scope_stack[-1]))
        return None

    value = found.defs[name]
    if isinstance(value, Node):
        self.logger.info("Get %s in %s, found in %s, value %s" % (name, self.scope_stack[-1], found, value))
        return value

    self.logger.debug(
        "Get %s in %s, found in %s: value %s is not a Node" % (name, self.scope_stack[-1], found, value)
    )
    return None
|
|
|
|
|
def set_value(self, name, value):
    """Bind ``name`` to ``value`` in the innermost active scope that declares it.

    self.scope_stack is searched from the innermost scope outwards for the
    first scope whose defs contain ``name``. The binding is stored only if
    such a scope exists and ``value`` is a Node; otherwise nothing is
    changed and the reason is logged at debug level.
    """
    owner = None
    for candidate in reversed(self.scope_stack):
        if name in candidate.defs:
            owner = candidate
            break

    if owner is None:
        self.logger.debug("Set: name %s not in scope" % (name))
        return

    if not isinstance(value, Node):
        self.logger.debug("Set %s in %s: value %s is not a Node" % (name, owner, value))
        return

    owner.defs[name] = value
    self.logger.info("Set %s in %s to %s" % (name, owner, value))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_attribute(self, ast_node):
    """Get the value of an ast.Attribute that is in a load context.

    The object part is resolved with resolve_attribute(). The attribute is
    then looked up in the scope of the object's own namespace and, failing
    that, in the scopes of the remaining entries of the object's MRO (when
    self.mro has an entry for the object), in MRO order.

    Returns a pair (obj, attr):

      - (obj, value) when the attribute is found in the object's own
        namespace;
      - (base, value) when it is found in a later MRO entry, where base is
        the entry in which it was found;
      - (None, None) when the object has an MRO but no entry defines the
        attribute;
      - (obj, None) otherwise; obj may itself be None here.

    For objects whose full name is "Num" or "Str", the attribute is not
    looked up in any scope; a Flavor.ATTRIBUTE node is obtained from
    get_node() instead.

    Raises TypeError if ast_node is not an ast.Attribute, ValueError if it
    is not in a load context. May pass through UnresolvedSuperCallError.
    """
    if not isinstance(ast_node, ast.Attribute):
        raise TypeError("Expected ast.Attribute; got %s" % (type(ast_node)))
    if not isinstance(ast_node.ctx, ast.Load):
        raise ValueError("Expected a load context, got %s" % (type(ast_node.ctx)))

    obj_node, attr_name = self.resolve_attribute(ast_node)

    # Unknown object, or a node without a namespace: nothing to look up in.
    if not isinstance(obj_node, Node) or obj_node.namespace is None:
        return obj_node, None

    ns = obj_node.get_name()

    if ns in ("Num", "Str"):
        return obj_node, self.get_node(ns, attr_name, None, flavor=Flavor.ATTRIBUTE)

    def defined_in(namespace):
        """Return the value of attr_name in the given namespace's scope, or None."""
        if namespace not in self.scopes:
            return None
        defs = self.scopes[namespace].defs
        return defs[attr_name] if attr_name in defs else None

    # First the object's own namespace ...
    value_node = defined_in(ns)
    if value_node is not None:
        return obj_node, value_node

    if obj_node not in self.mro:
        return obj_node, None

    # ... then every MRO entry after the first.
    for base_node in tail(self.mro[obj_node]):
        value_node = defined_in(base_node.get_name())
        if value_node is not None:
            return base_node, value_node
    return None, None
|
|
|
|
|
def set_attribute(self, ast_node, new_value):
    """Store ``new_value`` as the attribute described by ``ast_node``.

    The assignment is made only when ``new_value`` is a Node, the object
    part of the attribute resolves (through resolve_attribute()) to a Node
    with a namespace, and the object's full name has an entry in
    self.scopes. In that case the value is written into that scope's defs.

    Returns True if the assignment was made, False otherwise.

    Raises TypeError if ast_node is not an ast.Attribute, ValueError if it
    is not in a store context. May pass through UnresolvedSuperCallError.
    """
    if not isinstance(ast_node, ast.Attribute):
        raise TypeError("Expected ast.Attribute; got %s" % (type(ast_node)))
    if not isinstance(ast_node.ctx, ast.Store):
        raise ValueError("Expected a store context, got %s" % (type(ast_node.ctx)))

    if not isinstance(new_value, Node):
        return False

    obj_node, attr_name = self.resolve_attribute(ast_node)
    if not isinstance(obj_node, Node) or obj_node.namespace is None:
        return False

    ns = obj_node.get_name()
    if ns not in self.scopes:
        return False

    self.scopes[ns].defs[attr_name] = new_value
    return True
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_node(self, namespace, name, ast_node=None, flavor=Flavor.UNSPECIFIED):
    """Return the unique node for the given namespace and name, creating it if needed.

    Existing node: the node registered in self.nodes under ``name`` whose
    namespace equals ``namespace`` is returned. Its flavor is replaced by
    ``flavor`` only if Flavor.specificity() ranks the given flavor strictly
    higher than the node's current one. ``ast_node`` is ignored in this
    case; use associate_node() to change the AST node of an existing node.

    New node: a Node is constructed with the given namespace, name,
    ast_node and flavor, and registered in self.nodes under ``name``. Its
    filename is the entry of self.module_to_filename for ``namespace`` if
    there is one, and self.filename otherwise.

    Within CallGraphVisitor, nodes should be created through this method
    rather than by calling the Node constructor directly, since only this
    method registers the node in self.nodes and chooses its filename.
    """
    for existing in self.nodes.get(name, ()):
        if existing.namespace == namespace:
            if Flavor.specificity(flavor) > Flavor.specificity(existing.flavor):
                existing.flavor = flavor
            return existing

    # No match: build a new node, attributed to the file of the module
    # named by the namespace when that module is known, else to the file
    # currently recorded in self.filename.
    filename = self.module_to_filename.get(namespace, self.filename)
    node = Node(namespace, name, ast_node, filename, flavor)

    # Nodes are grouped by their short name.
    self.nodes.setdefault(name, []).append(node)
    return node
|
|
|
|
|
def get_parent_node(self, graph_node):
    """Return the node for the namespace that contains ``graph_node``.

    The namespace of ``graph_node`` is split at its last dot into the
    parent's own namespace and name; a namespace without a dot yields the
    empty string as the parent's namespace. The parent is obtained from
    get_node(), with no AST node.
    """
    qualified = graph_node.namespace
    if "." not in qualified:
        return self.get_node("", qualified, None)
    parent_ns, parent_name = qualified.rsplit(".", 1)
    return self.get_node(parent_ns, parent_name, None)
|
|
|
|
|
def associate_node(self, graph_node, ast_node, filename=None):
    """Point an existing graph node at a different AST node.

    ``graph_node.ast_node`` is always replaced by ``ast_node``.
    ``graph_node.filename`` is replaced only when ``filename`` is given
    (i.e. is not None).

    get_node() ignores its ast_node argument for nodes that already exist,
    so this is the way to attach an AST node to a graph node after the
    graph node has been created.
    """
    graph_node.ast_node = ast_node
    if filename is None:
        return
    graph_node.filename = filename
|
|
|
|
|
def add_defines_edge(self, from_node, to_node):
    """Record that ``from_node`` defines ``to_node``.

    ``from_node`` is always marked as defined and given an entry in
    self.defines_edges. When ``to_node`` is not None and the edge is not
    recorded yet, the edge is added and ``to_node`` is marked as defined
    as well.

    Returns True if anything new was recorded (a new entry for
    ``from_node`` and/or a new edge), False otherwise.
    """
    is_new_source = from_node not in self.defines_edges
    targets = self.defines_edges.setdefault(from_node, set())
    from_node.defined = True

    if to_node is None or to_node in targets:
        return is_new_source

    targets.add(to_node)
    to_node.defined = True
    return True
|
|
|
|
|
def add_uses_edge(self, from_node, to_node):
    """Record that ``from_node`` uses ``to_node``.

    Returns False, without changing anything else, if the edge is already
    recorded in self.uses_edges; True if it was added.

    When the edge is new and ``to_node`` has a namespace (is not None),
    remove_wild() is called with the name of ``to_node`` so that a
    wildcard edge of the same name from ``from_node`` can be dropped.
    """
    targets = self.uses_edges.setdefault(from_node, set())
    if to_node in targets:
        return False
    targets.add(to_node)

    if to_node.namespace is not None:
        self.remove_wild(from_node, to_node, to_node.name)

    return True
|
|
|
|
|
def remove_uses_edge(self, from_node, to_node):
    """Remove the uses edge from ``from_node`` to ``to_node``, if it is recorded.

    Does nothing when ``from_node`` has no entry in self.uses_edges or
    when the edge does not exist.
    """
    targets = self.uses_edges.get(from_node)
    if targets is not None:
        targets.discard(to_node)
|
|
|
|
|
def remove_wild(self, from_node, to_node, name):
    """Remove the uses edge from ``from_node`` to the wildcard node ``*.name``.

    A wildcard node is one whose namespace is None. ``name`` is passed
    separately from ``to_node`` because the wildcard is matched against
    ``name``, which need not be the same as ``to_node.name``.

    Nothing is removed when any of the following holds:

      - ``name`` is None;
      - ``from_node`` has no entry in self.uses_edges;
      - the full name of ``to_node`` contains "^^^argument^^^";
      - ``to_node`` equals ``from_node``;
      - ``from_node`` has no edge to a wildcard node named ``name``.

    At most one matching wildcard edge is expected; this is asserted.
    """
    if name is None or from_node not in self.uses_edges:
        return

    if "^^^argument^^^" in to_node.get_name():
        return

    if to_node == from_node:
        return

    wilds = [
        candidate
        for candidate in self.uses_edges[from_node]
        if candidate.namespace is None and candidate.name == name
    ]
    assert len(wilds) < 2
    if not wilds:
        return

    wild_node = wilds[0]
    self.logger.info("Use from %s to %s resolves %s; removing wildcard" % (from_node, to_node, wild_node))
    self.remove_uses_edge(from_node, wild_node)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def contract_nonexistents(self):
    """Redirect uses edges that point at undefined nodes to wildcard nodes.

    For every uses edge whose target X.name has a namespace but is not
    marked as defined, an edge to the wildcard node *.name (namespace
    None, obtained from get_node() and marked as not defined) is added,
    and the original edge is removed.

    The edges are first collected and only then modified, additions
    before removals.
    """
    additions = []
    removals = []
    for source, targets in self.uses_edges.items():
        for target in targets:
            if target.namespace is None or target.defined:
                continue
            wildcard = self.get_node(None, target.name, target.ast_node)
            wildcard.defined = False
            additions.append((source, wildcard))
            removals.append((source, target))
            self.logger.info("Contracting non-existent from %s to %s as %s" % (source, target, wildcard))

    for from_node, to_node in additions:
        self.add_uses_edge(from_node, to_node)

    for from_node, to_node in removals:
        self.remove_uses_edge(from_node, to_node)
|
|
|
|
|
def expand_unknowns(self):
    """Expand edges that point at wildcard nodes, then hide the wildcards.

    For every defines edge and every uses edge whose target is a wildcard
    node *.name (namespace None), an edge of the same kind is added from
    the same source to every node X.name registered under that name in
    self.nodes that does have a namespace. The edges to the wildcard
    nodes themselves are left in place.

    Finally every wildcard node in self.nodes is marked as not defined.
    """

    def concrete_pairs(edges):
        """List (source, X.name) for each edge in ``edges`` that targets a wildcard *.name."""
        pairs = []
        for source, targets in edges.items():
            for target in targets:
                if target.namespace is not None:
                    continue
                for candidate in self.nodes[target.name]:
                    if candidate.namespace is not None:
                        pairs.append((source, candidate))
        return pairs

    # Each list is built in full before any edge is added, so the edge
    # sets are not modified while they are being iterated.
    for from_node, to_node in concrete_pairs(self.defines_edges):
        self.add_defines_edge(from_node, to_node)
        self.logger.info("Expanding unknowns: new defines edge from %s to %s" % (from_node, to_node))

    for from_node, to_node in concrete_pairs(self.uses_edges):
        self.add_uses_edge(from_node, to_node)
        self.logger.info("Expanding unknowns: new uses edge from %s to %s" % (from_node, to_node))

    for group in self.nodes.values():
        for node in group:
            if node.namespace is None:
                node.defined = False
|
|
|
|
|
def cull_inherited(self):
    """Remove uses edges that are shadowed by an edge to a same-named node.

    For each uses edge from W to X.name: if W also has a uses edge to
    Y.name (same name, different namespace, both namespaces known) and
    the parent node of X.name itself uses the parent node of Y.name, the
    edge from W to X.name is removed.

    Parent nodes are obtained with get_parent_node(). The edges to remove
    are collected first and removed afterwards.
    """
    doomed = []
    for user, targets in self.uses_edges.items():
        for target in targets:
            # A wildcard target can never satisfy the condition below.
            if target.namespace is None:
                continue

            shadowed = False
            # Every matching sibling is examined; the scan does not stop
            # at the first hit.
            for other in targets:
                if other.name != target.name:
                    continue
                if other.namespace is None or other.namespace == target.namespace:
                    continue

                parent_of_target = self.get_parent_node(target)
                parent_of_other = self.get_parent_node(other)
                if (
                    parent_of_target in self.uses_edges
                    and parent_of_other in self.uses_edges[parent_of_target]
                ):
                    shadowed = True

            if shadowed:
                doomed.append((user, target))
                self.logger.info("Removing inherited edge from %s to %s" % (user, target))

    for from_node, to_node in doomed:
        self.remove_uses_edge(from_node, to_node)
|
|
|
|
|
def collapse_inner(self):
    """Fold lambda and comprehension nodes into their parent nodes.

    For every node registered in self.nodes under one of the names
    "lambda", "listcomp", "setcomp", "dictcomp" or "genexpr", each of its
    outgoing uses edges is added to its parent node as well (the parent
    being what get_parent_node() returns), and the node itself is marked
    as not defined. Only uses edges are treated; defines edges are left
    untouched.
    """
    inner_names = ("lambda", "listcomp", "setcomp", "dictcomp", "genexpr")

    # Iterate over a snapshot of the names: get_parent_node() goes through
    # get_node(), which can register new names in self.nodes.
    for name in list(self.nodes):
        if name not in inner_names:
            continue
        for inner in self.nodes[name]:
            parent = self.get_parent_node(inner)
            for used in self.uses_edges.get(inner, ()):
                self.logger.info("Collapsing inner from %s to %s, uses %s" % (inner, parent, used))
                self.add_uses_edge(parent, used)
            inner.defined = False