#!/usr/bin/env python3
"""Compare static vs vehicular spectrograms with identical parameters.

This script finds a static spectrogram and its corresponding vehicular version
(with all other parameters identical), then displays them side by side along
with their difference in a 3-subplot layout.
"""

from __future__ import annotations

import argparse
import glob  # noqa: F401 - unused here; kept so existing importers are unaffected
import math
import numbers
import pickle
import sys
from fractions import Fraction
from pathlib import Path

import numpy as np

try:
    import matplotlib.pyplot as plt
except ImportError:  # pragma: no cover - main() reports the missing dependency
    # Keeping the module importable lets the pure helpers below (loading,
    # matching, formatting) be reused and tested without a plotting stack.
    plt = None  # type: ignore

try:
    from core.paths import get_spectrogram_base_dir
except Exception:  # pragma: no cover - fallback when module unavailable
    get_spectrogram_base_dir = None  # type: ignore

SCRIPT_DIR = Path(__file__).resolve().parent
DEFAULT_BASE_CANDIDATES = [
    SCRIPT_DIR / 'spectrograms',
    SCRIPT_DIR.parent / 'spectrograms',
    Path('spectrograms'),
    Path('D:/Namhyun/lwm_data'),
    Path('/mnt/d/Namhyun/lwm_data'),
]

# Directory level directly below the base directory.
DEFAULT_CITY = "city_1_losangeles"

# Time-bin columns patched by suppress_noise_floor_stripes() and the power
# level (dBm) below which a static bin is treated as noise floor.
STRIPE_COLUMNS = range(95, 99)
NOISE_FLOOR_DBM = -105


def _is_real_number(value: object) -> bool:
    """Return True for Python and NumPy real scalars (int, float, np.int64, ...)."""
    return isinstance(value, numbers.Real)


def resolve_base_dir() -> Path:
    """Return the first existing spectrogram base directory.

    The project-provided ``get_spectrogram_base_dir`` is tried first (when it
    could be imported), then each entry of ``DEFAULT_BASE_CANDIDATES``.  The
    current working directory is the last resort.
    """
    if get_spectrogram_base_dir is not None:
        base = Path(get_spectrogram_base_dir())
        if base.exists():
            return base
    for cand in DEFAULT_BASE_CANDIDATES:
        if cand.exists():
            return cand
    return Path.cwd()


def load_spectrogram(path: Path, index: int = 0) -> tuple[np.ndarray, dict]:
    """Load one spectrogram and its configuration from a pickle file.

    Args:
        path: Pickle file holding a mapping with a ``'spectrograms'`` entry
            (indexable along its first axis) and an optional
            ``'configuration'`` mapping.
        index: Sample index along the first axis of ``'spectrograms'``.

    Returns:
        ``(image, configuration)``; the configuration is ``{}`` when the file
        stores none.

    Raises:
        KeyError: The payload has no ``'spectrograms'`` entry.
        IndexError: ``index`` is outside the stored samples.
    """
    # SECURITY: pickle.load executes arbitrary code embedded in the file.
    # Only open spectrogram files that come from a trusted source.
    with path.open('rb') as f:
        payload = pickle.load(f)

    specs = np.asarray(payload['spectrograms'])
    # A stored ``None`` configuration is treated like a missing one so that
    # callers can always use ``cfg.get(...)``.
    cfg = payload.get('configuration') or {}
    if not (0 <= index < specs.shape[0]):
        raise IndexError(f'Index {index} out of range (0..{specs.shape[0]-1})')
    img = specs[index]
    return img, cfg


