| | from __future__ import annotations |
| |
|
| | import logging |
| | from dataclasses import dataclass |
| | from pathlib import Path |
| | from typing import Any, Dict, Iterable, List, Optional, Sequence, Union |
| |
|
| | import numpy as np |
| |
|
| | _LOGGER = logging.getLogger(__name__) |
| |
|
| | try: |
| | import deepmimo |
| | from deepmimo import config as deepmimo_config |
| |
|
| | _HAS_DEEPMIMO_V4 = True |
| | except Exception: |
| | deepmimo = None |
| | deepmimo_config = None |
| | _HAS_DEEPMIMO_V4 = False |
| |
|
| | try: |
| | from input_preprocess import DeepMIMO_data_gen as _legacy_data_gen |
| | except Exception: |
| | _legacy_data_gen = None |
| |
|
| | ArrayLike = Union[np.ndarray, "np.typing.NDArray[np.floating[Any]]"] |
| |
|
| |
|
| | @dataclass |
| | class _PathTable: |
| | power: np.ndarray |
| | phase: np.ndarray |
| | delay: np.ndarray |
| | aoa_az: np.ndarray |
| | aoa_el: np.ndarray |
| | aod_az: np.ndarray |
| | aod_el: np.ndarray |
| | interactions: np.ndarray |
| | num_paths: np.ndarray |
| | los_user: np.ndarray |
| | locations: np.ndarray |
| |
|
| |
|
| | class _LazyPathAccessor: |
| | """Lazy view over per-user path dictionaries compatible with v3 interface.""" |
| |
|
| | def __init__(self, data: _PathTable) -> None: |
| | self._data = data |
| |
|
| | def __len__(self) -> int: |
| | return int(self._data.num_paths.shape[0]) |
| |
|
| | def __getitem__(self, index: Union[int, slice, Sequence[int]]) -> Union[Dict[str, np.ndarray], List[Dict[str, np.ndarray]]]: |
| | if isinstance(index, slice): |
| | return [self[i] for i in range(*index.indices(len(self)))] |
| | if isinstance(index, Sequence) and not isinstance(index, (str, bytes)): |
| | return [self[int(i)] for i in index] |
| | idx = int(index) |
| | count = int(self._data.num_paths[idx]) |
| | if count <= 0: |
| | empty = np.empty((0,), dtype=np.float32) |
| | return { |
| | "num_paths": 0, |
| | "DoD_theta": empty, |
| | "DoD_phi": empty, |
| | "DoA_theta": empty, |
| | "DoA_phi": empty, |
| | "phase": empty, |
| | "ToA": empty, |
| | "power": empty, |
| | "LoS": np.empty((0,), dtype=np.int32), |
| | } |
| | sl = slice(0, count) |
| | interactions = np.asarray(self._data.interactions[idx, sl]) |
| | los_per_path = np.where(np.isnan(interactions), 0, (interactions == 0).astype(np.int32)) |
| | return { |
| | "num_paths": count, |
| | "DoD_theta": np.asarray(self._data.aod_el[idx, sl]), |
| | "DoD_phi": np.asarray(self._data.aod_az[idx, sl]), |
| | "DoA_theta": np.asarray(self._data.aoa_el[idx, sl]), |
| | "DoA_phi": np.asarray(self._data.aoa_az[idx, sl]), |
| | "phase": np.asarray(self._data.phase[idx, sl]), |
| | "ToA": np.asarray(self._data.delay[idx, sl]), |
| | "power": np.asarray(self._data.power[idx, sl]), |
| | "LoS": los_per_path.astype(np.int32), |
| | } |
| |
|
| |
|
| | def _cast(array: ArrayLike, dtype: np.dtype[Any]) -> np.ndarray: |
| | arr = np.asarray(array) |
| | if arr.dtype == dtype: |
| | return arr |
| | return arr.astype(dtype, copy=True) |
| |
|
| |
|
| | def _load_v4_dataset( |
| | scenario: str, |
| | *, |
| | scenarios_dir: Optional[Path], |
| | load_params: Optional[Dict[str, Any]], |
| | max_paths: Optional[int], |
| | array_dtype: np.dtype[Any], |
| | logger: Optional[logging.Logger], |
| | ) -> Dict[str, Any]: |
| | if not _HAS_DEEPMIMO_V4: |
| | raise RuntimeError("DeepMIMO v4 package is not available in the current environment") |
| |
|
| | if scenarios_dir is not None: |
| | deepmimo_config.set("scenarios_folder", str(scenarios_dir)) |
| |
|
| | params = dict(load_params or {}) |
| | if max_paths is not None: |
| | params.setdefault("max_paths", int(max_paths)) |
| |
|
| | dataset = deepmimo.load(scenario, **params) |
| | logger = logger or _LOGGER |
| | logger.info( |
| | "Loaded DeepMIMO v4 scenario '%s' with %s users and %s max paths", |
| | scenario, |
| | getattr(dataset, "n_ue", "unknown"), |
| | params.get("max_paths", "default"), |
| | ) |
| |
|
| | num_paths_raw = np.asarray(dataset.num_paths) |
| | tx_axis: Optional[int] = None |
| | if num_paths_raw.ndim > 1 and num_paths_raw.shape[0] > 1: |
| | axes = tuple(range(1, num_paths_raw.ndim)) |
| | scores = num_paths_raw.sum(axis=axes) |
| | tx_axis = int(np.argmax(scores)) |
| |
|
| | def _select_tx(arr: Any, dtype: Optional[np.dtype[Any]] = None) -> np.ndarray: |
| | out = np.asarray(arr) |
| | if out.dtype == object: |
| | out = np.stack([np.asarray(v) for v in out], axis=0) |
| | if tx_axis is not None and out.ndim >= 1 and out.shape[0] == num_paths_raw.shape[0]: |
| | out = out[tx_axis] |
| | if dtype is not None: |
| | out = out.astype(dtype, copy=False) |
| | return out |
| |
|
| | num_paths = _select_tx(num_paths_raw, dtype=np.int32).reshape(-1) |
| | power_db = _select_tx(dataset.power, dtype=array_dtype) |
| | power = np.power(10.0, power_db / 10.0, dtype=array_dtype, casting="unsafe") |
| | phase = _select_tx(dataset.phase, dtype=array_dtype) |
| | delay = _select_tx(dataset.delay, dtype=array_dtype) |
| | aoa_az = _select_tx(dataset.aoa_az, dtype=array_dtype) |
| | aoa_el = _select_tx(dataset.aoa_el, dtype=array_dtype) |
| | aod_az = _select_tx(dataset.aod_az, dtype=array_dtype) |
| | aod_el = _select_tx(dataset.aod_el, dtype=array_dtype) |
| | interactions = _select_tx(dataset.inter, dtype=array_dtype) |
| | los_raw = getattr(dataset, "los", None) |
| | if los_raw is None: |
| | los_selected = np.zeros_like(num_paths, dtype=np.int8) |
| | else: |
| | los_selected = _select_tx(los_raw, dtype=np.int8) |
| | locations = _select_tx(dataset.rx_pos, dtype=np.float32) |
| | if locations.ndim == 1: |
| | if locations.size % 3 == 0: |
| | locations = locations.reshape(-1, 3) |
| | else: |
| | locations = locations.reshape(-1, 1) |
| | if locations.ndim > 2: |
| | locations = locations.reshape(locations.shape[0], -1) |
| |
|
| | path_table = _PathTable( |
| | power=power, |
| | phase=phase, |
| | delay=delay, |
| | aoa_az=aoa_az, |
| | aoa_el=aoa_el, |
| | aod_az=aod_az, |
| | aod_el=aod_el, |
| | interactions=interactions, |
| | num_paths=num_paths, |
| | los_user=los_selected.reshape(-1), |
| | locations=locations, |
| | ) |
| |
|
| | |
| | del dataset |
| |
|
| | user_payload = { |
| | "paths": _LazyPathAccessor(path_table), |
| | "LoS": path_table.los_user, |
| | "location": path_table.locations, |
| | } |
| |
|
| | return { |
| | "user": user_payload, |
| | "_path_data": path_table, |
| | "_source": "deepmimo_v4", |
| | } |
| |
|
| |
|
| | def load_deepmimo_user_data( |
| | scenario: str, |
| | *, |
| | scenarios_dir: Optional[Path] = None, |
| | load_params: Optional[Dict[str, Any]] = None, |
| | max_paths: Optional[int] = None, |
| | array_dtype: np.dtype[Any] = np.float32, |
| | logger: Optional[logging.Logger] = None, |
| | ) -> Dict[str, Any]: |
| | """Load DeepMIMO scenario data in a form compatible with legacy utilities. |
| | |
| | The returned dictionary mimics the structure produced by DeepMIMO v3's |
| | ``DeepMIMO_data_gen`` so downstream utilities (e.g., dynamic scenario |
| | generation) can operate without modification. When DeepMIMO v4 is not |
| | available, the function falls back to the legacy generator if present. |
| | """ |
| |
|
| | if _HAS_DEEPMIMO_V4: |
| | return _load_v4_dataset( |
| | scenario, |
| | scenarios_dir=scenarios_dir, |
| | load_params=load_params, |
| | max_paths=max_paths, |
| | array_dtype=array_dtype, |
| | logger=logger, |
| | ) |
| |
|
| | if _legacy_data_gen is not None: |
| | raise RuntimeError( |
| | "DeepMIMO v4 is not installed. The repository still includes the legacy " |
| | "DeepMIMO_data_gen interface, but integration parameters must be provided " |
| | "explicitly. Please migrate to the official DeepMIMO package or invoke " |
| | "DeepMIMO_data_gen directly from your own tooling." |
| | ) |
| |
|
| | raise RuntimeError( |
| | "Neither DeepMIMO v4 nor the legacy DeepMIMO_data_gen function is available. " |
| | "Please install the DeepMIMO package or provide the legacy generator." |
| | ) |
| |
|
| |
|
| | __all__ = ["load_deepmimo_user_data"] |
| |
|