from __future__ import annotations
import logging
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Sequence, Union
import numpy as np # type: ignore[import]
# Module-level logger; the v4 loader falls back to it when no logger is passed in.
_LOGGER = logging.getLogger(__name__)

# DeepMIMO v4 is an optional dependency. On any import failure both names are
# bound to None and the availability flag is cleared so it can be checked
# before the package is used.
try:  # pragma: no cover - optional dependency
    import deepmimo  # type: ignore[import]
    from deepmimo import config as deepmimo_config  # type: ignore[import]

    _HAS_DEEPMIMO_V4 = True
except Exception:  # pragma: no cover - DeepMIMO v4 not installed
    deepmimo = None  # type: ignore[assignment]
    deepmimo_config = None  # type: ignore[assignment]
    _HAS_DEEPMIMO_V4 = False

# The legacy generator is optional as well; None marks it as unavailable.
try:  # pragma: no cover - legacy fallback
    from input_preprocess import DeepMIMO_data_gen as _legacy_data_gen  # type: ignore[import]
except Exception:  # pragma: no cover - legacy loader unavailable
    _legacy_data_gen = None

# Annotation alias for array-valued arguments (used by ``_cast``).
ArrayLike = Union[np.ndarray, "np.typing.NDArray[np.floating[Any]]"]
@dataclass
class _PathTable:
    """Bundle of arrays extracted from a DeepMIMO v4 dataset.

    ``_LazyPathAccessor`` indexes the per-path arrays as ``[user, path]`` and
    uses ``num_paths[user]`` as the number of leading path entries to expose.
    """

    # Path power on a linear scale (the loader converts from dB via 10 ** (x / 10)).
    power: np.ndarray
    # Path phase, taken from ``dataset.phase``.
    phase: np.ndarray
    # Path delay, taken from ``dataset.delay``; exposed as "ToA" by the accessor.
    delay: np.ndarray
    # Angle-of-arrival azimuth / elevation; exposed as "DoA_phi" / "DoA_theta".
    aoa_az: np.ndarray
    aoa_el: np.ndarray
    # Angle-of-departure azimuth / elevation; exposed as "DoD_phi" / "DoD_theta".
    aod_az: np.ndarray
    aod_el: np.ndarray
    # Interaction codes from ``dataset.inter``; the accessor flags a path as
    # line-of-sight when its code equals 0.
    interactions: np.ndarray
    # Flattened int32 array with the path count of every user.
    num_paths: np.ndarray
    # Flattened int8 per-user LoS indicator (all zeros when the dataset has no ``los``).
    los_user: np.ndarray
    # float32 positions from ``dataset.rx_pos``; the loader reshapes 1-D and
    # >2-D inputs to two dimensions (presumably one row per user - TODO confirm).
    locations: np.ndarray
