"""Unit tests for the Evaluation Queue functionality.
Tests cover:
- get_evaluation_queue_df correctly loads and categorizes queue entries
- Status categorization: Finished, Running, Pending (incl. Waiting/Rerun)
- Column completeness and data types
- Robustness against entries missing expected fields (e.g. quant entries)
- Count consistency: sum of 3 queues == total parseable entries
- No crashes on the real cache_git/status data
"""
import json
import logging
import os
import sys
import tempfile
import shutil
import pandas as pd
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
logging.basicConfig(level=logging.DEBUG, format="%(name)s %(levelname)s: %(message)s")
logger = logging.getLogger("test_eval_queue")
from src.populate import get_evaluation_queue_df
from src.display.utils import EvalQueueColumn, EVAL_COLS, EVAL_TYPES, QUANT_COLS, QUANT_TYPES, eval_queue_cols
from src.display.formatting import make_clickable_model
# ── Paths ────────────────────────────────────────────────────────────────────
PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
REAL_STATUS_PATH = os.path.join(PROJECT_ROOT, "cache_git", "status")
# ─────────────────────────────────────────────────────────────────────────────
# Helpers
# ─────────────────────────────────────────────────────────────────────────────
def _count_json_recursive(path):
"""Count all .json files recursively under *path*."""
total = 0
for root, _dirs, files in os.walk(path):
for f in files:
if f.endswith(".json"):
total += 1
return total
def _collect_statuses(path):
"""Collect all status values from JSON files under *path*."""
statuses = []
for root, _dirs, files in os.walk(path):
for f in files:
if f.endswith(".json"):
fp = os.path.join(root, f)
try:
with open(fp) as fh:
d = json.load(fh)
statuses.append(d.get("status", "UNKNOWN"))
except (json.JSONDecodeError, OSError):
pass
return statuses
def _create_test_entry(model, status, precision="4bit", extra=None):
"""Create a minimal queue JSON dict."""
entry = {
"model": model,
"revision": "main",
"private": False,
"precision": precision,
"weight_dtype": "int4",
"status": status,
}
if extra:
entry.update(extra)
return entry
# ─────────────────────────────────────────────────────────────────────────────
# Test 1: Real data — basic loading and non-crash
# ─────────────────────────────────────────────────────────────────────────────
def test_real_data_loads():
    """get_evaluation_queue_df should load real cache_git/status without crashing.

    Fails when ``REAL_STATUS_PATH`` is not a directory or when any of the
    three returned queues (finished, running, pending) is not a
    ``pd.DataFrame``.
    """
    print(f"\n{'#'*70}")
    print(" TEST: Real data loads without crash")
    print(f"{'#'*70}")
    assert os.path.isdir(REAL_STATUS_PATH), f"Status path missing: {REAL_STATUS_PATH}"
    finished_df, running_df, pending_df = get_evaluation_queue_df(REAL_STATUS_PATH, EVAL_COLS)
    print(f" Finished: {len(finished_df)} rows")
    print(f" Running: {len(running_df)} rows")
    print(f" Pending: {len(pending_df)} rows")
    for label, df in (("finished", finished_df), ("running", running_df), ("pending", pending_df)):
        assert isinstance(df, pd.DataFrame), f"{label} should be DataFrame"
    print(" ✅ All three DataFrames loaded successfully")
