heattransplanapp / src /process_utils.py
drzg's picture
test
d502905
import pandas as pd
import io
import copy
from typing import List, Dict, Tuple, Optional, Any, Generator
def init_process_state(session_state):
    """
    Seed the session state with the keys the process helpers rely on.

    Keys that are already present are left untouched, so calling this
    more than once does not reset anything.

    Args:
        session_state: Mapping-like session store (must support ``in``
            and item assignment)
    """
    # Factories instead of values so every call hands out a fresh list.
    initial_values = (
        ('processes', list),                     # list of proc dicts.
        ('selected_process_idx', lambda: None),  # nothing selected yet
    )
    for key, make_default in initial_values:
        if key not in session_state:
            session_state[key] = make_default()
# Column names a process CSV must contain before it is accepted for import.
REQUIRED_PROC_COLS = {
    # process-level columns
    "name", "next", "conntemp", "product_tout", "connm", "conncp",
    # per-stream columns
    "stream_no", "mdot", "temp_in", "temp_out", "cp",
}
# Display names for each hierarchy depth; the position in the tuple is the level.
LEVEL_NAMES = dict(enumerate((
    'Process',
    'Subprocess',
    'Sub-subprocess',
    'Sub-sub-subprocess',
    'Sub-sub-sub-subprocess',
)))
def get_level_name(level: int) -> str:
    """
    Return the display name for a hierarchy level.

    Args:
        level: Hierarchy depth (0 is the top level)
    Returns:
        The entry from LEVEL_NAMES, or a generic 'Level-N Process' label
        for depths that have no dedicated name
    """
    try:
        return LEVEL_NAMES[level]
    except KeyError:
        # Deeper than the named levels: fall back to a numbered label.
        return f'Level-{level} Process'
def create_stream(name: str = '', stream_type: str = 'product') -> Dict[str, Any]:
    """
    Build a new stream dict with the standard set of fields.

    Every stream, whatever node it is attached to, is created through
    this one function so all streams share the same layout.

    Args:
        name: Name of the stream
        stream_type: Type of stream (product, steam, air, water)
    Returns:
        A fresh dict holding the stream's name, type, property labels,
        blank values and the blank legacy fields
    """
    property_labels = ('Tin', 'Tout', 'ṁ', 'cp')
    slots = range(1, len(property_labels) + 1)

    stream: Dict[str, Any] = {
        'name': name,
        'type': stream_type,
        # prop1..prop4 carry the labels, val1..val4 the (initially blank) values
        'properties': {f'prop{n}': label for n, label in zip(slots, property_labels)},
        'values': {f'val{n}': '' for n in slots},
    }
    # Legacy fields for backward compatibility
    for legacy_key in ('mdot', 'temp_in', 'temp_out', 'cp'):
        stream[legacy_key] = ''
    return stream
def create_process_node(name: str = '', level: int = 0) -> Dict[str, Any]:
    """
    Build a new node dict for the process hierarchy.

    The same layout is used at every depth (process, subprocess,
    sub-subprocess, ...); only 'level', the default name and the default
    'box_scale' depend on the depth.

    Args:
        name: Name of the node; when empty a default of the form
            '<level name> 1' is used
        level: Hierarchy level (0=process, 1=subprocess, 2=sub-subprocess, etc.)
    Returns:
        A fresh dict with all standard node fields
    """
    if not name:
        name = f'{get_level_name(level)} 1'

    # Nodes at level 0 get a larger default box than nested ones.
    if level > 0:
        default_scale = 1.0
    else:
        default_scale = 1.5

    blank_extra_info = dict.fromkeys(
        (
            'air_tin', 'air_tout', 'air_mdot', 'air_cp',
            'water_content_in', 'water_content_out',
            'density', 'pressure', 'notes',
        ),
        '',
    )

    blank_params: Dict[str, Any] = dict.fromkeys(('tin', 'tout', 'time', 'cp'), '')
    blank_params['mass_flow'] = None
    blank_params['thermal_power'] = None

    return {
        'name': name,
        'level': level,
        'next': '',
        'conntemp': '',       # Product Tin
        'product_tout': '',   # Product Tout
        'connm': '',          # Product ṁ
        'conncp': '',         # Product cp
        'streams': [],
        'children': [],       # nested nodes, each with this same layout
        'lat': None,
        'lon': None,
        'box_scale': default_scale,
        'extra_info': blank_extra_info,
        'expanded': False,        # UI state: whether this node is expanded
        'info_expanded': False,   # UI state: whether info section is expanded
        'model': {'level1': None, 'level2': None},  # Process model selection
        'params': blank_params,   # Process parameters
        'params_requested': False,
        'hours': '',              # Operating hours
    }
