# final_test/models/xgboost/fetch_sweep_results.py
# Abdelrahman Almatrooshi
# Deploy snapshot from main b7a59b11809483dfc959f196f1930240f2662c49
# 22a6915
"""Export XGBoost sweep trial metrics from ClearML to CSV.
This helper is intended for the local Optuna workflow in sweep_local.py,
where each trial logs val_loss / val_f1 / val_accuracy as ClearML scalars.
"""
from __future__ import annotations
import argparse
import csv
from pathlib import Path
from typing import Optional
from clearml import Task
from sklearn.metrics import accuracy_score
from xgboost import XGBClassifier
from data_preparation.prepare_dataset import get_default_split_config, get_numpy_splits
# Defaults for the command-line options built in parse_args().
# Default for --project.
DEFAULT_PROJECT = "FocusGuards Large Group Project"
# Default for --tags.
DEFAULT_TAGS = ["xgboost", "optuna_manual"]
# Default for --output (the CSV destination).
DEFAULT_OUTPUT = Path("models/xgboost/sweep_results_all_40.csv")
# Default for --name-prefix; only tasks whose name starts with it are kept.
DEFAULT_NAME_PREFIX = "XGBoost Sweep Trial #"
# Default for --limit; a value <= 0 disables the limit.
DEFAULT_LIMIT = 40
# Default for --sort-by (primary ranking metric).
DEFAULT_SORT_BY = "val_f1"
def _read_metric(metrics: dict, title: str, series: str) -> Optional[float]:
raw_value = metrics.get(title, {}).get(series, {}).get("last")
if raw_value is None:
return None
try:
return float(raw_value)
except (TypeError, ValueError):
return None
def _to_float(value, default: float = 0.0) -> float:
try:
return float(value)
except (TypeError, ValueError):
return default
def _task_time_key(task: Task) -> str:
# Prefer last update time when available so --limit keeps the latest trials.
data = getattr(task, "data", None)
if data is not None:
for attr in ("last_update", "last_iteration_timestamp", "created"):
value = getattr(data, attr, None)
if value:
return str(value)
for attr in ("last_update", "created"):
value = getattr(task, attr, None)
if value:
return str(value)
return ""
def _is_valid_core_metric(value: Optional[float]) -> bool:
return value is not None and value > 0.0
def _sort_metric(value: Optional[float], default: float) -> float:
return value if value is not None else default
def _compute_missing_val_accuracy(rows: list[dict], seed: int) -> int:
need_backfill = [r for r in rows if r["val_accuracy"] is None]
if not need_backfill:
return 0
split_ratios, _default_seed = get_default_split_config()
try:
splits, _num_features, _num_classes, _scaler = get_numpy_splits(
model_name="face_orientation",
split_ratios=split_ratios,
seed=seed,
scale=False,
)
except Exception as exc:
print(f"[FETCH] WARNING: Could not backfill val_accuracy (dataset unavailable): {exc}")
return 0
X_train, y_train = splits["X_train"], splits["y_train"]
X_val, y_val = splits["X_val"], splits["y_val"]
computed = 0
for row in need_backfill:
try:
params = {
"n_estimators": int(row["n_estimators"]),
"max_depth": int(row["max_depth"]),
"learning_rate": float(row["learning_rate"]),
"subsample": float(row["subsample"]),
"colsample_bytree": float(row["colsample_bytree"]),
"reg_alpha": float(row["reg_alpha"]),
"reg_lambda": float(row["reg_lambda"]),
"eval_metric": "logloss",
"random_state": seed,
"verbosity": 0,
}
model = XGBClassifier(**params)
model.fit(X_train, y_train)
val_preds = model.predict(X_val)
row["val_accuracy"] = float(accuracy_score(y_val, val_preds))
computed += 1
except Exception as exc:
print(f"[FETCH] WARNING: Failed val_accuracy backfill for task_id={row['task_id']}: {exc}")
return computed
def _sort_key(row: dict, sort_by: str) -> tuple[float, float, float]:
    """Build the ascending sort key that ranks ``row`` by ``sort_by``.

    Lower ``val_loss`` and higher ``val_f1`` / ``val_accuracy`` rank first
    (the latter two are negated). Missing metrics are replaced by the worst
    possible value so they sort last. Any ``sort_by`` other than
    ``"val_loss"`` or ``"val_accuracy"`` ranks by ``val_f1``.
    """
    loss = _sort_metric(row["val_loss"], float("inf"))
    f1 = _sort_metric(row["val_f1"], float("-inf"))
    accuracy = _sort_metric(row["val_accuracy"], float("-inf"))

    # Primary metric first, remaining two as tie-breakers.
    orderings = {
        "val_loss": (loss, -f1, -accuracy),
        "val_accuracy": (-accuracy, -f1, loss),
    }
    return orderings.get(sort_by, (-f1, loss, -accuracy))