# ─────────────────────────────────────────────────────────────────────────────
# Test 2: Status categorization correctness
# ─────────────────────────────────────────────────────────────────────────────
def test_status_categorization():
    """Entries should be categorized into correct queues based on status field.

    Expected per-queue counts are derived from the raw JSON files under
    ``REAL_STATUS_PATH`` and compared with the lengths of the DataFrames
    returned by ``get_evaluation_queue_df``.  Statuses that map to no queue
    are reported but do not fail the test.

    Raises:
        AssertionError: If any queue length differs from its expected count.
    """
    # Function-scope import keeps this block independent of the file header.
    from collections import Counter

    def queue_for(status):
        """Return the queue name a raw status maps to, or None if unrecognized."""
        if status in ("Pending", "Rerun", "Waiting"):
            return "pending"
        if status == "Running":
            return "running"
        # Non-string statuses (e.g. JSON null) cannot carry the "Finished"
        # prefix; they are treated as unrecognized instead of crashing.
        if status == "PENDING_NEW_EVAL" or (
            isinstance(status, str) and status.startswith("Finished")
        ):
            return "finished"
        return None

    queue_names = ("pending", "running", "finished")

    print(f"\n{'#'*70}")
    print(" TEST: Status categorization")
    print(f"{'#'*70}")
    # Manually count expected numbers from raw data
    statuses = _collect_statuses(REAL_STATUS_PATH)
    total_files = _count_json_recursive(REAL_STATUS_PATH)
    buckets = Counter(queue_for(s) for s in statuses)
    expected = {name: buckets[name] for name in queue_names}
    expected_total = sum(expected.values())
    print(f" JSON files on disk: {total_files}")
    print(f" Parseable statuses: {len(statuses)}")
    print(f" Expected pending: {expected['pending']}")
    print(f" Expected running: {expected['running']}")
    print(f" Expected finished: {expected['finished']}")
    print(f" Sum (3 queues): {expected_total}")

    finished_df, running_df, pending_df = get_evaluation_queue_df(REAL_STATUS_PATH, EVAL_COLS)
    actual = {
        "pending": len(pending_df),
        "running": len(running_df),
        "finished": len(finished_df),
    }
    print(f"\n Actual pending: {actual['pending']}")
    print(f" Actual running: {actual['running']}")
    print(f" Actual finished: {actual['finished']}")
    print(f" Actual total: {sum(actual.values())}")

    errors = [
        f"{name.capitalize()} mismatch: expected {expected[name]}, got {actual[name]}"
        for name in queue_names
        if actual[name] != expected[name]
    ]

    # Some entries may have unknown statuses and fall into none of the 3 queues
    uncategorized = len(statuses) - expected_total
    if uncategorized > 0:
        print(f"\n ⚠️ {uncategorized} entries have unrecognized status (not in any queue)")
        unknown_counts = Counter(str(s) for s in statuses if queue_for(s) is None)
        for status, cnt in unknown_counts.items():
            print(f" status='{status}': {cnt}")

    if errors:
        for e in errors:
            print(f" ❌ {e}")
        raise AssertionError("; ".join(errors))
    print(" ✅ Categorization correct")
# ─────────────────────────────────────────────────────────────────────────────
# Test 3: Column completeness
# ─────────────────────────────────────────────────────────────────────────────
def test_columns_present():
    """All three queue DataFrames should have the expected EVAL_COLS columns.

    Columns that are present but not listed in ``EVAL_COLS`` are printed for
    information only.

    Raises:
        AssertionError: If a queue lacks any column listed in ``EVAL_COLS``.
    """
    print(f"\n{'#'*70}")
    print(" TEST: Column completeness")
    print(f"{'#'*70}")
    print(f" Expected EVAL_COLS: {EVAL_COLS}")
    finished_df, running_df, pending_df = get_evaluation_queue_df(REAL_STATUS_PATH, EVAL_COLS)
    errors = []
    for name, df in (("finished", finished_df), ("running", running_df), ("pending", pending_df)):
        actual_cols = list(df.columns)
        print(f" {name:10s} columns: {actual_cols}")
        # Check that expected columns are present
        errors.extend(
            f"{name}: missing column '{col}'" for col in EVAL_COLS if col not in actual_cols
        )
        # Check for unexpected extra columns
        extra = [c for c in actual_cols if c not in EVAL_COLS]
        if extra:
            print(f" {name:10s} extra columns (not in EVAL_COLS): {extra}")
    if errors:
        for e in errors:
            print(f" ❌ {e}")
        raise AssertionError("; ".join(errors))
    print(" ✅ All expected columns present")
# ─────────────────────────────────────────────────────────────────────────────
# Test 4: Quant entries (missing 'precision') don't crash the queue
# ─────────────────────────────────────────────────────────────────────────────
def _find_quant_entry_without_precision(path):
    """Return ``(file_path, entry)`` for the first quant-style entry under *path*.

    A quant-style entry is a JSON object that has ``quant_precision`` but no
    ``precision`` key.  Unreadable or unparsable files are skipped.  Returns
    ``(None, None)`` when no such entry exists.
    """
    for root, _dirs, files in os.walk(path):
        for name in files:
            if not name.endswith(".json"):
                continue
            fp = os.path.join(root, name)
            try:
                with open(fp, encoding="utf-8") as fh:
                    entry = json.load(fh)
            except (ValueError, OSError):
                # ValueError covers JSONDecodeError and UnicodeDecodeError.
                continue
            if isinstance(entry, dict) and "precision" not in entry and "quant_precision" in entry:
                return fp, entry
    return None, None


