"""Command-line pipeline for dynamic scenario generation.

For every selected DeepMIMO scenario the script picks channel dimensions
(time steps, TX antenna layout, subcarriers) from a CSV table, builds a
``ScenarioGenerationConfig`` and runs ``DynamicScenarioGenerator`` on it.
The CSV table is generated on first use when it does not exist yet.
"""

from __future__ import annotations

import argparse
import math
import time
from pathlib import Path
from typing import List, Optional, Sequence, Tuple

try:  # pragma: no cover - external deps may be optional in some environments
    import numpy as np  # type: ignore[import]
except ModuleNotFoundError as exc:  # pragma: no cover
    raise ImportError("numpy is required for dynamic scenario generation") from exc

try:  # pragma: no cover - external deps may be optional in some environments
    import pandas as pd  # type: ignore[import]
except ModuleNotFoundError as exc:  # pragma: no cover
    raise ImportError("pandas is required for dynamic scenario generation") from exc

from ..data.scenario_generation import (
    AntennaArrayConfig,
    DynamicScenarioGenerator,
    GridConfig,
    ScenarioGenerationConfig,
    ScenarioSamplingConfig,
    TrafficConfig,
)
from ..utils.logging import setup_logging

try:  # pragma: no cover - optional dependency for scenario listing
    from deepmimo import general_utils as deepmimo_utils  # type: ignore[import]
    from deepmimo import config as deepmimo_config  # type: ignore[import]

    _HAS_DEEPMIMO = True
except Exception:  # pragma: no cover - DeepMIMO not installed
    deepmimo_utils = None  # type: ignore[assignment]
    deepmimo_config = None  # type: ignore[assignment]
    _HAS_DEEPMIMO = False

# Number of rows in a freshly generated channel-dimension table.
_N_DIMENSION_ROWS = 2000
# Upper bound on time_steps * num_antennas * num_subcarriers for a table row.
_MAX_DIMENSION_PRODUCT = 2 ** 16


def _split_antenna_count(num_ant: int) -> Tuple[int, int]:
    """Split *num_ant* into ``(horizontal, vertical)`` element counts.

    The vertical count is the largest divisor of *num_ant* that does not
    exceed its integer square root; the horizontal count is the cofactor, so
    ``horizontal * vertical == num_ant`` and ``horizontal >= vertical``.
    """
    # k == 1 always divides, so the generator is never empty for num_ant >= 1.
    vertical = max(k for k in range(1, math.isqrt(num_ant) + 1) if num_ant % k == 0)
    return num_ant // vertical, vertical


def _ensure_channel_dimensions(csv_path: Path, seed: int, logger) -> pd.DataFrame:
    """Load the channel-dimension table, generating it first if missing.

    Args:
        csv_path: Location of the CSV table. When the file exists it is read
            and returned unchanged.
        seed: Seed for the random draws used when a new table is generated.
        logger: Logger used to report that a new table is being generated.

    Returns:
        A DataFrame with the columns ``time_steps``, ``num_antennas_tx_hor``,
        ``num_antennas_tx_vert``, ``num_subcarriers`` and ``product``.
    """
    if csv_path.exists():
        return pd.read_csv(csv_path)

    logger.info("Channel dimension CSV not found. Generating new table at %s", csv_path)

    # A private RandomState yields the same stream as seeding the global
    # legacy RNG, but leaves the global state (seeded from --seed by main)
    # untouched.
    rng = np.random.RandomState(seed)

    rows = []
    while len(rows) < _N_DIMENSION_ROWS:
        # Draw order matters for reproducibility: time steps, antennas, subcarriers.
        time_steps = 2 ** 4 - int(rng.randint(0, 16))
        num_ant = 2 ** (7 - int(rng.randint(0, 6)))
        num_sc = 2 ** (9 - int(rng.randint(0, 6)))
        product = time_steps * num_ant * num_sc
        if product > _MAX_DIMENSION_PRODUCT:
            continue  # reject combinations above the size budget
        horizontal, vertical = _split_antenna_count(num_ant)
        rows.append([time_steps, horizontal, vertical, num_sc, product])

    table = np.array(rows)
    rng.shuffle(table)  # shuffles rows in place (first axis)
    df = pd.DataFrame(
        table,
        columns=[
            "time_steps",
            "num_antennas_tx_hor",
            "num_antennas_tx_vert",
            "num_subcarriers",
            "product",
        ],
    )
    csv_path.parent.mkdir(parents=True, exist_ok=True)
    df.to_csv(csv_path, index=False)
    return df


