# BuildSustain-02 / app /climate_data.py
# mabuseif's picture
# Update app/climate_data.py
# 1446093 verified
"""
BuildSustain - Climate Data Module
This module handles the climate data selection, EPW file processing, and display of climate information
for the BuildSustain application. It allows users to upload EPW weather files or select climate projection
data and extracts relevant climate data for use in load calculations.
Developed by: Dr Majed Abuseif, Deakin University
© 2025
"""
import streamlit as st
import pandas as pd
import numpy as np
import os
import json
import io
import logging
import plotly.graph_objects as go
import plotly.express as px
from datetime import datetime
from typing import Dict, List, Any, Optional, Tuple, Union
import math
import re
from os.path import join as os_join
# Configure logging: INFO and above, formatted as "<timestamp> - <level> - <message>".
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Define constants
# Month abbreviations, January first; used as chart x-axis labels and ground-temperature column names.
MONTHS = ["Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"]
# Climate zone labels (zone number plus optional A/B/C suffix). Not referenced elsewhere in this module.
CLIMATE_ZONES = ["0A", "0B", "1A", "1B", "2A", "2B", "3A", "3B", "3C", "4A", "4B", "4C", "5A", "5B", "5C", "6A", "6B", "7", "8"]
AU_CCH_DIR = "au_cch" # Relative path to au_cch folder; projection files are looked up under au_cch/<location number>/<RCP>/<year>/
# NOTE(review): 5.535e-6 looks like the Perez-model kappa for a zenith angle in degrees (where it
# multiplies zenith**3); this module multiplies it by direct-normal irradiance instead - confirm intent.
SKY_CLEARNESS_CONSTANT = 5.535e-6 # Constant k for Perez model Sky Clearness Index
# Location mapping for Australian climate projections.
# Keys are location numbers as strings (also used as directory names under AU_CCH_DIR);
# values give the display city and the state abbreviation used to filter locations.
LOCATION_MAPPING = {
"24": {"city": "Canberra", "state": "ACT"},
"11": {"city": "Coffs Harbour", "state": "NSW"},
"17": {"city": "Sydney RO (Observatory Hill)", "state": "NSW"},
"56": {"city": "Mascot (Sydney Airport)", "state": "NSW"},
"77": {"city": "Parramatta", "state": "NSW"},
"78": {"city": "Sub-Alpine (Cooma Airport)", "state": "NSW"},
"79": {"city": "Blue Mountains", "state": "NSW"},
"1": {"city": "Darwin", "state": "NT"},
"6": {"city": "Alice Springs", "state": "NT"},
"5": {"city": "Townsville", "state": "QLD"},
"7": {"city": "Rockhampton", "state": "QLD"},
"10": {"city": "Brisbane", "state": "QLD"},
"19": {"city": "Charleville", "state": "QLD"},
"32": {"city": "Cairns", "state": "QLD"},
"70": {"city": "Toowoomba", "state": "QLD"},
"16": {"city": "Adelaide", "state": "SA"},
"75": {"city": "Adelaide Coastal (AMO)", "state": "SA"},
"26": {"city": "Hobart", "state": "TAS"},
"21": {"city": "Melbourne RO", "state": "VIC"},
"27": {"city": "Mildura", "state": "VIC"},
"60": {"city": "Tullamarine (Melbourne Airport)", "state": "VIC"},
"63": {"city": "Warrnambool", "state": "VIC"},
"66": {"city": "Ballarat", "state": "VIC"},
"30": {"city": "Wyndham", "state": "WA"},
"52": {"city": "Swanbourne", "state": "WA"},
"58": {"city": "Albany", "state": "WA"},
"83": {"city": "Christmas Island", "state": "WA"}
}
class ClimateDataManager:
    """Parse EPW weather files and derive the climate summary used by the application.

    The manager keeps no state: every method works only on its arguments, so a
    single instance can be reused for any number of files.
    """

    # Names given to the 32 data fields shared by both supported row layouts.
    _CORE_COLUMNS = [
        "year", "month", "day", "hour", "minute", "data_source", "dry_bulb_temp",
        "dew_point_temp", "relative_humidity", "atmospheric_pressure", "extraterrestrial_radiation",
        "extraterrestrial_radiation_normal", "horizontal_infrared_radiation", "global_horizontal_radiation",
        "direct_normal_radiation", "diffuse_horizontal_radiation", "global_horizontal_illuminance",
        "direct_normal_illuminance", "diffuse_horizontal_illuminance", "zenith_luminance",
        "wind_direction", "wind_speed", "total_sky_cover", "opaque_sky_cover", "visibility",
        "ceiling_height", "present_weather_observation", "present_weather_codes",
        "precipitable_water", "aerosol_optical_depth", "snow_depth", "days_since_last_snowfall",
    ]
    # Three trailing fields that only exist in the 35-field layout.
    _ADDITIONAL_COLUMNS = ["albedo", "liquid_precipitation_depth", "liquid_precipitation_quantity"]
    # Columns converted from text to numbers immediately after loading.
    _NUMERIC_COLUMNS = [
        "dry_bulb_temp", "dew_point_temp", "relative_humidity", "atmospheric_pressure",
        "global_horizontal_radiation", "direct_normal_radiation", "diffuse_horizontal_radiation",
        "wind_direction", "wind_speed",
    ]
    # TYPICAL/EXTREME PERIODS entries that are kept, mapped to the key they are stored under.
    _PERIOD_KEYS = {
        "Summer - Week Nearest Max Temperature For Period": "summer_extreme",
        "Summer - Week Nearest Average Temperature For Period": "summer_typical",
        "Winter - Week Nearest Min Temperature For Period": "winter_extreme",
        "Winter - Week Nearest Average Temperature For Period": "winter_typical",
    }
    # "month/day" with optional whitespace around the slash, e.g. "1/ 5" or "12/22".
    _DATE_PATTERN = re.compile(r'^\d{1,2}\s*/\s*\d{1,2}$')

    def __init__(self):
        """Initialize climate data manager."""
        pass

    def load_epw(self, uploaded_file, location_num: str = None, rcp: str = None, year: str = None) -> Dict[str, Any]:
        """
        Parse an EPW file and extract climate data.

        Args:
            uploaded_file: The uploaded EPW file object (must offer ``getvalue()`` and
                ``name``) or the file content as a string
            location_num: Location number for climate projection (optional)
            rcp: RCP scenario for climate projection (optional)
            year: Year for climate projection (optional)

        Returns:
            Dict with keys ``id``, ``location``, ``design_conditions``, ``climate_zone``,
            ``hourly_data``, ``epw_filename``, ``typical_extreme_periods`` and
            ``ground_temperatures``.

        Raises:
            ValueError: If anything goes wrong while reading or parsing the file; the
                original exception is chained as the cause.
        """
        try:
            if isinstance(uploaded_file, str):
                content = uploaded_file
                epw_filename = f"{location_num}_{rcp}_{year}.epw"
            else:
                content = self._decode_epw_bytes(uploaded_file.getvalue())
                epw_filename = uploaded_file.name

            # splitlines() copes with both LF and CRLF line endings.
            lines = content.splitlines()
            # The first 8 lines are header records; hourly data rows follow.
            header_lines = lines[:8]

            location = self._parse_location(header_lines)
            # Override city and state from LOCATION_MAPPING if provided
            if location_num in LOCATION_MAPPING:
                location["city"] = LOCATION_MAPPING[location_num]["city"]
                location["state_province"] = LOCATION_MAPPING[location_num]["state"]

            typical_extreme_periods = self._parse_typical_extreme_periods(header_lines)
            ground_temperatures = self._parse_ground_temperatures(header_lines)
            df = self._build_data_frame(lines[8:])

            # The site latitude decides which months count as summer.
            design_conditions = self._calculate_design_conditions(df, latitude=location["latitude"])
            hourly_data = self._process_hourly_data(df)
            climate_zone = self._determine_climate_zone(
                design_conditions["heating_degree_days"],
                design_conditions["cooling_degree_days"]
            )

            if rcp and year:
                data_id = f"{location['city']}_{location['country']}_{rcp}_{year}"
            else:
                data_id = f"{location['city']}_{location['country']}"

            climate_data = {
                "id": data_id.replace(" ", "_"),
                "location": location,
                "design_conditions": design_conditions,
                "climate_zone": climate_zone,
                "hourly_data": hourly_data,
                "epw_filename": epw_filename,
                "typical_extreme_periods": typical_extreme_periods,
                "ground_temperatures": ground_temperatures
            }
            logger.info(f"EPW file processed successfully: {epw_filename}")
            return climate_data
        except Exception as e:
            logger.error(f"Error processing EPW file: {str(e)}")
            raise ValueError(f"Error processing EPW file: {str(e)}") from e

    @staticmethod
    def _decode_epw_bytes(raw) -> str:
        """Decode uploaded file bytes as UTF-8, falling back to Latin-1 for legacy files."""
        if isinstance(raw, str):
            return raw
        try:
            return raw.decode('utf-8-sig')
        except UnicodeDecodeError:
            # Latin-1 maps every byte to a character, so this cannot fail.
            return raw.decode('latin-1')

    @staticmethod
    def _parse_location(header_lines: List[str]) -> Dict[str, Any]:
        """Read site name, coordinates, time zone and elevation from the first header line."""
        fields = header_lines[0].split(',') if header_lines else []
        if len(fields) < 10:
            raise ValueError("LOCATION header line is missing or has fewer than 10 fields.")
        return {
            "city": fields[1].strip(),
            "state_province": fields[2].strip(),
            "country": fields[3].strip(),
            "source": fields[4].strip(),
            "wmo": fields[5].strip(),
            "latitude": float(fields[6]),
            "longitude": float(fields[7]),
            "timezone": float(fields[8]),
            "elevation": float(fields[9])
        }

    @classmethod
    def _parse_typical_extreme_periods(cls, lines: List[str]) -> Dict[str, Dict[str, Dict[str, int]]]:
        """Extract the four summer/winter typical and extreme weeks from the header, if present."""
        periods = {}
        header = next((ln for ln in lines if ln.startswith("TYPICAL/EXTREME PERIODS")), None)
        if header is None:
            return periods
        parts = header.strip().split(',')
        try:
            num_periods = int(parts[1])
        except (IndexError, ValueError):
            logger.warning("Invalid number of periods in TYPICAL/EXTREME PERIODS, skipping parsing.")
            return periods

        for i in range(num_periods):
            # Each period occupies 4 fields: name, type, start date, end date.
            base = 2 + i * 4
            try:
                if len(parts) < base + 4:
                    logger.warning(f"Insufficient fields for period {i+1}, skipping.")
                    continue
                period_name = parts[base].strip()
                key = cls._PERIOD_KEYS.get(period_name)
                if key is None:
                    continue
                start_date = parts[base + 2].strip()
                end_date = parts[base + 3].strip()
                if not cls._DATE_PATTERN.match(start_date) or not cls._DATE_PATTERN.match(end_date):
                    logger.warning(f"Invalid date format for period {period_name}: {start_date} to {end_date}, skipping.")
                    continue
                start_month, start_day = map(int, re.sub(r'\s+', '', start_date).split('/'))
                end_month, end_day = map(int, re.sub(r'\s+', '', end_date).split('/'))
                periods[key] = {
                    "start": {"month": start_month, "day": start_day},
                    "end": {"month": end_month, "day": end_day}
                }
            except (IndexError, ValueError) as exc:
                logger.warning(f"Error parsing period {i+1}: {str(exc)}, skipping.")
                continue
        return periods

    @staticmethod
    def _parse_ground_temperatures(lines: List[str]) -> Dict[str, List[float]]:
        """Extract the 12 monthly ground temperatures for each depth listed in the header."""
        ground_temperatures = {}
        header = next((ln for ln in lines if ln.startswith("GROUND TEMPERATURES")), None)
        if header is None:
            return ground_temperatures
        parts = header.strip().split(',')
        try:
            num_depths = int(parts[1])
        except (IndexError, ValueError):
            logger.warning("Invalid number of depths in GROUND TEMPERATURES, skipping parsing.")
            return ground_temperatures

        for i in range(num_depths):
            # Each depth occupies 16 fields: the depth, 3 further fields that are
            # skipped here, then the 12 monthly temperatures.
            base = 2 + i * 16
            try:
                if len(parts) < base + 16:
                    logger.warning(f"Insufficient fields for ground temperature depth {i+1}, skipping.")
                    continue
                depth = parts[base]
                temps = [float(t) for t in parts[base + 4:base + 16] if t.strip()]
                if len(temps) != 12:
                    logger.warning(f"Invalid number of temperatures for depth {depth}m, expected 12, got {len(temps)}, skipping.")
                    continue
                ground_temperatures[depth] = temps
            except (ValueError, IndexError) as exc:
                logger.warning(f"Error parsing ground temperatures for depth {i+1}: {str(exc)}, skipping.")
                continue
        return ground_temperatures

    @classmethod
    def _build_data_frame(cls, data_lines: List[str]) -> pd.DataFrame:
        """Turn the hourly data rows into a DataFrame with named columns and a diffuse fraction."""
        rows = [line.split(',') for line in data_lines if line.strip()]
        num_columns = len(rows[0]) if rows else 0
        if num_columns not in (32, 35):
            raise ValueError(f"Invalid number of columns in EPW file: {num_columns}. Expected 32 or 35 columns.")
        columns = cls._CORE_COLUMNS if num_columns == 32 else cls._CORE_COLUMNS + cls._ADDITIONAL_COLUMNS
        df = pd.DataFrame(rows, columns=columns)

        for col in cls._NUMERIC_COLUMNS:
            if col in df.columns:
                df[col] = pd.to_numeric(df[col], errors='coerce')

        # Diffuse fraction = diffuse / global; defined as 0.0 whenever global is not positive.
        global_rad = df["global_horizontal_radiation"]
        sunlit = global_rad > 0
        df['diffuse_fraction'] = (df["diffuse_horizontal_radiation"] / global_rad.where(sunlit)).where(sunlit, 0.0)
        return df

    def _calculate_design_conditions(self, df: pd.DataFrame, latitude: Optional[float] = None) -> Dict[str, Any]:
        """
        Calculate design conditions from EPW data.

        Note that ``df`` is modified in place: ``month``, ``day`` and ``hour`` are cast
        to integers and a ``date`` column is added.

        Args:
            df: DataFrame containing EPW data
            latitude: Site latitude in degrees; negative values select Dec-Feb as
                summer, otherwise Jun-Aug. When omitted, a ``latitude`` column in
                ``df`` is used if present, else 0 (Jun-Aug).

        Returns:
            Dict containing design conditions. If the calculation fails, the error is
            logged and a dict of fixed placeholder values is returned instead.
        """
        try:
            temp_col = df["dry_bulb_temp"].astype(float)
            # nanpercentile so that a few unparseable readings do not turn the result into NaN.
            winter_design_temp = np.nanpercentile(temp_col, 0.4)  # 99.6% heating design temperature
            summer_design_temp_db = np.nanpercentile(temp_col, 99.6)  # 0.4% cooling design temperature

            rh_col = df["relative_humidity"].astype(float)
            wet_bulb_temp = self._calculate_wet_bulb(temp_col, rh_col)
            summer_design_temp_wb = np.nanpercentile(wet_bulb_temp, 99.6)  # 0.4% cooling wet-bulb temperature

            for col in ("month", "day", "hour"):
                df[col] = df[col].astype(int)

            # Degree days (base 18°C) from daily mean temperatures.
            df["date"] = pd.to_datetime(df[["year", "month", "day"]].astype(int))
            daily_temps = df.groupby("date")["dry_bulb_temp"].mean()
            heating_degree_days = (18 - daily_temps).clip(lower=0).sum()
            cooling_degree_days = (daily_temps - 18).clip(lower=0).sum()

            monthly_temps = df.groupby("month")["dry_bulb_temp"].mean().tolist()
            monthly_radiation = df.groupby("month")["global_horizontal_radiation"].mean().tolist()

            # Summer months depend on the hemisphere.
            if latitude is None:
                latitude = df["latitude"].iloc[0] if "latitude" in df.columns else 0
            summer_months = [6, 7, 8] if latitude >= 0 else [12, 1, 2]
            summer_data = df[df["month"].isin(summer_months)]
            summer_daily_range = 0
            if not summer_data.empty:
                by_day = summer_data.groupby(["month", "day"])["dry_bulb_temp"]
                summer_daily_range = (by_day.max() - by_day.min()).mean()

            wind_speed = df["wind_speed"].mean()
            pressure = df["atmospheric_pressure"].mean()
            return {
                "winter_design_temp": round(winter_design_temp, 1),
                "summer_design_temp_db": round(summer_design_temp_db, 1),
                "summer_design_temp_wb": round(summer_design_temp_wb, 1),
                "heating_degree_days": round(heating_degree_days),
                "cooling_degree_days": round(cooling_degree_days),
                "monthly_average_temps": [round(t, 1) for t in monthly_temps],
                "monthly_average_radiation": [round(r, 1) for r in monthly_radiation],
                "summer_daily_range": round(summer_daily_range, 1),
                "wind_speed": round(wind_speed, 1),
                "pressure": round(pressure)
            }
        except Exception as e:
            # Best effort: callers receive placeholder values rather than an exception.
            logger.error(f"Error calculating design conditions: {str(e)}")
            return {
                "winter_design_temp": 0.0,
                "summer_design_temp_db": 30.0,
                "summer_design_temp_wb": 25.0,
                "heating_degree_days": 0,
                "cooling_degree_days": 0,
                "monthly_average_temps": [20.0] * 12,
                "monthly_average_radiation": [150.0] * 12,
                "summer_daily_range": 8.0,
                "wind_speed": 3.0,
                "pressure": 101325.0
            }

    def _calculate_dew_point(self, dry_bulb: float, relative_humidity: float, atmospheric_pressure: float) -> float:
        """
        Calculate dew point temperature by inverting the base-10 Magnus-type
        saturation vapour pressure formula ``es = 610.78 * 10**(7.5*T / (237.3 + T))`` Pa.

        Args:
            dry_bulb: Dry bulb temperature in °C
            relative_humidity: Relative humidity in %
            atmospheric_pressure: Atmospheric pressure in Pa (not used in simplified formula)

        Returns:
            Dew point temperature in °C rounded to one decimal and never above the dry
            bulb; the dry bulb itself is returned when the value cannot be computed.
        """
        try:
            # Saturation vapour pressure in hPa, then converted to Pa.
            saturation_pressure = 6.1078 * 10 ** ((7.5 * dry_bulb) / (237.3 + dry_bulb)) * 100
            vapor_pressure = (relative_humidity * saturation_pressure) / 100
            if vapor_pressure <= 0:
                logger.warning(f"Invalid vapor pressure {vapor_pressure} Pa, returning dry bulb temperature {dry_bulb}°C as dew point")
                return dry_bulb
            # The forward formula uses a power of 10, so the inverse must use log10
            # (a natural log here overestimates the dew point badly).
            log_term = math.log10(vapor_pressure / 610.78)
            dew_point = (237.3 * log_term) / (7.5 - log_term)
            dew_point = min(dew_point, dry_bulb)
            return round(dew_point, 1)
        except (ValueError, ZeroDivisionError, OverflowError) as exc:
            logger.warning(f"Error calculating dew point: {str(exc)}, returning dry bulb temperature {dry_bulb}°C")
            return dry_bulb

    def _calculate_sky_clearness_index(self, diffuse_horizontal: float, global_horizontal: float, direct_normal: float) -> Optional[float]:
        """
        Calculate the sky clearness value stored with each hourly record.

        NOTE(review): the expression below is (diffuse/global + k*direct_normal) / (1 + k).
        It does not appear to match the published Perez sky clearness, which uses
        (diffuse + direct_normal)/diffuse and k * zenith**3 - confirm the intended
        formula before relying on these values. Behaviour is left unchanged here.

        Args:
            diffuse_horizontal: Diffuse horizontal irradiance in W/m²
            global_horizontal: Global horizontal irradiance in W/m²
            direct_normal: Direct normal irradiance in W/m²

        Returns:
            Index rounded to 3 decimals, or None if undefined (e.g., nighttime) or on error
        """
        try:
            # Nighttime (no global irradiance) or negative readings: index is undefined.
            if global_horizontal <= 0 or diffuse_horizontal < 0 or direct_normal < 0:
                return None
            # Cap diffuse at global to absorb measurement noise.
            diffuse_horizontal = min(diffuse_horizontal, global_horizontal)
            k = SKY_CLEARNESS_CONSTANT
            epsilon = ((diffuse_horizontal / global_horizontal) + (k * direct_normal)) / (1 + k)
            return round(epsilon, 3)
        except Exception as e:
            logger.warning(f"Error calculating Sky Clearness Index: {str(e)}, returning None")
            return None

    def _process_hourly_data(self, df: pd.DataFrame) -> List[Dict[str, Any]]:
        """
        Process hourly data from EPW DataFrame, including dew point, Sky Clearness Index, diffuse fraction, and total sky cover.

        Rows lacking month, day, hour or dry bulb temperature are dropped; other
        missing values are replaced by fixed defaults. ``df`` is modified in place
        (columns are converted to numeric types).

        Args:
            df: DataFrame containing EPW data

        Returns:
            List of hourly data records; an empty list if processing fails
        """
        hourly_data = []

        def number(value, default):
            """Return value as float, or default when it is missing."""
            return default if pd.isna(value) else float(value)

        try:
            numeric_columns = [
                "dry_bulb_temp", "dew_point_temp", "relative_humidity", "atmospheric_pressure",
                "global_horizontal_radiation", "direct_normal_radiation",
                "diffuse_horizontal_radiation", "wind_speed", "wind_direction", "total_sky_cover",
                "diffuse_fraction"
            ]
            for col in numeric_columns:
                if col in df.columns:
                    df[col] = pd.to_numeric(df[col], errors='coerce')
            # Nullable integers so that unparseable timestamps become NA instead of raising.
            for col in ("month", "day", "hour"):
                df[col] = pd.to_numeric(df[col], errors='coerce').astype('Int64')

            for _, row in df.iterrows():
                if pd.isna(row["month"]) or pd.isna(row["day"]) or pd.isna(row["hour"]) or pd.isna(row["dry_bulb_temp"]):
                    continue  # Skip rows with missing critical data

                dry_bulb = number(row["dry_bulb_temp"], 20.0)
                relative_humidity = number(row["relative_humidity"], 50.0)
                atmospheric_pressure = number(row["atmospheric_pressure"], 101325.0)
                diffuse_horizontal = number(row["diffuse_horizontal_radiation"], 0.0)
                global_horizontal = number(row["global_horizontal_radiation"], 0.0)
                direct_normal = number(row["direct_normal_radiation"], 0.0)

                hourly_data.append({
                    "month": int(row["month"]),
                    "day": int(row["day"]),
                    "hour": int(row["hour"]),
                    "dry_bulb": dry_bulb,
                    "dew_point": self._calculate_dew_point(dry_bulb, relative_humidity, atmospheric_pressure),
                    "relative_humidity": relative_humidity,
                    "atmospheric_pressure": atmospheric_pressure,
                    "global_horizontal_radiation": global_horizontal,
                    "direct_normal_radiation": direct_normal,
                    "diffuse_horizontal_radiation": diffuse_horizontal,
                    "wind_speed": number(row["wind_speed"], 0.0),
                    "wind_direction": number(row["wind_direction"], 0.0),
                    "sky_clearness_index": self._calculate_sky_clearness_index(diffuse_horizontal, global_horizontal, direct_normal),
                    "total_sky_cover": number(row["total_sky_cover"], None),
                    "diffuse_fraction": number(row["diffuse_fraction"], 0.0)
                })

            # Check if we have the expected number of records (8760 hours in a year)
            if len(hourly_data) < 8700:  # Allow for some missing data
                logger.warning(f"Hourly data has {len(hourly_data)} records instead of 8760. Some records may be missing.")
            return hourly_data
        except Exception as e:
            logger.error(f"Error processing hourly data: {str(e)}")
            return []

    def _determine_climate_zone(self, hdd: float, cdd: float) -> str:
        """
        Map heating and cooling degree days to a climate zone label.

        NOTE(review): because the last threshold is ``hdd >= 0``, the final branch
        (zones 1A/1B) is only reached when ``hdd`` is negative or NaN, and zones
        0A/0B are never returned. The thresholds are kept exactly as they were;
        confirm them against the intended zone definition.

        Args:
            hdd: Heating degree days (base 18°C)
            cdd: Cooling degree days (base 18°C)

        Returns:
            Climate zone designation such as "5A" or "7"
        """
        if hdd >= 7000:
            return "8"
        if hdd >= 5400:
            return "7"
        if hdd >= 3900:
            return "6A" if cdd <= 450 else "6B"
        if hdd >= 2700:
            return "5A" if cdd <= 900 else ("5B" if cdd <= 1800 else "5C")
        if hdd >= 1800:
            return "4A" if cdd <= 1800 else ("4B" if cdd <= 2700 else "4C")
        if hdd >= 900:
            return "3A" if cdd <= 2700 else ("3B" if cdd <= 3600 else "3C")
        if hdd >= 0:
            return "2A" if cdd <= 3600 else "2B"
        return "1A" if cdd <= 4500 else "1B"

    @staticmethod
    def _calculate_wet_bulb(dry_bulb: np.ndarray, relative_humidity: np.ndarray) -> np.ndarray:
        """
        Calculate wet-bulb temperature using a simplified empirical formula.

        Args:
            dry_bulb: Dry-bulb temperature in °C
            relative_humidity: Relative humidity in %

        Returns:
            Wet-bulb temperature in °C, element-wise capped at the dry-bulb temperature
        """
        wet_bulb = dry_bulb * np.arctan(0.151977 * np.sqrt(relative_humidity + 8.313659)) + \
            np.arctan(dry_bulb + relative_humidity) - np.arctan(relative_humidity - 1.676331) + \
            0.00391838 * (relative_humidity)**(3/2) * np.arctan(0.023101 * relative_humidity) - 4.686035
        # A wet-bulb above the dry-bulb is not physical; clamp it.
        return np.minimum(wet_bulb, dry_bulb)

    def get_locations_by_state(self, state: str) -> List[Dict[str, str]]:
        """Get list of locations (``number`` and ``city``) for a given state from LOCATION_MAPPING."""
        return [
            {"number": loc_num, "city": loc_info["city"]}
            for loc_num, loc_info in LOCATION_MAPPING.items()
            if loc_info["state"] == state
        ]
