"""Unit tests for the Evaluation Queue functionality. Tests cover: - get_evaluation_queue_df correctly loads and categorizes queue entries - Status categorization: Finished, Running, Pending (incl. Waiting/Rerun) - Column completeness and data types - Robustness against entries missing expected fields (e.g. quant entries) - Count consistency: sum of 3 queues == total parseable entries - No crashes on the real cache_git/status data """ import json import logging import os import sys import tempfile import shutil import pandas as pd sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) logging.basicConfig(level=logging.DEBUG, format="%(name)s %(levelname)s: %(message)s") logger = logging.getLogger("test_eval_queue") from src.populate import get_evaluation_queue_df from src.display.utils import EvalQueueColumn, EVAL_COLS, EVAL_TYPES, QUANT_COLS, QUANT_TYPES, eval_queue_cols from src.display.formatting import make_clickable_model # ── Paths ──────────────────────────────────────────────────────────────────── PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) REAL_STATUS_PATH = os.path.join(PROJECT_ROOT, "cache_git", "status") # ═══════════════════════════════════════════════════════════════════════════════ # Helpers # ═══════════════════════════════════════════════════════════════════════════════ def _count_json_recursive(path): """Count all .json files recursively under *path*.""" total = 0 for root, _dirs, files in os.walk(path): for f in files: if f.endswith(".json"): total += 1 return total def _collect_statuses(path): """Collect all status values from JSON files under *path*.""" statuses = [] for root, _dirs, files in os.walk(path): for f in files: if f.endswith(".json"): fp = os.path.join(root, f) try: with open(fp) as fh: d = json.load(fh) statuses.append(d.get("status", "UNKNOWN")) except (json.JSONDecodeError, OSError): pass return statuses def _create_test_entry(model, status, precision="4bit", extra=None): 
"""Create a minimal queue JSON dict.""" entry = { "model": model, "revision": "main", "private": False, "precision": precision, "weight_dtype": "int4", "status": status, } if extra: entry.update(extra) return entry # ═══════════════════════════════════════════════════════════════════════════════ # Test 1: Real data — basic loading and non-crash # ═══════════════════════════════════════════════════════════════════════════════ def test_real_data_loads(): """get_evaluation_queue_df should load real cache_git/status without crashing.""" print(f"\n{'#'*70}") print(f" TEST: Real data loads without crash") print(f"{'#'*70}") assert os.path.isdir(REAL_STATUS_PATH), f"Status path missing: {REAL_STATUS_PATH}" finished_df, running_df, pending_df = get_evaluation_queue_df(REAL_STATUS_PATH, EVAL_COLS) print(f" Finished: {len(finished_df)} rows") print(f" Running: {len(running_df)} rows") print(f" Pending: {len(pending_df)} rows") assert isinstance(finished_df, pd.DataFrame), "finished should be DataFrame" assert isinstance(running_df, pd.DataFrame), "running should be DataFrame" assert isinstance(pending_df, pd.DataFrame), "pending should be DataFrame" print(f" ✅ All three DataFrames loaded successfully") # ═══════════════════════════════════════════════════════════════════════════════ # Test 2: Status categorization correctness # ═══════════════════════════════════════════════════════════════════════════════ def test_status_categorization(): """Entries should be categorized into correct queues based on status field.""" print(f"\n{'#'*70}") print(f" TEST: Status categorization") print(f"{'#'*70}") # Manually count expected numbers from raw data statuses = _collect_statuses(REAL_STATUS_PATH) total_files = _count_json_recursive(REAL_STATUS_PATH) expected_pending = sum(1 for s in statuses if s in ("Pending", "Rerun", "Waiting")) expected_running = sum(1 for s in statuses if s == "Running") expected_finished = sum(1 for s in statuses if s.startswith("Finished") or s == 
"PENDING_NEW_EVAL") expected_total = expected_pending + expected_running + expected_finished print(f" JSON files on disk: {total_files}") print(f" Parseable statuses: {len(statuses)}") print(f" Expected pending: {expected_pending}") print(f" Expected running: {expected_running}") print(f" Expected finished: {expected_finished}") print(f" Sum (3 queues): {expected_total}") finished_df, running_df, pending_df = get_evaluation_queue_df(REAL_STATUS_PATH, EVAL_COLS) actual_pending = len(pending_df) actual_running = len(running_df) actual_finished = len(finished_df) actual_total = actual_pending + actual_running + actual_finished print(f"\n Actual pending: {actual_pending}") print(f" Actual running: {actual_running}") print(f" Actual finished: {actual_finished}") print(f" Actual total: {actual_total}") errors = [] if actual_pending != expected_pending: errors.append(f"Pending mismatch: expected {expected_pending}, got {actual_pending}") if actual_running != expected_running: errors.append(f"Running mismatch: expected {expected_running}, got {actual_running}") if actual_finished != expected_finished: errors.append(f"Finished mismatch: expected {expected_finished}, got {actual_finished}") # Some entries may have unknown statuses and fall into none of the 3 queues uncategorized = len(statuses) - expected_total if uncategorized > 0: print(f"\n ⚠️ {uncategorized} entries have unrecognized status (not in any queue)") uncategorized_statuses = [s for s in statuses if s not in ("Pending", "Rerun", "Waiting", "Running") and not s.startswith("Finished") and s != "PENDING_NEW_EVAL"] for s in set(uncategorized_statuses): cnt = uncategorized_statuses.count(s) print(f" status='{s}': {cnt}") if errors: for e in errors: print(f" ❌ {e}") assert False, "; ".join(errors) else: print(f" ✅ Categorization correct") # ═══════════════════════════════════════════════════════════════════════════════ # Test 3: Column completeness # 
# ═══════════════════════════════════════════════════════════════════════════════

