|
|
import torch |
|
|
import torch.nn as nn |
|
|
import numpy as np |
|
|
import pandas as pd |
|
|
import os |
|
|
import joblib |
|
|
import math |
|
|
import datetime |
|
|
from tqdm import tqdm |
|
|
import matplotlib.pyplot as plt |
|
|
import matplotlib.dates as mdates |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# The model classes come from the project-local module
# 'hierarchical_diffusion_model.py'. Only HierarchicalDiffusionModel is
# referenced by name in the code below; the remaining names are kept imported
# as in the original (NOTE(review): possibly needed when deserialising
# checkpoints -- confirm before pruning).
try:
    from hierarchical_diffusion_model import (
        HierarchicalDiffusionModel, ConditionalUnet, ResnetBlock1D,
        AttentionBlock1D, DownBlock1D, UpBlock1D,
        SinusoidalPositionEmbeddings, ImprovedDiffusionModel
    )
    print("Diffusion model classes imported.")
except ImportError:
    print("="*50)
    print("ERROR: Could not import model classes from 'hierarchical_diffusion_model.py'.")
    print("="*50)
    # `exit()` is a helper injected by the `site` module for interactive use
    # and is not guaranteed to exist; it also exits with status 0. Raise
    # SystemExit directly so the failure is visible to calling processes.
    raise SystemExit(1)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def add_amplitude_jitter(series, daily_samples=48, scale=0.05):
    """Scale each whole day of *series* by its own random factor.

    One factor per complete day is drawn from ``N(1.0, scale)`` using NumPy's
    global random state, and every sample of that day is multiplied by it.
    Samples after the last complete day are left untouched.

    Args:
        series: Array-like supporting ``.copy()``, ``.dtype``, ``.astype`` and
            slice assignment (the caller in this file passes a NumPy array).
            The input itself is never modified.
        daily_samples: Number of consecutive samples that form one day.
        scale: Standard deviation of the per-day multiplicative factor.

    Returns:
        A jittered copy of *series*. Inputs shorter than one day are returned
        as an unmodified copy. Non-floating inputs that do get jittered are
        upcast to float first.
    """
    series = series.copy()
    num_days = len(series) // daily_samples
    if num_days == 0:
        return series

    # In-place multiplication of integer/bool data by a float factor raises a
    # casting error on NumPy arrays, so promote such data before scaling.
    if series.dtype.kind not in 'fc':
        series = series.astype(float)

    factors = np.random.normal(1.0, scale, size=num_days)
    for day, factor in enumerate(factors):
        start = day * daily_samples
        series[start:start + daily_samples] *= factor
    return series
|
|
|
|
|
def add_cloud_variability(pv, timestamps, base_sigma=0.25):
    """Apply one random "cloud" factor per calendar day to a PV series.

    Every distinct calendar day in *timestamps* gets a single factor drawn
    from ``lognormal(mean=-0.02, sigma=base_sigma)`` (NumPy global random
    state; factors are assigned to days in ascending date order). Samples
    whose hour lies in 6..18 inclusive are multiplied by their day's factor;
    all other samples are set to 0.0.

    The result is computed row by row, so it always has the same length and
    ordering as the input, even when *timestamps* is not chronologically
    sorted or contains missing values. (Grouping by date and concatenating
    the groups would silently reorder such input.)

    Args:
        pv: 1-D array-like of PV values, aligned positionally with
            *timestamps*.
        timestamps: Datetime-like index/sequence with one entry per sample.
        base_sigma: Sigma of the log-normal daily factor.

    Returns:
        A float NumPy array with one value per input sample. An empty input
        is returned as an empty copy.

    Raises:
        ValueError: If *pv* and *timestamps* differ in length.
    """
    if len(pv) == 0:
        return pv.copy()

    values = np.asarray(pv, dtype=float)
    stamps = pd.DatetimeIndex(timestamps)
    if len(values) != len(stamps):
        raise ValueError(
            f"Length of values ({len(values)}) does not match length of index ({len(stamps)})"
        )

    # Map every sample to its calendar day; sort=True numbers the days in
    # ascending order so factor i always belongs to the i-th earliest day.
    day_codes, unique_days = pd.factorize(stamps.normalize(), sort=True)
    day_factors = np.random.lognormal(mean=-0.02, sigma=base_sigma, size=len(unique_days))

    hours = stamps.hour
    daylight = np.asarray((hours >= 6) & (hours <= 18))

    # Missing timestamps have no valid hour, are never "daylight", and so
    # stay at 0.0 without ever indexing day_factors.
    adjusted = np.zeros(len(values), dtype=float)
    adjusted[daylight] = values[daylight] * day_factors[day_codes[daylight]]
    return adjusted
