| """Unit tests for the Evaluation Queue functionality. |
| |
| Tests cover: |
| - get_evaluation_queue_df correctly loads and categorizes queue entries |
| - Status categorization: Finished, Running, Pending (incl. Waiting/Rerun) |
| - Column completeness and data types |
| - Robustness against entries missing expected fields (e.g. quant entries) |
| - Count consistency: sum of 3 queues == total parseable entries |
| - No crashes on the real cache_git/status data |
| """ |
|
|
| import json |
| import logging |
| import os |
| import sys |
| import tempfile |
| import shutil |
|
|
| import pandas as pd |
|
|
# Repository root: two directory levels above this test file. Computed once
# and reused for both the import path and the real-data location.
PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))

# Directory holding the real queue/status JSON files used by the data-backed tests.
REAL_STATUS_PATH = os.path.join(PROJECT_ROOT, "cache_git", "status")

# Put the repository root first on sys.path so the ``src`` imports below resolve
# when this file is executed directly.
sys.path.insert(0, PROJECT_ROOT)

logging.basicConfig(level=logging.DEBUG, format="%(name)s %(levelname)s: %(message)s")
logger = logging.getLogger("test_eval_queue")

# These imports must come after the sys.path tweak above.
from src.populate import get_evaluation_queue_df  # noqa: E402
from src.display.utils import EvalQueueColumn, EVAL_COLS, EVAL_TYPES, QUANT_COLS, QUANT_TYPES, eval_queue_cols  # noqa: E402
from src.display.formatting import make_clickable_model  # noqa: E402
|
|
|
|
| |
| |
| |
|
|
def _count_json_recursive(path):
    """Return how many files whose name ends in ``.json`` exist anywhere below *path*."""
    return sum(
        1
        for _root, _dirs, filenames in os.walk(path)
        for filename in filenames
        if filename.endswith(".json")
    )
|
|
|
|
def _collect_statuses(path):
    """Collect the ``status`` value of every readable JSON object under *path*.

    Walks *path* recursively and reads each ``*.json`` file as UTF-8.

    Returns:
        A list with one item per file that parsed to a JSON object: the value
        of its ``"status"`` key, or ``"UNKNOWN"`` when the key is absent.
        Files that cannot be opened, decoded or parsed, and files whose
        top-level value is not an object, are skipped.
    """
    statuses = []
    for root, _dirs, files in os.walk(path):
        for name in files:
            if not name.endswith(".json"):
                continue
            try:
                with open(os.path.join(root, name), encoding="utf-8") as fh:
                    payload = json.load(fh)
            except (ValueError, OSError):
                # ValueError covers both json.JSONDecodeError and
                # UnicodeDecodeError (undecodable bytes in the file).
                continue
            if not isinstance(payload, dict):
                # A top-level list/number/string has no status field to read.
                continue
            statuses.append(payload.get("status", "UNKNOWN"))
    return statuses
|
|
|
|
def _create_test_entry(model, status, precision="4bit", extra=None):
    """Build a minimal queue-request dict for *model* with the given *status*.

    *precision* fills the ``precision`` field; a truthy *extra* is merged in
    last, so it may add keys or override the defaults.
    """
    payload = dict(
        model=model,
        revision="main",
        private=False,
        precision=precision,
        weight_dtype="int4",
        status=status,
    )
    payload.update(extra or {})
    return payload
|
|
|
|
| |
| |
| |
|
|
def test_real_data_loads():
    """get_evaluation_queue_df should load real cache_git/status without crashing.

    Requires REAL_STATUS_PATH to exist, then checks that the finished, running
    and pending results are all pandas DataFrames.
    """
    print(f"\n{'#'*70}")
    print(" TEST: Real data loads without crash")
    print(f"{'#'*70}")

    assert os.path.isdir(REAL_STATUS_PATH), f"Status path missing: {REAL_STATUS_PATH}"

    # `*_` absorbs any extra frame: the quant resubmit tests in this file
    # unpack a fourth (failed) queue from the same function.
    finished_df, running_df, pending_df, *_ = get_evaluation_queue_df(REAL_STATUS_PATH, EVAL_COLS)

    print(f" Finished: {len(finished_df)} rows")
    print(f" Running: {len(running_df)} rows")
    print(f" Pending: {len(pending_df)} rows")

    assert isinstance(finished_df, pd.DataFrame), "finished should be DataFrame"
    assert isinstance(running_df, pd.DataFrame), "running should be DataFrame"
    assert isinstance(pending_df, pd.DataFrame), "pending should be DataFrame"

    print(" ✅ All three DataFrames loaded successfully")
