| | |
| | |
| | from __future__ import annotations |
| | import os, time, math, pickle |
| | import numpy as np |
| | import matplotlib.pyplot as plt |
| | import networkx as nx |
| | from scipy.spatial import KDTree |
| | from scipy.interpolate import griddata |
| | from matplotlib.colors import LinearSegmentedColormap, Normalize |
| | import torch |
| |
|
| | from input_preprocess import DeepMIMO_data_gen |
| |
|
| | |
# Dictionary-key constants are read from DeepMIMOv3.consts when that module
# can be imported; otherwise a bare namespace object stands in for it.
try:
    import DeepMIMOv3.consts as c
except Exception:
    class _C:
        pass
    c = _C()

# Populate the fallback values only when the namespace does not already
# carry them (PARAMSET_OFDM is used as the marker attribute).
if not hasattr(c, 'PARAMSET_OFDM'):
    vars(c).update({
        # OFDM parameter keys
        'PARAMSET_OFDM': 'OFDM',
        'PARAMSET_OFDM_BW': 'bandwidth',
        'PARAMSET_OFDM_BW_MULT': 1e9,
        'PARAMSET_OFDM_SC_SAMP': 'selected_subcarriers',
        'PARAMSET_OFDM_SC_NUM': 'subcarriers',
        'PARAMSET_OFDM_LPF': 'LPF',
        'PARAMSET_FDTD': 'FDTD',
        # Antenna parameter keys
        'PARAMSET_ANT_SHAPE': 'shape',
        'PARAMSET_ANT_SPACING': 'spacing',
        'PARAMSET_ANT_ROTATION': 'rotation',
        'PARAMSET_ANT_FOV': 'FoV',
        'PARAMSET_ANT_RAD_PAT': 'radiation_pattern',
        # Per-path output keys
        'OUT_PATH_NUM': 'num_paths',
        'OUT_PATH_DOD_THETA': 'DoD_theta',
        'OUT_PATH_DOD_PHI': 'DoD_phi',
        'OUT_PATH_DOA_THETA': 'DoA_theta',
        'OUT_PATH_DOA_PHI': 'DoA_phi',
        'OUT_PATH_PHASE': 'phase',
        'OUT_PATH_TOA': 'ToA',
        'OUT_PATH_RX_POW': 'power',
        'OUT_PATH_DOP_VEL': 'Doppler_vel',
        'OUT_PATH_DOP_ACC': 'Doppler_acc',
        # Doppler / scenario keys
        'PARAMSET_DOPPLER_EN': 'Doppler',
        'PARAMSET_SCENARIO_PARAMS': 'scenario_params',
        'PARAMSET_SCENARIO_PARAMS_DOPPLER_EN': 'Doppler_enabled',
        'PARAMSET_SCENARIO_PARAMS_CF': 'carrier_freq',
        'LIGHTSPEED': 3e8,
    })
| |
|
| | |
| |
|
def infer_grid_step(pos_total: np.ndarray) -> float:
    """Estimate the grid spacing of a point set from nearest-neighbour distances.

    Only the first two columns (x, y) of ``pos_total`` are used. Duplicate
    points are removed, and sets larger than 20000 points are randomly
    subsampled before the neighbour search. Diagnostic statistics are printed.

    Returns 1.0 when fewer than two distinct points exist or no positive
    neighbour distance is found. Otherwise returns the minimum neighbour
    distance when at least 10% of the points sit within 10% of it, else the
    median of the 5th-30th percentile band (floored at 1e-3).
    """
    planar = np.unique(pos_total[:, :2], axis=0)
    n_pts = len(planar)
    if n_pts < 2:
        return 1.0
    if n_pts > 20000:
        planar = planar[np.random.choice(n_pts, 20000, replace=False)]

    # k=2: column 0 is the query point itself, column 1 its nearest neighbour.
    neighbour_dist, _ = KDTree(planar).query(planar, k=2)
    nearest = neighbour_dist[:, 1]
    nearest = nearest[nearest > 0]
    if len(nearest) == 0:
        return 1.0

    smallest = float(np.min(nearest))

    print("[DEBUG] Grid spacing inference:")
    print(f" Min distance: {smallest:.6f} m")
    print(f" Max distance: {float(np.max(nearest)):.6f} m")
    print(f" Mean distance: {float(np.mean(nearest)):.6f} m")
    print(f" Median distance: {float(np.median(nearest)):.6f} m")

    # How many points have a neighbour within 10% of the smallest spacing?
    near_min = np.sum(np.abs(nearest - smallest) < smallest * 0.1)
    share = near_min / len(nearest) * 100
    print(f" Points at min distance: {near_min}/{len(nearest)} ({share:.1f}%)")

    if near_min < 0.1 * len(nearest):
        # The minimum is not representative: use the low-percentile band.
        p_low, p_high = np.percentile(nearest, 5), np.percentile(nearest, 30)
        band = nearest[(nearest >= p_low) & (nearest <= p_high)]
        fallback = float(np.median(band) if len(band) else np.median(nearest))
        print(f" Using fallback method: {fallback:.6f} m")
        return max(fallback, 1e-3)

    print(f" Using min distance as grid step: {smallest:.6f} m")
    return smallest
| |
|
def sample_continuous_along_polyline(traj_pos, idxs, speed, dt, N):
    """Sample N points along a polyline, advancing speed*dt of arc length per sample.

    Arc length is measured in the x-y plane only. Once the end of the
    polyline is reached, the remaining samples stay (just before) the last
    vertex.

    Args:
        traj_pos: polyline vertices, one row per vertex.
        idxs: identifier of each vertex (same order as ``traj_pos``).
        speed, dt: travelled distance per sample is ``speed * dt``.
        N: number of samples.

    Returns:
        positions (N, 3) float32, a list of N ``(idx_start, idx_end)`` tuples
        naming the segment each sample lies on, the fractional position
        ``alpha`` in [0, 1) on that segment (float32), and the 2-D segment
        direction per sample (float32). A polyline with fewer than two
        vertices yields its first vertex repeated, zero alphas and zero
        directions.
    """
    waypoints = np.asarray(traj_pos, float)
    if waypoints.shape[0] < 2:
        held = np.repeat(waypoints[:1], N, axis=0)
        same_pair = [(idxs[0], idxs[0])] * N
        return (held.astype(np.float32), same_pair,
                np.zeros(N, float).astype(np.float32),
                np.zeros((N, 2), float).astype(np.float32))

    tiny = 1e-12
    hops = waypoints[1:] - waypoints[:-1]
    hop_len = np.linalg.norm(hops[:, :2], axis=1)
    n_seg = len(hop_len)

    # Arc length at every vertex, starting from 0 at the first one.
    arc_at = np.zeros(n_seg + 1, float)
    arc_at[1:] = np.cumsum(hop_len)

    # Target arc length per sample, clamped to just short of the total length.
    travelled = speed * dt * np.arange(N, dtype=float)
    travelled = np.clip(travelled, 0.0, max(arc_at[-1] - tiny, 0.0))

    # Segment index per sample; side='right' skips zero-length segments.
    seg = np.searchsorted(arc_at, travelled, side='right') - 1
    seg = np.clip(seg, 0, n_seg - 1)

    # Zero-length segments divide by 1.0 instead of 0.
    safe_len = np.where(hop_len > tiny, hop_len, 1.0)[seg]
    frac = np.clip((travelled - arc_at[seg]) / safe_len, 0.0, 1.0 - 1e-9)

    seg_start = waypoints[seg]
    seg_end = waypoints[seg + 1]
    pos_c = np.zeros((N, 3), float)
    pos_c[:] = seg_start + frac[:, None] * (seg_end - seg_start)

    headings = np.zeros((N, 2), float)
    headings[:] = hops[seg, :2] / safe_len[:, None]

    pairs = [(idxs[j], idxs[j + 1]) for j in seg.tolist()]

    return (pos_c.astype(np.float32), pairs,
            frac.astype(np.float32), headings.astype(np.float32))
| |
|
| | |
| |
|
def filter_road_positions(valid_positions, ROAD_WIDTH, ROAD_CENTER_SPACING):
    """Keep the positions that lie on a regular grid of roads and label them.

    Road centre lines are placed at multiples of ``ROAD_CENTER_SPACING`` in
    both x and y; a point is on a road when its offset from the nearest
    centre line is below ``ROAD_WIDTH / 2``.

    Returns:
        (road_positions, lane_info) where ``road_positions`` is an array of
        the kept points and ``lane_info`` maps ``tuple(point)`` to
        ``(direction, lane_type)`` with lane_type one of "vertical",
        "horizontal" or "intersection". The direction depends on which side
        of the centre line the point lies on.
    """
    half_width = ROAD_WIDTH / 2
    kept = []
    lane_info = {}

    for point in valid_positions:
        px, py, _ = point
        # Signed offset from the nearest road centre line on each axis.
        off_x = px - round(px / ROAD_CENTER_SPACING) * ROAD_CENTER_SPACING
        off_y = py - round(py / ROAD_CENTER_SPACING) * ROAD_CENTER_SPACING
        near_vertical = abs(off_x) < half_width
        near_horizontal = abs(off_y) < half_width

        if not (near_vertical or near_horizontal):
            continue

        if near_vertical and near_horizontal:
            heading, kind = ((0, 1) if off_x >= 0 else (0, -1)), "intersection"
        elif near_vertical:
            heading, kind = ((0, 1) if off_x >= 0 else (0, -1)), "vertical"
        else:
            heading, kind = ((1, 0) if off_y < 0 else (-1, 0)), "horizontal"

        lane_info[tuple(point)] = (heading, kind)
        kept.append(point)

    return np.array(kept), lane_info
