# data_utils.py # -*- coding: utf-8 -*- from __future__ import annotations import os, time, math, pickle import numpy as np import matplotlib.pyplot as plt import networkx as nx from scipy.spatial import KDTree from scipy.interpolate import griddata from matplotlib.colors import LinearSegmentedColormap, Normalize import torch from input_preprocess import DeepMIMO_data_gen # -------------------- DeepMIMO consts (robust defaults) -------------------- try: import DeepMIMOv3.consts as c except Exception: class _C: pass c = _C() if not hasattr(c, 'PARAMSET_OFDM'): c.PARAMSET_OFDM = 'OFDM' c.PARAMSET_OFDM_BW = 'bandwidth' c.PARAMSET_OFDM_BW_MULT = 1e9 c.PARAMSET_OFDM_SC_SAMP = 'selected_subcarriers' c.PARAMSET_OFDM_SC_NUM = 'subcarriers' c.PARAMSET_OFDM_LPF = 'LPF' c.PARAMSET_FDTD = 'FDTD' c.PARAMSET_ANT_SHAPE = 'shape' c.PARAMSET_ANT_SPACING = 'spacing' c.PARAMSET_ANT_ROTATION = 'rotation' c.PARAMSET_ANT_FOV = 'FoV' c.PARAMSET_ANT_RAD_PAT = 'radiation_pattern' c.OUT_PATH_NUM = 'num_paths' c.OUT_PATH_DOD_THETA = 'DoD_theta' c.OUT_PATH_DOD_PHI = 'DoD_phi' c.OUT_PATH_DOA_THETA = 'DoA_theta' c.OUT_PATH_DOA_PHI = 'DoA_phi' c.OUT_PATH_PHASE = 'phase' c.OUT_PATH_TOA = 'ToA' c.OUT_PATH_RX_POW = 'power' c.OUT_PATH_DOP_VEL = 'Doppler_vel' c.OUT_PATH_DOP_ACC = 'Doppler_acc' c.PARAMSET_DOPPLER_EN = 'Doppler' c.PARAMSET_SCENARIO_PARAMS = 'scenario_params' c.PARAMSET_SCENARIO_PARAMS_DOPPLER_EN = 'Doppler_enabled' c.PARAMSET_SCENARIO_PARAMS_CF = 'carrier_freq' c.LIGHTSPEED = 3e8 # ============================ helpers: grid & sampling ============================ def infer_grid_step(pos_total: np.ndarray) -> float: xy = np.unique(pos_total[:, :2], axis=0) if len(xy) < 2: return 1.0 if len(xy) > 20000: xy = xy[np.random.choice(len(xy), 20000, replace=False)] # For regular grids, find the minimum non-zero distance tree = KDTree(xy) dists, _ = tree.query(xy, k=2) nn = dists[:, 1] nn = nn[nn > 0] if len(nn) == 0: return 1.0 # Use the minimum distance as the grid step for regular grids 
def infer_grid_step(pos_total: np.ndarray) -> float:
    """Estimate the spacing of the user grid from its (x, y) coordinates.

    The nearest-neighbour distance of the unique (x, y) points is computed.
    When at least 10% of those distances lie within 10% of the smallest one,
    the layout is treated as a regular grid and the smallest distance is
    returned.  Otherwise the median of the 5th-30th percentile band of the
    distances is returned (never below 1e-3).

    Args:
        pos_total: array whose first two columns are x and y.

    Returns:
        The inferred step, or 1.0 when fewer than two distinct points exist.
    """
    xy = np.unique(np.asarray(pos_total)[:, :2], axis=0)
    if len(xy) < 2:
        return 1.0

    # The tree must contain EVERY point.  Building it on a random subsample
    # removes true neighbours and inflates the distances, so only the set of
    # query points is subsampled to bound the cost.
    tree = KDTree(xy)
    if len(xy) > 20000:
        probes = xy[np.random.choice(len(xy), 20000, replace=False)]
    else:
        probes = xy
    dists, _ = tree.query(probes, k=2)  # column 0 is the point itself
    nn = dists[:, 1]
    nn = nn[nn > 0]
    if len(nn) == 0:
        return 1.0

    min_dist = float(np.min(nn))

    # Debug statistics
    print(f"[DEBUG] Grid spacing inference:")
    print(f" Min distance: {min_dist:.6f} m")
    print(f" Max distance: {float(np.max(nn)):.6f} m")
    print(f" Mean distance: {float(np.mean(nn)):.6f} m")
    print(f" Median distance: {float(np.median(nn)):.6f} m")

    # Count the points whose nearest neighbour sits at (roughly) min_dist.
    tolerance = min_dist * 0.1  # 10% tolerance
    count_at_min = np.sum(np.abs(nn - min_dist) < tolerance)
    percentage = count_at_min / len(nn) * 100
    print(f" Points at min distance: {count_at_min}/{len(nn)} ({percentage:.1f}%)")

    # At least 10% of the points at the minimum distance => regular grid.
    if count_at_min >= 0.1 * len(nn):
        print(f" Using min distance as grid step: {min_dist:.6f} m")
        return min_dist

    # Irregular layout: robust low-percentile median.
    lo, hi = np.percentile(nn, 5), np.percentile(nn, 30)
    mid = nn[(nn >= lo) & (nn <= hi)]
    step = float(np.median(mid) if len(mid) else np.median(nn))
    print(f" Using fallback method: {step:.6f} m")
    return max(step, 1e-3)


def sample_continuous_along_polyline(traj_pos, idxs, speed, dt, N):
    """Sample N points along a polyline, advancing speed*dt per sample.

    Arc length is measured in the XY plane.  Samples that would run past the
    end of the polyline are held just before its last vertex.

    Args:
        traj_pos: (V, 3) polyline vertices.
        idxs: one identifier per vertex, reported back in ``pairs``.
        speed: travel speed.
        dt: time between consecutive samples.
        N: number of samples.

    Returns:
        Tuple ``(positions, pairs, alphas, vdirs)``: positions is (N, 3)
        float32; pairs is a list of ``(idx_start, idx_end)`` for the segment
        each sample lies on; alphas is the fraction along that segment in
        [0, 1); vdirs is the (N, 2) XY unit direction of the segment (zero
        for zero-length segments).
    """
    verts = np.asarray(traj_pos, float)

    # Fewer than two vertices: nothing to walk along, stay on the only point.
    if verts.shape[0] < 2:
        held = np.repeat(verts[:1], N, axis=0)
        return (held.astype(np.float32), [(idxs[0], idxs[0])] * N,
                np.zeros(N, np.float32), np.zeros((N, 2), np.float32))

    tiny = 1e-12
    deltas = verts[1:] - verts[:-1]                     # (S, 3)
    lengths = np.linalg.norm(deltas[:, :2], axis=1)     # (S,) XY lengths
    n_seg = len(lengths)

    # Arc length at every vertex, starting at 0.
    arc = np.zeros(n_seg + 1, float)
    arc[1:] = np.cumsum(lengths)

    # Distance travelled at each sample, clamped just short of the end.
    travelled = speed * dt * np.arange(N, dtype=float)
    travelled = np.clip(travelled, 0.0, max(arc[-1] - tiny, 0.0))

    # Segment i satisfies arc[i] <= s < arc[i + 1].
    seg = np.searchsorted(arc, travelled, side='right') - 1
    seg = np.clip(seg, 0, n_seg - 1)

    safe_len = np.where(lengths > tiny, lengths, 1.0)   # avoid division by 0
    frac = (travelled - arc[seg]) / safe_len[seg]
    frac = np.clip(frac, 0.0, 1.0 - 1e-9)               # keep < 1: stay on this segment

    start, end = verts[seg], verts[seg + 1]
    positions = start + frac[:, None] * (end - start)
    headings = deltas[seg, :2] / safe_len[seg][:, None]
    pairs = [(idxs[i], idxs[i + 1]) for i in seg.tolist()]

    return (positions.astype(np.float32), pairs,
            frac.astype(np.float32), headings.astype(np.float32))