def test_quant_entry_no_crash():
    """Quant entries that use quant_precision instead of precision shouldn't crash.

    Looks for a real entry lacking ``precision``, loads the queues, and, when
    such an entry exists with a recognized status, checks that its model
    appears in the matching queue.

    Raises:
        AssertionError: If the quant entry is absent from the queue its
            status maps to.
    """
    print(f"\n{'#'*70}")
    print(" TEST: Quant entry (missing 'precision') handling")
    print(f"{'#'*70}")
    # Find the actual quant entry we know about
    quant_file, entry = _find_quant_entry_without_precision(REAL_STATUS_PATH)
    if quant_file:
        print(f" Found quant entry without 'precision': {quant_file}")
        print(f" status: {entry.get('status')}")
        print(f" quant_precision: {entry.get('quant_precision')}")
        print(f" has 'precision': {'precision' in entry}")
    else:
        print(" No quant entries without 'precision' found (skipping)")

    # The main check: no crash
    finished_df, running_df, pending_df = get_evaluation_queue_df(REAL_STATUS_PATH, EVAL_COLS)
    if not quant_file:
        print(" ✅ No crash (no quant entries to test specifically)")
        return

    model_name = entry["model"]
    status = entry["status"]
    # Check which queue it ended up in
    if status in ("Pending", "Rerun", "Waiting"):
        target_name, target_df = "pending", pending_df
    elif status == "Running":
        target_name, target_df = "running", running_df
    elif status.startswith("Finished") or status == "PENDING_NEW_EVAL":
        target_name, target_df = "finished", finished_df
    else:
        print(f" ⚠️ Quant entry has unrecognized status: {status}")
        return

    # The 'model' column contains clickable HTML, search within
    mask = target_df["model"].astype(str).str.contains(model_name, regex=False)
    if not mask.any():
        print(f" ❌ Quant entry '{model_name}' NOT found in {target_name} queue")
        raise AssertionError(f"Quant entry missing from {target_name}")
    print(f" ✅ Quant entry '{model_name}' correctly in {target_name} queue")

    # Report the 'precision' cell of the matched row (informational only).
    precision_val = target_df[mask]["precision"].iloc[0]
    print(f" precision column value: {precision_val} (type: {type(precision_val).__name__})")
# ─────────────────────────────────────────────────────────────────────────────
# Test 5: Synthetic data — controlled status routing
# ─────────────────────────────────────────────────────────────────────────────
def test_synthetic_status_routing():
    """Test status routing with synthetic data covering all status variants.

    One JSON file per case is written to a temporary directory; each model
    must then appear in the queue named by the case, and each queue must hold
    exactly as many rows as the case table assigns to it.

    Raises:
        AssertionError: If a model is missing from its queue or a queue size
            differs from the number of cases routed to it.
    """
    print(f"\n{'#'*70}")
    print(" TEST: Synthetic status routing")
    print(f"{'#'*70}")
    # (model suffix, status written to disk, queue the entry must land in)
    test_cases = [
        ("pending_model", "Pending", "pending"),
        ("rerun_model", "Rerun", "pending"),
        ("waiting_model", "Waiting", "pending"),
        ("running_model", "Running", "running"),
        ("finished_model", "Finished", "finished"),
        ("finished2_model", "Finished_2024", "finished"),  # startswith("Finished")
        ("pne_model", "PENDING_NEW_EVAL", "finished"),
    ]
    tmpdir = tempfile.mkdtemp(prefix="test_queue_")
    try:
        for model, status, _ in test_cases:
            entry = _create_test_entry(f"test/{model}", status)
            with open(os.path.join(tmpdir, f"{model}_{status}.json"), "w") as fh:
                json.dump(entry, fh)

        finished_df, running_df, pending_df = get_evaluation_queue_df(tmpdir, EVAL_COLS)
        queues = {"pending": pending_df, "running": running_df, "finished": finished_df}

        errors = []
        for model, status, expected_queue in test_cases:
            df = queues[expected_queue]
            found = df["model"].astype(str).str.contains(f"test/{model}", regex=False).any()
            label = f"status='{status}' → {expected_queue}"
            if found:
                print(f" ✅ {label}")
            else:
                print(f" ❌ {label} — NOT FOUND")
                errors.append(label)

        # Verify counts; expectations come from the case table so the two
        # cannot drift apart when a case is added.
        print(f"\n Counts: pending={len(pending_df)}, running={len(running_df)}, finished={len(finished_df)}")
        for queue_name, df in queues.items():
            expected = sum(1 for _m, _s, q in test_cases if q == queue_name)
            if len(df) != expected:
                errors.append(f"{queue_name} count: expected {expected}, got {len(df)}")

        if errors:
            for e in errors:
                print(f" ❌ {e}")
            raise AssertionError("; ".join(errors))
        print(" ✅ All synthetic entries correctly routed")
    finally:
        shutil.rmtree(tmpdir, ignore_errors=True)