def test_columns_present():
    """All three queue DataFrames should have the expected EVAL_COLS columns.

    Extra columns are reported but tolerated; only missing ones fail the test.

    Raises:
        AssertionError: if any queue lacks one of the EVAL_COLS columns.
    """
    print(f"\n{'#'*70}")
    print(" TEST: Column completeness")
    print(f"{'#'*70}")

    print(f" Expected EVAL_COLS: {EVAL_COLS}")

    finished_df, running_df, pending_df = get_evaluation_queue_df(REAL_STATUS_PATH, EVAL_COLS)

    errors = []
    for name, df in [("finished", finished_df), ("running", running_df), ("pending", pending_df)]:
        actual_cols = list(df.columns)
        print(f" {name:10s} columns: {actual_cols}")

        # Check that expected columns are present
        errors.extend(
            f"{name}: missing column '{col}'" for col in EVAL_COLS if col not in actual_cols
        )

        # Check for unexpected extra columns
        extra = [c for c in actual_cols if c not in EVAL_COLS]
        if extra:
            print(f" {name:10s} extra columns (not in EVAL_COLS): {extra}")

    if errors:
        for e in errors:
            print(f" ❌ {e}")
        # Raised explicitly: a bare `assert False` is removed under `python -O`.
        raise AssertionError("; ".join(errors))
    print(" ✅ All expected columns present")


# ═══════════════════════════════════════════════════════════════════════════════
# Test 4: Quant entries (missing 'precision') don't crash the queue
# ═══════════════════════════════════════════════════════════════════════════════

def _find_quant_entry_without_precision(path):
    """Return ``(file_path, payload)`` for the first quant-style entry under *path*.

    A quant-style entry is a JSON object that has ``quant_precision`` but no
    ``precision`` key. Unreadable, undecodable or non-object files are skipped.
    Returns ``(None, None)`` when no such entry exists.
    """
    for root, _dirs, files in os.walk(path):
        for f in files:
            if not f.endswith(".json"):
                continue
            fp = os.path.join(root, f)
            try:
                # `with` closes the handle promptly; the walk may visit many files.
                with open(fp, encoding="utf-8") as fh:
                    d = json.load(fh)
            except (ValueError, OSError):
                # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
                continue
            if isinstance(d, dict) and "precision" not in d and "quant_precision" in d:
                return fp, d
    return None, None


