"""Hierarchical (intra-cluster / inter-cluster) MAPPO training script.

One low-level MAPPO agent is created per cluster environment; a single
high-level MAPPO agent chooses inter-cluster exports/imports once joint
training starts. Logs, model checkpoints and plots are written under
``FINALE_FINALE_FINALE/<run_name>/``.
"""
import numbers
import os
import re
import sys
import time
from datetime import datetime, timedelta

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch  # noqa: F401  - not referenced directly in this file

# Make the parent directory of this script importable so the project
# packages below resolve.
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))

from cluster import InterClusterCoordinator, InterClusterLedger  # noqa: E402
from Environment.cluster_env_wrapper import make_vec_env  # noqa: E402
from mappo.trainer.mappo import MAPPO  # noqa: E402


def recursive_sum(item):
    """Sum every numeric leaf of an arbitrarily nested value.

    Lists, tuples, arrays and other iterables are walked recursively. Strings,
    bytes, ``None`` and any other non-numeric, non-iterable leaves contribute 0.

    Args:
        item: A number, a (possibly nested / ragged) iterable of numbers, or
            anything else.

    Returns:
        The sum of all real-valued numeric leaves (0 if there are none).
    """
    total = 0
    # A 0-d ndarray exposes __iter__ but raises TypeError when iterated;
    # unwrap it to its scalar first.
    if isinstance(item, np.ndarray) and item.ndim == 0:
        item = item.item()
    # Text and None are never numeric data here.
    if item is None or isinstance(item, (str, bytes)):
        return total
    if hasattr(item, '__iter__'):
        for sub_item in item:
            total += recursive_sum(sub_item)
    # np.isreal alone is not a type check, so require an actual number first.
    elif isinstance(item, (numbers.Number, np.number, np.bool_)) and np.isreal(item):
        total += item
    return total


def _aggregate_cluster_metrics(all_cluster_metrics, episode):
    """Combine per-cluster episode metrics into one system-wide record.

    Args:
        all_cluster_metrics: One metrics dict per cluster env, as returned by
            ``cluster_env.call('get_episode_metrics')``.
        episode: Episode number the metrics belong to.

    Returns:
        A dict of summed quantities (fairness is averaged across clusters)
        with an ``"Episode"`` key.
    """
    return {
        "grid_reduction_entire_day": sum(m["grid_reduction_entire_day"] for m in all_cluster_metrics),
        "grid_reduction_peak_hours": sum(m["grid_reduction_peak_hours"] for m in all_cluster_metrics),
        "total_cost_savings": sum(m["total_cost_savings"] for m in all_cluster_metrics),
        "battery_degradation_cost_total": sum(m["battery_degradation_cost_total"] for m in all_cluster_metrics),
        # For fairness, we average the fairness index across clusters
        "fairness_on_cost_savings": np.mean([m["fairness_on_cost_savings"] for m in all_cluster_metrics]),
        "Episode": episode,
    }


