|
|
|
|
|
"""Compare static vs vehicular spectrograms with identical parameters. |
|
|
|
|
|
This script finds a static spectrogram and its corresponding vehicular version |
|
|
(with all other parameters identical), then displays them side by side along |
|
|
with their difference in a 3-subplot layout. |
|
|
""" |
|
|
|
|
|
from __future__ import annotations |
|
|
|
|
|
import argparse |
|
|
import sys |
|
|
from pathlib import Path |
|
|
import pickle |
|
|
import glob |
|
|
|
|
|
import matplotlib.pyplot as plt |
|
|
import numpy as np |
|
|
from fractions import Fraction |
|
|
|
|
|
try: |
|
|
from core.paths import get_spectrogram_base_dir |
|
|
except Exception: |
|
|
get_spectrogram_base_dir = None |
|
|
|
|
|
# Absolute directory that contains this script; anchors the script-relative
# candidates below.
SCRIPT_DIR = Path(__file__).resolve().parent

# Candidate base directories, listed in the order they are tried:
# two script-relative folders, a folder relative to the working directory,
# then two absolute data locations.
DEFAULT_BASE_CANDIDATES = [
    SCRIPT_DIR.joinpath('spectrograms'),
    SCRIPT_DIR.parent.joinpath('spectrograms'),
    Path('spectrograms'),
    Path('D:/Namhyun/lwm_data'),
    Path('/mnt/d/Namhyun/lwm_data'),
]
|
|
|
|
|
|
|
|
def resolve_base_dir() -> Path:
    """Pick the directory that should be searched for spectrograms.

    Resolution order:

    1. The path returned by ``get_spectrogram_base_dir`` (only when that
       helper was importable), provided the path exists.
    2. The first entry of ``DEFAULT_BASE_CANDIDATES`` that exists.
    3. The current working directory.

    Returns:
        The selected base directory.
    """
    if get_spectrogram_base_dir is not None:
        configured = Path(get_spectrogram_base_dir())
        if configured.exists():
            return configured

    first_existing = next(
        (candidate for candidate in DEFAULT_BASE_CANDIDATES if candidate.exists()),
        None,
    )
    if first_existing is not None:
        return first_existing

    # Nothing matched: fall back to wherever the script was launched from.
    return Path.cwd()
|
|
|
|
|
|
|
|
def load_spectrogram(path: Path, index: int = 0) -> tuple[np.ndarray, dict]:
    """Load one spectrogram and its configuration from a pickle file.

    The pickle is expected to contain a mapping with a ``'spectrograms'``
    entry (indexed by sample along its first axis) and an optional
    ``'configuration'`` entry.

    Args:
        path: Pickle file to read.
        index: Zero-based sample to return. Negative indices are rejected.

    Returns:
        ``(image, configuration)``. ``configuration`` is ``{}`` when the
        payload has no configuration or stores ``None`` for it.

    Raises:
        KeyError: The payload has no ``'spectrograms'`` entry.
        IndexError: The file holds no spectrograms or ``index`` is out of
            range.
    """
    # SECURITY: pickle.load can execute arbitrary code embedded in the file.
    # Only open pickles that come from a trusted source.
    with path.open('rb') as f:
        payload = pickle.load(f)

    specs = np.asarray(payload['spectrograms'])
    # ``or {}`` also covers an explicitly stored ``None``, which would
    # otherwise break callers that do ``cfg.get(...)``.
    cfg = payload.get('configuration') or {}

    # A 0-d array has no first axis; treat it like an empty collection
    # instead of failing on ``shape[0]``.
    count = specs.shape[0] if specs.ndim else 0
    if count == 0:
        raise IndexError(
            f'Index {index} out of range (file contains no spectrograms)'
        )
    if not (0 <= index < count):
        raise IndexError(f'Index {index} out of range (0..{count - 1})')

    return specs[index], cfg