def add_child_to_node(parent_node: Dict[str, Any], child_name: str = '') -> Dict[str, Any]:
    """
    Create a new node and append it to a parent's 'children' list.

    The child is placed one level below the parent, so the same call
    serves every depth of the hierarchy.

    Args:
        parent_node: The node that receives the new child
        child_name: Optional name; when empty the child is named
            '<level name> <position>' using its 1-based position
    Returns:
        The newly created child node
    """
    siblings = parent_node.setdefault('children', [])
    level_below = parent_node.get('level', 0) + 1

    # Default label is based on where the child lands in the list.
    fallback_name = f"{get_level_name(level_below)} {len(siblings) + 1}"

    new_child = create_process_node(
        name=child_name or fallback_name,
        level=level_below,
    )
    siblings.append(new_child)
    return new_child
def delete_child_from_node(parent_node: Dict[str, Any], child_index: int) -> bool:
    """
    Remove one child from a parent node by position.

    Args:
        parent_node: The node whose 'children' list is modified
        child_index: Zero-based position of the child to remove
    Returns:
        True when a child was removed; False when the node has no
        'children' key or the index is outside the list
    """
    if 'children' not in parent_node:
        return False

    siblings = parent_node['children']
    if not (0 <= child_index < len(siblings)):
        return False

    siblings.pop(child_index)
    return True
def add_stream_to_node(node: Dict[str, Any], stream_name: str = '') -> Dict[str, Any]:
    """
    Create a 'product' stream and append it to a node's 'streams' list.

    Works for a node at any depth of the hierarchy.

    Args:
        node: The node that receives the stream
        stream_name: Optional name; when empty the stream is named
            'Stream <position>' using its 1-based position
    Returns:
        The newly created stream
    """
    existing = node.setdefault('streams', [])
    label = stream_name or f'Stream {len(existing) + 1}'
    new_stream = create_stream(name=label, stream_type='product')
    existing.append(new_stream)
    return new_stream
def delete_stream_from_node(node: Dict[str, Any], stream_index: int) -> bool:
    """
    Remove one stream from a node by position.

    Works for a node at any depth of the hierarchy.

    Args:
        node: The node whose 'streams' list is modified
        stream_index: Zero-based position of the stream to remove
    Returns:
        True when a stream was removed; False when the node has no
        'streams' key or the index is outside the list
    """
    if 'streams' not in node:
        return False

    attached = node['streams']
    if not (0 <= stream_index < len(attached)):
        return False

    attached.pop(stream_index)
    return True
def iterate_all_nodes(nodes: List[Dict[str, Any]]) -> Generator[Tuple[Dict[str, Any], int, List[int]], None, None]:
    """
    Walk a node tree depth-first, parents before their children.

    Args:
        nodes: List of root nodes
    Yields:
        Tuples of (node, level, path_indices)
        - node: The node dict
        - level: Depth of the node (0 for the roots)
        - path_indices: Indices leading from the root list to this node, e.g., [0, 2, 1]
    """
    # Each stack entry is one sibling list being walked: a lazy cursor over
    # it, the depth of those siblings, and the path of their parent.
    open_levels = [(enumerate(nodes), 0, [])]
    while open_levels:
        cursor, depth, parent_path = open_levels[-1]
        step = next(cursor, None)
        if step is None:
            # This sibling list is exhausted; resume the one above it.
            open_levels.pop()
            continue

        position, node = step
        node_path = parent_path + [position]
        yield node, depth, node_path

        # Children are looked up only after the node has been handed out.
        nested = node.get('children', [])
        if nested:
            open_levels.append((enumerate(nested), depth + 1, node_path))
def get_node_by_path(root_nodes: List[Dict[str, Any]], path: List[int]) -> Optional[Dict[str, Any]]:
    """
    Get a node by its path indices (e.g., [0, 1, 2] means process 0, child 1, grandchild 2).

    Every index must be a valid zero-based position in the list it
    addresses. Negative indices are rejected rather than being allowed
    to wrap around to the end of the list.

    Args:
        root_nodes: List of root process nodes
        path: List of indices forming the path to the node
    Returns:
        The node if found, None otherwise (empty path, or any index out
        of range at its level)
    """
    if not path:
        return None

    siblings = root_nodes
    found = None
    for idx in path:
        # Same bounds rule at the root and at every nested level.
        if not 0 <= idx < len(siblings):
            return None
        found = siblings[idx]
        # A missing or empty 'children' entry simply means no deeper level.
        siblings = found.get('children') or []
    return found