def test_quant_entry_no_crash():
    """Quant entries that use quant_precision instead of precision shouldn't crash.

    Raises:
        AssertionError: if a quant entry with a recognized status is absent
            from the queue that status maps to.
    """
    print(f"\n{'#'*70}")
    print(" TEST: Quant entry (missing 'precision') handling")
    print(f"{'#'*70}")

    # Find the actual quant entry we know about
    quant_file, d = _find_quant_entry_without_precision(REAL_STATUS_PATH)

    if quant_file:
        print(f" Found quant entry without 'precision': {quant_file}")
        print(f" status: {d.get('status')}")
        print(f" quant_precision: {d.get('quant_precision')}")
        print(f" has 'precision': {'precision' in d}")
    else:
        print(" No quant entries without 'precision' found (skipping)")

    # The main check: no crash
    finished_df, running_df, pending_df = get_evaluation_queue_df(REAL_STATUS_PATH, EVAL_COLS)

    if not quant_file:
        print(" ✅ No crash (no quant entries to test specifically)")
        return

    model_name = d["model"]
    status = d["status"]

    # Check which queue it ended up in
    target_df = None
    target_name = None
    if status in ("Pending", "Rerun", "Waiting"):
        target_df, target_name = pending_df, "pending"
    elif status == "Running":
        target_df, target_name = running_df, "running"
    elif status.startswith("Finished") or status == "PENDING_NEW_EVAL":
        target_df, target_name = finished_df, "finished"

    if target_df is None:
        print(f" ⚠️ Quant entry has unrecognized status: {status}")
        return

    # The 'model' column contains clickable HTML, search within
    mask = target_df["model"].astype(str).str.contains(model_name, regex=False)
    if not mask.any():
        print(f" ❌ Quant entry '{model_name}' NOT found in {target_name} queue")
        raise AssertionError(f"Quant entry missing from {target_name}")
    print(f" ✅ Quant entry '{model_name}' correctly in {target_name} queue")

    # Check the 'precision' column for this entry — it should be NaN or empty
    row = target_df[mask]
    precision_val = row["precision"].iloc[0] if len(row) > 0 else "N/A"
    print(f" precision column value: {precision_val} (type: {type(precision_val).__name__})")


# ═══════════════════════════════════════════════════════════════════════════════
# Test 5: Synthetic data — controlled status routing
# ═══════════════════════════════════════════════════════════════════════════════

def test_synthetic_status_routing():
    """Test status routing with synthetic data covering all status variants.

    Raises:
        AssertionError: if an entry is missing from its expected queue or a
            queue's size differs from the number of cases targeting it.
    """
    print(f"\n{'#'*70}")
    print(" TEST: Synthetic status routing")
    print(f"{'#'*70}")

    tmpdir = tempfile.mkdtemp(prefix="test_queue_")
    try:
        # (model name, status written to disk, queue it must appear in)
        test_cases = [
            ("pending_model", "Pending", "pending"),
            ("rerun_model", "Rerun", "pending"),
            ("waiting_model", "Waiting", "pending"),
            ("running_model", "Running", "running"),
            ("finished_model", "Finished", "finished"),
            ("finished2_model", "Finished_2024", "finished"),  # startswith("Finished")
            ("pne_model", "PENDING_NEW_EVAL", "finished"),
        ]

        for model, status, _ in test_cases:
            entry = _create_test_entry(f"test/{model}", status)
            fname = f"{model}_{status}.json"
            with open(os.path.join(tmpdir, fname), "w") as fh:
                json.dump(entry, fh)

        finished_df, running_df, pending_df = get_evaluation_queue_df(tmpdir, EVAL_COLS)
        queues = {"pending": pending_df, "running": running_df, "finished": finished_df}

        errors = []
        for model, status, expected_queue in test_cases:
            full_model = f"test/{model}"
            df = queues[expected_queue]
            found = df["model"].astype(str).str.contains(full_model, regex=False).any()
            label = f"status='{status}' → {expected_queue}"
            if found:
                print(f" ✅ {label}")
            else:
                print(f" ❌ {label} — NOT FOUND")
                errors.append(label)

        # Verify counts
        print(f"\n Counts: pending={len(pending_df)}, running={len(running_df)}, finished={len(finished_df)}")
        # Derived from the table above so the two cannot drift apart.
        expected_counts = {
            name: sum(1 for _m, _s, queue in test_cases if queue == name) for name in queues
        }
        for name, df in queues.items():
            if len(df) != expected_counts[name]:
                errors.append(f"{name} count: expected {expected_counts[name]}, got {len(df)}")

        if errors:
            for e in errors:
                print(f" ❌ {e}")
            raise AssertionError("; ".join(errors))
        print(" ✅ All synthetic entries correctly routed")
    finally:
        shutil.rmtree(tmpdir, ignore_errors=True)