def _get_saved_climate_data() -> Optional[Dict[str, Any]]:
    """Return the climate data stored in the Streamlit session, or None when nothing usable is stored."""
    if "project_data" not in st.session_state:
        return None
    project_data = st.session_state.project_data
    if "climate_data" not in project_data:
        return None
    return project_data["climate_data"] or None


def _save_climate_data(climate_data: Dict[str, Any]) -> None:
    """Store climate data in the session, creating the project_data container if it is missing."""
    if "project_data" not in st.session_state:
        st.session_state.project_data = {}
    st.session_state.project_data["climate_data"] = climate_data


def _read_epw_text(path: str) -> str:
    """Read an EPW file as text, trying UTF-8 first and falling back to Latin-1."""
    with open(path, 'rb') as f:
        raw = f.read()
    try:
        return raw.decode('utf-8-sig')
    except UnicodeDecodeError:
        # Latin-1 maps every byte to a character, so this cannot fail.
        return raw.decode('latin-1')


def _render_upload_section(climate_manager) -> None:
    """Show the EPW uploader and, once a file is provided, parse it and show its location details."""
    uploaded_file = st.file_uploader(
        "Upload EPW File",
        type=["epw"],
        help="Upload an EnergyPlus Weather (EPW) file for your location."
    )
    if uploaded_file is None:
        return
    try:
        with st.spinner("Processing EPW file..."):
            climate_data = climate_manager.load_epw(uploaded_file)
            _save_climate_data(climate_data)
        st.success(f"EPW file processed successfully: {uploaded_file.name}")
        location = climate_data["location"]
        st.subheader("Location Information")
        col1, col2 = st.columns(2)
        with col1:
            st.write(f"**City:** {location['city']}")
            st.write(f"**State/Province:** {location['state_province']}")
            st.write(f"**Country:** {location['country']}")
        with col2:
            st.write(f"**Latitude:** {location['latitude']}°")
            st.write(f"**Longitude:** {location['longitude']}°")
            st.write(f"**Elevation:** {location['elevation']} m")
            st.write(f"**Time Zone:** {location['timezone']} hours (UTC)")
        st.button("View Climate Summary", on_click=lambda: st.session_state.update({"climate_tab": "Climate Summary"}))
    except Exception as e:
        st.error(f"Error processing EPW file: {str(e)}")
        logger.error(f"Error processing EPW file: {str(e)}")


