"""
Extracts climate data from EPW files
Includes Solar Analysis tab for solar angle and ground-reflected radiation calculations.

Author: Dr Majed Abuseif
Date: May 2025
Version: 2.1.6
"""

from typing import Dict, List, Any, Optional
import pandas as pd
import numpy as np
import os
import json
from dataclasses import dataclass
import streamlit as st
import plotly.graph_objects as go
from io import StringIO
import pvlib
from datetime import datetime, timedelta
import re
import html
import logging
from data.solar_calculations import SolarCalculations

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Define paths
DATA_DIR = os.path.dirname(os.path.abspath(__file__))

# CSS for consistent formatting
# NOTE(review): the stylesheet body is blank in the reviewed copy of this file;
# it looks like markup was stripped in transit -- restore it from version control.
STYLE = """ """

# EPW data-row column index for every hourly field this module reads.
# The insertion order is also the column order of frames rebuilt from hourly records.
_EPW_COLUMN_BY_FIELD = {
    "month": 1,
    "day": 2,
    "hour": 3,
    "dry_bulb": 6,
    "relative_humidity": 8,
    "atmospheric_pressure": 9,
    "global_horizontal_radiation": 13,
    "direct_normal_radiation": 14,
    "diffuse_horizontal_radiation": 15,
    "wind_direction": 20,
    "wind_speed": 21,
}

# EPW header period names mapped to the keys used in ``typical_extreme_periods``.
_PERIOD_KEY_BY_NAME = {
    "Summer - Week Nearest Max Temperature For Period": "summer_extreme",
    "Summer - Week Nearest Average Temperature For Period": "summer_typical",
    "Winter - Week Nearest Min Temperature For Period": "winter_extreme",
    "Winter - Week Nearest Average Temperature For Period": "winter_typical",
}

# "month/day" as written in the TYPICAL/EXTREME PERIODS header (whitespace tolerated).
_EPW_DATE_RE = re.compile(r"^(\d{1,2})\s*/\s*(\d{1,2})$")

_REQUIRED_FIELDS = (
    "id", "country", "city", "latitude", "longitude", "elevation", "timezone",
    "climate_zone", "heating_degree_days", "cooling_degree_days",
    "winter_design_temp", "summer_design_temp_db", "summer_design_temp_wb",
    "summer_daily_range", "wind_speed", "pressure", "hourly_data",
)

_VALID_CLIMATE_ZONES = (
    "0A", "0B", "1A", "1B", "2A", "2B", "3A", "3B", "3C", "4A", "4B", "4C",
    "5A", "5B", "5C", "6A", "6B", "7", "8",
)

# (record key, upper bound); the lower bound is always 1.
_CALENDAR_CHECKS = (("month", 12), ("day", 31), ("hour", 24))

# (record key, label used in the error message, lower bound, upper bound)
_HOURLY_RANGE_CHECKS = (
    ("dry_bulb", "Dry bulb", -50, 50),
    ("relative_humidity", "Relative humidity", 0, 100),
    ("atmospheric_pressure", "Atmospheric pressure", 80000, 110000),
    ("global_horizontal_radiation", "Global radiation", 0, 1200),
    ("direct_normal_radiation", "Direct normal radiation", 0, 1200),
    ("diffuse_horizontal_radiation", "Diffuse horizontal radiation", 0, 1200),
    ("wind_speed", "Wind speed", 0, 30),
    ("wind_direction", "Wind direction", 0, 360),
)

_SOLAR_RANGE_CHECKS = (
    ("declination", "Declination", -23.45, 23.45),
    ("LST", "LST", 0, 24),
    ("HRA", "HRA", -180, 180),
    ("altitude", "Altitude", 0, 90),
    ("azimuth", "Azimuth", 0, 360),
    ("ground_reflected", "Ground-reflected radiation", 0, 1200),
)


