| | from __future__ import annotations |
| |
|
| | import argparse |
| | import time |
| | from pathlib import Path |
| | from typing import List, Optional, Sequence |
| |
|
| | try: |
| | import numpy as np |
| | except ModuleNotFoundError as exc: |
| | raise ImportError("numpy is required for dynamic scenario generation") from exc |
| |
|
| | try: |
| | import pandas as pd |
| | except ModuleNotFoundError as exc: |
| | raise ImportError("pandas is required for dynamic scenario generation") from exc |
| |
|
| | from ..data.scenario_generation import ( |
| | AntennaArrayConfig, |
| | DynamicScenarioGenerator, |
| | GridConfig, |
| | ScenarioGenerationConfig, |
| | ScenarioSamplingConfig, |
| | TrafficConfig, |
| | ) |
| | from ..utils.logging import setup_logging |
| |
|
| | try: |
| | from deepmimo import general_utils as deepmimo_utils |
| | from deepmimo import config as deepmimo_config |
| |
|
| | _HAS_DEEPMIMO = True |
| | except Exception: |
| | deepmimo_utils = None |
| | deepmimo_config = None |
| | _HAS_DEEPMIMO = False |
| |
|
| |
|
def _ensure_channel_dimensions(csv_path: Path, seed: int, logger) -> pd.DataFrame:
    """Return the channel-dimension table, generating and saving it if missing.

    If ``csv_path`` exists it is read with pandas and returned as-is. Otherwise
    2000 random rows are sampled, shuffled, written to ``csv_path`` (parent
    directories are created) and returned.

    Each sampled row combines ``time_steps`` in 1..16, an antenna count that is
    a power of two in 4..128 (split into horizontal x vertical factors) and a
    subcarrier count that is a power of two in 16..512. Combinations whose
    product exceeds 2**16 are rejected and re-drawn.

    Args:
        csv_path: Location of the cached CSV table.
        seed: Seed for the private random generator used to build the table.
        logger: Logger-like object; only ``info`` is called.

    Returns:
        DataFrame with columns ``time_steps``, ``num_antennas_tx_hor``,
        ``num_antennas_tx_vert``, ``num_subcarriers`` and ``product``.
    """
    if csv_path.exists():
        return pd.read_csv(csv_path)

    logger.info("Channel dimension CSV not found. Generating new table at %s", csv_path)

    # A private RandomState produces the same stream as seeding the global
    # legacy RNG with ``seed``, but leaves the process-wide NumPy RNG state
    # (which the caller may have seeded for reproducibility) untouched.
    rng = np.random.RandomState(seed)
    n_scenarios = 2000
    max_product = 2 ** 16

    dims = []
    while len(dims) < n_scenarios:
        # Draw order (time, antennas, subcarriers) is kept fixed so a given
        # seed always yields the same table.
        time_steps = 2 ** 4 - rng.randint(0, 16)
        num_ant = 2 ** (7 - rng.randint(0, 6))
        num_sc = 2 ** (9 - rng.randint(0, 6))
        prod = time_steps * num_ant * num_sc
        if prod > max_product:
            continue  # too large: reject and draw again

        # Split the antenna count into the most square h x v layout: v is the
        # largest divisor not exceeding sqrt(num_ant), h the matching cofactor.
        v = max(k for k in range(1, int(np.sqrt(num_ant)) + 1) if num_ant % k == 0)
        h = num_ant // v
        dims.append([time_steps, h, v, num_sc, prod])

    arr = np.array(dims)
    rng.shuffle(arr)  # shuffles rows in place
    df = pd.DataFrame(
        arr,
        columns=["time_steps", "num_antennas_tx_hor", "num_antennas_tx_vert", "num_subcarriers", "product"],
    )
    csv_path.parent.mkdir(parents=True, exist_ok=True)
    df.to_csv(csv_path, index=False)
    return df
