CTF-TFM / utils /ctf_calculations.py
mabuseif's picture
Update utils/ctf_calculations.py
017e28e verified
"""
CTF Calculations Module
This module contains the CTFCalculator class for calculating Conduction Transfer Function (CTF)
coefficients for HVAC load calculations using the implicit Finite Difference Method.
Developed by: Dr Majed Abuseif, Deakin University
© 2025
"""
import numpy as np
import scipy.sparse as sparse
import scipy.sparse.linalg as sparse_linalg
import hashlib
import logging
import threading
from typing import List
from data.material_library import Construction
from enum import Enum
from typing import Dict, List, Optional, NamedTuple
# Configure logging
# NOTE: basicConfig runs at import time, so importing this module sets up the
# root logger (INFO level, timestamped format) unless the host application has
# already installed handlers on it.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Module-level logger shared by everything defined in this file.
logger = logging.getLogger(__name__)
class ComponentType(Enum):
    """Closed set of building component categories.

    Each member's value is the human-readable label used in log messages.
    """

    # Opaque envelope components
    WALL = "Wall"
    ROOF = "Roof"
    FLOOR = "Floor"

    # Fenestration and door components
    WINDOW = "Window"
    DOOR = "Door"
    SKYLIGHT = "Skylight"
class CTFCoefficients(NamedTuple):
    """Immutable bundle of the four CTF coefficient series."""

    # Exterior temperature coefficients
    X: List[float]
    # Cross coefficients
    Y: List[float]
    # Interior temperature coefficients
    Z: List[float]
    # Flux history coefficients
    F: List[float]
