Spaces:
Sleeping
Sleeping
| """Export XGBoost sweep trial metrics from ClearML to CSV. | |
| This helper is intended for the local Optuna workflow in sweep_local.py, | |
| where each trial logs val_loss / val_f1 / val_accuracy as ClearML scalars. | |
| """ | |
| from __future__ import annotations | |
| import argparse | |
| import csv | |
| from pathlib import Path | |
| from typing import Optional | |
| from clearml import Task | |
| from sklearn.metrics import accuracy_score | |
| from xgboost import XGBClassifier | |
| from data_preparation.prepare_dataset import get_default_split_config, get_numpy_splits | |
# Defaults for the command-line options registered in parse_args().

# ClearML project that is queried for sweep trials (--project).
DEFAULT_PROJECT = "FocusGuards Large Group Project"
# Tags passed to the ClearML task query (--tags).
DEFAULT_TAGS = ["xgboost", "optuna_manual"]
# Only tasks whose name starts with this prefix are exported (--name-prefix).
DEFAULT_NAME_PREFIX = "XGBoost Sweep Trial #"
# How many of the most recent matching tasks are inspected (--limit).
DEFAULT_LIMIT = 40
# Primary ranking metric for the exported rows (--sort-by).
DEFAULT_SORT_BY = "val_f1"
# Destination of the exported CSV (--output).
DEFAULT_OUTPUT = Path("models") / "xgboost" / "sweep_results_all_40.csv"
| def _read_metric(metrics: dict, title: str, series: str) -> Optional[float]: | |
| raw_value = metrics.get(title, {}).get(series, {}).get("last") | |
| if raw_value is None: | |
| return None | |
| try: | |
| return float(raw_value) | |
| except (TypeError, ValueError): | |
| return None | |
| def _to_float(value, default: float = 0.0) -> float: | |
| try: | |
| return float(value) | |
| except (TypeError, ValueError): | |
| return default | |
| def _task_time_key(task: Task) -> str: | |
| # Prefer last update time when available so --limit keeps the latest trials. | |
| data = getattr(task, "data", None) | |
| if data is not None: | |
| for attr in ("last_update", "last_iteration_timestamp", "created"): | |
| value = getattr(data, attr, None) | |
| if value: | |
| return str(value) | |
| for attr in ("last_update", "created"): | |
| value = getattr(task, attr, None) | |
| if value: | |
| return str(value) | |
| return "" | |
| def _is_valid_core_metric(value: Optional[float]) -> bool: | |
| return value is not None and value > 0.0 | |
| def _sort_metric(value: Optional[float], default: float) -> float: | |
| return value if value is not None else default | |
| def _compute_missing_val_accuracy(rows: list[dict], seed: int) -> int: | |
| need_backfill = [r for r in rows if r["val_accuracy"] is None] | |
| if not need_backfill: | |
| return 0 | |
| split_ratios, _default_seed = get_default_split_config() | |
| try: | |
| splits, _num_features, _num_classes, _scaler = get_numpy_splits( | |
| model_name="face_orientation", | |
| split_ratios=split_ratios, | |
| seed=seed, | |
| scale=False, | |
| ) | |
| except Exception as exc: | |
| print(f"[FETCH] WARNING: Could not backfill val_accuracy (dataset unavailable): {exc}") | |
| return 0 | |
| X_train, y_train = splits["X_train"], splits["y_train"] | |
| X_val, y_val = splits["X_val"], splits["y_val"] | |
| computed = 0 | |
| for row in need_backfill: | |
| try: | |
| params = { | |
| "n_estimators": int(row["n_estimators"]), | |
| "max_depth": int(row["max_depth"]), | |
| "learning_rate": float(row["learning_rate"]), | |
| "subsample": float(row["subsample"]), | |
| "colsample_bytree": float(row["colsample_bytree"]), | |
| "reg_alpha": float(row["reg_alpha"]), | |
| "reg_lambda": float(row["reg_lambda"]), | |
| "eval_metric": "logloss", | |
| "random_state": seed, | |
| "verbosity": 0, | |
| } | |
| model = XGBClassifier(**params) | |
| model.fit(X_train, y_train) | |
| val_preds = model.predict(X_val) | |
| row["val_accuracy"] = float(accuracy_score(y_val, val_preds)) | |
| computed += 1 | |
| except Exception as exc: | |
| print(f"[FETCH] WARNING: Failed val_accuracy backfill for task_id={row['task_id']}: {exc}") | |
| return computed | |
def _sort_key(row: dict, sort_by: str) -> tuple[float, float, float]:
    """Build the ascending sort key that ranks ``row`` by ``sort_by``.

    Lower loss and higher F1 / accuracy rank first, so the "higher is
    better" metrics are negated. Missing metrics are replaced with the
    worst possible value (``inf`` loss, ``-inf`` F1 / accuracy).

    Args:
        row: Exported row with ``val_loss``, ``val_f1`` and ``val_accuracy``.
        sort_by: ``"val_loss"``, ``"val_accuracy"``, or anything else for
            the F1-first ordering.

    Returns:
        A 3-tuple: the primary metric followed by the two tie-breakers.
    """
    loss = _sort_metric(row["val_loss"], float("inf"))
    f1 = _sort_metric(row["val_f1"], float("-inf"))
    accuracy = _sort_metric(row["val_accuracy"], float("-inf"))

    orderings = {
        "val_loss": (loss, -f1, -accuracy),
        "val_accuracy": (-accuracy, -f1, loss),
    }
    # Any other value falls back to the F1-first ordering.
    return orderings.get(sort_by, (-f1, loss, -accuracy))
