low_bit_open_llm_leaderboard / tests /test_eval_queue.py
wenjiao's picture
Enhance features(Access, queue, table, UI and so on...)
4291baa
"""Unit tests for the Evaluation Queue functionality.
Tests cover:
- get_evaluation_queue_df correctly loads and categorizes queue entries
- Status categorization: Finished, Running, Pending (incl. Waiting/Rerun)
- Column completeness and data types
- Robustness against entries missing expected fields (e.g. quant entries)
- Count consistency: sum of 3 queues == total parseable entries
- No crashes on the real cache_git/status data
"""
import json
import logging
import os
import sys
import tempfile
import shutil
import pandas as pd
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
logging.basicConfig(level=logging.DEBUG, format="%(name)s %(levelname)s: %(message)s")
logger = logging.getLogger("test_eval_queue")
from src.populate import get_evaluation_queue_df
from src.display.utils import EvalQueueColumn, EVAL_COLS, EVAL_TYPES, QUANT_COLS, QUANT_TYPES, eval_queue_cols
from src.display.formatting import make_clickable_model
# ── Paths ────────────────────────────────────────────────────────────────────
# Repository root: the parent of the directory that contains this test file.
PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
# <root>/cache_git/status — the directory the "real data" tests pass to
# get_evaluation_queue_df.
REAL_STATUS_PATH = os.path.join(PROJECT_ROOT, "cache_git", "status")
# ═══════════════════════════════════════════════════════════════════════════════
# Helpers
# ═══════════════════════════════════════════════════════════════════════════════
def _count_json_recursive(path):
    """Return how many ``.json`` files exist anywhere beneath *path*."""
    return sum(
        1
        for _root, _dirs, filenames in os.walk(path)
        for filename in filenames
        if filename.endswith(".json")
    )
def _collect_statuses(path):
    """Collect the ``status`` value of every JSON object found under *path*.

    The tree rooted at *path* is walked recursively and every ``*.json`` file
    is parsed. Files that cannot be opened, decoded as UTF-8, or parsed as
    JSON are skipped, as are documents whose top level is not a JSON object
    (these have no ``status`` key to read). Objects without a ``status`` key
    contribute the placeholder ``"UNKNOWN"``.

    Returns:
        A list with one status value per parseable JSON object, in
        ``os.walk`` order.
    """
    statuses = []
    for root, _dirs, files in os.walk(path):
        for filename in files:
            if not filename.endswith(".json"):
                continue
            try:
                with open(os.path.join(root, filename), encoding="utf-8") as fh:
                    data = json.load(fh)
            except (ValueError, OSError):
                # ValueError covers both json.JSONDecodeError and
                # UnicodeDecodeError; unreadable files are simply ignored.
                continue
            if not isinstance(data, dict):
                # A list/scalar document would raise AttributeError on .get().
                continue
            statuses.append(data.get("status", "UNKNOWN"))
    return statuses
def _create_test_entry(model, status, precision="4bit", extra=None):
    """Build a minimal queue-request dict for *model* with the given *status*.

    Keys supplied through *extra* are merged in last, so they can add fields
    or override the defaults.
    """
    entry = dict(
        model=model,
        revision="main",
        private=False,
        precision=precision,
        weight_dtype="int4",
        status=status,
    )
    if not extra:
        return entry
    entry.update(extra)
    return entry
# ═══════════════════════════════════════════════════════════════════════════════
# Test 1: Real data — basic loading and non-crash
# ═══════════════════════════════════════════════════════════════════════════════
def test_real_data_loads():
    """Smoke test: the real ``cache_git/status`` tree loads into three DataFrames."""
    banner = "#" * 70
    print(f"\n{banner}")
    print(" TEST: Real data loads without crash")
    print(banner)

    assert os.path.isdir(REAL_STATUS_PATH), f"Status path missing: {REAL_STATUS_PATH}"

    finished_df, running_df, pending_df = get_evaluation_queue_df(REAL_STATUS_PATH, EVAL_COLS)
    queues = (
        ("finished", finished_df),
        ("running", running_df),
        ("pending", pending_df),
    )

    # Report all sizes first, then validate the types.
    for label, frame in queues:
        print(f" {label.capitalize()}: {len(frame)} rows")
    for label, frame in queues:
        assert isinstance(frame, pd.DataFrame), f"{label} should be DataFrame"

    print(" βœ… All three DataFrames loaded successfully")
