Spaces:
Sleeping
Sleeping
| """ | |
| BuildSustain - Climate Data Module | |
| This module handles the climate data selection, EPW file processing, and display of climate information | |
| for the BuildSustain application. It allows users to upload EPW weather files or select climate projection | |
| data and extracts relevant climate data for use in load calculations. | |
| Developed by: Dr Majed Abuseif, Deakin University | |
| © 2025 | |
| """ | |
| import streamlit as st | |
| import pandas as pd | |
| import numpy as np | |
| import os | |
| import json | |
| import io | |
| import logging | |
| import plotly.graph_objects as go | |
| import plotly.express as px | |
| from datetime import datetime | |
| from typing import Dict, List, Any, Optional, Tuple, Union | |
| import math | |
| import re | |
| from os.path import join as os_join | |
# Module-wide logging: INFO and above, timestamped.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------

# Month abbreviations in calendar order (used as chart axis labels).
MONTHS = [
    "Jan", "Feb", "Mar", "Apr", "May", "Jun",
    "Jul", "Aug", "Sep", "Oct", "Nov", "Dec",
]

# Climate zone designations known to the application.
CLIMATE_ZONES = [
    "0A", "0B",
    "1A", "1B",
    "2A", "2B",
    "3A", "3B", "3C",
    "4A", "4B", "4C",
    "5A", "5B", "5C",
    "6A", "6B",
    "7",
    "8",
]

AU_CCH_DIR = "au_cch"  # Relative path to au_cch folder
SKY_CLEARNESS_CONSTANT = 5.535e-6  # Constant k for Perez model Sky Clearness Index

