Spaces:
Sleeping
Sleeping
| """ | |
| ASHRAE 169 climate data module for HVAC Load Calculator. | |
| Extracts climate data from EPW files and provides visualizations inspired by Climate Consultant. | |
| Author: Dr Majed Abuseif | |
| Date: May 2025 | |
| Version: 2.1.6 | |
| """ | |
| from typing import Dict, List, Any, Optional | |
| import pandas as pd | |
| import numpy as np | |
| import os | |
| import json | |
| from dataclasses import dataclass | |
| import streamlit as st | |
| import plotly.graph_objects as go | |
| from io import StringIO | |
| import pvlib | |
| from datetime import datetime, timedelta | |
| import re | |
| import logging | |
| from os.path import join as os_join | |
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Define paths at module level
AU_CCH_DIR = "au_cch" # Relative path to au_cch folder from climate_data.py in data/ (e.g., au_cch/1/RCP2.6/2070/)
# CSS for consistent formatting, injected via st.markdown(STYLE, unsafe_allow_html=True)
# in display_climate_input below.
STYLE = """
<style>
.markdown-text {
font-family: Roboto, sans-serif;
font-size: 14px;
line-height: 1.5;
margin-bottom: 20px;
}
.markdown-text h3 {
font-size: 18px;
font-weight: bold;
margin-top: 20px;
margin-bottom: 10px;
}
.markdown-text ul {
list-style-type: disc;
padding-left: 20px;
margin: 0;
}
.markdown-text li {
margin-bottom: 8px;
}
.markdown-text strong {
font-weight: bold;
}
</style>
"""
# Location mapping from provided list
# Keys are location-number strings matched against the `location_num` argument of
# process_epw_file; values give the display city and Australian state/territory.
# NOTE(review): keys presumably correspond to the numbered au_cch subfolders
# (e.g. au_cch/1/RCP2.6/2070/) — confirm against the data directory layout.
LOCATION_MAPPING = {
"24": {"city": "Canberra", "state": "ACT"},
"11": {"city": "Coffs Harbour", "state": "NSW"},
"17": {"city": "Sydney RO (Observatory Hill)", "state": "NSW"},
"56": {"city": "Mascot (Sydney Airport)", "state": "NSW"},
"77": {"city": "Parramatta", "state": "NSW"},
"78": {"city": "Sub-Alpine (Cooma Airport)", "state": "NSW"},
"79": {"city": "Blue Mountains", "state": "NSW"},
"1": {"city": "Darwin", "state": "NT"},
"6": {"city": "Alice Springs", "state": "NT"},
"5": {"city": "Townsville", "state": "QLD"},
"7": {"city": "Rockhampton", "state": "QLD"},
"10": {"city": "Brisbane", "state": "QLD"},
"19": {"city": "Charleville", "state": "QLD"},
"32": {"city": "Cairns", "state": "QLD"},
"70": {"city": "Toowoomba", "state": "QLD"},
"16": {"city": "Adelaide", "state": "SA"},
"75": {"city": "Adelaide Coastal (AMO)", "state": "SA"},
"26": {"city": "Hobart", "state": "TAS"},
"21": {"city": "Melbourne RO", "state": "VIC"},
"27": {"city": "Mildura", "state": "VIC"},
"60": {"city": "Tullamarine (Melbourne Airport)", "state": "VIC"},
"63": {"city": "Warrnambool", "state": "VIC"},
"66": {"city": "Ballarat", "state": "VIC"},
"30": {"city": "Wyndham", "state": "WA"},
"52": {"city": "Swanbourne", "state": "WA"},
"58": {"city": "Albany", "state": "WA"},
"83": {"city": "Christmas Island", "state": "WA"}
}
class ClimateLocation:
    """Class representing a climate location with ASHRAE 169 data derived from EPW files.

    Header metadata arrives via **kwargs; design temperatures, degree days and
    hourly records are computed from the 8760-row EPW data section.
    """

    # Header metadata (from the EPW LOCATION line / LOCATION_MAPPING)
    id: str
    country: str
    state_province: str
    city: str
    latitude: float
    longitude: float
    elevation: float  # meters
    time_zone: float  # UTC offset in hours
    climate_zone: str  # ASHRAE 169 zone label, e.g. "3A"; may be reassigned after construction
    # Derived statistics (computed in __init__ from the hourly columns)
    heating_degree_days: float  # base 18°C
    cooling_degree_days: float  # base 18°C
    winter_design_temp: float  # 99.6% heating design temperature (°C)
    summer_design_temp_db: float  # 0.4% cooling design dry-bulb temperature (°C)
    summer_design_temp_wb: float  # 0.4% cooling design wet-bulb temperature (°C)
    summer_daily_range: float  # Mean daily temperature range in summer (°C)
    wind_speed: float  # Mean wind speed (m/s)
    pressure: float  # Mean atmospheric pressure (Pa)
    hourly_data: List[Dict]  # Hourly data for integration with main.py
    typical_extreme_periods: Dict[str, Dict]  # Typical/extreme periods (summer/winter)
    ground_temperatures: Dict[str, List[float]]  # Monthly ground temperatures by depth

    def __init__(self, epw_file: pd.DataFrame, typical_extreme_periods: Dict, ground_temperatures: Dict, **kwargs):
        """Initialize ClimateLocation with EPW file data and header information.

        Args:
            epw_file: EPW data section as a positional-column DataFrame
                (1=month, 2=day, 3=hour, 6=dry bulb, 8=RH, 9=pressure,
                13/14/15=global/direct/diffuse radiation, 20=wind direction,
                21=wind speed).
            typical_extreme_periods: Parsed TYPICAL/EXTREME PERIODS header.
            ground_temperatures: Parsed GROUND TEMPERATURES header.
            **kwargs: id, country, state_province, city, latitude, longitude,
                elevation, time_zone, climate_zone.
        """
        self.id = kwargs.get("id")
        self.country = kwargs.get("country")
        self.state_province = kwargs.get("state_province", "N/A")
        self.city = kwargs.get("city")
        self.latitude = kwargs.get("latitude")
        self.longitude = kwargs.get("longitude")
        self.elevation = kwargs.get("elevation")
        self.time_zone = kwargs.get("time_zone", 0.0)  # Default to 0.0 if not provided
        self.climate_zone = kwargs.get("climate_zone", "Unknown")  # Use provided climate_zone
        self.typical_extreme_periods = typical_extreme_periods
        self.ground_temperatures = ground_temperatures
        # Extract columns from EPW data
        months = pd.to_numeric(epw_file[1], errors='coerce').values
        days = pd.to_numeric(epw_file[2], errors='coerce').values
        hours = pd.to_numeric(epw_file[3], errors='coerce').values
        dry_bulb = pd.to_numeric(epw_file[6], errors='coerce').values
        humidity = pd.to_numeric(epw_file[8], errors='coerce').values
        pressure = pd.to_numeric(epw_file[9], errors='coerce').values
        global_radiation = pd.to_numeric(epw_file[13], errors='coerce').values
        direct_normal_radiation = pd.to_numeric(epw_file[14], errors='coerce').values
        diffuse_horizontal_radiation = pd.to_numeric(epw_file[15], errors='coerce').values
        wind_direction = pd.to_numeric(epw_file[20], errors='coerce').values
        wind_speed = pd.to_numeric(epw_file[21], errors='coerce').values
        # BUGFIX: mask extreme outliers (>50 m/s) as NaN instead of removing them.
        # Dropping elements shortened the array and misaligned (or IndexError'd)
        # the per-hour wind_speed[i] lookup in the hourly_data loop below; NaN
        # entries now fall through to that loop's 0.0 default, and np.nanmean
        # still excludes them exactly as the old filter did.
        wind_speed = np.where(wind_speed > 50, np.nan, wind_speed)
        if (wind_speed > 15).any():
            logger.warning(f"High wind speeds detected: {wind_speed[wind_speed > 15].tolist()}")
        # Calculate wet-bulb temperature (Stull 2011 approximation)
        wet_bulb = ClimateData.calculate_wet_bulb(dry_bulb, humidity)
        # Calculate design conditions: 0.4th percentile dry bulb is the 99.6%
        # heating design temp; 99.6th percentile is the 0.4% cooling design temp.
        self.winter_design_temp = round(np.nanpercentile(dry_bulb, 0.4), 1)
        self.summer_design_temp_db = round(np.nanpercentile(dry_bulb, 99.6), 1)
        self.summer_design_temp_wb = round(np.nanpercentile(wet_bulb, 99.6), 1)
        # Calculate degree days from daily-mean temperatures (assumes a full
        # 8760-hour year so the reshape to (-1, 24) is exact).
        daily_temps = np.nanmean(dry_bulb.reshape(-1, 24), axis=1)
        self.heating_degree_days = round(np.nansum(np.maximum(18 - daily_temps, 0)))
        self.cooling_degree_days = round(np.nansum(np.maximum(daily_temps - 18, 0)))
        # Calculate summer daily temperature range.
        # NOTE(review): the mask selects June-August, which is *winter* in the
        # Southern Hemisphere; for the Australian locations in LOCATION_MAPPING
        # Dec-Feb would be summer. Behavior preserved — confirm intent.
        summer_mask = (months >= 6) & (months <= 8)
        summer_temps = dry_bulb[summer_mask].reshape(-1, 24)
        self.summer_daily_range = round(np.nanmean(np.nanmax(summer_temps, axis=1) - np.nanmin(summer_temps, axis=1)), 1)
        # Calculate mean wind speed and pressure (NaN-aware)
        self.wind_speed = round(np.nanmean(wind_speed), 1)
        self.pressure = round(np.nanmean(pressure), 1)
        # Log wind speed diagnostics (nanmin/nanmax so masked outliers and
        # coercion NaNs don't turn the stats into nan)
        logger.info(f"Wind speed stats: min={np.nanmin(wind_speed):.1f}, max={np.nanmax(wind_speed):.1f}, mean={self.wind_speed:.1f}")
        # Store hourly data with enhanced fields
        self.hourly_data = []
        for i in range(len(months)):
            if np.isnan(months[i]) or np.isnan(days[i]) or np.isnan(hours[i]) or np.isnan(dry_bulb[i]):
                continue  # Skip records with missing critical fields
            record = {
                "month": int(months[i]),
                "day": int(days[i]),
                "hour": int(hours[i]),
                "dry_bulb": float(dry_bulb[i]),
                "relative_humidity": float(humidity[i]) if not np.isnan(humidity[i]) else 0.0,
                "atmospheric_pressure": float(pressure[i]) if not np.isnan(pressure[i]) else self.pressure,
                "global_horizontal_radiation": float(global_radiation[i]) if not np.isnan(global_radiation[i]) else 0.0,
                "direct_normal_radiation": float(direct_normal_radiation[i]) if not np.isnan(direct_normal_radiation[i]) else 0.0,
                "diffuse_horizontal_radiation": float(diffuse_horizontal_radiation[i]) if not np.isnan(diffuse_horizontal_radiation[i]) else 0.0,
                "wind_speed": float(wind_speed[i]) if not np.isnan(wind_speed[i]) else 0.0,
                "wind_direction": float(wind_direction[i]) if not np.isnan(wind_direction[i]) else 0.0
            }
            self.hourly_data.append(record)
        if len(self.hourly_data) != 8760:
            st.warning(f"Hourly data has {len(self.hourly_data)} records instead of 8760. Some records may have been excluded due to missing data.")

    def to_dict(self) -> Dict[str, Any]:
        """Convert the climate location to a dictionary."""
        return {
            "id": self.id,
            "country": self.country,
            "state_province": self.state_province,
            "city": self.city,
            "latitude": self.latitude,
            "longitude": self.longitude,
            "elevation": self.elevation,
            "time_zone": self.time_zone,
            "climate_zone": self.climate_zone,
            "heating_degree_days": self.heating_degree_days,
            "cooling_degree_days": self.cooling_degree_days,
            "winter_design_temp": self.winter_design_temp,
            "summer_design_temp_db": self.summer_design_temp_db,
            "summer_design_temp_wb": self.summer_design_temp_wb,
            "summer_daily_range": self.summer_daily_range,
            "wind_speed": self.wind_speed,
            "pressure": self.pressure,
            "hourly_data": self.hourly_data,
            "typical_extreme_periods": self.typical_extreme_periods,
            "ground_temperatures": self.ground_temperatures
        }