def _extract_projection(climate_manager, location_num: str, rcp: str, year: str) -> None:
    """Locate the single projection EPW file for the selection, parse it and store the result."""
    file_path = os_join(AU_CCH_DIR, location_num, rcp, year)
    logger.debug(f"Attempting to access directory: {os.path.abspath(file_path)}")
    if not os.path.exists(file_path):
        st.error(
            f"No directory found at au_cch/{location_num}/{rcp}/{year}/. "
            f"Ensure the 'au_cch' folder is in the repository root with structure "
            f"au_cch/{location_num}/{rcp}/{year} (e.g., au_cch/1/RCP2.6/2070/) "
            f"containing a single .epw file."
        )
        logger.error(f"Directory does not exist: {file_path}")
        return

    # Bound before the try so the error handler can always name a path, even when
    # listing the directory itself is what fails.
    epw_file_path = file_path
    try:
        epw_files = [f for f in os.listdir(file_path) if f.endswith(".epw")]
        if not epw_files:
            st.error(
                f"No EPW file found in au_cch/{location_num}/{rcp}/{year}/. "
                f"Ensure the directory contains a single .epw file."
            )
            logger.error(f"No EPW file found in {file_path}")
            return
        if len(epw_files) > 1:
            st.error(
                f"Multiple EPW files found in au_cch/{location_num}/{rcp}/{year}/: {epw_files}. "
                f"Ensure exactly one .epw file per directory."
            )
            logger.error(f"Multiple EPW files found: {epw_files}")
            return

        epw_file_path = os_join(file_path, epw_files[0])
        epw_content = _read_epw_text(epw_file_path)
        climate_data = climate_manager.load_epw(epw_content, location_num, rcp, year)
        _save_climate_data(climate_data)
        st.success(
            f"Successfully extracted climate projection data for "
            f"{climate_data['location']['city']}, {climate_data['location']['country']}, "
            f"{rcp}, {year}!"
        )
        logger.info(f"Successfully processed projection: {climate_data['id']}")
        st.button("View Climate Summary", on_click=lambda: st.session_state.update({"climate_tab": "Climate Summary"}))
    except Exception as e:
        st.error(f"Error reading {epw_file_path}: {str(e)}")
        logger.error(f"Error reading {epw_file_path}: {str(e)}")