def test_resubmitted_quant_pending_not_overridden_by_old_failed_result():
    """A quant request submitted after an older failed result must stay Pending."""
    queue_dir = tempfile.mkdtemp(prefix="test_queue_resubmit_quant_")
    results_dir = tempfile.mkdtemp(prefix="test_results_resubmit_quant_")
    try:
        # Request submitted at 10:00, one hour after the failed result below.
        request = dict(
            model="org/model",
            revision="main",
            private=False,
            quant_scheme="INT4 (W4A16)",
            quant_precision="4bit",
            quant_weight_dtype="int4",
            status="Pending",
            submitted_time="2026-05-21T10:00:00Z",
            script="auto_quant",
            model_params=7.0,
        )
        request_path = os.path.join(queue_dir, "request.json")
        with open(request_path, "w") as out:
            json.dump(request, out)

        # Failed result generated at 09:00, i.e. before the re-submission.
        stale_result = dict(
            model_id="org/model",
            generated_at="2026-05-21T09:00:00Z",
            run_dir="runs/old",
            copied_files=["x"],
            quant_summary={"scheme": "W4A16", "status": "failed"},
            accuracy={"status": "missing"},
        )
        result_path = os.path.join(results_dir, "results_2026-05-21-09-00-00.json")
        with open(result_path, "w") as out:
            json.dump(stale_result, out)

        queues = get_evaluation_queue_df(
            queue_dir, QUANT_COLS, request_type="quant", results_path=results_dir
        )
        finished_df, running_df, pending_df, failed_df = queues

        assert len(pending_df) == 1
        pending_models = pending_df["model"].astype(str)
        assert pending_models.str.contains("org/model", regex=False).any()
        assert len(failed_df) == 0
    finally:
        shutil.rmtree(queue_dir, ignore_errors=True)
        shutil.rmtree(results_dir, ignore_errors=True)
def test_submitted_quant_is_failed_when_result_is_newer():
    """A Pending request older than a failed result must be reported as Failed."""
    queue_dir = tempfile.mkdtemp(prefix="test_queue_newer_result_quant_")
    results_dir = tempfile.mkdtemp(prefix="test_results_newer_result_quant_")
    try:
        # Request submitted at 08:00, one hour before the failed result below.
        request = dict(
            model="org/model",
            revision="main",
            private=False,
            quant_scheme="INT4 (W4A16)",
            quant_precision="4bit",
            quant_weight_dtype="int4",
            status="Pending",
            submitted_time="2026-05-21T08:00:00Z",
            script="auto_quant",
            model_params=7.0,
        )
        request_path = os.path.join(queue_dir, "request.json")
        with open(request_path, "w") as out:
            json.dump(request, out)

        # Failed result generated at 09:00, i.e. after the submission.
        fresh_result = dict(
            model_id="org/model",
            generated_at="2026-05-21T09:00:00Z",
            run_dir="runs/new",
            copied_files=["x"],
            quant_summary={"scheme": "W4A16", "status": "failed"},
            accuracy={"status": "missing"},
        )
        result_path = os.path.join(results_dir, "results_2026-05-21-09-00-00.json")
        with open(result_path, "w") as out:
            json.dump(fresh_result, out)

        queues = get_evaluation_queue_df(
            queue_dir, QUANT_COLS, request_type="quant", results_path=results_dir
        )
        finished_df, running_df, pending_df, failed_df = queues

        assert len(pending_df) == 0
        assert len(failed_df) == 1
        failed_models = failed_df["model"].astype(str)
        assert failed_models.str.contains("org/model", regex=False).any()
    finally:
        shutil.rmtree(queue_dir, ignore_errors=True)
        shutil.rmtree(results_dir, ignore_errors=True)
# ─────────────────────────────────────────────────────────────────────────────
# Test 6: Synthetic — unknown status entries are silently dropped
# ─────────────────────────────────────────────────────────────────────────────
def test_unknown_status_dropped():
    """Entries with unrecognized status values should not appear in any queue.

    Writes one ``Pending`` entry plus three entries whose statuses
    (``Cancelled``, ``Failed``, ``Deleted``) are expected to be ignored, then
    requires exactly one row overall and one row in the pending queue.
    """
    print(f"\n{'#'*70}")
    print(" TEST: Unknown status entries are dropped")
    print(f"{'#'*70}")
    tmpdir = tempfile.mkdtemp(prefix="test_queue_unknown_")
    try:
        entries = [
            _create_test_entry("test/good", "Pending"),
            _create_test_entry("test/bad1", "Cancelled"),
            _create_test_entry("test/bad2", "Failed"),
            _create_test_entry("test/bad3", "Deleted"),
        ]
        for i, entry in enumerate(entries):
            with open(os.path.join(tmpdir, f"entry_{i}.json"), "w") as fh:
                json.dump(entry, fh)
        finished_df, running_df, pending_df = get_evaluation_queue_df(tmpdir, EVAL_COLS)
        total = len(finished_df) + len(running_df) + len(pending_df)
        print(f" Total in queues: {total} (expected 1)")
        assert total == 1, f"Expected 1 entry in queues, got {total}"
        assert len(pending_df) == 1, f"Expected 1 pending, got {len(pending_df)}"
        print(" ✅ Only recognized status entries kept")
    finally:
        shutil.rmtree(tmpdir, ignore_errors=True)
