# lwm-temporal/LWMTemporal/cli/generate_dynamic_data.py
# Sadjad Alikhani
# added the dynamic dataset generation pipeline
# 80a230c
from __future__ import annotations
import argparse
import time
from pathlib import Path
from typing import List, Optional, Sequence
try: # pragma: no cover - external deps may be optional in some environments
import numpy as np # type: ignore[import]
except ModuleNotFoundError as exc: # pragma: no cover
raise ImportError("numpy is required for dynamic scenario generation") from exc
try: # pragma: no cover - external deps may be optional in some environments
import pandas as pd # type: ignore[import]
except ModuleNotFoundError as exc: # pragma: no cover
raise ImportError("pandas is required for dynamic scenario generation") from exc
from ..data.scenario_generation import (
AntennaArrayConfig,
DynamicScenarioGenerator,
GridConfig,
ScenarioGenerationConfig,
ScenarioSamplingConfig,
TrafficConfig,
)
from ..utils.logging import setup_logging
try: # pragma: no cover - optional dependency for scenario listing
from deepmimo import general_utils as deepmimo_utils # type: ignore[import]
from deepmimo import config as deepmimo_config # type: ignore[import]
_HAS_DEEPMIMO = True
except Exception: # pragma: no cover - DeepMIMO not installed
deepmimo_utils = None # type: ignore[assignment]
deepmimo_config = None # type: ignore[assignment]
_HAS_DEEPMIMO = False
def _ensure_channel_dimensions(csv_path: Path, seed: int, logger) -> pd.DataFrame:
if csv_path.exists():
return pd.read_csv(csv_path)
logger.info("Channel dimension CSV not found. Generating new table at %s", csv_path)
np.random.seed(seed)
n_scenarios = 2000
max_product = 2 ** 16
dims = []
while len(dims) < n_scenarios:
a1 = np.random.randint(0, 16)
time_steps = 2 ** 4 - a1
a2 = np.random.randint(0, 6)
num_ant = 2 ** (7 - a2)
a3 = np.random.randint(0, 6)
num_sc = 2 ** (9 - a3)
prod = time_steps * num_ant * num_sc
if prod <= max_product:
h, v = max(1, int(round(np.sqrt(num_ant)))), 1
for k in range(1, int(np.sqrt(num_ant)) + 1):
if num_ant % k == 0:
h, v = num_ant // k, k
dims.append([time_steps, h, v, num_sc, prod])
arr = np.array(dims)
np.random.shuffle(arr)
df = pd.DataFrame(
arr,
columns=["time_steps", "num_antennas_tx_hor", "num_antennas_tx_vert", "num_subcarriers", "product"],
)
csv_path.parent.mkdir(parents=True, exist_ok=True)
df.to_csv(csv_path, index=False)
return df
def parse_args(argv: Optional[Sequence[str]] = None) -> argparse.Namespace:
    """Build the command-line parser for the pipeline and parse ``argv``.

    Args:
        argv: Argument strings to parse; passed straight to
            ``ArgumentParser.parse_args``.

    Returns:
        The populated ``argparse.Namespace``.
    """
    # (flag, add_argument keyword arguments), registered in this order.
    option_table = [
        ("--output-dir", {"type": Path, "default": Path("examples/data")}),
        ("--full-output-dir", {"type": Path, "default": Path("examples/full_data")}),
        ("--figures-dir", {"type": Path, "default": Path("examples/figs")}),
        ("--channel-dims", {"type": Path, "default": Path("examples/data/channel_dimensions.csv")}),
        ("--regen-dims", {"action": "store_true", "help": "Regenerate the channel dimension table"}),
        ("--dims-seed", {"type": int, "default": 1323}),
        (
            "--scenario-indices",
            {"type": int, "nargs": "*", "default": None, "help": "Indices of scenarios to generate"},
        ),
        (
            "--scenario-range",
            {"type": int, "nargs": 2, "metavar": ("START", "END"), "default": None},
        ),
        ("--time-steps", {"type": int, "default": None, "help": "Override time steps for all scenarios"}),
        ("--tx-horizontal", {"type": int, "default": None, "help": "Override horizontal TX elements"}),
        ("--tx-vertical", {"type": int, "default": None, "help": "Override vertical TX elements"}),
        ("--subcarriers", {"type": int, "default": None, "help": "Override number of subcarriers"}),
        ("--num-vehicles", {"type": int, "default": 50}),
        ("--num-pedestrians", {"type": int, "default": 10}),
        (
            "--vehicle-speed",
            {"type": float, "nargs": 2, "metavar": ("MIN", "MAX"), "default": None},
        ),
        (
            "--ped-speed",
            {"type": float, "nargs": 2, "metavar": ("MIN", "MAX"), "default": None},
        ),
        ("--continuous-length", {"type": int, "default": None}),
        ("--sample-dt", {"type": float, "default": 1e-3}),
        ("--turn-probability", {"type": float, "default": 0.1}),
        ("--ped-angle-std", {"type": float, "default": 0.1}),
        ("--max-attempts", {"type": int, "default": 300}),
        ("--carrier-frequency", {"type": float, "default": 3.5e9}),
        (
            "--scenarios-dir",
            {
                "type": Path,
                "default": Path("deepmimo_scenarios"),
                "help": "Directory containing DeepMIMO scenarios",
            },
        ),
        (
            "--max-paths",
            {"type": int, "default": 6, "help": "Maximum number of paths per UE to load from DeepMIMO"},
        ),
        (
            "--road-width",
            {"type": float, "default": 2.0, "help": "Approximate lane width in meters for road filtering"},
        ),
        (
            "--road-spacing",
            {
                "type": float,
                "default": 10.0,
                "help": "Center-to-center spacing between parallel roads (meters)",
            },
        ),
        (
            "--grid-step",
            {
                "type": float,
                "default": None,
                "help": "Manual road graph step size (meters). Overrides automatic inference when provided.",
            },
        ),
        (
            "--disable-auto-grid-step",
            {"action": "store_true", "help": "Disable automatic road grid step inference."},
        ),
        (
            "--scenario-name",
            {
                "type": str,
                "default": None,
                "help": "Explicit DeepMIMO scenario name (bypass index lookup)",
            },
        ),
        ("--seed", {"type": int, "default": None, "help": "Set RNG seed for reproducibility"}),
        ("--overwrite", {"action": "store_true"}),
        ("--log-dir", {"type": Path, "default": Path("logs")}),
    ]

    cli = argparse.ArgumentParser(description="Dynamic scenario generation pipeline")
    for flag, spec in option_table:
        cli.add_argument(flag, **spec)
    return cli.parse_args(argv)
