import os
import sys

import numpy as np
import torch

# Ensure project root is on the Python path so the `Environment` package resolves.
# Please ensure you follow proper directory structure for running this code.
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))

from Environment.solar_sys_environment import SolarSys
from Environment.cluster_env_wrapper import GlobalPriceVecEnvWrapper
from Environment.cluster_env_wrapper import make_vec_env


class InterClusterLedger:
    """Tracks pairwise inter-cluster debts/transfers.

    ``balances[a][b]`` is cluster ``a``'s running net balance with ``b``.
    Each transfer from ``a`` to ``b`` decreases ``balances[a][b]`` and
    increases ``balances[b][a]`` by the same amount, so the two entries are
    always negatives of each other.
    """

    def __init__(self):
        # cluster id -> counterparty id -> net balance
        self.balances: dict[str, dict[str, float]] = {}

    def record_transfer(self, from_id: str, to_id: str, amount: float):
        """Record ``amount`` flowing from ``from_id`` to ``to_id``.

        Self-transfers are ignored.
        """
        if from_id == to_id:
            return
        sender = self.balances.setdefault(from_id, {})
        receiver = self.balances.setdefault(to_id, {})
        sender[to_id] = sender.get(to_id, 0.0) - amount
        receiver[from_id] = receiver.get(from_id, 0.0) + amount

    def get_balance(self, a_id: str, b_id: str) -> float:
        """Return ``a_id``'s net balance with ``b_id`` (0.0 if none recorded)."""
        return self.balances.get(a_id, {}).get(b_id, 0.0)

    def net_balances(self) -> dict:
        """Return the ledger's balance mapping.

        NOTE: this is the live internal dict, not a copy; mutating the result
        mutates the ledger.
        """
        return self.balances