class ClimateData:
    """Manager for ASHRAE 169 climate locations parsed from EPW weather files."""

    def __init__(self):
        """Create an empty registry of climate locations.

        Attributes:
            locations: location id -> ClimateLocation.
            countries: sorted country names present in the registry.
            country_states: country -> state/province -> sorted city names.
        """
        self.locations, self.countries, self.country_states = {}, [], {}
| def add_location(self, location: ClimateLocation): | |
| """Add a new location to the dictionary.""" | |
| self.locations[location.id] = location | |
| self.countries = sorted(list(set(loc.country for loc in self.locations.values()))) | |
| self.country_states = self._group_locations_by_country_state() | |
| def _group_locations_by_country_state(self) -> Dict[str, Dict[str, List[str]]]: | |
| """Group locations by country and state/province.""" | |
| result = {} | |
| for loc in self.locations.values(): | |
| if loc.country not in result: | |
| result[loc.country] = {} | |
| if loc.state_province not in result[loc.country]: | |
| result[loc.country][loc.state_province] = [] | |
| result[loc.country][loc.state_province].append(loc.city) | |
| for country in result: | |
| for state in result[country]: | |
| result[country][state] = sorted(result[country][state]) | |
| return result | |
| def get_location_by_id(self, location_id: str, session_state: Dict[str, Any]) -> Optional[Dict[str, Any]]: | |
| """Retrieve climate data by ID from session state or locations.""" | |
| if "climate_data" in session_state and session_state["climate_data"].get("id") == location_id: | |
| return session_state["climate_data"] | |
| if location_id in self.locations: | |
| return self.locations[location_id].to_dict() | |
| return None | |
    # NOTE(review): defined without `self` — appears intended as a @staticmethod
    # (compare calculate_wet_bulb, invoked as ClimateData.calculate_wet_bulb).
    # Confirm call sites use ClimateData.validate_climate_data(data), not
    # self.validate_climate_data(data), before adding the decorator.
    def validate_climate_data(data: Dict[str, Any]) -> bool:
        """Validate climate data for required fields and ranges.

        Checks top-level fields, every hourly record, and (when present) the
        optional typical/extreme periods and ground temperatures. On any
        failure it reports via st.error + logger.warning and returns False;
        returns True only when everything passes.
        """
        # --- Required top-level fields must all be present ---
        required_fields = [
            "id", "country", "city", "latitude", "longitude", "elevation", "time_zone",
            "climate_zone", "heating_degree_days", "cooling_degree_days",
            "winter_design_temp", "summer_design_temp_db", "summer_design_temp_wb",
            "summer_daily_range", "wind_speed", "pressure", "hourly_data"
        ]
        for field in required_fields:
            if field not in data:
                st.error(f"Validation failed: Missing required field '{field}'")
                logger.warning(f"Validation failed: Missing field '{field}'")
                return False
        # --- Top-level range checks ---
        if not (-90 <= data["latitude"] <= 90 and -180 <= data["longitude"] <= 180):
            st.error("Validation failed: Invalid latitude or longitude")
            logger.warning("Validation failed: Invalid latitude or longitude")
            return False
        if data["elevation"] < 0:
            st.error("Validation failed: Negative elevation")
            logger.warning("Validation failed: Negative elevation")
            return False
        if not (-12 <= data["time_zone"] <= 14):
            st.error(f"Validation failed: Time zone {data['time_zone']} outside range (-12 to +14)")
            logger.warning(f"Validation failed: Time zone {data['time_zone']} outside range")
            return False
        # Climate zone must be one of the ASHRAE 169 labels
        if data["climate_zone"] not in ["0A", "0B", "1A", "1B", "2A", "2B", "3A", "3B", "3C", "4A", "4B", "4C", "5A", "5B", "5C", "6A", "6B", "7", "8"]:
            st.error(f"Validation failed: Invalid climate zone '{data['climate_zone']}'")
            logger.warning(f"Validation failed: Invalid climate zone '{data['climate_zone']}'")
            return False
        if not (data["heating_degree_days"] >= 0 and data["cooling_degree_days"] >= 0):
            st.error("Validation failed: Negative degree days")
            logger.warning("Validation failed: Negative degree days")
            return False
        if not (-50 <= data["winter_design_temp"] <= 20):
            st.error(f"Validation failed: Winter design temp {data['winter_design_temp']} outside range")
            logger.warning(f"Validation failed: Winter design temp {data['winter_design_temp']} outside range")
            return False
        if not (0 <= data["summer_design_temp_db"] <= 50 and 0 <= data["summer_design_temp_wb"] <= 40):
            st.error("Validation failed: Invalid summer design temperatures")
            logger.warning("Validation failed: Invalid summer design temperatures")
            return False
        if data["summer_daily_range"] < 0:
            st.error("Validation failed: Negative summer daily range")
            logger.warning("Validation failed: Negative summer daily range")
            return False
        if not (0 <= data["wind_speed"] <= 30):
            st.error(f"Validation failed: Wind speed {data['wind_speed']} outside range")
            logger.warning(f"Validation failed: Wind speed {data['wind_speed']} outside range")
            return False
        if not (80000 <= data["pressure"] <= 110000):
            st.error(f"Validation failed: Pressure {data['pressure']} outside range")
            logger.warning(f"Validation failed: Pressure {data['pressure']} outside range")
            return False
        # --- Hourly data: length, then per-record field ranges ---
        # 8700 (not 8760) tolerates a few records dropped for missing fields
        # during ClimateLocation construction.
        if not data["hourly_data"] or len(data["hourly_data"]) < 8700:
            st.error(f"Validation failed: Hourly data has {len(data['hourly_data'])} records, expected ~8760")
            logger.warning(f"Validation failed: Hourly data has {len(data['hourly_data'])} records")
            return False
        for record in data["hourly_data"]:
            if not (1 <= record["month"] <= 12):
                st.error(f"Validation failed: Invalid month {record['month']}")
                logger.warning(f"Validation failed: Invalid month {record['month']}")
                return False
            if not (1 <= record["day"] <= 31):
                st.error(f"Validation failed: Invalid day {record['day']}")
                logger.warning(f"Validation failed: Invalid day {record['day']}")
                return False
            # EPW hours run 1-24, not 0-23
            if not (1 <= record["hour"] <= 24):
                st.error(f"Validation failed: Invalid hour {record['hour']}")
                logger.warning(f"Validation failed: Invalid hour {record['hour']}")
                return False
            if not (-50 <= record["dry_bulb"] <= 50):
                st.error(f"Validation failed: Dry bulb {record['dry_bulb']} outside range")
                logger.warning(f"Validation failed: Dry bulb {record['dry_bulb']} outside range")
                return False
            if not (0 <= record["relative_humidity"] <= 100):
                st.error(f"Validation failed: Relative humidity {record['relative_humidity']} outside range")
                logger.warning(f"Validation failed: Relative humidity {record['relative_humidity']} outside range")
                return False
            if not (80000 <= record["atmospheric_pressure"] <= 110000):
                st.error(f"Validation failed: Atmospheric pressure {record['atmospheric_pressure']} outside range")
                logger.warning(f"Validation failed: Atmospheric pressure {record['atmospheric_pressure']} outside range")
                return False
            if not (0 <= record["global_horizontal_radiation"] <= 1200):
                st.error(f"Validation failed: Global radiation {record['global_horizontal_radiation']} outside range")
                logger.warning(f"Validation failed: Global radiation {record['global_horizontal_radiation']} outside range")
                return False
            if not (0 <= record["direct_normal_radiation"] <= 1200):
                st.error(f"Validation failed: Direct normal radiation {record['direct_normal_radiation']} outside range")
                logger.warning(f"Validation failed: Direct normal radiation {record['direct_normal_radiation']} outside range")
                return False
            if not (0 <= record["diffuse_horizontal_radiation"] <= 1200):
                st.error(f"Validation failed: Diffuse horizontal radiation {record['diffuse_horizontal_radiation']} outside range")
                logger.warning(f"Validation failed: Diffuse horizontal radiation {record['diffuse_horizontal_radiation']} outside range")
                return False
            if not (0 <= record["wind_speed"] <= 30):
                st.error(f"Validation failed: Wind speed {record['wind_speed']} outside range")
                logger.warning(f"Validation failed: Wind speed {record['wind_speed']} outside range")
                return False
            if not (0 <= record["wind_direction"] <= 360):
                st.error(f"Validation failed: Wind direction {record['wind_direction']} outside range")
                logger.warning(f"Validation failed: Wind direction {record['wind_direction']} outside range")
                return False
        # Validate typical/extreme periods (optional) — missing periods only warn,
        # but a present period with an invalid date fails validation.
        if "typical_extreme_periods" in data and data["typical_extreme_periods"]:
            expected_periods = ["summer_extreme", "summer_typical", "winter_extreme", "winter_typical"]
            missing_periods = [p for p in expected_periods if p not in data["typical_extreme_periods"]]
            if missing_periods:
                st.warning(f"Validation warning: Missing typical/extreme periods: {', '.join(missing_periods)}")
                logger.warning(f"Validation warning: Missing typical/extreme periods: {', '.join(missing_periods)}")
            for period in data["typical_extreme_periods"].values():
                for date in ["start", "end"]:
                    if not (1 <= period[date]["month"] <= 12 and 1 <= period[date]["day"] <= 31):
                        st.error(f"Validation failed: Invalid date in typical/extreme periods: {period[date]}")
                        logger.warning(f"Validation failed: Invalid date in typical/extreme periods: {period[date]}")
                        return False
        # Validate ground temperatures (optional): 12 monthly values per depth, each 0-50°C
        if "ground_temperatures" in data and data["ground_temperatures"]:
            for depth, temps in data["ground_temperatures"].items():
                if len(temps) != 12 or not all(0 <= t <= 50 for t in temps):
                    st.error(f"Validation failed: Invalid ground temperatures for depth {depth}")
                    logger.warning(f"Validation failed: Invalid ground temperatures for depth {depth}")
                    return False
        return True