def main():
    """Train the hierarchical agents, then save the CSV log, checkpoints and plots.

    Phase 1 (episodes before ``JOINT_TRAINING_START_EPISODE``) runs without
    inter-cluster transfers and updates only the low-level agents. Phase 2
    repeats a cycle of ``FREEZE_HIGH_FOR_EPISODES`` episodes that update the
    low-level agents followed by ``FREEZE_LOW_FOR_EPISODES`` episodes that
    update the high-level agent.
    """
    overall_start_time = time.time()

    # Training Configuration Parameters
    STATE_TO_RUN = "oklahoma"  # e.g. "colorado" or "oklahoma"
    DATA_PATH = "data/training/1000houses_152days_TRAIN.csv"
    # Dynamically extract the number of agents from the file path
    match = re.search(r'(\d+)houses', DATA_PATH)
    if not match:
        raise ValueError("Could not extract the number of houses from DATA_PATH.")
    NUMBER_OF_AGENTS = int(match.group(1))
    CLUSTER_SIZE = 10
    NUM_EPISODES = 10000
    BATCH_SIZE = 256
    # Larger than NUM_EPISODES, so only the final episode is checkpointed;
    # lower it to also save intermediate checkpoints.
    CHECKPOINT_INTERVAL = 100000
    MAX_TRANSFER_KWH = 100000
    LR = 2e-4
    GAMMA = 0.95
    LAMBDA = 0.95
    CLIP_EPS = 0.2
    K_EPOCHS = 4
    JOINT_TRAINING_START_EPISODE = 2000
    FREEZE_HIGH_FOR_EPISODES = 20
    FREEZE_LOW_FOR_EPISODES = 10
    CYCLE_LENGTH = FREEZE_HIGH_FOR_EPISODES + FREEZE_LOW_FOR_EPISODES

    # Build run directories
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    run_name = (f"hierarchical_{STATE_TO_RUN}_{NUMBER_OF_AGENTS}agents_"
                f"{CLUSTER_SIZE}size_{NUM_EPISODES}eps_{timestamp}")
    root_dir = os.path.join("FINALE_FINALE_FINALE", run_name)  # New folder for new runs
    models_dir = os.path.join(root_dir, "models")
    logs_dir = os.path.join(root_dir, "logs")
    plots_dir = os.path.join(root_dir, "plots")
    for d in (models_dir, logs_dir, plots_dir):
        os.makedirs(d, exist_ok=True)
    print(f"Logging to: {root_dir}")

    # Environment & Agent Initialization
    # A single factory call builds the vectorized cluster environment.
    cluster_env = make_vec_env(
        data_path=DATA_PATH,
        time_freq="15T",
        cluster_size=CLUSTER_SIZE,
        state=STATE_TO_RUN
    )

    # Get environment parameters from the vectorized environment object
    n_clusters = cluster_env.num_envs
    sample_subenv = cluster_env.cluster_envs[0]  # Access a sample sub-env
    n_agents_per_cluster = sample_subenv.num_agents
    local_dim = sample_subenv.observation_space.shape[-1]
    global_dim = n_agents_per_cluster * local_dim
    # Access the action dim from the first part of the Tuple action space
    act_dim = sample_subenv.action_space[0].shape[-1]
    # The total number of transitions collected each episode is (steps_per_day * num_clusters)
    total_buffer_size = sample_subenv.num_steps * n_clusters
    print(f"Low-level agent buffer size set to: {total_buffer_size}")
    print(f"Created {n_clusters} clusters.")
    print(f"Shared low-level agent: {n_agents_per_cluster} agents per cluster, "
          f"obs_dim={local_dim}, global_dim={global_dim}, act_dim={act_dim}")

    print(f"Creating {n_clusters} independent low-level MAPPO agents...")
    # Each agent's buffer only needs to be as long as one episode day
    agent_buffer_size = sample_subenv.num_steps
    low_agents = [
        MAPPO(
            n_agents=n_agents_per_cluster,
            local_dim=local_dim,
            global_dim=global_dim,
            act_dim=act_dim,
            lr=LR,
            gamma=GAMMA,
            lam=LAMBDA,
            clip_eps=CLIP_EPS,
            k_epochs=K_EPOCHS,
            batch_size=BATCH_SIZE,
            episode_len=agent_buffer_size
        )
        for _ in range(n_clusters)
    ]

    # Define dimensions for the high-level MAPPO agent
    OBS_DIM_HI_LOCAL = 7  # Each cluster has 7 features for its local state
    act_dim_inter = 2  # Export/Import preference for each cluster
    # The global state for the high-level agent is the concatenation
    # of all high-level local states
    OBS_DIM_HI_GLOBAL = n_clusters * OBS_DIM_HI_LOCAL
    print(f"Inter-cluster agent (MAPPO): n_agents={n_clusters}, "
          f"local_dim={OBS_DIM_HI_LOCAL}, global_dim={OBS_DIM_HI_GLOBAL}, act_dim={act_dim_inter}")

    # Instantiate MAPPO for the inter-cluster agent
    inter_agent = MAPPO(
        n_agents=n_clusters,
        local_dim=OBS_DIM_HI_LOCAL,
        global_dim=OBS_DIM_HI_GLOBAL,
        act_dim=act_dim_inter,
        lr=LR,
        gamma=GAMMA,
        lam=LAMBDA,
        clip_eps=CLIP_EPS,
        k_epochs=K_EPOCHS,
        batch_size=BATCH_SIZE,
        episode_len=sample_subenv.num_steps
    )
    ledger = InterClusterLedger()
    coordinator = InterClusterCoordinator(
        cluster_env,
        inter_agent,
        ledger,
        max_transfer_kwh=MAX_TRANSFER_KWH
    )

    # Training loop
    total_steps = 0
    episode_log_data = []
    performance_metrics_log = []
    intra_log = {}
    inter_log = {}
    total_log = {}
    cost_log = {}

    for ep in range(1, NUM_EPISODES + 1):
        step_count = 0
        start_time = time.time()
        ep_total_inter_cluster_reward = 0.0
        day_logs = []
        obs_clusters, _ = cluster_env.reset()

        # Use a single 'done' flag for the episode
        done_all = False
        # Per-agent low-level reward accumulated over the episode
        cluster_rewards = np.zeros((n_clusters, n_agents_per_cluster), dtype=np.float32)

        # Determine training phase
        is_phase_1 = ep < JOINT_TRAINING_START_EPISODE
        if ep == 1:
            print(f"\n--- Starting Phase 1: Training Low-Level Agent Only (up to ep {JOINT_TRAINING_START_EPISODE-1}) ---")
        if ep == JOINT_TRAINING_START_EPISODE:
            print(f"\n--- Starting Phase 2: Joint Hierarchical Training (from ep {JOINT_TRAINING_START_EPISODE}) ---")

        # The main loop continues as long as the episode is not done
        while not done_all:
            total_steps += 1
            step_count += 1

            # Action Selection (Low-Level)
            batch_global_obs = obs_clusters.reshape(n_clusters, -1)
            low_level_actions_list = []
            low_level_logps_list = []
            for c_idx, agent in enumerate(low_agents):
                actions, logps = agent.select_action(obs_clusters[c_idx], batch_global_obs[c_idx])
                low_level_actions_list.append(actions)
                low_level_logps_list.append(logps)
            low_level_actions = np.stack(low_level_actions_list)
            low_level_logps = np.stack(low_level_logps_list)

            # Action Selection & Transfers (High-Level, Phase 2 only)
            if is_phase_1:
                exports, imports = None, None
            else:
                inter_cluster_obs_local = np.array(
                    [coordinator.get_cluster_state(se, step_count) for se in cluster_env.cluster_envs]
                )
                # The global state for the high-level agent is the flattened local states
                inter_cluster_obs_global = inter_cluster_obs_local.flatten()
                high_level_action, high_level_logp = inter_agent.select_action(
                    inter_cluster_obs_local,
                    inter_cluster_obs_global
                )
                current_reports = {
                    i: {
                        'export_capacity': cluster_env.get_export_capacity(i),
                        'import_capacity': cluster_env.get_import_capacity(i),
                    }
                    for i in range(n_clusters)
                }
                exports, imports = coordinator.build_transfers(high_level_action, current_reports)

            # Environment Step
            next_obs_clusters, _rewards, done_all, step_info = cluster_env.step(
                low_level_actions, exports=exports, imports=imports
            )
            cluster_infos = step_info.get("cluster_infos")
            day_logs.append({
                "costs": cluster_infos["costs"],
                "grid_import_no_p2p": cluster_infos["grid_import_no_p2p"],
                "charge_amount": cluster_infos.get("charge_amount"),
                "discharge_amount": cluster_infos.get("discharge_amount")
            })

            # Reward Calculation and Data Storage
            per_agent_rewards = np.stack(cluster_infos['agent_rewards'])
            rewards_for_buffer = per_agent_rewards

            if not is_phase_1:
                transfers_for_logging = (exports, imports)
                high_level_rewards_per_cluster = coordinator.compute_inter_cluster_reward(
                    all_cluster_infos=cluster_infos,
                    actual_transfers=transfers_for_logging,
                    step_count=step_count
                )
                ep_total_inter_cluster_reward += np.sum(high_level_rewards_per_cluster)

                # Get next state for high-level agent's buffer
                next_inter_cluster_obs_local = np.array(
                    [coordinator.get_cluster_state(se, step_count + 1) for se in cluster_env.cluster_envs]
                )
                next_inter_cluster_obs_global = next_inter_cluster_obs_local.flatten()

                # Store the transition in the high-level MAPPO agent's buffer
                inter_agent.store(
                    inter_cluster_obs_local,         # s_local
                    inter_cluster_obs_global,        # s_global
                    high_level_action,               # action
                    high_level_logp,                 # log_prob
                    high_level_rewards_per_cluster,  # reward
                    [done_all] * n_clusters,         # done
                    next_inter_cluster_obs_global    # s'_global
                )

                # Share each cluster's high-level reward equally among its agents.
                bonus_per_agent = np.zeros_like(per_agent_rewards)
                num_agents_in_cluster = per_agent_rewards.shape[1]
                if num_agents_in_cluster > 0:
                    for c_idx in range(n_clusters):
                        bonus_per_agent[c_idx, :] = high_level_rewards_per_cluster[c_idx] / num_agents_in_cluster
                rewards_for_buffer = per_agent_rewards + bonus_per_agent

            # Data Storage (Low-Level)
            dones_list = step_info.get("cluster_dones")
            for idx, agent in enumerate(low_agents):
                agent.store(
                    obs_clusters[idx],
                    batch_global_obs[idx],
                    low_level_actions[idx],
                    low_level_logps[idx],
                    rewards_for_buffer[idx],
                    dones_list[idx],
                    next_obs_clusters[idx].reshape(-1)
                )

            cluster_rewards += per_agent_rewards
            obs_clusters = next_obs_clusters

        # Collect this episode's per-cluster metrics now rather than after the
        # next reset(), in case reset() clears the episode state they read.
        performance_metrics_log.append(
            _aggregate_cluster_metrics(cluster_env.call('get_episode_metrics'), ep)
        )

        # Agent Updates (End of Episode)
        if is_phase_1:
            for agent in low_agents:
                agent.update()
        else:
            # NOTE(review): during Phase 2 both sides keep calling store() every
            # step, but only one side calls update() per episode. Whether MAPPO
            # clears or overflows its episode_len-sized buffer for the frozen
            # side is not visible here -- confirm.
            position_in_cycle = (ep - JOINT_TRAINING_START_EPISODE) % CYCLE_LENGTH
            if position_in_cycle < FREEZE_HIGH_FOR_EPISODES:
                print("Updating ALL LOW-LEVEL agents (High-level is frozen).")
                for agent in low_agents:
                    agent.update()
            else:
                print("Updating HIGH-LEVEL agent (Low-level is frozen).")
                inter_agent.update()

        # Unified End-of-Episode Logging
        duration = time.time() - start_time
        num_low_level_agents = n_clusters * n_agents_per_cluster
        # NOTE(review): the baseline prices every step with cluster 0's grid
        # price, called with the 0-based step index -- confirm that is intended.
        get_price_fn = cluster_env.cluster_envs[0].get_grid_price

        # Calculate Costs & Cost Reduction. recursive_sum collapses the
        # possibly nested/ragged per-step entries into one number per step.
        total_baseline_cost = sum(
            recursive_sum(entry["grid_import_no_p2p"]) * get_price_fn(i)
            for i, entry in enumerate(day_logs)
        )
        total_actual_cost = sum(recursive_sum(entry["costs"]) for entry in day_logs)
        cost_reduction_pct = (1 - (total_actual_cost / total_baseline_cost)) * 100 if total_baseline_cost > 0 else 0.0

        # Intra-Cluster (Low-Level) Rewards
        total_reward_intra = cluster_rewards.sum()
        mean_reward_intra = total_reward_intra / num_low_level_agents if num_low_level_agents > 0 else 0.0
        # Inter-Cluster (High-Level) Rewards, averaged per step
        total_reward_inter = ep_total_inter_cluster_reward
        mean_reward_inter = total_reward_inter / step_count if step_count > 0 else 0.0
        # Total System Rewards
        total_reward_system = total_reward_intra + total_reward_inter
        mean_reward_system = total_reward_system / num_low_level_agents if num_low_level_agents > 0 else 0.0

        # Populate Logs for Plotting (consumed by generate_plots)
        intra_log.setdefault('total', []).append(total_reward_intra)
        intra_log.setdefault('mean', []).append(mean_reward_intra)
        inter_log.setdefault('total', []).append(total_reward_inter)
        inter_log.setdefault('mean', []).append(mean_reward_inter)
        total_log.setdefault('total', []).append(total_reward_system)
        total_log.setdefault('mean', []).append(mean_reward_system)
        cost_log.setdefault('total_cost', []).append(total_actual_cost)
        cost_log.setdefault('cost_without_p2p', []).append(total_baseline_cost)

        # Populate the Main Log for the Final CSV File
        episode_log_data.append({
            "Episode": ep,
            "Mean_Reward_System": mean_reward_system,
            "Mean_Reward_Intra": mean_reward_intra,
            "Mean_Reward_Inter": mean_reward_inter,
            "Total_Reward_System": total_reward_system,
            "Total_Reward_Intra": total_reward_intra,
            "Total_Reward_Inter": total_reward_inter,
            "Cost_Reduction_Pct": cost_reduction_pct,
            "Episode_Duration": duration,
        })

        # Print Final Episode Summary
        print(f"Ep {ep}/{NUM_EPISODES} | "
              f"Mean System R: {mean_reward_system:.3f} | "
              f"Cost Red: {cost_reduction_pct:.1f}% | "
              f"Time: {duration:.2f}s")

        if ep % CHECKPOINT_INTERVAL == 0 or ep == NUM_EPISODES:
            for c_idx, agent in enumerate(low_agents):
                agent.save(os.path.join(models_dir, f"low_cluster{c_idx}_ep{ep}.pth"))
            inter_agent.save(os.path.join(models_dir, f"inter_ep{ep}.pth"))
            print(f"Saved checkpoint at episode {ep}")

    print("Training completed! Aggregating final logs...")

    # Create, Merge, and Save Final DataFrame
    df_rewards_log = pd.DataFrame(episode_log_data)
    df_perf_log = pd.DataFrame(performance_metrics_log)
    df_final_log = pd.merge(df_rewards_log, df_perf_log, on="Episode")
    log_csv_path = os.path.join(logs_dir, "training_performance_log.csv")

    # Add total training time to the dataframe before saving
    overall_end_time = time.time()
    total_duration_seconds = overall_end_time - overall_start_time
    total_time_row = pd.DataFrame([{"Episode": "Total_Training_Time", "Episode_Duration": total_duration_seconds}])
    df_to_save = pd.concat([df_final_log, total_time_row], ignore_index=True)

    # Reorder and select columns for the final CSV
    columns_to_save = [
        "Episode", "Mean_Reward_System", "Mean_Reward_Intra", "Mean_Reward_Inter",
        "Total_Reward_System", "Total_Reward_Intra", "Total_Reward_Inter",
        "Cost_Reduction_Pct", "battery_degradation_cost_total", "Episode_Duration",
        "total_cost_savings", "grid_reduction_entire_day", "fairness_on_cost_savings"
    ]
    df_to_save = df_to_save[[col for col in columns_to_save if col in df_to_save.columns]]
    df_to_save.to_csv(log_csv_path, index=False)
    print(f"Saved comprehensive training performance log to: {log_csv_path}")

    generate_plots(
        plots_dir=plots_dir,
        num_episodes=NUM_EPISODES,
        intra_log=intra_log,
        inter_log=inter_log,
        total_log=total_log,
        cost_log=cost_log,
        df_final_log=df_final_log
    )

    overall_end_time = time.time()
    total_duration_seconds = overall_end_time - overall_start_time
    # Format into hours, minutes, seconds
    total_duration_formatted = str(timedelta(seconds=int(total_duration_seconds)))
    print("\n" + "="*50)
    print(f"Total Training Time: {total_duration_formatted} (HH:MM:SS)")
    print("="*50)


