""" ASHRAE 169 climate data module for HVAC Load Calculator. Extracts climate data from EPW files and provides visualizations inspired by Climate Consultant. Author: Dr Majed Abuseif Date: May 2025 Version: 2.1.6 """ from typing import Dict, List, Any, Optional import pandas as pd import numpy as np import os import json from dataclasses import dataclass import streamlit as st import plotly.graph_objects as go from io import StringIO import pvlib from datetime import datetime, timedelta import re import logging from os.path import join as os_join # Set up logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Define paths at module level AU_CCH_DIR = "au_cch" # Relative path to au_cch folder from climate_data.py in data/ (e.g., au_cch/1/RCP2.6/2070/) # CSS for consistent formatting STYLE = """ """ # Location mapping from provided list LOCATION_MAPPING = { "24": {"city": "Canberra", "state": "ACT"}, "11": {"city": "Coffs Harbour", "state": "NSW"}, "17": {"city": "Sydney RO (Observatory Hill)", "state": "NSW"}, "56": {"city": "Mascot (Sydney Airport)", "state": "NSW"}, "77": {"city": "Parramatta", "state": "NSW"}, "78": {"city": "Sub-Alpine (Cooma Airport)", "state": "NSW"}, "79": {"city": "Blue Mountains", "state": "NSW"}, "1": {"city": "Darwin", "state": "NT"}, "6": {"city": "Alice Springs", "state": "NT"}, "5": {"city": "Townsville", "state": "QLD"}, "7": {"city": "Rockhampton", "state": "QLD"}, "10": {"city": "Brisbane", "state": "QLD"}, "19": {"city": "Charleville", "state": "QLD"}, "32": {"city": "Cairns", "state": "QLD"}, "70": {"city": "Toowoomba", "state": "QLD"}, "16": {"city": "Adelaide", "state": "SA"}, "75": {"city": "Adelaide Coastal (AMO)", "state": "SA"}, "26": {"city": "Hobart", "state": "TAS"}, "21": {"city": "Melbourne RO", "state": "VIC"}, "27": {"city": "Mildura", "state": "VIC"}, "60": {"city": "Tullamarine (Melbourne Airport)", "state": "VIC"}, "63": {"city": "Warrnambool", "state": "VIC"}, "66": {"city": "Ballarat", "state": "VIC"}, "30": {"city": "Wyndham", "state": "WA"}, "52": {"city": "Swanbourne", "state": "WA"}, "58": {"city": "Albany", "state": "WA"}, "83": {"city": "Christmas Island", "state": "WA"} } @dataclass class ClimateLocation: """Class representing a climate location with ASHRAE 169 data derived from EPW files.""" id: str country: str state_province: str city: str latitude: float longitude: float elevation: float # meters time_zone: float # UTC offset in hours climate_zone: str heating_degree_days: float # base 18°C cooling_degree_days: float # base 18°C winter_design_temp: float # 99.6% heating design temperature (°C) summer_design_temp_db: float # 0.4% cooling design dry-bulb temperature (°C) summer_design_temp_wb: float # 0.4% cooling design wet-bulb temperature (°C) summer_daily_range: float # Mean daily temperature range in summer (°C) wind_speed: float # Mean wind speed (m/s) pressure: float # Mean atmospheric pressure (Pa) hourly_data: List[Dict] # Hourly data for integration with main.py typical_extreme_periods: Dict[str, Dict] # Typical/extreme periods (summer/winter) ground_temperatures: Dict[str, List[float]] # Monthly ground temperatures by depth def __init__(self, epw_file: pd.DataFrame, typical_extreme_periods: Dict, ground_temperatures: Dict, **kwargs): """Initialize ClimateLocation with EPW file data and header information.""" self.id = kwargs.get("id") self.country = kwargs.get("country") self.state_province = kwargs.get("state_province", "N/A") self.city = kwargs.get("city") self.latitude = kwargs.get("latitude") self.longitude = kwargs.get("longitude") self.elevation = kwargs.get("elevation") self.time_zone = kwargs.get("time_zone", 0.0) # Default to 0.0 if not provided self.climate_zone = kwargs.get("climate_zone", "Unknown") # Use provided climate_zone self.typical_extreme_periods = typical_extreme_periods self.ground_temperatures = ground_temperatures # Extract columns from EPW data months = pd.to_numeric(epw_file[1], errors='coerce').values days = pd.to_numeric(epw_file[2], errors='coerce').values hours = pd.to_numeric(epw_file[3], errors='coerce').values dry_bulb = pd.to_numeric(epw_file[6], errors='coerce').values humidity = pd.to_numeric(epw_file[8], errors='coerce').values pressure = pd.to_numeric(epw_file[9], errors='coerce').values global_radiation = pd.to_numeric(epw_file[13], errors='coerce').values direct_normal_radiation = pd.to_numeric(epw_file[14], errors='coerce').values diffuse_horizontal_radiation = pd.to_numeric(epw_file[15], errors='coerce').values wind_direction = pd.to_numeric(epw_file[20], errors='coerce').values wind_speed = pd.to_numeric(epw_file[21], errors='coerce').values # Filter wind speed outliers and log high values wind_speed = wind_speed[wind_speed <= 50] # Remove extreme outliers if (wind_speed > 15).any(): logger.warning(f"High wind speeds detected: {wind_speed[wind_speed > 15].tolist()}") # Calculate wet-bulb temperature wet_bulb = ClimateData.calculate_wet_bulb(dry_bulb, humidity) # Calculate design conditions self.winter_design_temp = round(np.nanpercentile(dry_bulb, 0.4), 1) self.summer_design_temp_db = round(np.nanpercentile(dry_bulb, 99.6), 1) self.summer_design_temp_wb = round(np.nanpercentile(wet_bulb, 99.6), 1) # Calculate degree days daily_temps = np.nanmean(dry_bulb.reshape(-1, 24), axis=1) self.heating_degree_days = round(np.nansum(np.maximum(18 - daily_temps, 0))) self.cooling_degree_days = round(np.nansum(np.maximum(daily_temps - 18, 0))) # Calculate summer daily temperature range (June–August, Southern Hemisphere) summer_mask = (months >= 6) & (months <= 8) summer_temps = dry_bulb[summer_mask].reshape(-1, 24) self.summer_daily_range = round(np.nanmean(np.nanmax(summer_temps, axis=1) - np.nanmin(summer_temps, axis=1)), 1) # Calculate mean wind speed and pressure self.wind_speed = round(np.nanmean(wind_speed), 1) self.pressure = round(np.nanmean(pressure), 1) # Log wind speed diagnostics logger.info(f"Wind speed stats: min={wind_speed.min():.1f}, max={wind_speed.max():.1f}, mean={self.wind_speed:.1f}") # Store hourly data with enhanced fields self.hourly_data = [] for i in range(len(months)): if np.isnan(months[i]) or np.isnan(days[i]) or np.isnan(hours[i]) or np.isnan(dry_bulb[i]): continue # Skip records with missing critical fields record = { "month": int(months[i]), "day": int(days[i]), "hour": int(hours[i]), "dry_bulb": float(dry_bulb[i]), "relative_humidity": float(humidity[i]) if not np.isnan(humidity[i]) else 0.0, "atmospheric_pressure": float(pressure[i]) if not np.isnan(pressure[i]) else self.pressure, "global_horizontal_radiation": float(global_radiation[i]) if not np.isnan(global_radiation[i]) else 0.0, "direct_normal_radiation": float(direct_normal_radiation[i]) if not np.isnan(direct_normal_radiation[i]) else 0.0, "diffuse_horizontal_radiation": float(diffuse_horizontal_radiation[i]) if not np.isnan(diffuse_horizontal_radiation[i]) else 0.0, "wind_speed": float(wind_speed[i]) if not np.isnan(wind_speed[i]) else 0.0, "wind_direction": float(wind_direction[i]) if not np.isnan(wind_direction[i]) else 0.0 } self.hourly_data.append(record) if len(self.hourly_data) != 8760: st.warning(f"Hourly data has {len(self.hourly_data)} records instead of 8760. Some records may have been excluded due to missing data.") def to_dict(self) -> Dict[str, Any]: """Convert the climate location to a dictionary.""" return { "id": self.id, "country": self.country, "state_province": self.state_province, "city": self.city, "latitude": self.latitude, "longitude": self.longitude, "elevation": self.elevation, "time_zone": self.time_zone, "climate_zone": self.climate_zone, "heating_degree_days": self.heating_degree_days, "cooling_degree_days": self.cooling_degree_days, "winter_design_temp": self.winter_design_temp, "summer_design_temp_db": self.summer_design_temp_db, "summer_design_temp_wb": self.summer_design_temp_wb, "summer_daily_range": self.summer_daily_range, "wind_speed": self.wind_speed, "pressure": self.pressure, "hourly_data": self.hourly_data, "typical_extreme_periods": self.typical_extreme_periods, "ground_temperatures": self.ground_temperatures } class ClimateData: """Class for managing ASHRAE 169 climate data from EPW files.""" def __init__(self): """Initialize climate data.""" self.locations = {} self.countries = [] self.country_states = {} def add_location(self, location: ClimateLocation): """Add a new location to the dictionary.""" self.locations[location.id] = location self.countries = sorted(list(set(loc.country for loc in self.locations.values()))) self.country_states = self._group_locations_by_country_state() def _group_locations_by_country_state(self) -> Dict[str, Dict[str, List[str]]]: """Group locations by country and state/province.""" result = {} for loc in self.locations.values(): if loc.country not in result: result[loc.country] = {} if loc.state_province not in result[loc.country]: result[loc.country][loc.state_province] = [] result[loc.country][loc.state_province].append(loc.city) for country in result: for state in result[country]: result[country][state] = sorted(result[country][state]) return result def get_location_by_id(self, location_id: str, session_state: Dict[str, Any]) -> Optional[Dict[str, Any]]: """Retrieve climate data by ID from session state or locations.""" if "climate_data" in session_state and session_state["climate_data"].get("id") == location_id: return session_state["climate_data"] if location_id in self.locations: return self.locations[location_id].to_dict() return None @staticmethod def validate_climate_data(data: Dict[str, Any]) -> bool: """Validate climate data for required fields and ranges.""" required_fields = [ "id", "country", "city", "latitude", "longitude", "elevation", "time_zone", "climate_zone", "heating_degree_days", "cooling_degree_days", "winter_design_temp", "summer_design_temp_db", "summer_design_temp_wb", "summer_daily_range", "wind_speed", "pressure", "hourly_data" ] for field in required_fields: if field not in data: st.error(f"Validation failed: Missing required field '{field}'") logger.warning(f"Validation failed: Missing field '{field}'") return False if not (-90 <= data["latitude"] <= 90 and -180 <= data["longitude"] <= 180): st.error("Validation failed: Invalid latitude or longitude") logger.warning("Validation failed: Invalid latitude or longitude") return False if data["elevation"] < 0: st.error("Validation failed: Negative elevation") logger.warning("Validation failed: Negative elevation") return False if not (-12 <= data["time_zone"] <= 14): st.error(f"Validation failed: Time zone {data['time_zone']} outside range (-12 to +14)") logger.warning(f"Validation failed: Time zone {data['time_zone']} outside range") return False if data["climate_zone"] not in ["0A", "0B", "1A", "1B", "2A", "2B", "3A", "3B", "3C", "4A", "4B", "4C", "5A", "5B", "5C", "6A", "6B", "7", "8"]: st.error(f"Validation failed: Invalid climate zone '{data['climate_zone']}'") logger.warning(f"Validation failed: Invalid climate zone '{data['climate_zone']}'") return False if not (data["heating_degree_days"] >= 0 and data["cooling_degree_days"] >= 0): st.error("Validation failed: Negative degree days") logger.warning("Validation failed: Negative degree days") return False if not (-50 <= data["winter_design_temp"] <= 20): st.error(f"Validation failed: Winter design temp {data['winter_design_temp']} outside range") logger.warning(f"Validation failed: Winter design temp {data['winter_design_temp']} outside range") return False if not (0 <= data["summer_design_temp_db"] <= 50 and 0 <= data["summer_design_temp_wb"] <= 40): st.error("Validation failed: Invalid summer design temperatures") logger.warning("Validation failed: Invalid summer design temperatures") return False if data["summer_daily_range"] < 0: st.error("Validation failed: Negative summer daily range") logger.warning("Validation failed: Negative summer daily range") return False if not (0 <= data["wind_speed"] <= 30): st.error(f"Validation failed: Wind speed {data['wind_speed']} outside range") logger.warning(f"Validation failed: Wind speed {data['wind_speed']} outside range") return False if not (80000 <= data["pressure"] <= 110000): st.error(f"Validation failed: Pressure {data['pressure']} outside range") logger.warning(f"Validation failed: Pressure {data['pressure']} outside range") return False if not data["hourly_data"] or len(data["hourly_data"]) < 8700: st.error(f"Validation failed: Hourly data has {len(data['hourly_data'])} records, expected ~8760") logger.warning(f"Validation failed: Hourly data has {len(data['hourly_data'])} records") return False for record in data["hourly_data"]: if not (1 <= record["month"] <= 12): st.error(f"Validation failed: Invalid month {record['month']}") logger.warning(f"Validation failed: Invalid month {record['month']}") return False if not (1 <= record["day"] <= 31): st.error(f"Validation failed: Invalid day {record['day']}") logger.warning(f"Validation failed: Invalid day {record['day']}") return False if not (1 <= record["hour"] <= 24): st.error(f"Validation failed: Invalid hour {record['hour']}") logger.warning(f"Validation failed: Invalid hour {record['hour']}") return False if not (-50 <= record["dry_bulb"] <= 50): st.error(f"Validation failed: Dry bulb {record['dry_bulb']} outside range") logger.warning(f"Validation failed: Dry bulb {record['dry_bulb']} outside range") return False if not (0 <= record["relative_humidity"] <= 100): st.error(f"Validation failed: Relative humidity {record['relative_humidity']} outside range") logger.warning(f"Validation failed: Relative humidity {record['relative_humidity']} outside range") return False if not (80000 <= record["atmospheric_pressure"] <= 110000): st.error(f"Validation failed: Atmospheric pressure {record['atmospheric_pressure']} outside range") logger.warning(f"Validation failed: Atmospheric pressure {record['atmospheric_pressure']} outside range") return False if not (0 <= record["global_horizontal_radiation"] <= 1200): st.error(f"Validation failed: Global radiation {record['global_horizontal_radiation']} outside range") logger.warning(f"Validation failed: Global radiation {record['global_horizontal_radiation']} outside range") return False if not (0 <= record["direct_normal_radiation"] <= 1200): st.error(f"Validation failed: Direct normal radiation {record['direct_normal_radiation']} outside range") logger.warning(f"Validation failed: Direct normal radiation {record['direct_normal_radiation']} outside range") return False if not (0 <= record["diffuse_horizontal_radiation"] <= 1200): st.error(f"Validation failed: Diffuse horizontal radiation {record['diffuse_horizontal_radiation']} outside range") logger.warning(f"Validation failed: Diffuse horizontal radiation {record['diffuse_horizontal_radiation']} outside range") return False if not (0 <= record["wind_speed"] <= 30): st.error(f"Validation failed: Wind speed {record['wind_speed']} outside range") logger.warning(f"Validation failed: Wind speed {record['wind_speed']} outside range") return False if not (0 <= record["wind_direction"] <= 360): st.error(f"Validation failed: Wind direction {record['wind_direction']} outside range") logger.warning(f"Validation failed: Wind direction {record['wind_direction']} outside range") return False # Validate typical/extreme periods (optional) if "typical_extreme_periods" in data and data["typical_extreme_periods"]: expected_periods = ["summer_extreme", "summer_typical", "winter_extreme", "winter_typical"] missing_periods = [p for p in expected_periods if p not in data["typical_extreme_periods"]] if missing_periods: st.warning(f"Validation warning: Missing typical/extreme periods: {', '.join(missing_periods)}") logger.warning(f"Validation warning: Missing typical/extreme periods: {', '.join(missing_periods)}") for period in data["typical_extreme_periods"].values(): for date in ["start", "end"]: if not (1 <= period[date]["month"] <= 12 and 1 <= period[date]["day"] <= 31): st.error(f"Validation failed: Invalid date in typical/extreme periods: {period[date]}") logger.warning(f"Validation failed: Invalid date in typical/extreme periods: {period[date]}") return False # Validate ground temperatures (optional) if "ground_temperatures" in data and data["ground_temperatures"]: for depth, temps in data["ground_temperatures"].items(): if len(temps) != 12 or not all(0 <= t <= 50 for t in temps): st.error(f"Validation failed: Invalid ground temperatures for depth {depth}") logger.warning(f"Validation failed: Invalid ground temperatures for depth {depth}") return False return True @staticmethod def calculate_wet_bulb(dry_bulb: np.ndarray, relative_humidity: np.ndarray) -> np.ndarray: """Calculate Wet Bulb Temperature using Stull (2011) approximation.""" db = np.array(dry_bulb, dtype=float) rh = np.array(relative_humidity, dtype=float) term1 = db * np.arctan(0.151977 * (rh + 8.313659)**0.5) term2 = np.arctan(db + rh) term3 = np.arctan(rh - 1.676331) term4 = 0.00391838 * rh**1.5 * np.arctan(0.023101 * rh) term5 = -4.686035 wet_bulb = term1 + term2 - term3 + term4 + term5 invalid_mask = (rh < 5) | (rh > 99) | (db < -20) | (db > 50) | np.isnan(db) | np.isnan(rh) wet_bulb[invalid_mask] = np.nan return wet_bulb @staticmethod def is_numeric(value: str) -> bool: """Check if a string can be converted to a number.""" try: float(value) return True except ValueError: return False def get_locations_by_state(self, state: str) -> List[Dict[str, str]]: """Get list of locations for a given state from LOCATION_MAPPING.""" return [ {"number": loc_num, "city": loc_info["city"]} for loc_num, loc_info in LOCATION_MAPPING.items() if loc_info["state"] == state ] def process_epw_file(self, epw_content: str, location_num: str, rcp: str, year: str) -> Optional[ClimateLocation]: """Process an EPW file content and return a ClimateLocation object.""" try: epw_lines = epw_content.splitlines() # Parse header header = next(line for line in epw_lines if line.startswith("LOCATION")) header_parts = header.split(",") if len(header_parts) < 10: raise ValueError("Invalid LOCATION header: too few fields.") city = header_parts[1].strip() or "Unknown" city = re.sub(r'\..*', '', city) # Clean city name state_province = header_parts[2].strip() or "Unknown" country = header_parts[3].strip() or "Unknown" latitude = float(header_parts[6]) if header_parts[6].strip() and self.is_numeric(header_parts[6]) else 0.0 longitude = float(header_parts[7]) if header_parts[7].strip() and self.is_numeric(header_parts[7]) else 0.0 time_zone = float(header_parts[8]) if header_parts[8].strip() and self.is_numeric(header_parts[8]) else 0.0 elevation = float(header_parts[9]) if header_parts[9].strip() and self.is_numeric(header_parts[9]) else 0.0 logger.info("Parsed EPW header: city=%s, country=%s, latitude=%s, longitude=%s, time_zone=%s, elevation=%s", city, country, latitude, longitude, time_zone, elevation) # Override city and state from LOCATION_MAPPING if location_num in LOCATION_MAPPING: city = LOCATION_MAPPING[location_num]["city"] state_province = LOCATION_MAPPING[location_num]["state"] # Parse TYPICAL/EXTREME PERIODS typical_extreme_periods = {} date_pattern = r'^\d{1,2}\s*/\s*\d{1,2}$' for line in epw_lines: if line.startswith("TYPICAL/EXTREME PERIODS"): parts = line.strip().split(',') try: num_periods = int(parts[1]) except ValueError: st.warning("Invalid number of periods in TYPICAL/EXTREME PERIODS, skipping parsing.") break for i in range(num_periods): try: if len(parts) < 2 + i*4 + 4: st.warning(f"Insufficient fields for period {i+1}, skipping.") continue period_name = parts[2 + i*4] period_type = parts[3 + i*4] start_date = parts[4 + i*4].strip() end_date = parts[5 + i*4].strip() if period_name in [ "Summer - Week Nearest Max Temperature For Period", "Summer - Week Nearest Average Temperature For Period", "Winter - Week Nearest Min Temperature For Period", "Winter - Week Nearest Average Temperature For Period" ]: season = 'summer' if 'Summer' in period_name else 'winter' period_type = ('extreme' if 'Max' in period_name or 'Min' in period_name else 'typical') key = f"{season}_{period_type}" start_date_clean = re.sub(r'\s+', '', start_date) end_date_clean = re.sub(r'\s+', '', end_date) if not re.match(date_pattern, start_date) or not re.match(date_pattern, end_date): st.warning(f"Invalid date format for period {period_name}: {start_date} to {end_date}, skipping.") continue start_month, start_day = map(int, start_date_clean.split('/')) end_month, end_day = map(int, end_date_clean.split('/')) typical_extreme_periods[key] = { "start": {"month": start_month, "day": start_day}, "end": {"month": end_month, "day": end_day} } except (IndexError, ValueError) as e: st.warning(f"Error parsing period {i+1}: {str(e)}, skipping.") continue break # Parse GROUND TEMPERATURES ground_temperatures = {} for line in epw_lines: if line.startswith("GROUND TEMPERATURES"): parts = line.strip().split(',') try: num_depths = int(parts[1]) except ValueError: st.warning("Invalid number of depths in GROUND TEMPERATURES, skipping parsing.") break for i in range(num_depths): try: if len(parts) < 2 + i*16 + 16: st.warning(f"Insufficient fields for ground temperature depth {i+1}, skipping.") continue depth = parts[2 + i*16] temps = [float(t) for t in parts[6 + i*16:18 + i*16] if t.strip()] if len(temps) != 12: st.warning(f"Invalid number of temperatures for depth {depth}m, expected 12, got {len(temps)}, skipping.") continue ground_temperatures[depth] = temps except (ValueError, IndexError) as e: st.warning(f"Error parsing ground temperatures for depth {i+1}: {str(e)}, skipping.") continue break # Read data section data_start_idx = next(i for i, line in enumerate(epw_lines) if line.startswith("DATA PERIODS")) + 1 epw_data = pd.read_csv(StringIO("\n".join(epw_lines[data_start_idx:])), header=None, dtype=str) if len(epw_data) != 8760: raise ValueError(f"EPW file has {len(epw_data)} records, expected 8760.") if len(epw_data.columns) not in [32, 35]: raise ValueError(f"EPW file has {len(epw_data.columns)} columns, expected 35.") for col in [1, 2, 3, 6, 8, 9, 13, 14, 15, 20, 21]: epw_data[col] = pd.to_numeric(epw_data[col], errors='coerce') if epw_data[col].isna().all(): raise ValueError(f"Column {col} contains only non-numeric or missing data.") # Calculate average humidity for climate zone assignment humidity = pd.to_numeric(epw_data[8], errors='coerce').values avg_humidity = float(np.nanmean(humidity)) if not np.all(np.isnan(humidity)) else 50.0 logger.info("Calculated average humidity: %.1f%% for %s, %s", avg_humidity, city, country) # Create ClimateLocation location = ClimateLocation( epw_file=epw_data, typical_extreme_periods=typical_extreme_periods, ground_temperatures=ground_temperatures, id=f"{country[:1].upper()}{city[:3].upper()}_{rcp}_{year}", country=country, state_province=state_province, city=city, latitude=latitude, longitude=longitude, elevation=elevation, time_zone=time_zone ) # Assign climate zone try: climate_zone = self.assign_climate_zone( hdd=location.heating_degree_days, cdd=location.cooling_degree_days, avg_humidity=avg_humidity ) location.climate_zone = climate_zone logger.info("Assigned climate zone: %s for %s, %s", climate_zone, city, country) except Exception as e: st.warning(f"Failed to assign climate zone: {str(e)}. Using default 'Unknown'.") logger.error("Climate zone assignment error: %s", str(e)) location.climate_zone = "Unknown" return location except Exception as e: st.error(f"Error processing EPW file: {str(e)}. Ensure it has 8760 hourly records and correct format.") logger.error(f"EPW processing error: %s", str(e)) return None def display_climate_input(self, session_state: Dict[str, Any]): """Display Streamlit interface for EPW upload and visualizations.""" st.title("Climate Data Analysis") # Apply consistent styling st.markdown(STYLE, unsafe_allow_html=True) # Clear invalid session_state["climate_data"] to prevent validation errors if "climate_data" in session_state and not all(key in session_state["climate_data"] for key in ["id", "country", "city"]): logger.warning("Invalid climate_data in session_state, clearing: %s", session_state["climate_data"]) session_state["climate_data"] = {} # Initialize active tab in session_state if "active_tab" not in session_state: session_state["active_tab"] = "General Information" # Define tabs, including new Climate Projection tab tab_names = [ "General Information", "Climate Projection", "Psychrometric Chart", "Sun Shading Chart", "Temperature Range", "Wind Rose" ] tabs = st.tabs(tab_names) # Initialize location and epw_data for display location = None epw_data = None # General Information tab: Handle EPW upload and display existing data with tabs[0]: uploaded_file = st.file_uploader("Upload EPW File", type=["epw"]) if uploaded_file: with st.spinner("Processing uploaded EPW file..."): try: # Process new EPW file epw_content = uploaded_file.read().decode("utf-8") epw_lines = epw_content.splitlines() # Parse header header = next(line for line in epw_lines if line.startswith("LOCATION")) header_parts = header.split(",") if len(header_parts) < 10: raise ValueError("Invalid LOCATION header: too few fields.") city = header_parts[1].strip() or "Unknown" city = re.sub(r'\..*', '', city) state_province = header_parts[2].strip() or "Unknown" country = header_parts[3].strip() or "Unknown" latitude = float(header_parts[6]) if header_parts[6].strip() and self.is_numeric(header_parts[6]) else 0.0 longitude = float(header_parts[7]) if header_parts[7].strip() and self.is_numeric(header_parts[7]) else 0.0 time_zone = float(header_parts[8]) if header_parts[8].strip() and self.is_numeric(header_parts[8]) else 0.0 elevation = float(header_parts[9]) if header_parts[9].strip() and self.is_numeric(header_parts[9]) else 0.0 logger.info("Parsed EPW header: city=%s, country=%s, latitude=%s, longitude=%s, time_zone=%s, elevation=%s", city, country, latitude, longitude, time_zone, elevation) # Parse TYPICAL/EXTREME PERIODS typical_extreme_periods = {} date_pattern = r'^\d{1,2}\s*/\s*\d{1,2}$' for line in epw_lines: if line.startswith("TYPICAL/EXTREME PERIODS"): parts = line.strip().split(',') try: num_periods = int(parts[1]) except ValueError: st.warning("Invalid number of periods in TYPICAL/EXTREME PERIODS, skipping parsing.") break for i in range(num_periods): try: if len(parts) < 2 + i*4 + 4: st.warning(f"Insufficient fields for period {i+1}, skipping.") continue period_name = parts[2 + i*4] period_type = parts[3 + i*4] start_date = parts[4 + i*4].strip() end_date = parts[5 + i*4].strip() if period_name in [ "Summer - Week Nearest Max Temperature For Period", "Summer - Week Nearest Average Temperature For Period", "Winter - Week Nearest Min Temperature For Period", "Winter - Week Nearest Average Temperature For Period" ]: season = 'summer' if 'Summer' in period_name else 'winter' period_type = ('extreme' if 'Max' in period_name or 'Min' in period_name else 'typical') key = f"{season}_{period_type}" start_date_clean = re.sub(r'\s+', '', start_date) end_date_clean = re.sub(r'\s+', '', end_date) if not re.match(date_pattern, start_date) or not re.match(date_pattern, end_date): st.warning(f"Invalid date format for period {period_name}: {start_date} to {end_date}, skipping.") continue start_month, start_day = map(int, start_date_clean.split('/')) end_month, end_day = map(int, end_date_clean.split('/')) typical_extreme_periods[key] = { "start": {"month": start_month, "day": start_day}, "end": {"month": end_month, "day": end_day} } except (IndexError, ValueError) as e: st.warning(f"Error parsing period {i+1}: {str(e)}, skipping.") continue break # Parse GROUND TEMPERATURES ground_temperatures = {} for line in epw_lines: if line.startswith("GROUND TEMPERATURES"): parts = line.strip().split(',') try: num_depths = int(parts[1]) except ValueError: st.warning("Invalid number of depths in GROUND TEMPERATURES, skipping parsing.") break for i in range(num_depths): try: if len(parts) < 2 + i*16 + 16: st.warning(f"Insufficient fields for ground temperature depth {i+1}, skipping.") continue depth = parts[2 + i*16] temps = [float(t) for t in parts[6 + i*16:18 + i*16] if t.strip()] if len(temps) != 12: st.warning(f"Invalid number of temperatures for depth {depth}m, expected 12, got {len(temps)}, skipping.") continue ground_temperatures[depth] = temps except (ValueError, IndexError) as e: st.warning(f"Error parsing ground temperatures for depth {i+1}: {str(e)}, skipping.") continue break # Read data section data_start_idx = next(i for i, line in enumerate(epw_lines) if line.startswith("DATA PERIODS")) + 1 epw_data = pd.read_csv(StringIO("\n".join(epw_lines[data_start_idx:])), header=None, dtype=str) if len(epw_data) != 8760: raise ValueError(f"EPW file has {len(epw_data)} records, expected 8760.") if len(epw_data.columns) != 35: raise ValueError(f"EPW file has {len(epw_data.columns)} columns, expected 35.") for col in [1, 2, 3, 6, 8, 9, 13, 14, 15, 20, 21]: epw_data[col] = pd.to_numeric(epw_data[col], errors='coerce') if epw_data[col].isna().all(): raise ValueError(f"Column {col} contains only non-numeric or missing data.") # Calculate average humidity for climate zone assignment humidity = pd.to_numeric(epw_data[8], errors='coerce').values avg_humidity = float(np.nanmean(humidity)) if not np.all(np.isnan(humidity)) else 50.0 logger.info("Calculated average humidity: %.1f%% for %s, %s", avg_humidity, city, country) # Create ClimateLocation with consistent ID location = ClimateLocation( epw_file=epw_data, typical_extreme_periods=typical_extreme_periods, ground_temperatures=ground_temperatures, id=f"{country[:1].upper()}{city[:3].upper()}_UPLOAD", country=country, state_province=state_province, city=city, latitude=latitude, longitude=longitude, elevation=elevation, time_zone=time_zone ) # Assign climate zone try: climate_zone = self.assign_climate_zone( hdd=location.heating_degree_days, cdd=location.cooling_degree_days, avg_humidity=avg_humidity ) location.climate_zone = climate_zone logger.info("Assigned climate zone: %s for %s, %s", climate_zone, city, country) except Exception as e: st.warning(f"Failed to assign climate zone: {str(e)}. Using default 'Unknown'.") logger.error("Climate zone assignment error: %s", str(e)) location.climate_zone = "Unknown" self.add_location(location) climate_data_dict = location.to_dict() session_state["climate_data"] = climate_data_dict if not self.validate_climate_data(climate_data_dict): st.warning(f"Climate data validation failed for {city}, {country}. Displaying data anyway.") logger.warning("Validation failed for new EPW data: %s", climate_data_dict["id"]) st.success("Climate data extracted from EPW file!") logger.info("Successfully processed EPW file and stored in session_state: %s", climate_data_dict["id"]) session_state["active_tab"] = "General Information" except Exception as e: st.error(f"Error processing EPW file: {str(e)}. Ensure it has 8760 hourly records and correct format.") logger.error(f"EPW processing error: %s", str(e)) session_state["climate_data"] = {} elif "climate_data" in session_state and session_state["climate_data"]: # Reconstruct from session_state climate_data_dict = session_state["climate_data"] logger.info("Attempting to reconstruct climate data from session_state: %s", climate_data_dict.get("id", "Unknown")) required_keys = ["id", "country", "city", "latitude", "longitude", "elevation", "time_zone", "climate_zone", "hourly_data"] missing_keys = [key for key in required_keys if key not in climate_data_dict] if missing_keys: st.warning(f"Invalid climate data in session state, missing keys: {', '.join(missing_keys)}. Please upload a new EPW file.") logger.warning("Missing keys in session_state.climate_data: %s", missing_keys) session_state["climate_data"] = {} else: if not self.validate_climate_data(climate_data_dict): st.warning(f"Stored climate data validation failed for {climate_data_dict.get('city', 'Unknown')}, {climate_data_dict.get('country', 'Unknown')}. Displaying data anyway.") logger.warning("Validation failed for session_state.climate_data: %s", climate_data_dict.get("id", "Unknown")) try: # Rebuild epw_data from hourly_data hourly_data = climate_data_dict["hourly_data"] epw_data = pd.DataFrame(np.nan, index=range(len(hourly_data)), columns=range(35)) epw_data[1] = [d["month"] for d in hourly_data] epw_data[2] = [d["day"] for d in hourly_data] epw_data[3] = [d["hour"] for d in hourly_data] epw_data[6] = [d["dry_bulb"] for d in hourly_data] epw_data[8] = [d["relative_humidity"] for d in hourly_data] epw_data[9] = [d["atmospheric_pressure"] for d in hourly_data] epw_data[13] = [d["global_horizontal_radiation"] for d in hourly_data] epw_data[14] = [d["direct_normal_radiation"] for d in hourly_data] epw_data[15] = [d["diffuse_horizontal_radiation"] for d in hourly_data] epw_data[20] = [d["wind_direction"] for d in hourly_data] epw_data[21] = [d["wind_speed"] for d in hourly_data] # Create ClimateLocation location = ClimateLocation( epw_file=epw_data, typical_extreme_periods=climate_data_dict.get("typical_extreme_periods", {}), ground_temperatures=climate_data_dict.get("ground_temperatures", {}), id=climate_data_dict["id"], country=climate_data_dict["country"], state_province=climate_data_dict.get("state_province", "N/A"), city=climate_data_dict["city"], latitude=climate_data_dict["latitude"], longitude=climate_data_dict["longitude"], elevation=climate_data_dict["elevation"], time_zone=climate_data_dict["time_zone"], climate_zone=climate_data_dict["climate_zone"] ) location.hourly_data = climate_data_dict["hourly_data"] self.add_location(location) st.info(f"Displaying previously extracted climate data for {climate_data_dict['city']}, {climate_data_dict['country']}.") logger.info("Successfully reconstructed climate data from session_state: %s", climate_data_dict["id"]) except Exception as e: st.error(f"Error reconstructing climate data: {str(e)}. Please upload a new EPW file.") logger.error(f"Reconstruction error: %s", str(e)) session_state["climate_data"] = {} # Display data if available if location is not None and epw_data is not None: self.display_design_conditions(location) # Climate Projection tab with tabs[1]: st.markdown("""

