import streamlit as st
import sys
import os
import plotly.graph_objects as go
import matplotlib.pyplot as plt
import tempfile
import csv
from io import BytesIO
from PIL import Image, ImageDraw, ImageFont
from graphics_utils import draw_smooth_ellipse
from datetime import datetime
# # Add the src directory to the path for imports
# src_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
# if src_path not in sys.path:
# sys.path.insert(0, src_path)
# Import pinch analysis modules; record availability in module-level flags so
# the page can degrade gracefully (and show the reason) instead of crashing
# when the Modules package is not importable.
try:
    from Modules.Pinch.Pinch import Pinch
    from Modules.HeatPumpIntegration.HeatPumpIntegration import HeatPumpIntegration as HPI
    from Modules.Pinch_main import Pinchmain
    PINCH_AVAILABLE = True
    PINCH_IMPORT_ERROR = None
except ImportError as e:
    PINCH_AVAILABLE = False
    # Stored as a string so it can be rendered in the UI later.
    PINCH_IMPORT_ERROR = str(e)

st.set_page_config(
    page_title="Potential Analysis",
    initial_sidebar_state="collapsed"
)

# Apply styles immediately to prevent flash of unstyled content.
# NOTE(review): the markdown body below is empty — if inline CSS was intended
# here it may have been lost; confirm against the original page source.
st.markdown(
    """
    """,
    unsafe_allow_html=True,
)
# Helper function to convert lon/lat to pixel coordinates (same as data_collection.py)
def snapshot_lonlat_to_pixel(lon, lat, center_ll, z_level, img_w, img_h):
    """Project a lon/lat point onto pixel coordinates of a map snapshot.

    Uses the Web Mercator (slippy-map) tile scheme: both the snapshot center
    and the target point are converted to fractional tile coordinates at zoom
    ``z_level``, and the tile offset (256 px per tile) is applied relative to
    the image center.

    Args:
        lon, lat: Target point in degrees.
        center_ll: ``(center_lon, center_lat)`` of the snapshot.
        z_level: Map zoom level (may be fractional).
        img_w, img_h: Snapshot dimensions in pixels.

    Returns:
        ``(screen_x, screen_y)`` pixel coordinates as floats.
    """
    import math as _math

    def _to_tile(lon_deg, lat_deg):
        # Standard lon/lat -> fractional tile index at z_level.
        lat_rad = _math.radians(lat_deg)
        scale = 2.0 ** z_level
        tile_x = (lon_deg + 180.0) / 360.0 * scale
        tile_y = (1.0 - _math.log(_math.tan(lat_rad) + 1.0 / _math.cos(lat_rad)) / _math.pi) / 2.0 * scale
        return tile_x, tile_y

    TILE_PX = 256
    center_tx, center_ty = _to_tile(center_ll[0], center_ll[1])
    point_tx, point_ty = _to_tile(lon, lat)
    # Pixel offset of the point from the snapshot center.
    offset_x = (point_tx - center_tx) * TILE_PX
    offset_y = (point_ty - center_ty) * TILE_PX
    return img_w / 2 + offset_x, img_h / 2 + offset_y
def generate_process_level_map():
    """Generate a map image showing only the process-level (green boxes).

    Reads the map snapshot, process-group coordinates/names and stream
    selections from ``st.session_state``. Each process group is drawn as a
    green labelled box; above it, one circle per selected stream is drawn,
    sized by heat duty Q and colored by stream direction/temperature.

    Returns:
        A PIL RGBA image, or ``None`` when a non-blank base layer has no
        snapshot available.
    """
    snapshots_dict = st.session_state.get('map_snapshots', {})
    active_base = st.session_state.get('current_base', 'OpenStreetMap')
    if active_base == 'Blank':
        base_img = Image.new('RGBA', (800, 600), (242, 242, 243, 255))
    else:
        chosen_bytes = snapshots_dict.get(active_base) or st.session_state.get('map_snapshot')
        if not chosen_bytes:
            return None
        base_img = Image.open(BytesIO(chosen_bytes)).convert("RGBA")
    w, h = base_img.size
    draw = ImageDraw.Draw(base_img)

    # Load font: try known paths in order, fall back to PIL's bitmap default.
    BOX_FONT_SIZE = 20
    font = None
    for font_path in ("/System/Library/Fonts/Arial.ttf",
                      "/System/Library/Fonts/Helvetica.ttc",
                      "DejaVuSans.ttf"):
        try:
            font = ImageFont.truetype(font_path, BOX_FONT_SIZE)
            break
        except (OSError, IOError):
            continue
    if font is None:
        font = ImageFont.load_default()

    # Session-state lookups are loop-invariant — fetch them once up front.
    group_coords = st.session_state.get('proc_group_coordinates', {})
    proc_group_names = st.session_state.get('proc_group_names', [])
    map_center = st.session_state.get('map_center', [0, 0])
    map_zoom = st.session_state.get('map_zoom', 17.5)
    proc_groups = st.session_state.get('proc_groups', [])
    processes = st.session_state.get('processes', [])
    selected_items = st.session_state.get('selected_items', {})

    for group_idx, coords_data in group_coords.items():
        # Keys may arrive as strings (e.g. after JSON round-trip); normalize.
        group_idx = int(group_idx) if isinstance(group_idx, str) else group_idx
        if group_idx >= len(proc_group_names):
            continue
        lat = coords_data.get('lat')
        lon = coords_data.get('lon')
        if lat is None or lon is None:
            continue
        try:
            lat_f = float(lat)
            lon_f = float(lon)
            group_px, group_py = snapshot_lonlat_to_pixel(
                lon_f, lat_f,
                (map_center[1], map_center[0]),  # map_center is [lat, lon]
                map_zoom, w, h
            )
            # Skip groups whose center falls well outside the snapshot.
            if group_px < -50 or group_py < -20 or group_px > w + 50 or group_py > h + 20:
                continue
            group_label = proc_group_names[group_idx]
            scale = float(coords_data.get('box_scale', 1.5) or 1.5)
            padding = int(8 * scale)
            text_bbox = draw.textbbox((0, 0), group_label, font=font)
            tw = text_bbox[2] - text_bbox[0]
            th = text_bbox[3] - text_bbox[1]
            box_w = int(tw * scale + padding * 2)
            box_h = int(th * scale + padding * 2)
            x0 = int(group_px - box_w / 2)
            y0 = int(group_py - box_h / 2)
            x1 = x0 + box_w
            y1 = y0 + box_h
            # Draw the process box (green) with a centered label.
            fill_color = (200, 255, 200, 245)
            border_color = (34, 139, 34, 255)
            text_color = (0, 100, 0, 255)
            draw.rectangle([x0, y0, x1, y1], fill=fill_color, outline=border_color, width=3)
            text_x = x0 + (box_w - tw) // 2
            text_y = y0 + (box_h - th) // 2
            draw.text((text_x, text_y), group_label, fill=text_color, font=font)

            # Collect the selected streams of every subprocess in this group.
            group_subprocess_list = proc_groups[group_idx] if group_idx < len(proc_groups) else []
            all_streams = []
            for sub_idx in group_subprocess_list:
                if sub_idx < len(processes):
                    for stream_idx, stream in enumerate(processes[sub_idx].get('streams', [])):
                        # Streams count as selected unless explicitly deselected.
                        if selected_items.get(f"stream_{sub_idx}_{stream_idx}", True):
                            all_streams.append(stream)
            if not all_streams:
                continue

            # Draw one circle per stream, centered above the process box.
            circle_y = y0 - 20
            circle_spacing = 32
            base_radius = 15
            total_width = len(all_streams) * circle_spacing
            start_x = int((x0 + x1) / 2 - total_width / 2 + circle_spacing / 2)
            for stream_idx, stream in enumerate(all_streams):
                sv = stream.get('stream_values', {})
                # New-style 'stream_values' fields with legacy fallbacks.
                tin = sv.get('Tin', '') or stream.get('temp_in', '')
                tout = sv.get('Tout', '') or stream.get('temp_out', '')
                mdot = sv.get('ṁ', '') or stream.get('mdot', '')
                cp_val = sv.get('cp', '') or stream.get('cp', '')
                CP_val = sv.get('CP', '') or stream.get('CP', '')

                # Heat duty Q: prefer a direct heat-capacity flow CP,
                # otherwise fall back to mdot * cp.
                Q = 0
                try:
                    if tin and tout:
                        delta_T = abs(float(tout) - float(tin))
                        if CP_val:
                            Q = float(CP_val) * delta_T
                        elif mdot and cp_val:
                            Q = float(mdot) * float(cp_val) * delta_T
                except (ValueError, TypeError):
                    Q = 0

                # Circle size: Q < 1000 is small, Q > 5000 very big,
                # linear interpolation in between.
                if Q > 0:
                    if Q < 1000:
                        radius = base_radius + 5
                    elif Q > 5000:
                        radius = base_radius + 25
                    else:
                        scale_factor = (Q - 1000) / (5000 - 1000)  # 0..1
                        radius = base_radius + 5 + int(scale_factor * 20)
                else:
                    radius = base_radius

                # Color: red for hot streams (Tin > Tout, heat source), blue
                # for cold streams (heat sink); stronger shade above 100 °C;
                # grey when temperatures are missing or unparsable.
                try:
                    if tin and tout:
                        tin_f = float(tin)
                        tout_f = float(tout)
                        max_temp = max(tin_f, tout_f)
                        if tin_f > tout_f:
                            circle_color = (255, 0, 0, 220) if max_temp > 100 else (255, 100, 100, 220)
                        else:
                            circle_color = (0, 0, 255, 220) if max_temp > 100 else (100, 150, 255, 220)
                    else:
                        circle_color = (150, 150, 150, 200)
                except (ValueError, TypeError):
                    circle_color = (150, 150, 150, 200)

                circle_x = start_x + stream_idx * circle_spacing
                # draw_smooth_ellipse returns a new image, so rebind both the
                # image and its drawing context.
                base_img = draw_smooth_ellipse(
                    base_img,
                    [circle_x - radius, circle_y - radius, circle_x + radius, circle_y + radius],
                    fill=circle_color,
                    outline=(50, 50, 50, 255),
                    width=1,
                )
                draw = ImageDraw.Draw(base_img)
        except (ValueError, TypeError):
            # Malformed coordinates/values: skip this group entirely.
            continue
    return base_img