def _load_scenario_names(scenarios_dir: Optional[Path]) -> List[str]:
    """List the scenario names reported by the DeepMIMO package.

    Args:
        scenarios_dir: When not ``None``, stored (as a string) in DeepMIMO's
            ``scenarios_folder`` config entry before the scenarios are listed.

    Returns:
        The non-empty result of ``get_available_scenarios()``.

    Raises:
        ImportError: If the DeepMIMO import at module load failed.
        RuntimeError: If DeepMIMO reports no scenarios.
    """
    if _HAS_DEEPMIMO:
        if scenarios_dir is not None:
            deepmimo_config.set("scenarios_folder", str(scenarios_dir))  # type: ignore[operator]
        available = deepmimo_utils.get_available_scenarios()  # type: ignore[operator]
        if available:
            return available
        raise RuntimeError("No DeepMIMO scenarios found in the specified directory")

    raise ImportError(
        "DeepMIMO package is required to enumerate scenarios. Provide --scenario-name or install the package."
    )
def _resolve_indices(args: argparse.Namespace, n_scenarios: int) -> Sequence[int]:
if args.scenario_indices:
return [int(i) for i in args.scenario_indices]
if args.scenario_range:
start, end = args.scenario_range
return list(range(start, end))
return list(range(n_scenarios))
def main(argv: Optional[Sequence[str]] = None) -> None:
    """Run the dynamic scenario generation pipeline from the command line.

    Steps: parse options, set up logging, load (or create) the channel
    dimension table, resolve which scenarios to process, then build a
    ``ScenarioGenerationConfig`` per scenario and run
    ``DynamicScenarioGenerator.generate`` on it, logging timing and a
    remaining-time estimate after each one.

    Args:
        argv: Argument strings forwarded to ``parse_args``.

    Raises:
        ImportError: Propagated from ``_load_scenario_names`` when scenarios
            must be enumerated but DeepMIMO is unavailable.
        RuntimeError: Propagated from ``_load_scenario_names`` when no
            scenarios are found.
    """
    args = parse_args(argv)
    logger = setup_logging("LWMTemporal.dynamic_generation", args.log_dir)

    if args.regen_dims and args.channel_dims.exists():
        args.channel_dims.unlink()
    dims_df = _ensure_channel_dimensions(args.channel_dims, args.dims_seed, logger)

    # Seed only after the dimension table is available: creating the table may
    # itself seed NumPy's global RNG with --dims-seed, which would otherwise
    # override the user's --seed whenever the CSV had to be generated.
    if args.seed is not None:
        np.random.seed(args.seed)

    if args.scenario_name is not None:
        # An explicit name bypasses enumeration; row 0 of the table is used.
        worklist = [(args.scenario_name, 0)]
    else:
        scenario_names = _load_scenario_names(args.scenarios_dir)
        n_available = len(scenario_names)
        worklist = []
        # Validate before indexing so an out-of-range index is skipped with a
        # warning instead of raising IndexError (or, for negative values,
        # silently wrapping around to a different scenario).
        for idx in _resolve_indices(args, n_available):
            if 0 <= idx < n_available:
                worklist.append((scenario_names[idx], idx))
            else:
                logger.warning("Skipping invalid scenario index %d", idx)

    total = len(worklist)
    logger.info("Preparing to generate %d scenarios", total)

    # perf_counter is monotonic, so durations are immune to wall-clock changes.
    total_start = time.perf_counter()
    for counter, (scenario_name, idx) in enumerate(worklist, start=1):
        # Wrap around when there are more scenarios than table rows.
        dims_row = dims_df.iloc[idx % len(dims_df)]

        # CLI overrides win over the table; because of ``or``, an override of
        # 0 is treated like "not provided" and falls back to the table value.
        time_steps = args.time_steps or int(dims_row["time_steps"])
        tx_h = args.tx_horizontal or int(dims_row["num_antennas_tx_hor"])
        tx_v = args.tx_vertical or int(dims_row["num_antennas_tx_vert"])
        subcarriers = args.subcarriers or int(dims_row["num_subcarriers"])

        antenna = AntennaArrayConfig(tx_horizontal=tx_h, tx_vertical=tx_v, subcarriers=subcarriers)
        traffic = TrafficConfig(
            num_vehicles=args.num_vehicles,
            num_pedestrians=args.num_pedestrians,
            # Defaults presumably are 5-60 km/h converted to m/s -- TODO confirm units.
            vehicle_speed_range=tuple(args.vehicle_speed) if args.vehicle_speed else (5 / 3.6, 60 / 3.6),
            pedestrian_speed_range=tuple(args.ped_speed) if args.ped_speed else (0.5, 2.0),
            turn_probability=args.turn_probability,
            max_attempts=args.max_attempts,
            pedestrian_angle_std=args.ped_angle_std,
        )
        sampling = ScenarioSamplingConfig(
            time_steps=time_steps,
            continuous_length=args.continuous_length,
            sample_dt=args.sample_dt,
            continuous_mode=True,
        )
        grid = GridConfig(
            road_width=args.road_width,
            road_center_spacing=args.road_spacing,
            step_size=args.grid_step if args.grid_step is not None else GridConfig.step_size,
            # A manual --grid-step always disables automatic inference.
            auto_step_size=(not args.disable_auto_grid_step) and args.grid_step is None,
        )
        config = ScenarioGenerationConfig(
            scenario=scenario_name,
            antenna=antenna,
            sampling=sampling,
            traffic=traffic,
            grid=grid,
            carrier_frequency_hz=args.carrier_frequency,
            output_dir=args.output_dir,
            full_output_dir=args.full_output_dir,
            figures_dir=args.figures_dir,
            export_environment_plot=args.figures_dir is not None,
            rng_seed=args.seed,
            scenarios_dir=args.scenarios_dir,
            deepmimo_max_paths=args.max_paths,
        )

        logger.info(
            "[%d/%d] Generating scenario '%s' (T=%d, tx=%d x %d, subc=%d)",
            counter,
            total,
            scenario_name,
            time_steps,
            tx_h,
            tx_v,
            subcarriers,
        )
        start_time = time.perf_counter()
        generator = DynamicScenarioGenerator(config, logger=logger)
        result = generator.generate(overwrite=args.overwrite)
        elapsed = time.perf_counter() - start_time

        status = "generated" if result.generated else "cached"
        logger.info(
            "Scenario '%s' %s in %.2f seconds -> %s",
            scenario_name,
            status,
            elapsed,
            result.output_path,
        )

        # Simple ETA: mean time per processed scenario times scenarios left.
        avg_time = (time.perf_counter() - total_start) / counter
        remaining = avg_time * (total - counter)
        logger.info(
            "Estimated remaining time: %.2f seconds (%.2f minutes)",
            remaining,
            remaining / 60.0,
        )

    total_elapsed = time.perf_counter() - total_start
    logger.info(
        "Completed dynamic scenario generation in %.2f seconds (%.2f minutes)",
        total_elapsed,
        total_elapsed / 60.0,
    )
# Public API of this module: the argument parser and the CLI entry point.
__all__ = ["parse_args", "main"]
# Run the pipeline when the file is executed as a script.
if __name__ == "__main__":
    main()