| from datetime import datetime |
| from agents.risk_agent import RiskAnalysisAgent |
| from agents.recommendation_agents import ( |
| EmergencyRecommendationAgent, |
| HouseholdAdaptationAgent, |
| BusinessContinuityAgent, |
| FinancialAdaptationAgent, |
| ) |
|
|
|
|
class ClimateRiskOrchestrator:
    """Coordinate the risk-analysis agent and the recommendation agents.

    One risk analysis is produced per query and then handed, together with
    the user profile, to each recommendation agent that applies to that
    profile.
    """

    # Normalized (stripped, lower-cased) profile types that receive
    # household recommendations.
    _HOUSEHOLD_PROFILE_TYPES = frozenset(
        {"household", "residential", "general", "general public"}
    )
    # Normalized profile types that receive business recommendations.
    _BUSINESS_PROFILE_TYPES = frozenset({"business", "business owner"})
    # Used when the profile has no usable "type" entry.
    _DEFAULT_PROFILE_TYPE = "general"

    def __init__(self, model):
        """Create one instance of every agent, all sharing ``model``.

        Args:
            model: Passed unchanged to each agent's constructor.
        """
        self.risk_agent = RiskAnalysisAgent(model)
        self.emergency_agent = EmergencyRecommendationAgent(model)
        self.household_agent = HouseholdAdaptationAgent(model)
        self.business_agent = BusinessContinuityAgent(model)
        self.financial_agent = FinancialAdaptationAgent(model)

    @classmethod
    def _resolve_profile_type(cls, user_profile: dict) -> str:
        """Return the normalized profile type used for agent selection.

        A missing, ``None`` or blank ``"type"`` falls back to the default
        type. Any other value is converted with ``str()`` so a non-string
        entry cannot raise here; it is then stripped and lower-cased.
        """
        raw_type = user_profile.get("type")
        if raw_type is None:
            return cls._DEFAULT_PROFILE_TYPE
        normalized = str(raw_type).strip().lower()
        return normalized or cls._DEFAULT_PROFILE_TYPE

    def analyze_and_recommend(self, user_query: str, user_profile: dict) -> dict:
        """Run the risk analysis and collect the applicable recommendations.

        Emergency and financial recommendations are always generated.
        Household and business recommendations are generated only when the
        normalized ``user_profile["type"]`` belongs to the matching category;
        a profile type in neither category gets only the two common entries.

        Args:
            user_query: Forwarded to the risk agent's ``analyze_risks``.
            user_profile: Forwarded to every recommendation agent. Its
                optional ``"type"`` entry selects the profile-specific
                agents.

        Returns:
            On success, a dict with ``"risk_analysis"``,
            ``"recommendations"`` (keyed by ``"emergency"``, ``"financial"``
            and, when applicable, ``"household"`` / ``"business"``),
            ``"generated_at"`` (ISO-8601 local timestamp) and
            ``"user_profile"``.

            If any step raises an ``Exception``, the exception is not
            propagated; instead the dict carries an ``"error"`` message,
            empty ``"risk_analysis"`` and ``"recommendations"``, plus
            ``"generated_at"`` and ``"user_profile"``.
        """
        try:
            print("🔍 Analyzing risks...")
            risk_analysis = self.risk_agent.analyze_risks(user_query)

            recommendations = {}

            print("🚨 Generating emergency recommendations...")
            recommendations["emergency"] = (
                self.emergency_agent.generate_emergency_recommendations(
                    risk_analysis, user_profile
                )
            )

            print("💰 Generating financial recommendations...")
            recommendations["financial"] = (
                self.financial_agent.generate_financial_recommendations(
                    risk_analysis, user_profile
                )
            )

            profile_type = self._resolve_profile_type(user_profile)

            if profile_type in self._HOUSEHOLD_PROFILE_TYPES:
                print("🏠 Generating household recommendations...")
                recommendations["household"] = (
                    self.household_agent.generate_household_recommendations(
                        risk_analysis, user_profile
                    )
                )

            if profile_type in self._BUSINESS_PROFILE_TYPES:
                print("🏢 Generating business recommendations...")
                recommendations["business"] = (
                    self.business_agent.generate_business_recommendations(
                        risk_analysis, user_profile
                    )
                )

            return {
                "risk_analysis": risk_analysis,
                "recommendations": recommendations,
                "generated_at": datetime.now().isoformat(),
                "user_profile": user_profile,
            }

        except Exception as e:
            # Deliberate catch-all boundary: callers always receive a dict
            # and check for the "error" key rather than handling exceptions.
            return {
                "error": f"Analysis failed: {e}",
                "risk_analysis": {},
                "recommendations": {},
                "generated_at": datetime.now().isoformat(),
                # Mirrors the success payload so both shapes carry the profile.
                "user_profile": user_profile,
            }
|
|