import streamlit as st import sys import os import plotly.graph_objects as go import matplotlib.pyplot as plt import tempfile import csv from io import BytesIO from PIL import Image, ImageDraw, ImageFont from graphics_utils import draw_smooth_ellipse import math # Add the pinch_tool directory to the path for imports pinch_tool_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', 'pinch_tool')) if pinch_tool_path not in sys.path: sys.path.insert(0, pinch_tool_path) # Import pinch analysis modules try: from Modules.Pinch.Pinch import Pinch PINCH_AVAILABLE = True PINCH_IMPORT_ERROR = None except ImportError as e: PINCH_AVAILABLE = False PINCH_IMPORT_ERROR = str(e) st.set_page_config( page_title="Potential Analysis", initial_sidebar_state="collapsed", layout="wide" ) # Helper function to convert lon/lat to pixel coordinates on snapshot def snapshot_lonlat_to_pixel(lon_val_in, lat_val_in, center_ll, z_level, img_w, img_h): def lonlat_to_xy(lon_inner, lat_inner, z_val): lat_rad = math.radians(lat_inner) n_val = 2.0 ** z_val xtile = (lon_inner + 180.0) / 360.0 * n_val ytile = (1.0 - math.log(math.tan(lat_rad) + 1 / math.cos(lat_rad)) / math.pi) / 2.0 * n_val return xtile, ytile lon0, lat0 = center_ll xtile0, ytile0 = lonlat_to_xy(lon0, lat0, z_level) xtile, ytile = lonlat_to_xy(lon_val_in, lat_val_in, z_level) dxtile = xtile - xtile0 dytile = ytile - ytile0 px_per_tile = 256 snapshot_px = img_w / 2 + dxtile * px_per_tile snapshot_py = img_h / 2 + dytile * px_per_tile return snapshot_px, snapshot_py # Apply styles immediately to prevent flash st.markdown( """ """, unsafe_allow_html=True, ) st.title("Potential Analysis") # ===================================================== # HELPER FUNCTION: Generate mini-map with kW circles for each STREAM # ===================================================== def generate_stream_kw_minimap(processes, map_snapshot, map_center, map_zoom, max_width=500, max_height=400): """ Generate a mini-map image showing each stream as a circle sized by kW. Streams are positioned near their parent subprocess location. Returns a PIL Image or None if no snapshot available. """ if not map_snapshot: return None try: # Load the base map snapshot base_img = Image.open(BytesIO(map_snapshot)).convert("RGBA") orig_w, orig_h = base_img.size # Calculate scale to fit within max dimensions while maintaining aspect ratio scale = min(max_width / orig_w, max_height / orig_h) new_w = int(orig_w * scale) new_h = int(orig_h * scale) # Resize the base image base_img = base_img.resize((new_w, new_h), Image.Resampling.LANCZOS) # Create drawing context draw = ImageDraw.Draw(base_img) # Try to load a font try: font = ImageFont.truetype("/System/Library/Fonts/Arial.ttf", 11) font_small = ImageFont.truetype("/System/Library/Fonts/Arial.ttf", 9) except (OSError, IOError): try: font = ImageFont.truetype("/System/Library/Fonts/Helvetica.ttc", 11) font_small = ImageFont.truetype("/System/Library/Fonts/Helvetica.ttc", 9) except (OSError, IOError): font = ImageFont.load_default() font_small = font # Collect all streams with their kW values and positions all_streams = [] for proc_idx, subprocess in enumerate(processes): sub_lat = subprocess.get('lat') sub_lon = subprocess.get('lon') subprocess_name = subprocess.get('name', f'Subprocess {proc_idx + 1}') streams = subprocess.get('streams', []) for s_idx, stream in enumerate(streams): stream_name = stream.get('name', f'Stream {s_idx + 1}') # Extract stream data props = stream.get('properties', {}) vals = stream.get('values', {}) tin = None tout = None mdot = None cp_val = None if isinstance(props, dict) and isinstance(vals, dict): for pk, pname in props.items(): vk = pk.replace('prop', 'val') v = vals.get(vk, '') if pname == 'Tin' and v: try: tin = float(v) except (ValueError, TypeError): pass elif pname == 'Tout' and v: try: tout = float(v) except (ValueError, TypeError): pass elif pname == 'ṁ' and v: try: mdot = float(v) except (ValueError, TypeError): pass elif pname == 'cp' and v: try: cp_val = float(v) except (ValueError, TypeError): pass # Fallback to legacy fields if tin is None and stream.get('temp_in'): try: tin = float(stream['temp_in']) except (ValueError, TypeError): pass if tout is None and stream.get('temp_out'): try: tout = float(stream['temp_out']) except (ValueError, TypeError): pass if mdot is None and stream.get('mdot'): try: mdot = float(stream['mdot']) except (ValueError, TypeError): pass if cp_val is None and stream.get('cp'): try: cp_val = float(stream['cp']) except (ValueError, TypeError): pass # Calculate kW = mdot * cp * |ΔT| stream_kw = 0.0 is_hot = None if tin is not None and tout is not None and mdot is not None and cp_val is not None: delta_t = abs(tin - tout) stream_kw = mdot * cp_val * delta_t is_hot = tin > tout # True = HOT (cooling), False = COLD (heating) all_streams.append({ 'proc_idx': proc_idx, 'stream_idx': s_idx, 'subprocess_name': subprocess_name, 'stream_name': stream_name, 'lat': sub_lat, 'lon': sub_lon, 'kw': stream_kw, 'is_hot': is_hot, 'tin': tin, 'tout': tout }) # Find max kW for scaling circle sizes kw_values = [s['kw'] for s in all_streams if s['kw'] > 0] max_kw = max(kw_values) if kw_values else 1.0 if max_kw == 0: max_kw = 1.0 # Group streams by subprocess for positioning subprocess_streams = {} for s in all_streams: key = s['proc_idx'] if key not in subprocess_streams: subprocess_streams[key] = [] subprocess_streams[key].append(s) # Draw circles for each stream for proc_idx, streams_list in subprocess_streams.items(): if not streams_list: continue # Get subprocess position first_stream = streams_list[0] lat = first_stream['lat'] lon = first_stream['lon'] if lat is None or lon is None: continue try: lat_f = float(lat) lon_f = float(lon) # Convert to pixel coordinates (on original size, then scale) base_px, base_py = snapshot_lonlat_to_pixel( lon_f, lat_f, (map_center[1], map_center[0]), map_zoom, orig_w, orig_h ) # Scale to new dimensions base_px = base_px * scale base_py = base_py * scale # Skip if outside bounds if base_px < -50 or base_py < -50 or base_px > new_w + 50 or base_py > new_h + 50: continue # Draw subprocess name first subprocess_name = first_stream['subprocess_name'] if font: bbox = draw.textbbox((0, 0), subprocess_name, font=font) tw = bbox[2] - bbox[0] th = bbox[3] - bbox[1] else: tw = len(subprocess_name) * 6 th = 10 # Draw subprocess label above streams label_x = int(base_px - tw / 2) label_y = int(base_py - 50) draw.rectangle([label_x - 3, label_y - 2, label_x + tw + 3, label_y + th + 2], fill=(255, 255, 255, 230), outline=(100, 100, 100, 200)) draw.text((label_x, label_y), subprocess_name, fill=(0, 0, 0, 255), font=font) # Position streams in a row below the label n_streams = len(streams_list) stream_spacing = 45 # pixels between stream circles start_x = base_px - (n_streams - 1) * stream_spacing / 2 for i, stream_data in enumerate(streams_list): px = start_x + i * stream_spacing py = base_py kw = stream_data['kw'] is_hot = stream_data['is_hot'] # Calculate circle radius based on kW (min 12, max 35 pixels) if kw > 0: radius = 12 + (kw / max_kw) * 23 else: radius = 10 # Determine color based on hot/cold if is_hot is True: fill_color = (255, 80, 80, 220) # Red for HOT border_color = (180, 30, 30, 255) elif is_hot is False: fill_color = (80, 140, 255, 220) # Blue for COLD border_color = (30, 80, 180, 255) else: fill_color = (180, 180, 180, 180) # Gray for unknown border_color = (120, 120, 120, 220) # Draw circle x0 = int(px - radius) y0 = int(py - radius) x1 = int(px + radius) y1 = int(py + radius) base_img = draw_smooth_ellipse(base_img, [x0, y0, x1, y1], fill=fill_color, outline=border_color, width=2) draw = ImageDraw.Draw(base_img) # Draw kW label inside circle if kw > 0: kw_text = f"{kw:.0f}" if font_small: bbox = draw.textbbox((0, 0), kw_text, font=font_small) text_w = bbox[2] - bbox[0] text_h = bbox[3] - bbox[1] else: text_w = len(kw_text) * 5 text_h = 8 tx = int(px - text_w / 2) ty = int(py - text_h / 2) # White text for visibility draw.text((tx, ty), kw_text, fill=(255, 255, 255, 255), font=font_small) # Draw stream name below circle stream_name = stream_data['stream_name'] if font_small: bbox = draw.textbbox((0, 0), stream_name, font=font_small) name_w = bbox[2] - bbox[0] name_h = bbox[3] - bbox[1] else: name_w = len(stream_name) * 5 name_h = 8 name_x = int(px - name_w / 2) name_y = int(py + radius + 4) draw.rectangle([name_x - 2, name_y - 1, name_x + name_w + 2, name_y + name_h + 1], fill=(255, 255, 255, 220)) draw.text((name_x, name_y), stream_name, fill=(0, 0, 0, 255), font=font_small) except (ValueError, TypeError): continue # Add legend in top-left corner legend_x = 10 legend_y = 10 legend_w = 70 legend_h = 55 # Legend background draw.rectangle([legend_x, legend_y, legend_x + legend_w, legend_y + legend_h], fill=(255, 255, 255, 240), outline=(150, 150, 150, 200)) # Legend title draw.text((legend_x + 5, legend_y + 3), "kW", fill=(0, 0, 0, 255), font=font) # Hot indicator base_img = draw_smooth_ellipse(base_img, [legend_x + 5, legend_y + 20, legend_x + 17, legend_y + 32], fill=(255, 80, 80, 220), outline=(180, 30, 30, 255), width=1) draw = ImageDraw.Draw(base_img) draw.text((legend_x + 22, legend_y + 21), "Hot", fill=(0, 0, 0, 255), font=font_small) # Cold indicator base_img = draw_smooth_ellipse(base_img, [legend_x + 5, legend_y + 37, legend_x + 17, legend_y + 49], fill=(80, 140, 255, 220), outline=(30, 80, 180, 255), width=1) draw = ImageDraw.Draw(base_img) draw.text((legend_x + 22, legend_y + 38), "Cold", fill=(0, 0, 0, 255), font=font_small) return base_img except Exception as e: return None # Initialize session state for selections if not exists if 'selected_items' not in st.session_state: st.session_state['selected_items'] = {} # Get processes from session state processes = st.session_state.get('processes', []) if not processes: st.info("No processes found. Please add processes in the Data Collection page first.") else: # Helper function to determine stream type and extract data def get_stream_info(stream): """Extract Tin, Tout, mdot, cp from stream and determine if HOT or COLD""" properties = stream.get('properties', {}) values = stream.get('values', {}) tin = None tout = None mdot = None cp_val = None # Check properties dict structure if isinstance(properties, dict) and isinstance(values, dict): for pk, pname in properties.items(): vk = pk.replace('prop', 'val') v = values.get(vk, '') if pname == 'Tin' and v: try: tin = float(v) except (ValueError, TypeError): pass elif pname == 'Tout' and v: try: tout = float(v) except (ValueError, TypeError): pass elif pname == 'ṁ' and v: try: mdot = float(v) except (ValueError, TypeError): pass elif pname == 'cp' and v: try: cp_val = float(v) except (ValueError, TypeError): pass # Fallback to legacy fields if tin is None and stream.get('temp_in'): try: tin = float(stream['temp_in']) except (ValueError, TypeError): pass if tout is None and stream.get('temp_out'): try: tout = float(stream['temp_out']) except (ValueError, TypeError): pass if mdot is None and stream.get('mdot'): try: mdot = float(stream['mdot']) except (ValueError, TypeError): pass if cp_val is None and stream.get('cp'): try: cp_val = float(stream['cp']) except (ValueError, TypeError): pass # Determine stream type stream_type = None if tin is not None and tout is not None: if tin > tout: stream_type = "HOT" else: stream_type = "COLD" # Calculate CP if possible cp_flow = None if mdot is not None and cp_val is not None: cp_flow = mdot * cp_val # Calculate kW = mdot * cp * |ΔT| kw = None if tin is not None and tout is not None and mdot is not None and cp_val is not None: kw = mdot * cp_val * abs(tin - tout) return { 'tin': tin, 'tout': tout, 'mdot': mdot, 'cp': cp_val, 'CP': cp_flow, 'kW': kw, 'type': stream_type } # ===================================================== # TWO-COLUMN LAYOUT: Stream Selection (left) + Map (right) # ===================================================== stream_col, map_col = st.columns([1, 1.2]) with stream_col: st.markdown("**Select streams for analysis:**") # Display each process and its streams for idx, process in enumerate(processes): process_name = process.get('name', f'Subprocess {idx + 1}') # Only show process header if it has streams streams = process.get('streams', []) if streams: st.markdown(f"**{process_name}**") for stream_idx, stream in enumerate(streams): stream_key = f"stream_{idx}_{stream_idx}" if stream_key not in st.session_state['selected_items']: st.session_state['selected_items'][stream_key] = False stream_cols = st.columns([0.05, 0.20, 0.75]) stream_selected = stream_cols[0].checkbox( "S", key=f"cb_{stream_key}", value=st.session_state['selected_items'][stream_key], label_visibility="collapsed" ) st.session_state['selected_items'][stream_key] = stream_selected # Display stream name stream_name = stream.get('name', f'Stream {stream_idx + 1}') stream_cols[1].write(stream_name) # Get stream info and display type + key values info = get_stream_info(stream) display_parts = [] if info['tin'] is not None: display_parts.append(f"Tin:{info['tin']}°C") if info['tout'] is not None: display_parts.append(f"Tout:{info['tout']}°C") if info['kW'] is not None: display_parts.append(f"**{info['kW']:.0f} kW**") if info['type']: type_color = "🔴" if info['type'] == "HOT" else "🔵" display_parts.append(f"{type_color} {info['type']}") if display_parts: stream_cols[2].caption(' | '.join(display_parts)) else: stream_cols[2].caption("(incomplete data)") with map_col: st.markdown("**Energy Map Overview (circle size = kW):**") # Generate and display mini-map with kW circles for each stream map_snapshot = st.session_state.get('map_snapshot') map_snapshots = st.session_state.get('map_snapshots', {}) current_base = st.session_state.get('current_base', 'OpenStreetMap') # Use the appropriate snapshot based on current base layer if current_base in map_snapshots: snapshot_to_use = map_snapshots[current_base] else: snapshot_to_use = map_snapshot map_center = st.session_state.get('map_center', [51.708, 8.772]) map_zoom = st.session_state.get('map_zoom', 17.5) if snapshot_to_use: minimap_img = generate_stream_kw_minimap( processes=processes, map_snapshot=snapshot_to_use, map_center=map_center, map_zoom=map_zoom, max_width=600, max_height=450 ) if minimap_img: st.image(minimap_img) else: st.caption("📍 Could not generate map preview") else: st.info("📍 Lock the map in Data Collection page first to see the energy overview.") # Count selected streams selected_count = sum(1 for k, v in st.session_state['selected_items'].items() if v and k.startswith("stream_")) # ===================================================== # PINCH ANALYSIS SECTION # ===================================================== st.markdown("---") if not PINCH_AVAILABLE: st.error(f"Pinch analysis module not available: {PINCH_IMPORT_ERROR or 'Unknown error'}") st.info("Please ensure the pinch_tool module is properly installed.") else: # Helper function to extract stream data from selection def extract_stream_data(procs, sel_items): """ Extract stream data from selected items. Returns list of dicts with: CP (calculated as mdot * cp), Tin, Tout """ result_streams = [] for sel_key, is_sel in sel_items.items(): if not is_sel: continue if sel_key.startswith("stream_"): parts_split = sel_key.split("_") p_idx = int(parts_split[1]) s_idx = int(parts_split[2]) if p_idx < len(procs): proc = procs[p_idx] proc_streams = proc.get('streams', []) if s_idx < len(proc_streams): strm = proc_streams[s_idx] # Extract values from properties/values structure props = strm.get('properties', {}) vals = strm.get('values', {}) tin = None tout = None mdot = None cp_val = None # Check properties dict structure if isinstance(props, dict) and isinstance(vals, dict): for pk, pname in props.items(): vk = pk.replace('prop', 'val') v = vals.get(vk, '') if pname == 'Tin' and v: try: tin = float(v) except (ValueError, TypeError): pass elif pname == 'Tout' and v: try: tout = float(v) except (ValueError, TypeError): pass elif pname == 'ṁ' and v: try: mdot = float(v) except (ValueError, TypeError): pass elif pname == 'cp' and v: try: cp_val = float(v) except (ValueError, TypeError): pass # Fallback to legacy fields if tin is None and strm.get('temp_in'): try: tin = float(strm['temp_in']) except (ValueError, TypeError): pass if tout is None and strm.get('temp_out'): try: tout = float(strm['temp_out']) except (ValueError, TypeError): pass if mdot is None and strm.get('mdot'): try: mdot = float(strm['mdot']) except (ValueError, TypeError): pass if cp_val is None and strm.get('cp'): try: cp_val = float(strm['cp']) except (ValueError, TypeError): pass # Calculate CP = mdot * cp if tin is not None and tout is not None and mdot is not None and cp_val is not None: CP = mdot * cp_val strm_name = strm.get('name', f'Stream {s_idx + 1}') proc_nm = proc.get('name', f'Subprocess {p_idx + 1}') result_streams.append({ 'name': f"{proc_nm} - {strm_name}", 'CP': CP, 'Tin': tin, 'Tout': tout }) return result_streams # Helper function to run pinch analysis def run_pinch_analysis(strm_data, delta_tmin): """ Run pinch analysis on the given stream data. Returns the Pinch object with results. """ # Create a temporary CSV file with the stream data with tempfile.NamedTemporaryFile(mode='w', suffix='.csv', delete=False, newline='') as f: writer = csv.writer(f) writer.writerow(['Tmin', str(delta_tmin)]) writer.writerow(['CP', 'TSUPPLY', 'TTARGET']) for strm in strm_data: writer.writerow([strm['CP'], strm['Tin'], strm['Tout']]) temp_csv_path = f.name try: # Run pinch analysis without drawing (we'll draw ourselves) pinch_obj = Pinch(temp_csv_path, options={}) pinch_obj.shiftTemperatures() pinch_obj.constructTemperatureInterval() pinch_obj.constructProblemTable() pinch_obj.constructHeatCascade() pinch_obj.constructShiftedCompositeDiagram('EN') pinch_obj.constructCompositeDiagram('EN') pinch_obj.constructGrandCompositeCurve('EN') return pinch_obj finally: # Clean up temp file os.unlink(temp_csv_path) # Extract stream data from selections streams_data = extract_stream_data(processes, st.session_state['selected_items']) if len(streams_data) < 2: st.info("Select at least 2 streams with complete data (Tin, Tout, ṁ, cp) to run pinch analysis.") # Show what data is missing for selected streams if selected_count > 0: st.markdown("**Data status for selected items:**") for sel_key, is_sel in st.session_state['selected_items'].items(): if not is_sel: continue if sel_key.startswith("stream_"): parts_split = sel_key.split("_") p_idx = int(parts_split[1]) s_idx = int(parts_split[2]) if p_idx < len(processes): proc = processes[p_idx] proc_streams = proc.get('streams', []) if s_idx < len(proc_streams): strm = proc_streams[s_idx] strm_name = strm.get('name', f'Stream {s_idx + 1}') proc_nm = proc.get('name', f'Subprocess {p_idx + 1}') # Check what data is available props = strm.get('properties', {}) vals = strm.get('values', {}) has_tin = False has_tout = False has_mdot = False has_cp = False if isinstance(props, dict) and isinstance(vals, dict): for pk, pname in props.items(): vk = pk.replace('prop', 'val') v = vals.get(vk, '') if pname == 'Tin' and v: has_tin = True elif pname == 'Tout' and v: has_tout = True elif pname == 'ṁ' and v: has_mdot = True elif pname == 'cp' and v: has_cp = True # Fallback to legacy if not has_tin and strm.get('temp_in'): has_tin = True if not has_tout and strm.get('temp_out'): has_tout = True if not has_mdot and strm.get('mdot'): has_mdot = True if not has_cp and strm.get('cp'): has_cp = True missing = [] if not has_tin: missing.append("Tin") if not has_tout: missing.append("Tout") if not has_mdot: missing.append("ṁ") if not has_cp: missing.append("cp") if missing: st.warning(f"⚠️ {proc_nm} - {strm_name}: Missing {', '.join(missing)}") else: st.success(f"✅ {proc_nm} - {strm_name}: Complete data") else: # Auto-run pinch analysis try: # Row: Shifted toggle | ΔTmin (small) | spacer | Hot Utility | Cold Utility | Pinch Temp toggle_col, tmin_col, spacer, metric1, metric2, metric3 = st.columns([0.6, 0.5, 0.4, 0.7, 0.7, 0.7]) with toggle_col: show_shifted = st.toggle("Show Shifted Composite Curves", value=False, key="shifted_toggle") with tmin_col: tmin = st.number_input( "ΔTmin", min_value=1.0, max_value=50.0, value=10.0, step=1.0, key="tmin_input", format="%.0f" ) pinch = run_pinch_analysis(streams_data, tmin) results = { 'hot_utility': pinch.hotUtility, 'cold_utility': pinch.coldUtility, 'pinch_temperature': pinch.pinchTemperature, 'tmin': pinch.tmin, 'composite_diagram': pinch.compositeDiagram, 'shifted_composite_diagram': pinch.shiftedCompositeDiagram, 'grand_composite_curve': pinch.grandCompositeCurve, 'heat_cascade': pinch.heatCascade, 'unfeasible_heat_cascade': pinch.unfeasibleHeatCascade, 'problem_table': pinch.problemTable, 'temperatures': pinch._temperatures, 'streams': list(pinch.streams) } metric1.metric("Hot Utility", f"{results['hot_utility']:.2f} kW") metric2.metric("Cold Utility", f"{results['cold_utility']:.2f} kW") metric3.metric("Pinch Temp", f"{results['pinch_temperature']:.1f} °C") # Side by side plots: Composite Curves (left) and Grand Composite Curve (right) plot_col1, plot_col2 = st.columns(2) # Build hover text for streams hot_streams = [s for s in streams_data if s['Tin'] > s['Tout']] cold_streams = [s for s in streams_data if s['Tin'] < s['Tout']] with plot_col1: fig1 = go.Figure() # Select which diagram to show if show_shifted: diagram = results['shifted_composite_diagram'] curve_label = "Shifted" title_text = "Shifted Composite Curves" # For shifted, temperatures are shifted by ±Tmin/2 tmin_half = results['tmin'] / 2 else: diagram = results['composite_diagram'] curve_label = "" title_text = "Composite Curves" tmin_half = 0 # Hot composite curve with hover info hot_T = diagram['hot']['T'] hot_H = diagram['hot']['H'] # Create hover text for hot curve points hot_hover = [] for i, (h, t) in enumerate(zip(hot_H, hot_T)): # Find streams at this temperature (adjust for shifted temps) if show_shifted: actual_t = t + tmin_half # Convert back to actual temp else: actual_t = t matching = [s['name'] for s in hot_streams if min(s['Tin'], s['Tout']) <= actual_t <= max(s['Tin'], s['Tout'])] stream_info = '
'.join(matching) if matching else 'Composite' label = f"Hot {curve_label}" if curve_label else "Hot Composite" hot_hover.append(f"{label}
T: {t:.1f}°C
H: {h:.1f} kW
Streams: {stream_info}") fig1.add_trace(go.Scatter( x=hot_H, y=hot_T, mode='lines+markers', name='Hot', line=dict(color='red', width=2), marker=dict(size=6), hovertemplate='%{text}', text=hot_hover )) # Cold composite curve with hover info cold_T = diagram['cold']['T'] cold_H = diagram['cold']['H'] # Create hover text for cold curve points cold_hover = [] for i, (h, t) in enumerate(zip(cold_H, cold_T)): if show_shifted: actual_t = t - tmin_half # Convert back to actual temp else: actual_t = t matching = [s['name'] for s in cold_streams if min(s['Tin'], s['Tout']) <= actual_t <= max(s['Tin'], s['Tout'])] stream_info = '
'.join(matching) if matching else 'Composite' label = f"Cold {curve_label}" if curve_label else "Cold Composite" cold_hover.append(f"{label}
T: {t:.1f}°C
H: {h:.1f} kW
Streams: {stream_info}") fig1.add_trace(go.Scatter( x=cold_H, y=cold_T, mode='lines+markers', name='Cold', line=dict(color='blue', width=2), marker=dict(size=6), hovertemplate='%{text}', text=cold_hover )) # Pinch temperature line fig1.add_hline( y=results['pinch_temperature'], line_dash='dash', line_color='gray', annotation_text=f"Pinch: {results['pinch_temperature']:.1f}°C", annotation_position='top right' ) fig1.update_layout( title=dict(text=title_text, font=dict(size=14)), xaxis_title='Enthalpy H (kW)', yaxis_title='Temperature T (°C)', height=400, margin=dict(l=60, r=20, t=40, b=50), legend=dict(x=0.7, y=0.1), hovermode='closest', xaxis=dict(rangemode='tozero'), yaxis=dict(rangemode='tozero') ) st.plotly_chart(fig1, width='stretch', key="composite_chart") with plot_col2: fig2 = go.Figure() gcc_H = results['grand_composite_curve']['H'] gcc_T = results['grand_composite_curve']['T'] heat_cascade = results['heat_cascade'] temperatures = results['temperatures'] # Create hover text for GCC points gcc_hover = [] for i, (h, t) in enumerate(zip(gcc_H, gcc_T)): if i < len(heat_cascade): dh = heat_cascade[i]['deltaH'] region = 'Heat deficit (needs heating)' if dh > 0 else ('Heat surplus (needs cooling)' if dh < 0 else 'Balanced') else: region = '' gcc_hover.append(f"GCC
Shifted T: {t:.1f}°C
Net ΔH: {h:.1f} kW
{region}") # Plot GCC with color segments for i in range(len(gcc_H) - 1): if i < len(heat_cascade): if heat_cascade[i]['deltaH'] > 0: color = 'red' elif heat_cascade[i]['deltaH'] < 0: color = 'blue' else: color = 'gray' else: color = 'gray' fig2.add_trace(go.Scatter( x=[gcc_H[i], gcc_H[i+1]], y=[gcc_T[i], gcc_T[i+1]], mode='lines+markers', line=dict(color=color, width=2), marker=dict(size=6, color=color), hovertemplate='%{text}', text=[gcc_hover[i], gcc_hover[i+1] if i+1 < len(gcc_hover) else ''], showlegend=False )) # Pinch temperature line fig2.add_hline( y=results['pinch_temperature'], line_dash='dash', line_color='gray', annotation_text=f"Pinch: {results['pinch_temperature']:.1f}°C", annotation_position='top right' ) # Zero enthalpy line fig2.add_vline(x=0, line_color='black', line_width=1, opacity=0.3) fig2.update_layout( title=dict(text='Grand Composite Curve', font=dict(size=14)), xaxis_title='Net ΔH (kW)', yaxis_title='Shifted Temperature (°C)', height=400, margin=dict(l=60, r=20, t=40, b=50), hovermode='closest', yaxis=dict(rangemode='tozero') ) st.plotly_chart(fig2, width='stretch', key="gcc_chart") # More information expander with st.expander("More information"): import pandas as pd temps = results['temperatures'] pinch_streams = results['streams'] if pinch_streams and temps: fig_interval = go.Figure() num_streams = len(pinch_streams) x_positions = [(i + 1) * 1.0 for i in range(num_streams)] # Draw horizontal temperature lines for temperature in temps: fig_interval.add_shape( type="line", x0=0, x1=num_streams + 1, y0=temperature, y1=temperature, line=dict(color="gray", width=1, dash="dot"), ) # Draw pinch temperature line fig_interval.add_shape( type="line", x0=0, x1=num_streams + 1, y0=results['pinch_temperature'], y1=results['pinch_temperature'], line=dict(color="black", width=2, dash="dash"), ) fig_interval.add_annotation( x=num_streams + 0.5, y=results['pinch_temperature'], text=f"Pinch: {results['pinch_temperature']:.1f}°C", showarrow=False, font=dict(size=10), xanchor='left' ) # Draw stream arrows for i, stream in enumerate(pinch_streams): ss = stream['ss'] # Shifted supply temp st_temp = stream['st'] # Shifted target temp stream_type = stream['type'] x_pos = x_positions[i] # Color based on stream type color = 'red' if stream_type == 'HOT' else 'blue' stream_name = streams_data[i]['name'] if i < len(streams_data) else f'Stream {i+1}' # Draw arrow as a line with annotation for arrowhead fig_interval.add_trace(go.Scatter( x=[x_pos, x_pos], y=[ss, st_temp], mode='lines', line=dict(color=color, width=8), hovertemplate=f"{stream_name}
" + f"Type: {stream_type}
" + f"T_supply (shifted): {ss:.1f}°C
" + f"T_target (shifted): {st_temp:.1f}°C
" + f"CP: {stream['cp']:.2f} kW/K", showlegend=False )) # Add arrowhead fig_interval.add_annotation( x=x_pos, y=st_temp, ax=x_pos, ay=ss, xref='x', yref='y', axref='x', ayref='y', showarrow=True, arrowhead=2, arrowsize=1.5, arrowwidth=3, arrowcolor=color ) # Stream label at top label_y = max(ss, st_temp) + (max(temps) - min(temps)) * 0.03 fig_interval.add_annotation( x=x_pos, y=label_y, text=f"S{i+1}", showarrow=False, font=dict(size=11, color='white'), bgcolor=color, bordercolor='black', borderwidth=1, borderpad=3 ) # CP value in middle mid_y = (ss + st_temp) / 2 fig_interval.add_annotation( x=x_pos, y=mid_y, text=f"CP={stream['cp']:.1f}", showarrow=False, font=dict(size=9, color='white'), textangle=-90 ) fig_interval.update_layout( title=dict(text='Shifted Temperature Interval Diagram', font=dict(size=14)), xaxis=dict( title='Streams', showticklabels=False, range=[0, num_streams + 1], showgrid=False ), yaxis=dict( title='Shifted Temperature S (°C)', showgrid=True, gridcolor='rgba(0,0,0,0.1)' ), height=400, margin=dict(l=60, r=20, t=40, b=40), hovermode='closest', showlegend=False ) st.plotly_chart(fig_interval, width='stretch', key="interval_chart") st.markdown("---") # Problem Table st.markdown("##### Problem Table") if results['problem_table']: problem_df = pd.DataFrame(results['problem_table']) # Rename columns for clarity col_rename = { 'T': 'T (°C)', 'deltaT': 'ΔT (°C)', 'cpHot': 'ΣCP Hot (kW/K)', 'cpCold': 'ΣCP Cold (kW/K)', 'deltaCp': 'ΔCP (kW/K)', 'deltaH': 'ΔH (kW)' } problem_df = problem_df.rename(columns={k: v for k, v in col_rename.items() if k in problem_df.columns}) st.dataframe(problem_df, width='stretch', hide_index=True) else: st.info("No problem table data available") # Heat Cascades side by side cascade_col1, cascade_col2 = st.columns(2) with cascade_col1: st.markdown("##### Unfeasible Heat Cascade") if results['unfeasible_heat_cascade']: # Add temperature column to dataframe unfeasible_data = [] for i, item in enumerate(results['unfeasible_heat_cascade']): row = {'T (°C)': temps[i+1] if i+1 < len(temps) else '', 'ΔH (kW)': item['deltaH'], 'Cascade (kW)': item['exitH']} unfeasible_data.append(row) unfeasible_df = pd.DataFrame(unfeasible_data) st.dataframe(unfeasible_df, width='stretch', hide_index=True) else: st.info("No unfeasible cascade data") with cascade_col2: st.markdown("##### Feasible Heat Cascade") if results['heat_cascade']: # Add temperature column to dataframe feasible_data = [] for i, item in enumerate(results['heat_cascade']): row = {'T (°C)': temps[i+1] if i+1 < len(temps) else '', 'ΔH (kW)': item['deltaH'], 'Cascade (kW)': item['exitH']} feasible_data.append(row) feasible_df = pd.DataFrame(feasible_data) st.dataframe(feasible_df, width='stretch', hide_index=True) else: st.info("No feasible cascade data") except Exception as e: st.error(f"Error: {str(e)}")