|
|
|
|
|
def enforce_physics(df: pd.DataFrame, pv_cap_kw: float | None = None) -> pd.DataFrame: |
|
|
df = df.copy() |
|
|
df['solar_generation'] = np.clip(df['solar_generation'], 0.0, None) |
|
|
hour = df.index.hour |
|
|
night = (hour < 7) | (hour > 18) |
|
|
df.loc[night, 'solar_generation'] = 0.0 |
|
|
export_mask = df['grid_usage'] < 0 |
|
|
if export_mask.any(): |
|
|
limited_export = -np.minimum(-df.loc[export_mask, 'grid_usage'], df.loc[export_mask, 'solar_generation']) |
|
|
df.loc[export_mask, 'grid_usage'] = limited_export |
|
|
zero_pv_neg_grid = export_mask & (df['solar_generation'] <= 1e-6) |
|
|
df.loc[zero_pv_neg_grid, 'grid_usage'] = 0.0 |
|
|
if pv_cap_kw is not None: |
|
|
df['solar_generation'] = np.clip(df['solar_generation'], 0.0, pv_cap_kw) |
|
|
return df |
|
|
|
|
|
def calculate_generation_length(duration: str, samples_per_day: int) -> int:
    """Translate a duration label into a number of samples.

    Recognised labels and their length in days: '1_year' (365),
    '6_months' (182), '2_months' (60), '1_month' (30), '14_days' (14),
    '7_days' (7) and '2_days' (2). Any other label prints a warning and is
    treated as one year.

    Args:
        duration: Duration label.
        samples_per_day: Number of samples in one day.

    Returns:
        Days for the label multiplied by *samples_per_day*.
    """
    days_per_label = (
        ('1_year', 365),
        ('6_months', 182),
        ('2_months', 60),
        ('1_month', 30),
        ('14_days', 14),
        ('7_days', 7),
        ('2_days', 2),
    )
    for label, day_count in days_per_label:
        if duration == label:
            return day_count * samples_per_day

    print(f"Warning: Unknown duration '{duration}'. Defaulting to 1 year.")
    return 365 * samples_per_day
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class Config:
    """Static settings for a generation run (paths, run size, model shape)."""

    # --- Filesystem locations -------------------------------------------
    # Checkpoint passed to torch.load and scaler passed to joblib.load.
    MODEL_PATH = './trained_model/best_hierarchical_model.pth'
    SCALER_PATH = './data/global_scaler.gz'
    # Directory of per-house CSVs; the first file (sorted by name) supplies
    # the timestamp index used for the generated profiles.
    ORIGINAL_DATA_DIR = './data/per_house'
    # Parent directory under which each run creates its own sub-folder.
    OUTPUT_DIR = './generated_data'

    # --- Run size -------------------------------------------------------
    # Label understood by calculate_generation_length (e.g. '1_year').
    GENERATION_DURATION = '1_year'
    NUM_PROFILES_TO_GENERATE = 2000
    # Only the first PLOTS_TO_GENERATE profiles get a PNG plot.
    PLOTS_TO_GENERATE = 20
    GENERATION_BATCH_SIZE = 128

    # --- Chunking -------------------------------------------------------
    # Profiles are stitched from chunks of
    # TRAINING_WINDOW_DAYS * SAMPLES_PER_DAY samples.
    TRAINING_WINDOW_DAYS = 14

    # --- Model / data shape ---------------------------------------------
    # Forwarded to the HierarchicalDiffusionModel constructor.
    # NOTE(review): presumably these must match the values used when the
    # checkpoint was trained -- confirm against the training script.
    NUM_HOUSES_TRAINED_ON = 300
    SAMPLES_PER_DAY = 48
    NUM_FEATURES = 4
    DOWNSCALE_FACTOR = 4
    EMBEDDING_DIM = 64
    HIDDEN_SIZE = 512
    # Channel widths per level: quarter, half and full HIDDEN_SIZE.
    HIDDEN_DIMS = [
        HIDDEN_SIZE // 4,
        HIDDEN_SIZE // 2,
        HIDDEN_SIZE,
    ]
    DROPOUT = 0.1
    USE_ATTENTION = True
    DIFFUSION_TIMESTEPS = 500
    BLOCKS_PER_LEVEL = 3
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _select_device():
    """Return "cuda" if available, otherwise "mps" if available, else "cpu"."""
    if torch.cuda.is_available():
        return "cuda"
    if torch.backends.mps.is_available():
        return "mps"
    return "cpu"