| def calculate_wet_bulb(dry_bulb: np.ndarray, relative_humidity: np.ndarray) -> np.ndarray: | |
| """Calculate Wet Bulb Temperature using Stull (2011) approximation.""" | |
| db = np.array(dry_bulb, dtype=float) | |
| rh = np.array(relative_humidity, dtype=float) | |
| term1 = db * np.arctan(0.151977 * (rh + 8.313659)**0.5) | |
| term2 = np.arctan(db + rh) | |
| term3 = np.arctan(rh - 1.676331) | |
| term4 = 0.00391838 * rh**1.5 * np.arctan(0.023101 * rh) | |
| term5 = -4.686035 | |
| wet_bulb = term1 + term2 - term3 + term4 + term5 | |
| invalid_mask = (rh < 5) | (rh > 99) | (db < -20) | (db > 50) | np.isnan(db) | np.isnan(rh) | |
| wet_bulb[invalid_mask] = np.nan | |
| return wet_bulb | |
| def is_numeric(value: str) -> bool: | |
| """Check if a string can be converted to a number.""" | |
| try: | |
| float(value) | |
| return True | |
| except ValueError: | |
| return False | |
| def get_locations_by_state(self, state: str) -> List[Dict[str, str]]: | |
| """Get list of locations for a given state from LOCATION_MAPPING.""" | |
| return [ | |
| {"number": loc_num, "city": loc_info["city"]} | |
| for loc_num, loc_info in LOCATION_MAPPING.items() | |
| if loc_info["state"] == state | |
| ] | |
    def process_epw_file(self, epw_content: str, location_num: str, rcp: str, year: str) -> Optional[ClimateLocation]:
        """Process an EPW file content and return a ClimateLocation object.

        Parses the LOCATION header, the optional TYPICAL/EXTREME PERIODS and
        GROUND TEMPERATURES headers, and the 8760-row data section, then builds
        a ClimateLocation and assigns its climate zone. Returns None (after
        st.error) on any failure.

        Args:
            epw_content: Full EPW file text.
            location_num: Key into LOCATION_MAPPING used to override city/state.
            rcp: Climate scenario label, embedded in the location id.
            year: Projection year, embedded in the location id.

        NOTE(review): this body is nearly duplicated inside display_climate_input;
        consider having the upload path call this method instead.
        """
        try:
            epw_lines = epw_content.splitlines()
            # Parse header: LOCATION,city,state,country,source,WMO,lat,lon,tz,elev
            header = next(line for line in epw_lines if line.startswith("LOCATION"))
            header_parts = header.split(",")
            if len(header_parts) < 10:
                raise ValueError("Invalid LOCATION header: too few fields.")
            city = header_parts[1].strip() or "Unknown"
            city = re.sub(r'\..*', '', city)  # Clean city name (drop everything from the first '.')
            state_province = header_parts[2].strip() or "Unknown"
            country = header_parts[3].strip() or "Unknown"
            # Non-numeric header fields silently fall back to 0.0
            latitude = float(header_parts[6]) if header_parts[6].strip() and self.is_numeric(header_parts[6]) else 0.0
            longitude = float(header_parts[7]) if header_parts[7].strip() and self.is_numeric(header_parts[7]) else 0.0
            time_zone = float(header_parts[8]) if header_parts[8].strip() and self.is_numeric(header_parts[8]) else 0.0
            elevation = float(header_parts[9]) if header_parts[9].strip() and self.is_numeric(header_parts[9]) else 0.0
            logger.info("Parsed EPW header: city=%s, country=%s, latitude=%s, longitude=%s, time_zone=%s, elevation=%s",
                        city, country, latitude, longitude, time_zone, elevation)
            # Override city and state from LOCATION_MAPPING
            if location_num in LOCATION_MAPPING:
                city = LOCATION_MAPPING[location_num]["city"]
                state_province = LOCATION_MAPPING[location_num]["state"]
            # Parse TYPICAL/EXTREME PERIODS: 4 CSV fields per period
            # (name, type, start "m/d", end "m/d"); only the four named
            # summer/winter weeks are kept, under keys like "summer_extreme".
            typical_extreme_periods = {}
            date_pattern = r'^\d{1,2}\s*/\s*\d{1,2}$'
            for line in epw_lines:
                if line.startswith("TYPICAL/EXTREME PERIODS"):
                    parts = line.strip().split(',')
                    try:
                        num_periods = int(parts[1])
                    except ValueError:
                        st.warning("Invalid number of periods in TYPICAL/EXTREME PERIODS, skipping parsing.")
                        break
                    for i in range(num_periods):
                        try:
                            if len(parts) < 2 + i*4 + 4:
                                st.warning(f"Insufficient fields for period {i+1}, skipping.")
                                continue
                            period_name = parts[2 + i*4]
                            period_type = parts[3 + i*4]
                            start_date = parts[4 + i*4].strip()
                            end_date = parts[5 + i*4].strip()
                            if period_name in [
                                "Summer - Week Nearest Max Temperature For Period",
                                "Summer - Week Nearest Average Temperature For Period",
                                "Winter - Week Nearest Min Temperature For Period",
                                "Winter - Week Nearest Average Temperature For Period"
                            ]:
                                season = 'summer' if 'Summer' in period_name else 'winter'
                                # Max/Min weeks are "extreme"; Average weeks are "typical"
                                period_type = ('extreme' if 'Max' in period_name or 'Min' in period_name else 'typical')
                                key = f"{season}_{period_type}"
                                start_date_clean = re.sub(r'\s+', '', start_date)
                                end_date_clean = re.sub(r'\s+', '', end_date)
                                if not re.match(date_pattern, start_date) or not re.match(date_pattern, end_date):
                                    st.warning(f"Invalid date format for period {period_name}: {start_date} to {end_date}, skipping.")
                                    continue
                                start_month, start_day = map(int, start_date_clean.split('/'))
                                end_month, end_day = map(int, end_date_clean.split('/'))
                                typical_extreme_periods[key] = {
                                    "start": {"month": start_month, "day": start_day},
                                    "end": {"month": end_month, "day": end_day}
                                }
                        except (IndexError, ValueError) as e:
                            st.warning(f"Error parsing period {i+1}: {str(e)}, skipping.")
                            continue
                    break
            # Parse GROUND TEMPERATURES: 16 CSV fields per depth
            # (depth, 3 soil-property fields, then 12 monthly temperatures)
            ground_temperatures = {}
            for line in epw_lines:
                if line.startswith("GROUND TEMPERATURES"):
                    parts = line.strip().split(',')
                    try:
                        num_depths = int(parts[1])
                    except ValueError:
                        st.warning("Invalid number of depths in GROUND TEMPERATURES, skipping parsing.")
                        break
                    for i in range(num_depths):
                        try:
                            if len(parts) < 2 + i*16 + 16:
                                st.warning(f"Insufficient fields for ground temperature depth {i+1}, skipping.")
                                continue
                            depth = parts[2 + i*16]
                            temps = [float(t) for t in parts[6 + i*16:18 + i*16] if t.strip()]
                            if len(temps) != 12:
                                st.warning(f"Invalid number of temperatures for depth {depth}m, expected 12, got {len(temps)}, skipping.")
                                continue
                            ground_temperatures[depth] = temps
                        except (ValueError, IndexError) as e:
                            st.warning(f"Error parsing ground temperatures for depth {i+1}: {str(e)}, skipping.")
                            continue
                    break
            # Read data section: everything after the DATA PERIODS header line
            data_start_idx = next(i for i, line in enumerate(epw_lines) if line.startswith("DATA PERIODS")) + 1
            epw_data = pd.read_csv(StringIO("\n".join(epw_lines[data_start_idx:])), header=None, dtype=str)
            if len(epw_data) != 8760:
                raise ValueError(f"EPW file has {len(epw_data)} records, expected 8760.")
            # NOTE(review): accepts 32 or 35 columns but the message says
            # "expected 35"; display_climate_input's copy accepts only 35.
            # Confirm which is intended.
            if len(epw_data.columns) not in [32, 35]:
                raise ValueError(f"EPW file has {len(epw_data.columns)} columns, expected 35.")
            # Coerce the columns ClimateLocation reads; fail if any is entirely non-numeric
            for col in [1, 2, 3, 6, 8, 9, 13, 14, 15, 20, 21]:
                epw_data[col] = pd.to_numeric(epw_data[col], errors='coerce')
                if epw_data[col].isna().all():
                    raise ValueError(f"Column {col} contains only non-numeric or missing data.")
            # Calculate average humidity for climate zone assignment (default 50% if all missing)
            humidity = pd.to_numeric(epw_data[8], errors='coerce').values
            avg_humidity = float(np.nanmean(humidity)) if not np.all(np.isnan(humidity)) else 50.0
            logger.info("Calculated average humidity: %.1f%% for %s, %s", avg_humidity, city, country)
            # Create ClimateLocation; id encodes country initial, city prefix, scenario and year
            location = ClimateLocation(
                epw_file=epw_data,
                typical_extreme_periods=typical_extreme_periods,
                ground_temperatures=ground_temperatures,
                id=f"{country[:1].upper()}{city[:3].upper()}_{rcp}_{year}",
                country=country,
                state_province=state_province,
                city=city,
                latitude=latitude,
                longitude=longitude,
                elevation=elevation,
                time_zone=time_zone
            )
            # Assign climate zone from degree days + humidity
            # (assign_climate_zone is defined elsewhere in this module — not in view)
            try:
                climate_zone = self.assign_climate_zone(
                    hdd=location.heating_degree_days,
                    cdd=location.cooling_degree_days,
                    avg_humidity=avg_humidity
                )
                location.climate_zone = climate_zone
                logger.info("Assigned climate zone: %s for %s, %s", climate_zone, city, country)
            except Exception as e:
                st.warning(f"Failed to assign climate zone: {str(e)}. Using default 'Unknown'.")
                logger.error("Climate zone assignment error: %s", str(e))
                location.climate_zone = "Unknown"
            return location
        except Exception as e:
            # Broad catch is the UI boundary: report and return None rather than crash
            st.error(f"Error processing EPW file: {str(e)}. Ensure it has 8760 hourly records and correct format.")
            logger.error(f"EPW processing error: %s", str(e))
            return None