def generate_subprocess_level_map(group_idx=None):
    """Generate a map image showing subprocesses with connections and arrows.

    Each positioned subprocess is drawn as a light-blue labelled box on the
    map snapshot; black arrows connect boxes according to each subprocess's
    'next' field; vertical inbound/outbound arrows represent non-product
    streams (red = cooling / hot stream, blue = heating / cold stream,
    grey = unknown temperatures). Stream value labels are intentionally not
    drawn — that data is presented in a table instead.

    Args:
        group_idx: If provided, only show subprocesses from this process
            group. If None, show all.

    Returns:
        A PIL RGBA image, or ``None`` when no snapshot/processes are
        available.
    """
    import math

    snapshots_dict = st.session_state.get('map_snapshots', {})
    active_base = st.session_state.get('current_base', 'OpenStreetMap')
    if active_base == 'Blank':
        base_img = Image.new('RGBA', (800, 600), (242, 242, 243, 255))
    else:
        chosen_bytes = snapshots_dict.get(active_base) or st.session_state.get('map_snapshot')
        if not chosen_bytes:
            return None
        # Work on a copy so the cached snapshot is never modified in place.
        original_img = Image.open(BytesIO(chosen_bytes)).convert("RGBA")
        base_img = original_img.copy()
    w, h = base_img.size
    draw = ImageDraw.Draw(base_img)

    # Cross-platform font loading: try common paths, then PIL's default.
    BOX_FONT_SIZE = 20
    font_paths = [
        # Linux common fonts
        "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf",
        "/usr/share/fonts/truetype/liberation/LiberationSans-Regular.ttf",
        "/usr/share/fonts/TTF/DejaVuSans.ttf",
        # macOS fonts
        "/System/Library/Fonts/Helvetica.ttc",
        "/System/Library/Fonts/Arial.ttf",
        # Windows fonts (fallback)
        "C:/Windows/Fonts/arial.ttf",
        # Generic fallback
        "DejaVuSans.ttf",
    ]
    font = None
    for font_path in font_paths:
        try:
            font = ImageFont.truetype(font_path, BOX_FONT_SIZE)
            break
        except (OSError, IOError):
            continue
    if font is None:
        try:
            font = ImageFont.load_default()
        except Exception:
            # Extremely unlikely; drawing falls back to fontless sizing below.
            font = None

    processes = st.session_state.get('processes', [])
    map_center = st.session_state.get('map_center', [0, 0])
    map_zoom = st.session_state.get('map_zoom', 17.5)
    proc_groups_list = st.session_state.get('proc_groups', [])
    proc_group_names = st.session_state.get('proc_group_names', [])
    if not processes:
        return None

    # Semi-transparent canvas overlay (90% of the image, clamped to a margin)
    # — always shown so report images have a consistent background.
    overlay_w = int(w * 0.9)
    overlay_h = int(h * 0.9)
    center_px = w // 2
    center_py = h // 2
    overlay_x0 = int(center_px - overlay_w / 2)
    overlay_y0 = int(center_py - overlay_h / 2)
    overlay_x1 = overlay_x0 + overlay_w
    overlay_y1 = overlay_y0 + overlay_h
    margin = 20
    overlay_x0 = max(margin, overlay_x0)
    overlay_y0 = max(margin, overlay_y0)
    overlay_x1 = min(w - margin, overlay_x1)
    overlay_y1 = min(h - margin, overlay_y1)
    draw.rectangle([overlay_x0, overlay_y0, overlay_x1, overlay_y1],
                   fill=(250, 250, 250, 40),
                   outline=(245, 245, 245, 80),
                   width=1)

    # Process-area label in the overlay's top-left corner.
    if proc_group_names and group_idx is not None and group_idx < len(proc_group_names):
        overlay_label = f"Process Area: {proc_group_names[group_idx]}"
    elif proc_group_names:
        overlay_label = f"Process Area: {proc_group_names[0]}" if len(proc_group_names) == 1 else "Subprocess View"
    else:
        overlay_label = "Subprocess View"
    if font:
        label_bbox = draw.textbbox((0, 0), overlay_label, font=font)
        label_w = label_bbox[2] - label_bbox[0]
        label_h = label_bbox[3] - label_bbox[1]
    else:
        label_w = len(overlay_label) * 6
        label_h = 10
    label_x = overlay_x0 + 15
    label_y = overlay_y0 + 15
    draw.rectangle([label_x - 5, label_y - 3, label_x + label_w + 5, label_y + label_h + 3],
                   fill=(255, 255, 255, 120),
                   outline=(220, 220, 220, 100),
                   width=1)
    if font:
        draw.text((label_x, label_y), overlay_label, fill=(40, 40, 40, 255), font=font)
    else:
        draw.text((label_x, label_y), overlay_label, fill=(40, 40, 40, 255))

    # Determine which subprocess indices are eligible for drawing.
    if group_idx is not None:
        if group_idx < len(proc_groups_list):
            subprocess_indices = set(proc_groups_list[group_idx])  # O(1) lookup
        else:
            subprocess_indices = set()
    else:
        subprocess_indices = set(range(len(processes)))

    # Collect positioned subprocesses (filtered by group if specified).
    positioned = []
    name_index = {}  # lower-cased label -> indices into `positioned`
    for i, p in enumerate(processes):
        if group_idx is not None and i not in subprocess_indices:
            continue
        lat = p.get('lat')
        lon = p.get('lon')
        if lat in (None, "", "None") or lon in (None, "", "None"):
            continue
        try:
            lat_f = float(lat)
            lon_f = float(lon)
            proc_px, proc_py = snapshot_lonlat_to_pixel(
                lon_f, lat_f,
                (map_center[1], map_center[0]),  # map_center is [lat, lon]
                map_zoom, w, h
            )
            # Skip subprocesses whose center is well outside the snapshot.
            if proc_px < -50 or proc_py < -20 or proc_px > w + 50 or proc_py > h + 20:
                continue
            label = p.get('name') or f"P{i+1}"
            scale = float(p.get('box_scale', 6.0) or 6.0)
            base_padding = 18
            padding = int(base_padding * scale)
            text_bbox = draw.textbbox((0, 0), label, font=font) if font else (0, 0, len(label) * 6, 10)
            tw = text_bbox[2] - text_bbox[0]
            th = text_bbox[3] - text_bbox[1]
            box_w = int(tw * scale + padding * 2)
            box_h = int(th * scale + padding * 2)
            x0 = int(proc_px - box_w / 2)
            y0 = int(proc_py - box_h / 2)
            x1 = x0 + box_w
            y1 = y0 + box_h
            if x1 < 0 or y1 < 0 or x0 > w or y0 > h:
                continue
            positioned.append({
                'idx': i,
                'label': label,
                'center': (proc_px, proc_py),
                'box': (x0, y0, x1, y1),
                'next_raw': p.get('next', '') or '',
                'type': 'subprocess'
            })
            lname = label.strip().lower()
            name_index.setdefault(lname, []).append(len(positioned) - 1)
        except (ValueError, TypeError):
            continue

    def _draw_arrow(draw_ctx, x_start, y_start, x_end, y_end, color=(0, 0, 0, 255), width=3, head_len=18, head_angle_deg=30):
        """Draw a straight line with a filled triangular head at the end."""
        draw_ctx.line([(x_start, y_start), (x_end, y_end)], fill=color, width=width, joint='curve')
        ang = math.atan2(y_end - y_start, x_end - x_start)
        ang_left = ang - math.radians(head_angle_deg)
        ang_right = ang + math.radians(head_angle_deg)
        x_left = x_end - head_len * math.cos(ang_left)
        y_left = y_end - head_len * math.sin(ang_left)
        x_right = x_end - head_len * math.cos(ang_right)
        y_right = y_end - head_len * math.sin(ang_right)
        draw_ctx.polygon([(x_end, y_end), (x_left, y_left), (x_right, y_right)], fill=color)

    def _resolve_targets(target_token, positioned_list, name_idx):
        """Resolve a 'next' token (1-based index or label) to positioned entries."""
        target_token = target_token.strip()
        if not target_token:
            return []
        if target_token.isdigit():
            idx_int = int(target_token) - 1
            for d in positioned_list:
                if d['idx'] == idx_int:
                    return [d]
            return []
        lname2 = target_token.lower()
        if lname2 in name_idx:
            return [positioned_list[i] for i in name_idx[lname2]]
        return [d for d in positioned_list if d['label'].lower() == lname2]

    # Draw connection arrows between subprocess boxes.
    for src in positioned:
        raw_next = src['next_raw']
        if not raw_next:
            continue
        # 'next' may list several targets separated by ',', ';' or '|'.
        parts = [chunk.strip()
                 for chunk in raw_next.replace(';', ',').replace('|', ',').split(',')
                 if chunk.strip()]
        if not parts:
            continue
        sx, sy = src['center']
        for part_token in parts:
            for tgt in _resolve_targets(part_token, positioned, name_index):
                if tgt is src:
                    continue
                tx, ty = tgt['center']
                sx0, sy0, sx1, sy1 = src['box']
                sw2 = (sx1 - sx0) / 2.0
                sh2 = (sy1 - sy0) / 2.0
                tx0, ty0, tx1, ty1 = tgt['box']
                tw2 = (tx1 - tx0) / 2.0
                th2 = (ty1 - ty0) / 2.0
                vec_dx = tx - sx
                vec_dy = ty - sy
                if vec_dx == 0 and vec_dy == 0:
                    continue
                # Clip the arrow so it starts/ends just outside each box
                # border (2% overshoot past the box edge).
                factors_s = []
                if vec_dx != 0:
                    factors_s.append(sw2 / abs(vec_dx))
                if vec_dy != 0:
                    factors_s.append(sh2 / abs(vec_dy))
                t_s = min(factors_s) if factors_s else 0
                factors_t = []
                if vec_dx != 0:
                    factors_t.append(tw2 / abs(vec_dx))
                if vec_dy != 0:
                    factors_t.append(th2 / abs(vec_dy))
                t_t = min(factors_t) if factors_t else 0
                start_x = sx + vec_dx * t_s * 1.02
                start_y = sy + vec_dy * t_s * 1.02
                end_x = tx - vec_dx * t_t * 1.02
                end_y = ty - vec_dy * t_t * 1.02
                _draw_arrow(draw, start_x, start_y, end_x, end_y, color=(0, 0, 0, 245), width=5)

    # Draw subprocess boxes with centered labels.
    for item in positioned:
        x0, y0, x1, y1 = item['box']
        label = item['label']
        fill_color = (224, 242, 255, 245)   # light blue
        border_color = (23, 105, 170, 255)  # dark blue
        text_color = (10, 53, 85, 255)
        draw.rectangle([x0, y0, x1, y1], fill=fill_color, outline=border_color, width=2)
        box_w = x1 - x0
        box_h = y1 - y0
        if font:
            bbox_lbl = draw.textbbox((0, 0), label, font=font)
            t_w = bbox_lbl[2] - bbox_lbl[0]
            t_h = bbox_lbl[3] - bbox_lbl[1]
        else:
            t_w = len(label) * 6
            t_h = 10
        ct_x = int(x0 + (box_w - t_w) / 2)
        ct_y = int(y0 + (box_h - t_h) / 2)
        if font:
            draw.text((ct_x, ct_y), label, fill=text_color, font=font)
        else:
            draw.text((ct_x, ct_y), label, fill=text_color)

    # Vertical stream arrows: inbound above the box, outbound below.
    arrow_len_v = 45
    base_stream_spacing = 35  # horizontal spacing between stream arrows

    def _draw_v_arrow(draw_ctx, x_pos, y_from, y_to, head_at_end=True, color=(0, 0, 0, 245), width=3):
        """Draw a vertical line with a triangular head at one end."""
        draw_ctx.line([(x_pos, y_from), (x_pos, y_to)], fill=color, width=width)
        head_len = 11
        head_half = 7
        if head_at_end:
            direction = 1 if y_to >= y_from else -1
            tip_y = y_to
            base_y = y_to - direction * head_len
        else:
            direction = 1 if y_from >= y_to else -1
            tip_y = y_from
            base_y = y_from - direction * head_len
        draw_ctx.polygon([
            (x_pos, tip_y),
            (x_pos - head_half, base_y),
            (x_pos + head_half, base_y)
        ], fill=color)

    for item in positioned:
        if item.get('type') != 'subprocess':
            continue
        proc = processes[item['idx']]
        streams = proc.get('streams', []) or []
        if not streams:
            continue
        # Product streams travel along connection arrows; only draw the
        # non-product (utility/thermal) streams as vertical arrows.
        non_product_streams = [s for s in streams if s.get('type', 'product') != 'product']
        if not non_product_streams:
            continue
        x0, y0, x1, y1 = item['box']
        box_center_x = (x0 + x1) / 2
        n_streams = len(non_product_streams)
        # Fit the arrows inside the box width, but never closer than 28 px.
        stream_h_spacing = max(28, min(base_stream_spacing, (x1 - x0 - 20) / max(1, n_streams))) if n_streams > 1 else 0
        for s_i, s in enumerate(non_product_streams):
            offset = (s_i - (n_streams - 1) / 2) * stream_h_spacing
            sx = int(box_center_x + offset)
            try:
                tin_val = float(str(s.get('temp_in', '')).strip())
            except (ValueError, TypeError):
                tin_val = None
            try:
                tout_val = float(str(s.get('temp_out', '')).strip())
            except (ValueError, TypeError):
                tout_val = None
            # Red = cooling (hot stream), blue = heating (cold stream),
            # grey when temperatures are missing or unparsable.
            if tin_val is not None and tout_val is not None:
                is_cooling = tin_val > tout_val
                col = (200, 25, 25, 255) if is_cooling else (25, 80, 200, 255)
            else:
                col = (90, 90, 90, 255)
            inbound_bottom = y0 - 2
            _draw_v_arrow(draw, sx, inbound_bottom - arrow_len_v, inbound_bottom, head_at_end=True, color=col, width=4)
            outbound_top = y1 + 2
            _draw_v_arrow(draw, sx, outbound_top, outbound_top + arrow_len_v, head_at_end=True, color=col, width=4)
            # Tin/Tout/ṁ/cp labels intentionally omitted — shown in the
            # accompanying data table instead.

    return base_img