def _load_generation_resources(cfg, device):
    """Load scaler, reference timestamps and the trained model.

    Returns:
        Tuple ``(scaler, full_timestamps, total_samples_needed,
        window_samples, model)``.

    Raises:
        FileNotFoundError: If no CSV exists in ``cfg.ORIGINAL_DATA_DIR`` (or a
            loader cannot find its file).
        ValueError: If the chunk length or the number of samples to generate
            is not positive.
    """
    # NOTE(review): joblib.load and torch.load deserialise pickled data;
    # only point these paths at trusted files.
    scaler = joblib.load(cfg.SCALER_PATH)
    if scaler.n_features_in_ != cfg.NUM_FEATURES:
        print(f"WARNING: Scaler was fit on {scaler.n_features_in_} features, but model expects {cfg.NUM_FEATURES}.")

    original_files = sorted(f for f in os.listdir(cfg.ORIGINAL_DATA_DIR) if f.endswith('.csv'))
    if not original_files:
        raise FileNotFoundError("No original data files found to extract timestamps.")

    # The first file's index is the time axis for every generated profile,
    # limited to at most 365 days.
    sample_original_df = pd.read_csv(
        os.path.join(cfg.ORIGINAL_DATA_DIR, original_files[0]),
        index_col='timestamp',
        parse_dates=True,
    )
    full_timestamps = sample_original_df.index[:(365 * cfg.SAMPLES_PER_DAY)]

    total_samples_needed = calculate_generation_length(cfg.GENERATION_DURATION, cfg.SAMPLES_PER_DAY)
    window_samples = cfg.TRAINING_WINDOW_DAYS * cfg.SAMPLES_PER_DAY
    if window_samples <= 0:
        raise ValueError(f"Chunk length must be positive, got {window_samples} samples.")

    if total_samples_needed > len(full_timestamps):
        print(f"Warning: Requested {total_samples_needed} samples, but file has {len(full_timestamps)}. Clamping to max.")
        total_samples_needed = len(full_timestamps)

    # Without this check an empty timestamp file would later crash in
    # np.concatenate with an empty chunk list.
    if total_samples_needed <= 0:
        raise ValueError(f"No timestamps available in '{original_files[0]}'; nothing to generate.")

    print(f"Goal: Generate {total_samples_needed} samples ({cfg.GENERATION_DURATION}) per profile.")
    print(f"Strategy: Stitching {window_samples}-sample chunks.")

    model = HierarchicalDiffusionModel(
        in_channels=cfg.NUM_FEATURES,
        num_houses=cfg.NUM_HOUSES_TRAINED_ON,
        downscale_factor=cfg.DOWNSCALE_FACTOR,
        embedding_dim=cfg.EMBEDDING_DIM,
        hidden_dims=cfg.HIDDEN_DIMS,
        dropout=cfg.DROPOUT,
        use_attention=cfg.USE_ATTENTION,
        num_timesteps=cfg.DIFFUSION_TIMESTEPS,
        blocks_per_level=cfg.BLOCKS_PER_LEVEL
    )
    model.load_state_dict(torch.load(cfg.MODEL_PATH, map_location=device))
    model.to(device)
    model.eval()
    print("Model, scaler, timestamps ready.")

    return scaler, full_timestamps, total_samples_needed, window_samples, model