| |
|
def create_grid_road_network(road_positions, lane_info, STEP_SIZE):
    """Build a directed graph connecting road points one grid step apart.

    Every position with an entry in ``lane_info`` becomes a node (keyed by
    its row index) carrying ``pos``, ``direction`` and ``lane_type``. An edge
    src -> dst is added when the two points are about ``STEP_SIZE`` apart
    (absolute tolerance 0.1) and the move is allowed: any axis-aligned move
    from an intersection node, otherwise only a move matching the node's
    lane direction. Edge weight is the point distance.

    Returns:
        (graph, road_positions) - the positions are returned unchanged.
    """
    axis_moves = [(0, 1), (0, -1), (1, 0), (-1, 0)]
    graph = nx.DiGraph()

    index_of = {tuple(point): row for row, point in enumerate(road_positions)}
    for key, node_id in index_of.items():
        if key not in lane_info:
            continue
        heading, kind = lane_info[key]
        graph.add_node(node_id, pos=np.array(key), direction=heading, lane_type=kind)

    locator = KDTree(road_positions)
    for src, src_pos in enumerate(road_positions):
        if src not in graph.nodes:
            continue
        src_kind = graph.nodes[src].get("lane_type", "vertical")

        for dst in locator.query_ball_point(src_pos, r=STEP_SIZE + 0.1):
            if dst == src or dst not in graph.nodes:
                continue
            dst_pos = road_positions[dst]
            gap = np.linalg.norm(src_pos - dst_pos)
            if not np.isclose(gap, STEP_SIZE, atol=0.1):
                continue

            # Sign of the displacement on each axis, e.g. (0, 1) = +y.
            step = (int(np.sign(dst_pos[0] - src_pos[0])),
                    int(np.sign(dst_pos[1] - src_pos[1])))
            if src_kind == "intersection":
                allowed = step in axis_moves
            else:
                allowed = step == graph.nodes[src]['direction']
            if allowed:
                graph.add_edge(src, dst, weight=gap)

    return graph, road_positions
| |
|
def generate_smooth_grid_trajectory(G, road_positions, TURN_PROBABILITY, sequence_length=12, start_node=None):
    """Walk the road graph while following lane directions.

    Starting at ``start_node`` (a random node when None), the walk moves to a
    successor of the current node, never stepping straight back to the node
    it just came from. On ordinary lane nodes it keeps to the lane direction
    when possible; on intersection nodes it continues in the incoming
    direction or turns, governed by a uniform draw against
    ``TURN_PROBABILITY``. The walk stops early when no move is available, so
    the result may be shorter than ``sequence_length``.

    Returns:
        Array of visited positions (rows of ``road_positions``); an empty
        (0, 3) array when the graph has no nodes and no start was given.
    """
    import numpy as np

    def heading_between(src, dst):
        # Sign of the displacement on each axis.
        return (int(np.sign(dst[0] - src[0])), int(np.sign(dst[1] - src[1])))

    def closest(options):
        # options holds (node, distance) pairs.
        return min(options, key=lambda pair: pair[1])[0]

    if start_node is None:
        available = list(G.nodes)
        if not available:
            return np.empty((0, 3))
        start_node = np.random.choice(available)

    path = [road_positions[start_node]]
    here, came_from = start_node, None

    for _ in range(sequence_length - 1):
        if here not in G:
            break
        options = list(G.neighbors(here))
        if came_from in options:
            options.remove(came_from)
        if not options:
            break

        attrs = G.nodes[here]
        at_intersection = attrs.get("lane_type", "vertical") == "intersection"

        if at_intersection and came_from is not None:
            # Going "straight" through an intersection means keeping the
            # direction of arrival.
            preferred = heading_between(np.array(G.nodes[came_from]["pos"]),
                                        np.array(attrs["pos"]))
        else:
            preferred = attrs.get("direction", None)

        here_pos = np.array(attrs["pos"])
        straight, turning = [], []
        for cand in options:
            cand_pos = np.array(G.nodes[cand]["pos"])
            entry = (cand, np.linalg.norm(cand_pos - here_pos))
            if heading_between(here_pos, cand_pos) == preferred:
                straight.append(entry)
            else:
                turning.append(entry)

        if at_intersection:
            draw = np.random.rand()
            if straight and draw > TURN_PROBABILITY:
                chosen = closest(straight)
            elif turning and draw < TURN_PROBABILITY:
                chosen = closest(turning)
            elif straight:
                chosen = closest(straight)
            elif turning:
                chosen = closest(turning)
            else:
                break
        elif straight:
            chosen = closest(straight)
        else:
            # No successor in the lane direction: take the nearest one.
            chosen = min(options, key=lambda n: np.linalg.norm(road_positions[n] - road_positions[here]))

        path.append(road_positions[chosen])
        came_from, here = here, chosen

    return np.array(path)
| |
|
def _fallback_anywalk(road_positions, step_size, length, start_idx=None):
    """Geometry-only random walk over the road points, ignoring lane rules.

    Each step prefers a random point roughly ``step_size`` away. When none
    exists, the nearest *other* point inside a wider search ball is taken.
    The walk ends early when no other point is reachable, so the result may
    be shorter than ``length``.

    Args:
        road_positions: array of candidate positions, one row per point.
        step_size: target distance between consecutive points.
        length: desired number of points in the walk.
        start_idx: row index to start from; random when None.

    Returns:
        Array of visited positions; an empty (0, 3) array when
        ``road_positions`` is empty.
    """
    import numpy as np
    from scipy.spatial import KDTree

    if len(road_positions) == 0:
        return np.empty((0, 3))

    if start_idx is None:
        start_idx = np.random.randint(0, len(road_positions))
    pos = road_positions[start_idx]
    traj = [pos]
    tree = KDTree(road_positions)

    step_tol = 0.15 * max(1.0, step_size)
    wide_radius = max(1.5 * step_size, step_size + 0.5)

    def others_within(radius):
        # Ball queries include the query point itself; drop it (and any
        # coincident duplicates) so the walk always moves.
        return [j for j in tree.query_ball_point(pos, r=radius)
                if not np.allclose(road_positions[j], pos)]

    for _ in range(length - 1):
        candidates = [
            j for j in others_within(step_size * 1.1)
            if np.isclose(np.linalg.norm(road_positions[j] - pos), step_size, atol=step_tol)
        ]
        if candidates:
            i = np.random.choice(candidates)
        else:
            # Previously the current point itself (distance 0) won this
            # "nearest" selection, freezing the walk in place.
            reachable = others_within(wide_radius)
            if not reachable:
                break
            i = min(reachable, key=lambda j: np.linalg.norm(road_positions[j] - pos))
        pos = road_positions[i]
        traj.append(pos)
    return np.array(traj)
| |
|
def generate_n_smooth_grid_trajectories(G, road_positions, n, sequence_length=12, TURN_PROBABILITY=.15, max_attempts=2000, step_size=1.0):
    """Collect up to ``n`` vehicle skeleton trajectories of ``sequence_length`` points.

    Each attempt picks a uniformly random x-y location inside the bounding
    box of the road points (z fixed to 0), snaps it to the nearest road
    point and runs the lane-aware walk from there. If that walk comes up
    short, the geometric fallback walk is tried from the same start. At most
    ``n * max_attempts`` attempts are made; a warning is printed when fewer
    than ``n`` trajectories result.

    Returns:
        List of arrays, each with exactly ``sequence_length`` rows.
    """
    import numpy as np
    from scipy.spatial import KDTree

    collected = []
    budget = n * max_attempts
    locator = KDTree(road_positions)
    x_lo, y_lo = np.min(road_positions[:, 0]), np.min(road_positions[:, 1])
    x_hi, y_hi = np.max(road_positions[:, 0]), np.max(road_positions[:, 1])

    tries = 0
    while len(collected) < n and tries < budget:
        tries += 1
        probe = [np.random.uniform(x_lo, x_hi), np.random.uniform(y_lo, y_hi), 0]
        seed_idx = locator.query(probe)[1]

        walk = generate_smooth_grid_trajectory(
            G, road_positions, TURN_PROBABILITY, sequence_length, start_node=seed_idx)
        if len(walk) < sequence_length:
            # Lane-constrained walk got stuck: retry without lane rules.
            walk = _fallback_anywalk(road_positions, step_size, sequence_length, start_idx=seed_idx)
        if len(walk) >= sequence_length:
            collected.append(walk[:sequence_length])

    if len(collected) < n:
        print(f"[warn] only {len(collected)} / {n} trajectories generated (skeleton).")
    return collected
| |
|
def generate_pedestrian_trajectory(valid_positions, sequence_length=10, step_size=2.5, angle_std=0.1, start=None):
    """Random-heading walk snapped to the available positions.

    The walker starts at ``start`` (a random valid position when None) with
    a uniformly random heading. At every step the heading is perturbed by
    Gaussian noise of standard deviation ``angle_std`` (radians), the walker
    advances ``step_size`` in the x-y plane, and the result is snapped to
    the nearest entry of ``valid_positions``.

    Returns:
        Array with ``sequence_length`` rows.
    """
    locator = KDTree(valid_positions)
    if start is None:
        start = valid_positions[np.random.choice(len(valid_positions))]

    heading = np.random.uniform(0, 2*np.pi)
    here = start
    path = [here]
    for _ in range(sequence_length - 1):
        heading += np.random.normal(0, angle_std)
        target = here + np.array([step_size*np.cos(heading), step_size*np.sin(heading), 0])
        nearest = locator.query(target)[1]
        here = valid_positions[nearest]
        path.append(here)
    return np.array(path)