# ─────────────────────────────────────────────────────────────────────────────
# Test 7: Synthetic — subdirectory entries are also loaded
# ─────────────────────────────────────────────────────────────────────────────
def test_subdirectory_loading():
    """Entries in subdirectories should also be loaded (org/model pattern).

    Writes one ``Finished`` entry at the root of a temporary directory and one
    ``Running`` entry inside a ``myorg`` subdirectory, then expects one row in
    each of the corresponding queues.

    Raises:
        AssertionError: If either queue size is wrong or the subdirectory
            model is not found in the running queue.
    """
    print(f"\n{'#'*70}")
    print(" TEST: Subdirectory loading")
    print(f"{'#'*70}")
    tmpdir = tempfile.mkdtemp(prefix="test_queue_subdir_")
    try:
        # Root-level entry
        root_entry = _create_test_entry("root/model1", "Finished")
        with open(os.path.join(tmpdir, "root_model1.json"), "w") as fh:
            json.dump(root_entry, fh)
        # Subdirectory entry (like org/model pattern)
        subdir = os.path.join(tmpdir, "myorg")
        os.makedirs(subdir)
        sub_entry = _create_test_entry("myorg/model2", "Running")
        with open(os.path.join(subdir, "model2.json"), "w") as fh:
            json.dump(sub_entry, fh)

        finished_df, running_df, pending_df = get_evaluation_queue_df(tmpdir, EVAL_COLS)
        errors = []
        if len(finished_df) != 1:
            errors.append(f"Expected 1 finished (root-level), got {len(finished_df)}")
        if len(running_df) != 1:
            errors.append(f"Expected 1 running (subdirectory), got {len(running_df)}")
        # Verify the subdirectory model is found
        if len(running_df) > 0:
            found = running_df["model"].astype(str).str.contains("myorg/model2", regex=False).any()
            if not found:
                errors.append("Subdirectory model 'myorg/model2' not found in running queue")
        if errors:
            for e in errors:
                print(f" ❌ {e}")
            raise AssertionError("; ".join(errors))
        print(" ✅ Both root and subdirectory entries loaded")
    finally:
        shutil.rmtree(tmpdir, ignore_errors=True)
# ─────────────────────────────────────────────────────────────────────────────
# Test 8: Malformed JSON files are skipped gracefully
# ─────────────────────────────────────────────────────────────────────────────
def test_malformed_json_skipped():
    """Malformed JSON files should be skipped without crashing.

    The temporary directory holds one valid ``Pending`` entry, one file with
    broken JSON and one empty file; exactly one row is expected across the
    three queues.
    """
    print(f"\n{'#'*70}")
    print(" TEST: Malformed JSON handling")
    print(f"{'#'*70}")
    tmpdir = tempfile.mkdtemp(prefix="test_queue_malformed_")
    try:
        # Valid entry
        good = _create_test_entry("test/good", "Pending")
        with open(os.path.join(tmpdir, "good.json"), "w") as fh:
            json.dump(good, fh)
        # Malformed JSON
        with open(os.path.join(tmpdir, "bad.json"), "w") as fh:
            fh.write("{broken json content")
        # Empty file
        with open(os.path.join(tmpdir, "empty.json"), "w") as fh:
            fh.write("")
        finished_df, running_df, pending_df = get_evaluation_queue_df(tmpdir, EVAL_COLS)
        total = len(finished_df) + len(running_df) + len(pending_df)
        print(f" Total entries loaded: {total} (expected 1)")
        assert total == 1, f"Expected 1, got {total}"
        print(" ✅ Malformed files skipped, good entry loaded")
    finally:
        shutil.rmtree(tmpdir, ignore_errors=True)
# ─────────────────────────────────────────────────────────────────────────────
# Test 9: Empty directory returns empty DataFrames
# ─────────────────────────────────────────────────────────────────────────────
def test_empty_directory():
    """An empty status directory should return three empty DataFrames."""
    print(f"\n{'#'*70}")
    print(" TEST: Empty directory handling")
    print(f"{'#'*70}")
    tmpdir = tempfile.mkdtemp(prefix="test_queue_empty_")
    try:
        finished_df, running_df, pending_df = get_evaluation_queue_df(tmpdir, EVAL_COLS)
        for label, df in (("finished", finished_df), ("running", running_df), ("pending", pending_df)):
            assert len(df) == 0, f"{label} should be empty, got {len(df)}"
        print(" ✅ Empty directory returns 3 empty DataFrames")
    finally:
        shutil.rmtree(tmpdir, ignore_errors=True)
