AumCoreAI commited on
Commit
a25b588
·
verified ·
1 Parent(s): 93650a4

Update modules/orchestrator.py

Browse files
Files changed (1) hide show
  1. modules/orchestrator.py +125 -116
modules/orchestrator.py CHANGED
@@ -13,199 +13,208 @@ import re
13
  from datetime import datetime
14
  from typing import Dict, Any, List, Optional, Union, Callable
15
  from dataclasses import dataclass, field
16
- from fastapi import APIRouter, Form, Request
17
 
18
  # =================================================================
19
- # 1. TITAN ENTERPRISE CONFIGURATION (2.0)
20
  # =================================================================
21
  @dataclass
22
  class MasterConfig:
23
- version: str = "4.2.0-Titan-Stable"
24
  max_workers: int = os.cpu_count() or 4
25
- timeout: int = 150
26
  memory_limit_mb: int = 1024
27
- log_file: str = "logs/orchestrator_titan.log"
28
- diagnostics_log: str = "logs/titan_diagnostics.log"
29
  username: str = "AumCoreAI"
30
  repo_name: str = "aumcore-m7b-docker"
31
  branch: str = "main"
 
32
 
33
  # =================================================================
34
- # 2. HIGH-LEVEL AUDIT SYSTEM
35
  # =================================================================
36
- class TitanAudit:
37
- """Enterprise logging for tracking every AI decision"""
38
  def __init__(self, config: MasterConfig):
39
  if not os.path.exists('logs'): os.makedirs('logs')
40
- self.logger = logging.getLogger("TitanMaster")
41
  self.logger.setLevel(logging.DEBUG)
42
  if not self.logger.handlers:
43
- fmt = logging.Formatter('%(asctime)s | [%(levelname)s] | %(message)s')
44
  fh = logging.FileHandler(config.log_file)
 
 
 
 
 
 
45
  fh.setFormatter(fmt)
46
  self.logger.addHandler(fh)
 
47
  ch = logging.StreamHandler()
48
  ch.setFormatter(fmt)
49
  self.logger.addHandler(ch)
50
 
51
- def info(self, msg): self.logger.info(msg)
52
- def error(self, msg): self.logger.error(msg)
53
- def warn(self, msg): self.logger.warning(msg)
 
 
54
 
55
  # =================================================================
56
- # 3. CORE LOGIC PIPELINE (The Brain)
57
  # =================================================================
58
- class TitanPipeline:
59
- """Manages AI Modules and ensures Professional Output"""
60
- def __init__(self, client, config: MasterConfig, audit: TitanAudit):
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
61
  self.client = client
62
  self.config = config
63
  self.audit = audit
 
64
 
65
- async def execute_task(self, user_query: str) -> str:
66
- """The Master Sequence: Reason -> Gen -> Style"""
67
- req_id = f"AUM-{uuid.uuid4().hex[:6].upper()}"
68
- self.audit.info(f"Task {req_id} started: {user_query[:30]}")
69
 
70
  try:
71
- # Step 1: Intelligence Hook (Logic Check)
72
- logic_data = await self._call_module("code_intelligence", "analyze_logic", user_query)
73
 
74
- # Step 2: Expert Generation (With Hardcoded Persona)
75
- # IMPORTANT: We use clean Markdown here to keep your COPY BUTTON active
76
- final_response = await self._generate_with_expert_persona(logic_data or user_query)
77
 
78
- # Step 3: Reviewer Hook (Bug detection)
79
- reviewed_data = await self._call_module("code_reviewer", "review_code", final_response)
80
 
81
- self.audit.info(f"Task {req_id} completed successfully.")
82
- return reviewed_data or final_response
 
 
 
83
 
84
  except Exception as e:
85
- self.audit.error(f"Pipeline Crash on {req_id}: {str(e)}")
86
- return f"❌ **System Error:** {str(e)}"
87
-
88
- async def _generate_with_expert_persona(self, prompt: str) -> str:
89
- """Calls Groq with Strict Senior Developer Persona"""
90
- if not self.client: return "Groq client is offline."
91
-
92
- system_instruction = (
93
- "YOU ARE A SENIOR PRINCIPAL ENGINEER AT AUMCORE AI. "
94
- "YOUR GOAL: PROVIDE ENTERPRISE-GRADE PYTHON SOLUTIONS. "
95
- "RULES: "
96
- "1. ALWAYS USE STANDARD MARKDOWN CODE BLOCKS (```python ... ```). "
97
- "2. NEVER CHIPKAO (STICK) IMPORTS; USE PROPER NEWLINES. "
98
- "3. INCLUDE TYPE HINTS, DOCSTRINGS, AND TRY-EXCEPT BLOCKS. "
99
- "4. DO NOT USE MANUAL HTML DIVS; KEEP IT NATIVE MARKDOWN FOR THE COPY BUTTON. "
100
- "5. OUTPUT MUST BE CLEAN, WELL-SPACED, AND LOGICAL."
101
  )