| |
|
def generate_n_pedestrian_trajectories(valid_positions, n, sequence_length=10, step_size=2.5, angle_std=0.1):
    """Return a list of ``n`` independent pedestrian trajectories."""
    walks = []
    for _ in range(n):
        walks.append(
            generate_pedestrian_trajectory(valid_positions, sequence_length, step_size, angle_std)
        )
    return walks
| |
|
def get_trajectory_indices(trajectories, pos_total):
    """Map every trajectory point to its row index in ``pos_total``.

    Points are matched by exact coordinate equality; a point that does not
    appear in ``pos_total`` maps to -1. When a coordinate occurs several
    times in ``pos_total`` the last row wins.

    Returns:
        List (one entry per trajectory) of lists of indices.
    """
    row_of = {}
    for row, coords in enumerate(pos_total):
        row_of[tuple(coords)] = row
    return [[row_of.get(tuple(point), -1) for point in path] for path in trajectories]
| |
|
| | |
| |
|
def compute_cumulative_time(trajectories, speed_profile):
    """Accumulate travel time along each trajectory.

    The time of step ``t -> t+1`` is the Euclidean step length divided by
    ``speed_profile[s, t]``; a zero speed contributes zero time.

    Returns:
        Array of shape (n_trajectories, n_points) whose first column is 0.
    """
    paths = np.array(trajectories)
    speeds = np.array(speed_profile)
    n_seq, n_pts = paths.shape[0], paths.shape[1]

    elapsed = np.zeros((n_seq, n_pts))
    hop_len = np.sqrt(np.sum(np.diff(paths, axis=1) ** 2, axis=2))

    for row in range(n_seq):
        elapsed[row, 0] = 0.0
        row_speed = speeds[row, :n_pts - 1]
        # Divide only where the speed is non-zero; other entries stay at 0.
        hop_time = np.divide(hop_len[row], row_speed,
                             out=np.zeros(n_pts - 1), where=(row_speed != 0))
        elapsed[row, 1:] = np.cumsum(hop_time)
    return elapsed
| |
|
def compute_velocity_directions(trajectories, cum_times):
    """Derive unit velocity vectors, headings and speeds from timed trajectories.

    Velocities are finite differences of consecutive points divided by the
    time difference (clipped below at 1e-12). The value for the last point
    of each trajectory repeats the previous one.

    Returns:
        dirs (n, T, 3) unit direction vectors (zero where the speed is zero),
        angs (n, T) x-y heading in degrees, vmag (n, T) speed magnitude.
    """
    paths = np.array(trajectories)
    stamps = np.array(cum_times)
    n_seq, n_pts = paths.shape[0], paths.shape[1]

    unit = np.zeros((n_seq, n_pts, 3))
    heading_deg = np.zeros((n_seq, n_pts))
    speed = np.zeros((n_seq, n_pts))

    moves = np.diff(paths, axis=1)
    spans = np.diff(stamps, axis=1)

    for row in range(n_seq):
        step = moves[row]
        dx, dy, dz = step[:, 0], step[:, 1], step[:, 2]
        safe_dt = np.clip(spans[row], 1e-12, None)

        row_speed = np.sqrt(dx**2 + dy**2 + dz**2) / safe_dt
        vel = step[:, :3] / safe_dt[:, None]
        vel_norm = np.sqrt(vel[:, 0]**2 + vel[:, 1]**2 + vel[:, 2]**2)

        unit[row, :-1] = np.where(vel_norm[:, None] > 0, vel / vel_norm[:, None], 0)
        unit[row, -1] = unit[row, -2]
        speed[row, :-1] = row_speed
        speed[row, -1] = row_speed[-1]
        heading_deg[row, :-1] = np.degrees(np.arctan2(dy, dx))
        heading_deg[row, -1] = heading_deg[row, -2]

    return unit, heading_deg, speed
| |
|
def compute_speed_profile(traj, road_graph, road_positions, is_vehicle=True, speed_range=(5/3.6, 60/3.6)):
    """Assign one random constant speed to a trajectory, zeroed where it stands still.

    A single speed is drawn uniformly from ``speed_range``. Every sample
    whose x-y position equals that of the previous sample (within 1e-9)
    gets speed 0.

    Args:
        traj: trajectory points, one row per sample (at least x, y columns).
        road_graph, road_positions, is_vehicle: accepted for interface
            compatibility; not used by this function.
        speed_range: (low, high) bounds of the uniform draw.

    Returns:
        Array of ``len(traj)`` speeds.
    """
    traj = np.asarray(traj, float)
    N = len(traj)
    v = float(np.random.uniform(*speed_range))
    spd = np.full(N, v, float)
    if N > 1:
        # Per-step mask. The earlier np.allclose call collapsed the whole
        # trajectory to one boolean, so individual stops were never zeroed.
        stalled = np.all(np.isclose(traj[1:, :2], traj[:-1, :2], atol=1e-9), axis=1)
        spd[1:][stalled] = 0.0
    return spd
| |
|
def compute_pedestrian_speed_profile(traj, speed_range=(0.5, 2.0), tol=1e-3):
    """Assign one random walking speed to every step that actually moves.

    Entry ``i`` holds the speed for the step from sample ``i`` to ``i + 1``:
    the drawn speed when the two samples differ by more than ``tol``, else 0.
    The last entry is always 0.
    """
    traj = np.asarray(traj, float)
    N = traj.shape[0]
    spd = np.zeros(N, float)
    pace = float(np.random.uniform(*speed_range))
    for i, (here, ahead) in enumerate(zip(traj[:-1], traj[1:])):
        if not np.allclose(ahead, here, atol=tol):
            spd[i] = pace
    spd[-1] = 0.0
    return spd
| |
|
| | |
| |
|
def unwrap_angle_deg(a0, a1):
    """Return ``(a0, a1')`` where ``a1'`` is ``a1`` shifted by a multiple of 360.

    The shift is chosen so that ``a1' - a0`` lies in [-180, 180), i.e. the
    pair can be interpolated linearly along the shorter arc.
    """
    shortest_turn = ((a1 - a0 + 180.0) % 360.0) - 180.0
    unwrapped_end = a0 + shortest_turn
    return a0, unwrapped_end
| |
|
def interpolate_ray_params(deepmimo_data, idx0, idx1, alpha):
    """Blend the ray parameters of two users with weight ``alpha``.

    The paths of each user are ranked by descending power and paired
    rank-by-rank; only as many paths as the user with fewer paths are kept.
    Power and ToA are interpolated linearly, angles (degrees) along the
    shorter arc, and phase (degrees) along the shorter arc starting from
    the first user's phase. A path counts as LoS when it is LoS at either
    user.

    Args:
        deepmimo_data: dict with ``['user']['paths']`` indexable by user.
        idx0, idx1: indices of the two users.
        alpha: weight of user ``idx1`` (0 gives user ``idx0``).

    Returns:
        Dict with keys num_paths, DoD_theta, DoD_phi, DoA_theta, DoA_phi,
        phase, ToA, power, LoS. All arrays are empty and num_paths is 0
        when either user has no paths.
    """
    user_paths = deepmimo_data['user']['paths']
    ray_a = user_paths[idx0]
    ray_b = user_paths[idx1]
    n_a, n_b = int(ray_a['num_paths']), int(ray_b['num_paths'])

    if n_a == 0 or n_b == 0:
        return {
            'num_paths': 0,
            'DoD_theta': np.array([]), 'DoD_phi': np.array([]),
            'DoA_theta': np.array([]), 'DoA_phi': np.array([]),
            'phase': np.array([]), 'ToA': np.array([]), 'power': np.array([]),
            'LoS': np.array([], int),
        }

    # Strongest-first ordering of each user's paths, truncated to the
    # common path count.
    rank_a = np.argsort(-np.asarray(ray_a['power']).flatten())
    rank_b = np.argsort(-np.asarray(ray_b['power']).flatten())
    n_common = min(n_a, n_b)
    pick_a, pick_b = rank_a[:n_common], rank_b[:n_common]

    def column(ray, key, pick):
        return np.asarray(ray[key]).flatten()[pick].astype(float)

    def blend(key):
        first, second = column(ray_a, key, pick_a), column(ray_b, key, pick_b)
        return (1-alpha)*first + alpha*second

    def blend_angle(key):
        first, second = column(ray_a, key, pick_a), column(ray_b, key, pick_b)
        # Shift the second angle by a multiple of 360 so the difference
        # lies in [-180, 180) before interpolating.
        turn = ((second - first + 180.0) % 360.0) - 180.0
        return (1-alpha)*first + alpha*(first + turn)

    power = blend('power')
    ToA = blend('ToA')

    DoD_theta, DoD_phi = blend_angle('DoD_theta'), blend_angle('DoD_phi')
    DoA_theta, DoA_phi = blend_angle('DoA_theta'), blend_angle('DoA_phi')

    phase_a = np.deg2rad(column(ray_a, 'phase', pick_a))
    phase_b = np.deg2rad(column(ray_b, 'phase', pick_b))
    phase_turn = np.angle(np.exp(1j*(phase_b - phase_a)))
    phase = np.rad2deg(phase_a + alpha*phase_turn)

    los_votes = column(ray_a, 'LoS', pick_a).astype(int) + column(ray_b, 'LoS', pick_b).astype(int)
    LoS = (los_votes > 0).astype(int)

    return {
        'num_paths': n_common,
        'DoD_theta': DoD_theta, 'DoD_phi': DoD_phi,
        'DoA_theta': DoA_theta, 'DoA_phi': DoA_phi,
        'phase': phase, 'ToA': ToA, 'power': power, 'LoS': LoS,
    }