def find_matching_spectrograms(base_dir: Path, route_tokens: list[str],
                               static_mobility: str = "static",
                               vehicular_mobility: str = "vehicular",
                               city: str = DEFAULT_CITY) -> tuple[Path, Path]:
    """Find matching static and vehicular spectrograms with identical other parameters.

    Expected path structure:
    ``<city>/COMM/MODULATION/rateX-Y/SNR/MOBILITY/FFT/spectrograms/*.pkl``

    Args:
        base_dir: Root directory of the spectrogram tree.
        route_tokens: ``[COMM, MODULATION, rateX-Y, SNR]``; extra tokens are
            ignored.
        static_mobility: Directory name of the static variant.
        vehicular_mobility: Directory name of the vehicular variant.
        city: Directory name directly below ``base_dir``.

    Returns:
        ``(static_pickle, vehicular_pickle)``.  Each is the first ``*.pkl``
        in sorted path order below its mobility directory, so repeated runs
        select the same files regardless of filesystem enumeration order.

    Raises:
        ValueError: Fewer than four route tokens were given.
        FileNotFoundError: A mobility directory contains no pickle file.
    """
    if len(route_tokens) < 4:
        raise ValueError(
            f"Route tokens should have at least 4 parts: COMM MODULATION rateX-Y SNR, got: {route_tokens}"
        )

    comm, modulation, rate, snr = route_tokens[:4]

    route_root = base_dir / city / comm / modulation / rate / snr
    static_path = route_root / static_mobility
    vehicular_path = route_root / vehicular_mobility

    # glob() yields entries in arbitrary order; sort to make "first" stable.
    static_pkl_files = sorted(static_path.glob("**/*.pkl"))
    vehicular_pkl_files = sorted(vehicular_path.glob("**/*.pkl"))

    if not static_pkl_files:
        raise FileNotFoundError(f"No static spectrogram found at: {static_path}")
    if not vehicular_pkl_files:
        raise FileNotFoundError(f"No vehicular spectrogram found at: {vehicular_path}")

    return static_pkl_files[0], vehicular_pkl_files[0]


def format_metadata(meta: dict, include_mobility: bool = False) -> str:
    """Format metadata into a readable ``' | '``-separated title string.

    Args:
        meta: Spectrogram configuration; the keys ``standard``,
            ``modulation``, ``code_rate``, ``snr``, ``speed`` and
            ``speed_name`` are used when present.
        include_mobility: Append the speed / mobility token when True.

    Returns:
        The joined tokens, or ``'Spectrogram'`` when nothing is available.
    """
    title_tokens = []
    if meta.get('standard'):
        title_tokens.append(str(meta['standard']))
    if meta.get('modulation'):
        title_tokens.append(str(meta['modulation']))

    code_rate = meta.get('code_rate')
    if _is_real_number(code_rate):
        try:
            frac = Fraction(float(code_rate)).limit_denominator(16)
            title_tokens.append(f'rate {frac.numerator}/{frac.denominator}')
        except (TypeError, ValueError, OverflowError):
            # NaN / infinity cannot be expressed as a fraction.
            title_tokens.append(f'rate {code_rate}')

    snr = meta.get('snr')
    if _is_real_number(snr):
        snr_value = float(snr)
        # round() raises on NaN/inf, so only integral finite values are
        # shortened to an int; everything else is shown as stored.
        if math.isfinite(snr_value) and abs(snr_value - round(snr_value)) < 1e-6:
            snr_display = int(round(snr_value))
        else:
            snr_display = snr
        title_tokens.append(f'SNR {snr_display} dB')

    # Only include mobility/speed if explicitly requested
    if include_mobility:
        speed = meta.get('speed') or meta.get('speed_name')
        if speed:
            title_tokens.append(str(speed))

    return ' | '.join(title_tokens) if title_tokens else 'Spectrogram'


def suppress_noise_floor_stripes(static_img: np.ndarray, vehicular_img: np.ndarray,
                                 columns=STRIPE_COLUMNS,
                                 noise_threshold: float = NOISE_FLOOR_DBM) -> tuple[np.ndarray, int]:
    """Copy static noise-floor bins into the vehicular image for given columns.

    For every listed column, each bin whose *static* power is below
    ``noise_threshold`` is overwritten in a copy of the vehicular image with
    the static value, which makes the difference exactly zero there.

    NOTE: this replaces measured vehicular values; the vehicular panel, the
    difference panel and the printed statistics all reflect the patched data.

    Args:
        static_img: 2-D static spectrogram.
        vehicular_img: 2-D vehicular spectrogram (left unmodified).
        columns: Column indices to patch; indices outside either image are
            skipped.
        noise_threshold: Static power below which a bin counts as noise floor.

    Returns:
        ``(patched_vehicular_copy, number_of_replaced_bins)``.
    """
    patched = vehicular_img.copy()
    width = min(static_img.shape[1], patched.shape[1])
    replaced = 0
    for col in columns:
        if not 0 <= col < width:
            continue
        noise_mask = static_img[:, col] < noise_threshold
        patched[noise_mask, col] = static_img[noise_mask, col]
        replaced += int(noise_mask.sum())
    return patched, replaced