def parse_args(argv: Optional[Sequence[str]] = None) -> argparse.Namespace:
    """Parse the command-line options of the generation pipeline.

    Args:
        argv: Argument list to parse. ``None`` lets argparse read
            ``sys.argv[1:]``.

    Returns:
        The populated ``argparse.Namespace``.
    """
    parser = argparse.ArgumentParser(description="Dynamic scenario generation pipeline")

    # Output locations and the channel-dimension table.
    parser.add_argument("--output-dir", type=Path, default=Path("examples/data"))
    parser.add_argument("--full-output-dir", type=Path, default=Path("examples/full_data"))
    parser.add_argument("--figures-dir", type=Path, default=Path("examples/figs"))
    parser.add_argument(
        "--channel-dims", type=Path, default=Path("examples/data/channel_dimensions.csv")
    )
    parser.add_argument(
        "--regen-dims", action="store_true", help="Regenerate the channel dimension table"
    )
    parser.add_argument("--dims-seed", type=int, default=1323)

    # Scenario selection.
    parser.add_argument(
        "--scenario-indices",
        type=int,
        nargs="*",
        default=None,
        help="Indices of scenarios to generate",
    )
    parser.add_argument(
        "--scenario-range",
        type=int,
        nargs=2,
        metavar=("START", "END"),
        default=None,
        help="Half-open index range [START, END) of scenarios to generate",
    )

    # Per-scenario dimension overrides.
    parser.add_argument(
        "--time-steps", type=int, default=None, help="Override time steps for all scenarios"
    )
    parser.add_argument(
        "--tx-horizontal", type=int, default=None, help="Override horizontal TX elements"
    )
    parser.add_argument(
        "--tx-vertical", type=int, default=None, help="Override vertical TX elements"
    )
    parser.add_argument(
        "--subcarriers", type=int, default=None, help="Override number of subcarriers"
    )

    # Traffic and sampling.
    parser.add_argument("--num-vehicles", type=int, default=50)
    parser.add_argument("--num-pedestrians", type=int, default=10)
    parser.add_argument(
        "--vehicle-speed", type=float, nargs=2, metavar=("MIN", "MAX"), default=None
    )
    parser.add_argument("--ped-speed", type=float, nargs=2, metavar=("MIN", "MAX"), default=None)
    parser.add_argument("--continuous-length", type=int, default=None)
    parser.add_argument("--sample-dt", type=float, default=1e-3)
    parser.add_argument("--turn-probability", type=float, default=0.1)
    parser.add_argument("--ped-angle-std", type=float, default=0.1)
    parser.add_argument("--max-attempts", type=int, default=300)
    parser.add_argument("--carrier-frequency", type=float, default=3.5e9)

    # DeepMIMO input and road grid.
    parser.add_argument(
        "--scenarios-dir",
        type=Path,
        default=Path("deepmimo_scenarios"),
        help="Directory containing DeepMIMO scenarios",
    )
    parser.add_argument(
        "--max-paths",
        type=int,
        default=6,
        help="Maximum number of paths per UE to load from DeepMIMO",
    )
    parser.add_argument(
        "--road-width",
        type=float,
        default=2.0,
        help="Approximate lane width in meters for road filtering",
    )
    parser.add_argument(
        "--road-spacing",
        type=float,
        default=10.0,
        help="Center-to-center spacing between parallel roads (meters)",
    )
    parser.add_argument(
        "--grid-step",
        type=float,
        default=None,
        help="Manual road graph step size (meters). Overrides automatic inference when provided.",
    )
    parser.add_argument(
        "--disable-auto-grid-step",
        action="store_true",
        help="Disable automatic road grid step inference.",
    )
    parser.add_argument(
        "--scenario-name",
        type=str,
        default=None,
        help="Explicit DeepMIMO scenario name (bypass index lookup)",
    )

    # Run control.
    parser.add_argument(
        "--seed", type=int, default=None, help="Set RNG seed for reproducibility"
    )
    parser.add_argument("--overwrite", action="store_true")
    parser.add_argument("--log-dir", type=Path, default=Path("logs"))
    return parser.parse_args(argv)


def _load_scenario_names(scenarios_dir: Optional[Path]) -> List[str]:
    """Return the scenario names reported by the DeepMIMO package.

    Args:
        scenarios_dir: When not ``None``, stored as DeepMIMO's
            ``scenarios_folder`` setting before the scenarios are listed.

    Raises:
        ImportError: If the DeepMIMO package could not be imported.
        RuntimeError: If DeepMIMO reports no available scenarios.
    """
    if not _HAS_DEEPMIMO:
        raise ImportError(
            "DeepMIMO package is required to enumerate scenarios. "
            "Provide --scenario-name or install the package."
        )
    if scenarios_dir is not None:
        deepmimo_config.set("scenarios_folder", str(scenarios_dir))  # type: ignore[operator]
    names = deepmimo_utils.get_available_scenarios()  # type: ignore[operator]
    if not names:
        raise RuntimeError("No DeepMIMO scenarios found in the specified directory")
    return names


def _resolve_indices(args: argparse.Namespace, n_scenarios: int) -> Sequence[int]:
    """Determine which scenario indices were requested on the command line.

    Precedence: ``--scenario-indices`` (when non-empty), then
    ``--scenario-range`` (END exclusive), otherwise every index in
    ``range(n_scenarios)``. The result is not validated against
    *n_scenarios*; callers are expected to do that.
    """
    if args.scenario_indices:
        return [int(i) for i in args.scenario_indices]
    if args.scenario_range:
        start, end = args.scenario_range
        return list(range(start, end))
    return list(range(n_scenarios))