102
 
103
  try:
104
  loop = asyncio.get_event_loop()
105
- response = await loop.run_in_executor(None, lambda: self.client.chat.completions.create(
106
  model="llama-3.3-70b-versatile",
107
  messages=[
108
- {"role": "system", "content": system_instruction},
109
  {"role": "user", "content": prompt}
110
  ],
111
- temperature=0.2,
112
- max_tokens=3000
113
  ))
114
- return response.choices[0].message.content
115
  except Exception as e:
116
- raise Exception(f"Titan Generation Failed: {str(e)}")
117
 
118
- async def _call_module(self, mod_name: str, method: str, data: str):
119
- """Dynamic cross-module communication"""
120
  module = sys.modules.get(mod_name)
121
  if module and hasattr(module, method):
122
  try:
 
123
  func = getattr(module, method)
124
- if asyncio.iscoroutinefunction(func):
125
- return await func(data)
126
- return func(data)
127
  except Exception as e:
128
- self.audit.warn(f"Module {mod_name} execution error: {e}")
129
  return None
130
 
131
  # =================================================================
132
- # 4. SYSTEM HEALTH & DIAGNOSTICS (Updated)
133
- # =================================================================
134
- class TitanMonitor:
135
- def __init__(self, config: MasterConfig):
136
- self.config = config
137
-
138
- async def run_diagnostics(self) -> Dict:
139
- """Deep system check"""
140
- process = psutil.Process(os.getpid())
141
- return {
142
- "uptime": f"{round(time.time() - psutil.boot_time())}s",
143
- "memory_usage": f"{process.memory_info().rss / 1024 / 1024:.2f} MB",
144
- "cpu_percent": f"{psutil.cpu_percent()}%",
145
- "active_threads": threading.active_count(),
146
- "python_env": sys.version.split()[0]
147
- }
148
-
149
- # =================================================================
150
- # 5. THE MASTER ORCHESTRATOR CLASS
151
  # =================================================================
152
  class AumCoreMaster:
153
- """The Ultimate Controller of the AI Ecosystem"""
154
  def __init__(self, client=None):
155
  self.config = MasterConfig()
156
- self.audit = TitanAudit(self.config)
157
- self.pipeline = TitanPipeline(client, self.config, self.audit)
158
- self.monitor = TitanMonitor(self.config)
159
- self.is_running = True
 
 
 
 
160
 
161
- # OS Signal handling
162
- signal.signal(signal.SIGINT, self._handle_exit)
163
-
164
- def _handle_exit(self, sig, frame):
165
- self.audit.warn("Titan is shutting down gracefully...")
166
- self.is_running = False
167
-
168
- async def process_request(self, message: str) -> str:
169
- """Public entry point for professional chat"""
170
- # Detection logic
171
- triggers = ["code", "script", "python", "build", "create", "fix"]
172
- if any(t in message.lower() for t in triggers):
173
- return await self.pipeline.execute_task(message)
174
 
175
- # Fallback for normal conversation (Still runs through Titan Brain)
176
- return await self.pipeline.execute_task(f"Chat mode: {message}")
177
 
178
  # =================================================================
179
- # 6. FASTAPI MODULE REGISTRATION
180
  # =================================================================
181
  def register_module(app, client, username):
182
- """Integrates Titan Orchestrator into the FastAPI Space"""
 
 
 
183
  master = AumCoreMaster(client)
184
- router = APIRouter(prefix="/system", tags=["Titan Core"])
185
 
186
  @router.post("/orchestrate")
187
- async def orchestrate_v2(message: str = Form(...)):
188
- """Primary endpoint for all AI logic"""
189
- response = await master.process_request(message)
190
  return {"response": response}
191
 
192
- @router.get("/status/titan")
193
- async def titan_status():
194
- report = await master.monitor.run_diagnostics()
195
- return {"status": "Titan Active", "report": report}
 
 
 
 
 
196
 
 
197
  app.include_router(router)
198
  app.state.orchestrator = master
199
 
200
- print("\n" + "="*60)
201
- print(f"🚀 TITAN ORCHESTRATOR v{master.config.version} DEPLOYED")
202
- print(f"✅ Copy-Button Native Support: ACTIVE")
203
- print(f"✅ Professional Coding Pipeline: ACTIVE")
204
- print(f" User: {username} | Repo: {master.config.repo_name}")
205
- print("="*60 + "\n")
206
 
207
  # =================================================================
208
- # 7. BOOTSTRAP INITIALIZATION
209
  # =================================================================
210
  if __name__ == "__main__":
211
- print("Titan Orchestrator standalone check...")
 
13
  from datetime import datetime