|
|
|
|
| |
| |
| |
|
|
def test_status_categorization():
    """Entries should be categorized into correct queues based on status field.

    Expected per-queue counts are derived from the raw JSON statuses on disk
    and compared with the row counts of the loaded DataFrames. Statuses that
    map to no queue are reported but do not fail the test.
    """
    from collections import Counter

    def queue_for(status):
        """Return the queue name for a raw status value, or None if unrecognized."""
        if not isinstance(status, str):
            return None
        if status in ("Pending", "Rerun", "Waiting"):
            return "pending"
        if status == "Running":
            return "running"
        if status.startswith("Finished") or status == "PENDING_NEW_EVAL":
            return "finished"
        return None

    print(f"\n{'#'*70}")
    print(" TEST: Status categorization")
    print(f"{'#'*70}")

    statuses = _collect_statuses(REAL_STATUS_PATH)
    total_files = _count_json_recursive(REAL_STATUS_PATH)

    expected = Counter(queue_for(s) for s in statuses)
    expected_pending = expected["pending"]
    expected_running = expected["running"]
    expected_finished = expected["finished"]
    expected_total = expected_pending + expected_running + expected_finished

    print(f" JSON files on disk: {total_files}")
    print(f" Parseable statuses: {len(statuses)}")
    print(f" Expected pending: {expected_pending}")
    print(f" Expected running: {expected_running}")
    print(f" Expected finished: {expected_finished}")
    print(f" Sum (3 queues): {expected_total}")

    # `*_` absorbs any extra frame: the quant resubmit tests in this file
    # unpack a fourth (failed) queue from the same function.
    finished_df, running_df, pending_df, *_ = get_evaluation_queue_df(REAL_STATUS_PATH, EVAL_COLS)

    actual_pending = len(pending_df)
    actual_running = len(running_df)
    actual_finished = len(finished_df)
    actual_total = actual_pending + actual_running + actual_finished

    print(f"\n Actual pending: {actual_pending}")
    print(f" Actual running: {actual_running}")
    print(f" Actual finished: {actual_finished}")
    print(f" Actual total: {actual_total}")

    errors = []
    if actual_pending != expected_pending:
        errors.append(f"Pending mismatch: expected {expected_pending}, got {actual_pending}")
    if actual_running != expected_running:
        errors.append(f"Running mismatch: expected {expected_running}, got {actual_running}")
    if actual_finished != expected_finished:
        errors.append(f"Finished mismatch: expected {expected_finished}, got {actual_finished}")

    # Informational only: list statuses that belong to none of the three queues.
    uncategorized = len(statuses) - expected_total
    if uncategorized > 0:
        print(f"\n ⚠️ {uncategorized} entries have unrecognized status (not in any queue)")
        # Keyed by str() so unhashable or non-string statuses can still be tallied.
        unrecognized = Counter(str(s) for s in statuses if queue_for(s) is None)
        for s, cnt in unrecognized.items():
            print(f" status='{s}': {cnt}")

    for e in errors:
        print(f" ❌ {e}")
    assert not errors, "; ".join(errors)
    print(" ✅ Categorization correct")
|
|
|
|
| |
| |
| |
|
|
def test_columns_present():
    """All three queue DataFrames should have the expected EVAL_COLS columns.

    Columns beyond EVAL_COLS are printed for information but do not fail
    the test.
    """
    print(f"\n{'#'*70}")
    print(" TEST: Column completeness")
    print(f"{'#'*70}")

    print(f" Expected EVAL_COLS: {EVAL_COLS}")

    # `*_` absorbs any extra frame: the quant resubmit tests in this file
    # unpack a fourth (failed) queue from the same function.
    finished_df, running_df, pending_df, *_ = get_evaluation_queue_df(REAL_STATUS_PATH, EVAL_COLS)

    errors = []
    for name, df in [("finished", finished_df), ("running", running_df), ("pending", pending_df)]:
        actual_cols = list(df.columns)
        print(f" {name:10s} columns: {actual_cols}")

        errors.extend(
            f"{name}: missing column '{col}'" for col in EVAL_COLS if col not in actual_cols
        )

        extra = [c for c in actual_cols if c not in EVAL_COLS]
        if extra:
            print(f" {name:10s} extra columns (not in EVAL_COLS): {extra}")

    for e in errors:
        print(f" ❌ {e}")
    assert not errors, "; ".join(errors)
    print(" ✅ All expected columns present")