def _lookup_hyperparameter(params: dict, name: str) -> float:
    """Read one hyperparameter as a float, preferring the ``General/``-prefixed key."""
    return _to_float(params.get(f"General/{name}", params.get(name)))


def fetch_rows(
    project_name: str,
    tags: list[str],
    name_prefix: str,
    limit: int,
    drop_zero_metrics: bool,
    sort_by: str,
    compute_missing_val_accuracy: bool,
    seed: int,
) -> list[dict]:
    """Collect one row of metrics and hyperparameters per matching ClearML task.

    Completed tasks of ``project_name`` queried with ``tags`` are narrowed
    to those whose name starts with ``name_prefix``, ordered newest first,
    optionally truncated to ``limit``, converted to rows, optionally
    backfilled and filtered, and finally ranked.

    Args:
        project_name: ClearML project to query.
        tags: Tags passed to the task query.
        name_prefix: Required prefix of the task name.
        limit: Maximum number of most recent matching tasks to inspect;
            values ``<= 0`` disable the limit.
        drop_zero_metrics: Drop rows where ``val_loss``, ``val_accuracy`` or
            ``val_f1`` is missing or not strictly positive.
        sort_by: Primary ranking metric, forwarded to ``_sort_key``.
        compute_missing_val_accuracy: Retrain models to fill in missing
            ``val_accuracy`` values before filtering.
        seed: Seed used for the backfill.

    Returns:
        The ranked list of row dicts, with the keys written by ``write_csv``.
    """
    print(
        f"[FETCH] Project={project_name} Tags={tags} "
        f"NamePrefix={name_prefix!r} Limit={limit}"
    )
    tasks = Task.get_tasks(
        project_name=project_name,
        tags=tags,
        task_filter={"status": ["completed"]},
    )

    matched_tasks = [t for t in tasks if (t.name or "").startswith(name_prefix)]
    # Count before truncation so the log reports how many tasks really
    # matched the prefix, not how many survived --limit.
    matched_count = len(matched_tasks)
    matched_tasks.sort(key=_task_time_key, reverse=True)
    selected_tasks = matched_tasks[:limit] if limit > 0 else matched_tasks
    print(
        f"[FETCH] Total completed tagged tasks={len(tasks)} | "
        f"name-matched={matched_count} | selected={len(selected_tasks)}"
    )

    hyperparameter_names = (
        "n_estimators",
        "max_depth",
        "learning_rate",
        "subsample",
        "colsample_bytree",
        "reg_alpha",
        "reg_lambda",
    )
    rows = []
    for task in selected_tasks:
        params = task.get_parameters() or {}
        metrics = task.get_last_scalar_metrics() or {}
        row = {
            "task_id": task.id,
            "val_loss": _read_metric(metrics, "Loss", "Val"),
            "val_accuracy": _read_metric(metrics, "Summary", "val_accuracy"),
            "val_f1": _read_metric(metrics, "Summary", "val_f1"),
        }
        for name in hyperparameter_names:
            row[name] = _lookup_hyperparameter(params, name)
        rows.append(row)

    # Backfill before filtering so rows rescued by the backfill are kept.
    if compute_missing_val_accuracy:
        computed = _compute_missing_val_accuracy(rows, seed=seed)
        print(f"[FETCH] Backfilled val_accuracy for {computed} rows where it was missing")

    if drop_zero_metrics:
        before = len(rows)
        rows = [
            r
            for r in rows
            if all(
                _is_valid_core_metric(r[metric])
                for metric in ("val_loss", "val_accuracy", "val_f1")
            )
        ]
        print(f"[FETCH] Skipped tasks with missing/zero core metrics: {before - len(rows)}")

    # Default ranking is by validation F1, then val_loss, then val_accuracy.
    rows.sort(key=lambda r: _sort_key(r, sort_by=sort_by))
    return rows
