| from __future__ import annotations |
|
|
| import json |
| from pathlib import Path |
| from typing import Iterable |
|
|
| from src.hackathon.data import list_dummy_models, list_dummy_stimuli, stimulus_key |
|
|
# Environment variable names used to point the app at registry/catalog files
# on disk; when unset, the dummy in-repo data is used instead.
MODEL_REGISTRY_ENV = "HACKATHON_MODEL_REGISTRY"
BLUE_MODEL_REGISTRY_ENV = "HACKATHON_BLUE_MODEL_REGISTRY"
RED_MODEL_REGISTRY_ENV = "HACKATHON_RED_MODEL_REGISTRY"
STIMULI_CATALOG_ENV = "HACKATHON_STIMULI_CATALOG"
BLUE_STIMULI_CATALOG_ENV = "HACKATHON_BLUE_STIMULI_CATALOG"


# Exact submission sizes enforced by validate_blue_submission / validate_red_submission.
BLUE_TEAM_REQUIRED_MODELS = 20
RED_TEAM_REQUIRED_STIMULI = 1000
|
|
|
|
| def _ensure_unique(values: list[str], label: str) -> None: |
| if len(values) != len(set(values)): |
| raise ValueError(f"{label} must be unique.") |
|
|
|
|
| def _load_json(path: Path) -> object: |
| try: |
| return json.loads(path.read_text()) |
| except FileNotFoundError as exc: |
| raise ValueError(f"File not found: {path}") from exc |
| except json.JSONDecodeError as exc: |
| raise ValueError(f"Invalid JSON in {path}: {exc}") from exc |
|
|
|
|
| def _parse_registry_entries(path: str | None) -> list[dict | str]: |
| """Parse registry file into raw entries (strings or dicts).""" |
| if not path: |
| return [] |
|
|
| data = _load_json(Path(path)) |
| if isinstance(data, dict): |
| entries = data.get("models") |
| if entries is None: |
| raise ValueError("Model registry JSON must be a list or contain a 'models' list.") |
| elif isinstance(data, list): |
| entries = data |
| else: |
| raise ValueError("Model registry JSON must be a list or object.") |
|
|
| return entries |
|
|
|
|
def load_model_registry(path: str | None) -> set[str]:
    """Return the set of registered model names.

    Falls back to the built-in dummy models when *path* is empty/None.

    Raises:
        ValueError: If an entry is malformed, nameless, or duplicated.
    """
    if not path:
        return set(list_dummy_models())

    names: list[str] = []
    for position, entry in enumerate(_parse_registry_entries(path), start=1):
        if isinstance(entry, dict):
            model_name = str(entry.get("model_name", "")).strip()
        elif isinstance(entry, str):
            model_name = entry.strip()
        else:
            raise ValueError(f"Model registry entry {position} must be a string or object.")

        if not model_name:
            raise ValueError(f"Model registry entry {position} is missing model_name.")
        names.append(model_name)

    _ensure_unique(names, "Model registry entries")
    return set(names)
|
|
|
|
def load_model_registry_specs(path: str | None) -> dict[str, dict]:
    """Load full model specs keyed by model_name.

    Returns a dict mapping model_name -> {"layer": ..., "embedding": ..., "preprocess": ...}.
    Entries that are not objects, or that lack a model_name, are skipped
    silently (strict validation happens in load_model_registry).
    """
    if not path:
        return {}

    specs: dict[str, dict] = {}
    for entry in _parse_registry_entries(path):
        if not isinstance(entry, dict):
            continue
        model_name = str(entry.get("model_name", "")).strip()
        if not model_name:
            continue
        layer = str(entry.get("layer", "")).strip()
        embedding = str(entry.get("embedding", "flatten")).strip()
        specs[model_name] = {
            "layer": layer,
            "embedding": embedding,
            "preprocess": entry.get("preprocess", {}),
        }
    return specs
|
|
|
|
def load_stimuli_catalog(path: str | None) -> list[dict[str, str]]:
    """Load the stimuli catalog from a JSON or JSONL file.

    Falls back to the built-in dummy stimuli when *path* is empty/None.
    A ``.jsonl`` file holds one JSON object per non-blank line; any other
    suffix is parsed as a JSON list (or an object with a 'stimuli' list).

    Returns:
        A list of {"dataset_name": ..., "image_identifier": ...} dicts.

    Raises:
        ValueError: If the file is missing or malformed, an entry lacks
            dataset_name/image_identifier, or entries are not unique.
    """
    if not path:
        return list_dummy_stimuli()

    path_obj = Path(path)
    if path_obj.suffix == ".jsonl":
        # Mirror _load_json's error normalisation so callers of either file
        # format only ever have to handle ValueError (previously a missing or
        # malformed .jsonl file leaked FileNotFoundError/JSONDecodeError).
        try:
            lines = path_obj.read_text(encoding="utf-8").splitlines()
        except FileNotFoundError as exc:
            raise ValueError(f"File not found: {path_obj}") from exc
        try:
            entries = [json.loads(line) for line in lines if line.strip()]
        except json.JSONDecodeError as exc:
            raise ValueError(f"Invalid JSON in {path_obj}: {exc}") from exc
    else:
        data = _load_json(path_obj)
        if isinstance(data, dict):
            entries = data.get("stimuli")
            if entries is None:
                raise ValueError("Stimuli catalog JSON must be a list or contain a 'stimuli' list.")
        elif isinstance(data, list):
            entries = data
        else:
            raise ValueError("Stimuli catalog JSON must be a list or object.")

    stimuli: list[dict[str, str]] = []
    for idx, entry in enumerate(entries, start=1):
        if not isinstance(entry, dict):
            raise ValueError(f"Stimulus entry {idx} must be an object.")
        dataset_name = str(entry.get("dataset_name", "")).strip()
        image_identifier = str(entry.get("image_identifier", "")).strip()
        if not dataset_name or not image_identifier:
            raise ValueError(f"Stimulus entry {idx} must include dataset_name and image_identifier.")
        stimuli.append({"dataset_name": dataset_name, "image_identifier": image_identifier})

    keys = [stimulus_key(stimulus) for stimulus in stimuli]
    _ensure_unique(keys, "Stimuli catalog entries")
    return stimuli
