| """ |
| Test the func_inspect module. |
| """ |
|
|
| |
| |
| |
|
|
| import functools |
|
|
| from joblib.func_inspect import ( |
| _clean_win_chars, |
| filter_args, |
| format_signature, |
| get_func_code, |
| get_func_name, |
| ) |
| from joblib.memory import Memory |
| from joblib.test.common import with_numpy |
| from joblib.testing import fixture, parametrize, raises |
|
|
|
|
| |
| |
def f(x, y=0):
    """Signature fixture: required ``x`` plus defaulted ``y``; does nothing."""
|
|
|
|
def g(x):
    """Signature fixture: a single required parameter ``x``; does nothing."""
|
|
|
|
def h(x, y=0, *args, **kwargs):
    """Signature fixture: named parameters plus ``*args``/``**kwargs``; no-op."""
|
|
|
|
def i(x=1):
    """Signature fixture: a single defaulted parameter ``x``; does nothing."""
|
|
|
|
def j(x, y, **kwargs):
    """Signature fixture: two required parameters and ``**kwargs``; no-op."""
|
|
|
|
def k(*args, **kwargs):
    """Signature fixture: only ``*args`` and ``**kwargs``; does nothing."""
|
|
|
|
def m1(x, *, y):
    """Signature fixture: positional ``x`` and required keyword-only ``y``."""
|
|
|
|
def m2(x, *, y, z=3):
    """Signature fixture: keyword-only ``y`` (required) and ``z`` (default 3)."""
|
|
|
|
@fixture(scope="module")
def cached_func(tmpdir_factory):
    """Provide an identity function decorated with ``Memory.cache``.

    The ``Memory`` instance is backed by a temporary directory obtained from
    ``tmpdir_factory``. The fixture is module-scoped, so the same decorated
    function is handed to every test of this module that requests it.
    """
    cache_dir = tmpdir_factory.mktemp("joblib_test_func_inspect")
    memory = Memory(cache_dir.strpath)

    @memory.cache
    def cached_func_inner(x):
        return x

    return cached_func_inner
|
|
|
|
class Klass:
    """Minimal class whose bound method ``f`` is passed to ``filter_args``."""

    def f(self, x):
        """Return ``x`` unchanged."""
        return x
|
|
|
|
| |
| |
|
|
|
|
@parametrize(
    "func,args,filtered_args",
    [
        (f, [[], (1,)], {"x": 1, "y": 0}),
        (f, [["x"], (1,)], {"y": 0}),
        (f, [["y"], (0,)], {"x": 0}),
        (f, [["y"], (0,), {"y": 1}], {"x": 0}),
        (f, [["x", "y"], (0,)], {}),
        (f, [[], (0,), {"y": 1}], {"x": 0, "y": 1}),
        (f, [["y"], (), {"x": 2, "y": 1}], {"x": 2}),
        (g, [[], (), {"x": 1}], {"x": 1}),
        (i, [[], (2,)], {"x": 2}),
    ],
)
def test_filter_args(func, args, filtered_args):
    """Check ``filter_args`` on functions with plain and defaulted parameters.

    Each ``args`` entry is ``[ignore_lst, positional_args]`` with an optional
    third item holding the keyword arguments; it is unpacked into the call.
    """
    result = filter_args(func, *args)
    assert result == filtered_args
|
|
|
|
def test_filter_args_method():
    """A bound method's instance must be reported under the ``"self"`` key."""
    instance = Klass()
    expected = {"x": 1, "self": instance}
    assert filter_args(instance.f, [], (1,)) == expected
|
|
|
|
@parametrize(
    "func,args,filtered_args",
    [
        (h, [[], (1,)], {"x": 1, "y": 0, "*": [], "**": {}}),
        (h, [[], (1, 2, 3, 4)], {"x": 1, "y": 2, "*": [3, 4], "**": {}}),
        (h, [[], (1, 25), {"ee": 2}], {"x": 1, "y": 25, "*": [], "**": {"ee": 2}}),
        (h, [["*"], (1, 2, 25), {"ee": 2}], {"x": 1, "y": 2, "**": {"ee": 2}}),
    ],
)
def test_filter_varargs(func, args, filtered_args):
    """Check how extra positional and keyword arguments are reported.

    The expected mappings store surplus positionals under ``"*"`` and surplus
    keywords under ``"**"``; ``"*"`` can itself appear in the ignore list.
    """
    result = filter_args(func, *args)
    assert result == filtered_args