# Location mapping for Australian climate projections.
# Keys are location numbers (as strings); they double as directory names
# under AU_CCH_DIR when a projection file is looked up.
LOCATION_MAPPING = {
    # ACT
    "24": {"city": "Canberra", "state": "ACT"},
    # NSW
    "11": {"city": "Coffs Harbour", "state": "NSW"},
    "17": {"city": "Sydney RO (Observatory Hill)", "state": "NSW"},
    "56": {"city": "Mascot (Sydney Airport)", "state": "NSW"},
    "77": {"city": "Parramatta", "state": "NSW"},
    "78": {"city": "Sub-Alpine (Cooma Airport)", "state": "NSW"},
    "79": {"city": "Blue Mountains", "state": "NSW"},
    # NT
    "1": {"city": "Darwin", "state": "NT"},
    "6": {"city": "Alice Springs", "state": "NT"},
    # QLD
    "5": {"city": "Townsville", "state": "QLD"},
    "7": {"city": "Rockhampton", "state": "QLD"},
    "10": {"city": "Brisbane", "state": "QLD"},
    "19": {"city": "Charleville", "state": "QLD"},
    "32": {"city": "Cairns", "state": "QLD"},
    "70": {"city": "Toowoomba", "state": "QLD"},
    # SA
    "16": {"city": "Adelaide", "state": "SA"},
    "75": {"city": "Adelaide Coastal (AMO)", "state": "SA"},
    # TAS
    "26": {"city": "Hobart", "state": "TAS"},
    # VIC
    "21": {"city": "Melbourne RO", "state": "VIC"},
    "27": {"city": "Mildura", "state": "VIC"},
    "60": {"city": "Tullamarine (Melbourne Airport)", "state": "VIC"},
    "63": {"city": "Warrnambool", "state": "VIC"},
    "66": {"city": "Ballarat", "state": "VIC"},
    # WA
    "30": {"city": "Wyndham", "state": "WA"},
    "52": {"city": "Swanbourne", "state": "WA"},
    "58": {"city": "Albany", "state": "WA"},
    "83": {"city": "Christmas Island", "state": "WA"},
}
class ClimateDataManager:
    """Parse EPW weather files and derive climate summaries from them.

    The manager holds no state; each method takes its inputs as arguments and
    returns plain dictionaries / lists.
    """

    # Period names recognised in the TYPICAL/EXTREME PERIODS header record,
    # mapped to the keys used in the parsed result.
    _PERIOD_KEYS = {
        "Summer - Week Nearest Max Temperature For Period": "summer_extreme",
        "Summer - Week Nearest Average Temperature For Period": "summer_typical",
        "Winter - Week Nearest Min Temperature For Period": "winter_extreme",
        "Winter - Week Nearest Average Temperature For Period": "winter_typical",
    }

    # "month/day" with optional whitespace around the slash.
    _DATE_PATTERN = re.compile(r'^\d{1,2}\s*/\s*\d{1,2}$')

    # Column names shared by the 32- and 35-column data layouts.
    _CORE_COLUMNS = [
        "year", "month", "day", "hour", "minute", "data_source", "dry_bulb_temp",
        "dew_point_temp", "relative_humidity", "atmospheric_pressure", "extraterrestrial_radiation",
        "extraterrestrial_radiation_normal", "horizontal_infrared_radiation", "global_horizontal_radiation",
        "direct_normal_radiation", "diffuse_horizontal_radiation", "global_horizontal_illuminance",
        "direct_normal_illuminance", "diffuse_horizontal_illuminance", "zenith_luminance",
        "wind_direction", "wind_speed", "total_sky_cover", "opaque_sky_cover", "visibility",
        "ceiling_height", "present_weather_observation", "present_weather_codes",
        "precipitable_water", "aerosol_optical_depth", "snow_depth", "days_since_last_snowfall",
    ]

    # Extra trailing columns present only in the 35-column layout.
    _ADDITIONAL_COLUMNS = ["albedo", "liquid_precipitation_depth", "liquid_precipitation_quantity"]

    # Columns converted to numbers right after the data rows are read.
    _NUMERIC_COLUMNS = [
        "dry_bulb_temp", "dew_point_temp", "relative_humidity", "atmospheric_pressure",
        "global_horizontal_radiation", "direct_normal_radiation", "diffuse_horizontal_radiation",
        "wind_direction", "wind_speed",
    ]

    def __init__(self):
        """Initialize climate data manager."""
        pass

    def load_epw(self, uploaded_file, location_num: str = None, rcp: str = None, year: str = None) -> Dict[str, Any]:
        """
        Parse an EPW file and extract climate data.

        Args:
            uploaded_file: Either the EPW content as a string, or an uploaded
                file object exposing ``getvalue()`` (bytes) and ``name``.
            location_num: Location number for climate projection (optional).
                When it is a key of LOCATION_MAPPING, the mapped city/state
                replace the ones read from the file header.
            rcp: RCP scenario for climate projection (optional)
            year: Year for climate projection (optional)

        Returns:
            Dict with keys "id", "location", "design_conditions",
            "climate_zone", "hourly_data", "epw_filename",
            "typical_extreme_periods" and "ground_temperatures".

        Raises:
            ValueError: If anything goes wrong while reading or parsing the
                file (the original exception is chained as the cause).
        """
        try:
            if isinstance(uploaded_file, str):
                content = uploaded_file
                epw_filename = f"{location_num}_{rcp}_{year}.epw"
            else:
                raw = uploaded_file.getvalue()
                try:
                    content = raw.decode('utf-8')
                except UnicodeDecodeError:
                    # Header text (city names etc.) is not always UTF-8;
                    # Latin-1 can decode any byte sequence.
                    content = raw.decode('latin-1')
                epw_filename = uploaded_file.name

            # splitlines() also copes with CRLF line endings.
            lines = content.splitlines()

            # The first header line carries the location record.
            location_data = lines[0].split(',')
            location = {
                "city": location_data[1].strip(),
                "state_province": location_data[2].strip(),
                "country": location_data[3].strip(),
                "source": location_data[4].strip(),
                "wmo": location_data[5].strip(),
                "latitude": float(location_data[6]),
                "longitude": float(location_data[7]),
                "timezone": float(location_data[8]),
                "elevation": float(location_data[9]),
            }

            # Override city and state from LOCATION_MAPPING if provided
            if location_num in LOCATION_MAPPING:
                location["city"] = LOCATION_MAPPING[location_num]["city"]
                location["state_province"] = LOCATION_MAPPING[location_num]["state"]

            typical_extreme_periods = self._parse_typical_extreme_periods(lines)
            ground_temperatures = self._parse_ground_temperatures(lines)

            df = self._build_hourly_frame(lines)

            # Latitude decides which months count as summer.
            design_conditions = self._calculate_design_conditions(df, latitude=location["latitude"])
            hourly_data = self._process_hourly_data(df)
            climate_zone = self._determine_climate_zone(
                design_conditions["heating_degree_days"],
                design_conditions["cooling_degree_days"],
            )

            climate_id = f"{location['city']}_{location['country']}"
            if rcp and year:
                climate_id = f"{climate_id}_{rcp}_{year}"

            climate_data = {
                "id": climate_id.replace(" ", "_"),
                "location": location,
                "design_conditions": design_conditions,
                "climate_zone": climate_zone,
                "hourly_data": hourly_data,
                "epw_filename": epw_filename,
                "typical_extreme_periods": typical_extreme_periods,
                "ground_temperatures": ground_temperatures,
            }
            logger.info(f"EPW file processed successfully: {epw_filename}")
            return climate_data
        except Exception as e:
            logger.error(f"Error processing EPW file: {str(e)}")
            raise ValueError(f"Error processing EPW file: {str(e)}") from e

    def _parse_typical_extreme_periods(self, lines: List[str]) -> Dict[str, Any]:
        """Extract the four recognised summer/winter weeks from the TYPICAL/EXTREME PERIODS record.

        Args:
            lines: All lines of the EPW file.

        Returns:
            Dict keyed by "summer_extreme", "summer_typical", "winter_extreme",
            "winter_typical" (only those found), each holding
            {"start": {"month", "day"}, "end": {"month", "day"}}.
            Malformed entries are logged and skipped.
        """
        periods: Dict[str, Any] = {}
        record = next((ln for ln in lines if ln.startswith("TYPICAL/EXTREME PERIODS")), None)
        if record is None:
            return periods

        parts = record.strip().split(',')
        try:
            num_periods = int(parts[1])
        except (ValueError, IndexError):
            logger.warning("Invalid number of periods in TYPICAL/EXTREME PERIODS, skipping parsing.")
            return periods

        for i in range(num_periods):
            base = 2 + i * 4  # each period occupies 4 fields: name, type, start, end
            if len(parts) < base + 4:
                logger.warning(f"Insufficient fields for period {i+1}, skipping.")
                continue
            period_name = parts[base].strip()
            key = self._PERIOD_KEYS.get(period_name)
            if key is None:
                continue
            start_date = parts[base + 2].strip()
            end_date = parts[base + 3].strip()
            if not self._DATE_PATTERN.match(start_date) or not self._DATE_PATTERN.match(end_date):
                logger.warning(f"Invalid date format for period {period_name}: {start_date} to {end_date}, skipping.")
                continue
            try:
                start_month, start_day = map(int, re.sub(r'\s+', '', start_date).split('/'))
                end_month, end_day = map(int, re.sub(r'\s+', '', end_date).split('/'))
            except ValueError as e:
                logger.warning(f"Error parsing period {i+1}: {str(e)}, skipping.")
                continue
            periods[key] = {
                "start": {"month": start_month, "day": start_day},
                "end": {"month": end_month, "day": end_day},
            }
        return periods

    def _parse_ground_temperatures(self, lines: List[str]) -> Dict[str, List[float]]:
        """Extract monthly ground temperatures per depth from the GROUND TEMPERATURES record.

        Args:
            lines: All lines of the EPW file.

        Returns:
            Dict mapping the depth field (as written in the file, stripped) to
            a list of 12 monthly temperatures. Depths with missing or
            non-numeric values are logged and skipped.
        """
        ground_temperatures: Dict[str, List[float]] = {}
        record = next((ln for ln in lines if ln.startswith("GROUND TEMPERATURES")), None)
        if record is None:
            return ground_temperatures

        parts = record.strip().split(',')
        try:
            num_depths = int(parts[1])
        except (ValueError, IndexError):
            logger.warning("Invalid number of depths in GROUND TEMPERATURES, skipping parsing.")
            return ground_temperatures

        for i in range(num_depths):
            base = 2 + i * 16  # each depth occupies 16 fields: depth, 3 others, 12 temperatures
            if len(parts) < base + 16:
                logger.warning(f"Insufficient fields for ground temperature depth {i+1}, skipping.")
                continue
            depth = parts[base].strip()
            try:
                temps = [float(t) for t in parts[base + 4:base + 16] if t.strip()]
            except ValueError as e:
                logger.warning(f"Error parsing ground temperatures for depth {i+1}: {str(e)}, skipping.")
                continue
            if len(temps) != 12:
                logger.warning(f"Invalid number of temperatures for depth {depth}m, expected 12, got {len(temps)}, skipping.")
                continue
            ground_temperatures[depth] = temps
        return ground_temperatures

    def _build_hourly_frame(self, lines: List[str]) -> pd.DataFrame:
        """Turn the data rows (everything after the 8 header lines) into a DataFrame.

        Args:
            lines: All lines of the EPW file.

        Returns:
            DataFrame with named EPW columns, the main weather columns
            converted to numbers, plus a computed "diffuse_fraction" column.

        Raises:
            ValueError: If the first data row has neither 32 nor 35 fields.
        """
        rows = [line.split(',') for line in lines[8:] if line.strip()]

        num_columns = len(rows[0]) if rows else 0
        if num_columns not in (32, 35):
            raise ValueError(f"Invalid number of columns in EPW file: {num_columns}. Expected 32 or 35 columns.")

        columns = self._CORE_COLUMNS if num_columns == 32 else self._CORE_COLUMNS + self._ADDITIONAL_COLUMNS
        df = pd.DataFrame(rows, columns=columns)

        for col in self._NUMERIC_COLUMNS:
            df[col] = pd.to_numeric(df[col], errors='coerce')

        # diffuse / global where global > 0, otherwise 0.0 (covers night-time
        # and missing global radiation). Masking the denominator first avoids
        # dividing by zero.
        global_rad = df['global_horizontal_radiation']
        sunlit = global_rad > 0
        ratio = df['diffuse_horizontal_radiation'] / global_rad.where(sunlit)
        df['diffuse_fraction'] = ratio.where(sunlit, 0.0)
        return df

    def _calculate_design_conditions(self, df: pd.DataFrame, latitude: Optional[float] = None) -> Dict[str, Any]:
        """
        Calculate design conditions from EPW data.

        Args:
            df: DataFrame containing EPW data (needs year, month, day,
                dry_bulb_temp, relative_humidity, global_horizontal_radiation,
                wind_speed and atmospheric_pressure columns).
            latitude: Site latitude in degrees; negative values select
                Dec-Feb as summer, otherwise Jun-Aug. When omitted, a
                "latitude" column of ``df`` is used if present, else 0.

        Returns:
            Dict containing design temperatures, degree days (base 18°C),
            monthly averages, summer daily range, mean wind speed and mean
            pressure. If any step fails, the error is logged and a fixed set
            of fallback values is returned instead.
        """
        try:
            temp_col = df["dry_bulb_temp"].astype(float)

            # 0.4th / 99.6th percentiles of the hourly dry-bulb series.
            winter_design_temp = np.percentile(temp_col, 0.4)
            summer_design_temp_db = np.percentile(temp_col, 99.6)

            rh_col = df["relative_humidity"].astype(float)
            wet_bulb_temp = self._calculate_wet_bulb(temp_col, rh_col)
            summer_design_temp_wb = np.percentile(wet_bulb_temp, 99.6)

            # Work on a small private frame so the caller's DataFrame is not altered.
            frame = pd.DataFrame({
                "year": df["year"].astype(int),
                "month": df["month"].astype(int),
                "day": df["day"].astype(int),
                "dry_bulb_temp": df["dry_bulb_temp"],
                "global_horizontal_radiation": df["global_horizontal_radiation"],
            })
            frame["date"] = pd.to_datetime(frame[["year", "month", "day"]])

            # Degree days from daily mean temperature, base 18°C.
            daily_temps = frame.groupby("date")["dry_bulb_temp"].mean()
            heating_degree_days = float((18 - daily_temps).clip(lower=0).sum())
            cooling_degree_days = float((daily_temps - 18).clip(lower=0).sum())

            by_month = frame.groupby("month")
            monthly_temps = by_month["dry_bulb_temp"].mean().tolist()
            monthly_radiation = by_month["global_horizontal_radiation"].mean().tolist()

            if latitude is None:
                latitude = df["latitude"].iloc[0] if "latitude" in df.columns else 0
            summer_months = [6, 7, 8] if latitude >= 0 else [12, 1, 2]

            summer_data = frame[frame["month"].isin(summer_months)]
            summer_daily_range = 0
            if not summer_data.empty:
                per_day = summer_data.groupby(["month", "day"])["dry_bulb_temp"]
                summer_daily_range = (per_day.max() - per_day.min()).mean()

            wind_speed = df["wind_speed"].mean()
            pressure = df["atmospheric_pressure"].mean()

            return {
                "winter_design_temp": round(winter_design_temp, 1),
                "summer_design_temp_db": round(summer_design_temp_db, 1),
                "summer_design_temp_wb": round(summer_design_temp_wb, 1),
                "heating_degree_days": round(heating_degree_days),
                "cooling_degree_days": round(cooling_degree_days),
                "monthly_average_temps": [round(t, 1) for t in monthly_temps],
                "monthly_average_radiation": [round(r, 1) for r in monthly_radiation],
                "summer_daily_range": round(summer_daily_range, 1),
                "wind_speed": round(wind_speed, 1),
                "pressure": round(pressure),
            }
        except Exception as e:
            # Best effort: report the problem and fall back to fixed values
            # so the rest of the application can continue.
            logger.error(f"Error calculating design conditions: {str(e)}")
            return {
                "winter_design_temp": 0.0,
                "summer_design_temp_db": 30.0,
                "summer_design_temp_wb": 25.0,
                "heating_degree_days": 0,
                "cooling_degree_days": 0,
                "monthly_average_temps": [20.0] * 12,
                "monthly_average_radiation": [150.0] * 12,
                "summer_daily_range": 8.0,
                "wind_speed": 3.0,
                "pressure": 101325.0,
            }

    def _calculate_dew_point(self, dry_bulb: float, relative_humidity: float, atmospheric_pressure: float) -> float:
        """
        Calculate dew point temperature using August-Roche-Magnus formula.

        Args:
            dry_bulb: Dry bulb temperature in °C
            relative_humidity: Relative humidity in %
            atmospheric_pressure: Atmospheric pressure in Pa (not used in simplified formula)

        Returns:
            Dew point temperature in °C, rounded to 0.1 and never above
            ``dry_bulb``. Falls back to ``dry_bulb`` when the vapor pressure
            is not positive or the computation fails.
        """
        try:
            # Saturation vapor pressure: base-10 Magnus form gives hPa, then -> Pa.
            es = 6.1078 * 10 ** ((7.5 * dry_bulb) / (237.3 + dry_bulb)) * 100
            # Actual vapor pressure in Pa.
            e = (relative_humidity * es) / 100
            if e <= 0:
                logger.warning(f"Invalid vapor pressure {e} Pa, returning dry bulb temperature {dry_bulb}°C as dew point")
                return dry_bulb
            # Invert the same base-10 expression, so the logarithm must be
            # log10 (a natural log here would not be the inverse).
            log_term = math.log10(e / 610.78)
            dew_point = (237.3 * log_term) / (7.5 - log_term)
            # Ensure dew point does not exceed dry bulb temperature
            dew_point = min(dew_point, dry_bulb)
            return round(dew_point, 1)
        except (ValueError, ZeroDivisionError) as e:
            logger.warning(f"Error calculating dew point: {str(e)}, returning dry bulb temperature {dry_bulb}°C")
            return dry_bulb

    def _calculate_sky_clearness_index(self, diffuse_horizontal: float, global_horizontal: float, direct_normal: float) -> Optional[float]:
        """
        Calculate the Sky Clearness Index used by this application.

        Args:
            diffuse_horizontal: Diffuse horizontal irradiance in W/m²
            global_horizontal: Global horizontal irradiance in W/m²
            direct_normal: Direct normal irradiance in W/m²

        Returns:
            The index rounded to 3 decimals, or None when global irradiance is
            not positive, any input is negative, or the computation fails.
        """
        # NOTE(review): the expression below is kept exactly as it was. It
        # does not appear to match the published Perez clearness definition
        # (which involves the solar zenith angle) - confirm before relying on it.
        try:
            # Night-time or invalid data.
            if global_horizontal <= 0 or diffuse_horizontal < 0 or direct_normal < 0:
                return None
            # Cap diffuse at global to absorb measurement inconsistencies.
            diffuse_horizontal = min(diffuse_horizontal, global_horizontal)
            k = SKY_CLEARNESS_CONSTANT
            epsilon = ((diffuse_horizontal / global_horizontal) + (k * direct_normal)) / (1 + k)
            return round(epsilon, 3)
        except Exception as e:
            logger.warning(f"Error calculating Sky Clearness Index: {str(e)}, returning None")
            return None

    @staticmethod
    def _float_or(value: Any, default: Optional[float]) -> Optional[float]:
        """Return ``value`` as float, or ``default`` when it is missing (NaN/NA)."""
        return default if pd.isna(value) else float(value)

    def _process_hourly_data(self, df: pd.DataFrame) -> List[Dict[str, Any]]:
        """
        Process hourly data from EPW DataFrame, including dew point, Sky Clearness Index, diffuse fraction, and total sky cover.

        Note: the numeric columns and month/day/hour of ``df`` are converted
        in place.

        Args:
            df: DataFrame containing EPW data

        Returns:
            List of hourly data records (one dict per usable row); an empty
            list if processing fails.
        """
        hourly_data = []
        try:
            numeric_columns = [
                "dry_bulb_temp", "dew_point_temp", "relative_humidity", "atmospheric_pressure",
                "global_horizontal_radiation", "direct_normal_radiation",
                "diffuse_horizontal_radiation", "wind_speed", "wind_direction", "total_sky_cover",
                "diffuse_fraction",
            ]
            for col in numeric_columns:
                if col in df.columns:
                    df[col] = pd.to_numeric(df[col], errors='coerce')

            # Nullable integers so unparseable timestamps become NA rather than raising.
            for col in ("month", "day", "hour"):
                df[col] = pd.to_numeric(df[col], errors='coerce').astype('Int64')

            value = self._float_or
            for _, row in df.iterrows():
                # Skip rows with missing critical data
                if any(pd.isna(row[c]) for c in ("month", "day", "hour", "dry_bulb_temp")):
                    continue

                dry_bulb = value(row["dry_bulb_temp"], 20.0)
                relative_humidity = value(row["relative_humidity"], 50.0)
                atmospheric_pressure = value(row["atmospheric_pressure"], 101325.0)
                diffuse_horizontal = value(row["diffuse_horizontal_radiation"], 0.0)
                global_horizontal = value(row["global_horizontal_radiation"], 0.0)
                direct_normal = value(row["direct_normal_radiation"], 0.0)

                hourly_data.append({
                    "month": int(row["month"]),
                    "day": int(row["day"]),
                    "hour": int(row["hour"]),
                    "dry_bulb": dry_bulb,
                    "dew_point": self._calculate_dew_point(dry_bulb, relative_humidity, atmospheric_pressure),
                    "relative_humidity": relative_humidity,
                    "atmospheric_pressure": atmospheric_pressure,
                    "global_horizontal_radiation": global_horizontal,
                    "direct_normal_radiation": direct_normal,
                    "diffuse_horizontal_radiation": diffuse_horizontal,
                    "wind_speed": value(row["wind_speed"], 0.0),
                    "wind_direction": value(row["wind_direction"], 0.0),
                    "sky_clearness_index": self._calculate_sky_clearness_index(
                        diffuse_horizontal, global_horizontal, direct_normal
                    ),
                    "total_sky_cover": value(row["total_sky_cover"], None),
                    "diffuse_fraction": value(row["diffuse_fraction"], 0.0),
                })

            # A full year is 8760 hours; tolerate a few missing rows before warning.
            if len(hourly_data) < 8700:
                logger.warning(f"Hourly data has {len(hourly_data)} records instead of 8760. Some records may be missing.")
            return hourly_data
        except Exception as e:
            logger.error(f"Error processing hourly data: {str(e)}")
            return []

    def _determine_climate_zone(self, hdd: float, cdd: float) -> str:
        """
        Determine a climate zone label from heating and cooling degree days.

        Args:
            hdd: Heating degree days (base 18°C)
            cdd: Cooling degree days (base 18°C)

        Returns:
            Climate zone designation such as "3A" or "7".
        """
        if hdd >= 7000:
            return "8"
        elif hdd >= 5400:
            return "7"
        elif hdd >= 3900:
            return "6A" if cdd <= 450 else "6B"
        elif hdd >= 2700:
            return "5A" if cdd <= 900 else ("5B" if cdd <= 1800 else "5C")
        elif hdd >= 1800:
            return "4A" if cdd <= 1800 else ("4B" if cdd <= 2700 else "4C")
        elif hdd >= 900:
            return "3A" if cdd <= 2700 else ("3B" if cdd <= 3600 else "3C")
        elif hdd >= 0:
            return "2A" if cdd <= 3600 else "2B"
        else:
            # NOTE(review): only reachable for negative (or NaN) hdd, so zone
            # 1 is effectively never returned - thresholds need confirming.
            return "1A" if cdd <= 4500 else "1B"

    @staticmethod
    def _calculate_wet_bulb(dry_bulb: np.ndarray, relative_humidity: np.ndarray) -> np.ndarray:
        """
        Calculate wet-bulb temperature using a simplified formula.

        Declared static so it can be called both as
        ``self._calculate_wet_bulb(t, rh)`` and directly on the class with the
        same two arguments.

        Args:
            dry_bulb: Dry-bulb temperature in °C
            relative_humidity: Relative humidity in %

        Returns:
            Wet-bulb temperature in °C, element-wise capped at ``dry_bulb``.
        """
        wet_bulb = (
            dry_bulb * np.arctan(0.151977 * np.sqrt(relative_humidity + 8.313659))
            + np.arctan(dry_bulb + relative_humidity)
            - np.arctan(relative_humidity - 1.676331)
            + 0.00391838 * (relative_humidity) ** (3 / 2) * np.arctan(0.023101 * relative_humidity)
            - 4.686035
        )
        return np.minimum(wet_bulb, dry_bulb)

    def get_locations_by_state(self, state: str) -> List[Dict[str, str]]:
        """Get list of locations ({"number", "city"}) for a given state from LOCATION_MAPPING."""
        return [
            {"number": loc_num, "city": loc_info["city"]}
            for loc_num, loc_info in LOCATION_MAPPING.items()
            if loc_info["state"] == state
        ]
