# hyperdoa/evaluation.py — initial commit 37ed720 (pwh9882)
"""
Evaluation functions for HDC-based DOA estimation.
This module provides utilities for training and evaluating
HDC models for direction-of-arrival estimation.
Functions:
compute_mspe: Compute permutation-invariant MSPE
compute_mspe_db: Compute permutation-invariant MSPE in dB
evaluate_hdc: Train and evaluate HDC model
"""
import math
import time
from typing import List, Tuple, Optional
from itertools import permutations
import numpy as np
import torch
from torch.utils.data import DataLoader
from .models import HDCAoAModel
from .config import DOAConfig
from .utils import set_seed
def compute_mspe(predictions: np.ndarray, targets: np.ndarray) -> float:
    """Compute Permutation-invariant MSPE (Mean Square Periodic Error).

    For multi-source DOA estimation the ordering of the predicted angles is
    arbitrary, so every permutation of each prediction row is scored against
    the target row and the best (lowest) score is kept.

    MSPE = (1/M) * ||wrapped error||^2, with angular errors wrapped into
    [-pi/2, pi/2) so that estimates on opposite ends of the periodic range
    are not over-penalized.

    Args:
        predictions: Predicted angles in radians, shape (N, M) or (N,)
        targets: Ground truth angles in radians, shape (N, M) or (N,)

    Returns:
        Mean best-permutation MSPE across all samples (0.0 for empty input).

    Example:
        >>> preds = np.array([[0.1, 0.5], [0.2, 0.6]])  # 2 samples, 2 sources
        >>> targets = np.array([[0.5, 0.1], [0.6, 0.2]])
        >>> loss = compute_mspe(preds, targets)  # Order doesn't matter
    """
    pred_rows = np.atleast_2d(predictions)
    target_rows = np.atleast_2d(targets)
    half_pi = math.pi / 2
    per_sample_best: list = []
    for pred_row, target_row in zip(pred_rows, target_rows):
        n_sources = len(target_row)
        best = math.inf
        for ordering in permutations(pred_row.tolist(), n_sources):
            # Wrap each angular difference into [-pi/2, pi/2).
            diff = (np.asarray(ordering, dtype=float) - target_row + half_pi) % math.pi - half_pi
            score = float(np.dot(diff, diff)) / n_sources
            if score < best:
                best = score
        per_sample_best.append(best)
    if not per_sample_best:
        return 0.0
    return float(sum(per_sample_best) / len(per_sample_best))
def compute_mspe_db(predictions: np.ndarray, targets: np.ndarray) -> float:
    """Compute Permutation-invariant MSPE on a decibel scale.

    Converts the linear MSPE to decibels: MSPE_dB = 10 * log10(MSPE).
    The linear value is clamped at 1e-12 so a perfect fit maps to a
    finite -120 dB instead of -inf.

    Args:
        predictions: Predicted angles in radians, shape (N, M) or (N,)
        targets: Ground truth angles in radians, shape (N, M) or (N,)

    Returns:
        Mean MSPE in dB across all samples.

    Example:
        >>> preds = np.array([[0.1, 0.5], [0.2, 0.6]])
        >>> targets = np.array([[0.5, 0.1], [0.6, 0.2]])
        >>> loss_db = compute_mspe_db(preds, targets)
    """
    linear_mspe = compute_mspe(predictions, targets)
    # Clamp to keep log10 finite when the error is exactly zero.
    clamped = linear_mspe if linear_mspe > 1e-12 else 1e-12
    return float(10.0 * np.log10(clamped))
