lwm-spectro / plot /compare_mobility_spectrograms.py
Namhyun Kim
Sync local development code into HF repo
eaaeb1b
#!/usr/bin/env python3
"""Compare static vs vehicular spectrograms with identical parameters.
This script finds a static spectrogram and its corresponding vehicular version
(with all other parameters identical), then displays them side by side along
with their difference in a 3-subplot layout.
"""
from __future__ import annotations
import argparse
import sys
from pathlib import Path
import pickle
import glob
import matplotlib.pyplot as plt
import numpy as np
from fractions import Fraction
try:
from core.paths import get_spectrogram_base_dir
except Exception: # pragma: no cover - fallback when module unavailable
get_spectrogram_base_dir = None # type: ignore
# Absolute directory holding this script; anchors the script-relative candidates.
SCRIPT_DIR = Path(__file__).resolve().parent

# Candidate spectrogram roots, listed in the order resolve_base_dir() probes them.
DEFAULT_BASE_CANDIDATES = [
    SCRIPT_DIR.joinpath('spectrograms'),         # next to this script
    SCRIPT_DIR.parent.joinpath('spectrograms'),  # one level above this script
    Path('spectrograms'),                        # relative to the working directory
    Path('D:/Namhyun/lwm_data'),                 # Windows-style absolute location
    Path('/mnt/d/Namhyun/lwm_data'),             # POSIX-style absolute location
]
def resolve_base_dir() -> Path:
    """Choose the directory in which to look for spectrogram data.

    Resolution order:
      1. The directory reported by ``get_spectrogram_base_dir`` when that
         helper was importable and the directory exists.
      2. The first entry of ``DEFAULT_BASE_CANDIDATES`` that exists.
      3. The current working directory.
    """
    if get_spectrogram_base_dir is not None:
        configured = Path(get_spectrogram_base_dir())
        if configured.exists():
            return configured

    # Lazily probe the candidates so the search stops at the first hit.
    existing = (candidate for candidate in DEFAULT_BASE_CANDIDATES if candidate.exists())
    first_hit = next(existing, None)
    if first_hit is not None:
        return first_hit
    return Path.cwd()
def load_spectrogram(path: Path, index: int = 0) -> tuple[np.ndarray, dict]:
    """Read one spectrogram and the stored configuration from a pickle file.

    Args:
        path: Pickle file whose payload provides a ``'spectrograms'`` entry
            and, optionally, a ``'configuration'`` entry.
        index: Position of the wanted sample along the first axis of the
            ``'spectrograms'`` array.

    Returns:
        ``(image, configuration)``; ``configuration`` is ``{}`` when the
        payload does not carry one.

    Raises:
        IndexError: If ``index`` is negative or not below the sample count.
        KeyError: If the payload has no ``'spectrograms'`` entry.
    """
    # NOTE: unpickling can execute arbitrary code; only open trusted files.
    with path.open('rb') as handle:
        data = pickle.load(handle)

    stack = np.asarray(data['spectrograms'])
    config = data.get('configuration', {})

    sample_count = stack.shape[0]
    if index < 0 or index >= sample_count:
        raise IndexError(f'Index {index} out of range (0..{sample_count-1})')
    return stack[index], config