# ═══════════════════════════════════════════════════════════════════════════════
# Test 2: Status categorization correctness
# ═══════════════════════════════════════════════════════════════════════════════
def test_status_categorization():
    """Check that queue sizes on the real data match the raw ``status`` fields.

    The expected size of each queue is derived by reading every status file
    under ``REAL_STATUS_PATH`` directly and classifying its status:

    - ``Pending`` / ``Rerun`` / ``Waiting``            -> pending
    - ``Running``                                      -> running
    - ``Finished*`` (prefix) / ``PENDING_NEW_EVAL``    -> finished

    Any other status (including non-string values) is reported as
    uncategorized and is not expected in any queue.

    Raises:
        AssertionError: if any queue size differs from the expected count.
    """
    from collections import Counter  # local import keeps this block self-contained

    def _queue_of(status):
        """Return the queue name expected for *status*, or None if unrecognized."""
        if not isinstance(status, str):
            return None
        if status in ("Pending", "Rerun", "Waiting"):
            return "pending"
        if status == "Running":
            return "running"
        if status.startswith("Finished") or status == "PENDING_NEW_EVAL":
            return "finished"
        return None

    queue_names = ("pending", "running", "finished")

    print(f"\n{'#'*70}")
    print(" TEST: Status categorization")
    print(f"{'#'*70}")

    # Expected numbers, counted manually from the raw files.
    statuses = _collect_statuses(REAL_STATUS_PATH)
    total_files = _count_json_recursive(REAL_STATUS_PATH)
    expected = Counter(_queue_of(s) for s in statuses)
    expected_total = sum(expected[q] for q in queue_names)

    print(f" JSON files on disk: {total_files}")
    print(f" Parseable statuses: {len(statuses)}")
    for q in queue_names:
        print(f" Expected {q}: {expected[q]}")
    print(f" Sum (3 queues): {expected_total}")

    finished_df, running_df, pending_df = get_evaluation_queue_df(REAL_STATUS_PATH, EVAL_COLS)
    actual = {
        "pending": len(pending_df),
        "running": len(running_df),
        "finished": len(finished_df),
    }
    print(f"\n Actual pending: {actual['pending']}")
    print(f" Actual running: {actual['running']}")
    print(f" Actual finished: {actual['finished']}")
    print(f" Actual total: {sum(actual.values())}")

    errors = [
        f"{q.capitalize()} mismatch: expected {expected[q]}, got {actual[q]}"
        for q in queue_names
        if actual[q] != expected[q]
    ]

    # Entries with an unknown status belong to none of the three queues.
    # Keyed by str() so unhashable values cannot break the tally.
    unknown = Counter(str(s) for s in statuses if _queue_of(s) is None)
    uncategorized = sum(unknown.values())
    if uncategorized > 0:
        print(f"\n ⚠️ {uncategorized} entries have unrecognized status (not in any queue)")
        for s, cnt in unknown.items():
            print(f" status='{s}': {cnt}")

    if errors:
        for e in errors:
            print(f" ❌ {e}")
        # raise instead of `assert False`, which is removed under `python -O`
        raise AssertionError("; ".join(errors))
    print(" βœ… Categorization correct")
# ═══════════════════════════════════════════════════════════════════════════════
# Test 3: Column completeness
# ═══════════════════════════════════════════════════════════════════════════════
def test_columns_present():
    """Verify each queue DataFrame built from the real data has every EVAL_COLS column.

    Columns present in a frame but absent from ``EVAL_COLS`` are printed for
    information only; they do not fail the test.

    Raises:
        AssertionError: if any queue is missing one of the expected columns.
    """
    print(f"\n{'#'*70}")
    print(" TEST: Column completeness")
    print(f"{'#'*70}")
    print(f" Expected EVAL_COLS: {EVAL_COLS}")

    finished_df, running_df, pending_df = get_evaluation_queue_df(REAL_STATUS_PATH, EVAL_COLS)

    errors = []
    for name, df in (("finished", finished_df), ("running", running_df), ("pending", pending_df)):
        actual_cols = list(df.columns)
        print(f" {name:10s} columns: {actual_cols}")
        errors.extend(
            f"{name}: missing column '{col}'" for col in EVAL_COLS if col not in actual_cols
        )
        extra = [c for c in actual_cols if c not in EVAL_COLS]
        if extra:
            print(f" {name:10s} extra columns (not in EVAL_COLS): {extra}")

    if errors:
        for e in errors:
            print(f" ❌ {e}")
        # raise instead of `assert False`, which is removed under `python -O`
        raise AssertionError("; ".join(errors))
    print(" βœ… All expected columns present")
