from __future__ import annotations import logging from dataclasses import dataclass from pathlib import Path from typing import Any, Dict, Iterable, List, Optional, Sequence, Union import numpy as np # type: ignore[import] _LOGGER = logging.getLogger(__name__) try: # pragma: no cover - optional dependency import deepmimo # type: ignore[import] from deepmimo import config as deepmimo_config # type: ignore[import] _HAS_DEEPMIMO_V4 = True except Exception: # pragma: no cover - DeepMIMO v4 not installed deepmimo = None # type: ignore[assignment] deepmimo_config = None # type: ignore[assignment] _HAS_DEEPMIMO_V4 = False try: # pragma: no cover - legacy fallback from input_preprocess import DeepMIMO_data_gen as _legacy_data_gen # type: ignore[import] except Exception: # pragma: no cover - legacy loader unavailable _legacy_data_gen = None ArrayLike = Union[np.ndarray, "np.typing.NDArray[np.floating[Any]]"] @dataclass class _PathTable: power: np.ndarray phase: np.ndarray delay: np.ndarray aoa_az: np.ndarray aoa_el: np.ndarray aod_az: np.ndarray aod_el: np.ndarray interactions: np.ndarray num_paths: np.ndarray los_user: np.ndarray locations: np.ndarray class _LazyPathAccessor: """Lazy view over per-user path dictionaries compatible with v3 interface.""" def __init__(self, data: _PathTable) -> None: self._data = data def __len__(self) -> int: return int(self._data.num_paths.shape[0]) def __getitem__(self, index: Union[int, slice, Sequence[int]]) -> Union[Dict[str, np.ndarray], List[Dict[str, np.ndarray]]]: if isinstance(index, slice): return [self[i] for i in range(*index.indices(len(self)))] if isinstance(index, Sequence) and not isinstance(index, (str, bytes)): return [self[int(i)] for i in index] idx = int(index) count = int(self._data.num_paths[idx]) if count <= 0: empty = np.empty((0,), dtype=np.float32) return { "num_paths": 0, "DoD_theta": empty, "DoD_phi": empty, "DoA_theta": empty, "DoA_phi": empty, "phase": empty, "ToA": empty, "power": empty, "LoS": np.empty((0,), dtype=np.int32), } sl = slice(0, count) interactions = np.asarray(self._data.interactions[idx, sl]) los_per_path = np.where(np.isnan(interactions), 0, (interactions == 0).astype(np.int32)) return { "num_paths": count, "DoD_theta": np.asarray(self._data.aod_el[idx, sl]), "DoD_phi": np.asarray(self._data.aod_az[idx, sl]), "DoA_theta": np.asarray(self._data.aoa_el[idx, sl]), "DoA_phi": np.asarray(self._data.aoa_az[idx, sl]), "phase": np.asarray(self._data.phase[idx, sl]), "ToA": np.asarray(self._data.delay[idx, sl]), "power": np.asarray(self._data.power[idx, sl]), "LoS": los_per_path.astype(np.int32), } def _cast(array: ArrayLike, dtype: np.dtype[Any]) -> np.ndarray: arr = np.asarray(array) if arr.dtype == dtype: return arr return arr.astype(dtype, copy=True) def _load_v4_dataset( scenario: str, *, scenarios_dir: Optional[Path], load_params: Optional[Dict[str, Any]], max_paths: Optional[int], array_dtype: np.dtype[Any], logger: Optional[logging.Logger], ) -> Dict[str, Any]: if not _HAS_DEEPMIMO_V4: raise RuntimeError("DeepMIMO v4 package is not available in the current environment") if scenarios_dir is not None: deepmimo_config.set("scenarios_folder", str(scenarios_dir)) # type: ignore[attr-defined] params = dict(load_params or {}) if max_paths is not None: params.setdefault("max_paths", int(max_paths)) dataset = deepmimo.load(scenario, **params) # type: ignore[call-arg] logger = logger or _LOGGER logger.info( "Loaded DeepMIMO v4 scenario '%s' with %s users and %s max paths", # pragma: no cover - logging scenario, getattr(dataset, "n_ue", "unknown"), params.get("max_paths", "default"), ) num_paths_raw = np.asarray(dataset.num_paths) tx_axis: Optional[int] = None if num_paths_raw.ndim > 1 and num_paths_raw.shape[0] > 1: axes = tuple(range(1, num_paths_raw.ndim)) scores = num_paths_raw.sum(axis=axes) tx_axis = int(np.argmax(scores)) def _select_tx(arr: Any, dtype: Optional[np.dtype[Any]] = None) -> np.ndarray: out = np.asarray(arr) if out.dtype == object: out = np.stack([np.asarray(v) for v in out], axis=0) if tx_axis is not None and out.ndim >= 1 and out.shape[0] == num_paths_raw.shape[0]: out = out[tx_axis] if dtype is not None: out = out.astype(dtype, copy=False) return out num_paths = _select_tx(num_paths_raw, dtype=np.int32).reshape(-1) power_db = _select_tx(dataset.power, dtype=array_dtype) power = np.power(10.0, power_db / 10.0, dtype=array_dtype, casting="unsafe") phase = _select_tx(dataset.phase, dtype=array_dtype) delay = _select_tx(dataset.delay, dtype=array_dtype) aoa_az = _select_tx(dataset.aoa_az, dtype=array_dtype) aoa_el = _select_tx(dataset.aoa_el, dtype=array_dtype) aod_az = _select_tx(dataset.aod_az, dtype=array_dtype) aod_el = _select_tx(dataset.aod_el, dtype=array_dtype) interactions = _select_tx(dataset.inter, dtype=array_dtype) los_raw = getattr(dataset, "los", None) if los_raw is None: los_selected = np.zeros_like(num_paths, dtype=np.int8) else: los_selected = _select_tx(los_raw, dtype=np.int8) locations = _select_tx(dataset.rx_pos, dtype=np.float32) if locations.ndim == 1: if locations.size % 3 == 0: locations = locations.reshape(-1, 3) else: locations = locations.reshape(-1, 1) if locations.ndim > 2: locations = locations.reshape(locations.shape[0], -1) path_table = _PathTable( power=power, phase=phase, delay=delay, aoa_az=aoa_az, aoa_el=aoa_el, aod_az=aod_az, aod_el=aod_el, interactions=interactions, num_paths=num_paths, los_user=los_selected.reshape(-1), locations=locations, ) # Help GC release original dataset arrays early del dataset user_payload = { "paths": _LazyPathAccessor(path_table), "LoS": path_table.los_user, "location": path_table.locations, } return { "user": user_payload, "_path_data": path_table, "_source": "deepmimo_v4", } def load_deepmimo_user_data( scenario: str, *, scenarios_dir: Optional[Path] = None, load_params: Optional[Dict[str, Any]] = None, max_paths: Optional[int] = None, array_dtype: np.dtype[Any] = np.float32, logger: Optional[logging.Logger] = None, ) -> Dict[str, Any]: """Load DeepMIMO scenario data in a form compatible with legacy utilities. The returned dictionary mimics the structure produced by DeepMIMO v3's ``DeepMIMO_data_gen`` so downstream utilities (e.g., dynamic scenario generation) can operate without modification. When DeepMIMO v4 is not available, the function falls back to the legacy generator if present. """ if _HAS_DEEPMIMO_V4: return _load_v4_dataset( scenario, scenarios_dir=scenarios_dir, load_params=load_params, max_paths=max_paths, array_dtype=array_dtype, logger=logger, ) if _legacy_data_gen is not None: raise RuntimeError( "DeepMIMO v4 is not installed. The repository still includes the legacy " "DeepMIMO_data_gen interface, but integration parameters must be provided " "explicitly. Please migrate to the official DeepMIMO package or invoke " "DeepMIMO_data_gen directly from your own tooling." ) raise RuntimeError( "Neither DeepMIMO v4 nor the legacy DeepMIMO_data_gen function is available. " "Please install the DeepMIMO package or provide the legacy generator." ) __all__ = ["load_deepmimo_user_data"]