def find_matching_spectrograms(base_dir: Path, route_tokens: list[str],
                               static_mobility: str = "static",
                               vehicular_mobility: str = "vehicular",
                               city: str = "city_1_losangeles") -> tuple[Path, Path]:
    """Find matching static and vehicular spectrograms with identical other parameters.

    Expected layout below ``base_dir``::

        <city>/COMM/MODULATION/rateX-Y/SNR/MOBILITY/FFT/spectrograms/*.pkl

    Args:
        base_dir: Root directory that contains the city folders.
        route_tokens: ``[COMM, MODULATION, rateX-Y, SNR]``; extra tokens are ignored.
        static_mobility: Name of the static mobility folder.
        vehicular_mobility: Name of the vehicular mobility folder.
        city: Name of the city folder directly below ``base_dir``.

    Returns:
        ``(static_pickle, vehicular_pickle)``. A pair located in the same
        sub-directory relative to its mobility folder is preferred, so the
        levels below MOBILITY match; otherwise the first file (in sorted
        order) of each tree is returned.

    Raises:
        ValueError: If fewer than four route tokens are supplied.
        FileNotFoundError: If either mobility folder contains no ``*.pkl`` file.
    """
    if len(route_tokens) < 4:
        raise ValueError(f"Route tokens should have at least 4 parts: COMM MODULATION rateX-Y SNR, got: {route_tokens}")
    comm, modulation, rate, snr = route_tokens[:4]

    scenario_dir = Path(base_dir) / city / comm / modulation / rate / snr
    static_path = scenario_dir / static_mobility
    vehicular_path = scenario_dir / vehicular_mobility

    # glob() yields entries in arbitrary order; sort so the choice is reproducible.
    static_pkl_files = sorted(static_path.glob("**/*.pkl"))
    vehicular_pkl_files = sorted(vehicular_path.glob("**/*.pkl"))
    if not static_pkl_files:
        raise FileNotFoundError(f"No static spectrogram found at: {static_path}")
    if not vehicular_pkl_files:
        raise FileNotFoundError(f"No vehicular spectrogram found at: {vehicular_path}")

    # Index the first vehicular file of every sub-directory by its path
    # relative to the mobility folder.
    vehicular_by_subdir: dict[Path, Path] = {}
    for candidate in vehicular_pkl_files:
        vehicular_by_subdir.setdefault(candidate.parent.relative_to(vehicular_path), candidate)

    # Prefer a static file whose relative sub-directory also exists on the
    # vehicular side, so both files share the parameters below MOBILITY.
    for candidate in static_pkl_files:
        partner = vehicular_by_subdir.get(candidate.parent.relative_to(static_path))
        if partner is not None:
            return candidate, partner

    # No common sub-directory: fall back to the first file of each tree.
    return static_pkl_files[0], vehicular_pkl_files[0]
def format_metadata(meta: dict, include_mobility: bool = False) -> str:
    """Format metadata into a readable, ``' | '``-separated title string.

    Args:
        meta: Metadata mapping; the keys ``standard``, ``modulation``,
            ``code_rate``, ``snr``, ``speed`` and ``speed_name`` are read
            when present.
        include_mobility: When true, append ``speed`` (or ``speed_name``
            if ``speed`` is missing or empty).

    Returns:
        The joined title, or ``'Spectrogram'`` when no field contributed.
    """
    title_tokens = [str(meta[key]) for key in ('standard', 'modulation') if meta.get(key)]

    code_rate = meta.get('code_rate')
    if isinstance(code_rate, (int, float)):
        try:
            frac = Fraction(code_rate).limit_denominator(16)
        except (ValueError, OverflowError):
            # Fraction() rejects NaN (ValueError) and infinities (OverflowError).
            title_tokens.append(f'rate {code_rate}')
        else:
            title_tokens.append(f'rate {frac.numerator}/{frac.denominator}')

    snr = meta.get('snr')
    if isinstance(snr, (int, float)):
        try:
            is_whole = abs(snr - round(snr)) < 1e-6
        except (ValueError, OverflowError):
            # round() cannot convert NaN/inf to an integer; show the raw value.
            is_whole = False
        snr_display = int(round(snr)) if is_whole else snr
        title_tokens.append(f'SNR {snr_display} dB')

    # Only include mobility/speed if explicitly requested
    if include_mobility:
        speed = meta.get('speed') or meta.get('speed_name')
        if speed:
            title_tokens.append(str(speed))

    return ' | '.join(title_tokens) if title_tokens else 'Spectrogram'