def compute_plot_extent(meta: dict, shape: tuple[int, int]) -> tuple[list[float] | None, str, str]:
    """Derive physical axes for ``imshow`` from the spectrogram metadata.

    Args:
        meta: Configuration providing ``nperseg``, ``noverlap``,
            ``sample_rate`` and ``freq_resolution_hz``.
        shape: ``(frequency_bins, time_bins)`` of the image.

    Returns:
        ``(extent, xlabel, ylabel)``.  ``extent`` is
        ``[left, right, bottom, top]`` in (µs, MHz) with the frequency axis
        centred on zero, or ``None`` with bin-index labels when any value is
        missing, non-numeric, non-finite, or the sample rate is not positive.
    """
    fallback = (None, 'Time bins', 'Frequency bins')
    raw = [meta.get(key) for key in ('nperseg', 'noverlap', 'sample_rate', 'freq_resolution_hz')]
    if not all(_is_real_number(value) for value in raw):
        return fallback
    nperseg, noverlap, sample_rate, freq_res = (float(value) for value in raw)
    if not all(math.isfinite(v) for v in (nperseg, noverlap, sample_rate, freq_res)):
        return fallback
    if sample_rate <= 0:
        return fallback

    # Hop between consecutive time bins; never less than one sample.
    hop_samples = max(int(nperseg - noverlap), 1)
    hop = hop_samples / sample_rate
    height, width = shape
    times = [0.0, hop * width]
    freqs = [-(height // 2) * freq_res, (height - height // 2) * freq_res]
    extent = [times[0] * 1e6, times[1] * 1e6, freqs[0] / 1e6, freqs[1] / 1e6]
    return extent, 'Time (µs)', 'Frequency (MHz)'


def shared_color_limits(*images: np.ndarray) -> tuple[float | None, float | None]:
    """Return one ``(vmin, vmax)`` covering the finite values of all images.

    ``(None, None)`` is returned when no image has a finite value, which
    leaves the choice to matplotlib's autoscaling.
    """
    lows, highs = [], []
    for image in images:
        data = np.asarray(image, dtype=float)
        finite = data[np.isfinite(data)]
        if finite.size:
            lows.append(float(finite.min()))
            highs.append(float(finite.max()))
    if not lows:
        return None, None
    return min(lows), max(highs)


def symmetric_color_limit(difference_img: np.ndarray, fallback: float = 1.0) -> float:
    """Return the largest finite ``|difference|`` for a zero-centred colormap.

    ``fallback`` is used when the image has no finite value or is all zero,
    so that zero still maps to the centre colour of a diverging colormap.
    """
    data = np.asarray(difference_img, dtype=float)
    finite = data[np.isfinite(data)]
    if finite.size == 0:
        return fallback
    limit = float(np.abs(finite).max())
    return limit if limit > 0 else fallback


def resolve_output_path(save: Path | None, route: list[str], index: int,
                        default_format: str) -> tuple[Path, str]:
    """Determine the output file and its format.

    Args:
        save: User-supplied path, or ``None`` to auto-generate a name in the
            current working directory.
        route: Route tokens used to build the auto-generated name.
        index: Sample index used in the auto-generated name.
        default_format: ``'png'`` or ``'pdf'``; used when the path does not
            already end in one of those extensions.

    Returns:
        ``(path, format)``.
    """
    if save is None:
        tokens = ['mobility_comparison']
        tokens.extend(str(tok).replace(' ', '_').replace('/', '_') for tok in route)
        tokens.append(f'index{index}')
        # Append (never replace) the extension: route tokens may contain dots
        # (e.g. 'SNR2.5dB') that with_suffix() would mistake for a suffix and
        # truncate.
        return Path.cwd() / f"{'_'.join(tokens)}.{default_format}", default_format

    suffix = save.suffix.lower()
    if suffix in {'.png', '.pdf'}:
        return save, suffix.lstrip('.')
    return save.with_suffix(f'.{default_format}'), default_format


def configure_plot_style() -> None:
    """Apply the IEEE-style serif font configuration to matplotlib."""
    plt.rcParams['text.usetex'] = False  # Keep False for compatibility, but use mathtext
    plt.rcParams['font.family'] = 'serif'
    plt.rcParams['font.serif'] = ['Times New Roman', 'Liberation Serif', 'DejaVu Serif']
    plt.rcParams['mathtext.fontset'] = 'stix'  # STIX fonts look like Times New Roman
    plt.rcParams['font.size'] = 11
    plt.rcParams['axes.labelsize'] = 12
    plt.rcParams['axes.titlesize'] = 13
    plt.rcParams['axes.titleweight'] = 'normal'
    plt.rcParams['xtick.labelsize'] = 11
    plt.rcParams['ytick.labelsize'] = 11
    plt.rcParams['legend.fontsize'] = 11
    plt.rcParams['axes.linewidth'] = 1.0
    plt.rcParams['grid.linewidth'] = 0.5
    plt.rcParams['pdf.fonttype'] = 42  # Embed fonts as TrueType for vector outputs
    plt.rcParams['ps.fonttype'] = 42


def render_comparison(static_img: np.ndarray, vehicular_img: np.ndarray,
                      difference_img: np.ndarray, extent: list[float] | None,
                      xlabel: str, ylabel: str):
    """Draw the (a) static, (b) vehicular, (c) difference figure.

    Panels (a) and (b) are drawn with the same colour limits so that the
    single colorbar next to (b) is valid for both.  Panel (c) uses a
    zero-centred diverging colormap with its own colorbar.

    Returns:
        The created matplotlib figure.
    """
    from matplotlib.gridspec import GridSpec
    from mpl_toolkits.axes_grid1 import make_axes_locatable

    fig = plt.figure(figsize=(18, 6))
    gs = GridSpec(1, 3, figure=fig, wspace=0.25, hspace=0.1)
    axes = [fig.add_subplot(gs[0, col]) for col in range(3)]

    # Tighter between (a) and (b), wider between (b) and (c).
    gs.update(wspace=0.1)
    axes[0].set_position([0.05, 0.15, 0.26, 0.75])  # [left, bottom, width, height]
    axes[1].set_position([0.33, 0.15, 0.26, 0.75])  # Closer to (a)
    axes[2].set_position([0.65, 0.15, 0.26, 0.75])  # More space from (b)

    power_min, power_max = shared_color_limits(static_img, vehicular_img)

    # (a) Static
    axes[0].imshow(static_img, aspect='auto', origin='lower', cmap='viridis',
                   extent=extent, vmin=power_min, vmax=power_max)
    axes[0].set_title('(a) Static', pad=6)
    axes[0].set_xlabel(xlabel)
    axes[0].set_ylabel(ylabel)  # Only label Y-axis on the first subplot
    # Reserve invisible colorbar space so all three panels have equal size.
    cax1 = make_axes_locatable(axes[0]).append_axes("right", size="5%", pad=0.1)
    cax1.axis('off')

    # (b) Vehicular, carrying the colorbar shared with (a)
    im2 = axes[1].imshow(vehicular_img, aspect='auto', origin='lower', cmap='viridis',
                         extent=extent, vmin=power_min, vmax=power_max)
    axes[1].set_title('(b) Vehicular', pad=6)
    axes[1].set_xlabel(xlabel)
    cax2 = make_axes_locatable(axes[1]).append_axes("right", size="5%", pad=0.1)
    cbar2 = fig.colorbar(im2, cax=cax2)
    cbar2.set_label('Power (dBm)', rotation=270, labelpad=12)

    # (c) Difference on a symmetric scale
    limit = symmetric_color_limit(difference_img)
    im3 = axes[2].imshow(difference_img, aspect='auto', origin='lower', cmap='RdBu_r',
                         vmin=-limit, vmax=limit, extent=extent)
    axes[2].set_title('(c) Difference', pad=6)
    axes[2].set_xlabel(xlabel)
    cax3 = make_axes_locatable(axes[2]).append_axes("right", size="5%", pad=0.1)
    cbar3 = fig.colorbar(im3, cax=cax3)
    # The difference of two dBm levels is a ratio, hence dB rather than dBm.
    cbar3.set_label('Power Difference (dB)', rotation=270, labelpad=12)

    return fig


def print_statistics(static_img: np.ndarray, vehicular_img: np.ndarray,
                     difference_img: np.ndarray) -> None:
    """Print mean / standard deviation of each panel and the difference range."""
    print("\nStatistics:")
    print(f"(a) Static - Mean: {static_img.mean():.2f} dBm, Std: {static_img.std():.2f} dBm")
    print(f"(b) Vehicular - Mean: {vehicular_img.mean():.2f} dBm, Std: {vehicular_img.std():.2f} dBm")
    print(f"(c) Difference - Mean: {difference_img.mean():.2f} dB, Std: {difference_img.std():.2f} dB")
    print(f" Range: [{difference_img.min():.2f}, {difference_img.max():.2f}] dB")


def build_arg_parser() -> argparse.ArgumentParser:
    """Create the command-line parser."""
    parser = argparse.ArgumentParser(
        description='Compare static vs vehicular spectrograms with identical parameters.',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  python compare_mobility_spectrograms.py LTE QAM16 rate3-4 SNR10dB
  python compare_mobility_spectrograms.py LTE QPSK rate1-2 SNR5dB --index 2
  python compare_mobility_spectrograms.py WiFi BPSK rate1-2 SNR0dB --save comparison.png
  python compare_mobility_spectrograms.py 5G QAM64 rate2-3 SNR15dB
        """
    )
    parser.add_argument('route', nargs='*',
                        help='Path fragments (e.g., LTE QAM16 rate3-4 SNR10dB) leading to the target spectrograms.')
    parser.add_argument('--index', type=int, default=0,
                        help='Sample index inside pickle (default: 0).')
    parser.add_argument('--save', type=Path,
                        help='Optional output path. Defaults to auto-generated filename.')
    parser.add_argument('--format', choices=('png', 'pdf'), default='png',
                        help='Output format when --save has no extension (default: png).')
    parser.add_argument('--dpi', type=int, default=600,
                        help='Resolution for rasterized content in the export (default: 600).')
    parser.add_argument('--no-show', action='store_true',
                        help='Skip opening an interactive window (image is still saved).')
    parser.add_argument('--base-dir', type=Path,
                        help='Base directory containing spectrograms (auto-detected if not specified).')
    parser.add_argument('--city', default=DEFAULT_CITY,
                        help=f'City directory below the base directory (default: {DEFAULT_CITY}).')
    return parser


def main(argv: list[str] | None = None) -> None:
    """Run the comparison: locate, load, patch, plot, save and summarise.

    Args:
        argv: Command-line arguments; ``None`` means ``sys.argv[1:]``.

    Exits with status 1 (message on stderr) when a dependency, directory,
    route or spectrogram file is missing or unusable.
    """
    args = build_arg_parser().parse_args(argv)

    if plt is None:
        print("Error: matplotlib is required to render the comparison.", file=sys.stderr)
        sys.exit(1)

    # Resolve base directory
    if args.base_dir:
        base_dir = args.base_dir
        if not base_dir.exists():
            print(f"Error: Base directory not found: {base_dir}", file=sys.stderr)
            sys.exit(1)
    else:
        base_dir = resolve_base_dir()
    print(f"Using base directory: {base_dir}")

    if not args.route:
        print("Error: Route arguments are required (e.g., LTE QAM16 rate3-4 SNR10dB)", file=sys.stderr)
        sys.exit(1)

    try:
        static_path, vehicular_path = find_matching_spectrograms(base_dir, args.route, city=args.city)
    except (FileNotFoundError, ValueError) as err:
        # ValueError: the route has fewer than the four required tokens.
        print(f"Error: {err}", file=sys.stderr)
        sys.exit(1)
    print(f"Found static spectrogram: {static_path}")
    print(f"Found vehicular spectrogram: {vehicular_path}")

    try:
        static_img, static_meta = load_spectrogram(static_path, args.index)
        vehicular_img_orig, _vehicular_meta = load_spectrogram(vehicular_path, args.index)
    except (IndexError, KeyError, OSError, EOFError, pickle.UnpicklingError) as err:
        print(f"Error loading spectrograms: {err}", file=sys.stderr)
        sys.exit(1)

    # The element-wise difference and the shared axes need equal 2-D shapes.
    if static_img.ndim != 2 or static_img.shape != vehicular_img_orig.shape:
        print(
            "Error: static and vehicular spectrograms must be 2-D with identical shapes, "
            f"got {static_img.shape} and {vehicular_img_orig.shape}",
            file=sys.stderr,
        )
        sys.exit(1)

    # BUGFIX: Remove anomalous vertical stripes in vehicular data.
    # In columns 95-98, make ALL noise floor regions identical to static
    # (difference = 0); the loaded vehicular array itself stays untouched.
    vehicular_img, fixed_count = suppress_noise_floor_stripes(static_img, vehicular_img_orig)
    if fixed_count > 0:
        print(f"[BUGFIX] Set {fixed_count} noise floor bins to identical values in columns 95-98")

    # Calculate difference (vehicular - static)
    difference_img = vehicular_img - static_img

    configure_plot_style()
    extent, xlabel, ylabel = compute_plot_extent(static_meta, static_img.shape)
    fig = render_comparison(static_img, vehicular_img, difference_img, extent, xlabel, ylabel)

    # Save the plot
    out_path, output_format = resolve_output_path(args.save, args.route, args.index, args.format)
    out_path.parent.mkdir(parents=True, exist_ok=True)
    fig.savefig(out_path, bbox_inches='tight', dpi=args.dpi, format=output_format)
    print(f"Plot saved to: {out_path}")

    print_statistics(static_img, vehicular_img, difference_img)

    if args.no_show:
        plt.close(fig)
    else:
        plt.show()


if __name__ == '__main__':
    main()