| |
|
| | |
| |
|
def array_response_phase(theta, phi, kd):
    """Per-axis imaginary phase terms for arrival/departure angles.

    For angles ``theta`` and ``phi`` (radians) and the scalar ``kd``,
    returns ``1j * kd * [sin(theta)cos(phi), sin(theta)sin(phi), cos(theta)]``
    with one row per angle pair and one column per axis (x, y, z).
    """
    in_plane = 1j * kd * np.sin(theta)
    along_x = in_plane * np.cos(phi)
    along_y = in_plane * np.sin(phi)
    along_z = 1j * kd * np.cos(theta)
    return np.vstack((along_x, along_y, along_z)).transpose()
| |
|
def array_response(ant_ind, theta, phi, kd):
    """Array response: ``exp(ant_ind @ phase_terms.T)``.

    ``ant_ind`` holds one (x, y, z) index row per antenna element; the
    result has one row per element and one column per angle pair.
    """
    phase_terms = array_response_phase(theta, phi, kd)
    exponent = ant_ind @ phase_terms.T
    return np.exp(exponent)
| |
|
def ant_indices(panel_size):
    """Enumerate (x, y, z) element indices of a planar antenna panel.

    Only ``panel_size[0]`` and ``panel_size[1]`` are read: the first is the
    element count along y, the second along z. The x index is always 0.
    The y index varies fastest.

    Returns:
        Integer array of shape (panel_size[0] * panel_size[1], 3).
    """
    n_y, n_z = panel_size[0], panel_size[1]
    x_idx = np.zeros(n_y * n_z, dtype=int)
    y_idx = np.tile(np.arange(n_y), n_z)
    z_idx = np.repeat(np.arange(n_z), n_y)
    return np.vstack((x_idx, y_idx, z_idx)).transpose()
| |
|
def apply_FoV(FoV, theta, phi):
    """Boolean mask of the paths that fall inside a field of view.

    Args:
        FoV: (azimuth span, elevation span) in degrees.
        theta, phi: angles in radians; both are wrapped to [0, 2*pi).

    A path is kept when ``phi`` is within half the azimuth span of 0 and
    ``theta`` is within half the elevation span of pi/2.
    """
    full_turn = 2*np.pi
    zenith = np.mod(theta, full_turn)
    azimuth = np.mod(phi, full_turn)
    spans = np.deg2rad(FoV)
    half_az, half_el = spans[0]/2, spans[1]/2

    # Azimuth window is centred on 0, so it wraps around 2*pi.
    azimuth_ok = np.logical_or(azimuth <= 0 + half_az, azimuth >= full_turn - half_az)
    zenith_ok = np.logical_and(zenith <= np.pi/2 + half_el, zenith >= np.pi/2 - half_el)
    return np.logical_and(azimuth_ok, zenith_ok)
| |
|
def rotate_angles(rotation, theta, phi):
    """Convert angles from degrees to radians and apply an optional rotation.

    Args:
        rotation: three rotation angles in degrees, or None for no rotation.
        theta, phi: angles in degrees.

    Returns:
        (theta, phi) in radians, expressed in the rotated frame when
        ``rotation`` is given.
    """
    zenith = np.deg2rad(theta)
    azimuth = np.deg2rad(phi)
    if rotation is None:
        return zenith, azimuth

    rot = np.deg2rad(rotation)
    # The third rotation angle acts as a plain azimuth offset.
    rel_az = azimuth - rot[2]
    sin_a, cos_a = np.sin(rel_az), np.cos(rel_az)
    sin_b, cos_b = np.sin(rot[1]), np.cos(rot[1])
    sin_g, cos_g = np.sin(rot[0]), np.cos(rot[0])
    sin_t, cos_t = np.sin(zenith), np.cos(zenith)

    new_zenith = np.arccos(cos_b*cos_g*cos_t + sin_t*(sin_b*cos_g*cos_a - sin_g*sin_a))
    new_azimuth = np.angle(
        cos_b*sin_t*cos_a - sin_b*cos_t
        + 1j*(cos_b*sin_g*cos_t + sin_t*(sin_b*sin_g*cos_a + cos_g*sin_a))
    )
    return new_zenith, new_azimuth
| |
|
class OFDM_PathGenerator:
    """Turns per-path ray data into per-path OFDM channel coefficients.

    ``generate`` is bound at construction to ``no_LPF`` or ``with_LPF``
    depending on the LPF flag in the OFDM parameters.
    """

    def __init__(self, params, subcarriers):
        """Store the parameters and precompute the delay-to-subcarrier matrix.

        Args:
            params: parameter dict; the OFDM sub-dict is read from it.
            subcarriers: indices of the subcarriers to generate.
        """
        self.params = params
        self.O = params[c.PARAMSET_OFDM]
        self.generate = self.no_LPF if self.O[c.PARAMSET_OFDM_LPF] == 0 else self.with_LPF
        self.subcarriers = subcarriers
        self.total_sc = self.O[c.PARAMSET_OFDM_SC_NUM]
        self.delay_d = np.arange(self.O['subcarriers'])
        # exp(-j*2*pi*d*k/N) for every delay tap d and subcarrier k.
        self.delay_to_OFDM = np.exp(-1j*2*np.pi/self.total_sc * np.outer(self.delay_d, self.subcarriers))

    def _doppler_phase(self, raydata):
        """Per-path Doppler phasor exp(-j*2*pi*(fc/c)*v*t), or None when disabled.

        Doppler is applied only when it is enabled both in the top-level
        parameters and in the scenario parameters. Velocity and elapsed time
        default to 0 when absent from ``raydata``.
        """
        if not (self.params[c.PARAMSET_DOPPLER_EN] and self.params[c.PARAMSET_SCENARIO_PARAMS][c.PARAMSET_SCENARIO_PARAMS_DOPPLER_EN]):
            return None
        fc = self.params[c.PARAMSET_SCENARIO_PARAMS][c.PARAMSET_SCENARIO_PARAMS_CF]
        v = np.asarray(raydata.get(c.OUT_PATH_DOP_VEL, 0.0)).reshape(-1, 1)
        t = np.asarray(raydata.get('elapsed_time', 0.0)).reshape(-1, 1)
        return np.exp(-1j*2*np.pi*(fc/c.LIGHTSPEED)*(v*t))

    def _path_terms(self, raydata, Ts):
        """Return (power, delay in samples, phase) as column vectors.

        Paths whose delay reaches or exceeds the subcarrier count get zero
        power and their delay pinned to that count.
        """
        # Copy the power: reshape() returns a view, and zeroing entries on
        # a view would silently modify the caller's ray data.
        power = np.array(raydata[c.OUT_PATH_RX_POW]).reshape(-1, 1)
        delay_n = (raydata[c.OUT_PATH_TOA] / Ts).reshape(-1, 1)
        phase = raydata[c.OUT_PATH_PHASE].reshape(-1, 1)
        over = (delay_n >= self.O['subcarriers'])
        power[over] = 0
        delay_n[over] = self.O['subcarriers']
        return power, delay_n, phase

    def no_LPF(self, raydata, Ts):
        """Per-path frequency response on the selected subcarriers.

        Returns an array with one row per path and one column per
        subcarrier, including the Doppler phasor when enabled.
        """
        power, delay_n, phase = self._path_terms(raydata, Ts)
        path_const = np.sqrt(power/self.total_sc) * np.exp(1j*(np.deg2rad(phase) - (2*np.pi/self.total_sc)*np.outer(delay_n, self.subcarriers)))
        DP = self._doppler_phase(raydata)
        if DP is not None:
            path_const *= DP
        return path_const

    def with_LPF(self, raydata, Ts):
        """Per-path sinc-shaped delay-domain taps.

        Returns an array with one row per path and one column per delay
        tap, including the Doppler phasor when enabled.
        """
        power, delay_n, phase = self._path_terms(raydata, Ts)
        pulse = np.sinc(self.delay_d - delay_n) * np.sqrt(power/self.total_sc) * np.exp(1j*np.deg2rad(phase))
        DP = self._doppler_phase(raydata)
        if DP is not None:
            pulse *= DP
        return pulse