| |
|
| |
|
def parse_args(argv: Optional[Sequence[str]] = None) -> argparse.Namespace:
    """Parse command-line options for the dynamic scenario generation pipeline.

    Args:
        argv: Argument strings to parse. ``None`` lets argparse fall back to
            the process command line.

    Returns:
        Namespace holding one attribute per option declared below.
    """
    cli = argparse.ArgumentParser(description="Dynamic scenario generation pipeline")

    # (flag, add_argument keyword arguments), registered in declaration order.
    option_table = [
        # Output locations and the cached channel-dimension table.
        ("--output-dir", {"type": Path, "default": Path("examples/data")}),
        ("--full-output-dir", {"type": Path, "default": Path("examples/full_data")}),
        ("--figures-dir", {"type": Path, "default": Path("examples/figs")}),
        ("--channel-dims", {"type": Path, "default": Path("examples/data/channel_dimensions.csv")}),
        ("--regen-dims", {"action": "store_true", "help": "Regenerate the channel dimension table"}),
        ("--dims-seed", {"type": int, "default": 1323}),
        # Scenario selection.
        (
            "--scenario-indices",
            {"type": int, "nargs": "*", "default": None, "help": "Indices of scenarios to generate"},
        ),
        ("--scenario-range", {"type": int, "nargs": 2, "metavar": ("START", "END"), "default": None}),
        # Per-run overrides of the values taken from the dimension table.
        ("--time-steps", {"type": int, "default": None, "help": "Override time steps for all scenarios"}),
        ("--tx-horizontal", {"type": int, "default": None, "help": "Override horizontal TX elements"}),
        ("--tx-vertical", {"type": int, "default": None, "help": "Override vertical TX elements"}),
        ("--subcarriers", {"type": int, "default": None, "help": "Override number of subcarriers"}),
        # Traffic and sampling parameters.
        ("--num-vehicles", {"type": int, "default": 50}),
        ("--num-pedestrians", {"type": int, "default": 10}),
        ("--vehicle-speed", {"type": float, "nargs": 2, "metavar": ("MIN", "MAX"), "default": None}),
        ("--ped-speed", {"type": float, "nargs": 2, "metavar": ("MIN", "MAX"), "default": None}),
        ("--continuous-length", {"type": int, "default": None}),
        ("--sample-dt", {"type": float, "default": 1e-3}),
        ("--turn-probability", {"type": float, "default": 0.1}),
        ("--ped-angle-std", {"type": float, "default": 0.1}),
        ("--max-attempts", {"type": int, "default": 300}),
        ("--carrier-frequency", {"type": float, "default": 3.5e9}),
        # DeepMIMO data source.
        (
            "--scenarios-dir",
            {
                "type": Path,
                "default": Path("deepmimo_scenarios"),
                "help": "Directory containing DeepMIMO scenarios",
            },
        ),
        (
            "--max-paths",
            {"type": int, "default": 6, "help": "Maximum number of paths per UE to load from DeepMIMO"},
        ),
        # Road grid geometry.
        (
            "--road-width",
            {"type": float, "default": 2.0, "help": "Approximate lane width in meters for road filtering"},
        ),
        (
            "--road-spacing",
            {
                "type": float,
                "default": 10.0,
                "help": "Center-to-center spacing between parallel roads (meters)",
            },
        ),
        (
            "--grid-step",
            {
                "type": float,
                "default": None,
                "help": "Manual road graph step size (meters). Overrides automatic inference when provided.",
            },
        ),
        (
            "--disable-auto-grid-step",
            {"action": "store_true", "help": "Disable automatic road grid step inference."},
        ),
        # Miscellaneous run control.
        (
            "--scenario-name",
            {
                "type": str,
                "default": None,
                "help": "Explicit DeepMIMO scenario name (bypass index lookup)",
            },
        ),
        ("--seed", {"type": int, "default": None, "help": "Set RNG seed for reproducibility"}),
        ("--overwrite", {"action": "store_true"}),
        ("--log-dir", {"type": Path, "default": Path("logs")}),
    ]
    for flag, options in option_table:
        cli.add_argument(flag, **options)

    return cli.parse_args(argv)
