import pandas as pd import io import copy from typing import List, Dict, Tuple, Optional, Any, Generator def init_process_state(session_state): if 'processes' not in session_state: session_state['processes'] = [] # list of proc dicts. if 'selected_process_idx' not in session_state: session_state['selected_process_idx'] = None REQUIRED_PROC_COLS = {"name","next","conntemp","product_tout","connm","conncp","stream_no","mdot","temp_in","temp_out","cp"} # Level names for different hierarchy depths LEVEL_NAMES = { 0: 'Process', 1: 'Subprocess', 2: 'Sub-subprocess', 3: 'Sub-sub-subprocess', 4: 'Sub-sub-sub-subprocess' } def get_level_name(level: int) -> str: """Get the display name for a hierarchy level.""" return LEVEL_NAMES.get(level, f'Level-{level} Process') def create_stream(name: str = '', stream_type: str = 'product') -> Dict[str, Any]: """ Create a new stream with consistent structure. This is the reusable function for creating streams at any level. Args: name: Name of the stream stream_type: Type of stream (product, steam, air, water) Returns: A dict representing the stream with all standard fields """ return { 'name': name, 'type': stream_type, 'properties': { 'prop1': 'Tin', 'prop2': 'Tout', 'prop3': 'ṁ', 'prop4': 'cp' }, 'values': { 'val1': '', 'val2': '', 'val3': '', 'val4': '' }, # Legacy fields for backward compatibility 'mdot': '', 'temp_in': '', 'temp_out': '', 'cp': '' } def create_process_node(name: str = '', level: int = 0) -> Dict[str, Any]: """ Create a new process/subprocess/sub-subprocess node with a consistent structure. This is a REUSABLE function that creates nodes at ANY hierarchy level. The same structure is used for: - Processes (level 0) - Subprocesses (level 1) - Sub-subprocesses (level 2) - And so on... Args: name: Name of the process node level: Hierarchy level (0=process, 1=subprocess, 2=sub-subprocess, etc.) Returns: A dict representing the process node with all standard fields """ return { 'name': name or f'{get_level_name(level)} 1', 'level': level, 'next': '', 'conntemp': '', # Product Tin 'product_tout': '', # Product Tout 'connm': '', # Product ṁ 'conncp': '', # Product cp 'streams': [], 'children': [], # Sub-nodes (subprocesses, sub-subprocesses, etc.) - RECURSIVE! 'lat': None, 'lon': None, 'box_scale': 1.0 if level > 0 else 1.5, 'extra_info': { 'air_tin': '', 'air_tout': '', 'air_mdot': '', 'air_cp': '', 'water_content_in': '', 'water_content_out': '', 'density': '', 'pressure': '', 'notes': '' }, 'expanded': False, # UI state: whether this node is expanded 'info_expanded': False, # UI state: whether info section is expanded 'model': {'level1': None, 'level2': None}, # Process model selection 'params': { # Process parameters 'tin': '', 'tout': '', 'time': '', 'cp': '', 'mass_flow': None, 'thermal_power': None }, 'params_requested': False, 'hours': '' # Operating hours } def add_child_to_node(parent_node: Dict[str, Any], child_name: str = '') -> Dict[str, Any]: """ Add a child node (subprocess/sub-subprocess/etc.) to a parent node. This works recursively at ANY level - the same function is used to: - Add subprocess to process - Add sub-subprocess to subprocess - Add sub-sub-subprocess to sub-subprocess - etc. Args: parent_node: The parent process node to add a child to child_name: Optional name for the child Returns: The newly created child node """ if 'children' not in parent_node: parent_node['children'] = [] parent_level = parent_node.get('level', 0) child_level = parent_level + 1 child_index = len(parent_node['children']) default_name = f"{get_level_name(child_level)} {child_index + 1}" child_node = create_process_node( name=child_name or default_name, level=child_level ) parent_node['children'].append(child_node) return child_node def delete_child_from_node(parent_node: Dict[str, Any], child_index: int) -> bool: """ Delete a child node from a parent node by index. Works at any hierarchy level. Args: parent_node: The parent process node child_index: Index of the child to delete Returns: True if successful, False otherwise """ if 'children' not in parent_node: return False children = parent_node['children'] if 0 <= child_index < len(children): children.pop(child_index) return True return False def add_stream_to_node(node: Dict[str, Any], stream_name: str = '') -> Dict[str, Any]: """ Add a stream to ANY process node (process, subprocess, sub-subprocess, etc.). This is the REUSABLE function for adding streams at any level. Args: node: The process node to add a stream to stream_name: Optional name for the stream Returns: The newly created stream """ if 'streams' not in node: node['streams'] = [] stream_count = len(node['streams']) stream = create_stream( name=stream_name or f'Stream {stream_count + 1}', stream_type='product' ) node['streams'].append(stream) return stream def delete_stream_from_node(node: Dict[str, Any], stream_index: int) -> bool: """ Delete a stream from ANY process node. This is the REUSABLE function for deleting streams at any level. Args: node: The process node stream_index: Index of the stream to delete Returns: True if successful, False otherwise """ if 'streams' not in node: return False streams = node['streams'] if 0 <= stream_index < len(streams): streams.pop(stream_index) return True return False def iterate_all_nodes(nodes: List[Dict[str, Any]]) -> Generator[Tuple[Dict[str, Any], int, List[int]], None, None]: """ Generator that yields all nodes in a tree structure (depth-first). Useful for operations that need to traverse the entire hierarchy. Args: nodes: List of root nodes Yields: Tuples of (node, level, path_indices) - node: The process node - level: Hierarchy level (0=process, 1=subprocess, etc.) - path_indices: List of indices from root to this node, e.g., [0, 2, 1] """ def _iterate(node_list: List[Dict], level: int, path: List[int]): for i, node in enumerate(node_list): current_path = path + [i] yield node, level, current_path # Recurse into children children = node.get('children', []) if children: yield from _iterate(children, level + 1, current_path) yield from _iterate(nodes, 0, []) def get_node_by_path(root_nodes: List[Dict[str, Any]], path: List[int]) -> Optional[Dict[str, Any]]: """ Get a node by its path indices (e.g., [0, 1, 2] means process 0, child 1, grandchild 2). Args: root_nodes: List of root process nodes path: List of indices forming the path to the node Returns: The node if found, None otherwise """ if not path or path[0] >= len(root_nodes): return None current = root_nodes[path[0]] for idx in path[1:]: children = current.get('children', []) if idx >= len(children): return None current = children[idx] return current def copy_streams_to_all_descendants(parent_node: Dict[str, Any]): """ Copy streams from a parent node to ALL its descendants (children, grandchildren, etc.). This implements the requirement that changing streams should propagate to all descendants. Args: parent_node: The parent node whose streams to copy to all descendants """ parent_streams = parent_node.get('streams', []) def _copy_recursive(node: Dict[str, Any]): children = node.get('children', []) for child in children: # Deep copy the streams to child child['streams'] = copy.deepcopy(parent_streams) # Recurse to grandchildren _copy_recursive(child) _copy_recursive(parent_node) def sync_node_with_parent(child_node: Dict[str, Any], parent_node: Dict[str, Any], sync_streams: bool = True, sync_info: bool = False): """ Sync data from parent to a specific child node. Args: child_node: The child node to update parent_node: The parent node to copy from sync_streams: Whether to sync streams sync_info: Whether to sync extra_info fields """ if sync_streams: child_node['streams'] = copy.deepcopy(parent_node.get('streams', [])) if sync_info: child_node['extra_info'] = copy.deepcopy(parent_node.get('extra_info', {})) def count_all_descendants(node: Dict[str, Any]) -> int: """ Count all descendants (children, grandchildren, etc.) of a node. Args: node: The node to count descendants for Returns: Total number of descendants """ count = 0 children = node.get('children', []) count += len(children) for child in children: count += count_all_descendants(child) return count # ============================================================================= # LEGACY FUNCTIONS - Keep for backward compatibility with existing code # ============================================================================= def parse_process_csv_file(uploaded_file) -> Tuple[Optional[list], str]: if uploaded_file is None: return None, "No file provided" try: content = uploaded_file.read() text = content.decode('utf-8') df = pd.read_csv(io.StringIO(text)) if not REQUIRED_PROC_COLS.issubset(df.columns): return None, "CSV missing required columns" procs = [] proc_lookup = {} for _, row in df.iterrows(): # Include product_tout in key for uniqueness if present key = (row['name'], row['next'], row['conntemp'], row.get('product_tout',''), row['connm'], row['conncp']) if key not in proc_lookup: p = { "name": row.get('name',''), "next": row.get('next',''), "conntemp": row.get('conntemp',''), # Product Tin "product_tout": row.get('product_tout',''), # P Tout "connm": row.get('connm',''), # P ṁ "conncp": row.get('conncp',''), # P cp "streams": [], "children": [], # Support for sub-levels "lat": row.get('lat') if 'lat' in df.columns else None, "lon": row.get('lon') if 'lon' in df.columns else None, } procs.append(p) proc_lookup[key] = p stream_no = row.get('stream_no') if pd.notna(stream_no): proc_lookup[key]['streams'].append({ "mdot": row.get('mdot',''), "temp_in": row.get('temp_in',''), "temp_out": row.get('temp_out',''), "cp": row.get('cp',''), }) return procs, f"Loaded {len(procs)} processes" except (UnicodeDecodeError, OSError, ValueError) as e: return None, f"Failed: {e}" def processes_to_csv_bytes(processes: List[Dict]) -> bytes: """Export processes to CSV, including nested children.""" rows = [] def _add_process_rows(p: Dict, parent_name: str = ''): """Recursively add rows for a process and its children.""" proc_name = p.get('name', '') full_name = f"{parent_name} > {proc_name}" if parent_name else proc_name if not p.get('streams'): rows.append({ 'name': full_name, 'next': p.get('next',''), 'conntemp': p.get('conntemp',''), 'product_tout': p.get('product_tout',''), 'connm': p.get('connm',''), 'conncp': p.get('conncp',''), 'stream_no': '', 'mdot':'','temp_in':'','temp_out':'','cp':'', 'lat': p.get('lat'), 'lon': p.get('lon'), 'level': p.get('level', 0) }) else: for idx, s in enumerate(p['streams'], start=1): rows.append({ 'name': full_name, 'next': p.get('next',''), 'conntemp': p.get('conntemp',''), 'product_tout': p.get('product_tout',''), 'connm': p.get('connm',''), 'conncp': p.get('conncp',''), 'stream_no': idx, 'mdot': s.get('mdot',''), 'temp_in': s.get('temp_in',''), 'temp_out': s.get('temp_out',''), 'cp': s.get('cp',''), 'lat': p.get('lat'), 'lon': p.get('lon'), 'level': p.get('level', 0) }) # Recurse into children for child in p.get('children', []): _add_process_rows(child, full_name) for p in processes: _add_process_rows(p) df = pd.DataFrame(rows) buf = io.StringIO() df.to_csv(buf, index=False) return buf.getvalue().encode('utf-8') def add_process(session_state): """Legacy function - adds a subprocess to the flat processes list.""" new_proc = create_process_node(level=1) # Level 1 for subprocess new_proc['name'] = '' # Let UI set the name session_state['processes'].append(new_proc) session_state['selected_process_idx'] = len(session_state['processes'])-1 def delete_process(session_state, idx): """Legacy function - deletes a subprocess from the flat processes list.""" if 0 <= idx < len(session_state['processes']): session_state['processes'].pop(idx) if session_state['selected_process_idx'] == idx: session_state['selected_process_idx'] = None def add_stream_to_process(session_state, pidx): """Legacy function - adds a stream to a subprocess in the flat list.""" if 0 <= pidx < len(session_state['processes']): add_stream_to_node(session_state['processes'][pidx]) def delete_stream_from_process(session_state, pidx, sidx): """Legacy function - deletes a stream from a subprocess in the flat list.""" if 0 <= pidx < len(session_state['processes']): delete_stream_from_node(session_state['processes'][pidx], sidx)