"""
Master Orchestrator — Tiered Architecture with LLM-first Routing
- LLM routing is the default path.
- Keyword routing is used ONLY as a fallback.
- Tier-3 → Tier-2 → Tier-1 execution flow preserved.
- PARALLELIZED for speed.
- Returns executive-ready TEXT (not JSON)
"""
|
|
import json
import math
import os
import re
import sys
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, List, Optional

# Put the parent of this file's directory on sys.path before the `agents`
# imports below are attempted; the order of these statements matters.
current_dir = os.path.dirname(os.path.abspath(__file__))
parent_dir = os.path.dirname(current_dir)
sys.path.insert(0, parent_dir)

# Tier-1 agent (executive synthesis)
from agents.exec_intelligence_agent import ExecutiveIntelligenceAgent

# Tier-2 zone agents
from agents.schedule_guard_agent import ScheduleAgent
from agents.ce_insights_agent import ContractsAgent
from agents.supplier_fragility import SupplierAgent
from agents.performance_agent import CostPerformanceAgent
from agents.decision_agent import DecisionsAgent
from agents.public_inbox_agent import PublicInboxAgent

# Tier-3 supporting agents
from agents.cash_flow_agent import CashflowAgent
from agents.risk_agent import RiskAgent
from agents.contracts_360_agent import Contract360Agent
from agents.design_guard_agent import DesignGuardAgent
from agents.commissioning_agent import CommissioningAgent
|
|
|
|
| |
| |
| |
| class _LLMRouter: |
| """ |
| Suggests tier2_zones and tier3_supporting for a user query. |
| Expects self.llm to be a callable LLM instance. |
| """ |
| _VALID_T2 = {"schedule", "ce_insights", "supplier", "performance", "decisions", "public"} |
| _VALID_T3 = {"cashflow", "contract360", "design_guard", "commissioning", "risk"} |
|
|
| def __init__(self, llm=None, min_confidence: float = 0.55): |
| self.llm = llm |
| self.min_confidence = float(min_confidence) |
|
|
| def route(self, user_query: str) -> Dict[str, Any]: |
| if self.llm is None: |
| return {"tier2_zones": [], "tier3_supporting": [], "rationale": "No LLM configured", "confidence": 0.0} |
|
|
| prompt = f""" |
| You are a router for a 3-tier agentic system. |
| |
| Tier-2 zone KEYS: schedule, ce_insights, supplier, performance, decisions, public |
| Tier-3 agent KEYS: cashflow, contract360, design_guard, commissioning, risk |
| |
| Pick the most relevant Tier-2 zones (1β3, unless the user asks for a portfolio-wide/overall summary). |
| Include Tier-3 only if directly relevant to the selected zones. |
| |
| IMPORTANT: Return confidence as a decimal (0.0 to 1.0), not a percentage. |
| Example: "confidence": 0.85 (NOT "confidence": 85) |
| |
| Return ONLY JSON: |
| {{ |
| "tier2_zones": ["..."], |
| "tier3_supporting": ["..."], |
| "rationale": "1β2 sentences", |
| "confidence": 0.85 |
| }} |
| |
| USER QUERY: {user_query} |
| """.strip() |
|
|
| try: |
| |
| if hasattr(self.llm, 'call'): |
| text = self.llm.call(prompt) or "" |
| elif callable(self.llm): |
| text = self.llm(prompt) or "" |
| else: |
| return {"tier2_zones": [], "tier3_supporting": [], "rationale": "LLM not callable", "confidence": 0.0} |
| except Exception as e: |
| print(f"β οΈ LLM routing error: {e}") |
| return {"tier2_zones": [], "tier3_supporting": [], "rationale": f"LLM error: {e}", "confidence": 0.0} |
|
|
| |
| text = re.sub(r'```json\s*', '', text) |
| text = re.sub(r'```\s*', '', text) |
| m = re.search(r'\{[\s\S]*\}', text) |
| if not m: |
| return {"tier2_zones": [], "tier3_supporting": [], "rationale": "No JSON found in LLM response", "confidence": 0.0} |
|
|
| try: |
| data = json.loads(m.group(0)) |
| t2 = [z for z in data.get("tier2_zones", []) if z in self._VALID_T2] |
| t3 = [a for a in data.get("tier3_supporting", []) if a in self._VALID_T3] |
| |
| |
| raw_conf = data.get("confidence", 0.0) |
| if isinstance(raw_conf, str): |
| raw_conf = float(raw_conf.rstrip('%')) |
| conf = float(raw_conf) |
| |
| |
| if conf > 1.0: |
| conf = conf / 100.0 |
| |
| conf = max(0.0, min(1.0, conf)) |
| |
| return { |
| "tier2_zones": list(dict.fromkeys(t2)), |
| "tier3_supporting": list(dict.fromkeys(t3)), |
| "rationale": data.get("rationale", ""), |
| "confidence": conf, |
| } |
| except Exception as e: |
| print(f"β οΈ JSON parse error: {e}") |
| return {"tier2_zones": [], "tier3_supporting": [], "rationale": f"JSON parse error: {e}", "confidence": 0.0} |
|
|
|
|
| |
| |
| |
@dataclass
class RoutingDecision:
    """Outcome of routing one query: what to run, and why."""

    # Tier-2 zone keys to execute.
    tier2_zones: List[str]
    # Tier-3 supporting-agent keys to execute.
    tier3_supporting: List[str]
    # Human-readable explanation of how the selection was made.
    rationale: str