Climate Projection

At this stage, this section is focused on some locations in Australia, and the provided data is based on "Projected weather files for building energy modelling" from CSIRO 2022.

""", unsafe_allow_html=True) # Dropdown menus country = st.selectbox("Country", ["Australia"], key="projection_country") states = ["ACT", "NSW", "NT", "QLD", "SA", "TAS", "VIC", "WA"] state = st.selectbox("State", states, key="projection_state") # Get locations for selected state locations = self.get_locations_by_state(state) location_options = [f"{loc['city']} ({loc['number']})" for loc in locations] location_display = st.selectbox("Location", location_options, key="location") # Extract location number from selection location_num = "" if location_display: location_num = next(loc["number"] for loc in locations if f"{loc['city']} ({loc['number']})" == location_display) rcp_options = ["RCP2.6", "RCP4.5", "RCP8.5"] rcp = st.selectbox("RCP Scenario", rcp_options, key="rcp") year_options = ["2030", "2050", "2070", "2090"] year = st.selectbox("Year", year_options, key="year") if st.button("Extract Data"): with st.spinner("Extracting climate projection data..."): # Log AU_CCH_DIR for debugging logger.debug(f"AU_CCH_DIR set to: {os.path.abspath(AU_CCH_DIR)}") # Construct file path file_path = os_join(AU_CCH_DIR, location_num, rcp, year) logger.debug(f"Attempting to access directory: {os.path.abspath(file_path)}") if not os.path.exists(file_path): st.error(f"No directory found at au_cch/{location_num}/{rcp}/{year}/. In the Hugging Face Space 'mabuseif/Update-materials-solar', ensure the 'au_cch' folder is in the repository root alongside 'data' and 'utils', with the structure au_cch/{location_num}/{rcp}/{year} (e.g., au_cch/1/RCP2.6/2070/) containing a single .epw file (e.g., adelaide_rcp2.6_2070.epw).") logger.error(f"Directory does not exist: {file_path}") else: try: epw_files = [f for f in os.listdir(file_path) if f.endswith(".epw")] if not epw_files: st.error(f"No EPW file found in au_cch/{location_num}/{rcp}/{year}/. Please check that au_cch/{location_num}/{rcp}/{year}/ (e.g., au_cch/1/RCP2.6/{year}/) contains a single file with a .epw extension (e.g., adelaide_rcp2.6_2070.epw).") logger.error(f"No EPW file found in {file_path}") elif len(epw_files) > 1: st.error(f"Multiple EPW files found in au_cch/{location_num}/{rcp}/{year}/: {epw_files}. Please ensure exactly one .epw file per directory (e.g., au_cch/1/RCP2.6/{year}/).") logger.error(f"Multiple EPW files found: {epw_files}") else: epw_file_path = os_join(file_path, epw_files[0]) try: with open(epw_file_path, 'r') as f: epw_content = f.read() location = self.process_epw_file(epw_content, location_num, rcp, year) if location: self.add_location(location) climate_data_dict = location.to_dict() if self.validate_climate_data(climate_data_dict): session_state["climate_data"] = climate_data_dict st.success(f"Successfully extracted climate projection data for {location.city}, {location.country}, {rcp}, {year}!") logger.info(f"Successful processing projection of {climate_data_dict['id']}") session_state["active_tab"] = "General Information" # Set location and epw_data for immediate display epw_data = pd.DataFrame(np.nan, index=range(len(climate_data_dict["hourly_data"])), columns=range(35)) epw_data[1] = [d["month"] for d in climate_data_dict["hourly_data"]] epw_data[2] = [d["day"] for d in climate_data_dict["hourly_data"]] epw_data[3] = [d["hour"] for d in climate_data_dict["hourly_data"]] epw_data[6] = [d["dry_bulb"] for d in climate_data_dict["hourly_data"]] epw_data[8] = [d["relative_humidity"] for d in climate_data_dict["hourly_data"]] epw_data[9] = [d["atmospheric_pressure"] for d in climate_data_dict["hourly_data"]] epw_data[13] = [d["global_horizontal_radiation"] for d in climate_data_dict["hourly_data"]] epw_data[14] = [d["direct_normal_radiation"] for d in climate_data_dict["hourly_data"]] epw_data[15] = [d["diffuse_horizontal_radiation"] for d in climate_data_dict["hourly_data"]] epw_data[20] = [d["wind_direction"] for d in climate_data_dict["hourly_data"]] epw_data[21] = [d["wind_speed"] for d in climate_data_dict["hourly_data"]] else: st.warning(f"Climate projection data validation failed for {location.city}, {location.country}. Displaying data anyway.") logger.warning(f"Validation failed for {climate_data_dict['id']}") session_state["climate_data"] = climate_data_dict session_state["active_tab"] = "General Information" # Set location and epw_data for immediate display epw_data = pd.DataFrame(np.nan, index=range(len(climate_data_dict["hourly_data"])), columns=range(35)) epw_data[1] = [d["month"] for d in climate_data_dict["hourly_data"]] epw_data[2] = [d["day"] for d in climate_data_dict["hourly_data"]] epw_data[3] = [d["hour"] for d in climate_data_dict["hourly_data"]] epw_data[6] = [d["dry_bulb"] for d in climate_data_dict["hourly_data"]] epw_data[8] = [d["relative_humidity"] for d in climate_data_dict["hourly_data"]] epw_data[9] = [d["atmospheric_pressure"] for d in climate_data_dict["hourly_data"]] epw_data[13] = [d["global_horizontal_radiation"] for d in climate_data_dict["hourly_data"]] epw_data[14] = [d["direct_normal_radiation"] for d in climate_data_dict["hourly_data"]] epw_data[15] = [d["diffuse_horizontal_radiation"] for d in climate_data_dict["hourly_data"]] epw_data[20] = [d["wind_direction"] for d in climate_data_dict["hourly_data"]] epw_data[21] = [d["wind_speed"] for d in climate_data_dict["hourly_data"]] except Exception as e: st.error(f"Error reading {epw_file_path}: {str(e)}") logger.error(f"Error reading {epw_file_path}: {str(e)}") session_state["climate_data"] = {} except Exception as e: st.error(f"Error accessing directory au_cch/{location_num}/{rcp}/{year}/: {str(e)}") logger.error(f"Error accessing directory {file_path}: {str(e)}") # Other tabs if location is not None and epw_data is not None: with tabs[2]: self.plot_psychrometric_chart(location, epw_data) with tabs[3]: self.plot_sun_shading_chart(location) with tabs[4]: self.plot_temperature_range(location, epw_data) with tabs[5]: self.plot_wind_rose(epw_data) else: for i in range(2, len(tabs)): with tabs[i]: st.info("No climate data available. Please upload an EPW file or select a climate projection to proceed.") logger.info("No climate data to display in tab %s; prompting for EPW upload.", tab_names[i]) def display_design_conditions(self, location: ClimateLocation): """Display design conditions for HVAC calculations using styled HTML.""" st.subheader("Design Conditions") # Location Details st.markdown(f"""