|
|
|
|
# Extra ``(func, args, filtered_args)`` cases for keyword-only parameters;
# they are appended to the parametrization of ``test_filter_kwargs``.
test_filter_kwargs_extra_params = [
    (m1, [[], (1,), {"y": 2}], {"x": 1, "y": 2}),
    (m2, [[], (1,), {"y": 2}], {"x": 1, "y": 2, "z": 3}),
]
|
|
|
|
@parametrize(
    "func,args,filtered_args",
    [
        (k, [[], (1, 2), {"ee": 2}], {"*": [1, 2], "**": {"ee": 2}}),
        (k, [[], (3, 4)], {"*": [3, 4], "**": {}}),
        *test_filter_kwargs_extra_params,
    ],
)
def test_filter_kwargs(func, args, filtered_args):
    """Check ``filter_args`` on catch-all and keyword-only signatures.

    The cases combine the ``k(*args, **kwargs)`` examples with the entries of
    ``test_filter_kwargs_extra_params``.
    """
    result = filter_args(func, *args)
    assert result == filtered_args
|
|
|
|
def test_filter_args_2():
    """Check ``**kwargs`` capture on ``j`` and the handling of partials."""
    expected_for_j = {"x": 1, "y": 2, "**": {"ee": 2}}
    assert filter_args(j, [], (1, 2), {"ee": 2}) == expected_for_j

    # For a functools.partial object the expected result only carries the
    # catch-all "*" / "**" entries, whether or not "y" is in the ignore list.
    partial_f = functools.partial(f, 1)
    for ignore_lst in ([], ["y"]):
        assert filter_args(partial_f, ignore_lst, (1,)) == {"*": [1], "**": {}}
|
|
|
|
@parametrize("func,funcname", [(f, "f"), (g, "g"), (cached_func, "cached_func")])
def test_func_name(func, funcname):
    """Check the name part of ``get_func_name`` for module-level objects.

    The ``cached_func`` entry is the module-level fixture object itself, not
    the decorated function that the fixture provides to tests.
    """
    _, name = get_func_name(func)
    assert name == funcname
|
|
|
|
def test_func_name_on_inner_func(cached_func):
    """Check the name reported for the function provided by the fixture.

    Here ``cached_func`` is the fixture value, i.e. the decorated
    ``cached_func_inner`` function, so that inner name is expected.
    """
    _, name = get_func_name(cached_func)
    assert name == "cached_func_inner"
|
|
|
|
def test_func_name_collision_on_inner_func():
    """Same-named inner functions of different scopes must not collide.

    Two enclosing functions each define an ``inner_func``; the reported names
    must be equal while the reported module parts must differ.
    """

    def first_scope():
        def inner_func():
            return

        return get_func_name(inner_func)

    def second_scope():
        def inner_func():
            return

        return get_func_name(inner_func)

    first_module, first_name = first_scope()
    second_module, second_name = second_scope()

    assert first_name == second_name
    assert first_module != second_module
|
|
|
|
def test_func_inspect_errors():
    """Check ``get_func_name``/``get_func_code`` on a builtin and a lambda."""
    # A builtin bound method: the name is found, while the source file and
    # first line are reported as None and -1.
    builtin_method = "a".lower
    assert get_func_name(builtin_method)[-1] == "lower"
    assert get_func_code(builtin_method)[1:] == (None, -1)

    this_file = __file__.replace(".pyc", ".py")

    # This has to stay a lambda: the "<lambda>" name is what is asserted.
    ff = lambda x: x  # noqa: E731
    assert get_func_name(ff, win_characters=False)[-1] == "<lambda>"
    assert get_func_code(ff)[1] == this_file

    # Same checks once the lambda claims to be defined in ``__main__``.
    ff.__module__ = "__main__"
    assert get_func_name(ff, win_characters=False)[-1] == "<lambda>"
    assert get_func_code(ff)[1] == this_file
|
|
|
|
def func_with_kwonly_args(a, b, *, kw1="kw1", kw2="kw2"):
    """Signature fixture: two positionals and two defaulted keyword-only."""
|
|
|
|
def func_with_signature(a: int, b: int) -> None:
    """Signature fixture: fully annotated parameters and return; no-op."""
|
|
|
|
def test_filter_args_edge_cases():
    """Exercise ``filter_args`` with keyword-only and annotated parameters."""
    all_bound = filter_args(func_with_kwonly_args, [], (1, 2), {"kw1": 3, "kw2": 4})
    assert all_bound == {"a": 1, "b": 2, "kw1": 3, "kw2": 4}

    # A third positional value has no positional parameter left to bind to;
    # the expected error names the keyword-only parameter ``kw1``.
    with raises(ValueError) as excinfo:
        filter_args(func_with_kwonly_args, [], (1, 2, 3), {"kw2": 2})
    excinfo.match("Keyword-only parameter 'kw1' was passed as positional parameter")

    # Ignoring one positional and one keyword-only parameter.
    partly_ignored = filter_args(
        func_with_kwonly_args, ["b", "kw2"], (1, 2), {"kw1": 3, "kw2": 4}
    )
    assert partly_ignored == {"a": 1, "kw1": 3}

    # An annotated function is filtered like any other.
    assert filter_args(func_with_signature, ["b"], (1, 2)) == {"a": 1}
