|
|
|
|
|
"""Visualise Doppler-aware projection embeddings via t-SNE. |
|
|
|
|
|
This utility mirrors the balanced sampling used during Doppler fine-tuning and |
|
|
projects spectrograms through the projection head introduced in |
|
|
the mobility fine-tuning utilities shared across Task 2. The resulting embeddings are meant to |
|
|
highlight mobility separation encouraged by the supervised contrastive loss. |
|
|
|
|
|
Example usage: |
|
|
|
|
|
```bash |
|
|
python task2/plot_projection_tsne.py \ |
|
|
--data-root spectrograms \ |
|
|
--cities city_1_losangeles \ |
|
|
--comm WiFi \ |
|
|
--checkpoint models/doppler_finetuned/wifi/lwm_wifi_doppler_epoch07_val75.99.pth \ |
|
|
--models-root models/WiFi_models \ |
|
|
--samples-per-config 256 \ |
|
|
--save-path task2/figures/wifi_projection_tsne.png \ |
|
|
--report-metrics |
|
|
``` |
|
|
""" |
|
|
|
|
|
from __future__ import annotations |
|
|
|
|
|
import argparse |
|
|
from pathlib import Path |
|
|
from typing import Dict, List, Sequence |
|
|
|
|
|
import matplotlib.pyplot as plt |
|
|
import numpy as np |
|
|
import torch |
|
|
from sklearn.manifold import TSNE |
|
|
from sklearn.metrics import silhouette_score |
|
|
from sklearn.model_selection import StratifiedKFold |
|
|
from sklearn.neighbors import KNeighborsClassifier |
|
|
from sklearn.preprocessing import StandardScaler |
|
|
|
|
|
from task2.mobility_utils import ( |
|
|
MOBILITY_LABELS, |
|
|
LWMClassifierMinimal, |
|
|
_collect_balanced_arrays, |
|
|
gather_controlled_groups, |
|
|
load_dataset_stats, |
|
|
prepare_model, |
|
|
) |
|
|
from task1.train_mcs_models import apply_normalization, set_seed |
|
|
|
|
|
|
|
|
# tqdm is an optional dependency: when it cannot be imported the name is bound
# to None so that progress_bar() can detect its absence and degrade gracefully.
try:
    from tqdm.auto import tqdm
except ImportError:
    tqdm = None


def progress_bar(iterable, **kwargs):
    """Wrap *iterable* in a tqdm progress bar when tqdm is importable.

    Args:
        iterable: The iterable the caller is going to consume.
        **kwargs: Forwarded unchanged to ``tqdm``; ignored when tqdm is missing.

    Returns:
        ``tqdm(iterable, **kwargs)`` if tqdm was imported, otherwise the
        *iterable* itself, untouched.
    """
    return iterable if tqdm is None else tqdm(iterable, **kwargs)
|
|
|
|
|
|
|
|
def parse_args(argv: Sequence[str] | None = None) -> argparse.Namespace:
    """Build the command-line parser and parse the arguments.

    The module docstring doubles as the ``--help`` description. It contains a
    multi-line shell example, so the raw-description formatter is used to keep
    its line breaks instead of letting argparse re-wrap it into one paragraph.

    Args:
        argv: Optional explicit argument list. ``None`` (the default) makes
            argparse read ``sys.argv[1:]``, which is the original behaviour.

    Returns:
        The parsed namespace. ``--checkpoint`` is the only required option.
    """
    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )

    # --- data selection -------------------------------------------------
    parser.add_argument("--data-root", default="spectrograms", help="Root directory containing city folders")
    parser.add_argument(
        "--cities",
        nargs="*",
        default=None,
        help="City folders to include (default: infer all city_* under data root)",
    )
    parser.add_argument(
        "--comm",
        default="WiFi",
        help="Communication profile to analyse (e.g., WiFi, LTE, 5G)",
    )
    parser.add_argument(
        "--mobilities",
        nargs="*",
        default=MOBILITY_LABELS,
        help="Mobility labels to include (default: %(default)s)",
    )
    parser.add_argument(
        "--snrs",
        nargs="*",
        default=None,
        help="Restrict to these SNR folders (default: all available)",
    )
    parser.add_argument(
        "--fft-folders",
        nargs="*",
        default=None,
        help="Optional whitelist of FFT/window folders (e.g. win384_ovlp288)",
    )

    # --- sampling / embedding knobs -------------------------------------
    parser.add_argument(
        "--samples-per-config",
        type=int,
        default=256,
        help="Maximum samples per mobility within a matched configuration (default: %(default)s)",
    )
    parser.add_argument("--perplexity", type=int, default=30, help="t-SNE perplexity (default: %(default)s)")
    parser.add_argument("--seed", type=int, default=42, help="Random seed")
    parser.add_argument(
        "--batch-size",
        type=int,
        default=256,
        help="Batch size when embedding spectrograms (default: %(default)s)",
    )

    # --- model artefacts ------------------------------------------------
    parser.add_argument(
        "--checkpoint",
        required=True,
        type=Path,
        help="Fine-tuned checkpoint containing the projection head",
    )
    parser.add_argument(
        "--models-root",
        type=Path,
        default=None,
        help="Directory containing dataset_stats.json (default: infer from checkpoint parent)",
    )

    # --- outputs --------------------------------------------------------
    parser.add_argument(
        "--output-root",
        type=Path,
        default=Path("task2/figures"),
        help="Root directory where the figure will be written",
    )
    parser.add_argument(
        "--save-path",
        type=Path,
        default=None,
        help="Optional explicit path for the output figure",
    )
    parser.add_argument(
        "--report-metrics",
        action="store_true",
        help="Print silhouette and 5-NN accuracy metrics",
    )
    parser.add_argument(
        "--metrics-only",
        action="store_true",
        help="Report metrics and exit without writing the t-SNE figure",
    )
    return parser.parse_args(argv)
