AumCoreAI commited on
Commit
1f0cbed
·
verified ·
1 Parent(s): ff9a92a

Create system_orchestrator.py

Browse files
Files changed (1) hide show
  1. system_orchestrator.py +228 -0
system_orchestrator.py ADDED
@@ -0,0 +1,228 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import asyncio
2
+ import logging
3
+ import json
4
+ import os
5
+ import sys
6
+ import uuid
7
+ import signal
8
+ import time
9
+ import psutil
10
+ import threading
11
+ from abc import ABC, abstractmethod
12
+ from datetime import datetime
13
+ from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
14
+ from typing import Dict, Any, List, Optional, Union, Callable
15
+ from dataclasses import dataclass, field
16
+
17
+ # ==========================================
18
+ # 1. ADVANCED CONFIGURATION & CONSTANTS
19
+ # ==========================================
20
+ @dataclass
21
+ class SystemConfig:
22
+ version: str = "2.0.1-Stable"
23
+ max_workers: int = os.cpu_count() or 4
24
+ timeout: int = 60
25
+ memory_limit_mb: int = 512
26
+ db_path: str = "data/system_state.json"
27
+ log_file: str = "logs/aumcore_main.log"
28
+ enable_telemetry: bool = True
29
+
30
+ # ==========================================
31
+ # 2. ENHANCED LOGGING SYSTEM
32
+ # ==========================================
33
+ class AumLogger:
34
+ """Professional Logging System with Multiple Outputs"""
35
+ def __init__(self, name: str, log_file: str):
36
+ if not os.path.exists('logs'): os.makedirs('logs')
37
+ self.logger = logging.getLogger(name)
38
+ self.logger.setLevel(logging.DEBUG)
39
+
40
+ formatter = logging.Formatter('%(asctime)s [%(levelname)s] %(name)s: %(message)s')
41
+
42
+ # File Handler
43
+ fh = logging.FileHandler(log_file)
44
+ fh.setFormatter(formatter)
45
+ self.logger.addHandler(fh)
46
+
47
+ # Console Handler
48
+ ch = logging.StreamHandler()
49
+ ch.setFormatter(formatter)
50
+ self.logger.addHandler(ch)
51
+
52
+ def info(self, msg): self.logger.info(msg)
53
+ def error(self, msg): self.logger.error(msg)
54
+ def warning(self, msg): self.logger.warning(msg)
55
+
56
+ # ==========================================
57
+ # 3. CUSTOM EXCEPTIONS & ERROR HANDLING
58
+ # ==========================================
59
+ class AumCoreError(Exception):
60
+ """Base exception for AumCore system"""
61
+ def __init__(self, message: str, error_code: int):
62
+ super().__init__(message)
63
+ self.error_code = error_code
64
+
65
+ # ==========================================
66
+ # 4. PERFORMANCE TRACKING DECORATORS
67
+ # ==========================================
68
+ def monitor_system_resources(func: Callable):
69
+ """Decorator to monitor CPU and Memory usage during execution"""
70
+ @functools.wraps(func)
71
+ async def wrapper(*args, **kwargs):
72
+ process = psutil.Process(os.getpid())
73
+ mem_before = process.memory_info().rss / (1024 * 1024)
74
+ start_time = time.perf_counter()
75
+
76
+ result = await func(*args, **kwargs)
77
+
78
+ end_time = time.perf_counter()
79
+ mem_after = process.memory_info().rss / (1024 * 1024)
80
+
81
+ logging.info(f"Method {func.__name__} executed. Time: {end_time-start_time:.2f}s | Mem Delta: {mem_after-mem_before:.2f}MB")
82
+ return result
83
+ return wrapper
84
+
85
+ import functools # Re-importing inside for safety if used as standalone
86
+
87
+ # ==========================================
88
+ # 5. CORE INTERFACES (CONTRACTS)
89
+ # ==========================================
90
+ class IEngine(ABC):
91
+ @abstractmethod
92
+ async def startup(self): pass
93
+ @abstractmethod
94
+ async def shutdown(self): pass
95
+ @abstractmethod
96
+ async def process_request(self, data: Any): pass
97
+
98
+ # ==========================================
99
+ # 6. DATA PERSISTENCE LAYER
100
+ # ==========================================
101
+ class StateManager:
102
+ """Handles Saving and Loading System State"""
103
+ def __init__(self, path: str):
104
+ self.path = path
105
+ self._lock = threading.Lock()
106
+ self._init_storage()
107
+
108
+ def _init_storage(self):
109
+ if not os.path.exists(os.path.dirname(self.path)):
110
+ os.makedirs(os.path.dirname(self.path))
111
+ if not os.path.exists(self.path):
112
+ self.save_state({"sessions": [], "metrics": {}})
113
+
114
+ def save_state(self, data: Dict):
115
+ with self._lock:
116
+ with open(self.path, 'w') as f:
117
+ json.dump(data, f, indent=4)
118
+
119
+ def load_state(self) -> Dict:
120
+ with self._lock:
121
+ with open(self.path, 'r') as f:
122
+ return json.load(f)
123
+
124
+ # ==========================================
125
+ # 7. ASYNC AI TASK PROCESSOR
126
+ # ==========================================
127
+ class TaskProcessor(IEngine):
128
+ def __init__(self, config: SystemConfig):
129
+ self.config = config
130
+ self.logger = AumLogger("TaskProcessor", config.log_file)
131
+ self.thread_pool = ThreadPoolExecutor(max_workers=config.max_workers)
132
+ self.process_pool = ProcessPoolExecutor(max_workers=2)
133
+ self.active_tasks = 0
134
+
135
+ async def startup(self):
136
+ self.logger.info(f"System Version {self.config.version} starting up...")
137
+ self.logger.info(f"CPU Workers available: {self.config.max_workers}")
138
+ await asyncio.sleep(1) # Simulated hardware check
139
+
140
+ async def shutdown(self):
141
+ self.logger.info("Cleaning up resources...")
142
+ self.thread_pool.shutdown(wait=True)
143
+ self.process_pool.shutdown(wait=True)
144
+
145
+ @monitor_system_resources
146
+ async def process_request(self, payload: Dict):
147
+ self.active_tasks += 1
148
+ request_id = payload.get("id", str(uuid.uuid4()))
149
+
150
+ try:
151
+ loop = asyncio.get_running_loop()
152
+ # Offloading heavy computation to process pool to avoid blocking Event Loop
153
+ result = await loop.run_in_executor(
154
+ self.process_pool,
155
+ self._compute_heavy_logic,
156
+ payload.get("data")
157
+ )
158
+ return {"id": request_id, "result": result, "timestamp": str(datetime.now())}
159
+ except Exception as e:
160
+ self.logger.error(f"Processing failed: {str(e)}")
161
+ return {"error": "Processing Error", "id": request_id}
162
+ finally:
163
+ self.active_tasks -= 1
164
+
165
+ @staticmethod
166
+ def _compute_heavy_logic(data: str) -> str:
167
+ """Isolated function for CPU intensive work"""
168
+ time.sleep(2) # Simulated Neural Network Inference
169
+ return f"PROCESSED_DATA_{data.upper()}"
170
+
171
+ # ==========================================
172
+ # 8. MASTER ORCHESTRATOR
173
+ # ==========================================
174
+ class AumCoreMaster:
175
+ def __init__(self):
176
+ self.config = SystemConfig()
177
+ self.logger = AumLogger("Master", self.config.log_file)
178
+ self.state = StateManager(self.config.db_path)
179
+ self.processor = TaskProcessor(self.config)
180
+ self.is_running = True
181
+
182
+ # Register OS Signals for Graceful Exit
183
+ signal.signal(signal.SIGINT, self._signal_handler)
184
+ signal.signal(signal.SIGTERM, self._signal_handler)
185
+
186
+ def _signal_handler(self, sig, frame):
187
+ self.logger.warning(f"Signal {sig} received. Shutting down...")
188
+ self.is_running = False
189
+
190
+ async def run_forever(self):
191
+ await self.processor.startup()
192
+ self.logger.info("Master loop started. Listening for tasks...")
193
+
194
+ try:
195
+ while self.is_running:
196
+ # In a real app, this would be a web server or message queue listener
197
+ dummy_input = f"input_batch_{int(time.time())}"
198
+ response = await self.processor.process_request({"data": dummy_input})
199
+
200
+ # Update State
201
+ current_state = self.state.load_state()
202
+ current_state["sessions"].append(response)
203
+ self.state.save_state(current_state)
204
+
205
+ self.logger.info(f"Task Handled: {response['id']}")
206
+ await asyncio.sleep(5) # Throttling for demo
207
+ except Exception as e:
208
+ self.logger.error(f"Critical System Failure: {str(e)}")
209
+ finally:
210
+ await self.processor.shutdown()
211
+
212
+ # ==========================================
213
+ # 9. EXECUTION ENTRY POINT
214
+ # ==========================================
215
+ async def bootstrap():
216
+ """Main Entry point with proper lifecycle management"""
217
+ master = AumCoreMaster()
218
+ await master.run_forever()
219
+
220
+ if __name__ == "__main__":
221
+ try:
222
+ if sys.platform == 'win32':
223
+ asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
224
+ asyncio.run(bootstrap())
225
+ except KeyboardInterrupt:
226
+ pass
227
+ except Exception as e:
228
+ print(f"FATAL ERROR: {e}")