|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
"""
|
|
|
Unit and integration testing for the expanded trace_back module, including analytics, visualization, streaming, plugins, error handling, and benchmarking.
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
|
|
import unittest
|
|
|
import random
|
|
|
import time
|
|
|
import threading
|
|
|
import tempfile
|
|
|
import os
|
|
|
from absl.testing import absltest
|
|
|
import ddar
|
|
|
import graph as gh
|
|
|
import problem as pr
|
|
|
import trace_back as tb
|
|
|
|
|
|
from hypothesis import given, strategies as st
|
|
|
import logging
|
|
|
class _StaticDep:
    """Minimal stand-in for a dependency object with no premises.

    Provides exactly the members the dummy dependencies in these tests
    define: ``hashed()``, ``rule_name``, ``why`` and ``remove_loop()``.
    """

    def __init__(self, name):
        self._name = name

    def __repr__(self):
        # Readable output in assertion messages and hypothesis reports.
        return f"{type(self).__name__}({self._name!r})"

    def hashed(self):
        """Return the name given at construction."""
        return self._name

    @property
    def rule_name(self):
        return ''

    @property
    def why(self):
        return []

    def remove_loop(self):
        return self


class TracebackPropertyBasedTest(unittest.TestCase):
    """Property-based, fuzzing and stress tests for the trace_back module."""

    @given(st.lists(st.text(min_size=1, max_size=5), min_size=1, max_size=10))
    def test_randomized_dummy_traceback(self, names):
        """One log is returned per query for short random name lists."""
        queries = [_StaticDep(n) for n in names]
        logs = tb.parallel_recursive_traceback(queries, max_workers=4)
        self.assertEqual(len(logs), len(names))

    @given(st.lists(st.text(min_size=1, max_size=10), min_size=10, max_size=100))
    def test_large_randomized_traceback(self, names):
        """One log is returned per query for lists of 10 to 100 names."""
        queries = [_StaticDep(n) for n in names]
        logs = tb.parallel_recursive_traceback(queries, max_workers=4)
        self.assertEqual(len(logs), len(names))

    # NOTE(review): the decorator and ``def`` line for this body were missing
    # in the source, leaving ``names`` undefined. The strategy below mirrors
    # test_randomized_dummy_traceback -- confirm against the intended test.
    @given(st.lists(st.text(min_size=1, max_size=5), min_size=1, max_size=10))
    def test_property_based_traceback(self, names):
        """One log is returned per query when only two workers are used."""
        queries = [_StaticDep(n) for n in names]
        logs = tb.parallel_recursive_traceback(queries, max_workers=2)
        self.assertEqual(len(logs), len(names))

    def test_fuzzing_malformed_dependencies(self):
        """safe_traceback yields None or a list when hashed() may raise."""
        # A locally seeded generator keeps the fuzz run reproducible, so a
        # failure can be replayed instead of depending on global random state.
        rng = random.Random(0)

        class MalformedDep(_StaticDep):
            def hashed(self):
                # Roughly half of the calls fail.
                if rng.random() < 0.5:
                    raise Exception("Malformed hash")
                return self._name

        for i in range(20):
            with self.subTest(dep=i):
                result = tb.safe_traceback(MalformedDep(f"bad{i}"))
                self.assertTrue(result is None or isinstance(result, list))

    def test_plugin_chaining_and_dynamic_loading(self):
        """Results are keyed by registration name, including late additions."""

        class PluginA(tb.TracebackPlugin):
            def analyze(self, log):
                return 'A'

        class PluginB(tb.TracebackPlugin):
            def analyze(self, log):
                return 'B'

        pm = tb.TracebackPluginManager()
        pm.register('A', PluginA())
        pm.register('B', PluginB())
        log = [([DummyDep('A')], [DummyDep('B')])]

        results = pm.run_all(log)
        self.assertEqual(results['A'], 'A')
        self.assertEqual(results['B'], 'B')

        # Register further plugins after the first run and run again.
        for i in range(10):
            pm.register(f"dyn{i}", PluginA())
        results = pm.run_all(log)
        self.assertEqual(results['dyn9'], 'A')

    def test_streaming_under_failure(self):
        """At least two steps are delivered before the listener starts failing."""
        log = [([DummyDep('A')], [DummyDep('B')]) for _ in range(5)]
        streamer = tb.TracebackStreamer()
        results = []

        def faulty_listener(step):
            # Once two steps are recorded every later call raises, because
            # ``results`` no longer grows.
            if len(results) == 2:
                raise Exception("Listener failure")
            results.append(step)

        streamer.add_listener(faulty_listener)
        worker = threading.Thread(target=streamer.stream, args=(log,))
        worker.start()
        worker.join()
        # Whether the streamer swallows or propagates the listener error is
        # not asserted here; only the delivered count is checked.
        self.assertGreaterEqual(len(results), 2)

    def test_provenance_export_and_import(self):
        """The exported provenance file exists and mentions 'prems'."""
        log = [([DummyDep('A')], [DummyDep('B')]) for _ in range(3)]
        # Export to a path that does not exist yet, so the existence check
        # proves the export wrote the file; the directory is removed even if
        # an assertion fails.
        with tempfile.TemporaryDirectory() as tmp_dir:
            path = os.path.join(tmp_dir, 'provenance.out')
            tb.export_traceback_provenance(log, path)
            self.assertTrue(os.path.exists(path))
            with open(path, 'r', encoding='utf-8') as fin:
                data = fin.read()
        self.assertIn('prems', data)

    def test_compliance_logging_and_error_propagation(self):
        """An INFO record on 'traceback_compliance' is captured by assertLogs."""
        logger = logging.getLogger('traceback_compliance')
        # Loggers are process-wide; put the previous level back afterwards.
        self.addCleanup(logger.setLevel, logger.level)
        logger.setLevel(logging.INFO)
        with self.assertLogs('traceback_compliance', level='INFO') as cm:
            logger.info('Compliance event')
        self.assertIn('Compliance event', cm.output[0])

    def test_stress_parallel_streaming(self):
        """All 100 steps reach the listener when streaming from a thread."""
        log = [([DummyDep(f"A{i}")], [DummyDep(f"B{i}")]) for i in range(100)]
        streamer = tb.TracebackStreamer()
        results = []
        streamer.add_listener(results.append)
        worker = threading.Thread(target=streamer.stream, args=(log,))
        worker.start()
        worker.join()
        self.assertEqual(len(results), 100)

    def test_empty_and_cyclic(self):
        """Empty logs give zero steps; a self-referencing dep still yields a list."""
        stats = tb.traceback_statistics([])
        self.assertEqual(stats['num_steps'], 0)

        class CyclicDep(_StaticDep):
            @property
            def why(self):
                # The dependency lists itself as its own premise.
                return [self]

        result = tb.safe_traceback(CyclicDep('cycle'))
        self.assertIsInstance(result, list)

    def test_plugin_chaining(self):
        """Each registered plugin's result is returned under its own name."""

        class PluginA(tb.TracebackPlugin):
            def analyze(self, log):
                return 'A'

        class PluginB(tb.TracebackPlugin):
            def analyze(self, log):
                return 'B'

        pm = tb.TracebackPluginManager()
        pm.register('A', PluginA())
        pm.register('B', PluginB())
        log = [([DummyDep('A')], [DummyDep('B')])]
        results = pm.run_all(log)
        self.assertEqual(results['A'], 'A')
        self.assertEqual(results['B'], 'B')

    def test_external_prover_stub(self):
        """The prover callback is invoked once per log step."""
        called = []

        def prover_api(step):
            called.append(step)

        log = [([DummyDep('A')], [DummyDep('B')]) for _ in range(3)]
        tb.integrate_external_prover(log, prover_api)
        self.assertEqual(len(called), 3)

    def test_logging_and_compliance(self):
        """An INFO record on 'traceback_test' is captured by assertLogs."""
        logger = logging.getLogger('traceback_test')
        # Loggers are process-wide; put the previous level back afterwards.
        self.addCleanup(logger.setLevel, logger.level)
        logger.setLevel(logging.INFO)
        with self.assertLogs('traceback_test', level='INFO') as cm:
            logger.info('Compliance log test')
        self.assertIn('Compliance log test', cm.output[0])