# ─────────────────────────────────────────────────────────────────────────────
# Test 10: Real data — model column contains clickable links
# ─────────────────────────────────────────────────────────────────────────────
def test_model_column_clickable():
    """The model column should contain HTML hyperlinks (make_clickable_model).

    Only the first row of each non-empty queue is inspected: its ``model``
    cell must contain both ``<a `` and ``href=``.

    Raises:
        AssertionError: If the sampled cell of any non-empty queue has no link
            markup.
    """
    print(f"\n{'#'*70}")
    print(" TEST: Model column contains clickable links")
    print(f"{'#'*70}")
    finished_df, running_df, pending_df = get_evaluation_queue_df(REAL_STATUS_PATH, EVAL_COLS)
    errors = []
    for name, df in (("finished", finished_df), ("running", running_df), ("pending", pending_df)):
        if len(df) == 0:
            print(f" {name}: empty (skipped)")
            continue
        # Check first row's model column
        first_model = str(df["model"].iloc[0])
        if "<a " in first_model and "href=" in first_model:
            print(f" {name}: ✅ model column has HTML links (sample: {first_model[:80]}...)")
        else:
            errors.append(f"{name}: model column missing HTML links: {first_model[:120]}")
            print(f" {name}: ❌ model column has no HTML links: {first_model[:120]}")
    if errors:
        raise AssertionError("; ".join(errors))
    print(" ✅ All non-empty queues have clickable model links")
# ─────────────────────────────────────────────────────────────────────────────
# Test 11: Real data — no duplicate entries across queues
# ─────────────────────────────────────────────────────────────────────────────
def test_no_cross_queue_duplicates():
    """No model should appear in more than one queue.

    Queues are compared on the string form of their ``model`` cells as
    returned by ``get_evaluation_queue_df``; no markup is stripped first.

    Raises:
        AssertionError: If any two queues share a ``model`` cell value.
    """
    print(f"\n{'#'*70}")
    print(" TEST: No cross-queue duplicates")
    print(f"{'#'*70}")
    finished_df, running_df, pending_df = get_evaluation_queue_df(REAL_STATUS_PATH, EVAL_COLS)

    def model_cells(df):
        """Return the set of stringified ``model`` cells (empty set for an empty frame)."""
        if len(df) == 0:
            return set()
        return set(df["model"].astype(str).tolist())

    finished_models = model_cells(finished_df)
    running_models = model_cells(running_df)
    pending_models = model_cells(pending_df)
    overlap_fr = finished_models & running_models
    overlap_fp = finished_models & pending_models
    overlap_rp = running_models & pending_models

    errors = []
    if overlap_fr:
        errors.append(f"Finished ∩ Running: {len(overlap_fr)} entries")
    if overlap_fp:
        errors.append(f"Finished ∩ Pending: {len(overlap_fp)} entries")
    if overlap_rp:
        errors.append(f"Running ∩ Pending: {len(overlap_rp)} entries")

    print(f" Finished models: {len(finished_models)}")
    print(f" Running models: {len(running_models)}")
    print(f" Pending models: {len(pending_models)}")
    print(f" Overlaps: F∩R={len(overlap_fr)}, F∩P={len(overlap_fp)}, R∩P={len(overlap_rp)}")
    if errors:
        for e in errors:
            print(f" ❌ {e}")
        raise AssertionError("; ".join(errors))
    print(" ✅ No cross-queue duplicates")
# ─────────────────────────────────────────────────────────────────────────────
# Test 12: Real data — Finished queue has the most entries
# ─────────────────────────────────────────────────────────────────────────────
def test_queue_size_sanity():
    """Basic sanity: Finished queue should be the largest.

    Raises:
        AssertionError: If the finished queue is empty or smaller than the
            running or pending queue, or if all queues are empty.
    """
    print(f"\n{'#'*70}")
    print(" TEST: Queue size sanity")
    print(f"{'#'*70}")
    finished_df, running_df, pending_df = get_evaluation_queue_df(REAL_STATUS_PATH, EVAL_COLS)
    f, r, p = len(finished_df), len(running_df), len(pending_df)
    print(f" Finished={f}, Running={r}, Pending={p}")
    errors = []
    if f == 0:
        errors.append("Finished queue is empty — expected many entries")
    if f < r:
        errors.append(f"Finished ({f}) < Running ({r}) — unexpected")
    if f < p:
        errors.append(f"Finished ({f}) < Pending ({p}) — unexpected for a mature leaderboard")
    total = f + r + p
    if total == 0:
        errors.append("All queues empty — data not loaded?")
    print(f" Total across queues: {total}")
    if errors:
        for e in errors:
            print(f" ❌ {e}")
        raise AssertionError("; ".join(errors))
    print(" ✅ Queue sizes look reasonable")