|
|
|
|
| |
| |
| |
|
|
def test_quant_entry_no_crash():
    """Quant entries that use quant_precision instead of precision shouldn't crash.

    Finds the first real status file that has ``quant_precision`` but no
    ``precision`` key, loads the queues, and checks that this entry appears in
    the queue matching its status. When no such file exists, the test only
    verifies that loading does not raise.
    """
    print(f"\n{'#'*70}")
    print(" TEST: Quant entry (missing 'precision') handling")
    print(f"{'#'*70}")

    # Locate the first qualifying entry; the parsed dict is kept so the file
    # is read only once.
    quant_file = None
    quant_entry = None
    for root, _dirs, files in os.walk(REAL_STATUS_PATH):
        for name in files:
            if not name.endswith(".json"):
                continue
            path = os.path.join(root, name)
            try:
                with open(path, encoding="utf-8") as fh:
                    data = json.load(fh)
            except (ValueError, OSError):
                # Unreadable or malformed files are irrelevant to this search.
                continue
            if isinstance(data, dict) and "precision" not in data and "quant_precision" in data:
                quant_file, quant_entry = path, data
                break
        if quant_file:
            break

    if quant_file:
        print(f" Found quant entry without 'precision': {quant_file}")
        print(f" status: {quant_entry.get('status')}")
        print(f" quant_precision: {quant_entry.get('quant_precision')}")
        print(f" has 'precision': {'precision' in quant_entry}")
    else:
        print(" No quant entries without 'precision' found (skipping)")

    # The load itself must not raise, whether or not a quant entry was found.
    # `*_` absorbs any extra frame: the quant resubmit tests in this file
    # unpack a fourth (failed) queue from the same function.
    finished_df, running_df, pending_df, *_ = get_evaluation_queue_df(REAL_STATUS_PATH, EVAL_COLS)

    if not quant_file:
        print(" ✅ No crash (no quant entries to test specifically)")
        return

    model_name = quant_entry["model"]
    status = quant_entry["status"]

    # Pick the queue the entry is expected in, using the same status rules as
    # the other tests in this file.
    target_df = None
    target_name = None
    if status in ("Pending", "Rerun", "Waiting"):
        target_df, target_name = pending_df, "pending"
    elif status == "Running":
        target_df, target_name = running_df, "running"
    elif isinstance(status, str) and (status.startswith("Finished") or status == "PENDING_NEW_EVAL"):
        target_df, target_name = finished_df, "finished"

    if target_df is None:
        print(f" ⚠️ Quant entry has unrecognized status: {status}")
        return

    # Substring match because the model cell may hold more than the bare name.
    mask = target_df["model"].astype(str).str.contains(model_name, regex=False)
    found = mask.any()
    if not found:
        print(f" ❌ Quant entry '{model_name}' NOT found in {target_name} queue")
    assert found, f"Quant entry missing from {target_name}"
    print(f" ✅ Quant entry '{model_name}' correctly in {target_name} queue")

    precision_val = target_df[mask]["precision"].iloc[0]
    print(f" precision column value: {precision_val} (type: {type(precision_val).__name__})")
|
|
|
|
| |
| |
| |
|
|
def test_synthetic_status_routing():
    """Test status routing with synthetic data covering all status variants.

    Writes one request file per status into a temporary directory, loads the
    queues, and checks both that each model lands in the expected queue and
    that the per-queue row counts match.
    """
    print(f"\n{'#'*70}")
    print(" TEST: Synthetic status routing")
    print(f"{'#'*70}")

    tmpdir = tempfile.mkdtemp(prefix="test_queue_")
    try:
        # (model suffix, status written to disk, queue it should end up in)
        test_cases = [
            ("pending_model", "Pending", "pending"),
            ("rerun_model", "Rerun", "pending"),
            ("waiting_model", "Waiting", "pending"),
            ("running_model", "Running", "running"),
            ("finished_model", "Finished", "finished"),
            ("finished2_model", "Finished_2024", "finished"),
            ("pne_model", "PENDING_NEW_EVAL", "finished"),
        ]

        for model, status, _ in test_cases:
            entry = _create_test_entry(f"test/{model}", status)
            fname = f"{model}_{status}.json"
            with open(os.path.join(tmpdir, fname), "w") as fh:
                json.dump(entry, fh)

        # `*_` absorbs any extra frame: the quant resubmit tests in this file
        # unpack a fourth (failed) queue from the same function.
        finished_df, running_df, pending_df, *_ = get_evaluation_queue_df(tmpdir, EVAL_COLS)
        queues = {"pending": pending_df, "running": running_df, "finished": finished_df}

        errors = []
        for model, status, expected_queue in test_cases:
            full_model = f"test/{model}"
            df = queues[expected_queue]

            found = df["model"].astype(str).str.contains(full_model, regex=False).any()
            label = f"status='{status}' → {expected_queue}"
            if found:
                print(f" ✅ {label}")
            else:
                print(f" ❌ {label} — NOT FOUND")
                errors.append(label)

        print(f"\n Counts: pending={len(pending_df)}, running={len(running_df)}, finished={len(finished_df)}")
        expected_counts = {"pending": 3, "running": 1, "finished": 3}
        for queue_name, expected in expected_counts.items():
            actual = len(queues[queue_name])
            if actual != expected:
                errors.append(f"{queue_name} count: expected {expected}, got {actual}")

        for e in errors:
            print(f" ❌ {e}")
        assert not errors, "; ".join(errors)
        print(" ✅ All synthetic entries correctly routed")

    finally:
        shutil.rmtree(tmpdir, ignore_errors=True)