# ═══════════════════════════════════════════════════════════════════════════════
# Test 4: Quant entries (missing 'precision') don't crash the queue
# ═══════════════════════════════════════════════════════════════════════════════
def test_quant_entry_no_crash():
    """Ensure entries with ``quant_precision`` but no ``precision`` key load cleanly.

    The real status tree is searched for the first JSON object that lacks
    ``precision`` and has ``quant_precision``. The queue is then loaded (the
    main check is that this does not raise). If such an entry was found and
    its status is recognized, it must appear in the matching queue.

    Raises:
        AssertionError: if the quant entry is absent from the queue its
            status maps to.
    """

    def _first_quant_entry(base):
        """Return ``(path, data)`` for the first matching entry, else ``(None, None)``."""
        for root, _dirs, files in os.walk(base):
            for filename in files:
                if not filename.endswith(".json"):
                    continue
                path = os.path.join(root, filename)
                try:
                    # Context manager closes the handle; the original leaked it.
                    with open(path, encoding="utf-8") as fh:
                        data = json.load(fh)
                except (ValueError, OSError):
                    # Unreadable or malformed files are irrelevant to this search.
                    continue
                if isinstance(data, dict) and "precision" not in data and "quant_precision" in data:
                    return path, data
        return None, None

    print(f"\n{'#'*70}")
    print(" TEST: Quant entry (missing 'precision') handling")
    print(f"{'#'*70}")

    quant_file, d = _first_quant_entry(REAL_STATUS_PATH)
    if quant_file:
        print(f" Found quant entry without 'precision': {quant_file}")
        print(f" status: {d.get('status')}")
        print(f" quant_precision: {d.get('quant_precision')}")
        print(f" has 'precision': {'precision' in d}")
    else:
        print(" No quant entries without 'precision' found (skipping)")

    # The main check: loading must not crash.
    finished_df, running_df, pending_df = get_evaluation_queue_df(REAL_STATUS_PATH, EVAL_COLS)

    if not quant_file:
        print(" βœ… No crash (no quant entries to test specifically)")
        return

    model_name = d["model"]
    status = d["status"]

    # Map the entry's status to the queue it should have landed in.
    if status in ("Pending", "Rerun", "Waiting"):
        target_df, target_name = pending_df, "pending"
    elif status == "Running":
        target_df, target_name = running_df, "running"
    elif status.startswith("Finished") or status == "PENDING_NEW_EVAL":
        target_df, target_name = finished_df, "finished"
    else:
        print(f" ⚠️ Quant entry has unrecognized status: {status}")
        return

    # The 'model' column holds rendered text, so search for the name inside it.
    mask = target_df["model"].astype(str).str.contains(model_name, regex=False)
    if not mask.any():
        print(f" ❌ Quant entry '{model_name}' NOT found in {target_name} queue")
        # raise instead of `assert False`, which is removed under `python -O`
        raise AssertionError(f"Quant entry missing from {target_name}")
    print(f" βœ… Quant entry '{model_name}' correctly in {target_name} queue")

    # Informational: show what the 'precision' column holds for this entry.
    row = target_df[mask]
    precision_val = row["precision"].iloc[0] if len(row) > 0 else "N/A"
    print(f" precision column value: {precision_val} (type: {type(precision_val).__name__})")
# ═══════════════════════════════════════════════════════════════════════════════
# Test 5: Synthetic data — controlled status routing
# ═══════════════════════════════════════════════════════════════════════════════
def test_synthetic_status_routing():
    """Route one synthetic entry per status variant and check where each lands.

    Seven request files are written to a temporary directory, one for each
    status handled by the queue (three pending-like, one running, three
    finished-like). Each model must appear in its expected queue and the
    queue sizes must be 3 / 1 / 3.

    Raises:
        AssertionError: if an entry is missing from its queue or a queue has
            an unexpected size.
    """
    print(f"\n{'#'*70}")
    print(" TEST: Synthetic status routing")
    print(f"{'#'*70}")

    tmpdir = tempfile.mkdtemp(prefix="test_queue_")
    try:
        # (model suffix, status written to disk, queue it should end up in)
        test_cases = [
            ("pending_model", "Pending", "pending"),
            ("rerun_model", "Rerun", "pending"),
            ("waiting_model", "Waiting", "pending"),
            ("running_model", "Running", "running"),
            ("finished_model", "Finished", "finished"),
            ("finished2_model", "Finished_2024", "finished"),  # startswith("Finished")
            ("pne_model", "PENDING_NEW_EVAL", "finished"),
        ]
        for model, status, _ in test_cases:
            entry = _create_test_entry(f"test/{model}", status)
            with open(os.path.join(tmpdir, f"{model}_{status}.json"), "w") as fh:
                json.dump(entry, fh)

        finished_df, running_df, pending_df = get_evaluation_queue_df(tmpdir, EVAL_COLS)
        frames = {"pending": pending_df, "running": running_df, "finished": finished_df}

        errors = []
        for model, status, expected_queue in test_cases:
            df = frames[expected_queue]
            found = df["model"].astype(str).str.contains(f"test/{model}", regex=False).any()
            label = f"status='{status}' β†’ {expected_queue}"
            if found:
                print(f" βœ… {label}")
            else:
                print(f" ❌ {label} β€” NOT FOUND")
                errors.append(label)

        # Verify counts
        print(f"\n Counts: pending={len(pending_df)}, running={len(running_df)}, finished={len(finished_df)}")
        expected_counts = {"pending": 3, "running": 1, "finished": 3}
        for queue, expected in expected_counts.items():
            actual = len(frames[queue])
            if actual != expected:
                errors.append(f"{queue} count: expected {expected}, got {actual}")

        if errors:
            for e in errors:
                print(f" ❌ {e}")
            # raise instead of `assert False`, which is removed under `python -O`
            raise AssertionError("; ".join(errors))
        print(" βœ… All synthetic entries correctly routed")
    finally:
        shutil.rmtree(tmpdir, ignore_errors=True)