def _build_arg_parser() -> argparse.ArgumentParser:
    """Build the command-line parser for the comparison script."""
    parser = argparse.ArgumentParser(
        description='Compare static vs vehicular spectrograms with identical parameters.',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
python compare_mobility_spectrograms.py LTE QAM16 rate3-4 SNR10dB
python compare_mobility_spectrograms.py LTE QPSK rate1-2 SNR5dB --index 2
python compare_mobility_spectrograms.py WiFi BPSK rate1-2 SNR0dB --save comparison.png
python compare_mobility_spectrograms.py 5G QAM64 rate2-3 SNR15dB
"""
    )
    parser.add_argument('route', nargs='*',
                        help='Path fragments (e.g., LTE QAM16 rate3-4 SNR10dB) leading to the target spectrograms.')
    parser.add_argument('--index', type=int, default=0,
                        help='Sample index inside pickle (default: 0).')
    parser.add_argument('--save', type=Path,
                        help='Optional output path. Defaults to auto-generated filename.')
    parser.add_argument('--format', choices=('png', 'pdf'), default='png',
                        help='Output format when --save has no extension (default: png).')
    parser.add_argument('--dpi', type=int, default=600,
                        help='Resolution for rasterized content in the export (default: 600).')
    parser.add_argument('--no-show', action='store_true',
                        help='Skip opening an interactive window (image is still saved).')
    parser.add_argument('--base-dir', type=Path,
                        help='Base directory containing spectrograms (auto-detected if not specified).')
    return parser


def _patch_noise_floor_columns(static_img, vehicular_img, columns=range(95, 99), noise_threshold=-105):
    """Copy static noise-floor bins into a copy of the vehicular image.

    For every column in ``columns`` that exists in the image, bins where the
    static image is below ``noise_threshold`` are overwritten with the static
    values, so the later difference is exactly zero there.

    Returns:
        ``(patched_copy, number_of_overwritten_bins)``.
    """
    patched = vehicular_img.copy()
    patched_bins = 0
    for col in columns:
        if col >= patched.shape[1]:
            continue
        noise_mask = static_img[:, col] < noise_threshold
        if noise_mask.any():
            patched[noise_mask, col] = static_img[noise_mask, col]
            patched_bins += int(noise_mask.sum())
    return patched, patched_bins


def _axis_extent(meta: dict, shape) -> tuple:
    """Derive the imshow extent and axis labels from spectrogram metadata.

    Returns:
        ``(extent, xlabel, ylabel)``. ``extent`` is ``None`` and the labels
        refer to bins when ``nperseg``, ``noverlap``, ``sample_rate`` or
        ``freq_resolution_hz`` is missing or not numeric.
    """
    freq_res = meta.get('freq_resolution_hz')
    sample_rate = meta.get('sample_rate')
    nperseg = meta.get('nperseg')
    noverlap = meta.get('noverlap')

    numeric = (int, float)
    usable = (isinstance(nperseg, numeric) and isinstance(noverlap, numeric)
              and isinstance(sample_rate, numeric) and sample_rate > 0
              and isinstance(freq_res, numeric))
    if not usable:
        return None, 'Time bins', 'Frequency bins'

    # Hop between consecutive columns, in seconds (at least one sample).
    hop = max(int(nperseg - noverlap), 1) / sample_rate
    height, width = shape
    freq_low = -(height // 2) * freq_res
    freq_high = (height - height // 2) * freq_res
    # extent: [left, right, bottom, top] in (µs, MHz)
    extent = [0.0, hop * width * 1e6, freq_low / 1e6, freq_high / 1e6]
    return extent, 'Time (µs)', 'Frequency (MHz)'


def _resolve_output_path(save, route, index, default_format) -> tuple:
    """Return ``(output_path, output_format)`` for the exported figure."""
    if save is None:
        tokens = ['mobility_comparison']
        tokens.extend(str(tok).replace(' ', '_').replace('/', '_') for tok in route)
        tokens.append(f'index{index}')
        stem = '_'.join(tokens)
        # Append the extension instead of using with_suffix(): a route token
        # such as "SNR2.5dB" contains a dot and would otherwise be truncated.
        return Path.cwd() / f'{stem}.{default_format}', default_format

    suffix = save.suffix.lower()
    if suffix in {'.png', '.pdf'}:
        return save, suffix.lstrip('.')
    return save.with_suffix(f'.{default_format}'), default_format


def _configure_fonts() -> None:
    """Apply the serif, IEEE-style matplotlib font settings used for the figure."""
    plt.rcParams['text.usetex'] = False  # Keep False for compatibility, but use mathtext
    plt.rcParams['font.family'] = 'serif'
    plt.rcParams['font.serif'] = ['Times New Roman', 'Liberation Serif', 'DejaVu Serif']
    plt.rcParams['mathtext.fontset'] = 'stix'  # STIX fonts look like Times New Roman
    plt.rcParams['font.size'] = 11
    plt.rcParams['axes.labelsize'] = 12
    plt.rcParams['axes.titlesize'] = 13
    plt.rcParams['axes.titleweight'] = 'normal'
    plt.rcParams['xtick.labelsize'] = 11
    plt.rcParams['ytick.labelsize'] = 11
    plt.rcParams['legend.fontsize'] = 11
    plt.rcParams['axes.linewidth'] = 1.0
    plt.rcParams['grid.linewidth'] = 0.5
    plt.rcParams['pdf.fonttype'] = 42  # Embed fonts as TrueType for vector outputs
    plt.rcParams['ps.fonttype'] = 42


def _plot_comparison(static_img, vehicular_img, difference_img, extent, xlabel, ylabel):
    """Draw the (a) static, (b) vehicular and (c) difference panels; return the figure."""
    from matplotlib.gridspec import GridSpec
    from mpl_toolkits.axes_grid1 import make_axes_locatable

    fig = plt.figure(figsize=(18, 6))
    gs = GridSpec(1, 3, figure=fig, wspace=0.25, hspace=0.1)
    axes = [fig.add_subplot(gs[0, 0]),
            fig.add_subplot(gs[0, 1]),
            fig.add_subplot(gs[0, 2])]
    gs.update(wspace=0.1)
    # Manual positioning: tighter between (a) and (b), wider between (b) and (c).
    axes[0].set_position([0.05, 0.15, 0.26, 0.75])  # [left, bottom, width, height]
    axes[1].set_position([0.33, 0.15, 0.26, 0.75])
    axes[2].set_position([0.65, 0.15, 0.26, 0.75])

    # (a) Static; only this panel carries the Y-axis label.
    axes[0].imshow(static_img, aspect='auto', origin='lower', cmap='viridis', extent=extent)
    axes[0].set_title('(a) Static', pad=6)
    axes[0].set_xlabel(xlabel)
    axes[0].set_ylabel(ylabel)
    # Reserve an invisible colorbar slot so panel (a) matches the size of the others.
    hidden_cax = make_axes_locatable(axes[0]).append_axes("right", size="5%", pad=0.1)
    hidden_cax.axis('off')

    # (b) Vehicular, with the power colorbar.
    vehicular_im = axes[1].imshow(vehicular_img, aspect='auto', origin='lower',
                                  cmap='viridis', extent=extent)
    axes[1].set_title('(b) Vehicular', pad=6)
    axes[1].set_xlabel(xlabel)
    power_cax = make_axes_locatable(axes[1]).append_axes("right", size="5%", pad=0.1)
    power_cbar = plt.colorbar(vehicular_im, cax=power_cax)
    power_cbar.set_label('Power (dBm)', rotation=270, labelpad=12)

    # (c) Difference on a colour scale symmetric around zero.
    vmax = max(abs(difference_img.min()), abs(difference_img.max()))
    difference_im = axes[2].imshow(difference_img, aspect='auto', origin='lower',
                                   cmap='RdBu_r', vmin=-vmax, vmax=vmax, extent=extent)
    axes[2].set_title('(c) Difference', pad=6)
    axes[2].set_xlabel(xlabel)
    diff_cax = make_axes_locatable(axes[2]).append_axes("right", size="5%", pad=0.1)
    diff_cbar = plt.colorbar(difference_im, cax=diff_cax)
    # The difference of two dBm levels is a ratio, expressed in dB.
    diff_cbar.set_label('Power Difference (dB)', rotation=270, labelpad=12)
    return fig


def main() -> None:
    """Command-line entry point.

    Locates a static/vehicular spectrogram pair for the requested route,
    plots both plus their difference, saves the figure, prints summary
    statistics and optionally opens an interactive window. Exits with
    status 1 on invalid input or missing data.
    """
    args = _build_arg_parser().parse_args()

    # Resolve base directory
    if args.base_dir:
        base_dir = args.base_dir
        if not base_dir.exists():
            print(f"Error: Base directory not found: {base_dir}", file=sys.stderr)
            sys.exit(1)
    else:
        base_dir = resolve_base_dir()
    print(f"Using base directory: {base_dir}")

    if not args.route:
        print("Error: Route arguments are required (e.g., LTE QAM16 rate3-4 SNR10dB)", file=sys.stderr)
        sys.exit(1)

    try:
        static_path, vehicular_path = find_matching_spectrograms(base_dir, args.route)
    except (FileNotFoundError, ValueError) as err:
        # ValueError: fewer than four route tokens were supplied.
        print(f"Error: {err}", file=sys.stderr)
        sys.exit(1)
    print(f"Found static spectrogram: {static_path}")
    print(f"Found vehicular spectrogram: {vehicular_path}")

    try:
        static_img, static_meta = load_spectrogram(static_path, args.index)
        vehicular_img_orig, vehicular_meta = load_spectrogram(vehicular_path, args.index)
    except (IndexError, KeyError) as err:
        print(f"Error loading spectrograms: {err}", file=sys.stderr)
        sys.exit(1)

    # Column patching and the element-wise difference both need equal shapes.
    if static_img.shape != vehicular_img_orig.shape:
        print(f"Error: Shape mismatch between static {static_img.shape} and "
              f"vehicular {vehicular_img_orig.shape} spectrograms", file=sys.stderr)
        sys.exit(1)

    # BUGFIX: Remove anomalous vertical stripes in vehicular data.
    # In columns 95-98, noise-floor bins (static below -105 dBm) are made
    # identical to static so the difference is 0 there.
    # NOTE(review): this alters the vehicular data that is plotted and
    # summarised below; those columns no longer show the raw measurement.
    vehicular_img, fixed_count = _patch_noise_floor_columns(static_img, vehicular_img_orig)
    if fixed_count > 0:
        print(f"[BUGFIX] Set {fixed_count} noise floor bins to identical values in columns 95-98")

    # Calculate difference (vehicular - static)
    difference_img = vehicular_img - static_img

    _configure_fonts()
    extent, xlabel, ylabel = _axis_extent(static_meta, static_img.shape)
    _plot_comparison(static_img, vehicular_img, difference_img, extent, xlabel, ylabel)

    # Save the plot
    out_path, output_format = _resolve_output_path(args.save, args.route, args.index, args.format)
    out_path.parent.mkdir(parents=True, exist_ok=True)
    plt.savefig(out_path, bbox_inches='tight', dpi=args.dpi, format=output_format)
    print(f"Plot saved to: {out_path}")

    # Show statistics
    print("\nStatistics:")
    print(f"(a) Static - Mean: {static_img.mean():.2f} dBm, Std: {static_img.std():.2f} dBm")
    print(f"(b) Vehicular - Mean: {vehicular_img.mean():.2f} dBm, Std: {vehicular_img.std():.2f} dBm")
    print(f"(c) Difference - Mean: {difference_img.mean():.2f} dB, Std: {difference_img.std():.2f} dB")
    print(f" Range: [{difference_img.min():.2f}, {difference_img.max():.2f}] dB")

    if not args.no_show:
        plt.show()


# Run the CLI only when executed as a script, not when imported.
if __name__ == '__main__':
    main()