Spaces:
Sleeping
Sleeping
| """Grid generator service for random grid creation.""" | |
| import random | |
| from typing import Tuple, List, Set, Optional | |
| from ..models.grid import Grid | |
| from ..models.entities import Store, Destination, Tunnel | |
| from ..models.state import SearchState | |
| from .parser import format_initial_state, format_traffic | |
| def gen_grid( | |
| width: Optional[int] = None, | |
| height: Optional[int] = None, | |
| num_stores: Optional[int] = None, | |
| num_destinations: Optional[int] = None, | |
| num_tunnels: Optional[int] = None, | |
| obstacle_density: float = 0.1, | |
| seed: Optional[int] = None, | |
| ) -> Tuple[str, str, SearchState]: | |
| """ | |
| Randomly generate a valid grid configuration. | |
| Args: | |
| width: Grid width (random 5-15 if None) | |
| height: Grid height (random 5-15 if None) | |
| num_stores: Number of stores (random 1-3 if None) | |
| num_destinations: Number of destinations (random 1-10 if None) | |
| num_tunnels: Number of tunnels (random 0-5 if None) | |
| obstacle_density: Fraction of segments to block (0.0-0.5) | |
| seed: Random seed for reproducibility | |
| Returns: | |
| Tuple of (initial_state_string, traffic_string, SearchState) | |
| """ | |
| if seed is not None: | |
| random.seed(seed) | |
| # Set defaults | |
| width = width or random.randint(5, 15) | |
| height = height or random.randint(5, 15) | |
| num_stores = num_stores or random.randint(1, 3) | |
| num_destinations = num_destinations or random.randint( | |
| 1, min(10, width * height // 4) | |
| ) | |
| num_tunnels = num_tunnels or random.randint(0, min(5, width * height // 10)) | |
| # Validate constraints | |
| num_stores = min(num_stores, 3) | |
| num_destinations = min(num_destinations, 10) | |
| # Track occupied positions | |
| occupied: Set[Tuple[int, int]] = set() | |
| # Generate stores | |
| stores = _generate_stores(width, height, num_stores, occupied) | |
| # Generate destinations | |
| destinations = _generate_destinations(width, height, num_destinations, occupied) | |
| # Generate tunnels | |
| tunnels = _generate_tunnels(width, height, num_tunnels, occupied) | |
| # Generate grid with traffic | |
| grid = _generate_traffic(width, height, obstacle_density, stores, destinations) | |
| # Create search state | |
| state = SearchState( | |
| grid=grid, stores=stores, destinations=destinations, tunnels=tunnels | |
| ) | |
| # Format strings | |
| initial_state = format_initial_state(width, height, stores, destinations, tunnels) | |
| traffic = format_traffic(grid) | |
| return initial_state, traffic, state | |
| def _generate_stores( | |
| width: int, height: int, num_stores: int, occupied: Set[Tuple[int, int]] | |
| ) -> List[Store]: | |
| """Generate store positions at corners/edges.""" | |
| stores = [] | |
| # Prefer corners | |
| corners = [ | |
| (0, 0), | |
| (width - 1, 0), | |
| (0, height - 1), | |
| (width - 1, height - 1), | |
| ] | |
| random.shuffle(corners) | |
| for i, pos in enumerate(corners[:num_stores]): | |
| stores.append(Store(id=i + 1, position=pos)) | |
| occupied.add(pos) | |
| # If need more, use edges | |
| if len(stores) < num_stores: | |
| edges = [] | |
| for x in range(1, width - 1): | |
| edges.append((x, 0)) | |
| edges.append((x, height - 1)) | |
| for y in range(1, height - 1): | |
| edges.append((0, y)) | |
| edges.append((width - 1, y)) | |
| random.shuffle(edges) | |
| for pos in edges: | |
| if pos not in occupied and len(stores) < num_stores: | |
| stores.append(Store(id=len(stores) + 1, position=pos)) | |
| occupied.add(pos) | |
| return stores | |
| def _generate_destinations( | |
| width: int, height: int, num_destinations: int, occupied: Set[Tuple[int, int]] | |
| ) -> List[Destination]: | |
| """Generate random destination positions.""" | |
| destinations = [] | |
| # Try to spread destinations across the grid | |
| available = [] | |
| for x in range(width): | |
| for y in range(height): | |
| if (x, y) not in occupied: | |
| available.append((x, y)) | |
| random.shuffle(available) | |
| for i, pos in enumerate(available[:num_destinations]): | |
| destinations.append(Destination(id=i + 1, position=pos)) | |
| occupied.add(pos) | |
| return destinations | |
| def _generate_tunnels( | |
| width: int, height: int, num_tunnels: int, occupied: Set[Tuple[int, int]] | |
| ) -> List[Tunnel]: | |
| """Generate random tunnel pairs.""" | |
| tunnels = [] | |
| # Find available positions for tunnel entrances | |
| available = [] | |
| for x in range(width): | |
| for y in range(height): | |
| if (x, y) not in occupied: | |
| available.append((x, y)) | |
| random.shuffle(available) | |
| i = 0 | |
| attempts = 0 | |
| max_attempts = len(available) * 2 | |
| while i < num_tunnels and len(available) >= 2 and attempts < max_attempts: | |
| attempts += 1 | |
| idx1 = random.randint(0, len(available) - 1) | |
| entrance1 = available[idx1] | |
| remaining = [pos for j, pos in enumerate(available) if j != idx1] | |
| if not remaining: | |
| break | |
| idx2 = random.randint(0, len(remaining) - 1) | |
| entrance2 = remaining[idx2] | |
| # Ensure tunnels are useful | |
| dist = abs(entrance1[0] - entrance2[0]) + abs(entrance1[1] - entrance2[1]) | |
| if dist >= 3: # Only create if Manhattan distance >= 3 | |
| tunnels.append(Tunnel(entrance1=entrance1, entrance2=entrance2)) | |
| occupied.add(entrance1) | |
| occupied.add(entrance2) | |
| # Remove used positions from available | |
| available.remove(entrance1) | |
| available.remove(entrance2) | |
| i += 1 | |
| return tunnels | |
| def _generate_traffic( | |
| width: int, | |
| height: int, | |
| obstacle_density: float, | |
| stores: List[Store], | |
| destinations: List[Destination], | |
| ) -> Grid: | |
| """ | |
| Generate traffic levels for all segments. | |
| Ensures connectivity between stores and destinations. | |
| """ | |
| grid = Grid(width=width, height=height) | |
| # First, add all segments with random traffic | |
| all_segments = [] | |
| # Horizontal segments | |
| for x in range(width - 1): | |
| for y in range(height): | |
| all_segments.append(((x, y), (x + 1, y))) | |
| # Vertical segments | |
| for x in range(width): | |
| for y in range(height - 1): | |
| all_segments.append(((x, y), (x, y + 1))) | |
| # Add segments with traffic | |
| for src, dst in all_segments: | |
| # Random traffic level 1-4 or blocked (0) | |
| if random.random() < obstacle_density: | |
| traffic = 0 # Blocked | |
| else: | |
| traffic = random.randint(1, 4) | |
| grid.add_segment(src, dst, traffic) | |
| # Ensure connectivity - make sure there's a path from each store to each destination | |
| _ensure_connectivity(grid, stores, destinations) | |
| return grid | |
| def _ensure_connectivity( | |
| grid: Grid, stores: List[Store], destinations: List[Destination] | |
| ) -> None: | |
| """ | |
| Ensure the grid is connected between stores and destinations. | |
| Uses BFS to check connectivity and unblocks segments if needed. | |
| """ | |
| # Get all important positions | |
| important_positions = [s.position for s in stores] + [ | |
| d.position for d in destinations | |
| ] | |
| if len(important_positions) < 2: | |
| return | |
| # Check connectivity from first store to all destinations | |
| start = stores[0].position if stores else important_positions[0] | |
| # BFS to find reachable positions | |
| visited = {start} | |
| queue = [start] | |
| while queue: | |
| current = queue.pop(0) | |
| for neighbor in grid.get_neighbors(current): | |
| if neighbor not in visited: | |
| visited.add(neighbor) | |
| queue.append(neighbor) | |
| # Check if all important positions are reachable | |
| unreachable = [pos for pos in important_positions if pos not in visited] | |
| # If some positions are unreachable, create paths to them | |
| for pos in unreachable: | |
| _create_path_to(grid, start, pos, visited) | |
| def _create_path_to( | |
| grid: Grid, | |
| start: Tuple[int, int], | |
| goal: Tuple[int, int], | |
| visited: Set[Tuple[int, int]], | |
| ) -> None: | |
| """Create a path from visited area to goal by unblocking segments.""" | |
| # Simple approach: find closest visited cell to goal and unblock path | |
| closest = min(visited, key=lambda p: abs(p[0] - goal[0]) + abs(p[1] - goal[1])) | |
| # Create path from closest to goal | |
| current = closest | |
| while current != goal: | |
| dx = 0 if goal[0] == current[0] else (1 if goal[0] > current[0] else -1) | |
| dy = 0 if goal[1] == current[1] else (1 if goal[1] > current[1] else -1) | |
| # Prefer moving in direction with larger difference | |
| if abs(goal[0] - current[0]) >= abs(goal[1] - current[1]) and dx != 0: | |
| next_pos = (current[0] + dx, current[1]) | |
| elif dy != 0: | |
| next_pos = (current[0], current[1] + dy) | |
| else: | |
| next_pos = (current[0] + dx, current[1]) | |
| # Unblock segment if blocked | |
| if grid.is_blocked(current, next_pos): | |
| grid.add_segment(current, next_pos, random.randint(1, 4)) | |
| visited.add(next_pos) | |
| current = next_pos | |