| |
|
def generate_MIMO_channel(raydata, params, tx_ant_params, rx_ant_params):
    """Build the OFDM MIMO channel of every user from its ray data.

    For each user the path angles are rotated into the antenna frames,
    paths outside either field of view are dropped (the user's ray dict is
    updated in place with the surviving paths), and the remaining paths are
    combined through the TX/RX array responses.

    Args:
        raydata: sequence of per-user ray dicts.
        params: parameter dict (OFDM settings, Doppler flags).
        tx_ant_params, rx_ant_params: antenna dicts (shape, spacing,
            rotation, FoV).

    Returns:
        (channel, los): channel has shape (users, M_rx, M_tx, subcarriers),
        complex single precision. ``los`` is int8 per user: -1 when the
        user has no (surviving) paths, otherwise the sum of the 'LoS'
        entries of the surviving paths.
    """
    ofdm = params[c.PARAMSET_OFDM]
    bandwidth = ofdm[c.PARAMSET_OFDM_BW] * c.PARAMSET_OFDM_BW_MULT
    kd_tx = 2*np.pi*tx_ant_params[c.PARAMSET_ANT_SPACING]
    kd_rx = 2*np.pi*rx_ant_params[c.PARAMSET_ANT_SPACING]
    Ts = 1 / bandwidth
    subc = ofdm[c.PARAMSET_OFDM_SC_SAMP]
    path_gen = OFDM_PathGenerator(params, subc)

    M_tx = int(np.prod(tx_ant_params[c.PARAMSET_ANT_SHAPE]))
    ind_tx = ant_indices(tx_ant_params[c.PARAMSET_ANT_SHAPE])
    M_rx = int(np.prod(rx_ant_params[c.PARAMSET_ANT_SHAPE]))
    ind_rx = ant_indices(rx_ant_params[c.PARAMSET_ANT_SHAPE])

    n_users = len(raydata)
    channel = np.zeros((n_users, M_rx, M_tx, len(subc)), dtype=np.csingle)
    los = np.full(n_users, -2, dtype=np.int8)
    lpf_disabled = params[c.PARAMSET_OFDM][c.PARAMSET_OFDM_LPF] == 0

    for user, ray in enumerate(raydata):
        if ray[c.OUT_PATH_NUM] == 0:
            los[user] = -1
            continue

        dod_t, dod_p = rotate_angles(tx_ant_params[c.PARAMSET_ANT_ROTATION],
                                     ray[c.OUT_PATH_DOD_THETA], ray[c.OUT_PATH_DOD_PHI])
        doa_t, doa_p = rotate_angles(rx_ant_params[c.PARAMSET_ANT_ROTATION],
                                     ray[c.OUT_PATH_DOA_THETA], ray[c.OUT_PATH_DOA_PHI])
        seen_by_tx = apply_FoV(tx_ant_params[c.PARAMSET_ANT_FOV], dod_t, dod_p)
        seen_by_rx = apply_FoV(rx_ant_params[c.PARAMSET_ANT_FOV], doa_t, doa_p)
        keep = np.logical_and(seen_by_tx, seen_by_rx)
        dod_t, dod_p, doa_t, doa_p = dod_t[keep], dod_p[keep], doa_t[keep], doa_p[keep]

        # Filter every per-path array in the user's dict and refresh the
        # path count.
        for key in list(ray.keys()):
            value = ray[key]
            if key == c.OUT_PATH_NUM:
                ray[key] = keep.sum()
            elif isinstance(value, np.ndarray) and value.shape[0] == keep.shape[0]:
                ray[key] = value[keep]

        if ray[c.OUT_PATH_NUM] == 0:
            los[user] = -1
            continue
        los[user] = int(np.sum(ray.get('LoS', np.zeros(int(ray[c.OUT_PATH_NUM])))))

        aTX = array_response(ind_tx, dod_t, dod_p, kd_tx)
        aRX = array_response(ind_rx, doa_t, doa_p, kd_rx)
        path_const = path_gen.generate(ray, Ts)

        # Sum over paths of (RX response) x (TX response) x (path term).
        combined = np.sum(aRX[:, None, None, :] * aTX[None, :, None, :] * path_const.T[None, None, :, :], axis=3)
        if lpf_disabled:
            channel[user] = combined
        else:
            # Delay-domain taps are mapped onto the subcarriers.
            channel[user] = combined @ path_gen.delay_to_OFDM

    return channel, los
| |
|
def generate_channel_from_interpolated_ray(ray_interp, num_antennas_tx_hor, num_antennas_tx_vert, num_subcarriers, fc):
    """Compute the channel of a single interpolated ray set.

    Wraps ``ray_interp`` as a one-user ray-data array and calls
    ``generate_MIMO_channel`` with fixed settings: bandwidth value 1.92e-3
    (scaled by the bandwidth multiplier constant), all ``num_subcarriers``
    subcarriers, LPF off, Doppler on, half-wavelength-spaced isotropic
    antennas, TX rotation [0, 0, -135], a single RX antenna and full field
    of view.

    Returns:
        (channel, None, los) as produced by ``generate_MIMO_channel``.
    """
    n_paths = int(ray_interp['num_paths'])

    def optional(key):
        # Missing per-path quantities default to zeros.
        return np.asarray(ray_interp.get(key, np.zeros(n_paths)))

    ray = {
        c.OUT_PATH_NUM: n_paths,
        c.OUT_PATH_DOD_THETA: np.asarray(ray_interp['DoD_theta']),
        c.OUT_PATH_DOD_PHI: np.asarray(ray_interp['DoD_phi']),
        c.OUT_PATH_DOA_THETA: np.asarray(ray_interp['DoA_theta']),
        c.OUT_PATH_DOA_PHI: np.asarray(ray_interp['DoA_phi']),
        c.OUT_PATH_PHASE: np.asarray(ray_interp['phase']),
        c.OUT_PATH_TOA: np.asarray(ray_interp['ToA']),
        c.OUT_PATH_RX_POW: np.asarray(ray_interp['power']),
        'LoS': optional('LoS'),
        c.OUT_PATH_DOP_VEL: optional('Doppler_vel'),
        'elapsed_time': optional('elapsed_time'),
    }
    raydata = np.array([ray], dtype=object)

    ofdm_settings = {
        c.PARAMSET_OFDM_BW: 1.92e-3,
        c.PARAMSET_OFDM_SC_SAMP: np.arange(num_subcarriers),
        c.PARAMSET_OFDM_SC_NUM: num_subcarriers,
        c.PARAMSET_OFDM_LPF: 0,
        'subcarriers': num_subcarriers
    }
    scenario_settings = {
        c.PARAMSET_SCENARIO_PARAMS_DOPPLER_EN: True,
        c.PARAMSET_SCENARIO_PARAMS_CF: float(fc)
    }
    params = {
        c.PARAMSET_OFDM: ofdm_settings,
        c.PARAMSET_FDTD: True,
        c.PARAMSET_DOPPLER_EN: True,
        c.PARAMSET_SCENARIO_PARAMS: scenario_settings
    }

    def isotropic_panel(shape, rotation):
        return {
            c.PARAMSET_ANT_SHAPE: np.array(shape),
            c.PARAMSET_ANT_SPACING: 0.5,
            c.PARAMSET_ANT_ROTATION: np.array(rotation),
            c.PARAMSET_ANT_FOV: [360, 180],
            c.PARAMSET_ANT_RAD_PAT: 'isotropic'
        }

    tx = isotropic_panel([num_antennas_tx_hor, num_antennas_tx_vert, 1], [0, 0, -135])
    rx = isotropic_panel([1, 1, 1], [0, 0, 0])

    ch, los = generate_MIMO_channel(raydata, params, tx, rx)
    return ch, None, los
| |
|
| | |
| |
|
def _load_or_generate_deepmimo(scenario, num_antennas_tx_hor, num_antennas_tx_vert, num_subcarriers):
    """Return the DeepMIMO dataset for a scenario, cached as a pickle under ./data."""
    os.makedirs("data", exist_ok=True)
    fname = f"{scenario}_{num_antennas_tx_hor}_{num_antennas_tx_vert}_{num_subcarriers}.p"
    fpath = os.path.join("data", fname)

    def regenerate(done_message):
        data = DeepMIMO_data_gen(scenario, num_antennas_tx_hor, num_antennas_tx_vert, num_subcarriers, 1)[0]
        with open(fpath, 'wb') as f:
            pickle.dump(data, f)
        print(done_message)
        return data

    if not os.path.exists(fpath):
        print(f"Generating DeepMIMO data for scenario {scenario}...")
        return regenerate(f"DeepMIMO data saved to {fpath}")

    print(f"Loading cached DeepMIMO data from {fpath}")
    try:
        # NOTE(security): pickle.load can execute arbitrary code. Only use
        # cache files written by this program in a trusted directory.
        with open(fpath, 'rb') as f:
            return pickle.load(f)
    except (pickle.UnpicklingError, EOFError, ValueError) as e:
        print(f"Error loading cached data: {e}. Regenerating...")
        return regenerate(f"DeepMIMO data regenerated and saved to {fpath}")