|
|
|
|
def test_resubmitted_quant_pending_not_overridden_by_old_failed_result():
    """A fresh quant re-submit should stay Pending despite an older failed result.

    The request is stamped 10:00Z and the failed result 09:00Z, so the result
    predates the submission.
    """
    queue_dir = tempfile.mkdtemp(prefix="test_queue_resubmit_quant_")
    results_dir = tempfile.mkdtemp(prefix="test_results_resubmit_quant_")
    try:
        request = dict(
            model="org/model",
            revision="main",
            private=False,
            quant_scheme="INT4 (W4A16)",
            quant_precision="4bit",
            quant_weight_dtype="int4",
            status="Pending",
            submitted_time="2026-05-21T10:00:00Z",
            script="auto_quant",
            model_params=7.0,
        )
        stale_failure = dict(
            model_id="org/model",
            generated_at="2026-05-21T09:00:00Z",
            run_dir="runs/old",
            copied_files=["x"],
            quant_summary={"scheme": "W4A16", "status": "failed"},
            accuracy={"status": "missing"},
        )

        # Write the request first, then the result, each into its own directory.
        fixtures = (
            (os.path.join(queue_dir, "request.json"), request),
            (os.path.join(results_dir, "results_2026-05-21-09-00-00.json"), stale_failure),
        )
        for target, payload in fixtures:
            with open(target, "w") as fh:
                json.dump(payload, fh)

        queues = get_evaluation_queue_df(
            queue_dir, QUANT_COLS, request_type="quant", results_path=results_dir
        )
        finished_df, running_df, pending_df, failed_df = queues

        assert len(pending_df) == 1
        pending_models = pending_df["model"].astype(str)
        assert pending_models.str.contains("org/model", regex=False).any()
        assert len(failed_df) == 0
    finally:
        for scratch in (queue_dir, results_dir):
            shutil.rmtree(scratch, ignore_errors=True)
|
|
|
|
def test_submitted_quant_is_failed_when_result_is_newer():
    """A newer failed result should still move an older Pending request to Failed.

    The request is stamped 08:00Z and the failed result 09:00Z, so the result
    postdates the submission.
    """
    queue_dir = tempfile.mkdtemp(prefix="test_queue_newer_result_quant_")
    results_dir = tempfile.mkdtemp(prefix="test_results_newer_result_quant_")
    try:
        request = dict(
            model="org/model",
            revision="main",
            private=False,
            quant_scheme="INT4 (W4A16)",
            quant_precision="4bit",
            quant_weight_dtype="int4",
            status="Pending",
            submitted_time="2026-05-21T08:00:00Z",
            script="auto_quant",
            model_params=7.0,
        )
        recent_failure = dict(
            model_id="org/model",
            generated_at="2026-05-21T09:00:00Z",
            run_dir="runs/new",
            copied_files=["x"],
            quant_summary={"scheme": "W4A16", "status": "failed"},
            accuracy={"status": "missing"},
        )

        # Write the request first, then the result, each into its own directory.
        fixtures = (
            (os.path.join(queue_dir, "request.json"), request),
            (os.path.join(results_dir, "results_2026-05-21-09-00-00.json"), recent_failure),
        )
        for target, payload in fixtures:
            with open(target, "w") as fh:
                json.dump(payload, fh)

        queues = get_evaluation_queue_df(
            queue_dir, QUANT_COLS, request_type="quant", results_path=results_dir
        )
        finished_df, running_df, pending_df, failed_df = queues

        assert len(pending_df) == 0
        assert len(failed_df) == 1
        failed_models = failed_df["model"].astype(str)
        assert failed_models.str.contains("org/model", regex=False).any()
    finally:
        for scratch in (queue_dir, results_dir):
            shutil.rmtree(scratch, ignore_errors=True)