def copy_streams_to_all_descendants(parent_node: Dict[str, Any]):
    """
    Overwrite the streams of every descendant with a copy of the parent's streams.

    Each child, grandchild, and so on receives its own deep copy, so
    later edits on one node do not leak into another. The parent node
    itself is not modified.

    Args:
        parent_node: The node whose 'streams' are pushed down the tree
    """
    template = parent_node.get('streams', [])

    # Worklist of nodes whose children still need to be updated.
    to_visit = [parent_node]
    while to_visit:
        current = to_visit.pop()
        for descendant in current.get('children', []):
            descendant['streams'] = copy.deepcopy(template)
            to_visit.append(descendant)
def sync_node_with_parent(child_node: Dict[str, Any], parent_node: Dict[str, Any],
                          sync_streams: bool = True, sync_info: bool = False):
    """
    Copy selected data from a parent node onto one child node.

    The copied data is deep-copied, so the child does not share
    objects with the parent afterwards.

    Args:
        child_node: The node that is updated in place
        parent_node: The node the data is taken from
        sync_streams: Copy the parent's 'streams' (empty list if absent)
        sync_info: Copy the parent's 'extra_info' (empty dict if absent)
    """
    # (enabled?, field name, value used when the parent lacks the field)
    transfer_plan = (
        (sync_streams, 'streams', []),
        (sync_info, 'extra_info', {}),
    )
    for enabled, field, when_missing in transfer_plan:
        if enabled:
            child_node[field] = copy.deepcopy(parent_node.get(field, when_missing))
def count_all_descendants(node: Dict[str, Any]) -> int:
    """
    Count every node below the given one (children, grandchildren, etc.).

    The node itself is not included in the count.

    Args:
        node: The node whose subtree is counted
    Returns:
        Total number of descendants
    """
    total = 0
    unexplored = [node]
    while unexplored:
        offspring = unexplored.pop().get('children', [])
        total += len(offspring)
        unexplored.extend(offspring)
    return total
# =============================================================================
# LEGACY FUNCTIONS - Keep for backward compatibility with existing code
# =============================================================================
def parse_process_csv_file(uploaded_file) -> Tuple[Optional[list], str]:
    """
    Read a process CSV and rebuild the flat list of process dicts.

    Rows that share the same process-level columns (name, next, conntemp,
    product_tout, connm, conncp) are merged into one process. Each row
    with a non-empty 'stream_no' contributes one stream to that process.

    Args:
        uploaded_file: File-like object whose ``read()`` returns the CSV
            content as UTF-8 bytes (or already-decoded text), or None
    Returns:
        Tuple of (processes, message). ``processes`` is None when nothing
        could be loaded; ``message`` describes the outcome either way
    """
    if uploaded_file is None:
        return None, "No file provided"
    try:
        content = uploaded_file.read()
        if isinstance(content, (bytes, bytearray)):
            # 'utf-8-sig' reads plain UTF-8 and also drops a leading BOM.
            text = content.decode('utf-8-sig')
        else:
            text = content
        df = pd.read_csv(io.StringIO(text))
        if not REQUIRED_PROC_COLS.issubset(df.columns):
            return None, "CSV missing required columns"

        key_columns = ('name', 'next', 'conntemp', 'product_tout', 'connm', 'conncp')
        has_lat = 'lat' in df.columns
        has_lon = 'lon' in df.columns

        procs = []
        proc_lookup = {}
        for _, row in df.iterrows():
            # Empty cells are read as NaN, and NaN never compares equal to
            # itself, so raw values would put every row of a process with a
            # blank cell into its own group. Map missing values to None for
            # the grouping key only; the stored values stay as read.
            key = tuple(None if pd.isna(row[col]) else row[col] for col in key_columns)
            proc = proc_lookup.get(key)
            if proc is None:
                proc = {
                    "name": row.get('name', ''),
                    "next": row.get('next', ''),
                    "conntemp": row.get('conntemp', ''),          # Product Tin
                    "product_tout": row.get('product_tout', ''),  # P Tout
                    "connm": row.get('connm', ''),                # P ṁ
                    "conncp": row.get('conncp', ''),              # P cp
                    "streams": [],
                    "children": [],  # Support for sub-levels
                    "lat": row.get('lat') if has_lat else None,
                    "lon": row.get('lon') if has_lon else None,
                }
                procs.append(proc)
                proc_lookup[key] = proc

            # A blank stream_no marks a row that only describes the process.
            if pd.notna(row.get('stream_no')):
                proc['streams'].append({
                    "mdot": row.get('mdot', ''),
                    "temp_in": row.get('temp_in', ''),
                    "temp_out": row.get('temp_out', ''),
                    "cp": row.get('cp', ''),
                })
        return procs, f"Loaded {len(procs)} processes"
    except (UnicodeDecodeError, OSError, ValueError) as e:
        return None, f"Failed: {e}"