def _moving_avg(series, window):
    """Return the centered rolling mean of *series* as a NumPy array.

    ``min_periods=1`` keeps the edges defined instead of producing NaN.
    """
    return pd.Series(series).rolling(window=window, center=True, min_periods=1).mean().to_numpy()


def _plot_total_and_mean(episodes, log, ma_window, *, path, title, total_label,
                         mean_label, total_ylabel, mean_ylabel, axis_color,
                         mean_color, total_line_color=None):
    """Save a twin-axis plot of a log's moving-averaged 'total' and 'mean' series.

    The 'total' series goes on the left axis (labelled in *axis_color*), the
    'mean' series as a dashed line on the right axis in *mean_color*.
    ``total_line_color=None`` uses matplotlib's default color cycle.
    """
    fig, ax = plt.subplots(figsize=(12, 7))
    ax.plot(episodes, _moving_avg(log['total'], ma_window), label=total_label,
            linewidth=2, color=total_line_color)
    ax.set_xlabel("Episode")
    ax.set_ylabel(total_ylabel, color=axis_color)
    ax.tick_params(axis='y', labelcolor=axis_color)
    ax.grid(True)

    ax2 = ax.twinx()
    ax2.plot(episodes, _moving_avg(log['mean'], ma_window), label=mean_label,
             linewidth=2, linestyle='--', color=mean_color)
    ax2.set_ylabel(mean_ylabel, color=mean_color)
    ax2.tick_params(axis='y', labelcolor=mean_color)

    fig.suptitle(title)
    fig.legend(loc="upper left", bbox_to_anchor=(0.1, 0.9))
    fig.savefig(path, dpi=200)
    plt.close(fig)