|
|
|
|
| |
| |
| |
|
|
def test_unknown_status_dropped():
    """Entries with unrecognized status values should not appear in any queue.

    Writes one Pending entry plus three entries with other statuses and
    expects only the Pending one across the finished, running and pending
    queues.
    """
    print(f"\n{'#'*70}")
    print(" TEST: Unknown status entries are dropped")
    print(f"{'#'*70}")

    tmpdir = tempfile.mkdtemp(prefix="test_queue_unknown_")
    try:
        entries = [
            _create_test_entry("test/good", "Pending"),
            _create_test_entry("test/bad1", "Cancelled"),
            _create_test_entry("test/bad2", "Failed"),
            _create_test_entry("test/bad3", "Deleted"),
        ]
        for i, entry in enumerate(entries):
            with open(os.path.join(tmpdir, f"entry_{i}.json"), "w") as fh:
                json.dump(entry, fh)

        # `*_` absorbs any extra frame: the quant resubmit tests in this file
        # unpack a fourth (failed) queue from the same function.
        finished_df, running_df, pending_df, *_ = get_evaluation_queue_df(tmpdir, EVAL_COLS)

        total = len(finished_df) + len(running_df) + len(pending_df)
        print(f" Total in queues: {total} (expected 1)")

        assert total == 1, f"Expected 1 entry in queues, got {total}"
        assert len(pending_df) == 1, f"Expected 1 pending, got {len(pending_df)}"
        print(" ✅ Only recognized status entries kept")

    finally:
        shutil.rmtree(tmpdir, ignore_errors=True)
|
|
|
|
| |
| |
| |
|
|
def test_subdirectory_loading():
    """Entries in subdirectories should also be loaded (org/model pattern).

    Places one Finished entry at the root of a temporary directory and one
    Running entry inside a ``myorg`` subdirectory, then expects both to load.
    """
    print(f"\n{'#'*70}")
    print(" TEST: Subdirectory loading")
    print(f"{'#'*70}")

    tmpdir = tempfile.mkdtemp(prefix="test_queue_subdir_")
    try:
        # Root-level entry.
        root_entry = _create_test_entry("root/model1", "Finished")
        with open(os.path.join(tmpdir, "root_model1.json"), "w") as fh:
            json.dump(root_entry, fh)

        # Entry one directory level down.
        subdir = os.path.join(tmpdir, "myorg")
        os.makedirs(subdir)
        sub_entry = _create_test_entry("myorg/model2", "Running")
        with open(os.path.join(subdir, "model2.json"), "w") as fh:
            json.dump(sub_entry, fh)

        # `*_` absorbs any extra frame: the quant resubmit tests in this file
        # unpack a fourth (failed) queue from the same function.
        finished_df, running_df, pending_df, *_ = get_evaluation_queue_df(tmpdir, EVAL_COLS)

        errors = []
        if len(finished_df) != 1:
            errors.append(f"Expected 1 finished (root-level), got {len(finished_df)}")
        if len(running_df) != 1:
            errors.append(f"Expected 1 running (subdirectory), got {len(running_df)}")

        # Only look for the model when there is a row to inspect.
        if len(running_df) > 0:
            found = running_df["model"].astype(str).str.contains("myorg/model2", regex=False).any()
            if not found:
                errors.append("Subdirectory model 'myorg/model2' not found in running queue")

        for e in errors:
            print(f" ❌ {e}")
        assert not errors, "; ".join(errors)
        print(" ✅ Both root and subdirectory entries loaded")

    finally:
        shutil.rmtree(tmpdir, ignore_errors=True)
|
|
|
|
| |
| |
| |
|
|
def test_malformed_json_skipped():
    """Malformed JSON files should be skipped without crashing.

    Writes one valid Pending entry, one file with broken JSON and one empty
    file, and expects exactly one row across the three queues.
    """
    print(f"\n{'#'*70}")
    print(" TEST: Malformed JSON handling")
    print(f"{'#'*70}")

    tmpdir = tempfile.mkdtemp(prefix="test_queue_malformed_")
    try:
        # Valid entry.
        good = _create_test_entry("test/good", "Pending")
        with open(os.path.join(tmpdir, "good.json"), "w") as fh:
            json.dump(good, fh)

        # Syntactically broken JSON.
        with open(os.path.join(tmpdir, "bad.json"), "w") as fh:
            fh.write("{broken json content")

        # Zero-byte file.
        with open(os.path.join(tmpdir, "empty.json"), "w") as fh:
            fh.write("")

        # `*_` absorbs any extra frame: the quant resubmit tests in this file
        # unpack a fourth (failed) queue from the same function.
        finished_df, running_df, pending_df, *_ = get_evaluation_queue_df(tmpdir, EVAL_COLS)

        total = len(finished_df) + len(running_df) + len(pending_df)
        print(f" Total entries loaded: {total} (expected 1)")

        assert total == 1, f"Expected 1, got {total}"
        print(" ✅ Malformed files skipped, good entry loaded")

    finally:
        shutil.rmtree(tmpdir, ignore_errors=True)