def display_climate_page():
    """
    Display the climate data page.

    This is the main function called by main.py when the Climate Data page is selected.

    The page offers two ways of obtaining climate data - uploading an EPW file
    or picking a bundled Australian climate projection - stores the parsed
    result under ``st.session_state.project_data["climate_data"]``, shows a
    summary tab, and renders the back/continue navigation buttons.
    """

    def _stored_climate_data():
        """Return the climate data held in session state, or None if there is none."""
        if "project_data" not in st.session_state:
            return None
        return st.session_state.project_data.get("climate_data") or None

    def _store_climate_data(data):
        """Save parsed climate data, creating the project_data container if needed."""
        if "project_data" not in st.session_state:
            st.session_state.project_data = {}
        st.session_state.project_data["climate_data"] = data

    def _go_to_summary():
        """Button callback: remember that the summary tab was requested."""
        st.session_state.update({"climate_tab": "Climate Summary"})

    st.title("Climate Data and Design Requirements")

    # Notify if climate data exists in session state
    existing = _stored_climate_data()
    if existing:
        st.info(
            f"Climate data already extracted for {existing['location']['city']}, {existing['location']['country']}. "
            f"View details in the 'Climate Summary' tab or upload/select new data below."
        )

    # Display help information in an expandable section
    with st.expander("Help & Information"):
        display_climate_help()

    climate_manager = ClimateDataManager()

    tab1, tab2 = st.tabs(["EPW Data Input", "Climate Summary"])

    # EPW Data Input tab
    with tab1:
        st.subheader("Select Climate Data Source")
        data_source = st.radio(
            "Choose data source:",
            ["Upload EPW File", "Select Climate Projection"],
            key="data_source"
        )

        if data_source == "Upload EPW File":
            uploaded_file = st.file_uploader(
                "Upload EPW File",
                type=["epw"],
                help="Upload an EnergyPlus Weather (EPW) file for your location."
            )
            if uploaded_file is not None:
                try:
                    with st.spinner("Processing EPW file..."):
                        climate_data = climate_manager.load_epw(uploaded_file)
                        _store_climate_data(climate_data)
                    st.success(f"EPW file processed successfully: {uploaded_file.name}")

                    # Display basic location information
                    location = climate_data["location"]
                    st.subheader("Location Information")
                    col1, col2 = st.columns(2)
                    with col1:
                        st.write(f"**City:** {location['city']}")
                        st.write(f"**State/Province:** {location['state_province']}")
                        st.write(f"**Country:** {location['country']}")
                    with col2:
                        st.write(f"**Latitude:** {location['latitude']}°")
                        st.write(f"**Longitude:** {location['longitude']}°")
                        st.write(f"**Elevation:** {location['elevation']} m")
                        st.write(f"**Time Zone:** {location['timezone']} hours (UTC)")
                    st.button("View Climate Summary", on_click=_go_to_summary)
                except Exception as e:
                    st.error(f"Error processing EPW file: {str(e)}")
                    logger.error(f"Error processing EPW file: {str(e)}")
        else:  # Select Climate Projection
            st.markdown("""
            ### Climate Projection
            Select from available Australian climate projection data based on CSIRO 2022 projections.
            """)

            # Dropdown menus for climate projection. The country widget is
            # rendered for the user; its value is not needed afterwards.
            st.selectbox("Country", ["Australia"], key="projection_country")
            states = ["ACT", "NSW", "NT", "QLD", "SA", "TAS", "VIC", "WA"]
            state = st.selectbox("State", states, key="projection_state")

            locations = climate_manager.get_locations_by_state(state)
            location_options = [f"{loc['city']} ({loc['number']})" for loc in locations]
            location_display = st.selectbox("Location", location_options, key="location")
            location_num = ""
            if location_display:
                location_num = next(
                    (loc["number"] for loc in locations if f"{loc['city']} ({loc['number']})" == location_display),
                    "",
                )

            rcp_options = ["RCP2.6", "RCP4.5", "RCP8.5"]
            rcp = st.selectbox("RCP Scenario", rcp_options, key="rcp")
            year_options = ["2030", "2050", "2070", "2090"]
            year = st.selectbox("Year", year_options, key="year")

            if st.button("Extract Projection Data"):
                with st.spinner("Extracting climate projection data..."):
                    file_path = os_join(AU_CCH_DIR, location_num, rcp, year)
                    logger.debug(f"Attempting to access directory: {os.path.abspath(file_path)}")
                    if not os.path.exists(file_path):
                        st.error(
                            f"No directory found at au_cch/{location_num}/{rcp}/{year}/. "
                            f"Ensure the 'au_cch' folder is in the repository root with structure "
                            f"au_cch/{location_num}/{rcp}/{year} (e.g., au_cch/1/RCP2.6/2070/) "
                            f"containing a single .epw file."
                        )
                        logger.error(f"Directory does not exist: {file_path}")
                    else:
                        # Bound before the try so the error handler can always
                        # name a path, even if listing the directory fails.
                        epw_file_path = file_path
                        try:
                            epw_files = [f for f in os.listdir(file_path) if f.endswith(".epw")]
                            if not epw_files:
                                st.error(
                                    f"No EPW file found in au_cch/{location_num}/{rcp}/{year}/. "
                                    f"Ensure the directory contains a single .epw file."
                                )
                                logger.error(f"No EPW file found in {file_path}")
                            elif len(epw_files) > 1:
                                st.error(
                                    f"Multiple EPW files found in au_cch/{location_num}/{rcp}/{year}/: {epw_files}. "
                                    f"Ensure exactly one .epw file per directory."
                                )
                                logger.error(f"Multiple EPW files found: {epw_files}")
                            else:
                                epw_file_path = os_join(file_path, epw_files[0])
                                # Explicit encoding; undecodable header bytes are
                                # replaced instead of aborting the whole import.
                                with open(epw_file_path, 'r', encoding='utf-8', errors='replace') as f:
                                    epw_content = f.read()
                                climate_data = climate_manager.load_epw(epw_content, location_num, rcp, year)
                                _store_climate_data(climate_data)
                                st.success(
                                    f"Successfully extracted climate projection data for "
                                    f"{climate_data['location']['city']}, {climate_data['location']['country']}, "
                                    f"{rcp}, {year}!"
                                )
                                logger.info(f"Successfully processed projection: {climate_data['id']}")
                                st.button("View Climate Summary", on_click=_go_to_summary)
                        except Exception as e:
                            st.error(f"Error reading {epw_file_path}: {str(e)}")
                            logger.error(f"Error reading {epw_file_path}: {str(e)}")

    # Climate Summary tab
    with tab2:
        # Re-read session state: it may have been updated in the input tab above.
        current = _stored_climate_data()
        if current:
            display_climate_summary(current)
        else:
            st.info("Please upload an EPW file or select a climate projection in the 'EPW Data Input' tab to view climate summary.")

    # Navigation buttons
    col1, col2 = st.columns(2)
    with col1:
        if st.button("Back to Building Information", key="back_to_building"):
            st.session_state.current_page = "Building Information"
            st.rerun()
    with col2:
        if st.button("Continue to Material Library", key="continue_to_material"):
            if not _stored_climate_data():
                st.warning("Please upload an EPW file or select a climate projection before continuing.")
            else:
                st.session_state.current_page = "Material Library"
                st.rerun()