Location Details

""", unsafe_allow_html=True) # Calculated Climate Parameters st.markdown(f"""

Calculated Climate Parameters

""", unsafe_allow_html=True) # Typical/Extreme Periods if location.typical_extreme_periods: period_items = [ f"
  • {key.replace('_', ' ').title()}: {period['start']['month']}/{period['start']['day']} to {period['end']['month']}/{period['end']['day']}
  • " for key, period in location.typical_extreme_periods.items() ] st.markdown(f"""

    Typical/Extreme Periods

    """, unsafe_allow_html=True) # Ground Temperatures (Table) if location.ground_temperatures: st.markdown('

    Ground Temperatures

    ', unsafe_allow_html=True) month_names = ["Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"] table_data = [] for depth, temps in location.ground_temperatures.items(): row = {"Depth (m)": float(depth)} row.update({month: f"{temp:.2f}" for month, temp in zip(month_names, temps)}) table_data.append(row) df = pd.DataFrame(table_data) st.dataframe(df, use_container_width=True) # Add download button for Ground Temperatures with unique key csv = df.to_csv(index=False) st.download_button( label="Download Ground Temperatures as CSV", data=csv, file_name=f"ground_temperatures_{location.city}_{location.country}.csv", mime="text/csv", key=f"download_ground_temperatures_{location.id}" # Unique key based on location.id ) # Hourly Data (Table) st.markdown('

    Hourly Climate Data

    ', unsafe_allow_html=True) hourly_table_data = [ { "Month": record["month"], "Day": record["day"], "Hour": record["hour"], "Dry Bulb Temp (°C)": f"{record['dry_bulb']:.1f}", "Relative Humidity (%)": f"{record['relative_humidity']:.1f}", "Atmospheric Pressure (Pa)": f"{record['atmospheric_pressure']:.1f}", "Global Horizontal Radiation (W/m²)": f"{record['global_horizontal_radiation']:.1f}", "Direct Normal Radiation (W/m²)": f"{record['direct_normal_radiation']:.1f}", "Diffuse Horizontal Radiation (W/m²)": f"{record['diffuse_horizontal_radiation']:.1f}", "Wind Speed (m/s)": f"{record['wind_speed']:.1f}", "Wind Direction (°)": f"{record['wind_direction']:.1f}" } for record in location.hourly_data ] hourly_df = pd.DataFrame(hourly_table_data) st.dataframe(hourly_df, use_container_width=True) # Add download button for Hourly Climate Data with unique key csv = hourly_df.to_csv(index=False) st.download_button( label="Download Hourly Climate Data as CSV", data=csv, file_name=f"hourly_climate_data_{location.city}_{location.country}.csv", mime="text/csv", key=f"download_hourly_climate_{location.id}" # Unique key based on location.id ) @staticmethod def assign_climate_zone(hdd: float, cdd: float, avg_humidity: float) -> str: """Assign ASHRAE 169 climate zone based on HDD, CDD, and humidity.""" if cdd > 10000: return "0A" if avg_humidity > 60 else "0B" elif cdd > 5000: return "1A" if avg_humidity > 60 else "1B" elif cdd > 2500: return "2A" if avg_humidity > 60 else "2B" elif hdd < 2000 and cdd > 1000: return "3A" if avg_humidity > 60 else "3B" if avg_humidity < 40 else "3C" elif hdd < 3000: return "4A" if avg_humidity > 60 else "4B" if avg_humidity < 40 else "4C" elif hdd < 4000: return "5A" if avg_humidity > 60 else "5B" if avg_humidity < 40 else "5C" elif hdd < 5000: return "6A" if avg_humidity > 60 else "6B" elif hdd < 7000: return "7" else: return "8" def plot_psychrometric_chart(self, location: ClimateLocation, epw_data: pd.DataFrame): """Plot psychrometric chart with ASHRAE 55 comfort zone and psychrometric lines.""" st.subheader("Psychrometric Chart") dry_bulb = pd.to_numeric(epw_data[6], errors='coerce').values humidity = pd.to_numeric(epw_data[8], errors='coerce').values valid_mask = ~np.isnan(dry_bulb) & ~np.isnan(humidity) dry_bulb = dry_bulb[valid_mask] humidity = humidity[valid_mask] # Calculate humidity ratio (kg/kg dry air) pressure = location.pressure / 1000 # kPa saturation_pressure = 6.1078 * 10 ** (7.5 * dry_bulb / (dry_bulb + 237.3)) vapor_pressure = humidity / 100 * saturation_pressure humidity_ratio = 0.62198 * vapor_pressure / (pressure - vapor_pressure) * 1000 # Convert to g/kg fig = go.Figure() # Hourly data points fig.add_trace(go.Scatter( x=dry_bulb, y=humidity_ratio, mode='markers', marker=dict(size=5, opacity=0.5, color='blue'), name='Hourly Conditions' )) # ASHRAE 55 comfort zone comfort_db = [20, 26, 26, 20, 20] comfort_rh = [30, 30, 60, 60, 30] comfort_vp = np.array(comfort_rh) / 100 * 6.1078 * 10 ** (7.5 * np.array(comfort_db) / (np.array(comfort_db) + 237.3)) comfort_hr = 0.62198 * comfort_vp / (pressure - comfort_vp) * 1000 fig.add_trace(go.Scatter( x=comfort_db, y=comfort_hr, mode='lines', line=dict(color='green', width=2), fill='toself', fillcolor='rgba(0, 255, 0, 0.2)', name='ASHRAE 55 Comfort Zone' )) # Constant humidity ratio lines for hr in [5, 10, 15]: db_range = np.linspace(0, 40, 100) vp = (hr / 1000 * pressure) / (0.62198 + hr / 1000) rh = vp / (6.1078 * 10 ** (7.5 * db_range / (db_range + 237.3))) * 100 hr_line = np.full_like(db_range, hr) fig.add_trace(go.Scatter( x=db_range, y=hr_line, mode='lines', line=dict(color='gray', width=1, dash='dash'), name=f'{hr} g/kg', showlegend=True )) # Constant wet-bulb temperature lines wet_bulb_temps = [10, 15, 20] for wbt in wet_bulb_temps: db_range = np.linspace(0, 40, 100) rh_range = np.linspace(5, 95, 100) wb_values = self.calculate_wet_bulb(db_range, rh_range) vp = rh_range / 100 * (6.1078 * 10 ** (7.5 * db_range / (db_range + 237.3))) hr_values = 0.62198 * vp / (pressure - vp) * 1000 mask = (wb_values >= wbt - 0.5) & (wb_values <= wbt + 0.5) if np.any(mask): fig.add_trace(go.Scatter( x=db_range[mask], y=hr_values[mask], mode='lines', line=dict(color='purple', width=1, dash='dot'), name=f'Wet-Bulb {wbt}°C', showlegend=True )) fig.update_layout( title="Psychrometric Chart", xaxis_title="Dry-Bulb Temperature (°C)", yaxis_title="Humidity Ratio (g/kg dry air)", xaxis=dict(range=[-5, 40]), yaxis=dict(range=[0, 25]), showlegend=True, template='plotly_white' ) st.plotly_chart(fig, use_container_width=True) def plot_sun_shading_chart(self, location: ClimateLocation): """Plot sun path chart for summer and winter solstices, inspired by Climate Consultant.""" st.subheader("Sun Shading Chart") dates = [ datetime(2025, 6, 21), # Winter solstice (Southern Hemisphere) datetime(2025, 12, 21) # Summer solstice (Southern Hemisphere) ] times = pd.date_range(start="2025-01-01 00:00", end="2025-01-01 23:00", freq='H') solar_data = [] for date in dates: solpos = pvlib.solarposition.get_solarposition( time=[date.replace(hour=t.hour, minute=t.minute) for t in times], latitude=location.latitude, longitude=location.longitude, altitude=location.elevation ) solar_data.append({ 'date': date.strftime('%Y-%m-%d'), 'azimuth': solpos['azimuth'].values, 'altitude': solpos['elevation'].values }) fig = go.Figure() colors = ['orange', 'blue'] labels = ['Summer Solstice (Dec 21)', 'Winter Solstice (Jun 21)'] for i, data in enumerate(solar_data): fig.add_trace(go.Scatterpolar( r=data['altitude'], theta=data['azimuth'], mode='lines+markers', name=labels[i], line=dict(color=colors[i], width=2), marker=dict(size=6, color=colors[i]), opacity=0.8 )) fig.update_layout( title="Sun Path Diagram", polar=dict( radialaxis=dict( range=[0, 90], tickvals=[0, 30, 60, 90], ticktext=["0°", "30°", "60°", "90°"], title="Altitude (degrees)" ), angularaxis=dict( direction="clockwise", rotation=90, tickvals=[0, 90, 180, 270], ticktext=["N", "E", "S", "W"] ) ), showlegend=True, template='plotly_white' ) st.plotly_chart(fig, use_container_width=True) def plot_temperature_range(self, location: ClimateLocation, epw_data: pd.DataFrame): """Plot monthly temperature ranges with design conditions.""" st.subheader("Monthly Temperature Range") months = pd.to_numeric(epw_data[1], errors='coerce').values dry_bulb = pd.to_numeric(epw_data[6], errors='coerce').values month_names = ["Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"] temps_min = [] temps_max = [] temps_avg = [] for i in range(1, 13): month_mask = (months == i) temps_min.append(round(np.nanmin(dry_bulb[month_mask]), 1)) temps_max.append(round(np.nanmax(dry_bulb[month_mask]), 1)) temps_avg.append(round(np.nanmean(dry_bulb[month_mask]), 1)) fig = go.Figure() fig.add_trace(go.Scatter( x=list(range(1, 13)), y=temps_max, mode='lines', name='Max Temperature', line=dict(color='red', dash='dash'), opacity=0.5 )) fig.add_trace(go.Scatter( x=list(range(1, 13)), y=temps_min, mode='lines', name='Min Temperature', line=dict(color='red', dash='dash'), opacity=0.5, fill='tonexty', fillcolor='rgba(255, 0, 0, 0.1)' )) fig.add_trace(go.Scatter( x=list(range(1, 13)), y=temps_avg, mode='lines+markers', name='Avg Temperature', line=dict(color='red'), marker=dict(size=8) )) # Add design temperatures fig.add_hline(y=location.winter_design_temp, line_dash="dot", line_color="blue", annotation_text="Winter Design Temp", annotation_position="top left") fig.add_hline(y=location.summer_design_temp_db, line_dash="dot", line_color="orange", annotation_text="Summer Design Temp (DB)", annotation_position="bottom left") fig.update_layout( title="Monthly Temperature Profile", xaxis_title="Month", yaxis_title="Temperature (°C)", xaxis=dict(tickmode='array', tickvals=list(range(1, 13)), ticktext=month_names), legend=dict(yanchor="top", y=0.99, xanchor="left", x=0.01), showlegend=True, template='plotly_white' ) st.plotly_chart(fig, use_container_width=True) def plot_wind_rose(self, epw_data: pd.DataFrame): """Plot wind rose diagram with improved clarity, inspired by Climate Consultant.""" st.subheader("Wind Rose") wind_speed = pd.to_numeric(epw_data[21], errors='coerce').values wind_direction = pd.to_numeric(epw_data[20], errors='coerce').values valid_mask = ~np.isnan(wind_speed) & ~np.isnan(wind_direction) wind_speed = wind_speed[valid_mask] wind_direction = wind_direction[valid_mask] # Bin data with 8 directions and tailored speed bins speed_bins = [0, 2, 4, 6, 8, np.inf] direction_bins = np.linspace(0, 360, 9)[:-1] speed_labels = ['0-2 m/s', '2-4 m/s', '4-6 m/s', '6-8 m/s', '8+ m/s'] direction_labels = ['N', 'NE', 'E', 'SE', 'S', 'SW', 'W', 'NW'] hist = np.histogram2d( wind_direction, wind_speed, bins=[direction_bins, speed_bins], density=True )[0] hist = hist * 100 # Convert to percentage fig = go.Figure() colors = ['#E6F0FF', '#B3D1FF', '#80B2FF', '#4D94FF', '#1A75FF'] for i, speed_label in enumerate(speed_labels): fig.add_trace(go.Barpolar( r=hist[:, i], theta=direction_bins, width=45, name=speed_label, marker=dict(color=colors[i]), opacity=0.8 )) fig.update_layout( title="Wind Rose", polar=dict( radialaxis=dict( tickvals=[0, 5, 10, 15], ticktext=["0%", "5%", "10%", "15%"], title="Frequency (%)" ), angularaxis=dict( direction="clockwise", rotation=90, tickvals=direction_bins, ticktext=direction_labels ) ), showlegend=True, template='plotly_white' ) st.plotly_chart(fig, use_container_width=True) def export_to_json(self, file_path: str) -> None: """Export all climate data to a JSON file.""" data = {loc_id: loc.to_dict() for loc_id, loc in self.locations.items()} with open(file_path, 'w') as f: json.dump(data, f, indent=4) @classmethod def from_json(cls, file_path: str) -> 'ClimateData': """Load climate data from a JSON file.""" with open(file_path, 'r') as f: data = json.load(f) climate_data = cls() for loc_id, loc_dict in data.items(): hourly_data = loc_dict["hourly_data"] epw_data = pd.DataFrame({ 1: [d["month"] for d in hourly_data], 2: [d["day"] for d in hourly_data], 3: [d["hour"] for d in hourly_data], 6: [d["dry_bulb"] for d in hourly_data], 8: [d["relative_humidity"] for d in hourly_data], 9: [d["atmospheric_pressure"] for d in hourly_data], 13: [d["global_horizontal_radiation"] for d in hourly_data], 14: [d["direct_normal_radiation"] for d in hourly_data], 15: [d["diffuse_horizontal_radiation"] for d in hourly_data], 20: [d["wind_direction"] for d in hourly_data], 21: [d["wind_speed"] for d in hourly_data], }) location = ClimateLocation( epw_file=epw_data, typical_extreme_periods=loc_dict["typical_extreme_periods"], ground_temperatures=loc_dict["ground_temperatures"], id=loc_dict["id"], country=loc_dict["country"], state_province=loc_dict["state_province"], city=loc_dict["city"], latitude=loc_dict["latitude"], longitude=loc_dict["longitude"], elevation=loc_dict["elevation"], time_zone=loc_dict["time_zone"], climate_zone=loc_dict["climate_zone"] ) location.hourly_data = loc_dict["hourly_data"] climate_data.add_location(location) return climate_data if __name__ == "__main__": climate_data = ClimateData() session_state = {"building_info": {"country": "Australia", "city": "Geelong"}, "page": "Climate Data"} climate_data.display_climate_input(session_state)