def dynamic_scenario_gen(
    scenario,
    time_steps=20,
    num_antennas_rx=1,
    num_antennas_tx_hor=32,
    num_antennas_tx_vert=1,
    num_subcarriers=32,
    step_size=1.0,
    road_center_spacing=25,
    road_width=6,
    turn_probability=0.1,
    n_car=50,
    n_ped=10,
    max_attempts=300,
    angle_std=0.1,
    fc=3.5e9,
    save_filename="trajectory_doppler_data.csv",
    plot_background=True,
    auto_step_size=True,
    sample_dt=1e-3,
    car_speed_range=(5/3.6, 60/3.6),
    ped_speed_range=(0.5, 2.0),
    continuous_mode=True,
    cont_len=None
):
    """Generate vehicle and pedestrian trajectories with time-varying channels.

    Pipeline: load (or generate and cache) the DeepMIMO data, infer the grid
    step, build the road graph, create ``n_car`` vehicle and ``n_ped``
    pedestrian skeleton trajectories, resample each one continuously at
    constant speed every ``sample_dt`` seconds, and compute the channel at
    every sample from ray parameters interpolated between the two
    neighbouring grid users.

    Args:
        scenario: DeepMIMO scenario name (also part of the cache file name).
        time_steps: number of points per skeleton trajectory.
        num_antennas_rx: not used by this function.
        num_antennas_tx_hor, num_antennas_tx_vert: TX panel dimensions.
        num_subcarriers: number of OFDM subcarriers.
        step_size: grid step used when ``auto_step_size`` is False.
        road_center_spacing, road_width: geometry of the assumed road grid.
        turn_probability: turning probability at intersections.
        n_car, n_ped: number of vehicle / pedestrian trajectories requested.
        max_attempts: attempts per requested vehicle trajectory.
        angle_std: heading noise of the pedestrian walk.
        fc: carrier frequency passed to the channel generation.
        save_filename: not used by this function.
        plot_background: when True, save a scatter plot of the user grid
            to figs/environment.png.
        auto_step_size: use the inferred grid step instead of ``step_size``.
        sample_dt: time between continuous samples.
        car_speed_range, ped_speed_range: (low, high) for the uniform speed
            draw of each track.
        continuous_mode: selects which data the generic output keys
            ("channel", "pos", ...) refer to.
        cont_len: number of continuous samples per track; defaults to
            ``time_steps``.

    Returns:
        Dict with scenario metadata, the ``*_cont`` continuous-track arrays
        and lists, and the discrete skeletons. ``channel_discrete`` is
        allocated as zeros and is not filled in by this function. When a
        track list is empty (e.g. ``n_ped=0``) the corresponding arrays have
        a leading dimension of 0.
    """
    deepmimo_data = _load_or_generate_deepmimo(
        scenario, num_antennas_tx_hor, num_antennas_tx_vert, num_subcarriers
    )

    path_exist = np.array(deepmimo_data['user']['LoS'])
    pos_total = np.array(deepmimo_data['user']['location'])

    if plot_background:
        x, y = pos_total[:, 0], pos_total[:, 1]
        cols = {0: 'orange', -1: 'white', 1: 'gray'}
        plt.figure(figsize=(8, 8), dpi=160)
        for v, col in cols.items():
            m = path_exist == v
            plt.scatter(x[m], y[m], s=3, c=col, alpha=0.6, label=f"LoS={v}")
        plt.legend()
        plt.grid(True)
        os.makedirs("figs", exist_ok=True)
        plt.savefig("figs/environment.png")
        plt.close()

    inferred_step = infer_grid_step(pos_total)
    STEP = float(inferred_step if auto_step_size else step_size)
    print(f"[dynamic_scenario_gen] road step size used: {STEP:.6f} m (inferred={inferred_step:.6f} m)")

    # Users with LoS status 0 or 1 are the usable positions.
    valid_positions = pos_total[(path_exist == 0) | (path_exist == 1)]
    road_positions, lanes = filter_road_positions(valid_positions, road_width, road_center_spacing)
    road_graph, road_positions = create_grid_road_network(road_positions, lanes, STEP)
    print("Total Nodes in Graph:", len(road_graph.nodes()))

    # step_size=STEP keeps the geometric fallback walk on the same grid
    # spacing as the road graph (it otherwise defaults to 1.0).
    vehicle_trajs = generate_n_smooth_grid_trajectories(
        road_graph, road_positions, n_car, sequence_length=time_steps,
        TURN_PROBABILITY=turn_probability, max_attempts=max_attempts,
        step_size=STEP
    )
    ped_trajs = generate_n_pedestrian_trajectories(
        valid_positions, n_ped, sequence_length=time_steps,
        step_size=STEP, angle_std=angle_std
    )
    veh_idx = get_trajectory_indices(vehicle_trajs, pos_total)
    ped_idx = get_trajectory_indices(ped_trajs, pos_total)

    Mtx = num_antennas_tx_hor * num_antennas_tx_vert
    channel_discrete = np.zeros((n_car+n_ped, time_steps, Mtx, num_subcarriers), np.complex128)

    if cont_len is None:
        cont_len = time_steps

    def build_tracks(trajs, idxs, speed_rng, N, dt):
        # One constant-speed continuous track per skeleton trajectory.
        tracks = []
        for traj_pos, idl in zip(trajs, idxs):
            v = float(np.random.uniform(*speed_rng))
            pos_c, pairs, alpha, vdir = sample_continuous_along_polyline(traj_pos, idl, v, dt, N)
            tracks.append(dict(speed=v, pos=pos_c, pairs=pairs, alpha=alpha, vdir=vdir))
        return tracks

    car_tracks = build_tracks(vehicle_trajs, veh_idx, car_speed_range, cont_len, sample_dt)
    ped_tracks = build_tracks(ped_trajs, ped_idx, ped_speed_range, cont_len, sample_dt)

    def channels_for_tracks(tracks):
        # Channel, kinematics and per-path diagnostics for every track.
        if not tracks:
            # np.stack cannot handle an empty list: return shaped empties.
            # Single-precision dtypes avoid upcasting on concatenation.
            return (np.zeros((0, cont_len, Mtx, num_subcarriers), np.csingle),
                    np.zeros((0, cont_len, 3), np.float32),
                    np.zeros((0, cont_len), np.float32),
                    np.zeros((0, cont_len), np.float32),
                    [], [], [],
                    np.zeros((0, cont_len), np.float32))

        ch_all, pos_all, v_all, a_all, dop_all, ang_all, del_all, step_xy_all = [], [], [], [], [], [], [], []
        for tr in tracks:
            v = tr["speed"]
            pos = tr["pos"]
            pairs = tr["pairs"]
            alpha = tr["alpha"]
            vdir = tr["vdir"]
            ch_list, dop_list, ang_list, del_list = [], [], [], []
            vel_series = np.full(len(alpha), v, float)
            acc_series = np.zeros_like(vel_series)

            # x-y distance actually covered between consecutive samples.
            step_xy = np.zeros(len(alpha), float)
            step_xy[1:] = np.linalg.norm(pos[1:, :2] - pos[:-1, :2], axis=1)

            for k in range(len(alpha)):
                i0, i1 = pairs[k]
                a = float(alpha[k])
                vd = vdir[k]
                ir = interpolate_ray_params(deepmimo_data, i0, i1, a)
                if ir['num_paths'] == 0:
                    ch_list.append(np.zeros((Mtx, num_subcarriers), np.complex128))
                    dop_list.append(np.zeros((0,), np.float32))
                    ang_list.append(np.zeros((0,), np.float32))
                    del_list.append(np.zeros((0,), np.float32))
                    continue

                # Project the velocity onto each path's arrival direction
                # (azimuth only) to get the per-path Doppler velocity.
                doa_phi = np.deg2rad(ir['DoA_phi'])
                aoa_unit = np.stack([np.cos(doa_phi), np.sin(doa_phi)], axis=1)
                v_proj = -np.sum(aoa_unit * vd[None, :], axis=1) * v

                ir['Doppler_vel'] = v_proj.astype(np.float32)
                ir['elapsed_time'] = np.ones_like(ir['power'], dtype=np.float32) * (k * sample_dt)

                pred, _, _ = generate_channel_from_interpolated_ray(ir, num_antennas_tx_hor, num_antennas_tx_vert, num_subcarriers, fc)
                ch_list.append(np.asarray(pred[0]).squeeze(0))
                dop_list.append(v_proj.astype(np.float32))
                ang_list.append(ir['DoA_phi'].astype(np.float32))
                del_list.append(ir['ToA'].astype(np.float32))

            ch_all.append(np.stack(ch_list, axis=0))
            pos_all.append(pos.astype(np.float32))
            v_all.append(vel_series.astype(np.float32))
            a_all.append(acc_series.astype(np.float32))
            dop_all.append(dop_list)
            ang_all.append(ang_list)
            del_all.append(del_list)
            step_xy_all.append(step_xy.astype(np.float32))

        return (np.stack(ch_all, axis=0),
                np.stack(pos_all, axis=0),
                np.stack(v_all, axis=0),
                np.stack(a_all, axis=0),
                dop_all, ang_all, del_all,
                np.stack(step_xy_all, axis=0))

    car_ch, car_pos, car_vel, car_acc, car_dop, car_ang, car_del, car_step = channels_for_tracks(car_tracks)
    ped_ch, ped_pos, ped_vel, ped_acc, ped_dop, ped_ang, ped_del, ped_step = channels_for_tracks(ped_tracks)

    # Vehicles first, pedestrians second, in every combined output.
    channel_cont = np.concatenate([car_ch, ped_ch], axis=0)
    pos_cont = np.concatenate([car_pos, ped_pos], axis=0)
    vel_cont = np.concatenate([car_vel, ped_vel], axis=0)
    acc_cont = np.concatenate([car_acc, ped_acc], axis=0)
    step_cont = np.concatenate([car_step, ped_step], axis=0)
    doppler_cont = car_dop + ped_dop
    angle_cont = car_ang + ped_ang
    delay_cont = car_del + ped_del

    out = {
        "scenario": scenario,
        "index_discrete": veh_idx + ped_idx,
        "grid_step": STEP,
        "sample_dt": sample_dt,
        "car_speed_range": car_speed_range,
        "ped_speed_range": ped_speed_range,
        "los": path_exist,

        "channel_cont": channel_cont,
        "pos_cont": pos_cont,
        "vel_cont": vel_cont,
        "acc_cont": acc_cont,
        "doppler_vel_cont": doppler_cont,
        "angle_cont": angle_cont,
        "delay_cont": delay_cont,
        "pos_step_xy": step_cont,

        "channel_discrete": channel_discrete,
        "pos_discrete": vehicle_trajs + ped_trajs
    }

    if continuous_mode:
        out["channel"] = channel_cont
        out["pos"] = pos_cont
        out["vel"] = vel_cont
        out["acc"] = acc_cont
        out["doppler_vel"] = doppler_cont
        out["angle"] = angle_cont
        out["delay"] = delay_cont
    else:
        out["channel"] = channel_discrete

    return out