|
|
|
|
|
|
|
|
def discover_cities(data_root: Path) -> List[str]:
    """Return the sorted names of the ``city_*`` sub-directories of *data_root*.

    Args:
        data_root: Directory expected to contain one folder per city.

    Returns:
        Alphabetically sorted folder names starting with ``"city_"``. An empty
        list is returned when *data_root* does not exist or is not a directory
        (e.g. it points at a regular file), so the caller can report a single
        "no cities found" error instead of hitting ``NotADirectoryError``.
    """
    # is_dir() is False both for missing paths and for non-directory paths.
    if not data_root.is_dir():
        return []
    return sorted(
        entry.name
        for entry in data_root.iterdir()
        if entry.is_dir() and entry.name.startswith("city_")
    )
|
|
|
|
|
|
|
|
def load_projection_embeddings(
    checkpoint: Path,
    stats: Dict[str, float | str],
    data_root: Path,
    cities: Sequence[str],
    comm: str,
    mobilities: Sequence[str],
    snrs: Sequence[str] | None,
    fft_folders: Sequence[str] | None,
    samples_per_config: int,
    seed: int,
    batch_size: int,
) -> tuple[np.ndarray, np.ndarray]:
    """Sample spectrograms, normalise them and embed them with the projection head.

    The function gathers matched groups via ``gather_controlled_groups``,
    draws a balanced sample with ``_collect_balanced_arrays``, normalises it
    with ``apply_normalization``, shuffles samples and labels together, and
    runs the model built by ``prepare_model`` batch by batch without gradients.

    Args:
        checkpoint: Checkpoint path handed to ``prepare_model``.
        stats: Normalisation statistics handed to ``apply_normalization``.
        data_root: Root directory handed to ``gather_controlled_groups``.
        cities: City folder names to include.
        comm: Communication profile; also used as a tag in log lines.
        mobilities: Mobility labels; their count sets ``num_classes``.
        snrs: Optional SNR folder filter.
        fft_folders: Optional FFT/window folder whitelist.
        samples_per_config: Passed as ``max_per_config`` to the sampler.
        seed: Seed of the NumPy generator shared by sampling and shuffling.
        batch_size: Chunk size used when splitting the input tensor.

    Returns:
        ``(embeddings, labels)``: the concatenated projection outputs and the
        label slices that correspond to them, in the same shuffled order.
    """
    # One generator drives both the balanced sampling and the later shuffle,
    # so the call order below must not change.
    rng = np.random.default_rng(seed)

    matched_groups = gather_controlled_groups(
        data_root=data_root,
        cities=cities,
        comm=comm,
        mobilities=mobilities,
        snrs=snrs,
        fft_whitelist=fft_folders,
    )
    specs, labels, meta = _collect_balanced_arrays(
        matched_groups,
        mobilities=mobilities,
        max_per_config=samples_per_config,
        rng=rng,
    )

    # Log what the sampler matched before any heavy work starts.
    per_mobility_summary = ", ".join(f"{mob}:{count}" for mob, count in meta["per_mobility"].items())
    print(
        f"[INFO] ({comm}) Matched configs={meta['matched_configs']} | samples per mobility -> {per_mobility_summary}"
    )
    preview_configs = meta["preview_configs"]
    if preview_configs:
        example = ["/".join(cfg) for cfg in preview_configs]
        print(f"[INFO] ({comm}) Example configs: {', '.join(example)}")

    # Normalise, then apply one permutation to samples and labels alike.
    normalized = apply_normalization(specs, stats)
    shuffle = rng.permutation(normalized.shape[0])
    normalized, labels = normalized[shuffle], labels[shuffle]

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = prepare_model(
        checkpoint=checkpoint,
        num_classes=len(mobilities),
        classifier_dim=128,
        dropout=0.1,
        trainable_layers=0,
        projection_dim=128,
    )
    model = model.to(device)
    model.eval()

    projected_chunks: List[np.ndarray] = []
    label_chunks: List[np.ndarray] = []
    chunks = progress_bar(
        torch.split(torch.from_numpy(normalized), batch_size),
        desc=f"{comm} projection",
        leave=False,
    )
    offset = 0  # index of the first sample of the current chunk
    with torch.no_grad():
        for chunk in chunks:
            chunk = chunk.to(device)
            # The model returns a pair; only the second item (projection) is kept.
            _, projected = model(chunk, return_projection=True)
            projected_chunks.append(projected.cpu().numpy())
            stop = offset + chunk.size(0)
            label_chunks.append(labels[offset:stop])
            offset = stop

    return np.concatenate(projected_chunks, axis=0), np.concatenate(label_chunks, axis=0)
