| | """Tests for autoreload extension. |
| | """ |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| |
|
| | |
| | |
| | |
| |
|
| | import os |
| | import platform |
| | import pytest |
| | import sys |
| | import tempfile |
| | import textwrap |
| | import shutil |
| | import random |
| | import time |
| | from io import StringIO |
| | from dataclasses import dataclass |
| |
|
| | import IPython.testing.tools as tt |
| |
|
| | from unittest import TestCase |
| |
|
| | from IPython.extensions.autoreload import AutoreloadMagics |
| | from IPython.core.events import EventManager, pre_run_cell |
| | from IPython.testing.decorators import skipif_not_numpy |
| |
|
| | if platform.python_implementation() == "PyPy": |
| | pytest.skip( |
| | "Current autoreload implementation is extremely slow on PyPy", |
| | allow_module_level=True, |
| | ) |
| |
|
| | |
| | |
| | |
| |
|
| | noop = lambda *a, **kw: None |
| |
|
| |
|
| | class FakeShell: |
| | def __init__(self): |
| | self.ns = {} |
| | self.user_ns = self.ns |
| | self.user_ns_hidden = {} |
| | self.events = EventManager(self, {"pre_run_cell", pre_run_cell}) |
| | self.auto_magics = AutoreloadMagics(shell=self) |
| | self.events.register("pre_run_cell", self.auto_magics.pre_run_cell) |
| |
|
| | register_magics = set_hook = noop |
| |
|
| | def run_code(self, code): |
| | self.events.trigger("pre_run_cell") |
| | exec(code, self.user_ns) |
| | self.auto_magics.post_execute_hook() |
| |
|
| | def push(self, items): |
| | self.ns.update(items) |
| |
|
| | def magic_autoreload(self, parameter): |
| | self.auto_magics.autoreload(parameter) |
| |
|
| | def magic_aimport(self, parameter, stream=None): |
| | self.auto_magics.aimport(parameter, stream=stream) |
| | self.auto_magics.post_execute_hook() |
| |
|
| |
|
| | class Fixture(TestCase): |
| | """Fixture for creating test module files""" |
| |
|
| | test_dir = None |
| | old_sys_path = None |
| | filename_chars = "abcdefghijklmopqrstuvwxyz0123456789" |
| |
|
| | def setUp(self): |
| | self.test_dir = tempfile.mkdtemp() |
| | self.old_sys_path = list(sys.path) |
| | sys.path.insert(0, self.test_dir) |
| | self.shell = FakeShell() |
| |
|
| | def tearDown(self): |
| | shutil.rmtree(self.test_dir) |
| | sys.path = self.old_sys_path |
| |
|
| | self.test_dir = None |
| | self.old_sys_path = None |
| | self.shell = None |
| |
|
| | def get_module(self): |
| | module_name = "tmpmod_" + "".join(random.sample(self.filename_chars, 20)) |
| | if module_name in sys.modules: |
| | del sys.modules[module_name] |
| | file_name = os.path.join(self.test_dir, module_name + ".py") |
| | return module_name, file_name |
| |
|
| | def write_file(self, filename, content): |
| | """ |
| | Write a file, and force a timestamp difference of at least one second |
| | |
| | Notes |
| | ----- |
| | Python's .pyc files record the timestamp of their compilation |
| | with a time resolution of one second. |
| | |
| | Therefore, we need to force a timestamp difference between .py |
| | and .pyc, without having the .py file be timestamped in the |
| | future, and without changing the timestamp of the .pyc file |
| | (because that is stored in the file). The only reliable way |
| | to achieve this seems to be to sleep. |
| | """ |
| | content = textwrap.dedent(content) |
| | |
| | time.sleep(1.05) |
| |
|
| | |
| | with open(filename, "w", encoding="utf-8") as f: |
| | f.write(content) |
| |
|
| | def new_module(self, code): |
| | code = textwrap.dedent(code) |
| | mod_name, mod_fn = self.get_module() |
| | with open(mod_fn, "w", encoding="utf-8") as f: |
| | f.write(code) |
| | return mod_name, mod_fn |
| |
|
| |
|
| | |
| | |
| | |
| |
|
| |
|
| | def pickle_get_current_class(obj): |
| | """ |
| | Original issue comes from pickle; hence the name. |
| | """ |
| | name = obj.__class__.__name__ |
| | module_name = getattr(obj, "__module__", None) |
| | obj2 = sys.modules[module_name] |
| | for subpath in name.split("."): |
| | obj2 = getattr(obj2, subpath) |
| | return obj2 |
| |
|
| |
|
| | class TestAutoreload(Fixture): |
| | def test_reload_enums(self): |
| | mod_name, mod_fn = self.new_module( |
| | textwrap.dedent( |
| | """ |
| | from enum import Enum |
| | class MyEnum(Enum): |
| | A = 'A' |
| | B = 'B' |
| | """ |
| | ) |
| | ) |
| | self.shell.magic_autoreload("2") |
| | self.shell.magic_aimport(mod_name) |
| | self.write_file( |
| | mod_fn, |
| | textwrap.dedent( |
| | """ |
| | from enum import Enum |
| | class MyEnum(Enum): |
| | A = 'A' |
| | B = 'B' |
| | C = 'C' |
| | """ |
| | ), |
| | ) |
| | with tt.AssertNotPrints( |
| | ("[autoreload of %s failed:" % mod_name), channel="stderr" |
| | ): |
| | self.shell.run_code("pass") |
| |
|
| | def test_reload_class_type(self): |
| | self.shell.magic_autoreload("2") |
| | mod_name, mod_fn = self.new_module( |
| | """ |
| | class Test(): |
| | def meth(self): |
| | return "old" |
| | """ |
| | ) |
| | assert "test" not in self.shell.ns |
| | assert "result" not in self.shell.ns |
| |
|
| | self.shell.run_code("from %s import Test" % mod_name) |
| | self.shell.run_code("test = Test()") |
| |
|
| | self.write_file( |
| | mod_fn, |
| | """ |
| | class Test(): |
| | def meth(self): |
| | return "new" |
| | """, |
| | ) |
| |
|
| | test_object = self.shell.ns["test"] |
| |
|
| | |
| | self.shell.run_code("pass") |
| |
|
| | test_class = pickle_get_current_class(test_object) |
| | assert isinstance(test_object, test_class) |
| |
|
| | |
| | self.shell.run_code("import pickle") |
| | self.shell.run_code("p = pickle.dumps(test)") |
| |
|
| | def test_reload_class_attributes(self): |
| | self.shell.magic_autoreload("2") |
| | mod_name, mod_fn = self.new_module( |
| | textwrap.dedent( |
| | """ |
| | class MyClass: |
| | |
| | def __init__(self, a=10): |
| | self.a = a |
| | self.b = 22 |
| | # self.toto = 33 |
| | |
| | def square(self): |
| | print('compute square') |
| | return self.a*self.a |
| | """ |
| | ) |
| | ) |
| | self.shell.run_code("from %s import MyClass" % mod_name) |
| | self.shell.run_code("first = MyClass(5)") |
| | self.shell.run_code("first.square()") |
| | with self.assertRaises(AttributeError): |
| | self.shell.run_code("first.cube()") |
| | with self.assertRaises(AttributeError): |
| | self.shell.run_code("first.power(5)") |
| | self.shell.run_code("first.b") |
| | with self.assertRaises(AttributeError): |
| | self.shell.run_code("first.toto") |
| |
|
| | |
| |
|
| | self.write_file( |
| | mod_fn, |
| | textwrap.dedent( |
| | """ |
| | class MyClass: |
| | |
| | def __init__(self, a=10): |
| | self.a = a |
| | self.b = 11 |
| | |
| | def power(self, p): |
| | print('compute power '+str(p)) |
| | return self.a**p |
| | """ |
| | ), |
| | ) |
| |
|
| | self.shell.run_code("second = MyClass(5)") |
| |
|
| | for object_name in {"first", "second"}: |
| | self.shell.run_code(f"{object_name}.power(5)") |
| | with self.assertRaises(AttributeError): |
| | self.shell.run_code(f"{object_name}.cube()") |
| | with self.assertRaises(AttributeError): |
| | self.shell.run_code(f"{object_name}.square()") |
| | self.shell.run_code(f"{object_name}.b") |
| | self.shell.run_code(f"{object_name}.a") |
| | with self.assertRaises(AttributeError): |
| | self.shell.run_code(f"{object_name}.toto") |
| |
|
| | @skipif_not_numpy |
| | def test_comparing_numpy_structures(self): |
| | self.shell.magic_autoreload("2") |
| | mod_name, mod_fn = self.new_module( |
| | textwrap.dedent( |
| | """ |
| | import numpy as np |
| | class MyClass: |
| | a = (np.array((.1, .2)), |
| | np.array((.2, .3))) |
| | """ |
| | ) |
| | ) |
| | self.shell.run_code("from %s import MyClass" % mod_name) |
| | self.shell.run_code("first = MyClass()") |
| |
|
| | |
| | self.write_file( |
| | mod_fn, |
| | textwrap.dedent( |
| | """ |
| | import numpy as np |
| | class MyClass: |
| | a = (np.array((.3, .4)), |
| | np.array((.5, .6))) |
| | """ |
| | ), |
| | ) |
| |
|
| | with tt.AssertNotPrints( |
| | ("[autoreload of %s failed:" % mod_name), channel="stderr" |
| | ): |
| | self.shell.run_code("pass") |
| |
|
| | def test_autoload_newly_added_objects(self): |
| | |
| | self.shell.magic_autoreload("3") |
| | mod_code = """ |
| | def func1(): pass |
| | """ |
| | mod_name, mod_fn = self.new_module(textwrap.dedent(mod_code)) |
| | self.shell.run_code(f"from {mod_name} import *") |
| | self.shell.run_code("func1()") |
| | with self.assertRaises(NameError): |
| | self.shell.run_code("func2()") |
| | with self.assertRaises(NameError): |
| | self.shell.run_code("t = Test()") |
| | with self.assertRaises(NameError): |
| | self.shell.run_code("number") |
| |
|
| | |
| |
|
| | new_code = """ |
| | def func1(): pass |
| | def func2(): pass |
| | class Test: pass |
| | number = 0 |
| | from enum import Enum |
| | class TestEnum(Enum): |
| | A = 'a' |
| | """ |
| | self.write_file(mod_fn, textwrap.dedent(new_code)) |
| |
|
| | |
| | self.shell.run_code("func2()") |
| | |
| | self.shell.run_code(f"import sys; sys.modules['{mod_name}'].func2()") |
| | |
| | self.shell.run_code("t = Test()") |
| | |
| | self.shell.run_code("number") |
| | |
| | self.shell.run_code("TestEnum.A") |
| |
|
| | |
| |
|
| | new_code = """ |
| | def func1(): return 'changed' |
| | def func2(): return 'changed' |
| | class Test: |
| | def new_func(self): |
| | return 'changed' |
| | number = 1 |
| | from enum import Enum |
| | class TestEnum(Enum): |
| | A = 'a' |
| | B = 'added' |
| | """ |
| | self.write_file(mod_fn, textwrap.dedent(new_code)) |
| | self.shell.run_code("assert func1() == 'changed'") |
| | self.shell.run_code("assert func2() == 'changed'") |
| | self.shell.run_code("t = Test(); assert t.new_func() == 'changed'") |
| | self.shell.run_code("assert number == 1") |
| | if sys.version_info < (3, 12): |
| | self.shell.run_code("assert TestEnum.B.value == 'added'") |
| |
|
| | |
| |
|
| | new_mod_code = """ |
| | from enum import Enum |
| | class Ext(Enum): |
| | A = 'ext' |
| | def ext_func(): |
| | return 'ext' |
| | class ExtTest: |
| | def meth(self): |
| | return 'ext' |
| | ext_int = 2 |
| | """ |
| | new_mod_name, new_mod_fn = self.new_module(textwrap.dedent(new_mod_code)) |
| | current_mod_code = f""" |
| | from {new_mod_name} import * |
| | """ |
| | self.write_file(mod_fn, textwrap.dedent(current_mod_code)) |
| | self.shell.run_code("assert Ext.A.value == 'ext'") |
| | self.shell.run_code("assert ext_func() == 'ext'") |
| | self.shell.run_code("t = ExtTest(); assert t.meth() == 'ext'") |
| | self.shell.run_code("assert ext_int == 2") |
| |
|
| | def test_verbose_names(self): |
| | |
| | @dataclass |
| | class AutoreloadSettings: |
| | check_all: bool |
| | enabled: bool |
| | autoload_obj: bool |
| |
|
| | def gather_settings(mode): |
| | self.shell.magic_autoreload(mode) |
| | module_reloader = self.shell.auto_magics._reloader |
| | return AutoreloadSettings( |
| | module_reloader.check_all, |
| | module_reloader.enabled, |
| | module_reloader.autoload_obj, |
| | ) |
| |
|
| | assert gather_settings("0") == gather_settings("off") |
| | assert gather_settings("0") == gather_settings("OFF") |
| | assert gather_settings("1") == gather_settings("explicit") |
| | assert gather_settings("2") == gather_settings("all") |
| | assert gather_settings("3") == gather_settings("complete") |
| |
|
| | |
| | with self.assertRaises(ValueError): |
| | self.shell.magic_autoreload("4") |
| |
|
| | def test_aimport_parsing(self): |
| | |
| | module_reloader = self.shell.auto_magics._reloader |
| | self.shell.magic_aimport("os") |
| | assert module_reloader.modules["os"] is True |
| | assert "os" not in module_reloader.skip_modules.keys() |
| |
|
| | self.shell.magic_aimport("-math") |
| | assert module_reloader.skip_modules["math"] is True |
| | assert "math" not in module_reloader.modules.keys() |
| |
|
| | self.shell.magic_aimport( |
| | "-os, math" |
| | ) |
| | assert module_reloader.modules["math"] is True |
| | assert "math" not in module_reloader.skip_modules.keys() |
| | assert module_reloader.skip_modules["os"] is True |
| | assert "os" not in module_reloader.modules.keys() |
| |
|
| | def test_autoreload_output(self): |
| | self.shell.magic_autoreload("complete") |
| | mod_code = """ |
| | def func1(): pass |
| | """ |
| | mod_name, mod_fn = self.new_module(mod_code) |
| | self.shell.run_code(f"import {mod_name}") |
| | with tt.AssertPrints("", channel="stdout"): |
| | self.shell.run_code("pass") |
| |
|
| | self.shell.magic_autoreload("complete --print") |
| | self.write_file(mod_fn, mod_code) |
| | with tt.AssertPrints( |
| | f"Reloading '{mod_name}'.", channel="stdout" |
| | ): |
| | self.shell.run_code("pass") |
| |
|
| | self.shell.magic_autoreload("complete -p") |
| | self.write_file(mod_fn, mod_code) |
| | with tt.AssertPrints( |
| | f"Reloading '{mod_name}'.", channel="stdout" |
| | ): |
| | self.shell.run_code("pass") |
| |
|
| | self.shell.magic_autoreload("complete --print --log") |
| | self.write_file(mod_fn, mod_code) |
| | with tt.AssertPrints( |
| | f"Reloading '{mod_name}'.", channel="stdout" |
| | ): |
| | self.shell.run_code("pass") |
| |
|
| | self.shell.magic_autoreload("complete --print --log") |
| | self.write_file(mod_fn, mod_code) |
| | with self.assertLogs(logger="autoreload") as lo: |
| | self.shell.run_code("pass") |
| | assert lo.output == [f"INFO:autoreload:Reloading '{mod_name}'."] |
| |
|
| | self.shell.magic_autoreload("complete -l") |
| | self.write_file(mod_fn, mod_code) |
| | with self.assertLogs(logger="autoreload") as lo: |
| | self.shell.run_code("pass") |
| | assert lo.output == [f"INFO:autoreload:Reloading '{mod_name}'."] |
| |
|
| | def _check_smoketest(self, use_aimport=True): |
| | """ |
| | Functional test for the automatic reloader using either |
| | '%autoreload 1' or '%autoreload 2' |
| | """ |
| |
|
| | mod_name, mod_fn = self.new_module( |
| | """ |
| | x = 9 |
| | |
| | z = 123 # this item will be deleted |
| | |
| | def foo(y): |
| | return y + 3 |
| | |
| | class Baz(object): |
| | def __init__(self, x): |
| | self.x = x |
| | def bar(self, y): |
| | return self.x + y |
| | @property |
| | def quux(self): |
| | return 42 |
| | def zzz(self): |
| | '''This method will be deleted below''' |
| | return 99 |
| | |
| | class Bar: # old-style class: weakref doesn't work for it on Python < 2.7 |
| | def foo(self): |
| | return 1 |
| | """ |
| | ) |
| |
|
| | |
| | |
| | |
| | if use_aimport: |
| | self.shell.magic_autoreload("1") |
| | self.shell.magic_aimport(mod_name) |
| | stream = StringIO() |
| | self.shell.magic_aimport("", stream=stream) |
| | self.assertIn(("Modules to reload:\n%s" % mod_name), stream.getvalue()) |
| |
|
| | with self.assertRaises(ImportError): |
| | self.shell.magic_aimport("tmpmod_as318989e89ds") |
| | else: |
| | self.shell.magic_autoreload("2") |
| | self.shell.run_code("import %s" % mod_name) |
| | stream = StringIO() |
| | self.shell.magic_aimport("", stream=stream) |
| | self.assertTrue( |
| | "Modules to reload:\nall-except-skipped" in stream.getvalue() |
| | ) |
| | self.assertIn(mod_name, self.shell.ns) |
| |
|
| | mod = sys.modules[mod_name] |
| |
|
| | |
| | |
| | |
| | old_foo = mod.foo |
| | old_obj = mod.Baz(9) |
| | old_obj2 = mod.Bar() |
| |
|
| | def check_module_contents(): |
| | self.assertEqual(mod.x, 9) |
| | self.assertEqual(mod.z, 123) |
| |
|
| | self.assertEqual(old_foo(0), 3) |
| | self.assertEqual(mod.foo(0), 3) |
| |
|
| | obj = mod.Baz(9) |
| | self.assertEqual(old_obj.bar(1), 10) |
| | self.assertEqual(obj.bar(1), 10) |
| | self.assertEqual(obj.quux, 42) |
| | self.assertEqual(obj.zzz(), 99) |
| |
|
| | obj2 = mod.Bar() |
| | self.assertEqual(old_obj2.foo(), 1) |
| | self.assertEqual(obj2.foo(), 1) |
| |
|
| | check_module_contents() |
| |
|
| | |
| | |
| | |
| | |
| | self.write_file( |
| | mod_fn, |
| | """ |
| | a syntax error |
| | """, |
| | ) |
| |
|
| | with tt.AssertPrints( |
| | ("[autoreload of %s failed:" % mod_name), channel="stderr" |
| | ): |
| | self.shell.run_code("pass") |
| | with tt.AssertNotPrints( |
| | ("[autoreload of %s failed:" % mod_name), channel="stderr" |
| | ): |
| | self.shell.run_code("pass") |
| | check_module_contents() |
| |
|
| | |
| | |
| | |
| | self.write_file( |
| | mod_fn, |
| | """ |
| | x = 10 |
| | |
| | def foo(y): |
| | return y + 4 |
| | |
| | class Baz(object): |
| | def __init__(self, x): |
| | self.x = x |
| | def bar(self, y): |
| | return self.x + y + 1 |
| | @property |
| | def quux(self): |
| | return 43 |
| | |
| | class Bar: # old-style class |
| | def foo(self): |
| | return 2 |
| | """, |
| | ) |
| |
|
| | def check_module_contents(): |
| | self.assertEqual(mod.x, 10) |
| | self.assertFalse(hasattr(mod, "z")) |
| |
|
| | self.assertEqual(old_foo(0), 4) |
| | self.assertEqual(mod.foo(0), 4) |
| |
|
| | obj = mod.Baz(9) |
| | self.assertEqual(old_obj.bar(1), 11) |
| | self.assertEqual(obj.bar(1), 11) |
| |
|
| | self.assertEqual(old_obj.quux, 43) |
| | self.assertEqual(obj.quux, 43) |
| |
|
| | self.assertFalse(hasattr(old_obj, "zzz")) |
| | self.assertFalse(hasattr(obj, "zzz")) |
| |
|
| | obj2 = mod.Bar() |
| | self.assertEqual(old_obj2.foo(), 2) |
| | self.assertEqual(obj2.foo(), 2) |
| |
|
| | self.shell.run_code("pass") |
| | check_module_contents() |
| |
|
| | |
| | |
| | |
| | os.unlink(mod_fn) |
| |
|
| | self.shell.run_code("pass") |
| | check_module_contents() |
| |
|
| | |
| | |
| | |
| | if use_aimport: |
| | self.shell.magic_aimport("-" + mod_name) |
| | stream = StringIO() |
| | self.shell.magic_aimport("", stream=stream) |
| | self.assertTrue(("Modules to skip:\n%s" % mod_name) in stream.getvalue()) |
| |
|
| | |
| | self.shell.magic_aimport("-tmpmod_as318989e89ds") |
| | else: |
| | self.shell.magic_autoreload("0") |
| |
|
| | self.write_file( |
| | mod_fn, |
| | """ |
| | x = -99 |
| | """, |
| | ) |
| |
|
| | self.shell.run_code("pass") |
| | self.shell.run_code("pass") |
| | check_module_contents() |
| |
|
| | |
| | |
| | |
| | if use_aimport: |
| | self.shell.magic_aimport(mod_name) |
| | else: |
| | self.shell.magic_autoreload("") |
| |
|
| | self.shell.run_code("pass") |
| | self.assertEqual(mod.x, -99) |
| |
|
| | def test_smoketest_aimport(self): |
| | self._check_smoketest(use_aimport=True) |
| |
|
| | def test_smoketest_autoreload(self): |
| | self._check_smoketest(use_aimport=False) |
| |
|