| |
|
| |
|
def _load_scenario_names(scenarios_dir: Optional[Path]) -> List[str]:
    """List the scenario names reported by the DeepMIMO package.

    Args:
        scenarios_dir: When not ``None``, DeepMIMO's ``scenarios_folder``
            setting is pointed at this directory before the lookup.

    Returns:
        The non-empty collection of names returned by DeepMIMO.

    Raises:
        ImportError: If the DeepMIMO package could not be imported.
        RuntimeError: If DeepMIMO reports no scenarios.
    """
    if not _HAS_DEEPMIMO:
        message = (
            "DeepMIMO package is required to enumerate scenarios. Provide --scenario-name or install the package."
        )
        raise ImportError(message)

    if scenarios_dir is not None:
        folder = str(scenarios_dir)
        deepmimo_config.set("scenarios_folder", folder)

    available = deepmimo_utils.get_available_scenarios()
    if available:
        return available
    raise RuntimeError("No DeepMIMO scenarios found in the specified directory")
| |
|
| |
|
def _resolve_indices(args: argparse.Namespace, n_scenarios: int) -> Sequence[int]:
    """Choose which scenario indices to process.

    Precedence: a non-empty ``args.scenario_indices`` wins, then
    ``args.scenario_range`` (``START`` inclusive, ``END`` exclusive), and
    finally every index in ``range(n_scenarios)``.

    Args:
        args: Parsed CLI namespace providing ``scenario_indices`` and
            ``scenario_range``.
        n_scenarios: Number of available scenarios, used for the default.

    Returns:
        A list of integer indices; no bounds checking is performed here.
    """
    explicit = args.scenario_indices
    if explicit:
        return list(map(int, explicit))

    bounds = args.scenario_range
    if bounds:
        first, stop = bounds
        return [*range(first, stop)]

    return [*range(n_scenarios)]
