Spaces:
Sleeping
Sleeping
| """ | |
| HVAC Calculator Code Documentation | |
| Developed by: Dr Majed Abuseif, Deakin University | |
| © 2025 | |
| """ | |
| import numpy as np | |
| import pandas as pd | |
| from typing import Dict, List, Optional, NamedTuple, Any, Tuple | |
| from enum import Enum | |
| import streamlit as st | |
| from app.materials_library import GlazingMaterial, Material, MaterialLibrary | |
| from app.internal_loads import PEOPLE_ACTIVITY_LEVELS, LIGHTING_FIXTURE_TYPES, DEFAULT_BUILDING_INTERNALS | |
| from datetime import datetime | |
| from collections import defaultdict | |
| import logging | |
| import math | |
| from utils.ctf_calculations import CTFCalculator, ComponentType, CTFCoefficients | |
| from utils.solar import SolarCalculations # Import SolarCalculations for SHGC data | |
| from app.m_c_data import SAMPLE_MATERIALS, SAMPLE_FENESTRATIONS, DEFAULT_MATERIAL_PROPERTIES, DEFAULT_WINDOW_PROPERTIES | |
| import plotly.express as px | |
| import uuid | |
| import psychrolib | |
| run_id = str(uuid.uuid4()) | |
| # Configure logging | |
| logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') | |
| logger = logging.getLogger(__name__) | |
| # Initialize psychrolib for SI units | |
| psychrolib.SetUnitSystem(psychrolib.SI) | |
| class AdaptiveComfortModel: | |
| def compute_daily_mean_temperatures(hourly_data: List[Dict]) -> List[Tuple[Tuple[int, int], float]]: | |
| daily_means = {} | |
| for data in hourly_data: | |
| key = (data["month"], data["day"]) | |
| daily_means.setdefault(key, []).append(data["dry_bulb"]) | |
| return [(key, np.mean(values)) for key, values in sorted(daily_means.items())] | |
| def compute_running_mean(daily_means: List[float], alpha: float = 0.8) -> List[float]: | |
| trm = [] | |
| for i, t in enumerate(daily_means): | |
| if i == 0: | |
| trm.append(t) | |
| else: | |
| trm.append((1 - alpha) * t + alpha * trm[i - 1]) | |
| return trm | |
| def generate_adaptive_setpoints(hourly_data: List[Dict], acceptability: str = "90") -> Dict[Tuple[int, int], float]: | |
| daily_mean_data = AdaptiveComfortModel.compute_daily_mean_temperatures(hourly_data) | |
| daily_keys = [key for key, _ in daily_mean_data] | |
| daily_values = [value for _, value in daily_mean_data] | |
| running_means = AdaptiveComfortModel.compute_running_mean(daily_values) | |
| setpoints = {} | |
| for i, key in enumerate(daily_keys): | |
| trm = running_means[i] | |
| if acceptability == "80": | |
| t_min = 0.31 * trm + 14.3 - 3.0 | |
| t_max = 0.31 * trm + 14.3 + 3.0 | |
| elif acceptability == "85": | |
| t_min = 0.31 * trm + 14.3 - 2.5 | |
| t_max = 0.31 * trm + 14.3 + 2.5 | |
| elif acceptability == "95": | |
| t_min = 0.31 * trm + 14.3 - 1.5 | |
| t_max = 0.31 * trm + 14.3 + 1.5 | |
| else: # Default to 90% | |
| t_min = 0.31 * trm + 14.3 - 2.0 | |
| t_max = 0.31 * trm + 14.3 + 2.0 | |
| setpoints[key] = (t_min + t_max) / 2 | |
| return setpoints | |
| def classify_azimuth(azimuth: float) -> str: | |
| """Classify azimuth angle into cardinal directions.""" | |
| azimuth = azimuth % 360 | |
| if 337.5 <= azimuth or azimuth < 22.5: | |
| return "N" | |
| elif 22.5 <= azimuth < 67.5: | |
| return "NE" | |
| elif 67.5 <= azimuth < 112.5: | |
| return "E" | |
| elif 112.5 <= azimuth < 157.5: | |
| return "SE" | |
| elif 157.5 <= azimuth < 202.5: | |
| return "S" | |
| elif 202.5 <= azimuth < 247.5: | |
| return "SW" | |
| elif 247.5 <= azimuth < 292.5: | |
| return "W" | |
| elif 292.5 <= azimuth < 337.5: | |
| return "NW" | |
| return "N" | |
| class TFMCalculations: | |
| def get_component_type(component: Dict[str, Any]) -> ComponentType: | |
| """Map component dictionary 'type' to ComponentType enum.""" | |
| comp_type_map = { | |
| 'walls': ComponentType.WALL, | |
| 'roofs': ComponentType.ROOF, | |
| 'floors': ComponentType.FLOOR, | |
| 'windows': ComponentType.WINDOW, | |
| 'skylights': ComponentType.SKYLIGHT | |
| } | |
| comp_type_str = component.get('type', '').lower() | |
| component_type = comp_type_map.get(comp_type_str, None) | |
| if not component_type: | |
| logger.warning(f"Invalid component type '{comp_type_str}' for component '{component.get('name', 'Unknown')}'. Defaulting to WALL.") | |
| return ComponentType.WALL | |
| return component_type | |
| def calculate_conduction_load(component: Dict[str, Any], outdoor_temp: float, indoor_temp: float, hour: int, month: int, mode: str = "none") -> tuple[float, float]: | |
| """Calculate conduction load for heating and cooling in kW based on mode.""" | |
| if mode == "none": | |
| return 0, 0 | |
| component_name = component.get('name', 'unnamed_component') | |
| component_type = TFMCalculations.get_component_type(component) | |
| # Validate adiabatic and ground_contact mutual exclusivity | |
| if component.get('adiabatic', False) and component.get('ground_contact', False): | |
| logger.warning(f"Component {component_name} has both adiabatic and ground_contact set to True. Treating as adiabatic, setting ground_contact to False.") | |
| component['ground_contact'] = False | |
| # Skip for adiabatic components | |
| if component.get('adiabatic', False): | |
| logger.info(f"Skipping conduction load calculation for adiabatic component {component_name} at hour {hour}") | |
| return 0, 0 | |
| # Handle ground-contact surfaces | |
| if component.get('ground_contact', False): | |
| valid_types = [ComponentType.WALL, ComponentType.ROOF, ComponentType.FLOOR] | |
| if component_type not in valid_types: | |
| logger.warning(f"Invalid ground-contact component type '{component_type.value}' for {component_name}. Using outdoor temperature {outdoor_temp:.2f}°C.") | |
| else: | |
| # Retrieve ground temperature | |
| climate_data = st.session_state.project_data.get("climate_data", {}) | |
| ground_temperatures = climate_data.get("ground_temperatures", {}) | |
| depth = "2" # Default depth | |
| default_temps = {"0.5": 20.0, "2": 18.0, "4": 16.0} | |
| if depth not in ground_temperatures or not ground_temperatures[depth]: | |
| logger.warning(f"No ground temperature data for depth {depth} m for month {month}. Using default {default_temps[depth]}°C.") | |
| outdoor_temp = default_temps[depth] | |
| else: | |
| outdoor_temp = ground_temperatures[depth][month-1] if len(ground_temperatures[depth]) >= month else default_temps[depth] | |
| logger.info(f"Ground-contact component {component_name} at hour {hour}, month {month}: using ground temperature {outdoor_temp:.2f}°C") | |
| delta_t = outdoor_temp - indoor_temp | |
| if mode == "cooling" and delta_t <= 0: | |
| return 0, 0 | |
| if mode == "heating" and delta_t >= 0: | |
| return 0, 0 | |
| try: | |
| # Get CTF coefficients, preferring stored value | |
| ctf = component.get('ctf') | |
| if not ctf: | |
| logger.debug(f"No CTF coefficients found for {component_name}. Calculating CTF coefficients.") | |
| ctf = CTFCalculator.calculate_ctf_coefficients(component) | |
| component['ctf'] = ctf # Store in dictionary | |
| # Initialize history terms (simplified: assume steady-state history for demonstration) | |
| load = component.get('u_value', 0.0) * component.get('area', 0.0) * delta_t | |
| for i in range(len(ctf.Y)): | |
| load += component.get('area', 0.0) * ctf.Y[i] * (outdoor_temp - indoor_temp) * np.exp(-i * 3600 / 3600) | |
| load -= component.get('area', 0.0) * ctf.Z[i] * (outdoor_temp - indoor_temp) * np.exp(-i * 3600 / 3600) | |
| cooling_load = load / 1000 if mode == "cooling" else 0 | |
| heating_load = -load / 1000 if mode == "heating" else 0 | |
| logger.info(f"Conduction load for {component_name} at hour {hour}: cooling={cooling_load:.3f} kW, heating={heating_load:.3f} kW") | |
| return cooling_load, heating_load | |
| except Exception as e: | |
| logger.error(f"Error calculating conduction load for {component_name} at hour {hour}: {str(e)}") | |
| return 0, 0 | |
| def day_of_year(month: int, day: int, year: int) -> int: | |
| """Calculate day of the year (n) from month, day, and year, accounting for leap years.""" | |
| days_in_month = [31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31] | |
| if year % 4 == 0 and (year % 100 != 0 or year % 400 == 0): | |
| days_in_month[1] = 29 | |
| return sum(days_in_month[:month-1]) + day | |
| def equation_of_time(n: int) -> float: | |
| """Calculate Equation of Time (EOT) in minutes using Spencer's formula.""" | |
| B = (n - 1) * 360 / 365 | |
| B_rad = math.radians(B) | |
| EOT = 229.2 * (0.000075 + 0.001868 * math.cos(B_rad) - 0.032077 * math.sin(B_rad) - | |
| 0.014615 * math.cos(2 * B_rad) - 0.04089 * math.sin(2 * B_rad)) | |
| return EOT | |
| def get_surface_parameters(component: Dict[str, Any], building_info: Dict, material_library: MaterialLibrary, | |
| project_materials: Dict, project_constructions: Dict, | |
| project_glazing_materials: Dict) -> Tuple[float, float, float, Optional[float], float]: | |
| """Determine surface parameters (tilt, azimuth, h_o, emissivity, absorptivity) for a component.""" | |
| component_name = component.get('name', 'unnamed_component') | |
| component_type = TFMCalculations.get_component_type(component) | |
| # Default parameters | |
| surface_tilt = component.get('surface_tilt', 90.0 if component_type in [ComponentType.WALL, ComponentType.WINDOW] else | |
| 0.0 if component_type in [ComponentType.ROOF, ComponentType.SKYLIGHT] else 180.0) | |
| surface_azimuth = component.get('surface_azimuth', 0.0) | |
| h_o = DEFAULT_WINDOW_PROPERTIES["h_o"] if component_type in [ComponentType.WINDOW, ComponentType.SKYLIGHT] else \ | |
| 17.0 if component_type in [ComponentType.WALL, ComponentType.FLOOR] else 23.0 | |
| emissivity = component.get('emissivity', 0.9 if component_type in [ComponentType.WALL, ComponentType.ROOF] else None) | |
| absorptivity = component.get('absorptivity', 0.6 if component_type in [ComponentType.WALL, ComponentType.ROOF] else 0.0) | |
| try: | |
| # For windows and skylights, adjust h_o and use shgc instead of absorptivity | |
| if component_type in [ComponentType.WINDOW, ComponentType.SKYLIGHT]: | |
| fenestration_name = component.get('fenestration', None) | |
| h_o = component.get('h_o', DEFAULT_WINDOW_PROPERTIES["h_o"]) # Use component-stored h_o | |
| if not fenestration_name: | |
| logger.warning(f"No fenestration defined for {component_name}. Using default SHGC=0.7, h_o={h_o}.") | |
| absorptivity = component.get('shgc', 0.7) # Use shgc as absorptivity for consistency | |
| else: | |
| absorptivity = component.get('shgc', 0.7) # Use component-stored shgc | |
| logger.debug(f"Using component-stored SHGC for {component_name}: shgc={absorptivity}, h_o={h_o}") | |
| emissivity = None # Emissivity not used for fenestrations | |
| logger.info(f"Surface parameters for {component_name}: tilt={surface_tilt:.1f}, azimuth={surface_azimuth:.1f}, h_o={h_o:.1f}, " | |
| f"emissivity={emissivity}, absorptivity={absorptivity}") | |
| return surface_tilt, surface_azimuth, h_o, emissivity, absorptivity | |
| except Exception as e: | |
| logger.error(f"Error retrieving surface parameters for {component_name}: {str(e)}") | |
| # Apply defaults on error | |
| if component_type == ComponentType.ROOF: | |
| surface_tilt = 0.0 | |
| h_o = 23.0 | |
| surface_azimuth = 0.0 | |
| elif component_type == ComponentType.SKYLIGHT: | |
| surface_tilt = 0.0 | |
| h_o = DEFAULT_WINDOW_PROPERTIES["h_o"] | |
| surface_azimuth = 0.0 | |
| elif component_type == ComponentType.FLOOR: | |
| surface_tilt = 180.0 | |
| h_o = 17.0 | |
| surface_azimuth = 0.0 | |
| else: # WALL, WINDOW | |
| surface_tilt = 90.0 | |
| h_o = DEFAULT_WINDOW_PROPERTIES["h_o"] if component_type == ComponentType.WINDOW else 17.0 | |
| surface_azimuth = 0.0 | |
| if component_type in [ComponentType.WALL, ComponentType.ROOF]: | |
| absorptivity = 0.6 | |
| emissivity = 0.9 | |
| else: # WINDOW, SKYLIGHT | |
| absorptivity = 0.7 | |
| emissivity = None | |
| logger.info(f"Default surface parameters for {component_name}: tilt={surface_tilt:.1f}, azimuth={surface_azimuth:.1f}, h_o={h_o:.1f}") | |
| return surface_tilt, surface_azimuth, h_o, emissivity, absorptivity | |
| def calculate_solar_load(component: Dict[str, Any], hourly_data: Dict, hour: int, building_orientation: float, mode: str = "none") -> float: | |
| """Calculate solar load in kW (cooling only) using ASHRAE-compliant solar calculations.""" | |
| if mode != "cooling": | |
| return 0 | |
| component_type = TFMCalculations.get_component_type(component) | |
| component_name = component.get('name', 'unnamed_component') | |
| # Skip for floors, adiabatic, or ground-contact components | |
| if component_type == ComponentType.FLOOR or component.get('adiabatic', False) or component.get('ground_contact', False): | |
| logger.info(f"Skipping solar load calculation for {component_name} at hour {hour} (type={component_type.value}, adiabatic={component.get('adiabatic', False)}, ground_contact={component.get('ground_contact', False)})") | |
| return 0 | |
| try: | |
| material_library = st.session_state.get("material_library") | |
| if not material_library: | |
| from app.materials_library import MaterialLibrary | |
| material_library = MaterialLibrary() | |
| st.session_state.material_library = material_library | |
| logger.info(f"Created new MaterialLibrary for {component_name}") | |
| project_materials = st.session_state.get("project_data", {}).get("materials", {}).get("project", {}) | |
| project_constructions = st.session_state.get("project_data", {}).get("constructions", {}).get("project", {}) | |
| project_glazing_materials = st.session_state.get("project_data", {}).get("fenestrations", {}).get("project", {}) | |
| climate_data = st.session_state.get("project_data", {}).get("climate_data", {}) | |
| latitude = climate_data.get("latitude", 0.0) | |
| longitude = climate_data.get("longitude", 0.0) | |
| timezone = climate_data.get("time_zone", 0.0) | |
| ground_reflectivity = climate_data.get("ground_reflectivity", 0.2) | |
| if not -90 <= latitude <= 90: | |
| logger.warning(f"Invalid latitude {latitude} for {component_name}. Using default 0.0.") | |
| latitude = 0.0 | |
| if not -180 <= longitude <= 180: | |
| logger.warning(f"Invalid longitude {longitude} for {component_name}. Using default 0.0.") | |
| longitude = 0.0 | |
| if not -12 <= timezone <= 14: | |
| logger.warning(f"Invalid timezone {timezone} for {component_name}. Using default 0.0.") | |
| timezone = 0.0 | |
| if not 0 <= ground_reflectivity <= 1: | |
| logger.warning(f"Invalid ground_reflectivity {ground_reflectivity} for {component_name}. Using default 0.2.") | |
| ground_reflectivity = 0.2 | |
| required_fields = ["month", "day", "hour", "global_horizontal_radiation", "direct_normal_radiation", | |
| "diffuse_horizontal_radiation", "dry_bulb"] | |
| if not all(field in hourly_data for field in required_fields): | |
| logger.warning(f"Missing required fields in hourly_data for hour {hour} for {component_name}: {hourly_data}") | |
| return 0 | |
| if hourly_data["global_horizontal_radiation"] <= 0: | |
| logger.info(f"No solar load for hour {hour} due to GHI={hourly_data['global_horizontal_radiation']} for {component_name}") | |
| return 0 | |
| month = hourly_data["month"] | |
| day = hourly_data["day"] | |
| hour = hourly_data["hour"] | |
| ghi = hourly_data["global_horizontal_radiation"] | |
| dni = hourly_data.get("direct_normal_radiation", ghi * 0.7) | |
| dhi = hourly_data.get("diffuse_horizontal_radiation", ghi * 0.3) | |
| outdoor_temp = hourly_data["dry_bulb"] | |
| if ghi < 0 or dni < 0 or dhi < 0: | |
| logger.error(f"Negative radiation values for {month}/{day}/{hour} for {component_name}") | |
| raise ValueError(f"Negative radiation values for {month}/{day}/{hour}") | |
| logger.info(f"Processing solar for {month}/{day}/{hour} with GHI={ghi}, DNI={dni}, DHI={dhi}, " | |
| f"dry_bulb={outdoor_temp} for {component_name}") | |
| year = 2025 | |
| n = TFMCalculations.day_of_year(month, day, year) | |
| EOT = TFMCalculations.equation_of_time(n) | |
| lambda_std = 15 * timezone | |
| standard_time = hour - 1 + 0.5 | |
| LST = standard_time + (4 * (lambda_std - longitude) + EOT) / 60 | |
| delta = 23.45 * math.sin(math.radians(360 / 365 * (284 + n))) | |
| phi = math.radians(latitude) | |
| delta_rad = math.radians(delta) | |
| hra = 15 * (LST - 12) | |
| hra_rad = math.radians(hra) | |
| sin_alpha = math.sin(phi) * math.sin(delta_rad) + math.cos(phi) * math.cos(delta_rad) * math.cos(hra_rad) | |
| alpha = math.degrees(math.asin(sin_alpha)) | |
| if abs(math.cos(math.radians(alpha))) < 0.01: | |
| azimuth = 0 | |
| else: | |
| sin_az = math.cos(delta_rad) * math.sin(hra_rad) / math.cos(math.radians(alpha)) | |
| cos_az = (sin_alpha * math.sin(phi) - math.sin(delta_rad)) / (math.cos(math.radians(alpha)) * math.cos(phi)) | |
| azimuth = math.degrees(math.atan2(sin_az, cos_az)) | |
| if hra > 0: | |
| azimuth = 360 - azimuth if azimuth > 0 else -azimuth | |
| logger.info(f"Solar angles for {month}/{day}/{hour}: declination={delta:.2f}, LST={LST:.2f}, " | |
| f"HRA={hra:.2f}, altitude={alpha:.2f}, azimuth={azimuth:.2f} for {component_name}") | |
| building_info = {"orientation_angle": building_orientation} | |
| try: | |
| surface_tilt, surface_azimuth, h_o, emissivity, absorptivity = \ | |
| TFMCalculations.get_surface_parameters( | |
| component, building_info, material_library, project_materials, | |
| project_constructions, project_glazing_materials | |
| ) | |
| except Exception as e: | |
| logger.error(f"Error getting surface parameters for {component_name}: {str(e)}. Using defaults.") | |
| if component_type == ComponentType.ROOF: | |
| surface_tilt = 0.0 | |
| surface_azimuth = 0.0 | |
| elif component_type == ComponentType.SKYLIGHT: | |
| surface_tilt = 0.0 | |
| surface_azimuth = 0.0 | |
| elif component_type == ComponentType.FLOOR: | |
| surface_tilt = 180.0 | |
| surface_azimuth = 0.0 | |
| else: # WALL, WINDOW | |
| surface_tilt = 90.0 | |
| surface_azimuth = 0.0 | |
| if component_type in [ComponentType.WALL, ComponentType.ROOF]: | |
| absorptivity = 0.6 | |
| h_o = 17.0 if component_type == ComponentType.WALL else 23.0 | |
| else: # WINDOW, SKYLIGHT | |
| absorptivity = 0.0 | |
| h_o = DEFAULT_WINDOW_PROPERTIES["h_o"] | |
| alpha_rad = math.radians(alpha) | |
| surface_tilt_rad = math.radians(surface_tilt) | |
| azimuth_rad = math.radians(azimuth) | |
| surface_azimuth_rad = math.radians(surface_azimuth) | |
| cos_theta = (math.sin(alpha_rad) * math.cos(surface_tilt_rad) + | |
| math.cos(alpha_rad) * math.sin(surface_tilt_rad) * | |
| math.cos(azimuth_rad - surface_azimuth_rad)) | |
| cos_theta = max(min(cos_theta, 1.0), 0.0) | |
| logger.info(f" Component {component_name} at {month}/{day}/{hour}: " | |
| f"surface_tilt={surface_tilt:.2f}, surface_azimuth={surface_azimuth:.2f}, " | |
| f"cos_theta={cos_theta:.4f}") | |
| view_factor = (1 - math.cos(surface_tilt_rad)) / 2 | |
| ground_reflected = ground_reflectivity * ghi * view_factor | |
| if cos_theta > 0: | |
| I_t = dni * cos_theta + dhi + ground_reflected | |
| else: | |
| I_t = dhi + ground_reflected | |
| solar_heat_gain = 0.0 | |
| if component_type in [ComponentType.WINDOW, ComponentType.SKYLIGHT]: | |
| shgc = component.get('shgc', 0.7) # Use component-stored shgc | |
| glazing_type = SolarCalculations.GLAZING_TYPE_MAPPING.get(component.get('fenestration', ''), "Single Clear") | |
| shading_coeff = component.get('shading_coefficient', 1.0) # Aligned with components.py | |
| # Adjust shading coefficient based on shading type | |
| shading_type = component.get('shading_type', 'No shading') | |
| if shading_type == "External Shading": | |
| shading_coeff *= 0.6 | |
| elif shading_type == "Internal Shading": | |
| shading_coeff *= 0.8 | |
| shgc_dynamic = shgc * SolarCalculations.calculate_dynamic_shgc(glazing_type, cos_theta) | |
| solar_heat_gain = component.get('area', 0.0) * shgc_dynamic * I_t * shading_coeff / 1000 | |
| logger.info(f"Fenestration solar heat gain for {component_name} at {month}/{day}/{hour}: " | |
| f"{solar_heat_gain:.4f} kW (area={component.get('area', 0.0)}, shgc_dynamic={shgc_dynamic:.4f}, " | |
| f"I_t={I_t:.2f}, shading_coeff={shading_coeff}, shading_type={shading_type})") | |
| elif component_type in [ComponentType.WALL, ComponentType.ROOF]: | |
| surface_resistance = 1/h_o | |
| solar_heat_gain = component.get('area', 0.0) * absorptivity * I_t * surface_resistance / 1000 | |
| logger.info(f"Opaque surface solar heat gain for {component_name} at {month}/{day}/{hour}: " | |
| f"{solar_heat_gain:.4f} kW (area={component.get('area', 0.0)}, absorptivity={absorptivity:.2f}, " | |
| f"I_t={I_t:.2f}, surface_resistance={surface_resistance:.4f})") | |
| return solar_heat_gain | |
| except Exception as e: | |
| logger.error(f"Error calculating solar load for {component_name} at hour {hour}: {str(e)}") | |
| return 0 | |
| def get_schedule_fraction(schedule_name: str, hour: int, is_weekend: bool) -> float: | |
| """Get the schedule fraction for the given hour and day type.""" | |
| schedules = st.session_state.project_data["internal_loads"].get("schedules", {}) | |
| schedule = schedules.get(schedule_name, {}) | |
| if not schedule: | |
| logger.warning(f"Schedule '{schedule_name}' not found. Using fraction=1.0.") | |
| return 1.0 | |
| values = schedule.get("weekend" if is_weekend else "weekday", [1.0] * 24) | |
| hour_idx = hour % 24 | |
| if 0 <= hour_idx < len(values): | |
| fraction = values[hour_idx] | |
| logger.debug(f"Schedule '{schedule_name}' at hour {hour_idx} ({'weekend' if is_weekend else 'weekday'}): fraction={fraction:.2f}") | |
| return fraction | |
| logger.warning(f"Invalid hour index {hour_idx} for schedule '{schedule_name}'. Using fraction=1.0.") | |
| return 1.0 | |
| def calculate_internal_load(internal_loads: Dict, hour: int, operation_hours: int, area: float) -> float: | |
| """Calculate total internal load in kW, incorporating schedules.""" | |
| total_load = 0.0 | |
| is_weekend = False # Simplified; in practice, determine from date | |
| try: | |
| # People loads | |
| for group in internal_loads.get("people", []): | |
| schedule_name = group.get("schedule", "Continuous") | |
| fraction = TFMCalculations.get_schedule_fraction(schedule_name, hour, is_weekend) | |
| sensible = group.get("total_sensible_heat", 0.0) | |
| latent = group.get("total_latent_heat", 0.0) | |
| group_load = (sensible + latent) * fraction / 1000 # Convert W to kW | |
| total_load += group_load | |
| logger.debug(f"People group '{group.get('name', 'unknown')}': sensible={sensible:.2f} W, latent={latent:.2f} W, fraction={fraction:.2f}, load={group_load:.3f} kW") | |
| # Lighting loads | |
| for light in internal_loads.get("lighting", []): | |
| schedule_name = light.get("schedule", "Continuous") | |
| fraction = TFMCalculations.get_schedule_fraction(schedule_name, hour, is_weekend) | |
| total_power = light.get("total_power", 0.0) | |
| lighting_load = total_power * fraction / 1000 # Convert W to kW | |
| total_load += lighting_load | |
| logger.debug(f"Lighting system '{light.get('name', 'unknown')}': total_power={total_power:.2f} W, fraction={fraction:.2f}, load={lighting_load:.3f} kW") | |
| # Equipment loads | |
| for equip in internal_loads.get("equipment", []): | |
| schedule_name = equip.get("schedule", "Continuous") | |
| fraction = TFMCalculations.get_schedule_fraction(schedule_name, hour, is_weekend) | |
| sensible = equip.get("total_sensible_power", 0.0) | |
| latent = equip.get("total_latent_power", 0.0) | |
| equip_load = (sensible + latent) * fraction / 1000 # Convert W to kW | |
| total_load += equip_load | |
| logger.debug(f"Equipment '{equip.get('name', 'unknown')}': sensible={sensible:.2f} W, latent={latent:.2f} W, fraction={fraction:.2f}, load={equip_load:.3f} kW") | |
| logger.info(f"Total internal load for hour {hour}: {total_load:.3f} kW") | |
| return total_load | |
| except Exception as e: | |
| logger.error(f"Error calculating internal load for hour {hour}: {str(e)}") | |
| return 0.0 | |
| def calculate_enthalpy(temp_C: float, RH_percent: float, pressure_kPa: float = 101.325) -> float: | |
| """Calculate air enthalpy (kJ/kg) using psychrometric properties.""" | |
| try: | |
| # Clamp inputs to valid ranges | |
| temp_C = max(-50.0, min(60.0, temp_C)) | |
| RH_percent = max(0.0, min(100.0, RH_percent)) | |
| pressure_kPa = max(50.0, min(120.0, pressure_kPa)) | |
| # Convert pressure to Pa for psychrolib | |
| pressure_Pa = pressure_kPa * 1000.0 | |
| # Calculate humidity ratio (kg/kg) | |
| W = psychrolib.GetHumRatioFromRelHum(temp_C, RH_percent / 100.0, pressure_Pa) | |
| # Calculate enthalpy (kJ/kg) | |
| h = psychrolib.GetMoistAirEnthalpy(temp_C, W) | |
| h = h / 1000.0 # Convert J/kg to kJ/kg | |
| logger.debug(f"Enthalpy calculated: temp={temp_C:.2f}°C, RH={RH_percent:.2f}%, pressure={pressure_kPa:.2f} kPa, h={h:.2f} kJ/kg") | |
| return h | |
| except Exception as e: | |
| logger.error(f"Error calculating enthalpy: {str(e)}. Using default 50 kJ/kg.") | |
| return 50.0 # Default enthalpy for 25°C, 50% RH | |
| def calculate_ventilation_load(internal_loads: Dict, outdoor_temp: float, indoor_temp: float, area: float, building_info: Dict, hour: int, mode: str = "none") -> tuple[float, float]: | |
| """Calculate ventilation load (sensible and latent) for heating and cooling in kW based on mode.""" | |
| if mode == "none": | |
| return 0, 0 | |
| delta_t = outdoor_temp - indoor_temp | |
| if mode == "cooling" and delta_t <= 0: | |
| return 0, 0 | |
| if mode == "heating" and delta_t >= 0: | |
| return 0, 0 | |
| total_cooling_load = 0.0 | |
| total_heating_load = 0.0 | |
| air_density = 1.2 # kg/m³ | |
| specific_heat = 1.006 # kJ/kg·K | |
| is_weekend = False # Simplified; determine from date in practice | |
| climate_data = st.session_state.project_data.get("climate_data", {}) | |
| outdoor_rh = climate_data.get("hourly_data", [{}])[hour % 24].get("relative_humidity", 50.0) | |
| indoor_rh = building_info.get("summer_indoor_design_rh" if mode == "cooling" else "winter_indoor_design_rh", 50.0) | |
| altitude = climate_data.get("location", {}).get("elevation", 0.0) | |
| pressure_kPa = 101.325 * (1 - 2.25577e-5 * altitude) ** 5.25588 | |
| try: | |
| for system in internal_loads.get("ventilation", []): | |
| schedule_name = system.get("schedule", "Continuous") | |
| fraction = TFMCalculations.get_schedule_fraction(schedule_name, hour, is_weekend) | |
| system_type = system.get("system_type", "AirChanges/Hour") | |
| system_area = system.get("area", area) | |
| ventilation_flow = 0.0 | |
| fan_power = 0.0 | |
| if system_type == "AirChanges/Hour": | |
| design_flow_rate = system.get("design_flow_rate", 1.0) # L/s·m² | |
| ventilation_flow = min(max(design_flow_rate * system_area / 1000, 0.0), system_area * 0.05) # m³/s, capped at 50 L/s·m² | |
| if system.get("ventilation_type", "Natural") == "Mechanical": | |
| fan_power = (system.get("fan_pressure_rise", 200.0) * ventilation_flow) / system.get("fan_efficiency", 0.7) / 1000 # kW | |
| elif system_type == "Wind and Stack Open Area": | |
| opening_effectiveness = system.get("opening_effectiveness", 50.0) / 100 | |
| ventilation_flow = 0.001 * system_area * opening_effectiveness # m³/s (placeholder) | |
| elif system_type in ["Balanced Flow", "Heat Recovery"]: | |
| design_flow_rate = system.get("design_flow_rate", 1.0) # L/s·m² | |
| ventilation_flow = min(max(design_flow_rate * system_area / 1000, 0.0), system_area * 0.05) # m³/s | |
| fan_power = (system.get("fan_pressure_rise", 200.0) * ventilation_flow) / system.get("fan_efficiency", 0.7) / 1000 # kW | |
| if system_type == "Heat Recovery": | |
| sensible_eff = system.get("sensible_effectiveness", 0.5) | |
| delta_t = delta_t * (1 - sensible_eff) | |
| # Calculate enthalpies | |
| h_out = TFMCalculations.calculate_enthalpy(outdoor_temp, outdoor_rh, pressure_kPa) | |
| h_in = TFMCalculations.calculate_enthalpy(indoor_temp, indoor_rh, pressure_kPa) | |
| # Total load (kW) | |
| total_load = ventilation_flow * air_density * (h_out - h_in) * fraction / 1000 | |
| # Sensible load (kW) | |
| sensible_load = ventilation_flow * air_density * specific_heat * delta_t * fraction / 1000 | |
| # Latent load (kW) | |
| latent_load = total_load - sensible_load | |
| # Assign loads based on mode | |
| cooling_load = (sensible_load + latent_load + fan_power) if mode == "cooling" and total_load > 0 else 0 | |
| heating_load = -(sensible_load + latent_load) if mode == "heating" and total_load < 0 else 0 | |
| total_cooling_load += cooling_load | |
| total_heating_load += heating_load | |
| # Store sensible and latent components for UI | |
| system["sensible_load"] = sensible_load | |
| system["latent_load"] = latent_load | |
| logger.debug(f"Ventilation '{system.get('name', 'unknown')}': flow={ventilation_flow:.4f} m³/s, fraction={fraction:.2f}, sensible={sensible_load:.3f} kW, latent={latent_load:.3f} kW, cooling={cooling_load:.3f} kW, heating={heating_load:.3f} kW") | |
| logger.info(f"Total ventilation load for hour {hour}: cooling={total_cooling_load:.3f} kW, heating={total_heating_load:.3f} kW") | |
| return total_cooling_load, total_heating_load | |
| except Exception as e: | |
| logger.error(f"Error calculating ventilation load for hour {hour}: {str(e)}") | |
| return 0, 0 | |
| def calculate_infiltration_load(internal_loads: Dict, outdoor_temp: float, indoor_temp: float, area: float, building_info: Dict, hour: int, mode: str = "none") -> tuple[float, float]: | |
| """Calculate infiltration load (sensible and latent) for heating and cooling in kW based on mode.""" | |
| if mode == "none": | |
| return 0, 0 | |
| delta_t = outdoor_temp - indoor_temp | |
| if mode == "cooling" and delta_t <= 0: | |
| return 0, 0 | |
| if mode == "heating" and delta_t >= 0: | |
| return 0, 0 | |
| total_cooling_load = 0.0 | |
| total_heating_load = 0.0 | |
| air_density = 1.2 # kg/m³ | |
| specific_heat = 1.006 # kJ/kg·K | |
| building_height = building_info.get("building_height", 3.0) | |
| volume = area * building_height | |
| is_weekend = False # Simplified; determine from date in practice | |
| climate_data = st.session_state.project_data.get("climate_data", {}) | |
| wind_speed = max(climate_data.get("hourly_data", [{}])[hour % 24].get("wind_speed", 4.0), 0.0) | |
| wind_speed = min(wind_speed, 20.0) | |
| wind_direction = climate_data.get("hourly_data", [{}])[hour % 24].get("wind_direction", 0.0) | |
| outdoor_rh = climate_data.get("hourly_data", [{}])[hour % 24].get("relative_humidity", 50.0) | |
| indoor_rh = building_info.get("summer_indoor_design_rh" if mode == "cooling" else "winter_indoor_design_rh", 50.0) | |
| altitude = climate_data.get("location", {}).get("elevation", 0.0) | |
| pressure_kPa = 101.325 * (1 - 2.25577e-5 * altitude) ** 5.25588 | |
| try: | |
| for system in internal_loads.get("infiltration", []): | |
| schedule_name = system.get("schedule", "Continuous") | |
| fraction = TFMCalculations.get_schedule_fraction(schedule_name, hour, is_weekend) | |
| system_type = system.get("system_type", "AirChanges/Hour") | |
| system_area = system.get("area", area) | |
| infiltration_flow = 0.0 | |
| if system_type == "AirChanges/Hour": | |
| ach = system.get("design_flow_rate", 0.3) | |
| Q = ach * volume / 3600 # m³/s | |
| wind_coeff = 0.23 # LBL model coefficient | |
| relative_angle = abs((wind_direction - system.get("surface_azimuth", 0.0)) % 360) | |
| wind_coeff_adjusted = wind_coeff * max(0.2, math.cos(math.radians(relative_angle))) | |
| ela = Q / (wind_coeff_adjusted * max(wind_speed, 0.1) ** 2) if wind_speed > 0 else 0.0001 * system_area | |
| infiltration_flow = wind_coeff_adjusted * ela * (wind_speed ** 2) * fraction | |
| elif system_type == "Effective Leakage Area": | |
| ela = system.get("effective_air_leakage_area", 100.0) / 10000 # cm² to m² | |
| stack_coeff = system.get("stack_coefficient", 0.0001) | |
| wind_coeff = system.get("wind_coefficient", 0.0001) | |
| relative_angle = abs((wind_direction - system.get("surface_azimuth", 0.0)) % 360) | |
| wind_coeff_adjusted = wind_coeff * max(0.2, math.cos(math.radians(relative_angle))) | |
| delta_t_abs = abs(delta_t) | |
| Q_stack = stack_coeff * ela * (delta_t_abs ** 0.5) | |
| Q_wind = wind_coeff_adjusted * ela * (wind_speed ** 2) | |
| infiltration_flow = ((Q_stack ** 2 + Q_wind ** 2) ** 0.5) * fraction | |
| elif system_type == "Flow Coefficient": | |
| c = system.get("flow_coefficient", 0.0001) | |
| n = system.get("pressure_exponent", 0.6) | |
| stack_coeff = system.get("stack_coefficient", 0.0001) | |
| wind_coeff = system.get("wind_coefficient", 0.0001) | |
| relative_angle = abs((wind_direction - system.get("surface_azimuth", 0.0)) % 360) | |
| wind_coeff_adjusted = wind_coeff * max(0.2, math.cos(math.radians(relative_angle))) | |
| delta_t_abs = abs(delta_t) | |
| delta_p_stack = stack_coeff * delta_t_abs | |
| delta_p_wind = wind_coeff_adjusted * (wind_speed ** 2) | |
| delta_p = (delta_p_stack ** 2 + delta_p_wind ** 2) ** 0.5 | |
| infiltration_flow = c * (delta_p ** n) * system_area * fraction | |
| # Calculate enthalpies | |
| h_out = TFMCalculations.calculate_enthalpy(outdoor_temp, outdoor_rh, pressure_kPa) | |
| h_in = TFMCalculations.calculate_enthalpy(indoor_temp, indoor_rh, pressure_kPa) | |
| # Total load (kW) | |
| total_load = air_density * infiltration_flow * (h_out - h_in) / 1000 | |
| # Sensible load (kW) | |
| sensible_load = air_density * infiltration_flow * specific_heat * delta_t / 1000 | |
| # Latent load (kW) | |
| latent_load = total_load - sensible_load | |
| # Assign loads based on mode | |
| cooling_load = (sensible_load + latent_load) if mode == "cooling" and total_load > 0 else 0 | |
| heating_load = -(sensible_load + latent_load) if mode == "heating" and total_load < 0 else 0 | |
| total_cooling_load += cooling_load | |
| total_heating_load += heating_load | |
| # Store sensible and latent components for UI | |
| system["sensible_load"] = sensible_load | |
| system["latent_load"] = latent_load | |
| logger.debug(f"Infiltration '{system.get('name', 'unknown')}': flow={infiltration_flow:.4f} m³/s, fraction={fraction:.2f}, sensible={sensible_load:.3f} kW, latent={latent_load:.3f} kW, cooling={cooling_load:.3f} kW, heating={heating_load:.3f} kW") | |
| logger.info(f"Total infiltration load for hour {hour}: cooling={total_cooling_load:.3f} kW, heating={total_heating_load:.3f} kW") | |
| return total_cooling_load, total_heating_load | |
| except Exception as e: | |
| logger.error(f"Error calculating infiltration load for hour {hour}: {str(e)}") | |
| return 0, 0 | |
| def get_adaptive_comfort_temp(outdoor_temp: float) -> float: | |
| """Deprecated: Use AdaptiveComfortModel instead.""" | |
| logger.warning("get_adaptive_comfort_temp is deprecated. Use AdaptiveComfortModel.generate_adaptive_setpoints.") | |
| if 10 <= outdoor_temp <= 33.5: | |
| return 0.31 * outdoor_temp + 17.8 | |
| return 24.0 | |
| def filter_hourly_data(hourly_data: List[Dict], sim_period: Dict, climate_data: Dict) -> List[Dict]: | |
| """Filter hourly data based on simulation period, ignoring year.""" | |
| sim_type = sim_period["type"] | |
| if sim_type == "Full Year": | |
| return hourly_data | |
| filtered_data = [] | |
| if sim_type == "From Date to Date": | |
| start_month = sim_period["start_date"].month | |
| start_day = sim_period["start_date"].day | |
| end_month = sim_period["end_date"].month | |
| end_day = sim_period["end_date"].day | |
| for data in hourly_data: | |
| month, day = data["month"], data["day"] | |
| if (month > start_month or (month == start_month and day >= start_day)) and \ | |
| (month < end_month or (month == end_month and day <= end_day)): | |
| filtered_data.append(data) | |
| elif sim_type in ["Heating Only", "Cooling Only"]: | |
| base_temp = sim_period.get("base_temp", 18.3 if sim_type == "Heating Only" else 23.9) | |
| for data in hourly_data: | |
| temp = data["dry_bulb"] | |
| if (sim_type == "Heating Only" and temp < base_temp) or \ | |
| (sim_type == "Cooling Only" and temp > base_temp): | |
| filtered_data.append(data) | |
| elif sim_type in ["Summer Extreme", "Summer Typical", "Winter Extreme", "Winter Typical"]: | |
| period_key = sim_type.lower().replace(" ", "_") | |
| period = climate_data.get("typical_extreme_periods", {}).get(period_key) | |
| if not period: | |
| logger.warning(f"No data found for {sim_type} in typical_extreme_periods.") | |
| return [] | |
| start_month = period["start"]["month"] | |
| start_day = period["start"]["day"] | |
| end_month = period["end"]["month"] | |
| end_day = period["end"]["day"] | |
| for data in hourly_data: | |
| month, day = data["month"], data["day"] | |
| # Handle year-end wrap-around | |
| if start_month > end_month: | |
| if (month > start_month or (month == start_month and day >= start_day)) or \ | |
| (month < end_month or (month == end_month and day <= end_day)): | |
| filtered_data.append(data) | |
| else: | |
| if (month > start_month or (month == start_month and day >= start_day)) and \ | |
| (month < end_month or (month == end_month and day <= end_day)): | |
| filtered_data.append(data) | |
| return filtered_data | |
| def get_indoor_conditions(indoor_conditions: Dict, hour: int, outdoor_temp: float, month: int = 1, day: int = 1, adaptive_setpoints: Optional[Dict[Tuple[int, int], float]] = None) -> Dict: | |
| """Determine indoor conditions based on user settings.""" | |
| if indoor_conditions["type"] == "Fixed Setpoints": | |
| mode = "none" if abs(outdoor_temp - 18) < 0.01 else "cooling" if outdoor_temp > 18 else "heating" | |
| if mode == "cooling": | |
| return { | |
| "temperature": indoor_conditions.get("cooling_setpoint", {}).get("temperature", 24.0), | |
| "rh": indoor_conditions.get("cooling_setpoint", {}).get("rh", 50.0) | |
| } | |
| elif mode == "heating": | |
| return { | |
| "temperature": indoor_conditions.get("heating_setpoint", {}).get("temperature", 20.0), | |
| "rh": indoor_conditions.get("heating_setpoint", {}).get("rh", 50.0) | |
| } | |
| else: | |
| return {"temperature": 24.0, "rh": 50.0} | |
| else: # Adaptive | |
| key = (month, day) | |
| temp = adaptive_setpoints.get(key, 24.0) if adaptive_setpoints else 24.0 | |
| return {"temperature": temp, "rh": 50.0} | |
| def calculate_tfm_loads(components: Dict, hourly_data: List[Dict], indoor_conditions: Dict, internal_loads: Dict, building_info: Dict, sim_period: Dict, hvac_settings: Dict) -> List[Dict]: | |
| """Calculate TFM loads for heating and cooling with user-defined filters and temperature threshold.""" | |
| # Access climate_data for ground temperatures | |
| climate_data = st.session_state.project_data["climate_data"] | |
| ground_temperatures = climate_data.get("ground_temperatures", {}) | |
| logger.debug(f"Ground temperatures available: {ground_temperatures.keys()}") | |
| filtered_data = TFMCalculations.filter_hourly_data(hourly_data, sim_period, climate_data) | |
| temp_loads = [] | |
| building_orientation = building_info.get("orientation_angle", 0.0) | |
| operating_periods = hvac_settings.get("operating_hours", [{"start": 8, "end": 18}]) | |
| area = building_info.get("floor_area", 100.0) | |
| if "material_library" not in st.session_state: | |
| from app.materials_library import MaterialLibrary | |
| st.session_state.material_library = MaterialLibrary() | |
| logger.info("Initialized MaterialLibrary in session_state for solar calculations") | |
| if indoor_conditions["type"] == "ASHRAE 55 Adaptive Comfort": | |
| acceptability = indoor_conditions.get("adaptive_acceptability", "90") | |
| adaptive_setpoints = AdaptiveComfortModel.generate_adaptive_setpoints(hourly_data, acceptability) | |
| else: | |
| adaptive_setpoints = None | |
| for comp_list in components.values(): | |
| for comp in comp_list: | |
| comp['ctf'] = CTFCalculator.calculate_ctf_coefficients(comp) | |
| logger.debug(f"Stored CTF coefficients for component {comp.get('name', 'Unknown')}") | |
| # Cache total surface area for opaque components (walls, roofs, floors) | |
| total_surface_area = 0.0 | |
| opaque_components = [] | |
| for comp_list in components.values(): | |
| for comp in comp_list: | |
| comp_type = TFMCalculations.get_component_type(comp) | |
| if comp_type in [ComponentType.WALL, ComponentType.ROOF, ComponentType.FLOOR]: | |
| comp_area = comp.get('area', 0.0) | |
| if comp_area > 0: | |
| total_surface_area += comp_area | |
| opaque_components.append(comp) | |
| logger.debug(f"Total surface area for opaque components: {total_surface_area:.2f} m²") | |
| unmet_hours = 0 | |
| for hour_data in filtered_data: | |
| hour = hour_data["hour"] | |
| outdoor_temp = hour_data["dry_bulb"] | |
| month = hour_data["month"] | |
| day = hour_data["day"] | |
| ground_temp = ground_temperatures.get("0.5", [20.0]*12)[month-1] if ground_temperatures else 20.0 | |
| logger.debug(f"Ground temperature for month {month}: {ground_temp:.1f}°C") | |
| indoor_cond = TFMCalculations.get_indoor_conditions(indoor_conditions, hour, outdoor_temp, month, day, adaptive_setpoints) | |
| indoor_temp = indoor_cond["temperature"] | |
| if indoor_conditions["type"] == "ASHRAE 55 Adaptive Comfort": | |
| key = (hour_data["month"], hour_data["day"]) | |
| t_comf = adaptive_setpoints.get(key, 24.0) | |
| t_min = t_comf - (2.0 if indoor_conditions.get("adaptive_acceptability", "90") == "90" else 3.0) | |
| t_max = t_comf + (2.0 if indoor_conditions.get("adaptive_acceptability", "90") == "90" else 3.0) | |
| if indoor_temp < t_min or indoor_temp > t_max: | |
| unmet_hours += 1 | |
| for hour_data in filtered_data: | |
| hour = hour_data["hour"] | |
| outdoor_temp = hour_data["dry_bulb"] | |
| month = hour_data["month"] | |
| day = hour_data["day"] | |
| # For future enhancement: Retrieve ground temperature for the current month | |
| ground_temp = ground_temperatures.get("0.5", [20.0]*12)[month-1] if ground_temperatures else 20.0 | |
| logger.debug(f"Ground temperature for month {month}: {ground_temp:.1f}°C") | |
| indoor_cond = TFMCalculations.get_indoor_conditions(indoor_conditions, hour, outdoor_temp, month, day, adaptive_setpoints) | |
| indoor_temp = indoor_cond["temperature"] | |
| conduction_cooling = conduction_heating = solar = internal = ventilation_cooling = ventilation_heating = infiltration_cooling = infiltration_heating = 0 | |
| solar_by_orientation = defaultdict(float) | |
| conduction_by_orientation = defaultdict(float) | |
| is_operating = False | |
| for period in operating_periods: | |
| start_hour = period.get("start", 8) | |
| end_hour = period.get("end", 18) | |
| if start_hour <= hour % 24 <= end_hour: | |
| is_operating = True | |
| break | |
| mode = "none" if abs(outdoor_temp - 18) < 0.01 else "cooling" if outdoor_temp > 18 else "heating" | |
| # Calculate radiant loads from internal sources | |
| total_radiant_load = 0.0 | |
| internal_loads_conditions = st.session_state.project_data.get("internal_loads_conditions", { | |
| "air_velocity": 0.1, | |
| "lighting_convective_fraction": 0.5, | |
| "lighting_radiative_fraction": 0.5, | |
| "equipment_convective_fraction": 0.5, | |
| "equipment_radiative_fraction": 0.5 | |
| }) | |
| is_weekend = False # Simplified; determine from date in practice | |
| # People radiant load | |
| air_velocity = internal_loads_conditions.get("air_velocity", 0.1) | |
| if air_velocity < 0.0 or air_velocity > 2.0: | |
| logger.warning(f"Air velocity {air_velocity} out of range [0.0, 2.0] for hour {hour}. Clamping to nearest bound.") | |
| air_velocity = max(0.0, min(2.0, air_velocity)) | |
| people_convective_fraction = min(max(0.5 + 0.31 * air_velocity, 0.0), 1.0) | |
| people_radiative_fraction = 1.0 - people_convective_fraction | |
| for group in internal_loads.get("people", []): | |
| schedule_name = group.get("schedule", "Continuous") | |
| fraction = TFMCalculations.get_schedule_fraction(schedule_name, hour, is_weekend) | |
| sensible = group.get("total_sensible_heat", 0.0) | |
| radiant_load = sensible * people_radiative_fraction * fraction / 1000 # kW, exclude latent heat | |
| total_radiant_load += radiant_load | |
| logger.debug(f"People group '{group.get('name', 'unknown')}': sensible={sensible:.2f} W, radiative_fraction={people_radiative_fraction:.2f}, fraction={fraction:.2f}, radiant_load={radiant_load:.3f} kW") | |
| # Lighting radiant load | |
| lighting_radiative_fraction = internal_loads_conditions.get("lighting_radiative_fraction", 0.5) | |
| for light in internal_loads.get("lighting", []): | |
| schedule_name = light.get("schedule", "Continuous") | |
| fraction = TFMCalculations.get_schedule_fraction(schedule_name, hour, is_weekend) | |
| total_power = light.get("total_power", 0.0) | |
| radiant_load = total_power * lighting_radiative_fraction * fraction / 1000 # kW | |
| total_radiant_load += radiant_load | |
| logger.debug(f"Lighting system '{light.get('name', 'unknown')}': total_power={total_power:.2f} W, radiative_fraction={lighting_radiative_fraction:.2f}, fraction={fraction:.2f}, radiant_load={radiant_load:.3f} kW") | |
| # Equipment radiant load | |
| equipment_radiative_fraction = internal_loads_conditions.get("equipment_radiative_fraction", 0.5) | |
| for equip in internal_loads.get("equipment", []): | |
| schedule_name = equip.get("schedule", "Continuous") | |
| fraction = TFMCalculations.get_schedule_fraction(schedule_name, hour, is_weekend) | |
| sensible = equip.get("total_sensible_power", 0.0) | |
| radiant_load = sensible * equipment_radiative_fraction * fraction / 1000 # kW, exclude latent heat | |
| total_radiant_load += radiant_load | |
| logger.debug(f"Equipment '{equip.get('name', 'unknown')}': sensible={sensible:.2f} W, radiative_fraction={equipment_radiative_fraction:.2f}, fraction={fraction:.2f}, radiant_load={radiant_load:.3f} kW") | |
| logger.debug(f"Total radiant load for hour {hour}: {total_radiant_load:.3f} kW") | |
| # Distribute radiant load to opaque surfaces | |
| if total_surface_area > 0: | |
| for comp in opaque_components: | |
| comp_area = comp.get('area', 0.0) | |
| comp['radiant_load'] = total_radiant_load * (comp_area / total_surface_area) | |
| logger.debug(f"Component '{comp.get('name', 'Unknown')}': area={comp_area:.2f} m², radiant_load={comp['radiant_load']:.3f} kW") | |
| else: | |
| logger.warning(f"No valid surface area for hour {hour}. Skipping radiant load distribution.") | |
| for comp in opaque_components: | |
| comp['radiant_load'] = 0.0 | |
| if is_operating and mode == "cooling": | |
| for comp_list in components.values(): | |
| for comp in comp_list: | |
| cool_load, _ = TFMCalculations.calculate_conduction_load(comp, outdoor_temp, indoor_temp, hour, month, mode="cooling") | |
| component_solar_load = TFMCalculations.calculate_solar_load(comp, hour_data, hour, building_orientation, mode="cooling") | |
| orientation = classify_azimuth(comp.get("surface_azimuth", 0)) | |
| element = TFMCalculations.get_component_type(comp).name | |
| key = orientation if element in ["WALL", "WINDOW"] else element | |
| conduction_by_orientation[key] += cool_load | |
| solar_by_orientation[key] += component_solar_load | |
| conduction_cooling += cool_load | |
| solar += component_solar_load | |
| logger.info(f"Component {comp.get('name', 'Unknown')} ({TFMCalculations.get_component_type(comp).value}) solar load: {component_solar_load:.3f} kW, accumulated solar: {solar:.3f} kW") | |
| internal = TFMCalculations.calculate_internal_load(internal_loads, hour, max([p["end"] - p["start"] for p in operating_periods]), area) | |
| ventilation_cooling, _ = TFMCalculations.calculate_ventilation_load(internal_loads, outdoor_temp, indoor_temp, area, building_info, hour, mode="cooling") | |
| infiltration_cooling, _ = TFMCalculations.calculate_infiltration_load(internal_loads, outdoor_temp, indoor_temp, area, building_info, hour, mode="cooling") | |
| elif is_operating and mode == "heating": | |
| for comp_list in components.values(): | |
| for comp in comp_list: | |
| _, heat_load = TFMCalculations.calculate_conduction_load(comp, outdoor_temp, indoor_temp, hour, month, mode="heating") | |
| orientation = classify_azimuth(comp.get("surface_azimuth", 0)) | |
| element = TFMCalculations.get_component_type(comp).name | |
| key = orientation if element in ["WALL", "WINDOW"] else element | |
| conduction_by_orientation[key] += heat_load | |
| conduction_heating += heat_load | |
| internal = TFMCalculations.calculate_internal_load(internal_loads, hour, max([p["end"] - p["start"] for p in operating_periods]), area) | |
| _, ventilation_heating = TFMCalculations.calculate_ventilation_load(internal_loads, outdoor_temp, indoor_temp, area, building_info, hour, mode="heating") | |
| _, infiltration_heating = TFMCalculations.calculate_infiltration_load(internal_loads, outdoor_temp, indoor_temp, area, building_info, hour, mode="heating") | |
| else: | |
| internal = 0 | |
| logger.info(f"Hour {hour} total loads - conduction: {conduction_cooling:.3f} kW, solar: {solar:.3f} kW, internal: {internal:.3f} kW") | |
| total_cooling = conduction_cooling + solar + internal + ventilation_cooling + infiltration_cooling | |
| total_heating = max(conduction_heating + ventilation_heating + infiltration_heating - internal, 0) | |
| if mode == "cooling": | |
| total_heating = 0 | |
| elif mode == "heating": | |
| total_cooling = 0 | |
| temp_loads.append({ | |
| "hour": hour, | |
| "month": month, | |
| "day": day, | |
| "conduction_cooling": conduction_cooling, | |
| "conduction_heating": conduction_heating, | |
| "solar": solar, | |
| "internal": internal, | |
| "ventilation_cooling": ventilation_cooling, | |
| "ventilation_heating": ventilation_heating, | |
| "infiltration_cooling": infiltration_cooling, | |
| "infiltration_heating": infiltration_heating, | |
| "total_cooling": total_cooling, | |
| "total_heating": total_heating, | |
| "ground_temperature": ground_temp, | |
| "solar_by_orientation": dict(solar_by_orientation), | |
| "conduction_by_orientation": dict(conduction_by_orientation), | |
| "unmet_hours": unmet_hours | |
| }) | |
| loads_by_day = defaultdict(list) | |
| for load in temp_loads: | |
| day_key = (load["month"], load["day"]) | |
| loads_by_day[day_key].append(load) | |
| final_loads = [] | |
| for day_key, day_loads in loads_by_day.items(): | |
| cooling_hours = sum(1 for load in day_loads if load["total_cooling"] > 0) | |
| heating_hours = sum(1 for load in day_loads if load["total_heating"] > 0) | |
| for load in day_loads: | |
| if cooling_hours > heating_hours: | |
| load["total_heating"] = 0 | |
| elif heating_hours > cooling_hours: | |
| load["total_cooling"] = 0 | |
| else: | |
| load["total_cooling"] = 0 | |
| load["total_heating"] = 0 | |
| final_loads.append(load) | |
| return final_loads | |
| def make_pie(data: Dict[str, float], title: str) -> px.pie: | |
| """Create a Plotly pie chart from a dictionary of values.""" | |
| fig = px.pie( | |
| names=list(data.keys()), | |
| values=list(data.values()), | |
| title=title, | |
| hole=0.4 | |
| ) | |
| fig.update_traces(textinfo='percent+label', hoverinfo='label+percent+value') | |
| return fig | |
| def display_hvac_results_ui(loads: List[Dict[str, Any]], run_id: str = "default"): | |
| """Display HVAC load results with enhanced UI elements in a two-column format.""" | |
| st.subheader("HVAC Load Results") | |
| # First Row: Equipment Sizing (Two Columns) | |
| col1, col2 = st.columns(2) | |
| with col1: | |
| st.subheader("Cooling Equipment Sizing") | |
| cooling_loads = [load for load in loads if load["total_cooling"] > 0] | |
| peak_cooling = max(cooling_loads, key=lambda x: x["total_cooling"]) if cooling_loads else None | |
| if peak_cooling: | |
| st.write(f"**Peak Cooling Load**: {peak_cooling['total_cooling']:.2f} kW") | |
| st.write(f"Occurred on: {peak_cooling['month']}/{peak_cooling['day']} at {peak_cooling['hour']}:00") | |
| vent_sensible = sum(system.get("sensible_load", 0.0) for system in st.session_state.project_data["internal_loads"].get("ventilation", [])) | |
| vent_latent = sum(system.get("latent_load", 0.0) for system in st.session_state.project_data["internal_loads"].get("ventilation", [])) | |
| inf_sensible = sum(system.get("sensible_load", 0.0) for system in st.session_state.project_data["internal_loads"].get("infiltration", [])) | |
| inf_latent = sum(system.get("latent_load", 0.0) for system in st.session_state.project_data["internal_loads"].get("infiltration", [])) | |
| st.metric("Unmet Hours (Adaptive Comfort)", loads[-1].get("unmet_hours", 0)) | |
| else: | |
| st.write("**Peak Cooling Load**: 0.00 kW") | |
| st.metric("Unmet Hours (Adaptive Comfort)", loads[-1].get("unmet_hours", 0) if loads else 0) | |
| with col2: | |
| st.subheader("Heating Equipment Sizing") | |
| heating_loads = [load for load in loads if load["total_heating"] > 0] | |
| peak_heating = max(heating_loads, key=lambda x: x["total_heating"]) if heating_loads else None | |
| if peak_heating: | |
| st.write(f"**Peak Heating Load**: {peak_heating['total_heating']:.2f} kW") | |
| st.write(f"Occurred on: {peak_heating['month']}/{peak_heating['day']} at {peak_heating['hour']}:00") | |
| else: | |
| st.write("**Peak Heating Load**: 0.00 kW") | |
| # Second Row: Monthly Loads Graph (Single Column) | |
| st.subheader("Monthly Heating and Cooling Load") | |
| monthly_df = pd.DataFrame(loads).groupby("month").agg({ | |
| "total_cooling": "sum", | |
| "total_heating": "sum" | |
| }).reset_index() | |
| monthly_df = monthly_df.rename(columns={"total_cooling": "Cooling", "total_heating": "Heating"}) | |
| fig = px.bar( | |
| monthly_df, | |
| x="month", | |
| y=["Cooling", "Heating"], | |
| barmode="group", | |
| title="Monthly Load Summary" | |
| ) | |
| fig.update_xaxes(title="Month", tickvals=list(range(1, 13))) | |
| fig.update_yaxes(title="Load (kW)") | |
| st.plotly_chart(fig, use_container_width=True) | |
| st.session_state.project_data["hvac_loads"]["monthly_summary"] = monthly_df.to_dict() | |
| # Third Row: Load Breakdown (Two Columns) | |
| col3, col4 = st.columns(2) | |
| with col3: | |
| st.subheader("Cooling Load Breakdown") | |
| cooling_breakdown = { | |
| "Conduction": sum(load["conduction_cooling"] for load in cooling_loads), | |
| "Solar Gains": sum(load["solar"] for load in cooling_loads), | |
| "Internal": sum(load["internal"] for load in cooling_loads), | |
| "Ventilation": sum(load["ventilation_cooling"] for load in cooling_loads), | |
| "Infiltration": sum(load["infiltration_cooling"] for load in cooling_loads) | |
| } | |
| cooling_pie = make_pie({k: v for k, v in cooling_breakdown.items() if v > 0}, "Cooling Load Components") | |
| st.plotly_chart(cooling_pie) | |
| st.session_state.project_data["hvac_loads"]["cooling"]["charts"]["pie_by_component"] = cooling_breakdown | |
| with col4: | |
| st.subheader("Heating Load Breakdown") | |
| heating_breakdown = { | |
| "Conduction": sum(load["conduction_heating"] for load in heating_loads), | |
| "Ventilation": sum(load["ventilation_heating"] for load in heating_loads), | |
| "Infiltration": sum(load["infiltration_heating"] for load in heating_loads) | |
| } | |
| heating_pie = make_pie({k: v for k, v in heating_breakdown.items() if v > 0}, "Heating Load Components") | |
| st.plotly_chart(heating_pie) | |
| st.session_state.project_data["hvac_loads"]["heating"]["charts"]["pie_by_component"] = heating_breakdown | |
| # Fourth Row: Heat Gain by Orientation (Two Columns) | |
| col5, col6 = st.columns(2) | |
| with col5: | |
| st.subheader("Cooling Heat Gain by Orientation") | |
| orientation_solar = defaultdict(float) | |
| orientation_conduction = defaultdict(float) | |
| for load in cooling_loads: | |
| for key, value in load["solar_by_orientation"].items(): | |
| orientation_solar[key] += value | |
| for key, value in load["conduction_by_orientation"].items(): | |
| orientation_conduction[key] += value | |
| orientation_breakdown = {k: orientation_solar[k] + orientation_conduction[k] for k in set(orientation_solar) | set(orientation_conduction)} | |
| orientation_pie = make_pie({k: v for k, v in orientation_breakdown.items() if v > 0}, "Cooling Heat Gain by Orientation") | |
| st.plotly_chart(orientation_pie) | |
| st.session_state.project_data["hvac_loads"]["cooling"]["charts"]["pie_by_orientation"] = orientation_breakdown | |
| with col6: | |
| st.subheader("Heating Heat Gain by Orientation") | |
| orientation_conduction = defaultdict(float) | |
| for load in heating_loads: | |
| for key, value in load["conduction_by_orientation"].items(): | |
| orientation_conduction[key] += value | |
| orientation_breakdown = {k: v for k, v in orientation_conduction.items() if v > 0} | |
| orientation_pie = make_pie(orientation_breakdown, "Heating Heat Gain by Orientation") | |
| st.plotly_chart(orientation_pie) | |
| st.session_state.project_data["hvac_loads"]["heating"]["charts"]["pie_by_orientation"] = orientation_breakdown | |
| # Fifth Row: Explore Hourly Loads (Single Column) | |
| st.subheader("Explore Hourly Loads") | |
| df = pd.DataFrame(loads) | |
| # Flatten orientation-based loads | |
| unique_orientations = set() | |
| for load in loads: | |
| unique_orientations.update(load["solar_by_orientation"].keys()) | |
| unique_orientations.update(load["conduction_by_orientation"].keys()) | |
| columns = [ | |
| "month", "day", "hour", | |
| "total_cooling", "total_heating", | |
| "conduction_cooling", "solar", "internal", | |
| "ventilation_cooling", "infiltration_cooling", | |
| "ventilation_heating", "infiltration_heating" | |
| ] | |
| for orient in sorted(unique_orientations): | |
| df[f"solar_{orient}"] = df["solar_by_orientation"].apply(lambda x: x.get(orient, 0.0)) | |
| df[f"conduction_{orient}"] = df["conduction_by_orientation"].apply(lambda x: x.get(orient, 0.0)) | |
| columns.extend([f"solar_{orient}", f"conduction_{orient}"]) | |
| st.dataframe(df[columns]) | |
| # CSV Export | |
| csv = df[columns].to_csv(index=False) | |
| st.download_button("Download Hourly Summary as CSV", data=csv, file_name="hourly_loads.csv") | |
| def display_hvac_loads_page(): | |
| """ | |
| Display the HVAC Loads page in the Streamlit application. | |
| Organizes input configuration and results in separate tabs, with clearing and updating of session state. | |
| """ | |
| try: | |
| st.header("HVAC Loads") | |
| st.markdown("Configure and calculate HVAC loads for the building.") | |
| # Notify if HVAC load data exists in session state | |
| if ( | |
| st.session_state.project_data.get("hvac_loads", {}).get("cooling", {}).get("hourly") | |
| or st.session_state.project_data.get("hvac_loads", {}).get("heating", {}).get("hourly") | |
| ): | |
| st.info( | |
| f"HVAC load results already calculated. " | |
| f"View details in the 'HVAC Load Results' tab or configure new calculations below." | |
| ) | |
| # Create tabs for input and results | |
| tab1, tab2 = st.tabs(["HVAC Load Input", "HVAC Load Results"]) | |
| # HVAC Load Input tab | |
| with tab1: | |
| # Generate a unique run ID for this session | |
| import uuid | |
| run_id = str(uuid.uuid4()) | |
| # Location Information | |
| st.subheader("Location Information") | |
| climate_data = st.session_state.project_data["climate_data"] | |
| location_data = { | |
| "Country": climate_data.get("location", {}).get("country", ""), | |
| "City": climate_data.get("location", {}).get("city", ""), | |
| "State/Province": climate_data.get("location", {}).get("state_province", ""), | |
| "Latitude": climate_data.get("location", {}).get("latitude", 0.0), | |
| "Longitude": climate_data.get("location", {}).get("longitude", 0.0), | |
| "Elevation": climate_data.get("location", {}).get("elevation", 0.0), | |
| "Time Zone": climate_data.get("location", {}).get("timezone", "UTC"), | |
| "Ground Reflectivity": climate_data.get("ground_reflectivity", 0.2) | |
| } | |
| # Create two rows with four columns each | |
| col1, col2, col3, col4 = st.columns(4) | |
| with col1: | |
| country = st.text_input("Country", value=location_data["Country"], key="hvac_country") | |
| with col2: | |
| city = st.text_input("City", value=location_data["City"], key="hvac_city") | |
| with col3: | |
| state_province = st.text_input("State/Province", value=location_data["State/Province"], key="hvac_state_province") | |
| with col4: | |
| latitude = st.number_input( | |
| "Latitude (°)", | |
| min_value=-90.0, | |
| max_value=90.0, | |
| value=location_data["Latitude"], | |
| step=0.1, | |
| key="hvac_latitude" | |
| ) | |
| col5, col6, col7, col8 = st.columns(4) | |
| with col5: | |
| longitude = st.number_input( | |
| "Longitude (°)", | |
| min_value=-180.0, | |
| max_value=180.0, | |
| value=location_data["Longitude"], | |
| step=0.1, | |
| key="hvac_longitude" | |
| ) | |
| with col6: | |
| elevation = st.number_input( | |
| "Elevation (m)", | |
| min_value=0.0, | |
| max_value=10000.0, | |
| value=location_data["Elevation"], | |
| step=1.0, | |
| key="hvac_elevation" | |
| ) | |
| with col7: | |
| timezone = st.number_input( | |
| "Time Zone (UTC offset)", | |
| min_value=-12.0, | |
| max_value=14.0, | |
| value=float(location_data["Time Zone"]) if isinstance(location_data["Time Zone"], (int, float, str)) else 0.0, | |
| step=0.5, | |
| key="hvac_timezone" | |
| ) | |
| with col8: | |
| ground_reflectivity = st.number_input( | |
| "Ground Reflectivity", | |
| min_value=0.0, | |
| max_value=1.0, | |
| value=location_data["Ground Reflectivity"], | |
| step=0.01, | |
| key="hvac_ground_reflectivity" | |
| ) | |
| if st.button("Save Location"): | |
| st.session_state.project_data["climate_data"]["location"].update({ | |
| "country": country, | |
| "city": city, | |
| "state_province": state_province, | |
| "latitude": latitude, | |
| "longitude": longitude, | |
| "elevation": elevation, | |
| "timezone": timezone | |
| }) | |
| st.session_state.project_data["climate_data"]["ground_reflectivity"] = ground_reflectivity | |
| st.success("Location information saved successfully.") | |
| logger.info("Location information updated in session state") | |
| # Simulation Period Configuration | |
| st.subheader("Simulation Period") | |
| sim_type = st.selectbox( | |
| "Simulation Type", | |
| ["Full Year", "From Date to Date", "Heating Only", "Cooling Only", | |
| "Summer Extreme", "Summer Typical", "Winter Extreme", "Winter Typical"], | |
| key="hvac_sim_type", | |
| index=["Full Year", "From Date to Date", "Heating Only", "Cooling Only", | |
| "Summer Extreme", "Summer Typical", "Winter Extreme", "Winter Typical"].index( | |
| st.session_state.project_data["sim_period"]["type"] | |
| ) if st.session_state.project_data["sim_period"]["type"] in | |
| ["Full Year", "From Date to Date", "Heating Only", "Cooling Only", | |
| "Summer Extreme", "Summer Typical", "Winter Extreme", "Winter Typical"] else 0 | |
| ) | |
| st.session_state.project_data["sim_period"]["type"] = sim_type | |
| if sim_type == "From Date to Date": | |
| col1, col2 = st.columns(2) | |
| with col1: | |
| start_date = st.date_input( | |
| "Start Date", | |
| value=st.session_state.project_data["sim_period"]["start_date"] or datetime(2025, 1, 1), | |
| key="hvac_start_date" | |
| ) | |
| with col2: | |
| end_date = st.date_input( | |
| "End Date", | |
| value=st.session_state.project_data["sim_period"]["end_date"] or datetime(2025, 12, 31), | |
| key="hvac_end_date" | |
| ) | |
| st.session_state.project_data["sim_period"]["start_date"] = start_date | |
| st.session_state.project_data["sim_period"]["end_date"] = end_date | |
| elif sim_type in ["Heating Only", "Cooling Only"]: | |
| base_temp = st.number_input( | |
| "Base Temperature (°C)", | |
| min_value=0.0, | |
| max_value=40.0, | |
| value=st.session_state.project_data["sim_period"].get("base_temp", 18.3 if sim_type == "Heating Only" else 23.9), | |
| step=0.1, | |
| key="hvac_base_temp" | |
| ) | |
| st.session_state.project_data["sim_period"]["base_temp"] = base_temp | |
| # Indoor Conditions Configuration | |
| st.subheader("Indoor Conditions") | |
| indoor_type = st.selectbox( | |
| "Indoor Conditions Type", | |
| ["Fixed Setpoints", "ASHRAE 55 Adaptive Comfort"], | |
| key="hvac_indoor_type", | |
| index=["Fixed Setpoints", "ASHRAE 55 Adaptive Comfort"].index(st.session_state.project_data["indoor_conditions"]["type"]) | |
| ) | |
| st.session_state.project_data["indoor_conditions"]["type"] = indoor_type | |
| if indoor_type == "Fixed Setpoints": | |
| col1, col2 = st.columns(2) | |
| with col1: | |
| cooling_temp = st.number_input( | |
| "Cooling Setpoint Temperature (°C)", | |
| min_value=18.0, | |
| max_value=30.0, | |
| value=st.session_state.project_data["indoor_conditions"]["cooling_setpoint"]["temperature"], | |
| step=0.1, | |
| key="hvac_cooling_temp" | |
| ) | |
| cooling_rh = st.number_input( | |
| "Cooling Setpoint Relative Humidity (%)", | |
| min_value=30.0, | |
| max_value=70.0, | |
| value=st.session_state.project_data["indoor_conditions"]["cooling_setpoint"]["rh"], | |
| step=1.0, | |
| key="hvac_cooling_rh" | |
| ) | |
| with col2: | |
| heating_temp = st.number_input( | |
| "Heating Setpoint Temperature (°C)", | |
| min_value=16.0, | |
| max_value=26.0, | |
| value=st.session_state.project_data["indoor_conditions"]["heating_setpoint"]["temperature"], | |
| step=0.1, | |
| key="hvac_heating_temp" | |
| ) | |
| heating_rh = st.number_input( | |
| "Heating Setpoint Relative Humidity (%)", | |
| min_value=30.0, | |
| max_value=70.0, | |
| value=st.session_state.project_data["indoor_conditions"]["heating_setpoint"]["rh"], | |
| step=1.0, | |
| key="hvac_heating_rh" | |
| ) | |
| st.session_state.project_data["indoor_conditions"]["cooling_setpoint"] = { | |
| "temperature": cooling_temp, | |
| "rh": cooling_rh | |
| } | |
| st.session_state.project_data["indoor_conditions"]["heating_setpoint"] = { | |
| "temperature": heating_temp, | |
| "rh": heating_rh | |
| } | |
| elif indoor_type == "ASHRAE 55 Adaptive Comfort": | |
| acceptability = st.selectbox( | |
| "Adaptive Comfort Acceptability (%)", | |
| ["80", "85", "90", "95"], | |
| index=["80", "85", "90", "95"].index(st.session_state.project_data["indoor_conditions"].get("adaptive_acceptability", "90")), | |
| key="adaptive_acceptability" | |
| ) | |
| st.session_state.project_data["indoor_conditions"]["adaptive_acceptability"] = acceptability | |
| # Internal Loads Conditions Configuration | |
| st.subheader("Internal Loads Conditions") | |
| col1, col2, col3, col4, col5 = st.columns(5) | |
| # Initialize internal_loads_conditions if not present | |
| if "internal_loads_conditions" not in st.session_state.project_data: | |
| st.session_state.project_data["internal_loads_conditions"] = { | |
| "air_velocity": 0.1, | |
| "lighting_convective_fraction": 0.5, | |
| "lighting_radiative_fraction": 0.5, | |
| "equipment_convective_fraction": 0.5, | |
| "equipment_radiative_fraction": 0.5 | |
| } | |
| # Retrieve internal loads data | |
| internal_loads = st.session_state.project_data.get("internal_loads", {}) | |
| lighting_systems = internal_loads.get("lighting", []) | |
| equipment_systems = internal_loads.get("equipment", []) | |
| # Calculate default lighting fractions from lighting systems | |
| if lighting_systems: | |
| lighting_convective_avg = sum(system.get("convective_fraction", 0.5) for system in lighting_systems) / len(lighting_systems) | |
| lighting_radiative_avg = sum(system.get("radiative_fraction", 0.5) for system in lighting_systems) / len(lighting_systems) | |
| else: | |
| lighting_convective_avg = st.session_state.project_data["internal_loads_conditions"].get("lighting_convective_fraction", 0.5) | |
| lighting_radiative_avg = st.session_state.project_data["internal_loads_conditions"].get("lighting_radiative_fraction", 0.5) | |
| # Calculate default equipment fractions from equipment systems | |
| if equipment_systems: | |
| equipment_convective_avg = sum(system.get("convective_fraction", 0.5) for system in equipment_systems) / len(equipment_systems) | |
| equipment_radiative_avg = sum(system.get("radiative_fraction", 0.5) for system in equipment_systems) / len(equipment_systems) | |
| else: | |
| equipment_convective_avg = st.session_state.project_data["internal_loads_conditions"].get("equipment_convective_fraction", 0.5) | |
| equipment_radiative_avg = st.session_state.project_data["internal_loads_conditions"].get("equipment_radiative_fraction", 0.5) | |
| with col1: | |
| air_velocity = st.number_input( | |
| "Air Velocity (m/s)", | |
| min_value=0.0, | |
| max_value=2.0, | |
| value=st.session_state.project_data["internal_loads_conditions"].get("air_velocity", 0.1), | |
| step=0.01, | |
| key="hvac_air_velocity" | |
| ) | |
| if air_velocity < 0.0 or air_velocity > 2.0: | |
| st.error("Air velocity must be between 0 and 2 m/s.") | |
| air_velocity = max(0.0, min(2.0, air_velocity)) | |
| with col2: | |
| lighting_convective_fraction = st.number_input( | |
| "Lighting Convective Fraction", | |
| min_value=0.0, | |
| max_value=1.0, | |
| value=lighting_convective_avg, | |
| step=0.01, | |
| key="hvac_lighting_convective" | |
| ) | |
| with col3: | |
| lighting_radiative_fraction = st.number_input( | |
| "Lighting Radiative Fraction", | |
| min_value=0.0, | |
| max_value=1.0, | |
| value=lighting_radiative_avg, | |
| step=0.01, | |
| key="hvac_lighting_radiative" | |
| ) | |
| # Validate lighting fractions sum to 1.0 | |
| if abs(lighting_convective_fraction + lighting_radiative_fraction - 1.0) > 0.01: | |
| st.error("Lighting convective and radiative fractions must sum to 1.0.") | |
| lighting_radiative_fraction = 1.0 - lighting_convective_fraction # Auto-correct radiative fraction | |
| st.warning(f"Adjusted Lighting Radiative Fraction to {lighting_radiative_fraction:.2f} to ensure sum equals 1.0.") | |
| with col4: | |
| equipment_convective_fraction = st.number_input( | |
| "Equipment Convective Fraction", | |
| min_value=0.0, | |
| max_value=1.0, | |
| value=equipment_convective_avg, | |
| step=0.01, | |
| key="hvac_equipment_convective" | |
| ) | |
| with col5: | |
| equipment_radiative_fraction = st.number_input( | |
| "Equipment Radiative Fraction", | |
| min_value=0.0, | |
| max_value=1.0, | |
| value=equipment_radiative_avg, | |
| step=0.01, | |
| key="hvac_equipment_radiative" | |
| ) | |
| # Validate equipment fractions sum to 1.0 | |
| if abs(equipment_convective_fraction + equipment_radiative_fraction - 1.0) > 0.01: | |
| st.error("Equipment convective and radiative fractions must sum to 1.0.") | |
| equipment_radiative_fraction = 1.0 - equipment_convective_fraction # Auto-correct radiative fraction | |
| st.warning(f"Adjusted Equipment Radiative Fraction to {equipment_radiative_fraction:.2f} to ensure sum equals 1.0.") | |
| if st.button("Save Internal Loads Conditions"): | |
| # Update internal loads conditions in session state | |
| st.session_state.project_data["internal_loads_conditions"].update({ | |
| "air_velocity": air_velocity, | |
| "lighting_convective_fraction": lighting_convective_fraction, | |
| "lighting_radiative_fraction": lighting_radiative_fraction, | |
| "equipment_convective_fraction": equipment_convective_fraction, | |
| "equipment_radiative_fraction": equipment_radiative_fraction | |
| }) | |
| # Update lighting systems with new fractions | |
| for system in lighting_systems: | |
| system["convective_fraction"] = lighting_convective_fraction | |
| system["radiative_fraction"] = lighting_radiative_fraction | |
| # Update equipment systems with new fractions | |
| for system in equipment_systems: | |
| system["convective_fraction"] = equipment_convective_fraction | |
| system["radiative_fraction"] = equipment_radiative_fraction | |
| st.success("Internal loads conditions saved successfully.") | |
| logger.info("Internal loads conditions updated in session state.") | |
| # Ground Temperature Configuration | |
| st.subheader("Ground Temperature Configuration") | |
| has_ground_contact = any( | |
| comp.get('ground_contact', False) | |
| for comp_list in st.session_state.project_data["components"].values() | |
| for comp in comp_list | |
| ) | |
| if has_ground_contact: | |
| st.markdown("Configure monthly ground temperatures for components in contact with the ground (e.g., floors, walls, roofs). Typical ranges are 10–20°C at 2 m depth (ASHRAE Fundamentals, Chapter 18).") | |
| depth_options = ["0.5", "2", "4"] | |
| default_depth = "2" | |
| selected_depth = st.selectbox( | |
| "Ground Temperature Depth (m)", | |
| options=depth_options, | |
| index=depth_options.index(default_depth), | |
| key="ground_temp_depth" | |
| ) | |
| climate_data = st.session_state.project_data.get("climate_data", {}) | |
| ground_temperatures = climate_data.get("ground_temperatures", {}) | |
| default_temps = {"0.5": [20.0]*12, "2": [18.0]*12, "4": [16.0]*12} | |
| if selected_depth not in ground_temperatures or not ground_temperatures[selected_depth]: | |
| st.warning(f"No ground temperature data available for depth {selected_depth} m. Using default temperatures: {default_temps[selected_depth][0]}°C.") | |
| monthly_temps = default_temps[selected_depth] | |
| else: | |
| monthly_temps = ground_temperatures[selected_depth] | |
| st.write("Enter monthly ground temperatures (°C):") | |
| months = ["Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"] | |
| temp_inputs = [] | |
| cols = st.columns(12) | |
| for i, month in enumerate(months): | |
| with cols[i]: | |
| temp = st.number_input( | |
| month, | |
| min_value=-20.0, | |
| max_value=40.0, | |
| value=monthly_temps[i], | |
| step=0.1, | |
| key=f"ground_temp_{selected_depth}_{month}" | |
| ) | |
| temp_inputs.append(temp) | |
| if st.button("Save Ground Temperatures"): | |
| if len(temp_inputs) != 12: | |
| st.error("Please provide temperatures for all 12 months.") | |
| elif any(not -20.0 <= t <= 40.0 for t in temp_inputs): | |
| st.error("All temperatures must be between -20°C and 40°C.") | |
| else: | |
| st.session_state.project_data["climate_data"]["ground_temperatures"][selected_depth] = temp_inputs | |
| st.success(f"Ground temperatures for depth {selected_depth} m saved successfully.") | |
| logger.info(f"Ground temperatures for depth {selected_depth} m updated in session state") | |
| else: | |
| st.info("No ground-contact components detected. Ground temperature configuration is not required.") | |
| # Calculate HVAC Loads | |
| if st.button("Calculate HVAC Loads"): | |
| try: | |
| with st.spinner("Running simulation... this may take up to a minute depending on data size."): | |
| components = st.session_state.project_data["components"] | |
| hourly_data = st.session_state.project_data["climate_data"]["hourly_data"] | |
| indoor_conditions = st.session_state.project_data["indoor_conditions"] | |
| internal_loads = st.session_state.project_data["internal_loads"] | |
| building_info = st.session_state.project_data["building_info"] | |
| sim_period = st.session_state.project_data["sim_period"] | |
| hvac_settings = st.session_state.project_data["hvac_settings"] | |
| if not hourly_data: | |
| st.error("No climate data available. Please configure climate data first.") | |
| logger.error("HVAC calculation failed: No climate data available") | |
| return | |
| elif not any(comp_list for comp_list in components.values()): | |
| st.error("No building components defined. Please configure components first.") | |
| logger.error("HVAC calculation failed: No building components defined") | |
| return | |
| else: | |
| loads = TFMCalculations.calculate_tfm_loads( | |
| components=components, | |
| hourly_data=hourly_data, | |
| indoor_conditions=indoor_conditions, | |
| internal_loads=internal_loads, | |
| building_info=building_info, | |
| sim_period=sim_period, | |
| hvac_settings=hvac_settings | |
| ) | |
| # Clear previous HVAC loads from session state | |
| st.session_state.project_data["hvac_loads"] = { | |
| "cooling": {"hourly": [], "peak": 0, "charts": {}, "breakdown": {}}, | |
| "heating": {"hourly": [], "peak": 0, "charts": {}, "breakdown": {}} | |
| } | |
| # Update session state with new results | |
| cooling_loads = [load for load in loads if load["total_cooling"] > 0] | |
| heating_loads = [load for load in loads if load["total_heating"] > 0] | |
| st.session_state.project_data["hvac_loads"]["cooling"]["hourly"] = cooling_loads | |
| st.session_state.project_data["hvac_loads"]["heating"]["hourly"] = heating_loads | |
| st.session_state.project_data["hvac_loads"]["cooling"]["peak"] = max([load["total_cooling"] for load in cooling_loads], default=0) | |
| st.session_state.project_data["hvac_loads"]["heating"]["peak"] = max([load["total_heating"] for load in heating_loads], default=0) | |
| st.session_state.project_data["hvac_loads"]["cooling"]["charts"] = {} | |
| st.session_state.project_data["hvac_loads"]["heating"]["charts"] = {} | |
| # Store breakdown | |
| cooling_breakdown = { | |
| "Conduction": sum(load["conduction_cooling"] for load in cooling_loads), | |
| "Solar Gains": sum(load["solar"] for load in cooling_loads), | |
| "Internal": sum(load["internal"] for load in cooling_loads), | |
| "Ventilation Sensible": sum(system.get("sensible_load", 0.0) for system in st.session_state.project_data["internal_loads"].get("ventilation", [])), | |
| "Ventilation Latent": sum(system.get("latent_load", 0.0) for system in st.session_state.project_data["internal_loads"].get("ventilation", [])), | |
| "Infiltration Sensible": sum(system.get("sensible_load", 0.0) for system in st.session_state.project_data["internal_loads"].get("infiltration", [])), | |
| "Infiltration Latent": sum(system.get("latent_load", 0.0) for system in st.session_state.project_data["internal_loads"].get("infiltration", [])) | |
| } | |
| heating_breakdown = { | |
| "conduction": sum(load["conduction_heating"] for load in heating_loads), | |
| "ventilation": sum(load["ventilation_heating"] for load in heating_loads), | |
| "infiltration": sum(load["infiltration_heating"] for load in heating_loads) | |
| } | |
| st.session_state.project_data["hvac_loads"]["cooling"]["charts"]["pie_by_component"] = cooling_breakdown | |
| st.session_state.project_data["hvac_loads"]["heating"]["breakdown"] = heating_breakdown | |
| st.success("HVAC loads calculated successfully.") | |
| logger.info("HVAC loads calculated and stored in session state") | |
| st.button("View HVAC Load Results", on_click=lambda: st.session_state.update({"hvac_tab": "HVAC Load Results"})) | |
| except Exception as e: | |
| st.error(f"Error calculating HVAC loads: {str(e)}") | |
| logger.error(f"HVAC calculation error: {str(e)}") | |
| # HVAC Load Results tab | |
| with tab2: | |
| if ( | |
| st.session_state.project_data.get("hvac_loads", {}).get("cooling", {}).get("hourly") | |
| or st.session_state.project_data.get("hvac_loads", {}).get("heating", {}).get("hourly") | |
| ): | |
| loads = [] | |
| cooling_loads = st.session_state.project_data["hvac_loads"]["cooling"].get("hourly", []) | |
| heating_loads = st.session_state.project_data["hvac_loads"]["heating"].get("hourly", []) | |
| loads.extend(cooling_loads) | |
| loads.extend(heating_loads) | |
| # Sort loads by month, day, hour to ensure consistent display | |
| loads = sorted(loads, key=lambda x: (x["month"], x["day"], x["hour"])) | |
| if loads: | |
| display_hvac_results_ui(loads, run_id=run_id) | |
| else: | |
| st.info("Please configure and calculate HVAC loads in the 'HVAC Load Input' tab to view results.") | |
| except Exception as e: | |
| st.error(f"Error rendering HVAC Loads page: {str(e)}") | |
| logger.error(f"HVAC page rendering error: {str(e)}") |