|
|
|
|
| |
| |
| |
|
|
def test_empty_directory():
    """An empty status directory should return three empty DataFrames."""
    print(f"\n{'#'*70}")
    print(" TEST: Empty directory handling")
    print(f"{'#'*70}")

    tmpdir = tempfile.mkdtemp(prefix="test_queue_empty_")
    try:
        # `*_` absorbs any extra frame: the quant resubmit tests in this file
        # unpack a fourth (failed) queue from the same function.
        finished_df, running_df, pending_df, *_ = get_evaluation_queue_df(tmpdir, EVAL_COLS)

        assert len(finished_df) == 0, f"finished should be empty, got {len(finished_df)}"
        assert len(running_df) == 0, f"running should be empty, got {len(running_df)}"
        assert len(pending_df) == 0, f"pending should be empty, got {len(pending_df)}"

        print(" ✅ Empty directory returns 3 empty DataFrames")

    finally:
        shutil.rmtree(tmpdir, ignore_errors=True)
|
|
|
|
| |
| |
| |
|
|
def test_model_column_clickable():
    """The model column should contain HTML hyperlinks (make_clickable_model).

    Only the first row of each non-empty queue is sampled; it must contain
    both ``<a `` and ``href=``.
    """
    print(f"\n{'#'*70}")
    print(" TEST: Model column contains clickable links")
    print(f"{'#'*70}")

    # `*_` absorbs any extra frame: the quant resubmit tests in this file
    # unpack a fourth (failed) queue from the same function.
    finished_df, running_df, pending_df, *_ = get_evaluation_queue_df(REAL_STATUS_PATH, EVAL_COLS)

    errors = []
    for name, df in [("finished", finished_df), ("running", running_df), ("pending", pending_df)]:
        if len(df) == 0:
            print(f" {name}: empty (skipped)")
            continue

        first_model = str(df["model"].iloc[0])
        has_link = "<a " in first_model and "href=" in first_model
        if has_link:
            print(f" {name}: ✅ model column has HTML links (sample: {first_model[:80]}...)")
        else:
            errors.append(f"{name}: model column missing HTML links: {first_model[:120]}")
            print(f" {name}: ❌ model column has no HTML links: {first_model[:120]}")

    assert not errors, "; ".join(errors)
    print(" ✅ All non-empty queues have clickable model links")
|
|
|
|
| |
| |
| |
|
|
def test_no_cross_queue_duplicates():
    """No model should appear in more than one queue.

    Compares the stringified ``model`` cells of the finished, running and
    pending queues pairwise and fails on any overlap.
    """
    print(f"\n{'#'*70}")
    print(" TEST: No cross-queue duplicates")
    print(f"{'#'*70}")

    # `*_` absorbs any extra frame: the quant resubmit tests in this file
    # unpack a fourth (failed) queue from the same function.
    finished_df, running_df, pending_df, *_ = get_evaluation_queue_df(REAL_STATUS_PATH, EVAL_COLS)

    def extract_models(df):
        """Return the set of model cell values of *df* as strings."""
        if len(df) == 0:
            return set()
        return set(df["model"].astype(str).tolist())

    finished_models = extract_models(finished_df)
    running_models = extract_models(running_df)
    pending_models = extract_models(pending_df)

    overlap_fr = finished_models & running_models
    overlap_fp = finished_models & pending_models
    overlap_rp = running_models & pending_models

    errors = []
    if overlap_fr:
        errors.append(f"Finished ∩ Running: {len(overlap_fr)} entries")
    if overlap_fp:
        errors.append(f"Finished ∩ Pending: {len(overlap_fp)} entries")
    if overlap_rp:
        errors.append(f"Running ∩ Pending: {len(overlap_rp)} entries")

    print(f" Finished models: {len(finished_models)}")
    print(f" Running models: {len(running_models)}")
    print(f" Pending models: {len(pending_models)}")
    print(f" Overlaps: F∩R={len(overlap_fr)}, F∩P={len(overlap_fp)}, R∩P={len(overlap_rp)}")

    for e in errors:
        print(f" ❌ {e}")
    assert not errors, "; ".join(errors)
    print(" ✅ No cross-queue duplicates")