# ─────────────────────────────────────────────────────────────────────────────
# Test 13: Real data — eval filter excludes quant entries
# ─────────────────────────────────────────────────────────────────────────────
def test_eval_filter_excludes_quant():
    """request_type='eval' should only include _eval_request_ files.

    Loads the real status directory three times (no filter, ``eval``,
    ``quant``) and checks that the eval and quant totals add up to the
    unfiltered total.

    Raises:
        AssertionError: If the totals do not add up, the eval filter returns
            nothing, or the eval filter excluded nothing although quant
            entries exist.
    """
    print(f"\n{'#'*70}")
    print(" TEST: eval filter excludes quant entries")
    print(f"{'#'*70}")
    # Load all (no filter)
    all_fin, all_run, all_pend = get_evaluation_queue_df(REAL_STATUS_PATH, EVAL_COLS)
    total_all = len(all_fin) + len(all_run) + len(all_pend)
    # Load eval only
    eval_fin, eval_run, eval_pend = get_evaluation_queue_df(
        REAL_STATUS_PATH, EVAL_COLS, request_type="eval"
    )
    total_eval = len(eval_fin) + len(eval_run) + len(eval_pend)
    # Load quant only
    # NOTE(review): this quant call is unpacked into three frames, while the
    # resubmit tests in this file unpack four from a quant call that also
    # passes results_path — confirm the return arity for this call form.
    quant_fin, quant_run, quant_pend = get_evaluation_queue_df(
        REAL_STATUS_PATH, QUANT_COLS, request_type="quant"
    )
    total_quant = len(quant_fin) + len(quant_run) + len(quant_pend)

    print(f" All (no filter): {total_all}")
    print(f" Eval only: {total_eval}")
    print(f" Quant only: {total_quant}")
    print(f" Sum (eval+quant): {total_eval + total_quant}")

    errors = []
    if total_eval + total_quant != total_all:
        errors.append(
            f"eval({total_eval}) + quant({total_quant}) = {total_eval + total_quant} "
            f"!= all({total_all})"
        )
    if total_eval == 0:
        errors.append("eval filter returned 0 entries — expected many")
    if total_eval >= total_all and total_quant > 0:
        errors.append("eval filter didn't exclude any quant entries")
    if errors:
        for e in errors:
            print(f" ❌ {e}")
        raise AssertionError("; ".join(errors))
    print(" ✅ eval + quant = total, filters work correctly")
# ─────────────────────────────────────────────────────────────────────────────
# Test 14: Quant queue uses QUANT_COLS correctly
# ─────────────────────────────────────────────────────────────────────────────
def test_quant_queue_columns():
    """Quant queues must not expose the eval-only ``weight_type`` column.

    The columns of each quant queue are printed for inspection; the only
    enforced rule is the absence of ``weight_type``.

    Raises:
        AssertionError: If any quant queue contains a ``weight_type`` column.
    """
    print(f"\n{'#'*70}")
    print(" TEST: Quant queue has correct columns")
    print(f"{'#'*70}")
    print(f" Expected QUANT_COLS: {QUANT_COLS}")
    # NOTE(review): this quant call is unpacked into three frames, while the
    # resubmit tests in this file unpack four from a quant call that also
    # passes results_path — confirm the return arity for this call form.
    quant_fin, quant_run, quant_pend = get_evaluation_queue_df(
        REAL_STATUS_PATH, QUANT_COLS, request_type="quant"
    )
    queues = (("finished", quant_fin), ("running", quant_run), ("pending", quant_pend))
    total = len(quant_fin) + len(quant_run) + len(quant_pend)
    print(f" Total quant entries: {total}")
    if total == 0:
        print(" ⚠️ No quant entries in real data — checking column structure only")

    for name, df in queues:
        print(f" {name:10s} columns: {list(df.columns)}")

    # Verify columns don't contain eval-only fields
    eval_only_cols = ["weight_type"]
    errors = [
        f"{name}: should NOT have eval-specific column '{col}'"
        for name, df in queues
        for col in eval_only_cols
        if col in df.columns
    ]
    if errors:
        for e in errors:
            print(f" ❌ {e}")
        raise AssertionError("; ".join(errors))
    print(" ✅ Quant queue columns are correct")
