# Source: capital-project-agentic-system / src/orchestrator/master_orchestrator.py
# (repository-page artifacts commented out so the file parses as Python:
#  author "MoEY14422", commit 25b1650 — "updating response logic")
"""
Master Orchestrator β€” Tiered Architecture with LLM-first Routing
- LLM routing is the default path.
- Keyword routing is used ONLY as a fallback.
- Tier-3 β†’ Tier-2 β†’ Tier-1 execution flow preserved.
- PARALLELIZED for speed.
- Returns executive-ready TEXT (not JSON)
"""
import os
import sys
import re
import json
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, Any, List, Optional
from concurrent.futures import ThreadPoolExecutor, as_completed
# ----------------------------
# Import path setup
# ----------------------------
current_dir = os.path.dirname(os.path.abspath(__file__))
parent_dir = os.path.dirname(current_dir)
sys.path.insert(0, parent_dir)
# ----------------------------
# TIER 1: Mother Orchestrator
# ----------------------------
from agents.exec_intelligence_agent import ExecutiveIntelligenceAgent
# ----------------------------
# TIER 2: Zone Orchestrators
# ----------------------------
from agents.schedule_guard_agent import ScheduleAgent
from agents.ce_insights_agent import ContractsAgent
from agents.supplier_fragility import SupplierAgent
from agents.performance_agent import CostPerformanceAgent
from agents.decision_agent import DecisionsAgent
from agents.public_inbox_agent import PublicInboxAgent
# ----------------------------
# TIER 3: Supporting Agents
# ----------------------------
from agents.cash_flow_agent import CashflowAgent
from agents.risk_agent import RiskAgent
from agents.contracts_360_agent import Contract360Agent
from agents.design_guard_agent import DesignGuardAgent
from agents.commissioning_agent import CommissioningAgent
# =============================================================================
# LLM Router (inline helper)
# =============================================================================
class _LLMRouter:
"""
Suggests tier2_zones and tier3_supporting for a user query.
Expects self.llm to be a callable LLM instance.
"""
_VALID_T2 = {"schedule", "ce_insights", "supplier", "performance", "decisions", "public"}
_VALID_T3 = {"cashflow", "contract360", "design_guard", "commissioning", "risk"}
def __init__(self, llm=None, min_confidence: float = 0.55):
self.llm = llm
self.min_confidence = float(min_confidence)
def route(self, user_query: str) -> Dict[str, Any]:
if self.llm is None:
return {"tier2_zones": [], "tier3_supporting": [], "rationale": "No LLM configured", "confidence": 0.0}
prompt = f"""
You are a router for a 3-tier agentic system.
Tier-2 zone KEYS: schedule, ce_insights, supplier, performance, decisions, public
Tier-3 agent KEYS: cashflow, contract360, design_guard, commissioning, risk
Pick the most relevant Tier-2 zones (1–3, unless the user asks for a portfolio-wide/overall summary).
Include Tier-3 only if directly relevant to the selected zones.
IMPORTANT: Return confidence as a decimal (0.0 to 1.0), not a percentage.
Example: "confidence": 0.85 (NOT "confidence": 85)
Return ONLY JSON:
{{
"tier2_zones": ["..."],
"tier3_supporting": ["..."],
"rationale": "1–2 sentences",
"confidence": 0.85
}}
USER QUERY: {user_query}
""".strip()
try:
# Call LLM - handle both CrewAI LLM and direct callable
if hasattr(self.llm, 'call'):
text = self.llm.call(prompt) or ""
elif callable(self.llm):
text = self.llm(prompt) or ""
else:
return {"tier2_zones": [], "tier3_supporting": [], "rationale": "LLM not callable", "confidence": 0.0}
except Exception as e:
print(f"⚠️ LLM routing error: {e}")
return {"tier2_zones": [], "tier3_supporting": [], "rationale": f"LLM error: {e}", "confidence": 0.0}
# Strip code fences if present and extract JSON
text = re.sub(r'```json\s*', '', text)
text = re.sub(r'```\s*', '', text)
m = re.search(r'\{[\s\S]*\}', text)
if not m:
return {"tier2_zones": [], "tier3_supporting": [], "rationale": "No JSON found in LLM response", "confidence": 0.0}
try:
data = json.loads(m.group(0))
t2 = [z for z in data.get("tier2_zones", []) if z in self._VALID_T2]
t3 = [a for a in data.get("tier3_supporting", []) if a in self._VALID_T3]
# Parse confidence - handle both decimal and percentage
raw_conf = data.get("confidence", 0.0)
if isinstance(raw_conf, str):
raw_conf = float(raw_conf.rstrip('%'))
conf = float(raw_conf)
# If confidence > 1.0, assume it's a percentage and convert
if conf > 1.0:
conf = conf / 100.0
conf = max(0.0, min(1.0, conf))
return {
"tier2_zones": list(dict.fromkeys(t2)),
"tier3_supporting": list(dict.fromkeys(t3)),
"rationale": data.get("rationale", ""),
"confidence": conf,
}
except Exception as e:
print(f"⚠️ JSON parse error: {e}")
return {"tier2_zones": [], "tier3_supporting": [], "rationale": f"JSON parse error: {e}", "confidence": 0.0}
# =============================================================================
# Data
# =============================================================================
@dataclass
class RoutingDecision:
    """Resolved routing plan: which Tier-2 zones and Tier-3 agents to execute."""
    tier2_zones: List[str]        # Tier-2 zone keys, e.g. "schedule", "ce_insights"
    tier3_supporting: List[str]   # Tier-3 agent keys feeding the zones above
    rationale: str                # human-readable explanation of how routing was decided