class _LazyPathAccessor:
"""Lazy view over per-user path dictionaries compatible with v3 interface."""
def __init__(self, data: _PathTable) -> None:
self._data = data
def __len__(self) -> int:
return int(self._data.num_paths.shape[0])
def __getitem__(self, index: Union[int, slice, Sequence[int]]) -> Union[Dict[str, np.ndarray], List[Dict[str, np.ndarray]]]:
if isinstance(index, slice):
return [self[i] for i in range(*index.indices(len(self)))]
if isinstance(index, Sequence) and not isinstance(index, (str, bytes)):
return [self[int(i)] for i in index]
idx = int(index)
count = int(self._data.num_paths[idx])
if count <= 0:
empty = np.empty((0,), dtype=np.float32)
return {
"num_paths": 0,
"DoD_theta": empty,
"DoD_phi": empty,
"DoA_theta": empty,
"DoA_phi": empty,
"phase": empty,
"ToA": empty,
"power": empty,
"LoS": np.empty((0,), dtype=np.int32),
}
sl = slice(0, count)
interactions = np.asarray(self._data.interactions[idx, sl])
los_per_path = np.where(np.isnan(interactions), 0, (interactions == 0).astype(np.int32))
return {
"num_paths": count,
"DoD_theta": np.asarray(self._data.aod_el[idx, sl]),
"DoD_phi": np.asarray(self._data.aod_az[idx, sl]),
"DoA_theta": np.asarray(self._data.aoa_el[idx, sl]),
"DoA_phi": np.asarray(self._data.aoa_az[idx, sl]),
"phase": np.asarray(self._data.phase[idx, sl]),
"ToA": np.asarray(self._data.delay[idx, sl]),
"power": np.asarray(self._data.power[idx, sl]),
"LoS": los_per_path.astype(np.int32),
}
def _cast(array: ArrayLike, dtype: np.dtype[Any]) -> np.ndarray:
arr = np.asarray(array)
if arr.dtype == dtype:
return arr
return arr.astype(dtype, copy=True)
def _load_v4_dataset(
    scenario: str,
    *,
    scenarios_dir: Optional[Path],
    load_params: Optional[Dict[str, Any]],
    max_paths: Optional[int],
    array_dtype: np.dtype[Any],
    logger: Optional[logging.Logger],
) -> Dict[str, Any]:
    """Load ``scenario`` with DeepMIMO v4 and repackage it in a v3-like layout.

    Args:
        scenario: Scenario name handed to ``deepmimo.load``.
        scenarios_dir: When given, written to the DeepMIMO configuration key
            ``"scenarios_folder"`` before loading.
        load_params: Extra keyword arguments for ``deepmimo.load``. The mapping
            is copied, so the caller's dictionary is not modified.
        max_paths: Used as the ``max_paths`` load argument unless
            ``load_params`` already supplies that key.
        array_dtype: dtype applied to power, phase, delay, angle and
            interaction arrays.
        logger: Logger for the summary message; the module logger is used
            when this is falsy.

    Returns:
        A dictionary with ``"user"`` (holding ``"paths"``, ``"LoS"`` and
        ``"location"``), ``"_path_data"`` (the underlying ``_PathTable``) and
        ``"_source"`` (always ``"deepmimo_v4"``).

    Raises:
        RuntimeError: If the DeepMIMO v4 package could not be imported.
    """
    if not _HAS_DEEPMIMO_V4:
        raise RuntimeError("DeepMIMO v4 package is not available in the current environment")

    if scenarios_dir is not None:
        deepmimo_config.set("scenarios_folder", str(scenarios_dir))  # type: ignore[attr-defined]

    load_kwargs = dict(load_params or {})
    if max_paths is not None:
        # An explicit "max_paths" inside load_params takes precedence.
        load_kwargs.setdefault("max_paths", int(max_paths))

    dataset = deepmimo.load(scenario, **load_kwargs)  # type: ignore[call-arg]

    active_logger = logger or _LOGGER
    active_logger.info(
        "Loaded DeepMIMO v4 scenario '%s' with %s users and %s max paths",  # pragma: no cover - logging
        scenario,
        getattr(dataset, "n_ue", "unknown"),
        load_kwargs.get("max_paths", "default"),
    )

    num_paths_raw = np.asarray(dataset.num_paths)

    # With several entries along the leading axis (presumably one per
    # transmitter - TODO confirm), keep the entry with the largest path total.
    chosen_entry: Optional[int] = None
    if num_paths_raw.ndim > 1 and num_paths_raw.shape[0] > 1:
        trailing_axes = tuple(range(1, num_paths_raw.ndim))
        chosen_entry = int(np.argmax(num_paths_raw.sum(axis=trailing_axes)))

    def _pick(source: Any, dtype: Optional[np.dtype[Any]] = None) -> np.ndarray:
        """Convert ``source`` to an ndarray, narrow it to the chosen entry, cast it."""
        values = np.asarray(source)
        if values.dtype == object:
            values = np.stack([np.asarray(item) for item in values], axis=0)
        if chosen_entry is not None and values.ndim >= 1:
            # Only arrays whose leading axis matches num_paths are narrowed.
            if values.shape[0] == num_paths_raw.shape[0]:
                values = values[chosen_entry]
        if dtype is None:
            return values
        return values.astype(dtype, copy=False)

    num_paths = _pick(num_paths_raw, dtype=np.int32).reshape(-1)

    power_db = _pick(dataset.power, dtype=array_dtype)
    linear_power = np.power(10.0, power_db / 10.0, dtype=array_dtype, casting="unsafe")

    # Remaining per-path quantities, read in a fixed order from the dataset.
    per_path = {
        field: _pick(getattr(dataset, field), dtype=array_dtype)
        for field in ("phase", "delay", "aoa_az", "aoa_el", "aod_az", "aod_el", "inter")
    }

    los_source = getattr(dataset, "los", None)
    if los_source is not None:
        los_flags = _pick(los_source, dtype=np.int8)
    else:
        los_flags = np.zeros_like(num_paths, dtype=np.int8)

    locations = _pick(dataset.rx_pos, dtype=np.float32)
    if locations.ndim == 1:
        # Flat input: use rows of three values when the size allows, else one column.
        row_width = 3 if locations.size % 3 == 0 else 1
        locations = locations.reshape(-1, row_width)
    elif locations.ndim > 2:
        locations = locations.reshape(locations.shape[0], -1)

    path_table = _PathTable(
        power=linear_power,
        phase=per_path["phase"],
        delay=per_path["delay"],
        aoa_az=per_path["aoa_az"],
        aoa_el=per_path["aoa_el"],
        aod_az=per_path["aod_az"],
        aod_el=per_path["aod_el"],
        interactions=per_path["inter"],
        num_paths=num_paths,
        los_user=los_flags.reshape(-1),
        locations=locations,
    )

    # Help GC release original dataset arrays early
    del dataset

    return {
        "user": {
            "paths": _LazyPathAccessor(path_table),
            "LoS": path_table.los_user,
            "location": path_table.locations,
        },
        "_path_data": path_table,
        "_source": "deepmimo_v4",
    }