@dataclass
class ClimateLocation:
    """Class representing a climate location with ASHRAE 169 data derived from EPW files."""

    id: str
    country: str
    state_province: str
    city: str
    latitude: float
    longitude: float
    elevation: float  # meters
    timezone: float  # hours from UTC
    climate_zone: str
    heating_degree_days: float  # base 18°C
    cooling_degree_days: float  # base 18°C
    winter_design_temp: float  # 99.6% heating design temperature (°C)
    summer_design_temp_db: float  # 0.4% cooling design dry-bulb temperature (°C)
    summer_design_temp_wb: float  # 0.4% cooling design wet-bulb temperature (°C)
    summer_daily_range: float  # Mean daily temperature range in summer (°C)
    wind_speed: float  # Mean wind speed (m/s)
    pressure: float  # Mean atmospheric pressure (Pa)
    hourly_data: List[Dict]  # Hourly data for integration with main.py
    typical_extreme_periods: Dict[str, Dict]  # Typical/extreme periods (summer/winter)
    ground_temperatures: Dict[str, List[float]]  # Monthly ground temperatures by depth
    solar_calculations: List[Dict] = None  # Solar calculation results

    def __init__(self, epw_file: pd.DataFrame, typical_extreme_periods: Dict, ground_temperatures: Dict, **kwargs):
        """Initialize ClimateLocation with EPW file data and header information.

        Args:
            epw_file: Frame whose integer column labels follow the EPW data-row
                layout; only the columns in ``_EPW_COLUMN_BY_FIELD`` are read.
            typical_extreme_periods: Parsed typical/extreme periods, stored as given.
            ground_temperatures: Parsed monthly ground temperatures, stored as given.
            **kwargs: Header values: ``id``, ``country``, ``state_province``,
                ``city``, ``latitude``, ``longitude``, ``elevation``, ``timezone``
                and optionally ``solar_calculations``.
        """
        self.id = kwargs.get("id")
        self.country = kwargs.get("country")
        self.state_province = kwargs.get("state_province", "N/A")
        self.city = kwargs.get("city")
        self.latitude = kwargs.get("latitude")
        self.longitude = kwargs.get("longitude")
        self.elevation = kwargs.get("elevation")
        self.timezone = kwargs.get("timezone")
        self.typical_extreme_periods = typical_extreme_periods
        self.ground_temperatures = ground_temperatures
        self.solar_calculations = kwargs.get("solar_calculations", [])

        def numeric(field: str) -> np.ndarray:
            """Return one EPW column as floats, with unparseable cells as NaN."""
            column = epw_file[_EPW_COLUMN_BY_FIELD[field]]
            return pd.to_numeric(column, errors="coerce").to_numpy(dtype=float)

        # Extract columns from EPW data
        months = numeric("month")
        days = numeric("day")
        hours = numeric("hour")
        dry_bulb = numeric("dry_bulb")
        humidity = numeric("relative_humidity")
        pressure = numeric("atmospheric_pressure")
        global_radiation = numeric("global_horizontal_radiation")
        direct_normal_radiation = numeric("direct_normal_radiation")
        diffuse_horizontal_radiation = numeric("diffuse_horizontal_radiation")
        wind_direction = numeric("wind_direction")
        wind_speed = numeric("wind_speed")

        # Mask extreme wind outliers instead of dropping them: dropping rows would
        # shorten the array and break the row-by-row alignment relied on below.
        with np.errstate(invalid="ignore"):
            wind_speed = np.where(wind_speed > 50, np.nan, wind_speed)
            high_winds = wind_speed[wind_speed > 15]
        if high_winds.size:
            logger.warning("High wind speeds detected: %s", high_winds.tolist())

        # Calculate wet-bulb temperature
        wet_bulb = ClimateData.calculate_wet_bulb(dry_bulb, humidity)

        # Calculate design conditions
        self.winter_design_temp = round(np.nanpercentile(dry_bulb, 0.4), 1)
        self.summer_design_temp_db = round(np.nanpercentile(dry_bulb, 99.6), 1)
        self.summer_design_temp_wb = round(np.nanpercentile(wet_bulb, 99.6), 1)

        # Daily extremes grouped by calendar day. Unlike reshape(-1, 24) this does
        # not fail (or shift days) when the record count is not a multiple of 24.
        daily = (
            pd.DataFrame({"month": months, "day": days, "dry_bulb": dry_bulb})
            .groupby(["month", "day"], sort=False)["dry_bulb"]
            .agg(["max", "min"])
        )

        # Calculate degree days using (T_max + T_min)/2
        daily_avg = ((daily["max"] + daily["min"]) / 2).to_numpy(dtype=float)
        self.heating_degree_days = round(np.nansum(np.where(daily_avg < 18, 18 - daily_avg, 0)))
        self.cooling_degree_days = round(np.nansum(np.where(daily_avg > 18, daily_avg - 18, 0)))

        # Calculate summer daily temperature range: June-August in the Northern
        # Hemisphere, December-February when the latitude is negative.
        southern = self.latitude is not None and self.latitude < 0
        summer_months = (12, 1, 2) if southern else (6, 7, 8)
        in_summer = daily.index.get_level_values("month").isin(summer_months)
        summer_days = daily[in_summer]
        self.summer_daily_range = round(float((summer_days["max"] - summer_days["min"]).mean()), 1)

        # Calculate mean wind speed and pressure
        self.wind_speed = round(np.nanmean(wind_speed), 1)
        self.pressure = round(np.nanmean(pressure), 1)

        # Log wind speed diagnostics
        logger.info(
            "Wind speed stats: min=%.1f, max=%.1f, mean=%.1f",
            np.nanmin(wind_speed), np.nanmax(wind_speed), self.wind_speed,
        )

        # Assign climate zone
        self.climate_zone = ClimateData.assign_climate_zone(
            self.heating_degree_days, self.cooling_degree_days, np.nanmean(humidity)
        )

        def value_or(value: float, fallback: float) -> float:
            """Return ``value`` as a float, or ``fallback`` when it is NaN."""
            return fallback if np.isnan(value) else float(value)

        # Store hourly data with enhanced fields
        self.hourly_data = []
        rows = zip(
            months, days, hours, dry_bulb, humidity, pressure, global_radiation,
            direct_normal_radiation, diffuse_horizontal_radiation, wind_speed, wind_direction,
        )
        for month, day, hour, temp, rh, press, ghi, dni, dhi, wind, direction in rows:
            if np.isnan(month) or np.isnan(day) or np.isnan(hour) or np.isnan(temp):
                continue  # Skip records with missing critical fields
            self.hourly_data.append({
                "month": int(month),
                "day": int(day),
                "hour": int(hour),
                "dry_bulb": float(temp),
                "relative_humidity": value_or(rh, 0.0),
                "atmospheric_pressure": value_or(press, self.pressure),
                "global_horizontal_radiation": value_or(ghi, 0.0),
                "direct_normal_radiation": value_or(dni, 0.0),
                "diffuse_horizontal_radiation": value_or(dhi, 0.0),
                "wind_speed": value_or(wind, 0.0),
                "wind_direction": value_or(direction, 0.0),
            })

        if len(self.hourly_data) != 8760:
            st.warning(f"Hourly data has {len(self.hourly_data)} records instead of 8760. Some records may have been excluded due to missing data.")

    def to_dict(self) -> Dict[str, Any]:
        """Convert the climate location to a dictionary."""
        return {
            "id": self.id,
            "country": self.country,
            "state_province": self.state_province,
            "city": self.city,
            "latitude": self.latitude,
            "longitude": self.longitude,
            "elevation": self.elevation,
            "timezone": self.timezone,
            "climate_zone": self.climate_zone,
            "heating_degree_days": self.heating_degree_days,
            "cooling_degree_days": self.cooling_degree_days,
            "winter_design_temp": self.winter_design_temp,
            "summer_design_temp_db": self.summer_design_temp_db,
            "summer_design_temp_wb": self.summer_design_temp_wb,
            "summer_daily_range": self.summer_daily_range,
            "wind_speed": self.wind_speed,
            "pressure": self.pressure,
            "hourly_data": self.hourly_data,
            "typical_extreme_periods": self.typical_extreme_periods,
            "ground_temperatures": self.ground_temperatures,
            "solar_calculations": self.solar_calculations,
        }


