|
|
import os |
|
|
import sys |
|
|
import numpy as np |
|
|
import torch |
|
|
|
|
|
|
|
|
|
|
|
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) |
|
|
|
|
|
from Environment.solar_sys_environment import SolarSys |
|
|
from Environment.cluster_env_wrapper import GlobalPriceVecEnvWrapper |
|
|
from Environment.cluster_env_wrapper import make_vec_env |
|
|
class InterClusterLedger:
    """
    Pairwise running ledger of inter-cluster transfers.

    ``balances[a][b]`` is the net amount cluster ``a`` has received from
    cluster ``b``. Each ``record_transfer(b, a, x)`` adds ``x`` to it, and
    each ``record_transfer(a, b, x)`` subtracts ``x``. The mirror entry
    ``balances[b][a]`` moves by the opposite amount. The unit of ``amount``
    is whatever the caller records (energy or money); nothing here fixes it.
    """

    def __init__(self):
        # Nested mapping: holder id -> {counterparty id -> running net amount}.
        self.balances = {}

    def record_transfer(self, from_id: str, to_id: str, amount: float):
        """
        Record that ``from_id`` sent ``amount`` to ``to_id``.

        Self-transfers (``from_id == to_id``) are ignored and do not create
        ledger rows.
        """
        if from_id != to_id:
            sender_row = self.balances.setdefault(from_id, {})
            receiver_row = self.balances.setdefault(to_id, {})
            # The sender's view of the receiver goes down; the receiver's view
            # of the sender goes up by the same amount.
            sender_row[to_id] = sender_row.get(to_id, 0.0) - amount
            receiver_row[from_id] = receiver_row.get(from_id, 0.0) + amount

    def get_balance(self, a_id: str, b_id: str) -> float:
        """Return the net amount ``a_id`` has received from ``b_id`` (0.0 if never traded)."""
        try:
            return self.balances[a_id][b_id]
        except KeyError:
            return 0.0

    def net_balances(self) -> dict:
        """
        Return the live nested balance mapping.

        This is the ledger's own dict, not a copy, so mutating the result
        mutates the ledger.
        """
        return self.balances