| |
|
| |
|
def main(argv: Optional[Sequence[str]] = None) -> None:
    """Run the dynamic scenario generation pipeline.

    Steps:
      1. Parse CLI options and set up logging.
      2. Load (or regenerate) the channel-dimension table.
      3. Build the list of ``(scenario_name, index)`` work items, either from
         ``--scenario-name`` or from the scenarios DeepMIMO reports.
      4. For each work item, assemble a ``ScenarioGenerationConfig`` and run a
         ``DynamicScenarioGenerator``, logging timing and a remaining-time
         estimate.

    Args:
        argv: Argument strings forwarded to ``parse_args``; ``None`` uses the
            process command line.
    """
    args = parse_args(argv)
    logger = setup_logging("LWMTemporal.dynamic_generation", args.log_dir)

    if args.regen_dims and args.channel_dims.exists():
        args.channel_dims.unlink()
    dims_df = _ensure_channel_dimensions(args.channel_dims, args.dims_seed, logger)

    # Seed the global NumPy RNG only after the dimension-table step, so that
    # nothing done while building the table can override the user's --seed.
    if args.seed is not None:
        np.random.seed(args.seed)

    worklist = []
    if args.scenario_name is not None:
        # An explicit name bypasses the DeepMIMO lookup; it uses table row 0.
        worklist.append((args.scenario_name, 0))
    else:
        scenario_names = _load_scenario_names(args.scenarios_dir)
        n_available = len(scenario_names)
        for idx in _resolve_indices(args, n_available):
            # Validate before indexing: an out-of-range index must be skipped
            # with a warning rather than raising IndexError (or, for negative
            # values, silently wrapping around to another scenario).
            if 0 <= idx < n_available:
                worklist.append((scenario_names[idx], idx))
            else:
                logger.warning("Skipping invalid scenario index %d", idx)

    total = len(worklist)
    logger.info("Preparing to generate %d scenarios", total)

    total_start = time.time()

    for counter, (scenario_name, idx) in enumerate(worklist, start=1):
        # The table is reused cyclically when there are more scenarios than rows.
        dims_row = dims_df.iloc[idx % len(dims_df)]
        # NOTE: ``or`` means a CLI override of 0 falls back to the table value.
        time_steps = args.time_steps or int(dims_row["time_steps"])
        tx_h = args.tx_horizontal or int(dims_row["num_antennas_tx_hor"])
        tx_v = args.tx_vertical or int(dims_row["num_antennas_tx_vert"])
        subcarriers = args.subcarriers or int(dims_row["num_subcarriers"])

        antenna = AntennaArrayConfig(tx_horizontal=tx_h, tx_vertical=tx_v, subcarriers=subcarriers)
        traffic = TrafficConfig(
            num_vehicles=args.num_vehicles,
            num_pedestrians=args.num_pedestrians,
            # Default speed ranges: 5-60 (converted by /3.6) for vehicles, 0.5-2.0 for pedestrians.
            vehicle_speed_range=tuple(args.vehicle_speed) if args.vehicle_speed else (5 / 3.6, 60 / 3.6),
            pedestrian_speed_range=tuple(args.ped_speed) if args.ped_speed else (0.5, 2.0),
            turn_probability=args.turn_probability,
            max_attempts=args.max_attempts,
            pedestrian_angle_std=args.ped_angle_std,
        )
        sampling = ScenarioSamplingConfig(
            time_steps=time_steps,
            continuous_length=args.continuous_length,
            sample_dt=args.sample_dt,
            continuous_mode=True,
        )
        grid = GridConfig(
            road_width=args.road_width,
            road_center_spacing=args.road_spacing,
            # Without --grid-step, fall back to GridConfig's class-level step_size.
            step_size=args.grid_step if args.grid_step is not None else GridConfig.step_size,
            # A manual step always disables automatic inference.
            auto_step_size=(not args.disable_auto_grid_step) and args.grid_step is None,
        )
        config = ScenarioGenerationConfig(
            scenario=scenario_name,
            antenna=antenna,
            sampling=sampling,
            traffic=traffic,
            grid=grid,
            carrier_frequency_hz=args.carrier_frequency,
            output_dir=args.output_dir,
            full_output_dir=args.full_output_dir,
            figures_dir=args.figures_dir,
            export_environment_plot=args.figures_dir is not None,
            rng_seed=args.seed,
            scenarios_dir=args.scenarios_dir,
            deepmimo_max_paths=args.max_paths,
        )

        logger.info(
            "[%d/%d] Generating scenario '%s' (T=%d, tx=%d x %d, subc=%d)",
            counter,
            total,
            scenario_name,
            time_steps,
            tx_h,
            tx_v,
            subcarriers,
        )
        start_time = time.time()
        generator = DynamicScenarioGenerator(config, logger=logger)
        result = generator.generate(overwrite=args.overwrite)
        elapsed = time.time() - start_time
        status = "generated" if result.generated else "cached"
        logger.info(
            "Scenario '%s' %s in %.2f seconds -> %s",
            scenario_name,
            status,
            elapsed,
            result.output_path,
        )
        # Estimate from the running average time per completed work item.
        avg_time = (time.time() - total_start) / counter
        remaining = avg_time * (total - counter)
        logger.info(
            "Estimated remaining time: %.2f seconds (%.2f minutes)",
            remaining,
            remaining / 60.0,
        )

    total_elapsed = time.time() - total_start
    logger.info(
        "Completed dynamic scenario generation in %.2f seconds (%.2f minutes)",
        total_elapsed,
        total_elapsed / 60.0,
    )
| |
|
| |
|
| | __all__ = ["parse_args", "main"] |
| |
|
| |
|
| | if __name__ == "__main__": |
| | main() |
| |
|