def generate_report():
"""Generate an HTML report with process maps, notes, and pinch analysis results."""
import base64
import os
import tempfile
import csv
# Load the logo SVG file
logo_b64 = ""
try:
logo_path = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), 'data', 'symbol.svg')
with open(logo_path, 'r') as f:
svg_content = f.read()
logo_b64 = base64.b64encode(svg_content.encode('utf-8')).decode('utf-8')
except Exception:
pass
# Generate process-level maps for both OpenStreetMap and Satellite
# Temporarily save current base to restore later
original_base = st.session_state.get('current_base', 'OpenStreetMap')
process_map_osm_b64 = ""
process_map_satellite_b64 = ""
# Generate OpenStreetMap version
st.session_state['current_base'] = 'OpenStreetMap'
process_map_osm = generate_process_level_map()
if process_map_osm:
img_buffer = BytesIO()
process_map_osm.save(img_buffer, format='PNG')
img_buffer.seek(0)
process_map_osm_b64 = base64.b64encode(img_buffer.getvalue()).decode('utf-8')
# Generate Satellite version
st.session_state['current_base'] = 'Satellite'
process_map_satellite = generate_process_level_map()
if process_map_satellite:
img_buffer = BytesIO()
process_map_satellite.save(img_buffer, format='PNG')
img_buffer.seek(0)
process_map_satellite_b64 = base64.b64encode(img_buffer.getvalue()).decode('utf-8')
# Restore original base BEFORE generating subprocess maps
st.session_state['current_base'] = original_base
# Generate subprocess maps for each process group
proc_groups = st.session_state.get('proc_groups', [])
proc_group_names = st.session_state.get('proc_group_names', [])
subprocess_maps_b64 = []
print(f"\n=== STARTING SUBPROCESS MAP GENERATION ===")
print(f"Total process groups: {len(proc_groups)}")
print(f"Process groups structure: {proc_groups}")
print(f"Process group names: {proc_group_names}")
# CRITICAL CHECK: Verify groups contain different subprocesses
all_subprocess_indices = []
for g_idx, g_subprocs in enumerate(proc_groups):
all_subprocess_indices.extend(g_subprocs)
print(f" Group {g_idx} ('{proc_group_names[g_idx] if g_idx < len(proc_group_names) else 'unnamed'}'): subprocesses {g_subprocs}")
if len(all_subprocess_indices) != len(set(all_subprocess_indices)):
print(f"WARNING: Some subprocesses appear in multiple groups! This will cause duplicate maps.")
print(f"All subprocess indices: {all_subprocess_indices}")
print(f"Unique subprocess indices: {set(all_subprocess_indices)}")
for group_idx in range(len(proc_groups)):
if len(proc_groups[group_idx]) > 0: # Only generate if group has subprocesses
print(f"\n--- Generating map for group {group_idx} ('{proc_group_names[group_idx] if group_idx < len(proc_group_names) else 'unnamed'}') ---")
print(f"Subprocesses that SHOULD appear in this map: {proc_groups[group_idx]}")
# Generate unique map for this specific group - force fresh generation
subprocess_map = generate_subprocess_level_map(group_idx=group_idx)
if subprocess_map:
# Create a fresh buffer for each map
img_buffer = BytesIO()
subprocess_map.save(img_buffer, format='PNG')
img_buffer.seek(0)
img_data = img_buffer.getvalue()
map_b64 = base64.b64encode(img_data).decode('utf-8')
img_buffer.close() # Ensure buffer is closed
# Debug: Print first 50 chars of base64 to verify uniqueness
print(f"Group {group_idx} map encoded: {len(map_b64)} chars, starts with: {map_b64[:50]}")
group_name = proc_group_names[group_idx] if group_idx < len(proc_group_names) else f"Process {group_idx + 1}"
subprocess_maps_b64.append({
'name': group_name,
'image': map_b64,
'group_idx': group_idx,
'subprocess_count': len(proc_groups[group_idx])
})
print(f"Added map for '{group_name}' to list (index {len(subprocess_maps_b64)-1})")
else:
print(f"WARNING: No map generated for group {group_idx}")
print(f"\n=== SUBPROCESS MAP GENERATION COMPLETE ===")
print(f"Total maps generated: {len(subprocess_maps_b64)}")
for idx, m in enumerate(subprocess_maps_b64):
print(f" Map {idx}: {m['name']} (group {m['group_idx']}, {m['subprocess_count']} subprocesses)")
# Legacy single subprocess map for backward compatibility
subprocess_map_b64 = subprocess_maps_b64[0]['image'] if subprocess_maps_b64 else ""
# Get notes
project_notes = st.session_state.get('project_notes', '')
notes_html = ""
if project_notes:
for para in project_notes.split('\n'):
if para.strip():
notes_html += f"
{para}
\n"
else:
notes_html = "No notes recorded.
"
# Build data collection table
processes = st.session_state.get('processes', [])
proc_group_names = st.session_state.get('proc_group_names', [])
proc_groups = st.session_state.get('proc_groups', [])
# Create mapping of subprocess to process
subprocess_to_process = {}
for group_idx, group_subprocess_list in enumerate(proc_groups):
for subprocess_idx in group_subprocess_list:
subprocess_to_process[subprocess_idx] = group_idx
data_table_html = """
| Process |
Subprocess |
Stream |
Type |
Tin (°C) |
Tout (°C) |
ṁ |
cp |
CP |
Q (kW) |
Water In |
Water Out |
Density |
Pressure |
"""
for subprocess_idx, subprocess in enumerate(processes):
# Get process name
process_idx = subprocess_to_process.get(subprocess_idx)
if process_idx is not None and process_idx < len(proc_group_names):
process_name = proc_group_names[process_idx]
else:
process_name = "Unknown"
subprocess_name = subprocess.get('name', f'Subprocess {subprocess_idx + 1}')
# Get streams
streams = subprocess.get('streams', [])
if streams:
for stream_idx, stream in enumerate(streams):
stream_name = stream.get('name', f'Stream {stream_idx + 1}')
stream_type = stream.get('type', 'product')
# Get stream values
stream_values = stream.get('stream_values', {})
tin = stream_values.get('Tin', '')
tout = stream_values.get('Tout', '')
mdot = stream_values.get('ṁ', '')
cp_val = stream_values.get('cp', '')
CP = stream_values.get('CP', '')
water_in = stream_values.get('Water Content In', '')
water_out = stream_values.get('Water Content Out', '')
density = stream_values.get('Density', '')
pressure = stream_values.get('Pressure', '')
# Fallback to legacy fields if stream_values is empty
if not stream_values:
tin = stream.get('temp_in', '')
tout = stream.get('temp_out', '')
mdot = stream.get('mdot', '')
cp_val = stream.get('cp', '')
# Calculate Q (heat power) = ṁ × cp × ΔT
Q = ''
try:
if tin and tout and mdot and cp_val:
tin_f = float(tin)
tout_f = float(tout)
mdot_f = float(mdot)
cp_f = float(cp_val)
Q_calc = mdot_f * cp_f * abs(tout_f - tin_f)
Q = f"{Q_calc:.2f}"
except (ValueError, TypeError):
Q = ''
data_table_html += f"""
| {process_name} |
{subprocess_name} |
{stream_name} |
{stream_type} |
{tin} |
{tout} |
{mdot} |
{cp_val} |
{CP} |
{Q} |
{water_in} |
{water_out} |
{density} |
{pressure} |
"""
else:
# Subprocess with no streams
data_table_html += f"""
| {process_name} |
{subprocess_name} |
No streams |
"""
data_table_html += """
"""
# Get pinch notes
pinch_notes = st.session_state.get('pinch_notes', '')
pinch_notes_html = ""
if pinch_notes:
for para in pinch_notes.split('\n'):
if para.strip():
pinch_notes_html += f"{para}
\n"
else:
pinch_notes_html = "No pinch analysis notes recorded.
"
# =====================================================
# PINCH ANALYSIS DATA FOR REPORT
# =====================================================
pinch_section_html = ""
composite_plot_b64 = ""
gcc_plot_b64 = ""
interval_plot_b64 = ""
if PINCH_AVAILABLE:
try:
# Helper function to extract stream info (same as main code)
def get_stream_info_report(stream):
    """Extract Tin/Tout/ṁ/cp/CP from a stream dict for the report.

    Mirrors the page-level get_stream_info helper. Lookup priority:
    'stream_values' (falling back to 'product_values'), then the
    properties/values pair structure ('propN' name -> 'valN' value),
    then legacy flat fields. CP may be given directly or derived as
    ṁ * cp; the duty is Q = CP * |Tout - Tin|.

    Returns a dict with keys tin, tout, mdot, cp, CP, Q, type —
    each None when the value cannot be determined.
    """
    def _to_float(raw):
        # Best-effort float conversion; None for missing/blank/bad input.
        # Targeted exceptions (not a bare except) so SystemExit and
        # KeyboardInterrupt are never swallowed.
        if raw is None or raw == '':
            return None
        try:
            return float(raw)
        except (ValueError, TypeError):
            return None

    # Primary source: new-style value mapping ('or {}' guards against an
    # explicit None stored under either key).
    stream_values = (stream.get('stream_values', {})
                     or stream.get('product_values', {})
                     or {})
    tin = _to_float(stream_values.get('Tin'))
    tout = _to_float(stream_values.get('Tout'))
    mdot = _to_float(stream_values.get('ṁ'))
    cp_val = _to_float(stream_values.get('cp'))
    CP_direct = _to_float(stream_values.get('CP'))

    # Secondary source: properties/values pairs; only fill gaps.
    properties = stream.get('properties', {})
    values = stream.get('values', {})
    if isinstance(properties, dict) and isinstance(values, dict):
        for pk, pname in properties.items():
            v = values.get(pk.replace('prop', 'val'), '')
            if pname == 'Tin' and tin is None:
                tin = _to_float(v)
            elif pname == 'Tout' and tout is None:
                tout = _to_float(v)
            elif pname == 'ṁ' and mdot is None:
                mdot = _to_float(v)
            elif pname == 'cp' and cp_val is None:
                cp_val = _to_float(v)
            elif pname == 'CP' and CP_direct is None:
                CP_direct = _to_float(v)

    # Legacy flat fields as a last resort.
    if tin is None:
        tin = _to_float(stream.get('temp_in'))
    if tout is None:
        tout = _to_float(stream.get('temp_out'))
    if mdot is None:
        mdot = _to_float(stream.get('mdot'))
    if cp_val is None:
        cp_val = _to_float(stream.get('cp'))

    # Hot streams cool down (Tin > Tout); cold streams heat up.
    stream_type = None
    if tin is not None and tout is not None:
        stream_type = "Hot stream (Heat Source)" if tin > tout else "Cold stream (Heat sink)"

    # Heat capacity flow rate: a directly supplied CP wins over ṁ * cp.
    # 'is not None' checks mean a legitimate zero is kept, not dropped.
    CP_flow = CP_direct
    if CP_flow is None and mdot is not None and cp_val is not None:
        CP_flow = mdot * cp_val

    # Duty is always reported as a positive magnitude.
    Q = None
    if CP_flow is not None and tin is not None and tout is not None:
        Q = abs(CP_flow * (tout - tin))

    return {'tin': tin, 'tout': tout, 'mdot': mdot, 'cp': cp_val,
            'CP': CP_flow, 'Q': Q, 'type': stream_type}