|
|
|
|
| |
| |
| |
|
|
def test_queue_size_sanity():
    """Basic sanity: Finished queue should be the largest.

    Fails when the finished queue is empty, when it is smaller than the
    running or pending queue, or when all three queues are empty.
    """
    print(f"\n{'#'*70}")
    print(" TEST: Queue size sanity")
    print(f"{'#'*70}")

    # `*_` absorbs any extra frame: the quant resubmit tests in this file
    # unpack a fourth (failed) queue from the same function.
    finished_df, running_df, pending_df, *_ = get_evaluation_queue_df(REAL_STATUS_PATH, EVAL_COLS)

    f, r, p = len(finished_df), len(running_df), len(pending_df)
    print(f" Finished={f}, Running={r}, Pending={p}")

    errors = []
    if f == 0:
        errors.append("Finished queue is empty — expected many entries")
    if f < r:
        errors.append(f"Finished ({f}) < Running ({r}) — unexpected")
    if f < p:
        errors.append(f"Finished ({f}) < Pending ({p}) — unexpected for a mature leaderboard")

    total = f + r + p
    if total == 0:
        errors.append("All queues empty — data not loaded?")

    print(f" Total across queues: {total}")

    for e in errors:
        print(f" ❌ {e}")
    assert not errors, "; ".join(errors)
    print(" ✅ Queue sizes look reasonable")
|
|
|
|
| |
| |
| |
|
|
def test_eval_filter_excludes_quant():
    """request_type='eval' should only include _eval_request_ files.

    Loads the real data unfiltered, with ``request_type="eval"`` and with
    ``request_type="quant"``, and checks that the eval and quant totals add up
    to the unfiltered total.
    """
    print(f"\n{'#'*70}")
    print(" TEST: eval filter excludes quant entries")
    print(f"{'#'*70}")

    def total_rows(cols, **kwargs):
        """Return the combined row count of the finished, running and pending queues."""
        # `*_` absorbs any extra frame: the quant resubmit tests in this file
        # unpack a fourth (failed) queue from the same function.
        fin, run, pend, *_ = get_evaluation_queue_df(REAL_STATUS_PATH, cols, **kwargs)
        return len(fin) + len(run) + len(pend)

    total_all = total_rows(EVAL_COLS)
    total_eval = total_rows(EVAL_COLS, request_type="eval")
    total_quant = total_rows(QUANT_COLS, request_type="quant")

    print(f" All (no filter): {total_all}")
    print(f" Eval only: {total_eval}")
    print(f" Quant only: {total_quant}")
    print(f" Sum (eval+quant): {total_eval + total_quant}")

    errors = []
    if total_eval + total_quant != total_all:
        errors.append(
            f"eval({total_eval}) + quant({total_quant}) = {total_eval + total_quant} "
            f"!= all({total_all})"
        )
    if total_eval == 0:
        errors.append("eval filter returned 0 entries — expected many")
    if total_eval >= total_all and total_quant > 0:
        errors.append("eval filter didn't exclude any quant entries")

    for e in errors:
        print(f" ❌ {e}")
    assert not errors, "; ".join(errors)
    print(" ✅ eval + quant = total, filters work correctly")
|
|
|
|
| |
| |
| |
|
|
def test_quant_queue_columns():
    """Quant queue should use quant-specific columns (quant_scheme, input_dtype).

    The only hard check is that none of the quant queues carries the
    ``weight_type`` column; the actual columns are printed for inspection.
    """
    print(f"\n{'#'*70}")
    print(" TEST: Quant queue has correct columns")
    print(f"{'#'*70}")

    print(f" Expected QUANT_COLS: {QUANT_COLS}")

    # `*_` absorbs any extra frame: the quant resubmit tests in this file
    # unpack a fourth (failed) queue from the same function.
    quant_fin, quant_run, quant_pend, *_ = get_evaluation_queue_df(
        REAL_STATUS_PATH, QUANT_COLS, request_type="quant"
    )

    total = len(quant_fin) + len(quant_run) + len(quant_pend)
    print(f" Total quant entries: {total}")

    if total == 0:
        print(" ⚠️ No quant entries in real data — checking column structure only")

    # Columns that must not appear in a quant queue.
    forbidden_cols = ("weight_type",)

    errors = []
    for name, df in [("finished", quant_fin), ("running", quant_run), ("pending", quant_pend)]:
        actual_cols = list(df.columns)
        print(f" {name:10s} columns: {actual_cols}")
        for col in forbidden_cols:
            if col in df.columns:
                errors.append(f"{name}: should NOT have eval-specific column '{col}'")

    for e in errors:
        print(f" ❌ {e}")
    assert not errors, "; ".join(errors)
    print(" ✅ Quant queue columns are correct")