def _sample_stitched_batch(model, cfg, batch_size, total_samples, window_samples, device):
    """Sample one batch of profiles chunk by chunk and join the chunks.

    Each chunk has at most *window_samples* rows; the last one is shortened so
    the chunks add up to *total_samples*. Chunks are concatenated along
    axis 1 (presumably the time axis of a (batch, time, features) array --
    TODO confirm against ``model.sample``).

    NOTE(review): house_id, day_of_week and day_of_year are drawn at random
    for every chunk, so the chunks of one profile are conditioned
    independently of each other and of the timestamps they are later paired
    with. Confirm that this is intended.
    """
    num_chunks = math.ceil(total_samples / window_samples)
    chunks = []
    for chunk_idx in range(num_chunks):
        samples_remaining = total_samples - (chunk_idx * window_samples)
        chunk_length = min(window_samples, samples_remaining)

        # Draw order (house, weekday, day of year) is kept as in the original
        # so seeded runs consume the torch RNG identically.
        conditions = {
            "house_id": torch.randint(0, cfg.NUM_HOUSES_TRAINED_ON, (batch_size,), device=device),
            "day_of_week": torch.randint(0, 7, (batch_size,), device=device),
            "day_of_year": torch.randint(0, 365, (batch_size,), device=device)
        }
        with torch.no_grad():
            chunk = model.sample(batch_size, conditions, shape=(chunk_length, cfg.NUM_FEATURES))
        chunks.append(chunk.cpu().numpy())

    return np.concatenate(chunks, axis=1)


def _postprocess_profile(normalized_series, scaler, timestamps, samples_per_day):
    """Turn one normalised sample into the two-column frame that is saved.

    Inverse-scales the sample, enforces the physical constraints, adds daily
    amplitude jitter to grid usage and per-day cloud variability to solar
    generation, then enforces the constraints once more.
    """
    unscaled = scaler.inverse_transform(normalized_series)
    profile = pd.DataFrame(
        unscaled,
        columns=['grid_usage', 'solar_generation', 'sin_time', 'cos_time'],
        index=timestamps
    )

    profile = enforce_physics(profile)
    profile['grid_usage'] = add_amplitude_jitter(
        profile['grid_usage'].values, scale=0.08, daily_samples=samples_per_day
    )
    profile['solar_generation'] = add_cloud_variability(
        profile['solar_generation'].values, profile.index, base_sigma=0.3
    )
    # The random perturbations can break the constraints again.
    profile = enforce_physics(profile)

    return profile[['grid_usage', 'solar_generation']]


def _save_profile_plot(profile, house_num, samples_per_day, plot_dir):
    """Write a PNG showing the first 14 days of *profile*."""
    plot_df = profile.head(samples_per_day * 14)
    plt.figure(figsize=(15, 6))
    plt.plot(plot_df.index, plot_df['grid_usage'], label='Grid Usage', color='dodgerblue', alpha=0.9)
    plt.plot(plot_df.index, plot_df['solar_generation'], label='Solar Generation', color='darkorange', alpha=0.9)
    plt.title(f'Generated Data for Profile {house_num} (First 14 Days)')
    plt.xlabel('Timestamp')
    plt.ylabel('Power (kW)')
    plt.legend()
    plt.grid(True, which='both', linestyle='--', linewidth=0.5)
    plt.tight_layout()
    plt.savefig(os.path.join(plot_dir, f'generated_profile_{house_num}_plot.png'))
    # Close explicitly so figures do not accumulate across profiles.
    plt.close()