def test_resubmitted_quant_pending_not_overridden_by_old_failed_result():
    """A Pending quant request newer than a failed result must remain Pending.

    The request is submitted at 10:00 while the failed result was generated
    at 09:00, so the result predates the re-submission.
    """
    # NOTE(review): four frames are unpacked here while other tests in this
    # file unpack three — presumably the extra "failed" frame is returned only
    # when results_path is supplied; confirm against src.populate.
    tmpdir = tempfile.mkdtemp(prefix="test_queue_resubmit_quant_")
    results_dir = tempfile.mkdtemp(prefix="test_results_resubmit_quant_")
    try:
        request_path = os.path.join(tmpdir, "request.json")
        result_path = os.path.join(results_dir, "results_2026-05-21-09-00-00.json")

        request = {
            "model": "org/model",
            "revision": "main",
            "private": False,
            "quant_scheme": "INT4 (W4A16)",
            "quant_precision": "4bit",
            "quant_weight_dtype": "int4",
            "status": "Pending",
            "submitted_time": "2026-05-21T10:00:00Z",
            "script": "auto_quant",
            "model_params": 7.0,
        }
        stale_failure = {
            "model_id": "org/model",
            "generated_at": "2026-05-21T09:00:00Z",
            "run_dir": "runs/old",
            "copied_files": ["x"],
            "quant_summary": {"scheme": "W4A16", "status": "failed"},
            "accuracy": {"status": "missing"},
        }
        for path, payload in ((request_path, request), (result_path, stale_failure)):
            with open(path, "w") as fh:
                json.dump(payload, fh)

        _finished, _running, pending_df, failed_df = get_evaluation_queue_df(
            tmpdir, QUANT_COLS, request_type="quant", results_path=results_dir
        )

        assert len(pending_df) == 1
        assert pending_df["model"].astype(str).str.contains("org/model", regex=False).any()
        assert len(failed_df) == 0
    finally:
        shutil.rmtree(tmpdir, ignore_errors=True)
        shutil.rmtree(results_dir, ignore_errors=True)
def test_submitted_quant_is_failed_when_result_is_newer():
    """A failed result newer than a Pending quant request moves it to Failed.

    The request is submitted at 08:00 and the failed result is generated at
    09:00, so the result postdates the submission.
    """
    # NOTE(review): four frames are unpacked here while other tests in this
    # file unpack three — presumably the extra "failed" frame is returned only
    # when results_path is supplied; confirm against src.populate.
    tmpdir = tempfile.mkdtemp(prefix="test_queue_newer_result_quant_")
    results_dir = tempfile.mkdtemp(prefix="test_results_newer_result_quant_")
    try:
        request_path = os.path.join(tmpdir, "request.json")
        result_path = os.path.join(results_dir, "results_2026-05-21-09-00-00.json")

        request = {
            "model": "org/model",
            "revision": "main",
            "private": False,
            "quant_scheme": "INT4 (W4A16)",
            "quant_precision": "4bit",
            "quant_weight_dtype": "int4",
            "status": "Pending",
            "submitted_time": "2026-05-21T08:00:00Z",
            "script": "auto_quant",
            "model_params": 7.0,
        }
        fresh_failure = {
            "model_id": "org/model",
            "generated_at": "2026-05-21T09:00:00Z",
            "run_dir": "runs/new",
            "copied_files": ["x"],
            "quant_summary": {"scheme": "W4A16", "status": "failed"},
            "accuracy": {"status": "missing"},
        }
        for path, payload in ((request_path, request), (result_path, fresh_failure)):
            with open(path, "w") as fh:
                json.dump(payload, fh)

        _finished, _running, pending_df, failed_df = get_evaluation_queue_df(
            tmpdir, QUANT_COLS, request_type="quant", results_path=results_dir
        )

        assert len(pending_df) == 0
        assert len(failed_df) == 1
        assert failed_df["model"].astype(str).str.contains("org/model", regex=False).any()
    finally:
        shutil.rmtree(tmpdir, ignore_errors=True)
        shutil.rmtree(results_dir, ignore_errors=True)
# ═══════════════════════════════════════════════════════════════════════════════
# Test 6: Synthetic — unknown status entries are silently dropped
# ═══════════════════════════════════════════════════════════════════════════════
def test_unknown_status_dropped():
    """Only entries whose status is recognized may show up in a queue.

    One ``Pending`` entry is written next to three entries with statuses the
    queue is expected to ignore (``Cancelled``, ``Failed``, ``Deleted``).
    """
    banner = "#" * 70
    print(f"\n{banner}")
    print(" TEST: Unknown status entries are dropped")
    print(banner)

    tmpdir = tempfile.mkdtemp(prefix="test_queue_unknown_")
    try:
        samples = (
            ("test/good", "Pending"),
            ("test/bad1", "Cancelled"),
            ("test/bad2", "Failed"),
            ("test/bad3", "Deleted"),
        )
        for index, (model, status) in enumerate(samples):
            target = os.path.join(tmpdir, f"entry_{index}.json")
            with open(target, "w") as fh:
                json.dump(_create_test_entry(model, status), fh)

        finished_df, running_df, pending_df = get_evaluation_queue_df(tmpdir, EVAL_COLS)
        total = sum(len(frame) for frame in (finished_df, running_df, pending_df))
        print(f" Total in queues: {total} (expected 1)")

        assert total == 1, f"Expected 1 entry in queues, got {total}"
        assert len(pending_df) == 1, f"Expected 1 pending, got {len(pending_df)}"
        print(" βœ… Only recognized status entries kept")
    finally:
        shutil.rmtree(tmpdir, ignore_errors=True)