def _quant_queues_with_failed_result(submitted_time, run_dir, queue_prefix, results_prefix):
    """Load the quant queues for one Pending request plus one failed result.

    Writes a Pending quant request for ``org/model`` stamped *submitted_time*
    and a failed result stamped ``2026-05-21T09:00:00Z`` into two fresh
    temporary directories, calls ``get_evaluation_queue_df`` with
    ``request_type="quant"`` and ``results_path`` set, and returns its four
    DataFrames. Both temporary directories are removed before returning.
    """
    tmpdir = tempfile.mkdtemp(prefix=queue_prefix)
    results_dir = tempfile.mkdtemp(prefix=results_prefix)
    try:
        entry = {
            "model": "org/model",
            "revision": "main",
            "private": False,
            "quant_scheme": "INT4 (W4A16)",
            "quant_precision": "4bit",
            "quant_weight_dtype": "int4",
            "status": "Pending",
            "submitted_time": submitted_time,
            "script": "auto_quant",
            "model_params": 7.0,
        }
        with open(os.path.join(tmpdir, "request.json"), "w") as fh:
            json.dump(entry, fh)

        failed_result = {
            "model_id": "org/model",
            "generated_at": "2026-05-21T09:00:00Z",
            "run_dir": run_dir,
            "copied_files": ["x"],
            "quant_summary": {"scheme": "W4A16", "status": "failed"},
            "accuracy": {"status": "missing"},
        }
        with open(os.path.join(results_dir, "results_2026-05-21-09-00-00.json"), "w") as fh:
            json.dump(failed_result, fh)

        return get_evaluation_queue_df(
            tmpdir, QUANT_COLS, request_type="quant", results_path=results_dir
        )
    finally:
        shutil.rmtree(tmpdir, ignore_errors=True)
        shutil.rmtree(results_dir, ignore_errors=True)


def test_resubmitted_quant_pending_not_overridden_by_old_failed_result():
    """A fresh quant re-submit should stay Pending despite an older failed result."""
    # Request submitted at 10:00Z, i.e. after the 09:00Z failed result.
    finished_df, running_df, pending_df, failed_df = _quant_queues_with_failed_result(
        submitted_time="2026-05-21T10:00:00Z",
        run_dir="runs/old",
        queue_prefix="test_queue_resubmit_quant_",
        results_prefix="test_results_resubmit_quant_",
    )

    assert len(pending_df) == 1
    assert pending_df["model"].astype(str).str.contains("org/model", regex=False).any()
    assert len(failed_df) == 0


def test_submitted_quant_is_failed_when_result_is_newer():
    """A newer failed result should still move an older Pending request to Failed."""
    # Request submitted at 08:00Z, i.e. before the 09:00Z failed result.
    finished_df, running_df, pending_df, failed_df = _quant_queues_with_failed_result(
        submitted_time="2026-05-21T08:00:00Z",
        run_dir="runs/new",
        queue_prefix="test_queue_newer_result_quant_",
        results_prefix="test_results_newer_result_quant_",
    )

    assert len(pending_df) == 0
    assert len(failed_df) == 1
    assert failed_df["model"].astype(str).str.contains("org/model", regex=False).any()


# ═══════════════════════════════════════════════════════════════════════════════
# Test 6: Synthetic — unknown status entries are silently dropped
# ═══════════════════════════════════════════════════════════════════════════════