# Extract stream data from selections
processes = st.session_state.get('processes', [])
sel_items = st.session_state.get('selected_items', {})
streams_data = []
for sel_key, is_sel in sel_items.items():
if not is_sel or not sel_key.startswith("stream_"):
continue
parts_split = sel_key.split("_")
p_idx, s_idx = int(parts_split[1]), int(parts_split[2])
if p_idx < len(processes):
proc = processes[p_idx]
proc_streams = proc.get('streams', [])
if s_idx < len(proc_streams):
strm = proc_streams[s_idx]
info = get_stream_info_report(strm)
if info['tin'] is not None and info['tout'] is not None and info['CP'] is not None:
streams_data.append({
'name': f"{proc.get('name', f'Subprocess {p_idx + 1}')} - {strm.get('name', f'Stream {s_idx + 1}')}",
'CP': info['CP'], 'Tin': info['tin'], 'Tout': info['tout'], 'Q': info['Q'], 'type': info['type']
})
if len(streams_data) >= 2:
# Build stream list HTML for side-by-side layout
stream_list_html = ""
for sel_key, is_sel in sel_items.items():
if sel_key.startswith("stream_"):
parts_split = sel_key.split("_")
p_idx, s_idx = int(parts_split[1]), int(parts_split[2])
if p_idx < len(processes):
proc = processes[p_idx]
proc_streams = proc.get('streams', [])
if s_idx < len(proc_streams):
strm = proc_streams[s_idx]
info = get_stream_info_report(strm)
stream_name = f"{proc.get('name', f'Subprocess {p_idx + 1}')} - {strm.get('name', f'Stream {s_idx + 1}')}"
# Determine if selected
selected_class = "selected" if is_sel else ""
checkbox_content = "✓" if is_sel else ""
# Show Q if available
q_display = f"{info['Q']:.2f} kW" if info['Q'] is not None else "N/A"
type_display = "Hot stream (Heat Source)" if info['type'] == "Hot stream (Heat Source)" else ("Cold stream (Heat sink)" if info['type'] == "Cold stream (Heat sink)" else (info['type'] if info['type'] else "N/A"))
stream_list_html += f"""
{checkbox_content}
{stream_name}
Type: {type_display} | Q: {q_display}
"""
# Generate process map with circles for selected streams
process_map_with_streams = generate_process_level_map()
process_map_streams_b64 = ""
if process_map_with_streams:
img_buffer = BytesIO()
process_map_with_streams.save(img_buffer, format='PNG')
img_buffer.seek(0)
process_map_streams_b64 = base64.b64encode(img_buffer.getvalue()).decode('utf-8')
# Build streams table HTML
streams_table_html = """
| Stream | Tin (°C) | Tout (°C) | CP (kW/K) | Q (kW) | Type |
"""
for s in streams_data:
type_badge = f'Hot stream (Heat Source)' if s['type'] == 'Hot stream (Heat Source)' else f'Cold Stream (Heat Sink)'
streams_table_html += f"""
| {s['name']} |
{s['Tin']:.1f} |
{s['Tout']:.1f} |
{s['CP']:.2f} |
{s['Q']:.2f} |
{type_badge} |
"""
streams_table_html += "
"
# Run pinch analysis
tmin = st.session_state.get('tmin_input', 10.0)
with tempfile.NamedTemporaryFile(mode='w', suffix='.csv', delete=False, newline='') as f:
writer = csv.writer(f)
writer.writerow(['Tmin', str(tmin)])
writer.writerow(['CP', 'TSUPPLY', 'TTARGET'])
for strm in streams_data:
writer.writerow([strm['CP'], strm['Tin'], strm['Tout']])
temp_csv_path = f.name
try:
pinch_obj = Pinch(temp_csv_path, options={})
pinch_obj.shiftTemperatures()
pinch_obj.constructTemperatureInterval()
pinch_obj.constructProblemTable()
pinch_obj.constructHeatCascade()
pinch_obj.constructShiftedCompositeDiagram('EN')
pinch_obj.constructCompositeDiagram('EN')
pinch_obj.constructGrandCompositeCurve('EN')
# Calculate Heat Recovery
total_hot_duty = sum(abs(s['Q']) for s in streams_data if s['Tin'] > s['Tout'])
heat_recovery = total_hot_duty - pinch_obj.hotUtility
results = {
'hot_utility': pinch_obj.hotUtility,
'cold_utility': pinch_obj.coldUtility,
'pinch_temperature': pinch_obj.pinchTemperature,
'heat_recovery': heat_recovery,
'tmin': pinch_obj.tmin,
'composite_diagram': pinch_obj.compositeDiagram,
'grand_composite_curve': pinch_obj.grandCompositeCurve,
'heat_cascade': pinch_obj.heatCascade,
'temperatures': pinch_obj._temperatures,
'streams': list(pinch_obj.streams)
}
# Generate Composite Curves plot using Plotly (interactive HTML)
diagram = results['composite_diagram']
fig1 = go.Figure()
fig1.add_trace(go.Scatter(x=diagram['hot']['H'], y=diagram['hot']['T'], mode='lines+markers', name='Hot streams (Heat sources)', line=dict(color='red', width=3), marker=dict(size=8)))
fig1.add_trace(go.Scatter(x=diagram['cold']['H'], y=diagram['cold']['T'], mode='lines+markers', name='Cold stream (Heat Sinks)', line=dict(color='blue', width=3), marker=dict(size=8)))
fig1.add_hline(y=results['pinch_temperature'], line_dash='dash', line_color='gray', annotation_text=f"Pinch: {results['pinch_temperature']:.1f}°C")
fig1.update_layout(title='Composite Curves', xaxis_title='Enthalpy H (kW)', yaxis_title='Temperature T (°C)', height=600, width=900, xaxis=dict(rangemode='tozero'), yaxis=dict(rangemode='tozero'))
composite_plot_html = fig1.to_html(full_html=False, include_plotlyjs='cdn')
# Generate Grand Composite Curve plot using Plotly (interactive HTML)
gcc_H = results['grand_composite_curve']['H']
gcc_T = results['grand_composite_curve']['T']
heat_cascade = results['heat_cascade']
fig2 = go.Figure()
for i in range(len(gcc_H) - 1):
color = 'red' if i < len(heat_cascade) and heat_cascade[i]['deltaH'] > 0 else ('blue' if i < len(heat_cascade) and heat_cascade[i]['deltaH'] < 0 else 'gray')
fig2.add_trace(go.Scatter(x=[gcc_H[i], gcc_H[i+1]], y=[gcc_T[i], gcc_T[i+1]], mode='lines+markers', line=dict(color=color, width=3), marker=dict(size=8, color=color), showlegend=False))
fig2.add_hline(y=results['pinch_temperature'], line_dash='dash', line_color='gray', annotation_text=f"Pinch: {results['pinch_temperature']:.1f}°C")
fig2.add_vline(x=0, line_color='black', line_width=1, opacity=0.3)
fig2.update_layout(title='Grand Composite Curve', xaxis_title='Net ΔH (kW)', yaxis_title='Shifted Temperature (°C)', height=600, width=900, yaxis=dict(rangemode='tozero'))
gcc_plot_html = fig2.to_html(full_html=False, include_plotlyjs=False)
# Generate Temperature Interval Diagram using Plotly (interactive HTML)
interval_plot_html = ""
temps = results['temperatures']
pinch_streams = results['streams']
if pinch_streams and temps:
fig3 = go.Figure()
num_streams = len(pinch_streams)
x_positions = [(i + 1) * 1.0 for i in range(num_streams)]
# Draw horizontal temperature lines
for temperature in temps:
fig3.add_shape(type="line", x0=0, x1=num_streams + 1, y0=temperature, y1=temperature, line=dict(color="gray", width=1, dash="dot"))
# Draw pinch line
fig3.add_shape(type="line", x0=0, x1=num_streams + 1, y0=results['pinch_temperature'], y1=results['pinch_temperature'], line=dict(color="black", width=2, dash="dash"))
# Draw stream bars with arrows
for i, stream in enumerate(pinch_streams):
ss, st_temp = stream['ss'], stream['st']
color = 'red' if stream['type'] == 'HOT' else 'blue'
fig3.add_trace(go.Scatter(x=[x_positions[i], x_positions[i]], y=[ss, st_temp], mode='lines', line=dict(color=color, width=10), showlegend=False))
fig3.add_annotation(x=x_positions[i], y=st_temp, ax=x_positions[i], ay=ss, xref='x', yref='y', axref='x', ayref='y', showarrow=True, arrowhead=2, arrowsize=1.5, arrowwidth=3, arrowcolor=color)
fig3.add_annotation(x=x_positions[i], y=max(ss, st_temp) + (max(temps) - min(temps)) * 0.03, text=f"S{i+1}", showarrow=False, font=dict(size=12, color='white'), bgcolor=color)
fig3.update_layout(title='Temperature Interval Diagram', xaxis=dict(title='Streams', showticklabels=False, range=[0, num_streams + 1]), yaxis=dict(title='Shifted Temperature (°C)'), height=600, width=900, showlegend=False)
interval_plot_html = fig3.to_html(full_html=False, include_plotlyjs=False)
# Build pinch analysis section HTML
# =====================================================
# HEAT PUMP INTEGRATION ANALYSIS FOR REPORT
# =====================================================
hpi_section_html = ""
try:
# Create temporary CSV for HPI analysis
with tempfile.NamedTemporaryFile(mode='w', suffix='.csv', delete=False, newline='') as f:
writer = csv.writer(f)
writer.writerow(['Tmin', str(tmin)])
writer.writerow(['CP', 'TSUPPLY', 'TTARGET'])
for strm in streams_data:
writer.writerow([strm['CP'], strm['Tin'], strm['Tout']])
hpi_csv_path = f.name
try:
# Create Pinchmain instance
pyPinchHPI = Pinchmain(hpi_csv_path, options={})
# Create HPI instance
hpi_analysis = HPI(hpi_csv_path, Tsinkout=None, pyPinch=pyPinchHPI)
# Run HPI analysis
hpi_analysis.GCCdraw = pyPinchHPI.solvePinchforHPI().grandCompositeCurve
hpi_analysis.deleteTemperaturePockets()
hpi_analysis.GCCSource, hpi_analysis.GCCSink = hpi_analysis.splitHotandCold()
# Check if HPI is possible
if hpi_analysis.GCCSource['T'] and hpi_analysis.GCCSink['T']:
# Run integration to get available heat pumps
hpi_analysis.IntegrateHeatPump()
hpi_analysis.findIntegration()
# Get available heat pump types
all_hp_data = []
if hpi_analysis.IntegrationPoint and len(hpi_analysis.IntegrationPoint['Temp']) > 0:
int_temp = hpi_analysis.IntegrationPoint['Temp'][-1]
available_hps = hpi_analysis.get_available_heat_pumps(int_temp)
total_hot_duty = sum(abs(s['Q']) for s in streams_data if s['Tin'] > s['Tout'])
# Calculate integration for each available heat pump
for hp in available_hps:
if hp['available']:
hpi_analysis.KoWP = []
hpi_analysis.EvWP = []
hpi_analysis.COPwerte = []
hpi_analysis.COPT = []
hpi_analysis.IntegrateHeatPump_specific(hp['name'])
hpi_analysis.findIntegration()
if hpi_analysis.IntegrationPoint and len(hpi_analysis.IntegrationPoint['Temp']) > 0:
hp_int_temp = hpi_analysis.IntegrationPoint['Temp'][-1]
hp_int_qsource = hpi_analysis.IntegrationPoint['QQuelle'][-1]
hp_int_qsink = hpi_analysis.IntegrationPoint['QSenke'][-1]
hp_int_cop = hpi_analysis.IntegrationPoint['COP'][-1]
all_hp_data.append({
'name': hp['name'],
'cop': hp_int_cop,
't_source': hp_int_temp,
't_sink': hpi_analysis.Tsinkout,
'q_source': hp_int_qsource,
'q_sink': hp_int_qsink
})
# Sort by COP descending
all_hp_data.sort(key=lambda x: x['cop'], reverse=True)
if all_hp_data:
# Build heat pump comparison table with dark-mode aware styling
hp_table_html = """
| Heat Pump | COP | T_source (°C) | T_sink (°C) | Q_source (kW) | Q_sink (kW) |
"""
for hp in all_hp_data:
hp_table_html += f"""
| {hp['name']} |
{hp['cop']:.2f} |
{hp['t_source']:.1f} |
{hp['t_sink']:.1f} |
{hp['q_source']:.1f} |
{hp['q_sink']:.1f} |
"""
hp_table_html += "
"
# Generate HPI plot with all heat pumps and dropdown selector
gcc_H = hpi_analysis.GCCdraw['H']
gcc_T = hpi_analysis.GCCdraw['T']
hp_colors = ['#00CC96', '#AB63FA', '#FFA15A', '#19D3F3', '#FF6692', '#B6E880', '#FF97FF', '#FECB52']
# Create HPI figure
fig_hpi = go.Figure()
# Count GCC segments (they'll be visible for all heat pumps)
num_gcc_segments = len(gcc_H) - 1
# Plot GCC segments (always visible)
for i in range(num_gcc_segments):
if i < len(hpi_analysis.pyPinch.heatCascade):
dh = hpi_analysis.pyPinch.heatCascade[i]['deltaH']
color = 'red' if dh > 0 else ('blue' if dh < 0 else 'gray')
else:
color = 'gray'
fig_hpi.add_trace(go.Scatter(
x=[gcc_H[i], gcc_H[i+1]],
y=[gcc_T[i], gcc_T[i+1]],
mode='lines+markers',
line=dict(color=color, width=2),
marker=dict(size=5),
showlegend=False,
visible=True # Always visible
))
# Store integration data for all heat pumps
hp_integration_traces = []
# Add integration points for ALL available heat pumps
for idx, hp_data in enumerate(all_hp_data):
# Re-run integration for this heat pump
hpi_analysis.KoWP = []
hpi_analysis.EvWP = []
hpi_analysis.COPwerte = []
hpi_analysis.COPT = []
hpi_analysis.IntegrateHeatPump_specific(hp_data['name'])
hpi_analysis.findIntegration()
if hpi_analysis.IntegrationPoint and len(hpi_analysis.IntegrationPoint['Temp']) > 0:
color = hp_colors[idx % len(hp_colors)]
hp_name = hp_data['name']
int_temp = hpi_analysis.IntegrationPoint['Temp'][-1]
int_qsource = hpi_analysis.IntegrationPoint['QQuelle'][-1]
int_qsink = hpi_analysis.IntegrationPoint['QSenke'][-1]
int_cop = hpi_analysis.IntegrationPoint['COP'][-1]
t_sink = hpi_analysis.Tsinkout
# Add source marker (visible only for first HP initially)
fig_hpi.add_trace(go.Scatter(
x=[int_qsource], y=[int_temp],
mode='markers',
marker=dict(size=14, color=color, symbol='diamond',
line=dict(width=2, color=color)),
name=f'{hp_name} - Source',
showlegend=True,
visible=(idx == 0), # Only first HP visible by default
hovertemplate=f'{hp_name} - Source
T: {int_temp:.1f}°C
Q: {int_qsource:.1f} kW
COP: {int_cop:.2f}'
))
# Add sink marker (visible only for first HP initially)
fig_hpi.add_trace(go.Scatter(
x=[int_qsink], y=[t_sink],
mode='markers',
marker=dict(size=14, color=color, symbol='diamond-open',
line=dict(width=2, color=color)),
name=f'{hp_name} - Sink',
showlegend=True,
visible=(idx == 0), # Only first HP visible by default
hovertemplate=f'{hp_name} - Sink
T: {t_sink:.1f}°C
Q: {int_qsink:.1f} kW
COP: {int_cop:.2f}'
))
hp_integration_traces.append({
'name': hp_name,
'cop': int_cop,
'source_idx': len(fig_hpi.data) - 2,
'sink_idx': len(fig_hpi.data) - 1
})
# Create dropdown buttons for heat pump selection
buttons = []
for i, hp_trace in enumerate(hp_integration_traces):
# Create visibility list: GCC always visible, only selected HP visible
visible = [True] * num_gcc_segments # GCC segments always visible
for j in range(len(hp_integration_traces)):
visible.append(j == i) # source marker
visible.append(j == i) # sink marker
buttons.append(dict(
label=f"{hp_trace['name']} (COP: {hp_trace['cop']:.2f})",
method="update",
args=[{"visible": visible}]
))
# Add pinch line
fig_hpi.add_hline(y=pinch_obj.pinchTemperature, line_dash='dash', line_color='gray',
annotation_text=f"Pinch: {pinch_obj.pinchTemperature:.1f}°C")
fig_hpi.add_vline(x=0, line_color='black', line_width=1, opacity=0.3)
# Update layout with dropdown
fig_hpi.update_layout(
xaxis_title='Net ΔH (kW)',
yaxis_title='Shifted Temperature (°C)',
height=600,
width=1100,
xaxis=dict(domain=[0.35, 1], rangemode='tozero'),
yaxis=dict(domain=[0, 1], rangemode='tozero'),
showlegend=True,
legend=dict(
x=0.01,
y=0.5,
xanchor='left',
yanchor='middle',
bgcolor='rgba(255,255,255,0.8)',
bordercolor='gray',
borderwidth=1
),
updatemenus=[dict(
type="dropdown",
x=0.01, xanchor="left",
y=1, yanchor="top",
buttons=buttons,
bgcolor="white",
bordercolor="gray",
borderwidth=1
)] if buttons else [],
margin=dict(l=20, r=20, t=40, b=50)
)
hpi_plot_html = fig_hpi.to_html(full_html=False, include_plotlyjs=False)
hpi_section_html = f"""
Heat Pump Integration Analysis
🔍 Heat Pump Comparison
{hp_table_html}
{hpi_plot_html}
"""
finally:
os.unlink(hpi_csv_path)
except Exception as hpi_error:
hpi_section_html = f"Heat Pump Integration Analysis
Heat pump integration could not be performed: {str(hpi_error)}
"
pinch_section_html = f"""
📊 Potential Analysis
Stream Selection & Process Map
Selected Streams
{stream_list_html}
Process Overview
{"

" if process_map_streams_b64 else "
Map not available
"}
Selected Streams (Detailed)
{streams_table_html}
Pinch Analysis Results (ΔTmin = {tmin}°C)
Minimum Heating Demand
{results['hot_utility']:.2f} kW
Minimum Cooling Demand
{results['cold_utility']:.2f} kW
Pinch Temperature
{results['pinch_temperature']:.1f} °C
Heat Recovery Potential
{results['heat_recovery']:.2f} kW
Diagrams
{composite_plot_html}
{gcc_plot_html}
{hpi_section_html}
Temperature Interval Diagram
📝 Pinch Analysis Notes
{pinch_notes_html}
"""
finally:
os.unlink(temp_csv_path)
else:
pinch_section_html = """
📊 Potential Analysis
Not enough streams selected for pinch analysis. Select at least 2 streams with complete data.
"""
except Exception as e:
pinch_section_html = f"""
📊 Potential Analysis
Error generating pinch analysis: {str(e)}
"""
else:
pinch_section_html = """
📊 Potential Analysis
Pinch analysis module not available.
"""
# Generate HTML with multi-page navigation
html_content = f"""
Heat Integration Report
Generated: {datetime.now().strftime('%Y-%m-%d %H:%M')}
- 📍 Data Collection
- 📊 Potential Analysis
📍 Data Collection
Process Level Maps
{"

" if process_map_osm_b64 else "
Process map not available
"}
OpenStreetMap
{"

" if process_map_satellite_b64 else "
Satellite map not available
"}
Satellite
Subprocess Level Maps
{"
" + "".join([f"

{m['name']}
" for m in subprocess_maps_b64]) + "
" if subprocess_maps_b64 else "
No subprocess maps available
"}
📋 Collected Data
{data_table_html}
📝 Data Collection Notes
{notes_html}
{pinch_section_html}
"""
return html_content
# Title and Generate Report button
# Two-column header row: wide title on the left, report controls on the right.
title_col, button_col = st.columns([4, 1])
with title_col:
    st.title("Potential Analysis")