|
|
|
|
|
|
|
|
def find_matching_spectrograms(base_dir: Path, route_tokens: list[str],
                               static_mobility: str = "static",
                               vehicular_mobility: str = "vehicular",
                               city: str = "city_1_losangeles") -> tuple[Path, Path]:
    """Locate one static and one vehicular spectrogram pickle for a scenario.

    Both files are searched under
    ``base_dir / city / COMM / MODULATION / RATE / SNR / <mobility>``, so the
    two results share every path component except the mobility folder.

    Args:
        base_dir: Root directory of the spectrogram tree.
        route_tokens: At least four path fragments, in the order
            ``COMM MODULATION rateX-Y SNR``. Extra tokens are ignored.
        static_mobility: Folder name of the static variant.
        vehicular_mobility: Folder name of the vehicular variant.
        city: Scenario folder directly below ``base_dir``.

    Returns:
        ``(static_file, vehicular_file)``: the lexicographically first
        ``*.pkl`` found (recursively) in each mobility folder.

    Raises:
        ValueError: Fewer than four route tokens were given.
        FileNotFoundError: A mobility folder contains no ``*.pkl`` file.
    """
    if len(route_tokens) < 4:
        raise ValueError(f"Route tokens should have at least 4 parts: COMM MODULATION rateX-Y SNR, got: {route_tokens}")

    comm, modulation, rate, snr = route_tokens[:4]

    scenario_dir = Path(base_dir) / city / comm / modulation / rate / snr
    static_path = scenario_dir / static_mobility
    vehicular_path = scenario_dir / vehicular_mobility

    # Path.glob yields entries in filesystem order, which varies between
    # machines; sorting makes the "first" file reproducible.
    static_pkl_files = sorted(static_path.glob("**/*.pkl"))
    vehicular_pkl_files = sorted(vehicular_path.glob("**/*.pkl"))

    if not static_pkl_files:
        raise FileNotFoundError(f"No static spectrogram found at: {static_path}")

    if not vehicular_pkl_files:
        raise FileNotFoundError(f"No vehicular spectrogram found at: {vehicular_path}")

    return static_pkl_files[0], vehicular_pkl_files[0]
|
|
|
|
|
|
|
|
def format_metadata(meta: dict, include_mobility: bool = False) -> str:
    """Format spectrogram metadata as a ``' | '``-separated title string.

    Args:
        meta: Metadata mapping. The keys read are ``'standard'``,
            ``'modulation'``, ``'code_rate'``, ``'snr'`` and, when
            ``include_mobility`` is set, ``'speed'`` / ``'speed_name'``.
        include_mobility: Append the speed (or speed name) when truthy.

    Returns:
        The joined tokens, or ``'Spectrogram'`` when no token could be built.
    """
    title_tokens = []

    for key in ('standard', 'modulation'):
        if meta.get(key):
            title_tokens.append(str(meta[key]))

    code_rate = meta.get('code_rate')
    if isinstance(code_rate, (int, float)):
        try:
            # Show e.g. 0.75 as "3/4"; denominators above 16 are approximated.
            frac = Fraction(code_rate).limit_denominator(16)
        except (ValueError, OverflowError):
            # Fraction() rejects NaN (ValueError) and infinity (OverflowError).
            title_tokens.append(f'rate {code_rate}')
        else:
            title_tokens.append(f'rate {frac.numerator}/{frac.denominator}')

    snr = meta.get('snr')
    if isinstance(snr, (int, float)):
        try:
            nearest = round(snr)
        except (ValueError, OverflowError):
            # round() cannot convert NaN / infinity to an integer; show the
            # raw value instead of crashing.
            snr_display = snr
        else:
            # Drop the decimal part when the SNR is integral within 1e-6.
            snr_display = int(nearest) if abs(snr - nearest) < 1e-6 else snr
        title_tokens.append(f'SNR {snr_display} dB')

    if include_mobility:
        speed = meta.get('speed') or meta.get('speed_name')
        if speed:
            title_tokens.append(str(speed))

    return ' | '.join(title_tokens) if title_tokens else 'Spectrogram'
