Premchan369 commited on
Commit
09acbf5
·
verified ·
1 Parent(s): 7345e66

Upload qads/planner/hybrid.py

Browse files
Files changed (1) hide show
  1. qads/planner/hybrid.py +280 -0
qads/planner/hybrid.py ADDED
@@ -0,0 +1,280 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Hybrid Planner: combines classical planning with quantum optimization."""
2
+ import numpy as np
3
+ import time
4
+ from typing import Dict, Any, Optional, Tuple, List
5
+ from .graph import WorldGraph
6
+ from .classical import AStarPlanner, DStarPlanner, RRTStarPlanner
7
+
8
+
9
+ class HybridPlanner:
10
+ """
11
+ Hybrid classical + quantum planner with entropy-based activation.
12
+
13
+ Simple environments: classical planner only
14
+ Complex uncertain environments: quantum planner activated
15
+ """
16
+
17
+ def __init__(self,
18
+ config: Any,
19
+ quantum_core: Optional[Any] = None):
20
+ self.config = config
21
+ self.quantum_core = quantum_core
22
+
23
+ # Classical planners
24
+ self.classical_planners = {
25
+ 'astar': AStarPlanner(),
26
+ 'dstar': DStarPlanner(),
27
+ 'rrt_star': RRTStarPlanner()
28
+ }
29
+
30
+ # Metrics
31
+ self.stats = {
32
+ 'classical_calls': 0,
33
+ 'quantum_calls': 0,
34
+ 'total_plans': 0,
35
+ 'avg_classical_time': 0.0,
36
+ 'avg_quantum_time': 0.0
37
+ }
38
+
39
+ def plan(self,
40
+ start: Tuple[float, ...],
41
+ goal: Tuple[float, ...],
42
+ world_state: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
43
+ """
44
+ Generate plan with automatic quantum activation.
45
+ """
46
+ self.stats['total_plans'] += 1
47
+
48
+ # Build or use world graph
49
+ graph = self._build_graph(world_state)
50
+
51
+ # Check if quantum should be activated
52
+ use_quantum = self._should_activate_quantum(graph, world_state)
53
+
54
+ if use_quantum and self.quantum_core is not None:
55
+ return self._quantum_plan(graph, start, goal, world_state)
56
+ else:
57
+ return self._classical_plan(graph, start, goal)
58
+
59
+ def _build_graph(self, world_state: Optional[Dict[str, Any]]) -> WorldGraph:
60
+ """Build world graph from state or create default."""
61
+ if world_state is None:
62
+ # Create default grid
63
+ graph = WorldGraph(resolution=self.config.grid_resolution)
64
+ graph.build_grid(
65
+ bounds=((0, 20), (0, 20)),
66
+ obstacle_map=None,
67
+ uncertainty_map=None
68
+ )
69
+ return graph
70
+
71
+ # Build from world state
72
+ bounds = world_state.get('bounds', ((0, 20), (0, 20)))
73
+ obstacle_map = world_state.get('obstacle_map')
74
+ uncertainty_map = world_state.get('uncertainty_map')
75
+ resolution = world_state.get('resolution', self.config.grid_resolution)
76
+
77
+ graph = WorldGraph(resolution=resolution)
78
+ graph.build_grid(bounds=bounds,
79
+ obstacle_map=obstacle_map,
80
+ uncertainty_map=uncertainty_map)
81
+
82
+ # Add any additional nodes/edges from world state
83
+ obstacles = world_state.get('obstacles', [])
84
+ for obs in obstacles:
85
+ pos = obs['position']
86
+ node_id = graph.find_node_at(tuple(pos))
87
+ if node_id is not None:
88
+ graph.update_node(node_id,
89
+ obstacle_prob=obs.get('probability', 1.0),
90
+ risk=obs.get('risk', 1.0))
91
+
92
+ return graph
93
+
94
+ def _should_activate_quantum(self,
95
+ graph: WorldGraph,
96
+ world_state: Optional[Dict[str, Any]]) -> bool:
97
+ """Determine if quantum planner should be activated."""
98
+ # Direct check from world state
99
+ if world_state is not None:
100
+ entropy = world_state.get('entropy', 0.0)
101
+ uncertainty = world_state.get('uncertainty', 0.0)
102
+ obstacle_density = world_state.get('obstacle_density', 0.0)
103
+
104
+ # Activation conditions
105
+ activate = (
106
+ entropy > self.config.activation_entropy or
107
+ uncertainty > 0.5 or
108
+ obstacle_density > 0.4
109
+ )
110
+
111
+ if activate:
112
+ return True
113
+
114
+ # Check graph metrics
115
+ graph_entropy = graph.get_entropy()
116
+ graph_uncertainty = graph.get_uncertainty()
117
+ graph_obstacle = graph.get_obstacle_density()
118
+
119
+ return (
120
+ graph_entropy > self.config.activation_entropy or
121
+ graph_uncertainty > 0.5 or
122
+ graph_obstacle > 0.4
123
+ )
124
+
125
+ def _classical_plan(self,
126
+ graph: WorldGraph,
127
+ start: Tuple[float, ...],
128
+ goal: Tuple[float, ...]) -> Dict[str, Any]:
129
+ """Classical planning path."""
130
+ start_time = time.time()
131
+ planner = self.classical_planners.get(
132
+ self.config.classical_planner,
133
+ AStarPlanner()
134
+ )
135
+
136
+ result = planner.plan(graph, start, goal)
137
+ elapsed = time.time() - start_time
138
+
139
+ self.stats['classical_calls'] += 1
140
+ self.stats['avg_classical_time'] = (
141
+ (self.stats['avg_classical_time'] * (self.stats['classical_calls'] - 1) + elapsed) /
142
+ self.stats['classical_calls']
143
+ )
144
+
145
+ result.update({
146
+ 'quantum_activated': False,
147
+ 'plan_time': elapsed,
148
+ 'start': start,
149
+ 'goal': goal
150
+ })
151
+
152
+ # Convert path to actions
153
+ result['actions'] = self._path_to_actions(result.get('path_positions', []))
154
+ result['goal_reached'] = result.get('success', False)
155
+
156
+ return result
157
+
158
+ def _quantum_plan(self,
159
+ graph: WorldGraph,
160
+ start: Tuple[float, ...],
161
+ goal: Tuple[float, ...],
162
+ world_state: Optional[Dict[str, Any]]) -> Dict[str, Any]:
163
+ """Quantum-enhanced planning path."""
164
+ start_time = time.time()
165
+
166
+ # Step 1: Generate classical candidates
167
+ classical_results = []
168
+ for planner_name, planner in self.classical_planners.items():
169
+ r = planner.plan(graph, start, goal)
170
+ if r['success']:
171
+ classical_results.append(r)
172
+
173
+ if not classical_results:
174
+ # Fall back to classical only
175
+ return self._classical_plan(graph, start, goal)
176
+
177
+ # Step 2: Generate trajectory candidates from classical plans
178
+ trajectories = []
179
+ for r in classical_results:
180
+ positions = np.array(r['path_positions'])
181
+ if len(positions) > 0:
182
+ trajectories.append(positions)
183
+
184
+ # Step 3: Quantum evaluation of trajectories
185
+ if self.quantum_core is not None and trajectories:
186
+ # Compute world state for quantum evaluation
187
+ world_state_for_quantum = {
188
+ 'entropy': graph.get_entropy(),
189
+ 'uncertainty': graph.get_uncertainty(),
190
+ 'obstacle_density': graph.get_obstacle_density(),
191
+ 'risk_score': np.mean([m['risk'] for m in graph.node_metadata.values()])
192
+ }
193
+
194
+ evaluated = self.quantum_core.evaluate_trajectories(
195
+ trajectories, world_state_for_quantum
196
+ )
197
+
198
+ # Select best trajectory
199
+ if evaluated:
200
+ best = evaluated[0]
201
+ best_traj = best['trajectory']
202
+
203
+ # Convert back to node IDs
204
+ path = []
205
+ path_positions = []
206
+ for pos in best_traj:
207
+ path_positions.append(tuple(pos))
208
+ # Find nearest node
209
+ node_id = None
210
+ for nid, npos in graph.node_positions.items():
211
+ if np.allclose(npos, pos, atol=graph.resolution):
212
+ node_id = nid
213
+ break
214
+ if node_id is not None:
215
+ path.append(node_id)
216
+
217
+ elapsed = time.time() - start_time
218
+ self.stats['quantum_calls'] += 1
219
+ self.stats['avg_quantum_time'] = (
220
+ (self.stats['avg_quantum_time'] * (self.stats['quantum_calls'] - 1) + elapsed) /
221
+ self.stats['quantum_calls']
222
+ )
223
+
224
+ result = {
225
+ 'path': path,
226
+ 'path_positions': path_positions,
227
+ 'cost': best['quantum_score'],
228
+ 'success': True,
229
+ 'explored': len(classical_results),
230
+ 'planner': 'hybrid_quantum',
231
+ 'quantum_activated': True,
232
+ 'quantum_score': best['quantum_score'],
233
+ 'entropy': best['entropy'],
234
+ 'confidence': best['confidence'],
235
+ 'uncertainty': best['uncertainty'],
236
+ 'plan_time': elapsed,
237
+ 'start': start,
238
+ 'goal': goal,
239
+ 'goal_reached': True,
240
+ 'actions': self._path_to_actions(path_positions)
241
+ }
242
+ return result
243
+
244
+ # Quantum failed or not available, use best classical
245
+ best_classical = min(classical_results, key=lambda x: x['cost'])
246
+ elapsed = time.time() - start_time
247
+
248
+ self.stats['classical_calls'] += 1
249
+ best_classical.update({
250
+ 'quantum_activated': False,
251
+ 'quantum_score': 0.0,
252
+ 'plan_time': elapsed,
253
+ 'goal_reached': best_classical.get('success', False)
254
+ })
255
+ best_classical['actions'] = self._path_to_actions(
256
+ best_classical.get('path_positions', [])
257
+ )
258
+ return best_classical
259
+
260
+ def _path_to_actions(self, path_positions: List[Tuple[float, ...]]) -> List[np.ndarray]:
261
+ """Convert path positions to action vectors."""
262
+ if len(path_positions) < 2:
263
+ return []
264
+
265
+ actions = []
266
+ for i in range(len(path_positions) - 1):
267
+ current = np.array(path_positions[i])
268
+ next_pos = np.array(path_positions[i + 1])
269
+ action = next_pos - current
270
+ actions.append(action)
271
+
272
+ return actions
273
+
274
+ def update_world_state(self, world_state: Dict[str, Any]):
275
+ """Update planner with new world state."""
276
+ pass # State is rebuilt each plan call
277
+
278
+ def get_stats(self) -> Dict[str, Any]:
279
+ """Get planning statistics."""
280
+ return self.stats.copy()