def fetch_rows(
    project_name: str,
    tags: list[str],
    name_prefix: str,
    limit: int,
    drop_zero_metrics: bool,
    sort_by: str,
    compute_missing_val_accuracy: bool,
    seed: int,
) -> list[dict]:
    """Collect one result row per matching ClearML sweep task.

    Args:
        project_name: ClearML project to query.
        tags: Task tags passed to ``Task.get_tasks``.
        name_prefix: Only tasks whose name starts with this prefix are kept.
        limit: Keep at most this many of the most recent matching tasks;
            a value <= 0 keeps all of them.
        drop_zero_metrics: Drop rows whose val_loss / val_accuracy / val_f1
            is missing or not strictly positive.
        sort_by: Primary ranking metric, forwarded to ``_sort_key``.
        compute_missing_val_accuracy: Backfill missing ``val_accuracy`` by
            training a model per row before filtering.
        seed: Seed forwarded to the backfill step.

    Returns:
        Rows (dicts with task id, the three validation metrics and the
        hyperparameters) sorted best-first according to ``sort_by``.
    """
    print(
        f"[FETCH] Project={project_name} Tags={tags} "
        f"NamePrefix={name_prefix!r} Limit={limit}"
    )
    tasks = Task.get_tasks(
        project_name=project_name,
        tags=tags,
        task_filter={"status": ["completed"]},
    )
    matched_tasks = [t for t in tasks if (t.name or "").startswith(name_prefix)]
    # Newest first, so that applying the limit keeps the latest trials.
    matched_tasks.sort(key=_task_time_key, reverse=True)
    selected_tasks = matched_tasks[:limit] if limit > 0 else matched_tasks
    # Report the match count before truncation; previously the "name-matched"
    # figure was taken after the limit and so under-reported the matches.
    print(
        f"[FETCH] Total completed tagged tasks={len(tasks)} | "
        f"name-matched={len(matched_tasks)} | selected={len(selected_tasks)}"
    )

    hyperparameter_names = (
        "n_estimators",
        "max_depth",
        "learning_rate",
        "subsample",
        "colsample_bytree",
        "reg_alpha",
        "reg_lambda",
    )
    rows = []
    for task in selected_tasks:
        params = task.get_parameters() or {}
        metrics = task.get_last_scalar_metrics() or {}
        row = {
            "task_id": task.id,
            "val_loss": _read_metric(metrics, "Loss", "Val"),
            "val_accuracy": _read_metric(metrics, "Summary", "val_accuracy"),
            "val_f1": _read_metric(metrics, "Summary", "val_f1"),
        }
        for name in hyperparameter_names:
            # Prefer the "General/<name>" key, falling back to the bare name.
            row[name] = _to_float(params.get(f"General/{name}", params.get(name)))
        rows.append(row)

    if compute_missing_val_accuracy:
        computed = _compute_missing_val_accuracy(rows, seed=seed)
        print(f"[FETCH] Backfilled val_accuracy for {computed} rows where it was missing")

    if drop_zero_metrics:
        before = len(rows)
        rows = [
            r
            for r in rows
            if all(
                _is_valid_core_metric(r[metric])
                for metric in ("val_loss", "val_accuracy", "val_f1")
            )
        ]
        print(f"[FETCH] Skipped tasks with missing/zero core metrics: {before - len(rows)}")

    # Default ranking is by validation F1, then val_loss, then val_accuracy.
    rows.sort(key=lambda r: _sort_key(r, sort_by=sort_by))
    return rows