def _render_projection_section(climate_manager) -> None:
    """Show the state/location/RCP/year selectors and run the extraction when requested."""
    st.markdown("""
### Climate Projection
Select from available Australian climate projection data based on CSIRO 2022 projections.
""")
    # The country selector has a single option; it is rendered for display only.
    st.selectbox("Country", ["Australia"], key="projection_country")
    states = ["ACT", "NSW", "NT", "QLD", "SA", "TAS", "VIC", "WA"]
    state = st.selectbox("State", states, key="projection_state")

    locations = climate_manager.get_locations_by_state(state)
    # Map each displayed label back to its location number.
    number_by_label = {f"{loc['city']} ({loc['number']})": loc["number"] for loc in locations}
    location_display = st.selectbox("Location", list(number_by_label), key="location")
    location_num = number_by_label.get(location_display, "")

    rcp_options = ["RCP2.6", "RCP4.5", "RCP8.5"]
    rcp = st.selectbox("RCP Scenario", rcp_options, key="rcp")
    year_options = ["2030", "2050", "2070", "2090"]
    year = st.selectbox("Year", year_options, key="year")

    if st.button("Extract Projection Data"):
        with st.spinner("Extracting climate projection data..."):
            _extract_projection(climate_manager, location_num, rcp, year)


def display_climate_page():
    """
    Display the climate data page.
    This is the main function called by main.py when the Climate Data page is selected.

    The page offers two ways to obtain climate data (uploading an EPW file or
    picking a stored climate projection), shows a summary of whatever data is
    currently stored in the session, and provides navigation to the neighbouring
    pages.
    """
    st.title("Climate Data and Design Requirements")

    # Notify if climate data exists in session state
    saved = _get_saved_climate_data()
    if saved:
        st.info(
            f"Climate data already extracted for {saved['location']['city']}, {saved['location']['country']}. "
            f"View details in the 'Climate Summary' tab or upload/select new data below."
        )

    with st.expander("Help & Information"):
        display_climate_help()

    climate_manager = ClimateDataManager()
    tab1, tab2 = st.tabs(["EPW Data Input", "Climate Summary"])

    # EPW Data Input tab
    with tab1:
        st.subheader("Select Climate Data Source")
        data_source = st.radio(
            "Choose data source:",
            ["Upload EPW File", "Select Climate Projection"],
            key="data_source"
        )
        if data_source == "Upload EPW File":
            _render_upload_section(climate_manager)
        else:  # Select Climate Projection
            _render_projection_section(climate_manager)

    # Climate Summary tab; re-read the session because the input tab may have just stored new data.
    with tab2:
        saved = _get_saved_climate_data()
        if saved:
            display_climate_summary(saved)
        else:
            st.info("Please upload an EPW file or select a climate projection in the 'EPW Data Input' tab to view climate summary.")

    # Navigation buttons
    col1, col2 = st.columns(2)
    with col1:
        if st.button("Back to Building Information", key="back_to_building"):
            st.session_state.current_page = "Building Information"
            st.rerun()
    with col2:
        if st.button("Continue to Material Library", key="continue_to_material"):
            if not _get_saved_climate_data():
                st.warning("Please upload an EPW file or select a climate projection before continuing.")
            else:
                st.session_state.current_page = "Material Library"
                st.rerun()