def main(argv: Optional[Sequence[str]] = None) -> None:
    """Run the generation pipeline for every selected scenario.

    Args:
        argv: Command-line arguments; ``None`` lets argparse read
            ``sys.argv[1:]``.

    Raises:
        ValueError: If scenarios are selected but the channel-dimension
            table has no rows.
    """
    args = parse_args(argv)
    logger = setup_logging("LWMTemporal.dynamic_generation", args.log_dir)

    if args.seed is not None:
        np.random.seed(args.seed)

    if args.regen_dims and args.channel_dims.exists():
        args.channel_dims.unlink()
    dims_df = _ensure_channel_dimensions(args.channel_dims, args.dims_seed, logger)

    # Build the list of (scenario name, dimension-table index) pairs.
    worklist: List[Tuple[str, int]] = []
    if args.scenario_name is not None:
        worklist.append((args.scenario_name, 0))
    else:
        scenario_names = _load_scenario_names(args.scenarios_dir)
        for idx in _resolve_indices(args, len(scenario_names)):
            # Validate before indexing: an out-of-range index would raise
            # IndexError and a negative one would silently wrap around.
            if 0 <= idx < len(scenario_names):
                worklist.append((scenario_names[idx], idx))
            else:
                logger.warning("Skipping invalid scenario index %d", idx)

    total = len(worklist)
    if total and dims_df.empty:
        raise ValueError(f"Channel dimension table {args.channel_dims} contains no rows")

    logger.info("Preparing to generate %d scenarios", total)

    total_start = time.perf_counter()
    for counter, (scenario_name, idx) in enumerate(worklist, start=1):
        # Wrap around so any scenario index maps onto a table row.
        dims_row = dims_df.iloc[idx % len(dims_df)]
        time_steps = args.time_steps or int(dims_row["time_steps"])
        tx_h = args.tx_horizontal or int(dims_row["num_antennas_tx_hor"])
        tx_v = args.tx_vertical or int(dims_row["num_antennas_tx_vert"])
        subcarriers = args.subcarriers or int(dims_row["num_subcarriers"])

        antenna = AntennaArrayConfig(
            tx_horizontal=tx_h, tx_vertical=tx_v, subcarriers=subcarriers
        )
        traffic = TrafficConfig(
            num_vehicles=args.num_vehicles,
            num_pedestrians=args.num_pedestrians,
            # Default speed ranges: 5-60 km/h for vehicles, given in m/s.
            vehicle_speed_range=(
                tuple(args.vehicle_speed) if args.vehicle_speed else (5 / 3.6, 60 / 3.6)
            ),
            pedestrian_speed_range=tuple(args.ped_speed) if args.ped_speed else (0.5, 2.0),
            turn_probability=args.turn_probability,
            max_attempts=args.max_attempts,
            pedestrian_angle_std=args.ped_angle_std,
        )
        sampling = ScenarioSamplingConfig(
            time_steps=time_steps,
            continuous_length=args.continuous_length,
            sample_dt=args.sample_dt,
            continuous_mode=True,
        )
        grid = GridConfig(
            road_width=args.road_width,
            road_center_spacing=args.road_spacing,
            step_size=args.grid_step if args.grid_step is not None else GridConfig.step_size,
            # A manual --grid-step always disables automatic inference.
            auto_step_size=(not args.disable_auto_grid_step) and args.grid_step is None,
        )
        config = ScenarioGenerationConfig(
            scenario=scenario_name,
            antenna=antenna,
            sampling=sampling,
            traffic=traffic,
            grid=grid,
            carrier_frequency_hz=args.carrier_frequency,
            output_dir=args.output_dir,
            full_output_dir=args.full_output_dir,
            figures_dir=args.figures_dir,
            export_environment_plot=args.figures_dir is not None,
            rng_seed=args.seed,
            scenarios_dir=args.scenarios_dir,
            deepmimo_max_paths=args.max_paths,
        )

        logger.info(
            "[%d/%d] Generating scenario '%s' (T=%d, tx=%d x %d, subc=%d)",
            counter,
            total,
            scenario_name,
            time_steps,
            tx_h,
            tx_v,
            subcarriers,
        )
        start_time = time.perf_counter()
        generator = DynamicScenarioGenerator(config, logger=logger)
        result = generator.generate(overwrite=args.overwrite)
        elapsed = time.perf_counter() - start_time
        status = "generated" if result.generated else "cached"
        logger.info(
            "Scenario '%s' %s in %.2f seconds -> %s",
            scenario_name,
            status,
            elapsed,
            result.output_path,
        )

        # Simple ETA: mean time per finished scenario times scenarios left.
        avg_time = (time.perf_counter() - total_start) / counter
        remaining = avg_time * (total - counter)
        logger.info(
            "Estimated remaining time: %.2f seconds (%.2f minutes)",
            remaining,
            remaining / 60.0,
        )

    total_elapsed = time.perf_counter() - total_start
    logger.info(
        "Completed dynamic scenario generation in %.2f seconds (%.2f minutes)",
        total_elapsed,
        total_elapsed / 60.0,
    )


__all__ = ["parse_args", "main"]


if __name__ == "__main__":
    main()