| def display_climate_input(self, session_state: Dict[str, Any]): | |
| """Display Streamlit interface for EPW upload and visualizations.""" | |
| st.title("Climate Data Analysis") | |
| # Apply consistent styling | |
| st.markdown(STYLE, unsafe_allow_html=True) | |
| # Clear invalid session_state["climate_data"] to prevent validation errors | |
| if "climate_data" in session_state and not all(key in session_state["climate_data"] for key in ["id", "country", "city"]): | |
| logger.warning("Invalid climate_data in session_state, clearing: %s", session_state["climate_data"]) | |
| session_state["climate_data"] = {} | |
| # Initialize active tab in session_state | |
| if "active_tab" not in session_state: | |
| session_state["active_tab"] = "General Information" | |
| # Define tabs, including new Climate Projection tab | |
| tab_names = [ | |
| "General Information", | |
| "Climate Projection", | |
| "Psychrometric Chart", | |
| "Sun Shading Chart", | |
| "Temperature Range", | |
| "Wind Rose" | |
| ] | |
| tabs = st.tabs(tab_names) | |
| # Initialize location and epw_data for display | |
| location = None | |
| epw_data = None | |
| # General Information tab: Handle EPW upload and display existing data | |
| with tabs[0]: | |
| uploaded_file = st.file_uploader("Upload EPW File", type=["epw"]) | |
| if uploaded_file: | |
| with st.spinner("Processing uploaded EPW file..."): | |
| try: | |
| # Process new EPW file | |
| epw_content = uploaded_file.read().decode("utf-8") | |
| epw_lines = epw_content.splitlines() | |
| # Parse header | |
| header = next(line for line in epw_lines if line.startswith("LOCATION")) | |
| header_parts = header.split(",") | |
| if len(header_parts) < 10: | |
| raise ValueError("Invalid LOCATION header: too few fields.") | |
| city = header_parts[1].strip() or "Unknown" | |
| city = re.sub(r'\..*', '', city) | |
| state_province = header_parts[2].strip() or "Unknown" | |
| country = header_parts[3].strip() or "Unknown" | |
| latitude = float(header_parts[6]) if header_parts[6].strip() and self.is_numeric(header_parts[6]) else 0.0 | |
| longitude = float(header_parts[7]) if header_parts[7].strip() and self.is_numeric(header_parts[7]) else 0.0 | |
| time_zone = float(header_parts[8]) if header_parts[8].strip() and self.is_numeric(header_parts[8]) else 0.0 | |
| elevation = float(header_parts[9]) if header_parts[9].strip() and self.is_numeric(header_parts[9]) else 0.0 | |
| logger.info("Parsed EPW header: city=%s, country=%s, latitude=%s, longitude=%s, time_zone=%s, elevation=%s", | |
| city, country, latitude, longitude, time_zone, elevation) | |
| # Parse TYPICAL/EXTREME PERIODS | |
| typical_extreme_periods = {} | |
| date_pattern = r'^\d{1,2}\s*/\s*\d{1,2}$' | |
| for line in epw_lines: | |
| if line.startswith("TYPICAL/EXTREME PERIODS"): | |
| parts = line.strip().split(',') | |
| try: | |
| num_periods = int(parts[1]) | |
| except ValueError: | |
| st.warning("Invalid number of periods in TYPICAL/EXTREME PERIODS, skipping parsing.") | |
| break | |
| for i in range(num_periods): | |
| try: | |
| if len(parts) < 2 + i*4 + 4: | |
| st.warning(f"Insufficient fields for period {i+1}, skipping.") | |
| continue | |
| period_name = parts[2 + i*4] | |
| period_type = parts[3 + i*4] | |
| start_date = parts[4 + i*4].strip() | |
| end_date = parts[5 + i*4].strip() | |
| if period_name in [ | |
| "Summer - Week Nearest Max Temperature For Period", | |
| "Summer - Week Nearest Average Temperature For Period", | |
| "Winter - Week Nearest Min Temperature For Period", | |
| "Winter - Week Nearest Average Temperature For Period" | |
| ]: | |
| season = 'summer' if 'Summer' in period_name else 'winter' | |
| period_type = ('extreme' if 'Max' in period_name or 'Min' in period_name else 'typical') | |
| key = f"{season}_{period_type}" | |
| start_date_clean = re.sub(r'\s+', '', start_date) | |
| end_date_clean = re.sub(r'\s+', '', end_date) | |
| if not re.match(date_pattern, start_date) or not re.match(date_pattern, end_date): | |
| st.warning(f"Invalid date format for period {period_name}: {start_date} to {end_date}, skipping.") | |
| continue | |
| start_month, start_day = map(int, start_date_clean.split('/')) | |
| end_month, end_day = map(int, end_date_clean.split('/')) | |
| typical_extreme_periods[key] = { | |
| "start": {"month": start_month, "day": start_day}, | |
| "end": {"month": end_month, "day": end_day} | |
| } | |
| except (IndexError, ValueError) as e: | |
| st.warning(f"Error parsing period {i+1}: {str(e)}, skipping.") | |
| continue | |
| break | |
| # Parse GROUND TEMPERATURES | |
| ground_temperatures = {} | |
| for line in epw_lines: | |
| if line.startswith("GROUND TEMPERATURES"): | |
| parts = line.strip().split(',') | |
| try: | |
| num_depths = int(parts[1]) | |
| except ValueError: | |
| st.warning("Invalid number of depths in GROUND TEMPERATURES, skipping parsing.") | |
| break | |
| for i in range(num_depths): | |
| try: | |
| if len(parts) < 2 + i*16 + 16: | |
| st.warning(f"Insufficient fields for ground temperature depth {i+1}, skipping.") | |
| continue | |
| depth = parts[2 + i*16] | |
| temps = [float(t) for t in parts[6 + i*16:18 + i*16] if t.strip()] | |
| if len(temps) != 12: | |
| st.warning(f"Invalid number of temperatures for depth {depth}m, expected 12, got {len(temps)}, skipping.") | |
| continue | |
| ground_temperatures[depth] = temps | |
| except (ValueError, IndexError) as e: | |
| st.warning(f"Error parsing ground temperatures for depth {i+1}: {str(e)}, skipping.") | |
| continue | |
| break | |
| # Read data section | |
| data_start_idx = next(i for i, line in enumerate(epw_lines) if line.startswith("DATA PERIODS")) + 1 | |
| epw_data = pd.read_csv(StringIO("\n".join(epw_lines[data_start_idx:])), header=None, dtype=str) | |
| if len(epw_data) != 8760: | |
| raise ValueError(f"EPW file has {len(epw_data)} records, expected 8760.") | |
| if len(epw_data.columns) != 35: | |
| raise ValueError(f"EPW file has {len(epw_data.columns)} columns, expected 35.") | |
| for col in [1, 2, 3, 6, 8, 9, 13, 14, 15, 20, 21]: | |
| epw_data[col] = pd.to_numeric(epw_data[col], errors='coerce') | |
| if epw_data[col].isna().all(): | |
| raise ValueError(f"Column {col} contains only non-numeric or missing data.") | |
| # Calculate average humidity for climate zone assignment | |
| humidity = pd.to_numeric(epw_data[8], errors='coerce').values | |
| avg_humidity = float(np.nanmean(humidity)) if not np.all(np.isnan(humidity)) else 50.0 | |
| logger.info("Calculated average humidity: %.1f%% for %s, %s", avg_humidity, city, country) | |
| # Create ClimateLocation with consistent ID | |
| location = ClimateLocation( | |
| epw_file=epw_data, | |
| typical_extreme_periods=typical_extreme_periods, | |
| ground_temperatures=ground_temperatures, | |
| id=f"{country[:1].upper()}{city[:3].upper()}_UPLOAD", | |
| country=country, | |
| state_province=state_province, | |
| city=city, | |
| latitude=latitude, | |
| longitude=longitude, | |
| elevation=elevation, | |
| time_zone=time_zone | |
| ) | |
| # Assign climate zone | |
| try: | |
| climate_zone = self.assign_climate_zone( | |
| hdd=location.heating_degree_days, | |
| cdd=location.cooling_degree_days, | |
| avg_humidity=avg_humidity | |
| ) | |
| location.climate_zone = climate_zone | |
| logger.info("Assigned climate zone: %s for %s, %s", climate_zone, city, country) | |
| except Exception as e: | |
| st.warning(f"Failed to assign climate zone: {str(e)}. Using default 'Unknown'.") | |
| logger.error("Climate zone assignment error: %s", str(e)) | |
| location.climate_zone = "Unknown" | |
| self.add_location(location) | |
| climate_data_dict = location.to_dict() | |
| session_state["climate_data"] = climate_data_dict | |
| if not self.validate_climate_data(climate_data_dict): | |
| st.warning(f"Climate data validation failed for {city}, {country}. Displaying data anyway.") | |
| logger.warning("Validation failed for new EPW data: %s", climate_data_dict["id"]) | |
| st.success("Climate data extracted from EPW file!") | |
| logger.info("Successfully processed EPW file and stored in session_state: %s", climate_data_dict["id"]) | |
| session_state["active_tab"] = "General Information" | |
| except Exception as e: | |
| st.error(f"Error processing EPW file: {str(e)}. Ensure it has 8760 hourly records and correct format.") | |
| logger.error(f"EPW processing error: %s", str(e)) | |
| session_state["climate_data"] = {} | |
| elif "climate_data" in session_state and session_state["climate_data"]: | |
| # Reconstruct from session_state | |
| climate_data_dict = session_state["climate_data"] | |
| logger.info("Attempting to reconstruct climate data from session_state: %s", climate_data_dict.get("id", "Unknown")) | |
| required_keys = ["id", "country", "city", "latitude", "longitude", "elevation", "time_zone", "climate_zone", "hourly_data"] | |
| missing_keys = [key for key in required_keys if key not in climate_data_dict] | |
| if missing_keys: | |
| st.warning(f"Invalid climate data in session state, missing keys: {', '.join(missing_keys)}. Please upload a new EPW file.") | |
| logger.warning("Missing keys in session_state.climate_data: %s", missing_keys) | |
| session_state["climate_data"] = {} | |
| else: | |
| if not self.validate_climate_data(climate_data_dict): | |
| st.warning(f"Stored climate data validation failed for {climate_data_dict.get('city', 'Unknown')}, {climate_data_dict.get('country', 'Unknown')}. Displaying data anyway.") | |
| logger.warning("Validation failed for session_state.climate_data: %s", climate_data_dict.get("id", "Unknown")) | |
| try: | |
| # Rebuild epw_data from hourly_data | |
| hourly_data = climate_data_dict["hourly_data"] | |
| epw_data = pd.DataFrame(np.nan, index=range(len(hourly_data)), columns=range(35)) | |
| epw_data[1] = [d["month"] for d in hourly_data] | |
| epw_data[2] = [d["day"] for d in hourly_data] | |
| epw_data[3] = [d["hour"] for d in hourly_data] | |
| epw_data[6] = [d["dry_bulb"] for d in hourly_data] | |
| epw_data[8] = [d["relative_humidity"] for d in hourly_data] | |
| epw_data[9] = [d["atmospheric_pressure"] for d in hourly_data] | |
| epw_data[13] = [d["global_horizontal_radiation"] for d in hourly_data] | |
| epw_data[14] = [d["direct_normal_radiation"] for d in hourly_data] | |
| epw_data[15] = [d["diffuse_horizontal_radiation"] for d in hourly_data] | |
| epw_data[20] = [d["wind_direction"] for d in hourly_data] | |
| epw_data[21] = [d["wind_speed"] for d in hourly_data] | |
| # Create ClimateLocation | |
| location = ClimateLocation( | |
| epw_file=epw_data, | |
| typical_extreme_periods=climate_data_dict.get("typical_extreme_periods", {}), | |
| ground_temperatures=climate_data_dict.get("ground_temperatures", {}), | |
| id=climate_data_dict["id"], | |
| country=climate_data_dict["country"], | |
| state_province=climate_data_dict.get("state_province", "N/A"), | |
| city=climate_data_dict["city"], | |
| latitude=climate_data_dict["latitude"], | |
| longitude=climate_data_dict["longitude"], | |
| elevation=climate_data_dict["elevation"], | |
| time_zone=climate_data_dict["time_zone"], | |
| climate_zone=climate_data_dict["climate_zone"] | |
| ) | |
| location.hourly_data = climate_data_dict["hourly_data"] | |
| self.add_location(location) | |
| st.info(f"Displaying previously extracted climate data for {climate_data_dict['city']}, {climate_data_dict['country']}.") | |
| logger.info("Successfully reconstructed climate data from session_state: %s", climate_data_dict["id"]) | |
| except Exception as e: | |
| st.error(f"Error reconstructing climate data: {str(e)}. Please upload a new EPW file.") | |
| logger.error(f"Reconstruction error: %s", str(e)) | |
| session_state["climate_data"] = {} | |
| # Display data if available | |
| if location is not None and epw_data is not None: | |
| self.display_design_conditions(location) | |
| # Climate Projection tab | |
| with tabs[1]: | |
| st.markdown(""" | |
| <div class="markdown-text"> | |
| <h3>Climate Projection</h3> | |
| <p>At this stage, this section is focused on some locations in Australia, and the provided data is based on "Projected weather files for building energy modelling" from CSIRO 2022.</p> | |
| </div> | |
| """, unsafe_allow_html=True) | |
| # Dropdown menus | |
| country = st.selectbox("Country", ["Australia"], key="projection_country") | |
| states = ["ACT", "NSW", "NT", "QLD", "SA", "TAS", "VIC", "WA"] | |
| state = st.selectbox("State", states, key="projection_state") | |
| # Get locations for selected state | |
| locations = self.get_locations_by_state(state) | |
| location_options = [f"{loc['city']} ({loc['number']})" for loc in locations] | |
| location_display = st.selectbox("Location", location_options, key="location") | |
| # Extract location number from selection | |
| location_num = "" | |
| if location_display: | |
| location_num = next(loc["number"] for loc in locations if f"{loc['city']} ({loc['number']})" == location_display) | |
| rcp_options = ["RCP2.6", "RCP4.5", "RCP8.5"] | |
| rcp = st.selectbox("RCP Scenario", rcp_options, key="rcp") | |
| year_options = ["2030", "2050", "2070", "2090"] | |
| year = st.selectbox("Year", year_options, key="year") | |
| if st.button("Extract Data"): | |
| with st.spinner("Extracting climate projection data..."): | |
| # Log AU_CCH_DIR for debugging | |
| logger.debug(f"AU_CCH_DIR set to: {os.path.abspath(AU_CCH_DIR)}") | |
| # Construct file path | |
| file_path = os_join(AU_CCH_DIR, location_num, rcp, year) | |
| logger.debug(f"Attempting to access directory: {os.path.abspath(file_path)}") | |
| if not os.path.exists(file_path): | |
| st.error(f"No directory found at au_cch/{location_num}/{rcp}/{year}/. In the Hugging Face Space 'mabuseif/Update-materials-solar', ensure the 'au_cch' folder is in the repository root alongside 'data' and 'utils', with the structure au_cch/{location_num}/{rcp}/{year} (e.g., au_cch/1/RCP2.6/2070/) containing a single .epw file (e.g., adelaide_rcp2.6_2070.epw).") | |
| logger.error(f"Directory does not exist: {file_path}") | |
| else: | |
| try: | |
| epw_files = [f for f in os.listdir(file_path) if f.endswith(".epw")] | |
| if not epw_files: | |
| st.error(f"No EPW file found in au_cch/{location_num}/{rcp}/{year}/. Please check that au_cch/{location_num}/{rcp}/{year}/ (e.g., au_cch/1/RCP2.6/{year}/) contains a single file with a .epw extension (e.g., adelaide_rcp2.6_2070.epw).") | |
| logger.error(f"No EPW file found in {file_path}") | |
| elif len(epw_files) > 1: | |
| st.error(f"Multiple EPW files found in au_cch/{location_num}/{rcp}/{year}/: {epw_files}. Please ensure exactly one .epw file per directory (e.g., au_cch/1/RCP2.6/{year}/).") | |
| logger.error(f"Multiple EPW files found: {epw_files}") | |
| else: | |
| epw_file_path = os_join(file_path, epw_files[0]) | |
| try: | |
| with open(epw_file_path, 'r') as f: | |
| epw_content = f.read() | |
| location = self.process_epw_file(epw_content, location_num, rcp, year) | |
| if location: | |
| self.add_location(location) | |
| climate_data_dict = location.to_dict() | |
| if self.validate_climate_data(climate_data_dict): | |
| session_state["climate_data"] = climate_data_dict | |
| st.success(f"Successfully extracted climate projection data for {location.city}, {location.country}, {rcp}, {year}!") | |
| logger.info(f"Successful processing projection of {climate_data_dict['id']}") | |
| session_state["active_tab"] = "General Information" | |
| # Set location and epw_data for immediate display | |
| epw_data = pd.DataFrame(np.nan, index=range(len(climate_data_dict["hourly_data"])), columns=range(35)) | |
| epw_data[1] = [d["month"] for d in climate_data_dict["hourly_data"]] | |
| epw_data[2] = [d["day"] for d in climate_data_dict["hourly_data"]] | |
| epw_data[3] = [d["hour"] for d in climate_data_dict["hourly_data"]] | |
| epw_data[6] = [d["dry_bulb"] for d in climate_data_dict["hourly_data"]] | |
| epw_data[8] = [d["relative_humidity"] for d in climate_data_dict["hourly_data"]] | |
| epw_data[9] = [d["atmospheric_pressure"] for d in climate_data_dict["hourly_data"]] | |
| epw_data[13] = [d["global_horizontal_radiation"] for d in climate_data_dict["hourly_data"]] | |
| epw_data[14] = [d["direct_normal_radiation"] for d in climate_data_dict["hourly_data"]] | |
| epw_data[15] = [d["diffuse_horizontal_radiation"] for d in climate_data_dict["hourly_data"]] | |
| epw_data[20] = [d["wind_direction"] for d in climate_data_dict["hourly_data"]] | |
| epw_data[21] = [d["wind_speed"] for d in climate_data_dict["hourly_data"]] | |
| else: | |
| st.warning(f"Climate projection data validation failed for {location.city}, {location.country}. Displaying data anyway.") | |
| logger.warning(f"Validation failed for {climate_data_dict['id']}") | |
| session_state["climate_data"] = climate_data_dict | |
| session_state["active_tab"] = "General Information" | |
| # Set location and epw_data for immediate display | |
| epw_data = pd.DataFrame(np.nan, index=range(len(climate_data_dict["hourly_data"])), columns=range(35)) | |
| epw_data[1] = [d["month"] for d in climate_data_dict["hourly_data"]] | |
| epw_data[2] = [d["day"] for d in climate_data_dict["hourly_data"]] | |
| epw_data[3] = [d["hour"] for d in climate_data_dict["hourly_data"]] | |
| epw_data[6] = [d["dry_bulb"] for d in climate_data_dict["hourly_data"]] | |
| epw_data[8] = [d["relative_humidity"] for d in climate_data_dict["hourly_data"]] | |
| epw_data[9] = [d["atmospheric_pressure"] for d in climate_data_dict["hourly_data"]] | |
| epw_data[13] = [d["global_horizontal_radiation"] for d in climate_data_dict["hourly_data"]] | |
| epw_data[14] = [d["direct_normal_radiation"] for d in climate_data_dict["hourly_data"]] | |
| epw_data[15] = [d["diffuse_horizontal_radiation"] for d in climate_data_dict["hourly_data"]] | |
| epw_data[20] = [d["wind_direction"] for d in climate_data_dict["hourly_data"]] | |
| epw_data[21] = [d["wind_speed"] for d in climate_data_dict["hourly_data"]] | |
| except Exception as e: | |
| st.error(f"Error reading {epw_file_path}: {str(e)}") | |
| logger.error(f"Error reading {epw_file_path}: {str(e)}") | |
| session_state["climate_data"] = {} | |
| except Exception as e: | |
| st.error(f"Error accessing directory au_cch/{location_num}/{rcp}/{year}/: {str(e)}") | |
| logger.error(f"Error accessing directory {file_path}: {str(e)}") | |
| # Other tabs | |
| if location is not None and epw_data is not None: | |
| with tabs[2]: | |
| self.plot_psychrometric_chart(location, epw_data) | |
| with tabs[3]: | |
| self.plot_sun_shading_chart(location) | |
| with tabs[4]: | |
| self.plot_temperature_range(location, epw_data) | |
| with tabs[5]: | |
| self.plot_wind_rose(epw_data) | |
| else: | |
| for i in range(2, len(tabs)): | |
| with tabs[i]: | |
| st.info("No climate data available. Please upload an EPW file or select a climate projection to proceed.") | |
| logger.info("No climate data to display in tab %s; prompting for EPW upload.", tab_names[i]) | |
    def display_design_conditions(self, location: ClimateLocation):
        """Display design conditions for HVAC calculations using styled HTML.

        Renders four sections into the current Streamlit container:
        location details, calculated climate parameters, typical/extreme
        periods (when present), and two data tables (ground temperatures and
        hourly climate data) each with its own CSV download button.

        Args:
            location: Fully populated ClimateLocation whose derived attributes
                (degree days, design temperatures, hourly_data, ...) have
                already been computed.
        """
        st.subheader("Design Conditions")
        # Location Details
        st.markdown(f"""
        <div class="markdown-text">
        <h3>Location Details</h3>
        <ul>
        <li><strong>Country:</strong> {location.country}</li>
        <li><strong>City:</strong> {location.city}</li>
        <li><strong>State/Province:</strong> {location.state_province}</li>
        <li><strong>Latitude:</strong> {location.latitude}°</li>
        <li><strong>Longitude:</strong> {location.longitude}°</li>
        <li><strong>Elevation:</strong> {location.elevation} m</li>
        <li><strong>Time Zone:</strong> {location.time_zone} hours (UTC)</li>
        </ul>
        </div>
        """, unsafe_allow_html=True)
        # Calculated Climate Parameters (degree days use base 18 °C; design
        # temperatures follow the ASHRAE percentile conventions shown below)
        st.markdown(f"""
        <div class="markdown-text">
        <h3>Calculated Climate Parameters</h3>
        <ul>
        <li><strong>Climate Zone:</strong> {location.climate_zone}</li>
        <li><strong>Heating Degree Days (base 18°C):</strong> {location.heating_degree_days} HDD</li>
        <li><strong>Cooling Degree Days (base 18°C):</strong> {location.cooling_degree_days} CDD</li>
        <li><strong>Winter Design Temperature (99.6%):</strong> {location.winter_design_temp} °C</li>
        <li><strong>Summer Design Dry-Bulb Temp (0.4%):</strong> {location.summer_design_temp_db} °C</li>
        <li><strong>Summer Design Wet-Bulb Temp (0.4%):</strong> {location.summer_design_temp_wb} °C</li>
        <li><strong>Summer Daily Temperature Range:</strong> {location.summer_daily_range} °C</li>
        <li><strong>Mean Wind Speed:</strong> {location.wind_speed} m/s</li>
        <li><strong>Mean Atmospheric Pressure:</strong> {location.pressure} Pa</li>
        </ul>
        </div>
        """, unsafe_allow_html=True)
        # Typical/Extreme Periods — only shown when the EPW header supplied
        # them; keys like "summer_extreme" are prettified to "Summer Extreme".
        if location.typical_extreme_periods:
            period_items = [
                f"<li><strong>{key.replace('_', ' ').title()}:</strong> {period['start']['month']}/{period['start']['day']} to {period['end']['month']}/{period['end']['day']}</li>"
                for key, period in location.typical_extreme_periods.items()
            ]
            st.markdown(f"""
            <div class="markdown-text">
            <h3>Typical/Extreme Periods</h3>
            <ul>
            {''.join(period_items)}
            </ul>
            </div>
            """, unsafe_allow_html=True)
        # Ground Temperatures (Table) — one row per depth, 12 monthly values.
        if location.ground_temperatures:
            st.markdown('<div class="markdown-text"><h3>Ground Temperatures</h3></div>', unsafe_allow_html=True)
            month_names = ["Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"]
            table_data = []
            for depth, temps in location.ground_temperatures.items():
                # Depth keys are strings parsed from the EPW header.
                row = {"Depth (m)": float(depth)}
                row.update({month: f"{temp:.2f}" for month, temp in zip(month_names, temps)})
                table_data.append(row)
            df = pd.DataFrame(table_data)
            st.dataframe(df, use_container_width=True)
            # Add download button for Ground Temperatures with unique key
            csv = df.to_csv(index=False)
            st.download_button(
                label="Download Ground Temperatures as CSV",
                data=csv,
                file_name=f"ground_temperatures_{location.city}_{location.country}.csv",
                mime="text/csv",
                key=f"download_ground_temperatures_{location.id}"  # Unique key based on location.id
            )
        # Hourly Data (Table) — values formatted to one decimal for display.
        st.markdown('<div class="markdown-text"><h3>Hourly Climate Data</h3></div>', unsafe_allow_html=True)
        hourly_table_data = [
            {
                "Month": record["month"],
                "Day": record["day"],
                "Hour": record["hour"],
                "Dry Bulb Temp (°C)": f"{record['dry_bulb']:.1f}",
                "Relative Humidity (%)": f"{record['relative_humidity']:.1f}",
                "Atmospheric Pressure (Pa)": f"{record['atmospheric_pressure']:.1f}",
                "Global Horizontal Radiation (W/m²)": f"{record['global_horizontal_radiation']:.1f}",
                "Direct Normal Radiation (W/m²)": f"{record['direct_normal_radiation']:.1f}",
                "Diffuse Horizontal Radiation (W/m²)": f"{record['diffuse_horizontal_radiation']:.1f}",
                "Wind Speed (m/s)": f"{record['wind_speed']:.1f}",
                "Wind Direction (°)": f"{record['wind_direction']:.1f}"
            }
            for record in location.hourly_data
        ]
        hourly_df = pd.DataFrame(hourly_table_data)
        st.dataframe(hourly_df, use_container_width=True)
        # Add download button for Hourly Climate Data with unique key
        csv = hourly_df.to_csv(index=False)
        st.download_button(
            label="Download Hourly Climate Data as CSV",
            data=csv,
            file_name=f"hourly_climate_data_{location.city}_{location.country}.csv",
            mime="text/csv",
            key=f"download_hourly_climate_{location.id}"  # Unique key based on location.id
        )