|
|
|
|
|
|
|
|
class InterClusterCoordinator: |
|
|
def __init__( |
|
|
self, |
|
|
cluster_env, |
|
|
high_level_agent, |
|
|
ledger, |
|
|
max_transfer_kwh: float = 1000000.0, |
|
|
w_cost_savings: float = 2.0, |
|
|
w_grid_penalty: float = 0.3, |
|
|
w_p2p_bonus: float = 0.3 |
|
|
): |
|
|
self.cluster_env = cluster_env |
|
|
self.agent = high_level_agent |
|
|
self.ledger = ledger |
|
|
self.max_transfer_kwh = max_transfer_kwh |
|
|
self.w_cost_savings = w_cost_savings |
|
|
self.w_grid_penalty = w_grid_penalty |
|
|
self.w_p2p_bonus = w_p2p_bonus |
|
|
|
|
|
def get_cluster_state(self, env, step_count: int) -> np.ndarray: |
|
|
""" |
|
|
array summarizing a single cluster's state by reading from its vectorized attributes. |
|
|
""" |
|
|
solar_env = env |
|
|
idx = min(step_count, solar_env.num_steps - 1) |
|
|
agg_soc = np.sum(solar_env.battery_soc) |
|
|
agg_max_capacity = np.sum(solar_env.battery_max_capacity) |
|
|
agg_soc_fraction = agg_soc / agg_max_capacity if agg_max_capacity > 0 else 0.0 |
|
|
|
|
|
agg_demand = np.sum(solar_env.demands_day[idx]) |
|
|
agg_solar = np.sum(solar_env.solars_day[idx]) |
|
|
|
|
|
price = solar_env.get_grid_price(idx) |
|
|
t_norm = idx / float(solar_env.steps_per_day) |
|
|
|
|
|
return np.array([ |
|
|
agg_soc, agg_max_capacity, agg_soc_fraction, |
|
|
agg_demand, agg_solar, price, t_norm |
|
|
], dtype=np.float32) |
|
|
|
|
|
def build_transfers(self, agent_action_vector: np.ndarray, reports: dict) -> tuple[np.ndarray, np.ndarray]: |
|
|
""" |
|
|
Acts as a centralized market maker based on agent actions and LIVE capacity reports. |
|
|
""" |
|
|
n = len(self.cluster_env.clusters) |
|
|
raw_export_prefs = agent_action_vector[:, 0] |
|
|
raw_import_prefs = agent_action_vector[:, 1] |
|
|
|
|
|
export_prefs = torch.softmax(torch.tensor(raw_export_prefs), dim=-1).numpy() |
|
|
import_prefs = torch.softmax(torch.tensor(raw_import_prefs), dim=-1).numpy() |
|
|
|
|
|
total_available_for_export = 0.0 |
|
|
potential_exports = np.zeros(n) |
|
|
for i in range(n): |
|
|
export_capacity = reports[i]['export_capacity'] |
|
|
pref = float(export_prefs[i]) |
|
|
potential_exports[i] = min(pref * self.max_transfer_kwh, export_capacity) |
|
|
total_available_for_export += potential_exports[i] |
|
|
|
|
|
total_requested_for_import = 0.0 |
|
|
potential_imports = np.zeros(n) |
|
|
for i in range(n): |
|
|
import_capacity = reports[i]['import_capacity'] |
|
|
pref = float(import_prefs[i]) |
|
|
potential_imports[i] = min(pref * self.max_transfer_kwh, import_capacity) |
|
|
total_requested_for_import += potential_imports[i] |
|
|
|
|
|
total_matched_energy = min(total_available_for_export, total_requested_for_import) |
|
|
actual_exports = np.zeros(n) |
|
|
actual_imports = np.zeros(n) |
|
|
|
|
|
if total_matched_energy > 1e-6: |
|
|
if total_available_for_export > 0: |
|
|
actual_exports = (potential_exports / total_available_for_export) * total_matched_energy |
|
|
if total_requested_for_import > 0: |
|
|
actual_imports = (potential_imports / total_requested_for_import) * total_matched_energy |
|
|
|
|
|
return actual_exports, actual_imports |
|
|
|
|
|
def compute_inter_cluster_reward(self, all_cluster_infos: dict, actual_transfers: tuple, step_count: int) -> np.ndarray: |
|
|
""" |
|
|
Computes an INDIVIDUAL reward for each cluster agent to solve |
|
|
the credit assignment problem. |
|
|
""" |
|
|
actual_exports, actual_imports = actual_transfers |
|
|
num_clusters = len(self.cluster_env.cluster_envs) |
|
|
cluster_rewards = np.zeros(num_clusters, dtype=np.float32) |
|
|
|
|
|
|
|
|
costs_per_cluster = [np.sum(c) for c in all_cluster_infos['costs']] |
|
|
baseline_imports_per_cluster = [np.sum(imp) for imp in all_cluster_infos['grid_import_no_p2p']] |
|
|
actual_imports_per_cluster = [np.sum(imp) for imp in all_cluster_infos['grid_import_with_p2p']] |
|
|
|
|
|
|
|
|
grid_price = self.cluster_env.cluster_envs[0].get_grid_price(step_count) |
|
|
|
|
|
for i in range(num_clusters): |
|
|
baseline_cost_this_cluster = baseline_imports_per_cluster[i] * grid_price |
|
|
actual_cost_this_cluster = costs_per_cluster[i] |
|
|
cost_saved = baseline_cost_this_cluster - actual_cost_this_cluster |
|
|
r_savings = self.w_cost_savings * cost_saved |
|
|
r_grid = self.w_grid_penalty * actual_imports_per_cluster[i] |
|
|
p2p_volume_this_cluster = actual_exports[i] + actual_imports[i] |
|
|
r_p2p = self.w_p2p_bonus * p2p_volume_this_cluster |
|
|
cluster_rewards[i] = r_savings + r_p2p - r_grid |
|
|
|
|
|
return cluster_rewards |