Premchan369 commited on
Commit
cf61dba
·
verified ·
1 Parent(s): 8d1063d

Upload qads/core/system.py

Browse files
Files changed (1) hide show
  1. qads/core/system.py +133 -0
qads/core/system.py ADDED
@@ -0,0 +1,133 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """QADS Main System Integration."""
2
+ import numpy as np
3
+ from typing import Dict, Any, Optional, Tuple, List
4
+ from .config import QADSConfig
5
+ from ..quantum.core import QuantumDecisionCore
6
+ from ..planner.hybrid import HybridPlanner
7
+ from ..simulation.environment import SimulationEnvironment
8
+ from ..perception.fusion import SensorFusion
9
+ from ..control.interface import ControlInterface
10
+
11
+
12
+ class QADSSystem:
13
+ """
14
+ Quantum Autonomous Decision System.
15
+
16
+ Integrates perception, quantum decision core, hybrid planning,
17
+ reinforcement learning, and control into a unified framework.
18
+ """
19
+
20
+ def __init__(self, config: Optional[QADSConfig] = None):
21
+ self.config = config or QADSConfig()
22
+ self.perception = SensorFusion(self.config.perception)
23
+ self.quantum_core = QuantumDecisionCore(self.config.quantum)
24
+ self.planner = HybridPlanner(self.config.planner, self.quantum_core)
25
+ self.control = ControlInterface()
26
+ self.env: Optional[SimulationEnvironment] = None
27
+ self.state_history: List[Dict[str, Any]] = []
28
+ self.mission_log: List[Dict[str, Any]] = []
29
+
30
+ def initialize_environment(self, env_config: Optional[Dict[str, Any]] = None) -> SimulationEnvironment:
31
+ """Initialize simulation environment."""
32
+ config = env_config or {}
33
+ self.env = SimulationEnvironment(self.config.simulation, **config)
34
+ return self.env
35
+
36
+ def perceive(self, raw_sensors: Dict[str, np.ndarray]) -> Dict[str, Any]:
37
+ """Process raw sensor data into structured world state."""
38
+ return self.perception.process(raw_sensors)
39
+
40
+ def plan(self,
41
+ start: Tuple[float, ...],
42
+ goal: Tuple[float, ...],
43
+ world_state: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
44
+ """Generate plan from start to goal."""
45
+ if world_state is None and self.env is not None:
46
+ world_state = self.env.get_world_state()
47
+
48
+ plan = self.planner.plan(start, goal, world_state)
49
+ self.state_history.append({
50
+ 'start': start,
51
+ 'goal': goal,
52
+ 'plan': plan,
53
+ 'world_state': world_state
54
+ })
55
+ return plan
56
+
57
+ def execute(self, plan: Dict[str, Any]) -> Dict[str, Any]:
58
+ """Execute plan and return execution results."""
59
+ if self.env is None:
60
+ raise RuntimeError("Environment not initialized")
61
+
62
+ trajectory = []
63
+ rewards = []
64
+ done = False
65
+ step = 0
66
+
67
+ current_pos = plan['start']
68
+ actions = plan.get('actions', [])
69
+
70
+ for action in actions:
71
+ if done:
72
+ break
73
+
74
+ next_state, reward, terminated, truncated, info = self.env.step(action)
75
+ trajectory.append({
76
+ 'position': next_state['position'],
77
+ 'action': action,
78
+ 'reward': reward,
79
+ 'info': info
80
+ })
81
+ rewards.append(reward)
82
+ done = terminated or truncated
83
+ step += 1
84
+
85
+ result = {
86
+ 'success': plan['goal_reached'] if 'goal_reached' in plan else False,
87
+ 'trajectory': trajectory,
88
+ 'total_reward': sum(rewards),
89
+ 'steps': step,
90
+ 'plan_metrics': plan.get('metrics', {})
91
+ }
92
+ self.mission_log.append(result)
93
+ return result
94
+
95
+ def run_mission(self,
96
+ start: Tuple[float, ...],
97
+ goal: Tuple[float, ...],
98
+ max_retries: int = 3) -> Dict[str, Any]:
99
+ """Run complete mission: perceive → plan → execute."""
100
+ for attempt in range(max_retries):
101
+ world_state = self.env.get_world_state() if self.env else None
102
+ plan = self.plan(start, goal, world_state)
103
+ result = self.execute(plan)
104
+
105
+ if result['success']:
106
+ result['attempts'] = attempt + 1
107
+ return result
108
+
109
+ # Replan if failed
110
+ if attempt < max_retries - 1:
111
+ self.planner.update_world_state(self.env.get_world_state())
112
+
113
+ result['attempts'] = max_retries
114
+ return result
115
+
116
+ def get_quantum_metrics(self) -> Dict[str, float]:
117
+ """Get quantum computation metrics."""
118
+ return self.quantum_core.get_metrics()
119
+
120
+ def get_mission_summary(self) -> Dict[str, Any]:
121
+ """Get summary of all missions."""
122
+ if not self.mission_log:
123
+ return {}
124
+
125
+ successes = sum(1 for m in self.mission_log if m['success'])
126
+ return {
127
+ 'total_missions': len(self.mission_log),
128
+ 'successes': successes,
129
+ 'failures': len(self.mission_log) - successes,
130
+ 'success_rate': successes / len(self.mission_log),
131
+ 'avg_steps': np.mean([m['steps'] for m in self.mission_log]),
132
+ 'avg_reward': np.mean([m['total_reward'] for m in self.mission_log])
133
+ }