def test_unknown_status_dropped():
    """Entries with unrecognized status values should not appear in any queue."""
    print(f"\n{'#'*70}")
    print(" TEST: Unknown status entries are dropped")
    print(f"{'#'*70}")

    tmpdir = tempfile.mkdtemp(prefix="test_queue_unknown_")
    try:
        # One recognized status and three that no queue is expected to accept.
        entries = [
            _create_test_entry("test/good", "Pending"),
            _create_test_entry("test/bad1", "Cancelled"),
            _create_test_entry("test/bad2", "Failed"),
            _create_test_entry("test/bad3", "Deleted"),
        ]
        for i, entry in enumerate(entries):
            with open(os.path.join(tmpdir, f"entry_{i}.json"), "w") as fh:
                json.dump(entry, fh)

        finished_df, running_df, pending_df = get_evaluation_queue_df(tmpdir, EVAL_COLS)

        total = len(finished_df) + len(running_df) + len(pending_df)
        print(f" Total in queues: {total} (expected 1)")

        assert total == 1, f"Expected 1 entry in queues, got {total}"
        assert len(pending_df) == 1, f"Expected 1 pending, got {len(pending_df)}"
        print(" ✅ Only recognized status entries kept")
    finally:
        shutil.rmtree(tmpdir, ignore_errors=True)