# ═══════════════════════════════════════════════════════════════════════════════
# Test 7: Synthetic — subdirectory entries are also loaded
# ═══════════════════════════════════════════════════════════════════════════════
def test_subdirectory_loading():
    """Check that request files in sub-directories are loaded along with root-level ones.

    A ``Finished`` entry is written at the root of a temporary directory and a
    ``Running`` entry inside a ``myorg/`` sub-directory; each must end up in
    its queue.

    Raises:
        AssertionError: if either entry is missing from its expected queue.
    """
    print(f"\n{'#'*70}")
    print(" TEST: Subdirectory loading")
    print(f"{'#'*70}")

    tmpdir = tempfile.mkdtemp(prefix="test_queue_subdir_")
    try:
        # Root-level entry
        with open(os.path.join(tmpdir, "root_model1.json"), "w") as fh:
            json.dump(_create_test_entry("root/model1", "Finished"), fh)

        # Subdirectory entry (org/model layout)
        subdir = os.path.join(tmpdir, "myorg")
        os.makedirs(subdir)
        with open(os.path.join(subdir, "model2.json"), "w") as fh:
            json.dump(_create_test_entry("myorg/model2", "Running"), fh)

        finished_df, running_df, pending_df = get_evaluation_queue_df(tmpdir, EVAL_COLS)

        errors = []
        if len(finished_df) != 1:
            errors.append(f"Expected 1 finished (root-level), got {len(finished_df)}")
        if len(running_df) != 1:
            errors.append(f"Expected 1 running (subdirectory), got {len(running_df)}")

        # Only look for the model when the frame has rows to search.
        if len(running_df) > 0:
            found = running_df["model"].astype(str).str.contains("myorg/model2", regex=False).any()
            if not found:
                errors.append("Subdirectory model 'myorg/model2' not found in running queue")

        if errors:
            for e in errors:
                print(f" ❌ {e}")
            # raise instead of `assert False`, which is removed under `python -O`
            raise AssertionError("; ".join(errors))
        print(" βœ… Both root and subdirectory entries loaded")
    finally:
        shutil.rmtree(tmpdir, ignore_errors=True)
# ═══════════════════════════════════════════════════════════════════════════════
# Test 8: Malformed JSON files are skipped gracefully
# ═══════════════════════════════════════════════════════════════════════════════
def test_malformed_json_skipped():
    """Broken or empty ``.json`` files must be ignored rather than abort loading."""
    banner = "#" * 70
    print(f"\n{banner}")
    print(" TEST: Malformed JSON handling")
    print(banner)

    tmpdir = tempfile.mkdtemp(prefix="test_queue_malformed_")
    try:
        # One valid entry, one syntactically broken file, one empty file.
        file_contents = (
            ("good.json", json.dumps(_create_test_entry("test/good", "Pending"))),
            ("bad.json", "{broken json content"),
            ("empty.json", ""),
        )
        for filename, text in file_contents:
            with open(os.path.join(tmpdir, filename), "w") as fh:
                fh.write(text)

        finished_df, running_df, pending_df = get_evaluation_queue_df(tmpdir, EVAL_COLS)
        total = sum(len(frame) for frame in (finished_df, running_df, pending_df))
        print(f" Total entries loaded: {total} (expected 1)")

        assert total == 1, f"Expected 1, got {total}"
        print(" βœ… Malformed files skipped, good entry loaded")
    finally:
        shutil.rmtree(tmpdir, ignore_errors=True)
# ═══════════════════════════════════════════════════════════════════════════════
# Test 9: Empty directory returns empty DataFrames
# ═══════════════════════════════════════════════════════════════════════════════
def test_empty_directory():
    """Loading a directory with no files must yield three zero-length frames."""
    banner = "#" * 70
    print(f"\n{banner}")
    print(" TEST: Empty directory handling")
    print(banner)

    tmpdir = tempfile.mkdtemp(prefix="test_queue_empty_")
    try:
        finished_df, running_df, pending_df = get_evaluation_queue_df(tmpdir, EVAL_COLS)
        queues = (
            ("finished", finished_df),
            ("running", running_df),
            ("pending", pending_df),
        )
        for label, frame in queues:
            size = len(frame)
            assert size == 0, f"{label} should be empty, got {size}"
        print(" βœ… Empty directory returns 3 empty DataFrames")
    finally:
        shutil.rmtree(tmpdir, ignore_errors=True)