|
|
|
|
|
|
|
|
def _equalize_noise_floor(static_img: np.ndarray, vehicular_img: np.ndarray,
                          columns: range, noise_threshold: float) -> int:
    """Copy static noise-floor bins into ``vehicular_img`` in place; return the bin count."""
    fixed_count = 0
    for col in columns:
        if col >= vehicular_img.shape[1]:
            continue
        # Bins whose static value lies below the threshold are treated as
        # noise floor and forced to be identical in both images.
        noise_mask = static_img[:, col] < noise_threshold
        if noise_mask.any():
            vehicular_img[noise_mask, col] = static_img[noise_mask, col]
            fixed_count += int(np.sum(noise_mask))
    return fixed_count


def _resolve_output_path(save: Path | None, route: list[str], index: int,
                         default_format: str) -> tuple[Path, str]:
    """Return ``(output path, format)``, auto-naming the file when ``save`` is None."""
    if save is not None:
        out_path = save
    else:
        tokens = ['mobility_comparison']
        tokens.extend(str(tok).replace(' ', '_').replace('/', '_') for tok in route)
        tokens.append(f'index{index}')
        out_path = Path.cwd() / '_'.join(tokens)

    suffix = out_path.suffix.lower()
    if suffix in {'.png', '.pdf'}:
        return out_path, suffix.lstrip('.')

    # Append the extension instead of using with_suffix(): a name containing
    # a dot (e.g. "..._SNR10.5dB_index0") would otherwise lose everything
    # after that dot.
    return out_path.with_name(f'{out_path.name}.{default_format}'), default_format


