File size: 4,727 Bytes
e067c2d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
"""Breadth-First Search algorithm."""
from typing import Tuple, Optional, List, Generator, TYPE_CHECKING

if TYPE_CHECKING:
    from ..core.generic_search import GenericSearch

from ..core.node import SearchNode
from ..core.frontier import QueueFrontier
from ..models.state import PathResult, SearchStep


def bfs_search(
    problem: 'GenericSearch',
    visualize: bool = False
) -> Tuple[PathResult, Optional[List[SearchStep]]]:
    """
    Breadth-first search using FIFO queue.

    Finds path with minimum number of steps (not minimum cost).
    Complete and optimal for unweighted graphs.

    Args:
        problem: The search problem to solve
        visualize: If True, collect visualization steps

    Returns:
        Tuple of (PathResult, Optional[List[SearchStep]])
    """
    frontier = QueueFrontier()
    start = problem.initial_state()
    start_node = SearchNode(state=start, path_cost=0, depth=0)
    frontier.push(start_node)

    explored: set = set()
    nodes_expanded = 0
    steps: List[SearchStep] = [] if visualize else None

    while not frontier.is_empty():
        node = frontier.pop()

        # Record step for visualization
        if visualize:
            steps.append(SearchStep(
                step_number=nodes_expanded,
                current_node=node.state,
                action=node.action,
                frontier=frontier.get_states(),
                explored=list(explored),
                current_path=node.get_path(),
                path_cost=node.path_cost
            ))

        # Goal test after pop (standard BFS)
        if problem.goal_test(node.state):
            return PathResult(
                plan=node.get_solution(),
                cost=node.path_cost,
                nodes_expanded=nodes_expanded,
                path=node.get_path()
            ), steps

        # Skip if already explored
        if node.state in explored:
            continue

        explored.add(node.state)
        nodes_expanded += 1

        # Expand node
        for action in problem.actions(node.state):
            child_state = problem.result(node.state, action)
            if child_state not in explored and not frontier.contains_state(child_state):
                step_cost = problem.step_cost(node.state, action, child_state)
                child = SearchNode(
                    state=child_state,
                    parent=node,
                    action=action,
                    path_cost=node.path_cost + step_cost,
                    depth=node.depth + 1
                )
                frontier.push(child)

    # No solution found
    return PathResult(
        plan="",
        cost=float('inf'),
        nodes_expanded=nodes_expanded,
        path=[]
    ), steps


def bfs_search_generator(
    problem: 'GenericSearch'
) -> Generator[SearchStep, None, PathResult]:
    """
    Generator version of BFS that yields steps during execution.

    Args:
        problem: The search problem to solve

    Yields:
        SearchStep objects

    Returns:
        Final PathResult
    """
    frontier = QueueFrontier()
    start = problem.initial_state()
    start_node = SearchNode(state=start, path_cost=0, depth=0)
    frontier.push(start_node)

    explored: set = set()
    nodes_expanded = 0

    while not frontier.is_empty():
        node = frontier.pop()

        yield SearchStep(
            step_number=nodes_expanded,
            current_node=node.state,
            action=node.action,
            frontier=frontier.get_states(),
            explored=list(explored),
            current_path=node.get_path(),
            path_cost=node.path_cost
        )

        if problem.goal_test(node.state):
            return PathResult(
                plan=node.get_solution(),
                cost=node.path_cost,
                nodes_expanded=nodes_expanded,
                path=node.get_path()
            )

        if node.state in explored:
            continue

        explored.add(node.state)
        nodes_expanded += 1

        for action in problem.actions(node.state):
            child_state = problem.result(node.state, action)
            if child_state not in explored and not frontier.contains_state(child_state):
                step_cost = problem.step_cost(node.state, action, child_state)
                child = SearchNode(
                    state=child_state,
                    parent=node,
                    action=action,
                    path_cost=node.path_cost + step_cost,
                    depth=node.depth + 1
                )
                frontier.push(child)

    return PathResult(
        plan="",
        cost=float('inf'),
        nodes_expanded=nodes_expanded,
        path=[]
    )