class ClimateData:
    """Class for managing ASHRAE 169 climate data from EPW files."""

    def __init__(self):
        """Initialize climate data."""
        self.locations = {}
        self.countries = []
        self.country_states = {}

    def add_location(self, location: ClimateLocation):
        """Add a new location to the dictionary and refresh the derived indexes."""
        self.locations[location.id] = location
        self.countries = sorted({loc.country for loc in self.locations.values()})
        self.country_states = self._group_locations_by_country_state()

    def _group_locations_by_country_state(self) -> Dict[str, Dict[str, List[str]]]:
        """Group locations by country and state/province, with sorted city lists."""
        result = {}
        for loc in self.locations.values():
            result.setdefault(loc.country, {}).setdefault(loc.state_province, []).append(loc.city)
        for states in result.values():
            for state, cities in states.items():
                states[state] = sorted(cities)
        return result

    def get_location_by_id(self, location_id: str, session_state: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Retrieve climate data by ID from session state or locations."""
        if "climate_data" in session_state and session_state["climate_data"].get("id") == location_id:
            return session_state["climate_data"]
        if location_id in self.locations:
            return self.locations[location_id].to_dict()
        return None

    @staticmethod
    def validate_climate_data(data: Dict[str, Any]) -> bool:
        """Validate climate data for required fields and ranges.

        Reports the first problem through ``st.error`` and returns False; returns
        True when every check passes. Missing typical/extreme periods only raise
        an ``st.warning``.
        """
        def fail(message: str) -> bool:
            st.error(message)
            return False

        for field in _REQUIRED_FIELDS:
            if field not in data:
                return fail(f"Validation failed: Missing required field '{field}'")

        if not (-90 <= data["latitude"] <= 90 and -180 <= data["longitude"] <= 180):
            return fail("Validation failed: Invalid latitude or longitude")
        if data["elevation"] < 0:
            return fail("Validation failed: Negative elevation")
        if not (-24 <= data["timezone"] <= 24):
            return fail(f"Validation failed: Timezone {data['timezone']} outside range")
        if data["climate_zone"] not in _VALID_CLIMATE_ZONES:
            return fail(f"Validation failed: Invalid climate zone '{data['climate_zone']}'")
        if not (data["heating_degree_days"] >= 0 and data["cooling_degree_days"] >= 0):
            return fail("Validation failed: Negative degree days")
        if not (-50 <= data["winter_design_temp"] <= 20):
            return fail(f"Validation failed: Winter design temp {data['winter_design_temp']} outside range")
        if not (0 <= data["summer_design_temp_db"] <= 50 and 0 <= data["summer_design_temp_wb"] <= 40):
            return fail("Validation failed: Invalid summer design temperatures")
        if data["summer_daily_range"] < 0:
            return fail("Validation failed: Negative summer daily range")
        if not (0 <= data["wind_speed"] <= 30):
            return fail(f"Validation failed: Wind speed {data['wind_speed']} outside range")
        if not (80000 <= data["pressure"] <= 110000):
            return fail(f"Validation failed: Pressure {data['pressure']} outside range")

        # ``or []`` keeps the message usable when the stored value is None.
        hourly_data = data["hourly_data"] or []
        if len(hourly_data) < 8700:
            return fail(f"Validation failed: Hourly data has {len(hourly_data)} records, expected ~8760")

        for record in hourly_data:
            for key, upper in _CALENDAR_CHECKS:
                if not (1 <= record[key] <= upper):
                    return fail(f"Validation failed: Invalid {key} {record[key]}")
            for key, label, lower, upper in _HOURLY_RANGE_CHECKS:
                if not (lower <= record[key] <= upper):
                    return fail(f"Validation failed: {label} {record[key]} outside range")

        # Validate typical/extreme periods (optional)
        periods = data.get("typical_extreme_periods")
        if periods:
            expected_periods = ["summer_extreme", "summer_typical", "winter_extreme", "winter_typical"]
            missing_periods = [p for p in expected_periods if p not in periods]
            if missing_periods:
                st.warning(f"Validation warning: Missing typical/extreme periods: {', '.join(missing_periods)}")
            for period in periods.values():
                for edge in ("start", "end"):
                    date = period[edge]
                    if not (1 <= date["month"] <= 12 and 1 <= date["day"] <= 31):
                        return fail(f"Validation failed: Invalid date in typical/extreme periods: {date}")

        # Validate ground temperatures (optional)
        ground_temperatures = data.get("ground_temperatures")
        if ground_temperatures:
            for depth, temps in ground_temperatures.items():
                if len(temps) != 12 or not all(0 <= t <= 50 for t in temps):
                    return fail(f"Validation failed: Invalid ground temperatures for depth {depth}")

        # Validate solar calculations (optional)
        for calc in data.get("solar_calculations") or []:
            if not (1 <= calc["month"] <= 12 and 1 <= calc["day"] <= 31 and 1 <= calc["hour"] <= 24):
                return fail(f"Validation failed: Invalid date/time in solar calculations: {calc}")
            for key, label, lower, upper in _SOLAR_RANGE_CHECKS:
                if not (lower <= calc[key] <= upper):
                    return fail(f"Validation failed: {label} {calc[key]} outside range")

        return True

    @staticmethod
    def calculate_wet_bulb(dry_bulb: np.ndarray, relative_humidity: np.ndarray) -> np.ndarray:
        """Calculate Wet Bulb Temperature using Stull (2011) approximation.

        Entries with RH outside 5-99 %, dry-bulb outside -20..50 °C, or NaN in
        either input are returned as NaN.
        """
        db = np.array(dry_bulb, dtype=float)
        rh = np.array(relative_humidity, dtype=float)
        # Out-of-domain inputs (e.g. negative RH) produce NaN here and are masked
        # below, so the floating-point warnings carry no information.
        with np.errstate(invalid="ignore"):
            wet_bulb = (
                db * np.arctan(0.151977 * (rh + 8.313659) ** 0.5)
                + np.arctan(db + rh)
                - np.arctan(rh - 1.676331)
                + 0.00391838 * rh ** 1.5 * np.arctan(0.023101 * rh)
                - 4.686035
            )
            invalid_mask = (rh < 5) | (rh > 99) | (db < -20) | (db > 50) | np.isnan(db) | np.isnan(rh)
        wet_bulb[invalid_mask] = np.nan
        return wet_bulb

    @staticmethod
    def is_numeric(value: str) -> bool:
        """Check if a value can be converted to a number."""
        try:
            float(value)
        except (ValueError, TypeError):  # TypeError covers None and other non-strings
            return False
        return True

    @staticmethod
    def _decode_epw(raw: bytes) -> str:
        """Decode uploaded EPW bytes, tolerating a UTF-8 BOM and Latin-1 content."""
        try:
            return raw.decode("utf-8-sig")
        except UnicodeDecodeError:
            return raw.decode("latin-1")

    @staticmethod
    def _parse_location_header(epw_lines: List[str]) -> Dict[str, Any]:
        """Read city, state, country and coordinates from the LOCATION header line."""
        header = next((line for line in epw_lines if line.startswith("LOCATION")), None)
        if header is None:
            raise ValueError("EPW file is missing the LOCATION header")
        parts = header.split(",")
        city = parts[1].strip() or "Unknown"
        # Clean city name by removing suffixes like '.Racecourse'
        city = re.sub(r'\..*', '', city)
        return {
            "city": city,
            "state_province": parts[2].strip() or "Unknown",
            "country": parts[3].strip() or "Unknown",
            "latitude": float(parts[6]),
            "longitude": float(parts[7]),
            "timezone": float(parts[8]),  # Time zone from EPW header
            "elevation": float(parts[9]),
        }

    @staticmethod
    def _parse_typical_extreme_periods(epw_lines: List[str]) -> Dict[str, Dict]:
        """Parse the first TYPICAL/EXTREME PERIODS header line into start/end dates."""
        periods = {}
        for line in epw_lines:
            if not line.startswith("TYPICAL/EXTREME PERIODS"):
                continue
            parts = line.strip().split(',')
            try:
                num_periods = int(parts[1])
            except (ValueError, IndexError):
                st.warning("Invalid number of periods in TYPICAL/EXTREME PERIODS, skipping parsing.")
                break
            for i in range(num_periods):
                base = 2 + i * 4  # each period occupies four fields: name, type, start, end
                if len(parts) < base + 4:
                    st.warning(f"Insufficient fields for period {i + 1}, skipping.")
                    continue
                period_name = parts[base]
                key = _PERIOD_KEY_BY_NAME.get(period_name)
                if key is None:
                    continue  # not one of the four periods this module tracks
                start_date = parts[base + 2].strip()
                end_date = parts[base + 3].strip()
                start_match = _EPW_DATE_RE.match(start_date)
                end_match = _EPW_DATE_RE.match(end_date)
                if not start_match or not end_match:
                    st.warning(f"Invalid date format for period {period_name}: {start_date} to {end_date}, skipping.")
                    continue
                periods[key] = {
                    "start": {"month": int(start_match.group(1)), "day": int(start_match.group(2))},
                    "end": {"month": int(end_match.group(1)), "day": int(end_match.group(2))},
                }
            break
        return periods

    @staticmethod
    def _parse_ground_temperatures(epw_lines: List[str]) -> Dict[str, List[float]]:
        """Parse the first GROUND TEMPERATURES header line into 12 monthly values per depth."""
        ground_temperatures = {}
        for line in epw_lines:
            if not line.startswith("GROUND TEMPERATURES"):
                continue
            parts = line.strip().split(',')
            try:
                num_depths = int(parts[1])
            except (ValueError, IndexError):
                st.warning("Invalid number of depths in GROUND TEMPERATURES, skipping parsing.")
                break
            for i in range(num_depths):
                base = 2 + i * 16  # depth, three further fields, then 12 monthly values
                if len(parts) < base + 16:
                    st.warning(f"Insufficient fields for ground temperature depth {i + 1}, skipping.")
                    continue
                depth = parts[base]
                try:
                    temps = [float(t) for t in parts[base + 4:base + 16] if t.strip()]
                except ValueError as e:
                    st.warning(f"Error parsing ground temperatures for depth {i + 1}: {str(e)}, skipping.")
                    continue
                if len(temps) != 12:
                    st.warning(f"Invalid number of temperatures for depth {depth}m, expected 12, got {len(temps)}, skipping.")
                    continue
                ground_temperatures[depth] = temps
            break
        return ground_temperatures

    @staticmethod
    def _read_epw_data(epw_lines: List[str]) -> pd.DataFrame:
        """Read the hourly rows after the DATA PERIODS header and coerce the used columns.

        Raises:
            ValueError: If the header is missing, the table is not 8760 x 35, or
                a used column holds no numeric data at all.
        """
        header_idx = next((i for i, line in enumerate(epw_lines) if line.startswith("DATA PERIODS")), None)
        if header_idx is None:
            raise ValueError("EPW file is missing the DATA PERIODS header")
        body = "\n".join(epw_lines[header_idx + 1:])
        epw_data = pd.read_csv(StringIO(body), header=None, dtype=str)

        if len(epw_data) != 8760:
            raise ValueError(f"EPW file has {len(epw_data)} records, expected 8760.")
        if len(epw_data.columns) != 35:
            raise ValueError(f"EPW file has {len(epw_data.columns)} columns, expected 35.")

        for col in _EPW_COLUMN_BY_FIELD.values():
            epw_data[col] = pd.to_numeric(epw_data[col], errors='coerce')
            if epw_data[col].isna().all():
                raise ValueError(f"Column {col} contains only non-numeric or missing data.")
        return epw_data

    @staticmethod
    def _hourly_to_epw_frame(hourly_data: List[Dict]) -> pd.DataFrame:
        """Rebuild an EPW-shaped frame (integer column labels) from hourly records."""
        return pd.DataFrame({
            column: [record[field] for record in hourly_data]
            for field, column in _EPW_COLUMN_BY_FIELD.items()
        })

    @staticmethod
    def _location_from_dict(data: Dict[str, Any], epw_data: Optional[pd.DataFrame] = None) -> ClimateLocation:
        """Recreate a ClimateLocation from the dictionary produced by ``to_dict``."""
        if epw_data is None:
            epw_data = ClimateData._hourly_to_epw_frame(data["hourly_data"])
        location = ClimateLocation(
            epw_file=epw_data,
            # Validation treats these two as optional, so a stored dict may lack them.
            typical_extreme_periods=data.get("typical_extreme_periods", {}),
            ground_temperatures=data.get("ground_temperatures", {}),
            id=data["id"],
            country=data["country"],
            state_province=data.get("state_province", "N/A"),
            city=data["city"],
            latitude=data["latitude"],
            longitude=data["longitude"],
            elevation=data["elevation"],
            timezone=data["timezone"],
            solar_calculations=data.get("solar_calculations", []),
        )
        # Override hourly_data to ensure consistency
        location.hourly_data = data["hourly_data"]
        return location

    def _load_uploaded_epw(self, uploaded_file, session_state: Dict[str, Any]):
        """Parse an uploaded EPW file, validate it, then register and store it.

        Returns:
            The ``(location, epw_data)`` pair for the uploaded file.

        Raises:
            ValueError: If the file is malformed or the extracted data is invalid.
        """
        epw_lines = self._decode_epw(uploaded_file.read()).splitlines()

        header = self._parse_location_header(epw_lines)
        typical_extreme_periods = self._parse_typical_extreme_periods(epw_lines)
        ground_temperatures = self._parse_ground_temperatures(epw_lines)
        epw_data = self._read_epw_data(epw_lines)

        location = ClimateLocation(
            epw_file=epw_data,
            typical_extreme_periods=typical_extreme_periods,
            ground_temperatures=ground_temperatures,
            id=f"{header['country'][:1].upper()}{header['city'][:3].upper()}",
            **header,
        )
        climate_data_dict = location.to_dict()
        # Validate before registering so rejected data never reaches self.locations.
        if not self.validate_climate_data(climate_data_dict):
            raise ValueError("Invalid climate data extracted from EPW file.")
        self.add_location(location)
        session_state["climate_data"] = climate_data_dict
        return location, epw_data

    def display_climate_input(self, session_state: Dict[str, Any]):
        """Display Streamlit interface for EPW upload, visualizations, and solar analysis."""
        st.title("Climate Data Analysis")

        # Apply consistent styling
        st.markdown(STYLE, unsafe_allow_html=True)

        # Clear invalid session_state["climate_data"] without warning
        if "climate_data" in session_state and not all(
            key in session_state["climate_data"] for key in ["id", "country", "city", "timezone"]
        ):
            del session_state["climate_data"]

        uploaded_file = st.file_uploader("Upload EPW File", type=["epw"])

        # Initialize location and epw_data for display
        location = None
        epw_data = None

        if uploaded_file:
            try:
                location, epw_data = self._load_uploaded_epw(uploaded_file, session_state)
                st.success("Climate data extracted from EPW file!")
            except Exception as e:  # UI boundary: report the problem instead of crashing the page
                # Nothing from a failed upload may be displayed below.
                location = None
                epw_data = None
                logger.exception("EPW processing failed")
                st.error(f"Error processing EPW file: {str(e)}. Ensure it has 8760 hourly records and correct format.")
        elif "climate_data" in session_state and self.validate_climate_data(session_state["climate_data"]):
            # Reconstruct from session_state
            climate_data_dict = session_state["climate_data"]
            epw_data = self._hourly_to_epw_frame(climate_data_dict["hourly_data"])
            location = self._location_from_dict(climate_data_dict, epw_data)
            self.add_location(location)
            st.info("Displaying previously extracted climate data.")

        # Display tabs if location and epw_data are available
        if location and epw_data is not None:
            tab1, tab2 = st.tabs(["General Information", "Solar Analysis"])
            with tab1:
                self.display_design_conditions(location)
            with tab2:
                self.display_solar_analysis(location, session_state)
        else:
            st.info("No climate data available. Please upload an EPW file to proceed.")

    def display_solar_analysis(self, location: ClimateLocation, session_state: Dict[str, Any]):
        """Display solar analysis tab with input fields and calculation results.

        Reads and writes ``session_state["climate_data"]["solar_calculations"]``.
        """
        st.subheader("Solar Analysis")

        # Input fields with help text
        col1, col2 = st.columns(2)
        with col1:
            ground_reflectivity = st.number_input(
                "Ground Reflectivity (ρg)",
                min_value=0.0,
                max_value=1.0,
                value=0.2,
                step=0.01,
                help="Enter the albedo of the ground surface (0 to 1). Common values: 0.2 (grass), 0.3 (concrete), 0.8 (snow). Default: 0.2."
            )
        with col2:
            surface_tilt = st.number_input(
                "Surface Tilt (β, degrees)",
                min_value=0.0,
                max_value=180.0,
                value=0.0,
                step=1.0,
                help="Enter the tilt angle of the surface in degrees (0° for horizontal, 90° for vertical, up to 180° for downward-facing). Default: 0°."
            )

        climate = session_state["climate_data"]

        # Calculate button
        if st.button("Calculate Solar Parameters"):
            try:
                solar_results = SolarCalculations.calculate_solar_parameters(
                    hourly_data=location.hourly_data,
                    latitude=location.latitude,
                    longitude=location.longitude,
                    timezone=climate.get("timezone", 0),
                    ground_reflectivity=ground_reflectivity,
                    surface_tilt=surface_tilt
                )
                climate["solar_calculations"] = solar_results
                location.solar_calculations = solar_results
                st.success("Solar calculations completed!")
            except Exception as e:  # UI boundary: show the failure in the page
                logger.exception("Solar calculation failed")
                st.error(f"Error in solar calculations: {str(e)}")

        # Display results table
        solar_calculations = climate.get("solar_calculations")
        if not solar_calculations:
            st.info("No solar calculation results available. Click 'Calculate Solar Parameters' to generate results.")
            return

        # NOTE(review): the heading markup was missing from this literal in the
        # reviewed copy; <h3> is a reconstruction -- confirm against version control.
        st.markdown('<h3>Solar Analysis Results</h3>', unsafe_allow_html=True)

        solar_by_time = {f"{r['month']}-{r['day']}-{r['hour']}": r for r in solar_calculations}
        table_data = []
        for record in location.hourly_data:
            row = {
                "Month": record["month"],
                "Day": record["day"],
                "Hour": record["hour"],
                "Dry Bulb Temperature (°C)": f"{record['dry_bulb']:.1f}",
                "Relative Humidity (%)": f"{record['relative_humidity']:.1f}",
                "Wind Speed (m/s)": f"{record['wind_speed']:.1f}",
                "Wind Direction (°)": f"{record['wind_direction']:.1f}",
                "Global Horizontal Radiation (W/m²)": f"{record['global_horizontal_radiation']:.1f}",
                "Direct Normal Radiation (W/m²)": f"{record['direct_normal_radiation']:.1f}",
                "Diffuse Horizontal Radiation (W/m²)": f"{record['diffuse_horizontal_radiation']:.1f}",
                # Solar columns stay blank for hours without a calculation result.
                "Declination (°)": "",
                "Local Solar Time (h)": "",
                "Hour Angle (°)": "",
                "Solar Altitude (°)": "",
                "Solar Azimuth (°)": "",
                "Ground-Reflected Radiation (W/m²)": ""
            }
            solar = solar_by_time.get(f"{record['month']}-{record['day']}-{record['hour']}")
            if solar is not None:
                row.update({
                    "Declination (°)": f"{solar['declination']:.2f}",
                    "Local Solar Time (h)": f"{solar['LST']:.2f}",
                    "Hour Angle (°)": f"{solar['HRA']:.2f}",
                    "Solar Altitude (°)": f"{solar['altitude']:.2f}",
                    "Solar Azimuth (°)": f"{solar['azimuth']:.2f}",
                    "Ground-Reflected Radiation (W/m²)": f"{solar['ground_reflected']:.2f}"
                })
            table_data.append(row)

        st.dataframe(pd.DataFrame(table_data), use_container_width=True)

    @staticmethod
    def _html_list(rows) -> str:
        """Render ``(label, value)`` pairs as an HTML list, escaping both parts.

        The values originate from an uploaded file, so they are escaped before
        being passed to ``st.markdown(..., unsafe_allow_html=True)``.
        """
        items = "".join(
            f"<li><strong>{html.escape(str(label))}:</strong> {html.escape(str(value))}</li>"
            for label, value in rows
        )
        return f"<ul>{items}</ul>"

    def display_design_conditions(self, location: ClimateLocation):
        """Display design conditions for HVAC calculations using styled HTML.

        NOTE(review): the HTML inside this method's string literals was missing
        from the reviewed copy (headings only, no placeholders, and the period
        list was built but never rendered). The markup below is a conservative
        reconstruction -- compare with version control before merging.
        """
        st.subheader("Design Conditions")

        col1, col2 = st.columns(2)

        # Location Details (First Column)
        with col1:
            location_rows = [
                ("Country", location.country),
                ("State/Province", location.state_province),
                ("City", location.city),
                ("Latitude", f"{location.latitude}°"),
                ("Longitude", f"{location.longitude}°"),
                ("Elevation", f"{location.elevation} m"),
                ("Time Zone", f"{location.timezone} hours from UTC"),
            ]
            st.markdown(
                "<h3>Location Details</h3>" + self._html_list(location_rows),
                unsafe_allow_html=True,
            )

        # Typical/Extreme Periods (Second Column)
        with col2:
            if location.typical_extreme_periods:
                period_rows = [
                    (
                        key.replace('_', ' ').title(),
                        f"{period['start']['month']}/{period['start']['day']} to {period['end']['month']}/{period['end']['day']}",
                    )
                    for key, period in location.typical_extreme_periods.items()
                ]
                period_content = "<h3>Typical/Extreme Periods</h3>" + self._html_list(period_rows)
            else:
                period_content = (
                    "<h3>Typical/Extreme Periods</h3>"
                    "<p>No typical/extreme period data available.</p>"
                )
            st.markdown(period_content, unsafe_allow_html=True)

        # Calculated Climate Parameters
        parameter_rows = [
            ("Climate Zone", location.climate_zone),
            ("Heating Degree Days (base 18°C)", location.heating_degree_days),
            ("Cooling Degree Days (base 18°C)", location.cooling_degree_days),
            ("Winter Design Temperature (99.6%)", f"{location.winter_design_temp} °C"),
            ("Summer Design Dry-Bulb Temperature (0.4%)", f"{location.summer_design_temp_db} °C"),
            ("Summer Design Wet-Bulb Temperature (0.4%)", f"{location.summer_design_temp_wb} °C"),
            ("Summer Daily Temperature Range", f"{location.summer_daily_range} °C"),
            ("Mean Wind Speed", f"{location.wind_speed} m/s"),
            ("Mean Atmospheric Pressure", f"{location.pressure} Pa"),
        ]
        st.markdown(
            "<h3>Calculated Climate Parameters</h3>" + self._html_list(parameter_rows),
            unsafe_allow_html=True,
        )

        # Ground Temperatures (Table)
        if location.ground_temperatures:
            st.markdown('<h3>Ground Temperatures</h3>', unsafe_allow_html=True)
            month_names = ["Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"]
            table_data = []
            for depth, temps in location.ground_temperatures.items():
                row = {"Depth (m)": float(depth)}
                row.update({month: f"{temp:.2f}" for month, temp in zip(month_names, temps)})
                table_data.append(row)
            st.dataframe(pd.DataFrame(table_data), use_container_width=True)

        # Hourly Climate Data (Table)
        if location.hourly_data:
            st.markdown('<h3>Hourly Climate Data</h3>', unsafe_allow_html=True)
            hourly_table_data = [
                {
                    "Month": record["month"],
                    "Day": record["day"],
                    "Hour": record["hour"],
                    "Dry Bulb Temperature (°C)": f"{record['dry_bulb']:.1f}",
                    "Relative Humidity (%)": f"{record['relative_humidity']:.1f}",
                    "Atmospheric Pressure (Pa)": f"{record['atmospheric_pressure']:.1f}",
                    "Global Horizontal Radiation (W/m²)": f"{record['global_horizontal_radiation']:.1f}",
                    "Direct Normal Radiation (W/m²)": f"{record['direct_normal_radiation']:.1f}",
                    "Diffuse Horizontal Radiation (W/m²)": f"{record['diffuse_horizontal_radiation']:.1f}",
                    "Wind Speed (m/s)": f"{record['wind_speed']:.1f}",
                    "Wind Direction (°)": f"{record['wind_direction']:.1f}"
                }
                for record in location.hourly_data
            ]
            st.dataframe(pd.DataFrame(hourly_table_data), use_container_width=True)

    @staticmethod
    def assign_climate_zone(hdd: float, cdd: float, avg_humidity: float) -> str:
        """Assign ASHRAE 169 climate zone based on HDD, CDD, and humidity.

        The thermal number comes from the CDD/HDD thresholds below. The moisture
        letter is "A" above 60 % mean relative humidity; zones 3-5 use "B" below
        40 % and "C" otherwise, while zones 0-2 and 6 use "B" for everything else.
        """
        def two_way() -> str:
            return "A" if avg_humidity > 60 else "B"

        def three_way() -> str:
            if avg_humidity > 60:
                return "A"
            return "B" if avg_humidity < 40 else "C"

        if cdd > 10000:
            return "0" + two_way()
        if cdd > 5000:
            return "1" + two_way()
        if cdd > 2500:
            return "2" + two_way()
        if hdd < 2000 and cdd > 1000:
            return "3" + three_way()
        if hdd < 3000:
            return "4" + three_way()
        if hdd < 4000:
            return "5" + three_way()
        if hdd < 5000:
            return "6" + two_way()
        if hdd < 7000:
            return "7"
        return "8"

    def export_to_json(self, file_path: str) -> None:
        """Export all climate data to a JSON file."""
        data = {loc_id: loc.to_dict() for loc_id, loc in self.locations.items()}
        with open(file_path, 'w', encoding="utf-8") as f:
            json.dump(data, f, indent=4)

    @classmethod
    def from_json(cls, file_path: str) -> 'ClimateData':
        """Load climate data from a JSON file written by ``export_to_json``."""
        with open(file_path, 'r', encoding="utf-8") as f:
            data = json.load(f)

        climate_data = cls()
        for loc_dict in data.values():
            climate_data.add_location(cls._location_from_dict(loc_dict))
        return climate_data


if __name__ == "__main__":
    climate_data = ClimateData()
    session_state = {"building_info": {"country": "Australia", "city": "Geelong"}, "page": "Climate Data"}
    climate_data.display_climate_input(session_state)