|
|
|
|
|
|
|
|
def compute_metrics(name: str, features: np.ndarray, labels: np.ndarray) -> None:
    """Print a silhouette score and a cross-validated k-NN accuracy.

    Features are standardised once, then scored with ``silhouette_score`` and
    with a ``KNeighborsClassifier`` evaluated over shuffled stratified folds
    (``random_state=42``). Up to 5 folds and 5 neighbours are used; both are
    reduced automatically when the sample set is too small for them, so tiny
    inputs produce a metric line instead of a ``ValueError``.

    Args:
        name: Tag printed at the start of the ``[METRIC]`` line.
        features: Feature matrix, one row per sample.
        labels: Class label of each row of *features*.

    Returns:
        None. Results (or the reason for skipping) are printed to stdout.
    """
    max_splits = 5
    max_neighbors = 5

    labels = np.asarray(labels)
    classes, counts = np.unique(labels, return_counts=True)
    if classes.size < 2:
        print(f"[METRIC] {name}: skipped (only one class present)")
        return
    # silhouette_score needs fewer labels than samples, and stratified CV
    # needs at least one class with two members.
    if classes.size >= labels.shape[0]:
        print(f"[METRIC] {name}: skipped (every sample is its own class)")
        return

    features_scaled = StandardScaler().fit_transform(features)
    silhouette = silhouette_score(features_scaled, labels)

    # StratifiedKFold only raises when *every* class is smaller than n_splits,
    # so capping at the largest class keeps 5 folds whenever they were usable.
    n_splits = min(max_splits, int(counts.max()))
    skf = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=42)
    folds = list(skf.split(features_scaled, labels))

    # k-NN cannot query more neighbours than there are training points.
    n_neighbors = min(max_neighbors, min(train_idx.size for train_idx, _ in folds))

    scores: List[float] = []
    for train_idx, test_idx in folds:
        clf = KNeighborsClassifier(n_neighbors=n_neighbors)
        clf.fit(features_scaled[train_idx], labels[train_idx])
        scores.append(clf.score(features_scaled[test_idx], labels[test_idx]))

    mean_acc = float(np.mean(scores))
    std_acc = float(np.std(scores))
    print(
        f"[METRIC] {name}: silhouette={silhouette:.3f}, "
        f"{n_neighbors}-NN accuracy={mean_acc:.3f} ± {std_acc:.3f}"
    )
|
|
|
|
|
|
|
|
def run_tsne(features: np.ndarray, labels: np.ndarray, perplexity: int) -> np.ndarray:
    """Standardise *features* and project them to 2-D with t-SNE.

    Args:
        features: Feature matrix, one row per sample.
        labels: Unused; kept so the call signature stays stable.
        perplexity: Requested t-SNE perplexity. Values below 5 are raised to
            5, and the result is then capped at ``n_samples - 1`` because
            t-SNE requires the perplexity to be smaller than the sample count.

    Returns:
        The 2-D embedding produced by ``TSNE.fit_transform`` (fixed
        ``random_state=42``).

    Raises:
        ValueError: If fewer than two samples are supplied.
    """
    n_samples = len(features)
    if n_samples < 2:
        raise ValueError(f"t-SNE needs at least 2 samples, got {n_samples}")

    features_scaled = StandardScaler().fit_transform(features)
    # Apply the floor first and the cap last: the cap is a hard requirement
    # (perplexity < n_samples), the floor of 5 is only a preference.
    perplexity = min(max(perplexity, 5), n_samples - 1)
    tsne = TSNE(n_components=2, perplexity=perplexity, random_state=42)
    return tsne.fit_transform(features_scaled)