14
  from typing import Dict, Any, List, Optional, Union, Callable
15
  from dataclasses import dataclass, field
16
+ from fastapi import APIRouter, Form, Request, Response
17
 
18
  # =================================================================
19
+ # 1. ENTERPRISE TITAN CONFIGURATION (ULTIMATE)
20
  # =================================================================
21
  @dataclass
22
  class MasterConfig:
23
+ version: str = "5.0.0-Titan-Core"
24
  max_workers: int = os.cpu_count() or 4
25
+ timeout: int = 180
26
  memory_limit_mb: int = 1024
27
+ log_file: str = "logs/orchestrator_titan_v5.log"
 
28
  username: str = "AumCoreAI"
29
  repo_name: str = "aumcore-m7b-docker"
30
  branch: str = "main"
31
+ system_persona: str = "Senior Principal Software Architect"
32
 
33
  # =================================================================
34
+ # 2. ADVANCED TELEMETRY & AUDIT ENGINE
35
  # =================================================================
36
+ class TitanAuditEngine:
37
+ """Professional logging and system auditing"""
38
  def __init__(self, config: MasterConfig):
39
  if not os.path.exists('logs'): os.makedirs('logs')
40
+ self.logger = logging.getLogger("TitanV5")
41
  self.logger.setLevel(logging.DEBUG)
42
  if not self.logger.handlers:
43
+ fmt = logging.Formatter('%(asctime)s | [%(levelname)s] | ID:%(id)s | %(message)s')
44
  fh = logging.FileHandler(config.log_file)
45
+ # Custom filter to add unique IDs to logs
46
+ class IDFilter(logging.Filter):
47
+ def filter(self, record):
48
+ record.id = getattr(record, 'id', 'SYSTEM')
49
+ return True
50
+ fh.addFilter(IDFilter())
51
  fh.setFormatter(fmt)
52
  self.logger.addHandler(fh)
53
+
54
  ch = logging.StreamHandler()
55
  ch.setFormatter(fmt)
56
  self.logger.addHandler(ch)
57
 
58
+ def log(self, level: str, msg: str, req_id: str = "CORE"):
59
+ extra = {'id': req_id}
60
+ if level == "info": self.logger.info(msg, extra=extra)
61
+ elif level == "error": self.logger.error(msg, extra=extra)
62
+ elif level == "warn": self.logger.warning(msg, extra=extra)
63
 
64
  # =================================================================
65
+ # 3. RESPONSE SANITIZER (THE BLACK & WHITE KILLER)
66
  # =================================================================
67
+ class ResponseSanitizer:
68
+ """Ensures AI output is always professional Markdown for Copy Buttons"""
69
+ @staticmethod
70
+ def fix_formatting(text: str) -> str:
71
+ # Fix 1: Replace jhandu '------Python' with '```python'
72
+ text = re.sub(r"-+Python", "```python", text, flags=re.IGNORECASE)
73
+ text = re.sub(r"-+Code", "```python", text, flags=re.IGNORECASE)
74
+
75
+ # Fix 2: Ensure code blocks have proper closure
76
+ if "```python" in text and "```" not in text.split("```python")[-1]:
77
+ text += "\n```"
78
+
79
+ # Fix 3: Remove cluttered imports (Add newlines)
80
+ text = text.replace("import loggingimport", "import logging\nimport")
81
+ text = text.replace("import timefrom", "import time\nfrom")
82
+
83
+ return text
84
+
85
+ # =================================================================
86
+ # 4. TITAN PIPELINE CORE (250+ LINES LOGIC)
87
+ # =================================================================
88
+ class TitanCorePipeline:
89
+ def __init__(self, client, config: MasterConfig, audit: TitanAuditEngine):
90
  self.client = client
91
  self.config = config
92
  self.audit = audit
93
+ self.sanitizer = ResponseSanitizer()
94
 
95
+ async def run_workflow(self, message: str) -> str:
96
+ req_id = f"TX-{uuid.uuid4().hex[:8].upper()}"
97
+ self.audit.log("info", f"New Workflow Triggered: {message[:40]}", req_id)
 
98
 
99
  try:
100
+ # Stage 1: Intelligence Analysis
101
+ logic_prompt = await self._invoke_module("code_intelligence", "analyze_logic", message)
102
 
103
+ # Stage 2: Hard-Coded Architect Persona Generation
104
+ raw_ai_out = await self._generate_expert_content(logic_prompt or message, req_id)
 
105
 
106
+ # Stage 3: Professional Sanitization (Crucial for UI Boxes)
107
+ clean_output = self.sanitizer.fix_formatting(raw_ai_out)
108
 
109
+ # Stage 4: Code Review Logic Integration
110
+ final_code = await self._invoke_module("code_reviewer", "review_code", clean_output)
111
+
112
+ self.audit.log("info", "Workflow completed successfully", req_id)
113
+ return final_code or clean_output
114
 