|
|
|
|
def validate_blue_submission(
    payload: dict,
    *,
    model_registry: Iterable[str] | None = None,
    registry_specs: dict[str, dict] | None = None,
) -> list[str]:
    """Validate a blue-team submission and return its model names in order.

    Entries in payload["models"] may be bare name strings or objects with
    model_name (and, when *registry_specs* is given, a required layer_name
    that must match the registry's layer for that model).

    Raises:
        ValueError: On any malformed entry, duplicate, wrong count, unknown
            model, missing layer_name, or layer mismatch.
    """
    registry_set = set(list_dummy_models() if model_registry is None else model_registry)

    models = payload.get("models")
    if not isinstance(models, list):
        raise ValueError("Blue submission must include a list of models.")

    names: list[str] = []
    layer_mismatches: list[str] = []
    missing_layers: list[str] = []
    for position, entry in enumerate(models, start=1):
        layer = None
        if isinstance(entry, str):
            model_name = entry.strip()
        elif isinstance(entry, dict):
            model_name = str(entry.get("model_name", "")).strip()
            layer = str(entry.get("layer_name", "")).strip() or None
        else:
            raise ValueError(f"Model entry {position} must be a string or object with model_name.")
        if not model_name:
            raise ValueError(f"Model entry {position} is missing model_name.")
        names.append(model_name)

        # Layer checks only apply when a spec registry was supplied; unknown
        # model names are reported later by the registry membership check.
        if registry_specs:
            if not layer:
                missing_layers.append(f"Model entry {position} ({model_name}) is missing layer_name.")
            elif model_name in registry_specs:
                expected = registry_specs[model_name].get("layer", "")
                if layer != expected:
                    layer_mismatches.append(
                        f"{model_name}: submitted layer_name '{layer}' "
                        f"does not match registry layer '{expected}'"
                    )

    _ensure_unique(names, "Model selections")

    if len(names) != BLUE_TEAM_REQUIRED_MODELS:
        raise ValueError(
            f"Blue team submission must contain exactly {BLUE_TEAM_REQUIRED_MODELS} "
            f"unique models, but got {len(names)}."
        )

    unknown = [model_name for model_name in names if model_name not in registry_set]
    if unknown:
        raise ValueError(f"Unknown models requested: {', '.join(unknown)}")

    if missing_layers:
        raise ValueError(
            f"Missing layer_name for {len(missing_layers)} model(s):\n"
            + "\n".join(f" - {m}" for m in missing_layers)
        )

    if layer_mismatches:
        raise ValueError(
            f"Layer name mismatch for {len(layer_mismatches)} model(s):\n"
            + "\n".join(f" - {m}" for m in layer_mismatches)
        )

    return names
|
|
|
|
def validate_red_submission(
    payload: dict,
    *,
    stimuli_catalog: Iterable[dict[str, str]] | None = None,
) -> list[str]:
    """Validate a red-team submission and return its stimulus keys in order.

    Each entry of payload["differentiating_images"] must be an object with
    dataset_name and image_identifier; all selections must be unique and
    present in *stimuli_catalog* (the dummy catalog when None).

    Raises:
        ValueError: On wrong count, malformed entry, duplicate, or unknown
            stimulus.
    """
    catalog = list_dummy_stimuli() if stimuli_catalog is None else stimuli_catalog

    images = payload.get("differentiating_images")
    if not isinstance(images, list):
        raise ValueError("Red submission must include differentiating_images.")
    if len(images) != RED_TEAM_REQUIRED_STIMULI:
        raise ValueError(
            f"Red team submission must contain exactly {RED_TEAM_REQUIRED_STIMULI} "
            f"unique stimuli, but got {len(images)}."
        )

    keys: list[str] = []
    for position, entry in enumerate(images, start=1):
        if not isinstance(entry, dict):
            raise ValueError(f"Stimulus entry {position} must be an object.")
        dataset_name = str(entry.get("dataset_name", "")).strip()
        image_identifier = str(entry.get("image_identifier", "")).strip()
        if not dataset_name or not image_identifier:
            raise ValueError(f"Stimulus entry {position} must include dataset_name and image_identifier.")
        keys.append(stimulus_key({"dataset_name": dataset_name, "image_identifier": image_identifier}))

    _ensure_unique(keys, "Stimulus selections")

    known = {stimulus_key(stimulus) for stimulus in catalog}
    unknown = [key for key in keys if key not in known]
    if unknown:
        raise ValueError(f"Unknown stimuli requested: {', '.join(unknown)}")

    return keys
|
|