def main(cfg, run_output_dir):
    """Generate synthetic house profiles and write them below *run_output_dir*.

    Creates ``csv`` and ``plots`` sub-directories, loads the scaler, the
    reference timestamps and the trained model, then samples
    ``cfg.NUM_PROFILES_TO_GENERATE`` profiles in batches of
    ``cfg.GENERATION_BATCH_SIZE``. Each profile is post-processed and saved as
    ``generated_house_<n>.csv``; the first ``cfg.PLOTS_TO_GENERATE`` profiles
    are also plotted.

    Args:
        cfg: Configuration object exposing the attributes of ``Config``.
        run_output_dir: Directory that receives all outputs of this run.

    Returns:
        None. Setup failures are printed and end the function early.
    """
    device = _select_device()
    print(f"Using device: {device}")

    csv_output_dir = os.path.join(run_output_dir, 'csv')
    plot_output_dir = os.path.join(run_output_dir, 'plots')
    os.makedirs(csv_output_dir, exist_ok=True)
    os.makedirs(plot_output_dir, exist_ok=True)

    print("Loading resources...")
    try:
        (scaler, full_timestamps, total_samples_needed,
         window_samples, model) = _load_generation_resources(cfg, device)
    except FileNotFoundError as e:
        print(f"ERROR: A required file was not found. Details: {e}")
        return
    except Exception as e:
        # Top-level boundary: report any other setup problem and stop.
        print(f"An error occurred during setup: {e}")
        return

    # Identical for every profile, so compute it once instead of per profile.
    profile_timestamps = full_timestamps[:total_samples_needed]

    num_batches = math.ceil(cfg.NUM_PROFILES_TO_GENERATE / cfg.GENERATION_BATCH_SIZE)
    house_counter = 0

    pbar = tqdm(range(num_batches), desc="Generating Batches")
    for _ in pbar:
        current_batch_size = min(cfg.GENERATION_BATCH_SIZE, cfg.NUM_PROFILES_TO_GENERATE - house_counter)
        if current_batch_size <= 0:
            break
        pbar.set_postfix({'batch_size': current_batch_size})

        generated_data_np = _sample_stitched_batch(
            model, cfg, current_batch_size, total_samples_needed, window_samples, device
        )

        for j in range(current_batch_size):
            current_house_num = house_counter + 1

            df_to_save = _postprocess_profile(
                generated_data_np[j], scaler, profile_timestamps, cfg.SAMPLES_PER_DAY
            )
            df_to_save.to_csv(os.path.join(csv_output_dir, f'generated_house_{current_house_num}.csv'))

            if house_counter < cfg.PLOTS_TO_GENERATE:
                _save_profile_plot(df_to_save, current_house_num, cfg.SAMPLES_PER_DAY, plot_output_dir)

            house_counter += 1

    print(f"\nSuccessfully generated and saved {house_counter} house profiles.")
    if cfg.PLOTS_TO_GENERATE > 0:
        print(f"Plots saved to '{plot_output_dir}'.")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == '__main__':
    config = Config()

    # Every invocation writes into its own folder, named after the requested
    # duration and the wall-clock start time.
    run_timestamp = f"{datetime.datetime.now():%Y-%m-%d_%H-%M-%S}"
    run_name = "_".join(["generation_run", config.GENERATION_DURATION, run_timestamp])
    run_output_dir = os.path.join(config.OUTPUT_DIR, run_name)
    os.makedirs(run_output_dir, exist_ok=True)

    print(f"Starting new generation run: {run_name}")
    print(f"All outputs will be saved to: {run_output_dir}")

    main(config, run_output_dir)

    print("\nGeneration process complete.")