# ═══════════════════════════════════════════════════════════════════════════════
# Test 7: Synthetic
— subdirectory entries are also loaded # ═══════════════════════════════════════════════════════════════════════════════ def test_subdirectory_loading(): """Entries in subdirectories should also be loaded (org/model pattern).""" print(f"\n{'#'*70}") print(f" TEST: Subdirectory loading") print(f"{'#'*70}") tmpdir = tempfile.mkdtemp(prefix="test_queue_subdir_") try: # Root-level entry root_entry = _create_test_entry("root/model1", "Finished") with open(os.path.join(tmpdir, "root_model1.json"), "w") as fh: json.dump(root_entry, fh) # Subdirectory entry (like org/model pattern) subdir = os.path.join(tmpdir, "myorg") os.makedirs(subdir) sub_entry = _create_test_entry("myorg/model2", "Running") with open(os.path.join(subdir, "model2.json"), "w") as fh: json.dump(sub_entry, fh) finished_df, running_df, pending_df = get_evaluation_queue_df(tmpdir, EVAL_COLS) errors = [] if len(finished_df) != 1: errors.append(f"Expected 1 finished (root-level), got {len(finished_df)}") if len(running_df) != 1: errors.append(f"Expected 1 running (subdirectory), got {len(running_df)}") # Verify the subdirectory model is found if len(running_df) > 0: found = running_df["model"].astype(str).str.contains("myorg/model2", regex=False).any() if not found: errors.append("Subdirectory model 'myorg/model2' not found in running queue") if errors: for e in errors: print(f" ❌ {e}") assert False, "; ".join(errors) else: print(f" ✅ Both root and subdirectory entries loaded") finally: shutil.rmtree(tmpdir, ignore_errors=True) # ═══════════════════════════════════════════════════════════════════════════════ # Test 8: Malformed JSON files are skipped gracefully # ═══════════════════════════════════════════════════════════════════════════════ def test_malformed_json_skipped(): """Malformed JSON files should be skipped without crashing.""" print(f"\n{'#'*70}") print(f" TEST: Malformed JSON handling") print(f"{'#'*70}") tmpdir = tempfile.mkdtemp(prefix="test_queue_malformed_") try: # Valid entry good = 
_create_test_entry("test/good", "Pending") with open(os.path.join(tmpdir, "good.json"), "w") as fh: json.dump(good, fh) # Malformed JSON with open(os.path.join(tmpdir, "bad.json"), "w") as fh: fh.write("{broken json content") # Empty file with open(os.path.join(tmpdir, "empty.json"), "w") as fh: fh.write("") finished_df, running_df, pending_df = get_evaluation_queue_df(tmpdir, EVAL_COLS) total = len(finished_df) + len(running_df) + len(pending_df) print(f" Total entries loaded: {total} (expected 1)") assert total == 1, f"Expected 1, got {total}" print(f" ✅ Malformed files skipped, good entry loaded") finally: shutil.rmtree(tmpdir, ignore_errors=True) # ═══════════════════════════════════════════════════════════════════════════════ # Test 9: Empty directory returns empty DataFrames # ═══════════════════════════════════════════════════════════════════════════════ def test_empty_directory(): """An empty status directory should return three empty DataFrames.""" print(f"\n{'#'*70}") print(f" TEST: Empty directory handling") print(f"{'#'*70}") tmpdir = tempfile.mkdtemp(prefix="test_queue_empty_") try: finished_df, running_df, pending_df = get_evaluation_queue_df(tmpdir, EVAL_COLS) assert len(finished_df) == 0, f"finished should be empty, got {len(finished_df)}" assert len(running_df) == 0, f"running should be empty, got {len(running_df)}" assert len(pending_df) == 0, f"pending should be empty, got {len(pending_df)}" print(f" ✅ Empty directory returns 3 empty DataFrames") finally: shutil.rmtree(tmpdir, ignore_errors=True) # ═══════════════════════════════════════════════════════════════════════════════ # Test 10: Real data — model column contains clickable links # ═══════════════════════════════════════════════════════════════════════════════ def test_model_column_clickable(): """The model column should contain HTML hyperlinks (make_clickable_model).""" print(f"\n{'#'*70}") print(f" TEST: Model column contains clickable links") print(f"{'#'*70}") finished_df, running_df, 
pending_df = get_evaluation_queue_df(REAL_STATUS_PATH, EVAL_COLS) errors = [] for name, df in [("finished", finished_df), ("running", running_df), ("pending", pending_df)]: if len(df) == 0: print(f" {name}: empty (skipped)") continue # Check first row's model column first_model = str(df["model"].iloc[0]) has_link = "= total_all and total_quant > 0: errors.append("eval filter didn't exclude any quant entries") if errors: for e in errors: print(f" ❌ {e}") assert False, "; ".join(errors) else: print(f" ✅ eval + quant = total, filters work correctly") # ═══════════════════════════════════════════════════════════════════════════════ # Test 14: Quant queue uses QUANT_COLS correctly # ═══════════════════════════════════════════════════════════════════════════════ def test_quant_queue_columns(): """Quant queue should use quant-specific columns (quant_scheme, input_dtype).""" print(f"\n{'#'*70}") print(f" TEST: Quant queue has correct columns") print(f"{'#'*70}") print(f" Expected QUANT_COLS: {QUANT_COLS}") quant_fin, quant_run, quant_pend = get_evaluation_queue_df( REAL_STATUS_PATH, QUANT_COLS, request_type="quant" ) total = len(quant_fin) + len(quant_run) + len(quant_pend) print(f" Total quant entries: {total}") if total == 0: print(f" ⚠️ No quant entries in real data — checking column structure only") for name, df in [("finished", quant_fin), ("running", quant_run), ("pending", quant_pend)]: actual_cols = list(df.columns) print(f" {name:10s} columns: {actual_cols}") # Verify columns don't contain eval-only fields errors = [] for name, df in [("finished", quant_fin), ("running", quant_run), ("pending", quant_pend)]: for col in ["weight_type"]: if col in df.columns: errors.append(f"{name}: should NOT have eval-specific column '{col}'") if errors: for e in errors: print(f" ❌ {e}") assert False, "; ".join(errors) else: print(f" ✅ Quant queue columns are correct") # ═══════════════════════════════════════════════════════════════════════════════ # Test 15: Synthetic — 
# request_type filter routes correctly
# ═══════════════════════════════════════════════════════════════════════════════