| def assign_climate_zone(hdd: float, cdd: float, avg_humidity: float) -> str: | |
| """Assign ASHRAE 169 climate zone based on HDD, CDD, and humidity.""" | |
| if cdd > 10000: | |
| return "0A" if avg_humidity > 60 else "0B" | |
| elif cdd > 5000: | |
| return "1A" if avg_humidity > 60 else "1B" | |
| elif cdd > 2500: | |
| return "2A" if avg_humidity > 60 else "2B" | |
| elif hdd < 2000 and cdd > 1000: | |
| return "3A" if avg_humidity > 60 else "3B" if avg_humidity < 40 else "3C" | |
| elif hdd < 3000: | |
| return "4A" if avg_humidity > 60 else "4B" if avg_humidity < 40 else "4C" | |
| elif hdd < 4000: | |
| return "5A" if avg_humidity > 60 else "5B" if avg_humidity < 40 else "5C" | |
| elif hdd < 5000: | |
| return "6A" if avg_humidity > 60 else "6B" | |
| elif hdd < 7000: | |
| return "7" | |
| else: | |
| return "8" | |
| def plot_psychrometric_chart(self, location: ClimateLocation, epw_data: pd.DataFrame): | |
| """Plot psychrometric chart with ASHRAE 55 comfort zone and psychrometric lines.""" | |
| st.subheader("Psychrometric Chart") | |
| dry_bulb = pd.to_numeric(epw_data[6], errors='coerce').values | |
| humidity = pd.to_numeric(epw_data[8], errors='coerce').values | |
| valid_mask = ~np.isnan(dry_bulb) & ~np.isnan(humidity) | |
| dry_bulb = dry_bulb[valid_mask] | |
| humidity = humidity[valid_mask] | |
| # Calculate humidity ratio (kg/kg dry air) | |
| pressure = location.pressure / 1000 # kPa | |
| saturation_pressure = 6.1078 * 10 ** (7.5 * dry_bulb / (dry_bulb + 237.3)) | |
| vapor_pressure = humidity / 100 * saturation_pressure | |
| humidity_ratio = 0.62198 * vapor_pressure / (pressure - vapor_pressure) * 1000 # Convert to g/kg | |
| fig = go.Figure() | |
| # Hourly data points | |
| fig.add_trace(go.Scatter( | |
| x=dry_bulb, | |
| y=humidity_ratio, | |
| mode='markers', | |
| marker=dict(size=5, opacity=0.5, color='blue'), | |
| name='Hourly Conditions' | |
| )) | |
| # ASHRAE 55 comfort zone | |
| comfort_db = [20, 26, 26, 20, 20] | |
| comfort_rh = [30, 30, 60, 60, 30] | |
| comfort_vp = np.array(comfort_rh) / 100 * 6.1078 * 10 ** (7.5 * np.array(comfort_db) / (np.array(comfort_db) + 237.3)) | |
| comfort_hr = 0.62198 * comfort_vp / (pressure - comfort_vp) * 1000 | |
| fig.add_trace(go.Scatter( | |
| x=comfort_db, | |
| y=comfort_hr, | |
| mode='lines', | |
| line=dict(color='green', width=2), | |
| fill='toself', | |
| fillcolor='rgba(0, 255, 0, 0.2)', | |
| name='ASHRAE 55 Comfort Zone' | |
| )) | |
| # Constant humidity ratio lines | |
| for hr in [5, 10, 15]: | |
| db_range = np.linspace(0, 40, 100) | |
| vp = (hr / 1000 * pressure) / (0.62198 + hr / 1000) | |
| rh = vp / (6.1078 * 10 ** (7.5 * db_range / (db_range + 237.3))) * 100 | |
| hr_line = np.full_like(db_range, hr) | |
| fig.add_trace(go.Scatter( | |
| x=db_range, | |
| y=hr_line, | |
| mode='lines', | |
| line=dict(color='gray', width=1, dash='dash'), | |
| name=f'{hr} g/kg', | |
| showlegend=True | |
| )) | |
| # Constant wet-bulb temperature lines | |
| wet_bulb_temps = [10, 15, 20] | |
| for wbt in wet_bulb_temps: | |
| db_range = np.linspace(0, 40, 100) | |
| rh_range = np.linspace(5, 95, 100) | |
| wb_values = self.calculate_wet_bulb(db_range, rh_range) | |
| vp = rh_range / 100 * (6.1078 * 10 ** (7.5 * db_range / (db_range + 237.3))) | |
| hr_values = 0.62198 * vp / (pressure - vp) * 1000 | |
| mask = (wb_values >= wbt - 0.5) & (wb_values <= wbt + 0.5) | |
| if np.any(mask): | |
| fig.add_trace(go.Scatter( | |
| x=db_range[mask], | |
| y=hr_values[mask], | |
| mode='lines', | |
| line=dict(color='purple', width=1, dash='dot'), | |
| name=f'Wet-Bulb {wbt}°C', | |
| showlegend=True | |
| )) | |
| fig.update_layout( | |
| title="Psychrometric Chart", | |
| xaxis_title="Dry-Bulb Temperature (°C)", | |
| yaxis_title="Humidity Ratio (g/kg dry air)", | |
| xaxis=dict(range=[-5, 40]), | |
| yaxis=dict(range=[0, 25]), | |
| showlegend=True, | |
| template='plotly_white' | |
| ) | |
| st.plotly_chart(fig, use_container_width=True) | |
| def plot_sun_shading_chart(self, location: ClimateLocation): | |
| """Plot sun path chart for summer and winter solstices, inspired by Climate Consultant.""" | |
| st.subheader("Sun Shading Chart") | |
| dates = [ | |
| datetime(2025, 6, 21), # Winter solstice (Southern Hemisphere) | |
| datetime(2025, 12, 21) # Summer solstice (Southern Hemisphere) | |
| ] | |
| times = pd.date_range(start="2025-01-01 00:00", end="2025-01-01 23:00", freq='H') | |
| solar_data = [] | |
| for date in dates: | |
| solpos = pvlib.solarposition.get_solarposition( | |
| time=[date.replace(hour=t.hour, minute=t.minute) for t in times], | |
| latitude=location.latitude, | |
| longitude=location.longitude, | |
| altitude=location.elevation | |
| ) | |
| solar_data.append({ | |
| 'date': date.strftime('%Y-%m-%d'), | |
| 'azimuth': solpos['azimuth'].values, | |
| 'altitude': solpos['elevation'].values | |
| }) | |
| fig = go.Figure() | |
| colors = ['orange', 'blue'] | |
| labels = ['Summer Solstice (Dec 21)', 'Winter Solstice (Jun 21)'] | |
| for i, data in enumerate(solar_data): | |
| fig.add_trace(go.Scatterpolar( | |
| r=data['altitude'], | |
| theta=data['azimuth'], | |
| mode='lines+markers', | |
| name=labels[i], | |
| line=dict(color=colors[i], width=2), | |
| marker=dict(size=6, color=colors[i]), | |
| opacity=0.8 | |
| )) | |
| fig.update_layout( | |
| title="Sun Path Diagram", | |
| polar=dict( | |
| radialaxis=dict( | |
| range=[0, 90], | |
| tickvals=[0, 30, 60, 90], | |
| ticktext=["0°", "30°", "60°", "90°"], | |
| title="Altitude (degrees)" | |
| ), | |
| angularaxis=dict( | |
| direction="clockwise", | |
| rotation=90, | |
| tickvals=[0, 90, 180, 270], | |
| ticktext=["N", "E", "S", "W"] | |
| ) | |
| ), | |
| showlegend=True, | |
| template='plotly_white' | |
| ) | |
| st.plotly_chart(fig, use_container_width=True) | |
| def plot_temperature_range(self, location: ClimateLocation, epw_data: pd.DataFrame): | |
| """Plot monthly temperature ranges with design conditions.""" | |
| st.subheader("Monthly Temperature Range") | |
| months = pd.to_numeric(epw_data[1], errors='coerce').values | |
| dry_bulb = pd.to_numeric(epw_data[6], errors='coerce').values | |
| month_names = ["Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"] | |
| temps_min = [] | |
| temps_max = [] | |
| temps_avg = [] | |
| for i in range(1, 13): | |
| month_mask = (months == i) | |
| temps_min.append(round(np.nanmin(dry_bulb[month_mask]), 1)) | |
| temps_max.append(round(np.nanmax(dry_bulb[month_mask]), 1)) | |
| temps_avg.append(round(np.nanmean(dry_bulb[month_mask]), 1)) | |
| fig = go.Figure() | |
| fig.add_trace(go.Scatter( | |
| x=list(range(1, 13)), | |
| y=temps_max, | |
| mode='lines', | |
| name='Max Temperature', | |
| line=dict(color='red', dash='dash'), | |
| opacity=0.5 | |
| )) | |
| fig.add_trace(go.Scatter( | |
| x=list(range(1, 13)), | |
| y=temps_min, | |
| mode='lines', | |
| name='Min Temperature', | |
| line=dict(color='red', dash='dash'), | |
| opacity=0.5, | |
| fill='tonexty', | |
| fillcolor='rgba(255, 0, 0, 0.1)' | |
| )) | |
| fig.add_trace(go.Scatter( | |
| x=list(range(1, 13)), | |
| y=temps_avg, | |
| mode='lines+markers', | |
| name='Avg Temperature', | |
| line=dict(color='red'), | |
| marker=dict(size=8) | |
| )) | |
| # Add design temperatures | |
| fig.add_hline(y=location.winter_design_temp, line_dash="dot", line_color="blue", annotation_text="Winter Design Temp", annotation_position="top left") | |
| fig.add_hline(y=location.summer_design_temp_db, line_dash="dot", line_color="orange", annotation_text="Summer Design Temp (DB)", annotation_position="bottom left") | |
| fig.update_layout( | |
| title="Monthly Temperature Profile", | |
| xaxis_title="Month", | |
| yaxis_title="Temperature (°C)", | |
| xaxis=dict(tickmode='array', tickvals=list(range(1, 13)), ticktext=month_names), | |
| legend=dict(yanchor="top", y=0.99, xanchor="left", x=0.01), | |
| showlegend=True, | |
| template='plotly_white' | |
| ) | |
| st.plotly_chart(fig, use_container_width=True) | |
| def plot_wind_rose(self, epw_data: pd.DataFrame): | |
| """Plot wind rose diagram with improved clarity, inspired by Climate Consultant.""" | |
| st.subheader("Wind Rose") | |
| wind_speed = pd.to_numeric(epw_data[21], errors='coerce').values | |
| wind_direction = pd.to_numeric(epw_data[20], errors='coerce').values | |
| valid_mask = ~np.isnan(wind_speed) & ~np.isnan(wind_direction) | |
| wind_speed = wind_speed[valid_mask] | |
| wind_direction = wind_direction[valid_mask] | |
| # Bin data with 8 directions and tailored speed bins | |
| speed_bins = [0, 2, 4, 6, 8, np.inf] | |
| direction_bins = np.linspace(0, 360, 9)[:-1] | |
| speed_labels = ['0-2 m/s', '2-4 m/s', '4-6 m/s', '6-8 m/s', '8+ m/s'] | |
| direction_labels = ['N', 'NE', 'E', 'SE', 'S', 'SW', 'W', 'NW'] | |
| hist = np.histogram2d( | |
| wind_direction, wind_speed, | |
| bins=[direction_bins, speed_bins], | |
| density=True | |
| )[0] | |
| hist = hist * 100 # Convert to percentage | |
| fig = go.Figure() | |
| colors = ['#E6F0FF', '#B3D1FF', '#80B2FF', '#4D94FF', '#1A75FF'] | |
| for i, speed_label in enumerate(speed_labels): | |
| fig.add_trace(go.Barpolar( | |
| r=hist[:, i], | |
| theta=direction_bins, | |
| width=45, | |
| name=speed_label, | |
| marker=dict(color=colors[i]), | |
| opacity=0.8 | |
| )) | |
| fig.update_layout( | |
| title="Wind Rose", | |
| polar=dict( | |
| radialaxis=dict( | |
| tickvals=[0, 5, 10, 15], | |
| ticktext=["0%", "5%", "10%", "15%"], | |
| title="Frequency (%)" | |
| ), | |
| angularaxis=dict( | |
| direction="clockwise", | |
| rotation=90, | |
| tickvals=direction_bins, | |
| ticktext=direction_labels | |
| ) | |
| ), | |
| showlegend=True, | |
| template='plotly_white' | |
| ) | |
| st.plotly_chart(fig, use_container_width=True) | |
| def export_to_json(self, file_path: str) -> None: | |
| """Export all climate data to a JSON file.""" | |
| data = {loc_id: loc.to_dict() for loc_id, loc in self.locations.items()} | |
| with open(file_path, 'w') as f: | |
| json.dump(data, f, indent=4) | |
| def from_json(cls, file_path: str) -> 'ClimateData': | |
| """Load climate data from a JSON file.""" | |
| with open(file_path, 'r') as f: | |
| data = json.load(f) | |
| climate_data = cls() | |
| for loc_id, loc_dict in data.items(): | |
| hourly_data = loc_dict["hourly_data"] | |
| epw_data = pd.DataFrame({ | |
| 1: [d["month"] for d in hourly_data], | |
| 2: [d["day"] for d in hourly_data], | |
| 3: [d["hour"] for d in hourly_data], | |
| 6: [d["dry_bulb"] for d in hourly_data], | |
| 8: [d["relative_humidity"] for d in hourly_data], | |
| 9: [d["atmospheric_pressure"] for d in hourly_data], | |
| 13: [d["global_horizontal_radiation"] for d in hourly_data], | |
| 14: [d["direct_normal_radiation"] for d in hourly_data], | |
| 15: [d["diffuse_horizontal_radiation"] for d in hourly_data], | |
| 20: [d["wind_direction"] for d in hourly_data], | |
| 21: [d["wind_speed"] for d in hourly_data], | |
| }) | |
| location = ClimateLocation( | |
| epw_file=epw_data, | |
| typical_extreme_periods=loc_dict["typical_extreme_periods"], | |
| ground_temperatures=loc_dict["ground_temperatures"], | |
| id=loc_dict["id"], | |
| country=loc_dict["country"], | |
| state_province=loc_dict["state_province"], | |
| city=loc_dict["city"], | |
| latitude=loc_dict["latitude"], | |
| longitude=loc_dict["longitude"], | |
| elevation=loc_dict["elevation"], | |
| time_zone=loc_dict["time_zone"], | |
| climate_zone=loc_dict["climate_zone"] | |
| ) | |
| location.hourly_data = loc_dict["hourly_data"] | |
| climate_data.add_location(location) | |
| return climate_data | |
if __name__ == "__main__":
    # Manual smoke test: render the climate input UI with a stub session state.
    demo_state = {
        "building_info": {"country": "Australia", "city": "Geelong"},
        "page": "Climate Data",
    }
    ClimateData().display_climate_input(demo_state)