# ═══════════════════════════════════════════════════════════════════════════════
# Test 10: Real data — model column contains clickable links
# ═══════════════════════════════════════════════════════════════════════════════
def test_model_column_clickable():
    """Check that the ``model`` column of each non-empty queue holds an HTML link.

    Only the first row of each queue is sampled; a value counts as a link when
    it contains both ``<a `` and ``href=``. Empty queues are skipped.

    Raises:
        AssertionError: if the sampled value of any non-empty queue is not a link.
    """
    print(f"\n{'#'*70}")
    print(" TEST: Model column contains clickable links")
    print(f"{'#'*70}")

    finished_df, running_df, pending_df = get_evaluation_queue_df(REAL_STATUS_PATH, EVAL_COLS)

    errors = []
    for name, df in (("finished", finished_df), ("running", running_df), ("pending", pending_df)):
        if len(df) == 0:
            print(f" {name}: empty (skipped)")
            continue
        # Sample the first row's model cell.
        first_model = str(df["model"].iloc[0])
        if "<a " in first_model and "href=" in first_model:
            print(f" {name}: βœ… model column has HTML links (sample: {first_model[:80]}...)")
        else:
            errors.append(f"{name}: model column missing HTML links: {first_model[:120]}")
            print(f" {name}: ❌ model column has no HTML links: {first_model[:120]}")

    if errors:
        # raise instead of `assert False`, which is removed under `python -O`
        raise AssertionError("; ".join(errors))
    print(" βœ… All non-empty queues have clickable model links")
# ═══════════════════════════════════════════════════════════════════════════════
# Test 11: Real data — no duplicate entries across queues
# ═══════════════════════════════════════════════════════════════════════════════
def test_no_cross_queue_duplicates():
    """Verify that no ``model`` cell value appears in more than one queue.

    Queues are compared on the string form of their ``model`` column exactly
    as rendered; no attempt is made to strip markup from the values.

    Raises:
        AssertionError: if any two queues share a ``model`` value.
    """
    print(f"\n{'#'*70}")
    print(" TEST: No cross-queue duplicates")
    print(f"{'#'*70}")

    finished_df, running_df, pending_df = get_evaluation_queue_df(REAL_STATUS_PATH, EVAL_COLS)

    def extract_models(df):
        """Return the set of stringified ``model`` values (empty frame -> empty set)."""
        # Guard first: an empty frame is not indexed by column at all.
        if len(df) == 0:
            return set()
        return set(df["model"].astype(str).tolist())

    finished_models = extract_models(finished_df)
    running_models = extract_models(running_df)
    pending_models = extract_models(pending_df)

    overlap_fr = finished_models & running_models
    overlap_fp = finished_models & pending_models
    overlap_rp = running_models & pending_models

    errors = []
    if overlap_fr:
        errors.append(f"Finished ∩ Running: {len(overlap_fr)} entries")
    if overlap_fp:
        errors.append(f"Finished ∩ Pending: {len(overlap_fp)} entries")
    if overlap_rp:
        errors.append(f"Running ∩ Pending: {len(overlap_rp)} entries")

    print(f" Finished models: {len(finished_models)}")
    print(f" Running models: {len(running_models)}")
    print(f" Pending models: {len(pending_models)}")
    print(f" Overlaps: F∩R={len(overlap_fr)}, F∩P={len(overlap_fp)}, R∩P={len(overlap_rp)}")

    if errors:
        for e in errors:
            print(f" ❌ {e}")
        # raise instead of `assert False`, which is removed under `python -O`
        raise AssertionError("; ".join(errors))
    print(" βœ… No cross-queue duplicates")
# ═══════════════════════════════════════════════════════════════════════════════
# Test 12: Real data — Finished queue has the most entries
# ═══════════════════════════════════════════════════════════════════════════════
def test_queue_size_sanity():
    """Sanity-check relative queue sizes on the real data.

    The Finished queue must be non-empty and at least as large as both the
    Running and the Pending queue, and the three queues together must hold at
    least one entry.

    Raises:
        AssertionError: if any of those expectations is violated.
    """
    print(f"\n{'#'*70}")
    print(" TEST: Queue size sanity")
    print(f"{'#'*70}")

    finished_df, running_df, pending_df = get_evaluation_queue_df(REAL_STATUS_PATH, EVAL_COLS)
    f, r, p = len(finished_df), len(running_df), len(pending_df)
    print(f" Finished={f}, Running={r}, Pending={p}")

    errors = []
    if f == 0:
        errors.append("Finished queue is empty β€” expected many entries")
    if f < r:
        errors.append(f"Finished ({f}) < Running ({r}) β€” unexpected")
    if f < p:
        errors.append(f"Finished ({f}) < Pending ({p}) β€” unexpected for a mature leaderboard")

    total = f + r + p
    if total == 0:
        errors.append("All queues empty β€” data not loaded?")
    print(f" Total across queues: {total}")

    if errors:
        for e in errors:
            print(f" ❌ {e}")
        # raise instead of `assert False`, which is removed under `python -O`
        raise AssertionError("; ".join(errors))
    print(" βœ… Queue sizes look reasonable")