def display_climate_summary(climate_data: Dict[str, Any]):
    """
    Render the Climate Summary view for one climate data set.

    Sections are drawn in this order: location details beside the
    typical/extreme periods, the ASHRAE climate zone, design temperatures
    and degree days, the monthly temperature and radiation charts, the
    ground temperature table, and the hourly data table followed by a
    record count. Both tables come with a CSV download button.

    Args:
        climate_data: Dictionary containing climate data. The keys
            "design_conditions", "location" and "climate_zone" are always
            read; "typical_extreme_periods", "ground_temperatures" and
            "hourly_data" are optional; "id" is read whenever a download
            button is rendered.
    """

    def _or_na(value, spec):
        # Optional hourly fields are shown as "N/A" when they hold None.
        return "N/A" if value is None else format(value, spec)

    def _monthly_layout(y_title):
        # Fresh layout settings per chart so the figures share no objects.
        return dict(
            xaxis_title="Month",
            yaxis_title=y_title,
            height=400,
            margin=dict(l=20, r=20, t=30, b=20),
        )

    st.subheader("Climate Summary")

    design = climate_data["design_conditions"]
    location = climate_data["location"]

    # --- Location details (left) and typical/extreme periods (right) ---
    details_col, periods_col = st.columns(2)
    with details_col:
        st.markdown("### Location Details")
        st.markdown(f"""
        - **Country:** {location['country']}
        - **City:** {location['city']}
        - **State/Province:** {location['state_province']}
        - **Latitude:** {location['latitude']}°
        - **Longitude:** {location['longitude']}°
        - **Elevation:** {location['elevation']} m
        - **Time Zone:** {location['timezone']} hours (UTC)
        """)
    with periods_col:
        periods = climate_data.get("typical_extreme_periods")
        if periods:
            st.markdown("### Typical/Extreme Periods")
            bullet_lines = []
            for key, period in periods.items():
                label = key.replace('_', ' ').title()
                start = period['start']
                begins = f"{start['month']}/{start['day']}"
                end = period['end']
                finishes = f"{end['month']}/{end['day']}"
                bullet_lines.append(f"- **{label}:** {begins} to {finishes}")
            st.markdown("\n".join(bullet_lines))

    # --- Climate zone ---
    st.markdown(f"### ASHRAE Climate Zone: {climate_data['climate_zone']}")

    # --- Design conditions ---
    temps_col, degree_col = st.columns(2)
    with temps_col:
        st.subheader("Design Temperatures")
        for caption, field in (
            ("Winter Design Temperature", "winter_design_temp"),
            ("Summer Design Temperature (DB)", "summer_design_temp_db"),
            ("Summer Design Temperature (WB)", "summer_design_temp_wb"),
            ("Summer Daily Temperature Range", "summer_daily_range"),
        ):
            st.write(f"**{caption}:** {design[field]}°C")
    with degree_col:
        st.subheader("Degree Days")
        for caption, field, unit in (
            ("Heating Degree Days (Base 18°C)", "heating_degree_days", ""),
            ("Cooling Degree Days (Base 18°C)", "cooling_degree_days", ""),
            ("Average Wind Speed", "wind_speed", " m/s"),
            ("Average Atmospheric Pressure", "pressure", " Pa"),
        ):
            st.write(f"**{caption}:** {design[field]}{unit}")

    # --- Monthly temperature chart ---
    st.subheader("Monthly Average Temperatures")
    fig_temp = go.Figure()
    temperature_trace = go.Scatter(
        x=MONTHS,
        y=design["monthly_average_temps"],
        mode='lines+markers',
        name='Temperature',
        line=dict(color='firebrick', width=2),
        marker=dict(size=8),
    )
    fig_temp.add_trace(temperature_trace)
    fig_temp.update_layout(**_monthly_layout("Temperature (°C)"))
    st.plotly_chart(fig_temp, use_container_width=True)

    # --- Monthly radiation chart ---
    st.subheader("Monthly Average Solar Radiation")
    fig_rad = go.Figure()
    radiation_trace = go.Bar(
        x=MONTHS,
        y=design["monthly_average_radiation"],
        name='Global Horizontal Radiation',
        marker_color='gold',
    )
    fig_rad.add_trace(radiation_trace)
    fig_rad.update_layout(**_monthly_layout("Radiation (W/m²)"))
    st.plotly_chart(fig_rad, use_container_width=True)

    # --- Ground temperatures: one row per depth, one column per month ---
    ground = climate_data.get("ground_temperatures")
    if ground:
        st.subheader("Ground Temperatures")
        ground_rows = [
            {
                "Depth (m)": float(depth),
                **{month: f"{temp:.2f}" for month, temp in zip(MONTHS, temps)},
            }
            for depth, temps in ground.items()
        ]
        ground_df = pd.DataFrame(ground_rows)
        st.dataframe(ground_df, use_container_width=True)
        ground_csv = ground_df.to_csv(index=False)
        st.download_button(
            label="Download Ground Temperatures as CSV",
            data=ground_csv,
            file_name=f"ground_temperatures_{location['city']}_{location['country']}.csv",
            mime="text/csv",
            key=f"download_ground_temperatures_{climate_data['id']}"
        )

    # --- Hourly climate data table ---
    st.subheader("Hourly Climate Data")
    hourly = climate_data.get("hourly_data")
    if not hourly:
        st.warning("No hourly data available.")
        return

    # (column header, record key, format spec, may be None) in display order.
    # Only the columns flagged True fall back to "N/A"; the rest are
    # formatted unconditionally.
    formatted_columns = (
        ("Dry Bulb Temp (°C)", "dry_bulb", ".1f", False),
        ("Dew Point Temp (°C)", "dew_point", ".1f", True),
        ("Relative Humidity (%)", "relative_humidity", ".1f", False),
        ("Atmospheric Pressure (Pa)", "atmospheric_pressure", ".1f", False),
        ("Global Horizontal Radiation (W/m²)", "global_horizontal_radiation", ".1f", False),
        ("Direct Normal Radiation (W/m²)", "direct_normal_radiation", ".1f", False),
        ("Diffuse Horizontal Radiation (W/m²)", "diffuse_horizontal_radiation", ".1f", False),
        ("Wind Speed (m/s)", "wind_speed", ".1f", False),
        ("Wind Direction (°)", "wind_direction", ".1f", False),
        ("Sky Clearness Index", "sky_clearness_index", ".3f", True),
        ("Total Sky Cover", "total_sky_cover", ".1f", True),
        ("Diffuse Fraction", "diffuse_fraction", ".3f", True),
    )
    hourly_rows = [
        {
            "Month": record["month"],
            "Day": record["day"],
            "Hour": record["hour"],
            **{
                header: _or_na(record[field], spec) if nullable else format(record[field], spec)
                for header, field, spec, nullable in formatted_columns
            },
        }
        for record in hourly
    ]
    hourly_df = pd.DataFrame(hourly_rows)
    st.dataframe(hourly_df, use_container_width=True)
    hourly_csv = hourly_df.to_csv(index=False)
    st.download_button(
        label="Download Hourly Climate Data as CSV",
        data=hourly_csv,
        file_name=f"hourly_climate_data_{location['city']}_{location['country']}.csv",
        mime="text/csv",
        key=f"download_hourly_climate_{climate_data['id']}"
    )

    # --- Hourly data statistics ---
    st.subheader("Hourly Data Statistics")
    hourly_count = len(hourly)
    st.write(f"**Number of Hourly Records:** {hourly_count}")
    if hourly_count < 8760:
        st.warning(f"Expected 8760 hourly records for a full year, but found {hourly_count}. Some data may be missing.")
