Spaces:
Runtime error
Runtime error
| import sys, re, json, pprint | |
| from pathlib import Path | |
| from collections import defaultdict | |
| from inspect import currentframe, getframeinfo, ismodule | |
| __all__ = ['this_tests'] | |
| DB_NAME = 'test_registry.json' | |
| def _json_set_default(obj): | |
| if isinstance(obj, set): return list(obj) | |
| raise TypeError | |
| class TestRegistry: | |
| "Tests register which API they validate using this class." | |
| registry = defaultdict(list) | |
| this_tests_check = None | |
| missing_this_tests = set() | |
| # logic for checking whether each test calls `this_tests`: | |
| # 1. `this_tests_check` is set to True during test's 'setup' stage if it wasn't skipped | |
| # 2. if the test is dynamically skipped `this_tests_check` is set to False | |
| # 3. `this_tests` sets this flag to False when it's successfully completes | |
| # 4. if during the 'teardown' stage `this_tests_check` is still True then we | |
| # know that this test needs `this_tests_check` | |
| def this_tests(*funcs): | |
| prev_frame = currentframe().f_back.f_back | |
| file_name, lineno, test_name, _, _ = getframeinfo(prev_frame) | |
| parent_func_lineno, _ = get_parent_func(lineno, get_lines(file_name)) | |
| entry = {'file': relative_test_path(file_name), 'test': test_name , 'line': parent_func_lineno} | |
| for func in funcs: | |
| if func == 'na': | |
| # special case when we can't find a function to declare, e.g. | |
| # when attributes are tested | |
| continue | |
| try: | |
| func_fq = get_func_fq_name(func) | |
| except: | |
| raise Exception(f"'{func}' is not a function") from None | |
| if re.match(r'fastai\.', func_fq): | |
| if entry not in TestRegistry.registry[func_fq]: | |
| TestRegistry.registry[func_fq].append(entry) | |
| else: | |
| raise Exception(f"'{func}' is not in the fastai API") from None | |
| TestRegistry.this_tests_check = False | |
| def this_tests_check_on(): | |
| TestRegistry.this_tests_check = True | |
| def this_tests_check_off(): | |
| TestRegistry.this_tests_check = False | |
| def this_tests_check_run(file_name, test_name): | |
| if TestRegistry.this_tests_check: | |
| TestRegistry.missing_this_tests.add(f"{file_name}::{test_name}") | |
| def registry_save(): | |
| if TestRegistry.registry: | |
| path = Path(__file__).parent.parent.resolve()/DB_NAME | |
| if path.exists(): | |
| #print("\n*** Merging with the existing test registry") | |
| with open(path, 'r') as f: old_registry = json.load(f) | |
| TestRegistry.registry = merge_registries(old_registry, TestRegistry.registry) | |
| #print(f"\n*** Saving test registry @ {path}") | |
| with open(path, 'w') as f: | |
| json.dump(obj=TestRegistry.registry, fp=f, indent=4, sort_keys=True, default=_json_set_default) | |
| def missing_this_tests_alert(): | |
| if TestRegistry.missing_this_tests: | |
| tests = '\n '.join(sorted(TestRegistry.missing_this_tests)) | |
| print(f""" | |
| *** Attention *** | |
| Please include `this_tests` call in each of the following tests: | |
| {tests} | |
| For details see: https://docs.fast.ai/dev/test.html#test-registry""") | |
| # merge_registries helpers | |
| # merge dict of lists of dict | |
| def a2k(a): return '::'.join([a['file'], a['test']]), a['line'] | |
| def k2a(k, v): f,t = k.split('::'); return {"file": f, "line": v, "test": t} | |
| # merge by key that is a combination of 2 values: test, file | |
| def merge_lists(a, b): | |
| x = dict(map(a2k, [*a, *b])) # pack + merge | |
| return [k2a(k, v) for k,v in x.items()] # unpack | |
| def merge_registries(a, b): | |
| for i in b: a[i] = merge_lists(a[i], b[i]) if i in a else b[i] | |
| return a | |
| def this_tests(*funcs): TestRegistry.this_tests(*funcs) | |
| def str2func(name): | |
| "Converts 'fastai.foo.bar' into an function 'object' if such exists" | |
| if isinstance(name, str): subpaths = name.split('.') | |
| else: return None | |
| module = subpaths.pop(0) | |
| if module in sys.modules: obj = sys.modules[module] | |
| else: return None | |
| for subpath in subpaths: | |
| obj = getattr(obj, subpath, None) | |
| if obj == None: return None | |
| return obj | |
| def get_func_fq_name(func): | |
| if ismodule(func): return func.__name__ | |
| if isinstance(func, str): func = str2func(func) | |
| name = None | |
| if hasattr(func, '__qualname__'): name = func.__qualname__ | |
| elif hasattr(func, '__name__'): name = func.__name__ | |
| elif hasattr(func, '__wrapped__'): return get_func_fq_name(func.__wrapped__) | |
| elif hasattr(func, '__class__'): name = func.__class__.__name__ | |
| else: raise Exception(f"'{func}' is not a func or class") | |
| return f'{func.__module__}.{name}' | |
| def get_parent_func(lineno, lines, ignore_missing=False): | |
| "Find any lines where `elt` is called and return the parent test function" | |
| for idx,l in enumerate(reversed(lines[:lineno])): | |
| if re.match(f'\s*def test', l): return (lineno - idx), l # 1 based index for github | |
| if re.match(f'\w+', l): break # top level indent - out of function scope | |
| if ignore_missing: return None | |
| raise LookupError('Could not find parent function for line:', lineno, lines[:lineno]) | |
| def relative_test_path(test_file:Path)->str: | |
| "Path relative to the `fastai` parent directory" | |
| test_file = Path(test_file) | |
| testdir_idx = list(reversed(test_file.parts)).index('tests') | |
| return '/'.join(test_file.parts[-(testdir_idx+1):]) | |
| def get_lines(file): | |
| with open(file, 'r') as f: return f.readlines() | |