# ═══════════════════════════════════════════════════════════════════════════════
# Test 13: Real data — eval filter excludes quant entries
# ═══════════════════════════════════════════════════════════════════════════════
def test_eval_filter_excludes_quant():
    """Check that the ``eval`` and ``quant`` filters partition the real data.

    The queue is loaded three times — unfiltered, with ``request_type="eval"``
    and with ``request_type="quant"`` — and the eval and quant totals must add
    up to the unfiltered total.

    Raises:
        AssertionError: if the totals do not add up, the eval filter returns
            nothing, or it excluded nothing although quant entries exist.
    """
    # NOTE(review): three frames are unpacked for request_type="quant" here,
    # while the re-submit tests in this file unpack four when results_path is
    # also passed — confirm the return shape against src.populate.
    print(f"\n{'#'*70}")
    print(" TEST: eval filter excludes quant entries")
    print(f"{'#'*70}")

    # Load all (no filter)
    all_fin, all_run, all_pend = get_evaluation_queue_df(REAL_STATUS_PATH, EVAL_COLS)
    total_all = len(all_fin) + len(all_run) + len(all_pend)

    # Load eval only
    eval_fin, eval_run, eval_pend = get_evaluation_queue_df(
        REAL_STATUS_PATH, EVAL_COLS, request_type="eval"
    )
    total_eval = len(eval_fin) + len(eval_run) + len(eval_pend)

    # Load quant only
    quant_fin, quant_run, quant_pend = get_evaluation_queue_df(
        REAL_STATUS_PATH, QUANT_COLS, request_type="quant"
    )
    total_quant = len(quant_fin) + len(quant_run) + len(quant_pend)

    print(f" All (no filter): {total_all}")
    print(f" Eval only: {total_eval}")
    print(f" Quant only: {total_quant}")
    print(f" Sum (eval+quant): {total_eval + total_quant}")

    errors = []
    if total_eval + total_quant != total_all:
        errors.append(
            f"eval({total_eval}) + quant({total_quant}) = {total_eval + total_quant} "
            f"!= all({total_all})"
        )
    if total_eval == 0:
        errors.append("eval filter returned 0 entries β€” expected many")
    if total_eval >= total_all and total_quant > 0:
        errors.append("eval filter didn't exclude any quant entries")

    if errors:
        for e in errors:
            print(f" ❌ {e}")
        # raise instead of `assert False`, which is removed under `python -O`
        raise AssertionError("; ".join(errors))
    print(" βœ… eval + quant = total, filters work correctly")
# ═══════════════════════════════════════════════════════════════════════════════
# Test 14: Quant queue uses QUANT_COLS correctly
# ═══════════════════════════════════════════════════════════════════════════════
def test_quant_queue_columns():
    """Check that quant queues do not carry the eval-only ``weight_type`` column.

    The quant-filtered queues are loaded from the real data with
    ``QUANT_COLS``; their columns are printed and the test fails only if
    ``weight_type`` shows up in any of them.

    Raises:
        AssertionError: if a quant queue contains ``weight_type``.
    """
    print(f"\n{'#'*70}")
    print(" TEST: Quant queue has correct columns")
    print(f"{'#'*70}")
    print(f" Expected QUANT_COLS: {QUANT_COLS}")

    quant_fin, quant_run, quant_pend = get_evaluation_queue_df(
        REAL_STATUS_PATH, QUANT_COLS, request_type="quant"
    )
    total = len(quant_fin) + len(quant_run) + len(quant_pend)
    print(f" Total quant entries: {total}")
    if total == 0:
        print(" ⚠️ No quant entries in real data β€” checking column structure only")

    eval_only_cols = ["weight_type"]
    errors = []
    for name, df in (("finished", quant_fin), ("running", quant_run), ("pending", quant_pend)):
        actual_cols = list(df.columns)
        print(f" {name:10s} columns: {actual_cols}")
        # Quant frames must not contain eval-only fields.
        errors.extend(
            f"{name}: should NOT have eval-specific column '{col}'"
            for col in eval_only_cols
            if col in df.columns
        )

    if errors:
        for e in errors:
            print(f" ❌ {e}")
        # raise instead of `assert False`, which is removed under `python -O`
        raise AssertionError("; ".join(errors))
    print(" βœ… Quant queue columns are correct")
