File size: 6,009 Bytes
55da406 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 |
import os
import sys
import numpy as np
import torch
# Ensure project root is on the Python path
# Please ensure you follow proper directory structure for running this code
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
from Environment.solar_sys_environment import SolarSys
from Environment.cluster_env_wrapper import GlobalPriceVecEnvWrapper
from Environment.cluster_env_wrapper import make_vec_env
class InterClusterLedger:
    """
    Bookkeeping for pairwise energy debts between clusters.

    ``balances[a][b]`` holds cluster *a*'s net position against *b*:
    negative once *a* has sent energy to *b*, positive once it has
    received. The two mirrored entries always sum to zero.
    """

    def __init__(self):
        # Nested mapping: cluster id -> counterparty id -> net amount.
        self.balances = {}

    def record_transfer(self, from_id: str, to_id: str, amount: float):
        """Book a transfer of ``amount`` from ``from_id`` to ``to_id``."""
        # A self-transfer is a no-op by definition.
        if from_id == to_id:
            return
        sender = self.balances.setdefault(from_id, {})
        receiver = self.balances.setdefault(to_id, {})
        sender[to_id] = sender.get(to_id, 0.0) - amount
        receiver[from_id] = receiver.get(from_id, 0.0) + amount

    def get_balance(self, a_id: str, b_id: str) -> float:
        """Net balance of ``a_id`` against ``b_id`` (0.0 if never traded)."""
        return self.balances.get(a_id, {}).get(b_id, 0.0)

    def net_balances(self) -> dict:
        """Expose the full balance table (the live internal dict)."""
        return self.balances
class InterClusterCoordinator:
def __init__(
self,
cluster_env,
high_level_agent,
ledger,
max_transfer_kwh: float = 1000000.0,
w_cost_savings: float = 2.0,
w_grid_penalty: float = 0.3,
w_p2p_bonus: float = 0.3
):
self.cluster_env = cluster_env
self.agent = high_level_agent
self.ledger = ledger
self.max_transfer_kwh = max_transfer_kwh
self.w_cost_savings = w_cost_savings
self.w_grid_penalty = w_grid_penalty
self.w_p2p_bonus = w_p2p_bonus
def get_cluster_state(self, env, step_count: int) -> np.ndarray:
"""
array summarizing a single cluster's state by reading from its vectorized attributes.
"""
solar_env = env # This is one of the vectorized SolarSys envs
idx = min(step_count, solar_env.num_steps - 1)
agg_soc = np.sum(solar_env.battery_soc)
agg_max_capacity = np.sum(solar_env.battery_max_capacity)
agg_soc_fraction = agg_soc / agg_max_capacity if agg_max_capacity > 0 else 0.0
agg_demand = np.sum(solar_env.demands_day[idx])
agg_solar = np.sum(solar_env.solars_day[idx])
price = solar_env.get_grid_price(idx)
t_norm = idx / float(solar_env.steps_per_day)
return np.array([
agg_soc, agg_max_capacity, agg_soc_fraction,
agg_demand, agg_solar, price, t_norm
], dtype=np.float32)
def build_transfers(self, agent_action_vector: np.ndarray, reports: dict) -> tuple[np.ndarray, np.ndarray]:
"""
Acts as a centralized market maker based on agent actions and LIVE capacity reports.
"""
n = len(self.cluster_env.clusters)
raw_export_prefs = agent_action_vector[:, 0]
raw_import_prefs = agent_action_vector[:, 1]
export_prefs = torch.softmax(torch.tensor(raw_export_prefs), dim=-1).numpy()
import_prefs = torch.softmax(torch.tensor(raw_import_prefs), dim=-1).numpy()
total_available_for_export = 0.0
potential_exports = np.zeros(n)
for i in range(n):
export_capacity = reports[i]['export_capacity']
pref = float(export_prefs[i])
potential_exports[i] = min(pref * self.max_transfer_kwh, export_capacity)
total_available_for_export += potential_exports[i]
total_requested_for_import = 0.0
potential_imports = np.zeros(n)
for i in range(n):
import_capacity = reports[i]['import_capacity']
pref = float(import_prefs[i])
potential_imports[i] = min(pref * self.max_transfer_kwh, import_capacity)
total_requested_for_import += potential_imports[i]
total_matched_energy = min(total_available_for_export, total_requested_for_import)
actual_exports = np.zeros(n)
actual_imports = np.zeros(n)
if total_matched_energy > 1e-6:
if total_available_for_export > 0:
actual_exports = (potential_exports / total_available_for_export) * total_matched_energy
if total_requested_for_import > 0:
actual_imports = (potential_imports / total_requested_for_import) * total_matched_energy
return actual_exports, actual_imports
def compute_inter_cluster_reward(self, all_cluster_infos: dict, actual_transfers: tuple, step_count: int) -> np.ndarray:
"""
Computes an INDIVIDUAL reward for each cluster agent to solve
the credit assignment problem.
"""
actual_exports, actual_imports = actual_transfers
num_clusters = len(self.cluster_env.cluster_envs)
cluster_rewards = np.zeros(num_clusters, dtype=np.float32)
# Extract per-cluster cost and import data from the batched info dict
costs_per_cluster = [np.sum(c) for c in all_cluster_infos['costs']]
baseline_imports_per_cluster = [np.sum(imp) for imp in all_cluster_infos['grid_import_no_p2p']]
actual_imports_per_cluster = [np.sum(imp) for imp in all_cluster_infos['grid_import_with_p2p']]
# Get the single grid price for the current step
grid_price = self.cluster_env.cluster_envs[0].get_grid_price(step_count)
for i in range(num_clusters):
baseline_cost_this_cluster = baseline_imports_per_cluster[i] * grid_price
actual_cost_this_cluster = costs_per_cluster[i]
cost_saved = baseline_cost_this_cluster - actual_cost_this_cluster
r_savings = self.w_cost_savings * cost_saved
r_grid = self.w_grid_penalty * actual_imports_per_cluster[i]
p2p_volume_this_cluster = actual_exports[i] + actual_imports[i]
r_p2p = self.w_p2p_bonus * p2p_volume_this_cluster
cluster_rewards[i] = r_savings + r_p2p - r_grid
return cluster_rewards |