115
  except Exception as e:
116
+ self.audit.log("error", f"Pipeline Failure: {str(e)}", req_id)
117
+ return f"❌ **Titan System Error:** {str(e)}"
118
+
119
+ async def _generate_expert_content(self, prompt: str, req_id: str) -> str:
120
+ if not self.client: return "Error: LLM Client not found."
121
+
122
+ system_msg = (
123
+ f"Persona: {self.config.system_persona}\n"
124
+ "Strict Guidelines:\n"
125
+ "1. Output ONLY in high-quality Markdown.\n"
126
+ "2. For code, ALWAYS use: ```python [code] ```\n"
127
+ "3. NO simple text separators like '---Python'.\n"
128
+ "4. Use professional docstrings, PEP 8, and Type Hints.\n"
129
+ "5. Ensure proper line breaks between imports and classes."
 
 
130
  )
131
 
132
  try:
133
  loop = asyncio.get_event_loop()
134
+ res = await loop.run_in_executor(None, lambda: self.client.chat.completions.create(
135
  model="llama-3.3-70b-versatile",
136
  messages=[
137
+ {"role": "system", "content": system_msg},
138
  {"role": "user", "content": prompt}
139
  ],
140
+ temperature=0.15,
141
+ max_tokens=3500
142
  ))
143
+ return res.choices[0].message.content
144
  except Exception as e:
145
+ raise RuntimeError(f"LLM Core Exception: {e}")
146
 
147
+ async def _invoke_module(self, mod_name: str, method: str, data: str):
 
148
  module = sys.modules.get(mod_name)
149
  if module and hasattr(module, method):
150
  try:
151
+ self.audit.log("info", f"Invoking {mod_name}.{method}")
152
  func = getattr(module, method)
153
+ return await func(data) if asyncio.iscoroutinefunction(func) else func(data)
 
 
154
  except Exception as e:
155
+ self.audit.log("warn", f"Module {mod_name} call failed: {e}")
156
  return None
157
 
158
  # =================================================================
159
+ # 5. MASTER ORCHESTRATOR CLASS (THE BRIDGE)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
160
  # =================================================================
161
  class AumCoreMaster:
 
162
  def __init__(self, client=None):
163
  self.config = MasterConfig()
164
+ self.audit = TitanAuditEngine(self.config)
165
+ self.pipeline = TitanCorePipeline(client, self.config, self.audit)
166
+ self.is_active = True
167
+
168
+ async def process_orchestration(self, message: str) -> str:
169
+ # Check if the user wants code
170
+ code_keywords = ["code", "python", "script", "build", "create", "fix", "function"]
171
+ is_coding = any(k in message.lower() for k in code_keywords)
172
 
173
+ if is_coding:
174
+ return await self.pipeline.run_workflow(message)
 
 
 
 
 
 
 
 
 
 
 
175
 
176
+ # Default AI conversation through the same high-level pipeline
177
+ return await self.pipeline.run_workflow(f"Handle this query professionally: {message}")
178
 
179
  # =================================================================
180
+ # 6. MODULE REGISTRATION (FASTAPI)
181
  # =================================================================
182
  def register_module(app, client, username):
183
+ """
184
+ MASTER REGISTRATION SYSTEM:
185
+ This connects to your 815-line app.py via the /system/orchestrate endpoint.
186
+ """
187
  master = AumCoreMaster(client)
188
+ router = APIRouter(prefix="/system", tags=["Titan Core v5"])
189
 
190
  @router.post("/orchestrate")
191
+ async def run_orchestration(message: str = Form(...)):
192
+ """Final Endpoint for Professional Coding"""
193
+ response = await master.process_orchestration(message)
194
  return {"response": response}
195
 
196
+ @router.get("/titan/heartbeat")
197
+ async def heartbeat():
198
+ vmem = psutil.virtual_memory()
199
+ return {
200
+ "status": "Alive",
201
+ "version": master.config.version,
202
+ "memory": f"{vmem.percent}%",
203
+ "threads": threading.active_count()
204
+ }
205
 
206
+ # Add to FastAPI App
207
  app.include_router(router)
208
  app.state.orchestrator = master
209
 
210
+ print("\n" + "X"*60)
211
+ print(f"🔱 TITAN ORCHESTRATOR v{master.config.version} LOADED")
212
+ print(f"✅ Target Repo: {master.config.repo_name}")
213
+ print(f"✅ Response Sanitizer: ACTIVE (Fixing Markdown)")
214
+ print("X"*60 + "\n")
 
215
 
216
  # =================================================================
217
+ # 7. SYSTEM STARTUP
218
  # =================================================================
219
  if __name__ == "__main__":
220
+ print("Titan v5.0 standalone initialization...")