# ═══════════════════════════════════════════════════════════════════════════════
# Test 15: Synthetic — request_type filter routes correctly
# ═══════════════════════════════════════════════════════════════════════════════
def test_synthetic_request_type_filter():
    """Check the eval/quant filter against synthetic files named by request type.

    Three files with ``_eval_request_`` in their name and two with
    ``_quant_request_`` are written to a temporary directory. Unfiltered
    loading must return 5 entries, the eval filter 3 and the quant filter 2.
    When the pending quant frame has a ``quant_scheme`` column, all its
    non-null values must equal the scheme that was written.

    Raises:
        AssertionError: if any total is wrong or a ``quant_scheme`` value differs.
    """
    print(f"\n{'#'*70}")
    print(" TEST: Synthetic request_type filter")
    print(f"{'#'*70}")

    tmpdir = tempfile.mkdtemp(prefix="test_queue_filter_")
    try:
        # Create eval entries
        for i in range(3):
            entry = _create_test_entry(f"org/eval_model_{i}", "Finished")
            fname = f"eval_model_{i}_eval_request_False_AWQ_4bit_int4.json"
            with open(os.path.join(tmpdir, fname), "w") as fh:
                json.dump(entry, fh)

        # Create quant entries
        for i in range(2):
            entry = _create_test_entry(
                f"org/quant_model_{i}",
                "Pending",
                extra={"quant_scheme": "INT4 (W4A16)", "input_dtype": "bfloat16"},
            )
            fname = f"quant_model_{i}_quant_request_False_INT4.json"
            with open(os.path.join(tmpdir, fname), "w") as fh:
                json.dump(entry, fh)

        # No filter: all 5
        all_fin, all_run, all_pend = get_evaluation_queue_df(tmpdir, EVAL_COLS)
        total_all = len(all_fin) + len(all_run) + len(all_pend)

        # Eval filter: should get 3
        eval_fin, eval_run, eval_pend = get_evaluation_queue_df(
            tmpdir, EVAL_COLS, request_type="eval"
        )
        total_eval = len(eval_fin) + len(eval_run) + len(eval_pend)

        # Quant filter: should get 2
        quant_fin, quant_run, quant_pend = get_evaluation_queue_df(
            tmpdir, QUANT_COLS, request_type="quant"
        )
        total_quant = len(quant_fin) + len(quant_run) + len(quant_pend)

        print(f" All: {total_all} (expected 5)")
        print(f" Eval: {total_eval} (expected 3)")
        print(f" Quant: {total_quant} (expected 2)")

        errors = []
        if total_all != 5:
            errors.append(f"All: expected 5, got {total_all}")
        if total_eval != 3:
            errors.append(f"Eval: expected 3, got {total_eval}")
        if total_quant != 2:
            errors.append(f"Quant: expected 2, got {total_quant}")

        # Verify the scheme survived loading, when the column is present.
        if total_quant > 0 and "quant_scheme" in quant_pend.columns:
            vals = quant_pend["quant_scheme"].dropna().tolist()
            if all(v == "INT4 (W4A16)" for v in vals):
                print(" βœ… Quant entries have correct quant_scheme")
            else:
                errors.append(f"quant_scheme values: {vals}")

        if errors:
            for e in errors:
                print(f" ❌ {e}")
            # raise instead of `assert False`, which is removed under `python -O`
            raise AssertionError("; ".join(errors))
        print(" βœ… request_type filter works correctly")
    finally:
        shutil.rmtree(tmpdir, ignore_errors=True)
# ═══════════════════════════════════════════════════════════════════════════════
# Main
# ═══════════════════════════════════════════════════════════════════════════════
if __name__ == "__main__":
    # Stand-alone runner: executes every test in file order, prints a
    # PASS/FAIL summary and exits non-zero when any test failed so that
    # shell scripts / CI can detect the failure.
    print("=" * 70)
    print(" Evaluation Queue Unit Tests")
    print("=" * 70)

    tests = [
        test_real_data_loads,
        test_status_categorization,
        test_columns_present,
        test_quant_entry_no_crash,
        test_synthetic_status_routing,
        # These two were defined in the file but never registered here.
        test_resubmitted_quant_pending_not_overridden_by_old_failed_result,
        test_submitted_quant_is_failed_when_result_is_newer,
        test_unknown_status_dropped,
        test_subdirectory_loading,
        test_malformed_json_skipped,
        test_empty_directory,
        test_model_column_clickable,
        test_no_cross_queue_duplicates,
        test_queue_size_sanity,
        test_eval_filter_excludes_quant,
        test_quant_queue_columns,
        test_synthetic_request_type_filter,
    ]

    results = {}
    for func in tests:
        name = func.__name__
        try:
            func()
        except Exception as e:
            results[name] = False
            # Include the type: bare `assert` failures carry no message.
            print(f" ❌ EXCEPTION: {type(e).__name__}: {e}")
        else:
            results[name] = True

    print(f"\n{'='*70}")
    print(" SUMMARY")
    print(f"{'='*70}")
    for name, passed in results.items():
        status = "βœ… PASS" if passed else "❌ FAIL"
        print(f" {status} {name}")

    total = len(results)
    passed = sum(1 for v in results.values() if v)
    print(f"\n {passed}/{total} tests passed")
    print(f"{'='*70}")

    if passed != total:
        sys.exit(1)