""" ASHRAE 169 climate data module for HVAC Load Calculator. Extracts climate data from EPW files and provides visualizations inspired by Climate Consultant. Author: Dr Majed Abuseif Date: May 2025 Version: 2.1.6 """ from typing import Dict, List, Any, Optional import pandas as pd import numpy as np import os import json from dataclasses import dataclass import streamlit as st import plotly.graph_objects as go from io import StringIO import pvlib from datetime import datetime, timedelta import re import logging from os.path import join as os_join # Set up logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Define paths at module level AU_CCH_DIR = "au_cch" # Relative path to au_cch folder from climate_data.py in data/ (e.g., au_cch/1/RCP2.6/2070/) # CSS for consistent formatting STYLE = """ """ # Location mapping from provided list LOCATION_MAPPING = { "24": {"city": "Canberra", "state": "ACT"}, "11": {"city": "Coffs Harbour", "state": "NSW"}, "17": {"city": "Sydney RO (Observatory Hill)", "state": "NSW"}, "56": {"city": "Mascot (Sydney Airport)", "state": "NSW"}, "77": {"city": "Parramatta", "state": "NSW"}, "78": {"city": "Sub-Alpine (Cooma Airport)", "state": "NSW"}, "79": {"city": "Blue Mountains", "state": "NSW"}, "1": {"city": "Darwin", "state": "NT"}, "6": {"city": "Alice Springs", "state": "NT"}, "5": {"city": "Townsville", "state": "QLD"}, "7": {"city": "Rockhampton", "state": "QLD"}, "10": {"city": "Brisbane", "state": "QLD"}, "19": {"city": "Charleville", "state": "QLD"}, "32": {"city": "Cairns", "state": "QLD"}, "70": {"city": "Toowoomba", "state": "QLD"}, "16": {"city": "Adelaide", "state": "SA"}, "75": {"city": "Adelaide Coastal (AMO)", "state": "SA"}, "26": {"city": "Hobart", "state": "TAS"}, "21": {"city": "Melbourne RO", "state": "VIC"}, "27": {"city": "Mildura", "state": "VIC"}, "60": {"city": "Tullamarine (Melbourne Airport)", "state": "VIC"}, "63": {"city": "Warrnambool", "state": "VIC"}, "66": {"city": 
"Ballarat", "state": "VIC"}, "30": {"city": "Wyndham", "state": "WA"}, "52": {"city": "Swanbourne", "state": "WA"}, "58": {"city": "Albany", "state": "WA"}, "83": {"city": "Christmas Island", "state": "WA"} } @dataclass class ClimateLocation: """Class representing a climate location with ASHRAE 169 data derived from EPW files.""" id: str country: str state_province: str city: str latitude: float longitude: float elevation: float # meters time_zone: float # UTC offset in hours climate_zone: str heating_degree_days: float # base 18°C cooling_degree_days: float # base 18°C winter_design_temp: float # 99.6% heating design temperature (°C) summer_design_temp_db: float # 0.4% cooling design dry-bulb temperature (°C) summer_design_temp_wb: float # 0.4% cooling design wet-bulb temperature (°C) summer_daily_range: float # Mean daily temperature range in summer (°C) wind_speed: float # Mean wind speed (m/s) pressure: float # Mean atmospheric pressure (Pa) hourly_data: List[Dict] # Hourly data for integration with main.py typical_extreme_periods: Dict[str, Dict] # Typical/extreme periods (summer/winter) ground_temperatures: Dict[str, List[float]] # Monthly ground temperatures by depth def __init__(self, epw_file: pd.DataFrame, typical_extreme_periods: Dict, ground_temperatures: Dict, **kwargs): """Initialize ClimateLocation with EPW file data and header information.""" self.id = kwargs.get("id") self.country = kwargs.get("country") self.state_province = kwargs.get("state_province", "N/A") self.city = kwargs.get("city") self.latitude = kwargs.get("latitude") self.longitude = kwargs.get("longitude") self.elevation = kwargs.get("elevation") self.time_zone = kwargs.get("time_zone", 0.0) # Default to 0.0 if not provided self.climate_zone = kwargs.get("climate_zone", "Unknown") # Use provided climate_zone self.typical_extreme_periods = typical_extreme_periods self.ground_temperatures = ground_temperatures # Extract columns from EPW data months = pd.to_numeric(epw_file[1], 
errors='coerce').values days = pd.to_numeric(epw_file[2], errors='coerce').values hours = pd.to_numeric(epw_file[3], errors='coerce').values dry_bulb = pd.to_numeric(epw_file[6], errors='coerce').values humidity = pd.to_numeric(epw_file[8], errors='coerce').values pressure = pd.to_numeric(epw_file[9], errors='coerce').values global_radiation = pd.to_numeric(epw_file[13], errors='coerce').values direct_normal_radiation = pd.to_numeric(epw_file[14], errors='coerce').values diffuse_horizontal_radiation = pd.to_numeric(epw_file[15], errors='coerce').values wind_direction = pd.to_numeric(epw_file[20], errors='coerce').values wind_speed = pd.to_numeric(epw_file[21], errors='coerce').values # Filter wind speed outliers and log high values wind_speed = wind_speed[wind_speed <= 50] # Remove extreme outliers if (wind_speed > 15).any(): logger.warning(f"High wind speeds detected: {wind_speed[wind_speed > 15].tolist()}") # Calculate wet-bulb temperature wet_bulb = ClimateData.calculate_wet_bulb(dry_bulb, humidity) # Calculate design conditions self.winter_design_temp = round(np.nanpercentile(dry_bulb, 0.4), 1) self.summer_design_temp_db = round(np.nanpercentile(dry_bulb, 99.6), 1) self.summer_design_temp_wb = round(np.nanpercentile(wet_bulb, 99.6), 1) # Calculate degree days daily_temps = np.nanmean(dry_bulb.reshape(-1, 24), axis=1) self.heating_degree_days = round(np.nansum(np.maximum(18 - daily_temps, 0))) self.cooling_degree_days = round(np.nansum(np.maximum(daily_temps - 18, 0))) # Calculate summer daily temperature range (June–August, Southern Hemisphere) summer_mask = (months >= 6) & (months <= 8) summer_temps = dry_bulb[summer_mask].reshape(-1, 24) self.summer_daily_range = round(np.nanmean(np.nanmax(summer_temps, axis=1) - np.nanmin(summer_temps, axis=1)), 1) # Calculate mean wind speed and pressure self.wind_speed = round(np.nanmean(wind_speed), 1) self.pressure = round(np.nanmean(pressure), 1) # Log wind speed diagnostics logger.info(f"Wind speed stats: 
min={wind_speed.min():.1f}, max={wind_speed.max():.1f}, mean={self.wind_speed:.1f}") # Store hourly data with enhanced fields self.hourly_data = [] for i in range(len(months)): if np.isnan(months[i]) or np.isnan(days[i]) or np.isnan(hours[i]) or np.isnan(dry_bulb[i]): continue # Skip records with missing critical fields record = { "month": int(months[i]), "day": int(days[i]), "hour": int(hours[i]), "dry_bulb": float(dry_bulb[i]), "relative_humidity": float(humidity[i]) if not np.isnan(humidity[i]) else 0.0, "atmospheric_pressure": float(pressure[i]) if not np.isnan(pressure[i]) else self.pressure, "global_horizontal_radiation": float(global_radiation[i]) if not np.isnan(global_radiation[i]) else 0.0, "direct_normal_radiation": float(direct_normal_radiation[i]) if not np.isnan(direct_normal_radiation[i]) else 0.0, "diffuse_horizontal_radiation": float(diffuse_horizontal_radiation[i]) if not np.isnan(diffuse_horizontal_radiation[i]) else 0.0, "wind_speed": float(wind_speed[i]) if not np.isnan(wind_speed[i]) else 0.0, "wind_direction": float(wind_direction[i]) if not np.isnan(wind_direction[i]) else 0.0 } self.hourly_data.append(record) if len(self.hourly_data) != 8760: st.warning(f"Hourly data has {len(self.hourly_data)} records instead of 8760. 
Some records may have been excluded due to missing data.") def to_dict(self) -> Dict[str, Any]: """Convert the climate location to a dictionary.""" return { "id": self.id, "country": self.country, "state_province": self.state_province, "city": self.city, "latitude": self.latitude, "longitude": self.longitude, "elevation": self.elevation, "time_zone": self.time_zone, "climate_zone": self.climate_zone, "heating_degree_days": self.heating_degree_days, "cooling_degree_days": self.cooling_degree_days, "winter_design_temp": self.winter_design_temp, "summer_design_temp_db": self.summer_design_temp_db, "summer_design_temp_wb": self.summer_design_temp_wb, "summer_daily_range": self.summer_daily_range, "wind_speed": self.wind_speed, "pressure": self.pressure, "hourly_data": self.hourly_data, "typical_extreme_periods": self.typical_extreme_periods, "ground_temperatures": self.ground_temperatures } class ClimateData: """Class for managing ASHRAE 169 climate data from EPW files.""" def __init__(self): """Initialize climate data.""" self.locations = {} self.countries = [] self.country_states = {} def add_location(self, location: ClimateLocation): """Add a new location to the dictionary.""" self.locations[location.id] = location self.countries = sorted(list(set(loc.country for loc in self.locations.values()))) self.country_states = self._group_locations_by_country_state() def _group_locations_by_country_state(self) -> Dict[str, Dict[str, List[str]]]: """Group locations by country and state/province.""" result = {} for loc in self.locations.values(): if loc.country not in result: result[loc.country] = {} if loc.state_province not in result[loc.country]: result[loc.country][loc.state_province] = [] result[loc.country][loc.state_province].append(loc.city) for country in result: for state in result[country]: result[country][state] = sorted(result[country][state]) return result def get_location_by_id(self, location_id: str, session_state: Dict[str, Any]) -> Optional[Dict[str, Any]]: 
"""Retrieve climate data by ID from session state or locations.""" if "climate_data" in session_state and session_state["climate_data"].get("id") == location_id: return session_state["climate_data"] if location_id in self.locations: return self.locations[location_id].to_dict() return None @staticmethod def validate_climate_data(data: Dict[str, Any]) -> bool: """Validate climate data for required fields and ranges.""" required_fields = [ "id", "country", "city", "latitude", "longitude", "elevation", "time_zone", "climate_zone", "heating_degree_days", "cooling_degree_days", "winter_design_temp", "summer_design_temp_db", "summer_design_temp_wb", "summer_daily_range", "wind_speed", "pressure", "hourly_data" ] for field in required_fields: if field not in data: st.error(f"Validation failed: Missing required field '{field}'") logger.warning(f"Validation failed: Missing field '{field}'") return False if not (-90 <= data["latitude"] <= 90 and -180 <= data["longitude"] <= 180): st.error("Validation failed: Invalid latitude or longitude") logger.warning("Validation failed: Invalid latitude or longitude") return False if data["elevation"] < 0: st.error("Validation failed: Negative elevation") logger.warning("Validation failed: Negative elevation") return False if not (-12 <= data["time_zone"] <= 14): st.error(f"Validation failed: Time zone {data['time_zone']} outside range (-12 to +14)") logger.warning(f"Validation failed: Time zone {data['time_zone']} outside range") return False if data["climate_zone"] not in ["0A", "0B", "1A", "1B", "2A", "2B", "3A", "3B", "3C", "4A", "4B", "4C", "5A", "5B", "5C", "6A", "6B", "7", "8"]: st.error(f"Validation failed: Invalid climate zone '{data['climate_zone']}'") logger.warning(f"Validation failed: Invalid climate zone '{data['climate_zone']}'") return False if not (data["heating_degree_days"] >= 0 and data["cooling_degree_days"] >= 0): st.error("Validation failed: Negative degree days") logger.warning("Validation failed: Negative degree 
days") return False if not (-50 <= data["winter_design_temp"] <= 20): st.error(f"Validation failed: Winter design temp {data['winter_design_temp']} outside range") logger.warning(f"Validation failed: Winter design temp {data['winter_design_temp']} outside range") return False if not (0 <= data["summer_design_temp_db"] <= 50 and 0 <= data["summer_design_temp_wb"] <= 40): st.error("Validation failed: Invalid summer design temperatures") logger.warning("Validation failed: Invalid summer design temperatures") return False if data["summer_daily_range"] < 0: st.error("Validation failed: Negative summer daily range") logger.warning("Validation failed: Negative summer daily range") return False if not (0 <= data["wind_speed"] <= 30): st.error(f"Validation failed: Wind speed {data['wind_speed']} outside range") logger.warning(f"Validation failed: Wind speed {data['wind_speed']} outside range") return False if not (80000 <= data["pressure"] <= 110000): st.error(f"Validation failed: Pressure {data['pressure']} outside range") logger.warning(f"Validation failed: Pressure {data['pressure']} outside range") return False if not data["hourly_data"] or len(data["hourly_data"]) < 8700: st.error(f"Validation failed: Hourly data has {len(data['hourly_data'])} records, expected ~8760") logger.warning(f"Validation failed: Hourly data has {len(data['hourly_data'])} records") return False for record in data["hourly_data"]: if not (1 <= record["month"] <= 12): st.error(f"Validation failed: Invalid month {record['month']}") logger.warning(f"Validation failed: Invalid month {record['month']}") return False if not (1 <= record["day"] <= 31): st.error(f"Validation failed: Invalid day {record['day']}") logger.warning(f"Validation failed: Invalid day {record['day']}") return False if not (1 <= record["hour"] <= 24): st.error(f"Validation failed: Invalid hour {record['hour']}") logger.warning(f"Validation failed: Invalid hour {record['hour']}") return False if not (-50 <= record["dry_bulb"] <= 
50): st.error(f"Validation failed: Dry bulb {record['dry_bulb']} outside range") logger.warning(f"Validation failed: Dry bulb {record['dry_bulb']} outside range") return False if not (0 <= record["relative_humidity"] <= 100): st.error(f"Validation failed: Relative humidity {record['relative_humidity']} outside range") logger.warning(f"Validation failed: Relative humidity {record['relative_humidity']} outside range") return False if not (80000 <= record["atmospheric_pressure"] <= 110000): st.error(f"Validation failed: Atmospheric pressure {record['atmospheric_pressure']} outside range") logger.warning(f"Validation failed: Atmospheric pressure {record['atmospheric_pressure']} outside range") return False if not (0 <= record["global_horizontal_radiation"] <= 1200): st.error(f"Validation failed: Global radiation {record['global_horizontal_radiation']} outside range") logger.warning(f"Validation failed: Global radiation {record['global_horizontal_radiation']} outside range") return False if not (0 <= record["direct_normal_radiation"] <= 1200): st.error(f"Validation failed: Direct normal radiation {record['direct_normal_radiation']} outside range") logger.warning(f"Validation failed: Direct normal radiation {record['direct_normal_radiation']} outside range") return False if not (0 <= record["diffuse_horizontal_radiation"] <= 1200): st.error(f"Validation failed: Diffuse horizontal radiation {record['diffuse_horizontal_radiation']} outside range") logger.warning(f"Validation failed: Diffuse horizontal radiation {record['diffuse_horizontal_radiation']} outside range") return False if not (0 <= record["wind_speed"] <= 30): st.error(f"Validation failed: Wind speed {record['wind_speed']} outside range") logger.warning(f"Validation failed: Wind speed {record['wind_speed']} outside range") return False if not (0 <= record["wind_direction"] <= 360): st.error(f"Validation failed: Wind direction {record['wind_direction']} outside range") logger.warning(f"Validation failed: Wind 
direction {record['wind_direction']} outside range") return False # Validate typical/extreme periods (optional) if "typical_extreme_periods" in data and data["typical_extreme_periods"]: expected_periods = ["summer_extreme", "summer_typical", "winter_extreme", "winter_typical"] missing_periods = [p for p in expected_periods if p not in data["typical_extreme_periods"]] if missing_periods: st.warning(f"Validation warning: Missing typical/extreme periods: {', '.join(missing_periods)}") logger.warning(f"Validation warning: Missing typical/extreme periods: {', '.join(missing_periods)}") for period in data["typical_extreme_periods"].values(): for date in ["start", "end"]: if not (1 <= period[date]["month"] <= 12 and 1 <= period[date]["day"] <= 31): st.error(f"Validation failed: Invalid date in typical/extreme periods: {period[date]}") logger.warning(f"Validation failed: Invalid date in typical/extreme periods: {period[date]}") return False # Validate ground temperatures (optional) if "ground_temperatures" in data and data["ground_temperatures"]: for depth, temps in data["ground_temperatures"].items(): if len(temps) != 12 or not all(0 <= t <= 50 for t in temps): st.error(f"Validation failed: Invalid ground temperatures for depth {depth}") logger.warning(f"Validation failed: Invalid ground temperatures for depth {depth}") return False return True @staticmethod def calculate_wet_bulb(dry_bulb: np.ndarray, relative_humidity: np.ndarray) -> np.ndarray: """Calculate Wet Bulb Temperature using Stull (2011) approximation.""" db = np.array(dry_bulb, dtype=float) rh = np.array(relative_humidity, dtype=float) term1 = db * np.arctan(0.151977 * (rh + 8.313659)**0.5) term2 = np.arctan(db + rh) term3 = np.arctan(rh - 1.676331) term4 = 0.00391838 * rh**1.5 * np.arctan(0.023101 * rh) term5 = -4.686035 wet_bulb = term1 + term2 - term3 + term4 + term5 invalid_mask = (rh < 5) | (rh > 99) | (db < -20) | (db > 50) | np.isnan(db) | np.isnan(rh) wet_bulb[invalid_mask] = np.nan return wet_bulb 
    @staticmethod
    def is_numeric(value: str) -> bool:
        """Check if a string can be converted to a number."""
        try:
            float(value)
            return True
        except ValueError:
            return False

    def get_locations_by_state(self, state: str) -> List[Dict[str, str]]:
        """Get list of locations for a given state from LOCATION_MAPPING."""
        return [
            {"number": loc_num, "city": loc_info["city"]}
            for loc_num, loc_info in LOCATION_MAPPING.items()
            if loc_info["state"] == state
        ]

    def process_epw_file(self, epw_content: str, location_num: str, rcp: str, year: str) -> Optional[ClimateLocation]:
        """Process an EPW file content and return a ClimateLocation object.

        Args:
            epw_content: Full text of an EPW file.
            location_num: Key into LOCATION_MAPPING; overrides the header city/state.
            rcp: Emissions scenario tag used in the generated location id.
            year: Projection year tag used in the generated location id.

        Returns:
            A ClimateLocation, or None on any parsing failure (error is shown
            via Streamlit and logged rather than raised).
        """
        try:
            epw_lines = epw_content.splitlines()
            # Parse LOCATION header line: fields 1-3 are names, 6-9 are
            # latitude, longitude, time zone, elevation.
            header = next(line for line in epw_lines if line.startswith("LOCATION"))
            header_parts = header.split(",")
            if len(header_parts) < 10:
                raise ValueError("Invalid LOCATION header: too few fields.")
            city = header_parts[1].strip() or "Unknown"
            city = re.sub(r'\..*', '', city)  # Clean city name
            state_province = header_parts[2].strip() or "Unknown"
            country = header_parts[3].strip() or "Unknown"
            # Numeric header fields fall back to 0.0 when missing/non-numeric.
            latitude = float(header_parts[6]) if header_parts[6].strip() and self.is_numeric(header_parts[6]) else 0.0
            longitude = float(header_parts[7]) if header_parts[7].strip() and self.is_numeric(header_parts[7]) else 0.0
            time_zone = float(header_parts[8]) if header_parts[8].strip() and self.is_numeric(header_parts[8]) else 0.0
            elevation = float(header_parts[9]) if header_parts[9].strip() and self.is_numeric(header_parts[9]) else 0.0
            logger.info("Parsed EPW header: city=%s, country=%s, latitude=%s, longitude=%s, time_zone=%s, elevation=%s",
                        city, country, latitude, longitude, time_zone, elevation)
            # Override city and state from LOCATION_MAPPING
            if location_num in LOCATION_MAPPING:
                city = LOCATION_MAPPING[location_num]["city"]
                state_province = LOCATION_MAPPING[location_num]["state"]
            # Parse TYPICAL/EXTREME PERIODS header: groups of 4 fields
            # (name, type, start date, end date) after the count.
            typical_extreme_periods = {}
            date_pattern = r'^\d{1,2}\s*/\s*\d{1,2}$'
            for line in epw_lines:
                if line.startswith("TYPICAL/EXTREME PERIODS"):
                    parts = line.strip().split(',')
                    try:
                        num_periods = int(parts[1])
                    except ValueError:
                        st.warning("Invalid number of periods in TYPICAL/EXTREME PERIODS, skipping parsing.")
                        break
                    for i in range(num_periods):
                        try:
                            if len(parts) < 2 + i*4 + 4:
                                st.warning(f"Insufficient fields for period {i+1}, skipping.")
                                continue
                            period_name = parts[2 + i*4]
                            period_type = parts[3 + i*4]
                            start_date = parts[4 + i*4].strip()
                            end_date = parts[5 + i*4].strip()
                            # Only the four well-known period names are kept.
                            if period_name in [
                                "Summer - Week Nearest Max Temperature For Period",
                                "Summer - Week Nearest Average Temperature For Period",
                                "Winter - Week Nearest Min Temperature For Period",
                                "Winter - Week Nearest Average Temperature For Period"
                            ]:
                                season = 'summer' if 'Summer' in period_name else 'winter'
                                period_type = ('extreme' if 'Max' in period_name or 'Min' in period_name else 'typical')
                                key = f"{season}_{period_type}"
                                start_date_clean = re.sub(r'\s+', '', start_date)
                                end_date_clean = re.sub(r'\s+', '', end_date)
                                if not re.match(date_pattern, start_date) or not re.match(date_pattern, end_date):
                                    st.warning(f"Invalid date format for period {period_name}: {start_date} to {end_date}, skipping.")
                                    continue
                                start_month, start_day = map(int, start_date_clean.split('/'))
                                end_month, end_day = map(int, end_date_clean.split('/'))
                                typical_extreme_periods[key] = {
                                    "start": {"month": start_month, "day": start_day},
                                    "end": {"month": end_month, "day": end_day}
                                }
                        except (IndexError, ValueError) as e:
                            st.warning(f"Error parsing period {i+1}: {str(e)}, skipping.")
                            continue
                    break
            # Parse GROUND TEMPERATURES header: groups of 16 fields per depth
            # (depth + 3 metadata + 12 monthly values).
            ground_temperatures = {}
            for line in epw_lines:
                if line.startswith("GROUND TEMPERATURES"):
                    parts = line.strip().split(',')
                    try:
                        num_depths = int(parts[1])
                    except ValueError:
                        st.warning("Invalid number of depths in GROUND TEMPERATURES, skipping parsing.")
                        break
                    for i in range(num_depths):
                        try:
                            if len(parts) < 2 + i*16 + 16:
                                st.warning(f"Insufficient fields for ground temperature depth {i+1}, skipping.")
                                continue
                            depth = parts[2 + i*16]
                            temps = [float(t) for t in parts[6 + i*16:18 + i*16] if t.strip()]
                            if len(temps) != 12:
                                st.warning(f"Invalid number of temperatures for depth {depth}m, expected 12, got {len(temps)}, skipping.")
                                continue
                            ground_temperatures[depth] = temps
                        except (ValueError, IndexError) as e:
                            st.warning(f"Error parsing ground temperatures for depth {i+1}: {str(e)}, skipping.")
                            continue
                    break
            # Read data section: everything after the DATA PERIODS line.
            data_start_idx = next(i for i, line in enumerate(epw_lines) if line.startswith("DATA PERIODS")) + 1
            epw_data = pd.read_csv(StringIO("\n".join(epw_lines[data_start_idx:])), header=None, dtype=str)
            if len(epw_data) != 8760:
                raise ValueError(f"EPW file has {len(epw_data)} records, expected 8760.")
            # NOTE(review): 32 columns are accepted here, but the message says
            # "expected 35" — confirm which variant is actually intended.
            if len(epw_data.columns) not in [32, 35]:
                raise ValueError(f"EPW file has {len(epw_data.columns)} columns, expected 35.")
            # Coerce the columns ClimateLocation consumes; a fully non-numeric
            # column is treated as a fatal file error.
            for col in [1, 2, 3, 6, 8, 9, 13, 14, 15, 20, 21]:
                epw_data[col] = pd.to_numeric(epw_data[col], errors='coerce')
                if epw_data[col].isna().all():
                    raise ValueError(f"Column {col} contains only non-numeric or missing data.")
            # Calculate average humidity for climate zone assignment
            humidity = pd.to_numeric(epw_data[8], errors='coerce').values
            avg_humidity = float(np.nanmean(humidity)) if not np.all(np.isnan(humidity)) else 50.0
            logger.info("Calculated average humidity: %.1f%% for %s, %s", avg_humidity, city, country)
            # Create ClimateLocation; id encodes country/city initials + scenario + year.
            location = ClimateLocation(
                epw_file=epw_data,
                typical_extreme_periods=typical_extreme_periods,
                ground_temperatures=ground_temperatures,
                id=f"{country[:1].upper()}{city[:3].upper()}_{rcp}_{year}",
                country=country,
                state_province=state_province,
                city=city,
                latitude=latitude,
                longitude=longitude,
                elevation=elevation,
                time_zone=time_zone
            )
            # Assign climate zone; failures degrade to "Unknown" rather than abort.
            # NOTE(review): assign_climate_zone is not visible in this chunk — confirm it exists on this class.
            try:
                climate_zone = self.assign_climate_zone(
                    hdd=location.heating_degree_days,
                    cdd=location.cooling_degree_days,
                    avg_humidity=avg_humidity
                )
                location.climate_zone = climate_zone
                logger.info("Assigned climate zone: %s for %s, %s", climate_zone, city, country)
            except Exception as e:
                st.warning(f"Failed to assign climate zone: {str(e)}. Using default 'Unknown'.")
                logger.error("Climate zone assignment error: %s", str(e))
                location.climate_zone = "Unknown"
            return location
        except Exception as e:
            st.error(f"Error processing EPW file: {str(e)}. Ensure it has 8760 hourly records and correct format.")
            logger.error(f"EPW processing error: %s", str(e))
            return None

    def display_climate_input(self, session_state: Dict[str, Any]):
        """Display Streamlit interface for EPW upload and visualizations."""
        st.title("Climate Data Analysis")
        # Apply consistent styling
        st.markdown(STYLE, unsafe_allow_html=True)
        # Clear invalid session_state["climate_data"] to prevent validation errors
        if "climate_data" in session_state and not all(key in session_state["climate_data"] for key in ["id", "country", "city"]):
            logger.warning("Invalid climate_data in session_state, clearing: %s", session_state["climate_data"])
            session_state["climate_data"] = {}
        # Initialize active tab in session_state
        if "active_tab" not in session_state:
            session_state["active_tab"] = "General Information"
        # Define tabs, including new Climate Projection tab
        tab_names = [
            "General Information", "Climate Projection", "Psychrometric Chart",
            "Sun Shading Chart", "Temperature Range", "Wind Rose"
        ]
        tabs = st.tabs(tab_names)
        # Initialize location and epw_data for display
        location = None
        epw_data = None
        # General Information tab: Handle EPW upload and display existing data
        with tabs[0]:
            uploaded_file = st.file_uploader("Upload EPW File", type=["epw"])
            if uploaded_file:
                with st.spinner("Processing uploaded EPW file..."):
                    try:
                        # Process new EPW file.
                        # NOTE(review): this inline parsing duplicates
                        # process_epw_file almost verbatim — candidate for refactor.
                        epw_content = uploaded_file.read().decode("utf-8")
                        epw_lines = epw_content.splitlines()
                        # Parse header
                        header = next(line for line in epw_lines if line.startswith("LOCATION"))
                        header_parts = header.split(",")
                        if len(header_parts) < 10:
                            raise ValueError("Invalid LOCATION header: too few fields.")
                        city = header_parts[1].strip() or "Unknown"
city = re.sub(r'\..*', '', city) state_province = header_parts[2].strip() or "Unknown" country = header_parts[3].strip() or "Unknown" latitude = float(header_parts[6]) if header_parts[6].strip() and self.is_numeric(header_parts[6]) else 0.0 longitude = float(header_parts[7]) if header_parts[7].strip() and self.is_numeric(header_parts[7]) else 0.0 time_zone = float(header_parts[8]) if header_parts[8].strip() and self.is_numeric(header_parts[8]) else 0.0 elevation = float(header_parts[9]) if header_parts[9].strip() and self.is_numeric(header_parts[9]) else 0.0 logger.info("Parsed EPW header: city=%s, country=%s, latitude=%s, longitude=%s, time_zone=%s, elevation=%s", city, country, latitude, longitude, time_zone, elevation) # Parse TYPICAL/EXTREME PERIODS typical_extreme_periods = {} date_pattern = r'^\d{1,2}\s*/\s*\d{1,2}$' for line in epw_lines: if line.startswith("TYPICAL/EXTREME PERIODS"): parts = line.strip().split(',') try: num_periods = int(parts[1]) except ValueError: st.warning("Invalid number of periods in TYPICAL/EXTREME PERIODS, skipping parsing.") break for i in range(num_periods): try: if len(parts) < 2 + i*4 + 4: st.warning(f"Insufficient fields for period {i+1}, skipping.") continue period_name = parts[2 + i*4] period_type = parts[3 + i*4] start_date = parts[4 + i*4].strip() end_date = parts[5 + i*4].strip() if period_name in [ "Summer - Week Nearest Max Temperature For Period", "Summer - Week Nearest Average Temperature For Period", "Winter - Week Nearest Min Temperature For Period", "Winter - Week Nearest Average Temperature For Period" ]: season = 'summer' if 'Summer' in period_name else 'winter' period_type = ('extreme' if 'Max' in period_name or 'Min' in period_name else 'typical') key = f"{season}_{period_type}" start_date_clean = re.sub(r'\s+', '', start_date) end_date_clean = re.sub(r'\s+', '', end_date) if not re.match(date_pattern, start_date) or not re.match(date_pattern, end_date): st.warning(f"Invalid date format for period {period_name}: 
{start_date} to {end_date}, skipping.") continue start_month, start_day = map(int, start_date_clean.split('/')) end_month, end_day = map(int, end_date_clean.split('/')) typical_extreme_periods[key] = { "start": {"month": start_month, "day": start_day}, "end": {"month": end_month, "day": end_day} } except (IndexError, ValueError) as e: st.warning(f"Error parsing period {i+1}: {str(e)}, skipping.") continue break # Parse GROUND TEMPERATURES ground_temperatures = {} for line in epw_lines: if line.startswith("GROUND TEMPERATURES"): parts = line.strip().split(',') try: num_depths = int(parts[1]) except ValueError: st.warning("Invalid number of depths in GROUND TEMPERATURES, skipping parsing.") break for i in range(num_depths): try: if len(parts) < 2 + i*16 + 16: st.warning(f"Insufficient fields for ground temperature depth {i+1}, skipping.") continue depth = parts[2 + i*16] temps = [float(t) for t in parts[6 + i*16:18 + i*16] if t.strip()] if len(temps) != 12: st.warning(f"Invalid number of temperatures for depth {depth}m, expected 12, got {len(temps)}, skipping.") continue ground_temperatures[depth] = temps except (ValueError, IndexError) as e: st.warning(f"Error parsing ground temperatures for depth {i+1}: {str(e)}, skipping.") continue break # Read data section data_start_idx = next(i for i, line in enumerate(epw_lines) if line.startswith("DATA PERIODS")) + 1 epw_data = pd.read_csv(StringIO("\n".join(epw_lines[data_start_idx:])), header=None, dtype=str) if len(epw_data) != 8760: raise ValueError(f"EPW file has {len(epw_data)} records, expected 8760.") if len(epw_data.columns) != 35: raise ValueError(f"EPW file has {len(epw_data.columns)} columns, expected 35.") for col in [1, 2, 3, 6, 8, 9, 13, 14, 15, 20, 21]: epw_data[col] = pd.to_numeric(epw_data[col], errors='coerce') if epw_data[col].isna().all(): raise ValueError(f"Column {col} contains only non-numeric or missing data.") # Calculate average humidity for climate zone assignment humidity = 
pd.to_numeric(epw_data[8], errors='coerce').values avg_humidity = float(np.nanmean(humidity)) if not np.all(np.isnan(humidity)) else 50.0 logger.info("Calculated average humidity: %.1f%% for %s, %s", avg_humidity, city, country) # Create ClimateLocation with consistent ID location = ClimateLocation( epw_file=epw_data, typical_extreme_periods=typical_extreme_periods, ground_temperatures=ground_temperatures, id=f"{country[:1].upper()}{city[:3].upper()}_UPLOAD", country=country, state_province=state_province, city=city, latitude=latitude, longitude=longitude, elevation=elevation, time_zone=time_zone ) # Assign climate zone try: climate_zone = self.assign_climate_zone( hdd=location.heating_degree_days, cdd=location.cooling_degree_days, avg_humidity=avg_humidity ) location.climate_zone = climate_zone logger.info("Assigned climate zone: %s for %s, %s", climate_zone, city, country) except Exception as e: st.warning(f"Failed to assign climate zone: {str(e)}. Using default 'Unknown'.") logger.error("Climate zone assignment error: %s", str(e)) location.climate_zone = "Unknown" self.add_location(location) climate_data_dict = location.to_dict() session_state["climate_data"] = climate_data_dict if not self.validate_climate_data(climate_data_dict): st.warning(f"Climate data validation failed for {city}, {country}. Displaying data anyway.") logger.warning("Validation failed for new EPW data: %s", climate_data_dict["id"]) st.success("Climate data extracted from EPW file!") logger.info("Successfully processed EPW file and stored in session_state: %s", climate_data_dict["id"]) session_state["active_tab"] = "General Information" except Exception as e: st.error(f"Error processing EPW file: {str(e)}. 
Ensure it has 8760 hourly records and correct format.") logger.error(f"EPW processing error: %s", str(e)) session_state["climate_data"] = {} elif "climate_data" in session_state and session_state["climate_data"]: # Reconstruct from session_state climate_data_dict = session_state["climate_data"] logger.info("Attempting to reconstruct climate data from session_state: %s", climate_data_dict.get("id", "Unknown")) required_keys = ["id", "country", "city", "latitude", "longitude", "elevation", "time_zone", "climate_zone", "hourly_data"] missing_keys = [key for key in required_keys if key not in climate_data_dict] if missing_keys: st.warning(f"Invalid climate data in session state, missing keys: {', '.join(missing_keys)}. Please upload a new EPW file.") logger.warning("Missing keys in session_state.climate_data: %s", missing_keys) session_state["climate_data"] = {} else: if not self.validate_climate_data(climate_data_dict): st.warning(f"Stored climate data validation failed for {climate_data_dict.get('city', 'Unknown')}, {climate_data_dict.get('country', 'Unknown')}. 
Displaying data anyway.") logger.warning("Validation failed for session_state.climate_data: %s", climate_data_dict.get("id", "Unknown")) try: # Rebuild epw_data from hourly_data hourly_data = climate_data_dict["hourly_data"] epw_data = pd.DataFrame(np.nan, index=range(len(hourly_data)), columns=range(35)) epw_data[1] = [d["month"] for d in hourly_data] epw_data[2] = [d["day"] for d in hourly_data] epw_data[3] = [d["hour"] for d in hourly_data] epw_data[6] = [d["dry_bulb"] for d in hourly_data] epw_data[8] = [d["relative_humidity"] for d in hourly_data] epw_data[9] = [d["atmospheric_pressure"] for d in hourly_data] epw_data[13] = [d["global_horizontal_radiation"] for d in hourly_data] epw_data[14] = [d["direct_normal_radiation"] for d in hourly_data] epw_data[15] = [d["diffuse_horizontal_radiation"] for d in hourly_data] epw_data[20] = [d["wind_direction"] for d in hourly_data] epw_data[21] = [d["wind_speed"] for d in hourly_data] # Create ClimateLocation location = ClimateLocation( epw_file=epw_data, typical_extreme_periods=climate_data_dict.get("typical_extreme_periods", {}), ground_temperatures=climate_data_dict.get("ground_temperatures", {}), id=climate_data_dict["id"], country=climate_data_dict["country"], state_province=climate_data_dict.get("state_province", "N/A"), city=climate_data_dict["city"], latitude=climate_data_dict["latitude"], longitude=climate_data_dict["longitude"], elevation=climate_data_dict["elevation"], time_zone=climate_data_dict["time_zone"], climate_zone=climate_data_dict["climate_zone"] ) location.hourly_data = climate_data_dict["hourly_data"] self.add_location(location) st.info(f"Displaying previously extracted climate data for {climate_data_dict['city']}, {climate_data_dict['country']}.") logger.info("Successfully reconstructed climate data from session_state: %s", climate_data_dict["id"]) except Exception as e: st.error(f"Error reconstructing climate data: {str(e)}. 
Please upload a new EPW file.") logger.error(f"Reconstruction error: %s", str(e)) session_state["climate_data"] = {} # Display data if available if location is not None and epw_data is not None: self.display_design_conditions(location) # Climate Projection tab with tabs[1]: st.markdown("""
At this stage, this section is focused on some locations in Australia, and the provided data is based on "Projected weather files for building energy modelling" from CSIRO 2022.