|
|
|
|
| |
| |
| |
class MasterOrchestrator:
    """
    LLM-first routing:
    - Try the LLM router. If confident (>= threshold), use it.
    - Otherwise fall back to deterministic keyword routing.
    Execution order:
    Tier-3 → Tier-2 (with Tier-3 inputs) → Tier-1 (Mother synthesis).
    Agents within a tier run in parallel on a thread pool.
    Returns executive-ready TEXT (not JSON).
    """

    # Keywords this short are acronyms ("ce", "ncr", "eac", ...). As raw
    # substrings they fire inside unrelated words ("performance", "increase",
    # "each"), so they are matched as whole words instead.
    _WHOLE_WORD_MAX_LEN = 3

    def __init__(self, llm_instance=None, llm_router_min_conf: float = 0.6):
        """Build every agent and the routing tables.

        Args:
            llm_instance: LLM handed to the Tier-1 agent and to the router.
                The Tier-2 and Tier-3 agents are constructed with ``llm=None``.
            llm_router_min_conf: Minimum router confidence required to accept
                the LLM's routing instead of the keyword fallback.
        """
        print("🚀 Initializing Master Orchestrator (LLM-first routing + parallelization)")
        self.llm = llm_instance

        # Tier 1: synthesises the final executive response.
        self.mother = ExecutiveIntelligenceAgent(llm=self.llm)

        # Tier 2: one agent per zone. "tier3_feeds" lists the Tier-3 agents
        # that are always scheduled when the zone is selected.
        self.tier2_zones: Dict[str, Dict[str, Any]] = {
            "schedule": {"agent": ScheduleAgent(llm=None), "zone_name": "Schedule Zone", "icon": "⏰", "tier3_feeds": []},
            "ce_insights": {"agent": ContractsAgent(llm=None), "zone_name": "Commercial Zone", "icon": "💰", "tier3_feeds": ["cashflow", "contract360"]},
            "supplier": {"agent": SupplierAgent(llm=None), "zone_name": "Supplier Fragility Zone", "icon": "🏗️", "tier3_feeds": []},
            "performance": {"agent": CostPerformanceAgent(llm=None), "zone_name": "Cost Performance Zone", "icon": "📊", "tier3_feeds": []},
            "decisions": {"agent": DecisionsAgent(llm=None), "zone_name": "Decisions & Guardrail Zone", "icon": "🛡️", "tier3_feeds": ["design_guard"]},
            "public": {"agent": PublicInboxAgent(llm=None), "zone_name": "Advisory Zone", "icon": "👥", "tier3_feeds": ["commissioning"]},
        }

        # Tier 3: supporting agents. "feeds_into_zone" names the Tier-2 zone
        # that receives the agent's output; "all" means every zone.
        self.tier3_supporting: Dict[str, Dict[str, Any]] = {
            "cashflow": {"agent": CashflowAgent(llm=None), "feeds_into_zone": "ce_insights", "description": "Cashflow and burn rate analysis"},
            "contract360": {"agent": Contract360Agent(llm=None), "feeds_into_zone": "ce_insights", "description": "Contract-level aggregation"},
            "design_guard": {"agent": DesignGuardAgent(llm=None), "feeds_into_zone": "decisions", "description": "Design compliance monitoring"},
            "commissioning": {"agent": CommissioningAgent(llm=None), "feeds_into_zone": "public", "description": "Commissioning readiness"},
            "risk": {"agent": RiskAgent(llm=None), "feeds_into_zone": "all", "description": "Cross-zone risk intelligence"},
        }

        # Keyword table for the fallback router, keyed by zone/agent name.
        self.routing_keywords: Dict[str, List[str]] = {
            "schedule": ["schedule", "float", "delay", "milestone", "brittle", "critical path", "timeline"],
            "ce_insights": ["ce", "compensation event", "commercial", "variation", "change", "changes", "exposure", "cost"],
            "cashflow": ["cash", "cashflow", "burn rate", "runway", "liquidity", "enough cash"],
            "supplier": ["supplier", "contractor", "delivery", "partner", "fragility", "retention", "payment"],
            "performance": ["cpi", "spi", "eac", "cost performance", "budget", "overrun", "performance"],
            "decisions": ["decision", "approval", "sla", "guardrail", "velocity", "design maturity"],
            "public": ["community", "stakeholder", "policy", "regulation", "disruption", "compliance"],
            "design_guard": ["design", "maturity", "ncr", "non-conformance", "standard"],
            "commissioning": ["commissioning", "handover", "testing", "snag", "readiness"],
            "risk": ["risk", "exposure", "materialization"],
        }

        self.llm_router = _LLMRouter(llm=self.llm, min_confidence=llm_router_min_conf)
        self.llm_router_min_conf = float(llm_router_min_conf)

        print(f"✅ TIER 1 ready | ✅ TIER 2 zones: {len(self.tier2_zones)} | ✅ TIER 3 agents: {len(self.tier3_supporting)}")
        print(f"✅ LLM routing enabled (threshold={self.llm_router_min_conf})")
        print("✅ Parallel execution enabled\n")

    def orchestrate(self, query: str, context: Optional[Dict[str, Any]] = None) -> str:
        """
        Main orchestration method: route, run Tier-3, run Tier-2, synthesise.

        Args:
            query: The user's question.
            context: Optional dict passed to the Tier-3 agents as-is and
                copied (shallowly) for each Tier-2 zone.

        Returns:
            str: Executive-ready response text (NOT JSON)
        """
        q_lower = query.lower()
        print(f"\n🔍 [MASTER ORCHESTRATOR] Processing: '{query[:80]}...'")

        routing = self._route_query_llm_first(query, q_lower)

        print("\n🎯 [ROUTING DECISION]")
        print(f" Tier-2 Zones: {routing.tier2_zones}")
        print(f" Tier-3 Supporting: {routing.tier3_supporting}")
        print(f" Rationale: {routing.rationale}")

        print(f"\n📊 [TIER 3] Executing {len(routing.tier3_supporting)} supporting agents in PARALLEL...")
        tier3_results = self._execute_tier3(routing.tier3_supporting, query, context)

        print(f"\n🔄 [TIER 2] Executing {len(routing.tier2_zones)} zone orchestrators in PARALLEL...")
        tier2_zone_summaries = self._execute_tier2_with_tier3_inputs(
            routing.tier2_zones, tier3_results, query, context
        )

        print("\n🤖 [TIER 1] Mother Orchestrator synthesizing executive response...")
        executive_response_text = self._synthesize_with_mother(query, tier2_zone_summaries, tier3_results)

        print("\n✅ [COMPLETE] Executive response ready\n")
        return executive_response_text

    def _route_query_llm_first(self, original_query: str, query_lower: str) -> "RoutingDecision":
        """
        1) Call the LLM router; if confidence >= threshold and it named at
           least one Tier-2 zone, use its suggestion.
        2) Else use the deterministic keyword fallback.
        3) Always apply the universal rules: default Tier-2 set if empty,
           include 'risk', auto-include Tier-3 feeds for selected zones.

        Selections are kept as ordered, de-duplicated lists so the decision
        is the same on every run and the router's ordering is preserved.
        """
        suggestion = self.llm_router.route(original_query)
        confidence = suggestion.get("confidence", 0.0)
        zones = list(dict.fromkeys(suggestion.get("tier2_zones", [])))
        supporting = list(dict.fromkeys(suggestion.get("tier3_supporting", [])))
        notes: List[str] = []

        if confidence >= self.llm_router_min_conf and zones:
            notes.append(f"LLM routing (conf={confidence:.2f}): {suggestion.get('rationale','')}")
        else:
            fallback = self._route_query_keyword_fallback(query_lower)
            zones = list(dict.fromkeys(fallback.tier2_zones))
            supporting = list(dict.fromkeys(fallback.tier3_supporting))
            notes.append(f"Keyword fallback (LLM conf={confidence:.2f})")

        if not zones:
            zones = ["schedule", "ce_insights", "supplier", "performance"]
            notes.append("Applied default Tier-2 core set")

        # Risk intelligence accompanies every request.
        if "risk" not in supporting:
            supporting.append("risk")

        # Each selected zone pulls in the Tier-3 agents that feed it.
        for zone in zones:
            for feed in self.tier2_zones.get(zone, {}).get("tier3_feeds", []):
                if feed not in supporting:
                    supporting.append(feed)

        return RoutingDecision(
            tier2_zones=zones,
            tier3_supporting=supporting,
            rationale="; ".join(notes),
        )

    @staticmethod
    def _keyword_in_query(keyword: str, query_lower: str) -> bool:
        """Return True when ``keyword`` occurs in the lower-cased query.

        Acronym-length keywords must appear as a whole word (an optional
        plural "s" is tolerated); longer keywords keep plain substring
        matching so inflected forms such as "delays" still hit.
        """
        if len(keyword) <= MasterOrchestrator._WHOLE_WORD_MAX_LEN:
            return re.search(rf"\b{re.escape(keyword)}s?\b", query_lower) is not None
        return keyword in query_lower

    def _route_query_keyword_fallback(self, query_lower: str) -> "RoutingDecision":
        """Deterministic routing used ONLY if LLM routing fails/low confidence."""
        tier2_to_call: List[str] = []
        tier3_to_call: List[str] = []

        for name, keywords in self.routing_keywords.items():
            if not any(self._keyword_in_query(kw, query_lower) for kw in keywords):
                continue
            if name in self.tier2_zones:
                tier2_to_call.append(name)
            elif name in self.tier3_supporting:
                tier3_to_call.append(name)

        # "Do we have enough cash/runway?" is a commercial question backed by cashflow.
        if "enough" in query_lower and any(kw in query_lower for kw in ["cash", "runway"]):
            if "ce_insights" not in tier2_to_call:
                tier2_to_call.append("ce_insights")
            if "cashflow" not in tier3_to_call:
                tier3_to_call.append("cashflow")

        # Portfolio-wide questions fan out to every zone.
        if any(kw in query_lower for kw in ["portfolio", "overall", "status", "summary"]):
            tier2_to_call = list(self.tier2_zones.keys())
            for k in ["cashflow", "risk"]:
                if k not in tier3_to_call:
                    tier3_to_call.append(k)

        if not tier2_to_call:
            tier2_to_call = ["schedule", "ce_insights", "supplier", "performance"]

        if "risk" not in tier3_to_call:
            tier3_to_call.append("risk")

        for zone in tier2_to_call:
            for feed in self.tier2_zones.get(zone, {}).get("tier3_feeds", []):
                if feed not in tier3_to_call:
                    tier3_to_call.append(feed)

        return RoutingDecision(
            tier2_zones=tier2_to_call,
            tier3_supporting=tier3_to_call,
            rationale="Keyword heuristic (fallback)",
        )

    def _execute_tier3(self, tier3_agents: List[str], query: str, context: Optional[Dict[str, Any]]) -> Dict[str, Any]:
        """Execute TIER 3 agents in PARALLEL.

        Returns:
            Dict keyed by agent name, in the order the names were requested
            (not thread-completion order). Each agent's result dict is tagged
            with ``tier`` and ``feeds_into_zone``; unknown names and failing
            agents map to an ``{"error": ...}`` entry instead of raising.
        """
        results: Dict[str, Any] = {}

        # De-duplicate while keeping order so no agent is run twice.
        names = list(dict.fromkeys(tier3_agents))
        if not names:
            return results

        def run_tier3_agent(name: str):
            """Run a single TIER 3 agent and return ``(name, result)``."""
            if name not in self.tier3_supporting:
                return name, {"error": "Agent not found", "tier": 3}

            meta = self.tier3_supporting[name]
            print(f" → Executing {name} (feeds into {meta['feeds_into_zone']})...")
            try:
                res = meta["agent"].analyze(query, context)
                res["tier"] = 3
                res["feeds_into_zone"] = meta["feeds_into_zone"]
                print(f" ✓ {name} complete")
                return name, res
            except Exception as e:
                print(f" ✗ {name} failed: {e}")
                return name, {"error": str(e), "tier": 3, "feeds_into_zone": meta["feeds_into_zone"]}

        with ThreadPoolExecutor(max_workers=min(len(names), 5)) as executor:
            futures = [executor.submit(run_tier3_agent, name) for name in names]
            # Collect in submission order: everything still runs concurrently,
            # but the result dict's key order no longer depends on timing.
            for future in futures:
                name, result = future.result()
                results[name] = result

        return results

    def _execute_tier2_with_tier3_inputs(
        self,
        tier2_zones: List[str],
        tier3_results: Dict[str, Any],
        query: str,
        context: Optional[Dict[str, Any]]
    ) -> Dict[str, Any]:
        """Execute TIER 2 zones in PARALLEL.

        Each zone's agent receives a shallow copy of ``context`` with a
        ``tier3_inputs`` dict holding the Tier-3 results whose
        ``feeds_into_zone`` is that zone or ``"all"``.

        Returns:
            Dict keyed by zone key, in the order the zones were requested.
            Each summary is tagged with ``tier``, ``zone_name`` and
            ``tier3_inputs_received``; unknown keys and failing zones map to
            an ``{"error": ...}`` entry instead of raising.
        """
        summaries: Dict[str, Any] = {}

        keys = list(dict.fromkeys(tier2_zones))
        if not keys:
            return summaries

        def run_tier2_zone(key: str):
            """Run a single TIER 2 zone and return ``(key, summary)``."""
            if key not in self.tier2_zones:
                return key, {"error": "Zone not found", "tier": 2}

            z = self.tier2_zones[key]
            print(f" → {z['icon']} Executing {key} ({z['zone_name']})...")

            zone_ctx = dict(context or {})
            zone_ctx["tier3_inputs"] = {}
            for t3_name, t3_res in tier3_results.items():
                if not isinstance(t3_res, dict):
                    continue
                feeds_into = t3_res.get("feeds_into_zone")
                if feeds_into == key or feeds_into == "all":
                    zone_ctx["tier3_inputs"][t3_name] = t3_res

            try:
                out = z["agent"].analyze(query, zone_ctx)
                out["tier"] = 2
                out["zone_name"] = z["zone_name"]
                out["tier3_inputs_received"] = list(zone_ctx["tier3_inputs"].keys())
                print(f" ✓ {key} complete (received {len(zone_ctx['tier3_inputs'])} Tier-3 inputs)")
                return key, out
            except Exception as e:
                print(f" ✗ {key} failed: {e}")
                return key, {
                    "error": str(e),
                    "tier": 2,
                    "zone_name": z["zone_name"],
                    "tier3_inputs_received": list(zone_ctx.get("tier3_inputs", {}).keys()),
                }

        with ThreadPoolExecutor(max_workers=min(len(keys), 6)) as executor:
            futures = [executor.submit(run_tier2_zone, key) for key in keys]
            # Submission-order collection keeps the summaries deterministic.
            for future in futures:
                key, result = future.result()
                summaries[key] = result

        return summaries

    def _synthesize_with_mother(
        self,
        query: str,
        tier2_summaries: Dict[str, Any],
        tier3_results: Dict[str, Any]
    ) -> str:
        """
        Call the Mother Orchestrator to synthesize the executive response.

        Returns:
            str: Whatever ``self.mother.analyze`` returns, or a Markdown
            failure notice (including the error text and a timestamp) when
            that call raises.
        """
        try:
            ctx = {
                "query": query,
                "tier2_zone_summaries": tier2_summaries,
                "tier3_supporting_results": tier3_results,
            }
            return self.mother.analyze(query, ctx)
        except Exception as e:
            print(f"⚠️ Mother Orchestrator failed: {e}")
            return f"""**Executive Summary**

**Question:** {query}

**Status:** ⚠️ Analysis failed - {str(e)}

**Action Required:** Please contact technical support or try rephrasing your question.

**Data Sources:** Partial zone analysis available
**Timestamp:** {datetime.now().strftime('%d-%b-%Y %H:%M')}
"""
|
|
|
|
| |
| |
| |
# Module-level cache holding the single shared orchestrator (None until first use).
_orchestrator_instance: Optional["MasterOrchestrator"] = None


def get_orchestrator(llm_instance=None) -> "MasterOrchestrator":
    """Return the shared MasterOrchestrator, creating it on first call.

    Args:
        llm_instance: LLM passed to the orchestrator's constructor. It is only
            used by the call that creates the instance; once an orchestrator
            exists, this argument is ignored and the cached one is returned.
    """
    global _orchestrator_instance
    if _orchestrator_instance is None:
        _orchestrator_instance = MasterOrchestrator(llm_instance=llm_instance)
    return _orchestrator_instance