|
|
|
|
| |
| |
| |
|
|
def test_synthetic_request_type_filter():
    """Synthetic test: eval/quant filter correctly separates by filename pattern.

    Creates three files named ``*_eval_request_*`` and two named
    ``*_quant_request_*`` and expects totals of 5 (unfiltered), 3 (eval) and
    2 (quant), plus the written ``quant_scheme`` value on the quant rows.
    """
    print(f"\n{'#'*70}")
    print(" TEST: Synthetic request_type filter")
    print(f"{'#'*70}")

    tmpdir = tempfile.mkdtemp(prefix="test_queue_filter_")
    try:
        # Three eval request files.
        for i in range(3):
            entry = _create_test_entry(f"org/eval_model_{i}", "Finished")
            fname = f"eval_model_{i}_eval_request_False_AWQ_4bit_int4.json"
            with open(os.path.join(tmpdir, fname), "w") as fh:
                json.dump(entry, fh)

        # Two quant request files carrying quant-specific fields.
        for i in range(2):
            entry = _create_test_entry(
                f"org/quant_model_{i}",
                "Pending",
                extra={"quant_scheme": "INT4 (W4A16)", "input_dtype": "bfloat16"},
            )
            fname = f"quant_model_{i}_quant_request_False_INT4.json"
            with open(os.path.join(tmpdir, fname), "w") as fh:
                json.dump(entry, fh)

        # `*_` in the unpackings below absorbs any extra frame: the quant
        # resubmit tests in this file unpack a fourth (failed) queue.
        all_fin, all_run, all_pend, *_ = get_evaluation_queue_df(tmpdir, EVAL_COLS)
        total_all = len(all_fin) + len(all_run) + len(all_pend)

        eval_fin, eval_run, eval_pend, *_ = get_evaluation_queue_df(
            tmpdir, EVAL_COLS, request_type="eval"
        )
        total_eval = len(eval_fin) + len(eval_run) + len(eval_pend)

        quant_fin, quant_run, quant_pend, *_ = get_evaluation_queue_df(
            tmpdir, QUANT_COLS, request_type="quant"
        )
        total_quant = len(quant_fin) + len(quant_run) + len(quant_pend)

        print(f" All: {total_all} (expected 5)")
        print(f" Eval: {total_eval} (expected 3)")
        print(f" Quant: {total_quant} (expected 2)")

        errors = []
        if total_all != 5:
            errors.append(f"All: expected 5, got {total_all}")
        if total_eval != 3:
            errors.append(f"Eval: expected 3, got {total_eval}")
        if total_quant != 2:
            errors.append(f"Quant: expected 2, got {total_quant}")

        # Only checked when the quant frame actually exposes the column.
        if total_quant > 0 and "quant_scheme" in quant_pend.columns:
            vals = quant_pend["quant_scheme"].dropna().tolist()
            if all(v == "INT4 (W4A16)" for v in vals):
                print(" ✅ Quant entries have correct quant_scheme")
            else:
                errors.append(f"quant_scheme values: {vals}")

        for e in errors:
            print(f" ❌ {e}")
        assert not errors, "; ".join(errors)
        print(" ✅ request_type filter works correctly")

    finally:
        shutil.rmtree(tmpdir, ignore_errors=True)
|
|
|
|
| |
| |
| |
|
|
if __name__ == "__main__":
    # Standalone runner: executes every test in file order, prints a summary
    # and exits non-zero when any test failed.
    print("=" * 70)
    print(" Evaluation Queue Unit Tests")
    print("=" * 70)

    tests = [
        test_real_data_loads,
        test_status_categorization,
        test_columns_present,
        test_quant_entry_no_crash,
        test_synthetic_status_routing,
        test_resubmitted_quant_pending_not_overridden_by_old_failed_result,
        test_submitted_quant_is_failed_when_result_is_newer,
        test_unknown_status_dropped,
        test_subdirectory_loading,
        test_malformed_json_skipped,
        test_empty_directory,
        test_model_column_clickable,
        test_no_cross_queue_duplicates,
        test_queue_size_sanity,
        test_eval_filter_excludes_quant,
        test_quant_queue_columns,
        test_synthetic_request_type_filter,
    ]

    results = {}
    for func in tests:
        name = func.__name__
        try:
            func()
            results[name] = True
        except Exception as e:
            # AssertionError is an Exception subclass, so failed assertions
            # are recorded here and the remaining tests still run.
            results[name] = False
            print(f" ❌ EXCEPTION: {e}")

    print(f"\n{'='*70}")
    print(" SUMMARY")
    print(f"{'='*70}")
    for name, passed in results.items():
        status = "✅ PASS" if passed else "❌ FAIL"
        print(f" {status} {name}")

    total = len(results)
    passed = sum(1 for v in results.values() if v)
    print(f"\n {passed}/{total} tests passed")
    print(f"{'='*70}")

    # Report failure through the exit status so shells and CI can detect it.
    if passed != total:
        sys.exit(1)
|
|