class CTFCalculator:
    """Calculate and cache CTF coefficients for building components.

    The coefficients are obtained by marching an implicit (backward-Euler)
    finite-difference model of the layered construction through a fixed
    number of one-hour steps and recording the surface heat fluxes that
    result from a single-step excitation.
    """

    # Cache for CTF coefficients, keyed by the hash of the construction properties
    _ctf_cache = {}
    _cache_lock = threading.Lock()  # Guards every read/write of _ctf_cache

    @staticmethod
    def _hash_construction(construction: Construction) -> str:
        """Generate a hash for a construction based on its properties.

        Fields are joined with an explicit delimiter so that adjacent values
        cannot run together and make two different constructions hash alike.

        Args:
            construction: Construction object containing material layers.

        Returns:
            str: SHA-256 hex digest of the construction properties.
        """
        fields = [str(construction.name)]
        for layer in construction.layers:
            material = layer["material"]
            fields.extend(
                str(value)
                for value in (
                    material.name,
                    material.conductivity,
                    material.density,
                    material.specific_heat,
                    layer["thickness"],
                )
            )
        return hashlib.sha256("|".join(fields).encode()).hexdigest()

    @staticmethod
    def _discretize(thicknesses, alpha, materials, dt, default_nodes=3, min_fourier=0.33):
        """Choose the node count and node spacing for every layer.

        Each layer starts with ``default_nodes`` nodes. When the resulting
        Fourier number is below ``min_fourier`` the layer is refined until the
        threshold is met. Counts are tracked per layer so that refining one
        layer never affects the mesh of another.

        Args:
            thicknesses: Layer thicknesses (m).
            alpha: Thermal diffusivity of each layer (m²/s).
            materials: Material objects, used only for log messages.
            dt: Time step (s).
            default_nodes: Initial number of nodes per layer.
            min_fourier: Smallest acceptable Fourier number.

        Returns:
            tuple: ``(nodes_in_layer, dx)`` lists, one entry per layer.
        """
        nodes_in_layer = [default_nodes] * len(thicknesses)
        dx = [t / default_nodes for t in thicknesses]
        for i, (a, d) in enumerate(zip(alpha, dx)):
            Fo = a * dt / (d ** 2)
            if Fo < min_fourier:
                logger.warning(f"Fourier number {Fo:.3f} < 0.33 for layer {i} ({materials[i].name}). Adjusting node spacing.")
                # Largest spacing that still satisfies the Fourier threshold
                target_dx = np.sqrt(a * dt / min_fourier)
                nodes_in_layer[i] = max(2, int(np.ceil(thicknesses[i] / target_dx)))
                dx[i] = thicknesses[i] / nodes_in_layer[i]
                Fo = a * dt / (dx[i] ** 2)
                logger.info(f"Adjusted node spacing for layer {i}: dx={dx[i]:.4f} m, Fo={Fo:.3f}")
        return nodes_in_layer, dx

    @staticmethod
    def _build_system_matrix(k, rho, c, dx, nodes_in_layer, dt, R_out, R_in):
        """Assemble the implicit finite-difference system matrix.

        Nodes are numbered consecutively from the outdoor surface (index 0) to
        the indoor surface (last index), layer by layer.

        Args:
            k: Conductivity per layer (W/m·K).
            rho: Density per layer (kg/m³).
            c: Specific heat per layer (J/kg·K).
            dx: Node spacing per layer (m).
            nodes_in_layer: Number of nodes in each layer.
            dt: Time step (s).
            R_out: Outdoor surface resistance (m²·K/W).
            R_in: Indoor surface resistance (m²·K/W).

        Returns:
            scipy.sparse.csr_matrix: Tridiagonal system matrix.
        """
        total_nodes = sum(nodes_in_layer)
        last_layer = len(nodes_in_layer) - 1
        A = sparse.lil_matrix((total_nodes, total_nodes))
        idx = 0
        for layer_idx, node_count in enumerate(nodes_in_layer):
            k_i = k[layer_idx]
            rho_i = rho[layer_idx]
            c_i = c[layer_idx]
            dx_i = dx[layer_idx]
            # Conduction coupling between neighbouring nodes of this layer
            coupling = dt * k_i / (dx_i * rho_i * c_i * dx_i)
            for node_j in range(node_count):
                is_first = node_j == 0
                is_last = node_j == node_count - 1
                if is_first and layer_idx == 0:  # Outdoor surface node
                    A[idx, idx] = 1.0 + 2 * coupling + dt / (rho_i * c_i * dx_i * R_out)
                    A[idx, idx + 1] = -2 * coupling
                elif is_last and layer_idx == last_layer:  # Indoor surface node
                    A[idx, idx] = 1.0 + 2 * coupling + dt / (rho_i * c_i * dx_i * R_in)
                    A[idx, idx - 1] = -2 * coupling
                elif is_last:  # Interface towards the next layer
                    k_next = k[layer_idx + 1]
                    dx_next = dx[layer_idx + 1]
                    # Mean heat capacity of the two half-cells meeting at the interface
                    capacity = 0.5 * (rho_i * c_i * dx_i + rho[layer_idx + 1] * c[layer_idx + 1] * dx_next)
                    A[idx, idx] = 1.0 + dt * (k_i / dx_i + k_next / dx_next) / capacity
                    A[idx, idx - 1] = -dt * k_i / (dx_i * capacity)
                    A[idx, idx + 1] = -dt * k_next / (dx_next * capacity)
                elif is_first:  # Interface from the previous layer
                    k_prev = k[layer_idx - 1]
                    dx_prev = dx[layer_idx - 1]
                    capacity = 0.5 * (rho[layer_idx - 1] * c[layer_idx - 1] * dx_prev + rho_i * c_i * dx_i)
                    A[idx, idx] = 1.0 + dt * (k_prev / dx_prev + k_i / dx_i) / capacity
                    A[idx, idx - 1] = -dt * k_prev / (dx_prev * capacity)
                    A[idx, idx + 1] = -dt * k_i / (dx_i * capacity)
                else:  # Internal node
                    A[idx, idx] = 1.0 + 2 * coupling
                    A[idx, idx - 1] = -coupling
                    A[idx, idx + 1] = -coupling
                idx += 1
        return A.tocsr()  # CSR for efficient solving

    @staticmethod
    def _march_response(A, total_nodes, num_steps, node, pulse):
        """March the model from rest after a single-step source pulse.

        The source term ``pulse`` is added at ``node`` during the first step
        only; all later steps are driven solely by the stored temperatures.

        Args:
            A: System matrix from ``_build_system_matrix``.
            total_nodes: Number of nodes in the model.
            num_steps: Number of time steps to simulate.
            node: Index of the excited node (negative indices allowed).
            pulse: Source term applied at ``node`` in the first step.

        Returns:
            list: Temperature vector after each step.
        """
        history = []
        T_prev = np.zeros(total_nodes)
        for step in range(num_steps):
            rhs = T_prev.copy()
            if step == 0:
                rhs[node] += pulse
            T_prev = sparse_linalg.spsolve(A, rhs)
            history.append(T_prev)
        return history

    @classmethod
    def calculate_ctf_coefficients(cls, component) -> CTFCoefficients:
        """Calculate CTF coefficients using implicit Finite Difference Method.

        Note: Per ASHRAE, CTF calculations are skipped for WINDOW, DOOR, and SKYLIGHT components,
        as they use typical material properties. CTF tables for these components will be added later.

        Results are cached per construction; a cache hit returns the stored
        ``CTFCoefficients`` object itself.

        Args:
            component: Building component with ``component_type``, ``name``
                and ``construction`` attributes.

        Returns:
            CTFCoefficients: Named tuple containing X, Y, Z, and F coefficients.
            Single-element zero lists are returned for skipped component types
            and for components without a construction or layers.
        """
        # Skip CTF for WINDOW, DOOR, SKYLIGHT as per ASHRAE; return zero coefficients
        if component.component_type in [ComponentType.WINDOW, ComponentType.DOOR, ComponentType.SKYLIGHT]:
            logger.info(f"Skipping CTF calculation for {component.component_type.value} component '{component.name}'. Using zero coefficients until CTF tables are implemented.")
            return CTFCoefficients(X=[0.0], Y=[0.0], Z=[0.0], F=[0.0])

        # Check if construction exists and has layers
        construction = component.construction
        if not construction or not construction.layers:
            logger.warning(f"No valid construction or layers for component '{component.name}' ({component.component_type.value}). Returning zero CTFs.")
            return CTFCoefficients(X=[0.0], Y=[0.0], Z=[0.0], F=[0.0])

        # Check cache with thread-safe access
        construction_hash = cls._hash_construction(construction)
        with cls._cache_lock:
            if construction_hash in cls._ctf_cache:
                logger.info(f"Using cached CTF coefficients for construction {construction.name}")
                return cls._ctf_cache[construction_hash]

        # Discretization parameters
        dt = 3600  # 1-hour time step (s)
        R_out = 0.04  # Outdoor surface resistance (m²·K/W, ASHRAE)
        R_in = 0.12  # Indoor surface resistance (m²·K/W, ASHRAE)

        # Collect layer properties
        thicknesses = [layer["thickness"] for layer in construction.layers]
        materials = [layer["material"] for layer in construction.layers]
        k = [m.conductivity for m in materials]  # W/m·K
        rho = [m.density for m in materials]  # kg/m³
        c = [m.specific_heat for m in materials]  # J/kg·K
        alpha = [k_i / (rho_i * c_i) for k_i, rho_i, c_i in zip(k, rho, c)]  # Thermal diffusivity (m²/s)

        # The mesh is finalised before the matrix is sized, so the node count,
        # node spacing and surface-node detection always agree.
        nodes_in_layer, dx = cls._discretize(thicknesses, alpha, materials, dt)
        total_nodes = sum(nodes_in_layer)
        A = cls._build_system_matrix(k, rho, c, dx, nodes_in_layer, dt, R_out, R_in)

        num_ctf = 12  # Standard number of coefficients

        # Source terms produced by a unit excitation at each surface node
        outdoor_pulse = dt / (rho[0] * c[0] * dx[0] * R_out)  # Unit outdoor temp impulse
        indoor_pulse = dt / (rho[-1] * c[-1] * dx[-1] * R_in)  # Unit indoor temp impulse
        flux_pulse = -1.0 / (rho[-1] * c[-1] * dx[-1])  # Unit flux impulse

        # Each response starts from rest and is excited on one side only, so the
        # exterior (X, Y) and interior (Z) series are genuinely different runs.
        # Fluxes are evaluated against a zero reference air temperature.
        outdoor_response = cls._march_response(A, total_nodes, num_ctf, 0, outdoor_pulse)
        X = [(0.0 - T[0]) / R_out for T in outdoor_response]  # Outdoor heat flux (W/m²)
        Y = [(T[-1] - 0.0) / R_in for T in outdoor_response]  # Indoor heat flux (W/m²)

        indoor_response = cls._march_response(A, total_nodes, num_ctf, -1, indoor_pulse)
        Z = [(T[-1] - 0.0) / R_in for T in indoor_response]

        flux_response = cls._march_response(A, total_nodes, num_ctf, -1, flux_pulse)
        F = [(T[-1] - 0.0) / R_in for T in flux_response]

        ctf = CTFCoefficients(X=X, Y=Y, Z=Z, F=F)
        with cls._cache_lock:
            cls._ctf_cache[construction_hash] = ctf
        logger.info(f"Calculated CTF coefficients for construction {construction.name}")
        return ctf

    @classmethod
    def calculate_ctf_tables(cls, component) -> CTFCoefficients:
        """Placeholder for future implementation of CTF table lookups for windows, doors, and skylights.

        Args:
            component: Building component with ``component_type`` and ``name`` attributes.

        Returns:
            CTFCoefficients: Placeholder zero coefficients until implementation.
        """
        logger.info(f"CTF table calculation for {component.component_type.value} component '{component.name}' not yet implemented. Returning zero coefficients.")
        return CTFCoefficients(X=[0.0], Y=[0.0], Z=[0.0], F=[0.0])