# ─────────────────────────────────────────────────────────────────────────────
# Test 15: Synthetic — request_type filter routes correctly
# ─────────────────────────────────────────────────────────────────────────────
def test_synthetic_request_type_filter():
    """Synthetic test: eval/quant filter correctly separates by filename pattern.

    Writes three files named ``*_eval_request_*`` and two named
    ``*_quant_request_*`` and expects totals of 5 (no filter), 3 (``eval``)
    and 2 (``quant``).  When the pending quant frame has a ``quant_scheme``
    column, its non-null values must all equal ``"INT4 (W4A16)"``.

    Raises:
        AssertionError: If any total or ``quant_scheme`` value is wrong.
    """
    print(f"\n{'#'*70}")
    print(" TEST: Synthetic request_type filter")
    print(f"{'#'*70}")
    tmpdir = tempfile.mkdtemp(prefix="test_queue_filter_")
    try:
        # Create eval entries
        for i in range(3):
            entry = _create_test_entry(f"org/eval_model_{i}", "Finished")
            fname = f"eval_model_{i}_eval_request_False_AWQ_4bit_int4.json"
            with open(os.path.join(tmpdir, fname), "w") as fh:
                json.dump(entry, fh)
        # Create quant entries
        for i in range(2):
            entry = _create_test_entry(
                f"org/quant_model_{i}",
                "Pending",
                extra={"quant_scheme": "INT4 (W4A16)", "input_dtype": "bfloat16"},
            )
            fname = f"quant_model_{i}_quant_request_False_INT4.json"
            with open(os.path.join(tmpdir, fname), "w") as fh:
                json.dump(entry, fh)

        # No filter → all 5
        all_fin, all_run, all_pend = get_evaluation_queue_df(tmpdir, EVAL_COLS)
        total_all = len(all_fin) + len(all_run) + len(all_pend)
        # Eval filter → should get 3
        eval_fin, eval_run, eval_pend = get_evaluation_queue_df(
            tmpdir, EVAL_COLS, request_type="eval"
        )
        total_eval = len(eval_fin) + len(eval_run) + len(eval_pend)
        # Quant filter → should get 2
        # NOTE(review): this quant call is unpacked into three frames, while
        # the resubmit tests in this file unpack four from a quant call that
        # also passes results_path — confirm the return arity here.
        quant_fin, quant_run, quant_pend = get_evaluation_queue_df(
            tmpdir, QUANT_COLS, request_type="quant"
        )
        total_quant = len(quant_fin) + len(quant_run) + len(quant_pend)

        print(f" All: {total_all} (expected 5)")
        print(f" Eval: {total_eval} (expected 3)")
        print(f" Quant: {total_quant} (expected 2)")

        errors = []
        if total_all != 5:
            errors.append(f"All: expected 5, got {total_all}")
        if total_eval != 3:
            errors.append(f"Eval: expected 3, got {total_eval}")
        if total_quant != 2:
            errors.append(f"Quant: expected 2, got {total_quant}")
        # Verify quant entries have quant_scheme column
        if total_quant > 0 and "quant_scheme" in quant_pend.columns:
            vals = quant_pend["quant_scheme"].dropna().tolist()
            if all(v == "INT4 (W4A16)" for v in vals):
                print(" ✅ Quant entries have correct quant_scheme")
            else:
                errors.append(f"quant_scheme values: {vals}")
        if errors:
            for e in errors:
                print(f" ❌ {e}")
            raise AssertionError("; ".join(errors))
        print(" ✅ request_type filter works correctly")
    finally:
        shutil.rmtree(tmpdir, ignore_errors=True)
# ─────────────────────────────────────────────────────────────────────────────
# Main
# ─────────────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    # Script entry point: run every test in file order, print a PASS/FAIL
    # summary, and exit non-zero when at least one test failed so callers
    # (shell scripts, CI) can detect the failure.
    print("=" * 70)
    print(" Evaluation Queue Unit Tests")
    print("=" * 70)
    tests = [
        test_real_data_loads,
        test_status_categorization,
        test_columns_present,
        test_quant_entry_no_crash,
        test_synthetic_status_routing,
        # The two resubmit tests were defined but never run by this runner.
        test_resubmitted_quant_pending_not_overridden_by_old_failed_result,
        test_submitted_quant_is_failed_when_result_is_newer,
        test_unknown_status_dropped,
        test_subdirectory_loading,
        test_malformed_json_skipped,
        test_empty_directory,
        test_model_column_clickable,
        test_no_cross_queue_duplicates,
        test_queue_size_sanity,
        test_eval_filter_excludes_quant,
        test_quant_queue_columns,
        test_synthetic_request_type_filter,
    ]
    results = {}
    for func in tests:
        try:
            func()
        except Exception as e:
            # Top-level boundary: record the failure and keep running the
            # remaining tests.
            results[func.__name__] = False
            print(f" ❌ EXCEPTION: {e}")
        else:
            results[func.__name__] = True

    print(f"\n{'='*70}")
    print(" SUMMARY")
    print(f"{'='*70}")
    for name, ok in results.items():
        status = "✅ PASS" if ok else "❌ FAIL"
        print(f" {status} {name}")
    total = len(results)
    passed = sum(1 for ok in results.values() if ok)
    print(f"\n {passed}/{total} tests passed")
    print(f"{'='*70}")
    sys.exit(0 if passed == total else 1)
|