# =============================================================================
# Master Orchestrator
# =============================================================================
class MasterOrchestrator:
    """
    Top-level coordinator of the three-tier agentic system.

    Routing is LLM-first:
      - Try the LLM router; if confidence >= threshold, use its suggestion.
      - Otherwise fall back to deterministic keyword routing.
    Execution order:
      Tier-3 → Tier-2 (each zone receives matching Tier-3 outputs) → Tier-1
      (Mother synthesis). Tier-3 and Tier-2 run in parallel thread pools.
    ``orchestrate`` returns executive-ready TEXT (not JSON).
    """
    def __init__(self, llm_instance=None, llm_router_min_conf: float = 0.6) -> None:
        """
        Args:
            llm_instance: LLM shared by the inline router and the Tier-1
                Mother agent. Tier-2/Tier-3 agents are constructed with
                llm=None on purpose (they run without an LLM).
            llm_router_min_conf: confidence threshold below which the LLM
                routing suggestion is discarded for keyword fallback.
        """
        print("🔀 Initializing Master Orchestrator (LLM-first routing + parallelization)")
        self.llm = llm_instance
        # TIER 1 — executive synthesis ("Mother") agent; the only tier given the LLM.
        self.mother = ExecutiveIntelligenceAgent(llm=self.llm)
        # TIER 2 — zone orchestrators. "tier3_feeds" lists the Tier-3 agents
        # auto-included whenever this zone is selected by routing.
        self.tier2_zones: Dict[str, Dict[str, Any]] = {
            "schedule": {"agent": ScheduleAgent(llm=None), "zone_name": "Schedule Zone", "icon": "⏰", "tier3_feeds": []},
            "ce_insights":{"agent": ContractsAgent(llm=None), "zone_name": "Commercial Zone", "icon": "💰", "tier3_feeds": ["cashflow", "contract360"]},
            "supplier": {"agent": SupplierAgent(llm=None), "zone_name": "Supplier Fragility Zone", "icon": "🏗️", "tier3_feeds": []},
            "performance":{"agent": CostPerformanceAgent(llm=None),"zone_name": "Cost Performance Zone", "icon": "📊", "tier3_feeds": []},
            "decisions": {"agent": DecisionsAgent(llm=None), "zone_name": "Decisions & Guardrail Zone","icon": "🛡️", "tier3_feeds": ["design_guard"]},
            "public": {"agent": PublicInboxAgent(llm=None), "zone_name": "Advisory Zone", "icon": "📥", "tier3_feeds": ["commissioning"]},
        }
        # TIER 3 — supporting agents. "feeds_into_zone" names the Tier-2 zone
        # that receives this agent's output ("all" = broadcast to every zone).
        self.tier3_supporting: Dict[str, Dict[str, Any]] = {
            "cashflow": {"agent": CashflowAgent(llm=None), "feeds_into_zone": "ce_insights", "description": "Cashflow and burn rate analysis"},
            "contract360": {"agent": Contract360Agent(llm=None), "feeds_into_zone": "ce_insights", "description": "Contract-level aggregation"},
            "design_guard": {"agent": DesignGuardAgent(llm=None), "feeds_into_zone": "decisions", "description": "Design compliance monitoring"},
            "commissioning": {"agent": CommissioningAgent(llm=None), "feeds_into_zone": "public", "description": "Commissioning readiness"},
            "risk": {"agent": RiskAgent(llm=None), "feeds_into_zone": "all", "description": "Cross-zone risk intelligence"},
        }
        # Keyword fallback dictionary (used ONLY when LLM routing fails or
        # comes back below the confidence threshold). Keys are zone/agent
        # names; values are substrings matched against the lowercased query.
        self.routing_keywords: Dict[str, List[str]] = {
            "schedule": ["schedule", "float", "delay", "milestone", "brittle", "critical path", "timeline"],
            "ce_insights": ["ce", "compensation event", "commercial", "variation", "change", "changes", "exposure", "cost"],
            "cashflow": ["cash", "cashflow", "burn rate", "runway", "liquidity", "enough cash"],
            "supplier": ["supplier", "contractor", "delivery", "partner", "fragility", "retention", "payment"],
            "performance": ["cpi", "spi", "eac", "cost performance", "budget", "overrun", "performance"],
            "decisions": ["decision", "approval", "sla", "guardrail", "velocity", "design maturity"],
            "public": ["community", "stakeholder", "policy", "regulation", "disruption", "compliance"],
            "design_guard": ["design", "maturity", "ncr", "non-conformance", "standard"],
            "commissioning": ["commissioning", "handover", "testing", "snag", "readiness"],
            "risk": ["risk", "exposure", "materialization"]
        }
        # LLM Router (default on)
        self.llm_router = _LLMRouter(llm=self.llm, min_confidence=llm_router_min_conf)
        self.llm_router_min_conf = float(llm_router_min_conf)
        print(f"✅ TIER 1 ready | ✅ TIER 2 zones: {len(self.tier2_zones)} | ✅ TIER 3 agents: {len(self.tier3_supporting)}")
        print(f"✅ LLM routing enabled (threshold={self.llm_router_min_conf})")
        print(f"✅ Parallel execution enabled\n")
    # -------------------------------------------------------------------------
    # Public API - NOW RETURNS STRING (executive text)
    # -------------------------------------------------------------------------
    def orchestrate(self, query: str, context: Optional[Dict[str, Any]] = None) -> str:
        """
        Main orchestration method - returns executive-ready text response.

        Args:
            query: free-text user question to route and answer.
            context: optional extra context dict forwarded to every agent.
        Returns:
            str: Executive-ready response text (NOT JSON)
        """
        q_lower = query.lower()
        print(f"\n🔀 [MASTER ORCHESTRATOR] Processing: '{query[:80]}...'")
        # STEP 1 — ROUTING (LLM first, keyword fallback)
        routing = self._route_query_llm_first(query, q_lower)
        print("\n🎯 [ROUTING DECISION]")
        print(f" Tier-2 Zones: {routing.tier2_zones}")
        print(f" Tier-3 Supporting: {routing.tier3_supporting}")
        print(f" Rationale: {routing.rationale}")
        # STEP 2 — TIER 3 (PARALLEL): run supporting agents first so their
        # outputs can be injected into the Tier-2 zone contexts.
        print(f"\n🔗 [TIER 3] Executing {len(routing.tier3_supporting)} supporting agents in PARALLEL...")
        tier3_results = self._execute_tier3(routing.tier3_supporting, query, context)
        # STEP 3 — TIER 2 (PARALLEL)
        print(f"\n📊 [TIER 2] Executing {len(routing.tier2_zones)} zone orchestrators in PARALLEL...")
        tier2_zone_summaries = self._execute_tier2_with_tier3_inputs(
            routing.tier2_zones, tier3_results, query, context
        )
        # STEP 4 — TIER 1 synthesis (Mother) - NOW RETURNS TEXT
        print(f"\n🤖 [TIER 1] Mother Orchestrator synthesizing executive response...")
        executive_response_text = self._synthesize_with_mother(query, tier2_zone_summaries, tier3_results)
        # STEP 5 — Return executive text directly
        print(f"\n✅ [COMPLETE] Executive response ready\n")
        return executive_response_text
    # -------------------------------------------------------------------------
    # Routing (LLM-first, keyword fallback)
    # -------------------------------------------------------------------------
    def _route_query_llm_first(self, original_query: str, query_lower: str) -> RoutingDecision:
        """
        1) Call LLM router; if confidence >= threshold, use it.
        2) Else use deterministic keyword fallback.
        3) Always apply universal rules: default Tier-2 if empty, include 'risk',
           auto-include Tier-3 feeds for selected Tier-2 zones.
        """
        # Try LLM routing
        llm_suggestion = self.llm_router.route(original_query)
        t2 = set(llm_suggestion.get("tier2_zones", []))
        t3 = set(llm_suggestion.get("tier3_supporting", []))
        rationale_parts = []
        # LLM suggestion must be both confident AND non-empty to be accepted.
        if llm_suggestion.get("confidence", 0.0) >= self.llm_router_min_conf and t2:
            rationale_parts.append(f"LLM routing (conf={llm_suggestion['confidence']:.2f}): {llm_suggestion.get('rationale','')}")
        else:
            # Fallback to keyword routing
            kw = self._route_query_keyword_fallback(query_lower)
            t2 = set(kw.tier2_zones)
            t3 = set(kw.tier3_supporting)
            rationale_parts.append(f"Keyword fallback (LLM conf={llm_suggestion.get('confidence',0.0):.2f})")
        # Ensure at least core coverage if still empty
        if not t2:
            t2 = {"schedule", "ce_insights", "supplier", "performance"}
            rationale_parts.append("Applied default Tier-2 core set")
        # Always include Risk (cross-zone)
        if "risk" not in t3:
            t3.add("risk")
        # Auto-include Tier-3 feeds for selected Tier-2
        for z in t2:
            for feed in self.tier2_zones.get(z, {}).get("tier3_feeds", []):
                t3.add(feed)
        # NOTE(review): t2/t3 are sets at this point, so the resulting list
        # order is set-iteration order (dict.fromkeys only dedupes here).
        return RoutingDecision(
            tier2_zones=list(dict.fromkeys(t2)),
            tier3_supporting=list(dict.fromkeys(t3)),
            rationale="; ".join(rationale_parts) if rationale_parts else "LLM routing"
        )
    def _route_query_keyword_fallback(self, query_lower: str) -> RoutingDecision:
        """Deterministic routing used ONLY if LLM routing fails/low confidence."""
        tier2_to_call: List[str] = []
        tier3_to_call: List[str] = []
        # A name matches either as a Tier-2 zone or a Tier-3 agent, never both
        # (names are disjoint between the two registries).
        for name, keywords in self.routing_keywords.items():
            if name in self.tier2_zones and any(kw in query_lower for kw in keywords):
                tier2_to_call.append(name)
            elif name in self.tier3_supporting and any(kw in query_lower for kw in keywords):
                tier3_to_call.append(name)
        # Special case: "enough cash/runway" => ce_insights + cashflow
        if "enough" in query_lower and any(kw in query_lower for kw in ["cash", "runway"]):
            if "ce_insights" not in tier2_to_call:
                tier2_to_call.append("ce_insights")
            if "cashflow" not in tier3_to_call:
                tier3_to_call.append("cashflow")
        # Broad queries => all Tier-2 zones + key Tier-3
        if any(kw in query_lower for kw in ["portfolio", "overall", "status", "summary"]):
            tier2_to_call = list(self.tier2_zones.keys())
            for k in ["cashflow", "risk"]:
                if k not in tier3_to_call:
                    tier3_to_call.append(k)
        # If still empty, choose core set
        if not tier2_to_call:
            tier2_to_call = ["schedule", "ce_insights", "supplier", "performance"]
        # Always include risk
        if "risk" not in tier3_to_call:
            tier3_to_call.append("risk")
        # Auto-feeds
        for zone in tier2_to_call:
            for feed in self.tier2_zones.get(zone, {}).get("tier3_feeds", []):
                if feed not in tier3_to_call:
                    tier3_to_call.append(feed)
        return RoutingDecision(
            tier2_zones=tier2_to_call,
            tier3_supporting=tier3_to_call,
            rationale="Keyword heuristic (fallback)"
        )
    # -------------------------------------------------------------------------
    # Execution (PARALLELIZED)
    # -------------------------------------------------------------------------
    def _execute_tier3(self, tier3_agents: List[str], query: str, context: Optional[Dict[str, Any]]) -> Dict[str, Any]:
        """Execute TIER 3 agents in PARALLEL.

        Returns a dict keyed by agent name; each value is the agent's result
        tagged with "tier" and "feeds_into_zone", or an {"error": ...} stub.
        Worker exceptions are caught per-agent so one failure cannot sink
        the batch.
        """
        results: Dict[str, Any] = {}
        if not tier3_agents:
            return results
        def run_tier3_agent(name: str):
            """Run a single TIER 3 agent; returns (name, result_dict)."""
            if name not in self.tier3_supporting:
                return name, {"error": "Agent not found", "tier": 3}
            meta = self.tier3_supporting[name]
            print(f" → Executing {name} (feeds into {meta['feeds_into_zone']})...")
            try:
                # NOTE(review): assumes analyze() returns a mutable dict —
                # a non-dict return raises here and is reported as an error.
                res = meta["agent"].analyze(query, context)
                res["tier"] = 3
                res["feeds_into_zone"] = meta["feeds_into_zone"]
                print(f" ✓ {name} complete")
                return name, res
            except Exception as e:
                print(f" ✗ {name} failed: {e}")
                return name, {"error": str(e), "tier": 3, "feeds_into_zone": meta["feeds_into_zone"]}
        # Run all TIER 3 agents in parallel (pool bounded at 5 workers).
        with ThreadPoolExecutor(max_workers=min(len(tier3_agents), 5)) as executor:
            futures = {executor.submit(run_tier3_agent, name): name for name in tier3_agents}
            for future in as_completed(futures):
                name, result = future.result()
                results[name] = result
        return results
    def _execute_tier2_with_tier3_inputs(
        self,
        tier2_zones: List[str],
        tier3_results: Dict[str, Any],
        query: str,
        context: Optional[Dict[str, Any]]
    ) -> Dict[str, Any]:
        """Execute TIER 2 zones in PARALLEL.

        Each zone receives a copy of *context* extended with a
        "tier3_inputs" dict containing the Tier-3 results that feed into it
        (exact zone match or "feeds_into_zone" == "all").
        """
        summaries: Dict[str, Any] = {}
        if not tier2_zones:
            return summaries
        def run_tier2_zone(key: str):
            """Run a single TIER 2 zone; returns (key, summary_dict)."""
            if key not in self.tier2_zones:
                return key, {"error": "Zone not found", "tier": 2}
            z = self.tier2_zones[key]
            print(f" → {z['icon']} Executing {key} ({z['zone_name']})...")
            # Shallow-copy the shared context so each zone mutates its own dict.
            zone_ctx = dict(context or {})
            zone_ctx["tier3_inputs"] = {}
            for t3_name, t3_res in tier3_results.items():
                if not isinstance(t3_res, dict):
                    continue
                feeds_into = t3_res.get("feeds_into_zone")
                if feeds_into == key or feeds_into == "all":
                    zone_ctx["tier3_inputs"][t3_name] = t3_res
            try:
                out = z["agent"].analyze(query, zone_ctx)
                out["tier"] = 2
                out["zone_name"] = z["zone_name"]
                out["tier3_inputs_received"] = list(zone_ctx["tier3_inputs"].keys())
                print(f" ✓ {key} complete (received {len(zone_ctx['tier3_inputs'])} Tier-3 inputs)")
                return key, out
            except Exception as e:
                print(f" ✗ {key} failed: {e}")
                return key, {
                    "error": str(e),
                    "tier": 2,
                    "zone_name": z["zone_name"],
                    "tier3_inputs_received": list(zone_ctx.get("tier3_inputs", {}).keys())
                }
        # Run all TIER 2 zones in parallel (pool bounded at 6 workers).
        with ThreadPoolExecutor(max_workers=min(len(tier2_zones), 6)) as executor:
            futures = {executor.submit(run_tier2_zone, key): key for key in tier2_zones}
            for future in as_completed(futures):
                key, result = future.result()
                summaries[key] = result
        return summaries
    def _synthesize_with_mother(
        self,
        query: str,
        tier2_summaries: Dict[str, Any],
        tier3_results: Dict[str, Any]
    ) -> str:
        """
        Call Mother Orchestrator to synthesize the final executive response.

        Args:
            query: original user question.
            tier2_summaries: outputs of the selected Tier-2 zones.
            tier3_results: outputs of the executed Tier-3 agents.
        Returns:
            str: Executive-ready text response; on failure, a formatted
            fallback summary explaining the error.
        """
        try:
            ctx = {
                "query": query,
                "tier2_zone_summaries": tier2_summaries,
                "tier3_supporting_results": tier3_results
            }
            # Mother now returns text (not JSON)
            executive_text = self.mother.analyze(query, ctx)
            return executive_text
        except Exception as e:
            print(f"⚠️ Mother Orchestrator failed: {e}")
            # Degrade gracefully: hand the caller a readable error summary
            # instead of raising out of the orchestration pipeline.
            return f"""**Executive Summary**
**Question:** {query}
**Status:** ⚠️ Analysis failed - {str(e)}
**Action Required:** Please contact technical support or try rephrasing your question.
**Data Sources:** Partial zone analysis available
**Timestamp:** {datetime.now().strftime('%d-%b-%Y %H:%M')}
"""
# ----------------------------
# Singleton accessor
# ----------------------------
# Process-wide singleton; created lazily by get_orchestrator().
_orchestrator_instance: Optional[MasterOrchestrator] = None
def get_orchestrator(llm_instance=None) -> MasterOrchestrator:
    """
    Return the shared MasterOrchestrator, building it on first call.

    NOTE: llm_instance is only honored on the FIRST call; later calls return
    the already-built instance regardless of the argument.
    """
    global _orchestrator_instance
    if _orchestrator_instance is None:
        _orchestrator_instance = MasterOrchestrator(llm_instance=llm_instance)
    return _orchestrator_instance