Update-materials-solar / data /climate_data.py
mabuseif's picture
Upload climate_data.py
2b900dd verified
"""
ASHRAE 169 climate data module for HVAC Load Calculator.
Extracts climate data from EPW files and provides visualizations inspired by Climate Consultant.
Author: Dr Majed Abuseif
Date: May 2025
Version: 2.1.6
"""
from typing import Dict, List, Any, Optional
import pandas as pd
import numpy as np
import os
import json
from dataclasses import dataclass
import streamlit as st
import plotly.graph_objects as go
from io import StringIO
import pvlib
from datetime import datetime, timedelta
import re
import logging
from os.path import join as os_join
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Define paths at module level
AU_CCH_DIR = "au_cch" # Relative path to au_cch folder from climate_data.py in data/ (e.g., au_cch/1/RCP2.6/2070/)
# CSS for consistent formatting
STYLE = """
<style>
.markdown-text {
font-family: Roboto, sans-serif;
font-size: 14px;
line-height: 1.5;
margin-bottom: 20px;
}
.markdown-text h3 {
font-size: 18px;
font-weight: bold;
margin-top: 20px;
margin-bottom: 10px;
}
.markdown-text ul {
list-style-type: disc;
padding-left: 20px;
margin: 0;
}
.markdown-text li {
margin-bottom: 8px;
}
.markdown-text strong {
font-weight: bold;
}
</style>
"""
# Location mapping from provided list
LOCATION_MAPPING = {
"24": {"city": "Canberra", "state": "ACT"},
"11": {"city": "Coffs Harbour", "state": "NSW"},
"17": {"city": "Sydney RO (Observatory Hill)", "state": "NSW"},
"56": {"city": "Mascot (Sydney Airport)", "state": "NSW"},
"77": {"city": "Parramatta", "state": "NSW"},
"78": {"city": "Sub-Alpine (Cooma Airport)", "state": "NSW"},
"79": {"city": "Blue Mountains", "state": "NSW"},
"1": {"city": "Darwin", "state": "NT"},
"6": {"city": "Alice Springs", "state": "NT"},
"5": {"city": "Townsville", "state": "QLD"},
"7": {"city": "Rockhampton", "state": "QLD"},
"10": {"city": "Brisbane", "state": "QLD"},
"19": {"city": "Charleville", "state": "QLD"},
"32": {"city": "Cairns", "state": "QLD"},
"70": {"city": "Toowoomba", "state": "QLD"},
"16": {"city": "Adelaide", "state": "SA"},
"75": {"city": "Adelaide Coastal (AMO)", "state": "SA"},
"26": {"city": "Hobart", "state": "TAS"},
"21": {"city": "Melbourne RO", "state": "VIC"},
"27": {"city": "Mildura", "state": "VIC"},
"60": {"city": "Tullamarine (Melbourne Airport)", "state": "VIC"},
"63": {"city": "Warrnambool", "state": "VIC"},
"66": {"city": "Ballarat", "state": "VIC"},
"30": {"city": "Wyndham", "state": "WA"},
"52": {"city": "Swanbourne", "state": "WA"},
"58": {"city": "Albany", "state": "WA"},
"83": {"city": "Christmas Island", "state": "WA"}
}
@dataclass
class ClimateLocation:
"""Class representing a climate location with ASHRAE 169 data derived from EPW files."""
id: str
country: str
state_province: str
city: str
latitude: float
longitude: float
elevation: float # meters
time_zone: float # UTC offset in hours
climate_zone: str
heating_degree_days: float # base 18°C
cooling_degree_days: float # base 18°C
winter_design_temp: float # 99.6% heating design temperature (°C)
summer_design_temp_db: float # 0.4% cooling design dry-bulb temperature (°C)
summer_design_temp_wb: float # 0.4% cooling design wet-bulb temperature (°C)
summer_daily_range: float # Mean daily temperature range in summer (°C)
wind_speed: float # Mean wind speed (m/s)
pressure: float # Mean atmospheric pressure (Pa)
hourly_data: List[Dict] # Hourly data for integration with main.py
typical_extreme_periods: Dict[str, Dict] # Typical/extreme periods (summer/winter)
ground_temperatures: Dict[str, List[float]] # Monthly ground temperatures by depth
def __init__(self, epw_file: pd.DataFrame, typical_extreme_periods: Dict, ground_temperatures: Dict, **kwargs):
"""Initialize ClimateLocation with EPW file data and header information."""
self.id = kwargs.get("id")
self.country = kwargs.get("country")
self.state_province = kwargs.get("state_province", "N/A")
self.city = kwargs.get("city")
self.latitude = kwargs.get("latitude")
self.longitude = kwargs.get("longitude")
self.elevation = kwargs.get("elevation")
self.time_zone = kwargs.get("time_zone", 0.0) # Default to 0.0 if not provided
self.climate_zone = kwargs.get("climate_zone", "Unknown") # Use provided climate_zone
self.typical_extreme_periods = typical_extreme_periods
self.ground_temperatures = ground_temperatures
# Extract columns from EPW data
months = pd.to_numeric(epw_file[1], errors='coerce').values
days = pd.to_numeric(epw_file[2], errors='coerce').values
hours = pd.to_numeric(epw_file[3], errors='coerce').values
dry_bulb = pd.to_numeric(epw_file[6], errors='coerce').values
humidity = pd.to_numeric(epw_file[8], errors='coerce').values
pressure = pd.to_numeric(epw_file[9], errors='coerce').values
global_radiation = pd.to_numeric(epw_file[13], errors='coerce').values
direct_normal_radiation = pd.to_numeric(epw_file[14], errors='coerce').values
diffuse_horizontal_radiation = pd.to_numeric(epw_file[15], errors='coerce').values
wind_direction = pd.to_numeric(epw_file[20], errors='coerce').values
wind_speed = pd.to_numeric(epw_file[21], errors='coerce').values
# Filter wind speed outliers and log high values
wind_speed = wind_speed[wind_speed <= 50] # Remove extreme outliers
if (wind_speed > 15).any():
logger.warning(f"High wind speeds detected: {wind_speed[wind_speed > 15].tolist()}")
# Calculate wet-bulb temperature
wet_bulb = ClimateData.calculate_wet_bulb(dry_bulb, humidity)
# Calculate design conditions
self.winter_design_temp = round(np.nanpercentile(dry_bulb, 0.4), 1)
self.summer_design_temp_db = round(np.nanpercentile(dry_bulb, 99.6), 1)
self.summer_design_temp_wb = round(np.nanpercentile(wet_bulb, 99.6), 1)
# Calculate degree days
daily_temps = np.nanmean(dry_bulb.reshape(-1, 24), axis=1)
self.heating_degree_days = round(np.nansum(np.maximum(18 - daily_temps, 0)))
self.cooling_degree_days = round(np.nansum(np.maximum(daily_temps - 18, 0)))
# Calculate summer daily temperature range (June–August, Southern Hemisphere)
summer_mask = (months >= 6) & (months <= 8)
summer_temps = dry_bulb[summer_mask].reshape(-1, 24)
self.summer_daily_range = round(np.nanmean(np.nanmax(summer_temps, axis=1) - np.nanmin(summer_temps, axis=1)), 1)
# Calculate mean wind speed and pressure
self.wind_speed = round(np.nanmean(wind_speed), 1)
self.pressure = round(np.nanmean(pressure), 1)
# Log wind speed diagnostics
logger.info(f"Wind speed stats: min={wind_speed.min():.1f}, max={wind_speed.max():.1f}, mean={self.wind_speed:.1f}")
# Store hourly data with enhanced fields
self.hourly_data = []
for i in range(len(months)):
if np.isnan(months[i]) or np.isnan(days[i]) or np.isnan(hours[i]) or np.isnan(dry_bulb[i]):
continue # Skip records with missing critical fields
record = {
"month": int(months[i]),
"day": int(days[i]),
"hour": int(hours[i]),
"dry_bulb": float(dry_bulb[i]),
"relative_humidity": float(humidity[i]) if not np.isnan(humidity[i]) else 0.0,
"atmospheric_pressure": float(pressure[i]) if not np.isnan(pressure[i]) else self.pressure,
"global_horizontal_radiation": float(global_radiation[i]) if not np.isnan(global_radiation[i]) else 0.0,
"direct_normal_radiation": float(direct_normal_radiation[i]) if not np.isnan(direct_normal_radiation[i]) else 0.0,
"diffuse_horizontal_radiation": float(diffuse_horizontal_radiation[i]) if not np.isnan(diffuse_horizontal_radiation[i]) else 0.0,
"wind_speed": float(wind_speed[i]) if not np.isnan(wind_speed[i]) else 0.0,
"wind_direction": float(wind_direction[i]) if not np.isnan(wind_direction[i]) else 0.0
}
self.hourly_data.append(record)
if len(self.hourly_data) != 8760:
st.warning(f"Hourly data has {len(self.hourly_data)} records instead of 8760. Some records may have been excluded due to missing data.")
def to_dict(self) -> Dict[str, Any]:
"""Convert the climate location to a dictionary."""
return {
"id": self.id,
"country": self.country,
"state_province": self.state_province,
"city": self.city,
"latitude": self.latitude,
"longitude": self.longitude,
"elevation": self.elevation,
"time_zone": self.time_zone,
"climate_zone": self.climate_zone,
"heating_degree_days": self.heating_degree_days,
"cooling_degree_days": self.cooling_degree_days,
"winter_design_temp": self.winter_design_temp,
"summer_design_temp_db": self.summer_design_temp_db,
"summer_design_temp_wb": self.summer_design_temp_wb,
"summer_daily_range": self.summer_daily_range,
"wind_speed": self.wind_speed,
"pressure": self.pressure,
"hourly_data": self.hourly_data,
"typical_extreme_periods": self.typical_extreme_periods,
"ground_temperatures": self.ground_temperatures
}
class ClimateData:
"""Class for managing ASHRAE 169 climate data from EPW files."""
def __init__(self):
"""Initialize climate data."""
self.locations = {}
self.countries = []
self.country_states = {}
def add_location(self, location: ClimateLocation):
"""Add a new location to the dictionary."""
self.locations[location.id] = location
self.countries = sorted(list(set(loc.country for loc in self.locations.values())))
self.country_states = self._group_locations_by_country_state()
def _group_locations_by_country_state(self) -> Dict[str, Dict[str, List[str]]]:
"""Group locations by country and state/province."""
result = {}
for loc in self.locations.values():
if loc.country not in result:
result[loc.country] = {}
if loc.state_province not in result[loc.country]:
result[loc.country][loc.state_province] = []
result[loc.country][loc.state_province].append(loc.city)
for country in result:
for state in result[country]:
result[country][state] = sorted(result[country][state])
return result
def get_location_by_id(self, location_id: str, session_state: Dict[str, Any]) -> Optional[Dict[str, Any]]:
"""Retrieve climate data by ID from session state or locations."""
if "climate_data" in session_state and session_state["climate_data"].get("id") == location_id:
return session_state["climate_data"]
if location_id in self.locations:
return self.locations[location_id].to_dict()
return None
@staticmethod
def validate_climate_data(data: Dict[str, Any]) -> bool:
"""Validate climate data for required fields and ranges."""
required_fields = [
"id", "country", "city", "latitude", "longitude", "elevation", "time_zone",
"climate_zone", "heating_degree_days", "cooling_degree_days",
"winter_design_temp", "summer_design_temp_db", "summer_design_temp_wb",
"summer_daily_range", "wind_speed", "pressure", "hourly_data"
]
for field in required_fields:
if field not in data:
st.error(f"Validation failed: Missing required field '{field}'")
logger.warning(f"Validation failed: Missing field '{field}'")
return False
if not (-90 <= data["latitude"] <= 90 and -180 <= data["longitude"] <= 180):
st.error("Validation failed: Invalid latitude or longitude")
logger.warning("Validation failed: Invalid latitude or longitude")
return False
if data["elevation"] < 0:
st.error("Validation failed: Negative elevation")
logger.warning("Validation failed: Negative elevation")
return False
if not (-12 <= data["time_zone"] <= 14):
st.error(f"Validation failed: Time zone {data['time_zone']} outside range (-12 to +14)")
logger.warning(f"Validation failed: Time zone {data['time_zone']} outside range")
return False
if data["climate_zone"] not in ["0A", "0B", "1A", "1B", "2A", "2B", "3A", "3B", "3C", "4A", "4B", "4C", "5A", "5B", "5C", "6A", "6B", "7", "8"]:
st.error(f"Validation failed: Invalid climate zone '{data['climate_zone']}'")
logger.warning(f"Validation failed: Invalid climate zone '{data['climate_zone']}'")
return False
if not (data["heating_degree_days"] >= 0 and data["cooling_degree_days"] >= 0):
st.error("Validation failed: Negative degree days")
logger.warning("Validation failed: Negative degree days")
return False
if not (-50 <= data["winter_design_temp"] <= 20):
st.error(f"Validation failed: Winter design temp {data['winter_design_temp']} outside range")
logger.warning(f"Validation failed: Winter design temp {data['winter_design_temp']} outside range")
return False
if not (0 <= data["summer_design_temp_db"] <= 50 and 0 <= data["summer_design_temp_wb"] <= 40):
st.error("Validation failed: Invalid summer design temperatures")
logger.warning("Validation failed: Invalid summer design temperatures")
return False
if data["summer_daily_range"] < 0:
st.error("Validation failed: Negative summer daily range")
logger.warning("Validation failed: Negative summer daily range")
return False
if not (0 <= data["wind_speed"] <= 30):
st.error(f"Validation failed: Wind speed {data['wind_speed']} outside range")
logger.warning(f"Validation failed: Wind speed {data['wind_speed']} outside range")
return False
if not (80000 <= data["pressure"] <= 110000):
st.error(f"Validation failed: Pressure {data['pressure']} outside range")
logger.warning(f"Validation failed: Pressure {data['pressure']} outside range")
return False
if not data["hourly_data"] or len(data["hourly_data"]) < 8700:
st.error(f"Validation failed: Hourly data has {len(data['hourly_data'])} records, expected ~8760")
logger.warning(f"Validation failed: Hourly data has {len(data['hourly_data'])} records")
return False
for record in data["hourly_data"]:
if not (1 <= record["month"] <= 12):
st.error(f"Validation failed: Invalid month {record['month']}")
logger.warning(f"Validation failed: Invalid month {record['month']}")
return False
if not (1 <= record["day"] <= 31):
st.error(f"Validation failed: Invalid day {record['day']}")
logger.warning(f"Validation failed: Invalid day {record['day']}")
return False
if not (1 <= record["hour"] <= 24):
st.error(f"Validation failed: Invalid hour {record['hour']}")
logger.warning(f"Validation failed: Invalid hour {record['hour']}")
return False
if not (-50 <= record["dry_bulb"] <= 50):
st.error(f"Validation failed: Dry bulb {record['dry_bulb']} outside range")
logger.warning(f"Validation failed: Dry bulb {record['dry_bulb']} outside range")
return False
if not (0 <= record["relative_humidity"] <= 100):
st.error(f"Validation failed: Relative humidity {record['relative_humidity']} outside range")
logger.warning(f"Validation failed: Relative humidity {record['relative_humidity']} outside range")
return False
if not (80000 <= record["atmospheric_pressure"] <= 110000):
st.error(f"Validation failed: Atmospheric pressure {record['atmospheric_pressure']} outside range")
logger.warning(f"Validation failed: Atmospheric pressure {record['atmospheric_pressure']} outside range")
return False
if not (0 <= record["global_horizontal_radiation"] <= 1200):
st.error(f"Validation failed: Global radiation {record['global_horizontal_radiation']} outside range")
logger.warning(f"Validation failed: Global radiation {record['global_horizontal_radiation']} outside range")
return False
if not (0 <= record["direct_normal_radiation"] <= 1200):
st.error(f"Validation failed: Direct normal radiation {record['direct_normal_radiation']} outside range")
logger.warning(f"Validation failed: Direct normal radiation {record['direct_normal_radiation']} outside range")
return False
if not (0 <= record["diffuse_horizontal_radiation"] <= 1200):
st.error(f"Validation failed: Diffuse horizontal radiation {record['diffuse_horizontal_radiation']} outside range")
logger.warning(f"Validation failed: Diffuse horizontal radiation {record['diffuse_horizontal_radiation']} outside range")
return False
if not (0 <= record["wind_speed"] <= 30):
st.error(f"Validation failed: Wind speed {record['wind_speed']} outside range")
logger.warning(f"Validation failed: Wind speed {record['wind_speed']} outside range")
return False
if not (0 <= record["wind_direction"] <= 360):
st.error(f"Validation failed: Wind direction {record['wind_direction']} outside range")
logger.warning(f"Validation failed: Wind direction {record['wind_direction']} outside range")
return False
# Validate typical/extreme periods (optional)
if "typical_extreme_periods" in data and data["typical_extreme_periods"]:
expected_periods = ["summer_extreme", "summer_typical", "winter_extreme", "winter_typical"]
missing_periods = [p for p in expected_periods if p not in data["typical_extreme_periods"]]
if missing_periods:
st.warning(f"Validation warning: Missing typical/extreme periods: {', '.join(missing_periods)}")
logger.warning(f"Validation warning: Missing typical/extreme periods: {', '.join(missing_periods)}")
for period in data["typical_extreme_periods"].values():
for date in ["start", "end"]:
if not (1 <= period[date]["month"] <= 12 and 1 <= period[date]["day"] <= 31):
st.error(f"Validation failed: Invalid date in typical/extreme periods: {period[date]}")
logger.warning(f"Validation failed: Invalid date in typical/extreme periods: {period[date]}")
return False
# Validate ground temperatures (optional)
if "ground_temperatures" in data and data["ground_temperatures"]:
for depth, temps in data["ground_temperatures"].items():
if len(temps) != 12 or not all(0 <= t <= 50 for t in temps):
st.error(f"Validation failed: Invalid ground temperatures for depth {depth}")
logger.warning(f"Validation failed: Invalid ground temperatures for depth {depth}")
return False
return True
@staticmethod
def calculate_wet_bulb(dry_bulb: np.ndarray, relative_humidity: np.ndarray) -> np.ndarray:
"""Calculate Wet Bulb Temperature using Stull (2011) approximation."""
db = np.array(dry_bulb, dtype=float)
rh = np.array(relative_humidity, dtype=float)
term1 = db * np.arctan(0.151977 * (rh + 8.313659)**0.5)
term2 = np.arctan(db + rh)
term3 = np.arctan(rh - 1.676331)
term4 = 0.00391838 * rh**1.5 * np.arctan(0.023101 * rh)
term5 = -4.686035
wet_bulb = term1 + term2 - term3 + term4 + term5
invalid_mask = (rh < 5) | (rh > 99) | (db < -20) | (db > 50) | np.isnan(db) | np.isnan(rh)
wet_bulb[invalid_mask] = np.nan
return wet_bulb
@staticmethod
def is_numeric(value: str) -> bool:
"""Check if a string can be converted to a number."""
try:
float(value)
return True
except ValueError:
return False
def get_locations_by_state(self, state: str) -> List[Dict[str, str]]:
"""Get list of locations for a given state from LOCATION_MAPPING."""
return [
{"number": loc_num, "city": loc_info["city"]}
for loc_num, loc_info in LOCATION_MAPPING.items()
if loc_info["state"] == state
]
def process_epw_file(self, epw_content: str, location_num: str, rcp: str, year: str) -> Optional[ClimateLocation]:
"""Process an EPW file content and return a ClimateLocation object."""
try:
epw_lines = epw_content.splitlines()
# Parse header
header = next(line for line in epw_lines if line.startswith("LOCATION"))
header_parts = header.split(",")
if len(header_parts) < 10:
raise ValueError("Invalid LOCATION header: too few fields.")
city = header_parts[1].strip() or "Unknown"
city = re.sub(r'\..*', '', city) # Clean city name
state_province = header_parts[2].strip() or "Unknown"
country = header_parts[3].strip() or "Unknown"
latitude = float(header_parts[6]) if header_parts[6].strip() and self.is_numeric(header_parts[6]) else 0.0
longitude = float(header_parts[7]) if header_parts[7].strip() and self.is_numeric(header_parts[7]) else 0.0
time_zone = float(header_parts[8]) if header_parts[8].strip() and self.is_numeric(header_parts[8]) else 0.0
elevation = float(header_parts[9]) if header_parts[9].strip() and self.is_numeric(header_parts[9]) else 0.0
logger.info("Parsed EPW header: city=%s, country=%s, latitude=%s, longitude=%s, time_zone=%s, elevation=%s",
city, country, latitude, longitude, time_zone, elevation)
# Override city and state from LOCATION_MAPPING
if location_num in LOCATION_MAPPING:
city = LOCATION_MAPPING[location_num]["city"]
state_province = LOCATION_MAPPING[location_num]["state"]
# Parse TYPICAL/EXTREME PERIODS
typical_extreme_periods = {}
date_pattern = r'^\d{1,2}\s*/\s*\d{1,2}$'
for line in epw_lines:
if line.startswith("TYPICAL/EXTREME PERIODS"):
parts = line.strip().split(',')
try:
num_periods = int(parts[1])
except ValueError:
st.warning("Invalid number of periods in TYPICAL/EXTREME PERIODS, skipping parsing.")
break
for i in range(num_periods):
try:
if len(parts) < 2 + i*4 + 4:
st.warning(f"Insufficient fields for period {i+1}, skipping.")
continue
period_name = parts[2 + i*4]
period_type = parts[3 + i*4]
start_date = parts[4 + i*4].strip()
end_date = parts[5 + i*4].strip()
if period_name in [
"Summer - Week Nearest Max Temperature For Period",
"Summer - Week Nearest Average Temperature For Period",
"Winter - Week Nearest Min Temperature For Period",
"Winter - Week Nearest Average Temperature For Period"
]:
season = 'summer' if 'Summer' in period_name else 'winter'
period_type = ('extreme' if 'Max' in period_name or 'Min' in period_name else 'typical')
key = f"{season}_{period_type}"
start_date_clean = re.sub(r'\s+', '', start_date)
end_date_clean = re.sub(r'\s+', '', end_date)
if not re.match(date_pattern, start_date) or not re.match(date_pattern, end_date):
st.warning(f"Invalid date format for period {period_name}: {start_date} to {end_date}, skipping.")
continue
start_month, start_day = map(int, start_date_clean.split('/'))
end_month, end_day = map(int, end_date_clean.split('/'))
typical_extreme_periods[key] = {
"start": {"month": start_month, "day": start_day},
"end": {"month": end_month, "day": end_day}
}
except (IndexError, ValueError) as e:
st.warning(f"Error parsing period {i+1}: {str(e)}, skipping.")
continue
break
# Parse GROUND TEMPERATURES
ground_temperatures = {}
for line in epw_lines:
if line.startswith("GROUND TEMPERATURES"):
parts = line.strip().split(',')
try:
num_depths = int(parts[1])
except ValueError:
st.warning("Invalid number of depths in GROUND TEMPERATURES, skipping parsing.")
break
for i in range(num_depths):
try:
if len(parts) < 2 + i*16 + 16:
st.warning(f"Insufficient fields for ground temperature depth {i+1}, skipping.")
continue
depth = parts[2 + i*16]
temps = [float(t) for t in parts[6 + i*16:18 + i*16] if t.strip()]
if len(temps) != 12:
st.warning(f"Invalid number of temperatures for depth {depth}m, expected 12, got {len(temps)}, skipping.")
continue
ground_temperatures[depth] = temps
except (ValueError, IndexError) as e:
st.warning(f"Error parsing ground temperatures for depth {i+1}: {str(e)}, skipping.")
continue
break
# Read data section
data_start_idx = next(i for i, line in enumerate(epw_lines) if line.startswith("DATA PERIODS")) + 1
epw_data = pd.read_csv(StringIO("\n".join(epw_lines[data_start_idx:])), header=None, dtype=str)
if len(epw_data) != 8760:
raise ValueError(f"EPW file has {len(epw_data)} records, expected 8760.")
if len(epw_data.columns) not in [32, 35]:
raise ValueError(f"EPW file has {len(epw_data.columns)} columns, expected 35.")
for col in [1, 2, 3, 6, 8, 9, 13, 14, 15, 20, 21]:
epw_data[col] = pd.to_numeric(epw_data[col], errors='coerce')
if epw_data[col].isna().all():
raise ValueError(f"Column {col} contains only non-numeric or missing data.")
# Calculate average humidity for climate zone assignment
humidity = pd.to_numeric(epw_data[8], errors='coerce').values
avg_humidity = float(np.nanmean(humidity)) if not np.all(np.isnan(humidity)) else 50.0
logger.info("Calculated average humidity: %.1f%% for %s, %s", avg_humidity, city, country)
# Create ClimateLocation
location = ClimateLocation(
epw_file=epw_data,
typical_extreme_periods=typical_extreme_periods,
ground_temperatures=ground_temperatures,
id=f"{country[:1].upper()}{city[:3].upper()}_{rcp}_{year}",
country=country,
state_province=state_province,
city=city,
latitude=latitude,
longitude=longitude,
elevation=elevation,
time_zone=time_zone
)
# Assign climate zone
try:
climate_zone = self.assign_climate_zone(
hdd=location.heating_degree_days,
cdd=location.cooling_degree_days,
avg_humidity=avg_humidity
)
location.climate_zone = climate_zone
logger.info("Assigned climate zone: %s for %s, %s", climate_zone, city, country)
except Exception as e:
st.warning(f"Failed to assign climate zone: {str(e)}. Using default 'Unknown'.")
logger.error("Climate zone assignment error: %s", str(e))
location.climate_zone = "Unknown"
return location
except Exception as e:
st.error(f"Error processing EPW file: {str(e)}. Ensure it has 8760 hourly records and correct format.")
logger.error(f"EPW processing error: %s", str(e))
return None
def display_climate_input(self, session_state: Dict[str, Any]):
"""Display Streamlit interface for EPW upload and visualizations."""
st.title("Climate Data Analysis")
# Apply consistent styling
st.markdown(STYLE, unsafe_allow_html=True)
# Clear invalid session_state["climate_data"] to prevent validation errors
if "climate_data" in session_state and not all(key in session_state["climate_data"] for key in ["id", "country", "city"]):
logger.warning("Invalid climate_data in session_state, clearing: %s", session_state["climate_data"])
session_state["climate_data"] = {}
# Initialize active tab in session_state
if "active_tab" not in session_state:
session_state["active_tab"] = "General Information"
# Define tabs, including new Climate Projection tab
tab_names = [
"General Information",
"Climate Projection",
"Psychrometric Chart",
"Sun Shading Chart",
"Temperature Range",
"Wind Rose"
]
tabs = st.tabs(tab_names)
# Initialize location and epw_data for display
location = None
epw_data = None
# General Information tab: Handle EPW upload and display existing data
with tabs[0]:
uploaded_file = st.file_uploader("Upload EPW File", type=["epw"])
if uploaded_file:
with st.spinner("Processing uploaded EPW file..."):
try:
# Process new EPW file
epw_content = uploaded_file.read().decode("utf-8")
epw_lines = epw_content.splitlines()
# Parse header
header = next(line for line in epw_lines if line.startswith("LOCATION"))
header_parts = header.split(",")
if len(header_parts) < 10:
raise ValueError("Invalid LOCATION header: too few fields.")
city = header_parts[1].strip() or "Unknown"
city = re.sub(r'\..*', '', city)
state_province = header_parts[2].strip() or "Unknown"
country = header_parts[3].strip() or "Unknown"
latitude = float(header_parts[6]) if header_parts[6].strip() and self.is_numeric(header_parts[6]) else 0.0
longitude = float(header_parts[7]) if header_parts[7].strip() and self.is_numeric(header_parts[7]) else 0.0
time_zone = float(header_parts[8]) if header_parts[8].strip() and self.is_numeric(header_parts[8]) else 0.0
elevation = float(header_parts[9]) if header_parts[9].strip() and self.is_numeric(header_parts[9]) else 0.0
logger.info("Parsed EPW header: city=%s, country=%s, latitude=%s, longitude=%s, time_zone=%s, elevation=%s",
city, country, latitude, longitude, time_zone, elevation)
# Parse TYPICAL/EXTREME PERIODS
typical_extreme_periods = {}
date_pattern = r'^\d{1,2}\s*/\s*\d{1,2}$'
for line in epw_lines:
if line.startswith("TYPICAL/EXTREME PERIODS"):
parts = line.strip().split(',')
try:
num_periods = int(parts[1])
except ValueError:
st.warning("Invalid number of periods in TYPICAL/EXTREME PERIODS, skipping parsing.")
break
for i in range(num_periods):
try:
if len(parts) < 2 + i*4 + 4:
st.warning(f"Insufficient fields for period {i+1}, skipping.")
continue
period_name = parts[2 + i*4]
period_type = parts[3 + i*4]
start_date = parts[4 + i*4].strip()
end_date = parts[5 + i*4].strip()
if period_name in [
"Summer - Week Nearest Max Temperature For Period",
"Summer - Week Nearest Average Temperature For Period",
"Winter - Week Nearest Min Temperature For Period",
"Winter - Week Nearest Average Temperature For Period"
]:
season = 'summer' if 'Summer' in period_name else 'winter'
period_type = ('extreme' if 'Max' in period_name or 'Min' in period_name else 'typical')
key = f"{season}_{period_type}"
start_date_clean = re.sub(r'\s+', '', start_date)
end_date_clean = re.sub(r'\s+', '', end_date)
if not re.match(date_pattern, start_date) or not re.match(date_pattern, end_date):
st.warning(f"Invalid date format for period {period_name}: {start_date} to {end_date}, skipping.")
continue
start_month, start_day = map(int, start_date_clean.split('/'))
end_month, end_day = map(int, end_date_clean.split('/'))
typical_extreme_periods[key] = {
"start": {"month": start_month, "day": start_day},
"end": {"month": end_month, "day": end_day}
}
except (IndexError, ValueError) as e:
st.warning(f"Error parsing period {i+1}: {str(e)}, skipping.")
continue
break
# Parse GROUND TEMPERATURES
ground_temperatures = {}
for line in epw_lines:
if line.startswith("GROUND TEMPERATURES"):
parts = line.strip().split(',')
try:
num_depths = int(parts[1])
except ValueError:
st.warning("Invalid number of depths in GROUND TEMPERATURES, skipping parsing.")
break
for i in range(num_depths):
try:
if len(parts) < 2 + i*16 + 16:
st.warning(f"Insufficient fields for ground temperature depth {i+1}, skipping.")
continue
depth = parts[2 + i*16]
temps = [float(t) for t in parts[6 + i*16:18 + i*16] if t.strip()]
if len(temps) != 12:
st.warning(f"Invalid number of temperatures for depth {depth}m, expected 12, got {len(temps)}, skipping.")
continue
ground_temperatures[depth] = temps
except (ValueError, IndexError) as e:
st.warning(f"Error parsing ground temperatures for depth {i+1}: {str(e)}, skipping.")
continue
break
# Read data section
data_start_idx = next(i for i, line in enumerate(epw_lines) if line.startswith("DATA PERIODS")) + 1
epw_data = pd.read_csv(StringIO("\n".join(epw_lines[data_start_idx:])), header=None, dtype=str)
if len(epw_data) != 8760:
raise ValueError(f"EPW file has {len(epw_data)} records, expected 8760.")
if len(epw_data.columns) != 35:
raise ValueError(f"EPW file has {len(epw_data.columns)} columns, expected 35.")
for col in [1, 2, 3, 6, 8, 9, 13, 14, 15, 20, 21]:
epw_data[col] = pd.to_numeric(epw_data[col], errors='coerce')
if epw_data[col].isna().all():
raise ValueError(f"Column {col} contains only non-numeric or missing data.")
# Calculate average humidity for climate zone assignment
humidity = pd.to_numeric(epw_data[8], errors='coerce').values
avg_humidity = float(np.nanmean(humidity)) if not np.all(np.isnan(humidity)) else 50.0
logger.info("Calculated average humidity: %.1f%% for %s, %s", avg_humidity, city, country)
# Create ClimateLocation with consistent ID
location = ClimateLocation(
epw_file=epw_data,
typical_extreme_periods=typical_extreme_periods,
ground_temperatures=ground_temperatures,
id=f"{country[:1].upper()}{city[:3].upper()}_UPLOAD",
country=country,
state_province=state_province,
city=city,
latitude=latitude,
longitude=longitude,
elevation=elevation,
time_zone=time_zone
)
# Assign climate zone
try:
climate_zone = self.assign_climate_zone(
hdd=location.heating_degree_days,
cdd=location.cooling_degree_days,
avg_humidity=avg_humidity
)
location.climate_zone = climate_zone
logger.info("Assigned climate zone: %s for %s, %s", climate_zone, city, country)
except Exception as e:
st.warning(f"Failed to assign climate zone: {str(e)}. Using default 'Unknown'.")
logger.error("Climate zone assignment error: %s", str(e))
location.climate_zone = "Unknown"
self.add_location(location)
climate_data_dict = location.to_dict()
session_state["climate_data"] = climate_data_dict
if not self.validate_climate_data(climate_data_dict):
st.warning(f"Climate data validation failed for {city}, {country}. Displaying data anyway.")
logger.warning("Validation failed for new EPW data: %s", climate_data_dict["id"])
st.success("Climate data extracted from EPW file!")
logger.info("Successfully processed EPW file and stored in session_state: %s", climate_data_dict["id"])
session_state["active_tab"] = "General Information"
except Exception as e:
st.error(f"Error processing EPW file: {str(e)}. Ensure it has 8760 hourly records and correct format.")
logger.error(f"EPW processing error: %s", str(e))
session_state["climate_data"] = {}
elif "climate_data" in session_state and session_state["climate_data"]:
# Reconstruct from session_state
climate_data_dict = session_state["climate_data"]
logger.info("Attempting to reconstruct climate data from session_state: %s", climate_data_dict.get("id", "Unknown"))
required_keys = ["id", "country", "city", "latitude", "longitude", "elevation", "time_zone", "climate_zone", "hourly_data"]
missing_keys = [key for key in required_keys if key not in climate_data_dict]
if missing_keys:
st.warning(f"Invalid climate data in session state, missing keys: {', '.join(missing_keys)}. Please upload a new EPW file.")
logger.warning("Missing keys in session_state.climate_data: %s", missing_keys)
session_state["climate_data"] = {}
else:
if not self.validate_climate_data(climate_data_dict):
st.warning(f"Stored climate data validation failed for {climate_data_dict.get('city', 'Unknown')}, {climate_data_dict.get('country', 'Unknown')}. Displaying data anyway.")
logger.warning("Validation failed for session_state.climate_data: %s", climate_data_dict.get("id", "Unknown"))
try:
# Rebuild epw_data from hourly_data
hourly_data = climate_data_dict["hourly_data"]
epw_data = pd.DataFrame(np.nan, index=range(len(hourly_data)), columns=range(35))
epw_data[1] = [d["month"] for d in hourly_data]
epw_data[2] = [d["day"] for d in hourly_data]
epw_data[3] = [d["hour"] for d in hourly_data]
epw_data[6] = [d["dry_bulb"] for d in hourly_data]
epw_data[8] = [d["relative_humidity"] for d in hourly_data]
epw_data[9] = [d["atmospheric_pressure"] for d in hourly_data]
epw_data[13] = [d["global_horizontal_radiation"] for d in hourly_data]
epw_data[14] = [d["direct_normal_radiation"] for d in hourly_data]
epw_data[15] = [d["diffuse_horizontal_radiation"] for d in hourly_data]
epw_data[20] = [d["wind_direction"] for d in hourly_data]
epw_data[21] = [d["wind_speed"] for d in hourly_data]
# Create ClimateLocation
location = ClimateLocation(
epw_file=epw_data,
typical_extreme_periods=climate_data_dict.get("typical_extreme_periods", {}),
ground_temperatures=climate_data_dict.get("ground_temperatures", {}),
id=climate_data_dict["id"],
country=climate_data_dict["country"],
state_province=climate_data_dict.get("state_province", "N/A"),
city=climate_data_dict["city"],
latitude=climate_data_dict["latitude"],
longitude=climate_data_dict["longitude"],
elevation=climate_data_dict["elevation"],
time_zone=climate_data_dict["time_zone"],
climate_zone=climate_data_dict["climate_zone"]
)
location.hourly_data = climate_data_dict["hourly_data"]
self.add_location(location)
st.info(f"Displaying previously extracted climate data for {climate_data_dict['city']}, {climate_data_dict['country']}.")
logger.info("Successfully reconstructed climate data from session_state: %s", climate_data_dict["id"])
except Exception as e:
st.error(f"Error reconstructing climate data: {str(e)}. Please upload a new EPW file.")
logger.error(f"Reconstruction error: %s", str(e))
session_state["climate_data"] = {}
# Display data if available
if location is not None and epw_data is not None:
self.display_design_conditions(location)
# Climate Projection tab
with tabs[1]:
st.markdown("""
<div class="markdown-text">
<h3>Climate Projection</h3>
<p>At this stage, this section is focused on some locations in Australia, and the provided data is based on "Projected weather files for building energy modelling" from CSIRO 2022.</p>
</div>
""", unsafe_allow_html=True)
# Dropdown menus
country = st.selectbox("Country", ["Australia"], key="projection_country")
states = ["ACT", "NSW", "NT", "QLD", "SA", "TAS", "VIC", "WA"]
state = st.selectbox("State", states, key="projection_state")
# Get locations for selected state
locations = self.get_locations_by_state(state)
location_options = [f"{loc['city']} ({loc['number']})" for loc in locations]
location_display = st.selectbox("Location", location_options, key="location")
# Extract location number from selection
location_num = ""
if location_display:
location_num = next(loc["number"] for loc in locations if f"{loc['city']} ({loc['number']})" == location_display)
rcp_options = ["RCP2.6", "RCP4.5", "RCP8.5"]
rcp = st.selectbox("RCP Scenario", rcp_options, key="rcp")
year_options = ["2030", "2050", "2070", "2090"]
year = st.selectbox("Year", year_options, key="year")
if st.button("Extract Data"):
with st.spinner("Extracting climate projection data..."):
# Log AU_CCH_DIR for debugging
logger.debug(f"AU_CCH_DIR set to: {os.path.abspath(AU_CCH_DIR)}")
# Construct file path
file_path = os_join(AU_CCH_DIR, location_num, rcp, year)
logger.debug(f"Attempting to access directory: {os.path.abspath(file_path)}")
if not os.path.exists(file_path):
st.error(f"No directory found at au_cch/{location_num}/{rcp}/{year}/. In the Hugging Face Space 'mabuseif/Update-materials-solar', ensure the 'au_cch' folder is in the repository root alongside 'data' and 'utils', with the structure au_cch/{location_num}/{rcp}/{year} (e.g., au_cch/1/RCP2.6/2070/) containing a single .epw file (e.g., adelaide_rcp2.6_2070.epw).")
logger.error(f"Directory does not exist: {file_path}")
else:
try:
epw_files = [f for f in os.listdir(file_path) if f.endswith(".epw")]
if not epw_files:
st.error(f"No EPW file found in au_cch/{location_num}/{rcp}/{year}/. Please check that au_cch/{location_num}/{rcp}/{year}/ (e.g., au_cch/1/RCP2.6/{year}/) contains a single file with a .epw extension (e.g., adelaide_rcp2.6_2070.epw).")
logger.error(f"No EPW file found in {file_path}")
elif len(epw_files) > 1:
st.error(f"Multiple EPW files found in au_cch/{location_num}/{rcp}/{year}/: {epw_files}. Please ensure exactly one .epw file per directory (e.g., au_cch/1/RCP2.6/{year}/).")
logger.error(f"Multiple EPW files found: {epw_files}")
else:
epw_file_path = os_join(file_path, epw_files[0])
try:
with open(epw_file_path, 'r') as f:
epw_content = f.read()
location = self.process_epw_file(epw_content, location_num, rcp, year)
if location:
self.add_location(location)
climate_data_dict = location.to_dict()
if self.validate_climate_data(climate_data_dict):
session_state["climate_data"] = climate_data_dict
st.success(f"Successfully extracted climate projection data for {location.city}, {location.country}, {rcp}, {year}!")
logger.info(f"Successful processing projection of {climate_data_dict['id']}")
session_state["active_tab"] = "General Information"
# Set location and epw_data for immediate display
epw_data = pd.DataFrame(np.nan, index=range(len(climate_data_dict["hourly_data"])), columns=range(35))
epw_data[1] = [d["month"] for d in climate_data_dict["hourly_data"]]
epw_data[2] = [d["day"] for d in climate_data_dict["hourly_data"]]
epw_data[3] = [d["hour"] for d in climate_data_dict["hourly_data"]]
epw_data[6] = [d["dry_bulb"] for d in climate_data_dict["hourly_data"]]
epw_data[8] = [d["relative_humidity"] for d in climate_data_dict["hourly_data"]]
epw_data[9] = [d["atmospheric_pressure"] for d in climate_data_dict["hourly_data"]]
epw_data[13] = [d["global_horizontal_radiation"] for d in climate_data_dict["hourly_data"]]
epw_data[14] = [d["direct_normal_radiation"] for d in climate_data_dict["hourly_data"]]
epw_data[15] = [d["diffuse_horizontal_radiation"] for d in climate_data_dict["hourly_data"]]
epw_data[20] = [d["wind_direction"] for d in climate_data_dict["hourly_data"]]
epw_data[21] = [d["wind_speed"] for d in climate_data_dict["hourly_data"]]
else:
st.warning(f"Climate projection data validation failed for {location.city}, {location.country}. Displaying data anyway.")
logger.warning(f"Validation failed for {climate_data_dict['id']}")
session_state["climate_data"] = climate_data_dict
session_state["active_tab"] = "General Information"
# Set location and epw_data for immediate display
epw_data = pd.DataFrame(np.nan, index=range(len(climate_data_dict["hourly_data"])), columns=range(35))
epw_data[1] = [d["month"] for d in climate_data_dict["hourly_data"]]
epw_data[2] = [d["day"] for d in climate_data_dict["hourly_data"]]
epw_data[3] = [d["hour"] for d in climate_data_dict["hourly_data"]]
epw_data[6] = [d["dry_bulb"] for d in climate_data_dict["hourly_data"]]
epw_data[8] = [d["relative_humidity"] for d in climate_data_dict["hourly_data"]]
epw_data[9] = [d["atmospheric_pressure"] for d in climate_data_dict["hourly_data"]]
epw_data[13] = [d["global_horizontal_radiation"] for d in climate_data_dict["hourly_data"]]
epw_data[14] = [d["direct_normal_radiation"] for d in climate_data_dict["hourly_data"]]
epw_data[15] = [d["diffuse_horizontal_radiation"] for d in climate_data_dict["hourly_data"]]
epw_data[20] = [d["wind_direction"] for d in climate_data_dict["hourly_data"]]
epw_data[21] = [d["wind_speed"] for d in climate_data_dict["hourly_data"]]
except Exception as e:
st.error(f"Error reading {epw_file_path}: {str(e)}")
logger.error(f"Error reading {epw_file_path}: {str(e)}")
session_state["climate_data"] = {}
except Exception as e:
st.error(f"Error accessing directory au_cch/{location_num}/{rcp}/{year}/: {str(e)}")
logger.error(f"Error accessing directory {file_path}: {str(e)}")
# Other tabs
if location is not None and epw_data is not None:
with tabs[2]:
self.plot_psychrometric_chart(location, epw_data)
with tabs[3]:
self.plot_sun_shading_chart(location)
with tabs[4]:
self.plot_temperature_range(location, epw_data)
with tabs[5]:
self.plot_wind_rose(epw_data)
else:
for i in range(2, len(tabs)):
with tabs[i]:
st.info("No climate data available. Please upload an EPW file or select a climate projection to proceed.")
logger.info("No climate data to display in tab %s; prompting for EPW upload.", tab_names[i])
def display_design_conditions(self, location: ClimateLocation):
"""Display design conditions for HVAC calculations using styled HTML."""
st.subheader("Design Conditions")
# Location Details
st.markdown(f"""
<div class="markdown-text">
<h3>Location Details</h3>
<ul>
<li><strong>Country:</strong> {location.country}</li>
<li><strong>City:</strong> {location.city}</li>
<li><strong>State/Province:</strong> {location.state_province}</li>
<li><strong>Latitude:</strong> {location.latitude}°</li>
<li><strong>Longitude:</strong> {location.longitude}°</li>
<li><strong>Elevation:</strong> {location.elevation} m</li>
<li><strong>Time Zone:</strong> {location.time_zone} hours (UTC)</li>
</ul>
</div>
""", unsafe_allow_html=True)
# Calculated Climate Parameters
st.markdown(f"""
<div class="markdown-text">
<h3>Calculated Climate Parameters</h3>
<ul>
<li><strong>Climate Zone:</strong> {location.climate_zone}</li>
<li><strong>Heating Degree Days (base 18°C):</strong> {location.heating_degree_days} HDD</li>
<li><strong>Cooling Degree Days (base 18°C):</strong> {location.cooling_degree_days} CDD</li>
<li><strong>Winter Design Temperature (99.6%):</strong> {location.winter_design_temp} °C</li>
<li><strong>Summer Design Dry-Bulb Temp (0.4%):</strong> {location.summer_design_temp_db} °C</li>
<li><strong>Summer Design Wet-Bulb Temp (0.4%):</strong> {location.summer_design_temp_wb} °C</li>
<li><strong>Summer Daily Temperature Range:</strong> {location.summer_daily_range} °C</li>
<li><strong>Mean Wind Speed:</strong> {location.wind_speed} m/s</li>
<li><strong>Mean Atmospheric Pressure:</strong> {location.pressure} Pa</li>
</ul>
</div>
""", unsafe_allow_html=True)
# Typical/Extreme Periods
if location.typical_extreme_periods:
period_items = [
f"<li><strong>{key.replace('_', ' ').title()}:</strong> {period['start']['month']}/{period['start']['day']} to {period['end']['month']}/{period['end']['day']}</li>"
for key, period in location.typical_extreme_periods.items()
]
st.markdown(f"""
<div class="markdown-text">
<h3>Typical/Extreme Periods</h3>
<ul>
{''.join(period_items)}
</ul>
</div>
""", unsafe_allow_html=True)
# Ground Temperatures (Table)
if location.ground_temperatures:
st.markdown('<div class="markdown-text"><h3>Ground Temperatures</h3></div>', unsafe_allow_html=True)
month_names = ["Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"]
table_data = []
for depth, temps in location.ground_temperatures.items():
row = {"Depth (m)": float(depth)}
row.update({month: f"{temp:.2f}" for month, temp in zip(month_names, temps)})
table_data.append(row)
df = pd.DataFrame(table_data)
st.dataframe(df, use_container_width=True)
# Add download button for Ground Temperatures with unique key
csv = df.to_csv(index=False)
st.download_button(
label="Download Ground Temperatures as CSV",
data=csv,
file_name=f"ground_temperatures_{location.city}_{location.country}.csv",
mime="text/csv",
key=f"download_ground_temperatures_{location.id}" # Unique key based on location.id
)
# Hourly Data (Table)
st.markdown('<div class="markdown-text"><h3>Hourly Climate Data</h3></div>', unsafe_allow_html=True)
hourly_table_data = [
{
"Month": record["month"],
"Day": record["day"],
"Hour": record["hour"],
"Dry Bulb Temp (°C)": f"{record['dry_bulb']:.1f}",
"Relative Humidity (%)": f"{record['relative_humidity']:.1f}",
"Atmospheric Pressure (Pa)": f"{record['atmospheric_pressure']:.1f}",
"Global Horizontal Radiation (W/m²)": f"{record['global_horizontal_radiation']:.1f}",
"Direct Normal Radiation (W/m²)": f"{record['direct_normal_radiation']:.1f}",
"Diffuse Horizontal Radiation (W/m²)": f"{record['diffuse_horizontal_radiation']:.1f}",
"Wind Speed (m/s)": f"{record['wind_speed']:.1f}",
"Wind Direction (°)": f"{record['wind_direction']:.1f}"
}
for record in location.hourly_data
]
hourly_df = pd.DataFrame(hourly_table_data)
st.dataframe(hourly_df, use_container_width=True)
# Add download button for Hourly Climate Data with unique key
csv = hourly_df.to_csv(index=False)
st.download_button(
label="Download Hourly Climate Data as CSV",
data=csv,
file_name=f"hourly_climate_data_{location.city}_{location.country}.csv",
mime="text/csv",
key=f"download_hourly_climate_{location.id}" # Unique key based on location.id
)
@staticmethod
def assign_climate_zone(hdd: float, cdd: float, avg_humidity: float) -> str:
"""Assign ASHRAE 169 climate zone based on HDD, CDD, and humidity."""
if cdd > 10000:
return "0A" if avg_humidity > 60 else "0B"
elif cdd > 5000:
return "1A" if avg_humidity > 60 else "1B"
elif cdd > 2500:
return "2A" if avg_humidity > 60 else "2B"
elif hdd < 2000 and cdd > 1000:
return "3A" if avg_humidity > 60 else "3B" if avg_humidity < 40 else "3C"
elif hdd < 3000:
return "4A" if avg_humidity > 60 else "4B" if avg_humidity < 40 else "4C"
elif hdd < 4000:
return "5A" if avg_humidity > 60 else "5B" if avg_humidity < 40 else "5C"
elif hdd < 5000:
return "6A" if avg_humidity > 60 else "6B"
elif hdd < 7000:
return "7"
else:
return "8"
def plot_psychrometric_chart(self, location: ClimateLocation, epw_data: pd.DataFrame):
"""Plot psychrometric chart with ASHRAE 55 comfort zone and psychrometric lines."""
st.subheader("Psychrometric Chart")
dry_bulb = pd.to_numeric(epw_data[6], errors='coerce').values
humidity = pd.to_numeric(epw_data[8], errors='coerce').values
valid_mask = ~np.isnan(dry_bulb) & ~np.isnan(humidity)
dry_bulb = dry_bulb[valid_mask]
humidity = humidity[valid_mask]
# Calculate humidity ratio (kg/kg dry air)
pressure = location.pressure / 1000 # kPa
saturation_pressure = 6.1078 * 10 ** (7.5 * dry_bulb / (dry_bulb + 237.3))
vapor_pressure = humidity / 100 * saturation_pressure
humidity_ratio = 0.62198 * vapor_pressure / (pressure - vapor_pressure) * 1000 # Convert to g/kg
fig = go.Figure()
# Hourly data points
fig.add_trace(go.Scatter(
x=dry_bulb,
y=humidity_ratio,
mode='markers',
marker=dict(size=5, opacity=0.5, color='blue'),
name='Hourly Conditions'
))
# ASHRAE 55 comfort zone
comfort_db = [20, 26, 26, 20, 20]
comfort_rh = [30, 30, 60, 60, 30]
comfort_vp = np.array(comfort_rh) / 100 * 6.1078 * 10 ** (7.5 * np.array(comfort_db) / (np.array(comfort_db) + 237.3))
comfort_hr = 0.62198 * comfort_vp / (pressure - comfort_vp) * 1000
fig.add_trace(go.Scatter(
x=comfort_db,
y=comfort_hr,
mode='lines',
line=dict(color='green', width=2),
fill='toself',
fillcolor='rgba(0, 255, 0, 0.2)',
name='ASHRAE 55 Comfort Zone'
))
# Constant humidity ratio lines
for hr in [5, 10, 15]:
db_range = np.linspace(0, 40, 100)
vp = (hr / 1000 * pressure) / (0.62198 + hr / 1000)
rh = vp / (6.1078 * 10 ** (7.5 * db_range / (db_range + 237.3))) * 100
hr_line = np.full_like(db_range, hr)
fig.add_trace(go.Scatter(
x=db_range,
y=hr_line,
mode='lines',
line=dict(color='gray', width=1, dash='dash'),
name=f'{hr} g/kg',
showlegend=True
))
# Constant wet-bulb temperature lines
wet_bulb_temps = [10, 15, 20]
for wbt in wet_bulb_temps:
db_range = np.linspace(0, 40, 100)
rh_range = np.linspace(5, 95, 100)
wb_values = self.calculate_wet_bulb(db_range, rh_range)
vp = rh_range / 100 * (6.1078 * 10 ** (7.5 * db_range / (db_range + 237.3)))
hr_values = 0.62198 * vp / (pressure - vp) * 1000
mask = (wb_values >= wbt - 0.5) & (wb_values <= wbt + 0.5)
if np.any(mask):
fig.add_trace(go.Scatter(
x=db_range[mask],
y=hr_values[mask],
mode='lines',
line=dict(color='purple', width=1, dash='dot'),
name=f'Wet-Bulb {wbt}°C',
showlegend=True
))
fig.update_layout(
title="Psychrometric Chart",
xaxis_title="Dry-Bulb Temperature (°C)",
yaxis_title="Humidity Ratio (g/kg dry air)",
xaxis=dict(range=[-5, 40]),
yaxis=dict(range=[0, 25]),
showlegend=True,
template='plotly_white'
)
st.plotly_chart(fig, use_container_width=True)
def plot_sun_shading_chart(self, location: ClimateLocation):
"""Plot sun path chart for summer and winter solstices, inspired by Climate Consultant."""
st.subheader("Sun Shading Chart")
dates = [
datetime(2025, 6, 21), # Winter solstice (Southern Hemisphere)
datetime(2025, 12, 21) # Summer solstice (Southern Hemisphere)
]
times = pd.date_range(start="2025-01-01 00:00", end="2025-01-01 23:00", freq='H')
solar_data = []
for date in dates:
solpos = pvlib.solarposition.get_solarposition(
time=[date.replace(hour=t.hour, minute=t.minute) for t in times],
latitude=location.latitude,
longitude=location.longitude,
altitude=location.elevation
)
solar_data.append({
'date': date.strftime('%Y-%m-%d'),
'azimuth': solpos['azimuth'].values,
'altitude': solpos['elevation'].values
})
fig = go.Figure()
colors = ['orange', 'blue']
labels = ['Summer Solstice (Dec 21)', 'Winter Solstice (Jun 21)']
for i, data in enumerate(solar_data):
fig.add_trace(go.Scatterpolar(
r=data['altitude'],
theta=data['azimuth'],
mode='lines+markers',
name=labels[i],
line=dict(color=colors[i], width=2),
marker=dict(size=6, color=colors[i]),
opacity=0.8
))
fig.update_layout(
title="Sun Path Diagram",
polar=dict(
radialaxis=dict(
range=[0, 90],
tickvals=[0, 30, 60, 90],
ticktext=["0°", "30°", "60°", "90°"],
title="Altitude (degrees)"
),
angularaxis=dict(
direction="clockwise",
rotation=90,
tickvals=[0, 90, 180, 270],
ticktext=["N", "E", "S", "W"]
)
),
showlegend=True,
template='plotly_white'
)
st.plotly_chart(fig, use_container_width=True)
def plot_temperature_range(self, location: ClimateLocation, epw_data: pd.DataFrame):
"""Plot monthly temperature ranges with design conditions."""
st.subheader("Monthly Temperature Range")
months = pd.to_numeric(epw_data[1], errors='coerce').values
dry_bulb = pd.to_numeric(epw_data[6], errors='coerce').values
month_names = ["Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"]
temps_min = []
temps_max = []
temps_avg = []
for i in range(1, 13):
month_mask = (months == i)
temps_min.append(round(np.nanmin(dry_bulb[month_mask]), 1))
temps_max.append(round(np.nanmax(dry_bulb[month_mask]), 1))
temps_avg.append(round(np.nanmean(dry_bulb[month_mask]), 1))
fig = go.Figure()
fig.add_trace(go.Scatter(
x=list(range(1, 13)),
y=temps_max,
mode='lines',
name='Max Temperature',
line=dict(color='red', dash='dash'),
opacity=0.5
))
fig.add_trace(go.Scatter(
x=list(range(1, 13)),
y=temps_min,
mode='lines',
name='Min Temperature',
line=dict(color='red', dash='dash'),
opacity=0.5,
fill='tonexty',
fillcolor='rgba(255, 0, 0, 0.1)'
))
fig.add_trace(go.Scatter(
x=list(range(1, 13)),
y=temps_avg,
mode='lines+markers',
name='Avg Temperature',
line=dict(color='red'),
marker=dict(size=8)
))
# Add design temperatures
fig.add_hline(y=location.winter_design_temp, line_dash="dot", line_color="blue", annotation_text="Winter Design Temp", annotation_position="top left")
fig.add_hline(y=location.summer_design_temp_db, line_dash="dot", line_color="orange", annotation_text="Summer Design Temp (DB)", annotation_position="bottom left")
fig.update_layout(
title="Monthly Temperature Profile",
xaxis_title="Month",
yaxis_title="Temperature (°C)",
xaxis=dict(tickmode='array', tickvals=list(range(1, 13)), ticktext=month_names),
legend=dict(yanchor="top", y=0.99, xanchor="left", x=0.01),
showlegend=True,
template='plotly_white'
)
st.plotly_chart(fig, use_container_width=True)
def plot_wind_rose(self, epw_data: pd.DataFrame):
"""Plot wind rose diagram with improved clarity, inspired by Climate Consultant."""
st.subheader("Wind Rose")
wind_speed = pd.to_numeric(epw_data[21], errors='coerce').values
wind_direction = pd.to_numeric(epw_data[20], errors='coerce').values
valid_mask = ~np.isnan(wind_speed) & ~np.isnan(wind_direction)
wind_speed = wind_speed[valid_mask]
wind_direction = wind_direction[valid_mask]
# Bin data with 8 directions and tailored speed bins
speed_bins = [0, 2, 4, 6, 8, np.inf]
direction_bins = np.linspace(0, 360, 9)[:-1]
speed_labels = ['0-2 m/s', '2-4 m/s', '4-6 m/s', '6-8 m/s', '8+ m/s']
direction_labels = ['N', 'NE', 'E', 'SE', 'S', 'SW', 'W', 'NW']
hist = np.histogram2d(
wind_direction, wind_speed,
bins=[direction_bins, speed_bins],
density=True
)[0]
hist = hist * 100 # Convert to percentage
fig = go.Figure()
colors = ['#E6F0FF', '#B3D1FF', '#80B2FF', '#4D94FF', '#1A75FF']
for i, speed_label in enumerate(speed_labels):
fig.add_trace(go.Barpolar(
r=hist[:, i],
theta=direction_bins,
width=45,
name=speed_label,
marker=dict(color=colors[i]),
opacity=0.8
))
fig.update_layout(
title="Wind Rose",
polar=dict(
radialaxis=dict(
tickvals=[0, 5, 10, 15],
ticktext=["0%", "5%", "10%", "15%"],
title="Frequency (%)"
),
angularaxis=dict(
direction="clockwise",
rotation=90,
tickvals=direction_bins,
ticktext=direction_labels
)
),
showlegend=True,
template='plotly_white'
)
st.plotly_chart(fig, use_container_width=True)
def export_to_json(self, file_path: str) -> None:
"""Export all climate data to a JSON file."""
data = {loc_id: loc.to_dict() for loc_id, loc in self.locations.items()}
with open(file_path, 'w') as f:
json.dump(data, f, indent=4)
@classmethod
def from_json(cls, file_path: str) -> 'ClimateData':
"""Load climate data from a JSON file."""
with open(file_path, 'r') as f:
data = json.load(f)
climate_data = cls()
for loc_id, loc_dict in data.items():
hourly_data = loc_dict["hourly_data"]
epw_data = pd.DataFrame({
1: [d["month"] for d in hourly_data],
2: [d["day"] for d in hourly_data],
3: [d["hour"] for d in hourly_data],
6: [d["dry_bulb"] for d in hourly_data],
8: [d["relative_humidity"] for d in hourly_data],
9: [d["atmospheric_pressure"] for d in hourly_data],
13: [d["global_horizontal_radiation"] for d in hourly_data],
14: [d["direct_normal_radiation"] for d in hourly_data],
15: [d["diffuse_horizontal_radiation"] for d in hourly_data],
20: [d["wind_direction"] for d in hourly_data],
21: [d["wind_speed"] for d in hourly_data],
})
location = ClimateLocation(
epw_file=epw_data,
typical_extreme_periods=loc_dict["typical_extreme_periods"],
ground_temperatures=loc_dict["ground_temperatures"],
id=loc_dict["id"],
country=loc_dict["country"],
state_province=loc_dict["state_province"],
city=loc_dict["city"],
latitude=loc_dict["latitude"],
longitude=loc_dict["longitude"],
elevation=loc_dict["elevation"],
time_zone=loc_dict["time_zone"],
climate_zone=loc_dict["climate_zone"]
)
location.hourly_data = loc_dict["hourly_data"]
climate_data.add_location(location)
return climate_data
if __name__ == "__main__":
climate_data = ClimateData()
session_state = {"building_info": {"country": "Australia", "city": "Geelong"}, "page": "Climate Data"}
climate_data.display_climate_input(session_state)