lwm-temporal / LWMTemporal /data /deepmimo_adapter.py
wi-lab's picture
Update scenario gen and docs
95923eb
from __future__ import annotations
import logging
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Sequence, Union
import numpy as np # type: ignore[import]
_LOGGER = logging.getLogger(__name__)
try: # pragma: no cover - optional dependency
import deepmimo # type: ignore[import]
from deepmimo import config as deepmimo_config # type: ignore[import]
_HAS_DEEPMIMO_V4 = True
except Exception: # pragma: no cover - DeepMIMO v4 not installed
deepmimo = None # type: ignore[assignment]
deepmimo_config = None # type: ignore[assignment]
_HAS_DEEPMIMO_V4 = False
try: # pragma: no cover - legacy fallback
from input_preprocess import DeepMIMO_data_gen as _legacy_data_gen # type: ignore[import]
except Exception: # pragma: no cover - legacy loader unavailable
_legacy_data_gen = None
ArrayLike = Union[np.ndarray, "np.typing.NDArray[np.floating[Any]]"]
@dataclass
class _PathTable:
power: np.ndarray
phase: np.ndarray
delay: np.ndarray
aoa_az: np.ndarray
aoa_el: np.ndarray
aod_az: np.ndarray
aod_el: np.ndarray
interactions: np.ndarray
num_paths: np.ndarray
los_user: np.ndarray
locations: np.ndarray
class _LazyPathAccessor:
"""Lazy view over per-user path dictionaries compatible with v3 interface."""
def __init__(self, data: _PathTable) -> None:
self._data = data
def __len__(self) -> int:
return int(self._data.num_paths.shape[0])
def __getitem__(self, index: Union[int, slice, Sequence[int]]) -> Union[Dict[str, np.ndarray], List[Dict[str, np.ndarray]]]:
if isinstance(index, slice):
return [self[i] for i in range(*index.indices(len(self)))]
if isinstance(index, Sequence) and not isinstance(index, (str, bytes)):
return [self[int(i)] for i in index]
idx = int(index)
count = int(self._data.num_paths[idx])
if count <= 0:
empty = np.empty((0,), dtype=np.float32)
return {
"num_paths": 0,
"DoD_theta": empty,
"DoD_phi": empty,
"DoA_theta": empty,
"DoA_phi": empty,
"phase": empty,
"ToA": empty,
"power": empty,
"LoS": np.empty((0,), dtype=np.int32),
}
sl = slice(0, count)
interactions = np.asarray(self._data.interactions[idx, sl])
los_per_path = np.where(np.isnan(interactions), 0, (interactions == 0).astype(np.int32))
return {
"num_paths": count,
"DoD_theta": np.asarray(self._data.aod_el[idx, sl]),
"DoD_phi": np.asarray(self._data.aod_az[idx, sl]),
"DoA_theta": np.asarray(self._data.aoa_el[idx, sl]),
"DoA_phi": np.asarray(self._data.aoa_az[idx, sl]),
"phase": np.asarray(self._data.phase[idx, sl]),
"ToA": np.asarray(self._data.delay[idx, sl]),
"power": np.asarray(self._data.power[idx, sl]),
"LoS": los_per_path.astype(np.int32),
}
def _cast(array: ArrayLike, dtype: np.dtype[Any]) -> np.ndarray:
arr = np.asarray(array)
if arr.dtype == dtype:
return arr
return arr.astype(dtype, copy=True)
def _load_v4_dataset(
scenario: str,
*,
scenarios_dir: Optional[Path],
load_params: Optional[Dict[str, Any]],
max_paths: Optional[int],
array_dtype: np.dtype[Any],
logger: Optional[logging.Logger],
) -> Dict[str, Any]:
if not _HAS_DEEPMIMO_V4:
raise RuntimeError("DeepMIMO v4 package is not available in the current environment")
if scenarios_dir is not None:
deepmimo_config.set("scenarios_folder", str(scenarios_dir)) # type: ignore[attr-defined]
params = dict(load_params or {})
if max_paths is not None:
params.setdefault("max_paths", int(max_paths))
dataset = deepmimo.load(scenario, **params) # type: ignore[call-arg]
logger = logger or _LOGGER
logger.info(
"Loaded DeepMIMO v4 scenario '%s' with %s users and %s max paths", # pragma: no cover - logging
scenario,
getattr(dataset, "n_ue", "unknown"),
params.get("max_paths", "default"),
)
num_paths_raw = np.asarray(dataset.num_paths)
tx_axis: Optional[int] = None
if num_paths_raw.ndim > 1 and num_paths_raw.shape[0] > 1:
axes = tuple(range(1, num_paths_raw.ndim))
scores = num_paths_raw.sum(axis=axes)
tx_axis = int(np.argmax(scores))
def _select_tx(arr: Any, dtype: Optional[np.dtype[Any]] = None) -> np.ndarray:
out = np.asarray(arr)
if out.dtype == object:
out = np.stack([np.asarray(v) for v in out], axis=0)
if tx_axis is not None and out.ndim >= 1 and out.shape[0] == num_paths_raw.shape[0]:
out = out[tx_axis]
if dtype is not None:
out = out.astype(dtype, copy=False)
return out
num_paths = _select_tx(num_paths_raw, dtype=np.int32).reshape(-1)
power_db = _select_tx(dataset.power, dtype=array_dtype)
power = np.power(10.0, power_db / 10.0, dtype=array_dtype, casting="unsafe")
phase = _select_tx(dataset.phase, dtype=array_dtype)
delay = _select_tx(dataset.delay, dtype=array_dtype)
aoa_az = _select_tx(dataset.aoa_az, dtype=array_dtype)
aoa_el = _select_tx(dataset.aoa_el, dtype=array_dtype)
aod_az = _select_tx(dataset.aod_az, dtype=array_dtype)
aod_el = _select_tx(dataset.aod_el, dtype=array_dtype)
interactions = _select_tx(dataset.inter, dtype=array_dtype)
los_raw = getattr(dataset, "los", None)
if los_raw is None:
los_selected = np.zeros_like(num_paths, dtype=np.int8)
else:
los_selected = _select_tx(los_raw, dtype=np.int8)
locations = _select_tx(dataset.rx_pos, dtype=np.float32)
if locations.ndim == 1:
if locations.size % 3 == 0:
locations = locations.reshape(-1, 3)
else:
locations = locations.reshape(-1, 1)
if locations.ndim > 2:
locations = locations.reshape(locations.shape[0], -1)
path_table = _PathTable(
power=power,
phase=phase,
delay=delay,
aoa_az=aoa_az,
aoa_el=aoa_el,
aod_az=aod_az,
aod_el=aod_el,
interactions=interactions,
num_paths=num_paths,
los_user=los_selected.reshape(-1),
locations=locations,
)
# Help GC release original dataset arrays early
del dataset
user_payload = {
"paths": _LazyPathAccessor(path_table),
"LoS": path_table.los_user,
"location": path_table.locations,
}
return {
"user": user_payload,
"_path_data": path_table,
"_source": "deepmimo_v4",
}
def load_deepmimo_user_data(
scenario: str,
*,
scenarios_dir: Optional[Path] = None,
load_params: Optional[Dict[str, Any]] = None,
max_paths: Optional[int] = None,
array_dtype: np.dtype[Any] = np.float32,
logger: Optional[logging.Logger] = None,
) -> Dict[str, Any]:
"""Load DeepMIMO scenario data in a form compatible with legacy utilities.
The returned dictionary mimics the structure produced by DeepMIMO v3's
``DeepMIMO_data_gen`` so downstream utilities (e.g., dynamic scenario
generation) can operate without modification. When DeepMIMO v4 is not
available, the function falls back to the legacy generator if present.
"""
if _HAS_DEEPMIMO_V4:
return _load_v4_dataset(
scenario,
scenarios_dir=scenarios_dir,
load_params=load_params,
max_paths=max_paths,
array_dtype=array_dtype,
logger=logger,
)
if _legacy_data_gen is not None:
raise RuntimeError(
"DeepMIMO v4 is not installed. The repository still includes the legacy "
"DeepMIMO_data_gen interface, but integration parameters must be provided "
"explicitly. Please migrate to the official DeepMIMO package or invoke "
"DeepMIMO_data_gen directly from your own tooling."
)
raise RuntimeError(
"Neither DeepMIMO v4 nor the legacy DeepMIMO_data_gen function is available. "
"Please install the DeepMIMO package or provide the legacy generator."
)
__all__ = ["load_deepmimo_user_data"]