|
|
|
|
|
|
|
|
def plot_embedding(embedding: np.ndarray, labels: np.ndarray, title: str, save_path: Path) -> None:
    """Draw a per-class scatter plot of a 2-D embedding and save it to disk.

    Args:
        embedding: Array whose columns 0 and 1 are plotted as x and y.
        labels: Class label per row of *embedding*; one colour per unique value.
        title: Figure title.
        save_path: Output file; missing parent directories are created.

    Returns:
        None. The figure is written to *save_path* and then closed.
    """
    classes = np.unique(labels)
    # One Set2 colour per class, spread evenly over the colormap range.
    palette = plt.cm.Set2(np.linspace(0, 1, len(classes)))

    fig, ax = plt.subplots(figsize=(9, 7))
    marker_style = {"s": 18, "alpha": 0.7}
    for cls, color in zip(classes, palette):
        points = embedding[labels == cls]
        ax.scatter(points[:, 0], points[:, 1], c=[color], label=str(cls), **marker_style)

    ax.set_title(title, fontsize=14, fontweight="bold")
    ax.set_xlabel("t-SNE Component 1", fontsize=12)
    ax.set_ylabel("t-SNE Component 2", fontsize=12)
    ax.grid(True, alpha=0.3)
    # Legend sits outside the axes, to the right of the plot area.
    ax.legend(bbox_to_anchor=(1.02, 1), loc="upper left", fontsize=9)
    fig.tight_layout()

    save_path.parent.mkdir(parents=True, exist_ok=True)
    fig.savefig(save_path, dpi=300, bbox_inches="tight")
    plt.close(fig)
    print(f"[INFO] Figure saved to {save_path}")
|
|
|
|
|
|
|
|
def main() -> None:
    """Run the CLI: embed samples, optionally report metrics, plot t-SNE.

    Steps: parse arguments and seed the RNGs, resolve the city list and the
    statistics directory, compute projection embeddings, print metrics when
    requested, and unless ``--metrics-only`` is given run t-SNE and save the
    figure.

    ``--metrics-only`` implies metric reporting, so using it on its own prints
    the metrics rather than exiting without any output.

    Raises:
        FileNotFoundError: If no city directory can be resolved or the
            checkpoint file does not exist.
    """
    args = parse_args()
    set_seed(args.seed)

    data_root = Path(args.data_root)
    cities = args.cities or discover_cities(data_root)
    if not cities:
        raise FileNotFoundError(f"No city directories found under {data_root}")

    if not args.checkpoint.exists():
        raise FileNotFoundError(f"Checkpoint not found: {args.checkpoint}")

    # Dataset statistics are looked up next to the checkpoint unless an
    # explicit models root was given.
    stats_dir = args.models_root if args.models_root is not None else args.checkpoint.parent
    stats = load_dataset_stats(stats_dir)

    embeddings, labels = load_projection_embeddings(
        checkpoint=args.checkpoint,
        stats=stats,
        data_root=data_root,
        cities=cities,
        comm=args.comm,
        mobilities=args.mobilities,
        snrs=args.snrs,
        fft_folders=args.fft_folders,
        samples_per_config=args.samples_per_config,
        seed=args.seed,
        batch_size=args.batch_size,
    )
    # Labels index into the mobility list; map them to names for display.
    label_names = np.array([args.mobilities[idx] for idx in labels])

    if args.report_metrics or args.metrics_only:
        compute_metrics("Projection embeddings", embeddings, label_names)
    if args.metrics_only:
        return

    coords = run_tsne(embeddings, label_names, args.perplexity)

    if args.save_path is not None:
        save_path = args.save_path
    else:
        comm_suffix = args.comm.lower()
        save_path = args.output_root / f"projection_tsne_{comm_suffix}.png"

    title = f"Projection Head t-SNE ({args.comm})"
    plot_embedding(coords, label_names, title, save_path)
|
|
|
|
|
|
|
|
# Run the CLI only when the file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
|
|
|