def processes_to_csv_bytes(processes: List[Dict]) -> bytes:
    """
    Export processes to CSV, including nested children.

    Each stream becomes one row; a process without streams becomes a
    single row with blank stream columns. Children are written after
    their parent, with the name prefixed by the chain of ancestor names
    joined with ' > '.

    Args:
        processes: List of root process dicts
    Returns:
        UTF-8 encoded CSV text. The header row is always present, even
        when there is nothing to export
    """
    # Fixed column order; also keeps the header when there are no rows.
    columns = [
        'name', 'next', 'conntemp', 'product_tout', 'connm', 'conncp',
        'stream_no', 'mdot', 'temp_in', 'temp_out', 'cp',
        'lat', 'lon', 'level',
    ]
    stream_fields = ('mdot', 'temp_in', 'temp_out', 'cp')
    rows = []

    def _add_process_rows(p: Dict, parent_name: str = ''):
        """Recursively add rows for a process and its children."""
        proc_name = p.get('name', '')
        full_name = f"{parent_name} > {proc_name}" if parent_name else proc_name

        # Values repeated on every row that belongs to this process.
        shared = {
            'name': full_name,
            'next': p.get('next', ''),
            'conntemp': p.get('conntemp', ''),
            'product_tout': p.get('product_tout', ''),
            'connm': p.get('connm', ''),
            'conncp': p.get('conncp', ''),
            'lat': p.get('lat'),
            'lon': p.get('lon'),
            'level': p.get('level', 0),
        }

        streams = p.get('streams')
        if not streams:
            # No streams: still emit one row so the process is not lost.
            rows.append({**shared, 'stream_no': '', **dict.fromkeys(stream_fields, '')})
        else:
            for idx, s in enumerate(streams, start=1):
                rows.append({
                    **shared,
                    'stream_no': idx,
                    **{field: s.get(field, '') for field in stream_fields},
                })

        # Recurse into children
        for child in p.get('children') or []:
            _add_process_rows(child, full_name)

    for p in processes:
        _add_process_rows(p)

    df = pd.DataFrame(rows, columns=columns)
    buf = io.StringIO()
    df.to_csv(buf, index=False)
    return buf.getvalue().encode('utf-8')
def add_process(session_state):
    """
    Legacy helper: append a blank level-1 node to the flat processes list and select it.

    Args:
        session_state: Session store holding 'processes' and 'selected_process_idx'
    """
    blank_node = create_process_node(level=1)  # Level 1 for subprocess
    blank_node['name'] = ''  # Let UI set the name

    process_list = session_state['processes']
    process_list.append(blank_node)
    # The new entry is the last one, so select the final index.
    session_state['selected_process_idx'] = len(session_state['processes']) - 1
def delete_process(session_state, idx):
    """
    Legacy function - deletes a subprocess from the flat processes list.

    The stored selection is kept consistent with the shortened list:
    it is cleared when the selected entry is the one removed, and moved
    down by one when an entry before it is removed, so it keeps pointing
    at the same process instead of going stale or out of range.

    Args:
        session_state: Session store holding 'processes' and 'selected_process_idx'
        idx: Zero-based position of the process to remove; out-of-range
            values leave the state untouched
    """
    processes = session_state['processes']
    if not 0 <= idx < len(processes):
        return

    processes.pop(idx)

    selected = session_state['selected_process_idx']
    if selected is None:
        return
    if selected == idx:
        session_state['selected_process_idx'] = None
    elif selected > idx:
        # Everything after the removed slot moved one position to the left.
        session_state['selected_process_idx'] = selected - 1
def add_stream_to_process(session_state, pidx):
    """
    Legacy helper: add a default-named stream to one entry of the flat processes list.

    Args:
        session_state: Session store holding the 'processes' list
        pidx: Zero-based position of the process; out-of-range values are ignored
    """
    process_list = session_state['processes']
    if not (0 <= pidx < len(process_list)):
        return
    add_stream_to_node(session_state['processes'][pidx])
def delete_stream_from_process(session_state, pidx, sidx):
    """
    Legacy helper: remove one stream from one entry of the flat processes list.

    Args:
        session_state: Session store holding the 'processes' list
        pidx: Zero-based position of the process; out-of-range values are ignored
        sidx: Zero-based position of the stream within that process
    """
    process_list = session_state['processes']
    if not (0 <= pidx < len(process_list)):
        return
    delete_stream_from_node(session_state['processes'][pidx], sidx)