dy < 0 else (-1, 0), "horizontal") road_positions.append(pos) elif on_v and on_h: lane_info[tuple(pos)] = ((0, 1) if dx >= 0 else (0, -1), "intersection") road_positions.append(pos) return np.array(road_positions), lane_info def create_grid_road_network(road_positions, lane_info, STEP_SIZE): G = nx.DiGraph() pos_dict = {tuple(pos): i for i, pos in enumerate(road_positions)} for pos, idx in pos_dict.items(): if pos in lane_info: direction, lane_type = lane_info[pos] G.add_node(idx, pos=np.array(pos), direction=direction, lane_type=lane_type) tree = KDTree(road_positions) for idx, pos in enumerate(road_positions): if idx not in G.nodes: continue nbrs = tree.query_ball_point(pos, r=STEP_SIZE + 0.1) for nb in nbrs: if nb == idx or nb not in G.nodes: continue nbpos = road_positions[nb] d = np.linalg.norm(pos - nbpos) if not np.isclose(d, STEP_SIZE, atol=0.1): continue move_dir = (int(np.sign(nbpos[0] - pos[0])), int(np.sign(nbpos[1] - pos[1]))) lane_type = G.nodes[idx].get("lane_type", "vertical") if lane_type == "intersection": if move_dir in [(0,1),(0,-1),(1,0),(-1,0)]: G.add_edge(idx, nb, weight=d) else: if move_dir == G.nodes[idx]['direction']: G.add_edge(idx, nb, weight=d) return G, road_positions def generate_smooth_grid_trajectory(G, road_positions, TURN_PROBABILITY, sequence_length=12, start_node=None): """ Lane-aware walk; if stuck, we later fallback to a looser walk. 
""" import numpy as np if start_node is None: nodes = list(G.nodes) if not nodes: return np.empty((0, 3)) start_node = np.random.choice(nodes) traj = [road_positions[start_node]] current = start_node prev = None for _ in range(sequence_length - 1): if current not in G: break nbrs = list(G.neighbors(current)) if prev in nbrs: nbrs.remove(prev) if not nbrs: break node_data = G.nodes[current] lane_type = node_data.get("lane_type", "vertical") if lane_type == "intersection" and prev is not None: prev_pos = np.array(G.nodes[prev]["pos"]) curr_pos = np.array(node_data["pos"]) incoming_dir = (int(np.sign(curr_pos[0] - prev_pos[0])), int(np.sign(curr_pos[1] - prev_pos[1]))) default_direction = incoming_dir else: default_direction = node_data.get("direction", None) pos = np.array(node_data["pos"]) defnbrs, turnnbrs = [], [] for n in nbrs: npos = np.array(G.nodes[n]["pos"]) move_dir = (int(np.sign(npos[0] - pos[0])), int(np.sign(npos[1] - pos[1]))) if move_dir == default_direction: defnbrs.append((n, np.linalg.norm(npos - pos))) else: turnnbrs.append((n, np.linalg.norm(npos - pos))) if lane_type == "intersection": r = np.random.rand() if defnbrs and r > TURN_PROBABILITY: nxt = min(defnbrs, key=lambda x: x[1])[0] elif turnnbrs and r < TURN_PROBABILITY: nxt = min(turnnbrs, key=lambda x: x[1])[0] elif defnbrs: nxt = min(defnbrs, key=lambda x: x[1])[0] elif turnnbrs: nxt = min(turnnbrs, key=lambda x: x[1])[0] else: break else: if defnbrs: nxt = min(defnbrs, key=lambda x: x[1])[0] else: # lane says no, take any neighbor as a weak fallback nxt = min(nbrs, key=lambda n: np.linalg.norm(road_positions[n] - road_positions[current])) traj.append(road_positions[nxt]) prev, current = current, nxt return np.array(traj) def _fallback_anywalk(road_positions, step_size, length, start_idx=None): """ Undirected, geometry-only walk that steps to any neighbor ~step_size away. Used only when lane-constrained walk can't achieve the desired skeleton length. 
""" import numpy as np from scipy.spatial import KDTree if start_idx is None: start_idx = np.random.randint(0, len(road_positions)) pos = road_positions[start_idx] traj = [pos] tree = KDTree(road_positions) for _ in range(length - 1): # neighbors within a small shell around step_size idxs = tree.query_ball_point(pos, r=step_size*1.1) candidates = [] for i in idxs: if np.allclose(road_positions[i], pos): continue d = np.linalg.norm(road_positions[i] - pos) if np.isclose(d, step_size, atol=0.15*max(1.0, step_size)): candidates.append(i) if not candidates: # jitter search radius slightly idxs = tree.query_ball_point(pos, r=max(1.5*step_size, step_size+0.5)) if not idxs: break # pick nearest i = min(idxs, key=lambda j: np.linalg.norm(road_positions[j]-pos)) else: i = np.random.choice(candidates) pos = road_positions[i] traj.append(pos) return np.array(traj) def generate_n_smooth_grid_trajectories(G, road_positions, n, sequence_length=12, TURN_PROBABILITY=.15, max_attempts=2000, step_size=1.0): """ Try lane-aware skeletons first; if a sample can't reach `sequence_length`, build a geometric fallback walk so we never return zero trajectories. 
""" import numpy as np from scipy.spatial import KDTree trajs = [] attempts = 0 hard_cap = n * max_attempts tree = KDTree(road_positions) min_x, min_y = np.min(road_positions[:, 0]), np.min(road_positions[:, 1]) max_x, max_y = np.max(road_positions[:, 0]), np.max(road_positions[:, 1]) while len(trajs) < n and attempts < hard_cap: rand = [np.random.uniform(min_x, max_x), np.random.uniform(min_y, max_y), 0] _, start_idx = tree.query(rand) t = generate_smooth_grid_trajectory(G, road_positions, TURN_PROBABILITY, sequence_length, start_node=start_idx) if len(t) < sequence_length: # try fallback t = _fallback_anywalk(road_positions, step_size, sequence_length, start_idx=start_idx) if len(t) >= sequence_length: trajs.append(t[:sequence_length]) attempts += 1 if len(trajs) < n: print(f"[warn] only {len(trajs)} / {n} trajectories generated (skeleton).") return trajs def generate_pedestrian_trajectory(valid_positions, sequence_length=10, step_size=2.5, angle_std=0.1, start=None): tree = KDTree(valid_positions) if start is None: start = valid_positions[np.random.choice(len(valid_positions))] traj, ang, cur = [start], np.random.uniform(0, 2*np.pi), start for _ in range(sequence_length-1): ang += np.random.normal(0, angle_std) new_cont = cur + np.array([step_size*np.cos(ang), step_size*np.sin(ang), 0]) _, idx = tree.query(new_cont) new_pos = valid_positions[idx] traj.append(new_pos); cur = new_pos return np.array(traj) def generate_n_pedestrian_trajectories(valid_positions, n, sequence_length=10, step_size=2.5, angle_std=0.1): return [generate_pedestrian_trajectory(valid_positions, sequence_length, step_size, angle_std) for _ in range(n)] def get_trajectory_indices(trajectories, pos_total): pos_to_idx = {tuple(pos): i for i, pos in enumerate(pos_total)} out = [] for tr in trajectories: out.append([pos_to_idx.get(tuple(p), -1) for p in tr]) return out # ============================ motion/doppler (discrete ref) ============================ def 
def compute_cumulative_time(trajectories, speed_profile):
    """Cumulative travel time at every point of every trajectory.

    Step t -> t+1 takes ``distance / speed[s, t]``; a zero speed contributes
    zero time.

    Args:
        trajectories: (n_samples, n_steps, D) positions.
        speed_profile: (n_samples, n_steps) speeds.

    Returns:
        (n_samples, n_steps) array whose first column is 0.
    """
    tr = np.asarray(trajectories, float)
    spd = np.asarray(speed_profile, float)
    n_s, n_t = tr.shape[0], tr.shape[1]
    cum = np.zeros((n_s, n_t))
    if n_t < 2:
        return cum
    deltas = np.diff(tr, axis=1)
    step_sizes = np.sqrt(np.sum(deltas**2, axis=2))
    v = spd[:, :n_t - 1]
    # Division only where the speed is non-zero; elsewhere dt stays 0.
    dt = np.divide(step_sizes, v, out=np.zeros_like(step_sizes), where=(v != 0))
    cum[:, 1:] = np.cumsum(dt, axis=1)
    return cum


def compute_velocity_directions(trajectories, cum_times):
    """Per-step unit velocity vectors, XY heading angles and speed magnitudes.

    Values for step t describe the move t -> t+1; the last step repeats the
    previous one.  Time differences are clipped to at least 1e-12.

    Args:
        trajectories: (n_samples, n_steps, 3) positions.
        cum_times: (n_samples, n_steps) cumulative times.

    Returns:
        ``(dirs, angs, vmag)`` with shapes (n_samples, n_steps, 3),
        (n_samples, n_steps) in degrees, and (n_samples, n_steps).
    """
    tr = np.asarray(trajectories, float)
    ct = np.asarray(cum_times, float)
    n_s, n_t = tr.shape[0], tr.shape[1]
    dirs = np.zeros((n_s, n_t, 3))
    angs = np.zeros((n_s, n_t))
    vmag = np.zeros((n_s, n_t))
    if n_t < 2:
        return dirs, angs, vmag  # no step to differentiate

    deltas = np.diff(tr, axis=1)
    dt = np.clip(np.diff(ct, axis=1), 1e-12, None)
    dx, dy, dz = deltas[..., 0], deltas[..., 1], deltas[..., 2]

    speed = np.sqrt(dx**2 + dy**2 + dz**2) / dt
    vel = deltas / dt[..., None]
    vm = np.sqrt(vel[..., 0]**2 + vel[..., 1]**2 + vel[..., 2]**2)
    moving = (vm > 0)[..., None]
    dirs[:, :-1] = np.divide(vel, vm[..., None], out=np.zeros_like(vel), where=moving)
    dirs[:, -1] = dirs[:, -2]
    vmag[:, :-1] = speed
    vmag[:, -1] = speed[:, -1]
    angs[:, :-1] = np.degrees(np.arctan2(dy, dx))
    angs[:, -1] = angs[:, -2]
    return dirs, angs, vmag


def compute_speed_profile(traj, road_graph, road_positions, is_vehicle=True, speed_range=(5/3.6, 60/3.6)):
    """Constant random speed per trajectory, zeroed where the vehicle did not move.

    One speed is drawn uniformly from speed_range.  Entry k (k >= 1) is set
    to 0 when point k has the same XY position as point k-1.
    ``road_graph``, ``road_positions`` and ``is_vehicle`` are accepted for
    interface compatibility and are not used.

    Returns:
        (len(traj),) float array of speeds.
    """
    traj = np.asarray(traj, float)
    N = len(traj)
    v = float(np.random.uniform(*speed_range))
    spd = np.full(N, v, float)
    if N > 1:
        # Per-step mask.  np.allclose would collapse this to a single bool and
        # zero either nothing or everything.
        stalled = np.all(np.isclose(traj[1:, :2], traj[:-1, :2], atol=1e-9), axis=1)
        spd[1:][stalled] = 0.0
    return spd


def compute_pedestrian_speed_profile(traj, speed_range=(0.5, 2.0), tol=1e-3):
    """Constant random walking speed, zero on steps that do not move.

    Entry i is the speed used for step i -> i+1 (0 if both points coincide
    within ``tol``); the last entry is always 0.

    Returns:
        (len(traj),) float array of speeds.
    """
    traj = np.asarray(traj, float)
    N = traj.shape[0]
    spd = np.zeros(N, float)
    v = float(np.random.uniform(*speed_range))
    if N > 1:
        stalled = np.all(np.isclose(traj[1:], traj[:-1], atol=tol), axis=1)
        spd[:-1] = np.where(stalled, 0.0, v)
    return spd
def unwrap_angle_deg(a0, a1):
    """Return ``(a0, a1')`` where a1' is a1 shifted by a multiple of 360 so
    that it lies within [-180, 180) degrees of a0."""
    shortest = ((a1 - a0 + 180.0) % 360.0) - 180.0
    return a0, a0 + shortest


def interpolate_ray_params(deepmimo_data, idx0, idx1, alpha):
    """Blend the ray parameters of two users.

    Paths of both users are ranked by descending power and paired rank by
    rank, keeping as many pairs as the user with fewer paths has.  Power and
    ToA are interpolated linearly, angles along the shortest arc (degrees),
    and phase along the shortest rotation.  A pair is flagged LoS when either
    end is.

    Args:
        deepmimo_data: dict read as ``deepmimo_data['user']['paths'][i]``.
        idx0, idx1: indices of the two users.
        alpha: blend weight; 0 gives user idx0, 1 gives user idx1.

    Returns:
        dict with keys num_paths, DoD_theta, DoD_phi, DoA_theta, DoA_phi,
        phase, ToA, power, LoS.  All arrays are empty when either user has no
        paths.
    """
    user_paths = deepmimo_data['user']['paths']
    rays_a = user_paths[idx0]
    rays_b = user_paths[idx1]
    n_a, n_b = int(rays_a['num_paths']), int(rays_b['num_paths'])

    if n_a == 0 or n_b == 0:
        return dict(num_paths=0,
                    DoD_theta=np.array([]), DoD_phi=np.array([]),
                    DoA_theta=np.array([]), DoA_phi=np.array([]),
                    phase=np.array([]), ToA=np.array([]), power=np.array([]),
                    LoS=np.array([], int))

    # Strongest-first ordering on both sides, truncated to the common count.
    order_a = np.argsort(-np.asarray(rays_a['power']).flatten())
    order_b = np.argsort(-np.asarray(rays_b['power']).flatten())
    n_common = min(n_a, n_b)
    pick_a, pick_b = order_a[:n_common], order_b[:n_common]

    def field(rays, key, pick):
        return np.asarray(rays[key]).flatten()[pick].astype(float)

    def both(key):
        return field(rays_a, key, pick_a), field(rays_b, key, pick_b)

    def blend_angle(key):
        begin, finish = both(key)
        begin, finish = unwrap_angle_deg(begin, finish)
        return (1 - alpha) * begin + alpha * finish

    pow_a, pow_b = both('power')
    power = (1 - alpha) * pow_a + alpha * pow_b
    toa_a, toa_b = both('ToA')
    toa = (1 - alpha) * toa_a + alpha * toa_b

    dod_theta, dod_phi = blend_angle('DoD_theta'), blend_angle('DoD_phi')
    doa_theta, doa_phi = blend_angle('DoA_theta'), blend_angle('DoA_phi')

    phase_a, phase_b = both('phase')
    phase_a, phase_b = np.deg2rad(phase_a), np.deg2rad(phase_b)
    turn = np.angle(np.exp(1j * (phase_b - phase_a)))  # wrapped to (-pi, pi]
    phase = np.rad2deg(phase_a + alpha * turn)

    los_a, los_b = both('LoS')
    los = (los_a.astype(int) + los_b.astype(int) > 0).astype(int)

    return dict(num_paths=n_common, DoD_theta=dod_theta, DoD_phi=dod_phi,
                DoA_theta=doa_theta, DoA_phi=doa_phi,
                phase=phase, ToA=toa, power=power, LoS=los)


def array_response_phase(theta, phi, kd):
    """Per-axis complex phase gradients, one (x, y, z) row per path."""
    sin_theta = np.sin(theta)
    along_x = 1j * kd * sin_theta * np.cos(phi)
    along_y = 1j * kd * sin_theta * np.sin(phi)
    along_z = 1j * kd * np.cos(theta)
    stacked = np.vstack([along_x, along_y, along_z])
    return stacked.T


def array_response(ant_ind, theta, phi, kd):
    """Array response of shape (n_antennas, n_paths) for the given antenna
    grid indices and path angles (radians)."""
    gradients = array_response_phase(theta, phi, kd)
    return np.exp(ant_ind @ gradients.T)
def ant_indices(panel_size):
    """Integer (x, y, z) grid index of every antenna element.

    Elements are laid out in the y-z plane: x is always 0, y runs over
    ``panel_size[0]`` (fastest) and z over ``panel_size[1]``.

    Returns:
        (panel_size[0] * panel_size[1], 3) integer array.
    """
    n_y, n_z = panel_size[0], panel_size[1]
    gx = np.tile(np.arange(1), n_y * n_z)
    gy = np.tile(np.arange(n_y), n_z)
    gz = np.repeat(np.arange(n_z), n_y)
    return np.vstack([gx, gy, gz]).T


def apply_FoV(FoV, theta, phi):
    """Boolean mask of the paths falling inside the field of view.

    Args:
        FoV: (azimuth_width, elevation_width) in degrees; the window is
            centred on phi = 0 and theta = 90 degrees.
        theta, phi: path angles in radians.
    """
    theta = np.mod(theta, 2 * np.pi)
    phi = np.mod(phi, 2 * np.pi)
    FoV = np.deg2rad(FoV)
    half_az, half_el = FoV[0] / 2, FoV[1] / 2
    # Azimuth wraps around 0, so the window is the union of both ends of [0, 2*pi).
    inside_az = np.logical_or(phi <= 0 + half_az, phi >= 2 * np.pi - half_az)
    inside_el = np.logical_and(theta <= np.pi / 2 + half_el, theta >= np.pi / 2 - half_el)
    return np.logical_and(inside_az, inside_el)


def rotate_angles(rotation, theta, phi):
    """Convert angles from degrees to radians and express them in the frame
    of an array rotated by ``rotation``.

    Args:
        rotation: three rotation angles in degrees, or None for no rotation.
        theta, phi: angles in degrees.

    Returns:
        ``(theta, phi)`` in radians.
    """
    theta = np.deg2rad(theta)
    phi = np.deg2rad(phi)
    if rotation is None:
        return theta, phi

    R = np.deg2rad(rotation)
    sa = np.sin(phi - R[2]); sb = np.sin(R[1]); sg = np.sin(R[0])
    ca = np.cos(phi - R[2]); cb = np.cos(R[1]); cg = np.cos(R[0])
    st, ct = np.sin(theta), np.cos(theta)
    theta_rot = np.arccos(cb*cg*ct + st*(sb*cg*ca - sg*sa))
    phi_rot = np.angle(cb*st*ca - sb*ct + 1j*(cb*sg*ct + st*(sb*sg*ca + cg*sa)))
    return theta_rot, phi_rot


class OFDM_PathGenerator:
    """Turns per-path ray data into per-path OFDM coefficients.

    ``generate`` is bound to ``no_LPF`` or ``with_LPF`` depending on the LPF
    flag of the OFDM parameters.
    """

    def __init__(self, params, subcarriers):
        """
        Args:
            params: parameter dict; its OFDM sub-dict is kept as ``self.O``.
            subcarriers: indices of the subcarriers to evaluate.
        """
        self.params = params
        self.O = params[c.PARAMSET_OFDM]
        self.generate = self.no_LPF if self.O[c.PARAMSET_OFDM_LPF] == 0 else self.with_LPF
        self.subcarriers = subcarriers
        self.total_sc = self.O[c.PARAMSET_OFDM_SC_NUM]
        self.delay_d = np.arange(self.O['subcarriers'])
        # DFT matrix mapping delay taps to the selected subcarriers (LPF mode).
        self.delay_to_OFDM = np.exp(-1j*2*np.pi/self.total_sc * np.outer(self.delay_d, self.subcarriers))

    def _doppler_phase(self, raydata):
        """Per-path Doppler rotation of shape (L, 1), or None when Doppler is
        disabled in either the parameter set or the scenario parameters."""
        if not (self.params[c.PARAMSET_DOPPLER_EN]
                and self.params[c.PARAMSET_SCENARIO_PARAMS][c.PARAMSET_SCENARIO_PARAMS_DOPPLER_EN]):
            return None
        fc = self.params[c.PARAMSET_SCENARIO_PARAMS][c.PARAMSET_SCENARIO_PARAMS_CF]
        v = np.asarray(raydata.get(c.OUT_PATH_DOP_VEL, 0.0)).reshape(-1, 1)
        t = np.asarray(raydata.get('elapsed_time', 0.0)).reshape(-1, 1)
        return np.exp(-1j*2*np.pi*(fc/c.LIGHTSPEED)*(v*t))

    def _path_terms(self, raydata, Ts):
        """Column vectors (power, delay in samples, phase) shared by both modes.

        Paths whose delay reaches the number of delay taps get zero power and
        a clamped delay.  ``power`` is a copy: reshaping the caller's array
        yields a view, and zeroing that view would silently edit the caller's
        ray data.
        """
        n_delay = self.O['subcarriers']
        power = np.array(raydata[c.OUT_PATH_RX_POW]).reshape(-1, 1)
        delay_n = (np.asarray(raydata[c.OUT_PATH_TOA]) / Ts).reshape(-1, 1)
        phase = np.asarray(raydata[c.OUT_PATH_PHASE]).reshape(-1, 1)
        late = delay_n >= n_delay
        power[late] = 0
        delay_n[late] = n_delay
        return power, delay_n, phase

    def no_LPF(self, raydata, Ts):
        """Per-path, per-subcarrier coefficients of shape (L, n_subcarriers)."""
        power, delay_n, phase = self._path_terms(raydata, Ts)
        path_const = np.sqrt(power/self.total_sc) * np.exp(
            1j*(np.deg2rad(phase) - (2*np.pi/self.total_sc)*np.outer(delay_n, self.subcarriers)))
        DP = self._doppler_phase(raydata)
        if DP is not None:
            path_const *= DP
        return path_const

    def with_LPF(self, raydata, Ts):
        """Per-path sinc-pulse taps of shape (L, n_delay_taps); the caller
        maps them to subcarriers with ``delay_to_OFDM``."""
        power, delay_n, phase = self._path_terms(raydata, Ts)
        pulse = np.sinc(self.delay_d - delay_n) * np.sqrt(power/self.total_sc) * np.exp(1j*np.deg2rad(phase))
        DP = self._doppler_phase(raydata)
        if DP is not None:
            pulse *= DP
        return pulse
def generate_MIMO_channel(raydata, params, tx_ant_params, rx_ant_params):
    """Build the OFDM MIMO channel of every user from its ray data.

    For each user the angles are rotated into the array frames, paths outside
    either field of view are dropped (the user's dict in ``raydata`` is
    filtered IN PLACE), and the remaining paths are summed through the TX/RX
    array responses.

    Returns:
        ``(ch, los)``: ch is a complex64 array of shape
        (n_users, M_rx, M_tx, n_subcarriers); los holds, per user, the sum of
        the 'LoS' flags of the kept paths, or -1 when no path remains.
    """
    ofdm = params[c.PARAMSET_OFDM]
    bandwidth = ofdm[c.PARAMSET_OFDM_BW] * c.PARAMSET_OFDM_BW_MULT
    kd_tx = 2*np.pi*tx_ant_params[c.PARAMSET_ANT_SPACING]
    kd_rx = 2*np.pi*rx_ant_params[c.PARAMSET_ANT_SPACING]
    Ts = 1 / bandwidth
    subc = ofdm[c.PARAMSET_OFDM_SC_SAMP]
    pg = OFDM_PathGenerator(params, subc)

    tx_shape = tx_ant_params[c.PARAMSET_ANT_SHAPE]
    rx_shape = rx_ant_params[c.PARAMSET_ANT_SHAPE]
    M_tx = int(np.prod(tx_shape))
    ind_tx = ant_indices(tx_shape)
    M_rx = int(np.prod(rx_shape))
    ind_rx = ant_indices(rx_shape)

    n_users = len(raydata)
    ch = np.zeros((n_users, M_rx, M_tx, len(subc)), dtype=np.csingle)
    los = np.full(n_users, -2, dtype=np.int8)  # -2 marks "not processed"
    use_lpf = ofdm[c.PARAMSET_OFDM_LPF] != 0

    for i, user in enumerate(raydata):
        if user[c.OUT_PATH_NUM] == 0:
            los[i] = -1
            continue

        dod_t, dod_p = rotate_angles(tx_ant_params[c.PARAMSET_ANT_ROTATION],
                                     user[c.OUT_PATH_DOD_THETA], user[c.OUT_PATH_DOD_PHI])
        doa_t, doa_p = rotate_angles(rx_ant_params[c.PARAMSET_ANT_ROTATION],
                                     user[c.OUT_PATH_DOA_THETA], user[c.OUT_PATH_DOA_PHI])

        seen_by_tx = apply_FoV(tx_ant_params[c.PARAMSET_ANT_FOV], dod_t, dod_p)
        seen_by_rx = apply_FoV(rx_ant_params[c.PARAMSET_ANT_FOV], doa_t, doa_p)
        keep = np.logical_and(seen_by_tx, seen_by_rx)
        dod_t, dod_p, doa_t, doa_p = dod_t[keep], dod_p[keep], doa_t[keep], doa_p[keep]

        # Apply the same path filter to every per-path array of this user.
        for key in list(user.keys()):
            if key == c.OUT_PATH_NUM:
                user[key] = keep.sum()
            elif isinstance(user[key], np.ndarray) and user[key].shape[0] == keep.shape[0]:
                user[key] = user[key][keep]

        if user[c.OUT_PATH_NUM] == 0:
            los[i] = -1
            continue
        los[i] = int(np.sum(user.get('LoS', np.zeros(int(user[c.OUT_PATH_NUM])))))

        aTX = array_response(ind_tx, dod_t, dod_p, kd_tx)
        aRX = array_response(ind_rx, doa_t, doa_p, kd_rx)
        path_const = pg.generate(user, Ts)  # (L, K) complex factors per path

        # Sum over paths (last axis) of RX response x TX response x path term.
        combined = np.sum(aRX[:, None, None, :] * aTX[None, :, None, :] * path_const.T[None, None, :, :],
                          axis=3)
        ch[i] = combined @ pg.delay_to_OFDM if use_lpf else combined

    return ch, los
def generate_channel_from_interpolated_ray(ray_interp, num_antennas_tx_hor, num_antennas_tx_vert,
                                           num_subcarriers, fc):
    """Compute the channel of ONE interpolated ray set.

    Wraps ``ray_interp`` as a single-user ray-data array, builds the OFDM and
    antenna parameter sets (TX panel of num_antennas_tx_hor x
    num_antennas_tx_vert rotated by [0, 0, -135], single-element RX, no LPF,
    Doppler enabled) and calls ``generate_MIMO_channel``.

    Returns:
        ``(ch, None, los)`` as produced by ``generate_MIMO_channel``.
    """
    n_paths = int(ray_interp['num_paths'])

    def as_array(key):
        return np.asarray(ray_interp[key])

    def optional(key):
        # Missing optional fields default to one zero per path.
        return np.asarray(ray_interp.get(key, np.zeros(n_paths)))

    user = {
        c.OUT_PATH_NUM: n_paths,
        c.OUT_PATH_DOD_THETA: as_array('DoD_theta'),
        c.OUT_PATH_DOD_PHI: as_array('DoD_phi'),
        c.OUT_PATH_DOA_THETA: as_array('DoA_theta'),
        c.OUT_PATH_DOA_PHI: as_array('DoA_phi'),
        c.OUT_PATH_PHASE: as_array('phase'),
        c.OUT_PATH_TOA: as_array('ToA'),
        c.OUT_PATH_RX_POW: as_array('power'),
        'LoS': optional('LoS'),
        c.OUT_PATH_DOP_VEL: optional('Doppler_vel'),
        'elapsed_time': optional('elapsed_time'),
    }
    raydata = np.array([user], dtype=object)

    ofdm = {
        c.PARAMSET_OFDM_BW: 1.92e-3,
        c.PARAMSET_OFDM_SC_SAMP: np.arange(num_subcarriers),
        c.PARAMSET_OFDM_SC_NUM: num_subcarriers,
        c.PARAMSET_OFDM_LPF: 0,
        'subcarriers': num_subcarriers,
    }
    scenario = {
        c.PARAMSET_SCENARIO_PARAMS_DOPPLER_EN: True,
        c.PARAMSET_SCENARIO_PARAMS_CF: float(fc),
    }
    params = {
        c.PARAMSET_OFDM: ofdm,
        c.PARAMSET_FDTD: True,
        c.PARAMSET_DOPPLER_EN: True,
        c.PARAMSET_SCENARIO_PARAMS: scenario,
    }

    tx = {
        c.PARAMSET_ANT_SHAPE: np.array([num_antennas_tx_hor, num_antennas_tx_vert, 1]),
        c.PARAMSET_ANT_SPACING: 0.5,
        c.PARAMSET_ANT_ROTATION: np.array([0, 0, -135]),
        c.PARAMSET_ANT_FOV: [360, 180],
        c.PARAMSET_ANT_RAD_PAT: 'isotropic',
    }
    rx = {
        c.PARAMSET_ANT_SHAPE: np.array([1, 1, 1]),
        c.PARAMSET_ANT_SPACING: 0.5,
        c.PARAMSET_ANT_ROTATION: np.array([0, 0, 0]),
        c.PARAMSET_ANT_FOV: [360, 180],
        c.PARAMSET_ANT_RAD_PAT: 'isotropic',
    }

    ch, los = generate_MIMO_channel(raydata, params, tx, rx)
    return ch, None, los
def dynamic_scenario_gen(
    scenario, time_steps=20, num_antennas_rx=1, num_antennas_tx_hor=32, num_antennas_tx_vert=1,
    num_subcarriers=32, step_size=1.0, road_center_spacing=25, road_width=6, turn_probability=0.1,
    n_car=50, n_ped=10, max_attempts=300, angle_std=0.1, fc=3.5e9,
    save_filename="trajectory_doppler_data.csv", plot_background=True, auto_step_size=True,
    sample_dt=1e-3, car_speed_range=(5/3.6, 60/3.6), ped_speed_range=(0.5, 2.0),
    continuous_mode=True, cont_len=None
):
    """Generate vehicle and pedestrian tracks with time-varying channels.

    Steps: load (or generate and cache under ``data/``) the DeepMIMO data for
    ``scenario``; derive a road grid and discrete skeleton trajectories;
    resample every skeleton continuously at ``speed * sample_dt`` per sample;
    interpolate ray parameters between the two grid users bracketing each
    sample and convert them to channels with a per-path Doppler term.

    Args:
        time_steps: number of points of each discrete skeleton.
        step_size: road step used when ``auto_step_size`` is False; otherwise
            the step inferred from the user grid is used.
        n_car, n_ped: requested numbers of vehicle / pedestrian tracks.
        cont_len: samples per continuous track (defaults to ``time_steps``).
        continuous_mode: when True the continuous arrays are also exposed
            under the canonical keys "channel", "pos", "vel", "acc",
            "doppler_vel", "angle", "delay".
        num_antennas_rx, save_filename: accepted but not used here.

    Returns:
        dict with scenario metadata, continuous arrays (``*_cont``,
        ``pos_step_xy``) and discrete references (``channel_discrete`` is an
        all-zero placeholder, ``pos_discrete``, ``index_discrete``).
    """
    # 1) load DeepMIMO
    os.makedirs("data", exist_ok=True)
    fname = f"{scenario}_{num_antennas_tx_hor}_{num_antennas_tx_vert}_{num_subcarriers}.p"
    fpath = os.path.join("data", fname)

    def generate_and_cache():
        data = DeepMIMO_data_gen(scenario, num_antennas_tx_hor, num_antennas_tx_vert, num_subcarriers, 1)[0]
        with open(fpath, 'wb') as f:
            pickle.dump(data, f)
        return data

    if not os.path.exists(fpath):
        print(f"Generating DeepMIMO data for scenario {scenario}...")
        deepmimo_data = generate_and_cache()
        print(f"DeepMIMO data saved to {fpath}")
    else:
        print(f"Loading cached DeepMIMO data from {fpath}")
        try:
            # NOTE(security): unpickling runs arbitrary code; the cache file
            # must only ever be one written by this function.
            with open(fpath, 'rb') as f:
                deepmimo_data = pickle.load(f)
        except (pickle.UnpicklingError, EOFError, ValueError) as e:
            print(f"Error loading cached data: {e}. Regenerating...")
            deepmimo_data = generate_and_cache()
            print(f"DeepMIMO data regenerated and saved to {fpath}")

    path_exist = np.array(deepmimo_data['user']['LoS'])
    pos_total = np.array(deepmimo_data['user']['location'])

    # background (optional)
    if plot_background:
        x, y = pos_total[:, 0], pos_total[:, 1]
        cols = {0: 'orange', -1: 'white', 1: 'gray'}
        plt.figure(figsize=(8, 8), dpi=160)
        for v, col in cols.items():
            m = path_exist == v
            plt.scatter(x[m], y[m], s=3, c=col, alpha=0.6, label=f"LoS={v}")
        plt.legend()
        plt.grid(True)
        os.makedirs("figs", exist_ok=True)
        plt.savefig("figs/environment.png")
        plt.close()

    inferred_step = infer_grid_step(pos_total)
    STEP = float(inferred_step if auto_step_size else step_size)
    print(f"[dynamic_scenario_gen] road step size used: {STEP:.6f} m (inferred={inferred_step:.6f} m)")

    # 2) roads & discrete trajectories
    valid_positions = pos_total[(path_exist == 0) | (path_exist == 1)]
    road_positions, lanes = filter_road_positions(valid_positions, road_width, road_center_spacing)
    road_graph, road_positions = create_grid_road_network(road_positions, lanes, STEP)
    print("Total Nodes in Graph:", len(road_graph.nodes()))

    # step_size=STEP: the geometric fallback walk must use the same step as
    # the road graph, not its 1.0 default.
    vehicle_trajs = generate_n_smooth_grid_trajectories(
        road_graph, road_positions, n_car, sequence_length=time_steps,
        TURN_PROBABILITY=turn_probability, max_attempts=max_attempts, step_size=STEP
    )
    ped_trajs = generate_n_pedestrian_trajectories(
        valid_positions, n_ped, sequence_length=time_steps, step_size=STEP, angle_std=angle_std
    )

    veh_idx = get_trajectory_indices(vehicle_trajs, pos_total)
    ped_idx = get_trajectory_indices(ped_trajs, pos_total)

    # 3) discrete channels (reference; not used for continuous outputs).
    # Sized by the trajectories actually generated so it lines up with the
    # other outputs when fewer than n_car skeletons could be built.
    Mtx = num_antennas_tx_hor * num_antennas_tx_vert
    n_tracks = len(vehicle_trajs) + len(ped_trajs)
    channel_discrete = np.zeros((n_tracks, time_steps, Mtx, num_subcarriers), np.complex128)

    # 4) build continuous tracks by speed*dt
    if cont_len is None:
        cont_len = time_steps

    def build_tracks(trajs, idxs, speed_rng, N, dt):
        tracks = []
        for traj_pos, idl in zip(trajs, idxs):
            v = float(np.random.uniform(*speed_rng))
            pos_c, pairs, alpha, vdir = sample_continuous_along_polyline(traj_pos, idl, v, dt, N)
            tracks.append(dict(speed=v, pos=pos_c, pairs=pairs, alpha=alpha, vdir=vdir))
        return tracks

    car_tracks = build_tracks(vehicle_trajs, veh_idx, car_speed_range, cont_len, sample_dt)
    ped_tracks = build_tracks(ped_trajs, ped_idx, ped_speed_range, cont_len, sample_dt)

    # 5) turn interpolated ray params into channels
    def channels_for_tracks(tracks):
        if not tracks:
            # np.stack rejects empty lists; return correctly shaped empties so
            # n_car=0 / n_ped=0 (or no generated skeleton) still concatenates.
            flat = np.zeros((0, cont_len), np.float32)
            return (np.zeros((0, cont_len, Mtx, num_subcarriers), np.complex64),
                    np.zeros((0, cont_len, 3), np.float32),
                    flat, flat.copy(), [], [], [], flat.copy())

        ch_all, pos_all, v_all, a_all, dop_all, ang_all, del_all, step_xy_all = [], [], [], [], [], [], [], []
        for tr in tracks:
            v = tr["speed"]; pos = tr["pos"]; pairs = tr["pairs"]; alpha = tr["alpha"]; vdir = tr["vdir"]
            ch_list, dop_list, ang_list, del_list = [], [], [], []
            vel_series = np.full(len(alpha), v, float)
            acc_series = np.zeros_like(vel_series)

            # per-sample XY step (sanity check: should be ~ speed*dt)
            step_xy = np.zeros(len(alpha), float)
            step_xy[1:] = np.linalg.norm(pos[1:, :2] - pos[:-1, :2], axis=1)

            for k in range(len(alpha)):
                i0, i1 = pairs[k]
                a = float(alpha[k])
                vd = vdir[k]
                ir = interpolate_ray_params(deepmimo_data, i0, i1, a)
                if ir['num_paths'] == 0:
                    ch_list.append(np.zeros((Mtx, num_subcarriers), np.complex128))
                    dop_list.append(np.zeros((0,), np.float32))
                    ang_list.append(np.zeros((0,), np.float32))
                    del_list.append(np.zeros((0,), np.float32))
                    continue

                # Doppler projection: v toward BS (AoA) gives positive |f_d|
                doa_phi = np.deg2rad(ir['DoA_phi'])
                aoa_unit = np.stack([np.cos(doa_phi), np.sin(doa_phi)], axis=1)
                v_proj = -np.sum(aoa_unit * vd[None, :], axis=1) * v  # (L,)
                ir['Doppler_vel'] = v_proj.astype(np.float32)
                ir['elapsed_time'] = np.ones_like(ir['power'], dtype=np.float32) * (k * sample_dt)

                pred, _, _ = generate_channel_from_interpolated_ray(
                    ir, num_antennas_tx_hor, num_antennas_tx_vert, num_subcarriers, fc)
                ch_list.append(np.asarray(pred[0]).squeeze(0))
                dop_list.append(v_proj.astype(np.float32))
                ang_list.append(ir['DoA_phi'].astype(np.float32))
                del_list.append(ir['ToA'].astype(np.float32))

            ch_all.append(np.stack(ch_list, axis=0))
            pos_all.append(pos.astype(np.float32))
            v_all.append(vel_series.astype(np.float32))
            a_all.append(acc_series.astype(np.float32))
            dop_all.append(dop_list); ang_all.append(ang_list); del_all.append(del_list)
            step_xy_all.append(step_xy.astype(np.float32))

        return (np.stack(ch_all, axis=0), np.stack(pos_all, axis=0),
                np.stack(v_all, axis=0), np.stack(a_all, axis=0),
                dop_all, ang_all, del_all, np.stack(step_xy_all, axis=0))

    car_ch, car_pos, car_vel, car_acc, car_dop, car_ang, car_del, car_step = channels_for_tracks(car_tracks)
    ped_ch, ped_pos, ped_vel, ped_acc, ped_dop, ped_ang, ped_del, ped_step = channels_for_tracks(ped_tracks)

    channel_cont = np.concatenate([car_ch, ped_ch], axis=0)
    pos_cont = np.concatenate([car_pos, ped_pos], axis=0)
    vel_cont = np.concatenate([car_vel, ped_vel], axis=0)
    acc_cont = np.concatenate([car_acc, ped_acc], axis=0)
    step_cont = np.concatenate([car_step, ped_step], axis=0)
    doppler_cont = car_dop + ped_dop
    angle_cont = car_ang + ped_ang
    delay_cont = car_del + ped_del

    # 6) return: when continuous_mode=True, expose the continuous arrays under the
    # canonical keys ("channel", "pos", "vel"...), so existing scripts work unchanged.
    out = {
        "scenario": scenario,
        "index_discrete": veh_idx + ped_idx,
        "grid_step": STEP,
        "sample_dt": sample_dt,
        "car_speed_range": car_speed_range,
        "ped_speed_range": ped_speed_range,
        "los": path_exist,
        # continuous (interpolated)
        "channel_cont": channel_cont,
        "pos_cont": pos_cont,
        "vel_cont": vel_cont,
        "acc_cont": acc_cont,
        "doppler_vel_cont": doppler_cont,
        "angle_cont": angle_cont,
        "delay_cont": delay_cont,
        "pos_step_xy": step_cont,  # per-sample XY distance (should be ~ speed*dt)
        # discrete references
        "channel_discrete": channel_discrete,
        "pos_discrete": vehicle_trajs + ped_trajs
    }
    if continuous_mode:
        out["channel"] = channel_cont
        out["pos"] = pos_cont
        out["vel"] = vel_cont
        out["acc"] = acc_cont
        out["doppler_vel"] = doppler_cont
        out["angle"] = angle_cont
        out["delay"] = delay_cont
    else:
        out["channel"] = channel_discrete
        # (no single array for discrete positions because they are ragged lists)
    return out
def channel_dim_generator(seed=42):
    """Draw 2000 random channel-dimension sets and save them as CSV.

    Each set has time_steps in 1..16, a power-of-two antenna count in 4..128
    and a power-of-two subcarrier count in 16..512; sets whose product exceeds
    2**16 are rejected.  The antenna count is split into (hor, vert) using
    its largest divisor not above its square root as ``vert``.  The global
    NumPy RNG is reseeded with ``seed``, the rows are shuffled, written to
    'channel_dimensions.csv' and returned as a DataFrame.
    """
    import pandas as pd

    np.random.seed(seed)
    wanted = 2000
    budget = 2**16
    rows = []
    while len(rows) < wanted:
        # Three draws per iteration, in this order.
        time_steps = 2**4 - np.random.randint(0, 16)
        num_ant = 2**(7 - np.random.randint(0, 6))
        num_sc = 2**(9 - np.random.randint(0, 6))
        prod = time_steps * num_ant * num_sc
        if prod > budget:
            continue
        divisors = [k for k in range(1, int(np.sqrt(num_ant)) + 1) if num_ant % k == 0]
        v = divisors[-1]          # largest divisor <= sqrt(num_ant); 1 always qualifies
        h = num_ant // v
        rows.append([time_steps, h, v, num_sc, prod])

    arr = np.array(rows)
    np.random.shuffle(arr)
    df = pd.DataFrame(
        arr,
        columns=['time_steps', 'num_antennas_tx_hor', 'num_antennas_tx_vert', 'num_subcarriers', 'product'],
    )
    df.to_csv('channel_dimensions.csv', index=False)
    print(f"Generated {len(df)} sets.")
    return df
def get_channel_dimensions(df, i):
    """Return row ``i`` of a channel-dimension table as a dict of ints.

    Only the first four columns are read, in the order time_steps,
    num_antennas_tx_hor, num_antennas_tx_vert, num_subcarriers.  For an index
    outside ``0 .. len(df) - 1`` an error message STRING is returned instead
    of a dict (no exception is raised).
    """
    if not 0 <= i < len(df):
        return f"Error: Index {i} is out of range. Valid range is 0 to {len(df)-1}"

    fields = ("time_steps", "num_antennas_tx_hor", "num_antennas_tx_vert", "num_subcarriers")
    values = df.iloc[i, :4].tolist()
    return {name: int(values[pos]) for pos, name in enumerate(fields)}
def make_channel_gif(
    scenario_data,
    sample_idx=0,
    out_gif="channel_evolution.gif",
    mode="heatmap",           # "heatmap" or "angle-delay"
    tx_shape=None,            # (num_antennas_tx_hor, num_antennas_tx_vert) if mode="angle-delay"
    fps=12,
    clim=None,                # (vmin, vmax) in dB; see docstring for the None behaviour
    downsample_every=1,       # e.g., 2 to take every other frame
    annotate=True,
    figsize=(6, 4),
    cmap="gray_r"
):
    """
    Create a GIF visualizing channel evolution for a single trajectory.

    scenario_data["channel"] can be (N, T, M_tx, N_sc) or (N, T, M_rx, M_tx, N_sc);
    a 5-D array is averaged over the RX axis first.
    Optional: scenario_data["pos"] (N, T, 3), scenario_data["vel"] (N, T),
    scenario_data["sample_dt"]. pos/vel are only used when they are numpy arrays.

    Args:
        scenario_data: dict holding "channel" and the optional entries above.
        sample_idx: index of the trajectory (first axis) to render.
        out_gif: output file name.
        mode: "heatmap" draws |H| in dB over (Tx element, subcarrier);
            "angle-delay" averages the vertical antenna axis, applies an IFFT
            over subcarriers and an FFT over the horizontal antenna axis, and
            draws the resulting power in dB.
        tx_shape: (horizontal, vertical) antenna counts whose product equals
            M_tx; required for mode="angle-delay".
        fps: frames per second passed to imageio.
        clim: (vmin, vmax) colour limits in dB. If None, "heatmap" uses the
            5th/95th percentiles of the trajectory's dB magnitudes and
            "angle-delay" lets matplotlib autoscale every frame.
        downsample_every: render every n-th time sample (values < 1 act as 1).
        annotate: add velocity to the title and position as an overlay when
            those arrays are available.
        figsize: matplotlib figure size.
        cmap: matplotlib colormap name.

    Returns:
        out_gif, the path that was written.

    Raises:
        ImportError: imageio is not importable.
        ValueError: unknown mode, tx_shape missing/mismatched for
            "angle-delay", or the sequence has no time samples.
    """
    import io

    try:
        import imageio.v2 as imageio
    except Exception as err:
        raise ImportError("imageio is required. Install with: pip install imageio") from err

    # Validate arguments up front so no figure is created for a bad call.
    if mode not in ("heatmap", "angle-delay"):
        raise ValueError('mode must be "heatmap" or "angle-delay"')

    ch = scenario_data["channel"]
    if ch.ndim == 5:
        # average over RX if present → (N, T, M_tx, N_sc)
        ch = ch.mean(axis=2)
    ch = ch[sample_idx]  # (T, M_tx, N_sc)
    T, Mtx, Nsc = ch.shape

    if mode == "angle-delay" and (tx_shape is None or np.prod(tx_shape) != Mtx):
        raise ValueError(
            'mode="angle-delay" needs tx_shape=(num_antennas_tx_hor, num_antennas_tx_vert) '
            "matching M_tx."
        )

    time_indices = list(range(0, T, max(1, int(downsample_every))))
    if not time_indices:
        raise ValueError("channel sequence has no time samples to render")

    # Optional meta
    pos = scenario_data.get("pos", None)
    vel = scenario_data.get("vel", None)
    dt = scenario_data.get("sample_dt", None)
    pos_sample = pos[sample_idx] if isinstance(pos, np.ndarray) else None
    vel_sample = vel[sample_idx] if isinstance(vel, np.ndarray) else None

    eps = 1e-12  # floor that keeps log10 finite for zero-valued entries
    if mode == "heatmap":
        mag_db_all = 20.0 * np.log10(np.maximum(np.abs(ch), eps))
        if clim is None:
            vmin = float(np.percentile(mag_db_all, 5))
            vmax = float(np.percentile(mag_db_all, 95))
        else:
            vmin, vmax = clim
        color_limits = {"vmin": vmin, "vmax": vmax}
    elif clim is None:
        # The percentile defaults describe the antenna/subcarrier domain, so
        # they are not applied to angle-delay power; autoscale instead.
        color_limits = {}
    else:
        vmin, vmax = clim
        color_limits = {"vmin": vmin, "vmax": vmax}

    frames = []
    for t in time_indices:
        fig, ax = plt.subplots(figsize=figsize, dpi=140)
        try:
            if mode == "heatmap":
                image = mag_db_all[t]  # (M_tx, N_sc)
                xlabel, ylabel = "Subcarrier index", "Tx element"
                title = f"|H| (dB) — t={t}"
            else:
                Ht = ch[t].reshape(tx_shape[0], tx_shape[1], Nsc)  # (H, V, Nsc)
                Hh = Ht.mean(axis=1)  # collapse vertical → (H, Nsc)
                # delay transform (IFFT over subcarriers)
                Hd = np.fft.ifftshift(
                    np.fft.ifft(np.fft.fftshift(Hh, axes=1), axis=1), axes=1
                )
                # angle transform (FFT over horizontal)
                Ha = np.fft.fftshift(np.fft.fft(Hd, axis=0), axes=0)
                image = 20.0 * np.log10(np.maximum(np.abs(Ha), eps))
                xlabel, ylabel = "Delay bin", "Angle bin"
                title = f"Angle–Delay power (dB) — t={t}"

            im = ax.imshow(image, aspect="auto", origin="lower", cmap=cmap, **color_limits)
            ax.set_xlabel(xlabel)
            ax.set_ylabel(ylabel)
            if dt is not None:
                title += f" ({t*dt:.4f} s)"
            if annotate and vel_sample is not None:
                title += f", v={vel_sample[t]:.2f} m/s"
            ax.set_title(title)
            cbar = fig.colorbar(im, ax=ax, fraction=0.046, pad=0.04)
            cbar.set_label("dB")

            if annotate and pos_sample is not None:
                x, y = pos_sample[t, 0], pos_sample[t, 1]
                ax.text(
                    0.01, 0.99, f"pos=({x:.2f}, {y:.2f})",
                    transform=ax.transAxes, ha="left", va="top",
                    fontsize=8, color="w",
                    bbox=dict(facecolor="k", alpha=0.35, pad=2, lw=0)
                )

            # --- robust frame extraction across backends ---
            fig.canvas.draw()
            try:
                # preferred: fast path
                w, h = fig.canvas.get_width_height()
                buf = np.frombuffer(fig.canvas.buffer_rgba(), dtype=np.uint8)
                # drop alpha; copy so the frame no longer references the canvas buffer
                frame = buf.reshape(h, w, 4)[..., :3].copy()
            except Exception:
                # fallback: render to PNG in-memory then read. No tight bbox
                # here, so every frame keeps the same pixel size.
                with io.BytesIO() as bio:
                    fig.savefig(bio, format="png")
                    bio.seek(0)
                    frame = np.asarray(imageio.imread(bio))[..., :3]
            frames.append(frame)
        finally:
            # close the figure even if rendering this frame failed
            plt.close(fig)

    imageio.mimsave(out_gif, frames, fps=fps)
    return out_gif
def _angle_delay_transform(H_seq, tx_shape):
    """
    Map a per-UE channel sequence into the angle–delay domain.

    H_seq: (T, M_tx, N_sc) complex channel for one UE
    tx_shape: (M_h, M_v), M_h * M_v == M_tx

    Returns:
        Had (T, M_h, M_v, N_sc)
        - Angle: 2D FFT over (M_h, M_v) with fftshift
        - Delay: IFFT over subcarriers (no shift applied)

    Raises:
        ValueError: if M_h * M_v differs from M_tx.
    """
    n_t, n_tx, n_sc = H_seq.shape
    n_h, n_v = tx_shape
    if n_h * n_v != n_tx:
        raise ValueError(f"tx_shape {tx_shape} must multiply to {n_tx}")

    planar = H_seq.reshape(n_t, n_h, n_v, n_sc)
    antenna_axes = (1, 2)
    # Angle domain (AoD) via 2D FFT across the antenna plane
    angle_dom = np.fft.fftshift(np.fft.fftn(planar, axes=antenna_axes), axes=antenna_axes)
    # Delay domain via IFFT across subcarriers
    return np.fft.ifft(angle_dom, axis=-1)


def _pick_topk_bins(Had, k):
    """
    Select the k strongest angle–delay bins by time-averaged magnitude.

    Had: (T, Mh, Mv, Nsc)
    k: number of bins wanted; clipped to the number of available bins.

    Returns:
        list of (ih, iv, idelay) index tuples sorted by descending mean |Had|.
        The list is empty when k <= 0 or when there are no bins.
    """
    mean_mag = np.abs(Had).mean(axis=0)   # (Mh, Mv, Nsc)
    flat = mean_mag.reshape(-1)
    k = int(min(k, flat.size))
    if k <= 0:
        # argpartition(flat, -0)[-0:] would otherwise select every bin
        return []

    top = np.argpartition(flat, -k)[-k:]
    top = top[np.argsort(-flat[top])]     # sort descending
    return [np.unravel_index(i, mean_mag.shape) for i in top]
def make_topk_angle_delay_curves_gif(
    scenario_data,
    sample_idx=0,
    tx_shape=(32, 1),
    k=5,
    out_gif="topk_bins_evolution.gif",
    fps=12,
    unwrap_phase=True,
    downsample_every=1
):
    """
    Visualize the evolution of magnitude & phase for the top-k angle–delay bins
    (by time-avg magnitude). Produces a GIF with two subplots (magnitude/phase)
    where each bin's curve is revealed over time.

    Args:
        scenario_data: dict that must contain "channel" and may contain
            "sample_dt". The selected trajectory may be (T, M_tx, N_sc) or
            (T, M_rx, M_tx, N_sc); the latter is averaged over RX.
        sample_idx: which UE trajectory to visualize
        tx_shape: (M_h, M_v) so that M_h * M_v == M_tx
        k: number of bins to track (at least 1)
        out_gif: output filename (GIF)
        fps: frames per second for the animation
        unwrap_phase: if True, show unwrapped phase (continuous); else raw angle
        downsample_every: plot every Nth time sample (values < 1 act as 1)

    Returns:
        out_gif (str): path to the saved GIF

    Raises:
        ValueError: k < 1, the sequence has no time samples, or tx_shape does
            not multiply to M_tx (raised by _angle_delay_transform).
    """
    # Imported lazily, like imageio in make_channel_gif, so the animation
    # machinery is only needed when this function is actually called.
    from matplotlib.animation import FuncAnimation, PillowWriter

    if k < 1:
        raise ValueError("k must be at least 1")

    # 1) pull the sequence and transform
    H = scenario_data["channel"][sample_idx]
    if H.ndim == 4:
        # (T, M_rx, M_tx, N_sc): average over RX, matching make_channel_gif
        H = H.mean(axis=1)
    T = H.shape[0]
    if T == 0:
        raise ValueError("channel sequence has no time samples to plot")

    dt = scenario_data.get("sample_dt", None)
    dt = 1.0 if dt is None else float(dt)   # fall back to unit spacing
    t_axis = np.arange(T) * dt

    Had = _angle_delay_transform(H, tx_shape)   # (T, Mh, Mv, N_sc)
    bins = _pick_topk_bins(Had, k)

    # 2) build time series for chosen bins → (k, T)
    series = np.stack([Had[:, ih, iv, idl] for (ih, iv, idl) in bins], axis=0)
    mags = np.abs(series)
    phases = np.angle(series)
    if unwrap_phase:
        phases = np.unwrap(phases, axis=-1)     # unwrap each bin along time

    # 3) optional time downsampling
    step = max(1, int(downsample_every))
    if step > 1:
        mags = mags[:, ::step]
        phases = phases[:, ::step]
        t_axis = t_axis[::step]
    Tds = t_axis.size

    # 4) set up animation
    fig, (ax_mag, ax_ph) = plt.subplots(1, 2, figsize=(10, 4), dpi=120)
    try:
        # Pre-plot empty lines (one per bin), let default colors be used
        mag_lines = []
        ph_lines = []
        for i, (ih, iv, idl) in enumerate(bins):
            (ml,) = ax_mag.plot([], [], lw=1.5, label=f"bin {i}: (ah={ih}, av={iv}, τ={idl})")
            (pl,) = ax_ph.plot([], [], lw=1.5, label=f"bin {i}")
            mag_lines.append(ml)
            ph_lines.append(pl)
        all_lines = mag_lines + ph_lines

        ax_mag.set_title("Top-k angle–delay bins: magnitude vs time")
        ax_mag.set_xlabel("time (s)")
        ax_mag.set_ylabel("|H|")
        ax_mag.grid(True)
        ax_mag.legend(loc="best", fontsize=8)

        ax_ph.set_title("Top-k angle–delay bins: phase vs time")
        ax_ph.set_xlabel("time (s)")
        ax_ph.set_ylabel("phase (rad)" if unwrap_phase else "phase (wrapped)")
        ax_ph.grid(True)

        # Fix y-lims for stable animation; an all-zero channel would give a
        # zero-height axis, so fall back to a unit range.
        mag_max = float(np.max(mags))
        ax_mag.set_ylim(0, 1.05 * mag_max if mag_max > 0 else 1.0)

        # Phase range: use global min/max
        ph_min, ph_max = float(np.min(phases)), float(np.max(phases))
        if ph_max - ph_min < 1e-6:
            ph_min -= 1.0
            ph_max += 1.0
        ax_ph.set_ylim(ph_min - 0.05 * abs(ph_min), ph_max + 0.05 * abs(ph_max))

        # Identical left/right limits (single sample) are left to autoscale.
        if t_axis[-1] > t_axis[0]:
            ax_mag.set_xlim(t_axis[0], t_axis[-1])
            ax_ph.set_xlim(t_axis[0], t_axis[-1])

        # init & update
        def _init():
            for ln in all_lines:
                ln.set_data([], [])
            return all_lines

        def _update(frame):
            # reveal up to current frame
            shown = frame + 1
            for ml, pl, mag_row, ph_row in zip(mag_lines, ph_lines, mags, phases):
                ml.set_data(t_axis[:shown], mag_row[:shown])
                pl.set_data(t_axis[:shown], ph_row[:shown])
            fig.suptitle(f"Sample {sample_idx} — frame {frame+1}/{Tds}", fontsize=11)
            return all_lines

        ani = FuncAnimation(
            fig, _update, frames=Tds, init_func=_init,
            interval=1000.0 / fps, blit=False, repeat=False
        )

        # 5) save GIF using Pillow writer (keeps backend-agnostic; avoids tostring_rgb issues)
        ani.save(out_gif, writer=PillowWriter(fps=fps))
    finally:
        # release the figure even if building or saving the animation failed
        plt.close(fig)
    return out_gif