def test_synthetic_request_type_filter():
    """Synthetic test: eval/quant filter correctly separates by filename pattern.

    Writes three ``*_eval_request_*`` files and two ``*_quant_request_*`` files,
    then loads the queues unfiltered, with ``request_type="eval"`` and with
    ``request_type="quant"``.

    Raises:
        AssertionError: if any of the three totals is not 5 / 3 / 2, or a
            non-null ``quant_scheme`` value differs from the one written.
    """
    print(f"\n{'#'*70}")
    print(" TEST: Synthetic request_type filter")
    print(f"{'#'*70}")

    tmpdir = tempfile.mkdtemp(prefix="test_queue_filter_")
    try:
        # Create eval entries
        for i in range(3):
            entry = _create_test_entry(f"org/eval_model_{i}", "Finished")
            fname = f"eval_model_{i}_eval_request_False_AWQ_4bit_int4.json"
            with open(os.path.join(tmpdir, fname), "w") as fh:
                json.dump(entry, fh)

        # Create quant entries
        for i in range(2):
            entry = _create_test_entry(
                f"org/quant_model_{i}",
                "Pending",
                extra={"quant_scheme": "INT4 (W4A16)", "input_dtype": "bfloat16"},
            )
            fname = f"quant_model_{i}_quant_request_False_INT4.json"
            with open(os.path.join(tmpdir, fname), "w") as fh:
                json.dump(entry, fh)

        # No filter — all 5
        all_fin, all_run, all_pend = get_evaluation_queue_df(tmpdir, EVAL_COLS)
        total_all = len(all_fin) + len(all_run) + len(all_pend)

        # Eval filter — should get 3
        eval_fin, eval_run, eval_pend = get_evaluation_queue_df(
            tmpdir, EVAL_COLS, request_type="eval"
        )
        total_eval = len(eval_fin) + len(eval_run) + len(eval_pend)

        # Quant filter — should get 2
        quant_fin, quant_run, quant_pend = get_evaluation_queue_df(
            tmpdir, QUANT_COLS, request_type="quant"
        )
        total_quant = len(quant_fin) + len(quant_run) + len(quant_pend)

        print(f" All: {total_all} (expected 5)")
        print(f" Eval: {total_eval} (expected 3)")
        print(f" Quant: {total_quant} (expected 2)")

        expectations = [("All", 5, total_all), ("Eval", 3, total_eval), ("Quant", 2, total_quant)]
        errors = [
            f"{label}: expected {want}, got {got}"
            for label, want, got in expectations
            if got != want
        ]

        # Verify quant entries have quant_scheme column
        # NOTE(review): this check is skipped when the column is absent and
        # passes vacuously when every value is null — confirm that is intended.
        if total_quant > 0 and "quant_scheme" in quant_pend.columns:
            vals = quant_pend["quant_scheme"].dropna().tolist()
            if all(v == "INT4 (W4A16)" for v in vals):
                print(" ✅ Quant entries have correct quant_scheme")
            else:
                errors.append(f"quant_scheme values: {vals}")

        if errors:
            for e in errors:
                print(f" ❌ {e}")
            # Raised explicitly: a bare `assert False` is removed under `python -O`.
            raise AssertionError("; ".join(errors))
        print(" ✅ request_type filter works correctly")
    finally:
        shutil.rmtree(tmpdir, ignore_errors=True)


# ═══════════════════════════════════════════════════════════════════════════════
# Main
# ═══════════════════════════════════════════════════════════════════════════════

if __name__ == "__main__":
    print("=" * 70)
    print(" Evaluation Queue Unit Tests")
    print("=" * 70)

    # Run order; each test is reported under its function name.
    tests = [
        test_real_data_loads,
        test_status_categorization,
        test_columns_present,
        test_quant_entry_no_crash,
        test_synthetic_status_routing,
        test_unknown_status_dropped,
        test_subdirectory_loading,
        test_malformed_json_skipped,
        test_empty_directory,
        test_model_column_clickable,
        test_no_cross_queue_duplicates,
        test_queue_size_sanity,
        test_eval_filter_excludes_quant,
        test_quant_queue_columns,
        test_synthetic_request_type_filter,
    ]

    results = {}
    for func in tests:
        name = func.__name__
        try:
            func()
        except Exception as e:
            # Top-level boundary: record the failure and keep running the rest.
            results[name] = False
            print(f" ❌ EXCEPTION: {e}")
        else:
            results[name] = True

    print(f"\n{'='*70}")
    print(" SUMMARY")
    print(f"{'='*70}")
    for name, passed in results.items():
        status = "✅ PASS" if passed else "❌ FAIL"
        print(f" {status} {name}")

    total = len(results)
    passed = sum(1 for v in results.values() if v)
    print(f"\n {passed}/{total} tests passed")
    print(f"{'='*70}")

    # Signal failure to the calling shell / CI instead of always exiting 0.
    if passed != total:
        sys.exit(1)