with button_col:
    st.write("")  # Spacer to align button
    if st.button("📄 Generate Report", key="generate_report_btn"):
        try:
            # Build the full HTML report, then offer it for download.
            html_data = generate_report()
            st.download_button(
                label="⬇️ Download Report",
                data=html_data,
                file_name=f"heat_integration_report_{datetime.now().strftime('%Y%m%d_%H%M')}.html",
                mime="text/html",
                key="download_report_btn"
            )
        except Exception as e:
            # Surface any report-generation failure to the user instead of crashing the page.
            st.error(f"Error generating report: {str(e)}")
# Initialize session state for selections if not exists
if 'selected_items' not in st.session_state:
    st.session_state['selected_items'] = {}
if 'current_energy_demands' not in st.session_state:
    # One default demand set; stream list is filled in later once streams are known.
    st.session_state['current_energy_demands'] = [
        {
            'heat_demand': 0.0,
            'cooling_demand': 0.0,
            'selected_streams': []  # Will be populated with all streams
        }
    ]
# Helper function to determine stream type and extract data
def get_stream_info(stream):
    """Extract Tin, Tout, ṁ, cp, CP from a stream dict and classify it.

    Determines whether the stream is a Hot stream (Heat Source) or a
    Cold stream (Heat sink) from the temperature direction.
    CP can be provided directly, or is calculated as ṁ * cp.
    The duty is Q = CP * |Tout - Tin| (always positive).

    Lookup priority: 'stream_values' (falling back to 'product_values'),
    then the properties/values pair structure, then legacy flat fields.

    Returns a dict with keys tin, tout, mdot, cp, CP, Q, type —
    each None when the value cannot be determined.
    """
    def _to_float(raw):
        # Best-effort float conversion shared by every lookup path;
        # None for missing/blank/unparseable values. A legitimate 0 is
        # kept (it is no longer treated as "missing" via truthiness).
        if raw is None or raw == '':
            return None
        try:
            return float(raw)
        except (ValueError, TypeError):
            return None

    # Primary source: new-style value mapping ('or {}' guards against an
    # explicit None stored under either key).
    stream_values = (stream.get('stream_values', {})
                     or stream.get('product_values', {})
                     or {})
    tin = _to_float(stream_values.get('Tin'))
    tout = _to_float(stream_values.get('Tout'))
    mdot = _to_float(stream_values.get('ṁ'))
    cp_val = _to_float(stream_values.get('cp'))
    CP_direct = _to_float(stream_values.get('CP'))

    # Secondary source: properties dict maps 'propN' -> field name and the
    # matching 'valN' in values holds the raw value. Only fill gaps.
    properties = stream.get('properties', {})
    values = stream.get('values', {})
    if isinstance(properties, dict) and isinstance(values, dict):
        for pk, pname in properties.items():
            v = values.get(pk.replace('prop', 'val'), '')
            if pname == 'Tin' and tin is None:
                tin = _to_float(v)
            elif pname == 'Tout' and tout is None:
                tout = _to_float(v)
            elif pname == 'ṁ' and mdot is None:
                mdot = _to_float(v)
            elif pname == 'cp' and cp_val is None:
                cp_val = _to_float(v)
            elif pname == 'CP' and CP_direct is None:
                CP_direct = _to_float(v)

    # Fallback to legacy flat fields.
    if tin is None:
        tin = _to_float(stream.get('temp_in'))
    if tout is None:
        tout = _to_float(stream.get('temp_out'))
    if mdot is None:
        mdot = _to_float(stream.get('mdot'))
    if cp_val is None:
        cp_val = _to_float(stream.get('cp'))

    # Determine stream type: hot streams cool down, cold streams heat up.
    stream_type = None
    if tin is not None and tout is not None:
        if tin > tout:
            stream_type = "Hot stream (Heat Source)"
        else:
            stream_type = "Cold stream (Heat sink)"

    # Determine CP: use direct CP if provided, otherwise calculate from ṁ * cp.
    CP_flow = CP_direct
    if CP_flow is None and mdot is not None and cp_val is not None:
        CP_flow = mdot * cp_val

    # Calculate Q = CP * |Tout - Tin| (always positive).
    Q = None
    if CP_flow is not None and tin is not None and tout is not None:
        Q = abs(CP_flow * (tout - tin))

    return {
        'tin': tin,
        'tout': tout,
        'mdot': mdot,
        'cp': cp_val,
        'CP': CP_flow,
        'Q': Q,
        'type': stream_type,
    }
# Get processes from session state
processes = st.session_state.get('processes', [])
if not processes:
    st.info("No processes found. Please add processes in the Data Collection page first.")
else:
    # Create side-by-side layout: streams on left, demands in middle, map on right
    streams_col, demands_col, map_col = st.columns([0.7, 0.8, 1.2])
    # Left column: Display streams with selection checkboxes
    with streams_col:
        st.markdown("**Streams Selection**")
        for idx, process in enumerate(processes):
            process_name = process.get('name', f'Subprocess {idx + 1}')
            # Only show process header if it has streams
            streams = process.get('streams', [])
            if streams:
                st.markdown(f"**{process_name}**")
                for stream_idx, stream in enumerate(streams):
                    stream_key = f"stream_{idx}_{stream_idx}"
                    # Newly seen streams default to selected.
                    if stream_key not in st.session_state['selected_items']:
                        st.session_state['selected_items'][stream_key] = True
                    # Checkbox | name | info caption
                    stream_cols_inner = st.columns([0.05, 0.25, 0.7])
                    stream_selected = stream_cols_inner[0].checkbox(
                        "S",
                        key=f"cb_{stream_key}",
                        value=st.session_state['selected_items'][stream_key],
                        label_visibility="collapsed"
                    )
                    st.session_state['selected_items'][stream_key] = stream_selected
                    # Display stream name
                    stream_name = stream.get('name', f'Stream {stream_idx + 1}')
                    stream_cols_inner[1].write(stream_name)
                    # Get stream info and display type + key values
                    info = get_stream_info(stream)
                    display_parts = []
                    if info['tin'] is not None:
                        display_parts.append(f"Tin:{info['tin']}°C")
                    if info['tout'] is not None:
                        display_parts.append(f"Tout:{info['tout']}°C")
                    if info['CP'] is not None:
                        display_parts.append(f"CP:{info['CP']:.2f}")
                    if info['Q'] is not None:
                        display_parts.append(f"Q:{info['Q']:.2f} kW")
                    if info['type']:
                        # Red marker for heat sources, blue for heat sinks.
                        type_color = "🔴" if info['type'] == "Hot stream (Heat Source)" else "🔵"
                        display_parts.append(f"{type_color} {info['type']}")
                    if display_parts:
                        stream_cols_inner[2].caption(' | '.join(display_parts))
                    else:
                        stream_cols_inner[2].caption("(incomplete data)")
    # Middle column: Current Energy Demands
    with demands_col:
        with st.expander("Current energy demand set", expanded=False):
            # Create stream options once
            stream_options = []
            for idx, process in enumerate(processes):
                for stream_idx, stream in enumerate(process.get('streams', [])):
                    stream_name = stream.get('name', f'Stream {stream_idx + 1} in {process.get('name', f'Subprocess {idx + 1}')}')
                    stream_options.append((idx, stream_idx, stream_name))
            all_stream_names = [name for _, _, name in stream_options]
            # Initialize selected_streams for the first demand if empty
            if not st.session_state['current_energy_demands'][0]['selected_streams']:
                st.session_state['current_energy_demands'][0]['selected_streams'] = all_stream_names.copy()
            # Display each demand
            demands_to_remove = []
            for i, demand in enumerate(st.session_state['current_energy_demands']):
                st.markdown(f"**Demand Set {i+1}**")
                col1, col2, col3 = st.columns([2, 2, 1])
                with col1:
                    demand['heat_demand'] = st.number_input(f"Heat Demand (kW) - Set {i+1}", min_value=0.0, step=0.1, value=demand['heat_demand'], key=f"heat_{i}")
                with col2:
                    demand['cooling_demand'] = st.number_input(f"Cooling Demand (kW) - Set {i+1}", min_value=0.0, step=0.1, value=demand['cooling_demand'], key=f"cooling_{i}")
                with col3:
                    if i > 0:  # Don't allow deleting the first demand
                        if st.button("🗑️", key=f"delete_{i}"):
                            demands_to_remove.append(i)
                demand['selected_streams'] = st.multiselect(
                    f"Streams - Set {i+1}",
                    all_stream_names,
                    default=demand['selected_streams'] if demand['selected_streams'] else all_stream_names,
                    key=f"streams_{i}"
                )
                st.markdown("---")
            # Remove demands marked for deletion (reverse order keeps indices valid)
            for i in reversed(demands_to_remove):
                st.session_state['current_energy_demands'].pop(i)
            # Add new demand button
            if st.button("➕ Add Demand Set"):
                st.session_state['current_energy_demands'].append({
                    'heat_demand': 0.0,
                    'cooling_demand': 0.0,
                    'selected_streams': all_stream_names.copy()
                })
                st.rerun()
    # Right column: Display map with process circles
    with map_col:
        st.markdown("**🗺️ Georeferencing**")
        if st.session_state.get('map_snapshots'):
            map_img = generate_process_level_map()
            if map_img:
                st.image(map_img)
        else:
            st.info("Map not available. Lock a map in Data Collection first.")