def evaluate_hdc(
    train_data: List[Tuple[torch.Tensor, torch.Tensor]],
    test_data: List[Tuple[torch.Tensor, torch.Tensor]],
    config: DOAConfig,
    feature_type: str = "lag",
    device: Optional[str] = None,
    min_separation_deg: float = 6.0,
    n_dimensions: int = 10000,
    return_model: bool = False,
    verbose: bool = True,
    seed: int = 42,
) -> Tuple[float, Optional[HDCAoAModel]]:
    """Train and evaluate HDC model on given datasets.

    This is the main entry point for HDC-based DOA estimation.
    It handles data loading, model initialization, training, and evaluation.

    Args:
        train_data: Training dataset as list of (X, Y) tuples
            - X: Complex tensor of shape (N, T) - sensor observations
            - Y: Tensor of shape (M,) - ground truth DOA in radians
        test_data: Test dataset in same format as train_data
        config: DOAConfig with system parameters (N, M, T, etc.)
        feature_type: Feature extraction method:
            - "lag": Mean spatial-lag features
            - "spatial_smoothing": Spatial smoothing covariance
        device: Compute device ("cuda", "cpu", or None for auto)
        min_separation_deg: Minimum peak separation in degrees
        n_dimensions: Hypervector dimensionality
        return_model: Whether to return the trained model
        verbose: Print progress messages
        seed: Random seed for reproducibility

    Returns:
        test_loss: MSPE on test set (dB)
        model: Trained HDCAoAModel (if return_model=True, else None)

    Raises:
        ValueError: If test_data yields no batches.

    Example:
        >>> config = DOAConfig(N=8, M=2, T=100)
        >>> train_data = torch.load("train.pt")
        >>> test_data = torch.load("test.pt")
        >>> loss, model = evaluate_hdc(train_data, test_data, config, return_model=True)
        >>> print(f"Test MSPE: {loss:.2f} dB")
    """
    set_seed(seed)
    # Auto-select the device when the caller did not pin one.
    if device is None:
        device = "cuda" if torch.cuda.is_available() else "cpu"
    # Create data loaders; batch_size=1 at test time keeps per-sample targets aligned.
    train_loader = DataLoader(train_data, batch_size=32, shuffle=True)
    test_loader = DataLoader(test_data, batch_size=1, shuffle=False)
    # Initialize HDC model
    hdc = HDCAoAModel(
        N=config.N,
        M=config.M,
        T=config.T,
        feature_type=feature_type,
        n_dimensions=n_dimensions,
        device=torch.device(device),
        min_separation_deg=min_separation_deg,
    )
    # Set training parameters (single-epoch HDC training with a large GPU batch).
    hdc.epochs = 1
    hdc.lr = 0.035
    # startswith covers explicit device indices such as "cuda:0" as well as "cuda".
    hdc.batch_size = 256 * 16 if str(device).startswith("cuda") else 64
    # Train
    if verbose:
        print(f"Training HDC ({feature_type})...")
    start_time = time.time()
    hdc.train_from_dataloader(train_loader)
    train_time = time.time() - start_time
    if verbose:
        print(f"  Training time: {train_time:.2f}s")
    # Evaluate - collect predictions and targets
    start_time = time.time()
    all_preds = []
    all_targets = []
    for Xb, Yb in test_loader:
        preds = hdc.predict(Xb)
        all_preds.append(preds)
        all_targets.append(Yb.detach().cpu().numpy())
    # Guard against an empty test set so the error is explicit instead of an
    # opaque np.concatenate failure.
    if not all_preds:
        raise ValueError("test_data produced no batches; cannot evaluate")
    all_preds = np.concatenate(all_preds, axis=0)
    all_targets = np.concatenate(all_targets, axis=0)
    # Compute MSPE in dB
    test_mspe_db = compute_mspe_db(all_preds, all_targets)
    eval_time = time.time() - start_time
    if verbose:
        print(f"  Evaluation time: {eval_time:.2f}s")
        print(f"  Test MSPE: {test_mspe_db:.2f} dB")
    if return_model:
        return float(test_mspe_db), hdc
    return float(test_mspe_db), None
def save_checkpoint(model: HDCAoAModel, path: str, meta: Optional[dict] = None) -> None:
    """Save HDC model checkpoint.

    Stores the model's state dict plus the constructor arguments needed to
    rebuild it (see load_checkpoint). Caller-provided metadata is merged on
    top of the model-derived entries, so it can override them.

    Args:
        model: Trained HDCAoAModel
        path: Output file path (.pt)
        meta: Optional metadata dictionary merged into the checkpoint's meta
    """
    checkpoint = {
        "state_dict": model.state_dict(),
        "meta": {
            "N": model.N,
            "M": model.M,
            "T": model.T,
            "feature_type": model.feature_type,
            "n_dimensions": model.n_dimensions,
            "min_angle": model.min_angle,
            "max_angle": model.max_angle,
            "precision": model.precision,
            "min_separation_deg": model.min_separation_deg,
            # Caller metadata last so it can override model-derived keys.
            **(meta or {}),
        },
    }
    torch.save(checkpoint, path)
def load_checkpoint(path: str, device: Optional[str] = None) -> Tuple[HDCAoAModel, dict]:
    """Load HDC model from checkpoint.

    Rebuilds an HDCAoAModel from the metadata written by save_checkpoint and
    restores its state dict. Missing metadata entries fall back to the
    defaults below.

    Args:
        path: Checkpoint file path (.pt)
        device: Target device ("cuda", "cpu", or None for auto)

    Returns:
        Tuple of (model, metadata)
    """
    if device is None:
        device = "cuda" if torch.cuda.is_available() else "cpu"
    # SECURITY NOTE(review): weights_only=False unpickles arbitrary objects —
    # only load checkpoints from trusted sources.
    checkpoint = torch.load(path, map_location=device, weights_only=False)
    meta = checkpoint.get("meta", {})
    # Accept both wrapped checkpoints and raw state dicts.
    state_dict = checkpoint.get("state_dict", checkpoint)
    model = HDCAoAModel(
        N=meta.get("N", 8),
        M=meta.get("M", 2),
        T=meta.get("T", 100),
        feature_type=meta.get("feature_type", "lag"),
        n_dimensions=meta.get("n_dimensions", 10000),
        min_angle=meta.get("min_angle", -90.0),
        max_angle=meta.get("max_angle", 90.0),
        precision=meta.get("precision", 0.1),
        min_separation_deg=meta.get("min_separation_deg", 6.0),
        device=torch.device(device),
    )
    # strict=False tolerates checkpoints saved with extra/missing buffers.
    model.load_state_dict(state_dict, strict=False)
    return model, meta