def display_climate_summary(climate_data: Dict[str, Any]):
    """
    Render the climate summary: location, typical/extreme periods, climate zone,
    design conditions, monthly charts, ground temperatures and the hourly table.

    Args:
        climate_data: Dictionary containing climate data; must provide
            ``design_conditions``, ``location``, ``climate_zone`` and ``id``.
            ``typical_extreme_periods``, ``ground_temperatures`` and
            ``hourly_data`` are shown only when present and non-empty.
    """

    def or_na(value, spec):
        """Format value with the given spec, or return "N/A" when it is None."""
        return format(value, spec) if value is not None else "N/A"

    st.subheader("Climate Summary")
    design = climate_data["design_conditions"]
    location = climate_data["location"]

    # Location details on the left, typical/extreme periods on the right.
    left, right = st.columns(2)
    with left:
        st.markdown("### Location Details")
        location_markdown = f"""
- **Country:** {location['country']}
- **City:** {location['city']}
- **State/Province:** {location['state_province']}
- **Latitude:** {location['latitude']}°
- **Longitude:** {location['longitude']}°
- **Elevation:** {location['elevation']} m
- **Time Zone:** {location['timezone']} hours (UTC)
"""
        st.markdown(location_markdown)
    with right:
        periods = climate_data.get("typical_extreme_periods")
        if periods:
            st.markdown("### Typical/Extreme Periods")
            bullet_lines = []
            for label, span in periods.items():
                begin, finish = span["start"], span["end"]
                bullet_lines.append(
                    f"- **{label.replace('_', ' ').title()}:** {begin['month']}/{begin['day']} to {finish['month']}/{finish['day']}"
                )
            st.markdown("\n".join(bullet_lines))

    st.markdown(f"### ASHRAE Climate Zone: {climate_data['climate_zone']}")

    # Design temperatures next to degree days and averages.
    left, right = st.columns(2)
    with left:
        st.subheader("Design Temperatures")
        st.write(f"**Winter Design Temperature:** {design['winter_design_temp']}°C")
        st.write(f"**Summer Design Temperature (DB):** {design['summer_design_temp_db']}°C")
        st.write(f"**Summer Design Temperature (WB):** {design['summer_design_temp_wb']}°C")
        st.write(f"**Summer Daily Temperature Range:** {design['summer_daily_range']}°C")
    with right:
        st.subheader("Degree Days")
        st.write(f"**Heating Degree Days (Base 18°C):** {design['heating_degree_days']}")
        st.write(f"**Cooling Degree Days (Base 18°C):** {design['cooling_degree_days']}")
        st.write(f"**Average Wind Speed:** {design['wind_speed']} m/s")
        st.write(f"**Average Atmospheric Pressure:** {design['pressure']} Pa")

    chart_margin = dict(l=20, r=20, t=30, b=20)

    # Line chart of monthly mean temperature.
    st.subheader("Monthly Average Temperatures")
    temperature_trace = go.Scatter(
        x=MONTHS,
        y=design["monthly_average_temps"],
        mode='lines+markers',
        name='Temperature',
        line=dict(color='firebrick', width=2),
        marker=dict(size=8)
    )
    fig_temp = go.Figure()
    fig_temp.add_trace(temperature_trace)
    fig_temp.update_layout(
        xaxis_title="Month",
        yaxis_title="Temperature (°C)",
        height=400,
        margin=chart_margin,
    )
    st.plotly_chart(fig_temp, use_container_width=True)

    # Bar chart of monthly mean global horizontal radiation.
    st.subheader("Monthly Average Solar Radiation")
    radiation_trace = go.Bar(
        x=MONTHS,
        y=design["monthly_average_radiation"],
        name='Global Horizontal Radiation',
        marker_color='gold'
    )
    fig_rad = go.Figure()
    fig_rad.add_trace(radiation_trace)
    fig_rad.update_layout(
        xaxis_title="Month",
        yaxis_title="Radiation (W/m²)",
        height=400,
        margin=chart_margin,
    )
    st.plotly_chart(fig_rad, use_container_width=True)

    # Ground temperatures: one table row per depth, one column per month.
    ground = climate_data.get("ground_temperatures")
    if ground:
        st.subheader("Ground Temperatures")
        depth_rows = [
            {"Depth (m)": float(depth), **{month: f"{temp:.2f}" for month, temp in zip(MONTHS, temps)}}
            for depth, temps in ground.items()
        ]
        ground_df = pd.DataFrame(depth_rows)
        st.dataframe(ground_df, use_container_width=True)
        st.download_button(
            label="Download Ground Temperatures as CSV",
            data=ground_df.to_csv(index=False),
            file_name=f"ground_temperatures_{location['city']}_{location['country']}.csv",
            mime="text/csv",
            key=f"download_ground_temperatures_{climate_data['id']}"
        )

    # Hourly table, with every value pre-formatted as text for display and export.
    st.subheader("Hourly Climate Data")
    hourly_records = climate_data.get("hourly_data")
    if not hourly_records:
        st.warning("No hourly data available.")
        return

    hourly_rows = []
    for rec in hourly_records:
        hourly_rows.append({
            "Month": rec["month"],
            "Day": rec["day"],
            "Hour": rec["hour"],
            "Dry Bulb Temp (°C)": f"{rec['dry_bulb']:.1f}",
            "Dew Point Temp (°C)": or_na(rec['dew_point'], ".1f"),
            "Relative Humidity (%)": f"{rec['relative_humidity']:.1f}",
            "Atmospheric Pressure (Pa)": f"{rec['atmospheric_pressure']:.1f}",
            "Global Horizontal Radiation (W/m²)": f"{rec['global_horizontal_radiation']:.1f}",
            "Direct Normal Radiation (W/m²)": f"{rec['direct_normal_radiation']:.1f}",
            "Diffuse Horizontal Radiation (W/m²)": f"{rec['diffuse_horizontal_radiation']:.1f}",
            "Wind Speed (m/s)": f"{rec['wind_speed']:.1f}",
            "Wind Direction (°)": f"{rec['wind_direction']:.1f}",
            "Sky Clearness Index": or_na(rec['sky_clearness_index'], ".3f"),
            "Total Sky Cover": or_na(rec['total_sky_cover'], ".1f"),
            "Diffuse Fraction": or_na(rec['diffuse_fraction'], ".3f"),
        })
    hourly_df = pd.DataFrame(hourly_rows)
    st.dataframe(hourly_df, use_container_width=True)
    st.download_button(
        label="Download Hourly Climate Data as CSV",
        data=hourly_df.to_csv(index=False),
        file_name=f"hourly_climate_data_{location['city']}_{location['country']}.csv",
        mime="text/csv",
        key=f"download_hourly_climate_{climate_data['id']}"
    )

    # Record count, with a warning when the year is incomplete.
    st.subheader("Hourly Data Statistics")
    hourly_count = len(hourly_records)
    st.write(f"**Number of Hourly Records:** {hourly_count}")
    if hourly_count < 8760:
        st.warning(f"Expected 8760 hourly records for a full year, but found {hourly_count}. Some data may be missing.")