| |
|
| | |
| |
|
def channel_dim_generator(seed=42, n_scenarios=2000, max_product=2**16,
                          out_csv='channel_dimensions.csv'):
    """Sample random channel-dimension sets and return them as a DataFrame.

    Each set is drawn by rejection sampling: ``time_steps`` is an integer in
    1..16, the antenna count is a power of two in 4..128 and the subcarrier
    count is a power of two in 16..512. A draw is kept only when the product
    of the three does not exceed ``max_product``. The antenna count is then
    split into ``(hor, vert)`` using the largest divisor ``vert`` that does
    not exceed ``sqrt(num_ant)``, i.e. the most balanced factorisation with
    ``hor >= vert``.

    Args:
        seed: Value passed to ``np.random.seed``. NOTE: this reseeds NumPy's
            *global* generator, exactly as the original implementation did.
        n_scenarios: Number of dimension sets to generate.
        max_product: Upper bound on ``time_steps * num_ant * num_subcarriers``.
        out_csv: Path of the CSV file to write, or ``None`` to skip writing.

    Returns:
        pandas.DataFrame with columns ``time_steps``, ``num_antennas_tx_hor``,
        ``num_antennas_tx_vert``, ``num_subcarriers`` and ``product``; rows are
        shuffled.

    Raises:
        ValueError: If ``max_product`` is below the smallest product the
            sampler can produce, which would make the loop never terminate.
    """
    import pandas as pd

    # Smallest reachable product: 1 time step * 2**2 antennas * 2**4 subcarriers.
    min_product = 1 * 2**2 * 2**4
    if max_product < min_product:
        raise ValueError(
            f"max_product must be at least {min_product}, got {max_product}"
        )

    columns = ['time_steps', 'num_antennas_tx_hor', 'num_antennas_tx_vert',
               'num_subcarriers', 'product']

    np.random.seed(seed)
    dims = []
    while len(dims) < n_scenarios:
        # The three draws must stay in this order so a given seed keeps
        # producing the same table as before.
        time_steps = 2**4 - np.random.randint(0, 16)
        num_ant = 2**(7 - np.random.randint(0, 6))
        num_sc = 2**(9 - np.random.randint(0, 6))
        prod = time_steps * num_ant * num_sc
        if prod > max_product:
            continue

        # Largest divisor not exceeding sqrt(num_ant); 1 always qualifies.
        vert = max(d for d in range(1, int(np.sqrt(num_ant)) + 1)
                   if num_ant % d == 0)
        dims.append([time_steps, num_ant // vert, vert, num_sc, prod])

    # reshape keeps the column count even when no rows were requested.
    arr = np.array(dims).reshape(-1, len(columns))
    np.random.shuffle(arr)

    df = pd.DataFrame(arr, columns=columns)
    if out_csv is not None:
        df.to_csv(out_csv, index=False)
    print(f"Generated {len(df)} sets.")
    return df
| |
|
def get_channel_dimensions(df, i):
    """Return row ``i`` of a channel-dimension table as a dict of ints.

    The first four columns of the row are read positionally and mapped to
    ``time_steps``, ``num_antennas_tx_hor``, ``num_antennas_tx_vert`` and
    ``num_subcarriers``.

    When ``i`` falls outside ``0 .. len(df) - 1`` no exception is raised;
    an error *string* describing the valid range is returned instead, so
    callers have to check the return type.
    """
    n_rows = len(df)
    if i < 0 or i >= n_rows:
        return f"Error: Index {i} is out of range. Valid range is 0 to {n_rows-1}"

    field_names = (
        'time_steps',
        'num_antennas_tx_hor',
        'num_antennas_tx_vert',
        'num_subcarriers',
    )
    row_values = df.iloc[i, :4].tolist()
    return {name: int(row_values[col]) for col, name in enumerate(field_names)}
| |
|
| | import numpy as np |
| | import matplotlib.pyplot as plt |
| |
|
def _figure_to_rgb_frame(fig, imageio):
    """Rasterize ``fig`` and return its pixels as an (H, W, 3) uint8 array."""
    fig.canvas.draw()
    try:
        # The canvas buffer already carries its own (H, W, 4) shape, so no
        # manual reshape from get_width_height() is needed (the two can
        # disagree on HiDPI canvases). Copy so the frame outlives the figure.
        rgba = np.asarray(fig.canvas.buffer_rgba())
        return rgba[..., :3].copy()
    except Exception:
        # Deliberate best-effort fallback for canvases without an RGBA
        # buffer: round-trip through an in-memory PNG. No tight bounding box
        # here, so every frame keeps the same pixel size.
        import io
        with io.BytesIO() as bio:
            fig.savefig(bio, format="png")
            bio.seek(0)
            return np.asarray(imageio.imread(bio))[..., :3]


def make_channel_gif(
    scenario_data,
    sample_idx=0,
    out_gif="channel_evolution.gif",
    mode="heatmap",
    tx_shape=None,
    fps=12,
    clim=None,
    downsample_every=1,
    annotate=True,
    figsize=(6,4),
    cmap="gray_r"
):
    """
    Create a GIF visualizing channel evolution for a single trajectory.

    scenario_data["channel"] can be (N, T, M_tx, N_sc) or (N, T, M_rx, M_tx, N_sc);
    a 5-D channel is averaged over its M_rx axis before plotting.
    Optional: scenario_data["pos"] (N, T, 3), scenario_data["vel"] (N, T), scenario_data["sample_dt"].

    Args:
        scenario_data: mapping holding "channel" and the optional keys above.
        sample_idx: index of the trajectory (first axis) to render.
        out_gif: output filename.
        mode: "heatmap" plots |H| in dB over (Tx element, subcarrier);
            "angle-delay" averages over the second tx_shape axis, applies an
            IFFT over subcarriers and an FFT over the first tx_shape axis,
            and plots the resulting magnitude in dB.
        tx_shape: (num_antennas_tx_hor, num_antennas_tx_vert); required for
            mode="angle-delay" and must multiply to M_tx.
        fps: frames per second handed to the GIF writer.
        clim: optional (vmin, vmax) colour limits in dB, applied in both
            modes. When None, "heatmap" uses the 5th/95th percentile of the
            whole trajectory and "angle-delay" autoscales every frame.
        downsample_every: render every Nth time sample (values < 1 act as 1).
        annotate: add velocity to the title and position as an overlay when
            the corresponding arrays are present.
        figsize: matplotlib figure size in inches.
        cmap: matplotlib colormap name.

    Returns:
        out_gif, the path that was written.

    Raises:
        ImportError: imageio is not importable.
        ValueError: unknown mode, tx_shape missing/mismatched for
            "angle-delay", or the selected trajectory has no time samples.
    """
    try:
        import imageio.v2 as imageio
    except Exception as err:
        raise ImportError("imageio is required. Install with: pip install imageio") from err

    ch = scenario_data["channel"]
    if ch.ndim == 5:
        # Collapse the receive-antenna axis so both layouts plot the same way.
        ch = ch.mean(axis=2)
    ch = ch[sample_idx]
    T, Mtx, Nsc = ch.shape

    # Validate once, before any figure exists, so nothing has to be cleaned up.
    if mode not in ("heatmap", "angle-delay"):
        raise ValueError('mode must be "heatmap" or "angle-delay"')
    if mode == "angle-delay" and (tx_shape is None or np.prod(tx_shape) != Mtx):
        raise ValueError(
            'mode="angle-delay" needs tx_shape=(num_antennas_tx_hor, num_antennas_tx_vert) '
            "matching M_tx."
        )
    if T == 0:
        raise ValueError("channel has no time samples to animate")

    pos = scenario_data.get("pos", None)
    vel = scenario_data.get("vel", None)
    dt = scenario_data.get("sample_dt", None)

    pos_sample = pos[sample_idx] if isinstance(pos, np.ndarray) else None
    vel_sample = vel[sample_idx] if isinstance(vel, np.ndarray) else None

    eps = 1e-12  # floor that keeps log10 finite for zero-magnitude entries
    mag_db_all = None
    if mode == "heatmap":
        mag_db_all = 20.0 * np.log10(np.maximum(np.abs(ch), eps))

    if clim is not None:
        vmin, vmax = clim
    elif mode == "heatmap":
        vmin = float(np.percentile(mag_db_all, 5))
        vmax = float(np.percentile(mag_db_all, 95))
    else:
        # None lets imshow autoscale each angle-delay frame.
        vmin = vmax = None

    frames = []
    stride = max(1, int(downsample_every))

    for t in range(0, T, stride):
        fig, ax = plt.subplots(figsize=figsize, dpi=140)
        try:
            if mode == "heatmap":
                image = mag_db_all[t]
                xlabel, ylabel = "Subcarrier index", "Tx element"
                title = f"|H| (dB) — t={t}"
            else:
                Ht = ch[t].reshape(tx_shape[0], tx_shape[1], Nsc)
                Hh = Ht.mean(axis=1)
                # Subcarriers -> delay bins.
                Hd = np.fft.ifftshift(np.fft.ifft(np.fft.fftshift(Hh, axes=1), axis=1), axes=1)
                # First array axis -> angle bins, zero bin centred.
                Ha = np.fft.fftshift(np.fft.fft(Hd, axis=0), axes=0)
                image = 20.0 * np.log10(np.maximum(np.abs(Ha), eps))
                xlabel, ylabel = "Delay bin", "Angle bin"
                title = f"Angle–Delay power (dB) — t={t}"

            im = ax.imshow(image, aspect="auto", origin="lower",
                           vmin=vmin, vmax=vmax, cmap=cmap)
            ax.set_xlabel(xlabel)
            ax.set_ylabel(ylabel)
            if dt is not None:
                title += f" ({t*dt:.4f} s)"
            if annotate and vel_sample is not None:
                title += f", v={vel_sample[t]:.2f} m/s"
            ax.set_title(title)
            cbar = fig.colorbar(im, ax=ax, fraction=0.046, pad=0.04)
            cbar.set_label("dB")

            if annotate and pos_sample is not None:
                x, y = pos_sample[t, 0], pos_sample[t, 1]
                ax.text(
                    0.01, 0.99,
                    f"pos=({x:.2f}, {y:.2f})",
                    transform=ax.transAxes, ha="left", va="top",
                    fontsize=8, color="w",
                    bbox=dict(facecolor="k", alpha=0.35, pad=2, lw=0)
                )

            frames.append(_figure_to_rgb_frame(fig, imageio))
        finally:
            # Close even when rendering fails so figures never pile up.
            plt.close(fig)

    imageio.mimsave(out_gif, frames, fps=fps)
    return out_gif
| |
|
| | import numpy as np |
| | import matplotlib.pyplot as plt |
| | from matplotlib.animation import FuncAnimation, PillowWriter |
| |
|
| | def _angle_delay_transform(H_seq, tx_shape): |
| | """ |
| | H_seq: (T, M_tx, N_sc) complex channel for one UE |
| | tx_shape: (M_h, M_v), M_h * M_v == M_tx |
| | Returns: Had (T, M_h, M_v, N_sc) in angle–delay domain |
| | - Angle: 2D FFT over (M_h, M_v) with fftshift |
| | - Delay: IFFT over subcarriers |
| | """ |
| | T, Mtx, Nsc = H_seq.shape |
| | Mh, Mv = tx_shape |
| | if Mh * Mv != Mtx: |
| | raise ValueError(f"tx_shape {tx_shape} must multiply to {Mtx}") |
| | H4 = H_seq.reshape(T, Mh, Mv, Nsc) |
| | |
| | Ha = np.fft.fftshift(np.fft.fftn(H4, axes=(1, 2)), axes=(1, 2)) |
| | |
| | Had = np.fft.ifft(Ha, axis=-1) |
| | return Had |
| |
|
| | def _pick_topk_bins(Had, k): |
| | """ |
| | Had: (T, Mh, Mv, Nsc) |
| | Select top-k bins by mean |Had| over time. |
| | Returns: list of (ih, iv, idelay) sorted by descending mean magnitude. |
| | """ |
| | mag = np.abs(Had) |
| | mean_mag = mag.mean(axis=0) |
| | flat = mean_mag.reshape(-1) |
| | k = int(min(k, flat.size)) |
| | idx = np.argpartition(flat, -k)[-k:] |
| | idx = idx[np.argsort(-flat[idx])] |
| | bins = [np.unravel_index(i, mean_mag.shape) for i in idx] |
| | return bins |
| |
|
def make_topk_angle_delay_curves_gif(
    scenario_data,
    sample_idx=0,
    tx_shape=(32, 1),
    k=5,
    out_gif="topk_bins_evolution.gif",
    fps=12,
    unwrap_phase=True,
    downsample_every=1
):
    """
    Visualize the evolution of magnitude & phase for the top-k angle–delay bins (by time-avg magnitude).
    Produces a GIF with two subplots (magnitude/phase) where each bin's curve is revealed over time.

    Args:
        scenario_data: mapping that must contain "channel" and may contain "sample_dt"
            (seconds per time sample; 1.0 is used when absent). The selected trajectory
            may be (T, M_tx, N_sc) or (T, M_rx, M_tx, N_sc); in the latter case the
            M_rx axis is averaged, as make_channel_gif does.
        sample_idx: which UE trajectory to visualize
        tx_shape: (M_h, M_v) so that M_h * M_v == M_tx
        k: number of bins to track (must be >= 1)
        out_gif: output filename (GIF)
        fps: frames per second for the animation
        unwrap_phase: if True, show unwrapped phase (continuous); else raw angle in [-pi, pi]
        downsample_every: plot every Nth time sample to shorten very long sequences
            (values < 1 act as 1)

    Returns:
        out_gif (str): path to the saved GIF

    Raises:
        ValueError: k < 1, the trajectory has no time samples, or tx_shape does not
            multiply to M_tx (raised by _angle_delay_transform).
    """
    H = scenario_data["channel"][sample_idx]
    if H.ndim == 4:
        # Collapse the receive-antenna axis so the transform sees (T, M_tx, N_sc).
        H = H.mean(axis=1)

    T = H.shape[0]
    if T == 0:
        raise ValueError("channel has no time samples to animate")
    if int(k) < 1:
        raise ValueError("k must be at least 1")

    dt = float(scenario_data.get("sample_dt", 1.0))
    t_axis = np.arange(T) * dt

    Had = _angle_delay_transform(H, tx_shape)
    bins = _pick_topk_bins(Had, k)

    # One complex time series per selected bin -> (n_bins, T).
    series = np.stack([Had[:, ih, iv, idl] for (ih, iv, idl) in bins], axis=0)
    mags = np.abs(series)
    phases = np.angle(series)
    if unwrap_phase:
        # Unwrap on the full-rate series, before any downsampling.
        phases = np.unwrap(phases, axis=-1)

    stride = max(1, int(downsample_every))
    if stride > 1:
        mags = mags[:, ::stride]
        phases = phases[:, ::stride]
        t_axis = t_axis[::stride]
    Tds = t_axis.size

    fig, (ax_mag, ax_ph) = plt.subplots(1, 2, figsize=(10, 4), dpi=120)
    try:
        mag_lines = []
        ph_lines = []
        for i, (ih, iv, idl) in enumerate(bins):
            (ml,) = ax_mag.plot([], [], lw=1.5, label=f"bin {i}: (ah={ih}, av={iv}, τ={idl})")
            (pl,) = ax_ph.plot([], [], lw=1.5, label=f"bin {i}")
            mag_lines.append(ml)
            ph_lines.append(pl)

        ax_mag.set_title("Top-k angle–delay bins: magnitude vs time")
        ax_mag.set_xlabel("time (s)")
        ax_mag.set_ylabel("|H|")
        ax_mag.grid(True)
        ax_mag.legend(loc="best", fontsize=8)

        ax_ph.set_title("Top-k angle–delay bins: phase vs time")
        ax_ph.set_xlabel("time (s)")
        ax_ph.set_ylabel("phase (rad)" if unwrap_phase else "phase (wrapped)")
        ax_ph.grid(True)

        # Fix the axis limits up front so they do not jump while curves grow.
        mag_max = float(np.max(mags)) if mags.size else 1.0
        if mag_max <= 0.0:
            mag_max = 1.0  # avoid a zero-height y-range for an all-zero channel
        ax_mag.set_ylim(0, 1.05 * mag_max)

        if phases.size:
            ph_min, ph_max = float(np.min(phases)), float(np.max(phases))
            if ph_max - ph_min < 1e-6:
                ph_min -= 1.0
                ph_max += 1.0
            ax_ph.set_ylim(ph_min - 0.05 * abs(ph_min), ph_max + 0.05 * abs(ph_max))

        ax_mag.set_xlim(t_axis[0], t_axis[-1])
        ax_ph.set_xlim(t_axis[0], t_axis[-1])

        all_lines = mag_lines + ph_lines

        def _init():
            for ln in all_lines:
                ln.set_data([], [])
            return all_lines

        def _update(frame):
            # Reveal samples 0..frame of every curve.
            shown = t_axis[:frame+1]
            for i in range(len(bins)):
                mag_lines[i].set_data(shown, mags[i, :frame+1])
                ph_lines[i].set_data(shown, phases[i, :frame+1])
            fig.suptitle(f"Sample {sample_idx} — frame {frame+1}/{Tds}", fontsize=11)
            return all_lines

        ani = FuncAnimation(
            fig, _update, frames=Tds, init_func=_init,
            interval=1000.0 / fps, blit=False, repeat=False
        )
        ani.save(out_gif, writer=PillowWriter(fps=fps))
    finally:
        # Release the figure even when building or saving the animation fails.
        plt.close(fig)
    return out_gif
| |
|