lwm-temporal / examples /utils /data_utils.py
Sadjad Alikhani
added the dynamic dataset generation pipeline
80a230c
# data_utils.py
# -*- coding: utf-8 -*-
from __future__ import annotations
import os, time, math, pickle
import numpy as np
import matplotlib.pyplot as plt
import networkx as nx
from scipy.spatial import KDTree
from scipy.interpolate import griddata
from matplotlib.colors import LinearSegmentedColormap, Normalize
import torch
from input_preprocess import DeepMIMO_data_gen
# -------------------- DeepMIMO consts (robust defaults) --------------------
try:
import DeepMIMOv3.consts as c
except Exception:
class _C: pass
c = _C()
if not hasattr(c, 'PARAMSET_OFDM'):
c.PARAMSET_OFDM = 'OFDM'
c.PARAMSET_OFDM_BW = 'bandwidth'
c.PARAMSET_OFDM_BW_MULT = 1e9
c.PARAMSET_OFDM_SC_SAMP = 'selected_subcarriers'
c.PARAMSET_OFDM_SC_NUM = 'subcarriers'
c.PARAMSET_OFDM_LPF = 'LPF'
c.PARAMSET_FDTD = 'FDTD'
c.PARAMSET_ANT_SHAPE = 'shape'
c.PARAMSET_ANT_SPACING = 'spacing'
c.PARAMSET_ANT_ROTATION = 'rotation'
c.PARAMSET_ANT_FOV = 'FoV'
c.PARAMSET_ANT_RAD_PAT = 'radiation_pattern'
c.OUT_PATH_NUM = 'num_paths'
c.OUT_PATH_DOD_THETA = 'DoD_theta'
c.OUT_PATH_DOD_PHI = 'DoD_phi'
c.OUT_PATH_DOA_THETA = 'DoA_theta'
c.OUT_PATH_DOA_PHI = 'DoA_phi'
c.OUT_PATH_PHASE = 'phase'
c.OUT_PATH_TOA = 'ToA'
c.OUT_PATH_RX_POW = 'power'
c.OUT_PATH_DOP_VEL = 'Doppler_vel'
c.OUT_PATH_DOP_ACC = 'Doppler_acc'
c.PARAMSET_DOPPLER_EN = 'Doppler'
c.PARAMSET_SCENARIO_PARAMS = 'scenario_params'
c.PARAMSET_SCENARIO_PARAMS_DOPPLER_EN = 'Doppler_enabled'
c.PARAMSET_SCENARIO_PARAMS_CF = 'carrier_freq'
c.LIGHTSPEED = 3e8
# ============================ helpers: grid & sampling ============================
def infer_grid_step(pos_total: np.ndarray) -> float:
xy = np.unique(pos_total[:, :2], axis=0)
if len(xy) < 2:
return 1.0
if len(xy) > 20000:
xy = xy[np.random.choice(len(xy), 20000, replace=False)]
# For regular grids, find the minimum non-zero distance
tree = KDTree(xy)
dists, _ = tree.query(xy, k=2)
nn = dists[:, 1]
nn = nn[nn > 0]
if len(nn) == 0:
return 1.0
# Use the minimum distance as the grid step for regular grids
min_dist = float(np.min(nn))
# Debug: print some statistics
print(f"[DEBUG] Grid spacing inference:")
print(f" Min distance: {min_dist:.6f} m")
print(f" Max distance: {float(np.max(nn)):.6f} m")
print(f" Mean distance: {float(np.mean(nn)):.6f} m")
print(f" Median distance: {float(np.median(nn)):.6f} m")
# Verify this is likely a regular grid by checking if there are many points at this distance
# Count how many points have this minimum distance as their nearest neighbor
tolerance = min_dist * 0.1 # 10% tolerance
count_at_min = np.sum(np.abs(nn - min_dist) < tolerance)
percentage = count_at_min / len(nn) * 100
print(f" Points at min distance: {count_at_min}/{len(nn)} ({percentage:.1f}%)")
# If at least 20% of points have the minimum distance, it's likely a regular grid
if count_at_min >= 0.1 * len(nn):
print(f" Using min distance as grid step: {min_dist:.6f} m")
return min_dist
else:
# Fallback to the original method for irregular grids
lo, hi = np.percentile(nn, 5), np.percentile(nn, 30)
mid = nn[(nn >= lo) & (nn <= hi)]
step = float(np.median(mid) if len(mid) else np.median(nn))
print(f" Using fallback method: {step:.6f} m")
return max(step, 1e-3)
def sample_continuous_along_polyline(traj_pos, idxs, speed, dt, N):
"""
Walk along the polyline by exactly speed*dt per sample (continuous).
Returns positions (N,3), (idx0,idx1) per sample, alpha in [0,1), and 2D velocity directions.
"""
traj_pos = np.asarray(traj_pos, float)
if traj_pos.shape[0] < 2:
p = np.repeat(traj_pos[:1], N, axis=0)
pairs = [(idxs[0], idxs[0])] * N
alphas = np.zeros(N, float)
vdirs = np.zeros((N, 2), float)
return p.astype(np.float32), pairs, alphas.astype(np.float32), vdirs.astype(np.float32)
seg_vec = traj_pos[1:] - traj_pos[:-1] # (S,3)
seg_len = np.linalg.norm(seg_vec[:, :2], axis=1) # (S,)
S = len(seg_len)
eps = 1e-12
# cum distance at each vertex (including 0 at start)
seg_cum = np.zeros(S + 1, float)
seg_cum[1:] = np.cumsum(seg_len)
# cumulative traveled distance at each sample
ds = speed * dt * np.arange(N, dtype=float)
ds = np.clip(ds, 0.0, max(seg_cum[-1] - eps, 0.0))
pos_c = np.zeros((N, 3), float)
alphas = np.zeros(N, float)
vdirs = np.zeros((N, 2), float)
pairs = []
for k, s in enumerate(ds):
# find segment i such that seg_cum[i] <= s < seg_cum[i+1]
i = int(np.searchsorted(seg_cum, s, side='right') - 1)
i = min(max(i, 0), S - 1)
s0 = seg_cum[i]
L = seg_len[i]
a = (s - s0) / (L if L > eps else 1.0)
a = min(max(a, 0.0), 1.0 - 1e-9) # keep <1 so we don't hop to next vertex
p0 = traj_pos[i]
p1 = traj_pos[i + 1]
pos = p0 + a * (p1 - p0)
d = seg_vec[i, :2] / (L if L > eps else 1.0)
pos_c[k] = pos
alphas[k] = a
vdirs[k] = d
pairs.append((idxs[i], idxs[i + 1]))
return pos_c.astype(np.float32), pairs, alphas.astype(np.float32), vdirs.astype(np.float32)
# ============================ grid → roads & discrete trajs ============================
def filter_road_positions(valid_positions, ROAD_WIDTH, ROAD_CENTER_SPACING):
road_positions, lane_info = [], {}
half = ROAD_WIDTH / 2
for pos in valid_positions:
x, y, z = pos
cx = round(x / ROAD_CENTER_SPACING) * ROAD_CENTER_SPACING
cy = round(y / ROAD_CENTER_SPACING) * ROAD_CENTER_SPACING
dx, dy = x - cx, y - cy
on_v, on_h = abs(dx) < half, abs(dy) < half
if on_v and not on_h:
lane_info[tuple(pos)] = ((0, 1) if dx >= 0 else (0, -1), "vertical")
road_positions.append(pos)
elif on_h and not on_v:
lane_info[tuple(pos)] = ((1, 0) if dy < 0 else (-1, 0), "horizontal")
road_positions.append(pos)
elif on_v and on_h:
lane_info[tuple(pos)] = ((0, 1) if dx >= 0 else (0, -1), "intersection")
road_positions.append(pos)
return np.array(road_positions), lane_info
def create_grid_road_network(road_positions, lane_info, STEP_SIZE):
G = nx.DiGraph()
pos_dict = {tuple(pos): i for i, pos in enumerate(road_positions)}
for pos, idx in pos_dict.items():
if pos in lane_info:
direction, lane_type = lane_info[pos]
G.add_node(idx, pos=np.array(pos), direction=direction, lane_type=lane_type)
tree = KDTree(road_positions)
for idx, pos in enumerate(road_positions):
if idx not in G.nodes: continue
nbrs = tree.query_ball_point(pos, r=STEP_SIZE + 0.1)
for nb in nbrs:
if nb == idx or nb not in G.nodes: continue
nbpos = road_positions[nb]
d = np.linalg.norm(pos - nbpos)
if not np.isclose(d, STEP_SIZE, atol=0.1): continue
move_dir = (int(np.sign(nbpos[0] - pos[0])), int(np.sign(nbpos[1] - pos[1])))
lane_type = G.nodes[idx].get("lane_type", "vertical")
if lane_type == "intersection":
if move_dir in [(0,1),(0,-1),(1,0),(-1,0)]:
G.add_edge(idx, nb, weight=d)
else:
if move_dir == G.nodes[idx]['direction']:
G.add_edge(idx, nb, weight=d)
return G, road_positions
def generate_smooth_grid_trajectory(G, road_positions, TURN_PROBABILITY, sequence_length=12, start_node=None):
"""
Lane-aware walk; if stuck, we later fallback to a looser walk.
"""
import numpy as np
if start_node is None:
nodes = list(G.nodes)
if not nodes:
return np.empty((0, 3))
start_node = np.random.choice(nodes)
traj = [road_positions[start_node]]
current = start_node
prev = None
for _ in range(sequence_length - 1):
if current not in G:
break
nbrs = list(G.neighbors(current))
if prev in nbrs:
nbrs.remove(prev)
if not nbrs:
break
node_data = G.nodes[current]
lane_type = node_data.get("lane_type", "vertical")
if lane_type == "intersection" and prev is not None:
prev_pos = np.array(G.nodes[prev]["pos"])
curr_pos = np.array(node_data["pos"])
incoming_dir = (int(np.sign(curr_pos[0] - prev_pos[0])), int(np.sign(curr_pos[1] - prev_pos[1])))
default_direction = incoming_dir
else:
default_direction = node_data.get("direction", None)
pos = np.array(node_data["pos"])
defnbrs, turnnbrs = [], []
for n in nbrs:
npos = np.array(G.nodes[n]["pos"])
move_dir = (int(np.sign(npos[0] - pos[0])), int(np.sign(npos[1] - pos[1])))
if move_dir == default_direction:
defnbrs.append((n, np.linalg.norm(npos - pos)))
else:
turnnbrs.append((n, np.linalg.norm(npos - pos)))
if lane_type == "intersection":
r = np.random.rand()
if defnbrs and r > TURN_PROBABILITY:
nxt = min(defnbrs, key=lambda x: x[1])[0]
elif turnnbrs and r < TURN_PROBABILITY:
nxt = min(turnnbrs, key=lambda x: x[1])[0]
elif defnbrs:
nxt = min(defnbrs, key=lambda x: x[1])[0]
elif turnnbrs:
nxt = min(turnnbrs, key=lambda x: x[1])[0]
else:
break
else:
if defnbrs:
nxt = min(defnbrs, key=lambda x: x[1])[0]
else:
# lane says no, take any neighbor as a weak fallback
nxt = min(nbrs, key=lambda n: np.linalg.norm(road_positions[n] - road_positions[current]))
traj.append(road_positions[nxt])
prev, current = current, nxt
return np.array(traj)
def _fallback_anywalk(road_positions, step_size, length, start_idx=None):
"""
Undirected, geometry-only walk that steps to any neighbor ~step_size away.
Used only when lane-constrained walk can't achieve the desired skeleton length.
"""
import numpy as np
from scipy.spatial import KDTree
if start_idx is None:
start_idx = np.random.randint(0, len(road_positions))
pos = road_positions[start_idx]
traj = [pos]
tree = KDTree(road_positions)
for _ in range(length - 1):
# neighbors within a small shell around step_size
idxs = tree.query_ball_point(pos, r=step_size*1.1)
candidates = []
for i in idxs:
if np.allclose(road_positions[i], pos):
continue
d = np.linalg.norm(road_positions[i] - pos)
if np.isclose(d, step_size, atol=0.15*max(1.0, step_size)):
candidates.append(i)
if not candidates:
# jitter search radius slightly
idxs = tree.query_ball_point(pos, r=max(1.5*step_size, step_size+0.5))
if not idxs:
break
# pick nearest
i = min(idxs, key=lambda j: np.linalg.norm(road_positions[j]-pos))
else:
i = np.random.choice(candidates)
pos = road_positions[i]
traj.append(pos)
return np.array(traj)
def generate_n_smooth_grid_trajectories(G, road_positions, n, sequence_length=12, TURN_PROBABILITY=.15, max_attempts=2000, step_size=1.0):
"""
Try lane-aware skeletons first; if a sample can't reach `sequence_length`,
build a geometric fallback walk so we never return zero trajectories.
"""
import numpy as np
from scipy.spatial import KDTree
trajs = []
attempts = 0
hard_cap = n * max_attempts
tree = KDTree(road_positions)
min_x, min_y = np.min(road_positions[:, 0]), np.min(road_positions[:, 1])
max_x, max_y = np.max(road_positions[:, 0]), np.max(road_positions[:, 1])
while len(trajs) < n and attempts < hard_cap:
rand = [np.random.uniform(min_x, max_x), np.random.uniform(min_y, max_y), 0]
_, start_idx = tree.query(rand)
t = generate_smooth_grid_trajectory(G, road_positions, TURN_PROBABILITY, sequence_length, start_node=start_idx)
if len(t) < sequence_length:
# try fallback
t = _fallback_anywalk(road_positions, step_size, sequence_length, start_idx=start_idx)
if len(t) >= sequence_length:
trajs.append(t[:sequence_length])
attempts += 1
if len(trajs) < n:
print(f"[warn] only {len(trajs)} / {n} trajectories generated (skeleton).")
return trajs
def generate_pedestrian_trajectory(valid_positions, sequence_length=10, step_size=2.5, angle_std=0.1, start=None):
tree = KDTree(valid_positions)
if start is None:
start = valid_positions[np.random.choice(len(valid_positions))]
traj, ang, cur = [start], np.random.uniform(0, 2*np.pi), start
for _ in range(sequence_length-1):
ang += np.random.normal(0, angle_std)
new_cont = cur + np.array([step_size*np.cos(ang), step_size*np.sin(ang), 0])
_, idx = tree.query(new_cont)
new_pos = valid_positions[idx]
traj.append(new_pos); cur = new_pos
return np.array(traj)
def generate_n_pedestrian_trajectories(valid_positions, n, sequence_length=10, step_size=2.5, angle_std=0.1):
return [generate_pedestrian_trajectory(valid_positions, sequence_length, step_size, angle_std) for _ in range(n)]
def get_trajectory_indices(trajectories, pos_total):
pos_to_idx = {tuple(pos): i for i, pos in enumerate(pos_total)}
out = []
for tr in trajectories:
out.append([pos_to_idx.get(tuple(p), -1) for p in tr])
return out
# ============================ motion/doppler (discrete ref) ============================
def compute_cumulative_time(trajectories, speed_profile):
tr = np.array(trajectories); spd = np.array(speed_profile)
n_s, n_t = tr.shape[0], tr.shape[1]
cum = np.zeros((n_s, n_t))
deltas = np.diff(tr, axis=1)
step_sizes = np.sqrt(np.sum(deltas**2, axis=2))
for s in range(n_s):
cum[s,0] = 0.0
for t in range(n_t-1):
v = spd[s,t]; dt = 0.0 if v == 0 else step_sizes[s,t] / v
cum[s,t+1] = cum[s,t] + dt
return cum
def compute_velocity_directions(trajectories, cum_times):
tr = np.array(trajectories); ct = np.array(cum_times)
n_s, n_t = tr.shape[0], tr.shape[1]
dirs = np.zeros((n_s, n_t, 3)); angs = np.zeros((n_s, n_t)); vmag = np.zeros((n_s, n_t))
deltas = np.diff(tr, axis=1); dt = np.diff(ct, axis=1)
for s in range(n_s):
dx, dy, dz = deltas[s,:,0], deltas[s,:,1], deltas[s,:,2]
dist = np.sqrt(dx**2 + dy**2 + dz**2)
v = dist / np.clip(dt[s], 1e-12, None)
vx, vy, vz = dx/np.clip(dt[s],1e-12,None), dy/np.clip(dt[s],1e-12,None), dz/np.clip(dt[s],1e-12,None)
vm = np.sqrt(vx**2 + vy**2 + vz**2)
dirs[s,:-1,0] = np.where(vm>0, vx/vm, 0); dirs[s,:-1,1] = np.where(vm>0, vy/vm, 0); dirs[s,:-1,2] = np.where(vm>0, vz/vm, 0)
dirs[s,-1] = dirs[s,-2]
vmag[s,:-1] = v; vmag[s,-1] = v[-1]
angs[s,:-1] = np.degrees(np.arctan2(dy, dx)); angs[s,-1] = angs[s,-2]
return dirs, angs, vmag
def compute_speed_profile(traj, road_graph, road_positions, is_vehicle=True, speed_range=(5/3.6, 60/3.6)):
N = len(traj)
v = float(np.random.uniform(*speed_range))
spd = np.full(N, v, float)
same = np.allclose(traj[1:,:2], traj[:-1,:2], atol=1e-9)
spd[1:][same] = 0.0
return spd
def compute_pedestrian_speed_profile(traj, speed_range=(0.5, 2.0), tol=1e-3):
traj = np.asarray(traj, float); N = traj.shape[0]
spd = np.zeros(N, float)
v = float(np.random.uniform(*speed_range))
for i in range(N-1):
spd[i] = 0.0 if np.allclose(traj[i+1], traj[i], atol=tol) else v
spd[-1] = 0.0
return spd
# ============================ angle/phase interpolation ============================
def unwrap_angle_deg(a0, a1):
d = ((a1 - a0 + 180.0) % 360.0) - 180.0
return a0, a0 + d
def interpolate_ray_params(deepmimo_data, idx0, idx1, alpha):
p0 = deepmimo_data['user']['paths'][idx0]
p1 = deepmimo_data['user']['paths'][idx1]
L0, L1 = int(p0['num_paths']), int(p1['num_paths'])
if L0 == 0 or L1 == 0:
return dict(num_paths=0, DoD_theta=np.array([]), DoD_phi=np.array([]),
DoA_theta=np.array([]), DoA_phi=np.array([]),
phase=np.array([]), ToA=np.array([]), power=np.array([]), LoS=np.array([], int))
o0 = np.argsort(-np.asarray(p0['power']).flatten())
o1 = np.argsort(-np.asarray(p1['power']).flatten())
L = min(L0, L1); i0, i1 = o0[:L], o1[:L]
def g(dct, key, sel): return np.asarray(dct[key]).flatten()[sel].astype(float)
pow0, pow1 = g(p0,'power',i0), g(p1,'power',i1)
power = (1-alpha)*pow0 + alpha*pow1
ToA0, ToA1 = g(p0,'ToA',i0), g(p1,'ToA',i1)
ToA = (1-alpha)*ToA0 + alpha*ToA1
def ainterp(key):
a0, a1 = g(p0,key,i0), g(p1,key,i1)
out = np.zeros_like(a0)
for n in range(L):
u0,u1 = unwrap_angle_deg(a0[n], a1[n])
out[n] = (1-alpha)*u0 + alpha*u1
return out
DoD_theta, DoD_phi = ainterp('DoD_theta'), ainterp('DoD_phi')
DoA_theta, DoA_phi = ainterp('DoA_theta'), ainterp('DoA_phi')
ph0, ph1 = np.deg2rad(g(p0,'phase',i0)), np.deg2rad(g(p1,'phase',i1))
dphi = np.angle(np.exp(1j*(ph1-ph0)))
phase = np.rad2deg(ph0 + alpha*dphi)
LoS = (g(p0,'LoS',i0).astype(int) + g(p1,'LoS',i1).astype(int) > 0).astype(int)
return dict(num_paths=L, DoD_theta=DoD_theta, DoD_phi=DoD_phi,
DoA_theta=DoA_theta, DoA_phi=DoA_phi, phase=phase,
ToA=ToA, power=power, LoS=LoS)
# ============================ array/OFDM channel core ============================
def array_response_phase(theta, phi, kd):
gamma_x = 1j * kd * np.sin(theta) * np.cos(phi)
gamma_y = 1j * kd * np.sin(theta) * np.sin(phi)
gamma_z = 1j * kd * np.cos(theta)
return np.vstack([gamma_x, gamma_y, gamma_z]).T
def array_response(ant_ind, theta, phi, kd):
gamma = array_response_phase(theta, phi, kd)
return np.exp(ant_ind @ gamma.T)
def ant_indices(panel_size):
gx = np.tile(np.arange(1), panel_size[0] * panel_size[1])
gy = np.tile(np.repeat(np.arange(panel_size[0]), 1), panel_size[1])
gz = np.repeat(np.arange(panel_size[1]), panel_size[0])
return np.vstack([gx, gy, gz]).T
def apply_FoV(FoV, theta, phi):
theta = np.mod(theta, 2*np.pi); phi = np.mod(phi, 2*np.pi)
FoV = np.deg2rad(FoV)
inc_phi = np.logical_or(phi <= 0 + FoV[0]/2, phi >= 2*np.pi - FoV[0]/2)
inc_the = np.logical_and(theta <= np.pi/2 + FoV[1]/2, theta >= np.pi/2 - FoV[1]/2)
return np.logical_and(inc_phi, inc_the)
def rotate_angles(rotation, theta, phi):
theta = np.deg2rad(theta); phi = np.deg2rad(phi)
if rotation is not None:
R = np.deg2rad(rotation)
sa = np.sin(phi - R[2]); sb = np.sin(R[1]); sg = np.sin(R[0])
ca = np.cos(phi - R[2]); cb = np.cos(R[1]); cg = np.cos(R[0])
st, ct = np.sin(theta), np.cos(theta)
theta = np.arccos(cb*cg*ct + st*(sb*cg*ca - sg*sa))
phi = np.angle(cb*st*ca - sb*ct + 1j*(cb*sg*ct + st*(sb*sg*ca + cg*sa)))
return theta, phi
class OFDM_PathGenerator:
def __init__(self, params, subcarriers):
self.params = params
self.O = params[c.PARAMSET_OFDM]
self.generate = self.no_LPF if self.O[c.PARAMSET_OFDM_LPF] == 0 else self.with_LPF
self.subcarriers = subcarriers
self.total_sc = self.O[c.PARAMSET_OFDM_SC_NUM]
self.delay_d = np.arange(self.O['subcarriers'])
self.delay_to_OFDM = np.exp(-1j*2*np.pi/self.total_sc * np.outer(self.delay_d, self.subcarriers))
def _doppler_phase(self, raydata):
if not (self.params[c.PARAMSET_DOPPLER_EN] and self.params[c.PARAMSET_SCENARIO_PARAMS][c.PARAMSET_SCENARIO_PARAMS_DOPPLER_EN]):
return None
fc = self.params[c.PARAMSET_SCENARIO_PARAMS][c.PARAMSET_SCENARIO_PARAMS_CF]
v = np.asarray(raydata.get(c.OUT_PATH_DOP_VEL, 0.0)).reshape(-1,1)
t = np.asarray(raydata.get('elapsed_time', 0.0)).reshape(-1,1)
return np.exp(-1j*2*np.pi*(fc/c.LIGHTSPEED)*(v*t))
def no_LPF(self, raydata, Ts):
power = raydata[c.OUT_PATH_RX_POW].reshape(-1,1)
delay_n = (raydata[c.OUT_PATH_TOA] / Ts).reshape(-1,1)
phase = raydata[c.OUT_PATH_PHASE].reshape(-1,1)
over = (delay_n >= self.O['subcarriers'])
power[over] = 0; delay_n[over] = self.O['subcarriers']
path_const = np.sqrt(power/self.total_sc) * np.exp(1j*(np.deg2rad(phase) - (2*np.pi/self.total_sc)*np.outer(delay_n, self.subcarriers)))
DP = self._doppler_phase(raydata)
if DP is not None: path_const *= DP
return path_const
def with_LPF(self, raydata, Ts):
power = raydata[c.OUT_PATH_RX_POW].reshape(-1,1)
delay_n = (raydata[c.OUT_PATH_TOA] / Ts).reshape(-1,1)
phase = raydata[c.OUT_PATH_PHASE].reshape(-1,1)
over = (delay_n >= self.O['subcarriers'])
power[over] = 0; delay_n[over] = self.O['subcarriers']
pulse = np.sinc(self.delay_d - delay_n) * np.sqrt(power/self.total_sc) * np.exp(1j*np.deg2rad(phase))
DP = self._doppler_phase(raydata)
if DP is not None: pulse *= DP
return pulse
def generate_MIMO_channel(raydata, params, tx_ant_params, rx_ant_params):
bw = params[c.PARAMSET_OFDM][c.PARAMSET_OFDM_BW] * c.PARAMSET_OFDM_BW_MULT
kd_tx = 2*np.pi*tx_ant_params[c.PARAMSET_ANT_SPACING]
kd_rx = 2*np.pi*rx_ant_params[c.PARAMSET_ANT_SPACING]
Ts = 1 / bw
subc = params[c.PARAMSET_OFDM][c.PARAMSET_OFDM_SC_SAMP]
pg = OFDM_PathGenerator(params, subc)
M_tx = int(np.prod(tx_ant_params[c.PARAMSET_ANT_SHAPE])); ind_tx = ant_indices(tx_ant_params[c.PARAMSET_ANT_SHAPE])
M_rx = int(np.prod(rx_ant_params[c.PARAMSET_ANT_SHAPE])); ind_rx = ant_indices(rx_ant_params[c.PARAMSET_ANT_SHAPE])
ch = np.zeros((len(raydata), M_rx, M_tx, len(subc)), dtype=np.csingle)
los = np.zeros((len(raydata)), dtype=np.int8) - 2
for i in range(len(raydata)):
if raydata[i][c.OUT_PATH_NUM] == 0:
los[i] = -1; continue
dod_t, dod_p = rotate_angles(tx_ant_params[c.PARAMSET_ANT_ROTATION], raydata[i][c.OUT_PATH_DOD_THETA], raydata[i][c.OUT_PATH_DOD_PHI])
doa_t, doa_p = rotate_angles(rx_ant_params[c.PARAMSET_ANT_ROTATION], raydata[i][c.OUT_PATH_DOA_THETA], raydata[i][c.OUT_PATH_DOA_PHI])
f_tx = apply_FoV(tx_ant_params[c.PARAMSET_ANT_FOV], dod_t, dod_p)
f_rx = apply_FoV(rx_ant_params[c.PARAMSET_ANT_FOV], doa_t, doa_p)
f = np.logical_and(f_tx, f_rx)
dod_t, dod_p, doa_t, doa_p = dod_t[f], dod_p[f], doa_t[f], doa_p[f]
for k in list(raydata[i].keys()):
if k == c.OUT_PATH_NUM: raydata[i][k] = f.sum()
elif isinstance(raydata[i][k], np.ndarray) and raydata[i][k].shape[0] == f.shape[0]:
raydata[i][k] = raydata[i][k][f]
if raydata[i][c.OUT_PATH_NUM] == 0:
los[i] = -1; continue
else:
los[i] = int(np.sum(raydata[i].get('LoS', np.zeros(int(raydata[i][c.OUT_PATH_NUM])))))
aTX = array_response(ind_tx, dod_t, dod_p, kd_tx)
aRX = array_response(ind_rx, doa_t, doa_p, kd_rx)
path_const = pg.generate(raydata[i], Ts) # (L,1) complex per-subcarrier factors
if params[c.PARAMSET_OFDM][c.PARAMSET_OFDM_LPF] == 0:
ch[i] = np.sum(aRX[:,None,None,:] * aTX[None,:,None,:] * path_const.T[None,None,:,:], axis=3)
else:
ch[i] = (np.sum(aRX[:,None,None,:] * aTX[None,:,None,:] * path_const.T[None,None,:,:], axis=3)) @ pg.delay_to_OFDM
return ch, los
def generate_channel_from_interpolated_ray(ray_interp, num_antennas_tx_hor, num_antennas_tx_vert, num_subcarriers, fc):
raydata = np.array([{
c.OUT_PATH_NUM: int(ray_interp['num_paths']),
c.OUT_PATH_DOD_THETA: np.asarray(ray_interp['DoD_theta']),
c.OUT_PATH_DOD_PHI: np.asarray(ray_interp['DoD_phi']),
c.OUT_PATH_DOA_THETA: np.asarray(ray_interp['DoA_theta']),
c.OUT_PATH_DOA_PHI: np.asarray(ray_interp['DoA_phi']),
c.OUT_PATH_PHASE: np.asarray(ray_interp['phase']),
c.OUT_PATH_TOA: np.asarray(ray_interp['ToA']),
c.OUT_PATH_RX_POW: np.asarray(ray_interp['power']),
'LoS': np.asarray(ray_interp.get('LoS', np.zeros(int(ray_interp['num_paths'])))),
c.OUT_PATH_DOP_VEL: np.asarray(ray_interp.get('Doppler_vel', np.zeros(int(ray_interp['num_paths'])))),
'elapsed_time': np.asarray(ray_interp.get('elapsed_time', np.zeros(int(ray_interp['num_paths'])))),
}], dtype=object)
params = {
c.PARAMSET_OFDM: {
c.PARAMSET_OFDM_BW: 1.92e-3,
c.PARAMSET_OFDM_SC_SAMP: np.arange(num_subcarriers),
c.PARAMSET_OFDM_SC_NUM: num_subcarriers,
c.PARAMSET_OFDM_LPF: 0,
'subcarriers': num_subcarriers
},
c.PARAMSET_FDTD: True,
c.PARAMSET_DOPPLER_EN: True,
c.PARAMSET_SCENARIO_PARAMS: { c.PARAMSET_SCENARIO_PARAMS_DOPPLER_EN: True, c.PARAMSET_SCENARIO_PARAMS_CF: float(fc) }
}
tx = { c.PARAMSET_ANT_SHAPE: np.array([num_antennas_tx_hor, num_antennas_tx_vert, 1]),
c.PARAMSET_ANT_SPACING: 0.5, c.PARAMSET_ANT_ROTATION: np.array([0,0,-135]),
c.PARAMSET_ANT_FOV: [360,180], c.PARAMSET_ANT_RAD_PAT: 'isotropic' }
rx = { c.PARAMSET_ANT_SHAPE: np.array([1,1,1]),
c.PARAMSET_ANT_SPACING: 0.5, c.PARAMSET_ANT_ROTATION: np.array([0,0,0]),
c.PARAMSET_ANT_FOV: [360,180], c.PARAMSET_ANT_RAD_PAT: 'isotropic' }
ch, los = generate_MIMO_channel(raydata, params, tx, rx)
return ch, None, los
# ============================ MAIN ============================
def dynamic_scenario_gen(
scenario,
time_steps=20,
num_antennas_rx=1,
num_antennas_tx_hor=32,
num_antennas_tx_vert=1,
num_subcarriers=32,
step_size=1.0,
road_center_spacing=25,
road_width=6,
turn_probability=0.1,
n_car=50,
n_ped=10,
max_attempts=300,
angle_std=0.1,
fc=3.5e9,
save_filename="trajectory_doppler_data.csv",
plot_background=True,
auto_step_size=True,
sample_dt=1e-3,
car_speed_range=(5/3.6, 60/3.6),
ped_speed_range=(0.5, 2.0),
continuous_mode=True,
cont_len=None
):
# 1) load DeepMIMO
os.makedirs("data", exist_ok=True)
fname = f"{scenario}_{num_antennas_tx_hor}_{num_antennas_tx_vert}_{num_subcarriers}.p"
fpath = os.path.join("data", fname)
if not os.path.exists(fpath):
print(f"Generating DeepMIMO data for scenario {scenario}...")
deepmimo_data = DeepMIMO_data_gen(scenario, num_antennas_tx_hor, num_antennas_tx_vert, num_subcarriers, 1)[0]
# Save the generated data for future use
with open(fpath, 'wb') as f:
pickle.dump(deepmimo_data, f)
print(f"DeepMIMO data saved to {fpath}")
else:
print(f"Loading cached DeepMIMO data from {fpath}")
try:
with open(fpath, 'rb') as f:
deepmimo_data = pickle.load(f)
except (pickle.UnpicklingError, EOFError, ValueError) as e:
print(f"Error loading cached data: {e}. Regenerating...")
deepmimo_data = DeepMIMO_data_gen(scenario, num_antennas_tx_hor, num_antennas_tx_vert, num_subcarriers, 1)[0]
# Save the regenerated data
with open(fpath, 'wb') as f:
pickle.dump(deepmimo_data, f)
print(f"DeepMIMO data regenerated and saved to {fpath}")
path_exist = np.array(deepmimo_data['user']['LoS'])
pos_total = np.array(deepmimo_data['user']['location'])
# background (optional)
if plot_background:
x,y = pos_total[:,0], pos_total[:,1]
cols = {0:'orange', -1:'white', 1:'gray'}
plt.figure(figsize=(8,8), dpi=160)
for v,col in cols.items():
m = path_exist==v; plt.scatter(x[m], y[m], s=3, c=col, alpha=0.6, label=f"LoS={v}")
plt.legend(); plt.grid(True); os.makedirs("figs", exist_ok=True)
plt.savefig("figs/environment.png"); plt.close()
inferred_step = infer_grid_step(pos_total)
STEP = float(inferred_step if auto_step_size else step_size)
print(f"[dynamic_scenario_gen] road step size used: {STEP:.6f} m (inferred={inferred_step:.6f} m)")
# 2) roads & discrete trajectories
valid_positions = pos_total[(path_exist == 0) | (path_exist == 1)]
road_positions, lanes = filter_road_positions(valid_positions, road_width, road_center_spacing)
road_graph, road_positions = create_grid_road_network(road_positions, lanes, STEP)
print("Total Nodes in Graph:", len(road_graph.nodes()))
vehicle_trajs = generate_n_smooth_grid_trajectories(
road_graph, road_positions, n_car, sequence_length=time_steps,
TURN_PROBABILITY=turn_probability, max_attempts=max_attempts
)
ped_trajs = generate_n_pedestrian_trajectories(
valid_positions, n_ped, sequence_length=time_steps,
step_size=STEP, angle_std=angle_std
)
veh_idx = get_trajectory_indices(vehicle_trajs, pos_total)
ped_idx = get_trajectory_indices(ped_trajs, pos_total)
# 3) discrete channels (reference; not used for continuous outputs)
Mtx = num_antennas_tx_hor * num_antennas_tx_vert
channel_discrete = np.zeros((n_car+n_ped, time_steps, Mtx, num_subcarriers), np.complex128)
# 4) build continuous tracks by speed*dt
if cont_len is None:
cont_len = time_steps
def build_tracks(trajs, idxs, speed_rng, N, dt):
tracks = []
for traj_pos, idl in zip(trajs, idxs):
v = float(np.random.uniform(*speed_rng))
pos_c, pairs, alpha, vdir = sample_continuous_along_polyline(traj_pos, idl, v, dt, N)
tracks.append(dict(speed=v, pos=pos_c, pairs=pairs, alpha=alpha, vdir=vdir))
return tracks
car_tracks = build_tracks(vehicle_trajs, veh_idx, car_speed_range, cont_len, sample_dt)
ped_tracks = build_tracks(ped_trajs, ped_idx, ped_speed_range, cont_len, sample_dt)
# 5) turn interpolated ray params into channels
def channels_for_tracks(tracks):
ch_all, pos_all, v_all, a_all, dop_all, ang_all, del_all, step_xy_all = [], [], [], [], [], [], [], []
for tr in tracks:
v = tr["speed"]; pos = tr["pos"]; pairs = tr["pairs"]; alpha = tr["alpha"]; vdir = tr["vdir"]
ch_list, dop_list, ang_list, del_list = [], [], [], []
vel_series = np.full(len(alpha), v, float)
acc_series = np.zeros_like(vel_series)
# per-sample XY step (for your sanity check)
step_xy = np.zeros(len(alpha), float)
step_xy[1:] = np.linalg.norm(pos[1:,:2] - pos[:-1,:2], axis=1)
for k in range(len(alpha)):
i0,i1 = pairs[k]; a = float(alpha[k]); vd = vdir[k]
ir = interpolate_ray_params(deepmimo_data, i0, i1, a)
if ir['num_paths'] == 0:
ch_list.append(np.zeros((Mtx, num_subcarriers), np.complex128))
dop_list.append(np.zeros((0,), np.float32))
ang_list.append(np.zeros((0,), np.float32))
del_list.append(np.zeros((0,), np.float32))
continue
# Doppler projection: v toward BS (AoA) gives positive |f_d|
doa_phi = np.deg2rad(ir['DoA_phi'])
aoa_unit = np.stack([np.cos(doa_phi), np.sin(doa_phi)], axis=1)
v_proj = -np.sum(aoa_unit * vd[None,:], axis=1) * v # (L,)
ir['Doppler_vel'] = v_proj.astype(np.float32)
ir['elapsed_time'] = np.ones_like(ir['power'], dtype=np.float32) * (k * sample_dt)
pred, _, _ = generate_channel_from_interpolated_ray(ir, num_antennas_tx_hor, num_antennas_tx_vert, num_subcarriers, fc)
ch_list.append(np.asarray(pred[0]).squeeze(0))
dop_list.append(v_proj.astype(np.float32))
ang_list.append(ir['DoA_phi'].astype(np.float32))
del_list.append(ir['ToA'].astype(np.float32))
ch_all.append(np.stack(ch_list, axis=0))
pos_all.append(pos.astype(np.float32))
v_all.append(vel_series.astype(np.float32))
a_all.append(acc_series.astype(np.float32))
dop_all.append(dop_list); ang_all.append(ang_list); del_all.append(del_list)
step_xy_all.append(step_xy.astype(np.float32))
return (np.stack(ch_all, axis=0),
np.stack(pos_all, axis=0),
np.stack(v_all, axis=0),
np.stack(a_all, axis=0),
dop_all, ang_all, del_all,
np.stack(step_xy_all, axis=0))
car_ch, car_pos, car_vel, car_acc, car_dop, car_ang, car_del, car_step = channels_for_tracks(car_tracks)
ped_ch, ped_pos, ped_vel, ped_acc, ped_dop, ped_ang, ped_del, ped_step = channels_for_tracks(ped_tracks)
channel_cont = np.concatenate([car_ch, ped_ch], axis=0)
pos_cont = np.concatenate([car_pos, ped_pos], axis=0)
vel_cont = np.concatenate([car_vel, ped_vel], axis=0)
acc_cont = np.concatenate([car_acc, ped_acc], axis=0)
step_cont = np.concatenate([car_step, ped_step], axis=0)
doppler_cont = car_dop + ped_dop
angle_cont = car_ang + ped_ang
delay_cont = car_del + ped_del
# 6) return: when continuous_mode=True, expose the continuous arrays under the
# canonical keys ("channel", "pos", "vel"...), so your script works unchanged.
out = {
"scenario": scenario,
"index_discrete": veh_idx + ped_idx,
"grid_step": STEP,
"sample_dt": sample_dt,
"car_speed_range": car_speed_range,
"ped_speed_range": ped_speed_range,
"los": path_exist,
# continuous (interpolated)
"channel_cont": channel_cont,
"pos_cont": pos_cont,
"vel_cont": vel_cont,
"acc_cont": acc_cont,
"doppler_vel_cont": doppler_cont,
"angle_cont": angle_cont,
"delay_cont": delay_cont,
"pos_step_xy": step_cont, # NEW: per-sample XY distance (should be ~ speed*dt)
# discrete references
"channel_discrete": channel_discrete,
"pos_discrete": vehicle_trajs + ped_trajs
}
if continuous_mode:
out["channel"] = channel_cont
out["pos"] = pos_cont
out["vel"] = vel_cont
out["acc"] = acc_cont
out["doppler_vel"] = doppler_cont
out["angle"] = angle_cont
out["delay"] = delay_cont
else:
out["channel"] = channel_discrete
# (no single array for discrete positions because they’re ragged lists)
return out
# -------------------- convenience (unchanged) --------------------
def channel_dim_generator(seed=42):
import pandas as pd
np.random.seed(seed)
n_scenarios = 2000; max_product = 2**16
dims = []
while len(dims) < n_scenarios:
a1 = np.random.randint(0,16); time_steps = 2**4 - a1
a2 = np.random.randint(0,6); num_ant = 2**(7-a2)
a3 = np.random.randint(0,6); num_sc = 2**(9-a3)
prod = time_steps * num_ant * num_sc
if prod <= max_product:
h,v = max(1,int(round(np.sqrt(num_ant)))), 1
# better factor finder:
for k in range(1, int(np.sqrt(num_ant))+1):
if num_ant % k == 0: h,v = num_ant//k, k
dims.append([time_steps, h, v, num_sc, prod])
arr = np.array(dims); np.random.shuffle(arr)
df = pd.DataFrame(arr, columns=['time_steps','num_antennas_tx_hor','num_antennas_tx_vert','num_subcarriers','product'])
df.to_csv('channel_dimensions.csv', index=False)
print(f"Generated {len(df)} sets.")
return df
def get_channel_dimensions(df, i):
if i < 0 or i >= len(df):
return f"Error: Index {i} is out of range. Valid range is 0 to {len(df)-1}"
r = df.iloc[i, :4].tolist()
return dict(time_steps=int(r[0]), num_antennas_tx_hor=int(r[1]), num_antennas_tx_vert=int(r[2]), num_subcarriers=int(r[3]))
import numpy as np
import matplotlib.pyplot as plt
def make_channel_gif(
scenario_data,
sample_idx=0,
out_gif="channel_evolution.gif",
mode="heatmap", # "heatmap" or "angle-delay"
tx_shape=None, # (num_antennas_tx_hor, num_antennas_tx_vert) if mode="angle-delay"
fps=12,
clim=None, # (vmin, vmax) for dB; if None uses global percentiles
downsample_every=1, # e.g., 2 to take every other frame
annotate=True,
figsize=(6,4),
cmap="gray_r"
):
"""
Create a GIF visualizing channel evolution for a single trajectory.
scenario_data["channel"] can be (N, T, M_tx, N_sc) or (N, T, M_rx, M_tx, N_sc).
Optional: scenario_data["pos"] (N, T, 3), scenario_data["vel"] (N, T), scenario_data["sample_dt"].
"""
try:
import imageio.v2 as imageio
except Exception:
raise ImportError("imageio is required. Install with: pip install imageio")
ch = scenario_data["channel"]
if ch.ndim == 5:
# average over RX if present → (N, T, M_tx, N_sc)
ch = ch.mean(axis=2)
ch = ch[sample_idx] # (T, M_tx, N_sc)
T, Mtx, Nsc = ch.shape
# Optional meta
pos = scenario_data.get("pos", None)
vel = scenario_data.get("vel", None)
dt = scenario_data.get("sample_dt", None)
pos_sample = pos[sample_idx] if isinstance(pos, np.ndarray) else None
vel_sample = vel[sample_idx] if isinstance(vel, np.ndarray) else None
# magnitude (dB) for heatmap scaling
eps = 1e-12
mag_db_all = 20.0 * np.log10(np.maximum(np.abs(ch), eps))
if clim is None:
vmin = float(np.percentile(mag_db_all, 5))
vmax = float(np.percentile(mag_db_all, 95))
else:
vmin, vmax = clim
frames = []
time_indices = list(range(0, T, max(1, int(downsample_every))))
for t in time_indices:
fig, ax = plt.subplots(figsize=figsize, dpi=140)
if mode == "heatmap":
mag_db = mag_db_all[t] # (M_tx, N_sc)
im = ax.imshow(mag_db, aspect="auto", origin="lower",
vmin=vmin, vmax=vmax, cmap=cmap)
ax.set_xlabel("Subcarrier index")
ax.set_ylabel("Tx element")
title = f"|H| (dB) — t={t}"
if dt is not None:
title += f" ({t*dt:.4f} s)"
if annotate and vel_sample is not None:
title += f", v={vel_sample[t]:.2f} m/s"
ax.set_title(title)
cbar = fig.colorbar(im, ax=ax, fraction=0.046, pad=0.04)
cbar.set_label("dB")
elif mode == "angle-delay":
if tx_shape is None or np.prod(tx_shape) != Mtx:
plt.close(fig)
raise ValueError(
'mode="angle-delay" needs tx_shape=(num_antennas_tx_hor, num_antennas_tx_vert) '
"matching M_tx."
)
Ht = ch[t].reshape(tx_shape[0], tx_shape[1], Nsc) # (H, V, Nsc)
Hh = Ht.mean(axis=1) # collapse vertical → (H, Nsc)
# delay transform (IFFT over subcarriers)
Hd = np.fft.ifftshift(np.fft.ifft(np.fft.fftshift(Hh, axes=1), axis=1), axes=1) # (H, Nsc)
# angle transform (FFT over horizontal)
Ha = np.fft.fftshift(np.fft.fft(Hd, axis=0), axes=0) # (H, Nsc)
PdB = 20.0 * np.log10(np.maximum(np.abs(Ha), eps))
im = ax.imshow(PdB, aspect="auto", origin="lower", cmap=cmap)
ax.set_xlabel("Delay bin")
ax.set_ylabel("Angle bin")
title = f"Angle–Delay power (dB) — t={t}"
if dt is not None:
title += f" ({t*dt:.4f} s)"
if annotate and vel_sample is not None:
title += f", v={vel_sample[t]:.2f} m/s"
ax.set_title(title)
cbar = fig.colorbar(im, ax=ax, fraction=0.046, pad=0.04)
cbar.set_label("dB")
else:
plt.close(fig)
raise ValueError('mode must be "heatmap" or "angle-delay"')
if annotate and pos_sample is not None:
x, y = pos_sample[t, 0], pos_sample[t, 1]
ax.text(
0.01, 0.99,
f"pos=({x:.2f}, {y:.2f})",
transform=ax.transAxes, ha="left", va="top",
fontsize=8, color="w",
bbox=dict(facecolor="k", alpha=0.35, pad=2, lw=0)
)
# --- robust frame extraction across backends ---
fig.canvas.draw()
try:
# preferred: fast path
w, h = fig.canvas.get_width_height()
buf = np.frombuffer(fig.canvas.buffer_rgba(), dtype=np.uint8)
frame = buf.reshape(h, w, 4)[..., :3] # drop alpha
except Exception:
# fallback: render to PNG in-memory then read
import io
try:
import imageio.v2 as imageio
except Exception:
raise ImportError("imageio is required. Install with: pip install imageio")
bio = io.BytesIO()
fig.savefig(bio, format="png", bbox_inches="tight", pad_inches=0)
bio.seek(0)
frame = imageio.imread(bio)
bio.close()
frames.append(frame)
plt.close(fig)
imageio.mimsave(out_gif, frames, fps=fps)
return out_gif
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation, PillowWriter
def _angle_delay_transform(H_seq, tx_shape):
"""
H_seq: (T, M_tx, N_sc) complex channel for one UE
tx_shape: (M_h, M_v), M_h * M_v == M_tx
Returns: Had (T, M_h, M_v, N_sc) in angle–delay domain
- Angle: 2D FFT over (M_h, M_v) with fftshift
- Delay: IFFT over subcarriers
"""
T, Mtx, Nsc = H_seq.shape
Mh, Mv = tx_shape
if Mh * Mv != Mtx:
raise ValueError(f"tx_shape {tx_shape} must multiply to {Mtx}")
H4 = H_seq.reshape(T, Mh, Mv, Nsc)
# Angle domain (AoD) via 2D FFT across antenna plane
Ha = np.fft.fftshift(np.fft.fftn(H4, axes=(1, 2)), axes=(1, 2))
# Delay domain via IFFT across subcarriers
Had = np.fft.ifft(Ha, axis=-1)
return Had
def _pick_topk_bins(Had, k):
"""
Had: (T, Mh, Mv, Nsc)
Select top-k bins by mean |Had| over time.
Returns: list of (ih, iv, idelay) sorted by descending mean magnitude.
"""
mag = np.abs(Had) # (T, Mh, Mv, Nsc)
mean_mag = mag.mean(axis=0) # (Mh, Mv, Nsc)
flat = mean_mag.reshape(-1)
k = int(min(k, flat.size))
idx = np.argpartition(flat, -k)[-k:]
idx = idx[np.argsort(-flat[idx])] # sort descending
bins = [np.unravel_index(i, mean_mag.shape) for i in idx]
return bins
def make_topk_angle_delay_curves_gif(
scenario_data,
sample_idx=0,
tx_shape=(32, 1),
k=5,
out_gif="topk_bins_evolution.gif",
fps=12,
unwrap_phase=True,
downsample_every=1
):
"""
Visualize the evolution of magnitude & phase for the top-k angle–delay bins (by time-avg magnitude).
Produces a GIF with two subplots (magnitude/phase) where each bin’s curve reveals over time.
Args:
scenario_data: dict returned by dynamic_scenario_gen (must contain "channel" and optionally "sample_dt")
sample_idx: which UE trajectory to visualize
tx_shape: (M_h, M_v) so that M_h * M_v == M_tx
k: number of bins to track
out_gif: output filename (GIF)
fps: frames per second for the animation
unwrap_phase: if True, show unwrapped phase (continuous); else raw angle in [-pi, pi]
downsample_every: plot every Nth time sample to shorten very long sequences
Returns:
out_gif (str): path to the saved GIF
"""
# 1) pull the sequence and transform
H = scenario_data["channel"][sample_idx] # (T, M_tx, N_sc)
T = H.shape[0]
dt = float(scenario_data.get("sample_dt", 1.0))
t_axis = np.arange(T) * dt
Had = _angle_delay_transform(H, tx_shape) # (T, Mh, Mv, N_sc)
bins = _pick_topk_bins(Had, k)
# 2) build time series for chosen bins
mags, phases = [], []
for (ih, iv, idl) in bins:
ts = Had[:, ih, iv, idl] # (T,)
mags.append(np.abs(ts))
ph = np.angle(ts)
if unwrap_phase:
ph = np.unwrap(ph)
phases.append(ph)
mags = np.stack(mags, axis=0) # (k, T)
phases = np.stack(phases, axis=0) # (k, T)
# 3) optional time downsampling
if downsample_every > 1:
mags = mags[:, ::downsample_every]
phases = phases[:, ::downsample_every]
t_axis = t_axis[::downsample_every]
Tds = t_axis.size
# 4) set up animation
fig, axes = plt.subplots(1, 2, figsize=(10, 4), dpi=120)
ax_mag, ax_ph = axes
# Pre-plot empty lines (one per bin), let default colors be used
mag_lines = []
ph_lines = []
for i in range(len(bins)):
(ml,) = ax_mag.plot([], [], lw=1.5, label=f"bin {i}: (ah={bins[i][0]}, av={bins[i][1]}, τ={bins[i][2]})")
(pl,) = ax_ph.plot([], [], lw=1.5, label=f"bin {i}")
mag_lines.append(ml)
ph_lines.append(pl)
ax_mag.set_title("Top-k angle–delay bins: magnitude vs time")
ax_mag.set_xlabel("time (s)")
ax_mag.set_ylabel("|H|")
ax_mag.grid(True)
ax_mag.legend(loc="best", fontsize=8)
ax_ph.set_title("Top-k angle–delay bins: phase vs time")
ax_ph.set_xlabel("time (s)")
ax_ph.set_ylabel("phase (rad)" if unwrap_phase else "phase (wrapped)")
ax_ph.grid(True)
# Fix y-lims for stable animation
mag_max = float(np.max(mags)) if mags.size else 1.0
ax_mag.set_ylim(0, 1.05 * mag_max)
# Phase range: use global min/max
if phases.size:
ph_min, ph_max = float(np.min(phases)), float(np.max(phases))
if ph_max - ph_min < 1e-6:
ph_min -= 1.0
ph_max += 1.0
ax_ph.set_ylim(ph_min - 0.05 * abs(ph_min), ph_max + 0.05 * abs(ph_max))
ax_mag.set_xlim(t_axis[0], t_axis[-1])
ax_ph.set_xlim(t_axis[0], t_axis[-1])
# init & update
def _init():
for ln in mag_lines + ph_lines:
ln.set_data([], [])
return mag_lines + ph_lines
def _update(frame):
# reveal up to current frame
for i in range(len(bins)):
mag_lines[i].set_data(t_axis[:frame+1], mags[i, :frame+1])
ph_lines[i].set_data(t_axis[:frame+1], phases[i, :frame+1])
fig.suptitle(f"Sample {sample_idx} — frame {frame+1}/{Tds}", fontsize=11)
return mag_lines + ph_lines
ani = FuncAnimation(
fig, _update, frames=Tds, init_func=_init,
interval=1000.0 / fps, blit=False, repeat=False
)
# 5) save GIF using Pillow writer (keeps backend-agnostic; avoids tostring_rgb issues)
writer = PillowWriter(fps=fps)
ani.save(out_gif, writer=writer)
plt.close(fig)
return out_gif