def generate_plots(
    plots_dir: str,
    num_episodes: int,
    intra_log: dict,
    inter_log: dict,
    total_log: dict,
    cost_log: dict,
    df_final_log: pd.DataFrame
):
    """Generate and save all final plots after training is complete.

    Args:
        plots_dir: Directory the PNG files are written to.
        num_episodes: Number of episodes. Plots 1-4 use 1..num_episodes as the
            x-axis, so every per-episode list in the logs must have this length.
        intra_log: Dict with per-episode 'total' and 'mean' low-level rewards.
        inter_log: Dict with per-episode 'total' and 'mean' high-level rewards.
        total_log: Dict with per-episode 'total' and 'mean' system rewards.
        cost_log: Dict with per-episode 'total_cost' and 'cost_without_p2p' lists.
        df_final_log: Merged per-episode DataFrame; must contain the 'Episode'
            and 'battery_degradation_cost_total' columns.
    """
    print("Training completed! Generating plots…")

    ma_window = 120
    episodes = np.arange(1, num_episodes + 1)

    # Plot 1: Intra-cluster (Low-Level) Rewards
    _plot_total_and_mean(
        episodes, intra_log, ma_window,
        path=os.path.join(plots_dir, "1_intra_cluster_rewards.png"),
        title="Intra-Cluster (Low-Level Agent) Rewards",
        total_label=f'Total Reward (MA {ma_window})',
        mean_label=f'Mean Reward (MA {ma_window})',
        total_ylabel="Total Intra-Cluster Reward",
        mean_ylabel="Mean Intra-Cluster Reward",
        axis_color='tab:blue',
        mean_color='tab:cyan',
    )

    # Plot 2: Inter-cluster (High-Level) Rewards
    _plot_total_and_mean(
        episodes, inter_log, ma_window,
        path=os.path.join(plots_dir, "2_inter_cluster_rewards.png"),
        title="Inter-Cluster (High-Level Agent) Rewards",
        total_label=f'Total Reward (MA {ma_window})',
        mean_label=f'Mean Reward (MA {ma_window})',
        total_ylabel="Total Inter-Cluster Reward",
        mean_ylabel="Mean Inter-Cluster Reward",
        axis_color='tab:green',
        mean_color='mediumseagreen',
        total_line_color='tab:green',
    )

    # Plot 3: Total System Rewards
    _plot_total_and_mean(
        episodes, total_log, ma_window,
        path=os.path.join(plots_dir, "3_total_system_rewards.png"),
        title="Total System Rewards (Intra + Inter)",
        total_label=f'Total System Reward (MA {ma_window})',
        mean_label=f'Mean System Reward (MA {ma_window})',
        total_ylabel="Total System Reward",
        mean_ylabel="Mean System Reward per Agent",
        axis_color='tab:red',
        mean_color='salmon',
        total_line_color='tab:red',
    )

    # Plot 4: Cost Reduction
    cost_df = pd.DataFrame(cost_log)
    baseline = cost_df['cost_without_p2p']
    reduction_pct = 100 * (1 - cost_df['total_cost'] / baseline)
    # Same zero-baseline guard as Cost_Reduction_Pct in main(); the 100 cap
    # is applied to the percentage, after scaling.
    cost_df['cost_reduction_pct'] = reduction_pct.where(baseline > 0, 0.0).clip(upper=100)
    plt.figure(figsize=(12, 7))
    plt.plot(episodes, _moving_avg(cost_df['cost_reduction_pct'], ma_window),
             label=f'Cost Reduction % (MA {ma_window})', color='purple', linewidth=2)
    plt.xlabel("Episode")
    plt.ylabel("Cost Reduction (%)")
    plt.title("Total System-Wide Cost Reduction")
    plt.legend()
    plt.grid(True)
    plt.savefig(os.path.join(plots_dir, "4_cost_reduction.png"), dpi=200)
    plt.close()

    # Drop non-numeric 'Episode' rows (e.g. a summary row) before plotting.
    df_plot = df_final_log[pd.to_numeric(df_final_log['Episode'], errors='coerce').notna()].copy()
    df_plot['Episode'] = pd.to_numeric(df_plot['Episode'])

    # Plot 5: Battery Degradation Cost
    plt.figure(figsize=(12, 7))
    plt.plot(df_plot["Episode"], _moving_avg(df_plot["battery_degradation_cost_total"], ma_window),
             label=f'Degradation Cost (MA {ma_window})', color='darkgreen', linewidth=2)
    plt.xlabel("Episode")
    plt.ylabel("Total Degradation Cost ($)")
    plt.title("Total Battery Degradation Cost")
    plt.legend()
    plt.grid(True)
    plt.savefig(os.path.join(plots_dir, "5_battery_degradation_cost.png"), dpi=200)
    plt.close()

    print(f"All plots have been saved to: {plots_dir}")


if __name__ == "__main__":
    main()