# Count selected streams
selected_count = sum(1 for k, v in st.session_state['selected_items'].items()
                     if v and k.startswith("stream_"))
# =====================================================
# PINCH ANALYSIS SECTION
# =====================================================
st.markdown("---")
if not PINCH_AVAILABLE:
st.error(f"Pinch analysis module not available: {PINCH_IMPORT_ERROR or 'Unknown error'}")
st.info("Please ensure the pinch_tool module is properly installed.")
else:
# Helper function to extract stream data from selection
def extract_stream_data(procs, sel_items):
    """Collect pinch-ready data for every selected stream.

    Resolves each selected "stream_<p>_<s>" key back to its process/stream
    pair and keeps only streams with complete data (Tin, Tout and CP —
    where CP may be supplied directly or derived from ṁ * cp, and
    Q = CP * |Tout - Tin|).

    Returns a list of dicts with keys: name, CP, Tin, Tout, Q.
    """
    collected = []
    for key, selected in sel_items.items():
        # Only selected stream entries are of interest.
        if not selected or not key.startswith("stream_"):
            continue
        pieces = key.split("_")
        p_i, s_i = int(pieces[1]), int(pieces[2])
        # Guard against stale keys referencing removed processes/streams.
        if p_i >= len(procs):
            continue
        proc = procs[p_i]
        proc_streams = proc.get('streams', [])
        if s_i >= len(proc_streams):
            continue
        strm = proc_streams[s_i]
        # Use get_stream_info to extract all values consistently.
        info = get_stream_info(strm)
        # Skip streams missing any of the required fields.
        if info['tin'] is None or info['tout'] is None or info['CP'] is None:
            continue
        label = "{} - {}".format(
            proc.get('name', f'Subprocess {p_i + 1}'),
            strm.get('name', f'Stream {s_i + 1}'),
        )
        collected.append({
            'name': label,
            'CP': info['CP'],
            'Tin': info['tin'],
            'Tout': info['tout'],
            'Q': info['Q'],
        })
    return collected
def run_pinch_analysis(strm_data, delta_tmin):
    """
    Execute the full pinch-analysis pipeline for the given streams.

    Writes a temporary CSV in the layout the ``Pinch`` class expects
    (a Tmin row, a header row, then one CP/TSUPPLY/TTARGET row per
    stream), runs the solver stages in order, and returns the populated
    ``Pinch`` object.  The temporary file is removed afterwards.
    """
    with tempfile.NamedTemporaryFile(mode='w', suffix='.csv', delete=False, newline='') as csv_file:
        rows = [['Tmin', str(delta_tmin)], ['CP', 'TSUPPLY', 'TTARGET']]
        rows.extend([s['CP'], s['Tin'], s['Tout']] for s in strm_data)
        csv.writer(csv_file).writerows(rows)
        temp_csv_path = csv_file.name
    try:
        # No drawing here — the caller renders the curves itself.
        pinch_obj = Pinch(temp_csv_path, options={})
        # Stage order matters: each step builds on the previous one.
        pinch_obj.shiftTemperatures()
        pinch_obj.constructTemperatureInterval()
        pinch_obj.constructProblemTable()
        pinch_obj.constructHeatCascade()
        pinch_obj.constructShiftedCompositeDiagram('EN')
        pinch_obj.constructCompositeDiagram('EN')
        pinch_obj.constructGrandCompositeCurve('EN')
        return pinch_obj
    finally:
        # Always remove the temporary CSV, even if the solver raised.
        os.unlink(temp_csv_path)