def display_climate_help():
    """Render the static help text for the climate data page as Markdown."""
    help_markdown = """
    ### Climate Data Help
    This section allows you to upload or select weather data for your location, which is essential for accurate calculations.
    **EPW Files:**
    EPW (EnergyPlus Weather) files contain hourly weather data for a specific location, including:
    * Dry-bulb temperature
    * Dew point temperature
    * Relative humidity
    * Solar radiation (direct and diffuse)
    * Wind speed and direction
    * Atmospheric pressure
    * Sky Clearness Index (calculated)
    * Diffuse Fraction (calculated)
    **Where to Find EPW Files:**
    * [EnergyPlus Weather Data](https://energyplus.net/weather)
    * [Climate.OneBuilding.Org](https://climate.onebuilding.org/)
    * [ASHRAE International Weather for Energy Calculations (IWEC)](https://www.ashrae.org/technical-resources/bookstore/ashrae-international-weather-files-for-energy-calculations-2-0-iwec2)
    **Climate Projections:**
    Select from predefined Australian climate projection data (CSIRO 2022) by choosing a location, RCP scenario, and future year.
    **Climate Summary:**
    After uploading an EPW file or selecting a climate projection, the Climate Summary tab will display:
    * Location details (including Time Zone)
    * ASHRAE Climate Zone
    * Design temperatures for heating and cooling
    * Heating and cooling degree days
    * Typical/Extreme periods
    * Ground temperatures by depth
    * Monthly average temperatures and solar radiation
    * Hourly data statistics with downloadable tables (including dew point, Sky Clearness Index, and Diffuse Fraction)
    This information will be used throughout the calculation process.
    """
    st.markdown(help_markdown)