class InterClusterCoordinator:
    """Centralized market maker for energy transfers between clusters.

    Converts a high-level agent's per-cluster export/import preferences into
    matched transfer volumes and computes an individual reward per cluster.
    """

    def __init__(
        self,
        cluster_env,
        high_level_agent,
        ledger,
        max_transfer_kwh: float = 1000000.0,
        w_cost_savings: float = 2.0,
        w_grid_penalty: float = 0.3,
        w_p2p_bonus: float = 0.3,
    ):
        """
        Args:
            cluster_env: Vectorized wrapper; ``cluster_env.cluster_envs`` is
                read by the reward computation.
            high_level_agent: Inter-cluster policy, stored as ``self.agent``.
            ledger: Transfer ledger, stored as ``self.ledger`` (not used by
                the methods of this class).
            max_transfer_kwh: Combined cap on each side of the market that the
                softmax-normalized preferences are scaled by.
            w_cost_savings: Weight on cost saved versus the no-P2P baseline.
            w_grid_penalty: Weight on the grid import penalty.
            w_p2p_bonus: Weight on the bonus for inter-cluster traded volume.
        """
        self.cluster_env = cluster_env
        self.agent = high_level_agent
        self.ledger = ledger
        self.max_transfer_kwh = max_transfer_kwh
        self.w_cost_savings = w_cost_savings
        self.w_grid_penalty = w_grid_penalty
        self.w_p2p_bonus = w_p2p_bonus

    def get_cluster_state(self, env, step_count: int) -> np.ndarray:
        """Return a 7-element float32 summary of a single cluster's state.

        Reads ``env``'s vectorized attributes at ``step_count`` (clamped to the
        last step). Layout::

            [total SoC, total max capacity, SoC fraction, total demand,
             total solar, grid price, step index / steps_per_day]

        The SoC fraction is 0.0 when total capacity is not positive. The time
        feature exceeds 1.0 past the first day if ``num_steps`` is larger than
        ``steps_per_day``.
        """
        solar_env = env  # one of the vectorized SolarSys envs
        idx = min(step_count, solar_env.num_steps - 1)

        agg_soc = np.sum(solar_env.battery_soc)
        agg_max_capacity = np.sum(solar_env.battery_max_capacity)
        agg_soc_fraction = agg_soc / agg_max_capacity if agg_max_capacity > 0 else 0.0
        agg_demand = np.sum(solar_env.demands_day[idx])
        agg_solar = np.sum(solar_env.solars_day[idx])
        price = solar_env.get_grid_price(idx)
        t_norm = idx / float(solar_env.steps_per_day)

        return np.array(
            [agg_soc, agg_max_capacity, agg_soc_fraction, agg_demand, agg_solar, price, t_norm],
            dtype=np.float32,
        )

    @staticmethod
    def _softmax(raw) -> np.ndarray:
        """Softmax over clusters (last axis) via torch, as a float64 NumPy array."""
        # float64 so later scaling by max_transfer_kwh is not done in float32
        return torch.softmax(torch.tensor(raw), dim=-1).numpy().astype(np.float64)

    def build_transfers(
        self, agent_action_vector: np.ndarray, reports: dict
    ) -> tuple[np.ndarray, np.ndarray]:
        """Match inter-cluster exports and imports from agent preferences.

        Acts as a centralized market maker based on agent actions and LIVE
        capacity reports. Column 0 of each row is a raw export preference and
        column 1 a raw import preference. Each column is softmax-normalized
        across clusters, so ``max_transfer_kwh`` is a combined cap shared by
        all clusters on each side. Every cluster's offer/request is further
        capped by its reported capacity (and never negative). The smaller
        market side determines the matched volume, which is split pro rata on
        both sides.

        Args:
            agent_action_vector: One row per cluster, at least two columns
                (export pref, import pref).
            reports: Capacity reports indexable by cluster position
                ``0..n-1``; each entry provides ``'export_capacity'`` and
                ``'import_capacity'``.

        Returns:
            ``(actual_exports, actual_imports)``: length-n float arrays with
            equal sums; both all-zero when the matched volume is <= 1e-6.
        """
        # Size from the actions themselves so preferences and outputs always
        # agree (the rest of the class reaches clusters via `cluster_envs`).
        n = len(agent_action_vector)

        export_prefs = self._softmax(agent_action_vector[:, 0])
        import_prefs = self._softmax(agent_action_vector[:, 1])

        export_caps = np.array([float(reports[i]["export_capacity"]) for i in range(n)])
        import_caps = np.array([float(reports[i]["import_capacity"]) for i in range(n)])

        # Preference-weighted offer, capped by live capacity; clamped at zero
        # so a negative capacity report can never yield a negative transfer.
        potential_exports = np.maximum(
            np.minimum(export_prefs * self.max_transfer_kwh, export_caps), 0.0
        )
        potential_imports = np.maximum(
            np.minimum(import_prefs * self.max_transfer_kwh, import_caps), 0.0
        )

        total_available_for_export = float(potential_exports.sum())
        total_requested_for_import = float(potential_imports.sum())
        total_matched_energy = min(total_available_for_export, total_requested_for_import)

        if total_matched_energy <= 1e-6:
            return np.zeros(n), np.zeros(n)

        # Both totals are >= total_matched_energy > 0 here, so division is safe.
        actual_exports = (potential_exports / total_available_for_export) * total_matched_energy
        actual_imports = (potential_imports / total_requested_for_import) * total_matched_energy
        return actual_exports, actual_imports

    def compute_inter_cluster_reward(
        self, all_cluster_infos: dict, actual_transfers: tuple, step_count: int
    ) -> np.ndarray:
        """Compute an INDIVIDUAL reward for each cluster agent.

        Per-cluster rewards address the credit assignment problem. For
        cluster ``i``::

            reward = w_cost_savings * (baseline_import_i * price - cost_i)
                   + w_p2p_bonus   * (export_i + import_i)
                   - w_grid_penalty * grid_import_with_p2p_i

        Args:
            all_cluster_infos: Batched info with keys ``'costs'``,
                ``'grid_import_no_p2p'`` and ``'grid_import_with_p2p'``; each
                holds one entry per cluster (entries are summed).
            actual_transfers: ``(actual_exports, actual_imports)`` as returned
                by ``build_transfers``.
            step_count: Current step; clamped to the last step for the price
                lookup, matching ``get_cluster_state``.

        Returns:
            float32 array with one reward per cluster in ``cluster_envs``.
        """
        actual_exports, actual_imports = actual_transfers
        cluster_envs = self.cluster_env.cluster_envs
        num_clusters = len(cluster_envs)
        cluster_rewards = np.zeros(num_clusters, dtype=np.float32)

        costs_per_cluster = [np.sum(c) for c in all_cluster_infos["costs"]]
        baseline_imports_per_cluster = [np.sum(imp) for imp in all_cluster_infos["grid_import_no_p2p"]]
        actual_imports_per_cluster = [np.sum(imp) for imp in all_cluster_infos["grid_import_with_p2p"]]

        # A single grid price is shared by all clusters at this step. Clamp the
        # index like get_cluster_state does so the terminal step cannot index
        # past the end of the price series.
        ref_env = cluster_envs[0]
        price_idx = min(step_count, ref_env.num_steps - 1)
        grid_price = ref_env.get_grid_price(price_idx)

        for i in range(num_clusters):
            baseline_cost_this_cluster = baseline_imports_per_cluster[i] * grid_price
            cost_saved = baseline_cost_this_cluster - costs_per_cluster[i]

            r_savings = self.w_cost_savings * cost_saved
            r_grid = self.w_grid_penalty * actual_imports_per_cluster[i]
            # NOTE(review): both directions earn the bonus, so a cluster that
            # exports and imports in the same step is rewarded twice — confirm
            # this is intended.
            r_p2p = self.w_p2p_bonus * (actual_exports[i] + actual_imports[i])

            cluster_rewards[i] = r_savings + r_p2p - r_grid

        return cluster_rewards