def write_csv(rows: list[dict], output_path: Path) -> None:
    """Write ``rows`` to ``output_path`` as a UTF-8 CSV file with a header line.

    Missing parent directories are created and an existing file is
    overwritten. A one-line summary is printed when done.

    Args:
        rows: Row dicts keyed by the column names listed below.
        output_path: Destination file.
    """
    columns = (
        "task_id",
        "val_loss",
        "val_accuracy",
        "val_f1",
        "n_estimators",
        "max_depth",
        "learning_rate",
        "subsample",
        "colsample_bytree",
        "reg_alpha",
        "reg_lambda",
    )
    output_path.parent.mkdir(parents=True, exist_ok=True)
    # newline="" lets the csv module control line endings itself.
    with open(output_path, "w", newline="", encoding="utf-8") as csv_file:
        writer = csv.DictWriter(csv_file, fieldnames=columns)
        writer.writeheader()
        for record in rows:
            writer.writerow(record)
    print(f"[FETCH] Wrote {len(rows)} rows to {output_path}")
def parse_args() -> argparse.Namespace:
    """Parse the exporter's command-line options.

    The module docstring is used as the program description. Options are
    registered in the order listed below, which is also the order shown in
    ``--help``.

    Returns:
        The parsed ``argparse.Namespace``.
    """
    option_specs = [
        ("--project", {"default": DEFAULT_PROJECT, "help": "ClearML project name"}),
        (
            "--tags",
            {
                "nargs": "+",
                "default": DEFAULT_TAGS,
                "help": "ClearML task tags to filter (default: xgboost optuna_manual)",
            },
        ),
        ("--output", {"default": str(DEFAULT_OUTPUT), "help": "Output CSV path"}),
        (
            "--name-prefix",
            {
                "default": DEFAULT_NAME_PREFIX,
                "help": "Only include tasks whose name starts with this prefix",
            },
        ),
        (
            "--limit",
            {
                "type": int,
                "default": DEFAULT_LIMIT,
                "help": "Max number of latest matching tasks to inspect; <=0 means no limit",
            },
        ),
        (
            "--keep-zero-metrics",
            {
                "action": "store_true",
                "help": "Keep tasks even when val_loss/val_accuracy/val_f1 are missing or zero",
            },
        ),
        (
            "--sort-by",
            {
                "choices": ["val_f1", "val_loss", "val_accuracy"],
                "default": DEFAULT_SORT_BY,
                "help": "Primary ranking metric for exported rows (default: val_f1)",
            },
        ),
        (
            "--compute-missing-val-accuracy",
            {
                "action": "store_true",
                "help": "Train per-row models to backfill val_accuracy only when it is missing",
            },
        ),
        (
            "--seed",
            {
                "type": int,
                "default": 42,
                "help": "Random seed used when backfilling missing val_accuracy",
            },
        ),
    ]

    parser = argparse.ArgumentParser(description=__doc__)
    for flag, options in option_specs:
        parser.add_argument(flag, **options)
    return parser.parse_args()
def main() -> None:
    """Run the export: parse options, fetch the ranked rows, write the CSV."""
    cli = parse_args()
    exported_rows = fetch_rows(
        project_name=cli.project,
        tags=cli.tags,
        name_prefix=cli.name_prefix,
        limit=cli.limit,
        # The CLI flag is phrased as "keep", the function argument as "drop".
        drop_zero_metrics=not cli.keep_zero_metrics,
        sort_by=cli.sort_by,
        compute_missing_val_accuracy=cli.compute_missing_val_accuracy,
        seed=cli.seed,
    )
    destination = Path(cli.output)
    write_csv(exported_rows, output_path=destination)


# Run the export only when executed as a script, not on import.
if __name__ == "__main__":
    main()