def write_csv(rows: list[dict], output_path: Path) -> None:
    """Write ``rows`` to ``output_path`` as a UTF-8 CSV file with a header.

    Missing parent directories are created. Columns are written in a fixed
    order: task id, the three validation metrics, then the hyperparameters.
    """
    metric_columns = ["task_id", "val_loss", "val_accuracy", "val_f1"]
    hyperparameter_columns = [
        "n_estimators",
        "max_depth",
        "learning_rate",
        "subsample",
        "colsample_bytree",
        "reg_alpha",
        "reg_lambda",
    ]

    target_dir = output_path.parent
    target_dir.mkdir(parents=True, exist_ok=True)

    with output_path.open("w", newline="", encoding="utf-8") as csv_file:
        table = csv.DictWriter(
            csv_file,
            fieldnames=metric_columns + hyperparameter_columns,
        )
        table.writeheader()
        for record in rows:
            table.writerow(record)
    print(f"[FETCH] Wrote {len(rows)} rows to {output_path}")
def parse_args() -> argparse.Namespace:
    """Parse the command-line options of the export script.

    The module docstring is used as the parser description. Options are
    registered in the order listed below.
    """
    option_specs = [
        ("--project", {"default": DEFAULT_PROJECT, "help": "ClearML project name"}),
        (
            "--tags",
            {
                "nargs": "+",
                "default": DEFAULT_TAGS,
                "help": "ClearML task tags to filter (default: xgboost optuna_manual)",
            },
        ),
        ("--output", {"default": str(DEFAULT_OUTPUT), "help": "Output CSV path"}),
        (
            "--name-prefix",
            {
                "default": DEFAULT_NAME_PREFIX,
                "help": "Only include tasks whose name starts with this prefix",
            },
        ),
        (
            "--limit",
            {
                "type": int,
                "default": DEFAULT_LIMIT,
                "help": "Max number of latest matching tasks to inspect; <=0 means no limit",
            },
        ),
        (
            "--keep-zero-metrics",
            {
                "action": "store_true",
                "help": "Keep tasks even when val_loss/val_accuracy/val_f1 are missing or zero",
            },
        ),
        (
            "--sort-by",
            {
                "choices": ["val_f1", "val_loss", "val_accuracy"],
                "default": DEFAULT_SORT_BY,
                "help": "Primary ranking metric for exported rows (default: val_f1)",
            },
        ),
        (
            "--compute-missing-val-accuracy",
            {
                "action": "store_true",
                "help": "Train per-row models to backfill val_accuracy only when it is missing",
            },
        ),
        (
            "--seed",
            {
                "type": int,
                "default": 42,
                "help": "Random seed used when backfilling missing val_accuracy",
            },
        ),
    ]

    cli_parser = argparse.ArgumentParser(description=__doc__)
    for flag, settings in option_specs:
        cli_parser.add_argument(flag, **settings)
    return cli_parser.parse_args()
def main() -> None:
    """Script entry point: parse options, fetch the rows and write the CSV."""
    cli = parse_args()
    fetch_options = {
        "project_name": cli.project,
        "tags": cli.tags,
        "name_prefix": cli.name_prefix,
        "limit": cli.limit,
        # The flag is opt-in for keeping, so dropping is its negation.
        "drop_zero_metrics": not cli.keep_zero_metrics,
        "sort_by": cli.sort_by,
        "compute_missing_val_accuracy": cli.compute_missing_val_accuracy,
        "seed": cli.seed,
    }
    exported = fetch_rows(**fetch_options)
    write_csv(exported, output_path=Path(cli.output))
# Run the export only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()