def load_deepmimo_user_data(
    scenario: str,
    *,
    scenarios_dir: Optional[Path] = None,
    load_params: Optional[Dict[str, Any]] = None,
    max_paths: Optional[int] = None,
    array_dtype: np.dtype[Any] = np.float32,
    logger: Optional[logging.Logger] = None,
) -> Dict[str, Any]:
    """Load DeepMIMO scenario data in a form compatible with legacy utilities.

    When the DeepMIMO v4 package is importable, all arguments are forwarded to
    the v4 loader and its result is returned. That result mimics the structure
    produced by DeepMIMO v3's ``DeepMIMO_data_gen`` so downstream utilities
    can keep using the ``"user"`` payload.

    Without DeepMIMO v4 no data is loaded. The legacy generator is only
    checked to choose the error message; it is never invoked from here.

    Raises:
        RuntimeError: If DeepMIMO v4 is not installed, whether or not the
            legacy generator is available.
    """
    if not _HAS_DEEPMIMO_V4:
        if _legacy_data_gen is None:
            raise RuntimeError(
                "Neither DeepMIMO v4 nor the legacy DeepMIMO_data_gen function is available. "
                "Please install the DeepMIMO package or provide the legacy generator."
            )
        raise RuntimeError(
            "DeepMIMO v4 is not installed. The repository still includes the legacy "
            "DeepMIMO_data_gen interface, but integration parameters must be provided "
            "explicitly. Please migrate to the official DeepMIMO package or invoke "
            "DeepMIMO_data_gen directly from your own tooling."
        )

    return _load_v4_dataset(
        scenario,
        scenarios_dir=scenarios_dir,
        load_params=load_params,
        max_paths=max_paths,
        array_dtype=array_dtype,
        logger=logger,
    )
# Public API of this module: only the user-data loader is exported.
__all__ = ["load_deepmimo_user_data"]
|