def main() -> None:
    """Command-line entry point.

    Parses the arguments, locates and loads the static and vehicular
    spectrograms for the requested route, plots static / vehicular /
    difference side by side, saves the figure, prints summary statistics
    and, unless ``--no-show`` is given, opens the interactive window.

    Exits with status 1 (message on stderr) when the base directory, the
    route, or the spectrogram files are unusable.
    """
    parser = argparse.ArgumentParser(
        description='Compare static vs vehicular spectrograms with identical parameters.',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  python compare_mobility_spectrograms.py LTE QAM16 rate3-4 SNR10dB
  python compare_mobility_spectrograms.py LTE QPSK rate1-2 SNR5dB --index 2
  python compare_mobility_spectrograms.py WiFi BPSK rate1-2 SNR0dB --save comparison.png
  python compare_mobility_spectrograms.py 5G QAM64 rate2-3 SNR15dB
"""
    )

    parser.add_argument('route', nargs='*',
                        help='Path fragments (e.g., LTE QAM16 rate3-4 SNR10dB) leading to the target spectrograms.')
    parser.add_argument('--index', type=int, default=0,
                        help='Sample index inside pickle (default: 0).')
    parser.add_argument('--save', type=Path,
                        help='Optional output path. Defaults to auto-generated filename.')
    parser.add_argument('--format', choices=('png', 'pdf'), default='png',
                        help='Output format when --save has no extension (default: png).')
    parser.add_argument('--dpi', type=int, default=600,
                        help='Resolution for rasterized content in the export (default: 600).')
    parser.add_argument('--no-show', action='store_true',
                        help='Skip opening an interactive window (image is still saved).')
    parser.add_argument('--base-dir', type=Path,
                        help='Base directory containing spectrograms (auto-detected if not specified).')

    args = parser.parse_args()

    # --- Locate the data ---------------------------------------------------
    if args.base_dir:
        base_dir = args.base_dir
        if not base_dir.exists():
            print(f"Error: Base directory not found: {base_dir}", file=sys.stderr)
            sys.exit(1)
    else:
        base_dir = resolve_base_dir()

    print(f"Using base directory: {base_dir}")

    if not args.route:
        print("Error: Route arguments are required (e.g., LTE QAM16 rate3-4 SNR10dB)", file=sys.stderr)
        sys.exit(1)

    try:
        static_path, vehicular_path = find_matching_spectrograms(base_dir, args.route)
    except (FileNotFoundError, ValueError) as err:
        # ValueError: fewer than four route tokens were supplied.
        print(f"Error: {err}", file=sys.stderr)
        sys.exit(1)
    print(f"Found static spectrogram: {static_path}")
    print(f"Found vehicular spectrogram: {vehicular_path}")

    try:
        static_img, static_meta = load_spectrogram(static_path, args.index)
        vehicular_img_orig, vehicular_meta = load_spectrogram(vehicular_path, args.index)
    except (IndexError, KeyError, OSError, pickle.UnpicklingError) as err:
        print(f"Error loading spectrograms: {err}", file=sys.stderr)
        sys.exit(1)

    # The column-wise fix-up and the subtraction below need two images of the
    # same 2-D (or higher) shape; fail clearly instead of broadcasting.
    if static_img.ndim < 2 or static_img.shape != vehicular_img_orig.shape:
        print(f"Error: Spectrogram shapes are incompatible: static {static_img.shape}, "
              f"vehicular {vehicular_img_orig.shape}", file=sys.stderr)
        sys.exit(1)

    # Work on a copy so the loaded vehicular data stays untouched.
    vehicular_img = vehicular_img_orig.copy()

    # NOTE(review): this overwrites vehicular bins in columns 95-98 with the
    # static values wherever the static value is below -105, so those bins
    # show zero difference. The reason is not visible in this file - confirm
    # it is still wanted.
    fixed_count = _equalize_noise_floor(static_img, vehicular_img,
                                        columns=range(95, 99), noise_threshold=-105)
    if fixed_count > 0:
        print(f"[BUGFIX] Set {fixed_count} noise floor bins to identical values in columns 95-98")

    difference_img = vehicular_img - static_img

    # --- Figure styling ----------------------------------------------------
    plt.rcParams.update({
        'text.usetex': False,
        'font.family': 'serif',
        'font.serif': ['Times New Roman', 'Liberation Serif', 'DejaVu Serif'],
        'mathtext.fontset': 'stix',
        'font.size': 11,
        'axes.labelsize': 12,
        'axes.titlesize': 13,
        'axes.titleweight': 'normal',
        'xtick.labelsize': 11,
        'ytick.labelsize': 11,
        'legend.fontsize': 11,
        'axes.linewidth': 1.0,
        'grid.linewidth': 0.5,
        # Type 42 embeds TrueType fonts in PDF/PS output.
        'pdf.fonttype': 42,
        'ps.fonttype': 42,
    })

    # --- Axis extent from the static metadata --------------------------------
    freq_res = static_meta.get('freq_resolution_hz')
    sample_rate = static_meta.get('sample_rate')
    nperseg = static_meta.get('nperseg')
    noverlap = static_meta.get('noverlap')

    # Fall back to bin indices unless every quantity needed for physical
    # units is present and numeric.
    extent = None
    xlabel = 'Time bins'
    ylabel = 'Frequency bins'

    if (isinstance(nperseg, (int, float)) and isinstance(noverlap, (int, float)) and
            isinstance(sample_rate, (int, float)) and sample_rate > 0 and
            isinstance(freq_res, (int, float))):
        hop_samples = max(int(nperseg - noverlap), 1)
        hop = hop_samples / sample_rate
        height, width = static_img.shape[:2]
        times = [0, hop * width]
        # Frequency axis is laid out around zero.
        freqs = [-(height // 2) * freq_res, (height - height // 2) * freq_res]

        # Seconds -> microseconds, Hz -> MHz.
        extent = [times[0] * 1e6, times[1] * 1e6, freqs[0] / 1e6, freqs[1] / 1e6]
        xlabel = 'Time (µs)'
        ylabel = 'Frequency (MHz)'

    # --- Plot ----------------------------------------------------------------
    from matplotlib.gridspec import GridSpec
    from mpl_toolkits.axes_grid1 import make_axes_locatable

    fig = plt.figure(figsize=(18, 6))
    gs = GridSpec(1, 3, figure=fig, wspace=0.25, hspace=0.1)

    axes = [fig.add_subplot(gs[0, 0]),
            fig.add_subplot(gs[0, 1]),
            fig.add_subplot(gs[0, 2])]

    gs.update(wspace=0.1)

    # Explicit positions [left, bottom, width, height] in figure coordinates.
    axes[0].set_position([0.05, 0.15, 0.26, 0.75])
    axes[1].set_position([0.33, 0.15, 0.26, 0.75])
    axes[2].set_position([0.65, 0.15, 0.26, 0.75])

    # (a) Static
    axes[0].imshow(static_img, aspect='auto', origin='lower', cmap='viridis', extent=extent)
    axes[0].set_title('(a) Static', pad=6)
    axes[0].set_xlabel(xlabel)
    axes[0].set_ylabel(ylabel)

    # Reserve an invisible colorbar slot so panel (a) keeps the same image
    # width as the panels that do show a colorbar.
    divider1 = make_axes_locatable(axes[0])
    cax1 = divider1.append_axes("right", size="5%", pad=0.1)
    cax1.axis('off')

    # (b) Vehicular
    im2 = axes[1].imshow(vehicular_img, aspect='auto', origin='lower', cmap='viridis', extent=extent)
    axes[1].set_title('(b) Vehicular', pad=6)
    axes[1].set_xlabel(xlabel)

    divider2 = make_axes_locatable(axes[1])
    cax2 = divider2.append_axes("right", size="5%", pad=0.1)
    cbar2 = plt.colorbar(im2, cax=cax2)
    cbar2.set_label('Power (dBm)', rotation=270, labelpad=12)

    # (c) Difference, on a colour scale symmetric around zero.
    vmax = float(max(abs(difference_img.min()), abs(difference_img.max())))
    if not (0 < vmax < float('inf')):
        # Identical images (0) or NaN/inf extremes would give a degenerate
        # vmin == vmax scale; use a unit range so zero maps to the centre.
        vmax = 1.0
    im3 = axes[2].imshow(difference_img, aspect='auto', origin='lower',
                         cmap='RdBu_r', vmin=-vmax, vmax=vmax, extent=extent)
    axes[2].set_title('(c) Difference', pad=6)
    axes[2].set_xlabel(xlabel)

    divider3 = make_axes_locatable(axes[2])
    cax3 = divider3.append_axes("right", size="5%", pad=0.1)
    cbar3 = plt.colorbar(im3, cax=cax3)
    # The difference of two dBm levels is a ratio, i.e. dB rather than dBm.
    cbar3.set_label('Power Difference (dB)', rotation=270, labelpad=12)

    # --- Save ----------------------------------------------------------------
    out_path, output_format = _resolve_output_path(args.save, args.route, args.index, args.format)
    out_path.parent.mkdir(parents=True, exist_ok=True)
    fig.savefig(out_path, bbox_inches='tight', dpi=args.dpi, format=output_format)
    print(f"Plot saved to: {out_path}")

    # --- Summary -------------------------------------------------------------
    print("\nStatistics:")
    print(f"(a) Static - Mean: {static_img.mean():.2f} dBm, Std: {static_img.std():.2f} dBm")
    print(f"(b) Vehicular - Mean: {vehicular_img.mean():.2f} dBm, Std: {vehicular_img.std():.2f} dBm")
    print(f"(c) Difference - Mean: {difference_img.mean():.2f} dB, Std: {difference_img.std():.2f} dB")
    print(f" Range: [{difference_img.min():.2f}, {difference_img.max():.2f}] dB")

    if not args.no_show:
        plt.show()
|
|
|
|
|
|
|
|
# Run the command-line interface only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == '__main__':
    main()
|
|
|