# Extract stream data from selections
streams_data = extract_stream_data(processes, st.session_state['selected_items'])
if len(streams_data) < 2:
st.info("Select at least 2 streams with complete data (Tin, Tout, and either CP or ṁ+cp) to run pinch analysis.")
# Show what data is missing for selected streams
# NOTE(review): the availability checks below duplicate the lookup logic
# used by get_stream_info() in extract_stream_data — keep the two in sync.
if selected_count > 0:
st.markdown("**Data status for selected items:**")
for sel_key, is_sel in st.session_state['selected_items'].items():
if not is_sel:
continue
if sel_key.startswith("stream_"):
parts_split = sel_key.split("_")
p_idx = int(parts_split[1])
s_idx = int(parts_split[2])
if p_idx < len(processes):
proc = processes[p_idx]
proc_streams = proc.get('streams', [])
if s_idx < len(proc_streams):
strm = proc_streams[s_idx]
strm_name = strm.get('name', f'Stream {s_idx + 1}')
proc_nm = proc.get('name', f'Subprocess {p_idx + 1}')
# Check what data is available
props = strm.get('properties', {})
vals = strm.get('values', {})
has_tin = False
has_tout = False
has_mdot = False
has_cp = False
has_CP = False # CP = ṁ * cp (heat capacity rate)
# Check stream_values (new structure); fall back to the older
# product_values container if the new one is absent
stream_vals = strm.get('stream_values', {})
if not stream_vals:
stream_vals = strm.get('product_values', {})
if stream_vals:
if stream_vals.get('Tin'):
has_tin = True
if stream_vals.get('Tout'):
has_tout = True
if stream_vals.get('ṁ'):
has_mdot = True
if stream_vals.get('cp'):
has_cp = True
if stream_vals.get('CP'):
has_CP = True
# Older structure: parallel 'properties'/'values' dicts where
# the value key is derived by replacing 'prop' with 'val'
if isinstance(props, dict) and isinstance(vals, dict):
for pk, pname in props.items():
vk = pk.replace('prop', 'val')
v = vals.get(vk, '')
if pname == 'Tin' and v:
has_tin = True
elif pname == 'Tout' and v:
has_tout = True
elif pname == 'ṁ' and v:
has_mdot = True
elif pname == 'cp' and v:
has_cp = True
elif pname == 'CP' and v:
has_CP = True
# Fallback to legacy
if not has_tin and strm.get('temp_in'):
has_tin = True
if not has_tout and strm.get('temp_out'):
has_tout = True
if not has_mdot and strm.get('mdot'):
has_mdot = True
if not has_cp and strm.get('cp'):
has_cp = True
missing = []
if not has_tin:
missing.append("Tin")
if not has_tout:
missing.append("Tout")
# Either CP is provided directly, or both ṁ and cp are needed
if not has_CP and not (has_mdot and has_cp):
if not has_mdot:
missing.append("ṁ")
if not has_cp:
missing.append("cp")
# NOTE(review): correctness of this condition depends on its original
# indentation (lost in this paste). If it nests inside the `if not has_CP`
# branch the `not missing` clause is dead; if it sits at the outer level,
# complete streams would wrongly report "Missing (or CP)". Verify.
if not missing or (not has_mdot and not has_cp):
# If neither ṁ nor cp, suggest CP as alternative
missing.append("(or CP)")
if missing:
st.warning(f"⚠️ {proc_nm} - {strm_name}: Missing {', '.join(missing)}")
else:
st.success(f"✅ {proc_nm} - {strm_name}: Complete data")
else:
# Auto-run pinch analysis
try:
# Row: Shifted toggle | ΔTmin (small) | spacer | Hot Utility | Cold Utility | Pinch Temp | Heat Recovery
toggle_col, tmin_col, spacer, metric1, metric2, metric3, metric4 = st.columns([0.6, 0.5, 0.3, 0.6, 0.6, 0.6, 0.7])
with toggle_col:
show_shifted = st.toggle("Show Shifted Composite Curves", value=False, key="shifted_toggle")
with tmin_col:
tmin = st.number_input(
"ΔTmin",
min_value=1.0,
max_value=50.0,
value=10.0,
step=1.0,
key="tmin_input",
format="%.0f"
)
pinch = run_pinch_analysis(streams_data, tmin)
# Calculate Heat Recovery: Total heat available from hot streams minus hot utility needed
# (hot streams are those that cool down, i.e. Tin > Tout)
total_hot_duty = sum(abs(s['Q']) for s in streams_data if s['Tin'] > s['Tout'])
heat_recovery = total_hot_duty - pinch.hotUtility
# Snapshot every solver output into a plain dict so the plotting code
# below does not keep reaching into the Pinch object.
results = {
'hot_utility': pinch.hotUtility,
'cold_utility': pinch.coldUtility,
'pinch_temperature': pinch.pinchTemperature,
'heat_recovery': heat_recovery,
'tmin': pinch.tmin,
'composite_diagram': pinch.compositeDiagram,
'shifted_composite_diagram': pinch.shiftedCompositeDiagram,
'grand_composite_curve': pinch.grandCompositeCurve,
'heat_cascade': pinch.heatCascade,
'unfeasible_heat_cascade': pinch.unfeasibleHeatCascade,
'problem_table': pinch.problemTable,
# NOTE(review): _temperatures is a private attribute of Pinch — confirm
# it is a stable part of the pinch_tool API before relying on it.
'temperatures': pinch._temperatures,
'streams': list(pinch.streams)
}
metric1.metric("Minimum Heating Demand", f"{results['hot_utility']:.2f} kW")
metric2.metric("Minimum Cooling Demand", f"{results['cold_utility']:.2f} kW")
metric3.metric("Pinch Temperature", f"{results['pinch_temperature']:.1f} °C")
metric4.metric("Heat Recovery Potential", f"{results['heat_recovery']:.2f} kW")
# Side by side plots: Composite Curves (left) and Grand Composite Curve (right)
plot_col1, plot_col2 = st.columns(2)
# Build hover text for streams (hot streams cool down, cold streams heat up)
hot_streams = [s for s in streams_data if s['Tin'] > s['Tout']]
cold_streams = [s for s in streams_data if s['Tin'] < s['Tout']]
with plot_col1:
fig1 = go.Figure()
# Select which diagram to show
if show_shifted:
diagram = results['shifted_composite_diagram']
curve_label = "Shifted"
title_text = "Shifted Composite Curves"
# For shifted, temperatures are shifted by ±Tmin/2
tmin_half = results['tmin'] / 2
else:
diagram = results['composite_diagram']
curve_label = ""
title_text = "Composite Curves"
tmin_half = 0
# Hot stream (Heat Source) composite curve with hover info
hot_T = diagram['hot']['T']
hot_H = diagram['hot']['H']
# Create hover text for hot stream (Heat Source) curve points
hot_hover = []
for i, (h, t) in enumerate(zip(hot_H, hot_T)):
# Find streams at this temperature (adjust for shifted temps)
if show_shifted:
actual_t = t + tmin_half # Convert back to actual temp
else:
actual_t = t
matching = [s['name'] for s in hot_streams if min(s['Tin'], s['Tout']) <= actual_t <= max(s['Tin'], s['Tout'])]
# NOTE(review): the join separator literal below appears to have lost its
# content (likely '<br>') during extraction — restore before running.
stream_info = '
'.join(matching) if matching else 'Composite'
label = f"Hot stream (Heat Source) {curve_label}" if curve_label else "Hot stream (Heat Source) Composite"
# NOTE(review): this f-string is split across lines — the '<br>' separators
# were presumably lost in extraction; reconstruct as a single literal.
hot_hover.append(f"{label}
T: {t:.1f}°C
H: {h:.1f} kW
Streams: {stream_info}")
fig1.add_trace(go.Scatter(
x=hot_H, y=hot_T,
mode='lines+markers',
name='Hot streams (Heat sources)',
line=dict(color='red', width=2),
marker=dict(size=6),
hovertemplate='%{text}',
text=hot_hover
))
# Cold Stream (Heat Sink) composite curve with hover info
cold_T = diagram['cold']['T']
cold_H = diagram['cold']['H']
# Create hover text for Cold Stream (Heat Sink) curve points
cold_hover = []
for i, (h, t) in enumerate(zip(cold_H, cold_T)):
if show_shifted:
actual_t = t - tmin_half # Convert back to actual temp
else:
actual_t = t
matching = [s['name'] for s in cold_streams if min(s['Tin'], s['Tout']) <= actual_t <= max(s['Tin'], s['Tout'])]
# NOTE(review): same lost-'<br>' garbling as the hot-stream hover text above.
stream_info = '
'.join(matching) if matching else 'Composite'
label = f"Cold streams (Heat sinks) {curve_label}" if curve_label else "Cold streams (Heat sinks) Composite"
cold_hover.append(f"{label}
T: {t:.1f}°C
H: {h:.1f} kW
Streams: {stream_info}")
fig1.add_trace(go.Scatter(
x=cold_H, y=cold_T,
mode='lines+markers',
name='Cold streams (Heat sinks)',
line=dict(color='blue', width=2),
marker=dict(size=6),
hovertemplate='%{text}',
text=cold_hover
))
# Pinch temperature line
fig1.add_hline(
y=results['pinch_temperature'],
line_dash='dash',
line_color='gray',
annotation_text=f"Pinch: {results['pinch_temperature']:.1f}°C",
annotation_position='top right'
)
fig1.update_layout(
title=dict(text=title_text, font=dict(size=14)),
xaxis_title='Enthalpy H (kW)',
yaxis_title='Temperature T (°C)',
height=400,
margin=dict(l=60, r=20, t=40, b=50),
legend=dict(x=0.7, y=0.1),
hovermode='closest',
xaxis=dict(rangemode='tozero'),
yaxis=dict(rangemode='tozero')
)
st.plotly_chart(fig1, width='stretch', key="composite_chart")
with plot_col2:
fig2 = go.Figure()
gcc_H = results['grand_composite_curve']['H']
gcc_T = results['grand_composite_curve']['T']
heat_cascade = results['heat_cascade']
temperatures = results['temperatures']
# Create hover text for GCC points; deltaH sign tells whether the
# interval needs heating (deficit) or cooling (surplus)
gcc_hover = []
for i, (h, t) in enumerate(zip(gcc_H, gcc_T)):
if i < len(heat_cascade):
dh = heat_cascade[i]['deltaH']
region = 'Heat deficit (needs heating)' if dh > 0 else ('Heat surplus (needs cooling)' if dh < 0 else 'Balanced')
else:
region = ''
# NOTE(review): f-string split across lines — the '<br>' separators were
# presumably lost during extraction; reconstruct as one literal.
gcc_hover.append(f"GCC
Shifted T: {t:.1f}°C
Net ΔH: {h:.1f} kW
{region}")
# Plot GCC with color segments (red = deficit, blue = surplus, gray = balanced)
for i in range(len(gcc_H) - 1):
if i < len(heat_cascade):
if heat_cascade[i]['deltaH'] > 0:
color = 'red'
elif heat_cascade[i]['deltaH'] < 0:
color = 'blue'
else:
color = 'gray'
else:
color = 'gray'
fig2.add_trace(go.Scatter(
x=[gcc_H[i], gcc_H[i+1]],
y=[gcc_T[i], gcc_T[i+1]],
mode='lines+markers',
line=dict(color=color, width=2),
marker=dict(size=6, color=color),
hovertemplate='%{text}',
text=[gcc_hover[i], gcc_hover[i+1] if i+1 < len(gcc_hover) else ''],
showlegend=False
))
# Pinch temperature line
fig2.add_hline(
y=results['pinch_temperature'],
line_dash='dash',
line_color='gray',
annotation_text=f"Pinch: {results['pinch_temperature']:.1f}°C",
annotation_position='top right'
)
# Zero enthalpy line
fig2.add_vline(x=0, line_color='black', line_width=1, opacity=0.3)
fig2.update_layout(
title=dict(text='Grand Composite Curve', font=dict(size=14)),
xaxis_title='Net ΔH (kW)',
yaxis_title='Shifted Temperature (°C)',
height=400,
margin=dict(l=60, r=20, t=40, b=50),
hovermode='closest',
yaxis=dict(rangemode='tozero')
)
st.plotly_chart(fig2, width='stretch', key="gcc_chart")
# Heat Pump Integration Analysis: re-runs the pinch solver via Pinchmain,
# strips GCC pockets, and evaluates candidate heat pumps at the integration point.
st.markdown("---")
st.markdown("#### Heat Pump Integration Analysis")
try:
# Create temporary CSV for HPI analysis (same as pinch analysis)
with tempfile.NamedTemporaryFile(mode='w', suffix='.csv', delete=False, newline='') as f:
writer = csv.writer(f)
writer.writerow(['Tmin', str(tmin)])
writer.writerow(['CP', 'TSUPPLY', 'TTARGET'])
for strm in streams_data:
writer.writerow([strm['CP'], strm['Tin'], strm['Tout']])
hpi_csv_path = f.name
try:
# Create Pinchmain instance
pyPinchHPI = Pinchmain(hpi_csv_path, options={})
# Create HPI instance (Tsinkout=None for iterative analysis)
hpi_analysis = HPI(hpi_csv_path, Tsinkout=None, pyPinch=pyPinchHPI)
# Run HPI analysis to get data (without plotting)
hpi_analysis.GCCdraw = pyPinchHPI.solvePinchforHPI().grandCompositeCurve
hpi_analysis.deleteTemperaturePockets()
hpi_analysis.GCCSource, hpi_analysis.GCCSink = hpi_analysis.splitHotandCold()
# Check if there are sufficient hot and cold streams for heat pump integration
if not hpi_analysis.GCCSource['T'] or not hpi_analysis.GCCSink['T']:
st.warning("⚠️ Heat pump integration is not possible with the selected streams. The Grand Composite Curve does not have both heating and cooling requirements suitable for heat pump integration.")
# Deliberate early exit: caught by the ValueError handler below,
# which suppresses it because the warning was already shown.
raise ValueError("Insufficient streams for heat pump integration")
# First run to get available heat pumps
hpi_analysis.IntegrateHeatPump()
hpi_analysis.findIntegration()
# Get available heat pump types at the integration point
all_hp_data = []
if hpi_analysis.IntegrationPoint and len(hpi_analysis.IntegrationPoint['Temp']) > 0:
int_temp = hpi_analysis.IntegrationPoint['Temp'][-1]
available_hps = hpi_analysis.get_available_heat_pumps(int_temp)
# Calculate total heat available from hot streams
total_hot_duty = sum(abs(s['Q']) for s in streams_data if s['Tin'] > s['Tout'])
# Calculate integration for each available heat pump
for hp in available_hps:
if hp['available']:
# Run integration for this specific heat pump; the accumulator
# lists on the HPI object must be reset between runs
hpi_analysis.KoWP = []
hpi_analysis.EvWP = []
hpi_analysis.COPwerte = []
hpi_analysis.COPT = []
hpi_analysis.IntegrateHeatPump_specific(hp['name'])
hpi_analysis.findIntegration()
# Get integration results (last entry = converged integration point)
if hpi_analysis.IntegrationPoint and len(hpi_analysis.IntegrationPoint['Temp']) > 0:
hp_int_temp = hpi_analysis.IntegrationPoint['Temp'][-1]
hp_int_qsource = hpi_analysis.IntegrationPoint['QQuelle'][-1]
hp_int_qsink = hpi_analysis.IntegrationPoint['QSenke'][-1]
hp_int_cop = hpi_analysis.IntegrationPoint['COP'][-1]
coverage = (hp_int_qsource / total_hot_duty * 100) if total_hot_duty > 0 else 0
all_hp_data.append({
'name': hp['name'],
'cop': hp_int_cop,
't_source': hp_int_temp,
't_sink': hpi_analysis.Tsinkout,
'q_source': hp_int_qsource,
'q_sink': hp_int_qsink,
'coverage': coverage,
'available': True
})
else:
# Unavailable heat pump
all_hp_data.append({
'name': hp['name'],
'cop': None,
't_source': None,
't_sink': None,
'q_source': None,
'q_sink': None,
'coverage': None,
'available': False,
'reason': hp['reason']
})
# Sort by COP descending
available_hps_sorted = sorted(
[hp for hp in all_hp_data if hp['available']],
key=lambda x: x['cop'],
reverse=True
)
available_hp_names = [hp['name'] for hp in available_hps_sorted]
# Store integration data for all available heat pumps initially
# NOTE(review): this loop repeats the per-pump integration already done
# above — consider reusing those results instead of re-solving.
all_hp_integration_data = []
for hp_name in available_hp_names:
# Re-run integration with this specific heat pump type
hpi_analysis.KoWP = []
hpi_analysis.EvWP = []
hpi_analysis.COPwerte = []
hpi_analysis.COPT = []
hpi_analysis.IntegrateHeatPump_specific(hp_name)
hpi_analysis.findIntegration()
# Store the integration data
if hpi_analysis.IntegrationPoint and len(hpi_analysis.IntegrationPoint['Temp']) > 0:
all_hp_integration_data.append({
'name': hp_name,
'int_temp': hpi_analysis.IntegrationPoint['Temp'][-1],
'int_qsource': hpi_analysis.IntegrationPoint['QQuelle'][-1],
'int_qsink': hpi_analysis.IntegrationPoint['QSenke'][-1],
'int_cop': hpi_analysis.IntegrationPoint['COP'][-1],
't_sink': hpi_analysis.Tsinkout
})
else:
available_hp_names = []
all_hp_integration_data = []
# Get HPI data
gcc_H = hpi_analysis.GCCdraw['H']
gcc_T = hpi_analysis.GCCdraw['T']
integration_point = hpi_analysis.IntegrationPoint
# Define colors for different heat pumps
hp_colors = ['#00CC96', '#AB63FA', '#FFA15A', '#19D3F3', '#FF6692', '#B6E880', '#FF97FF', '#FECB52']
# Show heat pump comparison table on left and chart on right
if all_hp_data:
hpi_col1, hpi_spacer, hpi_col2 = st.columns([0.35, 0.05, 0.60])
with hpi_col1:
# Create color mapping for heat pumps based on integration data order
hp_color_map = {}
for idx, hp_int in enumerate(all_hp_integration_data):
hp_color_map[hp_int['name']] = hp_colors[idx % len(hp_colors)]
# Only show available heat pumps, sorted by COP descending
available_data = []
for hp in all_hp_data:
if hp['available']:
# Get color for this heat pump
color = hp_color_map.get(hp['name'], '#999999')
available_data.append({
'Heat Pump': hp['name'],
'COP': f"{hp['cop']:.2f}",
'T_source (°C)': f"{hp['t_source']:.1f}",
'T_sink (°C)': f"{hp['t_sink']:.1f}",
'Q_source (kW)': f"{hp['q_source']:.1f}",
'Q_sink (kW)': f"{hp['q_sink']:.1f}",
'cop_value': hp['cop'], # Keep for sorting
'color': color
})
# Sort available heat pumps by COP (descending)
available_data.sort(key=lambda x: x['cop_value'], reverse=True)
# Build table with dark-mode aware styling
# NOTE(review): the HTML table markup in the literals below was garbled
# during extraction (split strings, lost tags) — restore before running.
table_html = (
''
'| Heat Pump | COP | T_source (°C) | T_sink (°C) | Q_source (kW) | Q_sink (kW) |
'
)
for item in available_data:
table_html += f'| {item["Heat Pump"]} | {item["COP"]} | {item["T_source (°C)"]} | {item["T_sink (°C)"]} | {item["Q_source (kW)"]} | {item["Q_sink (kW)"]} |
'
table_html += '
'
st.markdown(table_html, unsafe_allow_html=True)
# Show reasons for unavailable heat pumps
unavailable_reasons = [hp for hp in all_hp_data if not hp['available']]
if unavailable_reasons:
with st.expander("ℹ️ Heat pumps that cannot be integrated"):
for hp in unavailable_reasons:
st.markdown(f"**{hp['name']}**: {hp['reason']}")
with hpi_col2:
# Add multiselect for heat pump selection
selected_hps = st.multiselect(
"Select Heat Pump Types to visualize:",
available_hp_names,
default=[available_hp_names[0]] if available_hp_names else [],
key="hp_type_multiselect"
)
# Filter integration data based on selection
hp_integration_data = [hp for hp in all_hp_integration_data if hp['name'] in selected_hps]
# Create columns for legend and chart within hpi_col2
legend_col, chart_col = st.columns([0.15, 0.85])
with legend_col:
# Create custom legend with larger symbols
# NOTE(review): legend HTML literals below are extraction-garbled too.
st.markdown("**Legend**")
legend_html = ''
for idx, hp_data in enumerate(hp_integration_data):
color = hp_colors[idx % len(hp_colors)]
hp_name = hp_data['name']
legend_html += f'
◆{hp_name} - Source
'
legend_html += f'
◇{hp_name} - Sink
'
legend_html += '
'
st.markdown(legend_html, unsafe_allow_html=True)
with chart_col:
# Create Plotly figure for HPI
fig_hpi = go.Figure()
# Plot GCC segments with colors (red for heating, blue for cooling)
for i in range(len(gcc_H) - 1):
# Determine color based on deltaH
if i < len(hpi_analysis.pyPinch.heatCascade):
dh = hpi_analysis.pyPinch.heatCascade[i]['deltaH']
color = 'red' if dh > 0 else ('blue' if dh < 0 else 'gray')
else:
color = 'gray'
fig_hpi.add_trace(go.Scatter(
x=[gcc_H[i], gcc_H[i+1]],
y=[gcc_T[i], gcc_T[i+1]],
mode='lines+markers',
line=dict(color=color, width=2),
marker=dict(size=5),
showlegend=False,
hovertemplate='T: %{y:.1f}°C
H: %{x:.1f} kW'
))
# Add integration points for each selected heat pump
for idx, hp_data in enumerate(hp_integration_data):
color = hp_colors[idx % len(hp_colors)]
hp_name = hp_data['name']
int_temp = hp_data['int_temp']
int_qsource = hp_data['int_qsource']
int_qsink = hp_data['int_qsink']
int_cop = hp_data['int_cop']
t_sink = hp_data['t_sink']
# Add diamond markers for source (evaporator)
fig_hpi.add_trace(go.Scatter(
x=[int_qsource],
y=[int_temp],
mode='markers',
marker=dict(size=12, color=color, symbol='diamond',
line=dict(width=2, color=color)),
name=f'{hp_name} - Source',
legendgroup=hp_name,
hovertemplate=f'{hp_name} - Source
T: {int_temp:.1f}°C
Q: {int_qsource:.1f} kW
COP: {int_cop:.2f}'
))
# Add diamond markers for sink (condenser)
fig_hpi.add_trace(go.Scatter(
x=[int_qsink],
y=[t_sink],
mode='markers',
marker=dict(size=12, color=color, symbol='diamond-open',
line=dict(width=2, color=color)),
name=f'{hp_name} - Sink',
legendgroup=hp_name,
hovertemplate=f'{hp_name} - Sink
T: {t_sink:.1f}°C
Q: {int_qsink:.1f} kW
COP: {int_cop:.2f}'
))
# Add pinch line
fig_hpi.add_hline(
y=pinch.pinchTemperature,
line_dash='dash',
line_color='gray',
annotation_text=f"Pinch: {pinch.pinchTemperature:.1f}°C",
annotation_position='top right'
)
# Add zero enthalpy line
fig_hpi.add_vline(x=0, line_color='black', line_width=1, opacity=0.3)
fig_hpi.update_layout(
title=dict(text='Heat Pump Integration - Grand Composite Curve', font=dict(size=14)),
xaxis_title='Net ΔH (kW)',
yaxis_title='Shifted Temperature (°C)',
height=400,
margin=dict(l=60, r=20, t=40, b=50),
hovermode='closest',
yaxis=dict(rangemode='tozero'),
showlegend=False
)
st.plotly_chart(fig_hpi, key="hpi_chart")
else:
# FIXME(review): if this else pairs with `if all_hp_data:` (original
# indentation lost), fig_hpi is never created in this branch and the
# call below raises NameError — verify against the original file.
st.plotly_chart(fig_hpi, key="hpi_chart")
# Show reasons for unavailable heat pumps
unavailable_reasons = [hp for hp in all_hp_data if not hp['available']]
if unavailable_reasons:
with st.expander("ℹ️ Heat pumps that cannot be integrated"):
for hp in unavailable_reasons:
st.markdown(f"**{hp['name']}**: {hp['reason']}")
finally:
os.unlink(hpi_csv_path)
except ValueError as ve:
# This is our custom error for insufficient streams (ve intentionally unused)
pass # Warning already shown above
except IndexError:
st.warning("⚠️ Heat pump integration is not possible with the selected streams. Please select streams with both heating and cooling requirements that allow for heat pump integration.")
except Exception as hpi_error:
st.warning(f"⚠️ Heat pump integration could not be performed with the selected streams: {str(hpi_error)}")
# =====================================================
# STATUS QUO VS PINCH ANALYSIS COMPARISON
# =====================================================
st.markdown("---")
st.markdown("## Status Quo vs Pinch Analysis Comparison")
# Check if we have current energy demands
current_demands = st.session_state.get('current_energy_demands', [])
has_demands = any(d['heat_demand'] > 0 or d['cooling_demand'] > 0 for d in current_demands)
if not has_demands:
st.info("💡 Current energy demands are required to calculate the difference to status quo. Please configure energy demand sets in the 'Current energy demand set' section above.")
else:
# Calculate total current demands
total_current_heat = sum(d['heat_demand'] for d in current_demands)
total_current_cooling = sum(d['cooling_demand'] for d in current_demands)
# Check if pinch analysis was run (look for the results in the session or check if we have the data)
pinch_results_available = False
pinch_heat_min = 0
pinch_cooling_min = 0
# Try to get pinch results from the analysis above
try:
if 'results' in locals() and results:
pinch_heat_min = results['hot_utility']
pinch_cooling_min = results['cold_utility']
pinch_results_available = True
# NOTE(review): bare `except:` also swallows KeyboardInterrupt/SystemExit —
# prefer `except Exception:` here.
except:
pinch_results_available = False
if pinch_results_available:
# Calculate improvements (positive = savings, negative = additional need)
heat_improvement = total_current_heat - pinch_heat_min
cooling_improvement = total_current_cooling - pinch_cooling_min
# Create comparison table (pandas imported lazily to keep page start-up light)
import pandas as pd
# NOTE(review): 'Pinch Reduction (kW)' actually shows the minimum utility
# target, not a reduction — confirm the intended column semantics.
comparison_data = {
'Energy Type': ['Heating Demand', 'Cooling Demand'],
'Current Status Quo (kW)': [f"{total_current_heat:.1f}", f"{total_current_cooling:.1f}"],
'Pinch Reduction (kW)': [f"-{pinch_heat_min:.1f}", f"-{pinch_cooling_min:.1f}"],
'Savings (kW)': [f"-{heat_improvement:.1f}", f"-{cooling_improvement:.1f}"],
'Percentage': [
f"-{abs(heat_improvement) / total_current_heat * 100:.1f}%" if total_current_heat > 0 else "N/A",
f"-{abs(cooling_improvement) / total_current_cooling * 100:.1f}%" if total_current_cooling > 0 else "N/A"
]
}
comparison_df = pd.DataFrame(comparison_data)
st.table(comparison_df)
else:
st.warning("⚠️ Pinch analysis results not available. Please ensure you have selected at least 2 streams with complete data to run the analysis.")
# Notes section: free-text notes persisted in session state across reruns
st.markdown("---")
st.markdown("##### Notes")
if 'pinch_notes' not in st.session_state:
st.session_state['pinch_notes'] = ""
notes = st.text_area(
"Notes",
value=st.session_state['pinch_notes'],
height=100,
placeholder="Add notes about the pinch analysis here...",
key="pinch_notes_input",
label_visibility="collapsed"
)
st.session_state['pinch_notes'] = notes
# More information expander: shifted temperature interval diagram
with st.expander("More information"):
import pandas as pd
temps = results['temperatures']
pinch_streams = results['streams']
if pinch_streams and temps:
fig_interval = go.Figure()
num_streams = len(pinch_streams)
# One evenly spaced x position per stream
x_positions = [(i + 1) * 1.0 for i in range(num_streams)]
# Draw horizontal temperature lines
for temperature in temps:
fig_interval.add_shape(
type="line",
x0=0, x1=num_streams + 1,
y0=temperature, y1=temperature,
line=dict(color="gray", width=1, dash="dot"),
)
# Draw pinch temperature line
fig_interval.add_shape(
type="line",
x0=0, x1=num_streams + 1,
y0=results['pinch_temperature'], y1=results['pinch_temperature'],
line=dict(color="black", width=2, dash="dash"),
)
fig_interval.add_annotation(
x=num_streams + 0.5, y=results['pinch_temperature'],
text=f"Pinch: {results['pinch_temperature']:.1f}°C",
showarrow=False, font=dict(size=10),
xanchor='left'
)
# Draw stream arrows
# NOTE(review): assumes results['streams'] is in the same order as
# streams_data (used for the hover names below) — confirm.
for i, stream in enumerate(pinch_streams):
ss = stream['ss'] # Shifted supply temp
st_temp = stream['st'] # Shifted target temp ('st' name avoided: shadows streamlit)
stream_type = stream['type']
x_pos = x_positions[i]
# Color based on stream type
color = 'red' if stream_type == 'HOT' else 'blue'
stream_name = streams_data[i]['name'] if i < len(streams_data) else f'Stream {i+1}'
# Draw arrow as a line with annotation for arrowhead
# NOTE(review): this hovertemplate concatenation is extraction-garbled —
# the f-string pieces lost their '<br>' content and now span lines;
# restore the literals before running.
fig_interval.add_trace(go.Scatter(
x=[x_pos, x_pos],
y=[ss, st_temp],
mode='lines',
line=dict(color=color, width=8),
hovertemplate=f"{stream_name}
" +
f"Type: {stream_type}
" +
f"T_supply (shifted): {ss:.1f}°C
" +
f"T_target (shifted): {st_temp:.1f}°C
" +
f"CP: {stream['cp']:.2f} kW/K",
showlegend=False
))
# Add arrowhead
fig_interval.add_annotation(
x=x_pos, y=st_temp,
ax=x_pos, ay=ss,
xref='x', yref='y',
axref='x', ayref='y',
showarrow=True,
arrowhead=2,
arrowsize=1.5,
arrowwidth=3,
arrowcolor=color
)
# Stream label at top (offset by 3% of the temperature span)
label_y = max(ss, st_temp) + (max(temps) - min(temps)) * 0.03
fig_interval.add_annotation(
x=x_pos, y=label_y,
text=f"S{i+1}",
showarrow=False,
font=dict(size=11, color='white'),
bgcolor=color,
bordercolor='black',
borderwidth=1,
borderpad=3
)
# CP value in middle
mid_y = (ss + st_temp) / 2
fig_interval.add_annotation(
x=x_pos, y=mid_y,
text=f"CP={stream['cp']:.1f}",
showarrow=False,
font=dict(size=9, color='white'),
textangle=-90
)
fig_interval.update_layout(
title=dict(text='Shifted Temperature Interval Diagram', font=dict(size=14)),
xaxis=dict(
title='Streams',
showticklabels=False,
range=[0, num_streams + 1],
showgrid=False
),
yaxis=dict(
title='Shifted Temperature S (°C)',
showgrid=True,
gridcolor='rgba(0,0,0,0.1)'
),
height=400,
margin=dict(l=60, r=20, t=40, b=40),
hovermode='closest',
showlegend=False
)
st.plotly_chart(fig_interval, width='stretch', key="interval_chart")
# Catch-all for the whole pinch-analysis section; the error text is shown in the UI
except Exception as e:
st.error(f"Error: {str(e)}")