|
|
|
|
def test_bound_methods():
    """Make sure that calling the same method on two different instances
    of the same class resolves to different signatures.
    """
    first, second = Klass(), Klass()
    assert filter_args(first.f, [], (1,)) != filter_args(second.f, [], (1,))
|
|
|
|
@parametrize(
    "exception,regex,func,args",
    [
        (
            ValueError,
            "ignore_lst must be a list of parameters to ignore",
            f,
            ["bar", (None,)],
        ),
        (
            ValueError,
            r"Ignore list: argument \'(.*)\' is not defined",
            g,
            [["bar"], (None,)],
        ),
        (ValueError, "Wrong number of arguments", h, [[]]),
    ],
)
def test_filter_args_error_msg(exception, regex, func, args):
    """Check that invalid ``filter_args`` calls fail with a helpful message.

    Each case gives the exception type, a regex the message must match, the
    function under inspection and the arguments unpacked into the call.
    """
    with raises(exception) as excinfo:
        filter_args(func, *args)
    excinfo.match(regex)
|
|
|
|
def test_filter_args_no_kwargs_mutation():
    """Non-regression test against 0.12.0 changes.

    https://github.com/joblib/joblib/pull/75

    Make sure filter_args doesn't mutate the kwargs dict that gets passed in.
    """
    kwargs = {"x": 0}
    snapshot = dict(kwargs)
    filter_args(g, [], [], kwargs)
    assert kwargs == snapshot
|
|
|
|
def test_clean_win_chars():
    """None of the listed special characters may survive ``_clean_win_chars``."""
    forbidden_chars = ("\\", ":", "<", ">", "!")
    mangled = _clean_win_chars(r"C:\foo\bar\main.py")
    for forbidden in forbidden_chars:
        assert forbidden not in mangled
|
|
|
|
@parametrize(
    "func,args,kwargs,sgn_expected",
    [
        (g, [list(range(5))], {}, "g([0, 1, 2, 3, 4])"),
        (k, [1, 2, (3, 4)], {"y": True}, "k(1, 2, (3, 4), y=True)"),
    ],
)
def test_format_signature(func, args, kwargs, sgn_expected):
    """Check the call signature rendered by ``format_signature``.

    ``format_signature`` returns a pair; only its second item, the rendered
    signature string, is compared here, so the first item is discarded.
    """
    _, sgn_result = format_signature(func, *args, **kwargs)
    assert sgn_result == sgn_expected
|
|
|
|
def test_format_signature_long_arguments():
    """Check that very long arguments yield a bounded signature length."""
    shortening_threshold = 1500
    # NOTE(review): presumably each long argument is shortened to roughly 700
    # characters, with 10 more allowed for the surrounding text -- confirm
    # against format_signature.
    shortening_target = 700 + 10

    long_arg = "a" * shortening_threshold
    nb_args = 5
    positional = [long_arg] * nb_args
    keywords = {str(index): value for index, value in enumerate(positional)}

    # One long positional argument.
    _, signature = format_signature(h, long_arg)
    assert len(signature) < shortening_target

    # Several long positional arguments.
    _, signature = format_signature(h, *positional)
    assert len(signature) < shortening_target * nb_args

    # Several long keyword arguments.
    _, signature = format_signature(h, **keywords)
    assert len(signature) < shortening_target * nb_args

    # Long positional and keyword arguments together.
    _, signature = format_signature(h, *positional, **keywords)
    assert len(signature) < shortening_target * 2 * nb_args
|
|
|
|
@with_numpy
def test_format_signature_numpy():
    """Test the format signature formatting with numpy.

    Placeholder: the body currently holds no assertion.
    """
|
|
|
|
def test_special_source_encoding():
    """Check ``get_func_code`` on ``big5_f`` from the special-encoding module."""
    from joblib.test.test_func_inspect_special_encoding import big5_f

    code_info = get_func_code(big5_f)
    func_code, source_file, first_line = code_info

    assert first_line == 5
    assert "def big5_f():" in func_code
    assert "test_func_inspect_special_encoding" in source_file
|
|
|
|
def _get_code():
    """Return the source code string that ``get_func_code`` gives for ``big5_f``."""
    from joblib.test.test_func_inspect_special_encoding import big5_f

    source, _, _ = get_func_code(big5_f)
    return source
|
|
|
|
def test_func_code_consistency():
    """Five ``_get_code`` calls run through ``Parallel`` must all agree."""
    from joblib.parallel import Parallel, delayed

    tasks = (delayed(_get_code)() for _ in range(5))
    codes = Parallel(n_jobs=2)(tasks)
    assert len(set(codes)) == 1
|
|