|
|
|
|
|
|
|
|
|
|
|
|
class DummyDep:
    """Premise-free dummy dependency shared by the tests in this module.

    Unlike a real dependency it carries only a name. ``rule_name`` is picked
    at random on every access, so repeated reads may differ.
    """

    # Rule names that ``rule_name`` chooses from.
    _RULE_NAMES = ['', 'c0', 'collx', 'coll']

    def __init__(self, name):
        self._name = name

    def __repr__(self):
        # Readable output in assertion failure messages.
        return f"{type(self).__name__}({self._name!r})"

    def hashed(self):
        """Return the name given at construction."""
        return self._name

    @property
    def rule_name(self):
        """Return one of the candidate rule names, chosen at random."""
        return random.choice(self._RULE_NAMES)

    @property
    def why(self):
        """Return an empty premise list."""
        return []

    def remove_loop(self):
        """Return this object unchanged."""
        return self
|
|
|
|
|
|
class TracebackAdvancedTest(unittest.TestCase):
|
|
|
def test_traceback_statistics(self):
|
|
|
log = [[DummyDep('A')], [DummyDep('B')]]
|
|
|
log = [(l, [DummyDep('C')]) for l in log]
|
|
|
stats = tb.traceback_statistics(log)
|
|
|
self.assertIn('num_steps', stats)
|
|
|
|
|
|
def test_export_provenance(self):
|
|
|
log = [([DummyDep('A')], [DummyDep('B')])]
|
|
|
with tempfile.NamedTemporaryFile(delete=False) as f:
|
|
|
tb.export_traceback_provenance(log, f.name)
|
|
|
self.assertTrue(os.path.exists(f.name))
|
|
|
os.remove(f.name)
|
|
|
|
|
|
def test_visualization(self):
|
|
|
log = [([DummyDep('A')], [DummyDep('B')])]
|
|
|
tb.visualize_traceback_graph(log, show=False)
|
|
|
|
|
|
def test_parallel_traceback(self):
|
|
|
queries = [DummyDep(f"Q{i}") for i in range(10)]
|
|
|
logs = tb.parallel_recursive_traceback(queries, max_workers=2)
|
|
|
self.assertEqual(len(logs), 10)
|
|
|
|
|
|
def test_streaming(self):
|
|
|
log = [([DummyDep('A')], [DummyDep('B')]) for _ in range(5)]
|
|
|
streamer = tb.TracebackStreamer()
|
|
|
results = []
|
|
|
streamer.add_listener(lambda step: results.append(step))
|
|
|
t = threading.Thread(target=lambda: streamer.stream(log))
|
|
|
t.start()
|
|
|
t.join()
|
|
|
self.assertEqual(len(results), 5)
|
|
|
|
|
|
def test_plugin_system(self):
|
|
|
class StepCountPlugin(tb.TracebackPlugin):
|
|
|
def analyze(self, log):
|
|
|
return len(log)
|
|
|
pm = tb.TracebackPluginManager()
|
|
|
pm.register("step_count", StepCountPlugin())
|
|
|
log = [([DummyDep('A')], [DummyDep('B')]) for _ in range(3)]
|
|
|
results = pm.run_all(log)
|
|
|
self.assertEqual(results['step_count'], 3)
|
|
|
|
|
|
def test_safe_traceback(self):
|
|
|
class BadDep:
|
|
|
def hashed(self):
|
|
|
raise Exception("fail")
|
|
|
def remove_loop(self):
|
|
|
return self
|
|
|
result = tb.safe_traceback(BadDep())
|
|
|
self.assertIsNone(result)
|
|
|
|
|
|
def test_benchmark_large_dag(self):
|
|
|
|
|
|
n = 1000
|
|
|
queries = [DummyDep(f"Q{i}") for i in range(n)]
|
|
|
start = time.time()
|
|
|
logs = tb.parallel_recursive_traceback(queries, max_workers=8)
|
|
|
elapsed = time.time() - start
|
|
|
self.assertEqual(len(logs), n)
|
|
|
self.assertLess(elapsed, 10)
|
|
|
|
|
|
@classmethod
|
|
|
def setUpClass(cls):
|
|
|
super().setUpClass()
|
|
|
cls.defs = pr.Definition.from_txt_file('defs.txt', to_dict=True)
|
|
|
cls.rules = pr.Theorem.from_txt_file('rules.txt', to_dict=True)
|
|
|
|
|
|
def test_orthocenter_dependency_difference(self):
|
|
|
txt = 'a b c = triangle a b c; d = on_tline d b a c, on_tline d c a b; e = on_line e a c, on_line e b d ? perp a d b c'
|
|
|
p = pr.Problem.from_txt(txt)
|
|
|
g, _ = gh.Graph.build_problem(p, TracebackTest.defs)
|
|
|
|
|
|
ddar.solve(g, TracebackTest.rules, p)
|
|
|
|
|
|
goal_args = g.names2nodes(p.goal.args)
|
|
|
query = pr.Dependency(p.goal.name, goal_args, None, None)
|
|
|
|
|
|
setup, aux, _, _ = tb.get_logs(query, g, merge_trivials=False)
|
|
|
|
|
|
|
|
|
setup = [p.hashed() for p in setup]
|
|
|
aux = [p.hashed() for p in aux]
|
|
|
|
|
|
self.assertCountEqual(
|
|
|
setup, [('perp', 'a', 'c', 'b', 'd'), ('perp', 'a', 'b', 'c', 'd')]
|
|
|
)
|
|
|
|
|
|
self.assertCountEqual(
|
|
|
aux, [('coll', 'a', 'c', 'e'), ('coll', 'b', 'd', 'e')]
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == '__main__':
    # Run this module's tests through the absl test runner.
    absltest.main()
|
|
|
|