def display_climate_help():
    """Render the static help text for the climate data page as Markdown."""
    help_markdown = """
### Climate Data Help
This section allows you to upload or select weather data for your location, which is essential for accurate calculations.
**EPW Files:**
EPW (EnergyPlus Weather) files contain hourly weather data for a specific location, including:
* Dry-bulb temperature
* Dew point temperature
* Relative humidity
* Solar radiation (direct and diffuse)
* Wind speed and direction
* Atmospheric pressure
* Sky Clearness Index (calculated)
* Diffuse Fraction (calculated)
**Where to Find EPW Files:**
* [EnergyPlus Weather Data](https://energyplus.net/weather)
* [Climate.OneBuilding.Org](https://climate.onebuilding.org/)
* [ASHRAE International Weather for Energy Calculations (IWEC)](https://www.ashrae.org/technical-resources/bookstore/ashrae-international-weather-files-for-energy-calculations-2-0-iwec2)
**Climate Projections:**
Select from predefined Australian climate projection data (CSIRO 2022) by choosing a location, RCP scenario, and future year.
**Climate Summary:**
After uploading an EPW file or selecting a climate projection, the Climate Summary tab will display:
* Location details (including Time Zone)
* ASHRAE Climate Zone
* Design temperatures for heating and cooling
* Heating and cooling degree days
* Typical/Extreme periods
* Ground temperatures by depth
* Monthly average temperatures and solar radiation
* Hourly data statistics with downloadable tables (including dew point, Sky Clearness Index, and Diffuse Fraction)
This information will be used throughout the calculation process.
"""
    st.markdown(help_markdown)